Holidays are always a good time to slow down and do some reflection. So this time I'm trying to read things at random, not to solve any problem at hand, but just to enjoy the fruits of other researchers' work. Here are some readings on MapReduce and machine learning.
Many machine learning algorithms fit Kearns' Statistical Query Model:
Linear regression, k-means, Naive Bayes, SVM, EM, PCA, backprop. These can all be written (exactly) in summation form, which leads to a linear speedup in the number of processors.
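As a toy illustration (plain Python, not an actual MapReduce job), the summation form means each mapper computes partial sufficient statistics over its own data shard, and the reducer just adds them up; here for the mean, the simplest such statistic:

```python
from functools import reduce

def mapper(shard):
    # Each mapper emits a partial (sum, count) over its data shard.
    return (sum(shard), len(shard))

def reducer(a, b):
    # The reducer just adds partial statistics -- this is why the
    # summation form gives a linear speedup in the number of processors.
    return (a[0] + b[0], a[1] + b[1])

shards = [[1.0, 2.0], [3.0, 4.0], [5.0]]   # data split across 3 "machines"
total, count = reduce(reducer, map(mapper, shards))
mean = total / count
```

The same pattern covers the algorithms above: only the per-shard statistics change (e.g. X^T X and X^T y for linear regression).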
Some papers on Mapreduce:
– Map-Reduce for Machine Learning on Multicore
– MapReduce: Distributed Computing for Machine Learning, 2006
– Large Language Models in Machine Translation
– Fast, Easy, and Cheap: Construction of Statistical Machine Translation Models with MapReduce
– Parallel Implementations of Word Alignment Tool
– Inducing Gazetteers for Named Entity Recognition by Large-scale Clustering of Dependency Relations
– Pairwise Document Similarity in Large Collections with MapReduce
– Aligning Needles in a Haystack: Paraphrase Acquisition Across the Web
– Google News Personalization: Scalable Online Collaborative Filtering (assign users to clusters, and assign weights to stories based on the ratings of the users in that cluster)
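A minimal sketch of that clustering idea (hypothetical users, stories, and ratings; plain Python): a user's score for a story is the average rating given by other users in the same cluster.

```python
# Toy cluster-based collaborative filtering.
user_cluster = {"alice": 0, "bob": 0, "carol": 1}
ratings = {                       # (user, story) -> rating
    ("alice", "s1"): 5, ("alice", "s2"): 1,
    ("bob",   "s1"): 4, ("carol", "s1"): 2,
}

def cluster_score(user, story):
    # Average the ratings of the user's cluster-mates for this story.
    peers = [u for u, c in user_cluster.items()
             if c == user_cluster[user] and u != user]
    vals = [ratings[(u, story)] for u in peers if (u, story) in ratings]
    return sum(vals) / len(vals) if vals else 0.0

score = cluster_score("bob", "s1")   # alice is bob's only cluster-mate
```

The real system does this at scale, with the clustering itself (e.g. MinHash/PLSI in the paper) computed via MapReduce.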
Pig is a very powerful tool built on top of Hadoop.
Some tutorial links:
Here are some basic notes for using it:
1) Load a table:
Pig > tmp_table = LOAD 'hdfs_data_file' USING PigStorage('\t') AS (key1:chararray, key2:int, key3:int);
2) Check out the entire table:
Mysql > SELECT * FROM TMP_TABLE;
Pig > DUMP tmp_table;
3) Check out the first 50 rows (this will start a MapReduce job, so don't do it if your table is very big):
Mysql > SELECT * FROM TMP_TABLE LIMIT 50;
Pig > tmp_table_limit = LIMIT tmp_table 50; DUMP tmp_table_limit;
4) Order table:
Pig > tmp_table_order = ORDER tmp_table BY age ASC;
5) Filter table:
Pig > tmp_table_where = FILTER tmp_table BY age > 20;
6) Inner Join
Mysql > SELECT * FROM TMP_TABLE A JOIN TMP_TABLE_2 B ON A.AGE=B.AGE;
Pig > tmp_table_inner_join = JOIN tmp_table BY age, tmp_table_2 BY age;
7) Left Join
Mysql> SELECT * FROM TMP_TABLE A LEFT JOIN TMP_TABLE_2 B ON A.AGE=B.AGE;
Pig > tmp_table_left_join = JOIN tmp_table BY age LEFT OUTER,tmp_table_2 BY age;
8) Right Join
Mysql > SELECT * FROM TMP_TABLE A RIGHT JOIN TMP_TABLE_2 B ON A.AGE=B.AGE;
Pig > tmp_table_right_join = JOIN tmp_table BY age RIGHT OUTER, tmp_table_2 BY age;
9) Cross Join
Mysql > SELECT * FROM TMP_TABLE,TMP_TABLE_2;
Pig > tmp_table_cross = CROSS tmp_table,tmp_table_2;
10) GROUP BY
Mysql > SELECT * FROM TMP_TABLE GROUP BY IS_MALE;
Pig > tmp_table_group = GROUP tmp_table BY is_male;
Mysql > SELECT IS_MALE,COUNT(*) FROM TMP_TABLE GROUP BY IS_MALE;
Pig > tmp_table_group_count = GROUP tmp_table BY is_male;
tmp_table_group_count = FOREACH tmp_table_group_count GENERATE group,COUNT($1);
11) DISTINCT
Mysql > SELECT DISTINCT IS_MALE FROM TMP_TABLE;
Pig > tmp_table_distinct = FOREACH tmp_table GENERATE is_male;
tmp_table_distinct = DISTINCT tmp_table_distinct;
Here are some links if you want syntax highlighting for Pig scripts:
For Emacs, just follow the directions to add one line of code to your .emacs file.
For Vim, if you have permission to change the system configuration, you can copy pig.vim into the system /usr/share/vim/vimXX/syntax folder, where you will find there are already a lot of .vim files.
Otherwise you can also do it simply in your .vimrc file (make sure you have a .vimrc in your home directory; if not, create one) by putting the following lines in it:
filetype on
syntax on
au BufRead,BufNewFile *.pig set filetype=pig  "Create a new filetype in your vimrc file.
au! Syntax pig source your-path-to-pig.vim  "An entry to read your syntax file
* Hadoop compression
There are several compression tools:
– DEFLATE (file extension will be .deflate); this is what we often see
– gzip (file extension .gz)
– LZO (optimized for speed)
gzip also offers different options: -1 means optimize for speed and -9 means optimize for space, e.g. gzip -1 file
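The same speed-vs-space trade-off is visible from Python's gzip module, where compresslevel plays the role of the -1/-9 flag:

```python
import gzip

data = b"hadoop " * 10_000                      # highly repetitive sample data

fast  = gzip.compress(data, compresslevel=1)    # optimize for speed
small = gzip.compress(data, compresslevel=9)    # optimize for space

# both round-trip back to the original bytes
assert gzip.decompress(fast) == data
assert gzip.decompress(small) == data
```

On repetitive input like this, level 9 produces output no larger than level 1, at the cost of more CPU time.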
Another way of doing compression is to use ‘Codecs’
A codec is the implementation of a compression-decompression algorithm. In Hadoop, a codec is represented by an implementation of the CompressionCodec interface.
For performance, it is recommended to use the native libraries for compression and decompression: DEFLATE, gzip, LZO.
If you are using a native library and doing a lot of compression or decompression in your application, consider using CodecPool.
To compress only the mapper output (and also choose the codec type), an example:
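A sketch of what such a configuration looks like for a streaming job, using the classic (pre-Hadoop-2.x) property names; the codec class here is one possible choice, not the only one:

```
-jobconf mapred.compress.map.output=true
-jobconf mapred.map.output.compression.codec=org.apache.hadoop.io.compress.GzipCodec
```

The reducer input is decompressed transparently, so only intermediate shuffle traffic shrinks; the job's final output stays uncompressed.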
In Hadoop, there are two basic data types:
WritableComparable (base interface for keys)
Writable (base interface for values). Writable is a general-purpose wrapper for the following: Java primitives, String, enum, Writable, null, or arrays of any of these types, e.g.
- IntWritable (Int)
- LongWritable (Long)
Text is a Writable for UTF-8 sequences. It can be thought of as the Writable equivalent of java.lang.String. Text is a replacement for the UTF8 class. The length of a String is the number of char code units it contains, while the length of a Text object is the number of bytes in its UTF-8 encoding. You could basically treat everything as Text and parse it yourself if the content is actually very complicated. But if you are dealing with int values, Text will cost you more time than IntWritable.
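The String-length vs Text-length distinction is easy to demonstrate in Python, counting characters vs UTF-8 bytes:

```python
s = "caf\u00e9"                          # "café" -- 4 characters

chars = len(s)                           # like String.length(): 4 code units
utf8_bytes = len(s.encode("utf-8"))      # like Text length: 5 bytes ('é' takes 2)
```

So indexing into a Text object by byte offset is not the same as indexing into a String by character position once you leave ASCII.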
NullWritable is a special type of Writable, as it has a zero-length serialization. No bytes are written to, or read from, the stream.
There are four Writable collection types in the org.apache.hadoop.io package: ArrayWritable, TwoDArrayWritable, MapWritable, and SortedMapWritable. ArrayWritable and TwoDArrayWritable are Writable implementations for arrays and two-dimensional arrays (arrays of arrays) of Writable instances. All the elements of an ArrayWritable or a TwoDArrayWritable must be instances of the same class, which is specified at construction.
The hadoop fs command has a -text option to display sequence files in textual form, e.g.
%hadoop fs -text number.seq | head
The cache-file option provides a good way of using AWS Elastic MapReduce when you have extra data beyond the input data (input data is fed to the mapper via stdin), such as a parameter file or other kinds of information. You can also use gzipped input and let Hadoop decompress the data on the fly before passing it to the mapper, via the extra argument -jobconf stream.recordreader.compression=gzip. Here's an example of how to specify cache files in boto:
mapper = 's3://<your-code-bucket>/mapper.py'
reducer = 's3://<your-code-bucket>/reducer.py'
input_mr = 's3://<your-input-data-bucket>'
output_mr = 's3://<your-output-bucket>' + job_name
step_args = ['-jobconf', 'mapred.reduce.tasks=1',
             '-jobconf', 'mapred.map.tasks=2',
             '-jobconf', 'stream.recordreader.compression=gzip']
cache_files = ['s3://<your-cache-file-bucket>/randomForest-model-1.txt#rf1.txt',
               's3://<your-cache-file-bucket>/randomForest-model-2.txt#rf2.txt']
step = StreamingStep(name='my-step',
                     mapper=mapper,
                     reducer=reducer,
                     input=input_mr,
                     output=output_mr,
                     step_args=step_args,
                     cache_files=cache_files)
Still having issues using rpy2 in userdata sent to an EC2 instance. Everything works well when opening a terminal on the instance created from my own image, but it fails when sending userdata via boto. Tried several tests of rpy2, like python -m 'rpy2.tests', which went through fine, but it still doesn't work.
Weird error like:
RRuntimeError: "<no args>"
  File "/usr/local/lib/python2.7/dist-packages/rpy2-2.2.4dev_20111122-py2.7-linux-x86_64.egg/rpy2/robjects/__init__.py", line 225, in __call__
    res = self.eval(p)
  File "/usr/local/lib/python2.7/dist-packages/rpy2-2.2.4dev_20111122-py2.7-linux-x86_64.egg/rpy2/robjects/functions.py", line 82, in __call__
    return super(SignatureTranslatedFunction, self).__call__(*args, **kwargs)
  File "/usr/local/lib/python2.7/dist-packages/rpy2-2.2.4dev_20111122-py2.7-linux-x86_64.egg/rpy2/robjects/functions.py", line 34, in __call__
    res = super(Function, self).__call__(*new_args, **new_kwargs)
Below are just some random links for the record.
Amazon Web Service: http://aws.amazon.com/elasticmapreduce/
Get started with Amazon EC2 using the boto library for Python: http://aws.amazon.com/articles/3998?_encoding=UTF8&jiveRedirect=1
Documentation of Boto, online at http://boto.cloudhackers.com
Cloud vs AMR:
Best Cloud Apps and Service: https://sites.google.com/site/truthkos/old-pages/ci-days-research-in-the-cloud/favoritecloudapps
Create Amazon Machine Image:
Launch the instance and save the key pair:
Ran into a serious problem with Hadoop MapReduce on compressed files. It seems to take longer to process, and when the data is decompressed and processed sequentially, it kind of explodes and crashes the nodes.
Here’s what I found:
If the file is compressed with a non-splittable codec such as gzip, it cannot be split and would need to be processed by a single node (effectively destroying the advantage of running MapReduce over a cluster of parallel machines).
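One common workaround (a sketch, assuming GNU gzip and bzip2 are installed, and hypothetical file names) is to recompress the input with bzip2, which Hadoop can split:

```shell
# Create a small gzipped sample, then recompress it with bzip2 so Hadoop
# can split the file across mappers (gzip streams are not splittable).
printf 'a\nb\nc\n' | gzip -c > input.txt.gz
gunzip -c input.txt.gz | bzip2 -c > input.txt.bz2
```

Alternatives are to pre-split the raw data into block-sized chunks before compressing each chunk, or to use a container format like SequenceFiles with block compression.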