Tag: pyspark

实现java UDF并从pyspark调用它

我需要创建一个在pyspark python中使用的UDF,它使用java对象进行内部计算。 如果它是一个简单的python我会做类似的事情: def f(x): return 7 fudf = pyspark.sql.functions.udf(f,pyspark.sql.types.IntegerType()) 并使用以下方式调用: df = sqlContext.range(0,5) df2 = df.withColumn(“a”,fudf(df.id)).show() 但是,我需要的函数的实现是在java而不是在python中。 我需要以某种方式包装它,所以我可以从python中以类似的方式调用它。 我的第一个尝试是实现java对象,然后将其包装在pyspark中的python中并将其转换为UDF。 因序列化错误而失败。 Java代码: package com.test1.test2; public class TestClass1 { Integer internalVal; public TestClass1(Integer val1) { internalVal = val1; } public Integer do_something(Integer val) { return internalVal; } } pyspark代码: from py4j.java_gateway import java_import from pyspark.sql.functions import […]

在PySpark中运行自定义Java类

我正在尝试在PySpark中运行自定义HDFS阅读器类。 这个类是用Java编写的,我需要从PySpark访问它,无论是从shell还是使用spark-submit。 在PySpark中,我从SparkContext( sc._gateway )中检索sc._gateway 。 说我有一节课: package org.foo.module public class Foo { public int fooMethod() { return 1; } } 我试图将它打包到jar中并使用–jar选项传递给pyspark,然后运行: from py4j.java_gateway import java_import jvm = sc._gateway.jvm java_import(jvm, “org.foo.module.*”) foo = jvm.org.foo.module.Foo() 但我得到错误: Py4JError: Trying to call a package. 有人可以帮忙吗? 谢谢。

PySpark:java.lang.OutofMemoryError:Java堆空间

我最近在我的服务器上使用PySpark与Ipython一起使用24个CPU和32GB RAM。 它只能在一台机器上运行。 在我的过程中,我想收集大量数据,如下面的代码所示: train_dataRDD = (train.map(lambda x:getTagsAndText(x)) .filter(lambda x:x[-1]!=[]) .flatMap(lambda (x,text,tags): [(tag,(x,text)) for tag in tags]) .groupByKey() .mapValues(list)) 当我做 training_data = train_dataRDD.collectAsMap() 它给了我outOfMemory错误。 Java heap Space 。 此外,我在此错误后无法对Spark执行任何操作,因为它失去了与Java的连接。 它给出了Py4JNetworkError: Cannot connect to the java server 。 看起来堆空间很小。 如何将其设置为更大的限制? 编辑 : 我在运行之前尝试过的事情: sc._conf.set(‘spark.executor.memory’,’32g’).set(‘spark.driver.memory’,’32g’).set(‘spark.driver.maxResultsSize’,’0′) 我按照此处的文档更改了spark选项(如果你执行ctrl-f并搜索spark.executor.extraJavaOptions): http ://spark.apache.org/docs/1.2.1/configuration.html 它说我可以通过设置spark.executor.memory选项来避免OOM。 我做了同样的事情,但似乎没有工作。