カタカタブログ

SIerで働くITエンジニアがカタカタした記録を残す技術ブログ。Java, Oracle Database, Linuxが中心です。たまにRuby on Railsなども。

RubyでHadoop Streamingを動かしてみる

mHadoopでMapReduceジョブを実行するには最近はHiveを使うのが一般的だが、MapReduceを手軽に使うための方法としてHadoop Streamingがある。
これは標準入出力を利用してMapReduceジョブを実行できるというもので、Javaで複雑なコードを実装せずに手軽に試せるので便利。
Hiveばかり使っているとMapReduceジョブのレイヤーがブラックボックスとなってしまうけど、MapperやReducerの動き自体を理解する上でもわかりやすいので、今回はこれを試してみる。

ちなみに環境はCDH 5.5の入った4ノードのクラスタ環境で試す。

Hadoop Streamingを使うときはMapperとReducerのそれぞれの処理を実装する実行可能なファイルを用意する。
ここで、MapperやReducerは入力データを標準入力から受け取り、結果を標準出力するように実装するのがポイントとなる。
そのため、標準入出力さえできればなんだってよいので、perlだろうがシェルスクリプトでもHadoopが動かせる。便利。今回はRubyを使うことにする。

WordCount

分散処理といえばまずはWordCountを実装するのがならわしのようなので、文字数カウント処理をHadoop Streamingを使って実装してみる。
WordCountは入力にスペース区切りの単語が並んだテキストを持ち、それらのテキストごとの出現数を算出する。

Mapperを作る

まずはmapperを作る。

wc_map.rb

#!/usr/bin/ruby
def wc_map(line)
 line.chomp.split.map { |e| [e, 1] }
end

def output(records)
 records.each do |key, value|
  puts "#{key}\t#{value}"
 end
end

while l = STDIN.gets
 output(wc_map(l))
end

標準入力から1行ずつテキストを受け取り、それをスペースで分割しつつ(単語, 1)という配列を作る。
最後にそれらをタブ区切りで標準出力する。

まずは単体で実行してみる。

$ cat sample.txt
aa bb cc
bb cc dd
cc ee ff
$ cat sample.txt | ./wc_map.r
aa   1
bb   1
cc   1
bb   1
cc   1
dd   1
cc   1
ee   1
ff   1

期待どおりの結果がえられたので、次はこれ(実際はソートもされる)を標準入力から受け取り、単語ごとに集計した結果を標準出力するReducerを作る。

Reducerを作る

wc_reduce.rb

#!/usr/bin/ruby
def wc_reduce(line, results)
 key, value = line.chomp.strip.split
 results[key] += value.to_i
end

results = Hash.new(0)
while l = STDIN.gets
 wc_reduce(l, results)
end
p results

標準入力が単語、数値のタブ区切りなのでそれを分割し、単語をキーとするハッシュを作り、バリューの部分を足しあわせていく。
最後にハッシュを標準出力して終わり。

MapperとReducerができたので、これらをパイプで組み合わせて以下のようにテストする。Hadoop Streamingで分散する場合も同じように処理されるので、これが通ればプログラムレベルではHadoop Streamingで動く。

$ cat sample.txt | ./wc_map.rb | sort | ./wc_reduce.rb
{"ff"=>1, "cc"=>3, "ee"=>1, "bb"=>2, "dd"=>1, "aa"=>1}

うまく動いているよう!

Hadoop Streamingで実行

MapperとReducerができたので、これをHadoop Streamingで分散処理させてみる。
以下のようなコマンドで実行する。入力となるサンプルファイルはHDFS上に配置した上でそのパスを指定する。また、-fileで作成したMapperとRedcuerを指定する。

$ hadoop jar CDH-5.5.1-1.cdh5.5.1.p1168.923/lib/hadoop-0.20-mapreduce/contrib/streaming/hadoop-streaming-mr1.jar -input /user/mapred/example/stream/sample.txt -output /user/mapred/example/stream/wc_result -mapper wc_map.rb -reducer wc_reduce.rb -file wc_map.rb wc_reduce.rb
16/06/08 17:13:33 WARN streaming.StreamJob: -file option is deprecated, please use generic option -files instead.
packageJobJar: [wc_map.rb, wc_reduce.rb] [CDH-5.5.2-1.cdh5.5.2.p0.4/jars/hadoop-streaming-2.6.0-cdh5.5.2.jar] /tmp/streamjob8479180127713306959.jar tmpDir=null
16/06/08 17:13:34 INFO client.ConfiguredRMFailoverProxyProvider: Failing over to rm16
16/06/08 17:13:34 INFO mapred.FileInputFormat: Total input paths to process : 1
16/06/08 17:13:35 INFO mapreduce.JobSubmitter: number of splits:2
16/06/08 17:13:35 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1465349429352_0013
16/06/08 17:13:35 INFO impl.YarnClientImpl: Submitted application application_1465349429352_0013
16/06/08 17:13:35 INFO mapreduce.Job: The url to track the job: http://myhadoop04:8088/proxy/application_1465349429352_0013/
16/06/08 17:13:35 INFO mapreduce.Job: Running job: job_1465349429352_0013
16/06/08 17:13:39 INFO mapreduce.Job: Job job_1465349429352_0013 running in uber mode : false
16/06/08 17:13:39 INFO mapreduce.Job: map 0% reduce 0%
(・・・略)
16/06/08 17:13:48 INFO streaming.StreamJob: Output directory: /user/mapred/example/stream/wc_result

