
Getting Into (A) State With Java 8 Streams

Hot on the heels of my earlier foray into Java 8's new shininess comes some more playing around.

The Oracle documentation has this to say about state and streams:

Note also that attempting to access mutable state from behavioral parameters presents you with a bad choice with respect to safety and performance; if you do not synchronize access to that state, you have a data race and therefore your code is broken, but if you do synchronize access to that state, you risk having contention undermine the parallelism you are seeking to benefit from. The best approach is to avoid stateful behavioral parameters to stream operations entirely; there is usually a way to restructure the stream pipeline to avoid statefulness.

All that accepted, it is nonetheless sometimes necessary for a stream operation to look around at the world. So the question naturally becomes: how?

Enter Exhibit A: a toy application that, given a signal stream, determines its phase (rising, level or falling).

package jdk8;

import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.DoubleStream;

public class Phaser {
    public static void main(String[] args) {
        final int RESOLUTION = 20;
        final int CYCLES = 2;

        final AtomicReference<Double> history = new AtomicReference<>(0.0D);

        // generate a few cycles of a nice sinusoidal signal
        DoubleStream.iterate(0.0D, n -> n + (2 * Math.PI) / RESOLUTION)
                .limit(CYCLES * RESOLUTION)
                .sequential()
                .map(Math::sin)
                .boxed()
                // longhand equivalent of the map() below (needs import java.util.function.Function):
//                .map(d ->
//                                ((Function<Double, SignalPair>) next ->
//                                        new SignalPair(history.getAndSet(next), next)
//                                ).apply(d)
//                )
                .map(d ->
                    new SignalPair(history.getAndSet(d), d)
                )
                .map(v -> String.format("%f,%f,%d", v.getPrevious(), v.getCurrent(), v.getPhase()))
                .forEachOrdered(System.out::println);
    }
}

class SignalPair {
    private final Double previous;
    private final Double current;

    public SignalPair(Double previous, Double current) {
        this.previous = previous;
        this.current = current;
    }

    public Double getCurrent() { return current; }

    public Double getPrevious() { return previous; }

    public int getPhase() {
        return (int) Math.signum(current - previous);
    }
}

Hipster-point earning features here include:

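  • DoubleStream.iterate() produces an infinite stream of angles, stepping by 2π/RESOLUTION; mapping each angle through Math::sin gives a sinusoidal double-valued signal with that resolution. .limit() truncates it to a finite number of samples (2 complete cycles in this case).
  • Processing a signal stream like this is inherently a sequential activity; hence .sequential(). (The stream returned by iterate() is already sequential, so the call simply makes the intent explicit.)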
  • It is easy to map a value x -> SIN(x) or execute println(s) using method references.
  • It’s also easy to map a SignalPair instance to a CSV-formatted String instance.
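To see the generate-then-truncate idea from the first bullet in isolation, here is a minimal sketch. On its own, iterate() describes an infinite stream, so a terminal operation such as toArray() would never finish; limit() is what makes it finite. The class and method names are hypothetical.

```java
import java.util.stream.DoubleStream;

// Hypothetical helper illustrating DoubleStream.iterate() + limit() + map()
public class SignalSamples {

    // Returns cycles * resolution samples of sin(x), stepping x by 2*PI/resolution.
    // iterate() alone is infinite; limit() truncates it to a fixed sample count.
    static double[] samples(int resolution, int cycles) {
        return DoubleStream.iterate(0.0D, x -> x + (2 * Math.PI) / resolution)
                .limit((long) cycles * resolution)
                .map(Math::sin)
                .toArray();
    }

    public static void main(String[] args) {
        double[] s = samples(20, 2);
        System.out.println(s.length);    // 40
        System.out.printf("%f%n", s[5]); // sin(PI/2), printed as 1.000000
    }
}
```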

The above are just small-denomination pieces of hipster currency, though. The real payoff can be seen in this drilldown:

        final AtomicReference<Double> history = new AtomicReference<>(0.0D);

        ...

                .map(d ->
                    new SignalPair(history.getAndSet(d), d)
                )

It’s worth stating again that determining a signal’s phase is inherently both sequential and differential in nature. To be correct, you MUST compare values n and n-1 in the order that they arrived, not in some parallel-friendly way.
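For comparison, here is a sketch of one way to "restructure the stream pipeline" as the Oracle text suggests. It assumes the samples can be collected into an array first, which gives up processing the signal as it arrives. Each element is paired with its predecessor by index, so no state is shared between lambda invocations. The names are hypothetical, and the sample before index 0 is taken to be 0.0 to mirror the initial history value in Exhibit A.

```java
import java.util.stream.IntStream;

// Hypothetical stateless variant of Exhibit A's phase calculation
public class IndexedPhase {

    // Phase of sample i relative to sample i-1: 1 rising, 0 level, -1 falling.
    // Each element only reads the (unmodified) array, so adding .parallel()
    // would not change the result; toArray() keeps the encounter order.
    static int[] phases(double[] signal) {
        return IntStream.range(0, signal.length)
                .map(i -> {
                    double previous = (i == 0) ? 0.0D : signal[i - 1];
                    return (int) Math.signum(signal[i] - previous);
                })
                .toArray();
    }

    public static void main(String[] args) {
        for (int p : phases(new double[]{0.0, 0.5, 1.0, 1.0, 0.5})) {
            System.out.println(p); // 0, 1, 1, 0, -1 (one per line)
        }
    }
}
```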

The AtomicReference makes it possible to maintain state (a single Double instance, in this case) from one stream invocation/iteration to the next. The extra level of indirection imposed by AtomicReference is actually needed: a lambda can only capture local variables that are effectively final, so the map() lambda cannot simply reassign a local Double, but it can update the value stored in an object to which it holds a reference. A Map (or, old-school style, even an element of an array) would probably also work.
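A minimal sketch of the old-school alternative, assuming a one-element double[] as the mutable holder. The local variable stays effectively final (it always refers to the same array), which is all the lambda capture rule requires. Like the AtomicReference version it is only correct when the stream runs sequentially, and unlike AtomicReference a plain array offers no atomicity or visibility guarantees at all. The names are hypothetical.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.DoubleStream;

// Hypothetical variant of Exhibit A using an array element as the "history" cell
public class ArrayHolderPhase {

    static List<Integer> phases(double... signal) {
        // effectively final reference to a mutable one-element cell
        final double[] history = {0.0D};

        return DoubleStream.of(signal)
                .sequential()
                .mapToObj(d -> {
                    double previous = history[0];
                    history[0] = d; // plays the role of history.getAndSet(d)
                    return (int) Math.signum(d - previous);
                })
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(phases(0.0, 0.3, 0.6, 0.3)); // [0, 1, 1, -1]
    }
}
```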

It’s worth comparing the ‘longhand’ mapping code in comments with the shorter form. I originally wrote the former before realising that the latter was possible. The equivalence isn’t made particularly clear in any documentation that I have read, so for reference I kept the “new and shiny” version alongside the “new and even shinier” one.
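The equivalence comes down to typing: on a Stream<Double>, map() expects a Function, so a lambda passed directly already is that Function. The longhand form builds a second, explicitly cast Function and immediately calls its apply(). A minimal sketch, with a hypothetical String result standing in for SignalPair:

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;

// Hypothetical demo: longhand (cast + apply) vs shorthand lambda in map()
public class LonghandVsShorthand {

    // Longhand: wrap the work in an explicitly typed Function and apply it at once
    static List<String> longhand(List<Double> values) {
        return values.stream()
                .map(d -> ((Function<Double, String>) next -> "v=" + next).apply(d))
                .collect(Collectors.toList());
    }

    // Shorthand: the lambda handed to map() is itself a Function<Double, String>
    static List<String> shorthand(List<Double> values) {
        return values.stream()
                .map(next -> "v=" + next)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Double> values = Arrays.asList(0.0, 0.5, 1.0);
        System.out.println(longhand(values).equals(shorthand(values))); // true
    }
}
```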

Does it work? Yes. Here is an edited version of the output:

0.000000,0.000000,0
0.000000,0.309017,1
0.309017,0.587785,1
0.587785,0.809017,1
0.809017,0.951057,1
0.951057,1.000000,1
1.000000,0.951057,-1
0.951057,0.809017,-1
0.809017,0.587785,-1
0.587785,0.309017,-1
...
-1.000000,-0.951057,1
...
0.951057,1.000000,1
1.000000,0.951057,-1
...
-0.951057,-1.000000,-1
-1.000000,-0.951057,1
...
-0.587785,-0.309017,1

On to Exhibit B. Another stateful algorithm: work out the area of a polygon, given the vertices of that polygon presented in sequential order.
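The pipeline in Exhibit B sums one trapezoid term per edge (a form of the shoelace formula), with vertex n being a copy of vertex 0 so that the polygon is closed. For the square used here the raw sum is +8; listing the same vertices in the opposite order gives -8, which is why the magnitude is taken:

```latex
A = \frac{1}{2}\left|\sum_{i=0}^{n-1} (x_{i+1} - x_i)(y_{i+1} + y_i)\right|, \qquad (x_n, y_n) = (x_0, y_0)

% worked example for the vertices (0,0), (0,2), (2,2), (2,0):
A = \frac{1}{2}\left|(0-0)(2+0) + (2-0)(2+2) + (2-2)(0+2) + (0-2)(0+0)\right| = \frac{1}{2}\cdot 8 = 4
```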

package jdk8;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

public class Area {
    public static void main(String[] args) {

        List<Vertex> polygon = new ArrayList<Vertex>() {{
            add(new Vertex(0.0D, 0.0D));
            add(new Vertex(0.0D, 2.0D));
            add(new Vertex(2.0D, 2.0D));
            add(new Vertex(2.0D, 0.0D));
        }};

        Vertex v0 = polygon.get(0);
        polygon.add(v0);  // ensure closed polygon

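        // index of the "next" vertex; it advances once per element, so this only
        // works because the stream is processed sequentially and in encounter order
        AtomicInteger ai = new AtomicInteger(0);
        // the sign of the trapezoid sum depends on the winding order, hence Math.abs()
        double area = Math.abs(polygon.stream().limit(polygon.size() - 1).sequential()
                .peek(System.out::println)
                .mapToDouble(p -> {
                    Vertex nextV = polygon.get(ai.incrementAndGet());

                    return (nextV.getX() - p.getX()) * (nextV.getY() + p.getY());
                })
                .sum()) / 2;

        System.out.printf("Area: %f%n", area);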
    }
}

class Vertex {
    private final double x;
    private final double y;

    Vertex(double x, double y) {
        this.x = x;
        this.y = y;
    }

    public double getX() { return x; }

    public double getY() { return y; }

    @Override
    public String toString() {
        return "Vertex{" +
                "x=" + x +
                ", y=" + y +
                '}';
    }
}

This is not so different, but has one new low-score hipster-point earning feature:

  • .peek() is a useful ‘debugging’ tool.
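A small sketch of why peek() suits debugging: it passes each element through unchanged, and, like other intermediate operations, it only runs once a terminal operation pulls elements through the pipeline. Here the "debug output" is captured in a list (a hypothetical stand-in for System.out::println) so that it can be inspected.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.IntStream;

// Hypothetical demo: peek() as a tap on a stream pipeline
public class PeekDemo {

    // Squares and sums the values, recording each element peek() sees in 'trace'
    static int sumOfSquares(List<Integer> trace, int... values) {
        int before = trace.size();
        IntStream pipeline = IntStream.of(values)
                .peek(i -> trace.add(i)) // tap: record the element, pass it on unchanged
                .map(i -> i * i);

        // intermediate operations are lazy: nothing has been recorded yet
        if (trace.size() != before) {
            throw new IllegalStateException("peek() ran before the terminal operation");
        }
        return pipeline.sum(); // the terminal operation drives the pipeline
    }

    public static void main(String[] args) {
        List<Integer> trace = new ArrayList<>();
        System.out.println(sumOfSquares(trace, 1, 2, 3, 4)); // 30
        System.out.println(trace);                           // [1, 2, 3, 4]
    }
}
```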

It is probably worth pointing out that the initialisation of the polygon ArrayList instance (sometimes called "double brace initialisation") is just standard Java. It's not something that one often sees, but it has been part of Java since anonymous inner classes and instance initialisers were introduced in version 1.1.
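What the two pairs of braces do: the outer pair is the body of an anonymous subclass of ArrayList, and the inner pair is an instance initialiser that runs as part of constructing that subclass. A small sketch showing that the result is a subclass of ArrayList rather than an ArrayList itself; the names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical demo of "double brace initialisation"
public class DoubleBrace {

    static List<String> names() {
        return new ArrayList<String>() {{ // outer braces: anonymous subclass body
            add("a");                     // inner braces: instance initialiser,
            add("b");                     // run when the instance is constructed
        }};
    }

    public static void main(String[] args) {
        List<String> list = names();
        System.out.println(list);                               // [a, b]
        System.out.println(list.getClass() == ArrayList.class); // false: anonymous subclass
        System.out.println(list instanceof ArrayList);          // true
    }
}
```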

I hear you asking “Does this do its stuff?” Oh why do you doubt me so? Take a look:

Vertex{x=0.0, y=0.0}
Vertex{x=0.0, y=2.0}
Vertex{x=2.0, y=2.0}
Vertex{x=2.0, y=0.0}
Area: 4.000000
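For comparison, a sketch of the same trapezoid sum with the AtomicInteger replaced by a stream of indices. Each term reads vertices i and (i + 1) % n from an unmodified list, so there is no shared counter, no need to append a copy of the first vertex, and no dependence on the order in which terms are computed. The nested Vertex class is a cut-down stand-in for the one above, and the names are hypothetical.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.IntStream;

// Hypothetical stateless variant of Exhibit B
public class IndexedArea {

    // Minimal stand-in for the article's Vertex class
    static final class Vertex {
        final double x;
        final double y;

        Vertex(double x, double y) {
            this.x = x;
            this.y = y;
        }
    }

    // Trapezoid form of the shoelace formula; (i + 1) % n closes the polygon
    static double area(List<Vertex> polygon) {
        int n = polygon.size();
        double sum = IntStream.range(0, n)
                .mapToDouble(i -> {
                    Vertex p = polygon.get(i);
                    Vertex next = polygon.get((i + 1) % n);
                    return (next.x - p.x) * (next.y + p.y);
                })
                .sum();
        // the sign of the sum depends on the winding order, so take its magnitude
        return Math.abs(sum) / 2;
    }

    public static void main(String[] args) {
        List<Vertex> square = Arrays.asList(
                new Vertex(0.0, 0.0), new Vertex(0.0, 2.0),
                new Vertex(2.0, 2.0), new Vertex(2.0, 0.0));
        System.out.printf("Area: %f%n", area(square)); // Area: 4.000000
    }
}
```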

I feel obliged to include the disclaimer that all the above is not really anything that a true purveyor of Functional Goodness(™) would condone or be proud of. But what the heck, eh?! At least I’m not saying “what’s the point of it all?”, now am I?

