I'm trying to figure out how to batch incoming requests, perform an action with the values in those requests, and then return the result of that action to each request. A slightly simplified version of my problem looks like the following:
Incoming requests make calls to:

```clojure
(defn process
  [values]
  ;; put values on the queue and wait for result, then return the result
  ...)
```

Periodically, another function is called:

```clojure
(defn batch-process
  []
  ;; take up to 10 of the values from the queue, sum those values,
  ;; then return the result to their process requests
  ...)
```
I think I am lacking the vocabulary to figure out how I should be doing this. Any advice or pointers would be appreciated!
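For comparison with the core.async answers below, here is one way the two skeletons above could be filled in with plain JVM primitives instead. This is a sketch, not something from the thread: it assumes each request contributes a single number, keeps a hypothetical global `queue` (a `java.util.concurrent.LinkedBlockingQueue`) of `[value result-promise]` pairs, and has `batch-process` take up to 10 entries with `drainTo` and `deliver` the sum to each request's `promise`.

```clojure
(import '(java.util ArrayList)
        '(java.util.concurrent LinkedBlockingQueue))

;; Hypothetical shared queue of [value result-promise] pairs.
(def ^LinkedBlockingQueue queue (LinkedBlockingQueue.))

(defn process
  "Puts value on the queue and blocks until batch-process delivers the result.
  Assumes each request contributes a single number."
  [value]
  (let [result (promise)]
    (.put queue [value result])
    @result))

(defn batch-process
  "Takes up to 10 queued requests, sums their values, and delivers that sum
  to each request in the batch. Returns how many requests were handled."
  []
  (let [batch (ArrayList.)
        _     (.drainTo queue batch 10)   ; moves at most 10 entries, never blocks
        total (reduce + 0 (map first batch))]
    (doseq [[_ result] batch]
      (deliver result total))
    (count batch)))
```

Because `process` blocks until a later `batch-process` call picks up its value, requests are expected to come from other threads (for example `(future (process 5))`), with `batch-process` called on a schedule.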
Answer:
I think I figured it out. The key was sending each request's out-channel to `batch-process` together with its value:
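```clojure
(require '[clojure.core.async :as async :refer [chan go <! >! >!! <!!]])

(defn batch-process
  []
  (let [trigger (chan)
        in-chan (chan 100)]
    (go (loop []
          (let [trigger-val (<! trigger)]
            (if trigger-val
              ;; Take however many requests are buffered, capped at 10.
              ;; Note: .buf reaches into core.async's internal channel
              ;; implementation; it is not a public API.
              (let [temp-chan (async/take (min 10 (.count (.buf in-chan))) in-chan)
                    chan-vals (<! (async/into [] temp-chan))
                    sum-vals (reduce (fn [cur-sum [num _]] (+ cur-sum num))
                                     0
                                     chan-vals)]
                ;; Reply to every request in the batch with [its value, batch sum].
                (doseq [[num out-chan] chan-vals]
                  (>! out-chan [num sum-vals]))
                (recur))))))
    [trigger in-chan]))

(defn process
  [value in-chan]
  (let [out-chan (chan)]
    (>!! in-chan [value out-chan])
    (<!! out-chan)))
```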
After calling `batch-process`, keep track of the returned `trigger` and `in-chan`, and pass `in-chan` to `process`. Putting a truthy value such as `true` on `trigger` runs one batch; putting `false` or closing `trigger` ends the loop.
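The `.count`/`.buf` call above depends on core.async internals. A sketch of the same trigger-driven design using only the public `a/poll!` (which returns a value only if one is immediately available, otherwise `nil`) could look like this. `start-batcher`, `submit`, `start-ticker`, and `max-batch` are hypothetical names. Reply channels get a buffer of 1 so the batch loop never waits on a slow requester, and `start-ticker` shows one way to fire the trigger periodically, as the question describes.

```clojure
(require '[clojure.core.async :as a])

(def max-batch 10) ; batch size taken from the question

(defn start-batcher
  "Hypothetical replacement for batch-process. Returns [trigger in-chan].
  Each truthy value put on trigger runs one batch; closing trigger stops it."
  []
  (let [trigger (a/chan)
        in-chan (a/chan 100)]
    (a/go-loop []
      (when (a/<! trigger)
        ;; Drain at most max-batch requests without parking: poll! returns
        ;; nil as soon as nothing is immediately available.
        (let [batch (loop [acc []]
                      (if-some [req (when (< (count acc) max-batch)
                                      (a/poll! in-chan))]
                        (recur (conj acc req))
                        acc))
              total (reduce + 0 (map first batch))]
          ;; Answer every request in the batch with [its value, batch sum].
          (doseq [[value reply] batch]
            (a/>! reply [value total]))
          (recur))))
    [trigger in-chan]))

(defn submit
  "Hypothetical helper: enqueues [value reply] and returns reply immediately."
  [in-chan value]
  (let [reply (a/chan 1)]   ; buffer of 1: the batch loop never waits on us
    (a/>!! in-chan [value reply])
    reply))

(defn process
  "Blocking call, like the answer's process: returns [value batch-sum]."
  [in-chan value]
  (a/<!! (submit in-chan value)))

(defn start-ticker
  "Hypothetical: puts true on trigger every period-ms until a put fails."
  [trigger period-ms]
  (a/go-loop []
    (a/<! (a/timeout period-ms))
    (when (a/>! trigger true)   ; >! returns false once trigger is closed
      (recur))))

;; Usage: queue 12 requests, then run two batches by hand.
(let [[trigger in-chan] (start-batcher)
      replies (mapv #(submit in-chan %) (range 12))]
  (a/>!! trigger true)          ; batch of 0..9, sum 45
  (a/>!! trigger true)          ; batch of 10 and 11, sum 21
  (println (mapv a/<!! replies))
  (a/close! trigger))
```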
Answer:
I would propose a different approach: accumulate data and flush it once the desired count is reached, with one more channel to force a flush:
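```clojure
(require '[clojure.core.async :as a])

(defn batch-consume [n in]
  (let [flush-chan (a/chan)
        out-chan (a/chan)]
    (a/go-loop [data []]
      (a/alt!
        in ([v] (if (nil? v)
                  ;; `in` was closed: emit whatever is pending, then close out-chan
                  (do (when (seq data) (a/>! out-chan data))
                      (a/close! out-chan))
                  (let [data (conj data v)]
                    (if (= n (count data))
                      (do (a/>! out-chan data)   ; batch is full: emit it
                          (recur []))
                      (recur data)))))
        flush-chan (do (a/>! out-chan data)      ; forced flush, even if partial
                       (recur []))))
    {:out out-chan
     :flush flush-chan}))
```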
It could be used like this:
```clojure
(let [ch (a/chan)
      {:keys [out flush]} (batch-consume 3 ch)]
  ;; consumer: print each batch together with its sum
  (a/go-loop []
    (when-some [data (a/<! out)]
      (println data (apply + data))
      (recur)))
  ;; producer
  (a/go (dotimes [i 10]              ;; automatic flush every 3 items
          (a/>! ch i))
        (a/>! flush :flush)          ;; force flush of the pending 10th item
        (dotimes [i 3]               ;; force a flush after every 2 items
          (dotimes [j 2]
            (a/>! ch (+ (* 10 i) j)))
          (a/>! flush :flush))))
```
Output:

```
;; [0 1 2] 3
;; [3 4 5] 12
;; [6 7 8] 21
;; [9] 9
;; [0 1] 1
;; [10 11] 21
;; [20 21] 41
```
Notice that if you pass a non-positive `n` to `batch-consume`, the batch size can never equal `n`, so batches are emitted only on a forced flush (which can also be useful in some cases):
```clojure
(let [ch (a/chan)
      {:keys [out flush]} (batch-consume -1 ch)]
  (a/go-loop []
    (when-some [data (a/<! out)]
      (println data (apply + data))
      (recur)))
  (a/go (dotimes [i 10]
          (a/>! ch i))
        (a/>! flush :flush)
        (dotimes [i 3]
          (dotimes [j 2]
            (a/>! ch (+ (* 10 i) j)))
          (a/>! flush :flush))))
```
Output:

```
;; [0 1 2 3 4 5 6 7 8 9] 45
;; [0 1] 1
;; [10 11] 21
;; [20 21] 41
```
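The question also wants batches handled periodically and each result returned to its request. One possible extension of this answer's idea (a sketch, not something the answer itself provides) replaces the manual flush channel with a core.async `timeout`, so a partial batch is emitted at most `max-wait-ms` after its first item arrives, and has the consumer answer a reply channel carried by every request, as in the first answer. `batch-consume-timed`, `start-summer`, and `max-wait-ms` are hypothetical names.

```clojure
(require '[clojure.core.async :as a])

(defn batch-consume-timed
  "Hypothetical variant of batch-consume: emits a batch once it holds n items
  or once max-wait-ms has passed since its first item arrived. Closes the
  returned channel after `in` closes and any leftover items are emitted."
  [n max-wait-ms in]
  (let [out (a/chan)]
    (a/go-loop [data [] timer nil]
      ;; With an empty batch there is no timer, so only `in` is watched.
      (let [[v port] (a/alts! (if timer [in timer] [in]))]
        (cond
          (= port timer)                ; deadline reached: flush partial batch
          (do (a/>! out data)
              (recur [] nil))

          (nil? v)                      ; `in` closed: flush leftovers and stop
          (do (when (seq data) (a/>! out data))
              (a/close! out))

          :else
          (let [data (conj data v)]
            (if (= n (count data))
              (do (a/>! out data)       ; batch full: flush immediately
                  (recur [] nil))
              ;; start the timer on the first item of a new batch
              (recur data (or timer (a/timeout max-wait-ms))))))))
    out))

(defn start-summer
  "Hypothetical consumer: answers each [value reply] request with its batch sum."
  [batches]
  (a/go-loop []
    (when-some [batch (a/<! batches)]
      (let [total (reduce + 0 (map first batch))]
        (doseq [[_ reply] batch]
          (a/>! reply total)))
      (recur))))

;; Usage: batches of 2, or whatever has arrived after 100 ms.
(let [requests (a/chan 100)]
  (start-summer (batch-consume-timed 2 100 requests))
  (let [replies (mapv (fn [v]
                        (let [reply (a/chan 1)]
                          (a/>!! requests [v reply])
                          reply))
                      [1 2 3])]
    ;; [1 2] is flushed when full; [3] is flushed by the timeout
    (println (mapv a/<!! replies)))
  (a/close! requests))
```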