Write your own Java Stream Collectors

May 27, 2019

Introduction

The Java Stream API has brought us a lot of great ways to work with data pipelines: streaming, mapping, filtering, reducing and collecting are how we have written our algorithms since Java 8.

While the API is pleasant to use and ships with a large number of ready-made solutions for those operations, we sometimes need to go a step further, for example when the way we deal with our data differs a bit from what the built-in collectors have to offer.

The objective of this article is to provide an easy and understandable way of creating your own collector for your own specific needs.

Collectors.java

If you take a look at the JDK’s Collectors class, you may get the feeling that you don’t really understand, or want to understand, what’s happening behind the curtain. Using Collectors.toList() is enough most of the time, and you don’t want to waste time trying to decipher this kind of hieroglyphic-like Java code.

Collectors::groupingBy implementation

Well, trust me on this: you won’t have any difficulty writing your own Collector.

Collector.java

Looking at Collector.java, we immediately see 5 abstract methods and two static factory methods (the Collector.of overloads). Together they involve 4 functions and one set of Characteristics, which is everything needed to create our own Collector:

  • Supplier: a function that creates the empty mutable container in which elements are accumulated
  • Accumulator: a BiConsumer that takes the container created by the Supplier and one element of the stream, and folds the element into the container
  • Combiner: a BinaryOperator used in parallel processing to merge two containers created by the Supplier
  • Finisher: a Function that takes the container and maps it to the desired result of the collect operation
  • Characteristics: a set of characteristics that describes the collector (not the stream) and lets the Stream implementation optimize the collect operation
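To make the five pieces concrete, here is a minimal sketch that wires them together through the Collector.of factory method instead of a full class. The class name CollectorOfSketch and the helper toUnmodifiableListSketch are hypothetical names chosen for this illustration; they are not part of the JDK or of the collector built later in this article.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collector;
import java.util.stream.Stream;

class CollectorOfSketch {

    // Hypothetical helper: collects the elements into an unmodifiable List.
    static <T> Collector<T, List<T>, List<T>> toUnmodifiableListSketch() {
        return Collector.of(
                ArrayList::new,                // supplier: creates an empty container
                List::add,                     // accumulator: folds one element into the container
                (left, right) -> {             // combiner: merges two containers (parallel streams)
                    left.addAll(right);
                    return left;
                },
                Collections::unmodifiableList  // finisher: maps the container to the final result
                // no Characteristics are passed here
        );
    }

    public static void main(String[] args) {
        List<String> letters = Stream.of("a", "b", "c").collect(toUnmodifiableListSketch());
        System.out.println(letters); // prints [a, b, c]
    }
}
```

Collector.of is handy for small collectors like this one; implementing the interface yourself becomes more comfortable once the collector needs its own state, such as constructor arguments.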

While this explanation may seem abstract for now, it will become easier to understand with the following example.

Our Use Case

Let’s say I have a list of Person objects that I want to group by city and age.

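public class Person {
    String name;
    int age;
    String city;

    Person(String name, int age, String city) { this.name = name; this.age = age; this.city = city; }
    int getAge() { return age; }
    String getCity() { return city; }
    // toString() omitted for brevity; the output shown later prints name, age and city
}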

I could use Collectors.groupingBy twice, for example, but I would end up with a Map<String, Map<Integer, List<Person>>>, which is not what I want at all for my processing. For the sake of simplicity, I would ideally want a List<List<Person>>.
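As an illustration of the nested approach, here is a small sketch of two chained groupingBy calls and the map type they produce. The nested Person class is a minimal stand-in for the one above, and the class and method names (NestedGroupingSketch, groupByCityThenAge) are hypothetical.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

class NestedGroupingSketch {

    // Minimal stand-in for the article's Person class (constructor and getters are assumed)
    static class Person {
        final String name;
        final int age;
        final String city;

        Person(String name, int age, String city) {
            this.name = name;
            this.age = age;
            this.city = city;
        }

        int getAge() { return age; }
        String getCity() { return city; }
    }

    // Groups by city first, then by age inside each city.
    // Note: Collectors.groupingBy throws a NullPointerException when a key is null,
    // so every Person needs a non-null city here.
    static Map<String, Map<Integer, List<Person>>> groupByCityThenAge(List<Person> people) {
        return people.stream()
                .collect(Collectors.groupingBy(Person::getCity,
                        Collectors.groupingBy(Person::getAge)));
    }

    public static void main(String[] args) {
        List<Person> people = Arrays.asList(
                new Person("John Doe", 18, "Los Angeles"),
                new Person("Joan Doe", 18, "Los Angeles"),
                new Person("Mac Doe", 19, "New York"));

        Map<String, Map<Integer, List<Person>>> grouped = groupByCityThenAge(people);
        System.out.println(grouped.get("Los Angeles").get(18).size()); // prints 2
    }
}
```

Each additional grouping criterion adds one more level of Map nesting, which is what makes this shape awkward to consume.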

Look at this question on StackOverflow.

We could maybe use a List as the key of our Map and end up with a Map<List<Object>, List<Person>>, with the list playing the role of a composite key, but this is not ideal, as map keys should be immutable.
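The composite-key idea can be sketched as follows, assuming the key is built with Arrays.asList from the city and the age. The Person class is again a minimal stand-in, and CompositeKeySketch, compositeKey and groupByCityAndAge are hypothetical names used only for this illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

class CompositeKeySketch {

    // Minimal stand-in for the article's Person class (constructor and getters are assumed)
    static class Person {
        final String name;
        final int age;
        final String city;

        Person(String name, int age, String city) {
            this.name = name;
            this.age = age;
            this.city = city;
        }

        int getAge() { return age; }
        String getCity() { return city; }
    }

    // Builds the composite key [city, age]. Lists compare element by element,
    // so two Persons with the same city and age produce equal keys.
    // Arrays.asList returns a fixed-size list whose elements can still be replaced,
    // which is why such a key is not truly immutable.
    static List<Object> compositeKey(Person p) {
        return Arrays.asList(p.getCity(), p.getAge());
    }

    static Map<List<Object>, List<Person>> groupByCityAndAge(List<Person> people) {
        return people.stream()
                .collect(Collectors.groupingBy(CompositeKeySketch::compositeKey));
    }

    public static void main(String[] args) {
        List<Person> people = Arrays.asList(
                new Person("John Doe", 18, "Los Angeles"),
                new Person("Joan Doe", 18, "Los Angeles"),
                new Person("Goeffrey Deaux", 19, null)); // the key list itself is never null

        Map<List<Object>, List<Person>> grouped = groupByCityAndAge(people);
        System.out.println(grouped.size()); // prints 2
        System.out.println(grouped.get(Arrays.asList("Los Angeles", 18)).size()); // prints 2
    }
}
```

Adding a grouping criterion only means adding an element to the key, but callers still have to deal with a Map keyed by an untyped list.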

But what if I want to add another grouping function?

No easy way to accomplish this, huh?

Well…

Creating our Collector

Let’s begin by implementing the Collector interface:

public static class GroupingPersonBy implements Collector<Person, List<Person>, List<Set<Person>>>

You can see that I have specified three type arguments for Collector:

  • Person: simply the type of the elements streaming all the way to our collector
  • List<Person>: the type of the object in which the Person objects will be accumulated before being mapped to a …
  • List<Set<Person>>: the return type of the collect operation

Let’s now implement each function, one by one

Constructor

We need a list of functions to group our objects accordingly. In our case, as we want to group by age and city, we will instantiate our object in the following way: new GroupingPersonBy(Person::getAge, Person::getCity)

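private final Function<Person, Object>[] functions;

@SafeVarargs // the generic varargs array is only read in this class, never written to
public GroupingPersonBy(Function<Person, Object>... functions) {
    this.functions = Objects.requireNonNull(functions); // java.util.Objects
}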

Supplier

Supplying the collection used for accumulation

@Override
public Supplier<List<Person>> supplier() {
    return ArrayList::new;
}

Accumulator

Logic needed to accumulate the elements inside of the supplier’s result

@Override
public BiConsumer<List<Person>, Person> accumulator() {
    return List::add;
}

Combiner

The combiner is a BinaryOperator needed to merge two of our Lists when the stream is processed in parallel

@Override
public BinaryOperator<List<Person>> combiner() {
    return (l1, l2) -> {
        l1.addAll(l2);
        return l1;
    };
}

Characteristics

Characteristics applying to our collector. For our needs, we’ll only use UNORDERED. The other possible values are CONCURRENT and IDENTITY_FINISH.

@Override
public Set<Characteristics> characteristics() {
    return Set.of(Characteristics.UNORDERED); // Set.of requires Java 9 or later
}
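To see how characteristics show up on a collector, here is a small sketch that uses the three-function Collector.of overload, which is documented to add IDENTITY_FINISH on its own because it has no finisher. The names CharacteristicsSketch and unorderedList are hypothetical and only serve this illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.stream.Collector;
import java.util.stream.Collector.Characteristics;

class CharacteristicsSketch {

    // Hypothetical collector: accumulates into a List and declares itself UNORDERED.
    // There is no finisher, so the container itself is the result.
    static <T> Collector<T, List<T>, List<T>> unorderedList() {
        return Collector.of(
                ArrayList::new,
                List::add,
                (left, right) -> {
                    left.addAll(right);
                    return left;
                },
                Characteristics.UNORDERED);
    }

    public static void main(String[] args) {
        Set<Characteristics> cs = CharacteristicsSketch.<String>unorderedList().characteristics();

        System.out.println(cs.contains(Characteristics.UNORDERED));       // prints true
        System.out.println(cs.contains(Characteristics.IDENTITY_FINISH)); // prints true
        System.out.println(cs.contains(Characteristics.CONCURRENT));      // prints false
    }
}
```

IDENTITY_FINISH would be wrong for a collector whose finisher changes the type, such as one going from a List<Person> to a List<Set<Person>>, and CONCURRENT would require a thread-safe container, which ArrayList is not.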

Finisher

The finisher is the most interesting part of the collector. It represents its core, since all the logic needed to go from the accumulated List<Person> to the List<Set<Person>> lies there. In our case, we’ll combine all our Function<Person, Object> instances into one general BiPredicate and test all Person objects against each other.

@Override
public Function<List<Person>, List<Set<Person>>> finisher() {
    return list -> {
        // Start from a predicate that always matches, then AND one equality test per grouping function
        BiPredicate<Person, Person> predicate = (p1, p2) -> true;

        for (Function<Person, Object> function : functions) {
            // Objects.equals is null-safe, so a Person with a null city cannot cause a NullPointerException
            predicate = predicate.and((p1, p2) -> Objects.equals(function.apply(p1), function.apply(p2)));
        }

        List<Set<Person>> listOfSets = new ArrayList<>();
        boolean[] alreadyAdded = new boolean[list.size()];

        for (int i = 0; i < list.size(); i++) {
            Person personOne = list.get(i);
            Set<Person> personSet = new HashSet<>();
            if (!alreadyAdded[i]) {
                personSet.add(personOne);
                alreadyAdded[i] = true;
            }
            for (int i1 = i + 1; i1 < list.size(); i1++) {
                Person personTwo = list.get(i1);
                if (predicate.test(personOne, personTwo) && !alreadyAdded[i1]) {
                    personSet.add(personTwo);
                    alreadyAdded[i1] = true;
                }
            }
            if (!personSet.isEmpty())
                listOfSets.add(personSet);
        }

        return listOfSets;
    };
}
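The predicate-building step of the finisher can be looked at in isolation. The sketch below is a generic version of the same idea, assuming a null-safe comparison with Objects.equals; PredicateCompositionSketch and sameOnAll are hypothetical names, and plain Strings stand in for Person objects to keep the example short.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.function.BiPredicate;
import java.util.function.Function;

class PredicateCompositionSketch {

    // Hypothetical helper: the returned predicate is true when both values
    // agree on every key extracted by the given functions.
    static <T> BiPredicate<T, T> sameOnAll(List<Function<T, Object>> keys) {
        BiPredicate<T, T> result = (a, b) -> true; // neutral element for and()
        for (Function<T, Object> key : keys) {
            result = result.and((a, b) -> Objects.equals(key.apply(a), key.apply(b)));
        }
        return result;
    }

    public static void main(String[] args) {
        Function<String, Object> length = String::length;
        Function<String, Object> firstChar = s -> s.charAt(0);
        BiPredicate<String, String> same = sameOnAll(Arrays.asList(length, firstChar));

        System.out.println(same.test("apple", "amber")); // prints true: same length, same first letter
        System.out.println(same.test("apple", "grape")); // prints false: first letters differ
    }
}
```

Because BiPredicate.and short-circuits, later key functions are not evaluated once an earlier one has already found a difference.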

Testing the Collector

Now that the Collector is fully written, we can use it in the following way:

List<Person> people = Arrays.asList(
        new Person("John Doe", 18, "Los Angeles"),
        new Person("John Dough", 18, "New York"),
        new Person("Joan Doe", 18, "Los Angeles"),
        new Person("John Dough", 19, "San Francisco"),
        new Person("Johnny Doe", 19, "Los Angeles"),
        new Person("Mac Doe", 19, "New York"),
        new Person("Ringo Doe", 18, "New York"),
        new Person("Johnny Dough", 19, "Los Angeles"),
        new Person("Goeffrey Doe", 19, "New York"),
        new Person("John Deaux", 19, "Mons"),
        new Person("Goeffrey Deaux", 19, null)
);

List<Set<Person>> collect = people.stream()
        .parallel()
        .collect(new GroupingPersonBy(Person::getAge, Person::getCity));

collect.forEach(System.out::println);

And the printed result is the following:

[Person{name='John Doe', age=18, city='Los Angeles'}, Person{name='Joan Doe', age=18, city='Los Angeles'}]
[Person{name='Ringo Doe', age=18, city='New York'}, Person{name='John Dough', age=18, city='New York'}]
[Person{name='John Dough', age=19, city='San Francisco'}]
[Person{name='Johnny Doe', age=19, city='Los Angeles'}, Person{name='Johnny Dough', age=19, city='Los Angeles'}]
[Person{name='Goeffrey Doe', age=19, city='New York'}, Person{name='Mac Doe', age=19, city='New York'}]
[Person{name='John Deaux', age=19, city='Mons'}]
[Person{name='Goeffrey Deaux', age=19, city='null'}]

It works as expected and was not difficult at all to write.

You can now have fun writing your own Collectors for your own specific needs.