Sveltekit not receiving real-time data

Hello devs , i was trying to add real time events in my sveltekit app and with the release of prisma pulse and prisma postgreSQL so i added these features to my app but i had problem with prisma pulse , when i create a new DB i receive events when a change happens only in a short period of time and it stops working and i'm not having any error , in my pluse dashboard it says Replication slot status Unavailable idk if it means something , and this is my code , it will be nice of you if you take a look maybe i was doing things not in the right way
//api/+server.js
import { prisma } from "$lib/index.js";

export async function GET({ request }) {
const stream = new ReadableStream({
async start(controller) {
async function main() {
try {
const prismaStream = await prisma.links.stream();

for await (const event of prismaStream) {
console.log("Received event:", event);
controller.enqueue(`data: ${JSON.stringify(event)}\n\n`);
}
} catch (error) {
console.error("Error in Prisma stream:", error);
controller.error(error);
}
}

main();
},
cancel() {
console.log("Stream cancelled");
},
});

return new Response(stream, {
headers: {
"Content-Type": "text/event-stream",
"Cache-Control": "no-cache",
Connection: "keep-alive",
"Access-Control-Allow-Origin": "*",
},
});
}
//api/+server.js
import { prisma } from "$lib/index.js";

export async function GET({ request }) {
const stream = new ReadableStream({
async start(controller) {
async function main() {
try {
const prismaStream = await prisma.links.stream();

for await (const event of prismaStream) {
console.log("Received event:", event);
controller.enqueue(`data: ${JSON.stringify(event)}\n\n`);
}
} catch (error) {
console.error("Error in Prisma stream:", error);
controller.error(error);
}
}

main();
},
cancel() {
console.log("Stream cancelled");
},
});

return new Response(stream, {
headers: {
"Content-Type": "text/event-stream",
"Cache-Control": "no-cache",
Connection: "keep-alive",
"Access-Control-Allow-Origin": "*",
},
});
}
// lib/index.js
import { PrismaClient } from "@prisma/client";
import { withPulse } from "@prisma/extension-pulse";
import { PULSE_API_KEY } from "$env/static/private";

if (!PULSE_API_KEY) {
throw new Error("PULSE_API_KEY environment variable is not set");
}

const prisma = new PrismaClient().$extends(
withPulse({
apiKey: PULSE_API_KEY,
// Add retry options for better reliability
retry: {
maxRetries: 3,
initialDelay: 1000, // 1 second
},
})
);

export { prisma };
// lib/index.js
import { PrismaClient } from "@prisma/client";
import { withPulse } from "@prisma/extension-pulse";
import { PULSE_API_KEY } from "$env/static/private";

if (!PULSE_API_KEY) {
throw new Error("PULSE_API_KEY environment variable is not set");
}

const prisma = new PrismaClient().$extends(
withPulse({
apiKey: PULSE_API_KEY,
// Add retry options for better reliability
retry: {
maxRetries: 3,
initialDelay: 1000, // 1 second
},
})
);

export { prisma };
2 Replies
adam boukhris
adam boukhrisOP3d ago
//+page.svelte
<script>
import { onMount } from "svelte";
export let data;
let { links } = data;

onMount(() => {
const eventSource = new EventSource("/api");

eventSource.onmessage = (event) => {
console.log("SSE message received:", event.data);
const eventData = JSON.parse(event.data);

switch (eventData.action) {
case "create":
links = [...links, eventData.created];
break;
case "update":
links = links.map((link) =>
link.id === eventData.updated.id ? eventData.updated : link
);
break;
case "delete":
links = links.filter((link) => link.id !== eventData.deleted.id);
break;
}
};

eventSource.onerror = (error) => {
console.error("SSE error:", error);
};

return () => {
eventSource.close();
};
});
const delet = async (id) => {
const response = await fetch(`/api/delete/${id}`, {
method: "DELETE",
});

if (response.ok) {
links = links.filter((link) => link.id !== id);
}
};
</script>
//+page.svelte
<script>
import { onMount } from "svelte";
export let data;
let { links } = data;

onMount(() => {
const eventSource = new EventSource("/api");

eventSource.onmessage = (event) => {
console.log("SSE message received:", event.data);
const eventData = JSON.parse(event.data);

switch (eventData.action) {
case "create":
links = [...links, eventData.created];
break;
case "update":
links = links.map((link) =>
link.id === eventData.updated.id ? eventData.updated : link
);
break;
case "delete":
links = links.filter((link) => link.id !== eventData.deleted.id);
break;
}
};

eventSource.onerror = (error) => {
console.error("SSE error:", error);
};

return () => {
eventSource.close();
};
});
const delet = async (id) => {
const response = await fetch(`/api/delete/${id}`, {
method: "DELETE",
});

if (response.ok) {
links = links.filter((link) => link.id !== id);
}
};
</script>
RaphaelEtim
RaphaelEtimthis hour
Hi @adam boukhris Can you try to re-enable the replication slot? Without the replication slot available, Prisma Pulse cannot stream changes from your DB.

Did you find this page helpful?