Processing Query Results as a Stream of Records

For an interactive Jupyter notebook experience: Binder

This tutorial shows processing of query results as a stream of records and related capabilities.

This notebook requires the Aerospike Database running locally, a Java kernel, and the Aerospike Java Client. To create a Docker container that satisfies the requirements and holds a copy of Aerospike notebooks, visit the Aerospike Notebooks Repo.

Introduction

The notebook shows how to:

  • process query results as a stream of records,
  • paginate over results,
  • partition a query for parallelism, and
  • resume query execution at a later time.

Refer to the companion blog post Working with Query Result Streams for more information.

Prerequisites

This tutorial assumes familiarity with the following topics:

Setup

Ensure database is running

This notebook requires that the Aerospike Database is running.

import io.github.spencerpark.ijava.IJava;
import io.github.spencerpark.jupyter.kernel.magic.common.Shell;
IJava.getKernelInstance().getMagics().registerMagics(Shell.class);
%sh asd

Download and install additional components.

Install the Java client.

%%loadFromPOM
<dependencies>
<dependency>
<groupId>com.aerospike</groupId>
<artifactId>aerospike-client</artifactId>
<version>6.1.0</version>
</dependency>
</dependencies>

Initialize Client

Initialize the client that can be used for both sync and async processing modes.

Initialize event loops for async processing mode

We use NIO event loops for async processing, but other event loop types can be used as well. Event loop initialization is needed only if asynchronous API calls are used.

import java.util.concurrent.atomic.AtomicInteger;
import com.aerospike.client.async.EventPolicy;
import com.aerospike.client.async.EventLoops;
import com.aerospike.client.async.EventLoop;
import com.aerospike.client.async.Throttles;
import com.aerospike.client.async.Monitor;
import com.aerospike.client.async.NioEventLoops;
import com.aerospike.client.listener.RecordSequenceListener;

// initialize event loops
final int NumLoops = 2;
final int CommandsPerEventLoop = 50;
final int DelayQueueSize = 50;

EventPolicy eventPolicy = new EventPolicy();
eventPolicy.maxCommandsInProcess = CommandsPerEventLoop;
eventPolicy.maxCommandsInQueue = DelayQueueSize;
EventLoops eventLoops = new NioEventLoops(eventPolicy, NumLoops);

// initialize event loop throttles
Throttles throttles = new Throttles(NumLoops, CommandsPerEventLoop);

System.out.format("Throttles initialized for %s loops with %s concurrent operations per loop.\n",
NumLoops, CommandsPerEventLoop);;

Output:

Throttles initialized for 2 loops with 50 concurrent operations per loop.
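
The throttling idea used above can be illustrated with the standard library alone. The following is a minimal analogy, not the Aerospike Throttles implementation: a hypothetical SlotThrottle class keeps one java.util.concurrent.Semaphore per event loop, waitForSlot blocks until a slot is free, and addSlot returns it. All names in the sketch are assumptions made for illustration.

```java
import java.util.concurrent.Semaphore;

// Hypothetical stand-in for a per-event-loop throttle (not the Aerospike class).
final class SlotThrottle {
    private final Semaphore[] slots;

    SlotThrottle(int numLoops, int commandsPerLoop) {
        slots = new Semaphore[numLoops];
        for (int i = 0; i < numLoops; i++) {
            slots[i] = new Semaphore(commandsPerLoop);
        }
    }

    // Blocks until `count` slots are free on the given loop; returns false if interrupted.
    boolean waitForSlot(int loopIndex, int count) {
        try {
            slots[loopIndex].acquire(count);
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    // Returns `count` slots to the given loop, typically from a completion callback.
    void addSlot(int loopIndex, int count) {
        slots[loopIndex].release(count);
    }

    // Number of slots currently free on the given loop.
    int available(int loopIndex) {
        return slots[loopIndex].availablePermits();
    }
}
```

In this sketch, a caller would take a slot before submitting an async command and give it back in both the success and the failure callback, so that the number of in-flight commands per loop never exceeds the configured limit.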

Initialize client with event loops

import com.aerospike.client.AerospikeClient;
import com.aerospike.client.Host;
import com.aerospike.client.policy.ClientPolicy;

ClientPolicy clientPolicy = new ClientPolicy();

// needed only if async apis are used
clientPolicy.eventLoops = eventLoops;
int concurrentMax = CommandsPerEventLoop * NumLoops;
if (clientPolicy.maxConnsPerNode < concurrentMax) {
clientPolicy.maxConnsPerNode = concurrentMax;
}

// initialize the client
Host[] hosts = Host.parseHosts("localhost", 3000);
AerospikeClient client = new AerospikeClient(clientPolicy, hosts);

System.out.println("Initialized the client and connected to the cluster.");;

Output:

Initialized the client and connected to the cluster.

Includes and Constants

import com.aerospike.client.AerospikeException;
import com.aerospike.client.Bin;
import com.aerospike.client.Key;
import com.aerospike.client.policy.WritePolicy;
import com.aerospike.client.query.Filter;
import com.aerospike.client.query.PartitionFilter;
import com.aerospike.client.query.PartitionStatus;
import com.aerospike.client.query.RecordSet;
import com.aerospike.client.query.Statement;
import com.aerospike.client.Record;
import com.aerospike.client.exp.Exp;
import com.aerospike.client.policy.Policy;
import com.aerospike.client.policy.QueryPolicy;
import com.aerospike.client.query.IndexType;
import com.aerospike.client.task.IndexTask;
import com.aerospike.client.ResultCode;
import com.aerospike.client.Operation;
import com.aerospike.client.Value;

final String Namespace = "test";
final String SetIndexed = "indexed";
final String SetUnindexed = "unindexed";
final String KeyPrefix = "id-";
final Integer NumRecords = 10000;

Populate Test Data.

The test data consists of NumRecords records in each set, each with a user key "id-<i>", an integer bin "bin1" with value i, and another integer bin "bin2" with value 10*i, where 1 <= i <= NumRecords.

The set SetIndexed has a set index and an integer secondary index on "bin1". The set SetUnindexed has no set or secondary index, and is used to illustrate primary index query functionality.

// convenience function to truncate test data
void truncateTestData() {
try {
client.truncate(null, Namespace, null, null);
}
catch (AerospikeException e) {
// ignore
}
}

// convenience function to initialize test data
void initializeTestData() {
truncateTestData();
WritePolicy wPolicy = new WritePolicy(client.writePolicyDefault);
wPolicy.sendKey = true;
for (int i=0; i < NumRecords; i++) {
Bin bin1 = new Bin("bin1", i+1);
Bin bin2 = new Bin("bin2", 10*(i+1));
Key key1 = new Key(Namespace, SetIndexed, KeyPrefix+(i+1));
Key key2 = new Key(Namespace, SetUnindexed, KeyPrefix+(i+1));
try {
client.put(wPolicy, key1, bin1, bin2);
client.put(wPolicy, key2, bin1, bin2);
}
catch (AerospikeException e) {
System.out.format("%s", e);
}
}
}
initializeTestData();
System.out.println("Test data populated.");;

Output:

Test data populated.

Create Indexes

The system-defined primary index already exists for the namespace. We will create a secondary index and a set index on the set SetIndexed to show the secondary index and set index query (scan) capabilities using this set.

The set SetUnindexed does not have a secondary or set index, which means a query (scan) of this set must use the primary index. We will use this set to show the primary index query (scan) capabilities.

Create Secondary Index

final String IndexName = "idx_indexed_bin1_number";

try {
IndexTask task = client.createIndex(null, Namespace, SetIndexed, IndexName,
"bin1", IndexType.NUMERIC);
task.waitTillComplete();
}
catch (AerospikeException ae) {
if (ae.getResultCode() != ResultCode.INDEX_ALREADY_EXISTS) {
throw ae;
}
}

System.out.format("Created index %s on ns=%s set=%s bin=%s.",
IndexName, Namespace, SetIndexed, "bin1");;

Output:

Created index idx_indexed_bin1_number on ns=test set=indexed bin=bin1.

Create Set Index

// Enable set index on the set 'indexed'.
%sh asinfo -v "set-config:context=namespace;id=test;set=indexed;enable-index=true"
System.out.println("Set index created on set 'indexed'.");;

Output:

Set index created on set 'indexed'.

Define Convenience Functions

Define convenience functions to process results, which simply involves printing them.

// a convenience function to process a record which simply prints its user key and bins
void processRecord(Key key, Record rec) {
System.out.format("Record key: %s, bins: %s\n", key.userKey, rec.bins);
}

// a convenience function to process results
void processResults(RecordSet rs) {
int recs = 0;
try {
while (rs.next()) {
recs++;
Key key = rs.getKey();
Record rec = rs.getRecord();
processRecord(key, rec);
}
}
finally {
rs.close();
}
}

Overview

The main sections in the notebook are:

  • Query results as a record stream
  • Pagination
  • Parallelism with query partitions
  • Resuming with partition cursors

Query Results as a Stream of Records

The following examples show how all results are retrieved with one request and processed as a record stream:

  • secondary index query results in a sync and an async request
  • set index query (scan) with an expression filter results in a sync request
  • primary index query (scan) with an expression filter results in a sync request

Note that an expression filter is different from the query filter. The former can be used with any type of query and is specified in the query policy, whereas the latter can only be used with a secondary index query and is specified in the query statement.

When the query filter is null or unspecified in a query, the query is executed as a set scan using a set index, if one exists, or the primary index.

In the examples below, we use the expression filter only with the set and primary index queries (scans) to make the returned results equivalent to those from the secondary index query.
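
To make the intended equivalence concrete, here is a small in-memory sketch that does not touch the database. It assumes, as the outputs in this notebook suggest, that the range query filter on bin1 is inclusive at both ends. The class RangeEquivalence and its method names are hypothetical and exist only for this illustration.

```java
import java.util.List;
import java.util.function.IntPredicate;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

final class RangeEquivalence {
    // In-memory analogue of an inclusive range filter on an integer bin.
    static IntPredicate inclusiveRange(int begin, int end) {
        return v -> v >= begin && v <= end;
    }

    // In-memory analogue of the expression and(ge(bin, begin), le(bin, end)).
    static IntPredicate geAndLe(int begin, int end) {
        IntPredicate ge = v -> v >= begin;
        IntPredicate le = v -> v <= end;
        return ge.and(le);
    }

    // Applies a predicate to the bin1 values 1..numRecords, mirroring the test data.
    static List<Integer> select(IntPredicate p, int numRecords) {
        return IntStream.rangeClosed(1, numRecords)
                .filter(p)
                .boxed()
                .collect(Collectors.toList());
    }
}
```

Both predicates select the same seven values (7 through 13) from the test data range, which is why the scans with an expression filter return the same records as the secondary index query, although the order of records returned by the database can differ.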

Secondary Index Query

The secondary index filter is specified in the query statement. When a query filter is specified, the corresponding secondary index must exist, otherwise the query returns an error.

Sync Processing

QueryPolicy qPolicy = new QueryPolicy();
// query statement defines contents of query results
Statement stmt = new Statement();
stmt.setNamespace(Namespace);
stmt.setSetName(SetIndexed);
stmt.setFilter(Filter.range("bin1", 7, 13)); // range filter uses the secondary index on bin1
// sync query request returns a record stream
RecordSet rs = client.query(qPolicy, stmt);
// process record stream
processResults(rs);

Output:

Record key: id-11, bins: {bin1=11, bin2=110}
Record key: id-13, bins: {bin1=13, bin2=130}
Record key: id-8, bins: {bin1=8, bin2=80}
Record key: id-10, bins: {bin1=10, bin2=100}
Record key: id-7, bins: {bin1=7, bin2=70}
Record key: id-9, bins: {bin1=9, bin2=90}
Record key: id-12, bins: {bin1=12, bin2=120}

Async Processing

The query statement is the same, but the setup of the async request is more involved. Please see the tutorial Understanding Asynchronous Operations for details.

QueryPolicy qPolicy = new QueryPolicy();
// query statement defines contents of query results
Statement stmt = new Statement();
stmt.setNamespace(Namespace);
stmt.setSetName(SetIndexed);
stmt.setFilter(Filter.range("bin1", 7, 13)); // range filter uses the secondary index on bin1

// async request framework
// query monitor synchronizes the main thread with completion of the query
Monitor queryMonitor = new Monitor();
// submit async operation with throttle by waiting for an available slot
EventLoop eventLoop = eventLoops.next();
int eventLoopIndex = eventLoop.getIndex();
if (throttles.waitForSlot(eventLoopIndex, 1)) {
try {
// the async callback object has three methods: onRecord, onSuccess, onFailure
client.query(eventLoop, new RecordSequenceListener() {
// called for each record
public void onRecord(Key key, Record rec) throws AerospikeException {
processRecord(key, rec);
}
// called on successful completion
public void onSuccess() {
throttles.addSlot(eventLoopIndex, 1);
queryMonitor.notifyComplete(); // unblock the main thread
}
// called in case of a failure
public void onFailure(AerospikeException e) {
throttles.addSlot(eventLoopIndex, 1);
System.out.format("Error: query failed with exception - %s", e);
queryMonitor.notifyComplete();
}
},
qPolicy, stmt);
}
catch (Exception e) {
System.out.format("Error: exception in record sequence listener - %s\n", e.getMessage());
}
}
// the main thread waits for the query to complete
queryMonitor.waitTillComplete();

Output:

Record key: id-13, bins: {bin1=13, bin2=130}
Record key: id-12, bins: {bin1=12, bin2=120}
Record key: id-9, bins: {bin1=9, bin2=90}
Record key: id-11, bins: {bin1=11, bin2=110}
Record key: id-10, bins: {bin1=10, bin2=100}
Record key: id-8, bins: {bin1=8, bin2=80}
Record key: id-7, bins: {bin1=7, bin2=70}
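
The callback-plus-monitor pattern above can be shown in isolation with standard Java. This is a sketch under stated assumptions: SequenceListener is a hypothetical interface shaped like a record sequence callback, a plain thread stands in for the event loop, and a java.util.concurrent.CountDownLatch plays the role of the monitor. None of these names come from the Aerospike client.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;

final class StreamDemo {
    // Hypothetical callback shape: one call per record, then exactly one terminal call.
    interface SequenceListener {
        void onRecord(int value);
        void onSuccess();
        void onFailure(Exception e);
    }

    // Delivers the values to the listener on a background thread.
    static void queryAsync(List<Integer> values, SequenceListener listener) {
        Thread worker = new Thread(() -> {
            try {
                for (int v : values) {
                    listener.onRecord(v);
                }
                listener.onSuccess();
            } catch (Exception e) {
                listener.onFailure(e);
            }
        });
        worker.start();
    }

    // Blocks the calling thread until the stream completes, then returns what was received.
    static List<Integer> collect(List<Integer> values) {
        CountDownLatch done = new CountDownLatch(1); // plays the role of the monitor
        List<Integer> received = Collections.synchronizedList(new ArrayList<>());
        queryAsync(values, new SequenceListener() {
            public void onRecord(int value) { received.add(value); }
            public void onSuccess() { done.countDown(); }
            public void onFailure(Exception e) { done.countDown(); }
        });
        try {
            done.await(); // the main thread waits for completion
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return received;
    }
}
```

The key design point is that both terminal callbacks release the waiting thread; if only the success path did so, a failed request would leave the caller blocked.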

Set Index Query (Scan)

When the query filter is not specified, the set is scanned using a set index if one is available.

We use an equivalent expression filter to make the results the same as the secondary index query results. The expression filter is specified in the query policy. See the tutorial Understanding Expressions for details on expressions.

// using the set index
QueryPolicy qPolicy = new QueryPolicy();
// query statement defines contents of query results
Statement stmt = new Statement();
stmt.setNamespace(Namespace);
stmt.setSetName(SetIndexed); // a set index is used when it is available
// no query filter means a scan using a set or primary index
// use expression filter equivalent to query filter
Exp rangeFilter = Exp.and(Exp.ge(Exp.intBin("bin1"), Exp.val(7)), Exp.le(Exp.intBin("bin1"), Exp.val(13)));
qPolicy.filterExp = Exp.build(rangeFilter);
// sync query request returns a record stream
RecordSet rs = client.query(qPolicy, stmt);
// process record stream
processResults(rs);

Output:

Record key: id-12, bins: {bin1=12, bin2=120}
Record key: id-11, bins: {bin1=11, bin2=110}
Record key: id-7, bins: {bin1=7, bin2=70}
Record key: id-10, bins: {bin1=10, bin2=100}
Record key: id-8, bins: {bin1=8, bin2=80}
Record key: id-9, bins: {bin1=9, bin2=90}
Record key: id-13, bins: {bin1=13, bin2=130}

Primary Index Query (Scan)

When the query filter is not specified and no set index is available, the set is scanned using the primary index.

We use an equivalent expression filter to make the results the same as the secondary index query results. The expression filter is specified in the query policy. See the tutorial Understanding Expressions for details on expressions.

// using the primary index
QueryPolicy qPolicy = new QueryPolicy();
// query statement defines contents of query results
Statement stmt = new Statement();
stmt.setNamespace(Namespace);
stmt.setSetName(SetUnindexed); // the primary index is used when a set index is absent
// no query filter means a scan using a set or primary index
// use expression filter equivalent to query filter
Exp rangeFilter = Exp.and(Exp.ge(Exp.intBin("bin1"), Exp.val(7)), Exp.le(Exp.intBin("bin1"), Exp.val(13)));
qPolicy.filterExp = Exp.build(rangeFilter);
// sync query request returns a record stream
RecordSet rs = client.query(qPolicy, stmt);
// process record stream
processResults(rs);

Output:

Record key: id-12, bins: {bin1=12, bin2=120}
Record key: id-13, bins: {bin1=13, bin2=130}
Record key: id-11, bins: {bin1=11, bin2=110}
Record key: id-7, bins: {bin1=7, bin2=70}
Record key: id-8, bins: {bin1=8, bin2=80}
Record key: id-9, bins: {bin1=9, bin2=90}
Record key: id-10, bins: {bin1=10, bin2=100}

Pagination

An application can get query results in chunks by specifying the maximum number of records returned in a single response and iterating with the queryPartitions API call until all results are retrieved. The partitionFilter associated with the query supports the isDone test to check whether any records remain to be processed in the stream.

Pagination for queries on all index types is shown below.

Paginating Secondary Index Query Results

QueryPolicy qPolicy = new QueryPolicy();
// query statement defines contents of query results
Statement stmt = new Statement();
stmt.setNamespace(Namespace);
stmt.setSetName(SetIndexed);
stmt.setFilter(Filter.range("bin1", 7, 13)); // range filter uses the secondary index on bin1
// set max number of records to be retrieved per page
stmt.setMaxRecords(3);

PartitionFilter pFilter;
pFilter = PartitionFilter.all(); // include all data partitions
int pagenum = 0;
while (!pFilter.isDone()) { // until no more results to process
pagenum++;
System.out.format("Page %d: \n", pagenum);
RecordSet rs = client.queryPartitions(qPolicy, stmt, pFilter);
processResults(rs);
}

Output:

Page 1: 
Record key: id-10, bins: {bin1=10, bin2=100}
Record key: id-8, bins: {bin1=8, bin2=80}
Record key: id-7, bins: {bin1=7, bin2=70}
Page 2:
Record key: id-12, bins: {bin1=12, bin2=120}
Record key: id-9, bins: {bin1=9, bin2=90}
Record key: id-13, bins: {bin1=13, bin2=130}
Page 3:
Record key: id-11, bins: {bin1=11, bin2=110}

Paginating Set Index Query Results

The set index is used for SetIndexed.

// using the set index
QueryPolicy qPolicy = new QueryPolicy();
// query statement defines contents of query results
Statement stmt = new Statement();
stmt.setNamespace(Namespace);
stmt.setSetName(SetIndexed); // a set index is used when it is available
// no query filter means a scan using a set or primary index
// use expression filter equivalent to query filter
Exp rangeFilter = Exp.and(Exp.ge(Exp.intBin("bin1"), Exp.val(7)), Exp.le(Exp.intBin("bin1"), Exp.val(13)));
qPolicy.filterExp = Exp.build(rangeFilter);
// set max number of records to be retrieved per page
stmt.setMaxRecords(3);

PartitionFilter pFilter;
pFilter = PartitionFilter.all(); // include all data partitions
int pagenum = 0;
while (!pFilter.isDone()) { // until no more results to process
pagenum++;
System.out.format("Page %d: \n", pagenum);
RecordSet rs = client.queryPartitions(qPolicy, stmt, pFilter);
processResults(rs);
}

Output:

Page 1: 
Record key: id-10, bins: {bin1=10, bin2=100}
Record key: id-8, bins: {bin1=8, bin2=80}
Record key: id-7, bins: {bin1=7, bin2=70}
Page 2:
Record key: id-13, bins: {bin1=13, bin2=130}
Record key: id-12, bins: {bin1=12, bin2=120}
Record key: id-9, bins: {bin1=9, bin2=90}
Page 3:
Record key: id-11, bins: {bin1=11, bin2=110}

Paginating Primary Index Query Results

The primary index is used as there is no set index defined on SetUnindexed.

QueryPolicy qPolicy = new QueryPolicy();
// query statement defines contents of query results
Statement stmt = new Statement();
stmt.setNamespace(Namespace);
stmt.setSetName(SetUnindexed); // the primary index is used when a set index is absent
// no query filter means a scan using a set or primary index
// use expression filter equivalent to query filter
Exp rangeFilter = Exp.and(Exp.ge(Exp.intBin("bin1"), Exp.val(7)), Exp.le(Exp.intBin("bin1"), Exp.val(13)));
qPolicy.filterExp = Exp.build(rangeFilter);
// set max number of records to be retrieved per page
stmt.setMaxRecords(3);

PartitionFilter pFilter = PartitionFilter.all(); // include all data partitions
int pagenum = 0;
while (!pFilter.isDone()) { // until no more results to process
pagenum++;
System.out.format("Page %d: \n", pagenum);
RecordSet rs = client.queryPartitions(qPolicy, stmt, pFilter);
processResults(rs);
}

Output:

Page 1: 
Record key: id-8, bins: {bin1=8, bin2=80}
Record key: id-13, bins: {bin1=13, bin2=130}
Record key: id-7, bins: {bin1=7, bin2=70}
Page 2:
Record key: id-11, bins: {bin1=11, bin2=110}
Record key: id-9, bins: {bin1=9, bin2=90}
Record key: id-10, bins: {bin1=10, bin2=100}
Page 3:
Record key: id-12, bins: {bin1=12, bin2=120}
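
The loop shape shared by the three pagination examples can be shown in isolation. The sketch below is an in-memory analogy only: PageCursor is a hypothetical class that hands out a list in fixed-size pages and exposes an isDone test, loosely mirroring how the partition filter tracks progress. A real query tracks progress per partition on the server side, so page contents and ordering will differ from this simplified model.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical in-memory cursor; it only mimics the shape of the paging loop.
final class PageCursor {
    private final List<Integer> source;
    private int position = 0;

    PageCursor(List<Integer> source) {
        this.source = source;
    }

    // True once every element has been handed out.
    boolean isDone() {
        return position >= source.size();
    }

    // Returns up to maxRecords elements and advances the cursor.
    List<Integer> nextPage(int maxRecords) {
        if (maxRecords <= 0) {
            throw new IllegalArgumentException("maxRecords must be positive");
        }
        int end = Math.min(position + maxRecords, source.size());
        List<Integer> page = new ArrayList<>(source.subList(position, end));
        position = end;
        return page;
    }

    // Drains the cursor page by page and returns the pages in order.
    static List<List<Integer>> paginate(List<Integer> source, int maxRecords) {
        PageCursor cursor = new PageCursor(source);
        List<List<Integer>> pages = new ArrayList<>();
        while (!cursor.isDone()) { // same loop condition as in the examples above
            pages.add(cursor.nextPage(maxRecords));
        }
        return pages;
    }
}
```

With seven matching values and a page size of three, this model yields pages of three, three, and one element, which matches the page sizes seen in the outputs above.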

Parallelism with Query Partitions

The queryPartitions API provides the ability to select specific partitions so that the application can control how to distribute work over multiple workers for the desired level of parallelism, with each worker processing the query over its assigned partitions.

Below, 4096 partitions are split across three sub-queries. We execute the sub-queries sequentially, but it is easy to imagine them being assigned to individual workers or threads and processed in parallel.

A secondary index query is shown below, but it also works with set and primary index queries. The code will be as shown in the earlier examples.

You can define queries at a granularity finer than a partition. For detailed discussion of parallelism, refer to the blog post Processing Large Data Sets in Fine-Grained Parallel Streams.

QueryPolicy qPolicy = new QueryPolicy();
// query statement defines contents of query results
Statement stmt = new Statement();
stmt.setNamespace(Namespace);
stmt.setSetName(SetIndexed);
stmt.setFilter(Filter.range("bin1", 7, 13)); // range filter uses the secondary index on bin1

// create multiple sub-queries that divide and cover 0-4095 partitions
PartitionFilter pFilter1, pFilter2, pFilter3;
pFilter1 = PartitionFilter.range(0, 1366); // 0-1365 partitions
pFilter2 = PartitionFilter.range(1366, 1366); // 1366-2731 partitions
pFilter3 = PartitionFilter.range(2732, 1364); // 2732-4095 partitions

// run the sub-queries
System.out.format("Subquery 1: \n");
RecordSet rs = client.queryPartitions(qPolicy, stmt, pFilter1);
processResults(rs);

System.out.format("Subquery 2: \n");
RecordSet rs = client.queryPartitions(qPolicy, stmt, pFilter2);
processResults(rs);

System.out.format("Subquery 3: \n");
RecordSet rs = client.queryPartitions(qPolicy, stmt, pFilter3);
processResults(rs);

Output:

Subquery 1: 
Record key: id-7, bins: {bin1=7, bin2=70}
Record key: id-10, bins: {bin1=10, bin2=100}
Record key: id-8, bins: {bin1=8, bin2=80}
Subquery 2:
Record key: id-9, bins: {bin1=9, bin2=90}
Subquery 3:
Record key: id-12, bins: {bin1=12, bin2=120}
Record key: id-13, bins: {bin1=13, bin2=130}
Record key: id-11, bins: {bin1=11, bin2=110}
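
The three ranges used above were written by hand. As a sketch, the same {begin, count} pairs can be computed for any number of workers with ceiling division. PartitionRanges and split are hypothetical names introduced for this illustration; the sketch uses only plain Java and does not call the client.

```java
import java.util.ArrayList;
import java.util.List;

final class PartitionRanges {
    // Splits [0, totalPartitions) into at most `workers` contiguous ranges.
    // Each int[] is {begin, count}, the same pair of arguments passed to
    // PartitionFilter.range(begin, count) in this notebook.
    static List<int[]> split(int totalPartitions, int workers) {
        if (totalPartitions <= 0 || workers <= 0) {
            throw new IllegalArgumentException("arguments must be positive");
        }
        int chunk = (totalPartitions + workers - 1) / workers; // ceiling division
        List<int[]> ranges = new ArrayList<>();
        for (int begin = 0; begin < totalPartitions; begin += chunk) {
            // the last range may be shorter than the others
            int count = Math.min(chunk, totalPartitions - begin);
            ranges.add(new int[] {begin, count});
        }
        return ranges;
    }
}
```

For 4096 partitions and three workers this produces (0, 1366), (1366, 1366), and (2732, 1364), the same split as in the example. Each pair could then be turned into a partition filter and handed to its own worker or thread.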

Resuming a Query

A query can be resumed from the point where the result stream processing left off.

Use the queryPartitions API to resume a query.

The queryPartitions API allows the application to get the partition cursors using the getPartitions call. The partition cursors mark points in corresponding partitions from which the query request can resume. The cursor state can be set in another query request to resume processing. Note that a returned stream from a sync request must be read completely in order to resume the query correctly.

The code examples below illustrate:

  • Resume the same query
  • Set partitions state in a different query

Resuming Same Query

Read part of the results by limiting the number of records, then resubmit the request using the same query instance to obtain a new stream with the remaining results. Note that all records are returned across the two calls.

QueryPolicy qPolicy = new QueryPolicy();
// query statement defines contents of query results
Statement stmt = new Statement();
stmt.setNamespace(Namespace);
stmt.setSetName(SetIndexed);
stmt.setFilter(Filter.range("bin1", 7, 13)); // range filter uses the secondary index on bin1
PartitionFilter pFilter = PartitionFilter.all(); // include all data partitions

stmt.setMaxRecords(3); // request 3 results
System.out.format("Paused after 3 results: \n");
RecordSet rs = client.queryPartitions(qPolicy, stmt, pFilter);
processResults(rs);

// get cursors in partitions
PartitionStatus[] cursors = pFilter.getPartitions();

System.out.format("\nResumed after 3 results: \n");
// cursor state is set in a new filter
PartitionFilter pFilter2 = PartitionFilter.all();
pFilter2.setPartitions(cursors); // set cursor state
stmt.setMaxRecords(0); // request all remaining results
RecordSet rs2 = client.queryPartitions(qPolicy, stmt, pFilter2);
processResults(rs2);

Output:

Paused after 3 results: 
Record key: id-7, bins: {bin1=7, bin2=70}
Record key: id-10, bins: {bin1=10, bin2=100}
Record key: id-8, bins: {bin1=8, bin2=80}

Resumed after 3 results:
Record key: id-12, bins: {bin1=12, bin2=120}
Record key: id-9, bins: {bin1=9, bin2=90}
Record key: id-13, bins: {bin1=13, bin2=130}
Record key: id-11, bins: {bin1=11, bin2=110}

Setting State in a Different Query

Read partially from the stream, and resume it later in a different programming context by submitting a new query request in which the saved cursor state is reinstated. The cursor state is serialized and deserialized between the two calls to illustrate arbitrarily separate programming contexts.

Note all records are returned across the two calls.

QueryPolicy qPolicy = new QueryPolicy();
// query statement defines contents of query results
Statement stmt = new Statement();
stmt.setNamespace(Namespace);
stmt.setSetName(SetIndexed);
stmt.setFilter(Filter.range("bin1", 7, 13)); // range filter uses the secondary index on bin1
PartitionFilter pFilter = PartitionFilter.all(); // include all data partitions

stmt.setMaxRecords(3); // request 3 results
System.out.format("Paused after 3 results: \n");
RecordSet rs = client.queryPartitions(qPolicy, stmt, pFilter);
processResults(rs);

// get cursors in partitions
PartitionStatus[] cursors = pFilter.getPartitions();

// serialize to save cursors
import org.apache.commons.lang3.SerializationUtils;
byte[] serialized = SerializationUtils.serialize(cursors);

System.out.format("\nResumed after 3 results - with a new query: \n");
// a new query with the same query parameters
QueryPolicy qPolicy2 = new QueryPolicy();
Statement stmt2 = new Statement();
stmt2.setNamespace(Namespace);
stmt2.setSetName(SetIndexed);
stmt2.setFilter(Filter.range("bin1", 7, 13)); // range filter uses the secondary index on bin1
PartitionFilter pFilter2 = PartitionFilter.all(); // include all data partitions

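// cursors are set to resume the query from the saved state
// deserialize to restore
import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.io.ObjectInputStream;
InputStream instr = new ByteArrayInputStream(serialized);
ObjectInputStream obj = new ObjectInputStream(instr);
PartitionStatus[] cursors2 = (PartitionStatus[]) obj.readObject();
obj.close(); // release the stream once the cursors are restored
pFilter2.setPartitions(cursors2);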

stmt2.setMaxRecords(0); // request all remaining results
RecordSet rs2 = client.queryPartitions(qPolicy2, stmt2, pFilter2);
processResults(rs2);

Output:

Paused after 3 results: 
Record key: id-10, bins: {bin1=10, bin2=100}
Record key: id-8, bins: {bin1=8, bin2=80}
Record key: id-7, bins: {bin1=7, bin2=70}

Resumed after 3 results - with a new query:
Record key: id-12, bins: {bin1=12, bin2=120}
Record key: id-11, bins: {bin1=11, bin2=110}
Record key: id-9, bins: {bin1=9, bin2=90}
Record key: id-13, bins: {bin1=13, bin2=130}
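
The save and restore step can also be written with java.io alone. The sketch below round-trips an array of a hypothetical Serializable class, CursorState, rather than the client's PartitionStatus array, so that it runs without a database. CursorCodec and its fields are assumptions made for illustration, and checked exceptions are wrapped so the helper methods can be called from any context.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

// Hypothetical stand-in for a per-partition cursor; not a client class.
final class CursorState implements Serializable {
    private static final long serialVersionUID = 1L;
    final int partitionId;
    final long lastValue;

    CursorState(int partitionId, long lastValue) {
        this.partitionId = partitionId;
        this.lastValue = lastValue;
    }
}

final class CursorCodec {
    // Serializes any Serializable state (including arrays) to a byte array.
    static byte[] save(Serializable state) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(state);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // Restores the object graph written by save().
    static Object restore(byte[] saved) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(saved))) {
            return in.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

The byte array returned by save could be written to a file or another store and read back later by a separate process, which is the situation the serialization step in the example above is meant to stand for.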

Cursors with Set Index Query

Query resume works as expected with set index queries.

QueryPolicy qPolicy = new QueryPolicy();
// query statement defines contents of query results
Statement stmt = new Statement();
stmt.setNamespace(Namespace);
stmt.setSetName(SetIndexed); // a set index is used when it is available
// no query filter means a scan using a set or primary index
// use expression filter equivalent to query filter
Exp rangeFilter = Exp.and(Exp.ge(Exp.intBin("bin1"), Exp.val(7)), Exp.le(Exp.intBin("bin1"), Exp.val(13)));
qPolicy.filterExp = Exp.build(rangeFilter);
PartitionFilter pFilter = PartitionFilter.all(); // include all data partitions

stmt.setMaxRecords(3); // request 3 results
System.out.format("Set index scan paused after 3 results: \n");
RecordSet rs = client.queryPartitions(qPolicy, stmt, pFilter);
processResults(rs);

// get cursors in partitions
PartitionStatus[] cursors = pFilter.getPartitions();

System.out.format("\nSet index scan resumed after 3 results: \n");
// cursor state is set in a new filter
PartitionFilter pFilter2 = PartitionFilter.all();
pFilter2.setPartitions(cursors); // set cursor state
stmt.setMaxRecords(0); // request all remaining results
RecordSet rs2 = client.queryPartitions(qPolicy, stmt, pFilter2);
processResults(rs2);

Output:

Set index scan paused after 3 results: 
Record key: id-7, bins: {bin1=7, bin2=70}
Record key: id-8, bins: {bin1=8, bin2=80}
Record key: id-10, bins: {bin1=10, bin2=100}

Set index scan resumed after 3 results:
Record key: id-11, bins: {bin1=11, bin2=110}
Record key: id-9, bins: {bin1=9, bin2=90}
Record key: id-13, bins: {bin1=13, bin2=130}
Record key: id-12, bins: {bin1=12, bin2=120}

Cursors with Primary Index Query

Query resume works as expected with primary index queries.

QueryPolicy qPolicy = new QueryPolicy();
// query statement defines contents of query results
Statement stmt = new Statement();
stmt.setNamespace(Namespace);
stmt.setSetName(SetUnindexed); // primary index is used when a set index is absent
// no query filter means a scan using a set or primary index
// use expression filter equivalent to query filter
Exp rangeFilter = Exp.and(Exp.ge(Exp.intBin("bin1"), Exp.val(7)), Exp.le(Exp.intBin("bin1"), Exp.val(13)));
qPolicy.filterExp = Exp.build(rangeFilter);
PartitionFilter pFilter = PartitionFilter.all(); // include all data partitions

stmt.setMaxRecords(3); // request 3 results
System.out.format("Primary index scan paused after 3 results: \n");
RecordSet rs = client.queryPartitions(qPolicy, stmt, pFilter);
processResults(rs);

// get cursors in partitions
PartitionStatus[] cursors = pFilter.getPartitions();

System.out.format("\nPrimary index scan resumed after 3 results: \n");
// cursor state is set in a new filter
PartitionFilter pFilter2 = PartitionFilter.all();
pFilter2.setPartitions(cursors); // set cursor state
stmt.setMaxRecords(0); // request all remaining results
RecordSet rs2 = client.queryPartitions(qPolicy, stmt, pFilter2);
processResults(rs2);

Output:

Primary index scan paused after 3 results: 
Record key: id-7, bins: {bin1=7, bin2=70}
Record key: id-8, bins: {bin1=8, bin2=80}
Record key: id-13, bins: {bin1=13, bin2=130}

Primary index scan resumed after 3 results:
Record key: id-10, bins: {bin1=10, bin2=100}
Record key: id-9, bins: {bin1=9, bin2=90}
Record key: id-11, bins: {bin1=11, bin2=110}
Record key: id-12, bins: {bin1=12, bin2=120}

Takeaways

The notebook showed code examples for how to process query results as a stream of records, paginate over results, partition a query for parallelism, and resume query execution at a later time.

Further Exploration and Resources

Here are some links for further exploration

Resources