;; Example for discussion about potential bug #ASYNC-36 on the async JIRA:
;; http://dev.clojure.org/jira/browse/ASYNC-36

;; Demonstrates proper stopping of channel-operations without the need
;; to close channels until all operations have cleanly come to an end.

;; Demonstrates that blocking is not an issue by using core.async/alts!.

;; Instructions:
;; Eval in a core.async capable repl and enter
;; (run-example)
;; at the repl.

;; Sample session:
;; A sample session can be found at the end of the file.

;; Leon Grapenthin, Nov. 20, 2013
;; -- updated on Nov. 22, 2013 with a benchmark of the blocking time
;; -- of the putting site

(require '[clojure.core.async :refer [go-loop go chan alts! <! >!! close! timeout put!]])

;; Channel through which all output of this example is funnelled, so
;; that a single process does the actual printing.
(def async-print-chan (chan))

;; Printer process: takes one argument list at a time from
;; async-print-chan and prints it with println. The loop ends when the
;; channel is closed (<! then returns nil).
(go-loop []
  (when-let [args (<! async-print-chan)]
    (apply println args)
    (recur)))

(defn println-async
  "Use this instead of println to make sure that parallel calls to
  println don't result in mixed output.

  The arguments are put onto async-print-chan as one message; the
  put is asynchronous, so this function does not block the caller."
  [& args]
  ;; With no arguments `args` is nil, and nil cannot be put onto a
  ;; channel. (vec args) turns that into an empty vector, so that
  ;; (println-async) prints an empty line just like (println).
  (put! async-print-chan (vec args)))

(defn produce-values!
  "Demonstrative function. Starts a separate process in which an
  infinite amount of values is put onto out-ch; stops when :stop could
  be taken from stop-ch. A pending put onto out-ch is cancelled as
  soon as a value could be taken from stop-ch instead.

  out-ch is closed when :stop is received. Returns the channel of the
  go block that runs the producer."
  [out-ch stop-ch]
  (go-loop [[n & ns :as n-seq] (range)]
    (let [t1 (System/nanoTime)
          ;; Race the put of n against a take from stop-ch; alts!
          ;; completes exactly one of the two. ch is the channel that
          ;; won, v is the result of that operation (the taken value
          ;; when ch is stop-ch).
          [v ch] (alts! [[out-ch n] stop-ch])
          t2 (System/nanoTime)]
      (println-async
       ;; System/nanoTime is in nanoseconds; 1e6 ns = 1 msec.
       "(Time production-thread was blocked: " (/ (- t2 t1) 1e6)
       "msecs)" \newline
       (if (= out-ch ch)
         "Produced and dispatched from out-ch:"
         "Produced, but failed to put onto out-ch:") n)
      (case v
        :stop (close! out-ch)
        ;; Advance to the next value only if n was actually delivered;
        ;; otherwise retry the same n on the next iteration.
        (recur (if (= out-ch ch)
                 ns n-seq))))))

(defn run-example
  "Runs the interactive demonstration: starts a producer on out-ch,
  starts a consumer that takes one value roughly every 2 seconds, and
  reads forms from *in* until :stop is entered. :stop is then handed
  to the producer via stop-ch, which makes it close out-ch and thereby
  ends the consumer."
  []
  (let [out-ch (chan)
        stop-ch (chan)]
    (produce-values! out-ch stop-ch)
    ;; Consumer: loops until out-ch is closed (<! returns nil).
    (go-loop []
      (let [val (<! out-ch)]
        (if val
          (do (println-async "Taken from out-ch:" val)
              ;; Slow consumer: keeps the producer's next put pending.
              (<! (timeout 2000))
              (recur))
          (println-async "Done. Reason: out-ch has been closed."))))
    (loop [u-inp nil] ;; UI
      (if-not (= :stop u-inp)
        (do (println-async "Please enter :stop or another form.")
            (recur (read)))
        (do (println-async ":stop dispatched")
            ;; Blocks this thread until the producer takes :stop.
            (>!! stop-ch u-inp)
            (println-async "Thank you."))))))

;; Example-session:

;; user> (run-example)
;; Please enter :stop or another form.
;; Taken from out-ch:  0
;; (Time production-thread was blocked: 0.221949msecs)
;; Produced and dispatched from out-ch: 0
;; Taken from out-ch:  1
;; (Time production-thread was blocked: 2000.677353msecs)

;; ;; This is where stopping can never happen and should never happen
;; ;; because we don't cut cables.

;; Produced and dispatched from out-ch: 1
;; Taken from out-ch:  2

;; (Time production-thread was blocked: 2000.488223msecs)
;; Produced and dispatched from out-ch: 2
;; => :stop
;; :stop dispatched
;; Thank you.

;; (Time production-thread was blocked: 508.66426msecs)

;; The timing above clearly demonstrates that the pending put did not
;; keep the producer blocked: in all other cases it waited the full
;; 2 seconds until it could put onto out-ch again.

;; Produced, but failed to put onto out-ch: 3

;; Done. Reason: out-ch has been closed.
                                      ;; This is printed with a delay
                                      ;; because the (<! (timeout
                                      ;; 2000)) is still ticking
                                      ;; after the last value was
                                      ;; taken.

;; nil

;; Did you know? CLOSE
;;
;;   • There are keyboard shortcuts!
;;     • When Creating A Paste
;;       • ALT+P Toggle Private
;;       • CTRL+Enter Create Paste
;;       • ALT+W Toggle word wrap
;;     • When Viewing A Paste
;;       • ALT+G Go to a line
;;       • ALT+CTRL+E Edit the paste
;;       • ALT+R Show the raw code
;;   • There are URL options!
;;     • When Creating A Paste
;;       • ?lang=Javascript to default to javascript
;;     • When Viewing A Paste
;;       • #L-N Jump to line number N