Maintaining Connectors¶
Manage Kafka Connect connectors via CLI and YAML.
Configuration¶
In config.yaml:
connections:
  connect:
    url: "https://connect:8083"
    username: "connect-user"
    password: "connect-pass"
    caPath: "/path/to/ca.crt"
    skipVerify: false
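The connect settings above describe an HTTPS endpoint with credentials and a CA bundle. The sketch below maps the same settings onto curl options, which can help when checking connectivity outside gafkalo. It assumes that username/password are HTTP basic-auth credentials and that caPath/skipVerify control TLS verification; neither is confirmed here. The helper name build_curl_opts is hypothetical.

```shell
#!/usr/bin/env bash
# Hypothetical helper: turn the connect settings from config.yaml into curl
# options, assuming basic auth and CA-based TLS verification.
# Usage: build_curl_opts USERNAME PASSWORD CA_PATH SKIP_VERIFY
# The options are stored in the global array CURL_OPTS.
build_curl_opts() {
  local username=$1 password=$2 ca_path=$3 skip_verify=$4
  CURL_OPTS=(--silent --show-error --fail)
  if [[ -n $username ]]; then
    CURL_OPTS+=(--user "${username}:${password}")
  fi
  if [[ $skip_verify == true ]]; then
    CURL_OPTS+=(--insecure)          # like skipVerify: true, no certificate check
  elif [[ -n $ca_path ]]; then
    CURL_OPTS+=(--cacert "$ca_path") # trust the cluster's CA certificate
  fi
}

# Example (not run here): the Connect REST root endpoint returns the worker version.
#   build_curl_opts connect-user connect-pass /path/to/ca.crt false
#   curl "${CURL_OPTS[@]}" https://connect:8083/
```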
CLI commands¶
Create connector¶
gafkalo --config config.yaml connect create connector-definition.json
Connector definition (JSON):
{
  "name": "postgres-source",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "tasks.max": "1",
    "connection.url": "jdbc:postgresql://db:5432/mydb",
    "connection.user": "user",
    "connection.password": "pass",
    "table.whitelist": "users,orders",
    "mode": "incrementing",
    "incrementing.column.name": "id",
    "topic.prefix": "postgres-"
  }
}
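A connector definition is plain JSON with a name and a flat config map of string values. If you generate definitions from scripts, a minimal bash sketch could look like the following. The helpers make_connector_json and json_escape are hypothetical. The escaping handles only backslashes and double quotes, not control characters.

```shell
#!/usr/bin/env bash
# Escape backslashes and double quotes for use inside a JSON string.
# Control characters (newlines, tabs) are NOT handled.
json_escape() {
  local s=$1 bs='\' dq='"'
  s=${s//"$bs"/"$bs$bs"}   # backslashes first, so added escapes are not doubled
  s=${s//"$dq"/"$bs$dq"}   # then double quotes
  printf '%s' "$s"
}

# Hypothetical generator: make_connector_json NAME KEY=VALUE...
# Prints a {"name": ..., "config": {...}} definition with all values as strings,
# matching the example above. Each pair is split on its first "=".
make_connector_json() {
  local name=$1 first=1 kv key value
  shift
  printf '{\n  "name": "%s",\n  "config": {\n' "$(json_escape "$name")"
  for kv in "$@"; do
    key=${kv%%=*}
    value=${kv#*=}
    if (( first )); then first=0; else printf ',\n'; fi
    printf '    "%s": "%s"' "$(json_escape "$key")" "$(json_escape "$value")"
  done
  printf '\n  }\n}\n'
}

# Example (not run here):
#   make_connector_json postgres-source \
#     connector.class=io.confluent.connect.jdbc.JdbcSourceConnector tasks.max=1 \
#     > connector-definition.json
#   gafkalo --config config.yaml connect create connector-definition.json
```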
List connectors¶
gafkalo --config config.yaml connect list
Output:
+---+------------------+
| # | CONNECTOR NAME   |
+---+------------------+
| 0 | postgres-source  |
| 1 | s3-sink          |
+---+------------------+
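The list output is a bordered table. Scripts that need plain connector names, for example to loop over connect describe, have to strip the borders first. The sketch below assumes the table layout stays as shown above. The helper list_connector_names is hypothetical.

```shell
#!/usr/bin/env bash
# Read the table printed by "gafkalo ... connect list" on stdin and print one
# connector name per line. Only rows whose "#" column is a number are used,
# which skips the border lines and the header row.
list_connector_names() {
  awk -F'|' '$2 ~ /^ *[0-9]+ *$/ {
    name = $3
    gsub(/^ +| +$/, "", name)   # trim the cell padding
    print name
  }'
}

# Example (not run here): describe every connector in turn.
#   gafkalo --config config.yaml connect list | list_connector_names |
#     while read -r c; do gafkalo --config config.yaml connect describe "$c"; done
```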
Describe connector¶
gafkalo --config config.yaml connect describe postgres-source
Output:
Connector configuration (all key-value pairs)
Task status (ID, status, worker, running state)
Delete connector¶
gafkalo --config config.yaml connect delete postgres-source
Heal connector¶
Restart a failed connector and its tasks:
gafkalo --config config.yaml connect heal postgres-source
Checks the connector status and restarts any task that is not in the RUNNING state.
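You can express similar check-and-restart logic directly against the Kafka Connect REST API, which may help on hosts where gafkalo is not installed. This sketch restarts only tasks, not the connector instance itself, so it is not gafkalo's implementation. It assumes curl and jq are installed and uses a hypothetical CONNECT_URL variable. Authentication and TLS options are omitted.

```shell
#!/usr/bin/env bash
# Read "<task-id> <state>" lines on stdin and print the IDs of tasks whose
# state is anything other than RUNNING (FAILED, PAUSED, UNASSIGNED, ...).
failed_task_ids() {
  local id state
  while read -r id state; do
    if [[ -n $id && $state != RUNNING ]]; then
      printf '%s\n' "$id"
    fi
  done
}

# Hypothetical REST-level equivalent: restart every non-RUNNING task.
# Uses GET /connectors/{name}/status and POST /connectors/{name}/tasks/{id}/restart.
# CONNECT_URL is an assumed variable holding the Connect base URL.
heal_tasks() {
  local name=$1
  curl -sSf "${CONNECT_URL}/connectors/${name}/status" \
    | jq -r '.tasks[] | "\(.id) \(.state)"' \
    | failed_task_ids \
    | while read -r id; do
        echo "restarting ${name} task ${id}"
        curl -sSf -X POST "${CONNECT_URL}/connectors/${name}/tasks/${id}/restart"
      done
}

# Example (not run here): CONNECT_URL=https://connect:8083 heal_tasks postgres-source
```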
Health check¶
Check all connectors:
gafkalo --config config.yaml connect health-check
Reports any connector or task that is not in a healthy state.
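For comparison, a cluster-wide check can also be built on the Connect REST endpoint /connectors?expand=status, which newer Connect versions support. This is a sketch, not gafkalo's logic. It treats anything other than RUNNING as unhealthy and assumes curl and jq are available. The names CONNECT_URL, report_unhealthy and check_all_connectors are hypothetical.

```shell
#!/usr/bin/env bash
# Read "<connector> <component> <state>" lines and print every component that
# is not RUNNING. Returns 1 if anything is unhealthy, 0 otherwise.
report_unhealthy() {
  local name component state bad=0
  while read -r name component state; do
    [[ -z $name ]] && continue
    if [[ $state != RUNNING ]]; then
      printf '%s %s is %s\n' "$name" "$component" "$state"
      bad=1
    fi
  done
  return "$bad"
}

# Hypothetical REST-level check across all connectors (needs curl and jq).
# Runs in a subshell so pipefail does not leak into the caller; a failed
# curl or jq then also makes the check fail. Auth/TLS options are omitted.
check_all_connectors() (
  set -o pipefail
  curl -sSf "${CONNECT_URL}/connectors?expand=status" \
    | jq -r '.[].status | .name as $n
        | "\($n) connector \(.connector.state)",
          (.tasks[] | "\($n) task-\(.id) \(.state)")' \
    | report_unhealthy
)

# Example (not run here): CONNECT_URL=https://connect:8083 check_all_connectors
```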
YAML definition¶
Define connectors in YAML (applied via gafkalo apply):
connectors:
  - name: postgres-source
    config:
      connector.class: io.confluent.connect.jdbc.JdbcSourceConnector
      tasks.max: 1
      connection.url: jdbc:postgresql://db:5432/mydb
      connection.user: user
      connection.password: pass
      table.whitelist: users,orders
      mode: incrementing
      incrementing.column.name: id
      topic.prefix: postgres-
  - name: s3-sink
    config:
      connector.class: io.confluent.connect.s3.S3SinkConnector
      tasks.max: 3
      topics: events.orders,events.payments
      s3.bucket.name: kafka-events
      s3.region: us-east-1
      flush.size: 1000
Apply:
gafkalo plan --config config.yaml
gafkalo apply --config config.yaml
Common connector types¶
JDBC Source¶
connectors:
  - name: mysql-source
    config:
      connector.class: io.confluent.connect.jdbc.JdbcSourceConnector
      tasks.max: 1
      connection.url: jdbc:mysql://db:3306/mydb
      connection.user: user
      connection.password: pass
      mode: timestamp+incrementing
      timestamp.column.name: updated_at
      incrementing.column.name: id
      topic.prefix: mysql-
S3 Sink¶
connectors:
  - name: s3-sink
    config:
      connector.class: io.confluent.connect.s3.S3SinkConnector
      tasks.max: 3
      topics.regex: events\..*
      s3.bucket.name: data-lake
      s3.region: us-west-2
      flush.size: 1000
      rotate.interval.ms: 3600000
      format.class: io.confluent.connect.s3.format.parquet.ParquetFormat
Elasticsearch Sink¶
connectors:
  - name: elasticsearch-sink
    config:
      connector.class: io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
      tasks.max: 2
      topics: events.search
      connection.url: https://es:9200
      connection.username: elastic
      connection.password: pass
      type.name: _doc
      key.ignore: false
Replicator¶
connectors:
  - name: replicator-prod-to-dr
    config:
      connector.class: io.confluent.connect.replicator.ReplicatorSourceConnector
      tasks.max: 4
      topic.regex: events\..*
      topic.rename.format: ${topic}.replica
      src.kafka.bootstrap.servers: prod:9092
      dest.kafka.bootstrap.servers: dr:9092
      src.kafka.security.protocol: SASL_SSL
      dest.kafka.security.protocol: SASL_SSL
Best practices¶
Set tasks.max appropriately for the workload
Enable error handling and a dead letter queue (DLQ)
Monitor connector and task status
Use the heal command in runbooks
Version connector configs in git
Test in dev before prod
Error handling¶
Enable error tolerance:
connectors:
  - name: s3-sink
    config:
      connector.class: io.confluent.connect.s3.S3SinkConnector
      errors.tolerance: all
      errors.log.enable: true
      errors.log.include.messages: true
      errors.deadletterqueue.topic.name: dlq.s3-sink
      errors.deadletterqueue.topic.replication.factor: 3
Monitoring¶
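Use health-check in monitoring scripts:
#!/bin/bash
# health-check exits non-zero when a connector or task is unhealthy.
# "alert" is a placeholder for your alerting command.
if ! gafkalo --config prod.yaml connect health-check; then
  alert "Connect cluster has failed connectors"
fi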
Or check a specific connector:
status=$(gafkalo --config prod.yaml connect describe my-connector | grep Status)
if [[ ! "$status" =~ "RUNNING" ]]; then
  gafkalo --config prod.yaml connect heal my-connector
fi
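While the Connect cluster rebalances, tasks can briefly report states such as UNASSIGNED. A single failed check may therefore be a false alarm. One option is to re-run the check a few times before alerting or healing, as sketched below with a hypothetical retry helper. The attempt count and delay in the example are illustrative values, not gafkalo recommendations.

```shell
#!/usr/bin/env bash
# retry ATTEMPTS DELAY_SECONDS COMMAND [ARGS...]
# Runs COMMAND until it succeeds or ATTEMPTS runs are used up, sleeping
# DELAY_SECONDS between runs. Returns 0 on success, 1 otherwise.
retry() {
  local attempts=$1 delay=$2 i
  shift 2
  for (( i = 1; i <= attempts; i++ )); do
    "$@" && return 0
    (( i < attempts )) && sleep "$delay"
  done
  return 1
}

# Example (not run here): alert only if three checks, 30 seconds apart, all fail.
#   retry 3 30 gafkalo --config prod.yaml connect health-check \
#     || alert "Connect cluster has failed connectors"
```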
Troubleshooting¶
Connector fails to start
gafkalo --config config.yaml connect describe connector-name
Check task status and error messages.
Tasks in FAILED state
gafkalo --config config.yaml connect heal connector-name
Configuration issues
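Validate a connector config with the Connect REST API. The request body is the flat config map (the contents of "config" in a connector definition), and it must include connector.class:
curl -X PUT https://connect:8083/connector-plugins/JdbcSourceConnector/config/validate \
  -H "Content-Type: application/json" \
  -d @connector-config.json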
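The validate endpoint expects only the config map, so a definition file in the format used by connect create has to be unwrapped first. The response reports an error_count and per-field errors. This sketch assumes curl and jq are installed. CONNECT_URL, plugin_name and validate_definition are hypothetical names, and auth/TLS options are omitted.

```shell
#!/usr/bin/env bash
# Derive the plugin name for the validate URL from a fully qualified
# connector.class by keeping the part after the last dot.
plugin_name() {
  printf '%s\n' "${1##*.}"
}

# Hypothetical helper: validate_definition FILE
# Sends the "config" object of a {"name": ..., "config": {...}} file to the
# Connect validate endpoint and prints the error count plus any field errors.
validate_definition() {
  local file=$1 class
  class=$(jq -r '.config["connector.class"]' "$file") || return 1
  jq '.config' "$file" \
    | curl -sSf -X PUT -H "Content-Type: application/json" --data @- \
        "${CONNECT_URL}/connector-plugins/$(plugin_name "$class")/config/validate" \
    | jq -r '"error_count: \(.error_count)",
        (.configs[].value | select(.errors | length > 0)
         | "\(.name): \(.errors | join("; "))")'
}

# Example (not run here):
#   CONNECT_URL=https://connect:8083 validate_definition connector-definition.json
```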
Limitations¶
No connector deletion via YAML apply (use CLI)
No custom validation
No converter/transform validation
For complex deployments, consider Confluent Control Center or custom automation.
Security¶
Sensitive config keys can be hidden in command output. See the "Hiding sensitive keys" section of the configuration documentation.