読者です 読者をやめる 読者になる 読者になる

yunomuのブログ

酒とゲームと上から目線

Frappuccinoの使い方

前回、Frappuccinoの件をあまりにももののついでみたいに書いてしまったんですが、使おうとすると例によってドキュメントが無くてソースを読む事になって、なったので、その時の足跡です。
https://github.com/steveklabnik/frappuccino

Source

前準備として、streamとイベント発生装置(Source)を作る。公式のReadmeではButtonとなってるやつ。

class Source
  def send(event)
    emit(event)
  end
end

source = Source.new
stream = Frappuccino::Stream.new(source)

このemitは、ソースを見てみると、まずStream.newで渡されたインスタンスにFrappuccino::Sourceを継承させてる。
frappuccino/stream.rb

module Frappuccino
  class Stream
    include Observable

    def initialize(*sources)
      sources.each do |source|
        source.extend(Frappuccino::Source).add_observer(self)
      end 
    end
(略)

それでemitはSourceに定義されている。中では変更フラグを立てて、イベントをObserverに通知している。
frappuccino/source.rb

require 'observer'

module Frappuccino
  module Source
    def self.extended(object)
      object.extend(Observable)
    end 

    def emit(value)
      changed
      notify_observers(value)
    end 
  end 
end

というわけでSource.sendの中でemitを呼び出すことができるようになっている。Observableは標準添付のobserverに入ってるやつです。そんなの標準であったのかという感じですが。

Streamにはmap, drop, take, select, zip, merge, scan, count, inject, merge, update, on_valueというメソッドがあって、分類すると以下のような感じになります。

  • Streamを返す
    • map
    • drop
    • take
    • select
    • zip
    • merge
    • scan
  • Proptertyを返す
    • inject
    • count
  • その他
    • update
    • on_value

Stream系

Streamを返すやつらは、分類したり変換したり統合したり、まあメソッド名を見ればでわかるような事をやります。Java8のStreamと同じような感じです。
mapは変換、dropは指定した個数のイベントを無視、takeは指定した個数のイベント以降を無視、selectは条件に合うイベントのみを拾うようなStreamを返します。だいたい想像通りに動くと思います。

mapはブロックじゃなくてHashを取ることもできて、

  def map(hash = nil, &blk)
    blk = lambda { |event| hash.fetch(event) { hash[:default] } } if hash
    Map.new(self, &blk)
  end

みたいになってて、

stream.map(:e1 => 1, :e2 => 2, :default => 0)

みたいなHashを渡すと、キーと一致するイベントがきたら特定の値に変換するというのが簡単に書けるんですが、まああんまし使わないと思います。

zipとmergeはStreamの統合で
zipは

stream3 = stream1.zip(stream2)

とやると、stream1とstream2の両方のイベントが1つずつそろった時にstream3が発火するような感じ。ソース見ると本当に単にバッファを2つ作って、両方そろったら発火させているだけで、あーこんなに簡単に作れるのかーという気がする。

mergeは

stream3 = Stream.merge(stream1, stream2)

zipと似てるんですけど、これはstream1とstream2のどちらかが発火したら発火する。

scanは畳み込みみたいな感じで、前回のイベントを参照しながらmapみたいな処理ができる。
例えば、イベントの数を数えるのはこう。

stream2 = stream1.scan(0) {|a| a + 1 }

イベントとして飛んでくる整数の合計値を取る場合はこう。

stream2 = stream1.scan(0) {|a, v| a + v }

ブロックの第一引数が蓄積変数というか前回の値で、第二引数が今回のイベントになってる。集計以外では、値のストリームを前回との差分のストリームに変換したりとか、そんなふうに使うんじゃないでしょうか。

Property系

Property系は、StreamではなくPropertyオブジェクトを返します。

module Frappuccino
  class Property
    def initialize(zero, stream)
      @value = zero
      stream.add_observer(self)
    end 
        
    def now
      @value
    end
    
    def update(value)
      @value = value
    end
  end
end

ご覧の通り、nowとupdateがあって、nowは現在の値を返します。updateはどうでもいいというか使わないと思います。
「現在の」と言ったのは、イベントが発生するとnowの値が変わるからです。公式のReadmeにあるとおりなんですが。

injectは、scanの結果をPropertyにしたやつで、scanの合計値を数えるのと同じ事をやると、

counter = stream1.inject(0) {|a| a + 1 }

これでcounter.nowで現在のイベントの個数が取得できます。以下と全く同じです。

stream2 = stream1.scan(0) {|a| a + 1 }
counter = Frappuccino::Property.new(0, stream2)

countはinjectのラッパーで、個数を数えるのに特化しています。引数を渡すと内部でselectされます。

counter = stream1.count #=> イベントの個数を数える
counter1 = stream1.count(:event1) #=> :event1の個数を数える
counter2 = stream1.count {|e| e[:value] >= 2 } #=> 条件に合うイベントの個数を数える

その他

updateはイベントを発生させます。
Source以外の場所からemit相当の事をするために使うっぽい。というかこれがあれば実はemitを使わなくてよくて、実は前回の例はこう書ける。(Emitterクラスを定義してない)

ws = WebSocket::Client::Simple.connect url
stream = Frappuccino::Stream.new
ws.on :message do |msg|
  stream.update msg
end

公式のReadmeの例に引っ張られすぎだった。

on_valueは、イベントが発生したら引数で渡したブロックを評価する。Streamをイベントに変換するメソッドです。便利!

おわり

実装が思いのほか単純で、読むのが楽なのが素晴らしいですね。まあドキュメントあるにこしたことはないんですけど。
ところでこのライブラリって何に使うんでしょうね。