HadoopTimes

ストリーミングアーキテクチャ Apache Kafka とMapR Streams による新しい設計手法
技術情報

PigからSparkへ: Apache PigデベロッパのためのSparkへの簡単な移行

過去にApache Pigを利用するデータアナリストだった私は、複雑なジョブプログラムのために、高度で柔軟な言語であるApache Sparkを使う必要に迫られました。一見Sparkは難しいように見えますが、この記事を読んでいただけば、Spark(特にPySpark)への移行は、それほど難しくないことが分かっていただけると思います。

とはいえ、全ての場合においてApache PigからSparkへ移行するべきと言っている訳ではありません。Pigはすばらしい言語です。プロジェクションやアグリゲーションを通じてデータを変換する際には、シンプルかつ効果的で、通常のMap/Reduceジョブでの性能は、他の追従を許さないものがあります。

Apache Pigはすばらしい特徴を持っているが・・・

PigはハイレベルのMap/Reduceコマンドパイプラインだと考えています。元SQLプログラマーの私にとっては操作が直感的であり、私の組織ではHadoopジョブをほとんどPigによって開発していました。

Pigは多くの特性を持っています。安定している上に、スケーラビリティも高く、HiveのメタストアHCatalog互換性を持っています。それぞれのステップを最小限に書き出すことで、複雑なSQLコードによくある概念的なバグを最小化することができます。

しかし時には、使用するのに不都合なプログラミングの典型例になりうる制限がいくつかあります。主な弱点は次の3つです。

まず1つ目はPigがパイプラインということです。そして、2つ目の弱点は、コードの中で時々使わなくてはいけないようなループやコードのインダクレクション処理(IF…THEN)を行うことができない。これはJai Ranganathan と Matei Zahariaによる記事の中で述べられています:

Apache Pigのようなスクリプトフレームワークも多くのハイレベルオペレーターを提供しますが、Sparkはこれらのオペレーターに、完全なプログラミングにおけるコンテキストへアクセスすることができます。これによって、典型的なプログラミング環境のように制御ステートメント、ファンクション、クラスを使うことができます。

最後に、Pigの3番目の弱点はインプットデータのフォーマットに関連したものです。PigはCSVとHCatalogは得意ですが、(JsonLoaderを通じての)JSONなど他のデータフォーマットを読み込むプロセスは得意ではありません。それに対し、Sparkは初期設定のままでそれらを統合することができます。

Apache Sparkを試してみよう

ここでSparkについて触れていきましょう!PigとSparkはプログラムモデルが共通なので、双方間の行き来は簡単です。基本的には、エイリアス(Pig)またはRDD変数(Spark)によって定義される、変更不能の変換を通じて作業すれば良いのです。変換は通常のプロジェクション(maps)、フィルター、またはGroupByのようなアグリゲーション、ソートなどです。

このようにプログラミングのアプローチが共通なので、Pigを使用するデベロッパにとって、Sparkは馴染みやすいのです。

Pythonの基本的な技術を持っているデータアナリストにとってPySparkはごく自然な選択になりますが、JavaやScalaなど他のSparkでもコードは似たものになります。

まとまった例

説明のために、ログファイルを1つロードして、特定の日付にフィルターをかけ、アイテムによってグループ分けされたログエントリーの数を計算し、他のファイルからのアイテムのデスクリプションを加えるというPigのスクリプトを試してみましょう。

/* load a log file of user sessions. Filter for a specific date and count entries per item
*/
 
f0 = LOAD 'logfile' using PigStorage('\t') AS (log_date:chararray, item_id:chararray, some_stuff:chararray);
 
f1 = FILTER f0 BY log_date == '20160515';
 
f2 = FOREACH f1 GENERATE item_id;
 
f3 = GROUP f2 BY item_id;
 
f4 = FOREACH f3 GENERATE group AS item_id, COUNT(f2) AS nb_entries;
 
/* add item name
*/
 
