apoc.periodic.iterate

As of Neo4j 5.21, transactions can be processed in parallel using the Cypher command CALL {…​} IN CONCURRENT TRANSACTIONS. For more information, see CALL subqueries in transactions → Concurrent transactions.
Details

Syntax

apoc.periodic.iterate(cypherIterate, cypherAction, config) :: (batches, total, timeTaken, committedOperations, failedOperations, failedBatches, retries, errorMessages, batch, operations, wasTerminated, failedParams, updateStatistics)

Description

Runs the second statement for each item returned by the first statement. This procedure returns the number of batches and the total number of processed rows.

Input arguments

Name

Type

Description

cypherIterate

STRING

The first Cypher statement to be run.

cypherAction

STRING

The Cypher statement to run for each item returned by the initial Cypher statement.

config

MAP

{ batchSize = 10000 :: INTEGER, parallel = false :: BOOLEAN, retries = 0 :: INTEGER, batchMode = "BATCH" :: STRING, params = {} :: MAP, concurrency :: INTEGER, failedParams = -1 :: INTEGER, planner = "DEFAULT" :: ["DEFAULT", "COST", "IDP", "DP"] }

Return arguments

Name

Type

Description

batches

INTEGER

The total number of batches.

total

INTEGER

The number of processed input rows.

timeTaken

INTEGER

The duration taken in seconds.

committedOperations

INTEGER

The number of successful inner queries (actions).

failedOperations

INTEGER

The number of failed inner queries (actions).

failedBatches

INTEGER

The number of failed batches.

retries

INTEGER

The number of retries.

errorMessages

MAP

A map of batch error messages paired with their corresponding error counts.

batch

MAP

{ total :: INTEGER, failed :: INTEGER, committed :: INTEGER, errors :: MAP }

operations

MAP

{ total :: INTEGER, failed :: INTEGER, committed :: INTEGER, errors :: MAP }

wasTerminated

BOOLEAN

If the transaction was terminated before completion.

failedParams

MAP

Parameters of failed batches. The key is the batch number as a STRING and the value is a list of batch parameters.

updateStatistics

MAP

{ nodesCreated :: INTEGER, nodesDeleted :: INTEGER, relationshipsCreated :: INTEGER, relationshipsDeleted :: INTEGER, propertiesSet :: INTEGER, labelsAdded :: INTEGER, labelsRemoved :: INTEGER }

Config parameters

The procedure support the following config parameters:

Config parameters
Name Type Default Description

batchSize

INTEGER

10000

run the specified number of operation statements in a single tx - params: {_count, _batch}

parallel

BOOLEAN

false

run operation statements in parallel (note that statements might deadlock if conflicting).
Please note that, in case of parallel: false, APOC is designed to reuse the same java.util.concurrent.ThreadPoolExecutor with a maximum pool size equal 1, in order to prevent parallelism; this means that if you want to execute multiple apoc.periodic.iterate each one will be executed when the previous one has been completed. Instead, with parallel: true, APOC will use a ThreadPoolExecutor with a configurable maximum pool size via the apoc.jobs.pool.num_threads config or as default with the number of available processor * 2. Therefore, if we execute multiple apoc.periodic.iterate each one will be executed in parallel if the queue pool size can accept new tasks. Furthermore, to be noted that running in parallel affects all databases, and not the single database you are using. So with e.g. 2 databases db1 and db2, the apoc.periodic.iterate on db1 will impact on performance if we execute an apoc.periodic.iterate on db2.

retries

INTEGER

0

if the operation statement fails with an error, sleep 100ms and retry until retries-count is reached - param {_retry}

batchMode

STRING

"BATCH"

how data-driven statements should be processed by operation statement. Valid values are:

  • "BATCH" - execute operation statement once per batchSize. Operation statement is prefixed with the following, which extracts each field returned in the data-driven statement from the $_batch parameter:

UNWIND $_batch AS _batch
WITH _batch.field1 AS field1, _batch.field2 AS field2
  • "SINGLE" - execute operation statement one at a time

  • "BATCH_SINGLE" - execute operation statement once per batchSize, but leaves unpacking of batch to the operation statement. The operation query can access the batched values via the $_batch parameter.

params

MAP

{}

externally pass in map of params

concurrency

INTEGER

Number of processors available

number of concurrent tasks are generated when using parallel:true

failedParams

INTEGER

-1

if set to a non-negative value, each failed batch up to failedParams parameter sets are returned in yield failedParams.

planner

Enum[DEFAULT, COST, IDP, DP]

DEFAULT

Any planner other than DEFAULT will be prepended to the second statement as cypher planner=[VALUE_OF_CONFIG] (or insert planner=[VALUE_OF_CONFIG] with any existing query options). This planner value (except for DEFAULT) has higher precedence than the planner defined in the query (if any).

