
AI-Assisted Data Import Pipelines: A Real-World Case Study

4 min read
Gabriel Paunescu, Founder & CTO, Neologic

Importing 10,000 general ledger accounts shouldn't require 10,000 lines of code. Here's how we built a complete data import pipeline with AI — and the six places where the developer had to step in.

The Premise

Data import is one of ERP's most common, tedious, and error-prone workflows. Parse a file, validate rows, map fields, handle duplicates, write to the database, and report errors — all with transaction safety and tenant isolation. It's the perfect candidate for AI generation because the structure is repetitive, but the business logic is unique per import type.

The Story

The team needed to import general ledger (GL) accounts from a client's CSV export. The import pipeline needed to:

  1. Parse a CSV file with 10,000+ rows
  2. Validate each row against a GL account schema
  3. Skip duplicate account numbers
  4. Create new accounts in the finance/general-ledger collection
  5. Report successes, skips, and failures in a structured response

A developer prompted the AI to build the entire pipeline. The AI generated 90% of the working code. But six critical interventions turned it from "mostly working" into "production-ready."

The Pipeline Architecture

CSV Upload → Parse → Validate → Deduplicate → Insert → Report

Each step maps to a section of the hook:

@LogicHook({
  name: 'ImportPipeGeneralLedgerAccounts',
  path: 'import-pipe/general-ledger-accounts',
  library: 'import-pipe',
  method: 'general-ledger-accounts',
  legacy: 'app/app.importPipeHooks.generalLedgerAccounts'
})

Where the Developer Stepped In

Intervention 1: CSV Parsing Edge Cases

The AI used a basic split-by-comma approach. Real CSV files have quoted fields, commas embedded inside those fields, and encoding quirks — a naive split silently corrupts any row that contains them.

// ❌ AI-generated: naive parsing
const rows = csvContent.split('\n').map(line => line.split(','))

// ✅ Developer fix: use the project's CSV utility
const rows = naoUtils.csv.parse(csvContent, {
  header: true,
  skipEmptyLines: true,
  dynamicTyping: false // keep everything as strings for validation
})
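To see why split-by-comma fails, here is a minimal, self-contained sketch of what a quote-aware line splitter has to handle (in the style of RFC 4180: quoted fields, embedded commas, and `""` as an escaped quote). This is illustrative only — the project's `naoUtils.csv.parse` utility is the right tool; `splitCsvLine` is a hypothetical helper:

```javascript
// Minimal RFC 4180-style line splitter. Handles quoted fields,
// commas inside quotes, and "" as an escaped quote character.
// A sketch for illustration, not a replacement for a tested CSV library.
function splitCsvLine(line) {
  const fields = []
  let field = ''
  let inQuotes = false
  for (let i = 0; i < line.length; i++) {
    const ch = line[i]
    if (inQuotes) {
      if (ch === '"') {
        if (line[i + 1] === '"') { field += '"'; i++ } // escaped quote
        else inQuotes = false                          // closing quote
      } else {
        field += ch // commas inside quotes are literal
      }
    } else if (ch === '"') {
      inQuotes = true
    } else if (ch === ',') {
      fields.push(field)
      field = ''
    } else {
      field += ch
    }
  }
  fields.push(field)
  return fields
}
```

A row like `1000,"Cash, petty"` splits into two fields with this approach, but three with a naive `split(',')` — exactly the kind of corruption the intervention prevents.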

Intervention 2: Batch Size for Database Operations

The AI generated a loop that inserted one document at a time — 10,000 individual insert operations.

// ❌ AI approach: one-by-one inserts
for (const account of validAccounts) {
  await fc.docs.flowQuery()
    .user(bob.flowUser)
    .flowOptions(eventOptions.glNaoQueryOptions)
    .insertOne(account, bob.dbSession())
}

// ✅ Developer fix: batch inserts
const BATCH_SIZE = 500
for (let i = 0; i < validAccounts.length; i += BATCH_SIZE) {
  const batch = validAccounts.slice(i, i + BATCH_SIZE)
  await fc.docs.flowQuery()
    .user(bob.flowUser)
    .flowOptions(eventOptions.glNaoQueryOptions)
    .insertMany(batch, bob.dbSession())
}

Intervention 3: Duplicate Detection Strategy

The AI checked for duplicates by querying each account number individually. The developer replaced it with a single bulk query:

// ✅ Fetch all existing account numbers in one query
const existing = await fc.docs.flowQuery()
  .user(bob.flowUser)
  .flowOptions(eventOptions.glNaoQueryOptions)
  .query({ 'data.accountNumber': { $in: incomingAccountNumbers } })
  .project({ 'data.accountNumber': 1 })
  .getMany(undefined, 'FinanceInterface.GeneralLedgerAccount')

const existingSet = new Set(existing.map(e => e.data.accountNumber))
const newAccounts = validAccounts.filter(a => !existingSet.has(a.data.accountNumber))
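The core of the fix is the data structure: one `$in` query builds a `Set`, and every per-row duplicate check becomes an O(1) membership test instead of a database round trip. A self-contained sketch of that partitioning step (`partitionByExisting` is a hypothetical helper; field names are simplified from the `data.accountNumber` shape above):

```javascript
// Split incoming accounts into "fresh" (to insert) and "skipped"
// (already in the database) using a single Set lookup per row.
function partitionByExisting(incoming, existingNumbers) {
  const existingSet = new Set(existingNumbers)
  const fresh = []
  const skipped = []
  for (const acct of incoming) {
    (existingSet.has(acct.accountNumber) ? skipped : fresh).push(acct)
  }
  return { fresh, skipped }
}
```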

Intervention 4: Validation Schema

The AI generated a validation schema that was too permissive — accepting any string for account type. The developer added enum constraints:

// ✅ Strict validation
const accountSchema = SuperJoi.object({
  accountNumber: SuperJoi.string().pattern(/^\d{4,6}$/).required(),
  accountName: SuperJoi.string().max(200).required(),
  accountType: SuperJoi.string().valid(
    'asset', 'liability', 'equity', 'revenue', 'expense'
  ).required(),
  parentAccount: SuperJoi.string().allow(null, ''),
  isActive: SuperJoi.boolean().default(true)
})
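For readers without the project's SuperJoi wrapper, the same constraints can be sketched in plain JavaScript — a 4-to-6-digit account number, a bounded name, and a closed enum for account type (`validateAccount` and its error strings are illustrative, not the project's API):

```javascript
// Plain-JS sketch of the strict schema: pattern check, length bound,
// and enum constraint. Returns a list of human-readable errors per row.
const ACCOUNT_TYPES = new Set(['asset', 'liability', 'equity', 'revenue', 'expense'])

function validateAccount(row) {
  const errors = []
  if (!/^\d{4,6}$/.test(row.accountNumber ?? '')) {
    errors.push('accountNumber must be 4-6 digits')
  }
  if (!row.accountName || row.accountName.length > 200) {
    errors.push('accountName is required, max 200 chars')
  }
  if (!ACCOUNT_TYPES.has(row.accountType)) {
    errors.push('accountType must be one of: ' + [...ACCOUNT_TYPES].join(', '))
  }
  return errors
}
```

The enum is the business rule the AI could not infer from the CSV alone: nothing in the data says "asset" is valid and "Asset account" is not.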

Intervention 5: Error Reporting Granularity

The AI returned a generic "X succeeded, Y failed" message. The developer added row-level error reporting:

// ✅ Row-level reporting
const report = {
  total: rows.length,
  created: newAccounts.length,
  skipped: existingSet.size,
  errors: validationErrors.map(e => ({
    row: e.rowNumber,
    accountNumber: e.data?.accountNumber || 'unknown',
    reason: e.error.message
  }))
}

Intervention 6: Memory Management

With 10,000+ rows, the AI was holding all raw data, parsed data, and results in memory simultaneously. The developer added streaming-style processing:

// ✅ Process in chunks, release references
let processedCount = 0
for (let i = 0; i < rows.length; i += BATCH_SIZE) {
  const chunk = rows.slice(i, i + BATCH_SIZE)
  const { valid, errors } = validateChunk(chunk, i)
  validationErrors.push(...errors)
  const newInChunk = valid.filter(a => !existingSet.has(a.data.accountNumber))
  if (newInChunk.length > 0) {
    await insertBatch(fc, bob, newInChunk, eventOptions)
    processedCount += newInChunk.length
  }
  // chunk goes out of scope, eligible for GC
}
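The chunking itself can also be expressed as a generator, which makes the "only one chunk alive at a time" property explicit — each yielded slice becomes garbage-collectable once the consuming loop moves on. A hypothetical sketch (the `chunks` helper is not part of the codebase):

```javascript
// Lazily yield fixed-size chunks of a row array, together with the
// starting offset (useful for row-level error reporting).
function* chunks(rows, size) {
  for (let i = 0; i < rows.length; i += size) {
    yield { offset: i, rows: rows.slice(i, i + size) }
  }
}

// Consumer: only one chunk is referenced per iteration.
// for (const { offset, rows: chunk } of chunks(allRows, 500)) { ... }
```

Carrying the `offset` through the pipeline is what lets the error report in Intervention 5 point at the original row number in the uploaded file.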

The Final Result

| Metric | AI-Generated | After Intervention |
| --- | --- | --- |
| Parse strategy | Split by comma | naoUtils.csv.parse() |
| Insert strategy | One-by-one (10,000 ops) | Batch of 500 (20 ops) |
| Duplicate check | Individual queries | Single bulk query |
| Validation | Permissive schema | Strict enums + patterns |
| Error reporting | Summary only | Row-level details |
| Memory usage | All-in-memory | Chunked processing |

The Takeaway

AI generates excellent pipeline structure. The developer adds:

  1. Domain-specific parsing — real-world files aren't clean
  2. Performance optimization — batch operations, bulk queries
  3. Strict validation — business rules the AI doesn't know
  4. Detailed reporting — users need row-level feedback
  5. Resource management — memory and connection awareness

The AI builds the highway. You paint the lane markings.