読者です 読者をやめる 読者になる 読者になる

Tomo's IT Blog

技術的な調査メモ

Apache Sparkの勉強-Clusterを構成してみよう!Standaloneクラスタ編

spark cluster 分散

http://spark.apache.org/images/spark-logo-trademark.png


前回は、Apache SparkのQuick Guideを読み進めて、Sparkの動作を実際に確認してみました。


tmnj.hatenablog.com
tmnj.hatenablog.com


今回は、Sparkのクラスタ構成に挑戦してみたいと思います。
まずは以下のドキュメントでClusterの概要を理解します。

Cluster Mode Overview - Spark 2.0.2 Documentation

その後、以下のドキュメントを参考にしてStandaloneクラスタ構成を組みたいと思います!
(注:Standaloneクラスタ構成とはSparkに内在するクラスタマネージャを利用するという意味で、シングルインスタンス構成ではありません。)

Spark Standalone Mode - Spark 2.0.2 Documentation

まずはクラスタ概要を読んでみよう!

Sparkアプリケーションは、クラスタ構成がどのように構成されているかとは関係なくコーディングすることができます。SparkContextオブジェクトがコーディネートするため、開発者はクラスタ構成かどうかは意識する必要はありません。

クラスタ構成でSparkアプリケーションを実行した場合、SparkContextはいくつかのタイプのクラスタマネージャ(Spark独自のクラスタ管理機能やMesosやYARN)と接続しアプリケーションへリソースを割り当てます。クラスタに接続すると、アプリケーションロジックやデータ格納などを実行するために、各ノードのExecutorを取得します。SparkContextが作成されて実行する役割をDriverと呼びます。Driverはアプリケーションコードを各リモートのExecutorへ送信して、最後にその処理を実行するためのTaskを各Executorに送信します。


f:id:tmnj:20161208130607p:plain

クラスタマネージャタイプ

以下の3つのクラスタマネージャをサポートします。

今回は以下のドキュメントを利用して、Standaloneモードのクラスタ構成を構築していたいと思います。

Spark Standalone Mode - Spark 2.0.2 Documentation


EC2スポットインスタンスの作成

今回はクラスタ構成なので、スポットインスタンスをc3.large×3つ起動します。
詳細は以下を参照してください。

tmnj.hatenablog.com

セキュリティグループの構成

以下のように同じセキュリティグループ内のEC2インスタンス同士は無条件でアクセスできるようにルールを追加しておきます。
インバウンドルールで、ポートの範囲を0 - 65535を設定して、送信元を同一のセキュリティグループIDを指定したものを追加しておきましょう。

f:id:tmnj:20161208161903p:plain


これにより、同一セキュリティグループ内のEC2インスタンス間のすべての通信が許可されます。(なおdafaultセキュリティグループを使用している場合はすでにこのルールはセットされているはずです。)

Sparkのインストール

作成したスポットインスタンス3つにSparkをインストールします。
インストール方法は以下の記事を参照してください。

tmnj.hatenablog.com

クラスタを手動で起動

Standaloneクラスタは、手動で一つずつ起動するか、もしくはクラスタ起動スクリプトで一括起動できます。

  • mastar/workerを手動で一つずつ起動する
  • Sparkに付属しているクラスタ起動スクリプトを利用して一括起動する

今回はまず手動起動で実施してみます。

マスターの起動

まずはどれか一つのEC2インスタンス上で、以下を実行してマスターを起動します。

$  ./sbin/start-master.sh
starting org.apache.spark.deploy.master.Master, logging to /home/ec2-user/spark-2.0.2-bin-hadoop2.7/logs/spark-ec2-user-org.apache.spark.deploy.master.Master-1-ip-xx-xx-xx-xx.out


上記のログの中に、Spark MasterのURLが出力されていますので調査します。以下のような内容がログ内に記述されているはずです。

 INFO Master: Starting Spark master at spark://ip-xx-xx-xx-xx.ap-northeast-1.compute.internal:7077

また、MasterのWebUI用のURLも出力されています。

 INFO MasterWebUI: Bound MasterWebUI to 0.0.0.0, and started at http://xx.xx.xx.xx:8080

SSHで8080ポートにトンネルを掘ってアクセスすると作業しているPCからアクセスできます。
#セキュリティグループに定義する場合はソースIPを指定して安全性を考慮しましょう!

トンネルは以下のようにsshを実行すれば掘ることができます。

ssh -N -L 8080:localhost:8080 -i "your-key.pem" ec2-user@ec2-xx-xx-xx-xx.ap-northeast-1.compute.amazonaws.com

上記を実行したのちに、ローカルPCのブラウザでhttp://localhost:8080にアクセスするとMaster WebUIにアクセスできます。
この画面で、クラスタのWorkerの情報なども確認できます。

f:id:tmnj:20161208145615p:plain

Slaveを起動してみる

残りの2つのインスタンス上で以下のコマンドを実行します。spark://~はmaster起動時のログか、MasterWebUI上に記載されていますので、それをコピペしましょう。

./sbin/start-slave.sh spark://ip-xx-xx-xx-xx.ap-northeast-1.compute.internal:7077

slaveのログに以下のように出力されていれば正常に起動しています。

INFO Worker: Successfully registered with master spark://ip-xx-xx-xx-xx.ap-northeast-1.compute.internal:7077


MasterWebUIでもクラスタに参加したワーカを確認できます。

f:id:tmnj:20161208153020p:plain



以上で、手動起動によるクラスタの構成が実施できました。

上記シェルにはいろいろ引数がありますので、以下のドキュメントを参照してみましょう。

https://spark.apache.org/docs/latest/spark-standalone.html#starting-a-cluster-manually


この手順では、EC2インスタンスごとにstart-master/slave.shを起動していましたが、一括でクラスタを起動するためのスクリプトも用意されています*1
。こちらは、後日試してみたいと思います。

アプリケーションをクラスタに接続してみよう

アプリケーションをクラスタに接続する場合は単純にSparkContextにMasterのspark urlを渡してあげるだけです。
対話的なシェルで実施する場合は以下のように実行します。

./bin/spark-shell --master spark://IP:PORT


正常に起動すると、Master WebUIのRunning Applicationsに情報が表示されます。

f:id:tmnj:20161208153848p:plain


spark-shell.shの引数に--total-executor-cores を記述すると、このアプリケーションで利用するコア数を設定することも可能です。この引数を指定してWorkerのコアがどのように割当たっているか確認してみてください。