item1 = LOAD 'item' using PigStorage('\t') AS (item_id:chararray, item_name:chararray);
 
join1 = JOIN f4 BY item_id LEFT, item1 BY item_id;
 
result = FOREACH join1 GENERATE f4::item_id, item_name, nb_entries;
 
STORE result INTO 'result_file' USING PigStorage('\t');

コードは極めてシンプルで、それぞれのステップで変換が一回行われます。

ここでSparkに関しては、ローレベルRDDを使ったRaw Sparkを使い、Pigコードとの類似点を見ていきましょう。コードではエイリアスごとに要素は細分されますが、プロダクションコードは当然もっとコンパクトになります。

Raw Spark(RDD使用)

conf = SparkConf()
sc = SparkContext(conf=conf)
 
f0 = sc.textFile('logfile').map(lambda x: x.split('\t'))
 
f1 = f0.filter(lambda x: x[0] == '20160515')
 
f3 = f1.groupBy(lambda (log_date, item_id, some_stuff): item_id)
f4 = f3.map (lambda (item_id, iterable): (item_id, len(iterable)))
 
# add item name
item1 = sc.textFile('item').map(lambda x: x.split('\t'))
 
# no need to set the key item_id on both parts before performing the join,
# It's already on first place on each part.
 
join1 = f4.leftOuterJoin(item1)
 
result = join1.map(lambda (item_id, (nb_entries, item_name)): (item_id, item_name, str(nb_entries)))
 
# creating a line of tab separated fields, and save it in the result file
result_to_store = result.map (lambda record : '\t'.join(record))
result_to_store.saveAsTextFile('result_file')

ここで、PigとSparkに似通ったコードアウトラインであることがわかります。このためPigのデベロッパがSparkでコーディングを始めることは容易ですが、残念な点は、このような比較的簡単な操作ではPigの方がSparkより生産的で、実行にかかる時間もわずかなががらSparkより短いということです。

ローレベルRDDに慣れると、DataFramesやSparkSQLを使用してコードを改善することもできます。先ほどのコードはもっと読みやすい形に書き直すことができます

DateFramesやSparkSQLを使ったSpark

conf = SparkConf()
sc = SparkContext(conf=conf)
 
sqlContext = SQLContext(sc)
 
f0 = sc.textFile('logfile').map(lambda x: x.split('\t'))
 
fpFields = [ \
   StructField('log_date', StringType(), True), \
   StructField('item_id', StringType(), True), \
   StructField('some_stuff', StringType(), True) \
]
 
fpSchema = StructType(fpFields)
df_f0 = sqlContext.createDataFrame(f0, fpSchema)
df_f0.registerTempTable('log')
 
f1_df = sqlContext.sql(
   "SELECT log.item_id, count(*) AS nb_entries \
      FROM log \
     WHERE log_date = '20160515'\
  GROUP BY item_id"
)
f1_df.registerTempTable('log_agg')
# items dataframe
 
item1 = sc.textFile('item').map(lambda x: x.split('\t'))
 
itemFields = [ \
   StructField('item_id', StringType(), True), \
   StructField('item_name', StringType(), True) \
]
 
itemSchema = StructType(itemFields)
df_item1 = sqlContext.createDataFrame(item1, itemSchema)
 
df_item1.registerTempTable('item')
 
result = sqlContext.sql(
   'SELECT log_agg.item_id, item_name, format_number(nb_entries, 0) \
      FROM log_agg \
  LEFT OUTER JOIN item ON log_agg.item_id = item.item_id'
)
 
result_to_store = result.rdd \
     .map (lambda record : '\t'.join(record))
 
result_to_store.saveAsTextFile('result_file')

SparkSQL内でもっとコンパクトかつエレガントにやる方法があり、上記は概要に過ぎません。

これで、名前付きフィールド、タイプセーフティ、データアナリストにとって読みやすいコンパクトなSQLコードができました。生産性も向上し、Pigに対する良い代替策になります。

