Skip to main content

Implementing Read-Write Transactions with R-M-W Pattern

For an interactive Jupyter notebook experience: Launch in Binder#

This tutorial explains how to use the Read-Modify-Write pattern in order to ensure atomicity and isolation for read-write single-record transactions.

This notebook requires Aerospike datbase running on localhost and that python and the Aerospike python client have been installed (pip install aerospike). Visit Aerospike notebooks repo for additional details and the docker container.

Introduction#

In Aerospike, the transactional boundaries are "single request, single record". While multiple operations may be specified in a single request on a single record, each such operation can involve a single bin and only certain write operations are allowed. Therefore, neither updates involving multiple bins (e.g, "a=a+b") nor general logic (e.g., "concatenate alternate letters and append") are possible as server-side operations. Of course, UDFs allow complex logic in a transactional update of a single record, however they are not suitable for all situations for various reasons such as performance and ease. Therefore most updates entail the R-M-W pattern or Reading the record, Modifying bins on the client side, and then Writing the record updates back to the server.

The tutorial first demonstrates how read-write operations can result in lost writes in a concurrent multi-client environment.

Then we show how to specify conditional writes with version check to address the problem by disallowing intereaved read-write and thus protecting against lost writes.

Prerequisites#

This tutorial assumes familiarity with the following topics:

[Provide topics and links. For example:]

Initialization#

Ensure database is running#

This notebook requires that Aerospike datbase is running. [Include the right code cell for Java or Python from the two cells below.]

!asd >& /dev/null!pgrep -x asd >/dev/null && echo "Aerospike database is running!" || echo "**Aerospike database is not running!**"

Output:

Aerospike database is running!

Connect to database.#

# import the modulesimport sysimport aerospike
# Configure the clientconfig = {  'hosts': [ ('127.0.0.1', 3000) ],  'policy' : {'key': aerospike.POLICY_KEY_SEND}}
# Create a client and connect it to the clustertry:  client = aerospike.client(config).connect()except:  print("failed to connect to the cluster with", config['hosts'])  sys.exit(1)print('Client successfully connected to the database.')

Output:

Client successfully connected to the database.

Populate database with test data.#

We create one record with an integer bin "gen-times-2" (the names will become clear below), initialized to 1.

namespace = 'test'tutorial_set = 'rmw-tutorial-set'user_key = 'id-1'# Records are addressable via a tuple of (namespace, set, user_key)rec_key = (namespace, tutorial_set, user_key)rmw_bin = 'gen-times-2'try:  # Create the record  client.put(rec_key, {rmw_bin: 1})except Exception as e:  print("error: {0}".format(e), file=sys.stderr)
print('Test data populated.')

Output:

Test data populated.

The Problem of Lost Writes#

In a concurrent setting, multiple clients may be performaing Read-Modify-Write on the same record in a way that get in each other's way. Since various R-M-W transactions can interleave, a transaction can be lost, if another client updates the record without reading the transaction's update.

To demonstrate this, we make use of a record's "generation" or version, that is available as the record metadata, and is automatically incremented on each successful update of the record.

The integer bin “gen-times-2” holds the value that is 2 times the value of the current generation of the record. A client first reads the current generation of the record, and then updates the bin value 2 times that value.

In the case of a single client, there are no issues in maintaining the semantics of the bin. However when there are multiple clients, the interleaving of reads and writes of different transactions can violate the semantics. By updating the bin using an older generation value, it may not be 2 times the current generation, which is the constraint that we want to preserve.

First, we will show how transaction writes are lost in a simple concurrent case by observing whether the relationship between record's current generation and the bin value is maintained. Then we will show how the problem is solved using a conditional write with version check.

Test Framework#

We spawn multiple (num_threads) threads to simulate concurrent access. Each thread repeatedly (num_txns) does the following:

  • waits for a random duration (with average of txn_wait_ms)
  • executes a passed-in R-M-W function that returns the failure type (string, null if success).

At the end the thread prints out the aggregate counts for each error type. In aggregate, they signify the likelihood of a read-write transaction failing.

import threadingimport timeimport random
num_txns = 10txn_wait_ms = 500
def thread_fn(thread_id, rmw_fn):    random.seed(thread_id)    lost_writes_count = 0    failures = {}    for i in range(num_txns):        failure = rmw_fn()        if failure:            if not failure in failures:                failures[failure] = 1            else:                 failures[failure] += 1      print('\tThead {0} failures: {1}'.format(thread_id, failures))    return           def run_test(num_threads, rmw_fn):    threads = list()    print('{0} threads, {1} transcations per thread:'.format(num_threads, num_txns))    for thread_index in range(num_threads):        thread = threading.Thread(target=thread_fn, args=(thread_index, rmw_fn))        threads.append(thread)        thread.start()    for thread in threads:        thread.join()    return

Simple RMW Function#

Next we implement a simple RMW function simple_rmw_fn to pass into the above framework. The function:

  • Reads the record.
  • Computes new value of gen_times_2 (= 2 * read generation). Then waits for a random duration, with average of write_wait_ms average to simulate the application computation time between read and write.
  • Writes the new bin value. In the same (multi-op) request, reads back the record for the record's new generation value.
  • Returns "lost writes" if the updated value of gen_times_2/2 is smaller than the new gen. If they are the same, it returns None.
