并行读取S3中的多个文件(Spark,Java)
我看到了一些关于此的讨论,但还不太明白正确的解决方案:我想将S3中的几百个文件加载到RDD中。 我现在就是这样做的:
ObjectListing objectListing = s3.listObjects(new ListObjectsRequest(). withBucketName(...). withPrefix(...)); List keys = new LinkedList(); objectListing.getObjectSummaries().forEach(summery -> keys.add(summery.getKey())); // repeat while objectListing.isTruncated() JavaRDD events = sc.parallelize(keys).flatMap(new ReadFromS3Function(clusterProps));
ReadFromS3Function
使用AmazonS3
客户端执行实际读取:
public Iterator call(String s) throws Exception { AmazonS3 s3Client = getAmazonS3Client(properties); S3Object object = s3Client.getObject(new GetObjectRequest(...)); InputStream is = object.getObjectContent(); List lines = new LinkedList(); String str; try { BufferedReader reader = new BufferedReader(new InputStreamReader(is)); if (is != null) { while ((str = reader.readLine()) != null) { lines.add(str); } } else { ... } } finally { ... } return lines.iterator();
我从Scala中为同一个问题看到的答案中“有点”翻译了这个。 我认为也可以将整个路径列表传递给sc.textFile(...)
,但我不确定哪种是最佳实践方式。
潜在的问题是在s3中列出对象的速度非常慢,并且只要某些东西进行了树木行走(就像路径的通配符模式一样),它看起来像目录树的方式会导致性能下降。
post中的代码正在进行全子列表,提供更好的性能,它基本上是Hadoop 2.8和s3a listFiles(路径,递归)附带的HADOOP-13208 。
获得该列表后,您已经获得了对象路径的字符串,然后您可以映射到s3a / s3n路径以便将spark作为文本文件输入处理,然后您可以将其应用于
val files = keys.map(key -> s"s3a://$bucket/$key").mkString(",") sc.textFile(files).map(...)
根据要求,这是使用的java代码。
String prefix = "s3a://" + properties.get("s3.source.bucket") + "/"; objectListing.getObjectSummaries().forEach(summary -> keys.add(prefix+summary.getKey())); // repeat while objectListing truncated JavaRDD events = sc.textFile(String.join(",", keys))
请注意,我将s3n切换到s3a,因为如果你的CP上有hadoop-aws
和amazon-sdk
JAR,那么s3a连接器就是你应该使用的连接器。 它更好,它是一个由人(我)维护和测试火花工作负载的那个。 请参阅Hadoop S3连接器的历史 。
您可以使用sc.textFile
读取多个文件。
您可以传递multiple file url
作为其参数。
您可以指定整个directories
,使用wildcards
甚至CSV目录和通配符。
例如:
sc.textFile("/my/dir1,/my/paths/part-00[0-5]*,/another/dir,/a/specific/file")
从这个ans的参考
我想如果你尝试并行化,而阅读aws将使用执行器,绝对提高性能
val bucketName=xxx val keyname=xxx val df=sc.parallelize(new AmazonS3Client(new BasicAWSCredentials("awsccessKeyId", "SecretKey")).listObjects(request).getObjectSummaries.map(_.getKey).toList) .flatMap { key => Source.fromInputStream(s3.getObject(bucketName, keyname).getObjectContent: InputStream).getLines }
- com.amazonaws.services.s3.model.AmazonS3Exception:Forbidden(Service:Amazon S3; Status Code:403; Error Code:403 Forbidden; Request ID:XXXXXXXX)
- 编写Big JSON文件以避免OutOfMemory问题的最佳方法
- AWS S3 – 列出没有前缀的文件夹中的所有对象
- 在Java SDK Amazon S3中配置路径样式
- 使用Java将包含文件的目录上载到S3
- 将Maven依赖项部署到S3:没有可用的连接器
- AWS S3 Java SDK – 拒绝访问
- AWS Java SDK – 无法通过区域提供程序链查找区域
- com.amazonaws.services.s3.model.AmazonS3Exception:拒绝访问