使用 Java 8 和 lambdas,很容易将集合作为流进行迭代,并且使用并行流同样容易。 the docs 中的两个示例,第二个使用 parallelStream:
myShapesCollection.stream()
.filter(e -> e.getColor() == Color.RED)
.forEach(e -> System.out.println(e.getName()));
myShapesCollection.parallelStream() // <-- This one uses parallel
.filter(e -> e.getColor() == Color.RED)
.forEach(e -> System.out.println(e.getName()));
只要我不关心顺序,使用并行是否总是有益的?人们会认为将工作分配到更多内核上会更快。
还有其他考虑吗?什么时候应该使用并行流,什么时候应该使用非并行?
(问这个问题是为了引发关于如何以及何时使用并行流的讨论,而不是因为我认为总是使用它们是一个好主意。)
与顺序流相比,并行流的开销要高得多。协调线程需要大量时间。我会默认使用顺序流,并且只考虑并行流,如果
我有大量的项目要处理(或者每个项目的处理需要时间并且是可并行的)
我首先遇到了性能问题
我还没有在多线程环境中运行该进程(例如:在 Web 容器中,如果我已经有许多要并行处理的请求,那么在每个请求中添加额外的并行层可能会产生更多的负面影响)
在您的示例中,性能无论如何将由对 System.out.println()
的同步访问驱动,并且使此过程并行将没有效果,甚至是负面的。
此外,请记住并行流并不能神奇地解决所有同步问题。如果进程中使用的谓词和函数使用共享资源,则必须确保一切都是线程安全的。特别是,如果并行运行,副作用是您真正需要担心的事情。
无论如何,测量,不要猜测!只有测量会告诉您并行性是否值得。
Stream API 的设计目的是使编写计算变得容易,这种方式与它们的执行方式无关,从而使顺序和并行之间的切换变得容易。
然而,仅仅因为它很容易,并不意味着它总是一个好主意,事实上,仅仅因为你可以将 .parallel()
放在各处都是一个坏的主意。
首先,请注意并行性除了可以在更多内核可用时更快地执行之外没有提供任何好处。并行执行总是比顺序执行涉及更多的工作,因为除了解决问题之外,它还必须执行子任务的调度和协调。希望通过在多个处理器上拆分工作,您能够更快地得到答案;这是否真的发生取决于很多事情,包括数据集的大小、对每个元素进行多少计算、计算的性质(具体来说,一个元素的处理是否与其他元素的处理相互作用?) ,可用处理器的数量,以及竞争这些处理器的其他任务的数量。
此外,请注意并行性也经常暴露计算中的不确定性,而这种不确定性通常被顺序实现所隐藏。有时这并不重要,或者可以通过限制所涉及的操作来缓解(即归约运算符必须是无状态的和关联的。)
实际上,有时并行性会加快计算速度,有时不会,有时甚至会减慢计算速度。最好先使用顺序执行进行开发,然后在哪里应用并行性
(A) 你知道提高性能实际上是有好处的
(B) 它实际上会提供更高的性能。
(A) 是业务问题,而不是技术问题。如果您是性能专家,通常可以查看代码并确定 (B),但明智的做法是衡量。 (而且,在你确信 (A) 之前,甚至不要打扰;如果代码足够快,最好将你的大脑周期应用到其他地方。)
最简单的并行性能模型是“NQ”模型,其中 N
是元素的数量,Q
是每个元素的计算量。通常,您需要产品 NQ 超过某个阈值才能开始获得性能优势。对于像“将 1
到 N
的数字相加”这样的低 Q 问题,您通常会看到 N=1000
和 N=10000
之间的盈亏平衡。对于 Q 值较高的问题,您会在较低的阈值处看到盈亏平衡点。
但实际情况相当复杂。因此,在您达到专业水平之前,首先确定顺序处理何时实际上会花费您一些东西,然后衡量并行性是否会有所帮助。
findAny
而不是 {2 }…
myListOfURLs.stream().map((url) -> downloadPage(url))...
)。
ForkJoinPool.commonPool()
,您不希望阻止任务去那里。
我观看了 Brian Goetz (Java 语言架构师和 Lambda 表达式规范负责人)的presentations之一。他详细解释了在进行并行化之前要考虑的以下 4 点:
拆分/分解成本——有时拆分比做工作更昂贵!任务调度/管理成本——可以在将工作交给另一个线程的时间内完成大量工作。结果组合成本——有时组合涉及复制大量数据。例如,添加数字很便宜,而合并集合很昂贵。地点——房间里的大象。这是每个人都可能错过的重要一点。您应该考虑缓存未命中,如果 CPU 由于缓存未命中而等待数据,那么您将不会通过并行化获得任何收益。这就是为什么基于数组的源在缓存下一个索引(靠近当前索引)时并行化最好,并且 CPU 遇到缓存未命中的机会更少。
他还提到了一个相对简单的公式来确定并行加速的机会。
NQ型号:
N x Q > 10000
其中,N = 数据项的数量 Q = 每项的工作量
其他答案已经涵盖了分析以避免并行处理中的过早优化和开销成本。这个答案解释了并行流数据结构的理想选择。
通常,并行性的性能提升在 ArrayList 、 HashMap 、 HashSet 和 ConcurrentHashMap 实例上的流上是最好的;数组;整数范围;和远距离。这些数据结构的共同点是它们都可以准确且廉价地拆分为任何所需大小的子范围,这使得在并行线程之间划分工作变得容易。流库用于执行此任务的抽象是 spliterator ,它由 Stream 和 Iterable 上的 spliterator 方法返回。所有这些数据结构共有的另一个重要因素是,它们在顺序处理时提供了从良好到优秀的引用局部性:顺序元素引用一起存储在内存中。这些引用所引用的对象在内存中可能不会彼此靠近,这会减少引用的局部性。事实证明,引用局部性对于并行化批量操作至关重要:没有它,线程大部分时间都处于空闲状态,等待数据从内存传输到处理器的缓存中。具有最佳引用局部性的数据结构是原始数组,因为数据本身连续存储在内存中。
来源:Item #48 使用 Joshua Bloch 编写的 Streams Parallel, Effective Java 3e 时要小心
永远不要将无限流与限制并行化。这是发生的事情:
public static void main(String[] args) {
// let's count to 1 in parallel
System.out.println(
IntStream.iterate(0, i -> i + 1)
.parallel()
.skip(1)
.findFirst()
.getAsInt());
}
结果
Exception in thread "main" java.lang.OutOfMemoryError
at ...
at java.base/java.util.stream.IntPipeline.findFirst(IntPipeline.java:528)
at InfiniteTest.main(InfiniteTest.java:24)
Caused by: java.lang.OutOfMemoryError: Java heap space
at java.base/java.util.stream.SpinedBuffer$OfInt.newArray(SpinedBuffer.java:750)
at ...
如果您使用 .limit(...)
,则相同
此处说明:Java 8, using .parallel in a stream causes OOM error
同样,如果流是有序的并且具有比您想要处理的更多的元素,请不要使用并行,例如
public static void main(String[] args) {
// let's count to 1 in parallel
System.out.println(
IntStream.range(1, 1000_000_000)
.parallel()
.skip(100)
.findFirst()
.getAsInt());
}
这可能会运行更长的时间,因为并行线程可能会在大量数字范围而不是关键的 0-100 范围内工作,从而导致这需要很长时间。
Collection.parallelStream()
是并行工作的好方法。但是您需要记住,这有效地使用了一个公共线程池,内部只有几个工作线程(默认情况下,线程数等于 cpu 核心数),请参阅 ForkJoinPool.commonPool()
。如果池的某些任务是长时间运行的 I/O 密集型工作,那么其他可能很快的 parallelStream
调用将在等待空闲池线程时卡住。这显然导致要求 fork-join 任务是非阻塞和短的,或者换句话说,cpu-bound。为了更好地理解细节,我强烈建议您仔细阅读 java.util.concurrent.ForkJoinTask
javadoc,这里有一些相关的引用:
ForkJoinTasks 的效率源于......它们的主要用途是作为计算任务计算纯函数或对纯孤立对象进行操作。
理想情况下,计算应避免同步方法或块,并应尽量减少其他阻塞同步
可细分任务也不应该执行阻塞 I/O
这些表明 parallelStream()
任务的主要目的是对孤立的内存结构进行短计算。还建议查看文章 Common parallel stream pitfalls
Runnable
的对象集合,我称之为start()
将它们用作Threads
,是否可以将其更改为在.forEach()
并行化中使用 java 8 流?然后我就可以将线程代码从课程中剥离出来。但是有什么缺点吗?