如何在Scalding中输出数据

我正在尝试将管道输出到不同的目录中,以便每个目录的输出将基于某些ID进行分区。 所以在一个普通的地图中减少代码我会使用MultipleOutputs类,我会在reducer中做这样的事情。

protected void reduce(final SomeKey key, final Iterable values, final Context context) { ... for (SomeValue value: values) { String bucketId = computeBucketIdFrom(...); multipleOutputs.write(key, value, folderName + "/" + bucketId); ... 

所以我想在烫伤时可以这样做

 ... val somePipe = Csv(in, separator = "\t", fields = someSchema, skipHeader = true) .read for (i  (id.hashCode % numberOfBuckets) == i} .write(Csv(out + "/bucket" + i , writeHeader = true, separator = "\t")) } 

但我觉得你最终会多次重复使用相同的管道,这会影响整体性能。

还有其他选择吗?

谢谢

是的,当然有更好的方法使用TemplatedTsv 。

所以上面的代码可以写成如下,

 val somePipe = Tsv(in, fields = someSchema, skipHeader = true) .read .write(TemplatedTsv(out, "%s", 'some_id, writeHeader = true)) 

这会将所有来自’some_id的记录放入out / some_ids文件夹下的单独文件夹中。

但是,您也可以创建整数桶。 只需改变最后一行,

 .map('some_id -> 'bucket) { id: String => id.hashCode % numberOfBuckets } .write(TemplatedTsv(out, "%02d", 'bucket, writeHeader = true, fields = ('all except 'bucket))) 

这将创建两个数字文件夹out / dd /。 你也可以在这里查看templatedTsv api 。

使用templatedTsv可能存在小问题,即reducers会生成大量小文件,这些文件可能会对使用结果的下一个作业造成不利影响。 因此,最好在写入磁盘之前对模板字段进行排序。 我在这里写了一篇关于它的博客。