I'm experimenting with the idea of building a parallel collector for the Java Stream API (so that operations can run in parallel, not just on the common FJP).
A basic implementation is quite simple. However, once you try to implement a parallelism limitation, it becomes pretty tricky.
The simplest way to implement it would be to rely on a Semaphore instance, but in my approach the result is wrapped in a CompletableFuture instance.
Thus, the entire computation must be executed asynchronously, so that the returned future is not produced by a blocking call.
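For comparison, the Semaphore-only approach mentioned above could look roughly like the sketch below. BlockingThrottle and submitAll are hypothetical names used only for illustration and are not part of the collector in this post. The point is that the submitting thread itself waits for a permit, so the caller is blocked while the work is being handed out:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.Semaphore;
import java.util.function.Function;

// Hypothetical helper illustrating the simple, blocking way to cap parallelism.
final class BlockingThrottle {

    static <T, R> List<CompletableFuture<R>> submitAll(
            List<T> inputs, Function<T, R> operation, Executor executor, int parallelism) {
        Semaphore permits = new Semaphore(parallelism);
        List<CompletableFuture<R>> futures = new ArrayList<>();
        for (T input : inputs) {
            // The calling thread blocks here whenever `parallelism` tasks are already in flight.
            permits.acquireUninterruptibly();
            futures.add(CompletableFuture.supplyAsync(() -> {
                try {
                    return operation.apply(input);
                } finally {
                    // Free the slot once the operation has finished, successfully or not.
                    permits.release();
                }
            }, executor));
        }
        return futures;
    }
}
```

With this shape, submitAll only returns after the last task has obtained a permit, which is exactly the blocking behaviour the dispatcher-based design tries to avoid.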
To do this, I created a queue of work managed by a single dispatcher thread (guarded by a Semaphore instance).
To comply with the Collector API, I additionally had to create a queue of not-yet-completed CompletableFuture instances, so that they can be retrieved later and completed once the results are calculated.
I have tested it thoroughly and it seems to work, but I'm sure it must contain plenty of multithreading / concurrent-access bugs...
Please suggest improvements.
Here is the code:
AbstractParallelCollector, which serves as a base:
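import static java.util.Collections.synchronizedList;
import static java.util.concurrent.CompletableFuture.completedFuture;
import static java.util.concurrent.CompletableFuture.supplyAsync;

import java.util.*;
import java.util.concurrent.*;
import java.util.function.*;
import java.util.stream.Collector;

abstract class AbstractParallelCollector<T, R1, R2 extends Collection<R1>>
    implements Collector<T, List<CompletableFuture<R1>>, CompletableFuture<R2>> {

    final Executor executor;
    final Function<T, R1> operation;
    final Supplier<R2> collectionFactory;

    AbstractParallelCollector(
        Function<T, R1> operation,
        Supplier<R2> collection,
        Executor executor) {
        this.executor = executor;
        this.collectionFactory = collection;
        this.operation = operation;
    }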

    @Override
    public Supplier<List<CompletableFuture<R1>>> supplier() {
        return () -> synchronizedList(new ArrayList<>());
    }

    // Submits each element to the executor immediately and remembers its future.
    @Override
    public BiConsumer<List<CompletableFuture<R1>>, T> accumulator() {
        return (processing, e) -> processing.add(supplyAsync(() -> operation.apply(e), executor));
    }

    @Override
    public BinaryOperator<List<CompletableFuture<R1>>> combiner() {
        return (left, right) -> {
            left.addAll(right);
            return left;
        };
    }

    // Folds all element futures into a single future of the target collection.
    @Override
    public Function<List<CompletableFuture<R1>>, CompletableFuture<R2>> finisher() {
        return futures -> futures.stream()
            .reduce(completedFuture(collectionFactory.get()),
                accumulatingResults(),
                mergingPartialResults());
    }

    @Override
    public abstract Set<Characteristics> characteristics();

    private static <T1, R1 extends Collection<T1>> BinaryOperator<CompletableFuture<R1>> mergingPartialResults() {
        return (f1, f2) -> f1.thenCombine(f2, (left, right) -> {
            left.addAll(right);
            return left;
        });
    }

    private static <T1, R1 extends Collection<T1>> BiFunction<CompletableFuture<R1>, CompletableFuture<T1>, CompletableFuture<R1>> accumulatingResults() {
        return (list, object) -> list.thenCombine(object, (left, right) -> {
            left.add(right);
            return left;
        });
    }
}
A specific implementation limiting parallelism:
import static java.util.concurrent.Executors.newSingleThreadExecutor;

import java.util.*;
import java.util.concurrent.*;
import java.util.function.*;

class ThrottlingParallelCollector<T, R, C extends Collection<R>>
    extends AbstractParallelCollector<T, R, C>
    implements AutoCloseable {

    private final ExecutorService dispatcher = newSingleThreadExecutor(new CustomThreadFactory());
    private final BlockingQueue<Supplier<R>> taskQueue = new LinkedBlockingQueue<>();
    private final ConcurrentLinkedQueue<CompletableFuture<R>> pending = new ConcurrentLinkedQueue<>();
    private final Semaphore permits;

    ThrottlingParallelCollector(
        Function<T, R> operation,
        Supplier<C> collectionFactory,
        Executor executor,
        int parallelism) {
        super(operation, collectionFactory, executor);
        permits = new Semaphore(parallelism);
        dispatcher.execute(dispatcherThread());
    }

    // Registers an incomplete future and queues the work for the dispatcher.
    @Override
    public BiConsumer<List<CompletableFuture<R>>, T> accumulator() {
        return (acc, e) -> {
            CompletableFuture<R> future = new CompletableFuture<>();
            pending.add(future);
            acc.add(future);
            taskQueue.add(() -> {
                try {
                    return operation.apply(e);
                } finally {
                    permits.release();
                }
            });
        };
    }

    @Override
    public Set<Characteristics> characteristics() {
        return EnumSet.of(Characteristics.UNORDERED);
    }

    @Override
    public Function<List<CompletableFuture<R>>, CompletableFuture<C>> finisher() {
        return super.finisher()
            .andThen(f -> {
                try {
                    return f;
                } finally {
                    dispatcher.shutdown();
                }
            });
    }

    @Override
    public void close() {
        dispatcher.shutdown();
    }

    // Dispatcher loop: wait for a free permit, then hand the next queued task to the executor.
    private Runnable dispatcherThread() {
        return () -> {
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    permits.acquire();
                    runAsyncAndComplete(taskQueue.take());
                } catch (InterruptedException e) {
                    permits.release();
                    Thread.currentThread().interrupt();
                    break;
                } catch (Exception e) {
                    permits.release();
                    throw e;
                }
            }
        };
    }

    private void runAsyncAndComplete(Supplier<R> task) {
        supplyAsync(task, executor)
            .thenAccept(result -> Objects.requireNonNull(pending.poll()).complete(result));
    }

    private class CustomThreadFactory implements ThreadFactory {
        private final ThreadFactory defaultThreadFactory = Executors.defaultThreadFactory();

        @Override
        public Thread newThread(Runnable task) {
            Thread thread = defaultThreadFactory.newThread(task);
            thread.setName("parallel-executor-" + thread.getName());
            thread.setDaemon(true);
            return thread;
        }
    }
}
A static factory method:
public static <T, R> Collector<T, List<CompletableFuture<R>>, CompletableFuture<List<R>>> inParallelToList(Function<T, R> mapper, Executor executor, int parallelism) {
    requireNonNull(executor);
    requireNonNull(mapper);
    assertParallelismValid(parallelism);
    return new ThrottlingParallelCollector<>(mapper, ArrayList::new, executor, assertParallelismValid(parallelism));
}
And a super simple example:
CompletableFuture<List> result = Stream.of(1, 2, 3)
    .collect(inParallelToList(i -> foo(), executor, 2));
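One way to check that the parallelism limit is actually honoured is to wrap the mapper in a small probe that records how many invocations overlap. The sketch below is only an illustration; ConcurrencyProbe is a hypothetical helper name and is not part of the collector code above:

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

// Hypothetical test helper: wraps a function and records the peak number of concurrent calls.
final class ConcurrencyProbe<T, R> implements Function<T, R> {

    private final Function<T, R> delegate;
    private final AtomicInteger running = new AtomicInteger();
    private final AtomicInteger peak = new AtomicInteger();

    ConcurrencyProbe(Function<T, R> delegate) {
        this.delegate = delegate;
    }

    @Override
    public R apply(T input) {
        // Count this call as running and update the highest value seen so far.
        int now = running.incrementAndGet();
        peak.accumulateAndGet(now, Math::max);
        try {
            return delegate.apply(input);
        } finally {
            running.decrementAndGet();
        }
    }

    // Highest number of overlapping apply() calls observed so far.
    int peak() {
        return peak.get();
    }
}
```

Passing such a probe as the mapper to inParallelToList with a parallelism of 2, joining the resulting future, and then asserting that peak() is at most 2 would give a direct check of the throttling logic, provided the wrapped function runs long enough for calls to overlap.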