
How to keep a connection to an external database open in Cloud Dataflow

I have an unbounded Dataflow pipeline that reads from Pub/Sub, applies a ParDo, and writes to Cassandra. The pipeline only applies ParDo transforms, so even though the source is unbounded I still use the global window with the default trigger.

In a pipeline like this, how do I keep the connection to Cassandra open?

Currently I set up the connection in startBundle, like this:

private class CassandraWriter<T> extends DoFn<T, Void> {
    private transient Cluster cluster;
    private transient Session session;
    private transient MappingManager mappingManager;

    // hosts, port and keyspace are connection settings supplied by the
    // enclosing transform (not shown here).

    @Override
    public void startBundle(Context c) {
        this.cluster = Cluster.builder()
            .addContactPoints(hosts)
            .withPort(port)
            .withoutMetrics()
            .withoutJMXReporting()
            .build();
        this.session = cluster.connect(keyspace);
        this.mappingManager = new MappingManager(session);
    }

    @Override
    public void processElement(ProcessContext c) throws IOException {
        T element = c.element();
        Mapper<T> mapper = (Mapper<T>) mappingManager.mapper(element.getClass());
        mapper.save(element);
    }

    @Override
    public void finishBundle(Context c) throws IOException {
        session.close();
        cluster.close();
    }
}

However, this approach ends up creating a new connection for every element.

Another option is to pass it around as a side input, as in https://github.com/benjumanji/cassandra-dataflow:

public PDone apply(PCollection<T> input) {
    Pipeline p = input.getPipeline();

    // A single, serializable write operation that owns the Cassandra connection.
    CassandraWriteOperation<T> op = new CassandraWriteOperation<T>(this);

    Coder<CassandraWriteOperation<T>> coder =
        (Coder<CassandraWriteOperation<T>>) SerializableCoder.of(op.getClass());

    PCollection<CassandraWriteOperation<T>> opSingleton =
        p.apply(Create.<CassandraWriteOperation<T>>of(op)).setCoder(coder);

    final PCollectionView<CassandraWriteOperation<T>> opSingletonView =
        opSingleton.apply(View.<CassandraWriteOperation<T>>asSingleton());

    PCollection<Void> results = input.apply(ParDo.of(new DoFn<T, Void>() {
        @Override
        public void processElement(ProcessContext c) throws Exception {
            // use the write operation from the side input here
        }
    }).withSideInputs(opSingletonView));

    // Consume the results as a side input so that finalization only runs
    // after all writes have finished.
    PCollectionView<Iterable<Void>> voidView = results.apply(View.<Void>asIterable());

    opSingleton.apply(ParDo.of(new DoFn<CassandraWriteOperation<T>, Void>() {
        private static final long serialVersionUID = 0;

        @Override
        public void processElement(ProcessContext c) {
            CassandraWriteOperation<T> op = c.element();
            op.finalize();
        }
    }).withSideInputs(voidView));

    return new PDone();
}

However, this way I have to use windowing, because PCollectionView<Iterable<Void>> voidView = results.apply(View.<Void>asIterable()); applies a GroupByKey under the hood.

In general, how should a PTransform that writes from an unbounded PCollection to an external database maintain its connection to that database?

Answers


You are correctly observing that typical bundle sizes are smaller in the streaming/unbounded case than in the batch/bounded case. The actual bundle size depends on many parameters, and sometimes a bundle may contain just a single element.

One way to work around this is to use a per-worker connection pool, stored in static state of your DoFn. You should be able to initialize it during the first call to startBundle and then use it across bundles. Alternatively, you can create connections on demand and release them back into the pool for reuse when they are no longer needed.

You should make sure that the static state is thread-safe, and that you don't make any assumptions about how Dataflow manages bundles.
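For illustration, here is a rough sketch of the "create connections on demand and release them back to the pool" variant described above: a small, static (per-JVM, i.e. per-worker) pool of Cassandra sessions that processElement borrows from and returns to. The class name PooledCassandraWriter, the pool size, and the helper structure are my own illustrative choices, not part of the answer.

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.Session;
import com.datastax.driver.mapping.Mapper;
import com.datastax.driver.mapping.MappingManager;
import com.google.cloud.dataflow.sdk.transforms.DoFn;

public class PooledCassandraWriter<T> extends DoFn<T, Void> {
    private static final int POOL_SIZE = 4; // illustrative; tune per worker
    private static final Object lock = new Object();
    private static Cluster cluster;
    private static BlockingQueue<Session> pool;

    private final String[] hosts;
    private final int port;
    private final String keyspace;

    public PooledCassandraWriter(String[] hosts, int port, String keyspace) {
        this.hosts = hosts;
        this.port = port;
        this.keyspace = keyspace;
    }

    @Override
    public void startBundle(Context c) {
        // Lazily create the shared cluster and session pool once per worker JVM.
        synchronized (lock) {
            if (pool == null) {
                cluster = Cluster.builder()
                    .addContactPoints(hosts)
                    .withPort(port)
                    .build();
                pool = new ArrayBlockingQueue<Session>(POOL_SIZE);
                for (int i = 0; i < POOL_SIZE; i++) {
                    pool.add(cluster.connect(keyspace));
                }
            }
        }
    }

    @Override
    public void processElement(ProcessContext c) throws Exception {
        // Borrow a session for this element and return it when done.
        Session session = pool.take();
        try {
            MappingManager manager = new MappingManager(session);
            @SuppressWarnings("unchecked")
            Mapper<T> mapper = (Mapper<T>) manager.mapper(c.element().getClass());
            mapper.save(c.element());
        } finally {
            pool.put(session);
        }
    }
}

Note that the Datastax Session is itself thread-safe, so in practice a single shared Session (as in the self-answer below) is usually enough; the pool mainly illustrates the acquire/release pattern the answer describes.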


Thanks. I was considering statics, but I was somewhat reluctant to use them because it's not one of the four documented ways of passing additional (out-of-band) data to a ParDo (https://cloud.google.com/dataflow/faq).


As Davor Bonaci suggested, using a static variable solved the problem.

import java.io.IOException;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.Session;
import com.datastax.driver.mapping.Mapper;
import com.datastax.driver.mapping.MappingManager;
import com.google.cloud.dataflow.sdk.transforms.DoFn;

public class CassandraWriter<T> extends DoFn<T, Void> {
    private static final Logger log = LoggerFactory.getLogger(CassandraWriter.class);

    // Prevent multiple threads from creating multiple cluster connections in parallel.
    private static final Object lock = new Object();

    // Shared per worker JVM, so the connection outlives individual bundles.
    private static Cluster cluster;
    private static Session session;
    private static MappingManager mappingManager;

    private final String[] hosts;
    private final int port;
    private final String keyspace;

    public CassandraWriter(String[] hosts, int port, String keyspace) {
        this.hosts = hosts;
        this.port = port;
        this.keyspace = keyspace;
    }

    @Override
    public void startBundle(Context c) {
        synchronized (lock) {
            if (cluster == null) {
                cluster = Cluster.builder()
                    .addContactPoints(hosts)
                    .withPort(port)
                    .withoutMetrics()
                    .withoutJMXReporting()
                    .build();
                session = cluster.connect(keyspace);
                mappingManager = new MappingManager(session);
            }
        }
    }

    @Override
    public void processElement(ProcessContext c) throws IOException {
        T element = c.element();
        Mapper<T> mapper = (Mapper<T>) mappingManager.mapper(element.getClass());
        mapper.save(element);
    }
}
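
For completeness, a minimal sketch of how this DoFn might be wired into the unbounded pipeline described in the question. The project/topic, contact point, keyspace, and the ParseEventFn/MyEvent parsing step are hypothetical placeholders.

import com.google.cloud.dataflow.sdk.Pipeline;
import com.google.cloud.dataflow.sdk.io.PubsubIO;
import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
import com.google.cloud.dataflow.sdk.transforms.ParDo;

public class WriteEventsToCassandra {
    public static void main(String[] args) {
        DataflowPipelineOptions options =
            PipelineOptionsFactory.fromArgs(args).as(DataflowPipelineOptions.class);
        options.setStreaming(true); // unbounded Pub/Sub source => streaming mode

        Pipeline p = Pipeline.create(options);
        p.apply(PubsubIO.Read.topic("projects/my-project/topics/events"))
            // ParseEventFn (String -> MyEvent) is a hypothetical parsing ParDo.
            .apply(ParDo.of(new ParseEventFn()))
            .apply(ParDo.of(new CassandraWriter<MyEvent>(
                new String[] {"10.0.0.1"}, 9042, "my_keyspace")));
        p.run();
    }
}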