Skip to main content

Implementing Read-Write Transactions with R-M-W Pattern

For an interactive Jupyter notebook experience: Binder

This tutorial explains how to use the Read-Modify-Write pattern in order to ensure atomicity and isolation for read-write single-record transactions.

This notebook requires Aerospike database running on localhost and that python and the Aerospike python client have been installed (pip install aerospike). Visit Aerospike notebooks repo for additional details and the docker container.

Introduction

In Aerospike, the transactional boundaries are "single request, single record". While multiple operations may be specified in a single request on a single record, each such operation can involve a single bin and only certain write operations are allowed. Therefore, neither updates involving multiple bins (e.g, "a=a+b") nor general logic (e.g., "concatenate alternate letters and append") are possible as server-side operations. Of course, UDFs allow complex logic in a transactional update of a single record, however they are not suitable for all situations for various reasons such as performance and ease. Therefore most updates entail the R-M-W pattern or Reading the record, Modifying bins on the client side, and then Writing the record updates back to the server.

The tutorial first demonstrates how read-write operations can result in lost writes in a concurrent multi-client environment.

Then we show how to specify conditional writes with version check to address the problem by disallowing intereaved read-write and thus protecting against lost writes.

Prerequisites

This tutorial assumes familiarity with the following topics:

[Provide topics and links. For example:]

Initialization

Ensure database is running

This notebook requires that Aerospike database is running. [Include the right code cell for Java or Python from the two cells below.]

!asd >& /dev/null
!pgrep -x asd >/dev/null && echo "Aerospike database is running!" || echo "**Aerospike database is not running!**"

Output:

Aerospike database is running!

Connect to database.

# import the modules
import sys
import aerospike

# Configure the client
config = {
'hosts': [ ('127.0.0.1', 3000) ],
'policy' : {'key': aerospike.POLICY_KEY_SEND}
}

# Create a client and connect it to the cluster
try:
client = aerospike.client(config).connect()
except:
print("failed to connect to the cluster with", config['hosts'])
sys.exit(1)
print('Client successfully connected to the database.')

Output:

Client successfully connected to the database.

Populate database with test data.

We create one record with an integer bin "gen-times-2" (the names will become clear below), initialized to 1.

namespace = 'test'
tutorial_set = 'rmw-tutorial-set'
user_key = 'id-1'
# Records are addressable via a tuple of (namespace, set, user_key)
rec_key = (namespace, tutorial_set, user_key)
rmw_bin = 'gen-times-2'
try:
# Create the record
client.put(rec_key, {rmw_bin: 1})
except Exception as e:
print("error: {0}".format(e), file=sys.stderr)

print('Test data populated.')

Output:

Test data populated.

The Problem of Lost Writes

In a concurrent setting, multiple clients may be performaing Read-Modify-Write on the same record in a way that get in each other's way. Since various R-M-W transactions can interleave, a transaction can be lost, if another client updates the record without reading the transaction's update.

To demonstrate this, we make use of a record's "generation" or version, that is available as the record metadata, and is automatically incremented on each successful update of the record.

The integer bin “gen-times-2” holds the value that is 2 times the value of the current generation of the record. A client first reads the current generation of the record, and then updates the bin value 2 times that value.

In the case of a single client, there are no issues in maintaining the semantics of the bin. However when there are multiple clients, the interleaving of reads and writes of different transactions can violate the semantics. By updating the bin using an older generation value, it may not be 2 times the current generation, which is the constraint that we want to preserve.

First, we will show how transaction writes are lost in a simple concurrent case by observing whether the relationship between record's current generation and the bin value is maintained. Then we will show how the problem is solved using a conditional write with version check.

Test Framework

We spawn multiple (num_threads) threads to simulate concurrent access. Each thread repeatedly (num_txns) does the following:

  • waits for a random duration (with average of txn_wait_ms)
  • executes a passed-in R-M-W function that returns the failure type (string, null if success).

At the end the thread prints out the aggregate counts for each error type. In aggregate, they signify the likelihood of a read-write transaction failing.

import threading
import time
import random

num_txns = 10
txn_wait_ms = 500

def thread_fn(thread_id, rmw_fn):
random.seed(thread_id)
lost_writes_count = 0
failures = {}
for i in range(num_txns):
failure = rmw_fn()
if failure:
if not failure in failures:
failures[failure] = 1
else:
failures[failure] += 1
print('\tThead {0} failures: {1}'.format(thread_id, failures))
return


def run_test(num_threads, rmw_fn):
threads = list()
print('{0} threads, {1} transcations per thread:'.format(num_threads, num_txns))
for thread_index in range(num_threads):
thread = threading.Thread(target=thread_fn, args=(thread_index, rmw_fn))
threads.append(thread)
thread.start()
for thread in threads:
thread.join()
return

Simple RMW Function

Next we implement a simple RMW function simple_rmw_fn to pass into the above framework. The function:

  • Reads the record.
  • Computes new value of gen_times_2 (= 2 * read generation). Then waits for a random duration, with average of write_wait_ms average to simulate the application computation time between read and write.
  • Writes the new bin value. In the same (multi-op) request, reads back the record for the record's new generation value.
  • Returns "lost writes" if the updated value of gen_times_2/2 is smaller than the new gen. If they are the same, it returns None.
import aerospike_helpers.operations.operations as op_helpers

write_wait_ms = 50

