Bulk processing of AEMO MMS data with a custom Spark Data Source

Background

AEMO is the Australian Energy Market Operator. It makes available a well-organised database for market participants to track bids, demand, generation and other market functions. This database is known as the MMS (Market Management System) Data Model.

Electricity researchers, retailers, distributors and others use this data to get insights and manage their business.

Public MMS data is freely available from AEMO's Nemweb data archive.

The traditional approach to making use of MMS datasets is to load them into an RDBMS. The volume and variety of the data can make this difficult, although some helper tools exist. Loading a long history of granular data for analysis, even for a single dataset, is nonetheless a common business requirement.

Apache Spark (an alternative to a traditional RDBMS) has a natural advantage in being able to read and process large datasets in parallel, particularly for analytics.

Can it be used here?

Challenges

The AEMO CSV format used to populate MMS allows there to be multiple reports in a single file.

Furthermore, files are frequently compressed in Zip format, which usually means pre-processing is required before they can be read in as text or CSV.

Whilst the underlying files are comma-separated, the number of columns in each row also varies within a given file due to:

  • Different record types (Comment, Information or Data)
  • Different report schemas (each having a different column set)
AEMO MMS Data Model CSV structure

Here is a snippet from a sample file:

C,SETP.WORLD,DVD_DISPATCH_UNIT_SCADA,AEMO,PUBLIC,2021/10/07,00:00:05,0000000350353790,,0000000350353744
I,DISPATCH,UNIT_SCADA,1,SETTLEMENTDATE,DUID,SCADAVALUE
D,DISPATCH,UNIT_SCADA,1,"2021/09/01 00:05:00",BARCSF1,0
D,DISPATCH,UNIT_SCADA,1,"2021/09/01 00:05:00",BUTLERSG,9.499998
D,DISPATCH,UNIT_SCADA,1,"2021/09/01 00:05:00",CAPTL_WF,47.048004
...lots more rows...
C,"END OF REPORT",3368947

This file structure presents some specific challenges for parsing with Spark and thus being able to derive useful insights from the underlying data.

Issue #1 – reading too many rows in a file (even for a single report) can cause out-of-memory issues

Issue #2 – naively reading just the data (D) rows misses file and report header information, such as column names

Issue #3 – parsing full files can result in unnecessary data being read, when only a subset is needed

Solution

SparkMMS is a custom data reader implemented in Java using Apache Spark’s DataSource V2 API.

It can be used to efficiently read AEMO MMS files in bulk.

Input:

SparkMMS takes a glob path, which means it can read multiple files based on a file pattern – e.g. to read all dispatch-related zip files from a monthly archive:

/tmp/Data_Archive/Wholesale_Electricity/MMSDM/2021/MMSDM_2021_09/MMSDM_Historical_Data_SQLLoader/DATA/PUBLIC_DVD_DISPATCH*.zip
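
For example, a minimal read sketch using this glob (this assumes the SparkMMS jar is already attached to the cluster, as in the demo below; the format and option names follow the read example later in this post):

# Read every dispatch-related zip in the monthly archive via a glob pattern
df = (
    spark
    .read
    .format("com.analyticsanvil.SparkMMS")
    .option("fileName", "/tmp/Data_Archive/Wholesale_Electricity/MMSDM/2021/MMSDM_2021_09/MMSDM_Historical_Data_SQLLoader/DATA/PUBLIC_DVD_DISPATCH*.zip")
    .load()
)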

Output:

SparkMMS creates a Spark dataframe with chunks of rows for each specific report type across all input files. The data rows are nested in the “data” column of the dataframe, and the file header and report headers (including column names) are preserved alongside them:

>>> df.printSchema()
root
 |-- original_filename: string (nullable = false)
 |-- system: string (nullable = false)
 |-- report_id: string (nullable = false)
 |-- report_from: string (nullable = false)
 |-- report_to: string (nullable = false)
 |-- publish_datetime: timestamp (nullable = false)
 |-- id1: string (nullable = false)
 |-- id2: string (nullable = false)
 |-- id3: string (nullable = false)
 |-- report_type: string (nullable = false)
 |-- report_subtype: string (nullable = false)
 |-- report_version: integer (nullable = false)
 |-- column_headers: array (nullable = false)
 |    |-- element: string (containsNull = true)
 |-- data: array (nullable = false)
 |    |-- element: array (containsNull = true)
 |    |    |-- element: string (containsNull = true)

This structure makes it easy to do further processing of the data and means no information is lost when reading files in parallel.
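
For example, a minimal sketch (assuming df has been loaded with SparkMMS as in the demo below) which flattens the nested data column back into one row per report data row:

from pyspark.sql.functions import explode

# One output row per underlying report data row, keeping the report metadata
flatDF = df.select(
    "report_type",
    "report_subtype",
    "column_headers",
    explode("data").alias("datarow")
)
flatDF.show(5, truncate=False)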

Other features:

  • Reads both .CSV and .zip files
  • Automatically splits large files into multiple partitions
  • Extracts useful metadata from raw files, including column headers
  • Supports multiple report schemas / versions
  • Supports predicate pushdown – skips reports within a file if they are not selected
  • Column pruning – reads only a subset of data from raw files when not all columns are selected (see the sketch after this list)
  • Can read from cloud storage (e.g. Azure Blob storage, Amazon S3, Databricks DBFS)
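
As a rough sketch of the pushdown and pruning points (again assuming df has been loaded as in the demo below), filtering on report metadata and selecting only some columns gives the reader the opportunity to skip non-matching reports and avoid reading unneeded data from the raw files:

# Only DISPATCH / UNIT_SCADA report chunks are needed; other reports in the
# same files can be skipped by the reader (predicate pushdown), and unselected
# columns need not be materialised (column pruning)
scadaDF = (
    df
    .where("report_type = 'DISPATCH' AND report_subtype = 'UNIT_SCADA'")
    .select("original_filename", "report_version", "data")
)
scadaDF.show(5)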

Demo

These steps show the SparkMMS custom reader in action using Azure Databricks:

Note: Databricks is a paid, cloud-based data lake / ML platform. Alternatively, see the source code for a demonstration running SparkMMS locally on a single node.

Prerequisites

  1. Download the library:
    https://github.com/niftimus/SparkMMS/releases/download/v0.1/SparkMMS-0.1-SNAPSHOT.jar
  2. Start a Databricks cluster – e.g.:

    Note: Select Runtime 9.1 LTS for compatibility
  3. Add the SparkMMS library to the cluster via Cluster > Libraries > Install New > Drag and Drop Jar:

Using SparkMMS

1. Define helper functions. At runtime, these create MMS report-specific dataframe definitions (with correct per-report column headings) and also create temporary tables to streamline querying via SQL:

# Get a new dataframe with the schema of a single report type
def getReport(df, report_type, report_subtype, report_version):
    from pyspark.sql.functions import explode
    df = df.where(f"report_type = '{report_type}' and report_subtype = '{report_subtype}' and report_version = {report_version}")
    tmpDF = df.select("column_headers", explode(df.data).alias("datarow"))
    
    colHeaders = df.select("column_headers").first().column_headers
    
    for idx, colName in enumerate(colHeaders):
        tmpDF = tmpDF.withColumn(colName, tmpDF.datarow[idx])
    
    tmpDF = tmpDF.drop("column_headers").drop("datarow")    
    
    return tmpDF

# Register all reports available in the dataframe as temporary views in the metastore
def registerAllReports(df):
    tmpDF = df.select("report_type","report_subtype","report_version")
    tmpDF = tmpDF.dropDuplicates()
    
    reports = tmpDF.collect()
    
    for r in reports:
        tmpReportDF = getReport(df,r.report_type,r.report_subtype,r.report_version)
        tmpReportDF.createOrReplaceTempView(f"{r.report_type}_{r.report_subtype}_{r.report_version}")

2. Create a temporary directory and download sample data from AEMO (15 MB zipped, 191 MB unzipped):

%sh
cd /dbfs/
mkdir tmp
cd tmp
wget https://nemweb.com.au/Data_Archive/Wholesale_Electricity/MMSDM/2021/MMSDM_2021_09/MMSDM_Historical_Data_SQLLoader/DATA/PUBLIC_DVD_DISPATCH_UNIT_SCADA_202109010000.zip

Note – there is no need to unzip the file.

3. Read raw data into a Spark dataframe using SparkMMS:

Notes:

  • Option maxRowsPerPartition tells the reader to create each partition with a maximum of 50,000 report data rows. All report rows will be read; however, some will be placed in different partitions for performance reasons.
  • Option minSplitFilesize tells the reader not to bother splitting files smaller than 1,000,000 bytes, which improves performance.
df = (
    spark 
    .read
    .format("com.analyticsanvil.SparkMMS")
    .option("fileName", "/tmp/PUBLIC_DVD_DISPATCH_UNIT_SCADA_202109010000.zip")
    .option("maxRowsPerPartition","50000")
    .option("minSplitFilesize","1000000")
    .load()
)

4. Validate that the dataframe contains rows:

df.show()

Example output:

Note: Optionally, we can also run df.cache() here to improve performance in subsequent steps.

5. Register each report found in the raw file(s) as a temporary table and then validate the output:

registerAllReports(df)

After the above command, a single temp table is registered because our file only contained one report:
Report type: DISPATCH
Report sub-type: UNIT_SCADA
Version: 1



Note: If we had downloaded more files in step 2 (and matched them in step 3), we would see more temp tables here.

Now query the temp table and check the data:
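
For example, a minimal query sketch (the temp table name follows the report_type_report_subtype_report_version convention used by registerAllReports above):

# Query the registered temp table for the DISPATCH / UNIT_SCADA report
spark.sql("SELECT * FROM dispatch_unit_scada_1 LIMIT 5").show(truncate=False)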

6. Finally, we can create a view on top of the temporary table(s) with further calculations or data-type conversions – for example:

%sql
-- Create a temporary view with expected data types
CREATE OR REPLACE TEMPORARY VIEW vw_dispatch_unit_scada_1
AS
SELECT
  to_timestamp(REPLACE(SETTLEMENTDATE,'"',''), 'yyyy/MM/dd HH:mm:ss') AS dispatch_time, -- Strip quote characters from SETTLEMENTDATE and convert to native timestamp type
  DUID AS generator,
  CAST(SCADAVALUE AS DOUBLE) AS generation_MW -- Convert to numeric
FROM dispatch_unit_scada_1;

…and then perform charting and aggregations – for example, charting the average generation in MW for three generation units (Coal, Wind, Solar) in September 2021:
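
A rough sketch of the underlying aggregation (shown here in PySpark; the DUIDs used are ones that appear in the sample rows earlier and may differ from the units actually charted):

# Average generation in MW per unit over the loaded month
spark.sql("""
    SELECT generator, ROUND(AVG(generation_MW), 2) AS avg_generation_MW
    FROM vw_dispatch_unit_scada_1
    WHERE generator IN ('BARCSF1', 'BUTLERSG', 'CAPTL_WF')
    GROUP BY generator
    ORDER BY generator
""").show()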

Conclusion

Apache Spark provides a convenient way to process large datasets in parallel once data is available in a structured format.

AEMO’s MMS Data Model data is vast and varied, so keeping all of it loaded in an online platform indefinitely can be an expensive option. Occasionally, however, a use case arises which relies on having a long period of historical data available to query.

SparkMMS demonstrates a convenient way to process raw files in bulk, with no pre-processing or manual schema design. In some organisations, historical files may still be available on cloud or local storage even after the data has been archived out of an RDBMS. Custom readers like SparkMMS can therefore be a convenient option for ad-hoc use cases, as an alternative to re-loading old data into a relational database.


Realtime Solar PV charting

Solar PV inverters often have their own web-based monitoring solutions. However, some of these do not make it easy to view current generation or consumption due to refresh delays. Out-of-the-box monitoring is usually good for looking at long-term time periods; however, it lacks the granularity to see appliance consumption over the short term.

The challenge

Realtime monitoring of solar generation and net export helps to maximise self-consumption – for example, by coordinating appliances to make best use of solar PV.

Existing inverter monitoring does not show granular data over recent history – for example, to be able to tell when a dishwasher has finished its heating cycle and whether another high-consumption appliance should be turned on:

Solution

This sample Android application allows realtime monitoring whilst charting consumption, generation and net export:

Solar Watch screenshot

The chart shows recent data over time and is configurable for SMA and Enphase inverters. In both cases the local interface of the inverter is used to pull raw data (a minimal polling sketch follows the list below):

  • SMA: https://<inverter_ip_address>/dyn/getDashValues.json
    • NB – Smart Inverter Screen must be enabled
  • Enphase: http://<envoy_ip_address>/production.json
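
As an illustration only, a minimal Python polling sketch against the Enphase endpoint above (the app itself is an Android application; the JSON field names production and wNow are assumptions based on typical Envoy responses and may differ by firmware):

import time
import requests

ENVOY_URL = "http://<envoy_ip_address>/production.json"  # endpoint from the list above

def poll_envoy(url=ENVOY_URL, interval_seconds=5):
    """Poll the Envoy's local production endpoint and print current generation."""
    while True:
        payload = requests.get(url, timeout=5).json()
        # NOTE: "production" / "wNow" are assumed field names - adjust to match
        # the actual JSON returned by your Envoy firmware.
        watts = payload["production"][0].get("wNow")
        print(f"Current production: {watts} W")
        time.sleep(interval_seconds)

poll_envoy()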

Code

https://github.com/niftimus/SolarWatch

Features

  • Interactive UI:

  • Configurable settings:

Limitations / areas for future improvement

  • Improve security handling of SSL – the current code imports a self-signed SMA inverter certificate and disables hostname verification to allow the SMA local data to be retrieved
  • Refine code and publish to an app store
  • Remove hard-coding for extraction of metrics
  • Better error handling
  • Add a data export function

Conclusion

This sample app is really handy for monitoring appliances in realtime and allows making informed decisions about when to start them.

Creating a custom Hive input format and record reader to read fixed format flat files

Apache Hive is great for enabling SQL-like queryability over flat files.  This is trivial in the case of tabular-formatted files such as CSVs, where we can set custom row and field delimiters out-of-the-box (e.g. Hive’s inbuilt CSV SerDe).  Even more complex files can be read and converted to a desired row and column format using Hive’s regexp_extract() function, so long as we can operate on a single row at a time.

The problem

What if the rows we want in Hive aren’t rows in the input files?  That is, we need to read the file as a whole and decode it to produce the output we want to see in Hive.

An example is the Australian Bureau of Meteorology’s ASCII Grid format.  These files are a fixed file format with a header section which effectively describes how to read the rest of the file.  In the data section, each data row corresponds to a row of latitude on a map (with starting coordinates identified in the header) and similarly each column defines a line of longitude.  Read as a whole, the file contains a grid of readings of a particular weather observation – e.g. rainfall for a given time period:

Example decoding of ASCII grid format file

To read this data in Hive it might be possible to define a table which hard-codes column values to their corresponding longitude, but this leaves the problem of reading similarly formatted files with a different geographical granularity or different starting position on the globe.  Similarly, we may struggle at the Hive query language layer to determine the appropriate latitude of a given data row in the file, because the header contains the metadata describing which row in the file corresponds to which latitude.

To make the grid data easier to consume in Hive we may wish to transform files into a format such as this:

Geospatial data – desired tabular format

This means we can query a file by filtering on particular lat / long combinations.  One way to transform the file into this format is to create custom Hive InputFormat and RecordReader Java classes that we can use at query time.
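
To make the transformation concrete, here is a minimal Python sketch of the decoding logic (the actual implementation described below is the Java class ReadASCIIGridFile.java; the header keys ncols, nrows, xllcorner, yllcorner and cellsize are assumptions based on the common ESRI-style ASCII grid layout):

import re

def read_ascii_grid(text):
    """Decode an ASCII grid into (lat1, long1, lat2, long2, measurement) rows."""
    lines = text.strip().splitlines()

    # Parse header lines of the form "<key> <numeric value>" until data begins.
    # NOTE: key names are assumed; adjust for the specific BOM file variant.
    header = {}
    data_start = 0
    for i, line in enumerate(lines):
        m = re.match(r"^\s*([A-Za-z_]+)\s+(-?\d+\.?\d*)\s*$", line)
        if not m:
            data_start = i
            break
        header[m.group(1).lower()] = float(m.group(2))

    ncols, nrows = int(header["ncols"]), int(header["nrows"])
    xll, yll, cell = header["xllcorner"], header["yllcorner"], header["cellsize"]

    rows = []
    for r, line in enumerate(lines[data_start:data_start + nrows]):
        lat1 = yll + (nrows - r - 1) * cell   # first data row is the northern-most band
        for c, value in enumerate(line.split()[:ncols]):
            long1 = xll + c * cell
            rows.append((lat1, long1, lat1 + cell, long1 + cell, float(value)))
    return rows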

InputFormat / RecordReader vs SerDe

A key distinction when creating custom classes to use with Hive is the following:

  • InputFormat and RecordReader – takes files as input – generates rows
  • SerDe – takes rows as input – generates columns

Here, ASCII grid formatted files cannot be de-serialised row-by-row because there is important information in the header about what each row contains (i.e. the latitude of a given row depends on its position in the file and also on information in the header), so a SerDe is likely not the best option.  Instead, an InputFormat Java class can be written to convert the input ASCII grid formatted files into the desired tabular format above, making it possible to query in Hive by arbitrary lat / long coordinates.

Creating a custom InputFormat

An InputFormat compatible with Hive can be created by writing classes which implement and extend standard mapred library classes:

  1. CustomTextInputFormat.java –  extends FixedLengthInputFormat.  Returns a CustomTextRecordReader which plugs in to Hive at runtime behind the scenes.
  2. CustomTextRecordReader.java – implements mapred RecordReader<LongWritable, BytesWritable>.  Reads and decompresses (if required) files off the Hadoop filesystem. Calls ReadASCIIGridFile to do the actual transformation.
  3. ReadASCIIGridFile.java – contains a static class which does the transformation from input (a byte array – ASCII grid formatted) to output (a byte array – Hive row format)

Notes:

  • Code uses the mapred rather than mapreduce API of Hadoop, as Hive only supports mapred style InputFormat objects
  • CustomTextInputFormat.java sets all files to be non-splittable.  This is done because a file must be read in full with its header to properly convert to the target format.
  • The CustomTextRecordReader copes with compressed input files by utilising the org.apache.hadoop.io.compress.CompressionCodec class to decompress any input files which are compressed.  This is advantageous for ASCII grid formatted files which lend themselves well to compression (e.g. via GZIP) before being uploaded to HDFS.

Compiling the custom InputFormat

Save the text of the above files to a folder on the filesystem, then compile them and package the classes into a JAR file (note – a Java JDK must be installed to run the compilation, preferably the same version as the target Hadoop system is running):

cd </path/where/java/files/are/stored>
mkdir build
javac -d ./build/ -cp "/usr/hdp/2.3.0.0-2557/hadoop/lib/*:/usr/hdp/2.3.0.0-2557/hadoop-mapreduce/*" *.java
cd build
jar cvf CustomTextInputFormat.jar *
cp CustomTextInputFormat.jar /tmp/

Note – the classpath in the javac command assumes that necessary Hadoop library jar files are installed  in certain locations.  The locations mentioned are for the Hortonworks HDP 2.3.0 Sandbox VM, but can be changed to suit other versions / distributions.

Using the custom InputFormat with Hive

Run hive at the command line.

In the Hive session, add the newly created InputFormat JAR:

hive> add jar /tmp/CustomTextInputFormat.jar;
Added [/tmp/CustomTextInputFormat.jar] to class path
Added resources: [/tmp/CustomTextInputFormat.jar]

Create an external table on an HDFS directory containing ASCII grid formatted files:

hive> create external table default.test_ascii
(
lat1 float, long1 float, lat2 float, long2 float, measurement float
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\t'
STORED AS INPUTFORMAT 'com.analyticsanvil.custominputformat.CustomTextInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat'
LOCATION '/tmp/'; 
OK
Time taken: 13.192 seconds

Run a test query on the external table:

hive> select * from default.test_ascii limit 10;
OK
-43.975002      112.025 -43.925007      112.075 -999.0
-43.975002      112.075 -43.925007      112.12499       -999.0
-43.975002      112.12499       -43.925007      112.17499       -999.0
-43.975002      112.17499       -43.925007      112.22498       -999.0
-43.975002      112.22498       -43.925007      112.27498       -999.0
-43.975002      112.27498       -43.925007      112.324974      -999.0
-43.975002      112.324974      -43.925007      112.37497       -999.0
-43.975002      112.37497       -43.925007      112.424965      -999.0
-43.975002      112.424965      -43.925007      112.47496       -999.0
-43.975002      112.47496       -43.925007      112.524956      -999.0
Time taken: 3.94 seconds, Fetched: 10 row(s)

The query returns data in the desired tabular format.

Conclusion

By creating an InputFormat Java class which reads and transforms fixed format files at the time of Hive querying, we can effectively convert data into forms which are better suited to analytical purposes.

Similarly, writing a custom input format allows almost any data to be read by invoking a custom Java class on each mapper, translating the input into tabular format for use in Hive.  In the above example ReadASCIIGridFile.java converts an ASCII grid formatted file to a long list of lat / long combinations and readings, but equally, a new Java class could be coded to read more exotic forms of input data – e.g. MP3 audio files, JPEGs or other types of binary file.  So long as a developer knows how to code the translation in Java, input formats can be converted into Hive-queryable tabular data on the fly and in parallel (e.g. MP3 files with timestamps and spectral / frequency analysis).

It’s worth noting that further improvements could be made to the Java code above – for example, more efficient parsing of input files (currently using regular expressions), better error checking, improved memory utilisation, and a mechanism to combine input files for a given input split to improve performance.