mrd's blog
Dynamic Programming in Haskell
Dynamic Programming is an algorithm design strategy which can be essentially described as breaking a problem down into subproblems and re-using previously computed results. Sometimes, a problem which appears to take exponential time to solve can be reformulated using a Dynamic Programming approach in which it becomes tractable. The benefit is especially clear when the subproblem solutions overlap considerably. The technique of memoization is a major time-saver in those cases.
A common Haskell newbie question is: how do I memoize? At first, it appears to be a very difficult problem, because access to mutable arrays and hashtables is restricted. It is important to realize that lazy evaluation is actually memoization itself and can be leveraged in that way for the purposes of Dynamic Programming. In fact, as a result, the expression of these algorithms can be more natural and lucid in Haskell than in a strict language.
Here, I am going to examine the classic ``knapsack problem.'' Given a number of items, their values, and their sizes -- what is the best combination of items that you can fit in a limited-size knapsack?
> module Knapsack where
> import Control.Monad
> import Data.Array
> import Data.List
> import Test.QuickCheck
I am going to represent items with this data type. Essentially, it is just a tuple of the item itself, its value, and its size.
> data Item a = Item { item :: a,
> itemValue :: Int,
> itemSize :: Int }
> deriving (Eq, Show, Ord)
Cells will be used both to represent the solution to the knapsack problem, and as individual cells in the matrix for the Dynamic Programming algorithm. It is a pair consisting of: summed values, and the items in the sack.
> data Cell a = Cell (Int, [Item a])
> deriving (Eq, Show, Ord)
This
powerset definition is a very neat use of the List monad
(which I pulled from the Haskell wiki). You can think of it as
saying: for each element in the list, half the possible subsets
will include it, and half will not.
> powerset :: [a] -> [[a]]
> powerset = filterM (const [True, False])
brutepack considers the powerset of the items,
cutting out those subsets which are too large in size, and picking
the most valuable subset left. As you might figure, this is going
to run in O(2^n) thanks to the use of powerset.
The definition should be simple to understand and will provide
a sound basis for testing the Dynamic Programming alternative.
> brutepack :: (Ord a) => Int -> [Item a] -> Cell a
> brutepack size items = maximum [
> cellOf subset |
> subset <- powerset items, itemsFit subset
> ]
> where
> itemsFit items = sum (map itemSize items) <= size
> cellOf items = Cell (sum (map itemValue items), items)
The Dynamic Programming algorithm is as follows:
Consider a matrix where the rows are indexed by size and the columns by items. The rows range from 1 to the size of the knapsack, and the columns are one-to-one with the items in the list.
value = $30 $20 $40
size = 4 3 5
_item_|_lion___tiger___bear_
|
1|
|
2|
|
3|
|
4| v(2,4)<--.
| |
5| |
| |
6| |
| |
7| |
| <--|
8| v(2,8) v(3,8)
|
This is a diagram where the knapsack has a maximum size allowance of 8, and we want to stuff some animals in it. Each element in the matrix is going to tell us the best value of items by size. That means the answer to the whole problem is going to be found in v(3,8) which is the bottom-rightmost corner.
The value of any one cell in the matrix will be decided by whether it is worthwhile to add that item to the sack or not. In the v(3,8) cell it compares the v(2,8) cell to the left to the v(2,4) cell up above. The v(2,8) cell has no room for the bear, and the v(2,4) cell represents the situation where the bear will fit. So the question, after determining if the bear will fit in the bag at all, is: is value of bear + v(2,4) better than v(2,8)?
This definition of v lends itself to a very nice recursive formulation.
/ v(m-1,n) if (s_n) > n
\
v(m,n) = -| / v(m-1,n)
/ \
\ max -| otherwise
/
\ v(m-1,n-(s_n)) + v_n
where s_n is the size of item n and v_n is its value.
A typical implementation of this algorithm might initialize a 2D array to some value representing ``undefined.'' In Haskell, we can initialize the entire array to the correct value directly and recursively because it will not be computed until needed. All that is necessary is to express the data-dependencies, and the order of evaluation will take care of itself.
This code takes the algorithm a little further by tracking a field in each cell that contains a list of the items in the sack at that point.
> dynapack :: Int -> [Item a] -> Cell a
> dynapack size items = valOf noItems size
> where
> noItems = length items
> itemsArr = listArray (1,noItems) items
> itemNo n = itemsArr ! n
>
> table = array ((1,1),(noItems,size)) $
> [
> ((m, n), cell m n) |
> m <- [1..noItems],
> n <- [1..size]
> ]
>
> valOf m n
> | m < 1 || n < 1 = Cell (0, [])
> | otherwise = table ! (m,n)
>
> cell m n =
> case itemNo m of
> i@(Item _ v s)
> | s > n ||
> vL >= vU + v -> Cell (vL , isL)
> | otherwise -> Cell (vU + v, i:isU)
> where
> Cell (vL, isL) = valOf (m - 1) n
> Cell (vU, isU) = valOf (m - 1) (n - s)
The matrix is defined in the variable table and
valOf is our function v here. This definition
very naturally follows from the algorithmic description because
there is no problem with self-reference when defining cells in
the array.
In a strict language, the programmer would have to manually
check for the presence of values and fill in the table.
Testing
It's important to be confident that the algorithm is coded
correctly. Let's see if brutepack and dynapack
agree on test inputs. I will define my own simple data type
to customize the randomly generated test data.
> newtype TestItems = TestItems [(Int, Int, Int)]
> deriving (Eq, Show, Ord)
> nubWithKey k = nubBy (\a b -> k a == k b)
> fst3 (a,b,c) = a
> tripleToItem (i,v,s) = Item i v s
The Arbitrary class expects you to define your
customly generated test data in the Gen monad.
QuickCheck provides a number of handy combinators, and
of course you can use normal monadic functions.
sized is a QuickCheck combinator which
binds the generator's notion of "size" to a parameter
of the supplied function. This notion of "size" is
a hint to test-data creators that QuickCheck wants
data on the "order of" that size. Of course, what "size"
means can be freely interpreted by the author of the
function, in this case I am using it for a couple purposes.
The basic idea is simply: create a list of randomly
generated tuples of length "size", and choose values
and item-sizes randomly from (1, "size"). Notice
how the randomly generated tuple is replicated with
the monadic combinator replicateM.
Then, before returning,
just make sure that there are no "repeated items"
by running nubWithKey fst3 over the generated
list. That will cut out any items with the same name
as previous items.
> instance Arbitrary TestItems where
> arbitrary = sized $ \n -> do
> items <- replicateM n
> $ do
> i <- arbitrary
> v <- choose (1, n)
> s <- choose (1, n)
> return $ (i, v, s)
> return . TestItems . nubWithKey fst3 $ items
With an Arbitrary instance, we can now define a property: it extracts the tuples and creates Items out of them, then tries out both algorithms for equivalence. Note that I am only checking the final values, not the actual items, because there may be more than one solution of the same value.
> prop_effectivePacking (TestItems items) = v1 == v2
> where items' = map tripleToItem items
> Cell (v1,_) = brutepack 16 items'
> Cell (v2,_) = dynapack 16 items'
Knapsack> verboseCheck prop_effectivePacking
0:
TestItems [(1,2,3),(3,2,2)]
1:
TestItems []
2:
TestItems [(1,1,1)]
3:
TestItems [(2,1,2),(-1,2,3),(0,2,3)]
4:
TestItems [(-2,2,2),(-1,1,1),(3,2,3),(4,2,2)]
...
It will progressively check larger "size" samples and you will
notice that the brute force algorithm is going to start
dragging down performance mightily. On my computer, in the
ghci interpreter, brutepack on even just 20 items takes 10 seconds;
while dynapack takes almost no time at all.
Conclusion
Algorithms making use of Dynamic Programming techniques are often expressed in the literature in an imperative style. I have demonstrated an example of one such algorithm in a functional language without resorting to any imperative features. The result is a natural recursive expression of the algorithm, that has its advantage from the use of lazy evaluation.
Simple Parsec Example: HTMangL
Writing bird-style Literate Haskell is working out pretty well for me. Actually, I prefer latex-style, but bird-style works well for short articles with little to no math. Bird-style is also fairly easy to integrate with HTML markup; except for the fact that it uses '>' to designate code. Browsers vary, but most seem to handle the non-XHTML compliant markup with few snags. However, I got sick of dealing with those snags as they can be difficult to spot on occasion.
I put together a small program to deal with this mess, and also as a small Parsec usage tutorial. This program processes bird-style Literate Haskell input and outputs a converted version with the code-blocks surrounded by <code> tags while also converting >, <, and & to entity designators.
> module Main where
> import Text.ParserCombinators.Parsec
A few combinators are built up from smaller ones.
eol and tilEOL parse
the remaining characters in a line up to and including
the newline character (or EOF).
> eol = newline <|> (eof >> return '\n')
> tilEOL = manyTill (noneOf "\n") eol
A line of code begins with "> " and continues til EOL.
> codeLine = do
> string "> "
> code <- tilEOL
> return $ "> " ++ code
A non-blank literate line can begin with any character
but newline, and if it begins with '>' then it cannot
be followed by a space. To those coming from imperative
backgrounds, the return () does not
return from the function but rather returns ()
to the monad; here it is used as a no-op. The rest of
the line is treated as above.
> litLine = do
> ch <- noneOf "\n"
> if ch == '>' then
> notFollowedBy space
> else
> return ()
> text <- tilEOL
> return $ ch:text
A blank line is one which begins with a newline.
> blankLine = char '\n' >> return ""
Blocks of code and literate lines (or blanks) are simply multiple consecutive lines (at least 1).
> code = many1 (try codeLine)
> lit = many1 (try litLine <|> blankLine)
> data LiterateCode = Literate [String]
> | Code [String]
> deriving (Show, Eq)
A literate Haskell file is composed of many Code and Literate
blocks. These are unified in one disjoint type, LiterateCode,
and the combinator below ensures that the appropriate tag is
applied to the results of parsing.
> literateCode = many (Code `fmap` code <|> Literate `fmap` lit)
A block of literate text is printed literally, but code must be processed slightly.
> printBlock (Literate ls) = mapM_ putStrLn ls
> printBlock (Code cs) = do
> putStrLn "<code>\n"
> mapM_ (putStrLn . subEntities) cs
> putStrLn "\n</code><br/>"
In case you were wondering how this works: it maps the function over each character in the input string and concatenates the resulting list of strings.
> subEntities = (>>= \c ->
> case c of
> '>' -> ">"
> '<' -> "<"
> '&' -> "&"
> c -> [c])
Really simple: work on stdin, print to stdout.
> main = do
> s <- getContents
> case parse literateCode "stdin" s of
> Left err -> putStr "Error: " >> print err
> Right cs -> mapM_ printBlock cs
Naturally, the first candidate code to run this program on is this program itself: I call it HTMangL.
A simple TCP server
A simple TCP server which accepts multiple clients and echos input text back to all those connected. It uses threads to manage multiple handles, and Software Transactional Memory to pass messages.
This text is literate Haskell, and has been tested with ghc 6.6 on Linux/x86. Type annotations are included for didactic purposes.
> module Main where
> import Prelude hiding (catch)
Module Network is the simple networking
library, presenting a Handle-based interface.
> import Network (listenOn, accept, sClose, Socket,
> withSocketsDo, PortID(..))
> import System.IO
> import System.Environment (getArgs)
> import Control.Exception (finally, catch)
> import Control.Concurrent
> import Control.Concurrent.STM
> import Control.Monad (forM, filterM, liftM, when)
A simple main to parse a port number from
the command line, and fire up the server socket.
> main = withSocketsDo $ do
> [portStr] <- getArgs
> let port = fromIntegral (read portStr :: Int)
> servSock <- listenOn $ PortNumber port
> putStrLn $ "listening on: " ++ show port
> start servSock `finally` sClose servSock
> start servSock = do
> acceptChan <- atomically newTChan
> forkIO $ acceptLoop servSock acceptChan
> mainLoop servSock acceptChan []
acceptLoop manages the server socket, accepting
connections, starting client threads, and forwarding the relevant
information about them over the channel so the
main loop can multiplex it all together.
> type Client = (TChan String, Handle)
>
> acceptLoop :: Socket -> TChan Client -> IO ()
> acceptLoop servSock chan = do
> (cHandle, host, port) <- accept servSock
> cChan <- atomically newTChan
> cTID <- forkIO $ clientLoop cHandle cChan
> atomically $ writeTChan chan (cChan, cHandle)
> acceptLoop servSock chan
As before, each client gets a loop which reads from the handle and pumps the data right into a channel. However, this time, exception handling is done per-thread; if a client disconnects we just want the thread to die silently. A more clever implementation might have a more structured channel which allows it to indicate when the client disconnects.
> clientLoop :: Handle -> TChan String -> IO ()
> clientLoop handle chan =
> listenLoop (hGetLine handle) chan
> `catch` (const $ return ())
> `finally` hClose handle
> listenLoop :: IO a -> TChan a -> IO ()
> listenLoop act chan =
> sequence_ (repeat (act >>= atomically . writeTChan chan))
STM conveniently allows composition of actions which makes custom tailoring of library code a snap. Here, I've added an additional action to check the status of the acceptChan along with all the clients. The acceptChan has a different type than any of the client channels, so I separate it from the others using an Either data type for simplicity. `fmap` here acts very much like (.), the functional composition operator.
> mainLoop :: Socket -> TChan Client -> [Client] -> IO ()
> mainLoop servSock acceptChan clients = do
> r <- atomically $ (Left `fmap` readTChan acceptChan)
> `orElse`
> (Right `fmap` tselect clients)
> case r of
> Left (ch,h) -> do
> putStrLn "new client"
> mainLoop servSock acceptChan $ (ch,h):clients
> Right (line,_) -> do
> putStrLn $ "data: " ++ line
In addition to sending the data out to every client, this loop catches any errors from writing to handles and excludes that client from the list.
> clients' <- forM clients $
> \(ch,h) -> do
> hPutStrLn h line
> hFlush h
> return [(ch,h)]
> `catch` const (hClose h >> return [])
> let dropped = length $ filter null clients'
> when (dropped > 0) $
> putStrLn ("clients lost: " ++ show dropped)
> mainLoop servSock acceptChan $ concat clients'
tselect is a function which multiplexes any number of
TChans. It will return the data from whichever TChan it can read,
along with the "key" value that can be supplied in the pair.
This takes advantage of the STM combinators orElse
and retry by applying them to a list of actions
constructed around the TChans.
> tselect :: [(TChan a, t)] -> STM (a, t)
> tselect = foldl orElse retry
> . map (\(ch, ty) -> (flip (,) ty) `fmap` readTChan ch)
This code demonstrates a basic TCP server as well as a more
generally applicable function tselect. It serves
as an example of the strength and simplicity of the Software
Transactional Memory model, and of network IO in Haskell.
- Login to post comments
A simple TCP client
A simple example network client showing how to multiplex the reading of lines from the remote peer and the local user, using Software Transactional Memory to do message-passing and light-weight threads.
This text is literate Haskell, and has been tested with ghc 6.6 on Linux/x86. Type annotations are included for didactic purposes.
> module Main where
> import Prelude hiding (catch)
Module Network is the simple networking
library, presenting a Handle-based interface.
> import Network (connectTo, withSocketsDo, PortID(..))
> import System.IO
> import System.IO.Error (isEOFError)
> import System.Environment (getArgs)
> import Control.Exception (finally, catch, Exception(..))
> import Control.Concurrent
> import Control.Concurrent.STM
main parses host and port from the command line
and connects to it.
Then it calls the start function with the
socket handle.
An error handler is defined which
prints out exceptions, except for EOF.
Finally, the socket is ensured to be closed.
withSocketsDo is required on Windows
platforms, but does no harm otherwise.
> main = withSocketsDo $ do
> [host, portStr] <- getArgs
PortNumbers are an instance of Num,
but not Read. So, we read it as an
Int, and then generalize to class Num
using fromIntegral.
> let port = fromIntegral (read portStr :: Int)
> sock <- connectTo host $ PortNumber port
> start sock `catch` handler `finally` hClose sock
> where
> handler (IOException e)
> | isEOFError e = return ()
> handler e = putStrLn $ show e
start takes care of the creation of threads
and channels to communicate between them. Each
thread spawned is responsible for listening to a given
handle, and forwarding any communications received along
the channel. Notice, in particular, how this listening
task has been abstracted into a higher-order monadic function.
The main thread is then used as the central "coordinating"
loop, as discussed below.
> start :: Handle -> IO ()
> start sock = do
> netChan <- atomically newTChan
> userChan <- atomically newTChan
> netTID <- spawn $ listenLoop (hGetLine sock) netChan
> userTID <- spawn $ listenLoop getLine userChan
> mainLoop sock netChan userChan
spawn is a small wrapper
around forkIO which adds a small
thread-specific exception handler that simply
passes any exceptions along to the main thread.
(Note: myThreadId is GHC-specific)
> spawn :: IO () -> IO ThreadId
> spawn act = do
> mainTID <- myThreadId
> forkIO $ act `catch` throwTo mainTID
listenLoop pipes the output of calling
an action repeatedly into a channel.
Read literally: listenLoop repeats forever, in
sequence, the action of invoking act and
then atomically writing its value to the channel.
> listenLoop :: IO a -> TChan a -> IO ()
> listenLoop act chan =
> sequence_ (repeat (act >>= atomically . writeTChan chan))
Here is an explicit-recursion version of the above:
listenLoop = do
v <- action
atomically $ writeTChan chan v
listenLoop action chan
Understanding why both versions of listenLoop
are equivalent will help you understand that monadic
actions in Haskell are first-class values.
mainLoop demonstrates the usage of a simple
select function which awaits input from one
of two channels.
> mainLoop :: Handle -> TChan String -> TChan String -> IO ()
> mainLoop sock netChan userChan = do
> input <- atomically $ select netChan userChan
> case input of
> -- from netChan, to user
> Left str -> putStrLn str >> hFlush stdout
> -- from userChan, to net
> Right str -> hPutStrLn sock str >> hFlush sock
> mainLoop sock netChan userChan
select multiplexes two TChans
using the STM combinator orElse.
Read plainly, it states that ch1 should
be consulted first, or else, ch2 should
be read. Any values from ch1 are tagged
with Left and any values from ch2
are tagged with Right.
The reason why this works seamlessly is because STM will keep track of which channels it has attempted to read. If both have nothing available right now, then STM knows it can block until one of the two channels has data ready.
> select :: TChan a -> TChan b -> STM (Either a b)
> select ch1 ch2 = do
> a <- readTChan ch1; return (Left a)
> `orElse` do
> b <- readTChan ch2; return (Right b)
Transcript:
$ ghc --make Client1.lhs
[1 of 1] Compiling Main ( Client1.lhs, Client1.o )
Linking Client1 ...
$ ./Client1 localhost 25
220 localhost ESMTP Exim 3.36
helo localhost
250 localhost Hello mrd at localhost [127.0.0.1]
quit
221 localhost closing connection
$
This code has demonstrated a simple TCP client which can receive and transmit lines in an interactive session. The implementation used light-weight threads to multiplex handles and Software Transactional Memory for inter-thread communication. Several re-usable functions were defined which show the expressive power and simplicity of STM and higher-order monadic functions.
Synchronized threads, part II
For comparison, here is an implementation of multiple threads of which each attempt to perform as many steps as possible in 1 second.
> import Control.Monad
> import Control.Concurrent
> import Control.Concurrent.STM
> import Data.List
> import Data.IORef
> import System.Time
> import System.Environment
> import System.IO
> import System.Random
> import Text.Printf
> import Ratio
oneThread greedily attempts to loop through as many steps as possible
until one second has elapsed. Then it blocks while it waits for
the main thread to collect the previous result, so it can put
the new result in the TMVar. Every step it takes, it executes
the supplied function parameter f.
> oneThread :: TMVar Int -> Int -> a -> (a -> a) -> IO ()
> oneThread mvar n v f = do
> TOD s ps <- getClockTime
> loop (fromIntegral s + ps%(10^12)) n n v
> where
> loop prevTime prevN n v = do
> TOD s ps <- getClockTime
> let now = fromIntegral s + ps%(10^12)
> tdiff = now - prevTime
> ndiff = fromIntegral $ n - prevN
> sps = floor (ndiff / tdiff)
> v' = f v
> if tdiff >= 1 then
> do atomically $ putTMVar mvar sps
> loop now n n v
> else v' `seq` loop prevTime prevN (n + 1) v'
nosync is akin to sync in that it is an STM action which collects
results from all the threads via the TMVars. Again, the key
portion is easy: mapM takeTMVar mvars.
> nosync :: (Num a, Ord a) => [TMVar a] -> STM (a, a)
> nosync mvars = do
> vals <- mapM takeTMVar mvars
> return $ (maximum vals, sum vals)
> initialize :: Int -> a -> (a -> a) -> IO ([ThreadId], [TMVar Int])
> initialize k v f = do
> mvars <- atomically (forM [1..k]
> (\_ -> newEmptyTMVar))
> thds <- forM (zip mvars [1..k])
> (\(ch, n) -> forkIO (oneThread ch 0 v f))
> return (thds, mvars)
nosyncLoop waits for all the threads to place a value into their TMVar, which will happen after one second.
> nosyncLoop :: [TMVar Int] -> IO ()
> nosyncLoop mvars = do
> (best, sum) <- atomically $ nosync mvars
> printf "Best steps / second = %d; Sum steps / second = %d\n" best sum
> hFlush stdout
> nosyncLoop mvars
A computational time-waster to simulate "real work".
> computation l = let (v:l') = l
> in fact v `seq` l'
>
> fact n = product [1..n]
> main :: IO ()
> main = do
> args <- getArgs
> let n = case args of
> [] -> 10
> a:_ -> read a
> g <- newStdGen
> (_,mvars) <- initialize n (randomRs (500,600) g) computation
> nosyncLoop mvars
System is a 4-way Xeon 3.6GHz.
[mrd@system ~]$ ghc --make -O2 -threaded Unsync.lhs
[mrd@system ~]$ ./Unsync 1 +RTS -N1
Best steps / second = 3179; Sum steps / second = 3179
Best steps / second = 3181; Sum steps / second = 3181
Best steps / second = 3178; Sum steps / second = 3178
Best steps / second = 3175; Sum steps / second = 3175
Best steps / second = 3174; Sum steps / second = 3174
[mrd@system ~]$ ./Unsync 1 +RTS -N2
Best steps / second = 3142; Sum steps / second = 3142
Best steps / second = 3168; Sum steps / second = 3168
Best steps / second = 3174; Sum steps / second = 3174
Best steps / second = 3177; Sum steps / second = 3177
Best steps / second = 3172; Sum steps / second = 3172
[mrd@system ~]$ ./Unsync 5 +RTS -N1
Best steps / second = 635; Sum steps / second = 3071
Best steps / second = 638; Sum steps / second = 3094
Best steps / second = 668; Sum steps / second = 3080
Best steps / second = 669; Sum steps / second = 3184
Best steps / second = 751; Sum steps / second = 3181
[mrd@system ~]$ ./Unsync 5 +RTS -N2
Best steps / second = 1429; Sum steps / second = 5601
Best steps / second = 1434; Sum steps / second = 5647
Best steps / second = 1446; Sum steps / second = 5647
Best steps / second = 1413; Sum steps / second = 5647
Best steps / second = 1502; Sum steps / second = 5639
[mrd@system ~]$ ./Unsync 5 +RTS -N3
Best steps / second = 1912; Sum steps / second = 5792
Best steps / second = 2092; Sum steps / second = 5934
Best steps / second = 2107; Sum steps / second = 5938
Best steps / second = 1959; Sum steps / second = 5922
Best steps / second = 2068; Sum steps / second = 5960
[mrd@system ~]$ ./Unsync 5 +RTS -N4
Best steps / second = 1876; Sum steps / second = 7428
Best steps / second = 1865; Sum steps / second = 7402
Best steps / second = 1891; Sum steps / second = 7420
Best steps / second = 1895; Sum steps / second = 7581
Best steps / second = 1899; Sum steps / second = 7602
[mrd@system ~]$ ./Unsync 10 +RTS -N1
Best steps / second = 334; Sum steps / second = 2852
Best steps / second = 332; Sum steps / second = 3100
Best steps / second = 334; Sum steps / second = 3082
Best steps / second = 335; Sum steps / second = 3176
Best steps / second = 335; Sum steps / second = 3186
[mrd@system ~]$ ./Unsync 10 +RTS -N2
Best steps / second = 594; Sum steps / second = 5577
Best steps / second = 669; Sum steps / second = 5631
Best steps / second = 588; Sum steps / second = 5641
Best steps / second = 622; Sum steps / second = 5657
Best steps / second = 604; Sum steps / second = 5639
[mrd@system ~]$ ./Unsync 10 +RTS -N3
Best steps / second = 702; Sum steps / second = 5846
Best steps / second = 692; Sum steps / second = 5865
Best steps / second = 717; Sum steps / second = 5884
Best steps / second = 679; Sum steps / second = 5893
Best steps / second = 745; Sum steps / second = 5913
[mrd@system ~]$ ./Unsync 10 +RTS -N4
Best steps / second = 949; Sum steps / second = 7133
Best steps / second = 958; Sum steps / second = 7198
Best steps / second = 989; Sum steps / second = 7189
Best steps / second = 906; Sum steps / second = 7155
Best steps / second = 964; Sum steps / second = 7181
Observations
Number of steps is proportional to number of processors, and inversely proportional to number of threads.
- Login to post comments
Synchronized threads, part I
I'm performing a small experiment with synchronization of threads: each thread is in a loop performing a "step" each time; the overall idea is to synchronize all threads such that each performs in parallel lock-step.
In this example, the threads are spawned by forkIO, and the synchronization is maintained with the use of the Software Transactional Memory library for MVars.
> import Control.Monad
> import Control.Concurrent
> import Control.Concurrent.STM
> import Data.List
> import System.Time
> import System.Environment
> import System.IO
> import System.Random
> import Text.Printf
> import Ratio
oneThread executes the steps of one thread while remaining in synchronization
with the rest. putTMVar will block until the TMVar is empty. Executes
a supplied function inside of the synchronization logic, for every step.
> oneThread :: TMVar Int -> Int -> a -> (a -> a) -> IO ()
> oneThread syncv n v f = do
> atomically $ putTMVar syncv n
> let v' = f v
> v' `seq` oneThread syncv (n + 1) v' f
sync performs one step of synchronization ensuring that all the threads are
working on the same step number. Note that this function is written in
the STM monad. It is meant to execute as one atomic block. That means
it will block until all TMVars are filled by their threads. It won't stop
other threads from running and filling in their TMVars, and it won't
touch any of the TMVars until all of them are ready.
STM makes writing this a complete breeze. No worries about strange locking
issues, it just does the right thing. The key portion is dead simple:
mapM takeTMVar vars. It's functional, it's reusing monadic
combinators, and it's easy.
> sync :: (Eq a) => [TMVar a] -> a -> STM Bool
> sync vars n = do
> vals <- mapM takeTMVar vars
> return $ all (==n) vals
Initialize k threads each with a TMVar for synchronization.
> initialize :: Int -> a -> (a -> a) -> IO ([ThreadId], [TMVar Int])
> initialize k v f = do
> vars <- atomically (forM [1..k]
> (\_ -> newEmptyTMVar))
> thds <- forM vars
> (\var -> forkIO (oneThread var 0 v f))
> return (thds, vars)
simpleSyncLoop only terminates if the threads ever become unsynchronized.
> simpleSyncLoop vars n = do
> ok <- atomically $ sync vars n
> if ok then do
> printf "Synchronized at step = %d\n" n
> simpleSyncLoop vars $ n + 1
> else
> printf "Unsynchronized at step = %d\n" n
A computational time-waster to simulate "real work".
Pops a value off the random list and takes the factorial of it.
> computation l = let (v:l') = l
> in fact v `seq` l'
>
> fact n = product [1..n]
A simple main function which starts 10 threads and runs the test forever.
The computation is initialized with an infinite list of random numbers
between 500 and 600.
> simpleMain = do
> g <- newStdGen
> (_,vars) <- initialize 10 (randomRs (500,600) g) computation
> simpleSyncLoop vars 0
timingSyncLoop attempts to count the number of steps taken per second.
(Note: using the TOD (time-of-day) constructor directly like this is a GHC-specific extension)
> timingSyncLoop vars n = do
> -- TOD seconds picoseconds
> TOD s ps <- getClockTime
> loop (fromIntegral s + ps%(10^12)) n n
> where
> noThds = length vars
> loop prevTime prevN n = do
> TOD s ps <- getClockTime
> let now = fromIntegral s + ps%(10^12)
> tdiff = now - prevTime
> ndiff = fromIntegral $ n - prevN
> sps = floor (ndiff / tdiff) :: Int
> if tdiff >= 1 then
> do printf "Steps / sec each: %d; Steps / sec total: %d\n" sps (sps * noThds)
> hFlush stdout
> loop now n n
> else
> do ok <- atomically $ sync vars n
> if ok then do
> loop prevTime prevN $ n + 1
> else
> printf "Unsynchronized at step = %d\n" n
Examines the first command line argument to determine how many threads to
create, defaulting with 10. Initializes the threads and runs the timingSyncLoop
indefinitely.
> timingWithArgMain = do
> args <- getArgs
> let n = case args of
> [] -> 10
> a:_ -> read a
> g <- newStdGen
> (_,vars) <- initialize n (randomRs (500,600) g) computation
> timingSyncLoop vars 0
> main :: IO ()
> main = timingWithArgMain
System is a 4-way Xeon 3.6GHz.
[mrd@system ~]$ ghc --make -O2 -threaded Sync.lhs
[mrd@system ~]$ ./Sync 1 +RTS -N1
Steps / sec each: 2978; Steps / sec total: 2978
Steps / sec each: 2974; Steps / sec total: 2974
Steps / sec each: 2968; Steps / sec total: 2968
Steps / sec each: 2953; Steps / sec total: 2953
Steps / sec each: 2939; Steps / sec total: 2939
[mrd@system ~]$ ./Sync 1 +RTS -N2
Steps / sec each: 3301; Steps / sec total: 3301
Steps / sec each: 3297; Steps / sec total: 3297
Steps / sec each: 3279; Steps / sec total: 3279
Steps / sec each: 3286; Steps / sec total: 3286
Steps / sec each: 3254; Steps / sec total: 3254
[mrd@system ~]$ ./Sync 1 +RTS -N3
Steps / sec each: 3332; Steps / sec total: 3332
Steps / sec each: 3311; Steps / sec total: 3311
Steps / sec each: 3409; Steps / sec total: 3409
Steps / sec each: 3492; Steps / sec total: 3492
Steps / sec each: 3456; Steps / sec total: 3456
[mrd@system ~]$ ./Sync 1 +RTS -N4
Steps / sec each: 3374; Steps / sec total: 3374
Steps / sec each: 3515; Steps / sec total: 3515
Steps / sec each: 3471; Steps / sec total: 3471
Steps / sec each: 3452; Steps / sec total: 3452
Steps / sec each: 3418; Steps / sec total: 3418
[mrd@system ~]$ ./Sync 5 +RTS -N1
Steps / sec each: 659; Steps / sec total: 3295
Steps / sec each: 649; Steps / sec total: 3245
Steps / sec each: 655; Steps / sec total: 3275
Steps / sec each: 649; Steps / sec total: 3245
Steps / sec each: 653; Steps / sec total: 3265
[mrd@system ~]$ ./Sync 5 +RTS -N2
Steps / sec each: 947; Steps / sec total: 4735
Steps / sec each: 813; Steps / sec total: 4065
Steps / sec each: 874; Steps / sec total: 4370
Steps / sec each: 901; Steps / sec total: 4505
Steps / sec each: 803; Steps / sec total: 4015
[mrd@system ~]$ ./Sync 5 +RTS -N3
Steps / sec each: 1114; Steps / sec total: 5570
Steps / sec each: 914; Steps / sec total: 4570
Steps / sec each: 993; Steps / sec total: 4965
Steps / sec each: 1020; Steps / sec total: 5100
Steps / sec each: 983; Steps / sec total: 4915
[mrd@system ~]$ ./Sync 5 +RTS -N4
Steps / sec each: 994; Steps / sec total: 4970
Steps / sec each: 833; Steps / sec total: 4165
Steps / sec each: 899; Steps / sec total: 4495
Steps / sec each: 787; Steps / sec total: 3935
Steps / sec each: 878; Steps / sec total: 4390
[mrd@system ~]$ ./Sync 10 +RTS -N1
Steps / sec each: 286; Steps / sec total: 2860
Steps / sec each: 316; Steps / sec total: 3160
Steps / sec each: 314; Steps / sec total: 3140
Steps / sec each: 313; Steps / sec total: 3130
Steps / sec each: 302; Steps / sec total: 3020
Sync: interrupted
[mrd@system ~]$ ./Sync 10 +RTS -N2
Steps / sec each: 563; Steps / sec total: 5630
Steps / sec each: 557; Steps / sec total: 5570
Steps / sec each: 562; Steps / sec total: 5620
Steps / sec each: 558; Steps / sec total: 5580
Steps / sec each: 563; Steps / sec total: 5630
[mrd@system ~]$ ./Sync 10 +RTS -N3
Steps / sec each: 568; Steps / sec total: 5680
Steps / sec each: 562; Steps / sec total: 5620
Steps / sec each: 561; Steps / sec total: 5610
Steps / sec each: 567; Steps / sec total: 5670
Steps / sec each: 550; Steps / sec total: 5500
[mrd@system ~]$ ./Sync 10 +RTS -N4
Steps / sec each: 555; Steps / sec total: 5550
Steps / sec each: 516; Steps / sec total: 5160
Steps / sec each: 436; Steps / sec total: 4360
Steps / sec each: 514; Steps / sec total: 5140
Steps / sec each: 488; Steps / sec total: 4880
- Login to post comments