正常にMapReduceジョブとして実行できた。結果はHDFSのoutputオプションで指定したパスに出力されているので内容を確認する。

$ hadoop fs -cat /user/mapred/example/stream/wc_result/part-00000
{"ff"=>1, "cc"=>3, "ee"=>1, "bb"=>2, "dd"=>1, "aa"=>1}

さきほどコマンドラインでやったのと同じく、正しい結果が得られた!

分散データファイル生成

定型的なWordCountをHadoop Streamingで動かすことができた。次は、これを応用してランダムな文字列からなるファイル生成を分散させてやってみる。
こういった処理はHiveとかだとクエリが書きづらいので、Hadoop Streamingでやると簡単に書ける。

Mapperを作る

Hadoopの負荷分散の仕組みは、Mapperからの入力ごとにタスクを切り出して行うため、複数行の入力をあたえる必要がある。
そこで、今回は入力に各行に生成したランダムファイルの行数と行の長さが書かれたデータを想定する。
例えば、以下のような入力を与えると、10文字のランダム文字列が5行というファイルが2つ作られるような仕様とする。

5   10
5   10

Mapperの実装は以下。

gendata_map.rb

#!/usr/bin/ruby

def random_string(length)
 o = ("a".."z").to_a
 (0..length).map { o[rand(o.length)] }.join
end

def gendata_map(size, length)
 size.times do |i|
    puts random_string(length)
 end
end

# STDIN row : <row num> <row length>
STDIN.each_line do |line|
 size, length = line.chomp.split.map(&:to_i)
 size = 5 unless size
 length = 5 unless length
 gendata_map(size, length)
end

指定した長さのランダム文字列を作り、それを行数分だけ繰り返す。
ここでMapperのみでコマンドラインで単体テストを実施しておく。Hadoop StreamingはMapperやReducer単位で単体テストしやすいのがよい。

$ cat test.txt
10 2
10 2
10 2
$ cat test.txt | ruby gendata_map.rb
fkzuvyzib
lsvymrrlz
ecvtpsplx
wawdxfeps
lkstptsva
metunykpj

Mapperは意図どおりに動いているよう。

Reducerは不要

今回の処理はMapperだけで処理が完結しているので、このようなプログラムの場合はReducerは不要となる。

Hadoop Streamingで実行

WordCountのときと同じように実行する。ReducerがないのでReduceタスクは不要となる。
また、今回は行ごとに処理を分割したいので、map数とreducer数をオプションで明示して実行する(3行をそれぞれ別のMapperで処理させたいのでMap数は3とする)。

$ hadoop jar hadoop-0.20-mapreduce/contrib/streaming/hadoop-streaming-mr1.jar \
-D mapred.map.tasks=3 \
-D mapred.reduce.tasks=0 \
-input /user/mapred/example/gendata/input/input.txt -output/user/mapred/example/gendata/output -mapper gendata_map.rb -file gendata_map.rb

正常に処理が完了し、outputディレクトリを確認すると3つのMapが生成したそれぞれのファイルが作成されている。

$ hadoop fs -ls /user/mapred/example/gendata/output | head
Found 10001 items
-rw-r--r-- 3 mapred hadoop     0 2016-06-13 15:59 /user/mapred/example/gendata/output/_SUCCESS
-rw-r--r-- 3 mapred hadoop 20600000 2016-06-13 15:47 /user/mapred/example/gendata/output/part-00000
-rw-r--r-- 3 mapred hadoop 10300000 2016-06-13 15:47 /user/mapred/example/gendata/output/part-00001
-rw-r--r-- 3 mapred hadoop 10300000 2016-06-13 15:47 /user/mapred/example/gendata/output/part-00002

内容は以下のようになっている。

$ hadoop fs -cat /user/mapred/example/gendata/output/part-00000
dcvvxbxyu
fnijosqtz
$ hadoop fs -cat /user/mapred/example/gendata/output/part-00001
ocndfuctp
limwdnvke
$ hadoop fs -cat /user/mapred/example/gendata/output/part-00002
mdkewafhj
bcekxzcns

今回はReducerで集約していないので、最終的な出力結果はMapperそれぞれが出力したファイルが並ぶことになるが、Hive等でテーブルにする場合はこれで問題ない。
大量ランダムデータ生成のような処理をHadoopで分散処理させたい場合はHadoop Streamingを使うと便利な気がする。

まとめ

Hadoop Streamingを、簡単なRubyスクリプトで動かしてみた。MapReduceをJavaで実装するのは面倒だけど、Hiveではクエリが書きづらい、といった場合に、標準入出力のみ意識したコードをかけばHadoopクラスタで分散処理できるため、便利な機能だと思う。
また、Hadoop入門者がMapReduceの動きを理解する上で、コマンドラインとパイプでMapperとReducerそれぞれの処理のイメージがつかめるので、勉強用でもいいと思う。

以上!

Hadoop関連書籍(過去に読んだ本)

Hadoop 第3版

Hadoop 第3版

Hadoop徹底入門 第2版 オープンソース分散処理環境の構築

Hadoop徹底入門 第2版 オープンソース分散処理環境の構築