C#5mo ago
Robert

Skill issue serializing tasks

Hi! I need to serialize a list of tasks so that each one begins only when the previous task has fully completed. Take this snippet for example:
async Task TestAsync(string log)
{
    Console.WriteLine($"delay start {log}");
    await Task.Delay(5000);
    Console.WriteLine($"delay end {log}");
}

[TestMethod]
public async Task TestTasks()
{
    List<Task> tasks = new()
    {
        TestAsync("1"),
        TestAsync("2"),
        TestAsync("3"),
    };

    /*
    How do I achieve this?
    delay start 1
    delay end 1
    delay start 2
    delay end 2
    delay start 3
    delay end 3
    */
}
I've tried using Task.WaitAll, await Task.WhenAll, looping and awaiting them, and ContinueWith (with and without await), but the closest I've been to success is: delay start 1, delay start 2, delay start 3, delay end 3, delay end 2, delay end 1. Thank you!
29 Replies
blueberriesiftheywerecats
so making a loop and awaiting every i-th element? but i think, when writing like that
List<Task> tasks = new()
{
    TestAsync("1"),
    TestAsync("2"),
    TestAsync("3"),
};
they would still start in random order
Robert
RobertOP5mo ago
doesn't achieve what i need
Robert
RobertOP5mo ago
interesting, is the order not guaranteed in a list?
blueberriesiftheywerecats
try making a delay for the first element and you'll see, cuz i think they won't always even start in order. so maybe you have to make a list of references to the function and its arguments, then when iterating through it you just call it. but why even make the code async at this point
Robert
RobertOP5mo ago
well it still waits for the first task, but not for its completion
Robert
RobertOP5mo ago
the real application is that i have a stream of messages that i want to cache and write into the db. I want to wait until the previous message has been cached to cache the next one (as the order matters) and I don't want to block the main thread
reflectronic
reflectronic5mo ago
when you call TestAsync it starts the task immediately so,
List<Task> tasks = new()
{
    TestAsync("1"),
    TestAsync("2"),
    TestAsync("3"),
};
is already non-serial, and the only option is to change this code. i think what you want is
List<Func<Task>> tasks = new()
{
    () => TestAsync("1"),
    () => TestAsync("2"),
    () => TestAsync("3"),
};

foreach (var t in tasks)
{
    await t();
}
Robert
RobertOP5mo ago
Thank you! This is what I wanted. I don't really get why the latter doesn't start the task automatically?
reflectronic
reflectronic5mo ago
because () => TestAsync("1") is a delegate. it is a method, which can be called (and is called inside of the loop). it is declared there, but it's not called there
Robert
RobertOP5mo ago
Thank you for your help!
SleepWellPupper
SleepWellPupper5mo ago
Some more bits for better understanding: your TestAsync method only returns a task when it hits the delay; until then, it executes synchronously. That is why you see your delay start messages in sequence; it is only after the last start message that you have your fully initialized list and actually enter the loop.
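To see this concretely, here is a minimal sketch contrasting an eager call with the deferred `Func<Task>` list. The `events` list, the shortened 50 ms delay, and the "eager" label are illustrative assumptions, not part of the original snippet; it is written as a top-level program for brevity.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Records events in order so the sequencing can be inspected afterwards.
var events = new List<string>();

async Task TestAsync(string log)
{
    events.Add($"start {log}"); // runs synchronously, inside the call itself
    await Task.Delay(50);       // first incomplete await: the caller gets the Task back here
    events.Add($"end {log}");
}

// Eager: calling the method starts it right away, before anything awaits it.
Task eager = TestAsync("eager");
events.Add("after call");
await eager;

// Deferred: each lambda only describes the call; nothing runs until it is invoked.
var deferred = new List<Func<Task>>
{
    () => TestAsync("1"),
    () => TestAsync("2"),
};
events.Add("list built");

foreach (var start in deferred)
{
    await start(); // start one, wait for it to finish, then move on
}

Console.WriteLine(string.Join(", ", events));
```

Under these assumptions "start eager" is recorded before "after call", while "list built" is recorded before "start 1", which is the difference between a list of already-running tasks and a list of delegates.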
Robert
RobertOP5mo ago
If we make the delegates async, they will be scheduled on the threadpool rather than on the same thread, right?
public async Task TestTasks()
{
    List<Func<Task>> tasks = new()
    {
        async () => await TestAsync("1"),
        async () => await TestAsync("2"),
        async () => await TestAsync("3"),
    };

    foreach (var t in tasks)
    {
        await t();
    }
}
List<Func<Task>> tasks = new()
{
    () => TestAsync("1"),
    () => TestAsync("2"),
    () => TestAsync("3"),
};

foreach (var t in tasks)
{
    await t();
}
reflectronic
reflectronic5mo ago
no it will be the same as before but less efficient
SleepWellPupper
SleepWellPupper5mo ago
You can use Task.Run to schedule tasks on the threadpool instead.
Robert
RobertOP5mo ago
So using this pattern, I face this problem: I have a sync method (which I cannot change) that needs to achieve these 2 things: create a JSON and send it to 2 places (1. to a Redis cache, 2. to an append blob). I do not wish to block the main thread for this, so Task.Run would be used.
public void MySyncMethod()
{
    var json = CreateJson();
    Task.Run(async () => await SendToRedisCache(json));
    Task.Run(async () => await SendToAppendblob(json));
}
The issue I have is that my order of completion is not guaranteed. What mechanism or pattern should I use to guarantee the order of caching and sending to an append blob? I have thought of using 2 concurrent queues (one for caching, one for uploading to the append blob, as they are independent of each other) with long-running tasks, but I read it is not ideal. Any ideas? So essentially the same problem: how do I wait in SendToRedisCache for the previous SendToRedisCache to finish before starting?
reflectronic
reflectronic5mo ago
by using Task.Run you just reintroduced the same issue that was already fixed. i am not sure why you need the tasks to be scheduled on the thread pool, it does not really make a difference
Robert
RobertOP5mo ago
But how do I delegate this work to other threads so I don't block this one, without Task.Run?
reflectronic
reflectronic5mo ago
they are already asynchronous, they are not blocking the thread
Robert
RobertOP5mo ago
SendToRedisCache SendToAppendblob are indeed async, but if you're not awaiting them, aren't they run synchronously?
reflectronic
reflectronic5mo ago
"blocking" means that you are making the thread wait for I/O. an async method named SendToRedisCache is almost certainly not blocking. it does not matter whether you await it or not--it is not blocking the thread on I/O. if you want the SendToRedisCache to happen asynchronously--like, you want to immediately continue once you have started sending--that is a different thing, but that is pretty easy:
Task.Run(async () =>
{
    await SendToRedisCache(json);
    await SendToAppendblob(json);
});
Robert
RobertOP5mo ago
Yes, I want to immediately continue - sort of a fire and forget, however it happens that MySyncMethod is reached again before the previous SendToRedisCache is finished, is there a way to guarantee that SendToRedisCache2 will finish after SendToRedisCache1 has finished?
reflectronic
reflectronic5mo ago
i think what you want is a producer-consumer type pattern
Robert
RobertOP5mo ago
We have previously used Azure Service Bus for this, but we have reached a point where we have more requests than it could handle, so that's why we are trying to sort of replicate this with Redis
reflectronic
reflectronic5mo ago
Channel<Json> jsonChannel = Channel.CreateUnbounded<Json>(new() { SingleReader = true });
Task.Run(SendJson);

// Producer side: called for every new message, never blocks.
void WriteJson(Json json)
{
    jsonChannel.Writer.TryWrite(json);
}

// Consumer side: async is required because the body uses await.
async Task SendJson()
{
    await foreach (var json in jsonChannel.Reader.ReadAllAsync())
    {
        await SendToRedisCache(json);
        await SendToAppendblob(json);
    }
}
Robert
RobertOP5mo ago
and this will effectively be a long running task? or when does the json get consumed?
reflectronic
reflectronic5mo ago
yes
Robert
RobertOP5mo ago
forgot to mention .net framework not .net core :(, but I got the idea even without await foreach (such a cool feature btw)
reflectronic
reflectronic5mo ago
yeah you can use WaitToReadAsync and then TryRead instead, i suppose
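A rough sketch of that loop shape, assuming the System.Threading.Channels NuGet package is referenced on .NET Framework. `SendAsync`, the `processed` list, and the string payloads are hypothetical stand-ins for SendToRedisCache, SendToAppendblob, and the real JSON type; the top-level program around the loop is only scaffolding and assumes a newer compiler.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Channels;
using System.Threading.Tasks;

var channel = Channel.CreateUnbounded<string>(
    new UnboundedChannelOptions { SingleReader = true });
var processed = new List<string>();

// Hypothetical stand-in for SendToRedisCache / SendToAppendblob.
async Task SendAsync(string target, string payload)
{
    await Task.Delay(10);
    processed.Add($"{target}:{payload}");
}

// Consumer loop written without await foreach.
async Task ConsumeAsync()
{
    // WaitToReadAsync yields false once the writer is completed and the channel is drained.
    while (await channel.Reader.WaitToReadAsync())
    {
        // Drain everything currently buffered, one item at a time, in FIFO order.
        while (channel.Reader.TryRead(out var item))
        {
            await SendAsync("redis", item);
            await SendAsync("blob", item);
        }
    }
}

Task consumer = Task.Run(() => ConsumeAsync());

// Producer side: TryWrite returns immediately, so a sync caller is not held up.
foreach (var message in new[] { "a", "b", "c" })
{
    channel.Writer.TryWrite(message);
}

// Completing the writer lets the consumer loop exit once everything is processed.
channel.Writer.Complete();
await consumer;

Console.WriteLine(string.Join(", ", processed));
```

Because a single consumer awaits each send before reading the next item, the second message cannot start until the first has finished, which is the ordering guarantee asked about earlier. In a long-lived service the writer would typically only be completed at shutdown.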
Robert
RobertOP5mo ago
Thank you! I'll have a try