SparkのDAGを確認するためにSpark UI Event LogをSpark SQLで調べてみた
Spark では、Spark UIというWEB画面が用意されており、実行中のSparkアプリケーションの稼働状況を確認することができます。
このSpark UIを見ると、どのJOB、Stageがどの程度時間かかっていて、あとどれくらい残っているかとか、どれだけRDDがメモリを消費しているかなどが分かります。
ところが、自分が実装した処理のどれが、どのJOBなのか、どのStageなのか、あるいは自分が作成したRDDオブジェクトがどれなのか、Spark UIを見てもよく分かりません。
RDDの区別は、setName を使えば、RDDに対して明示的に名前を設定できます。ただ、プログラム上でsetNameできるRDD以外にも内部的には色々RDDオブジェクトが生成されて、それがSpark UI上に表示されるのでよく分かりません。
もっとわからないのは、自分が実装したどの処理が、どのJOBか、どのStageなのかです。
これを調べるために、Spark UIのEventLogを調べてみることにします。
EventLogの出力はデフォルトでは false ですが、spark.eventLog.enabled をtrue にすることで出力することができます。オプションは、spark-submit時には、--confで指定します。そうでない場合は、new SparkConf()に直接指定することが可能です。
ログの出力先は、デフォルトでは、/tmp/spark-events/配下に出力されます。
ログはJSON形式で出力されます。
ところで、Spark SQLには、JSONのスキーマを事前設定無しに動的に解析して、そのJSONデータに対してSQLを実行できるというすごい機能があります。せっかくなので、これを使ってみたいと思います。
サンプルソースをこちらにアップしてありますので、ご参照ください。
ironpeace/spark-eventlog-parse-sample · GitHub
まずは適当なディレクトリでcloneしてください。
$ git clone https://github.com/ironpeace/spark-eventlog-parse-sample.git $ cd spark-eventlog-parse-sample/
WordCountのサンプルプログラムを用意しているので、まずはそれを実行してみてください。
$ sbt 'run WordCount'
※ 一度実行するとプロジェクト直下に count-result ディレクトリができて、二度目以降の実行時にはこれを消しておかないとエラーになるのでご注意ください。
/tmp/spark-events/ 配下を確認してみると、ディレクトリができています。
$ ll /tmp/spark-events/ drwxrwx--- 5 teppei_tosa wheel 170 2 24 19:44 sparkeventlogparsesample-1424774686030
このディレクトリ配下のEVENT_LOG_1ファイルをフルパス指定して、下記のように実行してみてください。
$ sbt 'run EventLogParser /tmp/spark-events/sparkeventlogparsesample-1424774686030/EVENT_LOG_1'
途中でこのJSONファイルのスキーマが出力されます。すごいですね、Spark SQL。
root |-- AppName: string (nullable = true) |-- BlockManagerID: struct (nullable = true) | |-- ExecutorID: string (nullable = true) | |-- Host: string (nullable = true) | |-- NettyPort: integer (nullable = true) | |-- Port: integer (nullable = true) |-- ClasspathEntries: struct (nullable = true) | |-- /Users/teppei_tosa/Apps/sbt/bin/sbt-launch.jar: string (nullable = true) |-- Event: string (nullable = true) |-- JVMInformation: struct (nullable = true) | |-- JavaHome: string (nullable = true) | |-- JavaVersion: string (nullable = true) | |-- ScalaVersion: string (nullable = true) |-- JobID: integer (nullable = true) |-- JobResult: struct (nullable = true) | |-- Result: string (nullable = true) |-- MaximumMemory: integer (nullable = true) |-- SparkProperties: struct (nullable = true) | |-- spark.app.name: string (nullable = true) | |-- spark.driver.host: string (nullable = true) | |-- spark.driver.port: string (nullable = true) | |-- spark.eventLog.enabled: string (nullable = true) | |-- spark.files.overwrite: string (nullable = true) | |-- spark.fileserver.uri: string (nullable = true) | |-- spark.master: string (nullable = true) | |-- spark.scheduler.mode: string (nullable = true) | |-- spark.tachyonStore.folderName: string (nullable = true) |-- StageAttemptID: integer (nullable = true) |-- StageID: integer (nullable = true) |-- StageIDs: array (nullable = true) | |-- element: integer (containsNull = false) |-- StageInfo: struct (nullable = true) | |-- Accumulables: array (nullable = true) | | |-- element: string (containsNull = false) | |-- CompletionTime: long (nullable = true) | |-- Details: string (nullable = true) | |-- NumberofTasks: integer (nullable = true) | |-- RDDInfo: array (nullable = true) | | |-- element: struct (containsNull = false) | | | |-- DiskSize: integer (nullable = true) | | | |-- MemorySize: integer (nullable = true) | | | |-- Name: string (nullable = true) | | | |-- NumberofCachedPartitions: integer (nullable = true) | | | |-- NumberofPartitions: integer (nullable = true) | | | |-- RDDID: integer (nullable = true) | | | |-- StorageLevel: struct (nullable = true) | | | | |-- Deserialized: boolean (nullable = true) | | | | |-- Replication: integer (nullable = true) | | | | |-- UseDisk: boolean (nullable = true) | | | | |-- UseMemory: boolean (nullable = true) | | | | |-- UseTachyon: boolean (nullable = true) | | | |-- TachyonSize: integer (nullable = true) | |-- StageAttemptID: integer (nullable = true) | |-- StageID: integer (nullable = true) | |-- StageName: string (nullable = true) | |-- SubmissionTime: long (nullable = true) |-- SystemProperties: struct (nullable = true) | |-- awt.toolkit: string (nullable = true) | |-- file.encoding: string (nullable = true) | |-- file.encoding.pkg: string (nullable = true) | |-- file.separator: string (nullable = true) | |-- ftp.nonProxyHosts: string (nullable = true) | |-- gopherProxySet: string (nullable = true) | |-- http.nonProxyHosts: string (nullable = true) | |-- java.awt.graphicsenv: string (nullable = true) | |-- java.awt.printerjob: string (nullable = true) | |-- java.class.version: string (nullable = true) | |-- java.endorsed.dirs: string (nullable = true) | |-- java.ext.dirs: string (nullable = true) | |-- java.home: string (nullable = true) | |-- java.io.tmpdir: string (nullable = true) | |-- java.library.path: string (nullable = true) | |-- java.runtime.name: string (nullable = true) | |-- java.runtime.version: string (nullable = true) | |-- java.specification.name: string (nullable = true) | |-- java.specification.vendor: string (nullable = true) | |-- java.specification.version: string (nullable = true) | |-- java.vendor: string (nullable = true) | |-- java.vendor.url: string (nullable = true) | |-- java.vendor.url.bug: string (nullable = true) | |-- java.version: string (nullable = true) | |-- java.vm.info: string (nullable = true) | |-- java.vm.name: string (nullable = true) | |-- java.vm.specification.name: string (nullable = true) | |-- java.vm.specification.vendor: string (nullable = true) | |-- java.vm.specification.version: string (nullable = true) | |-- java.vm.vendor: string (nullable = true) | |-- java.vm.version: string (nullable = true) | |-- jline.esc.timeout: string (nullable = true) | |-- jline.shutdownhook: string (nullable = true) | |-- line.separator: string (nullable = true) | |-- os.arch: string (nullable = true) | |-- os.name: string (nullable = true) | |-- os.version: string (nullable = true) | |-- path.separator: string (nullable = true) | |-- socksNonProxyHosts: string (nullable = true) | |-- sun.arch.data.model: string (nullable = true) | |-- sun.boot.class.path: string (nullable = true) | |-- sun.boot.library.path: string (nullable = true) | |-- sun.cpu.endian: string (nullable = true) | |-- sun.cpu.isalist: string (nullable = true) | |-- sun.io.unicode.encoding: string (nullable = true) | |-- sun.java.command: string (nullable = true) | |-- sun.java.launcher: string (nullable = true) | |-- sun.jnu.encoding: string (nullable = true) | |-- sun.management.compiler: string (nullable = true) | |-- sun.nio.ch.bugLevel: string (nullable = true) | |-- sun.os.patch.level: string (nullable = true) | |-- user.country: string (nullable = true) | |-- user.dir: string (nullable = true) | |-- user.home: string (nullable = true) | |-- user.language: string (nullable = true) | |-- user.name: string (nullable = true) | |-- user.timezone: string (nullable = true) |-- TaskEndReason: struct (nullable = true) | |-- Reason: string (nullable = true) |-- TaskInfo: struct (nullable = true) | |-- Accumulables: array (nullable = true) | | |-- element: string (containsNull = false) | |-- Attempt: integer (nullable = true) | |-- ExecutorID: string (nullable = true) | |-- Failed: boolean (nullable = true) | |-- FinishTime: long (nullable = true) | |-- GettingResultTime: integer (nullable = true) | |-- Host: string (nullable = true) | |-- Index: integer (nullable = true) | |-- LaunchTime: long (nullable = true) | |-- Locality: string (nullable = true) | |-- Speculative: boolean (nullable = true) | |-- TaskID: integer (nullable = true) |-- TaskMetrics: struct (nullable = true) | |-- DiskBytesSpilled: integer (nullable = true) | |-- ExecutorDeserializeTime: integer (nullable = true) | |-- ExecutorRunTime: integer (nullable = true) | |-- HostName: string (nullable = true) | |-- InputMetrics: struct (nullable = true) | | |-- BytesRead: integer (nullable = true) | | |-- DataReadMethod: string (nullable = true) | |-- JVMGCTime: integer (nullable = true) | |-- MemoryBytesSpilled: integer (nullable = true) | |-- ResultSerializationTime: integer (nullable = true) | |-- ResultSize: integer (nullable = true) | |-- ShuffleReadMetrics: struct (nullable = true) | | |-- FetchWaitTime: integer (nullable = true) | | |-- LocalBlocksFetched: integer (nullable = true) | | |-- RemoteBlocksFetched: integer (nullable = true) | | |-- RemoteBytesRead: integer (nullable = true) | | |-- ShuffleFinishTime: integer (nullable = true) | |-- ShuffleWriteMetrics: struct (nullable = true) | | |-- ShuffleBytesWritten: integer (nullable = true) | | |-- ShuffleWriteTime: integer (nullable = true) |-- TaskType: string (nullable = true) |-- Timestamp: long (nullable = true) |-- User: string (nullable = true)
そして、最後に、必要な情報だけを整形して出力した結果が出ます。
JobID StageID Time StageName 0 1 423 map_at_SparkEventLogParseSample.scala:31 0 0 131 saveAsTextFile_at_SparkEventLogParseSample.scala:33
これでひとまず、どのJOB / Stage が自分の実装したどの処理なのかは分かりそうですね。
他にも、どのStageでどういうTaskが発行されたかとか、どのRDDが使われたかとかもLog自体には情報が出ているので、適宜プログラムを直して、出力結果をカスタマイズしていただければと思います。