슬기로운 개발생활

[Spring] ThreadPoolTaskExecutor 설정으로 비동기 처리 효과적으로 하기

by coco3o
반응형

이전 글에 이어서 Spring Boot에서 ThreadPoolTaskExecutor 설정을 통해 비동기 처리를 효율적으로 수행하는 방법을 알아보려 한다.

 

ThreadPoolTaskExecutor

ThreadPoolTaskExecutor는 Spring에서 제공해주는 클래스로 자바에서 제공하는 ThreadPoolExecutor를 사용하기 쉽게 만들어 사용하도록 구현 되어 있어 스레드 풀을 쉽고 간단하게 설정하고 관리할 수 있다.

java.util.concurrent Executor를 최상위 인터페이스로 가진다. 

 

Bean 등록

우선 ThreadPoolTaskExecutor를 사용하기 위해 Bean으로 등록한다.

@Configuration
public class ThreadPoolConfig {

    @Bean(name = "taskExecutor")
    public Executor taskExecutor() {
        ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
        taskExecutor.initialize();

        return taskExecutor;
    }
}

ThreadPoolTaskExecutor를 생성하고 사용할 수 있도록 initialize()를 호출했다.

※ 명시적으로 적지 않아도 빈으로 등록될 때 initialize()한다. 

 

Thread 설정

private static final int CORE_POOL_SIZE = 10;
private static final int MAX_POOL_SIZE = 30;

@Bean(name = "taskExecutor")
public Executor taskExecutor() {
    ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
    taskExecutor.setCorePoolSize(CORE_POOL_SIZE);
    taskExecutor.setMaxPoolSize(MAX_POOL_SIZE);
    taskExecutor.initialize();

    return taskExecutor;
}
  • setCorePoolSize
    • 동시에 실행 할 기본 스레드의 수를 설정할 수 있다.
    • 기본 값은 1이다.
  • setMaxPoolSize
    • thread-pool의 사용할 수 있는 최대 스레드 수를 설정할 수 있다.
    • 기본 값은 Integer.MAX_VALUE 이다.

 

최초 설정한 스레드의 수만큼 작업하다 더 이상 처리할 수 없을 경우 max size 만큼 스레드가 증가하겠지? 라고 예상할 수 있지만, 실제로는 그렇지 않다.

 

설정한 core size보다 많은 작업 요청이 들어오면 ThreadPoolTaskExecutor는 내부적으로 Integer.MAX_VALUE 만큼 LinkedBlockingQueue를 생성하고 task는 queue에서 대기하게 된다.

그리고 queue가 꽉 차게 되면 그 때 max size 만큼 스레드를 생성하여 task를 처리한다.

 

QueueCapacity

core size보다 많은 요청이 들어오면 Integer.MAX_VALUE 만큼 queue를 생성한다 했는데 이는 기본 설정 값이며,

작업에 맞게 queueCapacity 사이즈를 변경하여 사용할 수 있다.

private static final int CORE_POOL_SIZE = 10;
private static final int MAX_POOL_SIZE = 30;
private static final int QUEUE_CAPACITY = 100;

@Bean(name = "taskExecutor")
public Executor taskExecutor() {
    ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
    taskExecutor.setCorePoolSize(CORE_POOL_SIZE);
    taskExecutor.setMaxPoolSize(MAX_POOL_SIZE);
    taskExecutor.setQueueCapacity(QUEUE_CAPACITY);
    taskExecutor.initialize();

    return taskExecutor;
}
  • setQueueCapacity
    • thread-pool executor의 작업 큐의 크기를 설정할 수 있다.
    • 기본 값은 Integer.MAX_VALUE 이다.

 

위와 같이 설정하면 최초 10개의 스레드에서 처리하다가 처리 속도가 밀릴 경우 100개 사이즈의 큐에서 대기하고 그보다 많은 요청이 발생하면 최대 30개의 스레드를 생성해서 처리한다.

 

RejectedExecutionHandler

max size만큼 스레드를 생성하고, 설정한 queue가 가득 찬 상태에서 추가 작업이 들어올 경우 RejectedExecutionException 예외가 발생하게 된다.

더 이상 처리할 수 없다는 오류인데, 기본적으로 RejectedExecutionHandler 인터페이스를 구현한 몇 가지 클래스가 제공되며 이러한 예외가 발생하지 않도록 우리는 다음과 같이 설정하여 해결할 수 있다.

