2016-12-29 144 views
2

我对Apache Flink相对来说比较新,我正在尝试创建一个生成AWS S3存储桶文件的简单项目。基于文档,它看起来像我需要安装Hadoop才能执行此操作。Apache Flink AWS S3 Sink是否需要Hadoop进行本地测试?

如何设置我的本地环境以允许测试此功能?我已经在本地安装了Apache Flink和Hadoop。我已经为Hadoop的core-site.xml配置添加了必要的更改,并且还将我的HADOOP_CONF路径添加到了我的flink.yaml配置中。当我尝试通过弗林克UI本地提交我的工作,我总是得到一个错误

2016-12-29 16:03:49,861 INFO org.apache.flink.util.NetUtils        - Unable to allocate on port 6123, due to error: Address already in use 
 
2016-12-29 16:03:49,862 ERROR org.apache.flink.runtime.jobmanager.JobManager    - Failed to run JobManager. 
 
java.lang.RuntimeException: Unable to do further retries starting the actor system 
 
    at org.apache.flink.runtime.jobmanager.JobManager$.retryOnBindException(JobManager.scala:2203) 
 
    at org.apache.flink.runtime.jobmanager.JobManager$.runJobManager(JobManager.scala:2143) 
 
    at org.apache.flink.runtime.jobmanager.JobManager$.main(JobManager.scala:2040) 
 
    at org.apache.flink.runtime.jobmanager.JobManager.main(JobManager.scala)

我猜想,我很想念我的环境是如何设置的东西。有没有可能在本地做到这一点?任何帮助,将不胜感激。

+0

检查端口6123是否正在使用中。如果没有,那么禁用你的防火墙/ iptables。 –

回答

3

虽然您需要Hadoop库,但您不必安装Hadoop即可在本地运行并写入S3。我碰巧试着写出基于Avro模式的Parquet输出并生成SpecificRecord到S3。我通过SBT和Intellij Idea在本地运行以下代码的一个版本。需要的部分:

1)让以下文件指定所需的Hadoop属性(注意:不建议定义AWS访问密钥/密钥。最好在具有适当IAM角色读/写的EC2实例上运行。您的S3存储桶,但需要对当地进行测试)

<configuration> 
    <property> 
     <name>fs.s3.impl</name> 
     <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value> 
    </property> 

    <!-- Comma separated list of local directories used to buffer 
     large results prior to transmitting them to S3. --> 
    <property> 
     <name>fs.s3a.buffer.dir</name> 
     <value>/tmp</value> 
    </property> 

    <!-- set your AWS ID using key defined in org.apache.hadoop.fs.s3a.Constants --> 
    <property> 
     <name>fs.s3a.access.key</name> 
     <value>YOUR_ACCESS_KEY</value> 
    </property> 

    <!-- set your AWS access key --> 
    <property> 
     <name>fs.s3a.secret.key</name> 
     <value>YOUR_SECRET_KEY</value> 
    </property> 
</configuration> 

2)进口: 进口com.uebercomputing.eventrecord.EventOnlyRecord

import org.apache.flink.api.scala.hadoop.mapreduce.HadoopOutputFormat 
import org.apache.flink.api.scala.{ExecutionEnvironment, _} 

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat 
import org.apache.hadoop.conf.{Configuration => HadoopConfiguration} 
import org.apache.hadoop.fs.Path 
import org.apache.hadoop.mapreduce.Job 

import org.apache.parquet.avro.AvroParquetOutputFormat 

3)弗林克代码使用HadoopOutputFormat以上配置:

val events: DataSet[(Void, EventOnlyRecord)] = ... 

    val hadoopConfig = getHadoopConfiguration(hadoopConfigFile) 

    val outputFormat = new AvroParquetOutputFormat[EventOnlyRecord] 
    val outputJob = Job.getInstance 

    //Note: AvroParquetOutputFormat extends FileOutputFormat[Void,T] 
    //so key is Void, value of type T - EventOnlyRecord in this case 
    val hadoopOutputFormat = new HadoopOutputFormat[Void, EventOnlyRecord](
     outputFormat, 
     outputJob 
    ) 

    val outputConfig = outputJob.getConfiguration 
    outputConfig.addResource(hadoopConfig) 
    val outputPath = new Path("s3://<bucket>/<dir-prefix>") 
    FileOutputFormat.setOutputPath(outputJob, outputPath) 
    AvroParquetOutputFormat.setSchema(outputJob, EventOnlyRecord.getClassSchema) 

    events.output(hadoopOutputFormat) 

    env.execute 

    ... 

    def getHadoopConfiguration(hadoodConfigPath: String): HadoopConfiguration = { 
     val hadoopConfig = new HadoopConfiguration() 
     hadoopConfig.addResource(new Path(hadoodConfigPath)) 
     hadoopConfig 
    } 

