Apache Sparkの勉強-Clusterを構成してみよう!Standaloneクラスタ編
前回は、Apache SparkのQuick Guideを読み進めて、Sparkの動作を実際に確認してみました。
tmnj.hatenablog.com
tmnj.hatenablog.com
今回は、Sparkのクラスタ構成に挑戦してみたいと思います。
まずは以下のドキュメントでClusterの概要を理解します。
Cluster Mode Overview - Spark 2.0.2 Documentation
その後、以下のドキュメントを参考にしてStandaloneクラスタ構成を組みたいと思います!
(注:Standaloneクラスタ構成とはSparkに内在するクラスタマネージャを利用するという意味で、シングルインスタンス構成ではありません。)
Spark Standalone Mode - Spark 2.0.2 Documentation
- まずはクラスタ概要を読んでみよう!
- クラスタマネージャタイプ
- EC2スポットインスタンスの作成
- セキュリティグループの構成
- Sparkのインストール
- クラスタを手動で起動
- アプリケーションをクラスタに接続してみよう
- 自己完結型Sparkアプリをクラスタで実行してみる
- まとめ
まずはクラスタ概要を読んでみよう!
Sparkアプリケーションは、クラスタ構成がどのように構成されているかとは関係なくコーディングすることができます。SparkContextオブジェクトがコーディネートするため、開発者はクラスタ構成かどうかは意識する必要はありません。
クラスタ構成でSparkアプリケーションを実行した場合、SparkContextはいくつかのタイプのクラスタマネージャ(Spark独自のクラスタ管理機能やMesosやYARN)と接続しアプリケーションへリソースを割り当てます。クラスタに接続すると、アプリケーションロジックやデータ格納などを実行するために、各ノードのExecutorを取得します。SparkContextが作成されて実行する役割をDriverと呼びます。Driverはアプリケーションコードを各リモートのExecutorへ送信して、最後にその処理を実行するためのTaskを各Executorに送信します。
クラスタマネージャタイプ
以下の3つのクラスタマネージャをサポートします。
- Standalone - Sparkに含まれているシンプルなクラスタマネージャです。簡単にクラスタ構成を組めます。
- Apache Mesos - Hadoop MapReduceとサービスアプリケーションを実行できる一般的なクラスタマネージャです。
- Hadoop YARN - Hadoop 2のリソースマネージャです。
今回は以下のドキュメントを利用して、Standaloneモードのクラスタ構成を構築していたいと思います。
Spark Standalone Mode - Spark 2.0.2 Documentation
セキュリティグループの構成
以下のように同じセキュリティグループ内のEC2インスタンス同士は無条件でアクセスできるようにルールを追加しておきます。
インバウンドルールで、ポートの範囲を0 - 65535を設定して、送信元を同一のセキュリティグループIDを指定したものを追加しておきましょう。
これにより、同一セキュリティグループ内のEC2インスタンス間のすべての通信が許可されます。(なおdafaultセキュリティグループを使用している場合はすでにこのルールはセットされているはずです。)
クラスタを手動で起動
Standaloneクラスタは、手動で一つずつ起動するか、もしくはクラスタ起動スクリプトで一括起動できます。
今回はまず手動起動で実施してみます。
マスターの起動
まずはどれか一つのEC2インスタンス上で、以下を実行してマスターを起動します。
$ ./sbin/start-master.sh starting org.apache.spark.deploy.master.Master, logging to /home/ec2-user/spark-2.0.2-bin-hadoop2.7/logs/spark-ec2-user-org.apache.spark.deploy.master.Master-1-ip-xx-xx-xx-xx.out
上記のログの中に、Spark MasterのURLが出力されていますので調査します。以下のような内容がログ内に記述されているはずです。
INFO Master: Starting Spark master at spark://ip-xx-xx-xx-xx.ap-northeast-1.compute.internal:7077
また、MasterのWebUI用のURLも出力されています。
INFO MasterWebUI: Bound MasterWebUI to 0.0.0.0, and started at http://xx.xx.xx.xx:8080
SSHで8080ポートにトンネルを掘ってアクセスすると作業しているPCからアクセスできます。
#セキュリティグループに定義する場合はソースIPを指定して安全性を考慮しましょう!
トンネルは以下のようにsshを実行すれば掘ることができます。
ssh -N -L 8080:localhost:8080 -i "your-key.pem" ec2-user@ec2-xx-xx-xx-xx.ap-northeast-1.compute.amazonaws.com
上記を実行したのちに、ローカルPCのブラウザでhttp://localhost:8080にアクセスするとMaster WebUIにアクセスできます。
この画面で、クラスタのWorkerの情報なども確認できます。
Slaveを起動してみる
残りの2つのインスタンス上で以下のコマンドを実行します。spark://~はmaster起動時のログか、MasterWebUI上に記載されていますので、それをコピペしましょう。
./sbin/start-slave.sh spark://ip-xx-xx-xx-xx.ap-northeast-1.compute.internal:7077
slaveのログに以下のように出力されていれば正常に起動しています。
INFO Worker: Successfully registered with master spark://ip-xx-xx-xx-xx.ap-northeast-1.compute.internal:7077
MasterWebUIでもクラスタに参加したワーカを確認できます。
以上で、手動起動によるクラスタの構成が実施できました。
上記シェルにはいろいろ引数がありますので、以下のドキュメントを参照してみましょう。
https://spark.apache.org/docs/latest/spark-standalone.html#starting-a-cluster-manually
この手順では、EC2インスタンスごとにstart-master/slave.shを起動していましたが、一括でクラスタを起動するためのスクリプトも用意されています*1
。こちらは、後日試してみたいと思います。
アプリケーションをクラスタに接続してみよう
アプリケーションをクラスタに接続する場合は単純にSparkContextにMasterのspark urlを渡してあげるだけです。
対話的なシェルで実施する場合は以下のように実行します。
./bin/spark-shell --master spark://IP:PORT
正常に起動すると、Master WebUIのRunning Applicationsに情報が表示されます。
spark-shell.shの引数に--total-executor-cores
次に、以下のような簡単な処理を実行してみましょう。
scala> val textFile = sc.textFile("README.md") textFile: org.apache.spark.rdd.RDD[String] = README.md MapPartitionsRDD[1] at textFile at <console>:24 scala> textFile.count() res0: Long = 99
ちゃんとカウントが表示されましたでしょうか?
自己完結型Sparkアプリをクラスタで実行してみる
以前作成したScalaアプリと同様のもので実施してみます。
適当なディレクトリ上で、以下の内容でSimpleApp.scalaというファイルを作成します。
/* SimpleApp.scala */ import org.apache.spark.SparkContext import org.apache.spark.SparkContext._ import org.apache.spark.SparkConf object SimpleApp { def main(args: Array[String]) { val logFile = "/home/ec2-user/spark-2.0.2-bin-hadoop2.7/README.md" // Should be some file on your system val conf = new SparkConf().setAppName("Simple Application") val sc = new SparkContext(conf) val logData = sc.textFile(logFile, 2).cache() val numAs = logData.filter(line => line.contains("a")).count() val numBs = logData.filter(line => line.contains("b")).count() println(s"Lines with a: $numAs, Lines with b: $numBs") sc.stop() } }
次に、このアプリをビルド&パッケージングする必要がありますが、ここではsbtというツールを利用します。
以下のコマンドでEC2(Amazon Linux AMI)上にinstallします*2。
curl https://bintray.com/sbt/rpm/rpm | sudo tee /etc/yum.repos.d/bintray-sbt-rpm.repo sudo yum install sbt
以下のようなsimple.sbtファイルを作成して依存関係を定義します。
name := "Simple Project" version := "1.0" scalaVersion := "2.11.7" libraryDependencies += "org.apache.spark" %% "spark-core" % "2.0.2"
sbtでパッケージングするために、以下のようなディレクトリ構造にしてSimpeApp.scalaとsimple.sbtを配置しておきます。
$ find . . ./src ./src/main ./src/main/scala ./src/main/scala/SimpleApp.scala ./simple.sbt
次に上記ディレクトリ上で、sbt packageを実行します。なお、sbt初回実行時は必要なjarをダウンロードする処理が走りますので少し時間がかかります。特に'Getting org.scala-sbt sbt 0.13.12'というメッセージが出てStuckしているように見えますが、2~3分するとjarのダウンロード処理が始まりますので待ちましょう!全体では5~6分ぐらいかかります!
$ sbt package Getting org.scala-sbt sbt 0.13.12 ← しばらく待機していますが待ちましょう …略… [info] Packaging /home/ec2-user/spark-test/scala/target/scala-2.11/simple-project_2.11-1.0.jar ... [info] Done packaging. [success] Total time: 234 s, completed Dec 5, 2016 1:58:44 AM
上記のように出力されたらパッケージング成功しています。では以下のように実際に実行してみましょう!
$ ~/spark-2.0.2-bin-hadoop2.7/bin/spark-submit \ --class "SimpleApp" \ --master "spark://ip-xx-xx-xx-xx.ap-northeast-1.compute.internal:7077" \ target/scala-2.11/simple-project_2.11-1.0.jar …略… Lines with a: 61, Lines with b: 27 …略…
ちゃんと結果が出力されましたが、これはSpark Cluster上で実行されています。master Web UIでも確認してみましょう。
以下のように、実行されていることが解ります。
Application IDのリンクをクリックするとさらにWorker上で実行されていることがわかります。
Apache Sparkの勉強-実際に動かしてRDDを操作してみよう (2)
前回は、対話的にシェルを実行してRDDの操作を試してみました。
今回も、以下のQuick Startを元に続きを進めていきたいと思います。
Quick Start - Spark 2.0.2 Documentation
キャッシュしてみよう!
Sparkはキャッシュ機能を有しており、データセットをクラスタをまたがるインメモリ・キャッシュ上に置くことができます。
データセットに何度もアクセスする場合に非常に便利です。例えば、小さい"Hot"なデータセットに何度もアクセスする場合や、ページランクのような何度も繰り返すアルゴリズムを実行するような場合です。
まずは、linesWithSparkをキャッシュしてみましょう。
scala> val textFile = sc.textFile("README.md") textFile: org.apache.spark.rdd.RDD[String] = README.md MapPartitionsRDD[1] at textFile at <console>:24 scala> val linesWithSpark = textFile.filter(line => line.contains("Spark")) linesWithSpark: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[2] at filter at <console>:26 scala> linesWithSpark.cache() res0: linesWithSpark.type = MapPartitionsRDD[2] at filter at <console>:26 scala> linesWithSpark.count() res2: Long = 19 scala> linesWithSpark.count() res3: Long = 19
データは最初にaction操作が呼ばれたときにキャッシュされます。上記の場合は、最初のcount()時にlinesWithSparkがキャッシュされます。2回目のcount()処理はキャッシュ上のデータにより実行されます。cache()を実行していない場合は、count()を実行の度に、元のRDDへのtextFile.filter(line => line.contains("Spark"))が実行されます。
cacheを利用することで、action毎にRDDのtransformation処理を経ずに結果を取得することができるということになります。
なお、上記の例のように非常に小さいセットをキャッシュすることはあまり意味が無いかもしれませんが、大規模なクラスタ環境で大容量のデータを同じようにキャッシュすることができます。
なお、cache()メソッドは、Sparkの持つPersistence(永続化)機能の一部です。(persistenceで、Storage LevelをMEMORY_ONLYにした場合と同じ。)cache()メソッドを呼ぶことで、MEMORY_ONLYでpersistされます。
Spark Persistenceの詳細は以下のURLを参照してみましょう。
自己完結型アプリケーション(Self-Contained Applications)を動かしてみよう!
Spark APIを利用することで、対話形式ではなく自己完結型アプリケーション(要はmainを持ったアプリ)を作成することができます。Scala/Java/Pythonで作成できます。
ここでは、scalaとJavaで実施してみましょう。
ScalaでSparkアプリケーションを作成してみよう!
適当なディレクトリ上で、以下の内容でSimpleApp.scalaというファイルを作成します。
/* SimpleApp.scala */ import org.apache.spark.SparkContext import org.apache.spark.SparkContext._ import org.apache.spark.SparkConf object SimpleApp { def main(args: Array[String]) { val logFile = "/home/ec2-user/spark-2.0.2-bin-hadoop2.7/README.md" // Should be some file on your system val conf = new SparkConf().setAppName("Simple Application") val sc = new SparkContext(conf) val logData = sc.textFile(logFile, 2).cache() val numAs = logData.filter(line => line.contains("a")).count() val numBs = logData.filter(line => line.contains("b")).count() println(s"Lines with a: $numAs, Lines with b: $numBs") sc.stop() } }
コード中では、まずSparkContextを作成しています。対話的シェルではSparkContextは暗黙的に利用できましたが、アプリ内で利用する場合は、明示的なインスタンス化が必要です。処理内容は非常にシンプルで、READEMに"a"と"b"が含まれる行数をカウントしているだけです。
次に、このアプリをビルド&パッケージングする必要がありますが、ここではsbtというツールを利用します。
以下のコマンドでEC2(Amazon Linux AMI)上にinstallします*1。
curl https://bintray.com/sbt/rpm/rpm | sudo tee /etc/yum.repos.d/bintray-sbt-rpm.repo sudo yum install sbt
以下のようなsimple.sbtファイルを作成して依存関係を定義します。
name := "Simple Project" version := "1.0" scalaVersion := "2.11.7" libraryDependencies += "org.apache.spark" %% "spark-core" % "2.0.2"
sbtでパッケージングするために、以下のようなディレクトリ構造にしてSimpeApp.scalaとsimple.sbtを配置しておきます。
$ find . . ./src ./src/main ./src/main/scala ./src/main/scala/SimpleApp.scala ./simple.sbt
次に上記ディレクトリ上で、sbt packageを実行します。なお、sbt初回実行時は必要なjarをダウンロードする処理が走りますので少し時間がかかります。特に'Getting org.scala-sbt sbt 0.13.12'というメッセージが出てStuckしているように見えますが、2~3分するとjarのダウンロード処理が始まりますので待ちましょう!全体では5~6分ぐらいかかります!
$ sbt package Getting org.scala-sbt sbt 0.13.12 ← しばらく待機していますが待ちましょう …略… [info] Packaging /home/ec2-user/spark-test/scala/target/scala-2.11/simple-project_2.11-1.0.jar ... [info] Done packaging. [success] Total time: 234 s, completed Dec 5, 2016 1:58:44 AM
上記のように出力されたらパッケージング成功しています。では実際に実行してみましょう!
$ ~/spark-2.0.2-bin-hadoop2.7/bin/spark-submit \ --class "SimpleApp" \ --master local[2] \ target/scala-2.11/simple-project_2.11-1.0.jar …略… Lines with a: 61, Lines with b: 27 …略…
ちゃんと結果が出力されました!
JavaでSparkアプリケーションを作成してみよう!
今度はJavaバージョンで実装してみます。処理内容は一緒で次のようなSimpleApp.javaファイルを作成します。
/* SimpleApp.java */ import org.apache.spark.api.java.*; import org.apache.spark.SparkConf; import org.apache.spark.api.java.function.Function; public class SimpleApp { public static void main(String[] args) { String logFile = "/home/ec2-user/spark-2.0.2-bin-hadoop2.7/README.md"; // Should be some file on your system SparkConf conf = new SparkConf().setAppName("Simple Application"); JavaSparkContext sc = new JavaSparkContext(conf); JavaRDD<String> logData = sc.textFile(logFile).cache(); long numAs = logData.filter(new Function<String, Boolean>() { public Boolean call(String s) { return s.contains("a"); } }).count(); long numBs = logData.filter(new Function<String, Boolean>() { public Boolean call(String s) { return s.contains("b"); } }).count(); System.out.println("Lines with a: " + numAs + ", lines with b: " + numBs); sc.stop(); } }
Scala版と処理内容は一緒ですが、SparkContextはJavaSparkContexというクラスをインスタンス化しています。
Sparkのクロージャに該当する部分はspark.api.java.function.Functionを匿名クラスで定義して処理内容を実装しています。このあたりは、Scalaと比べると少し冗長ですね。
次にビルドしますが、今回はMavenを利用します。まずはMavenをインストールしてみましょう。次のGistのコマンドそのままでインストールできます。
次に、ビルドのためのpom.xmlを用意します。
<project> <groupId>edu.berkeley</groupId> <artifactId>simple-project</artifactId> <modelVersion>4.0.0</modelVersion> <name>Simple Project</name> <packaging>jar</packaging> <version>1.0</version> <dependencies> <dependency> <!-- Spark dependency --> <groupId>org.apache.spark</groupId> <artifactId>spark-core_2.11</artifactId> <version>2.0.2</version> </dependency> </dependencies> </project>
Scalaの場合と同様に、ディレクトリ構造を整えておく必要があります。以下のように配置します。
$ find . ./pom.xml ./src ./src/main ./src/main/java ./src/main/java/SimpleApp.java
コンパイル+パッケージングします。こちらも初回実行時は必要なパッケージのダウンロード処理が走りますので少し時間がかかります。(4~5分ぐらい)
$ mvn package …省略… [INFO] Building jar: /home/ec2-user/spark-test/java/target/simple-project-1.0.jar [INFO] ------------------------------------------------------------------------ [INFO] BUILD SUCCESS [INFO] ------------------------------------------------------------------------ [INFO] Total time: 13.310 s [INFO] Finished at: 2016-12-05T02:33:28+00:00 [INFO] Final Memory: 32M/245M [INFO] ------------------------------------------------------------------------
それでは実行してみます。
$ ~/spark-2.0.2-bin-hadoop2.7/bin/spark-submit \ --class "SimpleApp" \ --master local[2] \ target/simple-project-1.0.jar …省略… Lines with a: 61, lines with b: 27 …省略…
Scala版と同じ結果が出力されました!
以上で、Quick Startをすべて実行できました!
まとめ
前回と今回の2回で、Quick Startを読了しました。
SparkのRDDの基礎的な考え方と、キャッシュやSparkアプリケーションの作成方法など超概要ですが理解することができましたね!
次回以降は、以下のような観点で勉強を進めていきたいと思います。
- Spark Programming Guide - Spark 2.0.2 Documentationの読み込み
- クラスタ構成+スケーラビリティの検証
*1:詳細は次のURLを参照: sbt Reference Manual — Installing sbt on Linux
Apache Sparkの勉強-実際に動かしてRDDを操作してみよう
前回は、Apache Sparkのトップページを眺めて超概要を勉強しました。
Apache Sparkってなに?と聞かれたら、「高度なDAG実行エンジンを備えた大規模データ高速処理基盤だよ!Hadoopより100倍速いよ!いろんな言語を使えるし、便利な拡張機能もあるよ、SQLで構造化データを扱えるし、リアルタイム処理もできるし、機械学習とか使えるし、グラフデータも処理できるよ。それも高速にね!!」と教えてあげましょう^^b
DAGってなに?と聞かれたら、瞬時に「Directed acyclic graphの略で日本語だと有向非巡回グラフというよ」とすらすら言えるとかっこいいですね!!
練習しておきましょう!(答えになってない)
ということで、今回は実際にSparkをインストールし、以下のQuick Startを読み進めながら実際に動かしてみたいと思います。
Quick Start - Spark 2.0.2 Documentation
ダウンロードとインストール
次のサイトから最新版のApache Sparkをダウンロードします。
次のように1~3を選択すると、4.Download Sparkにダウンロード用のリンクができますので、このリンクURLをコピーします。
AWS EC2インスタンスにSSHでログインして、以下のようにバイナリを取得します。
$ wget http://d3kbcqa49mib13.cloudfront.net/spark-2.0.2-bin-hadoop2.7.tgz
適当なディレクトリで展開します。今回は、ec2-userホームで展開します。
$ pwd /home/ec2-user $ tar xzvf spark-2.0.2-bin-hadoop2.7.tgz
その他、Apache SparkではJava 7+が必要となりますが、Amazon AMIに含まれていますのでインストールは不要です。
Exampleを動かしてみる
以下のようにsparkを解凍したディレクトリ上で、run-exampleというコマンドを実行してみます。
これは円周率を求めるサンプルになります。
$ pwd /home/ec2-user/spark-2.0.2-bin-hadoop2.7 $ ./bin/run-example SparkPi 10
細かい出力は割愛しますが、以下のように円周率が計算できていることが解ります。
Pi is roughly 3.145155145155145
また、インタラクティブにプログラムを動作させることもできます。
Sparkの準備が整いましたので、Quick Startを実施してみましょう
Quick Start
以下のURLでQuick Startにアクセスできます。これをベースに進めます。
Quick Start - Spark 2.0.2 Documentation
Quick Startでは対話シェルでScalaかPythonを選択できます。
自分はScala初心者ですが、今回はScalaで試してみたいと思います。
Spark Shellで対話的に分析をしてみよう!
以下のコマンドで対話的なシェルを起動します。
$ pwd /home/ec2-user/spark-2.0.2-bin-hadoop2.7 $ ./bin/spark-shell …省略… Welcome to ____ __ / __/__ ___ _____/ /__ _\ \/ _ \/ _ `/ __/ '_/ /___/ .__/\_,_/_/ /_/\_\ version 2.0.2 /_/ Using Scala version 2.11.8 (OpenJDK 64-Bit Server VM, Java 1.7.0_121) Type in expressions to have them evaluated. Type :help for more information. scala>
このプログラムは、":quite"で終了できます。(:helpでヘルプも見れます。)
Sparkの重要なコンセプトは、RDD(Resilient Distributed Dataset)と呼ばれる分散コレクションにあります。
分散というように、Sparkを複数マシンでクラスタ構成を取っている場合は、この1つの論理的なコレクションがRDDで各サーバに分散されて処理が実行されます。RDDは、HDFSなどからファイルを読み込んで作成したり出来ます。RDDにtransformという処理を加えると別のRDDが作成されます。Sparkの処理は、このRDDに対してtransform処理を数珠つなぎで実行していき、最終的に必要な結果を得る(actionと言います)というのが特徴です。MapReduceをいくつもつなげて処理するということに似ています。イメージ的には以下のようになるかと思います。
RDD1 → transform → RDD2→ transform → RDD3 → action → ほしい結果① │ │ └→ transform → RDD4 → action → ほしい結果②
RDDはイミュータブルなので処理が循環することはありません。すなわち上記のイメージは有向非巡回モデルということになります。DAGエンジンは、このような複数の処理に対して、効率的なRDDの実行パスを計算するといったところで使われるのかなと思います。
RDDの詳細やDAGとの関連などは、また別途勉強していきたいと思います。
初めてのRDD
では、実際にファイルを読み込んでRDDを作ってみましょう。ここではSparkに付いているREADME.mdを読み込んでみます。
scala> val textFile = sc.textFile("README.md") textFile: org.apache.spark.rdd.RDD[String] = README.md MapPartitionsRDD[1] at textFile at:24
sc.textFile("README.md")により、textFileという変数にorg.apache.spark.rdd.RDDオブジェクトが挿入されたことが解ります。scとは、SparkContextのオブジェクトで、Sparkで処理を行う際に必須となるオブジェクトです。spark-shellを起動すると、自動的にSparkContextが作成されてscという変数で利用できます。プログラム上でSpark処理を記述する場合は、SparkContextを明示的に作成する必要がありますが、現時点ではSparkShellでは暗黙的にscを利用可能とだけ認識しておいてください。
RDDはactionsと、transformationsという2つのタイプの操作が定義されています。actionsは、RDDが持つDatasetをもとに何らかの処理を実行した結果を返します。transformationsはRDDの持つデータセットを別のものに変換して新たなRDDを作り出します。
transformationの例としてはmapメソッドがあります。これはRDD内のデータセットをmap形式に変換して、別のRDDを作り出します。
actionの例としては、reduceメソッドがあります。これはRDD内のデータセットに対して集計操作を実行します。
実際にRDDのaction操作をしてみましょう。
scala> textFile.count() res0: Long = 99 scala> textFile.first() res1: String = # Apache Spark
これはどちらもactionの例です。countはtextFileのデータセット数(=ファイルの行数)を返し、firstはデータセットの先頭(=ファイルの先頭行)を返します。
次にtransformation操作をしてみます。
scala> val linesWithSpark = textFile.filter(line => line.contains("Spark")) linesWithSpark: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[2] at filter at:26 scala> linesWithSpark.count() res1: Long = 19
この例では、filterメソッドを使用して、"Spark"という文字列を含む行のみを含んだ新たなRDDを作成してlinesWithSpark変数に格納しています。linesWithSparkにcount()を実行すると19行と結果が出力されます。
なおSparkの特徴としてRDDのトランスフォーメーション操作は遅延処理となります。上記の例では、実際にfilter処理が実行されてRDD内に新たなデータセットが作られるのはRDDにaction操作を実行したときになります。この例で説明すると、1行目のfilter処理ではクラスが作成されるのみです。2行目のcount()アクションが実行されたときに初めてtransformationが実行されlinesWithSparkにデータが格納され、count処理が実行されます。
transformationとactionは連続して記述することもできます。
scala> textFile.filter(line => line.contains("Spark")).count() res2: Long = 19
もっとRDDをいじってみよう!
もう少し複雑な操作を実行してみましょう。
scala> textFile.map(line => line.split(" ").size).reduce((a, b) => if (a > b) a else b) res3: Int = 22
これは、READMEの中で最も単語数の多い行の単語数を抽出します。
mapメソッドやreduceメソッド内のこのような記述方法はscalaに慣れていないと理解が難しいかもしれませんが、これはクロージャと呼ばれる関数オブジェクトです。Java8のラムダ式と同じ感じですね。Javaの無名クラスの関数版みたいなものだという理解です。とりあえず、習うより慣れろということで、ここはそういうものだという素直な気持ちで進めましょう。
最初のmapは単純に行を空白で区切ってそのsizeを抽出しています。これにより、元のデータセットには行ごとの文字列が入っていましたが、mapで作成されたデータセットには行ごとの単語数が入ります。
その後、reduceメソッドにより、新しいRDD内のコレクションが順番に評価されて値が大きいものが抽出されていきます。最終的には、一番値の大きいものが返されるという処理となります。なお、クラスタ構成ではこれがうまく分散処理されますので高速に処理できますが、コードの記述内容はクラスタ環境でも変わるわけではありません。
ScalaはJava上で動作していますので、Javaのクラスを取りこんで利用することも出ます。
下記の例は、java.lang.Mathを利用して値の大きい方を取得しています。
scala> import java.lang.Math import java.lang.Math scala> textFile.map(line => line.split(" ").size).reduce((a, b) => Math.max(a, b)) res6: Int = 22
よくMapReduceの処理例として、ファイルに含まれるワードをカウントするという処理が取り上げられますが、Sparkでは以下の1行で記述することができます。非常にシンプルですね(と言ってみる)。
scala> val wordCounts = textFile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey((a, b) => a + b) wordCounts: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[6] at reduceByKey at <console>:27
この結果はcollect()アクションを利用することで収集することができます。収集という言い方をしているのは、今はローカルのみで動かしているのでピンと来ないかもしれませんが、クラスタ環境では、wordCountsの実際のデータは各サーバ上に散らばっているわけです。collect()を実行すると、手元に収集されます。
scala> wordCounts.collect() res7: Array[(String, Int)] = Array((package,1), (this,1), (Version"](http://spark.apache.org/docs/latest/building-spark.html#specifying-the-hadoop-version),1), (Because,1), (Python,2), (cluster.,1), (its,1), ([run,1), (general,2), (have,1), (pre-built,1), (YARN,,1), (locally,2), (changed,1), (locally.,1), (sc.parallelize(1,1), (only,1), ...)
flatMapとは何かというと、文字通り平らにしたMapということになりますが、解り辛いですね。
mapとflatMapを比較すると、
textFile.map(line => line.split(" "))の処理結果は、lineという文字列がWordのArrayに変換されます。
次のようなファイルをmapで変換した場合、
行1: My name is tomo. 行2: What's your name?
map(line => line.split(" "))の処理結果は以下のようになります。
Array(Array(My, name, is tomo.), Array(What's, your, name?))
flatMapは、Arrayの中のArrayをさらに平らにしてくれます。
つまり、flatMap(line => line.split(" "))の処理結果は以下のようになります。
Array(My, name, is, tomo., What's, your, name?)
すこし解り辛いですが、お分かりいただけましたでしょうか?
次に、.map(word => (word, 1))の部分ですが、これはwordをKeyとし文字が1個としたKey/Value型のMapに変換しています。
最後の、.reduceByKey((a, b) => a + b)は、Key毎(=この場合はKeyはワード)に値を足しており、最終的にワードをKeyにしたValueがワード数として算出されるということになります。
本日は一旦ここまでとしたいと思います。
初めてのGitHub: Hello World
GitHubにアカウントを作成して、トップページにある"Read the guide"からたどれるHello Worldチュートリアルを実施してみましょう!
- そもそもGit(ギット)とは?
- GitHubとは?
- Hello Worldチュートリアル
- Step1: リポジトリを作成する
- Step2: ブランチを作成してみよう!
- Step3: 変更してコミットしてみよう!
- Step4: Pullリクエストをオープンする!
- Step5: Pullリクエストをマージする
- まとめ
そもそもGit(ギット)とは?
いまさらですが、そもそもGitとはソフトウェア開発やその他のバージョン管理タスクを実施するためのバージョン管理システムです。Linux開発者の Linus Torvaldsが2005年にLinuxカーネル開発のために作成したものです。
TEDのインタビューで、Git誕生の理由が語られています。数千人レベルの開発者がLinuxカーネルの開発に携わり、それぞれがちょっとずつコードを変更しており、そのような状況で2~3カ月に一度にリリースを実施していたため、コード管理だけで一つのプロジェクトになってしまっていたようです。そうした状況を変えるべくGitを開発したそうです。GitはLinus氏の2番目に大きなプロジェクトですが、もともとは1番目の大きなプロジェクトであるLinuxを維持管理する目的で作成したということです。なおCVSは大嫌いだそう!
GitHubとは?
GitHubは、WebベースのGitリポジトリを提供しているサービスです。インターネット上でコードのバージョン管理や開発者同士でコラボレーションできるコード管理プラットフォームを提供しています。有料のPrivateリポジトリか無料のパブリックなリポジトリの2つのプランを提供しています。後者の方は、OSSプロジェクトで良く利用されていたり、個人で作成したコードを公開する手段として良く利用されています。2016年4月時点で、1,400万人のユーザーと3,500万以上のリポジトリを有している*1そうです。
自分は恥ずかしながら、コードを公開するといった習慣が無かったのでこれを機に検証などで利用したコードなんかを公開していきたいなと思っております。
Hello Worldチュートリアル
GitHubのトップページからたどれるHelloWorldチュートリアルでは、Gitの基本要素であるリポジトリ、ブランチ、commit/pullリクエストなどを学ぶことができます。HelloWorldリポジトリを作成してPullリクエストを実行したり、基本的な変更箇所のレビューやブランチへのマージ方法を実践できます。
Step1: リポジトリを作成する
リポジトリは、通常単一のプロジェクトごとに作成します。リポジトリにはプロジェクトに必要なファイルやディレクトリを含めます。プロジェクトの説明を記述したREADMEを含めることが推奨されています。
リポジトリを作成してみよう!
1. 右上の+ボタンからNew Repositoryを選択します。
2. Repository Nameにhello-worldと入力、その他は以下のように入力して"Create repository"ボタンを押す
これで、Repositoryが完成!簡単!
Step2: ブランチを作成してみよう!
ブランチとは、同じリポジトリ上で同時に複数の異なるバージョンを管理するための仕組みです。デフォルトでは"master"という名前のブランチが一つあり、最終的にはこのmasterブランチに全ての変更がマージされます。
"master"からブランチを作成するとその時点での"master"のスナップショットが作成されます。ブランチで作業中に他の誰かがmasterに変更をマージした場合は、その変更を自分が作業中のブランチに取り込む(pull)こともできます。
以下は、masterブランチから新しいブランチを作成して変更→Pullリクエスト→マージまでのフローイメージです。
開発者は、バグフィックスをしたり新機能を追加する際にmasterからブランチを作成します。開発が完了したら、そのブランチの変更をmasterブランチににマージします。
それでは実際にブランチを作成してみましょう!
1. 先ほど作成したhello-worldリポジトリに移動します。
2. 真ん中左端の"Branch:master"ドロップダウンをクリックして、"readme-edits"と入力して、Create branchをクリックします。
これで、masterとreadme-editsの2つのブランチがリポジトリに存在するようになりました。
Step3: 変更してコミットしてみよう!
README.mdファイルをクリックして、をクリックして編集モードにします。
内容は何でも良いですがせっかくPublicな環境ですので、世界の人に向けたメッセージをREADMEに記述してみましょう!
修正したら、画面下の方の"Commit Change"ボタンをクリックします。
この変更はreadme-editsのみに反映されており、このブランチはmasterブランチとは異なる内容となりました。
Step4: Pullリクエストをオープンする!
readme-editsブランチは、masterブランチと異なる内容なりましたので、Pullリクエストをオープンすることができます。Pullリクエストは、Githubでのチーム開発を行う際の重要な機能になります。
Pullリクエストをオープンすると、変更内容が他のメンバーに提示され、その内容を他のメンバーが作業中のブランチにマージするようリクエストします。Pullリクエストは両方のブランチからの内容の差異を示し、変更した内容、追加した内容、削除した内容が緑と赤で表示され差分を確認することができます。リクエストを受けたメンバーは、その内容を吟味し必要に応じてリクエスト送信者とコミュニケーションし、最終的にブランチにマージするか判断します。
今回は自分専用のリポジトリでPullリクエストをオープンしてmasterブランチにマージしてみることで、GitHubの重要なフローを理解しましょう。
READMEファイルの変更に対してPullリクエストをオープンしよう!
Pullリクエストタブをクリックします。
緑色のNew pull requestボタンをクリックします。
作成したブランチ(readme-edits)を選択して、オリジナルのmasterと比較します。
変更部分を確認できます。
確認してOKであれば、Create pull requestを押します。
変更タイトルと変更内容を適当に記述して、Create pull requestを押します。
Pullリクエストがオープンしました!
Step5: Pullリクエストをマージする
このステップでは、READMEファイルを変更したreadme-editsブランチとmasterブランチをマージします。
Merge pull requestボタンを押します。
次に、Confirm mergeボタンを押します。
以下のように表示されて、マージが完了しました。変更は既にmasterに組み込まれたので、"Delete branch"ボタンでreadme-editsブランチを削除しましょう。
masterブランチにREADMEの変更がマージされました!
以上でHelloworldチュートリアルは終わりです。
まとめ
このチュートリアルではmasterブランチから新しいブランチを作成して、そのブランチに対して変更内容をコミットし、Pullリクエストを投げて、最終的に変更をmasterブランチにマージすると言うGitにおける重要な一連のフローを学ぶことができました。
GitはブランチとPullリクエストの仕組みにより、大人数のチーム開発を非常に効率的に実行できるわけです!
15年ほど前に開発現場にいた頃はCVSを使っており、リリース作業のたびにライブラリアンという役割の人が「一旦マージするので、コミットしばらくやめてくださいー」とかやっていました。かなり属人的で、きめ細やかな役割でしたが、今でもライブラリアンなんて役割の人は居るのだでしょうか?
Gitを利用すればリリース用ブランチをmasterから切れば開発プロセスを止めずにリリース作業ができますね。
なお、GitHubのPullリクエストに関しては、他のリポジトリをForkしてリポジトリのコピーを作成し、そこで変更した内容を元のリポジトリ管理者に反映してもらうためにPullリクエストを送る方法もあります。リポジトリを跨いだPullの仕組みです。GitHubの利用方法としては、こちらの方が多いのかもしれません。
Agile開発のように同時並列的に開発を進めていくプロジェクトでGitを利用する場合は、同一リポジトリ内でブランチを複数作成して、Pullリクエストによりお互いの変更をマージしたり、masterブランチにマージする利用方法が多いのかなと思います。
GitHubのドキュメント上では、前者を"Fork & Pull Model"と呼び、後者を"Shared Repository Model"と呼んでいます。
今回はWeb上ですべての操作を実施しましたが、通常コードは開発端末上にあり、ローカルで開発をしてその変更をGitHubに反映という流れになります。次回はClientからの操作方法、ローカルリポジトリやクローン、リモートリポジトリへのpushといった操作を実施してみたいと思います。
Apache Sparkの勉強-超概要を理解する
Apache Sparkを基礎から勉強していきます。
基本的にはドキュメントを読み進めながら動作を確認していこうと思います。
まずはトップページを読んでみよう
Apache Sparkのトップページ
Apache Spark™ - Lightning-Fast Cluster Computing
ここを見れば、そもそもApache Sparkが何なのか概要レベルで解るはずです。
Apache Spark™ is a fast and general engine for large-scale data processing.
Apache Sparkは、大規模なデータ処理のための高速かつ汎用的エンジンです。
特徴①:速い!
まず一つめと特徴としては、処理速度にあります。同じような並列分散処理基盤であるHadoop MapReduce上でプログラムを実行するよりもインメモリであれば最大100倍速い。ディスクでも10倍速いという特徴があります。
Apache Sparkは、高度なDAG実行エンジン(Advanced DAG execution engine)を備えます。
DAGとは何でしょうか?ここは用語が解らないので、Google先生に聞いてみます。
DAG=directed acyclic graphとのことです。日本語で言うと、有向非巡回グラフとなります。
詳細はWikipediaの出番です。
有向非巡回グラフ、有向非循環グラフ、有向無閉路グラフ(ゆうこうひじゅんかいグラフ、英: Directed acyclic graph, DAG)とは、グラフ理論における閉路のない有向グラフの事。有向グラフは頂点と有向辺(方向を示す矢印付きの辺)からなり、辺は頂点同士をつなぐが、ある頂点 v から出発し、辺をたどり、頂点 v に戻ってこないのが有向非巡回グラフである。
うーん。言葉だけだと解り辛いけど、どの頂点から出発しても同じ頂点には戻ってこないとということなのでしょう。これだと抽象的すぎて解り辛いですね。DAGの具体的な例を探してみましたが、英文のWikipediaの説明にSpread SheetもDAGでモデル化できるとのこと。一つのセルを頂点とみなし、あるセルの値とあるセルの値をかけたりといった計算を書けますが、どこかの頂点=セルの中身が変化すれば後続のすべてのセルの計算結果が変化するといった具合です。
なんとなくわかりましたが、このDAGというデータモデルがどのようにSpark内で利用されているのか、今後の勉強課題としておきます。
特徴②:使いやすい!
Java/Scala/Python/Rで記述できます。また、簡単に並列に処理できる80以上のハイレベルな操作が提供されています。また、対話式にこの操作をJava/Scala/Python/Rから使用できます。
具体的な操作は今後の学習ポイントです。特に並列処理は自分で実装すると大変なので、簡単なAPIを利用するだけで並列分散して高速に処理してくれるという意味で使いやすいというのが一番重要な点だと思います。
特徴③:汎用性
とりあえず、いろいろ汎用的に利用できるための機能が付いているようです。個々の機能に関しては今後調べていきたいと思います。
簡単に言うと、以下のような感じだと思いますが、重要な点はすべてSparkという高速な並列分散基盤上で実行できるという点にあると思います。
AWS EBSをEC2インスタンスにアタッチして、Linuxから利用できるようにする
EBSボリュームを作成して既存のEC2インスタンスにアタッチして、Linuxから利用する方法をまとめました。
EC2インスタンスの作成
適当にスポットインスタンスを作成します。
以下の記事を参照してください。
tmnj.hatenablog.com
EBSボリュームの作成
EC2ダッシュボードで"ELASTIC BLOCK STORAGE" -> "ボリューム"を選択します。
ボリュームの作成ボタンをクリックします。
今回は、汎用SSD(GP2)で8GBほど作成します。同一のアベイラビリティゾーンのインスタンスにしかアタッチできませんので、アベイラビリティゾーンはアタッチしたいインスタンスと同じものを指定してください。
なお、EBSは別途料金がかかりますので注意してください。ボリュームタイプの説明と料金は以下に定義されています。
料金 - Amazon Elastic Block Store(EBS) | AWS
以下のようにボリュームが作成されます。Nameは後から追記できます。"available"となっているのが新しく追加したボリュームです。
EBSボリュームをEC2インスタンスにアタッチする
まず、対象のEC2インスタンスにログインしてlsblkコマンドを利用して現在のデバイスの状態を出力してみます。
[ec2-user@ip-xx-xx-xx-xx ~]$ lsblk NAME MAJ:MIN RM SIZE RO TYPE MOUNTPOINT xvda 202:0 0 8G 0 disk └─xvda1 202:1 0 8G 0 part /
作成したボリュームを右クリックして、ボリュームのアタッチを選択します。
既存のインスタンスを選択して"アタッチ"ボタンをクリックします。
再度、lsblkコマンドを利用してデバイスの状態を出力してみます。xvdfが追加されていることが解ります。
[ec2-user@ip-xx-xx-xx-xx2 ~]$ lsblk NAME MAJ:MIN RM SIZE RO TYPE MOUNTPOINT xvda 202:0 0 8G 0 disk └─xvda1 202:1 0 8G 0 part / xvdf 202:80 0 8G 0 disk
LinuxでEBSボリュームを使用できるようにする
以下のURLの手順で実行します。
Amazon EBS ボリュームを使用できるようにする - Amazon Elastic Compute Cloud
今回対象となるデバイスは上記の出力から/dev/xvdfとなります。以下のコマンドを実行します。
[ec2-user ~]$ sudo file -s /dev/xvdf /dev/xvdf: data
dataと出ると、ファイルシステムが存在していない状態です。次にファイルシステムをこのデバイスに作成します。
以下のコマンドを実行して出力を確認します。
[ec2-user@ip-xx-xx-xx-xx ~]$ sudo mkfs -t ext4 /dev/xvdf mke2fs 1.42.12 (29-Aug-2014) Creating filesystem with 2097152 4k blocks and 524288 inodes Filesystem UUID: 805da6f8-b81c-40ef-969c-910763183294 Superblock backups stored on blocks: 32768, 98304, 163840, 229376, 294912, 819200, 884736, 1605632 Allocating group tables: done Writing inode tables: done Creating journal (32768 blocks): done Writing superblocks and filesystem accounting information: done
再度、/dev/xvdfを確認するとファイルシステムが作成されていることが解ります。
[ec2-user@ip-xx-xx-xx-xx ~]$ sudo file -s /dev/xvdf /dev/xvdf: Linux rev 1.0 ext4 filesystem data, UUID=805da6f8-b81c-40ef-969c-9107 63183294 (extents) (large files) (huge files)
ファイルシステムができたので、マウントポイントにマウントします。今回は/dataにマウントしてみます。
まず、ディレクトリを作成します。
$ sudo mkdir /data
マウントポイントにマウントします。
$ sudo mount /dev/xvdf /data
システムブート時にEBSボリュームをマウントする
/etc/fstabファイルに以下のエントリを追記します。
/dev/xvdf /data ext4 defaults,nofail 0 2
EC2のストレージの勉強
今回はEC2ストレージの勉強をしたいと思います。
教材は以下を利用。
ストレージ - Amazon Elastic Compute Cloud
ストレージのオプション
次の3つのオプションを利用可能。
- Amazon Elastic Block Store(Amazon EBS)
- Amazon EC2 インスタンスストア
- Amazon Simple Storage Service(Amazon S3)
図でまとまっています。
Amazon EBSとは
ブロックレベルのストレージで、同一のアベイラビリティゾーンであればどのインスタンスにもアタッチできます。同時に複数のインスタンスからはアタッチできませんが、デタッチしたEBSストレージを別のインスタンスにアタッチすることができます。頻繁な更新データを保持するのに最適です。
EBSの可用性は?
以下のURLの情報によると、データは同一アベイラビリティゾーン内に自動でレプリケーションされ99.999% の可用性を維持する設計となっているようです。
製品の詳細 - Amazon Elastic Block Store(EBS) | AWS
EBSはスナップショットをS3に保持する機能があります。またS3に保存したスナップショットからEBSボリュームを作成して別のインスタンスにアタッチすることもできるようです。データをもっとかっちりと守りたい場合は、定期的にスナップショットを取得してS3に保存しておいた方が良いと思います。ちなみにS3の耐久性と可用性の情報は以下のURLに記載があります。
1 年でオブジェクトの 99.999999999% の耐久性と最大 99.99% の可用性を提供するよう設計されています。
S3だとデータの耐久性は非常に高くデータがロストすることはほぼないけど、可用性はEBSより低いので、データにアクセスできない時間が長いということでしょうね。
インスタンスストアとは
これは単純にホストコンピュータ上の物理的なディスクのことです。これをインスタンスストアと呼んでいます。重要なのはインスタンス用のブロックレベルの"一時ストレージ"を提供するということです。つまり、インスタンスを停止または終了すると消えてしまいます。
じゃあ、リブートはどうなんだろうと思ったら、リブートではインスタンスストアボリューム上のデータはすべて保持されるようです。
インスタンスの再起動 - Amazon Elastic Compute Cloud
インスタンスを再起動すると、インスタンスは同じホスト上で保持されるため、インスタンスのパブリックドメイン名、プライベート IP アドレス、およびインスタンスストアボリューム上のすべてのデータは保持されます。
EC2ルートデバイスボリューム
EC2インスタンスは、ルートデバイスボリュームに格納されているイメージを利用してインスタンスがブートされます。ルートデバイスボリュームとして利用できるのが、インスタンスストアかEBSとなります。
この2つの違いにより、AMIのタイプとして2つあります。
- 「Amazon EC2 インスタンスストア backed」のAMI
- Amazon EBS backed」の AMI
- Amazon EBS ボリュームがアタッチされます
起動が高速で永続的ストレージを利用しているため、推奨されるのはAmazon EBS Backedなインスタンスです。以下のURLに違いがまとまっています。
AMI タイプ - Amazon Elastic Compute Cloud
ルートデバイスボリュームの詳細は、以下のURLを参照しましょう。
永続的ルートデバイスボリュームへの変更
EBS Backedでもデフォルトではインスタンス終了時に削除されてしまうようです。これを永続的ルートデバイスボリュームに変更できます。
Amazon EC2 ルートデバイスボリューム - Amazon Elastic Compute Cloud
デフォルトでは、Amazon EBS-backed AMI のルートデバイスボリュームは、インスタンスを終了すると削除されます。デフォルトの動作を変更するには、ブロックデバイスマッピングを使用して、DeleteOnTermination 属性を false に設定します。
スポットインスタンス作成ウィザードでは、EBSボリュームの設定の"削除"チェックボックスをオフにすると永続的ルートボリュームに変更できるようです。