The Configuration System
The Kafka Connect Neo4j Connector is the recommended method to integrate Kafka with Neo4j, as Neo4j Streams is no longer under active development and will not be supported after version 4.4 of Neo4j. The most recent version of the Kafka Connect Neo4j Connector can be found here.
Configuration system overview
Location of Configuration Information
For versions prior to 4.0.7, configuration management works statically, with the properties provided inside the neo4j.conf file; since version 4.0.7 we have introduced a new configuration system based on dynamic reloading, which relies on the streams.conf file inside $NEO4J_HOME/conf.
Breaking Changes
We deprecated the neo4j.conf file based configuration, so you need to define a new streams.conf file inside $NEO4J_HOME/conf and put all the required configuration in there.
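For reference, a minimal streams.conf might look like the following; the property values are purely illustrative (they mirror the example configuration used later on this page), so use the keys and values that apply to your own setup:

kafka.bootstrap.servers=broker:9093
streams.sink.enabled=true
streams.sink.errors.tolerance=all
streams.sink.topic.cypher.test=CREATE (p:Person{name: event.name, surname: event.surname})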
Note about usage with Docker
The official Neo4j Docker image uses a particular naming convention for environment variables in order to transform them into properties inside the neo4j.conf file. In order to be compliant with that behaviour you can still use these environment variables without changing anything in your configuration; under the hood, from version 4.0.7 the system will save them inside the streams.conf file instead.
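For example, following the Docker image's convention (prefix the property name with NEO4J_ and replace dots with underscores), an environment variable passed to the container such as the illustrative

NEO4J_streams_sink_enabled=true

is mapped to the property streams.sink.enabled=true, which from version 4.0.7 ends up in streams.conf instead of neo4j.conf.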
How it behaves
You can interact with the new configuration system in two ways:
- By changing the streams.conf file manually
- By applying new configurations via procedures
Keep in mind that every change applied to the configuration causes a reload of the plugins, so use this feature very carefully.
File based changes
Every change made inside streams.conf is detected and triggers a reload of the Streams Sink and Source with the new configuration.
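For example, editing or appending a single line in streams.conf such as the following (the value none is only illustrative) is enough to trigger a reload of the Sink with the new setting:

streams.sink.errors.tolerance=none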
Procedure based changes
From version 4.0.7 you'll find three new procedures:
- streams.configuration.get, which returns the current configuration
- streams.configuration.set({<plugin_config_map>}, {<procedure_config>}), which applies the new configuration and returns it
- streams.configuration.remove([<plugin_config_keys_list>], {<procedure_config>}), which removes the provided configuration keys and returns the new configuration status
N.B. These procedures work per database instance; this means that if you change a property that affects all databases, every database that is running a Streams module will be notified and restarted.
streams.configuration.get
This procedure returns the current configuration applied to both the Sink and the Source plugin.
Output Parameters:
Variable Name | Description |
---|---|
name | The configuration name |
value | The configuration value |
So given the following procedure call:
CALL streams.configuration.get() YIELD name, value
RETURN name, value
You'll get output similar to the following (the exact values depend on your configuration 😄):
name | value |
---|---|
"streams.sink.topic.cypher.test" | "CREATE (p:Person{name: event.name, surname: event.surname})" |
"streams.sink.errors.tolerance" | "all" |
"kafka.default.api.timeout.ms" | "5000" |
"kafka.bootstrap.servers" | "broker:9093" |
"streams.sink.errors.log.include.messages" | "true" |
"streams.sink.enabled" | "true" |
"streams.sink.errors.log.enable" | "true" |
streams.configuration.set
This procedure applies the map of configuration parameters passed as the first argument.
Input Parameters:
Variable Name | Description |
---|---|
plugin_config_map | This map represents the set of configurations applied to the Sink and/or Source plugin |
procedure_config | The configuration map for the procedure itself (see the accepted parameters below) |
Output Parameters:
Variable Name | Description |
---|---|
name | The configuration name |
value | The configuration value |
The accepted parameters for the procedure_config map are the following:
Configuration Name | Description |
---|---|
save | (Boolean, default true) if true, the change is also persisted into the streams.conf file; otherwise it is applied only at runtime |
So given the following procedure call:
CALL streams.configuration.set({`streams.sink.topic.cypher.test`: 'CREATE (p:Person{name: event.name, surname: event.surname, fullName: event.name + " " + event.surname})'}, {save: false}) YIELD name, value
RETURN name, value
You'll get output similar to the following (the exact values depend on your configuration 😄):
name | value |
---|---|
"streams.sink.topic.cypher.test" | "CREATE (p:Person{name: event.name, surname: event.surname, fullName: event.name + " " + event.surname})" |
"streams.sink.errors.tolerance" | "all" |
"kafka.default.api.timeout.ms" | "5000" |
"kafka.bootstrap.servers" | "broker:9093" |
"streams.sink.errors.log.include.messages" | "true" |
"streams.sink.enabled" | "true" |
"streams.sink.errors.log.enable" | "true" |
streams.configuration.remove
This procedure removes the provided list of keys from the configuration.
Input Parameters:
Variable Name | Description |
---|---|
plugin_config_keys_list | This list represents the set of properties that will be removed from the configuration |
procedure_config | The configuration map for the procedure itself (see the accepted parameters below) |
Output Parameters:
Variable Name | Description |
---|---|
name | The configuration name |
value | The configuration value |
The accepted parameters for the procedure_config map are the following:
Configuration Name | Description |
---|---|
save | (Boolean, default true) if true, the change is also persisted into the streams.conf file; otherwise it is applied only at runtime |
So given the following procedure call:
CALL streams.configuration.remove(['kafka.acks'], {save: false}) YIELD name, value
RETURN name, value
You'll get output similar to the following (the exact values depend on your configuration 😄):
name | value |
---|---|
"streams.sink.topic.cypher.test" | "CREATE (p:Person{name: event.name, surname: event.surname})" |
"streams.sink.errors.tolerance" | "all" |
"kafka.default.api.timeout.ms" | "5000" |
"kafka.bootstrap.servers" | "broker:9093" |
"streams.sink.errors.log.include.messages" | "true" |
"streams.sink.enabled" | "true" |
"streams.sink.errors.log.enable" | "true" |
What happens when we change configuration properties via procedures
When we change configuration properties via streams.configuration.set/remove, the Sink and Source modules are reloaded under the hood. So use these procedures carefully, because they have an impact on your Streams flow.
N.B. A Source/Sink module will be restarted only if there are changes in the configuration related to it; this means that if you have both active and you change properties related only to the Sink, only the Sink will be restarted.
What happens in the Source module
During the reload process the transaction event handler gets unplugged; this means that any transactions that happen during the reload period are not caught by the Source, so those events are lost.
What happens in the Sink module
During the reload process the Sink gets stopped; this should not have any impact on your ingestion process, because it will restart from the last committed messages, so there is no data loss.