残念な点は、それぞれのSQLがブラックボックス化され、全体でしかテストできなくなったことです。これは結果が予想とことなったり、処理時間が遅かったりした場合に対処がしづらくなります。そこで、読みやすいながらも、独立したコードユニットで実行できるステップをデザインすることが、それぞれのデベロッパの腕の見せ所となります。

HiveメタストアHCatalogからデータをロードする

データがHive HCatalogで保存されている場合、全てのDataFrameメタデータはメタストアから継承され、Sparkコードはさらにシンプルになります。

conf = SparkConf()
sc = SparkContext(conf=conf)
sqlContext = HiveContext(sc)
 
f1_df = sqlContext.sql(
   "SELECT item_id, count(*) AS nb_entries \
   FROM my_db.log \
   WHERE log_date = '20160515' \
   GROUP BY item_id"
)
 
f1_df.registerTempTable('log_agg')
 
result = sqlContext.sql(
   "SELECT log_agg.item_id, item_name, format_number(nb_entries, 0) \
      FROM log_agg \
LEFT OUTER JOIN my_db.item item ON log_agg.item_id = item.item_id"
)
 
result_to_store = result.rdd \
   .map (lambda record : '\t'.join(record))
 
result_to_store.saveAsTextFile(outputFileName)

これでさらにコンパクトで読みやすいコードになります:)

さらに、Sparkがもつユーザー定義機能のアドバンテージについて見ていきましょう。

ユーザー定義機能

以前お伝えしたようにSparkではUDFの必要がなく、Pythonメソッドとして関数を書けば良いだけです。

Pigでは:

/* the function below has been written and deployed in a jar file */
DEFINE myFancyUdf com.mydomain.myfunction1;
 
...
 
log1 = FOREACH log0 GENERATE field1, myFancyUdf (field1l);

Sparkでは:

def myFancyUdf(f1):
   someStuff
   return result
 