def rmw_simple():
#read
_, meta, bins = client.get(rec_key)
# wait before write to simulate computation time
time.sleep(random.uniform(0,2*write_wait_ms/1000.0))
# modify
read_gen = meta['gen']
new_rmw_bin_value = 2*(read_gen+1)
# write and read back bin_inc to compare
ops = [op_helpers.write(rmw_bin, new_rmw_bin_value),
op_helpers.read(rmw_bin)]
try:
_, meta, bins = client.operate(rec_key, ops)
except Exception as e:
print("error: {0}".format(e), file=sys.stderr)
exit(-1)
# compare new_rmw_bin_value//2 and new gen; if different return 'lost writes'
new_gen = meta['gen']
if new_rmw_bin_value//2 != new_gen:
#print('gen: {0}, bin: {1}, lost: {2}'.format(new_gen, new_rmw_bin_value//2, new_gen-new_rmw_bin_value//2))
return 'lost writes'
return None

Test Results

For various values of concurrency (num_threads), we can see that with greater concurrent updates, a larger percentage of read-write transactions are lost, meaning greater likelihood of the semantics of the gen_times_2 bin not being preserved.

run_test(num_threads=1, rmw_fn=rmw_simple)
run_test(num_threads=2, rmw_fn=rmw_simple)
run_test(num_threads=3, rmw_fn=rmw_simple)
run_test(num_threads=4, rmw_fn=rmw_simple)

Output:

1 threads, 10 transcations per thread:
Thead 0 failures: {}
2 threads, 10 transcations per thread:
Thead 0 failures: {'lost writes': 5}
Thead 1 failures: {'lost writes': 6}
3 threads, 10 transcations per thread:
Thead 0 failures: {'lost writes': 4}
Thead 1 failures: {'lost writes': 8}
Thead 2 failures: {'lost writes': 7}
4 threads, 10 transcations per thread:
Thead 0 failures: {'lost writes': 9}
Thead 3 failures: {'lost writes': 8}
Thead 1 failures: {'lost writes': 8}
Thead 2 failures: {'lost writes': 8}

Using Generation Check

To solve the problem of lost writes, the simple R-M-W is modified with how the Write is done: by making it conditional on the record not having been modified since the Read. It is a "check-and-set (CAS)" like operation that succeeds if the record generation (version) is still the same as at the time of Read. Otherwise it fails, and the client must retry the whole R-M-W pattern. The syntax and usage is shown in the code below.

RMW Function with Version Check and Retries

In the rmw_with_gen_check function below, a failed read-write due to generation mismatch is retried for max_retries attempts or until the write is successful. Each retry is attempted after a exponential backoff wait of (retry_number ** 2) * retry_wait_ms.

A write can still fail after max_retries attempts, and the client can suitably handle it. However no writes are overwritten or lost, and the intended semantics of the gen-times-2 bin is always preserved.

We perform the same concurrent test with the version check at Write. We expect no interleaved_writes reported in any thread.

from aerospike_helpers.operations import operations as op_helpers
from aerospike import exception as ex

max_retries = 3
retry_wait_ms = 20

def rmw_with_gen_check():
retryRMWCount = 0
done = False
while (not done):
#read
_, meta, bins = client.get(rec_key)
# wait before write to simulate computation time
time.sleep(random.uniform(0,2*write_wait_ms/1000.0))
# modify
read_gen = meta['gen']
new_rmw_bin_value = 2*(read_gen+1)
# write and read back bin_inc to compare
ops = [op_helpers.write(rmw_bin, new_rmw_bin_value),
op_helpers.read(rmw_bin)]
write_policy = { 'gen': aerospike.POLICY_GEN_EQ }
try:
_, meta, bins = client.operate(rec_key, ops, meta={'gen': read_gen}, policy=write_policy)
except ex.RecordGenerationError as e:
if retryRMWCount < max_retries:
retryRMWCount += 1
time.sleep((2**retryRMWCount)*retry_wait_ms/1000.0)
else:
return 'max retries exceeded'
except Exception as e:
print("error: {0}".format(e), file=sys.stderr)
exit(-1)
else:
done = True
# compare new_rmw_bin_value//2 and new gen; if different
new_gen = meta['gen']
if new_rmw_bin_value//2 != new_gen:
return 'lost writes'
return None

Test Results

Let's execute for various levels of concurrency and see the results. We expect to see no lost writes. Even when max-retries are exceeded, transaction and database integrity is preserved.

run_test(num_threads=2, rmw_fn=rmw_with_gen_check)
run_test(num_threads=3, rmw_fn=rmw_with_gen_check)
run_test(num_threads=4, rmw_fn=rmw_with_gen_check)

Output:

2 threads, 10 transcations per thread:
Thead 1 failures: {}
Thead 0 failures: {}
3 threads, 10 transcations per thread:
Thead 1 failures: {}
Thead 0 failures: {}
Thead 2 failures: {}
4 threads, 10 transcations per thread:
Thead 0 failures: {}
Thead 3 failures: {'max retries exceeded': 1}
Thead 2 failures: {'max retries exceeded': 1}
Thead 1 failures: {'max retries exceeded': 2}

Takeaways

In the tutorial we showed:

  • the need for read-write transactions in Aerospike to use the R-M-W pattern
  • how writes can be overwritten and lost in a concurrent environment if performed simply
  • how the developer can ensure atomicity and isolation of a read-write transaction by using version check logic and syntax.

Clean up

Remove data and close connection.

client.truncate(namespace, tutorial_set, 0)
# Close the connection to the Aerospike cluster
client.close()
print('Removed tutorial data. Connection closed.')

Output:

Removed tutorial data. Connection closed.

Further Exploration and Resources

For further exploration of transactions support in Aerospike, check out the following resources:

Next steps

Visit Aerospike notebooks repo to run additional Aerospike notebooks. To run a different notebook, download the notebook from the repo to your local machine, and then click on File > Open, and select Upload.