mrd's blog

Dynamic Programming in Haskell

Submitted by mrd on Thu, 02/22/2007 - 8:57pm.

Dynamic Programming is an algorithm design strategy which can be essentially described as breaking a problem down into subproblems and re-using previously computed results. Sometimes, a problem which appears to take exponential time to solve can be reformulated using a Dynamic Programming approach in which it becomes tractable. The benefit is especially clear when the subproblem solutions overlap considerably. The technique of memoization is a major time-saver in those cases.

A common Haskell newbie question is: how do I memoize? At first, it appears to be a very difficult problem, because access to mutable arrays and hashtables is restricted. It is important to realize that lazy evaluation is actually memoization itself and can be leveraged in that way for the purposes of Dynamic Programming. In fact, as a result, the expression of these algorithms can be more natural and lucid in Haskell than in a strict language.

Here, I am going to examine the classic ``knapsack problem.'' Given a number of items, their values, and their sizes -- what is the best combination of items that you can fit in a limited-size knapsack?

> module Knapsack where > import Control.Monad > import Data.Array > import Data.List > import Test.QuickCheck
I am going to represent items with this data type. Essentially, it is just a tuple of the item itself, its value, and its size. > data Item a = Item { item :: a, > itemValue :: Int, > itemSize :: Int } > deriving (Eq, Show, Ord)
Cells will be used both to represent the solution to the knapsack problem, and as individual cells in the matrix for the Dynamic Programming algorithm. It is a pair consisting of: summed values, and the items in the sack. > data Cell a = Cell (Int, [Item a]) > deriving (Eq, Show, Ord)
This powerset definition is a very neat use of the List monad (which I pulled from the Haskell wiki). You can think of it as saying: for each element in the list, half the possible subsets will include it, and half will not. > powerset :: [a] -> [[a]] > powerset = filterM (const [True, False])
brutepack considers the powerset of the items, cutting out those subsets which are too large in size, and picking the most valuable subset left. As you might figure, this is going to run in O(2^n) thanks to the use of powerset. The definition should be simple to understand and will provide a sound basis for testing the Dynamic Programming alternative. > brutepack :: (Ord a) => Int -> [Item a] -> Cell a > brutepack size items = maximum [ > cellOf subset | > subset <- powerset items, itemsFit subset > ] > where > itemsFit items = sum (map itemSize items) <= size > cellOf items = Cell (sum (map itemValue items), items)

The Dynamic Programming algorithm is as follows:

Consider a matrix where the rows are indexed by size and the columns by items. The rows range from 1 to the size of the knapsack, and the columns are one-to-one with the items in the list.


value = $30     $20    $40
 size =  4       3      5
_item_|_lion___tiger___bear_
      |
     1|
      |
     2|
      |
     3|
      |
     4|        v(2,4)<--.
      |                 |
     5|                 |
      |                 |
     6|                 |
      |                 |
     7|                 |
      |              <--|    
     8|        v(2,8) v(3,8)
      |

This is a diagram where the knapsack has a maximum size allowance of 8, and we want to stuff some animals in it. Each element in the matrix is going to tell us the best value of items by size. That means the answer to the whole problem is going to be found in v(3,8) which is the bottom-rightmost corner.

The value of any one cell in the matrix will be decided by whether it is worthwhile to add that item to the sack or not. In the v(3,8) cell it compares the v(2,8) cell to the left to the v(2,4) cell up above. The v(2,8) cell has no room for the bear, and the v(2,4) cell represents the situation where the bear will fit. So the question, after determining if the bear will fit in the bag at all, is: is value of bear + v(2,4) better than v(2,8)?

This definition of v lends itself to a very nice recursive formulation.

             
              / v(m-1,n)                    if (s_n) > n
              \ 
    v(m,n) = -|      / v(m-1,n)
              /      \
              \ max -|                      otherwise
                     /
                     \ v(m-1,n-(s_n)) + v_n
where s_n is the size of item n and v_n is its value.

A typical implementation of this algorithm might initialize a 2D array to some value representing ``undefined.'' In Haskell, we can initialize the entire array to the correct value directly and recursively because it will not be computed until needed. All that is necessary is to express the data-dependencies, and the order of evaluation will take care of itself.

This code takes the algorithm a little further by tracking a field in each cell that contains a list of the items in the sack at that point.

