슬기로운 개발생활

[Java] CompletableFuture 사용법

by coco3o
반응형

Future

Java5부터 사용되던 Future 인터페이스는 java.util.concurrency 패키지에서 비동기 작업의 결과 값을 받는 용도로 사용했다. 하지만 여러 Future의 결괏값을 조합하거나, 예외를 효과적으로 핸들링할 수가 없었다.

 

그리고 Future는 오직 get 호출로만 작업 완료가 가능한데, get은 작업이 완료될 때까지 대기하는 블로킹호출이므로 비동기 작업 응답에 추가 작업을 하기 적합하지 않다.

public interface Future<V> {
    ...
    V get() throws InterruptedException, ExecutionException;

    V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
}
  • get() - 작업이 완료될 때까지 대기
  • get(long timeout, TimeUnit unit) - 작업이 완료될 때까지 설정한 시간 동안 대기하며, 시간 내 작업 미완료 시 TimeoutException 발생

 

CompletableFuture

Java8에서는 이러한 문제들을 모두 해결한 CompletableFuture가 소개되었다.

public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {
    ...
}

CompletableFuture 클래스는 Future 인터페이스를 구현함과 동시에 CompletionStage 인터페이스를 구현한다.

CompletionStage의 특징을 살펴보면 CompletableFuture의 장점을 알 수 있다.

 

CompletionStage는 결국은 계산이 완료될 것이라는 의미의 약속이다.

계산의 완료는 단일 단계의 완료뿐만 아니라 다른 여러 단계 혹은 다른 여러 단계 중의 하나로 이어질 수 있음도 포함한다. 

또한, 각 단계에서 발생한 에러를 관리하고 전달할 수 있다.

※ 비동기 연산 Step을 제공해서 체이닝 형태로 조합이 가능하며, 완료 후 콜백이 가능하다.

 

기본적인 사용 방법

- runAsync

반환 값이 없는 경우 비동기 작업 실행

CompletableFuture<Void> cf = CompletableFuture.runAsync(() -> System.out.println("Hello World!"));
cf.join(); // Hello World!

runAsync()는 Runnable 타입을 파라미터로 전달하기 때문에 어떤 결과 값을 담지 않는다.

 

- supplyAsync

반환 값이 있는 경우 비동기 작업 실행

CompletableFuture<String> cf = CompletableFuture.supplyAsync(() ->  "Hello World!");
System.out.println(cf.join()); // Hello World!

반면, supplyAsync()는 supplier 타입을 넘기기 때문에 반환 값이 존재한다.

 

또한, 결과를 get() 또는 join() 메소드로 가져올 수 있는데 각 차이점은 다음과 같다.

  • get() - Future 인터페이스에 정의된 메소드로 checked exceptionInterruptedExceptionExecutionException을 던지므로 예외 처리 로직이 반드시 필요하다.
  • join() - CompletableFuture에 정의되어 있으며, checked  exception을 발생시키지 않는 대신 unchecked CompletionException이 발생된다.

일반적으로 join()을 사용하는 것이 권장되지만, 예외 처리에 대한 추가 로직이 필요할 때 혹은 timeout 설정을 해야 하는 경우 get()을 사용하자.

 

runAsync와 supplyAsync는 기본적으로 ForkJoinPool의 commonPool()을 사용해 작업을 실행한 스레드를 스레드 풀로부터 얻어 실행시킨다. 만약 원하는 스레드 풀을 사용하려면, ExecutorService를 파라미터로 넘겨주면 된다.

작업 콜백

비동기 실행이 끝난 후에 다음과 같이 체이닝 형태로 작성하여 전달 받은 작업 콜백을 실행시켜 준다.

 

- thenApply

함수형 인터페이스 Function 타입을 파라미터로 받으며, 반환 값을 받아서 다른 값을 반환해주는 콜백이다.

thenApply는 앞선 계산의 결과를 콜백 함수로 전달된 Function을 실행한다.

CompletableFuture<String> cf = CompletableFuture.supplyAsync(() -> {
    return "hello world!";
}).thenApply(s -> {
    return s.toUpperCase();
});
System.out.println(cf.join()); // HELLO WORLD!

 

- thenAccept

함수형 인터페이스 Consumer를 파라미터로 받으며, 반환 값을 받아 처리하고 값을 반환하지 않는 콜백이다.

CompletableFuture<Void> cf = CompletableFuture.supplyAsync(() -> {
    return "hello world!";
}).thenAccept(System.out::println);

cf.join(); // hello world!

 

- thenRun

함수형 인터페이스 Runnable을 파라미터로 받으며, 반환 값을 받지 않고 그냥 다른 작업을 처리하고 값을 반환하지 않는  콜백이다.

CompletableFuture<Void> cf = CompletableFuture.supplyAsync(() -> {
    return "hello world!";
}).thenRun(() -> System.out.println("hello coco world!"));

cf.join(); // hello coco world!;

 

비동기 작업 콜백

- thenApplyAsync

앞선 계산의 결과를 콜백 함수로 전달된 Function을 별도의 스레드에서 비동기적으로 실행한다.

CompletableFuture<String> cf = CompletableFuture.supplyAsync(() -> {
    return "hello world!";
}).thenApplyAsync(s -> {
    return s.toUpperCase();
});
System.out.println(cf.join()); // HELLO WORLD!

 

