Query strategy
Query strategy lets you define your own Cypher query to extract changes. This requires suitable schema modifications: for example, a dedicated change-tracking property such as a timestamp on nodes or relationships, or soft deletes to track the deletion of entities.
Configuration
First, select the QUERY strategy for the connector instance:
"neo4j.source-strategy": "QUERY"
Second, define the query that tracks changes and the topic to publish them to:
"neo4j.query.topic": "my-topic", (1)
"neo4j.query": "MATCH (ts:TestSource) WHERE ts.timestamp > $lastCheck RETURN ts.name AS name, ts.surname AS surname, ts.timestamp AS timestamp", (2)
"neo4j.query.streaming-property": "timestamp" (3)
(1) Name of the topic that receives the messages.
(2) A Cypher query that returns the entities changed since the last iteration. The point reached by the previous iteration is passed in as the $lastCheck parameter.
(3) The property (field name) used as a cursor to track changes. It must be part of the returned results.
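To illustrate why the streaming property has to be part of the returned results, here is a simplified Python model of the polling loop. It is a sketch under stated assumptions, not the connector's implementation: an in-memory list stands in for the :TestSource nodes, the hypothetical `run_query` mimics the Cypher query above, and the hypothetical `poll` advances the cursor to the largest `timestamp` it has seen and feeds it back as `$lastCheck` on the next iteration.

```python
from dataclasses import dataclass


# Hypothetical in-memory stand-in for a :TestSource node whose `timestamp`
# property is updated on every write (the change-tracking property).
@dataclass
class TestSource:
    name: str
    surname: str
    timestamp: int


def run_query(nodes, last_check):
    """Mimics: MATCH (ts:TestSource) WHERE ts.timestamp > $lastCheck
    RETURN ts.name AS name, ts.surname AS surname, ts.timestamp AS timestamp"""
    return [
        {"name": n.name, "surname": n.surname, "timestamp": n.timestamp}
        for n in nodes
        if n.timestamp > last_check
    ]


def poll(nodes, last_check, streaming_property="timestamp"):
    """One polling iteration: returns the changed rows and the next cursor."""
    rows = run_query(nodes, last_check)
    # The cursor moves to the largest streaming-property value in the results;
    # this only works because the query returns that property.
    next_check = max((row[streaming_property] for row in rows), default=last_check)
    return rows, next_check


if __name__ == "__main__":
    nodes = [TestSource("Ada", "Lovelace", 100), TestSource("Alan", "Turing", 200)]
    rows, cursor = poll(nodes, last_check=0)  # both rows, cursor == 200
    nodes.append(TestSource("Grace", "Hopper", 300))
    rows, cursor = poll(nodes, cursor)  # only Grace, cursor == 300
    rows, cursor = poll(nodes, cursor)  # no rows, cursor stays at 300
```

Because the query compares with a strict `>`, a later write that reuses a timestamp equal to the current cursor would be skipped in this model, which is why a monotonically increasing change-tracking property matters.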
Creating a source instance
Based on the above example, you can use one of the following configurations.
Pick one of the message serialization format examples and save it to a local directory as a file named source.query.neo4j.json.
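As one possible starting point, the following Python sketch writes a source.query.neo4j.json file that uses Kafka Connect's built-in schemaless JsonConverter. Only the four query-strategy settings come from the configuration above. The connector class, the connection and authentication keys, the placeholder credentials, and the connector name Neo4jQuerySource are assumptions to verify against the configuration reference for your connector version.

```python
import json
from pathlib import Path

# Query-strategy settings taken from the configuration above.
QUERY_SETTINGS = {
    "neo4j.source-strategy": "QUERY",
    "neo4j.query.topic": "my-topic",
    "neo4j.query": (
        "MATCH (ts:TestSource) WHERE ts.timestamp > $lastCheck "
        "RETURN ts.name AS name, ts.surname AS surname, ts.timestamp AS timestamp"
    ),
    "neo4j.query.streaming-property": "timestamp",
}

# ASSUMPTIONS: connector class, connection/authentication keys and the
# placeholder credentials must be checked against your connector version.
ASSUMED_SETTINGS = {
    "connector.class": "org.neo4j.connectors.kafka.source.Neo4jConnector",
    "neo4j.uri": "neo4j://localhost:7687",
    "neo4j.authentication.type": "BASIC",
    "neo4j.authentication.basic.username": "neo4j",
    "neo4j.authentication.basic.password": "password",
    # Schemaless JSON serialization with Kafka Connect's built-in JsonConverter.
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": "false",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",
}


def write_connector_config(path, name="Neo4jQuerySource"):
    """Writes the {"name": ..., "config": {...}} body accepted by POST /connectors."""
    payload = {"name": name, "config": {**ASSUMED_SETTINGS, **QUERY_SETTINGS}}
    Path(path).write_text(json.dumps(payload, indent=2), encoding="utf-8")
    return payload


if __name__ == "__main__":
    write_connector_config("source.query.neo4j.json")
```

Keeping the query-strategy settings separate from the assumed connection settings makes it easy to swap in another serialization format without touching the query.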
We will now create the source instance by invoking the following REST call:
curl -X POST http://localhost:8083/connectors \
-H "Content-Type:application/json" \
-H "Accept:application/json" \
-d @source.query.neo4j.json
This will create a Kafka Connect source instance that sends change event messages derived from the provided query to the my-topic topic, using your preferred serialization format.
In Control Center, confirm that the source connector has been created in the Connect tab, under connect-default.
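Without Control Center, the same check can be made through the Kafka Connect REST API endpoint GET /connectors/<name>/status. The Python sketch below uses only the standard library; the connector name Neo4jQuerySource is hypothetical (use the "name" value from your source.query.neo4j.json), and the localhost:8083 address matches the curl call above.

```python
import json
import urllib.parse
import urllib.request


def fetch_status(name, connect_url="http://localhost:8083"):
    """Calls GET /connectors/<name>/status on the Kafka Connect REST API."""
    url = f"{connect_url}/connectors/{urllib.parse.quote(name)}/status"
    with urllib.request.urlopen(url) as resp:
        return json.load(resp)


def is_running(status):
    """True when the connector and every one of its tasks report RUNNING."""
    tasks = status.get("tasks", [])
    return (
        status.get("connector", {}).get("state") == "RUNNING"
        and len(tasks) > 0
        and all(task.get("state") == "RUNNING" for task in tasks)
    )


if __name__ == "__main__":
    # Hypothetical connector name; replace with your own.
    print(is_running(fetch_status("Neo4jQuerySource")))
```

Requiring at least one task is deliberately conservative: right after creation the task list may still be empty, so the check can report False for a moment before the connector settles.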
Generated change event messages in this case will have the following structure:
{"name": <name>, "surname": <surname>, "timestamp": <timestamp>}
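On the consuming side, a message value with this structure can be decoded with a few lines of Python. This sketch assumes JSON serialization and an integer timestamp (for example epoch milliseconds); `ChangeEvent` and `parse_change_event` are hypothetical names. It also unwraps the {"schema": ..., "payload": ...} envelope that Kafka Connect's JsonConverter adds when schemas are enabled.

```python
import json
from dataclasses import dataclass


@dataclass(frozen=True)
class ChangeEvent:
    name: str
    surname: str
    timestamp: int  # assumed integer, e.g. epoch milliseconds


def parse_change_event(raw: bytes) -> ChangeEvent:
    """Decodes one JSON message value into a ChangeEvent."""
    value = json.loads(raw)
    # JsonConverter with schemas enabled wraps the value in a
    # {"schema": ..., "payload": ...} envelope; unwrap it if present.
    if isinstance(value, dict) and set(value) == {"schema", "payload"}:
        value = value["payload"]
    return ChangeEvent(value["name"], value["surname"], value["timestamp"])
```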