Skip to main content

Model Serving with Aerospike Feature Store

For an interactive Jupyter notebook experience: Launch in Binder

This notebook is the third in the series of notebooks that show how Aerospike can be used as a feature store.

This notebook requires the Aerospike Database and Spark running locally with Aerospike Spark Connector. To create a Docker container that satisfies the requirements and holds a copy of Aerospike notebooks, visit the Aerospike Notebooks Repo.

Introduction

This notebook shows how Aerospike can be used as a Feature Store for Machine Learning applications on Spark using Aerospike Spark Connector. It is Part 3 of the Feature Store series of notebooks, and focuses on Model Serving aspects concerning a Feature Store. The first two notebooks in the series discuss Feature Engineering and Model Training.

Reference Architecture

This notebook is organized as follows:

  • Summary of the prior (Data Engineering and Model Training) notebooks
  • Load the trained and saved model for making a prediction.
  • Use Aerospike API to retrieve precomputed features.
  • Implement and test a web service that combines the above elements, that is, accesses features, runs the model and returns the prediction.

Prerequisites

This tutorial assumes familiarity with the following topics:

Setup

Set up Aerospike Server. Spark Server, and Spark Connector.

Ensure Database Is Running

This notebook requires that Aerospike datbase is running.

!asd >& /dev/null
!pgrep -x asd >/dev/null && echo "Aerospike database is running!" || echo "**Aerospike database is not running!**"

Output:

Aerospike database is running!

Initialize Client

Initialize Python Client used to access features stored in the Aerospike feature store.

import aerospike
import sys
# connect to the database
config = {
'hosts': [ ('127.0.0.1', 3000) ]
}
try:
client = aerospike.client(config).connect()
except:
print("failed to connect to the cluster with", config['hosts'])
sys.exit(1)
print('Client initialized and connected to database')

Output:

Client initialized and connected to database

Initialize Spark

We will be using Spark functionality in this notebook.

Initialize Paths and Env Variables

# directory where spark notebook requisites are installed
#SPARK_NB_DIR = '/home/jovyan/notebooks/spark'
SPARK_NB_DIR = '/opt/spark-nb'
SPARK_HOME = SPARK_NB_DIR + '/spark-3.0.3-bin-hadoop3.2'
# IP Address or DNS name for one host in your Aerospike cluster
AS_HOST ="localhost"
# Name of one of your namespaces. Type 'show namespaces' at the aql prompt if you are not sure
AS_NAMESPACE = "test"
AEROSPIKE_SPARK_JAR_VERSION="3.2.0"
AS_PORT = 3000 # Usually 3000, but change here if not
AS_CONNECTION_STRING = AS_HOST + ":"+ str(AS_PORT)
# Next we locate the Spark installation - this will be found using the SPARK_HOME environment variable that you will have set 
import findspark
findspark.init(SPARK_HOME)
# Aerospike Spark Connector related settings
import os
AEROSPIKE_JAR_PATH= "aerospike-spark-assembly-"+AEROSPIKE_SPARK_JAR_VERSION+".jar"
os.environ["PYSPARK_SUBMIT_ARGS"] = '--jars ' + SPARK_NB_DIR + '/' + AEROSPIKE_JAR_PATH + ' pyspark-shell'

Configure Spark Session

Please visit Configuring Aerospike Connect for Spark for more information about the properties used on this page.

# imports
import pyspark
from pyspark.context import SparkContext
from pyspark.sql.context import SQLContext
from pyspark.sql.session import SparkSession
from pyspark.sql.types import StringType, StructField, StructType, ArrayType, IntegerType, MapType, LongType, DoubleType
sc = SparkContext.getOrCreate()
conf=sc._conf.setAll([("aerospike.namespace",AS_NAMESPACE),("aerospike.seedhost",AS_CONNECTION_STRING)])
sc.stop()
sc = pyspark.SparkContext(conf=conf)
spark = SparkSession(sc)
sqlContext = SQLContext(sc)

Access Shell Commands

You may execute shell commands including Aerospike tools like aql and asadm in the terminal tab throughout this tutorial. Open a terminal tab by selecting File->Open from the notebook menu, and then New->Terminal.

Prior Context

In the prior two notebooks on Feature Enginnering and Model Training, we saved feature data to the feature store, and trained an ML model on it, respectively. If the saved feature data or the model is not available in this environment, you can run the following cells to recreate them.

Feature Data

