Here’s the previous example on Logistic Regression using Mahout.
Here’s my recent tryout of Mahout k-means. There are some key points I think it’s necessary to clarify first. Mahout k-means is mainly for text processing; if you need to process numerical data, you need to write some utility functions to convert the numerical data into sequence-vector format. For the standard “Reuters” example, the first few Mahout steps are actually doing this data processing.
To be explicit, for the Reuters example, the originally downloaded files are in SGML format, which is similar to XML. So we first need to parse (preprocess) those files into document-id and document-text pairs. After that we can convert the files into SequenceFiles. A SequenceFile is a key-value format: the key is the document id and the value is the document content. This step is done using ‘seqdirectory’. Then use ‘seq2sparse’ to do a tf-idf conversion of the id-text data into vectors (Vector Space Model: VSM).
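As a rough illustration of what the tf-idf step computes, here is a minimal sketch in plain Python (the corpus and tokenization are made up; Mahout’s seq2sparse differs in its weighting and normalization details):

```python
import math

def tf_idf(corpus):
    """Compute a simple tf-idf weight for every term in every document.
    corpus is a dict of {doc_id: list_of_tokens}."""
    n_docs = len(corpus)
    # document frequency: number of documents containing each term
    df = {}
    for tokens in corpus.values():
        for term in set(tokens):
            df[term] = df.get(term, 0) + 1
    vectors = {}
    for doc_id, tokens in corpus.items():
        tf = {}
        for term in tokens:
            tf[term] = tf.get(term, 0) + 1  # raw term frequency
        # weight each term frequency by its inverse document frequency
        vectors[doc_id] = {t: f * math.log(n_docs / df[t]) for t, f in tf.items()}
    return vectors

corpus = {"doc1": ["oil", "price", "oil"], "doc2": ["price", "trade"]}
print(tf_idf(corpus))
```

Note how “price”, which appears in every document, gets weight zero: that is the point of the idf factor, and it is why the resulting vectors are sparse.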
For the first preprocessing job, a much quicker way is to reuse the Reuters parser given in the Lucene benchmark JAR file.
Because it’s bundled along with Mahout, all you need to do is change to the examples/ directory under the Mahout source tree and run the org.apache.lucene.benchmark.utils.ExtractReuters class. For details, see chapter 8 of the book Mahout in Action. (http://manning.com/owen/MiA_SampleCh08.pdf)
The generated vectors dir should contain the following items:
We will then use tfidf-vectors to run k-means. You can give a ‘fake’ initial-centers path: since argument k is given, Mahout will automatically select k random points to initialize the clustering.
mahout-0.5-cdh3u5:$ ./bin/mahout kmeans -i reuters-vectors/tfidf-vectors/ -o mahout-clusters -c mahout-initial-centers -cd 0.1 -k 20 -x 10 -ow
The clustering results will look like this:
Holidays are always a good time to slow down the pace and do some reflection. So this time I’m trying to read things at random, not to solve any particular problem at hand, but just to enjoy the fruits of other researchers’ work. So here are some readings on MapReduce & machine learning.
Many machine learning algorithms fit Kearns’ Statistical Query Model:
Linear regression, k-means, Naive Bayes, SVM, EM, PCA, backprop. These can all be written (exactly) in a summation, which leads to a linear speedup in the number of processors.
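The “summation form” idea can be sketched for k-means: each partition of the data computes per-cluster partial sums and counts independently, and the new centroids come from simply adding the partial results, which is what gives the linear speedup. A toy sketch (the partitions and data here are made up for illustration):

```python
def partial_stats(points, centroids):
    """Map step: per-cluster (sum, count) for one data partition."""
    stats = {i: ([0.0] * len(centroids[0]), 0) for i in range(len(centroids))}
    for p in points:
        # assign to the nearest centroid (squared Euclidean distance)
        i = min(range(len(centroids)),
                key=lambda c: sum((a - b) ** 2 for a, b in zip(p, centroids[c])))
        s, n = stats[i]
        stats[i] = ([a + b for a, b in zip(s, p)], n + 1)
    return stats

def update_centroids(all_stats, centroids):
    """Reduce step: add partial sums across partitions, then divide by counts."""
    new = []
    for i, c in enumerate(centroids):
        total, count = [0.0] * len(c), 0
        for stats in all_stats:
            s, n = stats[i]
            total = [a + b for a, b in zip(total, s)]
            count += n
        new.append([x / count for x in total] if count else list(c))
    return new

partitions = [[(0.0, 0.0), (1.0, 0.0)], [(10.0, 10.0), (9.0, 10.0)]]
centroids = [(0.0, 0.0), (10.0, 10.0)]
stats = [partial_stats(p, centroids) for p in partitions]
print(update_centroids(stats, centroids))  # [[0.5, 0.0], [9.5, 10.0]]
```

Because the reduce step only adds (sum, count) pairs, the partitions can be processed on as many machines as you like, which is exactly the property the Statistical Query Model exploits.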
Some papers on Mapreduce:
– Map-Reduce for Machine Learning on Multicore
– Mapreduce: Distributed Computing for Machine Learning, 2006
– Large Language Models in Machine Translation
– Fast, easy, and cheap: construction of statistical machine translation models with mapreduce.
– Parallel implementations of word alignment tool
– Inducing Gazetteers for Named Entity Recognition by Large-scale Clustering of Dependency Relations
– Pairwise document similarity in Large Collections with Mapreduce
– Aligning needles in a Haystack: Paraphrase acquisition across the web
– Google news personalization: scalable online collaborative filtering(Assign users to clusters, and assign weights to stories based on the ratings of the users in that cluster)
Pig is a very powerful tool which builds upon hadoop.
Some tutorial links:
Here are some basic notes for using it:
1) Load a table from HDFS:
tmp_table = LOAD 'hdfs_data_file' USING PigStorage('\t') AS (key1:chararray, key2:int, key3:int);
2) Check out the entire table:
Mysql > SELECT * FROM TMP_TABLE;
Pig > DUMP tmp_table;
3) Check out the first 50 rows ( this will start a mapreduce job, so don’t do it if your table is very big):
Mysql > SELECT * FROM TMP_TABLE LIMIT 50;
Pig > tmp_table_limit = LIMIT tmp_table 50; DUMP tmp_table_limit;
4) Order Table > tmp_table_order = ORDER tmp_table BY age ASC;
5) Filter Table > tmp_table_where = FILTER tmp_table BY age > 20;
6) Inner Join
Mysql > SELECT * FROM TMP_TABLE A JOIN TMP_TABLE_2 B ON A.AGE=B.AGE;
Pig > tmp_table_inner_join = JOIN tmp_table BY age, tmp_table_2 BY age;
7) Left Join
Mysql> SELECT * FROM TMP_TABLE A LEFT JOIN TMP_TABLE_2 B ON A.AGE=B.AGE;
Pig > tmp_table_left_join = JOIN tmp_table BY age LEFT OUTER,tmp_table_2 BY age;
8) Right Join
Mysql > SELECT * FROM TMP_TABLE A RIGHT JOIN TMP_TABLE_2 B ON A.AGE=B.AGE;
Pig > tmp_table_right_join = JOIN tmp_table BY age RIGHT OUTER, tmp_table_2 BY age;
9) Cross Join
Mysql > SELECT * FROM TMP_TABLE,TMP_TABLE_2;
Pig > tmp_table_cross = CROSS tmp_table,tmp_table_2;
10) GROUP BY
Mysql > SELECT * FROM TMP_TABLE GROUP BY IS_MALE;
Pig > tmp_table_group = GROUP tmp_table BY is_male;
Mysql > SELECT IS_MALE,COUNT(*) FROM TMP_TABLE GROUP BY IS_MALE;
Pig > tmp_table_group_count = GROUP tmp_table BY is_male;
tmp_table_group_count = FOREACH tmp_table_group_count GENERATE group,COUNT($1);
11) DISTINCT
Mysql > SELECT DISTINCT IS_MALE FROM TMP_TABLE;
Pig > tmp_table_distinct = FOREACH tmp_table GENERATE is_male;
tmp_table_distinct = DISTINCT tmp_table_distinct;
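One thing worth remembering from items 10) and 11) is that Pig’s GROUP does not aggregate by itself: it produces one bag of rows per group value, and the COUNT happens in the FOREACH. A toy Python illustration of that semantics (not Pig internals; the rows are made up):

```python
def pig_group(rows, key):
    """Like GROUP rows BY key: one (group_value, bag_of_rows) pair per distinct key."""
    groups = {}
    for row in rows:
        groups.setdefault(row[key], []).append(row)
    return groups

rows = [{"name": "a", "is_male": 1}, {"name": "b", "is_male": 0},
        {"name": "c", "is_male": 1}]
grouped = pig_group(rows, "is_male")  # like: GROUP tmp_table BY is_male
# like: FOREACH ... GENERATE group, COUNT($1)
counts = {g: len(bag) for g, bag in grouped.items()}
print(counts)  # {1: 2, 0: 1}
```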
Here are some links if you want to highlight the pig script:
just follow the directions to add one line of code to your .emacs file
If you have the permission to change the system configuration, you can copy the pig.vim to the system </usr/share/vim/vimXX/syntax> folder, where you will find there are already a lot of .vim files there.
Otherwise you can also do it simply in your .vimrc file (make sure you have a .vimrc in your home directory; if not, create one) and put down the following lines in it:
filetype on
syntax on
au BufRead,BufNewFile *.pig set filetype=pig "Create a new filetype in your vimrc file.
au! Syntax pig source your-path-to-pig.vim "An entry to read your syntax file
Since I can’t ask the administrator to configure any particular setting on the cluster nodes, I found the simplest way is to compile the code locally, use Hadoop streaming, and ship all the shared libraries you need together with the mapper (the compiled executable) to the nodes. You can write a bash script as the mapper, which configures the library path and wraps the executable inside it. Below are some websites talking about MapReduce in C++, which is actually very nice but may need more configuration.
JavaCV : nice for using opencv http://code.google.com/p/javacv/
Very nice example(Mapper with Second key): http://code.google.com/p/hadoop-stream-mapreduce/
* hadoop compression
There are several tools like
DEFLATE (file extension will be .deflate); this is what we often see
LZO (optimize for speed)
There are also different options: -1 means optimize for speed and -9 means optimize for space, e.g. gzip -1 file
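The speed/space tradeoff is easy to see with Python’s zlib, which uses the same DEFLATE algorithm (levels 1 and 9 correspond to gzip -1 and gzip -9; the payload below is made up):

```python
import time
import zlib

data = b"some repetitive log line payload " * 100000

for level in (1, 9):
    start = time.time()
    compressed = zlib.compress(data, level)
    elapsed = time.time() - start
    # level 1 finishes faster; level 9 produces the smaller output
    print(f"level {level}: {len(compressed)} bytes in {elapsed:.4f}s")
```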
Another way of doing compression is to use ‘Codecs’
A codec is the implementation of a compression-decompression algorithm. In Hadoop, a codec is represented by an implementation of the CompressionCodec interface.
For performance, it is recommended to use the native library for compression and decompression: DEFLATE, gzip, LZO.
If you are using a native library and doing a lot of compression or decompression in your application, consider using “CodecPool”.
To compress only the mapper output, an example (which also chooses the codec type):
In Hadoop, there are two basic data types:
WritableComparable (base interface for keys)
Writable (base interface for values). Writable is a general-purpose wrapper for the following: Java primitives, String, enum, Writable, null, or arrays of any of these types, e.g.
- IntWritable (Int)
- LongWritable (Long)
Text is a Writable for UTF-8 sequences. It can be thought of as the Writable equivalent of java.lang.String. Text is a replacement for the UTF8 class. The length of a String is the number of char code units it contains, while the length of a Text object is the number of bytes in its UTF-8 encoding. You could basically treat everything as Text and parse it yourself if it actually has very complicated content. But if you are working with int values, using Text will cost you more time.
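The char-count versus byte-count distinction is the same one you see in Python when encoding a string as UTF-8, so the Text length can be larger than the String length for non-ASCII data:

```python
s = "caf\u00e9"                 # 'café': 4 characters
print(len(s))                   # 4 -- like java.lang.String.length()
print(len(s.encode("utf-8")))   # 5 -- like Text.getLength(): 'é' takes 2 bytes in UTF-8
```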
NullWritable is a special type of Writable, as it has a zero-length serialization. No bytes are written to, or read from, the stream.
There are four Writable collection types in the org.apache.hadoop.io package: ArrayWritable, TwoDArrayWritable, MapWritable, and SortedMapWritable. ArrayWritable and TwoDArrayWritable are Writable implementations for arrays and two-dimensional arrays (arrays of arrays) of Writable instances. All the elements of an ArrayWritable or a TwoDArrayWritable must be instances of the same class, which is specified at construction.
The hadoop fs command has a -text option to display sequence files in textual form, e.g.
%hadoop fs -text number.seq | head
* TIME OUT PROBLEM
Very new to HBase. Just had the time-out exception problem, since I am looping through a 300×300 image in the mapper. The exception means that the ‘next’ call in the mapper is taking too long between invocations.
org.apache.hadoop.hbase.client.ScannerTimeoutException: 556054ms passed since the last invocation, timeout is currently set to 300000
One solution would be to increase the timeout limit <hbase.rpc.timeout>
* Understand HADOOP_CLASSPATH
In the very beginning, I thought it was the folder path. However, it turns out to be the exact jar file path. What I ended up doing is to have a bash loop in my .bash_profile to automatically add each one:
jfs=$(ls /home/username/mylib/*.jar)
for jf in $jfs; do
    # echo "$jf"
    export HADOOP_CLASSPATH=$HADOOP_CLASSPATH:"$jf"
done
* DIFFERENCE of a typical HADOOP job vs. HBASE-HADOOP job
For hadoop job, to set the mapper and reducer classes (input from hdfs, output to hdfs):
The input/output HDFS paths will also need to be set, e.g. ‘FileOutputFormat.setOutputPath’.
Mapper and Reducer classes extend ‘org.apache.hadoop.mapreduce.Mapper’ and ‘org.apache.hadoop.mapreduce.Reducer’.
For hbase-hadoop job, to set the mapper and reducer classes(input from htable, output to htable):
TableMapReduceUtil.initTableMapperJob("hbase-input-table-name", scan, hMapper.class, OneKindWritable.class, OneKindWritable.class, job);
TableMapReduceUtil.initTableReducerJob("hbase-output-table-name", hReducer.class, job);
Mapper and Reducer classes extend ‘org.apache.hadoop.hbase.mapreduce.TableMapper’ and ‘org.apache.hadoop.hbase.mapreduce.TableReducer’.
The output table should be created before launching the job, with the corresponding column family and qualifier names as used in your code. For the input part, you can set up certain filters on the ‘scan’ and add input columns with family name and qualifier.
A nice thing is that you can mix those settings, so you can read data from HDFS and output to HBase, or read data from HBase and output to HDFS.
efficient hadoop : http://www.cloudera.com/blog/2009/05/10-mapreduce-tips/
The cache-file option provides a good way of using AWS Elastic MapReduce when you have extra data (rather than input data, which will be fed via stdin to the mapper), such as a parameter file or other kinds of information. You can also use gzipped input, with an extra argument letting Hadoop decompress the data on the fly before passing it to the mapper: -jobconf stream.recordreader.compression=gzip. Here’s an example of how to specify the cache-file in boto:
mapper = 's3://<your-code-bucket>/mapper.py'
reducer = 's3://<your-code-bucket>/reducer.py'
input_mr = 's3://<your-input-data-bucket>'
output_mr = 's3://<your-output-bucket>' + job_name
step_args = ['-jobconf', 'mapred.reduce.tasks=1',
             '-jobconf', 'mapred.map.tasks=2',
             '-jobconf', 'stream.recordreader.compression=gzip']
cache_files = ['s3://<your-cache-file-bucket>/randomForest-model-1.txt#rf1.txt',
               's3://<your-cache-file-bucket>/randomForest-model-1.txt#rf2.txt']
step = StreamingStep(name="my-step", mapper=mapper, reducer=reducer,
                     input=input_mr, output=output_mr,
                     step_args=step_args, cache_files=cache_files)
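In the mapper, each cache file shows up in the task’s working directory under the name after the ‘#’ in its spec, so the mapper just opens that local name. A small sketch of that naming convention (the helper function here is hypothetical, not part of boto):

```python
def cache_local_name(spec):
    """Return the local symlink name for a cache-file spec like
    's3://bucket/model.txt#rf1.txt' (falls back to the base file name)."""
    return spec.split("#", 1)[1] if "#" in spec else spec.rsplit("/", 1)[-1]

print(cache_local_name("s3://my-bucket/randomForest-model-1.txt#rf1.txt"))  # rf1.txt

# inside the streaming mapper you would then do something like:
# with open(cache_local_name(spec)) as f:
#     model = f.read()
```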