次に、以下のような簡単な処理を実行してみましょう。

scala> val textFile = sc.textFile("README.md")
textFile: org.apache.spark.rdd.RDD[String] = README.md MapPartitionsRDD[1] at textFile at <console>:24

scala> textFile.count()
res0: Long = 99

ちゃんとカウントが表示されましたでしょうか?

自己完結型Sparkアプリをクラスタで実行してみる

以前作成したScalaアプリと同様のもので実施してみます。
適当なディレクトリ上で、以下の内容でSimpleApp.scalaというファイルを作成します。

/* SimpleApp.scala */
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf

object SimpleApp {
  def main(args: Array[String]) {
    val logFile = "/home/ec2-user/spark-2.0.2-bin-hadoop2.7/README.md" // Should be some file on your system
    val conf = new SparkConf().setAppName("Simple Application")
    val sc = new SparkContext(conf)
    val logData = sc.textFile(logFile, 2).cache()
    val numAs = logData.filter(line => line.contains("a")).count()
    val numBs = logData.filter(line => line.contains("b")).count()
    println(s"Lines with a: $numAs, Lines with b: $numBs")
    sc.stop()
  }
}


次に、このアプリをビルド&パッケージングする必要がありますが、ここではsbtというツールを利用します。
以下のコマンドでEC2(Amazon Linux AMI)上にinstallします*2

curl https://bintray.com/sbt/rpm/rpm | sudo tee /etc/yum.repos.d/bintray-sbt-rpm.repo
sudo yum install sbt

以下のようなsimple.sbtファイルを作成して依存関係を定義します。

name := "Simple Project"
version := "1.0"
scalaVersion := "2.11.7"
libraryDependencies += "org.apache.spark" %% "spark-core" % "2.0.2"

sbtでパッケージングするために、以下のようなディレクトリ構造にしてSimpeApp.scalaとsimple.sbtを配置しておきます。

$ find .
.
./src
./src/main
./src/main/scala
./src/main/scala/SimpleApp.scala
./simple.sbt

次に上記ディレクトリ上で、sbt packageを実行します。なお、sbt初回実行時は必要なjarをダウンロードする処理が走りますので少し時間がかかります。特に'Getting org.scala-sbt sbt 0.13.12'というメッセージが出てStuckしているように見えますが、2~3分するとjarのダウンロード処理が始まりますので待ちましょう!全体では5~6分ぐらいかかります!

$ sbt package
Getting org.scala-sbt sbt 0.13.12  ← しばらく待機していますが待ちましょう
…略…
[info] Packaging /home/ec2-user/spark-test/scala/target/scala-2.11/simple-project_2.11-1.0.jar ...
[info] Done packaging.
[success] Total time: 234 s, completed Dec 5, 2016 1:58:44 AM

上記のように出力されたらパッケージング成功しています。では以下のように実際に実行してみましょう!

$ ~/spark-2.0.2-bin-hadoop2.7/bin/spark-submit \
--class "SimpleApp" \
--master "spark://ip-xx-xx-xx-xx.ap-northeast-1.compute.internal:7077" \
target/scala-2.11/simple-project_2.11-1.0.jar

…略…
Lines with a: 61, Lines with b: 27

…略…

ちゃんと結果が出力されましたが、これはSpark Cluster上で実行されています。master Web UIでも確認してみましょう。

以下のように、実行されていることが解ります。

f:id:tmnj:20161213121941p:plain


Application IDのリンクをクリックするとさらにWorker上で実行されていることがわかります。

f:id:tmnj:20161213121109p:plain

まとめ

本日は、Standaloneクラスタ構成を3つのEC2インスタンスを使って実施してみました。Master WebUIにアクセスすることでクラスタのWorker構成(利用できるコア数やメモリサイズなど)や実行中のアプリケーションが監視できることを確認しました。
また、対話シェルと自己完結型アプリケーションをクラスタモードで動かしてみました。非常に簡単でしたね!

Apache Sparkの勉強-実際に動かしてRDDを操作してみよう (2)

spark

http://spark.apache.org/images/spark-logo-trademark.png


前回は、対話的にシェルを実行してRDDの操作を試してみました。

tmnj.hatenablog.com


今回も、以下のQuick Startを元に続きを進めていきたいと思います。

Quick Start - Spark 2.0.2 Documentation

キャッシュしてみよう!

Sparkはキャッシュ機能を有しており、データセットクラスタをまたがるインメモリ・キャッシュ上に置くことができます。
データセットに何度もアクセスする場合に非常に便利です。例えば、小さい"Hot"なデータセットに何度もアクセスする場合や、ページランクのような何度も繰り返すアルゴリズムを実行するような場合です。
まずは、linesWithSparkをキャッシュしてみましょう。

scala> val textFile = sc.textFile("README.md")
textFile: org.apache.spark.rdd.RDD[String] = README.md MapPartitionsRDD[1] at textFile at <console>:24

scala> val linesWithSpark = textFile.filter(line => line.contains("Spark"))
linesWithSpark: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[2] at filter at <console>:26

scala> linesWithSpark.cache()
res0: linesWithSpark.type = MapPartitionsRDD[2] at filter at <console>:26

scala> linesWithSpark.count()
res2: Long = 19

scala> linesWithSpark.count()
res3: Long = 19

データは最初にaction操作が呼ばれたときにキャッシュされます。上記の場合は、最初のcount()時にlinesWithSparkがキャッシュされます。2回目のcount()処理はキャッシュ上のデータにより実行されます。cache()を実行していない場合は、count()を実行の度に、元のRDDへのtextFile.filter(line => line.contains("Spark"))が実行されます。
cacheを利用することで、action毎にRDDのtransformation処理を経ずに結果を取得することができるということになります。

なお、上記の例のように非常に小さいセットをキャッシュすることはあまり意味が無いかもしれませんが、大規模なクラスタ環境で大容量のデータを同じようにキャッシュすることができます。

なお、cache()メソッドは、Sparkの持つPersistence(永続化)機能の一部です。(persistenceで、Storage LevelをMEMORY_ONLYにした場合と同じ。)cache()メソッドを呼ぶことで、MEMORY_ONLYでpersistされます。
Spark Persistenceの詳細は以下のURLを参照してみましょう。

Spark Programming Guide - Spark 2.0.2 Documentation

自己完結型アプリケーション(Self-Contained Applications)を動かしてみよう!

