2015-02-09 72 views
5

我们能够成功地整合Drools的火花,当我们尝试应用从Drools的规则,我们能够为批处理文件HICH做的是目前在HDFS,但我们试图用滴料的流媒体文件这样我们就可以做出决定瞬间,但我们无法弄清楚如何做it.Below是我们正在努力实现的代码snipet。Drools的火花的流文件

SparkConf conf = new SparkConf().setAppName("sample"); 
    JavaSparkContext sc = new JavaSparkContext(conf); 

    JavaRDD<String> javaRDD = sc.textFile("/user/root/spark/sample.dat"); 
    List<String> store = new ArrayList<String>(); 
    store = javaRDD.collect(); 

第二种情况,当我们使用流上下文

SparkConf sparkconf = new SparkConf().setAppName("sparkstreaming"); 
    JavaStreamingContext ssc = 
       new JavaStreamingContext(sparkconf, new Duration(1)); 

    JavaDStream<String> lines = ssc.socketTextStream("xx.xx.xx.xx", xxxx); 

在第一种情况下,我们能够适用于可变商店我们的规则,而在第二种情况下,我们无法对应用规则dstream线。如果有人有一些想法,我们如何能得到很大的帮助。提前致谢。

+0

'/ user/root'?呃 – fge 2015-02-09 10:38:12

+0

例如我已经给用户/根/,原来的将是用户/ vish /火花/ sample.dat – beginner 2015-02-09 10:40:52

+0

难道@奎师那gajula的回答工作? – gijswijs 2015-05-22 11:55:01

回答

1

下面是完成它的一种方式。

  1. 创建您的知识会议与业务规则第一。

    //Create knowledge and session here 
    KnowledgeBase kbase = KnowledgeBaseFactory.newKnowledgeBase(); 
    KnowledgeBuilder kbuilder = KnowledgeBuilderFactory.newKnowledgeBuilder(); 
    kbuilder.add(ResourceFactory.newFileResource("rulefile.drl"), 
         ResourceType.DRL); 
    Collection<KnowledgePackage> pkgs = kbuilder.getKnowledgePackages(); 
    kbase.addKnowledgePackages(pkgs); 
    final StatelessKnowledgeSession ksession = kbase.newStatelessKnowledgeSession(); 
    
  2. 使用StreamingContext创建JavaDStream。

    SparkConf sparkconf = new SparkConf().setAppName("sparkstreaming"); 
    JavaStreamingContext ssc = 
          new JavaStreamingContext(sparkconf, new Duration(1)); 
    JavaDStream<String> lines = ssc.socketTextStream("xx.xx.xx.xx", xxxx); 
    
  3. 调用DStream的foreachRDD来创建事实并激发您的规则。

    lines.foreachRDD(new Function<JavaRDD<String>, Void>() { 
        @Override 
        public Void call(JavaRDD<String> rdd) throws Exception { 
        List<String> facts = rdd.collect(); 
        //Apply rules on facts here 
        ksession.execute(facts); 
        return null; 
        } 
    });