We can’t control the order in which operations are performed. Therefore, we also can’t control the result of the program execution. Even if we get an error, it won’t be easy to step in the same river twice. I’d like to suggest a recipe of testing a multithread application.
We’ll need the following ingredients: haskell, QuickCheck, some monads, salt/pepper to your taste.
Case Study
As a case study we’ll consider an exercise about philosophers having a dinner.MVar a – is a reference, which either contains a type value, or is empty.
putMVar ref x puts by ref reference the value of x.
takeMVar ref reads off the reference content leaving it empty after that. If it was already empty the thread falls asleep until another thread writes something to it.
() – is the type having the unique value which is indicated the same way as the type itself – ().
We model the forks by references of MVar () type.
Thus, a fork can have two states. If it’s occupied by some philosopher – it’s empty, if the fork is free – it contains () value.
import System.Random
import Control.Monad
import Control.Concurrent
import Control.Monad.Cont
import Control.Monad.Trans
import Data.IORef
import Test.QuickCheck
import Test.QuickCheck.Gen
import Test.QuickCheck.Monadic
-- sleep stops the thread for a random number of seconds (from 0 to 0.3)
sleep :: IO ()
sleep = randomRIO (0, 300000) >>= threadDelay
phil
:: Int -- Philisopher number.
-> MVar () -- The reference to the left fork.
-> MVar () -- The reference to the right fork.
-> IO ()
phil n leftFork rightFork = forever $ do
putStrLn $ show n ++ " is awaiting"
sleep
takeMVar leftFork
putStrLn $ show n ++ " took left fork"
-- sleep
takeMVar rightFork
putStrLn $ show n ++ " took right fork"
sleep
putMVar leftFork ()
putMVar rightFork ()
putStrLn $ show n ++ " put forks"
sleep
runPhil :: Int -> IO ()
runPhil n = do
-- Create references, which are represented by forks.
forks <- replicateM n $ newMVar ()
-- Run 5 threads and execute phil function in each of them.
forM_ [1..n] $ \i -> forkIO $ phil i (forks !! (i - 1)) (forks !! (i `mod` n))
main = do
runPhil 5
-- If the main thread exits then the program will stop, so we’ll send it to sleep for good.
forever (threadDelay 1000000000)
A deadlock can occur in this program.
If you want to see it at, you can uncomment sleep line and wait a little bit.
Our goal is to write tests which could detect this error.
But before we can do that, it’s necessary to understand the way we’ll manage the order of operations performance. We’ll use another monad instead of IO for that.
Let’s generalize sleep, phil and runPhil functions definitions so that they would work for other monads as well.
sleep :: MonadIO m => m ()
sleep = do
r <- liftIO $ randomRIO (0, 100)
r `times` liftIO (threadDelay 300)
where
times :: Monad m => Int -> m () -> m ()
times r a = mapM_ (\_ -> a) [1..r]
Now sleep function can work with any monad supporting IO operations. Only one liftIO function allowing to do that is defined in MonadIO class.
You should note that instead of falling asleep once for a random number of seconds we fall asleep a random number of times for 0.3 millisecond as actions are carried out atomically inside liftIO in our monad. So the time we sleep affects nothing. But the number of times we fall asleep is important.
Whereas our monad will operate in one thread only, MVar are useless for us. We’ll define our list of references later, assuming that phil function will be able to work with MVar as well as with other reference types. Let’s define monad MonadConcurrent class for that. The class will contain operations for creating, reading and writing by a reference, and also threads creation.
class Monad m => MonadConcurrent m where
type CVar m :: * -> *
newCVar :: a -> m (CVar m a)
takeCVar :: CVar m a -> m a
putCVar :: CVar m a -> a -> m ()
fork :: m () -> m ()
We used type families which are the language extension. In this case, we need this extension to define different reference types for different monads. In order to use the extension we should add the following line to the file beginning (+ extensions we’ll need later):
{-# LANGUAGE TypeFamilies, ExistentialQuantification, GeneralizedNewtypeDeriving #-}
Let’s define instance of this class for IO monad.
It’s simple: we just use the appropriate operations for MVar.
instance MonadConcurrent IO where
type CVar IO = MVar
newCVar = newMVar
takeCVar = takeMVar
putCVar = putMVar
fork m = forkIO m >> return ()
Let’s generalize phil and runPhil functions.
phil :: (MonadIO m, MonadConcurrent m) => Int -> CVar m () -> CVar m () -> m ()
phil n leftFork rightFork = forever $ do
liftIO $ putStrLn $ show n ++ " is awaiting"
sleep
takeCVar leftFork
liftIO $ putStrLn $ show n ++ " took left fork"
takeCVar rightFork
liftIO $ putStrLn $ show n ++ " took right fork"
sleep
putCVar leftFork ()
putCVar rightFork ()
liftIO $ putStrLn $ show n ++ " put forks"
sleep
runPhil :: (MonadIO m, MonadConcurrent m) => Int -> m ()
runPhil n = do
forks <- replicateM n $ newCVar ()
forM_ [1..n] $ \i -> fork $ phil i (forks !! (i - 1)) (forks !! (i `mod` n))
Let’s run the program and make sure that it operates as before.
Concurrent Monad
Now it’s time for the most interesting part.Let’s define the monad we’ll work in (looking ahead I’ll tell you that it’s called Cont). I’d also risk assuming that Cont is one of the most difficult and at the same time powerful monads.
Using this monad we can do anything with the thread. For example, instead of carrying out the actions we can store them in the structure (let’s declare Action type for that purpose) and carry out them later, perhaps, in different order.
data Action = Atom (IO Action)
| forall a. ReadRef (MaybeRef a) (a -> Action)
| forall a. WriteRef (MaybeRef a) a Action
| Fork Action Action
| Stop
Let’s take a look at each constructor separately.
Stop action means that calculations are completed.
Fork action means that calculations branch, so now we have two threads which can be executed concurrently.
Atom action atomically performs IO operation that returns the new Action, where is located the action to be carried out at the next step.
For example:
getSum function defines the action that reads off two numbers from the keypad, print their amount and completes.
getSum :: Action
getSum = Atom $ do
x <- readLn -- read off the first number о
return $ Atom $ do -- return continuation
y <- readLn -- read off the second number
return $ Atom $ do -- return continuation
print (x + y) -- print the amount
return Stop -- return continuation
Then:
WriteRef ref val act action writes val value by ref reference, continuation is located in act.
ReadRef ref act action reads off the value by ref reference. act accepts this value and returns continuation.
So that we could save references of random types in Action, we use another language extension – existential quantification.
MaybeRef type represents the type of references we’ll use instead of MVar. We define it as a reference to Maybe.
newtype MaybeRef a = MaybeRef (IORef (Maybe a))
Now we can define our monad.
As promised, we just convert Cont monad.
newtype Concurrent a = Concurrent (Cont Action a) deriving Monad
Cont Action monad has the following structure.
Instead of returning a type value it accents continuation of (a -> Action) type, passes the value to this function and returns the result.
We can consider that
Cont Action a = (a -> Action) -> Action.
To put it more precisely we have a pair of functions converting (a -> Action) -> Action to Cont Action a and vice versa.
cont :: ((a -> Action) -> Action) -> Cont Action a.
runCont :: Cont Action a -> (a -> Action) -> Action
Now we can define the instance of MonadIO and MonadConcurrent classes.
instance MonadIO Concurrent where
liftIO m = Concurrent $ cont $ \c -> Atom $ do
a <- m
return (c a)
Let’s see what happens here.
liftIO accepts IO operation and converts it to an atomic operation. We pass a function to cont. This function accepts continuation (c has a -> Action type) and returns an atomic action executing IO m operation.
We define Atom so that the atomic operation should return Action, which is a continuation.
That’s what we’ll do: after executing m we call c, which returns the necessary continuation.
Now we’ll define instance MonadConcurrent.
Create a reference in newCVar, using just defined liftIO function.
Return the appropriate action to takeCVar and putCVar and save the continuation inside this function.
Return to fork an action, in which both threads are stored: pass one to fork function arguments, the other one comes from the continuation.
instance MonadConcurrent Concurrent where
type CVar Concurrent = MaybeRef
newCVar a = liftIO $ liftM MaybeRef $ newIORef (Just a)
takeCVar v = Concurrent $ cont (ReadRef v)
putCVar v a = Concurrent $ cont $ \c -> WriteRef v a $ c ()
fork (Concurrent m) = Concurrent $ cont $ \c -> Fork (runCont m $ \_ -> Stop) $ c ()
Our monad is almost ready. We just learn how to run it.
To begin with let’s write a function which will start Action. It accepts the list of actions, in which each element is a separate thread.
There can be various strategies of actions run. Let’s clarify two things:
- order of threads execution
- things we should do when trying to read off the value from the empty variable
Reminding you, that the variable can be empty and we’ll have to wait till another thread puts something in it. At first let’s write a simple version, in which we’ll execute the lines one by one and will place the thread trying to read the empty value to the end of the queue.
runAction :: [Action] -> IO ()
-- We are done if there are no threads left
runAction [] = return ()
-- Execute the atomic action. Put the continuation it returns to the end of the queue.
runAction (Atom m : as) = do
a' <- m
runAction $ as ++ [a']
-- Put two new threads to the end of the queue.
runAction (Fork a1 a2 : as) = runAction $ as ++ [a1,a2]
-- Keep starting the remaining threads.
runAction (Stop : as) = runAction as
runAction (ReadRef (MaybeRef ref) c : as) = do
-- Read off the value content.
ma <- readIORef ref
case ma of
-- If there’s something in it
Just a -> do
-- We empty the reference content.
writeIORef ref Nothing
-- Put the continuation to the end of the queue.
runAction (as ++ [c a])
-- If there hasn’t been abything in it we should try to read this reference later, so we’ll add the same action to the end of the queue.
Nothing -> runAction (as ++ [ReadRef (MaybeRef ref) c])
-- Write a value by the reference, put the continuation to the end of the queue.
runAction (WriteRef (MaybeRef ref) val a : as) = do
writeIORef ref (Just val)
runAction (as ++ [a])
You should note that putMVar works a bit differently comparing to our implementation of WriteRef.
If there has been some value by the reference, putMVar will freeze the thread until the variable is empty. We’ll rewrite the value in this case.
There’s no point in creating the version that will operate as putMVar, you’ll only make the code more complex.
Then we’ll write the function running Concurrent and redefine main.
runConcurrent :: Concurrent () -> IO ()
runConcurrent (Concurrent c) = runAction [runCont c $ \_ -> Stop]
main = runConcurrent (runPhil 5)
The speed has slowed down a bit as for the moment we’re working in one thread and threadDelay stops our work.
Writing Tests
It’s time to “add the seasoning to our dish” – write tests for our example.We’ll use QuickCheck library for it, which will generate random input data for tests.
Since we want to run our threads in different orders, the input data for our tests is the order in which we choose another thread from the list.
We can encode the input data by the number list, but the problem is that we don’t know beforehand, what range these numbers should be chosen from as n number of threads can vary.
So we’ll encode the input data by the list of Int -> Int type functions that accept the number and return the number from [0,n-1] range.
newtype Route = Route [Int -> Int]
Arbitrary class provided by QuickCheck library is intended for describing the types that allow generating elements randomly.
There are two functions declared in this class — shrink and arbitrary.
shrink has an implementation by default so we won’t redefine it.
We’ll generate the list of random functions, in which each function returns a number from [0,n-1] range.
instance Arbitrary Route where
arbitrary = fmap Route (listOf arbitraryFun)
where
arbitraryFun = MkGen $ \q s n -> unGen (choose (0, n - 1)) q s
We also define instance Show for Route as it’s required by QuickCheck.
Unfortunately, we can’t write the useful show. This function won’t even be used so we’ll leave it undefined.
instance Show Route where
show = undefined
Now we can get down to writing a better version of runAction.
The first difference is that we’ll divide the execution of atomic actions and work with references.
To begin with, let’s write skipAtoms helper function that will execute atomic actions. It accepts the list of actions and executes Atom, Fork and Stop, then returns the rest as a result
skipAtoms :: [Action] -> IO [Action]
skipAtoms [] = return []
skipAtoms (Atom m : as) = do
a <- m
skipAtoms (as ++ [a])
skipAtoms (Fork a1 a2 : as) = skipAtoms (as ++ [a1,a2])
skipAtoms (Stop : as) = skipAtoms as
skipAtoms (a : as) = fmap (a:) (skipAtoms as)
The second difference of new runAction version from the previous one is that we track the deadlock acquisition.
We create two lists of actions for that. In the first list we’ll store active (executed by us) threads. In the second one we’ll store the threads waiting for some reference renew.
We get a deadlock and throw the exception if the list of empty threads is empty and there are no threads waiting for the list.
The third innovation is the argument of Route type that is used for choosing the thread number to be executed at the current step.
runAction :: Route -> [Action] -> [Action] -> IO ()
runAction _ [] [] = return ()
runAction _ [] _ = fail "Deadlock"
runAction (Route []) _ _ = return ()
runAction (Route (r:rs)) as bs = do
as <- skipAtoms as
let n = length as
case splitAt (r n) as of
(as1, ReadRef (MaybeRef ref) c : as2) -> do
ma <- readIORef ref
case ma of
Just a -> do
writeIORef ref Nothing
runAction (Route rs) (as1 ++ [c a] ++ as2) bs
Nothing -> runAction (Route rs) (as1 ++ as2) (bs ++ [ReadRef (MaybeRef ref) c])
(as1, WriteRef (MaybeRef ref) x c : as2) -> do
writeIORef ref (Just x)
runAction (Route rs) (as1 ++ [c] ++ as2 ++ bs) []
runConcurrent function has barely changed.
runConcurrent :: Route -> Concurrent () -> IO ()
runConcurrent r (Concurrent c) = runAction r [runCont c $ \_ -> Stop] []
We can check up the way new version operates by passing round_robin as the first argument. It’s a simple strategy of execution; it’s similar to the way runAction function has worked before.
We just generate an infinite list and take the remainder by the module of threads number for each element.
round_robin :: Route
round_robin = Route $ map rem [0..]
Having run calculations at these input data, we’ll most probably get a deadlock as our example work is built on the basis of random-number generator. So, despite the fact that the input data are always the same, execution order turns out to be random.
If our example would be more determined we’d have to vary the input data randomly. That’s exactly what we’re doing.
main = quickCheck $ monadicIO $ do
r <- pick arbitrary
run $ runConcurrent r (runPhil 5)
We choose a random element of Route type by using the implemented earlier arbitrary function. Then we run our calculation at this input.
QuickCheck will take care about the rest. It will run our test 100 times, every time increasing the size of input data.
After running the program we’ll see the following:
...
3 took left fork
4 put forks
4 is awaiting
5 took left fork
4 took left fork
1 took right fork
1 put forks
1 is awaiting
1 took left fork
2 took left fork
*** Failed! Exception: 'user error (Deadlock)' (after 36 tests):
That’s exactly what we should get!
Summary
We learnt how to write tests that can detect deadlock state in a multithread application.We’ve seen examples of using Cont monad, type families, existential quantification and QuickCheck library.
Besides, we’ve learnt how to improvise when building a model of multithread program execution.
0 comments
Upload image