Writing Custom Hive UDF and UDAF

In my Sqoop import article, I imported a customers table. Similarly, I have imported an orders table, which I used in my Hive Joins article. Also, I am using a dummy table for UDF verification. You can find the relevant Sqoop commands on GitHub.

Hive supports a lot of built-in SQL-like functions in HiveQL. But if you ever need to write your own UDF, no one is stopping you.

UDF (User Defined Function)

Here I am going to show how to write a simple “trim-like” function called “Strip” – of course, you can write something fancier, but my goal here is to give you something you can take away in a short amount of time. So let’s begin.

How to Write a UDF in Hive?

  1. Create a Java class for the User Defined Function which extends org.apache.hadoop.hive.ql.exec.UDF and implements one or more evaluate() methods. Put in your desired logic and you are almost there.
  2. Package your Java class into a JAR file (I am using Maven)
  3. Go to the Hive CLI, add your JAR, and verify that it is in the Hive CLI classpath
  4. CREATE TEMPORARY FUNCTION in Hive which points to your Java class
  5. Use it in Hive SQL and have fun!

There are better ways to do this, by writing your own GenericUDF to deal with non-primitive types like arrays and maps – but I am not going to cover it in this article.

I will go into detail for each one.

Create Java Class for a User Defined Function 

As you can see below, I am calling my Java class “Strip”. You can call it anything, but the important point is that it extends the UDF class and provides two evaluate() implementations.

evaluate(Text str, String stripChars) – strips the characters given in stripChars from both ends of the first argument str.
evaluate(Text str) – trims leading and trailing spaces.
package org.hardik.letsdobigdata;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.hive.ql.exec.UDF;
import org.apache.hadoop.io.Text;

public class Strip extends UDF {

  private Text result = new Text();

  public Text evaluate(Text str, String stripChars) {
    if (str == null) {
      return null;
    }
    result.set(StringUtils.strip(str.toString(), stripChars));
    return result;
  }

  public Text evaluate(Text str) {
    if (str == null) {
      return null;
    }
    result.set(StringUtils.strip(str.toString()));
    return result;
  }
}
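Before packaging, you can sanity-check the logic with plain Java. The little harness below is just my own throwaway class (it is not part of the article's GitHub code); it needs the same dependencies on the classpath as the UDF itself (hive-exec, hadoop-common, commons-lang).

package org.hardik.letsdobigdata;

import org.apache.hadoop.io.Text;

// Hypothetical quick check of Strip outside Hive.
public class StripCheck {
  public static void main(String[] args) {
    Strip strip = new Strip();
    System.out.println(strip.evaluate(new Text("hadoop"), "ha"));  // prints: doop
    System.out.println(strip.evaluate(new Text("   hiveUDF ")));   // prints: hiveUDF
  }
}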

Package Your Java Class into a JAR 

There is a pom.xml in the GitHub repo. Please make sure you have Maven installed. If you are working with a GitHub clone, go to your shell:

$ cd HiveUDFs

and run "mvn clean package". This will create a JAR file which contains our UDF class. Copy the JAR’s path.

Go to the Hive CLI and Add the UDF JAR 

hive> ADD JAR /home/cloudera/workspace/HiveUDFs/target/HiveUDFs-0.0.1-SNAPSHOT.jar;
Added [/home/cloudera/workspace/HiveUDFs/target/HiveUDFs-0.0.1-SNAPSHOT.jar] to class path

Added resources: [/home/cloudera/workspace/HiveUDFs/target/HiveUDFs-0.0.1-SNAPSHOT.jar]

Verify JAR is in Hive CLI Classpath

You should see your jar in the list.

hive> list jars;
/usr/lib/hive/lib/hive-contrib.jar
/home/cloudera/workspace/HiveUDFs/target/HiveUDFs-0.0.1-SNAPSHOT.jar

Create Temporary Function

It does not have to be a temporary function; you can create a permanent function instead, but just to keep things moving, go ahead and create a temporary function.

You may want to add the ADD JAR and CREATE TEMPORARY FUNCTION statements to your .hiverc file so they execute at the beginning of each Hive session.
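For example, using the JAR path and function from this article, a .hiverc could contain just these two lines (the path is specific to my environment – adjust it for yours):

ADD JAR /home/cloudera/workspace/HiveUDFs/target/HiveUDFs-0.0.1-SNAPSHOT.jar;
CREATE TEMPORARY FUNCTION STRIP AS 'org.hardik.letsdobigdata.Strip';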

UDF Output

The first query strips ‘ha’ from the string ‘hadoop’ as expected (the two-argument evaluate() in the code). The second query strips leading and trailing spaces as expected.

hive> CREATE TEMPORARY FUNCTION STRIP AS 'org.hardik.letsdobigdata.Strip';
hive> select strip('hadoop','ha') from dummy;
OK
doop
Time taken: 0.131 seconds, Fetched: 1 row(s)
hive> select strip('   hiveUDF ') from dummy;
OK
hiveUDF

If you have made it this far, congratulations! That was our UDF in action! You can follow the code on GitHub.

UDAF (User Defined Aggregate Function)

Now, equipped with our UDF knowledge, we will move on to the next step. When we say aggregation, COUNT, AVG, SUM, MIN, and MAX come to mind.

I am picking a very simple aggregation function, AVG/MEAN, and I am going to work with the “orders” table imported using Sqoop. Once you import it into Hive, it will look like the table below (or you can use LOAD DATA INPATH – it is totally up to you).

hive> select * from orders;
OK
orders.order_id    orders.order_date    orders.customer_id    orders.amount
101    2016-01-01    7    3540
102    2016-03-01    1    240
103    2016-03-02    6    2340
104    2016-02-12    3    5000
105    2016-02-12    3    5500
106    2016-02-14    9    3005
107    2016-02-14    1    20
108    2016-02-29    2    2000
109    2016-02-29    3    2500
110    2016-02-27    1    200
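If you prefer the LOAD DATA INPATH route over Sqoop, a table definition along these lines would do. The column types, the comma delimiter, and the HDFS path below are my assumptions, not something from the original import – adjust them to match your data:

CREATE TABLE orders (
  order_id INT,
  order_date STRING,
  customer_id INT,
  amount DOUBLE
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
STORED AS TEXTFILE;

LOAD DATA INPATH '/user/cloudera/orders' INTO TABLE orders;  -- hypothetical HDFS path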

The goal of our UDAF is to find the average order amount for each customer in the orders table.

We are looking for the query:

SELECT CUSTOMER_ID, AVG(AMOUNT) FROM ORDERS GROUP BY CUSTOMER_ID;

I am going to replace the built-in AVG function with a “MEAN” function of my own, as shown below.
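Concretely, once the UDAF we write below is registered as MeanFunc, the query we will run is:

select customer_id, MeanFunc(amount) from orders group by customer_id;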

But before I begin, let’s stop and think as we are entering the MapReduce world. One of the bottlenecks you want to avoid is moving too much data from the Map to the Reduce phase.

An aggregate function is more difficult to write than a regular UDF. Values are aggregated in chunks (across many maps or many reducers), so the implementation has to be capable of combining partial aggregations into final results.

At a high level, there are two parts to implementing a GenericUDAF:

  1. evaluator – The evaluator class actually implements the UDAF logic.
  2. resolver – The resolver class handles type checking and operator overloading (if you want it), and helps Hive find the correct evaluator class for a given set of argument types.

We are not creating a GenericUDAF. We are creating a one-time aggregation function, so we do not have to worry about a resolver. I am planning to write about GenericUDF/GenericUDAF, though. It may be some other day, but soon. :)

How to Write a UDAF?

  1. Create a Java class which extends org.apache.hadoop.hive.ql.exec.UDAF;
  2. Create an inner class which implements UDAFEvaluator;
  3. Implement five methods:
    1. init() – The init() method initializes the evaluator and resets its internal state. We are using new Column() in the code below to indicate that no values have been aggregated yet.
    2. iterate() – this method is called every time there is a new value to be aggregated. The evaluator should update its internal state with the result of performing the aggregation (we are doing sum – see below). We return true to indicate that the input was valid.
    3. terminatePartial() – this method is called when Hive wants a result for the partial aggregation. The method must return an object that encapsulates the state of the aggregation.
    4. merge() – this method is called when Hive decides to combine one partial aggregation with another.
    5. terminate() – this method is called when the final result of the aggregation is needed.
  4. Compile and package the JAR
  5. CREATE TEMPORARY FUNCTION in the Hive CLI
  6. Run Aggregation Query and Verify Output!!!

MeanUDAF.java

package org.hardik.letsdobigdata;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hive.ql.exec.Description;
import org.apache.hadoop.hive.ql.exec.UDAF;
import org.apache.hadoop.hive.ql.exec.UDAFEvaluator;
import org.apache.hadoop.hive.ql.metadata.HiveException;

@Description(name = "Mean", value = "_FUNC(double) - computes mean", extended = "select col1, MeanFunc(value) from table group by col1;")
public class MeanUDAF extends UDAF {

  // Define logging
  static final Log LOG = LogFactory.getLog(MeanUDAF.class.getName());

  public static class MeanUDAFEvaluator implements UDAFEvaluator {

    /**
     * Use the Column class to serialize the intermediate computation
     * (running sum and count) between the map and reduce phases.
     */
    public static class Column {
      double sum = 0;
      int count = 0;
    }

    private Column col = null;

    public MeanUDAFEvaluator() {
      super();
      init();
    }

    // A - Initialize the evaluator - indicating that no values have been aggregated yet.
    public void init() {
      LOG.debug("Initialize evaluator");
      col = new Column();
    }

    // B - Iterate every time there is a new value to be aggregated.
    public boolean iterate(double value) throws HiveException {
      LOG.debug("Iterating over each value for aggregation");
      if (col == null)
        throw new HiveException("Item is not initialized");
      col.sum = col.sum + value;
      col.count = col.count + 1;
      return true;
    }

    // C - Called when Hive wants partially aggregated results.
    public Column terminatePartial() {
      LOG.debug("Return partially aggregated results");
      return col;
    }

    // D - Called when Hive decides to combine one partial aggregation with another.
    public boolean merge(Column other) {
      LOG.debug("Merging by combining partial aggregation");
      if (other == null) {
        return true;
      }
      col.sum += other.sum;
      col.count += other.count;
      return true;
    }

    // E - Called when the final result of the aggregation is needed.
    public double terminate() {
      LOG.debug("At the end of the last record of the group - returning final result");
      return col.sum / col.count;
    }
  }
}
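To make the call sequence concrete, here is a small sketch of my own (the class name is made up and it is not part of the article's repo) that drives the evaluator the way Hive conceptually would for customer_id 3: two "mappers" each produce a partial Column, and a "reducer" merges them and terminates.

package org.hardik.letsdobigdata;

import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.hardik.letsdobigdata.MeanUDAF.MeanUDAFEvaluator;
import org.hardik.letsdobigdata.MeanUDAF.MeanUDAFEvaluator.Column;

// Hypothetical illustration of the init/iterate/terminatePartial/merge/terminate flow.
public class MeanUDAFCallSequence {
  public static void main(String[] args) throws HiveException {
    // "Mapper 1" sees two of customer 3's orders
    MeanUDAFEvaluator map1 = new MeanUDAFEvaluator();
    map1.iterate(5000);
    map1.iterate(5500);
    Column partial1 = map1.terminatePartial();

    // "Mapper 2" sees the remaining order
    MeanUDAFEvaluator map2 = new MeanUDAFEvaluator();
    map2.iterate(2500);
    Column partial2 = map2.terminatePartial();

    // "Reducer" merges the partials and produces the final mean
    MeanUDAFEvaluator reduce = new MeanUDAFEvaluator();
    reduce.merge(partial1);
    reduce.merge(partial2);
    System.out.println(reduce.terminate());  // 4333.333..., matching Hive's output for customer 3 below
  }
}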


Package and ADD JAR

hive> ADD JAR /home/cloudera/workspace/HiveUDFs/target/HiveUDFs-0.0.1-SNAPSHOT.jar;

Added [/home/cloudera/workspace/HiveUDFs/target/HiveUDFs-0.0.1-SNAPSHOT.jar] to class path

Added resources: [/home/cloudera/workspace/HiveUDFs/target/HiveUDFs-0.0.1-SNAPSHOT.jar]

CREATE FUNCTION in HIVE

hive> CREATE TEMPORARY FUNCTION MeanFunc AS 'org.hardik.letsdobigdata.MeanUDAF';
OK

Verify Output

Execute the group by query below. Our function is called MeanFunc. (The first attempt in the transcript fails because the orders table lives in the sqoop_workspace database, so we switch to that database and run it again.)

hive> select customer_id, MeanFunc(amount) from orders group by customer_id;
FAILED: SemanticException [Error 10001]: Line 1:42 Table not found 'orders'
hive> use sqoop_workspace;
OK
Time taken: 0.247 seconds
hive> select customer_id, MeanFunc(amount) from orders group by customer_id;
Query ID = cloudera_20160302030202_fb24b7c1-4227-4640-afb9-4ccd29bd735f
Total jobs = 1
Launching Job 1 out of 1
Number of reduce tasks not specified. Estimated from input data size: 1
In order to change the average load for a reducer (in bytes):
  set hive.exec.reducers.bytes.per.reducer=<number>
In order to limit the maximum number of reducers:
  set hive.exec.reducers.max=<number>
In order to set a constant number of reducers:
  set mapreduce.job.reduces=<number>
Starting Job = job_1456782715090_0020, Tracking URL = http://quickstart.cloudera:8088/proxy/application_1456782715090_0020/
Kill Command = /usr/lib/hadoop/bin/hadoop job -kill job_1456782715090_0020
Hadoop job information for Stage-1: number of mappers: 2; number of reducers: 1
2016-03-02 03:03:16,703 Stage-1 map = 0%, reduce = 0%
2016-03-02 03:03:53,241 Stage-1 map = 50%, reduce = 0%, Cumulative CPU 3.31 sec
2016-03-02 03:03:55,593 Stage-1 map = 100%, reduce = 0%, Cumulative CPU 3.9 sec
2016-03-02 03:04:09,201 Stage-1 map = 100%, reduce = 100%, Cumulative CPU 6.18 sec
MapReduce Total cumulative CPU time: 6 seconds 180 msec
Ended Job = job_1456782715090_0020
MapReduce Jobs Launched:
Stage-Stage-1: Map: 2  Reduce: 1   Cumulative CPU: 6.18 sec   HDFS Read: 12524 HDFS Write: 77 SUCCESS
Total MapReduce CPU Time Spent: 6 seconds 180 msec
OK
1    153.33333333333334
2    2000.0
3    4333.333333333333
6    2340.0
7    3540.0
9    3005.0
Time taken: 72.172 seconds, Fetched: 6 row(s)

Verify individual customer_ids: as you can see, the group-by values match – you can cross-check manually. Thanks for your time and for reading my blog – I hope this is helpful!

hive> select * from orders where customer_id = 1;
OK
102    2016-03-01    1    240
107    2016-02-14    1    20
110    2016-02-27    1    200
Time taken: 0.32 seconds, Fetched: 3 row(s)
hive> select * from orders where customer_id = 2;
OK
108    2016-02-29    2    2000
Time taken: 0.191 seconds, Fetched: 1 row(s)
hive> select * from orders where customer_id = 3;
OK
104    2016-02-12    3    5000
105    2016-02-12    3    5500
109    2016-02-29    3    2500
Time taken: 0.093 seconds, Fetched: 3 row(s)
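As a quick sanity check, customer 1's three orders are 240, 20, and 200, so the mean is (240 + 20 + 200) / 3 = 153.33, which is exactly what MeanFunc returned above.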
