Read the first part of this hackathon implementation at Clustering our customers to get a full background on what's being presented here!
At the end of the first post, we had obtained various clusters of customers in HDFS. This is the most important part of the job. However, we still needed to do something with those clusters. We thought about different possibilities: "What is the most common frequently asked question among members of a cluster?", "What is the average premium that people pay in a particular cluster?", "What is the average time people have been insured with us in a particular cluster?".
Most of the analyses we defined here were straightforward aggregations, a job that Hadoop map-reduce does nicely (more so as the data is already stored in HDFS).
There are many ways to analyse data with Hadoop, including Hive, Scalding, Pig, and others. Most of these translate a higher-level language into the native map-reduce jobs that Hadoop runs. I chose Apache Pig for my analysis as I really liked the abstraction it creates on top of standard Hadoop map-reduce. It does this through a language called Pig Latin, which is very easy to read and write, both for people familiar with SQL analysis queries and for developers used to a more procedural way of doing things.
Here I will show just one example: how to calculate the average premium for each cluster. Following is the Pig program that does this:
premiums = LOAD '/user/cscarion/imported_quotes' USING PigStorage('|') AS (rfq_id, premium, insurer);
cluster = LOAD '/user/cscarion/individual-clusters/part-r-00000' AS (clusterId, customerId);
customers = LOAD '/user/cscarion/aggregated_customers_text' USING PigStorage('|') AS (id, vertical, trade, turnover, claims, rfq_id);
withPremiums = JOIN premiums BY rfq_id, customers BY rfq_id;
STORE withPremiums INTO 'withPremiums' USING PigStorage('|');
groupCluster2 = JOIN withPremiums BY customers::id, cluster BY customerId;
grouped2 = GROUP groupCluster2 BY cluster::clusterId;
premiumsAverage = FOREACH grouped2 GENERATE group, AVG(groupCluster2.withPremiums::premiums::premium);
STORE premiumsAverage INTO 'premiumsAverage' USING PigStorage('|');
The previous Pig program should be fairly straightforward to understand. The quotes are loaded with the schema (rfq_id, premium, insurer), the clusters computed in the first part with (cluster_id, customer_id), and the customers with (id, vertical, trade, turnover, claims, rfq_id). Premiums and customers are joined by rfq_id, which basically adds the premium to the customer collection. That result is then joined with the clusters by customer id, adding the cluster_id to the customer collection. Finally, the rows are grouped by cluster_id and the average premium is computed, so the stored output has the form (cluster_id, average_premium).
Now that we have the data and the analytics have been run over it, the next step is to consume this data from the main web application. For this, and to make it as unobtrusive as possible, I created a new server application (using Play) with a simple interface that was connected to HDFS and could be queried from our main application.
So, for example, when a customer is filling in the form, we can invoke an endpoint on this new service like this: GET /?trade=accountant&product=business&claims=2&turnover=25000&employees=2
This call will vectorise that information, find the correct cluster, and return the information for the given cluster. The important parts of the code follow:
def similarBusinesses(business: Business): Seq[Business] = {
  loadCentroids()
  val cluster = clusterForBusiness(business)
  val maxBusinesses = 100
  var currentBusinesses = 0
  val userIds = scala.collection.mutable.ListBuffer[String]()
  // Read the HDFS file for this cluster, collecting customer ids until we have enough.
  CustomHadoopTextFileReader.readFile(s"hdfs://localhost:9000/individual-clusters/cluster-${cluster}-r-00000") {
    line =>
      val splitted = line.split("\t")
      userIds += splitted(1)
      currentBusinesses += 1
  }(currentBusinesses < maxBusinesses)
  businessesForUserIds(userIds)
}
That code finds the cluster for the business arriving from the main application. Then it reads the HDFS file representing that individual cluster and gets the business information for the returned user ids.
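businessesForUserIds is not shown in the post. A minimal sketch of what it might do, assuming it reads the same '|'-separated customers file that the Pig script loads (the field positions and the Business construction here are assumptions, not the real implementation):

// Hypothetical sketch: fetch the customer records behind the collected user ids
// from the customers export used by the Pig script above
// (fields: id, vertical, trade, turnover, claims, rfq_id).
private def businessesForUserIds(userIds: Seq[String]): Seq[Business] = {
  val matches = scala.collection.mutable.ListBuffer[Business]()
  CustomHadoopTextFileReader.readFile("hdfs://localhost:9000/user/cscarion/aggregated_customers_text") {
    line =>
      val fields = line.split("\\|")
      if (userIds.contains(fields(0))) {
        // How a customer record maps onto Business is an assumption;
        // employees is not present in this file.
        matches += Business(trade = fields(2), product = fields(1),
          claims = fields(4).toInt, turnover = fields(3).toInt, employees = 0)
      }
  }(true)
  matches.toList
}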
To find the cluster to which the business belongs, we compare against the stored centroids:
private def clusterForBusiness(business: Business): String = {
  val businessVector = business.vector
  var currentDistance = Double.MaxValue
  var selectedCentroid: (String, Cluster) = null
  // Pick the centroid closest to the business vector.
  for (centroid <- SimilarBusinessesRetriever.centroids) {
    val centroidDistance = distance(centroid._2.getCenter, businessVector)
    if (centroidDistance < currentDistance) {
      currentDistance = centroidDistance
      selectedCentroid = centroid
    }
  }
  clusterId = Integer.valueOf(selectedCentroid._1)
  selectedCentroid._1
}
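Neither business.vector nor distance is shown; both have to mirror the encoding and distance measure used by the clustering job in the first post. A minimal sketch, assuming Mahout vectors and Euclidean distance (the Business fields and the encoding of trade are assumptions, not the real implementation):

import org.apache.mahout.common.distance.EuclideanDistanceMeasure
import org.apache.mahout.math.{DenseVector, Vector}

// Hypothetical Business: the real class lives in the main application.
case class Business(trade: String, product: String, claims: Int, turnover: Int, employees: Int) {
  // Encode the business with the same dimensions (and the same encoding of
  // categorical fields such as trade) that were used when the clusters were built.
  def vector: Vector =
    new DenseVector(Array(trade.hashCode.toDouble, claims.toDouble, turnover.toDouble, employees.toDouble))
}

object BusinessDistance {
  private val measure = new EuclideanDistanceMeasure
  // Distance between a cluster centre and a business vector; in the real code
  // this is the distance(...) helper called by clusterForBusiness.
  def distance(center: Vector, businessVector: Vector): Double =
    measure.distance(center, businessVector)
}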
The code that actually reads from the Hadoop filesystem follows; it looks much like reading a plain file:
import java.io.{BufferedReader, InputStreamReader}
import org.apache.hadoop.fs.Path

object CustomHadoopTextFileReader {
  // Applies f to each line of the given HDFS file while the condition g holds.
  def readFile(filePath: String)(f: String => Unit)(g: => Boolean = true) {
    try {
      val pt = new Path(filePath)
      val br = new BufferedReader(new InputStreamReader(SimilarBusinessesRetriever.fs.open(pt)))
      try {
        var line = br.readLine()
        while (line != null && g) {
          f(line)
          line = br.readLine()
        }
      } finally {
        br.close()
      }
    } catch {
      case e: Exception =>
        e.printStackTrace()
    }
  }
}
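The fs handle (and the centroids used earlier) live on SimilarBusinessesRetriever, which is not shown in the post. A minimal sketch of that object, assuming the same local namenode as the hard-coded paths; how loadCentroids() actually reads the clustering output is omitted here:

import java.net.URI
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem
import org.apache.mahout.clustering.Cluster

object SimilarBusinessesRetriever {
  // Handle on the same HDFS namenode that the hard-coded paths point at.
  val fs: FileSystem = FileSystem.get(new URI("hdfs://localhost:9000"), new Configuration())

  // clusterId -> centroid, populated by loadCentroids() from the clustering output.
  var centroids: Map[String, Cluster] = Map.empty

  // Set as a side effect by clusterForBusiness above.
  var clusterId: Int = -1
}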
Then, to return the average premium for a particular cluster:
def averagePremium(cluster: Int): Int = {
  // Each line is cluster_id|average_premium; return the rounded-up premium for the matching cluster.
  CustomHadoopTextFileReader.readFile("hdfs://localhost:9000/premiumsAverage/part-r-00000") {
    line =>
      val splitted = line.split("\\|")
      if (cluster.toString == splitted(0)) {
        return Math.ceil(java.lang.Double.valueOf(splitted(1))).toInt
      }
  }(true)
  0
}
Any of these values can then be returned to the calling application: it can provide a list of "similar" businesses, or the average premium paid by those similar businesses.
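The Play side that glues this together is not shown in the post. A minimal sketch of how the route and action might look, assuming the methods above live on SimilarBusinessesRetriever (the controller name, the clusterId access and the JSON shape are assumptions):

# conf/routes (hypothetical)
GET     /     controllers.SimilarBusinesses.find(trade: String, product: String, claims: Int, turnover: Int, employees: Int)

// app/controllers/SimilarBusinesses.scala (sketch)
package controllers

import play.api.libs.json._
import play.api.mvc.{Action, Controller}

object SimilarBusinesses extends Controller {
  def find(trade: String, product: String, claims: Int, turnover: Int, employees: Int) = Action {
    // Build the Business the retriever works with from the query string parameters.
    val business = Business(trade, product, claims, turnover, employees)
    val similar = SimilarBusinessesRetriever.similarBusinesses(business)
    // clusterId is set as a side effect of clusterForBusiness in the code above.
    val premium = SimilarBusinessesRetriever.averagePremium(SimilarBusinessesRetriever.clusterId)
    // Return whatever representation of the similar businesses the main app needs; here just the count.
    Ok(Json.obj("similarBusinesses" -> similar.size, "averagePremium" -> premium))
  }
}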
This was it for the first iteration of the hack! The second iteration was to use the same infrastructure to cluster customers based on the location of their businesses instead of the dimensions used here. Given that the basic clustering procedure remains the same regardless of the dimensions used, the code for that looks similar to the code presented here.