# Types of Transporters

In order to communicate with other nodes (ServiceBroker instances) you need to configure a Transporter. There are two types of Transporter:

  • Centralized Transporters: Transporters using a central server. The central server can be, for example, a Redis, NATS, Kafka, MQTT or a JMS server.

  • Decentralized, Peer-to-Peer Transporters: Transporters without a central server. For example, TcpTransporter belongs to this group, which uses Gossip protocol to publish the status of the nodes.

Transporter communicates with other nodes, transfers events, calls requests and processes responses. If a Service runs on multiple instances on different nodes, the requests will be load-balanced among live nodes. Each Transporter can be assigned a Serializer. Serializers convert messages into bytes and vice versa. There are several built-in Transporters in Moleculer Framework.

# Centralized Transporters

Centralized Transporters connect to a central MessageBroker that provide a reliable way of exchanging messages among remote nodes. These brokers use publish/subscribe messaging pattern to deliver data packets.

Centralized topology

# NATS Transporter


Built-in transporter for NATS. NATS Server is a simple, high performance open source messaging system for cloud native applications, IoT messaging, and microservices architectures.

NatsTransporter transporter = new NatsTransporter("nats://nats.server:4222");
ServiceBroker broker = ServiceBroker.builder()
                                    .nodeID("server-1")
                                    .transporter(transporter)
                                    .build();

NATS dependencies

To use NATS Transporter, add the following dependency to the build script:
group: 'io.nats', name: 'jnats', version: '2.8.0'

Detailed example:

// Create Transporter
NatsTransporter transporter = new NatsTransporter("host1", "host2");

// Configure Transporter
transporter.setSecure(true);
transporter.setUsername("user");
transporter.setPassword("secret");
transporter.setNoRandomize(true);

// Create Service Broker
ServiceBroker broker = ServiceBroker.builder()
                                    .nodeID("server-1")
                                    .transporter(transporter)
                                    .build();

// Install distributed Services
broker.createService(new Service("testService") {
    Action testAction = ctx -> {

        // Process request JSON (ctx.params),
        // and create response JSON structure
        return new Tree();
    };
});
            
// Connect the Service Broker to other Nodes
broker.start();

# NATS Streaming Transporter


Built-in transporter for NATS Streaming. NATS Server is a simple, high performance open source messaging system for cloud native applications, IoT messaging, and microservices architectures.

NatsStreamingTransporter transporter = new NatsStreamingTransporter("nats://nats.server:4222");
ServiceBroker broker = ServiceBroker.builder()
                                    .nodeID("server-1")
                                    .transporter(transporter)
                                    .build();

NATS Streaming dependencies

To use NATS Transporter, add the following dependency to the build script:
group: 'io.nats', name: 'java-nats-streaming', version: '2.2.3'

Detailed example:

// Create Transporter
NatsStreamingTransporter transporter = new NatsStreamingTransporter("host1");

// Configure Transporter
transporter.setClientId("client1");
transporter.setClusterId("test-cluster");
transporter.setMaxPubAcksInFlight(16384);
transporter.setDiscoverPrefix("_STAN.discover");
transporter.setTraceConnection(true);

// Create Service Broker
ServiceBroker broker = ServiceBroker.builder()
                                    .nodeID("server-1")
                                    .transporter(transporter)
                                    .build();

// Install distributed Services
broker.createService(new Service("testService") {
    Action testAction = ctx -> {

        // Process request JSON (ctx.params),
        // and create response JSON structure
        return new Tree();
    };
});
            
// Connect the Service Broker to other Nodes
broker.start();

# Redis Transporter


Built-in Transporter for Redis. Redis is an open source (BSD licensed), in-memory data structure store, used as a database, cache and message broker.

RedisTransporter transporter = new RedisTransporter("redis://redis.server:6379");
ServiceBroker broker = ServiceBroker.builder()
                                    .nodeID("server-1")
                                    .transporter(transporter)
                                    .build();

Redis dependencies

To use Redis Transporter, add the following dependency to the build script:
group: 'biz.paluch.redis', name: 'lettuce', version: '4.5.0.Final'

Detailed example:

