N
Nuxt2w ago
Toffe

How to Use SSE for Real-Time Updates Across Multiple Stores in Nuxt 3 with Pinia

Hey everyone! I'm working on a Nuxt 3 project using Vue, Vuetify, and Pinia, and I’m implementing real-time updates with Server-Sent Events (SSE) as described in the Nitro WebSocket guide. I’ve successfully set up an SSE endpoint at /sse, and I can receive messages without any issues. However, I’m stuck on two points: How can I send SSE messages from other API routes (like /api/updateNote.ts) to notify changes globally? How can I subscribe to these SSE updates in a way that works across my entire app, rather than manually subscribing in each store or component? For example, I want my Pinia store for "notes" to automatically update when an SSE message arrives, regardless of the current page. Any advice or best practices would be greatly appreciated! Thanks in advance!
6 Replies
kapa.ai
kapa.ai2w ago
Hi @Toffe. I'm kapa.ai, your friendly support bot. I will try to answer your question by searching through the knowledge base. :wave: ---------------------- Using SSE for Real-Time Updates with Pinia in Nuxt 3 Your question about implementing Server-Sent Events (SSE) for real-time updates across multiple Pinia stores in Nuxt 3 is a common architecture challenge. Let me help you with this based on the available information. Setting Up SSE in Nuxt 3 Nuxt 3 and Nitro have built-in support for SSE. According to the documentation, you can create an event stream using the createEventStream utility:
// server/api/sse.ts
import { eventHandler, createEventStream } from 'h3'

export default eventHandler((event) => {
const eventStream = createEventStream(event)

// Store the event stream for later use
// (We'll address this in the global messaging section)

return eventStream.send()
})
// server/api/sse.ts
import { eventHandler, createEventStream } from 'h3'

export default eventHandler((event) => {
const eventStream = createEventStream(event)

// Store the event stream for later use
// (We'll address this in the global messaging section)

return eventStream.send()
})
Sending SSE Messages from Other API Routes To send messages from other API routes (like your /api/updateNote.ts), you need a way to access active SSE connections. This requires implementing a simple pub/sub system in your Nitro server. Here's how you can approach this:
// server/utils/sseConnections.ts
let activeConnections = []

export const addConnection = (stream) => {
activeConnections.push(stream)
return () => {
activeConnections = activeConnections.filter(s => s !== stream)
}
}

export const broadcastMessage = (channel, data) => {
activeConnections.forEach(stream => {
stream.push(JSON.stringify({ channel, data }))
})
}
// server/utils/sseConnections.ts
let activeConnections = []

export const addConnection = (stream) => {
activeConnections.push(stream)
return () => {
activeConnections = activeConnections.filter(s => s !== stream)
}
}

export const broadcastMessage = (channel, data) => {
activeConnections.forEach(stream => {
stream.push(JSON.stringify({ channel, data }))
})
}
Then in your SSE endpoint:
// server/api/sse.ts
import { eventHandler, createEventStream } from 'h3'
import { addConnection } from '../utils/sseConnections'
export default eventHandler((event) => {
const eventStream = createEventStream(event)

// Register this connection
const cleanup = addConnection(eventStream)

// Clean up when connection closes
eventStream.onClosed(cleanup)

return eventStream.send()
})
// server/api/sse.ts
import { eventHandler, createEventStream } from 'h3'
import { addConnection } from '../utils/sseConnections'
export default eventHandler((event) => {
const eventStream = createEventStream(event)

// Register this connection
const cleanup = addConnection(eventStream)

// Clean up when connection closes
eventStream.onClosed(cleanup)

return eventStream.send()
})
And in your update API:
// server/api/updateNote.ts
import { eventHandler } from 'h3'
import { broadcastMessage } from '../utils/sseConnections'

export default eventHandler(async (event) => {
// Process your note update logic

// Broadcast the change to all connected clients
broadcastMessage('notes', { type: 'updated', data: updatedNote })

return { success: true }
})
// server/api/updateNote.ts
import { eventHandler } from 'h3'
import { broadcastMessage } from '../utils/sseConnections'

export default eventHandler(async (event) => {
// Process your note update logic

// Broadcast the change to all connected clients
broadcastMessage('notes', { type: 'updated', data: updatedNote })

return { success: true }
})
Subscribing to SSE Updates Across Your App For the client side, you can create a composable that manages the SSE connection and dispatches updates to your Pinia stores. This approach centralizes the SSE handling logic:
// composables/useSSE.js
import { onMounted, onUnmounted } from 'vue'
import { useNoteStore } from '~/stores/noteStore'
import { useOtherStore } from '~/stores/otherStore'