Spark APIを利用することで、対話形式ではなく自己完結型アプリケーション(要はmainを持ったアプリ)を作成することができます。Scala/Java/Pythonで作成できます。
ここでは、scalaJavaで実施してみましょう。

ScalaでSparkアプリケーションを作成してみよう!

適当なディレクトリ上で、以下の内容でSimpleApp.scalaというファイルを作成します。

/* SimpleApp.scala */
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf

object SimpleApp {
  def main(args: Array[String]) {
    val logFile = "/home/ec2-user/spark-2.0.2-bin-hadoop2.7/README.md" // Should be some file on your system
    val conf = new SparkConf().setAppName("Simple Application")
    val sc = new SparkContext(conf)
    val logData = sc.textFile(logFile, 2).cache()
    val numAs = logData.filter(line => line.contains("a")).count()
    val numBs = logData.filter(line => line.contains("b")).count()
    println(s"Lines with a: $numAs, Lines with b: $numBs")
    sc.stop()
  }
}

コード中では、まずSparkContextを作成しています。対話的シェルではSparkContextは暗黙的に利用できましたが、アプリ内で利用する場合は、明示的なインスタンス化が必要です。処理内容は非常にシンプルで、READEMに"a"と"b"が含まれる行数をカウントしているだけです。

次に、このアプリをビルド&パッケージングする必要がありますが、ここではsbtというツールを利用します。
以下のコマンドでEC2(Amazon Linux AMI)上にinstallします*1

curl https://bintray.com/sbt/rpm/rpm | sudo tee /etc/yum.repos.d/bintray-sbt-rpm.repo
sudo yum install sbt

以下のようなsimple.sbtファイルを作成して依存関係を定義します。

name := "Simple Project"
version := "1.0"
scalaVersion := "2.11.7"
libraryDependencies += "org.apache.spark" %% "spark-core" % "2.0.2"

sbtでパッケージングするために、以下のようなディレクトリ構造にしてSimpeApp.scalaとsimple.sbtを配置しておきます。

$ find .
.
./src
./src/main
./src/main/scala
./src/main/scala/SimpleApp.scala
./simple.sbt

次に上記ディレクトリ上で、sbt packageを実行します。なお、sbt初回実行時は必要なjarをダウンロードする処理が走りますので少し時間がかかります。特に'Getting org.scala-sbt sbt 0.13.12'というメッセージが出てStuckしているように見えますが、2~3分するとjarのダウンロード処理が始まりますので待ちましょう!全体では5~6分ぐらいかかります!

$ sbt package
Getting org.scala-sbt sbt 0.13.12  ← しばらく待機していますが待ちましょう
…略…
[info] Packaging /home/ec2-user/spark-test/scala/target/scala-2.11/simple-project_2.11-1.0.jar ...
[info] Done packaging.
[success] Total time: 234 s, completed Dec 5, 2016 1:58:44 AM

上記のように出力されたらパッケージング成功しています。では実際に実行してみましょう!


$ ~/spark-2.0.2-bin-hadoop2.7/bin/spark-submit \
--class "SimpleApp" \
--master local[2] \
target/scala-2.11/simple-project_2.11-1.0.jar

…略…
Lines with a: 61, Lines with b: 27

…略…

ちゃんと結果が出力されました!

JavaでSparkアプリケーションを作成してみよう!

今度はJavaバージョンで実装してみます。処理内容は一緒で次のようなSimpleApp.javaファイルを作成します。

/* SimpleApp.java */
import org.apache.spark.api.java.*;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.Function;

public class SimpleApp {
  public static void main(String[] args) {
    String logFile = "/home/ec2-user/spark-2.0.2-bin-hadoop2.7/README.md"; // Should be some file on your system
    SparkConf conf = new SparkConf().setAppName("Simple Application");
    JavaSparkContext sc = new JavaSparkContext(conf);
    JavaRDD<String> logData = sc.textFile(logFile).cache();

    long numAs = logData.filter(new Function<String, Boolean>() {
      public Boolean call(String s) { return s.contains("a"); }
    }).count();

    long numBs = logData.filter(new Function<String, Boolean>() {
      public Boolean call(String s) { return s.contains("b"); }
    }).count();

    System.out.println("Lines with a: " + numAs + ", lines with b: " + numBs);
    
    sc.stop();
  }
}

Scala版と処理内容は一緒ですが、SparkContextはJavaSparkContexというクラスをインスタンス化しています。
Sparkのクロージャに該当する部分はspark.api.java.function.Functionを匿名クラスで定義して処理内容を実装しています。このあたりは、Scalaと比べると少し冗長ですね。

次にビルドしますが、今回はMavenを利用します。まずはMavenをインストールしてみましょう。次のGistのコマンドそのままでインストールできます。

gist.github.com


次に、ビルドのためのpom.xmlを用意します。

<project>
  <groupId>edu.berkeley</groupId>
  <artifactId>simple-project</artifactId>
  <modelVersion>4.0.0</modelVersion>
  <name>Simple Project</name>
  <packaging>jar</packaging>
  <version>1.0</version>
  <dependencies>
    <dependency> <!-- Spark dependency -->
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.11</artifactId>
      <version>2.0.2</version>
    </dependency>
  </dependencies>
</project>


Scalaの場合と同様に、ディレクトリ構造を整えておく必要があります。以下のように配置します。

$ find .
./pom.xml
./src
./src/main
./src/main/java
./src/main/java/SimpleApp.java

コンパイル+パッケージングします。こちらも初回実行時は必要なパッケージのダウンロード処理が走りますので少し時間がかかります。(4~5分ぐらい)

$ mvn package
…省略…
[INFO] Building jar: /home/ec2-user/spark-test/java/target/simple-project-1.0.jar
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 13.310 s
[INFO] Finished at: 2016-12-05T02:33:28+00:00
[INFO] Final Memory: 32M/245M
[INFO] ------------------------------------------------------------------------

それでは実行してみます。

$ ~/spark-2.0.2-bin-hadoop2.7/bin/spark-submit \
 --class "SimpleApp" \
 --master local[2] \
  target/simple-project-1.0.jar

…省略…

Lines with a: 61, lines with b: 27

…省略…

Scala版と同じ結果が出力されました!

以上で、Quick Startをすべて実行できました!

まとめ

前回と今回の2回で、Quick Startを読了しました。
SparkのRDDの基礎的な考え方と、キャッシュやSparkアプリケーションの作成方法など超概要ですが理解することができましたね!