RedisTransporter transporter = new RedisTransporter("host1", "host2");
transporter.setSecure(true);
transporter.setPassword("secret");
ServiceBroker broker = ServiceBroker.builder()
                                    .nodeID("server-1")
                                    .transporter(transporter)
                                    .build();

# MQTT Transporter


Built-in Transporter for MQTT protocol. MQTT Transporter (eg. for Mosquitto MQTT Server or ActiveMQ Server). MQTT is a machine-to-machine (M2M)/"Internet of Things" connectivity protocol. It was designed as an extremely lightweight publish/subscribe messaging transport.

MqttTransporter transporter = new MqttTransporter("mqtt://mqtt-server:1883");
ServiceBroker broker = ServiceBroker.builder()
                                    .nodeID("server-1")
                                    .transporter(transporter)
                                    .build();

MQTT dependencies

To use MQTT Transporter, add the following dependency to the build script:
group: 'org.eclipse.paho', name: 'org.eclipse.paho.client.mqttv3', version: '1.2.5'

Detailed example:

MqttTransporter transporter = new MqttTransporter("host1");
transporter.setUsername("user");
transporter.setPassword("secret");
transporter.setKeepAliveSeconds(120);
transporter.setConnectTimeoutSeconds(10);
transporter.setMessageResendIntervalSeconds(20);
ServiceBroker broker = ServiceBroker.builder()
                                    .nodeID("server-1")
                                    .transporter(transporter)
                                    .build();

# AMQP Transporter


Built-in Transporter for AMQP protocol. AMQP Transporter based on RabbitMQ's AMQP client API. AMQP provides a platform-agnostic method for ensuring information is safely transported between applications, among organizations, within mobile infrastructures, and across the Cloud.

AmqpTransporter transporter = new AmqpTransporter("amqp://rabbitmq-server:5672");
ServiceBroker broker = ServiceBroker.builder()
                                    .nodeID("server-1")
                                    .transporter(transporter)
                                    .build();

AMQP dependencies

To use AMQP Transporter, add the following dependency to the build script:
group: 'com.rabbitmq', name: 'amqp-client', version: '5.11.0'

Detailed example:

AmqpTransporter transporter = new AmqpTransporter("host1");
transporter.setUsername("user");
transporter.setPassword("secret");
transporter.setSslContextFactory(customSslFactory);
transporter.setQueueProperties(customQueueProperties);
transporter.setExchangeProperties(customExchangeProperties);
ServiceBroker broker = ServiceBroker.builder()
                                    .nodeID("server-1")
                                    .transporter(transporter)
                                    .build();

# Kafka Transporter


Built-in Transporter for Kafka. Kafka is used for building real-time data pipelines and streaming apps. It is horizontally scalable, fault-tolerant, wicked fast, and runs in production in thousands of companies. Kafka Transporter is a very simple implementation. It transfers Moleculer packets to consumers via pub/sub. There are not implemented offset, replay, etc. features.

KafkaTransporter transporter = new KafkaTransporter("kafka://server:9092");
ServiceBroker broker = ServiceBroker.builder()
                                    .nodeID("server-1")
                                    .transporter(transporter)
                                    .build();

Kafka dependencies

To use Kafka Transporter, add the following dependency to the build script:
group: 'org.apache.kafka', name: 'kafka-clients', version: '2.7.0'

Detailed example:

KafkaTransporter transporter = new KafkaTransporter();
transporter.setUrls(new String[] { "192.168.51.29:9092" });
transporter.setDebug(true);
transporter.setProducerProperty("session.timeout.ms", "30000");
ServiceBroker broker = ServiceBroker.builder()
                                    .nodeID("server-1")
                                    .transporter(transporter)
                                    .build();

# JMS Transporter

Built-in Transporter for Java Message Service. The Java Message Service API is a Java Message Oriented Middleware API for sending messages between two or more clients. It is an implementation to handle the Producer-consumer problem. JMS is a part of the Java Enterprise Edition.

// Sample of usage with Active MQ
JmsTransporter transporter = new JmsTransporter(new ActiveMQConnectionFactory());
ServiceBroker broker = ServiceBroker.builder()
                                    .nodeID("server-1")
                                    .transporter(transporter)
                                    .build();

