Skip to main content

Implementing SQL Operations: Aggregates (Part 2)

For an interactive Jupyter notebook experience: Binder

This tutorial is Part 2 of how to implement SQL aggregate queries in Aerospike.

This notebook requires the Aerospike Database running locally with Java kernel and Aerospike Java Client. To create a Docker container that satisfies the requirements and holds a copy of Aerospike notebooks, visit the Aerospike Notebooks Repo.

Introduction

In this notebook, we will see how specific aggregate statements in SQL can be implemented in Aerospike.

SQL is a widely known data access language. The examples in this notebook provide patterns for implementing specific SQL aggregate queries. You should be able to understand them and find them useful even without deep familiarity with SQL.

This notebook is the third in the SQL Operations series that consists of the following notebooks:

  • Implementing SQL Operations: SELECT
  • Implementing SQL Operations: Aggregate functions Part 1 and 2 (this notebook)
  • Implementing SQL Operations: UPDATE, CREATE, and DELETE

Part 1 of Aggregate functions describes simpler aggregate processing of a stream of records.

The specific topics and aggregate functions we discuss in this notebook include:

  • Stream Partitioning with GROUP BY
    • Filtering partitions: HAVING
    • Sorting partitions: ORDER BY
  • Additional aggregate functions
    • DISTINCT
    • LIMIT
    • TOP N

The purpose of this notebook is to illustrate Aerospike implementation for specific SQL operations. Check out Aerospike Presto Connector for ad-hoc SQL access to Aerospike data.

Prerequisites

This tutorial assumes familiarity with the following topics:

Working with UDF Module

All UDF functions for this notebook are placed in "aggregate_fns.lua" file under the "udf" subdirectory. If the subdirectory or file is not there, you may download the file from here and place it there using the notebook's File->Open followed by Upload/New menus.

You are encouraged to experiment with the Lua code in the module. Be sure to save your changes and then run the convenience function "registerUDF()" in a code cell for the changes to take effect.

Initialization

Ensure database is running

This notebook requires that Aerospike Database is running.

import io.github.spencerpark.ijava.IJava;
import io.github.spencerpark.jupyter.kernel.magic.common.Shell;
IJava.getKernelInstance().getMagics().registerMagics(Shell.class);
%sh asd

Download and install additional components.

Install the Java client.

%%loadFromPOM
<dependencies>
<dependency>
<groupId>com.aerospike</groupId>
<artifactId>aerospike-client</artifactId>
<version>5.0.0</version>
</dependency>
</dependencies>

Connect to database and populate test data

The test data has 1000 records with user-key "id-1" through "id-1000", two integer bins (fields) "bin1" (1-1000) and "bin2" (1001-2000), and one string bin "bin3" (random 5 values "A" through "E"), in the namespace "test" and set "sql-aggregate".

import com.aerospike.client.AerospikeClient;
import com.aerospike.client.Bin;
import com.aerospike.client.Key;
import com.aerospike.client.policy.WritePolicy;
import java.util.Random;

String[] groups = {"A", "B", "C", "D", "E"};
Random rand = new Random(1);

AerospikeClient client = new AerospikeClient("localhost", 3000);
System.out.println("Initialized the client and connected to the cluster.");

String Namespace = "test";
String Set = "sql-aggregate";

WritePolicy wpolicy = new WritePolicy();
wpolicy.sendKey = true;
for (int i = 1; i <= 1000; i++) {
Key key = new Key(Namespace, Set, "id-"+i);
Bin bin1 = new Bin(new String("bin1"), i);
Bin bin2 = new Bin(new String("bin2"), 1000+i);
Bin bin3 = new Bin(new String("bin3"), groups[rand.nextInt(groups.length)]);
client.put(wpolicy, key, bin1, bin2, bin3);
}

System.out.format("Test data populated");;

Output:

Initialized the client and connected to the cluster.
Test data populated

Create a secondary index

To use the query API with index based filter, a secondary index must exist on the filter bin. Here we create a numeric index on "bin1" in "sql-aggregate" set.

import com.aerospike.client.policy.Policy;
import com.aerospike.client.query.IndexType;
import com.aerospike.client.task.IndexTask;
import com.aerospike.client.AerospikeException;
import com.aerospike.client.ResultCode;

String IndexName = "test_sql_aggregate_bin1_number_idx";

