2016-06-10 61 views
0

我有followinf水槽配置。我正在尝试使用来自spool目录的flume将大小为9GB的文件传输到hdfs。我有以下水槽配置。水槽配置不工作,显示异常

#initialize agent's source, channel and sink 
wagent.sources = wavetronix 
wagent.channels = memoryChannel2 
wagent.sinks = flumeHDFS 

# Setting the source to spool directory where the file exists 
wagent.sources.wavetronix.type = spooldir 
wagent.sources.wavetronix.spoolDir = /johir/WAVETRONIX/output/Yesterday 
wagent.sources.wavetronix.fileHeader = false 
wagent.sources.wavetronix.basenameHeader = true 
#agent.sources.wavetronix.fileSuffix = .COMPLETED 

# Setting the channel to memory 
wagent.channels.memoryChannel2.type = memory 
# Max number of events stored in the memory channel 
wagent.channels.memoryChannel2.capacity = 50000 
agent.channels.memoryChannel2.batchSize = 1000 
wagent.channels.memoryChannel2.transactioncapacity = 1000 

# Setting the sink to HDFS 
wagent.sinks.flumeHDFS.type = hdfs 
#agent.sinks.flumeHDFS.useLocalTimeStamp = true 
wagent.sinks.flumeHDFS.hdfs.path =/user/root/WAVETRONIXFLUME/%Y-%m-%d/ 
wagent.sinks.flumeHDFS.hdfs.useLocalTimeStamp = true 
wagent.sinks.flumeHDFS.hdfs.filePrefix= %{basename} 
wagent.sinks.flumeHDFS.hdfs.fileType = DataStream 

# Write format can be text or writable 
wagent.sinks.flumeHDFS.hdfs.writeFormat = Text 

# use a single csv file at a time 
wagent.sinks.flumeHDFS.hdfs.maxOpenFiles = 1 

wagent.sinks.flumeHDFS.hdfs.rollCount=0 
wagent.sinks.flumeHDFS.hdfs.rollInterval=0 
wagent.sinks.flumeHDFS.hdfs.rollSize = 6400000 
wagent.sinks.flumeHDFS.hdfs.batchSize =1000 

# never rollover based on the number of events 
wagent.sinks.flumeHDFS.hdfs.rollCount = 0 

# rollover file based on max time of 1 min 
#agent.sinks.flumeHDFS.hdfs.rollInterval = 0 
# agent.sinks.flumeHDFS.hdfs.idleTimeout = 600 

# Connect source and sink with channel 
wagent.sources.wavetronix.channels = memoryChannel2 
wagent.sinks.flumeHDFS.channel = memoryChannel2 

但我得到以下例外。

Exception in thread "SinkRunner-PollingRunner-DefaultSinkProcessor" java.lang.OutOfMemoryError: Java heap space at java.util.concurrent.ConcurrentHashMap.putVal(ConcurrentHashMap.java:1043) at java.util.concurrent.ConcurrentHashMap.putIfAbsent(ConcurrentHashMap.java:1535) at java.lang.ClassLoader.getClassLoadingLock(ClassLoader.java:463) at java.lang.ClassLoader.loadClass(ClassLoader.java:404) at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331) at java.lang.ClassLoader.loadClass(ClassLoader.java:357) at org.apache.log4j.spi.LoggingEvent.(LoggingEvent.java:165) at org.apache.log4j.Category.forcedLog(Category.java:391) at org.apache.log4j.Category.log(Category.java:856) at org.slf4j.impl.Log4jLoggerAdapter.warn(Log4jLoggerAdapter.java:479) at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:461) at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68) at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147) at java.lang.Thread.run(Thread.java:745)

任何人都可以帮助我解决这个问题吗?

回答

0

请编辑文件${FLUME_HOME}/conf/flume-env.sh,然后添加以下代码:

export JAVA_OPTS="-Xms1000m -Xmx12000m -Dcom.sun.management.jmxremote"

您可以调整选项 “XMX” 和 “X毫秒”。