HadoopTimes

ストリーミングアーキテクチャ Apache Kafka とMapR Streams による新しい設計手法
技術情報

SparkデータソースAPI: Spark SQLクエリエンジンの拡張

前回の記事、『分散SQLエンジンとしてのApache Spark』では、Hadoop内の格納データをSQLを使用して問い合わせる方法を説明しました。Spark SQLエンジンは、分散ファイルシステムのCSVファイルを読み取ること、ファイルからスキーマを自動的に発見すること、そして、Hiveメタストアでそれらをテーブルとして公開することが可能です。これらはすべて標準SQLクライアントを使用して、Spark SQLエンジンへ接続し、実行することが可能です。また、ETL作業を避けて、ファイルのスキーマを手動で定義することなくデータセットを探索することも可能です。

Sparkは拡張可能なフレームワークを提供しており、機能を拡張することで、その能力をさらにいっそう高めることが出来ます。

SparkデータソースAPI

データソースAPIでは、構造化されたデータを任意のフォーマットで管理できます。SparkはすでにAvroやParquetなどのいくつかの標準的な構造を組み込んでいますが、サードパーティーはこのAPIを拡張することで、CSV、JSONやその他構造化データの向けに新しいリーダーを作り出しています。今日、私たちも独自のリーダーを作り出しています。

APIを拡張する理由は2つあります。

第1に、過去のフォーマットを読み取ることや、現在のデータソースをより使いやすい新たなものに変換できるライブラリを必要としているため。

第2に、個々のアプリケーションに複雑に組み込むのではなく、すべてのアプリケーション上で横断的に使用できるライブラリを必要としているためです。

データソース

データソースは各ファイルがそれ自体エンティティであるファイルの一群からなります。今回の例のために、各ファイルがテキストファイルで、行ごとに各フィールドを記述した、ユーザー情報を含むファイルであるとし、そのための簡単なフォーマットを定義します。ファイルの例を見てみましょう。

pepe
20
Miami
Cube

このファイルは、「ペペ」という、20歳の、マイアミに住み、キューバで生まれたユーザーを表します。

多くの要件を加えるほどフォーマットを複雑にすることができますが、プロセス自体は変わりません。

数百万にのぼる同一フォーマットのファイルに対して、SQLで問い合わせをかける手順を公開します。

導入

データソースAPIを拡張するために、カスタムリーダーのロードとそれを使用するためにSparkフレームワークの特定クラスを導入する必要があります。

この例でのエントリーポイントとしてSparkアプリケーションを作成してみましょう。記事『SBT、Scala、Spark』に従って作成できます。

アプリを作成した後、最初に正しいSparkライブラリと関連付けることが必要です。今回の例はSpark 1.5.1上で実行します。sbtファイルは次のように定義されます。

name := "spark-datasource"
version := "1.0"
scalaVersion := "2.11.7"
libraryDependencies += "org.apache.spark" % "spark-core_2.11" % "1.5.1"
libraryDependencies += "org.apache.spark" % "spark-sql_2.11" % "1.5.1"

スキーマ作成

データソースAPI拡張の開始点はRelationProviderクラスです。RelationProviderクラスはデータの必要なリレーションを作り出すために使用されます。

希望するスキーマの作成を可能にするSchemaRelationProvider特性を組み合わせる必要があります。

DefaultSourceという名前のクラスを作成する必要があり、Sparkは所与のパッケージからそれを見つけようとします。DefaultSourceクラスはRelationProviderを拡張し、SchemaRelationProviderを組み合わせます。

ここまでのコードは次のようになります:

class DefaultSource extends RelationProvider with SchemaRelationProvider {
  override def createRelation(sqlContext: SQLContext, parameters: Map[String, String])
    : BaseRelation = {
    createRelation(sqlContext, parameters, null)
  }
  override def createRelation(sqlContext: SQLContext, parameters: Map[String, String]
    , schema: StructType)
    : BaseRelation = {
    parameters.getOrElse("path", sys.error("'path' must be specified for our data."))
    return new LegacyRelation(parameters.get("path").get, schema)(sqlContext)
  }
}

