using System;
using System.Collections.Generic;
using System.Collections.ObjectModel;
using System.Threading;
using CustomThreadPool.Collections;
namespace CustomThreadPool.ThreadPools;
public class MyThreadPool : IThreadPool
{
private long _processedTask;
private readonly Queue<Action> _globalQueue = new();
private readonly ReadOnlyDictionary<int, WorkStealingQueue<Action>> _localQueues;
private const int WorkerCount = 10;
public MyThreadPool()
{
var workers = new Thread[WorkerCount];
var queues = new Dictionary<int, WorkStealingQueue<Action>>();
for (var i = 0; i < WorkerCount; i++)
{
var worker = new Thread(Worker) {IsBackground = true};
workers[i] = worker;
queues.Add(worker.ManagedThreadId, new WorkStealingQueue<Action>());
}
_localQueues = new ReadOnlyDictionary<int, WorkStealingQueue<Action>>(queues);
foreach (var worker in workers)
worker.Start();
}
public void EnqueueAction(Action action)
{
if (_localQueues.TryGetValue(Environment.CurrentManagedThreadId, out var localQueue))
{
localQueue.LocalPush(action);
return;
}
lock (_globalQueue)
{
_globalQueue.Enqueue(action);
Monitor.Pulse(_globalQueue);
}
}
public long GetTasksProcessedCount() => _processedTask;
private void Worker()
{
while (true)
{
if (TryGetFromLocalQueue(out var work)
|| TryGetFromGlobalQueue(out work)
|| TrySteal(out work))
{
work();
Interlocked.Increment(ref _processedTask);
continue;
}
lock (_globalQueue)
Monitor.Wait(_globalQueue);
}
// ReSharper disable once FunctionNeverReturns
}