How to retry serializable transactions in Drizzle ORM

Hello everyone, I'm currently working with Drizzle ORM in a Node.js application and facing challenges handling high concurrency when multiple instances of my program are interacting with the same PostgreSQL database. Specifically, I have multiple processes running that update a scraped status in a URL table. When one process is updating a row, I want the other processes to skip this row and proceed to the next available one instead of trying to update the same row. However, I'm encountering frequent serialization errors like "could not serialize access due to concurrent update," which cause my programs to crash completely. Here's a simplified version of how my transaction logic is set up:
import { eq } from "drizzle-orm";

await db.transaction(
  async (tx) => {
    // Pick any URL that has not been scraped yet
    const urlResult = await tx.query.url.findFirst({
      where: eq(url.scraped, false),
    });

    if (!urlResult) {
      console.log("No unscraped URL found, retrying...");
      await new Promise((resolve) => setTimeout(resolve, 5000));
      return;
    }

    try {
      // Marking as currently being scraped
      await tx.update(url)
        .set({ scraped: null })
        .where(eq(url.gmPartNo, urlResult.gmPartNo));

      // Process scraping here
      await processScraping(urlResult);

      // Mark as scraped. This actually happens inside processScraping,
      // but it is shown here for clarity
      await tx.update(url)
        .set({ scraped: true })
        .where(eq(url.gmPartNo, urlResult.gmPartNo));
    } catch (error) {
      console.error("Error processing URL:", error);
      // Attempt to revert or handle error
      await tx.update(url)
        .set({ scraped: false })
        .where(eq(url.gmPartNo, urlResult.gmPartNo));
    }
  },
  {
    isolationLevel: "serializable",
    accessMode: "read write",
    deferrable: true,
  }
);
Is there a best way to handle retrying when one process is encountering "could not serialize access due to concurrent update"? Thanks!
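On the retry question itself: PostgreSQL reports this error with SQLSTATE 40001 (serialization_failure), and its documentation says applications using serializable isolation should be prepared to retry such transactions from the start. Below is a minimal sketch of a retry wrapper. It assumes the driver error (or an error in its `cause` chain) exposes the SQLSTATE as a string `code` property, as node-postgres errors do. The names `withSerializationRetry`, `maxAttempts`, and `baseDelayMs` are hypothetical and not part of Drizzle.

```typescript
// SQLSTATE codes that PostgreSQL documents as transient and safe to retry:
// 40001 = serialization_failure, 40P01 = deadlock_detected.
const RETRYABLE_SQLSTATES = new Set(["40001", "40P01"]);

// Walk the error and its `cause` chain looking for a retryable SQLSTATE.
// Assumption: the driver puts the SQLSTATE on a string `code` property.
function isRetryable(err: unknown): boolean {
  let current: unknown = err;
  for (let depth = 0; depth < 5; depth++) {
    if (typeof current !== "object" || current === null) return false;
    const code = (current as { code?: unknown }).code;
    if (typeof code === "string" && RETRYABLE_SQLSTATES.has(code)) return true;
    current = (current as { cause?: unknown }).cause;
  }
  return false;
}

interface RetryOptions {
  maxAttempts?: number; // total attempts, including the first one
  baseDelayMs?: number; // delay before the first retry; doubles each time
}

// Run `run` and re-run it from scratch whenever it fails with a retryable
// SQLSTATE. Any other error, or the last failed attempt, is rethrown.
async function withSerializationRetry<T>(
  run: () => Promise<T>,
  { maxAttempts = 5, baseDelayMs = 50 }: RetryOptions = {},
): Promise<T> {
  for (let attempt = 1; ; attempt++) {
    try {
      return await run();
    } catch (err) {
      if (attempt >= maxAttempts || !isRetryable(err)) throw err;
      // Exponential backoff with jitter so competing workers spread out.
      const delay = baseDelayMs * Math.pow(2, attempt - 1) * (0.5 + Math.random());
      await new Promise<void>((resolve) => setTimeout(resolve, delay));
    }
  }
}
```

Usage would look something like withSerializationRetry(() => db.transaction(async (tx) => { ... }, { isolationLevel: "serializable" })). The whole db.transaction call goes inside the callback so that each attempt starts a fresh transaction. Once PostgreSQL raises the error, the current transaction is aborted, so a revert update issued from a catch block inside that same transaction cannot succeed.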
1 Reply
CrazyCroatKid (OP), 7mo ago
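Got it working! It looks like the query builder has a .for() row-locking clause that is not in the documentation. With skipLocked set, rows already locked by another transaction are skipped instead of waited on. Just set something up like this inside the transaction: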
const lockedRowQuery = tx
  .select()
  .from(url)
  .where(eq(url.scraped, false))
  .orderBy(url.gmPartNo)
  .limit(1)
  .for("update", { skipLocked: true });
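For anyone wondering how the locked query fits into the rest of the transaction, here is a rough sketch of the flow at the SQL level. It does not use Drizzle. It runs against a minimal, hypothetical `SqlClient` interface shaped loosely like a node-postgres client, and the table and column names (`url`, `gm_part_no`, `scraped`) are assumptions about the schema. The SELECT is roughly what the builder above should compile to.

```typescript
// Hypothetical minimal client shape; a node-postgres client is similar in spirit.
interface SqlClient {
  query(
    text: string,
    params?: unknown[],
  ): Promise<{ rows: Array<Record<string, unknown>> }>;
}

// Assumed table and column names. SKIP LOCKED makes the SELECT pass over
// rows that another open transaction has already locked.
const CLAIM_SQL = `
  SELECT gm_part_no FROM url
  WHERE scraped = false
  ORDER BY gm_part_no
  LIMIT 1
  FOR UPDATE SKIP LOCKED`;

// Claim one unscraped row, scrape it, and mark it done in one transaction.
// Returns the claimed key, or null when no unlocked, unscraped row exists.
async function claimAndScrape(
  client: SqlClient,
  scrape: (gmPartNo: string) => Promise<void>,
): Promise<string | null> {
  await client.query("BEGIN");
  try {
    const { rows } = await client.query(CLAIM_SQL);
    const row = rows[0];
    if (!row) {
      await client.query("COMMIT");
      return null;
    }
    const gmPartNo = String(row.gm_part_no);
    // The row lock is held until COMMIT or ROLLBACK, so other workers
    // running the same SELECT skip this row in the meantime.
    await scrape(gmPartNo);
    await client.query("UPDATE url SET scraped = true WHERE gm_part_no = $1", [gmPartNo]);
    await client.query("COMMIT");
    return gmPartNo;
  } catch (err) {
    // Rolling back releases the lock and leaves scraped = false,
    // so no separate "revert" update is needed.
    await client.query("ROLLBACK");
    throw err;
  }
}
```

With this pattern the row lock itself acts as the "in progress" marker, so the intermediate scraped: null update and the serializable isolation level may no longer be necessary. The trade-off is that the transaction stays open for as long as the scrape takes.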