
Operating on 100s of S3 paths in parallel using Spark in Java


I have a use case that requires the system to read content from 100s of S3 paths, exclude certain records from the content read at each path, and then rewrite the result back to the same location. It uses Spark 3.4 and Java 17 and runs on AWS EMR in cluster mode.

            // Deadletter queue; made thread-safe because the parallel stream below adds to it from multiple driver threads
            final List<String> errorPaths = Collections.synchronizedList(new ArrayList<>());
            final Dataset<Row> exclusionContent = sparkSession.read().parquet("s3://some/path/having/exclusion/content");
            // hundredsOfPaths is the list of all the paths that need to be read, have some data excluded, and be rewritten
            hundredsOfPaths.stream().parallel().forEach(pathInContext -> {
                try {
                    final Dataset<Row> originalContent = sparkSession.read().parquet(pathInContext).cache();
                    final long countOriginal = originalContent.count(); // materializes the cached input
                    final Dataset<Row> finalContent = originalContent.except(exclusionContent).cache();
                    final long finalCount = finalContent.count(); // materializes the filtered result before the overwrite
                    finalContent.repartition(1)
                                .write()
                                .format("parquet")
                                .mode(SaveMode.Overwrite)
                                .save(pathInContext);
                }
                catch (Throwable e) {
                    // If any interaction with a path fails, record it so it can be reprocessed
                    errorPaths.add(pathInContext);
                }
            });
            // Reprocess all paths accumulated in errorPaths (see the sketch below)
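
For reference, here is a minimal sketch of the reprocessing step referred to in the last comment: a single, sequential retry pass over the accumulated errorPaths, reusing the sparkSession and exclusionContent from the snippet above. The retry loop and the stillFailing list are illustrative additions, not part of the original code.

            // Hypothetical single retry pass over the failed paths, sequential this time;
            // whatever still fails is kept aside for manual follow-up.
            final List<String> stillFailing = new ArrayList<>();
            for (final String pathInContext : errorPaths) {
                try {
                    final Dataset<Row> originalContent = sparkSession.read().parquet(pathInContext).cache();
                    final Dataset<Row> finalContent    = originalContent.except(exclusionContent).cache();
                    finalContent.count(); // materialize the filtered result before overwriting its source location
                    finalContent.repartition(1)
                                .write()
                                .format("parquet")
                                .mode(SaveMode.Overwrite)
                                .save(pathInContext);
                    finalContent.unpersist();
                    originalContent.unpersist();
                }
                catch (Throwable e) {
                    stillFailing.add(pathInContext);
                }
            }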

In the above, will using the Java Stream's parallelism (paths.stream().parallel().forEach(...)) interfere in any way with Spark's native parallelism? Please advise.
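
For context on what is being compared here: the parallel stream runs the per-path driver code on the JVM's common ForkJoinPool, while Spark itself parallelizes each individual read/except/write across the cluster's executors. Below is a minimal, hedged sketch of an alternative that makes the driver-side concurrency explicit with a fixed-size ExecutorService instead of a parallel stream; the class name ConcurrentPathRewrite, the rewritePaths method and the maxConcurrentJobs parameter are illustrative, not from the original post.

    import java.util.List;
    import java.util.concurrent.CopyOnWriteArrayList;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SaveMode;
    import org.apache.spark.sql.SparkSession;

    public final class ConcurrentPathRewrite {

        // Rewrites each path with the exclusion content removed, running at most
        // maxConcurrentJobs Spark jobs at a time from the driver. Returns the paths that failed.
        public static List<String> rewritePaths(final SparkSession sparkSession,
                                                final List<String> paths,
                                                final Dataset<Row> exclusionContent,
                                                final int maxConcurrentJobs) throws InterruptedException {
            // Thread-safe, since several driver threads may record failures concurrently
            final List<String> errorPaths = new CopyOnWriteArrayList<>();
            final ExecutorService pool = Executors.newFixedThreadPool(maxConcurrentJobs);
            for (final String pathInContext : paths) {
                pool.submit(() -> {
                    try {
                        final Dataset<Row> originalContent = sparkSession.read().parquet(pathInContext);
                        final Dataset<Row> finalContent = originalContent.except(exclusionContent).cache();
                        finalContent.count(); // materialize the filtered result before overwriting its source
                        finalContent.repartition(1)
                                    .write()
                                    .format("parquet")
                                    .mode(SaveMode.Overwrite)
                                    .save(pathInContext);
                        finalContent.unpersist();
                    }
                    catch (Throwable e) {
                        errorPaths.add(pathInContext);
                    }
                });
            }
            pool.shutdown();
            pool.awaitTermination(1, TimeUnit.DAYS);
            return errorPaths;
        }
    }

Either way, the driver threads only build and submit Spark jobs and then block waiting for them; how many of those jobs actually run side by side on the cluster still depends on the Spark scheduler and the executor resources available to the application.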
