Tomo's IT Blog

技術的な調査メモ

Apache Sparkの勉強-実際に動かしてRDDを操作してみよう

http://spark.apache.org/images/spark-logo-trademark.png


前回は、Apache Sparkのトップページを眺めて超概要を勉強しました。

tmnj.hatenablog.com


Apache Sparkってなに?と聞かれたら、「高度なDAG実行エンジンを備えた大規模データ高速処理基盤だよ!Hadoopより100倍速いよ!いろんな言語を使えるし、便利な拡張機能もあるよ、SQLで構造化データを扱えるし、リアルタイム処理もできるし、機械学習とか使えるし、グラフデータも処理できるよ。それも高速にね!!」と教えてあげましょう^^b
DAGってなに?と聞かれたら、瞬時に「Directed acyclic graphの略で日本語だと有向非巡回グラフというよ」とすらすら言えるとかっこいいですね!!
練習しておきましょう!(答えになってない)

ということで、今回は実際にSparkをインストールし、以下のQuick Startを読み進めながら実際に動かしてみたいと思います。

Quick Start - Spark 2.0.2 Documentation

環境の準備

今回は、Amazon EC2のスポットインスタンスを作成して利用します。
以下の記事で解説しております。

tmnj.hatenablog.com


ダウンロードとインストール

次のサイトから最新版のApache Sparkをダウンロードします。

Downloads | Apache Spark


次のように1~3を選択すると、4.Download Sparkにダウンロード用のリンクができますので、このリンクURLをコピーします。

f:id:tmnj:20161202151824p:plain


AWS EC2インスタンスSSHでログインして、以下のようにバイナリを取得します。

$ wget http://d3kbcqa49mib13.cloudfront.net/spark-2.0.2-bin-hadoop2.7.tgz


適当なディレクトリで展開します。今回は、ec2-userホームで展開します。

$ pwd
/home/ec2-user

$ tar xzvf spark-2.0.2-bin-hadoop2.7.tgz


その他、Apache SparkではJava 7+が必要となりますが、Amazon AMIに含まれていますのでインストールは不要です。

Exampleを動かしてみる

以下のようにsparkを解凍したディレクトリ上で、run-exampleというコマンドを実行してみます。
これは円周率を求めるサンプルになります。

$ pwd
/home/ec2-user/spark-2.0.2-bin-hadoop2.7
$ ./bin/run-example SparkPi 10

細かい出力は割愛しますが、以下のように円周率が計算できていることが解ります。

Pi is roughly 3.145155145155145


また、インタラクティブにプログラムを動作させることもできます。
Sparkの準備が整いましたので、Quick Startを実施してみましょう

Quick Start

以下のURLでQuick Startにアクセスできます。これをベースに進めます。

Quick Start - Spark 2.0.2 Documentation

Quick Startでは対話シェルでScalaPythonを選択できます。
自分はScala初心者ですが、今回はScalaで試してみたいと思います。

Spark Shellで対話的に分析をしてみよう!

以下のコマンドで対話的なシェルを起動します。

$ pwd
/home/ec2-user/spark-2.0.2-bin-hadoop2.7
$ ./bin/spark-shell

…省略…

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.0.2
      /_/

Using Scala version 2.11.8 (OpenJDK 64-Bit Server VM, Java 1.7.0_121)
Type in expressions to have them evaluated.
Type :help for more information.

scala>

このプログラムは、":quite"で終了できます。(:helpでヘルプも見れます。)


Sparkの重要なコンセプトは、RDD(Resilient Distributed Dataset)と呼ばれる分散コレクションにあります。
分散というように、Sparkを複数マシンでクラスタ構成を取っている場合は、この1つの論理的なコレクションがRDDで各サーバに分散されて処理が実行されます。RDDは、HDFSなどからファイルを読み込んで作成したり出来ます。RDDにtransformという処理を加えると別のRDDが作成されます。Sparkの処理は、このRDDに対してtransform処理を数珠つなぎで実行していき、最終的に必要な結果を得る(actionと言います)というのが特徴です。MapReduceをいくつもつなげて処理するということに似ています。イメージ的には以下のようになるかと思います。

RDD1 → transform → RDD2→ transform → RDD3 → action → ほしい結果①
                      │
                      │
                      └→ transform → RDD4 → action → ほしい結果②

RDDはイミュータブルなので処理が循環することはありません。すなわち上記のイメージは有向非巡回モデルということになります。DAGエンジンは、このような複数の処理に対して、効率的なRDDの実行パスを計算するといったところで使われるのかなと思います。
RDDの詳細やDAGとの関連などは、また別途勉強していきたいと思います。

初めてのRDD

では、実際にファイルを読み込んでRDDを作ってみましょう。ここではSparkに付いているREADME.mdを読み込んでみます。

scala> val textFile = sc.textFile("README.md")
textFile: org.apache.spark.rdd.RDD[String] = README.md MapPartitionsRDD[1] at textFile at :24

sc.textFile("README.md")により、textFileという変数にorg.apache.spark.rdd.RDDオブジェクトが挿入されたことが解ります。scとは、SparkContextのオブジェクトで、Sparkで処理を行う際に必須となるオブジェクトです。spark-shellを起動すると、自動的にSparkContextが作成されてscという変数で利用できます。プログラム上でSpark処理を記述する場合は、SparkContextを明示的に作成する必要がありますが、現時点ではSparkShellでは暗黙的にscを利用可能とだけ認識しておいてください。
RDDactionsと、transformationsという2つのタイプの操作が定義されています。actionsは、RDDが持つDatasetをもとに何らかの処理を実行した結果を返します。transformationsはRDDの持つデータセットを別のものに変換して新たなRDDを作り出します。
transformationの例としてはmapメソッドがあります。これはRDD内のデータセットをmap形式に変換して、別のRDDを作り出します。
actionの例としては、reduceメソッドがあります。これはRDD内のデータセットに対して集計操作を実行します。

