Run non-blocking asynchronous queries
The examples in Query the database use the async/await syntax, which forces the driver to work synchronously.
When using await with a query, your application waits for the server to retrieve all query results and transmit them to the driver.
This is not a problem for most use cases, but for queries that have a long processing time or a large result set, asynchronous handling may speed up your application.
There are several ways of running asynchronous queries:
- Asynchronous iteration: the query result is processed iteratively, as quickly as your application can handle it. The driver adjusts the number of records transmitted by the server accordingly.
- Promise API: the query result is returned as a Promise, which resolves only once the full result set is available to the driver. Best suited for queries that take a long time to process on the server but whose result you want to handle in one go. Your application receives the result in bulk, for eager consumption.
- Streaming API: the query result is returned as a stream, so that each record is processed as soon as it is available. Best suited for queries whose records can be processed one at a time. Your application receives the result piece by piece, for lazy consumption.
- Reactive API: for reactive applications.
When using await tx.run() in a transaction function, you may return the query result as-is from the transaction function for further processing.
With asynchronous queries, on the other hand, you have to process the result inside the transaction function (the Promise API is the exception).
Asynchronous iteration
The Result object supports asynchronous iteration.
This allows your application to process data at its own pace, with the driver accordingly modulating the speed at which records are streamed from the server, applying backpressure.
With async iterators, you get the guarantee that your application does not receive data faster than it can process.
const session = driver.session()
try {
const peopleNames = await session.executeWrite(async tx => {
const result = tx.run( (1)
'MERGE (p:Person {name: $name}) RETURN p.name AS name',
{ name: 'Alice' }
)
let names = []
for await (const record of result) { (2)
console.log(`Processing ${record.get('name')}`)
names.push(record.get('name'))
}
return names (3)
})
} finally {
await session.close()
}
(1) Run a query
(2) Process records with async iteration
(3) Return processed results (and not the raw query result)
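The backpressure described above comes from the pull-based nature of async iteration: the next item is produced only when the loop asks for it. The sketch below illustrates that mechanism without a database. It is a hypothetical stand-in, not the driver's implementation: fakeRecords is an invented async generator playing the role of the Result, and the real driver requests records from the server in batches rather than one at a time, so the correspondence is approximate.

```javascript
// Hypothetical stand-in for a driver Result: an async generator that only
// produces the next record when the consumer asks for it (pull-based).
async function* fakeRecords (count, log) {
  for (let i = 1; i <= count; i++) {
    log.push(`produce ${i}`) // runs only when the consumer requests a record
    yield { name: `Person ${i}` }
  }
}

// Consumer that takes 10 ms per record; the generator stays paused meanwhile.
async function consumeSlowly (count) {
  const log = []
  for await (const record of fakeRecords(count, log)) {
    await new Promise(resolve => setTimeout(resolve, 10))
    log.push(`consume ${record.name}`)
  }
  return log
}

consumeSlowly(3).then(events => console.log(events))
```

The log strictly alternates produce and consume entries: the producer never runs ahead of the slow consumer.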
There are two important points about using the async iterator:
- You can async-iterate only once per query result. Once the result cursor reaches the end of the stream, it does not rewind, so you cannot iterate over a result more than once. If you need to process the data more than once in your application, you have to store it manually in an auxiliary data structure (like a list, as above).
- The processing of the result happens inside the transaction function. You should not return the raw result out of the transaction function and then iterate over it. That workflow only works with the Promise API.
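The single-pass rule can be seen with any async iterator that does not rewind. In this sketch, fakeResult is a hypothetical async generator standing in for a query result; it is not the driver's Result class, and the driver's own behavior on a second iteration may differ in detail. Here the second loop simply sees no items, while the auxiliary list filled on the first pass can be read as often as needed.

```javascript
// Hypothetical stand-in for a query result cursor (not the driver's Result).
async function* fakeResult () {
  yield 'Alice'
  yield 'Bob'
}

async function iterateTwice () {
  const result = fakeResult()

  const names = [] // auxiliary list that can be read as often as needed
  for await (const name of result) names.push(name)

  const secondPass = []
  for await (const name of result) secondPass.push(name) // exhausted: loop body never runs

  return { names, secondPass }
}

iterateTwice().then(({ names, secondPass }) => {
  console.log(names)      // [ 'Alice', 'Bob' ]
  console.log(secondPass) // []
})
```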
Promise API
The Promise API allows you to run a query and receive the result as a Promise.
You can think of it as specifying a Cypher query together with callbacks that run asynchronously depending on the query outcome.
const session = driver.session({database: 'neo4j'})
const result = session.executeWrite(async tx => { (1)
return tx.run(
'MERGE (p:Person {name: $name}) RETURN p.name AS name',
{ name: 'Alice' }
)
})
result.then(queryResult => { (2)
queryResult.records.forEach(record => {
console.log(record.get('name'))
})
return queryResult
})
.catch(error => { (3)
console.log(error)
})
.then(() => session.close()) (4)
(1) Run a query
(2) Specify a callback for successful runs, taking the query result as input
(3) Specify a callback for failed runs, taking the driver error as input
(4) Specify a callback to run regardless of the query outcome
The Promise API holds for
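The last callback in the chain above runs whatever the outcome because .catch() turns a rejection into a fulfilled value, so the following .then() always executes. The sketch below checks that behavior with a hypothetical session stub (makeFakeSession and runWithCleanup are invented for illustration and do not talk to Neo4j); the stub only mimics the executeWrite/close shape used in the example.

```javascript
// Hypothetical session stub: the query fails or succeeds on demand,
// and close() records that it was called.
function makeFakeSession (shouldFail) {
  const tx = {
    run: () => shouldFail
      ? Promise.reject(new Error('query failed'))
      : Promise.resolve({ records: ['Alice'] })
  }
  return {
    closed: false,
    executeWrite (work) { return Promise.resolve().then(() => work(tx)) },
    close () { this.closed = true; return Promise.resolve() }
  }
}

function runWithCleanup (session) {
  return session
    .executeWrite(tx => tx.run('MERGE (p:Person {name: $name})'))
    .then(result => result.records.length) // success: count the records
    .catch(error => {                       // failure: log and fall back to 0
      console.log(`Query failed: ${error.message}`)
      return 0
    })
    .then(count => session.close().then(() => count)) // runs in both cases
}

runWithCleanup(makeFakeSession(true)).then(count => console.log(count))
```

Promise.prototype.finally() is an alternative for the cleanup step; unlike the .catch().then() pattern, it lets the original error propagate to the caller.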
Combine multiple transactions
To run multiple queries within the same transaction, use Promise.all().
It runs asynchronous operations concurrently, so you can submit multiple queries at the same time and wait for them all to finish.
const companyName = 'Neo4j'
const session = driver.session({database: 'neo4j'})
try {
const names = await session.executeRead(async tx => {
const result = await tx.run('MATCH (p:Person) RETURN p.name AS name')
return result.records.map(record => record.get('name'))
})
const relationshipsCreated = await session.executeWrite(tx =>
Promise.all( // group together all Promises
names.map(name =>
tx.run(`
MATCH (emp:Person {name: $personName})
MERGE (com:Company {name: $companyName})
MERGE (emp)-[:WORKS_FOR]->(com)
`, { personName: name, companyName: companyName }
)
.then(result => result.summary.counters.updates().relationshipsCreated)
)
).then(values => values.reduce((a, b) => a + b, 0)) // aggregate results (0 if there are no names)
)
console.log(`Created ${relationshipsCreated} employee relationships.`)
} finally {
await session.close()
}
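The Promise.all() pattern can be isolated from the database to see its two key properties: all promises are created before any is awaited, and the combined result keeps input order so it can be aggregated. In this sketch, fakeCreateRelationship is a hypothetical stand-in for one tx.run(...).then(...) call; it only shows the Promise mechanics, not how the server schedules queries inside a transaction. Passing 0 as the initial value keeps reduce() from throwing when the list of names is empty.

```javascript
// Hypothetical stand-in for one tx.run(...) call: resolves after a short
// delay with the number of relationships it "created".
function fakeCreateRelationship (_name) {
  return new Promise(resolve => setTimeout(() => resolve(1), 5))
}

async function linkAll (names) {
  // Every promise is created before any is awaited, so the calls are
  // submitted together; Promise.all waits for all of them (in input order).
  const counts = await Promise.all(names.map(name => fakeCreateRelationship(name)))
  // The initial value 0 keeps reduce() from throwing on an empty array.
  return counts.reduce((a, b) => a + b, 0)
}

linkAll(['Alice', 'Bob']).then(total => console.log(total)) // 2
linkAll([]).then(total => console.log(total))               // 0
```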
Streaming API
The Streaming API allows you to run a query and receive results individually, as soon as the server has them ready. You can specify a callback to process each record. This API is particularly suited to cases in which the server may take different amounts of time to retrieve different records, but you want to process each of them as soon as it is available. The behavior is similar to the async iterator; only the programming style differs.
const session = driver.session({database: 'neo4j'})
let peopleNames = []
session
.run('MERGE (p:Person {name: $name}) RETURN p.name AS name', { (1)
name: 'Alice'
})
.subscribe({ (2)
onKeys: keys => { (3)
console.log('Result columns are:')
console.log(keys)
},
onNext: record => { (4)
console.log(`Processing ${record.get('name')}`)
peopleNames.push(record.get('name'))
},
onCompleted: () => { (5)
session.close() // returns a Promise
},
onError: error => { (6)
console.log(error)
session.close() // also release the session on failure
}
})
(1) Run a query
(2) Attach a handler to the result stream
(3) The onKeys callback receives the list of result columns
(4) The onNext callback is invoked every time a record is received
(5) The onCompleted callback is invoked once the query has completed and all records have been received
(6) The onError callback is triggered in case of error
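The callback contract above can be wrapped in a Promise when you want records streamed at the source but a single value at the end. The sketch is illustrative: collectRecords and fakeStreamingResult are hypothetical helpers, and the fake only mimics the subscribe({ onKeys, onNext, onCompleted, onError }) shape used in the example, without connecting to a database. Passing the object returned by session.run() should work the same way, but treat that as an assumption to verify against your driver version.

```javascript
// Collects every record from a subscribe-style result into an array.
function collectRecords (result) {
  return new Promise((resolve, reject) => {
    const records = []
    result.subscribe({
      onKeys: () => {},                       // column names are not needed here
      onNext: record => records.push(record), // called once per record
      onCompleted: () => resolve(records),    // all records received
      onError: error => reject(error)         // propagate the failure
    })
  })
}

// Hypothetical fake result that replays fixed rows asynchronously,
// optionally ending with an error instead of completing.
function fakeStreamingResult (rows, failWith) {
  return {
    subscribe (observer) {
      setTimeout(() => {
        observer.onKeys(['name'])
        rows.forEach(row => observer.onNext(row))
        if (failWith) observer.onError(failWith)
        else observer.onCompleted()
      }, 0)
    }
  }
}

collectRecords(fakeStreamingResult(['Alice', 'Bob'])).then(rows => console.log(rows))
```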
Reactive API
As is typical of reactive programming, in a reactive flow consumers control the rate at which they consume records from queries, and the driver in turn manages the rate at which records are requested from the server. The reactive API is recommended for applications that already follow the reactive style.
const rxjs = require('rxjs');
const rxSession = driver.rxSession() (1)
const rxResult = rxSession.executeWrite(tx => { // returns an Observable, so there is nothing to await yet
return tx
.run('MERGE (p:Person {name: $name}) RETURN p.name AS name', { (2)
name: 'Alice'
})
.records() (3)
.pipe( (4)
rxjs.map(record => record.get('name')),
//rxjs.materialize(), // optional, turns outputs into Notifications
rxjs.toArray()
)
})
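const people = await rxjs.lastValueFrom(rxResult) // RxJS 7+: replaces the deprecated toPromise()
console.log(people)
await rxjs.lastValueFrom(rxSession.close(), { defaultValue: null }) // defaultValue avoids an EmptyError if close() emits nothing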
(1) Obtain a reactive session
(2) Run a query
(3) Obtain an observable for result records
(4) Reactive processing
The reactive API is not available in the lite version of the driver.
Glossary
- LTS: A Long Term Support release is one guaranteed to be supported for a number of years. Neo4j 4.4 is LTS, and Neo4j 5 will also have an LTS version.
- Aura: Aura is Neo4j’s fully managed cloud service. It comes with both free and paid plans.
- Cypher: Cypher is Neo4j’s graph query language that lets you retrieve data from the database. It is like SQL, but for graphs.
- APOC: Awesome Procedures On Cypher (APOC) is a library of (many) procedures and functions providing functionality that cannot be easily expressed in Cypher itself.
- Bolt: Bolt is the protocol used for interaction between Neo4j instances and drivers. By default, Neo4j listens for Bolt connections on port 7687.
- ACID: Atomicity, Consistency, Isolation, Durability (ACID) are properties guaranteeing that database transactions are processed reliably. An ACID-compliant DBMS ensures that the data in the database remains accurate and consistent despite failures.
- eventual consistency: A database is eventually consistent if it provides the guarantee that all cluster members will, at some point in time, store the latest version of the data.
- causal consistency: A database is causally consistent if read and write queries are seen by every member of the cluster in the same order. This is stronger than eventual consistency.
- NULL: The null marker is not a type but a placeholder for the absence of a value. For more information, see Cypher → Working with null.
- transaction: A transaction is a unit of work that is either committed in its entirety or rolled back on failure. An example is a bank transfer: it involves multiple steps, but they must all succeed or be reverted, to avoid money being subtracted from one account but not added to the other.
- backpressure: Backpressure is a force opposing the flow of data. It ensures that the client is not overwhelmed by data arriving faster than it can handle.
- transaction function: A transaction function is a callback executed by an executeRead or executeWrite call. The driver automatically re-executes the callback in case of server failure.
- Driver: A Driver object holds the details required to establish connections with a Neo4j database.