From c9b1caaa5516d616e400faa7a7c0278c8677c51c Mon Sep 17 00:00:00 2001 From: Enrico Tassi Date: Thu, 13 Mar 2014 15:41:44 +0100 Subject: STM: move out a couple of submodules These modules are not as reusable as one may want them to be, but moving them out simplifies a little STM. --- lib/lib.mllib | 2 ++ lib/tQueue.ml | 64 +++++++++++++++++++++++++++++++++++++++++++++++ lib/tQueue.mli | 17 +++++++++++++ lib/workerPool.ml | 73 ++++++++++++++++++++++++++++++++++++++++++++++++++++++ lib/workerPool.mli | 30 ++++++++++++++++++++++ 5 files changed, 186 insertions(+) create mode 100644 lib/tQueue.ml create mode 100644 lib/tQueue.mli create mode 100644 lib/workerPool.ml create mode 100644 lib/workerPool.mli (limited to 'lib') diff --git a/lib/lib.mllib b/lib/lib.mllib index 9ba1e15b9b..50621df20d 100644 --- a/lib/lib.mllib +++ b/lib/lib.mllib @@ -25,3 +25,5 @@ Future RemoteCounter Dag Vcs +TQueue +WorkerPool diff --git a/lib/tQueue.ml b/lib/tQueue.ml new file mode 100644 index 0000000000..783c545fd0 --- /dev/null +++ b/lib/tQueue.ml @@ -0,0 +1,64 @@ +(************************************************************************) +(* v * The Coq Proof Assistant / The Coq Development Team *) +(* Queue.push x tq.queue) (List.sort rel !l); + Mutex.unlock tq.lock diff --git a/lib/tQueue.mli b/lib/tQueue.mli new file mode 100644 index 0000000000..a3ea5532fc --- /dev/null +++ b/lib/tQueue.mli @@ -0,0 +1,17 @@ +(************************************************************************) +(* v * The Coq Proof Assistant / The Coq Development Team *) +(* 'a t +val pop : 'a t -> 'a +val push : 'a t -> 'a -> unit +val reorder : 'a t -> ('a -> 'a -> int) -> unit +val wait_until_n_are_waiting_and_queue_empty : int -> 'a t -> unit +val dump : 'a t -> 'a list diff --git a/lib/workerPool.ml b/lib/workerPool.ml new file mode 100644 index 0000000000..fcae4f20d6 --- /dev/null +++ b/lib/workerPool.ml @@ -0,0 +1,73 @@ +(************************************************************************) +(* v * The Coq Proof Assistant / The Coq Development Team *) +(* ?env:string array -> string -> string array -> + process * in_channel * out_channel +end) = struct + +type worker_id = int +type spawn = + args:string array -> env:string array -> unit -> + in_channel * out_channel * Worker.process + +let slave_managers = ref None + +let n_workers () = match !slave_managers with + | None -> 0 + | Some managers -> Array.length managers +let is_empty () = !slave_managers = None + +let magic_no = 17 + +let master_handshake worker_id ic oc = + try + Marshal.to_channel oc magic_no []; flush oc; + let n = (Marshal.from_channel ic : int) in + if n <> magic_no then begin + Printf.eprintf "Handshake with %d failed: protocol mismatch\n" worker_id; + exit 1; + end + with e when Errors.noncritical e -> + Printf.eprintf "Handshake with %d failed: %s\n" + worker_id (Printexc.to_string e); + exit 1 + +let respawn n ~args ~env () = + let proc, ic, oc = Worker.spawn ~env Sys.argv.(0) args in + master_handshake n ic oc; + ic, oc, proc + +let init ~size:n ~manager:manage_slave = + slave_managers := Some + (Array.init n (fun x -> + let cancel = ref false in + cancel, Thread.create (manage_slave ~cancel (x+1)) (respawn (x+1)))) + +let cancel n = + match !slave_managers with + | None -> () + | Some a -> + let switch, _ = a.(n) in + switch := true + +let worker_handshake slave_ic slave_oc = + try + let v = (Marshal.from_channel slave_ic : int) in + if v <> magic_no then begin + prerr_endline "Handshake failed: protocol mismatch\n"; + exit 1; + end; + Marshal.to_channel slave_oc v []; flush slave_oc; + with e when Errors.noncritical e -> + prerr_endline ("Handshake failed: " ^ Printexc.to_string e); + exit 1 + +end diff --git a/lib/workerPool.mli b/lib/workerPool.mli new file mode 100644 index 0000000000..d7a546929f --- /dev/null +++ b/lib/workerPool.mli @@ -0,0 +1,30 @@ +(************************************************************************) +(* v * The Coq Proof Assistant / The Coq Development Team *) +(* ?env:string array -> string -> string array -> + process * in_channel * out_channel +end) : sig + +type worker_id = int +type spawn = + args:string array -> env:string array -> unit -> + in_channel * out_channel * Worker.process + +val init : + size:int -> manager:(cancel:bool ref -> worker_id -> spawn -> unit) -> unit +val is_empty : unit -> bool +val n_workers : unit -> int +val cancel : worker_id -> unit + +(* The worker should call this function *) +val worker_handshake : in_channel -> out_channel -> unit + +end -- cgit v1.2.3