Bulk processing of AEMO MMS data with a custom Spark Data Source

Background

AEMO is the Australian Energy Market Operator. It makes available a well organised database for market participants to track bids, demand, generation and other market functions. This database is known as the MMS (Market Management System Data Model).

Electricity researchers, retailers, distributors and others use this data to get insights and manage their business.

Public MMS data is freely available from AEMO's NEMWeb Data Archive (https://nemweb.com.au/).

The traditional approach to making use of MMS datasets is to load them into an RDBMS. The volume and variety of the data can make this difficult, although some helper tools do exist. Even so, loading a long history of granular data for analysis, even for a single dataset, is a common business requirement.

Apache Spark (an alternative to a traditional RDBMS) has a natural advantage here: it can read and process large datasets in parallel, which is particularly useful for analytics.

Can it be used here?

Challenges

The AEMO CSV format used to populate MMS allows there to be multiple reports in a single file.

Furthermore, files are frequently compressed in Zip format, which usually means pre-processing (e.g. unzipping) is required before they can be read as text or CSV.

Whilst the underlying files are comma separated, the number of columns per row varies within a given file due to:

  • Different record types (Comment, Information or Data)
  • Different report schemas (each having a different column set)
AEMO MMS Data Model CSV structure

Here is a snippet from a sample file:

C,SETP.WORLD,DVD_DISPATCH_UNIT_SCADA,AEMO,PUBLIC,2021/10/07,00:00:05,0000000350353790,,0000000350353744
I,DISPATCH,UNIT_SCADA,1,SETTLEMENTDATE,DUID,SCADAVALUE
D,DISPATCH,UNIT_SCADA,1,"2021/09/01 00:05:00",BARCSF1,0
D,DISPATCH,UNIT_SCADA,1,"2021/09/01 00:05:00",BUTLERSG,9.499998
D,DISPATCH,UNIT_SCADA,1,"2021/09/01 00:05:00",CAPTL_WF,47.048004
...lots more rows...
C,"END OF REPORT",3368947

This file structure presents some specific challenges for parsing with Spark and thus being able to derive useful insights from the underlying data.

Issue #1 – reading too many rows in a file (even for a single report) can cause out of memory issues

Issue #2 – naively reading just the data (D) rows misses file and report header information, such as column names

Issue #3 – parsing full files can result in unnecessary data being read, when only a subset is needed
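
To make issues #2 and #3 concrete, here is a minimal sketch (not the solution) of a naive read that keeps only the data (D) rows of an unzipped file: the column names on the I row are lost, and every report in the file is scanned even if only one is needed. The file path is hypothetical.

# Naive approach (illustration only): keep just the D rows of an unzipped MMS CSV.
# The column names from the "I" row are discarded, so fields are purely positional,
# and all reports in the file are scanned even when only one is of interest.
from pyspark.sql import SparkSession
from pyspark.sql.functions import split

spark = SparkSession.builder.getOrCreate()

raw = spark.read.text("/tmp/sample_mms_file.CSV")  # hypothetical unzipped file path
data_rows = raw.filter(raw.value.startswith("D,")).select(split(raw.value, ",").alias("fields"))
data_rows.show(3, truncate=False)  # columns are anonymous - no SETTLEMENTDATE / DUID / SCADAVALUE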

Solution

SparkMMS is a custom data reader implemented in Java using Apache Spark’s DataSource V2 API.

It can be used to efficiently read AEMO MMS files in bulk.

Input:

SparkMMS takes a glob path, which means it can read multiple files based on a file pattern – e.g. to read all dispatch related zip files from a monthly archive:

/tmp/Data_Archive/Wholesale_Electricity/MMSDM/2021/MMSDM_2021_09/MMSDM_Historical_Data_SQLLoader/DATA/PUBLIC_DVD_DISPATCH*.zip

Output:

SparkMMS creates a Spark dataframe with chunks of rows for each specific report type across all input files. The data rows are nested in the “data” column of the dataframe, while the file header and report headers (including column names) are preserved alongside them:

>>> df.printSchema()
root
 |-- original_filename: string (nullable = false)
 |-- system: string (nullable = false)
 |-- report_id: string (nullable = false)
 |-- report_from: string (nullable = false)
 |-- report_to: string (nullable = false)
 |-- publish_datetime: timestamp (nullable = false)
 |-- id1: string (nullable = false)
 |-- id2: string (nullable = false)
 |-- id3: string (nullable = false)
 |-- report_type: string (nullable = false)
 |-- report_subtype: string (nullable = false)
 |-- report_version: integer (nullable = false)
 |-- column_headers: array (nullable = false)
 |    |-- element: string (containsNull = true)
 |-- data: array (nullable = false)
 |    |-- element: array (containsNull = true)
 |    |    |-- element: string (containsNull = true)

This structure makes it easy to do further processing of the data and means no information is lost when reading files in parallel:
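
For instance, a quick way to inspect which reports were found across the input files, using the metadata columns from the schema above:

# Distinct report types / sub-types / versions discovered across all input files
df.select("report_type", "report_subtype", "report_version").distinct().show()

# Column headers captured for each report chunk
df.select("report_id", "column_headers").show(truncate=False)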

Other features:

  • Reads both .CSV and .zip
  • Automatically splits large files into multiple partitions
  • Extracts useful metadata from raw files, including column headers
  • Supports multiple report schemas / versions
  • Supports predicate pushdown – skips reports within a file when they are not selected (see the sketch after this list)
  • Column pruning – reads only a subset of data from raw files when not all columns are selected
  • Can read from cloud storage (e.g. Azure Blob storage, Amazon S3, Databricks DBFS)
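
A minimal sketch of how pushdown and pruning come into play, assuming df is a dataframe produced by the reader as above (filter on report metadata and select only the needed columns before triggering an action):

# Only the DISPATCH / UNIT_SCADA report is of interest, and only a few columns:
# the reader can skip other reports within each file (predicate pushdown) and
# avoid materialising unused columns (column pruning).
scada = (
    df.where("report_type = 'DISPATCH' and report_subtype = 'UNIT_SCADA'")
      .select("original_filename", "report_version", "data")
)
scada.show(5)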

Demo

These steps show the SparkMMS custom reader in action using Azure Databricks:

Note: Databricks is a paid, cloud-based data lake / ML platform. Alternatively, see the source code for a demonstration of running SparkMMS locally on a single node.

Prerequisites

  1. Download the library:
    https://github.com/niftimus/SparkMMS/releases/download/v0.1/SparkMMS-0.1-SNAPSHOT.jar
  2. Start a Databricks cluster – select Runtime 9.1 LTS for compatibility.
  3. Add the SparkMMS library to the cluster via Cluster > Libraries > Install New > Drag and Drop Jar:

Using SparkMMS

1. Define helper functions. At runtime, these create report-specific dataframe definitions (with the correct per-report column headings) and register temporary tables to streamline querying via SQL:

# Get a new dataframe with the schema of a single report type
def getReport(df, report_type, report_subtype, report_version):
    from pyspark.sql.functions import explode

    # Keep only the requested report and explode its nested data rows
    df = df.where(f"report_type = '{report_type}' and report_subtype = '{report_subtype}' and report_version = {report_version}")
    tmpDF = df.select("column_headers", explode(df.data).alias("datarow"))

    # Use the captured column headers to name each positional field
    colHeaders = df.select("column_headers").first().column_headers

    for idx, colName in enumerate(colHeaders):
        tmpDF = tmpDF.withColumn(colName, tmpDF.datarow[idx])

    # Drop the intermediate columns, leaving one named column per report field
    tmpDF = tmpDF.drop("column_headers").drop("datarow")

    return tmpDF

# Register all reports available in the dataframe as temporary views in the metastore
def registerAllReports(df):
    # Find the distinct report type / sub-type / version combinations present
    tmpDF = df.select("report_type", "report_subtype", "report_version")
    tmpDF = tmpDF.dropDuplicates()

    reports = tmpDF.collect()

    # Register one temp view per report, named <type>_<subtype>_<version>
    for r in reports:
        tmpReportDF = getReport(df, r.report_type, r.report_subtype, r.report_version)
        tmpReportDF.createOrReplaceTempView(f"{r.report_type}_{r.report_subtype}_{r.report_version}")

2. Create a temporary directory and download sample data from AEMO (15 MB zipped, 191 MB unzipped):

%sh
cd /dbfs/
mkdir tmp
cd tmp
wget https://nemweb.com.au/Data_Archive/Wholesale_Electricity/MMSDM/2021/MMSDM_2021_09/MMSDM_Historical_Data_SQLLoader/DATA/PUBLIC_DVD_DISPATCH_UNIT_SCADA_202109010000.zip

Note – there is no need to unzip the file.
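
To confirm the download landed on DBFS before reading it (the /dbfs/tmp path above maps to dbfs:/tmp), a quick check from a Python cell:

# List the downloaded archive on DBFS
display(dbutils.fs.ls("dbfs:/tmp/"))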

3. Read raw data into a Spark dataframe using SparkMMS:

Notes:

  • Option maxRowsPerPartition tells the reader to create each partition with a maximum of 50,000 report data rows. All report rows are still read; some simply land in different partitions for performance reasons (a quick check for this is shown after the code below).
  • Option minSplitFilesize tells the reader not to bother splitting files smaller than 1,000,000 bytes, which improves performance.

df = (
    spark 
    .read
    .format("com.analyticsanvil.SparkMMS")
    .option("fileName", "/tmp/PUBLIC_DVD_DISPATCH_UNIT_SCADA_202109010000.zip")
    .option("maxRowsPerPartition","50000")
    .option("minSplitFilesize","1000000")
    .load()
)
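
Optionally, check how many partitions the reader produced under these settings:

# Number of partitions created after splitting on maxRowsPerPartition
print(df.rdd.getNumPartitions())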

4. Validate that the dataframe contains rows:

df.show()

Example output:

Note: Optionally, we can also run df.cache() here to improve performance in subsequent steps.

5. Register each report found in the raw file(s) as a temporary table and then validate the output:

registerAllReports(df)

After the above command, a single temp table is registered because our file only contained one report:
Report type: DISPATCH
Report sub-type: UNIT_SCADA
Version: 1



Note: If we had selected more files in step 2 above, we would see more temp tables here.

Now query the temp table and check the data:
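
For example, from a Python cell (the view name follows the report type, sub-type and version registered above):

# Query the auto-registered temp table and inspect a few rows
spark.sql("SELECT SETTLEMENTDATE, DUID, SCADAVALUE FROM dispatch_unit_scada_1 LIMIT 10").show(truncate=False)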

6. Finally, we can create a view on top of the temporary table(s) with further calculations or data-type conversions – for example:

%sql
-- Create a temporary view with expected data types
CREATE OR REPLACE TEMPORARY VIEW vw_dispatch_unit_scada_1
AS
SELECT
  to_timestamp(REPLACE(SETTLEMENTDATE,'"',''), 'yyyy/MM/dd HH:mm:ss') AS dispatch_time, -- Strip quote characters from SETTLEMENTDATE and convert to native timestamp type
  DUID AS generator,
  CAST(SCADAVALUE AS DOUBLE) AS generation_MW -- Convert to numeric
FROM dispatch_unit_scada_1;

…and then perform charting and aggregations. For example, charting the average generation in MW for three generation units (coal, wind and solar) in September 2021:
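
The aggregation behind such a chart could look like the following sketch (the DUIDs are taken from the sample snippet earlier and are illustrative picks only):

# Average generation per unit over the loaded period - the result can be charted in the notebook
avg_gen = spark.sql("""
    SELECT generator, AVG(generation_MW) AS avg_generation_MW
    FROM vw_dispatch_unit_scada_1
    WHERE generator IN ('BARCSF1', 'BUTLERSG', 'CAPTL_WF')
    GROUP BY generator
""")
display(avg_gen)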

Conclusion

Apache Spark provides a convenient way to process large datasets in parallel once data is available in a structured format.

AEMO’s MMS Data Model data is vast and varied, so keeping all of it loaded in an online platform indefinitely can be an expensive option. Occasionally, however, a use case may arise which relies on having a long period of historical data available to query.

SparkMMS demonstrates a convenient way to process raw files in bulk, with no pre-processing or manual schema design. In some organisations, historical files may be available on cloud / local storage, even if data has been archived from an RDBMS. Therefore, custom readers like SparkMMS may be a convenient option to explore for ad-hoc use cases, as an alternative to re-loading old data into a relational database.


Using Azurite blob storage emulator with Spark

Sometimes it’s handy to be able to test Apache Spark developments locally. This might include testing cloud storage such as WASB (Windows Azure Storage Blob).

These steps describe the process for testing WASB locally without the need for an Azure account. These steps make use of the Azurite Storage Emulator.

Steps

  1. Prerequisites
    • Download and extract Apache Spark (spark-3.1.2-bin-hadoop3.2.tgz)
    • Download and install Docker
    • Start the Docker service – e.g. on Linux:
      sudo service docker start
    • (Optionally) Download and install Azure Storage Explorer
  2. Create a new directory and start the Azurite Storage Emulator Docker container – e.g.:

    mkdir ~/blob

    docker run -p 10000:10000 -p 10001:10001 -v /home/david/blob/:/data mcr.microsoft.com/azure-storage/azurite

    NB – in the above example, data will be persisted to the local Linux directory /home/david/blob.
  3. Upload files with Storage Explorer:

    Connect Storage Explorer to the Local Storage emulator (keep defaults when adding the connection):

    Upload a sample file – e.g. to the “data” container:

  4. Start Spark using the packages option to include the libraries needed to access Blob storage. The Maven coordinates shown here are for the latest hadoop-azure package:

    cd ~/spark/spark-3.1.2-bin-hadoop3.2/bin

    ./pyspark --packages org.apache.hadoop:hadoop-azure:3.3.1

    The PySpark shell should start as per normal after downloading hadoop-azure and its dependencies.

    Troubleshooting:
    The following stack trace indicates the hadoop-azure driver or dependencies were not loaded successfully:
    ... py4j.protocol.Py4JJavaError: An error occurred while calling o33.load.
    : java.lang.RuntimeException: java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.azure.NativeAzureFileSystem not found
      at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2595)
      at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:3269)
    ...
    Caused by: java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.azure.NativeAzureFileSystem not found
      at org.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:2499)
      at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2593)
    ... 25 more ...

    Ensure the “packages” option is correctly set when invoking pyspark above.
  5. Query the data using the emulated Blob storage location from the PySpark shell:

    df=spark.read.format("csv").option("header",True).load("wasb://data@storageemulator/iris.csv")

    df.show()


    Notes:
    data – container where the data was uploaded earlier
    @storageemulator – this is a fixed string used to tell the WASB connector to point to the local emulator

    Example output:
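
    As an alternative to the interactive shell, the same read can be wrapped in a small standalone PySpark script – a minimal sketch, assuming Azurite is still running locally and iris.csv was uploaded to the "data" container as above:

    # Standalone sketch: pull in the WASB connector via spark.jars.packages
    # (equivalent to the --packages option used with the pyspark shell above)
    from pyspark.sql import SparkSession

    spark = (
        SparkSession.builder
        .appName("azurite-wasb-test")
        .config("spark.jars.packages", "org.apache.hadoop:hadoop-azure:3.3.1")
        .getOrCreate()
    )

    df = (
        spark.read.format("csv")
        .option("header", True)
        .load("wasb://data@storageemulator/iris.csv")
    )
    df.show()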

Conclusion

Local storage emulation allows testing of wasb locations without the need to connect to a remote Azure subscription / storage account.

Workaround for com.microsoft.aad.adal4j.AuthenticationException when accessing SQL Server table via Active Directory in Databricks

Symptom

When using Databricks 5.5 LTS to read a table from SQL Server using Azure Active Directory (AAD) authentication, the following exception occurs:

Error : java.lang.NoClassDefFoundError: com/microsoft/aad/adal4j/AuthenticationException
 at com.microsoft.sqlserver.jdbc.SQLServerConnection.getFedAuthToken(SQLServerConnection.java:3609)
 at com.microsoft.sqlserver.jdbc.SQLServerConnection.onFedAuthInfo(SQLServerConnection.java:3580)
 at com.microsoft.sqlserver.jdbc.SQLServerConnection.processFedAuthInfo(SQLServerConnection.java:3548)
 at com.microsoft.sqlserver.jdbc.TDSTokenHandler.onFedAuthInfo(tdsparser.java:261)
 at com.microsoft.sqlserver.jdbc.TDSParser.parse(tdsparser.java:103)
 at com.microsoft.sqlserver.jdbc.SQLServerConnection.sendLogon(SQLServerConnection.java:4290)
 at com.microsoft.sqlserver.jdbc.SQLServerConnection.logon(SQLServerConnection.java:3157)
 at com.microsoft.sqlserver.jdbc.SQLServerConnection.access$100(SQLServerConnection.java:82)
 at com.microsoft.sqlserver.jdbc.SQLServerConnection$LogonCommand.doExecute(SQLServerConnection.java:3121)
 at com.microsoft.sqlserver.jdbc.TDSCommand.execute(IOBuffer.java:7151)
 at ...io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138)
 at java.lang.Thread.run(Thread.java:748) Caused by: java.lang.ClassNotFoundException: com.microsoft.aad.adal4j.AuthenticationException
 at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
 at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
 at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)
 at java.lang.ClassLoader.loadClass(ClassLoader.java:351) ... 59 more...

Cause

This is a known dependency conflict: the cluster's built-in MSSQL JDBC driver cannot find the adal4j classes required for AAD authentication. See the following GitHub issue for background:

https://github.com/Azure/azure-sqldb-spark/issues/28

Workaround steps

1 – Create a new init script which will remove legacy MSSQL drivers from the cluster. The following commands create a new directory on DBFS and then create a shell script with a single command to remove mssql driver JARs:

%sh
mkdir /dbfs/myInitScriptDir
echo "rm /databricks/jars/*mssql*" > /dbfs/myInitScriptDir/myInitScript.sh

2 – Add the cluster init script in Clusters > Cluster > Edit > Advanced Options:

3 – Add the following two libraries to the cluster via Clusters > Cluster > Libraries > Install new:

com.microsoft.azure:adal4j:1.6.5
com.microsoft.sqlserver:mssql-jdbc:8.4.1.jre8

4 – Restart the cluster.

5 – Run the following R code in a notebook cell to validate that AAD authentication is working. NB – Replace the placeholder values (server, database, user, password and table name) with your own:

library(sparklyr)

connection <- spark_connect(method = "databricks")

x <- spark_read_jdbc(
  connection,
  name = 'mytemptable',
  options = list(
    url = 'jdbc:sqlserver://myazuresqlserver.database.windows.net:1433;database=myazuresqldatabase;authentication=ActiveDirectoryPassword;',
    driver = 'com.microsoft.sqlserver.jdbc.SQLServerDriver',
    user = 'myuser@example.com',
    password = 'XXXXXXXX',
    hostNameInCertificate = '*.database.windows.net',
    dbtable = 'dbo.mytable'
  )
)

x

After running the command “x” above, the table data should be displayed.
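
For comparison, an equivalent read from a Python notebook cell might look like the following sketch (same placeholder values as above; not part of the original workaround):

# JDBC read using Azure Active Directory password authentication (replace placeholders)
jdbc_url = (
    "jdbc:sqlserver://myazuresqlserver.database.windows.net:1433;"
    "database=myazuresqldatabase;"
    "authentication=ActiveDirectoryPassword;"
    "hostNameInCertificate=*.database.windows.net;"
)

df = (
    spark.read.format("jdbc")
    .option("url", jdbc_url)
    .option("dbtable", "dbo.mytable")
    .option("user", "myuser@example.com")
    .option("password", "XXXXXXXX")
    .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver")
    .load()
)
df.show()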

Conclusion

The Azure SQL Database table can now be read and the AuthenticationException no longer occurs:

Successful table query after spark_read_jdbc()

Credit: This workaround is based on thereverand‘s very helpful post on GitHub here.

Running Spark on Yarn with Zeppelin and WASB storage

It’s increasingly said that “notebooks” are the new spreadsheets in terms of being a tool for exploratory data analysis.  The Apache Zeppelin project (https://zeppelin.incubator.apache.org/) is certainly one such promising notebook-style interface for performing advanced interactive querying of Hadoop data (whether via Hive, Spark, Shell or other scripting languages).

At the time of writing, Zeppelin is not completely mature; for example, it lacks the ability to connect to a Kerberos-secured Hive service, which may make things difficult in an enterprise environment. Nonetheless, it is worth a look as a new type of workflow for data scientists and other data analysts.

Setup

Binaries for Zeppelin can be obtained here (version 0.5.5):
https://zeppelin.incubator.apache.org/download.html

Startup steps

Normal startup steps for Zeppelin are:

cd ~/zeppelin-0.5.5-incubating-bin-all
bin/zeppelin-daemon.sh restart

If using WASB (Windows Azure Blob Storage) as the default Hadoop filesystem, however, this step should be run before starting the Zeppelin daemon as per the step above. Adding these JARs to the classpath tells Zeppelin how to read from the WASB filesystem:

export CLASSPATH=.:/usr/hdp/current/hadoop-client/lib/azure-storage-2.2.0.jar:/usr/hdp/2.3.0.1-3/hadoop/hadoop-azure-2.7.1.2.3.0.1-3.jar

Config

The following config should be set for running Spark as a Yarn job on an existing HDP 2.3 Hadoop cluster (via the Interpreter tab in Zeppelin):

master=yarn-client
spark.driver.extraJavaOptions=-Dhdp.version=2.3.0.1-3
spark.yarn.am.extraJavaOptions=-Dhdp.version=2.3.0.1-3
spark.home=/usr/hdp/current/spark-client

Spark on Yarn config for Zeppelin Spark interpreter

Results

Running the following code in a Zeppelin Notebook should succeed:

val sqlContext = new org.apache.spark.sql.SQLContext(sc)


And the Spark instance should be kicked off and remain running in the Yarn Resource Manager:


Troubleshooting

Zeppelin may have trouble reading from the WASB filesystem if the above classpath is not added prior to starting Zeppelin:

java.io.IOException: No FileSystem for scheme: wasb
at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2644)

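Once the classpath is in place, a quick read from a notebook paragraph confirms the wasb scheme resolves – a sketch using the pyspark interpreter, with hypothetical container and storage account names:

%pyspark
# Hypothetical WASB location - replace the container and storage account with your own
rdd = sc.textFile("wasb://mycontainer@mystorageaccount.blob.core.windows.net/sample.csv")
print(rdd.count())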

Thanks to the Microsoft support team for assisting with finding the right JAR files to add to the Classpath!

Sparkling-water – keeping the web UI alive

Spark is a great way to make use of the available RAM on a Hadoop cluster to run fast in-memory analysis and queries, and H2O is a great project for running distributed machine learning algorithms on data stored in Hadoop.  Together they form “Sparkling Water” (Spark + H2O, obviously!).

Easy to follow instructions for setting up Sparkling Water are available here: http://h2o-release.s3.amazonaws.com/sparkling-water/master/103/index.html

Running Spark on Yarn is a good way to utilise an existing Hadoop cluster; however, it is challenging to keep the Sparkling Water H2O Flow interface running permanently using the "live" method below. Keeping it online lets a number of data scientists use the notebook-style interface to run machine learning tasks. Luckily, using a spark-submit invocation with the water.SparklingWaterDriver class ensures the web UI remains online even after the shell session which kicked it off exits (see the persistent method below).

Live method – doesn’t stay online after exiting shell session

  1. Create a shell script:

    #!/bin/bash
    export SPARK_HOME='/usr/hdp/current/spark-client/'
    export HADOOP_CONF_DIR=/etc/hadoop/conf
    export MASTER="yarn-client"
    sparkling-water-1.3.5/bin/sparkling-shell --num-executors 3 --executor-memory 2g --master yarn-client

  2. Run sparkling-shell

    import org.apache.spark.h2o._
    val h2oContext = new H2OContext(sc).start()
    import h2oContext._

Persistent method – stays online even after exiting shell session

To start a “persistent” H2O cluster on Yarn (i.e. one which doesn’t exit immediately), simply run this command at the command line of a node where the Spark client and Sparkling Water are installed:

nohup bin/spark-submit --class water.SparklingWaterDriver --master yarn-client --num-executors 3 --driver-memory 4g --executor-memory 2g --executor-cores 1 ../sparkling-water-0.2.1-58/assembly/build/libs/*.jar &

The H2O Flow web UI should be available on its usual port (http://XXX.XXX.XXX.XXX:54321) and should remain there even if the shell session which started it dies!

Thanks to the helpful and responsive folks at H2Oai for the above tip (originally answered here)!