This is the third post in the Predicting Global Energy Demand using Spark series.

This post is a sequel to the earlier posts from our team members, which explained the solution to a problem related to energy consumption. In this post, let me explain our solution to another problem on the same data, whose schema is mentioned here.

Problem

What would be the household's peak time load (peak time being between 7 AM and 10 AM) for the next month

  • During Weekdays?
  • During Weekends?

Brief Solution

To solve the problem stated above, we need to model the data so that it is aggregated on an hourly basis, then filter out the peak time records and split them into weekday and weekend records. These separated records are aggregated on a monthly basis, and the peak time load can then be calculated on them. A rough sketch of the filtering and splitting step is shown below.
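
The following is a minimal sketch of the peak time filter and the weekday/weekend split, not the exact code from our project. It assumes the Record class used throughout this post (with its date and hourofDay fields), the dd/MM/yyyy date format of the dataset, and an isWeekend helper introduced purely for illustration.

import java.text.SimpleDateFormat
import java.util.Calendar

import org.apache.spark.rdd.RDD

// Illustrative helper: derives the day of the week from the record's date string,
// assuming dates are formatted as dd/MM/yyyy.
def isWeekend(date: String): Boolean = {
  val calendar = Calendar.getInstance()
  calendar.setTime(new SimpleDateFormat("dd/MM/yyyy").parse(date))
  val dayOfWeek = calendar.get(Calendar.DAY_OF_WEEK)
  dayOfWeek == Calendar.SATURDAY || dayOfWeek == Calendar.SUNDAY
}

// Keeps only the peak time readings (7 AM to 10 AM) and splits them into
// weekday and weekend RDDs.
def splitPeakRecords(inputRDD: RDD[Record]): (RDD[Record], RDD[Record]) = {
  val peakRDD = inputRDD.filter(record => record.hourofDay >= 7 && record.hourofDay <= 10)
  val weekdayRDD = peakRDD.filter(record => !isWeekend(record.date))
  val weekendRDD = peakRDD.filter(record => isWeekend(record.date))
  (weekdayRDD, weekendRDD)
}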

Hourly Aggregation

To aggregate the energy consumption data, we need to keep one key factor in mind: the meter readings are summed to form their aggregate, whereas voltage and global intensity are averaged to form theirs.

// Groups the records by (date, hour of day) and reduces each group into a single
// hourly record: meter readings and power values are summed, while voltage and
// global intensity are combined with a running pairwise average.
def hourlyAggregator(inputRDD: RDD[Record]): RDD[((String, Long), Record)] = {
    val groupRDD = inputRDD.map(record => ((record.date, record.hourofDay), record))
      .reduceByKey((firstRecord, secondRecord) => {
        val record = new Record()
        // Carry the grouping fields over from the first record of the pair.
        record.date = firstRecord.date
        record.day = firstRecord.day
        record.month = firstRecord.month
        record.year = firstRecord.year
        record.hourofDay = firstRecord.hourofDay
        // Meter readings and power values add up to form the hourly aggregate.
        record.subMetering1 = firstRecord.subMetering1 + secondRecord.subMetering1
        record.subMetering2 = firstRecord.subMetering2 + secondRecord.subMetering2
        record.subMetering3 = firstRecord.subMetering3 + secondRecord.subMetering3
        record.activePower = firstRecord.activePower + secondRecord.activePower
        record.reactivePower = firstRecord.reactivePower + secondRecord.reactivePower
        // Voltage and global intensity are averaged.
        record.voltage = (firstRecord.voltage + secondRecord.voltage) / 2
        record.globalIntensity = (firstRecord.globalIntensity + secondRecord.globalIntensity) / 2
        record
      })
    groupRDD
  }

Monthly Aggregation

Once the hourly aggregation is done, the records are aggregated on a daily basis, followed by an aggregation by month.

// Aggregates the hourly records into one record per day, keyed by the date string.
def dailyAggregator(inputRDD: RDD[Record]): RDD[(String, Record)] = {
    val groupRDD = inputRDD.map(record => (record.date, record))
      .reduceByKey((firstRecord, secondRecord) => {
        val record = new Record()
        record.date = firstRecord.date
        record.day = firstRecord.day
        record.month = firstRecord.month
        record.year = firstRecord.year
        // Meter readings, cost and power values are summed for the day.
        record.subMetering1 = firstRecord.subMetering1 + secondRecord.subMetering1
        record.subMetering2 = firstRecord.subMetering2 + secondRecord.subMetering2
        record.subMetering3 = firstRecord.subMetering3 + secondRecord.subMetering3
        record.totalCost = firstRecord.totalCost + secondRecord.totalCost
        record.activePower = firstRecord.activePower + secondRecord.activePower
        record.reactivePower = firstRecord.reactivePower + secondRecord.reactivePower
        // Voltage and global intensity are averaged.
        record.voltage = (firstRecord.voltage + secondRecord.voltage) / 2
        record.globalIntensity = (firstRecord.globalIntensity + secondRecord.globalIntensity) / 2
        record
      })
    groupRDD
  }

// Aggregates the daily records into one record per month, keyed by (month, year).
def monthlyAggregator(inputRDD: RDD[Record]): RDD[((Int, Long), Record)] = {
    val groupRDD = inputRDD.map(record => ((record.month, record.year), record))
      .reduceByKey((firstRecord, secondRecord) => {
        val record = new Record()
        record.date = firstRecord.date
        record.day = firstRecord.day
        record.month = firstRecord.month
        record.year = firstRecord.year
        // Meter readings, cost and power values are summed for the month.
        record.subMetering1 = firstRecord.subMetering1 + secondRecord.subMetering1
        record.subMetering2 = firstRecord.subMetering2 + secondRecord.subMetering2
        record.subMetering3 = firstRecord.subMetering3 + secondRecord.subMetering3
        record.totalCost = firstRecord.totalCost + secondRecord.totalCost
        record.activePower = firstRecord.activePower + secondRecord.activePower
        record.reactivePower = firstRecord.reactivePower + secondRecord.reactivePower
        // Voltage and global intensity are averaged.
        record.voltage = (firstRecord.voltage + secondRecord.voltage) / 2
        record.globalIntensity = (firstRecord.globalIntensity + secondRecord.globalIntensity) / 2
        record
      })
    groupRDD
  }
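
To see how these aggregators fit together, here is a rough sketch of the driver logic. The parsedRDD variable and the splitPeakRecords helper sketched earlier are illustrative; the parsing of the raw file into RDD[Record] is not shown here.

// Split the parsed records into peak time weekday and weekend sets.
val (weekdayRDD, weekendRDD) = splitPeakRecords(parsedRDD)

// Roll the weekday peak time records up to hourly, daily and finally monthly level.
val weekdayMonthly = monthlyAggregator(
  dailyAggregator(
    hourlyAggregator(weekdayRDD).map(_._2)
  ).map(_._2)
)

// The same pipeline for the weekend records.
val weekendMonthly = monthlyAggregator(
  dailyAggregator(
    hourlyAggregator(weekendRDD).map(_._2)
  ).map(_._2)
)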

Calculation of Peak Time Load for the Next Month

Since the data is now aggregated by month, the peak time load for the next month can be predicted by applying the Geometric Brownian Motion model directly to the monthly series. The month-over-month returns of the total power used give the drift (their mean) and the volatility (their standard deviation), and the last observed value is projected forward as last value * (1 + drift + volatility * random shock), repeated over many simulations.

def findNextMonthPeakLoad(rdd: RDD[((Int, Long), Record)], sparkContext: SparkContext): Double = {
      // Computes the month-over-month returns of the total power used and their mean,
      // which serves as the drift of the Brownian motion.
      def standardMean(inputRDD: RDD[((Int, Long), Record)]): (List[Double], Double) = {
        val records = inputRDD.collect()
        var sum = 0.0
        var riList = List[Double]()
        for (i <- Range(1, records.length)) {
          val firstRecord = records(i)
          val secondRecord = records(i - 1)
          val difference = (firstRecord._2.totalPowerUsed - secondRecord._2.totalPowerUsed) / firstRecord._2.totalPowerUsed
          riList = riList ::: List(difference)
          sum += difference
        }
        (riList, sum / riList.size)
      }
      // Standard deviation of the returns, used as the volatility term.
      def standDeviation(inputRDD: RDD[Double], mean: Double): Double = {
        val sum = inputRDD.map(value => {
          (value - mean) * (value - mean)
        }).reduce((firstValue, secondValue) => {
          firstValue + secondValue
        })
        scala.math.sqrt(sum / inputRDD.count())
      }
      val (rList, mean) = standardMean(rdd)
      val stdDeviation = standDeviation(sparkContext.makeRDD(rList), mean)
      // Pick the latest month's load by sorting chronologically on (year, month) in descending order.
      val sortedRdd = rdd.sortBy({ case ((month, year), _) => (year, month) }, ascending = false)
      val lastValue = sortedRdd.first()._2.totalPowerUsed
      var newValues = List[Double]()
      // Simulate the next month's value 1000 times: the last observed value scaled by
      // the drift plus a uniformly random shock weighted by the volatility.
      for (i <- Range(0, 1000)) {
        val predictedValue = lastValue * (1 + mean + stdDeviation * scala.math.random)
        newValues ::= predictedValue
      }
      // The prediction is the average of the simulated values.
      newValues.sum / newValues.size
    }
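
Applying this to the weekday and weekend monthly series from the earlier sketch (the variable names are again illustrative) answers the two questions in the problem statement:

// Predict the next month's peak time load separately for weekdays and weekends.
val weekdayPeakLoad = findNextMonthPeakLoad(weekdayMonthly, sparkContext)
val weekendPeakLoad = findNextMonthPeakLoad(weekendMonthly, sparkContext)
println(s"Predicted weekday peak time load: $weekdayPeakLoad")
println(s"Predicted weekend peak time load: $weekendPeakLoad")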

I hope this gave you a clear idea of how the peak time load for the next month is predicted. Further posts in this series will explain the solutions to the other problems.

For the complete code, refer here.
