# Types of event broadcasts

Molculer ServiceBroker has a built-in event bus for sending events to local and remote services. Events can be used to create event-driven, runtime scalable applications from services deployed on different operating systems and implemented in different languages.

Events can be grouped; events can be sent to

  • to all Listeners (unconditional "broadcast")
  • to a group of Listeners ("broadcast" with "groups" parameter)
  • to one member from all groups (unconditional "emit")
  • or to one member from the specified groups ("emit" with "groups" parameter)

# Emit balanced events

The event Listeners are arranged to logical groups. It means that only one listener is triggered in every group.

Example

An application contains 2 main services: "users" & "payments". Both subscribe to the "user.created" event. 3 instances of "users" service and 2 instances of "payments" service run. If the "user.created" event is emitted, only one "users" and one "payments" service will receive this event.

Balanced events diagram

The group name comes from the Service name, but it can be overwritten in event definition in services (by the "@Group" Annotation).

Example







 
 













import services.moleculer.eventbus.*;
import services.moleculer.service.*;

@Name("payment")
public class PaymentService extends Service {

    @Subscribe("order.created")
    @Group("other")
    Listener orderCreated = ctx -> {
        logger.info("Payload:", ctx.params);
        logger.info("Sender:", ctx.nodeID);
        logger.info("Metadata:", ctx.params.getMeta());
        logger.info("The called event name:", ctx.name);

        // Example of parsing the "params" block:
        String firstName = ctx.params.get("firstName", "defaultValue");
        String lastName  = ctx.params.get("lastName").asString();
    };

}

Balanced events can be sent using "broker.emit" function. The first parameter is the name of the event, the second parameter is the payload. To send multiple/hierarchical values, wrap them into a Tree object:












 

// The payload is a "Tree" object (~=JSON structure)
// that will be serialized for transport:
// {
//   "firstName": "John",
//   "lastName": "Doe"
// }
Tree user = new Tree();
user.put("firstName", "John");
user.put("lastName", "Doe");

// Emit event
broker.emit("user.created", user);

Specify which groups/services shall receive the event:

// Only the "mail" & "payments" services receive this event
broker.emit("user.created", user, Groups.of("mail", "payments"));

# Simplified syntax for sending key-value pairs

If the data structure to be sent consists only of key-value pairs, it is not necessary to create a Tree object. It is enough to list the key-value pairs after the event name (the same syntax can be used for the "ctx.broadcast" and "broker.broadcast" methods):


 
 


ctx.emit("user.created",
         "firstName", "John",
         "lastName", "Doe",
         Groups.of("mail", "payments"));

Creating a Tree object is required when passing more complex data structures between nodes, which contain sub-structures (~=hierarchical JSON objects and JSON arrays).

# Streaming binary files

In addition to JSON structures, it is possible to send large files and/or media content (videos, audio).

// Opening stream
PacketStream stream = broker.createStream();
broker.emit("service.listener", stream);

// Sending file...
stream.transferFrom(new File("source.bin")).then(rsp -> {
    // File submitted
}).catchError(err -> {
    // Unexpected error occurred
});

It is possible to send data immediately if we are not sending data from a file (for example, transmitting a microphone signal).

// Opening stream
PacketStream stream = broker.createStream();
broker.emit("service.listener", stream);

// Sending packets
stream.sendData("packet1".getBytes());
stream.sendData("packet2".getBytes());
stream.sendData("packet3".getBytes());

// Always close streams!
stream.sendClose();

# Broadcast event

The broadcast event is sent to all available local & remote Services. It is not balanced, all event Listener instances receive this event.

Broadcast events diagram

Send broadcast events with "broker.broadcast" method:
















 

// The payload is a hierarchical (~=JSON) structure:
// {
//   "key": "value",
//   "anArray": [1, 2, 3],
//   "anObject": {
//     "port": 1234,
//     "host": "server1"
//   }
// }
Tree config = new Tree();
config.put("key", "value");
config.putList("anArray").add(1).add(2).add(3);
config.putMap("anObject").put("port", 1234).put("host", "server1");

// Send "config" to all listeners
broker.broadcast("config.changed", config);

Specify which groups/services shall receive the event:

// Send to all "mail" service instances
broker.broadcast("user.created", user, Groups.of("mail"));

// Send to all "user" & "purchase" service instances.
broker.broadcast("user.created", user, Groups.of("user", "purchase"));

// Same thing, but with simplified syntax
ctx.broadcast("user.created",
              "firstName", "John",
              "lastName", "Doe",
              Groups.of("user", "purchase"));

