Working with Query Result Streams

Neel Phadnis
Director - Developer Ecosystem
March 28, 2023|6 min read

This article describes how queries in Aerospike allow applications to process query results as a stream of records, paginate over results, partition a query for parallelism, and resume execution at a later time.

Stream of Records

Results of a query are returned to the application as a stream of records. This is true for all query requests and types including simple and partition-based, sync and async, short and long, as well as with and without UDFs. The one exception is background queries, which are actually background updates and therefore do not return any results.

Since a query can be combined with an expression filter, it provides stream access to records that meet any condition.

Query Execution

The client library executes a query in a scatter-gather fashion: it sends the query request to multiple cluster nodes in parallel, gathers the responses, and returns the combined results to the application.

Although every required server node is involved over the course of a query execution, not all nodes necessarily participate at the same time. The policy parameter max-concurrent-nodes caps the number of nodes that are queried concurrently.
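The scatter-gather pattern with a concurrency cap can be sketched in plain Python. This is a toy model, not the Aerospike client: the `CLUSTER` dictionary, `query_node`, and `scatter_gather` are hypothetical names, and a thread pool stands in for the client's per-node requests.

```python
from concurrent.futures import ThreadPoolExecutor

# Hypothetical stand-in for a cluster: node name -> records held by that node.
CLUSTER = {
    "node-a": [1, 4, 7],
    "node-b": [2, 5, 8],
    "node-c": [3, 6, 9],
}

def query_node(node, predicate):
    """Run the sub-query on one node and return its matching records."""
    return [rec for rec in CLUSTER[node] if predicate(rec)]

def scatter_gather(predicate, max_concurrent_nodes):
    """Query every node, with at most max_concurrent_nodes in flight."""
    results = []
    with ThreadPoolExecutor(max_workers=max_concurrent_nodes) as pool:
        # Scatter: one task per node; the pool size caps the concurrency.
        for node_records in pool.map(lambda n: query_node(n, predicate), CLUSTER):
            # Gather: merge each node's response into one combined result.
            results.extend(node_records)
    return results

even_records = scatter_gather(lambda rec: rec % 2 == 0, max_concurrent_nodes=2)
```

With `max_concurrent_nodes=2`, all three nodes are still queried, but no more than two at any moment.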

Similarly, a server node does not return all of its results at once. The client library requests results from server nodes in chunks by specifying the maximum number of records to be returned. Each node returns up to the specified number of records, along with a cursor (or marker) that records the current position in processing and the point from which processing will resume in a following request. Note that a query is processed using an index (primary, set, or secondary), and an index is internally organized by the data partitions across which records are distributed. Partition cursors are processing positions in partition indexes, and they are not affected by whether and how records change.
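A minimal sketch of a chunked request against one partition index follows. It assumes, purely for illustration, that the index is a sorted list of keys and that the cursor is the last key returned; `read_chunk` is a hypothetical helper, not an Aerospike call.

```python
from bisect import bisect_right

def read_chunk(index, cursor, max_records):
    """Return up to max_records keys after the cursor, plus the new cursor.

    index is a sorted list of keys standing in for one partition's index.
    cursor is the last key already returned, or None to start at the beginning.
    """
    start = 0 if cursor is None else bisect_right(index, cursor)
    chunk = index[start:start + max_records]
    # If nothing was read, the cursor stays where it was.
    new_cursor = chunk[-1] if chunk else cursor
    return chunk, new_cursor

partition_index = [10, 20, 30, 40, 50]
first, cursor = read_chunk(partition_index, None, 2)
second, cursor = read_chunk(partition_index, cursor, 2)
third, cursor = read_chunk(partition_index, cursor, 2)
```

Because the cursor is a position in the index rather than a count of records read, each request can pick up exactly where the previous one stopped.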

In processing query results, the application can:

  • Read and process all records in the stream.

  • Read partially from the stream, and iterate the request until all records are read. Each iteration resumes from the partition cursors.

  • Read partially from the stream, save the cursor state, and resume it later in a different programming context by submitting the query with the saved cursor state.

The application can also stop processing results at any point.

You can follow along with code examples in this interactive tutorial.

Sync and Async Requests

Sync Requests

In a sync query request, the query returns a stream handle. In the same thread, the application reads records from the stream.

Async Requests

In the async mode, each record in the stream is returned to the application through a method in the registered callback class. The class implements three methods:

  • to process each record,

  • to notify successful completion at the end of the stream, and

  • to notify failure at any point.

The callback functions are called in a separate thread from the request thread.
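The callback arrangement can be illustrated with a small, self-contained sketch. The class and method names (`RecordListener`, `on_record`, `on_success`, `on_failure`) and the `query_async` function are hypothetical stand-ins, not the client library's API; a plain thread delivers the records.

```python
import threading

class RecordListener:
    """Hypothetical callback class with the three methods described above."""

    def __init__(self):
        self.records = []
        self.error = None
        self.done = threading.Event()
        self.callback_thread = None

    def on_record(self, record):
        # Called once per record in the stream.
        self.callback_thread = threading.current_thread().name
        self.records.append(record)

    def on_success(self):
        # Called once, after the last record.
        self.done.set()

    def on_failure(self, exc):
        # Called if the stream fails at any point.
        self.error = exc
        self.done.set()

def query_async(source, listener):
    """Deliver records on a worker thread and return immediately."""
    def run():
        try:
            for record in source:
                listener.on_record(record)
        except Exception as exc:
            listener.on_failure(exc)
        else:
            listener.on_success()

    worker = threading.Thread(target=run, name="query-callback")
    worker.start()
    return worker

listener = RecordListener()
query_async(["r1", "r2", "r3"], listener)
listener.done.wait(timeout=5)  # the request thread is free until it needs results
```

The request thread only blocks at the final `wait`; everything before that point runs while records are being delivered on the callback thread.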

You can view the sync and async request code examples here.

Partition Queries

The application can have greater control over:

  • The flow, by breaking the stream into chunks of a specified size.

  • Parallel processing, by limiting the scope to a subset of the partitions, and spreading query processing over multiple workers where each worker is responsible for a specific subset of partitions.

These capabilities are available through the new query-partitions API. The simpler but less functional query API retrieves a maximum number of records, but it cannot obtain subsequent chunks or specify partitions.

Pagination

Query results can be requested repeatedly in smaller chunks until all results are returned. The ability to retrieve a specific number of records at a time is called pagination. It protects the application from being overwhelmed by a large number of results, and the result buffer space from being tied up. The number of results returned can be smaller than the requested number of records.

The same query statement and partition filter are used to obtain the next pageful of records from the stream. The partition-filter object provides the is-done API to check whether more records remain to be read from the stream.
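The pagination loop can be sketched with a toy model. `ToyPartitionFilter` and `query_page` are hypothetical and only mimic the behavior described here: the same filter object is passed to every request, it carries one cursor per partition, and its `is_done` method reports whether anything is left to read.

```python
class ToyPartitionFilter:
    """Hypothetical stand-in for a partition filter: one cursor per partition."""

    def __init__(self, partition_ids):
        self.cursors = {pid: 0 for pid in partition_ids}  # next offset to read
        self.done = False

    def is_done(self):
        return self.done

def query_page(data, pfilter, page_size):
    """Return up to page_size records, advancing the cursors in pfilter."""
    page = []
    for pid in pfilter.cursors:
        records = data[pid]
        while pfilter.cursors[pid] < len(records) and len(page) < page_size:
            page.append(records[pfilter.cursors[pid]])
            pfilter.cursors[pid] += 1
    # Done once every partition's cursor has reached the end of its records.
    pfilter.done = all(
        pfilter.cursors[pid] >= len(data[pid]) for pid in pfilter.cursors
    )
    return page

data = {0: ["a", "b", "c"], 1: ["d"], 2: ["e", "f", "g"]}
pfilter = ToyPartitionFilter(data.keys())
pages = []
while not pfilter.is_done():
    pages.append(query_page(data, pfilter, page_size=3))
```

In this toy model only the last page is short. As noted above, a real page may hold fewer records than requested at any point, so the loop should rely on the is-done check rather than on the page size.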

You can view the pagination code examples here.

Specific Partitions

The query-partitions request provides the ability to select specific partitions so that the application can control how to distribute work over multiple workers for the desired level of parallelism, with each worker processing the query over its assigned partitions. The API allows a single partition, a range of partitions, or all partitions to be selected. The client library determines the nodes that hold the requested partitions and sends the corresponding sub-queries to them.
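One way to divide the work is to give each worker a contiguous range of partitions. The sketch below computes such ranges as (begin, count) pairs; `partition_ranges` is a hypothetical helper, and the only Aerospike-specific fact it relies on is that a namespace has 4096 partitions.

```python
TOTAL_PARTITIONS = 4096  # an Aerospike namespace is divided into 4096 partitions

def partition_ranges(num_workers, total=TOTAL_PARTITIONS):
    """Split partitions 0..total-1 into contiguous (begin, count) ranges."""
    base, extra = divmod(total, num_workers)
    ranges = []
    begin = 0
    for worker in range(num_workers):
        # The first `extra` workers take one additional partition each.
        count = base + (1 if worker < extra else 0)
        ranges.append((begin, count))
        begin += count
    return ranges

ranges = partition_ranges(3)
```

Each worker would then run the same query restricted to its own range, so the ranges together cover every partition exactly once.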

Check out an example of querying specific partitions here.

Partition Cursors

The partition-filter object provides a get-partitions call to obtain the partition cursors. The partition cursors mark the points in the corresponding partitions from which the query request will resume. The cursor state can be saved and supplied to another query request to resume processing.
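Saving and restoring cursor state can be sketched as follows. This is a toy model under stated assumptions: cursors are plain offsets keyed by partition id, JSON is used as the storage format, and `read_some` is a hypothetical helper rather than a client call.

```python
import json

def read_some(data, cursors, max_records):
    """Read up to max_records, returning the records and the updated cursors."""
    cursors = dict(cursors)  # do not mutate the caller's state
    out = []
    for pid, records in data.items():
        pos = cursors.get(pid, 0)
        take = records[pos:pos + (max_records - len(out))]
        out.extend(take)
        cursors[pid] = pos + len(take)
    return out, cursors

# Partition ids are strings so that they survive the JSON round trip unchanged.
data = {"0": [1, 2, 3], "1": [4, 5]}

# First context: read part of the stream and persist the cursor state.
first, cursors = read_some(data, {}, max_records=2)
saved = json.dumps(cursors)

# Later context: restore the cursors and resume where processing stopped.
rest, cursors = read_some(data, json.loads(saved), max_records=10)
```

The saved string is all that the second context needs; it does not have to share memory, a thread, or even a process with the first.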

View examples of resuming queries here.

Fine-Grained Parallelism

You can define queries at a granularity finer than a partition. We discuss this in the blog post Processing Large Data Sets in Fine-Grained Parallel Streams.

What Query Is Not

Query on Change Data

Queries do not run on the change data capture (CDC) stream. A subsequent query request using the same partition-filter is not guaranteed to return every new record that has met the query criterion since the previous request. It returns only those new records that appear after the cursor positions left by the previous query. This means that none, some, or all changes since the last request may be returned.
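A small sketch shows why only some changes are seen. It assumes, for illustration only, a sorted list of keys as the partition index and the last returned key as the cursor; `records_after` is a hypothetical helper.

```python
from bisect import bisect_right, insort

def records_after(index, cursor):
    """Return every key in the sorted index that lies after the cursor."""
    return index[bisect_right(index, cursor):]

index = [10, 20, 30, 40]
cursor = 20          # a previous request stopped after key 20

insort(index, 15)    # this new record lands before the cursor
insort(index, 35)    # this new record lands after the cursor

resumed = records_after(index, cursor)
```

The resumed request returns the new key 35 but never sees 15, because 15 sits behind the cursor.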

Ordered or Direct Access

The query results are not in any order that is significant to the application. Any sort order must be computed within the application, and all results must be retrieved before they are sorted. Similarly, direct access to a record at a specific position in the stream is not supported; the application must first retrieve all preceding records.


Queries constitute a core functionality of any database. This post describes some key query capabilities available to Aerospike applications, such as how to process query results as a stream of records, paginate over results, partition a query for parallelism, and resume query execution at a later time. View code examples in this interactive notebook.