public final class AsyncDocumentation extends Object
Suppose you want to show a customer detail page with the cart items and the customer data. For doing this, you need to fetch the cart and the customer. Let's suppose fetching those two unrelated documents from the commercetools Composable Commerce APIs takes 100ms for each document.
final String customerId = "customer-id";//time t=0 final Customer customer = executeSerial(CustomerByIdGet.of(customerId));//t=100ms final Cart cart = executeSerial(CartByCustomerIdGet.of(customerId));//t=200ms println("cart: " + cart + " customer: " + customer);
See the test code.
So it takes around 200ms since the requests are done one after another. By fetching them in parallel 100ms of time can be saved.
final String customerId = "customer-id";//time t=0 final CompletionStage<Customer> customerStage = execute(CustomerByIdGet.of(customerId));//t=1ms //after creating the CompletionStage the Thread is freed to start further requests final CompletionStage<Cart> cartStage = execute(CartByCustomerIdGet.of(customerId));//t=2ms //collect the results customerStage.thenAcceptBoth(cartStage, (customer, cart) -> { //t=102ms println("cart: " + cart + " customer: " + customer); });
See the test code.
Using futures (We use it here as synonym for CompletableFuture
and CompletionStage
.) can be very handy for executing code in parallel.
You can use future APIs to run code in separate Threads so that the result will not be immediately available, but in the future. The overhead of creating a future can be lower than the overhead of creating new Thread.
final String customerId = "customer-id";//time t=0 final CompletionStage<Customer> customerStage = execute(CustomerByIdGet.of(customerId));//t=1ms //after creating the CompletionStage the Thread is freed to start further requests final CompletionStage<Cart> cartStage = execute(CartByCustomerIdGet.of(customerId));//t=2ms //collect the results final CompletionStage<String> resultStage = customerStage.thenCompose(customer -> cartStage.thenApply(cart -> { //t=102ms return "cart: " + cart.getCustomerId() + " customer: " + customer.getId();//do some computation }) ); final String result = SphereClientUtils.blockingWait(resultStage, 500, TimeUnit.MILLISECONDS); assertThat(result).isEqualTo("cart: " + customerId + " customer: " + customerId);
See the test code.
import io.sphere.sdk.client.SphereClient;
import io.sphere.sdk.products.Product;
import io.sphere.sdk.products.commands.ProductDeleteCommand;
import java.time.Duration;
import java.util.List;
import static io.sphere.sdk.client.SphereClientUtils.blockingWaitForEachCollector;
public class AsyncCollectorDemo { public static void demo(final SphereClient client, final List<Product> products) { final List<Product> deletedProducts = products.stream() .map(product -> client.execute(ProductDeleteCommand.of(product))) .collect(blockingWaitForEachCollector(Duration.ofSeconds(10))); } }
See the test code.
CompletionStage.thenApply(java.util.function.Function)
and CompletionStage.thenCompose(java.util.function.Function)
will only be called if the future finishes successfully.
CompletionStage.thenApply(java.util.function.Function)
.
final String customerId = "customer-id"; final CompletionStage<Customer> customerStage = execute(CustomerByIdGet.of(customerId)); final CompletionStage<String> pageStage = customerStage.thenApply(customerOption -> "customer page " + customerOption);
See the test code.
CompletionStage.thenApply(java.util.function.Function)
you apply a function to a stage
if this stage completes successfully. The function is a first class member, so you can store it in a value or even make it the return type of
a method.
final String customerId = "customer-id"; final CompletionStage<Customer> customerStage = execute(CustomerByIdGet.of(customerId)); //stored in a value final Function<Customer, String> f = customer -> "customer page " + customer; final CompletionStage<String> pageStage = customerStage.thenApply(f);
See the test code.
import io.sphere.sdk.customers.Customer;
import io.sphere.sdk.customers.queries.CustomerByIdGet;
import java.util.concurrent.CompletionStage;
import java.util.function.Function;
public class FunctionAsReturnValueDemo extends AsyncDocumentationTest { public static final Function<Customer, String> renderCustomerPage(final String title) { return customerOption -> title + " " + customerOption; } public static void showUsage() { final String customerId = "customer-id"; final CompletionStage<Customer> customerStage = execute(CustomerByIdGet.of(customerId)); final CompletionStage<String> pageStage = customerStage.thenApply(renderCustomerPage("customer page")); } }
See the test code.
Stream.map(java.util.function.Function)
.
final List<Person> persons = asList(new Person("John", "Smith"), new Person("Michael", "Müller")); final List<String> lastNames = persons.stream().map(Person::getLastName).distinct().collect(toList()); assertThat(lastNames).isEqualTo(asList("Smith", "Müller"));
See the test code.
final String cartIt = "cart-id"; final CompletionStage<Cart> cartStage = execute(CartByIdGet.of(cartIt)); final Function<Cart, CompletionStage<ProductProjection>> f = cart -> { final LineItem lineItem = cart.getLineItems().get(0); final String productId = lineItem.getProductId(); final CompletionStage<ProductProjection> product = execute(ProductProjectionByIdGet.of(productId, CURRENT)); return product; }; // CompletionStage of CompletionStage, urgs! final CompletionStage<CompletionStage<ProductProjection>> productStageStage = cartStage.thenApply(f);
See the test code.
CompletionStage
of CompletionStage
,
you can use CompletionStage.thenCompose(java.util.function.Function)
.
final String cartIt = "cart-id"; final CompletionStage<Cart> cartStage = execute(CartByIdGet.of(cartIt)); final Function<Cart, CompletionStage<ProductProjection>> f = cart -> { final LineItem lineItem = cart.getLineItems().get(0); final String productId = lineItem.getProductId(); final CompletionStage<ProductProjection> product = execute(ProductProjectionByIdGet.of(productId, CURRENT)); return product; }; //no nested CompletionStage, by using thenCompose instead of thenApply final CompletionStage<ProductProjection> productStageStage = cartStage.thenCompose(f);
See the test code.
Stream.flatMap(java.util.function.Function)
.
final List<Person> persons = asList(new Person("John", "Smith"), new Person("Michael", "Müller")); //map causes Stream of Stream final Stream<Stream<Integer>> streamStream = persons.stream() .map(person -> person.getLastName().chars().boxed()); //flatMap final Stream<Integer> simpleStream = persons.stream() .flatMap(person -> person.getLastName().chars().boxed()); assertThat(simpleStream.collect(toList())) .isEqualTo(asList(83, 109, 105, 116, 104, 77, 252, 108, 108, 101, 114));
See the test code.
final CompletionStage<String> stage = getFuture(); //when completed, access only value if present stage.thenAcceptAsync(value -> logger.info("Fetched successfully " + value)); //when completed, access value or error, one of them is null final CompletionStage<String> resultLikeOriginalStage = stage.whenCompleteAsync((nullableValue, nullableError) -> { if (nullableValue != null) { logger.info("Fetched successfully " + nullableValue); } else { logger.error("Did not fetch successfully.", nullableError); } });
See the test code.
CompletionStage.whenComplete(java.util.function.BiConsumer)
keeps the result as it is and performs side-effects,
so it is nice to log in between and then map the stage to a new one:
final CompletionStage<Response> stage = getResponse(); final CompletionStage<String> contentTypeStage = stage .whenComplete((nullableResponse, e) -> logger.debug("response: " + nullableResponse)) .thenApply(response -> response.getContentType());
See the test code.
final CompletableFuture<String> future = CompletableFuture.completedFuture("result");
See the test code.
final CompletableFuture<String> future = new CompletableFuture<>(); assertThat(future.isDone()).isFalse(); assertThat(future.isCompletedExceptionally()).isFalse(); final String resultOfAComputation = "result"; future.complete(resultOfAComputation); assertThat(future.isDone()).isTrue(); assertThat(future.isCompletedExceptionally()).isFalse();
See the test code.
final CompletableFuture<String> future = CompletableFutureUtils.successful("result");
See the test code.
final CompletableFuture<String> future = new CompletableFuture<>(); assertThat(future.isDone()).isFalse(); assertThat(future.isCompletedExceptionally()).isFalse(); final Throwable throwable = new Exception(); future.completeExceptionally(throwable); assertThat(future.isDone()).isTrue(); assertThat(future.isCompletedExceptionally()).isTrue();
See the test code.
Using an SDK shortcut:
final CompletableFuture<String> future = CompletableFutureUtils.failed(new Exception());
See the test code.
If you complete a future, it is possible that the same Thread is used for functional compositions or executing callbacks. If you don't want this, the calls need to use the methods which end with "Async".
CompletionStage
does not provide immediate or blocking access to its value or error,
but it is possible, but not encouraged to transform the CompletionStage
with CompletionStage.toCompletableFuture()
to a CompletableFuture
.
final CompletableFuture<String> future = CompletableFuture.completedFuture("hi"); final String actual = future.join(); assertThat(actual).isEqualTo("hi");
See the test code.
Future completes in time:
final CompletableFuture<String> future = CompletableFuture.completedFuture("hi"); final String actual = future.get(12, TimeUnit.MILLISECONDS); assertThat(actual).isEqualTo("hi");
See the test code.
future does not complete in time:
final CompletableFuture<String> futureThatTakesTooLong = new CompletableFuture<>(); Assert.assertThrows(TimeoutException.class, () -> futureThatTakesTooLong.get(12, TimeUnit.MILLISECONDS));
See the test code.
Future completed:
final CompletableFuture<String> future = CompletableFuture.completedFuture("success in time"); final String value = future.getNow("alternative"); assertThat(value).isEqualTo("success in time");
See the test code.
Future did not yet complete:
final CompletableFuture<String> incompleteFuture = new CompletableFuture<>(); final String value = incompleteFuture.getNow("alternative"); assertThat(value).isEqualTo("alternative");
See the test code.
Workaround if the value should be lazy computed:
final CompletableFuture<String> incompleteFuture = new CompletableFuture<>(); final String value = CompletableFutureUtils //SDK utils class .orElseGet(incompleteFuture, () -> "ALTERNATIVE".toLowerCase()); assertThat(value).isEqualTo("alternative");
See the test code.
CompletableFutureUtils.orElseGet(CompletionStage, Supplier)
.
Workaround if exception should be thrown:
final CompletableFuture<String> incompleteFuture = new CompletableFuture<>(); Assert.assertThrows(WhatEverException.class, () -> CompletableFutureUtils //SDK utils class .orElseThrow(incompleteFuture, WhatEverException::new));
See the test code.
final CompletableFuture<String> future = CompletableFuture.completedFuture("success in time"); final String value = CompletableFutureUtils //SDK utils class .orElseThrow(future, () -> new WhatEverException()); assertThat(value).isEqualTo("success in time");
See the test code.
CompletableFutureUtils.orElseThrow(CompletionStage, Supplier)
.
future.join() |
future.get() |
future.get(12, TimeUnit.MILLISECONDS) |
future.getNow("default") |
|
returns value if present |
x |
x |
x |
x |
blocks potentially forever |
x |
x |
||
uses alternative, if value not present |
x |
|||
throws TimeoutException |
x |
|||
throws CompletionException |
x |
x |
||
throws ExecutionException |
x |
x |
||
throws only unchecked Exceptions |
x |
|
|
x |
number of arguments | behavior | checked Exception | purpose | |
Function <T,R> |
1 | create value | transforms one value into another | |
BiFunction <T,U,R> |
2 | create value | transforms two values into another | |
Consumer <T> |
1 | side-effects | side effect for one value | |
BiConsumer <T,U> |
2 | side-effects | side effect for two values | |
Supplier <T> |
0 | create value | on-demand creation of a value | |
Callable <V> |
0 | create value | x | like Supplier but throws Exception |
Runnable |
0 | side-effects | task which causes side-effects |
Which Thread is used for functional composition and callbacks depends on the method.
For CompletionStage.thenApply(Function)
,
CompletionStage.thenAccept(Consumer)
,
CompletionStage.handle(BiFunction)
etc. exist three variants:
CompletionStage.thenApply(Function)
, no suffix, if the future is not yet completed, the the thread which calls
CompletableFuture.complete(Object)
is used to apply the function, if the future is completed, the thread which calls CompletionStage.thenApply(Function)
is used.
So this method is discouraged if you use actors or tend to block threads.CompletionStage.thenApplyAsync(Function)
with suffix "Async" calls the function inside a Thread of ForkJoinPool.commonPool()
. So you are better protected against deadlocks.CompletionStage.thenApplyAsync(Function, Executor)
with suffix "Async" and additional Executor
parameter calls the function inside a Thread pool you specify as second parameter.If an exception occurs with the computation, it should be propagated to the future with CompletableFuture.completeExceptionally(Throwable)
, but only once in the lifetime of the future.
As a result the following try catch block does not make sense, since the error is inside the future which is most likely computed in another Thread:
final Service service = new Service(); try { final CompletionStage<String> result = service.execute(); } catch (final WhatEverException e) { //catch block does not make sense }
See the test code.
final CompletableFuture<String> future = new CompletableFuture<>(); future.completeExceptionally(new WhatEverException()); final CompletableFuture<String> hardenedFuture = future.exceptionally(e -> "a default value"); assertThat(hardenedFuture.join()).isEqualTo("a default value");
See the test code.
final CompletableFuture<String> future = new CompletableFuture<>(); future.completeExceptionally(new WhatEverException()); final CompletableFuture<String> hardenedFuture = future.exceptionally(e -> "a default value"); assertThat(hardenedFuture.join()).isEqualTo("a default value");
See the test code.
CompletionStage.exceptionally(Function)
is like applying CompletionStage.thenApply(Function)
and then CompletionStage.exceptionally(Function)
:
final CompletableFuture<String> future = CompletableFuture.completedFuture("hi"); final CompletableFuture<String> viaHandle = future.handle((nullableValue, nullableError) -> nullableValue != null ? nullableValue.toUpperCase() : "DEFAULT" ); final CompletableFuture<String> viaThenApply = future .thenApply(value -> value.toUpperCase()).exceptionally(e -> "DEFAULT"); assertThat(viaHandle.join()).isEqualTo(viaThenApply.join()).isEqualTo("HI");
See the test code.
You can use the exceptions to give error specific text to the user:
final CompletionStage<String> stage = new Service().execute(); final CompletionStage<String> result = stage.exceptionally(e -> { if (e instanceof PriceChangedException) { return "price changed, is now " + ((PriceChangedException) e).getCurrentPrice(); } else if (e instanceof OutOfStockException) { return "out of stock"; } else { return "oops"; } });
See the test code.
But you do not need to cover all problems:
final CompletableFuture<String> stage = CompletableFutureUtils.failed(new WhatEverException()); final Function<Throwable, String> f = e -> { if (e instanceof PriceChangedException) { return "price changed, is now " + ((PriceChangedException) e).getCurrentPrice(); } else if (e instanceof OutOfStockException) { return "out of stock"; } else if (e instanceof RuntimeException) { throw (RuntimeException) e; } else { //wrap with CompletionException it checked exceptions //it works out with future.get and future.join throw new CompletionException(e); } }; final CompletableFuture<String> hardened = stage.exceptionally(f); trying(() -> hardened.join(), e -> assertExceptionAndCause(e, CompletionException.class, WhatEverException.class)); //now with a checked exception final CompletableFuture<String> withChecked = CompletableFutureUtils.failed(new EvilCheckedException()); trying(() -> withChecked.join(), e -> assertExceptionAndCause(e, CompletionException.class, EvilCheckedException.class)); trying(() -> withChecked.get(), e -> assertExceptionAndCause(e, ExecutionException.class, EvilCheckedException.class));
See the test code.
CompletionStage | CompletableFuture | |
type | interface | concrete class |
implements CompletionStage | x | x |
functional composition | x | x |
implements Future interface | x | |
can be filled with value or exception | x | |
can be cancelled | x | |
can check if completed | x | |
blocking usage directly possible | x | |
provides static methods for creation | x |
value | effect | Function param | Consumer param | Runnable param | maps value | maps error | or | and | Scala Future | Play F.Promise | |
thenApply | x | x | x | map | map | ||||||
thenCompose | x | x | x | flatMap | flatMap | ||||||
thenAccept | x | x | x | onSuccess | onSuccess | ||||||
thenRun | x | x | |||||||||
exceptionally | x | x | x | recover | recover | ||||||
handle | x | x | x | x | andThen | ||||||
whenComplete | x | x | x | x | x | ||||||
acceptEither | x | x | x | x | |||||||
thenAcceptBoth | x | x | x | x | zip | zip | |||||
applyToEither | x | x | x | x | fallbackTo | fallbackTo | |||||
thenCombine | x | x | x | x | |||||||
runAfterEither | x | x | x | ||||||||
runAfterBoth | x | x | x |