2017-03-03 156 views

回答

4

卡夫卡连接的框架。 Confluent只提供连接器。如果你不想使用Confluent Open Source(但为什么不呢?),你也可以使用所有这些连接器与香草Apache Kafka。

有可用的多个卡桑德拉连接器:https://www.confluent.io/product/connectors/

顺便说一句:没有列出卡桑德拉连接器是由汇合维持。

当然,您也可以编写自己的连接器或使用任何其他第三方连接器。

+0

访问数据库的基本概念是在我的情况下使用查询SQL/CQL。连接还对数据库执行查询以存储数据或获取数据。如果我建立一个用于处理的消费者组和一个用于存储到DB的消费者组,然后用于存储到DB的消费者组,比如说DB-Consumer,它的工作是仅将数据存储到数据库,我可以轻松地使用ORM,而且我也将有完全的透明度和对它的控制。所以我担心的是它在性能和速度方面与这类(DB消费者)消费者实际上有何不同。 提前感谢您的帮助和帮助。 –

+0

连接作为框架负责故障切换,您也可以在分布式模式下运行它以扩展数据导入/导出“作业”。因此,Connect实际上是一种“火再忘”的体验。此外,对于Connect,您不需要编写任何代码 - 只需配置连接器即可。 –

0

是的,你基本上可以通过DataStax使用Kafka Connect api和Cassandra驱动程序,并为Kafka-cassandra-connect构建你的代码。

+0

您是否愿意详细阐述该解决方案? – zx485

3

DataMountaineer Stream Reactor有一个Cassandra源和接收解决方案,可用于Kafka连接。

下降的jar文件(download)到卡夫卡libs文件夹,然后指定你的连接如下:

{ 
"name": "cassandra-NAME", 
"config": { 
    "tasks.max": "1", 
    "connector.class": "com.datamountaineer.streamreactor.connect.cassandra.source.CassandraSourceConnector", 
    "connect.cassandra.key.space": "KEYSPACE", 
    "connect.cassandra.source.kcql": "INSERT INTO KAFKA_TOPIC SELECT column1, timestamp_col FROM CASSANDRA_TABLE PK timestamp_col", 
    "connect.cassandra.import.mode": "incremental", 
    "connect.cassandra.contact.points": "localhost", 
    "connect.cassandra.port": 9042, 
    "connect.cassandra.import.poll.interval": 10000 
}} 

开始卡夫卡连接

bin/connect-distributed.sh config/connect-distributed.properties 

,并通过卡珊德拉连接器加载到卡夫卡连接上面提到的JSON属性文件(假设名称为connect-cassandra-source.json)

curl -X POST -H "Content-Type: application/json" -d @config/connect-cassandra-source.json localhost:8083/connectors 

您将需要创建一个具有timeuuid列作为群集密钥的表。这被描述为here

相关问题