ScalaでMapReduceを書くためのTwitter製フレームワーク「Scalding」を使ってみた
※ この記事は、Scaldingのgithub上の、branch/0.9.0のReadmeと、Getting Startedをまとめた超訳です。
Scalding とは
Scaldingは、Hadoop上のMapReduce開発を簡単にするためのScalaライブラリです。MapとReduceの機能をベタに書くよりも、よりScalaっぽいコーディングを可能にしてくれます。Pig に似ていますが、ScalaとJVMに統合させるためのより高度な抽象化を提供してくれます。
ScaldingはCascadingをベースにしています。Cascadingについては、Hadoop Cascadingメモ(Hishidama's Hadoop Cascading Memo)にて分かりやすく解説されています。
また、Hadoop環境を構築しなくても、MapReduceプログラムを稼働させることが可能で、非常に簡単にプログラムを検証することが可能です。
WordCount
HadoopのHalloWorld的サンプルプログラム・WordCountをScaldingで書くとこうなります。
package com.twitter.scalding.examples import com.twitter.scalding._ class WordCountJob(args : Args) extends Job(args) { TextLine( args("input") ) .flatMap('line -> 'word) { line : String => tokenize(line) } .groupBy('word) { _.size } .write( Tsv( args("output") ) ) // 文章を単語に分解 def tokenize(text : String) : Array[String] = { // 各単語を小文字化して、句読点を削除 text.toLowerCase.replaceAll("[^a-zA-Z0-9\\s]", "").split("\\s+") } }
tokenize 関数を見てください。他のMapReduce処理と自然に統合されていることが分かります。これはScaldingのとても強力な特徴です。(PigのUDFの使い方と比べてみてください)
他のサンプルは、/tutorialでたくさん見ることができます。もし他の言語と比較したくなったら、Rosetta Code · twitter/scalding Wiki · GitHubを参照してみてください。ここではいくつかのMapReduceタスクをScaldingや、PigやHadoop Streamingなどの他の言語・フレームワークで書いたものを載せています。
Getting Started
事前準備
何に置いてもまずは、sbt0.12.2をインストールしておいてください。
次に、Scalding のgithubリポジトリをclone してください。
git clone git@github.com:twitter/scalding.git -b branch/0.9.0
次にこのコードをsbtでビルドしてください。sbtのバージョンに注意です。
sbt update
sbt test
sbt assembly
さて、本題に入りましょう。
Scalding で WordCount
シンプルなWordCountジョブを見てみましょう
import com.twitter.scalding._ class WordCountJob(args : Args) extends Job(args) { TextLine(args("input")) .flatMap('line -> 'word) { line : String => line.split("""\s+""") } .groupBy('word) { _.size } .write(Tsv(args("output"))) }
このジョブは、読み込んだファイルをlineに出力して、各単語をカウントし、単語と単語数のペアをタブ区切りのファイルに出力します。
このジョブを稼働させるには、以下のように scald.rb スクリプトを使います。
scripts/scald.rb --local WordCountJob.scala --input someInputfile.txt --output ./someOutputFile.tsv
このスクリプトではローカルモードでジョブが稼働します。(Hadoopクラスタ上での稼働ではありません)
WordCount 解説
前述のWordCountジョブを詳細に見てみましょう。
TextLine
TextLine は、Scalding の入出力関数のひとつで、ファイルの各行を読み込んで、 line という名前のフィールドに格納します。
TextLine(args("input")) // 引数 "input"には読み込み対象のファイル名も含まれる
他の入出力関数に、タブ区切りのファイルを読み込む、 Tsv があります。HDFS上のLZO圧縮されたファイルを直接読み込む関数や、MySQLテーブルから直接読み込む関数も作ることができます。
flatMap
flatMap は Tupleストリーム適用することができる関数の一つです。
TextLine(args("input")) // lineフィールドを空白で分割しつつ、wordフィールドに格納 .flatMap('line -> 'word) { line : String => line.split("\s+") }
最初に flatMap したいフィールド名(この場合は、line )と、出力フィールド名(この場合は、word )を指定します。そして、これらのフィールドに対してどのようにflatMapするかするかの処理を引き渡します(この場合は、line を単語に分割)。
この時点でtupleストリームは以下のような感じになっています。
this is a line this
this is a line is
this is a line a
this is a line line
...
その他のflatMapの例や、tupleストリームに変換する処理の例は、API Referenceを参照してください。
groupBy
次に、同じ単語をグルーピングし、各グループの単語数を数えます。
TextLine(args("input")) .read .flatMap('line -> 'word) { line : String => line.split("\s+") } .groupBy('word) { _.size } // .groupBy('word) { group => group.size } というのと同じ処理です
ここでは、Tupleストリームを同じ単語でグルーピングして、各グループのサイズ(単語数)を新しいフィールドとして追加しています。デフォルトでは新しいフィールドは単純に size になりますが、_.size('numWords) とすることによってフィールド名を指定することができます。
この時点でTupleストリームは、以下のような感じになります。
hello 5
world 3
this 1
...
繰り返しになりますが、グルーピング処理の例は、他にも、API Referenceにありますので、参照してください。
write, Tsv
最後に、TextLine 入出力関数でデータを読み込んだように、これまでの処理結果を Tsv 入出力関数 を使って出力することができます。
TextLine(args("input")) .flatMap('line -> 'word) { line : String => line.split("\s+") } .groupBy('word) { _.size } .write(Tsv(args("output")))
scald.rb
scald.rb スクリプトは、scripts/ ディレクトリにある、お手軽な実行スクリプトです。これによって、ローカルモードでの稼働と、リモートのHadoopクラスタ環境での稼働の両方が制御できます。このスクリプトでは、簡単なコマンドラインが解釈され、リモートジョブ実行時に必要なJARファイルがコピーされます。
もしたくさんのScaldingジョブを実行するのであれば、このスクリプトをPATHに通すと便利です。
例えばこんな感じで。
ln -s scripts/scald.rb $HOME/bin/
これによって、scald.rb へのシンボリックリンクを、$HOME/bin ディレクトリに追加することができます。
次は、Tutorialをやっていきたいと思います。