このコードでは基本的に、生成したいリレーションを定義したLegacyRelationオブジェクトを作り出しています。既知のスキーマをもつタプルの一群のようなリレーションを考えてみてください。

リレーションクラスがどのように導入されたかを確認しましょう。

class LegacyRelation(location: String, userSchema: StructType)
(@transient val sqlContext: SQLContext)
  extends BaseRelation
       with Serializable {
  override def schema: StructType = {
    if (this.userSchema != null) {
      return this.userSchema
    }
    else {
      return StructType(Seq(StructField("name", StringType, true), 
                            StructField("age", IntegerType, true)))
    }
  }
}

ここではスキーマ関数を上書きしており、希望するスキーマが返されます。この例では、元のデータのスキーマがわかっていますが、ここでは、必要なスキーマを取得するために様々な操作を実行することができます。データがCSVであれば、ファイルのヘッダを利用してスキーマを推測したり、他の必要な操作を行うことができます。

エンティティの全内容ではなく、名前と年齢のフィールドのみを要求していることに着目してください。

次のステップは正しいスキーマを得ているかどうかをテストします。これはアプリケーションに次のコードを追加することで行えます。

object app {
  def main(args: Array[String]) {
    val config = new SparkConf().setAppName("testing provider")
    val sc = new SparkContext(config)
    val sqlContext = new SQLContext(sc)
   
    val df = sqlContext
              .read
              .format("com.nico.datasource.dat")
              .load("/Users/anicolaspp/data/")   
              
    df.printSchema()
  }
}

このコードはSparkContextを作成し、そこからさらにSQLContextを作り出します。SQLContextを使用して、パッケージ名 (SparkはDefaultSourceクラスのためにこのパッケージ名を見るので留めておいてください) を渡すことでフォーマットを設定します。次にプロバイダを利用して、特定のパスに含まれるデータをDataFrameにロードします。

df.printSchema()

定義したスキーマを表示します。出力は以下のようになります。

root
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)

この時点では、求めるスキーマを単に作成しただけです。データを準備や定義されたスキーマの構築については言及していません。

スキーマへデータを読み込む

データソースからの読み込みのために、LegacyRelationクラスはTableScan特性を組み合わせる必要があります。TableScanには、次のシグネチャで実装するために必要なメソッドがあります。

def buildScan(): RDD[Row]

メソッドbuildScanはデータソースからすべての行を返します。今回の特定のケースでは、各行はそれぞれのファイルから選択された内容になります。buildScanの実装の様子を見ていきましょう。

override def buildScan(): RDD[Row] = {
    val rdd = sqlContext
                .sparkContext
                .wholeTextFiles(location)
                .map(x => x._2)
   
    val rows = rdd.map(file => {
      val lines = file.split("\n")
      Row.fromSeq(Seq(lines(0), lines(1)))
    })
    rows
  }

ここでは、(各ファイルがエンティティである) 全ファイルを読み込み、最初の2行 (求めるのはこのフィールドだけです) を読み込み、それらから行を作るwholeTextFilesメソッドを利用しています。その結果は、関わるファイルの一部を使用して作成された行の一群になります。

これはデータソースの内容をプリントアウトするようなアプリケーションを変更するために十分なメソッドでしょう。アプリケーションはこの時点で次のようになります。

object app {
  def main(args: Array[String]) {
    val config = new SparkConf().setAppName("testing provider")
    val sc = new SparkContext(config)
    val sqlContext = new SQLContext(sc)
   
    val df = sqlContext
              .read
              .format("com.nico.datasource.dat")
              .load("/Users/anicolaspp/data/")   
              
    df.show()
  }
}

希望するフォーマットをデータフレームに読み込んでいるにもかかわらず、データのフィールド型についての情報がありません。このスキーマの定義は異なるデータ型をサポートしていますが、この時点ではまだそれを強制しません。

各行を作成する際に型情報を推測するためにbuildScanメソッドを変更しましょう。

