Tomo's IT Blog

技術的な調査メモ

Apache Sparkの勉強-実際に動かしてRDDを操作してみよう (2)

http://spark.apache.org/images/spark-logo-trademark.png


前回は、対話的にシェルを実行してRDDの操作を試してみました。

tmnj.hatenablog.com


今回も、以下のQuick Startを元に続きを進めていきたいと思います。

Quick Start - Spark 2.0.2 Documentation

キャッシュしてみよう!

Sparkはキャッシュ機能を有しており、データセットクラスタをまたがるインメモリ・キャッシュ上に置くことができます。
データセットに何度もアクセスする場合に非常に便利です。例えば、小さい"Hot"なデータセットに何度もアクセスする場合や、ページランクのような何度も繰り返すアルゴリズムを実行するような場合です。
まずは、linesWithSparkをキャッシュしてみましょう。

scala> val textFile = sc.textFile("README.md")
textFile: org.apache.spark.rdd.RDD[String] = README.md MapPartitionsRDD[1] at textFile at <console>:24

scala> val linesWithSpark = textFile.filter(line => line.contains("Spark"))
linesWithSpark: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[2] at filter at <console>:26

scala> linesWithSpark.cache()
res0: linesWithSpark.type = MapPartitionsRDD[2] at filter at <console>:26

scala> linesWithSpark.count()
res2: Long = 19

scala> linesWithSpark.count()
res3: Long = 19

データは最初にaction操作が呼ばれたときにキャッシュされます。上記の場合は、最初のcount()時にlinesWithSparkがキャッシュされます。2回目のcount()処理はキャッシュ上のデータにより実行されます。cache()を実行していない場合は、count()を実行の度に、元のRDDへのtextFile.filter(line => line.contains("Spark"))が実行されます。
cacheを利用することで、action毎にRDDのtransformation処理を経ずに結果を取得することができるということになります。

なお、上記の例のように非常に小さいセットをキャッシュすることはあまり意味が無いかもしれませんが、大規模なクラスタ環境で大容量のデータを同じようにキャッシュすることができます。

なお、cache()メソッドは、Sparkの持つPersistence(永続化)機能の一部です。(persistenceで、Storage LevelをMEMORY_ONLYにした場合と同じ。)cache()メソッドを呼ぶことで、MEMORY_ONLYでpersistされます。
Spark Persistenceの詳細は以下のURLを参照してみましょう。

Spark Programming Guide - Spark 2.0.2 Documentation

自己完結型アプリケーション(Self-Contained Applications)を動かしてみよう!

Spark APIを利用することで、対話形式ではなく自己完結型アプリケーション(要はmainを持ったアプリ)を作成することができます。Scala/Java/Pythonで作成できます。
ここでは、scalaJavaで実施してみましょう。

ScalaでSparkアプリケーションを作成してみよう!

適当なディレクトリ上で、以下の内容でSimpleApp.scalaというファイルを作成します。

/* SimpleApp.scala */
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf

object SimpleApp {
  def main(args: Array[String]) {
    val logFile = "/home/ec2-user/spark-2.0.2-bin-hadoop2.7/README.md" // Should be some file on your system
    val conf = new SparkConf().setAppName("Simple Application")
    val sc = new SparkContext(conf)
    val logData = sc.textFile(logFile, 2).cache()
    val numAs = logData.filter(line => line.contains("a")).count()
    val numBs = logData.filter(line => line.contains("b")).count()
    println(s"Lines with a: $numAs, Lines with b: $numBs")
    sc.stop()
  }
}

コード中では、まずSparkContextを作成しています。対話的シェルではSparkContextは暗黙的に利用できましたが、アプリ内で利用する場合は、明示的なインスタンス化が必要です。処理内容は非常にシンプルで、READEMに"a"と"b"が含まれる行数をカウントしているだけです。

次に、このアプリをビルド&パッケージングする必要がありますが、ここではsbtというツールを利用します。
以下のコマンドでEC2(Amazon Linux AMI)上にinstallします*1

curl https://bintray.com/sbt/rpm/rpm | sudo tee /etc/yum.repos.d/bintray-sbt-rpm.repo
sudo yum install sbt

以下のようなsimple.sbtファイルを作成して依存関係を定義します。

name := "Simple Project"
version := "1.0"
scalaVersion := "2.11.7"
libraryDependencies += "org.apache.spark" %% "spark-core" % "2.0.2"

sbtでパッケージングするために、以下のようなディレクトリ構造にしてSimpeApp.scalaとsimple.sbtを配置しておきます。

