Usage

Maintain topics, schemas and RBAC

gafkalo can maintain the topics, schemas and RBAC rolebindings for a cluster.

In the configuration file you point it to the YAML definitions and it will attempt to create or update any resources needed.

It will investigate the cluster’s state in order to decide if something needs creating or updating (or no action needed).

To generate a plan (a list of changes that it will perform, without actually performing them)

gafkalo plan --config myconfig.yaml

This will produce a plan output with the changes that gafkalo will make.

Once you are satisfied with the plan you can let gafkalo apply the changes:

gafkalo apply --config myconfig.yaml

This command will read all the yaml files specified in the config , merge them and apply them. It will produce a report at the end.

Producer

gafkalo can be used as a producer to aid in testing, development and troubleshooting.

gafkalo –config dev.yaml produce “SKATA1” –idempotent –separator=! –serialize –value-schema-file data/schemas/schema.json –key-schema-file data/schemas/schema.json

Available options:

--idempotent

Enable idempotence in the producer. In an RBAC enabled cluster this requires DeveloperWrite in the Cluster resource

--acks

acks to require from brokers. Defaults to -1 (all).

--separator

a character the producer will use to separate the key part from the value part of the user supplied input

--serialize

serialize the input. The latest schema in the schema registry will be used. Provide –user-schema and/or –key-schema if you want to produce according to a new schema (will be registered)

--key-schema

the key schema to use (avsc file)

--value-schema

the value schema to use (avsc file)

--tombstone

Produce a tombstone message. The input will be used as record key and the value will be automatically null.

--whole-file

Point to a file and produce the whole file as a single kafka record. (for example, to test big messages, etc)

Consumer

gafkalo can be used as a consumer.

Example 1: Read from SKATA1 and deserialize the value using schema registry

gafkalo --config dev.yaml consumer SKATA1

The consumer by default will emit the raw record. IF the records are serialized using Schema Registry you can add the parameters –deserialize-key and –deserialize-value to deserialize key and value respectively. The consumer will then utilize the schema registry to fetch the corresponding schema , deserialize and display the deserialized output.

gafkalo --config dev.yaml consumer SKATA1 --deserialize-key --deserialize-value

It is possible to define a user provided Golang template to format records as they come in. using the –record-template parameter.

If you only want to read a certain number of records and then stop you can add –max-records=<number>.

$ ./gafkalo --config dev.yaml consumer SKATA2 --max-records=4
Topic[SKATA2] Offset[5] Timestamp[2021-06-30 13:01:05.893 +0200 CEST]: Key:=, Value:asd
Topic[SKATA2] Offset[6] Timestamp[2021-06-30 13:01:06.701 +0200 CEST]: Key:=, Value:sdf
Topic[SKATA2] Offset[7] Timestamp[2021-06-30 13:01:07.191 +0200 CEST]: Key:=, Value:sdf
Topic[SKATA2] Offset[8] Timestamp[2021-06-30 13:01:07.658 +0200 CEST]: Key:=, Value:sdf
2021/06/30 13:01:07 terminating..

The template will be passed the CustomRecordTemplateContext context and all fields will be made available to the user to format as needed

Example:

CUSTOMRECORD: {{ .Value }} The key is : {{.Key }} with Offset:{{ .Offset }}

Then produce some records and you will see:

$ ./gafkalo --config dev.yaml consumer SKATA2 --record-template testdata/files/recordtemplate.tpl
CUSTOMRECORD: sdfsdf The key is :  with Offset:0
CUSTOMRECORD: sgfdg The key is :  with Offset:1
CUSTOMRECORD: dfgdf The key is :  with Offset:2
CUSTOMRECORD: gdf The key is :  with Offset:3
CUSTOMRECORD: gdfg The key is :  with Offset:4

The context passed to that template is a CustomRecordTemplateContext with this definition (please check source for staleness of this document):

type CustomRecordTemplateContext struct {
   Topic       string
   Key         string
   Value       string
   Timestamp   time.Time
   Partition   int32
   Offset      int64
   KeySchemaID int // The schema registry ID of the Key schema
   ValSchemaID int // The Schema registry ID of the Value schema
}

