notebook

都内でWEB系エンジニアやってます。

apache sparkを使ってみる

scala を使ってみたいなということでscalaで書けるsparkを見てみたいと思います

javaが入っていることが前提

今回はopenjdk1.8をインストールしてます

scalaのインストール

$ rpm -ivh http://downloads.typesafe.com/scala/2.11.6/scala-2.11.6.rpm
$ scala -version
Scala code runner version 2.11.6 -- Copyright 2002-2013, LAMP/EPFL

インストール

  • clone
git clone git://github.com/apache/spark.git
  • build
$ cd spark
$ build/sbt
$ build/sbt assembly

spark-shell

  • scalaのREPLでsparkの処理を実行できる

試しにいくつか実行してみる

$ ./bin/spark-shell
log4j:WARN No appenders could be found for logger (org.apache.hadoop.metrics2.lib.MutableMetricsFactory).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
Using Spark's repl log4j profile: org/apache/spark/log4j-defaults-repl.properties
To adjust logging level use sc.setLogLevel("INFO")
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 1.5.0-SNAPSHOT
      /_/

Using Scala version 2.10.4 (Java HotSpot(TM) 64-Bit Server VM, Java 1.7.0_79)
Type in expressions to have them evaluated.
Type :help for more information.
15/06/22 16:08:22 WARN Utils: Your hostname, localhost.localdomain resolves to a loopback address: 127.0.0.1; using 10.0.2.15 instead (on interface eth0)
15/06/22 16:08:22 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Spark context available as sc.
SQL context available as sqlContext.

こんな感じでログがたくさん出てくる

ちなみに「sc」はSparkContextクラスのインスタンス

sample.txtの中の単語をカウントしてみます

  • sample.txt
[spark]$ cat sample.txt
aaa bbb ccc ddd
eee fff aaa ddd
ccc aaa aaa aaa
bbb ddd aaa ccc
  • spark-shell
scala> val textFile = sc.textFile("/home/vagrant/spark/sample.txt")
textFile: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[1] at textFile at <console>:21
# 行数を取得
scala> textFile.count()
res0: Long = 4
# 単語カウントを取得
scala> val wordCount = textFile.flatMap( line => line.split(" ") ).map(word => ( word , 1 )).reduceByKey((a,b) => a + b)
wordCount: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[4] at reduceByKey at <console>:23
# 出力
scala> wordCount.foreach( println )
(bbb,2)
(ddd,3)
(fff,1)
(eee,1)
(ccc,3)
(aaa,6)

Scalaで書くコードとほぼ変わらない感じで書けるとのことで、結構四苦八苦しながら書いてみました

.cacheでキャッシュに乗せることができて繰り返し処理する場合に早くなるよう

下記で1s -> 31msになったのを確認した

# 実ファイル
scala> val file = sc.textFile("/home/vagrant/spark-sample-access/xaa")
file: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[6] at textFile at <console>:21

scala> file.count()
res2: Long = 200000

scala> val filtered = file.filter( line => line.contains("identifier"))
filtered: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[7] at filter at <console>:23

scala> filtered.cache()
res4: filtered.type = MapPartitionsRDD[7] at filter at <console>:23

scala> filtered.count()
res5: Long = 84087

scala> filtered.count()
res6: Long = 84087

spark-ui

spark-shell起動中は4040番ポートでjobの進捗具合などが見れる

f:id:swfz:20150804024803p:plain

どんな処理をしてるか、どのくらいの実行時間だったかなども見れる模様

上記でcacheする前と後で結果が変わってるのを確認できた

今度はjarファイルを実行してみます