Quickstart for Confluent Cloud

This page includes instructions on the usage of a third-party platform, which may be subject to changes beyond our control. In case of doubt, refer to the third-party platform documentation.

Neo4j supports Confluent Cloud with the Neo4j Connector for Confluent running as a Custom Connector. Confluent’s Custom Connectors provide a way to extend Confluent Cloud beyond the fully managed connectors available on the platform.

Prerequisites

You need a running Confluent Cloud cluster to install the connector into.

Figure 1. A running cluster inside Confluent Cloud

Upload Custom Connectors

Before creating our connector instances, we first need to register the Neo4j Connector for Confluent as Custom Connector plugins, one for Source and one for Sink.

Source

  1. Select the cluster you want to install the connector into, open the Connectors section, and click Add plugin.

  2. Fill in the details for the new custom connector as shown below, then accept the conditions and click Submit.

    Connector plugin name

    Neo4j Connector for Confluent Source

    Custom plugin description

    Neo4j Connector for Confluent Source plugin as a custom connector.

    Connector class

    org.neo4j.connectors.kafka.source.Neo4jConnector

    Connector type

    Source

    Connector archive

    Download the latest Confluent Hub Component archive by following the distribution instructions, and select the downloaded neo4j-kafka-connector-5.1.5.zip file from your local computer.

    Sensitive properties

    To keep sensitive configuration values protected in your connector instances, mark at least the following configuration properties as sensitive.

    neo4j.authentication.basic.password
    neo4j.authentication.kerberos.ticket
    neo4j.authentication.bearer.token
    neo4j.authentication.custom.credentials

Confluent Cloud will upload the archive and create the Source plugin.

Sink

  1. Select the cluster you want to install the connector into, open the Connectors section, and click Add plugin.

  2. Fill in the details for the new custom connector as shown below, then accept the conditions and click Submit.

    Connector plugin name

    Neo4j Connector for Confluent Sink

    Custom plugin description

    Neo4j Connector for Confluent Sink plugin as a custom connector.

    Connector class

    org.neo4j.connectors.kafka.sink.Neo4jConnector

    Connector type

    Sink

    Connector archive

    Download the latest Confluent Hub Component archive by following the distribution instructions, and select the downloaded neo4j-kafka-connector-5.1.5.zip file from your local computer.

    Sensitive properties

    To keep sensitive configuration values protected in your connector instances, mark at least the following configuration properties as sensitive.

    neo4j.authentication.basic.password
    neo4j.authentication.kerberos.ticket
    neo4j.authentication.bearer.token
    neo4j.authentication.custom.credentials

Confluent Cloud will upload the archive and create the Sink plugin.

Custom Connectors are Kafka Connect plugins created by users, modified open-source connector plugins, or third-party connector plugins like the Neo4j Connector for Confluent. Find out more about Custom Connectors in the Confluent documentation, and read our blog post, which includes a worked example of setting this up with Aura.

You can create a connector from Confluent Cloud to Neo4j and Neo4j AuraDB by following the instructions on the Neo4j Developer Blog.

Create Source Instance

Having created our custom connectors in the previous section, we can now start configuring our Source instance.

  1. In Confluent Cloud, go to the Connectors section for your cluster and search for the Neo4j Connector for Confluent Source plugin that we created above.

  2. Click on the connector to start configuring our source connector instance.

  3. Configure an API Key for accessing your Kafka cluster, and click Continue.

  4. First click Auto Configure Schema Registry, select one of JSON Schema, Avro, or Protobuf based on your preference, and click Apply changes. This generates a couple of configuration options for schema support. Next, set the connector configuration options either as individual key-value pairs or by adding them to the existing JSON. For this quickstart, we will configure our source instance to send change event messages for nodes matching the pattern (:TestSource) to the topics named creates, updates and deletes, using your preferred serialization format; see the alternatives below.

    Avro

    {
      "confluent.custom.schema.registry.auto": "true",
      "key.converter": "io.confluent.connect.avro.AvroConverter",
      "value.converter": "io.confluent.connect.avro.AvroConverter",
      "neo4j.uri": "neo4j+s://<redacted>.databases.neo4j.io",
      "neo4j.authentication.type": "BASIC",
      "neo4j.authentication.basic.username": "neo4j",
      "neo4j.authentication.basic.password": "<redacted>",
      "neo4j.source-strategy": "CDC",
      "neo4j.start-from": "NOW",
      "neo4j.cdc.poll-interval": "1s",
      "neo4j.cdc.topic.creates.patterns.0.pattern": "(:TestSource)",
      "neo4j.cdc.topic.creates.patterns.0.operation": "CREATE",
      "neo4j.cdc.topic.updates.patterns.0.pattern": "(:TestSource)",
      "neo4j.cdc.topic.updates.patterns.0.operation": "UPDATE",
      "neo4j.cdc.topic.deletes.patterns.0.pattern": "(:TestSource)",
      "neo4j.cdc.topic.deletes.patterns.0.operation": "DELETE"
    }
    JSON Schema

    {
      "confluent.custom.schema.registry.auto": "true",
      "key.converter": "io.confluent.connect.json.JsonSchemaConverter",
      "value.converter": "io.confluent.connect.json.JsonSchemaConverter",
      "neo4j.uri": "neo4j+s://<redacted>.databases.neo4j.io",
      "neo4j.authentication.type": "BASIC",
      "neo4j.authentication.basic.username": "neo4j",
      "neo4j.authentication.basic.password": "<redacted>",
      "neo4j.source-strategy": "CDC",
      "neo4j.start-from": "NOW",
      "neo4j.cdc.poll-interval": "1s",
      "neo4j.cdc.topic.creates.patterns.0.pattern": "(:TestSource)",
      "neo4j.cdc.topic.creates.patterns.0.operation": "CREATE",
      "neo4j.cdc.topic.updates.patterns.0.pattern": "(:TestSource)",
      "neo4j.cdc.topic.updates.patterns.0.operation": "UPDATE",
      "neo4j.cdc.topic.deletes.patterns.0.pattern": "(:TestSource)",
      "neo4j.cdc.topic.deletes.patterns.0.operation": "DELETE"
    }
    Protobuf

    {
      "confluent.custom.schema.registry.auto": "true",
      "key.converter": "io.confluent.connect.protobuf.ProtobufConverter",
      "key.converter.optional.for.nullables": true,
      "value.converter": "io.confluent.connect.protobuf.ProtobufConverter",
      "value.converter.optional.for.nullables": true,
      "neo4j.uri": "neo4j+s://<redacted>.databases.neo4j.io",
      "neo4j.authentication.type": "BASIC",
      "neo4j.authentication.basic.username": "neo4j",
      "neo4j.authentication.basic.password": "<redacted>",
      "neo4j.source-strategy": "CDC",
      "neo4j.start-from": "NOW",
      "neo4j.cdc.poll-interval": "1s",
      "neo4j.cdc.topic.creates.patterns.0.pattern": "(:TestSource)",
      "neo4j.cdc.topic.creates.patterns.0.operation": "CREATE",
      "neo4j.cdc.topic.updates.patterns.0.pattern": "(:TestSource)",
      "neo4j.cdc.topic.updates.patterns.0.operation": "UPDATE",
      "neo4j.cdc.topic.deletes.patterns.0.pattern": "(:TestSource)",
      "neo4j.cdc.topic.deletes.patterns.0.operation": "DELETE"
    }

    Verify that all your configuration options are correct, and click Continue.

  5. At the next screen, we need to add connection endpoints so that our connector can access Neo4j or AuraDB. Extract the hostname and port from your Neo4j connection URI and add it as an endpoint. Remember that the default port for Neo4j connections is 7687. For example, for the connection URI neo4j+s://<redacted>.databases.neo4j.io, we should enter <redacted>.databases.neo4j.io:7687 as the endpoint.

  6. Next, select how many tasks your connector should run with and click Continue. Source connectors always run with a single task, so the default value of 1 is sufficient.

  7. Finally, name your connector instance, review your settings and click Continue.

  8. The source instance will be provisioned and shown as Running within a couple of minutes.

Now that you have a running source instance, you can create the following nodes in Neo4j:

CREATE (:TestSource {name: 'john', surname: 'doe'});
CREATE (:TestSource {name: 'mary', surname: 'doe'});
CREATE (:TestSource {name: 'jack', surname: 'small'});

This will result in new messages being published to the topic named creates.
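
To confirm the nodes are in place before checking the topic, you can run a quick query in Neo4j Browser or Cypher Shell:

// Expect three rows; with CDC enabled, each CREATE above should also
// have produced a change event on the creates topic.
MATCH (n:TestSource) RETURN n.name AS name, n.surname AS surname;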

Create Sink Instance

Having created our Source instance in the previous section, we can now configure our Sink instance to act upon the messages it generates.

  1. In Confluent Cloud, go to the Connectors section for your cluster and search for the Neo4j Connector for Confluent Sink plugin that we created above.

  2. Click on the connector to start configuring our sink connector instance.

  3. Configure an API Key for accessing your Kafka cluster, and click Continue.

  4. First click Auto Configure Schema Registry, select one of JSON Schema, Avro, or Protobuf based on your preference, and click Apply changes. This generates a couple of configuration options for schema support. Next, set the connector configuration options either as individual key-value pairs or by adding them to the existing JSON. For this quickstart, we will configure our sink instance to execute a Cypher statement for each message received from the topics named creates, updates and deletes; see the alternatives below.

    Avro

    {
      "confluent.custom.schema.registry.auto": "true",
      "key.converter": "io.confluent.connect.avro.AvroConverter",
      "value.converter": "io.confluent.connect.avro.AvroConverter",
      "topics": "creates,updates,deletes",
      "neo4j.uri": "neo4j+s://<redacted>.databases.neo4j.io",
      "neo4j.authentication.type": "BASIC",
      "neo4j.authentication.basic.username": "neo4j",
      "neo4j.authentication.basic.password": "<redacted>",
      "neo4j.cypher.topic.creates": "WITH __value.event.state.after AS state MERGE (p:Person {name: state.properties.name, surname: state.properties.surname}) MERGE (f:Family {name: state.properties.surname}) MERGE (p)-[:BELONGS_TO]->(f)",
      "neo4j.cypher.topic.updates": "WITH __value.event.state.before AS before, __value.event.state.after AS after MATCH (p:Person {name: before.properties.name, surname: before.properties.surname}) MATCH (fPre:Family {name: before.properties.surname}) OPTIONAL MATCH (p)-[b:BELONGS_TO]->(fPre) DELETE b WITH after, p SET p.name = after.properties.name, p.surname = after.properties.surname MERGE (f:Family {name: after.properties.surname}) MERGE (p)-[:BELONGS_TO]->(f)",
      "neo4j.cypher.topic.deletes": "WITH __value.event.state.before AS before MATCH (p:Person {name: before.properties.name, surname: before.properties.surname}) DETACH DELETE p",
      "neo4j.cypher.bind-timestamp-as": "",
      "neo4j.cypher.bind-header-as": "",
      "neo4j.cypher.bind-key-as": "",
      "neo4j.cypher.bind-value-as": "__value",
      "neo4j.cypher.bind-value-as-event": "false"
    }
    JSON Schema

    {
      "confluent.custom.schema.registry.auto": "true",
      "key.converter": "io.confluent.connect.json.JsonSchemaConverter",
      "value.converter": "io.confluent.connect.json.JsonSchemaConverter",
      "topics": "creates,updates,deletes",
      "neo4j.uri": "neo4j+s://<redacted>.databases.neo4j.io",
      "neo4j.authentication.type": "BASIC",
      "neo4j.authentication.basic.username": "neo4j",
      "neo4j.authentication.basic.password": "<redacted>",
      "neo4j.cypher.topic.creates": "WITH __value.event.state.after AS state MERGE (p:Person {name: state.properties.name, surname: state.properties.surname}) MERGE (f:Family {name: state.properties.surname}) MERGE (p)-[:BELONGS_TO]->(f)",
      "neo4j.cypher.topic.updates": "WITH __value.event.state.before AS before, __value.event.state.after AS after MATCH (p:Person {name: before.properties.name, surname: before.properties.surname}) MATCH (fPre:Family {name: before.properties.surname}) OPTIONAL MATCH (p)-[b:BELONGS_TO]->(fPre) DELETE b WITH after, p SET p.name = after.properties.name, p.surname = after.properties.surname MERGE (f:Family {name: after.properties.surname}) MERGE (p)-[:BELONGS_TO]->(f)",
      "neo4j.cypher.topic.deletes": "WITH __value.event.state.before AS before MATCH (p:Person {name: before.properties.name, surname: before.properties.surname}) DETACH DELETE p",
      "neo4j.cypher.bind-timestamp-as": "",
      "neo4j.cypher.bind-header-as": "",
      "neo4j.cypher.bind-key-as": "",
      "neo4j.cypher.bind-value-as": "__value",
      "neo4j.cypher.bind-value-as-event": "false"
    }
    Protobuf

    {
      "confluent.custom.schema.registry.auto": "true",
      "key.converter": "io.confluent.connect.protobuf.ProtobufConverter",
      "key.converter.optional.for.nullables": true,
      "value.converter": "io.confluent.connect.protobuf.ProtobufConverter",
      "value.converter.optional.for.nullables": true,
      "topics": "creates,updates,deletes",
      "neo4j.uri": "neo4j+s://<redacted>.databases.neo4j.io",
      "neo4j.authentication.type": "BASIC",
      "neo4j.authentication.basic.username": "neo4j",
      "neo4j.authentication.basic.password": "<redacted>",
      "neo4j.cypher.topic.creates": "WITH __value.event.state.after AS state MERGE (p:Person {name: state.properties.name, surname: state.properties.surname}) MERGE (f:Family {name: state.properties.surname}) MERGE (p)-[:BELONGS_TO]->(f)",
      "neo4j.cypher.topic.updates": "WITH __value.event.state.before AS before, __value.event.state.after AS after MATCH (p:Person {name: before.properties.name, surname: before.properties.surname}) MATCH (fPre:Family {name: before.properties.surname}) OPTIONAL MATCH (p)-[b:BELONGS_TO]->(fPre) DELETE b WITH after, p SET p.name = after.properties.name, p.surname = after.properties.surname MERGE (f:Family {name: after.properties.surname}) MERGE (p)-[:BELONGS_TO]->(f)",
      "neo4j.cypher.topic.deletes": "WITH __value.event.state.before AS before MATCH (p:Person {name: before.properties.name, surname: before.properties.surname}) DETACH DELETE p",
      "neo4j.cypher.bind-timestamp-as": "",
      "neo4j.cypher.bind-header-as": "",
      "neo4j.cypher.bind-key-as": "",
      "neo4j.cypher.bind-value-as": "__value",
      "neo4j.cypher.bind-value-as-event": "false"
    }
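
    For readability, this is the Cypher statement bound to the creates topic in each of the configurations above, reformatted. The __value identifier is the message value, bound under that name by the neo4j.cypher.bind-value-as setting:

    WITH __value.event.state.after AS state
    MERGE (p:Person {name: state.properties.name, surname: state.properties.surname})
    MERGE (f:Family {name: state.properties.surname})
    MERGE (p)-[:BELONGS_TO]->(f)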

    Verify that all your configuration options are correct, and click Continue.

  5. At the next screen, we need to add connection endpoints so that our connector can access Neo4j or AuraDB. Extract the hostname and port from your Neo4j connection URI and add it as an endpoint. Remember that the default port for Neo4j connections is 7687. For example, for the connection URI neo4j+s://<redacted>.databases.neo4j.io, we should enter <redacted>.databases.neo4j.io:7687 as the endpoint.

  6. Next, select how many tasks your connector should run with and click Continue.

  7. Finally, name your connector instance, review your settings and click Continue.

  8. The sink instance will be provisioned and shown as Running within a couple of minutes.

Testing It Out

Now you can access your Confluent Cloud cluster and verify that at least the creates topic has been created, as specified in the connector configuration.

With both the source and sink connectors running, the previously created :TestSource nodes result in messages being published to the creates topic by the source instance. These messages are then consumed by the sink instance, which creates the corresponding :Person and :Family nodes inside Neo4j. As you create, update and delete nodes labelled :TestSource, the updates and deletes topics will also be created.

Check that this is the case by executing the following query in Neo4j Browser:

MATCH (n:Person|Family) RETURN n

You can now create, update or delete Person and Family nodes by executing more statements like:

Create a new person
CREATE (:TestSource {name: 'Ann', surname: 'Bolin'});

Verify that a new Person node and a new Family node are created and linked together.
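
A minimal check for this step (the node labels and properties follow the sink Cypher configured above):

MATCH (p:Person {name: 'Ann'})-[:BELONGS_TO]->(f:Family {name: 'Bolin'})
RETURN p, f;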

Update an existing person
MATCH (n:TestSource {name: 'mary', surname: 'doe'}) SET n.surname = 'smith';

Verify that the existing Person node is now updated with a surname of smith and linked to a new Family node.
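
A quick check, once the update event has been processed:

// Expect surname 'smith' and a link to the Family node named 'smith'.
MATCH (p:Person {name: 'mary'})-[:BELONGS_TO]->(f:Family)
RETURN p.surname AS surname, f.name AS family;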

Delete an existing person
MATCH (n:TestSource {name: 'mary', surname: 'smith'}) DELETE n;

Verify that the existing Person node is now deleted.
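
And to confirm the deletion:

// Expect no rows returned.
MATCH (p:Person {name: 'mary', surname: 'smith'}) RETURN p;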

Summary

In this quickstart, we have shown how to configure an AuraDB/Neo4j database to act both as the source of messages for Kafka topics and as the sink for those same messages, creating, updating or deleting nodes and relationships in the database. Typically, the connector is used either as a sink, pulling data from other data sources via Confluent, or as a source from which Confluent pushes data into other databases.

Troubleshooting

  • Make sure you have CDC enabled on your database (see the sketch after this list).

  • Make sure you have set the connection endpoints correctly as specified above.

  • Check logs for the connector.
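
On the first point: for self-managed Neo4j, CDC is enabled per database via Cypher, while on AuraDB it is managed through the Aura console. A minimal sketch for the self-managed case:

// Enable change data capture on the neo4j database.
ALTER DATABASE neo4j SET OPTION txLogEnrichment 'FULL';

// Check the current enrichment mode.
SHOW DATABASES YIELD name, options
RETURN name, options.txLogEnrichment AS cdcMode;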

Note that custom connector logs in Confluent Cloud may not be available immediately. Keep this in mind while testing configuration changes on the connector instance.