> dynapack :: Int -> [Item a] -> Cell a > dynapack size items = valOf noItems size > where > noItems = length items > itemsArr = listArray (1,noItems) items > itemNo n = itemsArr ! n > > table = array ((1,1),(noItems,size)) $ > [ > ((m, n), cell m n) | > m <- [1..noItems], > n <- [1..size] > ] > > valOf m n > | m < 1 || n < 1 = Cell (0, []) > | otherwise = table ! (m,n) > > cell m n = > case itemNo m of > i@(Item _ v s) > | s > n || > vL >= vU + v -> Cell (vL , isL) > | otherwise -> Cell (vU + v, i:isU) > where > Cell (vL, isL) = valOf (m - 1) n > Cell (vU, isU) = valOf (m - 1) (n - s)

The matrix is defined in the variable table and valOf is our function v here. This definition very naturally follows from the algorithmic description because there is no problem with self-reference when defining cells in the array. In a strict language, the programmer would have to manually check for the presence of values and fill in the table.

Testing

It's important to be confident that the algorithm is coded correctly. Let's see if brutepack and dynapack agree on test inputs. I will define my own simple data type to customize the randomly generated test data.

> newtype TestItems = TestItems [(Int, Int, Int)] > deriving (Eq, Show, Ord)
> nubWithKey k = nubBy (\a b -> k a == k b) > fst3 (a,b,c) = a > tripleToItem (i,v,s) = Item i v s

The Arbitrary class expects you to define your customly generated test data in the Gen monad. QuickCheck provides a number of handy combinators, and of course you can use normal monadic functions.

sized is a QuickCheck combinator which binds the generator's notion of "size" to a parameter of the supplied function. This notion of "size" is a hint to test-data creators that QuickCheck wants data on the "order of" that size. Of course, what "size" means can be freely interpreted by the author of the function, in this case I am using it for a couple purposes.

The basic idea is simply: create a list of randomly generated tuples of length "size", and choose values and item-sizes randomly from (1, "size"). Notice how the randomly generated tuple is replicated with the monadic combinator replicateM. Then, before returning, just make sure that there are no "repeated items" by running nubWithKey fst3 over the generated list. That will cut out any items with the same name as previous items.

> instance Arbitrary TestItems where > arbitrary = sized $ \n -> do > items <- replicateM n > $ do > i <- arbitrary > v <- choose (1, n) > s <- choose (1, n) > return $ (i, v, s) > return . TestItems . nubWithKey fst3 $ items

With an Arbitrary instance, we can now define a property: it extracts the tuples and creates Items out of them, then tries out both algorithms for equivalence. Note that I am only checking the final values, not the actual items, because there may be more than one solution of the same value.

> prop_effectivePacking (TestItems items) = v1 == v2 > where items' = map tripleToItem items > Cell (v1,_) = brutepack 16 items' > Cell (v2,_) = dynapack 16 items'
Knapsack> verboseCheck prop_effectivePacking 0: TestItems [(1,2,3),(3,2,2)] 1: TestItems [] 2: TestItems [(1,1,1)] 3: TestItems [(2,1,2),(-1,2,3),(0,2,3)] 4: TestItems [(-2,2,2),(-1,1,1),(3,2,3),(4,2,2)] ...

It will progressively check larger "size" samples and you will notice that the brute force algorithm is going to start dragging down performance mightily. On my computer, in the ghci interpreter, brutepack on even just 20 items takes 10 seconds; while dynapack takes almost no time at all.

Conclusion

Algorithms making use of Dynamic Programming techniques are often expressed in the literature in an imperative style. I have demonstrated an example of one such algorithm in a functional language without resorting to any imperative features. The result is a natural recursive expression of the algorithm, that has its advantage from the use of lazy evaluation.

Simple Parsec Example: HTMangL

Submitted by mrd on Sun, 01/28/2007 - 11:01pm.
::

Writing bird-style Literate Haskell is working out pretty well for me. Actually, I prefer latex-style, but bird-style works well for short articles with little to no math. Bird-style is also fairly easy to integrate with HTML markup; except for the fact that it uses '>' to designate code. Browsers vary, but most seem to handle the non-XHTML compliant markup with few snags. However, I got sick of dealing with those snags as they can be difficult to spot on occasion.

