aboutsummaryrefslogtreecommitdiff
path: root/toplevel
diff options
context:
space:
mode:
authorEnrico Tassi2014-03-13 15:41:44 +0100
committerEnrico Tassi2014-03-13 16:04:13 +0100
commitc9b1caaa5516d616e400faa7a7c0278c8677c51c (patch)
tree2dc6f7870a9824f8b9c8357774287c5c44b332a2 /toplevel
parent8ee720fef8e21595827d18e1e28777c1d061a9e5 (diff)
STM: move out a couple of submodules
These modules are not as reusable as one may want them to be, but moving them out simplifies a little STM.
Diffstat (limited to 'toplevel')
-rw-r--r--toplevel/stm.ml161
1 files changed, 27 insertions, 134 deletions
diff --git a/toplevel/stm.ml b/toplevel/stm.ml
index c612586557..f6564e8921 100644
--- a/toplevel/stm.ml
+++ b/toplevel/stm.ml
@@ -617,6 +617,8 @@ module Worker = Spawn.Sync(struct
())
end)
+module WorkersPool = WorkerPool.Make(Worker)
+
let record_pb_time proof_name loc time =
let proof_build_time = Printf.sprintf "%.3f" time in
Aux_file.record_in_aux_at loc "proof_build_time" proof_build_time;
@@ -660,121 +662,8 @@ module Slaves : sig
end = struct (* {{{ *)
- module TQueue : sig
-
- type 'a t
- val create : unit -> 'a t
- val pop : 'a t -> 'a
- val push : 'a t -> 'a -> unit
- val wait_until_n_are_waiting_and_queue_empty : int -> 'a t -> unit
- val dump : 'a t -> 'a list
-
- end = struct (* {{{ *)
-
- (* queue, lock, cond_q, n_waiting, cond_n_waiting *)
- type 'a t = 'a Queue.t * Mutex.t * Condition.t * int ref * Condition.t
-
- let create () =
- Queue.create (), Mutex.create (), Condition.create (),
- ref 0, Condition.create ()
-
- let pop (q,m,c,n,cn) =
- Mutex.lock m;
- while Queue.is_empty q do
- n := !n+1;
- Condition.signal cn;
- Condition.wait c m;
- n := !n-1
- done;
- let x = Queue.pop q in
- Condition.signal c;
- Condition.signal cn;
- Mutex.unlock m;
- x
- let push (q,m,c,n,cn) x =
- Mutex.lock m;
- Queue.push x q;
- Condition.signal c;
- Mutex.unlock m
-
- let wait_until_n_are_waiting_and_queue_empty (j : int) (q,m,c,n,cn) =
- Mutex.lock m;
- while not (Queue.is_empty q) || !n < j do Condition.wait cn m done;
- Mutex.unlock m
-
- let dump (q,m,_,_,_) =
- let l = ref [] in
- Mutex.lock m;
- while not (Queue.is_empty q) do l := Queue.pop q :: !l done;
- Mutex.unlock m;
- List.rev !l
-
- end (* }}} *)
-
- module SlavesPool : sig
-
- val init : int -> (bool ref -> (unit -> in_channel * out_channel * Worker.process * int) -> unit) -> unit
- val is_empty : unit -> bool
- val n_slaves : unit -> int
-
- val cancel : int -> unit
-
- end = struct (* {{{ *)
-
- let slave_managers = ref None
-
- let n_slaves () = match !slave_managers with
- | None -> 0
- | Some managers -> Array.length managers
- let is_empty () = !slave_managers = None
-
- let master_handshake worker_id ic oc =
- try
- Marshal.to_channel oc 17 []; flush oc;
- let n = (Marshal.from_channel ic : int) in
- assert(n = 17);
- prerr_endline (Printf.sprintf "Handshake with %d OK" worker_id)
- with e ->
- prerr_endline
- (Printf.sprintf "Handshake with %d failed: %s"
- worker_id (Printexc.to_string e));
- exit 1
-
- let respawn n () =
- let prog = Sys.argv.(0) in
- let rec set_slave_opt = function
- | [] -> ["-async-proofs"; "worker"; string_of_int n]
- | ("-ideslave"|"-emacs"|"-emacs-U")::tl -> set_slave_opt tl
- | ("-async-proofs"
- |"-compile"
- |"-compile-verbose")::_::tl -> set_slave_opt tl
- | x::tl -> x :: set_slave_opt tl in
- let args =
- Array.of_list (set_slave_opt (List.tl (Array.to_list Sys.argv))) in
- let env =
- Array.append !async_proofs_workers_extra_env (Unix.environment ()) in
- let proc, ic, oc = Worker.spawn ~env prog args in
- master_handshake n ic oc;
- ic, oc, proc, n
-
- let init n manage_slave =
- slave_managers := Some
- (Array.init n (fun x ->
- let calcel_req = ref false in
- calcel_req,
- Thread.create (manage_slave calcel_req) (respawn (x+1))))
-
- let cancel n =
- match !slave_managers with
- | None -> ()
- | Some a ->
- let switch, _ = a.(n) in
- switch := true
-
- end (* }}} *)
-
- let cancel_worker n = SlavesPool.cancel (n-1)
+ let cancel_worker n = WorkersPool.cancel (n-1)
let reach_known_state = ref (fun ?redefine_qed ~cache id -> ())
let set_reach_known_state f = reach_known_state := f
@@ -976,13 +865,13 @@ end = struct (* {{{ *)
let queue : task TQueue.t = TQueue.create ()
let wait_all_done () =
- if not (SlavesPool.is_empty ()) then
+ if not (WorkersPool.is_empty ()) then
TQueue.wait_until_n_are_waiting_and_queue_empty
- (SlavesPool.n_slaves ()) queue
+ (WorkersPool.n_workers ()) queue
let build_proof ~loc ~exn_info:(id,valid as exn_info) ~start ~stop ~name =
let cancel_switch = ref false in
- if SlavesPool.is_empty () then
+ if WorkersPool.is_empty () then
if !Flags.compilation_mode = Flags.BuildVi then begin
let force () : Entries.proof_output list Future.assignement =
try `Val (build_proof_here_core loc stop ()) with e -> `Exn e in
@@ -1018,8 +907,20 @@ end = struct (* {{{ *)
| _ -> assert false in
Pp.feedback ~state_id:Stateid.initial (Interface.SlaveStatus(id, s))
- let rec manage_slave cancel_user_req respawn =
- let ic, oc, proc, id_slave = respawn () in
+ let rec manage_slave ~cancel:cancel_user_req id_slave respawn =
+ let ic, oc, proc =
+ let rec set_slave_opt = function
+ | [] -> ["-async-proofs"; "worker"; string_of_int id_slave]
+ | ("-ideslave"|"-emacs"|"-emacs-U")::tl -> set_slave_opt tl
+ | ("-async-proofs"
+ |"-compile"
+ |"-compile-verbose")::_::tl -> set_slave_opt tl
+ | x::tl -> x :: set_slave_opt tl in
+ let args =
+ Array.of_list (set_slave_opt (List.tl (Array.to_list Sys.argv))) in
+ let env =
+ Array.append !async_proofs_workers_extra_env (Unix.environment ()) in
+ respawn ~args ~env () in
let last_task = ref None in
let task_expired = ref false in
let task_cancelled = ref false in
@@ -1098,11 +999,11 @@ end = struct (* {{{ *)
| KillRespawn ->
Pp.feedback (Interface.InProgress ~-1);
Worker.kill proc; ignore(Worker.wait proc);
- manage_slave cancel_user_req respawn
+ manage_slave cancel_user_req id_slave respawn
| Sys_error _ | Invalid_argument _ | End_of_file when !task_expired ->
Pp.feedback (Interface.InProgress ~-1);
ignore(Worker.wait proc);
- manage_slave cancel_user_req respawn
+ manage_slave cancel_user_req id_slave respawn
| Sys_error _ | Invalid_argument _ | End_of_file when !task_cancelled ->
msg_warning(strbrk "The worker was cancelled.");
Option.iter (fun task ->
@@ -1114,7 +1015,7 @@ end = struct (* {{{ *)
Pp.feedback (Interface.InProgress ~-1);
) !last_task;
Worker.kill proc; ignore(Worker.wait proc);
- manage_slave cancel_user_req respawn
+ manage_slave cancel_user_req id_slave respawn
| Sys_error _ | Invalid_argument _ | End_of_file
when !fallback_to_lazy_if_slave_dies ->
msg_warning(strbrk "The worker process died badly.");
@@ -1125,7 +1026,7 @@ end = struct (* {{{ *)
Pp.feedback (Interface.InProgress ~-1);
) !last_task;
Worker.kill proc; ignore(Worker.wait proc);
- manage_slave cancel_user_req respawn
+ manage_slave cancel_user_req id_slave respawn
| Sys_error _ | Invalid_argument _ | End_of_file ->
Worker.kill proc;
let exit_status proc = match Worker.wait proc with
@@ -1136,7 +1037,7 @@ end = struct (* {{{ *)
pr_err ("Fatal worker error: " ^ (exit_status proc));
flush_all (); exit 1
- let init () = SlavesPool.init !Flags.async_proofs_n_workers manage_slave
+ let init () = WorkersPool.init !Flags.async_proofs_n_workers manage_slave
let slave_ic = ref stdin
let slave_oc = ref stdout
@@ -1152,15 +1053,7 @@ end = struct (* {{{ *)
| [] -> let data = f () in l := List.tl data; List.hd data
| x::tl -> l := tl; x
- let slave_handshake () =
- try
- let v = (Marshal.from_channel !slave_ic : int) in
- assert(v = 17);
- Marshal.to_channel !slave_oc v []; flush !slave_oc;
- prerr_endline "Handshake OK"
- with e ->
- prerr_endline ("Handshake failed: " ^ Printexc.to_string e);
- exit 1
+ let slave_handshake () = WorkersPool.worker_handshake !slave_ic !slave_oc
let slave_main_loop reset =
let slave_feeder oc fb =
@@ -1214,7 +1107,7 @@ end = struct (* {{{ *)
(* For external users this name is nicer than request *)
type tasks = int request list
let dump f2t_map =
- assert(SlavesPool.is_empty ()); (* ATM, we allow that only if no slaves *)
+ assert(WorkersPool.is_empty ()); (* ATM, we allow that only if no slaves *)
let tasks = TQueue.dump queue in
prerr_endline (Printf.sprintf "dumping %d\n" (List.length tasks));
let tasks = List.map request_of_task tasks in