Probabilistic Effects. λθ

Concurrency

Forking a new thread of control is achieved with the forkIO operation.

forkIO :: IO () -> IO ThreadId

This takes a computation of type IO () as its argument, and executes this computation in a new thread that runs concurrently with other threads in the system.
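As a minimal sketch of forkIO in use, the program below forks one thread and prints both thread ids. The names worker and done are illustrative only, and the MVar (introduced just below) is used solely so that main waits for the forked thread before returning.

```haskell
import Control.Concurrent (ThreadId, forkIO, myThreadId)
import Control.Concurrent.MVar (MVar, newEmptyMVar, putMVar, takeMVar)

-- Hypothetical worker: reports its own ThreadId, then signals completion.
worker :: MVar ThreadId -> IO ()
worker done = do
  tid <- myThreadId
  putStrLn ("worker running in " ++ show tid)
  putMVar done tid

main :: IO ()
main = do
  done <- newEmptyMVar
  forked <- forkIO (worker done)   -- returns immediately with the new thread's id
  mainTid <- myThreadId
  putStrLn ("main running in " ++ show mainTid)
  reported <- takeMVar done        -- wait until the worker has finished
  putStrLn ("forkIO returned " ++ show forked ++ ", worker reported " ++ show reported)
```

The first two lines of output may appear in either order, since the two threads run concurrently.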

The basic communication mechanism is MVar:

data MVar a

An MVar can be thought of as a box that is either empty or full. The newEmptyMVar operation creates a new empty box, and newMVar creates a new full box containing the value passed as its argument. The takeMVar operation removes the value from a full MVar and returns it, but waits (or blocks) if the MVar is currently empty. Symmetrically, the putMVar operation puts a value into the MVar but blocks if the MVar is already full.
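The empty/full behaviour can be sketched in a single thread. This example assumes the non-blocking variants tryPutMVar and tryTakeMVar from Control.Concurrent.MVar, used here only to show where putMVar and takeMVar would otherwise block; the names emptyBox and fullBox are illustrative.

```haskell
import Control.Concurrent.MVar

main :: IO ()
main = do
  emptyBox <- newEmptyMVar :: IO (MVar Int)
  fullBox  <- newMVar (42 :: Int)

  isEmptyMVar emptyBox >>= print     -- True: the box starts empty
  isEmptyMVar fullBox  >>= print     -- False: the box starts full

  -- tryPutMVar / tryTakeMVar return at once instead of blocking.
  tryPutMVar fullBox 7 >>= print     -- False: putMVar would block here
  takeMVar fullBox >>= print         -- 42: taking leaves the box empty
  tryTakeMVar fullBox >>= print      -- Nothing: takeMVar would block here

  putMVar emptyBox 1                 -- succeeds at once because the box is empty
  takeMVar emptyBox >>= print        -- 1
```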

Examples of Program Execution

import Control.Concurrent

data Logger = Logger (MVar LogCommand)

data LogCommand = Message String | Stop (MVar ()) 

initLogger :: IO Logger
initLogger = do
  m <- newEmptyMVar
  let l = Logger m
  forkIO (logger l)
  return l

logger :: Logger -> IO ()
logger (Logger m) = loop
 where
  loop = do
    cmd <- takeMVar m
    case cmd of
      Message msg -> do
        id <- myThreadId
        putStrLn (msg)
        putMVar m (Message "haha")
        loop
      Stop s -> do
        putStrLn "logger: stop"
        putMVar s ()
        putMVar m (Stop s)
        loop

logMessage :: Logger -> String -> IO ()
logMessage (Logger m) s = putMVar m (Message s)

main :: IO ()
main = do
  l@(Logger m) <- initLogger
  logMessage l "hello"
  logMessage l "bye"

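This will print “hello”, after which the logger thread blocks in the most likely interleaving (the exact order depends on scheduling):

  • main puts “hello” into MVar m
  • logger takes the LogCommand cmd from MVar m and prints “hello”
  • main puts “bye” into MVar m
  • logger tries to put “haha” into MVar m, but it is already full, so it blocks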
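
The blocked put in this walk-through can be observed directly. The sketch below is not part of the logger; it assumes System.Timeout.timeout from base and a hypothetical helper putWithin that gives up on a put after an arbitrary 0.1 seconds.

```haskell
import Control.Concurrent.MVar (MVar, newMVar, putMVar, takeMVar)
import Data.Maybe (isJust)
import System.Timeout (timeout)

-- Hypothetical helper: try to put, giving up after the given number of
-- microseconds. Returns True if the put went through.
putWithin :: Int -> MVar a -> a -> IO Bool
putWithin micros m x = isJust <$> timeout micros (putMVar m x)

main :: IO ()
main = do
  m <- newMVar "bye"                     -- m is already full
  putWithin 100000 m "haha" >>= print    -- False: the put was still blocked
  _ <- takeMVar m                        -- something takes from m ...
  putWithin 100000 m "haha" >>= print    -- True: ... and now the put succeeds
```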
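
The next version drops the extra putMVar from the Message branch and adds logStop, which takes from m rather than from s:

logger :: Logger -> IO ()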
logger (Logger m) = loop
 where
  loop = do
    cmd <- takeMVar m
    case cmd of
      Message msg -> do
        id <- myThreadId
        putStrLn (show id ++ ": " ++ msg)
        -- putMVar m (Message "haha")
        loop
      Stop s -> do
        putStrLn "logger: stop"
        putMVar s ()
        putMVar m (Stop s)
        -- loop

logMessage :: Logger -> String -> IO ()
logMessage (Logger m) s = putMVar m (Message s)

logStop :: Logger -> IO ()
logStop (Logger m) = do
  s <- newEmptyMVar
  putMVar m (Stop s)
  takeMVar m
  return ()

main :: IO ()
main = do
  l@(Logger m) <- initLogger
  logMessage l "hello"
  logMessage l "bye"
  logStop l

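This will output “hello” and then “bye”, but the shutdown handshake is broken because logStop takes from m instead of s. In one possible interleaving:

  • main puts “hello” in MVar m (and cannot put “bye” into m until something takes from m)
  • logger takes “hello” from MVar m
  • main puts “bye” in MVar m (and cannot put Stop s into m until something takes from m)
  • logger takes “bye” from MVar m
  • main puts Stop s into MVar m, but then immediately takes from MVar m itself. If main wins this race, it removes its own Stop s before the logger can see it, and the logger blocks on takeMVar m. If the logger wins, it prints “logger: stop” and puts Stop s back into m, which main then takes.

The corrected version has logStop take from s, and the logger no longer puts Stop s back into m:

logger :: Logger -> IO ()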
logger (Logger m) = loop
 where
  loop = do
    cmd <- takeMVar m
    case cmd of
      Message msg -> do
        id <- myThreadId
        putStrLn (show id ++ ": " ++ msg)
        loop
      Stop s -> do
        putStrLn "logger: stop"
        putMVar s ()

logMessage :: Logger -> String -> IO ()
logMessage (Logger m) s = putMVar m (Message s)

logStop :: Logger -> IO ()
logStop (Logger m) = do
  s <- newEmptyMVar
  putMVar m (Stop s)
  takeMVar s

main :: IO ()
main = do
  l@(Logger m) <- initLogger
  logMessage l "hello"
  logMessage l "bye"
  logStop l

This will print out “hello”, “bye”, and then “logger: stop” correctly:

  • main puts “hello” in MVar m, (and cannot put “bye” into m until something takes from m)
  • logger takes “hello” from MVar m
  • main puts “bye” in MVar m, (and cannot put Stop s into m until something takes from m)
  • logger takes “bye” from MVar m
  • main puts Stop s where s is a new empty MVar into the MVar m (and then blocks as it cannot take from MVar s until it is full)
  • logger takes Stop s from MVar m, and then puts a value () into the empty MVar s.
  • logStop can now continue by taking () from MVar s.

In the next version, the logger also puts Stop s back into m after signalling s:

logger :: Logger -> IO ()
logger (Logger m) = loop
 where
  loop = do
    cmd <- takeMVar m
    case cmd of
      Message msg -> do
        id <- myThreadId
        putStrLn (show id ++ ": " ++ msg)
        loop
      Stop s -> do
        putStrLn "logger: stop"
        putMVar s ()
        putMVar m (Stop s)

logMessage :: Logger -> String -> IO ()
logMessage (Logger m) s = putMVar m (Message s)

logStop :: Logger -> IO ()
logStop (Logger m) = do
  s <- newEmptyMVar
  putMVar m (Stop s)
  takeMVar s

main :: IO ()
main = do
  l@(Logger m) <- initLogger
  logMessage l "hello"
  logMessage l "bye"
  logStop l

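This will produce the same output as above, but the line putMVar m (Stop s) is not a no-op: takeMVar had emptied m, so the put leaves m full again, holding Stop s. Nothing takes from m afterwards, which is why the extra put has no visible effect here. Note that Stop s carries a reference to the MVar s rather than a copy of its contents, so whoever takes Stop s from m sees the current state of s.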
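
What takeMVar does to m, and how the nested MVar s is shared, can be checked with a small single-threaded sketch. It reuses the LogCommand type from above and inspects both boxes with isEmptyMVar; it is an illustration, not part of the logger.

```haskell
import Control.Concurrent.MVar

data LogCommand = Message String | Stop (MVar ())

main :: IO ()
main = do
  m <- newEmptyMVar
  s <- newEmptyMVar
  putMVar m (Stop s)
  cmd <- takeMVar m
  isEmptyMVar m >>= print        -- True: takeMVar removed the command from m
  case cmd of
    Stop s'   -> putMVar s' ()   -- s' refers to the same box as s
    Message _ -> pure ()
  isEmptyMVar s >>= print        -- False: the put through s' filled s
  isEmptyMVar m >>= print        -- True: m stays empty until something puts into it
```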

The last variation has main take from m after logStop returns:

logger :: Logger -> IO ()
logger (Logger m) = loop
 where
  loop = do
    cmd <- takeMVar m
    case cmd of
      Message msg -> do
        id <- myThreadId
        putStrLn (show id ++ ": " ++ msg)
        -- putMVar m (Message "haha")
        loop
      Stop s -> do
        putStrLn "logger: stop"
        putMVar s ()
        putMVar m (Stop s)
        -- loop

logMessage :: Logger -> String -> IO ()
logMessage (Logger m) s = putMVar m (Message s)

logStop :: Logger -> IO ()
logStop (Logger m) = do
  s <- newEmptyMVar
  putMVar m (Stop s)
  takeMVar s
  -- putMVar s ()

main :: IO ()
main = do
  l@(Logger m) <- initLogger
  logMessage l "hello"
  logMessage l "bye"
  logStop l
  a <- takeMVar m 
  case a of Message a -> putStrLn ("Message " ++ a)
            Stop s    -> do b <- takeMVar s
                            putStrLn ("Stop " ++ show b)

This will output “hello”, “bye”, “logger: stop”, and then block.

  • main puts “hello” in MVar m, (and cannot put “bye” into m until something takes from m)
  • logger takes “hello” from MVar m
  • main puts “bye” in MVar m, (and cannot put Stop s into m until something takes from m)
  • logger takes “bye” from MVar m
  • main puts Stop s where s is a new empty MVar into the MVar m (and then blocks as it cannot take from MVar s until it is full)
  • logger takes Stop s from MVar m, and then puts a value () into the empty MVar s.
  • logStop can now continue by taking () from MVar s, which leaves s empty again. Meanwhile the logger puts Stop s back into m, so m is full and refers to the now-empty s.
  • main then takes Stop s from m, pattern matches this as Stop s, and then tries to take from s, but blocks because s is empty.
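
When a thread is blocked on an MVar that no other thread can reach, GHC's runtime can usually detect this and raise BlockedIndefinitelyOnMVar in the blocked thread instead of hanging forever. The sketch below reproduces only the final step of the walk-through (a take from an empty, unreachable MVar) and catches that exception with try; detection depends on garbage collection, so it should be read as typical behaviour rather than a guarantee.

```haskell
import Control.Concurrent.MVar (MVar, newEmptyMVar, takeMVar)
import Control.Exception (BlockedIndefinitelyOnMVar, try)

-- Take from the given MVar, reporting the runtime's deadlock exception
-- as a Left value if it is raised.
takeOrReport :: MVar () -> IO (Either BlockedIndefinitelyOnMVar ())
takeOrReport s = try (takeMVar s)

main :: IO ()
main = do
  s <- newEmptyMVar
  -- No other thread holds a reference to s, so this take can never succeed.
  r <- takeOrReport s
  case r of
    Left e   -> putStrLn ("runtime reported: " ++ show e)
    Right () -> putStrLn "took a value (not expected in this sketch)"
```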

Making an MVar m contain another MVar s (which is initially empty) is a useful way of sending messages (using putMVar m) and then waiting for a response (using takeMVar s), without blocking another thread.

For example:

data Logger = Logger (MVar LogCommand)

data LogCommand = Message String | Stop (MVar ())

logStop :: Logger -> IO ()
logStop (Logger m) = do
  s <- newEmptyMVar
  putMVar m (Stop s)
  takeMVar s

logger :: Logger -> IO ()
logger (Logger m) = loop
 where
  loop = do
    cmd <- takeMVar m
    case cmd of
      Message msg -> do
        putStrLn msg
        loop
      Stop s -> do
        putStrLn "logger: stop"
        putMVar s ()

Thread A, calling logStop, puts Stop s into m, where s is empty. Thread B, running logger, can then take Stop s from m and let Thread A know that it has finished by putting () into s, at which point Thread A can take from s.
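
The same request-and-reply pattern extends to replies that carry a value. The sketch below is a hypothetical counter server, not part of the logger: the CounterCommand type and the names counter and getCount are assumptions made for illustration. Get carries the MVar into which the reply should be put, just as Stop carries s.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.MVar

-- Hypothetical command type: Get carries the box for the reply.
data CounterCommand = Increment | Get (MVar Int)

-- Server loop: holds the current count and answers Get requests.
counter :: MVar CounterCommand -> Int -> IO ()
counter m n = do
  cmd <- takeMVar m
  case cmd of
    Increment -> counter m (n + 1)
    Get reply -> do
      putMVar reply n
      counter m n

-- Send a request and wait for the answer, as logStop waits on s.
getCount :: MVar CounterCommand -> IO Int
getCount m = do
  reply <- newEmptyMVar
  putMVar m (Get reply)
  takeMVar reply

main :: IO ()
main = do
  m <- newEmptyMVar
  _ <- forkIO (counter m 0)
  putMVar m Increment
  putMVar m Increment
  getCount m >>= print   -- 2: both increments are taken before the Get
```

Because a single thread sends all three commands through the same MVar, the server sees them in order, so the reply reflects both increments.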

It is important to note that a compiled program terminates when main returns, even if other threads are still running. (GHCi behaves differently: the interpreter process stays alive after main returns, so forked threads tend to keep running, whereas a program compiled with ghc exits as soon as the main thread finishes.)

logger :: Logger -> IO ()
logger (Logger m) = loop
 where
  loop = do
    cmd <- takeMVar m
    case cmd of
      Message msg -> do
        id <- myThreadId
        putStrLn (show id ++ ": " ++ msg )
        loop
      Stop s -> do
        putStrLn "logger: stop"
        putMVar s ()

logMessage :: Logger -> String -> IO ()
logMessage (Logger m) s = putMVar m (Message s)

logStop :: Logger -> IO ()
logStop (Logger m) = do
  s <- newEmptyMVar
  putMVar m (Stop s)
  takeMVar s

putter :: Logger -> IO ()
putter l = do
  logMessage l "hello"
  logMessage l "bye"
  logStop l
  return ()

main :: IO ()
main = do
  m <- newEmptyMVar
  let l = Logger m
  forkIO (logger l)
  forkIO (putter l)
  return ()

Running this (via ghc) will typically return without any output, because main returns, and the program exits, before the forked threads get a chance to run.
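
One way to keep putter in its own thread is to have main wait for it. The sketch below assumes an extra MVar, here called done (a hypothetical name), that the putter thread fills when it has finished; the thread-id prefix is left out of the log output for brevity.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.MVar

data Logger = Logger (MVar LogCommand)

data LogCommand = Message String | Stop (MVar ())

logger :: Logger -> IO ()
logger (Logger m) = loop
 where
  loop = do
    cmd <- takeMVar m
    case cmd of
      Message msg -> putStrLn msg >> loop
      Stop s -> putStrLn "logger: stop" >> putMVar s ()

logMessage :: Logger -> String -> IO ()
logMessage (Logger m) s = putMVar m (Message s)

logStop :: Logger -> IO ()
logStop (Logger m) = do
  s <- newEmptyMVar
  putMVar m (Stop s)
  takeMVar s

putter :: Logger -> IO ()
putter l = do
  logMessage l "hello"
  logMessage l "bye"
  logStop l

main :: IO ()
main = do
  m <- newEmptyMVar
  let l = Logger m
  done <- newEmptyMVar                       -- hypothetical completion signal
  _ <- forkIO (logger l)
  _ <- forkIO (putter l >> putMVar done ())  -- signal once putter has finished
  takeMVar done                              -- keep main alive until then
```

Since logStop only returns after the logger has printed “logger: stop”, all three lines are printed before main is allowed to return.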

If we inline putter into the main thread, then this will run to termination (in ghc).

logger :: Logger -> IO ()
logger (Logger m) = loop
 where
  loop = do
    cmd <- takeMVar m
    case cmd of
      Message msg -> do
        id <- myThreadId
        putStrLn (show id ++ ": " ++ msg )
        loop
      Stop s -> do
        putStrLn "logger: stop"
        putMVar s ()

logMessage :: Logger -> String -> IO ()
logMessage (Logger m) s = putMVar m (Message s)

logStop :: Logger -> IO ()
logStop (Logger m) = do
  s <- newEmptyMVar
  putMVar m (Stop s)
  takeMVar s

main :: IO ()
main = do
  m <- newEmptyMVar
  let l = Logger m
  forkIO (logger l)
  logMessage l "hello"
  logMessage l "bye"
  logStop l
  return ()

Last updated on 13 Nov 2020
Published on 13 Nov 2020