diff options
Diffstat (limited to 'stm/asyncTaskQueue.ml')
| -rw-r--r-- | stm/asyncTaskQueue.ml | 100 |
1 files changed, 50 insertions, 50 deletions
diff --git a/stm/asyncTaskQueue.ml b/stm/asyncTaskQueue.ml index 25f9d7c187..768d94d305 100644 --- a/stm/asyncTaskQueue.ml +++ b/stm/asyncTaskQueue.ml @@ -1,26 +1,30 @@ (************************************************************************) -(* v * The Coq Proof Assistant / The Coq Development Team *) -(* <O___,, * INRIA - CNRS - LIX - LRI - PPS - Copyright 1999-2017 *) +(* * The Coq Proof Assistant / The Coq Development Team *) +(* v * INRIA, CNRS and contributors - Copyright 1999-2018 *) +(* <O___,, * (see CREDITS file for the list of authors) *) (* \VV/ **************************************************************) -(* // * This file is distributed under the terms of the *) -(* * GNU Lesser General Public License Version 2.1 *) +(* // * This file is distributed under the terms of the *) +(* * GNU Lesser General Public License Version 2.1 *) +(* * (see LICENSE file for the text of the license) *) (************************************************************************) open CErrors open Pp open Util -let stm_pr_err pp = Format.eprintf "%s] @[%a@]%!\n" (System.process_id ()) Pp.pp_with pp - +let stm_pr_err pp = Format.eprintf "%s] @[%a@]\n%!" (Spawned.process_id ()) Pp.pp_with pp let stm_prerr_endline s = if !Flags.debug then begin stm_pr_err (str s) end else () -type 'a worker_status = [ `Fresh | `Old of 'a ] +type cancel_switch = bool ref +let async_proofs_flags_for_workers = ref [] module type Task = sig type task type competence + type worker_status = Fresh | Old of competence + (* Marshallable *) type request type response @@ -29,15 +33,14 @@ module type Task = sig val extra_env : unit -> string array (* run by the master, on a thread *) - val request_of_task : competence worker_status -> task -> request option - val task_match : competence worker_status -> task -> bool - val use_response : - competence worker_status -> task -> response -> - [ `Stay of competence * task list | `End ] + val request_of_task : worker_status -> task -> request option + val task_match : worker_status -> task -> bool + val use_response : worker_status -> task -> response -> + [ `Stay of competence * task list | `End ] val on_marshal_error : string -> task -> unit val on_task_cancellation_or_expiration_or_slave_death : task option -> unit val forward_feedback : Feedback.feedback -> unit - + (* run by the worker *) val perform : request -> response @@ -47,9 +50,7 @@ module type Task = sig end -type expiration = bool ref - -module Make(T : Task) = struct +module Make(T : Task) () = struct exception Die type response = @@ -59,45 +60,45 @@ module Make(T : Task) = struct type request = Request of T.request type more_data = - | MoreDataUnivLevel of Univ.universe_level list + | MoreDataUnivLevel of UnivGen.universe_id list let slave_respond (Request r) = let res = T.perform r in Response res exception MarshalError of string - + let marshal_to_channel oc data = Marshal.to_channel oc data []; flush oc - + let marshal_err s = raise (MarshalError s) - + let marshal_request oc (req : request) = try marshal_to_channel oc req with Failure s | Invalid_argument s | Sys_error s -> marshal_err ("marshal_request: "^s) - + let unmarshal_request ic = try (CThread.thread_friendly_input_value ic : request) with Failure s | Invalid_argument s | Sys_error s -> marshal_err ("unmarshal_request: "^s) - + let marshal_response oc (res : response) = try marshal_to_channel oc res with Failure s | Invalid_argument s | Sys_error s -> marshal_err ("marshal_response: "^s) - + let unmarshal_response ic = try (CThread.thread_friendly_input_value ic : response) with Failure s | Invalid_argument s | Sys_error s -> marshal_err ("unmarshal_response: "^s) - + let marshal_more_data oc (res : more_data) = try marshal_to_channel oc res with Failure s | Invalid_argument s | Sys_error s -> marshal_err ("marshal_more_data: "^s) - + let unmarshal_more_data ic = try (CThread.thread_friendly_input_value ic : more_data) with Failure s | Invalid_argument s | Sys_error s -> @@ -107,24 +108,23 @@ module Make(T : Task) = struct let open Feedback in feedback ~id:Stateid.initial (WorkerStatus(id, s)) - module Worker = Spawn.Sync(struct end) + module Worker = Spawn.Sync () module Model = struct type process = Worker.process - type extra = (T.task * expiration) TQueue.t + type extra = (T.task * cancel_switch) TQueue.t let spawn id = let name = Printf.sprintf "%s:%d" !T.name id in let proc, ic, oc = let rec set_slave_opt = function - | [] -> !Flags.async_proofs_flags_for_workers @ - ["-toploop"; !T.name^"top"; - "-worker-id"; name; + | [] -> !async_proofs_flags_for_workers @ + ["-worker-id"; name; "-async-proofs-worker-priority"; - Flags.string_of_priority !Flags.async_proofs_worker_priority] - | ("-ideslave"|"-emacs"|"-emacs-U"|"-batch")::tl -> set_slave_opt tl - | ("-async-proofs" |"-toploop" |"-vio2vo" + CoqworkmgrApi.(string_of_priority !async_proofs_worker_priority)] + | ("-emacs"|"-emacs-U"|"-batch")::tl -> set_slave_opt tl + | ("-async-proofs" |"-vio2vo" |"-load-vernac-source" |"-l" |"-load-vernac-source-verbose" |"-lv" |"-compile" |"-compile-verbose" |"-async-proofs-worker-priority" |"-worker-id") :: _ :: tl -> @@ -133,14 +133,15 @@ module Make(T : Task) = struct let args = Array.of_list (set_slave_opt (List.tl (Array.to_list Sys.argv))) in let env = Array.append (T.extra_env ()) (Unix.environment ()) in - Worker.spawn ~env Sys.argv.(0) args in + let worker_name = System.get_toplevel_path ("coq" ^ !T.name) in + Worker.spawn ~env worker_name args in name, proc, CThread.prepare_in_channel_for_thread_friendly_io ic, oc let manager cpanel (id, proc, ic, oc) = let { WorkerPool.extra = queue; exit; cancelled } = cpanel in let exit () = report_status ~id "Dead"; exit () in let last_task = ref None in - let worker_age = ref `Fresh in + let worker_age = ref T.Fresh in let got_token = ref false in let giveback_exec_token () = if !got_token then (CoqworkmgrApi.giveback 1; got_token := false) in @@ -170,8 +171,7 @@ module Make(T : Task) = struct | Unix.WSIGNALED sno -> Printf.sprintf "signalled(%d)" sno | Unix.WSTOPPED sno -> Printf.sprintf "stopped(%d)" sno) in let more_univs n = - CList.init n (fun _ -> - Universes.new_univ_level (Global.current_dirpath ())) in + CList.init n (fun _ -> UnivGen.new_univ_id ()) in let rec kill_if () = if not (Worker.is_alive proc) then () @@ -213,7 +213,7 @@ module Make(T : Task) = struct | `Stay(competence, new_tasks) -> last_task := None; giveback_exec_token (); - worker_age := `Old competence; + worker_age := T.Old competence; add_tasks new_tasks in continue () @@ -236,8 +236,8 @@ module Make(T : Task) = struct type queue = { active : Pool.pool; - queue : (T.task * expiration) TQueue.t; - cleaner : Thread.t; + queue : (T.task * cancel_switch) TQueue.t; + cleaner : Thread.t option; } let create size = @@ -250,18 +250,18 @@ module Make(T : Task) = struct { active = Pool.create queue ~size; queue; - cleaner = Thread.create cleaner queue; + cleaner = if size > 0 then Some (Thread.create cleaner queue) else None; } - + let destroy { active; queue } = Pool.destroy active; TQueue.destroy queue let broadcast { queue } = TQueue.broadcast queue - let enqueue_task { queue; active } (t, _ as item) = + let enqueue_task { queue; active } t ~cancel_switch = stm_prerr_endline ("Enqueue task "^T.name_of_task t); - TQueue.push queue item + TQueue.push queue (t, cancel_switch) let cancel_worker { active } n = Pool.cancel n active @@ -297,7 +297,7 @@ module Make(T : Task) = struct let slave_handshake () = Pool.worker_handshake (Option.get !slave_ic) (Option.get !slave_oc) - let pp_pid pp = Pp.(str (System.process_id () ^ " ") ++ pp) + let pp_pid pp = Pp.(str (Spawned.process_id () ^ " ") ++ pp) let debug_with_pid = Feedback.(function | { contents = Message(Debug, loc, pp) } as fb -> @@ -310,7 +310,7 @@ module Make(T : Task) = struct Marshal.to_channel oc (RespFeedback (debug_with_pid fb)) []; flush oc in ignore (Feedback.add_feeder (fun x -> slave_feeder (Option.get !slave_oc) x)); (* We ask master to allocate universe identifiers *) - Universes.set_remote_new_univ_level (bufferize (fun () -> + UnivGen.set_remote_new_univ_id (bufferize (fun () -> marshal_response (Option.get !slave_oc) RespGetCounterNewUnivLevel; match unmarshal_more_data (Option.get !slave_ic) with | MoreDataUnivLevel l -> l)); @@ -339,14 +339,14 @@ module Make(T : Task) = struct let clear { queue; active } = assert(Pool.is_empty active); (* We allow that only if no slaves *) TQueue.clear queue - + let snapshot { queue; active } = List.map fst (TQueue.wait_until_n_are_waiting_then_snapshot (Pool.n_workers active) queue) let with_n_workers n f = - let q = create n in + let q = create n in try let rc = f q in destroy q; rc with e -> let e = CErrors.push e in destroy q; iraise e @@ -354,5 +354,5 @@ module Make(T : Task) = struct end -module MakeQueue(T : Task) = struct include Make(T) end -module MakeWorker(T : Task) = struct include Make(T) end +module MakeQueue(T : Task) () = struct include Make(T) () end +module MakeWorker(T : Task) () = struct include Make(T) () end |
