diff options
| author | gareuselesinge | 2013-10-11 09:10:53 +0000 |
|---|---|---|
| committer | gareuselesinge | 2013-10-11 09:10:53 +0000 |
| commit | 2af8edadfdd87c4cea63ce7d386f0304631977e9 (patch) | |
| tree | 98411b2ebf318158bd7dac126ba7679aeb1d824c | |
| parent | 0f9a7d13714f30a3c1eeee41b6a500370e5c18bb (diff) | |
STM: cancel slaves working on outdated jobs
I did not manage to make the slave manager use Unix.select to wait for a
response from the slave for a limited time and check for cancellation.
Hence the following semi-cooperative model:
- The slave has a thread that sends a Tick every second when the thread
is working
- The slave_manager will then be unblocked periodically by this tick and
check for cancellation
- Cancellation is, for the moment, implemented using kill.
To kill a process on windows one could bind TerminateProcess (>= WinXP)
or RegisterWindowMessage + BroadcastSystemMessage (>= Win2k).
See: http://msdn.microsoft.com/en-us/library/windows/desktop/ms686722%28v=vs.85%29.aspx
Another option is to make the slave_manager send to the tick thread on
the slave process a boolean answer to the Tick message, and the tick
thread could eventually bail out. But to do that, it is way better
to have a second channel, used only by this tick thread. This solution
sound very like the one proposed for windows, but requires more work.
git-svn-id: svn+ssh://scm.gforge.inria.fr/svn/coq/trunk@16878 85f007b7-540e-0410-9357-904b9bb8a0f7
| -rw-r--r-- | toplevel/stm.ml | 115 |
1 files changed, 83 insertions, 32 deletions
diff --git a/toplevel/stm.ml b/toplevel/stm.ml index 3457afff16..838497da25 100644 --- a/toplevel/stm.ml +++ b/toplevel/stm.ml @@ -58,6 +58,7 @@ module Vcs_ = Vcs.Make(Stateid) type future_proof = Entries.proof_output list Future.computation type proof_mode = string type depth = int +type cancel_switch = bool ref type branch_type = [ `Master | `Proof of proof_mode * depth @@ -67,7 +68,7 @@ type fork_t = ast * Vcs_.Branch.t * Id.t list type qed_t = { qast : ast; keep : vernac_qed_type; - mutable fproof : future_proof option; + mutable fproof : (future_proof * cancel_switch) option; brname : Vcs_.Branch.t; brinfo : branch_type Vcs_.branch_info } @@ -423,7 +424,15 @@ end = struct (* {{{ *) let delete_cluster_of id = Option.iter (fun x -> vcs := delete_cluster !vcs x) (Vcs_.cluster_of !vcs id) - let gc () = vcs := gc !vcs + let gc () = + let old_vcs = !vcs in + let new_vcs, erased_nodes = gc old_vcs in + Vcs_.NodeSet.iter (fun id -> + match (Vcs_aux.visit old_vcs id).step with + | `Qed ({ fproof = Some (_, cancel_switch) }, _) -> cancel_switch := true + | _ -> ()) + erased_nodes; + vcs := new_vcs module NB : sig (* Non blocking Sys.command *) @@ -577,7 +586,7 @@ module Slaves : sig (* (eventually) remote calls *) val build_proof : exn_info:(Stateid.t * Stateid.t) -> start:Stateid.t -> stop:Stateid.t -> - future_proof + future_proof * cancel_switch (* blocking function that waits for the task queue to be empty *) val wait_all_done : unit -> unit @@ -696,6 +705,7 @@ end = struct (* {{{ *) | RespFeedback of Interface.feedback | RespGetCounterFreshLocalUniv | RespGetCounterNewUnivLevel + | RespTick let pr_response = function | RespBuiltProof _ -> "Sucess" | RespError _ -> "Error" @@ -704,6 +714,7 @@ end = struct (* {{{ *) | RespFeedback _ -> assert false | RespGetCounterFreshLocalUniv -> "GetCounterFreshLocalUniv" | RespGetCounterNewUnivLevel -> "GetCounterNewUnivLevel" + | RespTick -> "Tick" type more_data = | MoreDataLocalUniv of Univ.universe list @@ -711,17 +722,20 @@ end = struct (* {{{ *) type task = | TaskBuildProof of (Stateid.t * Stateid.t) * Stateid.t * Stateid.t * - (Entries.proof_output list Future.assignement -> unit) + (Entries.proof_output list Future.assignement -> unit) * cancel_switch let pr_task = function - | TaskBuildProof(_,bop,eop,_) -> + | TaskBuildProof(_,bop,eop,_,_) -> "TaskBuilProof("^Stateid.to_string bop^","^Stateid.to_string eop^")" let request_of_task task = match task with - | TaskBuildProof (exn_info,bop,eop,_) -> + | TaskBuildProof (exn_info,bop,eop,_,_) -> ReqBuildProof(exn_info,eop, VCS.slice ~start:eop ~stop:bop ~purify:State.make_shallow) + let cancel_switch_of_task = function + | TaskBuildProof (_,_,_,_,c) -> c + let build_proof_here (id,valid) eop = Future.create (fun () -> !reach_known_state ~cache:`No eop; @@ -754,29 +768,39 @@ end = struct (* {{{ *) Marshal.to_channel oc data []; flush oc + let marshal_err s = raise (MarshalError s) + let marshal_request oc (req : request) = try marshal_to_channel oc req - with Invalid_argument s -> raise (MarshalError ("marshal_request: "^s)) + with Failure s | Invalid_argument s -> marshal_err ("marshal_request: "^s) let unmarshal_request ic = try (Marshal.from_channel ic : request) - with Invalid_argument s -> raise (MarshalError ("unmarshal_request: "^s)) - - let marshal_response oc (res : response) = - try marshal_to_channel oc res - with Invalid_argument s -> raise (MarshalError ("marshal_response: "^s)) + with Failure s | Invalid_argument s -> marshal_err ("unmarshal_request: "^s) + + (* Since cancelling is still cooperative, the slave runs a thread that + periodically sends a RespTick message on the same channel used by the + main slave thread to send back feedbacks and responses. We need mutual + exclusion. *) + let marshal_response = + let m = Mutex.create () in + fun oc (res : response) -> + Mutex.lock m; + try marshal_to_channel oc res; Mutex.unlock m + with Failure s | Invalid_argument s -> + Mutex.unlock m; marshal_err ("marshal_response: "^s) let unmarshal_response ic = try (Marshal.from_channel ic : response) - with Invalid_argument s -> raise (MarshalError ("unmarshal_response: "^s)) + with Failure s | Invalid_argument s -> marshal_err ("unmarshal_response: "^s) let marshal_more_data oc (res : more_data) = try marshal_to_channel oc res - with Invalid_argument s -> raise (MarshalError ("marshal_more_data: "^s)) + with Failure s | Invalid_argument s -> marshal_err ("marshal_more_data: "^s) let unmarshal_more_data ic = try (Marshal.from_channel ic : more_data) - with Invalid_argument s -> raise (MarshalError ("unmarshal_more_data: "^s)) + with Failure s | Invalid_argument s -> marshal_err ("unmarshal_more_data: "^s) let queue : task TQueue.t = TQueue.create () @@ -785,19 +809,23 @@ end = struct (* {{{ *) (SlavesPool.n_slaves ()) queue let build_proof ~exn_info ~start ~stop = + let cancel_switch = ref false in if SlavesPool.is_empty () then - build_proof_here exn_info stop + build_proof_here exn_info stop, cancel_switch else let f, assign = Future.create_delegate () in Pp.feedback (Interface.InProgress 1); - TQueue.push queue (TaskBuildProof(exn_info,start,stop,assign)); - f + TQueue.push queue + (TaskBuildProof(exn_info,start,stop,assign,cancel_switch)); + f, cancel_switch exception RemoteException of std_ppcmds let _ = Errors.register_handler (function | RemoteException ppcmd -> ppcmd | _ -> raise Unhandled) + exception KillRespawn + let rec manage_slave respawn = let ic, oc, pid = respawn () in let kill_pid = @@ -806,31 +834,37 @@ end = struct (* {{{ *) let last_task = ref None in try while true do + prerr_endline "waiting for a task"; let task = TQueue.pop queue in + prerr_endline ("got task: "^pr_task task); last_task := Some task; + let cancel_switch = cancel_switch_of_task task in try marshal_request oc (request_of_task task); let rec loop () = let response = unmarshal_response ic in match task, response with - | TaskBuildProof(_,_,_, assign), RespBuiltProof pl -> + | TaskBuildProof(_,_,_, assign,_), RespBuiltProof pl -> assign (`Val pl); Pp.feedback (Interface.InProgress ~-1) - | TaskBuildProof(_,_,_, assign), RespError (err_id,valid,e) -> + | TaskBuildProof(_,_,_, assign,_), RespError (err_id,valid,e) -> let e = Stateid.add ~valid (RemoteException e) err_id in assign (`Exn e); Pp.feedback (Interface.InProgress ~-1) | _, RespGetCounterFreshLocalUniv -> marshal_more_data oc (MoreDataLocalUniv (CList.init 10 (fun _ -> Univ.fresh_local_univ ()))); - loop () + if !cancel_switch then raise KillRespawn else loop () | _, RespGetCounterNewUnivLevel -> + prerr_endline "-> MoreDataUnivLevel"; marshal_more_data oc (MoreDataUnivLevel (CList.init 10 (fun _ -> Termops.new_univ_level ()))); - loop () + if !cancel_switch then raise KillRespawn else loop () | _, RespFeedback {id = State state_id; content = msg} -> Pp.feedback ~state_id msg; - loop () + if !cancel_switch then raise KillRespawn else loop () + | _, RespTick -> + if !cancel_switch then raise KillRespawn else loop () | _, RespFeedback _ -> assert false (* Parsing in master process *) in loop (); last_task := None @@ -842,10 +876,11 @@ end = struct (* {{{ *) msg_warning(strbrk("Marshalling error: "^s^". "^ "The system state could not be sent to the slave process. "^ "Falling back to local, lazy, evaluation.")); - let TaskBuildProof (exn_info, _, stop, assign) = task in + let TaskBuildProof (exn_info, _, stop, assign,_) = task in assign(`Comp(build_proof_here exn_info stop)); Pp.feedback (Interface.InProgress ~-1) - | (Sys_error _ | Invalid_argument _ | End_of_file) as e -> raise e + | (Sys_error _ | Invalid_argument _ | End_of_file | KillRespawn) as e -> + raise e (* we pass the exception to the external handler *) | MarshalError s -> Printf.eprintf "Fatal marshal error: %s\n" s; flush_all (); exit 1 @@ -855,6 +890,14 @@ end = struct (* {{{ *) (* XXX do something sensible *) done with + | KillRespawn -> + Pp.feedback (Interface.InProgress ~-1); + !kill_pid (); (* FIXME: This does not work on Windows *) + kill_pid := (fun () -> ()); + ignore(Unix.waitpid [] pid); + close_in ic; + close_out oc; + manage_slave respawn | Sys_error _ | Invalid_argument _ | End_of_file when !fallback_to_lazy_if_slave_dies -> kill_pid := (fun () -> ()); @@ -862,7 +905,7 @@ end = struct (* {{{ *) (match !last_task with | Some task -> msg_warning(strbrk "Falling back to local, lazy, evaluation."); - let TaskBuildProof (exn_info, _, stop, assign) = task in + let TaskBuildProof (exn_info, _, stop, assign,_) = task in assign(`Comp(build_proof_here exn_info stop)); Pp.feedback (Interface.InProgress ~-1); | None -> ()); @@ -911,9 +954,17 @@ end = struct (* {{{ *) marshal_response !slave_oc RespGetCounterFreshLocalUniv; match unmarshal_more_data !slave_ic with | MoreDataLocalUniv l -> l | _ -> assert false)); + let working = ref false in + let _tick = Thread.create (fun n -> + while true do + Unix.sleep n; + if !working then marshal_response !slave_oc RespTick + done) 1 in while true do try + working := false; let request = unmarshal_request !slave_ic in + working := true; let response = slave_respond request in marshal_response !slave_oc response; with @@ -1031,19 +1082,19 @@ let known_state ?(redefine_qed=false) ~cache id = VCS.create_cluster nodes ~tip:id; begin match keep, brinfo, qed.fproof with | VtKeep, { VCS.kind = `Edit _ }, None -> assert false - | VtKeep, { VCS.kind = `Edit _ }, Some ofp -> + | VtKeep, { VCS.kind = `Edit _ }, Some (ofp, cancel) -> assert(redefine_qed = true); VCS.create_cluster nodes ~tip:id; - let fp = + let fp, cancel = Slaves.build_proof ~exn_info:(id,eop) ~start ~stop:eop in Future.replace ofp fp; - qed.fproof <- Some fp + qed.fproof <- Some (fp, cancel) | VtKeep, { VCS.kind = `Proof _ }, Some _ -> assert false | VtKeep, { VCS.kind = `Proof _ }, None -> reach ~cache:`Shallow start; - let fp = + let fp, cancel = Slaves.build_proof ~exn_info:(id,eop) ~start ~stop:eop in - qed.fproof <- Some fp; + qed.fproof <- Some (fp, cancel); let proof = Proof_global.close_future_proof fp in reach view.next; vernac_interp id ~proof x @@ -1534,7 +1585,7 @@ let edit_at id = (VCS.reachable (VCS.get_branch_pos (VCS.current_branch ()))) in let has_failed qed_id = match VCS.visit qed_id with - | { step = `Qed ({ fproof = Some fp }, _) } -> Future.is_exn fp + | { step = `Qed ({ fproof = Some (fp,_) }, _) } -> Future.is_exn fp | _ -> false in let rec master_for_br root tip = if Stateid.equal tip Stateid.initial then tip else |
