Workaround for com.microsoft.aad.adal4j.AuthenticationException when accessing SQL Server table via Active Directory in Databricks

Symptom

When using Databricks 5.5 LTS to read a table from SQL Server using Azure Active Directory (AAD) authentication, the following exception occurs:

Error : java.lang.NoClassDefFoundError: com/microsoft/aad/adal4j/AuthenticationException
 at com.microsoft.sqlserver.jdbc.SQLServerConnection.getFedAuthToken(SQLServerConnection.java:3609)
 at com.microsoft.sqlserver.jdbc.SQLServerConnection.onFedAuthInfo(SQLServerConnection.java:3580)
 at com.microsoft.sqlserver.jdbc.SQLServerConnection.processFedAuthInfo(SQLServerConnection.java:3548)
 at com.microsoft.sqlserver.jdbc.TDSTokenHandler.onFedAuthInfo(tdsparser.java:261)
 at com.microsoft.sqlserver.jdbc.TDSParser.parse(tdsparser.java:103)
 at com.microsoft.sqlserver.jdbc.SQLServerConnection.sendLogon(SQLServerConnection.java:4290)
 at com.microsoft.sqlserver.jdbc.SQLServerConnection.logon(SQLServerConnection.java:3157)
 at com.microsoft.sqlserver.jdbc.SQLServerConnection.access$100(SQLServerConnection.java:82)
 at com.microsoft.sqlserver.jdbc.SQLServerConnection$LogonCommand.doExecute(SQLServerConnection.java:3121)
 at com.microsoft.sqlserver.jdbc.TDSCommand.execute(IOBuffer.java:7151)
 at ...io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138)
 at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: com.microsoft.aad.adal4j.AuthenticationException
 at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
 at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
 at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)
 at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
 ... 59 more

Cause

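As the ClassNotFoundException in the stack trace shows, the SQL Server JDBC driver on the cluster cannot load the adal4j library which it needs for Azure Active Directory authentication. See the following GitHub issue for details:
https://github.com/Azure/azure-sqldb-spark/issues/28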

Workaround steps

1 – Create a new init script which will remove legacy MSSQL drivers from the cluster. The following commands create a new directory on DBFS and then create a shell script with a single command to remove mssql driver JARs:

%sh
mkdir /dbfs/myInitScriptDir
echo "rm /databricks/jars/*mssql*" > /dbfs/myInitScriptDir/myInitScript.sh

2 – Add the cluster init script in Clusters > Cluster > Edit > Advanced Options:

3 – Add the following two libraries to the cluster via Clusters > Cluster > Libraries > Install new:

com.microsoft.azure:adal4j:1.6.5
com.microsoft.sqlserver:mssql-jdbc:8.4.1.jre8

4 – Restart the cluster.

5 – Run the following R code in a notebook cell to validate that AAD authentication is working. NB – Replace the placeholder values (server, database, user, password and table) with your own:

library(sparklyr)

connection <- spark_connect(method = "databricks")

x <- spark_read_jdbc(
connection,
name = 'mytemptable',
options = list(
url = 'jdbc:sqlserver://myazuresqlserver.database.windows.net:1433;database=myazuresqldatabase;authentication=ActiveDirectoryPassword;',
driver = 'com.microsoft.sqlserver.jdbc.SQLServerDriver',
user = 'myuser@example.com',
password = 'XXXXXXXX',
hostNameInCertificate = '*.database.windows.net',
dbtable = 'dbo.mytable'
)
)

x

After running the command “x” above, the table data should be displayed.
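
For readers working in a Python notebook rather than R, the same connection options can be sketched as follows. This is a minimal illustration, not part of the original workaround: the helper name build_jdbc_options is made up for this example, and the commented-out read assumes the `spark` session that Databricks notebooks provide.

```python
def build_jdbc_options(server, database, user, password, table):
    """Return the JDBC options used in the R example above as a dict.

    build_jdbc_options is a hypothetical helper written for this sketch.
    """
    url = (
        "jdbc:sqlserver://{0}:1433;database={1};"
        "authentication=ActiveDirectoryPassword;"
    ).format(server, database)
    return {
        "url": url,
        "driver": "com.microsoft.sqlserver.jdbc.SQLServerDriver",
        "user": user,
        "password": password,
        "hostNameInCertificate": "*.database.windows.net",
        "dbtable": table,
    }


# Placeholder values, as in the R example
options = build_jdbc_options(
    "myazuresqlserver.database.windows.net",
    "myazuresqldatabase",
    "myuser@example.com",
    "XXXXXXXX",
    "dbo.mytable",
)
print(options["url"])

# In a Databricks Python notebook (where `spark` is predefined) the read
# would then look like this:
# df = spark.read.format("jdbc").options(**options).load()
# display(df)
```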

Conclusion

The Azure SQL Database table can now be read and the AuthenticationException no longer occurs:

Successful table query after spark_read_jdbc()

Credit: This workaround is based on thereverand’s very helpful post on GitHub here.

Automatically tagging, captioning and categorising locally stored images using the Azure Computer Vision API

It’s easy in the digital age to amass tens of thousands of photos (or more!). Categorising these can be a challenging task, let alone searching through them to find that one happy snap from 10 years ago.

Significant advances in machine learning over the past decade have made it possible to automatically tag and categorise photos without user input (assuming a machine learning model has been pre-trained). Many social media and photo sharing platforms make this functionality available for their users – for example, Flickr’s “Magic View”.  But what if a user has a large number of files stored locally on a hard disk?

The problem

  • 49,049 uncategorised digital images stored locally
  • Manual categorisation
  • No easy way to search (e.g. “red dress”, “mountain”, “cat on a mat”)

The solution

Steps

  1. Obtain a Microsoft Azure cloud subscription (note – Azure is not free, however free trials may be available):
    https://azure.microsoft.com/en-us/free/
  2. Create a Cognitive Services account from the Azure portal and take note of one of the “Keys” (keys are interchangeable):
    https://portal.azure.com/
    computer_vision-azure_keys
  3. Log in to your Linux machine and ensure you have python3 installed:
    user@host.site:~> which python3
    /usr/bin/python3
  4. Ensure you have these python libraries installed:
    sudo su -
    pip3 install python-xmp-toolkit
    pip3 install argparse
    pip3 install Pillow
    exit
  5. Obtain a copy of the image-auto-tag script:
    git clone https://github.com/niftimusmaximus/image-auto-tag
  6. Automatically tag, caption and categorise an image (e.g. image.jpg):
    cd image-auto-tag
    ./image-auto-tag.py --key XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX \
      --captionConfidenceLevel 0.50 --tagConfidenceLevel 0.5 \
      --categoryConfidenceLevel 0.5 image.jpg

    Note – replace the value of --key with one of the keys obtained from the Azure portal above.

    The script will then process the image:

    INFO: [image.jpg] Reading input file 1/1
    INFO: [image.jpg] Temporarily resized to 800x600
    INFO: [image.jpg] Uploading to Azure Computer Vision API
                      (length: 107330 bytes)
    INFO: [image.jpg] Response received from Azure Computer Vision API
                      (length: 1026 bytes)
    INFO: [image.jpg] Appended caption 'a river with a mountain in the
                      background' (confidence: 0.67 >= 0.50)
    INFO: [image.jpg] Appended category 'outdoor_water'
                      (confidence: 0.84 >= 0.50)
    INFO: [image.jpg] Appending tag 'nature' (confidence: 1.00 >= 0.50)
    INFO: [image.jpg] Appending tag 'outdoor' (confidence: 1.00 >= 0.50)
    INFO: [image.jpg] Appending tag 'water' (confidence: 0.99 >= 0.50)
    INFO: [image.jpg] Appending tag 'mountain' (confidence: 0.94 >= 0.50)
    INFO: [image.jpg] Appending tag 'river' (confidence: 0.90 >= 0.50)
    INFO: [image.jpg] Appending tag 'rock' (confidence: 0.89 >= 0.50)
    INFO: [image.jpg] Appending tag 'valley' (confidence: 0.75 >= 0.50)
    INFO: [image.jpg] Appending tag 'lake' (confidence: 0.60 >= 0.50)
    INFO: [image.jpg] Appending tag 'waterfall' (confidence: 0.60 >= 0.50)
    INFO: [image.jpg] Finished writing XMP data to file 1/1
  7. Verify the results:
    Auto tagging

    API has applied “tags” which can be searched

    Auto captioning

    API has captioned this image as “a beach with palm trees”

    Auto categorisation

    "plant_tree" hierarchical category has been applied
    API has applied the category “plant_tree” to this image

    Note – please see here for the API’s 86 category taxonomy

Script features

  • Writes to standard XMP metadata tags within JPG images which can be read by image management applications such as XnView MP and digiKam
  • Sends downsized images to Azure to improve performance

    Example – only send an image of width 640 pixels (the original image will retain its dimensions):

    ./image-auto-tag.py --key XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX \
      --azureResizeWidth 640 image.jpg
  • Allows customisation of the confidence thresholds for tags, categories and captions. This is useful because whilst good, the API is not perfect!

    Example – only caption image if caption confidence score from API is 0.5 or above:

    ./image-auto-tag.py --key XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX \
      --captionConfidenceLevel 0.5 image.jpg
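
The thresholding idea behind these confidence options can be sketched in a few lines of Python. This is only an illustration and not the code of the image-auto-tag script: the function name filter_by_confidence and the shape of the response dictionary are assumptions made for this example, and the "boat" tag is invented to show an item being rejected.

```python
def filter_by_confidence(items, threshold, score_key="confidence"):
    """Keep only the items whose score is at or above the threshold."""
    return [item for item in items if item[score_key] >= threshold]


# Hypothetical, simplified API response. Most values are taken from the
# log output shown earlier; the "boat" tag is made up for illustration.
response = {
    "tags": [
        {"name": "nature", "confidence": 1.00},
        {"name": "lake", "confidence": 0.60},
        {"name": "boat", "confidence": 0.31},
    ],
    "captions": [
        {"text": "a river with a mountain in the background", "confidence": 0.67},
    ],
}

# Equivalent in spirit to --tagConfidenceLevel 0.5
kept_tags = [t["name"] for t in filter_by_confidence(response["tags"], 0.5)]

# A stricter caption threshold rejects the 0.67 caption
kept_captions = filter_by_confidence(response["captions"], 0.7)

print(kept_tags)
print(kept_captions)
```

Raising a threshold trades fewer wrong tags for fewer tags overall, which is why separate levels for tags, categories and captions are handy.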

Useful queries for the Hive metastore

Hive metastore tables

The Hive metastore stores metadata about objects within Hive.  Usually this metastore sits within a relational database such as MySQL.

Sometimes it’s useful to query the Hive metastore directly to find out what databases, tables and views exist in Hive and how they’re defined. For example, say we want to expose a report to users about how many Hive tables are currently in a Hadoop cluster.  Or perhaps we want to run a script which performs some bulk operation on all tables in a particular Hive database.

Luckily, it’s easy to query the metastore using a tool such as MySQL Workbench using appropriate connectors – e.g. MySQL JDBC drivers.

Here’s a rough database diagram showing how the Hive metastore hangs together:

Hive metastore database diagram (ERD, from HDP 2.3)

Handy metastore SQL queries

Show all Hive databases

SELECT * FROM hive.DBS;

Output:

DB_ID | DESC | DB_LOCATION_URI | NAME | OWNER_NAME | OWNER_TYPE
1 | Default Hive database | hdfs://sandbox.hortonworks.com:8020/apps/hive/warehouse | default | public | ROLE
6 | NULL | hdfs://sandbox.hortonworks.com:8020/apps/hive/warehouse/xademo.db | xademo | hive | USER

List tables in a given database

SELECT t.* FROM hive.TBLS t
 JOIN hive.DBS d
 ON t.DB_ID = d.DB_ID
 WHERE d.NAME = 'default';

Output:

TBL_ID CREATE_TIME DB_ID LAST_ACCESS_TIME OWNER RETENTION SD_ID TBL_NAME TBL_TYPE VIEW_EXPANDED_TEXT VIEW_ORIGINAL_TEXT LINK_TARGET_ID
1 1439988377 1 0 hue 0 1 sample_07 MANAGED_TABLE NULL NULL NULL
2 1439988387 1 0 hue 0 2 sample_08 MANAGED_TABLE NULL NULL NULL

Show the storage location of a given table

SELECT s.* FROM hive.TBLS t
JOIN hive.DBS d
ON t.DB_ID = d.DB_ID
JOIN hive.SDS s
ON t.SD_ID = s.SD_ID
WHERE TBL_NAME = 'sample_07'
AND d.NAME='default';

Output:

SD_ID CD_ID INPUT_FORMAT IS_COMPRESSED IS_STOREDASSUBDIRECTORIES LOCATION NUM_BUCKETS OUTPUT_FORMAT SERDE_ID
1 1 org.apache.hadoop.mapred.TextInputFormat 0 0 hdfs://sandbox.hortonworks.com:8020/apps/hive/warehouse/sample_07 -1 org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat 1

Find out how a given view has been defined

SELECT t.* FROM hive.TBLS t
JOIN hive.DBS d
ON t.DB_ID = d.DB_ID
WHERE TBL_NAME = 'vw_sample_07'
AND d.NAME='default';

Output:

TBL_ID | CREATE_TIME | DB_ID | LAST_ACCESS_TIME | OWNER | RETENTION | SD_ID | TBL_NAME | TBL_TYPE | VIEW_EXPANDED_TEXT | VIEW_ORIGINAL_TEXT | LINK_TARGET_ID
31 | 1471788438 | 1 | 0 | hue | 0 | 31 | vw_sample_07 | VIRTUAL_VIEW | select count(*) from `default`.`sample_07` | select count(*) from default.sample_07 | NULL

Get column names, types and comments of a given table

SELECT c.* FROM hive.TBLS t
 JOIN hive.DBS d
 ON t.DB_ID = d.DB_ID
 JOIN hive.SDS s
 ON t.SD_ID = s.SD_ID
 JOIN hive.COLUMNS_V2 c
 ON s.CD_ID = c.CD_ID
 WHERE TBL_NAME = 'sample_07'
 AND d.NAME='default'
 ORDER BY INTEGER_IDX;

Output:

CD_ID COMMENT COLUMN_NAME TYPE_NAME INTEGER_IDX
1 NULL code string 0
1 NULL description string 1
1 NULL total_emp int 2
1 NULL salary int 3
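
To illustrate the bulk-operation use case mentioned earlier, here is a small self-contained Python sketch. It is a stand-in only: an in-memory SQLite database with cut-down DBS and TBLS tables plays the role of the MySQL metastore, the function name tables_in_database is made up, and the generated ANALYZE statements are just one example of a per-table operation.

```python
import sqlite3

# In-memory SQLite stand-in for the metastore (the real one is e.g. MySQL).
# Only the columns needed for the join are created.
conn = sqlite3.connect(":memory:")
cur = conn.cursor()
cur.execute("CREATE TABLE DBS (DB_ID INTEGER, NAME TEXT)")
cur.execute(
    "CREATE TABLE TBLS (TBL_ID INTEGER, DB_ID INTEGER, TBL_NAME TEXT, TBL_TYPE TEXT)"
)
cur.executemany("INSERT INTO DBS VALUES (?, ?)", [(1, "default"), (6, "xademo")])
cur.executemany(
    "INSERT INTO TBLS VALUES (?, ?, ?, ?)",
    [
        (1, 1, "sample_07", "MANAGED_TABLE"),
        (2, 1, "sample_08", "MANAGED_TABLE"),
        (31, 1, "vw_sample_07", "VIRTUAL_VIEW"),
    ],
)


def tables_in_database(cursor, db_name):
    """Return the names of all non-view tables in the given Hive database."""
    cursor.execute(
        "SELECT t.TBL_NAME FROM TBLS t JOIN DBS d ON t.DB_ID = d.DB_ID "
        "WHERE d.NAME = ? AND t.TBL_TYPE <> 'VIRTUAL_VIEW' "
        "ORDER BY t.TBL_NAME",
        (db_name,),
    )
    return [row[0] for row in cursor.fetchall()]


tables = tables_in_database(cur, "default")

# Generate one Hive statement per table found
statements = [
    "ANALYZE TABLE default.{0} COMPUTE STATISTICS".format(name) for name in tables
]
for statement in statements:
    print(statement)
```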

Conclusion

It’s possible to query metadata from the Hive metastore, which can be handy for understanding what data is available in a Hive instance.  It’s also possible to edit this information, although this is usually inadvisable: the schema of the metastore may change between Hive versions, and the results of modifying Hive internals could be unexpected at best and catastrophic at worst.

Python + JDBC = Dynamic Hive scripting

Working with Hive can be challenging without the benefit of a procedural language (such as T-SQL or PL/SQL) to do things with data in between Hive statements or to run dynamic Hive statements in bulk.  For example – we may want to do a rowcount of all tables in one of our Hive databases, without having to code a fixed list of tables in our Hive code.

We can compile Java code to run queries against Hive dynamically, but this can be overkill for smaller requirements. Scripting can be a better way to code more complex Hive tasks.

Python to the rescue

Python code can be used to execute dynamic Hive statements, which is useful in these sorts of scenarios:

  1. Code branching depending on results of a Hive query – e.g. ensuring Hive query A successfully executes before running Hive query B
  2. Using looked-up data to form a filter in a Hive query – e.g. selecting data from the latest partition in a Hive table without needing to perform a nested query to get the latest partition

There are several Python libraries available for connecting to Hive such as PyHive and Pyhs2 (the latter unfortunately now unmaintained).  Some major Hadoop vendors however decline to support this type of direct integration explicitly.  They do, however, still strongly support ODBC and JDBC interfaces.

Python + JDBC

We can, in fact, connect Python to sources including Hive and also the Hive metastore using the JayDeBeApi package. This is effectively a wrapper allowing Java DB drivers to be used in Python scripts.

Example:

  1. The shell code (setting environment variables)

    First, we need to set the classpath to include the library directories where Hive JDBC drivers can be found, and also set the Python path to where the JayDeBeApi module can be found:

    export CLASSPATH=$CLASSPATH:`hadoop classpath`:/usr/hdp/current/hadoop-client/*:/usr/hdp/current/hive-client/*:/usr/hdp/current/hadoop-client/client/*
    export PYTHONPATH=$PYTHONPATH:/home/me/jaydebeapi/build/
  2. The Python code

    Connections can be established to Hive and Hive metastore using jaydebeapi’s connect() method:

    import jaydebeapi

    # Connect to Hive (Kerberos principal in the URL, so user and password are empty)
    conn_hive = jaydebeapi.connect('org.apache.hive.jdbc.HiveDriver',
            ['jdbc:hive2://myhiveserver.mydomain.local/default;principal=hive/_HOST@MYDOMAIN.LOCAL;',
            '', ''], '/path/to/hive-jdbc.jar',)
    curs_hive = conn_hive.cursor()

    # Connect to Hive metastore
    conn_mysql = jaydebeapi.connect('com.mysql.jdbc.Driver',
            ['jdbc:mysql://metastoremysqlserver.mydomain.local:3306/hive',
             'mysql_username', 'mysql_password'],
            '/path/to/mysql-jdbc-connector.jar',)
    curs_mysql = conn_mysql.cursor()

    A metastore query can be run to retrieve the names of matching tables in the default database into a list (mysql_query_output):

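    # Query the metastore to get matching tables in the default database.
    # A triple-quoted string is needed for a query spanning several lines.
    mysql_query_string = """select t.TBL_NAME
    from TBLS t join DBS d
    on t.DB_ID = d.DB_ID
    where t.TBL_NAME like '%mytable%'
    and d.NAME='default'"""

    curs_mysql.execute(mysql_query_string)

    # Each element of the result is a row, with the table name at index 0
    mysql_query_output = curs_mysql.fetchall()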

    Hive queries can be dynamically generated and executed to retrieve row counts for all the tables found above:

    # Perform a row count of each Hive table found and output it to the screen
    for i in mysql_query_output:

        # Build the statement for this table (i[0] is the table name)
        hive_query_string = ("select '" + i[0] + "' as tabname, "
                             "count(*) as cnt "
                             "from default." + i[0])

        curs_hive.execute(hive_query_string)

        hive_query_output = curs_hive.fetchall()

        print(hive_query_output)

    Done! Output from Hive queries now should be printed to the screen.
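
The first scenario listed earlier (only running Hive query B if Hive query A succeeds) can be sketched with any DB-API cursor. This is a minimal illustration under stated assumptions: the function name run_b_if_a_succeeds is made up, and an in-memory SQLite cursor stands in for a jaydebeapi Hive cursor so that the example is runnable anywhere.

```python
import sqlite3


def run_b_if_a_succeeds(cursor, query_a, query_b):
    """Run query_a, then run query_b only if query_a raised no error.

    Returns the rows of query_b, or None if query_a failed.
    """
    try:
        cursor.execute(query_a)
    except Exception as exc:  # the exact error type depends on the driver
        print("Query A failed, skipping query B: {0}".format(exc))
        return None
    cursor.execute(query_b)
    return cursor.fetchall()


# SQLite stand-in for the Hive cursor used in the example above
curs = sqlite3.connect(":memory:").cursor()

ok_result = run_b_if_a_succeeds(
    curs, "create table t (x integer)", "select count(*) from t"
)
failed_result = run_b_if_a_succeeds(
    curs, "select * from missing_table", "select count(*) from t"
)
print(ok_result, failed_result)
```

With jaydebeapi, curs_hive from the example above could be passed in place of the SQLite cursor.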

Pros and cons of the solution

Pros:

  • Provides a nice way of scripting whilst using Hive data
  • Basic error handling is possible through Python after each HQL is executed
  • Connection to a wide variety of JDBC compatible databases

Cons:

  • Relies on client memory to store query results – not suitable for big data volumes (Spark would be a better solution on this front, as all processing is done in parallel and not brought back to the client unless absolutely necessary)
  • Minimal control / visibility over Hive query whilst running

Useful date formulas for Hive

Hive comes with some handy functions for transforming dates.  These can be helpful when working with date dimension tables and performing time-based comparisons and aggregations.

e.g. Convert a native Hive date to a formatted date string:

date_format(myDate,'dd-MM-yyyy')

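Return the week number (within the year) of a particular date – i.e. the first week of the year is 1 and the last week is usually 52 (weeks are numbered ISO 8601 style, so dates around new year can fall in week 53 or in week 1 of the following year):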

weekofyear(myDate)

Other less obvious examples

Current month’s name (e.g. January, February, etc):

date_format(myDate, 'MMMM')

First date of the current quarter:

cast(trunc(add_months(myDate,-pmod(month(myDate)-1,3)),'MM') as date)

Last date of the current quarter:

cast(date_add(trunc(add_months(myDate,3-pmod(month(myDate)-1,3)),'MM'),-1) as date)

Day number of the current quarter (e.g. April 2nd is day 2 of the second quarter, December 9th is day 70 of the fourth quarter, etc):

datediff(myDate,cast(trunc(add_months(myDate,-pmod(month(myDate)-1,3)),'MM') as date))+1
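
The quarter formulas above can be cross-checked with a few lines of Python that mirror the same arithmetic (stepping back pmod(month - 1, 3) months to reach the start of the quarter). This is a sketch for checking the logic only; the function names are made up for this example.

```python
from datetime import date, timedelta


def quarter_start(d):
    """First date of the quarter containing d."""
    months_back = (d.month - 1) % 3  # same as pmod(month(myDate)-1, 3)
    return date(d.year, d.month - months_back, 1)


def quarter_end(d):
    """Last date of the quarter containing d."""
    start = quarter_start(d)
    if start.month == 10:
        next_start = date(start.year + 1, 1, 1)
    else:
        next_start = date(start.year, start.month + 3, 1)
    return next_start - timedelta(days=1)


def day_of_quarter(d):
    """Day number within the quarter, starting at 1."""
    return (d - quarter_start(d)).days + 1


print(quarter_start(date(2021, 5, 17)))   # 2021-04-01
print(quarter_end(date(2021, 12, 9)))     # 2021-12-31
print(day_of_quarter(date(2021, 4, 2)))   # 2
print(day_of_quarter(date(2021, 12, 9)))  # 70
```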

Finding a sequence of events in Hive using analytic functions

Hadoop Hive features several useful functions for efficiently performing analytics over ordered sets of rows; these are known as the windowing and analytics functions.  For example, windowed aggregates can be used to produce rolling averages or cumulative sums over a window of time (e.g. hourly averages for some metric over the preceding rolling 24 hours), while the lead and lag functions give access to values in following and preceding rows.

Another useful feature is the ability to introduce ordinality or sequence into SQL data where there is no strict or predictable sequence field.  This can help us search for chains of events over time.

Example

Imagine a supermarket tracking customer purchases. The following query can be used to find customers who have purchased an Apple in one transaction and in their immediate next transaction, an Orange (assuming transaction_id is a field which increases over time, such as a receipt number):

select x.customer_id from
(
    SELECT customer_id,
    product_name,
    row_number() OVER (
        PARTITION BY customer_id ORDER BY transaction_id
    ) as rn
    FROM default.tbl_product_sales
) x
join
(
    SELECT customer_id,
    product_name,
    row_number() OVER (
        PARTITION BY customer_id ORDER BY transaction_id
    ) as rn
    FROM default.tbl_product_sales
) y
on x.customer_id=y.customer_id
where y.rn=x.rn+1
and x.product_name='Apple'
and y.product_name='Orange';

So, even though transaction_id may not be contiguous or predictable (i.e. a single customer might have consecutive transactions with numbers 1234, 1255, 1257, etc.), we can still use PARTITION BY and ORDER BY to assign a new row number field which is contiguous – whereby each customer will have their transactions grouped and ordered.  In the above query, rn and rn+1 represent any transaction for a given customer and the transaction immediately afterwards.
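
The same "group, order, then compare neighbouring rows" logic can be illustrated in plain Python. This is only a sketch of the idea on a handful of made-up rows, not a replacement for the Hive query: the function name and sample data are invented, and unlike the SQL above it returns each matching customer once rather than once per matching pair.

```python
from itertools import groupby


def customers_with_sequence(sales, first, second):
    """Return customers who bought `first` and then `second` in their
    immediately following transaction.

    sales: list of (customer_id, transaction_id, product_name) tuples.
    """
    # Equivalent of PARTITION BY customer_id ORDER BY transaction_id
    ordered = sorted(sales, key=lambda s: (s[0], s[1]))
    found = []
    for customer_id, group in groupby(ordered, key=lambda s: s[0]):
        products = [s[2] for s in group]
        # Pair each transaction (rn) with the next one (rn + 1)
        for current, following in zip(products, products[1:]):
            if current == first and following == second:
                found.append(customer_id)
                break
    return found


# Made-up sample data with non-contiguous transaction ids
sales = [
    (1, 1234, "Apple"),
    (1, 1255, "Orange"),
    (2, 1240, "Apple"),
    (2, 1250, "Banana"),
    (2, 1260, "Orange"),
]
matches = customers_with_sequence(sales, "Apple", "Orange")
print(matches)  # [1]
```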

 

Creating a custom Hive input format and record reader to read fixed format flat files

Apache Hive is great for enabling SQL-like queryability over flat files.  This is trivial in the case of tabular formatted files such as CSV files where we can set custom row and field delimiters out-of-the-box (e.g. Hive’s inbuilt CSV serde).  Even more complex files can be read and converted to a desired row and column format using Hive’s regexp_extract() function, so long as we can operate on a single row at a time.

The problem

What if the rows we want in Hive aren’t rows in the input files?  That is, we need to read the file as a whole and decode it to produce the output we want to see in Hive.

An example is the Australian Bureau of Meteorology’s ASCII Grid format.  These files are fixed file formats with a header section which effectively describes how to read the file.  In the data section, each data row corresponds to a row of latitude on a map (with starting coordinates identified in the header) and similarly each column defines a line of longitude.  Read as a whole, the file contains a grid of readings of particular weather observations – e.g. rainfall for a given time period:

Example decoding of ASCII grid format file

To read this data in Hive it might be possible to define a table which hard-codes column values to their corresponding longitude, but this leaves the problem of reading similarly formatted files with a different geographical granularity or different starting position on the globe.  Similarly, we may struggle at the Hive query language layer to determine the appropriate latitude of a given data row in the file.  This is because the header contains the required metadata as to which row in the file corresponds to a certain latitude.

To make the grid data easier to consume in Hive we may wish to transform files into a format such as this:

Geospatial data – desired tabular format

This means we can query a file by filtering on particular lat / long combinations.  One way to transform the file into this format is via creating custom Hive InputFormat and Record Reader Java classes that we can use at query time.

InputFormat / RecordReader vs SerDe

A key distinction when creating custom classes to use with Hive is the following:

  • InputFormat and RecordReader – takes files as input – generates rows
  • SerDe – takes rows as input – generates columns

Here, ASCII grid formatted files cannot be de-serialised row-by-row because there is important information in the header about what each row contains (i.e. the latitude of a given row is dependent on its position in the file and also information in the header), so a SerDe is likely not the best option.  Instead, an InputFormat Java class can be written to convert the input ASCII grid formatted files into the desired tabular format above, making it possible to query the data in Hive by arbitrary lat / long coordinates.

Creating a custom InputFormat

An InputFormat compatible with Hive can be built from classes which implement and extend standard mapred library classes:

  1. CustomTextInputFormat.java –  extends FixedLengthInputFormat.  Returns a CustomTextRecordReader which plugs in to Hive at runtime behind the scenes.
  2. CustomTextRecordReader.java – implements mapred RecordReader<LongWritable, BytesWritable>.  Reads and decompresses (if required) files off the Hadoop filesystem. Calls ReadASCIIGridFile to do the actual transformation.
  3. ReadASCIIGridFile.java – contains a static class which does the transformation from input (a byte array – ASCII grid formatted) to output (a byte array – Hive row format)

Notes:

  • Code uses the mapred rather than mapreduce API of Hadoop, as Hive only supports mapred style InputFormat objects
  • CustomTextInputFormat.java sets all files to be non-splittable.  This is done because a file must be read in full with its header to properly convert to the target format.
  • The CustomTextRecordReader copes with compressed input files by utilising the org.apache.hadoop.io.compress.CompressionCodec class to decompress any input files which are compressed.  This is advantageous for ASCII grid formatted files which lend themselves well to compression (e.g. via GZIP) before being uploaded to HDFS.
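
The kind of whole-file transformation described above can be sketched in Python to show why the header is needed for every row. This is not the Java code of ReadASCIIGridFile and makes several assumptions: the header keys (ncols, nrows, xllcorner, yllcorner, cellsize, nodata_value) follow the common ASCII grid layout, the first data row is taken to be the northernmost, and the function name grid_to_rows is made up.

```python
def grid_to_rows(text):
    """Convert ASCII grid text into (lat1, long1, lat2, long2, value) tuples.

    Assumed layout: header lines of the form "<key> <number>" followed by
    data rows, the first data row being the northernmost.
    """
    header = {}
    data = []
    for line in text.strip().splitlines():
        parts = line.split()
        if not parts:
            continue
        if parts[0][0].isalpha():
            # Header line, e.g. "cellsize 0.5"
            header[parts[0].lower()] = float(parts[1])
        else:
            data.append([float(v) for v in parts])

    nrows = int(header["nrows"])
    cell = header["cellsize"]
    west = header["xllcorner"]
    south = header["yllcorner"]

    rows = []
    for r, values in enumerate(data):
        # Latitude depends on the row's position AND on the header
        lat1 = south + (nrows - 1 - r) * cell
        for c, value in enumerate(values):
            long1 = west + c * cell
            rows.append((lat1, long1, lat1 + cell, long1 + cell, value))
    return rows


# Tiny made-up 2 x 2 grid
sample = """ncols 2
nrows 2
xllcorner 112.0
yllcorner -44.0
cellsize 0.5
nodata_value -999
1 2
3 -999
"""
grid_rows = grid_to_rows(sample)
for row in grid_rows:
    print("\t".join(str(v) for v in row))
```

Because each output row needs the cell size and the starting corner, the file cannot be split or decoded line by line, which is the reason the input format marks files as non-splittable.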

Compiling the custom InputFormat

Copy the text of the above files and save it to a folder on the filesystem.  Then compile the classes and package them into a JAR file (note – a Java JDK must be installed to run the compilation, preferably the same version as the target Hadoop system is running):

cd </path/where/java/files/are/stored>
mkdir build
javac -d ./build/ -cp "/usr/hdp/2.3.0.0-2557/hadoop/lib/*:/usr/hdp/2.3.0.0-2557/hadoop-mapreduce/*" *.java
cd build
jar cvf CustomTextInputFormat.jar *
cp CustomTextInputFormat.jar /tmp/

Note – the classpath in the javac command assumes that necessary Hadoop library jar files are installed  in certain locations.  The locations mentioned are for the Hortonworks HDP 2.3.0 Sandbox VM, but can be changed to suit other versions / distributions.

Using the custom InputFormat with Hive

Run hive at the command line.

In the Hive session, add the newly created InputFormat JAR:

hive> add jar /tmp/CustomTextInputFormat.jar;
Added [/tmp/CustomTextInputFormat.jar] to class path
Added resources: [/tmp/CustomTextInputFormat.jar]

Create an external table on an HDFS directory containing ASCII grid formatted files:

hive> create external table default.test_ascii
(
lat1 float, long1 float, lat2 float, long2 float, measurement float
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\t'
STORED AS INPUTFORMAT 'com.analyticsanvil.custominputformat.CustomTextInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat'
LOCATION '/tmp/'; 
OK
Time taken: 13.192 seconds

Run a test query on the external table:

hive> select * from default.test_ascii limit 10;
OK
-43.975002      112.025 -43.925007      112.075 -999.0
-43.975002      112.075 -43.925007      112.12499       -999.0
-43.975002      112.12499       -43.925007      112.17499       -999.0
-43.975002      112.17499       -43.925007      112.22498       -999.0
-43.975002      112.22498       -43.925007      112.27498       -999.0
-43.975002      112.27498       -43.925007      112.324974      -999.0
-43.975002      112.324974      -43.925007      112.37497       -999.0
-43.975002      112.37497       -43.925007      112.424965      -999.0
-43.975002      112.424965      -43.925007      112.47496       -999.0
-43.975002      112.47496       -43.925007      112.524956      -999.0
Time taken: 3.94 seconds, Fetched: 10 row(s)

The query returns data in the desired tabular format.

Conclusion

By creating an InputFormat Java class which reads and transforms fixed format files at the time of Hive querying, we can effectively convert data into forms which are better suited to analytical purposes.

Similarly, writing a custom input format allows almost any data to be read by invoking a custom Java class on each mapper, translating the input into tabular format for use in Hive.  In the above example ReadASCIIGridFile.java converts an ASCII grid formatted file to a long list of lat / long combinations and readings, but equally, a new Java class could be coded to read more exotic forms of input data – e.g. MP3 audio files, JPEGs or other types of binary file.  So long as a developer knows how to code the translation in Java, input formats can be converted into Hive queryable tabular data on the fly and in parallel (e.g. MP3 files with timestamps and spectral / frequency analysis).

It’s worth noting that further improvements can be made to the Java code above.  Examples include more efficient parsing of input files (currently using regular expressions), better error checking, lower memory utilisation and a mechanism to combine input files for a given input split to improve performance.

Running Spark on Yarn with Zeppelin and WASB storage

It’s increasingly said that “notebooks” are the new spreadsheets in terms of being a tool for exploratory data analysis.  The Apache Zeppelin project (https://zeppelin.incubator.apache.org/) is certainly one such promising notebook-style interface for performing advanced interactive querying of Hadoop data (whether via Hive, Spark, Shell or other scripting languages).

At the time of writing Zeppelin is not completely mature, for example – it lacks the ability to connect to a Kerberos secured Hive service, which may make things difficult in an enterprise environment.  Nonetheless it’s worth the look as a new type of workflow for data scientists and other data analysts.

Setup

Binaries for Zeppelin can be obtained here (version 0.5.5):
https://zeppelin.incubator.apache.org/download.html

Startup steps

Normal startup steps for Zeppelin are:

cd ~/zeppelin-0.5.5-incubating-bin-all
bin/zeppelin-daemon.sh restart

If WASB (Windows Azure Blob Storage) is used as the default Hadoop filesystem, however, the following step should be run before starting the Zeppelin daemon.  Adding these JARs to the classpath tells Zeppelin how to read from the WASB filesystem:

export CLASSPATH=.:/usr/hdp/current/hadoop-client/lib/azure-storage-2.2.0.jar:/usr/hdp/2.3.0.1-3/hadoop/hadoop-azure-2.7.1.2.3.0.1-3.jar

Config

The following config should be set for running Spark as a Yarn job on an existing HDP 2.3 Hadoop cluster (via the Interpreter tab in Zeppelin):

master=yarn-client
spark.driver.extraJavaOptions=-Dhdp.version=2.3.0.1-3
spark.yarn.am.extraJavaOptions=-Dhdp.version=2.3.0.1-3
spark.home=/usr/hdp/current/spark-client

Spark on Yarn config for Zeppelin Spark interpreter

Results

Running the following code in a Zeppelin Notebook should succeed:

val sqlContext = new org.apache.spark.sql.SQLContext(sc)

zeppelin_start_spark_sql_context.png

And the Spark instance should be kicked off and remain running in the Yarn Resource Manager:

zeppelin_spark_on_yarn_rm.png

Troubleshooting

Zeppelin may have trouble reading from the WASB filesystem if the above classpath is not added prior to starting Zeppelin:

java.io.IOException: No FileSystem for scheme: wasb
at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2644)

zeppelin_wasb_error.png

Thanks to the Microsoft support team for assisting with finding the right JAR files to add to the Classpath!

Selecting maximum of two columns – timestamp and sequence in Hive

Sometimes it is useful to sort data by two columns and, for each record, get the row with the maximum of column A and then, within that, the maximum of column B.

An example is a table with a logical primary key and an additional timestamp field and a sequence number field.  This could be the case, for example if data is coming from a change-data capture ETL tool, where multiple changes (inserts, updates, deletes) may be present for a single record.  The timestamp could denote the batch date the ETL tool extracted the records, and within each batch there could also be a sequence number, where the highest sequence number in the highest timestamp denotes the latest version of the record.

E.g. the final record here (where f1 happens to be ‘z’) is the latest record, with a timestamp of 3 and a sequence of 2:

Hive test table with composite logical primary key and a timestamp and sequence field

Two options for achieving this in HiveQL are compared below: one using two nested maximum aggregations, and one using a single pass aggregation of a named structure.

Creating a test table and data

create table if not exists test
(
pk1 string,
pk2 string,
pk3 string,
f1 string,
ts int,
sequence int
)
stored as orc;

insert into table test values ('a','a','a','x',1,1);
insert into table test values ('a','a','a','y',1,2);
insert into table test values ('a','a','a','y',1,3);
insert into table test values ('a','a','a','y',2,1);
insert into table test values ('a','a','a','y',3,1);
insert into table test values ('a','a','a','z',3,2);

select max(mysortstruct(ts,ts,sequence, sequence)).f1 from test;

Option 1 – runtime 37 seconds

set hive.execution.engine=tez;

select t1.pk1,t1.pk2,t1.pk3,t1.ts, max(t1.sequence) as maxseq
from
test t1
join
(select
pk1,pk2,pk3,max(ts) as maxts
from test
group by pk1, pk2, pk3) t2
on
t1.pk1=t2.pk1 and
t1.pk2=t2.pk2 and
t1.pk3=t2.pk3 and
t1.ts=t2.maxts
group by t1.pk1,t1.pk2,t1.pk3,t1.ts
;

 Hive DAG - Two nested maximum aggregations

Option 2 – runtime 11 seconds

set hive.execution.engine=tez;

select
pk1,
pk2,
pk3,
max(named_struct('ts',ts,'sequence', sequence)).ts,
max(named_struct('ts',ts,'sequence', sequence)).sequence
from test
group by pk1, pk2, pk3;

Hive DAG - Named struct maximum aggregation

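Note – max(named_struct('ts',ts,'sequence', sequence)).ts basically tells Hive “get me the record with the latest sequence number within the latest timestamp” and then output the timestamp of that record.  This works because structs are compared field by field, in the order the fields are listed.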
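
The field-by-field comparison behind option 2 behaves like tuple comparison in Python, which makes the idea easy to try out. The sketch below is an analogy only, not Hive code: it takes the maximum (ts, sequence) pair per logical primary key in a single pass over the test rows created above, and the function name latest_versions is made up.

```python
def latest_versions(rows):
    """Return {(pk1, pk2, pk3): (ts, sequence)} holding the latest version
    of each key, found in a single pass.

    rows: iterable of (pk1, pk2, pk3, f1, ts, sequence) tuples.
    """
    latest = {}
    for pk1, pk2, pk3, f1, ts, sequence in rows:
        key = (pk1, pk2, pk3)
        candidate = (ts, sequence)
        # Tuples compare on ts first and on sequence only when ts is equal
        if key not in latest or candidate > latest[key]:
            latest[key] = candidate
    return latest


# Same data as the test table above
rows = [
    ("a", "a", "a", "x", 1, 1),
    ("a", "a", "a", "y", 1, 2),
    ("a", "a", "a", "y", 1, 3),
    ("a", "a", "a", "y", 2, 1),
    ("a", "a", "a", "y", 3, 1),
    ("a", "a", "a", "z", 3, 2),
]
result = latest_versions(rows)
print(result)  # {('a', 'a', 'a'): (3, 2)}
```

Note that taking the two maxima independently would give (3, 3) here, which does not correspond to any row; comparing the pair as a whole avoids that.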

The Result

Both option 1 and 2 produce the correct result —

Hive query - maximum sequence within a timestamp

If we look up these values in our original table we see that this corresponds to the latest record according to timestamp and then sequence.  In practice, we could use this result-set to look up (via a join) non-key or attribute fields in a larger table.

It can be seen from the much shorter runtime and simpler Tez execution graph above that named structures (see the Hive documentation on named structures) can help us with the timestamp + sequence use case, and with any situation where we need the maximum of two columns for each logical primary key combination.  This is because option 2 gets this information in a single pass over the dataset.  It does not need to operate on an intermediate dataset, once the maximum timestamp has been found for each record, to then get the maximum sequence.