
Bulk

bulk() batches a set of operations into a managed execution unit. It exists for two reasons:

  1. Round-trip efficiency. N DB writes travel in a single Mongo call instead of N separate calls.
  2. Ordering. Database writes run first, and side-effects (hooks and actions) run afterwards in a concurrency-limited window, so they see a consistent committed state.

Two-phase execution

    bulk.add(...)               bulk.addHook(...)
    bulk.add(...)               bulk.addAction(...)
          │                              │
          ▼                              ▼
┌───────────────────┐          ┌───────────────────┐
│ PHASE 1: DB       │   then   │ PHASE 2: Side     │
│ single round-trip │─────────▶│ effects           │
│ executeBulk()     │          │ sliding-window    │
│                   │          │ concurrency       │
└───────────────────┘          └───────────────────┘
  • Phase 1 runs first regardless of registration order: every add() entry is compiled, batched, and shipped in a single executeBulk() call. If that call fails, Phase 2 is skipped and the result is returned with ok: false.
  • Phase 2 runs hooks and actions under a sliding-window concurrency cap. Each batch uses Promise.allSettled, so one failure doesn't derail the rest of the batch unless you opted into throwOnError: true.
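The two rules above can be modelled in a few lines. This is a minimal sketch, not the library's implementation: `Entry`, `runTwoPhase` and the `executeBulk` callback are hypothetical stand-ins (the callback plays the role of the single Phase 1 round-trip and resolves to `false` on failure), and the Phase 2 concurrency cap is left out.

```typescript
// Hypothetical entry model: "db" entries come from add(), "effect" entries from addHook()/addAction().
type Entry =
  | { kind: "db"; op: string }
  | { kind: "effect"; name: string; run: () => Promise<unknown> };

type DbEntry = Extract<Entry, { kind: "db" }>;
type EffectEntry = Extract<Entry, { kind: "effect" }>;

interface TwoPhaseResult {
  ok: boolean;
  dbOps: string[];                           // what Phase 1 shipped
  effects?: PromiseSettledResult<unknown>[]; // undefined unless Phase 2 ran
}

async function runTwoPhase(
  entries: Entry[],
  executeBulk: (ops: string[]) => Promise<boolean>, // stand-in for the single round-trip
): Promise<TwoPhaseResult> {
  // Phase 1: gather DB entries regardless of where they were registered.
  const dbOps = entries.filter((e): e is DbEntry => e.kind === "db").map((e) => e.op);
  // Assumption: with no DB entries there is nothing to ship, so the call is skipped.
  const dbOk = dbOps.length === 0 || (await executeBulk(dbOps));
  if (!dbOk) {
    // A Phase 1 failure is reported, not thrown, and Phase 2 never starts.
    return { ok: false, dbOps };
  }

  const effects = entries.filter((e): e is EffectEntry => e.kind === "effect");
  if (effects.length === 0) return { ok: true, dbOps };

  // Phase 2: side-effects start only now; allSettled keeps one failure from rejecting the rest.
  const settled = await Promise.allSettled(effects.map((e) => e.run()));
  return { ok: settled.every((s) => s.status === "fulfilled"), dbOps, effects: settled };
}
```

Registering a hook before an insert still yields DB-then-hook ordering in this model, and a failed round-trip leaves `effects` undefined.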

Creating a bulk

Start from any builder; the parent is only used for its executor and FlowUser:

const bulk = bob.flowQuery('shipping/billOfLanding').bulk()

Phase 1: DB operations

Use .add(builder) with a write-configured builder. The builder's IR is read, compiled, and included in the bulk payload. Supported configurations:

// Insert (configure via .insert(), no terminal)
bulk.add(
  bob.flowQuery('vendors/vendor').skipSession().insert({
    id: 'v1', name: 'Vendor A', legalEntityName: 'Vendor A LLC',
  })
)

// Update via mutation (configure via .mutate().asUpdateOne(), no terminal)
bulk.add(
  bob.flowQuery('vendors/vendor')
    .docId('V001')
    .skipSession()
    .mutate()
    .set({ 'data.phoneNumber': '555-BULK' })
    .asUpdateOne()
)

// Patch
bulk.add(
  bob.flowQuery('vendors/vendor')
    .docId('V002')
    .skipSession()
    .mutate()
    .set({ 'data.fax': '555-FAX' })
    .asPatchOne()
)

// Delete (configure via .delete(), no terminal)
bulk.add(
  bob.flowQuery('vendors/vendor')
    .docId('V003')
    .skipSession()
    .delete()
)
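To make the round-trip saving concrete, here is a hypothetical sketch of how insert, update and delete entries could be flattened into one array in the operation shape accepted by the MongoDB Node.js driver's `collection.bulkWrite()` (`insertOne`, `updateOne`, `deleteOne`). The `WriteIR` type, the `docId` filter field and `compileToBulkWrite` are assumptions for illustration; the library's real IR and compilation step are internal, the sketch assumes every entry targets one collection, and patches are left out because their semantics aren't described on this page.

```typescript
// Hypothetical IR for a configured write builder; the real IR is internal to the library.
type WriteIR =
  | { kind: "insert"; doc: Record<string, unknown> }
  | { kind: "update"; docId: string; set: Record<string, unknown> }
  | { kind: "delete"; docId: string };

// Operation shapes accepted by the MongoDB Node.js driver's collection.bulkWrite().
type BulkWriteOp =
  | { insertOne: { document: Record<string, unknown> } }
  | { updateOne: { filter: Record<string, unknown>; update: Record<string, unknown> } }
  | { deleteOne: { filter: Record<string, unknown> } };

// N builder IRs become one payload, so N writes cost one round-trip instead of N.
function compileToBulkWrite(entries: WriteIR[]): BulkWriteOp[] {
  return entries.map((ir): BulkWriteOp => {
    if (ir.kind === "insert") return { insertOne: { document: ir.doc } };
    if (ir.kind === "update") {
      // Assumption: documents are matched on a docId field.
      return { updateOne: { filter: { docId: ir.docId }, update: { $set: ir.set } } };
    }
    // Only the delete variant remains here.
    return { deleteOne: { filter: { docId: ir.docId } } };
  });
}
```

The resulting array is what a single `bulkWrite()` call would receive, in registration order.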

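.skipSession() is recommended inside a bulk: for large batches, Mongo session contention would otherwise serialise operations that could run in parallel. Leave it off only when the writes must join the surrounding session (see Cross-collection writes share a session below).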

Phase 2: side-effects

.addHook(builder, opts?) queues a hook call. .addAction(builder, opts?) queues an action call. In both cases the builder must be fully configured except for .execute(); the bulk calls .execute() itself during Phase 2.

bulk.addHook(
  bob.executeHook('shipping-ai/ensure-sales-order-shipping-documents')
    .requestPayload(salesOrder)
    .documentCfpPath(bob.documentCfpPath)
    .skipSession(),
)

bulk.addAction(
  bob.executeAction('manufacturing/manufacturing.delete-by-id')
    .requestPayload({ docId: manufacturingOrder.docId })
    .documentCfpPath(bob.documentCfpPath)
    .skipSession(),
)

throwOnError

By default, a failing side-effect is reported in the result but doesn't halt the bulk. Pass { throwOnError: true } to make a specific entry critical: if it fails, execute() throws a bad_request error and subsequent entries in the queue are skipped.

bulk.addHook(
  bob.executeHook('notify-slack').requestPayload(payload),
  { throwOnError: false }, // non-critical (the default)
)

bulk.addHook(
  bob.executeHook('ensure-required-documents').requestPayload(payload),
  { throwOnError: true }, // critical: the bulk throws if this fails
)
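A minimal sketch of how a critical entry could stop the queue, assuming entries are processed in batches settled with `Promise.allSettled` as the overview describes, and that a critical failure throws once its batch has settled. `SideEffect`, `runSideEffects` and `batchSize` are hypothetical names, and a plain `Error` whose message starts with `bad_request` stands in for the library's bad_request error.

```typescript
interface SideEffect {
  name: string;
  throwOnError: boolean; // mirrors the per-entry { throwOnError } option
  run: () => Promise<unknown>;
}

interface EffectReport {
  name: string;
  ok: boolean;
  error?: unknown;
}

// Hypothetical runner: fixed-size batches, each settled with Promise.allSettled.
async function runSideEffects(effects: SideEffect[], batchSize: number): Promise<EffectReport[]> {
  const reports: EffectReport[] = [];
  for (let i = 0; i < effects.length; i += batchSize) {
    const batch = effects.slice(i, i + batchSize);
    // allSettled: a rejection never cancels the other entries of the same batch.
    const settled = await Promise.allSettled(batch.map((e) => e.run()));
    let criticalFailure: string | undefined = undefined;
    for (let j = 0; j < batch.length; j++) {
      const effect = batch[j]!;
      const outcome = settled[j]!;
      if (outcome.status === "fulfilled") {
        reports.push({ name: effect.name, ok: true });
      } else {
        // Non-critical failures are only recorded.
        reports.push({ name: effect.name, ok: false, error: outcome.reason });
        if (effect.throwOnError && criticalFailure === undefined) criticalFailure = effect.name;
      }
    }
    // A critical failure aborts the run: later batches are never started.
    if (criticalFailure !== undefined) {
      throw new Error(`bad_request: critical side-effect "${criticalFailure}" failed`);
    }
  }
  return reports;
}
```

In this model, entries that share a batch with the critical one still run to completion; only entries queued behind that batch are skipped.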

Executing

const result = await bulk.execute()

result.ok                  // true if Phase 1 and all non-throwing side effects succeeded
result.operations.insert   // e.g. 3
result.operations.update   // e.g. 2
result.operations.delete   // e.g. 1
result.operations.skipped  // operations skipped by the executor (if any)
result.sideEffects         // [{ kind, ok, errors, data }], only present when Phase 2 ran
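The fields above can be written down as TypeScript types. This is a reconstruction from the listed fields only: the names `BulkResult` and `SideEffectResult`, the `kind` values, and the field types are assumptions, and `summarise` is a hypothetical helper, not part of the API.

```typescript
// Assumed shape of one Phase 2 entry in result.sideEffects.
interface SideEffectResult {
  kind: "hook" | "action"; // assumed values
  ok: boolean;
  errors: unknown[];       // assumed to be a list
  data: unknown;
}

// Assumed shape of the value returned by bulk.execute().
interface BulkResult {
  ok: boolean;
  operations: { insert: number; update: number; delete: number; skipped: number };
  sideEffects?: SideEffectResult[]; // undefined when Phase 2 did not run
}

// Hypothetical helper: one-line summary of a result, e.g. for logging.
function summarise(result: BulkResult): string {
  const { insert, update, delete: del, skipped } = result.operations;
  const failed = (result.sideEffects ?? []).filter((s) => !s.ok).length;
  const effects =
    result.sideEffects === undefined ? "not run" : `${result.sideEffects.length} run, ${failed} failed`;
  return `ok=${result.ok} insert=${insert} update=${update} delete=${del} skipped=${skipped} sideEffects: ${effects}`;
}
```

Checking `sideEffects === undefined` separately from an empty array keeps "Phase 2 never ran" distinguishable from "Phase 2 ran with nothing to report".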

Concurrency for Phase 2 defaults to 10 parallel side effects. Override it to match the workload:

// Large batches β€” widen the window
await bulk.execute({ concurrency: 20 })

// Force serial execution (useful for ordering-sensitive flows)
await bulk.execute({ concurrency: 1 })
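A sliding window differs from fixed batches: as soon as one side-effect settles, the next queued one starts, so at most `concurrency` run at any moment and one slow entry doesn't hold up the others. The sketch below is a generic implementation of that technique, not the library's code; `runWindowed` is a hypothetical name, and only the default of 10 comes from this page.

```typescript
// Run tasks with at most `concurrency` in flight; results keep the input order.
async function runWindowed<T>(
  tasks: Array<() => Promise<T>>,
  concurrency = 10, // mirrors the documented Phase 2 default
): Promise<PromiseSettledResult<T>[]> {
  const results: PromiseSettledResult<T>[] = new Array(tasks.length);
  let next = 0; // index of the next task to start

  // Each worker pulls the next task as soon as its previous one settles.
  async function worker(): Promise<void> {
    while (next < tasks.length) {
      const i = next++;
      try {
        results[i] = { status: "fulfilled", value: await tasks[i]!() };
      } catch (reason) {
        results[i] = { status: "rejected", reason };
      }
    }
  }

  // Start `concurrency` workers (never more than there are tasks) and wait for all of them.
  const workerCount = Math.max(1, Math.min(concurrency, tasks.length));
  await Promise.all(Array.from({ length: workerCount }, () => worker()));
  return results;
}
```

With `concurrency = 1` a single worker drains the queue in order, which is the serial behaviour the example above asks for.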

bulk.length reflects the total number of registered entries (DB + side-effects).

Behaviour rules

  • DB first, always. Registration order between add, addHook, and addAction does not matter for phase ordering β€” DB operations always run first.
  • Phase 1 is all-or-nothing. If the underlying bulk write fails, Phase 2 is skipped entirely.
  • Phase 1 failure does not throw. The result is returned with ok: false; inspect operations to see what landed.
  • Side-effects are only reported when they ran. If Phase 1 failed or no side-effects were registered, result.sideEffects is undefined.
  • Unique numbers are batch-allocated for insert entries that use unique-number-backed document IDs: one $inc per document path, regardless of how many inserts target it.
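The batch-allocation rule amounts to grouping inserts by document path and reserving a contiguous block of numbers per path with a single increment. The sketch below models the counter store as an in-memory `Map`; `allocateUniqueNumbers` and the counter layout are hypothetical, and a real implementation would issue one atomic `$inc` per path (for example with `findOneAndUpdate`) instead of mutating a map.

```typescript
// In-memory stand-in for a counters collection: document path -> last number issued.
type Counters = Map<string, number>;

// Reserve numbers for every insert with one increment per document path.
function allocateUniqueNumbers(counters: Counters, insertPaths: string[]): Map<string, number[]> {
  // Count how many inserts target each path.
  const needed = new Map<string, number>();
  for (const path of insertPaths) needed.set(path, (needed.get(path) ?? 0) + 1);

  const allocated = new Map<string, number[]>();
  needed.forEach((count, path) => {
    // The single "$inc" for this path: advance the counter by `count` in one step.
    const start = (counters.get(path) ?? 0) + 1;
    counters.set(path, start + count - 1);
    // Hand out the contiguous range start .. start + count - 1.
    allocated.set(path, Array.from({ length: count }, (_, k) => start + k));
  });
  return allocated;
}
```

Three inserts into the same path therefore cost one counter update rather than three.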

Cross-collection writes share a session

Multiple .bulk() calls against different document paths share the surrounding debe session when you don't .skipSession(), so they commit or roll back together. This is the idiom for transactional writes across several collections inside one hook:

// Both bulks see each other's writes (same session) and commit together.
const vendorBulk = bob.flowQuery('vendors/vendor').bulk()
vendorBulk.add(bob.flowQuery('vendors/vendor').insert(vendorData))
await vendorBulk.execute()

const billBulk = bob.flowQuery('finance/bill').bulk()
billBulk.add(bob.flowQuery('finance/bill').insert(billData))
await billBulk.execute()

Empty bulks

Executing an empty bulk is a no-op that returns ok: true, all operation counts at 0, and sideEffects undefined.

const empty = await bob.flowQuery('vendors/vendor').bulk().execute()
empty.ok // true
empty.operations.insert // 0
