6. Use Kafka Connect for ETL¶
This is a part of the Charmed Apache Kafka Tutorial.
In this part of the tutorial, we are going to use Kafka Connect, an ETL framework on top of Apache Kafka, to seamlessly move data between different charmed database technologies.
We will follow a step-by-step process for moving data between Canonical Data Platform charms using Kafka Connect. Specifically, we will showcase a particular use case: loading data from a relational database (PostgreSQL) into a document store and search engine (OpenSearch), entirely using charmed solutions.
By the end, you should be able to use Kafka Connect integrator and Kafka Connect charms to streamline data ETL tasks on Canonical Data Platform charmed solutions.
Prerequisites¶
We will be deploying different charmed data solutions, including PostgreSQL and OpenSearch. If you require more information or face issues deploying any of the mentioned products, consult the respective documentation:
For PostgreSQL, refer to Charmed PostgreSQL tutorial.
For OpenSearch, refer to Charmed OpenSearch tutorial.
Check current deployment¶
Up to this point, we should have three units of the Charmed Apache Kafka application. Check the current status of the Juju model:
juju status
Output example
Model Controller Cloud/Region Version SLA Timestamp
tutorial overlord localhost/localhost 3.6.20 unsupported 18:27:29Z
App Version Status Scale Charm Channel Rev Exposed Message
data-integrator active 1 data-integrator latest/stable 362 no
kafka 4.1.1 active 3 kafka 4/stable 248 no
kraft 4.1.1 active 3 kafka 4/stable 248 no
self-signed-certificates active 1 self-signed-certificates 1/stable 317 no
Unit Workload Agent Machine Public address Ports Message
data-integrator/0* active idle 6 10.157.174.36
kafka/0* active idle 0 10.157.174.225 9093,19093/tcp
kafka/1 active idle 1 10.157.174.62 9093,19093/tcp
kafka/2 active idle 2 10.157.174.59 9093,19093/tcp
kraft/0* active idle 3 10.157.174.228 9098/tcp
kraft/1 active idle 4 10.157.174.127 9098/tcp
kraft/2 active idle 5 10.157.174.24 9098/tcp
self-signed-certificates/0* active idle 8 10.157.174.248
Machine State Address Inst id Base AZ Message
0 started 10.157.174.225 juju-29b29f-0 ubuntu@24.04 kafka-test Running
1 started 10.157.174.62 juju-29b29f-1 ubuntu@24.04 kafka-test Running
2 started 10.157.174.59 juju-29b29f-2 ubuntu@24.04 kafka-test Running
3 started 10.157.174.228 juju-29b29f-3 ubuntu@24.04 kafka-test Running
4 started 10.157.174.127 juju-29b29f-4 ubuntu@24.04 kafka-test Running
5 started 10.157.174.24 juju-29b29f-5 ubuntu@24.04 kafka-test Running
6 started 10.157.174.36 juju-29b29f-6 ubuntu@24.04 kafka-test Running
8 started 10.157.174.248 juju-29b29f-8 ubuntu@24.04 kafka-test Running
Set the necessary kernel properties for OpenSearch¶
Since we will be deploying the OpenSearch charm, we need to apply the kernel configuration required for the OpenSearch charm to function properly, described in detail in the Charmed OpenSearch documentation. In practice, this means running the following commands:
sudo tee -a /etc/sysctl.conf > /dev/null <<EOT
vm.max_map_count=262144
vm.swappiness=0
net.ipv4.tcp_retries2=5
fs.file-max=1048576
EOT
sudo sysctl -p
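To confirm the settings took effect, you can read the parameters back. A small optional check (on a machine where the commands above have not been run, the reported values will differ):

```shell
# Optional check: print each parameter as the kernel currently reports it.
for param in vm.max_map_count vm.swappiness net.ipv4.tcp_retries2 fs.file-max; do
  printf '%-22s %s\n' "$param" "$(sysctl -n "$param" 2>/dev/null || echo 'unavailable')"
done
```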
Next, we should set the required model parameters using the juju model-config command:
cat <<EOF > cloudinit-userdata.yaml
cloudinit-userdata: |
  postruncmd:
    - echo 'vm.max_map_count=262144' >> /etc/sysctl.conf
    - echo 'vm.swappiness=0' >> /etc/sysctl.conf
    - echo 'net.ipv4.tcp_retries2=5' >> /etc/sysctl.conf
    - echo 'fs.file-max=1048576' >> /etc/sysctl.conf
    - sysctl -p
EOF
juju model-config --file=./cloudinit-userdata.yaml
Deploy the databases and Kafka Connect charms¶
Deploy the PostgreSQL, OpenSearch, and Kafka Connect charms:
juju deploy kafka-connect --channel edge
juju deploy postgresql --channel 14/stable
juju deploy opensearch --channel 2/stable --config profile=testing
The OpenSearch charm requires a TLS relation to become active.
We will use the self-signed-certificates charm
that was deployed earlier in the
Enable Encryption part of this tutorial.
Enable TLS¶
Using the juju status command, you should see that the Kafka Connect and OpenSearch applications
are in a blocked state. To activate them, we need to set up the necessary integrations.
First, activate the OpenSearch application by integrating it with the TLS operator:
juju integrate opensearch self-signed-certificates
Then, activate the Kafka Connect application by integrating it with the Apache Kafka application:
juju integrate kafka kafka-connect
Finally, since we will be using TLS on the Kafka Connect interface, integrate the Kafka Connect application with the TLS operator:
juju integrate kafka-connect self-signed-certificates
Use the watch juju status --color command to continuously probe your model’s status.
After a couple of minutes, all the applications should be in active/idle state.
Output example
Model Controller Cloud/Region Version SLA Timestamp
tutorial overlord localhost/localhost 3.6.20 unsupported 18:51:59Z
App Version Status Scale Charm Channel Rev Exposed Message
data-integrator active 1 data-integrator latest/stable 362 no
kafka 4.1.1 active 3 kafka 4/stable 248 no
kafka-connect active 1 kafka-connect latest/edge 30 no
kraft 4.1.1 active 3 kafka 4/stable 248 no
opensearch active 1 opensearch 2/stable 314 no
postgresql 14.20 active 1 postgresql 14/stable 987 no
self-signed-certificates active 1 self-signed-certificates 1/stable 317 no
Unit Workload Agent Machine Public address Ports Message
data-integrator/0* active idle 6 10.157.174.36
kafka-connect/0* active idle 10 10.157.174.69 8083/tcp
kafka/0* active idle 0 10.157.174.225 9092,19093/tcp
kafka/1 active idle 1 10.157.174.62 9092,19093/tcp
kafka/2 active idle 2 10.157.174.59 9092,19093/tcp
kraft/0* active idle 3 10.157.174.228 9098/tcp
kraft/1 active idle 4 10.157.174.127 9098/tcp
kraft/2 active idle 5 10.157.174.24 9098/tcp
opensearch/0* active idle 12 10.157.174.204 9200/tcp
postgresql/0* active idle 11 10.157.174.208 5432/tcp Primary
self-signed-certificates/0* active idle 8 10.157.174.248
Machine State Address Inst id Base AZ Message
0 started 10.157.174.225 juju-29b29f-0 ubuntu@24.04 kafka-test Running
1 started 10.157.174.62 juju-29b29f-1 ubuntu@24.04 kafka-test Running
2 started 10.157.174.59 juju-29b29f-2 ubuntu@24.04 kafka-test Running
3 started 10.157.174.228 juju-29b29f-3 ubuntu@24.04 kafka-test Running
4 started 10.157.174.127 juju-29b29f-4 ubuntu@24.04 kafka-test Running
5 started 10.157.174.24 juju-29b29f-5 ubuntu@24.04 kafka-test Running
6 started 10.157.174.36 juju-29b29f-6 ubuntu@24.04 kafka-test Running
8 started 10.157.174.248 juju-29b29f-8 ubuntu@24.04 kafka-test Running
10 started 10.157.174.69 juju-29b29f-10 ubuntu@22.04 kafka-test Running
11 started 10.157.174.208 juju-29b29f-11 ubuntu@22.04 kafka-test Running
12 started 10.157.174.204 juju-29b29f-12 ubuntu@24.04 kafka-test Running
Load test data¶
In a real-world scenario, an application would typically write data to a PostgreSQL database.
However, for the purposes of this tutorial, we’ll generate test data using a simple SQL script
and load it into a PostgreSQL database using the psql command-line tool included with
the PostgreSQL charm.
Note
For more information on how to access a PostgreSQL database in the PostgreSQL charm, refer to Access PostgreSQL page of the Charmed PostgreSQL tutorial.
First, create a SQL script by running the following command:
cat <<EOF > /tmp/populate.sql
CREATE TABLE posts (
id serial not null primary key,
content text not null,
likes int default null,
created_at timestamp with time zone not null default now()
);
INSERT INTO posts (content, likes)
VALUES
(
'Charmed Apache Kafka is an open-source operator that makes it easier to manage Apache Kafka, with built-in support for enterprise features.',
150
),
(
'Apache Kafka is a free, open-source software project by the Apache Software Foundation. Users can find out more at the Apache Kafka project page.',
200
),
(
'Charmed Apache Kafka is built on top of Juju and reliably simplifies the deployment, scaling, design, and management of Apache Kafka in production',
100
),
(
'Charmed Apache Kafka is a solution designed and developed to help ops teams and administrators automate Apache Kafka operations from Day 0 to Day 2, across multiple cloud environments and platforms.',
1000
),
(
'Charmed Apache Kafka is developed and supported by Canonical, as part of its commitment to provide open-source, self-driving solutions, seamlessly integrated using the Operator Framework Juju. Please refer to Charmhub, for more charmed operators that can be integrated by Juju.',
60
);
EOF
Next, copy the populate.sql script to the PostgreSQL unit using the juju scp command:
juju scp /tmp/populate.sql postgresql/0:/home/ubuntu/populate.sql
Then, retrieve the password for the operator user on the PostgreSQL database using
the get-password action:
juju run postgresql/leader get-password
See the Charmed PostgreSQL tutorial for more guidance if needed.
Output example
As a result, you should see output with the password:
...
password: bQOUgw8ZZgUyPA6n
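If you prefer to capture the password in a shell variable instead of copying it by hand, you can parse the action output. A minimal sketch, in which the `sample_output` variable stands in for the live output of `juju run postgresql/leader get-password`:

```shell
# Sample action output; in a live model, pipe the output of
# `juju run postgresql/leader get-password` here instead.
sample_output="password: bQOUgw8ZZgUyPA6n"
# Keep only the value of the password field.
PG_PASSWORD=$(printf '%s\n' "$sample_output" | awk '/^password:/ {print $2}')
echo "$PG_PASSWORD"
```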
Make note of the password, and use juju ssh to connect to the PostgreSQL unit:
juju ssh postgresql/leader
Once connected to the unit, use the psql command-line tool with the operator
user credentials to create a database named tutorial:
psql --host $(hostname -i) --username operator --password --dbname postgres \
-c "CREATE DATABASE tutorial"
You will be prompted for the password, which you obtained previously.
Now, we can use the populate.sql script copied earlier into the PostgreSQL unit,
to create a table named posts with some test data:
cat populate.sql | \
psql --host $(hostname -i) --username operator --password --dbname tutorial
To verify that the test data has been loaded successfully into the posts table, run:
psql --host $(hostname -i) --username operator --password --dbname tutorial \
-c 'SELECT COUNT(*) FROM posts'
The output should indicate that the posts table now has five rows:
count
-------
5
(1 row)
Log out from the PostgreSQL unit using the exit command or the Ctrl+D keyboard shortcut.
Deploy and integrate the postgresql-connect-integrator charm¶
Now that you have sample data loaded into PostgreSQL, it is time to deploy
the postgresql-connect-integrator charm to enable integration of PostgreSQL
and Kafka Connect applications.
First, deploy the charm in source mode using the juju deploy command and provide
the minimum necessary configurations:
juju deploy postgresql-connect-integrator \
--channel edge \
--config mode=source \
--config db_name=tutorial \
--config topic_prefix=etl_
Each Kafka Connect integrator application needs at least two relations:
one with the Kafka Connect application
one with a database charm (e.g. MySQL, PostgreSQL, or OpenSearch)
Integrate both Kafka Connect and PostgreSQL with the postgresql-connect-integrator charm:
juju integrate postgresql-connect-integrator postgresql
juju integrate postgresql-connect-integrator kafka-connect
After a couple of minutes, the juju status command should show the
postgresql-connect-integrator application in active/idle state, with a message indicating
that the ETL task is running:
...
postgresql-connect-integrator active 1 postgresql-connect-integrator latest/edge 13 no Task Status: RUNNING
...
This means that the integrator application is actively copying data from the source database
(named tutorial) into Apache Kafka topics prefixed with etl_.
For example, rows in the posts table will be published into the Apache Kafka topic
named etl_posts.
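The topic naming convention is simply the configured prefix followed by the source table name; as a quick illustration:

```shell
# Topic names are formed by prepending topic_prefix to the source table name.
topic_prefix="etl_"
table="posts"
echo "${topic_prefix}${table}"
```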
Deploy and integrate the opensearch-connect-integrator charm¶
You are almost done with the ETL task; the only remaining step is to move data from Apache Kafka
to OpenSearch.
To do that, deploy another Kafka Connect integrator, named opensearch-connect-integrator,
in sink mode:
juju deploy opensearch-connect-integrator \
--channel edge \
--config mode=sink \
--config topics="etl_posts"
The above command deploys an integrator application that moves messages from the etl_posts topic
to an OpenSearch index of the same name. The etl_posts topic is populated
by the postgresql-connect-integrator charm we deployed earlier.
To activate the opensearch-connect-integrator, make the necessary integrations:
juju integrate opensearch-connect-integrator opensearch
juju integrate opensearch-connect-integrator kafka-connect
Wait a couple of minutes and run juju status; both the opensearch-connect-integrator
and postgresql-connect-integrator applications should now be in active/idle state,
showing a message indicating that the ETL task is running:
...
opensearch-connect-integrator/0* active idle 14 10.157.174.70 8080/tcp Task Status: RUNNING
postgresql-connect-integrator/0* active idle 13 10.157.174.173 8080/tcp Task Status: RUNNING
...
Verify data transfer¶
Now it’s time to verify that the data is being copied from the PostgreSQL database to the OpenSearch index. We can use the OpenSearch REST API for that purpose.
First, retrieve the admin user credentials for OpenSearch using the get-password action:
juju run opensearch/leader get-password
As a result, you should see output similar to the following:
...
password: HTLPVZTzZPYhdrXyH3u8jvw42H9pWN4H
username: admin
Then, retrieve the OpenSearch unit IP and save it into an environment variable:
OPENSEARCH_IP=$(juju ssh opensearch/0 'hostname -i' | tr -d '\r\n')
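The tr -d '\r\n' part matters: juju ssh output typically carries a trailing carriage return that would corrupt the URL built from the variable. A small demonstration on a sample IP string:

```shell
# Simulate juju ssh output ending with a carriage return.
raw_ip="10.157.174.204"$'\r'
# Strip the line-ending characters so the IP embeds cleanly in a URL.
clean_ip=$(printf '%s' "$raw_ip" | tr -d '\r\n')
echo "https://${clean_ip}:9200"
```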
Using the password obtained above, send a request to the index's _search endpoint,
either using your browser or curl:
curl -u admin:<admin-password> -k -sS "https://${OPENSEARCH_IP}:9200/etl_posts/_search?pretty=true"
As a result, you should get a JSON response containing the search results, which should include five documents.
The hits.total value should be 5, as shown in the output example below:
{
"took" : 1,
"timed_out" : false,
"_shards" : {
"total" : 1,
"successful" : 1,
"skipped" : 0,
"failed" : 0
},
"hits" : {
"total" : {
"value" : 5,
"relation" : "eq"
},
"max_score" : 1.0,
"hits" : [
...
]
}
}
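Instead of scanning the JSON by eye, you can extract the document count programmatically. A minimal sketch using python3, with a trimmed sample response standing in for the live curl output:

```shell
# Trimmed sample _search response; in practice, pipe the curl output instead.
response='{"hits": {"total": {"value": 5, "relation": "eq"}}}'
# Pull out hits.total.value, i.e. the number of matching documents.
printf '%s' "$response" | python3 -c 'import json, sys; print(json.load(sys.stdin)["hits"]["total"]["value"])'
```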
Now let’s insert a new post into the PostgreSQL database.
Get the password for the operator built-in user again:
juju run postgresql/leader get-password
SSH to the PostgreSQL leader unit:
juju ssh postgresql/leader
Then, insert a new post using the following command and the password for the operator user
on the PostgreSQL database:
psql --host $(hostname -i) --username operator --password --dbname tutorial -c \
"INSERT INTO posts (content, likes) VALUES ('my new post', 1)"
Log out from the PostgreSQL unit using the exit command or the Ctrl+D keyboard shortcut.
Then, check that the data is automatically copied to the OpenSearch index:
curl -u admin:<admin-password> -k -sS "https://${OPENSEARCH_IP}:9200/etl_posts/_search?pretty=true"
The response should now report six hits (output is truncated):
{
...
"hits" : {
"total" : {
"value" : 6,
"relation" : "eq"
}
}
...
}
Congratulations! You have successfully completed an ETL job that continuously moves data from PostgreSQL to OpenSearch, using entirely charmed solutions.