P
Prisma5mo ago
jisoon

Websocket error when using Pulse

When I tried pulse it was working for some minutes but then it crashes with this error. I don't have any idea why this keep happening. Even increasing the event Listener, it still crashes Btw, I am using Supabase as my hosting provider. The error:
No description
18 Replies
jisoon
jisoon5mo ago
My code:
require("dotenv").config();
import express from "express";
import { createServer } from "http";
import { Server, Socket } from "socket.io";

import prisma from "../prisma/prisma";


const port = process.env.APP_PORT || 3000;

const app = express();
const server = createServer(app);

const io = new Server(server, {
cors: {
origin: "*",
methods: ["GET", "POST"],
},
});
io.on("connection", async (socket) => {});

export async function streamTodo(io: Server) {
await prisma.$disconnect();
console.log("streamTodo");

const stream = await prisma.todo.subscribe();

// Handle Prisma stream events
for await (const event of stream) {
console.log(`received event: `, event);

if (event.action === "create") {
io.sockets.emit("todo_added", event);
}
}
}

server.listen(port, async () => {
streamTodo(io);
});
require("dotenv").config();
import express from "express";
import { createServer } from "http";
import { Server, Socket } from "socket.io";

import prisma from "../prisma/prisma";


const port = process.env.APP_PORT || 3000;

const app = express();
const server = createServer(app);

const io = new Server(server, {
cors: {
origin: "*",
methods: ["GET", "POST"],
},
});
io.on("connection", async (socket) => {});

export async function streamTodo(io: Server) {
await prisma.$disconnect();
console.log("streamTodo");

const stream = await prisma.todo.subscribe();

// Handle Prisma stream events
for await (const event of stream) {
console.log(`received event: `, event);

if (event.action === "create") {
io.sockets.emit("todo_added", event);
}
}
}

server.listen(port, async () => {
streamTodo(io);
});
I just started using pulse last week, and that error kept showing. Whenever I started the server, the pulse worked for some minutes, then the MaxListenerExceedWarningError kept appearing and then crashed. @batmanwholaughz this is how i implemented in my sveltekit app
onMount(() => {
const url = 'http://localhost:3000';

const socket = io(url);
socket.on('todo_added', async (event: any) => {
console.log('received UPDATE event from server', event);
todos.push(event.created);
let resp = await axios.get(`/api/issue`);
todos = resp.data.issues;
});

return () => {
socket.off('issue_added');
};
});
onMount(() => {
const url = 'http://localhost:3000';

const socket = io(url);
socket.on('todo_added', async (event: any) => {
console.log('received UPDATE event from server', event);
todos.push(event.created);
let resp = await axios.get(`/api/issue`);
todos = resp.data.issues;
});

return () => {
socket.off('issue_added');
};
});
Nurul
Nurul5mo ago
@jisoon Are you still facing this issue?
jisoon
jisoon5mo ago
yes @Nurul (Prisma) I still facing this issue
Nurul
Nurul5mo ago
Is your repository open source by any chance? I would like to run this locally to debug. Also, are you using latest version of Prisma (5.15.0) and Pulse Extension (1.1.0)? Also, what happens if you change
prisma.todo.subscribe();
To
prisma.todo.stream();
Do you get the same error?
jisoon
jisoon5mo ago
yeah i still get the same error using stream(); i am using "prisma": "^5.14.0", and "@prisma/extension-pulse": "^1.1.0",
Nurul
Nurul5mo ago
This error doesn't happen immediately, right? It happens after a few minutes?
jisoon
jisoon5mo ago
yes
jisoon
jisoon5mo ago
this is the example server that i created https://github.com/COROTANjayson/prisma-pulse-test
GitHub
GitHub - COROTANjayson/prisma-pulse-test
Contribute to COROTANjayson/prisma-pulse-test development by creating an account on GitHub.
jisoon
jisoon5mo ago
i actually just copied the example implementation from prisma pulse docs
Nurul
Nurul5mo ago
Thanks! Let me try to run this locally with a supabase database and check if I can reproduce this. To confirm, you were receiving a few database events successfully and then you got this event listener error, correct?
jisoon
jisoon5mo ago
yes the pulse events are working
Nurul
Nurul5mo ago
Okay, thanks for confirming. I am trying to reproduce this
jisoon
jisoon5mo ago
Hello @Nurul. May I kindly ask if there are any updates? Thank you!
Nurul
Nurul5mo ago
Hey, Apologies for the delay. This slipped from my radar. I’ll get back to this latest by tomorrow 🙏
Nurul
Nurul5mo ago
I added 1000 events to test this, but I don't seem to get any errors. I created a streamTest function similar to your streamIssue and streamTodo functions.
export async function streamTest() {
await prisma.$disconnect();
console.log("streamTest");

const stream = await prisma.test.subscribe();

// Handle Prisma stream events
for await (const event of stream) {
console.log(`received event: `, event);

if (event.action === "create") {
console.log("test_added", event);
}
}
}
export async function streamTest() {
await prisma.$disconnect();
console.log("streamTest");

const stream = await prisma.test.subscribe();

// Handle Prisma stream events
for await (const event of stream) {
console.log(`received event: `, event);

if (event.action === "create") {
console.log("test_added", event);
}
}
}
And created 1000 tests records like this:
require("dotenv").config();
import express from "express";
import { createServer } from "http";
import { Server, Socket } from "socket.io";

