CLI Usage

Command-line tools for producing, consuming, and inspecting Kafka.

Producer

Produce messages to topics with optional schema serialization.

Basic usage

gafkalo --config config.yaml produce TOPIC_NAME

Reads from stdin. One message per line.

With schema serialization:

gafkalo --config config.yaml produce events.user.login \
  --serialize \
  --key-schema-file schemas/user-key.avsc \
  --value-schema-file schemas/login-event.avsc \
  --separator '|'

Input format: key|value

Example:

{"user_id":"u123"}|{"user_id":"u123","timestamp":1234567890,"success":true}

Options

--idempotent

Enable idempotent producer. Requires DeveloperWrite on Cluster resource.

--acks

Acks to require. Default: -1 (all).

--separator

Character separating key from value. Default: none (whole input is value).

--serialize

Serialize using Avro schema registry.

--key-schema-file

Path to key Avro schema (.avsc).

--value-schema-file

Path to value Avro schema (.avsc).

--tombstone

Produce tombstone (null value). Input used as key.

--whole-file

Produce entire file as single message.

Examples

String messages:

echo "hello world" | gafkalo --config config.yaml produce test-topic

With key:

echo "key1:value1" | gafkalo --config config.yaml produce test-topic --separator ':'

Serialized Avro:

echo '{"id":"123"}|{"name":"test","value":42}' | \
  gafkalo --config config.yaml produce events.test \
    --serialize \
    --key-schema-file key.avsc \
    --value-schema-file value.avsc \
    --separator '|'

Tombstone:

echo "user123" | gafkalo --config config.yaml produce state.users --tombstone

Large file:

gafkalo --config config.yaml produce test-topic --whole-file /path/to/data.json

Consumer

Consume messages from topics with optional deserialization.

Basic usage

gafkalo --config config.yaml consumer TOPIC_NAME

Consumes from beginning by default.

With deserialization:

gafkalo --config config.yaml consumer events.user.login \
  --deserialize-key \
  --deserialize-value

Options

--deserialize-key

Deserialize key using schema registry.

--deserialize-value

Deserialize value using schema registry.

--max-records

Stop after N records.

--record-template

Path to Go template for custom formatting.

--output-format

Output format: text, json.

Examples

Read 10 records:

gafkalo --config config.yaml consumer events.orders --max-records 10

Deserialize Avro:

gafkalo --config config.yaml consumer events.orders \
  --deserialize-key \
  --deserialize-value

JSON output:

gafkalo --config config.yaml consumer events.orders \
  --output-format json \
  --deserialize-value

Custom template

Create template file record.tpl:

TOPIC: {{.Topic}} | OFFSET: {{.Offset}} | KEY: {{.Key}} | VALUE: {{.Value}}

Use it:

gafkalo --config config.yaml consumer events.orders \
  --record-template record.tpl

Template context:

type CustomRecordTemplateContext struct {
    Topic       string
    Key         string
    Value       string
    Timestamp   time.Time
    Partition   int32
    Offset      int64
    KeySchemaID int
    ValSchemaID int
    Headers     map[string]string
}

Headers

Headers displayed automatically:

Headers[[trace-id:abc123 content-type:application/json]] Topic[events.orders] Offset[42] ...

JSON format:

{
  "headers": {
    "trace-id": "abc123",
    "content-type": "application/json"
  },
  "topic": "events.orders",
  "offset": 42
}

Replicator

Copy topics within or across clusters.

Same cluster

gafkalo --config config.yaml replicator \
  --source-topic old-topic \
  --dest-topic new-topic

Cross-cluster

gafkalo --config source-cluster.yaml replicator \
  --source-topic events.orders \
  --dest-topic events.orders \
  --dest-config dest-cluster.yaml

Behavior:

  • Default consumer group: gafkalo-replicator

  • Idempotent mode enabled (requires permissions)

  • Resumable (uses consumer group offsets)

Consumer groups

Manage consumer group offsets.

Reset offsets

gafkalo --config config.yaml consumer events.orders \
  --group my-consumer-group \
  --reset-offsets \
  --partition 0:100 \
  --partition 1:200

Sets partition 0 to offset 100, partition 1 to offset 200.

Topic management

List topics

# Plain list
gafkalo --config config.yaml topic list

# Table format with metadata
gafkalo --config config.yaml topic list --output-format table

# Filter by pattern
gafkalo --config config.yaml topic list --pattern '^events-'

Create topic

gafkalo --config config.yaml topic create -n my-topic \
  --partitions 12 \
  --replication-factor 3 \
  -c retention.ms=604800000

Describe topic

gafkalo --config config.yaml topic describe events.orders

Change partitions

# Plan changes (dry-run)
gafkalo --config config.yaml topic partitions events.orders \
  --count 24 \
  --factor 3 \
  --plan

# Execute changes
gafkalo --config config.yaml topic partitions events.orders \
  --count 24 \
  --factor 3 \
  --execute

See topics documentation for detailed usage.

Topic linting

See topics documentation for linting commands.

Schema utilities

See schemas documentation for schema CLI commands.

Connect management

See connectors documentation for Connect CLI commands.

Global options

--config

Path to config YAML. Required for all commands.

--verbose

Enable verbose logging.

--version

Show version and exit.

Exit codes

  • 0: Success

  • 1: Error

Useful for scripts:

#!/bin/bash
if ! gafkalo --config prod.yaml topic list; then
  echo "Failed to connect to Kafka"
  exit 1
fi