Neo4j Streams - Sink: Kafka → Neo4j
The Kafka Connect Neo4j Connector is the recommended method to integrate Kafka with Neo4j, as Neo4j Streams is no longer under active development and will not be supported after version 4.4 of Neo4j. The most recent version of the Kafka Connect Neo4j Connector can be found here.
The Kafka Sink ingests data directly into Neo4j.
How it works
It works in several ways:
-
by providing a Cypher template
-
by ingesting the events emitted from another Neo4j instance via the Change Data Capture module
-
by providing a pattern extraction to a JSON or AVRO file
-
by managing a CUD file format
The Cypher Template strategy is the only Sink strategy that guarantees messages to be processed in the same order as they arrive in a topic. Other Sink strategies group messages together by type of operation, which can also be optimised into batches. In this case, the execution order is the following:
|
Cypher Template
It works with template Cypher queries stored into properties with the following format:
streams.sink.topic.cypher.<TOPIC_NAME>=<CYPHER_QUERY>
Each Cypher template must refer to an event object that will be injected by the Sink.
Here is an example.
For this event
{
"id": 42,
"properties": {
"title": "Answer to anything",
"description": "It depends."
}
}
streams.sink.topic.cypher.my-topic=MERGE (n:Label {id: event.id}) \
ON CREATE SET n += event.properties
Under the hood the Sink injects the event object as a parameter in this way:
UNWIND {events} AS event
MERGE (n:Label {id: event.id})
ON CREATE SET n += event.properties
where {events} is a JSON list. Continuing with the example above, a possible full representation could be:
:params events => [{id:"alice@example.com",properties:{name:"Alice",age:32}},
{id:"bob@example.com",properties:{name:"Bob",age:42}}]
UNWIND {events} AS event
MERGE (n:Label {id: event.id})
ON CREATE SET n += event.properties
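The batching described above can be sketched in Python. This is only an illustration under stated assumptions, not the plugin's actual implementation: `build_sink_query` and `build_parameters` are hypothetical helper names, and the query is written with the `$events` parameter syntax where the text above writes `{events}`.

```python
import json


def build_sink_query(template: str) -> str:
    """Prefix a configured Cypher template with the UNWIND clause (hypothetical helper)."""
    return "UNWIND $events AS event\n" + template


def build_parameters(raw_messages):
    """Decode a batch of JSON message values into the `events` list parameter."""
    return {"events": [json.loads(raw) for raw in raw_messages]}


# Template as it would be configured for the topic in the example above.
template = "MERGE (n:Label {id: event.id})\nON CREATE SET n += event.properties"

# A batch of two raw Kafka message values (JSON-encoded bytes).
batch = [
    b'{"id": "alice@example.com", "properties": {"name": "Alice", "age": 32}}',
    b'{"id": "bob@example.com", "properties": {"name": "Bob", "age": 42}}',
]

query = build_sink_query(template)
params = build_parameters(batch)
print(query)
print(params)
```

The whole batch travels as a single list parameter, so one query execution handles every message in the batch.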
When you use the Cypher template as the Sink strategy to import data from Kafka into Neo4j, you must make sure the query is correct. If the query is not optimized, it can also cause performance issues, or situations where the plugin seems to be stuck, for example if the query loads a large number of nodes and relationships into memory. We suggest the following:
|
Change Data Capture Event
This method allows you to ingest CDC events coming from another Neo4j instance. You can use two strategies:
-
The SourceId strategy, which merges the nodes/relationships by the CDC event id field (it’s related to the Neo4j physical ID)
-
The Schema strategy, which merges the nodes/relationships by the constraints (UNIQUENESS, NODE_KEY) defined in your graph model
The SourceId strategy
You can configure the topic in the following way:
streams.sink.topic.cdc.sourceId=<list of topics separated by semicolon>
streams.sink.topic.cdc.sourceId.labelName=<the label attached to the node, default=SourceEvent>
streams.sink.topic.cdc.sourceId.idName=<the id name given to the CDC id field, default=sourceId>
streams.sink.topic.cdc.sourceId=my-topic;my-other.topic
Each streams event will be projected into the related graph entity, for instance the following event:
{
"meta": {
"timestamp": 1532597182604,
"username": "neo4j",
"tx_id": 3,
"tx_event_id": 0,
"tx_events_count": 2,
"operation": "created",
"source": {
"hostname": "neo4j.mycompany.com"
}
},
"payload": {
"id": "1004",
"type": "node",
"after": {
"labels": ["Person"],
"properties": {
"email": "annek@noanswer.org",
"last_name": "Kretchmar",
"first_name": "Anne Marie"
}
}
},
"schema": {
"properties": {
"last_name": "String",
"email": "String",
"first_name": "String"
},
"constraints": [{
"label": "Person",
"properties": ["first_name", "last_name"],
"type": "UNIQUE"
}]
}
}
will be persisted as the following node:
Person:SourceEvent{first_name: "Anne Marie", last_name: "Kretchmar", email: "annek@noanswer.org", sourceId: "1004"}
As you can see, the ingested event has been projected with two peculiarities:
-
the id field has been transformed into sourceId;
-
the node has an additional label SourceEvent;
these two fields will be used to match the node/relationship for future updates/deletes.
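The projection described above can be sketched in Python. This is an illustration only: `project_source_id` is a hypothetical function, and its two keyword arguments simply mirror the documented defaults of `streams.sink.topic.cdc.sourceId.labelName` and `streams.sink.topic.cdc.sourceId.idName`.

```python
def project_source_id(event, label_name="SourceEvent", id_name="sourceId"):
    """Return (labels, properties) for a CDC node event, SourceId style.

    Hypothetical helper: adds the extra label and stores the CDC id
    under the configured property name.
    """
    payload = event["payload"]
    after = payload["after"]
    labels = list(after["labels"]) + [label_name]
    properties = dict(after["properties"])
    properties[id_name] = payload["id"]
    return labels, properties


# Only the parts of the CDC event that the projection needs.
event = {
    "payload": {
        "id": "1004",
        "type": "node",
        "after": {
            "labels": ["Person"],
            "properties": {
                "email": "annek@noanswer.org",
                "last_name": "Kretchmar",
                "first_name": "Anne Marie",
            },
        },
    }
}

labels, properties = project_source_id(event)
print(labels)
print(properties)
```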
The Schema strategy
You can configure the topic in the following way:
streams.sink.topic.cdc.schema=<LIST_OF_TOPICS_SEPARATED_BY_SEMICOLON>
streams.sink.topic.cdc.schema=my-topic;my-other.topic
Each streams event will be projected into the related graph entity, for instance the following event:
{
"meta": {
"timestamp": 1532597182604,
"username": "neo4j",
"tx_id": 3,
"tx_event_id": 0,
"tx_events_count": 2,
"operation": "created",
"source": {
"hostname": "neo4j.mycompany.com"
}
},
"payload": {
"id": "1004",
"type": "node",
"after": {
"labels": ["Person"],
"properties": {
"email": "annek@noanswer.org",
"last_name": "Kretchmar",
"first_name": "Anne Marie"
}
}
},
"schema": {
"properties": {
"last_name": "String",
"email": "String",
"first_name": "String"
},
"constraints": [{
"label": "Person",
"properties": ["first_name", "last_name"],
"type": "UNIQUE"
}]
}
}
will be persisted as the following node:
Person{first_name: "Anne Marie", last_name: "Kretchmar", email: "annek@noanswer.org"}
The Schema strategy leverages the schema field in order to insert/update the nodes, so no extra fields will be created.
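To make the role of the schema field concrete, here is a small Python sketch of how merge keys could be derived from the constraints of a node event. It rests on assumptions: `schema_merge_keys` is a hypothetical name, and taking the first matching UNIQUE or NODE_KEY constraint is a simplification, since the text does not say how the plugin chooses when several constraints apply.

```python
def schema_merge_keys(event):
    """Pick the key properties of a CDC node event from its schema constraints.

    Assumption: the first UNIQUE/NODE_KEY constraint whose label is on the
    node provides the keys.
    """
    after = event["payload"]["after"]
    labels = set(after["labels"])
    for constraint in event["schema"]["constraints"]:
        if constraint["type"] in ("UNIQUE", "NODE_KEY") and constraint["label"] in labels:
            return {name: after["properties"][name] for name in constraint["properties"]}
    return {}


event = {
    "payload": {
        "id": "1004",
        "type": "node",
        "after": {
            "labels": ["Person"],
            "properties": {
                "email": "annek@noanswer.org",
                "last_name": "Kretchmar",
                "first_name": "Anne Marie",
            },
        },
    },
    "schema": {
        "constraints": [
            {"label": "Person", "properties": ["first_name", "last_name"], "type": "UNIQUE"}
        ]
    },
}

keys = schema_merge_keys(event)
print(keys)
```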
In the case of a relationship:
{
"meta": {
"timestamp": 1532597182604,
"username": "neo4j",
"tx_id": 3,
"tx_event_id": 0,
"tx_events_count": 2,
"operation": "created",
"source": {
"hostname": "neo4j.mycompany.com"
}
},
"payload": {
"id": "123",
"type": "relationship",
"label": "KNOWS",
"start": {
"labels": ["Person"],
"id": "123",
"ids": {
"last_name": "Andrea",
"first_name": "Santurbano"
}
},
"end": {
"labels": ["Person"],
"id": "456",
"ids": {
"last_name": "Michael",
"first_name": "Hunger"
}
},
"after": {
"properties": {
"since": "2018-04-05T12:34:00[Europe/Berlin]"
}
}
},
"schema": {
"properties": {
"since": "ZonedDateTime"
},
"constraints": [{
"label": "KNOWS",
"properties": ["since"],
"type": "RELATIONSHIP_PROPERTY_EXISTS"
}]
}
}
The Schema strategy leverages the ids fields in order to insert/update the relationships, so no extra fields will be created.
The Pattern strategy
The Pattern strategy allows you to extract nodes and relationships from a json by providing an extraction pattern.
Each property can be prefixed with:
-
!: identifies the id (could be more than one property); it’s mandatory
-
-: excludes the property from the extraction
If no prefix is specified, the property will be included.
You cannot mix inclusion and exclusion, so your pattern must contain either all exclusion or all inclusion properties.
Labels can be chained via :
The Pattern strategy supports the Tombstone Record: to leverage it, your event should contain as key the record that you want to delete and null for the value.
Currently you can’t concatenate multiple patterns (for example in case you use just one topic and produce more than one node/relationship type). So you have to use a different topic for each type of node/relationship and define a pattern for each of them.
The Node Pattern configuration
You can configure the node pattern extraction in the following way:
streams.sink.topic.pattern.node.<TOPIC_NAME>=<NODE_EXTRACTION_PATTERN>
So for instance, given the following json published via the user topic:
{"userId": 1, "name": "Andrea", "surname": "Santurbano", "address": {"city": "Venice", "cap": "30100"}}
You can transform it into a node by providing the following configuration:
by specifying a simpler pattern:
streams.sink.topic.pattern.node.user=User{!userId}
or by specifying a Cypher like node pattern:
streams.sink.topic.pattern.node.user=(:User{!userId})
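The prefix rules for node patterns can be illustrated with a short Python sketch. It is a simplified reading of the rules stated in this section, not the plugin's parser: `parse_node_pattern` and `extract_node` are hypothetical names, attaching every remaining property when none is listed is an assumption, and nested values such as `address` are simply skipped here rather than handled.

```python
import re

# Accepts both the simple form (User{!userId}) and the Cypher-like form ((:User{!userId})).
NODE_PATTERN = re.compile(r"^\(?:?(?P<labels>\w+(?::\w+)*)\s*\{(?P<props>[^}]*)\}\)?$")


def parse_node_pattern(pattern):
    """Split a node pattern into labels, id keys, included and excluded properties."""
    match = NODE_PATTERN.match(pattern.strip())
    if match is None:
        raise ValueError(f"unsupported pattern: {pattern!r}")
    labels = match.group("labels").split(":")
    ids, include, exclude = [], [], []
    for token in (part.strip() for part in match.group("props").split(",")):
        if not token:
            continue
        if token.startswith("!"):
            ids.append(token[1:])
        elif token.startswith("-"):
            exclude.append(token[1:])
        else:
            include.append(token)
    if not ids:
        raise ValueError("at least one id property (prefixed with !) is mandatory")
    if include and exclude:
        raise ValueError("inclusion and exclusion cannot be mixed")
    return labels, ids, include, exclude


def extract_node(pattern, message):
    """Apply a node pattern to a decoded JSON message (top-level scalars only)."""
    labels, ids, include, exclude = parse_node_pattern(pattern)
    keys = {name: message[name] for name in ids}
    if include:
        props = {name: message[name] for name in include if name in message}
    else:
        # Assumption: with no inclusion list, every remaining property is attached.
        props = {
            name: value
            for name, value in message.items()
            if name not in ids and name not in exclude and not isinstance(value, dict)
        }
    return labels, keys, props


message = {"userId": 1, "name": "Andrea", "surname": "Santurbano",
           "address": {"city": "Venice", "cap": "30100"}}

print(extract_node("User{!userId, surname}", message))
print(extract_node("(:User{!userId})", message))
```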
Similar to the CDC pattern you can provide:
pattern | meaning |
---|---|
|
the userId will be used as ID field and all properties of the json will be attached to the node with the provided
labels ( |
|
the userId will be used as ID field and only the surname property of the json will be attached to the node with the provided
labels ( |
|
the userId will be used as ID field and only the surname and the |
|
the userId will be used as ID field and the |
The Relationship Pattern configuration
You can configure the relationship pattern extraction in the following way:
streams.sink.topic.pattern.relationship.<TOPIC_NAME>=<RELATIONSHIP_EXTRACTION_PATTERN>
So for instance, given the following json published via the user topic:
{"userId": 1, "productId": 100, "price": 10, "currency": "€", "shippingAddress": {"city": "Venice", "cap": "30100"}}
You can transform it into a path, like (n)-[r]→(m), by providing the following configuration:
By specifying a simpler pattern:
streams.sink.topic.pattern.relationship.user=User{!userId} BOUGHT{price, currency} Product{!productId}
or by specifying a Cypher like relationship pattern:
streams.sink.topic.pattern.relationship.user=(:User{!userId})-[:BOUGHT{price, currency}]->(:Product{!productId})
In this last case we assume that User is the source node and Product the target node.
Similar to the CDC pattern you can provide:
pattern | meaning |
---|---|
|
this will merge fetch/create the two nodes by the provided identifier and the |
|
this will merge fetch/create the two nodes by the provided identifier and the |
|
this will merge fetch/create the two nodes by the provided identifier and the |
|
this will merge fetch/create the two nodes by the provided identifier and the |
Attach properties to node
By default no properties will be attached to the edge nodes, but you can specify which properties to attach to each node. Given the following json published via the user topic:
{
"userId": 1,
"userName": "Andrea",
"userSurname": "Santurbano",
"productId": 100,
"productName": "My Awesome Product!",
"price": 10,
"currency": "€"
}
pattern | meaning |
---|---|
|
this will merge two nodes and the |
CUD File Format
The CUD file format is a JSON file that represents Graph Entities (Nodes/Relationships) and how to manage them in terms of Create/Update/Delete operations.
You can configure the topic in the following way:
streams.sink.topic.cud=<LIST_OF_TOPICS_SEPARATED_BY_SEMICOLON>
streams.sink.topic.cud=my-topic;my-other.topic
We have two formats:
-
One for nodes:
We provide an example of a MERGE operation
{
"op": "merge",
"properties": {
"foo": "value",
"key": 1
},
"ids": {"key": 1, "otherKey": "foo"},
"labels": ["Foo","Bar"],
"type": "node",
"detach": true
}
which would be transformed into the following Cypher query:
UNWIND [..., {
"op": "merge",
"properties": {
"foo": "value",
"key": 1
},
"ids": {"key": 1, "otherKey": "foo"},
"labels": ["Foo","Bar"],
"type": "node",
"detach": true
}, ...] AS event
MERGE (n:Foo:Bar {key: event.ids.key, otherKey: event.ids.otherKey})
SET n += event.properties
Let’s describe the fields:
field | mandatory | Description |
---|---|---|
op |
yes |
The operation type: create/merge/update/delete. N.B. delete messages are for individual nodes; this is not intended to be a generic way of building Cypher queries from JSON |
properties |
no in case the operation is |
The properties attached to the node |
ids |
no in case the operation is |
In case the operation is merge/update/delete this field is mandatory and contains
the primary/unique keys of the node that will be used to look up the entity.
In case you use as key the N.B. If you’ll use the |
labels |
no |
The labels attached to the node. N.B. Neo4j allows you to create nodes without labels, but from a performance perspective it’s a bad idea not to provide them. |
type |
yes |
The entity type: node/relationship ⇒ node in this case |
detach |
no |
In case the operation is delete, you can specify whether to perform a "detach" delete, which also deletes any incident relationships when you delete a node. N.B. if no value is provided, the default is true |
-
And one for relationships:
We provide an example of a CREATE operation
{
"op": "create",
"properties": {
"foo": "rel-value",
"key": 1
},
"rel_type": "MY_REL",
"from": {
"ids": {"key": 1},
"labels": ["Foo","Bar"]
},
"to": {
"ids": {"otherKey":1},
"labels": ["FooBar"]
},
"type":"relationship"
}
which would be transformed into the following Cypher query:
UNWIND [..., {
"op": "create",
"properties": {
"foo": "rel-value",
"key": 1
},
"rel_type": "MY_REL",
"from": {
"ids": {"key": 1},
"labels": ["Foo","Bar"]
},
"to": {
"ids": {"otherKey":1},
"labels": ["FooBar"]
},
"type":"relationship"
}, ...] AS event
MATCH (from:Foo:Bar {key: event.from.ids.key})
MATCH (to:FooBar {otherKey: event.to.ids.otherKey})
CREATE (from)-[r:MY_REL]->(to)
SET r = event.properties
Let’s describe the fields:
field | mandatory | Description |
---|---|---|
op |
yes |
The operation type: create/merge/update/delete |
properties |
no |
The properties attached to the relationship |
rel_type |
yes |
The relationship type |
from |
yes, if you use the |
Contains the info about the source node of the relationship.
For the description of the |
to |
yes, if you use the |
Contains the info about the target node of the relationship.
For the description of the |
type |
yes |
The entity type: node/relationship ⇒ relationship in this case |
Here is another example, this time of a DELETE operation, for both node and relationship.
-
For Node, the following JSON:
{
"op": "delete",
"properties": {},
"ids": {"key": 1, "otherKey": "foo"},
"labels": ["Foo","Bar"],
"type": "node",
"detach": false
}
will be transformed in the following Cypher query:
UNWIND [..., {
"op": "delete",
"properties": {},
"ids": {"key": 1, "otherKey": "foo"},
"labels": ["Foo","Bar"],
"type": "node",
"detach": false
}, ...] AS event
MATCH (n:Foo:Bar {key: event.ids.key, otherKey: event.ids.otherKey})
DELETE n
Note that if you set "detach": true, then the transformation will be:
UNWIND [
...
] AS event
...
DETACH DELETE n
-
For Relationship, the following JSON:
{
"op": "delete",
"properties": {},
"rel_type": "MY_REL",
"from": {
"ids": {"key": 1},
"labels": ["Foo","Bar"]
},
"to": {
"ids": {"otherKey":1},
"labels": ["FooBar"]
},
"type":"relationship"
}
will be transformed into the following Cypher query:
UNWIND [..., {
"op": "delete",
"properties": {},
"rel_type": "MY_REL",
"from": {
"ids": {"key": 1},
"labels": ["Foo","Bar"]
},
"to": {
"ids": {"otherKey":1},
"labels": ["FooBar"]
},
"type":"relationship"
}, ...] AS event
MATCH (from:Foo:Bar {key: event.from.ids.key})
MATCH (to:FooBar {otherKey: event.to.ids.otherKey})
MATCH (from)-[r:MY_REL]->(to)
DELETE r
We can create non-existent nodes at relationship creation/merging by putting "op": "merge" in the "from" and/or "to" field.
By default, "op" is match, so the node is not created if it doesn’t exist.
We can write, for example:
{
"op": "create",
"properties": {},
"rel_type": "MY_REL",
"from": {
"ids": {"key": 1},
"labels": ["Foo","Bar"],
"op": "merge"
},
"to": {
"ids": {"otherKey":1},
"labels": ["FooBar"],
"op": "merge"
},
"type":"relationship"
}
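As a recap of the node examples in this section, the following Python sketch turns a CUD node event into the Cypher statement shown for it. It is an illustration only: `cud_node_to_cypher` is a hypothetical function, and it covers just the merge and delete operations, the two whose generated queries appear above.

```python
def cud_node_to_cypher(event):
    """Build the per-event Cypher for a CUD node message (merge and delete only)."""
    if event.get("type") != "node":
        raise ValueError("this sketch only handles node events")
    labels = "".join(":" + label for label in event.get("labels", []))
    # Each id key becomes a lookup against the injected `event` variable.
    ids = ", ".join(f"{key}: event.ids.{key}" for key in event.get("ids", {}))
    op = event["op"]
    if op == "merge":
        return f"MERGE (n{labels} {{{ids}}})\nSET n += event.properties"
    if op == "delete":
        # The documented default for "detach" is true.
        keyword = "DETACH DELETE" if event.get("detach", True) else "DELETE"
        return f"MATCH (n{labels} {{{ids}}})\n{keyword} n"
    raise ValueError(f"operation {op!r} is not covered by this sketch")


merge_event = {
    "op": "merge",
    "properties": {"foo": "value", "key": 1},
    "ids": {"key": 1, "otherKey": "foo"},
    "labels": ["Foo", "Bar"],
    "type": "node",
    "detach": True,
}
delete_event = dict(merge_event, op="delete", properties={}, detach=False)

print(cud_node_to_cypher(merge_event))
print(cud_node_to_cypher(delete_event))
```

In the real Sink the statement is preceded by the UNWIND clause over the batch, as in the queries shown earlier.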
How to deal with bad data
The Neo4j Streams Plugin provides several means to handle processing errors.
It can fail fast or log errors with different detail levels.
Another way is to re-route all the data and errors that, for some reason, it wasn’t able to ingest to a Dead Letter Queue.
It behaves by default like Kafka Connect, see this blog post
-
fail fast (abort) by default
-
need to configure dead-letter-queue topic to enable
-
need to enable logging explicitly
-
headers and message logging must be enabled explicitly
Config Options
Name | Value | Note |
---|---|---|
errors.tolerance | none | fail fast (default!) abort |
errors.tolerance | all | all == lenient, silently ignore bad messages |
errors.log.enable | | log errors (default: false) |
errors.log.include.messages | | log bad messages too (default: false) |
errors.deadletterqueue.topic.name | | dead letter queue topic name, if left off no DLQ, default: not set |
errors.deadletterqueue.context.headers.enable | | enrich messages with metadata headers like exception, timestamp, org. topic, org.part, default:false |
 | | common prefix for header entries, e.g. |
 | | replication factor, need to set to 1 for single partition, default:3 |
For the Neo4j extension, you prefix them with streams.sink in the Neo4j configuration.
Example settings:
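# fail fast (abort)
errors.tolerance=none
# don't fail on errors, log them together with the bad messages
errors.tolerance=all
errors.log.enable=true
errors.log.include.messages=true
# don't fail on errors, send bad messages to a dead letter queue with headers
errors.tolerance=all
errors.deadletterqueue.topic.name=my-dlq-topic
errors.deadletterqueue.context.headers.enable=true
# the same dead letter queue settings, prefixed for the Neo4j configuration
streams.sink.errors.tolerance=all
streams.sink.errors.deadletterqueue.topic.name=my-dlq-topic
streams.sink.errors.deadletterqueue.context.headers.enable=true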
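The difference between the two tolerance modes can be sketched in Python. This is a conceptual illustration, not the plugin's code: `process_batch` and `ingest` are hypothetical, the dead letter queue is modelled as a plain list, and the header names `error.class` and `error.message` are made up for the example.

```python
def process_batch(messages, handler, tolerance="none", dead_letters=None):
    """Run `handler` on each message, applying the configured error tolerance.

    tolerance="none": the first failure aborts the batch (fail fast).
    tolerance="all": failures are skipped and, if a dead letter list is
    given, the original key/value is re-routed there with error headers.
    """
    processed = 0
    for message in messages:
        try:
            handler(message)
            processed += 1
        except Exception as error:
            if tolerance != "all":
                raise
            if dead_letters is not None:
                dead_letters.append({
                    "key": message.get("key"),
                    "value": message.get("value"),
                    # Hypothetical header names, for illustration only.
                    "headers": {
                        "error.class": type(error).__name__,
                        "error.message": str(error),
                    },
                })
    return processed


def ingest(message):
    """Stand-in for the real ingestion: reject values without an id."""
    if "id" not in message["value"]:
        raise ValueError("missing id")


messages = [{"key": "a", "value": {"id": 1}}, {"key": "b", "value": {}}]
dlq = []
count = process_batch(messages, ingest, tolerance="all", dead_letters=dlq)
print(count, dlq)
```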
Every published record in the Dead Letter Queue contains the original record Key and Value pairs and optionally the following headers:
Header key | Description |
---|---|
|
The topic where the data is published |
|
The topic partition where the data is published |
|
The offset of the data into the topic partition |
|
The class that generated the error |
|
The exception that generated the error |
|
The exception message |
|
The exception stack trace |
|
The database name |
Supported Kafka deserializers
The Neo4j Streams plugin supports 2 deserializers:
-
org.apache.kafka.common.serialization.ByteArrayDeserializer: if you want to manage JSON messages
-
io.confluent.kafka.serializers.KafkaAvroDeserializer: if you want to manage AVRO messages
You can define them independently for Key and Value, as specified in the Configuration paragraph.
Configuration summary
You can set the following Kafka configuration values in your neo4j.conf; here are the defaults.
kafka.bootstrap.servers=localhost:9092
kafka.auto.offset.reset=earliest
kafka.group.id=neo4j
kafka.enable.auto.commit=true
kafka.key.deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer
kafka.value.deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer
streams.sink.topic.cypher.<TOPIC_NAME>=<CYPHER_QUERY>
streams.sink.topic.cdc.sourceId=<LIST_OF_TOPICS_SEPARATED_BY_SEMICOLON>
streams.sink.topic.cdc.schema=<LIST_OF_TOPICS_SEPARATED_BY_SEMICOLON>
streams.sink.topic.cud=<LIST_OF_TOPICS_SEPARATED_BY_SEMICOLON>
streams.sink.topic.pattern.node.<TOPIC_NAME>=<NODE_EXTRACTION_PATTERN>
streams.sink.topic.pattern.relationship.<TOPIC_NAME>=<RELATIONSHIP_EXTRACTION_PATTERN>
streams.sink.enabled=<true/false, default=false>
streams.check.apoc.timeout=<ms to await for APOC being loaded, default -1 skip the wait>
streams.check.apoc.interval=<ms interval awaiting for APOC being loaded, default 1000>
streams.sink.poll.interval=<The delay interval between poll cycles, default 0>
See the Apache Kafka documentation for details on these settings.
If streams.cluster.only is set to true, streams will refuse to start in single instance mode, or when run in the context of the backup operation. This is an important safety guard to ensure that operations do not occur in unexpected situations for production deploys.
Custom Kafka Configurations
In this section we describe the meaning of specific Neo4j streams Kafka configurations
kafka.streams.async.commit
If kafka.enable.auto.commit=false, this property allows you to manage how to commit the messages to the topic.
Possible values:
-
false (default): under the hood we use the Kafka Consumer commitSync method
-
true: under the hood we use the Kafka Consumer commitAsync method
commitSync VS commitAsync
commitSync is a synchronous commit and will block until either the commit
succeeds or an unrecoverable error is encountered (in which case it is thrown
to the caller).
That means commitSync is a blocking method with an internal retry mechanism,
which can affect the performance of the ingestion because a new batch of messages
will be processed only when the commit has ended.
On the other hand, commitAsync is an asynchronous call (so it will not block)
and does not provide an internal retry mechanism.
If you have to ensure data consistency, choose commitSync,
because it makes sure that, before doing any further actions,
you know whether the offset commit succeeded or failed.
But because it is synchronous and blocking, you will spend more time waiting for the commit
to finish, which leads to high latency.
If you can accept some data inconsistency and want low latency, choose commitAsync,
because it will not wait for the commit to finish.
Multi Database Support
Neo4j 4.0 Enterprise has multi-tenancy support.
To support this feature, you can add, for each database instance, a configuration suffix with the following pattern
to.<DB_NAME>
to the properties in your neo4j.conf file.
The following new properties support multi-tenancy:
streams.sink.topic.cypher.<TOPIC_NAME>.to.<DB_NAME>=<CYPHER_QUERY>
streams.sink.topic.cdc.sourceId.to.<DB_NAME>=<LIST_OF_TOPICS_SEPARATE_BY_SEMICOLON>
streams.sink.topic.cdc.schema.to.<DB_NAME>=<LIST_OF_TOPICS_SEPARATE_BY_SEMICOLON>
streams.sink.topic.pattern.node.<TOPIC_NAME>.to.<DB_NAME>=<NODE_EXTRACTION_PATTERN>
streams.sink.topic.pattern.relationship.<TOPIC_NAME>.to.<DB_NAME>=<RELATIONSHIP_EXTRACTION_PATTERN>
streams.sink.enabled.to.<DB_NAME>=<true/false, default=false>
Please note the |
This means that for each db instance you can specify:
-
whether to enable the Sink
-
the routing patterns
So if you have an instance named foo, you can specify a configuration in this way:
streams.sink.topic.cypher.<TOPIC_NAME>.to.foo=<CYPHER_QUERY>
streams.sink.topic.cdc.sourceId.to.foo=<LIST_OF_TOPICS_SEPARATE_BY_SEMICOLON>
streams.sink.topic.cdc.schema.to.foo=<LIST_OF_TOPICS_SEPARATE_BY_SEMICOLON>
streams.sink.topic.pattern.node.<TOPIC_NAME>.to.foo=<NODE_EXTRACTION_PATTERN>
streams.sink.topic.pattern.relationship.<TOPIC_NAME>.to.foo=<RELATIONSHIP_EXTRACTION_PATTERN>
streams.sink.enabled.to.foo=<true/false, default=false>
The old properties:
streams.sink.topic.cypher.<TOPIC_NAME>=<CYPHER_QUERY>
streams.sink.topic.cdc.sourceId=<LIST_OF_TOPICS_SEPARATE_BY_SEMICOLON>
streams.sink.topic.cdc.schema=<LIST_OF_TOPICS_SEPARATE_BY_SEMICOLON>
streams.sink.topic.pattern.node.<TOPIC_NAME>=<NODE_EXTRACTION_PATTERN>
streams.sink.topic.pattern.relationship.<TOPIC_NAME>=<RELATIONSHIP_EXTRACTION_PATTERN>
streams.sink.enabled=<true/false, default=false>
are still valid and they refer to Neo4j’s default db instance, which is usually called neo4j, but can be controlled by a separate Neo4j system configuration.
The default database is controlled by Neo4j’s dbms.default_database configuration property, which determines which default database applies. Database names are case-insensitive and normalized to lowercase, and must follow Neo4j database naming rules. (Reference: https://neo4j.com/docs/operations-manual/current/manage-databases/configuration/#manage-databases-administration)
In particular, the following property will be used as the default value for non-default db instances when the specific configuration parameter is not provided:
streams.sink.enabled=<true/false, default=false>
This means that if you have Neo4j with 3 db instances:
-
neo4j (default)
-
foo
-
bar
and you want to enable the Sink plugin on all instances, you can simply omit any configuration about enabling it; you just need to provide the routing configuration for each instance:
streams.sink.topic.cypher.fooTopic.to.foo=MERGE (f:Foo{fooId: event.fooId}) SET f += event.properties
streams.sink.topic.cypher.barTopic.to.bar=MERGE (b:Bar{barId: event.barId}) SET b += event.properties
streams.sink.topic.cypher.barTopic.to.neo4j=MERGE (c:MyLabel{myId: event.myId}) SET c += event.properties
Otherwise, if you want to enable the Sink plugin only on the foo and bar instances, you can do it in this way:
streams.sink.enabled=false
streams.sink.enabled.to.foo=true
streams.sink.enabled.to.bar=true
streams.sink.topic.cypher.fooTopic.to.foo=MERGE (f:Foo{fooId: event.fooId}) SET f += event.properties
streams.sink.topic.cypher.barTopic.to.bar=MERGE (b:Bar{barId: event.barId}) SET b += event.properties
So in general if you have:
streams.sink.enabled=true
streams.sink.enabled.to.foo=false
Then the sink is enabled on all databases EXCEPT foo (local overrides global).
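The override rule can be expressed as a small lookup. The Python sketch below is only an illustration of that rule: `sink_enabled` is a hypothetical function, and the configuration is modelled as a plain dictionary of property names to string values.

```python
def sink_enabled(config, db_name):
    """Resolve whether the sink is enabled for a database.

    The database-specific property wins; otherwise the global
    streams.sink.enabled value applies (documented default: false).
    """
    specific = config.get(f"streams.sink.enabled.to.{db_name}")
    value = specific if specific is not None else config.get("streams.sink.enabled", "false")
    return value.strip().lower() == "true"


config = {
    "streams.sink.enabled": "true",
    "streams.sink.enabled.to.foo": "false",
}

for db in ("neo4j", "foo", "bar"):
    print(db, sink_enabled(config, db))
```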