Configure with Kafka over SSL
The Kafka Connect Neo4j Connector is the recommended method to integrate Kafka with Neo4j, as Neo4j Streams is no longer under active development and will not be supported after version 4.4 of Neo4j. The most recent version of the Kafka Connect Neo4j Connector can be found here.
This section provides guidance on configuring SSL security between Kafka and Neo4j, giving you encryption of data in transit between the two.
It does not address ACL configuration inside of Kafka.
See also the Confluent documentation for further details on how to configure encryption and authentication with SSL.
Self-Signed Certificates
This section is based on https://medium.com/talking-tech-all-around/how-to-enable-and-verify-client-authentication-in-kafka-21e936e670e8.
Make sure that you have a truststore and keystore JKS for each server. If you want a self-signed certificate, you can use the following commands:
mkdir security
cd security
export PASSWORD=password

# Generate the server keystore with a new key pair
keytool -keystore kafka.server.keystore.jks -alias localhost -validity 365 -genkey

# Create your own certificate authority (CA)
openssl req -new -x509 -keyout ca-key -out ca-cert -days 365

# Import the CA certificate into the server and client truststores
keytool -keystore kafka.server.truststore.jks -alias CARoot -import -file ca-cert
keytool -keystore kafka.client1.truststore.jks -alias CARoot -import -file ca-cert

# Create a certificate signing request (CSR) for the server key and sign it with the CA
keytool -keystore kafka.server.keystore.jks -alias localhost -certreq -file cert-file
openssl x509 -req -CA ca-cert -CAkey ca-key -in cert-file -out cert-signed -days 365 -CAcreateserial -passin pass:$PASSWORD

# Import the CA certificate and the signed server certificate into the server keystore
keytool -keystore kafka.server.keystore.jks -alias CARoot -import -file ca-cert
keytool -keystore kafka.server.keystore.jks -alias localhost -import -file cert-signed

# Repeat the process for the client keystore
keytool -keystore kafka.client1.keystore.jks -alias localhost -validity 365 -genkey
keytool -keystore kafka.client1.keystore.jks -alias localhost -certreq -file cert-file
openssl x509 -req -CA ca-cert -CAkey ca-key -in cert-file -out cert-signed -days 365 -CAcreateserial -passin pass:$PASSWORD
keytool -keystore kafka.client1.keystore.jks -alias CARoot -import -file ca-cert
keytool -keystore kafka.client1.keystore.jks -alias localhost -import -file cert-signed
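To sanity-check the result, you can list the contents of a keystore, for example:
keytool -list -v -keystore kafka.server.keystore.jks -storepass $PASSWORD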
Once the keystores are created, move kafka.client1.keystore.jks and kafka.client1.truststore.jks to your Neo4j server.
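For example, with scp (the destination host is illustrative; the path should match the locations referenced in neo4j.conf below):
scp kafka.client1.keystore.jks kafka.client1.truststore.jks ubuntu@your-neo4j-host:/home/ubuntu/security/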
If the error Caused by: java.security.cert.CertificateException: No subject alternative names present appears when querying the topic, see https://geekflare.com/san-ssl-certificate/ for a discussion of Subject Alternative Name (SAN) certificates.
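One way to avoid this error is to include a SAN entry when generating the server key. A sketch, with an illustrative hostname and IP; note that the extension must also be carried through the CSR and signing steps:
keytool -keystore kafka.server.keystore.jks -alias localhost -validity 365 -genkey -ext SAN=dns:your-broker-hostname,ip:203.0.113.10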
Kafka Configuration
Connect to your Kafka server and modify the config/server.properties file. The following configuration, used here for Kafka on AWS, worked in general; other configurations without the EXTERNAL and INTERNAL settings should work as well.
listeners=EXTERNAL://0.0.0.0:9092,INTERNAL://0.0.0.0:19092,CLIENT://0.0.0.0:9093,SSL://0.0.0.0:9094
listener.security.protocol.map=EXTERNAL:PLAINTEXT,INTERNAL:PLAINTEXT,CLIENT:PLAINTEXT,SSL:SSL
advertised.listeners=EXTERNAL://aws_public_ip:9092,INTERNAL://aws_internal_ip:19092,CLIENT://aws_public_ip:9093,SSL://aws_public_ip:9094
inter.broker.listener.name=INTERNAL
ssl.keystore.location=/home/kafka/security/kafka.server.keystore.jks
ssl.keystore.password=neo4jpassword
ssl.truststore.location=/home/kafka/security/kafka.server.truststore.jks
ssl.truststore.password=neo4jpassword
ssl.key.password=neo4jpassword
ssl.enabled.protocols=TLSv1.2,TLSv1.1
ssl.endpoint.identification.algorithm=HTTPS
ssl.client.auth=required
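After saving the changes, restart the broker so that the new listeners take effect. A minimal sketch, assuming a standard Kafka distribution layout:
./bin/kafka-server-stop.sh
./bin/kafka-server-start.sh -daemon config/server.properties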
Neo4j Configuration
The following Neo4j configuration is required. In this case, we are connecting to the public AWS IP address. The keystore and truststore locations point to the files that you created in the earlier steps. Note that the passwords are stored in plaintext, so limit access to this neo4j.conf file.
kafka.bootstrap.servers=xxx.xxx.xxx.xxx:9094
streams.sink.enabled=false
streams.source.topic.nodes.neoTest=Person{*}
kafka.auto.offset.reset=earliest
kafka.group.id=neo4j
streams.sink.dlq=neo4j-dlq
kafka.acks=all
kafka.retries=2
kafka.batch.size=16384
kafka.buffer.memory=33554432
kafka.security.protocol=SSL
kafka.ssl.truststore.location=/home/ubuntu/security/kafka.client1.truststore.jks
kafka.ssl.truststore.password=neo4jpassword
kafka.ssl.keystore.location=/home/ubuntu/security/kafka.client1.keystore.jks
kafka.ssl.keystore.password=neo4jpassword
kafka.ssl.key.password=neo4jpassword
kafka.ssl.endpoint.identification.algorithm=HTTPS
dbms.security.procedures.whitelist=apoc.*
dbms.security.procedures.unrestricted=apoc.*
dbms.jvm.additional=-Djavax.net.debug=ssl:handshake
The dbms.jvm.additional=-Djavax.net.debug=ssl:handshake line is optional but helps when debugging SSL issues.
When configuring a secure connection between Neo4j and Kafka, and using the SASL protocol in particular, take care to set the client-side properties (kafka.security.protocol and kafka.sasl.mechanism, shown in the SASL section below) and not their inter-broker counterparts (such as sasl.mechanism.inter.broker.protocol), which belong in the broker configuration, not on the client side.
Testing
After starting Kafka and Neo4j, you can test by creating a Person node in Neo4j and then querying the topic. For the node-creation step, a minimal Cypher example (the property is illustrative; any Person node matches the Person{*} pattern configured above):
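CREATE (p:Person {name: 'John Doe'});
Then query the topic as follows: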
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic neoTest --from-beginning
If you want to test using SSL, create a client-ssl.properties file consisting of:
security.protocol=SSL
ssl.truststore.location=/home/kafka/security/kafka.client.truststore.jks
ssl.truststore.password=neo4jpassword
ssl.endpoint.identification.algorithm=
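Then point the console consumer at the SSL listener (port 9094 in the broker configuration above), passing this file. Note that ssl.endpoint.identification.algorithm is deliberately left empty to disable hostname verification for the self-signed certificate:
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9094 --topic neoTest --from-beginning --consumer.config client-ssl.properties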
Authentication with SASL
You can configure JAAS by providing a JAAS configuration file. To do this, connect to your Kafka server and modify the config/server.properties file. The following configuration, used here for Kafka on AWS, worked in general; other configurations without the EXTERNAL and INTERNAL settings should work as well.
listeners=EXTERNAL://0.0.0.0:9092,INTERNAL://0.0.0.0:9093,CLIENT://0.0.0.0:9094
listener.security.protocol.map=EXTERNAL:SASL_PLAINTEXT,INTERNAL:PLAINTEXT,CLIENT:SASL_PLAINTEXT
advertised.listeners=EXTERNAL://18.188.84.xxx:9092,INTERNAL://172.31.43.xxx:9093,CLIENT://18.188.84.xxx:9094
zookeeper.connect=18.188.84.xxx:2181
sasl.mechanism.inter.broker.protocol=PLAIN
sasl.enabled.mechanisms=PLAIN
inter.broker.listener.name=INTERNAL
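The broker also needs a JAAS configuration file defining the PLAIN users, loaded via -Djava.security.auth.login.config in the broker's startup environment. A minimal sketch, with illustrative usernames and passwords:
KafkaServer {
   org.apache.kafka.common.security.plain.PlainLoginModule required
   username="admin"
   password="admin-secret"
   user_admin="admin-secret"
   user_neo4j="neo4j-secret";
};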
On the Neo4j side, the following is required. Note that in this case, we are connecting to the public AWS IP address.
- Copy the contents of ~/kafka/conf/kafka_jaas.conf on your Kafka server and save it to a file on your Neo4j server (e.g. ~/conf/kafka_client_jaas.conf).
- In neo4j.conf, add the following:
dbms.jvm.additional=-Djava.security.auth.login.config=/Users/davidfauth/neo4j-enterprise-4.0.4_kafka/conf/kafka_client_jaas.conf
kafka.security.protocol=SASL_PLAINTEXT
kafka.sasl.mechanism=PLAIN
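A minimal sketch of the client JAAS file referenced above (the credentials are illustrative and must match a user defined on the broker, such as user_neo4j in the broker's JAAS file):
KafkaClient {
   org.apache.kafka.common.security.plain.PlainLoginModule required
   username="neo4j"
   password="neo4j-secret";
};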
For more information, please consult the official Confluent documentation.
Authorization with ACLs
To configure use with ACLs, the following configuration properties are required:
kafka.authorizer.class.name=kafka.security.authorizer.AclAuthorizer
kafka.zookeeper.set.acl=true
- kafka.security.authorizer.AclAuthorizer (the default Kafka authorizer implementation) was introduced in Apache Kafka 2.4/Confluent Platform 5.4.0. If you are running a previous version, use SimpleAclAuthorizer (kafka.security.auth.SimpleAclAuthorizer) instead. If you are using the Confluent Platform, you can also use the LDAP authorizer (please refer to the official Confluent documentation for further details: https://docs.confluent.io/platform/current/security/ldap-authorizer/quickstart.html).
- Note that zookeeper.set.acl is false by default.
The official Kafka documentation notes that if a resource has no associated ACLs, then no one is allowed to access that resource except super users. If this is the case in your Kafka cluster, you also have to add the following:
kafka.allow.everyone.if.no.acl.found=true
Be very careful when using the above property because, as its name implies, it allows access to everyone if no ACLs are found.
If super users are specified, also include:
kafka.super.users=...
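The value is a semicolon-separated list of principals, for example (principal names are illustrative):
kafka.super.users=User:admin;User:neo4j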
Moreover, if you change the default user name (principal) mapping rule, you must also add the following properties:
- If you use SSL encryption:
kafka.ssl.principal.mapping.rules=...
- If you use SASL authentication (likely, if you have a Kerberos environment):
kafka.sasl.kerberos.principal.to.local.rules=...
Furthermore, if you want to ensure that the brokers also communicate with each other using Kerberos, specify the following property, which is in any case not required for ACL purposes:
kafka.security.inter.broker.protocol=SASL_SSL
The last property is PLAINTEXT by default.
To make the plugin work properly, the following operations must be authorized for the Topic and Cluster resource types (an example grant command follows this list):
- Write, when you want to use the plugin as a Source
- Read, when you want to use the plugin as a Sink
- DescribeConfigs and Describe, because the plugin uses the following two Kafka AdminClient APIs:
  - listTopics
  - describeCluster
- To use streams procedures, the same operations must be authorized (Read or Write) depending on which of the procedures you wish to use. The permissions required by the procedures and the source/sink operations are the same.
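A sketch of granting these permissions with Kafka's kafka-acls.sh tool (the principal, ZooKeeper address, and topic name are illustrative):
# Allow the plugin's principal to read, write, and describe the topic
./bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:neo4j --operation Read --operation Write --operation Describe --operation DescribeConfigs --topic neoTest
# Allow the same principal to describe the cluster (needed by describeCluster)
./bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:neo4j --operation Describe --cluster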
For further details on how to set up and define ACLs on Kafka, please refer to the official Confluent Kafka documentation.
This section applies only to the Neo4j Streams plugin.