SparkのDAGを確認するためにSpark UI Event LogをSpark SQLで調べてみた

Spark では、Spark UIというWEB画面が用意されており、実行中のSparkアプリケーションの稼働状況を確認することができます。

f:id:teppei-studio:20150224192017p:plain

このSpark UIを見ると、どのJOB、Stageがどの程度時間かかっていて、あとどれくらい残っているかとか、どれだけRDDがメモリを消費しているかなどが分かります。

ところが、自分が実装した処理のどれが、どのJOBなのか、どのStageなのか、あるいは自分が作成したRDDオブジェクトがどれなのか、Spark UIを見てもよく分かりません。

RDDの区別は、setName を使えば、RDDに対して明示的に名前を設定できます。ただ、プログラム上でsetNameできるRDD以外にも内部的には色々RDDオブジェクトが生成されて、それがSpark UI上に表示されるのでよく分かりません。

もっとわからないのは、自分が実装したどの処理が、どのJOBか、どのStageなのかです。

これを調べるために、Spark UIのEventLogを調べてみることにします。

EventLogの出力はデフォルトでは false ですが、spark.eventLog.enabled をtrue にすることで出力することができます。オプションは、spark-submit時には、--confで指定します。そうでない場合は、new SparkConf()に直接指定することが可能です。

ログの出力先は、デフォルトでは、/tmp/spark-events/配下に出力されます。

ログはJSON形式で出力されます。

ところで、Spark SQLには、JSONスキーマを事前設定無しに動的に解析して、そのJSONデータに対してSQLを実行できるというすごい機能があります。せっかくなので、これを使ってみたいと思います。

サンプルソースをこちらにアップしてありますので、ご参照ください。


ironpeace/spark-eventlog-parse-sample · GitHub

まずは適当なディレクトリでcloneしてください。

$ git clone https://github.com/ironpeace/spark-eventlog-parse-sample.git
$ cd spark-eventlog-parse-sample/

WordCountのサンプルプログラムを用意しているので、まずはそれを実行してみてください。

$ sbt 'run WordCount'

※ 一度実行するとプロジェクト直下に count-result ディレクトリができて、二度目以降の実行時にはこれを消しておかないとエラーになるのでご注意ください。

/tmp/spark-events/ 配下を確認してみると、ディレクトリができています。

$ ll /tmp/spark-events/
drwxrwx---  5 teppei_tosa  wheel  170  2 24 19:44 sparkeventlogparsesample-1424774686030

このディレクトリ配下のEVENT_LOG_1ファイルをフルパス指定して、下記のように実行してみてください。

$ sbt 'run EventLogParser /tmp/spark-events/sparkeventlogparsesample-1424774686030/EVENT_LOG_1'

途中でこのJSONファイルのスキーマが出力されます。すごいですね、Spark SQL

