How to batch process values on a channel

Time:04-07

I'm trying to figure out how to batch incoming requests, perform an action on the values in those requests, and then return the result of that action to each request. A slightly simplified version of my problem looks like this:

Incoming requests make calls to

(defn process 
  [values] 
  ;; put values on the queue and wait for result, then return the result
  ...)

Periodically, another function is called

(defn batch-process
  []
  ;; take up to 10 of the values from the queue, sum those values, 
  ;; then return the result to their process requests
  ...)

I think I am lacking the vocabulary to figure out how I should be doing this. Any advice or pointers would be appreciated!

User response:

I think I figured it out. The key was to send each request's out-channel through in-chan along with its value, so that batch-process knows where to send each result:

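(require '[clojure.core.async :as a :refer [chan go <! >! >!! <!!]])

(defn batch-process
  []
  (let [trigger (chan)
        in-chan (chan 100)]
    (go (loop []
          ;; wait for a trigger; a closed trigger returns nil and ends the loop
          (let [trigger-val (<! trigger)]
            (when trigger-val
              ;; take up to 10 of the [value out-chan] pairs currently buffered
              ;; (.buf reads an internal field of the channel implementation)
              (let [temp-chan (a/take (min 10 (.count (.buf in-chan))) in-chan)
                    chan-vals (<! (a/into [] temp-chan))
                    sum-vals (reduce (fn [cur-sum [num _]] (+ cur-sum num))
                                     0
                                     chan-vals)]
                ;; reply to each request on its own out-chan
                (doseq [[num out-chan] chan-vals]
                  (>! out-chan [num sum-vals]))
                (recur))))))
    [trigger in-chan]))

(defn process
  [value in-chan]
  (let [out-chan (chan)]
    ;; enqueue the value together with a private reply channel
    (>!! in-chan [value out-chan])
    ;; block until batch-process replies with [value batch-sum]
    (<!! out-chan)))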

After calling batch-process, keep the returned trigger and in-chan, and pass in-chan to process. Putting a truthy value such as true on trigger runs one batch; closing trigger stops the loop.
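The wiring described above can be sketched end to end as follows. This is a minimal sketch under a few assumptions, not the answerer's exact code: it restates batch-process so it runs on its own, replaces the `(.count (.buf in-chan))` lookup (which reads an internal field of the channel implementation) with a hypothetical `drain-up-to` helper built on `a/poll!` (assuming a core.async release that provides `poll!`), and uses a hypothetical `demo` function that enqueues `[value reply-channel]` pairs directly, as `process` does internally, so that no caller blocks before the trigger fires.

```clojure
(require '[clojure.core.async :as a])

(defn drain-up-to
  "Hypothetical helper: returns up to n values that are already available
  on ch, using poll! so it never parks or blocks."
  [n ch]
  (loop [acc []]
    (if (< (count acc) n)
      (if-some [v (a/poll! ch)]
        (recur (conj acc v))
        acc)
      acc)))

(defn batch-process []
  (let [trigger (a/chan)
        in-chan (a/chan 100)]
    (a/go-loop []
      ;; a closed trigger returns nil, which stops the loop
      (when (a/<! trigger)
        (let [batch (drain-up-to 10 in-chan)
              total (reduce + (map first batch))]
          ;; answer every request in the batch on its own reply channel
          (doseq [[v out] batch]
            (a/>! out [v total]))
          (recur))))
    [trigger in-chan]))

(defn demo
  "Hypothetical driver: enqueues three requests the way process does,
  but without blocking, then triggers one batch and collects the replies."
  []
  (let [[trigger in-chan] (batch-process)
        outs (vec (repeatedly 3 a/chan))]
    (doseq [[v out] (map vector [1 2 3] outs)]
      (a/>!! in-chan [v out]))
    (a/>!! trigger true)                ; run one batch
    (let [replies (mapv a/<!! outs)]
      (a/close! trigger)                ; stop the batching loop
      replies)))

(demo) ;; => [[1 6] [2 6] [3 6]]
```

Because each request carries its own reply channel, the batch loop never needs to know who the callers are; it only sums the values it drained and sends the total back on every reply channel in that batch.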

User response:

I would propose a different approach: simply accumulate the data and flush it once the desired count is reached, with one more channel to force a flush:

(require '[clojure.core.async :as a])

(defn batch-consume [n in]
  (let [flush-chan (a/chan)
        out-chan (a/chan)]
    (a/go-loop [data []]
      (a/alt! in ([v] (let [data (conj data v)]
                        (if (= n (count data))
                          (do (a/>! out-chan data)
                              (recur []))
                          (recur data))))
              flush-chan (do (a/>! out-chan data)
                             (recur []))))
    {:out out-chan
     :flush flush-chan}))

It could then be used like this:

(let [ch (a/chan)
      {:keys [out flush]} (batch-consume 3 ch)]
  (a/go-loop []
    (let [data (a/<! out)]
      ;; processing batch
      (println data (apply + data)))
    (recur))
  (a/go (dotimes [i 10] ;; automatic flush demo
          (a/>! ch i))
        (a/>! flush :flush) ;; flushing the pending 10th item    
        (dotimes [i 3]      ;; force a flush after every 2 items
          (dotimes [j 2]
            (a/>! ch (+ (* 10 i) j)))
          (a/>! flush :flush))))

output:

 ;; [0 1 2] 3
 ;; [3 4 5] 12
 ;; [6 7 8] 21
 ;; [9] 9
 ;; [0 1] 1
 ;; [10 11] 21
 ;; [20 21] 41
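For the size-triggered part alone, a core.async channel can also take a transducer, so clojure.core's `partition-all` can do the grouping. This is a swapped-in alternative, not what the answer uses, and the name `batches-of` is hypothetical: the leftover partial batch is only emitted when the channel is closed, so it has no equivalent of flush-chan for flushing in the middle of a stream.

```clojure
(require '[clojure.core.async :as a])

(defn batches-of
  "Hypothetical helper: groups the values of coll into vectors of n using a
  channel with a partition-all transducer, and returns all batches."
  [n coll]
  (let [ch (a/chan 1 (partition-all n))]
    ;; producer: put every value, then close, which flushes the last partial batch
    (a/go (doseq [v coll]
            (a/>! ch v))
          (a/close! ch))
    ;; collect every emitted batch until ch closes
    (a/<!! (a/into [] ch))))

(batches-of 3 (range 10)) ;; => [[0 1 2] [3 4 5] [6 7 8] [9]]
```

Closing the channel here plays the role of the single forced flush that produced `[9]` in the output above.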

Notice that if you pass a non-positive n to batch-consume, batches are emitted only on a forced flush (which can also be useful in some cases):

(let [ch (a/chan)
      {:keys [out flush]} (batch-consume -1 ch)]
  (a/go-loop []
    (let [data (a/<! out)]
      (println data (apply + data)))
    (recur))
  (a/go (dotimes [i 10]
          (a/>! ch i))
        (a/>! flush :flush)        
        (dotimes [i 3]
          (dotimes [j 2]
            (a/>! ch (+ (* 10 i) j)))
          (a/>! flush :flush))))

;; [0 1 2 3 4 5 6 7 8 9] 45
;; [0 1] 1
;; [10 11] 21
;; [20 21] 41
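The question asks for batches to be processed periodically. Instead of driving a flush channel from a separate timer, a size-or-timeout batcher can wait on the input and a timeout channel together with `a/alts!`. This is a minimal sketch of a swapped-in technique, not part of the answer above: `batch-with-timeout` is a hypothetical name, the timer starts when the first value of a batch arrives, and closing the input flushes whatever is pending and then closes the output.

```clojure
(require '[clojure.core.async :as a])

(defn batch-with-timeout
  "Hypothetical variant: emits a vector on the returned channel when n values
  have arrived or ms milliseconds after the first value of the batch,
  whichever comes first. Closing in flushes pending values and closes out."
  [n ms in]
  (let [out (a/chan)]
    (a/go-loop [data [] deadline nil]
      ;; only listen to a timer while a partial batch is pending
      (let [ports (if deadline [in deadline] [in])
            [v port] (a/alts! ports)]
        (cond
          ;; timer fired: flush the partial batch
          (= port deadline)
          (do (a/>! out data)
              (recur [] nil))

          ;; input closed: flush leftovers and stop
          (nil? v)
          (do (when (seq data)
                (a/>! out data))
              (a/close! out))

          :else
          (let [data (conj data v)]
            (if (>= (count data) n)
              (do (a/>! out data)
                  (recur [] nil))
              ;; start the timer on the first value of a new batch
              (recur data (or deadline (a/timeout ms))))))))
    out))
```

For example, `(batch-with-timeout 10 1000 in)` would emit a batch as soon as 10 values arrive, or about one second after the first pending value, whichever comes first.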