Async + Reactive = Fast and Productive
Paulo Lopes
Principal Software Engineer
@pml0pes
https://www.linkedin.com/in/pmlopes/
pmlopes
Summary
Reactive Programming
Reactive Systems
Functional Reactive Programming
In this talk I will be speaking about Reactive.
Reactive is quite popular these days and apparently everyone claims to be reactive.
We can classify **Reactive** in 3 main areas:
* ***>slide<*** Reactive programming
* ***>slide<*** Reactive Systems
* ***>slide<*** Functional Reactive Programming
I will stay away from the last one because it is a totally different topic and deserves a talk of its own.
I will talk about the first two, so we can finally **Reactive all the things!**
Reactive Programming
In computing, reactive programming is an asynchronous programming paradigm concerned with data streams and the propagation of change.
Wikipedia
What is Reactive Programming?
***>slide<*** Wikipedia says:
In computing, reactive programming is an asynchronous programming paradigm concerned with data streams and the propagation of change.
If this is too vague, I've highlighted the important parts!
Quick show of hands! Who here is familiar with:
* Node.js?
* Vert.x?
* Spring 5?
If your hand was up, sorry, but the next couple of slides will be a bit boring...
Asynchronous programming
... refers to the occurrence of events independently of the main program flow and ways to deal with such events.
Wikipedia
If we continue reading Wikipedia we get:
Asynchronous programming refers to the occurrence of events independently of the main program flow and ways to deal with such events.
And continues...
These may be "outside" events such as the arrival of signals, or actions instigated by a program that take place concurrently with program execution, without the program blocking to wait for results.
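This idea can be sketched in plain Java (the class and method names here are illustrative, not from the talk): a `CompletableFuture` task runs concurrently with the main flow, and instead of blocking for its result we attach a reaction to it.

```java
import java.util.concurrent.CompletableFuture;

public class AsyncSketch {
  static CompletableFuture<String> fetchGreeting() {
    // runs on a worker thread, independently of the caller's flow
    return CompletableFuture.supplyAsync(() -> "hello");
  }

  public static void main(String[] args) {
    // register a reaction instead of blocking for the result
    CompletableFuture<Integer> length =
        fetchGreeting().thenApply(String::length);
    // the main flow is free to do other work here...
    System.out.println(length.join()); // prints 5
  }
}
```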
Why Asynchronous programming?
But the question is:
* Why is this important?
* Why the sudden need to change how we program?
for (int i = 0; i < 10 * 1024; i++) {
  new Thread(() -> {
    try {
      URLConnection conn = new URL("http://localhost:8080/")
          .openConnection();
      try (InputStream in = conn.getInputStream()) {
        int ch;
        while ((ch = in.read()) != -1) {
          System.out.print((char) ch);
        }
        System.out.println();
      }
    } catch (IOException e) {
      // a Runnable cannot throw checked exceptions
      e.printStackTrace();
    }
  }).start();
}
Let's imagine a scenario where we want to test whether our application can handle 10,000 clients in parallel.
The application is a hello world (for the sake of completeness).
In a synchronous world, we would loop 10,000 times, creating a thread, and each thread would then connect to the server.
We would then synchronously wait for the response and print the data to the console as the stream offers it to us.
* It's 2020, we have big servers, this should work, right?
* 10,000 threads, what's the big deal, right?
* But this is so trivial, why not test it on a Raspberry Pi?
* Let's connect to the Pi
* Start top to see the state of the machine
* Let's open another shell
* And now run the application
* What just happened?
* It died: Out of Memory while allocating threads...
* As you can see, the synchronous world does not scale as they promised us.
* Sure, run it on a bigger server, add more RAM...
* But we can make the problem more complex
* Add a database here, an external system there...
* It does not scale!
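A back-of-the-envelope sketch of why the Pi runs out of memory (the 512 KB per-thread stack size is an assumption; the actual JVM default depends on the platform and the `-Xss` flag):

```java
public class ThreadMath {
  // each Java thread reserves its own stack up front
  static long stackBytes(int threads, long stackSizeKb) {
    return threads * stackSizeKb * 1024L;
  }

  public static void main(String[] args) {
    long needed = stackBytes(10 * 1024, 512);  // 10,240 threads, ~512 KB each
    long piRam  = 1024L * 1024 * 1024;         // Raspberry Pi 2: 1 GB of RAM
    System.out.println("stacks need ~" + needed / (1024 * 1024) + " MB");
    System.out.println("fits in Pi RAM? " + (needed < piRam)); // prints false
  }
}
```

Roughly 5 GB of stack space for a board with 1 GB of RAM: no amount of tuning saves the thread-per-connection model here.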
How do we change?
So how does a developer change the mindset from synchronous to asynchronous?
STOP thinking about Iterators
Simple:
We must stop thinking about **Iterators**
START thinking about Streams
And we must start thinking about **Streams**.
Iterators...
List<MyPOJO> filtered = new ArrayList<>();
int count = 0;
for (Iterator<MyPOJO> i = pojos.iterator();
     i.hasNext() && count < 10; ) {
  MyPOJO data = i.next();
  if (data.getName().contains(nameQuery)) {
    filtered.add(data);
    count++;
  }
}
Let's look at this code. Imagine that we want to select the first 10 elements whose name contains a given pattern.
In a synchronous world, one would think of creating a placeholder list to which matching items can be added.
Iterators...
List<MyPOJO> filtered = new ArrayList<>();
int count = 0;
for (Iterator<MyPOJO> i = pojos.iterator();
     i.hasNext() && count < 10; ) {
  MyPOJO data = i.next();
  if (data.getName().contains(nameQuery)) {
    filtered.add(data);
    count++;
  }
}
If we only want the first 10 matches, we need to set up a counter beforehand, and we have to keep maintaining that state during the iteration.
Iterators...
List<MyPOJO> filtered = new ArrayList<>();
int count = 0;
for (Iterator<MyPOJO> i = pojos.iterator();
     i.hasNext() && count < 10; ) {
  MyPOJO data = i.next();
  if (data.getName().contains(nameQuery)) {
    filtered.add(data);
    count++;
  }
}
Due to the synchronous programming model we must explicitly iterate the list.
Iterators...
List<MyPOJO> filtered = new ArrayList<>();
int count = 0;
for (Iterator<MyPOJO> i = pojos.iterator();
     i.hasNext() && count < 10; ) {
  MyPOJO data = i.next();
  if (data.getName().contains(nameQuery)) {
    filtered.add(data);
    count++;
  }
}
And finally if the item from the list matches our query it gets processed.
Streams...
pojos.stream()
  .filter(p -> p.getName().contains(nameQuery))
  .limit(10)
  .collect(Collectors.toList());
Now, let's look at streams.
With streams we have no control over the iteration of elements, but that is fine. We must think in terms of **reactions**.
Streams...
pojos.stream()
  .filter(p -> p.getName().contains(nameQuery))
  .limit(10)
  .collect(Collectors.toList());
We specify a filter function (as a reaction to the event of an element becoming available).
In this case, given `p` as a stream element, if the name of `p` contains our `nameQuery` then it is processed further.
Streams...
pojos.stream()
  .filter(p -> p.getName().contains(nameQuery))
  .limit(10)
  .collect(Collectors.toList());
We specify that we want to limit the results to 10 elements.
Streams...
pojos.stream()
  .filter(p -> p.getName().contains(nameQuery))
  .limit(10)
  .collect(Collectors.toList());
And finally we collect the results. And in this case we ask the API to collect into a `List`.
Once you get familiar with this mindset, you will see that reactive programming can be more concise and easier to read than traditional synchronous programming.
Example #1
C10k Problem
Let's look at a real-world example:
the C10k problem.
The C10k problem is the problem of optimising network sockets to handle a large number of clients at the same time.
The term was coined in 1999 by Dan Kegel, citing the Simtel FTP host serving 10,000 clients at once over 1 gigabit per second Ethernet in that year.
Wikipedia
So what is the C10k problem?
The challenge with this problem is that with a traditional thread-per-connection server you will need 10,000 threads to handle the load. Even once you manage to spawn that many threads, you will notice that your application is rendered unusable because the operating system is constantly switching thread contexts.
But remember, we're talking about reactive programming, so let's implement a reactive server that can do this:
Server.java
final Router app = Router.router(vertx);
app.route("/eventbus/*").handler(SockJSHandler.create(vertx));

vertx.createHttpServer()
  .requestHandler(app::accept)
  .listen(8080, res -> {
    // publish a new message every 10 sec
    vertx.setPeriodic(10000L, t ->
      vertx.eventBus().publish(
        "time", new JsonObject()
          .put("unixtime", System.currentTimeMillis())));
  });
I'm using Vert.x for this example. In order to keep the connections open all the time, I will be using websockets instead of plain HTTP requests.
Let me read the code:
1. Create a router to map HTTP requests to handlers (a handler is like a callback, for the Node people)
2. All requests to `/eventbus/*` are handled by SockJS (a helper library we have for websockets)
3. We create a HTTP server
4. Requests are handled by the router
5. We listen on port 8080
6. Every 10 seconds we publish a message to all websockets with the current time.
Client.java
final WebSocket[] sockets = new WebSocket[10 * 1024];
for (int i = 0; i < sockets.length; i++) {
  // lambdas can only capture (effectively) final locals
  final int idx = i;
  vertx.createHttpClient()
    .websocket(8080, "localhost", "/eventbus/websocket", ws -> {
      sockets[idx] = ws;
      ws.frameHandler(System.out::println);
      ws.exceptionHandler(Throwable::printStackTrace);
      ws.endHandler(v -> {
        System.err.println("Connection ended: " + idx);
        sockets[idx] = null;
      });
    });
}
The client code is also small: instead of spawning 10,000 processes (which in terms of performance would be pretty much the same as having 10,000 threads), we will manage all the websockets from a single process.
Let me read the code:
1. For the length of the sockets array (10,000)
2. Create a websocket to port 8080 at localhost using the resource `/eventbus/websocket`
3. On frame received print to the console
4. On error print the exception
5. On end log
DEMO
Reactive
To make things more exciting I will not run this on my laptop. Remember, the problem was defined in 1999; the CPU of the time was the first-generation Intel Xeon.
My laptop has an Intel i7, so it wouldn't be fair. Let me run this again on the Raspberry Pi 2. It's a 2! Not a 3!
So let's do this!
I'll connect to the Pi and start the server. Ignore the warnings for now; they are a Vert.x protection to let you know that your code is blocking the event loop, so your application will not be able to process other events until the event loop is unblocked.
Start top and let's count the number of sockets. So far so good.
Now start the client. Again, the event-loop block check warns that your code is doing something too heavy... ignore it for now...
OK, everything is running, the CPU remains low, that's good!
What about the number of sockets? More than 20,000. Well, that is correct: the client opens 10,000 connections, which require another 10,000 sockets on the server side, so we're in fact running the C20k problem :)
And again the CPU remains low.
And there! Job done!
Reactive programming is quite powerful, but **it is not** a silver bullet.
Reactive Systems
Elastic
Resilient
Responsive
Message Driven
Let's talk about reactive systems, because **we** need:
* ***>slide<*** Elasticity - because we need to handle varying load, we must write our applications so that they can be split up and distributed across several machines
* ***>slide<*** Resilience - because 1) most applications must provide high availability and 2) things will go wrong, we must not put all our eggs into one basket. Again, our application must be distributed.
Based on the assumption that we want Elasticity and Resilience we will get:
* ***>slide<*** Responsiveness: we want to have systems that respond in a timely manner, under varying load and in the case of failures
* ***>slide<*** Asynchronous Messaging: is required to achieve the properties mentioned before
By having asynchronous messages we also get:
* Decoupling: we want compartmentalization (required for Resilience), which forces us to have clear system boundaries and loose coupling
* Maintainability & Flexibility: is a result we get from clear boundaries and loose coupling
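The decoupling that asynchronous messaging buys us can be sketched with a toy in-memory event bus (an illustration only, not the Vert.x EventBus API; all names here are made up): components share only an address string, never a direct reference to each other.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// A toy in-memory event bus: publishers and subscribers are fully decoupled.
public class TinyBus {
  private final Map<String, List<Consumer<String>>> handlers = new HashMap<>();

  public void subscribe(String address, Consumer<String> handler) {
    handlers.computeIfAbsent(address, a -> new ArrayList<>()).add(handler);
  }

  public void publish(String address, String message) {
    handlers.getOrDefault(address, Collections.emptyList())
        .forEach(h -> h.accept(message));
  }

  public static void main(String[] args) {
    TinyBus bus = new TinyBus();
    // neither subscriber knows who sends the events, only the address
    bus.subscribe("order.created", msg -> System.out.println("billing saw: " + msg));
    bus.subscribe("order.created", msg -> System.out.println("shipping saw: " + msg));
    bus.publish("order.created", "order-42");
  }
}
```

Because the only coupling is the address, a subscriber can later move to another process or machine without the publisher changing at all, which is exactly the compartmentalization a reactive system needs.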
Example #2
Consider an application with 2 endpoints:
/ - the current process id
/work - value of π Gregory-Leibniz Series[1]
Why do we need a reactive system? Consider the following web application that calculates the value of Pi using the Gregory-Leibniz series.
It will expose two REST endpoints:
1. / will return the current process id
2. /work will return the value of Pi
Spring5 WebFlux Reactive
@RestController
public class PiController {

  private static final String PID =
      ManagementFactory.getRuntimeMXBean().getName();

  @GetMapping("/")
  public Mono<String> home() {
    return Mono.just("Current PID: ")
        .map(w -> w + PID);
  }

  @GetMapping("/work")
  public Mono<String> work() {
    return Mono.just("Pi is: ")
        .map(w -> w + Pi.calculate(100_000_000));
  }
}
In the latest and greatest Spring Reactive we would implement the controller like this.
Spring Reactive claims to be reactive (duh!) and uses the Mono/Flux API to process streams.
Since the application is trivial, we only use a Mono of a single value.
node.js
var process = require('process');
var express = require('express');
var pi = require('./pi');

var app = express();

app.get('/', function (req, res) {
  res.send('Request served by ' + process.pid);
});

app.get('/work', function (req, res) {
  res.send('Pi = (' + process.pid + ') ' + pi(100000000));
});
Node.js, which is also reactive, in conjunction with Express.js could be implemented like this.
The Node API relies on callbacks; callbacks are continuations, where control flow is resumed through the function given as the last argument of the call.
For simplicity I'm still using ES5 notation, but that is not a requirement; the same could be achieved with ES6.
vert.x
final Router app = Router.router(vertx);
final PiService pi = ProxyHelper
    .createProxy(PiService.class, vertx, "pi");

app.get("/").handler(ctx -> ctx.response().end("PID: " + PID));

app.get("/work").handler(ctx ->
  pi.calculatePi(ar -> {
    if (ar.failed()) {
      ctx.fail(ar.cause());
    } else {
      ctx.response().end("Pi = " + ar.result());
    }
  }));
Finally, the Vert.x application. In a way it is similar to Node, but it introduces the concept of services and proxies. A service proxy is a way to abstract the message-driven code and invoke a service regardless of its physical location in the cluster.
The moment of truth! How do these frameworks compare?
Start with Spring.
Test that the server is OK.
Now let's generate lots of load so the server gets saturated.
And let's try to get the value of Pi...
It timed out, so the responsiveness test failed under load.
Let's repeat for Node.
It timed out, so the responsiveness test failed under load.
Finally, let's start Vert.x.
It takes some time, but the result arrives. Vert.x achieves this because it can offload CPU-intensive tasks from the event loop, so they won't block the rest of the application the way they do in Spring or Node.
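The offloading idea can be sketched in plain Java (a simplified model of what Vert.x's `executeBlocking` does, not the actual Vert.x implementation; all names are illustrative): a single-threaded "event loop" executor hands heavy work to a worker pool and receives the result back as an event, so the loop stays free for other requests.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;

public class OffloadSketch {
  final ExecutorService eventLoop = Executors.newSingleThreadExecutor();
  final ExecutorService workers = Executors.newFixedThreadPool(4);

  // heavy work runs on a worker thread; the handler runs back on the event loop
  <T> void executeBlocking(Callable<T> heavy, Consumer<T> handler) {
    workers.submit(() -> {
      try {
        T result = heavy.call();                     // may take seconds; loop not blocked
        eventLoop.submit(() -> handler.accept(result));
      } catch (Exception e) {
        e.printStackTrace();
      }
    });
  }

  public static void main(String[] args) throws Exception {
    OffloadSketch loop = new OffloadSketch();
    CountDownLatch done = new CountDownLatch(1);
    loop.executeBlocking(
        () -> { Thread.sleep(200); return 42; },     // stand-in for the Pi calculation
        result -> { System.out.println("result: " + result); done.countDown(); });
    // the event loop is still free to process other events meanwhile
    done.await();
    loop.eventLoop.shutdown();
    loop.workers.shutdown();
  }
}
```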
Only Vert.x has out-of-the-box support for being resilient; the important flags are `ha` and `cluster`.
So let's start a JVM and join the cluster.
As you see there is nothing running; now start the application.
Let's kill the application PID.
Back to the shell. Notice the empty JVM: it noticed that the application died, so it respawned it there!
Let's test the API: the PID is different now!
Your application has survived, and your ops team can have a peaceful night of sleep!
Elastic means that we can scale horizontally, and again only Vert.x offers this out of the box. Let's see: start a server.
Test the API; the PID never changes, obviously.
Now start a second node and see that both nodes discover each other.
Test the API and note the PID: it changes in round-robin fashion even though you're always hitting the same HTTP server.
The load is now shared across the nodes automatically.
Lets look again at the reactive system model.
* A reactive system **is** elastic and resilient
* This **gives** us a responsive application
* To **build** such a system we need a message driven architecture
Which in turn also **gives** us:
* a maintainable and extensible application
|                | Spring5 | node.js | vert.x |
|----------------|---------|---------|--------|
| Responsive     | ✔️*     | ✔️*     | ✔️     |
| Message Driven |         |         | ✔️     |
| Resilient      |         |         | ✔️     |
| Elastic        |         |         | ✔️     |

\* to some extent!
Let's close this small analysis and look at the scoreboard.
All frameworks adopt a reactive programming model but not all are reactive systems.
5ms / req time
# In optimal circumstances
1 Thread => 200 req/sec
8 Cores => 1600 req/sec
Consider an API that has an SLA of 5 ms per request.
In optimal circumstances 1 thread will handle 200 req/s.
So my laptop, which has 8 cores, should be able to handle 1600 req/s.
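The arithmetic on the slide, as a sketch (idealized: zero overhead and perfect scaling across cores):

```java
public class SlaMath {
  // one thread handling one request at a time, each taking reqTimeMs
  static int reqPerSecPerThread(int reqTimeMs) {
    return 1000 / reqTimeMs;
  }

  public static void main(String[] args) {
    int perThread = reqPerSecPerThread(5);
    System.out.println("1 thread => " + perThread + " req/sec");      // 200
    System.out.println("8 cores  => " + 8 * perThread + " req/sec");  // 1600
  }
}
```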
req time grows as threads fight for execution time
# PROCESS STATE CODES
# D Uninterruptible sleep (usually IO)
ps aux | awk '$8 ~ /D/ { print $0 }'
root 9324 0.0 0.0 8316 436 ? D< Oct15 0:00 /usr/bin/java...
when load is higher than max threads, queuing builds up
# git clone git@github.com:tsuna/contextswitch.git
./cpubench.sh
model name : Intel(R) Core(TM) i7-8650U CPU @ 1.90GHz
1 physical CPUs, 4 cores/CPU, 2 hardware threads/core
2000000 thread context switches in 2231974869ns
(1116.0ns/ctxsw)
grep 'CONFIG_HZ=' /boot/config-$(uname -r)
# CONFIG_HZ=1000
Given the benchmark from the previous slide and my Linux kernel tick configuration, I waste
1.116 ms of every second just in context switches?
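Checking the slide's number (assuming, as a simplification, one context switch per kernel tick at `CONFIG_HZ=1000`):

```java
public class CtxSwitchMath {
  // ns spent switching per second = ticks per second * cost per switch
  static double wastedMsPerSecond(int hz, double nsPerSwitch) {
    return hz * nsPerSwitch / 1_000_000.0; // ns -> ms
  }

  public static void main(String[] args) {
    // 1000 ticks/s * 1116 ns per switch
    System.out.println(wastedMsPerSecond(1000, 1116.0) + " ms"); // 1.116 ms
  }
}
```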
at max utilization
CPU is mostly waiting
Vert.x
requestHandler(req => { ... })
authHandler(auth => { ... })
dbHandler(db => { ... })
renderHandler(res => req.json(res))
Vert.x
Intel(R) Core(TM) i7-8650U CPU @ 1.90GHz
Baseline: JAX-RS
Blocking API
Thread Based
Java
jax-rs
@GET
@Path("/queries")
public World[] queries(@QueryParam("queries") int queries) {
  World[] worlds = new World[queries];
  Session session = emf.createEntityManager()
      .unwrap(Session.class);
  for (int i = 0; i < queries; i++) {
    worlds[i] = session
        .byId(World.class)
        .load(randomWorld());
  }
  return worlds;
}
vert.x
void queriesHandler(final RoutingContext ctx) {
  final int queries = getQueries(ctx);
  World[] worlds = new World[queries];
  AtomicInteger cnt = new AtomicInteger();
  for (int i = 0; i < queries; i++) {
    db.preparedQuery(FIND_WORLD, ..., res -> {
      final Row row = res.result()
          .iterator()
          .next();
      worlds[cnt.getAndIncrement()] =
          new World(row);
      if (cnt.get() == queries) {
        ctx.response()
            .end(Json.encodeToBuffer(worlds));
      }
    });
  }
}
Simple results
Vert.x: 37,157 req/s
Jax-RS: 14,493 req/s
Did you know that with non-blocking frameworks (like Vert.x) you will not need as much hardware as with blocking frameworks? That means lower cloud bills and, just as important, less wasted resources, which is always good for the planet.
Let me elaborate on this. Here's a rotated view of the benchmark results from the run on 2019-12-31.
I've truncated the list to the top 128 results; it contains more than 400 frameworks, but then it would really be impossible to read...
Quiz time: who uses Node.js? Where do you think it will be ranked?
Not so fast after all...
That would be another talk on its own, I'm afraid!
So who uses Micronaut? You know it's native and fast, right?
A bit better than Node, but not impressive...
Spring? Anyone using Spring Reactive with the fastest Postgres driver, pg-client?
Ouch...
OK, what if you were using Vert.x?
Well this is what I'm trying to tell you, reactive and fully async programming are fast and productive!
|         | Spring5 | node.js | Micronaut | vert.x  |
|---------|---------|---------|-----------|---------|
| req/s   | 129,556 | 175,195 | 209,863   | 582,077 |
| RAM (G) | 7.27    | 4.92    | 6.36      | 4.10    |
| Max CPU | 71%     | 86%     | 79%       | 66%     |
Now that you've seen the benefits of choosing a reactive system...
Let me show you some secrets about Vert.x
Example #3
Vert.x
node.js
the browser
The Vert.x message-driven EventBus is so simple that it can be extended to anything.
I mean anything, for example:
* an external application in NodeJS
* and even the Browser!!!
Vert.x
final Router router = Router.router(vertx);
BridgeOptions sockjsConfig = ...;

router
  .route("/eventbus/*")
  .handler(SockJSHandler.create(vertx).bridge(sockjsConfig));

router.route().handler(StaticHandler.create());

vertx.createHttpServer()
  .requestHandler(router)
  .listen(8080);

vertx.setPeriodic(500, t ->
  vertx.eventBus().publish("greetings",
    new JsonObject().put("msg", "Greetings from Vert.x!")));
So the Vert.x code to create an HTTP server and publish a greeting message every 500 milliseconds would look like this.
node.js
var EventBus = require('vertx3-eventbus-client');
var eb = new EventBus('http://localhost:8080/eventbus');

eb.onopen = () => {
  setInterval(() => {
    eb.publish('greetings', {msg: 'Hello from Node.js!'});
  }, 500);

  eb.registerHandler('greetings', (err, msg) => {
    console.log(msg.body.msg);
  });
};
From Node we can do pretty much the same: connect to the Vert.x node and both listen for and publish greeting messages.
react.js
const eb = new EventBus(`//${window.location.host}/eventbus`);

class App extends React.Component {
  componentWillMount() {
    eb.registerHandler('greetings', (err, msg) => {
      this.state.messages.unshift(msg.body.msg);
      this.setState({ messages: this.state.messages });
    });
  }

  static sayHello(e) {
    e.preventDefault();
    eb.publish('greetings', {msg: 'Hello from React.js!'})
  }
  ...
}
Finally, from the browser, say in your React application, you can also listen for and publish events...
Reactive Programming != Reactive Systems
How do I start?
So where do you go from here?
There is a free book; feel free to get a copy, just scan the QR code!
And here are our project's GitHub organization and Twitter account!