Skip to main content

Tweetaspike: A Simple Application

For an interactive Jupyter notebook experience: Launch in Binder#

Tweetaspike is a simple application that illustrates some key aspects of an Aerospike application design.

This notebook requires Aerospike datbase running locally and that Java kernel has been installed. Visit Aerospike notebooks repo for additional details and the docker container.

Use magics to load aerospike client from pom#

%%loadFromPOM<dependencies>  <dependency>   <groupId>com.aerospike</groupId>   <artifactId>aerospike-client</artifactId>   <version>5.0.1</version>  </dependency>  <!-- Apache command line parser. -->  <dependency>   <groupId>commons-cli</groupId>   <artifactId>commons-cli</artifactId>   <version>1.2</version>  </dependency>  <!-- Log4j. -->  <dependency>   <groupId>log4j</groupId>   <artifactId>log4j</artifactId>   <version>1.2.17</version>  </dependency>  <!-- JSON simple -->  <dependency>   <groupId>com.googlecode.json-simple</groupId>   <artifactId>json-simple</artifactId>   <version>1.1.1</version>  </dependency>  <dependency>   <groupId>junit</groupId>   <artifactId>junit</artifactId>   <version>4.4</version>  </dependency></dependencies>
import java.io.BufferedReader;import java.io.Console;import java.io.IOException;import java.io.InputStreamReader;

public class EclipseConsole {    Console systemConsole = System.console();    boolean useSystemConsole = false;
    public EclipseConsole(){        this.useSystemConsole = (this.systemConsole != null);    }
    public void printf(String message){        if (useSystemConsole)            systemConsole.printf(message);        else {            System.out.printf(message);        }    }
    public String readLine(){        if (useSystemConsole)            return systemConsole.readLine();        else {            BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(System.in));            String line = "";            try {                line = bufferedReader.readLine();            } catch (IOException e) {                e.printStackTrace();            }            return line;        }    }
}
/******************************************************************************* * Copyright 2012-2014 by Aerospike. * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to * deal in the Software without restriction, including without limitation the * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or * sell copies of the Software, and to permit persons to whom the Software is * furnished to do so, subject to the following conditions: * * The above copyright notice and this permission notice shall be included in * all copies or substantial portions of the Software. * * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS * IN THE SOFTWARE. ******************************************************************************/

