Operating on 100s of S3 Paths in Parallel with Spark in Java
Note: this article was last updated on April 12, 2024.
I have a use case in which the system reads content from 100s of S3 paths; after reading the content at each path, it excludes certain records and then rewrites the result back to the same location. It uses Spark 3.4 and Java 17 and runs on AWS EMR in cluster mode.
// Dead-letter queue; must be thread-safe, since parallel() calls add() from multiple threads
final List<String> errorPaths = Collections.synchronizedList(new ArrayList<>());
final Dataset<Row> exclusionContent = sparkSession.read().parquet("s3://some/path/having/exclusion/content");
// hundredsOfPaths is the list of all paths that need to be read, filtered, and rewritten
hundredsOfPaths.stream().parallel().forEach(pathInContext -> {
    try {
        final Dataset<Row> originalContent = sparkSession.read().parquet(pathInContext).cache();
        final long countOriginal = originalContent.count(); // materializes the cache before the source path is overwritten
        final Dataset<Row> finalContent = originalContent.except(exclusionContent).cache();
        final long finalCount = finalContent.count();
        finalContent.repartition(1)
            .write()
            .format("parquet")
            .mode(SaveMode.Overwrite)
            .save(pathInContext);
        originalContent.unpersist();
        finalContent.unpersist();
    }
    catch (Throwable e) {
        // If any path fails, queue it for reprocessing
        errorPaths.add(pathInContext);
    }
});
//Reprocess all paths accumulated in errorPaths
In the above, will using Java Stream parallelism (hundredsOfPaths.stream().parallel().forEach(...)) interfere with Spark's native parallelism? Please advise.
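One relevant distinction: the parallel stream only runs on the driver, where each thread submits a Spark job; Spark's own parallelism (tasks on executors) is separate and unaffected. The practical concerns are that stream().parallel() uses the JVM-wide common ForkJoinPool (whose size you don't control per workload) and that shared collections like the dead-letter list must be thread-safe. A minimal JDK-only sketch of the driver-side pattern with an explicitly bounded pool, where processPath and processAll are hypothetical names and the Spark read/except/write job is stubbed out:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class BoundedPathProcessor {

    // Hypothetical stand-in for the per-path Spark job (read, except, write back).
    // Throws for one path to demonstrate dead-letter collection.
    static void processPath(String path) {
        if (path.endsWith("/b")) {
            throw new RuntimeException("simulated failure for " + path);
        }
    }

    // Process all paths with an explicit, bounded thread pool instead of the
    // common ForkJoinPool that stream().parallel() would use.
    static List<String> processAll(List<String> paths, int threads) throws InterruptedException {
        // Thread-safe dead-letter list; a plain ArrayList is unsafe under concurrent add()
        List<String> errorPaths = Collections.synchronizedList(new ArrayList<>());
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (String path : paths) {
            pool.submit(() -> {
                try {
                    processPath(path);
                } catch (Throwable t) {
                    errorPaths.add(path);
                }
            });
        }
        pool.shutdown();
        pool.awaitTermination(1, TimeUnit.MINUTES);
        return errorPaths;
    }

    public static void main(String[] args) throws InterruptedException {
        List<String> failed = processAll(
                List.of("s3://bucket/a", "s3://bucket/b", "s3://bucket/c"), 2);
        System.out.println("failed=" + failed); // -> failed=[s3://bucket/b]
    }
}
```

A fixed pool size lets you tune how many Spark jobs run concurrently (and, if desired, pair each submitting thread with a FAIR scheduler pool) rather than inheriting the common pool's default of one thread per core.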