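How do I maintain a connection to an external database in Cloud Dataflow?

I have an unbounded Dataflow pipeline that reads from Pub/Sub, applies a ParDo, and writes to Cassandra. Because it only applies ParDo transforms, I use the default global window with the default trigger, even though the source is unbounded.
In a pipeline like this, how should I keep the connection to Cassandra?
Currently I open it in startBundle, like this: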
private class CassandraWriter<T> extends DoFn<T, Void> {
    private transient Cluster cluster;
    private transient Session session;
    private transient MappingManager mappingManager;

    @Override
    public void startBundle(Context c) {
        this.cluster = Cluster.builder()
            .addContactPoints(hosts)
            .withPort(port)
            .withoutMetrics()
            .withoutJMXReporting()
            .build();
        this.session = cluster.connect(keyspace);
        this.mappingManager = new MappingManager(session);
    }

    @Override
    public void processElement(ProcessContext c) throws IOException {
        T element = c.element();
        Mapper<T> mapper = (Mapper<T>) mappingManager.mapper(element.getClass());
        mapper.save(element);
    }

    @Override
    public void finishBundle(Context c) throws IOException {
        session.close();
        cluster.close();
    }
}
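However, this ends up creating a new connection for nearly every element, since bundles in a streaming pipeline can be very small.
Another option is to pass it as a side input, as in https://github.com/benjumanji/cassandra-dataflow: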
public PDone apply(PCollection<T> input) {
    Pipeline p = input.getPipeline();
    CassandraWriteOperation<T> op = new CassandraWriteOperation<T>(this);
    Coder<CassandraWriteOperation<T>> coder =
        (Coder<CassandraWriteOperation<T>>) SerializableCoder.of(op.getClass());
    PCollection<CassandraWriteOperation<T>> opSingleton =
        p.apply(Create.<CassandraWriteOperation<T>>of(op)).setCoder(coder);
    final PCollectionView<CassandraWriteOperation<T>> opSingletonView =
        opSingleton.apply(View.<CassandraWriteOperation<T>>asSingleton());

    PCollection<Void> results = input.apply(ParDo.of(new DoFn<T, Void>() {
        @Override
        public void processElement(ProcessContext c) throws Exception {
            // use the side input here
        }
    }).withSideInputs(opSingletonView));

    PCollectionView<Iterable<Void>> voidView = results.apply(View.<Void>asIterable());

    opSingleton.apply(ParDo.of(new DoFn<CassandraWriteOperation<T>, Void>() {
        private static final long serialVersionUID = 0;

        @Override
        public void processElement(ProcessContext c) {
            CassandraWriteOperation<T> op = c.element();
            op.finalize();
        }
    }).withSideInputs(voidView));

    return new PDone();
}
However, this way I have to use windowing, because PCollectionView<Iterable<Void>> voidView = results.apply(View.<Void>asIterable()); applies a group-by.
In general, how should a PTransform that writes from an unbounded PCollection to an external database keep its connection to that database?
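Thanks. I was considering a static, but I am a bit reluctant to use it because it is not one of the four documented ways of [passing additional (out-of-band) data](https://cloud.google.com/dataflow/FAQ#帕尔-附加数据).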
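
For illustration, the static approach mentioned in the comment above might look roughly like the sketch below. It assumes that all DoFn instances running in the same worker JVM can see the same static state, which the Dataflow documentation quoted here does not guarantee. `WorkerResources` and `getOrCreate` are hypothetical names made up for this example; they are not part of the Dataflow SDK or the Cassandra driver.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Function;

/**
 * Hypothetical helper (not an SDK class): keeps at most one lazily created
 * resource per key for the lifetime of the JVM, so bundles can reuse it.
 */
final class WorkerResources {
    // Static, so it is shared by every object loaded by the same class loader.
    private static final ConcurrentMap<String, Object> RESOURCES = new ConcurrentHashMap<>();

    private WorkerResources() {}

    /**
     * Returns the resource registered under the key, creating it with the
     * factory on first use. ConcurrentHashMap.computeIfAbsent is atomic, so
     * the factory runs at most once per key even with concurrent callers.
     */
    @SuppressWarnings("unchecked")
    static <T> T getOrCreate(String key, Function<String, T> factory) {
        return (T) RESOURCES.computeIfAbsent(key, factory);
    }
}
```

With a helper like this, startBundle could call something like `WorkerResources.getOrCreate(keyspace, k -> cluster.connect(k))` instead of building a new Cluster and Session each time, and finishBundle would no longer close them. The trade-off is that nothing closes the session explicitly; it lives until the worker JVM exits.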