diff options
| author | Enrico Tassi | 2014-07-21 10:03:04 +0200 |
|---|---|---|
| committer | Enrico Tassi | 2014-08-05 18:38:28 +0200 |
| commit | 7dba9d3f3ce62246b9d8562d2818c63ba37b206e (patch) | |
| tree | fbf0e133e160a5f7ff03f8a0b5bb4d0f47b43105 /stm | |
| parent | 4e724634839726aa11534f16e4bfb95cd81232a4 (diff) | |
STM: new "par:" goal selector, like "all:" but in parallel
par: distributes the goals among a number of workers given
by -async-proofs-tac-j (defaults to 2).
Diffstat (limited to 'stm')
| -rw-r--r-- | stm/asyncTaskQueue.ml | 45 | ||||
| -rw-r--r-- | stm/asyncTaskQueue.mli | 11 | ||||
| -rw-r--r-- | stm/stm.ml | 215 | ||||
| -rw-r--r-- | stm/stm.mli | 3 | ||||
| -rw-r--r-- | stm/tQueue.ml | 19 | ||||
| -rw-r--r-- | stm/tQueue.mli | 5 | ||||
| -rw-r--r-- | stm/tacworkertop.ml | 15 | ||||
| -rw-r--r-- | stm/tacworkertop.mllib | 1 | ||||
| -rw-r--r-- | stm/vernac_classifier.ml | 16 | ||||
| -rw-r--r-- | stm/workerPool.ml | 29 | ||||
| -rw-r--r-- | stm/workerPool.mli | 6 |
11 files changed, 300 insertions, 65 deletions
diff --git a/stm/asyncTaskQueue.ml b/stm/asyncTaskQueue.ml index 65219724e4..36689e6e5f 100644 --- a/stm/asyncTaskQueue.ml +++ b/stm/asyncTaskQueue.ml @@ -26,8 +26,8 @@ module type Task = sig val extra_env : unit -> string array (* run by the master, on a thread *) - val request_of_task : task -> request option - val use_response : task -> response -> [ `Die | `Stay | `StayReset ] + val request_of_task : [ `Fresh | `Old ] -> task -> request option + val use_response : task -> response -> [ `Stay | `StayReset ] val on_marshal_error : string -> task -> unit val on_slave_death : task option -> [ `Exit of int | `Stay ] val on_task_cancellation_or_expiration : task option -> unit @@ -120,12 +120,15 @@ module Make(T : Task) = struct with Failure s | Invalid_argument s | Sys_error s -> marshal_err ("unmarshal_more_data: "^s) - let reorder_tasks cmp = TQueue.reorder queue (fun (t1,_) (t2,_) -> cmp t1 t2) + let set_order cmp = TQueue.set_order queue (fun (t1,_) (t2,_) -> cmp t1 t2) let join () = if not (WorkersPool.is_empty ()) then TQueue.wait_until_n_are_waiting_and_queue_empty (WorkersPool.n_workers ()) queue + let cancel_all () = + TQueue.clear queue; + WorkersPool.cancel_all () exception KillRespawn exception Die @@ -134,7 +137,7 @@ module Make(T : Task) = struct let report_status ?(id = !Flags.async_proofs_worker_id) s = Pp.feedback ~state_id:Stateid.initial (Feedback.SlaveStatus(id, s)) - let rec manage_slave ~cancel:cancel_user_req id respawn = + let rec manage_slave ~cancel:cancel_user_req ~die id respawn = let ic, oc, proc = let rec set_slave_opt = function | [] -> !Flags.async_proofs_flags_for_workers @ @@ -151,29 +154,29 @@ module Make(T : Task) = struct let last_task = ref None in let task_expired = ref false in let task_cancelled = ref false in + let worker_age = ref `Fresh in CThread.prepare_in_channel_for_thread_friendly_io ic; try - while true do + while not !die do prerr_endline "waiting for a task"; report_status ~id "Idle"; let task, cancel_switch = TQueue.pop queue in prerr_endline ("got task: "^T.name_of_task task); last_task := Some task; - try - let req = T.request_of_task task in + begin try + let req = T.request_of_task !worker_age task in if req = None then raise Expired; marshal_request oc (Request (Option.get req)); Worker.kill_if proc ~sec:1 (fun () -> task_expired := !cancel_switch; task_cancelled := !cancel_user_req; if !cancel_user_req then cancel_user_req := false; - !task_expired || !task_cancelled); + !task_expired || !task_cancelled || !die); let rec loop () = let response = unmarshal_response ic in match response with | Response resp -> (match T.use_response task resp with - | `Die -> raise Die | `Stay -> last_task := None; () | `StayReset -> last_task := None; raise KillRespawn) | RespGetCounterNewUnivLevel -> @@ -189,34 +192,37 @@ module Make(T : Task) = struct loop () with | Expired -> prerr_endline ("Task expired: " ^ T.name_of_task task) - | (Sys_error _ | Invalid_argument _ | End_of_file | KillRespawn) as e -> + | (Sys_error _|Invalid_argument _|End_of_file|KillRespawn) as e -> raise e (* we pass the exception to the external handler *) | MarshalError s -> T.on_marshal_error s task | e -> pr_err ("Uncaught exception in worker manager: "^ string_of_ppcmds (print e)); flush_all () - done + end; + worker_age := `Old; + done; + raise Die with | KillRespawn -> Worker.kill proc; ignore(Worker.wait proc); - manage_slave ~cancel:cancel_user_req id respawn - | Die -> Worker.kill proc; ignore(Worker.wait proc) + manage_slave ~cancel:cancel_user_req ~die id respawn + | (Die | TQueue.BeingDestroyed) -> Worker.kill proc;ignore(Worker.wait proc) | Sys_error _ | Invalid_argument _ | End_of_file when !task_expired -> T.on_task_cancellation_or_expiration !last_task; ignore(Worker.wait proc); - manage_slave ~cancel:cancel_user_req id respawn + manage_slave ~cancel:cancel_user_req ~die id respawn | Sys_error _ | Invalid_argument _ | End_of_file when !task_cancelled -> msg_warning(strbrk "The worker was cancelled."); T.on_task_cancellation_or_expiration !last_task; Worker.kill proc; ignore(Worker.wait proc); - manage_slave ~cancel:cancel_user_req id respawn + manage_slave ~cancel:cancel_user_req ~die id respawn | Sys_error _ | Invalid_argument _ | End_of_file -> match T.on_slave_death !last_task with | `Stay -> msg_warning(strbrk "The worker process died badly."); Worker.kill proc; ignore(Worker.wait proc); - manage_slave ~cancel:cancel_user_req id respawn + manage_slave ~cancel:cancel_user_req ~die id respawn | `Exit exit_code -> Worker.kill proc; let exit_status proc = match Worker.wait proc with @@ -290,6 +296,13 @@ module Make(T : Task) = struct let init n = WorkersPool.init n manage_slave (fun n -> Printf.sprintf "%s:%d" T.name n) + let destroy () = + WorkersPool.destroy (); + TQueue.destroy queue + + let with_n_workers n f = + try init n; let rc = f ~join ~cancel_all in destroy (); rc + with e -> let e = Errors.push e in destroy (); raise e let n_workers = WorkersPool.n_workers diff --git a/stm/asyncTaskQueue.mli b/stm/asyncTaskQueue.mli index e01479d30b..ddbb28457d 100644 --- a/stm/asyncTaskQueue.mli +++ b/stm/asyncTaskQueue.mli @@ -18,8 +18,8 @@ module type Task = sig val extra_env : unit -> string array (* run by the master, on a thread *) - val request_of_task : task -> request option - val use_response : task -> response -> [ `Die | `Stay | `StayReset ] + val request_of_task : [ `Fresh | `Old ] -> task -> request option + val use_response : task -> response -> [ `Stay | `StayReset ] val on_marshal_error : string -> task -> unit val on_slave_death : task option -> [ `Exit of int | `Stay ] val on_task_cancellation_or_expiration : task option -> unit @@ -40,6 +40,10 @@ module Make(T : Task) : sig (* Number of workers, 0 = lazy local *) val init : int -> unit + val destroy : unit -> unit + + val with_n_workers : + int -> (join:(unit -> unit) -> cancel_all:(unit -> unit) -> 'a) -> 'a val n_workers : unit -> int @@ -47,6 +51,7 @@ module Make(T : Task) : sig (* blocking function that waits for the task queue to be empty *) val join : unit -> unit + val cancel_all : unit -> unit (* slave process main loop *) val slave_main_loop : (unit -> unit) -> unit @@ -54,7 +59,7 @@ module Make(T : Task) : sig val cancel_worker : string -> unit - val reorder_tasks : (T.task -> T.task -> int) -> unit + val set_order : (T.task -> T.task -> int) -> unit val dump : unit -> T.task list diff --git a/stm/stm.ml b/stm/stm.ml index 0827e0bfa4..20448947f3 100644 --- a/stm/stm.ml +++ b/stm/stm.ml @@ -13,6 +13,7 @@ let prerr_endline s = if !Flags.debug then begin pr_err s end else () let (f_process_error, process_error_hook) = Hook.make () let ((f_interp : (?verbosely:bool -> ?proof:Proof_global.closed_proof -> Loc.t * Vernacexpr.vernac_expr -> unit) Hook.value), interp_hook) = Hook.make () +let with_fail, with_fail_hook = Hook.make () open Vernacexpr open Errors @@ -82,7 +83,7 @@ type branch_type = [ `Master | `Proof of proof_mode * depth | `Edit of proof_mode * Stateid.t * Stateid.t ] -type cmd_t = ast * Id.t list +type cmd_t = ast * Id.t list * bool type fork_t = ast * Vcs_.Branch.t * Vernacexpr.opacity_guarantee * Id.t list type qed_t = { qast : ast; @@ -249,7 +250,7 @@ end = struct let fname = "stm_" ^ Str.global_replace (Str.regexp " ") "_" (System.process_id ()) in let string_of_transaction = function - | Cmd (t, _) | Fork (t, _,_,_) -> + | Cmd (t, _, _) | Fork (t, _,_,_) -> (try string_of_ppcmds (pr_ast t) with _ -> "ERR") | Sideff (Some t) -> sprintf "Sideff(%s)" @@ -538,7 +539,7 @@ module State : sig ?redefine:bool -> ?cache:Summary.marshallable -> (unit -> unit) -> Stateid.t -> unit val install_cached : Stateid.t -> unit - val is_cached : Stateid.t -> bool + val is_cached : ?cache:Summary.marshallable -> Stateid.t -> bool val exn_on : Stateid.t -> ?valid:Stateid.t -> exn -> exn @@ -566,13 +567,23 @@ end = struct let () = Future.set_freeze (fun () -> in_t (freeze_global_state `No, !cur_id)) (fun t -> let s,i = out_t t in unfreeze_global_state s; cur_id := i) + + type frozen_state = state - let is_cached id = - Stateid.equal id !cur_id || - try match VCS.get_info id with - | { state = Some _ } -> true - | _ -> false - with VCS.Expired -> false + let freeze marhallable id = VCS.set_state id (freeze_global_state marhallable) + + let is_cached ?(cache=`No) id = + if Stateid.equal id !cur_id then + try match VCS.get_info id with + | { state = None } when cache = `Yes -> freeze `No id; true + | { state = None } when cache = `Shallow -> freeze `Shallow id; true + | _ -> true + with VCS.Expired -> false + else + try match VCS.get_info id with + | { state = Some _ } -> true + | _ -> false + with VCS.Expired -> false let install_cached id = if Stateid.equal id !cur_id then () else (* optimization *) @@ -582,8 +593,6 @@ end = struct | _ -> anomaly (str "unfreezing a non existing state") in unfreeze_global_state s; cur_id := id - type frozen_state = state - let get_cached id = try match VCS.get_info id with | { state = Some s } -> s @@ -594,8 +603,6 @@ end = struct try if VCS.get_state id = None then VCS.set_state id s with VCS.Expired -> () - let freeze marhallable id = VCS.set_state id (freeze_global_state marhallable) - let exn_on id ?valid e = match Stateid.get e with | Some _ -> e @@ -700,7 +707,8 @@ module Task = struct let name_of_task t = t.t_name let name_of_request r = r.r_name - let request_of_task { t_exn_info; t_start; t_stop; t_loc; t_uuid; t_name } = + let request_of_task age { t_exn_info;t_start;t_stop;t_loc;t_uuid;t_name } = + assert(age = `Fresh); try Some { r_exn_info = t_exn_info; r_stop = t_stop; @@ -764,7 +772,7 @@ module Task = struct VCS.print (); RespBuiltProof(rc,time) with - |e when Errors.noncritical e -> + | e when Errors.noncritical e -> (* This can happen if the proof is broken. The error has also been * signalled as a feedback, hence we can silently recover *) let e_error_at, e_safe_id = match Stateid.get e with @@ -877,7 +885,7 @@ end = struct spc () ++ print e) | Some (_, cur) -> match VCS.visit cur with - | { step = `Cmd ( { loc }, _) } + | { step = `Cmd ( { loc }, _, _) } | { step = `Fork ( { loc }, _, _, _) } | { step = `Qed ( { qast = { loc } }, _) } | { step = `Sideff (`Ast ( { loc }, _)) } -> @@ -938,9 +946,9 @@ end = struct let set_perspective idl = let open Stateid in let p = List.fold_right Set.add idl Set.empty in - TQueue.set_order queue (fun task1 task2 -> - let TaskBuildProof (_, a1, b1, _, _,_,_,_) = task1 in - let TaskBuildProof (_, a2, b2, _, _,_,_,_) = task2 in + TaskQueue.set_order (fun task1 task2 -> + let { Task.t_start = a1; Task.t_stop = b1 } = task1 in + let { Task.t_start = a2; Task.t_stop = b2 } = task2 in match Set.mem a1 p || Set.mem b1 p, Set.mem a2 p || Set.mem b2 p with | true, true | false, false -> 0 | true, false -> -1 @@ -983,10 +991,150 @@ end = struct let tasks = TaskQueue.dump () in prerr_endline (Printf.sprintf "dumping %d tasks\n" (List.length tasks)); List.map (function r -> { r with r_uuid = List.assoc r.r_uuid f2t_map }) - (CList.map_filter Task.request_of_task tasks) + (CList.map_filter (Task.request_of_task `Fresh) tasks) + +end + +module SubTask = struct + + let reach_known_state = ref (fun ?redefine_qed ~cache id -> ()) + let set_reach_known_state f = reach_known_state := f + + type output = Constr.constr * Evd.evar_universe_context + + let forward_feedback = forward_feedback + type task = { + t_state : Stateid.t; + t_state_fb : Stateid.t; + t_assign : output Future.assignement -> unit; + t_ast : ast; + t_goal : Goal.goal; + t_kill : unit -> unit; + t_name : string } + + type request = { + r_state : Stateid.t; + r_state_fb : Stateid.t; + r_document : VCS.vcs option; + r_ast : ast; + r_goal : Goal.goal; + r_name : string } + + type response = + | RespBuiltSubProof of output + | RespError of std_ppcmds + + let name = "tacworker" + let extra_env () = [||] + + (* run by the master, on a thread *) + let request_of_task age { t_state; t_state_fb; t_ast; t_goal; t_name } = + try Some { + r_state = t_state; + r_state_fb = t_state_fb; + r_document = + if age = `Old then None + else Some (VCS.slice ~start:t_state ~stop:t_state); + r_ast = t_ast; + r_goal = t_goal; + r_name = t_name } + with VCS.Expired -> None + + let use_response { t_assign; t_state; t_state_fb; t_kill } = function + | RespBuiltSubProof o -> t_assign (`Val o); `Stay + | RespError msg -> + let e = Stateid.add ~valid:t_state (RemoteException msg) t_state_fb in + t_assign (`Exn e); + t_kill (); + `Stay + + let on_marshal_error err { t_name } = + pr_err ("Fatal marshal error: " ^ t_name ); + flush_all (); exit 1 + + let on_slave_death task = `Stay + let on_task_cancellation_or_expiration task = () (* We shall die *) + + let perform { r_state = id; r_state_fb; r_document = vcs; r_ast; r_goal } = + Option.iter VCS.restore vcs; + try + !reach_known_state ~cache:`No id; + let t, uc = Future.purify (fun () -> + vernac_interp r_state_fb r_ast; + let _,_,_,_,sigma = Proof.proof (Proof_global.give_me_the_proof ()) in + match Goal.solution sigma r_goal with + | None -> Errors.errorlabstrm "Stm" (str "no progress") + | Some t -> + let t = Evarutil.nf_evar sigma t in + if Evarutil.is_ground_term sigma t then + t, Evd.evar_universe_context sigma + else Errors.errorlabstrm "Stm" (str"The solution is not ground")) + () in + RespBuiltSubProof (t,uc) + with e when Errors.noncritical e -> RespError (Errors.print e) + + let name_of_task { t_name } = t_name + let name_of_request { r_name } = r_name + end +module Partac = struct + + module TaskQueue = AsyncTaskQueue.Make(SubTask) + + let vernac_interp nworkers safe_id id { verbose; loc; expr = e } = + let e, etac, time, fail = + let rec find time fail = function VernacSolve(_,re,b) -> re, b, time, fail + | VernacTime [_,e] -> find true fail e + | VernacFail e -> find time true e + | _ -> errorlabstrm "Stm" (str"unsupported") in find false false e in + Hook.get with_fail fail (fun () -> + (if time then System.with_time false else (fun x -> x)) (fun () -> + ignore(TaskQueue.with_n_workers nworkers (fun ~join ~cancel_all -> + Proof_global.with_current_proof (fun _ p -> + let goals, _, _, _, _ = Proof.proof p in + let open SubTask in + let res = CList.map_i (fun i g -> + let f,assign= Future.create_delegate (State.exn_on id ~valid:safe_id) in + let t_ast = { verbose;loc;expr = VernacSolve(SelectNth i,e,etac) } in + let t_name = Goal.uid g in + TaskQueue.enqueue_task + { t_state = safe_id; t_state_fb = id; + t_assign = assign; t_ast; t_goal = g; t_name; t_kill = cancel_all } + (ref false); + Goal.uid g,f) + 1 goals in + join (); + let assign_tac : unit Proofview.tactic = + Proofview.V82.tactic (fun gl -> + let open Tacmach in + let sigma, g = project gl, sig_it gl in + let gid = Goal.uid g in + let f = + try List.assoc gid res + with Not_found -> Errors.anomaly(str"Partac: wrong focus") in + if Future.is_over f then + let pt, uc = Future.join f in + prerr_endline Pp.(string_of_ppcmds(hov 0 ( + str"g=" ++ str gid ++ spc () ++ + str"t=" ++ (Printer.pr_constr pt) ++ spc () ++ + str"uc=" ++ Evd.pr_evar_universe_context uc))); + let sigma = Goal.V82.partial_solution sigma g pt in + let sigma = Evd.merge_universe_context sigma uc in + re_sig [] sigma + else (* One has failed and cancelled the others, but not this one *) + re_sig [g] sigma) in + Proof.run_tactic (Global.env()) assign_tac p)))) ()) + + let slave_main_loop = TaskQueue.slave_main_loop + let slave_init_stdout = TaskQueue.slave_init_stdout + +end + +let tacslave_main_loop () = Partac.slave_main_loop Ephemeron.clear +let tacslave_init_stdout = Partac.slave_init_stdout + (* Runs all transactions needed to reach a state *) module Reach : sig @@ -1019,7 +1167,7 @@ let collect_proof cur hd brkind id = let rec collect last accn id = let view = VCS.visit id in match last, view.step with - | _, `Cmd (x, _) -> collect (Some (id,x)) (id::accn) view.next + | _, `Cmd (x, _, _) -> collect (Some (id,x)) (id::accn) view.next | _, `Alias _ -> `Sync (no_name,`Alias) | _, `Fork(_,_,_,_::_::_)-> `Sync (no_name,`MutualProofs) | _, `Fork(_,_,Doesn'tGuaranteeOpacity,_) -> @@ -1099,7 +1247,7 @@ let known_state ?(redefine_qed=false) ~cache id = (* traverses the dag backward from nodes being already calculated *) and reach ?(redefine_qed=false) ?(cache=cache) id = prerr_endline ("reaching: " ^ Stateid.to_string id); - if not redefine_qed && State.is_cached id then begin + if not redefine_qed && State.is_cached ~cache id then begin State.install_cached id; feedback ~state_id:id Feedback.Processed; prerr_endline ("reached (cache)") @@ -1110,9 +1258,13 @@ let known_state ?(redefine_qed=false) ~cache id = | `Alias id -> (fun () -> reach view.next; reach id ), cache - | `Cmd (x,_) -> (fun () -> + | `Cmd (x,_,false) -> (fun () -> reach view.next; vernac_interp id x ), cache + | `Cmd (x,_,true) -> (fun () -> + reach ~cache:`Shallow view.next; + Partac.vernac_interp !Flags.async_proofs_n_tacworkers view.next id x + ), cache | `Fork (x,_,_,_) -> (fun () -> reach view.next; vernac_interp id x; wall_clock_last_fork := Unix.gettimeofday () @@ -1205,6 +1357,7 @@ let known_state ?(redefine_qed=false) ~cache id = end let _ = Task.set_reach_known_state Reach.known_state +let _ = SubTask.set_reach_known_state Reach.known_state (* The backtrack module simulates the classic behavior of a linear document *) module Backtrack : sig @@ -1263,7 +1416,7 @@ end = struct if id = Stateid.initial || id = Stateid.dummy then [] else match VCS.visit id with | { step = `Fork (_,_,_,l) } -> l - | { step = `Cmd (_,l) } -> l + | { step = `Cmd (_,l,_) } -> l | _ -> [] in match f acc (id, vcs, ids) with | `Stop x -> x @@ -1550,7 +1703,7 @@ let process_transaction ?(newtip=Stateid.fresh ()) ~tty verbose c (loc, expr) = | VtQuery (true,report_id), w -> assert(Stateid.equal report_id Stateid.dummy); let id = VCS.new_node ~id:newtip () in - VCS.commit id (Cmd (x,[])); + VCS.commit id (Cmd (x,[],false)); Backtrack.record (); if w == VtNow then finish (); `Ok | VtQuery (false,_), VtLater -> anomaly(str"classifier: VtQuery + VtLater must imply part_of_script") @@ -1569,7 +1722,7 @@ let process_transaction ?(newtip=Stateid.fresh ()) ~tty verbose c (loc, expr) = | VtProofMode mode, VtNow -> let id = VCS.new_node ~id:newtip () in VCS.checkout VCS.Branch.master; - VCS.commit id (Cmd (x,[])); + VCS.commit id (Cmd (x,[],false)); VCS.propagate_sideff (Some x); List.iter (fun bn -> match VCS.get_branch bn with @@ -1585,9 +1738,9 @@ let process_transaction ?(newtip=Stateid.fresh ()) ~tty verbose c (loc, expr) = Backtrack.record (); finish (); `Ok - | VtProofStep, w -> + | VtProofStep paral, w -> let id = VCS.new_node ~id:newtip () in - VCS.commit id (Cmd (x,[])); + VCS.commit id (Cmd (x,[],paral)); Backtrack.record (); if w == VtNow then finish (); `Ok | VtQed keep, w -> let rc = merge_proof_branch ~id:newtip x keep head in @@ -1602,7 +1755,7 @@ let process_transaction ?(newtip=Stateid.fresh ()) ~tty verbose c (loc, expr) = | VtSideff l, w -> let id = VCS.new_node ~id:newtip () in VCS.checkout VCS.Branch.master; - VCS.commit id (Cmd (x,l)); + VCS.commit id (Cmd (x,l,false)); VCS.propagate_sideff (Some x); VCS.checkout_shallowest_proof_branch (); Backtrack.record (); if w == VtNow then finish (); `Ok @@ -1624,7 +1777,7 @@ let process_transaction ?(newtip=Stateid.fresh ()) ~tty verbose c (loc, expr) = VCS.branch bname (`Proof ("Classic", VCS.proof_nesting () + 1)); Proof_global.activate_proof_mode "Classic"; end else begin - VCS.commit id (Cmd (x,[])); + VCS.commit id (Cmd (x,[],false)); VCS.propagate_sideff (Some x); VCS.checkout_shallowest_proof_branch (); end in @@ -1848,7 +2001,7 @@ let get_script prf = | `Sideff (`Ast (x,_)) -> find ((x.expr, (VCS.get_info id).n_goals)::acc) view.next | `Sideff (`Id id) -> find acc id - | `Cmd (x,_) -> find ((x.expr, (VCS.get_info id).n_goals)::acc) view.next + | `Cmd (x,_,_) -> find ((x.expr, (VCS.get_info id).n_goals)::acc) view.next | `Alias id -> find acc id | `Fork _ -> find acc view.next in diff --git a/stm/stm.mli b/stm/stm.mli index b6450e6ac2..28b165e4bb 100644 --- a/stm/stm.mli +++ b/stm/stm.mli @@ -63,6 +63,8 @@ val get_current_state : unit -> Stateid.t val init : unit -> unit val slave_main_loop : unit -> unit val slave_init_stdout : unit -> unit +val tacslave_main_loop : unit -> unit +val tacslave_init_stdout : unit -> unit (* Filename *) val set_compilation_hints : string -> unit @@ -87,3 +89,4 @@ val show_script : ?proof:Proof_global.closed_proof -> unit -> unit val process_error_hook : (exn -> exn) Hook.t val interp_hook : (?verbosely:bool -> ?proof:Proof_global.closed_proof -> Loc.t * Vernacexpr.vernac_expr -> unit) Hook.t +val with_fail_hook : (bool -> (unit -> unit) -> unit) Hook.t diff --git a/stm/tQueue.ml b/stm/tQueue.ml index e4b9d382d8..9d3553c36c 100644 --- a/stm/tQueue.ml +++ b/stm/tQueue.ml @@ -42,23 +42,29 @@ type 'a t = { cond : Condition.t; mutable nwaiting : int; cond_waiting : Condition.t; + mutable release : bool; } +exception BeingDestroyed + let create () = { queue = PriorityQueue.create (); lock = Mutex.create (); cond = Condition.create (); nwaiting = 0; cond_waiting = Condition.create (); + release = false; } let pop ({ queue = q; lock = m; cond = c; cond_waiting = cn } as tq) = + if tq.release then raise BeingDestroyed; Mutex.lock m; while PriorityQueue.is_empty q do tq.nwaiting <- tq.nwaiting + 1; Condition.signal cn; Condition.wait c m; tq.nwaiting <- tq.nwaiting - 1; + if tq.release then (Mutex.unlock m; raise BeingDestroyed) done; let x = PriorityQueue.pop q in Condition.signal c; @@ -66,7 +72,9 @@ let pop ({ queue = q; lock = m; cond = c; cond_waiting = cn } as tq) = Mutex.unlock m; x -let push { queue = q; lock = m; cond = c } x = +let push { queue = q; lock = m; cond = c; release } x = + if release then Errors.anomaly(Pp.str + "TQueue.push while being destroyed! Only 1 producer/destroyer allowed"); Mutex.lock m; PriorityQueue.push x q; Condition.signal c; @@ -79,6 +87,15 @@ let clear { queue = q; lock = m; cond = c } = let is_empty { queue = q } = PriorityQueue.is_empty q +let destroy tq = + tq.release <- true; + while tq.nwaiting > 0 do + Mutex.lock tq.lock; + Condition.signal tq.cond; + Mutex.unlock tq.lock; + done; + tq.release <- false + let wait_until_n_are_waiting_and_queue_empty j tq = Mutex.lock tq.lock; while not (PriorityQueue.is_empty tq.queue) || tq.nwaiting < j do diff --git a/stm/tQueue.mli b/stm/tQueue.mli index 23063262b3..c006b6a867 100644 --- a/stm/tQueue.mli +++ b/stm/tQueue.mli @@ -17,3 +17,8 @@ val wait_until_n_are_waiting_and_queue_empty : int -> 'a t -> unit val dump : 'a t -> 'a list val clear : 'a t -> unit val is_empty : 'a t -> bool + +exception BeingDestroyed +(* Threads blocked in pop can get this exception if the queue is being + * destroyed *) +val destroy : 'a t -> unit diff --git a/stm/tacworkertop.ml b/stm/tacworkertop.ml new file mode 100644 index 0000000000..8a582a6891 --- /dev/null +++ b/stm/tacworkertop.ml @@ -0,0 +1,15 @@ +(************************************************************************) +(* v * The Coq Proof Assistant / The Coq Development Team *) +(* <O___,, * INRIA - CNRS - LIX - LRI - PPS - Copyright 1999-2012 *) +(* \VV/ **************************************************************) +(* // * This file is distributed under the terms of the *) +(* * GNU Lesser General Public License Version 2.1 *) +(************************************************************************) + +let () = Coqtop.toploop_init := (fun args -> + Flags.make_silent true; + Stm.tacslave_init_stdout (); + args) + +let () = Coqtop.toploop_run := Stm.tacslave_main_loop + diff --git a/stm/tacworkertop.mllib b/stm/tacworkertop.mllib new file mode 100644 index 0000000000..db38fde279 --- /dev/null +++ b/stm/tacworkertop.mllib @@ -0,0 +1 @@ +Tacworkertop diff --git a/stm/vernac_classifier.ml b/stm/vernac_classifier.ml index 2b8c36ca96..5bbd857d9c 100644 --- a/stm/vernac_classifier.ml +++ b/stm/vernac_classifier.ml @@ -18,7 +18,8 @@ let string_of_vernac_type = function | VtSideff _ -> "Sideff" | VtQed VtKeep -> "Qed(keep)" | VtQed VtDrop -> "Qed(drop)" - | VtProofStep -> "ProofStep" + | VtProofStep false -> "ProofStep" + | VtProofStep true -> "ProofStep (parallel)" | VtProofMode s -> "ProofMode " ^ s | VtQuery (b,_) -> "Query" ^ string_of_in_script b | VtStm ((VtFinish|VtJoinDocument|VtObserve _|VtPrintDag|VtWait), b) -> @@ -80,9 +81,9 @@ let rec classify_vernac e = | VernacTime e -> classify_vernac_list e | VernacFail e -> (* Fail Qed or Fail Lemma must not join/fork the DAG *) (match classify_vernac e with - | ( VtQuery _ | VtProofStep | VtSideff _ + | ( VtQuery _ | VtProofStep _ | VtSideff _ | VtStm _ | VtProofMode _ ), _ as x -> x - | VtQed _, _ -> VtProofStep, VtNow + | VtQed _, _ -> VtProofStep false, VtNow | (VtStartProof _ | VtUnknown), _ -> VtUnknown, VtNow) (* Qed *) | VernacEndProof Admitted | VernacAbort _ -> VtQed VtDrop, VtLater @@ -91,6 +92,7 @@ let rec classify_vernac e = | VernacShow _ | VernacPrint _ | VernacSearch _ | VernacLocate _ | VernacCheckMayEval _ -> VtQuery (true,Stateid.dummy), VtLater (* ProofStep *) + | VernacSolve (SelectAllParallel,_,_) -> VtProofStep true, VtLater | VernacProof _ | VernacBullet _ | VernacFocus _ | VernacUnfocus @@ -98,7 +100,7 @@ let rec classify_vernac e = | VernacSolve _ | VernacCheckGuard | VernacUnfocused - | VernacSolveExistential _ -> VtProofStep, VtLater + | VernacSolveExistential _ -> VtProofStep false, VtLater (* Options changing parser *) | VernacUnsetOption (["Default";"Proof";"Using"]) | VernacSetOption (["Default";"Proof";"Using"],_) -> VtSideff [], VtNow @@ -117,13 +119,15 @@ let rec classify_vernac e = let ids, open_proof = List.fold_left (fun (l,b) (((_,id),_,_,_,p),_) -> id::l, b || p = None) ([],false) l in - if open_proof then VtStartProof ("Classic",GuaranteesOpacity,ids), VtLater + if open_proof + then VtStartProof ("Classic",GuaranteesOpacity,ids), VtLater else VtSideff ids, VtLater | VernacCoFixpoint (_,l) -> let ids, open_proof = List.fold_left (fun (l,b) (((_,id),_,_,p),_) -> id::l, b || p = None) ([],false) l in - if open_proof then VtStartProof ("Classic",GuaranteesOpacity,ids), VtLater + if open_proof + then VtStartProof ("Classic",GuaranteesOpacity,ids), VtLater else VtSideff ids, VtLater (* Sideff: apply to all open branches. usually run on master only *) | VernacAssumption (_,_,l) -> diff --git a/stm/workerPool.ml b/stm/workerPool.ml index 593240ad4f..2e192cdec5 100644 --- a/stm/workerPool.ml +++ b/stm/workerPool.ml @@ -18,7 +18,12 @@ type spawn = args:string array -> env:string array -> unit -> in_channel * out_channel * Worker.process -let slave_managers = ref None +type worker = { + name : worker_id; + cancel : bool ref; + die : bool ref; + manager : Thread.t } +let slave_managers : worker array option ref = ref None let n_workers () = match !slave_managers with | None -> 0 @@ -50,16 +55,26 @@ let init ~size:n ~manager:manage_slave mk_name = (Array.init n (fun x -> let name = mk_name x in let cancel = ref false in - name, cancel, Thread.create (manage_slave ~cancel name) (respawn name))) + let die = ref false in + let manager = + Thread.create (manage_slave ~cancel ~die name) (respawn name) in + { name; cancel; die; manager })) -let cancel n = +let foreach f = match !slave_managers with | None -> () | Some a -> - for i = 0 to Array.length a - 1 do - let name, switch, _ = a.(i) in - if n = name then switch := true - done + for i = 0 to Array.length a - 1 do f a.(i) done + +let cancel n = foreach (fun { name; cancel } -> if n = name then cancel := true) + +let cancel_all () = foreach (fun { cancel } -> cancel := true) + +let kill_all () = foreach (fun { die } -> die := true) + +let destroy () = + kill_all (); + slave_managers := None let worker_handshake slave_ic slave_oc = try diff --git a/stm/workerPool.mli b/stm/workerPool.mli index d55b35c282..4e5512a4be 100644 --- a/stm/workerPool.mli +++ b/stm/workerPool.mli @@ -19,11 +19,15 @@ type spawn = in_channel * out_channel * Worker.process val init : - size:int -> manager:(cancel:bool ref -> worker_id -> spawn -> unit) -> + size:int -> + manager:(cancel:bool ref -> die:bool ref -> worker_id -> spawn -> unit) -> (int -> worker_id) -> unit +val destroy : unit -> unit + val is_empty : unit -> bool val n_workers : unit -> int val cancel : worker_id -> unit +val cancel_all : unit -> unit (* The worker should call this function *) val worker_handshake : in_channel -> out_channel -> unit |
