2017-07-25 62 views
0

我希望从clojure中产生一个长期运行的子进程,并通过标准的流与此进程进行通信。如何在clojure中的子进程中执行非阻塞读取标准输出?

使用conch库,我可以 产卵和阅读的过程中,从out流中读取数据:

(def my-process (sh/proc "my_dumb_process")) 
    ; read 10 lines from my-process's stdout. Will block until 10 lines taken 
    (take 10 (line-seq (clojure.java.io/reader (:out p)))) 

我想要调用异步回调,每当我处理打印 到stdout - 每当数据在标准输出流中可用。

我对clojure有点新 - 有没有一种惯用的clojur-ey方式来做 这个?我已经通过core.async查看,这是很好的,但我找不到一个 流的非阻塞解决方案。

+0

长久时间 - 直到NIO - JVM作为一个整体没有非阻塞IO,所以你不能这样做ync I/O *任何*基于Java的语言。所以最长久的成语将包含一个阻止I/O并将结果推送到core.async频道的线程。 –

+0

...坦率地说,我很难争辩说有一个很好的理由不遵循这个模式。毕竟,一个线程比它所读取的输出的重量级进程更便宜。 –

回答

3

我们的目的一个样本shell脚本(一定要使其可执行文件),将其放置在便于测试您的Clojure项目的根:

$ cat dumb.sh 
#!/bin/bash 

for i in 1 2 3 4 5 
do 
    echo "Loop iteration $i" 
    sleep 2 
done 

现在,我们将定义流程来执行,开始它,并获得stdout((.getInputStream process)),一次只读一行,并循环,直到我们完成。实时阅读。

(defn run-proc 
    [proc-name arg-string callback] 
    (let [pbuilder (ProcessBuilder. (into-array String [proc-name arg-string])) 
     process (.start pbuilder)] 
    (with-open [reader (clojure.java.io/reader (.getInputStream process))] 
     (loop [] 
     (when-let [line (.readLine ^java.io.BufferedReader reader)] 
      (callback line) 
      (recur)))))) 

测试:

(run-proc "./dumb.sh" "" println) 
About to start... 
Loop iteration 1 
Loop iteration 2 
Loop iteration 3 
Loop iteration 4 
Loop iteration 5 
=> nil 

这个函数会阻塞,如将来电转接到您callback;您可以在future包裹,如果你希望它在一个单独的线程中运行:

(future (callback line)) 

对于基于core.async的方法:

(defn run-proc-async 
    [proc-name arg-string callback] 
    (let [ch (async/chan 1000 (map callback))] 
    (async/thread 
     (let [pbuilder (ProcessBuilder. (into-array String [proc-name arg-string])) 
      process (.start pbuilder)] 
     (with-open [reader (clojure.java.io/reader (.getInputStream process))] 
      (loop [] 
      (when-let [line (.readLine ^java.io.BufferedReader reader)] 
       (async/>!! ch line) 
       (recur)))))) 
    ch)) 

这适用于您的callback功能的传感器到通道,结果放置在函数返回的通道上:

(run-proc-async "./dumb.sh" "" #(let [cnt (count %)] 
            (println "Counted" cnt "characters") 
            cnt)) 

#object[clojure.core.async.impl.channels.ManyToManyChannel ...] 
Counted 16 characters 
Counted 16 characters 
Counted 16 characters 
Counted 16 characters 
Counted 16 characters 

(async/<!! *1) 
=> 16 

在此示例中,通道上有一个1000的缓冲区。因此,除非您开始从通道中取出,否则在读取1000行后,将会阻止对>!!的呼叫。你也可以用回调函数put!,但是这里有一个内置的1024个限制,你应该正在处理结果。

+0

一个清楚而彻底的答案。谢谢。 – Dabo

1

如果您不介意使用库,您可以使用lazy-genyieldfrom the Tupelo library找到一个简单的解决方案。它的工作原理就像发生器功能在Python:

(ns tst.demo.core 
    (:use demo.core tupelo.test) 
    (:require 
    [clojure.java.io :as io] 
    [tupelo.core :as t] 
    [me.raynes.conch.low-level :as cll] 
)) 
(t/refer-tupelo) 

(dotest 
    (let [proc   (cll/proc "dumb.sh") 
     >>   (pretty proc) 
     out-lines  (line-seq (io/reader (grab :out proc))) 
     lazy-line-seq (lazy-gen 
         (doseq [line out-lines] 
          (yield line))) ] 
    (doseq [curr-line lazy-line-seq] 
     (spyx curr-line)))) 

像以前一样使用相同的dumb.sh,它产生的输出:

{:out #object[java.lang.UNIXProcess$ProcessPipeInputStream 0x465b16bb "[email protected]"], 
:in #object[java.lang.UNIXProcess$ProcessPipeOutputStream 0xfafbc63 "[email protected]"], 
:err #object[java.lang.UNIXProcess$ProcessPipeInputStream 0x59bb8f80 "[email protected]"], 
:process #object[java.lang.UNIXProcess 0x553c74cc "[email protected]"]} 

; one of these is printed every 2 seconds 
curr-line => "Loop iteration 1" 
curr-line => "Loop iteration 2" 
curr-line => "Loop iteration 3" 
curr-line => "Loop iteration 4" 
curr-line => "Loop iteration 5" 

lazy-gen一切都运行在一个单独的线程使用core.asyncdoseq急切地消耗过程输出并使用yield将其放在输出惰性序列上。第二doseq热切地消耗lazy-gen在当前线程中的结果,并在每行可用时打印每一行。


替代的解决方案:

一个更简单的解决方案是简单地用一个未来的,像这样:

(dotest 
    (let [proc   (cll/proc "dumb.sh") 
     out-lines  (line-seq (io/reader (grab :out proc))) ] 
    (future 
     (doseq [curr-line out-lines] 
     (spyx curr-line))))) 

具有相同的结果:

curr-line => "Loop iteration 1" 
curr-line => "Loop iteration 2" 
curr-line => "Loop iteration 3" 
curr-line => "Loop iteration 4" 
curr-line => "Loop iteration 5" 
相关问题