aboutsummaryrefslogtreecommitdiff
path: root/stm
diff options
context:
space:
mode:
Diffstat (limited to 'stm')
-rw-r--r--stm/stm.ml20
-rw-r--r--stm/tQueue.ml101
-rw-r--r--stm/workerPool.ml7
3 files changed, 55 insertions, 73 deletions
diff --git a/stm/stm.ml b/stm/stm.ml
index 9480bbdc2e..f5768726c3 100644
--- a/stm/stm.ml
+++ b/stm/stm.ml
@@ -98,8 +98,7 @@ let forward_feedback, forward_feedback_hook =
let m = Mutex.create () in
Hook.make ~default:(function
| { doc_id = did; span_id = id; route; contents } ->
- try Mutex.lock m; feedback ~did ~id ~route contents; Mutex.unlock m
- with e -> Mutex.unlock m; raise e) ()
+ CThread.with_lock m ~scope:(fun () -> feedback ~did ~id ~route contents)) ()
let unreachable_state, unreachable_state_hook = Hook.make
~default:(fun ~doc:_ _ _ -> ()) ()
@@ -758,17 +757,16 @@ end = struct (* {{{ *)
let worker = ref None
let set_last_job j =
- Mutex.lock m;
- job := Some j;
- Condition.signal c;
- Mutex.unlock m
+ CThread.with_lock m ~scope:(fun () ->
+ job := Some j;
+ Condition.signal c)
let get_last_job () =
- Mutex.lock m;
- while Option.is_empty !job do Condition.wait c m; done;
- match !job with
- | None -> assert false
- | Some x -> job := None; Mutex.unlock m; x
+ CThread.with_lock m ~scope:(fun () ->
+ while Option.is_empty !job do Condition.wait c m; done;
+ match !job with
+ | None -> assert false
+ | Some x -> job := None; x)
let run_command () =
try while true do get_last_job () () done
diff --git a/stm/tQueue.ml b/stm/tQueue.ml
index 2aaca85582..f5bd726dde 100644
--- a/stm/tQueue.ml
+++ b/stm/tQueue.ml
@@ -68,61 +68,54 @@ let create () = {
let pop ?(picky=(fun _ -> true)) ?(destroy=ref false) tq =
let { queue = q; lock = m; cond = c; cond_waiting = cn } = tq in
- Mutex.lock m;
- if tq.release then (Mutex.unlock m; raise BeingDestroyed);
- while not (PriorityQueue.exists picky q || !destroy) do
- tq.nwaiting <- tq.nwaiting + 1;
- Condition.broadcast cn;
- Condition.wait c m;
- tq.nwaiting <- tq.nwaiting - 1;
- if tq.release || !destroy then (Mutex.unlock m; raise BeingDestroyed)
- done;
- if !destroy then (Mutex.unlock m; raise BeingDestroyed);
- let x = PriorityQueue.pop ~picky q in
- Condition.signal c;
- Condition.signal cn;
- Mutex.unlock m;
- x
+ CThread.with_lock m ~scope:(fun () ->
+ if tq.release then raise BeingDestroyed;
+ while not (PriorityQueue.exists picky q || !destroy) do
+ tq.nwaiting <- tq.nwaiting + 1;
+ Condition.broadcast cn;
+ Condition.wait c m;
+ tq.nwaiting <- tq.nwaiting - 1;
+ if tq.release || !destroy then raise BeingDestroyed
+ done;
+ if !destroy then raise BeingDestroyed;
+ let x = PriorityQueue.pop ~picky q in
+ Condition.signal c;
+ Condition.signal cn;
+ x)
let broadcast tq =
let { lock = m; cond = c } = tq in
- Mutex.lock m;
- Condition.broadcast c;
- Mutex.unlock m
+ CThread.with_lock m ~scope:(fun () ->
+ Condition.broadcast c)
let push tq x =
let { queue = q; lock = m; cond = c; release } = tq in
if release then CErrors.anomaly(Pp.str
"TQueue.push while being destroyed! Only 1 producer/destroyer allowed.");
- Mutex.lock m;
- PriorityQueue.push q x;
- Condition.broadcast c;
- Mutex.unlock m
+ CThread.with_lock m ~scope:(fun () ->
+ PriorityQueue.push q x;
+ Condition.broadcast c)
let length tq =
let { queue = q; lock = m } = tq in
- Mutex.lock m;
- let n = PriorityQueue.length q in
- Mutex.unlock m;
- n
+ CThread.with_lock m ~scope:(fun () ->
+ PriorityQueue.length q)
let clear tq =
let { queue = q; lock = m; cond = c } = tq in
- Mutex.lock m;
- PriorityQueue.clear q;
- Mutex.unlock m
+ CThread.with_lock m ~scope:(fun () ->
+ PriorityQueue.clear q)
let clear_saving tq f =
let { queue = q; lock = m; cond = c } = tq in
- Mutex.lock m;
let saved = ref [] in
- while not (PriorityQueue.is_empty q) do
- let elem = PriorityQueue.pop q in
- match f elem with
- | Some x -> saved := x :: !saved
- | None -> ()
- done;
- Mutex.unlock m;
+ CThread.with_lock m ~scope:(fun () ->
+ while not (PriorityQueue.is_empty q) do
+ let elem = PriorityQueue.pop q in
+ match f elem with
+ | Some x -> saved := x :: !saved
+ | None -> ()
+ done);
List.rev !saved
let is_empty tq = PriorityQueue.is_empty tq.queue
@@ -130,32 +123,28 @@ let is_empty tq = PriorityQueue.is_empty tq.queue
let destroy tq =
tq.release <- true;
while tq.nwaiting > 0 do
- Mutex.lock tq.lock;
- Condition.broadcast tq.cond;
- Mutex.unlock tq.lock;
+ CThread.with_lock tq.lock ~scope:(fun () ->
+ Condition.broadcast tq.cond)
done;
tq.release <- false
let wait_until_n_are_waiting_and_queue_empty j tq =
- Mutex.lock tq.lock;
- while not (PriorityQueue.is_empty tq.queue) || tq.nwaiting < j do
- Condition.wait tq.cond_waiting tq.lock
- done;
- Mutex.unlock tq.lock
+ CThread.with_lock tq.lock ~scope:(fun () ->
+ while not (PriorityQueue.is_empty tq.queue) || tq.nwaiting < j do
+ Condition.wait tq.cond_waiting tq.lock
+ done)
let wait_until_n_are_waiting_then_snapshot j tq =
let l = ref [] in
- Mutex.lock tq.lock;
- while not (PriorityQueue.is_empty tq.queue) do
- l := PriorityQueue.pop tq.queue :: !l
- done;
- while tq.nwaiting < j do Condition.wait tq.cond_waiting tq.lock done;
- List.iter (PriorityQueue.push tq.queue) (List.rev !l);
- if !l <> [] then Condition.broadcast tq.cond;
- Mutex.unlock tq.lock;
+ CThread.with_lock tq.lock ~scope:(fun () ->
+ while not (PriorityQueue.is_empty tq.queue) do
+ l := PriorityQueue.pop tq.queue :: !l
+ done;
+ while tq.nwaiting < j do Condition.wait tq.cond_waiting tq.lock done;
+ List.iter (PriorityQueue.push tq.queue) (List.rev !l);
+ if !l <> [] then Condition.broadcast tq.cond);
List.rev !l
let set_order tq rel =
- Mutex.lock tq.lock;
- PriorityQueue.set_rel rel tq.queue;
- Mutex.unlock tq.lock
+ CThread.with_lock tq.lock ~scope:(fun () ->
+ PriorityQueue.set_rel rel tq.queue)
diff --git a/stm/workerPool.ml b/stm/workerPool.ml
index 45c92c3748..fef9300377 100644
--- a/stm/workerPool.ml
+++ b/stm/workerPool.ml
@@ -72,12 +72,7 @@ let worker_handshake slave_ic slave_oc =
exit 1
let locking { lock; pool = p } f =
- try
- Mutex.lock lock;
- let x = f p in
- Mutex.unlock lock;
- x
- with e -> Mutex.unlock lock; raise e
+ CThread.with_lock lock ~scope:(fun () -> f p)
let rec create_worker extra pool priority id =
let cancel = ref false in