HadoopTimes

CONVERGEN TOKYO 2017
技術情報

Facebook コネクション分析のための Spark GraphFramesの使用

様々なデータセットを眺めていると、最もマッチするグラフの表現がイメージできることがあるでしょう。それらのデータセットがソーシャルメディアやコンピュータネットワーク、マシンインタラクションなど、どれであっても、グラフ表現は、データセットの様々な実態を表現する効果的な方法の一つです。データ構造で現実の事象をモデル化するためにグラフを利用する手法は、コンピュータ処理と同じぐらいの歴史があります。

Sparkの新しいバージョン(1.4以降)では、GraphFramesと呼ばれる新機能が導入されました。これを使うと、多様なユースケースでグラフの処理、グラフの分析を表現することができます。ローカラムスタイルの処理で(Spark SQLの一部である) DataFrames を使用したことがあれば、馴染みやすいことでしょう。

素晴らしいことに、 データ・リプレゼンテーション間の移動やコピーを行わずに、同一データセット上でクエリの実行やグラフアルゴリズムを実行することが可能です。

パターン見つけだしてPageRankを実行するというような複数の操作が必要な場合でも、通常のSparkのラングリング、 トランスフォーメーション、パイプラインの処理完了後、単一の GraphFrame から結果を得ることができれば効率的かつコードの節約にもなります。これは、「motifs」と呼ばれるエッジと頂点のロケーションパターンに対応した便利で新しいクエリ言語です。

一番良い事(かわかりませんが)は、Python APIがあることです。このブログ記事では、MapR と Sparkがプリロード済みで無償提供されている仮想マシンにて、MapR Sandboxを使用しGraphFramesの簡単な例をご説明します。

事前準備

Spark 1.5 環境を動作させるには、事前に以下の準備を行ってください。その後Facebookのコネクショングラフを始めるために、例題となるコードをいくつか実行します。

  • mapr.com/sandboxから MapR Sandboxの最新版をダウンロードしてください。クラスタ上でMapR Converged Community Editionを無料でインストールするか、Sparkを既に実行している既存のMapR クラスタで始めることも可能です。
  • 「mapr」ユーザでログインした後、GraphFrames Python パッケージをSandboxにインストールしてください。Python モジュールへのリンクは、spark-submitを実行するディレクトリと同一にしておくと便利です。
    wget https://github.com/graphframes/graphframes/archive/master.tar.gz
    tar xvfz master.tar.gz
    ln -s graphframes-master/python/graphframes ./graphframes
    
  • Stanford Facebook データセットを入手し、MapR-FSへ移動します。
    wget https://snap.stanford.edu/data/facebook.tar.gz
    wget https://snap.stanford.edu/data/facebook_combined.txt.gz
    wget https://snap.stanford.edu/data/readme-Ego.txt
    tar xvfz facebook.tar.gz -C /mapr/demo.mapr.com/user/mapr --strip 1
    gunzip -c facebook_combined.txt.gz > /mapr/demo.mapr.com/user/mapr
    mv facebook_combined.txt /mapr/demo.mapr.com/user/mapr
    
  • このブログ記事のサンプルコードを、githubから入手してください。
    wget https://raw.githubusercontent.com/mapr-demos/spark-graphframes/master/gframes.py
    

ここまでで、NFSでローカルにマウントされたMapR-FS ディレクトリ 「/mapr/demo.mapr.com/user/mapr/」配下に「0.feat」のようなノードIDで始まるファイルが多数あるはずです。

Facebook データを見る

ソーシャルメディアは、グラフ解析の典型的な応用事例の一つになっています。我々のサンプルデータは、調査協力者のFacebook 「友達リスト」を匿名化したものです。仕様によれば、全処理後に計4039個のノード、88234本のノード間エッジがあるはずです。このデータセットの構造はやや扱いづらいのですが、前処理段階で処理が可能です。

