Tokyo.clj#15

Tokyo.clj#15に参加してきた。
Twitter社がtweetをリアルタイムに処理するために使ってるstormがClojureから書けると知って前から気になっていたので、ドキュメントやソースコードを読んだり、サンプルを動かしたりした。
以下はstormのTutorialをざっと要約したもの。

stormとは

Hadoopと大きく異なる点が"リアルタイム"にデータを処理するための分散データ処理システムである。 Hadoopの場合、大きなソース(巨大なログや文書データ等)を投入してそれを各ワーカーに対して分散させて処理させ、ソースすべての処理が完了後、ワーカーのジョブは停止する。

しかし、解析したいソースが単一のものとして存在するのではなく、時系列で常に大量に流れているものである場合、Hadoopのモデルでは対応できない。
たとえば、Twitter社では流れるtweetストリームすべてをリアルタイムに分析しなければならない。(ユーザに一番インタラクティブに反映させなければならない例としてTrendsがあげられる) そこで、リアルタイムにデータを分散処理し続けるためのものがstormである。

stormのモデル

StormのクラスターとHadoopクラスターの構成は似ている。
その一方でHadoopは"MapReduce jobs"を動かし、Stormは"topologies"を動かす。 MapReduce jobは終了するのに対して、Topologyはプロセスをkillするまで動き続ける。 SpoutとBoltでTopologyを形成する。
Spoutは処理したいソースを提供し、BoltはSpoutからそのソースを受け取りソースに対して定義された処理を実行する。
また、Boltが処理したソースをストリームに流すこともできる(そしてほかのBoltに処理させる)。
複雑な処理をしたい場合、複数のBoltを経由させることで実現できる。 これはノード間で必ず通らなければならないパスを指定する。たとえば、topologyにSpout'A'、Bolt'B'、Bolt'C'が存在し、A->B->CというパスとA->Cという定義を書くと、Bolt'B'で処理されたものは必ずBolt'C'を通ることになる。
Boltを連結させて複雑なパスを形成したり、また、簡単なパスも作るなど書けるところが、Hadoopより柔軟で便利だと思う。

storm-starterを動かす

ローカルで動かすこと自体は簡単で次の手順を踏む。

$ git clone https://github.com/nathanmarz/storm-starter.git
$ cd storm-starter
$ lein deps
$ lein compile
$ java -cp `lein classpath` storm.stream.clj.word_count

ただし、ローカルで動かすためにはword_count.cljに手をいれなければならない。run-local!をmainから呼び出すように修正する。
word_count.cljではいくつかの短い文章をひたすら流し続けて(sentence-spout)その単語をカウント(split-sentence, word-count)するもの。
Topologyをclojureで作成するには次のように記述する。

    (topology
      {1 (spout-spec sentence-spout)
       2 (spout-spec (sentence-spout-parameterized
                      ["the cat jumped over the door"
                       "greetings from a faraway land"])
                      :p 2)}
      {3 (bolt-spec {1 :shuffle 2 :shuffle}
                 split-sentence
                 :p 5)
       4 (bolt-spec {3 ["word"]}
                 word-count
                 :p 6)})

topology関数はspoutを定義するHashとboltを定義するHashの2変数を受け取る。
各Hashのkeyとしてidを指定している。これはTopology中で一意にノードを決定できるidを示す。
bolt-spec3の{1 :shuffle 2:shuffle}はgroupingを意味し、spout1、spout2のどちらのsspoutからもデータを受け取ることを意味している。
bolt-spec4ではid3を指定しているので、必ずbolt-spec3が処理したデータを受け取る。 :pは各ノードで動作するスレッド数である。

Clojureの便利なAPI(@masa_edw)

https://gist.github.com/1395188

  • ..
  • doto
  • ->>
  • ->

についての解説。

Clojureのパフォーマンス測定(@deltam)

実行スピードとメモリ使用量がすぐにわかったらいい。

  • clojure.core/time
  • clojure.contrib/profile
  • VisualVM
  • 測定用関数はtarai使う
  • time

(time (tarai 19 16 11))

計測したい関数に手をいれないといけない。
(profile (tarai 19 16 11))
コンパイル時にプロファイルコードを削除してくれる。

  • VisualVM

プロファイラで確認できる。

Incanter(@AntiBays)

  • Rはあんまりビッグデータに向かない。
  • もっと高速に処理できてRに代替するものがほしい。
  • Incanterはサンプルのデータセットが用意されている。
  • jarに含まれているのでどこでも実行できる。
  • グラフもそこそこ綺麗にかける