次回以降は、以下のような観点で勉強を進めていきたいと思います。

Apache Sparkの勉強-実際に動かしてRDDを操作してみよう

spark

http://spark.apache.org/images/spark-logo-trademark.png


前回は、Apache Sparkのトップページを眺めて超概要を勉強しました。

tmnj.hatenablog.com


Apache Sparkってなに?と聞かれたら、「高度なDAG実行エンジンを備えた大規模データ高速処理基盤だよ!Hadoopより100倍速いよ!いろんな言語を使えるし、便利な拡張機能もあるよ、SQLで構造化データを扱えるし、リアルタイム処理もできるし、機械学習とか使えるし、グラフデータも処理できるよ。それも高速にね!!」と教えてあげましょう^^b
DAGってなに?と聞かれたら、瞬時に「Directed acyclic graphの略で日本語だと有向非巡回グラフというよ」とすらすら言えるとかっこいいですね!!
練習しておきましょう!(答えになってない)

ということで、今回は実際にSparkをインストールし、以下のQuick Startを読み進めながら実際に動かしてみたいと思います。

Quick Start - Spark 2.0.2 Documentation

環境の準備

今回は、Amazon EC2のスポットインスタンスを作成して利用します。
以下の記事で解説しております。

tmnj.hatenablog.com


ダウンロードとインストール

次のサイトから最新版のApache Sparkをダウンロードします。

Downloads | Apache Spark


次のように1~3を選択すると、4.Download Sparkにダウンロード用のリンクができますので、このリンクURLをコピーします。

f:id:tmnj:20161202151824p:plain


AWS EC2インスタンスSSHでログインして、以下のようにバイナリを取得します。

$ wget http://d3kbcqa49mib13.cloudfront.net/spark-2.0.2-bin-hadoop2.7.tgz


適当なディレクトリで展開します。今回は、ec2-userホームで展開します。

$ pwd
/home/ec2-user

$ tar xzvf spark-2.0.2-bin-hadoop2.7.tgz


その他、Apache SparkではJava 7+が必要となりますが、Amazon AMIに含まれていますのでインストールは不要です。

Exampleを動かしてみる

以下のようにsparkを解凍したディレクトリ上で、run-exampleというコマンドを実行してみます。
これは円周率を求めるサンプルになります。

$ pwd
/home/ec2-user/spark-2.0.2-bin-hadoop2.7
$ ./bin/run-example SparkPi 10

細かい出力は割愛しますが、以下のように円周率が計算できていることが解ります。

Pi is roughly 3.145155145155145


また、インタラクティブにプログラムを動作させることもできます。
Sparkの準備が整いましたので、Quick Startを実施してみましょう

Quick Start

以下のURLでQuick Startにアクセスできます。これをベースに進めます。

Quick Start - Spark 2.0.2 Documentation

Quick Startでは対話シェルでScalaPythonを選択できます。
自分はScala初心者ですが、今回はScalaで試してみたいと思います。

Spark Shellで対話的に分析をしてみよう!

以下のコマンドで対話的なシェルを起動します。

$ pwd
/home/ec2-user/spark-2.0.2-bin-hadoop2.7
$ ./bin/spark-shell

…省略…

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.0.2
      /_/

Using Scala version 2.11.8 (OpenJDK 64-Bit Server VM, Java 1.7.0_121)
Type in expressions to have them evaluated.
Type :help for more information.

scala>

このプログラムは、":quite"で終了できます。(:helpでヘルプも見れます。)


Sparkの重要なコンセプトは、RDD(Resilient Distributed Dataset)と呼ばれる分散コレクションにあります。
分散というように、Sparkを複数マシンでクラスタ構成を取っている場合は、この1つの論理的なコレクションがRDDで各サーバに分散されて処理が実行されます。RDDは、HDFSなどからファイルを読み込んで作成したり出来ます。RDDにtransformという処理を加えると別のRDDが作成されます。Sparkの処理は、このRDDに対してtransform処理を数珠つなぎで実行していき、最終的に必要な結果を得る(actionと言います)というのが特徴です。MapReduceをいくつもつなげて処理するということに似ています。イメージ的には以下のようになるかと思います。

RDD1 → transform → RDD2→ transform → RDD3 → action → ほしい結果①
                      │
                      │
                      └→ transform → RDD4 → action → ほしい結果②

RDDはイミュータブルなので処理が循環することはありません。すなわち上記のイメージは有向非巡回モデルということになります。DAGエンジンは、このような複数の処理に対して、効率的なRDDの実行パスを計算するといったところで使われるのかなと思います。
RDDの詳細やDAGとの関連などは、また別途勉強していきたいと思います。

初めてのRDD

では、実際にファイルを読み込んでRDDを作ってみましょう。ここではSparkに付いているREADME.mdを読み込んでみます。

scala> val textFile = sc.textFile("README.md")
textFile: org.apache.spark.rdd.RDD[String] = README.md MapPartitionsRDD[1] at textFile at :24

sc.textFile("README.md")により、textFileという変数にorg.apache.spark.rdd.RDDオブジェクトが挿入されたことが解ります。scとは、SparkContextのオブジェクトで、Sparkで処理を行う際に必須となるオブジェクトです。spark-shellを起動すると、自動的にSparkContextが作成されてscという変数で利用できます。プログラム上でSpark処理を記述する場合は、SparkContextを明示的に作成する必要がありますが、現時点ではSparkShellでは暗黙的にscを利用可能とだけ認識しておいてください。
RDDactionsと、transformationsという2つのタイプの操作が定義されています。actionsは、RDDが持つDatasetをもとに何らかの処理を実行した結果を返します。transformationsはRDDの持つデータセットを別のものに変換して新たなRDDを作り出します。
transformationの例としてはmapメソッドがあります。これはRDD内のデータセットをmap形式に変換して、別のRDDを作り出します。
actionの例としては、reduceメソッドがあります。これはRDD内のデータセットに対して集計操作を実行します。

実際にRDDのaction操作をしてみましょう。

scala> textFile.count()
res0: Long = 99

scala> textFile.first()
res1: String = # Apache Spark


これはどちらもactionの例です。countはtextFileのデータセット数(=ファイルの行数)を返し、firstはデータセットの先頭(=ファイルの先頭行)を返します。

次にtransformation操作をしてみます。

