はじめに

ナイル株式会社でエンジニアをしている、Maldiniです。今回は「SparkでRDDを中心に開発していてDataFrameはまだよく分からない」という人向けにDataFrameやspark.mlのPipelineについて要点をまとめました。

というのも、弊社は去年からScalaをプロダクト開発の中心としているというのもあり、2016年の1月ぐらいから筆者もSparkの勉強を始めて機械学習周りでSparkのMLlibを導入したのですが、先日参加した Datapalooza Tokyo というイベントでの「Spark の機械学習の開発と活用の動向」というセッションで

  • やはりこれからはRDDよりもDataFrameが主流
  • やはりこれからはspark.mllibよりもspark.mlが主流
  • Spark 2.0もうすぐリリースする

などといった話が出てきたので、 DataFrameやspark.mlのPipelineの要点をまとめることでRDD, spark.mllibを中心に開発している人のRDD → DataFrameの移行の手助けになればと思ったからです。(自分も含めて)

RDD vs DataFrame

RDDとは

RDD(Resilient Distributed Dataset)とは、簡単に言うと分散処理用のデータセットです。 下に例を載せておきます。(こちらは最早主流ではないのでさらっとだけ説明)

map, flatMap, reduceなどscala.collection用のメソッドは大体使えます。

詳しい使い方は、公式ドキュメントがわかりやすいのでそちらを参照ください。

DataFrameとは

テーブルのようなデータ構造をもった分散処理用データセットです。テーブル構造なのでSQLライクにデータを操作できます。 下に簡単な例を載せておきます。

DataFrameの使い方の詳細はこの記事この記事がわかりやすいと思います。左の記事はどちらもPythonですが、Scalaでも似た書き方になります。Scalaでの書き方をきちんと学びたい方はこちらも公式ドキュメントがわかりやすいので参照してください。

RDD ↔︎ DataFrameの変換方法

RDD vs DataFrameのまとめ

  • DataFrameはSQLライクに書けるので言語間 (Python, R, Scala) で書き方の違いも特になく、SQLが書ける人にとっては敷居が低い
  • DataFrameの方が基本的に速い (RDDよりも速く、かつ言語間で速さに違いが出にくい)

上記の理由からRDDではなくDataFrameが主流になっていくそうですが、DataFrameは型チェックに弱いなどの弱点もあり、それを補うために DataSet API もSpark 1.6から試験的にリリースされているのでこちらも注目です。

spark.mllib vs spark.ml

spark.mllibとは

org.apache.spark.mllibパッケージ内に属するMLlibの機械学習用クラス群です。 こちらはRDDをベースとしているので、今後はメンテナンスだけで新規開発は行われないようです。

spark.mlとは

org.apache.spark.mlパッケージ内に属するMLlibの機械学習用クラス群です。 こちらはDataFrameをベースとしているので、今後はこちらを主流に開発していくようです。

Pipelineとは

spark.ml.Piplelineの導入により、「生データに対してMapReduceなどの処理を行って、予測モデルを生成する」という処理自体をインスタンスとして記述、保持できるようになりました。

以下に簡単なサンプルコードを書いておきます。(下の図はサンプルコードのPipelineが行っている一連の処理の図解です)

Pipelineの使い方

Screen Shot 2016-06-23 at 5.42.53 PM

(Pipelineが行っている一連の処理)

Pipeline, PipelineModelの保存、読み込み方法 (上のコードの続き)

上記のようにPipelineを使うと、「データを受け取って予測モデルを作成するまでの処理」をインスタンスとして保存したりロードしたりして使いまわせます。

Pipelineのまとめ

Pipelineの導入により、今後はMLlibでの機械学習の主な流れは

  • 学習データをDataFrame (or DataSet?)として取得
  • それをTokenizer, HashingTFなどのPapelineStageクラスを継承したクラスを使って  [正解データ(ラベル), 特徴量] のペアデータを作成するための処理を書く。
  • NaiveBayesなどのアルゴリズムを使って予測モデルを作成する処理を書く
  • 一連の処理をPipelineのインスタンスとして作成 (、 保存)
  • Pipelineのインスタンスを元に予測モデルを作成(、保存)
  • 生成された予測モデルを使って新しいデータ予測をする。

という風になるのではないでしょうか。

さらにPipeline, PipelineModelの利点として、「言語をまたいだ使用が可能」というものがあります。

例えばデータ分析チームはPythonを使っている一方、ビッグデータの分析処理 & 分析結果を使ってサービスを提供するプロダクトはScalaで作られているといった場合、Pythonで作成したPipelineやPipelineModelをS3に保存して、Scalaでそれらをロードして使用する、ということも可能です。(S3への保存方法は上記参照)

最後に

spark.mlではまだ実装されていないアルゴリズムなどもあるので、 用途によってはspark.mllibのアルゴリズムに頼らないといけない場合もあり、その場合は結局RDDを使うこともあるでしょう。

しかし、今後spark.mlでの開発はどんどん進めていくとのことですし、DataFrame ↔︎ RDDの変換は簡単にできるので、少なくとも今からSparkを使って開発する機能に関してはDataFrame(or DataSet), spark.mlを中心に考えて作ったほうが良いのではないでしょうか。