Create features for data in an Hadoop cluster using Hive queries

This document shows how to create features for data stored in an Azure HDInsight Hadoop cluster using Hive queries. These Hive queries use embedded Hive User Defined Functions (UDFs), the scripts for which are provided.

The operations needed to create features can be memory intensive. The performance of Hive queries becomes more critical in such cases and can be improved by tuning certain parameters. The tuning of these parameters is discussed in the final section.

Examples of the queries that are presented are specific to the NYC Taxi Trip Data scenarios are also provided in GitHub repository. These queries already have data schema specified and are ready to be submitted to run. In the final section, parameters that users can tune so that the performance of Hive queries can be improved are also discussed.

This menu links to topics that describe how to create features for data in various environments. This task is a step in the Team Data Science Process (TDSP).

Prerequisites

This article assumes that you have:

Feature Generation

In this section, several examples of the ways in which features can be generating using Hive queries are described. Once you have generated additional features, you can either add them as columns to the existing table or create a new table with the additional features and primary key, which can then be joined with the original table. Here are the examples presented:

  1. Frequency based Feature Generation
  2. Risks of Categorical Variables in Binary Classification
  3. Extract features from Datetime Field
  4. Extract features from Text Field
  5. Calculate distance between GPS coordinates

Frequency based Feature Generation

It is often useful to calculate the frequencies of the levels of a categorical variable, or the frequencies of certain combinations of levels from multiple categorical variables. Users can use the following script to calculate these frequencies:

    select
        a.<column_name1>, a.<column_name2>, a.sub_count/sum(a.sub_count) over () as frequency
    from
    (
        select
            <column_name1>,<column_name2>, count(*) as sub_count
        from <databasename>.<tablename> group by <column_name1>, <column_name2>
    )a
    order by frequency desc;

Risks of Categorical Variables in Binary Classification

In binary classification, we need to convert non-numeric categorical variables into numeric features when the models being used only take numeric features. This is done by replacing each non-numeric level with a numeric risk. In this section, we show some generic Hive queries that calculate the risk values (log odds) of a categorical variable.

    set smooth_param1=1;
    set smooth_param2=20;
    select
        <column_name1>,<column_name2>,
        ln((sum_target+${hiveconf:smooth_param1})/(record_count-sum_target+${hiveconf:smooth_param2}-${hiveconf:smooth_param1})) as risk
    from
        (
        select
            <column_nam1>, <column_name2>, sum(binary_target) as sum_target, sum(1) as record_count
        from
            (
            select
                <column_name1>, <column_name2>, if(target_column>0,1,0) as binary_target
            from <databasename>.<tablename>
            )a
        group by <column_name1>, <column_name2>
        )b

In this example, variables smooth_param1 and smooth_param2 are set to smooth the risk values calculated from the data. Risks have a range between -Inf and Inf. A risks > 0 indicates that the probability that the target is equal to 1 is greater than 0.5.

After the risk table is calculated, users can assign risk values to a table by joining it with the risk table. The Hive joining query was provided in previous section.

Extract features from Datetime Fields

Hive comes with a set of UDFs for processing datetime fields. In Hive, the default datetime format is 'yyyy-MM-dd 00:00:00' ('1970-01-01 12:21:32' for example). In this section, we show examples that extract the day of a month, the month from a datetime field, and other examples that convert a datetime string in a format other than the default format to a datetime string in default format.

    select day(<datetime field>), month(<datetime field>)
    from <databasename>.<tablename>;

This Hive query assumes that the <datetime field> is in the default datetime format.

If a datetime field is not in the default format, you need to convert the datetime field into Unix time stamp first, and then convert the Unix time stamp to a datetime string that is in the default format. When the datetime is in default format, users can apply the embedded datetime UDFs to extract features.

    select from_unixtime(unix_timestamp(<datetime field>,'<pattern of the datetime field>'))
    from <databasename>.<tablename>;

In this query, if the <datetime field> has the pattern like 03/26/2015 12:04:39, the '<pattern of the datetime field>' should be 'MM/dd/yyyy HH:mm:ss'. To test it, users can run

    select from_unixtime(unix_timestamp('05/15/2015 09:32:10','MM/dd/yyyy HH:mm:ss'))
    from hivesampletable limit 1;

The hivesampletable in this query comes preinstalled on all Azure HDInsight Hadoop clusters by default when the clusters are provisioned.

Extract features from Text Fields

When the Hive table has a text field that contains a string of words that are delimited by spaces, the following query extracts the length of the string, and the number of words in the string.

    select length(<text field>) as str_len, size(split(<text field>,' ')) as word_num
    from <databasename>.<tablename>;

Calculate distances between sets of GPS coordinates

The query given in this section can be directly applied to the NYC Taxi Trip Data. The purpose of this query is to show how to apply an embedded mathematical functions in Hive to generate features.

