Conduit 0.5が出て、わかりやすいチュートリアルが書かれていたので、それで一日中遊んでいたりしました。
ConduitとHaskellでネットワークプロキシサーバを作る - 純粋関数空間
Conduit楽しいですね。
これを上から下までやってなんとなく感覚を掴んだところで、Attoparsecと組み合わせて使う場合を作って遊んでみました。
Sink
まずSinkの方から作ってみます。
Parserはとりあえず適当にこんな感じで。
import Data.Attoparsec.ByteString import qualified Data.Attoparsec.ByteString.Char8 as AC field :: Parser ByteString field = AC.takeWhile (/= ',') <* AC.anyChar
','までの文字列を返して、一文字(','を)取り除く。やる気ないCSVパーサみたいなの。
実行例:
*Main> parseTest field "1,223,4," Done "223,4," "1"
これを食うSinkを作ります。
import Data.Conduit.Attoparsec takeField :: (Monad m, MonadThrow m) => GLSink ByteString m ByteString takeField = sinkParser field
Data.Conduit.Attoparsecをimportして、sinkParserに渡すだけ。まあこの関数をみてこのネタ思いついたんですけど。
Source
次、Sourceを用意する。せっかくなのでHTTP Conduitを使ってネットワークから取ってくるようにしてみました。
まずデータ作り。mighttpd2が既にインストールされているものとする。
% echo 12,3,456,78,9 > index.html % mighty
これでデータはOK。
% curl http://localhost:8080/ 12,3,456,78,9
Network.HTTP.Conduitにある例題通りにCurl的なものを書く。
{-# LANGUAGE OverloadedStrings #-} module Curl where import Data.Conduit import Data.Conduit.Binary import Network.HTTP.Conduit import System.IO main :: IO () main = do request <- parseUrl "http://localhost:8080/" withManager $ \manager -> do response <- http request manager responseBody response $$+- sinkHandle stdout
そのままだと今のバージョンでは動かなかったり、ファイルに吐き出したりする仕様が微妙なので、ちょっと書き換えました。
あの例ってConduitのバージョンいくつのなんだろう、だいぶ前から動かなかった気が……。http関数が返すResopnseのbodyの型はResumableSourceなので($$)は($$+-)にしないと動かないのです。
% ghci Curl.hs [1 of 1] Compiling Curl ( Curl.hs, interpreted ) Ok, modules loaded: Curl. *Curl> main 12,3,456,78,9
で、これを一般化してcurl関数を作ろう! と思ったらなんか大変なことになった。型が。
curl :: (MonadUnsafeIO m, MonadThrow m, MonadIO m, MonadBaseControl IO m, Failure HttpException m) => String -> m (ResumableSource (ResourceT m) ByteString) curl url = do request <- parseUrl url withManager $ \manager -> responseBody <$> http request manager
まあ処理自体はほとんど変わっていません。色々importしなきゃいけなくてめんどいだけです。
(訂正:http://yunomu.hatenablog.jp/entry/2012/07/22/213649)
Sinkと実行と例外とか
次、またSinkです。
Sourceから取ってきたデータを最初のSink(Parser)で切り取って、切り取ったデータをこっちのSinkで処理するというイメージ。
今回は、takeFieldで取ってきた文字列をIntに変換してリストに詰める処理を書いてみました。そんなの最初からattoparsecでやれよって感じですが、実際はfieldごとになんかもっと複雑な処理をすると思いねぇ。
import qualified Data.ByteString.Char8 as BC getIntList :: (MonadBaseControl IO m, MonadThrow m) => ResumableSource m ByteString -> m [Int] getIntList src = do (src1, str1) <- src $$++ takeField let v = read $ BC.unpack str1 (v:) <$> getIntList src1
ここでは($$++)を使って、切り出した文字列(str1)と残りのResumableSource(src1)を取り出している。これに何がしかの処理(read)を施して、残りのsrc1に対してまた同じ処理を呼び出している。これでコンマ区切りの文字列が次々に加工されて数値リストになっていくはず!
ではつないでみます。
main :: IO () main = runResourceT $ do src0 <- liftIO $ curl "http://localhost:8080/" ss <- getIntList src0 liftIO $ print ss
最初のcurl関数を使ってSourceを作り、getIntListに渡すだけ。
実行。
*Main> main *** Exception: ParseError {errorContexts = ["demandInput"], errorMessage = "not enough bytes", errorPosition = 2:2}
エラーです。まあストリームの終わりまでいってもさらにparseしようとすればそりゃエラーも起きるよという話です。
そもそもParseErrorってどうやって拾うのよって話ですが、そのあたりはsinkParserのドキュメントに書いてあります。
Convert an Attoparsec Parser into a Sink. The parser will be streamed bytes until it returns Done or Fail.
If parsing fails, a ParseError will be thrown with monadThrow.
Since 0.5.0
ParseErrorをmonadThrowで投げるから拾えと。
拾うのは、Control.Exception.Liftedのcatchとかhandleでやります。
今回は、ParseErrorが起きたらそこで処理を止める事にしてみます。
import qualified Control.Exception.Lifted as E import qualified Data.ByteString.Char8 as BC getIntList :: (MonadBaseControl IO m, MonadThrow m) => ResumableSource m ByteString -> m [Int] getIntList src = E.handle handler $ do (src1, str1) <- src $$++ takeField let v = read $ BC.unpack str1 (v:) <$> getIntList src1 where handler :: MonadBaseControl IO m => ParseError -> m [a] handler _ = return []
パースできなかったら空リストを返して、そこで処理終了という感じ。
本当はエラーはエラーとして処理して、ストリームが終わったらなんとかかんとかとかした方がいいかもしれないけど。
ParseError以外のやつの事は知らん。上で処理しろ。そういう感じです。
mainの方は変えなくていいので、再度実行してみます。
*Main> main [12,3,456,78]
おめでとうございます。うまいこと切り出してIntのリストにしてくれたみたいですね。
"9"が無いのは、入力文字列の最後に','が無いので、パーサが切り取れなかったのです。
IOする
readとか無害すぎてつまらないじゃないですか。
ということで、とりあえず来たデータをprintしてみます。
import qualified Control.Exception.Lifted as E import qualified Data.ByteString.Char8 as BC printList :: (MonadBaseControl IO m, MonadThrow m, MonadIO m) => ResumableSource m ByteString -> m () printList src = E.handle handler $ do (src1, str1) <- src $$++ takeField liftIO $ BC.putStrLn str1 printList src1 where handler :: MonadBaseControl IO m => ParseError -> m () handler _ = return () main :: IO () main = runResourceT $ do src0 <- liftIO $ curl "http://localhost:8080/" printList src0
クラス制約に(MonadIO m)が加わります。あとは戻り値がちょこちょこっと変わるだけです。
*Main> main 12 3 456 78
並列化する
ついでにやっちゃいましょう。Control.Concurrentモジュールを使います。
import Control.Concurrent import qualified Control.Exception.Lifted as E import qualified Data.ByteString.Char8 as BC printList :: (MonadBaseControl IO m, MonadThrow m, MonadIO m) => ResumableSource m ByteString -> m () printList src = E.handle handler $ do (src1, str1) <- src $$++ takeField liftIO $ forkIO $ BC.putStrLn str1 printList src1 where handler :: MonadBaseControl IO m => ParseError -> m () handler _ = return () main :: IO () main = runResourceT $ do src0 <- liftIO $ curl "http://localhost:8080/" printList src0
はい、さっきのIOの部分にforkIOって書いただけです。
まとめ
このへんまでやるとなんとなくConduitのうれしさがわかってきます。
私が一体何を目指していたかというと、
- ストリームデータを取得する(http)
- 取ってきたデータを特定のフォーマットで切り取る(attoparsec)
- 切り取ったデータをスレッドに渡す(forkIO)
- 渡されたスレッドがなんか処理をする(今回はputStrLnだけ)
というのがやりたかったというか、
具体的にはRSSリーダみたいなのを作っていまして、
- RSSフィードを取得する
- フィードをitemごとに切り出す
- itemのデータをスレッドに渡す
- itemのURLからリンク先記事を取ってきて解析してデータをDBに突っ込む(ここが重たい)
というような事がやりたい。
というような事が、気持ちよくできそうな気がする。
今回書いたソースの全体像っていうか最終型はこれです。ちょっと遊びで増減してたりします。
exercises/http-attoparsec/Main.hs at master · yunomu/exercises · GitHub