Schemas

gafkalo has some schema related CLI functions.

schema schema-diff can get compare a subject+version on the schema registry against a JSON file on disk, and tell if they match or give you a visual diff. Useful to identify why schema is detected as changed etc

schema check-exists can check if a provided schema on disk, is already registered on the provided subject name. If it is, it will return under which version and what is the ID of the schema.

Topics

topic command allows you to manage topics.

describe subcommand outputs the description of a Topic (partitions, replication, configuration).

Example:

./gafkalo --config mycluster.yaml topic describe TOPIC1

Topic: TOPIC1
ReplicationFactor 3 , Partitions: 1
Configs: cleanup.policy=delete, compression.type=producer, confluent.key.schema.validation=false, confluent.key.subject.name.strategy=io.confluent.kafka.serializers.subject.TopicNameStrategy, confluent.placement.constraints=, confluent.prefer.tier.fetch.ms=-1, confluent.segment.speculative.prefetch.enable=false, confluent.tier.enable=false, confluent.tier.local.hotset.bytes=-1, confluent.tier.local.hotset.ms=86400000, confluent.tier.segment.hotset.roll.min.bytes=104857600, confluent.value.schema.validation=false, confluent.value.subject.name.strategy=io.confluent.kafka.serializers.subject.TopicNameStrategy, delete.retention.ms=86400000, file.delete.delay.ms=60000, flush.messages=9223372036854775807, flush.ms=9223372036854775807, follower.replication.throttled.replicas=, index.interval.bytes=4096, leader.replication.throttled.replicas=, max.compaction.lag.ms=9223372036854775807, max.message.bytes=1048588, message.downconversion.enable=true, message.format.version=2.7-IV2, message.timestamp.difference.max.ms=9223372036854775807, message.timestamp.type=CreateTime, min.cleanable.dirty.ratio=0.5, min.compaction.lag.ms=0, min.insync.replicas=1, preallocate=false, retention.bytes=-1, retention.ms=604800000, segment.bytes=1073741824, segment.index.bytes=10485760, segment.jitter.ms=0, segment.ms=604800000, unclean.leader.election.enable=false,
Partition 0: Leader ID 0. Followers: [1,2]

Topic linter

There is minimal support for a topic linter

The idea is to parse the topic configs and give you errors or warnings for them. For example you may have replication settings not indicated for production setup, or a tombstone retention setting with a topic that does not use compacion (and is therefore meaningless and indicative of a possible mistake).

Running linter against YAML

Can be run with:

gafkalo plan --config myconfig.yaml lint

and will produce a report like this:

SKATA.VROMIA.LIGO has WARNING: min.insync.replicas not defined (Hint: Setting min.insync.replicas to 2 or higher will reduce chances of data-loss)
SKATA3 has ERROR: Replication factor < 2. Possible downtime (Hint: Increase replication factor to 3)

Ideally the user should be able to define custom rules in a future version..

Running linter against Broker topics

The Linter can also be run against the topics in the running brokers instead of the yaml. This is useful in detecting misconfigurations or before running a rolling restart.

The command is lint-broker

gafkalo --config myconfig.yaml lint-broker
confluent-audit-log-events has ERROR: Replication factor < 2. Possible downtime (Hint: Increase replication factor to 3)
confluent-audit-log-events has WARNING: min.insync.replicas not defined (Hint: Setting min.insync.replicas to 2 or higher will reduce chances of data-loss)
_confluent-metadata-auth has ERROR: Replication factor < 2. Possible downtime (Hint: Increase replication factor to 3)
_confluent-metadata-auth has WARNING: min.insync.replicas not defined (Hint: Setting min.insync.replicas to 2 or higher will reduce chances of data-loss)

Replicator

Gafkalo can be used to replicate a topic into another topic. A usage scenario would be to copy topic A into topic B (since renaming is not supported by Kafka)

Example usage:

./gafkalo --config dev.yaml replicator --source-topic SKATA1 --dest-topic SKATA2