override def buildScan(): RDD[Row] = {
    val schemaFields = schema.fields
    val rdd = sqlContext
                .sparkContext
                .wholeTextFiles(location)
                .map(x => x._2)
    
    val rows = rdd.map(file => {
      val lines = file.split("\n")
      
      val typedValues = lines.zipWithIndex.map {
        case (value, index) => {
          val dataType = schemaFields(index).dataType
          castValue(value, dataType)
        }
    nbsp;  }
      Row.fromSeq(typedValues)
    })
    
    rows
  }
  
   private def castValue(value: String, toType: DataType) = toType match {
    case _: StringType      => value
    case _: IntegerType     => value.toInt
  }

ここでの唯一の変更は、ファイルから読み込んだそれぞれの値を、スキーマフィールドオブジェクトから推測される正しい型へ変換しているところです。今回の特定のケースでは名前が文字列であり、年齢が整数であることにのみ関心がありますが、しかしながら、この時点で極めて創造的である可能性があります。

それでは、最終的なLegacyRelationクラスは以下のようになります。

class LegacyRelation(location: String, userSchema: StructType)
  (@transient val sqlContext: SQLContext)
  extends BaseRelation
      with TableScan with Serializable {
  override def schema: StructType = {
    if (this.userSchema != null) {
      return this.userSchema
    }
    else {
      return StructType(Seq(StructField("name", StringType, true), 
                            StructField("age", IntegerType, true)))
    }
  }
  private def castValue(value: String, toType: DataType) = toType match {
    case _: StringType      => value
    case _: IntegerType     => value.toInt
  }
  override def buildScan(): RDD[Row] = {
    val schemaFields = schema.fields
    val rdd = sqlContext
              .sparkContext
              .wholeTextFiles(location)
              .map(x => x._2)
              
    val rows = rdd.map(file => {
      val lines = file.split("\n")
      val typedValues = lines.zipWithIndex.map{
        case (value, index) => {
          val dataType = schemaFields(index).dataType
          castValue(value, dataType)
        }
      }
      Row.fromSeq(typedValues)
    })
    rows
  }

これで、データをDataFrameにロードして、また、以前の記事で説明したとおり、SQLクライアントが利用できるようにデータを登録できます。アプリケーションは以下に示すようにシンプルです。

object app {
  def main(args: Array[String]) {
    val config = new SparkConf().setAppName("testing provider")
    val sc = new SparkContext(config)
    val sqlContext = new SQLContext(sc)
    val df = sqlContext
              .read
              .format("com.nico.datasource.dat")
              .load("/Users/anicolaspp/data/")   

    df.registerTempTable("users")
    sqlContext.sql("select name from users").show()
  }
}

DataFrame APIの利点を活用できるように、カスタムフォーマットをデータフレームに読み込むまでを示しました。しかし、まだこれ以上のことができます。

データソースAPIはデータを読み込む機能を提供するだけでなく、データをカスタムフォーマットで書き出す機能もあります。あるフォーマットのデータセットを変換したい場合に、この機能はきわめて強力なものになります。既存のドライバーにこれらの機能を追加する方法を見てみましょう。

フォーマッタに書き込む

他の標準システムから読み込めるデータの保存を要求していると想定します。カスタムデータソースをロードし、そこからの出力としてCSVを作成しています。

APIからのセーブコールをサポートするために、DefaultSourceクラスはCreatableRelationProvider特性と組み合わせる必要があります。この特性には、導入する必要があるCreateRelationと呼ばれるメソッドがあります。次のコードを見てみましょう。

override def createRelation(sqlContext: SQLContext, mode: SaveMode, 
    parameters: Map[String, String], data: DataFrame): BaseRelation = {
    
    saveAsCsvFile(data, parameters.get("path").get)
    createRelation(sqlContext, parameters, data.schema)
  }
  
  def saveAsCsvFile(data: DataFrame, path: String) = {
    val dataCustomRDD = data.rdd.map(row => {
      val values = row.toSeq.map(value => value.toString)
      values.mkString(",")
    })
    dataCustomRDD.saveAsTextFile(path)
  }

基本的には、データフレームをファイルのようにCSVとして保存し、その後、既知のスキーマとの関係に戻ってきます。

saveAsCsvFileメソッドは、CSVとしてフォーマットされたデータを使用してRDD [文字列] を作成しています。その後、それを所与のパスに保存します。簡単にするために出力ファイルにヘッダを含めませんでしたが、希望するフォーマットで必要なデータを出力できることを覚えておいてください。

DefaultSourceクラスの全コードは以下のとおりです。

class DefaultSource extends RelationProvider 
    with SchemaRelationProvider 
    with CreatableRelationProvider {
  override def createRelation(sqlContext: SQLContext, 
    parameters: Map[String, String]): BaseRelation = {
        
        createRelation(sqlContext, parameters, null)
  }
  override def createRelation(sqlContext: SQLContext, 
    parameters: Map[String, String], schema: StructType): BaseRelation = {
    
        parameters.getOrElse("path", sys.error("'path' must be specified for CSV data."))
        return new LegacyRelation(parameters.get("path").get, schema)(sqlContext)
  }
  def saveAsCsvFile(data: DataFrame, path: String) = {
    val dataCustomRDD = data.rdd.map(row => {
      val values = row.toSeq.map(value => value.toString)
      values.mkString(",")
    })
    dataCustomRDD.saveAsTextFile(path)
  }
  override def createRelation(sqlContext: SQLContext, mode: SaveMode, 
    parameters: Map[String, String], data: DataFrame): BaseRelation = {
    
        saveAsCsvFile(data, parameters.get("path").get)
        createRelation(sqlContext, parameters, data.schema)
  }
}

オリジナルデータをフォーマットのようにCSVで保存するために、アプリケーションを以下のように変更します。

object app {
  def main(args: Array[String]) {
    val config = new SparkConf().setAppName("testing provider")
    val sc = new SparkContext(config)
    val sqlContext = new SQLContext(sc)
    
    val df = sqlContext
              .read
              .format("com.nico.datasource.dat")
              .load("/Users/anicolaspp/data/")   
    
    df.write
      .format("com.nico.datasource.dat")
      .save("/Users/anicolaspp/data/output")
  }
}

データを読み込み/書き出すたびに、DefaultSourceクラスが配置されているパッケージ名を指定する必要があることに注意してください。

これで、ライブラリをパッケージ化し、記述したデータソースを利用する必要があるプロジェクトに組み込むことができます。多くのライブラリーは想定される全てのフォーマットをサポートしており、すでにコミュニティーや個別プロジェクトなどで使用されています。

終わりに

SparkデータソースAPIを利用して、カスタムフォーマットのデータをデータフレームにロードする方法をご紹介しました。また、DefaultSourceに対するSparkの操作方法やデータフレームの使用に沿った出力フォーマッタの導入手順、各処理に関わるクラスの定義などについても再確認しました。

データソースAPIでできることはまだたくさんありますが、正しい解説書を見つけるのはかなり困難です。今後はより一層拡張APIに関するドキュメントが発行されることを期待しています。

今回の例では、簡単なフォーマットをサポートするためにデータソースAPIを拡張する方法を示しましたが、2進コード化されたエンティティのような、より複雑な型を読み込み、書き出すように変更することもできます。

Sparkへ独自のデータ型を統合するという能力は、世に出回るデータ処理フレームワークのなかでもトップクラスのものです。

Hadoopの世界では、目標と機能が共通する多くのツールを見つけることができますが、そのどれもSparkのような柔軟性や汎用性がありません。そのため、この分野でSparkはたいへん望ましいものになります。際限ない状況下において稼働可能な処理フレームワークに関心があるなら、Apache Sparkこそがそのための方法です。

著者情報

Arun Nallusamy

ニコラス・エイ・ペレス

(IPC ソフトウェアエンジニア)

「いまさら聞けない」データ分析の総ざらい

「いまさら聞けない」データ分析の総ざらい
昨今、「データ分析」の重要性が強く叫ばれています。ただ、それはアナリストやデータ・サイエンティストと呼ばれる“専門家だけが担うことができる難解なもの”という誤解を持たれてはいないでしょうか。

データ分析という手法は、IT のパワーを活用することで、どんな企業も導入・実践することができる“現実解”なのです。

データ分析とはいかなるものなのか――。

この資料では、これからデータ分析を始める方や始めたが、もう一度初歩知識を復習したい方々向けにデータ分析の基本を解説します。

無料ダウンロードはこちら

こちらの記事もおすすめです