Logging in predicates 2 – Functional style programming – extending API

Exposing constainsAll/Any() via an extension of Stream

The previous solution can be considered more like a hack. A more logical and realistic solution will consist of extending the built-in Stream API and adding our containsAll/Any() methods as teammates next to the Stream operations. So, the implementation starts as follows:

@SuppressWarnings(“unchecked”)
public interface Streams<T> extends Stream<T> {      
  …
}

Before implementing the containsAll/Any() methods, we need to handle some aspects resulting from extending the Stream interface. First, we need to override in Streams each of the Stream methods. Since the Stream interface has a lot of methods, we list here only a few of them:

@Override
public Streams<T> filter(Predicate<? super T> predicate);
@Override
public <R> Streams<R> map(
  Function<? super T, ? extends R> mapper);

@Override
public T reduce(T identity, BinaryOperator<T> accumulator);

@Override
default boolean isParallel() {
  return false;
}

@Override
default Streams<T> parallel() {
  throw new UnsupportedOperationException(
    “Not supported yet.”); // or, return this
}
@Override
default Streams<T> unordered() {
  throw new UnsupportedOperationException(
    “Not supported yet.”); // or, return this
}

@Override
default Streams<T> sequential() {
  return this;
}

Since Streams can handle only sequential streams (parallelism is not supported), we can implement the isParallel(), parallel(), unordered(), and sequential() methods as default methods directly in Streams.Next, in order to use our Streams, we need a from(Stream s) method that is capable to wrap the given Stream as follows:

static <T> Streams<T> from(Stream<? extends T> stream) {
  if (stream == null) {
    return from(Stream.empty());
  }
  if (stream instanceof Streams) {
    return (Streams<T>) stream;
  }
  return new StreamsWrapper<>(stream);
}

The StreamsWrapper is a class that wraps the current Stream into sequential Streams. StreamsWrapper implements Streams, so it has to override all the Streams methods and properly wrap the Stream to Streams. Because Streams has quite a lot of methods (as a consequence of extending Stream), we list here only a few of them (the rest are available in the bundled code):

@SuppressWarnings(“unchecked”)
public class StreamsWrapper<T> implements Streams<T> {
  private final Stream<? extends T> delegator;
  public StreamsWrapper(Stream<? extends T> delegator) {
    this.delegator = delegator.sequential();
  }     
  @Override
  public Streams<T> filter(Predicate<? super T> predicate) {      
    return Streams.from(delegator.filter(predicate));
  }
  @Override
  public <R> Streams<R> map(
      Function<? super T, ? extends R> mapper) {
    return Streams.from(delegator.map(mapper));
  }
  …
  @Override
  public T reduce(T identity, BinaryOperator<T> accumulator) {
    return ((Stream<T>) delegator)
      .reduce(identity, accumulator);
  }
  …
}

Finally, we add in Streams the containsAll/Any() methods which are quite straightforward (since Streams extends Stream, we have access to all the Stream goodies without the need to write a stream() hack as in the previous solution). First we add the containsAll() methods:

default boolean contains(T item) {
  return anyMatch(isEqual(item));
}
default boolean containsAll(T… items) {
  return containsAll(Stream.of(items));
}
default boolean containsAll(List<? extends T> items) {
  return containsAll(items.stream());
}
default boolean containsAll(Stream<? extends T> items) {
  Set<? extends T> set = toSet(items);
  if (set.isEmpty()) {
    return true;
  }
  return filter(item -> set.remove(item))
    .anyMatch(any -> set.isEmpty());
}

Second, we add the containsAny() methods:

default boolean containsAny(T… items) {
  return containsAny(Stream.of(items));
}
default boolean containsAny(List<? extends T> items) {
  return containsAny(items.stream());
}
default boolean containsAny(Stream<? extends T> items) {
  Set<? extends T> set = toSet(items);
  if (set.isEmpty()) {
    return false;
  }
  return anyMatch(set::contains);
}

And, the toSet() method that you already know:

static <T> Set<T> toSet(Stream<? extends T> stream) {
  return stream.collect(Collectors.toSet());
}

Mission accomplished! Now, let’s write some examples:

boolean result = Streams.from(cars.stream())
  .filter(car -> car.getBrand().equals(“Mercedes”))
  .contains(car1);
boolean result = Streams.from(cars.stream())
  .containsAll(cars123);
boolean result = Streams.from(cars123.stream())
  .containsAny(cars.stream());

You can find more examples in the bunded code.