google-cloud-dataflow

    0热度

    1回答

    我正在尝试使用GCP Dataflow Python使用其第一个字符来处理输入文本文件。如果条目的第一个字符是'A',我想要将文件存储在A.txt中等等。同样,我有一个数字与每个字符相关联。我为此储存了两个hashmaps。下面是我的代码: splitHashMap={'A':1,'F':4, 'J':4, 'Z':4, 'G':10, 'I':11}; fileHashMap= {'A':'A

    0热度

    1回答

    假设我有一个存储在GCS桶中的shell脚本。是否可以使用Apache Beam执行它?如果是,那么如何? 我目前还没有尝试过任何东西,因为我在Apache Beam或Dataflow的文档中找不到任何此类内容。所以只想知道我必须采取什么方法。 谢谢。

    1热度

    1回答

    是否可以直接从云数据流代码调用“存储过程”。我已经将一些数据转储到BigQuery表中,并将这些数据应用于存储过程(在SQL系统中使用这些数据时使用)。我是否也可以在Dataflow代码中使用相同的存储过程或任何解决方法?

    0热度

    1回答

    我一般提出这个问题,因为它可能是一个通用的答案。但是一个具体的例子是将2个BigQuery表与相同的模式进行比较,但可能会有不同的数据。我想要一个差异,即相对于一个组合键,例如,添加,删除,修改的内容。前2列。 Table A C1 C2 C3 ----------- a a 1 a b 1 a c 1 Table B C1 C2 C3 # Notes if comparing

    2热度

    1回答

    我正在谷歌云平台上运行Dataflow-Jobs,而我得到的一个新错误是“工作流失败”,没有任何解释。 我得到的是日志如下: 2017-08-25 (00:06:01) Executing operation ReadNewXXXFromStorage/Read+JsonStringsToXXX+RemoveLanguagesFromXXX... 2017-08-25 (00:06:01) Ex

    0热度

    1回答

    问题背景 我试图生成每个从实时视频流密钥事件项目的总体(线性)订单,其中的顺序是事件时间(源自活动按关键字进行的处理全排序事件有效载荷)。 方法 我曾试图实现此使用流如下: 1)设置的非重叠序列的窗户,例如持续时间5分钟 2)建立一个允许迟到 - 这是很好丢弃晚期事件 3)设置的累加模式保留全部解雇窗格 4)使用“AfterwaterMark”触发 5)当处理触发窗格时,只考虑窗格是否是最后一个窗

    2热度

    1回答

    给定一个相对较小的数据源(3,000-10,000)的键/值对,我试图只处理符合组阈值(50-100)的记录。所以最简单的方法是将它们按键,过滤和展开进行分组 - 无论是使用FlatMap还是ParDo。迄今为止,最大的团体只有1,500条记录。但这似乎是Google Cloud Dataflow生产中的一个严重瓶颈。 随着给定的列表 (1,1) (1,2) (1,3) ... (2,1) (2,

    1热度

    1回答

    我们有一个非常简单的管道,它正在读取GCS,执行一个简单的ParDo,然后将结果写入BigQuery。它可以自动扩展到50台虚拟机,运行在GCP上,并且不会做任何事情。 从GCS(〜10B记录&〜700 + GB)读取所有数据并进行转换,所有数据都发生得相对较快(前7-10分钟)。 但是,当它到达BigQuery写入(使用BigQueryIO)时,它会放慢速度 - 即使它只需要写入大约1M个记录(

    0热度

    1回答

    如何在写入apache beam(2.1.0)中的文本文件之前检查pcollection是否为空? 我在这里要做的是将一个文件分解成指定数字的集合,这个集合通过ValueProvider作为参数传递给管道。由于此ValueProvider在管道施工时间不可用,因此我声明一个不错的26号(总字母数,这是用户可以输入的最大数量),使其可用于.withOuputTags()。所以我得到26个元组标签,在

    2热度

    1回答

    我需要阅读从GCS桶的文件。我知道我将不得不使用GCS API /客户端库,但我无法找到与此相关的任何实例。 我一直在参考GCS文档中的链接: GCS Client Libraries。但无法真正发挥作用。如果有人能提供一个真正有用的例子。 谢谢。