Using Hadoop and Mahout to cluster and analyse our customer data (Part 2)

Read the first part of this hackathon implementation, “Clustering our customers”, to get the full background on what’s being presented here!

Analysing the data

At the end of the first post, we had obtained various clusters of customers in HDFS. This is the most important part of the job. However, we still need to do something with those clusters. We thought about different possibilities: “The most common frequently asked question among members of a cluster”, “What is the average premium that people pay in a particular cluster?”, “What is the average time people have been insured with us in a particular cluster?”.

Most of the analyses we defined here were straightforward aggregations, a job that Hadoop map-reduce does nicely (more so as the data is already stored on HDFS).

There are many ways to analyse data with Hadoop, including Hive, Scalding, Pig, and some others. Most of these translate a higher-level language into the native map-reduce jobs that Hadoop runs. I chose Apache Pig for my analysis as I really liked the abstraction it creates on top of standard Hadoop map-reduce. It does this through a language called Pig Latin. Pig Latin is very easy to read and write, both for people familiar with SQL analysis queries and for developers familiar with a procedural way of doing things.

Here I will show just one example: how to calculate the average premium for each cluster. The following Pig program does this:

-- Quotes (RFQs) with the premium offered by each insurer
premiums = LOAD '/user/cscarion/imported_quotes' USING PigStorage('|') AS (rfq_id, premium, insurer);
-- Cluster assignments produced by the clustering step
cluster = LOAD '/user/cscarion/individual-clusters/part-r-00000' AS (clusterId, customerId);
-- Customers as originally imported
customers = LOAD '/user/cscarion/aggregated_customers_text' USING PigStorage('|') AS (id, vertical, trade, turnover, claims, rfq_id);

-- Attach the premium to each customer
withPremiums = JOIN premiums BY rfq_id, customers BY rfq_id;

STORE withPremiums INTO 'withPremiums' USING PigStorage('|');

-- Attach the cluster id to each customer
groupCluster2 = JOIN withPremiums BY customers::id, cluster BY customerId;

grouped2 = GROUP groupCluster2 BY cluster::clusterId;

-- Average premium per cluster
premiumsAverage = FOREACH grouped2 GENERATE group, AVG(groupCluster2.withPremiums::premiums::premium);

STORE premiumsAverage INTO 'premiumsAverage' USING PigStorage('|');

The Pig program above is fairly straightforward. It works as follows:

  • The RFQs with their respective premiums are loaded from HDFS into tuples like (rfq_id, premium, insurer).
  • The cluster information is loaded from HDFS into tuples like (cluster_id, customer_id).
  • The customers are loaded from the originally imported file into tuples like (id, vertical, trade, turnover, claims, rfq_id).
  • The RFQs are joined with the customers using the rfq_id. This adds the premium to the customer collection.
  • The result of the previous join is joined with the cluster tuples. This adds the cluster_id to the customer collection.
  • The results of the previous join are grouped by cluster_id.
  • For each group computed in the previous step, the average premium is calculated.
  • The results from the previous step are stored back into HDFS as tuples (cluster_id, average_premium).
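For readers more at home in Scala than Pig Latin, the same join, group and average steps can be sketched on in-memory collections. This is only an illustration of the logic, not how the hack ran it: the record types `Quote`, `Customer` and `Membership` and the function name are hypothetical, and the premium is assumed to be numeric.

```scala
object AveragePremiumSketch {
  // Hypothetical record types mirroring the three Pig relations
  final case class Quote(rfqId: String, premium: Double)
  final case class Customer(id: String, rfqId: String)
  final case class Membership(clusterId: String, customerId: String)

  def averagePremiumByCluster(
      quotes: Seq[Quote],
      customers: Seq[Customer],
      memberships: Seq[Membership]): Map[String, Double] = {
    // JOIN premiums BY rfq_id, customers BY rfq_id (naive nested-loop join)
    val withPremiums = for {
      q <- quotes
      c <- customers if c.rfqId == q.rfqId
    } yield (c.id, q.premium)

    // JOIN withPremiums BY customer id, cluster BY customerId
    val withCluster = for {
      wp <- withPremiums
      m <- memberships if m.customerId == wp._1
    } yield (m.clusterId, wp._2)

    // GROUP BY clusterId, then AVG(premium) for each group
    withCluster.groupBy(_._1).map { case (clusterId, rows) =>
      clusterId -> rows.map(_._2).sum / rows.size
    }
  }
}
```

On a cluster, Pig performs these joins as distributed map-reduce jobs; the nested loops here are only meant to make the data flow visible on small inputs.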

Querying the data from the main application

Now that we have the data and the analytics have been run over it, the next step is to consume this data from the main web application. For this, and to make it as unobtrusive as possible, I created a new server application (using Play) with a simple interface that was connected to HDFS and could be queried from our main application.

So, for example, when a customer is filling in the form, we can invoke an endpoint on this new service like this: GET /?trade=accountant&product=business&claims=2&turnover=25000&employees=2

This call will vectorise that information, find the correct cluster, and return the information for the given cluster. The important parts of the code follow:

def similarBusinesses(business: Business): Seq[Business] = {
  loadCentroids()
  val cluster = clusterForBusiness(business)
  val maxBusinesses = 100
  var currentBusinesses = 0
  // Customer ids collected from the cluster file
  val userIds = scala.collection.mutable.ArrayBuffer.empty[String]
  CustomHadoopTextFileReader.readFile(s"hdfs://localhost:9000/individual-clusters/cluster-${cluster}-r-00000") {
    line =>
      // Each line is "<clusterId>\t<customerId>"
      val splitted = line.split("\t")
      userIds += splitted(1)
      currentBusinesses += 1

  }(currentBusinesses < maxBusinesses) // stop once maxBusinesses ids have been read
  businessesForUserIds(userIds)
}

That code finds the cluster for the business arriving from the main application. Then it reads the HDFS file representing that individual cluster and gets the business information for the returned user ids.

To find the cluster to which the business belongs, we compare its vector against the stored centroids and pick the closest one:

  private def clusterForBusiness(business: Business): String = {
    val businessVector = business.vector
    var currentDistance = Double.MaxValue
    var selectedCentroid: (String, Cluster) = null
    // Linear scan: keep the centroid with the smallest distance to the business vector
    for (centroid <- SimilarBusinessesRetriever.centroids) {
      val d = distance(centroid._2.getCenter, businessVector)
      if (d < currentDistance) {
        currentDistance = d
        selectedCentroid = centroid
      }
    }
    clusterId = Integer.valueOf(selectedCentroid._1)
    selectedCentroid._1
  }
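The `distance` function is not shown in this post, so the following is only a self-contained sketch of the same nearest-centroid lookup. It assumes plain arrays of doubles instead of Mahout vectors and a Euclidean metric; the names `euclidean` and `nearest` are made up for the example.

```scala
object NearestCentroidSketch {
  // Euclidean distance between two vectors of equal length (an assumed metric)
  def euclidean(a: Array[Double], b: Array[Double]): Double = {
    require(a.length == b.length, "vectors must have the same dimension")
    math.sqrt(a.zip(b).map { case (x, y) => (x - y) * (x - y) }.sum)
  }

  // Returns the id of the closest centroid, or None when there are no centroids
  def nearest(centroids: Map[String, Array[Double]], point: Array[Double]): Option[String] =
    if (centroids.isEmpty) None
    else Some(centroids.minBy { case (_, center) => euclidean(center, point) }._1)
}
```

Returning an `Option` makes the "no centroids loaded" case explicit, where a null-initialised variable would fail with a `NullPointerException`.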

The code that actually reads from the Hadoop filesystem follows. It looks much like reading a simple local file:

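import java.io.{BufferedReader, InputStreamReader}
import org.apache.hadoop.fs.Path

object CustomHadoopTextFileReader {
  // Calls f on each line of the HDFS file at filePath for as long as g evaluates to true
  def readFile(filePath: String)(f: String => Unit)(g: => Boolean = true): Unit = {
    var br: BufferedReader = null
    try {
      val pt = new Path(filePath)
      br = new BufferedReader(new InputStreamReader(SimilarBusinessesRetriever.fs.open(pt)))
      var line = br.readLine()
      while (line != null && g) {
        f(line)
        line = br.readLine()
      }
    } catch {
      case e: Exception =>
        e.printStackTrace()
    } finally {
      // Release the HDFS stream even when reading stops early or fails
      if (br != null) br.close()
    }
  }
}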
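The reader above relies on a by-name parameter (`g: => Boolean`) that is re-evaluated before every line, which is what lets a caller stop reading early. A minimal sketch of that pattern follows, using a plain `java.io.Reader` instead of HDFS so it can be run anywhere; `readLines` and `firstLines` are hypothetical names.

```scala
import java.io.{BufferedReader, Reader, StringReader}

object LineReaderSketch {
  // keepGoing is a by-name parameter, so it is re-evaluated before every line
  def readLines(source: Reader)(onLine: String => Unit)(keepGoing: => Boolean): Unit = {
    val reader = new BufferedReader(source)
    try {
      var line = reader.readLine()
      while (line != null && keepGoing) {
        onLine(line)
        line = reader.readLine()
      }
    } finally reader.close()
  }

  // Collects at most `limit` lines from the source
  def firstLines(source: Reader, limit: Int): Vector[String] = {
    val collected = Vector.newBuilder[String]
    var count = 0
    readLines(source) { line =>
      collected += line
      count += 1
    }(count < limit)
    collected.result()
  }
}
```

For example, `LineReaderSketch.firstLines(new StringReader("a\nb\nc\nd"), 2)` stops after the second line because `count < limit` becomes false before the third line is processed.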

Then, to return the average premium for a particular cluster:

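def averagePremium(cluster: Int): Int = {
  var result = 0 // returned when the cluster is not found
  var found = false
  CustomHadoopTextFileReader.readFile("hdfs://localhost:9000/premiumsAverage/part-r-00000") {
    line =>
      // Each line is "<clusterId>|<averagePremium>"
      val splitted = line.split("\\|")
      if (cluster.toString == splitted(0)) {
        result = Math.ceil(splitted(1).toDouble).toInt
        found = true
      }
  }(!found) // stop reading as soon as the cluster has been found
  result
}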
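A variation on this lookup is sketched below. It assumes the same `clusterId|average` line format, but works on any iterator of lines and returns an `Option` rather than 0 when the cluster is missing or a line is malformed. The object and function names are hypothetical.

```scala
import scala.util.Try

object AveragePremiumLookupSketch {
  // Parses one "clusterId|average" line; returns None for malformed lines
  def parseLine(line: String): Option[(Int, Double)] =
    line.split("\\|") match {
      case Array(id, avg) => Try(id.trim.toInt -> avg.trim.toDouble).toOption
      case _              => None
    }

  // Rounds the first matching average up to a whole number, as the code above does
  def averagePremium(lines: Iterator[String], cluster: Int): Option[Int] =
    lines.map(parseLine).collectFirst {
      case Some((id, avg)) if id == cluster => math.ceil(avg).toInt
    }
}
```

Because the iterator is consumed lazily, the search stops at the first matching line, much like the early exit in the function above.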

Any of these values can then be returned to the calling application, which can use them to show a list of “similar” businesses or the average premium paid by those similar businesses.

That was it for the first iteration of the hack! The second iteration was to use the same infrastructure to cluster customers based on the location of their businesses instead of the dimensions used here. Given that the basic clustering procedure remains the same regardless of the dimensions used, the code for that looks similar to the code presented here.