JMS dependencies

To use JMS Transporter, add the following dependency to the build script:
group: 'javax.jms', name: 'javax.jms-api', version: '2.0.1'
+ dependencies of the JMS driver

Detailed example:

JmsTransporter transporter = new JmsTransporter(new ActiveMQConnectionFactory());
transporter.setUsername("user");
transporter.setPassword("secret");
transporter.setAcknowledgeMode(JMSContext.AUTO_ACKNOWLEDGE);
transporter.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
transporter.setTransacted(false);
transporter.setPriority(5);
transporter.setTtl(10000);
ServiceBroker broker = ServiceBroker.builder()
                                    .nodeID("server-1")
                                    .transporter(transporter)
                                    .build();

# Decentralized, Peer-to-Peer Transporters

Decentralized topology

# TCP Transporter


TCP Transporter uses fault tolerant and peer-to-peer Gossip protocol to discover location and service information about the other nodes participating in a Moleculer Cluster. In Moleculer's P2P architecture all nodes are equal, there is no "leader" or "controller" node, so the cluster is truly horizontally scalable. This Transporter aims to run on top of an infrastructure of hundreds of nodes. TCP Transporter provides the highest speed data transfer between Moleculer Nodes - hundred thousand packets per second can be transmitted from one node to another over a high-speed LAN. There are two ways for TCP Transporter to find other nodes on the network:

  • UDP multicast / broadcast discovery
  • Based on a preset host list

It contains an integrated UDP discovery feature to detect new and disconnected nodes on the network. If the UDP is prohibited on your network, use "urls" option to list the hosts. The list contains remote endpoints; host/ip, port and nodeID. The list can be static or a file path/URL which contains the list.

Example of using TCP Transporter with default settings with UDP discovery:

// Create Transporter
TcpTransporter transporter = new TcpTransporter();

// Create Service Broker
ServiceBroker broker = ServiceBroker.builder()
                                    .nodeID("node1")
                                    .transporter(transporter)
                                    .build();
            
// Install distributed Services
broker.createService(new Service("testService") {
    Action testAction = ctx -> {

        // Process request JSON (ctx.params),
        // and create response JSON structure
        return new Tree();
    };
});
            
// Connect the Service Broker to other Nodes
broker.start();

TCP Transporter with static endpoint list:

TcpTransporter transporter = new TcpTransporter("172.17.0.1:6000/node-1",
                                                "172.17.0.2:6000/node-2",
                                                "172.17.0.3:6000/node-3");
ServiceBroker broker = ServiceBroker.builder()
                                    .nodeID("node-1")
                                    .transporter(transporter)
                                    .build();

You don't need to set "port" because it find & parse the self TCP port from URL list.

TCP Transporter with static endpoint list file:

TcpTransporter transporter = new TcpTransporter(new URL("file:///nodes.json"));
// nodes.json
[
    "127.0.0.1:6001/client-1",
    "127.0.0.1:7001/server-1",
    "127.0.0.1:7002/server-2"
]

Network traffic is approximately constant when using TCP Transporter. During operation, each node talks to another random node from time to time. They communicate statements about what they know about other nodes of the cluster. If they have conflicting information about another node, they decide which statement is true based on the "sequence number" in the information. This "sequence number" continues to increase by one as the information changes. New information is spreading at an increasing rate within the cluster; the number of nodes that have new information may double every period. (1, then 2, then 4, 8, 16, 32, etc.). Thus, the spread of information is not immediate, but fast enough.

Visualistation of Moleculer's Gossip Protocol

The above visualization shows the communication of 50 nodes. Speed is five times the real speed. During the simulation, nodes are randomly turned on and off. The red squares indicate that one node knows that another node is off. Green squares mean that one node knows that another node is on. Dark-green means new information, light-green means older information ("old" information will be "new" when the node get information about another node). The numbers represent the sequence number. Not only the on / off status is distributed over the network in this way, but also the list of Services and their properties.

Options of TCP Transporter

