2017-09-15 199 views
0

在开始之前,我想先说我对Kafka是全新的,对于Linux来说是相当新的,所以如果这最终是一个荒谬的简单答案,请善待! :)Kafka连接MySQL源

我想要做的事情的高层次想法是使用Confluent的Kafka Connect从分数或分钟基础上传输传感器数据的MySQL数据库读取,然后使用Kafka作为一个“ETL管道”即时将数据发送到数据仓库和/或MongoDB进行报告,甚至直接从我们的网络应用程序绑定到Kafka。

我使用Robin Moffatt的series以及Confluent的JDBC Source Connector Quickstart作为我的初始指南。就托管的位置而言,我使用Amazon RDS MySQL数据库和独立的AWS EC2 t2.large实例与Ubuntu 16.04.2一起运行Kafka Connect。

使用Robin的工作流程,我到了创建配置文件的地步,但我没有使用他使用的json格式。我正在使用快速入门文章的格式。

name=jdbc_source_mysql_4427_Data  
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector 
key.converter=io.confluent.connect.avro.AvroConverter 
key.converter.schema.registry.url=http://localhost:8081 
value.converter=io.confluent.connect.avro.AvroConverter 
value.converter.schema.registry.url=http://localhost:8081 
connection.url=jdbc:mysql://lndbtest.cdveaddpnevv.us-east-2.rds.amazonaws.com:3306/LNDBv1?user=adminRDS&password=*****     
table.whitelist=4427_Data    
mode=timestamp     
timestamp.column.name=TmStamp    
validate.non.null=false     
topic.prefix=mysql- 

,这是保存在:

/etc/kafka-connect-jdbc/kafka-connect-jdbc-source.properties 

我然后运行:

/usr/bin/confluent load jdbc_source_mysql_4427_Data -d /etc/kafka-connect-jdbc/kafka-connect-jdbc-source.properties 

,并得到这个错误:

{ 
    "error_code": 400, 
    "message": "Connector configuration is invalid and contains the following 2 error(s):\nInvalid value java.sql.SQLException: No suitable driver found for jdbc:mysql://lndbtest.cdveaddpnevv.us-east-2.rds.amazonaws.com:3306/LNDBv1?user=adminRDS&password=*** for configuration Couldn't open connection to jdbc:mysql://lndbtest.cdveaddpnevv.us-east-2.rds.amazonaws.com:3306/LNDBv1?user=adminRDS&password=***\nInvalid value java.sql.SQLException: No suitable driver found for jdbc:mysql://lndbtest.cdveaddpnevv.us-east-2.rds.amazonaws.com:3306/LNDBv1?user=adminRDS&password=*** for configuration Couldn't open connection to jdbc:mysql://lndbtest.cdveaddpnevv.us-east-2.rds.amazonaws.com:3306/LNDBv1?user=adminRDS&password=***\nYou can also find the above list of errors at the endpoint `/{connectorType}/config/validate`" 
} 

这似乎是一个驱动程序问题。我现在的问题是,“我需要将MySQL JDBC驱动程序下载到我的EC2实例,还是应该包含在Confluent Platform软件包中?”

此外,我的整体想法听起来很适合Kafka Connect?

正如我前面提到的,我对这些技术很陌生,但已经发现学习某些东西的最佳方式是直接跳入并尝试解决问题。任何想法和建议都会受到欢迎。谢谢!

回答

1

整体概念对我有意义。您需要下载驱动程序并将其添加到工作人员类路径中。由于我承担的许可原因,它未被打包。

+0

谢谢@dawsaw。我知道这是非常基本的,因为我是Linux的新手,但是有一些文档建议您将驱动程序添加到我的worker classpath中?我对apt-get下载/安装做得很好,但使用压缩tar.gz对我来说并不那么直截了当。我一生都被Windows GUI所宠坏 – bneelon

+0

您可以执行与https://docs.confluent.io/current/connect/userguide.html#installing-plugins中所述相似的操作。您也可以通过将jar放入share/java/kafka-connect-jdbc中直接将jar添加到您的安装中。在那里您还可以找到随连接器一起提供的其他驱动程序。 – dawsaw

+0

好的,在你的帮助下,我已经完成了这个工作,但我碰到的一个问题是,我的许多MySQL表名都以数字开头,并且抛出了“非法初始字符”错误。有没有办法解决这个问题?我试着用一个字母开头的表,它工作得很好。谢谢。 – bneelon

0

正如@dawsaw所说的,您需要将MySQL JDBC驱动程序提供给连接器。

我的观察在这里将给予你所描述的所有应用程序和体系结构的空闲手 - 最好从传感器流入Kafka,然后从Kafka流入MySQL,Mongo,webapp等。

如果您有选项,流入数据库然后流出数据库不是一个完美的选择。

+0

感谢您的回复,@RobinMoffat。到目前为止,您的文章非常有帮助。我绝对同意从传感器到Kafka的流式传输将是最佳的。有一个用于传感器的Java SDK,但正如前面提到的,我对某些更核心的编程有点新手。我有更多的R /统计背景。我正在试用传感器公司的一款软件,它允许我通过点击一个按钮来将传感器流式传输到MySQL数据库,但它非常严格。我想使用Kafka和一个独特的ID将数据分成更具查询能力的格式/平台。 – bneelon

0

这是因为在融合分布中没有mysql驱动程序。我认为你可以通过下载一个mysql驱动jar文件来解决这个问题,然后把它放在confluent/share/java/kafka-connect-jdbc文件夹中并重新运行程序。