実際にRDDのaction操作をしてみましょう。

scala> textFile.count()
res0: Long = 99

scala> textFile.first()
res1: String = # Apache Spark


これはどちらもactionの例です。countはtextFileのデータセット数(=ファイルの行数)を返し、firstはデータセットの先頭(=ファイルの先頭行)を返します。

次にtransformation操作をしてみます。

scala> val linesWithSpark = textFile.filter(line => line.contains("Spark"))
linesWithSpark: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[2] at filter at :26

scala> linesWithSpark.count()
res1: Long = 19


この例では、filterメソッドを使用して、"Spark"という文字列を含む行のみを含んだ新たなRDDを作成してlinesWithSpark変数に格納しています。linesWithSparkにcount()を実行すると19行と結果が出力されます。
なおSparkの特徴としてRDDのトランスフォーメーション操作は遅延処理となります。上記の例では、実際にfilter処理が実行されてRDD内に新たなデータセットが作られるのはRDDにaction操作を実行したときになります。この例で説明すると、1行目のfilter処理ではクラスが作成されるのみです。2行目のcount()アクションが実行されたときに初めてtransformationが実行されlinesWithSparkにデータが格納され、count処理が実行されます。

transformationとactionは連続して記述することもできます。

scala> textFile.filter(line => line.contains("Spark")).count()
res2: Long = 19

もっとRDDをいじってみよう!

もう少し複雑な操作を実行してみましょう。

scala> textFile.map(line => line.split(" ").size).reduce((a, b) => if (a > b) a else b)
res3: Int = 22

これは、READMEの中で最も単語数の多い行の単語数を抽出します。
mapメソッドやreduceメソッド内のこのような記述方法はscalaに慣れていないと理解が難しいかもしれませんが、これはクロージャと呼ばれる関数オブジェクトです。Java8のラムダ式と同じ感じですね。Javaの無名クラスの関数版みたいなものだという理解です。とりあえず、習うより慣れろということで、ここはそういうものだという素直な気持ちで進めましょう。

最初のmapは単純に行を空白で区切ってそのsizeを抽出しています。これにより、元のデータセットには行ごとの文字列が入っていましたが、mapで作成されたデータセットには行ごとの単語数が入ります。
その後、reduceメソッドにより、新しいRDD内のコレクションが順番に評価されて値が大きいものが抽出されていきます。最終的には、一番値の大きいものが返されるという処理となります。なお、クラスタ構成ではこれがうまく分散処理されますので高速に処理できますが、コードの記述内容はクラスタ環境でも変わるわけではありません。

ScalaJava上で動作していますので、Javaのクラスを取りこんで利用することも出ます。
下記の例は、java.lang.Mathを利用して値の大きい方を取得しています。

scala> import java.lang.Math
import java.lang.Math

scala> textFile.map(line => line.split(" ").size).reduce((a, b) => Math.max(a, b))
res6: Int = 22

よくMapReduceの処理例として、ファイルに含まれるワードをカウントするという処理が取り上げられますが、Sparkでは以下の1行で記述することができます。非常にシンプルですね(と言ってみる)。

scala> val wordCounts = textFile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey((a, b) => a + b)
wordCounts: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[6] at reduceByKey at <console>:27

この結果はcollect()アクションを利用することで収集することができます。収集という言い方をしているのは、今はローカルのみで動かしているのでピンと来ないかもしれませんが、クラスタ環境では、wordCountsの実際のデータは各サーバ上に散らばっているわけです。collect()を実行すると、手元に収集されます。

scala> wordCounts.collect()
res7: Array[(String, Int)] = Array((package,1), (this,1), (Version"](http://spark.apache.org/docs/latest/building-spark.html#specifying-the-hadoop-version),1), (Because,1), (Python,2), (cluster.,1), (its,1), ([run,1), (general,2), (have,1), (pre-built,1), (YARN,,1), (locally,2), (changed,1), (locally.,1), (sc.parallelize(1,1), (only,1), ...)

flatMapとは何かというと、文字通り平らにしたMapということになりますが、解り辛いですね。
mapとflatMapを比較すると、
textFile.map(line => line.split(" "))の処理結果は、lineという文字列がWordのArrayに変換されます。

次のようなファイルをmapで変換した場合、

行1: My name is tomo.
行2: What's your name?

map(line => line.split(" "))の処理結果は以下のようになります。

Array(Array(My, name, is tomo.), Array(What's, your, name?))


flatMapは、Arrayの中のArrayをさらに平らにしてくれます。
つまり、flatMap(line => line.split(" "))の処理結果は以下のようになります。

Array(My, name, is, tomo., What's, your, name?)


すこし解り辛いですが、お分かりいただけましたでしょうか?

次に、.map(word => (word, 1))の部分ですが、これはwordをKeyとし文字が1個としたKey/Value型のMapに変換しています。
最後の、.reduceByKey((a, b) => a + b)は、Key毎(=この場合はKeyはワード)に値を足しており、最終的にワードをKeyにしたValueがワード数として算出されるということになります。


本日は一旦ここまでとしたいと思います。

まとめ

Sparkのデータ操作の肝はRDDです。RDDに対する操作はactionとtransformがあります。またSparkは遅延処理で実行されますので、actionが実行されるまでは、transform処理は走りません。
Scalaクロージャは習うより慣れろ!