Bulk processing of AEMO MMS data with a custom Spark Data Source

Background

AEMO is the Australian Energy Market Operator. It makes available a well-organised database for market participants to track bids, demand, generation and other market functions. This database is known as the MMS (Market Management System Data Model).

Electricity researchers, retailers, distributors and others use this data to get insights and manage their business.

Public MMS data is freely available for download from AEMO’s NEMWeb data archive (nemweb.com.au).

The traditional approach to making use of MMS datasets is to load them into an RDBMS. The volume and variety of the data can make this difficult, although some helper tools exist. Nonetheless, loading a long history of granular data for analysis, even for a single dataset, is a common business requirement.

Apache Spark (an alternative to a traditional RDBMS for this kind of workload) has a natural advantage in being able to read and process large datasets in parallel, particularly for analytics.

Can it be used here?

Challenges

The AEMO CSV format used to populate MMS allows there to be multiple reports in a single file.

Furthermore, files are frequently compressed in Zip format, which usually means pre-processing (e.g. unzipping) is required before they can be read as text or CSV.

Whilst the underlying files are comma-separated, the number of columns in each row varies within a given file due to:

  • Different record types (Comment, Information or Data)
  • Different report schemas (each having a different column set)
AEMO MMS Data Model CSV structure

Here is a snippet from a sample file:

C,SETP.WORLD,DVD_DISPATCH_UNIT_SCADA,AEMO,PUBLIC,2021/10/07,00:00:05,0000000350353790,,0000000350353744
I,DISPATCH,UNIT_SCADA,1,SETTLEMENTDATE,DUID,SCADAVALUE
D,DISPATCH,UNIT_SCADA,1,"2021/09/01 00:05:00",BARCSF1,0
D,DISPATCH,UNIT_SCADA,1,"2021/09/01 00:05:00",BUTLERSG,9.499998
D,DISPATCH,UNIT_SCADA,1,"2021/09/01 00:05:00",CAPTL_WF,47.048004
...lots more rows...
C,"END OF REPORT",3368947

This file structure presents some specific challenges for parsing with Spark and, in turn, for deriving useful insights from the underlying data.

Issue #1 – reading too many rows from a file (even for a single report) can cause out-of-memory issues

Issue #2 – naively reading just the data (D) rows misses file and report header information, such as column names

Issue #3 – parsing full files can result in unnecessary data being read, when only a subset is needed

Solution

SparkMMS is a custom data reader implemented in Java using Apache Spark’s DataSource V2 API.

It can be used to efficiently read AEMO MMS files in bulk.

Input:

SparkMMS takes a glob path, which means it can read multiple files based on a file pattern – e.g. to read all dispatch-related zip files from a monthly archive:

/tmp/Data_Archive/Wholesale_Electricity/MMSDM/2021/MMSDM_2021_09/MMSDM_Historical_Data_SQLLoader/DATA/PUBLIC_DVD_DISPATCH*.zip
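
A minimal read of such a glob might look like this (the format name and options are covered in the demo later in this post):

# Sketch: read all matching zip files in one pass via the SparkMMS reader
df = (
    spark.read
    .format("com.analyticsanvil.SparkMMS")
    .option("fileName", "/tmp/Data_Archive/Wholesale_Electricity/MMSDM/2021/MMSDM_2021_09/MMSDM_Historical_Data_SQLLoader/DATA/PUBLIC_DVD_DISPATCH*.zip")
    .load()
)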

Output:

SparkMMS creates a Spark dataframe with chunks of rows for each specific report type across all input files. The data rows are nested in the “data” column of the dataframe, and the file header and report headers (including column names) are preserved alongside them:

>>> df.printSchema()
root
 |-- original_filename: string (nullable = false)
 |-- system: string (nullable = false)
 |-- report_id: string (nullable = false)
 |-- report_from: string (nullable = false)
 |-- report_to: string (nullable = false)
 |-- publish_datetime: timestamp (nullable = false)
 |-- id1: string (nullable = false)
 |-- id2: string (nullable = false)
 |-- id3: string (nullable = false)
 |-- report_type: string (nullable = false)
 |-- report_subtype: string (nullable = false)
 |-- report_version: integer (nullable = false)
 |-- column_headers: array (nullable = false)
 |    |-- element: string (containsNull = true)
 |-- data: array (nullable = false)
 |    |-- element: array (containsNull = true)
 |    |    |-- element: string (containsNull = true)

This structure makes it easy to do further processing of the data and means no information is lost when reading files in parallel.

Other features:

  • Reads both .CSV and .zip
  • Automatically splits large files into multiple partitions
  • Extracts useful metadata from raw files, including column headers
  • Supports multiple report schemas / versions
  • Supports predicate pushdown – skips reports within a file if they are not selected
  • Supports column pruning – reads only a subset of data from raw files when not all columns are selected (see the sketch after this list)
  • Can read from cloud storage (e.g. Azure Blob storage, Amazon S3, Databricks DBFS)
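
For example, filters and column selections applied to the dataframe are passed down to the reader where possible, so a query like the following sketch only needs to parse the matching report sections and requested columns:

# Only DISPATCH / UNIT_SCADA report sections are parsed from each file,
# and unselected columns can be skipped by the reader
scada = (
    df.where("report_type = 'DISPATCH' AND report_subtype = 'UNIT_SCADA'")
      .select("original_filename", "report_version", "column_headers", "data")
)
scada.show()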

Demo

These steps show the SparkMMS custom reader in action using Azure Databricks:

Note: Databricks is a paid, cloud-based data lake / ML platform. Alternatively, see the source code for a demonstration of running SparkMMS locally on a single node.

Prerequisites

  1. Download the library:
    https://github.com/niftimus/SparkMMS/releases/download/v0.1/SparkMMS-0.1-SNAPSHOT.jar
  2. Start a Databricks cluster.

    Note: Select Runtime 9.1 LTS for compatibility
  3. Add the SparkMMS library to the cluster via Cluster > Libraries > Install New > Drag and Drop Jar.

Using SparkMMS

1. Define helper functions. At runtime, these create report-specific dataframe definitions (with the correct per-report column headings) and also register temporary tables to streamline querying via SQL:

# Get a new dataframe with the schema of a single report type
def getReport(df, report_type, report_subtype, report_version):
    from pyspark.sql.functions import explode

    # Keep only the requested report and explode its nested data rows
    df = df.where(f"report_type = '{report_type}' and report_subtype = '{report_subtype}' and report_version = {report_version}")
    tmpDF = df.select("column_headers", explode(df.data).alias("datarow"))

    # Use the report's column headers to name one column per field
    colHeaders = df.select("column_headers").first().column_headers

    for idx, colName in enumerate(colHeaders):
        tmpDF = tmpDF.withColumn(colName, tmpDF.datarow[idx])

    tmpDF = tmpDF.drop("column_headers").drop("datarow")

    return tmpDF

# Register each report available in the dataframe as a temporary view
def registerAllReports(df):
    tmpDF = df.select("report_type","report_subtype","report_version")
    tmpDF = tmpDF.dropDuplicates()
    
    reports = tmpDF.collect()
    
    for r in reports:
        tmpReportDF = getReport(df,r.report_type,r.report_subtype,r.report_version)
        tmpReportDF.createOrReplaceTempView(f"{r.report_type}_{r.report_subtype}_{r.report_version}")

2. Create a temporary directory and download sample data from AEMO (15 MB zipped, 191 MB unzipped):

%sh
cd /dbfs/
mkdir tmp
cd tmp
wget https://nemweb.com.au/Data_Archive/Wholesale_Electricity/MMSDM/2021/MMSDM_2021_09/MMSDM_Historical_Data_SQLLoader/DATA/PUBLIC_DVD_DISPATCH_UNIT_SCADA_202109010000.zip

Note – there is no need to unzip the file.

3. Read raw data into a Spark dataframe using SparkMMS:

Notes:

  • Option maxRowsPerPartition tells the reader to create each partition with a maximum of 50,000 report data rows. All report rows will still be read; however, some will land in different partitions for performance reasons.
  • Option minSplitFilesize tells the reader not to bother splitting files smaller than 1,000,000 bytes, which improves performance.

df = (
    spark 
    .read
    .format("com.analyticsanvil.SparkMMS")
    .option("fileName", "/tmp/PUBLIC_DVD_DISPATCH_UNIT_SCADA_202109010000.zip")
    .option("maxRowsPerPartition","50000")
    .option("minSplitFilesize","1000000")
    .load()
)

4. Validate that the dataframe contains rows:

df.show()

Example output:

Note: Optionally, we can also run df.cache() here to improve performance in subsequent steps.

5. Register each report found in the raw file(s) as a temporary table and then validate the output:

registerAllReports(df)

After the above command, a single temp table is registered because our file only contained one report:
Report type: DISPATCH
Report sub-type: UNIT_SCADA
Version: 1



Note: If we had downloaded more files in step 2 above, we would see more temp tables registered here.

Now query the temp table and check the data:
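
For example (the temp table name follows the report_type_report_subtype_report_version convention used by registerAllReports):

# Query the registered temp table for the single report in this file
spark.sql("SELECT * FROM dispatch_unit_scada_1 LIMIT 10").show()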

6. Finally, we can create a view on top of the temporary table(s) with further calculations or data-type conversions – for example:

%sql
-- Create a temporary view with expected data types
CREATE OR REPLACE TEMPORARY VIEW vw_dispatch_unit_scada_1
AS
SELECT
  to_timestamp(REPLACE(SETTLEMENTDATE,'"',''), 'yyyy/MM/dd HH:mm:ss') AS dispatch_time, -- Strip quote characters from SETTLEMENTDATE and convert to native timestamp type
  DUID AS generator,
  CAST(SCADAVALUE AS DOUBLE) AS generation_MW -- Convert to numeric
FROM dispatch_unit_scada_1;

…and then perform charting and aggregations – for example, charting the average generation in MW for three generation units (coal, wind and solar) in September 2021.
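
A sketch of the underlying aggregation is below (the DUIDs are placeholders; substitute the units of interest):

# Average generation (MW) per unit per day; the DUIDs below are placeholders
spark.sql("""
    SELECT generator,
           date_trunc('DAY', dispatch_time) AS dispatch_day,
           AVG(generation_MW)               AS avg_generation_MW
    FROM vw_dispatch_unit_scada_1
    WHERE generator IN ('COAL_DUID', 'WIND_DUID', 'SOLAR_DUID')
    GROUP BY generator, date_trunc('DAY', dispatch_time)
    ORDER BY dispatch_day, generator
""").show()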

Conclusion

Apache Spark provides a convenient way to process large datasets in parallel once data is available in a structured format.

AEMO’s MMS data model data is vast and varied, so keeping all of it loaded in an online platform indefinitely can be expensive. Occasionally, however, a use case may arise which relies on having a long period of historical data available to query.

SparkMMS demonstrates a convenient way to process raw files in bulk, with no pre-processing or manual schema design. In some organisations, historical files may be available on cloud / local storage, even if data has been archived from an RDBMS. Therefore, custom readers like SparkMMS may be a convenient option to explore for ad-hoc use cases, as an alternative to re-loading old data into a relational database.


Point in time Delta Lake table restore after S3 object deletion

Background

The Delta Lake format in Databricks provides a helpful way to restore table data using “time-travel” in case a DML statement removed or overwrote some data.

The goal of a restore is to bring back table data to a consistent version.

Delta lake timetravel

This allows accidental table operations to be reverted.

Example

Original table – contains 7 distinct diamond colour types including color = “G”:

Original table

Then, an accidental deletion occurs:

Accidental SQL delete statement
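
For illustration only, the accidental statement might have looked something like this (removing every row for one colour):

# Illustrative only – deletes all rows where color = "G"
spark.sql("DELETE FROM default.mytable WHERE color = 'G'")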

The table is now missing some data:

Modified table

However, we can bring back the deleted data by checking the Delta Lake history and restoring to a version or timestamp prior to when the delete occurred – in this case version 0 of mytable:

Delta Lake table history
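
The history can be listed with, for example:

# Show table versions, timestamps and operations recorded in the Delta log
spark.sql("DESCRIBE HISTORY default.mytable").show(truncate=False)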

Restoring the original table based on a timestamp (after version 0, but prior to version 1):

%sql
DROP TABLE IF EXISTS mytable_deltarestore;

CREATE TABLE mytable_deltarestore
USING DELTA
LOCATION "s3a://<mybucket>/mytable_deltarestore"
AS SELECT * FROM default.mytable TIMESTAMP AS OF "2021-07-25 12:20:00"; 

Now, the original data is available in the restored table, thanks to Delta Lake time-travel:

Restored data – via Timetravel

Challenge

What happens if table files (parquet data files or transaction log files) have been deleted in the underlying storage?

This might occur if a user or administrator accidentally deletes objects from S3 cloud storage.

Two types of files might get deleted manually.

Delta Lake data files

Symptom – table is missing data and can’t be queried:

%sql
SELECT * FROM mytable@v0;

(1) Spark Jobs
FileReadException: Error while reading file s3a://<mybucket>/mytable/part-00000-1932f078-53a0-4cbe-ac92-1b7c48f4900e-c000.snappy.parquet. A file referenced in the transaction log cannot be found. This occurs when data has been manually deleted from the file system rather than using the table `DELETE` statement. For more information, see https://docs.microsoft.com/azure/databricks/delta/delta-intro#frequently-asked-questions
Caused by: FileNotFoundException: No such file or directory: s3a://<mybucket>/mytable/part-00000-1932f078-53a0-4cbe-ac92-1b7c48f4900e-c000.snappy.parquet

Delta Lake transaction logs

Symptom – table state is inconsistent and can’t be queried:

%sql
FSCK REPAIR TABLE mytable DRY RUN

Error in SQL statement: FileNotFoundException: s3a://<mybucket>/mytable/_delta_log/00000000000000000000.json: Unable to reconstruct state at version 1 as the transaction log has been truncated due to manual deletion or the log retention policy (delta.logRetentionDuration=30 days) and checkpoint retention policy (delta.checkpointRetentionDuration=2 days)

Solution

Versioning can be enabled for S3 buckets via the AWS management console:

S3 bucket configuration – Bucket Versioning enabled

This means that if any current object versions are deleted after versioning has been enabled, it may be possible to restore them.
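
Versioning can also be enabled programmatically – for example via boto3 (a sketch; the bucket name is a placeholder and AWS credentials are assumed to be configured):

import boto3

# Enable versioning on the bucket that holds the Delta table files
s3 = boto3.client("s3")
s3.put_bucket_versioning(
    Bucket="<mybucket>",
    VersioningConfiguration={"Status": "Enabled"},
)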

Databricks Delta Lake tables are stored on S3 under a given folder / prefix – e.g.:

s3a://<mybucket>/<mytable>

If this prefix can be restored to a “point in time”, it can be used to recover an uncorrupted version of the table.

NB: Restoring in this way means any data added after the deletion occurred will be lost and would need to be reloaded from an upstream source. It also assumes that previous object versions are still available on S3.

The following steps can be used in Databricks to restore past S3 object versions to a new location and re-read the table at the restore point:

  1. Install the s3-pit-restore python library in a new Databricks notebook cell:
    %pip install s3-pit-restore
  2. Run the restore command with a timestamp prior to the deletion:
    %sh
    export AWS_ACCESS_KEY_ID="<access_key_id>"
    export AWS_SECRET_ACCESS_KEY="<secret_access_key>"
    export AWS_DEFAULT_REGION="<aws_region>"
    s3-pit-restore -b <mybucket> -B <mybucket> -p mytable/ -P mytable_s3restore -t "25-07-2021 23:26:00 +10"
  3. Create a new table pointing to the restore location:
    %sql
    CREATE TABLE mytable_s3restore
    USING DELTA
    LOCATION "s3a://<mybucket>/mytable_s3restore/mytable";
  4. Verify the table contents are again available and no longer corrupted:
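    For example, checking that all seven colour groups (including "G") are back:

    # The restored table should again contain all 7 distinct colours
    spark.sql("SELECT color, COUNT(*) AS row_count FROM mytable_s3restore GROUP BY color ORDER BY color").show()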

Conclusion

Other techniques, such as Table Access Control, may be preferable for preventing Databricks users from deleting underlying S3 data in the first place. However, a point-in-time restore may still be possible where table corruption has already occurred and S3 bucket versioning is enabled.

Automatically tagging, captioning and categorising locally stored images using the Azure Computer Vision API

It’s easy in the digital age to amass tens of thousands of photos (or more!). Categorising these can be a challenging task, let alone searching through them to find that one happy snap from 10 years ago.

Significant advances in machine learning over the past decade have made it possible to automatically tag and categorise photos without user input (assuming a machine learning model has been pre-trained). Many social media and photo sharing platforms make this functionality available to their users — for example, Flickr’s “Magic View”. But what if a user has a large number of files stored locally on a hard disk?

The problem

  • 49,049 uncategorised digital images stored locally
  • Manual categorisation is impractical at this scale
  • No easy way to search (e.g. “red dress”, “mountain”, “cat on a mat”)

The solution

Steps

  1. Obtain a Microsoft Azure cloud subscription (note – Azure is not free; however, free trials may be available):
    https://azure.microsoft.com/en-us/free/
  2. Start a cognitive services account from the Azure portal and take note of one of the “Keys” (keys are interchangeable):
    https://portal.azure.com/
  3. Log in to your Linux machine and ensure you have python3 installed:
    user@host.site:~> which python3
    /usr/bin/python3
  4. Ensure you have these python libraries installed:
    sudo su -
    pip3 install python-xmp-toolkit
    pip3 install argparse
    pip3 install Pillow
    exit
  5. Obtain a copy of the image-auto-tag script:
    git clone https://github.com/niftimusmaximus/image-auto-tag
  6. Automatically tag, caption and categorise an image (e.g. image.jpg):
    cd image-auto-tag
    ./image-auto-tag.py --key XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX \
      --captionConfidenceLevel 0.50 --tagConfidenceLevel 0.5 \
      --categoryConfidenceLevel 0.5 image.jpg

    Note – replace the key with one of the keys obtained from the Azure Portal above

    Script will process the image:

    INFO: [image.jpg] Reading input file 1/1                                                                                                                      
    INFO: [image.jpg] Temporarily resized to 800x600                                                                                                              
    INFO: [image.jpg] Uploading to Azure Computer Vision API
                      (length: 107330 bytes)                                                                               
    INFO: [image.jpg] Response received from Azure Computer Vision API
                      (length: 1026 bytes)                                                                       
    INFO: [image.jpg] Appended caption 'a river with a mountain in the
                      background' (confidence: 0.67 >= 0.50)                                                     
    INFO: [image.jpg] Appended category 'outdoor_water'
                      (confidence: 0.84 >= 0.50)                                                                                
    INFO: [image.jpg] Appending tag 'nature' (confidence: 1.00 >= 0.50)                                                                                           
    INFO: [image.jpg] Appending tag 'outdoor' (confidence: 1.00 >= 0.50)                                                                                          
    INFO: [image.jpg] Appending tag 'water' (confidence: 0.99 >= 0.50)                                                                                            
    INFO: [image.jpg] Appending tag 'mountain' (confidence: 0.94 >= 0.50)                                                                                         
    INFO: [image.jpg] Appending tag 'river' (confidence: 0.90 >= 0.50)                                                                                            
    INFO: [image.jpg] Appending tag 'rock' (confidence: 0.89 >= 0.50)                                                                                             
    INFO: [image.jpg] Appending tag 'valley' (confidence: 0.75 >= 0.50)                                                                                           
    INFO: [image.jpg] Appending tag 'lake' (confidence: 0.60 >= 0.50)                                                                                             
    INFO: [image.jpg] Appending tag 'waterfall' (confidence: 0.60 >= 0.50)                                                                                        
    INFO: [image.jpg] Finished writing XMP data to file 1/1
  7. Verify the results:
    Auto tagging

    API has applied “tags” which can be searched

    Auto captioning

    API has captioned this image as “a beach with palm trees”

    Auto categorisation

    "plant_tree" hierarchical category has been applied
    API has applied the category “plant_tree” to this image

    Note – see the Azure Computer Vision documentation for the API’s 86-category taxonomy

Script features

  • Writes to standard XMP metadata tags within JPG images, which can be read by image management applications such as XnView MP and digiKam (see the sketch at the end of this section for a programmatic spot-check)
  • Sends downsized images to Azure to improve performance

    Example – only send an image of width 640 pixels to Azure (the original image retains its dimensions):

    ./image-auto-tag.py --key XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX \
      --azureResizeWidth 640 image.jpg
  • Allows customisation of the confidence thresholds for tags, captions and categories. This is useful because, whilst good, the API is not perfect!

    Example – only caption the image if the caption confidence score from the API is 0.5 or above:

    ./image-auto-tag.py --key XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX \
      --captionConfidenceLevel 0.5 image.jpg
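
As a quick programmatic spot-check of the XMP metadata mentioned above, the already-installed python-xmp-toolkit can read the embedded properties back (a sketch; it assumes the relevant Dublin Core fields have been written to the image):

from libxmp.utils import file_to_dict
from libxmp import consts

# Read the XMP packet from the tagged image and print any Dublin Core
# properties (e.g. keyword and description entries), if present
xmp = file_to_dict("image.jpg")
for prop_name, value, _options in xmp.get(consts.XMP_NS_DC, []):
    print(prop_name, value)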