Run a list of futures:

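List<String> taskNames = ...
// Start every future first. Joining inside the same lazy stream would run the tasks one at a time.
List<CompletableFuture<Boolean>> futures = taskNames.stream()
    .map(task -> CompletableFuture.supplyAsync(() -> {
        System.out.println(task);
        return true;
    }))
    .collect(Collectors.toList());
// Then wait for all of them and gather the results.
List<Boolean> results = futures.stream()
    .map(CompletableFuture::join)
    .collect(Collectors.toList());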

Future

  • get: a blocking method that waits until the future has completed. It has been part of Future since Java 5, so it is not a Java 8 feature.
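
To make the blocking behaviour concrete, here is a minimal sketch that submits a task to a plain ExecutorService and waits on get. The class name, the single-thread pool, the 200 ms delay and the 5 second timeout are all illustrative assumptions, not something taken from the text above.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class FutureGetSketch {

    // Submits a slow task and blocks on get() until its result is available.
    static String blockingGet() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Future<String> future = pool.submit(() -> {
                Thread.sleep(200); // simulated work (assumed duration)
                return "done";
            });
            // get() blocks the calling thread; the timeout variant avoids waiting forever.
            return future.get(5, TimeUnit.SECONDS);
        } catch (InterruptedException | ExecutionException | TimeoutException e) {
            throw new IllegalStateException("task did not complete", e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(blockingGet());
    }
}
```

Because the caller is parked inside get, nothing else happens on that thread until the task finishes, which is the limitation the CompletableFuture callbacks below are meant to address.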

CompletableFuture

  • supplyAsync: runs a Supplier asynchronously and returns a CompletableFuture holding its result.
  • runAsync: runs a Runnable asynchronously and returns a CompletableFuture<Void>.
  • thenApply (map): transforms the result of the previous stage (for example supplyAsync) and returns the new value.
  • thenAccept: consumes the result of the previous stage without returning anything.
  • allOf: returns a CompletableFuture<Void> that completes once all the given futures have completed. It does not expose their results, so each future's result has to be read separately.

 

CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "Hello");
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "Beautiful");
CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> "World");
CompletableFuture<Void> combinedFuture = CompletableFuture.allOf(future1, future2, future3);
combinedFuture.get(); // blocks until all three futures have completed
assertTrue(future1.isDone());
assertTrue(future2.isDone());
assertTrue(future3.isDone());
// the result of each one can be processed as follows:
String combined = Stream.of(future1, future2, future3)
    .map(CompletableFuture::join)
    .collect(Collectors.joining(" "));
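
Since allOf yields a CompletableFuture<Void>, one common way to collect the results without blocking first is to chain thenApply onto it and call join inside the callback, where every input future is already complete. The sketch below assumes a helper named allResults, which is hypothetical and not part of the JDK, and it also shows thenApply and thenAccept on the combined result.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

class AllOfResultsSketch {

    // Hypothetical helper: turns several futures into one future holding all results, in order.
    static <T> CompletableFuture<List<T>> allResults(List<CompletableFuture<T>> futures) {
        return CompletableFuture
                .allOf(futures.toArray(new CompletableFuture[0]))
                // join() does not block here: allOf has completed, so every input is done.
                .thenApply(ignored -> futures.stream()
                        .map(CompletableFuture::join)
                        .collect(Collectors.toList()));
    }

    public static void main(String[] args) {
        List<CompletableFuture<String>> futures = Arrays.asList(
                CompletableFuture.supplyAsync(() -> "Hello"),
                CompletableFuture.supplyAsync(() -> "Beautiful"),
                CompletableFuture.supplyAsync(() -> "World"));

        allResults(futures)
                .thenApply(words -> String.join(" ", words)) // transform the result
                .thenAccept(System.out::println)             // consume it, returns nothing
                .join();                                     // wait so the JVM does not exit early
    }
}
```

The final join is only there to keep the example from exiting before the callbacks run; in a longer-lived application the chain could be left to complete on its own.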

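import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

public class Main {
    // Minimal result holder, assumed here because the original listing does not show JobResult;
    // its toString matches the "JobResult:..." lines in the output below.
    public static class JobResult {
        private final String name;
        public JobResult(String name) { this.name = name; }
        @Override
        public String toString() { return "JobResult:" + name; }
    }

    public class RunnableJob implements Runnable {
        private final String str;
        private final Consumer<JobResult> consumer;

        public RunnableJob(Consumer<JobResult> consumer, String str) {
            this.str = str;
            this.consumer = consumer;
        }

        @Override
        public void run() {
            int sec = (int) (Math.random() * 6 + 1); // 1 to 6 seconds
            System.out.println("Running " + str + " – takes " + sec + " seconds");
            try {
                Thread.sleep(sec * 1000L);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            consumer.accept(new JobResult(str));
        }
    }

    public void runJobs() {
        Consumer<JobResult> consumer = input -> System.out.println("Consuming " + input);
        List<CompletableFuture<Void>> futures = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            futures.add(CompletableFuture
                    .runAsync(new RunnableJob(consumer, "Job " + i))
                    // thenRun starts the second job only after the first one has finished
                    .thenRun(new RunnableJob(consumer, "job " + i + " finished")));
        }
        // Block until every chain has completed
        futures.forEach(CompletableFuture::join);
        System.out.println("REACHED END OF runJobs");
    }

    public static void main(String[] args) {
        new Main().runJobs();
    }
}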
/****
Output is:
—————————————
Running Job 0 – takes 1 seconds
Running Job 1 – takes 2 seconds
Running Job 3 – takes 2 seconds
Running Job 2 – takes 1 seconds
Running Job 4 – takes 6 seconds
Running Job 5 – takes 5 seconds
Running Job 6 – takes 5 seconds
Consuming JobResult:Job 0
Consuming JobResult:Job 2
Running job 2 finished – takes 2 seconds
Running job 0 finished – takes 4 seconds
Consuming JobResult:Job 3
Consuming JobResult:Job 1
Running job 1 finished – takes 5 seconds
Running job 3 finished – takes 4 seconds
Consuming JobResult:job 2 finished
Running Job 7 – takes 3 seconds
Consuming JobResult:Job 6
Consuming JobResult:job 0 finished
Consuming JobResult:Job 5
Running Job 8 – takes 6 seconds
Running job 6 finished – takes 3 seconds
Running job 5 finished – takes 6 seconds
Consuming JobResult:Job 4
Running job 4 finished – takes 5 seconds
Consuming JobResult:job 3 finished
Consuming JobResult:Job 7
Running job 7 finished – takes 1 seconds
Running Job 9 – takes 2 seconds
Consuming JobResult:job 1 finished
Consuming JobResult:job 7 finished
Consuming JobResult:job 6 finished
Consuming JobResult:Job 9
Running job 9 finished – takes 5 seconds
Consuming JobResult:job 4 finished
Consuming JobResult:job 5 finished
Consuming JobResult:Job 8
Running job 8 finished – takes 1 seconds
Consuming JobResult:job 8 finished
Consuming JobResult:job 9 finished
REACHED END OF runJobs
***/


  • thenRun: runs a Runnable after the previous stage (supplyAsync or runAsync) has completed. The Runnable does not receive that stage's result.

 

public class RunnableJob implements Runnable {
    @Override
    public void run() {
    }
}

public void runJobs() {
    // After runAsync: the second Runnable runs once the first has finished
    CompletableFuture<Void> f = CompletableFuture.runAsync(new RunnableJob());
    f.thenRun(new RunnableJob());
    // After supplyAsync: thenRun ignores the supplied value
    CompletableFuture<RunnableJob> f2 = CompletableFuture.supplyAsync(() -> new RunnableJob());
    f2.thenRun(new RunnableJob());
}


  • Async tasks: a Runnable (runAsync) or a Supplier (supplyAsync) can be run asynchronously, and both calls return a CompletableFuture.
  • Monadic design pattern: completable futures can be chained with thenCompose and thenApply, each of which receives the result of the previous completable future.
  • thenCompose (flatMap): receives a function that returns another CompletableFuture and flattens the result. The input of the lambda is the result of the previous future, just as with thenApply.
  • thenCombine: used when we want to combine the results of two independent futures into a new result.
  • thenAcceptBoth: like thenCombine, but it only consumes the two results and does not pass a value down the chain.
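
The three combinators above can be sketched as follows. The method greetingFor is a hypothetical stand-in for any call that itself returns a future, and the values are arbitrary, so treat this as an illustration of the shapes involved and not as a recommended design.

```java
import java.util.concurrent.CompletableFuture;

class ComposeCombineSketch {

    // Hypothetical async lookup; stands in for any call that itself returns a future.
    static CompletableFuture<String> greetingFor(String name) {
        return CompletableFuture.supplyAsync(() -> "Hello " + name);
    }

    // thenCompose (flatMap): the function returns a future, and the result is flattened.
    static CompletableFuture<String> composed() {
        return CompletableFuture.supplyAsync(() -> "World")
                .thenCompose(ComposeCombineSketch::greetingFor);
    }

    // thenCombine: waits for two independent futures and merges their results.
    static CompletableFuture<Integer> combined() {
        CompletableFuture<Integer> a = CompletableFuture.supplyAsync(() -> 2);
        CompletableFuture<Integer> b = CompletableFuture.supplyAsync(() -> 3);
        return a.thenCombine(b, (x, y) -> x * y);
    }

    public static void main(String[] args) {
        System.out.println(composed().join()); // Hello World
        System.out.println(combined().join()); // 6

        // thenAcceptBoth: consumes both results and yields CompletableFuture<Void>.
        composed().thenAcceptBoth(combined(), (text, number) ->
                System.out.println(text + " / " + number)).join();
    }
}
```

Had composed used thenApply with greetingFor instead of thenCompose, the result type would have been a nested CompletableFuture<CompletableFuture<String>>, which is the case flatMap-style composition avoids.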

Functional Interfaces in Java 8

Predicate

Represents a predicate (boolean-valued function) of one argument.

Predicate<Integer> isAnAdult = age -> age >= 18;

We can also use a predicate in a stream:

 

Predicate<Person> isAnAdult = person -> person.getAge() >= 18;
List<Person> people = getAllPeople();
// count() returns a long, not an Integer
long nrOfAdults = people.stream().filter(isAnAdult).count();


Consumer

Represents an operation that accepts a single input argument and returns no result.

Consumer<Ticket> ticketPrinter = ticket -> ticket.print();

The new forEach method in the Iterable interface takes the Consumer interface as an argument:

Collection<Ticket> tickets = getTicketsToPrint();
tickets.forEach(ticket -> ticket.print());

Supplier

A Supplier acts as a kind of factory: it takes no arguments and returns a result, which makes it a good fit for creating instances.

Supplier<TicketHandler> ticketHandlerCreator = () -> new TicketHandler();

Alternatively, use a constructor reference:

Supplier<TicketHandler> ticketHandlerCreator = TicketHandler::new;
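
As a self-contained illustration of the two forms, the sketch below defines a hypothetical TicketHandler (the text does not show the real class) and checks that each call to get builds a fresh instance.

```java
import java.util.function.Supplier;

class SupplierSketch {

    // Hypothetical stand-in for the TicketHandler type mentioned in the text.
    static class TicketHandler {
        final long createdAtNanos = System.nanoTime();
    }

    // A Supplier defers creation until get() is called.
    static final Supplier<TicketHandler> WITH_LAMBDA = () -> new TicketHandler();
    static final Supplier<TicketHandler> WITH_REFERENCE = TicketHandler::new;

    public static void main(String[] args) {
        TicketHandler first = WITH_LAMBDA.get();
        TicketHandler second = WITH_REFERENCE.get();
        // Each get() call invokes the constructor again, so the instances differ.
        System.out.println(first != second);
    }
}
```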

Function<T,R>

Represents a function that accepts one argument and produces a result. Let’s just go straight to an example.

 

Function<String, Predicate<Ticket>> ticketFor = event -> ticket -> event.equals(ticket.getName());
List<Ticket> tickets = getAllTickets();
// count() returns a long, not an Integer
long soldTicketsForCoolEvent = tickets.stream()
    .filter(ticketFor.apply("CoolEvent")).count();
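
For readers who want to run the idea end to end, here is a minimal sketch with a hypothetical Ticket class and sample data (neither is shown in the original). It uses the same pattern: a Function that takes an event name and returns a Predicate for filtering.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;
import java.util.function.Predicate;

class TicketFilterSketch {

    // Hypothetical minimal Ticket; the original does not show this class.
    static class Ticket {
        private final String name;
        Ticket(String name) { this.name = name; }
        String getName() { return name; }
    }

    // A Function that builds a Predicate for the given event name.
    static final Function<String, Predicate<Ticket>> TICKET_FOR =
            event -> ticket -> event.equals(ticket.getName());

    // Counts the tickets whose name matches the event.
    static long countFor(String event, List<Ticket> tickets) {
        return tickets.stream().filter(TICKET_FOR.apply(event)).count();
    }

    public static void main(String[] args) {
        List<Ticket> tickets = Arrays.asList(
                new Ticket("CoolEvent"), new Ticket("OtherEvent"), new Ticket("CoolEvent"));
        System.out.println(countFor("CoolEvent", tickets)); // 2
    }
}
```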