GraphX Advent Calendar - Day 05 - 頂点と辺と、そしてトリプレット
GraphX Advent Calendar 2014 - Adventar 5日目です。
Day04で生成したグラフデータを元に、色々な編集処理を見ていきましょう。
頂点の値を編集
頂点の値を編集するには、 mapVertices という関数を使います。
val graph2:Graph[Long, (Long, java.util.Date)] = graph.mapVertices((vid:VertexId, attr:(String, Long)) => attr._1.length * attr._2)
(処理の内容としては、特に意味がありません。)
graphの頂点は、もともと (String, Long) というタプル型を値として持っています。上の例では、これを編集結果としてLong型の値に置き換えています。置き換える際に、String の length を Long にかけた結果を格納しています。
辺の値を編集
辺の値を編集するには、mapEdges という関数を使います。前述で生成した graph の Edge には、 Long 値と Date 値が入ったタプル値が入っているので、その Long 値だけにするというサンプルをやってみます。
val graph3:Graph[(String, Long), Long] = graph.mapEdges( edge => edge.attr._1 )
このコードにおける、edgeは、Edge[( (Long, java.util.Date) )] という型になっています。これが、graph3 の辺の型は、Edge[Long] という型になります。
頂点と辺の情報をセットで持つ Triplet
GraphX を理解する上で、Triplet は重要です。Triplet は直訳すると「三つ子」ですが、GraphX においては、ひとつの辺とそれにつながる両頂点の情報を持つデータを意味します。図解すると以下のような感じです。
以下のコードで、グラフの Triplet を標準出力することができます。
graph.triplets.collect.foreach(println(_))
頂点の情報を参照して辺の情報を編集する
前述のmapEdgesは、辺の情報だけを見て、辺の情報を参照していました。Triplet を使えば、辺の情報から頂点の情報も参照することができます。 mapTriplets は Triplets を使って、辺の情報を編集する関数です。
val graph4:Graph[(String, Long), Long] = graph.mapTriplets(edge => edge.srcAttr._2 + edge.attr._1 + edge.dstAttr._2)
辺の情報を参照して、頂点の情報を編集する
mapEdges は辺の情報の編集でしたが、 mapReduceTriplets を使うと、 Triplet を使って頂点の情報を編集できます。これまで紹介してきた関数と違って、 map だけでなく reduce 処理の実装も必要になります。これは、ひとつの頂点に接続している辺が複数存在する可能性があるので、 Reduce 処理によって、各頂点それぞれにまとめあげる処理が必要になるためです。
val newVertices:VertexRDD[Long] = graph.mapReduceTriplets( mapFunc = (edge:EdgeTriplet[(String, Long), (Long, java.util.Date)]) => { val toSrc = Iterator((edge.srcId, edge.srcAttr._2 - edge.attr._1)) val toDst = Iterator((edge.dstId, edge.dstAttr._2 + edge.attr._1)) toSrc ++ toDst }, reduceFunc = (a1:Long, a2:Long) => ( a1 + a2 ) )
この例では、
Srcの頂点(接続元の頂点)の値から辺の値を引いて、
Dstの頂点(接続先の頂点)の値に辺の値を足しています。
このソースをコード通り map と reduce の処理として見ると分かりづらいと思います。
mapFunc を各辺にどんなメッセージを流すのかを決める処理、 reduceFunc を各頂点に複数の辺からメッセージが流れたきた時の処理、と見るとわかり易いのではないでしょうか。
つまり、
Iterator((edge.srcId, edge.srcAttr._2 - edge.attr._1))
は、edge.srcId宛に edge.srcAttr._2 - edge.attr._1 という値を流している、
Iterator((edge.dstId, edge.dstAttr._2 + edge.attr._1))
は edge.dstId 宛に edge.dstAttr._2 + edge.attr._1 という値を流している。
(a1, a2) => ( a1 + a2 )
では、ひとつの頂点に複数の辺から値が流れ込んできたら、単純に足し合わせる。
という風に考えると、少しイメージ湧くのではないでしょうか。
この考え方は、今後説明予定のPregelの処理をイメージする上でも重要ですので、是非理解してみてください。
次回予告
明日は、 頂点と辺の情報のためのRDDについて、説明します。
※ 今日説明したソースは、
graphx-advent-samples/Day05.scala at master · ironpeace/graphx-advent-samples · GitHub を御参照ください。