
How to develop custom code to stream Aerospike Change Notifications to an HTTP-based system

Kiran Matty


The Aerospike database, along with the Aerospike Connect family of connectors, enables you to build low-latency, high-throughput streaming pipelines. Last year, we released the Message Transformer for the Kafka and Pulsar inbound connectors, which gives you an easy way to develop custom code that processes events before they are written to Aerospike. We have now extended that capability to our outbound connectors, where it operates on Aerospike Change Notification records that are received by a target system over XDR. Unlike Change Data Capture (CDC), which reports every change made to the data in the database, Change Notification (CN) reports the latest version of a changed record within a pre-configured interval (0–5000 ms). In this way, multiple writes to a single record in an Aerospike cluster generate only one change event, which is essential for minimizing overhead when dealing with “hot” data.
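To make the difference from CDC concrete, here is a toy model of coalescing. It is only a sketch of the idea and not how XDR is implemented; the class ChangeCoalescer and its methods are hypothetical names invented for this illustration.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model only: illustrates coalescing, not the actual XDR mechanism.
final class ChangeCoalescer {
    // Latest pending value per record key within the current interval.
    private final Map<String, String> pending = new LinkedHashMap<>();

    // A later write to the same key replaces the earlier pending value.
    void write(String key, String value) {
        pending.put(key, value);
    }

    // Called once per interval: emits one event per changed key, then resets.
    List<String> flush() {
        List<String> events = new ArrayList<>();
        for (Map.Entry<String, String> e : pending.entrySet()) {
            events.add(e.getKey() + "=" + e.getValue());
        }
        pending.clear();
        return events;
    }
}
```

In this model, two writes to the same key followed by one flush() yield a single event carrying the second value, whereas a CDC-style log would yield two events.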

With the outbound message transformer, you can route an incoming record to a target system, transform it (that is, add bins, modify bin values, delete bins, modify the key, or modify record metadata), and/or format it into a custom format for that destination. The transformer is a custom piece of code developed using the Java SDK and bundled as a JAR file. It runs in the JVM instance of the underlying connector for Kafka, Pulsar, JMS, or Event Stream Processing (ESP), and is made available to the connector via the classpath, along with associated parameters that are specified in the configuration file.

Figure 1 shows how you can develop your own Aerospike connector for any HTTP-based target system, for example Elasticsearch, Splunk, or any web service, using the Aerospike ESP connector and the Outbound Message Transformer. The custom code (shown later in this blog) that you would have to develop ingests the ESP message and transforms it into a REST API call for the target system, depending on the type of message that was received over XDR, that is, a write or a delete.
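The write-versus-delete mapping could look roughly like the sketch below. It uses only the JDK's java.net.http API (Java 11+) and does not touch the Aerospike SDK. The Operation enum, the /records/{key} path, the choice of PUT for writes, and the JSON body are all assumptions made for illustration; your target system's API will dictate the real shape.

```java
import java.net.URI;
import java.net.URLEncoder;
import java.net.http.HttpRequest;
import java.nio.charset.StandardCharsets;

final class RestRequestMapper {
    // Hypothetical operation type; stands in for whatever the connector reports.
    enum Operation { WRITE, DELETE }

    // Builds the HTTP request for one change notification.
    // The /records/{key} path and the JSON body are illustrative assumptions.
    static HttpRequest toRequest(String baseUrl, Operation op, String key, String jsonBody) {
        // Form-style encoding; adequate for simple keys in this sketch.
        String encodedKey = URLEncoder.encode(key, StandardCharsets.UTF_8);
        HttpRequest.Builder builder =
                HttpRequest.newBuilder(URI.create(baseUrl + "/records/" + encodedKey));
        switch (op) {
            case WRITE:
                // A write becomes an upsert of the record's JSON form.
                return builder.header("Content-Type", "application/json")
                        .PUT(HttpRequest.BodyPublishers.ofString(jsonBody))
                        .build();
            case DELETE:
                // A delete removes the corresponding resource.
                return builder.DELETE().build();
            default:
                throw new IllegalArgumentException("Unsupported operation: " + op);
        }
    }
}
```

The sketch only builds the request. Sending it, along with retries and error handling, is left out to keep the mapping itself in focus.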

Figure 1: High Level Architecture

What it is not:

It is not a distributed data stream processing engine like Spark, Flink, or Kafka Streams, so exercise caution when writing custom code that does heavyweight event processing. However, your custom code can be scaled easily by scaling the underlying connector. See scaling connectors for more information.

What it can be used for:

Here are a few ideas to spark your imagination:

  1. Use the ESP connector to export a subset of Aerospike Change Notification messages, or partial messages, for analysis in Splunk using its HTTP Event Collector, or in any other analytics system that supports an HTTP endpoint for ingesting data.
  2. Meet the compliance needs of your data lake. You can either filter out Aerospike Change Notifications that have sensitive bins such as SSN, or de-identify/mask them as needed using an external Key Management Server (KMS) before ingesting the records into your data lake. Additionally, the Aerospike server offers Expression-based XDR Filtering to filter records at the source itself to address this use case.
  3. Use Aerospike as an online Feature Store. As data that is ingested into your ML platform gets featurized and inserted into the feature store, it could be served up to the downstream ML model by using the ESP connector and the outbound transformer for any preprocessing that is required.
  4. Convert Aerospike records into columnar file formats such as Parquet, ORC, Arrow, etc. to make them available to ML platforms and analytical systems that are optimized to work with columnar data. You could potentially use the ESP connector along with the outbound message transformer to accomplish this.
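As an illustration of the compliance idea in item 2, the following sketch drops or redacts bins in a plain map. It deliberately uses simple redaction instead of a KMS, and the BinMasker class, its method names, and the "keep the last four characters" rule are assumptions for this example only, not part of any Aerospike API.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

final class BinMasker {
    // Returns a copy of the bins with "drop" bins removed and "mask" bins redacted.
    static Map<String, Object> sanitize(Map<String, Object> bins,
                                        Set<String> drop, Set<String> mask) {
        Map<String, Object> out = new LinkedHashMap<>();
        for (Map.Entry<String, Object> e : bins.entrySet()) {
            String name = e.getKey();
            if (drop.contains(name)) {
                continue; // sensitive bin is filtered out entirely
            }
            Object value = e.getValue();
            out.put(name, mask.contains(name) ? maskValue(String.valueOf(value)) : value);
        }
        return out;
    }

    // Keeps the last four characters; values of four characters or fewer are fully masked.
    static String maskValue(String value) {
        if (value.length() <= 4) {
            return "*".repeat(value.length());
        }
        return "*".repeat(value.length() - 4) + value.substring(value.length() - 4);
    }
}
```

For instance, sanitizing a record with bins name, ssn, and card, with card in the drop set and ssn in the mask set, leaves name untouched, removes card, and turns an SSN such as 123-45-6789 into *******6789.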

What else you should care about:

The transformer is invoked in parallel for incoming records. Although the API is thread-safe, the custom code running in it must ensure its own thread safety when accessing non-local variables.
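One way to keep shared state safe under parallel invocation is to rely on the JDK's concurrent collections, as in the minimal sketch below. PerSetCounter is a hypothetical helper, not part of the SDK; it assumes your transformer wants to count records per set name across threads.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.LongAdder;

// Hypothetical shared state that a transformer instance might hold.
final class PerSetCounter {
    private final ConcurrentHashMap<String, LongAdder> counts = new ConcurrentHashMap<>();

    // Safe to call from many threads at once: computeIfAbsent is atomic,
    // and LongAdder handles concurrent increments without locking.
    void record(String setName) {
        counts.computeIfAbsent(setName, k -> new LongAdder()).increment();
    }

    // Returns the number of records seen so far for the given set.
    long count(String setName) {
        LongAdder adder = counts.get(setName);
        return adder == null ? 0L : adder.sum();
    }
}
```

A plain HashMap with Long values in the same role could lose updates or corrupt its internal structure when several records are transformed at the same time.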

How to write an outbound message transformer using the Java SDK?

The SDK has been published to Maven Central. Add it to your project’s dependencies using your preferred tool. A Maven example would be (use the appropriate version):

<dependency>
    <groupId>com.aerospike</groupId>
    <artifactId>aerospike-connect-outbound-sdk</artifactId>
    <version>1.0.0</version>
    <scope>provided</scope>
</dependency>

The routing transform config is part of the routing config, and the format transform config is part of the format config. We have also added a new “custom-transformer” config section for the custom transformer. Figure 2 shows a sample message transformer configuration:

Figure 2 — Sample message transformer configuration

Figure 3 shows the sample code for a custom transformer for Apache Pulsar that updates the generation metadata field of the incoming Change Notification record and inserts the bins “color” and “shade” provided in the above config file. You can view the full code at this location.

Figure 3 — Sample message transformer code
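For readers who want the shape of that logic without the SDK on hand, here is a rough stand-in written against plain Java types. SketchRecord and SketchTransformer are hypothetical and are not the SDK's interfaces; incrementing the generation by one is also an assumption about what "updating" it means. Refer to the linked full code for the real API.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Stand-in for a change notification record; NOT the SDK's record type.
final class SketchRecord {
    final int generation;
    final Map<String, Object> bins;

    SketchRecord(int generation, Map<String, Object> bins) {
        this.generation = generation;
        this.bins = bins;
    }
}

// Stand-in transformer; the real one implements an interface from the SDK.
final class SketchTransformer {
    // Parameters as they might arrive from the connector's config file.
    private final Map<String, Object> params;

    SketchTransformer(Map<String, Object> params) {
        this.params = Map.copyOf(params); // immutable, so safe to share across threads
    }

    // Returns a new record with an updated generation and the configured extra bins.
    SketchRecord transform(SketchRecord in) {
        Map<String, Object> bins = new LinkedHashMap<>(in.bins);
        for (String name : new String[] {"color", "shade"}) {
            if (params.containsKey(name)) {
                bins.put(name, params.get(name));
            }
        }
        // Assumption: "updating" the generation means incrementing it by one.
        return new SketchRecord(in.generation + 1, bins);
    }
}
```

Because the transformer keeps only immutable state and returns a fresh record, it can be called from multiple threads without extra synchronization.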

How to deploy the message transformer?

Generate the JARs for the custom code plugin with all dependencies shaded to avoid classpath conflicts with the connectors. Copy all the JARs to the /opt/aerospike-<outbound-connector>/usr-lib directory before starting the outbound connector.

What’s next?

The outbound message transformer gives you control, via custom code, to easily integrate Aerospike with the broader big data ecosystem and enable your innovative use cases.

If this blog has inspired you to pursue your ideas, please download the Aerospike connector for your platform and the full-featured, single-node Aerospike Database Enterprise Edition, and follow the documentation for a seamless trial experience.

I’d like to thank my co-author Tejas Sampige, as well as Ashish Shinde,

, and Joe Martin for developing this capability.
