spark-streaming

    1热度

    1回答

    愿意将数据写回kafka的效率最高,我有兴趣使用Akka Stream将我的RDD分区写回到Kafka。 问题是,我需要一种方法来创建每个执行器的actor系统,而不是每个分区,这将是荒谬的。一个JVM上的一个节点上最多可能有8个actorSystems。然而,每个分区有一个Stream是很好的。 有没有人已经这样做? 我的理解是,一个演员系统不能被序列化,因此不能是 发送了具有每个执行者的广播变

    -1热度

    2回答

    我目前正在学习火花流。我试图在创建新文件后立即从目录中的文件中读取数据。实时“文件流”。我收到下面的错误。任何人都可以向我推荐解决方案 import org.apache.spark.SparkContext._ import org.apache.spark.SparkContext import org.apache.spark.SparkConf import org.apache.s

    3热度

    2回答

    DSTREAM提供的union两种类型两种类型工会之间有什么不同: StreamingContext.union(Dstreams) Dstream.union(anotherDstream) 所以我想知道什么是不同的,尤其是在并行性能。

    0热度

    1回答

    我想与Kafka集成一起使用Spark流。我使用Spark 2.0.0版。 但是我得到一个无法解析的依赖错误(“未解析的依赖关系:org.apache.spark#spark-sql-kafka-0-10_2.11; 2.0.0:not found”)。 如何访问此包?或者我做错了什么/失踪? 我build.sbt文件: name := "Spark Streaming" version :=

    0热度

    1回答

    在Spark结构化Streaming中,我想从STRING创建一个StructType。 在下面的例子中,spark读取方法只接受Schema的“Struct Type”,我怎样才能从String创建一个StructType。我想将employeeSchema字符串转换为StructType。 public static void main(String[] args) throws Analys

    1热度

    4回答

    我正在使用Scala中的Spark来消费和处理卡夫卡消费者应用程序中的消息。有时,处理来自Kafka消息队列的消息比平时花费更多的时间。那时候我需要消费最新的信息,而忽略那些已经由制片人发行但尚未消费的早期信息。 这里是我的消费者代码: object KafkaSparkConsumer extends MessageProcessor { def main(args: scala.Array

    1热度

    1回答

    因此,我有一个Apache Spark流,每20分钟一天一小时地写入S3 parquet文件分区。 看来,每个批处理在写入之前,都会在此表(/根文件夹)名称的所有文件夹上执行“ls”和“head”。 由于我们有多个天X 24小时X个不同的表,这会导致总体上相对较高的S3成本。 请注意我们的模式正在动态改变。 所以我的问题是: 它是正确的,因为在写递归读取所有实木复合地板的头上? 为什么流不会缓存这

    0热度

    2回答

    我试图整合火花和卡夫卡消费来自卡夫卡的消息。我有生产者代码也发送关于“temp”主题的消息。另外,我正在使用Kafka的Console Producer来制作“temp”主题的消息。 我已经创建了下面的代码来使用来自同一个“temp”主题的消息,但它也不会收到单个消息。 计划: import java.util.Arrays; import java.util.Map; import java

    -1热度

    1回答

    我有一个Spark Streaming作业,它使用spark-submit脚本手动运行。我想安排它每天运行。 哪一个更好Azkaban,Airflow,Oozie,Crontab,Spark-jobserver,Quartz或Luigi。 任何建议表示赞赏!

    1热度

    1回答

    我的火花纱线群集被许多用户使用,并且火花历史记录服务器中有大量作业。通过Spark历史记录服务器查找我的工作需要很多时间。我无法找到任何选项来通过用户ID在火花wiki here上筛选作业。 我想知道,有什么方法可以选择特定用户提交的作业列表吗?或在特定的时间窗口?谢谢。