java.lang.IllegalArgumentException: Wrong FS: , expected: hdfs://localhost:9000

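I am trying to implement a reduce-side join and use a MapFile reader to look up values in the distributed cache, but the lookup returns nothing. When I check stderr it shows the error below. The lookup file already exists in HDFS and appears to be loaded into the cache correctly, as shown in stdout.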

    java.lang.IllegalArgumentException: Wrong FS: file:/app/hadoop/tmp/mapred/local/taskTracker/distcache/-8118663285704962921_-1196516983_170706299/localhost/input/delivery_status/DeliveryStatusCodes/data, expected: hdfs://localhost:9000
        at org.apache.hadoop.fs.FileSystem.checkPath(FileSystem.java:390)
        at org.apache.hadoop.hdfs.DistributedFileSystem.getPathName(DistributedFileSystem.java:140)
        at org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:554)
        at org.apache.hadoop.fs.FileSystem.getLength(FileSystem.java:816)
        at org.apache.hadoop.io.SequenceFile$Reader.<init>(SequenceFile.java:1479)
        at org.apache.hadoop.io.SequenceFile$Reader.<init>(SequenceFile.java:1474)
        at org.apache.hadoop.io.MapFile$Reader.createDataFileReader(MapFile.java:302)
        at org.apache.hadoop.io.MapFile$Reader.open(MapFile.java:284)
        at org.apache.hadoop.io.MapFile$Reader.<init>(MapFile.java:273)
        at org.apache.hadoop.io.MapFile$Reader.<init>(MapFile.java:260)
        at org.apache.hadoop.io.MapFile$Reader.<init>(MapFile.java:253)
        at mr_poc.reducerrsj.initializeDepartmentsMap(reducerrsj.java:59)
        at mr_poc.reducerrsj.setup(reducerrsj.java:42)
        at org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:174)
        at org.apache.hadoop.mapred.ReduceTask.runNewReducer(ReduceTask.java:649)
        at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:418)
        at org.apache.hadoop.mapred.Child$4.run(Child.java:255)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:416)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1190)
        at org.apache.hadoop.mapred.Child.main(Child.java:249)
    java.lang.NullPointerException
        at mr_poc.reducerrsj.buildOutputValue(reducerrsj.java:83)
        at mr_poc.reducerrsj.reduce(reducerrsj.java:127)
        at mr_poc.reducerrsj.reduce(reducerrsj.java:1)
        at org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:177)
        at org.apache.hadoop.mapred.ReduceTask.runNewReducer(ReduceTask.java:649)
        at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:418)
        at org.apache.hadoop.mapred.Child$4.run(Child.java:255)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:416)
        at org.apache.hadoop.security...

Here is my driver code:

    package mr_poc;

    import java.net.URI;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.filecache.DistributedCache;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;

    public class driverrsj extends Configured implements Tool {

        @Override
        public int run(String[] arg) throws Exception {
            if (arg.length != 3) {
                System.out.printf("3 parameters are required for DriverRSJ-    \n");
                return -1;
            }

            Job job = new Job(getConf());
            Configuration conf = job.getConfiguration();
            DistributedCache.addCacheFile(new URI("/input/delivery_status"), conf);
            System.out.println("Cache : " + job.getConfiguration().get("mapred.cache.files"));
            job.setJarByClass(driverrsj.class);
            conf.setInt("cust_info", 1);
            conf.setInt("status", 2);

            StringBuilder inputPaths = new StringBuilder();
            inputPaths.append(arg[0].toString()).append(",").append(arg[1].toString());
            FileInputFormat.setInputPaths(job, inputPaths.toString());
            FileOutputFormat.setOutputPath(job, new Path(arg[2]));

            job.setJarByClass(driverrsj.class);
            job.setMapperClass(mappperRSJ.class);
            job.setReducerClass(reducerrsj.class);
            job.setMapOutputKeyClass(CompositeKeyWritableRSJ.class);
            job.setMapOutputValueClass(Text.class);
            //job.setPartitionerClass(partinonrsj.class);
            job.setSortComparatorClass(secondarysortcomp.class);
            job.setGroupingComparatorClass(GroupingComparatorRSJ.class);
            job.setNumReduceTasks(1);
            job.setOutputKeyClass(NullWritable.class);
            job.setOutputValueClass(Text.class);

            boolean success = job.waitForCompletion(true);
            return success ? 0 : 1;
        }

        public static void main(String[] args) throws Exception {
            int exitCode = ToolRunner.run(new Configuration(), new driverrsj(), args);
            System.exit(exitCode);
        }
    }

Here is my reducer code:

    package mr_poc;

    import java.io.File;
    import java.io.IOException;
    import java.net.URI;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.filecache.DistributedCache;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.MapFile;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;

    public class reducerrsj extends Reducer<CompositeKeyWritableRSJ, Text, NullWritable, Text> {

        StringBuilder reduceValueBuilder = new StringBuilder("");
        NullWritable nullWritableKey = NullWritable.get();
        Text reduceOutputValue = new Text("");
        String strSeparator = ",";
        private MapFile.Reader deptMapReader = null;
        Text txtMapFileLookupKey = new Text();
        Text txtMapFileLookupValue = new Text();
        //Path[] cacheFilesLocal;
        //Path[] eachPath;

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            Path[] cacheFiles = DistributedCache.getLocalCacheFiles(context.getConfiguration());
            for (Path eachPath : cacheFiles) {
                System.out.println(eachPath.toString());
                System.out.println(eachPath.getName());
                if (eachPath.getName().toString().contains("delivery_status")) {
                    URI uriUncompressedFile = new File(eachPath.toString() + "/DeliveryStatusCodes").toURI();
                    initializeDepartmentsMap(uriUncompressedFile, context);
                }
            }
        }

        //@SuppressWarnings("deprecation")
        private void initializeDepartmentsMap(URI uriUncompressedFile, Context context) throws IOException {
            // Initialize the reader of the map file (side data)
            Configuration conf = context.getConfiguration();
            conf.addResource(new Path("/usr/local/hadoop-1.2.1/conf/core-site.xml"));
            FileSystem dfs = FileSystem.get(conf);
            try {
                deptMapReader = new MapFile.Reader(dfs, uriUncompressedFile.toString(), context.getConfiguration());
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

        private StringBuilder buildOutputValue(CompositeKeyWritableRSJ key,
                StringBuilder reduceValueBuilder, Text value) {
            if (key.getsourceindex() == 2) {
                String arrSalAttributes[] = value.toString().split(",");
                txtMapFileLookupKey.set(arrSalAttributes[0].toString());
                System.out.println("key=" + txtMapFileLookupKey);
                try {
                    deptMapReader.get(txtMapFileLookupKey, txtMapFileLookupValue);
                } catch (Exception e) {
                    txtMapFileLookupValue.set("");
                    e.printStackTrace();
                } finally {
                    txtMapFileLookupValue
                            .set((txtMapFileLookupValue.equals(null) || txtMapFileLookupValue
                                    .equals("")) ? "NOT-FOUND" : txtMapFileLookupValue.toString());
                }
                reduceValueBuilder.append(txtMapFileLookupValue.toString());
            } else if (key.getsourceindex() == 1) {
                String arrEmpAttributes[] = value.toString().split(",");
                reduceValueBuilder.append(arrEmpAttributes[0].toString()).append(strSeparator);
            }
            txtMapFileLookupKey.set("");
            txtMapFileLookupValue.set("");
            return reduceValueBuilder;
        }

        @Override
        public void reduce(CompositeKeyWritableRSJ key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            for (Text value : values) {
                buildOutputValue(key, reduceValueBuilder, value);
            }

            // Drop last comma, set value, and emit output
            if (reduceValueBuilder.length() > 1) {
                //reduceValueBuilder.setLength(reduceValueBuilder.length() - 1);
                // Emit output
                reduceOutputValue.set(reduceValueBuilder.toString());
                context.write(nullWritableKey, reduceOutputValue);
            } else {
                System.out.println("Key=" + key.getjoinkey() + "src=" + key.getsourceindex());
            }

            // Reset variables
            reduceValueBuilder.setLength(0);
            reduceOutputValue.set("");
        }

        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
            if (deptMapReader != null) {
                deptMapReader.close();
            }
        }
    }

Here is my core-site.xml:

    <configuration>
      <property>
        <name>hadoop.tmp.dir</name>
        <value>/app/hadoop/tmp</value>
        <description>A base for other temporary directories.</description>
      </property>
      <property>
        <name>fs.default.name</name>
        <value>hdfs://localhost:9000</value>
        <description>The name of the default file system. A URI whose scheme
        and authority determine the FileSystem implementation. The uri's scheme
        determines the config property (fs.SCHEME.impl) naming the FileSystem
        implementation class. The uri's authority is used to determine the
        host, port, etc. for a filesystem.</description>
      </property>
    </configuration>

Any help would be highly appreciated. Thanks in advance!

I had the same problem, and I solved it by adding FileSystem fs = FileSystem.get(new URI("hdfs://localhost:9000"), conf) in the driver class.

URI has to be imported from java.net.URI.

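I think I ran into a similar problem. The key point is that you are reading a SequenceFile from the DistributedCache, and that file lives on your local file system. In your log there is this line: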

 "org.apache.hadoop.hdfs.DistributedFileSystem.getPathName(DistributedFileSystem.java:140)" 

If you look at the source code of SequenceFile.Reader, you will find that this log entry is caused by the following code:

 fs.getFileStatus(filename).getLen() 

The "fs" here should be a LocalFileSystem, not a DistributedFileSystem.
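To make the mismatch concrete, here is a minimal stand-alone sketch of the kind of scheme/authority comparison that produces the "Wrong FS" message. It is a simplified model written for illustration and is not Hadoop's actual FileSystem.checkPath source; the class name WrongFsCheck, its methods, and the shortened cache path are hypothetical.

```java
import java.net.URI;

// Simplified model of the check behind "Wrong FS" (NOT Hadoop's real code).
class WrongFsCheck {

    // True when 'path' can be served by the file system identified by 'fsUri'.
    static boolean belongsTo(URI fsUri, URI path) {
        String scheme = path.getScheme();
        if (scheme == null) {
            return true; // scheme-less path: resolved against the file system itself
        }
        if (!scheme.equalsIgnoreCase(fsUri.getScheme())) {
            return false; // e.g. a file: path handed to an hdfs: file system
        }
        String authority = path.getAuthority();
        return authority == null || authority.equalsIgnoreCase(fsUri.getAuthority());
    }

    // Throws in the same style as the error in the question.
    static void checkPath(URI fsUri, URI path) {
        if (!belongsTo(fsUri, path)) {
            throw new IllegalArgumentException("Wrong FS: " + path + ", expected: " + fsUri);
        }
    }

    public static void main(String[] args) {
        URI hdfs = URI.create("hdfs://localhost:9000");
        // Hypothetical, shortened stand-in for the localized cache path.
        URI cached = URI.create("file:/tmp/distcache/DeliveryStatusCodes/data");

        System.out.println(belongsTo(hdfs, cached)); // prints false
        try {
            checkPath(hdfs, cached);
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

The localized cache file carries the file: scheme, so any file system object bound to hdfs://localhost:9000 rejects it before the file is ever opened.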

My solution was to change

    deptMapReader = new MapFile.Reader(dfs, uriUncompressedFile.toString(), context.getConfiguration());

to

    Configuration conf = context.getConfiguration();
    String originalFS = conf.get("fs.default.name");   // back up the original setting
    conf.set("fs.default.name", "file:///");           // switch to the local file system
    // dfs has to be obtained with FileSystem.get(conf) after the line above,
    // otherwise it is still the HDFS instance
    deptMapReader = new MapFile.Reader(dfs, uriUncompressedFile.toString(), conf);
    conf.set("fs.default.name", originalFS);           // restore the original setting

After this change, the SequenceFile.Reader object can access the cached file on the local file system.

I think this problem occurs because the SequenceFile API has changed, and some of the SequenceFile.Reader APIs involved here, such as MapFile.Reader(fs, path, conf), have been deprecated.

This solution worked for me.

You should set the properties on conf to match your core-site.xml file, like this:

    conf.set("fs.defaultFS", "hdfs://host:port");
    conf.set("mapreduce.jobtracker.address", "host:port");

Include the following line in the job runner: DistributedCache.addCacheFile(new URI(""), conf);

Below is the code in the mapper's setup method:

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        Configuration configuration = new Configuration();
        FileSystem fileSystem = null;
        try {
            // NOTE: the URI and the rest of this statement are missing from the original post
            fileSystem = FileSystem.get(new URI("...
            FSDataInputStream fsDataInputStream = fileSystem.open(new Path(location));
            Scanner scanner = new Scanner(fsDataInputStream);
            int i = 1;
            while (scanner.hasNextLine()) {
                String str[] = scanner.nextLine().split(",");
                LOG.info("keys are \t" + str[0] + str[1]);
                stickerMap.put(str[0] + str[1], i);
                ++i;
            }
        }
    }
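The line-parsing part of that setup method can be tried without a cluster. The following is a minimal sketch under stated assumptions: the class name StickerLookup, the load method, and the sample data are hypothetical, and a length guard for malformed lines has been added that the posted code does not have. Because Hadoop's FSDataInputStream is a java.io.InputStream, the stream returned by fileSystem.open(...) could be passed to the same method.

```java
import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.Scanner;

// Hypothetical helper that mirrors the Scanner loop from the setup method.
class StickerLookup {

    // Maps (field 0 + field 1) of each comma-separated line to its 1-based line number.
    static Map<String, Integer> load(InputStream in) {
        Map<String, Integer> stickerMap = new HashMap<>();
        try (Scanner scanner = new Scanner(in, "UTF-8")) {
            int i = 1;
            while (scanner.hasNextLine()) {
                String[] fields = scanner.nextLine().split(",");
                if (fields.length >= 2) { // added guard: skip lines with fewer than two fields
                    stickerMap.put(fields[0] + fields[1], i);
                }
                ++i; // counts every line read, including skipped ones
            }
        }
        return stickerMap;
    }

    public static void main(String[] args) {
        // Made-up sample data standing in for the cached lookup file.
        String sample = "A,1,first\nB,2,second\n";
        InputStream in = new ByteArrayInputStream(sample.getBytes(StandardCharsets.UTF_8));
        System.out.println(load(in));
    }
}
```

Keeping the parsing in a method that takes a plain InputStream separates it from the file system lookup, so the "Wrong FS" question of which FileSystem opens the file can be debugged on its own.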