Astral Realms Documentation Help

Messaging (RabbitMQ)

MessagingService in commons provides RabbitMQ-based inter-server communication with pub/sub and RPC patterns.

Configuration

Set the connection details in plugins/AstralCore/config.yml:

rabbitmq: host: "localhost" port: 5672 username: "guest" password: "guest" virtual-host: "/"

Connection State

The service connects automatically on plugin enable and disconnects on disable. The connection lifecycle emits state change events:

State

Meaning

CONNECTING

Attempting to establish the AMQP connection

CONNECTED

Connection is active

DISCONNECTING

Graceful shutdown in progress

DISCONNECTED

No active connection

FAILED

Connection attempt failed (will retry with backoff)

Monitor state changes:

messagingService.addConnectionListener(state -> { logger.info("RabbitMQ state: " + state); });

Pub/Sub — Publishing a Packet

MessagingService messaging = AstralCore.get().messaging(); // Fire-and-forget messaging.send("astralcore.servers", new ServerHeartbeatPacket());

Pub/Sub — Subscribing to an Exchange

messaging.registerExchange("astralcore.players", (packet, envelope) -> { if (packet instanceof PlayerJoinPacket join) { handlePlayerJoin(join); } });

The listener receives the decoded Packet instance and RabbitMQ's Envelope metadata.

RPC — Request with Reply

CompletableFuture<Packet> future = messaging.sendWithReply( "astralcore.servers", // exchange "my-reply-queue", // reply queue new FindEmptiestServerPacket("survival"), 5000L // timeout in milliseconds ); future.thenAccept(response -> { if (response instanceof ServerSearchResponse res) { // use res } });

RPC — Handling Requests

messaging.registerExchange("astralcore.servers", (packet, envelope) -> { if (packet instanceof ServerSearchRequest req) { // compute response return new ServerSearchResponse(foundServer); } return null; // no reply });

Core Exchanges

Built-in exchanges used by AstralCore internally:

Constant

Exchange name

Purpose

CoreExchanges.SERVERS

astralcore.servers

Server registration, heartbeat, status

CoreExchanges.PLAYERS

astralcore.players

Player join/quit network events

CoreExchanges.TELEPORTATION

astralcore.teleportation

Cross-server teleport requests

CoreExchanges.DISCORD

astralcore.discord

Discord account linking

Thread Pool

The service uses a fixed thread pool with 6 threads (named AstralCore-RabbitMQ-*) for all AMQP operations. This prevents unbounded thread creation under high traffic.

Traffic level

Recommended pool size

Low

4

Medium (default)

6

High

8–10

The pool size is defined in commons/src/main/java/com/astralrealms/core/messaging/MessagingService.java:44 as RABBITMQ_THREAD_POOL_SIZE.

Self-message Filtering

Every MessagingService instance generates a random SENDER_ID UUID on startup. Outgoing packets are wrapped with this ID, and incoming packets from the same sender are silently discarded. This prevents a server from processing its own broadcasts.

Packet Registration

Packets are registered by integer ID in PacketRegistry. All built-in packets are pre-registered. To add a custom packet:

PacketRegistry registry = AstralCore.get().messaging().packetRegistry(); registry.registerPacket(200, MyCustomPacket::new);

Implement the Packet interface with write(BinaryMessage) and read(BinaryMessage) for binary serialisation.

23 April 2026