scala> val linesWithSpark = textFile.filter(line => line.contains("Spark"))
linesWithSpark: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[2] at filter at :26

scala> linesWithSpark.count()
res1: Long = 19


この例では、filterメソッドを使用して、"Spark"という文字列を含む行のみを含んだ新たなRDDを作成してlinesWithSpark変数に格納しています。linesWithSparkにcount()を実行すると19行と結果が出力されます。
なおSparkの特徴としてRDDのトランスフォーメーション操作は遅延処理となります。上記の例では、実際にfilter処理が実行されてRDD内に新たなデータセットが作られるのはRDDにaction操作を実行したときになります。この例で説明すると、1行目のfilter処理ではクラスが作成されるのみです。2行目のcount()アクションが実行されたときに初めてtransformationが実行されlinesWithSparkにデータが格納され、count処理が実行されます。

transformationとactionは連続して記述することもできます。

scala> textFile.filter(line => line.contains("Spark")).count()
res2: Long = 19

もっとRDDをいじってみよう!

もう少し複雑な操作を実行してみましょう。

scala> textFile.map(line => line.split(" ").size).reduce((a, b) => if (a > b) a else b)
res3: Int = 22

これは、READMEの中で最も単語数の多い行の単語数を抽出します。
mapメソッドやreduceメソッド内のこのような記述方法はscalaに慣れていないと理解が難しいかもしれませんが、これはクロージャと呼ばれる関数オブジェクトです。Java8のラムダ式と同じ感じですね。Javaの無名クラスの関数版みたいなものだという理解です。とりあえず、習うより慣れろということで、ここはそういうものだという素直な気持ちで進めましょう。

最初のmapは単純に行を空白で区切ってそのsizeを抽出しています。これにより、元のデータセットには行ごとの文字列が入っていましたが、mapで作成されたデータセットには行ごとの単語数が入ります。
その後、reduceメソッドにより、新しいRDD内のコレクションが順番に評価されて値が大きいものが抽出されていきます。最終的には、一番値の大きいものが返されるという処理となります。なお、クラスタ構成ではこれがうまく分散処理されますので高速に処理できますが、コードの記述内容はクラスタ環境でも変わるわけではありません。

ScalaJava上で動作していますので、Javaのクラスを取りこんで利用することも出ます。
下記の例は、java.lang.Mathを利用して値の大きい方を取得しています。

scala> import java.lang.Math
import java.lang.Math

scala> textFile.map(line => line.split(" ").size).reduce((a, b) => Math.max(a, b))
res6: Int = 22

よくMapReduceの処理例として、ファイルに含まれるワードをカウントするという処理が取り上げられますが、Sparkでは以下の1行で記述することができます。非常にシンプルですね(と言ってみる)。

scala> val wordCounts = textFile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey((a, b) => a + b)
wordCounts: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[6] at reduceByKey at <console>:27

この結果はcollect()アクションを利用することで収集することができます。収集という言い方をしているのは、今はローカルのみで動かしているのでピンと来ないかもしれませんが、クラスタ環境では、wordCountsの実際のデータは各サーバ上に散らばっているわけです。collect()を実行すると、手元に収集されます。

