Change Data Capture Strategy
This strategy allows you to ingest CDC events coming from another Neo4j instance, generated either by a source connector instance configured with the Change Data Capture strategy or by the deprecated Neo4j Streams plugin.
Change Data Capture events need to be generated by the corresponding version of the Source connector, which must be configured with a value converter that supports schemas.
Two sub-strategies are available:
- The Schema strategy merges nodes and relationships by the constraints (node key, relationship key and/or property uniqueness + existence) defined in the source database.
- The Source ID strategy merges nodes and relationships by the CDC event’s elementId or id fields (internal Neo4j entity identifier).
Schema sub-strategy
The Schema strategy merges nodes and relationships using the constraints declared in the change event, thus preserving the source schema structure.
Configuring this strategy requires declaring the list of topics to read change events from.
"neo4j.cdc.schema.topics": "<COMMA_SEPARATED_LIST_OF_TOPICS>"
Example
Given that you configure the topics your sink connector subscribes to as follows:
"topics": "topic.1,topic.2"
You need to declare that you want to use the cdc.schema strategy by providing the list of topics you want to consume change events from.
"neo4j.cdc.schema.topics": "topic.1,topic.2"
Each change event will then be projected into a graph entity.
Consider this node creation event:
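The exact payload depends on your source connector version and the serialization format in use; the following is a simplified, illustrative JSON sketch with placeholder values rather than a verbatim event, showing the labels, keys, and state fields that this strategy relies on.
{
  "id": "<change-identifier>",
  "event": {
    "eventType": "n",
    "operation": "c",
    "elementId": "<element-id-of-the-node>",
    "labels": ["Person"],
    "keys": {
      "Person": [{ "name": "Jane", "surname": "Doe" }]
    },
    "state": {
      "before": null,
      "after": {
        "labels": ["Person"],
        "properties": { "name": "Jane", "surname": "Doe" }
      }
    }
  }
}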
The node is then persisted, with the Sink connector using the keys or schema fields to insert or update the node, without the need for extra properties or labels.
Consider this relationship creation event:
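As before, the following is a simplified, illustrative JSON sketch with placeholder values rather than a verbatim event; note the keys carried for the start and end nodes.
{
  "id": "<change-identifier>",
  "event": {
    "eventType": "r",
    "operation": "c",
    "elementId": "<element-id-of-the-relationship>",
    "type": "KNOWS",
    "start": {
      "elementId": "<element-id-of-the-start-node>",
      "labels": ["Person"],
      "keys": { "Person": [{ "name": "Jane", "surname": "Doe" }] }
    },
    "end": {
      "elementId": "<element-id-of-the-end-node>",
      "labels": ["Person"],
      "keys": { "Person": [{ "name": "John", "surname": "Doe" }] }
    },
    "state": {
      "before": null,
      "after": {
        "properties": { "since": 2012 }
      }
    }
  }
}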
The relationship is then persisted, with the Sink connector using the keys fields of the start and end nodes from the change event to create or update the relationship, again without the need for extra properties or labels.
Creating the Sink instance
Based on the above example, create a Sink connector configuration for your chosen message serialization format and save it as a file named sink.cdc.schema.neo4j.json in a local directory.
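As a rough illustration of what such a file could contain when using JSON serialization (the connector class, connection, and converter settings below are assumptions to adapt to your environment and connector version; only the neo4j.cdc.schema.topics entry is specific to this strategy):
{
  "name": "Neo4jSinkConnectorCdcSchema",
  "config": {
    "connector.class": "org.neo4j.connectors.kafka.sink.Neo4jConnector",
    "topics": "topic.1,topic.2",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": "true",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "true",
    "neo4j.uri": "<neo4j-uri>",
    "neo4j.authentication.type": "BASIC",
    "neo4j.authentication.basic.username": "<username>",
    "neo4j.authentication.basic.password": "<password>",
    "neo4j.cdc.schema.topics": "topic.1,topic.2"
  }
}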
Load the configuration into Kafka Connect with this REST call:
curl -X POST http://localhost:8083/connectors \
-H 'Content-Type:application/json' \
-H 'Accept:application/json' \
-d @sink.cdc.schema.neo4j.json
Now you can access your Confluent Control Center instance under http://localhost:9021/clusters.
Verify that the configured connector instance is running in the Connect tab under connect-default.
Source ID sub-strategy
The Source ID strategy merges nodes and relationships by the source entity’s elementId or id values, storing that value as an explicit property on the target nodes and relationships and marking nodes with an explicit label.
The configuration of this strategy requires declaring the list of topics to read change events from.
You can add an optional label name to use as a marker, and an optional property name to store the elementId or id values of the source entities.
"neo4j.cdc.source-id.topics": "<comma-separated list of topics>"
"neo4j.cdc.source-id.label-name": "<the label attached to the node, default=SourceEvent>"
"neo4j.cdc.source-id.property-name": "<the property name given to the CDC id field, default=sourceId>"
Example
Given that you configure the topics your sink connector subscribes to as follows:
"topics": "topic.1,topic.2"
You need to declare that you want to use the cdc.source-id strategy by providing the list of topics you want to consume change events from.
"neo4j.cdc.source-id.topics": "topic.1,topic.2"
Each change event will then be projected into a graph entity.
Consider a node creation event like the one shown in the Schema example above.
The node is then persisted, with the Sink connector using the elementId or id fields of the node change event to create or update the node.
Now consider a relationship creation event like the one shown above.
The relationship is then persisted, with the Sink connector using the elementId or id fields of the start and end nodes from the change event to create or update the relationship.
Creating the Sink instance
Based on the above example, create a Sink connector configuration for your chosen message serialization format and save it as a file named sink.cdc.source-id.neo4j.json in a local directory.
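As with the Schema strategy, the following is only an illustrative sketch assuming JSON serialization; the connector class, connection, and converter settings are placeholders to adapt to your environment, while the neo4j.cdc.source-id.* entries configure this strategy (the label and property names shown are the defaults):
{
  "name": "Neo4jSinkConnectorCdcSourceId",
  "config": {
    "connector.class": "org.neo4j.connectors.kafka.sink.Neo4jConnector",
    "topics": "topic.1,topic.2",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": "true",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "true",
    "neo4j.uri": "<neo4j-uri>",
    "neo4j.authentication.type": "BASIC",
    "neo4j.authentication.basic.username": "<username>",
    "neo4j.authentication.basic.password": "<password>",
    "neo4j.cdc.source-id.topics": "topic.1,topic.2",
    "neo4j.cdc.source-id.label-name": "SourceEvent",
    "neo4j.cdc.source-id.property-name": "sourceId"
  }
}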
Load the configuration into Kafka Connect with this REST call:
curl -X POST http://localhost:8083/connectors \
-H 'Content-Type:application/json' \
-H 'Accept:application/json' \
-d @sink.cdc.source-id.neo4j.json
Now you can access your Confluent Control Center instance under http://localhost:9021/clusters.
Verify that the configured connector instance is running in the Connect tab under connect-default.