Async + Reactive = Fast and Productive

Paulo Lopes
Principal Software Engineer

@pml0pes
https://www.linkedin.com/in/pmlopes/
pmlopes

Summary

  • Reactive Programming
  • Reactive Systems
  • Functional Reactive Programming

Reactive Programming

In computing, reactive programming is an asynchronous programming paradigm concerned with data streams and the propagation of change. (Source: Wikipedia)

Asynchronous programming

... refers to the occurrence of events independently of the main program flow and ways to deal with such events. (Source: Wikipedia)
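
A minimal sketch of that definition in plain Java, assuming `CompletableFuture` as the asynchronous primitive. The class and method names are illustrative and do not come from the talk:

```java
import java.util.concurrent.CompletableFuture;

class AsyncEvent {
  // Starts the computation on another thread and returns immediately.
  static CompletableFuture<String> fetchGreeting() {
    return CompletableFuture.supplyAsync(() -> "hello");
  }

  public static void main(String[] args) {
    CompletableFuture<String> greeting = fetchGreeting()
      // The callback runs when the value arrives, independently of main().
      .thenApply(String::toUpperCase);

    System.out.println("main keeps running");
    // join() is only here so the demo waits before the JVM exits.
    System.out.println(greeting.join());
  }
}
```

The main flow never waits for the value until the final `join()`, which exists only to keep the demo alive long enough to print.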

Why Asynchronous programming?


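        // Blocking approach: one thread per connection (10 * 1024 threads)
        for (int i = 0; i < 10 * 1024; i++) {
          new Thread(() -> {
            try {
              URLConnection conn = new URL("http://localhost:8080/")
                .openConnection();

              try (InputStream in = conn.getInputStream()) {
                int ch;
                while ((ch = in.read()) != -1) {
                  System.out.print((char) ch);
                }
                System.out.println();
              }
            } catch (IOException e) {
              // new URL(), openConnection() and read() throw checked exceptions
              e.printStackTrace();
            }
          }).start();
        }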
        

How do we change?

STOP thinking about Iterators

START thinking about Streams

Iterators...


          List<MyPOJO> filtered = new ArrayList<>();
          int count = 0;
          for (Iterator<MyPOJO> i = pojos.iterator();
            i.hasNext() && count < 10; ) {
              MyPOJO data = i.next();
              if (data.getName().contains(nameQuery)) {
                filtered.add(data);
                count++;
              }
          }
        

Streams...


          pojos.stream()
            .filter(p -> p.getName().contains(nameQuery))
            .limit(10)
            .collect(Collectors.toList());
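
To make the comparison concrete, here is a self-contained sketch that runs both styles side by side. `MyPOJO` is a hypothetical stand-in for the type on the slides, and the class and method names are assumptions made for the example:

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.stream.Collectors;

class IteratorVsStream {
  // Hypothetical stand-in for the MyPOJO type used on the slides.
  static final class MyPOJO {
    private final String name;
    MyPOJO(String name) { this.name = name; }
    String getName() { return name; }
  }

  // Iterator style: the loop pulls each element and tracks the limit by hand.
  static List<MyPOJO> withIterator(List<MyPOJO> pojos, String nameQuery) {
    List<MyPOJO> filtered = new ArrayList<>();
    for (Iterator<MyPOJO> i = pojos.iterator();
         i.hasNext() && filtered.size() < 10; ) {
      MyPOJO data = i.next();
      if (data.getName().contains(nameQuery)) {
        filtered.add(data);
      }
    }
    return filtered;
  }

  // Stream style: the pipeline declares what to keep and how many.
  static List<MyPOJO> withStream(List<MyPOJO> pojos, String nameQuery) {
    return pojos.stream()
      .filter(p -> p.getName().contains(nameQuery))
      .limit(10)
      .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    List<MyPOJO> pojos = new ArrayList<>();
    for (int i = 0; i < 30; i++) {
      pojos.add(new MyPOJO(i % 2 == 0 ? "alice" + i : "bob" + i));
    }
    System.out.println(withIterator(pojos, "ali").size());
    System.out.println(withStream(pojos, "ali").size());
  }
}
```

Both methods return the same elements in the same order; only the stream version leaves the iteration itself to the library.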
        

Example #1

C10k Problem

The C10k problem is the problem of optimising network sockets to handle a large number of clients at the same time.

The term was coined in 1999 by Dan Kegel, citing the Simtel FTP host serving 10,000 clients at once over 1 gigabit per second Ethernet in that year. (Source: Wikipedia)

Server.java


          final Router app = Router.router(vertx);
          app.route("/eventbus/*").handler(SockJSHandler.create(vertx));
          vertx.createHttpServer()
            .requestHandler(app::accept)
            .listen(8080, res -> {
              // publish a new message every 10 sec
              vertx.setPeriodic(10000L, t ->
                vertx.eventBus().publish(
                "time", new JsonObject()
                  .put("unixtime", System.currentTimeMillis())));
          });
        

Client.java


          final WebSocket[] sockets = new WebSocket[10 * 1024];
          for (int i = 0; i < sockets.length; i++) {
            // copy the counter: lambdas may only capture effectively final variables
            final int id = i;
            vertx.createHttpClient()
              .websocket(8080, "localhost", "/eventbus/websocket", ws -> {
                sockets[id] = ws;
                ws.frameHandler(System.out::println);
                ws.exceptionHandler(Throwable::printStackTrace);
                ws.endHandler(v -> {
                  System.err.println("Connection ended: " + id);
                  sockets[id] = null;
                });
            });
          }
        

DEMO

Reactive

Reactive System

  • Elastic
  • Resilient
  • Responsive
  • Message Driven

Example #2

Consider an application with 2 endpoints:

  • / - the current process id
  • /work - the value of π, computed with the Gregory-Leibniz series [1]

[1] https://en.wikipedia.org/wiki/Leibniz_formula_for_%CF%80
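
The CPU-bound work behind /work can be sketched as follows. The class name `Pi` and the `calculate(int)` signature are assumptions inferred from how the controllers call it, not the talk's actual implementation:

```java
class Pi {
  // Gregory-Leibniz series: pi = 4 * (1 - 1/3 + 1/5 - 1/7 + ...)
  static double calculate(int terms) {
    double sum = 0.0;
    for (int k = 0; k < terms; k++) {
      double term = 1.0 / (2.0 * k + 1.0);
      // even-indexed terms are added, odd-indexed terms are subtracted
      sum += (k % 2 == 0) ? term : -term;
    }
    return 4.0 * sum;
  }

  public static void main(String[] args) {
    System.out.println(calculate(1_000_000));
  }
}
```

The series converges slowly (the error shrinks roughly as 1 / terms), which is what makes it a convenient way to keep a CPU core busy for a noticeable amount of time.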

Spring5 WebFlux Reactive


          @RestController
          public class PiController {
            private static final String PID =
                ManagementFactory.getRuntimeMXBean().getName();
            @GetMapping("/")
            public Mono<String> home() {
              return Mono.just("Current PID: ")
                  .map(w -> w + PID);
            }
            @GetMapping("/work")
            public Mono<String> work() {
              return Mono.just("Pi is: ")
                  .map(w -> w + Pi.calculate(100_000_000));
            }
          }
        

node.js


          var process = require('process');
          var express = require('express');
          var pi = require('./pi');
          var app = express();

          app.get('/', function (req, res) {
            res.send('Request served by ' + process.pid);
          });

          app.get('/work', function (req, res) {
            res.send('Pi = (' + process.pid + ') ' + pi(100000000));
          });
        

vert.x


            final Router app = Router.router(vertx);
            final PiService pi = ProxyHelper
              .createProxy(PiService.class, vertx, "pi");

            app.get("/").handler(ctx -> ctx.response().end("PID: " + PID));

            app.get("/work").handler(ctx ->
              pi.calculatePi(ar -> {
                if (ar.failed()) {
                  ctx.fail(ar.cause());
                } else {
                  ctx.response().end("Pi = " + ar.result());
                }
              }));
        

DEMO

Responsive

DEMO

Resilient

DEMO

Elastic

                  Spring5   node.js   vert.x
Responsive        ✔️*       ✔️*       ✔️
Message Driven                        ✔️
Resilient                             ✔️
Elastic                               ✔️

* to some extent!

Why Threads 🤮?

5ms / req time

...

          # In optimal circumstances
  
          1 Thread => 200 req/sec
          8 Cores => 1600 req/sec
        

req time grows as threads fight for execution time

...

        # PROCESS STATE CODES
        #   D    Uninterruptible sleep (usually IO)
  
        ps aux | awk '$8 ~ /D/  { print $0 }'
        root 9324 0.0 0.0 8316 436 ? D<   Oct15 0:00 /usr/bin/java...
        

when load is higher than max threads, queuing builds up

...

        # git@github.com:tsuna/contextswitch.git
  
        ./cpubench.sh
        model name : Intel(R) Core(TM) i7-8650U CPU @ 1.90GHz
        1 physical CPUs, 4 cores/CPU, 2 hardware threads/core
        2000000  thread context switches in 2231974869ns
                                            (1116.0ns/ctxsw)
        

          grep 'CONFIG_HZ=' /boot/config-$(uname -r)
          # CONFIG_HZ=1000
        

Practical example: Tomcat 9.0

  • Default maxThreads: 200
  • Avg req time: 5ms
  • Hypothetical High load: 1000 req
  • Wasted wait/queue time: 0 to (1000 / 200 - 1) * 5 = 20 ms, depending on position in the queue

https://tomcat.apache.org/tomcat-9.0-doc/config/executor.html
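
The arithmetic on this slide and on the earlier "5ms / req time" slide can be checked with a small sketch. The class and method names are hypothetical, and the model is the same idealised one the slides use (no context-switch cost, requests handled in full batches):

```java
class ThreadPoolMath {
  // Best-case throughput: each blocking thread serves one request at a time.
  static double requestsPerSecond(int threads, double reqTimeMs) {
    return threads * (1000.0 / reqTimeMs);
  }

  // Worst-case queue time when `pending` requests arrive at once and are
  // served in batches of `maxThreads`: the last batch waits for all others.
  static double maxQueueWaitMs(int pending, int maxThreads, double reqTimeMs) {
    int batches = (int) Math.ceil((double) pending / maxThreads);
    return (batches - 1) * reqTimeMs;
  }

  public static void main(String[] args) {
    System.out.println(requestsPerSecond(1, 5));       // one thread
    System.out.println(requestsPerSecond(8, 5));       // eight cores
    System.out.println(maxQueueWaitMs(1000, 200, 5));  // Tomcat defaults
  }
}
```

Requests in the first batch wait 0 ms and those in the last batch wait the longest, which is where the 0 to 20 ms range comes from.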

at max utilization

CPU is mostly waiting

Non-Blocking I/O

Vert.x

requestHandler(req => { ... })

authHandler(auth => { ... })

dbHandler(db => { ... })

renderHandler(res => req.json(res))
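
A rough sketch of this handler chain in plain Java, assuming a single-threaded executor plays the role of the event loop. This is not the Vert.x API; the names and the fake auth/db results are made up for illustration, and in a real application those steps would be non-blocking I/O calls:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class HandlerChain {
  // One daemon thread acts as the event loop; every callback runs on it.
  static final ExecutorService LOOP = Executors.newSingleThreadExecutor(r -> {
    Thread t = new Thread(r, "event-loop");
    t.setDaemon(true);
    return t;
  });

  // Each stage is queued on the loop when the previous one completes,
  // so no stage ever blocks the thread waiting for another.
  static CompletableFuture<String> handle(String request) {
    return CompletableFuture
      .supplyAsync(() -> request, LOOP)                           // requestHandler
      .thenApplyAsync(req -> req + " user=alice", LOOP)           // authHandler
      .thenApplyAsync(auth -> auth + " rows=3", LOOP)             // dbHandler
      .thenApplyAsync(db -> "{\"result\":\"" + db + "\"}", LOOP); // renderHandler
  }

  public static void main(String[] args) {
    System.out.println(handle("GET /").join());
  }
}
```

Because the stages are small callbacks rather than blocked threads, the single loop thread can interleave many requests and stay busy.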

1 CPU core fully used!

Vert.x

Intel(R) Core(TM) i7-8650U CPU @ 1.90GHz

100% CPU cores used!

Benchmarking is hard

  • Meaningful benchmarks are even harder
  • Techempower Framework Benchmarks
    • Contributors: 545
    • Pull Requests: 4138
    • Commits: 11230
    • Frameworks: 658

https://github.com/TechEmpower/FrameworkBenchmarks

Baseline: JAX-RS

  • Blocking API
  • Thread Based
  • Java

jax-rs


          
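          @GET
          @Path("/queries")
          public World[] queries(@QueryParam("queries") String queries) {
            // the query parameter arrives as text; parse it before use
            final int count = Integer.parseInt(queries);
            World[] worlds = new World[count];
            // assumes Hibernate is the JPA provider
            Session session = emf.createEntityManager().unwrap(Session.class);

            for (int i = 0; i < count; i++) {
              // each load blocks the request thread until the row arrives
              worlds[i] = session
                .byId(World.class)
                .load(randomWorld());
            }

            return worlds;
          }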
          

vert.x


          
          void queriesHandler(final RoutingContext ctx) {
            final int queries = getQueries(ctx);
            World[] worlds = new World[queries];
            AtomicInteger cnt = new AtomicInteger();

            for (int i = 0; i < queries; i++) {
              db.preparedQuery(FIND_WORLD, ..., res -> {
                final Row row = res.result()
                            .iterator()
                            .next();

                // getAndIncrement() yields indexes 0..queries-1
                worlds[cnt.getAndIncrement()] =
                  new World(row);

                // respond once every query has come back
                if (cnt.get() == queries) {
                  ctx.response()
                    .end(Json.encodeToBuffer(worlds));
                }
              });
            }
          }
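
The handler above fires several queries and replies once a shared counter shows that every callback has run. A self-contained sketch of that fan-in pattern, using `CompletableFuture` in place of the database client (`findAsync` and the `world-N` values are hypothetical):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

class FanIn {
  // Hypothetical async lookup: invokes the callback from another thread.
  static void findAsync(int id, Consumer<String> callback) {
    CompletableFuture.runAsync(() -> callback.accept("world-" + id));
  }

  static CompletableFuture<String[]> loadAll(int queries) {
    String[] worlds = new String[queries];
    AtomicInteger done = new AtomicInteger();
    CompletableFuture<String[]> result = new CompletableFuture<>();
    if (queries == 0) {
      result.complete(worlds); // nothing to wait for
    }
    for (int i = 0; i < queries; i++) {
      final int slot = i; // each callback writes only its own slot
      findAsync(i, world -> {
        worlds[slot] = world;
        // incrementAndGet() is atomic, so exactly one callback sees `queries`
        if (done.incrementAndGet() == queries) {
          result.complete(worlds);
        }
      });
    }
    return result;
  }

  public static void main(String[] args) {
    System.out.println(String.join(", ", loadAll(5).join()));
  }
}
```

Here the callbacks may run on different threads, so the check uses the value returned by `incrementAndGet()`; on a single event loop the callbacks never overlap and the ordering is less delicate.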
          

Simple results

  • Vert.x: 37,157 req/s
  • Jax-RS: 14,493 req/s

            Spring5   node.js   Micronaut   vert.x
req/s       129.556   175.195   209.863     582.077
RAM (GB)    7.27      4.92      6.36        4.10
Max CPU     71%       86%       79%         66%

Example #3

  • Vert.x
  • node.js
  • the browser

Vert.x


            final Router router = Router.router(vertx);
            BridgeOptions sockjsConfig = ...;
            router
              .route("/eventbus/*")
              .handler(SockJSHandler.create(vertx).bridge(sockjsConfig));

            router.route().handler(StaticHandler.create());
            vertx.createHttpServer()
              .requestHandler(router)
              .listen(8080);

            vertx.setPeriodic(500, t ->
              vertx.eventBus().send("greetings",
                new JsonObject().put("msg", "Greetings from Vert.x!")));
        

node.js


          var EventBus = require('vertx3-eventbus-client');

          var eb = new EventBus('http://localhost:8080/eventbus');

          eb.onopen = () => {
            setInterval(() => {
              eb.publish('greetings', {msg: 'Hello from Node.js!'});
            }, 500);

            eb.registerHandler('greetings', (err, msg) => {
              console.log(msg.body.msg);
            });
          };
        

react.js


          const eb = new EventBus(`//${window.location.host}/eventbus`);

          class App extends React.Component {
            componentWillMount() {
              eb.registerHandler('greetings', (err, msg) => {
                // build a new array instead of mutating this.state in place
                this.setState(prev => ({
                  messages: [msg.body.msg, ...prev.messages]
                }));
              });
            }
            static sayHello(e) {
              e.preventDefault();
              eb.publish('greetings', {msg: 'Hello from React.js!'})
            }
            ...
          }
        

Conclusion

Reactive Programming != Reactive Systems

How do I start?

The End

  • Thank you!