
Anatomy of a CSV Import Pipeline — Where 90% of the Work Happens Before the First Write

· 19 min read
Gabriel Paunescu
Founder & CTO, Neologic

Your CSV is not your data. It's a promise that needs verification, transformation, and three layers of validation before it earns the right to touch your database. Here's how a production import hook turns a tab-delimited file into trusted journal entries.

The Premise

Every data import starts with the same fantasy: upload a file, click import, done. The reality is a defensive pipeline where the database write — the part most people think of as "the import" — is the very last step, and often the simplest one. The hard work is everything that comes before it.

This article dissects a production hook that imports general ledger journal entries from an external accounting system's CSV export. The hook is 300+ lines. Fewer than 20 of those lines actually write to the database.

The Source Data Problem

Before writing a single line of import code, you need to confront an uncomfortable truth: the source data is your responsibility. The external system exported it. Your users downloaded it. Nobody in that chain guarantees it's clean.

In this pipeline, the source is a tab-delimited CSV with columns like Trans #, Date, GL Code, Debit, Credit, Memo, and Class. Each row is a single journal item line. Multiple rows share the same Trans # to form a compound journal entry.

The file could have:

  • Rows with no transaction number (separator lines, subtotals)
  • GL codes that don't exist in your chart of accounts
  • Amounts formatted with US thousands commas (1,250.00)
  • Stray quote characters from copy-paste artifacts
  • Zero-amount lines (e.g., Sales Tax Payable 0.00)

The hook's job isn't to "import a CSV." It's to distill a messy file into trusted financial documents — or reject the entire batch with a clear explanation of why.

The Pipeline

Here's the shape of the import, with the write at the end where it belongs:

flowchart LR
A["Source File"] --> B["Download"] --> C["Parse"] --> D["Clean"] --> E["Group"] --> F["Preflight Validate"] --> G["Deduplicate"] --> H["Build Payloads"] --> I["Write"] --> J["Report"]
style I fill:#16a34a,color:#fff,stroke:#15803d

Nine steps. Only one writes to the database. Let's walk through them.

Step 1: Validate the Request

Before downloading or parsing anything, validate that the request itself is well-formed:

bob.validator.joi().validateData(
  { fileUrl: bob.dataPayload.fileUrl },
  SuperJoi.object({
    fileUrl: SuperJoi.string().required()
  }),
  bob.flowUser.getUserInfo()
)

If there's no file URL, fail immediately. Don't download, don't parse, don't allocate memory. Guard clauses first.

Step 2: Load Reference Data

Before you can validate the CSV, you need to know what "valid" means. That means loading the reference datasets the import will validate against:

// -->Get: all active general ledger accounts
const generalLedgerAccounts = await generalLedgerFlowCollection.docs
  .flowQuery()
  .user(bob.flowUser)
  .addPolicy('canRead')
  .cache()
  .flowOptions(eventOptions.generalLedgerAccountNaoQueryOptions)
  .limit(10000)
  .getMany(undefined, 'FinanceInterface.GeneralLedgerAccount')

// -->Get: warehouses
const warehouses = await warehouseFlowCollection.docs
  .flowQuery()
  .user(bob.flowUser)
  .addPolicy('canRead')
  .getDocumentStatuses(['active'], 'data')
  .cache()
  .flowOptions(eventOptions.warehouseNaoQueryOptions)
  .limit(100)
  .getMany(undefined, 'InventoryManagementInterface.Warehouse')

Notice the .cache() — these datasets are stable reference data. And notice the hard limit. You never query reference data without a ceiling.

Both queries fail fast with descriptive errors if they return empty:

if (generalLedgerAccounts.data.length === 0) {
  throw naoFormatErrorById('bad_request', {
    reason: 'Failed to find general ledger accounts'
  })
}

No GL accounts means no chart of accounts means no point continuing. The error is specific enough that the user knows exactly what to fix.

Step 3: Ensure Preconditions

Some imports depend on system state that might not exist yet. This hook ensures fiscal periods and years are created before any journal entries are written:

await bob.executeRequestOrThrow(
  eventOptions.ensureFiscalPeriodsActionCfpQuery, {}
)

This is easy to overlook. If you write a journal entry dated March 2024 and no fiscal period exists for Q1 2024, the entry either fails silently or lands in an undefined period. The hook handles this before it even looks at the CSV.

Step 4: Build the Deduplication Index

For idempotent imports — where re-running the same file shouldn't create duplicates — you need to know what already exists:

const existingJournalEntries = await journalEntryFlowCollection.docs
  .flowQuery()
  .user(bob.flowUser)
  .addPolicy('canRead')
  .flowOptions(eventOptions.journalEntryNaoQueryOptions)
  .query({ 'data.name': { $regex: '^PIPE-' } })
  .projection({ 'data.name': 1 })
  .limit(2_000_000)
  .getMany(undefined, 'FinanceInterface.JournalEntry')

const existingJournalNameSet: Set<string> = new Set(
  existingJournalEntries.data.map((je) => je.data.name)
)

Two things to highlight here. First, the .projection({ 'data.name': 1 }) — we only need the name field, so we only fetch the name field. With 2 million potential records, this is the difference between a 30-second query and a timeout. Second, the Set — O(1) lookup for every deduplication check instead of scanning an array.
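The deduplication check itself reduces to a Set membership test against the `PIPE-` naming convention. A minimal sketch with made-up entry names:

```typescript
// Existing entry names, as they would come back from the projected query.
const existingJournalNameSet = new Set<string>(['PIPE-1001', 'PIPE-1002'])

// O(1) per lookup, regardless of how many entries already exist.
const isDuplicate = (transNum: string): boolean =>
  existingJournalNameSet.has(`PIPE-${transNum}`)

console.log(isDuplicate('1001')) // true  -> skip, count as duplicate
console.log(isDuplicate('1003')) // false -> safe to create
```

The same check with an array would scan all entries on every row, turning a linear import into a quadratic one.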

Step 5: Parse and Clean

Now — and only now — do we touch the CSV:

const glCsv = await new NaoCsvRunner2().loadFile({
  inputFilePath: downloadToLocalTemp.filePath,
  delimiter: '\t'
})

const allRowsRaw = await glCsv.readFile({ numberOfLines: Infinity })
const allRows = allRowsRaw.filter(
  (row: any) => cleanValue(row['Trans #'])
)

The cleanValue helper does the unglamorous work that prevents downstream failures:

const cleanValue = (v: string | undefined): string =>
  (v || '').trim().replace(/"+/g, '')

Trim whitespace. Strip stray quotes. Return an empty string instead of undefined. Every field in the CSV passes through this function before it's used anywhere. It's three lines of code that prevent dozens of edge-case bugs.

The filter step drops rows with no Trans # — subtotal rows, blank lines, section headers. These aren't data. They're presentation artifacts from the source system's export.
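On sample rows (made up here, shaped like the parsed tab-delimited export) the clean-and-filter pair behaves like this:

```typescript
type RawRow = Record<string, string | undefined>

const cleanValue = (v: string | undefined): string =>
  (v || '').trim().replace(/"+/g, '')

// Hypothetical rows: one real line, one subtotal artifact, one more real line.
const rawRows: RawRow[] = [
  { 'Trans #': ' "1001" ', 'GL Code': '4000', Debit: '1,250.00', Credit: '' },
  { 'Trans #': '', 'GL Code': '', Debit: '', Credit: '' },
  { 'Trans #': '1001', 'GL Code': '1200', Debit: '', Credit: '1,250.00' },
]

// Keep only rows that still have a transaction number after cleaning.
const rows = rawRows.filter((row) => cleanValue(row['Trans #']))

console.log(rows.length)                       // 2
console.log(cleanValue(rawRows[0]['Trans #'])) // 1001
```

The stray quotes and padding around the first `Trans #` never reach the grouping step; the subtotal row never reaches anything.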

Step 6: Preflight Validation — The Gate

This is the most important step in the entire pipeline, and it happens before a single document is created.

The hook scans every row in the file and checks that every GL code exists in the system's chart of accounts:

const existingGlCodes: Set<string> = new Set(
  generalLedgerAccounts.data.map((gl) => gl.data.code)
)
const missingGlCodeLines: Map<string, number[]> = new Map()

for (let i = 0; i < allRows.length; i++) {
  const glCode = cleanValue(allRows[i]['GL Code'])
  if (!existingGlCodes.has(glCode)) {
    const lineNumber = i + 2 // +1 for 1-indexed, +1 for header row
    if (!missingGlCodeLines.has(glCode)) {
      missingGlCodeLines.set(glCode, [])
    }
    missingGlCodeLines.get(glCode)!.push(lineNumber)
  }
}

If any GL codes are missing, the entire import aborts — with a detailed error listing every invalid code and the exact line numbers where they appear:

if (missingGlCodeLines.size > 0) {
  for (const [code, lines] of missingGlCodeLines.entries()) {
    naoLogger.error(
      `Missing GL code: "${code}" — found on ${lines.length} rows: lines ${lines.join(', ')}`
    )
  }
  throw naoFormatErrorById('bad_request', {
    reason: `Import aborted — ${missingGlCodeLines.size} GL codes not found: ${[...missingGlCodeLines.keys()].join(', ')}`
  })
}

This is an all-or-nothing gate. The pipeline doesn't import 9,950 valid rows and skip 50 bad ones — that would leave your ledger in a partial state. It validates everything first, then processes everything. Fail before you begin, not while you're halfway through.
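Extracted as a pure function (the function name and row shape are ours, not the hook's), the gate is easy to unit-test without any framework:

```typescript
type Row = Record<string, string | undefined>

const clean = (v: string | undefined): string =>
  (v || '').trim().replace(/"+/g, '')

// Scan every row; collect each unknown GL code with the CSV line numbers
// where it appears. Returns an empty Map when the file is fully valid.
function preflightGlCodes(rows: Row[], validCodes: Set<string>): Map<string, number[]> {
  const missing = new Map<string, number[]>()
  rows.forEach((row, i) => {
    const code = clean(row['GL Code'])
    if (!validCodes.has(code)) {
      const lineNumber = i + 2 // +1 for 1-indexed, +1 for header row
      if (!missing.has(code)) missing.set(code, [])
      missing.get(code)!.push(lineNumber)
    }
  })
  return missing
}

const missing = preflightGlCodes(
  [{ 'GL Code': '4000' }, { 'GL Code': '9999' }, { 'GL Code': '9999' }],
  new Set(['4000'])
)
console.log(missing.get('9999')) // [ 3, 4 ]
```

A Map keyed by code keeps the error report compact: one entry per invalid code, however many rows it appears on.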

Step 7: Group and Transform

CSV rows are flat. Journal entries are compound documents with a header and multiple line items. The grouping step bridges that gap:

const groupedTransactions = naoUtils._.groupBy(
  allRows, (row: any) => cleanValue(row['Trans #'])
)

Each group becomes one journal entry. Within the group, every row is transformed into a typed journal item line. This is where raw strings become domain objects:

// -->Parse: amounts — strip US thousands commas then convert to number
const debitAmount = naoUtils._.toNumber(
  (debitRaw || '0').replace(/,/g, '')
) || 0
const creditAmount = naoUtils._.toNumber(
  (creditRaw || '0').replace(/,/g, '')
) || 0

Dates get the same treatment — string to DateTime object via naoDateTime(), never new Date():

journalEntryDate: naoDateTime(date).toJSDate(),
accountingDate: naoDateTime(date).toJSDate(),

Every field is explicitly converted to its target type. Strings become numbers. Date strings become Date objects. GL codes become document references via lookup. The CSV column Class becomes a warehouseId through a resolution function. Nothing arrives at the database in its original CSV form.
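Put together, the group-and-transform step can be sketched as a self-contained function. Plain `Number` stands in for `naoUtils._.toNumber` here, and the payload shape is illustrative, not the hook's actual schema:

```typescript
type Row = { 'Trans #': string; 'GL Code': string; Debit: string; Credit: string }

// Strip US thousands commas, then convert; empty strings become 0.
const toAmount = (raw: string): number =>
  Number((raw || '0').replace(/,/g, '')) || 0

function buildPayloads(rows: Row[]) {
  // Group flat rows by transaction number.
  const groups = new Map<string, Row[]>()
  for (const row of rows) {
    const key = row['Trans #']
    if (!groups.has(key)) groups.set(key, [])
    groups.get(key)!.push(row)
  }
  // One compound payload per group: a named header plus typed line items.
  return [...groups.entries()].map(([transNum, lines]) => ({
    name: `PIPE-${transNum}`,
    items: lines.map((l) => ({
      glCode: l['GL Code'],
      debit: toAmount(l.Debit),
      credit: toAmount(l.Credit),
    })),
  }))
}

const payloads = buildPayloads([
  { 'Trans #': '1001', 'GL Code': '4000', Debit: '1,250.00', Credit: '' },
  { 'Trans #': '1001', 'GL Code': '1200', Debit: '', Credit: '1,250.00' },
  { 'Trans #': '1002', 'GL Code': '5000', Debit: '99.50', Credit: '' },
])
console.log(payloads.length)            // 2
console.log(payloads[0].items[0].debit) // 1250
```

Three flat rows become two compound entries, and every amount is a number before anything downstream sees it.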

Step 8: Write — The Easy Part

After all that preparation, the actual write is anticlimactic:

await bob.executeRequestOrThrow(
  eventOptions.createJournalEntryWithItemsActionCfpQuery,
  payload,
  { skipSession: true, skipEvent: true }
)
countCreated++

One call per journal entry. The skipSession: true flag is deliberate — each entry auto-commits independently, avoiding MongoDB's 60-second transaction timeout on long-running imports. The skipEvent: true prevents downstream event cascades during bulk import.

Notice what's not here: no validation, no deduplication, no type conversion. All of that was handled in steps 1 through 7. The write trusts the pipeline above it.

The Preferred Alternative: flowBulk

The executeRequestOrThrow approach works here because each journal entry is a compound document — a header with nested line items — and the downstream hook (create-data-entry-for-journal-entry) handles that compound creation logic. We're delegating to a hook that knows how to build both the parent and child documents atomically.

But for simpler, flat document imports — GL accounts, contacts, products, warehouses — flowBulk is the preferred write strategy. It batches all operations into a single database round-trip, which is dramatically faster than sequential per-document calls.

Here's what the write step looks like with flowBulk:

// -->Get: the flow collection
const flowCollection = bob.flowGlobal.getFlowCollection(eventOptions.glNaoQueryOptions)

// -->Init: bulk orders
const flowBulk = flowCollection.docs.flowQuery()
  .user(bob.flowUser)
  .flowOptions(eventOptions.glNaoQueryOptions)
  .bulkOrders()

// -->Build: bulk insert operations from prepared payloads
const flowQuery = flowCollection.docs.flowQuery()
  .user(bob.flowUser)
  .flowOptions(eventOptions.glNaoQueryOptions)

for (const account of preparedAccounts) {
  const flowData = flowQuery.flowData().setData(account)
  flowBulk.insertOne(flowData)
}

// -->Execute: all operations in a single batch
if (flowBulk.length > 0) {
  data = await flowCollection.docs.runBulk(
    bob.flowUser, { flowBulk }, bob.dbSession()
  )
}

The performance difference is significant. Importing 10,000 GL accounts with executeRequestOrThrow in a loop takes ~500 seconds (50ms × 10,000). The same import with flowBulk takes ~2 seconds — one database round-trip for the entire batch.
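The arithmetic behind those figures is worth keeping as a one-line helper; the 50ms per-call latency is the same assumption the paragraph above uses:

```typescript
// Estimated wall-clock time for sequential per-document writes.
const sequentialSeconds = (docs: number, msPerCall: number): number =>
  (docs * msPerCall) / 1000

console.log(sequentialSeconds(10_000, 50))                        // 500 seconds
console.log(Math.round(sequentialSeconds(10_000_000, 50) / 3600)) // 139 hours
```

Running this estimate before an import tells you immediately whether a per-document loop is viable or whether you need batching.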

When to Use Which

| Strategy | Use When | Trade-off |
| --- | --- | --- |
| executeRequestOrThrow | Compound documents (header + lines), or when the write triggers business logic in a downstream hook | Slower — one network round-trip per document, but each entry is independently committed and validated by the target hook |
| flowBulk | Flat document imports (accounts, contacts, products) where you're writing directly to a collection | Faster — single batch operation, but you own all validation and no downstream hooks fire |

The journal entry import in this article uses executeRequestOrThrow because the create-data-entry-for-journal-entry hook does more than just insert — it creates the journal header, generates line items, calculates balances, and links to fiscal periods. That logic already exists and is tested. Duplicating it inside the import hook would be a mistake.

Use flowBulk when you can. Fall back to per-entry calls when the write involves business logic you shouldn't be reimplementing.

Step 9: Monitor and Report

The hook tracks every outcome:

let countCreated = 0
let countSkipped = 0
let countDuplicate = 0
const skippedRows: { transNum: string; reason: string }[] = []

Memory usage is monitored every 30 seconds during long imports:

const memoryInterval = setInterval(() => {
  const mem = process.memoryUsage()
  naoLogger.log(
    `Memory — heap: ${(mem.heapUsed / 1024 / 1024).toFixed(1)}MB`
  )
}, 30_000)

Progress is logged every 500 transactions. And the final result gives you the full picture:

data = { countCreated, countSkipped, countDuplicate, skippedRows }

No silent failures. No "import complete" with no details. Every row is accounted for.

The Pipeline at a Glance

| Step | Purpose | Writes to DB? |
| --- | --- | --- |
| 1. Validate request | Reject malformed calls | No |
| 2. Load reference data | GL accounts, warehouses | No (reads) |
| 3. Ensure preconditions | Fiscal periods exist | Conditional |
| 4. Build dedup index | Fetch existing entries | No (reads) |
| 5. Parse and clean | CSV → clean rows | No |
| 6. Preflight validate | All-or-nothing GL check | No |
| 7. Group and transform | Rows → journal payloads | No |
| 8. Write | Create journal entries | Yes |
| 9. Report | Counters, skipped rows | No |

Eight steps of preparation. One step of writing. That ratio is not an accident — it's the architecture.

Hard-Won Lessons From Production Imports

The pipeline above works. But getting it to that state involved months of production imports, failed runs, angry rollbacks, and late-night debugging sessions. These are the lessons that don't show up in the code — they show up in the decisions behind it.

1. AI Doesn't Know Your Domain — And It Will Guess Confidently

Data imports are soaked in domain knowledge. What does "Class" mean in this CSV? It's a warehouse. What's a Trans #? It's a grouping key for compound journal entries. What does a zero-amount line mean — is it an error, or is it a tax-exempt placeholder?

AI will generate a perfectly structured import hook. It will parse, loop, and write. But it will consistently get the meaning wrong because it doesn't know your chart of accounts, your fiscal calendar, your warehouse naming conventions, or what "posted" means in your system versus the source system.

Every column in the CSV carries implicit business context that the AI has never seen. The GL Code column isn't just a string — it's a foreign key into your chart of accounts, and the mapping between the source system's codes and yours might not be 1:1. The Date column isn't just a date — it determines which fiscal period the entry lands in, which affects financial reporting.

Expect to rewrite 50-70% of the AI's field mapping and business logic. Use the AI for the pipeline structure — the download, parse, loop, write skeleton. Do the domain mapping yourself.

2. Data Preparation Should Be 70% of the Work

If you're spending most of your time on the database write, you're building the pipeline backward. The write is trivial once the data is clean. The hard part — the part that takes 70% of your development time — is everything before it:

  • Understanding the source file format (delimiters, encoding, quoting)
  • Identifying and filtering junk rows (subtotals, headers, blank lines)
  • Mapping source columns to target fields
  • Converting types (string amounts with commas → numbers, date strings → Date objects)
  • Resolving foreign keys (GL code → generalLedgerAccountId, Class → warehouseId)
  • Grouping flat rows into compound documents

If your commit history shows three days on data prep and an afternoon on the write, you're doing it right. If it's the inverse, your pipeline will break on the first real file.

3. The Error Rate Must Be Zero

Not "low." Not "acceptable." Zero.

Financial data imports don't have a margin for error. If you import 10,000 journal entries and 3 are wrong, you don't have 99.97% accuracy — you have a corrupted ledger. The accountant won't find those 3 bad entries. They'll find them in six months during an audit, and by then the cascade of dependent transactions makes the fix exponentially harder.

This is why the pipeline uses an all-or-nothing preflight gate. If a single GL code is invalid, the entire import aborts. It's aggressive, and users sometimes push back — "just skip the bad ones" — but partial imports create partial ledgers, and partial ledgers create real financial risk.

Design your pipeline so it either completes perfectly or doesn't start at all.

4. Prepare the Workspace for Fast Import-Wipe Cycles

You will run the import multiple times before it's right. The first run will reveal data issues you didn't anticipate. The second run will expose edge cases in your transformation logic. The third run might actually work.

This means you need the ability to wipe and reimport quickly:

  • Tag all imported documents with a recognizable prefix (PIPE- in this hook) so you can query and delete them in bulk
  • Use skipSession: true so each document auto-commits — if you need to kill a run midway, you know exactly what was written
  • Keep the source file immutable — download it once, process it from local storage, never modify the original
  • Have a cleanup script ready that deletes all PIPE-* journal entries so you can re-run from scratch

The import-wipe-reimport cycle is your development loop. Make it fast. If a wipe takes 20 minutes, you'll only test three iterations in an hour. If it takes 30 seconds, you'll test twenty.

5. The Audit Trail Checklist

Before writing the first line of code, answer these questions:

  • Naming convention: How will imported documents be identified? (PIPE-{transNum}, IMPORT-{batchId}, etc.)
  • Deduplication strategy: What makes two entries "the same"? Name? Source ID? Content hash?
  • Audit fields: Does each imported document carry its source file reference, import timestamp, and batch ID?
  • Transaction grouping: Are compound documents (header + lines) created atomically or individually?
  • Batching strategy: One DB call per document, or batched inserts? Does the downstream hook support batch payloads?
  • Session management: Shared transaction across all writes, or independent auto-commits? (For long imports, independent commits avoid MongoDB's 60-second transaction timeout)
  • Event suppression: Should downstream events (notifications, recalculations) fire during import, or be suppressed with skipEvent: true?
  • Rollback plan: If the import is wrong, how do you delete everything it created?
  • Completion guarantee: If the process crashes at row 5,000 of 10,000, can you resume or must you wipe and restart?

Answer every one of these before you start coding. They shape the pipeline's architecture in ways that are expensive to change later.
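For the audit-fields item on that checklist, one common approach is to stamp every payload with its provenance before the write. A sketch; the field names here are illustrative, not the hook's actual schema:

```typescript
// Provenance metadata attached to every imported document.
type AuditFields = {
  sourceFile: string    // which export produced this document
  importBatchId: string // which run of the import created it
  importedAt: string    // ISO timestamp of the import
}

// Merge audit fields into a prepared payload without mutating it.
function withAudit<T extends object>(payload: T, audit: AuditFields): T & AuditFields {
  return { ...payload, ...audit }
}

const stamped = withAudit(
  { name: 'PIPE-1001' },
  {
    sourceFile: 'gl-export.csv',
    importBatchId: 'batch-01',
    importedAt: new Date().toISOString(),
  }
)
console.log(stamped.name, stamped.importBatchId) // PIPE-1001 batch-01
```

With these fields in place, both the rollback plan and the dedup strategy become simple queries instead of forensic work.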

6. Measure Every Prep Query — You'll Be Surprised

Every reference data query in the prep stage loads data into memory. You need to know how much. The instinct is to worry about "200,000 documents" — that sounds like a lot. But if you're only projecting the name field, 200,000 short strings might be 2MB. That's nothing.

On the other hand, fetching 10,000 full documents with nested arrays of line items could be 500MB.

Measure, don't guess:

const result = await fc.docs.flowQuery()
  .projection({ 'data.name': 1 })
  .limit(2_000_000)
  .getMany()

// Log the actual size
const sizeInBytes = Buffer.byteLength(JSON.stringify(result.data))
naoLogger.log(`Dedup index: ${result.data.length} docs, ${(sizeInBytes / 1024 / 1024).toFixed(1)}MB`)

This changes your architecture decisions. If the dedup index is 2MB, load it all into a Set — fast and safe. If it's 200MB, you need a different strategy: streaming comparison, database-side dedup, or chunked lookups.

The projection operator is your best friend here. The difference between { 'data.name': 1 } and fetching full documents can be 100x in memory footprint. Always project to the minimum fields you need, and always log the actual size.
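You can demonstrate that footprint difference without a database. This sketch (document shapes are invented, and `TextEncoder` stands in for the `Buffer.byteLength` measurement above) compares the serialized size of name-only records against full documents:

```typescript
// Approximate serialized size in megabytes.
const approxMB = (data: unknown): number =>
  new TextEncoder().encode(JSON.stringify(data)).length / 1024 / 1024

// 10,000 "full" documents with a fat memo field and nested line items...
const full = Array.from({ length: 10_000 }, (_, i) => ({
  data: { name: `PIPE-${i}`, memo: 'x'.repeat(500), items: [{ debit: 1, credit: 0 }] },
}))

// ...versus the same documents projected down to the name field only.
const projected = full.map((d) => ({ data: { name: d.data.name } }))

console.log(approxMB(projected) < approxMB(full)) // true — here by more than an order of magnitude
```

The exact ratio depends on how fat your documents are, which is precisely why you log the measured size rather than trusting the document count.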

7. There Is No Room for Error — The Import Must Complete

A 50% imported ledger is worse than a 0% imported ledger. With zero imports, the customer knows nothing happened. With half the data imported, they don't know which half — and they can't trust any of it.

This has design implications:

  • Preflight everything: Validate the entire file before writing the first document. If row 9,999 has an invalid GL code, you want to know before row 1 is committed.
  • Idempotent design: If the process crashes and restarts, it should skip already-imported entries (the dedup Set) and continue — not create duplicates.
  • Progress logging: Log every 500 transactions so you can see exactly where a failure occurred. The memory monitor isn't vanity — it's your early warning system for OOM kills.
  • Independent commits: skipSession: true means each journal entry is its own commit. If the process dies at entry 5,001, entries 1–5,000 are safe and the restart picks up at 5,001.

Design for the crash. It will happen. The question is whether you recover in minutes or spend a weekend rebuilding the ledger.

8. The Validation Report Is Your Customer Deliverable

Here's a pattern that saves weeks of back-and-forth: run the pipeline in validation-only mode first.

The preflight validation step — the one that checks GL codes, resolves warehouses, and verifies fiscal periods — produces a report that is exactly what the customer needs to clean their data. Don't just throw an error. Format the validation results as a deliverable:

Import Validation Report
========================
File: general-ledger-export-2024.csv
Total rows: 12,847
Valid rows: 12,691
Skipped (no Trans #): 142
Invalid GL codes: 3
- "4510" → found on 8 rows (lines 234, 567, 891, ...)
- "6200-A" → found on 2 rows (lines 1002, 1003)
- "MISC" → found on 4 rows (lines 5501, 5502, 5503, 5504)

Action required: Add missing GL codes to chart of accounts, or update the CSV to use valid codes.

Send this to the customer before running the actual import. Nine times out of ten, this report is all they need. They fix the three GL codes in their spreadsheet, re-export, and the next run succeeds cleanly.

The validation stage isn't just a safety gate — it's a communication tool. It speaks the customer's language (GL codes, line numbers) instead of yours (stack traces, error IDs).
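Generating that report is a pure formatting function over the preflight results. A sketch, where the function and field names are ours and the layout mirrors the example above:

```typescript
function formatValidationReport(opts: {
  file: string
  totalRows: number
  validRows: number
  skippedNoTransNum: number
  missingGlCodes: Map<string, number[]> // code -> CSV line numbers
}): string {
  const lines = [
    'Import Validation Report',
    '========================',
    `File: ${opts.file}`,
    `Total rows: ${opts.totalRows}`,
    `Valid rows: ${opts.validRows}`,
    `Skipped (no Trans #): ${opts.skippedNoTransNum}`,
    `Invalid GL codes: ${opts.missingGlCodes.size}`,
  ]
  for (const [code, rows] of opts.missingGlCodes) {
    lines.push(`- "${code}" → found on ${rows.length} rows (lines ${rows.join(', ')})`)
  }
  return lines.join('\n')
}

const report = formatValidationReport({
  file: 'gl-export.csv',
  totalRows: 100,
  validRows: 97,
  skippedNoTransNum: 2,
  missingGlCodes: new Map([['9999', [14, 57]]]),
})
console.log(report.includes('Invalid GL codes: 1')) // true
```

The same function serves both modes: in validation-only runs its output is the deliverable, and in real runs it becomes the body of the abort error.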

9. Always Benchmark at Scale — What If This File Has 10 Million Entries?

Your test file has 500 rows and runs in 3 seconds. Your production file has 200,000 rows. Your largest customer's file has 10 million. Does your pipeline survive?

Things that break at scale:

  • Memory: Loading 10 million rows into a JavaScript array. At ~1KB per parsed row object, that's 10GB. Your 8GB server is dead.
  • Dedup index: A Set of 2 million strings is fine (~150MB). A Set of 50 million strings is not.
  • Sequential writes: 10 million executeRequestOrThrow calls at 50ms each = 139 hours. You need batching or parallelism.
  • Logging: Logging every skipped row with its reason — if 2 million rows are skipped, your skippedRows array consumes more memory than the actual data.
  • Progress intervals: Logging every 500 transactions makes sense at 10,000. At 10 million, that's 20,000 log lines — still manageable, but worth verifying.

Run the numbers before you run the import:

| Metric | 500 rows | 200K rows | 10M rows |
| --- | --- | --- | --- |
| Raw file size | ~50KB | ~20MB | ~1GB |
| Parse time | < 1s | ~5s | ~60s |
| Memory (parsed) | ~500KB | ~200MB | ~10GB ❌ |
| Dedup query | instant | ~2s | ~30s |
| Sequential writes (50ms/ea) | 25s | 2.7hrs | 139hrs ❌ |

If the 10M column has red marks, your architecture needs to change — streaming parsers, chunked processing, parallel writes, or database-side dedup. Don't discover this in production.
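Chunked processing is usually the first of those changes, and the helper behind it is generic:

```typescript
// Split a large array into fixed-size batches for chunked processing.
function chunk<T>(items: T[], size: number): T[][] {
  const out: T[][] = []
  for (let i = 0; i < items.length; i += size) {
    out.push(items.slice(i, i + size))
  }
  return out
}

// e.g. 10M payloads become 10,000 batches of 1,000 — each batch is parsed,
// validated, and written before the next is loaded, keeping memory bounded.
const batches = chunk([1, 2, 3, 4, 5], 2)
console.log(batches.length) // 3
```

The same helper feeds a flowBulk-style batched write just as well as a bounded-concurrency parallel loop.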

The file you're testing with is never the file you'll receive.

The Takeaway

The instinct is to start with the database write and work backward. Production import pipelines work the other way: start with the source data and build forward through layers of validation, cleaning, and transformation until the write becomes a formality.

Data preparation is 70% of the work. The error rate must be zero. The AI will get the structure right and the domain wrong. Your validation report is your customer's favorite deliverable. And the file you tested with is never the file you'll receive in production.

Your CSV is not your data. It's raw material. The pipeline is the refinery.

Trust the data only after you've earned that trust — one step at a time.