Useful queries for the Hive metastore

Hive metastore tables

The Hive metastore stores metadata about objects within Hive.  Usually this metastore sits within a relational database such as MySQL.

Sometimes it’s useful to query the Hive metastore directly to find out what databases, tables and views exist in Hive and how they’re defined. For example, say we want to expose a report to users about how many Hive tables are currently in a Hadoop cluster.  Or perhaps we want to run a script which performs some bulk operation on all tables in a particular Hive database.

Luckily, it's easy to query the metastore using a tool such as MySQL Workbench with the appropriate connector (e.g. the MySQL JDBC driver).

Here’s a rough database diagram showing how the Hive metastore hangs together:

Hive metastore database diagram (ERD, from HDP 2.3)

Handy metastore SQL queries

Show all Hive databases

SELECT * FROM hive.DBS;

Output:

DB_ID | DESC                  | DB_LOCATION_URI                                                    | NAME    | OWNER_NAME | OWNER_TYPE
1     | Default Hive database | hdfs://sandbox.hortonworks.com:8020/apps/hive/warehouse           | default | public     | ROLE
6     | NULL                  | hdfs://sandbox.hortonworks.com:8020/apps/hive/warehouse/xademo.db | xademo  | hive       | USER

 List tables in a given database

SELECT t.* FROM hive.TBLS t
 JOIN hive.DBS d
 ON t.DB_ID = d.DB_ID
 WHERE d.NAME = 'default';

Output:

TBL_ID | CREATE_TIME | DB_ID | LAST_ACCESS_TIME | OWNER | RETENTION | SD_ID | TBL_NAME  | TBL_TYPE      | VIEW_EXPANDED_TEXT | VIEW_ORIGINAL_TEXT | LINK_TARGET_ID
1      | 1439988377  | 1     | 0                | hue   | 0         | 1     | sample_07 | MANAGED_TABLE | NULL               | NULL               | NULL
2      | 1439988387  | 1     | 0                | hue   | 0         | 2     | sample_08 | MANAGED_TABLE | NULL               | NULL               | NULL
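
A related sketch, with the usual caveat that the metastore schema can change between Hive versions: table-level properties (including statistics such as numRows and totalSize, where these have been gathered) are stored as key/value pairs in TABLE_PARAMS, so something like the following should list them per table:

SELECT t.TBL_NAME, p.PARAM_KEY, p.PARAM_VALUE
FROM hive.TBLS t
JOIN hive.DBS d
ON t.DB_ID = d.DB_ID
JOIN hive.TABLE_PARAMS p
ON p.TBL_ID = t.TBL_ID
WHERE d.NAME = 'default';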

Show the storage location of a given table

SELECT s.* FROM hive.TBLS t
JOIN hive.DBS d
ON t.DB_ID = d.DB_ID
JOIN hive.SDS s
ON t.SD_ID = s.SD_ID
WHERE TBL_NAME = 'sample_07'
AND d.NAME='default';

Output:

SD_ID | CD_ID | INPUT_FORMAT                             | IS_COMPRESSED | IS_STOREDASSUBDIRECTORIES | LOCATION                                                           | NUM_BUCKETS | OUTPUT_FORMAT                                               | SERDE_ID
1     | 1     | org.apache.hadoop.mapred.TextInputFormat | 0             | 0                         | hdfs://sandbox.hortonworks.com:8020/apps/hive/warehouse/sample_07 | -1          | org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat | 1
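
If the table is partitioned, a similar join through the PARTITIONS table should list the storage location of each partition. This is a sketch only, again assuming the metastore schema shown in the diagram above ('my_partitioned_table' is a placeholder name):

-- 'my_partitioned_table' is a placeholder; substitute a real partitioned table name
SELECT p.PART_NAME, ps.LOCATION
FROM hive.TBLS t
JOIN hive.DBS d
ON t.DB_ID = d.DB_ID
JOIN hive.PARTITIONS p
ON p.TBL_ID = t.TBL_ID
JOIN hive.SDS ps
ON p.SD_ID = ps.SD_ID
WHERE t.TBL_NAME = 'my_partitioned_table'
AND d.NAME = 'default';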

Find out how a given view has been defined

SELECT t.* FROM hive.TBLS t
JOIN hive.DBS d
ON t.DB_ID = d.DB_ID
WHERE TBL_NAME = 'vw_sample_07'
AND d.NAME='default';

Output:

TBL_ID | CREATE_TIME | DB_ID | LAST_ACCESS_TIME | OWNER | RETENTION | SD_ID | TBL_NAME     | TBL_TYPE     | VIEW_EXPANDED_TEXT                         | VIEW_ORIGINAL_TEXT                      | LINK_TARGET_ID
31     | 1471788438  | 1     | 0                | hue   | 0         | 31    | vw_sample_07 | VIRTUAL_VIEW | select count(*) from `default`.`sample_07` | select count(*) from default.sample_07 | NULL
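
A related sketch (same schema assumptions as above) to list all views in a database, rather than one specific view, by filtering on TBL_TYPE:

SELECT t.TBL_NAME
FROM hive.TBLS t
JOIN hive.DBS d
ON t.DB_ID = d.DB_ID
WHERE t.TBL_TYPE = 'VIRTUAL_VIEW'
AND d.NAME = 'default';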

Get column names, types and comments of a given table

SELECT c.* FROM hive.TBLS t
 JOIN hive.DBS d
 ON t.DB_ID = d.DB_ID
 JOIN hive.SDS s
 ON t.SD_ID = s.SD_ID
 JOIN hive.COLUMNS_V2 c
 ON s.CD_ID = c.CD_ID
 WHERE TBL_NAME = 'sample_07'
 AND d.NAME='default'
 ORDER by INTEGER_IDX;

Output:

CD_ID | COMMENT | COLUMN_NAME | TYPE_NAME | INTEGER_IDX
1     | NULL    | code        | string    | 0
1     | NULL    | description | string    | 1
1     | NULL    | total_emp   | int       | 2
1     | NULL    | salary      | int       | 3
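
Count tables and views per database

Tying back to the reporting example mentioned at the start, here is a rough sketch (same metastore schema assumptions as the queries above) which counts how many tables and views exist in each database:

SELECT d.NAME, COUNT(t.TBL_ID) AS num_objects
FROM hive.DBS d
LEFT JOIN hive.TBLS t
ON t.DB_ID = d.DB_ID
GROUP BY d.NAME
ORDER BY num_objects DESC;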

Conclusion

Querying metadata directly from the Hive metastore can be handy for understanding what data is available in a Hive instance.  It's also possible to edit this information, although doing so is usually inadvisable: the metastore schema may change between Hive versions, and the results of modifying Hive internals could be unexpected at best and catastrophic at worst.

Python + JDBC = Dynamic Hive scripting

Working with Hive can be challenging without the benefit of a procedural language (such as T-SQL or PL/SQL) to do things with data in between Hive statements or to run dynamic Hive statements in bulk.  For example, we may want a row count of every table in one of our Hive databases, without having to hard-code a fixed list of tables in our Hive code.

We could compile Java code to run queries against Hive dynamically, but this can be overkill for smaller requirements; scripting is often a better way to tackle these more involved Hive tasks.

Python to the rescue

Python code can be used to execute dynamic Hive statements, which is useful in these sorts of scenarios:

  1. Code branching depending on results of a Hive query – e.g. ensuring Hive query A successfully executes before running Hive query B
  2. Using looked-up data to form a filter in a Hive query – e.g. selecting data from the latest partition in a Hive table without needing to perform a nested query to get the latest partition

There are several Python libraries available for connecting to Hive, such as PyHive and Pyhs2 (the latter unfortunately now unmaintained).  Some major Hadoop vendors, however, decline to explicitly support this type of direct integration.  They do still strongly support ODBC and JDBC interfaces.

Python + JDBC

We can, in fact, connect Python to sources including Hive and the Hive metastore using the JayDeBeApi package. This is effectively a wrapper allowing Java JDBC drivers to be used in Python scripts.

Example:

  1. The shell code (setting environment variables)

    First, we need to set the classpath to include the library directories where the Hive JDBC drivers can be found, and also where the Python JayDeBeApi module can be found:

    export CLASSPATH=$CLASSPATH:`hadoop classpath`:/usr/hdp/current/hadoop-client/*:/usr/hdp/current/hive-client/*:/usr/hdp/current/hadoop-client/client/*
    export PYTHONPATH=$PYTHONPATH:/home/me/jaydebeapi/build/
  2. The Python code

    Connections can be established to Hive and Hive metastore using jaydebeapi’s connect() method:

    import jaydebeapi

    # Connect to Hive
    conn_hive = jaydebeapi.connect('org.apache.hive.jdbc.HiveDriver',
            ['jdbc:hive2://myhiveserver.mydomain.local/default;principal=hive/_HOST@MYDOMAIN.LOCAL;',
            '', ''], '/path/to/hive-jdbc.jar',)
    curs_hive = conn_hive.cursor()
    
    # Connect to Hive metastore
    conn_mysql = jaydebeapi.connect('com.mysql.jdbc.Driver',
            ['jdbc:mysql://metastoremysqlserver.mydomain.local:3306/hive',
             'mysql_username', 'mysql_password'],
            '/path/to/mysql-jdbc-connector.jar',)
    curs_mysql = conn_mysql.cursor()

    A metastore query can be run to retrieve the names of matching tables in the default database into an array (mysql_query_output):

    # Query the metastore to get matching tables in the default database
    mysql_query_string = """
    select t.TBL_NAME
    from TBLS t join DBS d
    on t.DB_ID = d.DB_ID
    where t.TBL_NAME like '%mytable%'
    and d.NAME='default'
    """
    
    curs_mysql.execute(mysql_query_string)
    
    mysql_query_output = curs_mysql.fetchall()

    Hive queries can be dynamically generated and executed to retrieve row counts for all the tables found above:

    # Perform a row count of each Hive table found and output it to the screen
    for i in mysql_query_output:

            # Build the row-count query for this table
            hive_query_string = ("select '" + i[0] + "' as tabname, "
                                 "count(*) as cnt "
                                 "from default." + i[0])

            curs_hive.execute(hive_query_string)

            hive_query_output = curs_hive.fetchall()

            print(hive_query_output)

    Done! Output from the Hive queries should now be printed to the screen.

Pros and cons of the solution

Pros:

  • Provides a nice way of scripting whilst using Hive data
  • Basic error handling is possible through Python after each HQL is executed
  • Connection to a wide variety of JDBC compatible databases

Cons:

  • Relies on client memory to store query results – not suitable for big data volumes (Spark would be a better solution on this front, as all processing is done in parallel and not brought back to the client unless absolutely necessary)
  • Minimal control / visibility over Hive query whilst running

Useful date formulas for Hive

Hive comes with some handy functions for transforming dates.  These can be helpful when working with date dimension tables and performing time-based comparisons and aggregations.

e.g. Convert a native Hive date to a formatted date string:

date_format(myDate,'dd-MM-yyyy')

Return the week number (within the year) of a particular date – i.e. the first week of the year is 1, the week of New Year's Eve is typically 52 or 53, etc:

weekofyear(myDate)

Other less obvious examples

Month name of a given date (e.g. January, February, etc):

date_format(myDate, 'MMMMM')

First date of the quarter containing a given date:

cast(trunc(add_months(myDate,-pmod(month(myDate)-1,3)),'MM') as date)

Last date of the quarter containing a given date:

cast(date_add(trunc(add_months(myDate,3-pmod(month(myDate)-1,3)),'MM'),-1) as date)

Day number within the quarter of a given date (e.g. April 2nd is day 2 of the second quarter, December 9th is day 70 of the fourth quarter, etc):

datediff(myDate,cast(trunc(add_months(myDate,-pmod(month(myDate)-1,3)),'MM') as date))+1
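
As a quick sanity check of the quarter formulas above, here is a sketch run against a literal date ('2016-05-15' is just an arbitrary example); the values in the comments are what I would expect, and are worth verifying on your own Hive version:

select
date_format('2016-05-15','dd-MM-yyyy'),  -- 15-05-2016
cast(trunc(add_months('2016-05-15',-pmod(month('2016-05-15')-1,3)),'MM') as date),  -- 2016-04-01 (first date of the quarter)
cast(date_add(trunc(add_months('2016-05-15',3-pmod(month('2016-05-15')-1,3)),'MM'),-1) as date),  -- 2016-06-30 (last date of the quarter)
datediff('2016-05-15',cast(trunc(add_months('2016-05-15',-pmod(month('2016-05-15')-1,3)),'MM') as date))+1  -- 45 (day number within the quarter)
;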

Finding a sequence of events in Hive using analytic functions

Hadoop Hive features several useful functions for efficiently performing analytics over ordered sets of rows — these are known as the windowing and analytics functions.  For example, lead and lag functions can be used to produce rolling averages or cumulative sums over a window of time (e.g. hourly averages for some metric over the preceding rolling 24 hours).

Another useful feature is the ability to introduce ordinality or sequence into SQL data where there is no strict or predictable sequence field.  This can help us search for chains of events over time.

Example

Imagine a supermarket tracking customer purchases. The following query can be used to find customers who have purchased an Apple in one transaction and in their immediate next transaction, an Orange (assuming transaction_id is a field which increases over time, such as a receipt number):

select x.customer_id from
(
    SELECT customer_id,
    product_name,
    row_number() OVER (
        PARTITION BY customer_id ORDER BY transaction_id
    ) as rn
    FROM default.tbl_product_sales
) x
join
(
    SELECT customer_id,
    product_name,
    row_number() OVER (
        PARTITION BY customer_id ORDER BY transaction_id
    ) as rn
    FROM default.tbl_product_sales
) y
on x.customer_id=y.customer_id
where y.rn=x.rn+1
and x.product_name='Apple'
and y.product_name='Orange';

So, even though transaction_id may not be contiguous or predictable (i.e. a single customer might have consecutive transactions with numbers 1234, 1255, 1257, etc.), we can still use PARTITION BY and ORDER BY to assign a new row number field which is contiguous, whereby each customer will have their transactions grouped and ordered.  In the above query, rn and rn+1 represent any transaction for a given customer and the transaction immediately afterwards.
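
As an aside (this is not part of the original example), the same result can often be obtained in a single pass using the lead function mentioned at the start of this section, which avoids the self-join – a sketch against the same hypothetical tbl_product_sales table:

select customer_id from
(
    SELECT customer_id,
    product_name,
    lead(product_name) OVER (
        PARTITION BY customer_id ORDER BY transaction_id
    ) as next_product
    FROM default.tbl_product_sales
) z
where z.product_name='Apple'
and z.next_product='Orange';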

 

Running Spark on Yarn with Zeppelin and WASB storage

It’s increasingly said that “notebooks” are the new spreadsheets in terms of being a tool for exploratory data analysis.  The Apache Zeppelin project (https://zeppelin.incubator.apache.org/) is certainly one such promising notebook-style interface for performing advanced interactive querying of Hadoop data (whether via Hive, Spark, Shell or other scripting languages).

At the time of writing Zeppelin is not completely mature – for example, it lacks the ability to connect to a Kerberos-secured Hive service, which may make things difficult in an enterprise environment.  Nonetheless it's worth a look as a new type of workflow for data scientists and other data analysts.

Setup

Binaries for Zeppelin (version 0.5.5) can be obtained here:
https://zeppelin.incubator.apache.org/download.html

Startup steps

Normal startup steps for Zeppelin are:

cd ~/zeppelin-0.5.5-incubating-bin-all
bin/zeppelin-daemon.sh restart

If, however, WASB (Windows Azure Blob Storage) is being used as the default Hadoop filesystem, the following should be run before starting the Zeppelin daemon as per the above step.  Adding these JARs to the classpath tells Zeppelin how to read from the WASB filesystem:

export CLASSPATH=.:/usr/hdp/current/hadoop-client/lib/azure-storage-2.2.0.jar:/usr/hdp/2.3.0.1-3/hadoop/hadoop-azure-2.7.1.2.3.0.1-3.jar

Config

The following config should be set for running Spark as a Yarn job on an existing HDP 2.3 Hadoop cluster (via the Interpreter tab in Zeppelin):

master=yarn-client
spark.driver.extraJavaOptions=-Dhdp.version=2.3.0.1-3
spark.yarn.am.extraJavaOptions=-Dhdp.version=2.3.0.1-3
spark.home=/usr/hdp/current/spark-client

Spark on Yarn config for Zeppelin Spark interpreter

Results

Running the following code in a Zeppelin Notebook should succeed:

val sqlContext = new org.apache.spark.sql.SQLContext(sc)

And the Spark instance should be kicked off and remain running in the Yarn Resource Manager:

Troubleshooting

Zeppelin may have trouble reading from the WASB filesystem if the above classpath is not added prior to starting Zeppelin:

java.io.IOException: No FileSystem for scheme: wasb
at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2644)

Thanks to the Microsoft support team for assisting with finding the right JAR files to add to the Classpath!

Selecting maximum of two columns – timestamp and sequence in Hive

Sometimes it is useful to sort data by two columns and, for each record key, get the row with the maximum of both columns (column A first, then column B).

An example is a table with a logical primary key and an additional timestamp field and a sequence number field.  This could be the case, for example if data is coming from a change-data capture ETL tool, where multiple changes (inserts, updates, deletes) may be present for a single record.  The timestamp could denote the batch date the ETL tool extracted the records, and within each batch there could also be a sequence number, where the highest sequence number in the highest timestamp denotes the latest version of the record.

E.g. the final record here (where f1 happens to be ‘z’) is the latest record, with a timestamp of 3 and a sequence of 2:

Hive test table with composite logical primary key and a timestamp and sequence field

Two options for achieving this in HiveQL are compared below – one using two nested maximum aggregations, and one using a single-pass aggregation of a named struct.

Creating a test table and data

create table if not exists test
(
pk1 string,
pk2 string,
pk3 string,
f1 string,
ts int,
sequence int
)
stored as orc;

insert into table test values ('a','a','a','x',1,1);
insert into table test values ('a','a','a','y',1,2);
insert into table test values ('a','a','a','y',1,3);
insert into table test values ('a','a','a','y',2,1);
insert into table test values ('a','a','a','y',3,1);
insert into table test values ('a','a','a','z',3,2);

-- Quick check of the test data: f1 of the latest record (expected value: 'z')
select max(named_struct('ts', ts, 'sequence', sequence, 'f1', f1)).f1 from test;

Option 1 – runtime 37 seconds

set hive.execution.engine=tez;

select t1.pk1,t1.pk2,t1.pk3,t1.ts, max(t1.sequence) as maxseq
from
test t1
join
(select
pk1,pk2,pk3,max(ts) as maxts
from test
group by pk1, pk2, pk3) t2
on
t1.pk1=t2.pk1 and
t1.pk2=t2.pk2 and
t1.pk3=t2.pk3 and
t1.ts=t2.maxts
group by t1.pk1,t1.pk2,t1.pk3,t1.ts
;

 Hive DAG - Two nested maximum aggregations

Option 2 – runtime 11 seconds

set hive.execution.engine=tez;

select
pk1,
pk2,
pk3,
max(named_struct('ts',ts,'sequence', sequence)).ts,
max(named_struct('ts',ts,'sequence', sequence)).sequence
from test
group by pk1, pk2, pk3;

Hive DAG - Named struct maximum aggregation

Note — max(named_struct('ts',ts,'sequence', sequence)).ts effectively tells Hive "find the record with the latest sequence number within the latest timestamp" and then outputs the timestamp of that record.

The Result

Both options 1 and 2 produce the correct result:

Hive query - maximum sequence within a timestamp

If we look up these values in our original table we see that this corresponds to the latest record according to timestamp and then sequence.  In practice, we could use this result-set to look up (via a join) non-key or attribute fields in a larger table.

The much shorter runtime and simpler Tez execution graph above show that named structs (see here for the Hive documentation on named structures) can help us with the timestamp + sequence use case, and with any situation where we need the maximum of two columns for each logical primary key combination.  This is because the answer is obtained in a single map-reduce pass over the dataset, with no need to first compute the maximum of the first column into an intermediate dataset and then scan again to resolve the second column.
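
As a sketch of that last point (reusing the small test table for brevity, though in practice the join would be against a wider table sharing the same keys), the single-pass result can be joined back to pick up the remaining attribute columns such as f1:

select t.*
from test t
join
(select
pk1, pk2, pk3,
max(named_struct('ts',ts,'sequence',sequence)).ts as maxts,
max(named_struct('ts',ts,'sequence',sequence)).sequence as maxseq
from test
group by pk1, pk2, pk3) latest
on
t.pk1=latest.pk1 and
t.pk2=latest.pk2 and
t.pk3=latest.pk3 and
t.ts=latest.maxts and
t.sequence=latest.maxseq;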

Permission denied and org.apache.hadoop.util.DiskChecker$DiskErrorException errors after Kerberising Hadoop cluster

Background

Kerberizing a Hadoop cluster enables a properly authorised user to access the cluster without entering username / password details.  For example (after running a kinit command and starting the beeline JDBC client):

beeline>  !connect jdbc:hive2://hdplinux1.company.internal:10000/default;principal=hive/hdplinux1.company.internal@COMPANY.INTERNAL;

Connecting to jdbc:hive2://hdplinux1.company.internal:10000/default;principal=hive/hdplinux1.company.internal@COMPANY.INTERNAL;

Enter username for jdbc:hive2://hdplinux1.company.internal:10000/default;principal=hive/hdplinux1.company.internal@COMPANY.INTERNAL;: myusername

Enter password for jdbc:hive2://hdplinux1.company.internal:10000/default;principal=hive/hdplinux1.company.internal@COMPANY.INTERNAL;: ************
Connected to: Apache Hive (version 1.2.1.2.3.0.1-3)
Driver: Hive JDBC (version 1.2.1.2.3.0.1-3)
Transaction isolation: TRANSACTION_REPEATABLE_READ

Despite the successful login above, two errors occurred subsequently when running Hive queries.

First error (permission denied)

1: jdbc:hive2://hdplinux1.company.internal:10000/default> select a,b from c where a=1;

INFO  : Tez session hasn’t been created yet. Opening session
ERROR : Failed to execute tez graph.
org.apache.tez.dag.api.SessionNotRunning: TezSession has already shutdown. Application application_1441612826389_0022 failed 2 times due to AM Container for appattempt_1441612826389_0022_000002 exited with  exitCode: -1000
For more detailed output, check application tracking page:http://hdplinux1.company.internal:8088/cluster/app/application_1441612826389_0022Then, click on links to logs of each attempt.
Diagnostics: Application application_1441612826389_0022 initialization failed (exitCode=255) with output: main : command provided 0
main : run as user is hive
main : requested yarn user is hive
Can’t create directory /var/log/hadoop/yarn/local/usercache/hive/appcache/application_1441612826389_0022 – Permission denied
Did not create any app directories

Failing this attempt. Failing the application.
at org.apache.tez.client.TezClient.waitTillReady(TezClient.java:678)
at org.apache.hadoop.hive.ql.exec.tez.TezSessionState.open(TezSessionState.java:205)
at org.apache.hadoop.hive.ql.exec.tez.TezTask.updateSession(TezTask.java:239)
at org.apache.hadoop.hive.ql.exec.tez.TezTask.execute(TezTask.java:137)
at org.apache.hadoop.hive.ql.exec.Task.executeTask(Task.java:160)
at org.apache.hadoop.hive.ql.exec.TaskRunner.runSequential(TaskRunner.java:88)
at org.apache.hadoop.hive.ql.Driver.launchTask(Driver.java:1653)
at org.apache.hadoop.hive.ql.Driver.execute(Driver.java:1412)
at org.apache.hadoop.hive.ql.Driver.runInternal(Driver.java:1195)
at org.apache.hadoop.hive.ql.Driver.run(Driver.java:1059)
at org.apache.hadoop.hive.ql.Driver.run(Driver.java:1054)
at org.apache.hive.service.cli.operation.SQLOperation.runQuery(SQLOperation.java:154)
at org.apache.hive.service.cli.operation.SQLOperation.access$100(SQLOperation.java:71)
at org.apache.hive.service.cli.operation.SQLOperation$1$1.run(SQLOperation.java:206)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1657)
at org.apache.hive.service.cli.operation.SQLOperation$1.run(SQLOperation.java:218)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Error: Error while processing statement: FAILED: Execution Error, return code 1 from org.apache.hadoop.hive.ql.exec.tez.TezTask (state=08S01,code=1)

Workaround:

The above error was fixed by renaming the local application cache directory on each datanode:

su -
mv /var/log/hadoop/yarn/local/usercache/hive/appcache appcache.bak

A new appcache directory will get created when re-running the Hive query.  Note – this step was performed in a development cluster with no other users, so it may have harmful effects in a busy running cluster!

Second error (org.apache.hadoop.util.DiskChecker$DiskErrorException)

After the above workaround was applied a new error appeared when executing the Hive query:

1: jdbc:hive2://hdplinux1.company.internal:10000/default> select a,b from c where a=1;

INFO  : Tez session hasn’t been created yet. Opening session
ERROR : Failed to execute tez graph.
org.apache.tez.dag.api.SessionNotRunning: TezSession has already shutdown. Application application_1441612826389_0036 failed 2 times due to AM Container for appattempt_1441612826389_0036_000002 exited with  exitCode: -1000
For more detailed output, check application tracking page:http://hdplinux1.company.internal:8088/cluster/app/application_1441612826389_0036Then, click on links to logs of each attempt.
Diagnostics: Application application_1441612826389_0036 initialization failed (exitCode=255) with output: main : command provided 0
main : run as user is hive
main : requested yarn user is hive
org.apache.hadoop.util.DiskChecker$DiskErrorException: Cannot create directory: /var/log/hadoop/yarn/local/usercache/hive/filecache/0/11603
at org.apache.hadoop.util.DiskChecker.checkDir(DiskChecker.java:105)
at org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer.download(ContainerLocalizer.java:199)
at org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer.localizeFiles(ContainerLocalizer.java:241)
at org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer.runLocalization(ContainerLocalizer.java:169)
at org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer.main(ContainerLocalizer.java:372)

Failing this attempt. Failing the application.
at org.apache.tez.client.TezClient.waitTillReady(TezClient.java:678)
at org.apache.hadoop.hive.ql.exec.tez.TezSessionState.open(TezSessionState.java:205)
at org.apache.hadoop.hive.ql.exec.tez.TezTask.updateSession(TezTask.java:239)
at org.apache.hadoop.hive.ql.exec.tez.TezTask.execute(TezTask.java:137)
at org.apache.hadoop.hive.ql.exec.Task.executeTask(Task.java:160)
at org.apache.hadoop.hive.ql.exec.TaskRunner.runSequential(TaskRunner.java:88)
at org.apache.hadoop.hive.ql.Driver.launchTask(Driver.java:1653)
at org.apache.hadoop.hive.ql.Driver.execute(Driver.java:1412)
at org.apache.hadoop.hive.ql.Driver.runInternal(Driver.java:1195)
at org.apache.hadoop.hive.ql.Driver.run(Driver.java:1059)
at org.apache.hadoop.hive.ql.Driver.run(Driver.java:1054)
at org.apache.hive.service.cli.operation.SQLOperation.runQuery(SQLOperation.java:154)
at org.apache.hive.service.cli.operation.SQLOperation.access$100(SQLOperation.java:71)
at org.apache.hive.service.cli.operation.SQLOperation$1$1.run(SQLOperation.java:206)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1657)
at org.apache.hive.service.cli.operation.SQLOperation$1.run(SQLOperation.java:218)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Error: Error while processing statement: FAILED: Execution Error, return code 1 from org.apache.hadoop.hive.ql.exec.tez.TezTask (state=08S01,code=1)

Workaround:

This second error was fixed by renaming the local filecache directory on each datanode:

su -
mv /var/log/hadoop/yarn/local/usercache/hive/filecache filecache.bak

A new filecache directory will get created when re-running the hive query. Again note that the impact on a running cluster is uncertain as other jobs may be actively using files in these local cache directories.

After performing the above steps, the original Hive query reruns successfully.

Further info

Vinod Vavilapalli and Omkar Vinit Joshi from Hortonworks describe the role of the appcache and filecache directories in their post on Resource Localization in Yarn.  Resources are localised to Yarn application nodes for performance reasons, and downloaded files end up in different local directories depending on their categorisation.  For example, application-specific files are found in <local-dir>/usercache/<userid>/appcache/<app-id>/ and private (user-specific) files are found in <local-dir>/usercache/<userid>/filecache.

Using Azure Blob storage with Hadoop

Cloud providers such as Amazon (AWS) and Microsoft (Azure) provide fault-tolerant distributed storage services which can literally “take the load” off a Hadoop installation, providing some compelling advantages.  In the case of Microsoft Azure’s blob storage, however, this is not without its pitfalls.

With the release of Hadoop version 2.7.0 (and vendor-packaged versions such as Hortonworks HDP 2.3), Windows Azure Blob storage can be used as either default or secondary storage for Hadoop instead of HDFS.  See Alexei Khalyako's description of how to configure both of these options here.

These are some benefits of using Blob storage instead of HDFS (see also Microsoft’s opinion):

  • Separate storage from compute – data can exist with 1, 10, 1000 or even zero Hadoop nodes, meaning compute resources can be scaled freely as there is no reliance on having HDFS services running on a minimum number of nodes with locally attached disks in order to simply access the data.  Equally there is no need to “rebalance” data when nodes are added or removed from the cluster.
  • Relatively low cost of storage – at the time of writing this is roughly $414.10 AUD per terabyte per year (see here).  This is quite impressive given the cost of hardware / electricity required to maintain this data even on commodity equipment.
  • Automatic replication – Azure storage can be replicated long-distance with the click of a mouse (by choosing geographically replicated storage).  This means the Azure cloud layer will take care of replicating data to another of its datacentres for disaster recovery purposes.  With the cheapest form of replication (local replication) data is stored 3 times in a single datacentre, ensuring high availability in the event of a single disk failure in the Azure datacentre.  With geo-replicated storage the data is also copied another 3 times into a secondary datacentre (although it would require a declaration from Microsoft to make the secondary copy accessible after a disaster – something completely out of an Azure customer's control).

Questions you might ask about using Azure Blob storage instead of HDFS:

Q: Wouldn't it be really slow having an on-premise Hadoop cluster connecting to storage accessible only over the internet via TCP/IP?
A: Yes.  For this reason this architecture is not recommended.  Instead, it's worth considering Azure storage only for clusters which are stood up in Azure itself (as VMs or via the platform-as-a-service offering, HDInsight).  It is assumed that TCP/IP connectivity within an Azure datacentre is fast enough that network bottlenecks between machines and storage are not a concern, even though the traffic travels over TCP/IP.

Q: Hadoop is all about moving compute closer to storage – doesn’t using blob go against this principle?
A: Microsoft’s answer to this seems to be that the backbone connecting Azure compute VMs to blob storage should provide performance similar to what would be seen with locally attached disks (see Cindy Gross’s useful blog post here).  In other words, the architecture is like having a big disk attached to many nodes simultaneously and directly.

Q: If a blob storage account behaves like a hard disk, won’t it get overloaded with multiple nodes connecting to it simultaneously?
A: No – luckily, per Microsoft, blob storage accounts do not behave like single disks.  The performance characteristics of blob storage allow many Hadoop nodes to read and write data simultaneously.  Microsoft cites a target of 60MB/s throughput per blob (see here), which might correspond to a single chunked file of a Hive table, as well as 15 Gbps overall read performance for a single storage account – i.e. allowing approx 31 nodes to each read from a blob account simultaneously at 60MB/s.

Pitfalls of blob storage instead of HDFS?

A very significant pitfall of using blob storage with Hadoop (despite the above advantages) relates to permissions:

“File owner and group are persisted, but the permissions model is not enforced.” (https://hadoop.apache.org/docs/stable/hadoop-azure/index.html)

This presents enormous challenges at the enterprise level in providing access to multiple users or even self-service access to unstructured or semi-structured data in a Hadoop-based data lake.  Given the frequent need to protect sensitive data within an organisation (e.g. customer, employee, financial data) it seems a severe limitation that the Hadoop interface to the blob storage APIs has not been augmented with the ability to enforce the file and folder permissions which it so dutifully records!

An example of the problem can be seen by comparing native HDFS storage behaviour with blob storage behaviour (both when acting as the default Hadoop filesystem):

Using HDFS – Authorisations working correctly:

[azureuser@hdplinux4 tmp]$ id
uid=500(azureuser) gid=500(azureuser) groups=500(azureuser)
[azureuser@hdplinux4 tmp]$ hdfs dfs -ls /tmp/testonly/
Found 1 items
-rw-------   1 hdfs hdfs         13 2015-08-26 00:57 /tmp/testonly/test.txt
[azureuser@hdplinux4 tmp]$ pwd
/tmp
[azureuser@hdplinux4 tmp]$ hdfs dfs -copyToLocal /tmp/testonly/test.txt
copyToLocal: Permission denied: user=azureuser, access=READ, inode="/tmp/testonly/test.txt":hdfs:hdfs:-rw-------

Using Blob – Authorisations are completely ignored (despite being visible via an hdfs dfs -ls command, the permissions saying that only user hdfs should be able to read test.txt are not enforced when user azureuser copies the file):

azureuser@hdplinuxblob:/tmp> id
uid=1002(azureuser) gid=100(users) groups=100(users),16(dialout),33(video)
azureuser@hdplinuxblob:/tmp> hdfs dfs -ls /tmp/testonly/test.txt
-rw-------   1 hdfs hdfs         13 2015-08-26 01:00 /tmp/testonly/test.txt
azureuser@hdplinuxblob:/tmp> hdfs dfs -copyToLocal /tmp/testonly/test.txt
azureuser@hdplinuxblob:/tmp> ls -la test.txt
-rw-r--r-- 1 azureuser users 13 Aug 26 01:09 test.txt

Only time will tell whether Microsoft will rectify the severe gap currently in the Azure Blob storage integration with Hadoop.  There is some indication that they intend to close the gap with the Azure Data Lake service due for eventual release, which promises compatibility with many flavours of Hadoop (e.g. Hortonworks and Cloudera) as well as integration with Active Directory to allow files and folders to be secured.  The challenge still remains, however, of providing a security mechanism which is compatible with the wider Hadoop ecosystem, and this gives pause for thought about choosing blob storage over HDFS when it should otherwise be an easy decision.

Sparkling-water – keeping the web UI alive

Spark is a great way to make use of the available RAM on a Hadoop cluster to run fast in-memory analysis and queries, and H2O is a great project for running distributed machine learning algorithms on data stored in Hadoop.  Together they form “Sparkling Water” (Spark + H2O, obviously!).

Easy to follow instructions for setting up Sparkling Water are available here: http://h2o-release.s3.amazonaws.com/sparkling-water/master/103/index.html

Running Spark on Yarn is a good way to utilise an existing Hadoop cluster; however, with the "live" method below it is difficult to keep the Sparkling Water H2O Flow interface running permanently so that a number of data scientists can use the notebook-style interface to run machine learning tasks.  Luckily, using a spark-submit invocation with the water.SparklingWaterDriver class ensures the web UI remains online even after the shell session which kicked it off exits (see the persistent method below).

Live method – doesn’t stay online after exiting shell session

  1. Create a shell script:

    #!/bin/bash
    export SPARK_HOME='/usr/hdp/current/spark-client/'
    export HADOOP_CONF_DIR=/etc/hadoop/conf
    export MASTER="yarn-client"
    sparkling-water-1.3.5/bin/sparkling-shell --num-executors 3 --executor-memory 2g --master yarn-client

  2. Run sparkling-shell

    import org.apache.spark.h2o._
    val h2oContext = new H2OContext(sc).start()
    import h2oContext._

Persistent method – stays online even after exiting shell session

To start a "persistent" H2O cluster on Yarn (i.e. one which doesn't exit immediately) simply run this command at the command line of a node where the Spark client and Sparkling Water are installed:

nohup bin/spark-submit --class water.SparklingWaterDriver --master yarn-client --num-executors 3 --driver-memory 4g --executor-memory 2g --executor-cores 1 ../sparkling-water-0.2.1-58/assembly/build/libs/*.jar &

The H2O Flow web UI should be available on its usual port (http://XXX.XXX.XXX.XXX:54321) and should remain there even if the shell session which started it dies!

Thanks to the helpful and responsive folks at H2Oai for the above tip (originally answered here)!