37. How to connect your Java Micronaut/SpringBoot with Kafka SASL

As part of the MMO journey, we need to make sure that we use scalable web systems. One thing we can do is make our server event driven.

The concept is to have our UE web server communicate with our Micronaut server using Kafka pub/sub, while clients obtain relevant information from our Micronaut server directly.

There is some very useful documentation for getting started with both Java frameworks – Micronaut Kafka and Spring for Apache Kafka.

Note that the two are quite similar. We will be using Micronaut here, but a lot of the code will be similar or identical (with obviously different imports).

This post will focus on getting things working in the development environment, but with SASL enabled.

Kafka is used within this hobby MMO server project.

So all the code that is shared here will be available as an example in that project.

This part will later be used to synchronize with Unreal Engine server and process the updates from there.

Follow the blog / YouTube channel to stay on top of the updates!

Blog – Unreal MMO Development with Yaz (unreal-mmo-dev.com)

How to integrate Kafka with Java Micronaut / SpringBoot

Getting started – docker-compose

Your Kafka and Zookeeper will be started in Docker containers rather than locally. This is pretty standard practice now.

What are the key components that are required in your docker-compose?

networks:
  kafka-net:
    name: kafka-net
    driver: bridge

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    ports:
      - 2181:2181
    networks:
      - kafka-net
    tmpfs: "/datalog"
  kafka1:
    image: confluentinc/cp-kafka:latest
    depends_on:
      - zookeeper
    ports:
      - 9092:9092
      - 9093:9093
    networks:
      - kafka-net
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      ZOOKEEPER_SASL_ENABLED: "false"
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092,SASL_PLAINTEXT://localhost:9093
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,SASL_PLAINTEXT:SASL_PLAINTEXT
      KAFKA_SECURITY_INTER_BROKER_PROTOCOL: SASL_PLAINTEXT
      KAFKA_SASL_MECHANISM_INTER_BROKER_PROTOCOL: PLAIN
      KAFKA_OPTS: "-Djava.security.auth.login.config=/etc/kafka/configs/kafka_server_jaas.conf"
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
      KAFKA_SASL_ENABLED_MECHANISMS: PLAIN
      KAFKA_DEFAULT_REPLICATION_FACTOR: 1
      KAFKA_NUM_PARTITIONS: 2
#     https://github.com/robcowart/docker_compose_cookbook/blob/master/confluent_kafka_oss/docker-compose.yml
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    volumes:
      - ./configs:/etc/kafka/configs
    restart: always

There are several different images that can be used for Kafka; the one I am using is:

confluentinc/cp-kafka – Docker Image | Docker Hub

To be honest, it took me a while to find some of the environment variable names. For instance, I had an issue with the offsets topic replication factor defaulting to 3 while only 1 broker was enabled. I found KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR by locating it manually here: https://github.com/robcowart/docker_compose_cookbook/blob/master/confluent_kafka_oss/docker-compose.yml

There are other parameters available there too, so check it out if you need something else.
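As a general rule (per the Confluent image documentation), the cp-* images build broker properties from environment variables: strip the KAFKA_ prefix, lowercase, and turn underscores into dots. So you can usually derive the variable name straight from the broker property you need, for example:

offsets.topic.replication.factor  ->  KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR
auto.create.topics.enable         ->  KAFKA_AUTO_CREATE_TOPICS_ENABLE
default.replication.factor        ->  KAFKA_DEFAULT_REPLICATION_FACTOR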

Configurations – docker-compose

As part of the docker-compose file, we push our configs into the Kafka container. This is achieved through these lines:

volumes:
  - ./configs:/etc/kafka/configs

You should add them as shown below; feel free to rename them, just update the reference in the volumes section if you do.

kafka container configs
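In other words, next to your docker-compose.yml you just need a configs folder containing the two files described below:

configs/
  kafka_server_jaas.conf
  config.properties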

kafka_server_jaas.conf:

KafkaServer {
  org.apache.kafka.common.security.plain.PlainLoginModule required
  username="kafka"
  password="password123"
  user_kafka="password123";
};

The kafka_server_jaas.conf file essentially contains the authentication information for the client/server. In reality, you will not want to store credentials like this for production, but it's fine for development.

For production, these whole files can be injected before the build, for example through the use of Elastic Beanstalk build configurations. Also, the production docker-compose would generally look quite different to your development one.

The next file in there is config.properties:

sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="kafka" password="password123" user_kafka="password123";
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN

This config.properties will be used if you try to do any manual tests, e.g. producing/consuming messages over the CLI with the provided config.

Note that we had the line ZOOKEEPER_SASL_ENABLED: "false". This means that the comms between the Kafka container and Zookeeper are over plaintext. This is acceptable because they share a private Docker network. If you did not want this, you'd also have to configure a zookeeper_jaas.conf and set that up. It may look something like:

Server {
    org.apache.zookeeper.server.auth.DigestLoginModule required
    user_admin="admin-secret";
};
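The Kafka broker would then also need a matching Client section in its own JAAS file so it can authenticate to Zookeeper. A rough sketch, reusing the admin/admin-secret credentials from the Server block above:

Client {
    org.apache.zookeeper.server.auth.DigestLoginModule required
    username="admin"
    password="admin-secret";
};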

In production it should be ok for zookeeper to share network with the Kafka brokers without SASL/SSL, provided that network is not exposed to the public. It reduces some complexity and improves performance very slightly.

Configurations – Micronaut / Springboot

Now that we have a working Kafka server, let's set up the client (you can, and perhaps should, test the broker over the CLI before proceeding – check the testing section below just to make sure).

My configuration can be found here (application.yml):

kafka:
  bootstrap:
    servers: 127.0.0.1:9093
  security:
    protocol: SASL_PLAINTEXT
  sasl:
    mechanism: PLAIN
    jaas:
      config: org.apache.kafka.common.security.plain.PlainLoginModule required username="kafka" password="password123";
  consumers:
    mmo-server:
      bootstrap:
        servers: 127.0.0.1:9093
      session:
        timeout:
          ms: 30000
      auto:
        offset:
          reset: earliest

Note that a replication factor of 1 is not good for production; you need at least 2, and 3 is typical.

Check that bootstrap.servers matches the SASL-enabled port from your docker-compose. You also need to ensure that the security/sasl settings match those in docker-compose.

The consumers section relates to your @KafkaListener code. You can specify configs under ‘default’ (check the documentation) or per consumer group; in this case, the group ID is mmo-server.

Again, note that sasl.jaas.config should not contain plaintext credentials in production – they would usually come from environment variables here.
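Micronaut resolves ${...} placeholders in application.yml against environment variables, so you can keep the line but move the secret out of the file. For the Springboot readers, a rough equivalent of the configuration above using the standard spring-kafka properties (a sketch, not taken from the project; KAFKA_JAAS_CONFIG is just a made-up variable name holding the full JAAS line) could look like:

spring:
  kafka:
    bootstrap-servers: 127.0.0.1:9093
    properties:
      security.protocol: SASL_PLAINTEXT
      sasl.mechanism: PLAIN
      sasl.jaas.config: ${KAFKA_JAAS_CONFIG}
    consumer:
      group-id: mmo-server
      auto-offset-reset: earliest
      properties:
        session.timeout.ms: 30000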

Useful example

Now let's create a scenario where I want to publish an update relating to a mob, and then have a subscriber pick this event up and process it.

I will do the following for this example:

  • Define a complex(ish) data structure representing a mob update
  • Define a Producer which will publish the messages
  • Create a timed event which will publish a message periodically
  • Create a listener which will subscribe to the topic and do something (print to output) when it receives a message.

Monster update model

Let’s begin by defining the monster update model.

@Data
@NoArgsConstructor
@AllArgsConstructor
public class MobUpdate {

    String mobId;
    String mobInstanceId;
    Motion motion;
    String state;
    String target;
}

We’re using Lombok for the constructors/getters/setters.

Don't worry about reproducing this exactly; it's just a representative model for testing.
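For completeness, Motion is just another data class from the project. If you want something that compiles, a throwaway placeholder along these lines will do (the fields here are made up purely for this example):

@Data
@NoArgsConstructor
@AllArgsConstructor
public class Motion {

    // made-up placeholder fields, just so MobUpdate has something to carry
    Integer x;
    Integer y;
    Integer z;
}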

Producer interface

Next, we will define the producer:

@KafkaClient(id = "test_client")
public interface TestProducer {

    @Topic("mob-updates")
    void sendFakeUpdate(MobUpdate mobUpdate);

}

This is super simple because Micronaut (and Springboot) is pretty awesome.

Basically, when you define an interface and specify the Kafka client and the topic you want to publish to, Micronaut will go ahead and fill in all the necessary internal code for you!

Check some more in-depth docs here for example: Kafka and the Micronaut Framework – Event-Driven Applications.
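As a small extension (a sketch, assuming you want to key messages by the mob instance), the same interface can also take a @KafkaKey parameter, so that updates for the same mob always land on the same partition:

    // keyed variant: the key drives which partition the update lands on
    @Topic("mob-updates")
    void sendFakeUpdate(@KafkaKey String mobInstanceId, MobUpdate mobUpdate);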

Execute the producer via scheduled task

OK, now that we have the function to actually do the publishing, let's create a scheduled task which will execute the call.

@Singleton
public class TestKafkaService {

    TestProducer testProducer;

    public TestKafkaService(@KafkaClient("test_client") TestProducer testProducer) {
        // advised this way vs direct inject
        this.testProducer = testProducer;
    }

    @Scheduled(fixedDelay = "10s")
    void executeEveryTen() {
        MobUpdate mobUpdate = new MobUpdate(UUID.randomUUID().toString(), UUID.randomUUID().toString(), null, "ATTACKING", "player1");
        testProducer.sendFakeUpdate(mobUpdate);
    }
}

Here I created a class; make sure to tag it with @Singleton so that it gets instantiated by Micronaut/Springboot.

You could also inject/autowire it as a field (I haven't actually tested that, as the docs suggested using the constructor like above, but it should probably work too – see the sketch below).
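For reference, the field-injection variant would look something like this (untested here; the @Inject annotation comes from jakarta.inject or javax.inject depending on your Micronaut version):

@Singleton
public class TestKafkaService {

    // field injection instead of the constructor shown above
    @Inject
    TestProducer testProducer;

    // ... scheduled method stays the same as before
}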

Next I create the MobUpdate model we defined earlier; note I add some random IDs to it so we have some varied data to display.

Then I use the interface bean to actually publish the message.

Creating the Kafka listener

Finally, let's define the Kafka listener. This will be 'subscribing' to the events and will do some work after receiving a message.

@KafkaListener(groupId = "mmo-server", offsetReset = OffsetReset.EARLIEST)
public class MonsterServerListener {

    @Topic("mob-updates")
    public void receive(MobUpdate mobUpdate) {
        System.out.printf("Got a mob update, mob id: %s, mob instance id: %s, motion: %s, state: %s, target: %s%n",
                mobUpdate.getMobId(), mobUpdate.getMobInstanceId(), mobUpdate.getMotion(), mobUpdate.getState(), mobUpdate.getTarget());
    }
}

Note the groupId matches the one from application.yml.

Micronaut will automatically create the consumer at startup and you can define the groups they belong to.

Specify the @Topic above the function which it will 'listen to'. In the method signature, specify the data object that you are expecting to receive.

You can also receive much more data; a common one, for instance, is @KafkaKey, which can be a String/Integer or a more complex data object.
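For instance, a sketch of the same listener method also receiving the key (assuming the producer actually sends one, as in the keyed producer variant earlier):

    @Topic("mob-updates")
    public void receive(@KafkaKey String mobInstanceId, MobUpdate mobUpdate) {
        // the key arrives alongside the deserialized payload
        System.out.printf("Got update for mob instance %s: %s%n", mobInstanceId, mobUpdate);
    }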

That’s basically it for the implementation!

So it's time for some testing.

Testing

Since we just finished code which sends updates, listens to them and prints from the consumer, here is a screenshot of that working:

Testing the producer/consumer from Micronaut

As you can see, I am simulating some updates from a server, which here represent monster updates.
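With the printf format from the listener above, the console output looks roughly like this (the UUIDs will obviously differ on every run):

Got a mob update, mob id: 6f2d..., mob instance id: a91c..., motion: null, state: ATTACKING, target: player1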

You may want to try executing the requests over the CLI if you're having problems with the Java code. In order to execute them, go into your container using docker exec -it <container_id> /bin/bash. Here are some useful snippets:

Create topic:

kafka-topics --create --bootstrap-server localhost:9093 --replication-factor 1 \
 --partitions 1 --topic test --command-config /etc/kafka/configs/config.properties

Set up a listener:

kafka-console-consumer --bootstrap-server localhost:9093 --topic test --from-beginning \
 --partition 0 \
 --consumer.config /etc/kafka/configs/config.properties

Publish message:

kafka-console-producer --broker-list localhost:9093 --topic test \
 --producer.config /etc/kafka/configs/config.properties
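You can also verify that the consumer group from application.yml has actually registered and has no lag:

kafka-consumer-groups --bootstrap-server localhost:9093 --describe --group mmo-server \
 --command-config /etc/kafka/configs/config.properties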

Before this stage, though, you may run into more problems – for example, the Docker image has changed and some env variables are different, etc.

If you’re having issues with instantiating the docker containers, check their logs!

If you have Docker Desktop installed, simply click on the container, for instance 'kafka1' from the above screenshot. Here, for example, I found obscure issues relating to the offsets topic replication factor being too high, which caused silent failures in my app.

Example of logs from Kafka

It can be hard to find the relevant things here, but try searching for 'error' entries.

Offset explorer

The next very useful tool is Offset Explorer.

You will be able to use it to check the topics and the consumers as well as see if data is being published:

Here are the configurations I’ve got for plaintext/sasl:

plaintext properties:

plaintext properties

Security is simply plaintext:

plaintext security

The Advanced tab refers to the bootstrap servers.

JAAS and ACL are both not configured.

Next we set up the SASL_PLAINTEXT configuration (which is what your app will use).

SASL properties:

SASL properties

Next is SASL security – defining the type.

SASL Advanced properties:

The JAAS config:

Note this config should be the same as the one used in the application.yml file.
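In other words, the JAAS config field in Offset Explorer gets the same line we used earlier:

org.apache.kafka.common.security.plain.PlainLoginModule required username="kafka" password="password123";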

ACL is also empty.