We wanted to create and test a solution that allowed us to group together similar customers using different sets of dimensions depending on the information we wanted to provide or obtain. We thought about introducing clustering technology and algorithms to group our customers.
This would be a very rough implementation that would allow us to prove certain techniques and solutions for this type of problem -- it certainly would NOT cover all the nuances that machine learning algorithms and analysis carry with them. Many liberties were taken to get to a proof of concept. The code presented here is not 100% the same code used in the spike, but it is a very close approximation.
This post covers the implementation of the solution - a follow-up post will cover the data analysis portion!
Hadoop is a framework that allows the processing of certain types of tasks in a distributed environment using commodity machines, which allows it to massively scale horizontally. Its main components are the map-reduce execution framework and the HDFS distributed filesystem. For more details, check out my blog post.
To get the customer data out of our SQL Server database and into HDFS we used Sqoop, with a command along these lines:
sqoop import --driver com.microsoft.sqlserver.jdbc.SQLServerDriver --connect "jdbc:sqlserver://xxx:1433;username=xxx;password=xxx;databaseName=xxx" --query "select xxx from xxxx" --split-by customer._id --fetch-size 20 --delete-target-dir --target-dir aggregated_customers --package-name "clustercustomers.sqoop" --null-string '' --fields-terminated-by ','
The previous command generates the required Hadoop-compatible files that will be used in the subsequent analysis. The most important part is the query that you want to use to extract the data. In our case, in the first iteration, we extracted information like trade, vertical, claims, years_insured, and turnover. These values are the dimensions that we will use to group our "similar" customers.
I have read quite a bit about different machine learning techniques and algorithms, and I have worked with some of them in the past, particularly in the recommendation area. The first thing to decide with a Machine Learning problem is what exactly you want to achieve. First, let's look at the three main problems that Machine Learning solves, and then follow the reasoning behind my choices.
Machine Learning algorithms in Mahout can be broadly categorized in three main areas: recommendation (suggesting new items to a user based on their past behaviour and the behaviour of similar users), classification (assigning items to one of a set of predefined categories, learned from previously labelled examples), and clustering (grouping similar items together without any predefined categories).
Given the previous definitions, it was very clear to me that what we needed was a Clustering solution because I didn't have any idea how the data was supposed to be organized. I wanted the system to figure out the clustering and return a set of groups containing similar customers in each of them.
I selected K-Means clustering as the clustering algorithm I wanted to use. I have some familiarity with it, and it is the most commonly used clustering algorithm in practice and in the literature.
To quickly explain K-Means: given a set of N elements and a desired number of clusters K, it picks K centroid points (initially at random) and iterates up to X times. In each iteration, every element of N is assigned to the cluster of its closest centroid, and the centroids are then recalculated from the elements assigned to them. A better explanation is here.
From the previous explanation, you can see that K-Means expects the number of clusters K as an input. However, I had no idea at all what a good number of clusters would be. In Mahout, you can combine K-Means with another clustering algorithm called Canopy, which is capable of finding a first set of K centroids that can then be fed into K-Means.
The way Canopy works is roughly the following: instead of being given a K for the total number of clusters, you provide Canopy with a measure of the size that you expect each cluster to have. This allows you to get different sized clusters on different runs of the algorithm (i.e. if you want to cluster people by a wider or narrower geographical location). The algorithm works by using a distance measure (most machine learning algorithms use one to find similarities) and a couple of threshold values T1 and T2, where T1 > T2. Looping through all the points in the dataset, the algorithm compares each point against T1 and T2 for each of the already created Canopies. If the distance is below T2, the point is bound to that existing Canopy and won't be used to start a new one. If it is below T1 but not T2, it will be added to the Canopy but will still be allowed to be part of other Canopies. If it is within neither threshold, it will be used as the centre of a new Canopy. At the end, after all Canopies are created, the centroid is calculated for each of them, and these centroids are used for the next step with K-Means.
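To make the T1/T2 logic a bit more concrete, here is a minimal in-memory sketch of the idea (just an illustration, not Mahout's actual implementation; the plain Euclidean distance helper stands in for whichever DistanceMeasure is configured):
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch of canopy formation, assuming T1 > T2.
public class CanopySketch {

    static class Canopy {
        final double[] centre;
        final List<double[]> points = new ArrayList<double[]>();
        Canopy(double[] centre) { this.centre = centre; points.add(centre); }
    }

    static List<Canopy> buildCanopies(List<double[]> allPoints, double t1, double t2) {
        List<Canopy> canopies = new ArrayList<Canopy>();
        for (double[] point : allPoints) {
            boolean stronglyBound = false;
            for (Canopy canopy : canopies) {
                double d = distance(point, canopy.centre);
                if (d < t1) {
                    canopy.points.add(point);    // loosely bound: joins this canopy
                }
                if (d < t2) {
                    stronglyBound = true;        // tightly bound: won't seed a new canopy
                }
            }
            if (!stronglyBound) {
                canopies.add(new Canopy(point)); // becomes the centre of a new canopy
            }
        }
        return canopies;
    }

    static double distance(double[] a, double[] b) {
        double sum = 0;
        for (int i = 0; i < a.length; i++) {
            sum += (a[i] - b[i]) * (a[i] - b[i]);
        }
        return Math.sqrt(sum);
    }
}
Once the loop finishes, averaging the points in each canopy gives the centroids that are handed over to K-Means.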
For K-Means, Canopy, and most Machine Learning algorithms, deciding whether a particular item belongs to a group, or how to make a recommendation, is based on a measure of distance. To be able to measure the distance between two items (customers, in our use case), their values need to be converted into a form that allows them to be compared and measured. This means that we need to convert our values, whatever they are, into a numeric representation that lets us use traditional distance measures to compare them.
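As a small illustration of what that looks like with Mahout's vector and distance classes (the vectors here hold placeholder values, not real customer data):
import org.apache.mahout.common.distance.EuclideanDistanceMeasure;
import org.apache.mahout.math.DenseVector;
import org.apache.mahout.math.Vector;

public class DistanceExample {
    public static void main(String[] args) {
        // Two hypothetical customers already converted to numeric vectors.
        Vector customerA = new DenseVector(new double[]{1.0, 0.0, 5.0});
        Vector customerB = new DenseVector(new double[]{0.0, 1.0, 3.0});

        // All Mahout distance measures operate on these Vector objects.
        double distance = new EuclideanDistanceMeasure().distance(customerA, customerB);
        System.out.println("Distance between the two customers: " + distance);
    }
}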
The dimensions I planned on using for this first iteration were: product (the vertical), trade, turnover, number of employees, and claims.
Out of these dimensions, only two were already numbers (turnover and claims), while the other three were text values. Even trickier, only employees had directly comparable values ("less than 5 employees" is comparable to "more than 100 employees"), while the other two were discrete, disjoint values (product "business" is not directly comparable to product "shop").
For the case of employees, I converted the text ranges to consecutive numbers (the values here are made up for this example): "less than 5 employees" becomes 1, "between 5 and 100 employees" becomes 2, and "more than 100 employees" becomes 3.
For the two discrete properties, product and trade, I had to create an individual dimension for each of the discrete values they can take. As in my example I was only going to use Baker and Accountant for trade, and Shop and Business for product, the final dimensions vector ended up looking something like:
| shop | business | accountant | baker | turnover | employees | claims |
So let's say we wanted to model an accountant with a turnover of 50000, 20 employees (which, with the illustrative mapping above, falls into band 2) and 2 claims. The vector would look like:
| 0 | 1 | 1 | 0 | 50000 | 2 | 2 |
We can already see a problem with this vector: the value of turnover is much larger than the rest of the dimensions, which means the distance calculation will be heavily dominated by it. We say this value has a much bigger weight than the rest. For this example, we assume that there is a maximum turnover of 100000.
In our case, we want to give extra weight to the product and trade dimensions and make turnover much less significant.
Mahout offers some functionality for doing just that, normally as an implementation of the class WeightedDistanceMeasure. It works by building a Vector with a multiplier for each of the dimensions of the original vector; this weights vector needs to be the same size as the dimensions vector. In our case, we could have a vector like this:
| 10 | 10 | 5 | 5 | 1/100000 | 1/2 | 1/10 |
The effect of that vector is to alter the values of the original by multiplying the product dimensions by 10 and the trade dimensions by 5, making sure that turnover is always less than 1, halving the influence of the number of employees, and making claims less influential.
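For illustration, multiplying the example accountant's vector element by element with these weights gives:
| 0 | 10 | 5 | 0 | 0.5 | 1 | 0.2 |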
NOTE: Finding the correct dimensions and weights for a clustering algorithm is a really hard exercise which normally requires multiple iterations to find the "best" solution. In keeping with the spike approach of this hackathon, our example uses completely arbitrary values chosen just to prove the technique, not carefully crafted normalisations of the data. For our purposes, values that are good enough to prove the technique are good enough.
Following are the main parts of the raw code written to convert the initial data to a list of vectors:
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.mahout.math.DenseVector;
import org.apache.mahout.math.NamedVector;
import org.apache.mahout.math.VectorWritable;

public class VectorCreationMapReduce extends Configured implements Tool {

    // Map-only job: each input line (one customer) becomes a NamedVector keyed by the customer id.
    public static class VectorizerMapper extends Mapper<LongWritable, Text, Text, VectorWritable> {

        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            VectorWritable writer = new VectorWritable();
            String[] values = value.toString().split("\\|");
            // Helper methods (vectorForVertical, vectorForTrade, vectorForDouble, concatArrays)
            // turn each raw field into one or more numeric dimensions; they are omitted for brevity.
            double[] verticals = vectorForVertical(values[1]);
            double[] trade = vectorForTrade(values[2]);
            double[] turnover = vectorForDouble(values[3]);
            double[] claimCount = vectorForDouble(values[4]);
            double[] xCoordinate = vectorForDouble(values[7]);
            double[] yCoordinate = vectorForDouble(values[8]);
            // The customer id (values[0]) names the vector so we can trace it back after clustering.
            NamedVector vector = new NamedVector(
                    new DenseVector(concatArrays(verticals, trade, turnover, claimCount, xCoordinate, yCoordinate)),
                    values[0]);
            writer.set(vector);
            context.write(new Text(values[0]), writer);
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        int res = ToolRunner.run(conf, new VectorCreationMapReduce(), args);
        System.exit(res);
    }

    @Override
    public int run(String[] strings) throws Exception {
        Configuration conf = super.getConf();
        conf.set("fs.default.name", "hdfs://" + Configurations.HADOOP_MASTER_IP + ":9000/");
        conf.set("mapred.job.tracker", Configurations.HADOOP_MASTER_IP + ":9001");
        Job job = new Job(conf, "customer_to_vector_mapreduce");
        job.setJarByClass(VectorCreationMapReduce.class);
        job.setMapperClass(VectorizerMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(VectorWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(VectorWritable.class);
        // Write the vectors as a SequenceFile so Mahout's clustering jobs can read them directly.
        job.setOutputFormatClass(SequenceFileOutputFormat.class);
        // No reduce phase needed; the map output is the final output.
        job.setNumReduceTasks(0);
        FileInputFormat.addInputPaths(job, "aggregated_customers_with_coordinates");
        FileOutputFormat.setOutputPath(job, new Path("vector_seq_file"));
        return job.waitForCompletion(true) ? 0 : 1;
    }
}
The code is a Hadoop map-reduce job (with only a map phase) that takes its input from the Sqoop-exported file and creates a NamedVector with the dimension values. The Vector classes used are Mahout-provided classes designed for use within Hadoop.
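Once packaged into a jar, the job is launched in the usual Hadoop way, along these lines (the jar and package names here are placeholders, not the actual spike artifacts):
hadoop jar cluster-customers.jar clustercustomers.VectorCreationMapReduce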
The next step was to run the actual Canopy algorithm to find the K centroids that we are going to feed to the K-Means algorithm.
This is run something like:
./hadoop org.apache.mahout.clustering.canopy.CanopyDriver -i /vector_seq_file/part-m-00000 -o customer-centroids -dm clustercustomers.mahout.CustomWeightedEuclideanDistanceMeasure -t1 4.0 -t2 2.0
The previous command specifies the Mahout class that contains the Canopy Hadoop job. It takes as input the HDFS file generated by the previous vectorization process, and an output directory, customer-centroids, where the generated centroids will be written. We also specify that we want to use a weighted Euclidean distance measure, which is defined in the custom class shown below:
import org.apache.mahout.common.distance.WeightedEuclideanDistanceMeasure;
import org.apache.mahout.math.DenseVector;
import org.apache.mahout.math.Vector;

public class CustomWeightedEuclideanDistanceMeasure extends WeightedEuclideanDistanceMeasure {
    // Weights matching the vector described above; note the floating point literals,
    // since integer divisions like 1/100000 would silently evaluate to 0.
    public static final Vector WEIGHTS = new DenseVector(new double[]{10, 10, 5, 5, 1.0 / 100000, 1.0 / 2, 1.0 / 10});

    public CustomWeightedEuclideanDistanceMeasure() {
        super();
        setWeights(WEIGHTS);
    }
}
This class simply extends the Mahout-provided WeightedEuclideanDistanceMeasure and sets the custom weight vector that we mentioned above. This makes sure that when the algorithm runs, the weighting is applied to every vector from the input.
Now that we have generated our K centroids, it is time to run the actual K-Means clustering algorithm. This is also very simple to run by using the Mahout provided classes:
hadoop org.apache.mahout.clustering.kmeans.KMeansDriver -i vector_seq_file/part-m-00000 -c customer-centroids/clusters-0-final -o customer-kmeans -dm clustercustomers.mahout.CustomWeightedEuclideanDistanceMeasure -x 10 -ow --clustering
In this command, we specify that we want to run the KMeansDriver Hadoop job. We pass in the input vector file again, and we specify the centroids file generated by Canopy with the -c option. We then specify where the clustering output should go, the use of the weighting mechanism again, and how many iterations we want to run over the data.
Here's a quick overview of how K-Means actually works:
The K-Means clustering algorithm starts with the given set of K centroids and iterates, adjusting the centroids until the iteration limit X is reached or until the centroids converge to a point from which they no longer move. Each iteration has two steps: first, every point is assigned to the cluster of its closest centroid; second, each centroid is recalculated as the mean of the points assigned to it.
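Here is a rough in-memory sketch of those two steps (an illustration of the idea only, not Mahout's distributed implementation; the distance helper is a plain Euclidean distance):
// One K-Means iteration: assign every point to its closest centroid,
// then recompute each centroid as the mean of the points assigned to it.
public class KMeansIterationSketch {

    static double[][] iterate(double[][] points, double[][] centroids) {
        int k = centroids.length;
        int dims = centroids[0].length;
        double[][] sums = new double[k][dims];
        int[] counts = new int[k];

        // Step 1: assignment.
        for (double[] point : points) {
            int closest = 0;
            double best = Double.MAX_VALUE;
            for (int c = 0; c < k; c++) {
                double d = distance(point, centroids[c]);
                if (d < best) {
                    best = d;
                    closest = c;
                }
            }
            counts[closest]++;
            for (int i = 0; i < dims; i++) {
                sums[closest][i] += point[i];
            }
        }

        // Step 2: recompute centroids (an empty cluster keeps its old centroid).
        double[][] updated = new double[k][dims];
        for (int c = 0; c < k; c++) {
            for (int i = 0; i < dims; i++) {
                updated[c][i] = counts[c] == 0 ? centroids[c][i] : sums[c][i] / counts[c];
            }
        }
        return updated;
    }

    static double distance(double[] a, double[] b) {
        double sum = 0;
        for (int i = 0; i < a.length; i++) {
            sum += (a[i] - b[i]) * (a[i] - b[i]);
        }
        return Math.sqrt(sum);
    }
}
Repeating this until the centroids stop moving, or until the iteration limit is hit, is what the -x option bounds in the KMeansDriver command above.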
K-Means (and Canopy as well) is a parallelizable algorithm, meaning that many jobs can each work on a subset of the problem and aggregate their results. This is where Hadoop comes in for the clustering execution: internally, Mahout, and in particular KMeansDriver, is built to work on the Hadoop map-reduce infrastructure. By leveraging Hadoop's proven map-reduce implementation, Mahout algorithms are able to scale to very big datasets and process them in parallel.
After generating the clusters, the next step is to create individual cluster files, plus a single combined cluster file with the simple format (cluster_id, customer_id).
This is done with the following map and reduce methods:
public static class ClusterPassThroughMapper extends Mapper<IntWritable, WeightedVectorWritable, IntWritable, Text> {

    public void map(IntWritable key, WeightedVectorWritable value, Context context) throws IOException, InterruptedException {
        // The key is the cluster id; the value wraps the NamedVector, whose name is the customer id.
        NamedVector vector = (NamedVector) value.getVector();
        context.write(key, new Text(vector.getName()));
    }
}

public static class ClusterPointsToIndividualFile extends Reducer<IntWritable, Text, IntWritable, Text> {

    private MultipleOutputs<IntWritable, Text> mos;

    public void setup(Context context) {
        mos = new MultipleOutputs<IntWritable, Text>(context);
    }

    public void reduce(IntWritable key, Iterable<Text> value, Context context) throws IOException, InterruptedException {
        for (Text text : value) {
            // One file per cluster (cluster-<id>) plus the combined (cluster_id, customer_id) output.
            mos.write("seq", key, text, "cluster-" + key.toString());
            context.write(key, text);
        }
    }

    public void cleanup(Context context) throws IOException, InterruptedException {
        mos.close();
    }
}
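The driver for this last job isn't shown here, but for the mos.write("seq", ...) call to work, the "seq" named output has to be registered when the job is set up. A minimal sketch of what that registration could look like (the driver class name, input path and output path are assumptions, not the actual spike code):
Job job = new Job(conf, "cluster_points_to_files");
job.setJarByClass(ClusterOutputDriver.class);                  // hypothetical driver class
job.setMapperClass(ClusterPassThroughMapper.class);
job.setReducerClass(ClusterPointsToIndividualFile.class);
job.setInputFormatClass(SequenceFileInputFormat.class);        // the clustered points are sequence files
job.setOutputKeyClass(IntWritable.class);
job.setOutputValueClass(Text.class);
// Register the "seq" named output used by the reducer's MultipleOutputs.
MultipleOutputs.addNamedOutput(job, "seq", SequenceFileOutputFormat.class, IntWritable.class, Text.class);
FileInputFormat.addInputPaths(job, "customer-kmeans/clusteredPoints");   // assumed location of the K-Means clustered points
FileOutputFormat.setOutputPath(job, new Path("customer-clusters"));      // assumed output directory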
This has allowed us to obtain clusters of customers - next week's post will explore what can be done with these clusters!