Quickstart

Before you start

  1. Install Neo4j or get a Neo4j Aura instance. Note down the connection URI and the access credentials.

    In an Enterprise environment, consider creating a separate username/password for Spark access instead of the default neo4j account.

  2. Make sure you have Java (version 8 or above) installed.

  3. Download Spark as documented on the Spark website.

  4. If you are developing a non-Python self-contained application, make sure you have a build tool installed.

Connection URI

Local instance

Use the neo4j:// protocol, for example neo4j://localhost:7687.

Neo4j Aura

Use the neo4j+s:// protocol. An Aura connection URI has the form neo4j+s://xxxxxxxx.databases.neo4j.io.

Neo4j Cluster

Use the neo4j+s:// protocol to route transactions appropriately (write transactions to the leader, read transactions to followers and read replicas).

Usage with the Spark shell

You can copy-paste and run the following examples directly in the interactive Spark shell (spark-shell for Scala, pyspark for Python).

import org.apache.spark.sql.{SaveMode, SparkSession}

// Replace with the actual connection URI and credentials
val url = "neo4j://localhost:7687"
val username = "neo4j"
val password = "password"
val dbname = "neo4j"

val spark = SparkSession.builder
    .config("neo4j.url", url)
    .config("neo4j.authentication.basic.username", username)
    .config("neo4j.authentication.basic.password", password)
    .config("neo4j.database", dbname)
    .getOrCreate()

// Create example DataFrame
val df = List(
    ("John", "Doe", 42),
    ("Jane", "Doe", 40)
).toDF("name", "surname", "age")

// Write to Neo4j
df.write
    .format("org.neo4j.spark.DataSource")
    .mode(SaveMode.Overwrite)
    .option("labels", "Person")
    .option("node.keys", "name,surname")
    .save()

// Read from Neo4j
spark.read
    .format("org.neo4j.spark.DataSource")
    .option("labels", "Person")
    .load()
    .show()

Wrap chained methods in parentheses to avoid syntax errors.

Some common API constants are specified as strings in the PySpark API. For example, the save mode in the Python API is set with df.mode("Append").

from pyspark.sql import SparkSession

# Replace with the actual connection URI and credentials
url = "neo4j://localhost:7687"
username = "neo4j"
password = "password"
dbname = "neo4j"

spark = (
    SparkSession.builder.config("neo4j.url", url)
    .config("neo4j.authentication.basic.username", username)
    .config("neo4j.authentication.basic.password", password)
    .config("neo4j.database", dbname)
    .getOrCreate()
)

# Create example DataFrame
df = spark.createDataFrame(
    [
        {"name": "John", "surname": "Doe", "age": 42},
        {"name": "Jane", "surname": "Doe", "age": 40},
    ]
)

# Write to Neo4j
(
    df.write.format("org.neo4j.spark.DataSource")
    .mode("Overwrite")
    .option("labels", "Person")
    .option("node.keys", "name,surname")
    .save()
)

# Read from Neo4j
(
    spark.read.format("org.neo4j.spark.DataSource")
    .option("labels", "Person")
    .load()
    .show()
)

Self-contained applications