$ find .
.
./src
./src/main
./src/main/scala
./src/main/scala/SimpleApp.scala
./simple.sbt

次に上記ディレクトリ上で、sbt packageを実行します。なお、sbt初回実行時は必要なjarをダウンロードする処理が走りますので少し時間がかかります。特に'Getting org.scala-sbt sbt 0.13.12'というメッセージが出てStuckしているように見えますが、2~3分するとjarのダウンロード処理が始まりますので待ちましょう!全体では5~6分ぐらいかかります!

$ sbt package
Getting org.scala-sbt sbt 0.13.12  ← しばらく待機していますが待ちましょう
…略…
[info] Packaging /home/ec2-user/spark-test/scala/target/scala-2.11/simple-project_2.11-1.0.jar ...
[info] Done packaging.
[success] Total time: 234 s, completed Dec 5, 2016 1:58:44 AM

上記のように出力されたらパッケージング成功しています。では実際に実行してみましょう!


$ ~/spark-2.0.2-bin-hadoop2.7/bin/spark-submit \
--class "SimpleApp" \
--master local[2] \
target/scala-2.11/simple-project_2.11-1.0.jar

…略…
Lines with a: 61, Lines with b: 27

…略…

ちゃんと結果が出力されました!

JavaでSparkアプリケーションを作成してみよう!

今度はJavaバージョンで実装してみます。処理内容は一緒で次のようなSimpleApp.javaファイルを作成します。

/* SimpleApp.java */
import org.apache.spark.api.java.*;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.Function;

public class SimpleApp {
  public static void main(String[] args) {
    String logFile = "/home/ec2-user/spark-2.0.2-bin-hadoop2.7/README.md"; // Should be some file on your system
    SparkConf conf = new SparkConf().setAppName("Simple Application");
    JavaSparkContext sc = new JavaSparkContext(conf);
    JavaRDD<String> logData = sc.textFile(logFile).cache();

    long numAs = logData.filter(new Function<String, Boolean>() {
      public Boolean call(String s) { return s.contains("a"); }
    }).count();

    long numBs = logData.filter(new Function<String, Boolean>() {
      public Boolean call(String s) { return s.contains("b"); }
    }).count();

    System.out.println("Lines with a: " + numAs + ", lines with b: " + numBs);
    
    sc.stop();
  }
}

Scala版と処理内容は一緒ですが、SparkContextはJavaSparkContexというクラスをインスタンス化しています。
Sparkのクロージャに該当する部分はspark.api.java.function.Functionを匿名クラスで定義して処理内容を実装しています。このあたりは、Scalaと比べると少し冗長ですね。

次にビルドしますが、今回はMavenを利用します。まずはMavenをインストールしてみましょう。次のGistのコマンドそのままでインストールできます。

gist.github.com


次に、ビルドのためのpom.xmlを用意します。

<project>
  <groupId>edu.berkeley</groupId>
  <artifactId>simple-project</artifactId>
  <modelVersion>4.0.0</modelVersion>
  <name>Simple Project</name>
  <packaging>jar</packaging>
  <version>1.0</version>
  <dependencies>
    <dependency> <!-- Spark dependency -->
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.11</artifactId>
      <version>2.0.2</version>
    </dependency>
  </dependencies>
</project>


Scalaの場合と同様に、ディレクトリ構造を整えておく必要があります。以下のように配置します。

$ find .
./pom.xml
./src
./src/main
./src/main/java
./src/main/java/SimpleApp.java

コンパイル+パッケージングします。こちらも初回実行時は必要なパッケージのダウンロード処理が走りますので少し時間がかかります。(4~5分ぐらい)

$ mvn package
…省略…
[INFO] Building jar: /home/ec2-user/spark-test/java/target/simple-project-1.0.jar
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 13.310 s
[INFO] Finished at: 2016-12-05T02:33:28+00:00
[INFO] Final Memory: 32M/245M
[INFO] ------------------------------------------------------------------------

それでは実行してみます。

$ ~/spark-2.0.2-bin-hadoop2.7/bin/spark-submit \
 --class "SimpleApp" \
 --master local[2] \
  target/simple-project-1.0.jar

…省略…

Lines with a: 61, lines with b: 27

…省略…

Scala版と同じ結果が出力されました!

以上で、Quick Startをすべて実行できました!

まとめ

前回と今回の2回で、Quick Startを読了しました。
SparkのRDDの基礎的な考え方と、キャッシュやSparkアプリケーションの作成方法など超概要ですが理解することができましたね!

次回以降は、以下のような観点で勉強を進めていきたいと思います。