diff options
| author | Enrico Tassi | 2014-07-21 10:03:04 +0200 |
|---|---|---|
| committer | Enrico Tassi | 2014-08-05 18:38:28 +0200 |
| commit | 7dba9d3f3ce62246b9d8562d2818c63ba37b206e (patch) | |
| tree | fbf0e133e160a5f7ff03f8a0b5bb4d0f47b43105 /stm/asyncTaskQueue.ml | |
| parent | 4e724634839726aa11534f16e4bfb95cd81232a4 (diff) | |
STM: new "par:" goal selector, like "all:" but in parallel
par: distributes the goals among a number of workers given
by -async-proofs-tac-j (defaults to 2).
Diffstat (limited to 'stm/asyncTaskQueue.ml')
| -rw-r--r-- | stm/asyncTaskQueue.ml | 45 |
1 files changed, 29 insertions, 16 deletions
diff --git a/stm/asyncTaskQueue.ml b/stm/asyncTaskQueue.ml index 65219724e4..36689e6e5f 100644 --- a/stm/asyncTaskQueue.ml +++ b/stm/asyncTaskQueue.ml @@ -26,8 +26,8 @@ module type Task = sig val extra_env : unit -> string array (* run by the master, on a thread *) - val request_of_task : task -> request option - val use_response : task -> response -> [ `Die | `Stay | `StayReset ] + val request_of_task : [ `Fresh | `Old ] -> task -> request option + val use_response : task -> response -> [ `Stay | `StayReset ] val on_marshal_error : string -> task -> unit val on_slave_death : task option -> [ `Exit of int | `Stay ] val on_task_cancellation_or_expiration : task option -> unit @@ -120,12 +120,15 @@ module Make(T : Task) = struct with Failure s | Invalid_argument s | Sys_error s -> marshal_err ("unmarshal_more_data: "^s) - let reorder_tasks cmp = TQueue.reorder queue (fun (t1,_) (t2,_) -> cmp t1 t2) + let set_order cmp = TQueue.set_order queue (fun (t1,_) (t2,_) -> cmp t1 t2) let join () = if not (WorkersPool.is_empty ()) then TQueue.wait_until_n_are_waiting_and_queue_empty (WorkersPool.n_workers ()) queue + let cancel_all () = + TQueue.clear queue; + WorkersPool.cancel_all () exception KillRespawn exception Die @@ -134,7 +137,7 @@ module Make(T : Task) = struct let report_status ?(id = !Flags.async_proofs_worker_id) s = Pp.feedback ~state_id:Stateid.initial (Feedback.SlaveStatus(id, s)) - let rec manage_slave ~cancel:cancel_user_req id respawn = + let rec manage_slave ~cancel:cancel_user_req ~die id respawn = let ic, oc, proc = let rec set_slave_opt = function | [] -> !Flags.async_proofs_flags_for_workers @ @@ -151,29 +154,29 @@ module Make(T : Task) = struct let last_task = ref None in let task_expired = ref false in let task_cancelled = ref false in + let worker_age = ref `Fresh in CThread.prepare_in_channel_for_thread_friendly_io ic; try - while true do + while not !die do prerr_endline "waiting for a task"; report_status ~id "Idle"; let task, cancel_switch = TQueue.pop queue in prerr_endline ("got task: "^T.name_of_task task); last_task := Some task; - try - let req = T.request_of_task task in + begin try + let req = T.request_of_task !worker_age task in if req = None then raise Expired; marshal_request oc (Request (Option.get req)); Worker.kill_if proc ~sec:1 (fun () -> task_expired := !cancel_switch; task_cancelled := !cancel_user_req; if !cancel_user_req then cancel_user_req := false; - !task_expired || !task_cancelled); + !task_expired || !task_cancelled || !die); let rec loop () = let response = unmarshal_response ic in match response with | Response resp -> (match T.use_response task resp with - | `Die -> raise Die | `Stay -> last_task := None; () | `StayReset -> last_task := None; raise KillRespawn) | RespGetCounterNewUnivLevel -> @@ -189,34 +192,37 @@ module Make(T : Task) = struct loop () with | Expired -> prerr_endline ("Task expired: " ^ T.name_of_task task) - | (Sys_error _ | Invalid_argument _ | End_of_file | KillRespawn) as e -> + | (Sys_error _|Invalid_argument _|End_of_file|KillRespawn) as e -> raise e (* we pass the exception to the external handler *) | MarshalError s -> T.on_marshal_error s task | e -> pr_err ("Uncaught exception in worker manager: "^ string_of_ppcmds (print e)); flush_all () - done + end; + worker_age := `Old; + done; + raise Die with | KillRespawn -> Worker.kill proc; ignore(Worker.wait proc); - manage_slave ~cancel:cancel_user_req id respawn - | Die -> Worker.kill proc; ignore(Worker.wait proc) + manage_slave ~cancel:cancel_user_req ~die id respawn + | (Die | TQueue.BeingDestroyed) -> Worker.kill proc;ignore(Worker.wait proc) | Sys_error _ | Invalid_argument _ | End_of_file when !task_expired -> T.on_task_cancellation_or_expiration !last_task; ignore(Worker.wait proc); - manage_slave ~cancel:cancel_user_req id respawn + manage_slave ~cancel:cancel_user_req ~die id respawn | Sys_error _ | Invalid_argument _ | End_of_file when !task_cancelled -> msg_warning(strbrk "The worker was cancelled."); T.on_task_cancellation_or_expiration !last_task; Worker.kill proc; ignore(Worker.wait proc); - manage_slave ~cancel:cancel_user_req id respawn + manage_slave ~cancel:cancel_user_req ~die id respawn | Sys_error _ | Invalid_argument _ | End_of_file -> match T.on_slave_death !last_task with | `Stay -> msg_warning(strbrk "The worker process died badly."); Worker.kill proc; ignore(Worker.wait proc); - manage_slave ~cancel:cancel_user_req id respawn + manage_slave ~cancel:cancel_user_req ~die id respawn | `Exit exit_code -> Worker.kill proc; let exit_status proc = match Worker.wait proc with @@ -290,6 +296,13 @@ module Make(T : Task) = struct let init n = WorkersPool.init n manage_slave (fun n -> Printf.sprintf "%s:%d" T.name n) + let destroy () = + WorkersPool.destroy (); + TQueue.destroy queue + + let with_n_workers n f = + try init n; let rc = f ~join ~cancel_all in destroy (); rc + with e -> let e = Errors.push e in destroy (); raise e let n_workers = WorkersPool.n_workers |
