2017-05-28 73 views
1

How do I remove records whose product price is null in PySpark? I want to sort the products by product price. Here is the code:

products = sc.textFile("/user/cloudera/sqoop_import/products") 

Get the product category ID from the second column:

productsMap = products.map(lambda rec: (rec.split(",")[1], rec)) 

Group the product records by category ID:

productsGroupBy = productsMap.groupByKey() 

Sort by product price, converting it from string to float:

for i in productsGroupBy.map(lambda rec: sorted(rec[1], key=lambda k: float(k.split(",")[4]))).collect(): print(i) 

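The float conversion fails for a few records whose product price is empty. Is there a way to remove the records that have an empty value in this particular field? The error log is below: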

17/05/28 00:48:25 ERROR Executor: Exception in task 2.0 in stage 1.0 (TID 6)
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/hdp/2.5.0.0-1245/spark/python/lib/pyspark.zip/pyspark/worker.py", line 111, in main
    process()
  File "/usr/hdp/2.5.0.0-1245/spark/python/lib/pyspark.zip/pyspark/worker.py", line 106, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/usr/hdp/2.5.0.0-1245/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 263, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "", line 2, in
  File "", line 2, in
ValueError: could not convert string to float:
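The `ValueError` at the end of this log is what Python's `float()` raises for an empty string. Below is a minimal sketch of a predicate that could be applied before grouping, assuming the price sits at index 4 of the comma-split record as in the question's code. The name `has_valid_price` and the two sample records are made up for illustration.

```python
def has_valid_price(rec, price_index=4):
    """Return True if the field at price_index parses as a float.

    Hypothetical helper; price_index=4 follows the question's code.
    """
    fields = rec.split(",")
    if len(fields) <= price_index:
        # Record is too short to contain a price field at all
        return False
    try:
        float(fields[price_index])
        return True
    except ValueError:
        # float("") and float("abc") both raise ValueError
        return False


# Made-up sample records, not taken from the real data set
good = "1,2,Some product,,59.98,http://example.com/a"
bad = "3,2,Other product,,,http://example.com/b"

print(has_valid_price(good))  # True
print(has_valid_price(bad))   # False
```

With an RDD of text lines, this would presumably be used as `products.filter(has_valid_price)` ahead of the `map` and `groupByKey` steps.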

+0

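Have you tried something like `productsGroupBy = productsMap.filter(lambda line: yourProductPriceColumn != "Null").groupByKey()` before entering the *for loop*? (Otherwise, this filter can also be applied to *productsGroupBy*.) Since you imported the values as strings, this filter gives you an RDD without the rows whose productPrice is "Null", so you can then convert the strings to float. – titiro89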

+0

Thanks a lot @titiro89!! I ran the code below and got the following result: `for i in productsMap.filter(lambda line: line.split(",")[4] == "").collect(): print(i)` ... **685,31,TaylorMade SLDR Irons - (Steel) 4-PW, AW,,899.99,http://images.acmesports.sports/TaylorMade+SLDR+Irons+-+%28Steel%29+4-PW%2C+AW** –

+0

Here, product_price has shifted to index 5 instead of index 4. So records like this should be skipped! –
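The shift happens because the product name in that record itself contains a comma, so a plain `split(",")` yields one extra field. Here is a small sketch using the record quoted in the comment above. The expected column count of 6 is an assumption inferred from that sample (one comma too many), and `is_well_formed` is a hypothetical helper name.

```python
# Assumption: a well-formed record has 6 comma-separated fields
EXPECTED_FIELDS = 6

rec = ("685,31,TaylorMade SLDR Irons - (Steel) 4-PW, AW,,899.99,"
       "http://images.acmesports.sports/"
       "TaylorMade+SLDR+Irons+-+%28Steel%29+4-PW%2C+AW")


def is_well_formed(record, expected_fields=EXPECTED_FIELDS):
    """Return True if the record splits into the expected number of fields."""
    return len(record.split(",")) == expected_fields


fields = rec.split(",")
print(len(fields))          # 7: the comma in the name adds a field
print(repr(fields[4]))      # '' (the description slot, shifted into index 4)
print(fields[5])            # 899.99
print(is_well_formed(rec))  # False
```

Filtering with `products.filter(is_well_formed)` would skip such records entirely. Checking only that index 4 is non-empty also drops this particular record, but would not catch a shifted record whose index 4 happened to hold text.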

Answers

1

We can filter out the products that contain null values, as follows:

Quoting - isNotNull()

# Filter out the products whose price field is empty.
# isNotNull() is a DataFrame Column method; products here is an RDD of text
# lines, so the equivalent check is made on the split price field (index 4).
filteredProducts = products.filter(lambda rec: rec.split(",")[4] != "")
# Create a tuple (categoryId, record)
filteredProductsMap = filteredProducts.map(lambda rec: (rec.split(",")[1], rec))
# Group by categoryId
productsGroupBy = filteredProductsMap.groupByKey()
# Sort each group by product price
for i in productsGroupBy.map(lambda rec: sorted(rec[1], key=lambda k: float(k.split(",")[4]))).collect(): print(i)
+0

@Divyojyoti Sinha - I would appreciate it if you could accept the answer. – Raja