Useful Scripts for Kafka

I have used Kafka for the past few years. It is an interesting technology. In this blog, I am going to discuss some useful commands and scripts. There are many blogs and articles about how to install a Kafka cluster. Whether it is single node or multiple node cluster, the command usage is quite similar.In this blog, I am going to show you some useful commands and scripts to quickly test out your kafka topic.

Set Environment
When dealing with –zookeeper or –broker-list, I have to a long list of zookeeper and Kafka broker with port number. To simplify the process, I usually set it in an environment file. For my testing, I am going to create a test topic, wz-test1.

[testuser@wz-vm1 kafka-test]$ cat set-kafka.env
export KAFKAZKHOSTS=zknode1:2181,zknode2:2181,zknode3:2181
export KAFKABROKERS=kbnode1:9092,kbnode2:9092,kbnode3:9092
export KAFKACURTOPIC=dd-wz-test1

Create and list Topic
Here is the script to create a topic, then do the listing of topic.

[testuser@wz-vm1 kafka-test]$ cat create-topic.sh
. set-kafka.env
kafka-topics --create --zookeeper $KAFKAZKHOSTS --replication-factor 3 --partitions 5 --topic $KAFKACURTOPIC

[testuser@wz-vm1 kafka-test]$ cat ./list-topic.sh
. set-kafka.env
kafka-topics --list --zookeeper $KAFKAZKHOSTS 

Create a topic.

[testuser@wz-vm1 kafka-test]$ ./create-topic.sh
Created topic "wz-test1".

[testuser@wz-vm1 kafka-test]$ ./list-topic.sh
wz-test1

Describe Topic

[testuser@wz-vm1 kafka-test]$ cat desc-topic.sh
. set-kafka.env
kafka-topics --describe --zookeeper $KAFKAZKHOSTS --topic $KAFKACURTOPIC

[testuser@wz-vm1 kafka-test]$ ./desc-topic.sh
Topic:wz-test1  PartitionCount:5        ReplicationFactor:3     Configs:
        Topic: wz-test1 Partition: 0    Leader: 3       Replicas: 3,1,2 Isr: 3,1,2
        Topic: wz-test1 Partition: 1    Leader: 1       Replicas: 1,2,3 Isr: 1,2,3
        Topic: wz-test1 Partition: 2    Leader: 2       Replicas: 2,3,1 Isr: 2,3,1
        Topic: wz-test1 Partition: 3    Leader: 3       Replicas: 3,2,1 Isr: 3,2,1
        Topic: wz-test1 Partition: 4    Leader: 1       Replicas: 1,3,2 Isr: 1,3,2

Delete Topic

[testuser@wz-vm1 kafka-test]$ cat delete-topic.sh
. set-kafka.env
kafka-topics --delete --zookeeper $KAFKAZKHOSTS --topic $1
[testuser@wz-vm1 kafka-test]$ ./list-topic.sh 
wz-test1

[testuser@wz-vm1 kafka-test]$ ./delete-topic.sh wz-test1
Topic wz-test1 is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.

[testuser@wz-vm1 kafka-test]$ ./list-topic.sh 
[testuser@wz-vm1 kafka-test]$

Produce a message

[testuser@wz-vm1 kafka-test]$ cat produce-msg.sh
. set-kafka.env
echo "hello msg: `date '+%Y-%m-%d %H:%M:%S'`" | kafka-console-producer --broker-list $KAFKABROKERS --topic $KAFKACURTOPIC

[testuser@wz-vm1 kafka-test]$ ./produce-msg.sh
>>[testuser@wz-vm1 kafka-test]$

Consume message

[testuser@wz-vm1 kafka-test]$ cat consume-msg.sh
. set-kafka.env
kafka-console-consumer --bootstrap-server $KAFKABROKERS --topic $KAFKACURTOPIC --from-beginning

[testuser@wz-vm1 kafka-test]$ ./consume-msg.sh
hello msg: 2019-01-06 19:15:34
^CProcessed a total of 1 messages

Push a file to topic

[testuser@wz-vm1 kafka-test]$ cat test_file.txt
hello1
hello2
hello3
hello4
hello5

[testuser@wz-vm1 kafka-test]$  kafka-console-producer --broker-list $KAFKABROKERS --topic $KAFKACURTOPIC < test_file.txt
>>>>>>

[testuser@wz-vm1 kafka-test]$ ./consume-msg.sh
hello5
hello3
hello1
hello4
hello2
^CProcessed a total of 5 messages
[testuser@wz-vm1 kafka-test]$

Configure Retention Time
The default retention period is 24 hrs or 86400000 milliseconds in Kafka. If you want to delete all messages in a topic, you can set retentions.ms parameter to a small number.

