読者です 読者をやめる 読者になる 読者になる

yunomuのブログ

酒とゲームと上から目線

Control.Concurrent.forkIOをもっとうまくやる

以前に、
Conduit + Attoparsec (+ Concurrent) - yunomuのブログこのあたりで
forkIOステキ!みたいなことを書いたんだと思う。

で、最近ようやくこのあたりを真面目に扱うようになり、「プロセス終了時にスレッド中断されてるじゃん!」ってことに気付いた。
この時は軽い処理してたから気付かなかったんだね。

その前にちょっと前回のコードを書き直し。

import Control.Concurrent
import qualified Control.Exception.Lifted as E
import qualified Data.ByteString.Char8 as BC
import Control.Monad.Trans.Control
import Control.Monad.IO.Class (liftIO)
import Data.Conduit
import qualified Data.Conduit.Binary as CB
import qualified Data.Conduit.List as CL(中略)

printList :: (MonadBaseControl IO m, MonadThrow m, MonadIO m) =>
    ResumableSource m ByteString -> m ()
printList src = E.handle ignore $ do
    (src1, str1) <- src $$++ takeField
    liftIO $ forkIO $ BC.putStrLn str1
    printList src1

ignore :: MonadBaseControl IO m => ParseError -> m ()
ignore _ = return ()

main :: IO ()
main = runResourceT $ do
    (src0, _) <- CB.sourceFile "index.html" $$+ CL.take 0
    printList src0

importを足したり、ハンドラの名前を変えただけ。
ついでにネットワーク通信部分を除去してファイルから読むようにした。

まあそれはいいんですけど、これのどこに問題があったかというと、forkIOのとこ。

    liftIO $ forkIO $ BC.putStrLn str1

どうもここが、一部のスレッドが実行されないまま終了してるっぽかった。もしかするとforkIOで生成されたスレッドは親プロセスが死ぬと終了を待たずにそのままいっしょに死ぬのかもしれない。
確かにスレッドが終了するまで親が待つとかそういう事はどこにも書いてないし。

ということでwait的なものが無いかどうか探してみたんですが、ちょっと見つからない。代わりにEventというのを見つけた。
Control.Concurrent.Event - hackage
ドキュメントには、Eventを使うとEventを待つことができるから並列処理をする時に使ってね、って書いてあるような気がする。

なんとなく、子プロセス終了待ちはこれを使って自分で実装しなきゃいけない気がしてきた。
いやなんかあるのかもしれないけど。
まあでもあればあったで後で入れ替えることにして、簡単そうなのでとりあえず実装してみました。

Eventの簡単な使い方

  • newでEventを生成する
  • set eventでイベントを発生させる
  • wait eventはsetされるまでブロックされる。setされると解除される

といった感じなので、
要するに、処理が終了した後に、終了したことをEventを使って通知してあげればいい。
具体的には、Eventを生成して、スレッドに渡してあげて、スレッドが終了する時にsetする。

-- なんぞ処理(proc)を実行してeventを発生させる
withEvent :: Event -> IO a -> IO ()
withEvent event proc = do
    proc
    set event

-- スレッドを生成してイベントを返す
fork :: IO () -> IO Event
fork proc = do
    event <- new
    forkIO $ withEvent event proc
    return event

-- 改良版printList forkIOがforkになっただけ
printList :: (MonadBaseControl IO m, MonadThrow m, MonadIO m) =>
    ResumableSource m ByteString -> m ()
printList src = E.handle ignore $ do
    (src1, str1) <- src $$++ takeField
    liftIO $ fork $ BC.putStrLn str1
    printList src1 

というので流れはおおむねいいんですが、まあ見ての通りwaitしていません。

waitは、forkがEventを返すようにしていたので、そいつを使えばいい。そして今回の場合はストリームの終了はParseErrorの発生で検出していたので、いじるのはignoreです。その影響でprintListの形も少々変わります。

-- 改良版2 printList 蓄積変数が増えた
printList :: (MonadBaseControl IO m, MonadThrow m, MonadIO m) =>
    ResumableSource m ByteString -> m ()
printList src = printList' src []
  where
    printList' src0 es = do
        (src1, str1) <- src $$++ takeField
        e <- liftIO $ fork $ BC.putStrLn str1
        -- eventを蓄積する
        let es' = e:es
        E.handle (ignore es') $ printList' src1 es' 

-- クラス制約が増えた。引数も増えた。すべてのイベントが終了するまで待つ
ignore :: (MonadBaseControl IO m, MonadResource m)
    => [Event] -> ParseError -> m ()
ignore es _ = liftIO $ mapM_ wait es

これでignoreが実行された時に、それまでにforkされたスレッドが全て終了するようになりました。たぶん。

これでだいたいの場合はOKなんですが、さてforkに渡されたprocの中で例外が発生した場合はどうなるでしょう。
procの中でという事なので、直接影響を受けるのはwithEvent関数で、withEvent関数の実装はこうなので

withEvent :: Event -> IO a -> IO ()
withEvent event proc = do
    proc
    set event

このprocの中で例外が起きるとsetが実行されない。
setが実行されないということは、waitの部分で死にます。

というのを回避するために、procが死のうがどうしようがset eventだけは実行してもらわなければなりません。
とう時に使えるのがJavaでもおなじみのfinallyです。
withEventをfinallyで書き換えて、set eventが必ず実行されるようにします。

withEvent :: Event -> IO a -> IO a
withEvent event proc = E.finally proc $ set event 

まあなんかシグネチャから変わっていますが、使い勝手はだいたい同じというか、使いやすくなってるというか、もう無くていいんじゃないかなこの関数。

このあたり設計ちょっと面倒ですけど、まじめにやれば並列処理の実行順序を自由に制御できるようになりそうな気がしますよね。
スレッドの終了待ちくらいはあってもよさそうな気もしますけどね。
でも実質的にこの程度あれば十分なのかな。

全ソースはこちらです。
exercises/http-attoparsec/Main.hs at master · yunomu/exercises · GitHub