Control.Concurrent.forkIOをもっとうまくやる
以前に、
Conduit + Attoparsec (+ Concurrent) - yunomuのブログこのあたりで
forkIOステキ!みたいなことを書いたんだと思う。
で、最近ようやくこのあたりを真面目に扱うようになり、「プロセス終了時にスレッド中断されてるじゃん!」ってことに気付いた。
この時は軽い処理してたから気付かなかったんだね。
その前にちょっと前回のコードを書き直し。
import Control.Concurrent import qualified Control.Exception.Lifted as E import qualified Data.ByteString.Char8 as BC import Control.Monad.Trans.Control import Control.Monad.IO.Class (liftIO) import Data.Conduit import qualified Data.Conduit.Binary as CB import qualified Data.Conduit.List as CL(中略) printList :: (MonadBaseControl IO m, MonadThrow m, MonadIO m) => ResumableSource m ByteString -> m () printList src = E.handle ignore $ do (src1, str1) <- src $$++ takeField liftIO $ forkIO $ BC.putStrLn str1 printList src1 ignore :: MonadBaseControl IO m => ParseError -> m () ignore _ = return () main :: IO () main = runResourceT $ do (src0, _) <- CB.sourceFile "index.html" $$+ CL.take 0 printList src0
importを足したり、ハンドラの名前を変えただけ。
ついでにネットワーク通信部分を除去してファイルから読むようにした。
まあそれはいいんですけど、これのどこに問題があったかというと、forkIOのとこ。
liftIO $ forkIO $ BC.putStrLn str1
どうもここが、一部のスレッドが実行されないまま終了してるっぽかった。もしかするとforkIOで生成されたスレッドは親プロセスが死ぬと終了を待たずにそのままいっしょに死ぬのかもしれない。
確かにスレッドが終了するまで親が待つとかそういう事はどこにも書いてないし。
ということでwait的なものが無いかどうか探してみたんですが、ちょっと見つからない。代わりにEventというのを見つけた。
Control.Concurrent.Event - hackage
ドキュメントには、Eventを使うとEventを待つことができるから並列処理をする時に使ってね、って書いてあるような気がする。
なんとなく、子プロセス終了待ちはこれを使って自分で実装しなきゃいけない気がしてきた。
いやなんかあるのかもしれないけど。
まあでもあればあったで後で入れ替えることにして、簡単そうなのでとりあえず実装してみました。
Eventの簡単な使い方
- newでEventを生成する
- set eventでイベントを発生させる
- wait eventはsetされるまでブロックされる。setされると解除される
といった感じなので、
要するに、処理が終了した後に、終了したことをEventを使って通知してあげればいい。
具体的には、Eventを生成して、スレッドに渡してあげて、スレッドが終了する時にsetする。
-- なんぞ処理(proc)を実行してeventを発生させる withEvent :: Event -> IO a -> IO () withEvent event proc = do proc set event -- スレッドを生成してイベントを返す fork :: IO () -> IO Event fork proc = do event <- new forkIO $ withEvent event proc return event -- 改良版printList forkIOがforkになっただけ printList :: (MonadBaseControl IO m, MonadThrow m, MonadIO m) => ResumableSource m ByteString -> m () printList src = E.handle ignore $ do (src1, str1) <- src $$++ takeField liftIO $ fork $ BC.putStrLn str1 printList src1
というので流れはおおむねいいんですが、まあ見ての通りwaitしていません。
waitは、forkがEventを返すようにしていたので、そいつを使えばいい。そして今回の場合はストリームの終了はParseErrorの発生で検出していたので、いじるのはignoreです。その影響でprintListの形も少々変わります。
-- 改良版2 printList 蓄積変数が増えた printList :: (MonadBaseControl IO m, MonadThrow m, MonadIO m) => ResumableSource m ByteString -> m () printList src = printList' src [] where printList' src0 es = do (src1, str1) <- src $$++ takeField e <- liftIO $ fork $ BC.putStrLn str1 -- eventを蓄積する let es' = e:es E.handle (ignore es') $ printList' src1 es' -- クラス制約が増えた。引数も増えた。すべてのイベントが終了するまで待つ ignore :: (MonadBaseControl IO m, MonadResource m) => [Event] -> ParseError -> m () ignore es _ = liftIO $ mapM_ wait es
これでignoreが実行された時に、それまでにforkされたスレッドが全て終了するようになりました。たぶん。
これでだいたいの場合はOKなんですが、さてforkに渡されたprocの中で例外が発生した場合はどうなるでしょう。
procの中でという事なので、直接影響を受けるのはwithEvent関数で、withEvent関数の実装はこうなので
withEvent :: Event -> IO a -> IO () withEvent event proc = do proc set event
このprocの中で例外が起きるとsetが実行されない。
setが実行されないということは、waitの部分で死にます。
というのを回避するために、procが死のうがどうしようがset eventだけは実行してもらわなければなりません。
とう時に使えるのがJavaでもおなじみのfinallyです。
withEventをfinallyで書き換えて、set eventが必ず実行されるようにします。
withEvent :: Event -> IO a -> IO a withEvent event proc = E.finally proc $ set event
まあなんかシグネチャから変わっていますが、使い勝手はだいたい同じというか、使いやすくなってるというか、もう無くていいんじゃないかなこの関数。
このあたり設計ちょっと面倒ですけど、まじめにやれば並列処理の実行順序を自由に制御できるようになりそうな気がしますよね。
スレッドの終了待ちくらいはあってもよさそうな気もしますけどね。
でも実質的にこの程度あれば十分なのかな。
全ソースはこちらです。
exercises/http-attoparsec/Main.hs at master · yunomu/exercises · GitHub