aboutsummaryrefslogtreecommitdiff
path: root/stm/asyncTaskQueue.ml
diff options
context:
space:
mode:
Diffstat (limited to 'stm/asyncTaskQueue.ml')
-rw-r--r--stm/asyncTaskQueue.ml45
1 files changed, 29 insertions, 16 deletions
diff --git a/stm/asyncTaskQueue.ml b/stm/asyncTaskQueue.ml
index 65219724e4..36689e6e5f 100644
--- a/stm/asyncTaskQueue.ml
+++ b/stm/asyncTaskQueue.ml
@@ -26,8 +26,8 @@ module type Task = sig
val extra_env : unit -> string array
(* run by the master, on a thread *)
- val request_of_task : task -> request option
- val use_response : task -> response -> [ `Die | `Stay | `StayReset ]
+ val request_of_task : [ `Fresh | `Old ] -> task -> request option
+ val use_response : task -> response -> [ `Stay | `StayReset ]
val on_marshal_error : string -> task -> unit
val on_slave_death : task option -> [ `Exit of int | `Stay ]
val on_task_cancellation_or_expiration : task option -> unit
@@ -120,12 +120,15 @@ module Make(T : Task) = struct
with Failure s | Invalid_argument s | Sys_error s ->
marshal_err ("unmarshal_more_data: "^s)
- let reorder_tasks cmp = TQueue.reorder queue (fun (t1,_) (t2,_) -> cmp t1 t2)
+ let set_order cmp = TQueue.set_order queue (fun (t1,_) (t2,_) -> cmp t1 t2)
let join () =
if not (WorkersPool.is_empty ()) then
TQueue.wait_until_n_are_waiting_and_queue_empty
(WorkersPool.n_workers ()) queue
+ let cancel_all () =
+ TQueue.clear queue;
+ WorkersPool.cancel_all ()
exception KillRespawn
exception Die
@@ -134,7 +137,7 @@ module Make(T : Task) = struct
let report_status ?(id = !Flags.async_proofs_worker_id) s =
Pp.feedback ~state_id:Stateid.initial (Feedback.SlaveStatus(id, s))
- let rec manage_slave ~cancel:cancel_user_req id respawn =
+ let rec manage_slave ~cancel:cancel_user_req ~die id respawn =
let ic, oc, proc =
let rec set_slave_opt = function
| [] -> !Flags.async_proofs_flags_for_workers @
@@ -151,29 +154,29 @@ module Make(T : Task) = struct
let last_task = ref None in
let task_expired = ref false in
let task_cancelled = ref false in
+ let worker_age = ref `Fresh in
CThread.prepare_in_channel_for_thread_friendly_io ic;
try
- while true do
+ while not !die do
prerr_endline "waiting for a task";
report_status ~id "Idle";
let task, cancel_switch = TQueue.pop queue in
prerr_endline ("got task: "^T.name_of_task task);
last_task := Some task;
- try
- let req = T.request_of_task task in
+ begin try
+ let req = T.request_of_task !worker_age task in
if req = None then raise Expired;
marshal_request oc (Request (Option.get req));
Worker.kill_if proc ~sec:1 (fun () ->
task_expired := !cancel_switch;
task_cancelled := !cancel_user_req;
if !cancel_user_req then cancel_user_req := false;
- !task_expired || !task_cancelled);
+ !task_expired || !task_cancelled || !die);
let rec loop () =
let response = unmarshal_response ic in
match response with
| Response resp ->
(match T.use_response task resp with
- | `Die -> raise Die
| `Stay -> last_task := None; ()
| `StayReset -> last_task := None; raise KillRespawn)
| RespGetCounterNewUnivLevel ->
@@ -189,34 +192,37 @@ module Make(T : Task) = struct
loop ()
with
| Expired -> prerr_endline ("Task expired: " ^ T.name_of_task task)
- | (Sys_error _ | Invalid_argument _ | End_of_file | KillRespawn) as e ->
+ | (Sys_error _|Invalid_argument _|End_of_file|KillRespawn) as e ->
raise e (* we pass the exception to the external handler *)
| MarshalError s -> T.on_marshal_error s task
| e ->
pr_err ("Uncaught exception in worker manager: "^
string_of_ppcmds (print e));
flush_all ()
- done
+ end;
+ worker_age := `Old;
+ done;
+ raise Die
with
| KillRespawn ->
Worker.kill proc; ignore(Worker.wait proc);
- manage_slave ~cancel:cancel_user_req id respawn
- | Die -> Worker.kill proc; ignore(Worker.wait proc)
+ manage_slave ~cancel:cancel_user_req ~die id respawn
+ | (Die | TQueue.BeingDestroyed) -> Worker.kill proc;ignore(Worker.wait proc)
| Sys_error _ | Invalid_argument _ | End_of_file when !task_expired ->
T.on_task_cancellation_or_expiration !last_task;
ignore(Worker.wait proc);
- manage_slave ~cancel:cancel_user_req id respawn
+ manage_slave ~cancel:cancel_user_req ~die id respawn
| Sys_error _ | Invalid_argument _ | End_of_file when !task_cancelled ->
msg_warning(strbrk "The worker was cancelled.");
T.on_task_cancellation_or_expiration !last_task;
Worker.kill proc; ignore(Worker.wait proc);
- manage_slave ~cancel:cancel_user_req id respawn
+ manage_slave ~cancel:cancel_user_req ~die id respawn
| Sys_error _ | Invalid_argument _ | End_of_file ->
match T.on_slave_death !last_task with
| `Stay ->
msg_warning(strbrk "The worker process died badly.");
Worker.kill proc; ignore(Worker.wait proc);
- manage_slave ~cancel:cancel_user_req id respawn
+ manage_slave ~cancel:cancel_user_req ~die id respawn
| `Exit exit_code ->
Worker.kill proc;
let exit_status proc = match Worker.wait proc with
@@ -290,6 +296,13 @@ module Make(T : Task) = struct
let init n =
WorkersPool.init n manage_slave
(fun n -> Printf.sprintf "%s:%d" T.name n)
+ let destroy () =
+ WorkersPool.destroy ();
+ TQueue.destroy queue
+
+ let with_n_workers n f =
+ try init n; let rc = f ~join ~cancel_all in destroy (); rc
+ with e -> let e = Errors.push e in destroy (); raise e
let n_workers = WorkersPool.n_workers