当我们谈到 ThreadPoolExecutor
时,core pool size 和 maximum pool size 之间究竟有什么区别?
可以借助示例进行解释?
举这个例子。开始线程池大小为 1,核心池大小为 5,最大池大小为 10,队列为 100。随着请求的进入,线程将被创建最多 5 个,然后将任务添加到队列中,直到达到 100。当队列已满时,将创建最多为 maxPoolSize 的新线程。一旦所有线程都在使用并且队列已满,任务将被拒绝。随着队列的减少,活动线程的数量也会减少。
IF running threads > corePoolSize & < maxPoolSize,如果 Total task queue 已满且新线程到达,则创建一个新线程。
来自文档:(如果运行的线程多于 corePoolSize 但少于 maximumPoolSize,则仅当队列已满时才会创建一个新线程。)
现在,举一个简单的例子,
ThreadPoolExecutor executorPool = new ThreadPoolExecutor(5, 10, 3, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(50));
这里,5 是 corePoolSize - 意味着 Jvm 将为前 5 个任务的新任务创建新线程。其他任务将被添加到队列中,直到队列满(50 个任务)。
10 是 maxPoolSize - JVM 最多可以创建 10 个线程。表示如果已经有 5 个任务/线程正在运行并且队列已满,有 50 个待处理任务,并且如果还有一个新请求/任务到达队列,那么 JVM 将创建最多 10 个新线程(总线程数 = 前 5 个 + 新 5 个) ;
new ArrayBlockingQueue(50) = 是总队列大小 - 它可以在其中排队 50 个任务。
一旦所有 10 个线程都在运行并且如果新任务到达,那么该新任务将被拒绝。
SUN 内部创建线程的规则:
如果线程数小于 corePoolSize,则创建一个新的 Thread 来运行一个新的任务。如果线程数等于(或大于)corePoolSize,则将任务放入队列。如果队列已满,并且线程数小于 maxPoolSize,则创建一个新线程来运行任务。如果队列已满,并且线程数大于或等于 maxPoolSize,则拒绝该任务。
希望,这是有帮助的......如果我错了,请纠正我......
ThreadPoolExecutor 池大小的规则
ThreadPoolExecutor's
池大小的规则通常被误解,因为它没有按照您认为的方式或您希望的方式工作。
举这个例子。起始线程池大小为 1,核心池大小为 5,最大池大小为 10,队列为 100。
Sun 的方式:当请求进入时,最多创建 5 个线程,然后将任务添加到队列中,直到达到 100 个。当队列满时,将创建最多 maxPoolSize
个新线程。一旦所有线程都在使用并且队列已满,任务将被拒绝。随着队列的减少,活动线程的数量也会减少。
用户预期的方式:当请求进入线程时,将创建最多 10 个线程,然后将任务添加到队列中,直到达到 100 个时它们被拒绝。线程数将重命名为最大值,直到队列为空。当队列为空时,线程将终止,直到剩下 corePoolSize
。
不同之处在于用户希望更早开始增加池大小并希望队列更小,而 Sun 方法希望保持池大小较小,并且仅在负载变得很大时才增加它。
以下是 Sun 的线程创建规则,简单来说:
如果线程数小于 corePoolSize,则创建一个新的 Thread 来运行一个新的任务。如果线程数等于(或大于)corePoolSize,则将任务放入队列。如果队列已满,并且线程数小于 maxPoolSize,则创建一个新线程来运行任务。如果队列已满,并且线程数大于或等于 maxPoolSize,则拒绝该任务。总而言之,新线程仅在队列填满时创建,因此如果您使用无界队列,则线程数不会超过 corePoolSize。
要获得更全面的解释,请从马口中获取:ThreadPoolExecutor
API 文档。
有一篇非常好的论坛帖子向您介绍了 ThreadPoolExecutor
与代码示例一起使用的方式:http://forums.sun.com/thread.jspa?threadID=5401400&tstart=0
更多信息:http://forums.sun.com/thread.jspa?threadID=5224557&tstart=450
从 the doc:
当在方法 execute(java.lang.Runnable) 中提交了一个新任务,并且运行的线程少于 corePoolSize 时,即使其他工作线程处于空闲状态,也会创建一个新线程来处理该请求。如果运行的线程数多于 corePoolSize 但少于 maximumPoolSize,则仅当队列已满时才会创建新线程。
此外:
通过将 corePoolSize 和 maximumPoolSize 设置为相同,您可以创建一个固定大小的线程池。通过将 maximumPoolSize 设置为基本上无界的值,例如 Integer.MAX_VALUE,您允许池容纳任意数量的并发任务。最典型的是,核心和最大池大小仅在构造时设置,但它们也可以使用 setCorePoolSize(int) 和 setMaximumPoolSize(int) 动态更改。
如果您决定手动创建 ThreadPoolExecutor
而不是使用 Executors
工厂类,则需要使用其构造函数之一来创建和配置一个。这个类最广泛的构造函数是:
public ThreadPoolExecutor(
int corePoolSize,
int maxPoolSize,
long keepAlive,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler
);
如您所见,您可以配置:
核心池大小(线程池将尝试坚持的大小)。
最大池大小。
保持活动时间,这是一个空闲线程有资格被拆除的时间。
保存等待执行的任务的工作队列。
任务提交被拒绝时应用的策略。
限制排队任务的数量
限制正在执行的并发任务的数量,调整线程池的大小,在可预测性和稳定性方面对您的应用程序及其执行环境带来巨大的好处:无限的线程创建最终会耗尽运行时资源,您的应用程序可能会因此而遇到问题, 严重的性能问题,甚至可能导致应用程序不稳定。
这只是解决问题的一部分:您限制了正在执行的任务数量,但没有限制可以提交和排队以供以后执行的作业数量。应用程序稍后会遇到资源短缺,但如果提交率持续超过执行率,它最终会遇到这种情况。
解决这个问题的方法是:给执行者提供一个阻塞队列来保存等待中的任务。如果队列已满,提交的任务将被“拒绝”。 RejectedExecutionHandler
在任务提交被拒绝时被调用,这就是为什么在前一项中引用了动词被拒绝的原因。您可以实施自己的拒绝策略或使用框架提供的内置策略之一。
默认拒绝策略让执行程序抛出 RejectedExecutionException
。但是,其他内置策略允许您:
默默地放弃工作。
放弃最旧的作业并尝试重新提交最后一个作业。
在调用者的线程上执行被拒绝的任务。
在图片中,只考虑添加任务正在发生
https://i.stack.imgur.com/vQccY.png
您可以在 javadoc 中找到术语 corepoolsize 和 maxpoolsize 的定义。 http://docs.oracle.com/javase/6/docs/api/java/util/concurrent/ThreadPoolExecutor.html
上面的链接有你问题的答案。但是,只是为了说清楚。应用程序将继续创建线程,直到达到 corePoolSize。我认为这里的想法是这些线程应该足以处理任务的流入。如果在创建 corePoolSize 线程之后有新任务出现,则任务将排队。一旦队列已满,执行程序将开始创建新线程。这是一种平衡。它本质上的意思是,任务的流入量大于处理能力。因此,Executor 将再次开始创建新线程,直到达到最大线程数。同样,当且仅当队列已满时,才会创建一个新线程。
this 博客中有很好的解释:
public class ThreadPoolExecutorExample {
public static void main (String[] args) {
createAndRunPoolForQueue(new ArrayBlockingQueue<Runnable>(3), "Bounded");
createAndRunPoolForQueue(new LinkedBlockingDeque<>(), "Unbounded");
createAndRunPoolForQueue(new SynchronousQueue<Runnable>(), "Direct hand-off");
}
private static void createAndRunPoolForQueue (BlockingQueue<Runnable> queue,
String msg) {
System.out.println("---- " + msg + " queue instance = " +
queue.getClass()+ " -------------");
ThreadPoolExecutor e = new ThreadPoolExecutor(2, 5, Long.MAX_VALUE,
TimeUnit.NANOSECONDS, queue);
for (int i = 0; i < 10; i++) {
try {
e.execute(new Task());
} catch (RejectedExecutionException ex) {
System.out.println("Task rejected = " + (i + 1));
}
printStatus(i + 1, e);
}
e.shutdownNow();
System.out.println("--------------------\n");
}
private static void printStatus (int taskSubmitted, ThreadPoolExecutor e) {
StringBuilder s = new StringBuilder();
s.append("poolSize = ")
.append(e.getPoolSize())
.append(", corePoolSize = ")
.append(e.getCorePoolSize())
.append(", queueSize = ")
.append(e.getQueue()
.size())
.append(", queueRemainingCapacity = ")
.append(e.getQueue()
.remainingCapacity())
.append(", maximumPoolSize = ")
.append(e.getMaximumPoolSize())
.append(", totalTasksSubmitted = ")
.append(taskSubmitted);
System.out.println(s.toString());
}
private static class Task implements Runnable {
@Override
public void run () {
while (true) {
try {
Thread.sleep(1000000);
} catch (InterruptedException e) {
break;
}
}
}
}
}
输出 :
---- Bounded queue instance = class java.util.concurrent.ArrayBlockingQueue -------------
poolSize = 1, corePoolSize = 2, queueSize = 0, queueRemainingCapacity = 3, maximumPoolSize = 5, totalTasksSubmitted = 1
poolSize = 2, corePoolSize = 2, queueSize = 0, queueRemainingCapacity = 3, maximumPoolSize = 5, totalTasksSubmitted = 2
poolSize = 2, corePoolSize = 2, queueSize = 1, queueRemainingCapacity = 2, maximumPoolSize = 5, totalTasksSubmitted = 3
poolSize = 2, corePoolSize = 2, queueSize = 2, queueCapacity = 1, maximumPoolSize = 5, totalTasksSubmitted = 4
poolSize = 2, corePoolSize = 2, queueSize = 3, queueRemainingCapacity = 0, maximumPoolSize = 5, totalTasksSubmitted = 5
poolSize = 3, corePoolSize = 2, queueSize = 3, queueRemainingCapacity = 0, maximumPoolSize = 5, totalTasksSubmitted = 6
poolSize = 4, corePoolSize = 2, queueSize = 3, queueRemainingCapacity = 0, maximumPoolSize = 5, totalTasksSubmitted = 7
poolSize = 5, corePoolSize = 2, queueSize = 3, queueRemainingCapacity = 0, maximumPoolSize = 5, totalTasksSubmitted = 8
Task rejected = 9
poolSize = 5, corePoolSize = 2, queueSize = 3, queueRemainingCapacity = 0, maximumPoolSize = 5, totalTasksSubmitted = 9
Task rejected = 10
poolSize = 5, corePoolSize = 2, queueSize = 3, queueRemainingCapacity = 0, maximumPoolSize = 5, totalTasksSubmitted = 10
--------------------
---- Unbounded queue instance = class java.util.concurrent.LinkedBlockingDeque -------------
poolSize = 1, corePoolSize = 2, queueSize = 0, queueRemainingCapacity = 2147483647, maximumPoolSize = 5, totalTasksSubmitted = 1
poolSize = 2, corePoolSize = 2, queueSize = 0, queueRemainingCapacity = 2147483647, maximumPoolSize = 5, totalTasksSubmitted = 2
poolSize = 2, corePoolSize = 2, queueSize = 1, queueRemainingCapacity = 2147483646, maximumPoolSize = 5, totalTasksSubmitted = 3
poolSize = 2, corePoolSize = 2, queueSize = 2, queueRemainingCapacity = 2147483645, maximumPoolSize = 5, totalTasksSubmitted = 4
poolSize = 2, corePoolSize = 2, queueSize = 3, queueRemainingCapacity = 2147483644, maximumPoolSize = 5, totalTasksSubmitted = 5
poolSize = 2, corePoolSize = 2, queueSize = 4, queueRemainingCapacity = 2147483643, maximumPoolSize = 5, totalTasksSubmitted = 6
poolSize = 2, corePoolSize = 2, queueSize = 5, queueRemainingCapacity = 2147483642, maximumPoolSize = 5, totalTasksSubmitted = 7
poolSize = 2, corePoolSize = 2, queueSize = 6, queueRemainingCapacity = 2147483641, maximumPoolSize = 5, totalTasksSubmitted = 8
poolSize = 2, corePoolSize = 2, queueSize = 7, queueRemainingCapacity = 2147483640, maximumPoolSize = 5, totalTasksSubmitted = 9
poolSize = 2, corePoolSize = 2, queueSize = 8, queueRemainingCapacity = 2147483639, maximumPoolSize = 5, totalTasksSubmitted = 10
--------------------
---- Direct hand-off queue instance = class java.util.concurrent.SynchronousQueue -------------
poolSize = 1, corePoolSize = 2, queueSize = 0, queueRemainingCapacity = 0, maximumPoolSize = 5, totalTasksSubmitted = 1
poolSize = 2, corePoolSize = 2, queueSize = 0, queueRemainingCapacity = 0, maximumPoolSize = 5, totalTasksSubmitted = 2
poolSize = 3, corePoolSize = 2, queueSize = 0, queueRemainingCapacity = 0, maximumPoolSize = 5, totalTasksSubmitted = 3
poolSize = 4, corePoolSize = 2, queueSize = 0, queueRemainingCapacity = 0, maximumPoolSize = 5, totalTasksSubmitted = 4
poolSize = 5, corePoolSize = 2, queueSize = 0, queueRemainingCapacity = 0, maximumPoolSize = 5, totalTasksSubmitted = 5
Task rejected = 6
poolSize = 5, corePoolSize = 2, queueSize = 0, queueRemainingCapacity = 0, maximumPoolSize = 5, totalTasksSubmitted = 6
Task rejected = 7
poolSize = 5, corePoolSize = 2, queueSize = 0, queueRemainingCapacity = 0, maximumPoolSize = 5, totalTasksSubmitted = 7
Task rejected = 8
poolSize = 5, corePoolSize = 2, queueSize = 0, queueRemainingCapacity = 0, maximumPoolSize = 5, totalTasksSubmitted = 8
Task rejected = 9
poolSize = 5, corePoolSize = 2, queueSize = 0, queueRemainingCapacity = 0, maximumPoolSize = 5, totalTasksSubmitted = 9
Task rejected = 10
poolSize = 5, corePoolSize = 2, queueSize = 0, queueRemainingCapacity = 0, maximumPoolSize = 5, totalTasksSubmitted = 10
--------------------
Process finished with exit code 0
来自 Java concurency Essentials 一书:
CorePoolSize:ThreadPoolExecutor 有一个属性 corePoolSize,它决定了它将启动多少个线程,直到队列满时才启动新线程
MaximumPoolSize:这个属性决定了最多启动多少个线程。您可以将其设置为整数。 MAX_VALUE 为了没有上限
java.util.concurrent.ThreadPoolExecutor
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
了解提交新任务时 ThreadPoolExecutor
的内部行为有助于我了解 corePoolSize
和 maximumPoolSize
的不同之处。
让:
N 是池中的线程数,getPoolSize()。活动线程+空闲线程。
T 是提交给执行者/池的任务数量。
C 是核心池大小,getCorePoolSize()。在新任务进入队列之前,每个池最多可以为传入任务创建多少个线程。
M 是最大池大小,getMaximumPoolSize()。池可以分配的最大线程数。
提交新任务时 Java 中 ThreadPoolExecutor
的行为:
对于 N <= C,空闲线程不会被分配新的传入任务,而是创建一个新线程。
对于 N > C,如果有空闲线程,则在那里分配新任务。
对于 N > C 并且如果没有空闲线程,则将新任务放入队列中。此处未创建新线程。
当队列已满时,我们创建最多 M 的新线程。如果达到 M,我们拒绝任务。重要的是不要在这里创建新线程,直到队列已满!
资料来源:
docs.oracle.com
关于这个主题的优秀文章。
例子
corePoolSize = 0 和 maximumPoolSize = 10 的示例,队列容量为 50。
这将导致池中只有一个活动线程,直到队列中有 50 个项目。
executor.execute(task #1):
before task #1 submitted to executor: java.util.concurrent.ThreadPoolExecutor@c52dafe[Running, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0]
after task #1 submitted to executor: java.util.concurrent.ThreadPoolExecutor@c52dafe[Running, pool size = 1, active threads = 1, queued tasks = 1, completed tasks = 0]
[task #1 immediately queued and kicked in b/c the very first thread is created when `workerCountOf(recheck) == 0`]
execute(task #2):
before task #2 submitted to executor: java.util.concurrent.ThreadPoolExecutor@c52dafe[Running, pool size = 1, active threads = 1, queued tasks = 0, completed tasks = 0]
after task #2 submitted to executor: java.util.concurrent.ThreadPoolExecutor@c52dafe[Running, pool size = 1, active threads = 1, queued tasks = 1, completed tasks = 0]
[task #2 not starting before #1 is done]
... executed a few tasks...
execute(task #19)
before task #19 submitted to executor: java.util.concurrent.ThreadPoolExecutor@735afe38[Running, pool size = 1, active threads = 1, queued tasks = 17, completed tasks = 0]
after task #19 submitted to executor: java.util.concurrent.ThreadPoolExecutor@735afe38[Running, pool size = 1, active threads = 1, queued tasks = 18, completed tasks = 0]
...
execute(task #51)
before task submitted to executor: java.util.concurrent.ThreadPoolExecutor@735afe38[Running, pool size = 1, active threads = 1, queued tasks = 50, completed tasks = 0]
after task submitted to executor: java.util.concurrent.ThreadPoolExecutor@735afe38[Running, pool size = 2, active threads = 2, queued tasks = 50, completed tasks = 0]
Queue is full.
A new thread was created as the queue was full.
corePoolSize = 10 和 maximumPoolSize = 10 的示例,队列容量为 50。
这将导致池中有 10 个活动线程。当队列中有 50 个项目时,任务将被拒绝。
execute(task #1)
before task #1 submitted to executor: java.util.concurrent.ThreadPoolExecutor@32d9e072[Running, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0]
after task #1 submitted to executor: java.util.concurrent.ThreadPoolExecutor@32d9e072[Running, pool size = 1, active threads = 1, queued tasks = 0, completed tasks = 0]
execute(task #2)
before task #2 submitted to executor: java.util.concurrent.ThreadPoolExecutor@32d9e072[Running, pool size = 1, active threads = 1, queued tasks = 0, completed tasks = 0]
after task #2 submitted to executor: java.util.concurrent.ThreadPoolExecutor@32d9e072[Running, pool size = 2, active threads = 2, queued tasks = 0, completed tasks = 0]
execute(task #3)
before task #3 submitted to executor: java.util.concurrent.ThreadPoolExecutor@32d9e072[Running, pool size = 2, active threads = 2, queued tasks = 0, completed tasks = 0]
after task #3 submitted to executor: java.util.concurrent.ThreadPoolExecutor@32d9e072[Running, pool size = 3, active threads = 3, queued tasks = 0, completed tasks = 0]
... executed a few tasks...
execute(task #11)
before task #11 submitted to executor: java.util.concurrent.ThreadPoolExecutor@32d9e072[Running, pool size = 10, active threads = 10, queued tasks = 0, completed tasks = 0]
after task #11 submitted to executor: java.util.concurrent.ThreadPoolExecutor@32d9e072[Running, pool size = 10, active threads = 10, queued tasks = 1, completed tasks = 0]
... executed a few tasks...
execute(task #51)
before task #51 submitted to executor: java.util.concurrent.ThreadPoolExecutor@32d9e072[Running, pool size = 10, active threads = 10, queued tasks = 50, completed tasks = 0]
Task was rejected as we have reached `maximumPoolSize`.
根据文档:
任何 BlockingQueue 都可以用来传输和保存提交的任务。此队列的使用与池大小交互:如果运行的线程少于核心池大小,则执行程序总是更喜欢添加新线程而不是排队。如果核心池大小或更多线程正在运行,Executor 总是更喜欢排队请求而不是添加新线程。如果请求无法排队,则会创建一个新线程,除非这将超过 maximumPoolSize,在这种情况下,该任务将被拒绝。
这意味着核心池大小是执行程序服务更愿意将任务排队而不是产生新线程的阈值。如果在接收任务时,线程数小于核心池大小,则创建新线程,从而增加活动线程。
不定期副业成功案例分享
allowCoreThreadTimeOut(boolean)
,它允许核心线程在给定的空闲时间后被杀死。将此设置为 true 并设置core threads
=max threads
允许线程池在 0 和max threads
之间缩放。prestartCoreThread()
或prestartAllCoreThreads()
。