Using Subqueries to Control the Scope of Aggregations
Aggregations, such as collect() and count(), show up as EagerAggregation
operators (with dark blue headers) in query plans.
These are similar to the Eager operator in that it presents a barrier that all rows must execute to and stop at for the aggregation to process, but it otherwise doesn’t change the streaming behavior of a query (provided there are no actual Eager operators in the plan).
This is less memory-intensive than the Eager operator, but the aggregation results (including the variables used for the grouping keys) must be kept in memory until the aggregation finishes processing all inputs, and only then can streaming resume from the resulting rows.
As such, the rows that build up resulting from the aggregation may stress the heap, if numerous enough, which could result in high GC pauses, or at the worst going out of heap memory.
This article shows hot to use subqueries, introduced in Neo4j 4.1, to provide a way to narrow the scope of an aggregation. This may be easier on your heap leading to a more memory efficient query.
That said, correctness matters first; it is possible that an eager aggregation over all input rows is necessary to calculate correct results. However, when you want the eager to apply only to a certain expansion (or segment of input), then you can control that via subqueries.
Even before 4.1, you can accomplish something similar (though on a smaller scale) with pattern comprehensions, which has the effect of scoping a collect() to the pattern expanded in the comprehension.
Some APOC procedures can also be used as a subquery replacement.
An example of aggregation behavior
Let’s use something simple, the Movies graph from the :play movies
guide in the Neo4j Browser.
Once we’ve created this graph from the guide, let’s look at a simple query, returning each movie node, and its list of actor nodes:
MATCH (movie:Movie)<-[:ACTED_IN]-(p:Person)
RETURN movie, collect(p) as actors
How will this execute?
The first path match will be found (a label scan from one of the two two nodes, then an expand of the pattern, and a filter based on the node label), and then the aggregation processing begins.
The next path match will be found, and the aggregation will process that row, and so on, with the aggregation continuing to take in rows of data of build up the aggregation results accordingly.
Eventually all 172 paths have been processed by the aggregation. The aggregation has built up 38 result rows (one per movie, which is the grouping key), each consisting of the movie node and the list of actors for the movie. Now that all input rows have been processed, the aggregation is complete, and streaming begins from the 38 resulting rows.
This interim state in the heap does NOT include node or relationship properties, excepting any that have been projected out as variables. |
A slightly larger dataset
This was a small data set. But what if we instead had a graph with a far larger data set? It’s estimated that 500,000 movies have been made up to this point in history. 500,000 rows held in heap at once still won’t be a problem, we have to look at bigger numbers to present a challenge, or at least a notable time difference during execution..
Let’s create a movies graph with 1 million movies, 1 million actors, and 10 actors per movie.
We’ll use apoc.periodic.iterate()
from APOC Procedures to create our graph.
First let’s create 1 million nodes for both movies and persons:
CALL apoc.periodic.iterate("
UNWIND range(1,1000000) as id
RETURN id
",
"
CREATE (m:Movie {id:id})
CREATE (p:Person {id:id})
", {}) YIELD batches, total, errorMessages
RETURN batches, total, errorMessages
We don’t need additional properties right now. The matching and aggregating that we’re doing don’t depend upon the node properties, and we don’t pay a cost for using them until we project them out anyway.
Now let’s make sure we have indexes in place:
CREATE INDEX ON :Person(id);
CREATE INDEX ON :Movie(id);
Now that we have indexes, we can randomly assign 10 persons to each movie as actors:
CALL apoc.periodic.iterate("
MATCH (m:Movie)
RETURN m
",
"
UNWIND range(0,10) as i
WITH m, toInteger(rand() * 1000000) as id
MATCH (p:Person {id:id})
CREATE (m)<-[:ACTED_IN]-(p)
", {}) YIELD batches, total, errorMessages
RETURN batches, total, errorMessages
For my particular laptop, I’ve configured 4gb of memory for the heap. Actual server deployments would tend to use at least double that, up to 31gb as the recommended maximum.
Let’s see how we do, using a slightly modified version of the original query. I want to factor out the returning of the results (which includes the property access of all of those nodes), so we’ll just end with a count() aggregation, which tends to be much cheaper (after all it’s just incrementing the count per input row, at the end).
MATCH (movie:Movie)<-[:ACTED_IN]-(p:Person)
WITH movie, collect(p) as actors
RETURN count(*)
The grouping key is still the movie, so we know we have to hold up to 1 million rows as the aggregation builds up, along with the lists of actors per movie.
Depending on heap memory (as well as what other queries were executing at the same time), that could stress the heap, leading to high GC pauses as memory is used up and unable to be reclaimed. We could blow through all heap memory completely.
So let’s give this a try. First up, here’s the EXPLAIN plan:
We can see the aggregations for the collect (more expensive) and the count (cheap). Let’s try running it:
Started streaming 1 records after 1 ms and completed after 14907 ms.
It returns a count of 1 million (omitted because that’s not interesting), but more interesting is the execution time, or rather it will be interesting once we have an alternate query to compare it to. For me this took about 15 seconds.
The most interesting part of this query is actually in the debug log:
2020-10-01 04:06:31.703+0000 WARN [o.n.k.i.c.VmPauseMonitorComponent] Detected VM stop-the-world pause: {pauseTime=178, gcTime=248, gcCount=1}
2020-10-01 04:06:32.893+0000 WARN [o.n.k.i.c.VmPauseMonitorComponent] Detected VM stop-the-world pause: {pauseTime=254, gcTime=269, gcCount=1}
2020-10-01 04:06:34.620+0000 WARN [o.n.k.i.c.VmPauseMonitorComponent] Detected VM stop-the-world pause: {pauseTime=277, gcTime=295, gcCount=1}
2020-10-01 04:06:36.506+0000 WARN [o.n.k.i.c.VmPauseMonitorComponent] Detected VM stop-the-world pause: {pauseTime=328, gcTime=383, gcCount=1}
2020-10-01 04:06:38.847+0000 WARN [o.n.k.i.c.VmPauseMonitorComponent] Detected VM stop-the-world pause: {pauseTime=542, gcTime=628, gcCount=1}
2020-10-01 04:06:40.937+0000 WARN [o.n.k.i.c.VmPauseMonitorComponent] Detected VM stop-the-world pause: {pauseTime=346, gcTime=384, gcCount=1}
2020-10-01 04:06:42.994+0000 WARN [o.n.k.i.c.VmPauseMonitorComponent] Detected VM stop-the-world pause: {pauseTime=314, gcTime=348, gcCount=1}
2020-10-01 04:06:44.965+0000 WARN [o.n.k.i.c.VmPauseMonitorComponent] Detected VM stop-the-world pause: {pauseTime=241, gcTime=271, gcCount=1}
2020-10-01 04:07:04.570+0000 WARN [o.n.k.i.c.VmPauseMonitorComponent] Detected VM stop-the-world pause: {pauseTime=242, gcTime=256, gcCount=1}
2020-10-01 04:08:42.469+0000 WARN [o.n.k.i.c.VmPauseMonitorComponent] Detected VM stop-the-world pause: {pauseTime=169, gcTime=198, gcCount=1}
These gcs are not very high individually, but this shows that aggregations like this can and do cause GC pauses. With a more complex query, or a more complex dataset, these pauses might actually become quite significant.
Subqueries narrow the scope of an aggregation
If we use a subquery in the right place, and aggregate within the subquery, we can narrow down the scope of the aggregation, and avoid the need to manifest all of those rows in memory at the same time.
MATCH (movie:Movie)
CALL {
WITH movie
MATCH (movie)<-[:ACTED_IN]-(p:Person)
RETURN collect(p) as actors
}
RETURN count(*)
This one should be more memory-efficient.
Remember that subqueries are executed per row. And due to the MATCH just before the subquery, we have a row per movie.
The MATCH and the aggregation happens within the subquery, so per collect(), it’s only considering the paths for a single movie at a time. That means each collect() is only being applied over 10 input rows (because of 10 actors per movie), so the results for a single row will be available very quickly.
Note that this is a tradeoff: instead of performing a single collect() aggregation applied to 1 million rows, we are using subqueries to break down the work at the movie level. Because we have 1 million movies, we end up making 1 million subquery calls, each doing its own expansion and collect(), so in total collect() gets called 1 million times, but each only needs to run on a tiny set of data.
We can execute the subquery for each input row, perform the aggregation on this limited scope, output the result, and move on to the next row. The memory we used during execution of that row is all eligible for garbage collection, and doesn’t need to be kept in the heap as the subsequent rows are processed.
First let’s check the plan for this query:
Note that we still see the eager aggregation for the collect(), but it’s feeding into an Apply operation, this shows that the scope of the aggregation is only for the item that it is being applied to, which will be each movie node.
Let’s try running it. I’ll omit the actual query result, since we know that will still be 1 million, but let’s check the timing:
Started streaming 1 records after 1 ms and completed after 5542 ms.
Repeated runs vary a bit, but we usually end up between 4 and 6 seconds. That’s a nice improvement over the 15 seconds from the original query.
What about GC pauses in the debug log? Your milage may vary, but even after repeating the query execution several times, I didn’t see any GCs being logged.
This shows that aggregations over large number of rows at once can be memory intensive, and you can often avoid this and the resulting GC pauses via clever application of subqueries to narrow the scope of your aggregations (provided that doing so is correct for your use case).
Pattern comprehension is similar to a collect() called within a subquery
Pattern comprehensions can be used for similar effect, and have been available since Neo4j 3.1.
MATCH (movie:Movie)
WITH movie, [(movie)<-[:ACTED_IN]-(p:Person) | p] as actors
RETURN count(*)
Pattern comprehensions are most like OPTIONAL MATCH followed by collect(), but similar to subqueries, they are executed per row. Even the EXPLAIN plan is similar:
Note that the line of execution with the collect() eager aggregation feeds into a ConditionalApply this time, which is a variant of Apply, meaning the right hand side is executing in a nested loop, which is also the scope for those operations.
How does it perform?
Started streaming 1 records after 1 ms and completed after 4539 ms.
Repeated runs fall between 4 and 6 seconds, so about the same as the version with the subquery. Likewise, we see no GCs in the debug log.
So as far as efficiency goes, both in timing and memory, pattern comprehensions are about the same as using subqueries.
While this is more concise than using subqueries, and can often be more versatile (you can use several pattern comprehensions within a single WITH clause), these are only used for collecting results. Though you could get the size() of the resulting list as an equivalent of a count(), you can’t use this for any other kind of aggregation.
Also, pattern comprehensions do not yet allow sorting, skipping, or limiting of the list results, all of which can be freely used if using subqueries instead.
APOC Procedures can substitute for subqueries
If you’re not on Neo4j 4.1.x or higher, there are some procedures in APOC that act as subqueries for the same effect.
MATCH (movie:Movie)
CALL apoc.cypher.run("
WITH movie
MATCH (movie)<-[:ACTED_IN]-(p:Person)
RETURN collect(p) as actors", {movie:movie}) YIELD value
WITH movie, value.actors as actors
RETURN count(*)
Procedures, like subqueries, execute per row, so the collect() aggregation is similarly scoped only to the rows matched within that particular call.
Since there are 1 million movies, there will be a total of 1 million apoc.cypher.run()
calls, each one doing its own MATCH and small collect().
We’re going to omit the plan for this one, because it won’t show anything interesting. We would see a procedure call operation, but since the meat of the query is in the form of a query string, the planner has no ability to evaluate it, so it won’t show up in the plan.
We could run an EXPLAIN of the copy/pasted query string separately, with a few small modifications so it will compile, but we’ve already seen a plan like this, with the collect() aggregation. The only differences is that this plan will be planned and executed as an entirely separate transaction, whose reults will be yielded to the transaction for this query. Let’s see how it does:
Started streaming 1 records after 1 ms and completed after 136441 ms.
Whoa, what happened here? The time spiked super high, at around 2 minutes. Why did this happen?
This APOC procedure creates and executes the query as a new transaction, as opposed to native subqueries which still execute within the same single transaction. This means that we’re actually executing 1 million separate transactions via APOC with this approach, and that has a cost in terms of setup and execution.
Why would we ever consider this approach, if it can be so costly timewise when run over so many rows? Because we still see no GC pauses in the debug log.
If GCs and out of heap are issues for your query as a result of an aggregation like this, and if you aren’t running a high enough version to use native subqueries, and if the use case doesn’t let you use pattern comprehensions, then this approach with certain APOC procs may let you avoid those GCs and heap pressure, but possibly at a cost of time.
As always, perform your own timing over your data to test.
Is this page helpful?