2016-11-04 45 views
3

我想弄清楚如何最好地创建一个异步组件,或以一种组件友好的方式适应异步代码。这是我能想到的最好的,而且......它感觉不太正确。如何改进此Clojure组件+异步示例?

要义:接话,uppercase他们和他们reverse最后print他们。

问题1:我不能让system停止在最后。我期望看到个人c-chan停止的println,但不。

问题2:我该如何正确注射贴剂。进入producer/consumer fns?我的意思是,它们不是组件,我认为它们应该是而不是是组件,因为它们没有明智的生命周期。

问题3:如何处理惯用命名为a>b,并b>casync/pipeline -creating副作用? pipeline应该是一个组件吗?

(ns pipelines.core 
    (:require [clojure.core.async :as async 
      :refer [go >! <! chan pipeline-blocking close!]] 
      [com.stuartsierra.component :as component])) 


;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; 
;; PIPELINES 
(defn a>b [a> b>] 
    (pipeline-blocking 4 
        b> 
        (map clojure.string/upper-case) 
        a>)) 
(defn b>c [b> c>] 
    (pipeline-blocking 4 
        c> 
        (map (comp (partial apply str) 
           reverse)) 
        b>)) 


;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; 
;; PRODUCER/CONSUMER 
(defn producer [a>] 
    (doseq [word ["apple" "banana" "carrot"]] 
    (go (>! a> word)))) 

(defn consumer [c>] 
    (go (while true 
     (println "Your Word Is: " (<! c>))))) 



;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; 
;; SYSTEM 
(defn pipeline-system [config-options] 
    (let [c-chan (reify component/Lifecycle 
       (start [this] 
        (println "starting chan: " this) 
        (chan 1)) 
       (stop [this] 
        (println "stopping chan: " this) 
        (close! this)))] 
    (-> (component/system-map 
     :a> c-chan 
     :b> c-chan 
     :c> c-chan) 
     (component/using {})))) 


;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; 
;; RUN IT! 
(def system (atom nil)) 
(let [_  (reset! system (component/start (pipeline-system {}))) 
     _  (a>b (:a> @system) (:b> @system)) 
     _  (b>c (:b> @system) (:c> @system)) 
     _  (producer (:a> @system)) 
     _  (consumer (:c> @system)) 
     _  (component/stop @system)]) 

编辑

我开始思考以下,但我不能肯定它是否正常关闭...

(extend-protocol component/Lifecycle 
    clojure.core.async.impl.channels.ManyToManyChannel 
    (start [this] 
    this) 
    (stop [this] 
    (close! this))) 

回答

7

我重写你的榜样有点让它可加载

增值的管道

(ns pipeline 
    (:require [clojure.core.async :as ca :refer [>! <!]] 
      [clojure.string :as s])) 

(defn upverse [from to] 
    (ca/pipeline-blocking 4 
         to 
         (map (comp s/upper-case 
            s/reverse)) 
         from)) 
(defn produce [ch xs] 
    (doseq [word xs] 
    (ca/go (>! ch word)))) 

(defn consume [ch] 
    (ca/go-loop [] 
       (when-let [word (<! ch)] 
       (println "your word is:" word) 
       (recur)))) 

(defn start-engine [] 
    (let [[from to] [(ca/chan) (ca/chan)]] 
    (upverse to from) 
    (consume from) 
    {:stop (fn [] 
      (ca/close! to) 
      (ca/close! from) 
      (println "engine is stopped")) 
    :process (partial produce to)})) 

这样你可以做(start-engine),并用它来处理单词序列:

REPL时间

boot.user=> (require '[pipeline]) 

boot.user=> (def engine (pipeline/start-engine)) 
#'boot.user/engine 

它运行

boot.user=> ((engine :process) ["apple" "banana" "carrot"]) 

your word is: TORRAC 
your word is: ANANAB 
your word is: ELPPA 

boot.user=> ((engine :process) ["do" "what" "makes" "sense"]) 

your word is: OD 
your word is: SEKAM 
your word is: ESNES 
your word is: TAHW 

停止它

boot.user=> ((:stop engine)) 
engine is stopped 

;; engine would not process anymore 
boot.user=> ((engine :process) ["apple" "banana" "carrot"]) 
nil 

国家管理

取决于你打算如何使用这条管道,可能不会在所有需要像组件的状态管理框架:没有必要“以防万一”添加任何东西,启动和停止在这种情况下管道是一个调用两个函数的问题。

但是,如果此管道在具有更多状态的较大应用程序中使用,您肯定可以从状态管理库中受益。

not a fan of Component主要是因为它需要一个完整的应用程序买入(这使它成为一个框架),但我不使用它尊重其他人。

安装

我建议要么没有的情况下,使用任何具体的程序是小:你,例如可以构成此管道与其他管道/逻辑和-main踢它关闭,但如果应用程序是任何更大,有更多的国家无关,这里是所有你需要做的,添加mount它:

(defstate engine :start (start-engine) 
       :stop ((:stop engine))) 

开始管道

boot.user=> (mount/start) 
{:started ["#'pipeline/engine"]} 

boot.user=> ((engine :process) ["do" "what" "makes" "sense"]) 

your word is: OD 
your word is: SEKAM 
your word is: ESNES 
your word is: TAHW 

运行停止它

boot.user=> (mount/stop) 
engine is stopped 
{:stopped ["#'pipeline/engine"]} 

这里是一个gist with a full example包括build.boot

你可以下载,并通过boot repl


[编辑]用它玩:回答您已经迷上了组件的评论

在情况下,这应该让你开始:

(defrecord WordEngine [] 
    component/Lifecycle 

    (start [component] 
    (merge component (start-engine))) 

    (stop [component] 
    ((:stop component)) 
    (assoc component :process nil :stop nil))) 

这在开始时会创建一个WordEngine对象,该对象将具有方法:processod

您将无法像调用普通的Clojure函数那样调用它:即从REPL或任何名称空间仅由:require开始,除非您将引用传递给不推荐的整个系统。

所以为了调用它,需要将WordEngine插入到一个Component系统中,然后注入到另一个Component中,然后可以解构:process函数并调用它。

+0

Mount + Async的一个很好的例子,就像你说的那样,对于较小的用例可能是首选。在为* this *作品安排Component之前,我实际上花了很多时间在Component *和* Mount上,也就是因为我喜欢“框架”提供的内容。对我来说,这是一个已经“买进”Component的大型系统,我真的很喜欢为组件运行一个例子,尽管我会因为这是一个很好的Mount示例而听到这个声音! (加上,我喜欢这是'引导',我打算最终切换:) –

+1

我的意思是更小的情况是什么都没用:只是Clojure。 Mount目前在很多大型应用程序中都非常成功地使用。事实上,一周前我刚刚听到一家公司将他们的20K LOC应用程序从Component转换为Mount,并且不能更快乐。我在答案中添加了一个示例组件示例。 – tolitius

+1

我非常感谢你们在提倡和支持'Mount'!与我对斯图尔特塞拉利昂和weavejester(詹姆斯?)对组件的看法,我怀疑他们对这个玩具问题的处理看起来更通用一些,比如'channel'或'pipeline'组件(带有'boundary's?),而不是'WordEngine',以及补救你征收的一些(否则有效的)批评。尽管如此,我们确实有权衡取舍,并且你说服我让“更多”机会,所以非常感谢!我很感激tolitius! –