Neo4j Streams - Procedures
The Kafka Connect Neo4j Connector is the recommended method to integrate Kafka with Neo4j, as Neo4j Streams is no longer under active development and will not be supported after version 4.4 of Neo4j. The most recent version of the Kafka Connect Neo4j Connector can be found here.
The Streams project ships with a set of procedures.
Configuration
You can enable/disable the procedures by changing this variable inside the neo4j.conf
streams.procedures.enabled=<true/false, default=true>
Please note that Neo4j can restrict which procedures are allowed to run through its procedure whitelist.
If you try to CALL one of the Streams procedures without declaring them in the whitelist, you will receive an error like the following:
Figure 1. Neo4j Streams procedure not found
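To avoid this, a minimal neo4j.conf sketch (assuming your deployment restricts procedures through dbms.security.procedures.whitelist; the exact property name may differ between Neo4j versions) could look like this:
# enable the Streams procedures
streams.procedures.enabled=true
# allow the Streams procedures to be executed
dbms.security.procedures.whitelist=streams.*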
Multi Database Support
Neo4j 4.0 Enterprise has multi-tenancy support.
In order to support this feature, for each database instance you can append a configuration suffix with the following pattern
<DB_NAME>
to the properties in your neo4j.conf file.
So, to enable the Streams procedures, the following property should be added:
streams.procedures.enabled.<DB_NAME>=<true/false, default=true>
So if you have an instance named foo
you can specify the configuration in this way:
streams.procedures.enabled.foo=<true/false, default=true>
The old property:
streams.procedures.enabled=<true/false, default=true>
is still valid and refers to Neo4j's default db instance.
In particular, the following property will be used as the default value for non-default db instances, in case the database-specific configuration parameter is not provided:
streams.procedures.enabled=<true/false, default=true>
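For instance, a sketch of a neo4j.conf fragment (the database name analytics is hypothetical) where the procedures are disabled by default but re-enabled for a single database:
# disable the procedures for the default db and, as a fallback, for every other db
streams.procedures.enabled=false
# re-enable them only for the database named analytics
streams.procedures.enabled.analytics=true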
streams.publish
This procedure allows custom message streaming from Neo4j to the configured environment by using the underlying configured Producer.
Usage:
CALL streams.publish('my-topic', 'Hello World from Neo4j!')
The message retrieved from the Consumer is the following:
{"payload":"Hello world from Neo4j!"}
If you use a local docker (compose) setup, you can check for these messages with:
docker exec -it kafka kafka-console-consumer --topic my-topic --bootstrap-server kafka:9092
Input Parameters:
Variable Name | Type | Description
---|---|---
topic | String | The topic where you want to publish the data
payload | Object | The data that you want to stream
Configuration parameters:
Name | Type | Description
---|---|---
key | Object | The key of the message that you want to stream. Please note that if the key is not provided, the message gets a random UUID as its key
partition | Int | The partition of the message that you want to stream
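For example, a sketch (assuming the optional configuration map is passed as the third argument, as described above, and that my-topic has at least one partition) that publishes a message with an explicit key and partition:
CALL streams.publish('my-topic', 'Hello World from Neo4j!', {key: 'my-key', partition: 0})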
You can send any kind of data in the payload: nodes, relationships, paths, lists, maps, scalar values and nested versions thereof.
In the case of nodes or relationships, if the topic is defined in the patterns provided by the configuration, their properties will be filtered according to that configuration.
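For instance, a sketch (assuming a Person node with name Andrea exists in the graph) that publishes a node as the payload:
// publish the matched node to the topic, then return it
MATCH (p:Person {name: 'Andrea'})
CALL streams.publish('my-topic', p)
RETURN p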
streams.publish.sync
Similar to the streams.publish
procedure, but it works synchronously and yields the resulting record metadata.
Usage:
CALL streams.publish.sync('my-topic', 'my-payload', {<config>}) YIELD value RETURN value
This procedure returns a RecordMetadata
value like this: {"timestamp": 1, "offset": 2, "partition": 3, "keySize": 4, "valueSize": 5}
Variable Name | Description
---|---
timestamp | The timestamp of the record in the topic/partition
offset | The offset of the record in the topic/partition
partition | The partition the record was sent to
keySize | The size of the serialized, uncompressed key in bytes
valueSize | The size of the serialized, uncompressed value in bytes
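As an illustration, a sketch that publishes a map payload and returns selected fields from the yielded metadata:
CALL streams.publish.sync('my-topic', {name: 'Andrea', surname: 'Santurbano'})
YIELD value
RETURN value.partition AS partition, value.offset AS offset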
streams.consume
This procedure allows you to consume messages from a given topic.
Usage:
CALL streams.consume('my-topic', {<config>}) YIELD event RETURN event
Example:
Imagine you have a producer that publishes events like {"name": "Andrea", "surname": "Santurbano"};
you can create user nodes in this way:
CALL streams.consume('my-topic') YIELD event
CREATE (p:Person{firstName: event.data.name, lastName: event.data.surname})
In case you want to read a specific offset of a topic partition you can do it by executing the following query:
CALL streams.consume('my-topic', {timeout: 5000, partitions: [{partition: 0, offset: 30}]}) YIELD event
CREATE (p:Person{firstName: event.data.name, lastName: event.data.surname})
Input Parameters:
Variable Name | Type | Description
---|---|---
topic | String | The topic from which you want to consume the data
config | Map<K,V> | The configuration parameters
Available configuration parameters
Variable Name | Type | Description
---|---|---
timeout | Number (default: 1000) | Defines the amount of time (ms) that the procedure listens to the topic
from | String | It's the Kafka configuration parameter auto.offset.reset. If not specified it inherits the corresponding value from the plugin configuration
groupId | String | It's the Kafka configuration parameter group.id. If not specified it inherits the corresponding value from the plugin configuration
autoCommit | Boolean (default: true) | It's the Kafka configuration parameter enable.auto.commit
commit | Boolean (default: true) | In case of autoCommit: false, it defines whether the consumed offsets should be committed
broker | String | The comma-separated string of Kafka node URLs. If not specified it inherits the bootstrap servers from the plugin configuration
partitions | List<Map<K,V>> | Each map contains the information about partition and offset in order to start reading from a specific position
keyDeserializer | String | The supported deserializer for the Kafka Record Key. If not specified it inherits the corresponding value from the plugin configuration
valueDeserializer | String | The supported deserializer for the Kafka Record Value. If not specified it inherits the corresponding value from the plugin configuration
schemaRegistryUrl | String | The schema registry url, required in case you are dealing with AVRO messages
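For example, a sketch (relying on the parameter names listed above, in particular groupId and autoCommit) that consumes with an explicit consumer group and without auto-committing offsets:
CALL streams.consume('my-topic', {timeout: 10000, groupId: 'my-group', autoCommit: false}) YIELD event
RETURN event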
Streams Sink Lifecycle procedure
We provide a set of procedures in order to manage the Sink lifecycle.
Proc. Name | Description
---|---
streams.sink.stop() | Stops the Sink and returns the status, with the error if one occurred during the process
streams.sink.start() | Starts the Sink and returns the status, with the error if one occurred during the process
streams.sink.restart() | Restarts the Sink and returns the status, with the error if one occurred during the process
streams.sink.config() | Returns the Sink config; please check the "Streams Config" table
streams.sink.status() | Returns the status
Please consider that in order to use these procedures you must enable the Streams procedures and, in a cluster environment, they can be run only on the leader.
Streams Config

Config Name | Description
---|---
invalid_topics | Returns a list of invalid topics
streams.sink.topic.pattern.relationship | Returns a Map<K,V> where K is the topic name and V is the provided pattern
streams.sink.topic.cud | Returns a list of topics defined for the CUD format
streams.sink.topic.cdc.sourceId | Returns a list of topics defined for the CDC SourceId strategy
streams.sink.topic.cypher | Returns a Map<K,V> where K is the topic name and V is the provided Cypher query
streams.sink.topic.cdc.schema | Returns a list of topics defined for the CDC Schema strategy
streams.sink.topic.pattern.node | Returns a Map<K,V> where K is the topic name and V is the provided pattern
streams.sink.errors | Returns a Map<K,V> where K is the sub-property name and V is its value
streams.sink.source.id.strategy.config | Returns the config for the SourceId CDC strategy
Example
Executing: CALL streams.sink.config()
+----------------------------------------------------------------------------------------------------------------------------------------------+
| name | value |
+----------------------------------------------------------------------------------------------------------------------------------------------+
| "streams.sink.errors" | {} |
| "streams.sink.source.id.strategy.config" | {idName -> "sourceId", labelName -> "SourceEvent"} |
| "streams.sink.topic.cypher" | {shouldWriteCypherQuery -> "MERGE (n:Label {id: event.id}) ON CREATE SET n += event.properties"} |
| "streams.sink.topic.cud" | [] |
| "streams.sink.topic.cdc.schema" | [] |
| "streams.sink.topic.cdc.sourceId" | [] |
| "streams.sink.topic.pattern.node" | {} |
| "streams.sink.topic.pattern.relationship" | {} |
| "invalid_topics" | [] |
+----------------------------------------------------------------------------------------------------------------------------------------------+
9 rows
Executing: CALL streams.sink.stop()
+----------------------+
| name | value |
+----------------------+
| "status" | "STOPPED" |
+----------------------+
1 row
Executing: CALL streams.sink.status()
+----------------------+
| name | value |
+----------------------+
| "status" | "STOPPED" |
+----------------------+
1 row
Executing: CALL streams.sink.start()
+----------------------+
| name | value |
+----------------------+
| "status" | "RUNNING" |
+----------------------+
1 row
Executing: CALL streams.sink.status()
+----------------------+
| name | value |
+----------------------+
| "status" | "RUNNING" |
+----------------------+
1 row
Executing: CALL streams.sink.restart()
+----------------------+
| name | value |
+----------------------+
| "status" | "RUNNING" |
+----------------------+
1 row
Given a cluster environment, executing on a NON-LEADER instance: CALL streams.sink.status()
+--------------------------------------------------------------------------------------------------+
| name | value |
+--------------------------------------------------------------------------------------------------+
| "error" | "You can use this procedure only in the LEADER or in a single instance configuration." |
+--------------------------------------------------------------------------------------------------+
1 row