Source: Neo4j → Kafka
The Kafka Connect Neo4j Connector is the recommended method to integrate Kafka with Neo4j, as Neo4j Streams is no longer under active development and will not be supported after Neo4j 4.4. The most recent version of the Kafka Connect Neo4j Connector can be found here.
The Source module is a transaction event handler that sends database change data to a Kafka topic.
Configuration
You can set the following configuration values in your neo4j.conf; here are the defaults.
kafka.bootstrap.servers=localhost:9092
kafka.acks=1
kafka.retries=2
kafka.batch.size=16384
kafka.buffer.memory=33554432
kafka.reindex.batch.size=1000
kafka.session.timeout.ms=15000
kafka.connection.timeout.ms=10000
kafka.replication=1
kafka.linger.ms=1
kafka.transactional.id=
kafka.topic.discovery.polling.interval=300000
kafka.streams.log.compaction.strategy=delete
streams.source.topic.nodes.<TOPIC_NAME>=<PATTERN>
streams.source.topic.relationships.<TOPIC_NAME>=<PATTERN>
streams.source.topic.relationships.<TOPIC_NAME>.key_strategy=<default/all>
streams.source.topic.nodes.<TOPIC_NAME>.from.<DB_NAME>=<PATTERN>
streams.source.topic.relationships.<TOPIC_NAME>.from.<DB_NAME>=<PATTERN>
streams.source.topic.relationships.<TOPIC_NAME>.from.<DB_NAME>.key_strategy=<default/all>
streams.source.enabled=<true/false, default=true>
streams.source.enabled.from.<DB_NAME>=<true/false, default=true>
streams.procedures.enabled.from.<DB_NAME>=<true/false, default=true>
streams.source.schema.polling.interval=<MILLIS, the polling interval for getting the schema information>
To use Kafka transactions, please set kafka.transactional.id and kafka.acks appropriately.
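For example, a minimal sketch (the transactional id value is an arbitrary placeholder; note that Kafka transactions also require acks=all):
kafka.transactional.id=neo4j-streams-producer-1
kafka.acks=all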
See the Apache Kafka documentation for details on these settings.
If your Kafka broker is configured with auto.create.topics.enable set to false, all messages sent to topics that don't exist are discarded; this is because the KafkaProducer.send() method would otherwise block execution, as explained in KAFKA-3539. You can tune the custom property kafka.topic.discovery.polling.interval to periodically check for new topics in the Kafka cluster, so the plugin is able to send events to the defined topics.
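For example, to check for new topics every 60 seconds instead of the default five minutes:
kafka.topic.discovery.polling.interval=60000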
With kafka.streams.log.compaction.strategy=delete, the Neo4j Streams Source generates a sequence of unique keys. Instead, with kafka.streams.log.compaction.strategy=compact, the keys are adapted to enable Log Compaction on the Kafka side. Please note that the delete strategy does not actually delete records; it has this name to match the topic configuration cleanup.policy=delete/compact. With the compact strategy, operations which involve the same nodes or relationships will have the same key. When kafka.streams.log.compaction.strategy=compact, partitioning leverages Kafka's internal mechanism. See the Message structure section for key examples.
Multi Database Support
Neo4j 4.0 Enterprise has multi-tenancy support. In order to support this feature, you can append to the properties in your neo4j.conf file a per-database configuration suffix with the pattern from.<DB_NAME>. The following new properties allow you to support multi-tenancy:
streams.source.topic.nodes.<TOPIC_NAME>.from.<DB_NAME>=<PATTERN>
streams.source.topic.relationships.<TOPIC_NAME>.from.<DB_NAME>=<PATTERN>
streams.source.topic.relationships.<TOPIC_NAME>.from.<DB_NAME>.key_strategy=<default/all>
streams.source.enabled.from.<DB_NAME>=<true/false, default=true>
This means that for each db instance you can specify:

- whether to use the source connector
- the routing patterns

So if you have an instance named foo, you can specify a configuration in this way:
streams.source.topic.nodes.myTopic.from.foo=<PATTERN>
streams.source.topic.relationships.myTopic.from.foo=<PATTERN>
streams.source.enabled.from.foo=<true/false, default=true>
The old properties:
streams.source.enabled=<true/false, default=true>
streams.source.topic.nodes.<TOPIC_NAME>=<PATTERN>
streams.source.topic.relationships.<TOPIC_NAME>=<PATTERN>
streams.procedures.enabled=<true/false, default=true>
are still valid and they refer to Neo4j’s default db instance.
The default database is controlled by Neo4j's dbms.default_database configuration property. Database names are case-insensitive, normalized to lowercase, and must follow Neo4j's database naming rules. (Reference: https://neo4j.com/docs/operations-manual/current/manage-databases/configuration/#manage-databases-administration)
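For example, the out-of-the-box default:
dbms.default_database=neo4j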
In particular, the following property is used as the default value for non-default db instances when the specific configuration parameter is not provided:
streams.source.enabled=<true/false, default=true>
This means that if you have a Neo4j instance with 3 databases:

- neo4j (default)
- foo
- bar

and you want to enable the Source plugin on all of them, you can simply omit any configuration about enabling it; you just need to provide the routing configuration for each instance:
streams.source.topic.nodes.testTopic=Test{testId}
streams.source.topic.nodes.fooTopic.from.foo=Foo{fooId,fooName}
streams.source.topic.relationships.barTopic.from.bar=Bar{barId,barName}
Otherwise, if you want to enable the Source plugin only on the foo and bar instances, you can do it in this way:
streams.source.enabled=false
streams.source.enabled.from.foo=true
streams.source.enabled.from.bar=true
streams.source.topic.nodes.testTopic=Test{testId}
streams.source.topic.nodes.fooTopic.from.foo=Foo{fooId,fooName}
streams.source.topic.relationships.barTopic.from.bar=Bar{barId,barName}
As you can see, if you want to enable the Source plugin only on one or more specific db instances, you first have to disable the Source plugin globally (streams.source.enabled=false) and then enable it on the instances you need.
So in general if you have:
streams.source.enabled=true
streams.source.enabled.from.foo=false
then the Source module is enabled on all databases EXCEPT foo (the local setting overrides the global one).
For example purposes only, imagine a situation like the following: you have a Neo4j instance, without Neo4j Streams installed, where a database "testdb" was created and populated. You decide to install the Neo4j Streams plugin because you also want your graph data in Kafka, so you add the Source routing configuration for "testdb" to your neo4j.conf.

Doing so, you expect to reflect only the created/updated nodes with the configured label into the related topic. However, since the database "testdb" is already populated and the Source module is enabled by default (streams.source.enabled=true) on every database instance, events are produced for all databases rather than just the one you configured. If you want to turn off this default behaviour, you have to disable the "generic" Source module and enable it just for the database you are interested in, for example (topic and pattern names are illustrative):
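streams.source.enabled=false
streams.source.enabled.from.testdb=true
streams.source.topic.nodes.testTopic.from.testdb=Test{*}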
Serializers
To allow the insertion of keys in any format (e.g. through the streams.publish procedure), the key.serializer is set to org.apache.kafka.common.serialization.ByteArraySerializer, like the value.serializer.
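For example, assuming the streams procedures are enabled, you can publish an arbitrary payload to a topic of your choice (the topic name here is a placeholder):
CALL streams.publish('my-topic', 'Hello from Neo4j')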
Message structure
The message key structure depends on kafka.streams.log.compaction.strategy.
With delete, the key is a string in the form "${meta.txId + meta.txEventId}-${meta.txEventId}", i.e. "[txId+txEventId]-txEventId", where:

- txId identifies the transaction that affected the entity
- txEventId is a counter that identifies the internal order in which Neo4j handled the specific event
- [txId+txEventId] is the numeric sum of the two previous values
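For example, applying the formula above, an event with txId=16 and txEventId=1 yields the key "17-1" (since 16 + 1 = 17).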
Instead, with compact:
In case of a node without a constrained label, the key is the string value of the node id.
In case of a node with a constrained label, the key is a JSON with {ids: mapOfConstraints, labels: listOfLabels}.
For example, with this configuration:
streams.source.topic.nodes.<TOPIC_NAME>=Person{*}
kafka.streams.log.compaction.strategy=compact
this constraint:
CREATE CONSTRAINT ON (p:Person) ASSERT p.name IS UNIQUE
and this query:
CREATE (:Person {name:'Sherlock', surname: 'Holmes'})
We obtain this key:
{"ids": {"name": "Sherlock"}, "labels": ["Person"]}
Otherwise, with the same configuration and query as above, but with the constraint:
CREATE CONSTRAINT ON (p:Person) ASSERT (p.name, p.surname) IS NODE KEY
We obtain this key:
{"ids": {"surname": "Holmes", "name": "Sherlock"}, "labels": ["Person"]}
In case of a relationship, the key is a JSON with {start: START_NODE, end: END_NODE, label: typeOfRelationship}, where START_NODE and END_NODE follow the same rules as above.
For example, with this configuration:
streams.source.topic.nodes.<TOPIC_NAME>=Person{*}
streams.source.topic.relationships.<TOPIC_NAME>=BUYS{*}
kafka.streams.log.compaction.strategy=compact
these constraints:
CREATE CONSTRAINT ON (p:Person) ASSERT p.name IS UNIQUE;
CREATE CONSTRAINT ON (p:Product) ASSERT p.code IS UNIQUE;
and these queries:
CREATE (:Person {name:'Pippo'});
CREATE (p:Product {code:'1367', name: 'Notebook'});
MATCH (pe:Person {name:'Pippo'}), (pr:Product {name:'Notebook'}) MERGE (pe)-[:BUYS]->(pr);
We obtain this key:
{"start": {"ids": {"name": "Pippo"}, "labels": ["Person"]}, "end": {"ids": {"code": "1367"}, "labels": ["Product"]},
"label": "BUYS"}
Otherwise, with this configuration:
streams.source.topic.nodes.<TOPIC_NAME>=Person{*}
streams.source.topic.relationships.<TOPIC_NAME>=BUYS{*}
kafka.streams.log.compaction.strategy=compact
without constraints, and with the same queries as above, we obtain this key:
{"start": "0", "end": "1", "label": "BUYS"}
In case of relationships whose start or end node has multiple constraints, the configured key_strategy (default or all) determines which constraints are used to build the key. See the Key strategies section.
Patterns
Nodes
To control which nodes are sent to Kafka, and which of their properties, you can define node patterns in the config. You can choose labels and properties for inclusion or exclusion, with * meaning all. Patterns are separated by semicolons (;).
The basic syntax is:
Label{*};Label1{prop1, prop2};Label3{-prop1,-prop2}
| pattern | meaning |
|---|---|
| Label{*} | all nodes with this label, with all their properties, go to the related topic |
| Label1:Label2{*} | nodes with these two labels are sent to the related topic |
| Label{prop1, prop2} | the prop1 and prop2 properties of all nodes with this label are sent to the related topic |
| Label{-prop1, -prop2} | in the nodes with label Label, all properties are sent except prop1 and prop2 |
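For example, to route all Person nodes to a topic while excluding their email property (topic and property names are illustrative):
streams.source.topic.nodes.personTopic=Person{-email}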
Relationships
To control which relationships are sent to Kafka, and which of their properties, you can define relationship patterns in the config. You can choose types and properties for inclusion or exclusion, with * meaning all. Patterns are separated by semicolons (;).
The basic syntax is:
KNOWS{*};MEET{prop1, prop2};ANSWER{-prop1,-prop2}
| pattern | meaning |
|---|---|
| KNOWS{*} | all relationships with this type, with all their properties, go to the related topic |
| KNOWS{prop1, prop2} | the prop1 and prop2 properties of all relationships with this type are sent to the related topic |
| KNOWS{-prop1, -prop2} | in the relationships with type KNOWS, all properties are sent except prop1 and prop2 |
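For example, to route all KNOWS relationships to a topic while excluding their since property (topic and property names are illustrative):
streams.source.topic.relationships.knowsTopic=KNOWS{-since}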
Relationship key strategy
See the Key strategies section.
Transaction Event Handler
The transaction event handler is the core of the Stream Producer and is what allows database changes to be streamed.
Events
The Producer streams three kinds of events:

- created: when a node/relationship/property is created
- updated: when a node/relationship/property is updated
- deleted: when a node/relationship/property is deleted
Created
The following is an example of a node creation event:
{
"meta": {
"timestamp": 1532597182604,
"username": "neo4j",
"tx_id": 3,
"tx_event_id": 0,
"tx_events_count": 2,
"operation": "created",
"source": {
"hostname": "neo4j.mycompany.com"
}
},
"payload": {
"id": "1004",
"type": "node",
"after": {
"labels": ["Person"],
"properties": {
"email": "annek@noanswer.org",
"last_name": "Kretchmar",
"first_name": "Anne Marie"
}
}
},
"schema": {
"properties": {
"last_name": "String",
"email": "String",
"first_name": "String"
},
"constraints": [{
"label": "Person",
"properties": ["first_name", "last_name"],
"type": "UNIQUE"
}]
}
}
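The following is an example of a relationship creation event: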
{
"meta": {
"timestamp": 1532597182604,
"username": "neo4j",
"tx_id": 3,
"tx_event_id": 0,
"tx_events_count": 2,
"operation": "created",
"source": {
"hostname": "neo4j.mycompany.com"
}
},
"payload": {
"id": "123",
"type": "relationship",
"label": "KNOWS",
"start": {
"labels": ["Person"],
"id": "123",
"ids": {
"last_name": "Andrea",
"first_name": "Santurbano"
}
},
"end": {
"labels": ["Person"],
"id": "456",
"ids": {
"last_name": "Michael",
"first_name": "Hunger"
}
},
"after": {
"properties": {
"since": "2018-04-05T12:34:00[Europe/Berlin]"
}
}
},
"schema": {
"properties": {
"since": "ZonedDateTime"
},
"constraints": [{
"label": "KNOWS",
"properties": ["since"],
"type": "RELATIONSHIP_PROPERTY_EXISTS"
}]
}
}
Updated
The following is an example of a node update event:
{
"meta": {
"timestamp": 1532597182604,
"username": "neo4j",
"tx_id": 3,
"tx_event_id": 0,
"tx_events_count": 2,
"operation": "updated",
"source": {
"hostname": "neo4j.mycompany.com"
}
},
"payload": {
"id": "1004",
"type": "node",
"before": {
"labels": ["Person", "Tmp"],
"properties": {
"email": "annek@noanswer.org",
"last_name": "Kretchmar",
"first_name": "Anne"
}
},
"after": {
"labels": ["Person"],
"properties": {
"last_name": "Kretchmar",
"email": "annek@noanswer.org",
"first_name": "Anne Marie",
"geo": { "crs": "wgs-84-3d", "latitude": 46.2222, "longitude": 32.11111, "height": 0.123 }
}
}
},
"schema": {
"properties": {
"last_name": "String",
"email": "String",
"first_name": "String",
"geo": "point"
},
"constraints": [{
"label": "Person",
"properties": ["first_name", "last_name"],
"type": "UNIQUE"
}]
}
}
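The following is an example of a relationship update event: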
{
"meta": {
"timestamp": 1532597182604,
"username": "neo4j",
"tx_id": 3,
"tx_event_id": 0,
"tx_events_count": 2,
"operation": "updated",
"source": {
"hostname": "neo4j.mycompany.com"
}
},
"payload": {
"id": "123",
"type": "relationship",
"label": "KNOWS",
"start": {
"labels": ["Person"],
"id": "123",
"ids": {
"last_name": "Andrea",
"first_name": "Santurbano"
}
},
"end": {
"labels": ["Person"],
"id": "456",
"ids": {
"last_name": "Michael",
"first_name": "Hunger"
}
},
"before": {
"properties": {
"since": "2018-04-05T12:34:00[Europe/Berlin]"
}
},
"after": {
"properties": {
"since": "2018-04-05T12:34:00[Europe/Berlin]",
"to": "2019-04-05T23:00:00[Europe/Berlin]"
}
}
},
"schema": {
"properties": {
"since": "ZonedDateTime",
"to": "ZonedDateTime"
},
"constraints": [{
"label": "KNOWS",
"properties": ["since"],
"type": "RELATIONSHIP_PROPERTY_EXISTS"
}]
}
}
Deleted
The following is an example of a node deletion event:
{
"meta": {
"timestamp": 1532597182604,
"username": "neo4j",
"tx_id": 3,
"tx_event_id": 0,
"tx_events_count": 2,
"operation": "deleted",
"source": {
"hostname": "neo4j.mycompany.com"
}
},
"payload": {
"id": "1004",
"type": "node",
"before": {
"labels": ["Person"],
"properties": {
"last_name": "Kretchmar",
"email": "annek@noanswer.org",
"first_name": "Anne Marie",
"geo": { "crs": "wgs-84-3d", "latitude": 46.2222, "longitude": 32.11111, "height": 0.123 }
}
}
},
"schema": {
"properties": {
"last_name": "String",
"email": "String",
"first_name": "String",
"geo": "point"
},
"constraints": [{
"label": "Person",
"properties": ["first_name", "last_name"],
"type": "UNIQUE"
}]
}
}
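The following is an example of a relationship deletion event: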
{
"meta": {
"timestamp": 1532597182604,
"username": "neo4j",
"tx_id": 3,
"tx_event_id": 0,
"tx_events_count": 2,
"operation": "deleted",
"source": {
"hostname": "neo4j.mycompany.com"
}
},
"payload": {
"id": "123",
"type": "relationship",
"label": "KNOWS",
"start": {
"labels": ["Person"],
"id": "123",
"ids": {
"last_name": "Andrea",
"first_name": "Santurbano"
}
},
"end": {
"labels": ["Person"],
"id": "456",
"ids": {
"last_name": "Michael",
"first_name": "Hunger"
}
},
"before": {
"properties": {
"since": "2018-04-05T12:34:00[Europe/Berlin]",
"to": "2019-04-05T23:00:00[Europe/Berlin]"
}
}
},
"schema": {
"properties": {
"since": "ZonedDateTime",
"to": "ZonedDateTime"
},
"constraints": [{
"label": "KNOWS",
"properties": ["since"],
"type": "RELATIONSHIP_PROPERTY_EXISTS"
}]
}
}
Meta
The meta field contains the metadata related to the transaction event:

| Field | Type | Description |
|---|---|---|
| timestamp | Number | The timestamp related to the transaction event |
| username | String | The username that generated the transaction |
| tx_id | Number | The transaction id provided by the Neo4j transaction manager |
| tx_events_count | Number | The number of events included in the transaction (e.g. 2 node updates, 1 relationship creation) |
| tx_event_id | Number | The id of the event inside the transaction |
| operation | enum["created", "updated", "deleted"] | The operation type |
| source | Object | Contains the information about the source |
Payload
The payload field contains the information about the data related to the event:

| Field | Type | Description |
|---|---|---|
| id | Number | The id of the graph entity |
| type | enum["node", "relationship"] | The type of the graph entity |
| before | Object | The data before the transaction event |
| after | Object | The data after the transaction event |
Payload: before and after
We must distinguish two cases:

Nodes

| Field | Type | Description |
|---|---|---|
| labels | String[] | List of labels attached to the node |
| properties | Map<K, V> | Map of the properties attached to the node, where the key is the property name |

For relationships, the before and after fields contain only the properties map; the relationship type and the start/end node descriptors appear at the payload level, as shown in the JSON examples above.
Schema
| Field | Type | Description |
|---|---|---|
| constraints | Object[] | List of constraints attached to the entity |
| properties | Map<K, V> | Map of the properties attached to the entity, where the key is the property name and the value is the property type |
Constraints
Nodes and Relationships can have a list of constraints attached to them:
| Field | Type | Description |
|---|---|---|
| label | String | The label attached to the constraint |
| type | enum["UNIQUE", "NODE_PROPERTY_EXISTS", "RELATIONSHIP_PROPERTY_EXISTS"] | The constraint type |
| properties | String[] | List of properties involved in the constraint |