Benchmarking Kafka: Distributed Workers and Workload Topology in OpenMessaging Benchmark

March 31, 2025
Jorge Esteban Quilcate Otoya | how-to
#apache-kafka #performance

This is the second post in a series about benchmarking Apache Kafka using OpenMessaging Benchmark (OMB). In the previous post, the OMB framework was presented, together with guidance on how to run it locally.

Now let's dive into its distributed mode: how workers are deployed across multiple nodes, and how to define the workload topology.

In local mode, all components run in a single process, which is ideal for getting started and initial testing. However, for production-like performance testing, deploying workers across multiple nodes is key to achieving higher throughput and better representing real-world scenarios.

The distributed mode consists of deploying workers across multiple nodes, each exposing an HTTP endpoint to schedule work and gather stats. The benchmark CLI uses this endpoint to coordinate the workload execution (e.g., create producers on one worker, create consumers on another, and collect metrics to aggregate them locally).

[CLI Node]
   |
   | HTTP connections
   |
   v
[Workers, Drivers, Workloads]
   |
   |---------------|---------------|
   v               v               v
[Worker 1]     [Worker 2]      [Worker 3]
   |               |               |
   v               v               v
[Worker 4]     [Worker 5]      [Worker 6]
   |               |               |
   |---------------|---------------|
   v
[Kafka Cluster]

Provisioning Workers

OMB workers can be deployed in any environment, as long as they can access the Kafka cluster. They don't need to coordinate with each other, but they need to be accessible from the node running the CLI.

The most common way to deploy them seems to be as VMs or containers. The OMB repository provides Terraform scripts[1] to deploy workers on AWS and Alibaba Cloud, and Ansible playbooks[2] to install the required dependencies and start the workers. These include deploying a Kafka cluster and ZooKeeper ensemble as well as the OMB workers. Although useful, they mostly serve as a template to start from, as you will need to adjust the scripts to fit your environment (e.g., removing the Kafka cluster deployment if you already have one to test, adjusting versions, instrumenting workers with metrics agents and profilers, etc.).

To run them as containers, the OMB repository also includes a Helm chart[11] to deploy workers on a Kubernetes cluster.
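
As a quick sketch of the container route, assuming a local checkout of the OMB repository (the chart path and release name below are assumptions; verify them against your clone):

# Install the worker chart from the repository checkout; adjust the path if it differs in your version.
helm install omb-workers ./deployment/kubernetes/helm/benchmark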

To start a worker, run the following command defining the main port and the port for Prometheus stats collection:

./bin/benchmark-worker \
    --port 8080 \
    --stats-port 8081

The stats endpoint only exposes OMB and JVM metrics in Prometheus format; it does not include Kafka client-specific metrics. The Prometheus JMX exporter can be used to gather these.
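
For example, here is a minimal sketch of attaching the JMX exporter as a Java agent to the worker process, assuming the worker start script honors JVM_OPTS (as used later for zone.id); the agent jar path, port, and exporter config file are placeholders to adapt to your setup:

# Expose Kafka client JMX metrics in Prometheus format on port 9404 (paths are examples).
export JVM_OPTS="-javaagent:/opt/jmx_prometheus_javaagent.jar=9404:/opt/jmx-exporter-config.yaml"
./bin/benchmark-worker \
    --port 8080 \
    --stats-port 8081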

When running the CLI, the worker list has to be set, either by listing the workers directly:

./bin/benchmark \
    --workers http://worker1:8080,http://worker2:8080 \
    --drivers driver.yaml \
    workload.yaml

Or passing them from a file:

./bin/benchmark \
    --worker-file workers.yaml \
    --drivers driver.yaml \
    workload.yaml

where workers.yaml contains:

workers:
  - http://worker0:8080
  - http://worker1:8080

Resource requirements

The resource requirements for the workers depend on the workload you are trying to run. Over-provisioned instances are usually used in public benchmarks to ensure that the workers are not the bottleneck; here are some examples:

For each test, we did three runs of each workload, each with a 30-minute warm-up. For the clients, we ran on four m5n.8xlarge instances, which ensured guaranteed 25Gbps network bandwidth with 128GB of RAM and 32 vCPUs to ensure our clients were not the bottleneck.

Redpanda vs. Kafka: A performance comparison

Finally, we used m6in.4xl instances to run the WarpStream Agents and m6in.8xl instances to run the openmessaging benchmark (intentionally overprovisioned to avoid any bottlenecks in the clients).

Warpstream Public Benchmarks and TCO analysis

For smaller workloads, you can use smaller instance types (e.g., m5.large) to reduce costs.

When running these load tests regularly, cloud spot instances can be used to reduce costs, as long as you are aware of the risk of having them terminated mid-test. Usually the benchmark period is short enough that the spot instances survive the run, but this is not guaranteed.

Client distribution

Any benchmark workload includes a set of producers and consumers, and these need to be assigned to specific workers. In OMB, a worker is assigned either producers or consumers, but not both.

This is the first limitation when trying to run a workload on a distributed setup: the minimum number of workers is two, one for producers and another for consumers.

The workers are split in half between producers and consumers, unless you have a workload that requires more consumers than producers (like the Consumer Backlog workload explored in the next post), in which case extra workers are assigned to consumers (2/3 consumers, 1/3 producers) [3].

[CLI Node]
   |
   | HTTP connections
   |
   v
[Worker List File]
   |
   |---------------|---------------|
   v               v               v
[Worker 1]     [Worker 2]      [Worker 3]
 Producer       Producer       Producer
   |               |               |
   v               v               v
[Worker 4]     [Worker 5]      [Worker 6]
 Consumer       Consumer       Consumer
   |               |               |
   |---------------|---------------|
   v
[Kafka Cluster]

The Kafka clients are created on the workers using the configuration provided in the Driver specification, and the workers are responsible for managing the client lifecycle (e.g., start, stop, pause, resume). We won't go into the details of client configuration in this post; for optimization scenarios, Confluent's documentation is a good starting point: https://docs.confluent.io/cloud/current/client-apps/optimizing/overview.html.

Workers and AZ distribution

OMB workers are usually deployed in the same network as the Kafka cluster to minimize network latency (e.g., same cloud region, and probably using private IPs to reduce inter-AZ costs[4]).

Running Kafka clusters in a single rack/zone reduces the availability promise of the system. This is why it is recommended to run Kafka clusters across multiple racks/zones to ensure high availability, although the latencies and, especially, the costs attached to this setup are considerably higher [10].

If running clusters on a single rack (e.g., in a single Availability Zone), the default setup may suffice to distribute the load across workers. However, if running a Kafka cluster across multiple zones, you most probably want to deploy workers across zones as well to properly simulate the clients' distribution.

Since the follower fetching feature was added to Kafka, it has been possible to flag which zone/rack a client runs in by adding a client.rack configuration. More recently, Kafka-compatible systems that support leaderless producers with AZ-awareness (e.g., WarpStream[5]) gather this metadata from the client.id.

There is a hidden (i.e., undocumented[13]) feature in OMB to define a Java system property zone.id:

export JVM_OPTS=-Dzone.id={{ az }}
/opt/benchmark/bin/benchmark-worker

The framework replaces the {zone.id} placeholder in the driver configuration with the value of the zone.id system property of the worker process [9]. For example:

name: kafka-local
driverClass: io.openmessaging.benchmark.driver.kafka.KafkaBenchmarkDriver

# Kafka client-specific configuration
commonConfig: |
  bootstrap.servers=localhost:9092
  client.id=omb-client_az={zone.id}
producerConfig: ""
consumerConfig: |
  auto.offset.reset=earliest

OMB could be extended to pass this information to client.rack as well[12]. This would allow testing other Kafka features like follower fetching and, if KIP-1123[6] is adopted, producer rack-awareness.

Kafka brokers would need to know how to extract this information and use it for client AZ-awareness.
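
As a sketch of what that could look like, assuming OMB also resolved the {zone.id} placeholder inside consumerConfig (hypothetical today[12]), paired with the broker-side settings Kafka already provides for follower fetching:

# driver.yaml (hypothetical: requires OMB to resolve {zone.id} inside consumerConfig)
consumerConfig: |
  auto.offset.reset=earliest
  client.rack={zone.id}

# broker server.properties: declare the broker's rack and enable the rack-aware replica selector
broker.rack=az0
replica.selector.class=org.apache.kafka.common.replica.RackAwareReplicaSelector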

When distributing workers across zones, consider having N * zones workers, where N is the number of workers per zone (at least 2).
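
For example, a workers.yaml for two workers per zone across three zones could look like this (hostnames are placeholders):

workers:
  # If the first half of the list becomes producer workers (verify with your OMB version),
  # this ordering keeps both producers and consumers spread across all three zones.
  - http://worker-az0-1:8080
  - http://worker-az1-1:8080
  - http://worker-az2-1:8080
  - http://worker-az0-2:8080
  - http://worker-az1-2:8080
  - http://worker-az2-2:8080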

[CLI Node]
   |
   | HTTP connections
   |
   v
[Worker List File]
   |
   |---------------|---------------|
   v               v               v
[Worker 1]     [Worker 2]      [Worker 3]
 Producer       Producer       Producer
  AZ0             AZ1             AZ2
   |               |               |
   v               v               v
[Worker 4]     [Worker 5]      [Worker 6]
 Consumer       Consumer       Consumer
  AZ0             AZ1             AZ2
   |               |               |
   v               v               v
[Broker 0]     [Broker 1]      [Broker 2]
  AZ0             AZ1             AZ2
   ^               ^               ^
   |---------------|---------------|

Workload topology

The number of topics, partitions, producers, and consumers defines how the benchmark works end-to-end. These are set as part of the Workload specification.

Topic topology

In OMB, topic topology is pretty straightforward to configure: you define the number of topics and the number of partitions per topic.

topics: 10
partitionsPerTopic: 12

In this example, 10 topics are created, each with 12 partitions: 120 new partitions in total.

Though minimal, these configurations have subtle implications on the benchmark’s scheduling and execution. Let’s look into those.

Producer and consumer topology

The number of producers and consumer groups is defined per topic:

# producers
producersPerTopic: 6
# consumers
subscriptionsPerTopic: 3
consumerPerSubscription: 6

In this example, each topic will have 6 producers, and 3 consumer groups each with 6 consumers: 60 producer clients, 30 consumer groups, and 180 consumer instances in total.

Knowing that workers are split in half between producers and consumers helps ensure that producer assignments (producersPerTopic * number of topics) and consumer assignments (subscriptionsPerTopic * consumerPerSubscription * number of topics) are evenly distributed across the workers.
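
For instance, with the example workload above and the six-worker setup from the diagrams (three producer workers, three consumer workers), the assignments work out to:

producers: producersPerTopic * topics                               = 6 * 10     = 60  -> 20 per producer worker
consumers: subscriptionsPerTopic * consumerPerSubscription * topics = 3 * 6 * 10 = 180 -> 60 per consumer worker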

There is no guarantee on assignment ordering, though: each assignment list is shuffled before being distributed across the workers [7] [8].

I haven’t covered Workload examples in more detail as I plan to dive into that in a separate post where we will look into how to define the workload and experiment with different workload types.

Troubleshooting

You may hit some common issues while trying to start your benchmark on a distributed OMB setup. Workers may fail to start, to connect to a Kafka cluster, to create a topic, etc.

A good starting point for troubleshooting is the benchmark-worker.log file in the OMB installation directory.

To get an actual failure on the CLI, you may need to wait a minute or so until the workers' HTTP client requests time out or the Kafka clients' default.api.timeout.ms is reached.
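
To surface such failures faster while troubleshooting, one option is to lower the client timeouts in the driver configuration; a sketch (the values are arbitrary and should be reverted for real benchmark runs):

commonConfig: |
  bootstrap.servers=localhost:9092
  # Fail fast while troubleshooting; default.api.timeout.ms applies to the admin and consumer clients.
  default.api.timeout.ms=15000
  request.timeout.ms=15000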

In some scenarios, the workers themselves may be the bottleneck of the benchmark (e.g., small instance types, not enough resources, non-optimized configurations). Low-level metrics (CPU, memory, network, disk) can help identify these issues, but it is also useful to monitor client-level metrics on the workers (e.g., producer/consumer lag, throughput) to spot bottlenecks at the client level, for instance using the Prometheus JMX exporter.

If a benchmark breaks in the middle of an execution, it may not be obvious: the throughput may drop to 0 while the benchmark keeps running because the workers have stalled. You will need to check the logs and look into your Kafka cluster to identify the issue.

Once a benchmark starts and creates its topics, those topics stay there, even if the benchmark is stopped or broken. If needed, make sure to add a cleanup step to remove the topics created by the benchmark.
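
A minimal cleanup sketch using the standard Kafka CLI tools, assuming your benchmark topics share a recognizable prefix (test-topic below is a placeholder; check the names your driver actually generates):

# List the benchmark topics and delete them; adjust the prefix and bootstrap server to your setup.
kafka-topics.sh --bootstrap-server localhost:9092 --list \
  | grep '^test-topic' \
  | xargs -r -n1 kafka-topics.sh --bootstrap-server localhost:9092 --delete --topic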

After running multiple benchmarks, I needed a way to identify which topics had been created by each run, as all of them follow the same naming convention, so I have a patch on my fork to log the topic names created by the benchmark.

Conclusion

In this post we have covered how to deploy OMB workers across multiple nodes, how to distribute them across zones, and how to define the workload topology. We have also covered some of the common issues you may encounter when running a distributed benchmark.

This is the second post in a series about benchmarking Apache Kafka using OMB. In the next post we will look into how to define the workload and experiment with different workload types. It should cover how the benchmark execution phases work, how to define the workload types, and what they look like in practice.

References