Apache Kafka

Get the Metrics for a Consumer in Apache Kafka

Kafka Consumer metrics are a set of quantitative measurements that describe a Kafka Consumer’s behavior and performance. These metrics provide valuable insight into the health and efficiency of your Kafka Consumer and help you monitor and diagnose issues as they arise.

Some standard Kafka Consumer metrics include the following:

records-consumed-total: The total number of records consumed by the Consumer.

records-consumed-rate: The rate at which the Consumer is consuming records.

fetch-latency-avg: The average time taken to fetch a batch of records from the Kafka broker.

fetch-latency-max: The maximum time taken to fetch a batch of records from the Kafka broker.

records-lag: The number of records produced by the Kafka Producer but not yet consumed by the Consumer.

records-lag-max: The maximum number of records produced by the Kafka Producer but not yet consumed by the Consumer.

In this tutorial, we will learn how to use the Kafka Consumer API to fetch the metrics of a Kafka consumer. You can extend this example into a more comprehensive metrics-gathering tool, or feed the metrics into external tools such as Prometheus or Grafana; a minimal sketch of such an extension is shown at the end of this post.

By tracking and analyzing these metrics, you can better understand your Kafka Consumer’s behavior and take proactive steps to optimize its performance.

Source Code

The following example demonstrates how to use the kafka-python library to gather metrics about a consumer that is subscribed to a topic called “users”:

import json
from kafka import KafkaConsumer

def gather_consumer_metrics(consumer):
    # Get the metrics for the consumer
    metrics = consumer.metrics()

    # Convert the metrics dictionary to a JSON string
    metrics_json = json.dumps(metrics)

    # Print the consumer metrics as JSON
    print(metrics_json)

# Example usage
consumer = KafkaConsumer("users")

gather_consumer_metrics(consumer)
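
Note that the KafkaConsumer in kafka-python connects to a broker at localhost:9092 by default. If your broker runs elsewhere, you can pass the bootstrap_servers argument explicitly (the address below is a placeholder):

# Connect to a broker that does not run on localhost:9092 (placeholder address)
consumer = KafkaConsumer("users", bootstrap_servers="my-broker:9092")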

Once completed, you can run the code and pipe the output to a JSON parser such as jq, as shown in the following:

$ python3 consumer_metrics.py | jq

Resulting Output Metrics:

{
  "kafka-metrics-count": {
    "count": 50
  },
  "consumer-metrics": {
    "connection-close-rate": 0,
    "connection-creation-rate": 0.033213687564987555,
    "select-rate": 0,
    "io-wait-time-ns-avg": 0,
    "io-wait-ratio": 0,
    "io-time-ns-avg": 0,
    "io-ratio": 0,
    "connection-count": 1,
    "network-io-rate": 0.13285491939532135,
    "outgoing-byte-rate": 2.2585332634396345,
    "request-rate": 0.06642743761308507,
    "request-size-avg": 34,
    "request-size-max": 36,
    "incoming-byte-rate": 63.69424014976771,
    "response-rate": 0.06666062988696791,
    "request-latency-avg": 53.93648147583008,
    "request-latency-max": 105.57699203491211
  },
  "consumer-node-metrics.node-bootstrap-0": {
    "outgoing-byte-rate": 2.258531139013172,
    "request-rate": 0.06642737620726911,
    "request-size-avg": 34,
    "request-size-max": 36,
    "incoming-byte-rate": 63.69418521020762,
    "response-rate": 0.06666056967657394,
    "request-latency-avg": 53.93648147583008,
    "request-latency-max": 105.57699203491211
  },
  "consumer-fetch-manager-metrics": {
    "fetch-size-avg": 0,
    "fetch-size-max": -1.7976931348623157e+308,
    "bytes-consumed-rate": 0,
    "records-per-request-avg": 0,
    "records-consumed-rate": 0,
    "fetch-latency-avg": 0,
    "fetch-latency-max": -1.7976931348623157e+308,
    "fetch-rate": 0,
    "records-lag-max": -1.7976931348623157e+308,
    "fetch-throttle-time-avg": 0,
    "fetch-throttle-time-max": -1.7976931348623157e+308
  },
  "consumer-coordinator-metrics": {
    "heartbeat-response-time-max": -1.7976931348623157e+308,
    "heartbeat-rate": 0,
    "join-time-avg": 0,
    "join-time-max": -1.7976931348623157e+308,
    "join-rate": 0,
    "sync-time-avg": 0,
    "sync-time-max": -1.7976931348623157e+308,
    "sync-rate": 0,
    "last-heartbeat-seconds-ago": 1.7976931348623157e+308,
    "commit-latency-avg": 0,
    "commit-latency-max": -1.7976931348623157e+308,
    "commit-rate": 0,
    "assigned-partitions": 0
  }
}
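
In the run above, the consumer-fetch-manager-metrics values such as records-consumed-rate and fetch-latency-avg are still zero, and the *-max metrics show a very large negative sentinel value, because the consumer had not fetched any records yet when metrics() was called. The following is a minimal sketch, assuming a local broker and an existing “users” topic, that polls for a few seconds before reading just the fetch-manager metric group:

import json
from kafka import KafkaConsumer

# Sketch: generate some fetch activity first so that the fetch metrics
# are populated, then print only the fetch-manager metric group
consumer = KafkaConsumer("users", auto_offset_reset="earliest")

for _ in range(5):
    # The returned records are ignored; we only poll to trigger fetches
    consumer.poll(timeout_ms=1000)

fetch_metrics = consumer.metrics().get("consumer-fetch-manager-metrics", {})
print(json.dumps(fetch_metrics, indent=2))

consumer.close()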

Conclusion

In this tutorial, we wrote a simple Python script that gathers the metrics of a Kafka consumer and prints them in JSON format.
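
As mentioned in the introduction, one way to extend this script is to collect the metrics on a fixed interval and forward them to an external monitoring system. The following is a minimal sketch of such a periodic collector; the topic name and the ten-second interval are arbitrary choices, and the print() call stands in for whatever export mechanism your monitoring stack (for example, Prometheus) would use:

import json
import time
from kafka import KafkaConsumer

# Sketch of a periodic metrics collector: poll to keep the consumer active,
# then emit the full metrics dictionary as one JSON document per interval
consumer = KafkaConsumer("users")

try:
    while True:
        consumer.poll(timeout_ms=1000)
        print(json.dumps(consumer.metrics()))
        time.sleep(10)
except KeyboardInterrupt:
    consumer.close()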
