This is the 3rd post in the Predicting Global Energy Demand using Spark series.

This post is a sequel to the earlier posts from our team, which explained the solution to a problem on energy consumption. In this post, we explain our solution to another problem on the same data, whose schema is mentioned here.

#### Problem

What would be the household's peak-time load (peak time is between 7 AM and 10 AM) for the next month

• During Weekdays?
• During Weekends?

#### Brief Solution

To solve the stated problem, we need to model the data so that it is aggregated on an hourly basis, then filter out the peak-time records and split them into weekday and weekend records. These separated records are aggregated on a monthly basis, and the peak-time load can be calculated from the result.
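As a rough sketch of the filter-and-split step (using plain Scala collections in place of RDDs, and a simplified hypothetical `Reading` class rather than the post's full `Record` class; the peak-hour boundary being exclusive of 10 AM is our assumption):

```scala
import java.time.LocalDate
import java.time.DayOfWeek

// Hypothetical minimal record; the real Record class has more fields.
case class Reading(date: String, hourOfDay: Long, activePower: Double)

// Keep only peak-time readings (7 AM up to, but not including, 10 AM -- an assumed boundary).
def isPeakHour(hour: Long): Boolean = hour >= 7 && hour < 10

// Classify a yyyy-MM-dd date string as weekend or weekday.
def isWeekend(date: String): Boolean = {
  val day = LocalDate.parse(date).getDayOfWeek
  day == DayOfWeek.SATURDAY || day == DayOfWeek.SUNDAY
}

val readings = List(
  Reading("2024-01-06", 8, 1.2),  // Saturday, peak hour
  Reading("2024-01-08", 9, 2.5),  // Monday, peak hour
  Reading("2024-01-08", 14, 3.0)  // Monday, off-peak
)

val peak = readings.filter(r => isPeakHour(r.hourOfDay))
val (weekend, weekday) = peak.partition(r => isWeekend(r.date))
```

On an RDD the same logic would be expressed with `filter` on the peak hours followed by two filtered RDDs (or a keyed split) for weekend and weekday records.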

#### Hourly Aggregation

To aggregate the energy consumption data hourly, we need to consider how each field combines: the meter readings and power values add up to form the aggregate, whereas voltage and global intensity are averaged.

```scala
def hourlyAggregator(inputRDD: RDD[Record]): RDD[((String, Long), Record)] = {
  inputRDD
    .map(record => ((record.date, record.hourofDay), record))
    .reduceByKey((firstRecord, secondRecord) => {
      val record = new Record()
      record.date = firstRecord.date
      record.day = firstRecord.day
      record.month = firstRecord.month
      record.year = firstRecord.year
      record.hourofDay = firstRecord.hourofDay
      // Meter readings and power values add up to form the hourly aggregate.
      record.subMetering1 = firstRecord.subMetering1 + secondRecord.subMetering1
      record.subMetering2 = firstRecord.subMetering2 + secondRecord.subMetering2
      record.subMetering3 = firstRecord.subMetering3 + secondRecord.subMetering3
      record.activePower = firstRecord.activePower + secondRecord.activePower
      record.reactivePower = firstRecord.reactivePower + secondRecord.reactivePower
      // Voltage and global intensity are averaged. Note: this pairwise average is
      // only an approximation of the true mean when more than two records combine.
      record.voltage = (firstRecord.voltage + secondRecord.voltage) / 2
      record.globalIntensity = (firstRecord.globalIntensity + secondRecord.globalIntensity) / 2
      record
    })
}
```
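To illustrate the two combination rules on plain Scala values (this snippet is ours, not part of the original code), consider two readings for the same hour as `(subMetering, voltage)` pairs:

```scala
// Two readings for the same hour: (subMetering, voltage).
val hourReadings = List((1.0, 240.0), (2.0, 242.0))

// Meter readings sum; voltages are averaged pairwise.
val (meterSum, voltageAvg) = hourReadings.reduce { (a, b) =>
  (a._1 + b._1, (a._2 + b._2) / 2)
}
```

With exactly two readings the pairwise average equals the true mean (241.0 here); with more readings the two only approximately agree.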

#### Monthly Aggregation

Once the hourly aggregation is done, the records are aggregated on a daily basis, followed by aggregation by month.

```scala
def dailyAggregator(inputRDD: RDD[Record]): RDD[(String, Record)] = {
  inputRDD
    .map(record => (record.date, record))
    .reduceByKey((firstRecord, secondRecord) => {
      val record = new Record()
      record.date = firstRecord.date
      record.day = firstRecord.day
      record.month = firstRecord.month
      record.year = firstRecord.year
      record.subMetering1 = firstRecord.subMetering1 + secondRecord.subMetering1
      record.subMetering2 = firstRecord.subMetering2 + secondRecord.subMetering2
      record.subMetering3 = firstRecord.subMetering3 + secondRecord.subMetering3
      record.totalCost = firstRecord.totalCost + secondRecord.totalCost
      record.activePower = firstRecord.activePower + secondRecord.activePower
      record.reactivePower = firstRecord.reactivePower + secondRecord.reactivePower
      record.voltage = (firstRecord.voltage + secondRecord.voltage) / 2
      record.globalIntensity = (firstRecord.globalIntensity + secondRecord.globalIntensity) / 2
      record
    })
}
```
```scala
def monthlyAggregator(inputRDD: RDD[Record]): RDD[((Int, Long), Record)] = {
  inputRDD
    .map(record => ((record.month, record.year), record))
    .reduceByKey((firstRecord, secondRecord) => {
      val record = new Record()
      record.date = firstRecord.date
      record.day = firstRecord.day
      record.month = firstRecord.month
      record.year = firstRecord.year
      record.subMetering1 = firstRecord.subMetering1 + secondRecord.subMetering1
      record.subMetering2 = firstRecord.subMetering2 + secondRecord.subMetering2
      record.subMetering3 = firstRecord.subMetering3 + secondRecord.subMetering3
      record.totalCost = firstRecord.totalCost + secondRecord.totalCost
      record.activePower = firstRecord.activePower + secondRecord.activePower
      record.reactivePower = firstRecord.reactivePower + secondRecord.reactivePower
      record.voltage = (firstRecord.voltage + secondRecord.voltage) / 2
      record.globalIntensity = (firstRecord.globalIntensity + secondRecord.globalIntensity) / 2
      record
    })
}
```

#### Calculation of peak time load for the next month

Since the data is already aggregated by month, the peak-time load for the next month can be predicted by applying the Geometric Brownian Motion (GBM) model directly to the monthly series.
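For context, the discretization assumed here: under GBM with a time step of Δt = 1 month, each simulated next value is drawn as

```
S_{t+1} = S_t · (1 + μ·Δt + σ·√Δt·Z),   Z ~ N(0, 1)
```

where μ and σ are the mean and standard deviation of the month-over-month returns (S_t − S_{t−1}) / S_{t−1}. With Δt = 1, the drift and shock terms reduce to μ and σ·Z.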

```scala
def findNextMonthPeakLoad(rdd: RDD[((Int, Long), Record)]): Double = {
  // Collect once, ordered chronologically by (year, month).
  val records = rdd.sortBy({ case ((month, year), _) => (year, month) }).collect()

  // Month-over-month returns: (current - previous) / previous.
  val returns = records.sliding(2).map { case Array(prev, cur) =>
    (cur._2.totalPowerUsed - prev._2.totalPowerUsed) / prev._2.totalPowerUsed
  }.toList
  val mean = returns.sum / returns.size
  val stdDeviation = math.sqrt(returns.map(r => (r - mean) * (r - mean)).sum / returns.size)

  // Simulate 1000 one-step GBM paths from the last observed month
  // (time step = 1 month, Gaussian shocks) and average them as the prediction.
  val lastValue = records.last._2.totalPowerUsed
  val random = new scala.util.Random()
  val simulated = List.fill(1000)(lastValue * (1 + mean + stdDeviation * random.nextGaussian()))
  simulated.sum / simulated.size
}
```
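The same simulation can be mirrored on plain Scala collections without Spark, which makes the arithmetic easy to follow (the monthly load values below are made up for illustration, not taken from the dataset):

```scala
import scala.util.Random

// Hypothetical monthly peak-load totals, in chronological order.
val monthlyLoads = List(100.0, 104.0, 101.0, 108.0)

// Month-over-month returns: (current - previous) / previous.
val returns = monthlyLoads.sliding(2).map { case List(prev, cur) =>
  (cur - prev) / prev
}.toList
val mean = returns.sum / returns.size
val variance = returns.map(r => (r - mean) * (r - mean)).sum / returns.size
val stdDev = math.sqrt(variance)

// Simulate 1000 one-step GBM paths from the last observed month and average them.
val rng = new Random(42)  // fixed seed for reproducibility
val last = monthlyLoads.last
val simulations = List.fill(1000)(last * (1 + mean + stdDev * rng.nextGaussian()))
val predicted = simulations.sum / simulations.size
```

The prediction lands near the last observed value scaled by the average monthly growth; the spread of the 1000 simulated values reflects the volatility of the historical returns.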

We hope this gives you a clear idea of how the peak-time load for the next month is predicted. Further posts in this series will explain the solutions to the other problems.

For the complete code, refer here