@Bean(name = "taskExecutor")
public Executor taskExecutor() {
    ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
    taskExecutor.setCorePoolSize(CORE_POOL_SIZE);
    taskExecutor.setMaxPoolSize(MAX_POOL_SIZE);
    taskExecutor.setQueueCapacity(QUEUE_CAPACITY);
    taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
    taskExecutor.initialize();

    return taskExecutor;
}
  • ThreadPoolExecutor.AbortPolicy
    • 기본 설정 값이다.
    • reject 발생시 RejectedExecutionException을 발생시킨다.
  • ThreadPoolExecutor.CallerRunsPolicy
    • shutdown 상태가 아닐 때, ThreadPoolTaskExecutor에 요청한 스레드에서 직접 처리한다.
  • ThreadPoolExecutor.DiscardPolicy
    • reject된 작업을 무시한다.
    • 모든 작업이 처리 될 필요가 없을 경우 사용한다.
  • ThreadPoolExecutor.DiscardOldsetPolicy
    • 오래된 작업을 제거하고 reject된 작업을 실행시킨다.
    • 역시 모든 작업이 처리 될 필요가 없을 경우 사용한다.

 

예외와 작업의 누락 없이 모두 처리하려면 CallerRunsPolicy로 설정하여 사용하자.

 

Shutdown

위와 같이 별도로 정의한 스레드 풀에서 작업이 이루어지고 있을 때 어플리케이션 종료를 요청하게되면 어플리케이션이 바로 종료된다. 이렇게 되면 아직 처리되지 못한 작업들은 유실되기 때문에 다음과 같이 설정하면 작업 유실을 방지할 수 있다.

private static final int CORE_POOL_SIZE = 30;
private static final int MAX_POOL_SIZE = 50;
private static final int QUEUE_CAPACITY = 100;
private static final boolean WAIT_TASK_COMPLETE = true;

@Bean(name = "taskExecutor")
public Executor taskExecutor() {
    ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
    taskExecutor.setCorePoolSize(CORE_POOL_SIZE);
    taskExecutor.setMaxPoolSize(MAX_POOL_SIZE);
    taskExecutor.setQueueCapacity(QUEUE_CAPACITY);
    taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
    taskExecutor.setWaitForTasksToCompleteOnShutdown(WAIT_TASK_COMPLETE);
    taskExecutor.initialize();

    return taskExecutor;
}
  • setWaitForTasksToCompleteOnShutdown
    •  true 설정시 어플리케이션 종료 요청시 queue에 남아 있는 모든 작업들이 완료될 때까지 기다린 후 종료된다.

 

Timeout

만약 모든 작업이 처리되길 기다리기 힘든 경우라면 setAwaitTerminationSeconds 설정을 통해 최대 종료 대기 시간을 설정할 수 있다.

private static final int CORE_POOL_SIZE = 30;
private static final int MAX_POOL_SIZE = 50;
private static final int QUEUE_CAPACITY = 100;
private static final boolean WAIT_TASK_COMPLETE = true;
private static final int AWAIT_TERMINATION_SECONDS = 30;

@Bean(name = "taskExecutor")
public Executor taskExecutor() {
    ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
    taskExecutor.setCorePoolSize(CORE_POOL_SIZE);
    taskExecutor.setMaxPoolSize(MAX_POOL_SIZE);
    taskExecutor.setQueueCapacity(QUEUE_CAPACITY);
    taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
    taskExecutor.setWaitForTasksToCompleteOnShutdown(WAIT_TASK_COMPLETE);
    taskExecutor.setAwaitTerminationSeconds(AWAIT_TERMINATION_SECONDS);
    taskExecutor.initialize();

    return taskExecutor;
}

 

ThreadName

private static final int CORE_POOL_SIZE = 30;
private static final int MAX_POOL_SIZE = 50;
private static final int QUEUE_CAPACITY = 100;
private static final String THREAD_NAME_PREFIX = "executor-";
private static final boolean WAIT_TASK_COMPLETE = true;
private static final int AWAIT_TERMINATION_SECONDS = 30;

@Bean(name = "taskExecutor")
public Executor taskExecutor() {
    ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
    taskExecutor.setCorePoolSize(CORE_POOL_SIZE);
    taskExecutor.setMaxPoolSize(MAX_POOL_SIZE);
    taskExecutor.setQueueCapacity(QUEUE_CAPACITY);
    taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
    taskExecutor.setWaitForTasksToCompleteOnShutdown(WAIT_TASK_COMPLETE);
    taskExecutor.setAwaitTerminationSeconds(AWAIT_TERMINATION_SECONDS);
    taskExecutor.setThreadNamePrefix(THREAD_NAME_PREFIX);
    taskExecutor.initialize();

    return taskExecutor;
}
  • setThreadNamePrefix
    • 스레드의 이름을 설정한다.

 

ThreadPoolTaskExecutor 동작 원리

