I have used Kafka for the past few years. It is an interesting technology. In this blog, I am going to discuss some useful commands and scripts. There are many blogs and articles about how to install a Kafka cluster. Whether it is single node or multiple node cluster, the command usage is quite similar.In this blog, I am going to show you some useful commands and scripts to quickly test out your kafka topic.
Set Environment
When dealing with –zookeeper or –broker-list, I have to a long list of zookeeper and Kafka broker with port number. To simplify the process, I usually set it in an environment file. For my testing, I am going to create a test topic, wz-test1.
[testuser@wz-vm1 kafka-test]$ cat set-kafka.env
export KAFKAZKHOSTS=zknode1:2181,zknode2:2181,zknode3:2181
export KAFKABROKERS=kbnode1:9092,kbnode2:9092,kbnode3:9092
export KAFKACURTOPIC=dd-wz-test1
Create and list Topic
Here is the script to create a topic, then do the listing of topic.
[testuser@wz-vm1 kafka-test]$ cat create-topic.sh
. set-kafka.env
kafka-topics --create --zookeeper $KAFKAZKHOSTS --replication-factor 3 --partitions 5 --topic $KAFKACURTOPIC
[testuser@wz-vm1 kafka-test]$ cat ./list-topic.sh
. set-kafka.env
kafka-topics --list --zookeeper $KAFKAZKHOSTS
Create a topic.
[testuser@wz-vm1 kafka-test]$ ./create-topic.sh
Created topic "wz-test1".
[testuser@wz-vm1 kafka-test]$ ./list-topic.sh
wz-test1
Describe Topic
[testuser@wz-vm1 kafka-test]$ cat desc-topic.sh
. set-kafka.env
kafka-topics --describe --zookeeper $KAFKAZKHOSTS --topic $KAFKACURTOPIC
[testuser@wz-vm1 kafka-test]$ ./desc-topic.sh
Topic:wz-test1 PartitionCount:5 ReplicationFactor:3 Configs:
Topic: wz-test1 Partition: 0 Leader: 3 Replicas: 3,1,2 Isr: 3,1,2
Topic: wz-test1 Partition: 1 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3
Topic: wz-test1 Partition: 2 Leader: 2 Replicas: 2,3,1 Isr: 2,3,1
Topic: wz-test1 Partition: 3 Leader: 3 Replicas: 3,2,1 Isr: 3,2,1
Topic: wz-test1 Partition: 4 Leader: 1 Replicas: 1,3,2 Isr: 1,3,2
Delete Topic
[testuser@wz-vm1 kafka-test]$ cat delete-topic.sh
. set-kafka.env
kafka-topics --delete --zookeeper $KAFKAZKHOSTS --topic $1
[testuser@wz-vm1 kafka-test]$ ./list-topic.sh
wz-test1
[testuser@wz-vm1 kafka-test]$ ./delete-topic.sh wz-test1
Topic wz-test1 is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.
[testuser@wz-vm1 kafka-test]$ ./list-topic.sh
[testuser@wz-vm1 kafka-test]$
Produce a message
[testuser@wz-vm1 kafka-test]$ cat produce-msg.sh
. set-kafka.env
echo "hello msg: `date '+%Y-%m-%d %H:%M:%S'`" | kafka-console-producer --broker-list $KAFKABROKERS --topic $KAFKACURTOPIC
[testuser@wz-vm1 kafka-test]$ ./produce-msg.sh
>>[testuser@wz-vm1 kafka-test]$
Consume message
[testuser@wz-vm1 kafka-test]$ cat consume-msg.sh
. set-kafka.env
kafka-console-consumer --bootstrap-server $KAFKABROKERS --topic $KAFKACURTOPIC --from-beginning
[testuser@wz-vm1 kafka-test]$ ./consume-msg.sh
hello msg: 2019-01-06 19:15:34
^CProcessed a total of 1 messages
Push a file to topic
[testuser@wz-vm1 kafka-test]$ cat test_file.txt
hello1
hello2
hello3
hello4
hello5
[testuser@wz-vm1 kafka-test]$ kafka-console-producer --broker-list $KAFKABROKERS --topic $KAFKACURTOPIC < test_file.txt
>>>>>>
[testuser@wz-vm1 kafka-test]$ ./consume-msg.sh
hello5
hello3
hello1
hello4
hello2
^CProcessed a total of 5 messages
[testuser@wz-vm1 kafka-test]$
Configure Retention Time
The default retention period is 24 hrs or 86400000 milliseconds in Kafka. If you want to delete all messages in a topic, you can set retentions.ms parameter to a small number.
Consume message
[testuser@wz-vm1 kafka-test]$ cat config-retention.sh
. set-kafka.env
kafka-configs --alter --zookeeper $KAFKAZKHOSTS --entity-type topics --entity-name $KAFKACURTOPIC --add-config retention.ms=$1
Set the retention time to 1 minute.
[testuser@wz-vm1 kafka-test]$ ./config-retention.sh 60000
Completed Updating config for entity: topic 'wz-test1'.
[testuser@wz-vm1 kafka-test]$ ./desc-topic.sh
Topic:wz-test1 PartitionCount:5 ReplicationFactor:3 Configs:retention.ms=60000
Topic: wz-test1 Partition: 0 Leader: 3 Replicas: 3,1,2 Isr: 3,1,2
Topic: wz-test1 Partition: 1 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3
Topic: wz-test1 Partition: 2 Leader: 2 Replicas: 2,3,1 Isr: 2,3,1
Topic: wz-test1 Partition: 3 Leader: 3 Replicas: 3,2,1 Isr: 3,2,1
Topic: wz-test1 Partition: 4 Leader: 1 Replicas: 1,3,2 Isr: 1,3,2
Bulk test of producing messages
If you want to test out the distribution of messages, you can use kafka-verifiable-producer.
[testuser@wz-vm1 kafka-test]$ cat test-producer.sh
. set-kafka.env
kafka-verifiable-producer --broker-list $KAFKABROKERS --topic $KAFKACURTOPIC --max-messages 10
[testuser@wz-vm1 kafka-test]$ ./test-producer.sh
{"timestamp":1552341811025,"name":"startup_complete"}
{"timestamp":1552341811297,"name":"producer_send_success","key":null,"value":"4","offset":1,"topic":"wz-test1","partition":2}
{"timestamp":1552341811300,"name":"producer_send_success","key":null,"value":"9","offset":2,"topic":"wz-test1","partition":2}
{"timestamp":1552341811301,"name":"producer_send_success","key":null,"value":"2","offset":3,"topic":"wz-test1","partition":4}
{"timestamp":1552341811302,"name":"producer_send_success","key":null,"value":"7","offset":4,"topic":"wz-test1","partition":4}
{"timestamp":1552341811302,"name":"producer_send_success","key":null,"value":"3","offset":1,"topic":"wz-test1","partition":1}
{"timestamp":1552341811302,"name":"producer_send_success","key":null,"value":"8","offset":2,"topic":"wz-test1","partition":1}
{"timestamp":1552341811302,"name":"producer_send_success","key":null,"value":"0","offset":1,"topic":"wz-test1","partition":3}
{"timestamp":1552341811302,"name":"producer_send_success","key":null,"value":"5","offset":2,"topic":"wz-test1","partition":3}
{"timestamp":1552341811303,"name":"producer_send_success","key":null,"value":"1","offset":1,"topic":"wz-test1","partition":0}
{"timestamp":1552341811303,"name":"producer_send_success","key":null,"value":"6","offset":2,"topic":"wz-test1","partition":0}
{"timestamp":1552341811312,"name":"shutdown_complete"}
{"timestamp":1552341811313,"name":"tool_data","sent":10,"acked":10,"target_throughput":-1,"avg_throughput":34.602076124567475}
Consume the message under the same consumer group
[testuser@wz-vm1 kafka-test]$ cat test-consumer.sh
. set-kafka.env
kafka-verifiable-consumer --broker-list $KAFKABROKERS --topic $KAFKACURTOPIC --max-messages 3 --group-id $1
[testuser@wz-vm1 kafka-test]$ ./test-consumer.sh myconsumergrp1
{"timestamp":1552342329954,"name":"startup_complete"}
{"timestamp":1552342330191,"name":"partitions_revoked","partitions":[]}
{"timestamp":1552342330215,"name":"partitions_assigned","partitions":[{"topic":"wz-test1","partition":4},{"topic":"wz-test1","partition":3},{"topic":"wz-test1","partition":2},{"topic":"wz-test1","partition":1},{"topic":"wz-test1","partition":0}]}
^C{"timestamp":1552342343799,"name":"shutdown_complete"}
You must be logged in to post a comment.