RubyでHadoop Streamingを動かしてみる
mHadoopでMapReduceジョブを実行するには最近はHiveを使うのが一般的だが、MapReduceを手軽に使うための方法としてHadoop Streamingがある。
これは標準入出力を利用してMapReduceジョブを実行できるというもので、Javaで複雑なコードを実装せずに手軽に試せるので便利。
Hiveばかり使っているとMapReduceジョブのレイヤーがブラックボックスとなってしまうけど、MapperやReducerの動き自体を理解する上でもわかりやすいので、今回はこれを試してみる。
ちなみに環境はCDH 5.5の入った4ノードのクラスタ環境で試す。
Hadoop Streamingを使うときはMapperとReducerのそれぞれの処理を実装する実行可能なファイルを用意する。
ここで、MapperやReducerは入力データを標準入力から受け取り、結果を標準出力するように実装するのがポイントとなる。
そのため、標準入出力さえできればなんだってよいので、perlだろうがシェルスクリプトでもHadoopが動かせる。便利。今回はRubyを使うことにする。
WordCount
分散処理といえばまずはWordCountを実装するのがならわしのようなので、文字数カウント処理をHadoop Streamingを使って実装してみる。
WordCountは入力にスペース区切りの単語が並んだテキストを持ち、それらのテキストごとの出現数を算出する。
Mapperを作る
まずはmapperを作る。
wc_map.rb
#!/usr/bin/ruby def wc_map(line) line.chomp.split.map { |e| [e, 1] } end def output(records) records.each do |key, value| puts "#{key}\t#{value}" end end while l = STDIN.gets output(wc_map(l)) end
標準入力から1行ずつテキストを受け取り、それをスペースで分割しつつ(単語, 1)という配列を作る。
最後にそれらをタブ区切りで標準出力する。
まずは単体で実行してみる。
$ cat sample.txt aa bb cc bb cc dd cc ee ff $ cat sample.txt | ./wc_map.r aa 1 bb 1 cc 1 bb 1 cc 1 dd 1 cc 1 ee 1 ff 1
期待どおりの結果がえられたので、次はこれ(実際はソートもされる)を標準入力から受け取り、単語ごとに集計した結果を標準出力するReducerを作る。
Reducerを作る
wc_reduce.rb
#!/usr/bin/ruby def wc_reduce(line, results) key, value = line.chomp.strip.split results[key] += value.to_i end results = Hash.new(0) while l = STDIN.gets wc_reduce(l, results) end p results
標準入力が単語、数値のタブ区切りなのでそれを分割し、単語をキーとするハッシュを作り、バリューの部分を足しあわせていく。
最後にハッシュを標準出力して終わり。
MapperとReducerができたので、これらをパイプで組み合わせて以下のようにテストする。Hadoop Streamingで分散する場合も同じように処理されるので、これが通ればプログラムレベルではHadoop Streamingで動く。
$ cat sample.txt | ./wc_map.rb | sort | ./wc_reduce.rb {"ff"=>1, "cc"=>3, "ee"=>1, "bb"=>2, "dd"=>1, "aa"=>1}
うまく動いているよう!
Hadoop Streamingで実行
MapperとReducerができたので、これをHadoop Streamingで分散処理させてみる。
以下のようなコマンドで実行する。入力となるサンプルファイルはHDFS上に配置した上でそのパスを指定する。また、-fileで作成したMapperとRedcuerを指定する。
$ hadoop jar CDH-5.5.1-1.cdh5.5.1.p1168.923/lib/hadoop-0.20-mapreduce/contrib/streaming/hadoop-streaming-mr1.jar -input /user/mapred/example/stream/sample.txt -output /user/mapred/example/stream/wc_result -mapper wc_map.rb -reducer wc_reduce.rb -file wc_map.rb wc_reduce.rb 16/06/08 17:13:33 WARN streaming.StreamJob: -file option is deprecated, please use generic option -files instead. packageJobJar: [wc_map.rb, wc_reduce.rb] [CDH-5.5.2-1.cdh5.5.2.p0.4/jars/hadoop-streaming-2.6.0-cdh5.5.2.jar] /tmp/streamjob8479180127713306959.jar tmpDir=null 16/06/08 17:13:34 INFO client.ConfiguredRMFailoverProxyProvider: Failing over to rm16 16/06/08 17:13:34 INFO mapred.FileInputFormat: Total input paths to process : 1 16/06/08 17:13:35 INFO mapreduce.JobSubmitter: number of splits:2 16/06/08 17:13:35 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1465349429352_0013 16/06/08 17:13:35 INFO impl.YarnClientImpl: Submitted application application_1465349429352_0013 16/06/08 17:13:35 INFO mapreduce.Job: The url to track the job: http://myhadoop04:8088/proxy/application_1465349429352_0013/ 16/06/08 17:13:35 INFO mapreduce.Job: Running job: job_1465349429352_0013 16/06/08 17:13:39 INFO mapreduce.Job: Job job_1465349429352_0013 running in uber mode : false 16/06/08 17:13:39 INFO mapreduce.Job: map 0% reduce 0% (・・・略) 16/06/08 17:13:48 INFO streaming.StreamJob: Output directory: /user/mapred/example/stream/wc_result
正常にMapReduceジョブとして実行できた。結果はHDFSのoutputオプションで指定したパスに出力されているので内容を確認する。
$ hadoop fs -cat /user/mapred/example/stream/wc_result/part-00000 {"ff"=>1, "cc"=>3, "ee"=>1, "bb"=>2, "dd"=>1, "aa"=>1}
さきほどコマンドラインでやったのと同じく、正しい結果が得られた!
分散データファイル生成
定型的なWordCountをHadoop Streamingで動かすことができた。次は、これを応用してランダムな文字列からなるファイル生成を分散させてやってみる。
こういった処理はHiveとかだとクエリが書きづらいので、Hadoop Streamingでやると簡単に書ける。
Mapperを作る
Hadoopの負荷分散の仕組みは、Mapperからの入力ごとにタスクを切り出して行うため、複数行の入力をあたえる必要がある。
そこで、今回は入力に各行に生成したランダムファイルの行数と行の長さが書かれたデータを想定する。
例えば、以下のような入力を与えると、10文字のランダム文字列が5行というファイルが2つ作られるような仕様とする。
5 10 5 10
Mapperの実装は以下。
gendata_map.rb
#!/usr/bin/ruby def random_string(length) o = ("a".."z").to_a (0..length).map { o[rand(o.length)] }.join end def gendata_map(size, length) size.times do |i| puts random_string(length) end end # STDIN row : <row num> <row length> STDIN.each_line do |line| size, length = line.chomp.split.map(&:to_i) size = 5 unless size length = 5 unless length gendata_map(size, length) end
指定した長さのランダム文字列を作り、それを行数分だけ繰り返す。
ここでMapperのみでコマンドラインで単体テストを実施しておく。Hadoop StreamingはMapperやReducer単位で単体テストしやすいのがよい。
$ cat test.txt 10 2 10 2 10 2 $ cat test.txt | ruby gendata_map.rb fkzuvyzib lsvymrrlz ecvtpsplx wawdxfeps lkstptsva metunykpj
Mapperは意図どおりに動いているよう。
Reducerは不要
今回の処理はMapperだけで処理が完結しているので、このようなプログラムの場合はReducerは不要となる。
Hadoop Streamingで実行
WordCountのときと同じように実行する。ReducerがないのでReduceタスクは不要となる。
また、今回は行ごとに処理を分割したいので、map数とreducer数をオプションで明示して実行する(3行をそれぞれ別のMapperで処理させたいのでMap数は3とする)。
$ hadoop jar hadoop-0.20-mapreduce/contrib/streaming/hadoop-streaming-mr1.jar \ -D mapred.map.tasks=3 \ -D mapred.reduce.tasks=0 \ -input /user/mapred/example/gendata/input/input.txt -output/user/mapred/example/gendata/output -mapper gendata_map.rb -file gendata_map.rb
正常に処理が完了し、outputディレクトリを確認すると3つのMapが生成したそれぞれのファイルが作成されている。
$ hadoop fs -ls /user/mapred/example/gendata/output | head Found 10001 items -rw-r--r-- 3 mapred hadoop 0 2016-06-13 15:59 /user/mapred/example/gendata/output/_SUCCESS -rw-r--r-- 3 mapred hadoop 20600000 2016-06-13 15:47 /user/mapred/example/gendata/output/part-00000 -rw-r--r-- 3 mapred hadoop 10300000 2016-06-13 15:47 /user/mapred/example/gendata/output/part-00001 -rw-r--r-- 3 mapred hadoop 10300000 2016-06-13 15:47 /user/mapred/example/gendata/output/part-00002
内容は以下のようになっている。
$ hadoop fs -cat /user/mapred/example/gendata/output/part-00000 dcvvxbxyu fnijosqtz $ hadoop fs -cat /user/mapred/example/gendata/output/part-00001 ocndfuctp limwdnvke $ hadoop fs -cat /user/mapred/example/gendata/output/part-00002 mdkewafhj bcekxzcns
今回はReducerで集約していないので、最終的な出力結果はMapperそれぞれが出力したファイルが並ぶことになるが、Hive等でテーブルにする場合はこれで問題ない。
大量ランダムデータ生成のような処理をHadoopで分散処理させたい場合はHadoop Streamingを使うと便利な気がする。
まとめ
Hadoop Streamingを、簡単なRubyスクリプトで動かしてみた。MapReduceをJavaで実装するのは面倒だけど、Hiveではクエリが書きづらい、といった場合に、標準入出力のみ意識したコードをかけばHadoopクラスタで分散処理できるため、便利な機能だと思う。
また、Hadoop入門者がMapReduceの動きを理解する上で、コマンドラインとパイプでMapperとReducerそれぞれの処理のイメージがつかめるので、勉強用でもいいと思う。
以上!
Hadoop関連書籍(過去に読んだ本)
- 作者: Tom White,Sky株式会社玉川竜司,兼田聖士
- 出版社/メーカー: オライリージャパン
- 発売日: 2013/07/26
- メディア: 大型本
- この商品を含むブログ (4件) を見る
Hadoop徹底入門 第2版 オープンソース分散処理環境の構築
- 作者: 太田一樹,岩崎正剛,猿田浩輔,下垣徹,藤井達朗,山下真一,濱野賢一朗
- 出版社/メーカー: 翔泳社
- 発売日: 2013/07/09
- メディア: 大型本
- この商品を含むブログ (5件) を見る