aboutsummaryrefslogtreecommitdiff
path: root/stm
diff options
context:
space:
mode:
Diffstat (limited to 'stm')
-rw-r--r--stm/asyncTaskQueue.ml31
-rw-r--r--stm/stm.ml39
-rw-r--r--stm/tQueue.ml101
-rw-r--r--stm/workerPool.ml7
4 files changed, 63 insertions, 115 deletions
diff --git a/stm/asyncTaskQueue.ml b/stm/asyncTaskQueue.ml
index dd80ff21aa..a9f203014f 100644
--- a/stm/asyncTaskQueue.ml
+++ b/stm/asyncTaskQueue.ml
@@ -56,12 +56,8 @@ module Make(T : Task) () = struct
type response =
| Response of T.response
| RespFeedback of Feedback.feedback
- | RespGetCounterNewUnivLevel
type request = Request of T.request
- type more_data =
- | MoreDataUnivLevel of UnivGen.univ_unique_id list
-
let slave_respond (Request r) =
let res = T.perform r in
Response res
@@ -94,16 +90,6 @@ module Make(T : Task) () = struct
with Failure s | Invalid_argument s | Sys_error s ->
marshal_err ("unmarshal_response: "^s)
- let marshal_more_data oc (res : more_data) =
- try marshal_to_channel oc res
- with Failure s | Invalid_argument s | Sys_error s ->
- marshal_err ("marshal_more_data: "^s)
-
- let unmarshal_more_data ic =
- try (CThread.thread_friendly_input_value ic : more_data)
- with Failure s | Invalid_argument s | Sys_error s ->
- marshal_err ("unmarshal_more_data: "^s)
-
let report_status ?(id = !Flags.async_proofs_worker_id) s =
let open Feedback in
feedback ~id:Stateid.initial (WorkerStatus(id, s))
@@ -198,8 +184,6 @@ module Make(T : Task) () = struct
| Unix.WEXITED i -> Printf.sprintf "exit(%d)" i
| Unix.WSIGNALED sno -> Printf.sprintf "signalled(%d)" sno
| Unix.WSTOPPED sno -> Printf.sprintf "stopped(%d)" sno) in
- let more_univs n =
- CList.init n (fun _ -> UnivGen.new_univ_id ()) in
let rec kill_if () =
if not (Worker.is_alive proc) then ()
@@ -231,9 +215,6 @@ module Make(T : Task) () = struct
marshal_request oc (Request req);
let rec continue () =
match unmarshal_response ic with
- | RespGetCounterNewUnivLevel ->
- marshal_more_data oc (MoreDataUnivLevel (more_univs 10));
- continue ()
| RespFeedback fbk -> T.forward_feedback fbk; continue ()
| Response resp ->
match T.use_response !worker_age task resp with
@@ -315,13 +296,6 @@ module Make(T : Task) () = struct
let ic, oc = Spawned.get_channels () in
slave_oc := Some oc; slave_ic := Some ic
- let bufferize f =
- let l = ref [] in
- fun () ->
- match !l with
- | [] -> let data = f () in l := List.tl data; List.hd data
- | x::tl -> l := tl; x
-
let slave_handshake () =
Pool.worker_handshake (Option.get !slave_ic) (Option.get !slave_oc)
@@ -339,11 +313,6 @@ module Make(T : Task) () = struct
Marshal.to_channel oc (RespFeedback (debug_with_pid fb)) []; flush oc) ()
in
ignore (Feedback.add_feeder (fun x -> slave_feeder (Option.get !slave_oc) x));
- (* We ask master to allocate universe identifiers *)
- UnivGen.set_remote_new_univ_id (bufferize @@ Control.protect_sigalrm (fun () ->
- marshal_response (Option.get !slave_oc) RespGetCounterNewUnivLevel;
- match unmarshal_more_data (Option.get !slave_ic) with
- | MoreDataUnivLevel l -> l));
let working = ref false in
slave_handshake ();
while true do
diff --git a/stm/stm.ml b/stm/stm.ml
index 9480bbdc2e..6287943cee 100644
--- a/stm/stm.ml
+++ b/stm/stm.ml
@@ -98,8 +98,7 @@ let forward_feedback, forward_feedback_hook =
let m = Mutex.create () in
Hook.make ~default:(function
| { doc_id = did; span_id = id; route; contents } ->
- try Mutex.lock m; feedback ~did ~id ~route contents; Mutex.unlock m
- with e -> Mutex.unlock m; raise e) ()
+ CThread.with_lock m ~scope:(fun () -> feedback ~did ~id ~route contents)) ()
let unreachable_state, unreachable_state_hook = Hook.make
~default:(fun ~doc:_ _ _ -> ()) ()
@@ -758,17 +757,16 @@ end = struct (* {{{ *)
let worker = ref None
let set_last_job j =
- Mutex.lock m;
- job := Some j;
- Condition.signal c;
- Mutex.unlock m
+ CThread.with_lock m ~scope:(fun () ->
+ job := Some j;
+ Condition.signal c)
let get_last_job () =
- Mutex.lock m;
- while Option.is_empty !job do Condition.wait c m; done;
- match !job with
- | None -> assert false
- | Some x -> job := None; Mutex.unlock m; x
+ CThread.with_lock m ~scope:(fun () ->
+ while Option.is_empty !job do Condition.wait c m; done;
+ match !job with
+ | None -> assert false
+ | Some x -> job := None; x)
let run_command () =
try while true do get_last_job () () done
@@ -2449,24 +2447,21 @@ let join ~doc =
VCS.print ();
doc
-let dump_snapshot () = Slaves.dump_snapshot (), RemoteCounter.snapshot ()
-
-type tasks = int Slaves.tasks * RemoteCounter.remote_counters_status
-let check_task name (tasks,rcbackup) i =
- RemoteCounter.restore rcbackup;
+type tasks = int Slaves.tasks
+let check_task name tasks i =
let vcs = VCS.backup () in
try
let rc = State.purify (Slaves.check_task name tasks) i in
VCS.restore vcs;
rc
with e when CErrors.noncritical e -> VCS.restore vcs; false
-let info_tasks (tasks,_) = Slaves.info_tasks tasks
-let finish_tasks name u p (t,rcbackup as tasks) =
- RemoteCounter.restore rcbackup;
+let info_tasks = Slaves.info_tasks
+
+let finish_tasks name u p tasks =
let finish_task u (_,_,i) =
let vcs = VCS.backup () in
- let u = State.purify (Slaves.finish_task name u p t) i in
+ let u = State.purify (Slaves.finish_task name u p tasks) i in
VCS.restore vcs;
u in
try
@@ -2517,13 +2512,13 @@ let snapshot_vio ~create_vos ~doc ~output_native_objects ldir long_f_dot_vo =
CErrors.user_err ~hdr:"stm" (str"Cannot dump a vio with open proofs");
(* LATER: when create_vos is true, it could be more efficient to not allocate the futures; but for now it seems useful for synchronization of the workers,
below, [snapshot] gets computed even if [create_vos] is true. *)
- let (tasks,counters) = dump_snapshot() in
+ let tasks = Slaves.dump_snapshot() in
let except = List.fold_left (fun e (r,_) ->
Future.UUIDSet.add r.Stateid.uuid e) Future.UUIDSet.empty tasks in
let todo_proofs =
if create_vos
then Library.ProofsTodoSomeEmpty except
- else Library.ProofsTodoSome (except,tasks,counters)
+ else Library.ProofsTodoSome (except,tasks)
in
Library.save_library_to todo_proofs ~output_native_objects ldir long_f_dot_vo (Global.opaque_tables ());
doc
diff --git a/stm/tQueue.ml b/stm/tQueue.ml
index 2aaca85582..f5bd726dde 100644
--- a/stm/tQueue.ml
+++ b/stm/tQueue.ml
@@ -68,61 +68,54 @@ let create () = {
let pop ?(picky=(fun _ -> true)) ?(destroy=ref false) tq =
let { queue = q; lock = m; cond = c; cond_waiting = cn } = tq in
- Mutex.lock m;
- if tq.release then (Mutex.unlock m; raise BeingDestroyed);
- while not (PriorityQueue.exists picky q || !destroy) do
- tq.nwaiting <- tq.nwaiting + 1;
- Condition.broadcast cn;
- Condition.wait c m;
- tq.nwaiting <- tq.nwaiting - 1;
- if tq.release || !destroy then (Mutex.unlock m; raise BeingDestroyed)
- done;
- if !destroy then (Mutex.unlock m; raise BeingDestroyed);
- let x = PriorityQueue.pop ~picky q in
- Condition.signal c;
- Condition.signal cn;
- Mutex.unlock m;
- x
+ CThread.with_lock m ~scope:(fun () ->
+ if tq.release then raise BeingDestroyed;
+ while not (PriorityQueue.exists picky q || !destroy) do
+ tq.nwaiting <- tq.nwaiting + 1;
+ Condition.broadcast cn;
+ Condition.wait c m;
+ tq.nwaiting <- tq.nwaiting - 1;
+ if tq.release || !destroy then raise BeingDestroyed
+ done;
+ if !destroy then raise BeingDestroyed;
+ let x = PriorityQueue.pop ~picky q in
+ Condition.signal c;
+ Condition.signal cn;
+ x)
let broadcast tq =
let { lock = m; cond = c } = tq in
- Mutex.lock m;
- Condition.broadcast c;
- Mutex.unlock m
+ CThread.with_lock m ~scope:(fun () ->
+ Condition.broadcast c)
let push tq x =
let { queue = q; lock = m; cond = c; release } = tq in
if release then CErrors.anomaly(Pp.str
"TQueue.push while being destroyed! Only 1 producer/destroyer allowed.");
- Mutex.lock m;
- PriorityQueue.push q x;
- Condition.broadcast c;
- Mutex.unlock m
+ CThread.with_lock m ~scope:(fun () ->
+ PriorityQueue.push q x;
+ Condition.broadcast c)
let length tq =
let { queue = q; lock = m } = tq in
- Mutex.lock m;
- let n = PriorityQueue.length q in
- Mutex.unlock m;
- n
+ CThread.with_lock m ~scope:(fun () ->
+ PriorityQueue.length q)
let clear tq =
let { queue = q; lock = m; cond = c } = tq in
- Mutex.lock m;
- PriorityQueue.clear q;
- Mutex.unlock m
+ CThread.with_lock m ~scope:(fun () ->
+ PriorityQueue.clear q)
let clear_saving tq f =
let { queue = q; lock = m; cond = c } = tq in
- Mutex.lock m;
let saved = ref [] in
- while not (PriorityQueue.is_empty q) do
- let elem = PriorityQueue.pop q in
- match f elem with
- | Some x -> saved := x :: !saved
- | None -> ()
- done;
- Mutex.unlock m;
+ CThread.with_lock m ~scope:(fun () ->
+ while not (PriorityQueue.is_empty q) do
+ let elem = PriorityQueue.pop q in
+ match f elem with
+ | Some x -> saved := x :: !saved
+ | None -> ()
+ done);
List.rev !saved
let is_empty tq = PriorityQueue.is_empty tq.queue
@@ -130,32 +123,28 @@ let is_empty tq = PriorityQueue.is_empty tq.queue
let destroy tq =
tq.release <- true;
while tq.nwaiting > 0 do
- Mutex.lock tq.lock;
- Condition.broadcast tq.cond;
- Mutex.unlock tq.lock;
+ CThread.with_lock tq.lock ~scope:(fun () ->
+ Condition.broadcast tq.cond)
done;
tq.release <- false
let wait_until_n_are_waiting_and_queue_empty j tq =
- Mutex.lock tq.lock;
- while not (PriorityQueue.is_empty tq.queue) || tq.nwaiting < j do
- Condition.wait tq.cond_waiting tq.lock
- done;
- Mutex.unlock tq.lock
+ CThread.with_lock tq.lock ~scope:(fun () ->
+ while not (PriorityQueue.is_empty tq.queue) || tq.nwaiting < j do
+ Condition.wait tq.cond_waiting tq.lock
+ done)
let wait_until_n_are_waiting_then_snapshot j tq =
let l = ref [] in
- Mutex.lock tq.lock;
- while not (PriorityQueue.is_empty tq.queue) do
- l := PriorityQueue.pop tq.queue :: !l
- done;
- while tq.nwaiting < j do Condition.wait tq.cond_waiting tq.lock done;
- List.iter (PriorityQueue.push tq.queue) (List.rev !l);
- if !l <> [] then Condition.broadcast tq.cond;
- Mutex.unlock tq.lock;
+ CThread.with_lock tq.lock ~scope:(fun () ->
+ while not (PriorityQueue.is_empty tq.queue) do
+ l := PriorityQueue.pop tq.queue :: !l
+ done;
+ while tq.nwaiting < j do Condition.wait tq.cond_waiting tq.lock done;
+ List.iter (PriorityQueue.push tq.queue) (List.rev !l);
+ if !l <> [] then Condition.broadcast tq.cond);
List.rev !l
let set_order tq rel =
- Mutex.lock tq.lock;
- PriorityQueue.set_rel rel tq.queue;
- Mutex.unlock tq.lock
+ CThread.with_lock tq.lock ~scope:(fun () ->
+ PriorityQueue.set_rel rel tq.queue)
diff --git a/stm/workerPool.ml b/stm/workerPool.ml
index 45c92c3748..fef9300377 100644
--- a/stm/workerPool.ml
+++ b/stm/workerPool.ml
@@ -72,12 +72,7 @@ let worker_handshake slave_ic slave_oc =
exit 1
let locking { lock; pool = p } f =
- try
- Mutex.lock lock;
- let x = f p in
- Mutex.unlock lock;
- x
- with e -> Mutex.unlock lock; raise e
+ CThread.with_lock lock ~scope:(fun () -> f p)
let rec create_worker extra pool priority id =
let cancel = ref false in