Policy policy = new Policy();
policy.socketTimeout = 0; // Do not timeout on index create.

try {
IndexTask task = client.createIndex(policy, Namespace, Set, IndexName,
"bin1", IndexType.NUMERIC);
task.waitTillComplete();
}
catch (AerospikeException ae) {
if (ae.getResultCode() != ResultCode.INDEX_ALREADY_EXISTS) {
throw ae;
}
}

System.out.format("Created number index %s on ns=%s set=%s bin=%s.",
IndexName, Namespace, Set, "bin1");;

Output:

Created number index test_sql_aggregate_bin1_number_idx on ns=test set=sql-aggregate bin=bin1.

Execution Model for Processing Aggregates

(This section is repeated for convenience from Part 1. Please skip to the the next section if you are familiar with the execution model.)

Processing aggregates in Aerospike involves processing a stream of records through a pipeline of operators on server as well as client.

Four types of operators are supported: Filter, Map, Aggregate, and Reduce. The operators work with one of the following data types as input and output: Record, Integer, String, Map (the data type, not to be confused with the Map operator), and List. Only the initial filter(s) and first non-filter operator in the pipeline can consume Record type.

  • Filter: Object -> Boolean; filters input objects, input and output objects are of the same type.
  • Map: Object -> Object; any transformation is possible.
  • Aggregate: (Current State, Object) -> New State; maintains the global "aggregate" state of the stream. While any type can be used, a (Aerospike) Map type is often used.
  • Reduce: (Object, Object) -> Object; reduces two objects to a single object of the same type.

The operators may appear any number of times and in any order in the pipeline.

The operator pipeline is typically processed in two phases: first phase on server nodes and the second phase on client.

  • Phase 1: Server nodes execute all operators up to and including the first reduce operation in the pipeline.
  • Phase 2: The client processes results from multiple nodes through the remaining pipeline operators starting with and including the first reduce operation in the pipeline.

Thus, the first reduce operation if specified in the pipeline is executed on all server nodes as well as on client. If there is no reduce operator in the pipeline, the application will receive the combined results returned from server nodes.

Post aggregation processing involves operators after the first reduce in the pipeline, usually for sorting, filtering, and final transformation, and takes place on the client side.

Aggregation processing in Aerospike is defined using User Defined Functions (UDFs). UDFs are written in Lua with arbitrary logic and are executed on both server and client as explained above. Since aggregates by definition involve multiple records, only stream UDFs are discussed below (versus record UDFs whose scope of execution is a single record).

A stream UDF specifies the pipeline of operators for processing aggregates. Different aggregates differ in their UDF functions, whereas the Aerospike APIs are the same to specify the aggregate processing.

The UDFs and logic are described in appropriate sections for each aggregate function below. For additional context and details, please refer to the documentation.

Register UDF

Note, all UDF functions for this notebook are assumed to be in "aggregate_fns.lua" file under "udf" directory. Please refer to "Working with UDF Module" section above.

Register the UDF with the server by executing the following code cell.

The registerUDF() function below can be run conveniently when the UDF is modified (you are encouraged to experiment with the UDF code). The function invalidates the cache, removes the currently registered module, and registers the latest version.

import com.aerospike.client.policy.Policy;
import com.aerospike.client.task.RegisterTask;
import com.aerospike.client.Language;
import com.aerospike.client.lua.LuaConfig;
import com.aerospike.client.lua.LuaCache;

LuaConfig.SourceDirectory = "../udf";
String UDFFile = "aggregate_fns.lua";
String UDFModule = "aggregate_fns";

void registerUDF() {
// clear the lua cache
LuaCache.clearPackages();
Policy policy = new Policy();
// remove the current module, if any
client.removeUdf(null, UDFFile);
RegisterTask task = client.register(policy, LuaConfig.SourceDirectory+"/"+UDFFile,
UDFFile, Language.LUA);
task.waitTillComplete();
System.out.format("Registered the UDF module %s.", UDFFile);;
}

registerUDF();

Output:

Registered the UDF module aggregate_fns.lua.

Stream Partitioning with GROUP BY

SELECT bin1, agg(bin2) FROM namespace.set WHERE inner-condition GROUP BY bin1

