
Aggregations

Aggregations reshape data in the database instead of in application code. Flow Query exposes three composable primitives that accumulate aggregation stages into the IR: .aggregate(), .lookup(), and .addFields(). When a read terminal runs, the compiler translates those stages into a pipeline native to the target backend.

On MongoDB, the stages are MongoDB aggregation stages. On ClickHouse and PostgreSQL, the same structural operators are translated into the target backend's SQL.

.aggregate(stages, opts?)

Append arbitrary aggregation stages. The stages follow MongoDB's pipeline syntax.

const totalsByVendor = await bob.flowQuery('finance/bill')
  .statusFinal()
  .aggregate([
    { $group: { _id: '$data.vendorId', total: { $sum: '$data.amount' } } },
    { $sort: { total: -1 } },
  ])
  .limit(10)
  .getMany()

By default the stages are appended to the compiled pipeline, so filters, views, sort, and pagination still apply. Pass { replace: true } to use your stages as the entire compiled pipeline instead:

.aggregate(customStages, { replace: true })

Multiple .aggregate() calls concatenate in order.
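To make the append and replace modes concrete, here is a minimal TypeScript sketch of how stages might accumulate. PipelineSketch, its base stages, and compile() are hypothetical stand-ins, not Flow Query's compiler; base represents whatever the compiler derives from filters, views, sort, and pagination. The text above does not say what happens when only some calls pass { replace: true }, so the sketch assumes a single replace call switches the whole query to replace mode.

```typescript
// HYPOTHETICAL model of how .aggregate() calls accumulate; not Flow Query's compiler.
type Stage = { [op: string]: unknown };

interface AggregateOptions {
  replace?: boolean;
}

class PipelineSketch {
  // Stand-in for the stages compiled from filters, views, sort, and pagination.
  private readonly base: Stage[];
  // Stages collected from every .aggregate() call, in call order.
  private readonly custom: Stage[] = [];
  private replaceMode = false;

  constructor(base: Stage[]) {
    this.base = base;
  }

  aggregate(stages: Stage[], opts: AggregateOptions = {}): this {
    this.custom.push(...stages);
    // Assumption: one replace call switches the whole query to replace mode.
    if (opts.replace) this.replaceMode = true;
    return this;
  }

  compile(): Stage[] {
    // Append mode keeps the compiled stages in front; replace mode drops them.
    return this.replaceMode ? this.custom.slice() : this.base.concat(this.custom);
  }
}

// 'final' is a placeholder status value for illustration.
const base: Stage[] = [{ $match: { status: { $in: ['final'] } } }];

const appended = new PipelineSketch(base)
  .aggregate([{ $group: { _id: '$data.vendorId', total: { $sum: '$data.amount' } } }])
  .aggregate([{ $sort: { total: -1 } }])
  .compile();
// appended holds the $match, then the $group, then the $sort

const replaced = new PipelineSketch(base)
  .aggregate([{ $count: 'bills' }], { replace: true })
  .compile();
// replaced holds only { $count: 'bills' }
```

Keeping the custom stages separate from the base stages until compile() is what lets several calls concatenate in order while the mode is decided once.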

.lookup(config)

Adds an ad-hoc $lookup stage. Use it when no suitable view is defined for the document, or when the lookup is specific to this query.

.lookup({
  from: 'contacts',
  localField: 'data.vendorId',
  foreignField: 'docId',
  as: 'vendor',
})

Prefer .withView('some-view') for lookups you reuse. Views live on the document definition, and the IR records the view name rather than its pipeline, so views compose better with plugins, caches, and analytics.
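The stage that .lookup() adds behaves like MongoDB's equality-match $lookup, which is a left outer join: every input document is kept, and the as field holds an array of matching foreign documents (empty when nothing matches). The in-memory sketch below illustrates those semantics with toy data. lookupSketch and getPath are hypothetical helpers, and the sketch is simplified: it compares scalars with strict equality and ignores MongoDB's element-wise matching of array-valued fields.

```typescript
type Doc = { [key: string]: unknown };

interface LookupConfig {
  from: string;
  localField: string;
  foreignField: string;
  as: string;
}

// Read a dotted path such as "data.vendorId" from a nested object.
function getPath(doc: Doc, path: string): unknown {
  let current: unknown = doc;
  for (const key of path.split('.')) {
    if (current === null || typeof current !== 'object') return undefined;
    current = (current as Doc)[key];
  }
  return current;
}

// HYPOTHETICAL in-memory equivalent of a simple equality $lookup.
function lookupSketch(input: Doc[], collections: { [name: string]: Doc[] }, cfg: LookupConfig): Doc[] {
  const foreign = collections[cfg.from] ?? [];
  return input.map((doc) => {
    const key = getPath(doc, cfg.localField);
    // Every input document survives; `as` is always an array.
    const matches = foreign.filter((f) => getPath(f, cfg.foreignField) === key);
    return { ...doc, [cfg.as]: matches };
  });
}

// Toy data: one bill with a known vendor, one without.
const bills: Doc[] = [
  { docId: 'b1', data: { vendorId: 'v1', amount: 100 } },
  { docId: 'b2', data: { vendorId: 'v9', amount: 50 } },
];
const contacts: Doc[] = [{ docId: 'v1', data: { name: 'Acme' } }];

const joined = lookupSketch(bills, { contacts }, {
  from: 'contacts',
  localField: 'data.vendorId',
  foreignField: 'docId',
  as: 'vendor',
});
// joined[0]['vendor'] has one contact; joined[1]['vendor'] is empty
```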

.addFields(fields)

Append computed fields to each document as an $addFields stage.

.addFields({
  // $literal stops MongoDB from parsing '$' as a field path.
  'data.totalFormatted': { $concat: [{ $literal: '$' }, { $toString: '$data.total' }] },
})

Multiple .addFields() calls accumulate. Use it for derived fields in the result shape when you don't want a full $project.
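Unlike $project, $addFields keeps every existing field, and a dotted key such as 'data.totalFormatted' writes into the existing data subdocument instead of replacing it. The sketch below reproduces that effect for the formatted-total example on a plain object. addTotalFormatted and setPath are hypothetical helpers written for illustration, and MongoDB's number-to-string conversion is approximated with JavaScript's String().

```typescript
type Doc = { [key: string]: unknown };

// Set a dotted path immutably, copying each intermediate object on the way down.
function setPath(doc: Doc, path: string, value: unknown): Doc {
  const [head, ...rest] = path.split('.');
  if (rest.length === 0) return { ...doc, [head]: value };
  const child = doc[head];
  const inner: Doc = child !== null && typeof child === 'object' ? (child as Doc) : {};
  return { ...doc, [head]: setPath(inner, rest.join('.'), value) };
}

// HYPOTHETICAL per-document equivalent of the formatted-total $addFields stage.
function addTotalFormatted(doc: Doc): Doc {
  const data = (doc['data'] ?? {}) as Doc;
  const total = data['total'];
  // A missing or null total makes $toString, and therefore $concat, yield null.
  const formatted = total === undefined || total === null ? null : '$' + String(total);
  return setPath(doc, 'data.totalFormatted', formatted);
}

const input: Doc = { docId: 'b1', data: { vendorId: 'v1', total: 1250 } };
const out = addTotalFormatted(input);
// out.data keeps vendorId and total and gains totalFormatted: '$1250'
```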

Running on ClickHouse

Aggregations are the idiomatic use case for routing a query to ClickHouse. The same fluent chain applies; only the provider changes:

const totals = await bob.flowQuery('finance/bill', { provider: 'clickhouse' })
  .statusFinal()
  .aggregate([
    { $group: { _id: '$data.vendorId', total: { $sum: '$data.amount' } } },
  ])
  .getMany()

If you use a stage that the target backend doesn't support, the compiler throws at compile time rather than failing silently at execution.
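As a rough illustration of what translating a stage into ClickHouse SQL involves, and of failing fast on unsupported stages, here is a deliberately tiny translator that handles only a single $group with $sum accumulators. The function names, the bills table, and the rule that maps $data.vendorId to a data_vendorId column are all hypothetical; Flow Query's actual ClickHouse schema and compiler are not described here. The status filter is left out to keep the sketch short.

```typescript
type Stage = { [op: string]: unknown };

// HYPOTHETICAL column mapping: "$data.vendorId" -> "data_vendorId".
function columnFor(fieldRef: unknown): string {
  if (typeof fieldRef !== 'string' || fieldRef.charAt(0) !== '$') {
    throw new Error('Expected a field reference such as "$data.amount", got ' + JSON.stringify(fieldRef));
  }
  return fieldRef.slice(1).replace(/\./g, '_');
}

// Translates exactly one $group stage whose accumulators are all $sum.
// Anything else throws while the SQL is being built, before any query is sent.
function toClickHouseSql(table: string, stages: Stage[]): string {
  if (stages.length !== 1) throw new Error('Sketch supports exactly one stage');
  const ops = Object.keys(stages[0]);
  if (ops.length !== 1 || ops[0] !== '$group') {
    throw new Error('Unsupported stage: ' + ops.join(', '));
  }
  const group = stages[0]['$group'] as { [field: string]: unknown };
  const key = columnFor(group['_id']);
  const selects = [key + ' AS _id'];
  for (const name of Object.keys(group)) {
    if (name === '_id') continue;
    const spec = group[name] as { [op: string]: unknown };
    if (spec === null || typeof spec !== 'object' || !('$sum' in spec)) {
      throw new Error('Unsupported accumulator for "' + name + '"');
    }
    selects.push('sum(' + columnFor(spec['$sum']) + ') AS ' + name);
  }
  return 'SELECT ' + selects.join(', ') + ' FROM ' + table + ' GROUP BY ' + key;
}

const sql = toClickHouseSql('bills', [
  { $group: { _id: '$data.vendorId', total: { $sum: '$data.amount' } } },
]);
// SELECT data_vendorId AS _id, sum(data_amount) AS total FROM bills GROUP BY data_vendorId
```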

Composition with filters

Filter methods (.where, .status, .dataType, etc.) run as the initial $match in the compiled pipeline, so aggregation stages see an already-filtered set:

// Equivalent to:
// [{ $match: { status: { $in: statusesResolved } } },
//  { $group: { ... } },
//  { $sort: { total: -1 } }]
await bob.flowQuery('finance/bill')
  .statusFinal()
  .aggregate([
    { $group: { _id: '$data.vendorId', total: { $sum: '$data.amount' } } },
    { $sort: { total: -1 } },
  ])
  .getMany()
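The effect of running filters first can be checked with an in-memory sketch of the same three stages. FINAL_STATUSES is a hypothetical placeholder for whatever .statusFinal() resolves to, totalsByVendor is an illustrative helper, and the bills are toy data.

```typescript
interface Bill {
  status: string;
  data: { vendorId: string; amount: number };
}

// HYPOTHETICAL stand-in for the statuses .statusFinal() resolves to.
const FINAL_STATUSES = ['final'];

function totalsByVendor(bills: Bill[]): { _id: string; total: number }[] {
  // Initial $match: only bills with a final status reach the grouping step.
  const matched = bills.filter((b) => FINAL_STATUSES.indexOf(b.status) !== -1);
  // $group: { _id: '$data.vendorId', total: { $sum: '$data.amount' } }
  const totals: { [vendorId: string]: number } = {};
  for (const b of matched) {
    totals[b.data.vendorId] = (totals[b.data.vendorId] ?? 0) + b.data.amount;
  }
  // $sort: { total: -1 }
  return Object.keys(totals)
    .map((id) => ({ _id: id, total: totals[id] }))
    .sort((a, b) => b.total - a.total);
}

const bills: Bill[] = [
  { status: 'final', data: { vendorId: 'v1', amount: 100 } },
  { status: 'draft', data: { vendorId: 'v2', amount: 999 } },
  { status: 'final', data: { vendorId: 'v2', amount: 40 } },
  { status: 'final', data: { vendorId: 'v1', amount: 25 } },
];

const result = totalsByVendor(bills);
// [{ _id: 'v1', total: 125 }, { _id: 'v2', total: 40 }]
```

Without the leading $match, the draft bill would raise v2's total to 1039 and put it first.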

Read terminals work normally

.getMany() is the typical terminal for an aggregation, but everything that works on a filter also works on an aggregation: .getOne, .count, .paginate, .pluck, .distinct, .getManyInBatches. The compiler adapts the pipeline per terminal.
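One plausible way a compiler could adapt an aggregation pipeline to each terminal is to append a terminal-specific tail of standard MongoDB stages ($limit, $count, $skip, $group). The sketch below shows that idea for a subset of terminals; adaptForTerminal, the Terminal shape, and the 1-based page numbering are assumptions for illustration, not Flow Query's implementation.

```typescript
type Stage = { [op: string]: unknown };

// HYPOTHETICAL terminal descriptors mirroring a few documented read terminals.
type Terminal =
  | { kind: 'getMany' }
  | { kind: 'getOne' }
  | { kind: 'count' }
  | { kind: 'paginate'; page: number; pageSize: number }
  | { kind: 'distinct'; field: string };

function adaptForTerminal(pipeline: Stage[], terminal: Terminal): Stage[] {
  switch (terminal.kind) {
    case 'getMany':
      return pipeline.slice();
    case 'getOne':
      return pipeline.concat([{ $limit: 1 }]);
    case 'count':
      return pipeline.concat([{ $count: 'count' }]);
    case 'paginate':
      // Assumes 1-based pages.
      return pipeline.concat([
        { $skip: (terminal.page - 1) * terminal.pageSize },
        { $limit: terminal.pageSize },
      ]);
    case 'distinct':
      // Grouping on the field yields one output document per distinct value.
      return pipeline.concat([{ $group: { _id: '$' + terminal.field } }]);
    default: {
      const unreachable: never = terminal;
      throw new Error('Unknown terminal: ' + JSON.stringify(unreachable));
    }
  }
}

const aggregation: Stage[] = [
  { $group: { _id: '$data.vendorId', total: { $sum: '$data.amount' } } },
];
const counted = adaptForTerminal(aggregation, { kind: 'count' });
const paged = adaptForTerminal(aggregation, { kind: 'paginate', page: 2, pageSize: 10 });
```

Because each terminal only appends a tail, the user's stages run unchanged whichever terminal is chosen.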
