Tokyo.clj#15
Tokyo.clj#15に参加してきた。 Twitter社がtweetをリアルタイムに処理するために使ってるstormがClojureから書けると知って前から気になっていたので、ドキュメントやソースコードを読んだり、サンプルを動かしたりした。 以下はstormのTutorialをざっと要約したもの。
stormとは
Hadoopと大きく異なる点が"リアルタイム"にデータを処理するための分散データ処理システムである。 Hadoopの場合、大きなソース(巨大なログや文書データ等)を投入してそれを各ワーカーに対して分散させて処理させ、ソースすべての処理が完了後、ワーカーのジョブは停止する。
しかし、解析したいソースが単一のものとして存在するのではなく、時系列で常に大量に流れているものである場合、Hadoopのモデルでは対応できない。 たとえば、Twitter社では流れるtweetストリームすべてをリアルタイムに分析しなければならない。(ユーザに一番インタラクティブに反映させなければならない例としてTrendsがあげられる) そこで、リアルタイムにデータを分散処理し続けるためのものがstormである。
stormのモデル
StormのクラスターとHadoopクラスターの構成は似ている。 その一方でHadoopは"MapReduce jobs"を動かし、Stormは"topologies"を動かす。 MapReduce jobは終了するのに対して、Topologyはプロセスをkillするまで動き続ける。 SpoutとBoltでTopologyを形成する。 Spoutは処理したいソースを提供し、BoltはSpoutからそのソースを受け取りソースに対して定義された処理を実行する。 また、Boltが処理したソースをストリームに流すこともできる(そしてほかのBoltに処理させる)。 複雑な処理をしたい場合、複数のBoltを経由させることで実現できる。 これはノード間で必ず通らなければならないパスを指定する。たとえば、topologyにSpout'A'、Bolt'B'、Bolt'C'が存在し、A->B->CというパスとA->Cという定義を書くと、Bolt'B'で処理されたものは必ずBolt'C'を通ることになる。 Boltを連結させて複雑なパスを形成したり、また、簡単なパスも作るなど書けるところが、Hadoopより柔軟で便利だと思う。
storm-starterを動かす
ローカルで動かすこと自体は簡単で次の手順を踏む。
$ git clone https://github.com/nathanmarz/storm-starter.git $ cd storm-starter $ lein deps $ lein compile $ java -cp `lein classpath` storm.stream.clj.word_count
ただし、ローカルで動かすためにはword_count.cljに手をいれなければならない。run-local!をmainから呼び出すように修正する。
word_count.cljではいくつかの短い文章をひたすら流し続けて(sentence-spout)その単語をカウント(split-sentence, word-count)するもの。
Topologyをclojureで作成するには次のように記述する。
(topology {1 (spout-spec sentence-spout) 2 (spout-spec (sentence-spout-parameterized ["the cat jumped over the door" "greetings from a faraway land"]) :p 2)} {3 (bolt-spec {1 :shuffle 2 :shuffle} split-sentence :p 5) 4 (bolt-spec {3 ["word"]} word-count :p 6)})
topology関数はspoutを定義するHashとboltを定義するHashの2変数を受け取る。 各Hashのkeyとしてidを指定している。これはTopology中で一意にノードを決定できるidを示す。 bolt-spec3の{1 :shuffle 2:shuffle}はgroupingを意味し、spout1、spout2のどちらのsspoutからもデータを受け取ることを意味している。 bolt-spec4ではid3を指定しているので、必ずbolt-spec3が処理したデータを受け取る。 :pは各ノードで動作するスレッド数である。
Clojureのパフォーマンス測定(@deltam)
実行スピードとメモリ使用量がすぐにわかったらいい。
(time (tarai 19 16 11))
- clojure.contrib/profile
計測したい関数に手をいれないといけない。
(profile (tarai 19 16 11))
コンパイル時にプロファイルコードを削除してくれる。
- VisualVM
プロファイラで確認できる。
Incanter(@AntiBays)
- Rはあんまりビッグデータに向かない。
- もっと高速に処理できてRに代替するものがほしい。
- Incanterはサンプルのデータセットが用意されている。
- jarに含まれているのでどこでも実行できる。
- グラフもそこそこ綺麗にかける