skreeus
skreeus
DTDrizzle Team
Created by skreeus on 7/28/2024 in #help
Getting 'error: write CONNECTION_CLOSED' when trying to seed with stream-json from a 100MB file
I've run into a problem when trying to utilize a stream to load my data to my DB. All of my other seeds are working but this one encounters the connection error. I'm leveraging the stream-json package with drizzle-orm and postgres-js to stream single json objects in an effort to handle a larger than 100 MB JSON file. The seed function I wrote is as follows:
import { eq } from 'drizzle-orm'
import type db from '@/db'
import * as schema from '@/models'
const sets = './src/db/seeds/data/sets.json'

import { chain } from 'stream-chain'
import { parser } from 'stream-json'
import { streamArray } from 'stream-json/streamers/StreamArray'
import * as fs from 'fs'

export default async function seed(db: db) {
const pipeline = chain([fs.createReadStream(sets), parser(), streamArray()])

let objectCounter = 0

pipeline.on('data', async (data) => {
const newSet: Set = { ...data.value }

await db.transaction(async (trx) => {
const idReturned = await trx
.select({ id: setType.setTypeId })
.from(setType)
.where(eq(setType.name, newSet.set_type))

console.log('idReturned[0].id: ' + idReturned[0].id)

await trx
.insert(set)
.values({
...newSet
})
.returning()
objectCounter++
})
})

await pipeline.on('end', () => console.log(`Found ${objectCounter} sets.`))
await pipeline.on('error', (err) => console.log('Error: ', err))
}
import { eq } from 'drizzle-orm'
import type db from '@/db'
import * as schema from '@/models'
const sets = './src/db/seeds/data/sets.json'

import { chain } from 'stream-chain'
import { parser } from 'stream-json'
import { streamArray } from 'stream-json/streamers/StreamArray'
import * as fs from 'fs'

export default async function seed(db: db) {
const pipeline = chain([fs.createReadStream(sets), parser(), streamArray()])

let objectCounter = 0

pipeline.on('data', async (data) => {
const newSet: Set = { ...data.value }

await db.transaction(async (trx) => {
const idReturned = await trx
.select({ id: setType.setTypeId })
.from(setType)
.where(eq(setType.name, newSet.set_type))

console.log('idReturned[0].id: ' + idReturned[0].id)

await trx
.insert(set)
.values({
...newSet
})
.returning()
objectCounter++
})
})

await pipeline.on('end', () => console.log(`Found ${objectCounter} sets.`))
await pipeline.on('error', (err) => console.log('Error: ', err))
}
3 replies