Non-Python applications require some additional setup.

  1. Create a scala-example directory.

  2. Copy the build.sbt from the Installation section and the example.jsonl below into the new directory.

    example.jsonl
    {"name": "John", "surname": "Doe", "age": 42}
    {"name": "Jane", "surname": "Doe", "age": 40}
  3. Create a src/main/scala directory and copy the SparkApp.scala file below.

    SparkApp.scala
    import org.apache.spark.sql.{SaveMode, SparkSession}
    
    object SparkApp {
        def main(args: Array[String]): Unit = {
            // Replace with the actual connection URI and credentials
            val url = "neo4j://localhost:7687"
            val username = "neo4j"
            val password = "password"
            val dbname = "neo4j"
    
            val spark = SparkSession.builder
                .config("neo4j.url", url)
                .config("neo4j.authentication.basic.username", username)
                .config("neo4j.authentication.basic.password", password)
                .config("neo4j.database", dbname)
                .appName("Spark App")
                .getOrCreate()
    
            val data = spark.read.json("example.jsonl")
    
            // Write to Neo4j
            data.write
                .format("org.neo4j.spark.DataSource")
                .mode(SaveMode.Overwrite)
                .option("labels", "Person")
                .option("node.keys", "name,surname")
                .save()
    
            // Read from Neo4j
            val ds = spark.read
                .format("org.neo4j.spark.DataSource")
                .option("labels", "Person")
                .load()
    
            ds.show()
        }
    }
  4. Run sbt package.

  5. Run spark-submit:

    $SPARK_HOME/bin/spark-submit \
      --packages org.neo4j:neo4j-connector-apache-spark_2.12:5.3.2_for_spark_3 \
      --class SparkApp \
      target/scala-2.12/spark-app_2.12-1.0.jar
  1. Create a python-example directory.

  2. Copy the example.jsonl and spark_app.py files below into the new directory.

    example.jsonl
    {"name": "John", "surname": "Doe", "age": 42}
    {"name": "Jane", "surname": "Doe", "age": 40}
    spark_app.py
    from pyspark.sql import SparkSession
    
    # Replace with the actual connection URI and credentials
    url = "neo4j://localhost:7687"
    username = "neo4j"
    password = "password"
    dbname = "neo4j"
    
    spark = (
        SparkSession.builder.config("neo4j.url", url)
        .config("neo4j.authentication.basic.username", username)
        .config("neo4j.authentication.basic.password", password)
        .config("neo4j.database", dbname)
        .getOrCreate()
    )
    
    data = spark.read.json("example.jsonl")
    
    (
        data.write.format("org.neo4j.spark.DataSource")
        .mode("Overwrite")
        .option("labels", "Person")
        .option("node.keys", "name,surname")
        .save()
    )
    
    ds = (
        spark.read.format("org.neo4j.spark.DataSource")
        .option("labels", "Person")
        .load()
    )
    
    ds.show()
  3. Run spark-submit:

    $SPARK_HOME/bin/spark-submit \
      --packages org.neo4j:neo4j-connector-apache-spark_2.12:5.3.2_for_spark_3 \
      spark_app.py
  1. Create a java-example directory.

  2. Copy the pom.xml from the Installation section and the example.jsonl below into the new directory.

    example.jsonl
    {"name": "John", "surname": "Doe", "age": 42}
    {"name": "Jane", "surname": "Doe", "age": 40}
  3. Create a src/main/java directory and copy the SparkApp.java file below.

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SaveMode;
    import org.apache.spark.sql.SparkSession;
    
    public class SparkApp {
        public static void main(String[] args) {
            // Replace with the actual connection URI and credentials
            String url = "neo4j://localhost:7687";
            String username = "neo4j";
            String password = "password";
            String dbname = "neo4j";
    
            SparkSession spark = SparkSession
                .builder()
                .appName("Spark App")
                .config("neo4j.url", url)
                .config("neo4j.authentication.basic.username", username)
                .config("neo4j.authentication.basic.password", password)
                .config("neo4j.database", dbname)
                .getOrCreate();
    
            Dataset<Row> data = spark.read().json("example.jsonl");
    
            data.write().format("org.neo4j.spark.DataSource")
                .mode(SaveMode.Overwrite)
                .option("labels", "Person")
                .option("node.keys", "name,surname")
                .save();
    
            Dataset<Row> ds = spark.read().format("org.neo4j.spark.DataSource")
                .option("labels", "Person")
                .load();
    
            ds.show();
        }
    }
  4. Run mvn package.

  5. Run spark-submit:

    $SPARK_HOME/bin/spark-submit \
      --packages org.neo4j:neo4j-connector-apache-spark_2.12:5.3.2_for_spark_3 \
      --class SparkApp \
      target/spark-app-1.0.jar

Jupyter notebooks

The code repository includes two Jupyter notebooks that show how to use the connector in a data-driven workflow:

  • neo4j_data_engineering.ipynb shows how to create Spark jobs to read data from and write data to Neo4j.

  • neo4j_data_science.ipynb shows how to combine Pandas (in PySpark) with the Neo4j Graph Data Science library to highlight frauds in a banking scenario.