export function useSSE() {
let eventSource = null

const handleMessage = (event) => {
const data = JSON.parse(event.data)

// Route messages to appropriate stores based on channel
if (data.channel === 'notes') {
const noteStore = useNoteStore()
noteStore.handleSSEUpdate(data.data)
} else if (data.channel === 'other') {
const otherStore = useOtherStore()
otherStore.handleSSEUpdate(data.data)
}
}

onMounted(() => {
eventSource = new EventSource('/api/sse')
eventSource.onmessage = handleMessage
})

onUnmounted(() => {
if (eventSource) {
eventSource.close()
}
})
}
// composables/useSSE.js
import { onMounted, onUnmounted } from 'vue'
import { useNoteStore } from '~/stores/noteStore'
import { useOtherStore } from '~/stores/otherStore'

export function useSSE() {
let eventSource = null

const handleMessage = (event) => {
const data = JSON.parse(event.data)

// Route messages to appropriate stores based on channel
if (data.channel === 'notes') {
const noteStore = useNoteStore()
noteStore.handleSSEUpdate(data.data)
} else if (data.channel === 'other') {
const otherStore = useOtherStore()
otherStore.handleSSEUpdate(data.data)
}
}

onMounted(() => {
eventSource = new EventSource('/api/sse')
eventSource.onmessage = handleMessage
})

onUnmounted(() => {
if (eventSource) {
eventSource.close()
}
})
}
Then in your Pinia stores, implement the handler method:
// stores/noteStore.js
import { defineStore } from 'pinia'
export const useNoteStore = defineStore('notes', {
state: () => ({
notes: []
}),
actions: {
handleSSEUpdate(update) {
if (update.type === 'updated') {
// Update the specific note
const index = this.notes.findIndex(n => n.id === update.data.id)
if (index !== -1) {
this.notes[index] = update.data
}
} else if (update.type === 'created') {
this.notes.push(update.data)
} else if (update.type === 'deleted') {
this.notes = this.notes.filter(n => n.id !== update.data.id)
}
}
}
})
// stores/noteStore.js
import { defineStore } from 'pinia'
export const useNoteStore = defineStore('notes', {
state: () => ({
notes: []
}),
actions: {
handleSSEUpdate(update) {
if (update.type === 'updated') {
// Update the specific note
const index = this.notes.findIndex(n => n.id === update.data.id)
if (index !== -1) {
this.notes[index] = update.data
}
} else if (update.type === 'created') {
this.notes.push(update.data)
} else if (update.type === 'deleted') {
this.notes = this.notes.filter(n => n.id !== update.data.id)
}
}
}
})
Using the SSE Connection in Your App Finally, you can use this composable in your app.vue or a layout file to ensure it's always active:
// app.vue or layouts/default.vue
<script setup>
import { useSSE } from '~/composables/useSSE'

// This will establish the SSE connection and handle routing messages to stores
useSSE()
</script>
// app.vue or layouts/default.vue
<script setup>
import { useSSE } from '~/composables/useSSE'

