HadoopTimes

ストリーミングアーキテクチャ Apache Kafka とMapR Streams による新しい設計手法
技術情報

Scalaを使用したApache Spark GraphX入門

編集者注: こちらにある、Apache Sparkを利用してデータパイプラインアプリケーションを作成する方法に関する無料の新規オンデマンドトレーニングコースについても是非ご覧ください。

この記事は、MapR Sandbox上でScalaを使用してApache Spark GraphXを利用し始めるのを支援します。GraphXはグラフ並列計算用のApache Sparkのコンポーネントで、グラフ理論という数学の一分野に基づき構築されています。Spark Core上で動作する分散グラフ処理フレームワークです。

グラフの概念の概要

グラフはオブジェクト間の関係を表すのに使われる数学的構造です。グラフは頂点と、頂点を結ぶ辺から成ります。頂点がオブジェクトで、辺がオブジェクト間の関係を表します。

有向グラフは、辺に向きがあるグラフです。有向グラフの一例は、Twitterのフォロワーです。ユーザーのボブは、ユーザーのキャロルがボブをフォローすることを示唆せずに、キャロルをフォローすることができます。

正則グラフは、各頂点に同じ数の辺があるグラフです。正則グラフの一例は、Facebookの友達です。ボブがキャロルの友達の場合、キャロルもボブの友達です。

GRAPHXプロパティグラフ

GraphXはSpark RDDにResilient Distributed Property Graph (耐障害性分散プロパティグラフ) を加えて拡張します。

プロパティグラフは有向多重グラフで、複数の辺を平行して持つことができます。各辺および頂点にはユーザーが定義したプロパティが関連付けられています。平行辺により、同じ頂点間に複数の関係を持つことができます。

この課題では、GraphXを使用してフライトデータを分析します。

シナリオ

はじめに簡単な例として、3つのフライトを分析します。各フライトについては、以下の情報を持っています:

出発空港 到着空港 距離
SFO ORD 1800マイル
ORD DFW> 800マイル
DFW SFO> 1400マイル

このシナリオでは、空港を頂点、ルートを辺で表します。グラフには3つの頂点があり、それぞれが空港を表します。空港間の距離はルートのプロパティで、以下のようになります:

空港を表す頂点のテーブル

ID プロパティ
1 SFO
2 ORD
3 DFW

ルートを表す辺のテーブル

SrcId DestId プロパティ
1 2 1800
2 3 800
3 1 1400

ソフトウェア

このチュートリアルは、Sparkを搭載したMapR Sandbox上で実行します。

  • 以下の事例を実行するためのコードやデータは下記からダウンロードできます:
  • 本記事の事例は、spark-shellコマンドでローンチ後、Sparkシェル上で実行できます。
  • MapR Sandbox上でのSpark入門チュートリアルで説明されているように、コードはスタンドアロンアプリケーションとしても実行できます。

 

SPARKのインタラクティブシェルのローンチ

MapR Sandbox上でのSpark入門で説明されている通り、ユーザーIDはuser01、パスワードはmaprを使用してMapR Sandboxにログインしてください。下記でSparkシェルを起動します:

$ spark-shell

頂点の定義

まずGraphXのパッケージをインポートします。

(コードボックス内では、コメントは緑アウトプットは青で表示しています。)

import org.apache.spark._
import org.apache.spark.rdd.RDD
// import classes required for using GraphX
import org.apache.spark.graphx._

空港を頂点として定義します。頂点にはIDがあり、プロパティや属性を関連付けることができます。各頂点は以下から成ります。

  • 頂点のID → Id (Long型)
  • 頂点のプロパティ → name (String型)

空港を表す頂点のテーブル

ID プロパティ(V)
1 SFO

上記のプロパティを持つRDDを定義し、頂点として使用します。

// create routes RDD with srcid, destid, distance
val edges = Array(Edge(1L,2L,1800),Edge(2L,3L,800),Edge(3L,1L,1400))
val eRDD= sc.parallelize(edges)
eRDD.take(2) // Array(Edge(1,2,1800), Edge(2,3,800))

辺の定義

