# Types of event broadcasts
The Moleculer ServiceBroker has a built-in event bus for sending events to local and remote services.
Events can be used to create event-driven, runtime-scalable applications from services
deployed on different operating systems and implemented in different languages.
Events can be grouped. An event can be sent to:
- all Listeners (unconditional "broadcast")
- all Listeners in the specified groups ("broadcast" with the "groups" parameter)
- one member of every group (unconditional "emit")
- one member of each of the specified groups ("emit" with the "groups" parameter)
# Emit balanced events
The event Listeners are arranged into logical groups, and only one Listener in each group is triggered per event.
Example
An application contains 2 main services: "users" & "payments". Both subscribe to the "user.created" event. 3 instances of "users" service and 2 instances of "payments" service run. If the "user.created" event is emitted, only one "users" and one "payments" service will receive this event.
![Balanced events diagram](balanced-events.gif)
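
The "users" and "payments" scenario above can be simulated in plain Java. This is a minimal sketch, not the Moleculer implementation: the class name `BalancedEmitSketch`, the node IDs, and the round-robin selection are all assumptions made for illustration (the real broker may pick the receiving instance differently).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical simulation of balanced delivery; it does not use Moleculer.
class BalancedEmitSketch {

    // Group name -> node IDs of the listener instances in that group
    private final Map<String, List<String>> groups = new LinkedHashMap<>();

    // Group name -> index of the instance that receives the next event
    private final Map<String, Integer> cursors = new LinkedHashMap<>();

    void addGroup(String group, String... instances) {
        groups.put(group, Arrays.asList(instances));
        cursors.put(group, 0);
    }

    // Picks exactly one instance per group for a single emitted event.
    // Round-robin is an assumption of this sketch.
    List<String> pickReceivers() {
        List<String> receivers = new ArrayList<>();
        for (Map.Entry<String, List<String>> entry : groups.entrySet()) {
            List<String> instances = entry.getValue();
            int next = cursors.get(entry.getKey());
            receivers.add(entry.getKey() + "@" + instances.get(next));
            cursors.put(entry.getKey(), (next + 1) % instances.size());
        }
        return receivers;
    }

    public static void main(String[] args) {
        BalancedEmitSketch bus = new BalancedEmitSketch();
        bus.addGroup("users", "node-1", "node-2", "node-3");
        bus.addGroup("payments", "node-4", "node-5");

        // Each simulated "user.created" event reaches one instance per group
        System.out.println(bus.pickReceivers());
        System.out.println(bus.pickReceivers());
    }
}
```

However many instances run, every call to `pickReceivers()` returns one entry per group, which is the property the diagram illustrates.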
By default, the group name is the Service name, but it can be overridden in the event definition (with the "@Group" annotation).
Example
import services.moleculer.eventbus.*;
import services.moleculer.service.*;
@Name("payment")
public class PaymentService extends Service {
@Subscribe("order.created")
@Group("other")
Listener orderCreated = ctx -> {
logger.info("Payload:", ctx.params);
logger.info("Sender:", ctx.nodeID);
logger.info("Metadata:", ctx.params.getMeta());
logger.info("The called event name:", ctx.name);
// Example of parsing the "params" block:
String firstName = ctx.params.get("firstName", "defaultValue");
String lastName = ctx.params.get("lastName").asString();
};
}
Balanced events can be sent using the "broker.emit" method. The first parameter is the name of the event, the second is the payload. To send multiple or hierarchical values, wrap them in a Tree object:
// The payload is a "Tree" object (~=JSON structure)
// that will be serialized for transport:
// {
// "firstName": "John",
// "lastName": "Doe"
// }
Tree user = new Tree();
user.put("firstName", "John");
user.put("lastName", "Doe");
// Emit event
broker.emit("user.created", user);
Specify which groups/services shall receive the event:
// Only the "mail" & "payments" services receive this event
broker.emit("user.created", user, Groups.of("mail", "payments"));
# Simplified syntax for sending key-value pairs
If the data structure to be sent consists only of key-value pairs, it is not necessary to create a Tree object.
It is enough to list the key-value pairs after the event name
(the same syntax can be used for the "ctx.broadcast" and "broker.broadcast" methods):
ctx.emit("user.created",
"firstName", "John",
"lastName", "Doe",
Groups.of("mail", "payments"));
Creating a Tree object is required when passing more complex data structures between nodes, which contain sub-structures (~=hierarchical JSON objects and JSON arrays).
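
To make the limits of the simplified syntax concrete, the following sketch folds a list of alternating keys and values into a flat map. It is an illustration only: `KeyValuePairsSketch` and `toPayload` are hypothetical names, the sketch uses a `java.util.Map` instead of a Tree, and it ignores the trailing `Groups` argument that the real methods accept.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper; it is not part of Moleculer.
class KeyValuePairsSketch {

    // Folds "key1, value1, key2, value2, ..." into a flat, ordered map.
    static Map<String, Object> toPayload(Object... pairs) {
        if (pairs.length % 2 != 0) {
            throw new IllegalArgumentException(
                    "Expected key-value pairs, got " + pairs.length + " arguments");
        }
        Map<String, Object> payload = new LinkedHashMap<>();
        for (int i = 0; i < pairs.length; i += 2) {
            if (!(pairs[i] instanceof String)) {
                throw new IllegalArgumentException("Key at position " + i + " is not a String");
            }
            payload.put((String) pairs[i], pairs[i + 1]);
        }
        return payload;
    }

    public static void main(String[] args) {
        // A flat structure is easy to express as pairs
        System.out.println(toPayload("firstName", "John", "lastName", "Doe"));
    }
}
```

A flat list of pairs has no natural place for a nested object or an array, which is why hierarchical payloads need an explicitly built structure.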
# Streaming binary files
In addition to JSON structures, it is possible to send large files and/or media content (videos, audio).
// Opening stream
PacketStream stream = broker.createStream();
broker.emit("service.listener", stream);
// Sending file...
stream.transferFrom(new File("source.bin")).then(rsp -> {
// File submitted
}).catchError(err -> {
// Unexpected error occurred
});
When the data does not come from a file (for example, when transmitting a microphone signal), packets can be sent as soon as they are available.
// Opening stream
PacketStream stream = broker.createStream();
broker.emit("service.listener", stream);
// Sending packets
stream.sendData("packet1".getBytes());
stream.sendData("packet2".getBytes());
stream.sendData("packet3".getBytes());
// Always close streams!
stream.sendClose();
# Broadcast event
A broadcast event is sent to all available local and remote Services.
It is not balanced: every event Listener instance receives it.
![Broadcast events diagram](broadcast-events.gif)
Send broadcast events with the "broker.broadcast" method:
// The payload is a hierarchical (~=JSON) structure:
// {
// "key": "value",
// "anArray": [1, 2, 3],
// "anObject": {
// "port": 1234,
// "host": "server1"
// }
// }
Tree config = new Tree();
config.put("key", "value");
config.putList("anArray").add(1).add(2).add(3);
config.putMap("anObject").put("port", 1234).put("host", "server1");
// Send "config" to all listeners
broker.broadcast("config.changed", config);
Specify which groups/services shall receive the event:
// Send to all "mail" service instances
broker.broadcast("user.created", user, Groups.of("mail"));
// Send to all "user" & "purchase" service instances.
broker.broadcast("user.created", user, Groups.of("user", "purchase"));
// Same thing, but with simplified syntax
ctx.broadcast("user.created",
"firstName", "John",
"lastName", "Doe",
Groups.of("user", "purchase"));
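
The effect of the "groups" parameter on a broadcast can be sketched in plain Java. This is a simulation under stated assumptions, not Moleculer code: `BroadcastSketch`, its `broadcast` helper, and the node IDs are hypothetical, and an empty group list is taken to mean "no filter".

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical simulation of broadcast delivery; it does not use Moleculer.
class BroadcastSketch {

    // Returns every listener instance that receives the broadcast.
    // "listeners" maps a group name to the node IDs of its instances.
    static List<String> broadcast(Map<String, List<String>> listeners, String... onlyGroups) {
        Set<String> filter = new HashSet<>(Arrays.asList(onlyGroups));
        List<String> receivers = new ArrayList<>();
        for (Map.Entry<String, List<String>> entry : listeners.entrySet()) {
            // Skip groups that were not requested (no filter = all groups)
            if (!filter.isEmpty() && !filter.contains(entry.getKey())) {
                continue;
            }
            // Unlike a balanced emit, every instance in the group is included
            for (String nodeID : entry.getValue()) {
                receivers.add(entry.getKey() + "@" + nodeID);
            }
        }
        return receivers;
    }

    public static void main(String[] args) {
        Map<String, List<String>> listeners = new LinkedHashMap<>();
        listeners.put("user", Arrays.asList("node-1", "node-2", "node-3"));
        listeners.put("purchase", Arrays.asList("node-4", "node-5"));
        listeners.put("mail", Arrays.asList("node-6"));

        System.out.println(broadcast(listeners));                     // all six instances
        System.out.println(broadcast(listeners, "mail"));             // only the "mail" instance
        System.out.println(broadcast(listeners, "user", "purchase")); // five instances
    }
}
```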
# Local broadcast event
Send broadcast events only to local Services with the "broker.broadcastLocal" method:
broker.broadcastLocal("config.changed", config);
# Subscribe to events
A Listener receives the contents of an event wrapped in an object called the event Context.
The event Context is very similar to the Context of Actions.
Context-based event handler that emits a nested event:
@Name("accounts")
public class AccountService extends Service {
@Subscribe("user.created")
Listener userCreated = ctx -> {
logger.info("Payload:", ctx.params);
logger.info("Sender:", ctx.nodeID);
logger.info("Metadata:", ctx.params.getMeta());
logger.info("The called event name:", ctx.name);
// Emit a new, nested Event
Tree payload = new Tree();
payload.copyFrom(ctx.params, "user");
ctx.emit("accounts.created", payload);
};
}
## Wildcards
Wildcards ("?", "*", "**") can be used when making event subscriptions.
public class MyService extends Service {
// Subscribe to "user.created" event
@Subscribe("user.created")
Listener userCreated = ctx -> {
logger.info("User created event received:", ctx.params);
};
// Subscribe to all "user" events
@Subscribe("user.*")
Listener userEvents = ctx -> {
logger.info("User event received:", ctx.params);
};
// Subscribe to all internal events
// (internal event names start with "$" prefix)
@Subscribe("$**")
Listener internalEvents = ctx -> {
logger.info("Event " + ctx.name + " received:", ctx.params);
};
}
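
How such patterns might be matched against event names can be sketched with `java.util.regex`. The semantics here are assumptions of this sketch rather than a statement about Moleculer's matcher: "?" is taken to match any single character, "*" any run of characters inside one dot-separated segment, and "**" any run of characters including dots. The class name `EventNameMatcherSketch` is hypothetical.

```java
import java.util.regex.Pattern;

// Hypothetical wildcard matcher; the real matcher may behave differently.
class EventNameMatcherSketch {

    // Translates a subscription pattern into a regular expression.
    static Pattern compile(String subscription) {
        StringBuilder regex = new StringBuilder();
        for (int i = 0; i < subscription.length(); i++) {
            char c = subscription.charAt(i);
            if (c == '?') {
                regex.append('.');                      // any single character
            } else if (c == '*') {
                boolean doubled = i + 1 < subscription.length()
                        && subscription.charAt(i + 1) == '*';
                if (doubled) {
                    regex.append(".*");                 // "**": may cross dots
                    i++;                                // consume the second '*'
                } else {
                    regex.append("[^.]*");              // "*": stays inside one segment
                }
            } else {
                // Quote literals so that "$" and "." lose their regex meaning
                regex.append(Pattern.quote(String.valueOf(c)));
            }
        }
        return Pattern.compile(regex.toString());
    }

    static boolean matches(String subscription, String eventName) {
        return compile(subscription).matcher(eventName).matches();
    }

    public static void main(String[] args) {
        System.out.println(matches("user.*", "user.created"));
        System.out.println(matches("user.*", "user.profile.updated"));
        System.out.println(matches("$**", "$node.connected"));
    }
}
```

Under these assumptions, "user.*" accepts "user.created" but rejects "user.profile.updated", while "$**" accepts every name that starts with "$".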
## Private (local or hidden) Event Listeners
With the "private" modifier, the event Listener monitors only local events.
Such event Listeners are invisible from outside the node and cannot be called from other ServiceBrokers.
@Subscribe("$services.changed")
private Listener listener = ctx -> {
logger.info("The list of services has changed!");
};
## Redirecting streamed content
Stream data can be redirected to a File, ByteChannel or OutputStream.
ctx.stream.transferTo(new File("/temp.bin")).then(rsp -> {
// File received and saved successfully
}).catchError(err -> {
// Unexpected error occurred
});
## Processing streamed content
Stream data can also be processed per packet.
ctx.stream.onPacket((bytes, error, closed) -> {
if (bytes != null) {
// A byte array has been received
} else if (error != null) {
// Error occurred
}
if (closed) {
// Incoming stream closed (EOF)
}
});
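
A per-packet handler usually has to keep some state between calls. The sketch below collects incoming packets into a buffer until the stream closes. It is self-contained and does not use Moleculer: the nested `PacketHandler` interface is a hypothetical stand-in that only mirrors the `(bytes, error, closed)` shape of the callback above, and `PacketCollectorSketch` is an invented name.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

// Hypothetical packet collector; it only mimics the callback shape shown above.
class PacketCollectorSketch {

    // Stand-in for the (bytes, error, closed) callback; not a Moleculer type
    interface PacketHandler {
        void onPacket(byte[] bytes, Throwable error, boolean closed);
    }

    private final ByteArrayOutputStream buffer = new ByteArrayOutputStream();
    private Throwable failure;
    private boolean complete;

    // Handler that appends data packets and records errors and end-of-stream
    final PacketHandler handler = (bytes, error, closed) -> {
        if (bytes != null) {
            buffer.write(bytes, 0, bytes.length);
        } else if (error != null) {
            failure = error;
        }
        if (closed) {
            complete = true;
        }
    };

    boolean isComplete() {
        return complete;
    }

    Throwable getFailure() {
        return failure;
    }

    String asText() {
        return new String(buffer.toByteArray(), StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        PacketCollectorSketch collector = new PacketCollectorSketch();
        collector.handler.onPacket("packet1".getBytes(StandardCharsets.UTF_8), null, false);
        collector.handler.onPacket("packet2".getBytes(StandardCharsets.UTF_8), null, false);
        collector.handler.onPacket(null, null, true); // end of stream
        System.out.println(collector.asText() + " complete=" + collector.isComplete());
    }
}
```

Buffering the whole stream in memory is only reasonable for small payloads; for large files, redirecting the stream to a File is the safer choice.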
# Context
When you emit an event, the ServiceBroker creates a Context instance that contains all
request information and passes it to the event handler as a single argument.
Available properties & methods of Context
Name | Type | Description |
---|---|---|
ctx.id | String | Unique Context ID. |
ctx.name | String | Name of the event. |
ctx.params | Tree | Request params. Contains meta-data (ctx.params.getMeta()). |
ctx.level | int | Request level (in nested-calls). The first level is 1. |
ctx.nodeID | String | The caller or target Node ID. |
ctx.parentID | String | Parent context ID (in nested-calls). |
ctx.requestID | String | Request ID (does not change during the call chain). |
ctx.stream | PacketStream | Streamed content (large files or real-time media). |
ctx.opts | Options | Calling options. |
ctx.call() | Method | Make nested calls. Takes the same arguments as broker.call. |
ctx.emit() | Method | Emit an event, same as broker.emit |
ctx.broadcast() | Method | Broadcast an event, same as broker.broadcast |
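
The relationship between "id", "parentID", "requestID" and "level" in a chain of nested events can be illustrated with a small stand-alone class. This is a sketch derived only from the descriptions in the table: `ContextChainSketch` is a hypothetical name, the IDs are random UUIDs, and the choice that a first-level context uses its own ID as the request ID is an assumption.

```java
import java.util.UUID;

// Hypothetical model of the ID fields of a Context; it is not the Moleculer class.
class ContextChainSketch {

    final String id;
    final String parentID;
    final String requestID;
    final int level;

    private ContextChainSketch(String id, String parentID, String requestID, int level) {
        this.id = id;
        this.parentID = parentID;
        this.requestID = requestID;
        this.level = level;
    }

    // First-level context: no parent, level 1.
    // Reusing the context ID as request ID is an assumption of this sketch.
    static ContextChainSketch root() {
        String id = UUID.randomUUID().toString();
        return new ContextChainSketch(id, null, id, 1);
    }

    // Context of a nested call or event: new ID, same request ID, level + 1
    ContextChainSketch child() {
        return new ContextChainSketch(UUID.randomUUID().toString(), id, requestID, level + 1);
    }

    public static void main(String[] args) {
        ContextChainSketch first = root();
        ContextChainSketch nested = first.child();
        System.out.println("level=" + nested.level
                + " sameRequest=" + nested.requestID.equals(first.requestID)
                + " parentIsFirst=" + nested.parentID.equals(first.id));
    }
}
```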
# Internal events
The ServiceBroker broadcasts some internal events.
Internal event names always start with the "$" prefix.
# $services.changed
The ServiceBroker sends this event if the local node or a remote node loads or destroys services.
Payload
Name | Type | Description |
---|---|---|
localService | boolean | True if a local service changed. |
# $node.connected
The ServiceBroker sends this event when a node connects or reconnects.
Payload
Name | Type | Description |
---|---|---|
node | Tree | Node info object |
reconnected | boolean | True if the node has reconnected. |
# $node.updated
The ServiceBroker sends this event when it has received an INFO message from a node
(i.e. a Service is loaded or destroyed).
Payload
Name | Type | Description |
---|---|---|
node | Tree | Node info object |
# $node.disconnected
The ServiceBroker sends this event when a node disconnects (gracefully or unexpectedly).
Payload
Name | Type | Description |
---|---|---|
node | Tree | Node info object |
unexpected | boolean | "true" if no heartbeat was received from the node; "false" if a "DISCONNECT" message was received from it. |
# $broker.started
The ServiceBroker sends this event once "broker.start()" is called and all local Services are started.
# $broker.stopped
The ServiceBroker sends this event once "broker.stop()" is called and all local Services are stopped.
# $transporter.connected
The Transporter sends this event once the Transporter is connected.
# $transporter.disconnected
The Transporter sends this event once the Transporter is disconnected.