Skip to main content

Implementing SQL Operations: Aggregates (Part 2)

For an interactive Jupyter notebook experience: Launch in Binder#

This tutorial is Part 2 of how to implement SQL aggregate queries in Aerospike.

This notebook requires Aerospike datbase running on localhost. Visit Aerospike notebooks repo for additional details and the docker container.

Introduction#

In this notebook, we will see how specific aggregate statements in SQL can be implemented in Aerospike.

SQL is a widely known data access language. The examples in this notebook provide patterns for implementing specific SQL aggregate queries. You should be able to understand them and find them useful even without deep familiarity with SQL.

This notebook is the third in the SQL Operations series that consists of the following notebooks:

  • Implementing SQL Operations: SELECT
  • Implementing SQL Operations: Aggregate functions Part 1 and 2 (this notebook)
  • Implementing SQL Operations: UPDATE, CREATE, and DELETE

Part 1 of Aggregate functions describes simpler aggregate processing of a stream of records.

The specific topics and aggregate functions we discuss in this notebook include:

  • Stream Partitioning with GROUP BY
    • Filtering partitions: HAVING
    • Sorting partitions: ORDER BY
  • Additional aggregate functions
    • DISTINCT
    • LIMIT
    • TOP N

The purpose of this notebook is to illustrate Aerospike implementation for specific SQL operations. Check out Aerospike Presto Connector for ad-hoc SQL access to Aerospike data.

Prerequisites#

This tutorial assumes familiarity with the following topics:

Working with UDF Module#

All UDF functions for this notebook are placed in "aggregate_fns.lua" file under the "udf" subdirectory. If the subdirectory or file is not there, you may download the file from here and place it there using the notebook's File->Open followed by Upload/New menus.

You are encouraged to experiment with the Lua code in the module. Be sure to save your changes and then run the comvenience function "registerUDF()" in a code cell for the changes to take effect.

Initialization#

Ensure database is running#

This notebook requires that Aerospike datbase is running.

import io.github.spencerpark.ijava.IJava;import io.github.spencerpark.jupyter.kernel.magic.common.Shell;IJava.getKernelInstance().getMagics().registerMagics(Shell.class);%sh asd

Download and install additional components.#

Install the Java client.

%%loadFromPOM<dependencies>  <dependency>    <groupId>com.aerospike</groupId>    <artifactId>aerospike-client</artifactId>    <version>5.0.0</version>  </dependency></dependencies>

Connect to database and populate test data#

The test data has 1000 records with user-key "id-1" through "id-1000", two integer bins (fields) "bin1" (1-1000) and "bin2" (1001-2000), and one string bin "bin3" (random 5 values "A" through "E"), in the namespace "test" and set "sql-aggregate".

import com.aerospike.client.AerospikeClient;import com.aerospike.client.Bin;import com.aerospike.client.Key;import com.aerospike.client.policy.WritePolicy;import java.util.Random; 
String[] groups = {"A", "B", "C", "D", "E"}; Random rand = new Random(1); 
AerospikeClient client = new AerospikeClient("localhost", 3000);System.out.println("Initialized the client and connected to the cluster.");
String Namespace = "test";String Set = "sql-aggregate";
WritePolicy wpolicy = new WritePolicy();wpolicy.sendKey = true;for (int i = 1; i <= 1000; i++) {    Key key = new Key(Namespace, Set, "id-"+i);    Bin bin1 = new Bin(new String("bin1"), i);    Bin bin2 = new Bin(new String("bin2"), 1000+i);    Bin bin3 = new Bin(new String("bin3"), groups[rand.nextInt(groups.length)]);     client.put(wpolicy, key, bin1, bin2, bin3);}
System.out.format("Test data popuated");;

Output:

Initialized the client and connected to the cluster.Test data popuated

Create a secondary index#

To use the query API with index based filter, a secondary index must exist on the filter bin. Here we create a numeric index on "bin1" in "sql-aggregate" set.

