Asynchronous task queue

I have one thread that delegates tasks to another thread, which runs those tasks from a queue. These are blocking tasks (database) that I want to run in the correct order, first in, first out. I'm currently using BlockingQueue, but when testing it with JUnit, it can freeze the whole test. That's because after each test, this thread is waited on until it finishes. I have not found a good way to enable/disable the queue. How would one do this properly?
dan1st
dan1st3mo ago
Can you show the relevant code? Just the code that's necessary to reproduce the issue
Thorinwasher
ThorinwasherOP3mo ago
Sure, don't laugh at it though 🤣
dan1st
dan1st3mo ago
And if possible, please also show where exactly the code blocks the tests - Is it somewhere during the test or after running the tests?
Thorinwasher
ThorinwasherOP3mo ago
Oh, too large...
dan1st
dan1st3mo ago
I am fairly sure your code is better than the code of many other people - after all, you are writing tests. You can also try to upload the files.
Thorinwasher
ThorinwasherOP3mo ago
Good point, I'm not educated enough in concurrency though. Ah
dan1st
dan1st3mo ago
or ideally only include the relevant code
Thorinwasher
ThorinwasherOP3mo ago
let me do that then
dan1st
dan1st3mo ago
concurrency is hard
dan1st
dan1st3mo ago
The most complicated chapter of the JVM specification (which specifies how JVMs have to work) is pretty much about what should happen in multithreaded environments
Thorinwasher
ThorinwasherOP3mo ago
I think you need more info than that though
dan1st
dan1st3mo ago
Can you show the test and where it blocks?
Thorinwasher
ThorinwasherOP3mo ago
Sure, this is one test that blocks everything, but that relates to the test that ran before it. It's quite layered though.
@Test
void portalInvalidBlockPlaceTest() {
    Location bottomLeft = new Location(world, 0, 7, 0);
    Location insidePortal = new Location(world, 0, 9, 0);
    Block signBlock = PortalBlockGenerator.generatePortal(bottomLeft);
    Block irisBlock = new Location(world, 1, 9, 0).getBlock();

    ((Directional) signBlock.getBlockData()).setFacing(BlockFace.SOUTH);
    blockEventListener
            .onSignChange(new SignChangeEvent(signBlock, player, new String[]{"test", "", CUSTOM_NETNAME, ""}));

    BlockPlaceEvent event = new BlockPlaceEvent(irisBlock, irisBlock.getState(), irisBlock,
            new ItemStack(Material.ANDESITE), player, false);
    blockEventListener.onBlockPlace(event);
    blockEventListener.onBlockBreak(new BlockBreakEvent(insidePortal.getBlock(), player));
    Assertions.assertTrue(event.isCancelled());
}
dan1st
dan1st3mo ago
And what's StargateAsyncTask? So it only happens when running multiple tests? Do you know what "suspending" is when it comes to debugging?
Thorinwasher
ThorinwasherOP3mo ago
It's a task run in another thread, with some wrapper stuff (mostly because I want to be sure the tasks get executed whenever the server shuts down). Yep.
dan1st
dan1st3mo ago
When portalInvalidBlockPlaceTest starts, is the queue empty?
Thorinwasher
ThorinwasherOP3mo ago
Yeah. Might be a good idea
dan1st
dan1st3mo ago
Also is the thread processing the tasks doing something at that point?
Thorinwasher
ThorinwasherOP3mo ago
Let's see, good idea to check that
dan1st
dan1st3mo ago
My suggestion is the following:
- Run both tests so that you run into the blocking issue
- Wait until you are sure the blocking issue happens
- Suspend the thread running the test and check what it's doing
- Check where it has been suspended (Did the test start? Is it somewhere inside the test? Did the test finish?)
- Check the content of the BlockingQueue
- Check what the worker thread is doing (Is it processing a task from the previous thread? Is it waiting for a task (maybe it's waiting even though there are elements in the queue)?)
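When no debugger is attached, roughly the same information (which threads exist, what state they are in, and where they are stuck) can be printed from the test itself. This is only a sketch: ThreadDump and dump() are hypothetical names, while Thread.getAllStackTraces() and Thread.getState() are standard JDK calls.

```java
import java.util.Map;

// Hypothetical helper, not part of the code discussed in this thread.
final class ThreadDump {
    private ThreadDump() {}

    // Returns the name, state and stack trace of every live thread as text.
    static String dump() {
        StringBuilder out = new StringBuilder();
        for (Map.Entry<Thread, StackTraceElement[]> entry : Thread.getAllStackTraces().entrySet()) {
            Thread thread = entry.getKey();
            out.append(thread.getName()).append(" [").append(thread.getState()).append("]\n");
            for (StackTraceElement frame : entry.getValue()) {
                out.append("    at ").append(frame).append('\n');
            }
        }
        return out.toString();
    }
}
```

A worker parked in BlockingQueue#take() would typically show up in state WAITING with take() near the top of its stack.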
Thorinwasher
ThorinwasherOP3mo ago
Alright, btw do you have a good source I can read up on concurrency? (Your recommendation here will definitely help, thanks btw)
dan1st
dan1st3mo ago
If you are into books: There's a book called "Java Concurrency in Practice"
Thorinwasher
ThorinwasherOP3mo ago
Alright, thanks. (Like, my main fallback has just been to let external classes handle this. But in this case, I think I need to get better at concurrency.)
dan1st
dan1st3mo ago
I would also recommend doing that for most cases, as it's the best way to avoid issues, but it's still good to know what these classes do, what they can and cannot do, and how to fix issues. Btw, if you did that and you still need help with it, just say what the result of these things is.
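As an illustration of letting a library class handle the queue: a single-threaded ExecutorService runs submitted tasks one at a time in submission order, and shutdown() followed by awaitTermination() runs whatever is still queued with a bounded wait. This is a sketch under assumptions, not the code from this thread; FifoTaskRunner and its method names are hypothetical.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical wrapper; the names are illustrative only.
final class FifoTaskRunner {
    // One worker thread with an unbounded FIFO queue: tasks run in submission order.
    private final ExecutorService executor = Executors.newSingleThreadExecutor();

    void submit(Runnable task) {
        executor.execute(task);
    }

    // Stops accepting new tasks, lets already queued tasks run, and waits up to
    // the given timeout. Returns true if all tasks finished in time.
    boolean shutdownAndDrain(long timeout, TimeUnit unit) {
        executor.shutdown();
        try {
            return executor.awaitTermination(timeout, unit);
        } catch (InterruptedException e) {
            // Preserve the interrupt for the caller and report "not finished".
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

In a test, calling shutdownAndDrain with a short timeout after each test case gives a bounded wait instead of a possible hang; a fresh runner would then be created for the next test, since a shut-down executor rejects new tasks.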
Thorinwasher
ThorinwasherOP3mo ago
Okay. (need to eat some dinner first) I think I know what's happening now. I made a disable queue item and an enable queue item. These items set the value of asyncQueueThreadIsEnabled to true or false. It's a way to avoid dealing with concurrency issues. Now to the actual issue: this is how I'm cycling through the queue:
do {
    try {
        Runnable runnable = asyncQueue.take();
        runnable.run();
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    } catch (Exception e) {
        Stargate.log(e);
    }
} while (asyncQueueThreadIsEnabled || !asyncQueue.isEmpty());
Say the last item in the queue is an enable queue item. Then the boolean switch asyncQueueThreadIsEnabled would be true, so the thread blocks in the next BlockingQueue#take() call.
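One possible way around the blocking take() call, sketched under assumptions: poll with a timeout so the loop condition is re-checked even when no task arrives, and let another thread clear the flag directly. PollingQueueWorker, disable(), and the 100 ms timeout are hypothetical choices, and System.err stands in for Stargate.log.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for the worker described above.
final class PollingQueueWorker implements Runnable {
    private final BlockingQueue<Runnable> asyncQueue = new LinkedBlockingQueue<>();
    // volatile so that a write from another thread is visible to the worker.
    private volatile boolean enabled = true;

    void add(Runnable task) {
        asyncQueue.add(task);
    }

    void disable() {
        enabled = false;
    }

    @Override
    public void run() {
        do {
            try {
                // Waits at most 100 ms, then returns null so the loop condition is re-checked.
                Runnable runnable = asyncQueue.poll(100, TimeUnit.MILLISECONDS);
                if (runnable != null) {
                    runnable.run();
                }
            } catch (InterruptedException e) {
                // Restore the interrupt flag and stop the worker.
                Thread.currentThread().interrupt();
                return;
            } catch (Exception e) {
                // Stand-in for Stargate.log(e); a failing task should not kill the worker.
                System.err.println("Task failed: " + e);
            }
        } while (enabled || !asyncQueue.isEmpty());
    }
}
```

After disable() is called, the worker finishes the tasks still in the queue and exits within roughly one timeout period, without needing a special queue item.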
dan1st
dan1st3mo ago
If the thread is interrupted, you should stop looping. Interrupts typically mean asking the thread to stop. In your case, if a thread is interrupted, it will continue looping, but take() will fail every time if the queue is empty because the thread is interrupted.
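A minimal sketch of that fix, assuming the same loop shape as the code above: leave the loop as soon as take() is interrupted. InterruptibleQueueWorker and add() are hypothetical names, and System.err stands in for Stargate.log.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical stand-in for the worker described above.
final class InterruptibleQueueWorker implements Runnable {
    private final BlockingQueue<Runnable> asyncQueue = new LinkedBlockingQueue<>();

    void add(Runnable task) {
        asyncQueue.add(task);
    }

    @Override
    public void run() {
        while (true) {
            try {
                // Blocks until a task is available or the thread is interrupted.
                Runnable runnable = asyncQueue.take();
                runnable.run();
            } catch (InterruptedException e) {
                // Restore the interrupt flag, then stop looping.
                Thread.currentThread().interrupt();
                return;
            } catch (Exception e) {
                // Stand-in for Stargate.log(e); a failing task should not kill the worker.
                System.err.println("Task failed: " + e);
            }
        }
    }
}
```

With this shape, tasks still in the queue at the moment of the interrupt are not run. If they must be, one option is to drain them with the non-blocking poll() before returning.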
Thorinwasher
ThorinwasherOP3mo ago
Oh... that's stupid on my end. I just thought it threw an exception :facepalm: That might partly fix the issue, but it does still require the thread to be interrupted though.
dan1st
dan1st3mo ago
You can still do that at the end of the test if applicable, but note that this may require restarting the thread.
Thorinwasher
ThorinwasherOP3mo ago
Yes, that's how I want it to be
dan1st
dan1st3mo ago
Check what the test is waiting for. Then you need the Thread object of the worker thread and call interrupt() on it.
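A sketch of that teardown step, assuming the worker loop exits when it is interrupted. WorkerShutdown.stop is a hypothetical helper; in a JUnit 5 test it could be called from an @AfterEach method with the worker's Thread object.

```java
// Hypothetical helper, not part of the code discussed in this thread.
final class WorkerShutdown {
    private WorkerShutdown() {}

    // Interrupts the worker and waits up to timeoutMillis for it to exit.
    // timeoutMillis should be positive: join(0) would wait forever.
    // Returns true if the worker thread has terminated.
    static boolean stop(Thread worker, long timeoutMillis) {
        worker.interrupt();
        try {
            worker.join(timeoutMillis);
        } catch (InterruptedException e) {
            // The waiting thread itself was interrupted; keep the flag set.
            Thread.currentThread().interrupt();
        }
        return !worker.isAlive();
    }
}
```

The bounded join means a worker that ignores the interrupt shows up as a false return value instead of freezing the whole test run.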