2016-12-28 38 views
1

我想使用Flume在Python脚本中收集日志,因此我按照用户指南配置了使用netcat源的Flume,然后使用telnet和nc进行测试,它运作良好。在使用python套接字或telnet工作时无法正确获取事件

我的配置代码:

a1.sources = r1 
a1.sinks = k1 
a1.channels = c1 
Describe/configure the source 
a1.sources.r1.type = netcat 
a1.sources.r1.bind = localhost 
a1.sources.r1.port = 44444 
a1.sinks.k1.type = logger 
a1.channels.c1.type = memory 
a1.channels.c1.capacity = 1000 
a1.channels.c1.transactionCapacity = 100 
a1.sources.r1.channels = c1 
a1.sinks.k1.channel = c1 

然后我使用Python连接水槽,并发送一些话这样说:

import socket 
def netcat(hostname, port): 
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 
    s.connect((hostname, port)) 
    s.send("test words 1\n") 
    s.send("test words 2\n") 
    s.send("test words 3\n") 
    s.send("test words 4\n") 
    s.shutdown(socket.SHUT_WR) 
    s.close() 

if _name_ == "_main_": 
    netcat("127.0.0.1",44444) 

问题发生,水槽只能接收2行。 水槽日志:

2016-12-28 16:44:32,248 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.source.NetcatSource.start(NetcatSource.java:169)] Created serverSocket:sun.nio.ch.ServerSocketChannelImpl[/127.0.0.1:44444] 2016-12-28 16:44:41,814 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 74 65 73 74 20 77 6F 72 64 73 20 31 test words 1 } 2016-12-28 16:44:41,815 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 74 65 73 74 20 77 6F 72 64 73 20 32 test words 2 }

我有两个Ubuntu的& Java1.8和CentOS &的Java 1.7,并在Python的telnet模型相同的结果相同的结果。

配置或Python脚本有什么问题吗?或者任何人有这种情况下的建议?

回答

2

发生这种情况的原因是因为您并未等待响应回来。默认情况下,Flume的netcat源将为每个事件发回“OK”消息。在可以发送响应之前,您正在终止连接,这会导致进一步的消息处理失败(因为管道已从客户端断开)。

要解决这个问题,你需要进行以下更改您的flume.conf:

a1.sources.r1.ack-every-event=false 

这样就要求一个“OK”发送,并因此停止失败。

或者,您可以更改您的Python以等待每次在关闭连接前发送“确定”消息。人为地,在中增加一个睡眠声明,应该也解决了这个问题,虽然你会假设处理你的消息需要多长时间。通常很好,但可能会有其他情况导致处理延迟。