From cf6b12cb3a88fb3af6a7b3e91d17db8b06d23c81 Mon Sep 17 00:00:00 2001 From: Enrico Tassi Date: Mon, 1 Sep 2014 14:54:49 +0200 Subject: coqworkmgr --- stm/asyncTaskQueue.ml | 23 +++++++-- stm/coqworkmgrApi.ml | 140 ++++++++++++++++++++++++++++++++++++++++++++++++++ stm/coqworkmgrApi.mli | 44 ++++++++++++++++ stm/stm.mllib | 1 + stm/stmworkertop.ml | 1 + stm/tacworkertop.ml | 1 + 6 files changed, 206 insertions(+), 4 deletions(-) create mode 100644 stm/coqworkmgrApi.ml create mode 100644 stm/coqworkmgrApi.mli (limited to 'stm') diff --git a/stm/asyncTaskQueue.ml b/stm/asyncTaskQueue.ml index 5de80bbfc2..cb03459cc5 100644 --- a/stm/asyncTaskQueue.ml +++ b/stm/asyncTaskQueue.ml @@ -142,10 +142,14 @@ module Make(T : Task) = struct let rec set_slave_opt = function | [] -> !Flags.async_proofs_flags_for_workers @ ["-toploop"; T.name^"top"; - "-worker-id"; id] + "-worker-id"; id; + "-async-proofs-worker-priority"; + Flags.string_of_priority !Flags.async_proofs_worker_priority] | ("-ideslave"|"-emacs"|"-emacs-U"|"-batch")::tl -> set_slave_opt tl | ("-async-proofs" |"-toploop" |"-vi2vo" |"-compile" - | "-load-vernac-source" | "-compile-verbose")::_::tl -> set_slave_opt tl + |"-load-vernac-source" |"-compile-verbose" + |"-async-proofs-worker-priority" |"-worker-id") :: _ :: tl -> + set_slave_opt tl | x::tl -> x :: set_slave_opt tl in let args = Array.of_list (set_slave_opt (List.tl (Array.to_list Sys.argv))) in @@ -155,6 +159,9 @@ module Make(T : Task) = struct let task_expired = ref false in let task_cancelled = ref false in let worker_age = ref `Fresh in + let got_token = ref false in + let giveback_token () = + if !got_token then (CoqworkmgrApi.giveback 1; got_token := false) in CThread.prepare_in_channel_for_thread_friendly_io ic; try while not !die do @@ -166,6 +173,8 @@ module Make(T : Task) = struct begin try let req = T.request_of_task !worker_age task in if req = None then raise Expired; + ignore(CoqworkmgrApi.get 1); got_token := true; + prerr_endline ("got execution token"); marshal_request oc (Request (Option.get req)); Worker.kill_if proc ~sec:1 (fun () -> task_expired := !cancel_switch; @@ -177,7 +186,7 @@ module Make(T : Task) = struct match response with | Response resp -> (match T.use_response task resp with - | `Stay -> last_task := None; () + | `Stay -> last_task := None; giveback_token () | `StayReset -> last_task := None; raise KillRespawn) | RespGetCounterNewUnivLevel -> marshal_more_data oc (MoreDataUnivLevel @@ -205,19 +214,25 @@ module Make(T : Task) = struct raise Die with | KillRespawn -> + giveback_token (); Worker.kill proc; ignore(Worker.wait proc); manage_slave ~cancel:cancel_user_req ~die id respawn - | (Die | TQueue.BeingDestroyed) -> Worker.kill proc;ignore(Worker.wait proc) + | (Die | TQueue.BeingDestroyed) -> + giveback_token (); + Worker.kill proc;ignore(Worker.wait proc) | Sys_error _ | Invalid_argument _ | End_of_file when !task_expired -> + giveback_token (); T.on_task_cancellation_or_expiration !last_task; ignore(Worker.wait proc); manage_slave ~cancel:cancel_user_req ~die id respawn | Sys_error _ | Invalid_argument _ | End_of_file when !task_cancelled -> + giveback_token (); msg_warning(strbrk "The worker was cancelled."); T.on_task_cancellation_or_expiration !last_task; Worker.kill proc; ignore(Worker.wait proc); manage_slave ~cancel:cancel_user_req ~die id respawn | Sys_error _ | Invalid_argument _ | End_of_file -> + giveback_token (); match T.on_slave_death !last_task with | `Stay -> msg_warning(strbrk "The worker process died badly."); diff --git a/stm/coqworkmgrApi.ml b/stm/coqworkmgrApi.ml new file mode 100644 index 0000000000..a22fd5427a --- /dev/null +++ b/stm/coqworkmgrApi.ml @@ -0,0 +1,140 @@ +(************************************************************************) +(* v * The Coq Proof Assistant / The Coq Development Team *) +(* '\r' then s else String.sub s 0 (len - 1) + +let positive_int_of_string n = + try + let n = int_of_string n in + if n <= 0 then raise ParseError else n + with Invalid_argument _ | Failure _ -> raise ParseError + +let parse_request s = + if debug then Printf.eprintf "parsing '%s'\n" s; + match Str.split (Str.regexp " ") (strip_r s) with + | [ "HELLO"; "LOW" ] -> Hello Flags.Low + | [ "HELLO"; "HIGH" ] -> Hello Flags.High + | [ "GET"; n ] -> Get (positive_int_of_string n) + | [ "TRYGET"; n ] -> TryGet (positive_int_of_string n) + | [ "GIVEBACK"; n ] -> GiveBack (positive_int_of_string n) + | [ "PING" ] -> Ping + | _ -> raise ParseError + +let parse_response s = + if debug then Printf.eprintf "parsing '%s'\n" s; + match Str.split (Str.regexp " ") (strip_r s) with + | [ "TOKENS"; n ] -> Tokens (positive_int_of_string n) + | [ "NOLUCK" ] -> Noluck + | [ "PONG"; n; m; p ] -> + let n = try int_of_string n with _ -> raise ParseError in + let m = try int_of_string m with _ -> raise ParseError in + let p = try int_of_string p with _ -> raise ParseError in + Pong (n,m,p) + | _ -> raise ParseError + +let print_request = function + | Hello Flags.Low -> "HELLO LOW\n" + | Hello Flags.High -> "HELLO HIGH\n" + | Get n -> Printf.sprintf "GET %d\n" n + | TryGet n -> Printf.sprintf "TRYGET %d\n" n + | GiveBack n -> Printf.sprintf "GIVEBACK %d\n" n + | Ping -> "PING\n" + +let print_response = function + | Tokens n -> Printf.sprintf "TOKENS %d\n" n + | Noluck -> "NOLUCK\n" + | Pong (n,m,p) -> Printf.sprintf "PONG %d %d %d\n" n m p + +let connect s = + try + match Str.split (Str.regexp ":") s with + | [ h; p ] -> + let open Unix in + let s = socket PF_INET SOCK_STREAM 0 in + connect s (ADDR_INET (inet_addr_of_string h,int_of_string p)); + Some s + | _ -> None + with Unix.Unix_error _ -> None + +let manager = ref None + +let option_map f = function None -> None | Some x -> Some (f x) + +let init p = + try + let sock = Sys.getenv "COQWORKMGR_SOCK" in + manager := option_map (fun s -> + let cout = Unix.out_channel_of_descr s in + set_binary_mode_out cout true; + let cin = Unix.in_channel_of_descr s in + set_binary_mode_in cin true; + output_string cout (print_request (Hello p)); flush cout; + cin, cout) (connect sock) + with Not_found | End_of_file -> () + +let with_manager f g = + try + match !manager with + | None -> f () + | Some (cin, cout) -> g cin cout + with + | ParseError | End_of_file -> manager := None; f () + +let get n = + with_manager + (fun () -> + min n (min !Flags.async_proofs_n_workers !Flags.async_proofs_n_tacworkers)) + (fun cin cout -> + output_string cout (print_request (Get n)); + flush cout; + let l = input_line cin in + match parse_response l with + | Tokens m -> m + | _ -> raise (Failure "coqworkmgr protocol error")) + +let tryget n = + with_manager + (fun () -> + Some + (min n + (min !Flags.async_proofs_n_workers !Flags.async_proofs_n_tacworkers))) + (fun cin cout -> + output_string cout (print_request (TryGet n)); + flush cout; + let l = input_line cin in + match parse_response l with + | Tokens m -> Some m + | Noluck -> None + | _ -> raise (Failure "coqworkmgr protocol error")) + +let giveback n = + with_manager + (fun () -> ()) + (fun cin cout -> + output_string cout (print_request (GiveBack n)); + flush cout) + diff --git a/stm/coqworkmgrApi.mli b/stm/coqworkmgrApi.mli new file mode 100644 index 0000000000..453029132f --- /dev/null +++ b/stm/coqworkmgrApi.mli @@ -0,0 +1,44 @@ +(************************************************************************) +(* v * The Coq Proof Assistant / The Coq Development Team *) +(* unit + +(* blocking *) +val get : int -> int + +(* not blocking *) +val tryget : int -> int option +val giveback : int -> unit + +(* Low level *) +type request = + | Hello of Flags.priority + | Get of int + | TryGet of int + | GiveBack of int + | Ping + +type response = + | Tokens of int + | Noluck + | Pong of int * int * int (* cur, max, pid *) + +val connect : string -> Unix.file_descr option + +exception ParseError + +(* Intended to be used with input_line and output_string *) +val parse_request : string -> request +val parse_response : string -> response + +val print_request : request -> string +val print_response : response -> string diff --git a/stm/stm.mllib b/stm/stm.mllib index 308b2ac4c5..28f097780b 100644 --- a/stm/stm.mllib +++ b/stm/stm.mllib @@ -5,6 +5,7 @@ TQueue WorkerPool Vernac_classifier Lemmas +CoqworkmgrApi AsyncTaskQueue Stm Vi_checking diff --git a/stm/stmworkertop.ml b/stm/stmworkertop.ml index 50afd97ab5..0d1b44e494 100644 --- a/stm/stmworkertop.ml +++ b/stm/stmworkertop.ml @@ -9,6 +9,7 @@ let () = Coqtop.toploop_init := (fun args -> Flags.make_silent true; Stm.slave_init_stdout (); + CoqworkmgrApi.init !Flags.async_proofs_worker_priority; args) let () = Coqtop.toploop_run := Stm.slave_main_loop diff --git a/stm/tacworkertop.ml b/stm/tacworkertop.ml index 8a582a6891..5e3d90c755 100644 --- a/stm/tacworkertop.ml +++ b/stm/tacworkertop.ml @@ -9,6 +9,7 @@ let () = Coqtop.toploop_init := (fun args -> Flags.make_silent true; Stm.tacslave_init_stdout (); + CoqworkmgrApi.init !Flags.async_proofs_worker_priority; args) let () = Coqtop.toploop_run := Stm.tacslave_main_loop -- cgit v1.2.3