Run with Docker

The Kafka Connect Neo4j Connector is the recommended method to integrate Kafka with Neo4j, as Neo4j Streams is no longer under active development and will not be supported after version 4.4 of Neo4j.

The most recent version of the Kafka Connect Neo4j Connector can be found here.

Neo4j Streams plugin

Introduction

When Neo4j is run in a Docker container, some special considerations apply; please see Neo4j Docker Configuration for more information. In particular, the configuration format used in neo4j.conf looks different.

Please note that the Neo4j Docker image uses a naming convention; you can override every neo4j.conf property by prefixing it with NEO4J_ and applying the following transformations (a docker run sketch follows the examples below):

  • single underscore is converted into a double underscore: _ → __

  • dot is converted into a single underscore: . → _

Example:

  • dbms.memory.heap.max_size=8G → NEO4J_dbms_memory_heap_max__size: 8G

  • dbms.logs.debug.level=DEBUG → NEO4J_dbms_logs_debug_level: DEBUG
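
For instance, the same two settings could be passed to a standalone container on the command line; this is just a sketch, and the image tag is only an example:

docker run \
  -e NEO4J_dbms_memory_heap_max__size=8G \
  -e NEO4J_dbms_logs_debug_level=DEBUG \
  neo4j:4.4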

For more information and examples see this section and the Confluent With Docker section of the documentation.

Another important thing to watch out for is possible permission issues. If you run Kafka in Docker using a host volume that your user does not own, you will get a permission error. There are two possible solutions (a sketch of the second option follows the list below):

  • use a root user

  • change permissions of the volume in order to make it accessible by the non-root user
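
A minimal sketch of the second option, assuming the host volume is ./neo4j/plugins and that the user inside the container has UID and GID 7474 (the neo4j user in recent official images; adjust both to your setup):

# make the host volume writable by the non-root user inside the container
sudo chown -R 7474:7474 ./neo4j/plugins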

The Neo4j Docker container is built on an approach that uses environment variables passed to the container as a way to configure Neo4j. There are certain characters which environment variables cannot contain, notably the dash - character. Configuring the plugin to use stream names that contain these characters will not work properly, because a configuration environment variable such as NEO4J_streams_sink_topic_cypher_my-topic (for a topic named my-topic) cannot be correctly evaluated as an environment variable. This is a limitation of the Neo4j Docker container rather than of neo4j-streams.

Below you'll find a lightweight Docker Compose file that allows you to test the application in your local environment.

Prerequisites:

  • Docker

  • Docker Compose

Here you can find the instructions on how to configure Docker and Docker Compose.

From the same directory where the compose file is, you can launch this command:

docker-compose up -d
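
To check that the containers started correctly, you can optionally run, for example:

docker-compose ps
docker-compose logs -f neo4j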

Source module

Below is a compose file that allows you to spin up Neo4j, Kafka and Zookeeper in order to test the application.

docker-compose.yml
version: '3'
services:
  neo4j:
    image: neo4j:4.4
    hostname: neo4j
    container_name: neo4j
    ports:
      - "7474:7474"
      - "7687:7687"
    depends_on:
      - kafka
    volumes:
      - ./neo4j/plugins:/plugins
    environment:
      NEO4J_AUTH: neo4j/streams
      NEO4J_dbms_logs_debug_level: DEBUG
      # KAFKA related configuration
      NEO4J_kafka_bootstrap_servers: kafka:19092
      NEO4J_streams_source_topic_nodes_neo4j: Person{*}
      NEO4J_streams_source_topic_relationships_neo4j: KNOWS{*}

  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "12181:12181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 12181

  kafka:
    image: confluentinc/cp-kafka:latest
    hostname: kafka
    container_name: kafka
    ports:
      - "19092:19092"
    depends_on:
      - zookeeper
    environment:
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:12181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:19092

Launch it locally

Prerequisites

Before starting, please change the volume directory according to your environment; you must put the Neo4j Streams jar inside the <plugins> dir (a download sketch follows the snippet below):

volumes:
    - ./neo4j/plugins:/plugins
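
If you don't have the jar yet, a possible way to fetch it is shown below, assuming the 4.1.5 release and the asset name neo4j-streams-4.1.5.jar used later in this page; adjust to the version you need:

mkdir -p ./neo4j/plugins
# download the Neo4j Streams jar from the GitHub releases page (version is an example)
curl -L -o ./neo4j/plugins/neo4j-streams-4.1.5.jar \
  https://github.com/neo4j-contrib/neo4j-streams/releases/download/4.1.5/neo4j-streams-4.1.5.jar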

You can execute a Kafka consumer that subscribes to the topic neo4j by executing this command:

docker exec kafka kafka-console-consumer --bootstrap-server kafka:19092 --topic neo4j --from-beginning

Then directly from the Neo4j browser you can generate some random data with this query:

UNWIND range(1,100) as id
CREATE (p:Person {id:id, name: "Name " + id, age: id % 3}) WITH collect(p) as people
UNWIND people as p1
UNWIND range(1,10) as friend
WITH p1, people[(p1.id + friend) % size(people)] as p2
CREATE (p1)-[:KNOWS {years: abs(p2.id - p1.id)}]->(p2)

And if you go back to your consumer you’ll see something like this:

{"meta":{"timestamp":1571329239766,"username":"neo4j","txId":20,"txEventId":98,"txEventsCount":1100,"operation":"created","source":{"hostname":"neo4j"}},"payload":{"id":"84","before":null,"after":{"properties":{"name":"Name 85","id":85,"age":1},"labels":["Person"]},"type":"node"},"schema":{"properties":{"name":"String","id":"Long","age":"Long"},"constraints":[]}}

{"meta":{"timestamp":1571329239766,"username":"neo4j","txId":20,"txEventId":99,"txEventsCount":1100,"operation":"created","source":{"hostname":"neo4j"}},"payload":{"id":"85","before":null,"after":{"properties":{"name":"Name 86","id":86,"age":2},"labels":["Person"]},"type":"node"},"schema":{"properties":{"name":"String","id":"Long","age":"Long"},"constraints":[]}}

{"meta":{"timestamp":1571329239766,"username":"neo4j","txId":20,"txEventId":100,"txEventsCount":1100,"operation":"created","source":{"hostname":"neo4j"}},"payload":{"id":"0","start":{"id":"0","labels":["Person"],"ids":{}},"end":{"id":"2","labels":["Person"],"ids":{}},"before":null,"after":{"properties":{"years":2}},"label":"KNOWS","type":"relationship"},"schema":{"properties":{"years":"Long"},"constraints":[]}}

Please note that in this example no topic name was specified before the execution of the Kafka consumer, which is listening on the neo4j topic. This is because, if no topic is specified, the Neo4j Streams plugin produces messages into a topic named neo4j by default.
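
You can verify this by listing the topics on the broker; depending on the Kafka image version, --zookeeper zookeeper:12181 may be needed instead of --bootstrap-server:

docker exec kafka kafka-topics --bootstrap-server kafka:19092 --list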

Sink module

Below you'll find a simple Docker Compose file that allows you to spin up two Neo4j instances, one configured as Source and one as Sink, allowing you to share any data from the Source to the Sink:

  • The Source is listening at http://localhost:8474/browser/ (bolt: bolt://localhost:8687)

  • The Sink is listening at http://localhost:7474/browser/ (bolt: bolt://localhost:7687) and is configured with the Schema strategy.

    environment:
      NEO4J_streams_sink_enabled: "true"
      NEO4J_streams_sink_topic_neo4j:
        "WITH event.value.payload AS payload, event.value.meta AS meta
        FOREACH (ignoreMe IN CASE WHEN payload.type = 'node' AND meta.operation <> 'deleted' and payload.after.labels[0] = 'Question' THEN [1] ELSE [] END |
          MERGE (n:Question{neo_id: toInteger(payload.id)}) ON CREATE
            SET n += payload.after.properties
        )
        FOREACH (ignoreMe IN CASE WHEN payload.type = 'node' AND meta.operation <> 'deleted' and payload.after.labels[0] = 'Answer' THEN [1] ELSE [] END |
          MERGE (n:Answer{neo_id: toInteger(payload.id)}) ON CREATE
            SET n += payload.after.properties
        )
        FOREACH (ignoreMe IN CASE WHEN payload.type = 'node' AND meta.operation <> 'deleted' and payload.after.labels[0] = 'User' THEN [1] ELSE [] END |
          MERGE (n:User{neo_id: toInteger(payload.id)}) ON CREATE
            SET n += payload.after.properties
        )
        FOREACH (ignoreMe IN CASE WHEN payload.type = 'node' AND meta.operation <> 'deleted' and payload.after.labels[0] = 'Tag' THEN [1] ELSE [] END |
          MERGE (n:Tag{neo_id: toInteger(payload.id)}) ON CREATE
            SET n += payload.after.properties
        )
        FOREACH (ignoreMe IN CASE WHEN payload.type = 'relationship' AND meta.operation <> 'deleted' and payload.label = 'ANSWERS' THEN [1] ELSE [] END |
          MERGE (s:Answer{neo_id: toInteger(payload.start.id)})
          MERGE (e:Question{neo_id: toInteger(payload.end.id)})
          CREATE (s)-[:ANSWERS{neo_id: toInteger(payload.id)}]->(e)
        )
        FOREACH (ignoreMe IN CASE WHEN payload.type = 'relationship' AND meta.operation <> 'deleted' and payload.label = 'TAGGED' THEN [1] ELSE [] END |
          MERGE (s:Question{neo_id: toInteger(payload.start.id)})
          MERGE (e:Tag{neo_id: toInteger(payload.end.id)})
          CREATE (s)-[:TAGGED{neo_id: toInteger(payload.id)}]->(e)
        )
        FOREACH (ignoreMe IN CASE WHEN payload.type = 'relationship' AND meta.operation <> 'deleted' and payload.label = 'PROVIDED' THEN [1] ELSE [] END |
          MERGE (s:User{neo_id: toInteger(payload.start.id)})
          MERGE (e:Answer{neo_id: toInteger(payload.end.id)})
          CREATE (s)-[:PROVIDED{neo_id: toInteger(payload.id)}]->(e)
        )
        FOREACH (ignoreMe IN CASE WHEN payload.type = 'relationship' AND meta.operation <> 'deleted' and payload.label = 'ASKED' THEN [1] ELSE [] END |
          MERGE (s:User{neo_id: toInteger(payload.start.id)})
          MERGE (e:Question{neo_id: toInteger(payload.end.id)})
          CREATE (s)-[:ASKED{neo_id: toInteger(payload.id)}]->(e)
        )"

Launch it locally

In the following example we will use the Neo4j Streams plugin in combination with the APOC procedures (download from here) in order to download some data from Stack Overflow, store it into the Neo4j Source instance and replicate this dataset into the Sink via the Neo4j Streams plugin.

version: '3'

services:
  neo4j-source:
    image: neo4j:4.4
    hostname: neo4j-source
    container_name: neo4j-source
    depends_on:
      - zookeeper
      - broker
    ports:
      - "8474:7474"
      - "8687:7687"
    volumes:
      - ./neo4j/plugins:/plugins
    environment:
      NEO4J_kafka_bootstrap_servers: broker:9093
      NEO4J_AUTH: neo4j/source
      NEO4J_dbms_memory_heap_max__size: 2G
      NEO4J_dbms_logs_debug_level: DEBUG
      NEO4J_kafka_batch_size: 16384
      NEO4J_streams_sink_enabled: "false"
      NEO4J_streams_source_schema_polling_interval: 10000

  neo4j-sink:
    image: neo4j:4.4
    hostname: neo4j-sink
    container_name: neo4j-sink
    depends_on:
      - neo4j-source
    ports:
      - "7474:7474"
      - "7687:7687"
    volumes:
      - ./neo4j/plugins-sink:/plugins
    environment:
      NEO4J_kafka_bootstrap_servers: broker:9093
      NEO4J_AUTH: neo4j/sink
      NEO4J_dbms_memory_heap_max__size: 2G
      NEO4J_kafka_max_poll_records: 16384
      NEO4J_streams_source_enabled: "false"
      NEO4J_streams_sink_topic_cdc_schema: "neo4j"
      NEO4J_dbms_logs_debug_level: DEBUG
      NEO4J_streams_sink_enabled: "true"
      NEO4J_streams_sink_topic_neo4j:
        "WITH event.value.payload AS payload, event.value.meta AS meta
        FOREACH (ignoreMe IN CASE WHEN payload.type = 'node' AND meta.operation <> 'deleted' and payload.after.labels[0] = 'Question' THEN [1] ELSE [] END |
          MERGE (n:Question{neo_id: toInteger(payload.id)}) ON CREATE
            SET n += payload.after.properties
        )
        FOREACH (ignoreMe IN CASE WHEN payload.type = 'node' AND meta.operation <> 'deleted' and payload.after.labels[0] = 'Answer' THEN [1] ELSE [] END |
          MERGE (n:Answer{neo_id: toInteger(payload.id)}) ON CREATE
            SET n += payload.after.properties
        )
        FOREACH (ignoreMe IN CASE WHEN payload.type = 'node' AND meta.operation <> 'deleted' and payload.after.labels[0] = 'User' THEN [1] ELSE [] END |
          MERGE (n:User{neo_id: toInteger(payload.id)}) ON CREATE
            SET n += payload.after.properties
        )
        FOREACH (ignoreMe IN CASE WHEN payload.type = 'node' AND meta.operation <> 'deleted' and payload.after.labels[0] = 'Tag' THEN [1] ELSE [] END |
          MERGE (n:Tag{neo_id: toInteger(payload.id)}) ON CREATE
            SET n += payload.after.properties
        )
        FOREACH (ignoreMe IN CASE WHEN payload.type = 'relationship' AND meta.operation <> 'deleted' and payload.label = 'ANSWERS' THEN [1] ELSE [] END |
          MERGE (s:Answer{neo_id: toInteger(payload.start.id)})
          MERGE (e:Question{neo_id: toInteger(payload.end.id)})
          CREATE (s)-[:ANSWERS{neo_id: toInteger(payload.id)}]->(e)
        )
        FOREACH (ignoreMe IN CASE WHEN payload.type = 'relationship' AND meta.operation <> 'deleted' and payload.label = 'TAGGED' THEN [1] ELSE [] END |
          MERGE (s:Question{neo_id: toInteger(payload.start.id)})
          MERGE (e:Tag{neo_id: toInteger(payload.end.id)})
          CREATE (s)-[:TAGGED{neo_id: toInteger(payload.id)}]->(e)
        )
        FOREACH (ignoreMe IN CASE WHEN payload.type = 'relationship' AND meta.operation <> 'deleted' and payload.label = 'PROVIDED' THEN [1] ELSE [] END |
          MERGE (s:User{neo_id: toInteger(payload.start.id)})
          MERGE (e:Answer{neo_id: toInteger(payload.end.id)})
          CREATE (s)-[:PROVIDED{neo_id: toInteger(payload.id)}]->(e)
        )
        FOREACH (ignoreMe IN CASE WHEN payload.type = 'relationship' AND meta.operation <> 'deleted' and payload.label = 'ASKED' THEN [1] ELSE [] END |
          MERGE (s:User{neo_id: toInteger(payload.start.id)})
          MERGE (e:Question{neo_id: toInteger(payload.end.id)})
          CREATE (s)-[:ASKED{neo_id: toInteger(payload.id)}]->(e)
        )"

  zookeeper:
    image: confluentinc/cp-zookeeper
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  broker:
    image: confluentinc/cp-enterprise-kafka
    hostname: broker
    container_name: broker
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    expose:
      - "9093"
    environment:
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:9093,OUTSIDE://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,OUTSIDE:PLAINTEXT
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9093,OUTSIDE://0.0.0.0:9092
      CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: broker:9093
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      CONFLUENT_METRICS_REPORTER_ZOOKEEPER_CONNECT: zookeeper:2181
      CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
      CONFLUENT_METRICS_ENABLE: 'true'
      CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous'

  schema_registry:
    image: confluentinc/cp-schema-registry
    hostname: schema_registry
    container_name: schema_registry
    depends_on:
      - zookeeper
      - broker
    ports:
      - "8081:8081"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema_registry
      SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: 'zookeeper:2181'

Prerequisites

  • Install APOC into ./neo4j/plugins (a folder layout sketch follows this list).

  • Install the Neo4j Streams plugin into ./neo4j/plugins and ./neo4j/plugins-sink
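
A possible way to lay out the plugin folders before starting the stack; the file names are only examples, adjust them to the versions you actually download:

mkdir -p ./neo4j/plugins ./neo4j/plugins-sink
# APOC is needed only by the Source instance
cp apoc-<version>-all.jar ./neo4j/plugins/
# the Neo4j Streams jar goes into both folders
cp neo4j-streams-4.1.5.jar ./neo4j/plugins/
cp neo4j-streams-4.1.5.jar ./neo4j/plugins-sink/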

Import the data

Let's go to both instances in order to create the constraints on both sides (a cypher-shell sketch follows the statements below):

// enable the multi-statement execution: https://stackoverflow.com/questions/21778435/multiple-unrelated-queries-in-neo4j-cypher?answertab=votes#tab-top
CREATE CONSTRAINT ON (u:User) ASSERT u.id IS UNIQUE;
CREATE CONSTRAINT ON (a:Answer) ASSERT a.id IS UNIQUE;
CREATE CONSTRAINT ON (t:Tag) ASSERT t.name IS UNIQUE;
CREATE CONSTRAINT ON (q:Question) ASSERT q.id IS UNIQUE;
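
If you prefer the command line over the Neo4j Browser, the same statements can be run with cypher-shell inside the containers; a sketch, using the credentials defined in the compose file (neo4j/source and neo4j/sink):

# Source instance
docker exec -it neo4j-source cypher-shell -u neo4j -p source \
  "CREATE CONSTRAINT ON (u:User) ASSERT u.id IS UNIQUE;"
# repeat for the other constraints, then do the same on neo4j-sink with -p sink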

Please take a look at the following property inside the compose file:

NEO4J_streams_source_schema_polling_interval: 10000

This means that every 10 seconds the Streams plugin polls the DB in order to retrieve schema changes and store them. So, after you have created the constraints, you need to wait about 10 seconds before the next step.
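
One way to check that the plugin has picked up the schema changes is to look at the debug log of the source container; /logs/debug.log is assumed here as the log location, matching the defaults of the official Neo4j image:

docker exec neo4j-source sh -c 'tail -n 100 /logs/debug.log | grep -i streams'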

Now let's go to the Source instance and, in order to import the Stack Overflow dataset, execute the following query:

UNWIND range(1, 1) as page
CALL apoc.load.json("https://api.stackexchange.com/2.2/questions?pagesize=100&order=desc&sort=creation&tagged=neo4j&site=stackoverflow&page=" + page) YIELD value
UNWIND value.items AS event
MERGE (question:Question {id:event.question_id}) ON CREATE
  SET question.title = event.title, question.share_link = event.share_link, question.favorite_count = event.favorite_count

FOREACH (ignoreMe in CASE WHEN exists(event.accepted_answer_id) THEN [1] ELSE [] END | MERGE (question)<-[:ANSWERS]-(answer:Answer{id: event.accepted_answer_id}))

WITH * WHERE NOT event.owner.user_id IS NULL
MERGE (owner:User {id:event.owner.user_id}) ON CREATE SET owner.display_name = event.owner.display_name
MERGE (owner)-[:ASKED]->(question)

Once the import process has finished, to be sure that the data is correctly replicated into the Sink, execute this query on both the Source and the Sink and compare the results:

MATCH (n)
RETURN
DISTINCT labels(n),
count(*) AS NumofNodes,
avg(size(keys(n))) AS AvgNumOfPropPerNode,
min(size(keys(n))) AS MinNumPropPerNode,
max(size(keys(n))) AS MaxNumPropPerNode,
avg(size((n)-[]-())) AS AvgNumOfRelationships,
min(size((n)-[]-())) AS MinNumOfRelationships,
max(size((n)-[]-())) AS MaxNumOfRelationships
order by NumofNodes desc

You can also launch a Kafka consumer that subscribes to the topic neo4j by executing this command:

docker exec broker kafka-console-consumer --bootstrap-server broker:9093 --topic neo4j --from-beginning

You’ll see something like this:

{"meta":{"timestamp":1571403896987,"username":"neo4j","txId":34,"txEventId":330,"txEventsCount":352,"operation":"created","source":{"hostname":"neo4j-source"}},"payload":{"id":"94","start":{"id":"186","labels":["User"],"ids":{"id":286795}},"end":{"id":"59","labels":["Question"],"ids":{"id":58303891}},"before":null,"after":{"properties":{}},"label":"ASKED","type":"relationship"},"schema":{"properties":{},"constraints":[]}}

{"meta":{"timestamp":1571403896987,"username":"neo4j","txId":34,"txEventId":331,"txEventsCount":352,"operation":"created","source":{"hostname":"neo4j-source"}},"payload":{"id":"34","start":{"id":"134","labels":["Answer"],"ids":{"id":58180296}},"end":{"id":"99","labels":["Question"],"ids":{"id":58169215}},"before":null,"after":{"properties":{}},"label":"ANSWERS","type":"relationship"},"schema":{"properties":{},"constraints":[]}}

Neo4j Streams with Neo4j Cluster and Kafka Cluster

Here we provide a docker-compose file to quickly start an environment composed of a 3-node Neo4j Causal Cluster (with the Streams plugin configured in Sink mode) and a 3-node Kafka cluster.

version: '3'

networks:
  kafka_cluster:
    driver: bridge

services:

  core1:
    image: neo4j:4.4-enterprise
    hostname: core1
    container_name: core1
    ports:
      - 7474:7474
      - 6477:6477
      - 7687:7687
    volumes:
      - ./neo4j-cluster-40/core1/plugins:/plugins
    networks:
      - kafka_cluster
    environment:
      NEO4J_ACCEPT_LICENSE_AGREEMENT: "yes"
      NEO4J_AUTH: neo4j/streams
      NEO4J_dbms_mode: CORE
      NEO4J_causalClustering_expectedCoreClusterSize: 3
      NEO4J_causalClustering_initialDiscoveryMembers: core1:5000,core2:5000,core3:5000
      NEO4J_dbms_connector_http_listen__address: :7474
      NEO4J_dbms_connector_https_listen__address: :6477
      NEO4J_dbms_connector_bolt_listen__address: :7687
      NEO4J_dbms_logs_debug_level: DEBUG
      NEO4J_apoc_import_file_enabled: "true"
      NEO4J_kafka_auto_offset_reset: "latest"
      NEO4J_kafka_bootstrap_servers: broker-1:29092,broker-2:39092,broker-3:49092
      NEO4J_kafka_group_id: "neo4j"
      NEO4J_kafka_client_id: "neo4j"
      NEO4J_kafka_enable_auto_commit: "false"
      NEO4J_kafka_key_deserializer: org.apache.kafka.common.serialization.ByteArrayDeserializer
      NEO4J_kafka_value_deserializer: org.apache.kafka.common.serialization.ByteArrayDeserializer
      NEO4J_streams_source_enabled: "false"
      NEO4J_streams_sink_enabled_to_dbtest: "true"
      NEO4J_streams_sink_topic_cypher_mytopic_to_dbtest: "CREATE (n:Person {id: event.id, name: event.name, surname: event.surname}) RETURN n"

  core2:
    image: neo4j:4.4-enterprise
    hostname: core2
    container_name: core2
    ports:
      - 7475:7475
      - 6478:6478
      - 7688:7688
    volumes:
      - ./neo4j-cluster-40/core2/plugins:/plugins
    networks:
      - kafka_cluster
    environment:
      NEO4J_ACCEPT_LICENSE_AGREEMENT: "yes"
      NEO4J_AUTH: neo4j/streams
      NEO4J_dbms_mode: CORE
      NEO4J_causalClustering_expectedCoreClusterSize: 3
      NEO4J_causalClustering_initialDiscoveryMembers: core1:5000,core2:5000,core3:5000
      NEO4J_dbms_connector_http_listen__address: :7475
      NEO4J_dbms_connector_https_listen__address: :6478
      NEO4J_dbms_connector_bolt_listen__address: :7688
      NEO4J_dbms_logs_debug_level: DEBUG
      NEO4J_apoc_import_file_enabled: "true"
      NEO4J_kafka_auto_offset_reset: "latest"
      NEO4J_kafka_bootstrap_servers: broker-1:29092,broker-2:39092,broker-3:49092
      NEO4J_kafka_group_id: "neo4j"
      NEO4J_kafka_client_id: "neo4j"
      NEO4J_kafka_enable_auto_commit: "false"
      NEO4J_kafka_key_deserializer: org.apache.kafka.common.serialization.ByteArrayDeserializer
      NEO4J_kafka_value_deserializer: org.apache.kafka.common.serialization.ByteArrayDeserializer
      NEO4J_streams_source_enabled: "false"
      NEO4J_streams_sink_enabled_to_dbtest: "true"
      NEO4J_streams_sink_topic_cypher_mytopic_to_dbtest: "CREATE (n:Person {id: event.id, name: event.name, surname: event.surname}) RETURN n"

  core3:
    image: neo4j:4.4-enterprise
    hostname: core3
    container_name: core3
    ports:
      - 7476:7476
      - 6479:6479
      - 7689:7689
    volumes:
      - ./neo4j-cluster-40/core3/plugins:/plugins
    networks:
      - kafka_cluster
    environment:
      NEO4J_ACCEPT_LICENSE_AGREEMENT: "yes"
      NEO4J_AUTH: neo4j/streams
      NEO4J_dbms_mode: CORE
      NEO4J_causalClustering_expectedCoreClusterSize: 3
      NEO4J_causalClustering_initialDiscoveryMembers: core1:5000,core2:5000,core3:5000
      NEO4J_dbms_connector_http_listen__address: :7476
      NEO4J_dbms_connector_https_listen__address: :6479
      NEO4J_dbms_connector_bolt_listen__address: :7689
      NEO4J_dbms_logs_debug_level: DEBUG
      NEO4J_kafka_bootstrap_servers: broker-1:29092,broker-2:39092,broker-3:49092
      NEO4J_kafka_group_id: "neo4j"
      NEO4J_kafka_client_id: "neo4j"
      NEO4J_kafka_enable_auto_commit: "false"
      NEO4J_kafka_key_deserializer: org.apache.kafka.common.serialization.ByteArrayDeserializer
      NEO4J_kafka_value_deserializer: org.apache.kafka.common.serialization.ByteArrayDeserializer
      NEO4J_streams_source_enabled: "false"
      NEO4J_streams_sink_enabled_to_dbtest: "true"
      NEO4J_streams_sink_topic_cypher_mytopic_to_dbtest: "CREATE (n:Person {id: event.id, name: event.name, surname: event.surname}) RETURN n"

  read1:
    image: neo4j:4.4-enterprise
    hostname: read1
    container_name: read1
    ports:
      - 7477:7477
      - 6480:6480
      - 7690:7690
    volumes:
      - ./neo4j-cluster-40/read1/plugins:/plugins
    networks:
      - kafka_cluster
    environment:
      NEO4J_ACCEPT_LICENSE_AGREEMENT: "yes"
      NEO4J_AUTH: neo4j/streams
      NEO4J_dbms_mode: READ_REPLICA
      NEO4J_causalClustering_expectedCoreClusterSize: 3
      NEO4J_causalClustering_initialDiscoveryMembers: core1:5000,core2:5000,core3:5000
      NEO4J_dbms_connector_http_listen__address: :7477
      NEO4J_dbms_connector_https_listen__address: :6480
      NEO4J_dbms_connector_bolt_listen__address: :7690
      NEO4J_dbms_logs_debug_level: DEBUG
      NEO4J_kafka_bootstrap_servers: broker-1:29092,broker-2:39092,broker-3:49092
      NEO4J_kafka_group_id: "neo4j"
      NEO4J_kafka_client_id: "neo4j"
      NEO4J_kafka_enable_auto_commit: "false"
      NEO4J_kafka_key_deserializer: org.apache.kafka.common.serialization.ByteArrayDeserializer
      NEO4J_kafka_value_deserializer: org.apache.kafka.common.serialization.ByteArrayDeserializer
      NEO4J_streams_source_enabled: "false"
      NEO4J_streams_sink_enabled_to_dbtest: "true"
      NEO4J_streams_sink_topic_cypher_mytopic_to_dbtest: "CREATE (n:Person {id: event.id, name: event.name, surname: event.surname}) RETURN n"

  zookeeper-1:
    image: confluentinc/cp-zookeeper
    hostname: zookeeper-1
    container_name: zookeeper-1
    ports:
      - 22181:22181
      - 22888:22888
      - 23888:23888
    volumes:
      - ./zookeeper-1/data:/data
    environment:
      ZOOKEEPER_SERVER_ID: 1
      ZOOKEEPER_CLIENT_PORT: 22181
      ZOOKEEPER_TICK_TIME: 2000
      ZOOKEEPER_INIT_LIMIT: 5
      ZOOKEEPER_SYNC_LIMIT: 2
      ZOOKEEPER_SERVERS: zookeeper-1:22888:23888;zookeeper-2:32888:33888;zookeeper-3:42888:43888
    networks:
      - kafka_cluster

  zookeeper-2:
    image: confluentinc/cp-zookeeper
    hostname: zookeeper-2
    container_name: zookeeper-2
    ports:
      - 32181:32181
      - 32888:32888
      - 33888:33888
    volumes:
      - ./zookeeper-2/data:/data
    environment:
      ZOOKEEPER_SERVER_ID: 2
      ZOOKEEPER_CLIENT_PORT: 32181
      ZOOKEEPER_TICK_TIME: 2000
      ZOOKEEPER_INIT_LIMIT: 5
      ZOOKEEPER_SYNC_LIMIT: 2
      ZOOKEEPER_SERVERS: zookeeper-1:22888:23888;zookeeper-2:32888:33888;zookeeper-3:42888:43888
    networks:
      - kafka_cluster

  zookeeper-3:
    image: confluentinc/cp-zookeeper
    hostname: zookeeper-3
    container_name: zookeeper-3
    ports:
      - 42181:42181
      - 42888:42888
      - 43888:43888
    volumes:
      - ./zookeeper-3/data:/data
    environment:
      ZOOKEEPER_SERVER_ID: 3
      ZOOKEEPER_CLIENT_PORT: 42181
      ZOOKEEPER_TICK_TIME: 2000
      ZOOKEEPER_INIT_LIMIT: 5
      ZOOKEEPER_SYNC_LIMIT: 2
      ZOOKEEPER_SERVERS: zookeeper-1:22888:23888;zookeeper-2:32888:33888;zookeeper-3:42888:43888
    networks:
      - kafka_cluster

  broker-1:
    image: confluentinc/cp-kafka
    hostname: broker-1
    container_name: broker-1
    ports:
      - 9092:9092
      - 29092:29092
    depends_on:
      - zookeeper-1
      - zookeeper-2
      - zookeeper-3
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper-1:22181,zookeeper-2:32181,zookeeper-3:42181
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_LISTENERS: PLAINTEXT://:29092,PLAINTEXT_HOST://:9092
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker-1:29092,PLAINTEXT_HOST://localhost:9092
      ALLOW_PLAINTEXT_LISTENER: 'yes'
      KAFKA_AUTO_OFFSET_RESET: "latest"
      KAFKA_MAX_POLL_INTERVAL_MS: 300000
      KAFKA_MAX_POLL_RECORDS: 20000
      KAFKA_MAX_PARTITION_FETCH_BYTES: 52428800
      KAFKA_NUM_PARTITIONS: 2
      KAFKA_MESSAGE_MAX_BYTES: 20220088
    networks:
      - kafka_cluster

  broker-2:
    image: confluentinc/cp-kafka
    hostname: broker-2
    container_name: broker-2
    ports:
      - 9093:9093
      - 39092:39092
    depends_on:
      - zookeeper-1
      - zookeeper-2
      - zookeeper-3
    environment:
      KAFKA_BROKER_ID: 2
      KAFKA_ZOOKEEPER_CONNECT: zookeeper-1:22181,zookeeper-2:32181,zookeeper-3:42181
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_LISTENERS: PLAINTEXT://:39092,PLAINTEXT_HOST://:9093
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker-2:39092,PLAINTEXT_HOST://localhost:9093
      ALLOW_PLAINTEXT_LISTENER: 'yes'
      KAFKA_AUTO_OFFSET_RESET: "latest"
      KAFKA_MAX_POLL_INTERVAL_MS: 300000
      KAFKA_MAX_POLL_RECORDS: 20000
      KAFKA_MAX_PARTITION_FETCH_BYTES: 52428800
      KAFKA_NUM_PARTITIONS: 2
      KAFKA_MESSAGE_MAX_BYTES: 20220088
    networks:
      - kafka_cluster

  broker-3:
    image: confluentinc/cp-kafka
    hostname: broker-3
    container_name: broker-3
    ports:
      - 9094:9094
      - 49092:49092
    depends_on:
      - zookeeper-1
      - zookeeper-2
      - zookeeper-3
    environment:
      KAFKA_BROKER_ID: 3
      KAFKA_ZOOKEEPER_CONNECT: zookeeper-1:22181,zookeeper-2:32181,zookeeper-3:42181
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_LISTENERS: PLAINTEXT://:49092,PLAINTEXT_HOST://:9094
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker-3:49092,PLAINTEXT_HOST://localhost:9094
      ALLOW_PLAINTEXT_LISTENER: 'yes'
      KAFKA_AUTO_OFFSET_RESET: "latest"
      KAFKA_MAX_POLL_INTERVAL_MS: 300000
      KAFKA_MAX_POLL_RECORDS: 20000
      KAFKA_MAX_PARTITION_FETCH_BYTES: 52428800
      KAFKA_NUM_PARTITIONS: 2
      KAFKA_MESSAGE_MAX_BYTES: 20220088
    networks:
      - kafka_cluster

What you need to do is just:

  • Download the latest Neo4j Streams plugin version from here: https://github.com/neo4j-contrib/neo4j-streams/releases/tag/4.1.5

  • Be sure to create the volume folders (in the same folder where the docker-compose file is): /neo4j-cluster-40/core1/plugins, /neo4j-cluster-40/core2/plugins, /neo4j-cluster-40/core3/plugins, /neo4j-cluster-40/read1/plugins, and be sure to put the neo4j-streams-4.1.5.jar into those folders.

  • Run docker-compose up -d

  • Connect to Neo4j core1 instance from the web browser: localhost:7474

    • Login using the credentials provided in the docker-compose file

    • Create a new database (the one where Neo4j Streams Sink is listening), running the following 2 commands from the Neo4j Browser

      • :use system

      • CREATE DATABASE dbtest

  • Once all the containers are up and running, open a terminal window and connect to Kafka broker-1, in order to send a JSON event using a kafka-console-producer. Follow the steps below:

    • docker exec -it broker-1 /bin/bash

    • kafka-console-producer --broker-list broker-1:29092 --topic mytopic

    • paste the following JSON event into kafka-console-producer:

      {"id": 1, "name": "Mauro", "surname": "Roiter"}.

Here is an example of the output of the last steps:

$ docker exec -it broker-1 /bin/bash
root@broker-1:/# kafka-console-producer --broker-list broker-1:29092 --topic mytopic
>{"id": 1, "name": "Mauro", "surname": "Roiter"}

Now if you go back to the Neo4j Browser, you will see the created node in the dbtest database.
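
You can also verify the result from the command line by querying the dbtest database on core1 with cypher-shell; the credentials neo4j/streams are the ones set in the compose file:

docker exec -it core1 cypher-shell -u neo4j -p streams -d dbtest \
  "MATCH (p:Person) RETURN p.id, p.name, p.surname"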

Figure 1. Streams Sink plugin into Neo4j+Kafka cluster environment

You will see the same results in the other Neo4j instances too.

In this example we've used the Neo4j Enterprise Docker image because the "CREATE DATABASE" feature is available only in the Enterprise Edition.