
Spark Streaming write-ahead logs do not replay data after restart

To test the Spark Streaming write-ahead log I created a very simple custom input receiver, which generates strings and has a simple way of storing them:

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

import scala.collection.mutable.ArrayBuffer

class InMemoryStringReceiver extends Receiver[String](StorageLevel.MEMORY_AND_DISK_SER) {

  val batchID = System.currentTimeMillis()

  def onStart() {
    new Thread("InMemoryStringReceiver") {
      override def run(): Unit = {
        var i = 0
        while (true) {
          // http://spark.apache.org/docs/latest/streaming-custom-receivers.html
          // To implement a reliable receiver, you have to use store(multiple-records) to store data.
          store(ArrayBuffer(s"$batchID-$i"))
          println(s"Stored => [$batchID-$i)]")
          Thread.sleep(1000L)
          i = i + 1
        }
      }
    }.start()
  }

  def onStop() {}
}
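The doc comment above matters: per the linked Spark docs, the multi-record variant of store() blocks until the records have been stored inside Spark (and written to the WAL when it is enabled). For an in-memory generator there is nothing to acknowledge, but against a real replayable source the reliable pattern looks roughly like the following sketch, where AckableSource is a hypothetical stand-in for a queue or log that re-delivers records until they are acked:

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

import scala.collection.mutable.ArrayBuffer

// Hypothetical source that re-delivers records until they are acknowledged.
trait AckableSource extends Serializable {
  def nextBatch(): Seq[String]
  def ack(): Unit
}

class ReliableStringReceiver(source: AckableSource)
    extends Receiver[String](StorageLevel.MEMORY_AND_DISK_SER) {

  def onStart(): Unit = {
    new Thread("ReliableStringReceiver") {
      override def run(): Unit = {
        while (!isStopped()) {
          // store(multiple-records) blocks until Spark has stored the batch,
          // so acknowledging the source afterwards cannot lose data.
          store(ArrayBuffer(source.nextBatch(): _*))
          source.ack()
        }
      }
    }.start()
  }

  def onStop(): Unit = {}
}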

Then I created a simple application that uses the custom receiver to stream the data and process it:

import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.ReceiverInputDStream

object DStreamResilienceTest extends App {

  val conf = new SparkConf()
    .setMaster("local[*]")
    .setAppName("DStreamResilienceTest")
    .set("spark.streaming.receiver.writeAheadLog.enable", "true")
  val ssc = new StreamingContext(conf, Seconds(1))
  ssc.checkpoint("hdfs://myhdfsserver/user/spark/checkpoint/DStreamResilienceTest")
  val customReceiverStream: ReceiverInputDStream[String] = ssc.receiverStream(new InMemoryStringReceiver())
  customReceiverStream.foreachRDD { (rdd: RDD[String]) =>
    println(s"processed => [${rdd.collect().toList}]")
    Thread.sleep(2000L)
  }
  ssc.start()
  ssc.awaitTermination()

}

As you can see, processing each received RDD includes a 2-second sleep while strings are stored every second, so a backlog builds up: new strings pile up and should be stored in the WAL. Indeed, I can see the files in the checkpoint directory being updated. Running the app I get output like this:

[info] Stored => [1453374654941-0)] 
[info] processed => [List(1453374654941-0)] 
[info] Stored => [1453374654941-1)] 
[info] Stored => [1453374654941-2)] 
[info] processed => [List(1453374654941-1)] 
[info] Stored => [1453374654941-3)] 
[info] Stored => [1453374654941-4)] 
[info] processed => [List(1453374654941-2)] 
[info] Stored => [1453374654941-5)] 
[info] Stored => [1453374654941-6)] 
[info] processed => [List(1453374654941-3)] 
[info] Stored => [1453374654941-7)] 
[info] Stored => [1453374654941-8)] 
[info] processed => [List(1453374654941-4)] 
[info] Stored => [1453374654941-9)] 
[info] Stored => [1453374654941-10)] 

As you would expect, storing outpaces processing. So I killed the application and restarted it, this time with the sleep in foreachRDD commented out so that processing could clear any backlog:

[info] Stored => [1453374753946-0)] 
[info] processed => [List(1453374753946-0)] 
[info] Stored => [1453374753946-1)] 
[info] processed => [List(1453374753946-1)] 
[info] Stored => [1453374753946-2)] 
[info] processed => [List(1453374753946-2)] 
[info] Stored => [1453374753946-3)] 
[info] processed => [List(1453374753946-3)] 
[info] Stored => [1453374753946-4)] 
[info] processed => [List(1453374753946-4)] 

As you can see, the new events are processed but none from the earlier run. The old WAL logs are cleared, and I see log messages like the one below, but the old data does not get processed.

INFO WriteAheadLogManager : Recovered 1 write ahead log files from hdfs://myhdfsserver/user/spark/checkpoint/DStreamResilienceTest/receivedData/0 

What am I doing wrong? I am using Spark 1.5.2.

Answer


This was answered by Shixiong (Ryan) Zhu on the Spark Users mailing list.

Using StreamingContext.getOrCreate as he suggested works: on restart the streaming context must be recreated from the checkpoint rather than built from scratch each time.
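For reference, here is a minimal sketch of that restructuring, reusing the classes above (my reading of the suggestion, not code from the answer): all DStream setup moves into a factory function, and StreamingContext.getOrCreate only calls it when no checkpoint exists; otherwise it recreates the context, including the receiver stream, from the checkpoint, which is what allows the WAL-backed data to be replayed.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object DStreamResilienceTest extends App {

  val checkpointDir = "hdfs://myhdfsserver/user/spark/checkpoint/DStreamResilienceTest"

  // Runs only when no checkpoint exists yet; on restart the context is
  // rebuilt from the checkpoint instead, so the WAL data gets replayed.
  def createContext(): StreamingContext = {
    val conf = new SparkConf()
      .setMaster("local[*]")
      .setAppName("DStreamResilienceTest")
      .set("spark.streaming.receiver.writeAheadLog.enable", "true")
    val ssc = new StreamingContext(conf, Seconds(1))
    ssc.checkpoint(checkpointDir)
    ssc.receiverStream(new InMemoryStringReceiver()).foreachRDD { rdd =>
      println(s"processed => [${rdd.collect().toList}]")
    }
    ssc
  }

  val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
  ssc.start()
  ssc.awaitTermination()

}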