Asakusa Framework : はじめの一歩

f:id:teppei-studio:20131112202621j:plain

前回の記事でAsakusaがおススメだということを書きましたが、今回はではどうAsakusa Frameworkを始めるのかというところごご紹介したいと思います。

環境構築から、簡単なバッチアプリケーションを作るところまで通してやってみたいと思います。

尚、この記事でお見せするソースは、GitHubに上げているので、適宜ご参照ください。

環境構築

まずは環境構築です。
ちなみに、Mac OSX 10.7.5 上でやっています。(※別環境で、Mac OSX 10.6.8でテスト中にJobFlowのJarが見つからないという事象が発生しました。)
Windowsで環境構築する方法はNode0(有償)を使う他にはありません。

Jinrikishaのインストール

さて、これからHadoop環境を構築し、Asakusaの開発環境を構築していくわけですが、素でやるとなかなか大変です。しかし、Asakusaは素晴らしいことに、開発環境構築用パッケージを用意しています。その名も「jinrikisha」!非常に便利です。

jinrikishaのダウンロードページからダウンロードしてインストールします。インストールは手順に従っていけば特につまづくことなく進んでいけます。途中、いくつか聞かれるので、Yとかデフォルトとかでどんどん進めていきます。

これでHadoopの環境構築から、Asaksuaの導入、Eclipseの導入までまるっと全部やってくれます。素晴らしい。

※ この記事を書いている最中に、0.5.2がリリースされたようなので、途中で切り替えたのですが、Eclipseでプロジェクトをインポートした際に、asakusa-develop/repository/commons-codec/commons-codec/1.5/commons-codec-1.5.jar が無いというエラーが出ました。今回は、別のところから持ってきて、対応しちゃってます。

はじめてのアプリケーション作成

さて、ここまではいいんですが、ここからのはじめての一歩がなかなか情報が少ないです。

どんな処理を作るか

今回は、以下のように商品マスタと売上明細のテーブルをジョインさせるような簡単な処理を作ってみます。

f:id:teppei-studio:20131119191410p:plain

プロジェクト作成

このあたりはまだ公式ドキュメントに記載がありますが、網羅的に書かれているものなので、ここでは最低限必要な手順を書いていきます。

workspaceディレクトリに移動します。

cd $ASAKUSA_HOME
cd ../workspace/

Mavenアーキタイプカタログを指定してアプリケーションを作成します。

mvn archetype:generate -DarchetypeCatalog=http://asakusafw.s3.amazonaws.com/maven/archetype-catalog-0.5.xml

アーキタイプの選択を要求されます。ここでは、3番のdirectioを指定します。

Choose a number or apply filter (format: [groupId:]artifactId, case sensitive contains): : 3

Asakusa Frameworkのバージョンの指定を要求されるので、ここではデフォルトの4(0.5.2)を選択します。

Choose com.asakusafw:asakusa-archetype-directio version: 

groupidを聞かれますので、適当に指定してください。

Define value for property 'groupId': : teppeistudio

artifactIdを聞かれます。これがアプリケーション名になります。

Define value for property 'artifactId': : firststep

バージョンの指定を要求されます。デフォルト通りいきましょう。

Define value for property 'version':  1.0-SNAPSHOT: : 

パッケージ名の指定を要求されます。デフォルト通りいきましょう。

Define value for property 'package':  teppeistudio: : 

設定の内容について確認されるので「Y」で先に進みましょう。

Confirm properties configuration:
groupId: teppeistudio
artifactId: firststep
version: 
package: teppeistudio
Y: : Y

アプリケーション作成完了です。