ダウンロードしたfacebook.tar.gzファイルには、 いくつかのファイルが含まれており、それぞれがユーザの友達リストから構成され、ユーザは、ノードIDで識別されます。各ユーザには、以下のような関連ファイルがいくつかあります(「n」はあるユーザのユーザIDです)。

  • edges – 各行に空白で区切られたノードIDが2個ある単純なファイルで、2ノードの間に単方向エッジ (友達関係) が存在していることを示しています。
  • featnames – ノード n の友達(edgesファイルに現れたノード)に対して、(誕生日、通学していた学校などの)特徴値それぞれが匿名化されたIDに変換され、このファイルにリストされます。
  • feat – edgesファイル内の全ノードに対する特徴。特徴がユーザプロファイルに現れた場合、この値は1となり、それ以外では0になる。各ノードについての値は、関連付けられた上記「featnames」ファイルに対応している。
  • egofeat – このユーザの特徴(同じ1/0 の値)
  • circles – このユーザの「サークル」を表現する (この記事では使用しない)

「匿名化」された特徴の表現 — 2個のノードが(誕生日などの)何か同じものを共有している場合、比較や問い合わせではわかるが、実際の値は知らない、という意味である。

facebook_combined.txt ファイルには、「n」edgesファイルと同じフォーマットにより1ファイルにまとめられた、ノードからのエッジ全てを含んでいます。

ここで処理する必要がある小さな問題がいくつかあります。その一つが複製です。少し調べるとわかりますが、同じノードIDがedgesファイル複数に現れ、それぞれのファイルが表現する別々の特徴の組を持っています。

