yunomuのブログ

趣味のこと

Conduit + Attoparsec (+ Concurrent)

Conduit 0.5が出て、わかりやすいチュートリアルが書かれていたので、それで一日中遊んでいたりしました。
ConduitとHaskellでネットワークプロキシサーバを作る - 純粋関数空間
Conduit楽しいですね。

これを上から下までやってなんとなく感覚を掴んだところで、Attoparsecと組み合わせて使う場合を作って遊んでみました。

Sink

まずSinkの方から作ってみます。

Parserはとりあえず適当にこんな感じで。

import Data.Attoparsec.ByteString
import qualified Data.Attoparsec.ByteString.Char8 as AC

field :: Parser ByteString
field = AC.takeWhile (/= ',') <* AC.anyChar

','までの文字列を返して、一文字(','を)取り除く。やる気ないCSVパーサみたいなの。
実行例:

*Main> parseTest field "1,223,4,"
Done "223,4," "1"

これを食うSinkを作ります。

import Data.Conduit.Attoparsec

takeField :: (Monad m, MonadThrow m) => GLSink ByteString m ByteString
takeField = sinkParser field

Data.Conduit.Attoparsecをimportして、sinkParserに渡すだけ。まあこの関数をみてこのネタ思いついたんですけど。

Source

次、Sourceを用意する。せっかくなのでHTTP Conduitを使ってネットワークから取ってくるようにしてみました。
まずデータ作り。mighttpd2が既にインストールされているものとする。

% echo 12,3,456,78,9 > index.html
% mighty

これでデータはOK。

% curl http://localhost:8080/
12,3,456,78,9

Network.HTTP.Conduitにある例題通りにCurl的なものを書く。

