H
Hono2mo ago
ex0ns

Closing resources and streaming response

I'm running Hono on node.js and I had a bug the other day, we added an endpoint to our API that was streaming the response back to the client, however we got some weird connection error to our database while the stream was running. In one of our middleware we instanciate a postgres.js connection to the database, and we use this connection to generate streaming data to the client, however to avoid leaking connection we explicitly close the connection at the end of the middleware (I know there are some ways to re-use connection using pooling, but this is not the point here (for info, we are dynamically connecting to many different database depending on where the request is coming from, so we can't easily implement a pooling mecanism on this server). Of course the middleware is running as soon as the response (i.e the beginning of the stream) is send back to the user, which trigger the cleaning of the resource, and a failure of the generator to produce more results in the stream. The following reproducer explains the problems quite well I think:
16 Replies
ex0ns
ex0nsOP2mo ago
import { Hono } from "hono";
import { createMiddleware } from "hono/factory";
import { streamText } from "hono/streaming";

const middleware = createMiddleware<{
Variables: {
resourceToClose: { use: () => AsyncGenerator<string, void, unknown> };
};
}>(async (c, next) => {
let isClosed = false;
const resourceToClose = {
use: async function* () {
for (let x of ["first", "second", "last"]) {
if (!isClosed) yield x;
else yield "I have been closed";
}
},
close: () => {
isClosed = true;
console.log("closing");
},
};
c.set("resourceToClose", resourceToClose);
await next();
resourceToClose.close();
});

const server = new Hono().get("/test", middleware, async (c) => {
c.var.resourceToClose.use();
return streamText(c, async (stream) => {
for await (const val of c.var.resourceToClose.use()) {
await stream.writeln(val);
await stream.sleep(200);
}
});
});

export default server;
import { Hono } from "hono";
import { createMiddleware } from "hono/factory";
import { streamText } from "hono/streaming";

const middleware = createMiddleware<{
Variables: {
resourceToClose: { use: () => AsyncGenerator<string, void, unknown> };
};
}>(async (c, next) => {
let isClosed = false;
const resourceToClose = {
use: async function* () {
for (let x of ["first", "second", "last"]) {
if (!isClosed) yield x;
else yield "I have been closed";
}
},
close: () => {
isClosed = true;
console.log("closing");
},
};
c.set("resourceToClose", resourceToClose);
await next();
resourceToClose.close();
});

const server = new Hono().get("/test", middleware, async (c) => {
c.var.resourceToClose.use();
return streamText(c, async (stream) => {
for await (const val of c.var.resourceToClose.use()) {
await stream.writeln(val);
await stream.sleep(200);
}
});
});

