大規模並列処理:PythonとSparkの甘酸っぱい関係~PyData.Tokyo Meetup #3イベントレポート

 本連載では、勉強会を含む活動を通じてPyData.Tokyoが得た「Python+Data」の可能性やナレッジを、読者の方にお届けしていきます。機械学習や大規模データ解析など、幅広いテーマを取り扱っていく予定です。

新ロゴ作成&正式名称変更のお知らせ

 コミュニティの公式ロゴができました。あわせて、コミュニティ名称を「PyData Tokyo」から「PyData.Tokyo」に変更しました。

 ロゴステッカーの作成計画も進行中です。近々イベント会場でお配りできるかも知れません。

チュートリアルおよび次回勉強会のお知らせ

 この度PyData.Tokyo初の試みとして、初心者向けのチュートリアルを3月7日(土曜日)に行います。また、次回勉強会はデータ解析に関する「高速化」をテーマにし、4月3日(金曜日)に開催します。詳細は記事の最後をご覧下さい。

 PyData.Tokyo オーガナイザーのシバタアキラ(@madyagi)です。

 ビッグデータを処理するための基盤としてHadoopは既にデファクトスタンダードになりつつあります。一方で、データ処理に対するさらなる高速化と安定化に向けて、新しい技術が日々生まれており、様々な技術が競争し、淘汰されています。そんな中、Apache Spark(以下Spark)は、新しい分析基盤として昨年あたりから急激にユーザーを増やしており、耳にした方も多いのではないでしょうか。Hadoopとの互換とメモリ内処理による圧倒的なスピードに加え、MLlibと言われる分散型機械学習ライブラリの発展などもその理由に挙げられています。

 今回、Pythonでの開発経験が長く、Gunosyで広告サーバーなどの開発をされている粟飯原俊介さんをお招きし、Sparkに至るまでの分散処理の入門、Sparkを利用する上で必要となる知識や例の紹介、そして、PySparkを使ったPythonからのSpark利用の手ほどきをご紹介いただきました。

Spark is Hadoop++

 現在幅広く利用されているHadoopのエコシステム(HDFS, MESOS, YARNなど)と連携しながらも、Hadoopのボトルネックであるディスクからの読み出しをオンメモリ処理に置き換え、高速化を実現したのがSparkです。その高速なデータ処理を応用した、ストリーミング処理(Spark Streaming)や、機械学習(MLlib)などのライブラリーが用意されており、活発な開発が行われています。

 分散処理の「肝」となるのが、大量のコンピューターの管理と、その中で効率的に分散配置を行うことです。それまでのビッグデータ分散処理には様々な形がありましたが、現在でも広く使われているMPIと呼ばれる手法では、データの配置や、障害耐性を独自に実装する必要があり、"Life is too short for MPI"(MPIを使うほど人生は長くない)などと言われていました。

 それらの問題はmap, fold/reduce, filterなどのアルゴリズム体系を使い、Hadoopが極めて効率的に解決してきました。巨大データに特化し、データの配置を自動管理、広く分散されたデータに並列して計算を行い集計する手法です。一方で、高い信頼性の確保のために、多くの中間データがディスクに書き出されることもあり、計算によっては数時間かかることもよくありました。それを解決するために、Presto、Impalaなどのプロダクトが低レイテンシ実現のために開発されてきており、Sparkもこの流れの中で生まれました。

 Sparkは非常に汎用的なプログラミング環境を提供しており、map/reduceはもちろんのこと、zip, sort, union, join, groupByなど、非常に多様なデータ操作をサポートしています。RDD(Resillient Distributed Datasets)という分散データ構造を用いることで、クラスタ上の一部のデータが破損した場合にも、再度そのデータを生成し、高い障害耐性を実現しています。

PythonからはPySpark+IPythonで動かせる

 Sparkの利用は既にかなり簡略化されているようです。特にAWS上ではEMR(Elastic Map Reduce)を使うと、コマンドライン上からでも簡単にクラスタを必要な時に立ち上げられます(粟飯原さんの例はこちら)。iPython Notebook上でコーディングし、PySparkというライブラリを用いることで、立ち上げたクラスタ上でSparkコマンドを実行することができます。

 粟飯原さんが紹介していた例がこちら。ユーザーの読んだ記事のリストをTF-IDF分析し、リグレッションでモデル化することで、記事のリストからユーザーの性別を当てる、というものです。MLlibのライブラリを使っています:

