Getting 'error: write CONNECTION_CLOSED' when trying to seed with stream-json from a 100MB file
I've run into a problem when trying to utilize a stream to load my data to my DB.
All of my other seeds are working but this one encounters the connection error.
I'm using the stream-json package with drizzle-orm and postgres-js to stream the JSON objects one at a time, so that I can handle a JSON file larger than 100 MB. The seed function I wrote is as follows:
import { eq } from 'drizzle-orm'
import type db from '@/db'
import * as schema from '@/models'
const sets = './src/db/seeds/data/sets.json'
import { chain } from 'stream-chain'
import { parser } from 'stream-json'
import { streamArray } from 'stream-json/streamers/StreamArray'
import * as fs from 'fs'
/**
 * Seeds the `set` table from the JSON array stored at `sets`.
 *
 * The file is streamed through the stream-json parser and streamArray(), and a
 * 'data' listener runs one transaction per array element: it looks up the set
 * type by name, logs the id that was found, and inserts the element.
 *
 * NOTE(review): this is the version that produces the CONNECTION_CLOSED error
 * described in the post; the comments below mark the likely causes.
 * NOTE(review): `setType`, `set` and the `Set` type are not imported under these
 * names in the visible snippet (only `* as schema`) — presumably they are meant
 * to be `schema.setType` / `schema.set`; confirm.
 *
 * @param db - Database handle used to open the per-element transactions.
 */
export default async function seed(db: db) {
const pipeline = chain([fs.createReadStream(sets), parser(), streamArray()])
let objectCounter = 0
// The emitter does not wait for the promise returned by an async listener, so
// the stream keeps emitting elements while earlier transactions are still
// running: one transaction is started per element with no backpressure.
pipeline.on('data', async (data) => {
const newSet: Set = { ...data.value }
await db.transaction(async (trx) => {
const idReturned = await trx
.select({ id: setType.setTypeId })
.from(setType)
.where(eq(setType.name, newSet.set_type))
// Throws a TypeError when no set type matched (idReturned[0] is undefined).
// The looked-up id is only logged; it is not passed to the insert below.
console.log('idReturned[0].id: ' + idReturned[0].id)
await trx
.insert(set)
.values({
...newSet
})
.returning()
objectCounter++
})
})
// `on()` only registers a listener and returns the emitter, so these two
// `await`s do not wait for the stream to end: seed() resolves as soon as the
// listeners are attached, while the inserts are still in flight.
// NOTE(review): if the caller closes the connection after seed() resolves, that
// would explain the CONNECTION_CLOSED error — confirm against the caller.
await pipeline.on('end', () => console.log(`Found ${objectCounter} sets.`))
await pipeline.on('error', (err) => console.log('Error: ', err))
}
import { eq } from 'drizzle-orm'
import type db from '@/db'
import * as schema from '@/models'
const sets = './src/db/seeds/data/sets.json'
import { chain } from 'stream-chain'
import { parser } from 'stream-json'
import { streamArray } from 'stream-json/streamers/StreamArray'
import * as fs from 'fs'
/**
 * Seeds the `set` table from the JSON array stored at `sets`.
 *
 * The file is streamed through the stream-json parser and streamArray(), and a
 * 'data' listener runs one transaction per array element: it looks up the set
 * type by name, logs the id that was found, and inserts the element.
 *
 * NOTE(review): this is the version that produces the CONNECTION_CLOSED error
 * described in the post; the comments below mark the likely causes.
 * NOTE(review): `setType`, `set` and the `Set` type are not imported under these
 * names in the visible snippet (only `* as schema`) — presumably they are meant
 * to be `schema.setType` / `schema.set`; confirm.
 *
 * @param db - Database handle used to open the per-element transactions.
 */
export default async function seed(db: db) {
const pipeline = chain([fs.createReadStream(sets), parser(), streamArray()])
let objectCounter = 0
// The emitter does not wait for the promise returned by an async listener, so
// the stream keeps emitting elements while earlier transactions are still
// running: one transaction is started per element with no backpressure.
pipeline.on('data', async (data) => {
const newSet: Set = { ...data.value }
await db.transaction(async (trx) => {
const idReturned = await trx
.select({ id: setType.setTypeId })
.from(setType)
.where(eq(setType.name, newSet.set_type))
// Throws a TypeError when no set type matched (idReturned[0] is undefined).
// The looked-up id is only logged; it is not passed to the insert below.
console.log('idReturned[0].id: ' + idReturned[0].id)
await trx
.insert(set)
.values({
...newSet
})
.returning()
objectCounter++
})
})
// `on()` only registers a listener and returns the emitter, so these two
// `await`s do not wait for the stream to end: seed() resolves as soon as the
// listeners are attached, while the inserts are still in flight.
// NOTE(review): if the caller closes the connection after seed() resolves, that
// would explain the CONNECTION_CLOSED error — confirm against the caller.
await pipeline.on('end', () => console.log(`Found ${objectCounter} sets.`))
await pipeline.on('error', (err) => console.log('Error: ', err))
}
1 Reply
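I resolved my issue with the following code. I'm certain it could be cleaned up, but right now I'm just happy it works. The key change is that the stream is now consumed with `for await` instead of an async `'data'` handler: each set is fully processed before the next one is read, and `seed` does not return until the whole file has been consumed.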
import { eq, exists } from 'drizzle-orm'
import type db from '@/db'
import { set, type NewSet } from './src/api/v1/sets/set.model'
import { setType } from './src/api/v1/set_types/set_type.model'
const sets = './src/db/seeds/data/sets.json'
import { parser } from 'stream-json'
import { streamArray } from 'stream-json/streamers/StreamArray'
import { createReadStream } from 'fs'
/**
 * Seeds the `set` table from the JSON array stored at `sets`.
 *
 * The array is read element by element with `for await`, so the next element is
 * not pulled from the stream until the current one has been handled. For each
 * element the function looks up the set type id by name, builds a `NewSet`,
 * checks that the parent set (if any) already exists, and inserts the row.
 * Elements whose parent is not in the table yet are collected and handed to
 * clearForeignKeyConstraintErrors() once the stream is exhausted.
 *
 * NOTE(review): `.pipe()` does not forward 'error' events between stages, so a
 * failure in the file read or the parser would not surface through this loop —
 * consider `stream.pipeline`; confirm desired behavior.
 *
 * @param db - Database handle used for the lookups and inserts.
 */
export default async function seed(db: db) {
const pipeline = createReadStream(sets).pipe(parser()).pipe(streamArray())
// Counts successful inserts only; skipped or failed elements are not counted.
let objectCounter = 0
// Sets deferred because their parent set was not found at the time they were read.
let erroredObjects: Array<NewSet> = []
let newSet: NewSet
for await (const { value } of pipeline) {
try {
// Look up the set_type_id
const setTypeResult = await db
.select({ id: setType.setTypeId })
.from(setType)
.where(eq(setType.name, value.set_type))
.limit(1)
// Unknown set type: warn and skip this element (it is not retried later).
if (setTypeResult.length === 0) {
console.warn(`Set type not found: ${value.set_type}`)
continue
}
const setTypeId = setTypeResult[0].id
// The field mapping was elided in the post (`<params>`); presumably it maps
// the fields of `value` plus `setTypeId` onto the NewSet columns — confirm.
newSet = {
<params>
}
// Only sets that name a parent need the existence check.
if (newSet.parentSetCode !== null && newSet.parentSetCode !== undefined && newSet.parentSetCode !== '') {
const setExists = await db.select().from(set).where(eq(set.setCode, newSet.parentSetCode))
if (setExists.length === 0) {
// Parent not inserted yet: defer this set instead of inserting it now.
console.error(`Parent set not found: ${newSet.parentSetCode}`)
erroredObjects.unshift(newSet)
continue
}
}
// Insert into the set table
await db.insert(set).values({
...newSet,
})
objectCounter++
// Progress log every 1000 inserted sets.
if (objectCounter % 1000 === 0) {
console.log(`Processed ${objectCounter} sets`)
}
} catch (error) {
// Best effort: log the failing element and carry on with the next one.
console.error(`setCode: ${value.code}\nError processing set:\n${error}`)
}
}
// Retry the deferred sets now that every element of the file has been seen.
await clearForeignKeyConstraintErrors(erroredObjects, db)
}
import { eq, exists } from 'drizzle-orm'
import type db from '@/db'
import { set, type NewSet } from './src/api/v1/sets/set.model'
import { setType } from './src/api/v1/set_types/set_type.model'
const sets = './src/db/seeds/data/sets.json'
import { parser } from 'stream-json'
import { streamArray } from 'stream-json/streamers/StreamArray'
import { createReadStream } from 'fs'
/**
 * Seeds the `set` table from the JSON array stored at `sets`.
 *
 * The array is read element by element with `for await`, so the next element is
 * not pulled from the stream until the current one has been handled. For each
 * element the function looks up the set type id by name, builds a `NewSet`,
 * checks that the parent set (if any) already exists, and inserts the row.
 * Elements whose parent is not in the table yet are collected and handed to
 * clearForeignKeyConstraintErrors() once the stream is exhausted.
 *
 * NOTE(review): `.pipe()` does not forward 'error' events between stages, so a
 * failure in the file read or the parser would not surface through this loop —
 * consider `stream.pipeline`; confirm desired behavior.
 *
 * @param db - Database handle used for the lookups and inserts.
 */
export default async function seed(db: db) {
const pipeline = createReadStream(sets).pipe(parser()).pipe(streamArray())
// Counts successful inserts only; skipped or failed elements are not counted.
let objectCounter = 0
// Sets deferred because their parent set was not found at the time they were read.
let erroredObjects: Array<NewSet> = []
let newSet: NewSet
for await (const { value } of pipeline) {
try {
// Look up the set_type_id
const setTypeResult = await db
.select({ id: setType.setTypeId })
.from(setType)
.where(eq(setType.name, value.set_type))
.limit(1)
// Unknown set type: warn and skip this element (it is not retried later).
if (setTypeResult.length === 0) {
console.warn(`Set type not found: ${value.set_type}`)
continue
}
const setTypeId = setTypeResult[0].id
// The field mapping was elided in the post (`<params>`); presumably it maps
// the fields of `value` plus `setTypeId` onto the NewSet columns — confirm.
newSet = {
<params>
}
// Only sets that name a parent need the existence check.
if (newSet.parentSetCode !== null && newSet.parentSetCode !== undefined && newSet.parentSetCode !== '') {
const setExists = await db.select().from(set).where(eq(set.setCode, newSet.parentSetCode))
if (setExists.length === 0) {
// Parent not inserted yet: defer this set instead of inserting it now.
console.error(`Parent set not found: ${newSet.parentSetCode}`)
erroredObjects.unshift(newSet)
continue
}
}
// Insert into the set table
await db.insert(set).values({
...newSet,
})
objectCounter++
// Progress log every 1000 inserted sets.
if (objectCounter % 1000 === 0) {
console.log(`Processed ${objectCounter} sets`)
}
} catch (error) {
// Best effort: log the failing element and carry on with the next one.
console.error(`setCode: ${value.code}\nError processing set:\n${error}`)
}
}
// Retry the deferred sets now that every element of the file has been seen.
await clearForeignKeyConstraintErrors(erroredObjects, db)
}
/**
 * Retries inserting sets that were deferred because their parent set was not
 * in the `set` table yet.
 *
 * Each pass walks the pending list. A set whose `parentSetCode` is empty, or
 * whose parent is now present in the table, is inserted. A set whose parent is
 * still missing is deferred to the next pass without attempting the insert.
 * Passes repeat until nothing is pending, or until a whole pass defers every
 * entry it looked at; in that case the remaining sets are logged and the
 * function returns instead of retrying forever.
 *
 * @param erroredSets - Sets to retry. The array itself is not modified.
 * @param db - Database handle used for the parent lookup and the insert.
 * @returns A promise that resolves once every set has been inserted, has
 *   failed, or has been reported as unresolvable. Lookup and insert errors are
 *   logged per set and not rethrown.
 */
async function clearForeignKeyConstraintErrors(erroredSets: Array<NewSet>, db: db) {
  let pending: Array<NewSet> = erroredSets

  while (pending.length > 0) {
    const deferred: Array<NewSet> = []

    for (const eSet of pending) {
      try {
        const parentCode = eSet.parentSetCode
        if (parentCode !== null && parentCode !== undefined && parentCode !== '') {
          const parentRows = await db.select().from(set).where(eq(set.setCode, parentCode))
          if (parentRows.length === 0) {
            // Parent still missing: inserting now would only fail again (or
            // insert the row twice once it is retried), so wait for a later pass.
            deferred.push(eSet)
            continue
          }
        }
        await db.insert(set).values({
          ...eSet,
        })
      } catch (error) {
        console.error(`Error processing set:\n${eSet.name}\n${eSet.setCode}\n${eSet.parentSetCode}\n${error}\n`)
      }
    }

    if (deferred.length === pending.length) {
      // Nothing was inserted in this pass, so no parent can appear in the next
      // one either. Report the leftovers and stop rather than loop forever.
      for (const orphan of deferred) {
        console.error(`Parent set not found: ${orphan.parentSetCode} (set ${orphan.setCode} was not inserted)`)
      }
      return
    }

    pending = deferred
  }
}
/**
 * Retries inserting sets that were deferred because their parent set was not
 * in the `set` table yet.
 *
 * Each pass walks the pending list. A set whose `parentSetCode` is empty, or
 * whose parent is now present in the table, is inserted. A set whose parent is
 * still missing is deferred to the next pass without attempting the insert.
 * Passes repeat until nothing is pending, or until a whole pass defers every
 * entry it looked at; in that case the remaining sets are logged and the
 * function returns instead of retrying forever.
 *
 * @param erroredSets - Sets to retry. The array itself is not modified.
 * @param db - Database handle used for the parent lookup and the insert.
 * @returns A promise that resolves once every set has been inserted, has
 *   failed, or has been reported as unresolvable. Lookup and insert errors are
 *   logged per set and not rethrown.
 */
async function clearForeignKeyConstraintErrors(erroredSets: Array<NewSet>, db: db) {
  let pending: Array<NewSet> = erroredSets

  while (pending.length > 0) {
    const deferred: Array<NewSet> = []

    for (const eSet of pending) {
      try {
        const parentCode = eSet.parentSetCode
        if (parentCode !== null && parentCode !== undefined && parentCode !== '') {
          const parentRows = await db.select().from(set).where(eq(set.setCode, parentCode))
          if (parentRows.length === 0) {
            // Parent still missing: inserting now would only fail again (or
            // insert the row twice once it is retried), so wait for a later pass.
            deferred.push(eSet)
            continue
          }
        }
        await db.insert(set).values({
          ...eSet,
        })
      } catch (error) {
        console.error(`Error processing set:\n${eSet.name}\n${eSet.setCode}\n${eSet.parentSetCode}\n${error}\n`)
      }
    }

    if (deferred.length === pending.length) {
      // Nothing was inserted in this pass, so no parent can appear in the next
      // one either. Report the leftovers and stop rather than loop forever.
      for (const orphan of deferred) {
        console.error(`Parent set not found: ${orphan.parentSetCode} (set ${orphan.setCode} was not inserted)`)
      }
      return
    }

    pending = deferred
  }
}