I put together a small program to deal with this mess, and also as a small Parsec usage tutorial. This program processes bird-style Literate Haskell input and outputs a converted version with the code-blocks surrounded by <code> tags while also converting >, <, and & to entity designators.

> module Main where > import Text.ParserCombinators.Parsec

A few combinators are built up from smaller ones. eol and tilEOL parse the remaining characters in a line up to and including the newline character (or EOF).

> eol = newline <|> (eof >> return '\n') > tilEOL = manyTill (noneOf "\n") eol

A line of code begins with "> " and continues til EOL.

> codeLine = do > string "> " > code <- tilEOL > return $ "> " ++ code

A non-blank literate line can begin with any character but newline, and if it begins with '>' then it cannot be followed by a space. To those coming from imperative backgrounds, the return () does not return from the function but rather returns () to the monad; here it is used as a no-op. The rest of the line is treated as above.

> litLine = do > ch <- noneOf "\n" > if ch == '>' then > notFollowedBy space > else > return () > text <- tilEOL > return $ ch:text

A blank line is one which begins with a newline.

> blankLine = char '\n' >> return ""

Blocks of code and literate lines (or blanks) are simply multiple consecutive lines (at least 1).

> code = many1 (try codeLine) > lit = many1 (try litLine <|> blankLine)
> data LiterateCode = Literate [String] > | Code [String] > deriving (Show, Eq)

A literate Haskell file is composed of many Code and Literate blocks. These are unified in one disjoint type, LiterateCode, and the combinator below ensures that the appropriate tag is applied to the results of parsing.

> literateCode = many (Code `fmap` code <|> Literate `fmap` lit)

A block of literate text is printed literally, but code must be processed slightly.

> printBlock (Literate ls) = mapM_ putStrLn ls > printBlock (Code cs) = do > putStrLn "<code>\n" > mapM_ (putStrLn . subEntities) cs > putStrLn "\n</code><br/>"

In case you were wondering how this works: it maps the function over each character in the input string and concatenates the resulting list of strings.

> subEntities = (>>= \c -> > case c of > '>' -> "&gt;" > '<' -> "&lt;" > '&' -> "&amp;" > c -> [c])

Really simple: work on stdin, print to stdout.

> main = do > s <- getContents > case parse literateCode "stdin" s of > Left err -> putStr "Error: " >> print err > Right cs -> mapM_ printBlock cs

Naturally, the first candidate code to run this program on is this program itself: I call it HTMangL.

A simple TCP server

Submitted by mrd on Fri, 01/19/2007 - 10:19pm.

A simple TCP server which accepts multiple clients and echos input text back to all those connected. It uses threads to manage multiple handles, and Software Transactional Memory to pass messages.

This text is literate Haskell, and has been tested with ghc 6.6 on Linux/x86. Type annotations are included for didactic purposes.

> module Main where > import Prelude hiding (catch)

Module Network is the simple networking library, presenting a Handle-based interface.

> import Network (listenOn, accept, sClose, Socket, > withSocketsDo, PortID(..)) > import System.IO > import System.Environment (getArgs) > import Control.Exception (finally, catch) > import Control.Concurrent > import Control.Concurrent.STM > import Control.Monad (forM, filterM, liftM, when)

A simple main to parse a port number from the command line, and fire up the server socket.

> main = withSocketsDo $ do > [portStr] <- getArgs > let port = fromIntegral (read portStr :: Int) > servSock <- listenOn $ PortNumber port > putStrLn $ "listening on: " ++ show port > start servSock `finally` sClose servSock
> start servSock = do > acceptChan <- atomically newTChan > forkIO $ acceptLoop servSock acceptChan > mainLoop servSock acceptChan []

acceptLoop manages the server socket, accepting connections, starting client threads, and forwarding the relevant information about them over the channel so the main loop can multiplex it all together.

> type Client = (TChan String, Handle) > > acceptLoop :: Socket -> TChan Client -> IO () > acceptLoop servSock chan = do > (cHandle, host, port) <- accept servSock > cChan <- atomically newTChan > cTID <- forkIO $ clientLoop cHandle cChan > atomically $ writeTChan chan (cChan, cHandle) > acceptLoop servSock chan

As before, each client gets a loop which reads from the handle and pumps the data right into a channel. However, this time, exception handling is done per-thread; if a client disconnects we just want the thread to die silently. A more clever implementation might have a more structured channel which allows it to indicate when the client disconnects.