辺は空港間のルートです。辺には出発地と目的地が必要で、プロパティを持つことができます。今回の事例では、辺は以下から成ります:

  • 辺の出発地ID → src (Long型)
  • 辺の目的地ID → dest (Long型)
  • 辺のプロパティの距離 → distance (Long型)

ルートを表す辺のテーブル

srcid (出発地ID) destid (目的地ID) プロパティ(E)
1 12 1800

上記プロパティを持つRDDを定義し、辺として使用します。辺のRDDは (出発地ID, 目的地ID, 距離) という形式を取ります。

 

プロパティグラフの作成

グラフを作成するには、頂点のRDD、辺のRDD、そしてデフォルトの頂点が必要です。

 

graphというプロパティグラフを作成します。

// define the graph
val graph = Graph(vRDD,eRDD, nowhere)
// graph vertices
graph.vertices.collect.foreach(println)
// (2,ORD)
// (1,SFO)
// (3,DFW)
// graph edges graph.edges.collect.foreach(println)
// Edge(1,2,1800) // Edge(2,3,800) // Edge(3,1,1400)

1.空港はいくつありますか?

// How many airports?
val numairports = graph.numVertices
// Long = 3

2.ルートはいくつありますか?

// How many routes?
val numroutes = graph.numEdges
// Long = 3

3.どのルートは距離が1,000マイルを超えますか?

// routes > 1000 miles distance?
graph.edges.filter { case Edge(src, dst, prop) => prop > 1000 }.collect.foreach(println)
// Edge(1,2,1800)
// Edge(3,1,1400)

4.EdgeTripletクラスは、それぞれ出発地と目的地のプロパティを含むsrcAttrとdstAttrメンバーを追加することで、Edgeクラスを拡張します。

// triplets
graph.triplets.take(3).foreach(println)
((1,SFO),(2,ORD),1800)
((2,ORD),(3,DFW),800)
((3,DFW),(1,SFO),1400)

5.ソートして、最長距離のルートを出力します。

// print out longest routes
graph.triplets.sortBy(_.attr, ascending=false).map(triplet =>
     "Distance " + triplet.attr.toString + " from " + triplet.srcAttr + " to " + triplet.dstAttr + ".").collect.foreach(println)
Distance 1800 from SFO to ORD. Distance 1400 from DFW to SFO. Distance 800 from ORD to DFW.

 

GraphXを使用して実際のフライトデータを分析

シナリオ

データは右記より取得しました: http://www.transtats.bts.gov/DL_SelectFields.asp?Table_ID=236&DB_Short_Name=On-Time。2015年1月のフライトデータを使用します。各フライトについては以下の情報があります:

フィールド 説明 値の例
dOfM (String) 日にち 1
dOfW (String) 曜日 4
carrier (String) キャリアコード AA
tailNum (String) 航空機の一意識別子-テールナンバー N787AA
flnum (Int) 便名 21
org_id (String) 出発空港ID 12478
origin (String) 出発空港コード JFK
dest_id (String) 目的地空港ID 12892
dest (String) 目的地空港コード LAX
crsdeptime (Double) 出発予定時刻 900
deptime (Double) 実出発時刻 855
depdelaymins (Double) 出発遅延分数 0
crsarrtime (Double) 到着予定時刻 1230
arrtime (Double) 実到着時刻 1237
arrdelaymins (Double) 到着遅延分数 7
crselapsedtime (Double) 経過時間 390
dist (Int) 距離 2475

このシナリオでは、空港を頂点、ルートを辺で表します。空港とルートを可視化し、出発または到着のある空港の数を見られるようにしたいと思います。

 

この事例を実行するためのコードやデータは下記よりダウンロードできます:

https://github.com/caroljmcdonald/sparkgraphxexample

MapR Sandbox上でのSpark入門で説明されている通り、ユーザーIDはuser01、パスワードはmaprを使用してMapR Sandboxにログインしてください。サンプルデータファイルrita2014jan.csvをサンドボックスのホームディレクトリ/user/user01へSCPを使用してコピーしてください。

Sparkシェルを下記で起動します:

$ spark-shell