Consume message

[testuser@wz-vm1 kafka-test]$ cat config-retention.sh
. set-kafka.env
kafka-configs --alter --zookeeper $KAFKAZKHOSTS --entity-type topics --entity-name $KAFKACURTOPIC --add-config retention.ms=$1

Set the retention time to 1 minute.

[testuser@wz-vm1 kafka-test]$ ./config-retention.sh 60000
Completed Updating config for entity: topic 'wz-test1'.

[testuser@wz-vm1 kafka-test]$ ./desc-topic.sh
Topic:wz-test1  PartitionCount:5        ReplicationFactor:3     Configs:retention.ms=60000
        Topic: wz-test1 Partition: 0    Leader: 3       Replicas: 3,1,2 Isr: 3,1,2
        Topic: wz-test1 Partition: 1    Leader: 1       Replicas: 1,2,3 Isr: 1,2,3
        Topic: wz-test1 Partition: 2    Leader: 2       Replicas: 2,3,1 Isr: 2,3,1
        Topic: wz-test1 Partition: 3    Leader: 3       Replicas: 3,2,1 Isr: 3,2,1
        Topic: wz-test1 Partition: 4    Leader: 1       Replicas: 1,3,2 Isr: 1,3,2

Bulk test of producing messages
If you want to test out the distribution of messages, you can use kafka-verifiable-producer.

[testuser@wz-vm1 kafka-test]$ cat test-producer.sh
. set-kafka.env
kafka-verifiable-producer --broker-list $KAFKABROKERS --topic $KAFKACURTOPIC --max-messages 10

[testuser@wz-vm1 kafka-test]$ ./test-producer.sh
{"timestamp":1552341811025,"name":"startup_complete"}
{"timestamp":1552341811297,"name":"producer_send_success","key":null,"value":"4","offset":1,"topic":"wz-test1","partition":2}
{"timestamp":1552341811300,"name":"producer_send_success","key":null,"value":"9","offset":2,"topic":"wz-test1","partition":2}
{"timestamp":1552341811301,"name":"producer_send_success","key":null,"value":"2","offset":3,"topic":"wz-test1","partition":4}
{"timestamp":1552341811302,"name":"producer_send_success","key":null,"value":"7","offset":4,"topic":"wz-test1","partition":4}
{"timestamp":1552341811302,"name":"producer_send_success","key":null,"value":"3","offset":1,"topic":"wz-test1","partition":1}
{"timestamp":1552341811302,"name":"producer_send_success","key":null,"value":"8","offset":2,"topic":"wz-test1","partition":1}
{"timestamp":1552341811302,"name":"producer_send_success","key":null,"value":"0","offset":1,"topic":"wz-test1","partition":3}
{"timestamp":1552341811302,"name":"producer_send_success","key":null,"value":"5","offset":2,"topic":"wz-test1","partition":3}
{"timestamp":1552341811303,"name":"producer_send_success","key":null,"value":"1","offset":1,"topic":"wz-test1","partition":0}
{"timestamp":1552341811303,"name":"producer_send_success","key":null,"value":"6","offset":2,"topic":"wz-test1","partition":0}
{"timestamp":1552341811312,"name":"shutdown_complete"}
{"timestamp":1552341811313,"name":"tool_data","sent":10,"acked":10,"target_throughput":-1,"avg_throughput":34.602076124567475}

Consume the message under the same consumer group

[testuser@wz-vm1 kafka-test]$ cat test-consumer.sh
. set-kafka.env
kafka-verifiable-consumer --broker-list $KAFKABROKERS --topic $KAFKACURTOPIC --max-messages 3 --group-id $1

[testuser@wz-vm1 kafka-test]$ ./test-consumer.sh myconsumergrp1
{"timestamp":1552342329954,"name":"startup_complete"}
{"timestamp":1552342330191,"name":"partitions_revoked","partitions":[]}
{"timestamp":1552342330215,"name":"partitions_assigned","partitions":[{"topic":"wz-test1","partition":4},{"topic":"wz-test1","partition":3},{"topic":"wz-test1","partition":2},{"topic":"wz-test1","partition":1},{"topic":"wz-test1","partition":0}]}
^C{"timestamp":1552342343799,"name":"shutdown_complete"}
Advertisements

Leave a Reply

Please log in using one of these methods to post your comment:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out /  Change )

Google photo

You are commenting using your Google account. Log Out /  Change )

Twitter picture

You are commenting using your Twitter account. Log Out /  Change )

Facebook photo

You are commenting using your Facebook account. Log Out /  Change )

Connecting to %s