Tomo's IT Blog

技術的な調査メモ

Apache Sparkの勉強-Clusterを構成してみよう!Standaloneクラスタ編

http://spark.apache.org/images/spark-logo-trademark.png


前回は、Apache SparkのQuick Guideを読み進めて、Sparkの動作を実際に確認してみました。


tmnj.hatenablog.com
tmnj.hatenablog.com


今回は、Sparkのクラスタ構成に挑戦してみたいと思います。
まずは以下のドキュメントでClusterの概要を理解します。

Cluster Mode Overview - Spark 2.0.2 Documentation

その後、以下のドキュメントを参考にしてStandaloneクラスタ構成を組みたいと思います!
(注:Standaloneクラスタ構成とはSparkに内在するクラスタマネージャを利用するという意味で、シングルインスタンス構成ではありません。)

Spark Standalone Mode - Spark 2.0.2 Documentation

まずはクラスタ概要を読んでみよう!

Sparkアプリケーションは、クラスタ構成がどのように構成されているかとは関係なくコーディングすることができます。SparkContextオブジェクトがコーディネートするため、開発者はクラスタ構成かどうかは意識する必要はありません。

クラスタ構成でSparkアプリケーションを実行した場合、SparkContextはいくつかのタイプのクラスタマネージャ(Spark独自のクラスタ管理機能やMesosやYARN)と接続しアプリケーションへリソースを割り当てます。クラスタに接続すると、アプリケーションロジックやデータ格納などを実行するために、各ノードのExecutorを取得します。SparkContextが作成されて実行する役割をDriverと呼びます。Driverはアプリケーションコードを各リモートのExecutorへ送信して、最後にその処理を実行するためのTaskを各Executorに送信します。


f:id:tmnj:20161208130607p:plain

クラスタマネージャタイプ

以下の3つのクラスタマネージャをサポートします。

今回は以下のドキュメントを利用して、Standaloneモードのクラスタ構成を構築していたいと思います。

Spark Standalone Mode - Spark 2.0.2 Documentation


EC2スポットインスタンスの作成

今回はクラスタ構成なので、スポットインスタンスをc3.large×3つ起動します。
詳細は以下を参照してください。

tmnj.hatenablog.com

セキュリティグループの構成

以下のように同じセキュリティグループ内のEC2インスタンス同士は無条件でアクセスできるようにルールを追加しておきます。
インバウンドルールで、ポートの範囲を0 - 65535を設定して、送信元を同一のセキュリティグループIDを指定したものを追加しておきましょう。

f:id:tmnj:20161208161903p:plain


これにより、同一セキュリティグループ内のEC2インスタンス間のすべての通信が許可されます。(なおdafaultセキュリティグループを使用している場合はすでにこのルールはセットされているはずです。)

Sparkのインストール

作成したスポットインスタンス3つにSparkをインストールします。
インストール方法は以下の記事を参照してください。

tmnj.hatenablog.com

クラスタを手動で起動

Standaloneクラスタは、手動で一つずつ起動するか、もしくはクラスタ起動スクリプトで一括起動できます。

  • mastar/workerを手動で一つずつ起動する
  • Sparkに付属しているクラスタ起動スクリプトを利用して一括起動する

今回はまず手動起動で実施してみます。

マスターの起動

まずはどれか一つのEC2インスタンス上で、以下を実行してマスターを起動します。

$  ./sbin/start-master.sh
starting org.apache.spark.deploy.master.Master, logging to /home/ec2-user/spark-2.0.2-bin-hadoop2.7/logs/spark-ec2-user-org.apache.spark.deploy.master.Master-1-ip-xx-xx-xx-xx.out


上記のログの中に、Spark MasterのURLが出力されていますので調査します。以下のような内容がログ内に記述されているはずです。

 INFO Master: Starting Spark master at spark://ip-xx-xx-xx-xx.ap-northeast-1.compute.internal:7077

また、MasterのWebUI用のURLも出力されています。

 INFO MasterWebUI: Bound MasterWebUI to 0.0.0.0, and started at http://xx.xx.xx.xx:8080

SSHで8080ポートにトンネルを掘ってアクセスすると作業しているPCからアクセスできます。
#セキュリティグループに定義する場合はソースIPを指定して安全性を考慮しましょう!

トンネルは以下のようにsshを実行すれば掘ることができます。

ssh -N -L 8080:localhost:8080 -i "your-key.pem" ec2-user@ec2-xx-xx-xx-xx.ap-northeast-1.compute.amazonaws.com

上記を実行したのちに、ローカルPCのブラウザでhttp://localhost:8080にアクセスするとMaster WebUIにアクセスできます。
この画面で、クラスタのWorkerの情報なども確認できます。

f:id:tmnj:20161208145615p:plain

Slaveを起動してみる

残りの2つのインスタンス上で以下のコマンドを実行します。spark://~はmaster起動時のログか、MasterWebUI上に記載されていますので、それをコピペしましょう。

./sbin/start-slave.sh spark://ip-xx-xx-xx-xx.ap-northeast-1.compute.internal:7077

slaveのログに以下のように出力されていれば正常に起動しています。

