C
C#16mo ago
Turwaith

❔ Akka.net Actors are not responding in time

I have a system that consists of two Actor classes and a class that calls these actors.
public class ActorsystemManager
{
readonly ActorSystem simulationActorSystem;
readonly IActorRef updateActorRef;
private List<ActorState> currentActorStates = new();

private readonly object locker = new();

public int ActorChangeInterval { get; set; }

// This property is accessed by both this class to write and by the interface to read. Always use "lock" when accessing it!
public List<ActorState> CurrentActorStates
{
get
{
lock (locker)
{
return currentActorStates;
}
}
set
{
lock (locker)
{
currentActorStates = value;
}
}
}

public ActorsystemManager(int actorChangeInterval, List<int[]> coordinates)
{
ActorChangeInterval = actorChangeInterval;

simulationActorSystem = ActorSystem.Create("SimulationActorSystem");
updateActorRef = simulationActorSystem.ActorOf(Props.Create<UpdateActor>(actorChangeInterval, coordinates), "UpdateActor");

UpdateTimer timer = new UpdateTimer();
timer.Elapsed += OnTimerTick;
}

private void OnTimerTick(object sender, EventArgs e)
{
updateActorRef.Ask<List<ActorState>>("update")
.ContinueWith(task =>
{
if (task.IsCompletedSuccessfully)
{
UpdateCurrentActorStates(task.Result);
}
// Handle other cases, like task.IsFaulted or task.IsCanceled, if needed
}, TaskScheduler.Default);
}

private void UpdateCurrentActorStates(List<ActorState> result)
{
CurrentActorStates = new List<ActorState>(result);
}

public void Trigger(int[] coordinates)
{
updateActorRef.Tell(coordinates);
}

public bool Dispose()
{
try
{
simulationActorSystem.Dispose();
return true;
}
catch (Exception)
{

return false;
}
}
}
public class ActorsystemManager
{
readonly ActorSystem simulationActorSystem;
readonly IActorRef updateActorRef;
private List<ActorState> currentActorStates = new();

private readonly object locker = new();

public int ActorChangeInterval { get; set; }

// This property is accessed by both this class to write and by the interface to read. Always use "lock" when accessing it!
public List<ActorState> CurrentActorStates
{
get
{
lock (locker)
{
return currentActorStates;
}
}
set
{
lock (locker)
{
currentActorStates = value;
}
}
}

public ActorsystemManager(int actorChangeInterval, List<int[]> coordinates)
{
ActorChangeInterval = actorChangeInterval;

simulationActorSystem = ActorSystem.Create("SimulationActorSystem");
updateActorRef = simulationActorSystem.ActorOf(Props.Create<UpdateActor>(actorChangeInterval, coordinates), "UpdateActor");

UpdateTimer timer = new UpdateTimer();
timer.Elapsed += OnTimerTick;
}

private void OnTimerTick(object sender, EventArgs e)
{
updateActorRef.Ask<List<ActorState>>("update")
.ContinueWith(task =>
{
if (task.IsCompletedSuccessfully)
{
UpdateCurrentActorStates(task.Result);
}
// Handle other cases, like task.IsFaulted or task.IsCanceled, if needed
}, TaskScheduler.Default);
}

private void UpdateCurrentActorStates(List<ActorState> result)
{
CurrentActorStates = new List<ActorState>(result);
}

public void Trigger(int[] coordinates)
{
updateActorRef.Tell(coordinates);
}

public bool Dispose()
{
try
{
simulationActorSystem.Dispose();
return true;
}
catch (Exception)
{

return false;
}
}
}
Actors:
internal class UpdateActor : ReceiveActor
{
public List<IActorRef> HexagonActors { get; set; }

public UpdateActor()
{
HexagonActors = new List<IActorRef>();

ReceiveAsync<string>(data => data.ToLower().Equals("update"), async data => await Update());

Receive<int[]>(data => Trigger(data));
}

public UpdateActor(int actorChangeInterval, List<int[]> coordinates) : this()
{
foreach (var item in coordinates)
{
string coordinatesText = $"{item[0]}_{item[1]}";
var child = Context.ActorOf(Props.Create<HexagonActor>(actorChangeInterval, item), coordinatesText);
HexagonActors.Add(child);
}
}

private async Task Update()
{
var actorStates = new List<ActorState>();

foreach (var actor in HexagonActors)
{
try
{
var actorState = await actor.Ask<ActorState>("update", TimeSpan.FromMilliseconds(200));
actorStates.Add(actorState);
}
catch (AskTimeoutException ex)
{
throw new AskTimeoutException("");
}
}

Sender.Tell(actorStates);
}

private void Trigger(int[] coordinates)
{
var coordinateAddress = $"{coordinates[0]}_{coordinates[1]}";
Context.ActorSelection("akka://SimulationActorSystem/user/UpdateActor/" + coordinateAddress).Tell("trigger");
}
}
internal class UpdateActor : ReceiveActor
{
public List<IActorRef> HexagonActors { get; set; }

public UpdateActor()
{
HexagonActors = new List<IActorRef>();

ReceiveAsync<string>(data => data.ToLower().Equals("update"), async data => await Update());

Receive<int[]>(data => Trigger(data));
}

public UpdateActor(int actorChangeInterval, List<int[]> coordinates) : this()
{
foreach (var item in coordinates)
{
string coordinatesText = $"{item[0]}_{item[1]}";
var child = Context.ActorOf(Props.Create<HexagonActor>(actorChangeInterval, item), coordinatesText);
HexagonActors.Add(child);
}
}

private async Task Update()
{
var actorStates = new List<ActorState>();

foreach (var actor in HexagonActors)
{
try
{
var actorState = await actor.Ask<ActorState>("update", TimeSpan.FromMilliseconds(200));
actorStates.Add(actorState);
}
catch (AskTimeoutException ex)
{
throw new AskTimeoutException("");
}
}

Sender.Tell(actorStates);
}

private void Trigger(int[] coordinates)
{
var coordinateAddress = $"{coordinates[0]}_{coordinates[1]}";
Context.ActorSelection("akka://SimulationActorSystem/user/UpdateActor/" + coordinateAddress).Tell("trigger");
}
}
2nd actor and problem follows in a second message, hang on
2 Replies
Turwaith
Turwaith16mo ago
internal class HexagonActor : ReceiveActor
{
readonly int stateChangeInterval;
readonly int vCoordinate;
readonly int uCoordinate;
bool currentState;

readonly object currentStateLock = new object();

readonly Dictionary<string, int[]> Neighbours;

public HexagonActor()
{
currentState = false;

ReceiveAsync<string>(data => data.ToLower().Equals("update"), async data => await TellCurrentState());

ReceiveAsync<string>(data => data.ToLower().Equals("trigger"), async data => await InitialTrigger());

ReceiveAsync<int[]>(async data => await ContinuousTrigger(data));
}

public HexagonActor(int interval, int[] coordinate) : this()
{
stateChangeInterval = interval;

vCoordinate = coordinate[0];
uCoordinate = coordinate[1];

Neighbours = new Dictionary<string, int[]>()
{
{ "North", new int[2] {vCoordinate + 0, uCoordinate + 1 } },
{ "NorthEast", new int[2] {vCoordinate + 1, uCoordinate + 0} },
{ "SouthEast", new int[2] {vCoordinate + 1, uCoordinate - 1 } },
{ "South", new int[2] { vCoordinate + 0, uCoordinate - 1 } },
{ "SouthWest", new int[2] { vCoordinate - 1, uCoordinate + 0 } },
{ "NorthWest", new int[2] { vCoordinate - 1, uCoordinate + 1 } }
};
}

public async Task TellCurrentState()
{
bool state;
lock (currentStateLock)
{
state = currentState;
}
Sender.Tell(new ActorState(vCoordinate, uCoordinate, state));
}

public async Task InitialTrigger()
{
lock (currentStateLock)
{
currentState = true;
}

//await Task.Delay(TimeSpan.FromSeconds(stateChangeInterval/1000));

foreach (var value in Neighbours.Values)
{
var neighbourName = $"{value[0]}_{value[1]}";
Context.ActorSelection($"akka://SimulationActorSystem/user/UpdateActor/{neighbourName}").Tell(new int[2] {vCoordinate, uCoordinate});
// {akka://SimulationActorSystem/user/UpdateActor/13_12}
}

lock (currentStateLock)
{
currentState = false;
}
}

public async Task ContinuousTrigger(int[] sender)
{
currentState = true;

var senderNeighbourName = Neighbours.FirstOrDefault(kvp => kvp.Value.SequenceEqual(sender)).Key;

if (senderNeighbourName == null) return;

var neighboursToNotify = getNeighboursToNotify(senderNeighbourName);

var neighbour1Address = $"{Neighbours[neighboursToNotify[0]][0]}_{Neighbours[neighboursToNotify[0]][1]}";
var neighbour2Address = $"{Neighbours[neighboursToNotify[1]][0]}_{Neighbours[neighboursToNotify[1]][1]}";

//await Task.Delay(TimeSpan.FromSeconds(stateChangeInterval/1000));

Context.ActorSelection($"../{neighbour1Address}").Tell(new int[2] { vCoordinate, uCoordinate });
Context.ActorSelection($"../{neighbour2Address}").Tell(new int[2] { vCoordinate, uCoordinate });

currentState = false;
}

public string[] getNeighboursToNotify(string sendingNeighbour)
{
switch(sendingNeighbour)
{
case "North":
return new string[2] { "South", "SouthWest" };
case "NorthEast":
return new string[2] { "SouthWest", "NorthWest" };
case "SouthEast":
return new string[2] { "NorthWest", "North" };
case "South":
return new string[2] { "North", "NorthEast" };
case "SouthWest":
return new string[2] { "NorthEast", "SouthEast" };
case "NorthWest":
return new string[2] { "SouthEast", "South" };
}

return new string[2] { "", "" };
}
}
internal class HexagonActor : ReceiveActor
{
readonly int stateChangeInterval;
readonly int vCoordinate;
readonly int uCoordinate;
bool currentState;

readonly object currentStateLock = new object();

readonly Dictionary<string, int[]> Neighbours;

public HexagonActor()
{
currentState = false;

ReceiveAsync<string>(data => data.ToLower().Equals("update"), async data => await TellCurrentState());

ReceiveAsync<string>(data => data.ToLower().Equals("trigger"), async data => await InitialTrigger());

ReceiveAsync<int[]>(async data => await ContinuousTrigger(data));
}

public HexagonActor(int interval, int[] coordinate) : this()
{
stateChangeInterval = interval;

vCoordinate = coordinate[0];
uCoordinate = coordinate[1];

Neighbours = new Dictionary<string, int[]>()
{
{ "North", new int[2] {vCoordinate + 0, uCoordinate + 1 } },
{ "NorthEast", new int[2] {vCoordinate + 1, uCoordinate + 0} },
{ "SouthEast", new int[2] {vCoordinate + 1, uCoordinate - 1 } },
{ "South", new int[2] { vCoordinate + 0, uCoordinate - 1 } },
{ "SouthWest", new int[2] { vCoordinate - 1, uCoordinate + 0 } },
{ "NorthWest", new int[2] { vCoordinate - 1, uCoordinate + 1 } }
};
}

public async Task TellCurrentState()
{
bool state;
lock (currentStateLock)
{
state = currentState;
}
Sender.Tell(new ActorState(vCoordinate, uCoordinate, state));
}

public async Task InitialTrigger()
{
lock (currentStateLock)
{
currentState = true;
}

//await Task.Delay(TimeSpan.FromSeconds(stateChangeInterval/1000));

foreach (var value in Neighbours.Values)
{
var neighbourName = $"{value[0]}_{value[1]}";
Context.ActorSelection($"akka://SimulationActorSystem/user/UpdateActor/{neighbourName}").Tell(new int[2] {vCoordinate, uCoordinate});
// {akka://SimulationActorSystem/user/UpdateActor/13_12}
}

lock (currentStateLock)
{
currentState = false;
}
}

public async Task ContinuousTrigger(int[] sender)
{
currentState = true;

var senderNeighbourName = Neighbours.FirstOrDefault(kvp => kvp.Value.SequenceEqual(sender)).Key;

if (senderNeighbourName == null) return;

var neighboursToNotify = getNeighboursToNotify(senderNeighbourName);

var neighbour1Address = $"{Neighbours[neighboursToNotify[0]][0]}_{Neighbours[neighboursToNotify[0]][1]}";
var neighbour2Address = $"{Neighbours[neighboursToNotify[1]][0]}_{Neighbours[neighboursToNotify[1]][1]}";

//await Task.Delay(TimeSpan.FromSeconds(stateChangeInterval/1000));

Context.ActorSelection($"../{neighbour1Address}").Tell(new int[2] { vCoordinate, uCoordinate });
Context.ActorSelection($"../{neighbour2Address}").Tell(new int[2] { vCoordinate, uCoordinate });

currentState = false;
}

public string[] getNeighboursToNotify(string sendingNeighbour)
{
switch(sendingNeighbour)
{
case "North":
return new string[2] { "South", "SouthWest" };
case "NorthEast":
return new string[2] { "SouthWest", "NorthWest" };
case "SouthEast":
return new string[2] { "NorthWest", "North" };
case "South":
return new string[2] { "North", "NorthEast" };
case "SouthWest":
return new string[2] { "NorthEast", "SouthEast" };
case "NorthWest":
return new string[2] { "SouthEast", "South" };
}

return new string[2] { "", "" };
}
}
The problem: UpdateTimer will trigger the method OnTimerTick every 100ms. The UpdateActor then goes through 1200 HexagonActors and returns their status. Now that works well, but as soon as the method ActorsystemManager.Trigger has been called, I get AskTimeoutExceptions inside the UpdateActor and I don't understand why that is.
Accord
Accord15mo ago
Looks like nothing has happened here. I will mark this as stale and this post will be archived until there is new activity.