頂点の定義

まずGraphXのパッケージをインポートします。

(コードボックス内では、コメントは緑アウトプットは青で表示しています。)

import org.apache.spark._
import org.apache.spark.rdd.RDD
import org.apache.spark.util.IntParam
// import classes required for using GraphX
import org.apache.spark.graphx._
import org.apache.spark.graphx.util.GraphGenerators

以下ではCSVファイルに対応するフライトのスキーマを定義するのにScalaのケースクラスを使用します。

// define the Flight Schema
case class Flight(dofM:String, dofW:String, carrier:String, tailnum:String, flnum:Int, org_id:Long, origin:String, dest_id:Long, dest:String, crsdeptime:Double, deptime:Double, depdelaymins:Double, crsarrtime:Double, arrtime:Double, arrdelay:Double,crselapsedtime:Double,dist:Int)

以下の関数は、データファイルの行をフライトクラスにパースします。

// function to parse input into Flight class
def parseFlight(str: String): Flight = {
 val line = str.split(",")
 Flight(line(0), line(1), line(2), line(3), line(4).toInt, line(5).toLong, line(6), line(7).toLong, line(8), line(9).toDouble, line(10).toDouble, line(11).toDouble, line(12).toDouble, line(13).toDouble, line(14).toDouble, line(15).toDouble, line(16).toInt)
}

以下ではCSVファイルのデータをResilient Distributed Dataset (RDD、耐障害性分散データセット) にロードします。RDDには変換アクションがあり、最初の () アクションはRDDの最初の要素を返します。

// load the data into a RDD
val textRDD = sc.textFile("/user/user01/data/rita2014jan.csv")
// MapPartitionsRDD[1] at textFile
// parse the RDD of csv lines into an RDD of flight classes val flightsRDD = textRDD.map(parseFlight).cache()

空港を頂点として定義します。頂点にはプロパティや属性を関連付けることができます。各頂点には以下のプロパティがあります:

  • 空港名 (String型)

空港を表す頂点のテーブル

ID プロパティ(V)
10397 ATL

上記のプロパティを持つRDDを定義し、頂点として使用します。

// create airports RDD with ID and Name
val airports = flightsRDD.map(flight => (flight.org_id, flight.origin)).distinct
airports.take(1) // Array((14057,PDX))
// Defining a default vertex called nowhere val nowhere = "nowhere"
// Map airport ID to the 3-letter code to use for printlns val airportMap = airports.map { case ((org_id), name) => (org_id -> name) }.collect.toList.toMap // Map(13024 -> LMT, 10785 -> BTV,…)

辺の定義

辺は空港間のルートを表します。辺には出発地と目的地が必要で、プロパティを持つことができます。今回の事例では、辺は以下から成ります:

  • 辺の出発地ID → src (Long型)
  • 辺の目的地ID → dest (Long型)
  • 辺のプロパティの距離 → distance (Long型)

ルートを表す辺のテーブル

srcid (出発地ID) Destid (目的地ID) プロパティ(E)
14869 14683 1087

上記のプロパティを持つRDDを定義し、辺として使用します。辺のRDDは (出発地ID, 目的地ID, 距離) という形式を取ります。

// create routes RDD with srcid, destid, distance
val routes = flightsRDD.map(flight => ((flight.org_id, flight.dest_id), flight.dist)).distinctdistinct
routes.take(2) // Array(((14869,14683),1087), ((14683,14771),1482))
// create edges RDD with srcid, destid , distance val edges = routes.map { case ((org_id, dest_id), distance) =>Edge(org_id.toLong, dest_id.toLong, distance) }
edges.take(1) //Array(Edge(10299,10926,160))

プロパティグラフの作成

グラフを作成するためには、頂点のRDD、辺のRDD、そしてデフォルトの頂点が必要です。

graphというプロパティグラフを作成します。

// define the graph
val graph = Graph(airports, edges, nowhere)
// graph vertices graph.vertices.take(2) Array((10208,AGS), (10268,ALO))
// graph edges graph.edges.take(2) Array(Edge(10135,10397,692), Edge(10135,13930,654))

