Create a job specification file
The job specification file instructs Dataflow on how to run the import (where to source the data from, how to map it into Neo4j, etc.). It consists of a JSON object with four sections.
{
"config": { ... }, (1)
"sources": [ (2)
{ ... }
],
"targets": { ... }, (3)
"actions": [ (4)
{ ... }
]
}
(1) config: Global flags affecting how the import is performed (optional)
(2) sources: Data source definitions (relational)
(3) targets: Data target definitions (graph: nodes/relationships/Cypher queries)
(4) actions: One-off actions (optional)
At a high level, the job fetches data from the sources and transforms and imports it into the targets.
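As a quick sanity check before submitting a job, the top-level layout can be verified with a few lines of Python. This is only a minimal sketch: the helper name missing_sections is hypothetical, and treating sources and targets as mandatory is an assumption drawn from the "(optional)" markers in the list above.

```python
import json

# Assumption: only "config" and "actions" are marked optional above,
# so "sources" and "targets" are treated as required here.
REQUIRED_SECTIONS = {"sources", "targets"}


def missing_sections(spec: dict) -> list[str]:
    """Return the required top-level sections absent from a job spec."""
    return sorted(REQUIRED_SECTIONS - set(spec))


spec = json.loads('{"config": {}, "sources": [], "actions": []}')
print(missing_sections(spec))  # prints ['targets']
```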
A full example
Below is an example job specification file that works out of the box to import the publicly available movies dataset.
The dataset contains entities Person and Movie, linked together by DIRECTED and ACTED_IN relationships.
In other words, each Person may have DIRECTED and/or ACTED_IN a Movie.
Both entities and relationships have extra details attached to each of them.
The data is sourced from the following files: persons.csv, movies.csv, acted_in.csv, directed.csv.
The next sections break it down and provide in-context information for each part. We recommend reading this guide side by side with the job specification example.
{
"version": "1",
"config": {
"reset_db": true
},
"sources": [
{
"type": "text",
"name": "persons",
"urls": ["gs://neo4j-examples/persons.csv"],
"format": "excel",
"header": ["person_tmdbId","bio","born","bornIn","died","person_imdbId","name","person_poster","person_url"]
},
{
"type": "text",
"name": "movies",
"urls": ["gs://neo4j-examples/movies.csv"],
"format": "excel",
"header": ["movieId","title","budget","countries","movie_imdbId","imdbRating","imdbVotes","languages","plot","movie_poster","released","revenue","runtime","movie_tmdbId","movie_url","year","genres"]
},
{
"type": "text",
"name": "directed",
"urls": ["gs://neo4j-examples/directed.csv"],
"format": "excel",
"header": ["movieId","person_tmdbId"]
},
{
"type": "text",
"name": "acted_in",
"urls": ["gs://neo4j-examples/acted_in.csv"],
"format": "excel",
"header": ["movieId","person_tmdbId","role"]
}
],
"targets": {
"nodes": [
{
"source": "persons",
"name": "Persons",
"write_mode": "merge",
"labels": [ "Person" ],
"properties": [
{
"source_field": "person_tmdbId",
"target_property": "id",
"target_property_type": "string"
},
{
"source_field": "name",
"target_property": "name",
"target_property_type": "string"
},
{
"source_field": "bornIn",
"target_property": "bornLocation",
"target_property_type": "string"
},
{
"source_field": "born",
"target_property": "bornDate",
"target_property_type": "date"
},
{
"source_field": "died",
"target_property": "diedDate",
"target_property_type": "date"
}
],
"schema": {
"key_constraints": [
{
"name": "personIdKey",
"label": "Person",
"properties": ["id"]
}
],
"unique_constraints": [
{
"name": "personNameUnique",
"label": "Person",
"properties": ["name"]
}
]
}
},
{
"source": "movies",
"name": "Movies",
"write_mode": "merge",
"labels": [ "Movie" ],
"properties": [
{
"source_field": "movieId",
"target_property": "id",
"target_property_type": "string"
},
{
"source_field": "title",
"target_property": "title",
"target_property_type": "string"
},
{
"source_field": "year",
"target_property": "releaseYear",
"target_property_type": "string"
},
{
"source_field": "imdbRating",
"target_property": "imdbRating",
"target_property_type": "float"
}
],
"schema": {
"key_constraints": [
{
"name": "movieIdKey",
"label": "Movie",
"properties": ["id"]
}
],
"unique_constraints": [
{
"name": "movieTitleUnique",
"label": "Movie",
"properties": ["title"]
}
]
}
}
],
"relationships": [
{
"source": "directed",
"name": "Directed",
"type": "DIRECTED",
"write_mode": "merge",
"node_match_mode": "match",
"start_node_reference": "Persons",
"end_node_reference": "Movies"
},
{
"source": "acted_in",
"name": "Acted_in",
"type": "ACTED_IN",
"write_mode": "merge",
"node_match_mode": "match",
"start_node_reference": "Persons",
"end_node_reference": "Movies",
"properties": [
{
"source_field": "role",
"target_property": "role",
"target_property_type": "string"
}
]
}
]
}
}
Configuration
The config object contains global configuration for the import job.
All settings have a default, so you only need to specify the ones you wish to alter.
"config": {
"reset_db": false,
"index_all_properties": false,
"node_target_batch_size": 5000,
"relationship_target_batch_size": 1000,
"query_target_batch_size": 1000,
"node_target_parallelism": 10,
"relationship_target_parallelism": 1,
"query_target_parallelism": 1
}
- reset_db (bool): Whether to clear the target database before importing. Deletes data, indexes, and constraints.
- index_all_properties (bool): Whether to create indexes for all properties. See Cypher® → Search-performance indexes.
- node_target_batch_size (int): Number of nodes to process in each transaction.
- relationship_target_batch_size (int): Number of relationships to process in each transaction.
- query_target_batch_size (int): Number of rows to process in each custom query.
- node_target_parallelism (int): Maximum number of concurrent transactions for node targets, per worker.
- relationship_target_parallelism (int): Maximum number of concurrent transactions for relationship targets, per worker. Set values higher than 1 with care, as they may result in deadlocks.
- query_target_parallelism (int): Maximum number of concurrent transactions for Cypher query targets, per worker. Set values higher than 1 with care, as they may result in deadlocks.
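The following Python sketch shows how a partial config object combines with the defaults, and flags the two parallelism settings that the list above says to raise with care. It assumes the values in the config listing are the defaults; the helper names effective_config and deadlock_warnings are hypothetical and not part of the import tool.

```python
# Assumption: these are the defaults, copied from the config listing above.
DEFAULT_CONFIG = {
    "reset_db": False,
    "index_all_properties": False,
    "node_target_batch_size": 5000,
    "relationship_target_batch_size": 1000,
    "query_target_batch_size": 1000,
    "node_target_parallelism": 10,
    "relationship_target_parallelism": 1,
    "query_target_parallelism": 1,
}


def effective_config(user_config: dict) -> dict:
    """Overlay user settings on the defaults, rejecting unknown keys."""
    unknown = sorted(set(user_config) - set(DEFAULT_CONFIG))
    if unknown:
        raise ValueError(f"unknown config keys: {unknown}")
    return {**DEFAULT_CONFIG, **user_config}


def deadlock_warnings(config: dict) -> list[str]:
    """List the parallelism settings that are set higher than 1."""
    risky = ("relationship_target_parallelism", "query_target_parallelism")
    return [key for key in risky if config[key] > 1]
```

For instance, effective_config({"reset_db": True}) keeps every other setting at its listed value.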
Sources
The sources section contains the definitions of the data sources, as a list. As a rough guideline, think of it as one table <=> one source. The importer makes the data surfaced by the sources available to the targets, which eventually map it into Neo4j.
Source objects must at minimum specify the attributes type, name, urls, and header.
The default column delimiter and line separator depend on the specified format, according to Apache’s CSVFormat.
{
"type": "text",
"name": "<sourceName>",
"urls": [ "<csvPath1>", "<csvPath2>", ... ],
"format": "default",
"column_delimiter": "",
"line_separator": "",
"header": [ "<colName1>", "<colName2>", ... ]
}
- type (string): text.
- name (string): A human-friendly label for the source (unique among all names). You will use this to reference the source from other parts of the specification file.
- urls (list of strings): The Google Storage locations of the CSV files (ex. gs://neo4j-datasets/movies.csv).
  How to retrieve the Google Storage location of a file? In a Cloud bucket, expand the file options through the three dots on the right, and choose Copy gsutil URI.
- format (string): The format of the provided CSV file. Valid values are: default, excel, informix, mongo, mongo_tsv, mysql, oracle, postgres, postgresql_csv, rfc4180. Formats behave in accordance with Apache’s CSVFormat.
- column_delimiter (string): CSV field delimiter.
- line_separator (string): CSV line separator.
- header (list of strings): Full list of field names the CSV file contains, in order. Alternatively, the list may be restricted to only the first few columns. The column names specified here control the row field names the targets will map from.
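The rules for source objects can be checked mechanically before a job is submitted. The Python sketch below only illustrates the constraints stated above (minimum attributes, unique names, known format values); check_sources is a hypothetical helper, not something the import tool provides.

```python
REQUIRED_SOURCE_KEYS = ("type", "name", "urls", "header")
VALID_FORMATS = {
    "default", "excel", "informix", "mongo", "mongo_tsv",
    "mysql", "oracle", "postgres", "postgresql_csv", "rfc4180",
}


def check_sources(sources: list[dict]) -> list[str]:
    """Return human-readable problems found in a list of source objects."""
    problems = []
    seen_names = set()
    for index, source in enumerate(sources):
        label = source.get("name", f"#{index}")
        for key in REQUIRED_SOURCE_KEYS:
            if key not in source:
                problems.append(f"source {label}: missing '{key}'")
        # The format attribute is only checked when it is present.
        if "format" in source and source["format"] not in VALID_FORMATS:
            problems.append(f"source {label}: unknown format '{source['format']}'")
        if label in seen_names:
            problems.append(f"source {label}: duplicate name")
        seen_names.add(label)
    return problems
```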
Targets
The targets section contains the definitions of the graph entities that will result from the import.
Neo4j represents objects with nodes (ex. movies, people) and connects them with relationships (ex. ACTED_IN, DIRECTED).
Each object in the targets section generates a corresponding entity (node or relationship) in Neo4j, drawing data from a source.
It is also possible to run custom Cypher queries.
"targets": {
"nodes": [ ... ],
"relationships": [ ... ],
"queries": [ ... ]
}
By default, you don’t need to think about dependencies between nodes and relationships. Relationship targets are always processed after the targets corresponding to their start and end node. It is however possible to add other targets as dependencies.
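To make the ordering rule concrete, the Python sketch below derives one valid processing order from a targets object. It treats each relationship's start and end node targets as implicit dependencies and adds any explicit depends_on entries (the depends_on attribute is described in the target configuration sections). It illustrates the rule only and is not the importer's actual scheduler; execution_order is a hypothetical name.

```python
from graphlib import TopologicalSorter


def execution_order(targets: dict) -> list[str]:
    """Order target names so that every dependency comes first."""
    graph = {}
    for node in targets.get("nodes", []):
        graph[node["name"]] = set(node.get("depends_on", []))
    for rel in targets.get("relationships", []):
        # Implicit dependencies: the start and end node targets.
        implicit = {rel["start_node_reference"], rel["end_node_reference"]}
        graph[rel["name"]] = implicit | set(rel.get("depends_on", []))
    for query in targets.get("queries", []):
        graph[query["name"]] = set(query.get("depends_on", []))
    # static_order() raises graphlib.CycleError on circular dependencies.
    return list(TopologicalSorter(graph).static_order())
```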
Node objects
Node entities must be grouped in a list keyed nodes inside the targets object.
"targets": {
"nodes": [
{ <nodeSpec1> },
{ <nodeSpec2> },
...
]
}
Compulsory fields
Each node object must at minimum have attributes source, name, labels, and properties.
{
"source": "<sourceName>",
"name": "<targetName>",
"labels": ["<label1>", "<label2>", ...],
"properties": [
{
"source_field": "<bigQueryColumnName>",
"target_property": "<neo4jPropertyName>",
"target_property_type": "<neo4jPropertyType>"
},
{ <propertyObj2> },
...
],
"write_mode": "merge"
}
- source (string): The name of the source this target should draw data from. Should match one of the names from the sources objects.
- name (string): A human-friendly name for the target (unique among all names).
- labels (list of strings): Labels to mark the nodes with.
- properties (list of objects): Mapping between source columns and node properties. Valid values for target_property_type are: boolean, byte_array (assumes base64 encoding), date, duration, float, integer, local_date, local_datetime, local_time, point, string, zoned_datetime, zoned_time.
- write_mode (string): The creation mode in Neo4j. Either create or merge. See CREATE and MERGE for info on the behavior of the Cypher clauses.
Schema definition
You may create indexes and constraints on the imported nodes through the schema object.
The schema setup is equivalent to manually running the relevant CREATE INDEX/CONSTRAINT commands, except they are run automatically ahead of import for each entity type.
If the global config index_all_properties is set to true, all properties will be indexed with range indexes.
{
...
"schema": {
"enable_type_constraints": true,
"key_constraints": [
{
"name": "<constraintName>",
"label": "<label>",
"properties": ["<neo4jPropertyName1>", "<neo4jPropertyName2>", ...],
"options": {}
}
],
"unique_constraints": [
{
"name": "<constraintName>",
"label": "<label>",
"properties": ["<neo4jPropertyName1>", "<neo4jPropertyName2>", ...],
"options": {}
}
],
"existence_constraints": [
{
"name": "<constraintName>",
"label": "<label>",
"property": "<neo4jPropertyName>"
}
],
"range_indexes": [
{
"name": "<indexName>",
"label": "<label>",
"properties": ["<neo4jPropertyName1>", "<neo4jPropertyName2>", ...]
}
],
"text_indexes": [
{
"name": "<indexName>",
"label": "<label>",
"property": "<neo4jPropertyName>",
"options": {}
}
],
"point_indexes": [
{
"name": "<indexName>",
"label": "<label>",
"property": "<neo4jPropertyName>",
"options": {}
}
],
"fulltext_indexes": [
{
"name": "<indexName>",
"labels": ["<label1>", "<label2>", ...],
"properties": ["<neo4jPropertyName1>", "<neo4jPropertyName2>", ...],
"options": {}
}
],
"vector_indexes": [
{
"name": "<indexName>",
"label": "<label>",
"property": "<neo4jPropertyName>",
"options": {}
}
]
}
}
Where the attributes for each object are:
- name (string): The name of the index or constraint to be created in Neo4j.
- label (string) or labels (list of strings): The label(s) on which the index or constraint should be enforced.
- property (string) or properties (list of strings): The property or properties on which the index or constraint should be enforced.
- options (object): The options with which the index or constraint should be created (refer to the individual pages for each index and constraint type). Optional wherever it appears, except for vector indexes, where it is mandatory.
Source data must not have null values for key_constraints columns, or they will clash with the node key constraint.
If the source is not clean in this respect, consider cleaning it upfront in the related source.query field by excluding all rows that wouldn’t fulfill the constraints (ex. WHERE person_tmdbId IS NOT NULL).
Alternatively, use the where attribute in a source transformation.
The options key_constraints and unique_constraints require Neo4j/Aura Enterprise Edition, and do not have any effect when run against a Neo4j Community Edition installation.
Configuration
{
...
"active": true,
"source_transformations": {
"enable_grouping": true
},
"depends_on": ["<dependencyTargetName1>", "<dependencyTargetName2>", ...]
}
- active (bool): Whether the target should be included in the import (default: true).
- source_transformations (object): If enable_grouping is set to true, the import will apply an SQL GROUP BY clause on all fields specified in key_constraints and properties. If set to false, any duplicate data in the source will be pushed into Neo4j, potentially raising constraint errors or making insertion less efficient. The object can also contain aggregation functions and further fields, see Source transformations.
- depends_on (list of strings): The name of the target(s) that should execute before the current one.
Example: Person nodes
{
"source": "persons",
"name": "Persons",
"labels": [ "Person" ],
"properties": [
{
"source_field": "person_tmdbId",
"target_property": "id",
"target_property_type": "string"
},
{
"source_field": "name",
"target_property": "name",
"target_property_type": "string"
},
{
"source_field": "bornIn",
"target_property": "bornLocation",
"target_property_type": "string"
},
{
"source_field": "born",
"target_property": "bornDate",
"target_property_type": "local_date"
},
{
"source_field": "died",
"target_property": "diedDate",
"target_property_type": "local_date"
}
],
"schema": {
"key_constraints": [
{
"name": "personIdKey",
"label": "Person",
"properties": ["id"]
}
],
"unique_constraints": [
{
"name": "personNameUnique",
"label": "Person",
"properties": ["name"]
}
]
}
}
Relationship objects
Relationship entities must be grouped in a list keyed relationships inside the targets object.
"targets": {
...
"relationships": [
{ <relationshipSpec1> },
{ <relationshipSpec2> },
...
]
}
Compulsory fields
Each relationship object must at minimum have attributes source, name, and type.
It must also contain information about which node targets the relationship links together. You provide this through start_node_reference and end_node_reference.
{
"source": "<sourceName>",
"name": "<targetName>",
"type": "<relationshipType>",
"start_node_reference": "<nodeTargetName>",
"end_node_reference": "<nodeTargetName>",
"node_match_mode": "<match/merge>",
"write_mode": "<create/merge>"
}
- source (string): The name of the source this target should draw data from. Should match one of the names from the sources objects.
- name (string): A human-friendly name for the target (unique among all names).
- type (string): Type to assign to the relationship.
- start_node_reference (string): The name of the node target that acts as start for the relationship.
- end_node_reference (string): The name of the node target that acts as end for the relationship.
- node_match_mode (string): What Cypher clause to use to fetch the start/end nodes ahead of creating a relationship between them. Valid values are match or merge, respectively resulting in the Cypher clauses MATCH and MERGE.
- write_mode (string): The creation mode in Neo4j. Either create or merge. See CREATE and MERGE for info on the behavior of the Cypher clauses.
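Because targets refer to sources and to node targets by name, a mistyped name is an easy mistake to make. The Python sketch below cross-checks those references in a parsed job specification. It is a hedged illustration of the naming rules above; check_references is a hypothetical helper and the messages are made up.

```python
def check_references(spec: dict) -> list[str]:
    """Report targets that point at unknown sources or node targets."""
    source_names = {source["name"] for source in spec.get("sources", [])}
    targets = spec.get("targets", {})
    node_names = {node["name"] for node in targets.get("nodes", [])}
    problems = []
    # Every target's "source" must match the name of a declared source.
    for kind in ("nodes", "relationships", "queries"):
        for target in targets.get(kind, []):
            if target["source"] not in source_names:
                problems.append(f"{target['name']}: unknown source '{target['source']}'")
    # Relationship endpoints must match the name of a node target.
    for rel in targets.get("relationships", []):
        for key in ("start_node_reference", "end_node_reference"):
            if rel[key] not in node_names:
                problems.append(f"{rel['name']}: {key} '{rel[key]}' is not a node target")
    return problems
```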
Properties
Relationships may also map source columns as properties.
{
...
"properties": [
{
"source_field": "<bigQueryColumnName>",
"target_property": "<neo4jPropertyName>",
"target_property_type": "<neo4jPropertyType>"
},
{ <propertyObj2> },
...
]
}
- properties (list of objects): Mapping between source columns and relationship properties. Valid values for target_property_type are: boolean, byte_array (assumes base64 encoding), date, duration, float, integer, local_date, local_datetime, local_time, point, string, zoned_datetime, zoned_time.
Schema definition
You may create indexes and constraints on the imported relationships through the schema object.
The schema setup is equivalent to manually running the relevant CREATE INDEX/CONSTRAINT commands, except they are run automatically ahead of import for each relationship type.
If the global config index_all_properties is set to true, all properties will be indexed with range indexes.
{
...
"schema": {
"enable_type_constraints": true,
"key_constraints": [
{
"name": "<constraintName>",
"type": "<relationshipType>",
"properties": ["<neo4jPropertyName1>", "<neo4jPropertyName2>", ...],
"options": {}
}
],
"unique_constraints": [
{
"name": "<constraintName>",
"type": "<relationshipType>",
"properties": ["<neo4jPropertyName1>", "<neo4jPropertyName2>", ...],
"options": {}
}
],
"existence_constraints": [
{
"name": "<constraintName>",
"type": "<relationshipType>",
"property": "<neo4jPropertyName>"
}
],
"range_indexes": [
{
"name": "<indexName>",
"type": "<relationshipType>",
"properties": ["<neo4jPropertyName1>", "<neo4jPropertyName2>", ...]
}
],
"text_indexes": [
{
"name": "<indexName>",
"type": "<relationshipType>",
"property": "<neo4jPropertyName>",
"options": {}
}
],
"point_indexes": [
{
"name": "<indexName>",
"type": "<relationshipType>",
"property": "<neo4jPropertyName>",
"options": {}
}
],
"fulltext_indexes": [
{
"name": "<indexName>",
"types": ["<relationshipType1>", "<relationshipType2>", ...],
"properties": ["<neo4jPropertyName1>", "<neo4jPropertyName2>", ...],
"options": {}
}
],
"vector_indexes": [
{
"name": "<indexName>",
"type": "<relationshipType>",
"property": "<neo4jPropertyName>",
"options": {}
}
]
}
}
Where the attributes for each object are:
- name (string): The name of the index or constraint to be created in Neo4j.
- type (string): The relationship type on which the index or constraint should be enforced.
- property (string) or properties (list of strings): The property or properties on which the index or constraint should be enforced.
- options (object): The options with which the index or constraint should be created (refer to the individual pages for each index and constraint type). Optional wherever it appears, except for vector indexes, where it is mandatory.
Source data must not have null values for key_constraints columns, or they will clash with the relationship key constraint.
If the source is not clean in this respect, consider cleaning it upfront in the related source.query field by excluding all rows that wouldn’t fulfill the constraints (ex. WHERE person_tmdbId IS NOT NULL).
Alternatively, use the where attribute in a source transformation.
The options key_constraints and unique_constraints require Neo4j/Aura Enterprise Edition, and do not have any effect when run against a Neo4j Community Edition installation.
Configuration
{
...
"active": true,
"source_transformations": {
"enable_grouping": true
},
"depends_on": ["<dependencyTargetName1>", "<dependencyTargetName2>", ...]
}
- active (bool): Whether the target should be included in the import.
- source_transformations (object): If enable_grouping is set to true, the import will apply an SQL GROUP BY clause on all fields specified in key_constraints and properties. If set to false, any duplicate data in the source will be pushed into Neo4j, potentially raising constraint errors or making insertion less efficient. The object can also contain aggregation functions and further fields, see Source transformations.
- depends_on (list of strings): The name of the target(s) that should execute before the current one.
Example: ACTED_IN relationships
{
"source": "acted_in",
"name": "Acted_in",
"type": "ACTED_IN",
"write_mode": "merge",
"node_match_mode": "match",
"start_node_reference": "Persons",
"end_node_reference": "Movies",
"properties": [
{
"source_field": "role",
"target_property": "role",
"target_property_type": "string"
}
]
}
Custom query targets
Custom query targets are useful when the import requires a complex query that does not easily fit into the node/relationship targets format.
Query targets receive batches of rows through the variable $rows.
Custom queries must be grouped in a list keyed queries inside the targets object.
"targets": {
...
"queries": [
{ <querySpec1> },
{ <querySpec2> },
...
]
}
Do not use custom queries to run Cypher that does not directly depend on a source; use actions instead.
One-off queries, especially if not idempotent, are not fit for custom query targets.
This is because queries from targets are run in batches, so a custom query may run several times, depending on the number of $rows batches extracted from the source.
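The effect of batching can be illustrated with a short Python sketch. It assumes one query execution per batch and a batch size of 1000 (the query_target_batch_size value shown in the config listing), and it ignores parallelism. Both function names are hypothetical.

```python
import math


def query_executions(row_count: int, batch_size: int = 1000) -> int:
    """Number of times a query target would run for row_count source rows."""
    return math.ceil(row_count / batch_size)


def batches(rows: list, batch_size: int):
    """Yield successive slices of the source rows, one per $rows batch."""
    for start in range(0, len(rows), batch_size):
        yield rows[start:start + batch_size]


# Under these assumptions, a 2500-row source triggers the query three times.
print(query_executions(2500))  # prints 3
```

A non-idempotent statement such as a bare CREATE would therefore run once per batch, which is why one-off Cypher belongs in actions.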
Compulsory fields
Each query target must at minimum have attributes source, name, and query.
{
"source": "<sourceName>",
"name": "<targetName>",
"query": "<cypherQuery>"
}
- source (string): The name of the source this target should draw data from. Should match one of the names from the sources objects.
- name (string): A human-friendly name for the target (unique among all names).
- query (string): A Cypher query. Data from the source is available as a list in the parameter $rows.
Configuration
{
...
"active": true,
"depends_on": ["<dependencyTargetName1>", "<dependencyTargetName2>", ...]
}
- active (bool): Whether the target should be included in the import.
- depends_on (list of strings): The name of the target(s) that should execute before the current one.
Example: Person nodes, setting a date on creation
{
"name": "Person nodes",
"source": "persons",
"query": "UNWIND $rows AS row WITH row WHERE row.person_tmdbId IS NOT NULL MERGE (p:Person {id: row.person_tmdbId, name: row.name, born_in: row.bornIn, born: date(row.born), died: date(row.died)}) ON CREATE SET p.created_time=datetime()"
}
Source transformations
Each node and relationship target can optionally have a source_transformations attribute containing aggregation functions. This can be useful to extract higher-level dimensions from a more granular source. Aggregations result in extra fields that become available for property mappings.
"source_transformations": {
"enable_grouping": true,
"aggregations": [ {
"expression": "",
"field_name": ""
},
{ aggregationObj2 }, ...
],
"limit": -1,
"where": "",
"order_by": [
{
"expression": "column_name",
"order": "<asc/desc>"
},
{ orderObj2 }, ...
]
}
- enable_grouping (bool): Must be true for aggregations/where to work.
- aggregations (list of objects): Aggregations are specified as SQL expressions in the expression attribute, and the result is available as a source column under the name specified in field_name.
- limit (int): Caps the number of source rows that are considered for import (defaults to no limit, encoded as -1).
- where (string): Filters source data prior to import (in SQL WHERE clause format).
- order_by (list of objects): Enforces ordering on the source.
Example
{
"enable_grouping": true,
"aggregations": [
{
"expression": "SUM(unit_price*quantity)",
"field_name": "total_amount_sold"
},
{
"expression": "SUM(quantity)",
"field_name": "total_quantity_sold"
}
],
"limit": 50,
"where": "sourceId IS NOT NULL"
}
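To give a feel for what the example above computes, the Python sketch below applies the same filter, grouping, and sums to a handful of rows. Everything about it is an assumption made for illustration: the sample rows are invented, sourceId is taken to be the only grouped field, and applying the limit after grouping is a guess. The importer's real behavior is defined by the SQL it generates.

```python
from collections import defaultdict

# Hypothetical source rows; the values are made up for illustration.
rows = [
    {"sourceId": "a", "unit_price": 2.0, "quantity": 3},
    {"sourceId": "a", "unit_price": 1.5, "quantity": 2},
    {"sourceId": "b", "unit_price": 4.0, "quantity": 1},
    {"sourceId": None, "unit_price": 9.0, "quantity": 9},
]


def aggregate(rows, group_field="sourceId", limit=50):
    """Mimic the where filter, the grouping, the two SUMs, and the limit."""
    totals = defaultdict(lambda: {"total_amount_sold": 0.0, "total_quantity_sold": 0})
    for row in rows:
        if row[group_field] is None:  # where: sourceId IS NOT NULL
            continue
        group = totals[row[group_field]]
        group["total_amount_sold"] += row["unit_price"] * row["quantity"]
        group["total_quantity_sold"] += row["quantity"]
    result = [{group_field: key, **sums} for key, sums in totals.items()]
    return result[:limit]
```

The two resulting fields, total_amount_sold and total_quantity_sold, are the ones a target could then reference as source_field in its property mappings.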
Actions
The actions section contains commands that can be run before or after specific steps of the import process.
Each step is called a stage.
You may for example submit HTTP requests when steps complete, execute SQL queries on the source, or run Cypher statements on the Neo4j target instance.
...
"actions": [
{ <actionSpec1> },
{ <actionSpec2> },
...
]
Each action object must at minimum have the attributes name, type, and stage.
Further attributes depend on the action type (http, cypher, or bigquery).
{
"type": "http",
"name": "<actionName>",
"stage": "<stageName>",
"method": "<get/post>",
"url": "<targetUrl>",
"headers": {}
}
- type (string): The action type.
- name (string): A human-friendly name for the action (unique among all names).
- stage (string): At what point of the import the action should run. Valid values are: start, post_sources, pre_nodes, post_nodes, pre_relationships, post_relationships, pre_queries, post_queries, end.
- method (string): The HTTP method; either get or post.
- url (string): The URL the HTTP request should target.
- headers (object, optional): Request headers.
Example: send a GET request after import completes
{
"type": "http",
"name": "Post load ping",
"stage": "end",
"method": "get",
"url": "https://neo4j.com/success",
"headers": {
"secret": "314159",
"moreSecret": "17320"
}
}
{
"type": "cypher",
"name": "<actionName>",
"stage": "<stageName>",
"query": "<cypherQuery>",
"execution_mode": "<transaction/autocommit>"
}
- type (string): The action type.
- name (string): A human-friendly name for the action (unique among all names).
- stage (string): At what point of the import the action should run. Valid values are: start, post_sources, pre_nodes, post_nodes, pre_relationships, post_relationships, pre_queries, post_queries, end.
- query (string): The Cypher query to run.
- execution_mode (string, optional): Under what mode the query should be executed. Valid values are transaction, autocommit (default: transaction).
Example: create an importJob node after import completes
{
"type": "cypher",
"name": "Post load log",
"stage": "end",
"query": "MERGE (:importJob {date: datetime()})"
}
{
"type": "bigquery",
"name": "<actionName>",
"stage": "<stageName>",
"sql": "<sqlQuery>"
}
- type (string): The action type.
- name (string): A human-friendly name for the action (unique among all names).
- stage (string): At what point of the import the action should run. Valid values are: start, post_sources, pre_nodes, post_nodes, pre_relationships, post_relationships, pre_queries, post_queries, end.
- sql (string): The SQL query to run.
Example: run an SQL query after import completes
{
"type": "bigquery",
"name": "Post load log",
"stage": "end",
"sql": "INSERT INTO logs.imports (time) VALUES (NOW())"
}
Variables
Key-values can be supplied in Dataflow to replace $-prefixed tokens.
You can provide parameters in the Options JSON field when creating the Dataflow job, as a JSON object.
Variable interpolation works in:
- BigQuery source query (SQL)
- Text source URL
- Custom Cypher target query
- BigQuery action SQL
- Cypher action query
- HTTP GET/POST URL and header values
Variables must be prefixed by the $ symbol (ex. $limit), and may be used in job specification files and in the readQuery or inputFilePattern (source URI) command-line parameters.
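The idea of token replacement can be sketched in Python with the standard library's string.Template, which also uses $-prefixed names. This is only an analogy for illustration: the source does not say how the Dataflow template treats tokens without a supplied value (such as the $rows query parameter), so leaving them untouched here is an assumption. The bucket and limit variable names are hypothetical.

```python
from string import Template


def interpolate(text: str, variables: dict) -> str:
    """Replace $name tokens that have a value; leave the others untouched."""
    return Template(text).safe_substitute(variables)


# Hypothetical variables, as they might be supplied in the Options JSON field.
options = {"bucket": "neo4j-examples", "limit": 10}

url = interpolate("gs://$bucket/persons.csv", options)
query = interpolate("UNWIND $rows AS row RETURN row LIMIT $limit", options)
print(url)    # prints gs://neo4j-examples/persons.csv
print(query)  # prints UNWIND $rows AS row RETURN row LIMIT 10
```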