2017-07-24 42 views
1

我只想找出两个不同JSON文件中的女员工,只选择我们感兴趣的字段并将输出写入另一个JSON。在Google云端平台中使用数据流连接两个json

另外我正试图在Google的云平台上使用Dataflow来实现它。有人可以提供任何可以实现的Java代码来获得结果。

员工JSON

{"emp_id":"OrgEmp#1","emp_name":"Adam","emp_dept":"OrgDept#1","emp_country":"USA","emp_gender":"female","emp_birth_year":"1980","emp_salary":"$100000"} 
{"emp_id":"OrgEmp#1","emp_name":"Scott","emp_dept":"OrgDept#3","emp_country":"USA","emp_gender":"male","emp_birth_year":"1985","emp_salary":"$105000"} 

部JSON

{"dept_id":"OrgDept#1","dept_name":"Account","dept_start_year":"1950"} 
{"dept_id":"OrgDept#2","dept_name":"IT","dept_start_year":"1990"} 
{"dept_id":"OrgDept#3","dept_name":"HR","dept_start_year":"1950"} 

预期的输出JSON文件应该像

{"emp_id":"OrgEmp#1","emp_name":"Adam","dept_name":"Account","emp_salary":"$100000"} 

回答

4

您可以使用CoGroupByKey(其中将使用shuffle)或使用副输入(如果您的部门集合显着更小)执行此操作。

我会给你Python中的代码,但是你可以在Java中使用相同的管道。


同方的投入,你会:

  1. 将您的部门PCollection成映射 的dept_id该部门JSON字典词典。

  2. 然后,您将012ol员工PCollection作为主要输入,您可以使用dept_id 获取部门PCollection中每个部门的JSON。

像这样:

departments = (p | LoadDepts() 
       | 'key_dept' >> beam.Map(lambda dept: (dept['dept_id'], dept))) 

deps_si = beam.pvalue.AsDict(departments) 

employees = (p | LoadEmps()) 

def join_emp_dept(employee, dept_dict): 
    return employee.update(dept_dict[employee['dept_id']]) 

joined_dicts = employees | beam.Map(join_dicts, dept_dict=deps_si) 

随着CoGroupByKey,你可以使用的dept_id为一键组中的两个集合。这将导致关键值对的PCollection,其中关键是dept_id,该值是部门的两个迭代器以及该部门中的员工。

​​
+1

请注意,使用侧面输入可能是更好的选择,因为巴勃罗提到部门集合可能比员工集合小。 –

+0

巴勃罗 - 感谢您的回复。你能否提供一个你提到的步骤的班轮解释。由于我在这方面是新手,所以一个小的解释会有所帮助。 –

+0

我已经添加了一个解释。另外,我移动了侧面输入解决方案,以便您首先考虑这一点。 – Pablo

相关问题