4)结构的依赖关系和版本使用:

val awsSdkVersion = "1.7.4" 
    val hadoopVersion = "2.7.3" 
    val flinkVersion = "1.1.4" 

    val flinkDependencies = Seq(
     ("org.apache.flink" %% "flink-scala" % flinkVersion), 
     ("org.apache.flink" %% "flink-hadoop-compatibility" % flinkVersion) 
    ) 

    val providedFlinkDependencies = flinkDependencies.map(_ % "provided") 

    val serializationDependencies = Seq(
     ("org.apache.avro" % "avro" % "1.7.7"), 
     ("org.apache.avro" % "avro-mapred" % "1.7.7").classifier("hadoop2"), 
     ("org.apache.parquet" % "parquet-avro" % "1.8.1") 
    ) 

    val s3Dependencies = Seq(
     ("com.amazonaws" % "aws-java-sdk" % awsSdkVersion), 
     ("org.apache.hadoop" % "hadoop-aws" % hadoopVersion) 
    ) 

编辑使用writeAsText至S3:

1)创建一个Hadoop配置目录(将引用此作为Hadoop的CONF-DIR ),其中包含一个文件core-site.xml。

例如:

mkdir /home/<user>/hadoop-config 
cd /home/<user>/hadoop-config 
vi core-site.xml 

#content of core-site.xml 
<configuration> 
    <property> 
     <name>fs.s3.impl</name> 
     <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value> 
    </property> 

    <!-- Comma separated list of local directories used to buffer 
     large results prior to transmitting them to S3. --> 
    <property> 
     <name>fs.s3a.buffer.dir</name> 
     <value>/tmp</value> 
    </property> 

    <!-- set your AWS ID using key defined in org.apache.hadoop.fs.s3a.Constants --> 
    <property> 
     <name>fs.s3a.access.key</name> 
     <value>YOUR_ACCESS_KEY</value> 
    </property> 

    <!-- set your AWS access key --> 
    <property> 
     <name>fs.s3a.secret.key</name> 
     <value>YOUR_SECRET_KEY</value> 
    </property> 
</configuration> 

2)在它一个文件弗林克-conf.yaml创建一个目录(将引用此作为弗林克-conf的-DIR)。

例如:

mkdir /home/<user>/flink-config 
cd /home/<user>/flink-config 
vi flink-conf.yaml 

//content of flink-conf.yaml - continuing earlier example 
fs.hdfs.hadoopconf: /home/<user>/hadoop-config 

3)编辑用于运行S3弗林克作业您的IntelliJ Run配置 - 运行 - 编辑组态 - 并添加以下环境变量:

FLINK_CONF_DIR and set it to your flink-conf-dir 

Continuing the example above: 
FLINK_CONF_DIR=/home/<user>/flink-config 

4)使用该环境变量集运行代码:

events.writeAsText("s3://<bucket>/<prefix-dir>") 

env.execute 
+0

感谢您的回复。有没有一种方法可以将我的本地java执行指向hadoop配置文件,而无需定义outputPath。基于文档,似乎我应该能够做一些类似于:messageStream.writeAsText(“s3:// ...”);但是当我通过IntelliJ运行本地执行时,它不知道该文件在哪里。我也似乎无法找到任何flink操作,这将允许我在运行时设置它。 – medium

+0

问题是,在调用writeAsText时使用的默认HadoopFileSystem不会“了解”s3文件系统。请参阅上面编辑我的原始答案。 – medale

+0

所以我认为我能够正常工作,但是我正在访问S3存储桶。出现此错误: com.amazonaws.services.s3.model.AmazonS3Exception:状态码:403,AWS服务:Amazon S3,AWS请求标识:**********,AWS错误代码:null,AWS错误消息:Forbidden,S3扩展请求ID: 我不确定为什么它存在访问错误,因为我的应用中使用的密钥与创建S3存储桶的帐户相同。似乎现在一切都在工作。如果您有任何提示,说明我为什么会收到此错误,请告诉我。再次感谢! – medium