Lua UDF – Word Count
This example walks through a variant of the algorithm published in Google's MapReduce paper, which counts the number of times each word appears in a document.
Each record will represent a page of a book, and will be composed of the following bins:
- book_id – the unique identifier for the book to which this page belongs.
- page_id – the unique identifier for the page.
- page_no – the page number.
- body – the body (text) of the page.
We will assume the following secondary indexes exist:
- A NUMERIC secondary index on the book_id bin – This will allow us to query on the book identifier, so we can get the count of words for a book.
- A NUMERIC secondary index on the page_id bin – This will allow us to query on the page identifier, so we can get the count of words for a specific page.
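To make the data model concrete, here is a minimal sketch in plain Lua. It assumes that ordinary Lua tables can stand in for Aerospike records. The sample pages and the `query_equals()` helper are hypothetical. The helper only imitates the equality lookup that the book_id and page_id indexes make possible, and it does so with a full scan rather than an index.

```lua
-- Hypothetical sample data: plain Lua tables stand in for Aerospike
-- records, one per page, each carrying the four bins described above.
local pages = {
  { book_id = 1, page_id = 101, page_no = 1, body = "the cat sat" },
  { book_id = 1, page_id = 102, page_no = 2, body = "the cat ran" },
  { book_id = 2, page_id = 201, page_no = 1, body = "the dog sat" },
}

-- Hypothetical helper emulating an equality query on a NUMERIC bin,
-- the kind of lookup the book_id and page_id secondary indexes support.
-- A real index avoids this full scan; here we simply filter the array.
local function query_equals(records, bin, value)
  local matches = {}
  for _, rec in ipairs(records) do
    if rec[bin] == value then
      matches[#matches + 1] = rec
    end
  end
  return matches
end

print(#query_equals(pages, "book_id", 1))   -- 2 (both pages of book 1)
print(#query_equals(pages, "page_id", 201)) -- 1 (a single page)
```

A query on book_id selects every page of a book, so the word count covers the whole book. A query on page_id selects one page.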
We will create a Stream UDF that will:
- Use the aggregate() operation to map words to counts for each page.
- Use the reduce() operation to combine the maps generated from aggregate() into a single map.
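The following is a Stream UDF named word_count():
function word_count(s)
  return s : aggregate(map(), page_words) : reduce(sum_words)
end
The word_count() function applies the aggregate() operation and then the reduce() operation to the stream s. Because page_words() and sum_words() are declared local, their definitions must appear above word_count() in the module file. Otherwise the names inside word_count() would refer to globals, which are nil.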
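The data flow of the chain `s : aggregate(...) : reduce(...)` can be imitated in plain Lua. The sketch below is not the Aerospike stream API. `emulate_aggregate()`, `emulate_reduce()` and `run_stream()` are hypothetical helpers. Each partition is assumed to be an array of record-like tables. The initial value is passed as a function so that every partition starts from a fresh accumulator, which is a choice made only for this emulation.

```lua
-- Plain-Lua emulation of `s : aggregate(init, fn) : reduce(fn)`.
-- NOT the Aerospike stream API; partitions are hypothetical arrays of
-- record-like tables.

-- aggregate(): fold every record in one partition into one accumulator.
local function emulate_aggregate(partition, init, agg_fn)
  local acc = init()
  for _, rec in ipairs(partition) do
    acc = agg_fn(acc, rec)
  end
  return acc
end

-- reduce(): combine the per-partition results pairwise into one value.
local function emulate_reduce(results, red_fn)
  local acc = results[1]
  for i = 2, #results do
    acc = red_fn(acc, results[i])
  end
  return acc
end

-- Run aggregate() on every partition, then reduce() the partial results.
local function run_stream(partitions, init, agg_fn, red_fn)
  local partials = {}
  for i, partition in ipairs(partitions) do
    partials[i] = emulate_aggregate(partition, init, agg_fn)
  end
  return emulate_reduce(partials, red_fn)
end

-- Toy usage: count records across two partitions.
local partitions = {
  { { page_id = 101 }, { page_id = 102 } },
  { { page_id = 201 } },
}
local total = run_stream(
  partitions,
  function() return 0 end,            -- initial value per partition
  function(n, rec) return n + 1 end,  -- aggregate: one more record
  function(a, b) return a + b end)    -- reduce: add partial counts
print(total)  -- 3
```

In word_count(), the same two stages run with map() as the initial value, page_words() as the aggregation function and sum_words() as the reduction function.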
First, we will use the aggregate() operation to build a mapping of words to the number of times each word appears. Each call to the aggregation function processes one page (record) and adds that page's words to the map.
In this program, we will use an empty map as the initial value and page_words() as the function for the aggregate() operation:
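local function page_words(words, rec)
  -- read the 'body' bin
  local body = rec['body']
  -- skip records that have no 'body' bin (string.gmatch would fail on nil)
  if body == nil then return words end
  -- iterate over each word (run of letters) in the body
  for word in string.gmatch(body, "%a+") do
    -- if words[word] is not nil, then use words[word], otherwise use 0 (zero)
    local count = words[word] or 0
    -- add 1 to the previous count
    words[word] = count + 1
  end
  -- return the updated words map; it becomes the next call's first argument
  return words
end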
You will notice that words is used as both the first argument and the return value of page_words(). We are essentially collecting a mapping of words to counts, which we will call a words map. The second argument is a record from the database. We know it is a record because aggregate() is the first operation on the stream, which is a stream of records.
The result of the aggregate() operation is a words map. If you have multiple nodes in a cluster, you will have multiple words maps. Each node may also produce several words maps, because aggregations are performed on partitions of data.
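The following plain-Lua sketch shows how several words maps arise. It applies the same counting logic as page_words() to two hypothetical partitions of pages. Plain tables stand in for the Aerospike map and for records, which is an assumption of this sketch. Each partition starts from its own empty table, just as aggregate(map(), page_words) starts from an empty map, so each partition ends with its own words map.

```lua
-- Same counting logic as page_words(); plain tables stand in for the
-- Aerospike map and for records (assumption for this sketch).
local function page_words(words, rec)
  local body = rec['body']
  for word in string.gmatch(body, "%a+") do
    words[word] = (words[word] or 0) + 1
  end
  return words
end

-- Hypothetical partitions of page records.
local partitions = {
  { { body = "the cat sat" }, { body = "the cat ran" } },
  { { body = "the dog sat" } },
}

-- Fold each partition into its own words map.
local partial_maps = {}
for i, partition in ipairs(partitions) do
  local words = {}  -- fresh accumulator, like map() in aggregate(map(), ...)
  for _, rec in ipairs(partition) do
    words = page_words(words, rec)
  end
  partial_maps[i] = words
end

-- partial_maps[1]: the = 2, cat = 2, sat = 1, ran = 1
-- partial_maps[2]: the = 1, dog = 1, sat = 1
```

The pattern "%a+" matches runs of letters and is case-sensitive, so "The" and "the" are counted as different words, and "don't" is split into "don" and "t". Applying string.lower() to the body before matching would merge the two spellings of "the", if that is the desired behavior.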
Next, we will use the reduce() operation to merge multiple words maps into a single words map.
In this program, we will use sum_words() as the function for the reduce() operation:
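-- math.sum is not a standard Lua function; add_values() adds two counts instead
local function add_values(a, b) return a + b end
local function sum_words(words1, words2)
  -- use map.merge() to merge two maps
  -- if the same name exists in both maps, then use the function to merge the values.
  return map.merge(words1, words2, add_values)
end
We combine the two maps using map.merge(), which takes two maps and a function. The function resolves conflicts when the same name exists in both maps: it receives the two values and returns the merged value. In this example, add_values() adds the two values, so the counts for a word that appears in both maps are summed.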
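The merge step can be checked outside the server with a plain-Lua stand-in. In this sketch, `merge()` is a hypothetical helper that imitates the documented behavior of map.merge(): every key from both maps is kept, and the supplied function resolves keys that appear in both. It is not Aerospike's implementation. Plain tables replace Aerospike maps, and `add_values()` is the same hypothetical adder used above.

```lua
-- Hypothetical plain-Lua stand-in for map.merge(m1, m2, fn): builds a new
-- table with every key from both inputs; when a key exists in both, fn
-- decides the merged value. Not the Aerospike implementation.
local function merge(m1, m2, fn)
  local out = {}
  for k, v in pairs(m1) do out[k] = v end
  for k, v in pairs(m2) do
    if out[k] ~= nil then
      out[k] = fn(out[k], v)  -- conflict: let fn combine the two values
    else
      out[k] = v
    end
  end
  return out
end

local function add_values(a, b) return a + b end

-- sum_words() written against the stand-in merge.
local function sum_words(words1, words2)
  return merge(words1, words2, add_values)
end

local total = sum_words(
  { the = 2, cat = 2, sat = 1, ran = 1 },
  { the = 1, dog = 1, sat = 1 })
-- total: the = 3, cat = 2, sat = 2, ran = 1, dog = 1
```

Because reduce() applies sum_words() to two maps at a time, any number of partial words maps can be folded into one final map by repeated pairwise merges.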