root
 |-- AppName: string (nullable = true)
 |-- BlockManagerID: struct (nullable = true)
 |    |-- ExecutorID: string (nullable = true)
 |    |-- Host: string (nullable = true)
 |    |-- NettyPort: integer (nullable = true)
 |    |-- Port: integer (nullable = true)
 |-- ClasspathEntries: struct (nullable = true)
 |    |-- /Users/teppei_tosa/Apps/sbt/bin/sbt-launch.jar: string (nullable = true)
 |-- Event: string (nullable = true)
 |-- JVMInformation: struct (nullable = true)
 |    |-- JavaHome: string (nullable = true)
 |    |-- JavaVersion: string (nullable = true)
 |    |-- ScalaVersion: string (nullable = true)
 |-- JobID: integer (nullable = true)
 |-- JobResult: struct (nullable = true)
 |    |-- Result: string (nullable = true)
 |-- MaximumMemory: integer (nullable = true)
 |-- SparkProperties: struct (nullable = true)
 |    |-- spark.app.name: string (nullable = true)
 |    |-- spark.driver.host: string (nullable = true)
 |    |-- spark.driver.port: string (nullable = true)
 |    |-- spark.eventLog.enabled: string (nullable = true)
 |    |-- spark.files.overwrite: string (nullable = true)
 |    |-- spark.fileserver.uri: string (nullable = true)
 |    |-- spark.master: string (nullable = true)
 |    |-- spark.scheduler.mode: string (nullable = true)
 |    |-- spark.tachyonStore.folderName: string (nullable = true)
 |-- StageAttemptID: integer (nullable = true)
 |-- StageID: integer (nullable = true)
 |-- StageIDs: array (nullable = true)
 |    |-- element: integer (containsNull = false)
 |-- StageInfo: struct (nullable = true)
 |    |-- Accumulables: array (nullable = true)
 |    |    |-- element: string (containsNull = false)
 |    |-- CompletionTime: long (nullable = true)
 |    |-- Details: string (nullable = true)
 |    |-- NumberofTasks: integer (nullable = true)
 |    |-- RDDInfo: array (nullable = true)
 |    |    |-- element: struct (containsNull = false)
 |    |    |    |-- DiskSize: integer (nullable = true)
 |    |    |    |-- MemorySize: integer (nullable = true)
 |    |    |    |-- Name: string (nullable = true)
 |    |    |    |-- NumberofCachedPartitions: integer (nullable = true)
 |    |    |    |-- NumberofPartitions: integer (nullable = true)
 |    |    |    |-- RDDID: integer (nullable = true)
 |    |    |    |-- StorageLevel: struct (nullable = true)
 |    |    |    |    |-- Deserialized: boolean (nullable = true)
 |    |    |    |    |-- Replication: integer (nullable = true)
 |    |    |    |    |-- UseDisk: boolean (nullable = true)
 |    |    |    |    |-- UseMemory: boolean (nullable = true)
 |    |    |    |    |-- UseTachyon: boolean (nullable = true)
 |    |    |    |-- TachyonSize: integer (nullable = true)
 |    |-- StageAttemptID: integer (nullable = true)
 |    |-- StageID: integer (nullable = true)
 |    |-- StageName: string (nullable = true)
 |    |-- SubmissionTime: long (nullable = true)
 |-- SystemProperties: struct (nullable = true)
 |    |-- awt.toolkit: string (nullable = true)
 |    |-- file.encoding: string (nullable = true)
 |    |-- file.encoding.pkg: string (nullable = true)
 |    |-- file.separator: string (nullable = true)
 |    |-- ftp.nonProxyHosts: string (nullable = true)
 |    |-- gopherProxySet: string (nullable = true)
 |    |-- http.nonProxyHosts: string (nullable = true)
 |    |-- java.awt.graphicsenv: string (nullable = true)
 |    |-- java.awt.printerjob: string (nullable = true)
 |    |-- java.class.version: string (nullable = true)
 |    |-- java.endorsed.dirs: string (nullable = true)
 |    |-- java.ext.dirs: string (nullable = true)
 |    |-- java.home: string (nullable = true)
 |    |-- java.io.tmpdir: string (nullable = true)
 |    |-- java.library.path: string (nullable = true)
 |    |-- java.runtime.name: string (nullable = true)
 |    |-- java.runtime.version: string (nullable = true)
 |    |-- java.specification.name: string (nullable = true)
 |    |-- java.specification.vendor: string (nullable = true)
 |    |-- java.specification.version: string (nullable = true)
 |    |-- java.vendor: string (nullable = true)
 |    |-- java.vendor.url: string (nullable = true)
 |    |-- java.vendor.url.bug: string (nullable = true)
 |    |-- java.version: string (nullable = true)
 |    |-- java.vm.info: string (nullable = true)
 |    |-- java.vm.name: string (nullable = true)
 |    |-- java.vm.specification.name: string (nullable = true)
 |    |-- java.vm.specification.vendor: string (nullable = true)
 |    |-- java.vm.specification.version: string (nullable = true)
 |    |-- java.vm.vendor: string (nullable = true)
 |    |-- java.vm.version: string (nullable = true)
 |    |-- jline.esc.timeout: string (nullable = true)
 |    |-- jline.shutdownhook: string (nullable = true)
 |    |-- line.separator: string (nullable = true)
 |    |-- os.arch: string (nullable = true)
 |    |-- os.name: string (nullable = true)
 |    |-- os.version: string (nullable = true)
 |    |-- path.separator: string (nullable = true)
 |    |-- socksNonProxyHosts: string (nullable = true)
 |    |-- sun.arch.data.model: string (nullable = true)
 |    |-- sun.boot.class.path: string (nullable = true)
 |    |-- sun.boot.library.path: string (nullable = true)
 |    |-- sun.cpu.endian: string (nullable = true)
 |    |-- sun.cpu.isalist: string (nullable = true)
 |    |-- sun.io.unicode.encoding: string (nullable = true)
 |    |-- sun.java.command: string (nullable = true)
 |    |-- sun.java.launcher: string (nullable = true)
 |    |-- sun.jnu.encoding: string (nullable = true)
 |    |-- sun.management.compiler: string (nullable = true)
 |    |-- sun.nio.ch.bugLevel: string (nullable = true)
 |    |-- sun.os.patch.level: string (nullable = true)
 |    |-- user.country: string (nullable = true)
 |    |-- user.dir: string (nullable = true)
 |    |-- user.home: string (nullable = true)
 |    |-- user.language: string (nullable = true)
 |    |-- user.name: string (nullable = true)
 |    |-- user.timezone: string (nullable = true)
 |-- TaskEndReason: struct (nullable = true)
 |    |-- Reason: string (nullable = true)
 |-- TaskInfo: struct (nullable = true)
 |    |-- Accumulables: array (nullable = true)
 |    |    |-- element: string (containsNull = false)
 |    |-- Attempt: integer (nullable = true)
 |    |-- ExecutorID: string (nullable = true)
 |    |-- Failed: boolean (nullable = true)
 |    |-- FinishTime: long (nullable = true)
 |    |-- GettingResultTime: integer (nullable = true)
 |    |-- Host: string (nullable = true)
 |    |-- Index: integer (nullable = true)
 |    |-- LaunchTime: long (nullable = true)
 |    |-- Locality: string (nullable = true)
 |    |-- Speculative: boolean (nullable = true)
 |    |-- TaskID: integer (nullable = true)
 |-- TaskMetrics: struct (nullable = true)
 |    |-- DiskBytesSpilled: integer (nullable = true)
 |    |-- ExecutorDeserializeTime: integer (nullable = true)
 |    |-- ExecutorRunTime: integer (nullable = true)
 |    |-- HostName: string (nullable = true)
 |    |-- InputMetrics: struct (nullable = true)
 |    |    |-- BytesRead: integer (nullable = true)
 |    |    |-- DataReadMethod: string (nullable = true)
 |    |-- JVMGCTime: integer (nullable = true)
 |    |-- MemoryBytesSpilled: integer (nullable = true)
 |    |-- ResultSerializationTime: integer (nullable = true)
 |    |-- ResultSize: integer (nullable = true)
 |    |-- ShuffleReadMetrics: struct (nullable = true)
 |    |    |-- FetchWaitTime: integer (nullable = true)
 |    |    |-- LocalBlocksFetched: integer (nullable = true)
 |    |    |-- RemoteBlocksFetched: integer (nullable = true)
 |    |    |-- RemoteBytesRead: integer (nullable = true)
 |    |    |-- ShuffleFinishTime: integer (nullable = true)
 |    |-- ShuffleWriteMetrics: struct (nullable = true)
 |    |    |-- ShuffleBytesWritten: integer (nullable = true)
 |    |    |-- ShuffleWriteTime: integer (nullable = true)
 |-- TaskType: string (nullable = true)
 |-- Timestamp: long (nullable = true)
 |-- User: string (nullable = true)

そして、最後に、必要な情報だけを整形して出力した結果が出ます。

JobID	StageID	Time	StageName
0	1	423	map_at_SparkEventLogParseSample.scala:31
0	0	131	saveAsTextFile_at_SparkEventLogParseSample.scala:33

これでひとまず、どのJOB / Stage が自分の実装したどの処理なのかは分かりそうですね。

他にも、どのStageでどういうTaskが発行されたかとか、どのRDDが使われたかとかもLog自体には情報が出ているので、適宜プログラムを直して、出力結果をカスタマイズしていただければと思います。