[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------


さて、このアプリケーションをEclipseで開発できるようにしましょう。

cd firststep
mvn eclipse:eclipse

これで、作成したプロジェクトをEclipseインポートすることができるようになりました。

尚、ここまでの手順については、
Mavenアーキタイプ利用ガイド — Asakusa Framework 0.5.1 documentationに詳しく書かれてあります。

データモデルの作成

まずはデータモデルを作成します。Asakusaの開発では、兎に角、データモデルの作成から始まります。先ほどの図を改めて確認しましょう。

f:id:teppei-studio:20131119191410p:plain

このアプリケーションは、インプットとして、「商品マスタ」と「売上明細」のデータが必要です。このデータモデルを作ってみましょう。

Eclipseにインポートしたプロジェクトから、src > main > dmdl > models.dmdl を開いてみます。ここに既にサンプルとなるアプリケーションのモデルが入っていますが、これを一旦、まるまる以下に置き換えてください。

-- 入力CSVファイル形式

"商品マスタ"
@directio.csv(
    has_header = TRUE
)
item_master = {
	"商品ID"
    @directio.csv.field(name = "商品ID")
	item_id : INT;

	"商品名"
    @directio.csv.field(name = "商品名")
	item_name : TEXT;
};

"売上明細"
@directio.csv(
    has_header = TRUE
)
sales_detail = {
	"明細ID"
    @directio.csv.field(name = "明細ID")
	sales_id : INT;

	"商品ID"
    @directio.csv.field(name = "商品ID")
	item_id : INT;

    "売上数"
    @directio.csv.field(name = "売上数")
    sales_number : INT;
};

ご覧の通り、商品マスタと売上明細の二つのデータモデルが定義されています。

models.dmdlが上記定義に差し替えたならば、Eclipse上でpom.xmlを右クリックし、 Run As > 4 Maven generate-sources を選択し、DMDLからのソース生成処理を稼働させてください。

そうすると、Eclipse上でいくつかエラーが出てきます。これはDMDLの中身を差し替える前の定義情報から生成されたものに依存しているクラスがあるためです。エラーとなっているソースファイルを片っ端から削除していってください。

※ exampleソース抜きでプロジェクトを作成したいんだけど、やり方は無いんだろうか。

次に、上記で作成した二つのモデルをJOINさせる定義を追加します。
同じ、models.dmdl に以下記載を追加してください

joined joined_sales_detail = item_master -> {
	item_id -> item_id;
	item_name -> item_name;
} % item_id + sales_detail -> {
	sales_id -> sales_id;
	item_id -> item_id;
	sales_number -> sales_number;
} % item_id;

また、出力用のデータモデルを作りたいので、さらに、以下のモデル定義をmodels.dmdlに定義してください。

"売上明細2"
@directio.csv(
    has_header = TRUE
)
sales_detail2 = {
	"明細ID"
	@directio.csv.field(name = "明細ID")
	sales_id : INT;

	"商品ID"
	@directio.csv.field(name = "商品ID")
	item_id : INT;

	"商品名"
	@directio.csv.field(name = "商品名")
	item_name : TEXT;

	"売上数"
	@directio.csv.field(name = "売上数")
	sales_number : INT;
};


完成したmodels.dmdlは、こちら(
Asakusa-FirstStep/src/main/dmdl/models.dmdl at master · ironpeace/Asakusa-FirstStep · GitHub
)を参照してください。

dmdlが完成したら、再度、Eclipse上でpom.xmlを右クリックし、 Run As > 4 Maven generate-sources を選択し、DMDLからのソース生成処理を稼働させておいてください。

CSV入出力用クラスの作成

このデータモデルにCSVファイルのデータを入出力するための処理を作ります。

jobflowパッケージ配下に、以下のように3つのクラスを作って行きます。

商品マスタデータの入力用クラス

package teppeistudio.jobflow;

import teppeistudio.modelgen.dmdl.csv.AbstractItemMasterCsvInputDescription;

public class ItemMasterFromCsv extends AbstractItemMasterCsvInputDescription {

	@Override
	public String getBasePath() {
		return "firststep";
	}

	@Override
	public String getResourcePattern() {
		return "item-master.csv";
	}

}

売上明細データの入力用クラス

package teppeistudio.jobflow;

import teppeistudio.modelgen.dmdl.csv.AbstractSalesDetailCsvInputDescription;

public class SalesDetailFromCsv extends AbstractSalesDetailCsvInputDescription {

	@Override
	public String getBasePath() {
		return "firststep";
	}

	@Override
	public String getResourcePattern() {
		return "sales-detail.csv";
	}

}

売上明細2データの出力処理用クラス

package teppeistudio.jobflow;

import teppeistudio.modelgen.dmdl.csv.AbstractSalesDetail2CsvOutputDescription;

public class SalesDetail2ToCsv extends AbstractSalesDetail2CsvOutputDescription {

	@Override
	public String getBasePath() {
		return "firststep_out";
	}

	@Override
	public String getResourcePattern() {
		return "sales-detail2.csv";
	}

}

Operatorを作成

次に、実際にJOINの実装を行う、Operatorの処理を実装します。
operatorパッケージに以下クラスを作成してください。

package teppeistudio.operator;

import teppeistudio.modelgen.dmdl.model.ItemMaster;
import teppeistudio.modelgen.dmdl.model.JoinedSalesDetail;
import teppeistudio.modelgen.dmdl.model.SalesDetail;

import com.asakusafw.vocabulary.operator.MasterJoin;

public abstract class JoinOperator {

	@MasterJoin
	public abstract JoinedSalesDetail joined(ItemMaster master, SalesDetail detail);
}

JobFlowの作成

続いて、Operatorをどう呼出すかという実装をしていきます。

package teppeistudio.jobflow;

import teppeistudio.modelgen.dmdl.model.ItemMaster;
import teppeistudio.modelgen.dmdl.model.SalesDetail;
import teppeistudio.modelgen.dmdl.model.SalesDetail2;
import teppeistudio.operator.JoinOperatorFactory;
import teppeistudio.operator.JoinOperatorFactory.JoinedData;

import com.asakusafw.vocabulary.flow.Export;
import com.asakusafw.vocabulary.flow.FlowDescription;
import com.asakusafw.vocabulary.flow.Import;
import com.asakusafw.vocabulary.flow.In;
import com.asakusafw.vocabulary.flow.JobFlow;
import com.asakusafw.vocabulary.flow.Out;
import com.asakusafw.vocabulary.flow.util.CoreOperatorFactory;
import com.asakusafw.vocabulary.flow.util.CoreOperatorFactory.Project;

@JobFlow(name="MainJobFlow")
public class MainJobFlow extends FlowDescription {

	final In<ItemMaster> itemMaster;
	final In<SalesDetail> salesDetail;
	final Out<SalesDetail2> salesDetail2;

	public MainJobFlow(
			@Import(name="ItemMaster", description=ItemMasterFromCsv.class)
			In<ItemMaster> itemMaster,
			@Import(name="SalesDetail", description=SalesDetailFromCsv.class)
			In<SalesDetail> salesDetail,
			@Export(name="SalesDetail2", description=SalesDetail2ToCsv.class)
			Out<SalesDetail2> salesDetail2
			){

		this.itemMaster = itemMaster;
		this.salesDetail = salesDetail;
		this.salesDetail2 = salesDetail2;
	}
	
	@Override
	protected void describe() {
		JoinOperatorFactory op = new JoinOperatorFactory();
		CoreOperatorFactory cp = new CoreOperatorFactory();
		
		JoinedData joinedData = op.joinedData(this.itemMaster, this.salesDetail);
		cp.stop(joinedData.missed);
		
		Project<SalesDetail2> project = cp.project(joinedData.joined, SalesDetail2.class);
		
		salesDetail2.add(project.out);
	}

}

Batchの実装

最後にバッチアプリケーションを作成です。ここではJobFlowを呼出すだけです。
batchパッケージ配下に以下クラスを作成してください。

package teppeistudio.batch;

import teppeistudio.jobflow.MainJobFlow;

import com.asakusafw.vocabulary.batch.Batch;
import com.asakusafw.vocabulary.batch.BatchDescription;

@Batch(name="MainBatch")
public class MainBatch extends BatchDescription {

	@Override
	protected void describe() {
		run(MainJobFlow.class).soon();
	}

}

テスト実施

ここまででアプリケーションの実装は終了です。
次は、テストを実装してみましょう。

テストはOperatorのテストと、JobFlowのテストと二種類あるのですが、今回の例では、Operatorとして開発者自身が実装したものがほとんどないので、テストとしてやれることはありません。JobFlowのテストだけ実装します。

テスト用入出力データの定義

JobFlowのテストは、入出力データをExcelで定義してテストします。Excelファイルは、DMDLの定義をベースに自動生成されて、target/excel/ 配下に格納されています。これをまずはsrc/test/resources 配下に移しましょう。(exampleのものが元々入っているので一旦消しておきましょう)

cd $ASAKUSA_HOME
cd ../workspace/firststep
rm -rf src/test/resources/*.xls
cp target/excel/* src/test/resources/teppeistudio/jobflow/

以下のファイルが格納されていると思います。

item_master.xls
joined_sales_detail.xls
sales_detail.xls
sales_detail2.xls

今回は、joined_sales_detail.xls は使いません。

残りの3つのファイルをExcelで開いて下記のように入力してください。
Excelをお持ちでなければ、OpenOfficeでも大丈夫です。

item_master.xls の シート「input」
f:id:teppei-studio:20131120150943p:plain

sales_detail.xls の シート「input」
f:id:teppei-studio:20131120150949p:plain

sales_detail2.xls の シート「output」
f:id:teppei-studio:20131120150957p:plain

sales_detail2.xls の シート「rule」
f:id:teppei-studio:20131120151003p:plain

テストクラスの作成

次に、テストクラスを作成します。上記のテストデータをどう利用するかを定義するものになります。

package teppeistudio.jobflow;

import org.junit.Test;

import teppeistudio.modelgen.dmdl.model.ItemMaster;
import teppeistudio.modelgen.dmdl.model.SalesDetail;
import teppeistudio.modelgen.dmdl.model.SalesDetail2;

import com.asakusafw.testdriver.JobFlowTester;


public class MainTest {

	@Test
	public void test(){
		JobFlowTester tester = new JobFlowTester(getClass());
		
		tester
			.input("ItemMaster", ItemMaster.class)
			.prepare("item_master.xls#input");
		
		tester
			.input("SalesDetail", SalesDetail.class)
			.prepare("sales_detail.xls#input");

		tester
			.output("SalesDetail2", SalesDetail2.class)
			.verify("sales_detail2.xls#output", "sales_detail2.xls#rule");
		
		tester.runTest(MainJobFlow.class);
	}
}

作成が完了したら、Eclipse上でこのクラスを右クリックして、Run As > 1 JUnit Test を実行してください。テストが通ります。

バッチ実行

テストが完了したら、実際にバッチを実行してみましょう。

パッケージング

まずはパッケージングです。

cd $ASAKUSA_HOME
cd ../workspace/Asakusa-FirstStep/
mvn clean package

これで、ソースの再生成からテストの実行、ビルド、パッケージングまで実施されます。

デプロイ

プロジェクトディレクトリ配下のtargetディレクトリ配下にJARができあがっているので、それを、$ASAKUSA_HOME/batchapps配下にコピーし、展開します。

cp target/firststep-batchapps-1.0-SNAPSHOT.jar $ASAKUSA_HOME/batchapps/
cd $ASAKUSA_HOME/batchapps
jar xvf firststep-batchapps-1.0-SNAPSHOT.jar

データ配置

バッチが読み込むデータを配置してあげます。
デフォルトでは、$HOME/target/testing/directio 配下を見に行くので、そこに、firststep ディレクトリを掘って、以下のファイルを格納しましょう。

$HOME/target/testing/directio/firststep/item-master.csv

"商品ID","商品名"
1,"チョコレート"
2,"アメ"

$HOME/target/testing/directio/firststep/sales-detail.csv

"明細ID","商品ID","売上数"
10,1,100
11,2,200
12,1,300
13,2,400

バッチ実行

いよいよバッチ実行です。

$ASAKUSA_HOME/yaess/bin/yaess-batch.sh FirstStepBatch

出力は、 $HOME/target/testing/directio/firststep_out/ 配下に出ます。
$HOME/target/testing/directio/firststep_out/sales-detail2.csv

明細ID,商品ID,商品名,売上数
10,1,チョコレート,100
12,1,チョコレート,300
11,2,アメ,200
13,2,アメ,400

実行結果確認

実行結果を簡単に追ってみましょう

標準出力結果を確認

バッチアプリケーションを実行すると大量の標準出力が出ます。
今回実行した結果について、以下のところだけ追ってみてください。

Starting YAESS

(略)

13/11/24 06:05:13 INFO stage.AbstractStageClient: Job Submitted: id=job_local1588909256_0001, name=YAESS/FirstStepBatch/MainJobFlow/main/d1f223f6-8642-4b4d-9b43-af7108a9c1da/stage0001

(略)

13/11/24 06:05:14 INFO mapred.JobClient:  map 100% reduce 100%
13/11/24 06:05:14 INFO mapred.JobClient: Job complete: job_local1588909256_0001

(略)

13/11/24 06:05:14 INFO mapred.JobClient:     Map input records=6

(略)

13/11/24 06:05:14 INFO mapred.JobClient:     Reduce input records=6

(略)

13/11/24 06:05:14 INFO mapred.JobClient:     Reduce output records=4
13/11/24 06:05:14 INFO mapred.JobClient:     Map output records=6

(略)

2013/11/24 06:05:19 INFO  [YS-BOOTSTRAP-I00999] Exiting YAESS: code=0, elapsed=8,101ms
Finished: SUCCESS

ご覧のように、

  • stage001というJOBが稼働したこと
  • Map のインプットが6件だったこと
  • Map のアウトプットが6件だったこと
  • Reduce のインプットが6件だったこと
  • Reduce のアウトプットが4件だったこと

が分かります。

稼働結果確認やデバッグで確認すると便利です。

自動生成ドキュメントの確認

最初に、Graphizをインストールしておきましょう。

Download. | Graphviz - Graph Visualization Software


Asakusaが生成したJobFlowの図を画像出力してみます。

cd $ASAKUSA_HOME
cd batchapps/FirstStepBatch/opt/dsl-analysis/jobflow/MainJobFlow/
dot -Tpng -o stagegraph.png stagegraph.dot 

こうして出来上がった、stagegraph.pngがこれです

f:id:teppei-studio:20131124075900p:plain

前述の標準出力結果にあったstage001のJOBひとつが作られていて、その中でMasterJoinの処理が実装されていることが分かると思います。この画像を見て、AsakusaがどういうMapReduceプログラムを自動生成しているかを確認し、その稼働状況を標準出力で確認することができます。

おしまい

ふう。これで一通りできました。
プロジェクト作成から実行まで、この程度の規模で、慣れれば30分といったところだと思います。
やはり一定規模以上のアプリケーションを作るのでないと、ちょっとツライ手間かもしれませんね。

この記事でお見せたソースは、一通り、GitHubに上げているので、適宜ご参照ください。

次回以降は、演算子の検証を中心に、具体的な使い方や、ここがいまいち、というところを紹介していきたいと思います。