回答
有测试星火RDD /应用2种方法。它们分别是:
例如:
股,以测试:
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
class WordCount {
def get(url: String, sc: SparkContext): RDD[(String, Int)] = {
val lines = sc.textFile(url) lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
}
}
现在方法1来测试其计算方法如下:
import org.scalatest.{ BeforeAndAfterAll, FunSuite }
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
class WordCountTest extends FunSuite with BeforeAndAfterAll {
private var sparkConf: SparkConf = _
private var sc: SparkContext = _
override def beforeAll() {
sparkConf = new SparkConf().setAppName("unit-testing").setMaster("local")
sc = new SparkContext(sparkConf)
}
private val wordCount = new WordCount
test("get word count rdd") {
val result = wordCount.get("file.txt", sc)
assert(result.take(10).length === 10)
}
override def afterAll() {
sc.stop()
}
}
在方法1中,我们不是在嘲笑RDD。我们只是检查我们的WordCount
类的行为。但是我们必须自己管理SparkContext的创建和销毁。所以,如果你不想写额外的代码,那么你可以使用spark-testing-base,像这样:
方法2:
import org.scalatest.FunSuite
import com.holdenkarau.spark.testing.SharedSparkContext
class WordCountTest extends FunSuite with SharedSparkContext {
private val wordCount = new WordCount
test("get word count rdd") {
val result = wordCount.get("file.txt", sc)
assert(result.take(10).length === 10)
}
}
或者
import org.scalatest.FunSuite
import com.holdenkarau.spark.testing.SharedSparkContext
import com.holdenkarau.spark.testing.RDDComparisons
class WordCountTest extends FunSuite with SharedSparkContext {
private val wordCount = new WordCount
test("get word count rdd with comparison") {
val expected = sc.textFile("file.txt")
.flatMap(_.split(" "))
.map((_, 1))
.reduceByKey(_ + _)
val result = wordCount.get("file.txt", sc)
assert(RDDComparisons.compare(expected, result).isEmpty)
}
}
对于在星火RDD测试更多详情,请参阅本 - KnolX: Unit Testing of Spark Applications
我只是一个小程序,所以我试图使用方法:1,但是你(himanshu)在方法1中显示的是没有比较RDD。您正在对该RDD执行操作,然后尝试将其与整数值相等。我想要比较2个RDD ... 可以说RDD [myClass] === RDD [myClass] – AJm
为了比较RDD(s),您应该使用方法2中提到的RDDComparisons。 – himanshuIIITian
但是,使用由Some开发的定制库,它仍在开发中,而不是像Apache这样的大伞。它也可能不是生产准备好的。 – AJm
谢谢你把这个突出的问题放在那里。出于某种原因,当谈到Spark时,每个人都会深深陷入分析中,忘记过去15年左右出现的伟大软件工程实践。这就是为什么我们要在我们的课程中讨论测试和持续集成(其中包括DevOps等)。
的快速除术语
我去之前,我必须表达对KnolX呈现@himanshuIIITian举了一个小的分歧。一个真正单位测试意味着你有过在测试每个组件的完全控制。不能与数据库,REST调用,文件系统甚至系统时钟交互; Gerard Mezaros在xUnit Test Patterns中提到的所有内容都必须“加倍”(例如嘲笑,残片等)。我知道这看起来像语义,但它真的很重要。未能理解这是你在持续集成中看到间歇性测试失败的一个主要原因。
我们仍然可以单元测试
所以给出这样的认识,单元测试的RDD
是不可能的。但是,开发分析时仍然有单元测试的地方。
(注:我将使用斯卡拉的例子,但其概念超越了语言和框架。)
考虑一个简单的操作:
rdd.map(foo).map(bar)
这里foo
和bar
是简单的功能。这些可以通过正常的方式进行单元测试,并且应该尽可能多地使用角落案例。毕竟,他们为什么关心他们从哪里获得他们的投入,无论是测试夹具还是RDD
?
不要忘记星火壳牌
这不是测试本身,但在这些早期阶段,你也应该在Spark外壳可以尝试找出你的变换,尤其是你的方法的后果。例如,您可以检查物理和逻辑查询计划,分区策略和保留以及具有许多不同功能的数据状态,如toDebugString
,explain
,glom
,show
, printSchema
等。我会让你探索那些。
您还可以在Spark shell和您的测试中将您的主设置为local[2]
,以识别一旦您开始分发工作时可能出现的任何问题。
集成测试与星火
现在到了有趣的东西。
为了集成测试星火你觉得在你的助手职能的质量和RDD
/DataFrame
转换逻辑信心后,关键是做了几件事情(无论构建工具和测试框架):
- 增加JVM内存。
- 启用分叉但禁用并行执行。
- 使用您的测试框架将Spark集成测试累积到套件中,并在所有测试之前初始化
SparkContext
,并在所有测试之后停止测试。
有几种方法可以做到这一点。其中一个可从spark-testing-base获得@Pushkr和由@himanshuIIITian链接的KnolX演示文稿。
贷款模式
另一种方法是使用Loan Pattern。
例如(使用ScalaTest):
class MySpec extends WordSpec with Matchers with SparkContextSetup {
"My analytics" should {
"calculate the right thing" in withSparkContext { (sparkContext) =>
val data = Seq(...)
val rdd = sparkContext.parallelize(data)
val total = rdd.map(...).filter(...).map(...).reduce(_ + _)
total shouldBe 1000
}
}
}
trait SparkContextSetup {
def withSparkContext(testMethod: (SparkContext) => Any) {
val conf = new SparkConf()
.setMaster("local")
.setAppName("Spark test")
val sparkContext = new SparkContext(conf)
try {
testMethod(sparkContext)
}
finally sparkContext.stop()
}
}
正如你所看到的,在贷款模式利用了高阶函数“贷款”的SparkContext
到测试再到销毁,它的后完成。
面向受苦编程(谢谢,内森)
这是完全喜好的问题,但我更愿意把之前只要我可以使用的贷款模式和电线的东西了自己另一个框架。除了试图保持轻量级之外,框架有时会增加很多“魔术”,使得调试测试失败很难推理。所以我采取了Suffering-Oriented Programming的方法 - 我避免增加一个新的框架,直到没有它的痛苦承受太多。但是,这又取决于你。
现在一个地方火花试验基地真正的亮点是与基于Hadoop的助手一样HDFSClusterLike
和YARNClusterLike
。混合这些特征可以真正为您节省很多设置痛苦。它发光的另一个地方是类似Scalacheck的属性和发电机。但是,再次,我个人会坚持使用它,直到我的分析和测试达到了这种复杂程度。
集成测试与星火流
最后,我只想提出一个什么样的SparkStreaming集成测试设置的内存值看起来像一个片段:
val sparkContext: SparkContext = ...
val data: Seq[(String, String)] = Seq(("a", "1"), ("b", "2"), ("c", "3"))
val rdd: RDD[(String, String)] = sparkContext.parallelize(data)
val strings: mutable.Queue[RDD[(String, String)]] = mutable.Queue.empty[RDD[(String, String)]]
val streamingContext = new StreamingContext(sparkContext, Seconds(1))
val dStream: InputDStream = streamingContext.queueStream(strings)
strings += rdd
这比看起来更简单。它实际上只是将一系列数据转换为队列以供给DStream
。其中大部分实际上只是与Spark API配合使用的样板设置。
这可能是我有史以来最长的帖子,所以我会把它留在这里。我希望别人与其他想法一起合作,通过相同的敏捷软件工程实践帮助提高分析质量,这些实践改进了所有其他应用程序开发。
并为无耻插头道歉,你可以看看我们的课程Analytics with Apache Spark,我们解决了很多这些想法和更多。我们希望很快就有在线版本。
- 1. 在星火RDD
- 2. 星火RDD容错
- 3. 星火RDD不Elasticsearch
- 4. 如何转的RDD在星火
- 5. 星火RDD:设置差异
- 6. 星火RDD写入HBase的
- 7. 星火RDD外部存储
- 8. 星火:保存RDD在HDFS
- 9. 星火多维RDD分区
- 10. 星火单元测试
- 11. 星火:RDD(按键,列表)来RDD(键,值)的扩展
- 12. 星火单元测试编译错误使用星火试验基地
- 13. 星火发动机条件测试
- 14. 如何星火
- 15. 如何星火预测值ML
- 16. 筛选火花RDD
- 17. 火花:RDD列出
- 18. 问题在星火壳尝试例如
- 19. 星火1.5.1,斯卡拉2.10.5:如何扩大的RDD [数组[字符串],矢量]
- 20. 如何使用星火UDF
- 21. 星火HashingTF如何工作
- 22. 星火:如何加快rdd.count()
- 23. 如何在Apache的星火
- 24. 如何处理在星火
- 25. 阿帕奇星火:参考指针父RDD
- 26. 为什么不星火RDD支持 “扁平化”
- 27. 删除的RDD的第一个和最后一行星火
- 28. 星火:按多个值的RDD在一个元组/列
- 29. 星火 - 包裹任意RDD与使用ShuffledRDD
- 30. 排序分区中,但没有使用星火RDD
你看过Holden的[spark-test-base](https://github.com/holdenk/spark-testing-base)了吗? – Pushkr