export default server;
An idea I had was to detect the headers on c.res.headers to check if [ "transfer-encoding", "chunked" ] was set or not and skip the resource cleanup in those case, but I really don't like this solution and this will in the long run still leak resources. (also the stream helper does not seems to set the transfer-encoding header, but the streamText does) Do you know how I could solve this issue ?
ambergristle
ambergristle2mo ago
this is sort of extreme, but you could make use a higher-order function, and close after the callback resolves but it feels better to try and represent the stream as a promise within the middleware or, why not close when you're done using it? in use
ex0ns
ex0nsOP2mo ago
This is the example for a single resource, but there might be more (in our case there is), I agree that I could somehow store this "cleanup" function in the context and make sure that all endpoint returning a stream call this cleanup function (either at the end of the streaming or in the onError callback), and continue closing it automatically for all other endpoints, but this is very error prone.
Arjix
Arjix2mo ago
Making a custom stream response that automatically calls the cleanup function is the best that I can think of your middleware could provide that function e.g.
const cleanup = createMiddleware(async (ctx, next) => {
// TODO: Middleware stuff
ctx.set("stream", myCustomStream);
});
const cleanup = createMiddleware(async (ctx, next) => {
// TODO: Middleware stuff
ctx.set("stream", myCustomStream);
});
Your route would then do
app.get("/",
cleanup,
async (ctx) => {
return ctx.var.stream(async stream => {
// do stuff
});
}
);
app.get("/",
cleanup,
async (ctx) => {
return ctx.var.stream(async stream => {
// do stuff
});
}
);
Your myCustomStream would wrap around the real stream helper, and free up the resources on end I wrote that code on my phone, expect mistakes You could possibly track if the injected stream is used but the route has returned, so all this can be in one middleware, for both normal requests and streamed requests This sounds like the cleanest and most maintainable approach to me I can make a demo tomorrow if you'd like me to
ex0ns
ex0nsOP2mo ago
Thanks for the reply, I just don't really see what you mean when you say free the resources on end ? How would you implement this part ?
ambergristle
ambergristle2mo ago
i'm not sure i totally understand. i think there's been some miscommunication have you tried using c.finalized as a flag? idk when that actually flips, but it's worth a shot this is what i was suggesting
use: async (
consume: (data: ReadableStream) => Promise<void>
) => {
await consume()
close()
}
use: async (
consume: (data: ReadableStream) => Promise<void>
) => {
await consume()
close()
}
return streamText(c, async (stream) => {
await c.var.use((readable) => {
for (const val of readable) { /** */ }
}
})
return streamText(c, async (stream) => {
await c.var.use((readable) => {
for (const val of readable) { /** */ }
}
})
ex0ns
ex0nsOP2mo ago
Thanks for clarifying, though I don't think that it would work in my case, as this is pretty specific to the simpler reproducer I created to illustrate the problem I had, in my case the real scenario is a pool of connection to postgres (using postgres.js) that are closed before I could stream all the data to the frontend, and I could indeed call postgres.end manually at the end of all the method that use a postgres connection, but I don't really like this. For now I think that I will go with the custom header to bypass the automatic cleanup + adding a cleanup function in c.var that some route would have to call whenever they are done to cleanup the resources (or maybe a custom ctx.set("stream", to replace the default .stream method from Hono and take care of closing the resources as well Thanks for all the ideas
Arjix
Arjix2mo ago
wrote this on pc
import { createMiddleware } from "hono/factory";
import { stream } from "hono/streaming";

export const clean = createMiddleware<{ Variables: { stream: typeof stream } }>(
async (ctx, next) => {
let isStream = false;
let streamPromise: Promise<void> | undefined;

ctx.set("stream", (...params: Parameters<typeof stream>) => {
const [ctx, callback, onError] = params;
const { promise, resolve, reject } = Promise.withResolvers<void>();

isStream = true;
streamPromise = promise;

return stream(ctx, async (streamApi) => {
// TODO: Custom logic here
// use resolve/reject
return await callback(streamApi);
}, onError);
});

await next();

if (isStream) {
try {
await streamPromise;
} catch { /* TODO: Do smth with the error */ }
}

// TODO: clean up resources
},
);
import { createMiddleware } from "hono/factory";
import { stream } from "hono/streaming";

export const clean = createMiddleware<{ Variables: { stream: typeof stream } }>(
async (ctx, next) => {
let isStream = false;
let streamPromise: Promise<void> | undefined;

ctx.set("stream", (...params: Parameters<typeof stream>) => {
const [ctx, callback, onError] = params;
const { promise, resolve, reject } = Promise.withResolvers<void>();

isStream = true;
streamPromise = promise;

return stream(ctx, async (streamApi) => {
// TODO: Custom logic here
// use resolve/reject
return await callback(streamApi);
}, onError);
});

await next();

if (isStream) {
try {
await streamPromise;
} catch { /* TODO: Do smth with the error */ }
}

// TODO: clean up resources
},
);
app.get("/", clean, (ctx) => {
return ctx.var.stream(ctx, async (streamApi) => {
// TODO: do stream stuff
});
});
app.get("/", clean, (ctx) => {
return ctx.var.stream(ctx, async (streamApi) => {
// TODO: do stream stuff
});
});
instead of passing streamApi to the callback, I'd make a wrapper that would call resolve on the close() and abort() methods, as well as add custom logic to the onError (calling reject()) honestly would be better if StreamingApi was an event emitter, so a wrapper would not be required I'd be careful with cleanup logic in a middleware, middlewares can be used multiple times on the same route maybe a check if stream is already defined in ctx?
ambergristle
ambergristle2mo ago
sorry, i'm really unclear on what you're trying to accomplish. i thought your middleware was meant to open a pg connection, that could be used by downstream middleware and handlers, then close it after the stream ends tbh, Hono's streaming helper is only ~30 lines. the simple path might be just to create your own version locally, modifying it as needed: https://github.com/honojs/hono/blob/eb86162a9a4472ef86329efe27007caf0afb9284/src/helper/streaming/stream.ts this approach also makes sense to me. i tried to do something similar but couldn't figure out how to manage the promises. @Arjix is there a reason you wrapped the promise construction in the stream callback instead of the middleware? ngl, i didn't know Promise.withResolvers even existed
Arjix
Arjix2mo ago
No real reason, I tend to keep variables close to where they are used
ambergristle
ambergristle2mo ago
afaik, stream would only ever be called once per request, right?
Arjix
Arjix2mo ago
Yes but what about the cleanup?
ambergristle
ambergristle2mo ago
Sorry, lol. I wasn’t really replying to that specific message. Just a habit If stream was called multiple times though, would that mess with the promise/cleanup?
Arjix
Arjix2mo ago
I'd imagine yes
ambergristle
ambergristle2mo ago
Would pulling the promise constructor into the middleware solve that problem, or just cause new ones? (Im wondering whether there’s a way to pass just the promise through context, instead of the whole stream helper)
Arjix
Arjix2mo ago
I mean, if you call middleware that is not idempotent, then you have bigger issues to worry about

Did you find this page helpful?