슬기로운 개발생활

[Java] 병렬 스트림(ParallelStream) 사용 방법 및 주의사항

by coco3o
반응형

ParallelStream

Java8에서 등장한 Stream은 병렬 처리를 쉽게 할 수 있도록 메소드를 제공해 준다.
개발자가 직접 스레드 혹은 스레드풀을 생성하거나 관리할 필요 없이 parallelStream(), parallel()만 사용하면 알아서 ForkJoinFramework 관리 방식을 이용하여 작업들을 분할하고, 병렬적으로 처리하게 된다.
 

Fork / Join Framework

Fork / Join Framework는 작업들을 분할 가능한 만큼 쪼개고, 쪼개진 작업들을 Work Thread를 통해 작업 후 결과를 합치는 과정으로 결과를 만들어 낸다.
즉, 분할 정복(Divide and Conquer) 알고리즘과 흡사하며, Fork를 통해 Task를 분담하고 Join을 통해 결과를 합친다.

Fork / Join Framework의 중심은 AbstractExecutorService 클래스를 확장한 ForkJoinPool이다.
 
ForkJoinPool을 알아보기 위해 JavaDoc에서 일부 발췌한 내용이다.

An ExecutorService for running ForkJoinTasks. A ForkJoinPool provides the entry point for submissions from non-ForkJoinTask clients,

as well as management and monitoring operations. A ForkJoinPool differs from other kinds of ExecutorService mainly by virtue of employing work-stealing: all threads in the pool attempt to find and execute tasks submitted to the pool and/or created by other active tasks
(eventually blocking waiting for work if none exist). This enables efficient processing when most tasks spawn other subtasks (as do most ForkJoinTasks), as well as when many small tasks are submitted to the pool from external clients. Especially when setting asyncMode to true in constructors, ForkJoinPools may also be appropriate for use with event-style tasks that are never joined.

위 내용을 정리하면 다음과 같다.

  • 다른 종류의 ExecutorService와는 다르게 Work-Stealing 메커니즘을 사용한다.
  • 이를 통해 대부분의 Task가 하위 Task를 생성하는 경우, 외부 클라이언트에 의한 Small Task가 많을 경우 효과적일 수 있다.

http://www.h-online.com/developer/features/The-fork-join-framework-in-Java-7-1762357.html

1. task를 보낸다. (submit)
2. inbound queue에 task가 들어가고, A와 B 스레드가 가져다가 task를 처리한다.
3. A와 B는 각자 큐가 있으며, 위 그림의 B처럼 큐에 task가 없으면 A의 task를 steal 하여 처리한다.

work-stealing 메커니즘을 사용하기 때문에 CPU 자원이 놀지 않고 최적의 성능을 낼 수 있게 된다. 

스레드 자신의 task queue로 deque를 사용한다. deque는 양 쪽 끝으로 넣고 뺄 수 있는 구조이며,
각 스레드는 deque의 한쪽 끝에서만 일하고 나머지 반대쪽에서는 task를 훔치러 온 다른 스레드가 접근한다.


그리고, Fork / Join Framework의 Work Thread의 수는 해당 어플리케이션이 구동되는 서버의 스펙에 따라 결정된다.
Runtime.getRuntime().availableProcessors()으로 JVM에서 이용 가능한 CPU Core 개수를 확인할 수 있으며,
스레드가 N개 생성되었을 때, 하나는 메인 스레드로 스트림을 처리하는 기본 스레드와 나머지 N-1개의 스레드가 ForkJoinPool 스레드이다.
 
 

예제 코드로 확인해보기

우선 순차적으로 실행 중인 thread 이름을 출력해보자.

public static void main(String[] arg) {

    List<Integer> numbers = Arrays.asList(1, 2, 3, 4);

    long start = System.currentTimeMillis();
    numbers.forEach(number -> {
        try {
            Thread.sleep(3000);
            System.out.println(number + ": " + Thread.currentThread().getName());
        } catch (InterruptedException e) {}
	});
    long duration = (System.currentTimeMillis() - start);
    double seconds = duration / 1000.0;

    System.out.printf("Done in %.2f sec\n", seconds);
}
1: main
2: main
3: main
4: main
Done in 12.05 sec

위 테스트를 진행했을 때 4개의 Elements에 대해서 각각 3초간 delay 되면서 총 12.05초가 걸렸다.
 
다음은 병렬 스트림을 이용하여 실행 중인 thread 이름을 출력하는 메소드를 호출해보자.
사용법은 간단한데, parallelStream() 또는 stream().parallel() 만 붙여주면 된다.

public static void main(String[] arg) {

    List<Integer> numbers = Arrays.asList(1, 2, 3, 4);

    long start = System.currentTimeMillis();
    numbers.parallelStream().forEach(number -> {
        try {
            Thread.sleep(3000);
            System.out.println(number + ": " + Thread.currentThread().getName());
        } catch (InterruptedException e) {}
    });
    long duration = (System.currentTimeMillis() - start);
    double seconds = duration / 1000.0;

    System.out.printf("Done in %.2f sec\n", seconds);
}
3: main
1: ForkJoinPool.commonPool-worker-2
2: ForkJoinPool.commonPool-worker-1
4: ForkJoinPool.commonPool-worker-3
Done in 3.04 sec

