notebook

都内でWEB系エンジニアやってます。

Apache Spark(spark-submit)

ファイルからの実行

前回はspark-shellをとりあえず使ってみたので今回はそれをまとめてjarファイルから実行したいと思います

Quick Start

上記参考に一連の流れをまとめてプロジェクトにして実行してみます

レスポンスコードのカウント

package sample

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf

object Sample {
  def main(args: Array[String]) {
    val logFile = "/var/log/nginx/access.log-20150627"
    val conf = new SparkConf().setAppName("sample")
    val sc = new SparkContext(conf)
    val logData = sc.textFile(logFile, 2).cache()
    val numAs = logData.filter( line => line.contains(" 200 ")).count()
    val numBs = logData.filter( line => line.contains(" 304 ")).count()
    val total = logData.count()
    println("total lines: %s".format(logData.count()))

    println("Lines with 200: %s, Lines with 304: %s".format(numAs, numBs))
  }
}
  • sample.sbt
name := "sampleProject"

version := "1.0"

scalaVersion := "2.10.4"

libraryDependencies += "org.apache.spark" %% "spark-core" % "1.4.0"

ファイル構成は以下

[spark-sample]$ find .
.
./sample.sbt
./src
./src/main
./src/main/scala
./src/main/scala/sample.scala
[spark-sample]$ sbt package

target以下にjarファイルが生成されるのでspark-submitで実行してみる

[spark-sample]$ SPARK_HOME_DIR/bin/spark-submit  target/scala-2.10/sampleproject_2.10-1.0.jar
.....
.....
total lines: 729
Lines with 200: 42, Lines with 304: 687
.....
.....

ログが沢山流れるがprintした箇所に関しても出力されていることが確認できた

アクセスログの特定パラメータをカウントしてみる

では、もう少し実用よりの事をやってみます

fluentdで取得したアクセスログに関してURLの特定のパラメータの値がどの程度あるのかカウントしてみる

ディレクトリ構成は一度目と同様

ログの書式は以下(公開にあたり文字列を改変している箇所があります)

2015-06-28 00:06:33     app.balancer.ip-10-0-2-1.access_log {"host":"111.11.111.111","user":"-","method":"GET","path":"/z?app=1.9.1","code":200,"size":5,"referer":"-","agent":"piggy/1.9.1 CFNetwork/711.1.16 Darwin/14.0.0","request_time":0.012,"forwarded_for":"111.11.111.111","cookie":"-"}
2015-06-28 00:06:33     app.balancer.ip-10-0-2-1.access_log {"host":"22.22.22.222","user":"-","method":"GET","path":"/i?app=1.1.6&identifier=B4C97925-FB29-4F0E-ADA9-713C4722B9BF&m_id=88&system=8.3","code":200,"size":20770,"referer":"-","agent":"Mozilla/5.0 (iPhone; CPU iPhone OS 8_3 like Mac OS X) AppleWebKit/600.1.4 (KHTML, like Gecko) Mobile/12F70","request_time":0.034,"forwarded_for":"22.22.22.222","cookie":"-"}
  • sample.sbt
name := "sampleProject"

version := "1.0"

scalaVersion := "2.10.4"

libraryDependencies += "org.apache.spark" %% "spark-core" % "1.4.0"
package sample

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import scala.util.parsing.combinator.RegexParsers
import scala.util.parsing.json.JSON

object Sample {
  val urlPattern = "identifier=(.*?)&".r

  def parseParam(path: String): String = {

    val m = urlPattern.findFirstMatchIn(path)
    if ( m.isDefined ) return m.get.group(1)
    return "-"
  }

  def main(args: Array[String]) {
    val logFile = "/home/vagrant/spark/xaaaa"
    //sparkの設定
    val conf = new SparkConf().setAppName("sample").set("spark.executer.memory", "2g").set("spark.executer.extraClassPath", "/home/vagrant/spark/lib_managed/jars")
    val sc = new SparkContext(conf)
    val logData = sc.textFile(logFile, 2).cache()
    // 日付 時間\t tag名\tjsondata
    // jsonデータの部分だけ抜き出す
    val jsonDatas = logData.map(line => line.split("\t") ).map( row => row.splitAt(2)._2 ).map( line => line.mkString(" ") )

    // jsonへの変換
    // path部分だけ抜き出し
    val dummyMap:Map[String,String] = Map( "notJson" -> "-" )
    val paths = for {
      jsonraw <- jsonDatas
      jsonobj <- JSON.parseFull(jsonraw).getOrElse( dummyMap ).asInstanceOf[Map[String,String]]
      if jsonobj._1 == "path"
    } yield jsonobj

    val countByIdentifier = paths.map( path => parseParam(path._2)).countByValue().toSeq.sortBy(_._2)
    countByIdentifier.foreach( println )
  }
}

結果

.....
.....
.....
.....
(1323A2FB-9624-4E5D-A334-15F06A29C6B3,98)
(D038639F-6F67-42C2-B524-8DB5593CA2E5,109)
(18907EA6-57F1-4985-ADC2-89A654FB46C3,194)
(85231974-08D8-44F5-8F95-BB176B0693D1,279)
(,363)
(-,73841)

一応成功!

elasticsearchに突っ込んでみるところまでやる予定だったけど、コンパイルまでできるが実行時に出る「java.lang.NoClassDefFoundError」が解決できず。。。

次はエラー含めウィンドウ集計、elasticsearchとの連携、apache zeppelin、クラスタ構成、あたりまでできたらと思います