C
C#6mo ago
engineertdog

Parallel list

I'm trying to write to a list in parallel due to vendor SDK performance issues with sequential operations. I have the following code. Is this a proper approach?
c#
// Formatted records waiting to be written; filled by TransformRecordsParallel and drained by FlushRecords.
private readonly ConcurrentQueue<DRT> _recordQueue = new ConcurrentQueue<DRT>();
// Semaphore with a single permit: only one FlushRecords caller at a time dequeues and writes a batch.
private readonly SemaphoreSlim _flushLock = new SemaphoreSlim(1, 1);

/// <summary>
/// Runs the pipeline: worker setup, record extraction, transformation, and a final flush
/// of everything still sitting in the queue.
/// </summary>
/// <remarks>
/// Any exception raised along the way is logged and not rethrown.
/// </remarks>
public async Task Execute() {
    try {
        // external/irrelevant to the list question
        await PerformWorkerSetup();
        IEnumerable<SRT> sourceRecords = ExtractRecords();

        await TransformRecords(sourceRecords);

        // A single FlushRecords call only writes one batch, so keep flushing until the
        // queue is empty; otherwise records left over after the last full batch are lost.
        int configuredThreshold = Config.Threshold;
        while (!_recordQueue.IsEmpty) {
            // Never ask for more than is queued (so the flush always triggers) and never
            // for less than one (so every iteration makes progress).
            int batchSize = Math.Max(1, Math.Min(configuredThreshold, _recordQueue.Count));
            await FlushRecords(batchSize);
        }
    } catch (Exception ex) {
        Log.Error(ex.Message);
    }
}

/// <summary>
/// Formats each source record, queues the result, and asks for a flush once the queue
/// reaches the configured threshold.
/// </summary>
/// <param name="sourceRecords">Records to format and enqueue.</param>
/// <returns>A task that completes when every per-record task has finished.</returns>
public async Task TransformRecordsParallel(IEnumerable<SRT> sourceRecords) {
    // NOTE(review): FormatRecord and Enqueue run synchronously, before the first await,
    // on whichever thread enumerates the sequence. Work only overlaps where FlushRecords
    // actually yields - confirm this gives the parallelism that was intended.
    async Task QueueAndFlush(SRT item) {
        _recordQueue.Enqueue(this.FormatRecord(item));
        await FlushRecords(Config.Threshold);
    }

    IEnumerable<Task> pending = sourceRecords.Select(item => QueueAndFlush(item));
    await Task.WhenAll(pending);
}

/// <summary>
/// Writes one batch of queued records when at least <paramref name="threshold"/> of them are waiting.
/// </summary>
/// <param name="threshold">
/// Minimum queue length that triggers a flush, and the maximum number of records taken per call.
/// </param>
/// <remarks>
/// A single call removes at most <paramref name="threshold"/> records. Callers that need the
/// queue fully drained must keep calling until it is empty.
/// </remarks>
public async Task FlushRecords(int threshold) {
    // Cheap pre-check so callers below the threshold never wait on the semaphore.
    if (_recordQueue.Count < threshold) {
        return;
    }

    await _flushLock.WaitAsync();

    try {
        // Another caller may have drained the queue while this one waited for the lock.
        if (_recordQueue.Count < threshold) {
            return;
        }

        IList<DRT> recordList = new List<DRT>();

        // Pull at most `threshold` records out of the queue for processing.
        while (recordList.Count < threshold && _recordQueue.TryDequeue(out DRT record)) {
            recordList.Add(record);
        }

        // Nothing was taken (e.g. a non-positive threshold): skip the write entirely.
        if (recordList.Count == 0) {
            return;
        }

        // write the recordList to file or SQL
    } finally {
        // Always hand the permit back, even when the write throws.
        _flushLock.Release();
    }
}
c#
// Formatted records waiting to be written; filled by TransformRecordsParallel and drained by FlushRecords.
private readonly ConcurrentQueue<DRT> _recordQueue = new ConcurrentQueue<DRT>();
// Semaphore with a single permit: only one FlushRecords caller at a time dequeues and writes a batch.
private readonly SemaphoreSlim _flushLock = new SemaphoreSlim(1, 1);