import java.io.PrintWriter;import java.io.StringWriter;
import com.aerospike.client.AerospikeClient;import com.aerospike.client.AerospikeException;import com.aerospike.client.Bin;import com.aerospike.client.Key;import com.aerospike.client.query.IndexType;import com.aerospike.client.task.IndexTask;
public class UtilityService {    private AerospikeClient client;    private EclipseConsole console = new EclipseConsole();
    public UtilityService(AerospikeClient c) {        this.client = c;    }
    public void createSecondaryIndexes() throws AerospikeException,            InterruptedException {
    // NOTE: Index creation has been included in here for convenience and to demonstrate the syntax. The recommended way of creating indexes in production env is via AQL
        console.printf("\nCreating secondary index on: set=tweets, bin=username...\n");        IndexTask task1 = client.createIndex(null, "test", "tweets",                "username_index", "username", IndexType.STRING);        task1.waitTillComplete(100);        console.printf("Done creating secondary index on: set=tweets, bin=username\n");
        console.printf("\nCreating secondary index on: set=tweets, bin=ts...\n");        IndexTask task2 = client.createIndex(null, "test", "tweets", "ts_index",                "ts", IndexType.NUMERIC);        task2.waitTillComplete(100);        console.printf("Done creating secondary index on: set=tweets, bin=ts\n");
        console.printf("\nCreating secondary index on: set=users, bin=tweetcount...\n");        IndexTask task3 = client.createIndex(null, "test", "users",                "tweetcount_index", "tweetcount", IndexType.NUMERIC);        task3.waitTillComplete(100);        console.printf("Done creating secondary index on: set=users, bin=tweetcount\n");            }
    public static String printStackTrace(Exception ex) {        StringWriter errors = new StringWriter();        ex.printStackTrace(new PrintWriter(errors));        return errors.toString();    }
    /*     * Example functions not in use     */    @SuppressWarnings("unused")    private void add() throws AerospikeException {        // Java Add        Key userKey = new Key("test", "users", "user1234");        Bin bin2 = new Bin("count", 3);        client.add(null, userKey, bin2);    }
    @SuppressWarnings("unused")    private void append() throws AerospikeException {        // Java Append        Key userKey = new Key("test", "users", "user1234");        Bin bin1 = new Bin("greet", "hello");        Bin bin2 = new Bin("greet", " world");        client.append(null, userKey, bin2);    }
    @SuppressWarnings("unused")    private void exists() throws AerospikeException {        // Java Exists        Key userKey = new Key("test", "users", "user1234");        boolean recordKeyExists = client.exists(null, userKey);    }
    @SuppressWarnings("unused")    private void touch() throws AerospikeException {        // Java Touch        Key userKey = new Key("test", "users", "user1234");        client.touch(null, userKey);    }
}
/******************************************************************************* * Copyright 2012-2014 by Aerospike. * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to * deal in the Software without restriction, including without limitation the * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or * sell copies of the Software, and to permit persons to whom the Software is * furnished to do so, subject to the following conditions: * * The above copyright notice and this permission notice shall be included in * all copies or substantial portions of the Software. * * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS * IN THE SOFTWARE. ******************************************************************************/

import java.io.File;import java.util.Random;
import com.aerospike.client.AerospikeClient;import com.aerospike.client.AerospikeException;import com.aerospike.client.Bin;import com.aerospike.client.Key;import com.aerospike.client.Language;import com.aerospike.client.Operation;import com.aerospike.client.Record;import com.aerospike.client.ScanCallback;import com.aerospike.client.Value;import com.aerospike.client.lua.LuaConfig;import com.aerospike.client.policy.Priority;import com.aerospike.client.policy.RecordExistsAction;import com.aerospike.client.policy.ScanPolicy;import com.aerospike.client.policy.WritePolicy;import com.aerospike.client.query.Filter;import com.aerospike.client.query.RecordSet;import com.aerospike.client.query.Statement;import com.aerospike.client.task.RegisterTask;import com.aerospike.client.query.IndexType;import com.aerospike.client.task.IndexTask;
public class TweetService {    private AerospikeClient client;    private EclipseConsole console = new EclipseConsole();
    public TweetService(AerospikeClient client) {        this.client = client;    }
    public void createTweet() throws AerospikeException, InterruptedException {
        console.printf("\n********** Create Tweet **********\n");
        ///*********************///        ///*****Data Model*****///        //Namespace: test        //Set: tweets            //Key: <username:<counter>>            //Bins:                //tweet - string                 //ts - int (Stores epoch timestamp of the tweet)                //username - string                //Sample Key: dash:1            //Sample Record:                //{ tweet: 'Put. A. Bird. On. It.',                //  ts: 1408574221,                //  username: 'dash'                //}        ///*********************///                Record userRecord = null;        Key userKey = null;        Key tweetKey = null;
        // Get username        String username;        console.printf("\nEnter username:");        username = console.readLine();
        if (username != null && username.length() > 0) {            // Check if username exists            userKey = new Key("test", "users", username);            userRecord = client.get(null, userKey);            if (userRecord != null) {                int nextTweetCount = Integer.parseInt(userRecord.getValue(                        "tweetcount").toString()) + 1;
                // Get tweet                String tweet;                console.printf("Enter tweet for " + username + ":");                tweet = console.readLine();
                // Write record                WritePolicy wPolicy = new WritePolicy();                wPolicy.sendKey = true;                wPolicy.recordExistsAction = RecordExistsAction.UPDATE;
                // Create timestamp to store along with the tweet so we can                // query, index and report on it                long ts = getTimeStamp();
                tweetKey = new Key("test", "tweets", username + ":"                        + nextTweetCount);                Bin bin1 = new Bin("tweet", tweet);                Bin bin2 = new Bin("ts", ts);                Bin bin3 = new Bin("username", username);
                client.put(wPolicy, tweetKey, bin1, bin2, bin3);                console.printf("\nINFO: Tweet record created!\n");
                // Update tweet count and last tweet'd timestamp in the user                // record                updateUser(client, userKey, wPolicy, ts, nextTweetCount);            } else {                console.printf("ERROR: User record not found!\n");            }        }    } //createTweet        public void queryTweets() throws AerospikeException {        queryTweetsByUsername();        queryUsersByTweetCount();    } //queryTweets
    public void queryTweetsByUsername() throws AerospikeException {                console.printf("\n********** Query Tweets By Username **********\n");                RecordSet rs = null;        try {
        // NOTE: Index creation has been included in here for convenience and to demonstrate the syntax.         // NOTE: The recommended way of creating indexes in production env is via AQL.            IndexTask task = client.createIndex(null, "test", "tweets",                    "username_index", "username", IndexType.STRING);            task.waitTillComplete(100);
            // Get username            String username;            console.printf("\nEnter username:");            username = console.readLine();
            if (username != null && username.length() > 0) {                String[] bins = { "tweet" };                Statement stmt = new Statement();                stmt.setNamespace("test");                stmt.setSetName("tweets");                stmt.setIndexName("username_index");                stmt.setBinNames(bins);                stmt.setFilter(Filter.equal("username", username));
                console.printf("\nHere's " + username + "'s tweet(s):\n");
                rs = client.query(null, stmt);                while (rs.next()) {                    Record r = rs.getRecord();                    console.printf(r.getValue("tweet").toString() + "\n");                }            } else {                console.printf("ERROR: User record not found!\n");            }        } finally {            if (rs != null) {                // Close record set                rs.close();            }        }    } //queryTweetsByUsername
    public void queryUsersByTweetCount() throws AerospikeException {
        console.printf("\n********** Query Users By Tweet Count Range **********\n");
        RecordSet rs = null;        try {
        // NOTE: Index creation has been included in here for convenience and to demonstrate the syntax.         // NOTE: The recommended way of creating indexes in production env is via AQL.            IndexTask task = client.createIndex(null, "test", "users",                    "tweetcount_index", "tweetcount", IndexType.NUMERIC);            task.waitTillComplete(100);
            // Get min and max tweet counts            int min;            int max;            console.printf("\nEnter Min Tweet Count:");            min = Integer.parseInt(console.readLine());            console.printf("Enter Max Tweet Count:");            max = Integer.parseInt(console.readLine());
            console.printf("\nList of users with " + min + "-" + max                    + " tweets:\n");
            String[] bins = { "username", "tweetcount", "gender" };            Statement stmt = new Statement();            stmt.setNamespace("test");            stmt.setSetName("users");            stmt.setBinNames(bins);            stmt.setFilter(Filter.range("tweetcount", min, max));
            rs = client.query(null, stmt);            while (rs.next()) {                Record r = rs.getRecord();                console.printf(r.getValue("username") + " has "                        + r.getValue("tweetcount") + " tweets\n");            }        } finally {            if (rs != null) {                // Close record set                rs.close();            }        }    } //queryUsersByTweetCount        public void scanSomeTweetsForSomeUsers() {        try {            // Java Scan            ScanPolicy policy = new ScanPolicy();            policy.concurrentNodes = true;            policy.priority = Priority.LOW;            policy.includeBinData = true;            policy.maxRecords = 100;            policy.sendKey = true;            client.scanAll(policy, "test", "tweets", new ScanCallback() {
                @Override                public void scanCallback(Key key, Record record)                        throws AerospikeException {                    console.printf(key.toString() + " => ");                                        console.printf(record.getValue("username") + " ");                    console.printf(record.getValue("tweet") + "\n");                    
                }            }, "tweet");        } catch (AerospikeException e) {            System.out.println("EXCEPTION - Message: " + e.getMessage());            System.out.println("EXCEPTION - StackTrace: "                    + UtilityService.printStackTrace(e));        }    } //scanSomeTweetsForSomeUsers
    private void updateUser(AerospikeClient client, Key userKey,            WritePolicy policy, long ts, int tweetCount) throws AerospikeException, InterruptedException {
        client.put(policy, userKey, new Bin("tweetcount", tweetCount), new Bin("lasttweeted", ts));        console.printf("\nINFO: The tweet count now is: " + tweetCount);    } //updateUser
    @SuppressWarnings("unused")    private void updateUserUsingOperate(AerospikeClient client, Key userKey,            WritePolicy policy, long ts) throws AerospikeException {                Record record = client.operate(policy, userKey,                Operation.add(new Bin("tweetcount", 1)),                Operation.put(new Bin("lasttweeted", ts)),                Operation.get());                console.printf("\nINFO: The tweet count now is: " + record.getValue("tweetcount"));    } //updateUserUsingOperate
    public void createTweets() throws AerospikeException {        String[] randomTweets = {                "For just $1 you get a half price download of half of the song and listen to it just once.",                "People tell me my body looks like a melted candle",                "Come on movie! Make it start!", "Byaaaayy",                "Please, please, win! Meow, meow, meow!",                "Put. A. Bird. On. It.",                "A weekend wasted is a weekend well spent",                "Would you like to super spike your meal?",                "We have a mean no-no-bring-bag up here on aisle two.",                "SEEK: See, Every, EVERY, Kind... of spot",                "We can order that for you. It will take a year to get there.",                "If you are pregnant, have a soda.",                "Hear that snap? Hear that clap?",                "Follow me and I may follow you",                "Which is the best cafe in Portland? Discuss...",                "Portland Coffee is for closers!",                "Lets get this party started!",                "How about them portland blazers!", "You got school'd, yo",                "I love animals", "I love my dog", "What's up Portland",                "Which is the best cafe in Portland? Discuss...",                "I dont always tweet, but when I do it is on Tweetaspike" };        Random rnd1 = new Random();        Random rnd2 = new Random();        Random rnd3 = new Random();        Key userKey;        Record userRecord;        int totalUsers = 10000;        int maxTweets = 20;        String username;        long ts = 0;                WritePolicy wPolicy = new WritePolicy();        wPolicy.recordExistsAction = RecordExistsAction.UPDATE;
        console.printf("\nCreate up to " + maxTweets + " tweets each for "                + totalUsers + " users.\n");       // console.readLine();
        for (int j = 0; j < totalUsers; j++) {            // Check if user record exists            username = "user" + rnd3.nextInt(100000);            userKey = new Key("test", "users", username);            userRecord = client.get(null, userKey);            if (userRecord != null) {                // create up to maxTweets random tweets for this user                int totalTweets = rnd1.nextInt(maxTweets);                for (int k = 1; k <= totalTweets; k++) {                    // Create timestamp to store along with the tweet so we can                    // query, index and report on it                    ts = getTimeStamp();                    Key tweetKey = new Key("test", "tweets", username + ":" + k);                    Bin bin1 = new Bin("tweet",                            randomTweets[rnd2.nextInt(randomTweets.length)]);                    Bin bin2 = new Bin("ts", ts);                    Bin bin3 = new Bin("username", username);
                    client.put(wPolicy, tweetKey, bin1, bin2, bin3);                }                if (totalTweets > 0) {                    client.put(wPolicy, userKey, new Bin("tweetcount", totalTweets), new Bin("lasttweeted", ts));                }            }        }        console.printf("\n\nDone creating up to " + maxTweets                + " tweets each for " + totalUsers + " users!\n");    } //createTweets
    private long getTimeStamp() {        return System.currentTimeMillis();    } //getTimeStamp
}
/******************************************************************************* * Copyright 2012-2014 by Aerospike. * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to * deal in the Software without restriction, including without limitation the * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or * sell copies of the Software, and to permit persons to whom the Software is * furnished to do so, subject to the following conditions: * * The above copyright notice and this permission notice shall be included in * all copies or substantial portions of the Software. * * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS * IN THE SOFTWARE. ******************************************************************************/

import java.io.File;import java.util.ArrayList;import java.util.Arrays;import java.util.Map;import java.util.Random;
import com.aerospike.client.AerospikeClient;import com.aerospike.client.AerospikeException;import com.aerospike.client.Bin;import com.aerospike.client.Key;import com.aerospike.client.Language;import com.aerospike.client.Record;import com.aerospike.client.Value;import com.aerospike.client.lua.LuaConfig;import com.aerospike.client.policy.GenerationPolicy;import com.aerospike.client.policy.BatchPolicy;import com.aerospike.client.policy.RecordExistsAction;import com.aerospike.client.policy.WritePolicy;import com.aerospike.client.query.Filter;import com.aerospike.client.query.ResultSet;import com.aerospike.client.query.Statement;import com.aerospike.client.task.RegisterTask;
public class UserService {    private AerospikeClient client;    private EclipseConsole console = new EclipseConsole();
    public UserService(AerospikeClient client) {        this.client = client;    }
    public void createUser() throws AerospikeException {        console.printf("\n********** Create User **********\n");
        ///*********************///        ///*****Data Model*****///        //Namespace: test        //Set: users        //Key: <username>        //Bins:        //username - String        //password - String (For simplicity password is stored in plain-text)        //gender - String (Valid values are 'm' or 'f')        //region - String (Valid values are: 'n' (North), 's' (South), 'e' (East), 'w' (West) -- to keep data entry to minimal we just store the first letter)        //lasttweeted - int (Stores epoch timestamp of the last/most recent tweet) -- Default to 0        //tweetcount - int (Stores total number of tweets for the user) -- Default to 0        //interests - Array of interests
        //Sample Key: dash        //Sample Record:        //{ username: 'dash',        //  password: 'dash',        //  gender: 'm',        //  region: 'w',        //  lasttweeted: 1408574221,        //  tweetcount: 20,        //  interests: ['photography', 'technology', 'dancing', 'house music]        //}        ///*********************///
        String username;        String password;        String gender;        String region;        String interests;
        // Get username        console.printf("Enter username: ");        username = console.readLine();
        if (username != null && username.length() > 0) {            // Get password            console.printf("Enter password for " + username + ":");            password = console.readLine();
            // Get gender            console.printf("Select gender (f or m) for " + username + ":");            gender = console.readLine().substring(0, 1);
            // Get region            console.printf("Select region (north, south, east or west) for "                    + username + ":");            region = console.readLine().substring(0, 1);
            // Get interests            console.printf("Enter comma-separated interests for " + username + ":");            interests = console.readLine();
            // Write record            WritePolicy wPolicy = new WritePolicy();            wPolicy.recordExistsAction = RecordExistsAction.UPDATE;
            Key key = new Key("test", "users", username);            Bin bin1 = new Bin("username", username);            Bin bin2 = new Bin("password", password);            Bin bin3 = new Bin("gender", gender);            Bin bin4 = new Bin("region", region);            Bin bin5 = new Bin("lasttweeted", 0);            Bin bin6 = new Bin("tweetcount", 0);            Bin bin7 = new Bin("interests", Arrays.asList(interests.split(",")));
            client.put(wPolicy, key, bin1, bin2, bin3, bin4, bin5, bin6, bin7);
            console.printf("\nINFO: User record created!");        }    } //createUser
    public void getUser() throws AerospikeException {        Record userRecord = null;        Key userKey = null;
        // Get username        String username;        console.printf("\nEnter username:");        username = console.readLine();
        if (username != null && username.length() > 0) {            // Check if username exists            userKey = new Key("test", "users", username);            userRecord = client.get(null, userKey);            if (userRecord != null) {                console.printf("\nINFO: User record read successfully! Here are the details:\n");                console.printf("username:   " + userRecord.getValue("username")                        + "\n");                console.printf("password:   " + userRecord.getValue("password")                        + "\n");                console.printf("gender:     " + userRecord.getValue("gender") + "\n");                console.printf("region:     " + userRecord.getValue("region") + "\n");                console.printf("tweetcount: " + userRecord.getValue("tweetcount") + "\n");                console.printf("interests:  " + userRecord.getValue("interests") + "\n");            } else {                console.printf("ERROR: User record not found!\n");            }        } else {            console.printf("ERROR: User record not found!\n");        }    } //getUser
    public void updatePasswordUsingUDF() throws AerospikeException    {        Record userRecord = null;        Key userKey = null;
        // Get username        String username;        console.printf("\nEnter username:");        username = console.readLine();
        if (username != null && username.length() > 0)        {            // Check if username exists            userKey = new Key("test", "users", username);            userRecord = client.get(null, userKey);            if (userRecord != null)            {                // Get new password                String password;                console.printf("Enter new password for " + username + ":");                password = console.readLine();
                // NOTE: UDF registration has been included here for convenience and to demonstrate the syntax. The recommended way of registering UDFs in production env is via AQL                LuaConfig.SourceDirectory = "udf";                File udfFile = new File("udf/updateUserPwd.lua");
                RegisterTask rt = client.register(null, udfFile.getPath(),                        udfFile.getName(), Language.LUA);                rt.waitTillComplete(100);
                String updatedPassword = client.execute(null, userKey, "updateUserPwd", "updatePassword", Value.get(password)).toString();                console.printf("\nINFO: The password has been set to: " + updatedPassword);            }            else            {                console.printf("ERROR: User record not found!");            }        }        else        {            console.printf("ERROR: User record not found!");        }    } //updatePasswordUsingUDF
    public void updatePasswordUsingCAS() throws AerospikeException    {        Record userRecord = null;        Key userKey = null;        Bin passwordBin = null;
        // Get username        String username;        console.printf("\nEnter username:");        username = console.readLine();
        if (username != null && username.length() > 0)        {            // Check if username exists            userKey = new Key("test", "users", username);            userRecord = client.get(null, userKey);            if (userRecord != null)            {                // Get new password                String password;                console.printf("Enter new password for " + username + ":");                password = console.readLine();
                WritePolicy writePolicy = new WritePolicy();                // record generation                writePolicy.generation = userRecord.generation;                writePolicy.generationPolicy = GenerationPolicy.EXPECT_GEN_EQUAL;                // password Bin                passwordBin = new Bin("password", Value.get(password));                client.put(writePolicy, userKey, passwordBin);
                console.printf("\nINFO: The password has been set to: " + password);            }            else            {                console.printf("ERROR: User record not found!");            }        }        else        {            console.printf("ERROR: User record not found!");        }    } //updatePasswordUsingCAS
    public void batchGetUserTweets() throws AerospikeException {
        Record userRecord = null;        Key userKey = null;
        // Get username        String username;        console.printf("\nEnter username:");        username = console.readLine();
        if (username != null && username.length() > 0) {            // Check if username exists            userKey = new Key("test", "users", username);            userRecord = client.get(null, userKey);            if (userRecord != null) {                // Get how many tweets the user has                int tweetCount = userRecord.getInt("tweetcount");

                // Create an array of keys so we can initiate batch read                // operation                Key[] keys = new Key[tweetCount];                for (int i = 0; i < keys.length; i++) {                    keys[i] = new Key("test", "tweets",                            (username + ":" + (i + 1)));                }
                console.printf("\nHere's " + username + "'s tweet(s):\n");
                // Initiate batch read operation                if (keys.length > 0){                    Record[] records = client.get(new BatchPolicy(), keys);                    for (int j = 0; j < records.length; j++) {                        console.printf(records[j].getValue("tweet").toString() + "\n");                    }                }            }        } else {            console.printf("ERROR: User record not found!\n");        }    } //batchGetUserTweets
    @SuppressWarnings("unchecked")    public void aggregateUsersByTweetCountByRegion() throws AerospikeException,    InterruptedException {        ResultSet rs = null;        try {            int min;            int max;            console.printf("\nEnter Min Tweet Count:");            min = Integer.parseInt(console.readLine());            console.printf("Enter Max Tweet Count:");            max = Integer.parseInt(console.readLine());
            console.printf("\nAggregating users with " + min + "-"                    + max + " tweets by region. Hang on...\n");
            // NOTE: UDF registration has been included here for convenience and to demonstrate the syntax. The recommended way of registering UDFs in production env is via AQL            LuaConfig.SourceDirectory = "udf";            File udfFile = new File("udf/aggregationByRegion.lua");
            RegisterTask rt = client.register(null, udfFile.getPath(),                    udfFile.getName(), Language.LUA);            rt.waitTillComplete(100);
            String[] bins = { "tweetcount", "region" };            Statement stmt = new Statement();            stmt.setNamespace("test");            stmt.setSetName("users");            stmt.setIndexName("tweetcount_index");            stmt.setBinNames(bins);            stmt.setFilter(Filter.range("tweetcount", min, max));
            rs = client.queryAggregate(null, stmt, "aggregationByRegion", "sum");
            if (rs.next()) {                Map<Object, Object> result = (Map<Object, Object>) rs                        .getObject();                console.printf("\nTotal Users in North: " + result.get("n") + "\n");                console.printf("Total Users in South: " + result.get("s") + "\n");                console.printf("Total Users in East: " + result.get("e") + "\n");                console.printf("Total Users in West: " + result.get("w") + "\n");            }        } finally {            if (rs != null) {                // Close result set                rs.close();            }        }    } //aggregateUsersByTweetCountByRegion
    public void createUsers() throws AerospikeException {        String[] genders = { "m", "f" };        String[] regions = { "n", "s", "e", "w" };        String[] randomInterests = { "Music", "Football", "Soccer", "Baseball", "Basketball", "Hockey", "Weekend Warrior", "Hiking", "Camping", "Travel", "Photography"};        String username;        ArrayList<Object> userInterests = null;        int totalInterests = 0;        int start = 1;        int end = 100000;        int totalUsers = end - start;        Random rnd1 = new Random();        Random rnd2 = new Random();        Random rnd3 = new Random();
        WritePolicy wPolicy = new WritePolicy();        wPolicy.recordExistsAction = RecordExistsAction.UPDATE;
        console.printf("\nCreate " + totalUsers                + " users.\n");       // console.readLine();
        for (int j = start; j <= end; j++) {            // Write user record            username = "user" + j;            Key key = new Key("test", "users", username);            Bin bin1 = new Bin("username", "user" + j);            Bin bin2 = new Bin("password", "pwd" + j);            Bin bin3 = new Bin("gender", genders[rnd1.nextInt(2)]);            Bin bin4 = new Bin("region", regions[rnd2.nextInt(4)]);            Bin bin5 = new Bin("lasttweeted", 0);            Bin bin6 = new Bin("tweetcount", 0);
            totalInterests = rnd3.nextInt(7);            userInterests = new ArrayList<Object>();            for(int i = 0; i < totalInterests; i++) {                userInterests.add(randomInterests[rnd3.nextInt(randomInterests.length)]);            }            Bin bin7 = new Bin("interests", userInterests);
            client.put(wPolicy, key, bin1, bin2, bin3, bin4, bin5, bin6, bin7);            //console.printf("Wrote user record for " + username + "\n");        }        console.printf("\nDone creating " + totalUsers + "!\n");    } // createUsers
}
import java.io.PrintWriter;import java.io.StringWriter;import java.util.List;
import org.apache.commons.cli.CommandLine;import org.apache.commons.cli.CommandLineParser;import org.apache.commons.cli.HelpFormatter;import org.apache.commons.cli.Options;import org.apache.commons.cli.PosixParser;import org.apache.log4j.Logger;
import com.aerospike.client.AerospikeClient;import com.aerospike.client.AerospikeException;import com.aerospike.client.Bin;import com.aerospike.client.Host;import com.aerospike.client.Key;import com.aerospike.client.Operation;import com.aerospike.client.Record;import com.aerospike.client.policy.ClientPolicy;import com.aerospike.client.policy.GenerationPolicy;import com.aerospike.client.policy.Policy;import com.aerospike.client.policy.WritePolicy;
/** * @author Dash Desai */public class Program {    private AerospikeClient client;    private String seedHost;    private int port;    private String namespace;    private String set;    private WritePolicy writePolicy;    private Policy policy;    private EclipseConsole console = new EclipseConsole();
    private static Logger log = Logger.getLogger(Program.class);
    public Program(String host, int port, String namespace, String set)            throws AerospikeException {        this.seedHost = host;        this.port = port;        this.namespace = namespace;        this.set = set;        this.writePolicy = new WritePolicy();        this.writePolicy.totalTimeout = 100;        this.policy = new Policy();        this.policy.totalTimeout = 100;        // Establish a connection to Aerospike cluster        ClientPolicy cPolicy = new ClientPolicy();        cPolicy.timeout = 500;        this.client = new AerospikeClient(cPolicy, this.seedHost, this.port);    }
    protected void finalize() throws Throwable {        if (this.client != null){            this.client.close();        }    };    public static void engage(String[] args) throws AerospikeException {        try {            Options options = new Options();            options.addOption("h", "host", true,                    "Server hostname (default: 12)");            options.addOption("p", "port", true, "Server port (default: 3000)");            options.addOption("n", "namespace", true,                    "Namespace (default: test)");            options.addOption("s", "set", true, "Set (default: demo)");            options.addOption("u", "usage", false, "Print usage.");
            CommandLineParser parser = new PosixParser();            CommandLine cl = parser.parse(options, args, false);
            String host = cl.getOptionValue("h", "127.0.0.1");            String portString = cl.getOptionValue("p", "3000");            int port = Integer.parseInt(portString);            String namespace = cl.getOptionValue("n", "test");            String set = cl.getOptionValue("s", "demo");            log.debug("Host: " + host);            log.debug("Port: " + port);            log.debug("Namespace: " + namespace);            log.debug("Set: " + set);
            @SuppressWarnings("unchecked")            List<String> cmds = cl.getArgList();            if (cmds.size() == 0 && cl.hasOption("u")) {                logUsage(options);                return;            }
            Program as = new Program(host, port, namespace, set);
            as.work();
        } catch (Exception e) {            log.error("Critical error", e);        }    }
    /**     * Write usage to console.     */    private static void logUsage(Options options) {        HelpFormatter formatter = new HelpFormatter();        StringWriter sw = new StringWriter();        PrintWriter pw = new PrintWriter(sw);        String syntax = Program.class.getName() + " [<options>]";        formatter.printHelp(pw, 100, syntax, "options:", options, 0, 2, null);        log.info(sw.toString());    }
    public void work() throws Exception {        try {            console.printf("INFO: Connecting to Aerospike cluster...");
            // Establish connection to Aerospike server
            if (client == null || !client.isConnected()) {                console.printf("\nERROR: Connection to Aerospike cluster failed! Please check the server settings and try again!");                console.readLine();            } else {                console.printf("\nINFO: Connection to Aerospike cluster succeeded!\n");        while (true) {
                // Create instance of UserService                UserService us = new UserService(client);                // Create instance of TweetService                TweetService ts = new TweetService(client);                // Create instance of UtilityService                UtilityService util = new UtilityService(client);
                // Present options                console.printf("\nWhat would you like to do:\n");                console.printf("1> Create A User And A Tweet\n");                console.printf("2> Read A User Record\n");                console.printf("3> Batch Read Tweets For A User\n");                console.printf("4> Scan Sample of Tweets \n");                console.printf("5> Query Tweets By Username And Users By Tweet Count Range\n");                console.printf("6> Create canned users\n");                console.printf("7> Create canned tweets\n");                console.printf("0> Exit\n");                console.printf("\nSelect and hit enter:\n");                int feature = Integer.parseInt(console.readLine());
                if (feature != 0) {                    switch (feature) {                    case 1:                        console.printf("\n********** Your Selection: Create User And A Tweet **********\n");                        us.createUser();                        ts.createTweet();                        break;                    case 2:                        console.printf("\n********** Your Selection: Read A User Record **********\n");                        us.getUser();                        break;                    case 3:                        console.printf("\n********** Your Selection: Batch Read Tweets For A User **********\n");                        us.batchGetUserTweets();                        break;                    case 4:                        console.printf("\n********** Your Selection: Sample Tweets **********\n");                         ts.scanSomeTweetsForSomeUsers();                        break;                    case 5:                        console.printf("\n********** Your Selection: Query Tweets By Username And Users By Tweet Count Range **********\n");                        ts.queryTweets();                        break;                    case 6:                        console.printf("\n********** Create Users **********\n");                        us.createUsers();                        break;                    case 7:                        console.printf("\n********** Create Tweets **********\n");                        ts.createTweets();                        break;                    default:                        break;                    }                }            }            }        } catch (AerospikeException e) {            console.printf("AerospikeException - Message: " + e.getMessage()                    + "\n");            console.printf("AerospikeException - StackTrace: "                    + UtilityService.printStackTrace(e) + "\n");        } catch (Exception e) {            console.printf("Exception - Message: " + e.getMessage() + "\n");            console.printf("Exception - StackTrace: "                    + UtilityService.printStackTrace(e) + "\n");        } finally {            if (client != null && client.isConnected()) {                // Close Aerospike server connection                client.close();            }            console.printf("\n\nINFO: Press any key to exit...\n");            console.readLine();        }        }
    /*     * example method calls     */    public Record readPartial(String userName) throws AerospikeException {        // Java read specific bins        Key key = new Key("test", "users", userName);        Record record = this.client.get(null, key, "username", "password",                "gender", "region");        return record;    }
    public Record readMeta(String userName) throws AerospikeException {        // Java get meta data        Key key = new Key("test", "users", userName);        Record record = this.client.getHeader(null, key);        return record;    }
    public void write(String username, String password)            throws AerospikeException {        // Java read-modify-write        WritePolicy wPolicy = new WritePolicy();        wPolicy.generationPolicy = GenerationPolicy.EXPECT_GEN_EQUAL;
        Key key = new Key("test", "users", username);        Bin bin1 = new Bin("username", username);        Bin bin2 = new Bin("password", password);
        client.put(wPolicy, key, bin1, bin2);    }
    public void delete(String username) throws AerospikeException {        // Java Delete record.        WritePolicy wPolicy = new WritePolicy();        Key key = new Key("test", "users", username);        client.delete(wPolicy, key);    }
    public boolean exisis(String username) throws AerospikeException {        // Java exists        Key key = new Key("test", "users", username);        boolean itsHere = client.exists(policy, key);        return itsHere;    }
    public void add(String username) throws AerospikeException {        // Java add        WritePolicy wPolicy = new WritePolicy();        Key key = new Key("test", "users", username);        Bin counter = new Bin("tweetcount", 1);        client.add(wPolicy, key, counter);    }
    public void touch(String username) throws AerospikeException {        // Java touch        WritePolicy wPolicy = new WritePolicy();        Key key = new Key("test", "users", username);        client.touch(wPolicy, key);    }
    public void append(String username) throws AerospikeException {        // Java append        WritePolicy wPolicy = new WritePolicy();        Key key = new Key("test", "users", username);        Bin bin2 = new Bin("interests", "cats");        client.append(wPolicy, key, bin2);    }
    public void operate(String username) throws AerospikeException {        // Java operate        WritePolicy wPolicy = new WritePolicy();        Key key = new Key("test", "users", username);        client.operate(wPolicy, key, Operation.put(new Bin("tweetcount", 153)),                Operation.put(new Bin("lasttweeted", 1406755079L)));
    }
    @SuppressWarnings("unused")    public void batch(String username) throws AerospikeException {        // Java batch        // Create an array of keys so we can initiate batch read operation        Key[] keys = new Key[27];        for (int i = 0; i < keys.length; i++) {            keys[i] = new Key("test", "tweets", (username + ":" + (i + 1)));        }
        // Initiate batch read operation        Record[] records = client.get(null, keys);
    }
    @SuppressWarnings({ "unused", "resource" })    public void multipleSeedNodes() throws AerospikeException {        Host[] hosts = new Host[] { new Host("a.host", 3000),                new Host("another.host", 3000),                new Host("and.another.host", 3000) };        AerospikeClient client = new AerospikeClient(new ClientPolicy(), hosts);
    }    @SuppressWarnings({ "unused", "resource" })    public void connectWithClientPolicy() throws AerospikeException {        // Java connection with Client policy        ClientPolicy clientPolicy = new ClientPolicy();//        clientPolicy.maxThreads = 200; //200 threads        clientPolicy.maxSocketIdle = 3; // 3 seconds        AerospikeClient client = new AerospikeClient(clientPolicy, "a.host", 3000);
    }
    public void deleteBin(String username) throws AerospikeException{        // Java delete a bin        WritePolicy wPolicy = new WritePolicy();        Key key = new Key("test", "users", username);        Bin bin1 = Bin.asNull("shoe-size"); // Set bin value to null to drop bin.        client.put(wPolicy, key, bin1);    }
}
Program.engage(null);