Quickstart
Before you start
- Install Neo4j or get a Neo4j Aura instance. Note down the connection URI and the access credentials. In an Enterprise environment, consider creating a separate username/password for Spark access instead of the default neo4j account.
- Make sure you have Java (version 8 or above) installed.
- Download Spark as documented on the Spark website.
- If you are developing a non-Python self-contained application, make sure you have a build tool installed.
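You can verify the prerequisites from a terminal. A minimal sketch, assuming Spark is unpacked at the location pointed to by the SPARK_HOME environment variable (as the spark-submit commands later in this section also assume):

  # Check the Java version (must be 8 or above)
  java -version

  # Check that SPARK_HOME points at the Spark installation
  echo $SPARK_HOME
  $SPARK_HOME/bin/spark-submit --version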
Connection URI
- Local instance: use the neo4j:// protocol, for example neo4j://localhost:7687.
- Neo4j Aura: use the neo4j+s:// protocol. An Aura connection URI has the form neo4j+s://xxxxxxxx.databases.neo4j.io.
- Neo4j Cluster: use the neo4j+s:// protocol to route transactions appropriately (write transactions to the leader, read transactions to followers and read replicas).
Usage with the Spark shell
You can copy-paste and run the following examples directly in the interactive Spark shell (spark-shell for Scala, pyspark for Python).
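The examples assume the shell was started with the connector on the classpath. A minimal sketch, reusing the connector coordinates from the spark-submit commands later in this section (adjust the version to match your Spark and Scala versions):

  # Scala shell
  $SPARK_HOME/bin/spark-shell \
    --packages org.neo4j:neo4j-connector-apache-spark_2.12:5.3.2_for_spark_3

  # Python shell
  $SPARK_HOME/bin/pyspark \
    --packages org.neo4j:neo4j-connector-apache-spark_2.12:5.3.2_for_spark_3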
import org.apache.spark.sql.{SaveMode, SparkSession}

// Replace with the actual connection URI and credentials
val url = "neo4j://localhost:7687"
val username = "neo4j"
val password = "password"
val dbname = "neo4j"

val spark = SparkSession.builder
  .config("neo4j.url", url)
  .config("neo4j.authentication.basic.username", username)
  .config("neo4j.authentication.basic.password", password)
  .config("neo4j.database", dbname)
  .getOrCreate()

// Required for the toDF conversion below (pre-imported in spark-shell)
import spark.implicits._

// Create example DataFrame
val df = List(
  ("John", "Doe", 42),
  ("Jane", "Doe", 40)
).toDF("name", "surname", "age")

// Write to Neo4j
df.write
  .format("org.neo4j.spark.DataSource")
  .mode(SaveMode.Overwrite)
  .option("labels", "Person")
  .option("node.keys", "name,surname")
  .save()

// Read from Neo4j
spark.read
  .format("org.neo4j.spark.DataSource")
  .option("labels", "Person")
  .load()
  .show()
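Besides reading by label, the connector can also run an arbitrary Cypher query via its query option. A minimal sketch; the query and returned columns here are illustrative, not part of the example above:

  // Read with a Cypher query instead of a label
  spark.read
    .format("org.neo4j.spark.DataSource")
    .option("query", "MATCH (p:Person) RETURN p.name AS name, p.age AS age")
    .load()
    .show()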
Note: in Python, wrap chained methods in parentheses to avoid syntax errors.

Note: some common API constants are specified as strings in the PySpark API. For example, the save mode in the Python example below is set with .mode("Overwrite") rather than .mode(SaveMode.Overwrite).
from pyspark.sql import SparkSession

# Replace with the actual connection URI and credentials
url = "neo4j://localhost:7687"
username = "neo4j"
password = "password"
dbname = "neo4j"

spark = (
    SparkSession.builder.config("neo4j.url", url)
    .config("neo4j.authentication.basic.username", username)
    .config("neo4j.authentication.basic.password", password)
    .config("neo4j.database", dbname)
    .getOrCreate()
)

# Create example DataFrame
df = spark.createDataFrame(
    [
        {"name": "John", "surname": "Doe", "age": 42},
        {"name": "Jane", "surname": "Doe", "age": 40},
    ]
)

# Write to Neo4j
(
    df.write.format("org.neo4j.spark.DataSource")
    .mode("Overwrite")
    .option("labels", "Person")
    .option("node.keys", "name,surname")
    .save()
)

# Read from Neo4j
(
    spark.read.format("org.neo4j.spark.DataSource")
    .option("labels", "Person")
    .load()
    .show()
)
Self-contained applications
Non-Python applications require some additional setup.
Scala

- Create a scala-example directory.
- Copy the build.sbt from the Installation section (a sketch also appears after these steps) and the example.jsonl below into the new directory.

  example.jsonl

  {"name": "John", "surname": "Doe", "age": 42}
  {"name": "Jane", "surname": "Doe", "age": 40}

- Create a src/main/scala directory and copy the SparkApp.scala file below.

  SparkApp.scala

  import org.apache.spark.sql.{SaveMode, SparkSession}

  object SparkApp {
    def main(args: Array[String]): Unit = {
      // Replace with the actual connection URI and credentials
      val url = "neo4j://localhost:7687"
      val username = "neo4j"
      val password = "password"
      val dbname = "neo4j"

      val spark = SparkSession.builder
        .config("neo4j.url", url)
        .config("neo4j.authentication.basic.username", username)
        .config("neo4j.authentication.basic.password", password)
        .config("neo4j.database", dbname)
        .appName("Spark App")
        .getOrCreate()

      val data = spark.read.json("example.jsonl")

      // Write to Neo4j
      data.write
        .format("org.neo4j.spark.DataSource")
        .mode(SaveMode.Overwrite)
        .option("labels", "Person")
        .option("node.keys", "name,surname")
        .save()

      // Read from Neo4j
      val ds = spark.read
        .format("org.neo4j.spark.DataSource")
        .option("labels", "Person")
        .load()

      ds.show()
    }
  }

- Run sbt package.
- Run spark-submit:

  $SPARK_HOME/bin/spark-submit \
    --packages org.neo4j:neo4j-connector-apache-spark_2.12:5.3.2_for_spark_3 \
    --class SparkApp \
    target/scala-2.12/spark-app_2.12-1.0.jar
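The Installation section has the authoritative build.sbt. For orientation only, a minimal sketch that matches the artifact name and Scala version implied by the spark-submit command above; the exact Spark and Scala patch versions are assumptions, so adjust them to your installation:

  // build.sbt (sketch; versions are assumptions)
  name := "spark-app"
  version := "1.0"
  scalaVersion := "2.12.18"

  // The connector itself is supplied at runtime via spark-submit --packages,
  // so only the Spark SQL API is needed at compile time.
  libraryDependencies += "org.apache.spark" %% "spark-sql" % "3.5.1" % "provided"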
Python

- Create a python-example directory.
- Copy the example.jsonl and spark_app.py files below into the new directory.

  example.jsonl

  {"name": "John", "surname": "Doe", "age": 42}
  {"name": "Jane", "surname": "Doe", "age": 40}

  spark_app.py

  from pyspark.sql import SparkSession

  # Replace with the actual connection URI and credentials
  url = "neo4j://localhost:7687"
  username = "neo4j"
  password = "password"
  dbname = "neo4j"

  spark = (
      SparkSession.builder.config("neo4j.url", url)
      .config("neo4j.authentication.basic.username", username)
      .config("neo4j.authentication.basic.password", password)
      .config("neo4j.database", dbname)
      .getOrCreate()
  )

  data = spark.read.json("example.jsonl")

  # Write to Neo4j
  (
      data.write.format("org.neo4j.spark.DataSource")
      .mode("Overwrite")
      .option("labels", "Person")
      .option("node.keys", "name,surname")
      .save()
  )

  # Read from Neo4j
  ds = (
      spark.read.format("org.neo4j.spark.DataSource")
      .option("labels", "Person")
      .load()
  )
  ds.show()

- Run spark-submit:

  $SPARK_HOME/bin/spark-submit \
    --packages org.neo4j:neo4j-connector-apache-spark_2.12:5.3.2_for_spark_3 \
    spark_app.py
Java

- Create a java-example directory.
- Copy the pom.xml from the Installation section and the example.jsonl below into the new directory.

  example.jsonl

  {"name": "John", "surname": "Doe", "age": 42}
  {"name": "Jane", "surname": "Doe", "age": 40}

- Create a src/main/java directory and copy the SparkApp.java file below.

  SparkApp.java

  import org.apache.spark.sql.Dataset;
  import org.apache.spark.sql.Row;
  import org.apache.spark.sql.SaveMode;
  import org.apache.spark.sql.SparkSession;

  public class SparkApp {
      public static void main(String[] args) {
          // Replace with the actual connection URI and credentials
          String url = "neo4j://localhost:7687";
          String username = "neo4j";
          String password = "password";
          String dbname = "neo4j";

          SparkSession spark = SparkSession
              .builder()
              .appName("Spark App")
              .config("neo4j.url", url)
              .config("neo4j.authentication.basic.username", username)
              .config("neo4j.authentication.basic.password", password)
              .config("neo4j.database", dbname)
              .getOrCreate();

          Dataset<Row> data = spark.read().json("example.jsonl");

          // Write to Neo4j
          data.write().format("org.neo4j.spark.DataSource")
              .mode(SaveMode.Overwrite)
              .option("labels", "Person")
              .option("node.keys", "name,surname")
              .save();

          // Read from Neo4j
          Dataset<Row> ds = spark.read().format("org.neo4j.spark.DataSource")
              .option("labels", "Person")
              .load();

          ds.show();
      }
  }

- Run mvn package.
- Run spark-submit:

  $SPARK_HOME/bin/spark-submit \
    --packages org.neo4j:neo4j-connector-apache-spark_2.12:5.3.2_for_spark_3 \
    --class SparkApp \
    target/spark-app-1.0.jar
Jupyter notebooks
The code repository includes two Jupyter notebooks that show how to use the connector in a data-driven workflow:
- neo4j_data_engineering.ipynb shows how to create Spark jobs to read data from and write data to Neo4j.
- neo4j_data_science.ipynb shows how to combine Pandas (in PySpark) with the Neo4j Graph Data Science library to highlight frauds in a banking scenario.