Apache Sparkの勉強-Clusterを構成してみよう!Standaloneクラスタ編
前回は、Apache SparkのQuick Guideを読み進めて、Sparkの動作を実際に確認してみました。
tmnj.hatenablog.com
tmnj.hatenablog.com
今回は、Sparkのクラスタ構成に挑戦してみたいと思います。
まずは以下のドキュメントでClusterの概要を理解します。
Cluster Mode Overview - Spark 2.0.2 Documentation
その後、以下のドキュメントを参考にしてStandaloneクラスタ構成を組みたいと思います!
(注:Standaloneクラスタ構成とはSparkに内在するクラスタマネージャを利用するという意味で、シングルインスタンス構成ではありません。)
Spark Standalone Mode - Spark 2.0.2 Documentation
- まずはクラスタ概要を読んでみよう!
- クラスタマネージャタイプ
- EC2スポットインスタンスの作成
- セキュリティグループの構成
- Sparkのインストール
- クラスタを手動で起動
- アプリケーションをクラスタに接続してみよう
- 自己完結型Sparkアプリをクラスタで実行してみる
- まとめ
まずはクラスタ概要を読んでみよう!
Sparkアプリケーションは、クラスタ構成がどのように構成されているかとは関係なくコーディングすることができます。SparkContextオブジェクトがコーディネートするため、開発者はクラスタ構成かどうかは意識する必要はありません。
クラスタ構成でSparkアプリケーションを実行した場合、SparkContextはいくつかのタイプのクラスタマネージャ(Spark独自のクラスタ管理機能やMesosやYARN)と接続しアプリケーションへリソースを割り当てます。クラスタに接続すると、アプリケーションロジックやデータ格納などを実行するために、各ノードのExecutorを取得します。SparkContextが作成されて実行する役割をDriverと呼びます。Driverはアプリケーションコードを各リモートのExecutorへ送信して、最後にその処理を実行するためのTaskを各Executorに送信します。
クラスタマネージャタイプ
以下の3つのクラスタマネージャをサポートします。
- Standalone - Sparkに含まれているシンプルなクラスタマネージャです。簡単にクラスタ構成を組めます。
- Apache Mesos - Hadoop MapReduceとサービスアプリケーションを実行できる一般的なクラスタマネージャです。
- Hadoop YARN - Hadoop 2のリソースマネージャです。
今回は以下のドキュメントを利用して、Standaloneモードのクラスタ構成を構築していたいと思います。
Spark Standalone Mode - Spark 2.0.2 Documentation
セキュリティグループの構成
以下のように同じセキュリティグループ内のEC2インスタンス同士は無条件でアクセスできるようにルールを追加しておきます。
インバウンドルールで、ポートの範囲を0 - 65535を設定して、送信元を同一のセキュリティグループIDを指定したものを追加しておきましょう。
これにより、同一セキュリティグループ内のEC2インスタンス間のすべての通信が許可されます。(なおdafaultセキュリティグループを使用している場合はすでにこのルールはセットされているはずです。)
クラスタを手動で起動
Standaloneクラスタは、手動で一つずつ起動するか、もしくはクラスタ起動スクリプトで一括起動できます。
今回はまず手動起動で実施してみます。
マスターの起動
まずはどれか一つのEC2インスタンス上で、以下を実行してマスターを起動します。
$ ./sbin/start-master.sh starting org.apache.spark.deploy.master.Master, logging to /home/ec2-user/spark-2.0.2-bin-hadoop2.7/logs/spark-ec2-user-org.apache.spark.deploy.master.Master-1-ip-xx-xx-xx-xx.out
上記のログの中に、Spark MasterのURLが出力されていますので調査します。以下のような内容がログ内に記述されているはずです。
INFO Master: Starting Spark master at spark://ip-xx-xx-xx-xx.ap-northeast-1.compute.internal:7077
また、MasterのWebUI用のURLも出力されています。
INFO MasterWebUI: Bound MasterWebUI to 0.0.0.0, and started at http://xx.xx.xx.xx:8080
SSHで8080ポートにトンネルを掘ってアクセスすると作業しているPCからアクセスできます。
#セキュリティグループに定義する場合はソースIPを指定して安全性を考慮しましょう!
トンネルは以下のようにsshを実行すれば掘ることができます。
ssh -N -L 8080:localhost:8080 -i "your-key.pem" ec2-user@ec2-xx-xx-xx-xx.ap-northeast-1.compute.amazonaws.com
上記を実行したのちに、ローカルPCのブラウザでhttp://localhost:8080にアクセスするとMaster WebUIにアクセスできます。
この画面で、クラスタのWorkerの情報なども確認できます。
Slaveを起動してみる
残りの2つのインスタンス上で以下のコマンドを実行します。spark://~はmaster起動時のログか、MasterWebUI上に記載されていますので、それをコピペしましょう。
./sbin/start-slave.sh spark://ip-xx-xx-xx-xx.ap-northeast-1.compute.internal:7077
slaveのログに以下のように出力されていれば正常に起動しています。
INFO Worker: Successfully registered with master spark://ip-xx-xx-xx-xx.ap-northeast-1.compute.internal:7077
MasterWebUIでもクラスタに参加したワーカを確認できます。
以上で、手動起動によるクラスタの構成が実施できました。
上記シェルにはいろいろ引数がありますので、以下のドキュメントを参照してみましょう。
https://spark.apache.org/docs/latest/spark-standalone.html#starting-a-cluster-manually
この手順では、EC2インスタンスごとにstart-master/slave.shを起動していましたが、一括でクラスタを起動するためのスクリプトも用意されています*1
。こちらは、後日試してみたいと思います。
アプリケーションをクラスタに接続してみよう
アプリケーションをクラスタに接続する場合は単純にSparkContextにMasterのspark urlを渡してあげるだけです。
対話的なシェルで実施する場合は以下のように実行します。
./bin/spark-shell --master spark://IP:PORT
正常に起動すると、Master WebUIのRunning Applicationsに情報が表示されます。
spark-shell.shの引数に--total-executor-cores
次に、以下のような簡単な処理を実行してみましょう。
scala> val textFile = sc.textFile("README.md") textFile: org.apache.spark.rdd.RDD[String] = README.md MapPartitionsRDD[1] at textFile at <console>:24 scala> textFile.count() res0: Long = 99
ちゃんとカウントが表示されましたでしょうか?
自己完結型Sparkアプリをクラスタで実行してみる
以前作成したScalaアプリと同様のもので実施してみます。
適当なディレクトリ上で、以下の内容でSimpleApp.scalaというファイルを作成します。
/* SimpleApp.scala */ import org.apache.spark.SparkContext import org.apache.spark.SparkContext._ import org.apache.spark.SparkConf object SimpleApp { def main(args: Array[String]) { val logFile = "/home/ec2-user/spark-2.0.2-bin-hadoop2.7/README.md" // Should be some file on your system val conf = new SparkConf().setAppName("Simple Application") val sc = new SparkContext(conf) val logData = sc.textFile(logFile, 2).cache() val numAs = logData.filter(line => line.contains("a")).count() val numBs = logData.filter(line => line.contains("b")).count() println(s"Lines with a: $numAs, Lines with b: $numBs") sc.stop() } }
次に、このアプリをビルド&パッケージングする必要がありますが、ここではsbtというツールを利用します。
以下のコマンドでEC2(Amazon Linux AMI)上にinstallします*2。
curl https://bintray.com/sbt/rpm/rpm | sudo tee /etc/yum.repos.d/bintray-sbt-rpm.repo sudo yum install sbt
以下のようなsimple.sbtファイルを作成して依存関係を定義します。
name := "Simple Project" version := "1.0" scalaVersion := "2.11.7" libraryDependencies += "org.apache.spark" %% "spark-core" % "2.0.2"
sbtでパッケージングするために、以下のようなディレクトリ構造にしてSimpeApp.scalaとsimple.sbtを配置しておきます。
$ find . . ./src ./src/main ./src/main/scala ./src/main/scala/SimpleApp.scala ./simple.sbt
次に上記ディレクトリ上で、sbt packageを実行します。なお、sbt初回実行時は必要なjarをダウンロードする処理が走りますので少し時間がかかります。特に'Getting org.scala-sbt sbt 0.13.12'というメッセージが出てStuckしているように見えますが、2~3分するとjarのダウンロード処理が始まりますので待ちましょう!全体では5~6分ぐらいかかります!
$ sbt package Getting org.scala-sbt sbt 0.13.12 ← しばらく待機していますが待ちましょう …略… [info] Packaging /home/ec2-user/spark-test/scala/target/scala-2.11/simple-project_2.11-1.0.jar ... [info] Done packaging. [success] Total time: 234 s, completed Dec 5, 2016 1:58:44 AM
上記のように出力されたらパッケージング成功しています。では以下のように実際に実行してみましょう!
$ ~/spark-2.0.2-bin-hadoop2.7/bin/spark-submit \ --class "SimpleApp" \ --master "spark://ip-xx-xx-xx-xx.ap-northeast-1.compute.internal:7077" \ target/scala-2.11/simple-project_2.11-1.0.jar …略… Lines with a: 61, Lines with b: 27 …略…
ちゃんと結果が出力されましたが、これはSpark Cluster上で実行されています。master Web UIでも確認してみましょう。
以下のように、実行されていることが解ります。
Application IDのリンクをクリックするとさらにWorker上で実行されていることがわかります。