Spark公式、あるいはEMRではScala2.10系でビルドされたSparkが利用されていますが、 プロダクトではScala2.11を使っているため、SparkバッチでもScala2.11を使いたいと思い試してみました。
自己紹介
こんにちは。 主にミドルウェア、パフォーマンスの調査をしていますShibataと申します。 ScalaやReact.js、Akkaが好きです。
ゴール
EMR上で以下の構成で作成されたSparkバッチが動くようにします。
Library | Version |
---|---|
Scala | 2.11.X |
Spark | 1.6.1 |
Java | 8 |
EMR | 4.4.0(Hadoop2.7.2) |
ちなみに、プロダクトで使うにあたって、Hadoopクラスタを直接管理するのは嫌なので、なるべくフルマネージドなEMRを使っておきたいというのがあります。(あと、パフォーマンスのテストがしやすい)
(ちなみに、Sparkは1.3あたりからScala2.11版のexperimentalが取れてます。バイナリの配布はしてくれてませんが、、、)
モチベーション
弊社は広告系の開発をメインで行っていることもあり、扱うデータ量は非常に大きなものとなります。
そこで分散処理フレームワークであるSparkの利用を試みていますが、公式が配布している、あるいはEMR上で提供されているSparkはScala2.10でビルドされたものになってしまいます。(sparkのassembly.jarにはScala-libraryも組み込まれています)
しかし、既にその他のコードはScala2.11系で書かれており、Sparkバッチでも当然そのコードを流用したいわけです。
ではどうするか?Scala2.11系でビルドしたSparkを動かせるようにしましょう。
EMRのBootStrap Actionを確認する
まずはEMRのBootstrap Actionで対応できないか探してみます。 色々調べてみると、以下のissueにぶつかりました。
(どうでもいいのですが、この人のアイコンは虫の大群か何かに襲われてるところなのでしょうか?)
ログを追うと、「まだScala2.11版はexperimentalだから!」という理由でcloseされたようですが、「正式対応されているんだから早く用意して欲しい」という声がそれなりに多く上がっているようですね。
つまるところ、EMRはまだScala2.11版Sparkに対応できていないですね!
(ついでに言うとJava8対応もまだ。。。)
仕方がないので、自前で用意してみることにしました。
構築手順
ちなみに、以下ではSparkをyarn上で動かす前提で進めます。
Sparkはsubmit時に --master
オプションで起動できるモードを変えられますが、ここではそれぞれ、
- localモード
- yarn-clientモード
- yarn-clusterモード
と呼ぶことにします。
Scala2.11版Sparkのビルド
Sparkは公式にはScala2.10でビルドされたバイナリしか配布してくれていないので、 2.11版を使いたい場合は自分でビルドする必要があります。 (結構時間かかります。なんで公式で2.11版を出してくれないのか。。。)
このへんを参考にしてパッケージ一式を用意します。
- http://spark.apache.org/docs/latest/building-spark.html#building-for-scala-211
- http://spark.apache.org/docs/latest/building-spark.html#building-a-runnable-distribution
(yarn-clusterモードで動かしたいならassembly.jarだけあればいいのですが、localモードでも動作確認したいのでパッケージごと用意します。)
wget http://d3kbcqa49mib13.cloudfront.net/spark-1.6.1.tgz tar -xzvf spark-1.6.1.tgz cd spark-1.6.1 ./dev/change-scala-version.sh 2.11 ./make-distribution.sh --name custom-sparkh --tgz -Pyarn -Phadoop-2.6 -Dscala-2.11 -DskipTests
- メモリとCPUをかなり使います。(MacBook Proで30分強くらいずっとコンパイルしてた)
- ソースコードの展開場所によっては、pathが長すぎる旨のエラーが出るかも。
- Spark1.6.1だとHadoop2.6までしか指定できなかったですが、2.7系でも今のところ問題なく動いています。(2.10版でも「Hadoop2.6以上」と書かれているし。)
これでScala2.11、Hadoop2.6バージョンのSparkが用意出来たはずです。パッケージ化されたものの中から、lib/spark-assembly.jar
的なものを取り出してS3にUploadしましょう。
また、固められたパッケージ自体もuploadしておきましょう。
EMRの準備
デフォルトではJava7が入ってしまってるので、Hadoopクラスタが立ち上がる前にJava8に上げる必要があります。
こちらの方がEMRをJava8化するBootstrapActionを書いているので拝借しました。
Java 8 on Amazon EMR (AMI 4.0.0) « Crazy Doc's Blog
emr_bootstrap_java_8.shをS3にuploadしておく
EMR構築手順としては以下のとおりです。
- アプリケーションには、Spark1.6とHadoop2.7.2を入れる(emr-4.4.0)
- ここでインストールされるSparkはScala2.10バージョンのものだが、S3などとの連携に都合がいいので置いておく。
- BootstrapでJava8対応のshellを実行する。(s3://
/emr_bootstrap_java_8.sh) - おそらく踏み台を経由すると思うので、踏み台サーバと同じsubnetにする。
- Web画面を見たいのでpublicなsubnetにおく。(sshトンネル経由)
- sshするので、pemを用意しておく。
- logFolderはご自由に(EMRクラスタの各種ログが吐かれる)
- IAMは、とりあえずS3やDynamoにアクセスできれば。(デフォルトでOK)
- SGは、sshができればよい(デフォルトでOK)
EMR起動後、MasterNodeにsshしてみてjava -version
でJava8が使えることを確認。
動作確認
動作確認サンプルの準備
sparkにはデフォルトで円周率計算をするexampleがありますが、EMRを使う場合は普通は入出力にS3を使うので、 それが使えることを確かめるためのサンプルを用意します。 (S3に繋がるまでは実は色々罠に引っかかりました。知ってる人にとってはなんともないのでしょうが、、、)
(ついでにJava8, Scala2.11でちゃんと動くことも確認しておくと良いですね。Java8ならStreamAPI、Scala2.11ならArrowAssocを使って ClassNotFoundException
を出すのが楽でしょうか。)
とりあえず手っ取り早いものをここに準備しました。
hadoopSample/spark/scala_2.11の階層で activator batch_hdfs/assembly
を実行するとspark用のjarが生成されます。
jarが出来たら、同じくS3にuploadしてください。
書いていることはとても簡単で、ファイルの入力をそのまま出力をしてるだけです。
val fromFile = args(0) //s3n, s3, hdfs, file val toFile = args(1) //s3n, s3, hdfs, file val conf = new SparkConf().setAppName("local") val sc = new SparkContext(conf) val logData = sc.textFile(fromFile) logData.saveAsTextFile(toFile)
動作確認サンプルの実行
MasterNodeでの準備
EMRのMasterNodeにsshログインします。
まず、なぜかJAVA_HOMEがおかしなことになってるので直します。
export JAVA_HOME=/usr/java/default
spark-submitをlocalモードで試す場合は、2.11ビルド版のsparkパッケージを落としてくる必要があるので、 S3からパッケージを落としてくる必要があります。
aws s3 cp s3://<bucket>/spark-1.6.1-bin-scala-2.11.tar.gz ~/ tar -zxvf spark-1.6.1-bin-scala-2.11.tar.gz
それでは実行してみましょう。
localモードでの実行
※この時、S3へのアクセスはできませんでした。。ローカルのファイルを参照してください。
(s3プロトコルなんて知らないと言われました。たぶん一部Hadoopコードを組み込んでビルドすれば動くとは思う。)
./spark-1.6.1-bin-scala-2.11/bin/spark-submit --class com.github.uryyyyyyy.hadoop.spark.batch.fileIO.Hello --master local --conf spark.hadoop.mapred.output.committer.class=org.apache.hadoop.mapred.DirectOutputCommitter ./spark2.11_batch_hdfs-assembly-1.0.jar file:///home/hadoop/dummy_in.txt file:///home/hadoop/dummy_out.txt
yarn-clusterモードでの実行
spark-submit --class com.github.uryyyyyyy.hadoop.spark.batch.fileIO.Hello --master yarn-cluster --conf spark.hadoop.mapred.output.committer.class=org.apache.hadoop.mapred.DirectOutputCommitter --conf spark.yarn.jar="s3://<bucket>/spark-assembly-1.6.1-hadoop2.7.2.jar" s3://<bucket>/spark2.11_batch_hdfs-assembly-1.0.jar s3://<bucket>/in.txt s3://<bucket>/out.txt
色々余計なものが付いていますが、後述しますのでしばしお待ちを。
yarn-clientモードでの実行
※後述しますが、submitを実行しているDriver Nodeのspark-assemblyとExecutor Nodeのspark-assemblyが異なってしまい、上手く行きませんでした。
動作原理の説明
assembly.jarの扱い
yarn上で動作させたい場合は、入出力(sc.textFile
と saveAsTextFile
)の対象ファイルにlocal fileを指定するわけにはいかないので、対象パスにS3を指定することになります。
しかし、先ほど述べたようにscala2.11ビルド版のsparkパッケージでsubmitした場合は、S3へのアクセスが上手くいきません。
そこで、yarn-clusterモード、yarn-clientモードでは、EMR組み込みのscala2.10ビルド版のspark-submitで実行しています。
しかし、これだとScala2.10がクラスパスに入ってしまうように思います。
それを回避するために spark.yarn.jar="s3://<bucket>/spark-assembly-1.6.1-hadoop2.7.2.jar"
を指定しています。
これを使うことで、YARN上に展開されるDriverとExecutorには、S3上に置いてあるscala2.11のassemblyが配布されて、Scala2.11で実行することが出来るようになります。
ここでややこしいのが、yarn-clientモードで実行した場合はDriver Nodeはsubmitしたマシンになるので、Spark-assemblyがScala2.10版のものを利用していることになります。そのため、yarn-clientモードで実行すると「sparkバイナリが違うよ」と怒られてしまいました。
S3へのアクセス
ちなみに、これでScala2.11での実行はできたのですが、Driver/ExecutorからS3へ出力しようとすると、よくわからんエラーが出るので、以下を参考にパッチを組み込みます。
これで、実行時に
spark.hadoop.mapred.output.committer.class=org.apache.hadoop.mapred.DirectOutputCommitter
を指定することで、うまくS3へ出力することが出来ます。
まとめ
以上でScala2.11ビルド版のSparkをEMRで実行することができました。SparkのWebUI(history)もちゃんと使えています。
弊社では、SparkやHadoopを利用して大量データを効率よく捌くことに関心のあるプログラマを募集しています。 もし興味があればぜひHPよりご連絡下さい。