Pattern strategy
This strategy lets you extract nodes and relationships from a Kafka message by using extraction patterns.
Configuration
To configure a pattern strategy for a given topic, use the following convention:
"neo4j.pattern.topic.<YOUR_TOPIC>": "<PATTERN>"
For instance, given the following message published to the user and lives_in topics:
{"userId": 1, "name": "John", "surname": "Doe", "address": {"since": "2012-05", "city": "London", "country": "UK"}}
You can transform it into a node by providing the following configuration:
"neo4j.pattern.topic.user": "(:User{!id: userId})"
You can also provide a relationship pattern to transform it into a path, like (n)-[r]->(m), by providing the following configuration:
"neo4j.pattern.topic.lives_in": "(:User{!id: userId})-[:LIVES_IN{since: address.since}]->(:City{!name: address.city, !country: address.country})"
Relationship key properties can only be used with Neo4j Enterprise Edition 5.7 and above, and AuraDB 5.
Creating Sink instance
Based on the above example, you can use one of the following configurations.
Pick one of the message serialization format examples and save it as a file named sink.pattern.neo4j.json in a local directory.
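As an illustration of what such a file might contain, the following Python sketch writes a sink.pattern.neo4j.json with the name/config envelope that the Kafka Connect REST API expects. Everything except the neo4j.pattern.topic.user entry is an assumption made for this sketch: the connector name, the connector class, the converters, the Neo4j URI and the credentials are placeholders to check against your own installation and connector version.

```python
import json
from pathlib import Path


def build_sink_config(name: str = "Neo4jSinkConnectorPattern") -> dict:
    """Return a Kafka Connect payload for a sink that uses the pattern strategy."""
    return {
        "name": name,  # assumed connector name
        "config": {
            # Assumed settings: adjust class, converters, URI and credentials
            # to match your connector version and environment.
            "connector.class": "org.neo4j.connectors.kafka.sink.Neo4jConnector",
            "topics": "user",
            "key.converter": "org.apache.kafka.connect.storage.StringConverter",
            "value.converter": "org.apache.kafka.connect.json.JsonConverter",
            "value.converter.schemas.enable": "false",
            "neo4j.uri": "neo4j://neo4j:7687",
            "neo4j.authentication.type": "BASIC",
            "neo4j.authentication.basic.username": "neo4j",
            "neo4j.authentication.basic.password": "password",
            # Taken from the example above: one pattern per topic.
            "neo4j.pattern.topic.user": "(:User{!id: userId})",
        },
    }


def write_sink_config(path: str = "sink.pattern.neo4j.json") -> Path:
    """Serialize the payload to the file name used in this section."""
    target = Path(path)
    target.write_text(json.dumps(build_sink_config(), indent=2) + "\n", encoding="utf-8")
    return target
```

Calling write_sink_config() produces the file in the current directory.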
Load the configuration into Kafka Connect with this REST call:
curl -X POST http://localhost:8083/connectors \
-H 'Content-Type:application/json' \
-H 'Accept:application/json' \
-d @sink.pattern.neo4j.json
Now you can access your Confluent Control Center instance under http://localhost:9021/clusters.
Verify that the configured connector instance is running in the Connect tab under connect-default.
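If you prefer to check the connector from a script instead of Control Center, the Kafka Connect REST API exposes a status endpoint per connector. The following is a minimal sketch, assuming the same http://localhost:8083 address used above; the connector name passed to fetch_status is whatever name your configuration file declares, and requiring at least one task is a choice made for this sketch.

```python
import json
from urllib.request import urlopen


def connector_is_running(status: dict) -> bool:
    """Return True when the connector and all of its tasks report RUNNING."""
    connector_ok = status.get("connector", {}).get("state") == "RUNNING"
    tasks = status.get("tasks", [])
    # A connector without tasks is not processing records, so require at least one.
    return connector_ok and bool(tasks) and all(t.get("state") == "RUNNING" for t in tasks)


def fetch_status(name: str, base_url: str = "http://localhost:8083") -> dict:
    """Fetch GET /connectors/<name>/status from the Kafka Connect REST API."""
    with urlopen(f"{base_url}/connectors/{name}/status") as response:
        return json.load(response)
```

For example, connector_is_running(fetch_status("<your connector name>")) returns False while a task is still starting or has failed.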
Patterns
Node Patterns
Node patterns are defined similarly to Cypher node patterns.
- Start with (.
- Define an optional list of labels, separated by :, such as :Person or :Person:Employee.
- Open the property section with {.
- Define the properties to be treated as keys, each prefixed with !. At least one key property must be provided. Individual message fields can be explicitly referenced and assigned to a user-defined property in the format userId: __key.user.id. By default, message fields can be referenced as __timestamp, __headers, __key and __value.
- Follow the key properties with one of:
  - Nothing or *, meaning that all properties from the message are assigned to the node.
  - A list of property names to be assigned to the node from the message. Individual message fields can be explicitly referenced and assigned to a user-defined property in the format userName: __value.user.name. By default, message fields can be referenced as __timestamp, __headers, __key and __value.
  - A list of property names not to be assigned to the node, each prefixed with -. All other properties from the message are assigned to the node.
- Close the property section with }.
- End with ).
You cannot mix inclusion and exclusion inside a pattern: its properties must be either all inclusions or all exclusions.
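The rules for the property section can be illustrated with a small checker. This is a simplified sketch written for this page, not the connector's parser: classify_properties is a hypothetical helper that takes only the text between { and }, splits it on commas, and rejects a section that has no key property or that combines named inclusions with exclusions.

```python
def classify_properties(section: str) -> dict:
    """Classify the entries of a node pattern's property section.

    `section` is the text between the braces, for example "!userId, surname".
    """
    keys, included, excluded = [], [], []
    include_all = False
    for raw in section.split(","):
        entry = raw.strip()
        if not entry:
            continue
        if entry.startswith("!"):
            keys.append(entry[1:].strip())      # key property
        elif entry.startswith("-"):
            excluded.append(entry[1:].strip())  # excluded property
        elif entry == "*":
            include_all = True                  # all message properties
        else:
            included.append(entry)              # explicitly included property
    if not keys:
        raise ValueError("a node pattern needs at least one key property")
    if included and excluded:
        raise ValueError("inclusion and exclusion cannot be mixed in one pattern")
    return {
        "keys": keys,
        "included": included,
        "excluded": excluded,
        # With no explicit inclusions or exclusions, everything is assigned.
        "include_all": include_all or not (included or excluded),
    }
```

With this sketch, classify_properties("!userId, -address") is accepted, while classify_properties("!userId, surname, -address") raises a ValueError.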
Examples
- MERGE operation on the User label with userId treated as a key, assigning all properties from the incoming message value to the node:
  (:User{!userId})
  or
  (:User{!userId, *})
- MERGE operation on the User label with userId treated as a key, assigning only the surname property from the incoming message to the node:
  (:User{!userId, surname})
- MERGE operation on the User label with userId treated as a key, assigning only the surname and address.city properties from the incoming message to the node:
  (:User{!userId, surname, city: address.city})
- MERGE operation on the User label with userId treated as a key, assigning all properties except address from the incoming message to the node:
  (:User{!userId, -address})
- MERGE operation on the User label with userId treated as a key taken from the key part of the message, assigning only the name and surname properties from the value part of the incoming message to the node:
  (:User{!userId: __key.id, name: __value.firstName, surname: __value.lastName})
- MERGE operation on the User label with userId treated as a key taken from the key part of the incoming message, assigning only the name and surname properties from the value part, createdBy from the header and createdAt from the message timestamp to the node:
  (:User{!userId: __key.id, name: __value.firstName, surname: __value.lastName, createdBy: __header.username, createdAt: __timestamp})
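To make the explicit references in the last two examples more concrete, here is a minimal sketch of how a dotted reference could be resolved against a message. It is an illustration only: resolve is a hypothetical helper, headers are left out, unprefixed references are assumed to be read from the message value (as in city: address.city), and returning None for a missing field is a choice made for this sketch rather than documented connector behaviour.

```python
def resolve(reference: str, key, value, timestamp=None):
    """Resolve a dotted reference such as "__key.id" or "address.city"."""
    roots = {"__key": key, "__value": value, "__timestamp": timestamp}
    head, _, rest = reference.partition(".")
    if head in roots:
        current, path = roots[head], rest
    else:
        # Assumption: references without a prefix are looked up in the value.
        current, path = value, reference
    for part in filter(None, path.split(".")):
        if not isinstance(current, dict) or part not in current:
            return None  # missing field, as chosen for this sketch
        current = current[part]
    return current


# Properties produced by
# (:User{!userId: __key.id, name: __value.firstName, surname: __value.lastName})
message_key = {"id": 1}
message_value = {"firstName": "John", "lastName": "Doe"}
properties = {
    "userId": resolve("__key.id", message_key, message_value),
    "name": resolve("__value.firstName", message_key, message_value),
    "surname": resolve("__value.lastName", message_key, message_value),
}
```

The message key and value used here are made-up sample data in the shape the pattern expects.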
Relationship Patterns
Relationship patterns are defined similarly to Cypher relationship patterns.
- Node pattern for the start node.
- -[
- Define the relationship type, prefixed with :, such as :BOUGHT or :KNOWS.
- Open the property section with {.
- [Optional] Define the properties to be treated as keys, each prefixed with !. Individual message fields can be explicitly referenced and assigned to a user-defined property in the format relationshipId: __key.relationship.id. By default, message fields can be referenced as __timestamp, __headers, __key and __value.
- Follow the key properties with one of:
  - Nothing or *, meaning that all properties from the message are assigned to the relationship.
  - A list of property names to be assigned to the relationship from the message. Individual message fields can be explicitly referenced and assigned to a user-defined property in the format relationshipType: __value.relationship.type. By default, message fields can be referenced as __timestamp, __headers, __key and __value.
  - A list of property names not to be assigned to the relationship, each prefixed with -. All other properties from the message are assigned to the relationship.
- Close the property section with }.
- ]->
- Node pattern for the end node.
You cannot mix inclusion and exclusion inside a pattern: its properties must be either all inclusions or all exclusions.
Examples
- MERGE operation on the User and Product labels with userId and productId treated as keys respectively, and MERGE a BOUGHT relationship between them, assigning all other message properties to the relationship:
  (:User{!userId})-[:BOUGHT]->(:Product{!productId})
- MERGE operation on the User and Product labels with userId and productId treated as keys respectively, and MERGE a BOUGHT relationship between them, assigning only the price and currency properties from the incoming message to the relationship:
  (:User{!userId})-[:BOUGHT{price,currency}]->(:Product{!productId})
- MERGE operation on the User and Product labels with userId and productId treated as keys respectively, and MERGE a BOUGHT relationship between them, assigning only the price, currency and shippingAddress.city properties from the incoming message to the relationship:
  (:User{!userId})-[:BOUGHT{price,currency,shippingAddress.city}]->(:Product{!productId})
- MERGE operation on the User and Product labels with userId and productId treated as keys respectively, and MERGE a BOUGHT relationship between them, assigning all properties except shippingAddress from the incoming message to the relationship:
  (:User{!userId})-[:BOUGHT{-shippingAddress}]->(:Product{!productId})
- MERGE operation on the User and Product labels with userId and productId treated as keys respectively, assigning the userFirstName and userLastName properties from the message to the User node, and MERGE a BOUGHT relationship between them, assigning only the price and currency properties from the message to the relationship:
  (:User{!userId, userFirstName, userLastName})-[:BOUGHT{price, currency}]->(:Product{!productId})
- MERGE operation on the User and Product labels with userId and productId treated as keys respectively, and MERGE a BOUGHT relationship between them, assigning only transactionId as a key property from the key part of the message and date from the value part of the message to the relationship:
  (:User{!userId})-[:BOUGHT{!transactionId: __key.transaction.id, date: __value.transaction.date}]->(:Product{!productId})
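The first four examples differ only in which message properties end up on the relationship. The sketch below mimics that selection for top-level properties. It is an approximation under stated assumptions: relationship_properties is a hypothetical helper, "all other message properties" is read as everything except the fields used as node keys, and the exclusion case is assumed to drop those node keys as well, which the text above does not state explicitly.

```python
def relationship_properties(value: dict, node_keys=(), include=(), exclude=()) -> dict:
    """Select the message properties that a relationship pattern would assign."""
    if include and exclude:
        raise ValueError("inclusion and exclusion cannot be mixed in one pattern")
    if include:
        # Inclusion list: only the named properties are assigned.
        return {name: value[name] for name in include if name in value}
    # No list or an exclusion list: start from everything except the node keys
    # (assumption), then drop the excluded names.
    return {
        name: prop
        for name, prop in value.items()
        if name not in node_keys and name not in exclude
    }


# Made-up sample message for the BOUGHT examples.
purchase = {
    "userId": 1,
    "productId": 2,
    "price": 10,
    "currency": "EUR",
    "shippingAddress": {"city": "London"},
}
all_props = relationship_properties(purchase, node_keys=("userId", "productId"))
only_price = relationship_properties(purchase, include=("price", "currency"))
no_address = relationship_properties(
    purchase, node_keys=("userId", "productId"), exclude=("shippingAddress",)
)
```

Here only_price and no_address both contain just price and currency, while all_props also carries shippingAddress.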
Tombstone records
The pattern strategy supports tombstone records.
To use them, the message key must contain at least the key properties present in the provided pattern, and the message value must be null.
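The two conditions for a tombstone can be expressed as a small check. This is a sketch for illustration, not connector code: is_usable_tombstone is a hypothetical helper, the key is assumed to be already deserialized into a dictionary, and key_fields stands for the message fields that feed the key properties of your pattern.

```python
def is_usable_tombstone(key, value, key_fields) -> bool:
    """Return True when a record has a null value and a key with all key fields."""
    if value is not None:
        return False  # a tombstone carries no value
    if not isinstance(key, dict):
        return False  # assumption: structured key, so fields can be looked up
    return all(field in key for field in key_fields)
```

For a pattern such as (:User{!userId}), a record with key {"userId": 1} and a null value passes the check, while the same key with any non-null value does not.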
It is not possible to define multiple patterns for a single topic, for example to extract more than one node or relationship type from a single message. To achieve this, use a different topic for each pattern.