diff options
| author | Enrico Tassi | 2014-01-22 10:23:14 +0100 |
|---|---|---|
| committer | Enrico Tassi | 2014-01-26 14:20:52 +0100 |
| commit | ea17a2a371d0d791f439e0a4c6610819ecb6f9b6 (patch) | |
| tree | e7a91131f773efedbbcdb79d035b6c6d443b1e55 /lib/spawned.ml | |
| parent | 26b6134c3cd333d7fc78c665be5fd1394a546395 (diff) | |
Spawn: managed processes
The Spawn and Spawned modules factor the operation of spawning
a process. Both synchronous and asynchronous channels are supported.
Both threaded and glib like main loop models are supported. Still,
not all combinations are truly tested not equipped with a decent API:
only async + glib and sync + thread are, since these are the models we
use for coqide<->coqtop and coqtop<->worker respectively.
Diffstat (limited to 'lib/spawned.ml')
| -rw-r--r-- | lib/spawned.ml | 107 |
1 files changed, 107 insertions, 0 deletions
diff --git a/lib/spawned.ml b/lib/spawned.ml new file mode 100644 index 0000000000..29cf517690 --- /dev/null +++ b/lib/spawned.ml @@ -0,0 +1,107 @@ +(************************************************************************) +(* v * The Coq Proof Assistant / The Coq Development Team *) +(* <O___,, * INRIA - CNRS - LIX - LRI - PPS - Copyright 1999-2012 *) +(* \VV/ **************************************************************) +(* // * This file is distributed under the terms of the *) +(* * GNU Lesser General Public License Version 2.1 *) +(************************************************************************) + +open Spawn + +let pr_err s = Printf.eprintf "(Spawned,%d) %s\n%!" (Unix.getpid ()) s +let prerr_endline s = if !Flags.debug then begin pr_err s end else () + +type chandescr = AnonPipe | Socket of string * int + +let handshake cin cout = + try + match input_value cin with + | Hello(v, pid) when v = proto_version -> + prerr_endline (Printf.sprintf "Handshake with %d OK" pid); + output_value cout (Hello (proto_version,Unix.getpid ())); flush cout + | _ -> raise (Failure "handshake protocol") + with + | Failure s | Invalid_argument s | Sys_error s -> + pr_err ("Handshake failed: " ^ s); raise (Failure "handshake") + | End_of_file -> + pr_err "Handshake failed: End_of_file"; raise (Failure "handshake") + +let open_bin_connection h p = + let open Unix in + let cin, cout = open_connection (ADDR_INET (inet_addr_of_string h,p)) in + set_binary_mode_in cin true; + set_binary_mode_out cout true; + handshake cin cout; + cin, cout + +let controller h p = + prerr_endline "starting controller thread"; + let main () = + let ic, oc = open_bin_connection h p in + let rec loop () = + try + match input_value ic with + | Hello _ -> prerr_endline "internal protocol error"; exit 1 + | ReqDie -> prerr_endline "death sentence received"; exit 0 + | ReqStats -> + output_value oc (RespStats (Gc.stat ())); flush oc; loop () + with + | e -> + prerr_endline ("control channel broken: " ^ Printexc.to_string e); + exit 1 in + loop () in + ignore(Thread.create main ()) + +let main_channel = ref None +let control_channel = ref None + +let channels = ref None + +let init_channels () = + if !channels <> None then Errors.anomaly(Pp.str "init_channels called twice"); + match !main_channel, !control_channel with + | None, None -> () + | None, Some _ | Some _, None -> + Errors.anomaly (Pp.str "incomplete channels options") + | _, Some AnonPipe -> + Errors.anomaly (Pp.str "control channel cannot be a pipe") + | Some (Socket(mh,mp)), Some (Socket(ch,cp)) -> + channels := Some (open_bin_connection mh mp); + controller ch cp + | Some AnonPipe, Some (Socket (ch,cp)) -> + let stdin = Unix.in_channel_of_descr (Unix.dup Unix.stdin) in + let stdout = Unix.out_channel_of_descr (Unix.dup Unix.stdout) in + Unix.dup2 Unix.stderr Unix.stdout; + set_binary_mode_in stdin true; + set_binary_mode_out stdout true; + channels := Some (stdin, stdout); + handshake stdin stdout; + controller ch cp + +let get_channels () = + match !channels with + | None -> Errors.anomaly(Pp.str "init_channels not called") + | Some(ic, oc) -> ic, oc + +let prepare_in_channel_for_thread_friendly_blocking_input ic = + if Sys.os_type = "Win32" then Unix.set_nonblock (Unix.descr_of_in_channel ic) + else () + +let thread_friendly_blocking_input ic = + if Sys.os_type = "Win32" then + let open Unix in + let open Thread in + let fd = descr_of_in_channel ic in + let rec loop buf n = + try read fd buf 0 n + with + | Unix.Unix_error((Unix.EWOULDBLOCK|Unix.EAGAIN),_,_) -> + (* We wait for some data explicitly yielding each second *) + while not (wait_timed_read fd 1.0) do yield () done; + loop buf n + | Unix.Unix_error _ -> 0 + in + loop + else + (fun buf n -> Pervasives.input ic buf 0 n) + |
