What question strikes your mind when you are planning to buy a new gadget? One question is common to almost everyone: how long will this technology survive in the market? We can only judge the fate of today's technology when we know what is coming next.

Wondering how to learn about future technology and products today? A simple solution is to read technical blogs every day and hunt for rumors about upcoming technology. Isn't that tedious? We felt the same, and that was the inspiration to build an app capable of capturing rumors from a wide variety of sources.

What is a Rumor engine?
Once upon a time, customers learned about new technology only weeks or months after its release; now they are curious about next-generation technology while it is still in development or testing. Most customers want to know about new technology well before its release, but not everyone has the time or patience to read technical blogs across tens of blogging sites and follow rumors, leaks and releases. The Rumor engine is for such people. We crawl multiple technical blogging sites for data, crunch it into a machine-understandable format, and apply machine learning and natural language processing algorithms to capture rumors. The Rumor engine also gives users the flexibility to search for a specific type of rumor, such as rumors about Samsung, iPhone, etc.

[Image: Rumor engine]

How do we do it?
We use machine learning algorithms and natural language processing techniques to do the above.

  • Technique
    A Rumor engine is quite a new idea, so we could not find any labelled dataset on the internet and chose to build one manually. We collected XML feeds from multiple blog sites for two months and labelled 1.5k blog posts by hand; this became our initial dataset. We then used a technique called co-training to label the remaining unlabelled data. We used this complete dataset to train our algorithm and build a model, which we now use to classify blogs daily and surface them on our UI. Now that you have some idea about this engine, you can go ahead and use it here.
  • Data collection phase
    The data collection phase was made easy by using the Google API for parsing XML feeds from the blog sites. Different blog sites use different feed formats, and developing a generic parser would be time-consuming; the API provided by Google does this task and delivers every feed in a single format.
  • Feature creation
    We create a feature array based on the words present in each blog post. A numerical statistic called tf-idf (term frequency–inverse document frequency) scores how important a word is within a corpus. The tf-idf value increases proportionally with the number of times a word appears in a document, but is offset by the frequency of the word across the corpus, which controls for the fact that some words are generally more common than others. This score handles stop words and words that appear in nearly every blog, so that common words do not play a prominent role in classifying documents. We also experimented with raw word-count features, which performed very poorly compared to the tf-idf technique we implemented later.
  • Building a classification model
    We use the Naive Bayes classification algorithm to build a model capable of separating rumors from non-rumors. Naive Bayes is widely regarded as one of the best-performing simple algorithms for text classification tasks like this.
  • Searching for specific rumors
    Once we completed the first phase of our Rumor engine, we wanted something more than just showing the day's rumors on a user interface. This pushed us to give users an option to search for specific categories of rumors. Initially we considered implementing a hash-based database search, but that would not have been fast enough for a real-time application. Then we came across Apache Solr, which is built on the Apache Lucene project. Apache Solr is an open-source enterprise search platform that provides full-text search and other features; it serves queries from a Lucene index and is fast enough for real-time use. A small sketch of such a query follows this list.
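
To make the search piece concrete, here is a minimal sketch of issuing a category query (for example, "Samsung") against Solr from Scala using the SolrJ client. The Solr URL, the core name rumors and the field names title and content are illustrative assumptions, not the actual schema of our engine.

import org.apache.solr.client.solrj.SolrQuery
import org.apache.solr.client.solrj.impl.HttpSolrClient
import scala.collection.JavaConverters._

object RumorSearch {
  def main(args: Array[String]): Unit = {
    // Hypothetical Solr core named "rumors"; adjust the URL and core name to your setup
    val client = new HttpSolrClient.Builder("http://localhost:8983/solr/rumors").build()

    // Full-text query over assumed "title" and "content" fields
    val query = new SolrQuery("title:Samsung OR content:Samsung")
    query.setRows(10)

    val response = client.query(query)
    response.getResults.asScala.foreach { doc =>
      println(doc.getFieldValue("title"))
    }
    client.close()
  }
}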

What do we use underneath to do this?

  • Technology stack
    Apache Spark is a high-performance cluster computing engine with a great API. MLlib is Spark's machine learning library, which lets us run Naive Bayes and the natural language processing steps over a large number of blog posts; a small sketch of such a pipeline follows.
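
As a rough illustration of the classification pipeline described above, here is a minimal sketch, using the RDD-based MLlib API, of turning labelled blog text into tf-idf features and training a Naive Bayes model. The whitespace tokenisation, the label encoding and the smoothing value are illustrative assumptions, not our production code.

import org.apache.spark.SparkContext
import org.apache.spark.mllib.classification.{NaiveBayes, NaiveBayesModel}
import org.apache.spark.mllib.feature.{HashingTF, IDF}
import org.apache.spark.mllib.regression.LabeledPoint

// labelledBlogs: (label, blog text) pairs where 1.0 = rumor and 0.0 = non-rumor (assumed encoding)
def trainRumorModel(sc: SparkContext, labelledBlogs: Seq[(Double, String)]): NaiveBayesModel = {
  val data = sc.parallelize(labelledBlogs)

  // Term-frequency vectors from a naive whitespace tokenisation of each post
  val hashingTF = new HashingTF()
  val tf = data.map { case (_, text) => hashingTF.transform(text.toLowerCase.split("\\s+").toSeq) }
  tf.cache()

  // Fit IDF on the corpus and rescale the term frequencies into tf-idf features
  val tfidf = new IDF().fit(tf).transform(tf)

  // Pair each tf-idf vector back with its label and train a multinomial Naive Bayes model
  val training = data.map(_._1).zip(tfidf).map { case (label, vector) => LabeledPoint(label, vector) }
  NaiveBayes.train(training, lambda = 1.0)
}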

In short, what are we trying to say?
Our Rumor engine is for people who are keen to understand upcoming products and the rumors around them. So go ahead and use our engine. As of now, the Rumor engine is focused on the technology domain, but the idea can be extended to other domains such as the automobile industry. We hope we get the chance to build that too and share it with everyone who is interested.

This is the third post in the Predicting Global Energy Demand using Spark series.

This is a sequel to the previous blogs from our team members, which explained a solution to a problem related to energy consumption. In this blog, let me explain our solution to another problem on the same data, whose schema is mentioned here.

Problem

What would be the household's peak-time load (peak time being 7 AM to 10 AM) for the next month:

  • During Weekdays?
  • During Weekends?

Brief Solution

In order to solve the above problem, we need to model the data so that it is aggregated on an hourly basis, then filtered down to peak-time records and split into weekday and weekend records. These separated records are aggregated on a monthly basis, and the peak-time load can be calculated from that.

Hourly Aggregation

In order to aggregate the energy consumption data, we need to keep a few key factors in mind: the sub-meter readings and power values add up to form their aggregate, whereas voltage and global intensity are averaged.

def hourlyAggregator(inputRDD: RDD[Record]): RDD[((String, Long), Record)] = {
    // Key each record by (date, hour) and reduce all records of the same hour into one
    val groupRDD = inputRDD.map(record => ((record.date, record.hourofDay), record)).reduceByKey((firstRecord, secondRecord) => {
      val record = new Record()
      record.date = firstRecord.date
      record.day = firstRecord.day
      record.month = firstRecord.month
      record.year = firstRecord.year
      record.hourofDay = firstRecord.hourofDay
      // Meter readings and power values add up to form the hourly aggregate
      record.subMetering1 = firstRecord.subMetering1 + secondRecord.subMetering1
      record.subMetering2 = firstRecord.subMetering2 + secondRecord.subMetering2
      record.subMetering3 = firstRecord.subMetering3 + secondRecord.subMetering3
      record.activePower = firstRecord.activePower + secondRecord.activePower
      record.reactivePower = firstRecord.reactivePower + secondRecord.reactivePower
      // Voltage and global intensity are averaged (pairwise, which approximates the true hourly mean)
      record.voltage = (firstRecord.voltage + secondRecord.voltage) / 2
      record.globalIntensity = (firstRecord.globalIntensity + secondRecord.globalIntensity) / 2
      record
    })
    groupRDD
  }
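
The brief solution above also calls for filtering the hourly records down to the 7 AM to 10 AM peak window and splitting them into weekday and weekend sets before the monthly aggregation. That step is not shown in this post; a minimal sketch is given below, assuming hourofDay holds the hour of day (0-23) and day holds a day-of-week name, both of which are assumptions about the Record schema rather than facts from the post.

// Hypothetical sketch of the peak-time filter and weekday/weekend split.
// Assumes record.hourofDay is the hour of day (0-23) and record.day is a
// day-of-week name such as "Saturday"; adjust to the actual Record schema.
def splitPeakTimeRecords(hourlyRDD: RDD[Record]): (RDD[Record], RDD[Record]) = {
    val peakRDD = hourlyRDD.filter(record => record.hourofDay >= 7 && record.hourofDay <= 10)
    val weekendDays = Set("Saturday", "Sunday")
    val weekdayRDD = peakRDD.filter(record => !weekendDays.contains(record.day))
    val weekendRDD = peakRDD.filter(record => weekendDays.contains(record.day))
    (weekdayRDD, weekendRDD)
  }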

Monthly Aggregation

Once the hourly aggregation is done, the records are aggregated on a daily basis, followed by an aggregation by month.

 def dailyAggregator(inputRDD: RDD[Record]): RDD[(String, Record)] = {
    // Key each record by date and reduce all records of the same day into one
    val groupRDD = inputRDD.map(record => (record.date, record)).reduceByKey((firstRecord, secondRecord) => {
      val record = new Record()
      record.date = firstRecord.date
      record.day = firstRecord.day
      record.month = firstRecord.month
      record.year = firstRecord.year
      record.subMetering1 = firstRecord.subMetering1 + secondRecord.subMetering1
      record.subMetering2 = firstRecord.subMetering2 + secondRecord.subMetering2
      record.subMetering3 = firstRecord.subMetering3 + secondRecord.subMetering3
      record.totalCost = firstRecord.totalCost + secondRecord.totalCost
      record.activePower = firstRecord.activePower + secondRecord.activePower
      record.reactivePower = firstRecord.reactivePower + secondRecord.reactivePower
      record.voltage = (firstRecord.voltage + secondRecord.voltage) / 2
      record.globalIntensity = (firstRecord.globalIntensity + secondRecord.globalIntensity) / 2
      record
    })
    groupRDD
  }
def monthlyAggregator(inputRDD: RDD[Record]): RDD[((Int, Long), Record)] = {
    // Key each record by (month, year) and reduce all records of the same month into one
    val groupRDD = inputRDD.map(record => ((record.month, record.year), record)).reduceByKey((firstRecord, secondRecord) => {
      val record = new Record()
      record.date = firstRecord.date
      record.day = firstRecord.day
      record.month = firstRecord.month
      record.year = firstRecord.year
      record.subMetering1 = firstRecord.subMetering1 + secondRecord.subMetering1
      record.subMetering2 = firstRecord.subMetering2 + secondRecord.subMetering2
      record.subMetering3 = firstRecord.subMetering3 + secondRecord.subMetering3
      record.totalCost = firstRecord.totalCost + secondRecord.totalCost
      record.activePower = firstRecord.activePower + secondRecord.activePower
      record.reactivePower = firstRecord.reactivePower + secondRecord.reactivePower
      record.voltage = (firstRecord.voltage + secondRecord.voltage) / 2
      record.globalIntensity = (firstRecord.globalIntensity + secondRecord.globalIntensity) / 2
      record
    })
    groupRDD
  }

Calculation of peak time load for the next month

Since the data is now aggregated by month, the peak-time load for the next month can be predicted with a Geometric Brownian Motion style simulation applied directly to the monthly series: the next value is modelled as the last observed value scaled by (1 + drift + volatility x noise), where the drift is the mean of the historical month-over-month relative changes and the volatility is their standard deviation.

def findNextMonthPeakLoad(rdd: RDD[((Int, Long), Record)], sparkContext: SparkContext): Double = {
      // Drift term: mean of the month-over-month relative changes, computed over the
      // monthly records sorted chronologically by (year, month)
      def standardMean(inputRDD: RDD[((Int, Long), Record)]): (List[Double], Double) = {
        val records = inputRDD.sortBy({ case ((month, year), _) => (year, month) }).collect()
        var sum = 0.0
        var riList = List[Double]()
        for (i <- Range(1, records.length)) {
          val firstRecord = records(i)
          val secondRecord = records(i - 1)
          val difference = (firstRecord._2.totalPowerUsed - secondRecord._2.totalPowerUsed) / firstRecord._2.totalPowerUsed
          riList = riList ::: List(difference)
          sum += difference
        }
        // mean over the (n - 1) month-over-month changes
        (riList, sum / riList.size)
      }
      // Volatility term: standard deviation of the relative changes
      def standardDeviation(inputRDD: RDD[Double], mean: Double): Double = {
        val sum = inputRDD.map(value => (value - mean) * (value - mean)).reduce(_ + _)
        scala.math.sqrt(sum / inputRDD.count())
      }
      val (rList, mean) = standardMean(rdd)
      val stdDeviation = standardDeviation(sparkContext.makeRDD(rList), mean)
      // The latest month's usage is the starting point; sort by (year, month) descending to pick it
      val sortedRdd = rdd.sortBy({ case ((month, year), _) => (year, month) }, ascending = false)
      val lastValue = sortedRdd.first()._2.totalPowerUsed
      // Run 1000 one-step simulations: next = last * (1 + drift + volatility * noise)
      var newValues = List[Double]()
      for (i <- Range(0, 1000)) {
        val predictedValue = lastValue * (1 + mean + stdDeviation * scala.math.random)
        newValues ::= predictedValue
      }
      // The average of the simulated values is the predicted peak-time load for next month
      newValues.sum / newValues.size
    }
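
Tying the pieces together, a hypothetical end-to-end run for the weekday case might look like the following, where records is the parsed RDD[Record] (its construction is not shown in this post), sparkContext is the active SparkContext, and splitPeakTimeRecords is the sketch from the hourly aggregation section.

// Hypothetical wiring of the steps above; `records: RDD[Record]` and `sparkContext` are assumed to exist
val hourlyRecords = hourlyAggregator(records).values
val (weekdayPeak, weekendPeak) = splitPeakTimeRecords(hourlyRecords)
val weekdayMonthly = monthlyAggregator(weekdayPeak)
val weekdayPrediction = findNextMonthPeakLoad(weekdayMonthly, sparkContext)
println(s"Predicted weekday peak-time load for next month: $weekdayPrediction")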

Hope you understood the idea behind predicting the peak-time load for the next month. Further blogs in this series will explain the solutions to the other problems.

For the complete code, refer here.