Run the following cells ONLY IF the database does not have the feature data for credit card transactions from the prior notebooks (Part 1 or Part 2). You will need to covert them to Code cells before you can run them.

# read and transform the sample credit card transactions data from a csv file
from pyspark.sql.functions import expr
df = spark.read.options(header="True", inferSchema="True") \
.csv("resources/creditcard_small.csv") \
. orderBy(['_c0'], ascending=[True])
new_col_names = ['CC1_' + (c if c != '_c0' else 'OldIdx') for c in df.columns]
df = df.toDF(*new_col_names) \
.withColumn('TxnId', expr('CC1_OldIdx+1').cast(StringType())) \
.select(['TxnId','CC1_Class','CC1_Amount']+['CC1_V'+str(i) for i in range(1,29)])
#df.toPandas().head()# Save feature values in entity records
ENTITY_TYPE = 'cctxn'
ID_COLUMN = 'TxnId'
df.write \
.mode('overwrite') \
.format("aerospike") \
.option("aerospike.writeset", ENTITY_TYPE+'-features')\
.option("aerospike.updateByKey", ID_COLUMN) \
.save()

print('Features stored to Feature Store.')

Trained Model

If the saved model fs_model_rf is not in the resources directory, first extract it from the archive using the following commands in the terminal tab:

cd /home/jovyan/notebooks/spark/resources
tar -xvzf fs_model_rf.tar.gz

Components of Web Service

The main components of the web service are:

  1. Loading the model
  2. Retrieving the required features
  3. Running the model to make a prediction

Loading Model

Load the RandomForestClassifer model that we saved in the Model Training notebook (Part 2).

from pyspark.ml.classification import RandomForestClassificationModel

rf_model = RandomForestClassificationModel.read().load("resources/fs_model_rf")
print("Loaded Random Forest Classification model.")

Output:

Loaded Random Forest Classification model.

Retrieving Features

The Python Client provides a convenient API to access specific features from the entity set as shown below. Recall, the model uses features CC1_V1 through CC1-V28. We also need to construct a schema for the dataframe which is needed to run the model.

namespace = 'test'
entity_set = 'cctxn-features'
txnid = '5' # dummy value, the web service will get the id from the request params

record_key = (namespace, entity_set, txnid)
features = ["CC1_V"+str(i) for i in range(1,29)] # need features CC1_V1-CC1_V28
schema = StructType()
for i in range(1,29): # all features are of type float or Double
schema.add("CC1_V"+str(i), DoubleType(), True)
# get the needed features
try:
(key, meta, bins) = client.select(record_key, features)
except:
print('failed to get record')
sys.exit(1)

# create an input dataframe for the model
featureBuf = [tuple([bins[f] for f in features])]
featureRDD = spark.sparkContext.parallelize(featureBuf)
featureDF = spark.createDataFrame(featureRDD, schema)
featureDF.toPandas().transpose()

Output:

0
CC1_V1-1.158233
CC1_V20.877737
CC1_V31.548718
CC1_V40.403034
CC1_V5-0.407193
CC1_V60.095921
CC1_V70.592941
CC1_V8-0.270533
CC1_V90.817739
CC1_V100.753074
CC1_V11-0.822843
CC1_V120.538196
CC1_V131.345852
CC1_V14-1.119670
CC1_V150.175121
CC1_V16-0.451449
CC1_V17-0.237033
CC1_V18-0.038195
CC1_V190.803487
CC1_V200.408542
CC1_V21-0.009431
CC1_V220.798278
CC1_V23-0.137458
CC1_V240.141267
CC1_V25-0.206010
CC1_V260.502292
CC1_V270.219422
CC1_V280.215153

Running Model

Construct Feature Vector

We first construct a feature vector from the input features as required by the model interface. The model only uses fvector column created by VectorAssembler.

from pyspark.ml.feature import VectorAssembler

# create a feature vector from features
assembler = VectorAssembler(inputCols=features, outputCol="fvector")
featureVectorDF = assembler.transform(featureDF)
featureVectorDF.toPandas().transpose()

Output:

