Maintaining Schemas¶
Manage Avro schemas and compatibility settings via YAML.
YAML definition¶
Schemas are defined alongside topics:
topics:
  - name: events.user.login
    partitions: 6
    replication_factor: 3
    configs:
      cleanup.policy: delete
      min.insync.replicas: 2
    key:
      schema: "schemas/user-key.avsc"
      compatibility: BACKWARD
    value:
      schema: "schemas/login-event.avsc"
      compatibility: BACKWARD_TRANSITIVE
Schema paths:
Relative paths are resolved from kafkalo.schema_dir in config
Absolute paths are used as-is
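The resolution rule can be sketched in a few lines of Python. This is an illustration of the rule as described here, not Gafkalo's own code; the function name resolve_schema_path is hypothetical.

```python
from pathlib import PurePosixPath


def resolve_schema_path(schema: str, schema_dir: str) -> str:
    """Return the file a schema reference points to.

    Absolute paths are returned unchanged; relative paths are joined
    onto schema_dir (the kafkalo.schema_dir setting).
    """
    path = PurePosixPath(schema)
    if path.is_absolute():
        return str(path)
    return str(PurePosixPath(schema_dir) / path)


print(resolve_schema_path("schemas/user-key.avsc", "data/"))
# data/schemas/user-key.avsc
print(resolve_schema_path("/etc/schemas/user-key.avsc", "data/"))
# /etc/schemas/user-key.avsc
```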
Config example:
kafkalo:
  input_dirs:
    - "data/*.yaml"
  schema_dir: "data/"
Compatibility modes¶
Standard Confluent modes:
BACKWARD: Consumers using new schema can read old data
BACKWARD_TRANSITIVE: Backward for all versions
FORWARD: Consumers using old schema can read new data
FORWARD_TRANSITIVE: Forward for all versions
FULL: Both backward and forward
FULL_TRANSITIVE: Full for all versions
NONE: No compatibility checks
Use BACKWARD or BACKWARD_TRANSITIVE for most use cases.
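The difference between a plain mode and its _TRANSITIVE variant is which registered versions a new schema is compared against: the latest one only, or every one. A minimal sketch of that selection, with a hypothetical helper name versions_to_check (the check itself is done by the Schema Registry, not by code like this):

```python
from typing import List

MODES = {
    "BACKWARD", "BACKWARD_TRANSITIVE",
    "FORWARD", "FORWARD_TRANSITIVE",
    "FULL", "FULL_TRANSITIVE",
    "NONE",
}


def versions_to_check(mode: str, registered: List[int]) -> List[int]:
    """Return the registered versions a new schema is compared against."""
    if mode not in MODES:
        raise ValueError(f"unknown compatibility mode: {mode}")
    if mode == "NONE" or not registered:
        return []  # nothing to compare against
    if mode.endswith("_TRANSITIVE"):
        return sorted(registered)  # every registered version
    return [max(registered)]  # only the latest version


print(versions_to_check("BACKWARD", [1, 2, 3]))             # [3]
print(versions_to_check("BACKWARD_TRANSITIVE", [1, 2, 3]))  # [1, 2, 3]
```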
Schema registry config¶
In config.yaml:
connections:
  schemaregistry:
    url: "https://schema-registry:8081"
    username: "user"
    password: "pass"
    caPath: "/path/to/ca.crt"
    timeout: 10
    skipRegistryForReads: false
skipRegistryForReads: Read the _schemas topic directly instead of using the REST API. Faster for large subject counts. Mutations still use the REST API.
Schema file format¶
Standard Avro schema JSON:
{
  "type": "record",
  "name": "LoginEvent",
  "namespace": "com.example.events",
  "fields": [
    {"name": "user_id", "type": "string"},
    {"name": "timestamp", "type": "long"},
    {"name": "ip_address", "type": ["null", "string"], "default": null}
  ]
}
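Before registering a file it can help to sanity-check it locally. The sketch below uses only the standard json module and covers a few basics (record type, required keys, defaults on nullable fields); it is not a full Avro validator, and check_record_schema is a hypothetical helper, not part of Gafkalo.

```python
import json


def check_record_schema(text: str) -> list:
    """Return a list of problems found in an Avro record schema."""
    schema = json.loads(text)
    problems = []
    if schema.get("type") != "record":
        problems.append("top-level type is not 'record'")
    for key in ("name", "fields"):
        if key not in schema:
            problems.append(f"missing '{key}'")
    for field in schema.get("fields", []):
        ftype = field.get("type")
        # A union that includes "null" marks an optional field.
        is_nullable = isinstance(ftype, list) and "null" in ftype
        if is_nullable and "default" not in field:
            problems.append(f"nullable field '{field.get('name')}' has no default")
    return problems


sample = """
{"type": "record", "name": "LoginEvent", "namespace": "com.example.events",
 "fields": [{"name": "user_id", "type": "string"},
            {"name": "ip_address", "type": ["null", "string"], "default": null}]}
"""
print(check_record_schema(sample))  # []
```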
CLI commands¶
Check if schema exists¶
gafkalo --config config.yaml schema check-exists \
--subject events.user.login-value \
--schema-file schemas/login-event.avsc
Returns:
Schema ID
Version number
Whether schema is registered
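For comparison, the same lookup can be made against the Confluent Schema Registry REST API, which accepts a POST to /subjects/<subject> with the schema in the body and, when the schema is registered, answers with its subject, id and version. The sketch below only builds the request and does not send it. Whether Gafkalo issues exactly this call is an assumption, and build_lookup_request is a hypothetical helper.

```python
import base64
import json
import urllib.parse
import urllib.request


def build_lookup_request(base_url, subject, schema_text, username, password):
    """Build (but do not send) a 'is this schema registered?' request."""
    body = json.dumps({"schema": schema_text}).encode("utf-8")
    token = base64.b64encode(f"{username}:{password}".encode()).decode()
    url = f"{base_url.rstrip('/')}/subjects/{urllib.parse.quote(subject, safe='')}"
    return urllib.request.Request(
        url=url,
        data=body,
        method="POST",
        headers={
            "Content-Type": "application/vnd.schemaregistry.v1+json",
            "Authorization": f"Basic {token}",
        },
    )


req = build_lookup_request(
    "https://schema-registry:8081",
    "events.user.login-value",
    '{"type": "string"}',
    "user",
    "pass",
)
print(req.get_method(), req.full_url)
```

Passing the request to urllib.request.urlopen would perform the lookup; a 404 response means the subject or schema is not registered.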
Compare schemas¶
Diff local schema against registered version:
gafkalo --config config.yaml schema schema-diff \
--subject events.user.login-value \
--version 3 \
--schema-file schemas/login-event.avsc
Shows visual diff if schemas differ.
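A comparable diff can be produced by hand with Python's difflib, which is useful in review tooling where the CLI is not available. This is a sketch of the idea, not the algorithm schema-diff uses; both schemas are re-serialized first so that whitespace and key order do not show up as differences.

```python
import difflib
import json


def schema_diff(registered: str, local: str) -> str:
    """Return a unified diff between two schema JSON documents."""
    def canonical(text):
        # sort_keys reorders object keys only; field order is preserved.
        return json.dumps(json.loads(text), indent=2, sort_keys=True).splitlines()

    return "\n".join(difflib.unified_diff(
        canonical(registered), canonical(local),
        fromfile="registered", tofile="local", lineterm="",
    ))


old = '{"type": "record", "name": "LoginEvent", "fields": [{"name": "user_id", "type": "string"}]}'
new = ('{"type": "record", "name": "LoginEvent", "fields": ['
       '{"name": "user_id", "type": "string"},'
       '{"name": "device_type", "type": ["null", "string"], "default": null}]}')
print(schema_diff(old, new))
```

An empty string means the two schemas are identical after normalization.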
Workflow¶
Create/update Avro schema file
Reference in YAML topic definition
Run gafkalo plan to preview
Run gafkalo apply to register
Gafkalo handles:
Registering new schemas
Checking if schema already exists
Setting compatibility mode
Validating against compatibility rules
Example workflow¶
Step 1: Create schema
schemas/user-key.avsc:
{
  "type": "record",
  "name": "UserKey",
  "namespace": "com.example.keys",
  "fields": [
    {"name": "user_id", "type": "string"}
  ]
}
schemas/login-event.avsc:
{
  "type": "record",
  "name": "LoginEvent",
  "namespace": "com.example.events",
  "fields": [
    {"name": "user_id", "type": "string"},
    {"name": "timestamp", "type": "long"},
    {"name": "success", "type": "boolean"}
  ]
}
Step 2: Define in YAML
data/topics.yaml:
topics:
  - name: events.user.login
    partitions: 6
    replication_factor: 3
    key:
      schema: "schemas/user-key.avsc"
      compatibility: BACKWARD
    value:
      schema: "schemas/login-event.avsc"
      compatibility: BACKWARD
Step 3: Apply
gafkalo plan --config config.yaml
gafkalo apply --config config.yaml
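In a CI job the two commands are often chained so that apply runs only when plan succeeds. A minimal sketch, assuming gafkalo exits with a non-zero status on failure (that exit-code behaviour is an assumption, and plan_then_apply is a hypothetical wrapper):

```python
import subprocess


def plan_then_apply(config="config.yaml", run=subprocess.run):
    """Run `gafkalo plan`, then `gafkalo apply`; stop at the first failure."""
    for action in ("plan", "apply"):
        result = run(["gafkalo", action, "--config", config])
        if result.returncode != 0:
            return False  # do not continue past a failed step
    return True
```

The run parameter exists so the command runner can be swapped out in tests; in normal use, call plan_then_apply() with no arguments.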
Subject naming¶
Gafkalo uses TopicNameStrategy by default:
Key subject: <topic-name>-key
Value subject: <topic-name>-value
For events.user.login:
Key: events.user.login-key
Value: events.user.login-value
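The naming rule is simple enough to express as a one-line helper, which is handy when scripting against the registry. The function name subject_names is hypothetical.

```python
def subject_names(topic: str) -> dict:
    """Derive key and value subjects for a topic under TopicNameStrategy."""
    return {"key": f"{topic}-key", "value": f"{topic}-value"}


print(subject_names("events.user.login"))
# {'key': 'events.user.login-key', 'value': 'events.user.login-value'}
```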
Schema evolution¶
Adding optional field (backward compatible)
{
  "type": "record",
  "name": "LoginEvent",
  "namespace": "com.example.events",
  "fields": [
    {"name": "user_id", "type": "string"},
    {"name": "timestamp", "type": "long"},
    {"name": "success", "type": "boolean"},
    {"name": "device_type", "type": ["null", "string"], "default": null}
  ]
}
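Removing field with default (backward compatible)
Remove a field that has a default value in the previous schema. Consumers on the new schema ignore the removed field in old data, and the default lets consumers still on the old schema read new data, so the change is forward compatible as well.
Breaking changes
Adding field without default (breaks BACKWARD modes)
Removing field without default (breaks FORWARD and FULL modes)
Changing field type (outside Avro's type promotions, such as int to long)
Renaming field without alias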
Test compatibility before applying.
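A rough local pre-check for the backward direction (a consumer on the new schema decoding data written with the old one) can be sketched as below. It handles flat records only, treats any type change as a problem and so ignores Avro's type promotions, and is no substitute for the registry's own check. The name backward_problems is hypothetical.

```python
def backward_problems(old: dict, new: dict) -> list:
    """List reasons why readers on `new` could not decode data written with `old`."""
    old_fields = {f["name"]: f for f in old["fields"]}
    problems = []
    for field in new["fields"]:
        # A renamed field still matches if it lists the old name as an alias.
        names = [field["name"], *field.get("aliases", [])]
        match = next((old_fields[n] for n in names if n in old_fields), None)
        if match is None:
            if "default" not in field:
                problems.append(f"{field['name']}: added without a default")
        elif match["type"] != field["type"]:
            problems.append(f"{field['name']}: type changed")
    return problems


v1 = {"fields": [{"name": "user_id", "type": "string"},
                 {"name": "timestamp", "type": "long"}]}
v2 = {"fields": v1["fields"] + [
    {"name": "device_type", "type": ["null", "string"], "default": None}]}
print(backward_problems(v1, v2))  # []
```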
Performance optimization¶
For clusters with many subjects (1000+):
connections:
  schemaregistry:
    skipRegistryForReads: true
    timeout: 30
This consumes the _schemas topic directly and builds an in-memory cache, so reads no longer need one REST call per subject.
Mutations still use the REST API for safety.
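To illustrate what building an in-memory cache from the topic involves, the sketch below folds a stream of (key, value) records into a lookup table. The record layout (JSON keys with keytype, subject and version; JSON values with id and deleted) is an assumption based on how Schema Registry stores its data, and the code is not taken from Gafkalo. Reading the topic itself would need a Kafka client, which is left out here.

```python
import json


def build_schema_cache(records):
    """Build {(subject, version): schema_id} from (key, value) byte pairs."""
    cache = {}
    for raw_key, raw_value in records:
        if raw_key is None:
            continue
        key = json.loads(raw_key)
        if key.get("keytype") != "SCHEMA":
            continue  # skip config and other non-schema records
        slot = (key["subject"], key["version"])
        if raw_value is None:
            cache.pop(slot, None)  # tombstone: the entry was removed
            continue
        value = json.loads(raw_value)
        if value.get("deleted"):
            cache.pop(slot, None)  # soft-deleted version
        else:
            cache[slot] = value["id"]  # later records overwrite earlier ones
    return cache


records = [
    (b'{"keytype": "SCHEMA", "subject": "events.user.login-value", "version": 1}',
     b'{"subject": "events.user.login-value", "version": 1, "id": 7, "deleted": false}'),
    (b'{"keytype": "NOOP"}', None),
]
print(build_schema_cache(records))
# {('events.user.login-value', 1): 7}
```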
Best practices¶
Use BACKWARD or BACKWARD_TRANSITIVE compatibility
Always provide defaults for optional fields
Use namespaces to organize schemas
Version schema files in git alongside YAML
Test schema changes in dev before prod
Use gafkalo plan to preview schema registration