import aerospike_helpers.operations.operations as op_helpers
write_wait_ms = 50
def rmw_simple():    #read    _, meta, bins = client.get(rec_key)    # wait before write to simulate computation time    time.sleep(random.uniform(0,2*write_wait_ms/1000.0))    # modify     read_gen = meta['gen']    new_rmw_bin_value = 2*(read_gen+1)    # write and read back bin_inc to compare    ops = [op_helpers.write(rmw_bin, new_rmw_bin_value),          op_helpers.read(rmw_bin)]    try:        _, meta, bins = client.operate(rec_key, ops)    except Exception as e:        print("error: {0}".format(e), file=sys.stderr)        exit(-1)    # compare new_rmw_bin_value//2 and new gen; if different return 'lost writes'    new_gen = meta['gen']    if new_rmw_bin_value//2 != new_gen:          #print('gen: {0}, bin: {1}, lost: {2}'.format(new_gen, new_rmw_bin_value//2, new_gen-new_rmw_bin_value//2))        return 'lost writes'    return None

Test Results#

For various values of concurrency (num_threads), we can see that with greater concurrent updates, a larger percentage of read-write transactions are lost, meaning greater likelihood of the semantics of the gen_times_2 bin not being preserved.

run_test(num_threads=1, rmw_fn=rmw_simple)run_test(num_threads=2, rmw_fn=rmw_simple)run_test(num_threads=3, rmw_fn=rmw_simple)run_test(num_threads=4, rmw_fn=rmw_simple)

Output:

1 threads, 10 transcations per thread:    Thead 0 failures: {}2 threads, 10 transcations per thread:    Thead 0 failures: {'lost writes': 5}    Thead 1 failures: {'lost writes': 6}3 threads, 10 transcations per thread:    Thead 0 failures: {'lost writes': 4}    Thead 1 failures: {'lost writes': 8}    Thead 2 failures: {'lost writes': 7}4 threads, 10 transcations per thread:    Thead 0 failures: {'lost writes': 9}    Thead 3 failures: {'lost writes': 8}    Thead 1 failures: {'lost writes': 8}    Thead 2 failures: {'lost writes': 8}

Using Generation Check#

To solve the problem of lost writes, the simple R-M-W is modified with how the Write is done: by making it conditional on the record not having been modified since the Read. It is a "check-and-set (CAS)" like operation that succeeds if the record generation (version) is still the same as at the time of Read. Otherwise it fails, and the client must retry the whole R-M-W pattern. The syntax and usage is shown in the code below.

RMW Function with Version Check and Retries#

In the rmw_with_gen_check function below, a failed read-write due to generation mismatch is retried for max_retries attempts or until the write is successful. Each retry is attempted after a exponential backoff wait of (retry_number ** 2) * retry_wait_ms.

A write can still fail after max_retries attempts, and the client can suitably handle it. However no writes are overwritten or lost, and the intended semantics of the gen-times-2 bin is always preserved.

We perform the same concurrent test with the version check at Write. We expect no interleaved_writes reported in any thread.

from aerospike_helpers.operations import operations as op_helpersfrom aerospike import exception as ex
max_retries = 3retry_wait_ms = 20
def rmw_with_gen_check():    retryRMWCount = 0    done = False    while (not done):        #read        _, meta, bins = client.get(rec_key)        # wait before write to simulate computation time        time.sleep(random.uniform(0,2*write_wait_ms/1000.0))        # modify         read_gen = meta['gen']        new_rmw_bin_value = 2*(read_gen+1)        # write and read back bin_inc to compare        ops = [op_helpers.write(rmw_bin, new_rmw_bin_value),              op_helpers.read(rmw_bin)]        write_policy = { 'gen': aerospike.POLICY_GEN_EQ }        try:            _, meta, bins = client.operate(rec_key, ops, meta={'gen': read_gen}, policy=write_policy)        except ex.RecordGenerationError as e:            if retryRMWCount < max_retries:                retryRMWCount += 1                time.sleep((2**retryRMWCount)*retry_wait_ms/1000.0)                   else:                return 'max retries exceeded'         except Exception as e:            print("error: {0}".format(e), file=sys.stderr)            exit(-1)        else:            done = True            # compare new_rmw_bin_value//2 and new gen; if different     new_gen = meta['gen']    if new_rmw_bin_value//2 != new_gen:          return 'lost writes'    return None

Test Results#

Let's execute for various levels of concurrency and see the results. We expect to see no lost writes. Even when max-retries are exceeded, transaction and database integrity is preserved.

run_test(num_threads=2, rmw_fn=rmw_with_gen_check)run_test(num_threads=3, rmw_fn=rmw_with_gen_check)run_test(num_threads=4, rmw_fn=rmw_with_gen_check)

Output:

2 threads, 10 transcations per thread:    Thead 1 failures: {}    Thead 0 failures: {}3 threads, 10 transcations per thread:    Thead 1 failures: {}    Thead 0 failures: {}    Thead 2 failures: {}4 threads, 10 transcations per thread:    Thead 0 failures: {}    Thead 3 failures: {'max retries exceeded': 1}    Thead 2 failures: {'max retries exceeded': 1}    Thead 1 failures: {'max retries exceeded': 2}

Takeaways#

In the tutorial we showed:

  • the need for read-write transactions in Aerospike to use the R-M-W pattern
  • how writes can be overwritten and lost in a concurrent environment if performed simply
  • how the developer can ensure atomicity and isolation of a read-write transaction by using version check logic and syntax.

Clean up#

Remove data and close connection.

client.truncate(namespace, tutorial_set, 0)# Close the connection to the Aerospike clusterclient.close()print('Removed tutorial data. Connection closed.')

Output:

Removed tutorial data. Connection closed.

Further Exploration and Resources#

For further exploration of transactions support in Aerospike, check out the following resources:

Next steps#

Visit Aerospike notebooks repo to run additional Aerospike notebooks. To run a different notebook, download the notebook from the repo to your local machine, and then click on File > Open, and select Upload.