scala> wordCounts.collect()
res7: Array[(String, Int)] = Array((package,1), (this,1), (Version"](http://spark.apache.org/docs/latest/building-spark.html#specifying-the-hadoop-version),1), (Because,1), (Python,2), (cluster.,1), (its,1), ([run,1), (general,2), (have,1), (pre-built,1), (YARN,,1), (locally,2), (changed,1), (locally.,1), (sc.parallelize(1,1), (only,1), ...)

flatMapとは何かというと、文字通り平らにしたMapということになりますが、解り辛いですね。
mapとflatMapを比較すると、
textFile.map(line => line.split(" "))の処理結果は、lineという文字列がWordのArrayに変換されます。

次のようなファイルをmapで変換した場合、

行1: My name is tomo.
行2: What's your name?

map(line => line.split(" "))の処理結果は以下のようになります。

Array(Array(My, name, is tomo.), Array(What's, your, name?))


flatMapは、Arrayの中のArrayをさらに平らにしてくれます。
つまり、flatMap(line => line.split(" "))の処理結果は以下のようになります。

Array(My, name, is, tomo., What's, your, name?)


すこし解り辛いですが、お分かりいただけましたでしょうか?

次に、.map(word => (word, 1))の部分ですが、これはwordをKeyとし文字が1個としたKey/Value型のMapに変換しています。
最後の、.reduceByKey((a, b) => a + b)は、Key毎(=この場合はKeyはワード)に値を足しており、最終的にワードをKeyにしたValueがワード数として算出されるということになります。


本日は一旦ここまでとしたいと思います。

まとめ

Sparkのデータ操作の肝はRDDです。RDDに対する操作はactionとtransformがあります。またSparkは遅延処理で実行されますので、actionが実行されるまでは、transform処理は走りません。
Scalaクロージャは習うより慣れろ!

初めてのGitHub: Hello World

git

GitHubにアカウントを作成して、トップページにある"Read the guide"からたどれるHello Worldチュートリアルを実施してみましょう!

そもそもGit(ギット)とは?

いまさらですが、そもそもGitとはソフトウェア開発やその他のバージョン管理タスクを実施するためのバージョン管理システムです。Linux開発者の Linus Torvaldsが2005年にLinuxカーネル開発のために作成したものです。

TEDのインタビューで、Git誕生の理由が語られています。数千人レベルの開発者がLinuxカーネルの開発に携わり、それぞれがちょっとずつコードを変更しており、そのような状況で2~3カ月に一度にリリースを実施していたため、コード管理だけで一つのプロジェクトになってしまっていたようです。そうした状況を変えるべくGitを開発したそうです。GitはLinus氏の2番目に大きなプロジェクトですが、もともとは1番目の大きなプロジェクトであるLinuxを維持管理する目的で作成したということです。なおCVSは大嫌いだそう!

www.ted.com

GitHubとは?

GitHubは、WebベースのGitリポジトリを提供しているサービスです。インターネット上でコードのバージョン管理や開発者同士でコラボレーションできるコード管理プラットフォームを提供しています。有料のPrivateリポジトリか無料のパブリックなリポジトリの2つのプランを提供しています。後者の方は、OSSプロジェクトで良く利用されていたり、個人で作成したコードを公開する手段として良く利用されています。2016年4月時点で、1,400万人のユーザーと3,500万以上のリポジトリを有している*1そうです。
自分は恥ずかしながら、コードを公開するといった習慣が無かったのでこれを機に検証などで利用したコードなんかを公開していきたいなと思っております。

Hello Worldチュートリアル

GitHubのトップページからたどれるHelloWorldチュートリアルでは、Gitの基本要素であるリポジトリ、ブランチ、commit/pullリクエストなどを学ぶことができます。HelloWorldリポジトリを作成してPullリクエストを実行したり、基本的な変更箇所のレビューやブランチへのマージ方法を実践できます。

Hello World · GitHub Guides

Step1: リポジトリを作成する

リポジトリは、通常単一のプロジェクトごとに作成します。リポジトリにはプロジェクトに必要なファイルやディレクトリを含めます。プロジェクトの説明を記述したREADMEを含めることが推奨されています。

リポジトリを作成してみよう!

1. 右上の+ボタンからNew Repositoryを選択します。

f:id:tmnj:20161201133150p:plain


2. Repository Nameにhello-worldと入力、その他は以下のように入力して"Create repository"ボタンを押す

f:id:tmnj:20161201133458p:plain


これで、Repositoryが完成!簡単!

Step2: ブランチを作成してみよう!

ブランチとは、同じリポジトリ上で同時に複数の異なるバージョンを管理するための仕組みです。デフォルトでは"master"という名前のブランチが一つあり、最終的にはこのmasterブランチに全ての変更がマージされます。
"master"からブランチを作成するとその時点での"master"のスナップショットが作成されます。ブランチで作業中に他の誰かがmasterに変更をマージした場合は、その変更を自分が作業中のブランチに取り込む(pull)こともできます。

以下は、masterブランチから新しいブランチを作成して変更→Pullリクエスト→マージまでのフローイメージです。


f:id:tmnj:20161201134306p:plain


開発者は、バグフィックスをしたり新機能を追加する際にmasterからブランチを作成します。開発が完了したら、そのブランチの変更をmasterブランチににマージします。

それでは実際にブランチを作成してみましょう!


1. 先ほど作成したhello-worldリポジトリに移動します。

2. 真ん中左端の"Branch:master"ドロップダウンをクリックして、"readme-edits"と入力して、Create branchをクリックします。


f:id:tmnj:20161201134805p:plain

これで、masterとreadme-editsの2つのブランチがリポジトリに存在するようになりました。

Step3: 変更してコミットしてみよう!

README.mdファイルをクリックして、f:id:tmnj:20161201135101p:plainをクリックして編集モードにします。
内容は何でも良いですがせっかくPublicな環境ですので、世界の人に向けたメッセージをREADMEに記述してみましょう!

f:id:tmnj:20161201135927p:plain



修正したら、画面下の方の"Commit Change"ボタンをクリックします。

f:id:tmnj:20161201140007p:plain



この変更はreadme-editsのみに反映されており、このブランチはmasterブランチとは異なる内容となりました。

Step4: Pullリクエストをオープンする!

readme-editsブランチは、masterブランチと異なる内容なりましたので、Pullリクエストをオープンすることができます。Pullリクエストは、Githubでのチーム開発を行う際の重要な機能になります。
Pullリクエストをオープンすると、変更内容が他のメンバーに提示され、その内容を他のメンバーが作業中のブランチにマージするようリクエストします。Pullリクエストは両方のブランチからの内容の差異を示し、変更した内容、追加した内容、削除した内容が緑と赤で表示され差分を確認することができます。リクエストを受けたメンバーは、その内容を吟味し必要に応じてリクエスト送信者とコミュニケーションし、最終的にブランチにマージするか判断します。
今回は自分専用のリポジトリでPullリクエストをオープンしてmasterブランチにマージしてみることで、GitHubの重要なフローを理解しましょう。

READMEファイルの変更に対してPullリクエストをオープンしよう!


Pullリクエストタブをクリックします。

f:id:tmnj:20161201141631p:plain


緑色のNew pull requestボタンをクリックします。

f:id:tmnj:20161201141710p:plain



作成したブランチ(readme-edits)を選択して、オリジナルのmasterと比較します。

f:id:tmnj:20161201141844p:plain


変更部分を確認できます。

f:id:tmnj:20161201141928p:plain


確認してOKであれば、Create pull requestを押します。

f:id:tmnj:20161201142003p:plain


変更タイトルと変更内容を適当に記述して、Create pull requestを押します。

f:id:tmnj:20161201142152p:plain


Pullリクエストがオープンしました!

Step5: Pullリクエストをマージする

このステップでは、READMEファイルを変更したreadme-editsブランチとmasterブランチをマージします。


Merge pull requestボタンを押します。

f:id:tmnj:20161201142557p:plain


次に、Confirm mergeボタンを押します。

以下のように表示されて、マージが完了しました。変更は既にmasterに組み込まれたので、"Delete branch"ボタンでreadme-editsブランチを削除しましょう。

f:id:tmnj:20161201142723p:plain


masterブランチにREADMEの変更がマージされました!


f:id:tmnj:20161201142922p:plain


以上でHelloworldチュートリアルは終わりです。

まとめ

このチュートリアルではmasterブランチから新しいブランチを作成して、そのブランチに対して変更内容をコミットし、Pullリクエストを投げて、最終的に変更をmasterブランチにマージすると言うGitにおける重要な一連のフローを学ぶことができました。
GitはブランチとPullリクエストの仕組みにより、大人数のチーム開発を非常に効率的に実行できるわけです!
15年ほど前に開発現場にいた頃はCVSを使っており、リリース作業のたびにライブラリアンという役割の人が「一旦マージするので、コミットしばらくやめてくださいー」とかやっていました。かなり属人的で、きめ細やかな役割でしたが、今でもライブラリアンなんて役割の人は居るのだでしょうか?
Gitを利用すればリリース用ブランチをmasterから切れば開発プロセスを止めずにリリース作業ができますね。

なお、GitHubのPullリクエストに関しては、他のリポジトリをForkしてリポジトリのコピーを作成し、そこで変更した内容を元のリポジトリ管理者に反映してもらうためにPullリクエストを送る方法もあります。リポジトリを跨いだPullの仕組みです。GitHubの利用方法としては、こちらの方が多いのかもしれません。
Agile開発のように同時並列的に開発を進めていくプロジェクトでGitを利用する場合は、同一リポジトリ内でブランチを複数作成して、Pullリクエストによりお互いの変更をマージしたり、masterブランチにマージする利用方法が多いのかなと思います。
GitHubのドキュメント上では、前者を"Fork & Pull Model"と呼び、後者を"Shared Repository Model"と呼んでいます。


今回はWeb上ですべての操作を実施しましたが、通常コードは開発端末上にあり、ローカルで開発をしてその変更をGitHubに反映という流れになります。次回はClientからの操作方法、ローカルリポジトリやクローン、リモートリポジトリへのpushといった操作を実施してみたいと思います。

Apache Sparkの勉強-超概要を理解する

spark

http://spark.apache.org/images/spark-logo-trademark.png


Apache Sparkを基礎から勉強していきます。
基本的にはドキュメントを読み進めながら動作を確認していこうと思います。

まずはトップページを読んでみよう

Apache Sparkのトップページ

Apache Spark™ - Lightning-Fast Cluster Computing

ここを見れば、そもそもApache Sparkが何なのか概要レベルで解るはずです。

Apache Spark™ is a fast and general engine for large-scale data processing.

Apache Sparkは、大規模なデータ処理のための高速かつ汎用的エンジンです。

特徴①:速い!

まず一つめと特徴としては、処理速度にあります。同じような並列分散処理基盤であるHadoop MapReduce上でプログラムを実行するよりもインメモリであれば最大100倍速い。ディスクでも10倍速いという特徴があります。
Apache Sparkは、高度なDAG実行エンジン(Advanced DAG execution engine)を備えます。

DAGとは何でしょうか?ここは用語が解らないので、Google先生に聞いてみます。
DAG=directed acyclic graphとのことです。日本語で言うと、有向非巡回グラフとなります。
詳細はWikipediaの出番です。

有向非巡回グラフ - Wikipedia

有向非巡回グラフ、有向非循環グラフ、有向無閉路グラフ(ゆうこうひじゅんかいグラフ、英: Directed acyclic graph, DAG)とは、グラフ理論における閉路のない有向グラフの事。有向グラフは頂点と有向辺(方向を示す矢印付きの辺)からなり、辺は頂点同士をつなぐが、ある頂点 v から出発し、辺をたどり、頂点 v に戻ってこないのが有向非巡回グラフである。

うーん。言葉だけだと解り辛いけど、どの頂点から出発しても同じ頂点には戻ってこないとということなのでしょう。これだと抽象的すぎて解り辛いですね。DAGの具体的な例を探してみましたが、英文のWikipediaの説明にSpread SheetもDAGでモデル化できるとのこと。一つのセルを頂点とみなし、あるセルの値とあるセルの値をかけたりといった計算を書けますが、どこかの頂点=セルの中身が変化すれば後続のすべてのセルの計算結果が変化するといった具合です。
なんとなくわかりましたが、このDAGというデータモデルがどのようにSpark内で利用されているのか、今後の勉強課題としておきます。

特徴②:使いやすい!

Java/Scala/Python/Rで記述できます。また、簡単に並列に処理できる80以上のハイレベルな操作が提供されています。また、対話式にこの操作をJava/Scala/Python/Rから使用できます。
具体的な操作は今後の学習ポイントです。特に並列処理は自分で実装すると大変なので、簡単なAPIを利用するだけで並列分散して高速に処理してくれるという意味で使いやすいというのが一番重要な点だと思います。

特徴③:汎用性

とりあえず、いろいろ汎用的に利用できるための機能が付いているようです。個々の機能に関しては今後調べていきたいと思います。

f:id:tmnj:20161130222558p:plain

簡単に言うと、以下のような感じだと思いますが、重要な点はすべてSparkという高速な並列分散基盤上で実行できるという点にあると思います。

特徴④:どこでも実行

SparkはHadoop、Mesos、スタンドアロン、またはクラウド上で動作します。 HDFS、Cassandra、HBase、S3などのさまざまなデータソースにアクセスできます。

f:id:tmnj:20161130223418p:plain


Topページを一通り眺めて、何となくイメージが付きました。
今後深掘りしていきたいと思います。

AWS EBSをEC2インスタンスにアタッチして、Linuxから利用できるようにする

aws ec2

EBSボリュームを作成して既存のEC2インスタンスにアタッチして、Linuxから利用する方法をまとめました。

EC2インスタンスの作成

適当にスポットインスタンスを作成します。
以下の記事を参照してください。
tmnj.hatenablog.com

EBSボリュームの作成

EC2ダッシュボードで"ELASTIC BLOCK STORAGE" -> "ボリューム"を選択します。

f:id:tmnj:20161129214807p:plain


ボリュームの作成ボタンをクリックします。

f:id:tmnj:20161129214847p:plain


今回は、汎用SSD(GP2)で8GBほど作成します。同一のアベイラビリティゾーンのインスタンスにしかアタッチできませんので、アベイラビリティゾーンはアタッチしたいインスタンスと同じものを指定してください。

f:id:tmnj:20161129214925p:plain


なお、EBSは別途料金がかかりますので注意してください。ボリュームタイプの説明と料金は以下に定義されています。

料金 - Amazon Elastic Block Store(EBS) | AWS


以下のようにボリュームが作成されます。Nameは後から追記できます。"available"となっているのが新しく追加したボリュームです。

f:id:tmnj:20161129215218p:plain

EBSボリュームをEC2インスタンスにアタッチする

まず、対象のEC2インスタンスにログインしてlsblkコマンドを利用して現在のデバイスの状態を出力してみます。

[ec2-user@ip-xx-xx-xx-xx ~]$ lsblk
NAME    MAJ:MIN RM SIZE RO TYPE MOUNTPOINT
xvda    202:0    0   8G  0 disk
└─xvda1 202:1    0   8G  0 part /

作成したボリュームを右クリックして、ボリュームのアタッチを選択します。

f:id:tmnj:20161129215322p:plain



既存のインスタンスを選択して"アタッチ"ボタンをクリックします。

f:id:tmnj:20161129215417p:plain


再度、lsblkコマンドを利用してデバイスの状態を出力してみます。xvdfが追加されていることが解ります。

[ec2-user@ip-xx-xx-xx-xx2 ~]$ lsblk
NAME    MAJ:MIN RM SIZE RO TYPE MOUNTPOINT
xvda    202:0    0   8G  0 disk
└─xvda1 202:1    0   8G  0 part /
xvdf    202:80   0   8G  0 disk

LinuxでEBSボリュームを使用できるようにする

以下のURLの手順で実行します。

Amazon EBS ボリュームを使用できるようにする - Amazon Elastic Compute Cloud


今回対象となるデバイスは上記の出力から/dev/xvdfとなります。以下のコマンドを実行します。

[ec2-user ~]$ sudo file -s /dev/xvdf
/dev/xvdf: data

dataと出ると、ファイルシステムが存在していない状態です。次にファイルシステムをこのデバイスに作成します。
以下のコマンドを実行して出力を確認します。

[ec2-user@ip-xx-xx-xx-xx ~]$ sudo mkfs -t ext4 /dev/xvdf
mke2fs 1.42.12 (29-Aug-2014)
Creating filesystem with 2097152 4k blocks and 524288 inodes
Filesystem UUID: 805da6f8-b81c-40ef-969c-910763183294
Superblock backups stored on blocks:
        32768, 98304, 163840, 229376, 294912, 819200, 884736, 1605632

Allocating group tables: done
Writing inode tables: done
Creating journal (32768 blocks): done
Writing superblocks and filesystem accounting information: done

再度、/dev/xvdfを確認するとファイルシステムが作成されていることが解ります。

[ec2-user@ip-xx-xx-xx-xx ~]$ sudo file -s /dev/xvdf
/dev/xvdf: Linux rev 1.0 ext4 filesystem data, UUID=805da6f8-b81c-40ef-969c-9107
63183294 (extents) (large files) (huge files)

ファイルシステムができたので、マウントポイントにマウントします。今回は/dataにマウントしてみます。
まず、ディレクトリを作成します。

$ sudo mkdir /data


マウントポイントにマウントします。

$ sudo mount /dev/xvdf /data


これで、EBSボリュームをEC2インスタンスにアタッチしてLinux上から利用できるようになりました。

システムブート時にEBSボリュームをマウントする

/etc/fstabファイルに以下のエントリを追記します。

/dev/xvdf  /data  ext4  defaults,nofail 0 2

再起動してマウントされているか確認

/dataに適当なファイルを作成しておきます。
その後、rebootした後に/dataにマウントされて先ほど作成したファイルが存在していることを確認します。
(スポットインスタンスだとshutdownをするとインスタンスが削除されますので注意してください。)

スポットインスタンス再作成後のマウント

スポットインスタンスを再作成した後に、再度EBSボリュームをマウントし同じファイルが存在していることを確認してみましょう!

EC2のストレージの勉強

ec2 aws storage

今回はEC2ストレージの勉強をしたいと思います。

教材は以下を利用。
ストレージ - Amazon Elastic Compute Cloud

ストレージのオプション

次の3つのオプションを利用可能。

図でまとまっています。

f:id:tmnj:20161126145809p:plain

Amazon EBSとは

ブロックレベルのストレージで、同一のアベイラビリティゾーンであればどのインスタンスにもアタッチできます。同時に複数のインスタンスからはアタッチできませんが、デタッチしたEBSストレージを別のインスタンスにアタッチすることができます。頻繁な更新データを保持するのに最適です。

EBSの可用性は?

以下のURLの情報によると、データは同一アベイラビリティゾーン内に自動でレプリケーションされ99.999% の可用性を維持する設計となっているようです。
製品の詳細 - Amazon Elastic Block Store(EBS) | AWS

EBSはスナップショットをS3に保持する機能があります。またS3に保存したスナップショットからEBSボリュームを作成して別のインスタンスにアタッチすることもできるようです。データをもっとかっちりと守りたい場合は、定期的にスナップショットを取得してS3に保存しておいた方が良いと思います。ちなみにS3の耐久性と可用性の情報は以下のURLに記載があります。

製品の詳細 - Amazon S3 | AWS

1 年でオブジェクトの 99.999999999% の耐久性と最大 99.99% の可用性を提供するよう設計されています。


S3だとデータの耐久性は非常に高くデータがロストすることはほぼないけど、可用性はEBSより低いので、データにアクセスできない時間が長いということでしょうね。

インスタンスストアとは

これは単純にホストコンピュータ上の物理的なディスクのことです。これをインスタンスストアと呼んでいます。重要なのはインスタンス用のブロックレベルの"一時ストレージ"を提供するということです。つまり、インスタンスを停止または終了すると消えてしまいます。
じゃあ、リブートはどうなんだろうと思ったら、リブートではインスタンスストアボリューム上のデータはすべて保持されるようです。
インスタンスの再起動 - Amazon Elastic Compute Cloud

インスタンスを再起動すると、インスタンスは同じホスト上で保持されるため、インスタンスパブリックドメイン名、プライベート IP アドレス、およびインスタンスストアボリューム上のすべてのデータは保持されます。

Amazon S3とは

オブジェクトストレージです。RESTなどのWebインタフェースを通じてアクセスします。ブロックストレージではないので直接OSからアタッチして利用することはできませんが、EBSに比べて低コスト・高スケーラビリティ・高堅牢性などの特徴があります。

EC2ルートデバイスボリューム

EC2インスタンスは、ルートデバイスボリュームに格納されているイメージを利用してインスタンスがブートされます。ルートデバイスボリュームとして利用できるのが、インスタンスストアかEBSとなります。
この2つの違いにより、AMIのタイプとして2つあります。

起動が高速で永続的ストレージを利用しているため、推奨されるのはAmazon EBS Backedなインスタンスです。以下のURLに違いがまとまっています。

AMI タイプ - Amazon Elastic Compute Cloud

ルートデバイスボリュームの詳細は、以下のURLを参照しましょう。

Amazon EC2 ルートデバイスボリューム - Amazon Elastic Compute Cloud

永続的ルートデバイスボリュームへの変更

EBS Backedでもデフォルトではインスタンス終了時に削除されてしまうようです。これを永続的ルートデバイスボリュームに変更できます。

Amazon EC2 ルートデバイスボリューム - Amazon Elastic Compute Cloud

デフォルトでは、Amazon EBS-backed AMI のルートデバイスボリュームは、インスタンスを終了すると削除されます。デフォルトの動作を変更するには、ブロックデバイマッピングを使用して、DeleteOnTermination 属性を false に設定します。

スポットインスタンス作成ウィザードでは、EBSボリュームの設定の"削除"チェックボックスをオフにすると永続的ルートボリュームに変更できるようです。

f:id:tmnj:20161126155617p:plain