在开始之前,我想先说我对Kafka是全新的,对于Linux来说是相当新的,所以如果这最终是一个荒谬的简单答案,请善待! :)Kafka连接MySQL源
我想要做的事情的高层次想法是使用Confluent的Kafka Connect从分数或分钟基础上传输传感器数据的MySQL数据库读取,然后使用Kafka作为一个“ETL管道”即时将数据发送到数据仓库和/或MongoDB进行报告,甚至直接从我们的网络应用程序绑定到Kafka。
我使用Robin Moffatt的series以及Confluent的JDBC Source Connector Quickstart作为我的初始指南。就托管的位置而言,我使用Amazon RDS MySQL数据库和独立的AWS EC2 t2.large实例与Ubuntu 16.04.2一起运行Kafka Connect。
使用Robin的工作流程,我到了创建配置文件的地步,但我没有使用他使用的json格式。我正在使用快速入门文章的格式。
name=jdbc_source_mysql_4427_Data
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
connection.url=jdbc:mysql://lndbtest.cdveaddpnevv.us-east-2.rds.amazonaws.com:3306/LNDBv1?user=adminRDS&password=*****
table.whitelist=4427_Data
mode=timestamp
timestamp.column.name=TmStamp
validate.non.null=false
topic.prefix=mysql-
,这是保存在:
/etc/kafka-connect-jdbc/kafka-connect-jdbc-source.properties
我然后运行:
/usr/bin/confluent load jdbc_source_mysql_4427_Data -d /etc/kafka-connect-jdbc/kafka-connect-jdbc-source.properties
,并得到这个错误:
{
"error_code": 400,
"message": "Connector configuration is invalid and contains the following 2 error(s):\nInvalid value java.sql.SQLException: No suitable driver found for jdbc:mysql://lndbtest.cdveaddpnevv.us-east-2.rds.amazonaws.com:3306/LNDBv1?user=adminRDS&password=*** for configuration Couldn't open connection to jdbc:mysql://lndbtest.cdveaddpnevv.us-east-2.rds.amazonaws.com:3306/LNDBv1?user=adminRDS&password=***\nInvalid value java.sql.SQLException: No suitable driver found for jdbc:mysql://lndbtest.cdveaddpnevv.us-east-2.rds.amazonaws.com:3306/LNDBv1?user=adminRDS&password=*** for configuration Couldn't open connection to jdbc:mysql://lndbtest.cdveaddpnevv.us-east-2.rds.amazonaws.com:3306/LNDBv1?user=adminRDS&password=***\nYou can also find the above list of errors at the endpoint `/{connectorType}/config/validate`"
}
这似乎是一个驱动程序问题。我现在的问题是,“我需要将MySQL JDBC驱动程序下载到我的EC2实例,还是应该包含在Confluent Platform软件包中?”
此外,我的整体想法听起来很适合Kafka Connect?
正如我前面提到的,我对这些技术很陌生,但已经发现学习某些东西的最佳方式是直接跳入并尝试解决问题。任何想法和建议都会受到欢迎。谢谢!
谢谢@dawsaw。我知道这是非常基本的,因为我是Linux的新手,但是有一些文档建议您将驱动程序添加到我的worker classpath中?我对apt-get下载/安装做得很好,但使用压缩tar.gz对我来说并不那么直截了当。我一生都被Windows GUI所宠坏 – bneelon
您可以执行与https://docs.confluent.io/current/connect/userguide.html#installing-plugins中所述相似的操作。您也可以通过将jar放入share/java/kafka-connect-jdbc中直接将jar添加到您的安装中。在那里您还可以找到随连接器一起提供的其他驱动程序。 – dawsaw
好的,在你的帮助下,我已经完成了这个工作,但我碰到的一个问题是,我的许多MySQL表名都以数字开头,并且抛出了“非法初始字符”错误。有没有办法解决这个问题?我试着用一个字母开头的表,它工作得很好。谢谢。 – bneelon