// This will establish the SSE connection and handle routing messages to stores
useSSE()
</script>
Alternative Approach Using POST Requests for SSE If you need to send data to the SSE endpoint (like for authentication), you can use the POST method as described in the Nuxt documentation:
// composables/useSSE.js
import { onMounted, onUnmounted, ref } from 'vue'
export function useSSE() {
const isConnected = ref(false)
let reader = null

const connect = async () => {
try {
const response = await $fetch('/api/sse', {
method: 'POST',
body: {
// Auth data or other parameters
userId: 'user123'
},
responseType: 'stream'
})

// Create a new ReadableStream from the response
reader = response.pipeThrough(new TextDecoderStream()).getReader()
isConnected.value = true

// Process the stream
processStream()
} catch (error) {
console.error('SSE connection error:', error)
}
}

const processStream = async () => {
try {
while (true) {
const { value, done } = await reader.read()

if (done) break

// Process each chunk of data
const messages = value.split('\n').filter(Boolean)
for (const message of messages) {
try {
const data = JSON.parse(message)
// Route to appropriate store
// Similar to the previous example
} catch (e) {
console.error('Error parsing SSE message:', e)
}
}
}
} catch (error) {
console.error('Error reading SSE stream:', error)
} finally {
isConnected.value = false
}
}

onMounted(() => {
connect()
})

onUnmounted(() => {
if (reader) {
reader.cancel()
}
})

return { isConnected }
}
// composables/useSSE.js
import { onMounted, onUnmounted, ref } from 'vue'
export function useSSE() {
const isConnected = ref(false)
let reader = null

const connect = async () => {
try {
const response = await $fetch('/api/sse', {
method: 'POST',
body: {
// Auth data or other parameters
userId: 'user123'
},
responseType: 'stream'
})

// Create a new ReadableStream from the response
reader = response.pipeThrough(new TextDecoderStream()).getReader()
isConnected.value = true

// Process the stream
processStream()
} catch (error) {
console.error('SSE connection error:', error)
}
}

const processStream = async () => {
try {
while (true) {
const { value, done } = await reader.read()

if (done) break

// Process each chunk of data
const messages = value.split('\n').filter(Boolean)
for (const message of messages) {
try {
const data = JSON.parse(message)
// Route to appropriate store
// Similar to the previous example
} catch (e) {
console.error('Error parsing SSE message:', e)
}
}
}
} catch (error) {
console.error('Error reading SSE stream:', error)
} finally {
isConnected.value = false
}
}

onMounted(() => {
connect()
})

onUnmounted(() => {
if (reader) {
reader.cancel()
}
})

return { isConnected }
}
This approach is documented in the Nuxt data fetching guide for consuming SSE via POST requests Nuxt Data Fetching Guide. By implementing this architecture, you'll have a centralized system for real-time updates that works across your entire Nuxt application, with each Pinia store handling its own update logic. ---------------------- Note: you can ask me a follow up question by @ mentioning me again :speech_balloon: ----------------------
kapa.ai
kapa.ai2w ago
<@260145305754927104> Kapa.ai is still learning and improving, please let me know how I did by reacting below.
Toffe
ToffeOP2w ago
@kapa.ai please rephrase in typescript only.
kapa.ai
kapa.ai2w ago
Let me check the knowledge base for you again. One moment! :books: ---------------------- I'll provide a TypeScript implementation of the SSE solution for real-time updates across multiple Pinia stores in Nuxt 3. Server-Side Implementation
// server/utils/sseConnections.ts
import { H3Event } from 'h3'
import { EventStream } from 'h3'

type MessageData = Record<string, any>

interface SSEMessage {
channel: string;
data: MessageData;
}

let activeConnections: EventStream[] = []

export const addConnection = (stream: EventStream): () => void => {
activeConnections.push(stream)
return () => {
activeConnections = activeConnections.filter(s => s !== stream)
}
}

export const broadcastMessage = (channel: string, data: MessageData): void => {
activeConnections.forEach(stream => {
stream.push(JSON.stringify({ channel, data }))
})
}
// server/utils/sseConnections.ts
import { H3Event } from 'h3'
import { EventStream } from 'h3'

type MessageData = Record<string, any>

interface SSEMessage {
channel: string;
data: MessageData;
}

let activeConnections: EventStream[] = []

export const addConnection = (stream: EventStream): () => void => {
activeConnections.push(stream)
return () => {
activeConnections = activeConnections.filter(s => s !== stream)
}
}

export const broadcastMessage = (channel: string, data: MessageData): void => {
activeConnections.forEach(stream => {
stream.push(JSON.stringify({ channel, data }))
})
}
// server/api/sse.ts
import { eventHandler, createEventStream } from 'h3'
import { addConnection } from '../utils/sseConnections'

export default eventHandler((event) => {
const eventStream = createEventStream(event)

// Register this connection
const cleanup = addConnection(eventStream)

// Clean up when connection closes
eventStream.onClosed(cleanup)

return eventStream.send()
})
// server/api/sse.ts
import { eventHandler, createEventStream } from 'h3'
import { addConnection } from '../utils/sseConnections'

export default eventHandler((event) => {
const eventStream = createEventStream(event)

// Register this connection
const cleanup = addConnection(eventStream)

// Clean up when connection closes
eventStream.onClosed(cleanup)

return eventStream.send()
})
// server/api/updateNote.ts
import { eventHandler, readBody } from 'h3'
import { broadcastMessage } from '../utils/sseConnections'

interface Note {
id: string;
title: string;
content: string;
// Add other properties as needed
}

export default eventHandler(async (event) => {
// Process your note update logic
const body = await readBody<Note>(event)

// Your database update logic here
const updatedNote: Note = { ...body }

// Broadcast the change to all connected clients
broadcastMessage('notes', { type: 'updated', data: updatedNote })

return { success: true }
})
// server/api/updateNote.ts
import { eventHandler, readBody } from 'h3'
import { broadcastMessage } from '../utils/sseConnections'

interface Note {
id: string;
title: string;
content: string;
// Add other properties as needed
}

