在spark-java程序中,我需要读取一个配置文件并填充一个HashMap,我需要将其作为广播变量发布,以便它可以跨所有datanode使用。Spark程序中的广播变量发布
我需要在将要在datanode中运行的CustomInputFormat类中获取此广播变量的值。如何在我的CustomInputFormat类中指定从特定的广播变量中获取值,因为广播变量是在我的驱动程序中声明的?
我加入一些代码来解释它在更多:
在这种scenario1我使用它在驱动程序本身,即该变量在相同的类中使用:在这里,我可以使用Broadcat.value()方法
> final Broadcast<String[]> signPrefixes =
> sc.broadcast(loadCallSignTable());
> JavaPairRDD<String, Integer> countryContactCounts = contactCounts.mapToPair(
> new PairFunction<Tuple2<String, Integer>, String, Integer>(){
> public Tuple2<String, Integer> call(Tuple2<String, Integer> callSignCount) {
> String sign = callSignCount._1();
> String country = lookupCountry(sign, signPrefixes.value());
> return new Tuple2(country, callSignCount._2());
> }}).reduceByKey(new SumInts());
在方案2中,我将用我的自定义输入格式类的内部广播变量:
驱动程序:
> final JavaSparkContext sc= new
> JavaSparkContext(sConf.setAppName("ParserSpark").setMaster("yarn-cluster"));
> Broadcast<int[]> broadcastVar = sc.broadcast(new int[] {1, 2, 3});
>
> JavaPairRDD<NullWritable, ArrayList<Record>> baseRDD =
> sc.newAPIHadoopFile(args[2], InputFormat.class, NullWritable.class,
> ArrayList.class, conf);
个
InputFormat.class
> public class InputFormat extends FileInputFormat {
>
> @Override public RecordReader<NullWritable, ArrayList<Record>>
> createRecordReader(InputSplit split, TaskAttemptContext context)
> throws IOException, InterruptedException{
> //I want to get the Broadcast Variable Here -- How will I do it
>
> RecordReader reader = new RecordReader(); reader.initialize(split, context); return reader; } @Override
> protected boolean isSplitable(JobContext context, Path file) {
> return false; } }
我需要这个在驱动程序以外的其他Java类中广播值。 – Harisyam
您是否设法在此期间解决这个问题? – Havnar