2016-03-03 106 views
2

值操作我有这样的RDD:Pyspark:基于类型

[('a', ('H', 1)), ('b', (('H', 41), ('S', 1)))] 

使按键可以有一个tupletuples一个tuple的值。这来自reduceByKey。 我需要执行一个简单的操作:将S的计数除以(H + S)的计数。 当S不存在时,就像在第一个项目的情况下,我将不得不返回0. 问题是从第二个(tuple的两个tuples)中分离第一个案例(单个tuple),以便我知道如何在map中运作。

我将如何进行?

+0

你是如何得到这样的数据呢?它是一种具有特定意义的层次结构吗?如果没有,我会更有意义的执行上游一致的整形器。 Python自3.4以来提供了基本的调度机制,但这些都很浅。 – zero323

回答

1

一般来说它会更有意义,以解决这一问题的上游,但你可以尝试例如是这样的:

from operator import truediv 

def f(vs): 
    try: 
     d = dict(vs) 
    except ValueError: 
     d = dict([vs]) 

    s = sum(d.values()) 
    return truediv(d.get("S", 0), s) if s else float('nan') 

rdd = sc.parallelize([('a', ('H', 1)), ('b', (('H', 41), ('S', 1)))]) 
rdd.mapValues(f).collect() 

## [('a', 0.0), ('b', 0.023809523809523808)] 

另外,如果你不介意的外部依赖性,你可以尝试使用multipledispatch

from multipledispatch import dispatch 

@dispatch(tuple, tuple) 
def f(h, s): 
    try: 
     return truediv(s[1], h[1] + s[1]) 
    except ZeroDivisionError: 
     return float('nan') 

@dispatch(str, int) 
def f(x, y): 
    return 0.0 

rdd.mapValues(lambda args: f(*args)).collect() 
## [('a', 0.0), ('b', 0.023809523809523808)] 
+0

两者之间是否存在性能差异? –

+1

我喜欢调度方法,它确实应该隐藏“isinstance”的丑陋之处。但它需要相对复杂的代码。如果走上一条快乐的道路(不要输入'除了'阻止第一个应该更快)。异常处理相当昂贵,但我希望摊销成本会更高。一般来说,我会真正解决这个问题,并提供可以直接使用的输入。 – zero323