This is the third post in the Predicting Global Energy Demand using Spark series.
This post is a sequel to the previous blogs from our team members, which explained the solution to a problem on household energy consumption. In this blog, let me explain our solution to another problem on the same data, whose schema is mentioned here.
Problem
What would be a household's peak-time load (peak time is between 7 AM and 10 AM) for the next month?
Brief Solution
To solve the problem stated above, we first model the data by aggregating it on an hourly basis, then filter out the peak-time records and split them into weekday and weekend records. These separated records are then aggregated on a monthly basis, from which the peak-time load can be calculated.
Hourly Aggregation
When aggregating the energy consumption data, we need to keep a key factor in mind: the sub-meter readings add up to form their aggregate, whereas voltage and global intensity are averaged to form theirs.
def hourlyAggregator(inputRDD: RDD[Record]): RDD[((String, Long), Record)] = {
  // Group by (date, hour) and merge all records of each hour into one.
  inputRDD.map(record => ((record.date, record.hourofDay), record))
    .reduceByKey((firstRecord, secondRecord) => {
      val record = new Record()
      record.date = firstRecord.date
      record.day = firstRecord.day
      record.month = firstRecord.month
      record.year = firstRecord.year
      record.hourofDay = firstRecord.hourofDay
      // Meter readings and power values add up to form the aggregate.
      record.subMetering1 = firstRecord.subMetering1 + secondRecord.subMetering1
      record.subMetering2 = firstRecord.subMetering2 + secondRecord.subMetering2
      record.subMetering3 = firstRecord.subMetering3 + secondRecord.subMetering3
      record.activePower = firstRecord.activePower + secondRecord.activePower
      record.reactivePower = firstRecord.reactivePower + secondRecord.reactivePower
      // Voltage and intensity are averaged (note: pairwise averaging inside
      // reduceByKey only approximates the true mean over the hour).
      record.voltage = (firstRecord.voltage + secondRecord.voltage) / 2
      record.globalIntensity = (firstRecord.globalIntensity + secondRecord.globalIntensity) / 2
      record
    })
}
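The solution outline also calls for filtering peak-time records (7 AM to 10 AM) and splitting them into weekday and weekend sets. A minimal sketch of the two predicates that step would need is shown below; the names isPeakHour and isWeekend are ours, not from the original code, and the d/M/yyyy date pattern is an assumption about the dataset's date column:

```scala
// Peak time is 07:00 to 10:00; hourOfDay is the hour in 24-hour format.
def isPeakHour(hourOfDay: Long): Boolean =
  hourOfDay >= 7 && hourOfDay < 10

// Derive the day of week from the date string (assumed format: d/M/yyyy).
def isWeekend(date: String): Boolean = {
  val formatter = java.time.format.DateTimeFormatter.ofPattern("d/M/yyyy")
  val dayOfWeek = java.time.LocalDate.parse(date, formatter).getDayOfWeek
  dayOfWeek == java.time.DayOfWeek.SATURDAY ||
    dayOfWeek == java.time.DayOfWeek.SUNDAY
}
```

With these predicates, the hourly RDD could be narrowed down with something like hourlyRDD.filter { case ((date, hour), _) => isPeakHour(hour) } and then split into weekday and weekend RDDs using isWeekend on the date key.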
Monthly Aggregation
Once the hourly aggregation is done, the records are aggregated on a daily basis, followed by an aggregation by month.
def dailyAggregator(inputRDD: RDD[Record]): RDD[(String, Record)] = {
  // Group by date and merge all records of each day into one.
  inputRDD.map(record => (record.date, record))
    .reduceByKey((firstRecord, secondRecord) => {
      val record = new Record()
      record.date = firstRecord.date
      record.day = firstRecord.day
      record.month = firstRecord.month
      record.year = firstRecord.year
      record.subMetering1 = firstRecord.subMetering1 + secondRecord.subMetering1
      record.subMetering2 = firstRecord.subMetering2 + secondRecord.subMetering2
      record.subMetering3 = firstRecord.subMetering3 + secondRecord.subMetering3
      record.totalCost = firstRecord.totalCost + secondRecord.totalCost
      record.activePower = firstRecord.activePower + secondRecord.activePower
      record.reactivePower = firstRecord.reactivePower + secondRecord.reactivePower
      record.voltage = (firstRecord.voltage + secondRecord.voltage) / 2
      record.globalIntensity = (firstRecord.globalIntensity + secondRecord.globalIntensity) / 2
      record
    })
}
def monthlyAggregator(inputRDD: RDD[Record]): RDD[((Int, Long), Record)] = {
  // Group by (month, year) and merge all records of each month into one.
  inputRDD.map(record => ((record.month, record.year), record))
    .reduceByKey((firstRecord, secondRecord) => {
      val record = new Record()
      record.date = firstRecord.date
      record.day = firstRecord.day
      record.month = firstRecord.month
      record.year = firstRecord.year
      record.subMetering1 = firstRecord.subMetering1 + secondRecord.subMetering1
      record.subMetering2 = firstRecord.subMetering2 + secondRecord.subMetering2
      record.subMetering3 = firstRecord.subMetering3 + secondRecord.subMetering3
      record.totalCost = firstRecord.totalCost + secondRecord.totalCost
      record.activePower = firstRecord.activePower + secondRecord.activePower
      record.reactivePower = firstRecord.reactivePower + secondRecord.reactivePower
      record.voltage = (firstRecord.voltage + secondRecord.voltage) / 2
      record.globalIntensity = (firstRecord.globalIntensity + secondRecord.globalIntensity) / 2
      record
    })
}
Calculation of peak time load for the next month
Since the data is now aggregated by month, the peak-time load for the next month can be predicted using the geometric Brownian motion model, which can be applied directly to the monthly aggregates.
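Concretely, one geometric-Brownian-motion step predicts the next value from the last observed value S, the mean μ and standard deviation σ of the historical month-over-month relative changes (returns), and a random shock ε drawn from a standard normal distribution. A minimal sketch of that single step (the function name gbmStep is ours, not from the original code; the time step is fixed at one month, so dt = 1):

```scala
// One GBM step with dt = 1:
//   S_next = S * (1 + mu * dt + sigma * sqrt(dt) * epsilon)
def gbmStep(s: Double, mu: Double, sigma: Double, epsilon: Double): Double =
  s * (1 + mu + sigma * epsilon)
```

Repeating this step many times with fresh ε values and combining the results gives a Monte Carlo estimate of the next month's load, which is what the function below does.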
def findNextMonthPeakLoad(rdd: RDD[((Int, Long), Record)], sparkContext: SparkContext): Double = {

  // Mean of the month-over-month relative changes (returns) of the load.
  def standardMean(inputRDD: RDD[((Int, Long), Record)]): (List[Double], Double) = {
    val records = inputRDD.collect() // collect once instead of materializing inside the loop
    var sum = 0.0
    var riList = List[Double]()
    for (i <- Range(1, records.length)) {
      val firstRecord = records(i)
      val secondRecord = records(i - 1)
      val difference = (firstRecord._2.totalPowerUsed - secondRecord._2.totalPowerUsed) /
        firstRecord._2.totalPowerUsed
      riList = riList ::: List(difference)
      sum += difference
    }
    (riList, sum / riList.size)
  }

  // Standard deviation of the returns around their mean.
  def standDeviation(inputRDD: RDD[Double], mean: Double): Double = {
    val sum = inputRDD.map(value => (value - mean) * (value - mean))
      .reduce((firstValue, secondValue) => firstValue + secondValue)
    scala.math.sqrt(sum / inputRDD.count())
  }

  val (rList, mean) = standardMean(rdd)
  val stdDeviation = standDeviation(sparkContext.makeRDD(rList), mean)

  // Latest month's load: sort chronologically (year first, then month), descending.
  val sortedRdd = rdd.sortBy({ case ((month, year), _) => (year, month) }, ascending = false)
  val lastValue = sortedRdd.first()._2.totalPowerUsed

  // Run 1000 Monte Carlo trials of a single GBM step with a one-month time
  // step, drawing the random shock from a standard normal distribution.
  val random = new scala.util.Random()
  var newValues = List[Double]()
  for (i <- Range(0, 1000)) {
    val predictedValue = lastValue * (1 + mean + stdDeviation * random.nextGaussian())
    newValues ::= predictedValue
  }

  // Average the simulated values to get the expected next-month peak load.
  newValues.sum / newValues.size
}
We hope this gave you a clear idea of how the peak-time load for the next month is predicted. Further blogs in this series will explain the solutions to other problems.
For complete code refer here