Aerospike Query and UDF

For an interactive Jupyter notebook experience: Binder

Aerospike queries allow filtering based on a predicate and User Defined Functions (UDFs) offer arbitrary server side processing. This notebook illustrates how a query and a UDF can be combined in a useful pattern. Two examples are given: the first with a query with a UDF aggregate function, and the second with a query, predicate expression and a UDF update function. The code for the first example is also available in this repo.

This notebook requires an Aerospike Database running locally, a Java kernel, and the Aerospike Java Client. To create a Docker container that satisfies the requirements and holds a copy of Aerospike notebooks, visit the Aerospike Notebooks Repo.

Getting Started

Ensure database is running

This notebook requires that Aerospike Database is running.

import io.github.spencerpark.ijava.IJava;
import io.github.spencerpark.jupyter.kernel.magic.common.Shell;
IJava.getKernelInstance().getMagics().registerMagics(Shell.class);
%sh asd

Download Aerospike client from POM

Here, version 5.0.0 of the Java client is installed.

%%loadFromPOM
<dependencies>
<dependency>
<groupId>com.aerospike</groupId>
<artifactId>aerospike-client</artifactId>
<version>5.0.0</version>
</dependency>
</dependencies>

Connect to database and populate test data

The test data has ten records with user-key 1 through 10, two bins (fields) "binint" and "binstr", in the namespace "test" and set "demo". The two bins are initialized with the user key and a string of the form "(id). (name)".

import com.aerospike.client.AerospikeClient;
import com.aerospike.client.Bin;
import com.aerospike.client.Key;

AerospikeClient client = new AerospikeClient("localhost", 3000);
System.out.println("Initialized the client and connected to the cluster.");

Output:

Initialized the client and connected to the cluster.

import com.aerospike.client.policy.ClientPolicy;

String Namespace = "test";
String Set = "demo";
String BinInt = "binint";
String BinStr = "binstr";
int NumRecords = 10;

String Names[] = {"1. Clark", "2. Keenan", "3. Smith", "4. Jones", "5. Clark",
                  "6. Jones", "7. Iyer", "8. Smith", "9. Hernandez", "10. Smith"};

ClientPolicy policy = new ClientPolicy();
for (int i = 1; i <= NumRecords; i++) {
    Key key = new Key(Namespace, Set, i);
    Bin bin1 = new Bin(BinInt, i);
    Bin bin2 = new Bin(BinStr, Names[i-1]);
    client.put(policy.writePolicyDefault, key, bin1, bin2);
}

System.out.format("Written %d records in ns=%s set=%s with userkeys 1-%d.",
    NumRecords, Namespace, Set, NumRecords);

Output:

Written 10 records in ns=test set=demo with userkeys 1-10.

Create a secondary index

import com.aerospike.client.policy.Policy;
import com.aerospike.client.query.IndexType;
import com.aerospike.client.task.IndexTask;
import com.aerospike.client.AerospikeException;
import com.aerospike.client.ResultCode;

String IndexName = "idx_numeric_test_demo_binint";

Policy policy = new Policy();
policy.socketTimeout = 0; // Do not timeout on index create.

try {
    IndexTask task = client.createIndex(policy, Namespace, Set, IndexName,
        BinInt, IndexType.NUMERIC);
    task.waitTillComplete();
}
catch (AerospikeException ae) {
    // An existing index is fine; rethrow anything else.
    if (ae.getResultCode() != ResultCode.INDEX_ALREADY_EXISTS) {
        throw ae;
    }
}

System.out.format("Created index %s on ns=%s set=%s bin=%s.",
    IndexName, Namespace, Set, BinInt);

Output:

Created index idx_numeric_test_demo_binint on ns=test set=demo bin=binint.

Part 1: Query with aggregate function

In the first example, we demonstrate how an aggregate function (sum) is calculated over a stream of records returned by a query. First we create the aggregate function in a UDF module. A UDF is like a stored procedure that is executed on all server nodes of the cluster. The streams of partial, node-specific results are then combined on the client using the same UDF. For this reason, the UDF module must be registered with the server for the first phase of parallel processing across all nodes, and must also be available locally for the final phase of aggregation.
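The two phases described above can be sketched without a database. The following is a minimal, client-free simulation in plain Java, not the Aerospike API: the class `TwoPhaseSum`, its methods, and the split of four sample bin values across three "nodes" are all assumptions made for illustration. It shows why the same reducer can serve both phases: reducing per-node partial sums gives the same total as reducing all values at once.

```java
import java.util.List;
import java.util.Optional;
import java.util.function.BinaryOperator;
import java.util.stream.Collectors;

// Hypothetical simulation of a two-phase stream aggregation; no Aerospike calls are made.
class TwoPhaseSum {
    // Plays the role of the Lua reducer: combines two partial values.
    static final BinaryOperator<Long> REDUCER = Long::sum;

    // Phase 1: each "node" reduces the bin values it holds to a single partial result.
    static Optional<Long> reduceOnNode(List<Long> binValues) {
        return binValues.stream().reduce(REDUCER);
    }

    // Phase 2: the client reduces the per-node partials with the same function.
    static long reduceOnClient(List<List<Long>> nodes) {
        List<Long> partials = nodes.stream()
            .map(TwoPhaseSum::reduceOnNode)
            .flatMap(Optional::stream) // nodes without matching records contribute nothing
            .collect(Collectors.toList());
        return partials.stream().reduce(REDUCER).orElse(0L);
    }

    public static void main(String[] args) {
        // Assumed split of four sample values across three nodes; the third holds none.
        List<List<Long>> nodes = List.of(List.of(4L, 6L), List.of(5L, 7L), List.<Long>of());
        System.out.println(reduceOnClient(nodes)); // prints 22
    }
}
```

Because addition is associative and commutative, the result does not depend on how the values happen to be distributed across nodes; a reducer without those properties would not be safe to apply in two phases like this.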

Create UDF module and aggregate function

Examine the following Lua code, which aggregates (reduces) a stream of records into a sum of bin values.

To create the UDF module, make a "udf" directory and place a sum_example.lua file with this content in it. Alternatively, execute the following two cells to achieve the same effect.

-- sum_example.lua

local function reducer(val1,val2)
    return val1 + val2
end

function sum_single_bin(stream,name)
    local function mapper(rec)
        return rec[name]
    end
    return stream : map(mapper) : reduce(reducer)
end

import java.nio.file.Files;
import java.nio.file.Paths;
import java.io.FileWriter;

void CreateUDFModule(String name, String code) {
    try {
        if (!Files.exists(Paths.get("./udf"))) {
            Files.createDirectory(Paths.get("./udf"));
        }
        // try-with-resources closes the writer even if the write fails
        try (FileWriter fw = new FileWriter("./udf/" + name)) {
            fw.write(code); // write the code passed in as the parameter
        }
    }
    catch (Exception e) {
        System.out.format("Failed to create Lua module %s, exception: %s.",
            "udf/" + name, e);
    }
}

// Execute this cell to create UDF module "udf/sum_example.lua"
// To execute, first convert the cell type from markdown to code.

String luaCode =
    "-- sum_example.lua" + "\n" +
    "" + "\n" +
    "local function reducer(val1,val2)" + "\n" +
    "    return val1 + val2" + "\n" +
    "end" + "\n" +
    "" + "\n" +
    "function sum_single_bin(stream,name)" + "\n" +
    "    local function mapper(rec)" + "\n" +
    "        return rec[name]" + "\n" +
    "    end" + "\n" +
    "    return stream : map(mapper) : reduce(reducer)" + "\n" +
    "end";
CreateUDFModule("sum_example.lua", luaCode);

System.out.format("Lua module %s created.", "udf/sum_example.lua");

Output:

Lua module udf/sum_example.lua created.

Register the UDF module

Register the Lua module for the aggregate function with the server.

import com.aerospike.client.task.RegisterTask;
import com.aerospike.client.Language;

String UDFDir = "./udf";
String UDFFile = "sum_example.lua";

RegisterTask task = client.register(policy, UDFDir+"/"+UDFFile, UDFFile, Language.LUA);
task.waitTillComplete();

System.out.format("Registered the UDF module %s.", UDFFile);

Output:

Registered the UDF module sum_example.lua.

Define the query statement

The query statement includes elements such as namespace, set, bins to retrieve, and filter or predicate.

import com.aerospike.client.query.Statement;
import com.aerospike.client.query.Filter;
import com.aerospike.client.Value;

int begin = 4;
int end = 7;

Statement stmt = new Statement();
stmt.setNamespace(Namespace);
stmt.setSetName(Set);
stmt.setBinNames(BinInt, BinStr);
stmt.setFilter(Filter.range(BinInt, begin, end));
System.out.format("Query on ns=%s set=%s, with bin %s >= %d <= %d",
Namespace, Set, BinInt, begin, end);

Output:

Query on ns=test set=demo, with bin binint >= 4 <= 7

Include the aggregate processing and its parameters in the query statement.

String UDFModule = "sum_example";
String UDFFunction = "sum_single_bin";
stmt.setAggregateFunction(UDFModule, UDFFunction, Value.get(BinInt));

System.out.format("Aggregate function %s added for server processing.", UDFFunction);

Output:

Aggregate function sum_single_bin added for server processing.

Execute the query

Let's now execute the query.

import com.aerospike.client.query.ResultSet;

ResultSet rs = client.queryAggregate(null, stmt);
System.out.println("Executed the query with UDF; got results.");

Output:

Executed the query with UDF; got results.

Process results

The expected sum for the records from 4 to 7 (both inclusive) is 4+5+6+7 = 22.

System.out.println("Processing results:");
try {
    int expected = 22; // 4 + 5 + 6 + 7
    int count = 0;

    while (rs.next()) {
        Object object = rs.getObject();
        long sum;

        if (object instanceof Long) {
            sum = (Long)object;
        }
        else {
            System.out.println("Return value not a long: " + object);
            continue;
        }

        if (expected == (int)sum) {
            System.out.format("Sum matched! Value=%d.", expected);
        }
        else {
            System.out.format("Sum mismatch: Expected %d. Received %d.", expected, (int)sum);
        }
        count++;
    }

    if (count == 0) {
        System.out.println("Query failed. No records returned.");
    }
}
finally {
    rs.close();
}

Output:

Processing results:
Sum matched! Value=22.

Part 2: Query, predicate expression, and UDF update

We will illustrate an update UDF function with a query and predicate expression.

Let's say we want to:

  • multiply the integer bin value by 5
  • in all records whose integer bin value is between 3 and 9 (inclusive),
  • AND whose string bin value contains either "Smith" or "Jones".

Records with user-keys 3, 4, 6 and 8 meet these conditions.
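As a quick sanity check of that claim, the selection can be reproduced locally. This is a minimal sketch in plain Java with no database involved; the class `MatchingKeys` and its method are hypothetical names, and it assumes the inclusive bounds 3 and 9 that the query statement in this part uses, together with the names written to the test data earlier.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical local check of which user keys satisfy both conditions.
class MatchingKeys {
    // Same strings that were written to the string bin of the test records.
    static final String[] NAMES = {"1. Clark", "2. Keenan", "3. Smith", "4. Jones", "5. Clark",
        "6. Jones", "7. Iyer", "8. Smith", "9. Hernandez", "10. Smith"};

    static List<Integer> matchingKeys(int begin, int end) {
        List<Integer> keys = new ArrayList<>();
        for (int key = 1; key <= NAMES.length; key++) {
            // The integer bin was initialized with the user key, so the range test uses the key.
            boolean inRange = key >= begin && key <= end;
            String name = NAMES[key - 1].toLowerCase();
            boolean nameMatches = name.contains("smith") || name.contains("jones");
            if (inRange && nameMatches) {
                keys.add(key);
            }
        }
        return keys;
    }

    public static void main(String[] args) {
        System.out.println(matchingKeys(3, 9)); // prints [3, 4, 6, 8]
    }
}
```

Record 10 also contains "Smith" but falls outside the range, which is why it is not selected.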

This update can be achieved in different ways using a combination of query, predicate expression, and UDF. For the purpose of this exercise, we use a query with the "between" predicate, a predicate expression for string comparison, and a UDF to update the integer bin.

Let's start defining them one by one starting with a new UDF.

Create UDF module with update function

Examine the code below. It simply multiplies a bin value by the input factor and updates the record.

Create a "udf" directory and place update_example.lua file with this content in it. Alternatively, execute the following cell to achieve the same effect.

-- update_example.lua

function multiplyBy(rec, binName, factor)
    rec[binName] = rec[binName] * factor
    aerospike:update(rec)
end

// Execute this cell to create UDF module "udf/update_example.lua"
// To execute, first convert the cell type from markdown to code.

String luaCode =
    "-- update_example.lua" + "\n" +
    "" + "\n" +
    "function multiplyBy(rec, binName, factor)" + "\n" +
    "    rec[binName] = rec[binName] * factor" + "\n" +
    "    aerospike:update(rec)" + "\n" +
    "end";
CreateUDFModule("update_example.lua", luaCode);

System.out.format("Lua module %s created.", "udf/update_example.lua");

Output:

Lua module udf/update_example.lua created.

Register the UDF module

String UDFFile = "update_example.lua";
RegisterTask task = client.register(policy, UDFDir+"/"+UDFFile,
UDFFile, Language.LUA);
task.waitTillComplete();

System.out.format("Registered the UDF module %s.", UDFFile);

Output:

Registered the UDF module update_example.lua.

Define the query statement

Specify the namespace, set, bins, and query filter.

Statement stmt = new Statement();
stmt.setNamespace(Namespace);
stmt.setSetName(Set);
stmt.setBinNames(BinInt, BinStr);
int begin = 3;
int end = 9;
// Filter is evaluated using a secondary index and therefore can only reference an indexed bin.
stmt.setFilter(Filter.range(BinInt, begin, end));
System.out.format("Query on ns=%s set=%s, with bin %s >= %d <= %d",
Namespace, Set, BinInt, begin, end);

Output:

Query on ns=test set=demo, with bin binint >= 3 <= 9

Define the predicate expression filter

In addition to the predicate in the query (which requires a secondary index), additional filtering can be specified using a predicate expression. A predicate expression is specified as part of the request policy and does not require a secondary index. It is evaluated on each record returned after applying the query predicate, and only the records for which it evaluates to true are processed further (in this case, updated by the UDF).

Here the predicate expression tests whether the string bin contains either "smith" or "jones", ignoring case. We use an expression with an OR clause that combines two regular expression matches.
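To see what an OR of two case-insensitive regular expression matches accepts, here is a local analogue in plain Java. It is only a sketch: it uses `java.util.regex`, which is not the regex engine the server uses, and the class `NameFilter` and helper `icase` are hypothetical names. The patterns are the ones used in the expression in this section.

```java
import java.util.function.Predicate;
import java.util.regex.Pattern;

// Hypothetical local analogue of "regex A OR regex B", both ignoring case.
class NameFilter {
    // Builds a predicate that is true when the pattern is found anywhere in the input.
    static Predicate<String> icase(String regex) {
        return Pattern.compile(regex, Pattern.CASE_INSENSITIVE).asPredicate();
    }

    static final Predicate<String> SMITH_OR_JONES =
        icase(".*smith.*").or(icase(".*jones.*"));

    public static void main(String[] args) {
        System.out.println(SMITH_OR_JONES.test("3. Smith"));     // prints true
        System.out.println(SMITH_OR_JONES.test("4. Jones"));     // prints true
        System.out.println(SMITH_OR_JONES.test("9. Hernandez")); // prints false
    }
}
```

The lowercase patterns match the capitalized names only because of the case-insensitive flag; without it, none of the test records would pass the filter.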

// Predicate Expressions are applied on query results on server side.
import com.aerospike.client.policy.WritePolicy;
import com.aerospike.client.exp.Exp;
import com.aerospike.client.query.RegexFlag;

WritePolicy policy = new WritePolicy(client.writePolicyDefault);
policy.filterExp = Exp.build(
    Exp.or(
        Exp.regexCompare(".*smith.*", RegexFlag.ICASE, Exp.stringBin(BinStr)),
        Exp.regexCompare(".*jones.*", RegexFlag.ICASE, Exp.stringBin(BinStr))));

System.out.format("Predicate Expression: (binstr ilike '%%smith%%' || binstr ilike '%%jones%%')");

Output:

Predicate Expression: (binstr ilike '%smith%' || binstr ilike '%jones%')

Execute the UDF update on filtered records

// Execute the update UDF function on records that match the statement filter and policy filter. 
// Records are not returned to the client. This asynchronous server call will return
// before the command is complete. The user can optionally wait for command completion
// by using the returned ExecuteTask instance.

import com.aerospike.client.task.ExecuteTask;
import com.aerospike.client.Value;

int MultiplicationFactor = 5;
ExecuteTask task = client.execute(policy, stmt, "update_example", "multiplyBy",
Value.get(BinInt), Value.get(MultiplicationFactor));
task.waitTillComplete(3000, 0); // poll time 3s, no timeout

System.out.format("Executed the query and filter expression and applied UDF update to records.");

Output:

Executed the query and filter expression and applied UDF update to records.
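The comments in the cell above describe a call that returns before the server-side job finishes, followed by an optional wait that polls for completion. The general pattern can be sketched in plain Java as follows. This is a hypothetical helper, not the client's implementation: `PollingWait`, `waitUntilDone`, and the convention that a timeout of 0 means "wait indefinitely" are assumptions chosen to mirror the "poll time 3s, no timeout" comment.

```java
import java.util.function.BooleanSupplier;

// Hypothetical polling helper: check a completion condition at a fixed interval.
class PollingWait {
    // Returns true once isDone reports completion.
    // Returns false if timeoutMs (> 0) elapses first or the thread is interrupted.
    // A timeoutMs of 0 is treated as "no timeout".
    static boolean waitUntilDone(BooleanSupplier isDone, long sleepMs, long timeoutMs) {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (!isDone.getAsBoolean()) {
            if (timeoutMs > 0 && System.currentTimeMillis() >= deadline) {
                return false;
            }
            try {
                Thread.sleep(sleepMs);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt and give up
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        long finishAt = System.currentTimeMillis() + 50; // stand-in for a background job
        boolean done = waitUntilDone(() -> System.currentTimeMillis() >= finishAt, 10, 0);
        System.out.println(done); // prints true
    }
}
```

A longer sleep interval reduces the number of status checks at the cost of noticing completion later, which is the trade-off behind choosing a poll time.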

View updated records

Recall that records 3, 4, 6, and 8 should have received the update; that is, their binint values should be multiplied by the specified factor (5).
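The expected bin values can be worked out locally before reading the records back. The following is a minimal sketch in plain Java; the class `ExpectedUpdate` and its method are hypothetical, and it relies only on facts stated in this notebook: each integer bin starts equal to its user key, and the update multiplies the selected records' values by the factor.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helper that computes the integer bin values expected after the update.
class ExpectedUpdate {
    static int[] expectedBinInts(int numRecords, List<Integer> updatedKeys, int factor) {
        int[] values = new int[numRecords];
        for (int key = 1; key <= numRecords; key++) {
            // Updated records hold key * factor; all others keep their initial value (the key).
            values[key - 1] = updatedKeys.contains(key) ? key * factor : key;
        }
        return values;
    }

    public static void main(String[] args) {
        int[] expected = expectedBinInts(10, List.of(3, 4, 6, 8), 5);
        System.out.println(Arrays.toString(expected)); // prints [1, 2, 15, 20, 5, 30, 7, 40, 9, 10]
    }
}
```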

import com.aerospike.client.Record;
for (int i = 1; i <= NumRecords; i++) {
    Key key = new Key(Namespace, Set, i);
    Record record = client.get(null, key, BinInt, BinStr);
    System.out.println(record);
}

Output:

(gen:1),(exp:351567276),(bins:(binint:1),(binstr:1. Clark))
(gen:1),(exp:351567276),(bins:(binint:2),(binstr:2. Keenan))
(gen:2),(exp:351567283),(bins:(binint:15),(binstr:3. Smith))
(gen:2),(exp:351567283),(bins:(binint:20),(binstr:4. Jones))
(gen:1),(exp:351567276),(bins:(binint:5),(binstr:5. Clark))
(gen:2),(exp:351567283),(bins:(binint:30),(binstr:6. Jones))
(gen:1),(exp:351567276),(bins:(binint:7),(binstr:7. Iyer))
(gen:2),(exp:351567283),(bins:(binint:40),(binstr:8. Smith))
(gen:1),(exp:351567276),(bins:(binint:9),(binstr:9. Hernandez))
(gen:1),(exp:351567276),(bins:(binint:10),(binstr:10. Smith))

Perform clean up

client.dropIndex(null, Namespace, Set, IndexName);
client.close();
System.out.println("Index dropped and server connection closed.");

Output:

Index dropped and server connection closed.

Explore other query, expression, and UDF capabilities

Feel free to check out the code example in the repo, and explore other examples and capabilities of queries, expressions, and UDFs.

Next steps

Visit Aerospike notebooks repo to run additional Aerospike notebooks. To run a different notebook, download the notebook from the repo to your local machine, and then click on File->Open, and select Upload.