Using Azurite blob storage emulator with Spark

Sometimes it’s handy to be able to test Apache Spark developments locally. This might include testing cloud storage such as WASB (Windows Azure Storage Blob).

The steps below describe how to test WASB locally, without the need for an Azure account, using the Azurite Storage Emulator.

Steps

  1. Prerequisites
    • Download and extract Apache Spark (spark-3.1.2-bin-hadoop3.2.tgz)
    • Download and install Docker
    • Start the Docker service – e.g. on Linux:
      sudo service docker start
    • (Optionally) Download and install Azure Storage Explorer
  2. Create a new directory and start the Azurite Storage Emulator Docker container – e.g.:

    mkdir ~/blob

    docker run -p 10000:10000 -p 10001:10001 -v /home/david/blob/:/data mcr.microsoft.com/azure-storage/azurite

NB – in the above example, data will be persisted to the local Linux directory /home/david/blob.
  3. Upload files with Storage Explorer:

    Connect Storage Explorer to the local storage emulator (keep the defaults when adding the connection).

    Upload a sample file – e.g. iris.csv – to the “data” container.

  4. Start Spark using the --packages option to include the libraries needed to access Blob storage. The Maven coordinates shown here are for the latest hadoop-azure release at the time of writing:

    cd ~/spark/spark-3.1.2-bin-hadoop3.2/bin

    ./pyspark --packages org.apache.hadoop:hadoop-azure:3.3.1

    The PySpark shell should start as normal after downloading hadoop-azure and its dependencies.

    Troubleshooting:
    The following stack trace indicates the hadoop-azure driver or dependencies were not loaded successfully:
    ...
    py4j.protocol.Py4JJavaError: An error occurred while calling o33.load.
    : java.lang.RuntimeException: java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.azure.NativeAzureFileSystem not found
        at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2595)
        at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:3269)
    ...
    Caused by: java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.azure.NativeAzureFileSystem not found
        at org.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:2499)
        at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2593)
        ... 25 more
    ...

    Ensure the --packages option is correctly set when invoking pyspark above.
  5. Query the data using the emulated Blob storage location from the PySpark shell:

    df = spark.read.format("csv").option("header", True).load("wasb://data@storageemulator/iris.csv")

    df.show()


    Notes:
    • data – the container where the sample file was uploaded earlier
    • @storageemulator – a fixed account name that tells the WASB connector to point to the local emulator

    A standalone-script version of steps 4 and 5 is sketched after these steps.

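For reference, steps 4 and 5 can also be run as a single standalone script rather than from the interactive shell. The following is a minimal sketch, assuming pyspark 3.1.2 is installed (e.g. via pip), Azurite is running as above, and iris.csv has been uploaded to the “data” container; the spark.jars.packages setting plays the same role as the --packages flag.

    # Minimal sketch: read iris.csv from the Azurite emulator over wasb://
    from pyspark.sql import SparkSession

    spark = (
        SparkSession.builder
        .appName("azurite-wasb-test")
        # Resolves hadoop-azure and its dependencies when the session starts,
        # equivalent to the --packages option passed to the pyspark shell above.
        .config("spark.jars.packages", "org.apache.hadoop:hadoop-azure:3.3.1")
        .getOrCreate()
    )

    # "data" is the container created earlier; "storageemulator" is the fixed
    # account name the WASB connector uses to target the local emulator.
    df = (
        spark.read.format("csv")
        .option("header", True)
        .load("wasb://data@storageemulator/iris.csv")
    )
    df.show()

    spark.stop()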

Conclusion

Local storage emulation allows testing of WASB locations without the need to connect to a remote Azure subscription or storage account.

Running Spark on Yarn with Zeppelin and WASB storage

It’s increasingly said that “notebooks” are the new spreadsheets in terms of being a tool for exploratory data analysis.  The Apache Zeppelin project (https://zeppelin.incubator.apache.org/) is certainly one such promising notebook-style interface for performing advanced interactive querying of Hadoop data (whether via Hive, Spark, Shell or other scripting languages).

At the time of writing Zeppelin is not completely mature – for example, it lacks the ability to connect to a Kerberos-secured Hive service, which may make things difficult in an enterprise environment. Nonetheless it’s worth a look as a new type of workflow for data scientists and other data analysts.

Setup

Binaries for Zeppelin (version 0.5.5) can be obtained here:
https://zeppelin.incubator.apache.org/download.html

Startup steps

Normal startup steps for Zeppelin are:

cd ~/zeppelin-0.5.5-incubating-bin-all
bin/zeppelin-daemon.sh restart

If, however, WASB (Windows Azure Blob Storage) is being used as the default Hadoop filesystem, the following export should be run before starting the Zeppelin daemon as above. Adding these JARs to the classpath tells Zeppelin how to read from the WASB filesystem:

export CLASSPATH=.:/usr/hdp/current/hadoop-client/lib/azure-storage-2.2.0.jar:/usr/hdp/2.3.0.1-3/hadoop/hadoop-azure-2.7.1.2.3.0.1-3.jar

Config

The following config should be set for running Spark as a Yarn job on an existing HDP 2.3 Hadoop cluster (via the Interpreter tab in Zeppelin):

master=yarn-client
spark.driver.extraJavaOptions=-Dhdp.version=2.3.0.1-3
spark.yarn.am.extraJavaOptions=-Dhdp.version=2.3.0.1-3
spark.home=/usr/hdp/current/spark-client

(Screenshot: Spark on Yarn config for the Zeppelin Spark interpreter)

Results

Running the following code in a Zeppelin Notebook should succeed:

val sqlContext = new org.apache.spark.sql.SQLContext(sc)


And the Spark instance should be kicked off and remain running in the Yarn Resource Manager.

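A quick smoke test of WASB access can also be run from a notebook paragraph. The following is only a sketch: it assumes the %pyspark interpreter of the Spark interpreter group is enabled, and the container, storage account and file path are placeholders – substitute real values for your cluster.

%pyspark
# Placeholder container / storage account / path – replace with real values.
path = "wasb://mycontainer@mystorageaccount.blob.core.windows.net/tmp/testonly/test.txt"

# 'sc' is the SparkContext injected by Zeppelin. If the azure-storage and
# hadoop-azure JARs are on the classpath (see the export above), this prints
# the line count of the file; otherwise it fails with
# "No FileSystem for scheme: wasb" (see Troubleshooting below).
print(sc.textFile(path).count())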

Troubleshooting

Zeppelin may have trouble reading from the WASB filesystem if the above classpath is not added prior to starting Zeppelin:

java.io.IOException: No FileSystem for scheme: wasb
at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2644)


Thanks to the Microsoft support team for assisting with finding the right JAR files to add to the Classpath!

Using Azure Blob storage with Hadoop

Cloud providers such as Amazon (AWS) and Microsoft (Azure) provide fault-tolerant distributed storage services which can literally “take the load” off a Hadoop installation, providing some compelling advantages.  In the case of Microsoft Azure’s blob storage, however, this is not without its pitfalls.

With the release of Hadoop version 2.7.0 (and vendor-packaged versions such as Hortonworks HDP 2.3), Windows Azure Blob storage can be used as either default or secondary storage for Hadoop instead of HDFS. See Alexei Khalyako’s description of how to configure both of these options here.

These are some benefits of using Blob storage instead of HDFS (see also Microsoft’s opinion):

  • Separate storage from compute – data can exist with 1, 10, 1000 or even zero Hadoop nodes, meaning compute resources can be scaled freely as there is no reliance on having HDFS services running on a minimum number of nodes with locally attached disks in order to simply access the data.  Equally there is no need to “rebalance” data when nodes are added or removed from the cluster.
  • Relatively low cost of storage – at the time of writing this is roughly $414.10 AUD per Terabyte per year (see here). This is quite impressive given the cost of hardware and electricity required to maintain this data even on commodity equipment.
  • Automatic replication – Azure storage can be replicated long-distance with the click of a mouse (by choosing geographically replicated storage). This means the Azure cloud layer will take care of replicating data to another of its datacentres for disaster recovery purposes. With the cheapest form of replication (local replication) data is stored 3 times in a single datacentre, ensuring high availability in the event of a single disk failure in the Azure datacentre. With geo-replicated storage the data is also copied another 3 times into a secondary datacentre (although it would require a declaration from Microsoft to make the secondary copy accessible after a disaster – something completely out of an Azure customer’s control).

Questions you might ask about using Azure Blob storage instead of HDFS:

Q: Wouldn’t it be really slow having an on-premise Hadoop cluster connecting to storage accessible only over the internet via TCP/IP?
A: Yes. For this reason that architecture is not recommended. Instead, Azure storage is worth considering only for clusters which are stood up in Azure itself (as VMs, or via the platform-as-a-service offering HDInsight). It is assumed that TCP/IP connectivity within an Azure datacentre is fast enough that network bottlenecks between machines and storage are not a concern.

Q: Hadoop is all about moving compute closer to storage – doesn’t using blob go against this principle?
A: Microsoft’s answer to this seems to be that the backbone connecting Azure compute VMs to blob storage should provide performance similar to what would be seen with locally attached disks (see Cindy Gross’s useful blog post here).  In other words, the architecture is like having a big disk attached to many nodes simultaneously and directly.

Q: If a blob storage account behaves like a hard disk, won’t it get overloaded with multiple nodes connecting to it simultaneously?
A: No – according to Microsoft, blob storage accounts apparently do not behave like single disks. The performance characteristics of blob storage allow many Hadoop nodes to read and write data simultaneously. Microsoft claims a target of 60 MB/s throughput per blob (see here), which might correspond to a single chunked file of a Hive table, as well as 15 Gbps overall read performance for a single storage account – i.e. allowing for approximately 31 nodes, each reading from a blob account simultaneously at 60 MB/s.
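As a rough back-of-the-envelope check of that 31-node figure, using only the throughput targets quoted above:

# 15 Gbps per storage account vs 60 MB/s per blob (targets quoted above)
account_read_gbps = 15
per_blob_mb_s = 60

account_read_mb_s = account_read_gbps * 1000 / 8   # 15 Gbps is roughly 1875 MB/s
print(account_read_mb_s / per_blob_mb_s)            # roughly 31 simultaneous readers at 60 MB/s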

Pitfalls of blob storage instead of HDFS?

A very significant pitfall of using blob storage with Hadoop (despite the above advantages) is that, in the words of the Hadoop documentation:

“File owner and group are persisted, but the permissions model is not enforced.” (https://hadoop.apache.org/docs/stable/hadoop-azure/index.html)

This presents enormous challenges at the enterprise level in providing access to multiple users or even self-service access to unstructured or semi-structured data in a Hadoop-based data lake.  Given the frequent need to protect sensitive data within an organisation (e.g. customer, employee, financial data) it seems a severe limitation that the Hadoop interface to the blob storage APIs has not been augmented with the ability to enforce the file and folder permissions which it so dutifully records!

An example of the problem can be seen by comparing native HDFS storage behaviour with blob storage behaviour (both when acting as the default Hadoop filesystem):

Using HDFS – Authorisations working correctly:

[azureuser@hdplinux4 tmp]$ id
uid=500(azureuser) gid=500(azureuser) groups=500(azureuser)
[azureuser@hdplinux4 tmp]$ hdfs dfs -ls /tmp/testonly/
Found 1 items
-rw-------   1 hdfs hdfs         13 2015-08-26 00:57 /tmp/testonly/test.txt
[azureuser@hdplinux4 tmp]$ pwd
/tmp
[azureuser@hdplinux4 tmp]$ hdfs dfs -copyToLocal /tmp/testonly/test.txt
copyToLocal: Permission denied: user=azureuser, access=READ, inode="/tmp/testonly/test.txt":hdfs:hdfs:-rw-------

Using Blob – Authorisations are completely ignored (although the permissions are visible via an hdfs dfs -ls command and say that only user hdfs should be allowed to read test.txt, they are ignored when user azureuser copies the file):

azureuser@hdplinuxblob:/tmp> id
uid=1002(azureuser) gid=100(users) groups=100(users),16(dialout),33(video)
azureuser@hdplinuxblob:/tmp> hdfs dfs -ls /tmp/testonly/test.txt
-rw-------   1 hdfs hdfs         13 2015-08-26 01:00 /tmp/testonly/test.txt
azureuser@hdplinuxblob:/tmp> hdfs dfs -copyToLocal /tmp/testonly/test.txt
azureuser@hdplinuxblob:/tmp> ls -la test.txt
-rw-r--r-- 1 azureuser users 13 Aug 26 01:09 test.txt

Only time will tell whether Microsoft will rectify this severe gap in the Azure Blob storage integration with Hadoop. There is some indication that they intend to close the gap with the Azure Data Lake service due for eventual release, which promises compatibility with many flavours of Hadoop (e.g. Hortonworks and Cloudera) as well as integration with Active Directory to allow files and folders to be secured. The challenge remains, however, of providing a security mechanism which is compatible with the wider Hadoop ecosystem, and this gives pause when choosing blob over HDFS when it should otherwise be an easy decision.