Bulk
bulk() batches a set of operations into a managed execution unit.
It exists for two reasons:
- Round-trip efficiency. Many DB writes fit in a single Mongo call instead of N calls.
- Ordering. Database writes run first, then side-effects (hooks and actions) run in a concurrency-limited window afterwards, so the side-effects see a consistent committed state.
Two-phase execution
bulk.add(...)               bulk.addHook(...)
bulk.add(...)               bulk.addAction(...)
      │                           │
      ▼                           ▼
┌──────────────────┐        ┌──────────────────┐
│ PHASE 1: DB      │        │ PHASE 2: Side    │
│ single round-trip│  then  │ effects          │
│ executeBulk()    │───────▶│ sliding-window   │
│                  │        │ concurrency      │
└──────────────────┘        └──────────────────┘
- Phase 1 runs first regardless of registration order: every add() entry is compiled, batched, and shipped as a single executeBulk(). If it fails, Phase 2 is skipped and the result is returned with ok: false.
- Phase 2 runs hooks and actions with a sliding-window concurrency cap. Each batch uses Promise.allSettled, so one failure doesn't derail the batch unless you opted into throwOnError: true.
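The two phases described above can be modelled in a few lines. This is a minimal sketch, not the library's implementation: twoPhase, runSlidingWindow, and the writeAll callback are hypothetical names, the single DB round-trip is replaced by a stand-in function, and per-task failures are captured with try/catch in the spirit of Promise.allSettled. Returning an empty operations object on Phase 1 failure is a simplification.

```javascript
// Hypothetical model of the two-phase flow; names are illustrative only.

// Runs `tasks` (functions returning promises) with at most `limit` in flight.
// A new task starts as soon as any running task settles (sliding window).
async function runSlidingWindow(tasks, limit) {
  const results = new Array(tasks.length)
  let next = 0
  async function worker() {
    while (next < tasks.length) {
      const i = next++
      try {
        results[i] = { ok: true, data: await tasks[i]() }
      } catch (err) {
        // A failure is recorded, not rethrown, so other tasks keep running.
        results[i] = { ok: false, errors: [String(err?.message ?? err)] }
      }
    }
  }
  const workers = Array.from({ length: Math.min(limit, tasks.length) }, () => worker())
  await Promise.all(workers)
  return results
}

// `writeAll` stands in for the single Phase 1 round-trip.
async function twoPhase(writeAll, sideEffects, limit = 10) {
  let operations
  try {
    operations = await writeAll()
  } catch {
    // Phase 1 failed: report it and skip Phase 2 entirely.
    return { ok: false, operations: {} }
  }
  if (sideEffects.length === 0) return { ok: true, operations }
  const results = await runSlidingWindow(sideEffects, limit)
  return { ok: results.every((r) => r.ok), operations, sideEffects: results }
}
```

Calling twoPhase with four side-effects and a limit of 2 never has more than two of them in flight, and a rejected side-effect shows up as an ok: false entry rather than aborting the others.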
Creating a bulk
Start from any builder; the parent is only used for its executor
and FlowUser:
const bulk = bob.flowQuery('shipping/billOfLanding').bulk()
Phase 1: DB operations
Use .add(builder) with a write-configured builder. The builder's
IR is read, compiled, and included in the bulk payload. Supported
configurations:
// Insert (configure via .insert(), no terminal)
bulk.add(
bob.flowQuery('vendors/vendor').skipSession().insert({
id: 'v1', name: 'Vendor A', legalEntityName: 'Vendor A LLC',
})
)
// Update via mutation (configure via .mutate().asUpdateOne(), no terminal)
bulk.add(
bob.flowQuery('vendors/vendor')
.docId('V001')
.skipSession()
.mutate()
.set({ 'data.phoneNumber': '555-BULK' })
.asUpdateOne()
)
// Patch
bulk.add(
bob.flowQuery('vendors/vendor')
.docId('V002')
.skipSession()
.mutate()
.set({ 'data.fax': '555-FAX' })
.asPatchOne()
)
// Delete (configure via .delete(), no terminal)
bulk.add(
bob.flowQuery('vendors/vendor')
.docId('V003')
.skipSession()
.delete()
)
.skipSession() is recommended inside a bulk: Mongo session
contention will serialise otherwise-parallel operations for large
batches.
Phase 2: side-effects
.addHook(builder, opts?) queues a hook call. .addAction(builder, opts?) queues an action call. In both cases the builder must be
fully configured except for .execute(); the bulk calls
.execute() itself during Phase 2.
bulk.addHook(
bob.executeHook('shipping-ai/ensure-sales-order-shipping-documents')
.requestPayload(salesOrder)
.documentCfpPath(bob.documentCfpPath)
.skipSession(),
)
bulk.addAction(
bob.executeAction('manufacturing/manufacturing.delete-by-id')
.requestPayload({ docId: manufacturingOrder.docId })
.documentCfpPath(bob.documentCfpPath)
.skipSession(),
)
throwOnError
By default, a failing side-effect is reported in the result but
doesn't halt the bulk. Pass { throwOnError: true } to make a
specific entry critical: if it fails, execute() throws with a
bad_request and subsequent entries in the queue are skipped.
bulk.addHook(
bob.executeHook('notify-slack').requestPayload(payload),
{ throwOnError: false }, // non-critical (default)
)
bulk.addHook(
bob.executeHook('ensure-required-documents').requestPayload(payload),
{ throwOnError: true }, // critical: bulk throws on failure
)
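The difference between the two settings can be illustrated with a small stand-alone model. This is a sketch under stated assumptions, not the library's code: runSideEffects and the entry shape { kind, run, throwOnError } are hypothetical, entries run one at a time for clarity (equivalent to concurrency: 1), and attaching code: 'bad_request' to the thrown error is an assumption about the error shape.

```javascript
// Hypothetical model of the throwOnError rule; names are illustrative only.
async function runSideEffects(entries) {
  const results = []
  for (const { kind, run, throwOnError = false } of entries) {
    try {
      results.push({ kind, ok: true, data: await run() })
    } catch (err) {
      if (throwOnError) {
        // Critical entry: abort here, leaving later entries unexecuted.
        const failure = new Error(`bad_request: ${err.message}`)
        failure.code = 'bad_request' // assumed error shape
        throw failure
      }
      // Non-critical entry: record the failure and carry on.
      results.push({ kind, ok: false, errors: [err.message] })
    }
  }
  return results
}
```

In this model a non-critical failure produces an ok: false entry and the queue continues, while a critical failure rejects the whole call before any later entry starts.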
Executing
const result = await bulk.execute()
result.ok // true if Phase 1 and all non-throwing side effects passed
result.operations.insert // 3
result.operations.update // 2
result.operations.delete // 1
result.operations.skipped // operations skipped by the executor (if any)
result.sideEffects // [{ kind, ok, errors, data }], only when Phase 2 ran
Concurrency for Phase 2 defaults to 10 parallel side effects. Override it to match the workload:
// Large batches β widen the window
await bulk.execute({ concurrency: 20 })
// Force serial execution (useful for ordering-sensitive flows)
await bulk.execute({ concurrency: 1 })
bulk.length reflects the total number of registered entries
(DB + side-effects).
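Because a Phase 1 failure returns ok: false instead of throwing, callers usually need to inspect the result explicitly. The helper below is a hypothetical sketch (describeBulkResult is not part of the API) that only reads the result fields listed above and tolerates a missing sideEffects array.

```javascript
// Hypothetical helper: summarises a bulk result using only the documented fields.
function describeBulkResult(result) {
  const { insert = 0, update = 0, delete: deleted = 0 } = result.operations ?? {}
  // sideEffects is absent when Phase 2 did not run.
  const failed = (result.sideEffects ?? []).filter((entry) => !entry.ok)
  return {
    ok: result.ok,
    written: insert + update + deleted,
    sideEffectsRan: result.sideEffects !== undefined,
    failedSideEffects: failed.map((entry) => ({ kind: entry.kind, errors: entry.errors })),
  }
}
```

A caller could log failedSideEffects for non-critical entries and treat ok: false with sideEffectsRan: false as a Phase 1 failure.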
Behaviour rules
- DB first, always. Registration order between add, addHook, and addAction does not matter for phase ordering; DB operations always run first.
- Phase 1 is all-or-nothing. If the underlying bulk write fails, Phase 2 is skipped entirely.
- Phase 1 failure does not throw. The result is returned with ok: false; inspect operations to see what landed.
- Side-effects are only reported when they ran. If Phase 1 failed or no side-effects were registered, result.sideEffects is undefined.
- Unique numbers are batch-allocated for insert entries that use unique-number-backed document IDs: one $inc per document path, regardless of how many inserts target it.
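The batch allocation in the last rule can be pictured as reserving a contiguous range per document path with a single counter increment. The sketch below is illustrative only: createCounterStore and allocateNumbers are hypothetical, the counter lives in memory rather than in the database, and the numbering scheme (starting at 1, contiguous) is an assumption.

```javascript
// Hypothetical in-memory counter standing in for the database.
function createCounterStore() {
  const counters = new Map()
  let calls = 0
  return {
    // Models one atomic $inc: bumps the counter by `n` and returns the new value.
    inc(path, n) {
      calls++
      const value = (counters.get(path) ?? 0) + n
      counters.set(path, value)
      return value
    },
    get calls() {
      return calls
    },
  }
}

// Assigns a number to every insert using one inc() call per distinct path.
function allocateNumbers(store, inserts) {
  const byPath = new Map()
  inserts.forEach((entry, index) => {
    if (!byPath.has(entry.path)) byPath.set(entry.path, [])
    byPath.get(entry.path).push(index)
  })
  const numbers = new Array(inserts.length)
  for (const [path, indexes] of byPath) {
    const last = store.inc(path, indexes.length) // reserve the whole range at once
    const first = last - indexes.length + 1
    indexes.forEach((index, offset) => {
      numbers[index] = first + offset
    })
  }
  return inserts.map((entry, index) => ({ ...entry, number: numbers[index] }))
}
```

Three inserts for one path and one insert for another cost two increments in this model, not four.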
Cross-collection writes share a session
Multiple .bulk() calls against different document paths share the
surrounding debe session when you don't .skipSession(), so they
commit or roll back together. This is the idiom for transactional
writes across several collections inside one hook:
// Both bulks see each other's writes (same session) and commit together.
const vendorBulk = bob.flowQuery('vendors/vendor').bulk()
vendorBulk.add(bob.flowQuery('vendors/vendor').insert(vendorData))
await vendorBulk.execute()
const billBulk = bob.flowQuery('finance/bill').bulk()
billBulk.add(bob.flowQuery('finance/bill').insert(billData))
await billBulk.execute()
Empty bulks
An empty bulk is a no-op: ok: true, all operation counts 0,
sideEffects undefined.
const empty = await bob.flowQuery('vendors/vendor').bulk().execute()
empty.ok // true
empty.operations.insert // 0