Creating a custom Hive input format and record reader to read fixed format flat files

Apache Hive is great for enabling SQL-like queryability over flat files.  This is trivial for tabular files such as CSVs, where custom row and field delimiters can be set out-of-the-box (e.g. with Hive's inbuilt CSV SerDe).  Even more complex files can be read and converted to a desired row and column format using Hive's regexp_extract() function, so long as we can operate on a single row at a time.

The problem

What if the rows we want in Hive aren’t rows in the input files?  That is, we need to read the file as a whole and decode it to produce the output we want to see in Hive.

An example is the Australian Bureau of Meteorology's ASCII Grid format.  These are fixed-format files with a header section which effectively describes how to read the rest of the file.  In the data section, each row corresponds to a line of latitude on a map (with starting coordinates identified in the header) and, similarly, each column corresponds to a line of longitude.  Read as a whole, the file contains a grid of readings for a particular weather observation – e.g. rainfall for a given time period:

BOM-grid-data
Example decoding of ASCII grid format file

To read this data in Hive it might be possible to define a table which hard-codes column values to their corresponding longitude, but this leaves the problem of reading similarly formatted files with a different geographical granularity or a different starting position on the globe.  We would also struggle at the Hive query language layer to determine the latitude of a given data row, because the header holds the metadata mapping a row's position in the file to its latitude.
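To make that header dependency concrete, here is a tiny illustrative sketch (not the actual code used later) of the row-to-latitude arithmetic, assuming an Esri-style ASCII grid header (nrows, yllcorner, cellsize) where the first data row in the file is the northernmost:

// Illustrative only: assumes an Esri-style ASCII grid header where the first
// data row in the file is the northernmost row of the grid.
public class GridLatitude {
    // Latitude of the southern edge of a cell in the given 0-based data row (top row = 0).
    public static double rowToLatitude(int row, int nrows, double yllcorner, double cellsize) {
        return yllcorner + (nrows - 1 - row) * cellsize;
    }
}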

To make the grid data easier to consume in Hive we may wish to transform files into a format such as this:

BOM-grid-data-desired_tabular_format
Geospatial data – desired tabular format

This means we can query a file by filtering on particular lat / long combinations.  One way to transform the file into this format is to create custom Hive InputFormat and RecordReader Java classes that are invoked at query time.

InputFormat / RecordReader vs SerDe

A key distinction when creating custom classes to use with Hive is the following:

  • InputFormat and RecordReader – takes files as input – generates rows
  • SerDe – takes rows as input – generates columns

Here, ASCII grid formatted files cannot be de-serialised row-by-row because the latitude of a given row depends on both its position in the file and information in the header, so a SerDe is likely not the best option.  Instead, an InputFormat Java class can be written to convert the input ASCII grid formatted files into the desired tabular format above, making it possible to query arbitrary lat / long coordinates via Hive.

Creating a custom InputFormat

An InputFormat compatible with Hive can be created by writing classes which implement and extend the standard mapred library classes:

  1. CustomTextInputFormat.java – extends FixedLengthInputFormat.  Returns a CustomTextRecordReader which plugs into Hive at runtime behind the scenes (a minimal sketch follows this list).
  2. CustomTextRecordReader.java – implements mapred RecordReader<LongWritable, BytesWritable>.  Reads and decompresses (if required) files off the Hadoop filesystem, then calls ReadASCIIGridFile to do the actual transformation.
  3. ReadASCIIGridFile.java – contains a static class which does the transformation from input (a byte array – ASCII grid formatted) to output (a byte array – Hive row format)
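As a rough guide, a minimal sketch of the first class might look like the following (the CustomTextRecordReader constructor signature here is an assumption; the real class may differ):

package com.analyticsanvil.custominputformat;

import java.io.IOException;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapred.FixedLengthInputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;

public class CustomTextInputFormat extends FixedLengthInputFormat {

    // The ASCII grid header applies to the whole file, so files are never split.
    @Override
    protected boolean isSplitable(FileSystem fs, Path file) {
        return false;
    }

    // Hand each whole file to the custom record reader (constructor assumed).
    @Override
    public RecordReader<LongWritable, BytesWritable> getRecordReader(
            InputSplit split, JobConf job, Reporter reporter) throws IOException {
        reporter.setStatus(split.toString());
        return new CustomTextRecordReader(split, job, reporter);
    }
}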

Notes:

  • Code uses the mapred rather than mapreduce API of Hadoop, as Hive only supports mapred-style InputFormat objects
  • CustomTextInputFormat.java sets all files to be non-splittable.  This is done because a file must be read in full, with its header, to properly convert to the target format.
  • The CustomTextRecordReader copes with compressed input files by utilising the org.apache.hadoop.io.compress.CompressionCodec class to decompress them where required (a sketch of this decompression step follows below).  This is advantageous for ASCII grid formatted files, which lend themselves well to compression (e.g. via GZIP) before being uploaded to HDFS.
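For illustration only (the class name, method name and buffer size here are assumptions, not the actual code):

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;

public class GridFileUtil {
    // Reads a whole (possibly compressed) file from HDFS into a byte array,
    // ready to be handed to ReadASCIIGridFile for transformation.
    public static byte[] readWholeFile(Path file, Configuration conf) throws IOException {
        FileSystem fs = file.getFileSystem(conf);
        CompressionCodecFactory factory = new CompressionCodecFactory(conf);
        CompressionCodec codec = factory.getCodec(file); // null if the file is not compressed
        try (InputStream in = (codec == null) ? fs.open(file) : codec.createInputStream(fs.open(file));
             ByteArrayOutputStream out = new ByteArrayOutputStream()) {
            IOUtils.copyBytes(in, out, 4096, false); // 4096-byte buffer; streams closed by try-with-resources
            return out.toByteArray();
        }
    }
}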

Compiling the custom InputFormat

Copy the text out of the above files and save them to a folder on the filesystem.  Then compile them and package the result into a JAR file (note – a Java JDK must be installed to run the compilation, preferably the same version as the target Hadoop system is running):

cd </path/where/java/files/are/stored>
mkdir build
javac -d ./build/ -cp "/usr/hdp/2.3.0.0-2557/hadoop/lib/*:/usr/hdp/2.3.0.0-2557/hadoop-mapreduce/*" *.java
cd build
jar cvf CustomTextInputFormat.jar *
cp CustomTextInputFormat.jar /tmp/

Note – the classpath in the javac command assumes that the necessary Hadoop library JAR files are installed in certain locations.  The locations mentioned are for the Hortonworks HDP 2.3.0 Sandbox VM, but can be changed to suit other versions / distributions.

Using the custom InputFormat with Hive

Run hive at the command line.

In the Hive session, add the newly created InputFormat JAR:

hive> add jar /tmp/CustomTextInputFormat.jar;
Added [/tmp/CustomTextInputFormat.jar] to class path
Added resources: [/tmp/CustomTextInputFormat.jar]

Create an external table on an HDFS directory containing ASCII grid formatted files:

hive> create external table default.test_ascii
(
lat1 float, long1 float, lat2 float, long2 float, measurement float
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\t'
STORED AS INPUTFORMAT 'com.analyticsanvil.custominputformat.CustomTextInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat'
LOCATION '/tmp/'; 
OK
Time taken: 13.192 seconds

Run a test query on the external table:

hive> select * from default.test_ascii limit 10;
OK
-43.975002      112.025 -43.925007      112.075 -999.0
-43.975002      112.075 -43.925007      112.12499       -999.0
-43.975002      112.12499       -43.925007      112.17499       -999.0
-43.975002      112.17499       -43.925007      112.22498       -999.0
-43.975002      112.22498       -43.925007      112.27498       -999.0
-43.975002      112.27498       -43.925007      112.324974      -999.0
-43.975002      112.324974      -43.925007      112.37497       -999.0
-43.975002      112.37497       -43.925007      112.424965      -999.0
-43.975002      112.424965      -43.925007      112.47496       -999.0
-43.975002      112.47496       -43.925007      112.524956      -999.0
Time taken: 3.94 seconds, Fetched: 10 row(s)

The query returns data in the desired tabular format.

Conclusion

By creating an InputFormat Java class which reads and transforms fixed format files at the time of Hive querying, we can effectively convert data into forms which are better suited to analytical purposes.

Similarly, writing a custom input format allows almost any data to be read by invoking a custom Java class on each mapper, translating the input into tabular format for use in Hive.  In the above example ReadASCIIGridFile.java converts an ASCII grid formatted file to a long list of lat / long combinations and readings, but equally, a new Java class could be coded to read more exotic forms of input data – e.g. MP3 audio files, JPEGs or other types of binary file.  So long as a developer knows how to code the translation in Java, input formats can be converted into Hive queryable tabular data on the fly and in parallel (e.g. MP3 files with timestamps and spectral / frequency analysis).

It's worth noting that further improvements could be made to the Java code above – for example, more efficient parsing of input files (currently done with regular expressions), better error checking, lower memory utilisation and a mechanism to combine multiple input files per input split to improve performance.

Running Spark on Yarn with Zeppelin and WASB storage

It’s increasingly said that “notebooks” are the new spreadsheets in terms of being a tool for exploratory data analysis.  The Apache Zeppelin project (https://zeppelin.incubator.apache.org/) is certainly one such promising notebook-style interface for performing advanced interactive querying of Hadoop data (whether via Hive, Spark, Shell or other scripting languages).

At the time of writing Zeppelin is not completely mature; for example, it lacks the ability to connect to a Kerberos-secured Hive service, which may make things difficult in an enterprise environment.  Nonetheless it's worth a look as a new type of workflow for data scientists and other data analysts.

Setup

Binaries for Zeppelin (version 0.5.5) can be obtained here:
https://zeppelin.incubator.apache.org/download.html

Startup steps

Normal startup steps for Zeppelin are:

cd ~/zeppelin-0.5.5-incubating-bin-all
bin/zeppelin-daemon.sh restart

If, however, WASB (Windows Azure Blob Storage) is used as the default Hadoop filesystem, the following export should be run before starting the Zeppelin daemon as per the above step.  Adding these JARs to the classpath tells Zeppelin how to read from the WASB filesystem:

export CLASSPATH=.:/usr/hdp/current/hadoop-client/lib/azure-storage-2.2.0.jar:/usr/hdp/2.3.0.1-3/hadoop/hadoop-azure-2.7.1.2.3.0.1-3.jar

Config

The following config should be set to run Spark as a Yarn job on an existing HDP 2.3 Hadoop cluster (via the Interpreter tab in Zeppelin):

master=yarn-client
spark.driver.extraJavaOptions=-Dhdp.version=2.3.0.1-3
spark.yarn.am.extraJavaOptions=-Dhdp.version=2.3.0.1-3
spark.home=/usr/hdp/current/spark-client

zeppelin_spark_on_yarn
Spark on Yarn config for Zeppelin Spark interpreter

Results

Running the following code in a Zeppelin Notebook should succeed:

val sqlContext = new org.apache.spark.sql.SQLContext(sc)

zeppelin_start_spark_sql_context.png

And the Spark instance should be kicked off and remain running in the Yarn Resource Manager:

zeppelin_spark_on_yarn_rm.png

Troubleshooting

Zeppelin may have trouble reading from the WASB filesystem if the above classpath is not added prior to starting Zeppelin:

java.io.IOException: No FileSystem for scheme: wasb
at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2644)

zeppelin_wasb_error.png

Thanks to the Microsoft support team for assisting with finding the right JAR files to add to the Classpath!

Selecting maximum of two columns – timestamp and sequence in Hive

Sometimes it is useful to sort data by two columns and, for each record key, pick the row with the maximum of both columns considered in order (column A first, then column B).

An example is a table with a logical primary key and an additional timestamp field and a sequence number field.  This could be the case, for example if data is coming from a change-data capture ETL tool, where multiple changes (inserts, updates, deletes) may be present for a single record.  The timestamp could denote the batch date the ETL tool extracted the records, and within each batch there could also be a sequence number, where the highest sequence number in the highest timestamp denotes the latest version of the record.

E.g. the final record here (where f1 happens to be ‘z’) is the latest record, with a timestamp of 3 and a sequence of 2:

Hive test table with composite logical primary key and a timestamp and sequence field

Below, two options for achieving this in HiveQL are compared: one using two nested maximum aggregations and one using a single-pass aggregation of a named structure.

Creating a test table and data

create table if not exists test
(
pk1 string,
pk2 string,
pk3 string,
f1 string,
ts int,
sequence int
)
stored as orc;

insert into table test values ('a','a','a','x',1,1);
insert into table test values ('a','a','a','y',1,2);
insert into table test values ('a','a','a','y',1,3);
insert into table test values ('a','a','a','y',2,1);
insert into table test values ('a','a','a','y',3,1);
insert into table test values ('a','a','a','z',3,2);


Option 1 – runtime 37 seconds

set hive.execution.engine=tez;

select t1.pk1,t1.pk2,t1.pk3,t1.ts, max(t1.sequence) as maxseq
from
test t1
join
(select
pk1,pk2,pk3,max(ts) as maxts
from test
group by pk1, pk2, pk3) t2
on
t1.pk1=t2.pk1 and
t1.pk2=t2.pk2 and
t1.pk3=t2.pk3 and
t1.ts=t2.maxts
group by t1.pk1,t1.pk2,t1.pk3,t1.ts
;

 Hive DAG - Two nested maximum aggregations

Option 2 – runtime 11 seconds

set hive.execution.engine=tez;

select
pk1,
pk2,
pk3,
max(named_struct('ts',ts,'sequence', sequence)).ts,
max(named_struct('ts',ts,'sequence', sequence)).sequence
from test
group by pk1, pk2, pk3;

Hive DAG - Named struct maximum aggregation

Note: max(named_struct('ts',ts,'sequence', sequence)) compares the structs field by field in declaration order, so it finds the row with the highest timestamp and, within that timestamp, the highest sequence number; appending .ts (or .sequence) then extracts the corresponding field of that winning row.

The Result

Both options 1 and 2 produce the correct result:

Hive query - maximum sequence within a timestamp

If we look up these values in our original table we see that this corresponds to the latest record according to timestamp and then sequence.  In practice, we could use this result-set to look up (via a join) non-key or attribute fields in a larger table.
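For example, a hedged sketch of such a lookup (test_full here is a hypothetical wider table sharing the same logical key, not one of the tables created above):

select f.*
from test_full f
join
(select
pk1, pk2, pk3,
max(named_struct('ts', ts, 'sequence', sequence)).ts as maxts,
max(named_struct('ts', ts, 'sequence', sequence)).sequence as maxseq
from test
group by pk1, pk2, pk3) latest
on
f.pk1=latest.pk1 and
f.pk2=latest.pk2 and
f.pk3=latest.pk3 and
f.ts=latest.maxts and
f.sequence=latest.maxseq;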

The much shorter runtime and simpler Tez execution graph show that named structures (see here for the Hive documentation on named structures) help with the timestamp + sequence use case, and with any situation where we need the maximum of two columns for each logical primary key combination.  This is because the information is gathered in a single map-reduce pass over the dataset, with no need to operate on an intermediate dataset once the maximum of the first column has been found for each key.

Permission denied and org.apache.hadoop.util.DiskChecker$DiskErrorException errors after Kerberising Hadoop cluster

Background

Kerberizing a Hadoop cluster enables a properly authorised user to access the cluster without entering username / password details.  For example (after running a kinit command and starting the beeline JDBC client):

beeline>  !connect jdbc:hive2://hdplinux1.company.internal:10000/default;principal=hive/hdplinux1.company.internal@COMPANY.INTERNAL;

Connecting to jdbc:hive2://hdplinux1.company.internal:10000/default;principal=hive/hdplinux1.company.internal@COMPANY.INTERNAL;

Enter username for jdbc:hive2://hdplinux1.company.internal:10000/default;principal=hive/hdplinux1.company.internal@COMPANY.INTERNAL;: myusername

Enter password for jdbc:hive2://hdplinux1.company.internal:10000/default;principal=hive/hdplinux1.company.internal@COMPANY.INTERNAL;: ************
Connected to: Apache Hive (version 1.2.1.2.3.0.1-3)
Driver: Hive JDBC (version 1.2.1.2.3.0.1-3)
Transaction isolation: TRANSACTION_REPEATABLE_READ

Despite the successful login above, two errors occurred subsequently when running Hive queries.

First error (permission denied)

1: jdbc:hive2://hdplinux1.company.internal:10000/default> select a,b from c where a=1;

INFO  : Tez session hasn’t been created yet. Opening session
ERROR : Failed to execute tez graph.
org.apache.tez.dag.api.SessionNotRunning: TezSession has already shutdown. Application application_1441612826389_0022 failed 2 times due to AM Container for appattempt_1441612826389_0022_000002 exited with  exitCode: -1000
For more detailed output, check application tracking page:http://hdplinux1.company.internal:8088/cluster/app/application_1441612826389_0022Then, click on links to logs of each attempt.
Diagnostics: Application application_1441612826389_0022 initialization failed (exitCode=255) with output: main : command provided 0
main : run as user is hive
main : requested yarn user is hive
Can’t create directory /var/log/hadoop/yarn/local/usercache/hive/appcache/application_1441612826389_0022 – Permission denied
Did not create any app directories

Failing this attempt. Failing the application.
at org.apache.tez.client.TezClient.waitTillReady(TezClient.java:678)
at org.apache.hadoop.hive.ql.exec.tez.TezSessionState.open(TezSessionState.java:205)
at org.apache.hadoop.hive.ql.exec.tez.TezTask.updateSession(TezTask.java:239)
at org.apache.hadoop.hive.ql.exec.tez.TezTask.execute(TezTask.java:137)
at org.apache.hadoop.hive.ql.exec.Task.executeTask(Task.java:160)
at org.apache.hadoop.hive.ql.exec.TaskRunner.runSequential(TaskRunner.java:88)
at org.apache.hadoop.hive.ql.Driver.launchTask(Driver.java:1653)
at org.apache.hadoop.hive.ql.Driver.execute(Driver.java:1412)
at org.apache.hadoop.hive.ql.Driver.runInternal(Driver.java:1195)
at org.apache.hadoop.hive.ql.Driver.run(Driver.java:1059)
at org.apache.hadoop.hive.ql.Driver.run(Driver.java:1054)
at org.apache.hive.service.cli.operation.SQLOperation.runQuery(SQLOperation.java:154)
at org.apache.hive.service.cli.operation.SQLOperation.access$100(SQLOperation.java:71)
at org.apache.hive.service.cli.operation.SQLOperation$1$1.run(SQLOperation.java:206)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1657)
at org.apache.hive.service.cli.operation.SQLOperation$1.run(SQLOperation.java:218)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Error: Error while processing statement: FAILED: Execution Error, return code 1 from org.apache.hadoop.hive.ql.exec.tez.TezTask (state=08S01,code=1)

Workaround:

The above error was fixed by renaming the local application cache directory on each datanode:

su -
mv /var/log/hadoop/yarn/local/usercache/hive/appcache appcache.bak

A new appcache directory will get created when the Hive query is re-run.  Note – this step was performed on a development cluster with no other users, so it may have more harmful effects on a busy cluster!

Second error (org.apache.hadoop.util.DiskChecker$DiskErrorException)

After the above workaround was applied a new error appeared when executing the Hive query:

1: jdbc:hive2://hdplinux1.company.internal:10000/default> select a,b from c where a=1;

INFO  : Tez session hasn’t been created yet. Opening session
ERROR : Failed to execute tez graph.
org.apache.tez.dag.api.SessionNotRunning: TezSession has already shutdown. Application application_1441612826389_0036 failed 2 times due to AM Container for appattempt_1441612826389_0036_000002 exited with  exitCode: -1000
For more detailed output, check application tracking page:http://hdplinux1.company.internal:8088/cluster/app/application_1441612826389_0036Then, click on links to logs of each attempt.
Diagnostics: Application application_1441612826389_0036 initialization failed (exitCode=255) with output: main : command provided 0
main : run as user is hive
main : requested yarn user is hive
org.apache.hadoop.util.DiskChecker$DiskErrorException: Cannot create directory: /var/log/hadoop/yarn/local/usercache/hive/filecache/0/11603
at org.apache.hadoop.util.DiskChecker.checkDir(DiskChecker.java:105)
at org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer.download(ContainerLocalizer.java:199)
at org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer.localizeFiles(ContainerLocalizer.java:241)
at org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer.runLocalization(ContainerLocalizer.java:169)
at org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer.main(ContainerLocalizer.java:372)

Failing this attempt. Failing the application.
at org.apache.tez.client.TezClient.waitTillReady(TezClient.java:678)
at org.apache.hadoop.hive.ql.exec.tez.TezSessionState.open(TezSessionState.java:205)
at org.apache.hadoop.hive.ql.exec.tez.TezTask.updateSession(TezTask.java:239)
at org.apache.hadoop.hive.ql.exec.tez.TezTask.execute(TezTask.java:137)
at org.apache.hadoop.hive.ql.exec.Task.executeTask(Task.java:160)
at org.apache.hadoop.hive.ql.exec.TaskRunner.runSequential(TaskRunner.java:88)
at org.apache.hadoop.hive.ql.Driver.launchTask(Driver.java:1653)
at org.apache.hadoop.hive.ql.Driver.execute(Driver.java:1412)
at org.apache.hadoop.hive.ql.Driver.runInternal(Driver.java:1195)
at org.apache.hadoop.hive.ql.Driver.run(Driver.java:1059)
at org.apache.hadoop.hive.ql.Driver.run(Driver.java:1054)
at org.apache.hive.service.cli.operation.SQLOperation.runQuery(SQLOperation.java:154)
at org.apache.hive.service.cli.operation.SQLOperation.access$100(SQLOperation.java:71)
at org.apache.hive.service.cli.operation.SQLOperation$1$1.run(SQLOperation.java:206)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1657)
at org.apache.hive.service.cli.operation.SQLOperation$1.run(SQLOperation.java:218)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Error: Error while processing statement: FAILED: Execution Error, return code 1 from org.apache.hadoop.hive.ql.exec.tez.TezTask (state=08S01,code=1)

Workaround:

This second error was fixed by renaming the local filecache directory on each datanode:

su -
mv /var/log/hadoop/yarn/local/usercache/hive/filecache filecache.bak

A new filecache directory will get created when re-running the hive query. Again note that the impact on a running cluster is uncertain as other jobs may be actively using files in these local cache directories.

After performing the above steps, the original hive query now reruns successfully.

Further info

Vinod Vavilapalli and Omkar Vinit Joshi from Hortonworks describe the role of the appcache and filecache directories in their post on Resource Localization in Yarn.  They describe how resources are localised to Yarn application nodes for performance reasons, and how downloaded files end up in different local directories depending on their categorisation.  For example, application-specific files are found in <local-dir>/usercache/<userid>/appcache/<app-id>/ and private (user-specific) files are found in <local-dir>/usercache/<userid>/filecache.

Using Azure Blob storage with Hadoop

Cloud providers such as Amazon (AWS) and Microsoft (Azure) provide fault-tolerant distributed storage services which can literally “take the load” off a Hadoop installation, providing some compelling advantages.  In the case of Microsoft Azure’s blob storage, however, this is not without its pitfalls.

With the release of Hadoop version 2.7.0 (and vendor-packaged versions such as Hortonworks HDP 2.3), Windows Azure Blob storage can be used as either default or secondary storage for Hadoop instead of HDFS.  See Alexei Khalyako's description of how to configure both of these options here.
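As an indicative example only (the container, storage account and key below are placeholders), making blob storage the default filesystem comes down to pointing core-site.xml at a wasb:// URI and supplying the storage account key:

fs.defaultFS=wasb://<container>@<storage-account>.blob.core.windows.net
fs.azure.account.key.<storage-account>.blob.core.windows.net=<storage-account-access-key>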

These are some benefits of using Blob storage instead of HDFS (see also Microsoft’s opinion):

  • Separate storage from compute – data can exist with 1, 10, 1000 or even zero Hadoop nodes, meaning compute resources can be scaled freely as there is no reliance on having HDFS services running on a minimum number of nodes with locally attached disks in order to simply access the data.  Equally there is no need to “rebalance” data when nodes are added or removed from the cluster.
  • Relatively low cost of storage – at the time of writing this is roughly $414.10 AUD per terabyte per year (see here).  This is quite impressive given the cost of hardware / electricity required to maintain this data even on commodity equipment.
  • Automatic replication – Azure storage can be replicated long-distance with the click of a mouse (by choosing geographically replicated storage).  This means the Azure cloud layer will take care of replicating data to another of its datacentres for disaster recovery purposes.  With the cheapest form of replication (local replication) data is stored 3 times in a single datacentre, ensuring high availability in the event of a single disk failure.  With geo-replicated storage the data is also copied another 3 times into a secondary datacentre (although it would require a declaration from Microsoft to make the secondary copy accessible after a disaster – something completely out of an Azure customer's control).

Questions you might ask about using Azure Blob storage instead of HDFS:

Q: Wouldn't it be really slow having an on-premises Hadoop cluster connecting to storage accessible only over the internet via TCP/IP?
A: Yes.  For this reason that architecture is not recommended.  Instead, Azure storage is worth considering only for clusters which are stood up in Azure itself (as VMs or as the platform-as-a-service offering, HDInsight).  It is assumed the TCP/IP connectivity within an Azure datacentre – i.e. from machines to storage – is fast enough that network bottlenecks are not a concern.

Q: Hadoop is all about moving compute closer to storage – doesn’t using blob go against this principle?
A: Microsoft’s answer to this seems to be that the backbone connecting Azure compute VMs to blob storage should provide performance similar to what would be seen with locally attached disks (see Cindy Gross’s useful blog post here).  In other words, the architecture is like having a big disk attached to many nodes simultaneously and directly.

Q: If a blob storage account behaves like a hard disk, won’t it get overloaded with multiple nodes connecting to it simultaneously?
A: No – luckily, per Microsoft, blob storage accounts do not behave like single disks.  The performance characteristics of blob storage allow many Hadoop nodes to read / write data simultaneously.  Microsoft claims a target of 60 MB/s throughput per blob (see here), which might correspond to a single chunked file of a Hive table, as well as 15 Gbps overall read performance for a single storage account – i.e. allowing approximately 31 nodes to each read from a blob account simultaneously at 60 MB/s.

Pitfalls of blob storage instead of HDFS?

A very significant pitfall of using blob storage with Hadoop (despite the above advantages) is that, as the Hadoop documentation notes:

“File owner and group are persisted, but the permissions model is not enforced.” (https://hadoop.apache.org/docs/stable/hadoop-azure/index.html)

This presents enormous challenges at the enterprise level in providing access to multiple users or even self-service access to unstructured or semi-structured data in a Hadoop-based data lake.  Given the frequent need to protect sensitive data within an organisation (e.g. customer, employee, financial data) it seems a severe limitation that the Hadoop interface to the blob storage APIs has not been augmented with the ability to enforce the file and folder permissions which it so dutifully records!

An example of the problem can be seen here by comparing native HDFS storage behaviour with blob storage behaviour (both when acting as the default hadoop filesystem):

Using HDFS – Authorisations working correctly:

[azureuser@hdplinux4 tmp]$ id
uid=500(azureuser) gid=500(azureuser) groups=500(azureuser)
[azureuser@hdplinux4 tmp]$ hdfs dfs -ls /tmp/testonly/
Found 1 items
-rw-------   1 hdfs hdfs         13 2015-08-26 00:57 /tmp/testonly/test.txt
[azureuser@hdplinux4 tmp]$ pwd
/tmp
[azureuser@hdplinux4 tmp]$ hdfs dfs -copyToLocal /tmp/testonly/test.txt
copyToLocal: Permission denied: user=azureuser, access=READ, inode="/tmp/testonly/test.txt":hdfs:hdfs:-rw-------

Using Blob – authorisations are completely ignored (the permissions visible via hdfs dfs -ls, which say that only user hdfs should be allowed to read test.txt, are ignored when user azureuser copies the file):

azureuser@hdplinuxblob:/tmp> id
uid=1002(azureuser) gid=100(users) groups=100(users),16(dialout),33(video)
azureuser@hdplinuxblob:/tmp> hdfs dfs -ls /tmp/testonly/test.txt
-rw-------   1 hdfs hdfs         13 2015-08-26 01:00 /tmp/testonly/test.txt
azureuser@hdplinuxblob:/tmp> hdfs dfs -copyToLocal /tmp/testonly/test.txt
azureuser@hdplinuxblob:/tmp> ls -la test.txt
-rw-r--r-- 1 azureuser users 13 Aug 26 01:09 test.txt

Only time will tell whether Microsoft will rectify this severe gap in the current Azure Blob storage integration with Hadoop.  There is some indication that they intend to close the gap with the Azure Data Lake service due for eventual release, which promises compatibility with many flavours of Hadoop (e.g. Hortonworks and Cloudera) as well as integration with Active Directory to allow files and folders to be secured.  The challenge still remains, however, of providing a security mechanism which is compatible with the wider Hadoop ecosystem, and this gives pause when choosing blob over HDFS when it should otherwise be an easy decision.

Managing Yarn memory with multiple Hive users

Out of the box (e.g. a standard Hortonworks HDP 2.2 install), Hive does not come configured optimally to manage multiple users running queries simultaneously.  This means it is possible for a single Hive query to use up all available Yarn memory, preventing other users from running queries at the same time.

This high memory consumption can be observed via the resource manager HTTP management screen – e.g. http://<resourcemanagerIP>:8088/cluster

Almost all yarn memory used

Also in Ambari…

Yarn used memory at 100%

Minimum queue memory per user

To guarantee the ability for more users to run Hive queries simultaneously (assuming the capacity scheduler is used with the default queue configuration), we can make a simple config change via Ambari:

Ambari Yarn config for capacity scheduler

Change from:

yarn.scheduler.capacity.root.default.user-limit-factor=1

To:

yarn.scheduler.capacity.root.default.user-limit-factor=0.33

This means each Hive user will now receive at most a third (or close to it) of Yarn memory resources.

Only a third of yarn memory used
Yarn used memory at 39%

This provides a better experience for multi-user interactive querying of Hive – for example, by allowing 2-3 users to use the cluster simultaneously.

Another option

There is, however, one potential disadvantage to the above: cluster memory may be wasted (by not being allocated) if the job queue contains only a single user's jobs.  A related parameter change can alleviate this, namely setting:

yarn.scheduler.capacity.root.default.minimum-user-limit-percent=33

The "minimum user limit percent" means that each user is guaranteed a certain percentage of the Yarn job queue's memory if there is a mix of different users' jobs waiting in the queue.  In other words, 3 users will each get 33% of the queue memory for execution if their jobs are all waiting in the queue at the same time.  If, however, there is only one user with jobs waiting in the queue, his / her jobs will execute and consume all available memory in the queue.  For User A this means a better use of memory overall, but possibly at the expense of User B, who might return from a lunch break and have to wait for one of User A's jobs to finish before getting the guaranteed percentage memory allocation.

Finding the balance

The above, along with other parameters, can be used to ensure users make the most of available cluster memory without effectively locking out other users by filling the queue with long-running jobs.

For example – these settings allow a single user to use up to 90% of available yarn queue memory, and up to 4 users (each with 25%) to eventually be running in the cluster (the 5th, 6th, 7th users will have to wait for other users’ jobs to be fully completed):

yarn.scheduler.capacity.root.default.user-limit-factor=0.90
yarn.scheduler.capacity.root.default.minimum-user-limit-percent=25

Sparkling-water – keeping the web UI alive

Spark is a great way to make use of the available RAM on a Hadoop cluster to run fast in-memory analysis and queries, and H2O is a great project for running distributed machine learning algorithms on data stored in Hadoop.  Together they form “Sparkling Water” (Spark + H2O, obviously!).

Easy to follow instructions for setting up Sparkling Water are available here: http://h2o-release.s3.amazonaws.com/sparkling-water/master/103/index.html

Running Spark on Yarn is a good way to utilise an existing Hadoop cluster; however, with the "live" method below it is difficult to keep the Sparkling Water H2O Flow interface running permanently.  Keeping it online lets a number of data scientists use the notebook-style interface to run machine learning tasks.  Luckily, invoking spark-submit with the water.SparklingWaterDriver class ensures the web UI remains online even after the shell session which kicked it off exits (see the persistent method below).

Live method – doesn’t stay online after exiting shell session

  1. Create a shell script:

    #!/bin/bash
    export SPARK_HOME='/usr/hdp/current/spark-client/'
    export HADOOP_CONF_DIR=/etc/hadoop/conf
    export MASTER="yarn-client"
    sparkling-water-1.3.5/bin/sparkling-shell --num-executors 3 --executor-memory 2g --master yarn-client

  2. In the sparkling-shell session which opens, start an H2O context:

    import org.apache.spark.h2o._
    val h2oContext = new H2OContext(sc).start()
    import h2oContext._

Persistent method – stays online even after exiting shell session

To start a “persistent” H2O cluster on Yarn (i.e. one which doesn’t exit immediately) simply run this command at the command line of a node where the spark client and sparkling water is installed:

nohup bin/spark-submit --class water.SparklingWaterDriver --master yarn-client --num-executors 3 --driver-memory 4g --executor-memory 2g --executor-cores 1 ../sparkling-water-0.2.1-58/assembly/build/libs/*.jar &

The H2O Flow web UI should be available on its usual port (http://XXX.XXX.XXX.XXX:54321) and should remain there even if the shell session which started it dies!

Thanks to the helpful and responsive folks at H2Oai for the above tip (originally answered here)!

Avoiding “add jar” to load custom SerDe when using Excel or Beeswax on Hortonworks Hadoop

Intro – analysing tweets with Hive

Following various tutorial examples online (e.g. Hortonworks – How To Refine and Visualize Sentiment Data and Microsoft – Analyze Twitter data using Hive in HDInsight) it is possible to expose semi-structured Twitter feed data in tabular format via Hadoop and Hive.  Once the data is available in Hive it is possible to visualise it via Excel and the Power BI toolset.

Once the data has been obtained from Twitter's streaming API (or a small sample from Twitter's API development console), a custom SerDe is required for Hive to read an external table based on the JSON files.  This SerDe takes the form of a JAR file which must be made available whenever a Hive query is executed, usually via this command:

add jar json-serde-1.3-jar-with-dependencies.jar;

select * from vw_tweets […]

PS – the JAR in question is Roberto Congiu's JSON SerDe (binaries available here; the CDH5 version appears to be compatible with Hortonworks).
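As a rough illustration only (the column names and HDFS location here are hypothetical, not the exact tweet schema used above), an external table using this SerDe is declared along these lines:

create external table default.tweets_raw
(
id bigint,
created_at string,
text string,
`user` struct<screen_name:string, followers_count:int>
)
ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
LOCATION '/tmp/tweets/';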

The Problem?

In client tools such as the Hive CLI and Beeswax, it's necessary to execute the "add jar" command before querying any table based on the custom SerDe; otherwise, errors such as this occur:

hive> select * from vw_tweets_hashtagsPerDay limit 2;

FAILED: RuntimeException MetaException(message:java.lang.ClassNotFoundException Class org.openx.data.jsonserde.JsonSerDe not found)

Similarly, errors such as this occur when using the Hortonworks Hive ODBC driver in Excel 2013 with Power Query, where it is not possible to manually load the JAR as part of the ODBC connection query:

DataSource.Error: ODBC: ERROR [HY000] [Hortonworks][HiveODBC] (35) Error from Hive: error code: ‘40000’ error message: ‘Error while compiling statement: FAILED: RuntimeException MetaException(message:java.lang.ClassNotFoundException Class org.openx.data.jsonserde.JsonSerDe not found)’.
Details:
Message=ERROR [HY000] [Hortonworks][HiveODBC] (35) Error from Hive: error code: ‘40000’ error message: ‘Error while compiling statement: FAILED: RuntimeException MetaException(message:java.lang.ClassNotFoundException Class org.openx.data.jsonserde.JsonSerDe not found)’.
ErrorCode=-2146232009

The solution

It is possible to fix the above issues by loading the SerDe JAR file to a location on the local filesystem of the Hadoop node running the Hiveserver2, e.g.:

/home/hive/json-serde-1.3-jar-with-dependencies.jar

The following changes must then be made to the cluster:

  1. Add this line to the end of hive-env.sh:
    export HIVE_AUX_JARS_PATH=${HIVE_AUX_JARS_PATH}:/home/hive/json-serde-1.3-jar-with-dependencies.jar

    hive-env

  2. Add this config to the custom hive-site.xml file:

    hive.aux.jars.path = file:///home/hive/json-serde-1.3-jar-with-dependencies.jar
    hive-aux-jars-path

  3. Restart Hive components.

Finally – confirm ODBC is working by querying your Hive table / view in Excel or Beeswax!  It should work without first running the “add jar” command, because the custom SerDe responsible for reading the JSON files is now loaded automatically when a session is opened.

Visualising Solar Generation Data in a Custom Histogram using D3.js

Using the “brush” feature of the D3 Javascript library again proves handy for creating an interactive, animated histogram.  This type of visualisation helps to analyse and explore the distribution of time-series data.

For this demo, home solar PV generation data has been obtained from United Energy's Energy Easy portal in CSV format.  For convenience in dealing with the raw data, which comes in half-hourly intervals, it has been loaded into a Pentaho data warehouse instance (more details in a later post, perhaps!) and converted to day-by-day figures.

D3 - Interactive Histogram - Preparing Solar PV generation data with Saiku
Preparing Solar PV generation data with Saiku

Analysis can help explore whether the solar panels are getting less efficient over time, or even determine what a "good" day of production looks like in summer vs winter (by looking at the relative frequency of each in the histogram).

Features:

  • Drag and scroll date region which affects histogram above:
    D3 - Interactive Histogram - Selectable Date Range
  • Changeable histogram buckets:
    D3 - Interactive Histogram - Adjustable Buckets
  • Snap-shotting of one selected date range for visual comparison with another (e.g. summer vs winter comparison):
    D3 - Interactive Histogram - Summer vs Winter Comparison

Key techniques – how it’s achieved:

  1. Data is embedded into the HTML page:

    <pre id="csvdata">
    Day,Quantity
    2013-01-01,9.288
    2013-01-02,10.494
    2013-01-03,7.376

    2014-12-29,3.832
    2014-12-30,7.752
    2014-12-31,7.212
    </pre>

    It’s hidden visually using the CSS display:none directive…

    #csvdata {
    display: none;
    }

    …and then read into a Javascript variable for display and computation via D3.

    var raw = d3.select("#csvdata").text();
    var dataset_copy = d3.csv.parse(raw);

  2. Maximum value is found in the selected region of the dataset:

    max = d3.max(dataset, function (d) {    return +d.Quantity;});

    …histogram is created based on maximum value:

    // Generate a histogram using uniformly-spaced bins.
    datasetHist = d3.layout.histogram()
        .frequency(false)
        .range([0, max])
        .bins(buckets)
        (dataset.map(function (d) {
            return d.Quantity;
        }));

  3. An HTML form input slider is used for selection of number of buckets:

    <label for="nBuckets" style="width: 240px; text-align: right; font-size: 10px; font-family: sans-serif;">Number of buckets: <span id="nBuckets-value">10</span>
    </label>
    <input type="range" id="nBuckets">

    Value of slider is passed into Javascript (to variable “buckets”) via this code:

    d3.select("#nBuckets").on("input", function () {
        buckets = +this.value;
        d3.select("#nBuckets-value").text(+this.value);
        clearSnapshot();
        render();
    });
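  4. The drag-and-scroll date selection uses D3's brush component.  A minimal sketch (assuming D3 v3, an existing time scale x for the lower date-range chart, a g selection called context and the render() function above – names not taken from the original code):

    var brush = d3.svg.brush()
        .x(x) // time scale of the lower date-range chart
        .on("brush", function () {
            // Restrict the dataset to the brushed date range, then redraw the histogram.
            var range = brush.empty() ? x.domain() : brush.extent();
            dataset = dataset_copy.filter(function (d) {
                var day = new Date(d.Day);
                return day >= range[0] && day <= range[1];
            });
            render();
        });

    context.append("g")
        .attr("class", "x brush")
        .call(brush)
        .selectAll("rect")
        .attr("height", 40);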

Demo / code:

Try out the demo and check out the code here:
http://jsfiddle.net/hzmj24d1/

Re-pivoting data using OpenRefine’s Columns to Rows feature

Problem

A frequent challenge when transforming time-series data (e.g. weather or meter data) is converting columns representing multiple times of the day into a single column: in OLAP terms, what might generically be described as an "Hour of the Day" or "Interval" dimension.

Example input schema:

Date, 00:00, 00:30, 01:00, 01:30, ..., 23:00, 23:30
01-01-2015,0,0,0,1,...,1,0
02-01-2015,0,0,1,2,...,2,1

Example output schema:

Date, Interval, Value
01-01-2015,00:00,0
01-01-2015,00:30,0
...
02-01-2015,23:00,2
02-01-2015,23:30,1

Luckily, this is quite straightforward with a tool designed for dealing with messy tabular (e.g. CSV) data, such as the free, web-based OpenRefine.

Solution

  1. Open the input CSV file in OpenRefine:

    OpenRefine Demo - Columns to Rows - Before

  2. Right-click one of the interval / time-based columns and select Transpose > Transpose cells across columns into rows:

    OpenRefine Demo - Columns to Rows - Transpose

  3. Select all the interval columns and select a new “one column” called Interval to transpose these into.  Also select the option to prepend “:” to each new cell (this is useful in a subsequent step):

    OpenRefine Demo - Columns to Rows - Columns to Rows

    Measure columns have now become a single column called “Interval” and the number of rows has multiplied greatly:

    OpenRefine Demo - Columns to Rows - Semicolon character

  4. Now, use Edit column > Split into several columns to ensure the Interval name and Value figures are separate:

    OpenRefine Demo - Columns to Rows - Split into columns

    Select “:” as the separator and click OK:
    OpenRefine Demo - Columns to Rows - Split into columns 2

    …and Voilà — after renaming the new column, the data is now in a pivoted form with only a single “Value” column containing the measure for the day and interval:

    OpenRefine Demo - Columns to Rows - After