import com.aerospike.client.policy.Policy;import com.aerospike.client.query.IndexType;import com.aerospike.client.task.IndexTask;import com.aerospike.client.AerospikeException;import com.aerospike.client.ResultCode;
String IndexName = "test_sql_aggregate_bin1_number_idx";
Policy policy = new Policy();policy.socketTimeout = 0; // Do not timeout on index create.
try {    IndexTask task = client.createIndex(policy, Namespace, Set, IndexName,                                         "bin1", IndexType.NUMERIC);    task.waitTillComplete();}catch (AerospikeException ae) {    if (ae.getResultCode() != ResultCode.INDEX_ALREADY_EXISTS) {        throw ae;    }}
System.out.format("Created number index %s on ns=%s set=%s bin=%s.",                                     IndexName, Namespace, Set, "bin1");;

Output:

Created number index test_sql_aggregate_bin1_number_idx on ns=test set=sql-aggregate bin=bin1.

Execution Model for Processing Aggregates#

(This section is repeated for convenience from Part 1. Please skip to the the next section if you are familiar with the execution model.)

Processing aggregates in Aerospike involves processing a stream of records through a pipeline of operators on server as well as client.

Four types of operators are supported: Filter, Map, Aggregate, and Reduce. The operators work with one of the following data types as input and output: Record, Integer, String, Map (the data type, not to be confused with the Map operator), and List. Only the initial filter(s) and first non-filter operator in the pipeline can consume Record type.

  • Filter: Object -> Boolean; filters input objects, input and output objects are of the same type.
  • Map: Object -> Object; any transformation is possible.
  • Aggregate: (Current State, Object) -> New State; maintains the global "aggregate" state of the stream. While any type can be used, a (Aerospike) Map type is often used.
  • Reduce: (Object, Object) -> Object; reduces two objects to a single object of the same type.

The operators may appear any number of times and in any order in the pipeline.

The operator pipeline is typically processed in two phases: first phase on server nodes and the second phase on client.

  • Phase 1: Server nodes execute all operators up to and including the first reduce operation in the pipeline.
  • Phase 2: The client processes results from multiple nodes through the remaining pipeline operators starting with and including the first reduce operation in the pipeline.

Thus, the first reduce operation if specified in the pipeline is executed on all server nodes as well as on client. If there is no reduce operator in the pipeline, the application will receive the combined results returned from server nodes.

Post aggregation processing involves operators after the first reduce in the pipeline, usually for sorting, filtering, and final transformation, and takes place on the client side.

Aggregation processing in Aerospike is defined using User Defined Functions (UDFs). UDFs are written in Lua with arbitrary logic and are executed on both server and client as explained above. Since aggregates by definition involve multiple records, only stream UDFs are discussed below (versus record UDFs whose scope of execution is a single record).

A stream UDF specifies the pipeline of operators for processing aggregates. Different aggregates differ in their UDF functions, whereas the Aerospike APIs are the same to specify the aggregate processing.

The UDFs and logic are described in appropriate sections for each aggregate function below. For additional context and details, please refer to the documentation.

Register UDF#

Note, all UDF functions for this notebook are assumed to be in "aggregate_fns.lua" file under "udf" directory. Please refer to "Working with UDF Module" section above.

Register the UDF with the server by executing the following code cell.

The registerUDF() function below can be run conveniently when the UDF is modified (you are encouraged to experiement with the UDF code). The function invalidates the cache, removes the currently registered module, and registers the latest version.