The fields that are used in this query are the GPS coordinates of pickup and dropoff locations, named pickup_longitude, pickup_latitude, dropoff_longitude, and dropoff_latitude. The queries that calculate the direct distance between the pickup and dropoff coordinates are:

    set R=3959;
    set pi=radians(180);
    select pickup_longitude, pickup_latitude, dropoff_longitude, dropoff_latitude,
        ${hiveconf:R}*2*2*atan((1-sqrt(1-pow(sin((dropoff_latitude-pickup_latitude)
        *${hiveconf:pi}/180/2),2)-cos(pickup_latitude*${hiveconf:pi}/180)
        *cos(dropoff_latitude*${hiveconf:pi}/180)*pow(sin((dropoff_longitude-pickup_longitude)*${hiveconf:pi}/180/2),2)))
        /sqrt(pow(sin((dropoff_latitude-pickup_latitude)*${hiveconf:pi}/180/2),2)
        +cos(pickup_latitude*${hiveconf:pi}/180)*cos(dropoff_latitude*${hiveconf:pi}/180)*
        pow(sin((dropoff_longitude-pickup_longitude)*${hiveconf:pi}/180/2),2))) as direct_distance
    from nyctaxi.trip
    where pickup_longitude between -90 and 0
    and pickup_latitude between 30 and 90
    and dropoff_longitude between -90 and 0
    and dropoff_latitude between 30 and 90
    limit 10;

The mathematical equations that calculate the distance between two GPS coordinates can be found on the Movable Type Scripts site, authored by Peter Lapisu. In his Javascript, the function toRad() is just lat_or_lonpi/180, which converts degrees to radians. Here, *lat_or_lon is the latitude or longitude. Since Hive does not provide the function atan2, but provides the function atan, the atan2 function is implemented by atan function in the above Hive query using the definition provided in Wikipedia.

Create workspace

A full list of Hive embedded UDFs can be found in the Built-in Functions section on the Apache Hive wiki).

Advanced topics: Tune Hive Parameters to Improve Query Speed

The default parameter settings of Hive cluster might not be suitable for the Hive queries and the data that the queries are processing. In this section, we discuss some parameters that users can tune that improve the performance of Hive queries. Users need to add the parameter tuning queries before the queries of processing data.

  1. Java heap space: For queries involving joining large datasets, or processing long records, running out of heap space is one of the common error. This can be tuned by setting parameters mapreduce.map.java.opts and mapreduce.task.io.sort.mb to desired values. Here is an example:

     set mapreduce.map.java.opts=-Xmx4096m;
     set mapreduce.task.io.sort.mb=-Xmx1024m;
    

    This parameter allocates 4GB memory to Java heap space and also makes sorting more efficient by allocating more memory for it. It is a good idea to play with these allocations if there are any job failure errors related to heap space.

  2. DFS block size : This parameter sets the smallest unit of data that the file system stores. As an example, if the DFS block size is 128MB, then any data of size less than and up to 128MB is stored in a single block, while data that is larger than 128MB is allotted extra blocks. Choosing a very small block size causes large overheads in Hadoop since the name node has to process many more requests to find the relevant block pertaining to the file. A recommended setting when dealing with gigabytes (or larger) data is :

     set dfs.block.size=128m;
    
  3. Optimizing join operation in Hive : While join operations in the map/reduce framework typically take place in the reduce phase, sometimes, enormous gains can be achieved by scheduling joins in the map phase (also called "mapjoins"). To direct Hive to do this whenever possible, we can set :

     set hive.auto.convert.join=true;
    
  4. Specifying the number of mappers to Hive : While Hadoop allows the user to set the number of reducers, the number of mappers is typically not be set by the user. A trick that allows some degree of control on this number is to choose the Hadoop variables, mapred.min.split.size and mapred.max.split.size as the size of each map task is determined by :

     num_maps = max(mapred.min.split.size, min(mapred.max.split.size, dfs.block.size))
    

    Typically, the default value of mapred.min.split.size is 0, that of mapred.max.split.size is Long.MAX and that of dfs.block.size is 64MB. As we can see, given the data size, tuning these parameters by "setting" them allows us to tune the number of mappers used.

  5. A few other more advanced options for optimizing Hive performance are mentioned below. These allow you to set the memory allocated to map and reduce tasks, and can be useful in tweaking performance. Please keep in mind that the mapreduce.reduce.memory.mb cannot be greater than the physical memory size of each worker node in the Hadoop cluster.

     set mapreduce.map.memory.mb = 2048;
     set mapreduce.reduce.memory.mb=6144;
     set mapreduce.reduce.java.opts=-Xmx8192m;
     set mapred.reduce.tasks=128;
     set mapred.tasktracker.reduce.tasks.maximum=128;