Tag: mapreduce

为什么LongWritable(key)没有在Mapper类中使用?

制图员: Mapper类是generics类型,有四个forms类型参数,用于指定map函数的输入键,输入值,输出键和输出值类型 public class MaxTemperatureMapper extends Mapper { private static final int MISSING = 9999; @Override public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String line = value.toString(); String year = line.substring(15, 19); int airTemperature; if (line.charAt(87) == ‘+’) { // parseInt doesn’t like leading plus signs airTemperature = Integer.parseInt(line.substring(88, 92)); […]

使用MapReduce中的globStatus过滤输入文件

我有很多输入文件,我想根据最后附加的日期处理选定的文件。 我现在很困惑我在哪里使用globStatus方法来过滤掉文件。 我有一个自定义的RecordReader类,我试图在其下一个方法中使用globStatus,但它没有成功。 public boolean next(Text key, Text value) throws IOException { Path filePath = fileSplit.getPath(); if (!processed) { key.set(filePath.getName()); byte[] contents = new byte[(int) fileSplit.getLength()]; value.clear(); FileSystem fs = filePath.getFileSystem(conf); fs.globStatus(new Path(“/*” + date)); FSDataInputStream in = null; try { in = fs.open(filePath); IOUtils.readFully(in, contents, 0, contents.length); value.set(contents, 0, contents.length); } finally { IOUtils.closeStream(in); […]

如何在Hadoop中使用CompressionCodec

我正在做以下操作从reducer压缩o / p文件: OutputStream out = ipFs.create( new Path( opDir + “/” + fileName ) ); CompressionCodec codec = new GzipCodec(); OutputStream cs = codec.createOutputStream( out ); BufferedWriter cout = new BufferedWriter( new OutputStreamWriter( cs ) ); cout.write( … ) 但是在第3行得到了空指针exception: java.lang.NullPointerException at org.apache.hadoop.io.compress.zlib.ZlibFactory.isNativeZlibLoaded(ZlibFactory.java:63) at org.apache.hadoop.io.compress.GzipCodec.createOutputStream(GzipCodec.java:92) at myFile$myReduce.reduce(myFile.java:354) 我也跟着JIRA一样。 你能否建议我做错了什么?

运行hbase MR作业时,我的cdh5.2集群会出现FileNotFoundException

我的cdh5.2群集运行hbase MR作业时出现问题。 例如,我将hbase类路径添加到hadoop类路径中: vi /etc/hadoop/conf/hadoop-env.sh 添加行: export HADOOP_CLASSPATH=”/usr/lib/hbase/bin/hbase classpath:$HADOOP_CLASSPATH” 当我运行时: hadoop jar /usr/lib/hbase/hbase-server-0.98.6-cdh5.2.1.jar rowcounter “mytable” 我得到以下exception: 14/12/09 03:44:02 WARN security.UserGroupInformation: PriviledgedActionException as:root (auth:SIMPLE) cause:java.io.FileNotFoundException: File does not exist: hdfs://clusterName/usr/lib/hbase/lib/hbase-client-0.98.6-cdh5.2.1.jar Exception in thread “main” java.lang.reflect.InvocationTargetException at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.hadoop.hbase.mapreduce.Driver.main(Driver.java:54) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at […]

如何在Hadoop 1.0.4中链接mapper / reducer?

我正在使用Hadoop 1.0.4的“新”API(包org.apache.hadoop.mapreduce中的类)。 当我想链映射器/缩减器时,我发现ChainMapper,ChainReducer是为“旧”API(包org.apache.hadoop.mapred中的类)编写的。 我该怎么办?

Protobuf RPC在Hadoop 2.2.0单节点服务器上不可用?

我正在尝试按照本教程在我安装的本地单节点集群上运行hadoop 2.2.0 mapreduce作业: http : //codesfusion.blogspot.co.at/2013/10/setup-hadoop-2x-220-on- ubuntu.html?m = 1的 虽然在服务器端引发了以下exception: org.apache.hadoop.ipc.RpcNoSuchProtocolException: Unknown protocol: org.apache.hadoop.yarn.api.ApplicationClientProtocolPB at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.getProtocolImpl(ProtobufRpcEngine.java:527) at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:566) at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:928) at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2048) at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2044) at java.security.AccessController.doPrivileged(Native Method) at javax.security.auth.Subject.doAs(Subject.java:415) at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1491) at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2042) 有没有办法让我配置Protobuf RPC在服务器端可用? 我需要hadoop本机库吗? 或者我可以在客户端以某种方式切换到Writables / Avro RPC吗?

两个相等的组合键不能到达相同的减速器

我正在使用MapReduce框架在Java中创建Hadoop应用程序。 我只使用文本键和输入和输出值。 在减少到最终输出之前,我使用组合器进行额外的计算步骤。 但我有一个问题,键不会去同一个reducer。 我在组合器中创建并添加这样的键/值对: public static class Step4Combiner extends Reducer { private static Text key0 = new Text(); private static Text key1 = new Text(); public void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException { key0.set(“KeyOne”); key1.set(“KeyTwo”); context.write(key0, new Text(“some value”)); context.write(key1, new Text(“some other value”)); } } public static class Step4Reducer […]

Mapreduce作业运行,并且有一个例外

这是我的代码: import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import java.util.Iterator; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.DoubleWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.io.WritableUtils; import org.apache.hadoop.mapred.*; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; public class SecondarySort extends Configured implements Tool{ public static void main(String[] args) { try { ToolRunner.run(new Configuration(), new SecondarySort(), args); } catch (Exception e) […]

在HBase MapReduce任务中加载本机共享库

最近我试图在JNI代码中实现我的算法(使用C ++)。我这样做并生成了一个共享库。 这是我的JNI课程。 public class VideoFeature{ // JNI Code Begin public native static float Match(byte[] testFileBytes, byte[] tempFileBytes); static { System.loadLibrary(“JVideoFeatureMatch”); } // JNI Code End } 在主要function,我写 // MapReduce Configuration conf = HBaseConfiguration.create(); // DistributedCache shared library DistributedCache.createSymlink(conf); // Both following ways seem work. // DistributedCache.addCacheFile(new URI(“/home/danayan/Desktop/libJVideoFeatureMatch.so#JVideoFeatureMatch”), conf); DistributedCache.addCacheFile(new URI(“hdfs://danayan-pc:9000/lib/libJVideoFeatureMatch.so#libJVideoFeatureMatch.so”), conf); 在map方法中,代码跟随工作。 public static […]

将对象副本传递给hadoop中所有映射器的最佳实践

您好我正在学习Map Reduce,我正在尝试使用hadoop 1.0.4构建一个小作业。 我有一个停用词列表和一个模式列表。 在我的文件映射之前,我想在一个有效的数据结构(如地图)中加载stoppwords。 我还想从我的模式列表中构建一个正则表达式模式。 由于这些是串行任务,我想在映射前执行​​它们,并将每个映射器的副本传递给它们可以读/写的对象。 我想在我的驱动程序类中只使用一个带有getter的静态变量,但是将java调用对象作为指针原则这不起作用。 我当然可以在传递之前克隆对象,但这似乎不是一个好习惯。 我读了一些关于分布式缓存的东西,但据我所知,它只用于文件而不是对象,而且我可以让每个映射器读取stopp word / pattern文件。 谢谢你的帮助!