Name Type Default Description
port int 0 TCP port (used by the Transporter and Gossiper services). A port number of zero will let the system pick up an ephemeral port in a bind operation.
gossipPeriod int 3 Gossiping period time, in SECONDS.
maxConnections int 32 Max number of keep-alive connections (-1 = unlimited, 0 = disable keep-alive connections).
maxPacketSize int 1MB Max enable packet size (BYTES).
urls String[] null List of URLs ("tcp://host:port/nodeID" or "host:port/nodeID" or "host/nodeID"), when UDP discovery is disabled.
useHostName boolean true Use hostnames instead of IP addresses As the DHCP environment is dynamic, any later attempt to use IPs instead hostnames would most likely yield false results. Therefore, use hostnames if you are using DHCP.
udpPort int 4445 UDP broadcast/multicast port.
udpBindAddress int null UDP bind address (null = autodetect)
udpPeriod int 30 UDP broadcast/multicast period in SECONDS.
udpReuseAddr boolean true Resuse addresses (UDP).
udpMaxDiscovery int 0 Maximum number of outgoing multicast packets (0 = runs forever).
udpMulticast String "239.0.0.0" UDP multicast address of automatic discovery service.
udpMulticastTTL int 1 TTL of UDP multicast packets.
udpBroadcast boolean false Use UDP broadcast WITH UDP multicast (false = use UDP multicast only).
udpMaxDiscovery int 0 Maximum number of outgoing multicast packets (0 = runs forever).

# Internal Transporter

Internal Transporter is a built-in message bus that can connect multiple ServiceBrokers running in the same JVM. This Transporter is primarily used for testing purposes (eg. for testing Serializers, Listeners or Actions). The calls are made in separate Threads, so call timeouts can be used.

Using the shared (static) communication group:

ServiceBroker broker1 = ServiceBroker.builder()
                                     .nodeID("node1")
                                     .transporter(new InternalTransporter())
                                     .build();
ServiceBroker broker2 = ServiceBroker.builder()
                                     .nodeID("node2")
                                     .transporter(new InternalTransporter())
                                     .build();

Using independent communication groups:

// --- CREATE COMMUNICATION GROUPS ---

Subscriptions group1 = new Subscriptions();
Subscriptions group2 = new Subscriptions();

// --- CREATE TRANSPORTERS ---

// Group-1
InternalTransporter transporter1 = new InternalTransporter(group1);
InternalTransporter transporter2 = new InternalTransporter(group1);

// Group-2
InternalTransporter transporter3 = new InternalTransporter(group2);
InternalTransporter transporter4 = new InternalTransporter(group2);

// --- CREATE SERVICE BROKERS ---

// Group-1
ServiceBroker broker1 = ServiceBroker.builder()
                                     .nodeID("node1")
                                     .transporter(transporter1)
                                     .build();
ServiceBroker broker2 = ServiceBroker.builder()
                                     .nodeID("node2")
                                     .transporter(transporter2)
                                     .build();

// Group-2
ServiceBroker broker3 = ServiceBroker.builder()
                                     .nodeID("node3")
                                     .transporter(transporter3)
                                     .build();
ServiceBroker broker4 = ServiceBroker.builder()
                                     .nodeID("node4")
                                     .transporter(transporter4)
                                     .build();

# File System Transporter

Built-in, filesystem-based, server-less Transporter. With this Transporter multiple Service Brokers can communicate with each other through a shared directory structure. File System Transporter is not advisable for use in production mode as it is much slower than other Transporter. Rather it can be considered as a reference implementation or a code sample.

FileSystemTransporter transporter = new FileSystemTransporter("/shared/dir");
ServiceBroker broker = ServiceBroker.builder()
                                    .nodeID("server-1")
                                    .transporter(transporter)
                                    .build();

# Custom Transporter

Custom Transporter module can be created. The simplest solution is to copy the source code of an existing Transporter and modify the "connect", "stopped", "subscribe" and "publish" methods.

Create custom Transporter:

public class CustomTransporter extends Transporter {
    public void connect { /*...*/ }
    public void stopped() { /*...*/ }
    public void publish(String channel, Tree message) { /*...*/ }
    public Promise subscribe(String channel) { /*...*/ }
}

Use custom Transporter:

ServiceBroker broker = ServiceBroker.builder()
                                    .nodeID("server-1")
                                    .transporter(new CustomTransporter())
                                    .build();