2017-01-09 623 views
1

我有一个小型测试项目来将数据推送到S3存储桶。但是,它看起来像我没有读取core-site.xml文件,因为我收到错误java.io.IOException: No file system found with scheme s3a。我如何正确读取core-site.xml文件并将数据推送到S3?用Apache Flink将数据推送到S3

这是代码:

public class S3Sink { 
public static void main(String[] args) throws Exception { 
    Map<String, String> configs = ConfigUtils.loadConfigs(“path/to/config.yaml"); 

    final ParameterTool parameterTool = ParameterTool.fromMap(configs); 

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); 
    env.getConfig().disableSysoutLogging(); 
    env.getConfig().setGlobalJobParameters(parameterTool); 

    DataStream<String> messageStream = env 
      .addSource(new FlinkKafkaConsumer09<String>(
        parameterTool.getRequired("kafka.topic"), 
        new SimpleStringSchema(), 
        parameterTool.getProperties())); 

    String id = UUID.randomUUID().toString(); 
    messageStream.writeAsText("s3a://flink-test/" + id + ".txt").setParallelism(1); 

    env.execute(); 
} 

这是弗林克-conf.yaml文件中的配置变化来引用核心site.xml文件:

fs.hdfs.hadoopconf: /path/to/core-site/etc/hadoop/ 

这是我的核心-site.xml:

<configuration> 
<property> 
    <name>fs.defaultFS</name> 
    <value>hdfs://localhost:9000</value> 
</property> 
<property> 
    <name>fs.s3.impl</name> 
    <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value> 
</property> 

<!-- Comma separated list of local directories used to buffer 
    large results prior to transmitting them to S3. --> 
<property> 
    <name>fs.s3a.buffer.dir</name> 
    <value>/tmp</value> 
</property> 

<!-- set your AWS ID using key defined in org.apache.hadoop.fs.s3a.Constants --> 
<property> 
    <name>fs.s3a.awsAccessKeyId</name> 
    <value>*****</value> 
</property> 
<!-- set your AWS access key --> 
<property> 
    <name>fs.s3a.awsSecretAccessKey</name> 
    <value>*****</value> 
</property> 

+0

如果将'fs.hdfs.hadoopconf'设置为连续'core-site.xml'文件夹,那么它会起作用吗?还要确保'$ HADOOP_HOME'环境变量设置正确。 –

+0

我正在使用IntelliJ并将环境变量HADOOP_HOME设置为core-site.xml路径。我在本地运行程序,因此fs.hdfs.hadoopconf设置不起作用。 – Sam

回答

1

core-site.xml文件未被读入的原因是由于Hadoop的文件结构。我有HADOOP_HOME=path/to/dir/etc/hadoop。但是,Hadoop会将etc/hadoop作为其文件结构的一部分来查找core-site.xml。要在HADOOP_HOME环境变量中正确读取路径,应该将其列为HADOOP_HOME=path/to/dir

另一个问题是数据没有推到S3的原因。这是因为我正在使用流处理。批量处理的作用是将数据推送到S3,但流处理并不是因为S3如何将数据存储为键/值存储,并且新数据只能被替换。对于流处理,Flink会将数据附加到S3不允许的数据上,因此不会将数据推送到S3。所以这段代码适用于将批量推送到S3

ExecutionEnvironment ee = ExecutionEnvironment.getExecutionEnvironment(); 
    DataSet dataSet = ee.readTextFile("/Users/name/Desktop/flinkoutputtest.txt"); 
    dataSet.writeAsText("s3://flink-test/flink-output/testdoc.txt").setParallelism(1); 
    ee.execute();