It can also do from between two clusters. In that case the –config will be the source cluster and the –dest-config= will be the destination’s cluster YAML connecton config.

./gafkalo --config dev.yaml replicator --source-topic SKATA1 --dest-topic SKATA2

The replicator will default to the group ID gafkalo-replicator. This is in contrast to the consumer command (that generates random suffixes). This is done to easily support resuming the replicator process and it will use the recoded offsets to resume where it left off.

Additionaly the replicator defaults to Idempotemt mode, so the required permissions need to be set.

Connnect CLI

Gafkalo can also provide some a CLI interface to manage the Connect framework.

The configuration YAML needs the relevant snippet to know how to reach and authenticate to connect.

An full example with authentication and custom TLS CA

connect:
  url: "http://localhost:8083"
  username: "username"
  password: "password""
  caPath: /path/to/ca.pem
  skipVerify: false
$ ./gafkalo --config conf.yaml connect create  /path/to/connector_definition.json
$ ./gafkalo --config conf.yaml connect list
┌───┬────────────────────┐
│ # │ CONNECTOR NAME     │
├───┼────────────────────┤
│ 0 │ replicator1jschema │
└───┴────────────────────┘
$ ./gafkalo --config conf.yaml connect delete replicator1jschema
Deleted connector replicator1jschema

In this example we see a Confluent replicator connector in a simple configuration. There are no tasks running for this connector yet

$ ./gafkalo --config conf.yaml connect describe replicator1jschema

Connector: replicator1jschema
┌─────────────────────────────────────────┬───────────────────────────────────────────────────────────────────────────┐
│ CONFIG NAME                             │ CONFIG VALUE                                                              │
├─────────────────────────────────────────┼───────────────────────────────────────────────────────────────────────────┤
│ connector.class                         │ io.confluent.connect.replicator.ReplicatorSourceConnector                 │
│ errors.log.include.messages             │ true                                                                      │
│ tasks.max                               │ 1                                                                         │
│ topic.rename.format                     │ ${topic}.mirror                                                           │
│ dest.kafka.security.protocol            │ PLAINTEXT                                                                 │
│ src.consumer.interceptor.classes        │ io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor │
│ src.kafka.client.id                     │ connector1-noschema                                                       │
│ topic.auto.create                       │ true                                                                      │
│ src.value.converter.schema.registry.url │ http://localhost:8081                                                     │
│ dest.kafka.client.id                    │ connector1-noschema                                                       │
│ errors.log.enable                       │ true                                                                      │
│ topic.regex                             │ raw.*                                                                     │
│ src.value.converter.schemas.enable      │ false                                                                     │
│ value.converter.schema.registry.url     │ http://localhost:8081                                                     │
│ src.kafka.security.protocol             │ PLAINTEXT                                                                 │
│ dest.kafka.bootstrap.servers            │ localhost:9092                                                            │
│ src.value.converter                     │ io.confluent.connect.json.JsonSchemaConverter                             │
│ schema.registry.url                     │ http://localhost:8081                                                     │
│ name                                    │ replicator1jschema                                                        │
│ topic.preserve.partitions               │ true                                                                      │
│ value.converter                         │ io.confluent.connect.avro.AvroConverter                                   │
│ key.converter                           │ org.apache.kafka.connect.converters.ByteArrayConverter                    │
│ src.kafka.bootstrap.servers             │ localhost:9095                                                            │
└─────────────────────────────────────────┴───────────────────────────────────────────────────────────────────────────┘
┌────┬────────┬────────┬────────────┐
│ ID │ STATUS │ WORKER │ IS RUNNING │
├────┼────────┼────────┼────────────┤
└────┴────────┴────────┴────────────┘

The tool can check a connector’s status and restart the connector itself and any failed tasks it discovers.

$ ./gafkalo --config conf.yaml connect heal replicator1jschema

This will do a REST call to restart any task that does not have a status of RUNNING.

gafkalo can quickly check all connectors for health and report back any errors. This is will provide a usefuly, brief information when all you want to know if if everything is OK

$ ./gafkalo --config conf.yaml connect health-check