GROUP BY processing partitions the record stream into multiple partitions, one for each distinct value of the grouped-by bin. The aggregate operator in the pipeline outputs a nested map - an outer map of all partitions or distinct values of the grouped-by bin, and an inner map for each partition to maintain each partition's aggregates. The bins aggregated for each group, such as agg(bin2) in the above SQL statement, are stored within a group's map.

Reduce uses map-merge to merge partial aggregates. Since map-merge currently does not handle nested maps, merging at multiple levels have to be explicitly specified as shown.

The filter "inner-condition" can be specified on any bins in the record, and can be processed using a query predicate filter and/or a stream filter operator. This is as described in Part 1, and so the example below will omit the WHERE clause for simplicity.

SELECT bin1, SUM(bin2) FROM test.sql-aggregate GROUP BY bin1

We will implement a new UDF "groupby_with_sum" for this.

GROUPBY_WITH_SUM

It takes two bins: the bin to group-by and the bin to sum. The pipeline consists of map, aggregate, and reduce operators.

  • the map function "rec_to_group_and_bin" adds a group tag, and return a map containing the group and the value of bin to sum.
  • the aggregate function "group_sum" takes the current aggregate state and "groupval" map and returns the new aggregate state. It creates a map for each distinct group value and adds the value tagged for a group to the group's sum
  • the reduce function "merge_group_sum" is a nested map merge that merges maps explicitly at the two levels.

-- nested map merge for group-by sum/count; explicit map merge at each nested level
local function merge_group_sum(a, b)
local function merge_group(x, y)
-- inner map merge
return map.merge(x, y, add_values)
end
-- outer map merge
return map.merge(a, b, merge_group)
end

-- aggregate for group-by sum
-- creates a map for each distinct group value and adds the value tagged for a group to the group's sum
local function group_sum(agg, groupval)
if not agg[groupval["group"]] then agg[groupval["group"]] = map() end
agg[groupval["group"]]["sum"] = (agg[groupval["group"]]["sum"] or 0) + (groupval["value"] or 0)
return agg
end

-- group-by with sum
function groupby_with_sum(stream, bin_grpby, bin_sum)
local function rec_to_group_and_bin(rec)
-- tag the group by bin_grpby value, return a map containing group and bin_sum value
local ret = map()
ret["group"] = rec[bin_grpby]
local val = rec[bin_sum]
if (not val or type(val) ~= "number") then val = 0 end
ret["value"] = val
return ret
end
return stream : map(rec_to_group_and_bin) : aggregate(map(), group_sum) : reduce(merge_group_sum)
end

import com.aerospike.client.query.Statement;
import com.aerospike.client.Value;
import com.aerospike.client.query.RecordSet;
import com.aerospike.client.query.ResultSet;

Statement stmt = new Statement();
stmt.setNamespace(Namespace);
stmt.setSetName(Set);
stmt.setAggregateFunction(UDFModule, "groupby_with_sum", Value.get("bin3"), Value.get("bin2"));
ResultSet rs = client.queryAggregate(null, stmt);
System.out.println("Executed GROUP BY with SUM.");
while (rs.next()) {
Object obj = rs.getObject();
System.out.format("Returned object: %s", obj.toString());
}
rs.close();

Output:

Executed GROUP BY with SUM.
Returned object: {A={sum=276830}, B={sum=296246}, C={sum=260563}, D={sum=332231}, E={sum=334630}}

Filtering Partitions: HAVING

SELECT bin1, agg(bin2) FROM namespace.set WHERE inner-condition GROUP BY bin1 HAVING outer-condition

Note the inner filter "inner-condition" can be specified using any bins in the record, whereas the outer filter and ORDER BY must use selected (aggregated) bins from the query. We will focus on the outer condition in the following example which outputs the count of distinct bin3 values in the range "B" and "E".

SELECT bin3, COUNT(*) FROM test.sql-aggreate GROUP BY bin3 HAVING "B" <= bin3 AND bin3 <= "E"

Processing for Having clause can be done by using a filter operator after reduce.

Here we implement a new UDF "groupby_with_count_having" for this.

GROUPBY_WITH_COUNT_HAVING