INFO Worker: Successfully registered with master spark://ip-xx-xx-xx-xx.ap-northeast-1.compute.internal:7077


MasterWebUIでもクラスタに参加したワーカを確認できます。

f:id:tmnj:20161208153020p:plain



以上で、手動起動によるクラスタの構成が実施できました。

上記シェルにはいろいろ引数がありますので、以下のドキュメントを参照してみましょう。

https://spark.apache.org/docs/latest/spark-standalone.html#starting-a-cluster-manually


この手順では、EC2インスタンスごとにstart-master/slave.shを起動していましたが、一括でクラスタを起動するためのスクリプトも用意されています*1
。こちらは、後日試してみたいと思います。

アプリケーションをクラスタに接続してみよう

アプリケーションをクラスタに接続する場合は単純にSparkContextにMasterのspark urlを渡してあげるだけです。
対話的なシェルで実施する場合は以下のように実行します。

./bin/spark-shell --master spark://IP:PORT


正常に起動すると、Master WebUIのRunning Applicationsに情報が表示されます。

f:id:tmnj:20161208153848p:plain


spark-shell.shの引数に--total-executor-cores を記述すると、このアプリケーションで利用するコア数を設定することも可能です。この引数を指定してWorkerのコアがどのように割当たっているか確認してみてください。


次に、以下のような簡単な処理を実行してみましょう。

scala> val textFile = sc.textFile("README.md")
textFile: org.apache.spark.rdd.RDD[String] = README.md MapPartitionsRDD[1] at textFile at <console>:24

scala> textFile.count()
res0: Long = 99

ちゃんとカウントが表示されましたでしょうか?

自己完結型Sparkアプリをクラスタで実行してみる

以前作成したScalaアプリと同様のもので実施してみます。
適当なディレクトリ上で、以下の内容でSimpleApp.scalaというファイルを作成します。

/* SimpleApp.scala */
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf

object SimpleApp {
  def main(args: Array[String]) {
    val logFile = "/home/ec2-user/spark-2.0.2-bin-hadoop2.7/README.md" // Should be some file on your system
    val conf = new SparkConf().setAppName("Simple Application")
    val sc = new SparkContext(conf)
    val logData = sc.textFile(logFile, 2).cache()
    val numAs = logData.filter(line => line.contains("a")).count()
    val numBs = logData.filter(line => line.contains("b")).count()
    println(s"Lines with a: $numAs, Lines with b: $numBs")
    sc.stop()
  }
}


次に、このアプリをビルド&パッケージングする必要がありますが、ここではsbtというツールを利用します。
以下のコマンドでEC2(Amazon Linux AMI)上にinstallします*2

curl https://bintray.com/sbt/rpm/rpm | sudo tee /etc/yum.repos.d/bintray-sbt-rpm.repo
sudo yum install sbt

以下のようなsimple.sbtファイルを作成して依存関係を定義します。

name := "Simple Project"
version := "1.0"
scalaVersion := "2.11.7"
libraryDependencies += "org.apache.spark" %% "spark-core" % "2.0.2"

sbtでパッケージングするために、以下のようなディレクトリ構造にしてSimpeApp.scalaとsimple.sbtを配置しておきます。

$ find .
.
./src
./src/main
./src/main/scala
./src/main/scala/SimpleApp.scala
./simple.sbt

次に上記ディレクトリ上で、sbt packageを実行します。なお、sbt初回実行時は必要なjarをダウンロードする処理が走りますので少し時間がかかります。特に'Getting org.scala-sbt sbt 0.13.12'というメッセージが出てStuckしているように見えますが、2~3分するとjarのダウンロード処理が始まりますので待ちましょう!全体では5~6分ぐらいかかります!

$ sbt package
Getting org.scala-sbt sbt 0.13.12  ← しばらく待機していますが待ちましょう
…略…
[info] Packaging /home/ec2-user/spark-test/scala/target/scala-2.11/simple-project_2.11-1.0.jar ...
[info] Done packaging.
[success] Total time: 234 s, completed Dec 5, 2016 1:58:44 AM

上記のように出力されたらパッケージング成功しています。では以下のように実際に実行してみましょう!

$ ~/spark-2.0.2-bin-hadoop2.7/bin/spark-submit \
--class "SimpleApp" \
--master "spark://ip-xx-xx-xx-xx.ap-northeast-1.compute.internal:7077" \
target/scala-2.11/simple-project_2.11-1.0.jar

…略…
Lines with a: 61, Lines with b: 27

…略…

ちゃんと結果が出力されましたが、これはSpark Cluster上で実行されています。master Web UIでも確認してみましょう。

以下のように、実行されていることが解ります。

f:id:tmnj:20161213121941p:plain


Application IDのリンクをクリックするとさらにWorker上で実行されていることがわかります。

f:id:tmnj:20161213121109p:plain

まとめ

本日は、Standaloneクラスタ構成を3つのEC2インスタンスを使って実施してみました。Master WebUIにアクセスすることでクラスタのWorker構成(利用できるコア数やメモリサイズなど)や実行中のアプリケーションが監視できることを確認しました。
また、対話シェルと自己完結型アプリケーションをクラスタモードで動かしてみました。非常に簡単でしたね!