Create a job specification file
The job specification file tells Dataflow how to run the import (where to source the data from, how to map it into Neo4j, etc.). It consists of a JSON object with four sections.
{
"config": { ... }, (1)
"sources": [ (2)
{ ... }
],
"targets": { ... }, (3)
"actions": [ (4)
{ ... }
]
}
(1) config: Global flags affecting how the import is performed (optional).
(2) sources: Data source definitions (relational).
(3) targets: Data target definitions (graph: nodes, relationships, Cypher queries).
(4) actions: One-off actions (optional).
At a high level, the job fetches data from the sources and transforms/imports it into the targets.
A valid specification file contains at least one source object and one target object.
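The minimum-content rule above, together with two naming rules described in the following sections (names are unique among all names, and every target draws from a declared source), can be checked before a job is submitted. The Python sketch below is a hypothetical pre-flight check; target_entries and check_minimum_spec are illustrative names, not part of the Dataflow template, which performs its own validation.

```python
import json


def target_entries(spec):
    """Yield (kind, target) pairs from targets.nodes/relationships/queries."""
    targets = spec.get("targets", {})
    for kind in ("nodes", "relationships", "queries"):
        for target in targets.get(kind, []):
            yield kind, target


def check_minimum_spec(spec):
    """Return a list of problems; an empty list means these basic checks passed."""
    problems = []
    sources = spec.get("sources", [])
    targets = list(target_entries(spec))
    if not sources:
        problems.append("at least one source is required")
    if not targets:
        problems.append("at least one target is required")

    # Names must be unique across sources, targets and actions.
    names = [s.get("name") for s in sources]
    names += [t.get("name") for _, t in targets]
    names += [a.get("name") for a in spec.get("actions", [])]
    for name in sorted({n for n in names if n is not None and names.count(n) > 1}):
        problems.append(f"duplicate name: {name!r}")

    # Every target must draw from a declared source.
    source_names = {s.get("name") for s in sources}
    for kind, target in targets:
        if target.get("source") not in source_names:
            problems.append(
                f"{kind} target {target.get('name')!r} uses unknown source {target.get('source')!r}"
            )
    return problems


spec = json.loads("""
{
  "sources": [{"type": "bigquery", "name": "persons", "query": "SELECT name FROM movies.persons"}],
  "targets": {"nodes": [{"source": "persons", "name": "Persons", "labels": ["Person"], "properties": []}]}
}
""")
print(check_minimum_spec(spec))  # []
```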
A full example
Below is an example job specification file that works out of the box to import the publicly available movies dataset.
The dataset contains the entities Person and Movie, linked together by DIRECTED and ACTED_IN relationships.
In other words, each Person may have DIRECTED and/or ACTED_IN a Movie.
Entities and relationships also carry extra details as properties (for example, the role of an ACTED_IN relationship).
In this example, the data is read from BigQuery tables built from the following files: persons.csv, movies.csv, acted_in.csv, directed.csv.
The next sections break it down and provide in-context information for each part. We recommend reading this guide side by side with the job specification example.
{
"version": "1",
"config": {
"reset_db": true
},
"sources": [
{
"type": "bigquery",
"name": "persons",
"query": "SELECT person_tmdbId, name, bornIn, born, died FROM team-connectors-dev.movies.persons"
},
{
"type": "bigquery",
"name": "movies",
"query": "SELECT movieId, title, imdbRating, year FROM team-connectors-dev.movies.movies"
},
{
"type": "bigquery",
"name": "directed",
"query": "SELECT movieId, person_tmdbId FROM team-connectors-dev.movies.directed"
},
{
"type": "bigquery",
"name": "acted_in",
"query": "SELECT movieId, person_tmdbId, role FROM team-connectors-dev.movies.acted_in"
}
],
"targets": {
"nodes": [
{
"source": "persons",
"name": "Persons",
"write_mode": "merge",
"labels": [ "Person" ],
"properties": [
{
"source_field": "person_tmdbId",
"target_property": "id",
"target_property_type": "string"
},
{
"source_field": "name",
"target_property": "name",
"target_property_type": "string"
},
{
"source_field": "bornIn",
"target_property": "bornLocation",
"target_property_type": "string"
},
{
"source_field": "born",
"target_property": "bornDate",
"target_property_type": "date"
},
{
"source_field": "died",
"target_property": "diedDate",
"target_property_type": "date"
}
],
"schema": {
"key_constraints": [
{
"name": "personIdKey",
"label": "Person",
"properties": ["id"]
}
],
"unique_constraints": [
{
"name": "personNameUnique",
"label": "Person",
"properties": ["name"]
}
]
}
},
{
"source": "movies",
"name": "Movies",
"write_mode": "merge",
"labels": [ "Movie" ],
"properties": [
{
"source_field": "movieId",
"target_property": "id",
"target_property_type": "string"
},
{
"source_field": "title",
"target_property": "title",
"target_property_type": "string"
},
{
"source_field": "year",
"target_property": "releaseYear",
"target_property_type": "string"
},
{
"source_field": "imdbRating",
"target_property": "imdbRating",
"target_property_type": "float"
}
],
"schema": {
"key_constraints": [
{
"name": "movieIdKey",
"label": "Movie",
"properties": ["id"]
}
],
"unique_constraints": [
{
"name": "movieTitleUnique",
"label": "Movie",
"properties": ["title"]
}
]
}
}
],
"relationships": [
{
"source": "directed",
"name": "Directed",
"write_mode": "merge",
"node_match_mode": "match",
"type": "DIRECTED",
"start_node_reference": "Persons",
"end_node_reference": "Movies"
},
{
"source": "acted_in",
"name": "Acted_in",
"write_mode": "merge",
"node_match_mode": "match",
"type": "ACTED_IN",
"start_node_reference": "Persons",
"end_node_reference": "Movies",
"properties": [
{
"source_field": "role",
"target_property": "role",
"target_property_type": "string"
}
]
}
]
}
}
Configuration
The config object contains global configuration for the import job.
All settings have a default, so you don’t need to specify them unless you wish to change them.
"config": {
"reset_db": false,
"index_all_properties": false,
"node_target_batch_size": 5000,
"relationship_target_batch_size": 1000,
"query_target_batch_size": 1000,
"node_target_parallelism": 10,
"relationship_target_parallelism": 1,
"query_target_parallelism": 1
}
- reset_db (bool): Whether to clear the target database before importing. Deletes data, indexes, and constraints.
- index_all_properties (bool): Whether to create indexes for all properties. See Cypher® → Search-performance indexes.
- node_target_batch_size (int): Number of rows processed in each import transaction of a node target.
- relationship_target_batch_size (int): Number of rows processed in each transaction of a relationship target.
- query_target_batch_size (int): Number of rows processed in each transaction of a custom query target.
- node_target_parallelism (int): Maximum number of concurrent transactions for node targets, per worker.
- relationship_target_parallelism (int): Maximum number of concurrent transactions for relationship targets, per worker. Set values higher than 1 with care, as they may result in deadlocks.
- query_target_parallelism (int): Maximum number of concurrent transactions for Cypher query targets, per worker. Set values higher than 1 with care, as they may result in deadlocks.
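Because each transaction handles at most one batch, a target needs roughly its row count divided by the batch size, rounded up, in transactions. The Python sketch below makes that arithmetic concrete and overlays user settings on the values shown in the config block. Treating those values as the defaults is an assumption, and ASSUMED_DEFAULTS, effective_config and transactions_for are illustrative names, not part of the template.

```python
import math

# Assumption: the values shown in the config block are the defaults.
ASSUMED_DEFAULTS = {
    "reset_db": False,
    "index_all_properties": False,
    "node_target_batch_size": 5000,
    "relationship_target_batch_size": 1000,
    "query_target_batch_size": 1000,
    "node_target_parallelism": 10,
    "relationship_target_parallelism": 1,
    "query_target_parallelism": 1,
}


def effective_config(user_config):
    """Overlay user-supplied settings on the assumed defaults."""
    unknown = set(user_config) - set(ASSUMED_DEFAULTS)
    if unknown:
        raise ValueError(f"unknown config keys: {sorted(unknown)}")
    return {**ASSUMED_DEFAULTS, **user_config}


def transactions_for(row_count, batch_size):
    """Each transaction processes at most batch_size rows."""
    return math.ceil(row_count / batch_size)


cfg = effective_config({"reset_db": True})
# A hypothetical 12,000-row node source with the node batch size of 5,000:
print(transactions_for(12_000, cfg["node_target_batch_size"]))  # 3
```

Up to node_target_parallelism of those transactions may then run concurrently on each worker.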
Sources
The sources section contains the definitions of the data sources, as a list. As a rough guideline, you can think of one table as one source. The importer takes the data surfaced by the sources and makes it available to the targets, which eventually map it into Neo4j.
{
"type": "bigquery",
"name": "<sourceName>",
"query": "<bigQuerySqlQuery>",
"query_temp_project": "<projectName>",
"query_temp_dataset": "<datasetName>"
}
- type (string): bigquery.
- name (string): A human-friendly label for the source (unique among all names; may not contain spaces). You use it to reference the source from other parts of the specification file.
- query (string): The dataset to extract from BigQuery, as an SQL query. Note that:
  - the source table can have more columns than the ones you select in the query;
  - multiple targets can use the same source, potentially filtering it for a different subset of columns.
- query_temp_project (string, optional): The Google Cloud project in which to store temporary query results (defaults to the current project).
- query_temp_dataset (string, optional): The BigQuery dataset in which to store temporary query results (defaults to a new temporary dataset).
Specifying a temporary project and/or dataset is helpful when you only have read permissions on the project/dataset where the sources are located. If you set …
Columns of type BIGNUMERIC, GEOGRAPHY, JSON, INTERVAL, and STRUCT are not supported.
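The source rules above and the list of unsupported column types can be checked before launching a job. The Python sketch below is a hypothetical pre-flight check (check_source and unsupported_columns are illustrative names). Where the column types come from, for example the data_type column of BigQuery's INFORMATION_SCHEMA.COLUMNS view, is up to you. Nested types such as ARRAY<STRUCT<...>> are flagged too, which is a conservative assumption.

```python
import re

# Column types listed as unsupported in the note above.
UNSUPPORTED = ("BIGNUMERIC", "GEOGRAPHY", "JSON", "INTERVAL", "STRUCT")
_UNSUPPORTED_RE = re.compile(r"\b(" + "|".join(UNSUPPORTED) + r")\b")


def check_source(source):
    """Check the documented shape of a BigQuery source definition."""
    problems = []
    if source.get("type") != "bigquery":
        problems.append("type must be 'bigquery'")
    name = source.get("name", "")
    if not name or " " in name:
        problems.append("name must be non-empty and must not contain spaces")
    if not source.get("query"):
        problems.append("query is required")
    return problems


def unsupported_columns(schema):
    """Return the names of columns whose type is, or contains, an unsupported type.

    `schema` is a list of (column_name, data_type) pairs.
    """
    return [name for name, data_type in schema if _UNSUPPORTED_RE.search(data_type.upper())]


schema = [("person_tmdbId", "STRING"), ("birthplace", "GEOGRAPHY"), ("meta", "STRUCT<a INT64>")]
print(unsupported_columns(schema))  # ['birthplace', 'meta']
```

Unsupported columns can simply be left out of the SELECT list of the source query.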
Targets
The targets section contains the definitions of the graph entities that will result from the import.
You must specify at least one target object.
Neo4j represents objects with nodes (e.g. movies, people) and connects them with relationships (e.g. ACTED_IN, DIRECTED).
Each object in the targets section generates a corresponding entity (node or relationship) in Neo4j, drawing data from a source.
It is also possible to run custom Cypher queries.
"targets": {
"nodes": [ ... ],
"relationships": [ ... ],
"queries": [ ... ]
}
By default, you don’t need to think about dependencies between nodes and relationships: relationship targets are always processed after the node targets corresponding to their start and end nodes. You can, however, declare other targets as dependencies through the depends_on attribute.
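The ordering rule (relationships after their start and end node targets, plus any explicit depends_on) forms a dependency graph. The Python sketch below orders target names with the standard library's graphlib.TopologicalSorter (Python 3.9+). It only illustrates the constraints; it is not how the template schedules work, which may also run independent targets concurrently. execution_order is a hypothetical helper.

```python
from graphlib import TopologicalSorter


def execution_order(targets):
    """Order target names so that every target comes after its dependencies.

    Relationship targets implicitly depend on their start and end node
    targets; depends_on adds explicit dependencies for any target kind.
    Raises graphlib.CycleError on circular dependencies.
    """
    graph = {}
    for kind in ("nodes", "relationships", "queries"):
        for target in targets.get(kind, []):
            deps = set(target.get("depends_on", []))
            if kind == "relationships":
                deps |= {target["start_node_reference"], target["end_node_reference"]}
            graph[target["name"]] = deps
    return list(TopologicalSorter(graph).static_order())


targets = {
    "nodes": [{"name": "Persons"}, {"name": "Movies"}],
    "relationships": [
        {"name": "Directed", "start_node_reference": "Persons", "end_node_reference": "Movies"}
    ],
    "queries": [{"name": "Stats", "depends_on": ["Directed"]}],  # hypothetical query target
}
print(execution_order(targets))
```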
Node objects
Node entities must be grouped in a list keyed nodes inside the targets object.
"targets": {
"nodes": [
{ <nodeSpec1> },
{ <nodeSpec2> },
...
]
}
Compulsory fields
Each node object must at minimum have the attributes source, name, labels, and properties.
{
"source": "<sourceName>",
"name": "<targetName>",
"labels": ["<label1>", "<label2>", ...],
"properties": [
{
"source_field": "<bigQueryColumnName>",
"target_property": "<neo4jPropertyName>",
"target_property_type": "<neo4jPropertyType>"
},
{ <propertyObj2> },
...
],
"write_mode": "merge"
}
- source (string): The name of the source this target should draw data from. Should match one of the names from the sources objects.
- name (string): A human-friendly name for the target (unique among all names).
- labels (list of strings): Labels to mark the nodes with.
- properties (list of objects): Mapping between source columns and node properties.
  Valid values for target_property_type are: boolean, byte_array (assumes base64 encoding), date, duration, float, integer, local_date, local_datetime, local_time, point, string, zoned_datetime, zoned_time. Each property type (except byte_array) is also available in its "_array" form (e.g. date_array, string_array) for BigQuery "repeated" column types.
- write_mode (string): The creation mode in Neo4j, either create or merge. See CREATE and MERGE for the behavior of the corresponding Cypher clauses.
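The list of valid target_property_type values, including the _array variants, is easy to encode. The Python sketch below checks a target's property mappings against it, using the source_field/target_property/target_property_type keys from the full example. check_property_mappings is a hypothetical helper, and the aliases and photo columns are made-up data.

```python
SCALAR_TYPES = {
    "boolean", "byte_array", "date", "duration", "float", "integer",
    "local_date", "local_datetime", "local_time", "point", "string",
    "zoned_datetime", "zoned_time",
}
# Every type except byte_array also has an "_array" form for REPEATED columns.
ARRAY_TYPES = {f"{t}_array" for t in SCALAR_TYPES if t != "byte_array"}
VALID_PROPERTY_TYPES = SCALAR_TYPES | ARRAY_TYPES

REQUIRED_KEYS = ("source_field", "target_property", "target_property_type")


def check_property_mappings(properties):
    """Return problems found in a target's list of property mappings."""
    problems = []
    for i, mapping in enumerate(properties):
        missing = [k for k in REQUIRED_KEYS if k not in mapping]
        if missing:
            problems.append(f"mapping {i}: missing {', '.join(missing)}")
        ptype = mapping.get("target_property_type")
        if ptype is not None and ptype not in VALID_PROPERTY_TYPES:
            problems.append(f"mapping {i}: unsupported type {ptype!r}")
    return problems


print(check_property_mappings([
    {"source_field": "born", "target_property": "bornDate", "target_property_type": "local_date"},
    {"source_field": "aliases", "target_property": "aliases", "target_property_type": "string_array"},
    {"source_field": "photo", "target_property": "photo", "target_property_type": "byte_array_array"},
]))  # ["mapping 2: unsupported type 'byte_array_array'"]
```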
Schema definition
You may create indexes and constraints on the imported nodes through the schema object.
The schema setup is equivalent to manually running the relevant CREATE INDEX/CONSTRAINT commands, except that they run automatically ahead of the import for each entity type.
If the global config index_all_properties is set to true, all properties are indexed with range indexes.
{
...
"schema": {
"enable_type_constraints": true,
"key_constraints": [
{
"name": "<constraintName>",
"label": "<label>",
"properties": ["<neo4jPropertyName1>", "<neo4jPropertyName2>", ...],
"options": {}
}
],
"unique_constraints": [
{
"name": "<constraintName>",
"label": "<label>",
"properties": ["<neo4jPropertyName1>", "<neo4jPropertyName2>", ...],
"options": {}
}
],
"existence_constraints": [
{
"name": "<constraintName>",
"label": "<label>",
"property": "<neo4jPropertyName>"
}
],
"range_indexes": [
{
"name": "<indexName>",
"label": "<label>",
"properties": ["<neo4jPropertyName1>", "<neo4jPropertyName2>", ...]
}
],
"text_indexes": [
{
"name": "<indexName>",
"label": "<label>",
"property": "<neo4jPropertyName>",
"options": {}
}
],
"point_indexes": [
{
"name": "<indexName>",
"label": "<label>",
"property": "<neo4jPropertyName>",
"options": {}
}
],
"fulltext_indexes": [
{
"name": "<indexName>",
"labels": ["label1", "label2", ...],
"properties": ["<neo4jPropertyName1>", "<neo4jPropertyName2>", ...],
"options": {}
}
],
"vector_indexes": [
{
"name": "<indexName>",
"label": "<label>",
"property": "<neo4jPropertyName>",
"options": {}
}
]
}
}
Where the attributes for each object are:
- name (string): The name of the index or constraint to be created in Neo4j.
- label (string) or labels (list of strings): The label(s) on which the index or constraint should be enforced.
- property (string) or properties (list of strings): The property (or properties) on which the index or constraint should be enforced.
- options (object): The options with which the index or constraint should be created (refer to the individual pages for each index and constraint type). It is optional, except for vector indexes, where it is mandatory.
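Since the schema setup is equivalent to running CREATE INDEX/CONSTRAINT commands, it can help to see the statements a schema object corresponds to. The Python sketch below renders key, unique and existence constraints and range indexes in Neo4j 5 Cypher syntax. The exact statements the template issues (for instance whether it uses IF NOT EXISTS) are an assumption, options are ignored, and labels and property names are not backtick-escaped.

```python
def _prop_list(var, properties):
    return ", ".join(f"{var}.{p}" for p in properties)


def node_schema_statements(schema):
    """Render part of a node target's schema object as Cypher DDL."""
    statements = []
    for c in schema.get("key_constraints", []):
        statements.append(
            f"CREATE CONSTRAINT {c['name']} IF NOT EXISTS FOR (n:{c['label']}) "
            f"REQUIRE ({_prop_list('n', c['properties'])}) IS NODE KEY"
        )
    for c in schema.get("unique_constraints", []):
        statements.append(
            f"CREATE CONSTRAINT {c['name']} IF NOT EXISTS FOR (n:{c['label']}) "
            f"REQUIRE ({_prop_list('n', c['properties'])}) IS UNIQUE"
        )
    for c in schema.get("existence_constraints", []):
        statements.append(
            f"CREATE CONSTRAINT {c['name']} IF NOT EXISTS FOR (n:{c['label']}) "
            f"REQUIRE n.{c['property']} IS NOT NULL"
        )
    for i in schema.get("range_indexes", []):
        statements.append(
            f"CREATE RANGE INDEX {i['name']} IF NOT EXISTS FOR (n:{i['label']}) "
            f"ON ({_prop_list('n', i['properties'])})"
        )
    return statements


persons_schema = {
    "key_constraints": [{"name": "personIdKey", "label": "Person", "properties": ["id"]}],
    "unique_constraints": [{"name": "personNameUnique", "label": "Person", "properties": ["name"]}],
}
for statement in node_schema_statements(persons_schema):
    print(statement)
```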
Source data must not have null values in key_constraints columns, or they will clash with the node key constraint.
If the source is not clean in this respect, consider cleaning it upfront in the query field of the related source by excluding all rows that wouldn’t fulfill the constraints (e.g. WHERE person_tmdbId IS NOT NULL).
Alternatively, use the where attribute in a source transformation.
The key_constraints and existence_constraints settings require Neo4j/Aura Enterprise Edition, and have no effect when run against a Neo4j Community Edition installation.
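Because key constraints name Neo4j properties while the filter has to name source columns, building the WHERE condition means mapping each key property back through the target's property mappings. The Python sketch below does this, assuming the target_property key used in the full example; key_not_null_filter is a hypothetical helper. Its output can be pasted into the source query or into a source transformation's where attribute.

```python
def key_not_null_filter(target):
    """Build an SQL condition that drops rows with null key-constraint columns."""
    # Map each Neo4j property back to the source column that feeds it.
    column_for = {m["target_property"]: m["source_field"] for m in target.get("properties", [])}
    columns = []
    for constraint in target.get("schema", {}).get("key_constraints", []):
        for prop in constraint["properties"]:
            column = column_for[prop]
            if column not in columns:
                columns.append(column)
    return " AND ".join(f"{column} IS NOT NULL" for column in columns)


persons = {
    "properties": [
        {"source_field": "person_tmdbId", "target_property": "id", "target_property_type": "string"},
        {"source_field": "name", "target_property": "name", "target_property_type": "string"},
    ],
    "schema": {"key_constraints": [{"name": "personIdKey", "label": "Person", "properties": ["id"]}]},
}
print(key_not_null_filter(persons))  # person_tmdbId IS NOT NULL
```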
Configuration
{
...
"active": true,
"source_transformations": {
"enable_grouping": true
},
"depends_on": ["<dependencyTargetName1>", "<dependencyTargetName2>", ...]
}
- active (bool): Whether the target should be included in the import (default: true).
- source_transformations (object): If enable_grouping is set to true, the import appends an SQL GROUP BY clause on all fields specified in key_constraints and properties. If set to false, any duplicate data in the source is pushed into Neo4j, potentially raising constraint errors or making insertion less efficient. The object can also contain aggregation functions and further fields; see Source transformations.
- depends_on (list of strings): The name of the target(s) that should execute before the current one.
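To see what enable_grouping protects against, the Python sketch below mimics a GROUP BY over the grouped source columns: duplicate rows collapse into one, so each node is written once. It is an in-memory illustration with made-up data; the template does the grouping in the SQL it sends to BigQuery, and group_rows is a hypothetical helper.

```python
def group_rows(rows, fields):
    """Keep one row per distinct combination of `fields`, like GROUP BY fields.

    Rows are returned in first-seen order and contain only the grouped fields.
    """
    grouped = {}
    for row in rows:
        key = tuple(row.get(field) for field in fields)
        grouped.setdefault(key, dict(zip(fields, key)))
    return list(grouped.values())


rows = [
    {"person_tmdbId": "1", "name": "Ann"},
    {"person_tmdbId": "1", "name": "Ann"},  # duplicate source row
    {"person_tmdbId": "2", "name": "Bob"},
]
print(group_rows(rows, ["person_tmdbId", "name"]))
# [{'person_tmdbId': '1', 'name': 'Ann'}, {'person_tmdbId': '2', 'name': 'Bob'}]
```

Without grouping, the duplicate row would reach Neo4j twice: with write_mode create it would clash with the key constraint, and with merge it would cost an extra, redundant write.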
Example
Person nodes
{
"source": "persons",
"name": "Persons",
"labels": [ "Person" ],
"properties": [
{
"source_field": "person_tmdbId",
"target_property": "id",
"target_property_type": "string"
},
{
"source_field": "name",
"target_property": "name",
"target_property_type": "string"
},
{
"source_field": "bornIn",
"target_property": "bornLocation",
"target_property_type": "string"
},
{
"source_field": "born",
"target_property": "bornDate",
"target_property_type": "local_date"
},
{
"source_field": "died",
"target_property": "diedDate",
"target_property_type": "local_date"
}
],
"schema": {
"key_constraints": [
{
"name": "personIdKey",
"label": "Person",
"properties": ["id"]
}
],
"unique_constraints": [
{
"name": "personNameUnique",
"label": "Person",
"properties": ["name"]
}
]
}
}
Relationship objects
Relationship entities must be grouped in a list keyed relationships inside the targets object.
"targets": {
...
"relationships": [
{ <relationshipSpec1> },
{ <relationshipSpec2> },
...
]
}
Compulsory fields
Each relationship object must at minimum have the attributes source, name, and type.
It must also specify which node targets the relationship links together, through start_node_reference and end_node_reference.
{
"source": "<sourceName>",
"name": "<targetName>",
"type": "<relationshipType>",
"start_node_reference": "<nodeTargetName>",
"end_node_reference": "<nodeTargetName>",
"node_match_mode": "<match/merge>",
"write_mode": "<create/merge>"
}
- source (string): The name of the source this target should draw data from. Should match one of the names from the sources objects.
- name (string): A human-friendly name for the target (unique among all names).
- type (string): Type to assign to the relationship.
- start_node_reference (string): The name of the node target that acts as the start of the relationship.
- end_node_reference (string): The name of the node target that acts as the end of the relationship.
- node_match_mode (string): Which Cypher clause to use to fetch the start/end nodes before creating a relationship between them. Valid values are match and merge, resulting in the Cypher clauses MATCH and MERGE respectively.
- write_mode (string): The creation mode in Neo4j, either create or merge. See CREATE and MERGE for the behavior of the corresponding Cypher clauses.
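The node_match_mode and write_mode fields each select a Cypher clause. The Python sketch below renders roughly equivalent per-batch Cypher for a relationship target, to show where each clause ends up. It is not the query the template generates: the start/end triples (label, key property, relationship source column) are hypothetical inputs, because how the template matches endpoint nodes to relationship rows is not described here, and type conversions are ignored.

```python
NODE_MATCH_CLAUSES = {"match": "MATCH", "merge": "MERGE"}
WRITE_CLAUSES = {"create": "CREATE", "merge": "MERGE"}


def _clause(mapping, value, field):
    if value not in mapping:
        raise ValueError(f"invalid {field}: {value!r}")
    return mapping[value]


def relationship_cypher(rel, start, end):
    """Approximate per-batch Cypher for a relationship target.

    `start` and `end` are hypothetical (label, key_property, source_column)
    triples describing how a source row identifies each endpoint node.
    """
    node_clause = _clause(NODE_MATCH_CLAUSES, rel.get("node_match_mode"), "node_match_mode")
    write_clause = _clause(WRITE_CLAUSES, rel.get("write_mode"), "write_mode")
    lines = ["UNWIND $rows AS row"]
    for var, (label, key, column) in (("s", start), ("e", end)):
        lines.append(f"{node_clause} ({var}:{label} {{{key}: row.{column}}})")
    lines.append(f"{write_clause} (s)-[r:{rel['type']}]->(e)")
    props = rel.get("properties", [])
    if props:
        lines.append("SET " + ", ".join(
            f"r.{m['target_property']} = row.{m['source_field']}" for m in props))
    return "\n".join(lines)


acted_in = {
    "type": "ACTED_IN", "write_mode": "merge", "node_match_mode": "match",
    "properties": [{"source_field": "role", "target_property": "role", "target_property_type": "string"}],
}
print(relationship_cypher(acted_in, ("Person", "id", "person_tmdbId"), ("Movie", "id", "movieId")))
```

With node_match_mode set to match, rows whose endpoints do not exist yet produce no relationship; merge would create the missing endpoint nodes instead.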
Properties
Relationships may also map source columns as properties.
{
...
"properties": [
{
"source_field": "<bigQueryColumnName>",
"target_property": "<neo4jPropertyName>",
"target_property_type": "<neo4jPropertyType>"
},
{ <propertyObj2> },
...
]
}
- properties (list of objects): Mapping between source columns and relationship properties.
  Valid values for target_property_type are: boolean, byte_array (assumes base64 encoding), date, duration, float, integer, local_date, local_datetime, local_time, point, string, zoned_datetime, zoned_time. Each property type (except byte_array) is also available in its "_array" form (e.g. date_array, string_array) for BigQuery "repeated" column types.
Schema definition
You may create indexes and constraints on the imported relationships through the schema object.
The schema setup is equivalent to manually running the relevant CREATE INDEX/CONSTRAINT commands, except that they run automatically ahead of the import for each relationship type.
If the global config index_all_properties is set to true, all properties are indexed with range indexes.
{
...
"schema": {
"enable_type_constraints": true,
"key_constraints": [
{
"name": "<constraintName>",
"type": "<relationshipType>",
"properties": ["<neo4jPropertyName1>", "<neo4jPropertyName2>", ...],
"options": {}
}
],
"unique_constraints": [
{
"name": "<constraintName>",
"type": "<relationshipType>",
"properties": ["<neo4jPropertyName1>", "<neo4jPropertyName2>", ...],
"options": {}
}
],
"existence_constraints": [
{
"name": "<constraintName>",
"type": "<relationshipType>",
"property": "<neo4jPropertyName>"
}
],
"range_indexes": [
{
"name": "<indexName>",
"type": "<relationshipType>",
"properties": ["<neo4jPropertyName1>", "<neo4jPropertyName2>", ...]
}
],
"text_indexes": [
{
"name": "<indexName>",
"type": "<relationshipType>",
"property": "<neo4jPropertyName>",
"options": {}
}
],
"point_indexes": [
{
"name": "<indexName>",
"type": "<relationshipType>",
"property": "<neo4jPropertyName>",
"options": {}
}
],
"fulltext_indexes": [
{
"name": "<indexName>",
"types": ["<relationshipType1>", "<relationshipType2>", ...],
"properties": ["<neo4jPropertyName1>", "<neo4jPropertyName2>", ...],
"options": {}
}
],
"vector_indexes": [
{
"name": "<indexName>",
"type": "<relationshipType>",
"property": "<neo4jPropertyName>",
"options": {}
}
]
}
}
Where the attributes for each object are:
- name (string): The name of the index or constraint to be created in Neo4j.
- type (string) or types (list of strings): The relationship type(s) on which the index or constraint should be enforced.
- property (string) or properties (list of strings): The property (or properties) on which the index or constraint should be enforced.
- options (object): The options with which the index or constraint should be created (refer to the individual pages for each index and constraint type). It is optional, except for vector indexes, where it is mandatory.
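Relationship schema entries use a relationship pattern instead of a node pattern. The Python sketch below renders key, unique and existence constraints and range indexes in Neo4j 5 Cypher syntax for relationships. Relationship key and uniqueness constraints need a recent Neo4j 5 release, and the exact statements the template issues (including IF NOT EXISTS) are an assumption. The constraint and index names in the usage are hypothetical.

```python
def _prop_list(var, properties):
    return ", ".join(f"{var}.{p}" for p in properties)


def relationship_schema_statements(schema):
    """Render part of a relationship target's schema object as Cypher DDL."""
    statements = []
    for c in schema.get("key_constraints", []):
        statements.append(
            f"CREATE CONSTRAINT {c['name']} IF NOT EXISTS FOR ()-[r:{c['type']}]-() "
            f"REQUIRE ({_prop_list('r', c['properties'])}) IS RELATIONSHIP KEY"
        )
    for c in schema.get("unique_constraints", []):
        statements.append(
            f"CREATE CONSTRAINT {c['name']} IF NOT EXISTS FOR ()-[r:{c['type']}]-() "
            f"REQUIRE ({_prop_list('r', c['properties'])}) IS UNIQUE"
        )
    for c in schema.get("existence_constraints", []):
        statements.append(
            f"CREATE CONSTRAINT {c['name']} IF NOT EXISTS FOR ()-[r:{c['type']}]-() "
            f"REQUIRE r.{c['property']} IS NOT NULL"
        )
    for i in schema.get("range_indexes", []):
        statements.append(
            f"CREATE RANGE INDEX {i['name']} IF NOT EXISTS FOR ()-[r:{i['type']}]-() "
            f"ON ({_prop_list('r', i['properties'])})"
        )
    return statements


acted_in_schema = {
    "existence_constraints": [{"name": "actedInRoleExists", "type": "ACTED_IN", "property": "role"}],
    "range_indexes": [{"name": "actedInRole", "type": "ACTED_IN", "properties": ["role"]}],
}
for statement in relationship_schema_statements(acted_in_schema):
    print(statement)
```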
Source data must not have null values in key_constraints columns, or they will clash with the relationship key constraint.
If the source is not clean in this respect, consider cleaning it upfront in the query field of the related source by excluding all rows that wouldn’t fulfill the constraints (e.g. WHERE person_tmdbId IS NOT NULL).
Alternatively, use the where attribute in a source transformation.
The key_constraints and existence_constraints settings require Neo4j/Aura Enterprise Edition, and have no effect when run against a Neo4j Community Edition installation.
Configuration
{
...
"active": true,
"source_transformations": {
"enable_grouping": true
},
"depends_on": ["<dependencyTargetName1>", "<dependencyTargetName2>", ...]
}
- active (bool): Whether the target should be included in the import.
- source_transformations (object): If enable_grouping is set to true, the import appends an SQL GROUP BY clause on all fields specified in key_constraints and properties. If set to false, any duplicate data in the source is pushed into Neo4j, potentially raising constraint errors or making insertion less efficient. The object can also contain aggregation functions and further fields; see Source transformations.
- depends_on (list of strings): The name of the target(s) that should execute before the current one.
Example
ACTED_IN relationships
{
"source": "acted_in",
"name": "Acted_in",
"type": "ACTED_IN",
"write_mode": "merge",
"node_match_mode": "match",
"start_node_reference": "Persons",
"end_node_reference": "Movies",
"properties": [
{
"source_field": "role",
"target_property": "role",
"target_property_type": "string"
}
]
}
Custom query targets
Custom query targets are useful when the import requires a complex query that does not easily fit into the node/relationship targets format.
Query targets receive batches of rows through the parameter $rows.
Custom queries must be grouped in a list keyed queries inside the targets object.
"targets": {
...
"queries": [
{ <querySpec1> },
{ <querySpec2> },
...
]
}
Do not use custom queries to run Cypher that does not directly depend on a source; use actions instead.
One-off queries, especially non-idempotent ones, are not suited to custom query targets: queries from targets run in batches, so a custom query may run several times, once for each batch of $rows extracted from the source.
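The batching behavior is why one-off statements do not belong in query targets. The Python sketch below simulates it: with the query_target_batch_size of 1,000 shown in the config block, a 2,500-row source runs the query three times, so a statement that, say, creates a single log node would create three. The run_query callback stands in for sending the Cypher to Neo4j and, like batches and run_query_target, is hypothetical.

```python
def batches(rows, batch_size):
    """Split the source rows into consecutive batches of at most batch_size rows."""
    for start in range(0, len(rows), batch_size):
        yield rows[start:start + batch_size]


def run_query_target(rows, batch_size, run_query):
    """Call run_query once per batch, passing the batch as the rows parameter."""
    executions = 0
    for batch in batches(rows, batch_size):
        run_query({"rows": batch})  # the Cypher query would see this batch as $rows
        executions += 1
    return executions


sizes = []
executions = run_query_target([{"id": i} for i in range(2500)], 1000,
                              lambda params: sizes.append(len(params["rows"])))
print(executions, sizes)  # 3 [1000, 1000, 500]
```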
Compulsory fields
Each query target must at minimum have the attributes source, name, and query.
{
"source": "<sourceName>",
"name": "<targetName>",
"query": "<cypherQuery>"
}
- source (string): The name of the source this target should draw data from. Should match one of the names from the sources objects.
- name (string): A human-friendly name for the target (unique among all names).
- query (string): A Cypher query. Data from the source is available as a list in the parameter $rows.
Configuration
{
...
"active": true,
"depends_on": ["<dependencyTargetName1>", "<dependencyTargetName2>", ...]
}
- active (bool): Whether the target should be included in the import.
- depends_on (list of strings): The name of the target(s) that should execute before the current one.
Example
Person nodes and setting a date on creation
{
"name": "Person nodes",
"source": "persons",
"query": "UNWIND $rows AS row WITH row WHERE row.person_tmdbId IS NOT NULL MERGE (p:Person {id: row.person_tmdbId}) ON CREATE SET p.created_time = datetime() SET p.name = row.name, p.born_in = row.bornIn, p.born = date(row.born), p.died = date(row.died)"
}
Source transformations
Each node and relationship target can optionally have a source_transformations attribute containing aggregation functions. This can be useful to extract higher-level dimensions from a more granular source. Aggregations result in extra fields that become available for property mappings.
"source_transformations": {
"enable_grouping": true,
"aggregations": [ {
"expression": "",
"field_name": ""
},
{ aggregationObj2 }, ...
],
"limit": -1,
"where": "",
"order_by": [
{
"expression": "column_name",
"order": "<asc/desc>"
},
{ orderObj2 }, ...
]
}
- enable_grouping (bool): Must be true for aggregations and where to work.
- aggregations (list of objects): Aggregations are specified as SQL expressions in the expression attribute, and the result is available as a source column under the name specified in field_name.
- limit (int): Caps the number of source rows that are considered for import (defaults to no limit, encoded as -1).
- where (string): Filters out source data prior to import (in SQL WHERE clause format).
- order_by (list of objects): Enforces ordering on the source.
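To make the interplay of these fields concrete, the Python sketch below builds the kind of SQL a transformation could translate to: it wraps the source query and applies where, GROUP BY, ORDER BY and LIMIT. It is an illustration only. The SQL the template actually generates is not documented here, the shop.sales table and the transformed_query helper are hypothetical, and a missing enable_grouping is treated as false.

```python
def transformed_query(source_query, group_fields, transformation):
    """Illustrative SQL for a target whose source has source_transformations."""
    grouping = transformation.get("enable_grouping", False)
    aggregations = transformation.get("aggregations", [])
    where = transformation.get("where", "")
    if (aggregations or where) and not grouping:
        raise ValueError("aggregations/where require enable_grouping: true")

    columns = list(group_fields) + [
        f"{a['expression']} AS {a['field_name']}" for a in aggregations
    ]
    sql = f"SELECT {', '.join(columns)} FROM ({source_query})"
    if where:
        sql += f" WHERE {where}"
    if grouping and group_fields:
        sql += f" GROUP BY {', '.join(group_fields)}"
    order_by = transformation.get("order_by", [])
    if order_by:
        sql += " ORDER BY " + ", ".join(
            f"{o['expression']} {o.get('order', 'asc').upper()}" for o in order_by)
    limit = transformation.get("limit", -1)
    if limit >= 0:  # -1 means no limit
        sql += f" LIMIT {limit}"
    return sql


example = {
    "enable_grouping": True,
    "aggregations": [
        {"expression": "SUM(unit_price*quantity)", "field_name": "total_amount_sold"},
        {"expression": "SUM(quantity)", "field_name": "total_quantity_sold"},
    ],
    "limit": 50,
    "where": "sourceId IS NOT NULL",
}
print(transformed_query("SELECT sourceId, unit_price, quantity FROM shop.sales", ["sourceId"], example))
```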
Example
{
"enable_grouping": true,
"aggregations": [
{
"expression": "SUM(unit_price*quantity)",
"field_name": "total_amount_sold"
},
{
"expression": "SUM(quantity)",
"field_name": "total_quantity_sold"
}
],
"limit": 50,
"where": "sourceId IS NOT NULL"
}
By default, a source is processed only once, and its data is then fanned out across its targets. However, targets with a source transformation trigger a new data fetch, as the resulting source query differs from the default one. A source is thus processed once for all of its targets without transformations, plus once for each target that defines transformations. As a consequence, the original source query must be deterministic, or different targets could receive different data.
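The fetch-count rule can be written as a small function. In the Python sketch below, source_fetches is a hypothetical helper, and a target counts as transformed when it declares a non-empty source_transformations object; that is an assumption about what makes its query differ from the default.

```python
def source_fetches(targets, source_name):
    """How many times `source_name` is queried under the rule described above.

    Assumption: a target is "transformed" when it declares a non-empty
    source_transformations object; all other targets share one fetch.
    """
    using = [t for t in targets if t.get("source") == source_name]
    transformed = sum(1 for t in using if t.get("source_transformations"))
    shared = 1 if len(using) > transformed else 0
    return shared + transformed


targets = [
    {"source": "sales", "name": "Products"},
    {"source": "sales", "name": "Customers"},
    {"source": "sales", "name": "TopProducts",
     "source_transformations": {"enable_grouping": True, "limit": 50}},
    {"source": "stores", "name": "Stores"},
]
print(source_fetches(targets, "sales"))  # 2
```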
Actions
The actions section contains commands that can be run before or after specific steps of the import process. Each step is called a stage.
You may, for example, submit HTTP requests when steps complete, execute SQL queries on the source, or run Cypher statements on the Neo4j target instance.
...
"actions": [
{ <actionSpec1> },
{ <actionSpec2> },
...
]
Each action object must at minimum have the attributes name, type, and stage.
Further attributes depend on the action type.
{
"type": "http",
"name": "<actionName>",
"stage": "<stageName>",
"method": "<get/post>",
"url": "<targetUrl>",
"headers": {}
}
- type (string): The action type.
- name (string): A human-friendly name for the action (unique among all names).
- stage (string): At what point of the import the action should run. Valid values are: start, post_sources, pre_nodes, post_nodes, pre_relationships, post_relationships, pre_queries, post_queries, end.
- method (string): The HTTP method, either get or post.
- url (string): The URL the HTTP request should target.
- headers (object, optional): Request headers.
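Stage names are easy to mistype, so it can be useful to validate them and view the actions in stage order. The Python sketch below does both; it assumes the stages run in the order in which they are listed above, and actions_by_stage and the "Clean staging" action are hypothetical.

```python
# Assumption: stages run in the order the stage list gives them.
STAGES = [
    "start", "post_sources", "pre_nodes", "post_nodes", "pre_relationships",
    "post_relationships", "pre_queries", "post_queries", "end",
]


def actions_by_stage(actions):
    """Group action names by stage, in stage order, rejecting unknown stages."""
    plan = {stage: [] for stage in STAGES}
    for action in actions:
        stage = action.get("stage")
        if stage not in plan:
            raise ValueError(f"action {action.get('name')!r}: invalid stage {stage!r}")
        plan[stage].append(action["name"])
    return {stage: names for stage, names in plan.items() if names}


print(actions_by_stage([
    {"type": "http", "name": "Post load ping", "stage": "end"},
    {"type": "cypher", "name": "Clean staging", "stage": "start"},
]))  # {'start': ['Clean staging'], 'end': ['Post load ping']}
```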
GET request after import completes
{
"type": "http",
"name": "Post load ping",
"stage": "end",
"method": "get",
"url": "https://neo4j.com/success",
"headers": {
"secret": "314159",
"moreSecret": "17320"
}
}
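For a rough picture of what an http action sends, the Python sketch below turns one into a standard-library urllib.request.Request without sending it. This does not describe the template's own HTTP client; build_http_request is a hypothetical helper.

```python
import urllib.request


def build_http_request(action):
    """Turn an http action into a urllib Request, without sending it."""
    method = str(action.get("method", "")).lower()
    if method not in ("get", "post"):
        raise ValueError(f"invalid method: {method!r}")
    return urllib.request.Request(
        action["url"], headers=action.get("headers", {}), method=method.upper()
    )


ping = {
    "type": "http", "name": "Post load ping", "stage": "end", "method": "get",
    "url": "https://neo4j.com/success", "headers": {"secret": "314159"},
}
request = build_http_request(ping)
print(request.get_method(), request.full_url)  # GET https://neo4j.com/success
# urllib.request.urlopen(request, timeout=10) would actually send it.
```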
{
"type": "cypher",
"name": "<actionName>",
"stage": "<stageName>",
"query": "<cypherQuery>",
"execution_mode": "<transaction/autocommit>"
}
- type (string): The action type.
- name (string): A human-friendly name for the action (unique among all names).
- stage (string): At what point of the import the action should run. Valid values are: start, post_sources, pre_nodes, post_nodes, pre_relationships, post_relationships, pre_queries, post_queries, end.
- query (string): The Cypher query to run.
- execution_mode (string, optional): The mode under which the query should be executed. Valid values are transaction and autocommit (default: transaction).
importJob node after import completes
{
"type": "cypher",
"name": "Post load log",
"stage": "end",
"query": "MERGE (:importJob {date: datetime()})"
}
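execution_mode decides whether the query runs inside an explicit transaction or as an auto-commit query. To illustrate the difference, the Python sketch below honors the two modes with the official Neo4j Python driver's Session API: execute_write for a managed transaction, run for an auto-commit query. It is not the template's implementation. run_cypher_action is a hypothetical helper, and the session is passed in so the logic can be exercised without a database. Auto-commit mode matters for statements such as CALL { ... } IN TRANSACTIONS, which Neo4j only accepts in implicit (auto-commit) transactions.

```python
def run_cypher_action(session, action):
    """Run a cypher action's query in the requested execution_mode.

    `session` is expected to behave like a neo4j Python driver Session.
    """
    query = action["query"]
    mode = action.get("execution_mode", "transaction")  # documented default
    if mode == "transaction":
        # Managed write transaction: committed by the driver, retried on transient errors.
        return session.execute_write(lambda tx: list(tx.run(query)))
    if mode == "autocommit":
        # Auto-commit (implicit) transaction.
        return list(session.run(query))
    raise ValueError(f"invalid execution_mode: {mode!r}")


# Usage against a real database (connection details are placeholders):
# from neo4j import GraphDatabase
# with GraphDatabase.driver("neo4j://localhost:7687", auth=("neo4j", "password")) as driver:
#     with driver.session() as session:
#         run_cypher_action(session, {"query": "MERGE (:importJob {date: datetime()})"})
```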
{
"type": "bigquery",
"name": "<actionName>",
"stage": "<stageName>",
"sql": "<sqlQuery>"
}
- type (string): The action type.
- name (string): A human-friendly name for the action (unique among all names).
- stage (string): At what point of the import the action should run. Valid values are: start, post_sources, pre_nodes, post_nodes, pre_relationships, post_relationships, pre_queries, post_queries, end.
- sql (string): The SQL query to run.
BigQuery log entry after import completes
{
"type": "bigquery",
"name": "Post load log",
"stage": "end",
"sql": "INSERT INTO logs.imports (time) VALUES (CURRENT_TIMESTAMP())"
}