import prisma from "../prisma/prisma";
import { streamIssue, streamTodo, streamTest } from "./pulse/meetingSession";
const port = process.env.APP_PORT || 3000;

const app = express();
const server = createServer(app);

// const io = new Server(server, {
// cors: {
// origin: "*",
// methods: ["GET", "POST"],
// },
// });
// io.on("connection", async (socket) => {});

server.listen(port, async () => {
// process.setMaxListeners(99);

// io.on("connection", (socket) => {
// console.log("ellooo");
// const key = socket.handshake.query.api_key;

// if (key && key === "apiKey") {
// streamIssue(io);
// streamTodo(io);
// } else {
// // next(new Error("Unauthorized"));
// }
// });
//streamIssue(io);
//streamTodo(io);
streamTest();
await createTest();
});

const createTest = async () => {
for (let i = 2; i < 1000; i++) {
const todo = await prisma.test.create({
data: {
name: "Test",
id: `${i}`,
},
});
console.log(todo);
}
};
require("dotenv").config();
import express from "express";
import { createServer } from "http";
import { Server, Socket } from "socket.io";

import prisma from "../prisma/prisma";
import { streamIssue, streamTodo, streamTest } from "./pulse/meetingSession";
const port = process.env.APP_PORT || 3000;

const app = express();
const server = createServer(app);

// const io = new Server(server, {
// cors: {
// origin: "*",
// methods: ["GET", "POST"],
// },
// });
// io.on("connection", async (socket) => {});

server.listen(port, async () => {
// process.setMaxListeners(99);

// io.on("connection", (socket) => {
// console.log("ellooo");
// const key = socket.handshake.query.api_key;

// if (key && key === "apiKey") {
// streamIssue(io);
// streamTodo(io);
// } else {
// // next(new Error("Unauthorized"));
// }
// });
//streamIssue(io);
//streamTodo(io);
streamTest();
await createTest();
});

const createTest = async () => {
for (let i = 2; i < 1000; i++) {
const todo = await prisma.test.create({
data: {
name: "Test",
id: `${i}`,
},
});
console.log(todo);
}
};
After how much time did you used to get this error? Can you comment the socket part and check if you still get the same error?
No description
jisoon
jisoon4mo ago
hello @Nurul (Prisma) sorry for the very late reply, I commented the socket part and there is no error. I think it only appear when the socket is on. Thank you btw
Nurul
Nurul4mo ago
No worries! We will be releasing a new version of Pulse Extension very soon, that will improve the performance and alleviate the error you were observing. I'll let you know once it's out.
Nurul
Nurul4mo ago
The new Prisma Pulse extension version 1.1.1 is out. Can you try this and check if you still get the error even after adding the socket part? https://www.npmjs.com/package/@prisma/extension-pulse
npm
@prisma/extension-pulse
Prisma Client extension for Pulse. Latest version: 1.1.1, last published: 2 days ago. Start using @prisma/extension-pulse in your project by running npm i @prisma/extension-pulse. There are no other projects in the npm registry using @prisma/extension-pulse.
Want results from more Discord servers?
Add your server