export default eventHandler(async (event) => {
// Process your note update logic
const body = await readBody<Note>(event)

// Your database update logic here
const updatedNote: Note = { ...body }

// Broadcast the change to all connected clients
broadcastMessage('notes', { type: 'updated', data: updatedNote })

return { success: true }
})
Client-Side Implementation
// composables/useSSE.ts
import { onMounted, onUnmounted, ref } from 'vue'
import { useNoteStore } from '~/stores/noteStore'
import { useOtherStore } from '~/stores/otherStore'

interface SSEMessage {
channel: string;
data: {
type: 'created' | 'updated' | 'deleted';
data: any;
};
}

export function useSSE() {
let eventSource: EventSource | null = null
const isConnected = ref(false)

const handleMessage = (event: MessageEvent) => {
try {
const message: SSEMessage = JSON.parse(event.data)

// Route messages to appropriate stores based on channel
if (message.channel === 'notes') {
const noteStore = useNoteStore()
noteStore.handleSSEUpdate(message.data)
} else if (message.channel === 'other') {
const otherStore = useOtherStore()
otherStore.handleSSEUpdate(message.data)
}
} catch (error) {
console.error('Error parsing SSE message:', error)
}
}

onMounted(() => {
eventSource = new EventSource('/api/sse')
eventSource.onmessage = handleMessage
eventSource.onopen = () => { isConnected.value = true }
eventSource.onerror = () => { isConnected.value = false }
})

onUnmounted(() => {
if (eventSource) {
eventSource.close()
eventSource = null
}
})

return { isConnected }
}
// composables/useSSE.ts
import { onMounted, onUnmounted, ref } from 'vue'
import { useNoteStore } from '~/stores/noteStore'
import { useOtherStore } from '~/stores/otherStore'

interface SSEMessage {
channel: string;
data: {
type: 'created' | 'updated' | 'deleted';
data: any;
};
}

export function useSSE() {
let eventSource: EventSource | null = null
const isConnected = ref(false)

const handleMessage = (event: MessageEvent) => {
try {
const message: SSEMessage = JSON.parse(event.data)

// Route messages to appropriate stores based on channel
if (message.channel === 'notes') {
const noteStore = useNoteStore()
noteStore.handleSSEUpdate(message.data)
} else if (message.channel === 'other') {
const otherStore = useOtherStore()
otherStore.handleSSEUpdate(message.data)
}
} catch (error) {
console.error('Error parsing SSE message:', error)
}
}

onMounted(() => {
eventSource = new EventSource('/api/sse')
eventSource.onmessage = handleMessage
eventSource.onopen = () => { isConnected.value = true }
eventSource.onerror = () => { isConnected.value = false }
})

onUnmounted(() => {
if (eventSource) {
eventSource.close()
eventSource = null
}
})

return { isConnected }
}
// stores/noteStore.ts
import { defineStore } from 'pinia'

interface Note {
id: string;
title: string;
content: string;
// Add other properties as needed
}

interface SSEUpdate {
type: 'created' | 'updated' | 'deleted';
data: Note;
}
export const useNoteStore = defineStore('notes', {
state: () => ({
notes: [] as Note[]
}),
actions: {
handleSSEUpdate(update: SSEUpdate): void {
if (update.type === 'updated') {
// Update the specific note
const index = this.notes.findIndex(n => n.id === update.data.id)
if (index !== -1) {
this.notes[index] = update.data
}
} else if (update.type === 'created') {
this.notes.push(update.data)
} else if (update.type === 'deleted') {
this.notes = this.notes.filter(n => n.id !== update.data.id)
}
}
}
})
// stores/noteStore.ts
import { defineStore } from 'pinia'

interface Note {
id: string;
title: string;
content: string;
// Add other properties as needed
}

interface SSEUpdate {
type: 'created' | 'updated' | 'deleted';
data: Note;
}
export const useNoteStore = defineStore('notes', {
state: () => ({
notes: [] as Note[]
}),
actions: {
handleSSEUpdate(update: SSEUpdate): void {
if (update.type === 'updated') {
// Update the specific note
const index = this.notes.findIndex(n => n.id === update.data.id)
if (index !== -1) {
this.notes[index] = update.data
}
} else if (update.type === 'created') {
this.notes.push(update.data)
} else if (update.type === 'deleted') {
this.notes = this.notes.filter(n => n.id !== update.data.id)
}
}
}
})
// Alternative approach using POST for SSE
// composables/useSSEWithPost.ts
import { onMounted, onUnmounted, ref } from 'vue'
import { useNoteStore } from '~/stores/noteStore'
import { useOtherStore } from '~/stores/otherStore'

