IT story

크기 제한이있는 캐시 된 스레드 풀을 만들 수 없습니까?

hot-time 2020. 7. 18. 10:02
반응형

크기 제한이있는 캐시 된 스레드 풀을 만들 수 없습니까?


캐시 가능한 스레드 풀을 생성 할 수있는 스레드 수로 제한하는 것은 불가능한 것 같습니다.

정적 Executors.newCachedThreadPool이 표준 Java 라이브러리에서 구현되는 방법은 다음과 같습니다.

 public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}

따라서 해당 템플리트를 사용하여 고정 크기의 캐시 된 스레드 풀을 작성하십시오.

new ThreadPoolExecutor(0, 3, 60L, TimeUnit.SECONDS, new SynchronusQueue<Runable>());

이제 이것을 사용하고 3 개의 작업을 제출하면 모든 것이 정상입니다. 추가 작업을 제출하면 거부 된 실행 예외가 발생합니다.

이것을 시도 :

new ThreadPoolExecutor(0, 3, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runable>());

모든 스레드가 순차적으로 실행됩니다. 즉, 스레드 풀은 작업을 처리하기 위해 둘 이상의 스레드를 만들지 않습니다.

ThreadPoolExecutor의 execute 메소드에 버그가 있습니까? 아니면 이것은 의도적입니까? 아니면 다른 방법이 있습니까?

편집 : 캐시 된 스레드 풀과 똑같은 것을 원합니다 (요청시 스레드를 생성 한 다음 타임 아웃 후에 종료). 만들 수있는 스레드 수와 한 번 추가 작업을 계속 큐에 넣을 수있는 기능에 제한이 있습니다. 스레드 제한에 도달했습니다. sjlee의 답변에 따르면 이것은 불가능합니다. ThreadPoolExecutor의 execute () 메소드를 보면 실제로 불가능합니다. ThreadPoolExecutor의 서브 클래스를 작성하고 SwingWorker와 비슷한 execute ()를 재정의해야하지만 SwingWorker가 execute ()에서 수행하는 작업은 완전한 해킹입니다.


ThreadPoolExecutor에는 다음과 같은 몇 가지 주요 동작이 있으며 이러한 동작으로 문제를 설명 할 수 있습니다.

작업이 제출되면

  1. 스레드 풀이 코어 크기에 도달하지 않으면 새 스레드가 작성됩니다.
  2. 코어 크기에 도달하고 유휴 스레드가 없으면 작업을 대기열에 넣습니다.
  3. 코어 크기에 도달하면 유휴 스레드가없고 큐가 가득 차면 새 스레드를 작성합니다 (최대 크기에 도달 할 때까지).
  4. 최대 크기에 도달하면 유휴 스레드가없고 큐가 가득 차면 거부 정책이 시작됩니다.

