aboutsummaryrefslogtreecommitdiff
path: root/stm/tQueue.ml
diff options
context:
space:
mode:
Diffstat (limited to 'stm/tQueue.ml')
-rw-r--r--stm/tQueue.ml101
1 files changed, 45 insertions, 56 deletions
diff --git a/stm/tQueue.ml b/stm/tQueue.ml
index 2aaca85582..f5bd726dde 100644
--- a/stm/tQueue.ml
+++ b/stm/tQueue.ml
@@ -68,61 +68,54 @@ let create () = {
let pop ?(picky=(fun _ -> true)) ?(destroy=ref false) tq =
let { queue = q; lock = m; cond = c; cond_waiting = cn } = tq in
- Mutex.lock m;
- if tq.release then (Mutex.unlock m; raise BeingDestroyed);
- while not (PriorityQueue.exists picky q || !destroy) do
- tq.nwaiting <- tq.nwaiting + 1;
- Condition.broadcast cn;
- Condition.wait c m;
- tq.nwaiting <- tq.nwaiting - 1;
- if tq.release || !destroy then (Mutex.unlock m; raise BeingDestroyed)
- done;
- if !destroy then (Mutex.unlock m; raise BeingDestroyed);
- let x = PriorityQueue.pop ~picky q in
- Condition.signal c;
- Condition.signal cn;
- Mutex.unlock m;
- x
+ CThread.with_lock m ~scope:(fun () ->
+ if tq.release then raise BeingDestroyed;
+ while not (PriorityQueue.exists picky q || !destroy) do
+ tq.nwaiting <- tq.nwaiting + 1;
+ Condition.broadcast cn;
+ Condition.wait c m;
+ tq.nwaiting <- tq.nwaiting - 1;
+ if tq.release || !destroy then raise BeingDestroyed
+ done;
+ if !destroy then raise BeingDestroyed;
+ let x = PriorityQueue.pop ~picky q in
+ Condition.signal c;
+ Condition.signal cn;
+ x)
let broadcast tq =
let { lock = m; cond = c } = tq in
- Mutex.lock m;
- Condition.broadcast c;
- Mutex.unlock m
+ CThread.with_lock m ~scope:(fun () ->
+ Condition.broadcast c)
let push tq x =
let { queue = q; lock = m; cond = c; release } = tq in
if release then CErrors.anomaly(Pp.str
"TQueue.push while being destroyed! Only 1 producer/destroyer allowed.");
- Mutex.lock m;
- PriorityQueue.push q x;
- Condition.broadcast c;
- Mutex.unlock m
+ CThread.with_lock m ~scope:(fun () ->
+ PriorityQueue.push q x;
+ Condition.broadcast c)
let length tq =
let { queue = q; lock = m } = tq in
- Mutex.lock m;
- let n = PriorityQueue.length q in
- Mutex.unlock m;
- n
+ CThread.with_lock m ~scope:(fun () ->
+ PriorityQueue.length q)
let clear tq =
let { queue = q; lock = m; cond = c } = tq in
- Mutex.lock m;
- PriorityQueue.clear q;
- Mutex.unlock m
+ CThread.with_lock m ~scope:(fun () ->
+ PriorityQueue.clear q)
let clear_saving tq f =
let { queue = q; lock = m; cond = c } = tq in
- Mutex.lock m;
let saved = ref [] in
- while not (PriorityQueue.is_empty q) do
- let elem = PriorityQueue.pop q in
- match f elem with
- | Some x -> saved := x :: !saved
- | None -> ()
- done;
- Mutex.unlock m;
+ CThread.with_lock m ~scope:(fun () ->
+ while not (PriorityQueue.is_empty q) do
+ let elem = PriorityQueue.pop q in
+ match f elem with
+ | Some x -> saved := x :: !saved
+ | None -> ()
+ done);
List.rev !saved
let is_empty tq = PriorityQueue.is_empty tq.queue
@@ -130,32 +123,28 @@ let is_empty tq = PriorityQueue.is_empty tq.queue
let destroy tq =
tq.release <- true;
while tq.nwaiting > 0 do
- Mutex.lock tq.lock;
- Condition.broadcast tq.cond;
- Mutex.unlock tq.lock;
+ CThread.with_lock tq.lock ~scope:(fun () ->
+ Condition.broadcast tq.cond)
done;
tq.release <- false
let wait_until_n_are_waiting_and_queue_empty j tq =
- Mutex.lock tq.lock;
- while not (PriorityQueue.is_empty tq.queue) || tq.nwaiting < j do
- Condition.wait tq.cond_waiting tq.lock
- done;
- Mutex.unlock tq.lock
+ CThread.with_lock tq.lock ~scope:(fun () ->
+ while not (PriorityQueue.is_empty tq.queue) || tq.nwaiting < j do
+ Condition.wait tq.cond_waiting tq.lock
+ done)
let wait_until_n_are_waiting_then_snapshot j tq =
let l = ref [] in
- Mutex.lock tq.lock;
- while not (PriorityQueue.is_empty tq.queue) do
- l := PriorityQueue.pop tq.queue :: !l
- done;
- while tq.nwaiting < j do Condition.wait tq.cond_waiting tq.lock done;
- List.iter (PriorityQueue.push tq.queue) (List.rev !l);
- if !l <> [] then Condition.broadcast tq.cond;
- Mutex.unlock tq.lock;
+ CThread.with_lock tq.lock ~scope:(fun () ->
+ while not (PriorityQueue.is_empty tq.queue) do
+ l := PriorityQueue.pop tq.queue :: !l
+ done;
+ while tq.nwaiting < j do Condition.wait tq.cond_waiting tq.lock done;
+ List.iter (PriorityQueue.push tq.queue) (List.rev !l);
+ if !l <> [] then Condition.broadcast tq.cond);
List.rev !l
let set_order tq rel =
- Mutex.lock tq.lock;
- PriorityQueue.set_rel rel tq.queue;
- Mutex.unlock tq.lock
+ CThread.with_lock tq.lock ~scope:(fun () ->
+ PriorityQueue.set_rel rel tq.queue)