aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author gareuselesinge 2013-10-11 09:10:53 +0000
committer gareuselesinge 2013-10-11 09:10:53 +0000
commit 2af8edadfdd87c4cea63ce7d386f0304631977e9 (patch)
tree 98411b2ebf318158bd7dac126ba7679aeb1d824c
parent 0f9a7d13714f30a3c1eeee41b6a500370e5c18bb (diff)
STM: cancel slaves working on outdated jobs
I did not manage to make the slave manager use Unix.select to wait for a response from the slave for a limited time and check for cancellation. Hence the following semi-cooperative model: - The slave has a thread that sends a Tick every second while the slave is working on a request - The slave_manager will then be unblocked periodically by this tick and check for cancellation - Cancellation is, for the moment, implemented using kill. To kill a process on Windows one could bind TerminateProcess (>= WinXP) or RegisterWindowMessage + BroadcastSystemMessage (>= Win2k). See: http://msdn.microsoft.com/en-us/library/windows/desktop/ms686722%28v=vs.85%29.aspx Another option is to make the slave_manager send to the tick thread on the slave process a boolean answer to the Tick message, and the tick thread could eventually bail out. But to do that, it is way better to have a second channel, used only by this tick thread. This solution sounds very much like the one proposed for Windows, but requires more work. git-svn-id: svn+ssh://scm.gforge.inria.fr/svn/coq/trunk@16878 85f007b7-540e-0410-9357-904b9bb8a0f7
-rw-r--r-- toplevel/stm.ml 115
1 files changed, 83 insertions, 32 deletions
diff --git a/toplevel/stm.ml b/toplevel/stm.ml
index 3457afff16..838497da25 100644
--- a/toplevel/stm.ml
+++ b/toplevel/stm.ml
@@ -58,6 +58,7 @@ module Vcs_ = Vcs.Make(Stateid)
type future_proof = Entries.proof_output list Future.computation
type proof_mode = string
type depth = int
+type cancel_switch = bool ref
type branch_type =
[ `Master
| `Proof of proof_mode * depth
@@ -67,7 +68,7 @@ type fork_t = ast * Vcs_.Branch.t * Id.t list
type qed_t = {
qast : ast;
keep : vernac_qed_type;
- mutable fproof : future_proof option;
+ mutable fproof : (future_proof * cancel_switch) option;
brname : Vcs_.Branch.t;
brinfo : branch_type Vcs_.branch_info
}
@@ -423,7 +424,15 @@ end = struct (* {{{ *)
let delete_cluster_of id =
Option.iter (fun x -> vcs := delete_cluster !vcs x) (Vcs_.cluster_of !vcs id)
- let gc () = vcs := gc !vcs
+ let gc () =
+ let old_vcs = !vcs in
+ let new_vcs, erased_nodes = gc old_vcs in
+ Vcs_.NodeSet.iter (fun id ->
+ match (Vcs_aux.visit old_vcs id).step with
+ | `Qed ({ fproof = Some (_, cancel_switch) }, _) -> cancel_switch := true
+ | _ -> ())
+ erased_nodes;
+ vcs := new_vcs
module NB : sig (* Non blocking Sys.command *)
@@ -577,7 +586,7 @@ module Slaves : sig
(* (eventually) remote calls *)
val build_proof :
exn_info:(Stateid.t * Stateid.t) -> start:Stateid.t -> stop:Stateid.t ->
- future_proof
+ future_proof * cancel_switch
(* blocking function that waits for the task queue to be empty *)
val wait_all_done : unit -> unit
@@ -696,6 +705,7 @@ end = struct (* {{{ *)
| RespFeedback of Interface.feedback
| RespGetCounterFreshLocalUniv
| RespGetCounterNewUnivLevel
+ | RespTick
let pr_response = function
| RespBuiltProof _ -> "Sucess"
| RespError _ -> "Error"
@@ -704,6 +714,7 @@ end = struct (* {{{ *)
| RespFeedback _ -> assert false
| RespGetCounterFreshLocalUniv -> "GetCounterFreshLocalUniv"
| RespGetCounterNewUnivLevel -> "GetCounterNewUnivLevel"
+ | RespTick -> "Tick"
type more_data =
| MoreDataLocalUniv of Univ.universe list
@@ -711,17 +722,20 @@ end = struct (* {{{ *)
type task =
| TaskBuildProof of (Stateid.t * Stateid.t) * Stateid.t * Stateid.t *
- (Entries.proof_output list Future.assignement -> unit)
+ (Entries.proof_output list Future.assignement -> unit) * cancel_switch
let pr_task = function
- | TaskBuildProof(_,bop,eop,_) ->
+ | TaskBuildProof(_,bop,eop,_,_) ->
"TaskBuilProof("^Stateid.to_string bop^","^Stateid.to_string eop^")"
let request_of_task task =
match task with
- | TaskBuildProof (exn_info,bop,eop,_) ->
+ | TaskBuildProof (exn_info,bop,eop,_,_) ->
ReqBuildProof(exn_info,eop,
VCS.slice ~start:eop ~stop:bop ~purify:State.make_shallow)
+ let cancel_switch_of_task = function
+ | TaskBuildProof (_,_,_,_,c) -> c
+
let build_proof_here (id,valid) eop =
Future.create (fun () ->
!reach_known_state ~cache:`No eop;
@@ -754,29 +768,39 @@ end = struct (* {{{ *)
Marshal.to_channel oc data [];
flush oc
+ let marshal_err s = raise (MarshalError s)
+
let marshal_request oc (req : request) =
try marshal_to_channel oc req
- with Invalid_argument s -> raise (MarshalError ("marshal_request: "^s))
+ with Failure s | Invalid_argument s -> marshal_err ("marshal_request: "^s)
let unmarshal_request ic =
try (Marshal.from_channel ic : request)
- with Invalid_argument s -> raise (MarshalError ("unmarshal_request: "^s))
-
- let marshal_response oc (res : response) =
- try marshal_to_channel oc res
- with Invalid_argument s -> raise (MarshalError ("marshal_response: "^s))
+ with Failure s | Invalid_argument s -> marshal_err ("unmarshal_request: "^s)
+
+ (* Since cancelling is still cooperative, the slave runs a thread that
+ periodically sends a RespTick message on the same channel used by the
+ main slave thread to send back feedbacks and responses. We need mutual
+ exclusion. *)
+ let marshal_response =
+ let m = Mutex.create () in
+ fun oc (res : response) ->
+ Mutex.lock m;
+ try marshal_to_channel oc res; Mutex.unlock m
+ with Failure s | Invalid_argument s ->
+ Mutex.unlock m; marshal_err ("marshal_response: "^s)
let unmarshal_response ic =
try (Marshal.from_channel ic : response)
- with Invalid_argument s -> raise (MarshalError ("unmarshal_response: "^s))
+ with Failure s | Invalid_argument s -> marshal_err ("unmarshal_response: "^s)
let marshal_more_data oc (res : more_data) =
try marshal_to_channel oc res
- with Invalid_argument s -> raise (MarshalError ("marshal_more_data: "^s))
+ with Failure s | Invalid_argument s -> marshal_err ("marshal_more_data: "^s)
let unmarshal_more_data ic =
try (Marshal.from_channel ic : more_data)
- with Invalid_argument s -> raise (MarshalError ("unmarshal_more_data: "^s))
+ with Failure s | Invalid_argument s -> marshal_err ("unmarshal_more_data: "^s)
let queue : task TQueue.t = TQueue.create ()
@@ -785,19 +809,23 @@ end = struct (* {{{ *)
(SlavesPool.n_slaves ()) queue
let build_proof ~exn_info ~start ~stop =
+ let cancel_switch = ref false in
if SlavesPool.is_empty () then
- build_proof_here exn_info stop
+ build_proof_here exn_info stop, cancel_switch
else
let f, assign = Future.create_delegate () in
Pp.feedback (Interface.InProgress 1);
- TQueue.push queue (TaskBuildProof(exn_info,start,stop,assign));
- f
+ TQueue.push queue
+ (TaskBuildProof(exn_info,start,stop,assign,cancel_switch));
+ f, cancel_switch
exception RemoteException of std_ppcmds
let _ = Errors.register_handler (function
| RemoteException ppcmd -> ppcmd
| _ -> raise Unhandled)
+ exception KillRespawn
+
let rec manage_slave respawn =
let ic, oc, pid = respawn () in
let kill_pid =
@@ -806,31 +834,37 @@ end = struct (* {{{ *)
let last_task = ref None in
try
while true do
+ prerr_endline "waiting for a task";
let task = TQueue.pop queue in
+ prerr_endline ("got task: "^pr_task task);
last_task := Some task;
+ let cancel_switch = cancel_switch_of_task task in
try
marshal_request oc (request_of_task task);
let rec loop () =
let response = unmarshal_response ic in
match task, response with
- | TaskBuildProof(_,_,_, assign), RespBuiltProof pl ->
+ | TaskBuildProof(_,_,_, assign,_), RespBuiltProof pl ->
assign (`Val pl);
Pp.feedback (Interface.InProgress ~-1)
- | TaskBuildProof(_,_,_, assign), RespError (err_id,valid,e) ->
+ | TaskBuildProof(_,_,_, assign,_), RespError (err_id,valid,e) ->
let e = Stateid.add ~valid (RemoteException e) err_id in
assign (`Exn e);
Pp.feedback (Interface.InProgress ~-1)
| _, RespGetCounterFreshLocalUniv ->
marshal_more_data oc (MoreDataLocalUniv
(CList.init 10 (fun _ -> Univ.fresh_local_univ ())));
- loop ()
+ if !cancel_switch then raise KillRespawn else loop ()
| _, RespGetCounterNewUnivLevel ->
+ prerr_endline "-> MoreDataUnivLevel";
marshal_more_data oc (MoreDataUnivLevel
(CList.init 10 (fun _ -> Termops.new_univ_level ())));
- loop ()
+ if !cancel_switch then raise KillRespawn else loop ()
| _, RespFeedback {id = State state_id; content = msg} ->
Pp.feedback ~state_id msg;
- loop ()
+ if !cancel_switch then raise KillRespawn else loop ()
+ | _, RespTick ->
+ if !cancel_switch then raise KillRespawn else loop ()
| _, RespFeedback _ -> assert false (* Parsing in master process *)
in
loop (); last_task := None
@@ -842,10 +876,11 @@ end = struct (* {{{ *)
msg_warning(strbrk("Marshalling error: "^s^". "^
"The system state could not be sent to the slave process. "^
"Falling back to local, lazy, evaluation."));
- let TaskBuildProof (exn_info, _, stop, assign) = task in
+ let TaskBuildProof (exn_info, _, stop, assign,_) = task in
assign(`Comp(build_proof_here exn_info stop));
Pp.feedback (Interface.InProgress ~-1)
- | (Sys_error _ | Invalid_argument _ | End_of_file) as e -> raise e
+ | (Sys_error _ | Invalid_argument _ | End_of_file | KillRespawn) as e ->
+ raise e (* we pass the exception to the external handler *)
| MarshalError s ->
Printf.eprintf "Fatal marshal error: %s\n" s;
flush_all (); exit 1
@@ -855,6 +890,14 @@ end = struct (* {{{ *)
(* XXX do something sensible *)
done
with
+ | KillRespawn ->
+ Pp.feedback (Interface.InProgress ~-1);
+ !kill_pid (); (* FIXME: This does not work on Windows *)
+ kill_pid := (fun () -> ());
+ ignore(Unix.waitpid [] pid);
+ close_in ic;
+ close_out oc;
+ manage_slave respawn
| Sys_error _ | Invalid_argument _ | End_of_file
when !fallback_to_lazy_if_slave_dies ->
kill_pid := (fun () -> ());
@@ -862,7 +905,7 @@ end = struct (* {{{ *)
(match !last_task with
| Some task ->
msg_warning(strbrk "Falling back to local, lazy, evaluation.");
- let TaskBuildProof (exn_info, _, stop, assign) = task in
+ let TaskBuildProof (exn_info, _, stop, assign,_) = task in
assign(`Comp(build_proof_here exn_info stop));
Pp.feedback (Interface.InProgress ~-1);
| None -> ());
@@ -911,9 +954,17 @@ end = struct (* {{{ *)
marshal_response !slave_oc RespGetCounterFreshLocalUniv;
match unmarshal_more_data !slave_ic with
| MoreDataLocalUniv l -> l | _ -> assert false));
+ let working = ref false in
+ let _tick = Thread.create (fun n ->
+ while true do
+ Unix.sleep n;
+ if !working then marshal_response !slave_oc RespTick
+ done) 1 in
while true do
try
+ working := false;
let request = unmarshal_request !slave_ic in
+ working := true;
let response = slave_respond request in
marshal_response !slave_oc response;
with
@@ -1031,19 +1082,19 @@ let known_state ?(redefine_qed=false) ~cache id =
VCS.create_cluster nodes ~tip:id;
begin match keep, brinfo, qed.fproof with
| VtKeep, { VCS.kind = `Edit _ }, None -> assert false
- | VtKeep, { VCS.kind = `Edit _ }, Some ofp ->
+ | VtKeep, { VCS.kind = `Edit _ }, Some (ofp, cancel) ->
assert(redefine_qed = true);
VCS.create_cluster nodes ~tip:id;
- let fp =
+ let fp, cancel =
Slaves.build_proof ~exn_info:(id,eop) ~start ~stop:eop in
Future.replace ofp fp;
- qed.fproof <- Some fp
+ qed.fproof <- Some (fp, cancel)
| VtKeep, { VCS.kind = `Proof _ }, Some _ -> assert false
| VtKeep, { VCS.kind = `Proof _ }, None ->
reach ~cache:`Shallow start;
- let fp =
+ let fp, cancel =
Slaves.build_proof ~exn_info:(id,eop) ~start ~stop:eop in
- qed.fproof <- Some fp;
+ qed.fproof <- Some (fp, cancel);
let proof = Proof_global.close_future_proof fp in
reach view.next;
vernac_interp id ~proof x
@@ -1534,7 +1585,7 @@ let edit_at id =
(VCS.reachable (VCS.get_branch_pos (VCS.current_branch ()))) in
let has_failed qed_id =
match VCS.visit qed_id with
- | { step = `Qed ({ fproof = Some fp }, _) } -> Future.is_exn fp
+ | { step = `Qed ({ fproof = Some (fp,_) }, _) } -> Future.is_exn fp
| _ -> false in
let rec master_for_br root tip =
if Stateid.equal tip Stateid.initial then tip else