Was this page helpful?
Caution
You're viewing documentation for a previous version. Switch to the latest stable version.
Kafka Sink Connector Quickstart¶
Topic: Kafka Connector
Learn: how to setup the ScyllaDB Sink Connector against a Dockerized ScyllaDB
Audience: Application Developer
Synopsis¶
This quickstart will show how to setup the ScyllaDB Sink Connector against a Dockerized ScyllaDB.
Preliminary setup¶
- Using Docker, follow the instructions to launch ScyllaDB. 
- Start the Docker container, replacing the - --nameand- --host nameparameters with your own information. For example:- docker run --name some-scylla --hostname some-scylla -d scylladb/scylla 
- Run - docker psto show the exposed ports. The output should be similar to this example:- docker ps CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES 26cc6d47efe3 replace-with-image-name "/docker-entrypoint.…" 4 hours ago Up 23 seconds 0.0.0.0:32777->1883/tcp, 0.0.0.0:32776->9001/tcp anonymous_my_1 
- Continue with either Confluent or Manual Installation. 
Install using Confluent Platform¶
If you are new to Confluent, download Confluent Platform.
- In the Self managed software box, click DOWNLOAD FREE 
- Fill in your email address. 
- Open the Select Deployment Type drop-down and select ZIP. 
- Accept the Terms & Conditions and click DOWNLOAD FREE. 
- You will receive an email with instructions. Download / move the file to the desired location. 
- Continue with the setup following this document. 
Install Kafka Connector manually¶
- Navigate to the Kafka Connect Scylladb Sink github page and clone the repository. 
- Using a terminal, open the source code (src) folder. 
- Run the command - mvn clean install.
- Run the Integration Tests in an IDE. If tests fail run - mvn clean install -DskipTests.- Note - To run Integration Tests, there is no need to run Confluent. Use docker-compose.yml file in the github repository and run the following command (it contains images to run Kafka and other services): - docker-compose -f docker-compose.yml up - After completion of the above steps, a folder named - componentswill be created in the target folder of the source code folder. The Connector jar files are present in- {source-code-folder}/target/components/packages/[jar-files]Create a folder by the name of- ScyllaDB-Sink-Connectorand copy the jar files into it. Navigate to your Confluent Platform installation directory and place this folder in- {confluent-directory}/share/java.
Add Sink Connector plugin¶
The ScyllaDB sink connector is used to publish records from a Kafka topic into ScyllaDB. Adding a new connector plugin requires restarting Connect. Use the Confluent CLI to restart Connect.
- Run the following - confluent local stop && confluent local start - The output will be similar to: - confluent local stop && confluent local start Starting zookeeper zookeeper is [UP] Starting Kafka Kafka is [UP] Starting schema-registry schema-registry is [UP] Starting kafka-rest kafka-rest is [UP] Starting connect connect is [UP] 
- Check if the kafka-connect-scylladb connector plugin has been installed correctly and picked up by the plugin loader: - curl -sS localhost:8083/connector-plugins | jq .[].class | grep ScyllaDbSinkConnector - Your output should resemble: - io.connect.scylladb.ScyllaDbSinkConnector
Connector configuration¶
- Save the configuration settings in a file named - kafka-connect-scylladb.jsonits contents should contain:- { "name" : "scylladb-sink-connector", "config" : { "connector.class" : "io.connect.scylladb.ScyllaDbSinkConnector", "tasks.max" : "1", "topics" : "topic1,topic2,topic3", "scylladb.contact.points" : "scylladb-hosts", "scylladb.keyspace" : "test" } 
- Load the connector. Run the following command: - curl -s -X POST -H 'Content-Type: application/json' --data @kafka-connect-scylladb.json http://localhost:8083/connectors 
- Update the configuration of the existing connector. - curl -s -X PUT -H 'Content-Type: application/json' --data @kafka-connect-scylladb.json http://localhost:8083/connectors/scylladb/config 
- Once the Connector is up and running, use the command - kafka-avro-console-producerto produce records(in AVRO format) into the Kafka topic.- kafka-avro-console-producer --broker-list localhost:9092 --topic example --property parse.key=true --property key.schema='{"type":"record",name":"key_schema","fields":[{"name":"id","type":"int"}]}' --property "key.separator=$" --property value.schema='{"type":"record","name":"value_schema","fields":[{"name":"id","type":"int"}, {"name":"firstName","type":"string"},{"name":"lastName","type":"string"}]}' {"id":1}${"id":1,"firstName":"first","lastName":"last"}
- Test ScyllaDB by running a select cql query: - cqlsh>select * from demo.example; id | firstname | lastname ----+-----------+---------- 1 | first | last 
ScyllaDB modes¶
There are two modes, Standalone and Distributed.
- Standard - will use the properties based example. 
- Distributed - will use the JSON / REST examples. 
Use this command to load the connector and connect to ScyllaDB instance without authentication:
curl -s -X POST -H 'Content-Type: application/json' --data @kafka-connect-scylladb.json http://localhost:8083/connectors
Select one of the following configuration methods based on how you have deployed |kconnect-long|. Distributed Mode will the JSON / REST examples. The standalone mode will use the properties based example.
Note
Each json record should consist of a schema and payload.
Distributed Mode JSON example¶
 {
  "name" : "scylladb-sink-connector",
  "config" : {
    "connector.class" : "io.connect.scylladb.ScyllaDbSinkConnector",
    "tasks.max" : "1",
    "topics" : "topic1,topic2,topic3",
    "scylladb.contact.points" : "scylladb-hosts",
    "scylladb.keyspace" : "test",
    "key.converter" : "org.apache.kafka.connect.json.JsonConverter",
    "value.converter" : "org.apache.kafka.connect.json.JsonConverter"
    "key.converter.schemas.enable" : "true",
    "value.converter.schemas.enable" : "true",
    "transforms" : "createKey",
    "transforms.createKey.fields" : "[field-you-want-as-primary-key-in-scylla]",
    "transforms.createKey.type" : "org.apache.kafka.connect.transforms.ValueToKey"
  }
}
Standalone Mode JSON example¶
To load the connector in Standalone mode use:
confluent local load scylladb-sink-conector -- -d scylladb-sink-connector.properties
Use the following configuratopn settings:
connector.class=io.connect.scylladb.ScyllaDbSinkConnector
tasks.max=1
topics=topic1,topic2,topic3
scylladb.contact.points=cassandra
scylladb.keyspace=test
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
transforms=createKey
transforms.createKey.fields=[field-you-want-as-primary-key-in-scylla]
transforms.createKey.type=org.apache.kafka.connect.transforms.ValueToKey
For Example:
kafka-console-producer --broker-list localhost:9092 --topic sample-topic
>{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"name"},{"type":"string","optional":true,"field":"department"},"payload":{"id":10,"name":"John Doe10","department":"engineering"}}
Run the select cql query to view the data:
Select * from keyspace_name.topic-name;
Note
To publish records in Avro Format use the following properties:
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
key.converter.schemas.enable=true
value.converter.schemas.enable=true
Authentication¶
This example connects to a ScyllaDB instance with security enabled and username / password authentication.
Select one of the following configuration methods based on how you have deployed |kconnect-long|. Distributed Mode will the JSON / REST examples. The standalone mode will use the properties based example.
Distributed Mode example¶
{
  "name" : "scylladbSinkConnector",
  "config" : {
    "connector.class" : "io.connect.scylladb.ScyllaDbSinkConnector",
    "tasks.max" : "1",
    "topics" : "topic1,topic2,topic3",
    "scylladb.contact.points" : "cassandra",
    "scylladb.keyspace" : "test",
    "scylladb.security.enabled" : "true",
    "scylladb.username" : "example",
    "scylladb.password" : "password",
    **add other properties same as in the above example**
  }
}
Standalone Mode example¶
connector.class=io.connect.scylladb.ScyllaDbSinkConnector
tasks.max=1
topics=topic1,topic2,topic3
scylladb.contact.points=cassandra
scylladb.keyspace=test
scylladb.ssl.enabled=true
scylladb.username=example
scylladb.password=password
Logging¶
To check logs for the Confluent Platform use:
confluent local log <service> -- [<argument>] --path <path-to-confluent>
To check logs for ScyllaDB:
docker logs some-scylla | tail