diff options
Diffstat (limited to 'stm')
| -rw-r--r-- | stm/asyncTaskQueue.ml | 31 | ||||
| -rw-r--r-- | stm/stm.ml | 39 | ||||
| -rw-r--r-- | stm/tQueue.ml | 101 | ||||
| -rw-r--r-- | stm/workerPool.ml | 7 |
4 files changed, 63 insertions, 115 deletions
diff --git a/stm/asyncTaskQueue.ml b/stm/asyncTaskQueue.ml index dd80ff21aa..a9f203014f 100644 --- a/stm/asyncTaskQueue.ml +++ b/stm/asyncTaskQueue.ml @@ -56,12 +56,8 @@ module Make(T : Task) () = struct type response = | Response of T.response | RespFeedback of Feedback.feedback - | RespGetCounterNewUnivLevel type request = Request of T.request - type more_data = - | MoreDataUnivLevel of UnivGen.univ_unique_id list - let slave_respond (Request r) = let res = T.perform r in Response res @@ -94,16 +90,6 @@ module Make(T : Task) () = struct with Failure s | Invalid_argument s | Sys_error s -> marshal_err ("unmarshal_response: "^s) - let marshal_more_data oc (res : more_data) = - try marshal_to_channel oc res - with Failure s | Invalid_argument s | Sys_error s -> - marshal_err ("marshal_more_data: "^s) - - let unmarshal_more_data ic = - try (CThread.thread_friendly_input_value ic : more_data) - with Failure s | Invalid_argument s | Sys_error s -> - marshal_err ("unmarshal_more_data: "^s) - let report_status ?(id = !Flags.async_proofs_worker_id) s = let open Feedback in feedback ~id:Stateid.initial (WorkerStatus(id, s)) @@ -198,8 +184,6 @@ module Make(T : Task) () = struct | Unix.WEXITED i -> Printf.sprintf "exit(%d)" i | Unix.WSIGNALED sno -> Printf.sprintf "signalled(%d)" sno | Unix.WSTOPPED sno -> Printf.sprintf "stopped(%d)" sno) in - let more_univs n = - CList.init n (fun _ -> UnivGen.new_univ_id ()) in let rec kill_if () = if not (Worker.is_alive proc) then () @@ -231,9 +215,6 @@ module Make(T : Task) () = struct marshal_request oc (Request req); let rec continue () = match unmarshal_response ic with - | RespGetCounterNewUnivLevel -> - marshal_more_data oc (MoreDataUnivLevel (more_univs 10)); - continue () | RespFeedback fbk -> T.forward_feedback fbk; continue () | Response resp -> match T.use_response !worker_age task resp with @@ -315,13 +296,6 @@ module Make(T : Task) () = struct let ic, oc = Spawned.get_channels () in slave_oc := Some oc; slave_ic := Some ic - let bufferize f = - let l = ref [] in - fun () -> - match !l with - | [] -> let data = f () in l := List.tl data; List.hd data - | x::tl -> l := tl; x - let slave_handshake () = Pool.worker_handshake (Option.get !slave_ic) (Option.get !slave_oc) @@ -339,11 +313,6 @@ module Make(T : Task) () = struct Marshal.to_channel oc (RespFeedback (debug_with_pid fb)) []; flush oc) () in ignore (Feedback.add_feeder (fun x -> slave_feeder (Option.get !slave_oc) x)); - (* We ask master to allocate universe identifiers *) - UnivGen.set_remote_new_univ_id (bufferize @@ Control.protect_sigalrm (fun () -> - marshal_response (Option.get !slave_oc) RespGetCounterNewUnivLevel; - match unmarshal_more_data (Option.get !slave_ic) with - | MoreDataUnivLevel l -> l)); let working = ref false in slave_handshake (); while true do diff --git a/stm/stm.ml b/stm/stm.ml index 9480bbdc2e..6287943cee 100644 --- a/stm/stm.ml +++ b/stm/stm.ml @@ -98,8 +98,7 @@ let forward_feedback, forward_feedback_hook = let m = Mutex.create () in Hook.make ~default:(function | { doc_id = did; span_id = id; route; contents } -> - try Mutex.lock m; feedback ~did ~id ~route contents; Mutex.unlock m - with e -> Mutex.unlock m; raise e) () + CThread.with_lock m ~scope:(fun () -> feedback ~did ~id ~route contents)) () let unreachable_state, unreachable_state_hook = Hook.make ~default:(fun ~doc:_ _ _ -> ()) () @@ -758,17 +757,16 @@ end = struct (* {{{ *) let worker = ref None let set_last_job j = - Mutex.lock m; - job := Some j; - Condition.signal c; - Mutex.unlock m + CThread.with_lock m ~scope:(fun () -> + job := Some j; + Condition.signal c) let get_last_job () = - Mutex.lock m; - while Option.is_empty !job do Condition.wait c m; done; - match !job with - | None -> assert false - | Some x -> job := None; Mutex.unlock m; x + CThread.with_lock m ~scope:(fun () -> + while Option.is_empty !job do Condition.wait c m; done; + match !job with + | None -> assert false + | Some x -> job := None; x) let run_command () = try while true do get_last_job () () done @@ -2449,24 +2447,21 @@ let join ~doc = VCS.print (); doc -let dump_snapshot () = Slaves.dump_snapshot (), RemoteCounter.snapshot () - -type tasks = int Slaves.tasks * RemoteCounter.remote_counters_status -let check_task name (tasks,rcbackup) i = - RemoteCounter.restore rcbackup; +type tasks = int Slaves.tasks +let check_task name tasks i = let vcs = VCS.backup () in try let rc = State.purify (Slaves.check_task name tasks) i in VCS.restore vcs; rc with e when CErrors.noncritical e -> VCS.restore vcs; false -let info_tasks (tasks,_) = Slaves.info_tasks tasks -let finish_tasks name u p (t,rcbackup as tasks) = - RemoteCounter.restore rcbackup; +let info_tasks = Slaves.info_tasks + +let finish_tasks name u p tasks = let finish_task u (_,_,i) = let vcs = VCS.backup () in - let u = State.purify (Slaves.finish_task name u p t) i in + let u = State.purify (Slaves.finish_task name u p tasks) i in VCS.restore vcs; u in try @@ -2517,13 +2512,13 @@ let snapshot_vio ~create_vos ~doc ~output_native_objects ldir long_f_dot_vo = CErrors.user_err ~hdr:"stm" (str"Cannot dump a vio with open proofs"); (* LATER: when create_vos is true, it could be more efficient to not allocate the futures; but for now it seems useful for synchronization of the workers, below, [snapshot] gets computed even if [create_vos] is true. *) - let (tasks,counters) = dump_snapshot() in + let tasks = Slaves.dump_snapshot() in let except = List.fold_left (fun e (r,_) -> Future.UUIDSet.add r.Stateid.uuid e) Future.UUIDSet.empty tasks in let todo_proofs = if create_vos then Library.ProofsTodoSomeEmpty except - else Library.ProofsTodoSome (except,tasks,counters) + else Library.ProofsTodoSome (except,tasks) in Library.save_library_to todo_proofs ~output_native_objects ldir long_f_dot_vo (Global.opaque_tables ()); doc diff --git a/stm/tQueue.ml b/stm/tQueue.ml index 2aaca85582..f5bd726dde 100644 --- a/stm/tQueue.ml +++ b/stm/tQueue.ml @@ -68,61 +68,54 @@ let create () = { let pop ?(picky=(fun _ -> true)) ?(destroy=ref false) tq = let { queue = q; lock = m; cond = c; cond_waiting = cn } = tq in - Mutex.lock m; - if tq.release then (Mutex.unlock m; raise BeingDestroyed); - while not (PriorityQueue.exists picky q || !destroy) do - tq.nwaiting <- tq.nwaiting + 1; - Condition.broadcast cn; - Condition.wait c m; - tq.nwaiting <- tq.nwaiting - 1; - if tq.release || !destroy then (Mutex.unlock m; raise BeingDestroyed) - done; - if !destroy then (Mutex.unlock m; raise BeingDestroyed); - let x = PriorityQueue.pop ~picky q in - Condition.signal c; - Condition.signal cn; - Mutex.unlock m; - x + CThread.with_lock m ~scope:(fun () -> + if tq.release then raise BeingDestroyed; + while not (PriorityQueue.exists picky q || !destroy) do + tq.nwaiting <- tq.nwaiting + 1; + Condition.broadcast cn; + Condition.wait c m; + tq.nwaiting <- tq.nwaiting - 1; + if tq.release || !destroy then raise BeingDestroyed + done; + if !destroy then raise BeingDestroyed; + let x = PriorityQueue.pop ~picky q in + Condition.signal c; + Condition.signal cn; + x) let broadcast tq = let { lock = m; cond = c } = tq in - Mutex.lock m; - Condition.broadcast c; - Mutex.unlock m + CThread.with_lock m ~scope:(fun () -> + Condition.broadcast c) let push tq x = let { queue = q; lock = m; cond = c; release } = tq in if release then CErrors.anomaly(Pp.str "TQueue.push while being destroyed! Only 1 producer/destroyer allowed."); - Mutex.lock m; - PriorityQueue.push q x; - Condition.broadcast c; - Mutex.unlock m + CThread.with_lock m ~scope:(fun () -> + PriorityQueue.push q x; + Condition.broadcast c) let length tq = let { queue = q; lock = m } = tq in - Mutex.lock m; - let n = PriorityQueue.length q in - Mutex.unlock m; - n + CThread.with_lock m ~scope:(fun () -> + PriorityQueue.length q) let clear tq = let { queue = q; lock = m; cond = c } = tq in - Mutex.lock m; - PriorityQueue.clear q; - Mutex.unlock m + CThread.with_lock m ~scope:(fun () -> + PriorityQueue.clear q) let clear_saving tq f = let { queue = q; lock = m; cond = c } = tq in - Mutex.lock m; let saved = ref [] in - while not (PriorityQueue.is_empty q) do - let elem = PriorityQueue.pop q in - match f elem with - | Some x -> saved := x :: !saved - | None -> () - done; - Mutex.unlock m; + CThread.with_lock m ~scope:(fun () -> + while not (PriorityQueue.is_empty q) do + let elem = PriorityQueue.pop q in + match f elem with + | Some x -> saved := x :: !saved + | None -> () + done); List.rev !saved let is_empty tq = PriorityQueue.is_empty tq.queue @@ -130,32 +123,28 @@ let is_empty tq = PriorityQueue.is_empty tq.queue let destroy tq = tq.release <- true; while tq.nwaiting > 0 do - Mutex.lock tq.lock; - Condition.broadcast tq.cond; - Mutex.unlock tq.lock; + CThread.with_lock tq.lock ~scope:(fun () -> + Condition.broadcast tq.cond) done; tq.release <- false let wait_until_n_are_waiting_and_queue_empty j tq = - Mutex.lock tq.lock; - while not (PriorityQueue.is_empty tq.queue) || tq.nwaiting < j do - Condition.wait tq.cond_waiting tq.lock - done; - Mutex.unlock tq.lock + CThread.with_lock tq.lock ~scope:(fun () -> + while not (PriorityQueue.is_empty tq.queue) || tq.nwaiting < j do + Condition.wait tq.cond_waiting tq.lock + done) let wait_until_n_are_waiting_then_snapshot j tq = let l = ref [] in - Mutex.lock tq.lock; - while not (PriorityQueue.is_empty tq.queue) do - l := PriorityQueue.pop tq.queue :: !l - done; - while tq.nwaiting < j do Condition.wait tq.cond_waiting tq.lock done; - List.iter (PriorityQueue.push tq.queue) (List.rev !l); - if !l <> [] then Condition.broadcast tq.cond; - Mutex.unlock tq.lock; + CThread.with_lock tq.lock ~scope:(fun () -> + while not (PriorityQueue.is_empty tq.queue) do + l := PriorityQueue.pop tq.queue :: !l + done; + while tq.nwaiting < j do Condition.wait tq.cond_waiting tq.lock done; + List.iter (PriorityQueue.push tq.queue) (List.rev !l); + if !l <> [] then Condition.broadcast tq.cond); List.rev !l let set_order tq rel = - Mutex.lock tq.lock; - PriorityQueue.set_rel rel tq.queue; - Mutex.unlock tq.lock + CThread.with_lock tq.lock ~scope:(fun () -> + PriorityQueue.set_rel rel tq.queue) diff --git a/stm/workerPool.ml b/stm/workerPool.ml index 45c92c3748..fef9300377 100644 --- a/stm/workerPool.ml +++ b/stm/workerPool.ml @@ -72,12 +72,7 @@ let worker_handshake slave_ic slave_oc = exit 1 let locking { lock; pool = p } f = - try - Mutex.lock lock; - let x = f p in - Mutex.unlock lock; - x - with e -> Mutex.unlock lock; raise e + CThread.with_lock lock ~scope:(fun () -> f p) let rec create_worker extra pool priority id = let cancel = ref false in |
