C#15mo ago
NacDan

Iterating over AsyncEnumerable and writing to MemoryStream

Mainly a question on how you all would go about this: I'm thinking of initializing a SemaphoreSlim(1, 1) and awaiting it inside the iteration before writing to the memory stream. Any opposition to this? The compiled code for an AsyncEnumerable seems to be what I'd expect, so I figure I'd need some locking while writing to the stream regardless.
13 Replies
NacDan
NacDan15mo ago
For instance
SemaphoreSlim semaphoreSlim = new(1, 1);
await foreach (var batch in batches)
{
    try
    {
        await semaphoreSlim.WaitAsync();
        var records = batch.AsCollection();
        totalBatches++;
        totalBatches += records.Count;

        await WriteToParquet(memoryStream, records);
    }
    finally
    {
        semaphoreSlim.Release();
    }
}
Mayor McCheese
Mayor McCheese15mo ago
Why not block outside the foreach? Or are you concerned about the order going into parquet?
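Something like this, roughly (reusing the names from your snippet; only worth it if something outside this loop can also write to the stream):
// Rough sketch: take the lock once around the whole loop instead of per batch
await semaphoreSlim.WaitAsync();
try
{
    await foreach (var batch in batches)
    {
        var records = batch.AsCollection();
        totalBatches++;

        await WriteToParquet(memoryStream, records);
    }
}
finally
{
    semaphoreSlim.Release();
}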
Aaron
Aaron15mo ago
what's the semaphore for? it seems to be doing nothing here
NacDan
NacDan15mo ago
Correct, WriteToParquet takes in the existing MemoryStream
Mayor McCheese
Mayor McCheese15mo ago
My comment on this somehow got truncated 😦 Are you rationally worried?
NacDan
NacDan15mo ago
Force synchronization while writing to the MemoryStream
Aaron
Aaron15mo ago
await foreach (var batch in batches)
{
    var records = batch.AsCollection();
    totalBatches++;
    totalBatches += records.Count;

    await WriteToParquet(memoryStream, records);
}
that's the exact same code; it will behave the same way
NacDan
NacDan15mo ago
Is that because it is disposed before the next iteration?
Aaron
Aaron15mo ago
what is disposed
NacDan
NacDan15mo ago
the current enumerator
My main concern was unintentionally writing to the stream concurrently. I'm not all that familiar with AsyncEnumerables
Mayor McCheese
Mayor McCheese15mo ago
It's possible, I suppose, that batches could come in the wrong order (we don't know the implementation of batches), but you won't solve that with a semaphore
Aaron
Aaron15mo ago
AsyncEnumerable is essentially just a normal enumerable, but made of tasks instead of normal items
it doesn't inherently make you process items in parallel
this code will still process items one at a time
if you wanted to process items in parallel, you'd have to use Parallel.ForEachAsync
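rough sketch of what that would look like, reusing the names from your snippet (there the semaphore would actually matter, since MemoryStream isn't safe for concurrent writes):
var gate = new SemaphoreSlim(1, 1);

await Parallel.ForEachAsync(batches, async (batch, ct) =>
{
    var records = batch.AsCollection();
    Interlocked.Increment(ref totalBatches); // shared counters need to be thread-safe too

    // serialize access to the shared MemoryStream
    await gate.WaitAsync(ct);
    try
    {
        await WriteToParquet(memoryStream, records);
    }
    finally
    {
        gate.Release();
    }
});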
NacDan
NacDan15mo ago
I am seeing the compiler transformations right now
Yes, I see now what it looks like under the hood
Gotcha, that makes sense. Thanks