여기까지 내용을 복기하며 ThreadPoolTaskExecutor의 동작 원리를 생각 해보자.

  1. 현재 점유하고 있는 스레드의 수가 corePoolSize만큼 있고, 추가 요청이 오면 LinkedBlockingQueue를 생성하고, 요청을 큐에 넣는다.
  2. queue에 담긴 요청이 queueCapacity의 수만큼 있을 때 요청이 오면 maxPoolSize 만큼 스레드를 생성하여 처리한다.
  3. 현재 점유하고 있는 스레드의 수가 maxPoolSize 만큼 있고, 큐에 담긴 요청이 queueCapacity의 수 만큼 있을 때 추가 요청이 오면 RejectedExecution 전략에 따라 처리된다.

 

ThreadPoolTaskExecutor 사용법

1. Executor의 execute()

다음과 같이 Executor를 주입한 뒤 처리하고자 하는 작업을 Runnable 인터페이스의 run()메소드에 정의하고,

Executor.execute()의 인자로 넘겨주면 된다.

@Slf4j
@Service
public class TestService {

    private final Executor executor;

    public TestService(@Qualifier("taskExecutor") Executor executor) {
        this.executor = executor;
    }

    public void executeThreads() {
        Runnable runnable = () -> {
            try {
                log.info("executing Thread Name .. [{}]", Thread.currentThread().getName());
                Thread.sleep(1000); // 1초간 정지
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        };

        for (int i = 0; i < 10; i++) {
            executor.execute(runnable);
        }
    }
}
19:26:42.247 [INFO ] [executor-9] com.coco.demo.TestService - executing Thread Name .. [executor-9]
19:26:42.248 [INFO ] [executor-8] com.coco.demo.TestService - executing Thread Name .. [executor-8]
19:26:42.250 [INFO ] [executor-10] com.coco.demo.TestService - executing Thread Name .. [executor-10]
19:26:42.250 [INFO ] [executor-7] com.coco.demo.TestService - executing Thread Name .. [executor-7]
19:26:42.251 [INFO ] [executor-6] com.coco.demo.TestService - executing Thread Name .. [executor-6]
19:26:42.251 [INFO ] [executor-5] com.coco.demo.TestService - executing Thread Name .. [executor-5]
19:26:42.251 [INFO ] [executor-4] com.coco.demo.TestService - executing Thread Name .. [executor-4]
19:26:42.251 [INFO ] [executor-3] com.coco.demo.TestService - executing Thread Name .. [executor-3]
19:26:42.251 [INFO ] [executor-2] com.coco.demo.TestService - executing Thread Name .. [executor-2]
19:26:42.252 [INFO ] [executor-1] com.coco.demo.TestService - executing Thread Name .. [executor-1]

 

2. CompletableFuture

CompletableFuture의 xxxAsync가 붙은 메소드는 기본적으로 ForkJoinPool의 commonPool을 사용하며,
두 번째 인수를 받는 오버로드 메소드에서 커스텀한 Thread Executor를 사용할 수 있다.

public void executeThreads() {
    CompletableFuture<Void> cf = null;

    for(int i = 0; i < 10; i++){
        cf = CompletableFuture.runAsync(() -> {
        try {
            log.info("executing Thread Name .. [{}]", Thread.currentThread().getName());
            Thread.sleep(1000); // 1초간 정지
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        }, executor);
    }

    cf.join();
}
19:37:16.229 [INFO ] [executor-2] com.coco.demo.TestService - executing Thread Name .. [executor-2]
19:37:16.229 [INFO ] [executor-1] com.coco.demo.TestService - executing Thread Name .. [executor-1]
19:37:16.229 [INFO ] [executor-5] com.coco.demo.TestService - executing Thread Name .. [executor-5]
19:37:16.229 [INFO ] [executor-6] com.coco.demo.TestService - executing Thread Name .. [executor-6]
19:37:16.229 [INFO ] [executor-4] com.coco.demo.TestService - executing Thread Name .. [executor-4]
19:37:16.230 [INFO ] [executor-8] com.coco.demo.TestService - executing Thread Name .. [executor-8]
19:37:16.230 [INFO ] [executor-7] com.coco.demo.TestService - executing Thread Name .. [executor-7]
19:37:16.230 [INFO ] [executor-10] com.coco.demo.TestService - executing Thread Name .. [executor-10]
19:37:16.230 [INFO ] [executor-9] com.coco.demo.TestService - executing Thread Name .. [executor-9]

 

참고

https://docs.spring.io/spring-framework/docs/current/javadoc-api/org/springframework/scheduling/concurrent/ThreadPoolTaskExecutor.html

https://kapentaz.github.io/spring/Spring-ThreadPoolTaskExecutor-%EC%84%A4%EC%A0%95/#

반응형

블로그의 정보

슬기로운 개발생활

coco3o

활동하기