6.空港はいくつありますか?

// How many airports?
val numairports = graph.numVertices
// Long = 301

7.ルートはいくつありますか?

// How many airports?
val numroutes = graph.numEdges
// Long = 4090

8.どのルートは距離が1,000マイルを超えますか?

// routes > 1000 miles distance?
graph.edges.filter { case ( Edge(org_id, dest_id,distance))=> distance > 1000}.take(3)
// Array(Edge(10140,10397,1269), Edge(10140,10821,1670), Edge(10140,12264,1628))

9.EdgeTripletクラスは、それぞれ出発地と目的地のプロパティを含むsrcAttrとdstAttrメンバーを追加することで、Edgeクラスを拡張します。

// triplets
graph.triplets.take(3).foreach(println)
((10135,ABE),(10397,ATL),692)
((10135,ABE),(13930,ORD),654)
((10140,ABQ),(10397,ATL),1269)

10.ソートして、最長距離のルートを出力します。

// print out longest routes
graph.triplets.sortBy(_.attr, ascending=false).map(triplet =>
     "Distance " + triplet.attr.toString + " from " + triplet.srcAttr + " to " + triplet.dstAttr + ".").take(10).foreach(println)
Distance 4983 from JFK to HNL. Distance 4983 from HNL to JFK. Distance 4963 from EWR to HNL. Distance 4963 from HNL to EWR. Distance 4817 from HNL to IAD. Distance 4817 from IAD to HNL. Distance 4502 from ATL to HNL. Distance 4502 from HNL to ATL. Distance 4243 from HNL to ORD. Distance 4243 from ORD to HNL.

11.最大次数を持つ頂点を計算します。

// Define a reduce operation to compute the highest degree vertex
def max(a: (VertexId, Int), b: (VertexId, Int)): (VertexId, Int) = {
 if (a._2 > b._2) a else b
}
val maxInDegree: (VertexId, Int) = graph.inDegrees.reduce(max)
//maxInDegree: (org.apache.spark.graphx.VertexId, Int) = (10397,152)
val maxOutDegree: (VertexId, Int) = graph.outDegrees.reduce(max) //maxOutDegree: (org.apache.spark.graphx.VertexId, Int) = (10397,153)
val maxDegrees: (VertexId, Int) = graph.degrees.reduce(max) //maxDegrees: (org.apache.spark.graphx.VertexId, Int) = (10397,305)
// Get the name for the airport with id 10397 airportMap(10397) //res70: String = ATL

12.どのエアポートが最も到着便があるか?

// get top 3
val maxIncoming = graph.inDegrees.collect.sortWith(_._2 > _._2).map(x => (airportMap(x._1), x._2)).take(3)
maxIncoming.foreach(println) (ATL,152) (ORD,145) (DFW,143)
// which airport has the most outgoing flights? val maxout= graph.outDegrees.join(airports).sortBy(_._2._1, ascending=false).take(3)
maxout.foreach(println) (10397,(153,ATL)) (13930,(146,ORD)) (11298,(143,DFW))

PageRank

もうひとつのGraphXのオペレーターとして、PageRankがあります。これはGoogleのPageRankアルゴリズムに基づいています。

 

PageRankは、どの頂点が他の頂点との間に最も多くの辺を持っているかを測定することで、グラフ内の各頂点の重要度を測定します。今回の例では、PageRankを使用して他の空港との接続が最も多い空港を測定することで、最も重要な空港を決定することができます。
収束の基準となる許容範囲を定義する必要があります。

13.PageRankによると、最も重要な空港はどこですか?

// use pageRank
val ranks = graph.pageRank(0.1).vertices
// join the ranks  with the map of airport id to name
val temp= ranks.join(airports)
temp.take(1)
// Array((15370,(0.5365013694244737,TUL)))
// sort by ranking val temp2 = temp.sortBy(_._2._1, false) temp2.take(2) //Array((10397,(5.431032677813346,ATL)), (13930,(5.4148119418905765,ORD)))
// get just the airport names val impAirports =temp2.map(_._2._2) impAirports.take(4) //res6: Array[String] = Array(ATL, ORD, DFW, DEN)