> clientLoop :: Handle -> TChan String -> IO () > clientLoop handle chan = > listenLoop (hGetLine handle) chan > `catch` (const $ return ()) > `finally` hClose handle > listenLoop :: IO a -> TChan a -> IO () > listenLoop act chan = > sequence_ (repeat (act >>= atomically . writeTChan chan))

STM conveniently allows composition of actions which makes custom tailoring of library code a snap. Here, I've added an additional action to check the status of the acceptChan along with all the clients. The acceptChan has a different type than any of the client channels, so I separate it from the others using an Either data type for simplicity. `fmap` here acts very much like (.), the functional composition operator.

> mainLoop :: Socket -> TChan Client -> [Client] -> IO () > mainLoop servSock acceptChan clients = do > r <- atomically $ (Left `fmap` readTChan acceptChan) > `orElse` > (Right `fmap` tselect clients) > case r of > Left (ch,h) -> do > putStrLn "new client" > mainLoop servSock acceptChan $ (ch,h):clients > Right (line,_) -> do > putStrLn $ "data: " ++ line

In addition to sending the data out to every client, this loop catches any errors from writing to handles and excludes that client from the list.

> clients' <- forM clients $ > \(ch,h) -> do > hPutStrLn h line > hFlush h > return [(ch,h)] > `catch` const (hClose h >> return []) > let dropped = length $ filter null clients' > when (dropped > 0) $ > putStrLn ("clients lost: " ++ show dropped) > mainLoop servSock acceptChan $ concat clients'

tselect is a function which multiplexes any number of TChans. It will return the data from whichever TChan it can read, along with the "key" value that can be supplied in the pair. This takes advantage of the STM combinators orElse and retry by applying them to a list of actions constructed around the TChans.

> tselect :: [(TChan a, t)] -> STM (a, t) > tselect = foldl orElse retry > . map (\(ch, ty) -> (flip (,) ty) `fmap` readTChan ch)

This code demonstrates a basic TCP server as well as a more generally applicable function tselect. It serves as an example of the strength and simplicity of the Software Transactional Memory model, and of network IO in Haskell.

A simple TCP client

Submitted by mrd on Thu, 01/18/2007 - 8:02pm.

A simple example network client showing how to multiplex the reading of lines from the remote peer and the local user, using Software Transactional Memory to do message-passing and light-weight threads.

This text is literate Haskell, and has been tested with ghc 6.6 on Linux/x86. Type annotations are included for didactic purposes.


> module Main where > import Prelude hiding (catch)

Module Network is the simple networking library, presenting a Handle-based interface.

> import Network (connectTo, withSocketsDo, PortID(..)) > import System.IO > import System.IO.Error (isEOFError) > import System.Environment (getArgs) > import Control.Exception (finally, catch, Exception(..)) > import Control.Concurrent > import Control.Concurrent.STM

main parses host and port from the command line and connects to it. Then it calls the start function with the socket handle. An error handler is defined which prints out exceptions, except for EOF. Finally, the socket is ensured to be closed.

withSocketsDo is required on Windows platforms, but does no harm otherwise.

> main = withSocketsDo $ do > [host, portStr] <- getArgs

PortNumbers are an instance of Num, but not Read. So, we read it as an Int, and then generalize to class Num using fromIntegral.

> let port = fromIntegral (read portStr :: Int) > sock <- connectTo host $ PortNumber port > start sock `catch` handler `finally` hClose sock > where > handler (IOException e) > | isEOFError e = return () > handler e = putStrLn $ show e

start takes care of the creation of threads and channels to communicate between them. Each thread spawned is responsible for listening to a given handle, and forwarding any communications received along the channel. Notice, in particular, how this listening task has been abstracted into a higher-order monadic function. The main thread is then used as the central "coordinating" loop, as discussed below.

> start :: Handle -> IO () > start sock = do > netChan <- atomically newTChan > userChan <- atomically newTChan > netTID <- spawn $ listenLoop (hGetLine sock) netChan > userTID <- spawn $ listenLoop getLine userChan > mainLoop sock netChan userChan

spawn is a small wrapper around forkIO which adds a small thread-specific exception handler that simply passes any exceptions along to the main thread.

(Note: myThreadId is GHC-specific)

> spawn :: IO () -> IO ThreadId > spawn act = do > mainTID <- myThreadId > forkIO $ act `catch` throwTo mainTID

listenLoop pipes the output of calling an action repeatedly into a channel.

Read literally: listenLoop repeats forever, in sequence, the action of invoking act and then atomically writing its value to the channel.

> listenLoop :: IO a -> TChan a -> IO () > listenLoop act chan = > sequence_ (repeat (act >>= atomically . writeTChan chan))

Here is an explicit-recursion version of the above:

listenLoop = do v <- action atomically $ writeTChan chan v listenLoop action chan

Understanding why both versions of listenLoop are equivalent will help you understand that monadic actions in Haskell are first-class values.

mainLoop demonstrates the usage of a simple select function which awaits input from one of two channels.

> mainLoop :: Handle -> TChan String -> TChan String -> IO () > mainLoop sock netChan userChan = do > input <- atomically $ select netChan userChan > case input of > -- from netChan, to user > Left str -> putStrLn str >> hFlush stdout > -- from userChan, to net > Right str -> hPutStrLn sock str >> hFlush sock > mainLoop sock netChan userChan

select multiplexes two TChans using the STM combinator orElse.

Read plainly, it states that ch1 should be consulted first, or else, ch2 should be read. Any values from ch1 are tagged with Left and any values from ch2 are tagged with Right.

The reason why this works seamlessly is because STM will keep track of which channels it has attempted to read. If both have nothing available right now, then STM knows it can block until one of the two channels has data ready.

> select :: TChan a -> TChan b -> STM (Either a b) > select ch1 ch2 = do > a <- readTChan ch1; return (Left a) > `orElse` do > b <- readTChan ch2; return (Right b)

Transcript:


$ ghc --make Client1.lhs [1 of 1] Compiling Main ( Client1.lhs, Client1.o ) Linking Client1 ... $ ./Client1 localhost 25 220 localhost ESMTP Exim 3.36 helo localhost 250 localhost Hello mrd at localhost [127.0.0.1] quit 221 localhost closing connection $

This code has demonstrated a simple TCP client which can receive and transmit lines in an interactive session. The implementation used light-weight threads to multiplex handles and Software Transactional Memory for inter-thread communication. Several re-usable functions were defined which show the expressive power and simplicity of STM and higher-order monadic functions.

Synchronized threads, part II

Submitted by mrd on Fri, 01/05/2007 - 6:34pm.

For comparison, here is an implementation of multiple threads of which each attempt to perform as many steps as possible in 1 second.


> import Control.Monad > import Control.Concurrent > import Control.Concurrent.STM > import Data.List > import Data.IORef > import System.Time > import System.Environment > import System.IO > import System.Random > import Text.Printf > import Ratio

oneThread greedily attempts to loop through as many steps as possible until one second has elapsed. Then it blocks while it waits for the main thread to collect the previous result, so it can put the new result in the TMVar. Every step it takes, it executes the supplied function parameter f.

> oneThread :: TMVar Int -> Int -> a -> (a -> a) -> IO () > oneThread mvar n v f = do > TOD s ps <- getClockTime > loop (fromIntegral s + ps%(10^12)) n n v > where > loop prevTime prevN n v = do > TOD s ps <- getClockTime > let now = fromIntegral s + ps%(10^12) > tdiff = now - prevTime > ndiff = fromIntegral $ n - prevN > sps = floor (ndiff / tdiff) > v' = f v > if tdiff >= 1 then > do atomically $ putTMVar mvar sps > loop now n n v > else v' `seq` loop prevTime prevN (n + 1) v'

nosync is akin to sync in that it is an STM action which collects results from all the threads via the TMVars. Again, the key portion is easy: mapM takeTMVar mvars.

> nosync :: (Num a, Ord a) => [TMVar a] -> STM (a, a) > nosync mvars = do > vals <- mapM takeTMVar mvars > return $ (maximum vals, sum vals)
> initialize :: Int -> a -> (a -> a) -> IO ([ThreadId], [TMVar Int]) > initialize k v f = do > mvars <- atomically (forM [1..k] > (\_ -> newEmptyTMVar)) > thds <- forM (zip mvars [1..k]) > (\(ch, n) -> forkIO (oneThread ch 0 v f)) > return (thds, mvars)

nosyncLoop waits for all the threads to place a value into their TMVar, which will happen after one second.

> nosyncLoop :: [TMVar Int] -> IO () > nosyncLoop mvars = do > (best, sum) <- atomically $ nosync mvars > printf "Best steps / second = %d; Sum steps / second = %d\n" best sum > hFlush stdout > nosyncLoop mvars

A computational time-waster to simulate "real work".

> computation l = let (v:l') = l > in fact v `seq` l' > > fact n = product [1..n]
> main :: IO () > main = do > args <- getArgs > let n = case args of > [] -> 10 > a:_ -> read a > g <- newStdGen > (_,mvars) <- initialize n (randomRs (500,600) g) computation > nosyncLoop mvars

System is a 4-way Xeon 3.6GHz.


[mrd@system ~]$ ghc --make -O2 -threaded Unsync.lhs

[mrd@system ~]$ ./Unsync 1 +RTS -N1 Best steps / second = 3179; Sum steps / second = 3179 Best steps / second = 3181; Sum steps / second = 3181 Best steps / second = 3178; Sum steps / second = 3178 Best steps / second = 3175; Sum steps / second = 3175 Best steps / second = 3174; Sum steps / second = 3174 [mrd@system ~]$ ./Unsync 1 +RTS -N2 Best steps / second = 3142; Sum steps / second = 3142 Best steps / second = 3168; Sum steps / second = 3168 Best steps / second = 3174; Sum steps / second = 3174 Best steps / second = 3177; Sum steps / second = 3177 Best steps / second = 3172; Sum steps / second = 3172

[mrd@system ~]$ ./Unsync 5 +RTS -N1 Best steps / second = 635; Sum steps / second = 3071 Best steps / second = 638; Sum steps / second = 3094 Best steps / second = 668; Sum steps / second = 3080 Best steps / second = 669; Sum steps / second = 3184 Best steps / second = 751; Sum steps / second = 3181 [mrd@system ~]$ ./Unsync 5 +RTS -N2 Best steps / second = 1429; Sum steps / second = 5601 Best steps / second = 1434; Sum steps / second = 5647 Best steps / second = 1446; Sum steps / second = 5647 Best steps / second = 1413; Sum steps / second = 5647 Best steps / second = 1502; Sum steps / second = 5639 [mrd@system ~]$ ./Unsync 5 +RTS -N3 Best steps / second = 1912; Sum steps / second = 5792 Best steps / second = 2092; Sum steps / second = 5934 Best steps / second = 2107; Sum steps / second = 5938 Best steps / second = 1959; Sum steps / second = 5922 Best steps / second = 2068; Sum steps / second = 5960 [mrd@system ~]$ ./Unsync 5 +RTS -N4 Best steps / second = 1876; Sum steps / second = 7428 Best steps / second = 1865; Sum steps / second = 7402 Best steps / second = 1891; Sum steps / second = 7420 Best steps / second = 1895; Sum steps / second = 7581 Best steps / second = 1899; Sum steps / second = 7602

[mrd@system ~]$ ./Unsync 10 +RTS -N1 Best steps / second = 334; Sum steps / second = 2852 Best steps / second = 332; Sum steps / second = 3100 Best steps / second = 334; Sum steps / second = 3082 Best steps / second = 335; Sum steps / second = 3176 Best steps / second = 335; Sum steps / second = 3186 [mrd@system ~]$ ./Unsync 10 +RTS -N2 Best steps / second = 594; Sum steps / second = 5577 Best steps / second = 669; Sum steps / second = 5631 Best steps / second = 588; Sum steps / second = 5641 Best steps / second = 622; Sum steps / second = 5657 Best steps / second = 604; Sum steps / second = 5639 [mrd@system ~]$ ./Unsync 10 +RTS -N3 Best steps / second = 702; Sum steps / second = 5846 Best steps / second = 692; Sum steps / second = 5865 Best steps / second = 717; Sum steps / second = 5884 Best steps / second = 679; Sum steps / second = 5893 Best steps / second = 745; Sum steps / second = 5913 [mrd@system ~]$ ./Unsync 10 +RTS -N4 Best steps / second = 949; Sum steps / second = 7133 Best steps / second = 958; Sum steps / second = 7198 Best steps / second = 989; Sum steps / second = 7189 Best steps / second = 906; Sum steps / second = 7155 Best steps / second = 964; Sum steps / second = 7181

Observations

Number of steps is proportional to number of processors, and inversely proportional to number of threads.

Synchronized threads, part I

Submitted by mrd on Wed, 01/03/2007 - 8:51pm.

I'm performing a small experiment with synchronization of threads: each thread is in a loop performing a "step" each time; the overall idea is to synchronize all threads such that each performs in parallel lock-step.

In this example, the threads are spawned by forkIO, and the synchronization is maintained with the use of the Software Transactional Memory library for MVars.


> import Control.Monad > import Control.Concurrent > import Control.Concurrent.STM > import Data.List > import System.Time > import System.Environment > import System.IO > import System.Random > import Text.Printf > import Ratio

oneThread executes the steps of one thread while remaining in synchronization with the rest. putTMVar will block until the TMVar is empty. Executes a supplied function inside of the synchronization logic, for every step.

> oneThread :: TMVar Int -> Int -> a -> (a -> a) -> IO () > oneThread syncv n v f = do > atomically $ putTMVar syncv n > let v' = f v > v' `seq` oneThread syncv (n + 1) v' f

sync performs one step of synchronization ensuring that all the threads are working on the same step number. Note that this function is written in the STM monad. It is meant to execute as one atomic block. That means it will block until all TMVars are filled by their threads. It won't stop other threads from running and filling in their TMVars, and it won't touch any of the TMVars until all of them are ready.

STM makes writing this a complete breeze. No worries about strange locking issues, it just does the right thing. The key portion is dead simple: mapM takeTMVar vars. It's functional, it's reusing monadic combinators, and it's easy.

> sync :: (Eq a) => [TMVar a] -> a -> STM Bool > sync vars n = do > vals <- mapM takeTMVar vars > return $ all (==n) vals

Initialize k threads each with a TMVar for synchronization.

> initialize :: Int -> a -> (a -> a) -> IO ([ThreadId], [TMVar Int]) > initialize k v f = do > vars <- atomically (forM [1..k] > (\_ -> newEmptyTMVar)) > thds <- forM vars > (\var -> forkIO (oneThread var 0 v f)) > return (thds, vars)

simpleSyncLoop only terminates if the threads ever become unsynchronized.

> simpleSyncLoop vars n = do > ok <- atomically $ sync vars n > if ok then do > printf "Synchronized at step = %d\n" n > simpleSyncLoop vars $ n + 1 > else > printf "Unsynchronized at step = %d\n" n

A computational time-waster to simulate "real work".

Pops a value off the random list and takes the factorial of it.

> computation l = let (v:l') = l > in fact v `seq` l' > > fact n = product [1..n]

A simple main function which starts 10 threads and runs the test forever. The computation is initialized with an infinite list of random numbers between 500 and 600.

> simpleMain = do > g <- newStdGen > (_,vars) <- initialize 10 (randomRs (500,600) g) computation > simpleSyncLoop vars 0

timingSyncLoop attempts to count the number of steps taken per second.

(Note: using the TOD (time-of-day) constructor directly like this is a GHC-specific extension)

> timingSyncLoop vars n = do > -- TOD seconds picoseconds > TOD s ps <- getClockTime > loop (fromIntegral s + ps%(10^12)) n n > where > noThds = length vars > loop prevTime prevN n = do > TOD s ps <- getClockTime > let now = fromIntegral s + ps%(10^12) > tdiff = now - prevTime > ndiff = fromIntegral $ n - prevN > sps = floor (ndiff / tdiff) :: Int > if tdiff >= 1 then > do printf "Steps / sec each: %d; Steps / sec total: %d\n" sps (sps * noThds) > hFlush stdout > loop now n n > else > do ok <- atomically $ sync vars n > if ok then do > loop prevTime prevN $ n + 1 > else > printf "Unsynchronized at step = %d\n" n

Examines the first command line argument to determine how many threads to create, defaulting with 10. Initializes the threads and runs the timingSyncLoop indefinitely.

> timingWithArgMain = do > args <- getArgs > let n = case args of > [] -> 10 > a:_ -> read a > g <- newStdGen > (_,vars) <- initialize n (randomRs (500,600) g) computation > timingSyncLoop vars 0
> main :: IO () > main = timingWithArgMain

System is a 4-way Xeon 3.6GHz.


[mrd@system ~]$ ghc --make -O2 -threaded Sync.lhs

[mrd@system ~]$ ./Sync 1 +RTS -N1 Steps / sec each: 2978; Steps / sec total: 2978 Steps / sec each: 2974; Steps / sec total: 2974 Steps / sec each: 2968; Steps / sec total: 2968 Steps / sec each: 2953; Steps / sec total: 2953 Steps / sec each: 2939; Steps / sec total: 2939 [mrd@system ~]$ ./Sync 1 +RTS -N2 Steps / sec each: 3301; Steps / sec total: 3301 Steps / sec each: 3297; Steps / sec total: 3297 Steps / sec each: 3279; Steps / sec total: 3279 Steps / sec each: 3286; Steps / sec total: 3286 Steps / sec each: 3254; Steps / sec total: 3254 [mrd@system ~]$ ./Sync 1 +RTS -N3 Steps / sec each: 3332; Steps / sec total: 3332 Steps / sec each: 3311; Steps / sec total: 3311 Steps / sec each: 3409; Steps / sec total: 3409 Steps / sec each: 3492; Steps / sec total: 3492 Steps / sec each: 3456; Steps / sec total: 3456 [mrd@system ~]$ ./Sync 1 +RTS -N4 Steps / sec each: 3374; Steps / sec total: 3374 Steps / sec each: 3515; Steps / sec total: 3515 Steps / sec each: 3471; Steps / sec total: 3471 Steps / sec each: 3452; Steps / sec total: 3452 Steps / sec each: 3418; Steps / sec total: 3418

[mrd@system ~]$ ./Sync 5 +RTS -N1 Steps / sec each: 659; Steps / sec total: 3295 Steps / sec each: 649; Steps / sec total: 3245 Steps / sec each: 655; Steps / sec total: 3275 Steps / sec each: 649; Steps / sec total: 3245 Steps / sec each: 653; Steps / sec total: 3265 [mrd@system ~]$ ./Sync 5 +RTS -N2 Steps / sec each: 947; Steps / sec total: 4735 Steps / sec each: 813; Steps / sec total: 4065 Steps / sec each: 874; Steps / sec total: 4370 Steps / sec each: 901; Steps / sec total: 4505 Steps / sec each: 803; Steps / sec total: 4015 [mrd@system ~]$ ./Sync 5 +RTS -N3 Steps / sec each: 1114; Steps / sec total: 5570 Steps / sec each: 914; Steps / sec total: 4570 Steps / sec each: 993; Steps / sec total: 4965 Steps / sec each: 1020; Steps / sec total: 5100 Steps / sec each: 983; Steps / sec total: 4915 [mrd@system ~]$ ./Sync 5 +RTS -N4 Steps / sec each: 994; Steps / sec total: 4970 Steps / sec each: 833; Steps / sec total: 4165 Steps / sec each: 899; Steps / sec total: 4495 Steps / sec each: 787; Steps / sec total: 3935 Steps / sec each: 878; Steps / sec total: 4390

[mrd@system ~]$ ./Sync 10 +RTS -N1 Steps / sec each: 286; Steps / sec total: 2860 Steps / sec each: 316; Steps / sec total: 3160 Steps / sec each: 314; Steps / sec total: 3140 Steps / sec each: 313; Steps / sec total: 3130 Steps / sec each: 302; Steps / sec total: 3020 Sync: interrupted [mrd@system ~]$ ./Sync 10 +RTS -N2 Steps / sec each: 563; Steps / sec total: 5630 Steps / sec each: 557; Steps / sec total: 5570 Steps / sec each: 562; Steps / sec total: 5620 Steps / sec each: 558; Steps / sec total: 5580 Steps / sec each: 563; Steps / sec total: 5630 [mrd@system ~]$ ./Sync 10 +RTS -N3 Steps / sec each: 568; Steps / sec total: 5680 Steps / sec each: 562; Steps / sec total: 5620 Steps / sec each: 561; Steps / sec total: 5610 Steps / sec each: 567; Steps / sec total: 5670 Steps / sec each: 550; Steps / sec total: 5500 [mrd@system ~]$ ./Sync 10 +RTS -N4 Steps / sec each: 555; Steps / sec total: 5550 Steps / sec each: 516; Steps / sec total: 5160 Steps / sec each: 436; Steps / sec total: 4360 Steps / sec each: 514; Steps / sec total: 5140 Steps / sec each: 488; Steps / sec total: 4880