Tag: apache spark

如何修复java.lang.ClassCastException:无法将scala.collection.immutable.List的实例分配给字段类型scala.collection.Seq?

这个错误一直是最难追踪的。 我不知道发生了什么事。 我在我的位置机器上运行Spark集群。 所以整个火花集群在一个主机127.0.0.1 ,我在一个独立模式下运行 JavaPairRDD<byte[], Iterable> cassandraRowsRDD= javaFunctions(sc).cassandraTable(“test”, “hello” ) .select(“rowkey”, “col1”, “col2”, “col3”, ) .spanBy(new Function() { @Override public byte[] call(CassandraRow v1) { return v1.getBytes(“rowkey”).array(); } }, byte[].class); Iterable<Tuple2<byte[], Iterable>> listOftuples = cassandraRowsRDD.collect(); //ERROR HAPPENS HERE Tuple2<byte[], Iterable> tuple = listOftuples.iterator().next(); byte[] partitionKey = tuple._1(); for(CassandraRow cassandraRow: tuple._2()) { System.out.println(“************START************”); System.out.println(new String(partitionKey)); System.out.println(“************END************”); […]

如何使用纯Java生成Parquet文件(包括日期和小数类型)并上传到S3 (无HDFS)

我最近有一个要求,我需要生成Parquet文件,Apache Spark只能使用Java读取(不使用其他软件安装,如:Apache Drill,Hive,Spark等)。 这些文件需要保存到S3,因此我将分享有关如何执行这两项操作的详细信息。 关于如何做到这一点没有简单的指南。 我也不是Java程序员,因此使用Maven,Hadoop等的概念对我来说都是陌生的。 所以我花了将近两周的时间来完成这项工作。 我想在下面分享我的个人指南,了解我是如何实现这一目标的

使用Apache Spark将RDD写为文本文件

