是否可以为 Java 8 parallel stream 指定自定义线程池?我在任何地方都找不到它。
想象一下,我有一个服务器应用程序,我想使用并行流。但是这个应用程序很大而且是多线程的,所以我想把它分开。我不希望在另一个模块的应用程序块任务的一个模块中运行缓慢的任务。
如果我不能为不同的模块使用不同的线程池,这意味着我不能在大多数现实世界的情况下安全地使用并行流。
试试下面的例子。有一些 CPU 密集型任务在单独的线程中执行。这些任务利用并行流。第一个任务被破坏了,所以每一步需要 1 秒(通过线程睡眠模拟)。问题是其他线程卡住并等待中断的任务完成。这是一个人为的例子,但想象一个 servlet 应用程序和某人向共享分叉连接池提交一个长时间运行的任务。
public class ParallelTest {
public static void main(String[] args) throws InterruptedException {
ExecutorService es = Executors.newCachedThreadPool();
es.execute(() -> runTask(1000)); //incorrect task
es.execute(() -> runTask(0));
es.execute(() -> runTask(0));
es.execute(() -> runTask(0));
es.execute(() -> runTask(0));
es.execute(() -> runTask(0));
es.shutdown();
es.awaitTermination(60, TimeUnit.SECONDS);
}
private static void runTask(int delay) {
range(1, 1_000_000).parallel().filter(ParallelTest::isPrime).peek(i -> Utils.sleep(delay)).max()
.ifPresent(max -> System.out.println(Thread.currentThread() + " " + max));
}
public static boolean isPrime(long n) {
return n > 1 && rangeClosed(2, (long) sqrt(n)).noneMatch(divisor -> n % divisor == 0);
}
}
实际上有一个技巧如何在特定的 fork-join 池中执行并行操作。如果您将它作为一个任务在一个 fork-join 池中执行,它会停留在那里并且不使用公共池。
final int parallelism = 4;
ForkJoinPool forkJoinPool = null;
try {
forkJoinPool = new ForkJoinPool(parallelism);
final List<Integer> primes = forkJoinPool.submit(() ->
// Parallel task here, for example
IntStream.range(1, 1_000_000).parallel()
.filter(PrimesPrint::isPrime)
.boxed().collect(Collectors.toList())
).get();
System.out.println(primes);
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
} finally {
if (forkJoinPool != null) {
forkJoinPool.shutdown();
}
}
该技巧基于 ForkJoinTask.fork
,它指定:“安排在当前任务正在运行的池中异步执行此任务,如果适用,或者如果不是 inForkJoinPool()
,则使用 ForkJoinPool.commonPool()
”
并行流使用 Runtime.getRuntime().availableProcessors()
返回的默认 ForkJoinPool.commonPool
和 by default has one less threads as you have processors(这意味着并行流为调用线程保留一个处理器)。
对于需要单独或自定义池的应用程序,可以使用给定的目标并行级别构建 ForkJoinPool;默认情况下,等于可用处理器的数量。
这也意味着如果您有嵌套的并行流或多个并行流同时启动,它们将全部共享同一个池。优点:您永远不会使用超过默认值(可用处理器的数量)。缺点:您可能不会将“所有处理器”分配给您启动的每个并行流(如果您碰巧有多个)。 (显然您可以使用 ManagedBlocker 来规避它。)
要更改并行流的执行方式,您可以
将并行流执行提交到您自己的 ForkJoinPool:yourFJP.submit(() -> stream.parallel().forEach(soSomething)).get();或者
您可以使用系统属性更改公共池的大小: System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "20") 用于 20 个线程的目标并行度。
后者在我的机器上有 8 个处理器的例子。如果我运行以下程序:
long start = System.currentTimeMillis();
IntStream s = IntStream.range(0, 20);
//System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "20");
s.parallel().forEach(i -> {
try { Thread.sleep(100); } catch (Exception ignore) {}
System.out.print((System.currentTimeMillis() - start) + " ");
});
输出是:
215 216 216 216 216 216 216 216 315 316 316 316 316 316 316 316 415 416 416 416
因此您可以看到并行流一次处理 8 个项目,即它使用 8 个线程。但是,如果我取消注释注释行,则输出为:
215 215 215 215 215 216 216 216 216 216 216 216 216 216 216 216 216 216 216 216
这一次,并行流使用了 20 个线程,并且流中的所有 20 个元素都已同时处理。
commonPool
实际上比 availableProcessors
少 1,导致总并行度等于 availableProcessors
,因为调用线程计为 1。
ForkJoinTask
。模仿parallel()
需要get()
:stream.parallel().forEach(soSomething)).get();
ForkJoinPool.submit(() -> stream.forEach(...))
会使用给定的 ForkJoinPool
运行我的 Stream 操作。我希望整个 Stream-Action 在 ForJoinPool 中作为 ONE 操作执行,但在内部仍使用默认/常见的 ForkJoinPool。您在哪里看到 ForkJoinPool.submit() 会按照您所说的那样做?
System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", …)
将不再工作,并且从 JDK 18 开始,它仍然可以按预期工作。
除了在您自己的 forkJoinPool 中触发并行计算的技巧之外,您还可以将该池传递给 CompletableFuture.supplyAsync 方法,如下所示:
ForkJoinPool forkJoinPool = new ForkJoinPool(2);
CompletableFuture<List<Integer>> primes = CompletableFuture.supplyAsync(() ->
//parallel task here, for example
range(1, 1_000_000).parallel().filter(PrimesPrint::isPrime).collect(toList()),
forkJoinPool
);
原来的解决方案(设置 ForkJoinPool 通用并行属性)不再有效。查看原始答案中的链接,破坏此问题的更新已被移植回 Java 8。如链接线程中所述,不能保证此解决方案永远有效。基于此,解决方案是在接受的答案中讨论的带有 .get 解决方案的 forkjoinpool.submit。我认为 backport 也解决了这个解决方案的不可靠性。
ForkJoinPool fjpool = new ForkJoinPool(10);
System.out.println("stream.parallel");
IntStream range = IntStream.range(0, 20);
fjpool.submit(() -> range.parallel()
.forEach((int theInt) ->
{
try { Thread.sleep(100); } catch (Exception ignore) {}
System.out.println(Thread.currentThread().getName() + " -- " + theInt);
})).get();
System.out.println("list.parallelStream");
int [] array = IntStream.range(0, 20).toArray();
List<Integer> list = new ArrayList<>();
for (int theInt: array)
{
list.add(theInt);
}
fjpool.submit(() -> list.parallelStream()
.forEach((theInt) ->
{
try { Thread.sleep(100); } catch (Exception ignore) {}
System.out.println(Thread.currentThread().getName() + " -- " + theInt);
})).get();
ForkJoinPool.commonPool().getParallelism()
时,我看不到并行度的变化。
unreported exception InterruptedException; must be caught or declared to be thrown
,即使循环中有所有 catch
异常。
System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "10"); System.out.println(ForkJoinPool.commonPool().getParallelism());
时,它会在从 JDK 8 到 JDK 18 的所有版本上始终打印 10
。我不知道您为什么声称这个常见的并行属性不起作用;您添加到另一个答案的链接甚至没有远程说明这个属性,它的补丁根本没有触及这个功能。
我们可以使用以下属性更改默认并行度:
-Djava.util.concurrent.ForkJoinPool.common.parallelism=16
可以设置使用更多的并行度。
要测量实际使用的线程数,您可以检查 Thread.activeCount()
:
Runnable r = () -> IntStream
.range(-42, +42)
.parallel()
.map(i -> Thread.activeCount())
.max()
.ifPresent(System.out::println);
ForkJoinPool.commonPool().submit(r).join();
new ForkJoinPool(42).submit(r).join();
这可以在 4 核 CPU 上产生如下输出:
5 // common pool
23 // custom pool
如果没有 .parallel()
,它会给出:
3 // common pool
4 // custom pool
到目前为止,我使用了这个问题的答案中描述的解决方案。现在,我为此创建了一个名为 Parallel Stream Support 的小库:
ForkJoinPool pool = new ForkJoinPool(NR_OF_THREADS);
ParallelIntStreamSupport.range(1, 1_000_000, pool)
.filter(PrimesPrint::isPrime)
.collect(toList())
但正如@PabloMatiasGomez 在评论中指出的那样,并行流的拆分机制存在缺陷,这在很大程度上取决于公共池的大小。请参阅Parallel stream from a HashSet doesn't run in parallel。
我使用此解决方案只是为了为不同类型的工作提供单独的池,但即使我不使用它,我也无法将公共池的大小设置为 1。
注意:似乎在 JDK 10 中实现了一个修复程序,以确保自定义线程池使用预期的线程数。
自定义 ForkJoinPool 中的并行流执行应遵守并行性 https://bugs.openjdk.java.net/browse/JDK-8190974
如果您不想依赖于实现技巧,总有一种方法可以通过实现结合 map
和 collect
语义的自定义收集器来实现相同的目标......而且您不会局限于 ForkJoinPool:
list.stream()
.collect(parallel(i -> process(i), executor, 4))
.join()
幸运的是,它已经在此处完成并在 Maven Central 上可用:http://github.com/pivovarit/parallel-collectors
免责声明:我写了它并对此负责。
我尝试了自定义 ForkJoinPool 如下调整池大小:
private static Set<String> ThreadNameSet = new HashSet<>();
private static Callable<Long> getSum() {
List<Long> aList = LongStream.rangeClosed(0, 10_000_000).boxed().collect(Collectors.toList());
return () -> aList.parallelStream()
.peek((i) -> {
String threadName = Thread.currentThread().getName();
ThreadNameSet.add(threadName);
})
.reduce(0L, Long::sum);
}
private static void testForkJoinPool() {
final int parallelism = 10;
ForkJoinPool forkJoinPool = null;
Long result = 0L;
try {
forkJoinPool = new ForkJoinPool(parallelism);
result = forkJoinPool.submit(getSum()).get(); //this makes it an overall blocking call
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
} finally {
if (forkJoinPool != null) {
forkJoinPool.shutdown(); //always remember to shutdown the pool
}
}
out.println(result);
out.println(ThreadNameSet);
}
这里的输出表明池使用的线程数比默认的 4 多。
50000005000000
[ForkJoinPool-1-worker-8, ForkJoinPool-1-worker-9, ForkJoinPool-1-worker-6, ForkJoinPool-1-worker-11, ForkJoinPool-1-worker-10, ForkJoinPool-1-worker-1, ForkJoinPool-1-worker-15, ForkJoinPool-1-worker-13, ForkJoinPool-1-worker-4, ForkJoinPool-1-worker-2]
但实际上有一个 奇怪,当我尝试使用 ThreadPoolExecutor
达到相同的结果时,如下所示:
BlockingDeque blockingDeque = new LinkedBlockingDeque(1000);
ThreadPoolExecutor fixedSizePool = new ThreadPoolExecutor(10, 20, 60, TimeUnit.SECONDS, blockingDeque, new MyThreadFactory("my-thread"));
但我失败了。
它只会在新线程中启动 parallelStream,然后其他一切都一样,这再次证明 parallelStream
将使用 ForkJoinPool strong> 启动其子线程。
去获取AbacusUtil。可以为并行流指定线程号。这是示例代码:
LongStream.range(4, 1_000_000).parallel(threadNum)...
披露:我是 AbacusUtil 的开发者。
以下是我如何以编程方式设置上述最大线程数标志,并截取代码以验证该参数是否有效
System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "2");
Set<String> threadNames = Stream.iterate(0, n -> n + 1)
.parallel()
.limit(100000)
.map(i -> Thread.currentThread().getName())
.collect(Collectors.toSet());
System.out.println(threadNames);
// Output -> [ForkJoinPool.commonPool-worker-1, Test worker, ForkJoinPool.commonPool-worker-3]
如果您不介意使用第三方库,则可以使用 cyclops-react 在同一管道中混合顺序流和并行流,并提供自定义 ForkJoinPools。例如
ReactiveSeq.range(1, 1_000_000)
.foldParallel(new ForkJoinPool(10),
s->s.filter(i->true)
.peek(i->System.out.println("Thread " + Thread.currentThread().getId()))
.max(Comparator.naturalOrder()));
或者,如果我们希望在顺序 Stream 中继续处理
ReactiveSeq.range(1, 1_000_000)
.parallel(new ForkJoinPool(10),
s->s.filter(i->true)
.peek(i->System.out.println("Thread " + Thread.currentThread().getId())))
.map(this::processSequentially)
.forEach(System.out::println);
[披露我是 cyclops-react 的主要开发者]
如果您不需要自定义 ThreadPool 但您想限制并发任务的数量,您可以使用:
List<Path> paths = List.of("/path/file1.csv", "/path/file2.csv", "/path/file3.csv").stream().map(e -> Paths.get(e)).collect(toList());
List<List<Path>> partitions = Lists.partition(paths, 4); // Guava method
partitions.forEach(group -> group.parallelStream().forEach(csvFilePath -> {
// do your processing
}));
(重复的问题要求这个被锁定,所以请在这里忍耐)
我制作了实用程序方法来与定义最大线程数的参数并行运行任务。
public static void runParallel(final int maxThreads, Runnable task) throws RuntimeException {
ForkJoinPool forkJoinPool = null;
try {
forkJoinPool = new ForkJoinPool(maxThreads);
forkJoinPool.submit(task).get();
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
} finally {
if (forkJoinPool != null) {
forkJoinPool.shutdown();
}
}
}
它创建具有最大允许线程数的 ForkJoinPool
,并在任务完成(或失败)后将其关闭。
用法如下:
final int maxThreads = 4;
runParallel(maxThreads, () ->
IntStream.range(1, 1_000_000).parallel()
.filter(PrimesPrint::isPrime)
.boxed().collect(Collectors.toList()));
不定期副业成功案例分享
ForkJoinPool
还是实现细节?到文档的链接会很好。ForkJoinPool
实例以避免线程泄漏时,它应该是shutdown()
。 (example)