0
CC1_V1-1.158233
CC1_V20.877737
CC1_V31.548718
CC1_V40.403034
CC1_V5-0.407193
CC1_V60.095921
CC1_V70.592941
CC1_V8-0.270533
CC1_V90.817739
CC1_V100.753074
CC1_V11-0.822843
CC1_V120.538196
CC1_V131.345852
CC1_V14-1.11967
CC1_V150.175121
CC1_V16-0.451449
CC1_V17-0.237033
CC1_V18-0.038195
CC1_V190.803487
CC1_V200.408542
CC1_V21-0.009431
CC1_V220.798278
CC1_V23-0.137458
CC1_V240.141267
CC1_V25-0.20601
CC1_V260.502292
CC1_V270.219422
CC1_V280.215153
fvector[-1.15823309349523, 0.877736754848451, 1.54871...

Predict

Call the model's transform function to predict. We input only a dataframe with fvector column, and use only two columns from the prediction dataframe record: probablity and prediction. The threshold for fraud/no-fraud decision is 50%.

rf_prediction = rf_model.transform(featureVectorDF['fvector',])
result = rf_prediction['probability', 'prediction'].collect()[0]
print('normal txn prob: ', result[0][0])
print('fraud prob: ', result[0][1])
print('prediction: ', 'no fraud' if result[0][1] < 0.5 else 'fraud')

Output:

normal txn prob:  0.9361028509004404
fraud prob: 0.0638971490995595
prediction: no fraud

Model Serving with Web Service

Let's create a simple web service that serves the model. We will use the Flask framework to create the web service. The web service takes txnid as the query parameter, retrieves the features from the feature store, runs the model, and returns the prediction.

Note, this model serving example is not realistic as we are using only precomputed features for inference. Also, we have trained and tested the model with the same data. Nonetheless, the example serves the purpose which is to illustrate the use of a feature store for model serving. It should not be difficult to use the patterns shown here to devise a realistic example.

# stop the existing spark session before starting the web service
spark.stop()

Install Web Service Framework

Open a terminal tab, and install the Flask framework with the following command.

pip install flask
pip install flask_restful

Examine Web Service File

First open the file resources/fs-model-ws.py that implements the web service using Flask frameowrk, and examine its contents.

Note that it is mostly the code in the above cells organized to run as a Flask web service. You can learn more about Flask here.

Run Web Service

Run the web service by opening a terminal tab and running the following commands in it:

  1. cd /home/jovyan/notebooks/spark/resources
  2. python fs-model-ws.py

You can ignore the warning messages. After the "Debugger is active" message, the service is ready to receive requests.

Send Requests to Web Service

Let's call the web service to predict the outcome for a transaction id.

We can submit requests through the curl command as below. We can test with a few normal transactions (ids: 1, 2, 3, 10), and a few fraud transactions (ids: 6337, 120506, 150669).

You can query the database to view other fraud and normal transaction ids. As you may recall, TxnId and CC1_Class are the bins for the transaction id and label respectively.

# Send a request to the model web service running at 127.0.0.1:5000
!curl http://127.0.0.1:5000/?txnid=1

Output:

{
"fraud_prob": 0.052470997597310345,
"normal_prob": 0.9475290024026897,
"prediction": "no fraud"
}
# You can query a transaction in the database. 
# remember CC1_Class is the label with 1 indicating a fraudulent transaction
!aql -c "select TxnId, CC1_Class from test.cctxn-features where PK = '6337'"

Output:

select TxnId, CC1_Class from test.cctxn-features where PK = '6337'
+--------+------------+
| TxnId | CC1_Class |
+--------+------------+
| "6337" | 1 |
+--------+------------+
1 row in set (0.000 secs)

OK

Takeaways and Conclusion

In this notebook, we explored how Aerospike can be used as a Feature Store for ML applications. Specifically, we showed how the precomputed features stored in the Aerospike feature store can be used at model serving time. We implemented a simple web service that loads the trained model, and then for each request, retrieves features, runs the model, and returns the model prediction.

This is the third notebook in the series of notebooks on how Aerospike can be used as a feature store. The first and second notebooks discussed Feature Engineering and Model Training aspects respectively.

Cleaning Up

Shut down the web service by hitting Ctrl-C in the tab in which it is running.

Close the spark session, and remove the tutorial data by executing the cell below.

try:
spark.stop()
except:
; # ignore
# To remove all data in the namespace test, uncomment the following line and run:
#!aql -c "truncate test"

Further Exploration and Resources

Here are some links for further exploration.

Resources

Exploring Other Notebooks

Visit Aerospike notebooks repo to run additional Aerospike notebooks. To run a different notebook, download the notebook from the repo to your local machine, and then click on File->Open in the notebook menu, and select Upload.