Skip to main content
Legacy API

This page documents the legacy @logic-bee/flow-query package, which has been superseded by @logic-bee/data-flow. See Aggregations and the migration guide for equivalents. New code should use the new API.

Aggregations

Aggregations are a way to group, join, and process data from the database. They are a powerful tool for deriving insights from your data.

aggregate()

Use the aggregate() function to run an aggregation pipeline:

// -->Get: the flow collection
const inventoryFlowCollection = bob.flowGlobal.getFlowCollection(bob.cfpPath);
// -->Set: query for stock movements
let flowQuery = inventoryFlowCollection.docs.flowQuery()
.user(bob.flowUser)
.addPolicy('canRead')
.flowOptions(bob.options.stockMovementNaoQueryOptions)
.query({
'data.status': { $in: bob.options.allowedStatus }
})
.sort({
'info.createdAt': -1
});

// -->Query: group stock movements
flowQuery = flowQuery.aggregate([
{
$match: {
...flowQuery.getQuery().query,
},
},
{
$sort: flowQuery.getQuery().queryOptions.sort
},
{
$group: {
_id: {
itemId: '$data.itemId',
},
itemId: { $first: '$data.itemId' },
name: { $first: '$data.name' },
}
},
]);

// -->Search: all the stock movements
const stockMovementData: any = await inventoryFlowCollection.docs.search<any>(
bob.flowUser,
{ flowQuery },
bob.dbSession()
);

$lookup (Joins)

You can make aggregations more complex by using $lookup to execute a pipeline on a joined document:

flowQuery = flowQuery.aggregate([
{
$match: {
...workCenterFlowQuery.getQuery().query,
},
},
{
$project: {
...workCenterFlowQuery.getQuery().queryOptions.projection,
}
},
{
'$lookup': {
'from': 'data',
'let': { 'workCenterId': '$docId' },
'as': 'workOrders',
'pipeline': [
{
'$match': {
...workOrderFlowQuery.getQuery().query,
'$expr': {
'$and': [
{ '$eq': [ '$data.workCenterId', '$$workCenterId' ] },
]
}
}
},
{
$sort: workOrderFlowQuery.getQuery().queryOptions.sort
},
{
$limit: workOrderFlowQuery.getQuery().queryOptions.limit
},
{
$project: workOrderFlowQuery.getQuery().queryOptions.projection
},
]
}
}
]);

vectorSearch()

Use the aggregate() function with $vectorSearch for vector-based similarity search:

const accuracy = 0.4;
// -->Get: the flow collection
const inventoryFlowCollection = bob.flowGlobal.getFlowCollection(bob.cfpPath);
// -->Set: query for stock movements
let flowQuery = inventoryFlowCollection.docs.flowQuery()
.user(bob.flowUser)
.addPolicy('canRead')
.flowOptions(bob.options.stockMovementNaoQueryOptions)

// -->Set: aggregation pipeline
const agg = [
{
'$vectorSearch': {
'index': 'search_files_by_knn_vectors_vertex',
'path': 'data.knnVectorVertex',
'queryVector': textEmbed.textEmbedding,
'numCandidates': 100,
'limit': 10,
}
},
{
'$project': {
...queryData.queryOptions.projection,
'cosineScore': {
'$meta': 'vectorSearchScore'
}
}
},
{
$match: {
...queryData.query,
cosineScore: { $gte: accuracy }
},
},
{
$skip: queryData.queryOptions.skip
},
{
$limit: queryData.queryOptions.limit
},
{
$sort: {
cosineScore: -1
}
},
];

// -->Add: aggregator
flowQuery = flowQuery.aggregate(agg);
// -->Search: documents
const search$ = await flowCollection.docs.search(bob.flowUser, { flowQuery });

Debug

To display the raw MongoDB aggregation pipeline (useful for pasting into Compass):

loggStringify(flowQuery.flowAggregate().getRawAggregator().pipeline)