/// <summary>
/// Runs the pipeline: worker setup, record extraction, transformation, and a final flush
/// of everything still sitting in the queue.
/// </summary>
/// <remarks>
/// Any exception raised along the way is logged and not rethrown.
/// </remarks>
public async Task Execute() {
    try {
        // external/irrelevant to the list question
        await PerformWorkerSetup();
        IEnumerable<SRT> sourceRecords = ExtractRecords();

        await TransformRecords(sourceRecords);

        // A single FlushRecords call only writes one batch, so keep flushing until the
        // queue is empty; otherwise records left over after the last full batch are lost.
        int configuredThreshold = Config.Threshold;
        while (!_recordQueue.IsEmpty) {
            // Never ask for more than is queued (so the flush always triggers) and never
            // for less than one (so every iteration makes progress).
            int batchSize = Math.Max(1, Math.Min(configuredThreshold, _recordQueue.Count));
            await FlushRecords(batchSize);
        }
    } catch (Exception ex) {
        Log.Error(ex.Message);
    }
}

/// <summary>
/// Formats each source record, queues the result, and asks for a flush once the queue
/// reaches the configured threshold.
/// </summary>
/// <param name="sourceRecords">Records to format and enqueue.</param>
/// <returns>A task that completes when every per-record task has finished.</returns>
public async Task TransformRecordsParallel(IEnumerable<SRT> sourceRecords) {
    // NOTE(review): FormatRecord and Enqueue run synchronously, before the first await,
    // on whichever thread enumerates the sequence. Work only overlaps where FlushRecords
    // actually yields - confirm this gives the parallelism that was intended.
    async Task QueueAndFlush(SRT item) {
        _recordQueue.Enqueue(this.FormatRecord(item));
        await FlushRecords(Config.Threshold);
    }

    IEnumerable<Task> pending = sourceRecords.Select(item => QueueAndFlush(item));
    await Task.WhenAll(pending);
}

/// <summary>
/// Writes one batch of queued records when at least <paramref name="threshold"/> of them are waiting.
/// </summary>
/// <param name="threshold">
/// Minimum queue length that triggers a flush, and the maximum number of records taken per call.
/// </param>
/// <remarks>
/// A single call removes at most <paramref name="threshold"/> records. Callers that need the
/// queue fully drained must keep calling until it is empty.
/// </remarks>
public async Task FlushRecords(int threshold) {
    // Cheap pre-check so callers below the threshold never wait on the semaphore.
    if (_recordQueue.Count < threshold) {
        return;
    }

    await _flushLock.WaitAsync();

    try {
        // Another caller may have drained the queue while this one waited for the lock.
        if (_recordQueue.Count < threshold) {
            return;
        }

        IList<DRT> recordList = new List<DRT>();

        // Pull at most `threshold` records out of the queue for processing.
        while (recordList.Count < threshold && _recordQueue.TryDequeue(out DRT record)) {
            recordList.Add(record);
        }

        // Nothing was taken (e.g. a non-positive threshold): skip the write entirely.
        if (recordList.Count == 0) {
            return;
        }

        // write the recordList to file or SQL
    } finally {
        // Always hand the permit back, even when the write throws.
        _flushLock.Release();
    }
}
3 Replies
Moods
Moods6mo ago
Task.WhenAll is fine, yeah (btw, unless I'm blind, TransformRecordsParallel is never called in your code).
Moods
Moods6mo ago
engineertdog
engineertdogOP6mo ago
Whoops, I forgot that function. Basically it's a user-defined choice of parallel vs sequential.
c#
/// <summary>
/// Dispatches to the sequential or the parallel transform depending on <c>Config.Parallel</c>.
/// </summary>
/// <param name="sourceRecords">Records handed to the selected transform.</param>
public async Task TransformRecords(IEnumerable<SRT> sourceRecords) {
    // An empty setting selects the sequential path; anything else selects the parallel one.
    if (Config.Parallel == "") {
        await TransformRecordsSequential(sourceRecords);
        return;
    }

    await TransformRecordsParallel(sourceRecords);
}
c#
/// <summary>
/// Dispatches to the sequential or the parallel transform depending on <c>Config.Parallel</c>.
/// </summary>
/// <param name="sourceRecords">Records handed to the selected transform.</param>
public async Task TransformRecords(IEnumerable<SRT> sourceRecords) {
    // An empty setting selects the sequential path; anything else selects the parallel one.
    if (Config.Parallel == "") {
        await TransformRecordsSequential(sourceRecords);
        return;
    }

    await TransformRecordsParallel(sourceRecords);
}

Did you find this page helpful?