It takes the group-by bin and the range values for the groups. The pipeline consists of map, aggregate, reduce, and filter operators.

  • the map function "rec_to_group" simply returns the group-by bin value.
  • the aggregate function "group_count" takes the current aggregate state and a record's group and returns the new aggregate state. It creates a map for each distinct group value and increments the input group's count.
  • the reduce function "merge_group_sum" is a nested map merge that merges maps explicitly at the two levels.
  • the filter function "process_having" iterates over the nested map, applies the filter condition, and returns a slice of the input map.
-- aggregate for group-by count
-- creates a map for each distinct group value and increments the tagged group's count
local function group_count(agg, group)
if not agg[group] then agg[group] = map() end
agg[group]["count"] = (agg[group]["count"] or 0) + ((group and 1) or 0)
return agg
end

-- map function for group-by processing
local function rec_to_group_closure(bin_grpby)
local function rec_to_group(rec)
-- returns group-by bin value in a record
return rec[bin_grpby]
end
return rec_to_group
end

-- group-by having example: count(*) having low <= count <= high
function groupby_with_count_having(stream, bin_grpby, having_range_low, having_range_high)
local function process_having(stats)
-- filters groups with count in the range
local ret = map()
for key, value in map.pairs(stats) do
if (key >= having_range_low and key <= having_range_high) then
ret[key] = value
end
end
return ret
end
return stream : map(rec_to_group_closure(bin_grpby)) : aggregate(map(), group_count)
: reduce(merge_group_sum) : map(process_having)
end

Statement stmt = new Statement();
stmt.setNamespace(Namespace);
stmt.setSetName(Set);
stmt.setAggregateFunction(UDFModule, "groupby_with_count_having", Value.get("bin3"), Value.get("B"), Value.get("D"));
ResultSet rs = client.queryAggregate(null, stmt);
System.out.println("Executed GROUP BY with COUNT and HAVING.");
while (rs.next()) {
Object obj = rs.getObject();
System.out.format("Returned object: %s", obj.toString());
}
rs.close();

Output:

Executed GROUP BY with COUNT and HAVING.
Returned object: {D={count=222}, B={count=196}, C={count=172}}

Sorting Partitions: ORDER BY

SELECT bin1, agg(bin2) FROM namespace.set WHERE inner-condition GROUP BY bin1 HAVING outer-condition ORDER BY bin

In the following example, the count of distinct bin3 values is produced in descending order.

SELECT bin3, COUNT(*) FROM test.sql-aggregate GROUP BY bin3 ORDER BY COUNT

Processing for Order By clause can be done by using a map operator at the end that outputs an ordered list.

The UDF "groupby_with_count_orderby" is very similar to the HAVING example.

GROUPBY_WITH_COUNT_ORDERBY

It takes two bins to group-by order-by. The pipeline consists of map, aggregate, reduce, and map operators.

  • the map function "rec_to_group" (see above) simply returns the group-by bin value.
  • the aggregate function "group_count"(see above) takes the current aggregate state and a record's group and returns the new aggregate state. It creates a map for each distinct group value and increments the input group's count.
  • the reduce function "merge_group_sum"(see above) is a nested map merge that merges maps explicitly at the two levels.
  • the map function "process_orderby" uses lua table's sort function to sort the aggregate map into a flattened ordered list in this format [k1, v1, k2, v2, ...].
