From Data Science at Scale with Python and Dask by Jesse C. Daniel. This article delves into building linear models using Dask-ML.
Get Data Science at Scale with Python and Dask for 37% off normal price by entering code fccdaniel into the discount code box at checkout at manning.com.
In this article, we'll use the “Amazon Fine Foods Reviews” dataset to train a sentiment classifier with Dask-ML that can determine whether a review is positive or negative without seeing the review score. The dataset can be found here.
Before we jump straight into model building, we’ll need to take care of a few things:
- We need to tag the reviews as positive or negative
- We need to convert the data into a format that our machine learning model understands
- Finally, we’ll need to set aside a small chunk of the data for testing and validating the accuracy of our model
First, the reviews need to be tagged as either positive or negative, based on the score the reviewer provided: if a review received more than 3 stars, we'll tag it as positive; otherwise, we'll tag it as negative. To recap, here's the code to do this:
Listing 1. Tagging the review data based on the review score
>>> import dask.bag as bag
>>> import os
>>> from dask.diagnostics import ProgressBar
>>> os.chdir('/Users/jesse/Documents')
>>> raw_data = bag.read_text('foods.txt') ❶
>>> def get_next_part(file, start_index, span_index=0, blocksize=1024): ❷
>>>     file.seek(start_index)
>>>     buffer = file.read(blocksize + span_index).decode('cp1252')
>>>     delimiter_position = buffer.find('\n\n')
>>>     if delimiter_position == -1:
>>>         return get_next_part(file, start_index, span_index + blocksize)
>>>     else:
>>>         file.seek(start_index)
>>>         return start_index, delimiter_position
>>> def get_item(filename, start_index, delimiter_position, encoding='cp1252'): ❸
>>>     with open(filename, 'rb') as file_handle:
>>>         file_handle.seek(start_index)
>>>         text = file_handle.read(delimiter_position).decode(encoding)
>>>         elements = text.strip().split('\n')
>>>         key_value_pairs = [(element.split(': ')[0], element.split(': ')[1])
>>>                            if len(element.split(': ')) > 1
>>>                            else ('unknown', element)
>>>                            for element in elements]
>>>         return dict(key_value_pairs)
>>> with open('foods.txt', 'rb') as file_handle: ❹
>>>     size = file_handle.seek(0,2) - 1
>>>     more_data = True
>>>     output = []
>>>     current_position = next_position = 0
>>>     while more_data:
>>>         if current_position >= size:
>>>             more_data = False
>>>         else:
>>>             current_position, next_position = get_next_part(file_handle, current_position, 0)
>>>             output.append((current_position, next_position))
>>>             current_position = current_position + next_position + 2
>>> reviews = bag.from_sequence(output).map(lambda x: get_item('foods.txt', x[0], x[1])) ❺
>>> def tag_positive_negative_by_score(element): ❻
>>>     if float(element['review/score']) > 3:
>>>         element['review/sentiment'] = 'positive'
>>>     else:
>>>         element['review/sentiment'] = 'negative'
>>>     return element
>>> tagged_reviews = reviews.map(tag_positive_negative_by_score)
❶ Open the raw text file for processing
❷ Helper function to find the next review based on the current byte position in the file handle
❸ Helper function to read the data and parse at a given byte position
❹ Create a list of byte ranges for each complete review object
❺ Turn the bag of byte ranges to a bag of review objects
❻ Tag each review as positive or negative using the review score
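As a quick sanity check before moving on, it can help to count how many reviews end up in each class. The snippet below is my own addition rather than one of the book's listings, and because it parses every review it triggers a full pass over the file:
>>> with ProgressBar():
>>>     sentiment_counts = tagged_reviews.pluck('review/sentiment').frequencies().compute()
>>> sentiment_counts
# Produces a list of (label, count) tuples, for example [('positive', ...), ('negative', ...)]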
Now that you've tagged the reviews, we need to turn the review text into a format that a machine learning algorithm can understand. A human intuitively understands that if someone calls a product “great,” that person probably feels positively about it. A computer, on the other hand, has no intrinsic grasp of what “great” means or how it relates to sentiment about a product. What a computer can do is search for patterns: were reviews that used the word “great” more likely to be positive than reviews that didn't? If so, we can say that the presence of the word “great” in a review makes it more likely to be positive. This is the idea behind one common way to transform text into a machine-readable format, called binary vectorization: we take a corpus, meaning the unique list of all words that appear in our review data, and for each review generate a vector of 1s and 0s, where a 1 indicates that a word is present and a 0 indicates that it is absent.
Figure 1. An example of binary vectorization
In Figure 1, you can see that words which appear in the raw text, such as “lots” and “fun,” are assigned a 1 in the binary vector, whereas words that don't appear in the raw text (but appear in other text samples) are marked with a 0. Once the text has been transformed with binary vectorization, we can use any standard classification algorithm, such as logistic regression, to find correlations between the presence of words and the sentiment. This, in turn, lets us build a model to classify reviews as positive or negative when we don't have a review score. Let's take a look at how to transform our raw reviews into an array of binary vectors.
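Before we start on the real data, here's a tiny, self-contained sketch of binary vectorization in NumPy. The corpus and tokens below are toy examples of my own; Listing 5 later applies the same np.isin and np.where pattern to the actual reviews:
>>> import numpy as np
>>> corpus = ['great', 'fun', 'lots', 'terrible', 'refund']
>>> review_tokens = ['lots', 'of', 'fun']
>>> np.where(np.isin(corpus, review_tokens), 1, 0)
# Produces the following output:
# array([0, 1, 1, 0, 0])
# 'fun' and 'lots' are present in the review, so their positions hold a 1; the other corpus words get a 0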
First, we'll apply a few transformations to tokenize the text and remove stopwords (note: make sure you have NLTK installed and its stopwords corpus downloaded):
Listing 2. Tokenize text and remove stopwords
>>> from nltk.corpus import stopwords
>>> from nltk.tokenize import RegexpTokenizer
>>> from functools import partial
>>> tokenizer = RegexpTokenizer(r'\w+')
>>> def extract_reviews(element): ❶
>>>     element['review/tokens'] = element['review/text'].lower()
>>>     return element
>>> def tokenize_reviews(element): ❷
>>>     element['review/tokens'] = tokenizer.tokenize(element['review/tokens'])
>>>     return element
>>> def filter_stopword(word, stopwords): ❸
>>>     return word not in stopwords
>>> def filter_stopwords(element, stopwords): ❹
>>>     element['review/tokens'] = list(filter(partial(filter_stopword, stopwords=stopwords), element['review/tokens']))
>>>     return element
>>> stopword_set = set(stopwords.words('english'))
>>> more_stopwords = {'br', 'amazon', 'com', 'http', 'www', 'href', 'gp'} ❺
>>> all_stopwords = stopword_set.union(more_stopwords)
>>> review_extracted_text = tagged_reviews.map(extract_reviews) ❻
>>> review_tokens = review_extracted_text.map(tokenize_reviews)
>>> review_text_clean = review_tokens.map(partial(filter_stopwords, stopwords=all_stopwords))
❶ Helper function to isolate the review text from each review and change all letters to lowercase
❷ Helper function to break long strings of text into individual words (tokens) using the NLTK tokenizer
❸ Filter function to check if a token is in the list of stopwords
❹ Helper function to drop all stopwords from each set of review tokens
❺ Add a few more stopwords to the base stopword collection
❻ Map the helper functions to the data, producing a cleaned set of tokens for each review
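Before counting anything, it's worth spot-checking the result. This snippet is my own addition: it pulls the cleaned tokens from a single review, and the exact tokens you see will depend on the first review in your copy of the data:
>>> with ProgressBar():
>>>     sample_tokens = review_text_clean.pluck('review/tokens').take(1)
>>> sample_tokens[0][:10]
# Shows the first ten cleaned tokens of the first review: lowercased, with stopwords removed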
With the cleaned and tokenized review data, let’s get a quick count of the number of unique words that show up in the reviews. To do this, we’ll use a few built-in functions from the Bag API:
Listing 3. Count the unique words in the Amazon Fine Foods review set
>>> def extract_tokens(element): ❶
>>>     return element['review/tokens']
>>> extracted_tokens = review_text_clean.map(extract_tokens)
>>> unique_tokens = extracted_tokens.flatten().distinct() ❷
>>> with ProgressBar():
>>>     number_of_tokens = unique_tokens.count().compute() ❸
>>> number_of_tokens
# Produces the following output:
# 114290
❶ Isolate the review tokens from each review
❷ Flatten the data so we have a single list of all (non-unique) tokens, then deduplicate the list with distinct
❸ Count the number of unique tokens
This code should look mostly familiar. The only noteworthy point is that the extracted tokens must be flattened in order to get a distinct list of all words: because the extract_tokens function returns a list of lists of strings, we need to use flatten to concatenate all the inner lists together before applying distinct. According to our code, there are 114,290 unique words across our 568,454 reviews. This means the array we'd produce with binary vectorization would be 568,454 rows by 114,290 columns, or 64.9 billion ones and zeros. At one byte per Boolean value (per NumPy's data sizes), that's roughly 65 GB of data. Dask is certainly up to the task of dealing with arrays that large, but we'll scale the exercise down a bit to make it easy to run this solution quickly. Instead of the entire corpus of 114,290 unique words, we'll use a corpus of the top 100 most frequently used words in the review dataset. If you'd like, you can modify the code to use a larger or smaller corpus, such as the top 1,000 or top 10 words, or even the entire corpus; all the code works regardless of the corpus size you select. Let's take a look at how to get the top 100 most common words in our corpus:
Listing 4. Find the top 100 most common words in the reviews dataset
>>> def count(accumulator, element): ❶
>>>     return accumulator + 1
>>> def combine(total_1, total_2): ❷
>>>     return total_1 + total_2
>>> with ProgressBar():
>>>     token_counts = extracted_tokens.flatten().foldby(lambda x: x, count, 0, combine, 0).compute() ❸
>>> top_tokens = sorted(token_counts, key=lambda x: x[1], reverse=True) ❹
>>> top_100_tokens = list(map(lambda x: x[0], top_tokens[:100])) ❺
❶ Helper function to add 1 to the counter for each instance of a given word found in the corpus
❷ Helper function to combine results for the same word across partitions
❸ Grouping the data by word and counting the occurrences using folding
❹ Sort the results descending by count
❺ Slice the list of words to the first 100 records, then produce a list of the words isolated from the counts
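Incidentally, the Bag API can produce the same answer with less code: frequencies counts each distinct element, and topk keeps the most common ones. The sketch below is my own shortcut, equivalent in spirit to Listing 4 but not the book's approach:
>>> with ProgressBar():
>>>     token_frequencies = extracted_tokens.flatten().frequencies().topk(100, key=lambda pair: pair[1]).compute()
>>> top_100_tokens_alt = [token for token, count in token_frequencies]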
In Listing 4, the count and combine functions are used by foldby to count the occurrences of each word in the corpus. The result of the fold is a list of tuples, where element 0 of each tuple is the word and element 1 is its number of occurrences. Python's built-in sorted function then sorts the tuples along element 1 (the frequency counts) in descending order. Finally, map peels the words out of the sorted tuples, returning a list of the top 100 most commonly used words. Now that we have our final corpus, we can apply binary vectorization across the review tokens by checking each review for the presence of each word in the corpus:
Listing 5. Generate training data by applying binary vectorization
>>> import numpy as np
>>> def vectorize_tokens(element): ❶
>>>     vectorized_tokens = np.where(np.isin(top_100_tokens, element['review/tokens']), 1, 0)
>>>     element['review/token_vector'] = vectorized_tokens
>>>     return element
>>> def prep_model_data(element): ❷
>>>     return {'target': 1 if element['review/sentiment'] == 'positive' else 0, 'features': element['review/token_vector']}
>>> model_data = review_text_clean.map(vectorize_tokens).map(prep_model_data) ❸
>>> model_data.take(5)
''' Produces the following output:
({'target': 1, 'features': array([1, 1, 0, 0, 0, 0, 1, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0])},
 ...
 {'target': 1, 'features': array([0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0])})
'''
❶ Compare the corpus to each review’s list of tokens using np.where; return 1 if the word exists in the list of tokens and 0 if not
❷ Change the positive/negative sentiment tag into a binary value; 1 represents positive and 0 represents negative
❸ Map both functions on the data to produce a dictionary with the target and feature vector for each review
The code in Listing 5 shows another good example of how we can mix other libraries like NumPy into Dask. Here, we use NumPy's where function to compare the list of words in the corpus to the list of tokens for each review, which results in a vector of 100 ones and zeros for each review, as you can see in the sample output. We also apply binary vectorization to the sentiment tag, which is what we want to predict, also known as our target. The code returns a bag of dictionaries, where each dictionary represents a review and holds its binarized values. We're getting close to building our model, but one important thing stands in the way: our data is still in a Bag, and it needs to be in an Array for Dask ML to read it. One way to convert a Bag to an Array is to first convert it to a DataFrame and then use the DataFrame's values attribute to access the underlying Array directly. We could do that here, but DataFrames don't tend to perform well with a large number of columns. Instead, we'll take the NumPy arrays we produced in the binary vectorization step and concatenate them together into one large Dask Array. Put another way, we'll reduce a list of arrays to a single array using concatenation. Figure 2 shows a visual representation of what we want to accomplish:
Figure 2. Vectorizing the raw data into a bag of arrays, then concatenating to a single array
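If the concatenation step feels abstract, here's a tiny sketch of what Figure 2 depicts, using toy data of my own: several one-row Dask Arrays combined into a single taller array, with one chunk per original row:
>>> import numpy as np
>>> from dask import array as dask_array
>>> rows = [dask_array.from_array(np.array([[i, i + 1, i + 2]]), chunks=(1, 3)) for i in range(4)]
>>> stacked = dask_array.concatenate(rows)
>>> stacked.shape
# (4, 3)
>>> stacked.compute()
# Materializes the four rows as a single 4x3 NumPy array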
Effectively, we’re building a Dask Array from scratch one row at a time. This is fairly quick and efficient, because Dask’s lazy evaluation means we’re largely dealing with metadata until we try to materialize the data in the final array. Let’s take a look at how to do this in code:
Listing 6. Creating the feature array
>>> from dask import array as dask_array
>>> def stacker(partition): ❶
>>>     return dask_array.concatenate([element for element in partition])
>>> with ProgressBar():
>>>     feature_arrays = model_data.pluck('features').map(lambda x: dask_array.from_array(x, 1000).reshape(1,-1)).reduction(perpartition=stacker, aggregate=stacker) ❷
>>>     feature_array = feature_arrays.compute()
>>> feature_array
# Produces the following output:
# dask.array<concatenate, shape=(568454, 100), dtype=int64, chunksize=(1, 100)>
❶ Partition is an iterable object that has to be materialized before passing to dask_array.concatenate
❷ Extract the features element from each dictionary, convert each NumPy array to a Dask Array object, then reduce all arrays together using concatenation
Listing 6 contains several new methods, so let's unpack them. First is the concatenate function from the Dask Array API, which combines a list of Dask Arrays into a single Dask Array. Because we ultimately want to combine each of the 568,454 vectors into one large array, this is exactly the function we want. Since the data is spread out across roughly 100 partitions, we first need to reduce each partition's list of arrays into a single array, then combine the 100 partition-level arrays into one final large array. This is done with the Bag's reduction method, which works slightly differently from map in that the function passed to it receives an entire partition rather than a single element. After the from_array function is mapped over each element, each partition is a lazy list of Dask Array objects, which is exactly the input dask_array.concatenate wants. However, the partition object passed into our stacker function happens to be a generator, which dask_array.concatenate can't cope with, so we materialize it into a list using a list comprehension. You might think this is counterproductive because materializing the partition into a list would bring the data along with it, but the partition is a list of lazy Dask Array objects, so the only things that get shuttled around are some metadata and the DAG tracking the computation so far. We can see that we got the result we want because the new Array's shape states it's 568,454 rows by 100 columns. The shape of the feature array can be seen in Figure 3.
Figure 3. The shape of the feature array
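The reduction call is probably the least familiar piece of Listing 6, so here's a toy sketch of its contract using my own example data (unrelated to the reviews): perpartition receives an entire partition, and aggregate combines the per-partition results:
>>> import dask.bag as bag
>>> numbers = bag.from_sequence(range(10), npartitions=2)
>>> def sum_partition(partition):
>>>     return sum(partition)
>>> def sum_partials(partials):
>>>     return sum(partials)
>>> numbers.reduction(perpartition=sum_partition, aggregate=sum_partials).compute()
# Produces the following output:
# 45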
Because we've done a lot to the data already, now is an opportune time to save our progress. Writing out the data before we train the model also speeds things up, because the data is already in the shape needed to build the model. The Array API contains a method to write Dask Arrays to disk using the ZARR format, a chunked, compressed array storage format that plays a role for arrays similar to the one Parquet plays for DataFrames. The specifics of the file format aren't important here; we're using ZARR simply because the Array API makes it easy to read and write. We'll quickly dump the prepared data to disk and read it back in for fast access:
Listing 7. Writing the data to ZARR and reading it back in
>>> with ProgressBar():
>>>     feature_array.rechunk(5000).to_zarr('sentiment_feature_array.zarr')
>>>     feature_array = dask_array.from_zarr('sentiment_feature_array.zarr')
>>> with ProgressBar():
>>>     target_arrays = model_data.pluck('target').map(lambda x: dask_array.from_array(x, 1000).reshape(-1,1)).reduction(perpartition=stacker, aggregate=stacker)
>>>     target_arrays.compute().rechunk(5000).to_zarr('sentiment_target_array.zarr')
>>>     target_array = dask_array.from_zarr('sentiment_target_array.zarr')
Listing 7 is straightforward: because the concatenation in Listing 6 already put the feature array into the shape we want, we only need to save it, and we reuse the same concatenation code to build and save the target array. The only new item worth pointing out is the decision to rechunk the data. You might have noticed that after the concatenation the array had a chunk size of (1, 100), meaning each chunk contains one row and 100 columns. The ZARR format writes one file per chunk, so we'd produce 568,454 individual files if we didn't rechunk the data. This is extremely inefficient because of the overhead involved in getting data off of a disk, whether we're running Dask in local mode or on a large cluster. Typically, we want each chunk to be somewhere between 10 MB and 1 GB to minimize that IO overhead. I've selected a chunk size of 5,000 rows per chunk in this example, which leaves us with a little over 100 files, similar to the roughly 100 partitions the raw data was broken into. Now we're finally ready to build our model!
We’ll start by using an algorithm built in to Dask ML’s API: logistic regression. Logistic regression is an algorithm that can be used to predict binary (yes or no, good or bad, etc.) outcomes. This perfectly fits our desire to build a model to predict the sentiment of a review, because sentiment is discrete: it’s either positive or negative. But how can we know how good our model is at predicting sentiment? Or, put another way, how can we be sure that our model has learned some useful patterns from the data? To do that, we’ll want to set aside some reviews that the algorithm isn’t allowed to look at and learn from. This is called a holdout set or a test set. If the model does a good job predicting the outcomes of the holdout set, we can be reasonably confident that the model has learned useful patterns that generalize to our problem well. Otherwise, if the model doesn’t perform well on the holdout set, it’s likely due to the algorithm picking up on strong patterns that are unique to the data that it was trained on. This is called overfitting to the training set and should be avoided. Dask ML, like Scikit-learn, has some tools to help randomly select a holdout set that we can use for validation. Let’s take a look at how to split the data and build a logistic regression model:
Listing 8. Building the logistic regression
>>> from dask_ml.linear_model import LogisticRegression
>>> from dask_ml.model_selection import train_test_split
>>> X = feature_array
>>> y = target_array.flatten()
>>> X_train, X_test, y_train, y_test = train_test_split(X, y, random_state=42) ❶
>>> lr = LogisticRegression()
>>> with ProgressBar():
>>>     lr.fit(X_train, y_train) ❷
❶ The train_test_split function divides the data into two pieces at random; by default this is a 90/10 split, where 90% of the data goes into the training set and 10% into the test set
❷ The fit method is not lazy, so we wrap it in a ProgressBar context to monitor execution
Now that we've done all the hard work of data prep, building the model in Listing 8 is relatively easy. The train_test_split function randomly splits off a holdout set for us; then it's as simple as feeding the features (X) and targets (y) to the fit method of the LogisticRegression object. It's worth mentioning that we set the random_state parameter of train_test_split to 42, and you may be wondering why. The particular value of this parameter doesn't matter; what matters is that you set it. Doing so ensures the data is split the same way every time train_test_split is called on the dataset, which is important when you're running and comparing many models against each other. Because of inherent variability in the data, you could, by random chance, end up testing on an unusually easy or unusually difficult holdout set, and any improvement (or worsening) you'd witness wouldn't be because of anything you did to the model. Therefore, we want to make sure the data is “randomly” split the same way every time the model is built. After a few minutes, the model will be trained and ready to make predictions, and then it's time to score it to see how well it predicts reviews it hasn't seen before.
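As a preview of that scoring step, here's a sketch of my own: it calls the model's score method on the holdout set. Depending on your version of Dask-ML, the result may come back as a lazy Dask object that needs an explicit compute call:
>>> with ProgressBar():
>>>     accuracy = lr.score(X_test, y_test)
>>> accuracy
# On some Dask-ML versions this is a lazy Dask scalar; call accuracy.compute() to materialize it
# The value is the proportion of held-out reviews whose sentiment was predicted correctly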
That’s all for now. If you want to learn more about the book, check it out on liveBook here and see this slide deck.