{-# LANGUAGE OverloadedStrings #-}
module Curl where

import Data.Conduit
import Data.Conduit.Binary
import Network.HTTP.Conduit
import System.IO

main :: IO ()
main = do
    request <- parseUrl "http://localhost:8080/"
    withManager $ \manager -> do
        response <- http request manager
        responseBody response $$+- sinkHandle stdout

そのままだと今のバージョンでは動かなかったり、ファイルに吐き出したりする仕様が微妙なので、ちょっと書き換えました。
あの例ってConduitのバージョンいくつのなんだろう、だいぶ前から動かなかった気が……。http関数が返すResopnseのbodyの型はResumableSourceなので($$)は($$+-)にしないと動かないのです。

% ghci Curl.hs
[1 of 1] Compiling Curl             ( Curl.hs, interpreted )
Ok, modules loaded: Curl.
*Curl> main
12,3,456,78,9

で、これを一般化してcurl関数を作ろう! と思ったらなんか大変なことになった。型が。

curl :: (MonadUnsafeIO m,
         MonadThrow m,
         MonadIO m,
         MonadBaseControl IO m,
         Failure HttpException m) =>
    String -> m (ResumableSource (ResourceT m) ByteString)
curl url = do
    request <- parseUrl url
    withManager $ \manager ->
        responseBody <$> http request manager

まあ処理自体はほとんど変わっていません。色々importしなきゃいけなくてめんどいだけです。
(訂正:http://yunomu.hatenablog.jp/entry/2012/07/22/213649

Sinkと実行と例外とか

次、またSinkです。
Sourceから取ってきたデータを最初のSink(Parser)で切り取って、切り取ったデータをこっちのSinkで処理するというイメージ。

今回は、takeFieldで取ってきた文字列をIntに変換してリストに詰める処理を書いてみました。そんなの最初からattoparsecでやれよって感じですが、実際はfieldごとになんかもっと複雑な処理をすると思いねぇ。

import qualified Data.ByteString.Char8 as BC

getIntList :: (MonadBaseControl IO m, MonadThrow m) =>
    ResumableSource m ByteString -> m [Int]
getIntList src = do
    (src1, str1) <- src $$++ takeField
    let v = read $ BC.unpack str1
    (v:) <$> getIntList src1

ここでは($$++)を使って、切り出した文字列(str1)と残りのResumableSource(src1)を取り出している。これに何がしかの処理(read)を施して、残りのsrc1に対してまた同じ処理を呼び出している。これでコンマ区切りの文字列が次々に加工されて数値リストになっていくはず!

ではつないでみます。

main :: IO ()
main = runResourceT $ do
    src0 <- liftIO $ curl "http://localhost:8080/"
    ss <- getIntList src0
    liftIO $ print ss

最初のcurl関数を使ってSourceを作り、getIntListに渡すだけ。
実行。

*Main> main
*** Exception: ParseError {errorContexts = ["demandInput"], errorMessage = "not enough bytes", errorPosition = 2:2}

エラーです。まあストリームの終わりまでいってもさらにparseしようとすればそりゃエラーも起きるよという話です。

そもそもParseErrorってどうやって拾うのよって話ですが、そのあたりはsinkParserのドキュメントに書いてあります。

Convert an Attoparsec Parser into a Sink. The parser will be streamed bytes until it returns Done or Fail.

If parsing fails, a ParseError will be thrown with monadThrow.

Since 0.5.0

ParseErrorをmonadThrowで投げるから拾えと。
拾うのは、Control.Exception.Liftedのcatchとかhandleでやります。

今回は、ParseErrorが起きたらそこで処理を止める事にしてみます。

import qualified Control.Exception.Lifted as E
import qualified Data.ByteString.Char8 as BC

getIntList :: (MonadBaseControl IO m, MonadThrow m) =>
    ResumableSource m ByteString -> m [Int]
getIntList src = E.handle handler $ do
    (src1, str1) <- src $$++ takeField
    let v = read $ BC.unpack str1
    (v:) <$> getIntList src1
  where
    handler :: MonadBaseControl IO m => ParseError -> m [a]
    handler _ = return []

パースできなかったら空リストを返して、そこで処理終了という感じ。
本当はエラーはエラーとして処理して、ストリームが終わったらなんとかかんとかとかした方がいいかもしれないけど。
ParseError以外のやつの事は知らん。上で処理しろ。そういう感じです。

mainの方は変えなくていいので、再度実行してみます。

*Main> main
[12,3,456,78]

おめでとうございます。うまいこと切り出してIntのリストにしてくれたみたいですね。
"9"が無いのは、入力文字列の最後に','が無いので、パーサが切り取れなかったのです。

IOする

readとか無害すぎてつまらないじゃないですか。
ということで、とりあえず来たデータをprintしてみます。

import qualified Control.Exception.Lifted as E
import qualified Data.ByteString.Char8 as BC

printList :: (MonadBaseControl IO m, MonadThrow m, MonadIO m) =>
    ResumableSource m ByteString -> m ()
printList src = E.handle handler $ do
    (src1, str1) <- src $$++ takeField
    liftIO $ BC.putStrLn str1
    printList src1
  where
    handler :: MonadBaseControl IO m => ParseError -> m ()
    handler _ = return ()

main :: IO ()
main = runResourceT $ do
    src0 <- liftIO $ curl "http://localhost:8080/"
    printList src0

クラス制約に(MonadIO m)が加わります。あとは戻り値がちょこちょこっと変わるだけです。

*Main> main
12
3
456
78

並列化する

ついでにやっちゃいましょう。Control.Concurrentモジュールを使います。

import Control.Concurrent
import qualified Control.Exception.Lifted as E
import qualified Data.ByteString.Char8 as BC

printList :: (MonadBaseControl IO m, MonadThrow m, MonadIO m) =>
    ResumableSource m ByteString -> m ()
printList src = E.handle handler $ do
    (src1, str1) <- src $$++ takeField
    liftIO $ forkIO $ BC.putStrLn str1
    printList src1
  where
    handler :: MonadBaseControl IO m => ParseError -> m ()
    handler _ = return ()

main :: IO ()
main = runResourceT $ do
    src0 <- liftIO $ curl "http://localhost:8080/"
    printList src0

はい、さっきのIOの部分にforkIOって書いただけです。

まとめ

このへんまでやるとなんとなくConduitのうれしさがわかってきます。
私が一体何を目指していたかというと、

  • ストリームデータを取得する(http)
  • 取ってきたデータを特定のフォーマットで切り取る(attoparsec)
  • 切り取ったデータをスレッドに渡す(forkIO)
  • 渡されたスレッドがなんか処理をする(今回はputStrLnだけ)

というのがやりたかったというか、

具体的にはRSSリーダみたいなのを作っていまして、

  • RSSフィードを取得する
  • フィードをitemごとに切り出す
  • itemのデータをスレッドに渡す
  • itemのURLからリンク先記事を取ってきて解析してデータをDBに突っ込む(ここが重たい)

というような事がやりたい。
というような事が、気持ちよくできそうな気がする。


今回書いたソースの全体像っていうか最終型はこれです。ちょっと遊びで増減してたりします。
exercises/http-attoparsec/Main.hs at master · yunomu/exercises · GitHub