-- group-by count(*) order-by count
function groupby_with_count_orderby(stream, bin_grpby, bin_orderby)
local function orderby(t, order)
-- collect the keys
local keys = {}
for k in pairs(t) do keys[#keys+1] = k end
-- sort by the order by passing the table and keys a, b,
table.sort(keys, function(a,b) return order(t, a, b) end)
-- return the iterator function
local i = 0
return function()
i = i + 1
if keys[i] then
return keys[i], t[keys[i] ]
end
end
end
local function process_orderby(stats)
-- uses lua table sort to sort aggregate map into a list
-- list has k and v separately added for sorted entries
local ret = list()
local t = {}
for k,v in map.pairs(stats) do t[k] = v end
for k,v in orderby(t, function(t, a, b) return t[a][bin_orderby] < t[b][bin_orderby] end) do
list.append(ret, k)
list.append(ret, v)
end
return ret
end
return stream : map(rec_to_group) : aggregate(map(), group_count)
: reduce(merge_group_count) : map(process_orderby)
end

Statement stmt = new Statement();
stmt.setNamespace(Namespace);
stmt.setSetName(Set);
stmt.setAggregateFunction(UDFModule, "groupby_with_count_orderby", Value.get("bin3"), Value.get("count"));
ResultSet rs = client.queryAggregate(null, stmt);
System.out.println("Executed GROUP BY with COUNT and ORDER BY.");
while (rs.next()) {
Object obj = rs.getObject();
System.out.format("Returned object: %s", obj.toString());
}
rs.close();

Output:

Executed GROUP BY with COUNT and ORDER BY.
Returned object: [C, {count=172}, A, {count=187}, B, {count=196}, D, {count=222}, E, {count=223}]

More Aggregates: DISTINCT, LIMIT, and TOP N

Let us see how DISTINCT, LIMIT, and TOP N can be processed. Only the first two appear in SQL syntax, and the third is a special case of a LIMIT query.

DISTINCT

SELECT DISTINCT(bin) FROM namespace.set WHERE condition

DISTINCT can be processed by storing all values in a map (in the aggregate state) that is keyed on the value(s) of the bin(s) so only unique values are retained.

In the following example, distinct bin3 values are produced for records whose bin1 is in the range [101,200].

SELECT DISTINCT bin3 FROM test.sql-aggregate WHERE bin1 >= 101 bin1 <= 200

The UDF "distinct" implements a single bin distinct.

DISTINCT

It takes the bin and returns its distinct values. The pipeline consists of map, aggregate, reduce, and map operators.

  • the map function "rec_to_bin_value" simply returns the bin value.
  • the aggregate function "distinct_values" takes the current aggregate state and a value, and returns the new aggregate state. Only unique values are retained in a map as keys.
  • the reduce function "merge_values"is a map merge that merges two maps that has the union of their keys.
  • the map function "map_to_list" returns a list of map keys.
-- return map keys in a list
local function map_to_list(values)
local ret = list()
for k in map.keys(values) do list.append(ret, k) end
return ret
end

-- merge partial aggregate maps
local function merge_values(a, b)
return map.merge(a, b, function(v1, v2) return ((v1 or v2) and 1) or nil end)
end

-- map for distinct; using map unique keys
local function distinct_values(agg, value)
if value then agg[value] = 1 end
return agg
end

-- distinct
function distinct(stream, bin)
local function rec_to_bin_value(rec)
-- simply return bin value in rec
return rec[bin]
end
return stream : map(rec_to_bin_value) : aggregate(map(), distinct_values)
: reduce(merge_values) : map(map_to_list)
end

import com.aerospike.client.query.Filter;

Statement stmt = new Statement();
stmt.setNamespace(Namespace);
stmt.setSetName(Set);
// range filter using the secondary index on bin1
stmt.setFilter(Filter.range("bin1", 101, 200));
stmt.setAggregateFunction(UDFModule, "distinct", Value.get("bin3"));
ResultSet rs = client.queryAggregate(null, stmt);
System.out.println("Executed DISTINCT.");
while (rs.next()) {
Object obj = rs.getObject();
System.out.format("Returned object: %s", obj.toString());
}
rs.close();

Output:

Executed DISTINCT.
Returned object: [A, C, B, E, D]

LIMIT

SELECT bin FROM namespace.set WHERE condition LIMIT N

In the following example, up to 10 values in bin2 are produced for records whose bin1 is in the range [101,200].

SELECT bin2 FROM test.sql-aggregate WHERE bin1 >= 101 bin1 <= 200 LIMIT 10

The UDF "limit" returns a single bin with an upper limit on number of results returned.

LIMIT

It takes the bin and max limit. and returns up to max number of bin values. The pipeline consists of aggregate and reduce.

  • the aggregate function "list_limit" takes the current aggregate state and a record, and returns the new aggregate state by adding the record's bin value to a list only if the list size is below the limit.
  • the reduce function "list_merge_limit" merges two lists to retain only max number of values.
function limit(stream, bin, max)
local function list_limit(agg, rec)
-- add to list if the list size is below the limit
if list.size(agg) < max then
local ret = map()
ret[bin] = rec[bin]
list.append(agg, ret)
end
return agg
end
local function list_merge_limit(a, b)
local ret = list()
list.concat(ret, list.take(a, max))
list.concat(ret, list.take(b, (max > list.size(ret) and max-list.size(ret)) or 0))
return ret
end
return stream : aggregate(list(), list_limit) : reduce(list_merge_limit)
end

Statement stmt = new Statement();
stmt.setNamespace(Namespace);
// range filter using the secondary index on bin1
stmt.setFilter(Filter.range("bin1", 101, 200));
stmt.setSetName(Set);
stmt.setAggregateFunction(UDFModule, "limit", Value.get("bin2"), Value.get(10));
ResultSet rs = client.queryAggregate(null, stmt);
System.out.println("Executed LIMIT N.");
while (rs.next()) {
Object obj = rs.getObject();
System.out.format("Returned object: %s", obj.toString());
}
rs.close();

Output:

Executed LIMIT N.
Returned object: [{bin2=1128}, {bin2=1160}, {bin2=1192}, {bin2=1129}, {bin2=1161}, {bin2=1193}, {bin2=1130}, {bin2=1162}, {bin2=1194}, {bin2=1131}]

TOP N

SELECT bin FROM namespace.set WHERE condition ORDER BY bin DESC LIMIT N

TOP N can be processed by retaining top N values in a list in aggregate as well as reduce operators.

In the following example, top 10 values in bin2 are produced for records whose bin1 is in the range [101,200].

SELECT bin2 FROM test.sql-aggregate WHERE bin1 >= 101 bin1 <= 200 ORDER BY bin2 DESC LIMIT 10

The UDF "top_n" returns the top N values from a bin.

TOP_N

It takes the bin and N. and returns top N bin values. The pipeline consists of map, aggregate, reduce, and map.

  • the map function "rec_to_bin_value" simply returns the bin value.
  • the aggregate function "top_n_values" takes the current aggregate state and a record, and returns the new aggregate state by retaining distinct bin values in a map. It trims the retained values by retaining only top N values if the retained values ever exceed a max limit (in this code 10*N).
  • the reduce function "merge_values" (see above) merges two maps that represent top n values in two partial streams.
  • the map function "get_top_n" return top n values in a map as an ordered list. It leverages the table sort function for sorting.
-- top n
function top_n(stream, bin, n)
local function get_top_n(values)
-- return top n values in a map as an ordered list
-- uses lua table sort
local t = {}
local i = 1
for k in map.keys(values) do
t[i] = k
i = i + 1
end
table.sort(t, function(a,b) return a > b end)
local ret = list()
local i = 0
for k, v in pairs(t) do
list.append(ret, v)
i = i + 1
if i == n then break end
end
return ret
end
local function top_n_values(agg, value)
if value then agg[value] = 1 end
-- if map size exceeds n*10, trim to top n
if map.size(agg) > n*10 then
local new_agg = map()
local trimmed = trim_to_top_n(agg)
for value in list.iterator(trimmed) do
new_agg[value] = 1
end
agg = new_agg
end
return agg
end
return stream : map(rec_to_bin_value_closure(bin)) : aggregate(map(), top_n_values)
: reduce(merge_values) : map(get_top_n)
end

Statement stmt = new Statement();
stmt.setNamespace(Namespace);
stmt.setSetName(Set);
// range filter using the secondary index on bin1
stmt.setFilter(Filter.range("bin1", 101, 200));
stmt.setAggregateFunction(UDFModule, "top_n", Value.get("bin2"), Value.get(5));
ResultSet rs = client.queryAggregate(null, stmt);
System.out.println("Executed TOP N.");
while (rs.next()) {
Object obj = rs.getObject();
System.out.format("Returned object: %s", obj.toString());
}
rs.close();

Output:

Executed TOP N.
Returned object: [1200, 1199, 1198, 1197, 1196]

Takeaways and Conclusion

Many developers that are familiar with SQL would like to see how SQL operations translate to Aerospike. We looked at how to implement various aggregate statements. This should be generally useful irrespective of the reader's SQL knowledge. While the examples here use synchronous execution, many operations can also be performed asynchronously.

Clean up

Remove tutorial data and close connection.

client.dropIndex(null, Namespace, Set, IndexName);
client.truncate(null, Namespace, null, null);
client.close();
System.out.println("Removed tutorial data and closed server connection.");

Output:

Removed tutorial data and closed server connection.

Further Exploration and Resources

Here are some links for further exploration

Resources

Next steps

Visit Aerospike notebooks repo to run additional Aerospike notebooks. To run a different notebook, download the notebook from the repo to your local machine, and then click on File->Open, and select Upload.