Simply Business homepage
  • Business insurance

    • Business Insurance FAQs

    Business insurance covers

  • Support
  • Claims
  • Sign In
Call Us0333 0146 683
Chat With UsChat support 24/7
Tech blog

Using Hadoop and Mahout to cluster and analyse our customer data (Part 2)

3-minute read

Carlo Scarioni

Carlo Scarioni

20 November 2014

Share on FacebookShare on TwitterShare on LinkedIn

Read the first part of this hackathon implementation at Clustering our customers to get a full background on what's being presented here!

Analysing the data

At the end of the first post, we have obtained various clusters of customers in HDFS. This is the most important part of the job. However, we still need to do something with those clusters. We though about different possibilities: "The most common frequently asked question among members of a cluster", "What is the average premium that people pay in a particular cluster?", "What is the average time people are being insured with us in a particular cluster?".

Most of the analysis we defined here were straightforward aggregations, a job that Hadoop map-reduce does nicely (moreso as the data is already stored on HDFS).

There are many ways to analyse data with Hadoop including Hive, Scalding, Pig, and some others. Most of these do translations from a particular language to the native map-reduce algorithms supported by Hadoop. I chose Apache Pig for my analysis as I really liked the abstraction it creates on top of the standard Hadoop map-reduce. It does this by creating a language called Pig Latin. Pig Latin is very easy to read and write, both for people familiar with SQL analysis queries and for developers familiar with a procedural way of doing things.

Here I will show just one example, how to calculate the average premium for each cluster. Following is the Pig program that does this:

premiums = LOAD '/user/cscarion/imported_quotes' USING PigStorage('|') AS(rfq_id, premium, insurer);
cluster = LOAD '/user/cscarion/individual-clusters/part-r-00000' AS(clusterId, customerId);
customers = LOAD '/user/cscarion/aggregated_customers_text' using PigStorage('|') AS(id, vertical, trade, turnover, claims,rfq_id);

withPremiums = JOIN premiums BY rfq_id, customers BY rfq_id;

store withPremiums into 'withPremiums' using PigStorage('|');

groupCluster2 = JOIN withPremiums BY customers::id, cluster BY customerId;

grouped2 = GROUP groupCluster2 BY cluster::clusterId;

premiumsAverage = FOREACH grouped2 GENERATE group, AVG(groupCluster2.withPremiums::premiums::premium);

STORE premiumsAverage into 'premiumsAverage' using PigStorage('|');

The previous Pig program should be fairly straightforward to understand.

  • The RFQs with their respective premiums are loaded from HDFS into tuples like (rfq_id, premium, insurer)
  • The cluster information is loaded from HDFS into tuples like (cluster_id, customer_id)
  • The customers are loaded from the originally imported file into a tuple like (id, vertical, trade, turnover, claims,rfq_id)
  • The RFQs are joined with the customers using the rfq_id. This will basically add the premium to the customer collection.
  • The result of the previous join is joined with the cluster tuples. This essentially adds the cluster_id to the customer collection.
  • Results in the previous join are grouped by the cluster_id
  • For each group computed in the previous step, the average is calculated.
  • The results from the previous step are stored back into HDFS as a tuple (cluster_id, average_premium)

Simply Business technology

Querying the data from the main application

Now that we have the data and the analytics are passed to it, the next step is to consume this data from the main web application. For this, and to make it as unobtrusive as possible, I created a new server application (using Play) with a simple interface that was connected to the HDFS and could be queried from our main application.

So for example, when a customer is filling the form we can invoke an endpoint on this new service like this: GET /?trade=accountant&product=business&claims=2&turnover=25000&amployees=2

This call will vectorise that information, find the correct cluster, and return the information for given cluster. The important parts of the code follow:

def similarBusinesses(business: Business): Seq[Business] = {
  loadCentroids()
  val cluster = clusterForBusiness(business)
  val maxBusinesses = 100
  var currentBusinesses = 0
  CustomHadoopTextFileReader.readFile(s"hdfs://localhost:9000/individual-clusters/cluster-${cluster}-r-00000") {
    line =>
      val splitted = line.split("\t")
      userIds += splitted(1)
      currentBusinesses += 1

  }(currentBusinesses < maxBusinesses)
  businessesForUserIds(userIds)
}

That code finds the cluster for the business arriving from the main application. Then it reads the HDFS file representing that individual cluster and gets the business information for the returned user ids.

To find the cluster to which the business belongs to, we compare against the stored centroids:

private def clusterForBusiness(business: Business): String = {
  val businessVector = business.vector
  var currentDistance = Double.MaxValue
  var selectedCentroid: (String, Cluster) = null
  for (centroid <- SimilarBusinessesRetriever.centroids) {
    if (distance(centroid._2.getCenter, businessVector) < currentDistance) {
      currentDistance = distance(centroid._2.getCenter, businessVector);
      selectedCentroid = centroid;
    }
  }
  clusterId = Integer.valueOf(selectedCentroid._1)
  selectedCentroid._1
}

The code that actually reads the Hadoop filesystem follows, it looks like reading a simple file:

object CustomHadoopTextFileReader {
  def readFile(filePath: String)(f: String => Unit)(g: => Boolean = true) {
    try {
      val pt = new Path(filePath)
      val br = new BufferedReader(new InputStreamReader(SimilarBusinessesRetriever.fs.open(pt)));
      var line = br.readLine()
      while (line != null && g) {
        f(line)
        line = br.readLine()
      }
    } catch {
      case e: Exception =>
        e.printStackTrace()
    }
  }
}

Then to return the premium for a particular cluster:

def averagePremium(cluster: Int): Int = {
  CustomHadoopTextFileReader.readFile("hdfs://localhost:9000/premiumsAverage/part-r-00000") {
    line =>
      val splitted = line.split("\\|")
      if (cluster.toString == splitted(0)) {
        return Math.ceil(java.lang.Double.valueOf(splitted(1))).toInt
      }
  }(true)
  0
}

Any of these values can then be returned to the calling application - this can provide a list of "similar" businesses or retrieve the premium average paid for those similar businesses.

This was it for the first iteration of the hack! The second iteration was to use the same infrastructure to cluster customers based on the location of their businesses instead of the dimensions used here. Given that the basic procedure for clustering remains the same despite the dimensions utilised, the code for that looks similar to the code presented here.

Ready to start your career at Simply Business?

Want to know more about what it's like to work in tech at Simply Business? Read about our approach to tech, then check out our current vacancies.

Find out more

We create this content for general information purposes and it should not be taken as advice. Always take professional advice. Read our full disclaimer

Find this article useful? Spread the word.

Share on Facebook
Share on Twitter
Share on LinkedIn

Keep up to date with Simply Business. Subscribe to our monthly newsletter and follow us on social media.

Subscribe to our newsletter

Insurance

Public liability insuranceBusiness insuranceProfessional indemnity insuranceEmployers’ liability insuranceLandlord insuranceTradesman insuranceSelf-employed insuranceRestaurant insuranceVan insuranceInsurers

Address

6th Floor99 Gresham StreetLondonEC2V 7NG

Northampton 900900 Pavilion DriveNorthamptonNN4 7RG

© Copyright 2024 Simply Business. All Rights Reserved. Simply Business is a trading name of Xbridge Limited which is authorised and regulated by the Financial Conduct Authority (Financial Services Registration No: 313348). Xbridge Limited (No: 3967717) has its registered office at 6th Floor, 99 Gresham Street, London, EC2V 7NG.