
Synchronized threads, part II

Submitted by mrd on Fri, 01/05/2007 - 6:34pm.

For comparison, here is an implementation in which multiple unsynchronized threads each attempt to perform as many steps as possible in one second.


> import Control.Monad
> import Control.Concurrent
> import Control.Concurrent.STM
> import Data.List
> import Data.IORef
> import System.Time
> import System.Environment
> import System.IO
> import System.Random
> import Text.Printf
> import Data.Ratio

oneThread greedily loops through as many steps as possible until one second has elapsed. It then blocks until the main thread has collected the previous result, so that it can put the new result in its TMVar. On every step, it applies the supplied function f to its current state.

> oneThread :: TMVar Int -> Int -> a -> (a -> a) -> IO ()
> oneThread mvar n v f = do
>     TOD s ps <- getClockTime
>     loop (fromIntegral s + ps%(10^12)) n n v
>   where
>     loop prevTime prevN n v = do
>         TOD s ps <- getClockTime
>         let now   = fromIntegral s + ps%(10^12)
>             tdiff = now - prevTime
>             ndiff = fromIntegral $ n - prevN
>             sps   = floor (ndiff / tdiff)
>             v'    = f v
>         if tdiff >= 1
>           then do
>             atomically $ putTMVar mvar sps
>             loop now n n v
>           else v' `seq` loop prevTime prevN (n + 1) v'

nosync is akin to sync from part I in that it is an STM action that collects results from all the threads via their TMVars; here it returns the best single-thread rate and the sum of all rates. Again, the key portion is easy: mapM takeTMVar mvars.

> nosync :: (Num a, Ord a) => [TMVar a] -> STM (a, a)
> nosync mvars = do
>     vals <- mapM takeTMVar mvars
>     return (maximum vals, sum vals)
> initialize :: Int -> a -> (a -> a) -> IO ([ThreadId], [TMVar Int])
> initialize k v f = do
>     mvars <- atomically (forM [1..k] (\_ -> newEmptyTMVar))
>     thds  <- forM (zip mvars [1..k])
>                   (\(ch, n) -> forkIO (oneThread ch 0 v f))
>     return (thds, mvars)
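
The mapM takeTMVar pattern at the heart of nosync can be tried in isolation. The following standalone sketch is not part of the original program (`collectAll` and `demoCollect` are hypothetical names): three worker threads fill their TMVars after different delays, and a single atomically transaction collects all three values only once every TMVar is full.

```haskell
import Control.Concurrent (forkIO, threadDelay)
import Control.Concurrent.STM
import Control.Monad (forM, forM_)

-- Take one value from every TMVar in a single transaction.
-- If any TMVar is still empty, takeTMVar retries: the whole transaction
-- is rolled back and re-run after one of the TMVars it read changes.
collectAll :: [TMVar a] -> STM [a]
collectAll = mapM takeTMVar

-- Hypothetical demo: worker i sleeps i * 100 ms, then reports i * 10.
demoCollect :: IO [Int]
demoCollect = do
  vars <- forM [1 .. 3 :: Int] (const newEmptyTMVarIO)
  forM_ (zip [1 ..] vars) $ \(i, var) -> forkIO $ do
    threadDelay (i * 100000)
    atomically (putTMVar var (i * 10))
  -- Blocks until all three workers have reported.
  atomically (collectAll vars)
```

Running demoCollect should return [10,20,30] after roughly 0.3 seconds, because the transaction keeps retrying until the slowest worker has reported.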

nosyncLoop waits for all the threads to place a value into their TMVars, which happens about once per second, prints the best and the summed rates, and then repeats forever.

> nosyncLoop :: [TMVar Int] -> IO ()
> nosyncLoop mvars = do
>     (best, total) <- atomically $ nosync mvars
>     printf "Best steps / second = %d; Sum steps / second = %d\n" best total
>     hFlush stdout
>     nosyncLoop mvars

A computational time-waster to simulate "real work".

> computation l = let (v:l') = l
>                 in fact v `seq` l'
>
> fact n = product [1..n]
> main :: IO ()
> main = do
>     args <- getArgs
>     let n = case args of
>               []  -> 10
>               a:_ -> read a
>     g <- newStdGen
>     (_, mvars) <- initialize n (randomRs (500,600) g) computation
>     nosyncLoop mvars

The test system is a 4-way 3.6 GHz Xeon.


[mrd@system ~]$ ghc --make -O2 -threaded Unsync.lhs

[mrd@system ~]$ ./Unsync 1 +RTS -N1
Best steps / second = 3179; Sum steps / second = 3179
Best steps / second = 3181; Sum steps / second = 3181
Best steps / second = 3178; Sum steps / second = 3178
Best steps / second = 3175; Sum steps / second = 3175
Best steps / second = 3174; Sum steps / second = 3174
[mrd@system ~]$ ./Unsync 1 +RTS -N2
Best steps / second = 3142; Sum steps / second = 3142
Best steps / second = 3168; Sum steps / second = 3168
Best steps / second = 3174; Sum steps / second = 3174
Best steps / second = 3177; Sum steps / second = 3177
Best steps / second = 3172; Sum steps / second = 3172

[mrd@system ~]$ ./Unsync 5 +RTS -N1
Best steps / second = 635; Sum steps / second = 3071
Best steps / second = 638; Sum steps / second = 3094
Best steps / second = 668; Sum steps / second = 3080
Best steps / second = 669; Sum steps / second = 3184
Best steps / second = 751; Sum steps / second = 3181
[mrd@system ~]$ ./Unsync 5 +RTS -N2
Best steps / second = 1429; Sum steps / second = 5601
Best steps / second = 1434; Sum steps / second = 5647
Best steps / second = 1446; Sum steps / second = 5647
Best steps / second = 1413; Sum steps / second = 5647
Best steps / second = 1502; Sum steps / second = 5639
[mrd@system ~]$ ./Unsync 5 +RTS -N3
Best steps / second = 1912; Sum steps / second = 5792
Best steps / second = 2092; Sum steps / second = 5934
Best steps / second = 2107; Sum steps / second = 5938
Best steps / second = 1959; Sum steps / second = 5922
Best steps / second = 2068; Sum steps / second = 5960
[mrd@system ~]$ ./Unsync 5 +RTS -N4
Best steps / second = 1876; Sum steps / second = 7428
Best steps / second = 1865; Sum steps / second = 7402
Best steps / second = 1891; Sum steps / second = 7420
Best steps / second = 1895; Sum steps / second = 7581
Best steps / second = 1899; Sum steps / second = 7602

[mrd@system ~]$ ./Unsync 10 +RTS -N1
Best steps / second = 334; Sum steps / second = 2852
Best steps / second = 332; Sum steps / second = 3100
Best steps / second = 334; Sum steps / second = 3082
Best steps / second = 335; Sum steps / second = 3176
Best steps / second = 335; Sum steps / second = 3186
[mrd@system ~]$ ./Unsync 10 +RTS -N2
Best steps / second = 594; Sum steps / second = 5577
Best steps / second = 669; Sum steps / second = 5631
Best steps / second = 588; Sum steps / second = 5641
Best steps / second = 622; Sum steps / second = 5657
Best steps / second = 604; Sum steps / second = 5639
[mrd@system ~]$ ./Unsync 10 +RTS -N3
Best steps / second = 702; Sum steps / second = 5846
Best steps / second = 692; Sum steps / second = 5865
Best steps / second = 717; Sum steps / second = 5884
Best steps / second = 679; Sum steps / second = 5893
Best steps / second = 745; Sum steps / second = 5913
[mrd@system ~]$ ./Unsync 10 +RTS -N4
Best steps / second = 949; Sum steps / second = 7133
Best steps / second = 958; Sum steps / second = 7198
Best steps / second = 989; Sum steps / second = 7189
Best steps / second = 906; Sum steps / second = 7155
Best steps / second = 964; Sum steps / second = 7181

Observations

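The total number of steps per second grows with the number of processors (the -N setting), though less than linearly: with 5 threads it rises from about 3,100 at -N1 to about 7,500 at -N4, while a single thread gains nothing from extra processors. The number of steps per second achieved by each individual thread is roughly inversely proportional to the number of threads.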
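
To put rough numbers on the first observation, this small sketch averages the five "Sum steps / second" readings reported above for the 5-thread runs and divides each average by the -N1 average. The figures are copied from the output above; `sums5`, `mean` and `speedups` are names introduced here for illustration.

```haskell
-- "Sum steps / second" readings for ./Unsync 5, copied from the runs above,
-- keyed by the +RTS -N setting.
sums5 :: [(Int, [Double])]
sums5 =
  [ (1, [3071, 3094, 3080, 3184, 3181])
  , (2, [5601, 5647, 5647, 5647, 5639])
  , (3, [5792, 5934, 5938, 5922, 5960])
  , (4, [7428, 7402, 7420, 7581, 7602])
  ]

-- Arithmetic mean of a non-empty list.
mean :: [Double] -> Double
mean xs = sum xs / fromIntegral (length xs)

-- Average total throughput at each -N setting, divided by the -N1 average.
speedups :: [(Int, Double)]
speedups = [ (n, mean xs / base) | (n, xs) <- sums5 ]
  where
    base = maybe 1 mean (lookup 1 sums5)
```

Evaluating speedups gives about 1.00, 1.81, 1.89 and 2.40 for -N1 through -N4, so in this test four processors deliver roughly 2.4 times the single-processor throughput rather than 4 times.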

Synchronized threads, part I

Submitted by mrd on Wed, 01/03/2007 - 8:51pm.

I'm performing a small experiment with thread synchronization: each thread runs a loop that performs one "step" per iteration, and the overall idea is to synchronize all the threads so that they run in parallel, in lock-step.

In this example, the threads are spawned by forkIO, and synchronization is maintained using TMVars, the transactional MVars provided by the Software Transactional Memory (STM) library.


> import Control.Monad
> import Control.Concurrent
> import Control.Concurrent.STM
> import Data.List
> import System.Time
> import System.Environment
> import System.IO
> import System.Random
> import Text.Printf
> import Data.Ratio

oneThread executes the steps of one thread while remaining synchronized with the rest. putTMVar blocks until the TMVar is empty, so a thread cannot publish its next step number until the main loop has collected the previous one. On every step, it applies the supplied function f to its state, inside the synchronization logic.

> oneThread :: TMVar Int -> Int -> a -> (a -> a) -> IO ()
> oneThread syncv n v f = do
>     atomically $ putTMVar syncv n
>     let v' = f v
>     v' `seq` oneThread syncv (n + 1) v' f

sync performs one step of synchronization, ensuring that all the threads are working on the same step number. Note that this function is written in the STM monad: it is meant to execute as one atomic block. If any TMVar is still empty, takeTMVar retries, which rolls back the whole transaction (including any takes already performed) and re-runs it once one of the TMVars it read has changed. That means it blocks until all TMVars have been filled by their threads. It won't stop other threads from running and filling in their TMVars, and it won't touch any of the TMVars until all of them are ready.

STM makes writing this a complete breeze. There are no worries about strange locking issues; it just does the right thing. The key portion is dead simple: mapM takeTMVar vars. It's functional, it reuses monadic combinators, and it's easy.

> sync :: (Eq a) => [TMVar a] -> a -> STM Bool
> sync vars n = do
>     vals <- mapM takeTMVar vars
>     return $ all (==n) vals
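
The retry-and-roll-back behavior described above can be observed directly with a small sketch (illustrative only; `tryTakeAll` and `demoAtomicity` are hypothetical names, not part of the post). Here `orElse` turns the retry into a `Nothing`, and checking the TMVars afterwards shows that none of the takes performed before the retry survived.

```haskell
import Control.Concurrent.STM

-- Try to take every value in one transaction. If any TMVar is empty,
-- takeTMVar retries; orElse then runs the fallback instead, and the
-- takes already performed by the first branch are discarded.
tryTakeAll :: [TMVar a] -> STM (Maybe [a])
tryTakeAll vars = (Just <$> mapM takeTMVar vars) `orElse` return Nothing

-- Hypothetical demo: two full TMVars and one empty one.
demoAtomicity :: IO (Maybe [Int], [Bool])
demoAtomicity = do
  a <- newTMVarIO 1
  b <- newTMVarIO 2
  c <- newEmptyTMVarIO
  result <- atomically (tryTakeAll [a, b, c])
  -- a and b should still be full, because the failed branch was rolled back.
  stillFull <- atomically (mapM (fmap not . isEmptyTMVar) [a, b])
  return (result, stillFull)
```

demoAtomicity should return (Nothing, [True, True]): the transaction saw the empty third TMVar and gave up, leaving the first two full.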

initialize starts k threads, each with its own TMVar for synchronization, and returns their thread IDs together with the TMVars.

> initialize :: Int -> a -> (a -> a) -> IO ([ThreadId], [TMVar Int])
> initialize k v f = do
>     vars <- atomically (forM [1..k] (\_ -> newEmptyTMVar))
>     thds <- forM vars (\var -> forkIO (oneThread var 0 v f))
>     return (thds, vars)

simpleSyncLoop only terminates if the threads ever become unsynchronized.

> simpleSyncLoop vars n = do
>     ok <- atomically $ sync vars n
>     if ok
>       then do
>         printf "Synchronized at step = %d\n" n
>         simpleSyncLoop vars $ n + 1
>       else
>         printf "Unsynchronized at step = %d\n" n

A computational time-waster to simulate "real work".

Pops a value off the random list and takes the factorial of it.

> computation l = let (v:l') = l
>                 in fact v `seq` l'
>
> fact n = product [1..n]

A simple main function which starts 10 threads and runs the test indefinitely (simpleSyncLoop only stops if the threads fall out of sync). The computation is initialized with an infinite list of random numbers between 500 and 600.

> simpleMain = do
>     g <- newStdGen
>     (_, vars) <- initialize 10 (randomRs (500,600) g) computation
>     simpleSyncLoop vars 0

timingSyncLoop attempts to count the number of steps taken per second.

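(Note: using the TOD (time-of-day) constructor directly like this is a GHC-specific extension. A TOD value holds whole seconds since 1 January 1970 UTC plus picoseconds, which is why the code computes s + ps%(10^12) to get the time in seconds.)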

> timingSyncLoop vars n = do
>     -- TOD seconds picoseconds
>     TOD s ps <- getClockTime
>     loop (fromIntegral s + ps%(10^12)) n n
>   where
>     noThds = length vars
>     loop prevTime prevN n = do
>         TOD s ps <- getClockTime
>         let now   = fromIntegral s + ps%(10^12)
>             tdiff = now - prevTime
>             ndiff = fromIntegral $ n - prevN
>             sps   = floor (ndiff / tdiff) :: Int
>         if tdiff >= 1
>           then do
>             printf "Steps / sec each: %d; Steps / sec total: %d\n" sps (sps * noThds)
>             hFlush stdout
>             loop now n n
>           else do
>             ok <- atomically $ sync vars n
>             if ok
>               then loop prevTime prevN (n + 1)
>               else printf "Unsynchronized at step = %d\n" n
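
The rate arithmetic inside loop can be pulled out into pure functions and checked by hand. This is a sketch, not code from the post: `toSeconds` and `stepsPerSec` are hypothetical names. It assumes the TOD representation described above (whole seconds plus picoseconds), so a time in seconds is s + ps/10^12, and the rate is the number of steps divided by the elapsed time, rounded down with floor as in timingSyncLoop.

```haskell
import Data.Ratio ((%))

-- Seconds and picoseconds (as carried by TOD) to an exact number of seconds.
toSeconds :: Integer -> Integer -> Rational
toSeconds s ps = fromIntegral s + ps % (10 ^ (12 :: Int))

-- Steps per second between two readings, rounded down.
stepsPerSec :: Rational -> Rational -> Int -> Int -> Int
stepsPerSec prevTime now prevN n =
  floor (fromIntegral (n - prevN) / (now - prevTime))
```

For example, 3500 steps between readings 1.25 seconds apart, stepsPerSec (toSeconds 100 0) (toSeconds 101 250000000000) 0 3500, gives 2800 steps per second. Using Rational keeps the picosecond arithmetic exact.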

timingWithArgMain examines the first command-line argument to determine how many threads to create, defaulting to 10. It initializes the threads and runs timingSyncLoop indefinitely.

> timingWithArgMain = do
>     args <- getArgs
>     let n = case args of
>               []  -> 10
>               a:_ -> read a
>     g <- newStdGen
>     (_, vars) <- initialize n (randomRs (500,600) g) computation
>     timingSyncLoop vars 0
> main :: IO ()
> main = timingWithArgMain
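
`read a` throws an exception if the argument is not a number. A slightly safer variant, assuming `Text.Read.readMaybe` is available (it is in base 4.6 and later), falls back to the default of 10 instead. `threadCountFromArgs` is a hypothetical name, and treating zero or negative counts as "use the default" is a choice made here, not in the original.

```haskell
import Text.Read (readMaybe)

-- Number of threads from the first command-line argument; defaults to 10
-- when there is no argument, or when it is not a positive integer.
threadCountFromArgs :: [String] -> Int
threadCountFromArgs args =
  case args of
    a : _ | Just k <- readMaybe a, k > 0 -> k
    _                                    -> 10
```

In timingWithArgMain this could replace the case expression, e.g. `n <- fmap threadCountFromArgs getArgs`.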

The test system is a 4-way 3.6 GHz Xeon.


[mrd@system ~]$ ghc --make -O2 -threaded Sync.lhs

[mrd@system ~]$ ./Sync 1 +RTS -N1
Steps / sec each: 2978; Steps / sec total: 2978
Steps / sec each: 2974; Steps / sec total: 2974
Steps / sec each: 2968; Steps / sec total: 2968
Steps / sec each: 2953; Steps / sec total: 2953
Steps / sec each: 2939; Steps / sec total: 2939
[mrd@system ~]$ ./Sync 1 +RTS -N2
Steps / sec each: 3301; Steps / sec total: 3301
Steps / sec each: 3297; Steps / sec total: 3297
Steps / sec each: 3279; Steps / sec total: 3279
Steps / sec each: 3286; Steps / sec total: 3286
Steps / sec each: 3254; Steps / sec total: 3254
[mrd@system ~]$ ./Sync 1 +RTS -N3
Steps / sec each: 3332; Steps / sec total: 3332
Steps / sec each: 3311; Steps / sec total: 3311
Steps / sec each: 3409; Steps / sec total: 3409
Steps / sec each: 3492; Steps / sec total: 3492
Steps / sec each: 3456; Steps / sec total: 3456
[mrd@system ~]$ ./Sync 1 +RTS -N4
Steps / sec each: 3374; Steps / sec total: 3374
Steps / sec each: 3515; Steps / sec total: 3515
Steps / sec each: 3471; Steps / sec total: 3471
Steps / sec each: 3452; Steps / sec total: 3452
Steps / sec each: 3418; Steps / sec total: 3418

[mrd@system ~]$ ./Sync 5 +RTS -N1
Steps / sec each: 659; Steps / sec total: 3295
Steps / sec each: 649; Steps / sec total: 3245
Steps / sec each: 655; Steps / sec total: 3275
Steps / sec each: 649; Steps / sec total: 3245
Steps / sec each: 653; Steps / sec total: 3265
[mrd@system ~]$ ./Sync 5 +RTS -N2
Steps / sec each: 947; Steps / sec total: 4735
Steps / sec each: 813; Steps / sec total: 4065
Steps / sec each: 874; Steps / sec total: 4370
Steps / sec each: 901; Steps / sec total: 4505
Steps / sec each: 803; Steps / sec total: 4015
[mrd@system ~]$ ./Sync 5 +RTS -N3
Steps / sec each: 1114; Steps / sec total: 5570
Steps / sec each: 914; Steps / sec total: 4570
Steps / sec each: 993; Steps / sec total: 4965
Steps / sec each: 1020; Steps / sec total: 5100
Steps / sec each: 983; Steps / sec total: 4915
[mrd@system ~]$ ./Sync 5 +RTS -N4
Steps / sec each: 994; Steps / sec total: 4970
Steps / sec each: 833; Steps / sec total: 4165
Steps / sec each: 899; Steps / sec total: 4495
Steps / sec each: 787; Steps / sec total: 3935
Steps / sec each: 878; Steps / sec total: 4390

[mrd@system ~]$ ./Sync 10 +RTS -N1
Steps / sec each: 286; Steps / sec total: 2860
Steps / sec each: 316; Steps / sec total: 3160
Steps / sec each: 314; Steps / sec total: 3140
Steps / sec each: 313; Steps / sec total: 3130
Steps / sec each: 302; Steps / sec total: 3020
Sync: interrupted
[mrd@system ~]$ ./Sync 10 +RTS -N2
Steps / sec each: 563; Steps / sec total: 5630
Steps / sec each: 557; Steps / sec total: 5570
Steps / sec each: 562; Steps / sec total: 5620
Steps / sec each: 558; Steps / sec total: 5580
Steps / sec each: 563; Steps / sec total: 5630
[mrd@system ~]$ ./Sync 10 +RTS -N3
Steps / sec each: 568; Steps / sec total: 5680
Steps / sec each: 562; Steps / sec total: 5620
Steps / sec each: 561; Steps / sec total: 5610
Steps / sec each: 567; Steps / sec total: 5670
Steps / sec each: 550; Steps / sec total: 5500
[mrd@system ~]$ ./Sync 10 +RTS -N4
Steps / sec each: 555; Steps / sec total: 5550
Steps / sec each: 516; Steps / sec total: 5160
Steps / sec each: 436; Steps / sec total: 4360
Steps / sec each: 514; Steps / sec total: 5140
Steps / sec each: 488; Steps / sec total: 4880


Haskell is a first-class action language as well

Submitted by metaperl on Tue, 12/12/2006 - 10:50am.

I had been considering Haskell a value-oriented language and had concluded that doing a lot of file system actions would be cumbersome. Dons put all that to rest:


metaperl: dons - if you are doing a lot of file-system
manipulation, doesnt using a value-oriented language
like Haskell become cumbersome?
metaperl: dons - i mean things like copying files, renaming files, etc
dons: metaperl: hmm, like darcs?
dons: why? you're using a first-class action oriented language too, remember
metaperl didn't think of darcs
dons: so you can string together your manipulting functions in interesting ways
dons: i.e. haskell treats imperative statements as first class citizens
dons: you can pass them to functions, put them in data structures
dons: map over them
dons: > sequence_ $ reverse [readFile "/tmp/x", writeFile "/tmp/y"]

Wow. Haskell wins again.
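
The one-liner from the chat is shorthand rather than working code: `readFile "/tmp/x"` has type `IO String` while `writeFile "/tmp/y"` has type `String -> IO ()`, so the two cannot share a list. Below is a sketch of the ideas dons lists (building actions, passing them to functions, storing them in a list and transforming the list before running it) that does typecheck. `copyVia`, `twice` and `demoActions` are hypothetical names.

```haskell
import Data.IORef

-- A copy "recipe": an IO action built from two others with >>=.
-- Nothing happens until the resulting action is executed.
-- src and dst must differ, because readFile reads lazily.
copyVia :: FilePath -> FilePath -> IO ()
copyVia src dst = readFile src >>= writeFile dst

-- Functions can take actions as arguments...
twice :: IO () -> IO ()
twice act = act >> act

-- ...and actions can live in lists and be rearranged before running.
demoActions :: IO [String]
demoActions = do
  logRef <- newIORef []
  let say msg = modifyIORef logRef (++ [msg])  -- one action per message
      actions = map say ["first", "second", "third"]
  sequence_ (reverse actions)                   -- run them back to front
  twice (say "again")
  readIORef logRef
```

demoActions should return ["third","second","first","again","again"]: the list of actions is reversed like any other list, and only sequence_ and twice actually run them.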

Python optional args - great for evolving source code

Submitted by metaperl on Tue, 12/12/2006 - 8:54am.

Optional args are not part of the Haskell Way. I have about 20 calls to this function:

def archive_zip(my):
    for f in path(my.zip).files('*'):
        f.move(my.archive)

But then I needed to do the same thing with one twist: for one particular invocation, the date had to be prepended to each file name. So, in Python, I simply tacked on an optional argument whose default value keeps the behavior that the original 20 calls expect, and then added some code to handle prepending the date:


def archive_zip(my, prepend_date=False):
    for f in path(my.zip).files('*'):
        f.move(my.archive)
        if prepend_date:
            today = datetime.date.today()
            s = today.strftime("%b-%d-%y")
            # f includes its source directory; only the bare name lives in my.archive
            newfile = "%s-%s" % (s, f.name)
            syscmd = "cd %s; mv %s %s" % (my.archive, f.name, newfile)
            print syscmd
            os.system(syscmd)

Now, I'm not sure how quickly I could evolve this function in Haskell, but I'm dead curious to know.
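
Here is one way it might look in Haskell. This is a sketch under assumptions, not a port of the path.py calls: `ArchiveOpts`, `defaultOpts`, `archiveZipWith`, `archiveZip` and `archivedName` are hypothetical names, directories are passed as plain FilePaths instead of the `my` object, and it relies on `listDirectory`, `doesFileExist` and `renameFile` from the directory package (listDirectory needs directory 1.2.5 or later) plus `formatTime` from the time package. Instead of an optional argument, it uses an options record with a default value: the existing call sites keep calling `archiveZip` unchanged, and the one caller that wants the date calls `archiveZipWith defaultOpts { prependDate = True }`.

```haskell
import Control.Monad (filterM, forM_)
import Data.Time
import System.Directory (doesFileExist, listDirectory, renameFile)
import System.FilePath ((</>))

newtype ArchiveOpts = ArchiveOpts { prependDate :: Bool }

-- The behavior the existing call sites expect.
defaultOpts :: ArchiveOpts
defaultOpts = ArchiveOpts { prependDate = False }

-- Target file name, optionally prefixed with a date such as "Dec-12-06".
archivedName :: ArchiveOpts -> Day -> FilePath -> FilePath
archivedName opts day name
  | prependDate opts = formatTime defaultTimeLocale "%b-%d-%y" day ++ "-" ++ name
  | otherwise        = name

-- Move every regular file from zipDir into archiveDir, using the local date.
archiveZipWith :: ArchiveOpts -> FilePath -> FilePath -> IO ()
archiveZipWith opts zipDir archiveDir = do
  today <- localDay . zonedTimeToLocalTime <$> getZonedTime
  names <- listDirectory zipDir
  files <- filterM (doesFileExist . (zipDir </>)) names
  forM_ files $ \name ->
    renameFile (zipDir </> name) (archiveDir </> archivedName opts today name)

-- The original entry point, unchanged for its existing callers.
archiveZip :: FilePath -> FilePath -> IO ()
archiveZip = archiveZipWith defaultOpts
```

Adding another option later means adding a field to ArchiveOpts and a default to defaultOpts; callers that start from defaultOpts keep compiling. Unlike a copying move, renameFile fails when the source and destination are on different file systems.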

Memorable quotes

Submitted by metaperl on Fri, 12/08/2006 - 1:19am.

This is a placeholder thread for all of the funny things I see in #haskell.

lispy

one of the reasons oo is so good at solving problems is because it's so good at creating them :)

Daydreaming about teaching haskell

Submitted by metaperl on Thu, 12/07/2006 - 8:40am.

"Ok students, today we are going to learn something different and amazing. It's called purely functional programming... did you notice you entered this room? Sometimes 1 of you came in, other times 3 came through the door? Well, in a functional language, that would be OK, but we would type it like this:

enterDoor :: TimeQuanta -> [Student] -> [Student]

"

I started this out as a post about how a function always took the exact same arguments... ALWAYS... OK now I got it.

"Ok students, did you notice that dog come in? And then the principal? That's the beauty of FP. Any possible set of inputs MUST be stated up front, so we have to change our function again."

Hmm, this didn't go as planned, but it's leading somewhere. Bear with me. I'll clean this up sometime... I'm supposed to be working now :)

The Haskell type system bothers me

Submitted by metaperl on Thu, 12/07/2006 - 4:58am.

A type is a set of values. Earlier, I discussed how I had to do programming on something that was no more than a list whose elements were "consed" together via newline characters:


"item 1\nitem2\nitem 3"

While I did manage to write an implementation of lines, the Haskell type system is renowned for forcing programmers to describe their data up front.

But, the Haskell type system did not interpret my string as a list for me.

I think Cale said it best recently:

Sometimes the static typing language just doesn't have the types necessary to express the conditions on code which the programmer would want to express, and sometimes adding those additional types will either spoil type-inference, or make the problem of proving that a program satisfies its types much harder, or even make it outright impossible for the compiler to do for itself.

On reflection, I suppose I could resort to Parsec to produce a list for me.
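
A minimal sketch of that idea, assuming the parsec package (version 3, module Text.Parsec): a string is described as lines separated by newline characters, and `sepBy` builds the list. `lineP`, `linesP` and `parseLines` are hypothetical names. It differs from the Prelude's `lines` at the edges: it returns `[""]` for the empty string and keeps an empty last element after a trailing newline.

```haskell
import Text.Parsec (ParseError, many, newline, noneOf, parse, sepBy)
import Text.Parsec.String (Parser)

-- One line: any run of characters other than '\n' (possibly empty).
lineP :: Parser String
lineP = many (noneOf "\n")

-- A document: lines separated by newline characters.
linesP :: Parser [String]
linesP = lineP `sepBy` newline

parseLines :: String -> Either ParseError [String]
parseLines = parse linesP ""
```

For the string above, parseLines "item 1\nitem2\nitem 3" should give Right ["item 1","item2","item 3"].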

Musing about very strong types and their accessors

Submitted by metaperl on Mon, 12/04/2006 - 2:40pm.

In working through SJT, I was required to implement lines. For the first time ever, I wrote a Haskell program that failed while running instead of being rejected by the type checker. Feast your eyes on this:


*Main> mylines ts
["dog","cat"*** Exception: Prelude.tail: empty list

As usual, that got me thinking about how I got into the situation. And it hit me that my main problem is that I had to do programming instead of creating a very strong type and related accessors.

In other words, lines converts a newline-delimited string into the sequence we call a list.

It seems that I should be able to type a string containing newline delimiters as a lazy list that yields its lines as elements.

Hmm, just brainstorming.
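
For comparison with the crash above, here is one total way to write lines (a sketch, not necessarily the solution the SJT exercise intends; `myLines` is a hypothetical name). Instead of calling `tail` on a remainder that may be empty, it pattern-matches on the result of `break`, so the empty case is handled explicitly and the function cannot fail on any input.

```haskell
-- Split a string at '\n' characters without using partial functions.
myLines :: String -> [String]
myLines "" = []
myLines s  =
  let (line, rest) = break (== '\n') s
  in line : case rest of
              []      -> []         -- no newline left: this was the last line
              (_ : r) -> myLines r  -- drop the '\n' and continue
```

On inputs such as "dog\ncat", "dog\ncat\n", "\n" and "a\n\nb" it agrees with the Prelude's lines.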

Haskell Project List

Submitted by metaperl on Sun, 12/03/2006 - 8:28am.

Here I list everything that I will probably never get the time to do, but want to do.

Haskell paste site

paste.lisp.org has gone down one too many times for me. I think it's time we had our own robust paste tied into lambdabot.

Please don't sweat the small stuff

Submitted by metaperl on Fri, 12/01/2006 - 3:11pm.

I am happy that I finished Chapter 7 of SJT today. It was a mammoth chapter. It involved a lot of thinking and I am a stronger programmer for having made it.

That being said, I really needed to fight my addiction to perfection. On a hard exercise, I took the liberty of stringing together several Prelude functions without (a) learning how they worked or (b) rewriting them myself.

Also, my solutions may not be the most efficient.

But you know what? I'm done with the chapter and I'm better.

If I really wanted to be anal, I would have to rebuild the hardware I'm on for max speed. And then recompile GHC with all the right compiler flags, etc., etc.

DON'T SWEAT THE SMALL STUFF!