CLI Usage¶
Command-line tools for producing to, consuming from, and inspecting Kafka.
Producer¶
Produce messages to topics with optional schema serialization.
Basic usage¶
gafkalo --config config.yaml produce TOPIC_NAME
Reads from stdin. One message per line.
With schema serialization:
gafkalo --config config.yaml produce events.user.login \
--serialize \
--key-schema-file schemas/user-key.avsc \
--value-schema-file schemas/login-event.avsc \
--separator '|'
Input format: key|value
Example:
{"user_id":"u123"}|{"user_id":"u123","timestamp":1234567890,"success":true}
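When input lines are generated by a script, the key should not contain the separator itself, or the split point becomes ambiguous. The helper below is a minimal sketch of that check. `make_record` is a hypothetical name, not part of gafkalo, and the sketch assumes the line is split at the first occurrence of the separator, which this page does not state.

```shell
# Hypothetical helper: join a key and a value into one input line.
# Arguments: key, value, separator (defaults to '|').
# Fails if the key contains the separator, since the split would be ambiguous.
make_record() {
  key=$1 value=$2 sep=${3:-"|"}
  case $key in
    *"$sep"*)
      echo "key contains separator '$sep'" >&2
      return 1
      ;;
  esac
  printf '%s%s%s\n' "$key" "$sep" "$value"
}

# Usage (flags as documented above):
#   make_record '{"user_id":"u123"}' '{"user_id":"u123","success":true}' | \
#     gafkalo --config config.yaml produce events.user.login --separator '|'
```

Picking a separator that cannot occur in the key (such as `|` for JSON keys without that character) avoids the problem entirely.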
Options¶
--idempotent: Enable idempotent producer. Requires DeveloperWrite on Cluster resource.
--acks: Acks to require. Default: -1 (all).
--separator: Character separating key from value. Default: none (whole input is value).
--serialize: Serialize using Avro schema registry.
--key-schema-file: Path to key Avro schema (.avsc).
--value-schema-file: Path to value Avro schema (.avsc).
--tombstone: Produce tombstone (null value). Input used as key.
--whole-file: Produce entire file as single message.
Examples¶
String messages:
echo "hello world" | gafkalo --config config.yaml produce test-topic
With key:
echo "key1:value1" | gafkalo --config config.yaml produce test-topic --separator ':'
Serialized Avro:
echo '{"id":"123"}|{"name":"test","value":42}' | \
gafkalo --config config.yaml produce events.test \
--serialize \
--key-schema-file key.avsc \
--value-schema-file value.avsc \
--separator '|'
Tombstone:
echo "user123" | gafkalo --config config.yaml produce state.users --tombstone
Large file:
gafkalo --config config.yaml produce test-topic --whole-file /path/to/data.json
Consumer¶
Consume messages from topics with optional deserialization.
Basic usage¶
gafkalo --config config.yaml consumer TOPIC_NAME
Consumes from the beginning of the topic by default.
With deserialization:
gafkalo --config config.yaml consumer events.user.login \
--deserialize-key \
--deserialize-value
Options¶
--deserialize-key: Deserialize key using schema registry.
--deserialize-value: Deserialize value using schema registry.
--max-records: Stop after N records.
--record-template: Path to Go template for custom formatting.
--output-format: Output format: text, json.
Examples¶
Read 10 records:
gafkalo --config config.yaml consumer events.orders --max-records 10
Deserialize Avro:
gafkalo --config config.yaml consumer events.orders \
--deserialize-key \
--deserialize-value
JSON output:
gafkalo --config config.yaml consumer events.orders \
--output-format json \
--deserialize-value
Custom template¶
Create template file record.tpl:
TOPIC: {{.Topic}} | OFFSET: {{.Offset}} | KEY: {{.Key}} | VALUE: {{.Value}}
Use it:
gafkalo --config config.yaml consumer events.orders \
--record-template record.tpl
Template context:
type CustomRecordTemplateContext struct {
Topic string
Key string
Value string
Timestamp time.Time
Partition int32
Offset int64
KeySchemaID int
ValSchemaID int
Headers map[string]string
}
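The fields of the context struct can be combined using standard Go text/template syntax. The sketch below writes a one-line template that also prints the partition, an RFC 3339 timestamp, and every header. `write_record_template` is a hypothetical helper name, not part of gafkalo, and how gafkalo handles the template's trailing newline is not stated here.

```shell
# Hypothetical helper: write a record template to the given path.
# The quoted 'EOF' keeps the shell from expanding $k and $v inside the template.
write_record_template() {
  cat > "$1" <<'EOF'
{{.Topic}}[{{.Partition}}]@{{.Offset}} {{.Timestamp.Format "2006-01-02T15:04:05Z07:00"}} key={{.Key}} value={{.Value}}{{range $k, $v := .Headers}} {{$k}}={{$v}}{{end}}
EOF
}

# Usage:
#   write_record_template record.tpl
#   gafkalo --config config.yaml consumer events.orders --record-template record.tpl
```

`Timestamp` is a `time.Time`, so its `Format` method can be called directly from the template; `range` over the `Headers` map visits keys in sorted order.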
Headers¶
Record headers are displayed automatically. Text format:
Headers[[trace-id:abc123 content-type:application/json]] Topic[events.orders] Offset[42] ...
JSON format:
{
"headers": {
"trace-id": "abc123",
"content-type": "application/json"
},
"topic": "events.orders",
"offset": 42
}
Replicator¶
Copy topics within or across clusters.
Same cluster¶
gafkalo --config config.yaml replicator \
--source-topic old-topic \
--dest-topic new-topic
Cross-cluster¶
gafkalo --config source-cluster.yaml replicator \
--source-topic events.orders \
--dest-topic events.orders \
--dest-config dest-cluster.yaml
Behavior:
Default consumer group: gafkalo-replicator
Idempotent mode enabled (requires permissions)
Resumable (uses consumer group offsets)
Consumer groups¶
Manage consumer group offsets.
Reset offsets¶
gafkalo --config config.yaml consumer events.orders \
--group my-consumer-group \
--reset-offsets \
--partition 0:100 \
--partition 1:200
Sets partition 0 to offset 100, partition 1 to offset 200.
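When target offsets come from a script or a file, the repeated `--partition` flags can be generated rather than typed by hand. A minimal sketch, where `partition_flags` is a hypothetical helper (not a gafkalo command) that validates each PARTITION:OFFSET pair and prints the corresponding flags:

```shell
# Hypothetical helper: turn PARTITION:OFFSET pairs into --partition flags.
# Rejects anything that is not exactly two non-negative integers joined by ':'.
partition_flags() {
  out=""
  for pair in "$@"; do
    case $pair in
      *[!0-9:]*|*:*:*|:*|*:|"")
        echo "invalid pair: $pair" >&2
        return 1
        ;;
      *:*)
        out="$out --partition $pair"
        ;;
      *)
        echo "invalid pair: $pair" >&2
        return 1
        ;;
    esac
  done
  # Strip the leading space before printing.
  printf '%s\n' "${out# }"
}

# Usage (the substitution is left unquoted on purpose so the flags split into words):
#   gafkalo --config config.yaml consumer events.orders \
#     --group my-consumer-group --reset-offsets $(partition_flags 0:100 1:200)
```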
Topic management¶
List topics¶
# Plain list
gafkalo --config config.yaml topic list
# Table format with metadata
gafkalo --config config.yaml topic list --output-format table
# Filter by pattern
gafkalo --config config.yaml topic list --pattern '^events-'
Create topic¶
gafkalo --config config.yaml topic create -n my-topic \
--partitions 12 \
--replication-factor 3 \
-c retention.ms=604800000
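The `retention.ms` value is given in milliseconds; 604800000 corresponds to 7 days (7 × 24 × 60 × 60 × 1000). A small sketch of the conversion, with `days_to_ms` as a hypothetical helper name:

```shell
# Hypothetical helper: convert a whole number of days to milliseconds.
days_to_ms() {
  echo $(( $1 * 24 * 60 * 60 * 1000 ))
}

# days_to_ms 7 prints 604800000
#
# Usage:
#   gafkalo --config config.yaml topic create -n my-topic \
#     --partitions 12 --replication-factor 3 \
#     -c "retention.ms=$(days_to_ms 7)"
```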
Describe topic¶
gafkalo --config config.yaml topic describe events.orders
Change partitions¶
# Plan changes (dry-run)
gafkalo --config config.yaml topic partitions events.orders \
--count 24 \
--factor 3 \
--plan
# Execute changes
gafkalo --config config.yaml topic partitions events.orders \
--count 24 \
--factor 3 \
--execute
See topics documentation for detailed usage.
Topic linting¶
See topics documentation for linting commands.
Schema utilities¶
See schemas documentation for schema CLI commands.
Connect management¶
See connectors documentation for Connect CLI commands.
Global options¶
--config: Path to config YAML. Required for all commands.
--verbose: Enable verbose logging.
--version: Show version and exit.
Exit codes¶
0: Success
1: Error
Useful for scripts:
#!/bin/bash
if ! gafkalo --config prod.yaml topic list; then
  echo "Failed to connect to Kafka"
  exit 1
fi
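Because every failure exits with status 1, a transient error (for example a broker that is briefly unreachable) cannot be told apart from a permanent one by exit code alone. One option is to retry a bounded number of times. The sketch below assumes that approach; `retry` is a hypothetical helper, not a gafkalo feature.

```shell
# Hypothetical helper: run a command until it succeeds or attempts run out.
# Arguments: max attempts, delay in seconds between attempts, then the command.
retry() {
  attempts=$1 delay=$2
  shift 2
  n=1
  while ! "$@"; do
    if [ "$n" -ge "$attempts" ]; then
      return 1
    fi
    n=$((n + 1))
    sleep "$delay"
  done
}

# Usage:
#   if ! retry 3 5 gafkalo --config prod.yaml topic list; then
#     echo "Failed to connect to Kafka after 3 attempts" >&2
#     exit 1
#   fi
```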