Hive tips

November 29, 2012

Command Line Use

* Running query from command line and setting configuration
$HIVE_HOME/bin/hive -e 'select a.col from tab1 a' -hiveconf mapred.reduce.tasks=32

* Running a script non-interactively
$HIVE_HOME/bin/hive -f /home/my/hive-script.sql

* Hive stores data in HDFS; to do a simple check:
hadoop fs -ls /user/hive/warehouse/tablename/file/ | head -n 20

Interactive Shell Commands

Command              Description
quit                 Use quit or exit to leave the interactive shell.
set key=value        Sets the value of a particular configuration variable. Note that if you misspell the variable name, the CLI will not show an error.
set                  Prints the configuration variables overridden by the user or Hive (those explicitly set).
set -v               Prints all Hadoop and Hive configuration variables.
add FILE [file]*     Adds one or more files to the list of resources.
list FILE            Lists all the files added to the distributed cache.
list FILE [file]*    Checks whether the given resources have already been added to the distributed cache.
! [cmd]              Executes a shell command from the Hive shell.
dfs [dfs cmd]        Executes a dfs command from the Hive shell.
[query]              Executes a Hive query and prints results to standard output.
source FILE          Executes a script file inside the CLI.
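For example, a few of these commands in an interactive session might look like the following (the paths and values are made up):

hive> set mapred.reduce.tasks=10;
hive> dfs -ls /user/hive/warehouse/;
hive> !date;
hive> source /home/my/hive-script.sql;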

SHOW TABLES;
-- lists all the tables in Hive.
SHOW TABLES 'foo*';
-- lists all tables that start with "foo". The regular expression can contain '*' for any character(s) or '|' for a choice.
SHOW TABLE EXTENDED LIKE `foo*`;
-- gives information about the matching tables, such as the number of files for each table, total file size, and partition information. Note that the pattern should use backticks, not quotes.
SHOW PARTITIONS tablename;
-- returns a list of partitions for a table.
SHOW FUNCTIONS ["foo*"];
-- lists the available functions. It is also possible to list all functions that match a regular expression, e.g. SHOW FUNCTIONS ".*str.*";
DESCRIBE FUNCTION [EXTENDED] function;
-- e.g. DESCRIBE FUNCTION EXTENDED substr;

Add Additional Files

ADD { FILE[S] | JAR[S] | ARCHIVE[S] } <filepath1> [<filepath2>]*
LIST { FILE[S] | JAR[S] | ARCHIVE[S] } [<filepath1> <filepath2> ..]
DELETE { FILE[S] | JAR[S] | ARCHIVE[S] } [<filepath1> <filepath2> ..]

hive> add FILE /tmp/tt.py;
hive> list FILES;
/tmp/tt.py
hive> FROM networks a MAP a.networkid USING 'python tt.py' AS nn WHERE a.ds = '2009-01-04' LIMIT 10;

If one of your mapper or reducer Python files imports another module and you encounter an error like "module not found", try adding the current working directory to the system path before the import:

import sys
import os
sys.path.append(os.getcwd())

MapReduce Settings:

SET mapred.job.queue.name=yourqueuename;
SET mapred.child.java.opts='-Xmx512m';
SET hive.exec.reducers.bytes.per.reducer=<number>;   -- to change the average load for a reducer (in bytes)
SET hive.exec.reducers.max=<number>;                 -- to limit the maximum number of reducers
SET mapred.compress.map.output=true;                 -- compress the map output
SET hive.exec.compress.output=false;
SET mapred.reduce.tasks=<number>;                    -- to set a constant number of reducers

SET -v;                                              -- a dump of all settings

EXPLAIN

When a command is submitted to Hive, several things typically happen:
1. Parse the query.
2. Get metadata from the metastore (this may be the embedded Derby database or a centralized RDBMS).
3. Create a logical plan (abstract syntax tree).
4. Optimize the plan.
5. Create a physical plan, a directed acyclic graph of MapReduce jobs.

The execution engine submits the jobs sequentially in dependency order, running jobs in parallel whenever possible. Certain queries do not require MapReduce at all. Metadata-only commands, for example, only need to communicate with the Hive metastore to produce their results. Some queries cause Hive to access HDFS but do not require any map/reduce phases; "SELECT * FROM table", for example, would not require MapReduce. Note: do not open more than one Hive shell at a time when using the default Derby database for the metastore.

The EXPLAIN keyword can be used before any SELECT statement. This invokes the parser, logical plan generator, optimizer and physical plan generator, but does not execute the query. Example:
hive> EXPLAIN SELECT * FROM customer LIMIT 3;
As expected, this query does not require any MapReduce jobs. Instead, Hive invokes a "Fetch Operator" which uses the HDFS API to read the data for this table. When a MapReduce job is required, the EXPLAIN output shows what is being done in the map and in the reduce (if a reduce task is required). In the example below, the map scans the purchases table and filters for cost > 40.
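As a sketch, that query (assuming a hypothetical purchases table with a cost column) might look like:

hive> EXPLAIN SELECT * FROM purchases WHERE cost > 40;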

[Figure: EXPLAIN output for the filtered query]

The MapReduce framework guarantees that the output of the map tasks will be sorted by key before being consumed by a reducer. The process is called the “shuffle and sort”. Hive takes advantage of this by having the mappers output “order_date” as the key and setting the number of reducers to one. This accomplishes the ORDER BY clause of the query. The work for filtering and sorting the result set has already been done by the map task and shuffle/sort process. Therefore the reducer just needs to output the results. This is called the IdentityReducer.
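As a sketch, the ORDER BY query being described might look like this (again assuming a hypothetical purchases table with cost and order_date columns):

hive> EXPLAIN SELECT * FROM purchases WHERE cost > 40 ORDER BY order_date;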

[Figure: EXPLAIN output for the ORDER BY query]

Loading & Writing Data

In Hive, you can write newly computed data into a Hive table (which can be external), into a local directory, or directly into an HDFS directory. So if you don't want to create any additional table, just write to an HDFS or local directory.

 hive> INSERT OVERWRITE TABLE events SELECT a.* FROM profiles a WHERE a.key < 100;
 hive> INSERT OVERWRITE LOCAL DIRECTORY '/tmp/reg_3' SELECT a.* FROM events a;
 hive> INSERT OVERWRITE DIRECTORY '/tmp/reg_4' select a.invites, a.pokes FROM profiles a;

Schema on read, not write

Hive uses a “schema on read” approach, whereas many traditional databases use a “schema on write” technique. This means that when you
LOAD data into Hive, it does not parse, verify and serialize the data into its own format. Instead, Hive merely moves the files (or copies them from the local filesystem), or, in the case of an external table, it merely points to the files. This has some benefits, such as very fast loads. It is also very flexible; it is possible to have two schemas for the same underlying data using EXTERNAL tables.

- Data is not checked during the load (loads are very fast)
- Parsing errors will be found at query time
- Possible to have multiple schemas for the same data (using EXTERNAL tables; see the sketch below)
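A minimal sketch of two schemas over the same files, assuming a made-up /data/events directory of tab-delimited text:

CREATE EXTERNAL TABLE raw_events (line STRING)
LOCATION '/data/events';

CREATE EXTERNAL TABLE parsed_events (event_time STRING, userid STRING, action STRING)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
LOCATION '/data/events';

Dropping either table leaves the underlying files in /data/events untouched.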

Use external table

Use an external table to point at a directory in HDFS; that way, when you drop the table in Hive, your data will still be there. You can also use BIGINT if you think your count may grow very large and overflow an INT.

CREATE EXTERNAL TABLE yourTableName(column1Name INT, cnt BIGINT)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' LINES TERMINATED BY '\n' STORED AS TEXTFILE
LOCATION 'some-directory-in-your-hdfs';

The STORED AS clause indicates what file format the data uses. This translates to an InputFormat class which reads rows of data. Currently the possible values are TextFile (the default) or SequenceFile. The ROW FORMAT clause tells Hive how to read the columns in each row. If ROW FORMAT is not specified, then Hive uses a built-in SerDe that assumes ctrl-A as a field terminator.

Hive extends the SQL standard to support multi-table insert statements. This allows data to be scanned a single time and used multiple times which is much more efficient than streaming the data each time it is used.

FROM tablename alias
INSERT OVERWRITE TABLE t2 SELECT alias.col1
INSERT OVERWRITE TABLE t3 SELECT alias.col2;

Creating Table from Another Table

CREATE TABLE [IF NOT EXISTS] newtable AS
SELECT col1, col2 FROM oldtable;

It is possible to create a table from an existing table using CREATE TABLE..AS SELECT (i.e., “CTAS”). The SELECT statement can be any valid SELECT statement in Hive. The keyword AS is required. The new table derives the column definitions from the SELECTed data. For example,
if col1 is an INT and col2 is a String in the oldtable table, then newtable will have two columns of type INT and STRING respectively. The SELECT can contain column aliases, which will become the column names in the new table.

The keywords IF NOT EXISTS prevent an error from occurring if the table exists. However, there is no verification that the existing table has a structure identical to that indicated by the CREATE TABLE statement.

Case sensitivity: Most aspects of the Hive language are case-insensitive (e.g., keywords, identifiers, and functions). Capitalizing keywords is a convention but not required. String comparisons are case-sensitive.

Primitive Data Types

[Table: Hive primitive data types (TINYINT, SMALLINT, INT, BIGINT, BOOLEAN, FLOAT, DOUBLE, STRING)]

A TINYINT uses only 1 byte but is limited to a small range of numbers (-128 to 127). A BOOLEAN can only be true, false or null.

Complex Types:

[Table: Hive complex types (ARRAY, MAP, STRUCT)]

A map is a set of (key, value) pairs. For example, a group of options could be represented as a map where the key and value are both strings. To retrieve a value from a map, reference it by key, such as options['myopt'].
An array is a list of items of the same type. Individual items can be accessed by a zero-based index. For example, to retrieve the first phone number in an array called phone_numbers, use phone_numbers[0].
A struct is a user-defined structure of any number of typed fields. To access the fields, use a dot notation such as user.id and user.name.
The complex types can also be nested.
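For example, a single query touching all three complex types might look like this (the profiles table and its columns are hypothetical):

SELECT options['myopt'], phone_numbers[0], owner.id, owner.name
FROM profiles;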

Creating a table with an array

CREATE TABLE tablename (col1 STRING, col2 ARRAY<STRING>)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
COLLECTION ITEMS TERMINATED BY ',' STORED AS TEXTFILE;

To create a table with a complex data type, use COLLECTION ITEMS TERMINATED BY in the CREATE TABLE.

For map structures such as JSON-like data, use MAP KEYS TERMINATED BY to specify the character between the key and value.
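A minimal sketch, with made-up table and column names:

CREATE TABLE user_options (userid INT, options MAP<STRING,STRING>)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
COLLECTION ITEMS TERMINATED BY ','
MAP KEYS TERMINATED BY ':'
STORED AS TEXTFILE;

A raw data row for this table would then look like 123<TAB>color:blue,size:large.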

Built-in functions: Hive supports a good number of built-in functions (though not as many as most RDBMSs). For a full list of Hive’s functions, see http://wiki.apache.org/hadoop/Hive/LanguageManual/UDF#Built-in_Functions or issue SHOW FUNCTIONS.

Use DESCRIBE to see sample usage information:
hive> DESCRIBE FUNCTION length;

User Defined Functions

One simple example is using an 'embedded' Python or other script (e.g. /bin/cat). You can also use your own Java function (add a jar file instead).

add FILE mypyfile.py;    -- you can pass arguments to your Python script

INSERT OVERWRITE TABLE output_table
SELECT TRANSFORM(col1, col2, col4, col5)
USING 'python mypyfile.py python-args'               -- if no args: USING 'python mypyfile.py'
AS (output_col1, output_col2, output_col3)
FROM input_table;

Aggregate Functions:  There are 6 special built-in functions that aggregate values into a single result: count, sum, avg, max, min, percentile

The count function can be used two ways: count(1) returns the number of items in the group; count(DISTINCT column) returns the number of distinct values for column. Summing numeric values can be done with sum. Likewise avg can be used to find an average (mean). Sum and avg can use DISTINCT as well. Max and min return the largest and smallest value in the group respectively. This example counts the number of purchases in the purchases table and also calculates the total cost:

SELECT count(1), sum(cost) FROM purchases;
Note: while count, sum and avg can use DISTINCT, a query cannot contain two aggregate functions that apply DISTINCT to different columns. E.g., this is allowed:
SELECT count(DISTINCT foo_col), sum(DISTINCT foo_col)…
But this is not allowed:
SELECT count(DISTINCT foo_col), sum(DISTINCT bar_col)…

Using Complex Datatype

CREATE TABLE t (id INT, letters ARRAY<STRING>)…

id    letters
1     [“a”, “b”, “c”]
2     [“d”, “e”]

The explode function takes a single argument, an array, and outputs a row for each element of the array. A column alias is required.

SELECT explode(letters) AS letter FROM t;

Type     Input            Output           Example
---------------------------------------------------------------
UDF      single row       single row       SELECT length(firstname)
UDAF     multiple rows    single row       SELECT count(custid)
UDTF     single row       multiple rows    SELECT explode(my_array)
The explode function is useful along with LATERAL VIEW, which joins the rows from the base table to the output of the UDTF. It applies the function to each row of  the base table and then does the join.

[Figure: LATERAL VIEW example]
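Applied to the table t above, a LATERAL VIEW query might look like this (the view and column aliases are arbitrary):

SELECT t.id, exploded.letter
FROM t LATERAL VIEW explode(letters) exploded AS letter;

This returns one row per (id, letter) pair: (1, a), (1, b), (1, c), (2, d), (2, e).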

JOIN

An inner join only returns rows that have a match in both tables. It is necessary to qualify column names that are ambiguous if the same name exists in both tables. Table aliases can also be used. Only equi-joins are supported; non-equality comparisons cannot be used in the ON clause. Comma join syntax (available in some RDBMSes) is not supported.

An outer join is like an inner join, but also includes the rows that did not have a  match. For example, Doug did not have any purchase records.
• LEFT OUTER JOIN will return the non-matching rows from the left table
• RIGHT OUTER JOIN returns the non-matching rows from the right table
• FULL OUTER JOIN returns both
For rows that did not have a match in the joined table, there will be NULLs in all  columns of the non-matching table.

[Figure: outer join example]

An outer join is useful for finding unmatched records, for example, finding customers who have not made purchases:

SELECT C.custid, firstname, surname FROM customer C LEFT OUTER JOIN purchases P ON (C.custid = P.custid) WHERE P.custid IS NULL;

Be sure to use the “IS NULL” operator, not “= NULL”

One Confusing Case: Joins occur BEFORE WHERE CLAUSES. So, if you want to restrict the OUTPUT of a join, a requirement should be in the WHERE clause, otherwise it should be in the JOIN clause. A big point of confusion for this issue is partitioned tables:

  SELECT a.val, b.val FROM a LEFT OUTER JOIN b ON (a.key=b.key)
  WHERE a.ds='2009-07-07' AND b.ds='2009-07-07'

will join a on b, producing a list of a.val and b.val. The WHERE clause, however, can also reference other columns of a and b that are in the output of the join, and then filter them out. However, whenever a row from the JOIN has found a key for a and no key for b, all of the columns of b will be NULL, including the ds column. This is to say, you will filter out all rows of join output for which there was no valid b.key, and thus you have outsmarted your LEFT OUTER requirement. In other words, the LEFT OUTER part of the join is irrelevant if you reference any column of b in the WHERE clause. Instead, when OUTER JOINing, use this syntax:

  SELECT a.val, b.val FROM a LEFT OUTER JOIN b
  ON (a.key=b.key AND b.ds='2009-07-07' AND a.ds='2009-07-07')

The result is that the output of the join is pre-filtered, and you won't get post-filtering trouble for rows that have a valid a.key but no matching b.key. The same logic applies to RIGHT and FULL joins.

Partition & Bucketing

Bucketing is similar to partitioning in that it distributes the rows into separate groups that are stored in their own subdirectory. Unlike partitioning, the user does not have to define which rows go into a particular bucket. Hive will use this formula for assigning a row to a bucket: hash(column) MOD (number of buckets). This will usually cause an even distribution of rows across the buckets.

- Similar to partitioning, but based on a hash of the incoming data
- Causes a mostly even distribution of rows across N buckets
- Useful for "sampling" rows in a table (see the sketch below)

CREATE TABLE tablename (columns) CLUSTERED BY (column_name) INTO N BUCKETS
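As a sketch (the table, column, and bucket count are made up), creating a bucketed table and sampling a single bucket might look like:

CREATE TABLE customers_bucketed (custid INT, firstname STRING, surname STRING)
CLUSTERED BY (custid) INTO 32 BUCKETS;

SELECT * FROM customers_bucketed TABLESAMPLE(BUCKET 1 OUT OF 32 ON custid);

In older Hive versions, set hive.enforce.bucketing=true before inserting into a bucketed table so that Hive uses the right number of reducers.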

Partitioning is very useful and makes loading and querying your data more efficient. I also found it very useful when you have multiple directories organized in a certain way and want to include them as partitions. For example, I could create an external table pointing to an existing HDFS path and register its subdirectories ('birth-geo-data/year=2012/month=12/day=12') as partitions, e.g.

CREATE EXTERNAL TABLE tablename (name STRING, birth_city STRING)
PARTITIONED BY (year STRING, month STRING, day STRING)
LOCATION 'hdfs-path-to/birth-geo-data';

You will need to tell Hive to add the partition first:

ALTER TABLE tablename ADD PARTITION(year='2012', month='12', day='12');

Because the partition column names exactly match the subdirectory scheme, Hive will recognize the partition from the statement above.

In some cases, you may have data laid out like 'birth-geo-data/2012/12/12'. You can add a LOCATION clause to the ALTER TABLE statement to add the partition, which points Hive directly at the location of the data:

ALTER TABLE tablename ADD PARTITION(year='2012', month='12', day='12') LOCATION 'birth-geo-data/2012/12/12';

In fact, if your data lives somewhere you can only read but not write, such as 'only-read-birth-geo-data/2012/12/12', you can create the EXTERNAL TABLE at your own path and then add a partition for each of the subdirectories. This way there is no need to move the data, yet you can still read it and overlay a partition schema on it. And since it is an external table, it is safe to drop any partition; you will not lose the original data in HDFS.

CREATE EXTERNAL TABLE my_birth_table (name STRING, birth_city STRING)
PARTITIONED BY (year STRING, month STRING, day STRING)
LOCATION 'your-own-hdfs-path-to/my-own-birth-geo-data';
ALTER TABLE my_birth_table ADD PARTITION(year='2012', month='12', day='12') LOCATION 'birth-geo-data/2012/12/12';

SHOW PARTITIONS lists all the existing partitions for a given base table. Partitions are listed in alphabetical order.

Using a UDF (Python) in Hive

FROM (
FROM input_table
MAP col1, col2, col3
USING 'mapper.py'                                    -- mapper.py generates a key (k) and a value (v1)
AS k, v1
CLUSTER BY k) map_output
INSERT OVERWRITE TABLE output_table
REDUCE k, v1
USING 'reducer.py parameter1 parameter2'             -- you can pass parameters to your mapper or reducer
AS k, v1, v2, v3, v4                                 -- here, the reducer generates a key and 4 values
;

Create Custom UDF

[Figure: example custom UDF code]
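Registering and using a custom Java UDF generally looks like the following (the jar path, class name, and function name are made up):

hive> add JAR /tmp/my-udfs.jar;
hive> CREATE TEMPORARY FUNCTION my_lower AS 'com.example.hive.udf.MyLower';
hive> SELECT my_lower(firstname) FROM customer LIMIT 3;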

Handling dates

- No native date types
- Use STRING ('2010-03-29') or INT (1273082420)

SELECT from_unixtime(1273082420) ..                              -- result will be 2010-05-05 11:00:20
SELECT to_date('2010-05-05 11:00:20') ..                         -- result will be 2010-05-05
SELECT .. ORDER BY unix_timestamp(adate, 'dd-MMM-yyyy')          -- if adate in your table is like '08-Mar-1999'

There is no native date or time data type in Hive. The recommended approach for storing dates is to use a string or integer type. As a string, store the date in ISO format ("year-month-day"). As an integer, store the date or datetime as the number of seconds since 1970-01-01 (the Unix epoch). There are useful functions for converting Unix timestamps to date format and vice versa. There are also functions for extracting or transforming dates.

If your data file contains a date representation that is not a Unix timestamp or a string in ISO format, then you will probably need the function unix_timestamp(STRING date, STRING pattern). The pattern can be any pattern accepted by java.text.SimpleDateFormat.
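For example, a sketch that converts such a value into ISO format (assuming a hypothetical purchases table with a purchase_date string like '08-Mar-1999'):

SELECT to_date(from_unixtime(unix_timestamp(purchase_date, 'dd-MMM-yyyy')))
FROM purchases;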

Add Comments to table

Adding comments to your table definitions is a good practice. These comments are saved in the metastore and are visible in DESCRIBE EXTENDED. A recommended practice is to use column comments to describe the format of the data (e.g., if the
column is a STRING that stores a date). It may also be helpful to include the creator’s name in the table comment.

CREATE TABLE tablename (col type COMMENT 'Column comment', ..)
COMMENT 'Table comment'
PARTITIONED BY (... COMMENT 'Partition comment');

Send log information to the console

hive -hiveconf hive.root.logger=INFO,console

Hive creates a log file which defaults to /tmp/{user.name}/hive.log. Sometimes it is more useful to have verbose logging sent to the console. The configuration setting hive.root.logger controls the level of logging as well as the location. "hive.root.logger=INFO,console" means that the INFO level of logging should be used and the messages sent to the console instead of the log file. This configuration setting cannot be enabled dynamically (it cannot be turned on via SET). It is necessary to exit the shell and restart it with the -hiveconf option, or edit the hive-site.xml file.

Others

You may also do this to sort your table:

INSERT OVERWRITE TABLE tablename
SELECT * FROM tablename
ORDER BY col1 DESC;

Others

Sqoop is an open-source project supported by Cloudera for importing data into Hadoop. Sqoop automatically generates the necessary Java classes, then runs a MapReduce job to read data in parallel and creates a set of files in HDFS. Full documentation for Sqoop can be found at:
http://archive.cloudera.com/docs/sqoop/
