From ffe7fc6ff44ec94544123c47b3d01bdec05b3fe0 Mon Sep 17 00:00:00 2001 From: Enrico Tassi Date: Tue, 10 Feb 2015 08:34:00 +0100 Subject: *Queue: API to wake up all threads --- stm/asyncTaskQueue.ml | 4 +++- stm/asyncTaskQueue.mli | 2 ++ stm/tQueue.ml | 2 +- stm/tQueue.mli | 4 +++- 4 files changed, 9 insertions(+), 3 deletions(-) (limited to 'stm') diff --git a/stm/asyncTaskQueue.ml b/stm/asyncTaskQueue.ml index 672527d9b5..e3fb0b607a 100644 --- a/stm/asyncTaskQueue.ml +++ b/stm/asyncTaskQueue.ml @@ -177,7 +177,7 @@ module Make(T : Task) = struct if not (Worker.is_alive proc) then () else if cancelled () || !(!expiration_date) then let () = stop_waiting := true in - let () = TQueue.signal_destruction queue in + let () = TQueue.broadcast queue in Worker.kill proc else let () = Unix.sleep 1 in @@ -253,6 +253,8 @@ module Make(T : Task) = struct Pool.destroy active; TQueue.destroy queue + let broadcast { queue } = TQueue.broadcast queue + let enqueue_task { queue; active } (t, _ as item) = prerr_endline ("Enqueue task "^T.name_of_task t); TQueue.push queue item diff --git a/stm/asyncTaskQueue.mli b/stm/asyncTaskQueue.mli index 78f295d3d5..a3fe4b8c0d 100644 --- a/stm/asyncTaskQueue.mli +++ b/stm/asyncTaskQueue.mli @@ -61,6 +61,8 @@ module MakeQueue(T : Task) : sig val set_order : queue -> (T.task -> T.task -> int) -> unit + val broadcast : queue -> unit + (* Take a snapshot (non destructive but waits until all workers are * enqueued) *) val snapshot : queue -> T.task list diff --git a/stm/tQueue.ml b/stm/tQueue.ml index 8a62fe79e1..6fef895ae8 100644 --- a/stm/tQueue.ml +++ b/stm/tQueue.ml @@ -79,7 +79,7 @@ let pop ?(picky=(fun _ -> true)) ?(destroy=ref false) Mutex.unlock m; x -let signal_destruction { lock = m; cond = c } = +let broadcast { lock = m; cond = c } = Mutex.lock m; Condition.broadcast c; Mutex.unlock m diff --git a/stm/tQueue.mli b/stm/tQueue.mli index bc3922b33a..7458de510f 100644 --- a/stm/tQueue.mli +++ b/stm/tQueue.mli @@ -14,7 +14,9 @@ val pop : ?picky:('a -> bool) -> ?destroy:bool ref -> 'a t -> 'a val push : 'a t -> 'a -> unit val set_order : 'a t -> ('a -> 'a -> int) -> unit val wait_until_n_are_waiting_and_queue_empty : int -> 'a t -> unit -val signal_destruction : 'a t -> unit + +(* Wake up all waiting threads *) +val broadcast : 'a t -> unit (* Non destructive *) val wait_until_n_are_waiting_then_snapshot : int -> 'a t -> 'a list -- cgit v1.2.3