Apache Arrow projection
Projecting graphs via Apache Arrow allows importing graph data that is stored outside of Neo4j. Apache Arrow is a language-agnostic, in-memory, columnar data format specification. With Arrow Flight, it also provides a protocol for serialization and generic data transport.
GDS exposes an Arrow Flight Server which accepts graph data from an Arrow Flight Client. The data that is being sent is represented using the Arrow columnar format. Projecting graphs via Arrow Flight follows a specific client-server protocol. In this chapter, we explain that protocol, the message formats, and the schema constraints.
In this chapter, we assume that a Flight server has been set up and configured. To learn more about the installation, please refer to the installation chapter.
Graph projection features are versioned to allow for future changes. Please refer to the corresponding section in the Configure Apache Arrow server documentation for more details on versioned commands.
Client-Server protocol
The protocol describes the projection of a single in-memory graph into GDS. Each projection is represented as an import process on the server side. The protocol divides the import process into three phases.
- Initialize the import process
  To initialize the import process, the client needs to execute a Flight action on the server. The action type is called CREATE_GRAPH and the action body configures the import process. The server receives the action, creates the import process and acknowledges success. See Initializing the import process for more details.
- Send node records via an Arrow Flight stream
  In the second phase, the client sends record batches of nodes via PUT as a Flight stream. Once all record batches are sent, the client needs to indicate that all nodes have been sent. This is done by sending another Flight action with type NODE_LOAD_DONE. See Sending node records via PUT as a Flight stream for more details.
- Send relationship records via an Arrow Flight stream
  In the third and last phase, the client sends record batches of relationships via PUT as a Flight stream. Once all record batches are sent, the client needs to indicate that the import process is complete. This is done by sending another Flight action with type RELATIONSHIP_LOAD_DONE. The server finalizes the construction of the in-memory graph and stores the graph in the graph catalog. See Sending relationship records via PUT as a Flight stream for more details.
Initializing the import process
An import process is initialized by sending a Flight action using the action type v1/CREATE_GRAPH.
The action type contains the server version, which is currently version 1 (v1).
The action body is a JSON document containing metadata for the import process:
{ name: "my_graph", (1) database_name: "neo4j", (2) concurrency: 4, (3) undirected_relationship_types: [] (4) inverse_indexed_relationship_types: [] (5) skip_dangling_relationships: false (6) }
1. Used to identify the import process. It is also the name of the resulting in-memory graph in the graph catalog.
2. The name of the database on which the projected graph will be available.
3. (optional) The level of concurrency that will be set on the in-memory graph after all data has been received.
4. (optional) A list of relationship types that must be imported as undirected. A wildcard (*) can be used to include all the types.
5. (optional) A list of relationship types that must be indexed in inverse direction. A wildcard (*) can be used to include all the types.
6. (optional) If set to true, dangling relationships will be skipped during the import process. Otherwise, the import process will fail if dangling relationships are detected.
Relationships declared as undirected should only be provided once, i.e. in a single direction.
The server acknowledges creating the import process by sending a result JSON document which contains the name of the import process. If an error occurs, e.g., if the graph already exists or if the server is not started, the client is informed accordingly.
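For illustration, a minimal client-side sketch in Python using pyarrow.flight could look as follows. The host, port and omitted authentication setup are assumptions and must be adapted to your Arrow server configuration (see the installation chapter).

import json
from pyarrow import flight

# Connect to the GDS Arrow Flight server.
# Host and port are assumptions; use the address configured for your server.
client = flight.FlightClient("grpc://localhost:8491")

# Action body as described above.
create_graph = {
    "name": "my_graph",
    "database_name": "neo4j",
    "concurrency": 4,
}

# Execute the CREATE_GRAPH action; the server answers with a single result
# containing the name of the new import process.
action = flight.Action("v1/CREATE_GRAPH", json.dumps(create_graph).encode("utf-8"))
for result in client.do_action(action):
    print(json.loads(result.body.to_pybytes()))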
Sending node records via PUT as a Flight stream
Nodes need to be turned into Arrow record batches and sent to the server via a Flight stream. Each stream needs to target an import process on the server. That information is encoded in the Flight descriptor body as a JSON document:
{ name: "PUT_COMMAND", version: "v1", body: { name: "my_graph", entity_type: "node", } }
The server expects the node records to adhere to a specific schema.
Given an example node such as (:Pokemon { weight: 8.5, height: 0.6, hp: 39 }), its record must be represented as follows:
nodeId | labels | weight | height | hp
---|---|---|---|---
0 | "Pokemon" | 8.5 | 0.6 | 39
The following table describes the node columns with reserved names.
Name | Type | Optional | Nullable | Description
---|---|---|---|---
nodeId | Integer | No | No | Unique 64-bit node identifiers for the in-memory graph. Must be positive values.
labels | String or Integer or List of String | Yes | No | Node labels, either a single string node label, a single dictionary encoded node label or a list of node label strings.
Any additional column is interpreted as a node property.
The supported data types are equivalent to the GDS node property types, i.e., long, double, long[], double[] and float[].
For floating point values, null will be converted to NaN.
To increase the throughput, multiple Flight streams can be sent in parallel. The server manages multiple incoming streams for the same import process. In addition to the number of parallel streams, the size of a single record batch can also affect the overall throughput. The client has to make sure that node ids are unique across all streams.
Sending duplicate node ids will result in undefined behaviour.
Once all node record batches are sent to the server, the client needs to indicate that node loading is done.
This is achieved by sending another Flight action with the action type v1/NODE_LOAD_DONE and the following JSON document as action body:
{ name: "my_graph" }
The server acknowledges the action by returning a JSON document including the name of the import process and the number of nodes that have been imported:
{ name: "my_graph", node_count: 42 }
Importing nodes with common labels
If all nodes of a single call to the PUT endpoint share the same labels, the labels can be specified as part of the import process metadata via the common_labels property:
{ name: "PUT_COMMAND", version: "v1", body: { name: "my_graph", entity_type: "node", common_labels: ["Pokemon"] } }
If the common labels are the only labels present in a stream, the labels column can be omitted from the node record batches.
If there are nodes with more labels than the common ones, it is still possible to include a labels column which contains the additional labels.
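For example, a pyarrow stream that only carries Pokemon nodes could then drop the labels column entirely (client setup as in the earlier sketches; the address is an assumption):

import json
import pyarrow as pa
from pyarrow import flight

client = flight.FlightClient("grpc://localhost:8491")

descriptor = flight.FlightDescriptor.for_command(json.dumps({
    "name": "PUT_COMMAND",
    "version": "v1",
    "body": {"name": "my_graph", "entity_type": "node", "common_labels": ["Pokemon"]},
}).encode("utf-8"))

# No labels column: every node in this stream receives the Pokemon label.
nodes = pa.table({
    "nodeId": pa.array([0, 1], type=pa.int64()),
    "hp": pa.array([39, 44], type=pa.int64()),
})
writer, _ = client.do_put(descriptor, nodes.schema)
writer.write_table(nodes)
writer.close()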
Sending relationship records via PUT as a Flight stream
Similar to nodes, relationships need to be turned into record batches in order to send them to the server via a Flight stream. The Flight descriptor is a JSON document containing the name of the import process as well as the entity type:
{ name: "PUT_COMMAND", version: "v1", body: { name: "my_graph", entity_type: "relationship", } }
As for nodes, the server expects a specific schema for relationship records.
For example, given the relationship (a)-[:EVOLVES_TO { at_level: 16 }]->(b) and assuming node id 0 for a and node id 1 for b, the record must be represented as follows:
sourceNodeId | targetNodeId | type | at_level
---|---|---|---
0 | 1 | "EVOLVES_TO" | 16
The following table describes the relationship columns with reserved names.
Name | Type | Optional | Nullable | Description
---|---|---|---|---
sourceNodeId | Integer | No | No | Unique 64-bit source node identifiers. Must be positive values and present in the imported nodes.
targetNodeId | Integer | No | No | Unique 64-bit target node identifiers. Must be positive values and present in the imported nodes.
type | String or Integer | Yes | No | Single relationship type. Either a string literal or a dictionary encoded number.
Any additional column is interpreted as a relationship property.
GDS only supports relationship properties of type double.
Similar to sending nodes, the overall throughput depends on the number of parallel Flight streams and the record batch size.
Once all relationship record batches are sent to the server, the client needs to indicate that the import process is done.
This is achieved by sending a final Flight action with the action type v1/RELATIONSHIP_LOAD_DONE and the following JSON document as action body:
{ name: "my_graph" }
The server finalizes the graph projection and stores the in-memory graph in the graph catalog. Once completed, the server acknowledges the action by returning a JSON document including the name of the import process and the number of relationships that have been imported:
{ name: "my_graph", relationship_count: 1337 }
Aborting an import process
A started import process can be aborted by sending a Flight action using the action type v1/ABORT.
This will immediately cancel the running graph or database import process and remove all temporary data.
The action body is a JSON document containing the name of the graph or database that is being imported:
{ name: "my_graph", }
Arrow import processes will be aborted automatically if no data or instructions were received within a configurable timeout. The timeout can be configured in the Apache Arrow server configuration; see Configure Apache Arrow server for more details.
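A minimal abort sketch with pyarrow (client setup as before; the address is an assumption):

import json
from pyarrow import flight

client = flight.FlightClient("grpc://localhost:8491")

# Cancel the running import process and discard its temporary data.
abort = flight.Action("v1/ABORT", json.dumps({"name": "my_graph"}).encode("utf-8"))
list(client.do_action(abort))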
Appending new data to an existing graph
Once a graph has been created, it is possible to append additional data to the existing graph. This avoids dropping the graph and re-importing all data. The process is similar to the initial import process but with a few differences.
Appending new node properties
Once a graph has been created, it is possible to append additional node properties to the existing graph.
To do so, the client needs to send the node records with the same node ids as the existing nodes.
The server will update the existing nodes with the new properties, potentially limited to a set of node labels.
A node record is identified by the nodeId column, which must be unique across all node records and must match the node ids of the existing nodes.
It is not required to send node property values for all nodes in the existing graph. If a node record is not represented in the new data, that record will receive the default value for the property type.
The process will fail if the property already exists or if one of the provided node ids does not match an existing node.
The import process is initialized by sending a Flight action using the action type v1/PUT_NODE_PROPERTIES with the following action body:
{ name: "my_graph", (1) database_name: "neo4j", (2) concurrency: 4, (3) node_labels: ["*"], (4) consecutive_ids: false (5) }
1. The name of the existing in-memory graph. It is also used to identify the import process.
2. The name of the database on which the projected graph is available.
3. (optional) The level of concurrency that will be used to build the property data structures on the server.
4. (optional) A set of node labels that will be updated with the new properties. A wildcard (*) can be used to include all the labels.
5. (optional) If data is associated with a graph that has been exported using consecutive node ids, set this to true.
Node properties need to be turned into Arrow record batches and sent to the server via a Flight stream. Each stream needs to target an import process on the server. That information is encoded in the Flight descriptor body as a JSON document:
{ name: "PUT_COMMAND", version: "v1", body: { name: "my_graph", entity_type: "node_properties", } }
As with sending nodes and relationships, the overall throughput depends on the number of parallel Flight streams and the record batch size.
Similar to the node import, the server expects the node records to adhere to a specific schema.
Given an example node such as ({ yob: 1984, magic: 1.8, rank: 42 }), its record must be represented as follows:
nodeId | yob | magic | rank
---|---|---|---
0 | 1984 | 1.8 | 42
The following table describes the node columns with reserved names.
Name | Type | Optional | Nullable | Description
---|---|---|---|---
nodeId | Integer | No | No | Unique 64-bit node identifiers for the in-memory graph. Must be positive values.
Any additional column except nodeId is interpreted as a new node property to append.
The supported data types are equivalent to the GDS node property types, i.e., long, double, long[], double[] and float[].
Once all node record batches are sent to the server, the client needs to indicate that node appending is done.
This is achieved by sending another Flight action with the action type v1/PUT_NODE_PROPERTIES_DONE and the following JSON document as action body:
{ name: "my_graph" }
The server acknowledges the action by returning a JSON document including the name of the import process and the number of nodes that have been updated:
{ name: "my_graph", node_count: 1 }
Creating a Neo4j database
This feature is not available in AuraDS.
The Client-Server protocol can also be used to create a new Neo4j database instead of an in-memory graph.
To initialize a database import process, we need to change the initial action type to v1/CREATE_DATABASE.
The action body is a JSON document containing the configuration for the import process:
{ name: "my_database", concurrency: 4 }
The following table contains all settings for the database import.
Name | Type | Optional | Default value | Description
---|---|---|---|---
name | String | No | None | The name of the import process and the resulting database.
id_type | String | Yes | INTEGER | Sets the node id type used in the input data. Can be either INTEGER or STRING.
concurrency | Integer | Yes | Available cores | Number of threads to use for the database creation process.
id_property | String | Yes | | The node property key which stores the node id of the input data.
… | String | Yes | | DEPRECATED: Use …
… | String | Yes | | Database format. Valid values are blank (no value, default), …
… | Boolean | Yes | False | Force deletes any existing database files prior to the import.
… | Boolean | Yes | False | Ignore environment-based heuristics, and specify whether the target storage subsystem can support parallel IO with high throughput.
… | Boolean | Yes | False | Collects bad node and relationship records during import and writes them into the log.
After sending the action to initialize the import process, the subsequent protocol is the same as for creating an in-memory graph. See Sending node records via PUT as a Flight stream and Sending relationship records via PUT as a Flight stream for further details.
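A minimal initialization sketch in Python; the node and relationship uploads that follow are identical to the sketches shown earlier (client address is an assumption):

import json
from pyarrow import flight

client = flight.FlightClient("grpc://localhost:8491")

# Create a database import process; node and relationship streams then
# follow the same PUT protocol as for in-memory graphs.
create_db = flight.Action("v1/CREATE_DATABASE", json.dumps({
    "name": "my_database",
    "concurrency": 4,
}).encode("utf-8"))
for result in client.do_action(create_db):
    print(json.loads(result.body.to_pybytes()))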
Supported node identifier types
For the CREATE_DATABASE action, one can set the id_type configuration parameter.
The two possible options are INTEGER and STRING, with INTEGER being the default.
If set to INTEGER, the node id columns for both node records (nodeId) and relationship records (sourceNodeId and targetNodeId) are expected to be represented as BigIntVector.
For the STRING id type, the server expects the identifiers to be represented as VarCharVector.
In both cases, the original id is stored as a property on the imported nodes.
The property key can be changed via the id_property config option.
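In pyarrow terms, the difference is simply the Arrow type of the id columns: int64 columns arrive as BigIntVector, string columns as VarCharVector. A small illustration with hypothetical values:

import pyarrow as pa

# INTEGER id type: 64-bit integer ids (BigIntVector on the server side).
int_nodes = pa.table({"nodeId": pa.array([0, 1], type=pa.int64())})

# STRING id type: string ids (VarCharVector on the server side); the original
# ids end up as a node property in the created database.
str_nodes = pa.table({"nodeId": pa.array(["pikachu", "raichu"], type=pa.string())})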