This post is part of a series on how to make effective comms between Unreal Engine and a custom server.
The posts in this series can be found here:
- How to make effective comms between UE and custom server
- Implementing a good backend system design (This post)
- Connecting your Unreal Engine to your websockets (synchronize players)
- Motion smoothing for your actors
- How to spawn and control mobs using UE connected to our MMO server
The server code can be found on GitHub here.
The high-level design of what we’re achieving can be found here.
We will look at how we can make an implementation following this general architecture:
We will look at how to implement the socket and the services that interact with it. Let’s dive in!
Note that in the code snippets I may skip some things that are less important – the code is available on GitHub for reference!
Socket implementation
We will first create the socket that the client will connect to.
@ServerWebSocket("/v1/communication-socket")
public class CommunicationSocket {

    ConcurrentSet<WebSocketSession> socketSessions = new ConcurrentSet<>();

    @OnOpen
    public void onOpen(WebSocketSession session) {
        // TODO: get player/server name via headers
        socketSessions.add(session);
    }

    @OnMessage
    public void onMessage(SocketMessage message, WebSocketSession session) {
        // TODO: get player/server name via injected headers
        updateSessionParams(session, message);
        socketProcessService.processMessage(message);
    }

    @OnClose
    public void onClose(WebSocketSession session) {
        // read the player name stored on this session (passing the key name itself would not identify the player)
        String playerName = (String) session.asMap().get(SessionParams.PLAYER_NAME.getType());
        playerMotionService.disconnectPlayer(playerName);
        socketSessions.remove(session);
    }
}
Note I didn’t include all the functions from the file, but just important ones that I will be discussing.
Ok so we first start by defining our socket: @ServerWebSocket("/v1/communication-socket").
This means that our socket path locally will be: ws://localhost:8081/v1/communication-socket
I didn’t add player name or map into the path in order to keep it generic – but perhaps you may choose to make sockets more specific.
Ok next I will fill out the content for:
- @OnOpen
- @OnMessage
- @OnClose
The real key one is the OnMessage. This will have all of the content that our UE client/server is sending us to process. Having said that, we still have some important things to do when opening/closing the sessions.
On Open and On Close
First of all, we want to track our sessions. Why will become more apparent later, when we start synchronizing the data they contain.
We track a session simply by adding it to a concurrent set when we open the socket -> socketSessions.add(session);
I have not implemented an authentication layer yet, so I had to hack some things in place. With one in place (Cognito, perhaps), onOpen would read the headers it injects and store the player name / server name as variables on the session.
When we open the socket we could also set the player as online, but at the moment this is done automatically when the player’s motion updates.
For onClose we want to disconnect the player, i.e. set isOnline to false. We also remove the session from the tracking set.
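To make the open/close bookkeeping concrete, here is a minimal sketch in plain Java. Everything in it is a stand-in and an assumption: FakeSession replaces WebSocketSession, the disconnectedPlayers set replaces the call to playerMotionService.disconnectPlayer, and the "PLAYER_NAME" key is assumed to be where the name was stored on the session.

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical stand-in for the socket's session bookkeeping; not the Micronaut types. */
final class SessionRegistrySketch {

    /** Minimal stand-in for WebSocketSession: an id plus an attribute map. */
    static final class FakeSession {
        final String id;
        final Map<String, Object> attributes = new ConcurrentHashMap<>();

        FakeSession(String id) {
            this.id = id;
        }
    }

    // Thread-safe set from the JDK, backed by a ConcurrentHashMap.
    private final Set<FakeSession> sessions = ConcurrentHashMap.newKeySet();

    // Records who was marked offline; stands in for playerMotionService.disconnectPlayer.
    final Set<String> disconnectedPlayers = ConcurrentHashMap.newKeySet();

    void onOpen(FakeSession session) {
        sessions.add(session);
    }

    void onClose(FakeSession session) {
        // Look up the player name stored on the session (assumed key: "PLAYER_NAME").
        Object playerName = session.attributes.get("PLAYER_NAME");
        if (playerName != null) {
            disconnectedPlayers.add((String) playerName);
        }
        sessions.remove(session);
    }

    int liveSessionCount() {
        return sessions.size();
    }
}
```

The null check matters for sessions that never sent a player name, for example a server session or a client that disconnected before its first message.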
Processing Websocket Message
I added a line:
    updateSessionParams(session, message);
This ensures the player/server name is synchronized onto the session parameters. It is not the right approach – the name should be injected by the authentication layer, which does not exist yet.
After that, all we need to do is pass the message to the processing service:
    socketProcessService.processMessage(message);
For reference, our socket message contains:
public class SocketMessage {
    String updateType;
    Monster monster;
    PlayerMotion playerMotion;
    String mobId; // mob id used by server to identify mesh etc to use
    String mobInstanceId; // mob instance ID is the unique mob identifier
    String playerName; // this is temp, while auth layer is not done
    String serverName; // this is temp, while auth layer is not done
}
The updateType string can only be one of the following enum values for now:
@Getter
@AllArgsConstructor
public enum MessageType {
    CREATE_MOB("CREATE_MOB"),
    MOB_MOTION("MOB_MOTION"),
    PLAYER_MOTION("PLAYER_MOTION"),
    MOB_COMBAT("MOB_COMBAT"),
    PLAYER_COMBAT("PLAYER_COMBAT"),
    INVENTORY_UPDATE("INVENTORY_UPDATE");

    public final String type;
}
This will evolve in the future, but for now I just needed a generic message structure that can handle player and monster motion updates. As you can see from MessageType, the socket message will in the near future also carry combat updates, inventory updates, etc.
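Since updateType travels as a plain string, it can help to resolve it to an enum value in one place. The sketch below is one way to do that with only the JDK; MessageTypeSketch and its fromType helper are hypothetical names, and the constants simply mirror the enum above without Lombok.

```java
import java.util.Arrays;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;
import java.util.stream.Collectors;

/** Hypothetical mirror of MessageType with a safe string lookup. */
enum MessageTypeSketch {
    CREATE_MOB("CREATE_MOB"),
    MOB_MOTION("MOB_MOTION"),
    PLAYER_MOTION("PLAYER_MOTION"),
    MOB_COMBAT("MOB_COMBAT"),
    PLAYER_COMBAT("PLAYER_COMBAT"),
    INVENTORY_UPDATE("INVENTORY_UPDATE");

    private final String type;

    MessageTypeSketch(String type) {
        this.type = type;
    }

    String getType() {
        return type;
    }

    // Built once, after the constants exist: wire string -> enum constant.
    private static final Map<String, MessageTypeSketch> BY_TYPE =
            Arrays.stream(values())
                    .collect(Collectors.toUnmodifiableMap(
                            MessageTypeSketch::getType, Function.identity()));

    /** Returns the matching constant, or empty for null or unknown strings. */
    static Optional<MessageTypeSketch> fromType(String raw) {
        return Optional.ofNullable(raw).map(BY_TYPE::get);
    }
}
```

Returning an Optional means a malformed message from a client becomes an empty result rather than an exception.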
Process message service
Ok let’s dive a little into the service which executes socketProcessService.processMessage(message);
@Slf4j
@Singleton
public class SocketProcessOutgoingService {
    // this service determines what happens with the outgoing message - specifically where it gets sent
    UpdateProducer updateProducer;

    public SocketProcessOutgoingService(
            @KafkaClient("update-producer") UpdateProducer updateProducer) {
        this.updateProducer = updateProducer;
    }

    public void processMessage(SocketMessage socketMessage) {
        String updateType = socketMessage.getUpdateType();
        // TODO: Make this more pretty
        if (updateType.equals(MessageType.PLAYER_MOTION.getType())) {
            handlePlayerMotionUpdate(socketMessage);
        } else if (updateType.equals(MessageType.CREATE_MOB.getType())) {
            handleCreateMob(socketMessage);
        } else if (updateType.equals(MessageType.MOB_MOTION.getType())) {
            handleMobMotionUpdate(socketMessage);
            // ......... more functions here
        } else {
            log.error("Did not recognise update type, {}", updateType);
        }
    }

    private void handlePlayerMotionUpdate(SocketMessage message) {
        updateProducer.sendPlayerMotionUpdate(message.getPlayerMotion());
    }

    private void handleMobMotionUpdate(SocketMessage message) {
        updateProducer.sendMobMotionUpdate(message.getMonster());
    }

    private void handleCreateMob(SocketMessage message) {
        updateProducer.sendCreateMob(message.getMonster());
    }
    ....
}
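Ok so we enter this class at processMessage. I’ve done a bunch of if statements because updateType arrives as a String: Java can switch on enums and on strings, but case labels must be compile-time constants, and MessageType.PLAYER_MOTION.getType() is not one. To make this prettier I may convert the string to a MessageType first, or create a static map from the enum values to the functionality each should execute.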
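The map-based dispatch mentioned above could look roughly like this. It is a sketch under assumptions, not the service's real code: Type and Message are cut-down stand-ins for MessageType and SocketMessage, and the sent list stands in for the UpdateProducer calls so the example runs without Kafka. The map is an instance field rather than a static one because the handlers need access to the instance.

```java
import java.util.ArrayList;
import java.util.EnumMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

/** Hypothetical map-based replacement for the if/else chain. */
final class DispatchSketch {

    enum Type { PLAYER_MOTION, CREATE_MOB, MOB_MOTION }

    /** Cut-down stand-in for SocketMessage. */
    static final class Message {
        final String updateType;
        final String payload;

        Message(String updateType, String payload) {
            this.updateType = updateType;
            this.payload = payload;
        }
    }

    // Records what each handler would have sent; stands in for UpdateProducer.
    final List<String> sent = new ArrayList<>();

    private final Map<Type, Consumer<Message>> handlers = new EnumMap<>(Type.class);

    DispatchSketch() {
        handlers.put(Type.PLAYER_MOTION, m -> sent.add("player-motion:" + m.payload));
        handlers.put(Type.CREATE_MOB, m -> sent.add("create-mob:" + m.payload));
        handlers.put(Type.MOB_MOTION, m -> sent.add("mob-motion:" + m.payload));
    }

    /** Returns true when a handler was found and run, false otherwise. */
    boolean processMessage(Message message) {
        Type type;
        try {
            type = Type.valueOf(message.updateType);
        } catch (IllegalArgumentException | NullPointerException e) {
            return false; // unknown or missing update type
        }
        Consumer<Message> handler = handlers.get(type);
        if (handler == null) {
            return false; // known type with no handler registered yet
        }
        handler.accept(message);
        return true;
    }
}
```

Adding a new update type then means adding one enum constant and one map entry, and the lookup itself never changes.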
The important thing about this class, though, is that it acts as a messenger.
In the constructor you can see that we inject a Kafka client: @KafkaClient("update-producer") UpdateProducer updateProducer.
Based on the update type, this class essentially determines where the message goes for the actual processing.
For example, if the update type matches updateType.equals(MessageType.PLAYER_MOTION.getType())
then we enter:
    private void handlePlayerMotionUpdate(SocketMessage message) {
        updateProducer.sendPlayerMotionUpdate(message.getPlayerMotion());
    }
This will also scale very well when the service is broken into microservices, because this class doesn’t even need to know who/what is subscribed to that event.
The definition of that particular update is:
@KafkaClient(id = "update-producer")
public interface UpdateProducer {
    @Topic("player-motion-update")
    void sendPlayerMotionUpdate(PlayerMotion playerMotion);
    .....
}
The message listeners
Ok so what we have covered so far is that our websocket receives messages from the UE client/server. We check the update type inside the message to see who the recipient should be, and based on that we send a Kafka event.
Now we have to create the subscribers to these events, which could live in the same service, another service, or potentially another instance of the same service.
We started looking at the player motion example, so let’s continue with that one.
Note that this listener should be configured so that each message is received once, even if there are multiple instances (for some other listeners you will want ALL instances to receive the message). In Kafka terms, the instances share one consumer group.
@Slf4j
@KafkaListener(...)
public class PlayerMotionUpdateListener {
    @Inject PlayerMotionService playerMotionService;

    @Topic("player-motion-update")
    public void receive(PlayerMotion playerMotion) {
        // TODO: validate
        playerMotionService
                .updatePlayerMotion(playerMotion)
                .doOnError(
                        (error) ->
                                log.error("Error updating player motion, {}", error.getMessage()))
                .subscribe();
        // make others aware of this motion
        playerMotionService.relayPlayerMotion(playerMotion);
    }
}
Ok so what we can see here is that when this function receives the message, it processes it. For the time being, it does very little. It should first validate the player motion update (still a TODO) – this is to prevent hacking and similar.
Next, it subscribes to the call that saves this motion to the DB (MongoDB in our case). The great thing about this call is that it’s non-blocking, as I am utilizing the reactive framework.
Finally, we want to relay the message back using Kafka.
    public void relayPlayerMotion(PlayerMotion playerMotion) {
        playerMotionUpdateProducer.sendPlayerMotionResult(playerMotion);
    }
Which is executing:
@KafkaClient(id = "player-motion-client")
public interface PlayerMotionUpdateProducer {
    @Topic("player-motion-update-result")
    void sendPlayerMotionResult(PlayerMotion playerMotion);
}
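So we’re pushing the result BACK to Kafka. You may ask, why would you do that?
Well, this is to overcome the websocket limitation of being single-session based: a session lives on the one instance the client connected to, so the instance that processes an update may not hold the sessions of the players who need to hear about it.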
Websocket Kafka responder
Ok so the last thing we did was send the motion information back to @Topic("player-motion-update-result").
So now we need another listener, living in the same service as the sockets:
@Slf4j
@KafkaListener(...)
public class SocketUpdateListener {
    @Inject ClientUpdatesService clientUpdatesService;

    @Topic("player-motion-update-result")
    void receivePlayerMotionUpdate(PlayerMotion playerMotion) {
        clientUpdatesService.sendMotionUpdatesToSubscribedClients(playerMotion);
    }

    @Topic("mob-motion-update-result")
    void receiveMobMotionUpdate(Monster monster) {
        clientUpdatesService.sendMotionUpdatesToSubscribedClients(monster);
    }
}
This Kafka listener needs to be configured such that ALL instances receive the updates (unlike the motion processor).
Picture multiple instances of the websocket service running: they all need to check whether they have to process the message or not (i.e. do they have players subscribed to this event).
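Both delivery modes map onto Kafka consumer groups: consumers that share a group split the messages between them, while each separate group receives every message. The sketch below is a small in-memory imitation of that behaviour, purely to illustrate the idea. It is not the Kafka or Micronaut API, all names in it are hypothetical, and it picks a group member round-robin per message, whereas real Kafka assigns work by partition.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** In-memory imitation of consumer-group delivery; not the Kafka API. */
final class ConsumerGroupSketch {

    /** One running service instance that records what it received. */
    static final class Instance {
        final String name;
        final List<String> received = new ArrayList<>();

        Instance(String name) {
            this.name = name;
        }
    }

    // groupId -> members of that group, in subscription order.
    private final Map<String, List<Instance>> groups = new LinkedHashMap<>();
    private int published = 0;

    void subscribe(String groupId, Instance instance) {
        groups.computeIfAbsent(groupId, id -> new ArrayList<>()).add(instance);
    }

    void publish(String message) {
        for (List<Instance> members : groups.values()) {
            // Every group sees the message, but only one member of each group handles it.
            Instance chosen = members.get(published % members.size());
            chosen.received.add(message);
        }
        published++;
    }
}
```

In this picture the motion processors would all subscribe with the same group, so each update is saved once, while every websocket instance would subscribe with its own group, so each one can check its local sessions.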
For completeness, let’s look at the function which deals with sending the message back to users.
    public void sendMotionUpdatesToSubscribedClients(PlayerMotion playerMotion) {
        SocketResponse socketResponse =
                SocketResponse.builder()
                        .messageType(SocketResponseType.PLAYER_MOTION_UPDATE.getType())
                        .playerMotion(Map.of(playerMotion.getPlayerName(), playerMotion))
                        .playerKeys(Set.of(playerMotion.getPlayerName()))
                        .build();
        broadcaster
                .broadcast(socketResponse, isValid(playerMotion.getPlayerName()))
                .subscribe(socketResponseSubscriber);
    }
The key function in that is actually isValid. That’s what determines who gets the message.
Its definition is:
    private Predicate<WebSocketSession> isValid(String playerOrMob) {
        // we will report to player every time they call update about other players nearby
        return s -> {
            String serverName = (String) s.asMap().get(SessionParams.SERVER_NAME.getType());
            boolean isServer = serverName != null && !serverName.isBlank();
            // server does not track mobs
            Set<String> mobs =
                    isServer
                            ? Set.of()
                            : (Set<String>) s.asMap().getOrDefault(
                                    SessionParams.TRACKING_MOBS.getType(), Set.of());
            Set<String> players =
                    (Set<String>) s.asMap().getOrDefault(
                            SessionParams.TRACKING_PLAYERS.getType(), Set.of());
            return mobs.contains(playerOrMob) || players.contains(playerOrMob);
        };
    }
The formatting makes this look a bit funky so here’s the image for this function.
It will perhaps be modified and simplified but for now it proves the MVP.
Each player/server has a session. That session contains parameters for the mobs/players they are subscribed to – i.e. if those players or mobs move, they want to know about it.
So in this isValid function we check: does this session track the player or mob ID? We use a Set so that this check, which runs very often, is a cheap lookup.
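The same filter can be tried out without a websocket at all. In the sketch below a plain Map stands in for the session's attribute map, and the three string keys are assumed values for the SessionParams entries, so treat the names as hypothetical.

```java
import java.util.Map;
import java.util.Set;
import java.util.function.Predicate;

/** Hypothetical, websocket-free version of the tracking filter. */
final class TrackingFilterSketch {

    // Assumed attribute keys; the real ones come from SessionParams.
    static final String SERVER_NAME = "SERVER_NAME";
    static final String TRACKING_MOBS = "TRACKING_MOBS";
    static final String TRACKING_PLAYERS = "TRACKING_PLAYERS";

    /** Builds a predicate that accepts sessions tracking the given player or mob ID. */
    @SuppressWarnings("unchecked")
    static Predicate<Map<String, Object>> tracks(String playerOrMob) {
        return attributes -> {
            Object serverName = attributes.get(SERVER_NAME);
            boolean isServer =
                    serverName instanceof String && !((String) serverName).isBlank();
            // A server session ignores mob updates, so its mob set is treated as empty.
            Set<String> mobs =
                    isServer
                            ? Set.of()
                            : (Set<String>) attributes.getOrDefault(TRACKING_MOBS, Set.of());
            Set<String> players =
                    (Set<String>) attributes.getOrDefault(TRACKING_PLAYERS, Set.of());
            return mobs.contains(playerOrMob) || players.contains(playerOrMob);
        };
    }
}
```

Because the predicate is built once per update and then applied to every live session, keeping each check down to two set lookups is what keeps the broadcast cheap.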
And that’s how the UE client/server will receive updates!
But wait, how did we populate the tracked players? This occurs asynchronously; let’s quickly cover that now too.
Determining the tracked players/mobs
I’ve created a service which runs asynchronously to the socket comms to determine which messages each session is subscribed to.
Remember the ConcurrentSet<WebSocketSession> socketSessions we created earlier? This class will be consuming it.
It’s a bit of a monolithic design so it will be subject to change! But for now it proves an MVP.
@Singleton
public class SynchroniseSessionService {
    ......
    @Scheduled(fixedDelay = "1s")
    public void evaluateNearbyPlayers() {
        ConcurrentSet<WebSocketSession> sessions = socket.getLiveSessions();
        sessions.parallelStream().forEach(session -> {
            String playerName = (String) session.asMap().get(SessionParams.PLAYER_NAME.getType());
            Motion motion = (Motion) session.asMap().get(SessionParams.MOTION.getType());
            if (motion == null) {
                // possibly the motion is not fully initiated
                return;
            }
            // sync nearby players
            // TODO: Make these calls via Kafka
            String serverName = (String) session.asMap().get(SessionParams.SERVER_NAME.getType());
            boolean isServer = serverName != null && !serverName.isBlank();
            int distanceThreshold =
                    isServer ? DISTANCE_THRESHOLD_SERVER : DISTANCE_THRESHOLD_PLAYER;
            playerMotionService
                    .getNearbyPlayersAsync(motion, playerName, distanceThreshold)
                    .doAfterSuccess(list -> {
                        if (list == null || list.isEmpty()) {
                            return;
                        }
                        Set<String> playerNames = list.stream()
                                .map(PlayerMotion::getPlayerName)
                                .collect(Collectors.toSet());
                        evaluateNewPlayers(playerNames, session);
                        // update the names that we follow
                        session.put(SessionParams.TRACKING_PLAYERS.getType(), playerNames);
                    })
                    .doOnError((error) -> log.error("error getting nearby players, {}", error.getMessage()))
                    .subscribe();
            if (isServer) {
                return;
            }
            // sync nearby mobs, if this is a player only
            mobInstanceService
                    .getMobsNearby(new Location(motion))
                    .doAfterSuccess(mobList -> {
                        if (mobList == null || mobList.isEmpty()) {
                            return;
                        }
                        Set<String> mobInstanceIds = evaluateNewMobs(mobList, session);
                        session.put(SessionParams.TRACKING_MOBS.getType(), mobInstanceIds);
                    })
                    .doOnError((error) -> log.error("error getting nearby mobs, {}", error.getMessage()))
                    .subscribe();
        });
    }
}
This function appears long and complicated, and to some degree it is; it will need simplifying. So I will just explain what it’s achieving at a high level.
- Every 1 second, run this function to synchronize what each session is tracking
- Get the session params: the player/server name and the motion/location of where they are
- For each player/server, call getNearbyPlayersAsync
- For players only, call getMobsNearby (because the server is already aware of mobs nearby)
- Compare the list of nearby players/mobs to what the player/server is already tracking
  - If no changes, nothing required
  - If there are new players/mobs, subscribe to them by adding this mob/player ID into session params and send details of the player/mob to the session now.
  - If there are ‘lost’ players/mobs (e.g. out of range or disconnected), unsubscribe from those players/mobs and send update to remove those from the client
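The comparison step in the list above boils down to two set differences. Here is a minimal sketch of it; TrackingDiffSketch and Diff are hypothetical names, and the real evaluateNewPlayers / evaluateNewMobs functions may well be structured differently.

```java
import java.util.HashSet;
import java.util.Set;

/** Hypothetical helper: works out who to subscribe to and who to drop. */
final class TrackingDiffSketch {

    /** Result of comparing what is tracked with what is nearby right now. */
    static final class Diff {
        final Set<String> added; // nearby now, not tracked before: subscribe and send details
        final Set<String> lost;  // tracked before, not nearby now: unsubscribe and remove on client

        Diff(Set<String> added, Set<String> lost) {
            this.added = added;
            this.lost = lost;
        }

        boolean isEmpty() {
            return added.isEmpty() && lost.isEmpty();
        }
    }

    static Diff diff(Set<String> tracked, Set<String> nearby) {
        Set<String> added = new HashSet<>(nearby);
        added.removeAll(tracked);
        Set<String> lost = new HashSet<>(tracked);
        lost.removeAll(nearby);
        return new Diff(added, lost);
    }
}
```

For example, a session tracking {alice, bob} with {bob, carol} now nearby yields added = {carol} and lost = {alice}, and an empty diff means there is nothing to send.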
These are fairly expensive calls to make too, so there should be a balance in how often they’re called. On the bright side, all of these DB calls are non-blocking, so no thread sits idle waiting on the database.