ScalaでMapReduceを書くためのTwitter製フレームワーク「Scalding」を使ってみた

※ この記事は、Scaldingのgithub上の、branch/0.9.0のReadmeと、Getting Startedをまとめた超訳です。

Scalding とは

Scaldingは、Hadoop上のMapReduce開発を簡単にするためのScalaライブラリです。MapとReduceの機能をベタに書くよりも、よりScalaっぽいコーディングを可能にしてくれます。Pig に似ていますが、ScalaJVMに統合させるためのより高度な抽象化を提供してくれます。

ScaldingはCascadingをベースにしています。Cascadingについては、Hadoop Cascadingメモ(Hishidama's Hadoop Cascading Memo)にて分かりやすく解説されています。

また、Hadoop環境を構築しなくても、MapReduceプログラムを稼働させることが可能で、非常に簡単にプログラムを検証することが可能です。

WordCount

HadoopのHalloWorld的サンプルプログラム・WordCountをScaldingで書くとこうなります。

package com.twitter.scalding.examples

import com.twitter.scalding._

class WordCountJob(args : Args) extends Job(args) {
  TextLine( args("input") )
    .flatMap('line -> 'word) { line : String => tokenize(line) }
    .groupBy('word) { _.size }
    .write( Tsv( args("output") ) )

  // 文章を単語に分解
  def tokenize(text : String) : Array[String] = {
    // 各単語を小文字化して、句読点を削除
    text.toLowerCase.replaceAll("[^a-zA-Z0-9\\s]", "").split("\\s+")
  }
}

tokenize 関数を見てください。他のMapReduce処理と自然に統合されていることが分かります。これはScaldingのとても強力な特徴です。(PigのUDFの使い方と比べてみてください)

他のサンプルは、/tutorialでたくさん見ることができます。もし他の言語と比較したくなったら、Rosetta Code · twitter/scalding Wiki · GitHubを参照してみてください。ここではいくつかのMapReduceタスクをScaldingや、PigやHadoop Streamingなどの他の言語・フレームワークで書いたものを載せています。

Getting Started

事前準備

何に置いてもまずは、sbt0.12.2をインストールしておいてください。

次に、Scalding のgithubリポジトリをclone してください。

git clone git@github.com:twitter/scalding.git -b branch/0.9.0

次にこのコードをsbtでビルドしてください。sbtのバージョンに注意です。

sbt update
sbt test
sbt assembly

さて、本題に入りましょう。

Scalding で WordCount

シンプルなWordCountジョブを見てみましょう

import com.twitter.scalding._

class WordCountJob(args : Args) extends Job(args) {
  TextLine(args("input"))
    .flatMap('line -> 'word) { line : String => line.split("""\s+""") }
    .groupBy('word) { _.size }
    .write(Tsv(args("output")))
}

このジョブは、読み込んだファイルをlineに出力して、各単語をカウントし、単語と単語数のペアをタブ区切りのファイルに出力します。

このジョブを稼働させるには、以下のように scald.rb スクリプトを使います。

scripts/scald.rb --local WordCountJob.scala --input someInputfile.txt --output ./someOutputFile.tsv

このスクリプトではローカルモードでジョブが稼働します。(Hadoopクラスタ上での稼働ではありません)

WordCount 解説

前述のWordCountジョブを詳細に見てみましょう。

TextLine

TextLine は、Scalding の入出力関数のひとつで、ファイルの各行を読み込んで、 line という名前のフィールドに格納します。

TextLine(args("input")) // 引数 "input"には読み込み対象のファイル名も含まれる

他の入出力関数に、タブ区切りのファイルを読み込む、 Tsv があります。HDFS上のLZO圧縮されたファイルを直接読み込む関数や、MySQLテーブルから直接読み込む関数も作ることができます。

flatMap

flatMap は Tupleストリーム適用することができる関数の一つです。

TextLine(args("input"))
  // lineフィールドを空白で分割しつつ、wordフィールドに格納
  .flatMap('line -> 'word) { line : String => line.split("\s+") }

最初に flatMap したいフィールド名(この場合は、line )と、出力フィールド名(この場合は、word )を指定します。そして、これらのフィールドに対してどのようにflatMapするかするかの処理を引き渡します(この場合は、line を単語に分割)。

この時点でtupleストリームは以下のような感じになっています。

this is a line this
this is a line is
this is a line a
this is a line line
...

その他のflatMapの例や、tupleストリームに変換する処理の例は、API Referenceを参照してください。

groupBy

次に、同じ単語をグルーピングし、各グループの単語数を数えます。

TextLine(args("input"))
  .read
  .flatMap('line -> 'word) { line : String => line.split("\s+") }
  .groupBy('word) { _.size } //  .groupBy('word) { group => group.size } というのと同じ処理です

ここでは、Tupleストリームを同じ単語でグルーピングして、各グループのサイズ(単語数)を新しいフィールドとして追加しています。デフォルトでは新しいフィールドは単純に size になりますが、_.size('numWords) とすることによってフィールド名を指定することができます。

この時点でTupleストリームは、以下のような感じになります。

hello 5
world 3
this 1
...

繰り返しになりますが、グルーピング処理の例は、他にも、API Referenceにありますので、参照してください。

write, Tsv

最後に、TextLine 入出力関数でデータを読み込んだように、これまでの処理結果を Tsv 入出力関数 を使って出力することができます。

TextLine(args("input"))
  .flatMap('line -> 'word) { line : String => line.split("\s+") }
  .groupBy('word) { _.size }
  .write(Tsv(args("output")))
scald.rb

scald.rb スクリプトは、scripts/ ディレクトリにある、お手軽な実行スクリプトです。これによって、ローカルモードでの稼働と、リモートのHadoopクラスタ環境での稼働の両方が制御できます。このスクリプトでは、簡単なコマンドラインが解釈され、リモートジョブ実行時に必要なJARファイルがコピーされます。

もしたくさんのScaldingジョブを実行するのであれば、このスクリプトをPATHに通すと便利です。
例えばこんな感じで。

ln -s scripts/scald.rb $HOME/bin/

これによって、scald.rb へのシンボリックリンクを、$HOME/bin ディレクトリに追加することができます。


次は、Tutorialをやっていきたいと思います。