Some standard Kafka Consumer metrics include the following:
records-consumed-total: The total number of records consumed by the Consumer.
records-consumed-rate: The rate at which the Consumer is consuming records.
fetch-latency-avg: The average time to fetch a batch of records from the Kafka broker to the Consumer.
fetch-latency-max: The maximum time to fetch a batch of records from the Kafka broker to the Consumer.
records-lag: The number of records produced by the Kafka Producer but not yet consumed by the Consumer.
records-lag-max: The maximum number of records produced by the Kafka Producer but not yet consumed by the Consumer.
In this tutorial, we will learn how to use the Kafka Consumer API to fetch the metrics of a Kafka consumer. You can extend the functionality of this post to the more comprehensive metrics-gathering tool or you can use some external tools such as Prometheus or Grafana.
By tracking and analyzing these metrics, you can better understand your Kafka Consumer’s performance and behavior and take proactive steps to optimize its performance.
Source Code
The following example demonstrates how to use the Python Kafka-Python library to gather the metrics about a consumer that subscribed to a topic called “users”:
from kafka import KafkaConsumer
def gather_consumer_metrics(producer):
# Get the metrics for the consumer
metrics = consumer.metrics()
# Convert the metrics dictionary to a JSON string
metrics_json = json.dumps(metrics)
# Print the consumer metrics as JSON
print(metrics_json)
# Example usage
consumer = KafkaConsumer("users")
gather_consumer_metrics(consumer)
Once completed, you can run the code and pipe the output to a JSON parser such as JQ as shown in the following:
Resulting Output Metrics:
"kafka-metrics-count": {
"count": 50
},
"consumer-metrics": {
"connection-close-rate": 0,
"connection-creation-rate": 0.033213687564987555,
"select-rate": 0,
"io-wait-time-ns-avg": 0,
"io-wait-ratio": 0,
"io-time-ns-avg": 0,
"io-ratio": 0,
"connection-count": 1,
"network-io-rate": 0.13285491939532135,
"outgoing-byte-rate": 2.2585332634396345,
"request-rate": 0.06642743761308507,
"request-size-avg": 34,
"request-size-max": 36,
"incoming-byte-rate": 63.69424014976771,
"response-rate": 0.06666062988696791,
"request-latency-avg": 53.93648147583008,
"request-latency-max": 105.57699203491211
},
"consumer-node-metrics.node-bootstrap-0": {
"outgoing-byte-rate": 2.258531139013172,
"request-rate": 0.06642737620726911,
"request-size-avg": 34,
"request-size-max": 36,
"incoming-byte-rate": 63.69418521020762,
"response-rate": 0.06666056967657394,
"request-latency-avg": 53.93648147583008,
"request-latency-max": 105.57699203491211
},
"consumer-fetch-manager-metrics": {
"fetch-size-avg": 0,
"fetch-size-max": -1.7976931348623157e+308,
"bytes-consumed-rate": 0,
"records-per-request-avg": 0,
"records-consumed-rate": 0,
"fetch-latency-avg": 0,
"fetch-latency-max": -1.7976931348623157e+308,
"fetch-rate": 0,
"records-lag-max": -1.7976931348623157e+308,
"fetch-throttle-time-avg": 0,
"fetch-throttle-time-max": -1.7976931348623157e+308
},
"consumer-coordinator-metrics": {
"heartbeat-response-time-max": -1.7976931348623157e+308,
"heartbeat-rate": 0,
"join-time-avg": 0,
"join-time-max": -1.7976931348623157e+308,
"join-rate": 0,
"sync-time-avg": 0,
"sync-time-max": -1.7976931348623157e+308,
"sync-rate": 0,
"last-heartbeat-seconds-ago": 1.7976931348623157e+308,
"commit-latency-avg": 0,
"commit-latency-max": -1.7976931348623157e+308,
"commit-rate": 0,
"assigned-partitions": 0
}
}
Conclusion
A simple Python code to gather the metrics of a Kafka consumer is in JSON format.