aboutsummaryrefslogtreecommitdiff
path: root/lib/spawned.ml
diff options
context:
space:
mode:
authorEnrico Tassi2014-01-22 10:23:14 +0100
committerEnrico Tassi2014-01-26 14:20:52 +0100
commitea17a2a371d0d791f439e0a4c6610819ecb6f9b6 (patch)
treee7a91131f773efedbbcdb79d035b6c6d443b1e55 /lib/spawned.ml
parent26b6134c3cd333d7fc78c665be5fd1394a546395 (diff)
Spawn: managed processes
The Spawn and Spawned modules factor the operation of spawning a process. Both synchronous and asynchronous channels are supported. Both threaded and glib like main loop models are supported. Still, not all combinations are truly tested not equipped with a decent API: only async + glib and sync + thread are, since these are the models we use for coqide<->coqtop and coqtop<->worker respectively.
Diffstat (limited to 'lib/spawned.ml')
-rw-r--r--lib/spawned.ml107
1 files changed, 107 insertions, 0 deletions
diff --git a/lib/spawned.ml b/lib/spawned.ml
new file mode 100644
index 0000000000..29cf517690
--- /dev/null
+++ b/lib/spawned.ml
@@ -0,0 +1,107 @@
+(************************************************************************)
+(* v * The Coq Proof Assistant / The Coq Development Team *)
+(* <O___,, * INRIA - CNRS - LIX - LRI - PPS - Copyright 1999-2012 *)
+(* \VV/ **************************************************************)
+(* // * This file is distributed under the terms of the *)
+(* * GNU Lesser General Public License Version 2.1 *)
+(************************************************************************)
+
+open Spawn
+
+let pr_err s = Printf.eprintf "(Spawned,%d) %s\n%!" (Unix.getpid ()) s
+let prerr_endline s = if !Flags.debug then begin pr_err s end else ()
+
+type chandescr = AnonPipe | Socket of string * int
+
+let handshake cin cout =
+ try
+ match input_value cin with
+ | Hello(v, pid) when v = proto_version ->
+ prerr_endline (Printf.sprintf "Handshake with %d OK" pid);
+ output_value cout (Hello (proto_version,Unix.getpid ())); flush cout
+ | _ -> raise (Failure "handshake protocol")
+ with
+ | Failure s | Invalid_argument s | Sys_error s ->
+ pr_err ("Handshake failed: " ^ s); raise (Failure "handshake")
+ | End_of_file ->
+ pr_err "Handshake failed: End_of_file"; raise (Failure "handshake")
+
+let open_bin_connection h p =
+ let open Unix in
+ let cin, cout = open_connection (ADDR_INET (inet_addr_of_string h,p)) in
+ set_binary_mode_in cin true;
+ set_binary_mode_out cout true;
+ handshake cin cout;
+ cin, cout
+
+let controller h p =
+ prerr_endline "starting controller thread";
+ let main () =
+ let ic, oc = open_bin_connection h p in
+ let rec loop () =
+ try
+ match input_value ic with
+ | Hello _ -> prerr_endline "internal protocol error"; exit 1
+ | ReqDie -> prerr_endline "death sentence received"; exit 0
+ | ReqStats ->
+ output_value oc (RespStats (Gc.stat ())); flush oc; loop ()
+ with
+ | e ->
+ prerr_endline ("control channel broken: " ^ Printexc.to_string e);
+ exit 1 in
+ loop () in
+ ignore(Thread.create main ())
+
+let main_channel = ref None
+let control_channel = ref None
+
+let channels = ref None
+
+let init_channels () =
+ if !channels <> None then Errors.anomaly(Pp.str "init_channels called twice");
+ match !main_channel, !control_channel with
+ | None, None -> ()
+ | None, Some _ | Some _, None ->
+ Errors.anomaly (Pp.str "incomplete channels options")
+ | _, Some AnonPipe ->
+ Errors.anomaly (Pp.str "control channel cannot be a pipe")
+ | Some (Socket(mh,mp)), Some (Socket(ch,cp)) ->
+ channels := Some (open_bin_connection mh mp);
+ controller ch cp
+ | Some AnonPipe, Some (Socket (ch,cp)) ->
+ let stdin = Unix.in_channel_of_descr (Unix.dup Unix.stdin) in
+ let stdout = Unix.out_channel_of_descr (Unix.dup Unix.stdout) in
+ Unix.dup2 Unix.stderr Unix.stdout;
+ set_binary_mode_in stdin true;
+ set_binary_mode_out stdout true;
+ channels := Some (stdin, stdout);
+ handshake stdin stdout;
+ controller ch cp
+
+let get_channels () =
+ match !channels with
+ | None -> Errors.anomaly(Pp.str "init_channels not called")
+ | Some(ic, oc) -> ic, oc
+
+let prepare_in_channel_for_thread_friendly_blocking_input ic =
+ if Sys.os_type = "Win32" then Unix.set_nonblock (Unix.descr_of_in_channel ic)
+ else ()
+
+let thread_friendly_blocking_input ic =
+ if Sys.os_type = "Win32" then
+ let open Unix in
+ let open Thread in
+ let fd = descr_of_in_channel ic in
+ let rec loop buf n =
+ try read fd buf 0 n
+ with
+ | Unix.Unix_error((Unix.EWOULDBLOCK|Unix.EAGAIN),_,_) ->
+ (* We wait for some data explicitly yielding each second *)
+ while not (wait_timed_read fd 1.0) do yield () done;
+ loop buf n
+ | Unix.Unix_error _ -> 0
+ in
+ loop
+ else
+ (fun buf n -> Pervasives.input ic buf 0 n)
+