我一直在试图找到一个连接器将数据从Redis读取到Flink。 Flink的文档包含写入Redis的连接器的说明。我需要从我的Flink作业中读取来自Redis的数据。在Using Apache Flink for data streaming中,Fabian提到可以从Redis读取数据。什么是可用于此目的的连接器?将数据从Redis读取到Flink
0
A
回答
0
我们正在运行一个生产看起来大致是这样的
class RedisSource extends RichSourceFunction[SomeDataType] {
var client: RedisClient = _
override def open(parameters: Configuration) = {
client = RedisClient() // init connection etc
}
@volatile var isRunning = true
override def cancel(): Unit = {
isRunning = false
client.close()
}
override def run(ctx: SourceContext[SomeDataType]): Unit = while (isRunning) {
for {
data <- ??? // get some data from the redis client
} yield ctx.collect(SomeDataType(data))
}
}
我认为这真的取决于你需要从Redis的获取什么。以上可用于从列表/队列中获取消息,转换/推送,然后将其从队列中删除。 Redis也支持Pub/Sub,因此可以订阅,获取SourceConext并向下游推送消息。
0
有关Apache Flink的流redis源连接器的讨论(请参阅FLINK-3033),但没有一个可用。然而,实施一个应该不难。
1
目前,Flink Redis Connector不可用,但可以通过扩展RichSinkFunction/SinkFunction类来实现。
public class RedisSink extends RichSinkFunction<String> {
@Override
public void open(Configuration parameters) throws Exception {
//open redis connection
}
@Override
public void invoke(String map) throws Exception {
//sink data to redis
}
@Override
public void close() throws Exception {
super.close();
}
}
0
让您的Flink程序使用Jedis与Redis交谈时遇到的挑战之一是将相应的库存入您提交给Flink的JAR文件中。没有这个,你会得到调用堆栈,指示某些类未定义。这里是我创建的Maven pom.xml片段,用于将Redis及其相关组件apache commons-pool2移动到我的JAR中。
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-dependency-plugin</artifactId>
<version>2.9</version>
<executions>
<execution>
<id>unpack</id>
<!-- executed just before the package phase -->
<!-- https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/linking.html -->
<phase>prepare-package</phase>
<goals>
<goal>unpack</goal>
</goals>
<configuration>
<artifactItems>
<artifactItem>
<groupId>org.apache.commons</groupId>
<artifactId>commons-pool2</artifactId>
<version>2.4.2</version>
<type>jar</type>
<overWrite>false</overWrite>
<outputDirectory>${project.build.directory}/classes</outputDirectory>
<includes>org/apache/commons/**</includes>
</artifactItem>
<artifactItem>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>2.9.0</version>
<type>jar</type>
<overWrite>false</overWrite>
<outputDirectory>${project.build.directory}/classes</outputDirectory>
<includes>redis/clients/**</includes>
</artifactItem>
</artifactItems>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
相关问题
- 1. Apache Flink - 无法从Twitter获取数据
- 2. 用Apache Flink将数据推送到S3
- 3. 将数据从redis独立实例移动到redis集群
- 4. 从Apache Flink查询数据
- 5. 将数据从CSV读取到.net数据表
- 6. 连接到Redis时无法读取任何数据
- 7. 从从服务器读取Redis哨兵
- 8. Redis写入主站从从站读取
- 9. 从文件读取数据到数组
- 10. 使用apache flink读取和写入cassandra的数据Java API
- 11. Redis读取/写入
- 12. 如何将数据从sqlite读取到数组中
- 13. 将数据库读取到MapProperty Javafx
- 14. 从Redis读取的许多Logstash实例
- 15. Django-redis-cache无法从redis获取数据
- 16. 使用Jmeter Redis数据集从Redis获取价值
- 17. 从PDF文件读取数据到R
- 18. 如何从Sling读取数据到Jquery?
- 19. 从Matlab读取数据到Java
- 20. 从Microsoft SQL Server读取数据到R
- 21. 从表中读取数据到散列
- 22. 角从json读取数据到textarea
- 23. XCode:从plist读取数据到NSmutablearray
- 24. 从MySQL读取数据到VB
- 25. 将Redis数据迁移到群集
- 26. 将音频数据从ALSA缓冲区读取到numpy阵列
- 27. Android:无法将数据从文件读取到多个类
- 28. 如何将数据从Cassandra(DBeaver)读取到R
- 29. 如何将XML字符串从数据库读取到XMLTextReader中?
- 30. 如何将数据从CSV读取到集合中