In APOC versions 4.0.0.11 and earlier, the iterateList config key was used to control the batching of values returned by the data-driven statement. This was replaced by batchMode in version 4.0.0.12. These config keys are treated as follows:

  • If batchMode is provided, its value takes precedence over iterateList

  • If batchMode is not provided and iterateList is provided, the value of iterateList will be translated as described in the table below.

  • If neither batchMode nor iterateList are provided, batchMode defaults to BATCH, which is the same as iterateList:true

Deprecated Config
param default description

iterateList

true

execute operation statements once per batchSize (whole batchSize list is passed in as parameter {_batch})

  • A value of true is equivalent to batchMode: "BATCH"

  • A value of false is equivalent to batchMode: "SINGLE"

Usage Examples

Let’s go through some examples.

If you were to add an :Actor label to several million :Person nodes, you could run the following code:

CALL apoc.periodic.iterate(
  "MATCH (p:Person) WHERE (p)-[:ACTED_IN]->() RETURN p",
  "SET p:Actor",
  {batchSize:10000, parallel:true})

Let’s break down the parameters passed to the procedure:

  • Our first Cypher statement selects all the Person nodes with an ACTED_IN relationship to another node and returns those persons. This is the data-driven portion where we select the data that we want to change.

  • Our second Cypher statement sets the :Actor label on each of the Person nodes selected. This is the operation portion where we apply the change to the data from our first statement.

  • And finally, we specify any configuration we want the procedure to use. We have defined a batchSize of 10,000 and to run the statements in parallel.

Executing this procedure would take all of our Person nodes gathered in the first Cypher statement and update each of them with the second Cypher statement. It divides the work into batches - taking 10,000 Person nodes from the stream and updating them in a single transaction. If we have 30,000 Person nodes in our graph with an ACTED_IN relationship, then it would break this down into 3 batches.

Finally, it runs those in parallel, as updating node labels or properties do not conflict.

For more complex operations like updating or removing relationships, either do not use parallel: true OR make sure that you batch the work in a way that each subgraph of data is updated in one operation, such as by transferring the root objects. If you attempt complex operations, also enable retrying failed operations, e.g. with retries:3.

Now let us look at a more complex example.

CALL apoc.periodic.iterate(
  "MATCH (o:Order) WHERE o.date > '2016-10-13' RETURN o.id as orderId",
  "MATCH (o:Order)-[:HAS_ITEM]->(i) WHERE o.id = orderId WITH o, sum(i.value) as value SET o.value = value",
  {batchSize:100, parallel:true})

Let’s break down the parameters passed to the procedure:

  • Our first Cypher statement selects all the Order nodes that have an order date greater than October 13, 2016 (first Cypher statement).

  • Our second Cypher statement takes those groups and finds the nodes that have a HAS_ITEM relationship to other nodes, then sums up the value of those items and sets that sum as a property (o.value) for the total order value.

  • Our configuration will batch those nodes into groups of 100 (batchSize:100) and run the batches in parallel for the second statement to process.

Batch mode: BATCH_SINGLE

If our operation statement calls a procedure that takes in a batch of values, we can use batchMode: "BATCH_SINGLE" to get access to a batch of values to pass to that procedure. When we use BATCH_SINGLE, the operation statement will have access to the $_batch parameter, which will contain a list of the fields returned in the data-driven statement.

For example, if the data driven statement is:

RETURN 'mark' AS a, 'michael' AS b
UNION
RETURN 'jennifer' AS a, 'andrea' AS b

The contents of the $_batch variable passed to the operation statement would be:

[
  {a: "mark", b: "michael"},
  {a: "jennifer", b: "andrea"}
]

Let’s see an example of this in action. We’ll start by creating some nodes:

The following query creates 100,000 nodes with the label Person and property id
UNWIND range(1,100000) as id create (:Person {id: id})

We can delete these nodes using the apoc.nodes.delete procedure. See Deleting data.

This procedure takes in a list of nodes, which we can extract from the $_batch parameter.

The following query streams all the Person nodes and deletes them in batches of 100. Note that using a node instead of a node id for the first parameter, such as MATCH (p:Person) RETURN p, will result in the parent transaction tracking all deleted nodes, which leads to overall higher memory usage. If you are using Neo4j 5.6 or later consider using the elementId function to pass node information between transactions.

CALL apoc.periodic.iterate(
  "MATCH (p:Person) RETURN id(p) as personId",
  // Extract `p` variable using list comprehension
  "CALL apoc.nodes.delete([item in $_batch | item.personId], size($_batch))",
  {batchMode: "BATCH_SINGLE", batchSize: 100}
)
YIELD batch, operations;

The contents of the $_batch parameter that is used in the operation statement would be as follows:

[
  {p: Node<1>},
  {p: Node<2>},
  ...
]

We can use a list comprehension to extract the p variable from each item in the list.

If we run this query, we’ll see the following output:

Results
batch operations

{total: 1000, committed: 1000, failed: 0, errors: {}}

{total: 100000, committed: 100000, failed: 0, errors: {}}