- thenAcceptAsync

앞선 계산의 결과를 콜백 함수로 전달된 Consumer를 별도의 스레드에서 비동기적으로 실행한다.

CompletableFuture<Void> cf = CompletableFuture.supplyAsync(() -> {
    return "hello world!";
}).thenAcceptAsync(System.out::println);

cf.join(); // hello world!

 

- thenRunAsync

앞선 계산의 결과와 상관없이 주어진 작업을 별도의 스레드에서 비동기적으로 실행한다.

CompletableFuture<Void> cf = CompletableFuture.supplyAsync(() -> {
    return "hello world!";
}).thenRunAsync(() -> System.out.println("hello coco world!"));

cf.join(); // hello coco world!

※ xxxAsync가 붙은 메소드는 기본적으로 ForkJoinPool의 commonPool을 사용하며, 
두 번째 인수를 받는 오버로드 메소드에서 다른 Thread Executor를 선택적으로 사용할 수도 있다.

 

비동기 작업 콜백 메소드들은 주로 병렬로 수행되는 작업이나 I/O 작업과 같이 시간이 오래 걸리는 작업을 할 때 유용하게 활용된다.

 

다른 비동기 작업과 조합하기

- thenCompose

두 작업을 이어서 실행하도록 조합하며, 앞선 작업의 결과를 받아서 사용할 수 있다.

함수형 인터페이스 Function을 파라미터로 받는다.

CompletableFuture<String> hello = CompletableFuture.supplyAsync(() -> {
    return "Hello";
});

CompletableFuture<String> helloWorld = hello.thenCompose(this::world);

System.out.println(helloWorld.join()); // Hello World!



private CompletableFuture<String> world(String message) {
    return CompletableFuture.supplyAsync(() -> {
        return message + " " + "World!";
    });
}

 

- thenCombine

각 작업을 독립적으로 실행하고, 모두 완료되었을 때 결과를 받아서 사용할 수 있다.

함수형 인터페이스 Function을 파라미터로 받는다.

CompletableFuture<String> hello = CompletableFuture.supplyAsync(() -> {
    return "Hello";
});

CompletableFuture<String> world = CompletableFuture.supplyAsync(() -> {
    return "World!";
});

CompletableFuture<String> cf = hello.thenCombine(world, (h, w) -> h + " " + w);
System.out.println(cf.join()); // Hello World!

 

- allOf

여러 작업들을 동시에 실행하고, 모든 작업 결과에 콜백을 실행한다.

CompletableFuture<String> hello = CompletableFuture.supplyAsync(() -> {
    return "Hello";
});

CompletableFuture<String> world = CompletableFuture.supplyAsync(() -> {
    return "World!";
});

List<CompletableFuture<String>> futures = List.of(hello, world);
CompletableFuture<List<String>> result = CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()]))
        .thenApply(v -> futures.stream()
                .map(CompletableFuture::join)
                .collect(Collectors.toList()));

System.out.println(result.join()); // [Hello, World!]
result.join().forEach(System.out::println);
// Hello
// World!

 

- anyOf

여러 작업들 중에 가장 빨리 끝난 하나의 결과에 콜백을 실행한다.

CompletableFuture<String> hello = CompletableFuture.supplyAsync(() -> {
    try {
        Thread.sleep(100);
    } catch (InterruptedException e) {}

    return "Hello";
});

CompletableFuture<String> world = CompletableFuture.supplyAsync(() -> {
    return "World!";
});

CompletableFuture<Void> anyOfFuture = CompletableFuture.anyOf(hello, world)
        .thenAccept(System.out::println);

anyOfFuture.join(); // World!

 

에러 핸들링

Future에서 에러 핸들링 할 수 없던 문제를 CompletableFuture는 어떻게 해결했는지 알아보자.

 

- exceptionally

발생한 에러를 받아서 예외를 처리한다.

함수형 인터페이스 Function을 파라미터로 받는다.

CompletableFuture<String> cf = CompletableFuture.supplyAsync(() -> {
    int divisionByZero = 2 / 0;
    return "success";
}).exceptionally(e -> { // e is wrapped with CompletionException
    return e.toString();
});

System.out.println(cf.join()); // java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero

 

- handle

(결과값, 에러)를 반환받아 에러가 발생한 경우와 아닌 경우 모두를 처리할 수 있다.

함수형 인터페이스 BiFunction을 파라미터로 받는다.

CompletableFuture<String> cf = CompletableFuture.supplyAsync(() -> {
    int divisionByZero = 2 / 0;
    return "success";
}).handle((result, e) -> { // e is wrapped with CompletionException
    return e == null ? result : e.toString();
});;

System.out.println(cf.join()); // java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero

 

 

여기까지 비동기 처리를 위한 CompletableFuture 사용 방법에 대해 알아보았고, 다음 글에 이어서 ThreadPoolTaskExecutor 설정으로 보다 효과적으로 비동기 처리 하는 방법을 알아보도록 하자.

 

참고

https://stackoverflow.com/questions/45490316/completablefuturet-class-join-vs-get

https://mangkyu.tistory.com/263

https://devidea.tistory.com/34

반응형

블로그의 정보

슬기로운 개발생활

coco3o

활동하기