현재 사용 중인 PC의 코어 수는 4개이다.

메인 스레드를 포함한 4개의 스레드가 병렬 처리되면서 12.05가 걸렸던 작업이 3.04초만에 완료되었다.
그리고 numbers의 size를 5개로 설정하면 결과를 얻는 데까지 6초의 시간이 걸릴 것이다.
4개의 스레드가 동시 작업 할 동안 남은 1개의 작업을 처리하지 못하기 때문이다.
 
 

ParallelStream 크기 제어

ParallelStream에서 개발자가 임의로 Thread Pool의 크기를 조절하는 방법은 두 가지가 있다.
 

1. Property 설정

java.util.concurrent.ForkJoinPool.common.parallelism Property값을 설정하는 방법이다.

System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism","6");
public static void main(String[] arg) {
    System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism","6");

    List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6);

    long start = System.currentTimeMillis();
    
    numbers.parallelStream().forEach(number -> {
        try {
            Thread.sleep(3000);
            System.out.println(number + ": " + Thread.currentThread().getName());
        } catch (InterruptedException e) {}
    });    
    
    long duration = (System.currentTimeMillis() - start);
    double seconds = duration / 1000.0;

    System.out.printf("Done in %.2f sec\n", seconds);
}
4: main
3: ForkJoinPool.commonPool-worker-2
2: ForkJoinPool.commonPool-worker-3
1: ForkJoinPool.commonPool-worker-1
6: ForkJoinPool.commonPool-worker-4
5: ForkJoinPool.commonPool-worker-5
Done in 3.07 sec

이 방법은 현재 실행되는 프로세스의 모든 ForkJoinPool의 commonPool에 설정이 적용되며, JVM 전체에 영향을 미칠 수 있기 때문에 가급적 사용하지 않는 것을 권장한다.  
 

2. ForkJoinPool 설정

두 번째 방법은 commonPool을 사용하지 않고 개발자가 정의한 ForkJoinPool을 사용하는 방법이다.

ForkJoinPool forkJoinPool = new ForkJoinPool(6);

forkJoinPool.submit(() -> numbers.parallelStream()
        .forEach(number -> {
            try {
                Thread.sleep(3000);
                System.out.println(number + ": " + Thread.currentThread().getName());
            } catch (InterruptedException e) {}
        })
).get();

ForkJoinPool 생성자에 스레드 개수를 지정하여 사용할 수 있으며, 지정한 수만큼 스레드를 이용하여 처리한다.
 
 

ParallelStream 사용 전 꼭 알아야 할 주의사항

 

1. Thread Pool을 공유

parallelStream은 내부적으로 common ForkJoinPool을 사용하여 작업을 병렬화 시킨다.
parallelStream 별로 Thread Pool을 만드는 게 아니라는 것이다.
별도의 설정이 없다면 하나의 Thread Pool을 모든 parallelStream이 공유하게 되고, 
Thread Pool을 사용하는 다른 Thread에 영향을 줄 수 있으며, 반대로 영향을 받을 수 있다.
 
Thread Pool은 미리 스레드를 생성하여 보관하고 필요할 때 빌려주고 사용하지 않으면 반납하여 Thread의 숫자를 유지하는 역할을 한다.
그런데 만약 Thread를 사용 중인 곳에서 아래 이미지처럼 Thread를 반납하지 않고 계속 점유 중이라면 어떻게 될까?


이렇게 되면 Thread 1, 2, 3은 사용할 수 없으며 Thread 4 한 개 만을 이용해서 모든 요청을 처리하게 된다. 
특히 Thread 1, 2, 3 이 sleep과 같이 아무런 일을 하지 않으면서 점유를 하고 있다면 이는 문제가 크다.

만약, Thread 4까지 점유 중이게 되면 더 이상 요청은 처리되지 않고 Thread Pool Queue에 쌓이게 되며, 일정시간 이상 되면 요청이 Drop 되는 현상까지 발생할 것이다.

병렬 스트림은 Thread Pool을 global하게 공유하기 때문에 만약 A메서드에서 4개의 Thread를 모두 점유하면 다른 병렬 스트림의 요청은 처리되지 않고 대기하게 된다.
또한, blocking  I/O 가 발생하는 작업을 하게 되면 Thread Pool 내부의 스레드들은 block 되며, 이때 Thread Pool을 공유하는 다른 쪽의 병렬 Stream은 스레드를 얻을 때까지 계속해서 기다리게 되면서 문제가 발생한다.
 

이 문제는 각 parallelStream마다 커스텀(new ForkJoinPool(int n))하여 독립적인 Thread Pool로 분리하여 사용하면 해결할 수 있다.
 

2. Custom Thread Pool 사용 시 Memory Leak 주의

ForkJoinPool customForkJoinPool = new ForkJoinPool(Runtime.getRuntime().availableProcessors());

※ 별도의 스레드 풀 생성 시 정석은 실행 중인 CPU 코어 수를 기준으로 생성하는 것이다. 

물리적인 코어 수를 초과하여 생성할 경우, 생성은 되지만 스레드 관리 오버헤드와 스레드 간의 빈번한 컨텍스트 스위칭(Context-Switching) 등의 문제로 성능 저하가 발생할 수 있다.
 

Parallel Stream 별로 ForkJoinPool을 인스턴스화하여 사용하면 OOME(OutOfMemoryError)이 발생할 수 있다.
default로 사용되는 Common ForkJoinPool은 정적(static)이기 때문에 메모리 누수가 발생하지 않지만,
Custom 한 ForkJoinPool 객체는 참조 해제되지 않거나, GC(Garbage Collection)로 수집되지 않을 수 있다.
이 문제에 대한 해결 방법은 간단한데, Custom ForkJoinPool을 사용한 후 다음과 같이 스레드 풀을 명시적으로 종료하는 것이다.

ForkJoinPool customForkJoinPool = new ForkJoinPool(Runtime.getRuntime().availableProcessors());
// do something..
customForkJoinPool.shutdown();

이렇게 하면 사용이 끝난 Custom ForkJoinPool이 메모리에 머무르는 것을 방지할 수 있다.
 

3. Collection 별 성능 차이

parallelStream은 분할되는 작업의 단위가 균등하게 나누어져야 하며, 나누어지는 작업의 비용이 높지 않아야 순차적 방식보다 효율적으로 이루어질 수 있다.
array, arrayList와 같이 전체 사이즈를 알 수 있는 경우에는 분할 처리가 빠르고 비용이 적게 들지만,
linkedList와 같이 사이즈를 정확히 알 수 없는 데이터 구조는 분할되지 않고 순차 처리를 하므로 성능 효과를 보기 어렵다.
 

4. 병렬 처리 시 작업의 독립성

병렬로 처리되는 작업이 독립적이지 않다면, 수행 성능에 영향이 있을 수 있다.
예를 들어, stream()의 중간 연산 중에 sorted()나 distinct()와 같은 작업을 수행할 경우
내부적으로 상태에 대한 변수를 각 작업들이 공유(synchronized)하게 되어 있다.

내부적으로 어떤 공용 변수를 만들어 놓고 각 worker들이 이 변수에 접근할 경우, 동기화 작업 등을 통해 변수를 안전하게 유지하며 처리한다. 기존 Thread 작업 시 개발자가 해줘야 했던 동기화 등의 작업을 모두 수행하고 있는 것이다.

이러한 경우 순차적으로 실행하는 것이 보다 효과적이며,
각각 완전히 분리된 task들에 대해서 병렬로 처리하는 경우에 성능상 이점이 있을 수 있다.
 

5. 요소의 수 그리고 요소당 처리 시간

컬렉션에 요소의 수가 적고 요소당 처리 시간이 짧으면 순차 처리가 오히려 빠를 수 있다.
병렬 처리는 작업들을 분할(fork)하고 다시 합치는(join) 비용, 스레드 간의 컨텍스트 스위칭 비용도 포함되기 때문이다.
 
 

정리하면

parallelStream()은 세부 설정이나 복잡한 로직 없이 기존 stream()을 쓰듯 사용할 수 있는 편리함을 제공하지만,
병렬 처리가 무조건 더 나은 결과를 보장한다고 할 순 없다.
 
처리 성능에 영향을 미치는 부분들, 분할 및 병합 과정에서의 비용, 멀티 스레드 환경에서의 컨텍스트 스위칭 비용 등에 대해 충분히 고려해야 하기 때문에 신중해야 한다.
 
또한,  I/O를 기다리는 작업에는 적합하지 않고, (이 경우 CompletableFuture가 적합)
분할이 잘 이루어질 수 있는 데이터 구조 혹은 작업이 독립적이면서 CPU 사용이 높은 작업에 적합하다. 
 
특정 로직의 성능 개선을 위해 parallelStream()을 적용하고자 한다면, 이것이 정말로 성능 개선을 해줄 수 있는가 혹 예상치 못한 장애를 발생시키지는 않을까에 대해 충분히 고민하고 적용하는 것이 좋을 것 같다.
 
 
 
Reference :
https://sabarada.tistory.com/102
https://m.blog.naver.com/tmondev/220945933678
https://www.baeldung.com/java-when-to-use-parallel-stream
https://gee.cs.oswego.edu/dl/html/StreamParallelGuidance.html
https://www.baeldung.com/java-8-parallel-streams-custom-threadpool#bd-beware-of-the-memory-leak

반응형

블로그의 정보

슬기로운 개발생활

coco3o

활동하기