Selecting maximum of two columns – timestamp and sequence in Hive

Sometimes it is useful to sort data by two columns and get the maximum of both columns for each record (column A then column B).

An example is a table with a logical primary key and an additional timestamp field and a sequence number field.  This could be the case, for example if data is coming from a change-data capture ETL tool, where multiple changes (inserts, updates, deletes) may be present for a single record.  The timestamp could denote the batch date the ETL tool extracted the records, and within each batch there could also be a sequence number, where the highest sequence number in the highest timestamp denotes the latest version of the record.

E.g. the final record here (where f1 happens to be ‘z’) is the latest record, with a timestamp of 3 and a sequence of 2:

Hive test table with composite logical primary key and a timestamp and sequence field
Hive test table with composite logical primary key and a timestamp and sequence field

Below are compared two options for achieving this in HiveQL – using two nested maximum aggregations and one which is a single pass aggregation of a named structure.

Creating a test table and data

create table if not exists test
(
pk1 string,
pk2 string,
pk3 string,
f1 string,
ts int,
sequence int
)
stored as orc;

insert into table test values (‘a’,’a’,’a’,’x’,1,1);
insert into table test values (‘a’,’a’,’a’,’y’,1,2);
insert into table test values (‘a’,’a’,’a’,’y’,1,3);
insert into table test values (‘a’,’a’,’a’,’y’,2,1);
insert into table test values (‘a’,’a’,’a’,’y’,3,1);
insert into table test values (‘a’,’a’,’a’,’z’,3,2);

select max(mysortstruct(ts,ts,sequence, sequence)).f1 from test;

Option 1 – runtime 37 seconds

set hive.execution.engine=tez;

select t1.pk1,t1.pk2,t1.pk3,t1.ts, max(t1.sequence) as maxseq
from
test t1
join
(select
pk1,pk2,pk3,max(ts) as maxts
from test
group by pk1, pk2, pk3) t2
on
t1.pk1=t2.pk1 and
t1.pk2=t2.pk2 and
t1.pk3=t2.pk3 and
t1.ts=t2.maxts
group by t1.pk1,t1.pk2,t1.pk3,t1.ts
;

 Hive DAG - Two nested maximum aggregations

Option 2 – runtime 11 seconds

set hive.execution.engine=tez;

select
pk1,
pk2,
pk3,
max(named_struct(‘ts’,ts,’sequence’, sequence)).ts,
max(named_struct(‘ts’,ts,’sequence’, sequence)).sequence
from test
group by pk1, pk2, pk3;

Hive DAG - Named struct maximum aggregation

Note — max(named_struct(‘ts’,ts,’sequence’, sequence)).ts basically tells Hive “get me the latest sequence number for the latest timestamp” and then output the timestamp of that record.

The Result

Both option 1 and 2 produce the correct result —

Hive query - maximum sequence within a timestamp

If we look up these values in our original table we see that this corresponds to the latest record according to timestamp and then sequence.  In practice, we could use this result-set to look up (via a join) non-key or attribute fields in a larger table.

It can be seen from the above via the much shorter runtime and simpler Tez execution graph that named structures (see here for Hive documentation on named structures) can help us with the timstamp + sequence use case, and any situation where we need the maximum of two columns for each logical primary key combination.  This is because it gets this information in a single map-reduce pass over the dataset and does not need to operate on an intermediate dataset once the maximum of the first column has been found for each record, to then get the timestamp.

Leave a comment