# データの設定
sc = SparkContext()
male = sc.textFile('s3://bucket/path/male_.gz')
female = sc.textFile('s3://bucket/path/female_.gz')

# 各行のデータからTF-IDF分析を行う
tf = HashingTF(numFeatures = 10000)
male = male.map(lambda x: tx.transform(x.split(','))
female = female.map(lambda x: tx.transform(x.split(','))
idf = IDF()
idf_model = idf.fit(male.union(female))
male = idf_model.transform(male)
female = idf_model.transform(female)

# 機械学習のためにラベル付け
male = male.map(lambda x: LabeledPoint(1,x))
female = female.map(lambda x: LabeledPoint(2,x))
training = male.union(female)
training.cache()
model = LogisticRegressionWithSGD.train(training)

# 性別不明のユーザーのデータを処理
unknown= sc.textFile("s3://bucket/path/unknown_*.gz")
unknown = unknown.map(lambda x: [int(i), for i in x.split(",")])
unknown = unknown.map(lambda x: (x[0],idf_model.toransform(tf.transform.x[1:])))
unknown.map(lambda x: (x[0], model.predict(x[1]))).collect()

 ご覧のように、データのマネジに関しては読み込み以外何も指定する必要がない。また、Pythonから手軽にコーディングし、実行できる点は素晴らしいです。ただし処理に関してはmap, unionなど汎用的なメソッドに加え、MLlibのライブラリの使い方に依存しているため、ある程度の学習は必要になりそうです。

全てがバラ色というわけでもない

 上記のような高度な処理を大量のデータに対して、手軽なスクリプティング環境から実行できることは非常に強力です。一方で、粟飯原さん本人、そして会場からは問題点も指摘されました。Pythonからの利用時は、インタラクティブとは言えない体感実行速度が報告されています。Pythonの関数をラムダ関数などの細かい単位でPickle(シリアライズ・デシリアライズ)することがオーバヘッドになるほか、JVM上で走るSpark Workerとデータをパイプする際に生じるJVMとPythonのデータ構造の変換が何回も起こり、レイテンシーが大きくなるのためだと考えられます。

 PySparkの内部動作を示す上図を見るとわかりますが、Sparkのコア部分はJava/Scalaで開発されており、JVM上で動作します。PythonとJavaの互換性については今回会場でも大きなディスカッションになりました。PythonとJVMをつなぐPy4Jというラッパーが使われていますが、実装の問題からメモリーの占有や、処理の遅さが以前から指摘されています。また、MLlibの実装を超え、より多様な処理(例えば日本語の分かち書き)をするためにJavaのライブラリを使おうとすると、Scalaのラッパーが追加で必要になり、粟飯原さんも途中で挫折してしまったとのこと。

ガチで使うならScalaで!?

 Sparkをちゃんと使いたかったらScalaのインターフェースを使うべし、というのが今回のディスカッションを通じた結論でした。PythonはもちろんiPythonなどの便利な開発ツールやコーディングのしやすさなどの利点がありますが、ライブラリなどを使った少しでも高度な処理を始めると欠点が際立ってくるようです。この点Scalaを使っていると依存関係をjarファイルに渡すだけで良く、実行環境もネイティブになります。

Pythonistaはどうする? Spartanが「アツい」らしい

 それではPythonistaはどうするべきなのかというお話も今回最後にご紹介いただきました。Pythonでのデータ処理はNumPyという行列データ構造を使うことで高速化することができ、データ解析でのPythonコーディングには必須のライブラリーです。このNumpy行列をSparkのRDDのように分散化する試みが、Spartanというプロジェクトで行われています。ニューヨーク大学のRussell Power氏の博士研究(PDF)に基づいており、まだSparkに比べると成熟したプロジェクトというには程遠いものの、Pythonのほとんどのデータ処理がNumpyを使っていることを考えると今後の発展が期待されます。

 他にも、Django Celeryをつかってデータ処理を並列化する手法などが紹介されました。機械学習のサービス連携はタスク並列(飛んでくるデータに対して定型化された処理を並列で行う)が重要なため、map/reduceのように処理を書き換えなくても、ナイーブな並列化で十分な場合が多いのです。Celeryのデコレーターを使うと既存のPythonの関数を関数単位でタスク指定することができるので、極めて簡単に分散処理を行うことができるということでした。

 今回の粟飯原さんのスライドはこちらからご覧いただけます:

 また、発表のビデオもこちらからご覧いただけます: