Synchronized threads, part I

Submitted by mrd on Wed, 01/03/2007 - 8:51pm.

I'm running a small experiment in thread synchronization: each thread runs a loop, performing one "step" per iteration, and the goal is to keep all the threads in lock-step so that they work on the same step in parallel.

In this example, the threads are spawned by forkIO, and synchronization is maintained using TMVars from the Software Transactional Memory (STM) library.


> import Control.Monad
> import Control.Concurrent
> import Control.Concurrent.STM
> import Data.List
> import System.Time
> import System.Environment
> import System.IO
> import System.Random
> import Text.Printf
> import Data.Ratio

oneThread executes the steps of one thread while keeping it synchronized with the rest. putTMVar blocks until the TMVar is empty, so a thread cannot announce step n + 1 until its announcement of step n has been taken. On every step, the supplied function is applied inside the synchronization logic.

> oneThread :: TMVar Int -> Int -> a -> (a -> a) -> IO ()
> oneThread syncv n v f = do
>   atomically $ putTMVar syncv n
>   let v' = f v
>   v' `seq` oneThread syncv (n + 1) v' f
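
To make the blocking behaviour of putTMVar concrete, here is a minimal standalone sketch (not part of Sync.lhs). It uses tryPutTMVar, the non-blocking variant from the same library, to show when a putTMVar would or would not have to wait. The variable names are only illustrative.

```haskell
import Control.Concurrent.STM

main :: IO ()
main = do
  var <- newEmptyTMVarIO :: IO (TMVar Int)
  -- The TMVar is empty, so this put succeeds.
  first <- atomically (tryPutTMVar var 0)
  -- The TMVar is now full: putTMVar would block here, tryPutTMVar returns False.
  second <- atomically (tryPutTMVar var 1)
  print (first, second)   -- (True,False)
  -- Taking the value empties the TMVar again, so the next put succeeds.
  v <- atomically (takeTMVar var)
  third <- atomically (tryPutTMVar var 1)
  print (v, third)        -- (0,True)
```

In oneThread the blocking version is what we want: the thread simply waits until the synchronizer has taken the previous step number.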

sync performs one step of synchronization ensuring that all the threads are working on the same step number. Note that this function is written in the STM monad. It is meant to execute as one atomic block. That means it will block until all TMVars are filled by their threads. It won't stop other threads from running and filling in their TMVars, and it won't touch any of the TMVars until all of them are ready.

STM makes writing this a complete breeze. No worries about strange locking issues, it just does the right thing. The key portion is dead simple: mapM takeTMVar vars. It's functional, it's reusing monadic combinators, and it's easy.

> sync :: (Eq a) => [TMVar a] -> a -> STM Bool
> sync vars n = do
>   vals <- mapM takeTMVar vars
>   return $ all (==n) vals
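
The all-or-nothing behaviour of mapM takeTMVar can be seen in isolation with a small standalone sketch (again, not part of Sync.lhs). A helper thread fills three TMVars one at a time with a delay before each, and the main thread only gets past its atomic block once all three are full. The delay of 100000 microseconds and the Char payload are arbitrary choices for illustration.

```haskell
import Control.Concurrent (forkIO, threadDelay)
import Control.Concurrent.STM
import Control.Monad (forM_)

main :: IO ()
main = do
  vars <- atomically (mapM (const newEmptyTMVar) [1 .. 3 :: Int])
  -- Fill the TMVars one by one, pausing before each put.
  _ <- forkIO $ forM_ (zip vars "abc") $ \(var, c) -> do
         threadDelay 100000
         atomically (putTMVar var c)
  -- Retries until every TMVar is full, then takes all of them in one
  -- transaction. TMVars that were filled early are left untouched meanwhile.
  vals <- atomically (mapM takeTMVar vars)
  putStrLn vals   -- abc
```

If any takeTMVar in the list finds its TMVar empty, the whole transaction is retried, which is why no partial take is ever visible to the other threads.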

Initialize k threads each with a TMVar for synchronization.

> initialize :: Int -> a -> (a -> a) -> IO ([ThreadId], [TMVar Int])
> initialize k v f = do
>   vars <- atomically (forM [1..k]
>                         (\_ -> newEmptyTMVar))
>   thds <- forM vars
>             (\var -> forkIO (oneThread var 0 v f))
>   return (thds, vars)

simpleSyncLoop only terminates if the threads ever become unsynchronized.

> simpleSyncLoop vars n = do
>   ok <- atomically $ sync vars n
>   if ok then do
>       printf "Synchronized at step = %d\n" n
>       simpleSyncLoop vars $ n + 1
>     else
>       printf "Unsynchronized at step = %d\n" n

A computational time-waster to simulate "real work".

Pops a value off the random list and takes the factorial of it.

> computation l = let (v:l') = l
>                 in fact v `seq` l'
>
> fact n = product [1..n]

A simple main function which starts 10 threads and runs the test forever. The computation is initialized with an infinite list of random numbers between 500 and 600.

> simpleMain = do
>   g <- newStdGen
>   (_,vars) <- initialize 10 (randomRs (500,600) g) computation
>   simpleSyncLoop vars 0

timingSyncLoop attempts to count the number of steps taken per second.

(Note: using the TOD (time-of-day) constructor directly like this is a GHC-specific extension)

> timingSyncLoop vars n = do
>   -- TOD seconds picoseconds
>   TOD s ps <- getClockTime
>   loop (fromIntegral s + ps%(10^12)) n n
>   where
>     noThds = length vars
>     loop prevTime prevN n = do
>       TOD s ps <- getClockTime
>       let now = fromIntegral s + ps%(10^12)
>           tdiff = now - prevTime
>           ndiff = fromIntegral $ n - prevN
>           sps = floor (ndiff / tdiff) :: Int
>       if tdiff >= 1 then
>           do printf "Steps / sec each: %d; Steps / sec total: %d\n" sps (sps * noThds)
>              hFlush stdout
>              loop now n n
>         else
>           do ok <- atomically $ sync vars n
>              if ok then do
>                  loop prevTime prevN $ n + 1
>                else
>                  printf "Unsynchronized at step = %d\n" n
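
The rate arithmetic inside the loop can be checked on its own. The sketch below pulls it out into two pure helpers, toSeconds and stepsPerSecond; both names are hypothetical and do not appear in Sync.lhs, and the timestamps are made-up inputs chosen so the result is easy to verify by hand.

```haskell
import Data.Ratio ((%))

-- Hypothetical helper: combine the two TOD fields (seconds, picoseconds)
-- into one exact Rational number of seconds.
toSeconds :: Integer -> Integer -> Rational
toSeconds s ps = fromIntegral s + ps % (10 ^ 12)

-- Hypothetical helper: whole steps per second between two samples.
stepsPerSecond :: Rational -> Rational -> Int -> Int -> Int
stepsPerSecond prevTime now prevN n =
  floor (fromIntegral (n - prevN) / (now - prevTime))

main :: IO ()
main = do
  let t0 = toSeconds 100 0
      t1 = toSeconds 101 500000000000   -- 1.5 seconds after t0
  -- 3000 steps in 1.5 seconds is 2000 steps per second.
  print (stepsPerSecond t0 t1 0 3000)   -- 2000
```

Working in Rational avoids any floating-point rounding when the picosecond field is folded into the seconds.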

Examines the first command-line argument to determine how many threads to create, defaulting to 10. Initializes the threads and runs the timingSyncLoop indefinitely.

> timingWithArgMain = do
>   args <- getArgs
>   let n = case args of
>             [] -> 10
>             a:_ -> read a
>   g <- newStdGen
>   (_,vars) <- initialize n (randomRs (500,600) g) computation
>   timingSyncLoop vars 0
>
> main :: IO ()
> main = timingWithArgMain

The test system is a 4-way 3.6GHz Xeon.


[mrd@system ~]$ ghc --make -O2 -threaded Sync.lhs

[mrd@system ~]$ ./Sync 1 +RTS -N1
Steps / sec each: 2978; Steps / sec total: 2978
Steps / sec each: 2974; Steps / sec total: 2974
Steps / sec each: 2968; Steps / sec total: 2968
Steps / sec each: 2953; Steps / sec total: 2953
Steps / sec each: 2939; Steps / sec total: 2939
[mrd@system ~]$ ./Sync 1 +RTS -N2
Steps / sec each: 3301; Steps / sec total: 3301
Steps / sec each: 3297; Steps / sec total: 3297
Steps / sec each: 3279; Steps / sec total: 3279
Steps / sec each: 3286; Steps / sec total: 3286
Steps / sec each: 3254; Steps / sec total: 3254
[mrd@system ~]$ ./Sync 1 +RTS -N3
Steps / sec each: 3332; Steps / sec total: 3332
Steps / sec each: 3311; Steps / sec total: 3311
Steps / sec each: 3409; Steps / sec total: 3409
Steps / sec each: 3492; Steps / sec total: 3492
Steps / sec each: 3456; Steps / sec total: 3456
[mrd@system ~]$ ./Sync 1 +RTS -N4
Steps / sec each: 3374; Steps / sec total: 3374
Steps / sec each: 3515; Steps / sec total: 3515
Steps / sec each: 3471; Steps / sec total: 3471
Steps / sec each: 3452; Steps / sec total: 3452
Steps / sec each: 3418; Steps / sec total: 3418

[mrd@system ~]$ ./Sync 5 +RTS -N1
Steps / sec each: 659; Steps / sec total: 3295
Steps / sec each: 649; Steps / sec total: 3245
Steps / sec each: 655; Steps / sec total: 3275
Steps / sec each: 649; Steps / sec total: 3245
Steps / sec each: 653; Steps / sec total: 3265
[mrd@system ~]$ ./Sync 5 +RTS -N2
Steps / sec each: 947; Steps / sec total: 4735
Steps / sec each: 813; Steps / sec total: 4065
Steps / sec each: 874; Steps / sec total: 4370
Steps / sec each: 901; Steps / sec total: 4505
Steps / sec each: 803; Steps / sec total: 4015
[mrd@system ~]$ ./Sync 5 +RTS -N3
Steps / sec each: 1114; Steps / sec total: 5570
Steps / sec each: 914; Steps / sec total: 4570
Steps / sec each: 993; Steps / sec total: 4965
Steps / sec each: 1020; Steps / sec total: 5100
Steps / sec each: 983; Steps / sec total: 4915
[mrd@system ~]$ ./Sync 5 +RTS -N4
Steps / sec each: 994; Steps / sec total: 4970
Steps / sec each: 833; Steps / sec total: 4165
Steps / sec each: 899; Steps / sec total: 4495
Steps / sec each: 787; Steps / sec total: 3935
Steps / sec each: 878; Steps / sec total: 4390

[mrd@system ~]$ ./Sync 10 +RTS -N1
Steps / sec each: 286; Steps / sec total: 2860
Steps / sec each: 316; Steps / sec total: 3160
Steps / sec each: 314; Steps / sec total: 3140
Steps / sec each: 313; Steps / sec total: 3130
Steps / sec each: 302; Steps / sec total: 3020
Sync: interrupted
[mrd@system ~]$ ./Sync 10 +RTS -N2
Steps / sec each: 563; Steps / sec total: 5630
Steps / sec each: 557; Steps / sec total: 5570
Steps / sec each: 562; Steps / sec total: 5620
Steps / sec each: 558; Steps / sec total: 5580
Steps / sec each: 563; Steps / sec total: 5630
[mrd@system ~]$ ./Sync 10 +RTS -N3
Steps / sec each: 568; Steps / sec total: 5680
Steps / sec each: 562; Steps / sec total: 5620
Steps / sec each: 561; Steps / sec total: 5610
Steps / sec each: 567; Steps / sec total: 5670
Steps / sec each: 550; Steps / sec total: 5500
[mrd@system ~]$ ./Sync 10 +RTS -N4
Steps / sec each: 555; Steps / sec total: 5550
Steps / sec each: 516; Steps / sec total: 5160
Steps / sec each: 436; Steps / sec total: 4360
Steps / sec each: 514; Steps / sec total: 5140
Steps / sec each: 488; Steps / sec total: 4880