# Local broadcast event

Send broadcast events only to all local Services with "broker.broadcastLocal" method:

broker.broadcastLocal("config.changed", config);

# Subscribe to events

The contents of the events are wrapped in an object called event Context to receive the message. The event Context is very similar to Context of Actions.

Context-based event handler & emit a nested event














 




@Name("accounts")
public class AccountService extends Service {

    @Subscribe("user.created")
    Listener userCreated = ctx -> {
        logger.info("Payload:", ctx.params);
        logger.info("Sender:", ctx.nodeID);
        logger.info("Metadata:", ctx.params.getMeta());
        logger.info("The called event name:", ctx.name);

        // Emit a new, nested Event
        Tree payload = new Tree();
        payload.copyFrom(ctx.params, "user");
        ctx.emit("accounts.created", payload);
    };

}

Wildcards

Wildcards (?, *, **) can be used when making event subscriptions.

public class MyService extends Service {

    // Subscribe to "user.created" event
    @Subscribe("user.created")
    Listener userCreated = ctx -> {
        logger.info("User created event received:", ctx.params);
    };

    // Subscribe to all "user" events
    @Subscribe("user.*")
    Listener userEvents = ctx -> {
        logger.info("User event event received:", ctx.params);
    };
        
    // Subscribe to all internal events
    // (internal event names start with "$" prefix)
    @Subscribe("$**")
    Listener internalEvents = ctx -> {
        logger.info("Event " + ctx.name + " received:", ctx.params);
    };
        
}

Private (local or hidden) Event Listeners

With the "private" modifier, only the local events are monitored by the event Listener. Such event Listeners are invisible from the outside of the node, and cannot be called from other ServiceBrokers.

@Subscribe("$services.changed")
private Listener listener = ctx -> {
    logger.info("The list of services has changed!");
};

Redirecting streamed content

Stream data can be redirected to a File, ByteChannel or OutputStream.

ctx.stream.transferTo(new File("/temp.bin")).then(rsp -> {
    // File received and saved successfully
}).catchError(err -> {
    // Unexpected error occurred
});

Processing streamed content

Stream data can also be processed per packet.

ctx.stream.onPacket((bytes, error, closed) -> {
    if (bytes != null) {
        // A byte-array has received
    } else if (error != null) {
        // Error occurred
    }
    if (closed) {
        // Incoming stream closed (EOF)
    }
});

# Context

When you emit an event, the ServiceBroker creates a Context instance which contains all request information and passes it to the event handler as a single argument.

Available properties & methods of Context

Name Type Description
ctx.id String Unique Context ID.
ctx.name String Name of the event.
ctx.params Tree Request params. Contains meta-data (ctx.params.getMeta()).
ctx.level int Request level (in nested-calls). The first level is 1.
ctx.nodeID String The caller or target Node ID.
ctx.parentID String Parent context ID (in nested-calls).
ctx.requestID String Request ID (does not change during the call chain).
ctx.stream PacketStream Streamed content (large files or real-time media).
ctx.opts Options Calling options.
ctx.call() Method Make nested-calls. Same arguments like in broker.call
ctx.emit() Method Emit an event, same as broker.emit
ctx.broadcast() Method Broadcast an event, same as broker.broadcast

# Internal events

The ServiceBroker broadcasts some internal events. Internal events always starts with "$" prefix.

# $services.changed

The ServiceBroker sends this event if the local node or a remote node loads or destroys services.

Payload

Name Type Description
localService boolean True if a local service changed.

# $node.connected

The ServiceBroker sends this event when a node connected or reconnected.

Payload

Name Type Description
node Tree Node info object
reconnected boolean Is reconnected?

# $node.updated

The ServiceBroker sends this event when it has received an INFO message from a node (ie. a Service is loaded or destroyed).

Payload

Name Type Description
node Tree Node info object

# $node.disconnected

The ServiceBroker sends this event when a node disconnected (gracefully or unexpectedly).

Payload

Name Type Description
node Tree Node info object
unexpected boolean "true" - Not received heartbeat, "false" - Received "DISCONNECT" message from node.

# $broker.started

The ServiceBroker sends this event once "broker.start()" is called and all local Services are started.

# $broker.stopped

The ServiceBroker sends this event once "broker.stop()" is called and all local Services are stopped.

# $transporter.connected

The Transporter sends this event once the Transporter is connected.

# $transporter.disconnected

The Transporter sends this event once the Transporter is disconnected.