C
C#6mo ago
no >> body

✅ EF core transactions in PostgreSQL

I have a custom extension method for running operations on DbContext. One is for running operations in transactions with a specific isolation level, and the second one is for retry logic. The problem I just spotted is that isolation doesn't work properly, despite having enough retries and time to process transactions. I have an entity called WalletBalance that contains a property Amount. And here is my job for testing:
[DisallowConcurrentExecution]
public class TestJob(ILogger<TestJob> logger, MasterDbContext masterDbContext, IServiceScopeFactory factory) : IJob
{
private Guid WalletBalanceId { get; set; }

public async Task Execute(IJobExecutionContext context)
{
WalletBalanceId = masterDbContext.WalletBalances.First(x => x.Amount == 0).Id;

var tasks = new List<Task>
{
UpdateBalance(100, 100),
UpdateBalance(-100, 100),
UpdateBalance(100, 100),
UpdateBalance(-100, 100)
};

await Task.WhenAll(tasks);
}

private async Task UpdateBalance(int amount, int times)
{
using IServiceScope scope = factory.CreateScope();
MasterDbContext context = scope.ServiceProvider.GetRequiredService<MasterDbContext>();

for (var i = 0; i < times; i++)
{
await context.ExecuteInCustomStrategyAsync(
async _ =>
{
await context.ExecuteInTransactionAsync(
async _ =>
{
WalletBalance walletBalance = (await context.WalletBalances.FindAsync(WalletBalanceId))!;
walletBalance.Amount += amount;
});
});
}
}
}
[DisallowConcurrentExecution]
public class TestJob(ILogger<TestJob> logger, MasterDbContext masterDbContext, IServiceScopeFactory factory) : IJob
{
private Guid WalletBalanceId { get; set; }

public async Task Execute(IJobExecutionContext context)
{
WalletBalanceId = masterDbContext.WalletBalances.First(x => x.Amount == 0).Id;

var tasks = new List<Task>
{
UpdateBalance(100, 100),
UpdateBalance(-100, 100),
UpdateBalance(100, 100),
UpdateBalance(-100, 100)
};

await Task.WhenAll(tasks);
}

private async Task UpdateBalance(int amount, int times)
{
using IServiceScope scope = factory.CreateScope();
MasterDbContext context = scope.ServiceProvider.GetRequiredService<MasterDbContext>();

for (var i = 0; i < times; i++)
{
await context.ExecuteInCustomStrategyAsync(
async _ =>
{
await context.ExecuteInTransactionAsync(
async _ =>
{
WalletBalance walletBalance = (await context.WalletBalances.FindAsync(WalletBalanceId))!;
walletBalance.Amount += amount;
});
});
}
}
}
I expect the amount to be 0 after all the transactions have been run. But in fact Amount has random numbers after each run of the job. My implementation of ExecuteInCustomStrategyAsync and ExecuteInTransactionAsync below
9 Replies
no >> body
no >> bodyOP6mo ago
public static async Task ExecuteInCustomStrategyAsync(
this DbContext context,
Func<CancellationToken, Task> operation,
int maxRetries = 20,
int delayMilliseconds = 1,
CancellationToken ct = default)
{
IExecutionStrategy strategy = context.Database.CreateExecutionStrategy();
await strategy.ExecuteAsync(
(operation, maxRetries, delayMilliseconds, ct),
async x => await ExecuteWithRetriesAsync(x.operation, x.maxRetries, x.delayMilliseconds, x.ct));
}
public static async Task ExecuteInCustomStrategyAsync(
this DbContext context,
Func<CancellationToken, Task> operation,
int maxRetries = 20,
int delayMilliseconds = 1,
CancellationToken ct = default)
{
IExecutionStrategy strategy = context.Database.CreateExecutionStrategy();
await strategy.ExecuteAsync(
(operation, maxRetries, delayMilliseconds, ct),
async x => await ExecuteWithRetriesAsync(x.operation, x.maxRetries, x.delayMilliseconds, x.ct));
}
Where ExecuteWithRetriesAsync is:
private static async Task ExecuteWithRetriesAsync(
Func<CancellationToken, Task> operation,
int maxRetries,
int delayMilliseconds,
CancellationToken ct)
{
var retryCount = 0;
int delay = delayMilliseconds;
while (true)
{
try
{
await operation(ct);
return;
}
catch (NpgsqlException npgsqlException) when (npgsqlException.InnerException is EndOfStreamException)
{
if (retryCount++ > maxRetries)
{
throw;
}

delay = Math.Min((int)Math.Pow(delay, 2) * retryCount, 30000);
int jitter = Random.Shared.Next(-delay / 4, delay / 4);
int nextDelay = delay + jitter;
await Task.Delay(nextDelay, ct);
}
}
}
private static async Task ExecuteWithRetriesAsync(
Func<CancellationToken, Task> operation,
int maxRetries,
int delayMilliseconds,
CancellationToken ct)
{
var retryCount = 0;
int delay = delayMilliseconds;
while (true)
{
try
{
await operation(ct);
return;
}
catch (NpgsqlException npgsqlException) when (npgsqlException.InnerException is EndOfStreamException)
{
if (retryCount++ > maxRetries)
{
throw;
}

delay = Math.Min((int)Math.Pow(delay, 2) * retryCount, 30000);
int jitter = Random.Shared.Next(-delay / 4, delay / 4);
int nextDelay = delay + jitter;
await Task.Delay(nextDelay, ct);
}
}
}
public static async Task ExecuteInTransactionAsync(
this DbContext context,
Func<CancellationToken, Task> operation,
IsolationLevel isolationLevel = IsolationLevel.Serializable,
int maxRetries = 20,
CancellationToken ct = default)
{
var retryCount = 0;
while (true)
{
IDbContextTransaction? dbTransaction = null;
try
{
dbTransaction = await context.Database.BeginTransactionAsync(isolationLevel, ct);
await operation(ct);
await context.SaveChangesAsync(false, ct);
await dbTransaction.CommitAsync(ct);
context.ChangeTracker.AcceptAllChanges();
return;
}
catch (Exception ex)
{
Exception baseException = ex.GetBaseException();
if (baseException is PostgresException { SqlState: PostgresErrorCodes.SerializationFailure })
{
if (retryCount++ > maxRetries)
{
throw;
}

DetachUpdatedEntities(context);
}
else
{
if (dbTransaction is not null)
{
await dbTransaction.RollbackAsync(ct);
}

throw;
}
}
finally
{
if (dbTransaction is not null)
{
await dbTransaction.DisposeAsync();
}
}
}
}
public static async Task ExecuteInTransactionAsync(
this DbContext context,
Func<CancellationToken, Task> operation,
IsolationLevel isolationLevel = IsolationLevel.Serializable,
int maxRetries = 20,
CancellationToken ct = default)
{
var retryCount = 0;
while (true)
{
IDbContextTransaction? dbTransaction = null;
try
{
dbTransaction = await context.Database.BeginTransactionAsync(isolationLevel, ct);
await operation(ct);
await context.SaveChangesAsync(false, ct);
await dbTransaction.CommitAsync(ct);
context.ChangeTracker.AcceptAllChanges();
return;
}
catch (Exception ex)
{
Exception baseException = ex.GetBaseException();
if (baseException is PostgresException { SqlState: PostgresErrorCodes.SerializationFailure })
{
if (retryCount++ > maxRetries)
{
throw;
}

DetachUpdatedEntities(context);
}
else
{
if (dbTransaction is not null)
{
await dbTransaction.RollbackAsync(ct);
}

throw;
}
}
finally
{
if (dbTransaction is not null)
{
await dbTransaction.DisposeAsync();
}
}
}
}
where DetachUpdatedEntities is:
private static void DetachUpdatedEntities(DbContext context)
{
var changedEntries = context.ChangeTracker.Entries()
.Where(e => e.State is EntityState.Added or EntityState.Modified or EntityState.Deleted)
.ToList();

foreach (EntityEntry entry in changedEntries)
{
entry.State = EntityState.Detached;
}
}
private static void DetachUpdatedEntities(DbContext context)
{
var changedEntries = context.ChangeTracker.Entries()
.Where(e => e.State is EntityState.Added or EntityState.Modified or EntityState.Deleted)
.ToList();

foreach (EntityEntry entry in changedEntries)
{
entry.State = EntityState.Detached;
}
}
So, the retry policy is pretty aggressive. It has 20 retries with a max delay of 30s, which should be more than enough. I have a feeling that I need something like SELECT FOR UPDATE here to prevent other from reading, while I'm doing transaction Hmm, I changed FindAsync to FirstAsync and it looks like the problem is solved. As far as I know Find caches result, so It could use entity with not actual results. Another approach to solve this was to add await context.Entry(walletBalance).ReloadAsync(); inside transaction body
Yawnder
Yawnder6mo ago
You're digging your own grave by playing around with context.Entry(...), by using Find and by using touching the ChangeTracker with stuff like AcceptAllChanges. These are to be used only in specific cases.
no >> body
no >> bodyOP6mo ago
Unfortunately, I haven't found a better way to implement retries with transactions in EF Core with PostgreSQL. I believe I dug my own grave when I chose EF for that project. But it is what it is, the code is running already in production and I have to fix this issue with concurrent transactions. Also, FirstAsync and ReloadAsync didn't solve my problems. I still have issues with balances.
Yawnder
Yawnder6mo ago
In your case, couldn't just use the ExecuteUpdate ?
no >> body
no >> bodyOP6mo ago
Hmmm, something new. Haven't seen this method before. Let me check
Yawnder
Yawnder6mo ago
It's new in 7 or 8
no >> body
no >> bodyOP6mo ago
This not only fixed my problem but improved performance as well. Thanks!
Yawnder
Yawnder6mo ago
Great! In many cases, it also removes the need for transactions. If "whether or not to execute something" (think "do I let this purchase go through or not based on the balance") is data driven, you might still need them, but when it's stuff that "has to happen" (think "view count"), it can remove the need for transactions.
no >> body
no >> bodyOP6mo ago
Yeah, this is exactly what I thought about. If I don't need to read something before updating, then I don't need a transaction either. I also have a place in my code where I create/delete something alongside with updating balances. This still requires a transaction in case something fails, but I reckon I don't need it to be Serializable now
Want results from more Discord servers?
Add your server