aboutsummaryrefslogtreecommitdiff
path: root/stm
diff options
context:
space:
mode:
authorEnrico Tassi2014-09-01 14:54:49 +0200
committerEnrico Tassi2014-09-02 11:29:42 +0200
commitcf6b12cb3a88fb3af6a7b3e91d17db8b06d23c81 (patch)
tree46ec306afd1ebf29b735e7f6679c8e1b8d9c5679 /stm
parent7befcc7ea63ea4bd6e45e6f4b8ec01a69b586cc7 (diff)
coqworkmgr
Diffstat (limited to 'stm')
-rw-r--r--stm/asyncTaskQueue.ml23
-rw-r--r--stm/coqworkmgrApi.ml140
-rw-r--r--stm/coqworkmgrApi.mli44
-rw-r--r--stm/stm.mllib1
-rw-r--r--stm/stmworkertop.ml1
-rw-r--r--stm/tacworkertop.ml1
6 files changed, 206 insertions, 4 deletions
diff --git a/stm/asyncTaskQueue.ml b/stm/asyncTaskQueue.ml
index 5de80bbfc2..cb03459cc5 100644
--- a/stm/asyncTaskQueue.ml
+++ b/stm/asyncTaskQueue.ml
@@ -142,10 +142,14 @@ module Make(T : Task) = struct
let rec set_slave_opt = function
| [] -> !Flags.async_proofs_flags_for_workers @
["-toploop"; T.name^"top";
- "-worker-id"; id]
+ "-worker-id"; id;
+ "-async-proofs-worker-priority";
+ Flags.string_of_priority !Flags.async_proofs_worker_priority]
| ("-ideslave"|"-emacs"|"-emacs-U"|"-batch")::tl -> set_slave_opt tl
| ("-async-proofs" |"-toploop" |"-vi2vo" |"-compile"
- | "-load-vernac-source" | "-compile-verbose")::_::tl -> set_slave_opt tl
+ |"-load-vernac-source" |"-compile-verbose"
+ |"-async-proofs-worker-priority" |"-worker-id") :: _ :: tl ->
+ set_slave_opt tl
| x::tl -> x :: set_slave_opt tl in
let args =
Array.of_list (set_slave_opt (List.tl (Array.to_list Sys.argv))) in
@@ -155,6 +159,9 @@ module Make(T : Task) = struct
let task_expired = ref false in
let task_cancelled = ref false in
let worker_age = ref `Fresh in
+ let got_token = ref false in
+ let giveback_token () =
+ if !got_token then (CoqworkmgrApi.giveback 1; got_token := false) in
CThread.prepare_in_channel_for_thread_friendly_io ic;
try
while not !die do
@@ -166,6 +173,8 @@ module Make(T : Task) = struct
begin try
let req = T.request_of_task !worker_age task in
if req = None then raise Expired;
+ ignore(CoqworkmgrApi.get 1); got_token := true;
+ prerr_endline ("got execution token");
marshal_request oc (Request (Option.get req));
Worker.kill_if proc ~sec:1 (fun () ->
task_expired := !cancel_switch;
@@ -177,7 +186,7 @@ module Make(T : Task) = struct
match response with
| Response resp ->
(match T.use_response task resp with
- | `Stay -> last_task := None; ()
+ | `Stay -> last_task := None; giveback_token ()
| `StayReset -> last_task := None; raise KillRespawn)
| RespGetCounterNewUnivLevel ->
marshal_more_data oc (MoreDataUnivLevel
@@ -205,19 +214,25 @@ module Make(T : Task) = struct
raise Die
with
| KillRespawn ->
+ giveback_token ();
Worker.kill proc; ignore(Worker.wait proc);
manage_slave ~cancel:cancel_user_req ~die id respawn
- | (Die | TQueue.BeingDestroyed) -> Worker.kill proc;ignore(Worker.wait proc)
+ | (Die | TQueue.BeingDestroyed) ->
+ giveback_token ();
+ Worker.kill proc;ignore(Worker.wait proc)
| Sys_error _ | Invalid_argument _ | End_of_file when !task_expired ->
+ giveback_token ();
T.on_task_cancellation_or_expiration !last_task;
ignore(Worker.wait proc);
manage_slave ~cancel:cancel_user_req ~die id respawn
| Sys_error _ | Invalid_argument _ | End_of_file when !task_cancelled ->
+ giveback_token ();
msg_warning(strbrk "The worker was cancelled.");
T.on_task_cancellation_or_expiration !last_task;
Worker.kill proc; ignore(Worker.wait proc);
manage_slave ~cancel:cancel_user_req ~die id respawn
| Sys_error _ | Invalid_argument _ | End_of_file ->
+ giveback_token ();
match T.on_slave_death !last_task with
| `Stay ->
msg_warning(strbrk "The worker process died badly.");
diff --git a/stm/coqworkmgrApi.ml b/stm/coqworkmgrApi.ml
new file mode 100644
index 0000000000..a22fd5427a
--- /dev/null
+++ b/stm/coqworkmgrApi.ml
@@ -0,0 +1,140 @@
+(************************************************************************)
+(* v * The Coq Proof Assistant / The Coq Development Team *)
+(* <O___,, * INRIA - CNRS - LIX - LRI - PPS - Copyright 1999-2014 *)
+(* \VV/ **************************************************************)
+(* // * This file is distributed under the terms of the *)
+(* * GNU Lesser General Public License Version 2.1 *)
+(************************************************************************)
+
+let debug = false
+
+type request =
+ | Hello of Flags.priority
+ | Get of int
+ | TryGet of int
+ | GiveBack of int
+ | Ping
+
+type response =
+ | Tokens of int
+ | Noluck
+ | Pong of int * int * int
+
+exception ParseError
+
+(* make it work with telnet: strip trailing \r *)
+let strip_r s =
+ let len = String.length s in
+ if s.[len - 1] <> '\r' then s else String.sub s 0 (len - 1)
+
+let positive_int_of_string n =
+ try
+ let n = int_of_string n in
+ if n <= 0 then raise ParseError else n
+ with Invalid_argument _ | Failure _ -> raise ParseError
+
+let parse_request s =
+ if debug then Printf.eprintf "parsing '%s'\n" s;
+ match Str.split (Str.regexp " ") (strip_r s) with
+ | [ "HELLO"; "LOW" ] -> Hello Flags.Low
+ | [ "HELLO"; "HIGH" ] -> Hello Flags.High
+ | [ "GET"; n ] -> Get (positive_int_of_string n)
+ | [ "TRYGET"; n ] -> TryGet (positive_int_of_string n)
+ | [ "GIVEBACK"; n ] -> GiveBack (positive_int_of_string n)
+ | [ "PING" ] -> Ping
+ | _ -> raise ParseError
+
+let parse_response s =
+ if debug then Printf.eprintf "parsing '%s'\n" s;
+ match Str.split (Str.regexp " ") (strip_r s) with
+ | [ "TOKENS"; n ] -> Tokens (positive_int_of_string n)
+ | [ "NOLUCK" ] -> Noluck
+ | [ "PONG"; n; m; p ] ->
+ let n = try int_of_string n with _ -> raise ParseError in
+ let m = try int_of_string m with _ -> raise ParseError in
+ let p = try int_of_string p with _ -> raise ParseError in
+ Pong (n,m,p)
+ | _ -> raise ParseError
+
+let print_request = function
+ | Hello Flags.Low -> "HELLO LOW\n"
+ | Hello Flags.High -> "HELLO HIGH\n"
+ | Get n -> Printf.sprintf "GET %d\n" n
+ | TryGet n -> Printf.sprintf "TRYGET %d\n" n
+ | GiveBack n -> Printf.sprintf "GIVEBACK %d\n" n
+ | Ping -> "PING\n"
+
+let print_response = function
+ | Tokens n -> Printf.sprintf "TOKENS %d\n" n
+ | Noluck -> "NOLUCK\n"
+ | Pong (n,m,p) -> Printf.sprintf "PONG %d %d %d\n" n m p
+
+let connect s =
+ try
+ match Str.split (Str.regexp ":") s with
+ | [ h; p ] ->
+ let open Unix in
+ let s = socket PF_INET SOCK_STREAM 0 in
+ connect s (ADDR_INET (inet_addr_of_string h,int_of_string p));
+ Some s
+ | _ -> None
+ with Unix.Unix_error _ -> None
+
+let manager = ref None
+
+let option_map f = function None -> None | Some x -> Some (f x)
+
+let init p =
+ try
+ let sock = Sys.getenv "COQWORKMGR_SOCK" in
+ manager := option_map (fun s ->
+ let cout = Unix.out_channel_of_descr s in
+ set_binary_mode_out cout true;
+ let cin = Unix.in_channel_of_descr s in
+ set_binary_mode_in cin true;
+ output_string cout (print_request (Hello p)); flush cout;
+ cin, cout) (connect sock)
+ with Not_found | End_of_file -> ()
+
+let with_manager f g =
+ try
+ match !manager with
+ | None -> f ()
+ | Some (cin, cout) -> g cin cout
+ with
+ | ParseError | End_of_file -> manager := None; f ()
+
+let get n =
+ with_manager
+ (fun () ->
+ min n (min !Flags.async_proofs_n_workers !Flags.async_proofs_n_tacworkers))
+ (fun cin cout ->
+ output_string cout (print_request (Get n));
+ flush cout;
+ let l = input_line cin in
+ match parse_response l with
+ | Tokens m -> m
+ | _ -> raise (Failure "coqworkmgr protocol error"))
+
+let tryget n =
+ with_manager
+ (fun () ->
+ Some
+ (min n
+ (min !Flags.async_proofs_n_workers !Flags.async_proofs_n_tacworkers)))
+ (fun cin cout ->
+ output_string cout (print_request (TryGet n));
+ flush cout;
+ let l = input_line cin in
+ match parse_response l with
+ | Tokens m -> Some m
+ | Noluck -> None
+ | _ -> raise (Failure "coqworkmgr protocol error"))
+
+let giveback n =
+ with_manager
+ (fun () -> ())
+ (fun cin cout ->
+ output_string cout (print_request (GiveBack n));
+ flush cout)
+
diff --git a/stm/coqworkmgrApi.mli b/stm/coqworkmgrApi.mli
new file mode 100644
index 0000000000..453029132f
--- /dev/null
+++ b/stm/coqworkmgrApi.mli
@@ -0,0 +1,44 @@
+(************************************************************************)
+(* v * The Coq Proof Assistant / The Coq Development Team *)
+(* <O___,, * INRIA - CNRS - LIX - LRI - PPS - Copyright 1999-2014 *)
+(* \VV/ **************************************************************)
+(* // * This file is distributed under the terms of the *)
+(* * GNU Lesser General Public License Version 2.1 *)
+(************************************************************************)
+
+(* High level api for clients of the service (like coqtop) *)
+
+(* Connects to a work manager if any. If no worker manager, then
+ -async-proofs-j and -async-proofs-tac-j are used *)
+val init : Flags.priority -> unit
+
+(* blocking *)
+val get : int -> int
+
+(* not blocking *)
+val tryget : int -> int option
+val giveback : int -> unit
+
+(* Low level *)
+type request =
+ | Hello of Flags.priority
+ | Get of int
+ | TryGet of int
+ | GiveBack of int
+ | Ping
+
+type response =
+ | Tokens of int
+ | Noluck
+ | Pong of int * int * int (* cur, max, pid *)
+
+val connect : string -> Unix.file_descr option
+
+exception ParseError
+
+(* Intended to be used with input_line and output_string *)
+val parse_request : string -> request
+val parse_response : string -> response
+
+val print_request : request -> string
+val print_response : response -> string
diff --git a/stm/stm.mllib b/stm/stm.mllib
index 308b2ac4c5..28f097780b 100644
--- a/stm/stm.mllib
+++ b/stm/stm.mllib
@@ -5,6 +5,7 @@ TQueue
WorkerPool
Vernac_classifier
Lemmas
+CoqworkmgrApi
AsyncTaskQueue
Stm
Vi_checking
diff --git a/stm/stmworkertop.ml b/stm/stmworkertop.ml
index 50afd97ab5..0d1b44e494 100644
--- a/stm/stmworkertop.ml
+++ b/stm/stmworkertop.ml
@@ -9,6 +9,7 @@
let () = Coqtop.toploop_init := (fun args ->
Flags.make_silent true;
Stm.slave_init_stdout ();
+ CoqworkmgrApi.init !Flags.async_proofs_worker_priority;
args)
let () = Coqtop.toploop_run := Stm.slave_main_loop
diff --git a/stm/tacworkertop.ml b/stm/tacworkertop.ml
index 8a582a6891..5e3d90c755 100644
--- a/stm/tacworkertop.ml
+++ b/stm/tacworkertop.ml
@@ -9,6 +9,7 @@
let () = Coqtop.toploop_init := (fun args ->
Flags.make_silent true;
Stm.tacslave_init_stdout ();
+ CoqworkmgrApi.init !Flags.async_proofs_worker_priority;
args)
let () = Coqtop.toploop_run := Stm.tacslave_main_loop