我们的目的一个样本shell脚本(一定要使其可执行文件),将其放置在便于测试您的Clojure项目的根:
$ cat dumb.sh
#!/bin/bash
for i in 1 2 3 4 5
do
echo "Loop iteration $i"
sleep 2
done
现在,我们将定义流程来执行,开始它,并获得stdout((.getInputStream process)
),一次只读一行,并循环,直到我们完成。实时阅读。
(defn run-proc
[proc-name arg-string callback]
(let [pbuilder (ProcessBuilder. (into-array String [proc-name arg-string]))
process (.start pbuilder)]
(with-open [reader (clojure.java.io/reader (.getInputStream process))]
(loop []
(when-let [line (.readLine ^java.io.BufferedReader reader)]
(callback line)
(recur))))))
测试:
(run-proc "./dumb.sh" "" println)
About to start...
Loop iteration 1
Loop iteration 2
Loop iteration 3
Loop iteration 4
Loop iteration 5
=> nil
这个函数会阻塞,如将来电转接到您callback
;您可以在future
包裹,如果你希望它在一个单独的线程中运行:
(future (callback line))
对于基于core.async的方法:
(defn run-proc-async
[proc-name arg-string callback]
(let [ch (async/chan 1000 (map callback))]
(async/thread
(let [pbuilder (ProcessBuilder. (into-array String [proc-name arg-string]))
process (.start pbuilder)]
(with-open [reader (clojure.java.io/reader (.getInputStream process))]
(loop []
(when-let [line (.readLine ^java.io.BufferedReader reader)]
(async/>!! ch line)
(recur))))))
ch))
这适用于您的callback
功能的传感器到通道,结果放置在函数返回的通道上:
(run-proc-async "./dumb.sh" "" #(let [cnt (count %)]
(println "Counted" cnt "characters")
cnt))
#object[clojure.core.async.impl.channels.ManyToManyChannel ...]
Counted 16 characters
Counted 16 characters
Counted 16 characters
Counted 16 characters
Counted 16 characters
(async/<!! *1)
=> 16
在此示例中,通道上有一个1000的缓冲区。因此,除非您开始从通道中取出,否则在读取1000行后,将会阻止对>!!
的呼叫。你也可以用回调函数put!
,但是这里有一个内置的1024个限制,你应该正在处理结果。
长久时间 - 直到NIO - JVM作为一个整体没有非阻塞IO,所以你不能这样做ync I/O *任何*基于Java的语言。所以最长久的成语将包含一个阻止I/O并将结果推送到core.async频道的线程。 –
...坦率地说,我很难争辩说有一个很好的理由不遵循这个模式。毕竟,一个线程比它所读取的输出的重量级进程更便宜。 –