CUD File Format Strategy
The CUD file format is a JSON file that represents operations (Create, Update, Delete) performed on nodes or relationships.
Configuration
You can configure the topic in the following way:
"neo4j.cud.topics": "<COMMA_SEPARATED_LIST_OF_TOPICS>"
For instance, if you have two topics, topic.1
and topic.2
that you publish as CUD-formatted messages, you can configure the sink instance as follows.
"neo4j.cud.topics": "topic.1,topic.2"
Creating Sink instance
Based on the above example, you can use one of the following configurations.
Pick one of the message serialization format examples and save it as a file named sink.cud.neo4j.json
into a local directory.
Load the configuration into the Kafka Connect with this REST call:
curl -X POST http://localhost:8083/connectors \
-H 'Content-Type:application/json' \
-H 'Accept:application/json' \
-d @sink.cud.neo4j.json
Now you can access your Confluent Control Center instance under http://localhost:9021/clusters
.
Verify that the configured connector instance is running in the Connect
tab under connect-default
.
CUD Format Specification
We have two formats:
-
Node
-
Relationship
Node
The Node format defines a set of fields which describes the operation (create
, update
, merge
and delete
), label and properties of a node entity.
Field | Mandatory | Description | ||
---|---|---|---|---|
op |
Mandatory |
The operation type.
One of
|
||
properties |
Optional when operation is |
The properties attached to the node. |
||
ids |
Optional when operation is |
Contains the primary/unique key properties that will be used to look up the entity.
In case you use
|
||
labels |
Optional |
The labels attached to the node.
|
||
type |
Mandatory |
The entity type: |
||
detach |
Optional |
When operation is
|
Examples
-
An example of a
CREATE
operation;{ "type": "node", "op": "create", "labels": ["Foo", "Bar"], "properties": { "id": 1, "foo": "foo-value" } }
which would be transformed into the following Cypher query:
CREATE (n:Foo:Bar) SET n = $properties
-
An example of a
UPDATE
operation;{ "type": "node", "op": "update", "labels": ["Foo", "Bar"], "ids": { "id": 0 }, "properties": { "id": 1, "foo": "foo-value" } }
which would be transformed into the following Cypher query:
MATCH (n:Foo:Bar {id: $ids.id}) SET n += $properties
-
An example of a
MERGE
operation;{ "type": "node", "op": "merge", "labels": ["Foo", "Bar"], "ids": { "id": 0 }, "properties": { "id": 1, "foo": "foo-value" } }
which would be transformed into the following Cypher query:
MERGE (n:Foo:Bar {id: $ids.id}) SET n += $properties
-
An example of a
DELETE
operation;{ "type": "NODE", "op": "delete", "labels": ["Foo", "Bar"], "ids": { "id": 0 } }
which would be transformed into the following Cypher query:
MATCH (n:Foo:Bar {id: $ids.id}) DELETE n
-
An example of a
DELETE
operation with detachtrue
;{ "type": "NODE", "op": "delete", "labels": ["Foo", "Bar"], "ids": { "id": 0 }, "detach": true }
which would be transformed into the following Cypher query:
MATCH (n:Foo:Bar {id: $ids.id}) DETACH DELETE n
Relationship
Relationship format defines a set of fields which describes the operation (create
, update
, merge
and delete
), relationship type, source and target node references and properties.
Field | Mandatory | Description | ||
---|---|---|---|---|
op |
Mandatory |
The operation type. One of |
||
properties |
Optional when operation is |
The properties attached to the relationship. |
||
rel_type |
Mandatory |
The relationship type. |
||
ids |
Optional |
Contains the primary/unique key properties that will be used to look up the relationship. |
||
from |
Mandatory |
Contains information about the source node of the relationship.
|
||
to |
Mandatory |
Contains information about the target node of the relationship.
|
||
type |
Mandatory |
The entity type: |
Examples
-
An example of a
CREATE
operation;{ "type": "relationship", "op": "create", "rel_type": "RELATED_TO", "from": { "labels": ["Foo"], "ids": { "id": 0 } }, "to": { "labels": ["Bar"], "ids": { "id": 1 } }, "properties": { "by": "incident" } }
which would be transformed into the following Cypher query:
MATCH (start:Foo {id: $from.ids.id}) WITH start MATCH (end:Bar {id: $to.ids.id}) WITH start, end CREATE (start)-[r:RELATED_TO]->(end) SET r = $properties
-
An example of a
CREATE
operation withmerging
the source node;{ "type": "relationship", "op": "create", "rel_type": "RELATED_TO", "from": { "labels": ["Foo"], "ids": { "id": 0 }, "op": "merge" }, "to": { "labels": ["Bar"], "ids": { "id": 1 }, "op": "match" }, "properties": { "by": "incident" } }
which would be transformed into the following Cypher query:
MERGE (start:Foo {id: $from.ids.id}) WITH start MATCH (end:Bar {id: $to.ids.id}) WITH start, end CREATE (start)-[r:RELATED_TO]->(end) SET r = $properties
-
An example of a
UPDATE
operation;{ "type": "relationship", "op": "update", "rel_type": "RELATED_TO", "from": { "labels": ["Foo"], "ids": { "id": 0 } }, "to": { "labels": ["Bar"], "ids": { "id": 1 }, "op": "merge" }, "properties": { "by": "incident" } }
which would be transformed into the following Cypher query:
MATCH (start:Foo {id: $from.ids.id}) WITH start MERGE (end:Bar {id: $to.ids.id}) WITH start, end MATCH (start)-[r:RELATED_TO]->(end) SET r += $properties
-
An example of a
UPDATE
operation withrelationship ids
;{ "type": "relationship", "op": "update", "rel_type": "RELATED_TO", "from": { "labels": ["Foo"], "ids": { "id": 0 } }, "to": { "labels": ["Bar"], "ids": { "id": 1 }, "op": "merge" }, "ids": { "id": 5 }, "properties": { "by": "incident" } }
which would be transformed into the following Cypher query:
MATCH (start:Foo {id: $from.ids.id}) WITH start MERGE (end:Bar {id: $to.ids.id}) WITH start, end MATCH (start)-[r:RELATED_TO {id: $ids.id}]->(end) SET r += $properties
-
An example of a
MERGE
operation;{ "type": "relationship", "op": "merge", "rel_type": "RELATED_TO", "from": { "labels": ["Foo"], "ids": { "id": 0 } }, "to": { "labels": ["Bar"], "ids": { "id": 1 }, "op": "merge" }, "properties": { "by": "incident" } }
which would be transformed into the following Cypher query:
MATCH (start:Foo {id: $from.ids.id}) WITH start MERGE (end:Bar {id: $to.ids.id}) WITH start, end MERGE (start)-[r:RELATED_TO]->(end) SET r += $properties
-
An example of a
MERGE
operation withrelationship ids
;{ "type": "relationship", "op": "MERGE", "rel_type": "RELATED_TO", "from": { "labels": ["Foo"], "ids": { "id": 0 } }, "to": { "labels": ["Bar"], "ids": { "id": 1 }, "op": "merge" }, "ids": { "id": 5 }, "properties": { "by": "incident" } }
which would be transformed into the following Cypher query:
MATCH (start:Foo {id: $from.ids.id}) WITH start MERGE (end:Bar {id: to.ids.id}) WITH start, end MERGE (start)-[r:RELATED_TO {id: $ids.id}]->(end) SET r += $properties
-
An example of a
DELETE
operation;{ "type": "relationship", "op": "delete", "rel_type": "RELATED_TO", "from": { "labels": ["Foo"], "ids": { "id": 0 } }, "to": { "labels": ["Bar"], "ids": { "id": 1 } } }
which would be transformed into the following Cypher query:
MATCH (start:Foo {id: $from.ids.id}) WITH start MATCH (end:Bar {id: $to.ids.id}) WITH start, end MATCH (start)-[r:RELATED_TO]->(end) DELETE r
-
An example of a
DELETE
operation withrelationship ids
;{ "type": "relationship", "op": "DELETE", "rel_type": "RELATED_TO", "from": { "labels": ["Foo"], "ids": { "id": 0 } }, "to": { "labels": ["Bar"], "ids": { "id": 1 } }, "ids": { "id": 5 } }
which would be transformed into the following Cypher query:
MATCH (start:Foo {id: $from.ids.id}) WITH start MATCH (end:Bar {id: $to.ids.id}) WITH start, end MATCH (start)-[r:RELATED_TO {id: $ids.id}]->(end) DELETE r