Concurrency
Forking a new thread of control is achieved with the forkIO operation.
forkIO :: IO () -> IO ThreadId
This takes a computation of type IO () as its argument, and executes this computation in a new thread that runs concurrently with other threads in the system.
The basic communication mechanism is MVar:
data MVar a
An MVar can be thought of as a box that is either empty or full. The newEmptyMVar operation creates a new empty box, and newMVar creates a new full box containing the value passed as its argument. The takeMVar operation removes the value from a full MVar and returns it, but waits (or blocks) if the MVar is currently empty. Symmetrically, the putMVar operation puts a value into the MVar but blocks if the MVar is already full.
Examples of Program Execution
data Logger = Logger (MVar LogCommand)
data LogCommand = Message String | Stop (MVar ())
initLogger :: IO Logger
initLogger = do
m <- newEmptyMVar
let l = Logger m
forkIO (logger l)
return l
logger :: Logger -> IO ()
logger (Logger m) = loop
where
loop = do
cmd <- takeMVar m
case cmd of
Message msg -> do
id <- myThreadId
putStrLn (msg)
putMVar m (Message "haha")
loop
Stop s -> do
putStrLn "logger: stop"
putMVar s ()
putMVar m (Stop s)
loop
logMessage :: Logger -> String -> IO ()
logMessage (Logger m) s = putMVar m (Message s)
main :: IO ()
main = do
l@(Logger m) <- initLogger
logMessage l "hello"
logMessage l "bye"
This will output hello and then block:
mainputs “hello” intoMVarmloggertakes theLogCommandcmd fromMVarmmainputs “bye” intoMVarmloggertries to put “haha” intoMVarm, but it is already full, so it blocks
logger :: Logger -> IO ()
logger (Logger m) = loop
where
loop = do
cmd <- takeMVar m
case cmd of
Message msg -> do
id <- myThreadId
putStrLn (show id ++ ": " ++ msg)
-- putMVar m (Message "haha")
loop
Stop s -> do
putStrLn "logger: stop"
putMVar s ()
putMVar m (Stop s)
-- loop
logMessage :: Logger -> String -> IO ()
logMessage (Logger m) s = putMVar m (Message s)
logStop :: Logger -> IO ()
logStop (Logger m) = do
s <- newEmptyMVar
putMVar m (Stop s)
takeMVar m
return ()
main :: IO ()
main = do
l@(Logger m) <- initLogger
logMessage l "hello"
logMessage l "bye"
logStop l
This will output “hello” and then “bye” and then block:
mainputs “hello” inMVar m, (and cannot put “bye” into m until something takes from m)loggertakes “hello” fromMVar mmainputs “bye” inMVar m, (and cannot putStop sinto m until something takes from m)loggertakes “bye” fromMVar mmainputsStop sintoMVar m, but then immediately takes fromMVar mbeforeloggercan take fromMVar m, therefore the logger blocks before it can read fromm.
logger :: Logger -> IO ()
logger (Logger m) = loop
where
loop = do
cmd <- takeMVar m
case cmd of
Message msg -> do
id <- myThreadId
putStrLn (show id ++ ": " ++ msg)
loop
Stop s -> do
putStrLn "logger: stop"
putMVar s ()
logMessage :: Logger -> String -> IO ()
logMessage (Logger m) s = putMVar m (Message s)
logStop :: Logger -> IO ()
logStop (Logger m) = do
s <- newEmptyMVar
putMVar m (Stop s)
takeMVar s
main :: IO ()
main = do
l@(Logger m) <- initLogger
logMessage l "hello"
logMessage l "bye"
logStop l
This will print out “hello”, “bye”, and then “stop” correctly:
mainputs “hello” inMVar m, (and cannot put “bye” into m until something takes from m)loggertakes “hello” fromMVar mmainputs “bye” inMVar m, (and cannot putStop sinto m until something takes from m)loggertakes “bye” fromMVar mmainputsStop swheresis a new emptyMVarinto theMVarm (and then blocks as it cannot take fromMVar suntil it is full)loggertakesStop sfromMVar m, and then puts a value()into the emptyMVar s.logStopcan now continue by taking()fromMVar s.
logger :: Logger -> IO ()
logger (Logger m) = loop
where
loop = do
cmd <- takeMVar m
case cmd of
Message msg -> do
id <- myThreadId
putStrLn (show id ++ ": " ++ msg)
loop
Stop s -> do
putStrLn "logger: stop"
putMVar s ()
putMVar m (Stop s)
logMessage :: Logger -> String -> IO ()
logMessage (Logger m) s = putMVar m (Message s)
logStop :: Logger -> IO ()
logStop (Logger m) = do
s <- newEmptyMVar
putMVar m (Stop s)
takeMVar s
main :: IO ()
main = do
l@(Logger m) <- initLogger
logMessage l "hello"
logMessage l "bye"
logStop l
This will run exactly the same as above, because the line putMVar m (Stop s) is redundant. If a mutable var m contains another mutable var s, then changing s will mean m will include this new s.
logger :: Logger -> IO ()
logger (Logger m) = loop
where
loop = do
cmd <- takeMVar m
case cmd of
Message msg -> do
id <- myThreadId
putStrLn (show id ++ ": " ++ msg)
-- putMVar m (Message "haha")
loop
Stop s -> do
putStrLn "logger: stop"
putMVar s ()
putMVar m (Stop s)
-- loop
logMessage :: Logger -> String -> IO ()
logMessage (Logger m) s = putMVar m (Message s)
logStop :: Logger -> IO ()
logStop (Logger m) = do
s <- newEmptyMVar
putMVar m (Stop s)
takeMVar s
-- putMVar s ()
main :: IO ()
main = do
l@(Logger m) <- initLogger
logMessage l "hello"
logMessage l "bye"
logStop l
a <- takeMVar m
case a of Message a -> putStrLn ("Message " ++ a)
Stop s -> do b <- takeMVar s
putStrLn ("Stop " ++ show b)
This will output “hello”, “bye”, “logger: stop”, and then block.
mainputs “hello” inMVar m, (and cannot put “bye” into m until something takes from m)loggertakes “hello” fromMVar mmainputs “bye” inMVar m, (and cannot putStop sinto m until something takes from m)loggertakes “bye” fromMVar mmainputsStop swheresis a new emptyMVarinto theMVarm (and then blocks as it cannot take fromMVar suntil it is full)loggertakesStop sfromMVar m, and then puts a value()into the emptyMVar s.logStopcan now continue by taking()fromMVar s. This results inmbeing the valueStop swheresis empty.mainthen takesStop sfromm, pattern matches this asStop s, and then tries to take froms, but blocks becausesis empty.
Making an MVar m contain another MVar s (which is initially empty) is a useful way of sending messages (using putMVar m) and then waiting for a response (using takeMVar s), without blocking another thread.
For example:
data Logger = Logger (MVar LogCommand)
data LogCommand = Message String | Stop (MVar ())
logStop :: Logger -> IO ()
logStop (Logger m) = do
s <- newEmptyMVar
putMVar m (Stop s)
takeMVar s
logger :: Logger -> IO ()
logger (Logger m) = loop
where
loop = do
cmd <- takeMVar m
case cmd of
Message msg -> do
putStrLn msg
loop
Stop s -> do
putStrLn "logger: stop"
putMVar s ()
Thread A calling logStop sets m to be Stop s where s is empty.
Thread B running logger can then take from m, and then let Thread A know that it is finished by putting s to (), at which point ThreadA can take from s.
It is important to note that the program terminates when main returns, even if there are other threads still running. (Behaviour differs in ghci in comparison to executing a program via ghc; ghci will tend to let the other threads run, whereas ghc will terminate after the main thread finishes).
logger :: Logger -> IO ()
logger (Logger m) = loop
where
loop = do
cmd <- takeMVar m
case cmd of
Message msg -> do
id <- myThreadId
putStrLn (show id ++ ": " ++ msg )
loop
Stop s -> do
putStrLn "logger: stop"
putMVar s ()
logMessage :: Logger -> String -> IO ()
logMessage (Logger m) s = putMVar m (Message s)
logStop :: Logger -> IO ()
logStop (Logger m) = do
s <- newEmptyMVar
putMVar m (Stop s)
takeMVar s
putter :: Logger -> IO ()
putter l = do
logMessage l "hello"
logMessage l "bye"
logStop l
return ()
main :: IO ()
main = do
m <- newEmptyMVar
let l = Logger m
forkIO (logger l)
forkIO (putter l)
return ()
Running this (via ghc) will return without any output, as the main thread finishes early.
If we inline putter into the main thread, then this will run to termination (in ghc).
logger :: Logger -> IO ()
logger (Logger m) = loop
where
loop = do
cmd <- takeMVar m
case cmd of
Message msg -> do
id <- myThreadId
putStrLn (show id ++ ": " ++ msg )
loop
Stop s -> do
putStrLn "logger: stop"
putMVar s ()
logMessage :: Logger -> String -> IO ()
logMessage (Logger m) s = putMVar m (Message s)
logStop :: Logger -> IO ()
logStop (Logger m) = do
s <- newEmptyMVar
putMVar m (Stop s)
takeMVar s
main :: IO ()
main = do
m <- newEmptyMVar
let l = Logger m
forkIO (logger l)
logMessage l "hello"
logMessage l "bye"
logStop l
return ()