2017-05-26 2346 views

回答

0

我们正在运行一个生产看起来大致是这样的

class RedisSource extends RichSourceFunction[SomeDataType] { 

    var client: RedisClient = _ 

    override def open(parameters: Configuration) = { 
    client = RedisClient() // init connection etc 
    } 

    @volatile var isRunning = true 

    override def cancel(): Unit = { 
    isRunning = false 
    client.close() 
    } 

    override def run(ctx: SourceContext[SomeDataType]): Unit = while (isRunning) { 
     for { 
     data <- ??? // get some data from the redis client 
     } yield ctx.collect(SomeDataType(data)) 

    } 
} 

我认为这真的取决于你需要从Redis的获取什么。以上可用于从列表/队列中获取消息,转换/推送,然后将其从队列中删除。 Redis也支持Pub/Sub,因此可以订阅,获取SourceConext并向下游推送消息。

0

有关Apache Flink的流redis源连接器的讨论(请参阅FLINK-3033),但没有一个可用。然而,实施一个应该不难。

1

目前,Flink Redis Connector不可用,但可以通过扩展RichSinkFunction/SinkFunction类来实现。

public class RedisSink extends RichSinkFunction<String> { 

    @Override 
    public void open(Configuration parameters) throws Exception { 
     //open redis connection 
    } 

    @Override 
    public void invoke(String map) throws Exception { 
    //sink data to redis 
    } 

    @Override 
    public void close() throws Exception { 
    super.close(); 
    } 

} 
0

让您的Flink程序使用Jedis与Redis交谈时遇到的挑战之一是将相应的库存入您提交给Flink的JAR文件中。没有这个,你会得到调用堆栈,指示某些类未定义。这里是我创建的Maven pom.xml片段,用于将Redis及其相关组件apache commons-pool2移动到我的JAR中。

<build> 
    <plugins> 
     <plugin> 
      <groupId>org.apache.maven.plugins</groupId> 
      <artifactId>maven-dependency-plugin</artifactId> 
      <version>2.9</version> 
      <executions> 
       <execution> 
        <id>unpack</id> 
        <!-- executed just before the package phase --> 
        <!-- https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/linking.html --> 
        <phase>prepare-package</phase> 
        <goals> 
         <goal>unpack</goal> 
        </goals> 
        <configuration> 
         <artifactItems> 
          <artifactItem> 
           <groupId>org.apache.commons</groupId> 
           <artifactId>commons-pool2</artifactId> 
           <version>2.4.2</version> 
           <type>jar</type> 
           <overWrite>false</overWrite> 
           <outputDirectory>${project.build.directory}/classes</outputDirectory> 
           <includes>org/apache/commons/**</includes> 
          </artifactItem> 
          <artifactItem> 
           <groupId>redis.clients</groupId> 
           <artifactId>jedis</artifactId> 
           <version>2.9.0</version> 
           <type>jar</type> 
           <overWrite>false</overWrite> 
           <outputDirectory>${project.build.directory}/classes</outputDirectory> 
           <includes>redis/clients/**</includes> 
          </artifactItem> 

         </artifactItems> 
        </configuration> 
       </execution> 
      </executions> 
     </plugin> 

    </plugins> 
</build>