
Splitting Large Data Sets for Parallel Processing of Queries


This tutorial describes multiple schemes for dividing a large data set into equal splits for parallel processing of queries, and a test framework with multiple parameters.

This notebook requires the Aerospike Database running locally, along with a Java kernel and the Aerospike Java Client. To create a Docker container that satisfies these requirements and holds a copy of the Aerospike notebooks, visit the Aerospike Notebooks Repo.

Introduction

This notebook walks through:

  • implementing multiple schemes to divide a data set in Aerospike into equal splits that can be queried in parallel.
  • creating a test framework that allows the splits to be processed over a range of parameters such as the number of workers, query and filter options, processing mode, and more.
  • running various combinations of these parameters with the test data, and ensuring the computation results remain the same for specific query and filter choices.

Please refer to the companion blog post Processing Large Data Sets with Fine Grained Streams for additional discussion.

The specific topics covered in this notebook include:

  • Understanding how the digest-modulo scheme for dividing a partition into multiple subpartitions avoids the disk I/O bottleneck.
  • Schemes for dividing data equally over an arbitrary number of splits.
  • A test framework to process the splits in parallel using a range of parameters such as the number of workers, query and filter options, processing function, and sync and async modes.
  • Putting it to the test with different values for splits, workers, query, filter, and processing mode.

Prerequisites

This tutorial assumes familiarity with the following topics:

Setup

Ensure database is running

This notebook requires that the Aerospike database is running.

import io.github.spencerpark.ijava.IJava;
import io.github.spencerpark.jupyter.kernel.magic.common.Shell;
IJava.getKernelInstance().getMagics().registerMagics(Shell.class);
%sh asd

Download and install additional components.

Install the Java client.

%%loadFromPOM
<dependencies>
<dependency>
<groupId>com.aerospike</groupId>
<artifactId>aerospike-client</artifactId>
<version>6.1.0</version>
</dependency>
</dependencies>

Initialize Client

Initialize the client that can be used for both sync and async processing modes.

Initialize event loops for async processing mode

We use NIO event loops for async processing, but other event loop types may also be used.

import java.util.concurrent.atomic.AtomicInteger;
import com.aerospike.client.async.EventPolicy;
import com.aerospike.client.async.EventLoops;
import com.aerospike.client.async.EventLoop;
import com.aerospike.client.async.Throttles;
import com.aerospike.client.async.Monitor;
import com.aerospike.client.async.NioEventLoops;
import com.aerospike.client.listener.RecordSequenceListener;

// initialize event loops
final int NumLoops = 2;
final int CommandsPerEventLoop = 50;
final int DelayQueueSize = 50;

EventPolicy eventPolicy = new EventPolicy();
eventPolicy.maxCommandsInProcess = CommandsPerEventLoop;
eventPolicy.maxCommandsInQueue = DelayQueueSize;
EventLoops eventLoops = new NioEventLoops(eventPolicy, NumLoops);

// initialize event loop throttles
Throttles throttles = new Throttles(NumLoops, CommandsPerEventLoop);

System.out.format("Throttles initialized for %s loops with %s concurrent operations per loop.\n",
NumLoops, CommandsPerEventLoop);;

Output:

Throttles initialized for 2 loops with 50 concurrent operations per loop.
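Throttles keeps the number of concurrent async commands on each event loop at or below CommandsPerEventLoop. The sketch below models that idea with one java.util.concurrent.Semaphore per loop: the issuing thread takes a slot before sending a command and the slot is returned when the command completes. It is an analogy under that assumption, not the client's implementation; SemaphoreThrottles and its methods are hypothetical names.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical per-event-loop throttle built on Semaphore. An analogy for the
// role Throttles plays in this notebook, not the Aerospike implementation.
class SemaphoreThrottles {
    private final Semaphore[] slots;

    SemaphoreThrottles(int numLoops, int maxCommandsPerLoop) {
        slots = new Semaphore[numLoops];
        for (int i = 0; i < numLoops; i++) {
            slots[i] = new Semaphore(maxCommandsPerLoop);
        }
    }

    // Blocks the issuing thread until the loop can accept one more command.
    void waitForSlot(int loop) throws InterruptedException {
        slots[loop].acquire();
    }

    // Called when a command on the loop completes.
    void addSlot(int loop) {
        slots[loop].release();
    }

    // Issues numCommands simulated commands round-robin across numLoops loops
    // and returns the peak number of commands in flight on loop 0.
    static int maxInFlightOnLoop0(int numLoops, int perLoop, int numCommands)
            throws InterruptedException {
        SemaphoreThrottles throttles = new SemaphoreThrottles(numLoops, perLoop);
        AtomicInteger inFlight = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(numLoops * perLoop);
        for (int c = 0; c < numCommands; c++) {
            final int loop = c % numLoops;
            throttles.waitForSlot(loop); // waits here while the loop is full
            pool.execute(() -> {
                if (loop == 0) {
                    peak.accumulateAndGet(inFlight.incrementAndGet(), Math::max);
                }
                try {
                    Thread.sleep(1); // simulated command latency
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    if (loop == 0) {
                        inFlight.decrementAndGet();
                    }
                    throttles.addSlot(loop); // free the slot only after bookkeeping
                }
            });
        }
        pool.shutdown();
        pool.awaitTermination(1, TimeUnit.MINUTES);
        return peak.get();
    }
}
```

With 2 loops and 5 slots per loop, the peak returned for loop 0 never exceeds 5, however many commands are issued; the issuing thread simply blocks until a slot frees up.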

Initialize client with event loops

import com.aerospike.client.AerospikeClient;
import com.aerospike.client.Host;
import com.aerospike.client.policy.ClientPolicy;

// initialize the client
final int MaxConnPerNode = 10000; // adjust accordingly for max workers
ClientPolicy clientPolicy = new ClientPolicy();
clientPolicy.maxConnsPerNode = MaxConnPerNode;
clientPolicy.eventLoops = eventLoops;
int concurrentMax = CommandsPerEventLoop * NumLoops;
if (clientPolicy.maxConnsPerNode < concurrentMax) {
clientPolicy.maxConnsPerNode = concurrentMax;
}
Host[] hosts = Host.parseHosts("localhost", 3000);
AerospikeClient client = new AerospikeClient(clientPolicy, hosts);

System.out.println("Initialized the client and connected to the cluster.");;

Output:

Initialized the client and connected to the cluster.

Includes and Constants

import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import com.aerospike.client.AerospikeException;
import com.aerospike.client.Bin;
import com.aerospike.client.Key;
import com.aerospike.client.policy.WritePolicy;
import com.aerospike.client.query.Filter;
import com.aerospike.client.query.PartitionFilter;
import com.aerospike.client.query.RecordSet;
import com.aerospike.client.query.Statement;
import com.aerospike.client.Record;
import com.aerospike.client.exp.Exp;
import com.aerospike.client.policy.Policy;
import com.aerospike.client.policy.QueryPolicy;
import com.aerospike.client.query.IndexType;
import com.aerospike.client.task.IndexTask;
import com.aerospike.client.ResultCode;
import com.aerospike.client.BatchRecord;
import com.aerospike.client.BatchResults;
import com.aerospike.client.BatchWrite;
import com.aerospike.client.policy.BatchPolicy;
import com.aerospike.client.policy.BatchWritePolicy;
import com.aerospike.client.Operation;
import com.aerospike.client.Value;

final String Namespace = "test";
final String Set = "query-splits";
final String KeyPrefix = "id-";
final Integer NumRecords = 100000; // CHANGE TO THE REQUIRED NUMBER OF TEST DATA RECORDS

final int MIN_NUM_SPLITS = 0; // at least the requested number of splits
final int EXACT_NUM_SPLITS = 1; // exactly the requested number of splits
final int MAX_NUM_SPLITS = 2; // at most the requested number of splits
final int EXACT_NUM_SPLITS_SLICED = 3; // exactly the requested number of splits, a slice in each partition
final int PROCESSING_MODE_SYNC = 0; // sync processing mode
final int PROCESSING_MODE_ASYNC = 1; // async processing mode
final int PRIMARY_INDEX_QUERY = 0; // primary index query (scan)
final int SECONDARY_INDEX_QUERY = 1; // secondary index query - uses secondaryIndexQueryPredicate
final int QUERY_FILTER_NONE = 0; // no additional filter in query
final int QUERY_FILTER_INCLUDE = 1; // include the specified filter - uses includeQueryFilterExp

Populate Test Data

The test data consists of NumRecords records, each with a user key "id-<i>", an integer bin "bin1" with value i, an integer bin "bin2" with value 10*i, and a bin "bin3" holding a 1000-byte array, where 1 <= i <= NumRecords. A numeric secondary index is created on "bin1".

// convenience function to truncate test data
void truncateTestData() {
try {
client.truncate(null, Namespace, Set, null);
}
catch (AerospikeException e) {
// ignore
}
}

// convenience function to initialize test data
void initializeTestData() {
truncateTestData();
// Insert in batches for speed
BatchPolicy bPolicy = new BatchPolicy(client.batchPolicyDefault);
BatchWritePolicy wpolicy = new BatchWritePolicy();
wpolicy.sendKey = true;
int batchMaxSize = 100;
int recordIdx = 0;
while (recordIdx < NumRecords) {
int batchSize = Math.min(batchMaxSize, NumRecords-recordIdx);
List<BatchRecord> batchRecords = new ArrayList<BatchRecord>();
for (int i=0; i < batchSize; i++) {
Operation[] ops = Operation.array(
Operation.put(new Bin("bin1", Value.get(recordIdx+1))),
Operation.put(new Bin("bin2", Value.get(10*(recordIdx+1)))),
Operation.put(new Bin("bin3", Value.get(new byte[1000]))));
batchRecords.add(new BatchWrite(wpolicy, new Key(Namespace, Set, KeyPrefix + (recordIdx+1)), ops));
recordIdx++;
}
// execute the batch
try {
boolean status = client.operate(bPolicy, batchRecords);
if (!status) {
System.out.println("Some batch operations failed.");
}
}
catch (AerospikeException e) {
System.out.format("%s", e);
}
}
}
initializeTestData();
System.out.println("Test data populated.");;

Output:

Test data populated.

Create a numeric secondary index on bin1.

Policy policy = new Policy();
policy.socketTimeout = 0; // Do not timeout on index create.

final String IndexName = "idx_bin1_number_idx";

try {
IndexTask task = client.createIndex(policy, Namespace, Set, IndexName,
"bin1", IndexType.NUMERIC);
task.waitTillComplete();
}
catch (AerospikeException ae) {
if (ae.getResultCode() != ResultCode.INDEX_ALREADY_EXISTS) {
throw ae;
}
}

System.out.format("Created index %s on ns=%s set=%s bin=%s.",
IndexName, Namespace, Set, "bin1");;

Output:

Created index idx_bin1_number_idx on ns=test set=query-splits bin=bin1.

Overview

The main sections consist of:

  • Parallel IO Streams within Partition with Digest-Modulo
  • Dividing Partitions into Splits
  • Parallel Query Framework
  • Experimenting with Parameter Variations

Note:

  • Some cells in this notebook restart the server. After a restart, the server can take some time before it is ready to accept requests. If you see client errors after a restart, wait a bit longer.
  • The kernel could die if a specific run exceeds the resources available. In this case, reduce the number of workers below the breaking point in all cells, or if possible, increase the container resource limits.

Parallel IO Streams within Partition with Digest-Modulo

In this section, we establish why parallel I/O streams make sense by dividing a partition into subpartitions. A partition is divided into M subpartitions using the condition digest % M = s, for s = 0 to M-1. Each subpartition stream evaluates the digest-modulo expression for every record in the partition, but reads from the device only the records in its own subpartition. We will show that evaluating digest-modulo is much faster than reading a record from the device.

To illustrate the difference, we change the default config to eliminate buffering of data and force record data to be read from the device. This lets us compare the speed of a filter computed purely in memory with one that requires data on disk. We evaluate the following filters:

  • Metadata or digest-modulo filter: digest % 100 = -1. This condition is never true, so no record is returned. However, it can be evaluated from memory (a record's digest is held in memory in the primary index) without reading the record.
  • Data filter: bin1 = -1. This condition is also never true, so no record is returned. But to evaluate it, each record must be read from the device.

The average query execution time for each filter is printed. Note that the metadata filter is much faster. With larger amounts of data and/or slower devices, the difference will be more pronounced.
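M subpartition streams can share one partition because the condition digest % M = s selects each record for exactly one value of s, so the streams together cover the partition once with no overlap. The sketch below checks that property on random values standing in for record digests. It assumes one non-negative integer per record and does not model how the server derives the modulo value from the 20-byte digest; the class name is illustrative.

```java
import java.util.Arrays;
import java.util.Random;

// Illustrative only: each record's digest is modeled as a non-negative long.
// How the server computes the digest-modulo value from the 20-byte digest
// is not modeled here.
class DigestModuloSketch {
    // For M subpartition streams (stream s selects records with digest % M == s),
    // returns how many records each stream would select.
    static int[] countPerSubpartition(long[] digests, int m) {
        int[] counts = new int[m];
        for (long d : digests) {
            counts[(int) (d % m)]++; // each record matches exactly one stream
        }
        return counts;
    }

    // Random stand-ins for record digests, masked to be non-negative.
    static long[] randomDigests(int n, long seed) {
        Random rnd = new Random(seed);
        long[] digests = new long[n];
        for (int i = 0; i < n; i++) {
            digests[i] = rnd.nextLong() & Long.MAX_VALUE;
        }
        return digests;
    }

    public static void main(String[] args) {
        int[] counts = countPerSubpartition(randomDigests(100_000, 42L), 4);
        // The four counts sum to 100,000, and each is close to 25,000.
        System.out.println(Arrays.toString(counts));
    }
}
```

Each stream still has to evaluate the condition for all records, which is why it matters that the evaluation needs only in-memory metadata.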

// to run the test meaningfully, configure the namespace for unbuffered io and restart server
// set data-in-memory in the default config to false, add direct-files=true for unbuffered io
%sh cp /etc/aerospike/aerospike.conf /home/jovyan/notebooks/java/aerospike.conf
%sh sed -i -e "s/data-in-memory true.*/data-in-memory false/" -e "/data-in-memory.*/a direct-files true" /home/jovyan/notebooks/java/aerospike.conf
// start server with the modified config, sleep 10s for server to be ready to process requests
%sh pkill asd
%sh asd --config-file /home/jovyan/notebooks/java/aerospike.conf
%sh sleep 10
System.out.println("Server restarted with unbuffered io.");;

Output:

Server restarted with unbuffered io.

// function that runs a query numIters times and returns the average execution time
long getAvgQueryExecutionTimeMillis(QueryPolicy qPolicy, Statement stmt) {
int numIters = 2;
long startTime = System.currentTimeMillis();
for (int i=0; i < numIters; i++) {
RecordSet rs = client.query(qPolicy, stmt);
try {
while(rs.next()) {
Record rec = rs.getRecord();
// do something with the record - ignore here
}
}
finally {
rs.close();
}
}
long endTime = System.currentTimeMillis();
return (endTime - startTime)/numIters;
}

Statement stmt = new Statement();
stmt.setNamespace(Namespace);
stmt.setSetName(Set);
stmt.setBinNames("bin1", "bin2");
stmt.setFilter(null); // this query filter is null for a primary-index query
QueryPolicy qPolicy = new QueryPolicy(client.queryPolicyDefault);
qPolicy.socketTimeout = 120000; // 2 min
qPolicy.maxRetries = 0; // do not retry


// first, time the metadata filter
// metadata filter is a digest-modulo expression which is always false
Exp metadataFilterExp = Exp.eq(Exp.digestModulo(100), Exp.val(-1));
qPolicy.filterExp = Exp.build(metadataFilterExp); // filter expression is specified in policy
System.out.println("Average time for a metadata filter: " + getAvgQueryExecutionTimeMillis(qPolicy, stmt) + " milliseconds.");

// next, time the data filter
// data filter is an equality expression that is always false
Exp dataFilterExp = Exp.eq(Exp.intBin("bin1"), Exp.val(-1));
qPolicy.filterExp = Exp.build(dataFilterExp); // filter expression is specified in policy
System.out.println("Average time for a data filter: " + getAvgQueryExecutionTimeMillis(qPolicy, stmt) + " milliseconds.");


Output:

Average time for a metadata filter: 333 milliseconds.
Average time for a data filter: 3393 milliseconds.

// choose to revert to the default config with data-in-memory=true
// so that the scenarios in this notebook will run quickly.
%sh pkill asd
%sh asd
// sleep 10s for server to be ready to process requests
%sh sleep 10
System.out.println("Server restarted with the default config.");;

Output:

Server restarted with the default config.

Dividing Partitions into Splits

The main function assignSplits() returns an array of split assignments, one per split. Each split assignment consists of one or more full or partial partition assignments:

  • A full partition assignment has a range of partitions, specified as the start partition id and count.
  • A partial partition assignment has a range of subpartitions, specified as the start subpartition id, count, and modulo (division) factor. The subpartition range may be defined over one or more partitions.

The assignment parameters directly correspond to the PartitionFilter specification in the queryPartitions API. For example, in Java, a query over full partitions can be translated as follows:

Statement stmt = new Statement();
stmt.setNamespace(namespace);
stmt.setSetName(setName);
stmt.setBinNames("bin1", "bin2");
stmt.setMaxRecords(chunkSize);
PartitionFilter pFilter = PartitionFilter.range(startPartitionId, partitionCount);
RecordSet rs = client.queryPartitions(qPolicy, stmt, pFilter);

A query over sub-partition assignments adds a digest-modulo filter expression and is translated as follows:

PartitionFilter pFilter = PartitionFilter.range(startPartitionId, partitionCount);
QueryPolicy qPolicy = new QueryPolicy(client.queryPolicyDefault);
qPolicy.filterExp = Exp.build(
    Exp.and(
        Exp.ge(Exp.digestModulo(moduloFactor), Exp.val(startSubpartitionId)),
        Exp.le(Exp.digestModulo(moduloFactor), Exp.val(startSubpartitionId + subpartitionCount - 1))));
RecordSet rs = client.queryPartitions(qPolicy, stmt, pFilter);

Each chunk of chunkSize records is processed, and the query is re-issued until pFilter.isDone() returns true.
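The re-issue-until-done loop can be simulated without a server. In this sketch, the hypothetical ChunkedCursor plays the role of a statement with setMaxRecords(chunkSize) plus a PartitionFilter that remembers where the previous chunk ended. It is a stand-in for illustration, not the Aerospike API.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a chunked partition query: each call returns at
// most chunkSize records and remembers where it stopped, much as a
// PartitionFilter carries progress between queryPartitions calls.
// This class is illustrative and is not part of the Aerospike client.
class ChunkedCursor {
    private final List<Integer> records;
    private int position = 0;

    ChunkedCursor(List<Integer> records) {
        this.records = records;
    }

    // Returns the next chunk and advances the cursor past it.
    List<Integer> nextChunk(int chunkSize) {
        int end = Math.min(position + chunkSize, records.size());
        List<Integer> chunk = new ArrayList<>(records.subList(position, end));
        position = end;
        return chunk;
    }

    boolean isDone() {
        return position >= records.size();
    }

    // Re-issues the "query" until the cursor reports done.
    // Returns {chunks issued, records seen, sum of record values}.
    static long[] drain(ChunkedCursor cursor, int chunkSize) {
        long chunks = 0, count = 0, sum = 0;
        while (!cursor.isDone()) {
            for (int value : cursor.nextChunk(chunkSize)) {
                count++;
                sum += value;
            }
            chunks++;
        }
        return new long[] {chunks, count, sum};
    }
}
```

With 1,000 records and a chunk size of 300, drain issues four chunks (300, 300, 300, and 100 records), and the count and sum are the same as processing all records at once.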

class QueryPartition {
boolean isFullPartitionAssignment; // false for sub-partition assignments
Integer startPartitionId;
Integer partitionCount;
Integer subpartitionModuloFactor;
Integer startSubpartitionId;
Integer subpartitionCount;

QueryPartition (int startPartitionId, int count) {
this.isFullPartitionAssignment = true;
this.startPartitionId = startPartitionId;
this.partitionCount = count;
}

QueryPartition(int startPartitionId, int partitionCount,
int moduloFactor, int startSubpartitionId,
int subpartitionCount) {
this.isFullPartitionAssignment = false;
this.startPartitionId = startPartitionId;
this.partitionCount = partitionCount;
this.subpartitionModuloFactor = moduloFactor;
this.startSubpartitionId = startSubpartitionId;
this.subpartitionCount = subpartitionCount;
}
}

class SplitAssignment {
List<QueryPartition> queryPartitions = new ArrayList<QueryPartition>();

SplitAssignment(QueryPartition qp) {
this.queryPartitions.add(qp);
}

void add(QueryPartition qp) {
this.queryPartitions.add(qp);
}
}

final int FactorsOf4096[] = {1, 2, 4, 8, 16, 32, 64, 128, 256, 512, 1024, 2048, 4096};

SplitAssignment[] assignMaxSplits(int reqSplits) {
int numSplits = 0;
SplitAssignment[] assignments = null;
if (reqSplits < 2*4096) {
// Case 1: N < 2*4096: Full partition assignments
// Closest approximation of splits, numSplits: factor of 4096 that is <= reqSplits
for (int i = FactorsOf4096.length - 1; i >= 0; i--) {
if (FactorsOf4096[i] <= reqSplits) {
numSplits = FactorsOf4096[i];
break;
}
}
// Number of partitions in each split, count: 4096/numSplits
int count = 4096 / numSplits;
// Partitions in split i: i*count to (i+1)*count (upper bound exclusive)
assignments = new SplitAssignment[numSplits];
for (int i = 0; i < numSplits; i++) {
QueryPartition qp = new QueryPartition(i*count, count);
assignments[i] = new SplitAssignment(qp);
}
} else {
// Case 2: N >= 2*4096: Sub-partition assignment
//Closest approximation of splits, numSplits: multiple of 4096 that is <= reqSplits
numSplits = 4096 * (int)Math.floor((double)reqSplits/4096);
//Number of sub-partitions per partition, s: numSplits/4096
int moduloFactor = numSplits/4096;
//Sub-partition in split i: (floor(i/s), i%s, s)
assignments = new SplitAssignment[numSplits];
for (int i = 0; i < numSplits; i++) {
QueryPartition qp = new QueryPartition((int)Math.floor((double)i/moduloFactor), 1,
moduloFactor, i % moduloFactor, 1);
assignments[i] = new SplitAssignment(qp);
}
}
return assignments;
}

SplitAssignment[] assignMinSplits(int reqSplits) {
int numSplits = 0;
SplitAssignment[] assignments = null;
if (reqSplits <= 4096) {
// Case 1: N <= 4096: Full partition assignments
// Closest approximation of splits, numSplits: factor of 4096 that is >= reqSplits
for (int f: FactorsOf4096) {
if (f >= reqSplits) {
numSplits = f;
break;
}
}
// Number of partitions in each split, count: 4096/numSplits
int count = 4096 / numSplits;
// Partitions in split i: i*count to (i+1)*count (upper bound exclusive)
assignments = new SplitAssignment[numSplits];
for (int i = 0; i < numSplits; i++) {
QueryPartition qp = new QueryPartition(i*count, count);
assignments[i] = new SplitAssignment(qp);
}
} else {
// Case 2: N > 4096: Sub-partition assignment
//Closest approximation of splits, numSplits: multiple of 4096 that is >= reqSplits
numSplits = 4096 * (int)Math.ceil((double)reqSplits/4096);
//Number of sub-partitions per partition, s: numSplits/4096
int moduloFactor = numSplits/4096;
//Sub-partition in split i: (floor(i/s), i%s, s)
assignments = new SplitAssignment[numSplits];
for (int i = 0; i < numSplits; i++) {
QueryPartition qp = new QueryPartition((int)Math.floor((double)i/moduloFactor), 1,
moduloFactor, i % moduloFactor, 1);
assignments[i] = new SplitAssignment(qp);
}
}
return assignments;
}

// the following function assigns exact number of splits that may be slightly different in size, but
// each split data can be requested using a single API call (the client library may distribute
// the request across multiple nodes). provided as another example, but we do not use it in the subsequent
// runs.
SplitAssignment[] assignExactSplitsSingleQuery(int reqSplits) {
int numSplits = 0;
SplitAssignment[] assignments = null;
if (reqSplits <= 4096) {
// Case 1: N <= 4096
// Each split is assigned full partitions only (maxModulo=1),
// with some splits getting one additional partition.
// Evenly assign partitions over N splits.
// Number of full partitions per split, n: floor(4096/N)
int minPartitionsPerSplit = (int)Math.floor((double)4096/reqSplits);
// Remaining partitions r are allocated in full to splits 0 to r-1
int remainingPartitions = 4096 - minPartitionsPerSplit*reqSplits;
// Assign minPartitionsPerSplit+1 to the first remainingPartitions splits, and
// minPartitionsPerSplit to the rest.
int nextStartPartition = 0;
assignments = new SplitAssignment[reqSplits];
for (int i=0; i < reqSplits; i++){
if (i < remainingPartitions) {
QueryPartition qp = new QueryPartition(nextStartPartition, minPartitionsPerSplit+1);
assignments[i] = new SplitAssignment(qp);
nextStartPartition += minPartitionsPerSplit+1;
} else {
QueryPartition qp = new QueryPartition(nextStartPartition, minPartitionsPerSplit);
assignments[i] = new SplitAssignment(qp);
nextStartPartition += minPartitionsPerSplit;
}
}
} else {
// Case 2: N > 4096
// Each split is assigned one or two subpartitions in the same partition.
// Divide each partition into m (modulo-factor) subpartitions: m = ceiling(N/4096);
// the ceiling function ensures assignment across all N splits.
int moduloFactor = (int)Math.ceil((double)reqSplits/4096);
// 4096*moduloFactor sub-partitions are available to be assigned across reqSplits
// reqSplits sub-partitions are assigned one per split
// Remaining sub-partitions r = (4096*moduloFactor - reqSplits) are allocated to splits 0 to r-1.
int remainingSubPartitions = 4096*moduloFactor - reqSplits;
// Assign 2 sub-partitions to the first remainingSubPartitions splits while ensuring both
// sub-partitions are from the same partition, and 1 sub-partition to the rest.
int twoSubsSplitIndex = 0; // varies from 0 to remainingSubPartitions-1
int oneSubSplitIndex = remainingSubPartitions; // varies from remainingSubPartitions to reqSplits-1
assignments = new SplitAssignment[reqSplits];
for (int i=0; i < 4096; i++) {
int subPartition = 0;
while (subPartition < moduloFactor) {
if (twoSubsSplitIndex < remainingSubPartitions &&
subPartition+1 < moduloFactor) { // both subpartitions must be in the same partition
QueryPartition qp = new QueryPartition(i, 1, moduloFactor, subPartition, 2);
assignments[twoSubsSplitIndex] = new SplitAssignment(qp);
twoSubsSplitIndex++;
subPartition += 2;
} else {
// single partition assignment
QueryPartition qp = new QueryPartition(i, 1, moduloFactor, subPartition, 1);
assignments[oneSubSplitIndex] = new SplitAssignment(qp);
oneSubSplitIndex++;
subPartition++;
}
}
}
}
return assignments;
}

// EXACT_NUM_SPLITS assigns exactly the requested number of equal-sized splits.
// The algorithm divides each of the 4096 partitions into num-splits subpartitions.
// This results in 4096 * num-splits subpartitions, with each split getting 4096 of them.
// Subpartitions are enumerated in order: subpartitions 0 to num-splits-1 of partition 0,
// then of partition 1, and so on through partition 4095. Each split takes the next 4096
// subpartitions in this order, starting after the last subpartition of the previous split.
// Thus a split has subpartitions belonging in one or more of the following 3 groups,
// each of which can be retrieved using one API call:
// 1. Consecutive 0-4095 subpartitions in the starting (partial) partition
// 2. Consecutive 0-4096 full partitions
// 3. Consecutive 0-4095 subpartitions in the ending (partial) partition
SplitAssignment[] assignExactSplits(int reqSplits) {
SplitAssignment[] assignments = new SplitAssignment[reqSplits];
int nextSlot = 0;
for (int i=0; i < reqSplits; i++) {
int remaining = 4096;
boolean noneAssigned = true;
// 1. Consecutive 0-4095 subpartitions in the starting (partial) partition
int partition = nextSlot / reqSplits;
int sub = nextSlot % reqSplits;
if (sub + remaining > reqSplits && sub > 0) {
int subCount = reqSplits-sub;
QueryPartition qp = new QueryPartition(partition, 1, reqSplits, sub, subCount);
assignments[i] = new SplitAssignment(qp);
noneAssigned = false;
remaining -= subCount;
nextSlot += subCount;
}
// 2. Consecutive 0-4096 full partitions
partition = nextSlot / reqSplits;
sub = nextSlot % reqSplits;
assert(sub == 0 || sub + remaining <= reqSplits);
int count = 0;
while (remaining >= reqSplits) {
count++;
remaining -= reqSplits;
nextSlot += reqSplits;
}
if (count > 0) {
QueryPartition qp = new QueryPartition(partition, count);
if (noneAssigned) {
assignments[i] = new SplitAssignment(qp);
noneAssigned = false;
} else {
assignments[i].add(qp);
}
}
// 3. Consecutive 0-4095 subpartitions in the ending (partial) partition
partition = nextSlot / reqSplits;
sub = nextSlot % reqSplits;
assert(sub + remaining <= reqSplits);
if (remaining > 0) {
QueryPartition qp = new QueryPartition(partition, 1, reqSplits, sub, remaining);
if (noneAssigned) {
assignments[i] = new SplitAssignment(qp);
noneAssigned = false;
} else {
assignments[i].add(qp);
}
nextSlot += remaining;
remaining = 0;
}
}
assert(nextSlot == reqSplits*4096);
return assignments;
}

// split assignment types:
// MAX_NUM_SPLITS    At most the requested number of splits (can be fewer), all the same size.
// MIN_NUM_SPLITS    At least the requested number of splits (can be more), all the same size.
// EXACT_NUM_SPLITS  Exactly the requested number of splits, all the same size; each split may
//                   be processed in multiple API operations.
// assignExactSplitsSingleQuery() above also produces exactly the requested number of splits,
// possibly of different sizes, each processed in one query operation; it is not selectable here.
SplitAssignment[] assignSplits(int reqSplits, int splitType) {
if (splitType == MAX_NUM_SPLITS) {
return assignMaxSplits(reqSplits);
} else if (splitType == MIN_NUM_SPLITS) {
return assignMinSplits(reqSplits);
} else if (splitType == EXACT_NUM_SPLITS) {
return assignExactSplits(reqSplits);
}
return null;
}

void assignmentSummary(String type, int reqSplits, SplitAssignment[] sa) {
System.out.format("Split type %s, requested %d, assigned %d (ends and middle): \n", type, reqSplits, sa.length);
List<Integer> sample = new ArrayList<Integer>();
sample.add(0);
if (sa.length > 1) sample.add(1);
if (sa.length > 4) sample.add(sa.length/2);
if (sa.length > 3) sample.add(sa.length-2);
if (sa.length > 2) sample.add(sa.length-1);
for (int i: sample) {
System.out.format("\tSplit %s:\n", i);
for (QueryPartition qp: sa[i].queryPartitions) {
System.out.format("\t\tptn_start: %s, ptn_count: %s, sub_start: %s, sub_count: %s, sub_mod: %s\n",
qp.startPartitionId, qp.partitionCount, qp.startSubpartitionId, qp.subpartitionCount, qp.subpartitionModuloFactor);
}
}
}
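The split counts that MAX_NUM_SPLITS and MIN_NUM_SPLITS settle on can be computed on their own. The sketch below mirrors the rules in assignMaxSplits() and assignMinSplits(): in the full-partition case the count is a power-of-two factor of 4096, otherwise it is a multiple of 4096. It assumes reqSplits >= 1 (the originals divide by zero for 0); the class name SplitCounts is hypothetical.

```java
// Mirrors the split-count rules of assignMaxSplits() and assignMinSplits(),
// returning only the number of splits. Assumes reqSplits >= 1.
class SplitCounts {
    static final int PARTITIONS = 4096;

    // MAX_NUM_SPLITS: largest achievable count <= reqSplits.
    static int maxSplits(int reqSplits) {
        if (reqSplits < 2 * PARTITIONS) {
            // full partitions: largest power-of-two factor of 4096 <= reqSplits
            int f = 1;
            while (f * 2 <= reqSplits && f * 2 <= PARTITIONS) {
                f *= 2;
            }
            return f;
        }
        // sub-partitions: largest multiple of 4096 <= reqSplits
        return PARTITIONS * (reqSplits / PARTITIONS);
    }

    // MIN_NUM_SPLITS: smallest achievable count >= reqSplits.
    static int minSplits(int reqSplits) {
        if (reqSplits <= PARTITIONS) {
            // full partitions: smallest power-of-two factor of 4096 >= reqSplits
            int f = 1;
            while (f < reqSplits) {
                f *= 2;
            }
            return f;
        }
        // sub-partitions: smallest multiple of 4096 >= reqSplits
        return PARTITIONS * ((reqSplits + PARTITIONS - 1) / PARTITIONS);
    }
}
```

maxSplits(5000) and minSplits(500) return 4096 and 512, consistent with the example runs that follow; minSplits(5000) returns 8192, which shows how far MIN_NUM_SPLITS can overshoot just above 4096 requested splits.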

Examples of Split Type Assignments

Below are examples of split assignments for different numbers of splits and split types.

SplitAssignment[] sa = assignSplits(5000, MAX_NUM_SPLITS);
assignmentSummary("MAX_NUM_SPLITS", 5000, sa);
System.out.println();

sa = assignSplits(500, MIN_NUM_SPLITS);
assignmentSummary("MIN_NUM_SPLITS", 500, sa);
System.out.println();

sa = assignSplits(3, EXACT_NUM_SPLITS);
assignmentSummary("EXACT_NUM_SPLITS", 3, sa);
System.out.println();

sa = assignSplits(100, EXACT_NUM_SPLITS);
assignmentSummary("EXACT_NUM_SPLITS", 100, sa);
System.out.println();

sa = assignSplits(1000, EXACT_NUM_SPLITS);
assignmentSummary("EXACT_NUM_SPLITS", 1000, sa);
System.out.println();

sa = assignSplits(4096, EXACT_NUM_SPLITS);
assignmentSummary("EXACT_NUM_SPLITS", 4096, sa);
System.out.println();

sa = assignSplits(10000, EXACT_NUM_SPLITS);
assignmentSummary("EXACT_NUM_SPLITS", 10000, sa);

Output:

Split type MAX_NUM_SPLITS, requested 5000, assigned 4096 (ends and middle): 
Split 0:
ptn_start: 0, ptn_count: 1, sub_start: null, sub_count: null, sub_mod: null
Split 1:
ptn_start: 1, ptn_count: 1, sub_start: null, sub_count: null, sub_mod: null
Split 2048:
ptn_start: 2048, ptn_count: 1, sub_start: null, sub_count: null, sub_mod: null
Split 4094:
ptn_start: 4094, ptn_count: 1, sub_start: null, sub_count: null, sub_mod: null
Split 4095:
ptn_start: 4095, ptn_count: 1, sub_start: null, sub_count: null, sub_mod: null

Split type MIN_NUM_SPLITS, requested 500, assigned 512 (ends and middle):
Split 0:
ptn_start: 0, ptn_count: 8, sub_start: null, sub_count: null, sub_mod: null
Split 1:
ptn_start: 8, ptn_count: 8, sub_start: null, sub_count: null, sub_mod: null
Split 256:
ptn_start: 2048, ptn_count: 8, sub_start: null, sub_count: null, sub_mod: null
Split 510:
ptn_start: 4080, ptn_count: 8, sub_start: null, sub_count: null, sub_mod: null
Split 511:
ptn_start: 4088, ptn_count: 8, sub_start: null, sub_count: null, sub_mod: null

Split type EXACT_NUM_SPLITS, requested 3, assigned 3 (ends and middle):
Split 0:
ptn_start: 0, ptn_count: 1365, sub_start: null, sub_count: null, sub_mod: null
ptn_start: 1365, ptn_count: 1, sub_start: 0, sub_count: 1, sub_mod: 3
Split 1:
ptn_start: 1365, ptn_count: 1, sub_start: 1, sub_count: 2, sub_mod: 3
ptn_start: 1366, ptn_count: 1364, sub_start: null, sub_count: null, sub_mod: null
ptn_start: 2730, ptn_count: 1, sub_start: 0, sub_count: 2, sub_mod: 3
Split 2:
ptn_start: 2730, ptn_count: 1, sub_start: 2, sub_count: 1, sub_mod: 3
ptn_start: 2731, ptn_count: 1365, sub_start: null, sub_count: null, sub_mod: null

Split type EXACT_NUM_SPLITS, requested 100, assigned 100 (ends and middle):
Split 0:
ptn_start: 0, ptn_count: 40, sub_start: null, sub_count: null, sub_mod: null
ptn_start: 40, ptn_count: 1, sub_start: 0, sub_count: 96, sub_mod: 100
Split 1:
ptn_start: 40, ptn_count: 1, sub_start: 96, sub_count: 4, sub_mod: 100
ptn_start: 41, ptn_count: 40, sub_start: null, sub_count: null, sub_mod: null
ptn_start: 81, ptn_count: 1, sub_start: 0, sub_count: 92, sub_mod: 100
Split 50:
ptn_start: 2048, ptn_count: 40, sub_start: null, sub_count: null, sub_mod: null
ptn_start: 2088, ptn_count: 1, sub_start: 0, sub_count: 96, sub_mod: 100
Split 98:
ptn_start: 4014, ptn_count: 1, sub_start: 8, sub_count: 92, sub_mod: 100
ptn_start: 4015, ptn_count: 40, sub_start: null, sub_count: null, sub_mod: null
ptn_start: 4055, ptn_count: 1, sub_start: 0, sub_count: 4, sub_mod: 100
Split 99:
ptn_start: 4055, ptn_count: 1, sub_start: 4, sub_count: 96, sub_mod: 100
ptn_start: 4056, ptn_count: 40, sub_start: null, sub_count: null, sub_mod: null

Split type EXACT_NUM_SPLITS, requested 1000, assigned 1000 (ends and middle):
Split 0:
ptn_start: 0, ptn_count: 4, sub_start: null, sub_count: null, sub_mod: null
ptn_start: 4, ptn_count: 1, sub_start: 0, sub_count: 96, sub_mod: 1000
Split 1:
ptn_start: 4, ptn_count: 1, sub_start: 96, sub_count: 904, sub_mod: 1000
ptn_start: 5, ptn_count: 3, sub_start: null, sub_count: null, sub_mod: null
ptn_start: 8, ptn_count: 1, sub_start: 0, sub_count: 192, sub_mod: 1000
Split 500:
ptn_start: 2048, ptn_count: 4, sub_start: null, sub_count: null, sub_mod: null
ptn_start: 2052, ptn_count: 1, sub_start: 0, sub_count: 96, sub_mod: 1000
Split 998:
ptn_start: 4087, ptn_count: 1, sub_start: 808, sub_count: 192, sub_mod: 1000
ptn_start: 4088, ptn_count: 3, sub_start: null, sub_count: null, sub_mod: null
ptn_start: 4091, ptn_count: 1, sub_start: 0, sub_count: 904, sub_mod: 1000
Split 999:
ptn_start: 4091, ptn_count: 1, sub_start: 904, sub_count: 96, sub_mod: 1000
ptn_start: 4092, ptn_count: 4, sub_start: null, sub_count: null, sub_mod: null

Split type EXACT_NUM_SPLITS, requested 4096, assigned 4096 (ends and middle):
Split 0:
ptn_start: 0, ptn_count: 1, sub_start: null, sub_count: null, sub_mod: null
Split 1:
ptn_start: 1, ptn_count: 1, sub_start: null, sub_count: null, sub_mod: null
Split 2048:
ptn_start: 2048, ptn_count: 1, sub_start: null, sub_count: null, sub_mod: null
Split 4094:
ptn_start: 4094, ptn_count: 1, sub_start: null, sub_count: null, sub_mod: null
Split 4095:
ptn_start: 4095, ptn_count: 1, sub_start: null, sub_count: null, sub_mod: null

Split type EXACT_NUM_SPLITS, requested 10000, assigned 10000 (ends and middle):
Split 0:
ptn_start: 0, ptn_count: 1, sub_start: 0, sub_count: 4096, sub_mod: 10000
Split 1:
ptn_start: 0, ptn_count: 1, sub_start: 4096, sub_count: 4096, sub_mod: 10000
Split 5000:
ptn_start: 2048, ptn_count: 1, sub_start: 0, sub_count: 4096, sub_mod: 10000
Split 9998:
ptn_start: 4095, ptn_count: 1, sub_start: 1808, sub_count: 4096, sub_mod: 10000
Split 9999:
ptn_start: 4095, ptn_count: 1, sub_start: 5904, sub_count: 4096, sub_mod: 10000
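The EXACT_NUM_SPLITS boundaries in the output above can be reproduced by hand. Following the enumeration described in the assignExactSplits() comments, slot k stands for subpartition k % N of partition k / N, and split i owns slots i*4096 through i*4096+4095. The hypothetical helper below returns the first and last (partition, subpartition) of a split.

```java
// Computes the first and last (partition, subpartition) owned by split i
// when N exact splits each take 4096 consecutive slots, and slot k stands
// for subpartition k % N of partition k / N (modulo factor N).
// Returns {startPartition, startSub, endPartition, endSub}.
class ExactSplitBounds {
    static int[] bounds(int numSplits, int splitIndex) {
        long first = (long) splitIndex * 4096; // long guards against overflow for very large N
        long last = first + 4095;
        return new int[] {
            (int) (first / numSplits), (int) (first % numSplits),
            (int) (last / numSplits), (int) (last % numSplits)
        };
    }
}
```

For example, bounds(3, 1) gives partition 1365 subpartition 1 through partition 2730 subpartition 1, matching Split 1 for 3 requested splits above, and bounds(10000, 9999) gives subpartitions 5904 through 9999 of partition 4095.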

Parallel Query Framework

The parallel processing test framework works as follows:

  • Create N splits. The number can be varied over a wide range, such as 1-50K, along with the split type: at least (MIN_NUM_SPLITS), at most (MAX_NUM_SPLITS), or exact (EXACT_NUM_SPLITS).
  • Spawn W worker threads. This can be varied over a wide range such as 1-5K or more if the setup allows it. All threads start processing splits at the same time, and the main thread waits until all threads finish.
  • A worker executes a loop:
    • While there is an unprocessed split available:
      • Get a split from the available splits.
      • Process the split:
        • While there are more records to be processed in the split, in sync and async mode:
          • Execute a query to retrieve the next chunk of max-records. The query can be a primary-index query or a secondary-index query, and with or without a fiter expression.
          • Process the chunk using the provided implementation of StreamProcessing abstract class. A simple CountAndSum implementation is used.
  • The total record count and aggregate sum is printed out at the end, which should be the same for a given query and filter expression
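The steps above can be sketched without a database. The following is a minimal, self-contained simulation, not the framework code: each "split" is a hypothetical contiguous range of the integers 1..total, and "processing" simply counts and sums the values. It keeps the same structure as the framework: a CountDownLatch as the common start line, an AtomicInteger as the next-available scheduler, and a join on every worker. WorkerLoopSketch and run() are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

// Minimal simulation of the framework's worker loop; splits are hypothetical
// integer ranges, not Aerospike partition queries.
public class WorkerLoopSketch {
    // Processes values 1..total divided into numSplits contiguous splits using
    // numWorkers threads, and returns {record count, sum}.
    static long[] run(int numSplits, int numWorkers, int total) {
        AtomicInteger nextSplit = new AtomicInteger(0);            // next-available scheduler
        AtomicLong count = new AtomicLong();
        AtomicLong sum = new AtomicLong();
        CountDownLatch startLine = new CountDownLatch(numWorkers); // common start line
        List<Thread> workers = new ArrayList<>();
        for (int w = 0; w < numWorkers; w++) {
            Thread t = new Thread(() -> {
                startLine.countDown();
                try {
                    startLine.await(); // wait until all workers have been spawned
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                // claim splits until none are left
                for (int s = nextSplit.getAndIncrement(); s < numSplits; s = nextSplit.getAndIncrement()) {
                    long lo = (long) total * s / numSplits;       // split s covers values lo+1..hi
                    long hi = (long) total * (s + 1) / numSplits;
                    long localCount = 0, localSum = 0;
                    for (long v = lo + 1; v <= hi; v++) {         // "process" each record
                        localCount++;
                        localSum += v;
                    }
                    count.addAndGet(localCount);                  // aggregate once per split
                    sum.addAndGet(localSum);
                }
            });
            workers.add(t);
            t.start();
        }
        for (Thread t : workers) {                                // main thread waits for all workers
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting for workers", e);
            }
        }
        return new long[] {count.get(), sum.get()};
    }

    public static void main(String[] args) {
        for (int workers : new int[] {1, 10, 100}) {
            long[] r = run(1000, workers, 100000);
            System.out.println(workers + " worker(s): count " + r[0] + ", sum " + r[1]);
        }
    }
}
```

Each run should report a count of 100000 and a sum of 5000050000 whatever the number of workers, which is the same invariant the framework checks against the database.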

The code below implements the parallel processing framework that allows interesting parameters to be selected for each run.

Stream Processing: Aggregate Computation

The framework processes the records retrieved through a parallel query over splits with a map-reduce computation. A simple count-and-sum computation is used, but it can be substituted with another computation by extending the abstract class StreamProcessing and assigning an instance of the new class to the global StreamProcessor (defined below).
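As a hedged illustration of substituting the computation, the sketch below implements a count-and-max aggregation with the same per-stream-slot design as CountAndSum: one slot per stream (worker) plus a last slot filled in by finish(). To keep it runnable without the Aerospike client, it uses a hypothetical ValueProcessing class that receives bin values as plain longs rather than Record and RecordSet objects; a real replacement would extend StreamProcessing instead. CountAndMaxDemo and CountAndMax are also hypothetical names.

```java
import java.util.concurrent.atomic.AtomicLong;

public class CountAndMaxDemo {
    // Hypothetical, simplified stand-in for StreamProcessing: it receives bin
    // values as plain longs instead of Aerospike Record/RecordSet objects.
    static abstract class ValueProcessing {
        abstract void initialize(int numStreams);
        abstract void processValue(int streamId, long value);
        abstract void finish();
    }

    // Alternative computation: record count and maximum value. Like CountAndSum,
    // it keeps one slot per stream plus a last slot for the final aggregate.
    static class CountAndMax extends ValueProcessing {
        AtomicLong[] counts;
        AtomicLong[] maxes;

        void initialize(int numStreams) {
            counts = new AtomicLong[numStreams + 1];
            maxes = new AtomicLong[numStreams + 1];
            for (int i = 0; i <= numStreams; i++) {
                counts[i] = new AtomicLong(0);
                maxes[i] = new AtomicLong(Long.MIN_VALUE);
            }
        }

        void processValue(int streamId, long value) {
            counts[streamId].incrementAndGet();
            maxes[streamId].accumulateAndGet(value, Math::max); // atomic running max
        }

        void finish() {
            int last = counts.length - 1; // final aggregation slot
            for (int i = 0; i < last; i++) {
                counts[last].addAndGet(counts[i].get());
                maxes[last].accumulateAndGet(maxes[i].get(), Math::max);
            }
        }

        long count() { return counts[counts.length - 1].get(); }
        long max() { return maxes[maxes.length - 1].get(); }
    }

    public static void main(String[] args) {
        CountAndMax p = new CountAndMax();
        p.initialize(3);
        long[][] streams = {{5, 17, 2}, {40}, {8, 9}}; // values seen by streams 0, 1, 2
        for (int s = 0; s < streams.length; s++) {
            for (long v : streams[s]) p.processValue(s, v);
        }
        p.finish();
        System.out.println("count " + p.count() + ", max " + p.max());
    }
}
```

With the three sample streams, main prints count 6, max 40. Keeping per-stream slots means concurrent workers mostly update different atomics, and the single reduction happens once in finish().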

// StreamProcessing abstract class - a concrete implementation must be provided
abstract class StreamProcessing {
    abstract void initialize(int numStreams);
    abstract void processRecordSet(int streamId, RecordSet rs);
    abstract void processRecord(int streamId, Record rec);
    abstract void finish();
    abstract void printResults();
}

// Counts records and sums bin1, keeping one slot per stream (worker)
class CountAndSum extends StreamProcessing {
    AtomicInteger[] streamCounts;
    AtomicLong[] streamSums;
    CountAndSum() {}
    void initialize(int numStreams) {
        this.streamCounts = new AtomicInteger[numStreams+1]; // final aggregation uses the last slot
        this.streamSums = new AtomicLong[numStreams+1]; // final aggregation uses the last slot
        for (int i=0; i < numStreams+1; i++) {
            this.streamCounts[i] = new AtomicInteger(0);
            this.streamSums[i] = new AtomicLong(0);
        }
    }
    void processRecordSet(int streamId, RecordSet rs) {
        int recs = 0;
        long sum = 0;
        try {
            while (rs.next()) {
                recs++;
                Record rec = rs.getRecord();
                sum += (long)rec.bins.get("bin1");
            }
        }
        finally {
            rs.close();
        }
        this.streamCounts[streamId].addAndGet(recs);
        this.streamSums[streamId].addAndGet(sum);
    }
    void processRecord(int streamId, Record rec) {
        this.streamCounts[streamId].incrementAndGet();
        this.streamSums[streamId].addAndGet((long)rec.bins.get("bin1"));
    }
    void finish() {
        // aggregate results in the final slot
        int resultsSlot = this.streamCounts.length-1;
        for (int i=0; i < this.streamCounts.length-1; i++) {
            this.streamCounts[resultsSlot].addAndGet(this.streamCounts[i].get());
            this.streamSums[resultsSlot].addAndGet(this.streamSums[i].get());
        }
    }
    void printResults() {
        // print results from the final slot
        int resultsSlot = this.streamCounts.length-1;
        System.out.format("Record count is %d, sum is %d.\n",
            this.streamCounts[resultsSlot].get(), this.streamSums[resultsSlot].get());
    }
}

Work Scheduling: Next Available Split

The framework assigns each worker the next split to process from the available splits. A simple next-available scheduler is used, but it can be substituted with another scheduler by extending the abstract class WorkScheduling and assigning an instance of the new class to the global WorkScheduler (defined below).
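One possible alternative scheduler, sketched below as an assumption rather than as part of the notebook, is static striping: worker w processes splits w, w+W, w+2W, and so on. The abstract class is copied from WorkScheduling so that the example compiles on its own; StripedSchedulingDemo and Striped are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

public class StripedSchedulingDemo {
    // Same shape as the notebook's WorkScheduling abstract class.
    static abstract class WorkScheduling {
        abstract void initialize(int numWorkers, int numSplits);
        abstract Integer scheduleSplit(int workerId);
    }

    // Hypothetical alternative: worker w processes splits w, w+W, w+2W, ...
    // Each worker calls scheduleSplit only with its own id from its own thread,
    // so each per-worker cursor is touched by a single thread.
    static class Striped extends WorkScheduling {
        int numWorkers, numSplits;
        int[] nextIndex; // per-worker count of splits already handed out

        void initialize(int numWorkers, int numSplits) {
            this.numWorkers = numWorkers;
            this.numSplits = numSplits;
            this.nextIndex = new int[numWorkers];
        }

        Integer scheduleSplit(int workerId) {
            int split = workerId + nextIndex[workerId] * numWorkers;
            if (split >= numSplits) {
                return null; // no more splits for this worker
            }
            nextIndex[workerId]++;
            return split;
        }
    }

    public static void main(String[] args) {
        WorkScheduling scheduler = new Striped();
        scheduler.initialize(3, 8);
        for (int w = 0; w < 3; w++) {
            List<Integer> mine = new ArrayList<>();
            for (Integer s = scheduler.scheduleSplit(w); s != null; s = scheduler.scheduleSplit(w)) {
                mine.add(s);
            }
            System.out.println("worker " + w + ": " + mine);
        }
    }
}
```

With 3 workers and 8 splits, worker 0 gets [0, 3, 6], worker 1 gets [1, 4, 7], and worker 2 gets [2, 5]. Striping needs no shared counter, but unlike NextAvailable, a fast worker cannot pick up splits left to a slow one, so the finish times can be less balanced when splits take different amounts of time.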

// WorkScheduling abstract class - a concrete implementation must be provided
abstract class WorkScheduling {
    abstract void initialize(int numWorkers, int numSplits);
    abstract Integer scheduleSplit(int workerId);
}

// Hands out splits in order; safe to call concurrently from multiple workers
class NextAvailable extends WorkScheduling {
    int numSplits;
    AtomicInteger nextAvailable = new AtomicInteger(0);
    NextAvailable() {}
    void initialize(int numWorkers, int numSplits) {
        this.numSplits = numSplits;
        nextAvailable.set(0);
    }
    Integer scheduleSplit(int workerId) {
        if (this.nextAvailable.get() >= numSplits) {
            return null; // all splits already claimed
        }
        int splitToSchedule = this.nextAvailable.getAndIncrement();
        if (splitToSchedule >= numSplits) {
            return null; // another worker claimed the last split first
        }
        return splitToSchedule;
    }
}

Global parameters

The following parameters are implemented as global variables for convenience. Set them to the desired values before each run.

// Global parameters - set to specific values later before a run
int ProcessingMode; // set to PROCESSING_MODE_SYNC or PROCESSING_MODE_ASYNC
int QueryType; // PRIMARY_INDEX_QUERY or SECONDARY_INDEX_QUERY
int QueryFilter; // QUERY_FILTER_NONE or QUERY_FILTER_INCLUDE
int ChunkSize; // max number of records retrieved in a chunk
Filter SecondaryIndexQueryPredicate; // sindex query predicate, used when QueryType = SECONDARY_INDEX_QUERY
Exp IncludeQueryFilterExp; // filter expression, used when QueryFilter = QUERY_FILTER_INCLUDE
StreamProcessing StreamProcessor; // computation over streams
WorkScheduling WorkScheduler; // schedule the next split to a worker
// define default for StreamProcessor; can be redefined
StreamProcessor = new CountAndSum();
// define default for WorkScheduler; can be redefined
WorkScheduler = new NextAvailable();
// Implementation of the parallel query framework

CountDownLatch startLineLatch; // forces all workers to start at the same time

Statement stmt = new Statement();
stmt.setNamespace(Namespace);
stmt.setSetName(Set);
stmt.setBinNames("bin1", "bin2");

class Worker extends Thread {
    static AtomicInteger nextWorkerId = new AtomicInteger(); // worker id
    static SplitAssignment[] assignments; // splits to be processed

    static void initialize(int numWorkers, SplitAssignment[] splitAssignments) {
        Worker.nextWorkerId.set(0);
        Worker.assignments = splitAssignments;
        WorkScheduler.initialize(numWorkers, splitAssignments.length);
    }

    @Override public void run() {
        int myId = Worker.nextWorkerId.getAndIncrement();
        try {
            startLineLatch.countDown(); // count down
            startLineLatch.await(); // all workers start when count reaches zero
        }
        catch (Exception e) {
            System.out.format("%s", e);
        }
        // loop to get the next available split until all splits are claimed
        for (Integer split = WorkScheduler.scheduleSplit(myId); split != null;
                split = WorkScheduler.scheduleSplit(myId)) {
            SplitAssignment sa = Worker.assignments[split]; // split to process
            for (QueryPartition qp: sa.queryPartitions) {
                PartitionFilter pFilter;
                QueryPolicy qPolicy = new QueryPolicy(client.queryPolicyDefault);
                qPolicy.socketTimeout = 120000; // 2 min
                qPolicy.maxRetries = 0; // do not retry

                qPolicy.filterExp = null;
                // define the query partition filter
                pFilter = PartitionFilter.range(qp.startPartitionId, qp.partitionCount); // assign the partition range
                if (qp.isFullPartitionAssignment) {
                    if (QueryFilter == QUERY_FILTER_INCLUDE) {
                        qPolicy.filterExp = Exp.build(IncludeQueryFilterExp);
                    }
                } else { // if partial partitions, define the digest-modulo filter expression
                    int moduloFactor = qp.subpartitionModuloFactor;
                    int lowerSubpartition = qp.startSubpartitionId;
                    int upperSubpartition = lowerSubpartition + qp.subpartitionCount - 1;
                    Exp subpartitionExp = Exp.and(
                        Exp.ge(Exp.digestModulo(moduloFactor), Exp.val(lowerSubpartition)),
                        Exp.le(Exp.digestModulo(moduloFactor), Exp.val(upperSubpartition)));
                    Exp queryFilterExp = subpartitionExp;
                    if (QueryFilter == QUERY_FILTER_INCLUDE) { // AND with the query's filter expression
                        queryFilterExp = Exp.and(
                            subpartitionExp,
                            IncludeQueryFilterExp);
                    }
                    qPolicy.filterExp = Exp.build(queryFilterExp); // filter expression is specified in policy
                }
                stmt.setMaxRecords(ChunkSize); // chunked retrieval of query results
                stmt.setFilter(null); // this query filter is null for a primary-index query
                if (QueryType == SECONDARY_INDEX_QUERY) {
                    stmt.setFilter(SecondaryIndexQueryPredicate); // set secondary-index query predicate here
                }
                if (ProcessingMode == PROCESSING_MODE_SYNC) { // select the processing mode
                    this.processSync(myId, qPolicy, stmt, pFilter);
                } else {
                    this.processAsync(myId, qPolicy, stmt, pFilter);
                }
                try {
                    Thread.sleep(1); // allow other threads to run
                }
                catch (Exception e) {
                    System.out.format("%s", e);
                }
            }
        }
    }

    void processSync(int workerId, QueryPolicy qPolicy, Statement stmt, PartitionFilter pFilter) {
        // retrieve chunks of records in a loop
        while (! pFilter.isDone()) {
            RecordSet rs = client.queryPartitions(qPolicy, stmt, pFilter);
            StreamProcessor.processRecordSet(workerId, rs);
        }
    }

    void processAsync(int workerId, QueryPolicy qPolicy, Statement stmt, PartitionFilter pFilter) {
        // retrieve chunks of records in a loop
        while (! pFilter.isDone()) {
            // query monitor to await/notify completion of each chunk
            Monitor queryMonitor = new Monitor();
            // submit async operation with throttle by waiting for an available slot
            EventLoop eventLoop = eventLoops.next();
            int eventLoopIndex = eventLoop.getIndex();
            if (! throttles.waitForSlot(eventLoopIndex, 1)) {
                break; // no slot obtained; waiting on the monitor would block forever
            }
            try {
                client.queryPartitions(eventLoop, new RecordSequenceListener() {
                    public void onRecord(Key key, Record record) throws AerospikeException {
                        StreamProcessor.processRecord(workerId, record);
                    }

                    public void onSuccess() {
                        throttles.addSlot(eventLoopIndex, 1);
                        queryMonitor.notifyComplete();
                    }

                    public void onFailure(AerospikeException e) {
                        throttles.addSlot(eventLoopIndex, 1);
                        System.out.format("Error: query failed with exception - %s\n", e);
                        queryMonitor.notifyComplete();
                    }
                },
                qPolicy, stmt, pFilter);
            }
            catch (Exception e) {
                // submission failed: release the slot and stop, since the monitor may never be notified
                throttles.addSlot(eventLoopIndex, 1);
                System.out.format("Error: async query submission failed - %s\n", e.getMessage());
                break;
            }
            queryMonitor.waitTillComplete();
        }
    }
}

void workerSummary() {
    System.out.print("Records by worker (~50 worker sample): ");
    int numWorkers = ((CountAndSum)StreamProcessor).streamCounts.length-1; // last slot holds the total
    int step = (numWorkers-1)/50; // sample about 50 workers
    for (int i = 0; i < numWorkers; i += 1+step) {
        System.out.format("%d:%d ", i, ((CountAndSum)StreamProcessor).streamCounts[i].get());
    }
    System.out.println("");
}

void processSplits(SplitAssignment[] splitAssignments, int numWorkers) {
    System.out.format("%d worker threads.\n", numWorkers);
    StreamProcessor.initialize(numWorkers);
    Worker.initialize(numWorkers, splitAssignments);
    startLineLatch = new CountDownLatch(numWorkers);
    List<Worker> workers = new ArrayList<>();
    for (int i = 0; i < numWorkers; i++) {
        Worker worker = new Worker();
        worker.setName("Worker " + i);
        workers.add(worker);
        worker.start();
    }
    // workers don't start work until all workers are spawned
    long startTime = System.currentTimeMillis();
    try {
        // wait for all workers to finish
        for (Worker worker: workers) {
            worker.join();
        }
    }
    catch (Exception e) {
        System.out.format("%s", e);
    }
    long endTime = System.currentTimeMillis();
    StreamProcessor.finish();
    StreamProcessor.printResults();
    System.out.println("That took " + (endTime - startTime)/1000 + " seconds.");
}
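For a partially assigned partition, the worker ANDs two comparisons on Exp.digestModulo(sub_mod), so a record belongs to the split whose range sub_start..sub_start+sub_count-1 contains its digest modulo value. The sketch below checks that the two ranges assigned within partition 40 when 100 exact splits are requested (sub_start 0 with sub_count 96, and sub_start 96 with sub_count 4, both with sub_mod 100) select every record exactly once. The server computes the digest modulo; here a random non-negative long stands in for each record's digest-derived value, which is an assumption made only for this simulation. SubpartitionCheck, matches(), and matchCounts() are hypothetical names.

```java
import java.util.Random;

// Simulates the sub-partition predicate: lower <= digestModulo(MOD) <= upper.
// Random non-negative longs stand in for record digests (an assumption).
public class SubpartitionCheck {
    static final int MOD = 100;
    // {sub_start, sub_count} pairs for partition 40 with 100 exact splits
    static final int[][] RANGES = {{0, 96}, {96, 4}};

    // Mirrors Exp.and(Exp.ge(digestModulo, lower), Exp.le(digestModulo, upper))
    static boolean matches(long digestValue, int subStart, int subCount) {
        long m = digestValue % MOD;
        int lower = subStart;
        int upper = subStart + subCount - 1;
        return m >= lower && m <= upper;
    }

    // Returns, for each simulated record, how many ranges selected it.
    static int[] matchCounts(int numRecords, long seed) {
        Random rnd = new Random(seed);
        int[] matched = new int[numRecords];
        for (int i = 0; i < numRecords; i++) {
            long digestValue = rnd.nextLong() & Long.MAX_VALUE; // non-negative
            for (int[] r : RANGES) {
                if (matches(digestValue, r[0], r[1])) {
                    matched[i]++;
                }
            }
        }
        return matched;
    }

    public static void main(String[] args) {
        int[] matched = matchCounts(100000, 42);
        int exactlyOnce = 0;
        for (int m : matched) {
            if (m == 1) exactlyOnce++;
        }
        System.out.println(exactlyOnce + " of " + matched.length + " records matched exactly one split");
    }
}
```

Because the two ranges cover the residues 0 to 99 without overlap, main reports that all 100000 simulated records matched exactly one split. The same reasoning is why the record counts and sums stay identical across split configurations.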

Baseline Runs

We will establish the baseline with these parameters:

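  • Single split and single worker
  • Chunk size 50 and effectively unlimited (100000)
  • Primary-index query (scan) without and with a filter expression, and a secondary-index query with an equivalent predicate
  • Sync and async modes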
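The baseline totals can be checked by hand. Assuming bin1 holds each integer from 1 to 100000 exactly once (the data is populated earlier in the notebook; this is inferred from the reported totals), the count and sum of an inclusive range lo..hi are hi - lo + 1 and (lo + hi)(hi - lo + 1)/2. ExpectedTotals is a hypothetical helper.

```java
// Expected baseline results, assuming bin1 holds the integers 1..100000,
// one record per value.
public class ExpectedTotals {
    // Count of the consecutive integers lo..hi (inclusive).
    static long count(long lo, long hi) {
        return hi - lo + 1;
    }

    // Sum of the consecutive integers lo..hi (arithmetic series).
    static long sum(long lo, long hi) {
        return (lo + hi) * count(lo, hi) / 2;
    }

    public static void main(String[] args) {
        System.out.println("all records: count " + count(1, 100000) + ", sum " + sum(1, 100000));
        System.out.println("25001..75000: count " + count(25001, 75000) + ", sum " + sum(25001, 75000));
    }
}
```

This gives 100000 records with sum 5000050000 for the full data set, and 50000 records with sum 2500025000 for 25001 <= bin1 <= 75000, which matches the baseline output for both the filter expression and the secondary-index query.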
// Equivalent secondary-index and filter predicates
IncludeQueryFilterExp = Exp.and(Exp.ge(Exp.intBin("bin1"), Exp.val(25001)), Exp.le(Exp.intBin("bin1"), Exp.val(75000)));
SecondaryIndexQueryPredicate = Filter.range("bin1", 25001, 75000);

int numSplits = 1;
int numWorkers = 1;
ProcessingMode = PROCESSING_MODE_SYNC;
ChunkSize = 50;
QueryFilter = QUERY_FILTER_NONE;
QueryType = PRIMARY_INDEX_QUERY;

System.out.format("Splits: %d, type: %s, chunk size: %s, query type: %s, mode: %s\n",
numSplits, "EXACT_NUM_SPLITS", ChunkSize, "scan w/o filter", "SYNC");
SplitAssignment[] splitAssignments = assignSplits(numSplits, EXACT_NUM_SPLITS);
processSplits(splitAssignments, numWorkers);
System.out.println();

ChunkSize = 100000;
System.out.format("Splits: %d, type: %s, chunk size: %s, query type: %s, mode: %s\n",
numSplits, "EXACT_NUM_SPLITS", ChunkSize, "scan w/o filter", "SYNC");
SplitAssignment[] splitAssignments = assignSplits(numSplits, EXACT_NUM_SPLITS);
processSplits(splitAssignments, numWorkers);
System.out.println();

ChunkSize = 50;
QueryFilter = QUERY_FILTER_INCLUDE;
System.out.format("Splits: %d, type: %s, chunk size: %s, query type: %s, mode: %s\n",
numSplits, "EXACT_NUM_SPLITS", ChunkSize, "scan with filter", "SYNC");
SplitAssignment[] splitAssignments = assignSplits(numSplits, EXACT_NUM_SPLITS);
processSplits(splitAssignments, numWorkers);
System.out.println();

QueryFilter = QUERY_FILTER_NONE;
QueryType = SECONDARY_INDEX_QUERY;
System.out.format("Splits: %d, type: %s, chunk size: %s, query type: %s, mode: %s\n",
numSplits, "EXACT_NUM_SPLITS", ChunkSize, "sindex query", "SYNC");
SplitAssignment[] splitAssignments = assignSplits(numSplits, EXACT_NUM_SPLITS);
processSplits(splitAssignments, numWorkers);
System.out.println();

ProcessingMode = PROCESSING_MODE_ASYNC;
ChunkSize = 50;
QueryFilter = QUERY_FILTER_NONE;
QueryType = PRIMARY_INDEX_QUERY;
System.out.format("Splits: %d, type: %s, chunk size: %s, query type: %s, mode: %s\n",
numSplits, "EXACT_NUM_SPLITS", ChunkSize, "scan w/o filter", "ASYNC");
SplitAssignment[] splitAssignments = assignSplits(numSplits, EXACT_NUM_SPLITS);
processSplits(splitAssignments, numWorkers);
System.out.println();

ChunkSize = 100000;
System.out.format("Splits: %d, type: %s, chunk size: %s, query type: %s, mode: %s\n",
numSplits, "EXACT_NUM_SPLITS", ChunkSize, "scan w/o filter", "ASYNC");
SplitAssignment[] splitAssignments = assignSplits(numSplits, EXACT_NUM_SPLITS);
processSplits(splitAssignments, numWorkers);
System.out.println();

ChunkSize = 50;
QueryFilter = QUERY_FILTER_INCLUDE;
System.out.format("Splits: %d, type: %s, chunk size: %s, query type: %s, mode: %s\n",
numSplits, "EXACT_NUM_SPLITS", ChunkSize, "scan with filter", "ASYNC");
SplitAssignment[] splitAssignments = assignSplits(numSplits, EXACT_NUM_SPLITS);
processSplits(splitAssignments, numWorkers);
System.out.println();

QueryFilter = QUERY_FILTER_NONE;
QueryType = SECONDARY_INDEX_QUERY;
System.out.format("Splits: %d, type: %s, chunk size: %s, query type: %s, mode: %s\n",
numSplits, "EXACT_NUM_SPLITS", ChunkSize, "sindex query", "ASYNC");
SplitAssignment[] splitAssignments = assignSplits(numSplits, EXACT_NUM_SPLITS);
processSplits(splitAssignments, numWorkers);
System.out.println();

Output:

Splits: 1, type: EXACT_NUM_SPLITS, chunk size: 50, query type: scan w/o filter, mode: SYNC
1 worker threads.
Record count is 100000, sum is 5000050000.
That took 29 seconds.

Splits: 1, type: EXACT_NUM_SPLITS, chunk size: 100000, query type: scan w/o filter, mode: SYNC
1 worker threads.
Record count is 100000, sum is 5000050000.
That took 0 seconds.

Splits: 1, type: EXACT_NUM_SPLITS, chunk size: 50, query type: scan with filter, mode: SYNC
1 worker threads.
Record count is 50000, sum is 2500025000.
That took 21 seconds.

Splits: 1, type: EXACT_NUM_SPLITS, chunk size: 50, query type: sindex query, mode: SYNC
1 worker threads.
Record count is 50000, sum is 2500025000.
That took 9 seconds.

Splits: 1, type: EXACT_NUM_SPLITS, chunk size: 50, query type: scan w/o filter, mode: ASYNC
1 worker threads.
Record count is 100000, sum is 5000050000.
That took 25 seconds.

Splits: 1, type: EXACT_NUM_SPLITS, chunk size: 100000, query type: scan w/o filter, mode: ASYNC
1 worker threads.
Record count is 100000, sum is 5000050000.
That took 0 seconds.

Splits: 1, type: EXACT_NUM_SPLITS, chunk size: 50, query type: scan with filter, mode: ASYNC
1 worker threads.
Record count is 50000, sum is 2500025000.
That took 21 seconds.

Splits: 1, type: EXACT_NUM_SPLITS, chunk size: 50, query type: sindex query, mode: ASYNC
1 worker threads.
Record count is 50000, sum is 2500025000.
That took 9 seconds.

Running Parallel Queries with Interesting Parameter Variations

The key things to note are:

  • Split assignments - a summary can be printed out optionally as seen below.
  • Number of records and aggregated sum should be the same for the same query/filter setting, confirming all records are processed correctly.
  • Performance, as indicated by the time each run takes.
  • Distribution of records processed by worker - a summary can be printed out optionally as seen below.
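To quantify the distribution of records by worker, a small helper can reduce the per-worker counts to their minimum, maximum, and mean. The sketch below is hypothetical (DistributionSummary is not part of the notebook). It uses the ten per-worker counts printed for the 10-worker sync run in the worker-variation experiments; with 10 workers, workerSummary() prints every worker, so that sample is complete.

```java
// Hypothetical helper that summarizes how evenly records were spread over
// workers, given per-worker record counts like those printed by workerSummary().
public class DistributionSummary {
    // Returns {min, max, mean} of the per-worker counts.
    static double[] summarize(long[] perWorker) {
        long min = Long.MAX_VALUE, max = Long.MIN_VALUE, total = 0;
        for (long c : perWorker) {
            min = Math.min(min, c);
            max = Math.max(max, c);
            total += c;
        }
        return new double[] {min, max, (double) total / perWorker.length};
    }

    public static void main(String[] args) {
        // Per-worker counts from the 10-worker sync run in this notebook.
        long[] counts = {10037, 9718, 10096, 10193, 10232, 10013, 9923, 9674, 9991, 10123};
        double[] s = summarize(counts);
        System.out.printf("min %.0f, max %.0f, mean %.1f, max/mean %.3f%n", s[0], s[1], s[2], s[1] / s[2]);
    }
}
```

For those counts the minimum is 9674, the maximum is 10232, and the mean is 10000, so the busiest worker processed about 2.3% more records than the average.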
// set the base global parameter values - can be changed here or before a run in the cells below
ProcessingMode = PROCESSING_MODE_SYNC;
QueryType = PRIMARY_INDEX_QUERY;
QueryFilter = QUERY_FILTER_NONE;
ChunkSize = 30;

// the following filter expression chooses records with even values of bin1 - CAN CHANGE TO A DIFFERENT EXPRESSION
// used when QueryFilter = QUERY_FILTER_INCLUDE
IncludeQueryFilterExp = Exp.eq(Exp.mod(Exp.intBin("bin1"), Exp.val(2)), Exp.val(0));

// the following query predicate is a range filter for records with 25001 <= bin1 <= 75000 - CAN CHANGE TO A DIFFERENT QUERY PREDICATE
// used when QueryType = SECONDARY_INDEX_QUERY
SecondaryIndexQueryPredicate = Filter.range("bin1", 25001, 75000);

Varying number of splits and split types

Vary the number of splits from 2 to 50000 across all split types.

int NumWorkers = 100;

System.out.format("Requested splits: %d, type: %s\n", 2, "MIN_NUM_SPLITS");
SplitAssignment[] splitAssignments = assignSplits(2, MIN_NUM_SPLITS);
assignmentSummary("MIN_NUM_SPLITS", 2, splitAssignments);
processSplits(splitAssignments, NumWorkers);
System.out.println();

System.out.format("Requested splits: %d, type: %s\n", 10, "MIN_NUM_SPLITS");
splitAssignments = assignSplits(10, MIN_NUM_SPLITS);
assignmentSummary("MIN_NUM_SPLITS", 10, splitAssignments);
processSplits(splitAssignments, NumWorkers);
System.out.println();

System.out.format("Requested splits: %d, type: %s\n", 100, "EXACT_NUM_SPLITS");
splitAssignments = assignSplits(100, EXACT_NUM_SPLITS);
assignmentSummary("EXACT_NUM_SPLITS", 100, splitAssignments);
processSplits(splitAssignments, NumWorkers);
workerSummary();
System.out.println();

System.out.format("Requested splits: %d, type: %s\n", 1000, "MAX_NUM_SPLITS");
splitAssignments = assignSplits(1000, MAX_NUM_SPLITS);
assignmentSummary("MAX_NUM_SPLITS", 1000, splitAssignments);
processSplits(splitAssignments, NumWorkers);
workerSummary();
System.out.println();

System.out.format("Requested splits: %d, type: %s\n", 5000, "MAX_NUM_SPLITS");
splitAssignments = assignSplits(5000, MAX_NUM_SPLITS);
assignmentSummary("MAX_NUM_SPLITS", 5000, splitAssignments);
processSplits(splitAssignments, NumWorkers);
workerSummary();
System.out.println();

System.out.format("Requested splits: %d, type: %s\n", 10000, "EXACT_NUM_SPLITS");
splitAssignments = assignSplits(10000, EXACT_NUM_SPLITS);
assignmentSummary("EXACT_NUM_SPLITS", 10000, splitAssignments);
processSplits(splitAssignments, NumWorkers);
workerSummary();
System.out.println();

System.out.format("Requested splits: %d, type: %s\n", 50000, "MAX_NUM_SPLITS");
splitAssignments = assignSplits(50000, MAX_NUM_SPLITS);
assignmentSummary("MAX_NUM_SPLITS", 50000, splitAssignments);
processSplits(splitAssignments, NumWorkers);
workerSummary();
System.out.println();

Output:

Requested splits: 2, type: MIN_NUM_SPLITS
Split type MIN_NUM_SPLITS, requested 2, assigned 2 (ends and middle):
Split 0:
ptn_start: 0, ptn_count: 2048, sub_start: null, sub_count: null, sub_mod: null
Split 1:
ptn_start: 2048, ptn_count: 2048, sub_start: null, sub_count: null, sub_mod: null
100 worker threads.
Record count is 100000, sum is 5000050000.
That took 22 seconds.

Requested splits: 10, type: MIN_NUM_SPLITS
Split type MIN_NUM_SPLITS, requested 10, assigned 16 (ends and middle):
Split 0:
ptn_start: 0, ptn_count: 256, sub_start: null, sub_count: null, sub_mod: null
Split 1:
ptn_start: 256, ptn_count: 256, sub_start: null, sub_count: null, sub_mod: null
Split 8:
ptn_start: 2048, ptn_count: 256, sub_start: null, sub_count: null, sub_mod: null
Split 14:
ptn_start: 3584, ptn_count: 256, sub_start: null, sub_count: null, sub_mod: null
Split 15:
ptn_start: 3840, ptn_count: 256, sub_start: null, sub_count: null, sub_mod: null
100 worker threads.
Record count is 100000, sum is 5000050000.
That took 3 seconds.

Requested splits: 100, type: EXACT_NUM_SPLITS
Split type EXACT_NUM_SPLITS, requested 100, assigned 100 (ends and middle):
Split 0:
ptn_start: 0, ptn_count: 40, sub_start: null, sub_count: null, sub_mod: null
ptn_start: 40, ptn_count: 1, sub_start: 0, sub_count: 96, sub_mod: 100
Split 1:
ptn_start: 40, ptn_count: 1, sub_start: 96, sub_count: 4, sub_mod: 100
ptn_start: 41, ptn_count: 40, sub_start: null, sub_count: null, sub_mod: null
ptn_start: 81, ptn_count: 1, sub_start: 0, sub_count: 92, sub_mod: 100
Split 50:
ptn_start: 2048, ptn_count: 40, sub_start: null, sub_count: null, sub_mod: null
ptn_start: 2088, ptn_count: 1, sub_start: 0, sub_count: 96, sub_mod: 100
Split 98:
ptn_start: 4014, ptn_count: 1, sub_start: 8, sub_count: 92, sub_mod: 100
ptn_start: 4015, ptn_count: 40, sub_start: null, sub_count: null, sub_mod: null
ptn_start: 4055, ptn_count: 1, sub_start: 0, sub_count: 4, sub_mod: 100
Split 99:
ptn_start: 4055, ptn_count: 1, sub_start: 4, sub_count: 96, sub_mod: 100
ptn_start: 4056, ptn_count: 40, sub_start: null, sub_count: null, sub_mod: null
100 worker threads.
Record count is 100000, sum is 5000050000.
That took 2 seconds.
Records by worker (~50 worker sample): 0:1008 2:1016 4:1057 6:1040 8:995 10:986 12:1019 14:959 16:1003 18:962 20:1020 22:989 24:1016 26:1016 28:1018 30:1053 32:953 34:1002 36:969 38:1025 40:1036 42:986 44:920 46:970 48:898 50:1089 52:995 54:1003 56:1021 58:1033 60:1026 62:1024 64:1030 66:1040 68:1033 70:1051 72:1012 74:943 76:1009 78:1058 80:992 82:1042 84:968 86:1061 88:989 90:976 92:948 94:985 96:1007 98:1008

Requested splits: 1000, type: MAX_NUM_SPLITS
Split type MAX_NUM_SPLITS, requested 1000, assigned 512 (ends and middle):
Split 0:
ptn_start: 0, ptn_count: 8, sub_start: null, sub_count: null, sub_mod: null
Split 1:
ptn_start: 8, ptn_count: 8, sub_start: null, sub_count: null, sub_mod: null
Split 256:
ptn_start: 2048, ptn_count: 8, sub_start: null, sub_count: null, sub_mod: null
Split 510:
ptn_start: 4080, ptn_count: 8, sub_start: null, sub_count: null, sub_mod: null
Split 511:
ptn_start: 4088, ptn_count: 8, sub_start: null, sub_count: null, sub_mod: null
100 worker threads.
Record count is 100000, sum is 5000050000.
That took 1 seconds.
Records by worker (~50 worker sample): 0:1020 2:1001 4:988 6:949 8:933 10:995 12:1123 14:1058 16:966 18:996 20:954 22:978 24:1110 26:1001 28:998 30:959 32:941 34:809 36:993 38:998 40:968 42:946 44:1066 46:1176 48:961 50:977 52:952 54:996 56:978 58:1146 60:1140 62:939 64:930 66:1013 68:999 70:978 72:1050 74:965 76:946 78:1152 80:1037 82:980 84:978 86:959 88:943 90:974 92:1013 94:944 96:988 98:990

Requested splits: 5000, type: MAX_NUM_SPLITS
Split type MAX_NUM_SPLITS, requested 5000, assigned 4096 (ends and middle):
Split 0:
ptn_start: 0, ptn_count: 1, sub_start: null, sub_count: null, sub_mod: null
Split 1:
ptn_start: 1, ptn_count: 1, sub_start: null, sub_count: null, sub_mod: null
Split 2048:
ptn_start: 2048, ptn_count: 1, sub_start: null, sub_count: null, sub_mod: null
Split 4094:
ptn_start: 4094, ptn_count: 1, sub_start: null, sub_count: null, sub_mod: null
Split 4095:
ptn_start: 4095, ptn_count: 1, sub_start: null, sub_count: null, sub_mod: null
100 worker threads.
Record count is 100000, sum is 5000050000.
That took 2 seconds.
Records by worker (~50 worker sample): 0:954 2:952 4:1057 6:1029 8:1053 10:1084 12:1041 14:999 16:988 18:1022 20:1051 22:965 24:1014 26:1020 28:1034 30:965 32:1010 34:1046 36:1041 38:979 40:1110 42:976 44:975 46:1028 48:987 50:1017 52:1069 54:1086 56:1031 58:1054 60:938 62:1121 64:901 66:935 68:1068 70:998 72:997 74:970 76:987 78:1022 80:907 82:938 84:1004 86:921 88:930 90:1015 92:1047 94:1004 96:891 98:848

Requested splits: 10000, type: EXACT_NUM_SPLITS
Split type EXACT_NUM_SPLITS, requested 1000, assigned 10000 (ends and middle):
Split 0:
ptn_start: 0, ptn_count: 1, sub_start: 0, sub_count: 4096, sub_mod: 10000
Split 1:
ptn_start: 0, ptn_count: 1, sub_start: 4096, sub_count: 4096, sub_mod: 10000
Split 5000:
ptn_start: 2048, ptn_count: 1, sub_start: 0, sub_count: 4096, sub_mod: 10000
Split 9998:
ptn_start: 4095, ptn_count: 1, sub_start: 1808, sub_count: 4096, sub_mod: 10000
Split 9999:
ptn_start: 4095, ptn_count: 1, sub_start: 5904, sub_count: 4096, sub_mod: 10000
100 worker threads.
Record count is 100000, sum is 5000050000.
That took 5 seconds.
Records by worker (~50 worker sample): 0:1051 2:969 4:998 6:1027 8:1077 10:1037 12:994 14:1039 16:1043 18:1011 20:993 22:997 24:916 26:958 28:980 30:971 32:935 34:1018 36:908 38:958 40:1022 42:1075 44:1018 46:940 48:1140 50:919 52:1048 54:1030 56:931 58:928 60:1013 62:978 64:992 66:951 68:901 70:997 72:964 74:1046 76:1112 78:951 80:1024 82:1139 84:1007 86:1029 88:936 90:991 92:1077 94:927 96:994 98:1057

Requested splits: 50000, type: MAX_NUM_SPLITS
Split type MAX_NUM_SPLITS, requested 50000, assigned 49152 (ends and middle):
Split 0:
ptn_start: 0, ptn_count: 1, sub_start: 0, sub_count: 1, sub_mod: 12
Split 1:
ptn_start: 0, ptn_count: 1, sub_start: 1, sub_count: 1, sub_mod: 12
Split 24576:
ptn_start: 2048, ptn_count: 1, sub_start: 0, sub_count: 1, sub_mod: 12
Split 49150:
ptn_start: 4095, ptn_count: 1, sub_start: 10, sub_count: 1, sub_mod: 12
Split 49151:
ptn_start: 4095, ptn_count: 1, sub_start: 11, sub_count: 1, sub_mod: 12
100 worker threads.
Record count is 100000, sum is 5000050000.
That took 20 seconds.
Records by worker (~50 worker sample): 0:1023 2:979 4:988 6:1008 8:1062 10:995 12:971 14:1017 16:963 18:1030 20:1040 22:1002 24:985 26:962 28:1064 30:1014 32:1091 34:1008 36:979 38:948 40:976 42:1033 44:951 46:1044 48:1002 50:999 52:998 54:989 56:982 58:1005 60:1077 62:946 64:1031 66:965 68:941 70:1025 72:1005 74:975 76:1017 78:1016 80:1000 82:995 84:959 86:1017 88:996 90:1011 92:972 94:1002 96:1054 98:943

Varying number of workers in sync mode

Vary the number of workers from 10 to 5000.

int reqSplits = 10000;

SplitAssignment[] splitAssignments = assignSplits(reqSplits, EXACT_NUM_SPLITS);
assignmentSummary("EXACT_NUM_SPLITS", reqSplits, splitAssignments);
processSplits(splitAssignments, 10);
workerSummary();
System.out.println();

splitAssignments = assignSplits(reqSplits, EXACT_NUM_SPLITS);
//assignmentSummary("EXACT_NUM_SPLITS", reqSplits, splitAssignments);
processSplits(splitAssignments, 100);
workerSummary();
System.out.println();

splitAssignments = assignSplits(reqSplits, EXACT_NUM_SPLITS);
//assignmentSummary("EXACT_NUM_SPLITS", reqSplits, splitAssignments);
processSplits(splitAssignments, 1000);
workerSummary();
System.out.println();

splitAssignments = assignSplits(reqSplits, EXACT_NUM_SPLITS);
//assignmentSummary("EXACT_NUM_SPLITS", reqSplits, splitAssignments);
processSplits(splitAssignments, 5000);
workerSummary();
System.out.println();

Output:

Split type EXACT_NUM_SPLITS, requested 10000, assigned 10000 (ends and middle): 
Split 0:
ptn_start: 0, ptn_count: 1, sub_start: 0, sub_count: 4096, sub_mod: 10000
Split 1:
ptn_start: 0, ptn_count: 1, sub_start: 4096, sub_count: 4096, sub_mod: 10000
Split 5000:
ptn_start: 2048, ptn_count: 1, sub_start: 0, sub_count: 4096, sub_mod: 10000
Split 9998:
ptn_start: 4095, ptn_count: 1, sub_start: 1808, sub_count: 4096, sub_mod: 10000
Split 9999:
ptn_start: 4095, ptn_count: 1, sub_start: 5904, sub_count: 4096, sub_mod: 10000
10 worker threads.
Record count is 100000, sum is 5000050000.
That took 6 seconds.
Records by worker (~50 worker sample): 0:10037 1:9718 2:10096 3:10193 4:10232 5:10013 6:9923 7:9674 8:9991 9:10123

100 worker threads.
Record count is 100000, sum is 5000050000.
That took 6 seconds.
Records by worker (~50 worker sample): 0:882 2:973 4:989 6:1090 8:950 10:1029 12:883 14:1001 16:1091 18:1023 20:999 22:937 24:969 26:966 28:925 30:1065 32:961 34:981 36:919 38:1052 40:1090 42:960 44:1001 46:927 48:950 50:1063 52:1115 54:905 56:1003 58:978 60:927 62:1108 64:1001 66:1153 68:1114 70:1035 72:1090 74:983 76:994 78:1042 80:954 82:1010 84:1050 86:940 88:923 90:1025 92:1061 94:993 96:1034 98:1013

1000 worker threads.
Record count is 100000, sum is 5000050000.
That took 6 seconds.
Records by worker (~50 worker sample): 0:148 20:98 40:113 60:98 80:147 100:105 120:99 140:92 160:113 180:126 200:82 220:88 240:88 260:131 280:70 300:96 320:131 340:132 360:122 380:112 400:99 420:107 440:104 460:98 480:122 500:91 520:91 540:86 560:105 580:85 600:91 620:87 640:72 660:57 680:74 700:83 720:105 740:96 760:112 780:104 800:67 820:85 840:65 860:103 880:93 900:138 920:112 940:108 960:149 980:72

5000 worker threads.
Record count is 100000, sum is 5000050000.
That took 10 seconds.
Records by worker (~50 worker sample): 0:23 100:64 200:26 300:42 400:18 500:48 600:43 700:26 800:56 900:66 1000:44 1100:47 1200:40 1300:24 1400:23 1500:16 1600:24 1700:9 1800:15 1900:72 2000:7 2100:19 2200:33 2300:37 2400:9 2500:9 2600:11 2700:14 2800:4 2900:12 3000:10 3100:14 3200:6 3300:12 3400:5 3500:21 3600:11 3700:11 3800:9 3900:18 4000:8 4100:19 4200:15 4300:12 4400:14 4500:6 4600:5 4700:41 4800:24 4900:0

Varying number of workers in sync and async mode

Vary the number of workers from 10 to 5000 in sync and async mode.

int reqSplits = 10000;

ProcessingMode = PROCESSING_MODE_SYNC;
System.out.format("Processing mode: SYNC\n");
SplitAssignment[] splitAssignments = assignSplits(reqSplits, EXACT_NUM_SPLITS);
assignmentSummary("EXACT_NUM_SPLITS", reqSplits, splitAssignments);
processSplits(splitAssignments, 10);
workerSummary();
System.out.println();

ProcessingMode = PROCESSING_MODE_ASYNC;
System.out.format("Processing mode: ASYNC\n");
splitAssignments = assignSplits(reqSplits, EXACT_NUM_SPLITS);
//assignmentSummary("EXACT_NUM_SPLITS", reqSplits, splitAssignments);
processSplits(splitAssignments, 10);
workerSummary();
System.out.println();

ProcessingMode = PROCESSING_MODE_SYNC;
System.out.format("Processing mode: SYNC\n");
SplitAssignment[] splitAssignments = assignSplits(reqSplits, EXACT_NUM_SPLITS);
//assignmentSummary("EXACT_NUM_SPLITS", reqSplits, splitAssignments);
processSplits(splitAssignments, 100);
workerSummary();
System.out.println();

ProcessingMode = PROCESSING_MODE_ASYNC;
System.out.format("Processing mode: ASYNC\n");
splitAssignments = assignSplits(reqSplits, EXACT_NUM_SPLITS);
//assignmentSummary("EXACT_NUM_SPLITS", reqSplits, splitAssignments);
processSplits(splitAssignments, 100);
workerSummary();
System.out.println();

ProcessingMode = PROCESSING_MODE_SYNC;
System.out.format("Processing mode: SYNC\n");
SplitAssignment[] splitAssignments = assignSplits(reqSplits, EXACT_NUM_SPLITS);
//assignmentSummary("EXACT_NUM_SPLITS", reqSplits, splitAssignments);
processSplits(splitAssignments, 1000);
workerSummary();
System.out.println();

ProcessingMode = PROCESSING_MODE_ASYNC;
System.out.format("Processing mode: ASYNC\n");
splitAssignments = assignSplits(reqSplits, EXACT_NUM_SPLITS);
//assignmentSummary("EXACT_NUM_SPLITS", reqSplits, splitAssignments);
processSplits(splitAssignments, 1000);
workerSummary();
System.out.println();

ProcessingMode = PROCESSING_MODE_SYNC;
System.out.format("Processing mode: SYNC\n");
SplitAssignment[] splitAssignments = assignSplits(reqSplits, EXACT_NUM_SPLITS);
//assignmentSummary("EXACT_NUM_SPLITS", reqSplits, splitAssignments);
processSplits(splitAssignments, 5000);
workerSummary();
System.out.println();

ProcessingMode = PROCESSING_MODE_ASYNC;
System.out.format("Processing mode: ASYNC\n");
splitAssignments = assignSplits(reqSplits, EXACT_NUM_SPLITS);
//assignmentSummary("EXACT_NUM_SPLITS", reqSplits, splitAssignments);
processSplits(splitAssignments, 5000);
workerSummary();
System.out.println();

Output:

Processing mode: SYNC
Split type EXACT_NUM_SPLITS, requested 10000, assigned 10000 (ends and middle):
Split 0:
ptn_start: 0, ptn_count: 1, sub_start: 0, sub_count: 4096, sub_mod: 10000
Split 1:
ptn_start: 0, ptn_count: 1, sub_start: 4096, sub_count: 4096, sub_mod: 10000
Split 5000:
ptn_start: 2048, ptn_count: 1, sub_start: 0, sub_count: 4096, sub_mod: 10000
Split 9998:
ptn_start: 4095, ptn_count: 1, sub_start: 1808, sub_count: 4096, sub_mod: 10000
Split 9999:
ptn_start: 4095, ptn_count: 1, sub_start: 5904, sub_count: 4096, sub_mod: 10000
10 worker threads.
Record count is 100000, sum is 5000050000.
That took 5 seconds.
Records by worker (~50 worker sample): 0:10031 1:10153 2:9838 3:9816 4:9852 5:10097 6:9922 7:10032 8:10022 9:10237

Processing mode: ASYNC
10 worker threads.
Record count is 100000, sum is 5000050000.
That took 4 seconds.
Records by worker (~50 worker sample): 0:10112 1:10082 2:9734 3:10029 4:9757 5:10370 6:10125 7:9925 8:9878 9:9988

Processing mode: SYNC
100 worker threads.
Record count is 100000, sum is 5000050000.
That took 5 seconds.
Records by worker (~50 worker sample): 0:1537 2:1355 4:1420 6:1384 8:1472 10:1509 12:1048 14:792 16:856 18:757 20:821 22:812 24:798 26:825 28:892 30:816 32:827 34:870 36:828 38:913 40:841 42:826 44:884 46:917 48:860 50:1018 52:787 54:904 56:955 58:917 60:980 62:991 64:852 66:1017 68:1030 70:949 72:999 74:972 76:1106 78:1532 80:1009 82:1220 84:1079 86:1175 88:982 90:1073 92:1088 94:1124 96:1123 98:1355

Processing mode: ASYNC
100 worker threads.
Record count is 100000, sum is 5000050000.
That took 3 seconds.
Records by worker (~50 worker sample): 0:924 2:1079 4:936 6:1025 8:869 10:1166 12:1115 14:895 16:948 18:1129 20:962 22:1053 24:922 26:964 28:993 30:850 32:1018 34:1016 36:1122 38:1095 40:1015 42:917 44:1027 46:1024 48:902 50:953 52:1006 54:1130 56:986 58:1012 60:1093 62:909 64:984 66:1023 68:1045 70:1066 72:1043 74:1058 76:937 78:935 80:1008 82:967 84:1042 86:897 88:960 90:1089 92:1034 94:1014 96:971 98:1018

Processing mode: SYNC
1000 worker threads.
Record count is 100000, sum is 5000050000.
That took 5 seconds.
Records by worker (~50 worker sample): 0:124 20:74 40:80 60:241 80:297 100:150 120:101 140:219 160:137 180:104 200:106 220:54 240:163 260:56 280:92 300:151 320:145 340:141 360:271 380:220 400:178 420:206 440:47 460:56 480:69 500:74 520:70 540:73 560:78 580:71 600:39 620:42 640:58 660:62 680:62 700:47 720:243 740:25 760:59 780:44 800:52 820:42 840:46 860:56 880:55 900:64 920:77 940:111 960:60 980:87

Processing mode: ASYNC
1000 worker threads.
Record count is 100000, sum is 5000050000.
That took 3 seconds.
Records by worker (~50 worker sample): 0:113 20:127 40:116 60:133 80:98 100:92 120:103 140:109 160:114 180:118 200:89 220:138 240:128 260:54 280:86 300:103 320:77 340:148 360:132 380:93 400:94 420:115 440:73 460:85 480:110 500:51 520:131 540:125 560:81 580:105 600:125 620:115 640:103 660:90 680:102 700:135 720:42 740:106 760:53 780:99 800:134 820:92 840:107 860:150 880:89 900:134 920:168 940:82 960:76 980:88

Processing mode: SYNC
5000 worker threads.
Record count is 100000, sum is 5000050000.
That took 11 seconds.
Records by worker (~50 worker sample): 0:62 100:19 200:14 300:31 400:18 500:36 600:49 700:56 800:41 900:12 1000:27 1100:33 1200:33 1300:13 1400:8 1500:12 1600:21 1700:11 1800:14 1900:28 2000:27 2100:27 2200:14 2300:32 2400:7 2500:14 2600:5 2700:8 2800:6 2900:10 3000:8 3100:15 3200:58 3300:11 3400:10 3500:10 3600:10 3700:8 3800:8 3900:19 4000:14 4100:5 4200:28 4300:9 4400:10 4500:26 4600:13 4700:37 4800:26 4900:15

Processing mode: ASYNC
5000 worker threads.
Record count is 100000, sum is 5000050000.
That took 6 seconds.
Records by worker (~50 worker sample): 0:58 100:10 200:15 300:34 400:32 500:12 600:41 700:21 800:36 900:15 1000:25 1100:13 1200:6 1300:11 1400:19 1500:12 1600:8 1700:29 1800:32 1900:18 2000:16 2100:21 2200:13 2300:21 2400:13 2500:21 2600:18 2700:20 2800:23 2900:22 3000:10 3100:11 3200:11 3300:23 3400:26 3500:8 3600:16 3700:17 3800:18 3900:8 4000:7 4100:38 4200:23 4300:7 4400:34 4500:7 4600:8 4700:18 4800:10 4900:21

Varying query type and filter

Primary-index and secondary-index queries, with and without a filter expression. The default secondary-index query predicate (25001 <= bin1 <= 75000) selects half the records and yields half the sum. The default filter expression (even values of bin1) likewise selects half of the records it is applied to and yields roughly half of the corresponding sum, so combining the two returns a quarter of the records.
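You can check the expected counts and sums by hand. The sketch below assumes that bin1 holds each integer from 1 to 100000 exactly once, which is consistent with the reported record count of 100000 and sum of 5000050000. It computes closed-form values for the four query/filter combinations run in the next cell. ExpectedQueryResults and its methods are hypothetical helpers, not part of the notebook.

```java
/** Closed-form expected results, assuming bin1 holds each integer 1..100000 once (hypothetical helper). */
public class ExpectedQueryResults {

    // Sum of all integers in [lo, hi], for positive lo <= hi.
    static long rangeSum(long lo, long hi) {
        return (lo + hi) * (hi - lo + 1) / 2;
    }

    // Number of even integers in [lo, hi], for positive bounds.
    static long evenCount(long lo, long hi) {
        long firstEven = (lo % 2 == 0) ? lo : lo + 1;
        long lastEven = (hi % 2 == 0) ? hi : hi - 1;
        return firstEven > lastEven ? 0 : (lastEven - firstEven) / 2 + 1;
    }

    // Sum of even integers in [lo, hi]: arithmetic series from firstEven to lastEven.
    static long evenSum(long lo, long hi) {
        long firstEven = (lo % 2 == 0) ? lo : lo + 1;
        long lastEven = (hi % 2 == 0) ? hi : hi - 1;
        return firstEven > lastEven ? 0 : (firstEven + lastEven) * evenCount(lo, hi) / 2;
    }

    public static void main(String[] args) {
        long lo = 1, hi = 100_000;           // assumption: bin1 values in the test data
        long siLo = 25_001, siHi = 75_000;   // default secondary-index range predicate
        System.out.printf("PI, no filter:   count %d, sum %d%n", hi - lo + 1, rangeSum(lo, hi));
        System.out.printf("PI, even filter: count %d, sum %d%n", evenCount(lo, hi), evenSum(lo, hi));
        System.out.printf("SI, no filter:   count %d, sum %d%n", siHi - siLo + 1, rangeSum(siLo, siHi));
        System.out.printf("SI, even filter: count %d, sum %d%n", evenCount(siLo, siHi), evenSum(siLo, siHi));
    }
}
```

If you change IncludeQueryFilterExp or SecondaryIndexQueryPredicate, the same kind of calculation gives the new expected values to compare against the query results.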

// the following filter expression chooses records with even values of bin1 - CAN CHANGE TO A DIFFERENT EXPRESSION
// used when QueryFilter = QUERY_FILTER_INCLUDE
IncludeQueryFilterExp = Exp.eq(Exp.mod(Exp.intBin("bin1"), Exp.val(2)), Exp.val(0));

// the following query predicate is a range filter for records with 25001 <= bin1 <= 75000 - CAN CHANGE TO A DIFFERENT QUERY PREDICATE
// used when QueryType = SECONDARY_INDEX_QUERY
SecondaryIndexQueryPredicate = Filter.range("bin1", 25001, 75000);

int reqSplits = 1000;
//int NumWorkers = 100;
int NumWorkers = 10;
ProcessingMode = PROCESSING_MODE_SYNC;

QueryType = PRIMARY_INDEX_QUERY;
QueryFilter = QUERY_FILTER_NONE;
System.out.format("Primary-index query with no filter\n");
SplitAssignment[] splitAssignments = assignSplits(reqSplits, EXACT_NUM_SPLITS);
assignmentSummary("EXACT_NUM_SPLITS", reqSplits, splitAssignments);
processSplits(splitAssignments, NumWorkers);
workerSummary();
System.out.println();

QueryType = PRIMARY_INDEX_QUERY;
QueryFilter = QUERY_FILTER_INCLUDE;
System.out.format("Primary-index query with filter expression\n");
splitAssignments = assignSplits(reqSplits, EXACT_NUM_SPLITS);
//assignmentSummary("EXACT_NUM_SPLITS", reqSplits, splitAssignments);
processSplits(splitAssignments, NumWorkers);
//workerSummary();
System.out.println();

QueryType = SECONDARY_INDEX_QUERY;
QueryFilter = QUERY_FILTER_NONE;
System.out.format("Secondary-index query with no filter\n");
splitAssignments = assignSplits(reqSplits, EXACT_NUM_SPLITS);
//assignmentSummary("EXACT_NUM_SPLITS", reqSplits, splitAssignments);
processSplits(splitAssignments, NumWorkers);
//workerSummary();
System.out.println();

QueryType = SECONDARY_INDEX_QUERY;
QueryFilter = QUERY_FILTER_INCLUDE;
System.out.format("Secondary-index query with filter expression\n");
splitAssignments = assignSplits(reqSplits, EXACT_NUM_SPLITS);
//assignmentSummary("EXACT_NUM_SPLITS", reqSplits, splitAssignments);
processSplits(splitAssignments, NumWorkers);
//workerSummary();
System.out.println();

Output:

Primary-index query with no filter
Split type EXACT_NUM_SPLITS, requested 1000, assigned 1000 (ends and middle):
Split 0:
ptn_start: 0, ptn_count: 4, sub_start: null, sub_count: null, sub_mod: null
ptn_start: 4, ptn_count: 1, sub_start: 0, sub_count: 96, sub_mod: 1000
Split 1:
ptn_start: 4, ptn_count: 1, sub_start: 96, sub_count: 904, sub_mod: 1000
ptn_start: 5, ptn_count: 3, sub_start: null, sub_count: null, sub_mod: null
ptn_start: 8, ptn_count: 1, sub_start: 0, sub_count: 192, sub_mod: 1000
Split 500:
ptn_start: 2048, ptn_count: 4, sub_start: null, sub_count: null, sub_mod: null
ptn_start: 2052, ptn_count: 1, sub_start: 0, sub_count: 96, sub_mod: 1000
Split 998:
ptn_start: 4087, ptn_count: 1, sub_start: 808, sub_count: 192, sub_mod: 1000
ptn_start: 4088, ptn_count: 3, sub_start: null, sub_count: null, sub_mod: null
ptn_start: 4091, ptn_count: 1, sub_start: 0, sub_count: 904, sub_mod: 1000
Split 999:
ptn_start: 4091, ptn_count: 1, sub_start: 904, sub_count: 96, sub_mod: 1000
ptn_start: 4092, ptn_count: 4, sub_start: null, sub_count: null, sub_mod: null
10 worker threads.
Record count is 100000, sum is 5000050000.
That took 2 seconds.
Records by worker (~50 worker sample): 0:9761 1:9662 2:10238 3:10074 4:9921 5:10233 6:9942 7:9999 8:9989 9:10181

Primary-index query with filter expression
10 worker threads.
Record count is 50000, sum is 2500050000.
That took 2 seconds.

Secondary-index query with no filter
10 worker threads.
Record count is 50000, sum is 2500025000.
That took 1 seconds.

Secondary-index query with filter expression
10 worker threads.
Record count is 25000, sum is 1250025000.
That took 1 seconds.

Varying max-records chunk size

Vary the max-records chunk size, which is the maximum number of records retrieved per query request, from 10 to 500.
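A back-of-the-envelope model helps explain why small chunks cost more. It assumes the 100000 records are spread evenly over the 100 splits (about 1000 per split) and that each request returns at most ChunkSize records. Under these assumptions, draining a split takes at least ceil(records / chunk) requests. Real counts can be higher, for example if a request returns fewer records than the limit. ChunkRequestEstimate is a hypothetical helper.

```java
/** Rough lower bound on query requests for a given max-records chunk size (hypothetical model). */
public class ChunkRequestEstimate {

    // Ceiling division for non-negative a and positive b.
    static long ceilDiv(long a, long b) {
        return (a + b - 1) / b;
    }

    // Minimum requests to drain one split when each request returns at most chunkSize records;
    // even an empty split needs one request to discover that it is empty.
    static long requestsPerSplit(long recordsPerSplit, long chunkSize) {
        return Math.max(1, ceilDiv(recordsPerSplit, chunkSize));
    }

    public static void main(String[] args) {
        long totalRecords = 100_000;   // record count reported in the output
        int splits = 100;              // reqSplits in the cell below
        long perSplit = totalRecords / splits;
        for (int chunk : new int[] {10, 25, 100, 300, 500}) {
            long perSplitReqs = requestsPerSplit(perSplit, chunk);
            System.out.printf("chunk %3d: >= %3d requests per split, >= %5d in total%n",
                    chunk, perSplitReqs, perSplitReqs * splits);
        }
    }
}
```

By this estimate, chunk size 10 needs about 10000 requests in total, compared with about 200 for chunk size 500. That is 50 times as many round trips, each carrying its own fixed overhead.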

int reqSplits = 100;
int numWorkers = 50;
ProcessingMode = PROCESSING_MODE_ASYNC;
QueryType = PRIMARY_INDEX_QUERY;
QueryFilter = QUERY_FILTER_NONE;

ChunkSize = 10;
System.out.format("Max-records chunk size: %d\n", ChunkSize);
splitAssignments = assignSplits(reqSplits, EXACT_NUM_SPLITS);
assignmentSummary("EXACT_NUM_SPLITS", reqSplits, splitAssignments);
processSplits(splitAssignments, numWorkers);
workerSummary();
System.out.println();

ChunkSize = 25;
System.out.format("Max-records chunk size: %d\n", ChunkSize);
splitAssignments = assignSplits(reqSplits, EXACT_NUM_SPLITS);
//assignmentSummary("EXACT_NUM_SPLITS", reqSplits, splitAssignments);
processSplits(splitAssignments, numWorkers);
//workerSummary();
System.out.println();

ChunkSize = 100;
System.out.format("Max-records chunk size: %d\n", ChunkSize);
splitAssignments = assignSplits(reqSplits, EXACT_NUM_SPLITS);
//assignmentSummary("EXACT_NUM_SPLITS", reqSplits, splitAssignments);
processSplits(splitAssignments, numWorkers);
//workerSummary();
System.out.println();

ChunkSize = 300;
System.out.format("Max-records chunk size: %d\n", ChunkSize);
splitAssignments = assignSplits(reqSplits, EXACT_NUM_SPLITS);
//assignmentSummary("EXACT_NUM_SPLITS", reqSplits, splitAssignments);
processSplits(splitAssignments, numWorkers);
//workerSummary();
System.out.println();

ChunkSize = 500;
System.out.format("Max-records chunk size: %d\n", ChunkSize);
splitAssignments = assignSplits(reqSplits, EXACT_NUM_SPLITS);
//assignmentSummary("EXACT_NUM_SPLITS", reqSplits, splitAssignments);
processSplits(splitAssignments, numWorkers);
//workerSummary();
System.out.println();

Output:

Max-records chunk size: 10
Split type EXACT_NUM_SPLITS, requested 100, assigned 100 (ends and middle):
Split 0:
ptn_start: 0, ptn_count: 40, sub_start: null, sub_count: null, sub_mod: null
ptn_start: 40, ptn_count: 1, sub_start: 0, sub_count: 96, sub_mod: 100
Split 1:
ptn_start: 40, ptn_count: 1, sub_start: 96, sub_count: 4, sub_mod: 100
ptn_start: 41, ptn_count: 40, sub_start: null, sub_count: null, sub_mod: null
ptn_start: 81, ptn_count: 1, sub_start: 0, sub_count: 92, sub_mod: 100
Split 50:
ptn_start: 2048, ptn_count: 40, sub_start: null, sub_count: null, sub_mod: null
ptn_start: 2088, ptn_count: 1, sub_start: 0, sub_count: 96, sub_mod: 100
Split 98:
ptn_start: 4014, ptn_count: 1, sub_start: 8, sub_count: 92, sub_mod: 100
ptn_start: 4015, ptn_count: 40, sub_start: null, sub_count: null, sub_mod: null
ptn_start: 4055, ptn_count: 1, sub_start: 0, sub_count: 4, sub_mod: 100
Split 99:
ptn_start: 4055, ptn_count: 1, sub_start: 4, sub_count: 96, sub_mod: 100
ptn_start: 4056, ptn_count: 40, sub_start: null, sub_count: null, sub_mod: null
50 worker threads.
Record count is 100000, sum is 5000050000.
That took 4 seconds.
Records by worker (~50 worker sample): 0:1985 1:1983 2:2077 3:2030 4:2001 5:1960 6:2048 7:1983 8:1946 9:1989 10:2064 11:1980 12:2019 13:1934 14:1960 15:1962 16:1978 17:1914 18:2059 19:1965 20:2080 21:2042 22:2013 23:1976 24:2044 25:2001 26:2049 27:2038 28:1994 29:2013 30:2068 31:2017 32:1965 33:2048 34:2023 35:1930 36:2009 37:2036 38:1979 39:1904 40:1913 41:1981 42:2042 43:2087 44:2002 45:1973 46:2102 47:1936 48:1898 49:2000

Max-records chunk size: 25
50 worker threads.
Record count is 100000, sum is 5000050000.
That took 2 seconds.

Max-records chunk size: 100
50 worker threads.
Record count is 100000, sum is 5000050000.
That took 0 seconds.

Max-records chunk size: 300
50 worker threads.
Record count is 100000, sum is 5000050000.
That took 0 seconds.

Max-records chunk size: 500
50 worker threads.
Record count is 100000, sum is 5000050000.
That took 0 seconds.

Partition Slices: A New Split Assignment Scheme

To illustrate how to define a new split assignment scheme, let us define a simple variation of the exact-number-of-splits scheme, in which each partition is divided into as many subpartitions as the desired number of splits. Each split is then assigned one slice (subpartition) from each of the 4096 partitions.
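Both this scheme and EXACT_NUM_SPLITS divide every partition into reqSplits subpartitions, which gives 4096 * reqSplits (partition, subpartition) units. They differ only in which units each split receives. The sketch below models the two side by side.

The EXACT_NUM_SPLITS model is inferred from the assignmentSummary printouts in this section, where each split takes 4096 consecutive units. It is not the notebook's own assignSplits code. SplitSchemes, Piece, exactSplit, and slicedSplit are hypothetical names. Where the notebook prints null for a whole partition, the model shows sub_start 0 and a sub_count equal to sub_mod.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of the EXACT_NUM_SPLITS and partition-slices assignments. */
public class SplitSchemes {
    static final int PARTITIONS = 4096;

    // subCount subpartitions (of subMod, starting at subStart) in each of ptnCount partitions from ptnStart.
    record Piece(int ptnStart, int ptnCount, int subStart, int subCount, int subMod) {}

    // EXACT_NUM_SPLITS model, for 0 <= i < reqSplits: split i takes units [i*4096, (i+1)*4096),
    // walking partitions in order and subpartitions within each partition.
    static List<Piece> exactSplit(int i, int reqSplits) {
        List<Piece> pieces = new ArrayList<>();
        long unit = (long) i * PARTITIONS;
        long end = unit + PARTITIONS;
        while (unit < end) {
            int ptn = (int) (unit / reqSplits);
            int sub = (int) (unit % reqSplits);
            if (sub == 0 && end - unit >= reqSplits) {
                // one or more whole partitions
                int whole = (int) ((end - unit) / reqSplits);
                pieces.add(new Piece(ptn, whole, 0, reqSplits, reqSplits));
                unit += (long) whole * reqSplits;
            } else {
                // part of a single partition
                int count = (int) Math.min(reqSplits - sub, end - unit);
                pieces.add(new Piece(ptn, 1, sub, count, reqSplits));
                unit += count;
            }
        }
        return pieces;
    }

    // Partition-slices model: split i takes subpartition i (of reqSplits) in every partition.
    static List<Piece> slicedSplit(int i, int reqSplits) {
        return List.of(new Piece(0, PARTITIONS, i, 1, reqSplits));
    }

    // Total (partition, subpartition) units covered by a list of pieces.
    static long units(List<Piece> pieces) {
        long n = 0;
        for (Piece p : pieces) {
            n += (long) p.ptnCount() * p.subCount();
        }
        return n;
    }

    public static void main(String[] args) {
        int reqSplits = 1000;
        for (int i : new int[] {0, 1, 500, 998, 999}) {
            System.out.println("EXACT split " + i + ": " + exactSplit(i, reqSplits));
        }
        System.out.println("SLICES split 1: " + slicedSplit(1, reqSplits));
        System.out.println("units per split: " + units(exactSplit(1, reqSplits))
                + " vs " + units(slicedSplit(1, reqSplits)));
    }
}
```

Both schemes give every split the same 4096 units of work. The difference is locality: an EXACT_NUM_SPLITS split covers a contiguous run of partitions, while a slices split touches all of them.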

The following cell defines the function for such a split assignment.

SplitAssignment[] assignExactSplitsSliced(int reqSplits) {
    SplitAssignment[] assignments = new SplitAssignment[reqSplits];
    for (int i=0; i < reqSplits; i++) {
        QueryPartition qp = new QueryPartition(0, 4096, reqSplits, i, 1); // one slice from all 4096 partitions
        assignments[i] = new SplitAssignment(qp);
    }
    return assignments;
}

Let us now compare the new scheme with the EXACT_NUM_SPLITS scheme defined earlier. Below, we run the two schemes with different numbers of splits and chunk sizes.

You will notice that with a smaller chunk size and finer slices, the performance of the partition-slices scheme deteriorates. Even with unchunked retrieval, it is slower than the vertical (EXACT_NUM_SPLITS) assignment.
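One plausible contributor to the gap is the number of partitions each scheme must visit. This assumes that selecting a slice requires the server to walk the whole partition's index, and this model counts only partition visits, nothing measured. Under the EXACT_NUM_SPLITS model inferred from the summaries above, each split touches only the partitions in its contiguous range. A partition-slices split touches all 4096. PartitionVisits is a hypothetical helper.

```java
/** Hypothetical cost model: total partition visits across all splits for each scheme. */
public class PartitionVisits {
    static final int PARTITIONS = 4096;

    // EXACT_NUM_SPLITS model: split i covers units [i*4096, (i+1)*4096), with reqSplits units
    // per partition; add up the distinct partitions each split's range falls into.
    static long exactVisits(int reqSplits) {
        long total = 0;
        for (int i = 0; i < reqSplits; i++) {
            long first = (long) i * PARTITIONS;
            long last = first + PARTITIONS - 1;
            total += last / reqSplits - first / reqSplits + 1;
        }
        return total;
    }

    // Partition-slices model: every split visits one slice of each of the 4096 partitions.
    static long slicedVisits(int reqSplits) {
        return (long) reqSplits * PARTITIONS;
    }

    public static void main(String[] args) {
        for (int reqSplits : new int[] {10, 100}) {
            System.out.printf("%3d splits: EXACT_NUM_SPLITS %d partition visits, PARTITION_SLICES %d%n",
                    reqSplits, exactVisits(reqSplits), slicedVisits(reqSplits));
        }
    }
}
```

With 100 splits, the model gives 4192 visits for EXACT_NUM_SPLITS versus 409600 for partition slices, nearly 100 times as many. This is in line with the slices scheme slowing down as the number of splits grows.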

You can also customize the test framework to suit your needs.

int numWorkers = 10;

ChunkSize = 1000;
System.out.format("Chunk size: %d\n", ChunkSize);

System.out.format("Requested splits: %d, type: %s\n", 10, "EXACT_NUM_SPLITS");
SplitAssignment[] splitAssignments = assignSplits(10, EXACT_NUM_SPLITS);
assignmentSummary("EXACT_NUM_SPLITS", 10, splitAssignments);
processSplits(splitAssignments, numWorkers);
System.out.println();

System.out.format("Requested splits: %d, type: %s\n", 10, "PARTITION_SLICES");
SplitAssignment[] splitAssignments = assignExactSplitsSliced(10);
assignmentSummary("PARTITION_SLICES", 10, splitAssignments);
processSplits(splitAssignments, numWorkers);
System.out.println();

System.out.format("Requested splits: %d, type: %s\n", 100, "EXACT_NUM_SPLITS");
SplitAssignment[] splitAssignments = assignSplits(100, EXACT_NUM_SPLITS);
//assignmentSummary("EXACT_NUM_SPLITS", reqSplits, splitAssignments);
processSplits(splitAssignments, numWorkers);
System.out.println();

System.out.format("Requested splits: %d, type: %s\n", 100, "PARTITION_SLICES");
SplitAssignment[] splitAssignments = assignExactSplitsSliced(100);
assignmentSummary("PARTITION_SLICES", 100, splitAssignments);
processSplits(splitAssignments, numWorkers);
System.out.println();

// with unlimited chunk size
System.out.println("Chunk size: 1000000");
ChunkSize = 1000000;

System.out.format("Requested splits: %d, type: %s\n", 10, "EXACT_NUM_SPLITS");
SplitAssignment[] splitAssignments = assignSplits(10, EXACT_NUM_SPLITS);
//assignmentSummary("EXACT_NUM_SPLITS", reqSplits, splitAssignments);
processSplits(splitAssignments, numWorkers);
System.out.println();

System.out.format("Requested splits: %d, type: %s\n", 10, "PARTITION_SLICES");
SplitAssignment[] splitAssignments = assignExactSplitsSliced(10);
//assignmentSummary("PARTITION_SLICES", reqSplits, splitAssignments);
processSplits(splitAssignments, numWorkers);
System.out.println();

System.out.format("Requested splits: %d, type: %s\n", 100, "EXACT_NUM_SPLITS");
SplitAssignment[] splitAssignments = assignSplits(100, EXACT_NUM_SPLITS);
//assignmentSummary("EXACT_NUM_SPLITS", reqSplits, splitAssignments);
processSplits(splitAssignments, numWorkers);
System.out.println();

System.out.format("Requested splits: %d, type: %s\n", 100, "PARTITION_SLICES");
SplitAssignment[] splitAssignments = assignExactSplitsSliced(100);
//assignmentSummary("PARTITION_SLICES", reqSplits, splitAssignments);
processSplits(splitAssignments, numWorkers);
System.out.println();

// reset
ChunkSize = 20;

Output:

Chunk size: 200
Requested splits: 10, type: EXACT_NUM_SPLITS
Split type EXACT_NUM_SPLITS, requested 100, assigned 10 (ends and middle):
Split 0:
ptn_start: 0, ptn_count: 409, sub_start: null, sub_count: null, sub_mod: null
ptn_start: 409, ptn_count: 1, sub_start: 0, sub_count: 6, sub_mod: 10
Split 1:
ptn_start: 409, ptn_count: 1, sub_start: 6, sub_count: 4, sub_mod: 10
ptn_start: 410, ptn_count: 409, sub_start: null, sub_count: null, sub_mod: null
ptn_start: 819, ptn_count: 1, sub_start: 0, sub_count: 2, sub_mod: 10
Split 5:
ptn_start: 2048, ptn_count: 409, sub_start: null, sub_count: null, sub_mod: null
ptn_start: 2457, ptn_count: 1, sub_start: 0, sub_count: 6, sub_mod: 10
Split 8:
ptn_start: 3276, ptn_count: 1, sub_start: 8, sub_count: 2, sub_mod: 10
ptn_start: 3277, ptn_count: 409, sub_start: null, sub_count: null, sub_mod: null
ptn_start: 3686, ptn_count: 1, sub_start: 0, sub_count: 4, sub_mod: 10
Split 9:
ptn_start: 3686, ptn_count: 1, sub_start: 4, sub_count: 6, sub_mod: 10
ptn_start: 3687, ptn_count: 409, sub_start: null, sub_count: null, sub_mod: null
10 worker threads.
Record count is 100000, sum is 5000050000.
That took 0 seconds.

Requested splits: 10, type: PARTITION_SLICES
10 worker threads.
Record count is 100000, sum is 5000050000.
That took 13 seconds.

Requested splits: 100, type: EXACT_NUM_SPLITS
10 worker threads.
Record count is 100000, sum is 5000050000.
That took 0 seconds.

Requested splits: 100, type: PARTITION_SLICES
10 worker threads.
Record count is 100000, sum is 5000050000.
That took 51 seconds.

Chunk size: 1000000
Requested splits: 10, type: EXACT_NUM_SPLITS
10 worker threads.
Record count is 100000, sum is 5000050000.
That took 0 seconds.

Requested splits: 10, type: PARTITION_SLICES
10 worker threads.
Record count is 100000, sum is 5000050000.
That took 3 seconds.

Requested splits: 100, type: EXACT_NUM_SPLITS
10 worker threads.
Record count is 100000, sum is 5000050000.
That took 0 seconds.

Requested splits: 100, type: PARTITION_SLICES
10 worker threads.
Record count is 100000, sum is 5000050000.
That took 35 seconds.

Select and run with your own parameters

Experiment with the values in the following cell and see what results you get.

ProcessingMode = PROCESSING_MODE_SYNC;
QueryType = PRIMARY_INDEX_QUERY;
QueryFilter = QUERY_FILTER_NONE;
ChunkSize = 20;

// the following filter expression chooses records with even values of bin1 - CAN CHANGE TO A DIFFERENT EXPRESSION
// used when QueryFilter = QUERY_FILTER_INCLUDE
IncludeQueryFilterExp = Exp.eq(Exp.mod(Exp.intBin("bin1"), Exp.val(2)), Exp.val(0));

// the following query predicate is a range filter for records with 50001 <= bin1 <= 100000 - CAN CHANGE TO A DIFFERENT QUERY PREDICATE
// used when QueryType = SECONDARY_INDEX_QUERY
SecondaryIndexQueryPredicate = Filter.range("bin1", 50001, 100000);

int reqSplits = 1000;
int SplitType = EXACT_NUM_SPLITS;
int numWorkers = 100;

System.out.format("Running with parameter values (specify)\n");
SplitAssignment[] splitAssignments = assignSplits(reqSplits, SplitType);
assignmentSummary("EXACT_NUM_SPLITS", reqSplits, splitAssignments); // update the label if you change SplitType
processSplits(splitAssignments, numWorkers);
workerSummary();


Output:

Running with parameter values (specify)
Split type EXACT_NUM_SPLITS, requested 1000, assigned 1000 (ends and middle):
Split 0:
ptn_start: 0, ptn_count: 4, sub_start: null, sub_count: null, sub_mod: null
ptn_start: 4, ptn_count: 1, sub_start: 0, sub_count: 96, sub_mod: 1000
Split 1:
ptn_start: 4, ptn_count: 1, sub_start: 96, sub_count: 904, sub_mod: 1000
ptn_start: 5, ptn_count: 3, sub_start: null, sub_count: null, sub_mod: null
ptn_start: 8, ptn_count: 1, sub_start: 0, sub_count: 192, sub_mod: 1000
Split 500:
ptn_start: 2048, ptn_count: 4, sub_start: null, sub_count: null, sub_mod: null
ptn_start: 2052, ptn_count: 1, sub_start: 0, sub_count: 96, sub_mod: 1000
Split 998:
ptn_start: 4087, ptn_count: 1, sub_start: 808, sub_count: 192, sub_mod: 1000
ptn_start: 4088, ptn_count: 3, sub_start: null, sub_count: null, sub_mod: null
ptn_start: 4091, ptn_count: 1, sub_start: 0, sub_count: 904, sub_mod: 1000
Split 999:
ptn_start: 4091, ptn_count: 1, sub_start: 904, sub_count: 96, sub_mod: 1000
ptn_start: 4092, ptn_count: 4, sub_start: null, sub_count: null, sub_mod: null
100 worker threads.
Record count is 100000, sum is 5000050000.
That took 2 seconds.
Records by worker (~50 worker sample): 0:3319 2:789 4:786 6:699 8:695 10:666 12:801 14:781 16:1194 18:899 20:828 22:765 24:950 26:958 28:1009 30:865 32:530 34:1243 36:1085 38:1097 40:1188 42:1353 44:1620 46:2031 48:2511 50:2534 52:2942 54:2865 56:621 58:571 60:673 62:606 64:594 66:705 68:663 70:588 72:619 74:668 76:598 78:1145 80:617 82:706 84:685 86:603 88:697 90:701 92:663 94:697 96:742 98:497

Takeaways and Conclusion

The ability to retrieve a large data set in parallel streams for processing by a large number of workers is essential to high-throughput applications. Aerospike provides mechanisms to divide a large data set over an arbitrary number of splits and to process each split one chunk at a time. The notebook implements multiple schemes for split assignment and provides a framework for processing a query over splits in parallel. It also demonstrates parallel processing with varying numbers of splits and workers, query and filter types, chunk sizes, and processing modes.

Further Exploration and Resources

Here are some links for further exploration

Resources