-2

我在使用Apache Spark处理一组规则时遇到的一个用例需要帮助。需要使用Apache Spark根据一组规则过滤记录时需要帮助

由于实际数据有太多的领域,例如,你可以把数据如下图所示(为简单起见给出JSON格式的数据),

records : [{ 
      "recordId": 1, 
      "messages": [{"name": "Tom","city": "Mumbai"}, 
         {"name": "Jhon","address": "Chicago"}, .....] 
      },....] 

rules : [{ 
      ruleId: 1, 
      ruleName: "rule1", 
      criterias: { 
          name: "xyz", 
          address: "Chicago, Boston" 
         } 
      }, ....] 

我要匹配所有规则的所有记录。这里是伪代码:

var matchedRecords = [] 
for(record <- records) 
    for(rule <- rules) 
     for(message <- record.message) 
      if(!isMatch(message, rule.criterias)) 
       break; 
     if(allMessagesMatched) // If loop completed without break 
      matchedRecords.put((record.id, ruleId)) 



def isMatch(message, criteria) = 
      for(each field in crieteria) 
       if(field.value contains comma) 
        if(! message.field containsAny field.value) 
         return false 
       else if(!message.field equals field.value) // value doesnt contain comma 
        return false 
      return true // if loop completed that means all criterias are matched 

有成千上万的记录包含成千上万的消息,并且有这样的规则hundreads。

解决此类问题的方法有哪些?任何特定的模块都会有帮助,如(SparkSQL,Spark Mlib,Spark GraphX)?我需要使用任何第三方库吗?

方法1:

  • 有无列表[规则] & RDD [记录]

  • 广播列表[规则]因为他们人数少。

  • 将每条记录与所有规则进行匹配。

仍然在这种情况下没有parallize计算发生匹配每条消息与标准。

回答

0

我认为你的建议方法是很好的方向。如果非要解决这个任务,我会从实现通用特质与负责匹配方法入手:

trait FilterRule extends Serializable { 
    def match(record: Record): Boolean 
} 

然后我会实现特定的过滤器如:

class EqualsRule extends FilterRule 
class RegexRule extends FilterRule 

然后,我将实现复合过滤器如:

class AndRule extends FilterRule 
class OrRule extends FilterRule 
... 

然后您可以筛选RDD或数据集有:

// constructing rule - in reality reading json from configuration, parsing json and creating FilterRule object 
val rule = AndRule(EqualsRule(...), EqualsRule(...), ...) 

// applying rule 
rdd.filter(record => rule.match(r)) 

第二种方法是尝试使用现有的Spark SQL函数和DataFrame进行过滤,您可以在其中使用和或多列构建相当复杂的表达式。这种方法的缺点是它不是安全的,单元测试会更复杂。