
Kafka Sink Connector Quickstart

Topic: Kafka Connector

Learn: how to set up the ScyllaDB Sink Connector against a Dockerized ScyllaDB

Audience: Application Developer

Synopsis

This quickstart shows how to set up the ScyllaDB Sink Connector against a Dockerized ScyllaDB.

Preliminary setup

  1. Using Docker, follow the instructions to launch ScyllaDB.

  2. Start the Docker container, replacing the --name and --hostname values with your own. For example:

    docker run --name some-scylla --hostname some-scylla -d scylladb/scylla
    
  3. Run docker ps to show the exposed ports. The output should be similar to this example:

    docker ps
    CONTAINER ID        IMAGE                     COMMAND                  CREATED             STATUS              PORTS                                              NAMES
    26cc6d47efe3        replace-with-image-name   "/docker-entrypoint.…"   4 hours ago         Up 23 seconds       0.0.0.0:32777->1883/tcp, 0.0.0.0:32776->9001/tcp   anonymous_my_1
    
  4. Continue with either Confluent or Manual Installation.

Install using Confluent Platform

If you are new to Confluent, download Confluent Platform.

  1. In the Self managed software box, click DOWNLOAD FREE.

  2. Fill in your email address.

  3. Open the Select Deployment Type drop-down and select ZIP.

  4. Accept the Terms & Conditions and click DOWNLOAD FREE.

  5. You will receive an email with instructions. Download the file and move it to the desired location.

  6. Continue with the setup following this document.

Install Kafka Connector manually

  1. Navigate to the Kafka Connect ScyllaDB Sink GitHub page and clone the repository.

  2. Using a terminal, open the source code (src) folder.

  3. Run the command mvn clean install.

  4. Run the Integration Tests in an IDE. If the tests fail, run mvn clean install -DskipTests to build without running them.

    Note

    To run Integration Tests, there is no need to run Confluent. Use the docker-compose.yml file in the GitHub repository, which contains images to run Kafka and other services, and run the following command:

    docker-compose -f docker-compose.yml up
    

    After you complete the steps above, a folder named components is created in the target folder of the source code folder. The Connector jar files are located in {source-code-folder}/target/components/packages/[jar-files]. Create a folder named ScyllaDB-Sink-Connector and copy the jar files into it. Then navigate to your Confluent Platform installation directory and place this folder in {confluent-directory}/share/java.

Add Sink Connector plugin

The ScyllaDB sink connector publishes records from a Kafka topic into ScyllaDB. Adding a new connector plugin requires restarting Connect. Use the Confluent CLI to restart Connect.

  1. Run the following command:

    confluent local stop && confluent local start
    

    The output will be similar to:

    confluent local stop && confluent local start
    Starting zookeeper
    zookeeper is [UP]
    Starting Kafka
    Kafka is [UP]
    Starting schema-registry
    schema-registry is [UP]
    Starting kafka-rest
    kafka-rest is [UP]
    Starting connect
    connect is [UP]
    
  2. Check if the kafka-connect-scylladb connector plugin has been installed correctly and picked up by the plugin loader:

    curl -sS localhost:8083/connector-plugins | jq '.[].class' | grep ScyllaDbSinkConnector
    

    Your output should resemble:

    io.connect.scylladb.ScyllaDbSinkConnector
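
The same check can be scripted. The following is a minimal Python sketch, assuming the response body of the /connector-plugins endpoint is a JSON list of objects that each carry a "class" field. The helper name has_plugin and the sample data (including the placeholder version strings) are illustrative and not part of the connector.

```python
import json

SINK_CLASS = "io.connect.scylladb.ScyllaDbSinkConnector"


def has_plugin(plugins_json: str, class_name: str = SINK_CLASS) -> bool:
    """Return True if class_name is listed in a /connector-plugins response body."""
    plugins = json.loads(plugins_json)
    return any(plugin.get("class") == class_name for plugin in plugins)


# Sample body shaped like the endpoint's response.
# The version values are placeholders, not real release numbers.
sample = json.dumps([
    {"class": SINK_CLASS, "type": "sink", "version": "x.y.z"},
    {"class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
     "type": "sink", "version": "x.y.z"},
])

print(has_plugin(sample))
```

Against a running Connect worker, the body could instead be read from http://localhost:8083/connector-plugins, for example with urllib.request.urlopen.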

Connector configuration

  1. Save the configuration settings in a file named kafka-connect-scylladb.json. The file should contain:

    {
      "name" : "scylladb-sink-connector",
      "config" : {
        "connector.class" : "io.connect.scylladb.ScyllaDbSinkConnector",
        "tasks.max" : "1",
        "topics" : "topic1,topic2,topic3",
        "scylladb.contact.points" : "scylladb-hosts",
        "scylladb.keyspace" : "test"
      }
    }
    
  2. Load the connector. Run the following command:

    curl -s -X POST -H 'Content-Type: application/json' --data @kafka-connect-scylladb.json http://localhost:8083/connectors
    
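  3. To update the configuration of an existing connector, send only the contents of the config object in a PUT request to the connector's config endpoint:

    jq .config kafka-connect-scylladb.json | curl -s -X PUT -H 'Content-Type: application/json' --data @- http://localhost:8083/connectors/scylladb-sink-connector/config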
    
  4. Once the Connector is up and running, use the kafka-avro-console-producer command to produce records (in Avro format) into the Kafka topic.

    kafka-avro-console-producer \
      --broker-list localhost:9092 \
      --topic example \
      --property parse.key=true \
      --property key.schema='{"type":"record","name":"key_schema","fields":[{"name":"id","type":"int"}]}' \
      --property 'key.separator=$' \
      --property value.schema='{"type":"record","name":"value_schema","fields":[{"name":"id","type":"int"},{"name":"firstName","type":"string"},{"name":"lastName","type":"string"}]}'
    {"id":1}${"id":1,"firstName":"first","lastName":"last"}
    
  5. Verify the data in ScyllaDB by running a CQL SELECT query:

    cqlsh> SELECT * FROM demo.example;
     id | firstname | lastname
    ----+-----------+----------
      1 |     first |     last
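
Hand-written JSON is easy to get wrong (a missing brace or comma makes the REST call fail), so the connector file from step 1 can also be generated. The following is a minimal Python sketch, assuming the same keys and placeholder values as the example above. The function name build_connector is hypothetical.

```python
import json


def build_connector(name, topics, contact_points, keyspace):
    """Build the request body for loading the sink connector."""
    return {
        "name": name,
        "config": {
            "connector.class": "io.connect.scylladb.ScyllaDbSinkConnector",
            "tasks.max": "1",
            # Kafka Connect expects a comma-separated list of topic names.
            "topics": ",".join(topics),
            "scylladb.contact.points": contact_points,
            "scylladb.keyspace": keyspace,
        },
    }


connector = build_connector(
    "scylladb-sink-connector",
    ["topic1", "topic2", "topic3"],
    "scylladb-hosts",  # placeholder host name from the example above
    "test",
)

# json.dumps always emits well-formed JSON; redirect the output to
# kafka-connect-scylladb.json to use it with the curl command in step 2.
text = json.dumps(connector, indent=2)
print(text)
```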
    

ScyllaDB modes

Kafka Connect runs in one of two modes, Standalone or Distributed.

  • Standalone - uses the properties-based example.

  • Distributed - uses the JSON / REST examples.

Use this command to load the connector and connect to a ScyllaDB instance without authentication:

curl -s -X POST -H 'Content-Type: application/json' --data @kafka-connect-scylladb.json http://localhost:8083/connectors

Select one of the following configuration methods based on how you have deployed Kafka Connect.

Note

Each JSON record must consist of a schema and a payload.

Distributed Mode JSON example

{
  "name" : "scylladb-sink-connector",
  "config" : {
    "connector.class" : "io.connect.scylladb.ScyllaDbSinkConnector",
    "tasks.max" : "1",
    "topics" : "topic1,topic2,topic3",
    "scylladb.contact.points" : "scylladb-hosts",
    "scylladb.keyspace" : "test",
    "key.converter" : "org.apache.kafka.connect.json.JsonConverter",
    "value.converter" : "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable" : "true",
    "value.converter.schemas.enable" : "true",
    "transforms" : "createKey",
    "transforms.createKey.fields" : "[field-you-want-as-primary-key-in-scylla]",
    "transforms.createKey.type" : "org.apache.kafka.connect.transforms.ValueToKey"
  }
}
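
The ValueToKey transform in this configuration replaces the record key with a key built from the listed fields of the record value. The following Python sketch only illustrates that effect on a plain dictionary. It is not the transform's actual (Java) implementation, and the function name value_to_key and the sample record are assumptions for illustration.

```python
def value_to_key(value: dict, fields: list) -> dict:
    """Build a record key from the named fields of the record value."""
    missing = [name for name in fields if name not in value]
    if missing:
        raise KeyError(f"value has no field(s): {missing}")
    return {name: value[name] for name in fields}


# Sample record value; "id" plays the role of the primary key field.
value = {"id": 10, "name": "John Doe10", "department": "engineering"}
key = value_to_key(value, ["id"])
print(key)  # {'id': 10}
```

In the connector configuration, the equivalent choice is made by setting transforms.createKey.fields to the field name or names that form the primary key.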

Standalone Mode properties example

To load the connector in Standalone mode, use:

confluent local load scylladb-sink-connector -- -d scylladb-sink-connector.properties

Use the following configuration settings:

connector.class=io.connect.scylladb.ScyllaDbSinkConnector
tasks.max=1
topics=topic1,topic2,topic3
scylladb.contact.points=cassandra
scylladb.keyspace=test

key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true

transforms=createKey
transforms.createKey.fields=[field-you-want-as-primary-key-in-scylla]
transforms.createKey.type=org.apache.kafka.connect.transforms.ValueToKey

For example, produce a JSON record that contains both a schema and a payload:

kafka-console-producer --broker-list localhost:9092 --topic sample-topic
>{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"name"},{"type":"string","optional":true,"field":"department"}]},"payload":{"id":10,"name":"John Doe10","department":"engineering"}}

Run a CQL SELECT query to view the data, replacing keyspace_name and topic_name with your own values:

SELECT * FROM keyspace_name.topic_name;
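
Because each JSON record must carry both a schema and a payload, it can help to generate the record line rather than type it. The following is a minimal Python sketch, assuming the struct-style envelope shown in the producer example. The helper name make_record and its tuple-based field description are hypothetical.

```python
import json


def make_record(fields, payload):
    """Wrap payload in a schema/payload envelope.

    fields is a list of (name, type, optional) tuples describing the struct.
    """
    schema = {
        "type": "struct",
        "fields": [
            {"type": ftype, "optional": optional, "field": name}
            for name, ftype, optional in fields
        ],
    }
    # Reject payloads that omit a field the schema marks as required.
    missing = [name for name, _, optional in fields
               if not optional and name not in payload]
    if missing:
        raise ValueError(f"payload is missing required fields: {missing}")
    return json.dumps({"schema": schema, "payload": payload},
                      separators=(",", ":"))


line = make_record(
    [("id", "int32", False), ("name", "string", False),
     ("department", "string", True)],
    {"id": 10, "name": "John Doe10", "department": "engineering"},
)
print(line)
```

The printed line can be pasted at the kafka-console-producer prompt.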

Note

To publish records in Avro format, use the following properties:

key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
key.converter.schemas.enable=true
value.converter.schemas.enable=true

Authentication

This example connects to a ScyllaDB instance with security enabled and username / password authentication.

Select one of the following configuration methods based on how you have deployed Kafka Connect. Distributed Mode uses the JSON / REST examples. Standalone mode uses the properties-based example.

Distributed Mode example

{
  "name" : "scylladbSinkConnector",
  "config" : {
    "connector.class" : "io.connect.scylladb.ScyllaDbSinkConnector",
    "tasks.max" : "1",
    "topics" : "topic1,topic2,topic3",
    "scylladb.contact.points" : "cassandra",
    "scylladb.keyspace" : "test",
    "scylladb.security.enabled" : "true",
    "scylladb.username" : "example",
    "scylladb.password" : "password"
  }
}

Add any other properties you need, such as the converter and transform settings, in the same way as in the earlier Distributed Mode JSON example.

Standalone Mode example

connector.class=io.connect.scylladb.ScyllaDbSinkConnector
tasks.max=1
topics=topic1,topic2,topic3
scylladb.contact.points=cassandra
scylladb.keyspace=test
scylladb.security.enabled=true
scylladb.username=example
scylladb.password=password
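
Both modes use the same setting names, so one base configuration can produce both forms. The following is a minimal Python sketch, assuming the scylladb.security.enabled key from the Distributed Mode example. The helper names with_auth and to_properties and the SCYLLADB_PASSWORD environment variable are hypothetical and used here only to keep the password out of the script.

```python
import json
import os


def with_auth(config, username, password):
    """Return a copy of config with username/password authentication enabled."""
    secured = dict(config)
    secured["scylladb.security.enabled"] = "true"
    secured["scylladb.username"] = username
    secured["scylladb.password"] = password
    return secured


def to_properties(config):
    """Render a flat config mapping as key=value lines for Standalone mode."""
    return "\n".join(f"{key}={value}" for key, value in config.items()) + "\n"


base = {
    "connector.class": "io.connect.scylladb.ScyllaDbSinkConnector",
    "tasks.max": "1",
    "topics": "topic1,topic2,topic3",
    "scylladb.contact.points": "cassandra",
    "scylladb.keyspace": "test",
}

# SCYLLADB_PASSWORD is a hypothetical variable name; "password" is the
# placeholder value used in the examples above.
password = os.environ.get("SCYLLADB_PASSWORD", "password")
secured = with_auth(base, "example", password)

# Distributed mode: JSON body for the REST API.
print(json.dumps({"name": "scylladbSinkConnector", "config": secured}, indent=2))
# Standalone mode: contents of the properties file.
print(to_properties(secured), end="")
```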

Logging

To check logs for the Confluent Platform, use:

confluent local log <service> -- [<argument>] --path <path-to-confluent>

To check logs for ScyllaDB, use:

docker logs some-scylla | tail

Additional information

  • Kafka Sink Connector Configuration