interface SSEMessage {
channel: string;
data: {
type: 'created' | 'updated' | 'deleted';
data: any;
};
}
export function useSSEWithPost() {
const isConnected = ref(false)
let reader: ReadableStreamDefaultReader<string> | null = null

const connect = async (): Promise<void> => {
try {
const response = await $fetch<ReadableStream>('/api/sse', {
method: 'POST',
body: {
// Auth data or other parameters
userId: 'user123'
},
responseType: 'stream'
})

// Create a new ReadableStream from the response
reader = response.pipeThrough(new TextDecoderStream()).getReader()
isConnected.value = true

// Process the stream
processStream()
} catch (error) {
console.error('SSE connection error:', error)
}
}

const processStream = async (): Promise<void> => {
if (!reader) return

try {
while (true) {
const { value, done } = await reader.read()

if (done) break

// Process each chunk of data
const messages = value.split('\n').filter(Boolean)
for (const message of messages) {
try {
const data: SSEMessage = JSON.parse(message)

// Route to appropriate store
if (data.channel === 'notes') {
const noteStore = useNoteStore()
noteStore.handleSSEUpdate(data.data)
} else if (data.channel === 'other') {
const otherStore = useOtherStore()
otherStore.handleSSEUpdate(data.data)
}
} catch (e) {
console.error('Error parsing SSE message:', e)
}
}
}
} catch (error) {
console.error('Error reading SSE stream:', error)
} finally {
isConnected.value = false
}
}

onMounted(() => {
connect()
})

onUnmounted(() => {
if (reader) {
reader.cancel()
reader = null
}
})
return { isConnected }
}
// Alternative approach using POST for SSE
// composables/useSSEWithPost.ts
import { onMounted, onUnmounted, ref } from 'vue'
import { useNoteStore } from '~/stores/noteStore'
import { useOtherStore } from '~/stores/otherStore'

interface SSEMessage {
channel: string;
data: {
type: 'created' | 'updated' | 'deleted';
data: any;
};
}
export function useSSEWithPost() {
const isConnected = ref(false)
let reader: ReadableStreamDefaultReader<string> | null = null

const connect = async (): Promise<void> => {
try {
const response = await $fetch<ReadableStream>('/api/sse', {
method: 'POST',
body: {
// Auth data or other parameters
userId: 'user123'
},
responseType: 'stream'
})

// Create a new ReadableStream from the response
reader = response.pipeThrough(new TextDecoderStream()).getReader()
isConnected.value = true

// Process the stream
processStream()
} catch (error) {
console.error('SSE connection error:', error)
}
}

const processStream = async (): Promise<void> => {
if (!reader) return

try {
while (true) {
const { value, done } = await reader.read()

if (done) break

// Process each chunk of data
const messages = value.split('\n').filter(Boolean)
for (const message of messages) {
try {
const data: SSEMessage = JSON.parse(message)

// Route to appropriate store
if (data.channel === 'notes') {
const noteStore = useNoteStore()
noteStore.handleSSEUpdate(data.data)
} else if (data.channel === 'other') {
const otherStore = useOtherStore()
otherStore.handleSSEUpdate(data.data)
}
} catch (e) {
console.error('Error parsing SSE message:', e)
}
}
}
} catch (error) {
console.error('Error reading SSE stream:', error)
} finally {
isConnected.value = false
}
}

onMounted(() => {
connect()
})

onUnmounted(() => {
if (reader) {
reader.cancel()
reader = null
}
})
return { isConnected }
}
// app.vue or layouts/default.vue
<script setup lang="ts">
import { useSSE } from '~/composables/useSSE'

// This will establish the SSE connection and handle routing messages to stores
const { isConnected } = useSSE()
</script>
// app.vue or layouts/default.vue
<script setup lang="ts">
import { useSSE } from '~/composables/useSSE'

// This will establish the SSE connection and handle routing messages to stores
const { isConnected } = useSSE()
</script>
This TypeScript implementation provides type safety throughout your SSE system, from server-side connections to client-side store updates. The approach using POST requests for SSE is based on the pattern described in the Nuxt Data Fetching Guide. ---------------------- Note: you can ask me a follow up question by @ mentioning me again :speech_balloon: ----------------------
kapa.ai
kapa.ai2w ago
<@260145305754927104> Kapa.ai is still learning and improving, please let me know how I did by reacting below.
Toffe
ToffeOP2w ago
For humans: Is this a good method to "lock" documents e.t.c. in real time so other people do not edit them? 🙂 I want to load all data into the browser, then once something is updated / edited they do pull

Did you find this page helpful?