191. Extending Stream with removeAll, retainAll
Before reading this problem, I strongly recommend that you read Problem 190. In Problem 190, we extended the Stream API with two terminal operations named containsAll() and containsAny(), first via a custom interface and then via an interface that extends Stream. In both cases, the resulting interface was named Streams. In this problem, we follow the same logic to implement two intermediate operations named removeAll() and retainAll(), with the following signatures:
Streams<T> remove(T item);
Streams<T> removeAll(T... items);
Streams<T> removeAll(List<? extends T> items);
Streams<T> removeAll(Stream<? extends T> items);
Streams<T> retainAll(T... items);
Streams<T> retainAll(List<? extends T> items);
Streams<T> retainAll(Stream<? extends T> items);
Since removeAll() and retainAll() are intermediate operations, they have to return a Stream. More precisely, they have to return Streams, which is our own type: either a custom interface or an interface that extends Stream.
Exposing removeAll()/retainAll() via a custom interface
The removeAll(Stream<? extends T> items) method relies on a Set to accomplish its job as follows (you can challenge yourself to find an alternative implementation):
default Streams<T> removeAll(Stream<? extends T> items) {
  Set<? extends T> set = toSet(items);
  if (set.isEmpty()) {
    return this;
  }
  return from(stream().filter(item -> !set.contains(item)));
}
The retainAll(Stream<? extends T> items) method also relies on a Set:
default Streams<T> retainAll(Stream<? extends T> items) {
  Set<? extends T> set = toSet(items);
  if (set.isEmpty()) {
    return from(Stream.empty());
  }
  return from(stream().filter(item -> set.contains(item)));
}
The toSet() method is just a helper that collects the Stream items into a Set:
static <T> Set<T> toSet(Stream<? extends T> stream) {
  return stream.collect(Collectors.toSet());
}
Next, we can sneak these default methods into a custom interface named Streams exactly as we did in Problem 190:
@SuppressWarnings("unchecked")
public interface Streams<T> {
  Stream<T> stream();
  static <T> Streams<T> from(Stream<T> stream) {
    return () -> stream;
  }
  // removeAll()/retainAll() default methods and toSet()
}
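To see the pieces above working together, here is a minimal, self-contained sketch of the custom interface approach. It reuses the removeAll(), retainAll(), toSet(), and from() bodies from the listings; the wrapper class CustomStreamsDemo, the helper keepThenRemove(), and the sample strings are assumptions introduced only for this illustration, and Streams is nested inside the class just to keep everything in one file.

```java
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class CustomStreamsDemo {

    // Cut-down copy of the custom interface: only the Stream-based overloads
    interface Streams<T> {

        Stream<T> stream();

        static <T> Streams<T> from(Stream<T> stream) {
            return () -> stream;
        }

        default Streams<T> removeAll(Stream<? extends T> items) {
            Set<? extends T> set = toSet(items);
            if (set.isEmpty()) {
                return this; // nothing to remove
            }
            return from(stream().filter(item -> !set.contains(item)));
        }

        default Streams<T> retainAll(Stream<? extends T> items) {
            Set<? extends T> set = toSet(items);
            if (set.isEmpty()) {
                return from(Stream.empty()); // nothing to retain
            }
            return from(stream().filter(item -> set.contains(item)));
        }

        static <T> Set<T> toSet(Stream<? extends T> stream) {
            return stream.collect(Collectors.toSet());
        }
    }

    // Hypothetical helper: keeps a, b, c and then removes a and c
    static List<String> keepThenRemove() {
        return Streams.from(Stream.of("a", "b", "c", "d"))
                .retainAll(Stream.of("a", "b", "c"))
                .removeAll(Stream.of("a", "c"))
                .stream() // switch back from Streams to Stream
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(keepThenRemove()); // prints [b]
    }
}
```

Note that the final stream() call is still required before collect(), since Streams in this form exposes none of the Stream operations itself.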
There is a big problem with this implementation. It becomes obvious when we try to chain removeAll()/retainAll() in a stream pipeline next to other Stream operations. Because these two methods return Streams (not Stream), we cannot chain a Stream operation after them without first calling stream(), the method that switches from Streams back to Stream. Here is an example (cars, car1, car2, car3, and cars123 were introduced in Problem 190):
Streams.from(cars.stream())
  .retainAll(cars123)
  .removeAll(car1, car3)
  .stream()
  .forEach(System.out::println);
The problem becomes even worse if we have to alternate between Streams and Stream multiple times. Check out this zombie:
Streams.from(Streams.from(cars.stream().distinct())
    .retainAll(car1, car2, car3)
    .stream()
    .filter(car -> car.getFuel().equals("electric")))
  .removeAll(car2)
  .stream()
  .forEach(System.out::println);
This hack is not a happy choice for enriching the Stream API with intermediate operations. However, it works quite well for terminal operations. So, the proper approach is to extend the Stream interface.
Exposing removeAll()/retainAll() via an extension of Stream
We already know from Problem 190 how to extend the Stream interface. The implementation of removeAll() is also straightforward:
@SuppressWarnings("unchecked")
public interface Streams<T> extends Stream<T> {
  default Streams<T> remove(T item) {
    return removeAll(item);
  }
  default Streams<T> removeAll(T... items) {
    return removeAll(Stream.of(items));
  }
  default Streams<T> removeAll(List<? extends T> items) {
    return removeAll(items.stream());
  }
  default Streams<T> removeAll(Stream<? extends T> items) {
    Set<? extends T> set = toSet(items);
    if (set.isEmpty()) {
      return this;
    }
    return filter(item -> !set.contains(item))
      .onClose(items::close);
  }
  …
And, retainAll() follows in the same manner:
  default Streams<T> retainAll(T... items) {
    return retainAll(Stream.of(items));
  }
  default Streams<T> retainAll(List<? extends T> items) {
    return retainAll(items.stream());
  }
  default Streams<T> retainAll(Stream<? extends T> items) {
    Set<? extends T> set = toSet(items);
    if (set.isEmpty()) {
      return from(Stream.empty());
    }
    return filter(item -> set.contains(item))
      .onClose(items::close);
  }
  …
}
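The onClose(items::close) call in these methods ties the lifetime of the argument stream to the returned pipeline: when the pipeline is closed, items is closed as well. Here is a minimal sketch of that behavior that uses only the JDK's Stream rather than the Streams interface. The class name OnCloseDemo, the helper closingPipelineClosesItems(), and the sample strings are assumptions made purely for illustration.

```java
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class OnCloseDemo {

    // Returns true if closing the filtered pipeline also closed the argument stream
    static boolean closingPipelineClosesItems() {
        AtomicBoolean itemsClosed = new AtomicBoolean(false);

        // The argument stream, with a close handler so that close() is observable
        Stream<String> items = Stream.of("b", "d")
                .onClose(() -> itemsClosed.set(true));

        // Same steps as removeAll(): drain items into a Set, then filter
        Set<String> set = items.collect(Collectors.toSet());
        Stream<String> pipeline = Stream.of("a", "b", "c", "d")
                .filter(item -> !set.contains(item))
                .onClose(items::close);

        // try-with-resources calls pipeline.close(), which in turn runs items::close
        try (pipeline) {
            List<String> kept = pipeline.collect(Collectors.toList());
            System.out.println(kept); // prints [a, c]
        }
        return itemsClosed.get();
    }

    public static void main(String[] args) {
        System.out.println(closingPipelineClosesItems()); // prints true
    }
}
```

The sketch also shows that items is fully consumed at the moment the Set is built, which is before the pipeline itself is traversed; the close handler only releases whatever resources the argument stream may still hold.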
As you know from Problem 190, next we have to override all the Stream methods to return Streams. While this part is available in the bundled code, here is an example of using removeAll()/retainAll():
Streams.from(cars.stream())
  .distinct()
  .retainAll(car1, car2, car3)
  .filter(car -> car.getFuel().equals("electric"))
  .removeAll(car2)
  .forEach(System.out::println);
As you can see, this time the stream pipeline looks quite good. There is no need to perform switches between Streams and Stream via stream() calls. So, mission accomplished!