Spark optimizations

Filters

The Neo4j Connector for Apache Spark implements the SupportsPushDownFilters interface, which allows you to push Spark filters down to the Neo4j layer. This way, the data that Spark receives has already been filtered by Neo4j, decreasing the amount of data transferred from Neo4j to Spark.

You can manually disable the PushdownFilters support by setting the pushdown.filters.enabled option to false (default is true).
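
For example, a minimal sketch that disables filter pushdown on a labels-based read, reusing the connection options from the examples on this page:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().getOrCreate()

// With pushdown disabled, any filter on this DataFrame is applied by Spark,
// not by Neo4j
val unpushedDf = (spark.read.format("org.neo4j.spark.DataSource")
  .option("url", "neo4j://localhost:7687")
  .option("authentication.basic.username", "neo4j")
  .option("authentication.basic.password", "letmein!")
  .option("labels", ":Person")
  .option("pushdown.filters.enabled", "false")
  .load())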

If you apply a filter more than once (where is an alias of filter), as in this example:

import org.apache.spark.sql.{SaveMode, SparkSession}

val spark = SparkSession.builder().getOrCreate()

val df = (spark.read.format("org.neo4j.spark.DataSource")
  .option("url", "neo4j://localhost:7687")
  .option("authentication.basic.username", "neo4j")
  .option("authentication.basic.password", "letmein!")
  .option("labels", ":Person")
  .load())

df.where("name = 'John Doe'").where("age = 32").show()

The conditions are automatically joined with an AND operator.
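
In other words, the two chained calls above behave like a single predicate; a quick equivalent sketch using the same DataFrame:

// Equivalent to chaining the two where calls: a single predicate joined with AND
df.where("name = 'John Doe' AND age = 32").show()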

When using relationship.nodes.map = true or the query option, the PushdownFilters support is automatically disabled. In that case, the filters are applied by Spark and not by Neo4j.
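
For example, a sketch of a query-based read (reusing the SparkSession from the example above), where any subsequent filter runs on the Spark side; the Cypher query is illustrative:

// With the query option, filter pushdown is disabled: the where below is
// evaluated by Spark after the rows are fetched from Neo4j
val queryDf = (spark.read.format("org.neo4j.spark.DataSource")
  .option("url", "neo4j://localhost:7687")
  .option("authentication.basic.username", "neo4j")
  .option("authentication.basic.password", "letmein!")
  .option("query", "MATCH (p:Person) RETURN p.name AS name, p.age AS age")
  .load())

queryDf.where("age = 32").show()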

Aggregation

The Neo4j Connector for Apache Spark implements the SupportsPushDownAggregates interface, which allows you to push Spark aggregations down to the Neo4j layer. This way, the data that Spark receives has already been aggregated by Neo4j, decreasing the amount of data transferred from Neo4j to Spark.

You can manually disable the PushdownAggregate support by setting the pushdown.aggregate.enabled option to false (default is true).

// Given a DB populated with the following query
"""
  CREATE (pe:Person {id: 1, fullName: 'Jane Doe'})
  WITH pe
  UNWIND range(1, 10) as id
  CREATE (pr:Product {id: id * rand(), name: 'Product ' + id, price: id})
  CREATE (pe)-[:BOUGHT{when: rand(), quantity: rand() * 1000}]->(pr)
  RETURN *
"""
import org.apache.spark.sql.{SaveMode, SparkSession}
val spark = SparkSession.builder().getOrCreate()

(spark.read.format("org.neo4j.spark.DataSource")
  .option("url", "neo4j://localhost:7687")
  .option("authentication.basic.username", "neo4j")
  .option("authentication.basic.password", "letmein!")
  .option("relationship", "BOUGHT")
  .option("relationship.source.labels", "Person")
  .option("relationship.target.labels", "Product")
  .load
  .createTempView("BOUGHT"))


val df = spark.sql(
  """SELECT `source.fullName`, MAX(`target.price`) AS max, MIN(`target.price`) AS min
    |FROM BOUGHT
    |GROUP BY `source.fullName`""".stripMargin)

df.show()

The MAX and MIN aggregations are computed directly by Neo4j.
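
The same aggregation can also be expressed with the DataFrame API, which produces an equivalent plan and should therefore be eligible for the same pushdown (a sketch, assuming the BOUGHT temporary view created above):

import org.apache.spark.sql.functions.{max, min}

spark.table("BOUGHT")
  .groupBy("`source.fullName`")
  .agg(max("`target.price`").as("max"), min("`target.price`").as("min"))
  .show()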

Push-down limit

The Neo4j Connector for Apache Spark implements the SupportsPushDownLimit interface, which allows you to push Spark limits down to the Neo4j layer. This way, the data that Spark receives has already been limited by Neo4j, decreasing the amount of data transferred from Neo4j to Spark.

You can manually disable the PushdownLimit support by setting the pushdown.limit.enabled option to false (default is true).

// Given a DB populated with the following query
"""
  CREATE (pe:Person {id: 1, fullName: 'Jane Doe'})
  WITH pe
  UNWIND range(1, 10) as id
  CREATE (pr:Product {id: id * rand(), name: 'Product ' + id, price: id})
  CREATE (pe)-[:BOUGHT{when: rand(), quantity: rand() * 1000}]->(pr)
  RETURN *
"""
import org.apache.spark.sql.{SaveMode, SparkSession}
val spark = SparkSession.builder().getOrCreate()

val df = (spark.read
      .format("org.neo4j.spark.DataSource")
      .option("url", "neo4j://localhost:7687")
      .option("authentication.basic.username", "neo4j")
      .option("authentication.basic.password", "letmein!")
      .option("relationship", "BOUGHT")
      .option("relationship.source.labels", "Person")
      .option("relationship.target.labels", "Product")
      .load
      .select("`target.name`", "`target.id`")
      .limit(10))


df.show()

The limit value will be pushed down to Neo4j.

Push-down top N

The Neo4j Connector for Apache Spark implements the SupportsPushDownTopN interface, which allows you to push Spark top N queries (a sort followed by a limit) down to the Neo4j layer. This way, the data that Spark receives has already been sorted and limited by Neo4j, decreasing the amount of data transferred from Neo4j to Spark.

You can manually disable the PushDownTopN support by setting the pushdown.topN.enabled option to false (default is true).

// Given a DB populated with the following query
"""
  CREATE (pe:Person {id: 1, fullName: 'Jane Doe'})
  WITH pe
  UNWIND range(1, 10) as id
  CREATE (pr:Product {id: id * rand(), name: 'Product ' + id, price: id})
  CREATE (pe)-[:BOUGHT{when: rand(), quantity: rand() * 1000}]->(pr)
  RETURN *
"""
import org.apache.spark.sql.{SaveMode, SparkSession}
import org.apache.spark.sql.functions.col
val spark = SparkSession.builder().getOrCreate()

val df = (spark.read
      .format("org.neo4j.spark.DataSource")
      .option("url", "neo4j://localhost:7687")
      .option("authentication.basic.username", "neo4j")
      .option("authentication.basic.password", "letmein!")
      .option("relationship", "BOUGHT")
      .option("relationship.source.labels", "Person")
      .option("relationship.target.labels", "Product")
      .load
      .select("`target.name`", "`target.id`")
      .sort(col("`target.name`").desc)
      .limit(10))


df.show()

Both the sort and the limit value are pushed down to Neo4j.

Partitioning

When reading data from Neo4j, you can partition the extraction in order to parallelize it.

Please consider the following job:

import org.apache.spark.sql.{SaveMode, SparkSession}

val spark = SparkSession.builder().getOrCreate()

val df = (spark.read.format("org.neo4j.spark.DataSource")
        .option("url", "neo4j://localhost:7687")
        .option("authentication.basic.username", "neo4j")
        .option("authentication.basic.password", "letmein!")
        .option("labels", "Person")
        .option("partitions", "5")
        .load())

This means that if the total count of nodes with the label Person in Neo4j is 100, the connector creates 5 partitions, each managing 20 records (using SKIP / LIMIT queries).

Partitioning the dataset makes sense only if you’re dealing with a big dataset (>= 10M records).

How to parallelize query execution

Three options are available:

  1. Node extraction.

  2. Relationship extraction.

  3. Query extraction.

In all three cases, a total count of the records to extract is computed first, and a SKIP / LIMIT query is built for each partition.

Therefore, for a dataset of 100 Person nodes split into 5 partitions, the following queries are generated (one per partition):

MATCH (p:Person) RETURN p SKIP 0 LIMIT 20
MATCH (p:Person) RETURN p SKIP 20 LIMIT 20
MATCH (p:Person) RETURN p SKIP 40 LIMIT 20
MATCH (p:Person) RETURN p SKIP 60 LIMIT 20
MATCH (p:Person) RETURN p SKIP 80 LIMIT 20

While node and relationship extraction leverage the Neo4j count store to retrieve the total count of the nodes/relationships you’re extracting, query extraction (3) offers two possible approaches:

  • Compute a count over the query that you’re using.

  • Compute a global count over a second optimized query that leverages indexes. In this case, you can pass it via the .option("query.count", "<your cypher query>").

The option can be set in two ways (a complete read sketch follows at the end of this section):

  • as an integer: .option("query.count", 100)

  • as a query, which must always return a single field named count containing the result of the count:

MATCH (p:Person)-[r:BOUGHT]->(pr:Product)
WHERE pr.name = 'An Awesome Product'
RETURN count(p) AS count
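
Putting this together, a read that uses the query option with partitions and a query.count might look like the following sketch (the data query is illustrative; the connection options are the ones used throughout this page):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().getOrCreate()

val df = (spark.read.format("org.neo4j.spark.DataSource")
  .option("url", "neo4j://localhost:7687")
  .option("authentication.basic.username", "neo4j")
  .option("authentication.basic.password", "letmein!")
  // Illustrative data query: returns the rows to extract
  .option("query",
    """MATCH (p:Person)-[r:BOUGHT]->(pr:Product)
      |WHERE pr.name = 'An Awesome Product'
      |RETURN p.fullName AS fullName, r.quantity AS quantity""".stripMargin)
  .option("partitions", "5")
  // Optimized count query: must return a single field named `count`
  .option("query.count",
    """MATCH (p:Person)-[r:BOUGHT]->(pr:Product)
      |WHERE pr.name = 'An Awesome Product'
      |RETURN count(p) AS count""".stripMargin)
  .load())

df.show()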