diff options
Diffstat (limited to 'stm/tQueue.ml')
| -rw-r--r-- | stm/tQueue.ml | 101 |
1 files changed, 45 insertions, 56 deletions
diff --git a/stm/tQueue.ml b/stm/tQueue.ml index 2aaca85582..f5bd726dde 100644 --- a/stm/tQueue.ml +++ b/stm/tQueue.ml @@ -68,61 +68,54 @@ let create () = { let pop ?(picky=(fun _ -> true)) ?(destroy=ref false) tq = let { queue = q; lock = m; cond = c; cond_waiting = cn } = tq in - Mutex.lock m; - if tq.release then (Mutex.unlock m; raise BeingDestroyed); - while not (PriorityQueue.exists picky q || !destroy) do - tq.nwaiting <- tq.nwaiting + 1; - Condition.broadcast cn; - Condition.wait c m; - tq.nwaiting <- tq.nwaiting - 1; - if tq.release || !destroy then (Mutex.unlock m; raise BeingDestroyed) - done; - if !destroy then (Mutex.unlock m; raise BeingDestroyed); - let x = PriorityQueue.pop ~picky q in - Condition.signal c; - Condition.signal cn; - Mutex.unlock m; - x + CThread.with_lock m ~scope:(fun () -> + if tq.release then raise BeingDestroyed; + while not (PriorityQueue.exists picky q || !destroy) do + tq.nwaiting <- tq.nwaiting + 1; + Condition.broadcast cn; + Condition.wait c m; + tq.nwaiting <- tq.nwaiting - 1; + if tq.release || !destroy then raise BeingDestroyed + done; + if !destroy then raise BeingDestroyed; + let x = PriorityQueue.pop ~picky q in + Condition.signal c; + Condition.signal cn; + x) let broadcast tq = let { lock = m; cond = c } = tq in - Mutex.lock m; - Condition.broadcast c; - Mutex.unlock m + CThread.with_lock m ~scope:(fun () -> + Condition.broadcast c) let push tq x = let { queue = q; lock = m; cond = c; release } = tq in if release then CErrors.anomaly(Pp.str "TQueue.push while being destroyed! Only 1 producer/destroyer allowed."); - Mutex.lock m; - PriorityQueue.push q x; - Condition.broadcast c; - Mutex.unlock m + CThread.with_lock m ~scope:(fun () -> + PriorityQueue.push q x; + Condition.broadcast c) let length tq = let { queue = q; lock = m } = tq in - Mutex.lock m; - let n = PriorityQueue.length q in - Mutex.unlock m; - n + CThread.with_lock m ~scope:(fun () -> + PriorityQueue.length q) let clear tq = let { queue = q; lock = m; cond = c } = tq in - Mutex.lock m; - PriorityQueue.clear q; - Mutex.unlock m + CThread.with_lock m ~scope:(fun () -> + PriorityQueue.clear q) let clear_saving tq f = let { queue = q; lock = m; cond = c } = tq in - Mutex.lock m; let saved = ref [] in - while not (PriorityQueue.is_empty q) do - let elem = PriorityQueue.pop q in - match f elem with - | Some x -> saved := x :: !saved - | None -> () - done; - Mutex.unlock m; + CThread.with_lock m ~scope:(fun () -> + while not (PriorityQueue.is_empty q) do + let elem = PriorityQueue.pop q in + match f elem with + | Some x -> saved := x :: !saved + | None -> () + done); List.rev !saved let is_empty tq = PriorityQueue.is_empty tq.queue @@ -130,32 +123,28 @@ let is_empty tq = PriorityQueue.is_empty tq.queue let destroy tq = tq.release <- true; while tq.nwaiting > 0 do - Mutex.lock tq.lock; - Condition.broadcast tq.cond; - Mutex.unlock tq.lock; + CThread.with_lock tq.lock ~scope:(fun () -> + Condition.broadcast tq.cond) done; tq.release <- false let wait_until_n_are_waiting_and_queue_empty j tq = - Mutex.lock tq.lock; - while not (PriorityQueue.is_empty tq.queue) || tq.nwaiting < j do - Condition.wait tq.cond_waiting tq.lock - done; - Mutex.unlock tq.lock + CThread.with_lock tq.lock ~scope:(fun () -> + while not (PriorityQueue.is_empty tq.queue) || tq.nwaiting < j do + Condition.wait tq.cond_waiting tq.lock + done) let wait_until_n_are_waiting_then_snapshot j tq = let l = ref [] in - Mutex.lock tq.lock; - while not (PriorityQueue.is_empty tq.queue) do - l := PriorityQueue.pop tq.queue :: !l - done; - while tq.nwaiting < j do Condition.wait tq.cond_waiting tq.lock done; - List.iter (PriorityQueue.push tq.queue) (List.rev !l); - if !l <> [] then Condition.broadcast tq.cond; - Mutex.unlock tq.lock; + CThread.with_lock tq.lock ~scope:(fun () -> + while not (PriorityQueue.is_empty tq.queue) do + l := PriorityQueue.pop tq.queue :: !l + done; + while tq.nwaiting < j do Condition.wait tq.cond_waiting tq.lock done; + List.iter (PriorityQueue.push tq.queue) (List.rev !l); + if !l <> [] then Condition.broadcast tq.cond); List.rev !l let set_order tq rel = - Mutex.lock tq.lock; - PriorityQueue.set_rel rel tq.queue; - Mutex.unlock tq.lock + CThread.with_lock tq.lock ~scope:(fun () -> + PriorityQueue.set_rel rel tq.queue) |
