diff options
Diffstat (limited to 'stm')
| -rw-r--r-- | stm/stm.ml | 20 | ||||
| -rw-r--r-- | stm/tQueue.ml | 101 | ||||
| -rw-r--r-- | stm/workerPool.ml | 7 |
3 files changed, 55 insertions, 73 deletions
diff --git a/stm/stm.ml b/stm/stm.ml index 9480bbdc2e..f5768726c3 100644 --- a/stm/stm.ml +++ b/stm/stm.ml @@ -98,8 +98,7 @@ let forward_feedback, forward_feedback_hook = let m = Mutex.create () in Hook.make ~default:(function | { doc_id = did; span_id = id; route; contents } -> - try Mutex.lock m; feedback ~did ~id ~route contents; Mutex.unlock m - with e -> Mutex.unlock m; raise e) () + CThread.with_lock m ~scope:(fun () -> feedback ~did ~id ~route contents)) () let unreachable_state, unreachable_state_hook = Hook.make ~default:(fun ~doc:_ _ _ -> ()) () @@ -758,17 +757,16 @@ end = struct (* {{{ *) let worker = ref None let set_last_job j = - Mutex.lock m; - job := Some j; - Condition.signal c; - Mutex.unlock m + CThread.with_lock m ~scope:(fun () -> + job := Some j; + Condition.signal c) let get_last_job () = - Mutex.lock m; - while Option.is_empty !job do Condition.wait c m; done; - match !job with - | None -> assert false - | Some x -> job := None; Mutex.unlock m; x + CThread.with_lock m ~scope:(fun () -> + while Option.is_empty !job do Condition.wait c m; done; + match !job with + | None -> assert false + | Some x -> job := None; x) let run_command () = try while true do get_last_job () () done diff --git a/stm/tQueue.ml b/stm/tQueue.ml index 2aaca85582..f5bd726dde 100644 --- a/stm/tQueue.ml +++ b/stm/tQueue.ml @@ -68,61 +68,54 @@ let create () = { let pop ?(picky=(fun _ -> true)) ?(destroy=ref false) tq = let { queue = q; lock = m; cond = c; cond_waiting = cn } = tq in - Mutex.lock m; - if tq.release then (Mutex.unlock m; raise BeingDestroyed); - while not (PriorityQueue.exists picky q || !destroy) do - tq.nwaiting <- tq.nwaiting + 1; - Condition.broadcast cn; - Condition.wait c m; - tq.nwaiting <- tq.nwaiting - 1; - if tq.release || !destroy then (Mutex.unlock m; raise BeingDestroyed) - done; - if !destroy then (Mutex.unlock m; raise BeingDestroyed); - let x = PriorityQueue.pop ~picky q in - Condition.signal c; - Condition.signal cn; - Mutex.unlock m; - x + CThread.with_lock m ~scope:(fun () -> + if tq.release then raise BeingDestroyed; + while not (PriorityQueue.exists picky q || !destroy) do + tq.nwaiting <- tq.nwaiting + 1; + Condition.broadcast cn; + Condition.wait c m; + tq.nwaiting <- tq.nwaiting - 1; + if tq.release || !destroy then raise BeingDestroyed + done; + if !destroy then raise BeingDestroyed; + let x = PriorityQueue.pop ~picky q in + Condition.signal c; + Condition.signal cn; + x) let broadcast tq = let { lock = m; cond = c } = tq in - Mutex.lock m; - Condition.broadcast c; - Mutex.unlock m + CThread.with_lock m ~scope:(fun () -> + Condition.broadcast c) let push tq x = let { queue = q; lock = m; cond = c; release } = tq in if release then CErrors.anomaly(Pp.str "TQueue.push while being destroyed! Only 1 producer/destroyer allowed."); - Mutex.lock m; - PriorityQueue.push q x; - Condition.broadcast c; - Mutex.unlock m + CThread.with_lock m ~scope:(fun () -> + PriorityQueue.push q x; + Condition.broadcast c) let length tq = let { queue = q; lock = m } = tq in - Mutex.lock m; - let n = PriorityQueue.length q in - Mutex.unlock m; - n + CThread.with_lock m ~scope:(fun () -> + PriorityQueue.length q) let clear tq = let { queue = q; lock = m; cond = c } = tq in - Mutex.lock m; - PriorityQueue.clear q; - Mutex.unlock m + CThread.with_lock m ~scope:(fun () -> + PriorityQueue.clear q) let clear_saving tq f = let { queue = q; lock = m; cond = c } = tq in - Mutex.lock m; let saved = ref [] in - while not (PriorityQueue.is_empty q) do - let elem = PriorityQueue.pop q in - match f elem with - | Some x -> saved := x :: !saved - | None -> () - done; - Mutex.unlock m; + CThread.with_lock m ~scope:(fun () -> + while not (PriorityQueue.is_empty q) do + let elem = PriorityQueue.pop q in + match f elem with + | Some x -> saved := x :: !saved + | None -> () + done); List.rev !saved let is_empty tq = PriorityQueue.is_empty tq.queue @@ -130,32 +123,28 @@ let is_empty tq = PriorityQueue.is_empty tq.queue let destroy tq = tq.release <- true; while tq.nwaiting > 0 do - Mutex.lock tq.lock; - Condition.broadcast tq.cond; - Mutex.unlock tq.lock; + CThread.with_lock tq.lock ~scope:(fun () -> + Condition.broadcast tq.cond) done; tq.release <- false let wait_until_n_are_waiting_and_queue_empty j tq = - Mutex.lock tq.lock; - while not (PriorityQueue.is_empty tq.queue) || tq.nwaiting < j do - Condition.wait tq.cond_waiting tq.lock - done; - Mutex.unlock tq.lock + CThread.with_lock tq.lock ~scope:(fun () -> + while not (PriorityQueue.is_empty tq.queue) || tq.nwaiting < j do + Condition.wait tq.cond_waiting tq.lock + done) let wait_until_n_are_waiting_then_snapshot j tq = let l = ref [] in - Mutex.lock tq.lock; - while not (PriorityQueue.is_empty tq.queue) do - l := PriorityQueue.pop tq.queue :: !l - done; - while tq.nwaiting < j do Condition.wait tq.cond_waiting tq.lock done; - List.iter (PriorityQueue.push tq.queue) (List.rev !l); - if !l <> [] then Condition.broadcast tq.cond; - Mutex.unlock tq.lock; + CThread.with_lock tq.lock ~scope:(fun () -> + while not (PriorityQueue.is_empty tq.queue) do + l := PriorityQueue.pop tq.queue :: !l + done; + while tq.nwaiting < j do Condition.wait tq.cond_waiting tq.lock done; + List.iter (PriorityQueue.push tq.queue) (List.rev !l); + if !l <> [] then Condition.broadcast tq.cond); List.rev !l let set_order tq rel = - Mutex.lock tq.lock; - PriorityQueue.set_rel rel tq.queue; - Mutex.unlock tq.lock + CThread.with_lock tq.lock ~scope:(fun () -> + PriorityQueue.set_rel rel tq.queue) diff --git a/stm/workerPool.ml b/stm/workerPool.ml index 45c92c3748..fef9300377 100644 --- a/stm/workerPool.ml +++ b/stm/workerPool.ml @@ -72,12 +72,7 @@ let worker_handshake slave_ic slave_oc = exit 1 let locking { lock; pool = p } f = - try - Mutex.lock lock; - let x = f p in - Mutex.unlock lock; - x - with e -> Mutex.unlock lock; raise e + CThread.with_lock lock ~scope:(fun () -> f p) let rec create_worker extra pool priority id = let cancel = ref false in |