我正在探索Spark进行批处理。 我正在使用独立模式在本地计算机上运行spark。 我试图使用saveTextFile()方法将Spark RDD转换为单个文件[最终输出],但它不起作用。 例如,如果我有多个分区,我们可以将一个文件作为最终输出。 更新: 我尝试了以下方法,但我得到空指针exception。 person.coalesce(1).toJavaRDD().saveAsTextFile(“C://Java_All//output”); person.repartition(1).toJavaRDD().saveAsTextFile(“C://Java_All//output”); 例外是: 15/06/23 18:25:27 INFO Executor: Running task 0.0 in stage 1.0 (TID 1) 15/06/23 18:25:27 INFO deprecation: mapred.output.dir is deprecated. Instead, use mapreduce.output.fileoutputformat.outputdir 15/06/23 18:25:27 INFO deprecation: mapred.output.key.class is deprecated. Instead, use mapreduce.job.output.key.class 15/06/23 18:25:27 INFO deprecation: mapred.output.value.class is deprecated. Instead, use mapreduce.job.output.value.class 15/06/23 18:25:27 INFO […]

Scala错误:无法在Scala IDE和Eclipse中找到或加载主类

这是我的问题,我知道有很多类似问题的答案,但是在我尝试之后它们都没有奏效。 我正在使用Scala IDE 4.6和eclipse Oxygen运行代码,所有这些错误都失败了。 这是我的scala编译器配置: 这是我的运行配置: 这是我在控制台中显示的代码,文件结构和错误: 以下是问题控制台的信息: 从在线答案,我已经尝试在构建之前清理项目,我也尝试了所有版本的JVM和Scala编译器,所有这些都没有帮助。 代码是直接从在线课程代码导入的,所以我相信代码中不应该有任何错误。

Avro Schema引发StructType

这实际上与我之前的问题相同,但使用Avro而不是JSON作为数据格式。 我正在使用Sparkdataframe,它可以从几个不同的模式版本之一加载数据: // Version One {“namespace”: “com.example.avro”, “type”: “record”, “name”: “MeObject”, “fields”: [ {“name”: “A”, “type”: [“null”, “int”], “default”: null} ] } // Version Two {“namespace”: “com.example.avro”, “type”: “record”, “name”: “MeObject”, “fields”: [ {“name”: “A”, “type”: [“null”, “int”], “default”: null}, {“name”: “B”, “type”: [“null”, “int”], “default”: null} ] } 我正在使用Spark Avro加载数据。 DataFrame df = context.read() […]

如何在YARN Spark作业中设置环境变量?

我试图通过使用带有newAPIHadoopRDD的AccumuloInputFormat从Apache Spark作业(用Java编写)访问Accumulo 1.6 。 为了做到这一点,我必须通过调用setZooKeeperInstance方法告诉AccumuloInputFormat在哪里找到ZooKeeper。 此方法采用ClientConfiguration对象,该对象指定各种相关属性。 我正在通过调用静态loadDefault方法创建我的ClientConfiguration对象。 此方法应该在client.conf文件的各个位置查看以从中加载其默认值。 应该看的其中一个地方是$ACCUMULO_CONF_DIR/client.conf 。 因此,我试图设置ACCUMULO_CONF_DIR环境变量,使其在Spark运行作业时可见(作为参考,我试图在yarn-cluster部署模式下运行)。 我还没有找到成功的方法。 到目前为止,我已经尝试过: 在setExecutorEnv(“ACCUMULO_CONF_DIR”, “/etc/accumulo/conf”)调用setExecutorEnv(“ACCUMULO_CONF_DIR”, “/etc/accumulo/conf”) 在spark-env.sh导出spark-env.sh 在spark-defaults.conf设置spark.executorEnv.ACCUMULO_CONF_DIR 他们都没有工作。 当我在调用setZooKeeperInstance之前打印环境时,不会出现ACCUMULO_CONF_DIR 。 如果它是相关的,我正在使用CDH5版本的所有东西。 这是我正在尝试做的一个例子(为简洁而省略了导入和exception处理): public class MySparkJob { public static void main(String[] args) { SparkConf sparkConf = new SparkConf(); sparkConf.setAppName(“MySparkJob”); sparkConf.setExecutorEnv(“ACcUMULO_CONF_DIR”, “/etc/accumulo/conf”); JavaSparkContext sc = new JavaSparkContext(sparkConf); Job accumuloJob = Job.getInstance(sc.hadoopConfiguration()); // Foreach loop to print […]

Apache Spark Kinesis示例不起作用

我正在尝试运行JavaKinesisWordCountASL示例。 该示例似乎连接到我的Kinesis Stream并从流中获取数据(如下面的日志所示)。 但是,Sparks不会调用示例中传递给unionStreams.flatMap方法的调用函数,也不会打印任何wordcount。 我尝试使用Java 8和Java 7运行。我在ubuntu实例上运行它。 同样的例子适用于我的macbook。 14/11/15 01:59:42 INFO scheduler.ReceiverTracker:Stream 1收到0个块14/11/15 01:59:42 INFO storage.MemoryStore:ensureFreeSpace(264)调用curMem = 3512,maxMem = 938244833 14 / 11/15 01:59:42 INFO storage.MemoryStore:阻止输入-0-1416016781800存储为内存中的值(估计大小264.0 B,免费894.8 MB)14/11/15 01:59:42 INFO storage.BlockManagerInfo:已添加ip-10-80-91-13.ec2.internal内存输入0-1416016781800:39149(大小:264.0 B,免费:894.8 MB)14/11/15 01:59:42 INFO storage.BlockManagerMaster:更新块输入信息-0-1416016781800 14/11/15 01:59:42 INFO scheduler.JobScheduler:已添加作业时间1416016782000 ms 14/11/15 01:59:42 INFO network.SendingConnection:启动与[ip-的连接10-80-91-13.ec2.internal / 10.80.91.13:39149] 14/11/15 01:59:42 INFO network.SendingConnection:已连接到[ip-10-80-91-13.ec2.internal / 10.80.91.13:39149],1条消息待定14/11/15 01:59:42 INFO […]

不断增加YARN中Spark应用程序的物理内存

我在YARN中运行Spark应用程序,有两个执行程序,Xms / Xmx为32 GB,spark.yarn.excutor.memoryOverhead为6 GB。 我看到应用程序的物理内存不断增加,最终被节点管理器杀死: 2015-07-25 15:07:05,354 WARN org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl: Container [pid=10508,containerID=container_1437828324746_0002_01_000003] is running beyond physical memory limits. Current usage: 38.0 GB of 38 GB physical memory used; 39.5 GB of 152 GB virtual memory used. Killing container. Dump of the process-tree for container_1437828324746_0002_01_000003 : |- PID PPID PGRPID SESSID CMD_NAME USER_MODE_TIME(MILLIS) SYSTEM_TIME(MILLIS) VMEM_USAGE(BYTES) RSSMEM_USAGE(PAGES) […]

如何从spark设置和获取静态变量?

我有一个class级: public class Test { private static String name; public static String getName() { return name; } public static void setName(String name) { Test.name = name; } public static void print() { System.out.println(name); } } 在我的Spark驱动程序中,我正在设置这样的名称并调用print()命令: public final class TestDriver{ public static void main(String[] args) throws Exception { SparkConf sparkConf = new SparkConf().setAppName(“TestApp”); // … […]

在封闭范围内定义的局部变量日志必须是最终的或有效的最终

我是lambda和Java8的新手。 我正面临以下错误。 在封闭范围内定义的局部变量日志必须是最终的或有效的最终 public JavaRDD modify(JavaRDD filteredRdd) { filteredRdd.map(log -> { placeHolder.forEach(text -> { //error comes here log = log.replace(text, “,”); }); return log; }); return null; }