2015-09-04

Make Cygnus use WebHDFS to write to a local HDFS

I am trying to get a local Orion + Cygnus setup to persist Orion data in a local HDFS through WebHDFS.

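Cygnus' instructions on GitHub give few hints about WebHDFS, since the configuration described there is mostly about HttpFS. The OrionHDFSSink .md file says that hdfs_port = 50070 is for WebHDFS, which is the port my HDFS uses. So I expected that setting the port this way would make Cygnus use WebHDFS automatically, but it does not seem to work that way for me.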

So, here is my agent_1.conf:

cygnusagent.sources = http-source 
cygnusagent.sinks = hdfs-sink 
cygnusagent.channels = hdfs-channel 

# source configuration 
cygnusagent.sources.http-source.channels = hdfs-channel 
cygnusagent.sources.http-source.type = org.apache.flume.source.http.HTTPSource 
cygnusagent.sources.http-source.port = 5050 
cygnusagent.sources.http-source.handler = com.telefonica.iot.cygnus.handlers.OrionRestHandler 
cygnusagent.sources.http-source.handler.notification_target = /notify 
cygnusagent.sources.http-source.handler.default_service = def_serv 
cygnusagent.sources.http-source.handler.default_service_path = def_servpath 
cygnusagent.sources.http-source.handler.events_ttl = 4 
cygnusagent.sources.http-source.interceptors = ts gi 
cygnusagent.sources.http-source.interceptors.ts.type = timestamp 
cygnusagent.sources.http-source.interceptors.gi.type = com.telefonica.iot.cygnus.interceptors.GroupingInterceptor$Builder 
cygnusagent.sources.http-source.interceptors.gi.grouping_rules_conf_file = /usr/cygnus/conf/grouping_rules.conf 

# OrionHDFSSink configuration 
cygnusagent.sinks.hdfs-sink.channel = hdfs-channel 
cygnusagent.sinks.hdfs-sink.type = com.telefonica.iot.cygnus.sinks.OrionHDFSSink 
cygnusagent.sinks.hdfs-sink.hdfs_host = localHDFS.ip 
cygnusagent.sinks.hdfs-sink.hdfs_port = 50070 
cygnusagent.sinks.hdfs-sink.hdfs_username = HDFSrootUser 
cygnusagent.sinks.hdfs-sink.attr_persistence = column 

# hdfs-channel configuration 
cygnusagent.channels.hdfs-channel.type = memory 
cygnusagent.channels.hdfs-channel.capacity = 1000 
cygnusagent.channels.hdfs-channel.transactionCapacity = 100 

When I update my entity in Orion, to which Cygnus is subscribed, Cygnus logs the following:

02 Sep 2015 20:09:12,353 INFO [[email protected]] (com.telefonica.iot.cygnus.handlers.OrionRestHandler.getEvents:150) - Starting transaction (1441217314-956-0000000000) 
02 Sep 2015 20:09:12,362 INFO [[email protected]] (com.telefonica.iot.cygnus.handlers.OrionRestHandler.getEvents:236) - Received data ({ "subscriptionId" : "55e735c9b89e8535f8ca5ef2", "originator" : "localhost", "contextResponses" : [ {  "contextElement" : {  "type" : "Reading",  "isPattern" : "false",  "id" : "Reading1.1",  "attributes" : [   {   "name" : "Cost",   "type" : "double",   "value" : "32"   },   {   "name" : "Reading_ID",   "type" : "integer",   "value" : "14"   },   {   "name" : "Threshold",   "type" : "double",   "value" : "30"   },   {   "name" : "email",   "type" : "string",   "value" : "[email protected]"   }  ]  },  "statusCode" : {  "code" : "200",  "reasonPhrase" : "OK"  } } ]}) 
02 Sep 2015 20:09:12,366 INFO [[email protected]] (com.telefonica.iot.cygnus.handlers.OrionRestHandler.getEvents:258) - Event put in the channel (id=2020008711, ttl=4) 
02 Sep 2015 20:09:12,432 INFO [SinkRunner-PollingRunner-DefaultSinkProcessor] (com.telefonica.iot.cygnus.sinks.OrionSink.process:128) - Event got from the channel (id=2020008711, headers={fiware-servicepath=def_servpath, destination=reading1.1_reading, content-type=application/json, fiware-service=def_serv, ttl=4, transactionId=1441217314-956-0000000000, timestamp=1441217352368}, bodyLength=812) 
02 Sep 2015 20:09:12,549 INFO [SinkRunner-PollingRunner-DefaultSinkProcessor] (com.telefonica.iot.cygnus.sinks.OrionHDFSSink.persist:356) - [hdfs-sink] Persisting data at OrionHDFSSink. HDFS file (def_serv/def_servpath/reading1.1_reading/reading1.1_reading.txt), Data ({"recvTime":"2015-09-02T18:09:12.368Z","Cost":"32", "Cost_md":[],"Reading_ID":"14", "Reading_ID_md":[],"Threshold":"30", "Threshold_md":[],"email":"[email protected]", "email_md":[]}) 
02 Sep 2015 20:09:12,557 ERROR [SinkRunner-PollingRunner-DefaultSinkProcessor] (com.telefonica.iot.cygnus.sinks.OrionSink.process:143) - Persistence error (The /user/root/def_serv/def_servpath/reading1.1_reading directory could not be created in HDFS. HttpFS response: 503 Service unavailable) 
02 Sep 2015 20:09:12,558 INFO [SinkRunner-PollingRunner-DefaultSinkProcessor] (com.telefonica.iot.cygnus.sinks.OrionSink.process:173) - An event was put again in the channel (id=2020008711, ttl=3) 
02 Sep 2015 20:09:12,558 INFO [SinkRunner-PollingRunner-DefaultSinkProcessor] (com.telefonica.iot.cygnus.sinks.OrionSink.process:193) - Finishing transaction (1441217314-956-0000000000) 
02 Sep 2015 20:09:13,560 INFO [SinkRunner-PollingRunner-DefaultSinkProcessor] (com.telefonica.iot.cygnus.sinks.OrionSink.process:128) - Event got from the channel (id=2020008711, headers={fiware-servicepath=def_servpath, destination=reading1.1_reading, content-type=application/json, fiware-service=def_serv, ttl=3, transactionId=1441217314-956-0000000000, timestamp=1441217352368}, bodyLength=812) 
02 Sep 2015 20:09:13,574 INFO [SinkRunner-PollingRunner-DefaultSinkProcessor] (com.telefonica.iot.cygnus.sinks.OrionHDFSSink.persist:356) - [hdfs-sink] Persisting data at OrionHDFSSink. HDFS file (def_serv/def_servpath/reading1.1_reading/reading1.1_reading.txt), Data ({"recvTime":"2015-09-02T18:09:12.368Z","Cost":"32", "Cost_md":[],"Reading_ID":"14", "Reading_ID_md":[],"Threshold":"30", "Threshold_md":[],"email":"[email protected]", "email_md":[]}) 
02 Sep 2015 20:09:13,574 ERROR [SinkRunner-PollingRunner-DefaultSinkProcessor] (com.telefonica.iot.cygnus.sinks.OrionSink.process:143) - Persistence error (The /user/root/def_serv/def_servpath/reading1.1_reading directory could not be created in HDFS. HttpFS response: 503 Service unavailable) 
02 Sep 2015 20:09:13,575 INFO [SinkRunner-PollingRunner-DefaultSinkProcessor] (com.telefonica.iot.cygnus.sinks.OrionSink.process:173) - An event was put again in the channel (id=2020008711, ttl=2) 
02 Sep 2015 20:09:13,575 INFO [SinkRunner-PollingRunner-DefaultSinkProcessor] (com.telefonica.iot.cygnus.sinks.OrionSink.process:193) - Finishing transaction (1441217314-956-0000000000) 
02 Sep 2015 20:09:15,576 INFO [SinkRunner-PollingRunner-DefaultSinkProcessor] (com.telefonica.iot.cygnus.sinks.OrionSink.process:128) - Event got from the channel (id=2020008711, headers={fiware-servicepath=def_servpath, destination=reading1.1_reading, content-type=application/json, fiware-service=def_serv, ttl=2, transactionId=1441217314-956-0000000000, timestamp=1441217352368}, bodyLength=812) 
02 Sep 2015 20:09:15,590 INFO [SinkRunner-PollingRunner-DefaultSinkProcessor] (com.telefonica.iot.cygnus.sinks.OrionHDFSSink.persist:356) - [hdfs-sink] Persisting data at OrionHDFSSink. HDFS file (def_serv/def_servpath/reading1.1_reading/reading1.1_reading.txt), Data ({"recvTime":"2015-09-02T18:09:12.368Z","Cost":"32", "Cost_md":[],"Reading_ID":"14", "Reading_ID_md":[],"Threshold":"30", "Threshold_md":[],"email":"[email protected]", "email_md":[]}) 
02 Sep 2015 20:09:15,599 ERROR [SinkRunner-PollingRunner-DefaultSinkProcessor] (com.telefonica.iot.cygnus.sinks.OrionSink.process:143) - Persistence error (The /user/root/def_serv/def_servpath/reading1.1_reading directory could not be created in HDFS. HttpFS response: 503 Service unavailable) 
02 Sep 2015 20:09:15,600 INFO [SinkRunner-PollingRunner-DefaultSinkProcessor] (com.telefonica.iot.cygnus.sinks.OrionSink.process:173) - An event was put again in the channel (id=2020008711, ttl=1) 
02 Sep 2015 20:09:15,600 INFO [SinkRunner-PollingRunner-DefaultSinkProcessor] (com.telefonica.iot.cygnus.sinks.OrionSink.process:193) - Finishing transaction (1441217314-956-0000000000) 
02 Sep 2015 20:09:18,601 INFO [SinkRunner-PollingRunner-DefaultSinkProcessor] (com.telefonica.iot.cygnus.sinks.OrionSink.process:128) - Event got from the channel (id=2020008711, headers={fiware-servicepath=def_servpath, destination=reading1.1_reading, content-type=application/json, fiware-service=def_serv, ttl=1, transactionId=1441217314-956-0000000000, timestamp=1441217352368}, bodyLength=812) 
02 Sep 2015 20:09:18,615 INFO [SinkRunner-PollingRunner-DefaultSinkProcessor] (com.telefonica.iot.cygnus.sinks.OrionHDFSSink.persist:356) - [hdfs-sink] Persisting data at OrionHDFSSink. HDFS file (def_serv/def_servpath/reading1.1_reading/reading1.1_reading.txt), Data ({"recvTime":"2015-09-02T18:09:12.368Z","Cost":"32", "Cost_md":[],"Reading_ID":"14", "Reading_ID_md":[],"Threshold":"30", "Threshold_md":[],"email":"[email protected]", "email_md":[]}) 
02 Sep 2015 20:09:18,618 ERROR [SinkRunner-PollingRunner-DefaultSinkProcessor] (com.telefonica.iot.cygnus.sinks.OrionSink.process:143) - Persistence error (The /user/root/def_serv/def_servpath/reading1.1_reading directory could not be created in HDFS. HttpFS response: 503 Service unavailable) 
02 Sep 2015 20:09:18,621 INFO [SinkRunner-PollingRunner-DefaultSinkProcessor] (com.telefonica.iot.cygnus.sinks.OrionSink.process:173) - An event was put again in the channel (id=2020008711, ttl=0) 
02 Sep 2015 20:09:18,621 INFO [SinkRunner-PollingRunner-DefaultSinkProcessor] (com.telefonica.iot.cygnus.sinks.OrionSink.process:193) - Finishing transaction (1441217314-956-0000000000) 
02 Sep 2015 20:09:22,622 INFO [SinkRunner-PollingRunner-DefaultSinkProcessor] (com.telefonica.iot.cygnus.sinks.OrionSink.process:128) - Event got from the channel (id=2020008711, headers={fiware-servicepath=def_servpath, destination=reading1.1_reading, content-type=application/json, fiware-service=def_serv, ttl=0, transactionId=1441217314-956-0000000000, timestamp=1441217352368}, bodyLength=812) 
02 Sep 2015 20:09:22,635 INFO [SinkRunner-PollingRunner-DefaultSinkProcessor] (com.telefonica.iot.cygnus.sinks.OrionHDFSSink.persist:356) - [hdfs-sink] Persisting data at OrionHDFSSink. HDFS file (def_serv/def_servpath/reading1.1_reading/reading1.1_reading.txt), Data ({"recvTime":"2015-09-02T18:09:12.368Z","Cost":"32", "Cost_md":[],"Reading_ID":"14", "Reading_ID_md":[],"Threshold":"30", "Threshold_md":[],"email":"[email protected]", "email_md":[]}) 
02 Sep 2015 20:09:22,635 ERROR [SinkRunner-PollingRunner-DefaultSinkProcessor] (com.telefonica.iot.cygnus.sinks.OrionSink.process:143) - Persistence error (The /user/root/def_serv/def_servpath/reading1.1_reading directory could not be created in HDFS. HttpFS response: 503 Service unavailable) 
02 Sep 2015 20:09:22,635 WARN [SinkRunner-PollingRunner-DefaultSinkProcessor] (com.telefonica.iot.cygnus.sinks.OrionSink.process:163) - The event TTL has expired, it is no more re-injected in the channel (id=2020008711, ttl=0) 
02 Sep 2015 20:09:22,635 INFO [SinkRunner-PollingRunner-DefaultSinkProcessor] (com.telefonica.iot.cygnus.sinks.OrionSink.process:193) - Finishing transaction (1441217314-956-0000000000) 

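So, as you can see, it appears to be trying to use HttpFS, since it logs the response:

HttpFS response: 503 Service unavailable

...on every write attempt.

How can I configure the agent to use WebHDFS?

Thanks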

Answers

1

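I don't know what happened, but the configuration shown above is correct and is now working.

It started working after many attempts at restarting the instances and rewriting the configuration file, and after seeing log errors other than the one mentioned. At some point Cygnus was trying to write to localhost:50075 instead of {localHDFS.ip}:50070, but this went away after restarting Cygnus.

All instances are on the latest version (important).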
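
For anyone hitting the same 503 or a similar connection error, it may help to first confirm that both ports are reachable from the machine running Cygnus. The following is only a minimal sketch of such a check, not something Cygnus provides: `port_open` is a hypothetical helper name, and the host you pass in is whatever address your own namenode and datanode use.

```python
import socket


def port_open(host: str, port: int, timeout: float = 2.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within timeout.

    Hypothetical helper for illustration; it only tests TCP reachability
    and says nothing about whether the HDFS service behind it is healthy.
    """
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts and name resolution failures.
        return False
```

Calling `port_open("localHDFS.ip", 50070)` and `port_open("localHDFS.ip", 50075)` from the Cygnus host would show whether both ports accept connections; a `False` for either one suggests a network or service problem rather than a Cygnus configuration problem.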

1

Configuring Cygnus for WebHDFS is just a matter of setting the port to 50070; nothing else is needed.
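
To make that concrete, here is a rough sketch that reads the sink's `hdfs_port` from an agent configuration and reports which backend that port implies. The port mapping (50070 for WebHDFS, 14000 for HttpFS) comes from this thread and assumes default ports; `parse_properties`, `hdfs_backend` and `BACKEND_BY_PORT` are illustrative names, not part of Cygnus.

```python
def parse_properties(text: str) -> dict:
    """Parse simple 'key = value' lines, skipping blank lines and # comments."""
    props = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        key, sep, value = line.partition("=")
        if sep:
            props[key.strip()] = value.strip()
    return props


# Assumed default ports, as discussed in this thread.
BACKEND_BY_PORT = {"50070": "WebHDFS", "14000": "HttpFS"}


def hdfs_backend(props: dict, agent: str = "cygnusagent", sink: str = "hdfs-sink") -> str:
    """Return the backend implied by the sink's hdfs_port, or 'unknown'."""
    port = props.get(f"{agent}.sinks.{sink}.hdfs_port")
    return BACKEND_BY_PORT.get(port, "unknown")


conf = """
# OrionHDFSSink configuration (excerpt from the question)
cygnusagent.sinks.hdfs-sink.hdfs_host = localHDFS.ip
cygnusagent.sinks.hdfs-sink.hdfs_port = 50070
"""

print(hdfs_backend(parse_properties(conf)))  # prints "WebHDFS"
```

Since the port is the only thing that differs, a cluster running these services on non-default ports would need the mapping adjusted.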

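Regarding the connections to port 50075 that you mention, they are correct as well, since this is how WebHDFS behaves: when you want to upload data to HDFS, the client (Cygnus in this case) first contacts the namenode on port TCP/50070. The namenode then responds with a redirect location pointing to the datanode where the data will actually be uploaded. This redirection uses port TCP/50075, so datanode:50075 must be reachable by the client (Cygnus). That is why we use HttpFS in the Cosmos global instance at FIWARE Lab: HttpFS acts as a gateway that hides the datanode details and requires only a single entry point and port (14000).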
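
The two-step flow can be sketched as follows. This only builds the request URLs and inspects a redirect location; it does not talk to a cluster. The `/webhdfs/v1/<path>?op=...` URL layout and the `user.name` parameter follow the WebHDFS REST API, while the host names, the file path and the sample `Location` value are made up for illustration, and `webhdfs_url` and `redirect_endpoint` are hypothetical helper names.

```python
from urllib.parse import urlencode, urlsplit


def webhdfs_url(host: str, port: int, path: str, op: str, user: str) -> str:
    """Build a WebHDFS REST URL for the given operation and HDFS path."""
    query = urlencode({"op": op, "user.name": user})
    return f"http://{host}:{port}/webhdfs/v1/{path.lstrip('/')}?{query}"


def redirect_endpoint(location: str):
    """Extract (host, port) from the Location header of a namenode redirect."""
    parts = urlsplit(location)
    return parts.hostname, parts.port


# Step 1: the client sends a PUT with op=CREATE to the namenode (port 50070).
# No file data is sent yet; the namenode is expected to answer with a
# 307 redirect whose Location header points at a datanode.
step1 = webhdfs_url("namenode.example", 50070, "/user/root/dir/file.txt", "CREATE", "root")
print(step1)

# Step 2: the client repeats the PUT, now with the data, against the
# redirect location. This Location value is a made-up example.
location = "http://datanode.example:50075/webhdfs/v1/user/root/dir/file.txt?op=CREATE&user.name=root"
host, port = redirect_endpoint(location)
print(host, port)
```

If the host in the redirect location is something the client cannot resolve or reach (for example `localhost` when Cygnus runs on a different machine), the second step fails even though the namenode on 50070 answered correctly. With HttpFS, both steps go to the same gateway host and port, so that situation does not arise.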