Bulk processing of AEMO MMS data with a custom Spark Data Source

Background

AEMO is the Australian Energy Market Operator. It maintains a well-organised database that market participants use to track bids, demand, generation and other market functions. This database is known as the MMS (Market Management System) Data Model.

Electricity researchers, retailers, distributors and others use this data to get insights and manage their business.

Public MMS data is freely available from AEMO's NEMWeb site (https://nemweb.com.au/).

The traditional approach to making use of MMS datasets is to load them into an RDBMS. The volume and variety of the data can make this difficult, although some helper tools do exist. Yet loading a long history of granular data for analysis, even for a single dataset, is a common business requirement.

Apache Spark (an alternative to a traditional RDBMS) has a natural advantage here: it can read and process large datasets in parallel, which suits analytics workloads.

Can it be used here?

Challenges

The AEMO CSV format used to populate MMS allows there to be multiple reports in a single file.

Furthermore, files are frequently compressed in Zip format, which usually means pre-processing is required before they can be read as text or CSV.

Whilst the underlying files are comma-separated, the number of columns in each row varies within a given file due to:

  • Different record types (Comment, Information or Data)
  • Different report schemas (each having a different column set)
AEMO MMS Data Model CSV structure

Here is a snippet from a sample file:

C,SETP.WORLD,DVD_DISPATCH_UNIT_SCADA,AEMO,PUBLIC,2021/10/07,00:00:05,0000000350353790,,0000000350353744
I,DISPATCH,UNIT_SCADA,1,SETTLEMENTDATE,DUID,SCADAVALUE
D,DISPATCH,UNIT_SCADA,1,"2021/09/01 00:05:00",BARCSF1,0
D,DISPATCH,UNIT_SCADA,1,"2021/09/01 00:05:00",BUTLERSG,9.499998
D,DISPATCH,UNIT_SCADA,1,"2021/09/01 00:05:00",CAPTL_WF,47.048004
...lots more rows...
C,"END OF REPORT",3368947

This file structure presents some specific challenges for parsing with Spark and thus being able to derive useful insights from the underlying data.

Issue #1 – reading too many rows in a file (even for a single report) can cause out of memory issues

Issue #2 – naively reading just the data (D) rows misses file and report header information, such as column names

Issue #3 – parsing full files can result in unnecessary data being read, when only a subset is needed

Solution

SparkMMS is a custom data reader implemented in Java using Apache Spark’s DataSource V2 API.

It can be used to efficiently read AEMO MMS files in bulk.

Input:

SparkMMS takes a glob path, which means it can read multiple files based on a file pattern – e.g. to read all dispatch related zip files from a monthly archive:

/tmp/Data_Archive/Wholesale_Electricity/MMSDM/2021/MMSDM_2021_09/MMSDM_Historical_Data_SQLLoader/DATA/PUBLIC_DVD_DISPATCH*.zip
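
For example, assuming the glob is passed via the same fileName option used in the demo below, a bulk read might look like this sketch:

# Read all dispatch-related zip files for the month in one pass (sketch)
df = (
    spark.read
    .format("com.analyticsanvil.SparkMMS")
    .option("fileName", "/tmp/Data_Archive/Wholesale_Electricity/MMSDM/2021/MMSDM_2021_09/MMSDM_Historical_Data_SQLLoader/DATA/PUBLIC_DVD_DISPATCH*.zip")
    .load()
)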

Output:

SparkMMS creates a Spark dataframe containing chunks of rows for each specific report type across all input files. The data rows are nested in the “data” column of the dataframe, and the file header and report headers (including column names) are preserved alongside them:

>>> df.printSchema()
root
 |-- original_filename: string (nullable = false)
 |-- system: string (nullable = false)
 |-- report_id: string (nullable = false)
 |-- report_from: string (nullable = false)
 |-- report_to: string (nullable = false)
 |-- publish_datetime: timestamp (nullable = false)
 |-- id1: string (nullable = false)
 |-- id2: string (nullable = false)
 |-- id3: string (nullable = false)
 |-- report_type: string (nullable = false)
 |-- report_subtype: string (nullable = false)
 |-- report_version: integer (nullable = false)
 |-- column_headers: array (nullable = false)
 |    |-- element: string (containsNull = true)
 |-- data: array (nullable = false)
 |    |-- element: array (containsNull = true)
 |    |    |-- element: string (containsNull = true)

This structure makes it easy to do further processing of the data and means no information is lost when reading files in parallel.
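
For example, once a dataframe has been loaded via SparkMMS (see the demo below), the nested data rows can be exploded into one row per CSV record. A minimal sketch based on the schema above:

from pyspark.sql.functions import explode

# Flatten the nested data rows into one row per CSV record,
# keeping the report metadata alongside each record.
records = df.select(
    "original_filename", "report_type", "report_subtype", "report_version",
    "column_headers", explode("data").alias("record")
)
records.show(truncate=False)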

Other features:

  • Reads both .CSV and .zip
  • Automatically splits large files into multiple partitions
  • Extracts useful metadata from raw files, including column headers
  • Supports multiple report schemas / versions
  • Supports predicate pushdown – skips reports within a file if not selected
  • Column pruning – reads only a subset of data from raw files when not all columns are selected (see the sketch after this list)
  • Can read from cloud storage (e.g. Azure Blob storage, Amazon S3, Databricks DBFS)
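
To take advantage of the predicate pushdown and column pruning described above, filter and select early; a rough sketch, assuming df has been loaded via SparkMMS as in the demo below:

# Restrict to a single report and only the required columns
scada_only = (
    df.where("report_type = 'DISPATCH' AND report_subtype = 'UNIT_SCADA'")
      .select("report_type", "report_subtype", "report_version", "column_headers", "data")
)
scada_only.explain()  # inspect the physical plan for pushed filters / pruned columns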

Demo

These steps show the SparkMMS custom reader in action using Azure Databricks:

Note: Databricks is a paid, cloud-based data lake / ML platform. Alternatively, see the source code for a demonstration of running SparkMMS locally on a single node.

Prerequisites

  1. Download the library:
    https://github.com/niftimus/SparkMMS/releases/download/v0.1/SparkMMS-0.1-SNAPSHOT.jar
  2. Start a Databricks cluster – e.g.:

    Note: Select Runtime 9.1 LTS for compatibility
  3. Add the SparkMMS library to the cluster via Cluster > Libraries > Install New > Drag and Drop Jar:

Using SparkMMS

1. Define helper functions. At runtime, these create MMS report-specific dataframe definitions (with correct per-report column headings) and also create temporary tables to streamline querying via SQL:

# Get a new dataframe with the schema of a single report type
def getReport(df, report_type, report_subtype, report_version):
    from pyspark.sql.functions import explode
    df = df.where(f"report_type = '{report_type}' and report_subtype = '{report_subtype}' and report_version = {report_version}")
    tmpDF = df.select("column_headers", explode(df.data).alias("datarow"))
    
    colHeaders = df.select("column_headers").first().column_headers
    
    for idx, colName in enumerate(colHeaders):
        tmpDF = tmpDF.withColumn(colName, tmpDF.datarow[idx])
    
    tmpDF = tmpDF.drop("column_headers").drop("datarow")    
    
    return tmpDF

# Register all reports available in the dataframe as temporary views in the metastore
def registerAllReports(df):
    tmpDF = df.select("report_type","report_subtype","report_version")
    tmpDF = tmpDF.dropDuplicates()
    
    reports = tmpDF.collect()
    
    for r in reports:
        tmpReportDF = getReport(df,r.report_type,r.report_subtype,r.report_version)
        tmpReportDF.createOrReplaceTempView(f"{r.report_type}_{r.report_subtype}_{r.report_version}")

2. Create a temporary directory and download sample data from AEMO (15 MB zipped, 191 MB unzipped):

%sh
cd /dbfs/
mkdir tmp
cd tmp
wget https://nemweb.com.au/Data_Archive/Wholesale_Electricity/MMSDM/2021/MMSDM_2021_09/MMSDM_Historical_Data_SQLLoader/DATA/PUBLIC_DVD_DISPATCH_UNIT_SCADA_202109010000.zip

Note – there is no need to unzip the file.

3. Read raw data into a Spark dataframe using SparkMMS:

Notes:

  • Option maxRowsPerPartition tells the reader to create each partition with a maximum of 50,000 report data rows. All report rows will still be read; some will simply land in different partitions for performance reasons.
  • Option minSplitFilesize tells the reader not to bother splitting files smaller than 1,000,000 bytes, which improves performance.
df = (
    spark 
    .read
    .format("com.analyticsanvil.SparkMMS")
    .option("fileName", "/tmp/PUBLIC_DVD_DISPATCH_UNIT_SCADA_202109010000.zip")
    .option("maxRowsPerPartition","50000")
    .option("minSplitFilesize","1000000")
    .load()
)

4. Validate that the dataframe contains rows:

df.show()

Example output:

Note: Optionally, we can also run df.cache() here to improve performance in subsequent steps.
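
As an optional sanity check, the number of partitions created by the reader can also be inspected:

# With maxRowsPerPartition=50000, a large report should be split across several partitions
print(df.rdd.getNumPartitions())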

5. Register each report found in the raw file(s) as a temporary table and then validate the output:

registerAllReports(df)

After the above command, a single temp table is registered because our file only contained one report:
Report type: DISPATCH
Report sub-type: UNIT_SCADA
Version: 1



Note: If we had downloaded more files in step 2 above, we would see more temp tables registered here.

Now query the temp table and check the data:
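
For example (a sketch; the view name follows the {report_type}_{report_subtype}_{report_version} convention used by registerAllReports):

# Peek at a few rows of the registered temp view
spark.sql("""
    SELECT SETTLEMENTDATE, DUID, SCADAVALUE
    FROM dispatch_unit_scada_1
    LIMIT 10
""").show(truncate=False)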

6. Finally, we can create a view on top of the temporary table(s) with further calculations or data-type conversions – for example:

%sql
-- Create a temporary view with expected data types
CREATE OR REPLACE TEMPORARY VIEW vw_dispatch_unit_scada_1
AS
SELECT
  to_timestamp(REPLACE(SETTLEMENTDATE,'"',''), 'yyyy/MM/dd HH:mm:ss') AS dispatch_time, -- Strip quote characters from SETTLEMENTDATE and convert to native timestamp type
  DUID AS generator,
  CAST(SCADAVALUE AS DOUBLE) AS generation_MW -- Convert to numeric
FROM dispatch_unit_scada_1;

…and then perform charting and aggregations. For example, charting the average generation in MW for three generation units (coal, wind, solar) in September 2021:
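
A query along these lines could drive such a chart. As a sketch, the DUIDs below are taken from the sample snippet earlier; substitute the units of interest:

# Average generation (MW) per day for a few DUIDs, using the typed view above
avg_gen = spark.sql("""
    SELECT
      generator,
      date_trunc('DAY', dispatch_time) AS day,
      AVG(generation_MW)               AS avg_generation_MW
    FROM vw_dispatch_unit_scada_1
    WHERE generator IN ('BARCSF1', 'BUTLERSG', 'CAPTL_WF')
    GROUP BY generator, date_trunc('DAY', dispatch_time)
    ORDER BY day, generator
""")
display(avg_gen)  # Databricks charting; use avg_gen.show() outside Databricks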

Conclusion

Apache Spark provides a convenient way to process large datasets in parallel once data is available in a structured format.

AEMO’s MMS Data Model data is vast and varied, so keeping all of it loaded in an online platform indefinitely can be an expensive option. Occasionally, however, a use case may arise which relies on having a long period of historical data available to query.

SparkMMS demonstrates a convenient way to process raw files in bulk, with no pre-processing or manual schema design. In some organisations, historical files may be available on cloud / local storage, even if data has been archived from an RDBMS. Therefore, custom readers like SparkMMS may be a convenient option to explore for ad-hoc use cases, as an alternative to re-loading old data into a relational database.


Using Azurite blob storage emulator with Spark

Sometimes it’s handy to be able to test Apache Spark developments locally. This might include testing cloud storage such as WASB (Windows Azure Storage Blob).

These steps describe the process for testing WASB locally without the need for an Azure account. These steps make use of the Azurite Storage Emulator.

Steps

  1. Prerequisites
    • Download and extract Apache Spark (spark-3.1.2-bin-hadoop3.2.tgz)
    • Download and install Docker
    • Start the Docker service – e.g. on Linux:
      sudo service docker start
    • (Optionally) Download and install Azure Storage Explorer
  2. Create a new directory and start the Azurite Storage Emulator Docker container – e.g.:

    mkdir ~/blob

    docker run -p 10000:10000 -p 10001:10001 -v /home/david/blob/:/data mcr.microsoft.com/azure-storage/azurite

    NB – in the above example, data will be persisted to the local Linux directory /home/david/blob.
  3. Upload files with Storage Explorer:

    Connect Storage Explorer to the Local Storage emulator (keep defaults when adding the connection):





    Upload a sample file – e.g. to the “data” container:

  4. Start Spark using the packages option to include the libraries needed to access Blob storage. The Maven coordinates shown here are for the hadoop-azure package (a standalone-script equivalent is sketched after these steps):

    cd ~/spark/spark-3.1.2-bin-hadoop3.2/bin

    ./pyspark --packages org.apache.hadoop:hadoop-azure:3.3.1

    The PySpark shell should start as per normal after downloading hadoop-azure and its dependencies.

    Troubleshooting:
    The following stack trace indicates the hadoop-azure driver or dependencies were not loaded successfully:
    ... py4j.protocol.Py4JJavaError: An error occurred while calling o33.load. : java.lang.RuntimeException: java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.azure.NativeAzureFileSystem not found at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2595) at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:3269) ... Caused by: java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.azure.NativeAzureFileSystem not found at org.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:2499) at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2593) ... 25 more ...

    Ensure the “packages” option is correctly set when invoking pyspark above.
  5. Query the data using the emulated Blob storage location from the PySpark shell:

    df=spark.read.format("csv").option("header",True).load("wasb://data@storageemulator/iris.csv")

    df.show()


    Notes:
    data – container where the data was uploaded earlier
    @storageemulator – this is a fixed string used to tell the WASB connector to point to the local emulator

    Example output:
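
As an aside, the same test can be driven from a standalone Python script instead of the pyspark shell. A sketch under the same assumptions as the steps above (Azurite running locally, iris.csv uploaded to the data container); spark.jars.packages is the configuration equivalent of the --packages shell option:

from pyspark.sql import SparkSession

# Pull in hadoop-azure (and its dependencies) at session startup
spark = (
    SparkSession.builder
    .appName("azurite-wasb-test")
    .config("spark.jars.packages", "org.apache.hadoop:hadoop-azure:3.3.1")
    .getOrCreate()
)

# Read the sample file from the emulated Blob storage container
df = (
    spark.read.format("csv")
    .option("header", True)
    .load("wasb://data@storageemulator/iris.csv")
)
df.show()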

Conclusion

Local storage emulation allows testing of WASB locations without needing to connect to a remote Azure subscription / storage account.

Creating a virtual solar PV plug for EV charging – Part 1

A while ago the Fully Charged show featured a great device called the Zappi, which can charge an EV using surplus solar:

https://www.youtube.com/watch?v=0EtegQfZQRw

This is pretty amazing for EV owners who also have solar PV.

It means that instead of exporting surplus energy at a reduced rate ($0.12/kWh), it is possible to avoid importing energy at a higher rate ($0.25/kWh). This can effectively double the benefit of having solar PV by boosting self-consumption.

However as of writing, the Zappi V2 is $1,395 (for example, from EVolution here).

Challenge

Is it possible to create a software virtual plug to charge an EV using only self-generated solar PV?

The idea

Charging the EV using only rooftop solar costs $0.12/kWh. This is the opportunity cost of the feed-in tariff which would otherwise be earned for feeding energy into the grid.

Charging the EV using grid power alone costs around $0.25/kWh.

Depending on the proportion of PV generation at a given time, the effective cost per kWh may be somewhere in between.

What if we could turn on the charger only at times when the solar system is generating 100% or more of what the EV will use?

A custom software program could query net solar export and control a smart plug to generate savings.
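
In rough terms, the control loop for such a program might look like the sketch below. Everything here is a placeholder: get_net_export_watts and set_plug stand in for the Envoy query and HS110 control (covered in Part 2), and EV_CHARGE_WATTS is an assumed charger draw:

import time

EV_CHARGE_WATTS = 2300   # assumed draw of the portable charger (hypothetical figure)
POLL_SECONDS = 60

def get_net_export_watts() -> float:
    """Placeholder: query the Envoy S Metered monitor for current net export (W)."""
    raise NotImplementedError

def set_plug(on: bool) -> None:
    """Placeholder: switch the TP-Link HS110 smart plug on or off."""
    raise NotImplementedError

while True:
    # Only charge when the house is exporting at least as much as the EV will draw
    set_plug(get_net_export_watts() >= EV_CHARGE_WATTS)
    time.sleep(POLL_SECONDS)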

Equipment

Mitsubishi Outlander PHEV
Envoy S Metered Solar house monitor
TP Link HS110 Smart Plug

Potential benefits

  • Cheaper EV charging (approximately 50% savings)
  • No need to manually enable / disable charging when:
    • Weather is variable
    • Household consumption is high (e.g. boiling a kettle or running the dishwasher)

Things to consider

There are also some risks to consider when designing a DIY software control:

  • The PHEV plug safety instructions say not to plug anything in between the wall socket and charger plug – i.e. where the SmartPlug should go.
  • The PHEV charger expects to be plugged in and left alone – will it be happy with power being enabled / disabled?

Another thing to consider… is it worth buying a Smartplug to do this?

Assuming the plug can be purchased for a reasonable price (for example $40 including shipping from here) and weekly EV charging from nearly empty, the plug pays itself off in <1 year:

Plug cost ($): 40.00
Opportunity cost / lost export ($/kWh): 0.12
Saved expense ($/kWh): 0.25
Net saving ($/kWh): 0.13
kWh savings to pay off: 307.69
Average charging session (kWh): 8.00
Number of charges: 38.46
Back of the envelope calculations
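
The same figures as a quick Python check:

# Back-of-the-envelope payback calculation (same numbers as the table above)
plug_cost = 40.00         # $
feed_in_tariff = 0.12     # $/kWh opportunity cost of not exporting
grid_rate = 0.25          # $/kWh avoided import cost

net_saving = grid_rate - feed_in_tariff        # 0.13 $/kWh
kwh_to_pay_off = plug_cost / net_saving        # ~307.69 kWh
sessions_to_pay_off = kwh_to_pay_off / 8.0     # ~38.46 charges at 8 kWh per session

print(f"{kwh_to_pay_off:.2f} kWh, {sessions_to_pay_off:.2f} charging sessions to pay off the plug")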

Continued…

See Part 2 for an approach to implement this solution in Python…

Useful date formulas for Hive

Hive comes with some handy functions for transforming dates.  These can be helpful when working with date dimension tables and performing time-based comparisons and aggregations.

e.g. Convert a native Hive date to a formatted date string:

date_format(myDate,'dd-MM-yyyy')

Return the week number (within the year) of a particular date – i.e. first week of the year is 1, the week of new year’s eve is 52, etc:

weekofyear(myDate)

Other less obvious examples

Current month’s name (e.g. January, February, etc):

date_format(myDate, 'MMMMM')

First date of the current quarter:

cast(trunc(add_months(myDate,-pmod(month(myDate)-1,3)),'MM') as date)

Last date of the current quarter:

cast(date_add(trunc(add_months(myDate,3-pmod(month(myDate)-1,3)),'MM'),-1) as date)

Day number of the current quarter (e.g. April 2nd is day 2 of the second quarter, December 9th is day 70 of the fourth quarter, etc):

datediff(myDate,cast(trunc(add_months(myDate,-pmod(month(myDate)-1,3)),'MM') as date))+1
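
These quarter expressions also run unchanged in Spark SQL, which is a handy way to sanity-check them. A sketch, assuming an active SparkSession named spark:

# Check the quarter formulas for 2021-12-09 (expected: 2021-10-01, 2021-12-31, day 70)
spark.sql("""
    SELECT
      cast(trunc(add_months(d, -pmod(month(d)-1, 3)), 'MM') as date)                    AS quarter_start,
      cast(date_add(trunc(add_months(d, 3-pmod(month(d)-1, 3)), 'MM'), -1) as date)     AS quarter_end,
      datediff(d, cast(trunc(add_months(d, -pmod(month(d)-1, 3)), 'MM') as date)) + 1   AS day_of_quarter
    FROM (SELECT to_date('2021-12-09') AS d) t
""").show()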

Managing Yarn memory with multiple Hive users

Out of the box (e.g. a standard Hortonworks HDP 2.2 install), Hive does not come configured optimally to manage multiple users running queries simultaneously.  This means it is possible for a single Hive query to use up all available Yarn memory, preventing other users from running a query simultaneously.

This high memory consumption can be observed via the resource manager HTTP management screen – e.g. http://<resourcemanagerIP>:8088/cluster

Almost all yarn memory used

Also in Ambari…

Yarn used memory at 100%

Minimum queue memory per user

To guarantee the ability for more users to run Hive queries simultaneously (assuming capacity scheduler is used with default queue configuration), we can make a simple config settings change via Ambari:

Ambari Yarn config for capacity scheduler

Change from:

yarn.scheduler.capacity.root.default.user-limit-factor=1

To:

yarn.scheduler.capacity.root.default.user-limit-factor=0.33

This means that each Hive user will now receive a maximum of roughly a third of Yarn memory resources.

Only a third of yarn memory used

Yarn used memory at 39%

This enables a better user experience for multi-user interactive querying of Hive – for example, by enabling 2-3 users to simultaneously use the cluster.

Another option

There is, however, one potential disadvantage to the above: cluster memory may be wasted (by not being allocated) if the job queue contains only a single user’s jobs. A related parameter change can alleviate this, namely setting:

yarn.scheduler.capacity.root.default.minimum-user-limit-percent=33

The “minimum user limit percent” means that each user is guaranteed a certain percentage of the yarn job queue’s memory if there is a mix of different users’ jobs waiting in the queue.  In other words, 3 users will each get 33% of the queue memory for execution if their jobs are all waiting in the queue at the same time. If however, there is only one user with jobs waiting in the queue, his / her jobs will execute and consume all available memory in the queue.  For User A this means a better use of memory overall, but possibly at the expense of User B who might return from their lunch break and must wait for one of User A’s jobs to finish before getting the guaranteed percentage memory allocation.

Finding the balance

The above, along with other parameters can be used to ensure users make the most of available cluster memory but do not effectively lock out other users by filling the queue with long running jobs.

For example – these settings allow a single user to use up to 90% of available yarn queue memory, and up to 4 users (each with 25%) to eventually be running in the cluster (the 5th, 6th, 7th users will have to wait for other users’ jobs to be fully completed):

yarn.scheduler.capacity.root.default.user-limit-factor=0.90
yarn.scheduler.capacity.root.default.minimum-user-limit-percent=25

Visualising energy consumption profile (by hour of day) using D3.js

With the benefit of smart electricity meters it’s possible to obtain half-hourly data showing household consumption in kWh. I downloaded this dataset for my own house in CSV format from United Energy’s EnergyEasy portal.

With some massaging, the data can be formatted to a structure which makes aggregation easier. The excellent tool OpenRefine simplified this task, effectively unpivoting half-hourly measures that were spread across many columns into a single column, so that the data looks like this (a pandas alternative is sketched after the sample):

Day,Interval,Quantity
2012-01-01,0000,0.05
2012-01-01,0030,0.05
2012-01-01,0100,0.044
2012-01-01,0130,0.05
2012-01-01,0200,0.044
[...]
2013-12-31,2130,0.025
2013-12-31,2200,0.019
2013-12-31,2230,0.025
2013-12-31,2300,0.025
2013-12-31,2330,0.025
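
As an alternative to OpenRefine, the same unpivot can be done with pandas. A sketch only: the wide input layout (one row per day, one column per half-hourly interval) and the file names are assumptions and may not match the real EnergyEasy export:

import pandas as pd

# Hypothetical wide layout: columns Day, 0000, 0030, ..., 2330
wide = pd.read_csv("energyeasy_export.csv")

# Unpivot the interval columns into Day / Interval / Quantity rows
long = wide.melt(id_vars=["Day"], var_name="Interval", value_name="Quantity")
long = long.sort_values(["Day", "Interval"])
long.to_csv("consumption_long.csv", index=False)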

Using the D3 Javascript visualisation library, it’s possible to create an interactive visualisation which can interrogate this data in arbitrary ways to find patterns and answer basic questions about household energy consumption.  For example:

  1. During which hours of the day is average energy consumption highest? Is this different in summer vs winter? Has this changed from 2012 to 2013?
  2. Has the minimum energy consumption overnight changed?  Is the new (and slightly annoying) energy saving power board purchased in mid 2013 doing its job to reduce standby power use?
  3. During which hours of the day is power usage the most variable?

Features

  • Selectable date range – e.g. to compare a rolling 12 month period. This uses a “context” graphics section in D3.js with brush functionality to trigger realtime recalculation of data in the “focus” section when a user selects a range using their mouse.  The live update of the hourly consumption profile means it’s easy to see trends over time in the “focus” area of the screen (shown in the following point):

    D3 selectable time range using “brush” technique
  • Plotting of max / min / mean / standard deviation of kWh consumption for each half-hourly interval of the day:

    D3 Mean, Max, Min and Standard Deviation calculations for each half-hourly time interval of the day
  • “Snapshotting” of date range – e.g. to compare two consecutive years in an interactive way:

    D3 snapshot time comparison

Demo

Check out a live example here:
http://jsfiddle.net/ugxo00bu/7/

Using Mondrian’s CurrentDateMember to show current day’s data in MDX

Let’s say we have the following MDX query to show data for a particular date (in this case the quantity measure of the cube Electricity):

WITH
SET [~ROWS] AS {[Time].[Day].[2014-01-01]}
SELECT
NON EMPTY {[Measures].[Quantity]} ON COLUMNS,
NON EMPTY [~ROWS] ON ROWS
FROM [Electricity]

Works OK:

Saiku MDX - Current Date Member 1

But what if we want the date to be dynamic, reflecting today’s date?

We can change the MDX to use Mondrian’s CurrentDateMember function:

WITH
SET [~ROWS] AS {CurrentDateMember([Time], """[Time].[Day]""\.[yyyy-mm-dd]")}
SELECT
NON EMPTY {[Measures].[Quantity]} ON COLUMNS,
NON EMPTY [~ROWS] ON ROWS
FROM [Electricity]

Now the date selected is today’s date (Note: date members in the time hierarchy are in the format of yyyy-mm-dd):

Saiku MDX - Current Date Member 2

Extra tip – rolling date range up to current day
We can also change the MDX query to select a rolling 365-day date range (ending with the current day):

...
SET [~ROWS] AS {CurrentDateMember([Time], """[Time].[Day]""\.[yyyy-mm-dd]").Lag(364):CurrentDateMember([Time], """[Time].[Day]""\.[yyyy-mm-dd]")}
...

Now a rolling 365-day date range is shown:

Saiku MDX - Current Date Member 3