Aggregations
Aggregations reshape data in the database instead of in application
code. Flow Query exposes three composable primitives — .aggregate(),
.lookup(), and .addFields() — that accumulate aggregation stages
into the IR. When a read terminal runs, the compiler produces the
right backend-native pipeline.
On MongoDB, the stages are MongoDB aggregation stages. On ClickHouse and PostgreSQL, the same structural operators are translated into the target backend's SQL.
.aggregate(stages, opts?)
Append arbitrary aggregation stages. The stages follow MongoDB's pipeline syntax.
const totalsByVendor = await bob.flowQuery('finance/bill')
.statusFinal()
.aggregate([
{ $group: { _id: '$data.vendorId', total: { $sum: '$data.amount' } } },
{ $sort: { total: -1 } },
])
.limit(10)
.getMany()
By default the stages are appended to the compiled pipeline
(filters, views, sort, and pagination still apply). Pass
{ replace: true } to replace the pipeline outright — the stages
become the entire compiled pipeline:
.aggregate(customStages, { replace: true })
Multiple .aggregate() calls concatenate in order.
.lookup(config)
Ad-hoc $lookup stage — use when a documented view isn't available
or when you need a lookup that's specific to this query.
.lookup({
from: 'contacts',
localField: 'data.vendorId',
foreignField: 'docId',
as: 'vendor',
})
Prefer .withView('some-view') when the lookup is a reusable one —
views live on the document definition and the IR records the
name, not the pipeline, so they compose better with plugins,
caches, and analytics.
.addFields(fields)
Append computed fields to each document as an $addFields stage.
.addFields({
'data.totalFormatted': { $concat: ['$', { $toString: '$data.total' }] },
})
Accumulates over multiple calls. Use for derived fields in the
result shape when you don't want a full $project.
Running on ClickHouse
Aggregations are the idiomatic use case for routing a query to
ClickHouse. The same fluent chain applies; only the provider
changes:
const totals = await bob.flowQuery('finance/bill', { provider: 'clickhouse' })
.statusFinal()
.aggregate([
{ $group: { _id: '$data.vendorId', total: { $sum: '$data.amount' } } },
])
.getMany()
If you use a stage that the target backend doesn't support, the compiler throws at compile time rather than failing silently at execution.
Composition with filters
Filter methods (.where, .status, .dataType, etc.) run as the
initial $match in the compiled pipeline, so aggregation stages
see an already-filtered set:
// Equivalent to:
// [{ $match: { status: { $in: statusesResolved } } },
// { $group: { ... } },
// { $sort: { total: -1 } }]
await bob.flowQuery('finance/bill')
.statusFinal()
.aggregate([
{ $group: { _id: '$data.vendorId', total: { $sum: '$data.amount' } } },
{ $sort: { total: -1 } },
])
.getMany()
Read terminals work normally
.getMany() is the typical terminal for an aggregation, but
everything that works on a filter also works on an aggregation:
.getOne, .count, .paginate, .pluck, .distinct,
.getManyInBatches. The compiler adapts the pipeline per terminal.
Next: Plugins →