log1 = log0.map (lambda field1: (field1, myFancyUdf(field1))

さらに高度なトピック

このセクションでは、SparkにおけるPigの強力な機能を二つの例を通してさらに見てゆきましょう:

Mapサイドジョイン

Pigの便利な機能の1つにMap-Side Joinというものがあります。参加するテーブルの一つが十分に小さい場合マップジョブに参加する各ワーカーに送信する(高価なreduce作業が必要ない)機能で、これはJOINのなかのreplicated hintを使用することで簡単に実行することができます。

前の例で、「アイテム」テーブルがメモリーに収まるくらい小さいと想定してみてください。

join1エイリアスはこのようになります:

join1 = JOIN f4 BY item_id, item1 BY item_id USING ‘replicated;
 
result = FOREACH join1 GENERATE f4::item_id, item_name, nb_entries;

Sparkではこれはbroadcast variablesによって簡単に実行されます:

# broadcast items
item_bc = sc.broadcast(item.collect())
 

'''
gets item name from its id
'''

def getItemName (item_id_to_match): # we know there will be only one result, so we take the first from the list
  (id, name) = filter(lambda (id, name): id == item_id_to_match, item_bc.value)[0]

アイテムテーブルはそれぞれのワーカーノードにブロードキャストされます。getItemName()機能がブロードキャストされたテーブルの中からどのレコードが与えられたitem_idを持っているかを確認し、名前を返します。この機能はSparkジョブのMapサイドで、実行されるレコードそれぞれにおいて呼び出されます。

完成したコードはこのようになります:


'''
gets item name from its id
'''


def getItemName (item_id_to_match):
 # we know there will be only one result, so we take the first from the
   (id, name) = filter(lambda (id, name): id == item_id_to_match, item_bc.value)[0]
   return name
 
f1_df = sqlContext.sql(
  "SELECT item_id, count(*) AS nb_entries \
     FROM my_db.log \
    WHERE log_date = '20160515' \
   GROUP BY item_id"
)
 
item_df = sqlContext.sql(
   "SELECT item_id, item_name \
      FROM my_db.item"
)
 
item_bc = sc.broadcast(item_df.rdd.collect())
 
result = f1_df.rdd.map (lambda= result.map (lambda record : '\t'.join(record))
result_to_store.saveAsTextFile('result_file')

ウィンドウ機能: Grouped Byアイテムの種類分けされたリストのはじめのn事象を取得

共通の特徴によってグループ分けされたあるテーブルのtop-n first recordの抽出が必要になることがあります。例に挙げたログファイルから、それぞれのアイテムに関して、10個の最新レコードを取得してみましょう(SQLでいうとPARTITION BYのようなウィンドウ機能になります)。
Pigではこれは次のようなコードで実行することができます:

f0 = LOAD ‘logfile’ using PigStorage('\t') AS (log_date:char array, item_id:chararray, some_stuff:chararray); 
 
f1 = GROUP f0 BY item_id; 
 
f2 = FOREACH f1 {
   o = ORDER f0 BY log_date DESC;
   l = LIMIT o 10;
   GENERATE FLATTEN(l) AS (log_date, item_id, some_stuff);
}

Sparkでも、ローレベルRDDスタッフまたはSparkSQL Windowing capabilitiesを使って実行することができます。
RDDローレベルソリューションからはじめましょう:

# create a tuple with the key for the GroupBy
f1 = f0.map (lambda (log_date, item_id, some_stuff): (item_id, (log_date, some_stuff)))
 
f2 = f1.groupByKey()
 
# result of the GroupBy is a tuple (item_id, iterable over grouped items)
# we sort the iterable according to log_date and retain only first 10 elements
f3 = f2.map (lambda (item_id, iter1): (item_id, sorted(list(iter1), key=lambda (log_date, item_id, some_stuff):log_date, reverse=True)[:10]))
 
# transform tuples of (item_id, [(log_date, item_id, some_stuff), ...]) into tuples of (log_date, item_id, some_stuff)
f4 = f3.flatMapValues(lambda x:x) \
.map (lambda (item_id, (log_date, some_stuff)):(log_date, item_id, some_stuff)

エレガントではありませんが、一応これでも動きます。

それからSparkSQLでは:

f1_df = sqlContext.sql(
'SELECT \
  log_date, \
  item_id,  \
  some_stuff  \
FROM (  \
  SELECT  \
  log_date, \
  item_id,  \
  some_stuff, \
  dense_rank() OVER (PARTITION BY item_id ORDER BY log_date DESC) as rank \
FROM my_db.log) tmp \
WHERE rank <= 10')
 
f2 = f1_df.rdd.map (lambda row: (row.log_date, row.item_id, row.some_stuff))

こっちの方がずっといいです!

結論

本記事では、デプロイ、デバッグ、実行モニター、ダイナミックリソースアロケーション、パーティション、スプリットサイズ調節、サンプリングなどの興味深いトピックは含んでいません。本記事の目的は、PigのデベロッパがSparkでコーディングをはじめる方法を示すものでした。本記事がその点で助けになることを願っています。

著者情報

Philippe de Cuzey

フィリップ・デ・クゼイ

(データアナリスト)

「いまさら聞けない」データ分析の総ざらい

「いまさら聞けない」データ分析の総ざらい
昨今、「データ分析」の重要性が強く叫ばれています。ただ、それはアナリストやデータ・サイエンティストと呼ばれる“専門家だけが担うことができる難解なもの”という誤解を持たれてはいないでしょうか。

データ分析という手法は、IT のパワーを活用することで、どんな企業も導入・実践することができる“現実解”なのです。

データ分析とはいかなるものなのか――。

この資料では、これからデータ分析を始める方や始めたが、もう一度初歩知識を復習したい方々向けにデータ分析の基本を解説します。

無料ダウンロードはこちら

こちらの記事もおすすめです