Cloud Dataflow, PubSub & BigQuery (TableRowJsonCoder) issue

I am using Cloud Dataflow, PubSub & BigQuery to read JSON Pub/Sub messages, convert the JSON into TableRows with TableRowJsonCoder, and then write them to BigQuery.

My problem is consistency: the code below only works some of the time. No errors are thrown. I am certain that I am publishing messages to the Pub/Sub topic correctly, and I am also certain that Dataflow is reading each message. I tested this with the gcloud command-line tool:

gcloud beta pubsub subscriptions pull --auto-ack SUBSCRIPTION-NAME 

I have two subscriptions to the topic, one read by Dataflow and one that I read in the terminal. The code also successfully formats the JSON data into table format and writes it to the dataset and table I specified, but only when it feels like it :(

My assumption is that I don't really understand what is going on. If I send 50 messages, it looks like Dataflow only reads about half of the elements. So my first question: is an "element" a certain number of bytes, or a single message? And how do I resolve this? I am reading the data with TableRowJsonCoder.
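To be clear about the input format: TableRowJsonCoder parses each Pub/Sub payload as a JSON object whose keys become the TableRow/BigQuery column names, so the messages I publish look roughly like this (all values here are made up):

{"id": "9f2d0a31", "ip": "10.0.0.1", "installation_id": "inst-01", "user_id": "user-42", "device_type": "android", "language": "en", "application_id": "app-1", "timestamp": "2017-01-23 09:48:10 UTC"}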

Then there is a similar issue further down: for X elements read, only a fraction make it through the GroupByKey successfully. I could describe the problem in more depth if I knew how to troubleshoot it further. Note that the "id" field is always different, so I don't think this is related to deduplication, but I could be wrong.

Even as I write this, the elements added has risen to 41 and the output to BigQuery has risen to 12. Am I just not waiting long enough? Is my test data too small (always under 100 messages)? Even if it eventually saves all of my rows, taking over an hour seems far too long.

[Screenshot: Dataflow console]

[Screenshot: the successfully inserted data]

/* 
* Copyright (C) 2015 Google Inc. 
* 
* Licensed under the Apache License, Version 2.0 (the "License"); you may not 
* use this file except in compliance with the License. You may obtain a copy of 
* the License at 
* 
* http://www.apache.org/licenses/LICENSE-2.0 
* 
* Unless required by applicable law or agreed to in writing, software 
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 
* License for the specific language governing permissions and limitations under 
* the License. 
*/ 

package com.example; 

import com.google.api.services.bigquery.model.TableFieldSchema; 
import com.google.api.services.bigquery.model.TableReference; 
import com.google.api.services.bigquery.model.TableRow; 
import com.google.api.services.bigquery.model.TableSchema; 
import com.google.cloud.dataflow.sdk.Pipeline; 
import com.google.cloud.dataflow.sdk.coders.TableRowJsonCoder; 
import com.google.cloud.dataflow.sdk.io.BigQueryIO; 
import com.google.cloud.dataflow.sdk.io.PubsubIO; 
import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions; 
import com.google.cloud.dataflow.sdk.options.PipelineOptions; 
import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory; 
import com.google.cloud.dataflow.sdk.transforms.windowing.FixedWindows; 
import com.google.cloud.dataflow.sdk.transforms.windowing.Window; 
import com.google.cloud.dataflow.sdk.values.PCollection; 
import com.google.cloud.dataflow.sdk.util.gcsfs.GcsPath; 

import java.util.ArrayList; 
import java.util.List; 

import org.joda.time.Duration; 
import org.slf4j.Logger; 
import org.slf4j.LoggerFactory; 

/** 
 * A streaming pipeline that reads JSON messages from a Cloud Pub/Sub topic, decodes each 
 * payload into a BigQuery {@link TableRow} via {@link TableRowJsonCoder}, applies one-minute 
 * fixed windows, and streams the rows into a BigQuery table. 
 * 
 * <p>To run this pipeline on managed resources in Google Cloud Platform, specify the 
 * following command-line options: 
 *   --project=<YOUR_PROJECT_ID> 
 *   --stagingLocation=<STAGING_LOCATION_IN_CLOUD_STORAGE> 
 *   --runner=BlockingDataflowPipelineRunner 
 */ 
public class StarterPipeline { 

    private static final Logger LOG = LoggerFactory.getLogger(StarterPipeline.class); 

    static final int WINDOW_SIZE = 1; // Default window duration in minutes 

    private final static String PROJECT_ID = "dataflow-project"; 
    private final static String PUBSUB_TOPIC = "projects/dataflow-project/topics/pub-sub-topic"; 
    private final static String DATASET_ID = "test_dataset"; 
    private final static String TABLE_ID = "test_table_version_one"; 


    private static TableSchema getSchema() { 
     List<TableFieldSchema> fields = new ArrayList<>(); 
     fields.add(new TableFieldSchema().setName("id").setType("STRING")); 
     fields.add(new TableFieldSchema().setName("ip").setType("STRING")); 
     fields.add(new TableFieldSchema().setName("installation_id").setType("STRING")); 
     fields.add(new TableFieldSchema().setName("user_id").setType("STRING")); 
     fields.add(new TableFieldSchema().setName("device_type").setType("STRING")); 
     fields.add(new TableFieldSchema().setName("language").setType("STRING")); 
     fields.add(new TableFieldSchema().setName("application_id").setType("STRING")); 
     fields.add(new TableFieldSchema().setName("timestamp").setType("TIMESTAMP")); 
     TableSchema schema = new TableSchema().setFields(fields); 
     return schema; 
    } 

    private static TableReference getTableReference() { 
     TableReference tableRef = new TableReference(); 
     tableRef.setProjectId(PROJECT_ID); 
     tableRef.setDatasetId(DATASET_ID); 
     tableRef.setTableId(TABLE_ID); 
     return tableRef; 
    } 

    public static void main(String[] args) { 
        PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().create(); 
        DataflowPipelineOptions dataflowOptions = options.as(DataflowPipelineOptions.class); 
        dataflowOptions.setStreaming(true); 
        Pipeline pipeline = Pipeline.create(dataflowOptions); 

        LOG.info("Reading from PubSub."); 
        // Read JSON messages from the Pub/Sub topic, decode each payload into a TableRow, 
        // and assign the elements to fixed one-minute windows. 
        PCollection<TableRow> input = pipeline 
            .apply(PubsubIO.Read.topic(PUBSUB_TOPIC).withCoder(TableRowJsonCoder.of())) 
            .apply(Window.<TableRow>into(FixedWindows.of(Duration.standardMinutes(WINDOW_SIZE)))); 

        // Stream the windowed rows into the BigQuery table, creating it with the schema above if needed. 
        input.apply(BigQueryIO.Write.to(getTableReference()).withSchema(getSchema())); 

        pipeline.run(); 
    } 
} 

I would also be interested in specifying the timestamp and the record id from the "timestamp" and "id" fields.
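From what I can tell in the Dataflow 1.x PubsubIO docs, this is done with timestampLabel/idLabel, which read Pub/Sub message *attributes* rather than fields in the JSON payload, so the publisher would also have to set attributes named "timestamp" and "id". A sketch of what I think the read step would become:

PCollection<TableRow> input = pipeline 
    .apply(PubsubIO.Read.topic(PUBSUB_TOPIC) 
        .timestampLabel("timestamp")   // take the element timestamp from the "timestamp" attribute 
        .idLabel("id")                 // use the "id" attribute for Pub/Sub deduplication 
        .withCoder(TableRowJsonCoder.of())) 
    .apply(Window.<TableRow>into(FixedWindows.of(Duration.standardMinutes(WINDOW_SIZE)))); 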

This should indeed be much faster. I have seen this before when the networking in a project is misconfigured. Can you provide the job_id of your job so I can investigate further? For the timestamp/id question, see https://cloud.google.com/dataflow/model/pubsub-io#timestamps-and-record-ids – danielm

@danielm 2017-01-23_09_48_10-1670593411236141809, note that the project id shown above is not the real one. –

After leaving the pipeline running overnight, the Pub/Sub read has added 63 elements and 17 rows have been produced. The bottleneck is the GroupByKey, and it takes a very long time to read from Pub/Sub. –

Answer

The problem is that the networking for your GCE VMs is misconfigured. Dataflow requires that the VMs be able to communicate with each other over TCP, and your firewall rules do not allow that. Adding a firewall rule that allows TCP connections between the VMs will fix this.
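For example, something along these lines (the rule name and network are placeholders for your own; Dataflow worker VMs carry the dataflow network tag):

gcloud compute firewall-rules create allow-dataflow-workers \ 
    --network=default \ 
    --allow=tcp \ 
    --source-tags=dataflow \ 
    --target-tags=dataflow 

This only opens TCP traffic between the tagged worker VMs; you can narrow the allowed ports further if you prefer.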

The reason some data slowly makes it through the pipeline is that occasionally you get lucky and the data only needs to be processed on a single machine. Pub/Sub eventually times out and retries the messages, so they all get through in the end.