Using Hadoop and Mahout to cluster and analyse our customer data (Part 1)

The problems

  • We manage quite a bit of customer data, starting from the beginning of a customer’s search for a new insurance policy, all the way until they buy (or don’t buy) the policy. We keep all of this data, but for the most part, we don’t do anything with it to improve our customer offering.
  • Our site looks exactly the same for every customer – we don’t try to engage with them on a more personal level. No customisation exists, which means that each customer’s experience doesn’t adapt to his or her personality, specific trade, or home or business location. Nothing at all.
  • Filling out a long form is always boring. But filling it out while being unsure of what information to put where, and being forced to make a phone call to confirm details, is even more boring. In many cases, it could mean the customer just gets bored and leaves our form.

The idea

We wanted to create and test a solution that allowed us to group together similar customers using different sets of dimensions depending on the information we wanted to provide or obtain. We thought about introducing clustering technology and algorithms to group our customers.

This would be a very rough implementation that would allow us to prove certain techniques and solutions for this type of problem; it certainly would NOT cover all the nuances that machine learning algorithms and analysis carry with them. Many liberties were taken to get to a proof of concept. The code presented here is not 100% the same code used in the spike, but it forms a very accurate approximation.

This post covers the implementation of the solution - a follow-up post will cover the data analysis portion!

Solution

Setting up the Clustering backend algorithms to allow multidimensional clustering.
  • I had already decided that I would put into practice my knowledge of Mahout and Hadoop to run the clustering processing. I installed Hadoop using my own recipes from hadoop-vagrant, first to run on a local Vagrant cluster and then on an AWS cluster.
  • Hadoop is a framework for processing certain types of tasks in a distributed environment built from commodity machines, which allows it to scale horizontally to a massive degree. Its main components are the map-reduce execution framework and the HDFS distributed filesystem. For more details, check out my blog post.
Getting the data:
  • After Hadoop was installed, the first task was to find and extract the data. The data was stored in a SQL Server database, so we needed to fetch it and put it into HDFS. There is a fantastic tool called Sqoop that’s built just for this. Sqoop not only allows you to get data ready for HDFS, it actually uses Hadoop itself to parallelize the extraction of the data. The standard way to run Sqoop is as follows:

~~~
sqoop import --driver com.microsoft.sqlserver.jdbc.SQLServerDriver \
  --connect "jdbc:sqlserver://xxx:1433;username=xxx;password=xxx;databaseName=xxx" \
  --query "select xxx from xxxx" \
  --split-by customer._id \
  --fetch-size 20 \
  --delete-target-dir \
  --target-dir aggregated_customers \
  --package-name "clustercustomers.sqoop" \
  --null-string '' \
  --fields-terminated-by ','
~~~

The previous command generates the Hadoop-compatible files that will be used in the subsequent analysis. The most important part is the query that you use to extract the data. In our case, in the first iteration, we extracted information like trade, vertical, claims, years_insured, and turnover. These values are the dimensions that we will use to group our “similar” customers.

K-Means Clustering.

I have read quite a bit about different machine learning techniques and algorithms. I have developed a bit with them in the past, particularly in the recommendation area. The first thing to decide with a Machine Learning problem is what exactly I want to achieve. First, let’s look at the three main problems that Machine Learning solves, and then follow the reasoning behind my choices.

Machine Learning algorithms in Mahout can be broadly categorized into three main areas:

  • Recommendation Algorithms: Try to make an informed guess about what things you might like out of a large domain of things. In the simplest and most common form, the inference is done based on similarity. This similarity could be based on items that you’ve already said you like, or similarity with other users that happen to like the same items as you.
    • Assume we have a database of movies, and say you like Lethal Weapon.
      • Item-Based similarity:
        • recommendations for movies similar to Lethal Weapon.
      • User-Based similarity:
        • recommendations for movies that other people who liked Lethal Weapon liked as well
  • Classification Algorithms: These belong to the family of Supervised Learning algorithms (supervised because the set of possible outcomes and categories is known beforehand). Classification algorithms allow you to assign an item to a particular category given a set of known characteristics (where the category belongs to a limited set of options).
    • This technique is used in Spam detection systems.
      • Let’s say you decide that any email with at least two of the following characteristics should be marked as Spam: 4 or more images, 4 words written in all-capital letters, and the text ‘congratulations’ with an exclamation mark at the end. Anything with fewer than two of these is not Spam.
      • The Classification system will be built upon these characteristics and rules. It knows that any incoming email that matches these rules will belong to the corresponding category.
  • Clustering Algorithms: These belong to the Unsupervised Learning family because there is no predetermined set of possible answers. Clustering algorithms are simply given a set of inputs with dimensions. The algorithm itself works out how to organize and group the data into individual clusters.

Given the previous definitions, it was very clear to me that what we needed was a Clustering solution because I didn’t have any idea how the data was supposed to be organized. I wanted the system to figure out the clustering and return a set of groups containing similar customers in each of them.

I selected K-Means as the clustering algorithm I wanted to use. I have some familiarity with it, and it’s the most common clustering algorithm both in practice and in the literature.

To quickly explain K-Means: given N elements and a target number of clusters K, it picks K centroid points (initially at random) and then iterates up to X times. In each iteration, every element is assigned to the cluster of its closest centroid, and each centroid is then recalculated from the elements assigned to it. A better explanation is here.

From the previous explanation, you can see that K-Means expects the number of clusters K as an input. However, I had no idea at all of what a good number of clusters would be. In Mahout, you can combine K-Means with another clustering algorithm named Canopy. Canopy is capable of finding a first set of K centroids that can then be fed into K-Means.

The way Canopy works is roughly the following: Instead of being given a K for the total number of clusters, you provide Canopy with a measure of the size that you expect each cluster to have. This allows you to get different sized clusters on different runs of the algorithm (e.g. if you want to cluster people by a wider or narrower geographical location). The algorithm works by using a distance measure (most machine learning algorithms use one to find similarities) and a couple of threshold values T1 and T2 (T1 > T2). Looping through all the points in the dataset, the algorithm takes each point and compares its distance to the center of each already created Canopy against T1 and T2. If the point is within T2 of a Canopy, it is assigned to that existing Canopy. If it is within T1, but not T2, it is added to the Canopy but is still allowed to be part of another Canopy. If it is not within T2 of any existing Canopy, it is used as the center of a new Canopy. At the end, after all Canopies are created, the centroid is calculated for each of them, and these are the centroids used for the next step with K-Means.
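The Canopy pass described above can be sketched in plain Java, without Hadoop or Mahout. This is only an illustration under stated assumptions: the class `CanopySketch`, its methods and the sample points are made up for this post, it uses an unweighted Euclidean distance, and it starts a new Canopy whenever a point is not within T2 of any existing center. Mahout's own implementation is distributed and may differ in the details.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class CanopySketch {

    /** A canopy: the point that started it plus every point seen within T1 of it. */
    static class Canopy {
        final double[] center;
        final List<double[]> members = new ArrayList<>();

        Canopy(double[] center) {
            this.center = center;
            members.add(center);
        }

        /** Mean of all member points; this is what would seed K-Means. */
        double[] centroid() {
            double[] mean = new double[center.length];
            for (double[] p : members) {
                for (int i = 0; i < mean.length; i++) {
                    mean[i] += p[i] / members.size();
                }
            }
            return mean;
        }
    }

    static double euclidean(double[] a, double[] b) {
        double sum = 0;
        for (int i = 0; i < a.length; i++) {
            sum += (a[i] - b[i]) * (a[i] - b[i]);
        }
        return Math.sqrt(sum);
    }

    /** Single pass over the points; t1 is the loose threshold, t2 the tight one. */
    static List<Canopy> buildCanopies(List<double[]> points, double t1, double t2) {
        if (t1 <= t2) {
            throw new IllegalArgumentException("T1 must be greater than T2");
        }
        List<Canopy> canopies = new ArrayList<>();
        for (double[] point : points) {
            boolean tightlyBound = false;
            for (Canopy canopy : canopies) {
                double d = euclidean(canopy.center, point);
                if (d < t1) {
                    canopy.members.add(point); // inside T1: counts towards this canopy
                }
                if (d < t2) {
                    tightlyBound = true;       // inside T2: will not start a new canopy
                }
            }
            if (!tightlyBound) {
                canopies.add(new Canopy(point));
            }
        }
        return canopies;
    }

    public static void main(String[] args) {
        // Hypothetical 2D points forming two obvious groups.
        List<double[]> points = Arrays.asList(
                new double[]{0, 0}, new double[]{1, 0}, new double[]{0, 1},
                new double[]{10, 10}, new double[]{11, 10});
        for (Canopy canopy : buildCanopies(points, 4.0, 2.0)) {
            System.out.println(Arrays.toString(canopy.centroid()));
        }
    }
}
```

Note that nobody tells the sketch how many groups to find: with T1 = 4.0 and T2 = 2.0 it ends up with two Canopies for this data, and widening or narrowing the thresholds changes that number.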

Deciding on and weighting the dimensions

For K-Means, Canopy and most other Machine Learning algorithms, deciding whether a particular item belongs to a group, or what to recommend, is based on a measure of distance. To be able to measure the distance between two items (customers, in our use case), their values need to be converted into a form that allows them to be compared and measured. This means that we need to convert our values, whatever they are, to a numeric representation that allows us to use traditional distance measures to compare them.

The dimensions I planned on using for this first iteration were:

  • product
  • trade
  • turnover
  • employees
  • claims

Out of these dimensions, only 2 were already numbers (turnover and claims); the other 3 were text values. Even trickier, of those only employees had directly comparable values (“less than 5 employees” can be ordered against “more than 100 employees”), while the other 2 were discrete, disjoint values (product “business” is not directly comparable to product “shop”).

For the case of employees, I converted the values to consecutive numbers like (values are made up for this example):

  • “less than 15 employees” -> 1
  • “between 15 and 50 employees” -> 2
  • “between 50 and 200 employees” -> 3

For the two discrete properties, product and trade, I had to create an individual dimension for each of the discrete values they can take. As in my example I was only going to use Baker and Accountant for trades and Shop and Business for product, the final dimensions Vector ended up something like:

| shop | business | accountant | baker | turnover | employees | claims |

So let’s say we wanted to model an accountant buying the business product, with a turnover of 50000, 20 employees and 2 claims. His vector would look like:

| 0 | 1 | 1 | 0 | 50000 | 2 | 2 |
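As a rough sketch of that conversion in plain Java (not the code used in the spike): the class `CustomerVectorSketch`, its method names, and the exact boundaries of the employee bands are assumptions made for this example. It one-hot encodes product and trade, maps employees to the consecutive band numbers, and leaves turnover and claims as they are.

```java
import java.util.Arrays;
import java.util.List;

public class CustomerVectorSketch {

    // Order matches the dimensions row: shop | business | accountant | baker | ...
    static final List<String> PRODUCTS = Arrays.asList("shop", "business");
    static final List<String> TRADES = Arrays.asList("accountant", "baker");

    /** One-hot encoding: 1 in the slot of the matching category, 0 elsewhere. */
    static double[] oneHot(List<String> categories, String value) {
        int index = categories.indexOf(value);
        if (index < 0) {
            throw new IllegalArgumentException("Unknown category: " + value);
        }
        double[] encoded = new double[categories.size()];
        encoded[index] = 1.0;
        return encoded;
    }

    /** Ordinal encoding of the made-up employee bands; band edges are an assumption. */
    static double employeeBand(int employees) {
        if (employees < 15) {
            return 1;
        }
        if (employees < 50) {
            return 2;
        }
        return 3;
    }

    static double[] toVector(String product, String trade, double turnover,
                             int employees, int claims) {
        double[] p = oneHot(PRODUCTS, product);
        double[] t = oneHot(TRADES, trade);
        return new double[]{p[0], p[1], t[0], t[1], turnover, employeeBand(employees), claims};
    }

    public static void main(String[] args) {
        double[] accountant = toVector("business", "accountant", 50000, 20, 2);
        // prints [0.0, 1.0, 1.0, 0.0, 50000.0, 2.0, 2.0]
        System.out.println(Arrays.toString(accountant));
    }
}
```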

We can already see a problem with this vector: the turnover value is much larger than the rest of the dimensions. This means that the distance calculation will be dominated by this value; we say it has a much bigger weight than the rest. For the example, we assume a maximum turnover of 100000.

In our case, we want to give extra weight to the product and trade dimensions and make turnover much less significant.

Mahout offers functionality for doing just that through implementations of the class WeightedDistanceMeasure. It works by building a Vector with a multiplier for each of the dimensions of the original vector, so this weights vector needs to be the same size as the dimensions vector. In our case, we could have a vector like this:

| 10 | 10 | 5 | 5 | 1/100000 | 1/2 | 1/10 |

The effect of that Vector will be to alter the values of the original by multiplying the product by 10, the trade by 5, making sure that turnover is always less than 1, halving the influence of number of employees and making claims less influential.
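To see the effect with numbers, here is a small stand-alone sketch. The class `WeightedDistanceSketch` and the three customers are hypothetical. It follows the description above literally, scaling each dimension by its weight before taking the Euclidean distance; Mahout's WeightedEuclideanDistanceMeasure may apply the weights differently (for instance to the squared differences), so treat the figures as an illustration of the idea rather than Mahout's exact output.

```java
public class WeightedDistanceSketch {

    // shop | business | accountant | baker | turnover | employees | claims
    static final double[] WEIGHTS = {10, 10, 5, 5, 1.0 / 100000, 1.0 / 2, 1.0 / 10};

    /** Euclidean distance after scaling each dimension by its weight (null = unweighted). */
    static double distance(double[] a, double[] b, double[] weights) {
        double sum = 0;
        for (int i = 0; i < a.length; i++) {
            double w = (weights == null) ? 1.0 : weights[i];
            double delta = w * (a[i] - b[i]);
            sum += delta * delta;
        }
        return Math.sqrt(sum);
    }

    public static void main(String[] args) {
        double[] accountant = {0, 1, 1, 0, 50000, 2, 2};
        double[] biggerAccountant = {0, 1, 1, 0, 90000, 2, 2}; // same profile, higher turnover
        double[] baker = {1, 0, 0, 1, 50000, 2, 2};            // different product and trade

        // Unweighted: turnover dominates, so the baker looks far closer (2 vs 40000).
        System.out.println(distance(accountant, biggerAccountant, null));
        System.out.println(distance(accountant, baker, null));

        // Weighted: the two accountants are now close (about 0.4) and the baker
        // is far away (about 15.8), which is the grouping we are after.
        System.out.println(distance(accountant, biggerAccountant, WEIGHTS));
        System.out.println(distance(accountant, baker, WEIGHTS));
    }
}
```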

NOTE: Finding the correct dimensions and weights for a clustering algorithm is a really hard exercise which normally requires multiple iterations to find the “best” solution. Our example, in keeping with the spike approach for this hackathon, uses completely arbitrary values chosen just to prove the technique, not carefully crafted normalizations of the data. If these values are good enough for our examples, then they are good enough.

Following are the main parts of the raw code written to convert the initial data to a list of vectors:

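import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.mahout.math.DenseVector;
import org.apache.mahout.math.NamedVector;
import org.apache.mahout.math.VectorWritable;

// Helper methods (vectorForVertical, vectorForTrade, vectorForDouble, concatArrays)
// and the Configurations class are omitted from this listing.
public class VectorCreationMapReduce extends Configured implements Tool {

  public static class VectorizerMapper extends Mapper<LongWritable, Text, Text, VectorWritable> {

    @Override
    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

        VectorWritable writer = new VectorWritable();
        System.out.println(value.toString()); // debug output of every input record
        // Each input line is one customer; fields are separated by '|'.
        String[] values = value.toString().split("\\|");
        double[] verticals = vectorForVertical(values[1]);
        double[] trade = vectorForTrade(values[2]);
        double[] turnover = vectorForDouble(values[3]);
        double[] claimCount = vectorForDouble(values[4]);
        double[] xCoordinate = vectorForDouble(values[7]);
        double[] yCoordinate = vectorForDouble(values[8]);
        // The vector is named after the customer id (values[0]) so it can be traced back later.
        NamedVector vector = new NamedVector(new DenseVector(concatArrays(verticals, trade, turnover, claimCount, xCoordinate, yCoordinate)), values[0]);
        writer.set(vector);
        context.write(new Text(values[0]), writer);
    }

  }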

public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    int res = ToolRunner.run(conf, new VectorCreationMapReduce(), args);
    System.exit(res);
}

  @Override
  public int run(String[] strings) throws Exception {
    Configuration conf = super.getConf();
    conf.set("fs.default.name", "hdfs://"+ Configurations.HADOOP_MASTER_IP+":9000/");
    conf.set("mapred.job.tracker", Configurations.HADOOP_MASTER_IP+":9001");
    Job job = new Job(conf, "customer_to_vector_mapreduce");
    job.setJarByClass(VectorCreationMapReduce.class);
    job.setMapperClass(VectorizerMapper.class);
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(VectorWritable.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(VectorWritable.class);
    job.setOutputFormatClass(SequenceFileOutputFormat.class);
    job.setNumReduceTasks(0);
    FileInputFormat.addInputPaths(job, "aggregated_customers_with_coordinates");
    FileOutputFormat.setOutputPath(job, new Path("vector_seq_file"));
    return job.waitForCompletion(true) ? 0 : 1;
  }
}

The code is a Hadoop map-reduce job (with only a map phase) that takes the input from the Sqoop exported file and creates a NamedVector with the dimension values. The Vector classes used are Mahout-provided classes for use within Hadoop.

The next step was to run the actual Canopy algorithm to find the K centroids that we are going to feed to the K-Means algorithm.

This is run something like:

./hadoop org.apache.mahout.clustering.canopy.CanopyDriver  -i /vector_seq_file/part-m-00000 -o customer-centroids -dm clustercustomers.mahout.CustomWeightedEuclideanDistanceMeasure -t1 4.0 -t2 2.0

The previous command specifies the Mahout class that contains the Canopy Hadoop job. It specifies an input file from HDFS, which is the output of the previous vectorization process, and an output location customer-centroids where the generated centroid vectors will be written. The -t1 and -t2 options set the T1 and T2 thresholds described earlier. We also specify that we want to use a Euclidean distance measure with weighting, which is defined by the custom class we see below:

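public class CustomWeightedEuclideanDistanceMeasure extends WeightedEuclideanDistanceMeasure {

  // Use double literals: in Java, 1/2 is integer division and evaluates to 0.
  // The turnover weight uses the 1/100000 value from the weights row shown earlier.
  public static final Vector WEIGHTS = new DenseVector(new double[]{10, 10, 5, 5, 1.0 / 100000, 1.0 / 2, 1.0 / 10});

  public CustomWeightedEuclideanDistanceMeasure() {
      super();
      setWeights(WEIGHTS);
  }
}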

This class simply extends the Mahout provided WeightedEuclideanDistanceMeasure and sets the custom weight vector that we mentioned above. This will make sure that when the algorithm runs, the weighting will be applied to all the vectors from the input.

Now that we have generated our K centroids, it is time to run the actual K-Means clustering algorithm. This is also very simple to run by using the Mahout provided classes:

 hadoop org.apache.mahout.clustering.kmeans.KMeansDriver  -i vector_seq_file/part-m-00000 -c customer-centroids/clusters-0-final -o customer-kmeans -dm clustercustomers.mahout.CustomWeightedEuclideanDistanceMeasure -x 10 -ow --clustering

In this command, we are specifying that we want to run the KMeansDriver Hadoop job. We pass in the input vector file again, and we specify the centroids file generated by Canopy with the -c option. We then specify where the clustering output should go (-o), the same weighted distance measure (-dm), and the maximum number of iterations we want to do on the data (-x). The -ow flag overwrites any existing output, and --clustering assigns the input points to the final clusters once the iterations have finished.

Here’s a quick overview of how K-Means actually works:

The K-Means Clustering algorithm starts with the given set of K centroids and keeps adjusting them until the iteration limit X is reached or until the centroids converge and stop moving. Each iteration works the following way:

  • For each point in the input, it finds the nearest centroid and assigns the point to the cluster represented by that centroid.
  • At the end of the iteration, the points in each cluster are averaged to recalculate the new centroid position.
  • If the maximum number of iterations is reached, or the centroids don’t move any more, the clustering concludes.
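Those steps fit in a few lines of single-machine Java. This is a minimal sketch and not Mahout's implementation: the class `KMeansSketch` and the sample data are invented for the example, it uses plain squared Euclidean distance, and it treats "the centroids did not move at all" as convergence, where a real implementation would normally compare the movement against a small threshold.

```java
import java.util.Arrays;

public class KMeansSketch {

    static double squaredDistance(double[] a, double[] b) {
        double sum = 0;
        for (int i = 0; i < a.length; i++) {
            sum += (a[i] - b[i]) * (a[i] - b[i]);
        }
        return sum;
    }

    /** Index of the centroid closest to the given point. */
    static int nearest(double[] point, double[][] centroids) {
        int best = 0;
        for (int k = 1; k < centroids.length; k++) {
            if (squaredDistance(point, centroids[k]) < squaredDistance(point, centroids[best])) {
                best = k;
            }
        }
        return best;
    }

    /** Runs K-Means from the given starting centroids and returns the final ones. */
    static double[][] cluster(double[][] points, double[][] initialCentroids, int maxIterations) {
        int dims = points[0].length;
        double[][] centroids = new double[initialCentroids.length][];
        for (int k = 0; k < centroids.length; k++) {
            centroids[k] = initialCentroids[k].clone();
        }
        for (int iteration = 0; iteration < maxIterations; iteration++) {
            double[][] sums = new double[centroids.length][dims];
            int[] counts = new int[centroids.length];
            // Step 1: assign every point to its nearest centroid.
            for (double[] point : points) {
                int k = nearest(point, centroids);
                counts[k]++;
                for (int d = 0; d < dims; d++) {
                    sums[k][d] += point[d];
                }
            }
            // Step 2: move each centroid to the average of its assigned points.
            boolean moved = false;
            for (int k = 0; k < centroids.length; k++) {
                if (counts[k] == 0) {
                    continue; // an empty cluster keeps its previous centroid
                }
                for (int d = 0; d < dims; d++) {
                    double mean = sums[k][d] / counts[k];
                    if (mean != centroids[k][d]) {
                        moved = true;
                    }
                    centroids[k][d] = mean;
                }
            }
            // Step 3: stop early once nothing moves any more.
            if (!moved) {
                break;
            }
        }
        return centroids;
    }

    public static void main(String[] args) {
        double[][] points = {{0, 0}, {1, 0}, {0, 1}, {10, 10}, {11, 10}};
        double[][] seeds = {{0, 0}, {10, 10}}; // in the post these come from Canopy
        System.out.println(Arrays.deepToString(cluster(points, seeds, 10)));
    }
}
```

The assignment step only needs the current centroids, and the averaging step only needs per-cluster sums and counts, which is what makes the algorithm a natural fit for a map phase followed by a reduce phase.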

K-Means (and Canopy as well) are parallelizable algorithms, meaning that you can have many jobs each working on a subset of the problem and then aggregate the results. This is where Hadoop comes in for the clustering execution. Internally, Mahout, and in particular KMeansDriver, is built to work on the Hadoop Map-Reduce infrastructure. By leveraging Hadoop’s proven map-reduce implementation, Mahout algorithms are able to scale to very big data sets and process them in parallel.

After generating the clusters, the next step is to write out an individual file per cluster as well as a single cluster file, using the simple format (cluster_id, customer_id).

This is done with the following map and reduce methods:

 public static class ClusterPassThroughMapper extends Mapper<IntWritable, WeightedVectorWritable, IntWritable, Text> {
    public void map(IntWritable key, WeightedVectorWritable value, Context context) throws IOException, InterruptedException {
      NamedVector vector = (NamedVector) value.getVector();
      context.write(key,new Text(vector.getName()));
    }
}

public static class ClusterPointsToIndividualFile extends Reducer<IntWritable, Text, IntWritable, Text> {
    private MultipleOutputs<IntWritable, Text> mos;

    @Override
    public void setup(Context context) {
        mos = new MultipleOutputs<IntWritable, Text>(context);
    }

    @Override
    public void reduce(IntWritable key, Iterable<Text> value, Context context) throws IOException, InterruptedException {
        for (Text text : value) {
            // One file per cluster, named after the cluster id...
            mos.write("seq", key, text, "cluster-" + key.toString());
            // ...plus the combined (cluster_id, customer_id) output.
            context.write(key, text);
        }
    }

    @Override
    public void cleanup(Context context) throws IOException, InterruptedException {
        mos.close();
    }
}

This has allowed us to obtain clusters of customers - next week’s post will explore what can be done with these clusters!