
Aggregation

The results of a secondary index query can be processed in several ways. A common one is aggregation, where you apply a function to the entire result set of a query.

Many developers use SQL to define aggregation queries against a database. For example, the following SQL statement counts matching rows in a table:

SELECT count(*)
FROM test.demo
WHERE d = 50

This counts the number of records in test.demo whose column d equals 50. In Aerospike, you can perform the same aggregation with a query and a Stream UDF.
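Conceptually, the SQL statement filters records on d = 50 and counts the matches. The minimal sketch below shows only those semantics over an in-memory array; the demo_record type and the count_where_d_equals() function are hypothetical and are not part of the Aerospike client.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical in-memory stand-in for a record in test.demo
   with an integer column (bin) named d. */
typedef struct {
    int64_t d;
} demo_record;

/* Equivalent of: SELECT count(*) FROM test.demo WHERE d = value */
size_t count_where_d_equals(const demo_record *records, size_t n, int64_t value)
{
    assert(records != NULL || n == 0);

    size_t count = 0;
    for (size_t i = 0; i < n; i++) {
        if (records[i].d == value) { /* the WHERE d = value filter */
            count++;                 /* the count(*) aggregate */
        }
    }
    return count;
}
```

In Aerospike, the filter corresponds to the query's where clause and the counting corresponds to the Stream UDF applied to the query results.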

Read these sections before continuing:

Defining a Query

See Query Records to learn how to define a query. The following example defines one:

as_query query;
as_query_init(&query, "test", "demoset");

as_query_where_inita(&query, 1);
as_query_where(&query, "d", integer_equals(50));

as_query_apply(&query, "mymodule", "mycount", NULL);

This query object selects records in namespace test, set demoset, whose bin d holds the integer value 50. The where clause requires a numeric secondary index on bin d. The query then applies the mycount() Stream UDF, defined in the mymodule UDF module, to the result set.

Note: Queries without a "where" clause result in a full database scan.

Defining a Stream UDF

The mycount() Stream UDF processes the stream of query results:

local function one(rec)
    return 1
end

local function add(a, b)
    return a + b
end

function mycount(stream)
    return stream : map(one) : reduce(add);
end

mycount() receives the stream of query results and chains two operations onto it:

  • map: Maps each value in the stream to another value. Here the mapping function is one(), which maps every record to the value 1.
  • reduce: Reduces the values in the stream to a single value. Here the reduction function is add(), which combines two values by adding them; the values being added are the 1s produced by map.

The end result is a stream that contains a single value: the count (or the sum of 1 for each record in the result set).
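The mycount() pipeline can be mirrored in plain C to make the two steps concrete. This is an analogy only: stream_count() and the record type are hypothetical, an array stands in for the stream, and nothing here models how the server runs the UDF across cluster nodes.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical record type; counting ignores its contents. */
typedef struct {
    int64_t d;
} record;

/* map step, like one(rec): every record becomes the value 1. */
static int64_t one(const record *rec)
{
    (void)rec;
    return 1;
}

/* reduce step, like add(a, b): combine two values into one. */
static int64_t add(int64_t a, int64_t b)
{
    return a + b;
}

/* Sketch of stream : map(one) : reduce(add) over n records.
   The accumulator starts at 0, the identity for add, so an empty
   stream yields 0 in this sketch. */
int64_t stream_count(const record *stream, size_t n)
{
    assert(stream != NULL || n == 0);

    int64_t acc = 0;
    for (size_t i = 0; i < n; i++) {
        acc = add(acc, one(&stream[i]));
    }
    return acc;
}
```

Starting the accumulator at 0 is a simplification chosen for the sketch; the real Stream UDF may produce no value at all when the result set is empty.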

Registering UDFs

Before running a query that uses the Stream UDF, you must register the UDF module with the Aerospike server:

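as_error err;

// udf_content is an as_bytes holding the contents of mymodule.lua.
// Register the file under its file name; queries refer to the module
// by that name without the .lua extension ("mymodule").
if (aerospike_udf_put(&as, &err, NULL, "mymodule.lua", AS_UDF_TYPE_LUA,
        &udf_content) != AEROSPIKE_OK) {
    fprintf(stderr, "aerospike_udf_put() returned %d - %s\n", err.code, err.message);
}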
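The registration call assumes udf_content already holds the bytes of the Lua file. One way to obtain them is to read the file into a heap buffer with standard C I/O, as sketched below; read_file() is a hypothetical helper, not a client function. The buffer could then be wrapped for the call, for example with as_bytes_init_wrap(&udf_content, buf, (uint32_t)size, true), where the final true asks the as_bytes to free the buffer when it is destroyed.

```c
#include <assert.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>

/* Hypothetical helper: reads the whole file at path into a malloc'd
   buffer. On success, returns the buffer and stores its length in
   *size; the caller owns the buffer and must free it. Returns NULL
   on any error. */
uint8_t *read_file(const char *path, size_t *size)
{
    assert(path != NULL && size != NULL);

    FILE *f = fopen(path, "rb");
    if (f == NULL) {
        return NULL;
    }

    /* Determine the file length. */
    if (fseek(f, 0, SEEK_END) != 0) {
        fclose(f);
        return NULL;
    }
    long len = ftell(f);
    if (len < 0 || fseek(f, 0, SEEK_SET) != 0) {
        fclose(f);
        return NULL;
    }

    /* Allocate at least 1 byte so an empty file still yields a buffer. */
    uint8_t *buf = malloc(len > 0 ? (size_t)len : 1);
    if (buf == NULL) {
        fclose(f);
        return NULL;
    }

    if (fread(buf, 1, (size_t)len, f) != (size_t)len) {
        free(buf);
        fclose(f);
        return NULL;
    }

    fclose(f);
    *size = (size_t)len;
    return buf;
}
```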

Executing Queries

Execute the query with aerospike_query_foreach():

if (aerospike_query_foreach(&as, &err, NULL, &query, each_value, NULL) != AEROSPIKE_OK) {
    fprintf(stderr, "err(%d) %s at [%s:%d]\n", err.code, err.message, err.file, err.line);
}

Processing the Results

The client calls each_value() once for each value the query returns, and once more with a NULL value when the query is complete:

bool each_value(const as_val *val, void *udata) {
    if (val == NULL) {
        // query is complete
        return true;
    }

    as_integer *ival = as_integer_fromval(val);

    if (ival == NULL) {
        // abort the query
        return false;
    }

    // process the value

    return true;
}

For this query, the callback receives a single integer: the count of the records that satisfy the query.

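To make your own object (for example, a struct that stores the result) available on every callback invocation, pass a pointer to it as the udata argument of aerospike_query_foreach(); the callback receives it as its udata parameter.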
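The callback contract can be simulated without a cluster. In the sketch below, run_foreach() is a hypothetical stand-in for aerospike_query_foreach(): it passes each value to the callback together with the caller's udata pointer, stops early if the callback returns false, and otherwise finishes with a NULL value to signal completion. Plain int64_t values replace as_val, count_result is an illustrative udata struct, and treating a negative value as unexpected is an assumption of the sketch, so it shows only the control flow, not the real client types or behavior.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Same shape as each_value(), with int64_t standing in for as_val.
   A NULL value means the stream is complete. */
typedef bool (*foreach_cb)(const int64_t *val, void *udata);

/* Hypothetical stand-in for aerospike_query_foreach(). Returns false
   if the callback aborted, true otherwise. */
bool run_foreach(const int64_t *vals, size_t n, foreach_cb cb, void *udata)
{
    assert(vals != NULL || n == 0);

    for (size_t i = 0; i < n; i++) {
        if (!cb(&vals[i], udata)) {
            return false;   /* the callback asked to abort */
        }
    }
    cb(NULL, udata);        /* signal completion */
    return true;
}

/* Illustrative udata: where the callback stores the aggregate. */
typedef struct {
    int64_t count;
    bool done;
} count_result;

bool store_count(const int64_t *val, void *udata)
{
    count_result *res = udata;

    if (val == NULL) {
        res->done = true;   /* query is complete */
        return true;
    }
    if (*val < 0) {
        return false;       /* unexpected value: abort */
    }
    res->count = *val;      /* keep the count for the caller */
    return true;
}
```

Passing &res as udata here corresponds to replacing the final NULL argument in the aerospike_query_foreach() call with a pointer to your own struct.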

Cleaning Up Resources

After the query results have been processed, the client can safely destroy the query object and its member objects with as_query_destroy(). The example avoids an explicit as_query_destroy() call because it allocates the as_query object on the stack and uses as_query_where_inita(), which allocates the where clause on the stack rather than on the heap.