첫 번째 예에서 SynchronousQueue의 크기는 기본적으로 0입니다. 따라서 최대 크기 (3)에 도달하면 거부 정책이 시작됩니다 (# 4).

두 번째 예에서 선택한 큐는 크기가 무제한 인 LinkedBlockingQueue입니다. 그러므로, 당신은 행동 # 2에 갇히게됩니다.

동작이 거의 완전히 결정되었으므로 캐시 된 유형 또는 고정 유형으로 실제로 많은 것을 해결할 수 없습니다.

제한된 동적 스레드 풀을 사용하려면 유한 크기의 큐와 결합 된 포지티브 코어 크기 및 최대 크기를 사용해야합니다. 예를 들어

new ThreadPoolExecutor(10, // core size
    50, // max size
    10*60, // idle timeout
    TimeUnit.SECONDS,
    new ArrayBlockingQueue<Runnable>(20)); // queue with a size

부록 : 이것은 상당히 오래된 답변이며 JDK가 0의 코어 크기와 관련하여 동작을 변경 한 것으로 보입니다. JDK 1.6부터 코어 크기가 0이고 풀에 스레드가 없으면 ThreadPoolExecutor는 해당 작업을 실행할 스레드 따라서 코어 크기 0은 위의 규칙에 대한 예외입니다. 감사합니다 스티브 에 대한 데려 내 관심 해당합니다.


내가 놓친 것이 없다면, 원래 질문에 대한 해결책은 간단합니다. 다음 코드는 원본 포스터에서 설명한대로 원하는 동작을 구현합니다. 바인딩되지 않은 큐에서 작동하기 위해 최대 5 개의 스레드를 생성하고 유휴 스레드는 60 초 후에 종료됩니다.

tp = new ThreadPoolExecutor(5, 5, 60, TimeUnit.SECONDS,
                    new LinkedBlockingQueue<Runnable>());
tp.allowCoreThreadTimeOut(true);

같은 문제가 있었다. 다른 답변으로 모든 문제를 해결할 수는 없으므로 다음을 추가하고 있습니다.

이제 문서로 명확하게 작성되었습니다 . LinkedBlockingQueue최대 스레드 설정을 차단하지 않는 큐를 사용하면 ( ) 최대 스레드 설정이 적용되지 않고 코어 스레드 만 사용됩니다.

그래서:

public class MyExecutor extends ThreadPoolExecutor {

    public MyExecutor() {
        super(4, 4, 5,TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
        allowCoreThreadTimeOut(true);
    }

    public void setThreads(int n){
        setMaximumPoolSize(Math.max(1, n));
        setCorePoolSize(Math.max(1, n));
    }

}

이 유언 집행 인은 :

  1. 무제한 큐를 사용하므로 최대 스레드 개념이 없습니다. 이러한 큐가 executor가 일반적인 정책을 따르는 경우 코어가 아닌 추가 스레드를 대량으로 만들 수 있기 때문에 이는 좋은 일입니다.

  2. A queue of max size Integer.MAX_VALUE. Submit() will throw RejectedExecutionException if number of pending tasks exceeds Integer.MAX_VALUE. Not sure we will run out of memory first or this will happen.

  3. Has 4 core threads possible. Idle core threads automatically exit if idle for 5 seconds.So, yes, strictly on demand threads.Number can be varied using setThreads() method.

  4. Makes sure min number of core threads is never less than one, or else submit() will reject every task. Since core threads need to be >= max threads the method setThreads() sets max threads as well, though max thread setting is useless for an unbounded queue.


In your first example, subsequent tasks are rejected because the AbortPolicy is the default RejectedExecutionHandler. The ThreadPoolExecutor contains the following policies, which you can change via the setRejectedExecutionHandler method:

CallerRunsPolicy
AbortPolicy
DiscardPolicy
DiscardOldestPolicy

It sounds like you want cached thread pool with a CallerRunsPolicy.


None of the answers here fixed my problem, which had to do with creating a limited amount of HTTP connections using Apache's HTTP client (3.x version). Since it took me some hours to figure out a good setup, I'll share:

private ExecutorService executor = new ThreadPoolExecutor(5, 10, 60L,
  TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
  Executors.defaultThreadFactory(), new ThreadPoolExecutor.CallerRunsPolicy());

This creates a ThreadPoolExecutor which starts with five and holds a maximum of ten simultaneously running threads using CallerRunsPolicy for executing.


Per the Javadoc for ThreadPoolExecutor:

If there are more than corePoolSize but less than maximumPoolSize threads running, a new thread will be created only if the queue is full. By setting corePoolSize and maximumPoolSize the same, you create a fixed-size thread pool.

(Emphasis mine.)

jitter's answer is what you want, although mine answers your other question. :)


there is one more option. Instead of using new SynchronousQueue you can use any other queue also, but you have to make sure its size is 1, so that will force executorservice to create new thread.


Doesn't look as though any of the answers actually answer the question - in fact I can't see a way of doing this - even if you subclass from PooledExecutorService since many of the methods/properties are private e.g. making addIfUnderMaximumPoolSize was protected you could do the following:

class MyThreadPoolService extends ThreadPoolService {
    public void execute(Runnable run) {
        if (poolSize() == 0) {
            if (addIfUnderMaximumPoolSize(run) != null)
                return;
        }
        super.execute(run);
    }
}

The closest I got was this - but even that isn't a very good solution

new ThreadPoolExecutor(min, max, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>()) {
    public void execute(Runnable command) {
        if (getPoolSize() == 0 && getActiveCount() < getMaximumPoolSize()) {        
            super.setCorePoolSize(super.getCorePoolSize() + 1);
        }
        super.execute(command);
    }

    protected void afterExecute(Runnable r, Throwable t) {
         // nothing in the queue
         if (getQueue().isEmpty() && getPoolSize() > min) {
             setCorePoolSize(getCorePoolSize() - 1);
         }
    };
 };

p.s. not tested the above


This is what you want (atleast I guess so). For an explanation check Jonathan Feinberg answer

Executors.newFixedThreadPool(int n)

Creates a thread pool that reuses a fixed number of threads operating off a shared unbounded queue. At any point, at most nThreads threads will be active processing tasks. If additional tasks are submitted when all threads are active, they will wait in the queue until a thread is available. If any thread terminates due to a failure during execution prior to shutdown, a new one will take its place if needed to execute subsequent tasks. The threads in the pool will exist until it is explicitly shutdown.


Here is another solution. I think this solution behaves as you want it to (though not proud of this solution):

final LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>() {
    public boolean offer(Runnable o) {
        if (size() > 1)
            return false;
        return super.offer(o);
    };

    public boolean add(Runnable o) {
        if (super.offer(o))
            return true;
        else
            throw new IllegalStateException("Queue full");
    }
};

RejectedExecutionHandler handler = new RejectedExecutionHandler() {         
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        queue.add(r);
    }
};

dbThreadExecutor =
        new ThreadPoolExecutor(min, max, 60L, TimeUnit.SECONDS, queue, handler);

  1. You can use ThreadPoolExecutor as suggested by @sjlee

    You can control the size of the pool dynamically. Have a look at this question for more details :

    Dynamic Thread Pool

    OR

  2. You can use newWorkStealingPool API, which has been introduced with java 8.

    public static ExecutorService newWorkStealingPool()
    

    Creates a work-stealing thread pool using all available processors as its target parallelism level.

By default, the parallelism level is set to number of CPU cores in your server. If you have 4 core CPU server, thread pool size would be 4. This API returns ForkJoinPool type of ExecutorService and allow work stealing of idle threads by stealing tasks from busy threads in ForkJoinPool.


The problem was summarized as follows:

I want something exactly like the cached thread pool (it creates threads on demand and then kills them after some timeout) but with a limit on the number of threads that it can create and the ability to continue to queue additional tasks once it has hit its thread limit.

Before pointing to the solution I will explain why the following solutions don't work:

new ThreadPoolExecutor(0, 3, 60L, TimeUnit.SECONDS, new SynchronousQueue<>());

This will not queue any tasks when the limit of 3 is reached because SynchronousQueue, by definition, cannot hold any elements.

new ThreadPoolExecutor(0, 3, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>());

This will not create more than a single thread because ThreadPoolExecutor only creates threads exceeding the corePoolSize if the queue is full. But LinkedBlockingQueue is never full.

ThreadPoolExecutor executor = new ThreadPoolExecutor(3, 3, 60, TimeUnit.SECONDS,
    new LinkedBlockingQueue<Runnable>());
executor.allowCoreThreadTimeOut(true);

This will not reuse threads until the corePoolSize has been reached because ThreadPoolExecutor increases the number of threads until the corePoolSize is reached even if existing threads are idle. If you can live with this disadvantage then this is the easiest solution to the problem. It is also the solution described in "Java Concurrency in Practice" (footnote on p172).

The only complete solution to the described problem seems to be the one involving overriding the queue's offer method and writing a RejectedExecutionHandler as explained in the answers to this question: How to get the ThreadPoolExecutor to increase threads to max before queueing?

참고URL : https://stackoverflow.com/questions/1800317/impossible-to-make-a-cached-thread-pool-with-a-size-limit

반응형