Pregel

重要なグラフアルゴリズムの多くは反復アルゴリズムです。これは頂点のプロパティは隣接する頂点のプロパティに依存し、その隣接する頂点のプロパティはそれらに隣接する頂点のプロパティに依存するためです。PregelはGoogleが開発した反復型グラフ処理モデルであり、グラフ内の頂点間を通る一連のメッセージを繰り返し処理します。GraphXはPregelのようなバルク同期のメッセージ通信APIを実装しています。

 

PregelをGraphXに実装したことで、頂点は隣接する頂点にしかメッセージを送信することができません。
Pregelのオペレーターは、一連のスーパーステップで実行されます。各スーパーステップでは:

  • 頂点は、直前のスーパーステップから入力メッセージの合計を受け取ります。
  • 頂点のプロパティについて新しい値を計算します。
  • 次のスーパーステップで、隣接する頂点にメッセージを送信します。

メッセージがなくなるとPregelのオペレーターは反復を終了し、最終的なグラフを返します。

以下のコードでは、Pregelを使用して下記の式で航空運賃を計算し、最安値の航空運賃を算出します。

50 + 距離 / 20

// starting vertex
val sourceId: VertexId = 13024
// a graph with edges containing airfare cost calculation
val gg = graph.mapEdges(e => 50.toDouble + e.attr.toDouble/20 )
// initialize graph, all vertices except source have distance infinity
val initialGraph = gg.mapVertices((id, _) => if (id == sourceId) 0.0 else Double.PositiveInfinity)
// call pregel on graph
val sssp = initialGraph.pregel(Double.PositiveInfinity)(
 // Vertex Program
 (id, dist, newDist) => math.min(dist, newDist),
 triplet => {
  // Send Message
  if (triplet.srcAttr + triplet.attr < triplet.dstAttr) {
   Iterator((triplet.dstId, triplet.srcAttr + triplet.attr))
  } else {
   Iterator.empty
  }
 },
 // Merge Message
 (a,b) => math.min(a,b)
)
// routes , lowest flight cost println(sssp.edges.take(4).mkString("\n")) Edge(10135,10397,84.6) Edge(10135,13930,82.7) Edge(10140,10397,113.45) Edge(10140,10821,133.5)
// routes with airport codes , lowest flight cost ssp.edges.map{ case ( Edge(org_id, dest_id,price))=> ( (airportMap(org_id), airportMap(dest_id), price)) }.takeOrdered(10)(Ordering.by(_._3)) Array((WRG,PSG,51.55), (PSG,WRG,51.55), (CEC,ACV,52.8), (ACV,CEC,52.8), (ORD,MKE,53.35), (IMT,RHI,53.35), (MKE,ORD,53.35), (RHI,IMT,53.35), (STT,SJU,53.4), (SJU,STT,53.4))
// airports , lowest flight cost println(sssp.vertices.take(4).mkString("\n"))
(10208,277.79) (10268,260.7) (14828,261.65) (14698,125.25)
// airport codes , sorted lowest flight cost sssp.vertices.collect.map(x => (airportMap(x._1), x._2)).sortWith(_._2 < _._2) res21: Array[(String, Double)] = Array(PDX,62.05), (SFO,65.75), (EUG,117.35)

著者情報

CarolMcDonald

キャロル・マクドナルド

(MapR Technologies ソリューションアーキテクト)

「いまさら聞けない」データ分析の総ざらい

「いまさら聞けない」データ分析の総ざらい
昨今、「データ分析」の重要性が強く叫ばれています。ただ、それはアナリストやデータ・サイエンティストと呼ばれる“専門家だけが担うことができる難解なもの”という誤解を持たれてはいないでしょうか。

データ分析という手法は、IT のパワーを活用することで、どんな企業も導入・実践することができる“現実解”なのです。

データ分析とはいかなるものなのか――。

この資料では、これからデータ分析を始める方や始めたが、もう一度初歩知識を復習したい方々向けにデータ分析の基本を解説します。

無料ダウンロードはこちら

こちらの記事もおすすめです