[mapr@maprdemo ~]$ cat /mapr/demo.mapr.com/user/mapr/*.feat | cut -d ' ' -f 1 | sort | uniq -d | wc -l
116

私たちは、 facebook_combined.txt ファイルを使ってエッジを構築しますので、今回の場合、複製は単純に廃棄することにします。ファイル内のノード全てをざっと数えてみたところ、実際にユニークなノードは4039個がありました。

[mapr@maprdemo ~]$ cat facebook_combined.txt | tr ' ' '\n' | sort | uniq | wc -l
4039

これがどう動くかを理解するため少しサンプルコードを見てみましょう。

Facebook グラフをSpark に読み込む。

特に Spark SQLが関係する場合、ファイルからSpark にデータを読み込む方法はいくつか存在します。練習目的のために、ここでは2つの違った方法を使ってみましょう。facebook_combined.txt ファイルから、興味ある特徴の一部だけを使って、DataFrameに読み込んでみましょう。

from graphframes import *
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext
from pyspark.sql.types import *
from pyspark.sql import functions as F
from pyspark.sql import Row
import re

sc = SparkContext()
sqlContext = SQLContext(sc)
peopleids = [0, 107, 1684, 1912, 3437, 348, 3980, 414, 686, 698]
featids = ["id", "birthday", "hometown_id", "work_employer_id",
    "education_school_id", "education_year_id" ]
formatter = 'com.databricks.spark.csv'
vtx = Row(*featids)

# load the entire edge and node set into a Spark DataFrame
edf = sqlContext.read.format(formatter).options(delimiter=' ', \
    header='false', inferSchema=True) \
    .load('facebook_combined.txt').withColumnRenamed( \
    'C0', 'src').withColumnRenamed('C1', 'dst')

ヘルパー関数はいくつか利用可能です。これらは、後ほど例の中で使ってみましょう。

  • featurematch は、find()を使ってグラフサーチAPIでマッチする文字列を整形するため使用します。
  • fn_process は、特徴名のマッピングを読み込むために正規表現を使用します。
  • feat_process は、特徴の読み込みを処理し、get_feats は、RDD構築時に実際のマッピングを行います。

「featnames」ファイルには、次のような行が含まれます。

0 birthday;anonymized feature 0

このノードに対応する「feats」と「egofeats」ファイルには、特徴インデックス0がこの特定の誕生日の値に対応していることを示しています。特徴は1または0の値が可能です。特徴が1の場合、そのノードの誕生日の値は「匿名化された特徴0」と等しく、それ以外の場合、その特徴はデータには存在しません。

def featurematch(a, b):
    return "%s != 'None' and %s != 'None' and %s = %s" % \
            (a, b, a, b)

def fn_process(line):
    psd = re.search(
        r'\d+ (.*);anonymized feature (\d+)', line, re.IGNORECASE)
    if not psd:
        print "parse error, line: %s" % line
        sys.exit(1)
    n = psd.group(1).replace(';', '_')
    #n = re.sub('_$', '', n)
    f = psd.group(2)
    return (n, f)

def feat_process(line, selfid):
    allents = line.split(' ')
    if (selfid != -1):
        return (selfid, allents)
    else:
        return (allents[0], allents[1:])

def get_feats(vtxid, mapping, feats):
    thisfeats = {}
    vtxfeats = []
    for idx, f in enumerate(feats):
        name, value = mapping[idx]
        if (f == '1'):
            thisfeats[name] = value
        else:
            thisfeats[name] = 'None'
    for ff in featids[1:]:
        vtxfeats.append(thisfeats[ff])
    return vtx(vtxid, *vtxfeats)

GraphFrameを構築する

上記の構造を要約すると、ノードとエッジからなるGraphFrame を構築するには、いくつかのことをまとめなければなりません。まず、非常に小さいので、マップファイル 「n.featnames」 をメモリに読み込みます。次に、 (「n.feat」から) 接続され、自身(「n.egofeat」)を含めた、「n.feat」から接続されたノード全てのマップを行います。作業の大半はここで完了します。

# load all of the feature maps, feature files, and self features into an RDD
alledges = sc.emptyRDD()
for personid in peopleids:
        featmap_fname = "/mapr/demo.mapr.com/user/mapr/%d.featnames" % personid
        feats_fname = "%d.feat" % personid
        this_feats_fname = "%d.egofeat" % personid

        # load the feature map
        fmap = []
        with open(featmap_fname) as flines:
            for line in flines:
                fmap.append(fn_process(line))

        # load the features for all the edges, and our own
        f_rdd = sc.textFile(feats_fname).map(lambda x: feat_process(x, -1)). \
             union(sc.textFile(this_feats_fname).map(lambda x: feat_process(x, personid)))

        # add the new data to the group
        alledges = f_rdd.map(lambda x: get_feats(x[0], fmap, x[1])).union(alledges)

# remove duplicates
print "rdd raw count: %d" % alledges.count()

# create a GraphFrame from the result
vdf = sqlContext.createDataFrame(alledges, featids).dropDuplicates(['id'])
print "vertex count: %d" % vdf.count()
print "edge count: %d" % edf.count()
g = GraphFrame(vdf, edf)

ここでedgeファイルに現れた重複ノードを処理します。これらの処理は(理知的にマージを試すなど)もっと色々やりようがあるでしょうが、この例では、とりあえず、重複ノードのエントリは廃棄し、最後にデータファイル全てからマージされたRDDをGraphFrameに変換します。このステップ後、結果として得られたノード数が予想された数(4039)であることがわかります。

Facebookグラフをサーチする

それではGraphFrameを動作させてみましょう。最初にできることは、BがAの友達で、二人の誕生日が同じである例を全て探すというような、有益な情報を与える簡単な問い合わせの実行です。関心があるノード属性と関係性の構造(または「モチーフ」)両方とマッチさせることができる強力なグラフサーチ言語で実行します(「g.find」に渡して)。

# find all connected vertices with the same birthday identifier
print "same birthdays"
res = g.find("(a)-[]->(b)") \
         .filter(featurematch("a.birthday", "b.birthday"))
print "count: %d" % res.count()
res.select("a.id", "a.birthday", "b.id", "b.birthday").show(5)

出力は次のようになります。

rdd raw count: 4177                                                             
vertex count: 4039                                                              
edge count: 88234                                                               
same birthdays
count: 100                                                                      
+---+--------+---+--------+                                                     
| id|birthday| id|birthday|
+---+--------+---+--------+
|  3|       7| 85|       7|
| 75|       7| 85|       7|
| 13|       7|109|       7|
| 56|       7|109|       7|
|200|       7|274|       7|
+---+--------+---+--------+
only showing top 5 rows

ここに表示されたノード5個は、全て「匿名化された特徴 id 7」が誕生日となっています。全て同じ誕生日であることはわかりますが、実際の誕生日はわかりません。匿名化されたデータセットの多くは同様な動きとなります。

それでは、もう少し複雑な問い合わせを実行してみましょう。下記の問い合わせを使用して、レコメンドの最適化、友達の提案、新しいオファー、キャンペーンのアイデア調査をすることができます。

# find "friends of friends" who are not connected to us, but graduated the same
# year from the same school
print "same class"
res = g.find("(a)-[]->(b); (b)-[]->(c); !(a)-[]->(c)") \
         .filter("%s and %s" % \
                 (featurematch("a.education_school_id", "c.education_school_id"), \
                 featurematch("a.education_year_id", "c.education_year_id")))
res = res.filter("a.id != c.id").select("a.id", "a.education_school_id", "a.education_year_id",
        "c.id", "c.education_school_id", "c.education_year_id") 
print "count: %d" % res.count()
res.show(5)

以下が表示されます。

count: 1285                                                                     
+----+-------------------+-----------------+----+-------------------+-----------------+
|  id|education_school_id|education_year_id|  id|education_school_id|education_year_id|
+----+-------------------+-----------------+----+-------------------+-----------------+
|1008|                538|               72|1480|                538|               72|
|1271|                538|               72|1551|                538|               72|
| 953|                538|               72|1864|                538|               72|
|1003|                538|               72|1571|                538|               72|
|1480|                538|               72|1864|                538|               72|
+----+-------------------+-----------------+----+-------------------+-----------------+
only showing top 5 rows

条件全てに合うノードを表示してみましたところ1285件あリました。

最後に同じGraphFrameに対してグラフアルゴリズムを実行することができます。本パッケージは、最短経路、そしてPageRankなどの一般的なアルゴリズムを含んでいます。

# finally do a page rank on the graph
print "page rank"
g.pageRank(resetProbability=0.15, tol=0.01).vertices.sort(
    	    'pagerank', ascending=False).show(5)

print "done"

この出力は、次の通りです。

+----+--------+-----------+----------------+-------------------+-----------------+------------------+
|  id|birthday|hometown_id|work_employer_id|education_school_id|education_year_id|          pagerank|
+----+--------+-----------+----------------+-------------------+-----------------+------------------+
|1911|    None|       None|            None|                538|             None|17.706611805364968|
|3434|    None|       None|            None|                538|             None| 17.68697075025074|
|2655|    None|       None|            None|               None|             None| 17.11712984943487|
|1902|    None|       None|            None|                538|               72|16.868179332524562|
|1888|    None|       None|            None|                538|             None| 12.93200439748058|
+----+--------+-----------+----------------+-------------------+-----------------+------------------+
only showing top 5 rows

興味深いことに、PageRankでのトップスコア5件中の4件は、同じ学校出身でした。大きな学校であったか、選ばれたデータ、または調査協力者に共通していたか、のどちらかになります。おそらく、双方が幾分か関係していたのでしょう。

結論

GraphFramesは、まだ開発初期段階ですが (本稿執筆時のバージョン 0.1)、特に問い合わせとグラフアルゴリズムの両方が要求される場合、Sparkの大規模データセットの問い合わせや処理を行うためのスケーラブルで便利な方法になる見込みがあります。

上記のコード全ては、github の mapr-demosに置かれており、簡単にsandboxへロードして、グラフ構造を用いた実験をすることができます。ぜひお試しください。

著者情報

NickAmato

ニック・アマート

(MapR Technologies テクニカルマーケティングディレクター)

「いまさら聞けない」データ分析の総ざらい

「いまさら聞けない」データ分析の総ざらい
昨今、「データ分析」の重要性が強く叫ばれています。ただ、それはアナリストやデータ・サイエンティストと呼ばれる“専門家だけが担うことができる難解なもの”という誤解を持たれてはいないでしょうか。

データ分析という手法は、IT のパワーを活用することで、どんな企業も導入・実践することができる“現実解”なのです。

データ分析とはいかなるものなのか――。

この資料では、これからデータ分析を始める方や始めたが、もう一度初歩知識を復習したい方々向けにデータ分析の基本を解説します。

無料ダウンロードはこちら

こちらの記事もおすすめです