import com.aerospike.client.policy.Policy;import com.aerospike.client.task.RegisterTask;import com.aerospike.client.Language;import com.aerospike.client.lua.LuaConfig;import com.aerospike.client.lua.LuaCache;
LuaConfig.SourceDirectory = "../udf";String UDFFile = "aggregate_fns.lua";String UDFModule = "aggregate_fns";
void registerUDF() {    // clear the lua cache    LuaCache.clearPackages();    Policy policy = new Policy();    // remove the current module, if any    client.removeUdf(null, UDFFile);    RegisterTask task = client.register(policy, LuaConfig.SourceDirectory+"/"+UDFFile,                                         UDFFile, Language.LUA);    task.waitTillComplete();    System.out.format("Registered the UDF module %s.", UDFFile);;}
registerUDF();

Output:

Registered the UDF module aggregate_fns.lua.

Stream Partitioning with GROUP BY#

SELECT bin1, agg(bin2) FROM namespace.set WHERE inner-condition GROUP BY bin1

GROUP BY processing partitions the record stream into multiple partitions, one for each distinct value of the grouped-by bin. The aggregate operator in the pipeline ouputs a nested map - an outer map of all partitions or distinct values of the grouped-by bin, and an inner map for each partition to maintain each partition's aggregates. The bins aggregated for each group, such as agg(bin2) in the above SQL statement, are stored within a group's map.

Reduce uses map-merge to merge partial aggregates. Since map-merge currently does not handle nested maps, merging at multiplle levels have to be explicitly specified as shown.

The filter "inner-condition" can be specified on any bins in the record, and can be processed using a query predicate filter and/or a stream filter operator. This is as described in Part 1, and so the example below will omit the WHERE clause for simplicity.

SELECT bin1, SUM(bin2) FROM test.sql-aggregate GROUP BY bin1

We will implement a new UDF "groupby_with_sum" for this.

GROUPBY_WITH_SUM

It takes two bins: the bin to group-by and the bin to sum. The pipeline consists of map, aggregate, and reduce operators.

  • the map function "rec_to_group_and_bin" adds a group tag, and return a map containing the group and the value of bin to sum.
  • the aggregate function "group_sum" takes the current aggregate state and "groupval" map and returns the new aggregate state. It creates a map for each distinct group value and adds the value tagged for a group to the group's sum
  • the reduce function "merge_group_sum" is a nested map merge that merges maps explicitly at the two levels.

-- nested map merge for group-by sum/count; explicit map merge at each nested levellocal function merge_group_sum(a, b)    local function merge_group(x, y)    -- inner map merge        return map.merge(x, y, add_values)    end    -- outer map merge    return map.merge(a, b, merge_group)end
-- aggregate for group-by sum--    creates a map for each distinct group value and adds the value tagged for a group to the group's sumlocal function group_sum(agg, groupval)    if not agg[groupval["group"]] then agg[groupval["group"]] = map() end    agg[groupval["group"]]["sum"] = (agg[groupval["group"]]["sum"] or 0) + (groupval["value"] or 0)    return aggend
-- group-by with sumfunction groupby_with_sum(stream, bin_grpby, bin_sum)    local function rec_to_group_and_bin(rec)    -- tag the group by bin_grpby value, return a map containing group and bin_sum value         local ret = map()        ret["group"] = rec[bin_grpby]        local val = rec[bin_sum]        if (not val or type(val) ~= "number") then val = 0 end        ret["value"] = val        return ret    end    return stream : map(rec_to_group_and_bin) : aggregate(map(), group_sum) : reduce(merge_group_sum) end
import com.aerospike.client.query.Statement;import com.aerospike.client.Value;import com.aerospike.client.query.RecordSet;import com.aerospike.client.query.ResultSet;
Statement stmt = new Statement();stmt.setNamespace(Namespace); stmt.setSetName(Set); stmt.setAggregateFunction(UDFModule, "groupby_with_sum", Value.get("bin3"), Value.get("bin2"));ResultSet rs = client.queryAggregate(null, stmt);System.out.println("Executed GROUP BY with SUM.");while (rs.next()) {    Object obj = rs.getObject();    System.out.format("Returned object: %s", obj.toString());}rs.close();

Output:

Executed GROUP BY with SUM.Returned object: {A={sum=276830}, B={sum=296246}, C={sum=260563}, D={sum=332231}, E={sum=334630}}

Filtering Partitions: HAVING#

SELECT bin1, agg(bin2) FROM namespace.set WHERE inner-condition GROUP BY bin1 HAVING outer-condition

Note the inner filter "inner-condition" can be specified using any bins in the record, whereas the outer filter and ORDER BY must use selected (aggregated) bins from the query. We will focus on the outer condition in the following example which outputs the count of distinct bin3 values in the range "B" and "E".

SELECT bin3, COUNT(*) FROM test.sql-aggreate GROUP BY bin3 HAVING "B" <= bin3 AND bin3 <= "E"

Processing for Having clause can be done by using a filter operator after reduce.

Here we implement a new UDF "groupby_with_count_having" for this.

GROUPBY_WITH_COUNT_HAVING

It takes the group-by bin and the range values for the groups. The pipeline consists of map, aggregate, reduce, and filter operators.

  • the map function "rec_to_group" simply returns the group-by bin value.
  • the aggregate function "group_count" takes the current aggregate state and a record's group and returns the new aggregate state. It creates a map for each distinct group value and increments the input group's count.
  • the reduce function "merge_group_sum" is a nested map merge that merges maps explicitly at the two levels.
  • the filter function "process_having" iterates over the nested map, applies the filter condition, and returns a slice of the input map.
-- aggregate for group-by count--   creates a map for each distinct group value and increments the tagged group's countlocal function group_count(agg, group)    if not agg[group] then agg[group] = map() end    agg[group]["count"] = (agg[group]["count"] or 0) + ((group and 1) or 0)    return aggend
-- map function for group-by processinglocal function rec_to_group_closure(bin_grpby)    local function rec_to_group(rec)        -- returns group-by bin value in a record        return rec[bin_grpby]    end    return rec_to_groupend
-- group-by having example: count(*) having low <= count <= highfunction groupby_with_count_having(stream, bin_grpby, having_range_low, having_range_high)    local function process_having(stats)        -- filters groups with count in the range        local ret = map()        for key, value in map.pairs(stats) do             if (key >= having_range_low and key <= having_range_high) then                 ret[key] = value            end        end        return ret    end    return stream : map(rec_to_group_closure(bin_grpby)) : aggregate(map(), group_count)                     : reduce(merge_group_sum) : map(process_having)end
Statement stmt = new Statement();stmt.setNamespace(Namespace); stmt.setSetName(Set); stmt.setAggregateFunction(UDFModule, "groupby_with_count_having", Value.get("bin3"), Value.get("B"), Value.get("D"));ResultSet rs = client.queryAggregate(null, stmt);System.out.println("Executed GROUP BY with COUNT and HAVING.");while (rs.next()) {    Object obj = rs.getObject();    System.out.format("Returned object: %s", obj.toString());}rs.close();

Output:

Executed GROUP BY with COUNT and HAVING.Returned object: {D={count=222}, B={count=196}, C={count=172}}

Sorting Partitions: ORDER BY#

SELECT bin1, agg(bin2) FROM namespace.set WHERE inner-condition GROUP BY bin1 HAVING outer-condition ORDER BY bin

In the following example, the count of distinct bin3 values is produced in descending order.

SELECT bin3, COUNT(*) FROM test.sql-aggregate GROUP BY bin3 ORDER BY COUNT

Processing for Order By clause can be done by using a map operator at the end that outputs an ordered list.

The UDF "groupby_with_count_orderby" is very similar to the HAVING example.

GROUPBY_WITH_COUNT_ORDERBY

It takes two bins to group-by order-by. The pipeline consists of map, aggregate, reduce, and map operators.

  • the map function "rec_to_group" (see above) simply returns the group-by bin value.
  • the aggregate function "group_count"(see above) takes the current aggregate state and a record's group and returns the new aggregate state. It creates a map for each distinct group value and increments the input group's count.
  • the reduce function "merge_group_sum"(see above) is a nested map merge that merges maps explicitly at the two levels.
  • the map function "process_orderby" uses lua table's sort function to sort the aggregate map into a flattened ordered list in this format [k1, v1, k2, v2, ...].
-- group-by count(*) order-by countfunction groupby_with_count_orderby(stream, bin_grpby, bin_orderby)    local function orderby(t, order)        -- collect the keys        local keys = {}        for k in pairs(t) do keys[#keys+1] = k end        -- sort by the order by passing the table and keys a, b,        table.sort(keys, function(a,b) return order(t, a, b) end)        -- return the iterator function        local i = 0        return function()            i = i + 1            if keys[i] then                return keys[i], t[keys[i] ]            end        end    end    local function process_orderby(stats)        -- uses lua table sort to sort aggregate map into a list         -- list has k and v separately added for sorted entries         local ret = list()        local t = {}        for k,v in map.pairs(stats) do t[k] = v end        for k,v in orderby(t, function(t, a, b) return t[a][bin_orderby] < t[b][bin_orderby] end) do            list.append(ret, k)            list.append(ret, v)        end                return ret    end    return stream : map(rec_to_group) : aggregate(map(), group_count)                     : reduce(merge_group_count) : map(process_orderby)end
Statement stmt = new Statement();stmt.setNamespace(Namespace); stmt.setSetName(Set); stmt.setAggregateFunction(UDFModule, "groupby_with_count_orderby", Value.get("bin3"), Value.get("count"));ResultSet rs = client.queryAggregate(null, stmt);System.out.println("Executed GROUP BY with COUNT and ORDER BY.");while (rs.next()) {    Object obj = rs.getObject();    System.out.format("Returned object: %s", obj.toString());}rs.close();

Output:

Executed GROUP BY with COUNT and ORDER BY.Returned object: [C, {count=172}, A, {count=187}, B, {count=196}, D, {count=222}, E, {count=223}]

More Aggregates: DISTINCT, LIMIT, and TOP N#

Let us see how DISTINCT, LIMIT, and TOP N can be processed. Only the first two appear in SQL syntax, and the third is a special case of a LIMIT query.

DISTINCT#

SELECT DISTINCT(bin) FROM namespace.set WHERE condition

DISTINCT can be processed by storing all values in a map (in the aggregate state) that is keyed on the value(s) of the bin(s) so only unique values are retained.

In the following example, distinct bin3 values are produced for records whose bin1 is in the range [101,200].

SELECT DISTINCT bin3 FROM test.sql-aggregate WHERE bin1 >= 101 bin1 <= 200

The UDF "distinct" implements a single bin distinct.

DISTINCT

It takes the bin and returns its distinct values. The pipeline consists of map, aggregate, reduce, and map operators.

  • the map function "rec_to_bin_value" simply returns the bin value.
  • the aggregate function "distinct_values" takes the current aggregate state and a value, and returns the new aggregate state. Only unique values are retained in a map as keys.
  • the reduce function "merge_values"is a map merge that merges two maps that has the union of their keys.
  • the map function "map_to_list" returns a list of map keys.
-- return map keys in a listlocal function map_to_list(values)    local ret = list()    for k in map.keys(values) do list.append(ret, k) end    return retend
-- merge partial aggregate mapslocal function merge_values(a, b)    return map.merge(a, b, function(v1, v2) return ((v1 or v2) and 1) or nil end)end
-- map for distinct; using map unique keyslocal function distinct_values(agg, value)    if value then agg[value] = 1 end    return aggend
-- distinct function distinct(stream, bin)    local function rec_to_bin_value(rec)        -- simply return bin value in rec        return rec[bin]    end    return stream : map(rec_to_bin_value) : aggregate(map(), distinct_values)                     : reduce(merge_values) : map(map_to_list)end 
import com.aerospike.client.query.Filter;
Statement stmt = new Statement();stmt.setNamespace(Namespace); stmt.setSetName(Set); // range filter using the secondary index on bin1stmt.setFilter(Filter.range("bin1", 101, 200));stmt.setAggregateFunction(UDFModule, "distinct", Value.get("bin3"));ResultSet rs = client.queryAggregate(null, stmt);System.out.println("Executed DISTINCT.");while (rs.next()) {    Object obj = rs.getObject();    System.out.format("Returned object: %s", obj.toString());}rs.close();

Output:

Executed DISTINCT.Returned object: [A, C, B, E, D]

LIMIT#

SELECT bin FROM namespace.set WHERE condition LIMIT N

In the following example, up to 10 values in bin2 are produced for records whose bin1 is in the range [101,200].

SELECT bin2 FROM test.sql-aggregate WHERE bin1 >= 101 bin1 <= 200 LIMIT 10

The UDF "limit" returns a single bin with an upper limit on number of results returned.

LIMIT

It takes the bin and max limit. and returns up to max number of bin values. The pipeline consists of aggregate and reduce.

  • the aggregate function "list_limit" takes the current aggregate state and a record, and returns the new aggregate state by adding the record's bin value to a list only if the list size is below the limit.
  • the reduce function "list_merge_limit" merges two lists to retain only max number of values.
function limit(stream, bin, max)   local function list_limit(agg, rec)        -- add to list if the list size is below the limit        if list.size(agg) < max then            local ret = map()            ret[bin] = rec[bin]            list.append(agg, ret)        end        return agg    end    local function list_merge_limit(a, b)        local ret = list()        list.concat(ret, list.take(a, max))        list.concat(ret, list.take(b, (max > list.size(ret) and max-list.size(ret)) or 0))        return ret    end    return stream : aggregate(list(), list_limit) : reduce(list_merge_limit) end 
Statement stmt = new Statement();stmt.setNamespace(Namespace); // range filter using the secondary index on bin1stmt.setFilter(Filter.range("bin1", 101, 200));stmt.setSetName(Set); stmt.setAggregateFunction(UDFModule, "limit", Value.get("bin2"), Value.get(10));ResultSet rs = client.queryAggregate(null, stmt);System.out.println("Executed LIMIT N.");while (rs.next()) {    Object obj = rs.getObject();    System.out.format("Returned object: %s", obj.toString());}rs.close();

Output:

Executed LIMIT N.Returned object: [{bin2=1128}, {bin2=1160}, {bin2=1192}, {bin2=1129}, {bin2=1161}, {bin2=1193}, {bin2=1130}, {bin2=1162}, {bin2=1194}, {bin2=1131}]

TOP N#

SELECT bin FROM namespace.set WHERE condition ORDER BY bin DESC LIMIT N

TOP N can be processed by retaining top N values in a list in aggregate as well as reduce operators.

In the following example, top 10 values in bin2 are produced for records whose bin1 is in the range [101,200].

SELECT bin2 FROM test.sql-aggregate WHERE bin1 >= 101 bin1 <= 200 ORDER BY bin2 DESC LIMIT 10

The UDF "top_n" returns the top N values from a bin.

TOP_N

It takes the bin and N. and returns top N bin values. The pipeline consists of map, aggregate, reduce, and map.

  • the map function "rec_to_bin_value" simply returns the bin value.
  • the aggregate function "top_n_values" takes the current aggregate state and a record, and returns the new aggregate state by retaining distinct bin values in a map. It trims the retained values by retaining only top N values if the retained values ever exceed a max limit (in this code 10*N).
  • the reduce function "merge_values" (see above) merges two maps that represent top n values in two partial streams.
  • the map function "get_top_n" return top n values in a map as an ordered list. It leverages the table sort function for sorting.
-- top nfunction top_n(stream, bin, n)    local function get_top_n(values)        -- return top n values in a map as an ordered list        -- uses lua table sort        local t = {}        local i = 1        for k in map.keys(values) do             t[i] = k             i = i + 1        end        table.sort(t, function(a,b) return a > b end)        local ret = list()        local i = 0        for k, v in pairs(t) do             list.append(ret, v)             i = i + 1             if i == n then break end        end        return ret    end    local function top_n_values(agg, value)        if value then agg[value] = 1 end        -- if map size exceeds n*10, trim to top n        if map.size(agg) > n*10 then             local new_agg = map()            local trimmed = trim_to_top_n(agg)             for value in list.iterator(trimmed) do                new_agg[value] = 1            end            agg = new_agg        end        return agg    end    return stream : map(rec_to_bin_value_closure(bin)) : aggregate(map(), top_n_values)                     : reduce(merge_values) : map(get_top_n)end 
Statement stmt = new Statement();stmt.setNamespace(Namespace); stmt.setSetName(Set); // range filter using the secondary index on bin1stmt.setFilter(Filter.range("bin1", 101, 200));stmt.setAggregateFunction(UDFModule, "top_n", Value.get("bin2"), Value.get(5));ResultSet rs = client.queryAggregate(null, stmt);System.out.println("Executed TOP N.");while (rs.next()) {    Object obj = rs.getObject();    System.out.format("Returned object: %s", obj.toString());}rs.close();

Output:

Executed TOP N.Returned object: [1200, 1199, 1198, 1197, 1196]

Takeaways and Conclusion#

Many developers that are familiar with SQL would like to see how SQL operations translate to Aeropsike. We looked at how to implement various aggregate statements. This should be generally useful irrespective of the reader's SQL knowledge. While the examples here use synchronous execution, many operations can also be performed asynchronously.

Clean up#

Remove tutorial data and close connection.

client.dropIndex(null, Namespace, Set, IndexName);client.truncate(null, Namespace, null, null);client.close();System.out.println("Removed tutorial data and closed server connection.");

Output:

Removed tutorial data and closed server connection.

Further Exploration and Resources#

Here are some links for further exploration

Resources

Next steps#

Visit Aerospike notebooks repo to run additional Aerospike notebooks. To run a different notebook, download the notebook from the repo to your local machine, and then click on File->Open, and select Upload.