Cypher Strategy
This strategy executes the configured Cypher statement for each message received on a topic.
To configure the Cypher strategy for a given topic, use the following property naming convention:
"neo4j.cypher.topic.<YOUR_TOPIC>": "<YOUR_CYPHER_QUERY>"
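For instance, a sink configuration consuming a hypothetical people topic (the topic name and query below are illustrative, not part of the example that follows) would carry a property pair such as:

```json
{
  "topics": "people",
  "neo4j.cypher.topic.people": "MERGE (p:Person {name: __value.name})"
}
```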
Starting with version 5.1.0 of the Neo4j Connector for Kafka, the Cypher strategy binds the header, key, and value of each message as __header, __key, and __value, respectively.
Example
Given that you configure the topics your sink connector subscribes to within the sink configuration settings as follows:
"topics": "creates,updates,deletes"
you need to declare the Cypher strategy and provide the corresponding Cypher statement for each topic, similar to the following:
"topics": "creates,updates,deletes",
"neo4j.cypher.topic.creates": "WITH __value.event.state.after AS state MERGE (p:Person {name: state.properties.name, surname: state.properties.surname}) MERGE (f:Family {name: state.properties.surname}) MERGE (p)-[:BELONGS_TO]->(f)",
"neo4j.cypher.topic.updates": "WITH __value.event.state.before AS before, __value.event.state.after AS after MATCH (p:Person {name: before.properties.name, surname: before.properties.surname}) MATCH (fPre:Family {name: before.properties.surname}) OPTIONAL MATCH (p)-[b:BELONGS_TO]->(fPre) DELETE b WITH after, p SET p.name = after.properties.name, p.surname = after.properties.surname MERGE (f:Family {name: after.properties.surname}) MERGE (p)-[:BELONGS_TO]->(f)",
"neo4j.cypher.topic.deletes": "WITH __value.event.state.before AS before MATCH (p:Person {name: before.properties.name, surname: before.properties.surname}) DETACH DELETE p"
The above configuration excerpt defines that:
- messages received from the creates topic will be unpacked by the Sink connector into Neo4j with the following Cypher query: WITH __value.event.state.after AS state MERGE (p:Person {name: state.properties.name, surname: state.properties.surname}) MERGE (f:Family {name: state.properties.surname}) MERGE (p)-[:BELONGS_TO]->(f)
- messages received from the updates topic will be unpacked by the Sink connector into Neo4j with the following Cypher query: WITH __value.event.state.before AS before, __value.event.state.after AS after MATCH (p:Person {name: before.properties.name, surname: before.properties.surname}) MATCH (fPre:Family {name: before.properties.surname}) OPTIONAL MATCH (p)-[b:BELONGS_TO]->(fPre) DELETE b WITH after, p SET p.name = after.properties.name, p.surname = after.properties.surname MERGE (f:Family {name: after.properties.surname}) MERGE (p)-[:BELONGS_TO]->(f)
- messages received from the deletes topic will be unpacked by the Sink connector into Neo4j with the following Cypher query: WITH __value.event.state.before AS before MATCH (p:Person {name: before.properties.name, surname: before.properties.surname}) DETACH DELETE p
Creating the Sink instance
Based on the above example, you can use one of the following configurations. Pick one of the message serialization format examples and save it as a file named sink.cypher.neo4j.json in a local directory.
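The per-serialization-format variants are not reproduced here; as a minimal sketch of such a file, assuming JSON serialization, the 5.x connector class, and local connection credentials (the name, connector.class, converter, and neo4j.* values below are illustrative assumptions, not verbatim from this page), the payload sent to Kafka Connect might look like:

```json
{
  "name": "sink-cypher-neo4j",
  "config": {
    "connector.class": "org.neo4j.connectors.kafka.sink.Neo4jConnector",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "neo4j.uri": "neo4j://localhost:7687",
    "neo4j.authentication.type": "BASIC",
    "neo4j.authentication.basic.username": "neo4j",
    "neo4j.authentication.basic.password": "password",
    "topics": "creates,updates,deletes",
    "neo4j.cypher.topic.creates": "WITH __value.event.state.after AS state MERGE (p:Person {name: state.properties.name, surname: state.properties.surname}) MERGE (f:Family {name: state.properties.surname}) MERGE (p)-[:BELONGS_TO]->(f)",
    "neo4j.cypher.topic.updates": "WITH __value.event.state.before AS before, __value.event.state.after AS after MATCH (p:Person {name: before.properties.name, surname: before.properties.surname}) MATCH (fPre:Family {name: before.properties.surname}) OPTIONAL MATCH (p)-[b:BELONGS_TO]->(fPre) DELETE b WITH after, p SET p.name = after.properties.name, p.surname = after.properties.surname MERGE (f:Family {name: after.properties.surname}) MERGE (p)-[:BELONGS_TO]->(f)",
    "neo4j.cypher.topic.deletes": "WITH __value.event.state.before AS before MATCH (p:Person {name: before.properties.name, surname: before.properties.surname}) DETACH DELETE p"
  }
}
```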
Load the configuration into Kafka Connect with this REST call:
curl -X POST http://localhost:8083/connectors \
-H 'Content-Type:application/json' \
-H 'Accept:application/json' \
-d @sink.cypher.neo4j.json
Now you can access your Confluent Control Center instance under http://localhost:9021/clusters. Verify that the configured connector instance is running in the Connect tab, under connect-default.
Under the hood, the connector creates a batch of changes for each topic and executes the configured query once per batch, prepending an UNWIND clause so that every message in the batch is processed within a single query execution.
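Conceptually, the batched execution for the creates topic resembles the following sketch. The parameter name and the rewriting of the __value binding are illustrative assumptions; the actual rewriting is internal to the connector.

```cypher
// Sketch: the batch of messages is passed as a single parameter and
// unwound, so the configured query runs once per message in one execution.
UNWIND $events AS event
WITH event.value.event.state.after AS state
MERGE (p:Person {name: state.properties.name, surname: state.properties.surname})
MERGE (f:Family {name: state.properties.surname})
MERGE (p)-[:BELONGS_TO]->(f)
```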