Machine learning pipelines: Node classification

This Jupyter notebook is hosted in the Neo4j Graph Data Science Client GitHub repository.

The notebook shows the usage of GDS machine learning pipelines with the Python client and the well-known Cora dataset.

The task we cover here is a typical use case in graph machine learning: the classification of nodes given a graph and some node features.

1. Setup

We need a dedicated environment where Neo4j and GDS are available, for example a fresh AuraDS instance (which comes with GDS preinstalled) or Neo4j Desktop with a dedicated database.

Please note that we will be writing to and deleting data from Neo4j.

Once the credentials to access this environment are available, we can install the graphdatascience package and import the client class.

%pip install graphdatascience
import os
from graphdatascience import GraphDataScience

When using a local Neo4j setup, the default connection URI is bolt://localhost:7687; when using AuraDS, instead, the connection URI is slightly different as it uses the neo4j+s protocol. In this case, the client should also include the aura_ds=True flag to enable AuraDS-recommended settings. Check the Neo4j GDS Client docs for more details.

# Get Neo4j DB URI, credentials and name from environment if applicable
NEO4J_URI = os.environ.get("NEO4J_URI", "bolt://localhost:7687")
NEO4J_AUTH = None
NEO4J_DB = os.environ.get("NEO4J_DB", "neo4j")
if os.environ.get("NEO4J_USER") and os.environ.get("NEO4J_PASSWORD"):
    NEO4J_AUTH = (
        os.environ.get("NEO4J_USER"),
        os.environ.get("NEO4J_PASSWORD"),
    )
gds = GraphDataScience(NEO4J_URI, auth=NEO4J_AUTH, database=NEO4J_DB)

# On AuraDS:
#
# gds = GraphDataScience(NEO4J_URI, auth=NEO4J_AUTH, database=NEO4J_DB, aura_ds=True)

We also need to check that the version of the GDS library is 2.2.0 or newer, because we will use the concept of “context” introduced in GDS 2.2.0.

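# Compare major and minor numerically: as strings, "2.10.0" would sort before "2.2.0"
assert tuple(int(part) for part in gds.version().split(".")[:2]) >= (2, 2)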

Finally, we import json to help in writing the Cypher queries used to load the data, and numpy and pandas for further data processing.

import json

import numpy as np
import pandas as pd

2. Loading the Cora dataset

First of all, we need to load the Cora dataset into Neo4j. Recent versions of the GDS client include the Cora dataset as a ready-to-use graph (see for instance the PyG example notebook); alternatively, the graph construction notebook shows how to project the Cora graph in memory without writing it to Neo4j. In this tutorial, however, we use CSV files and some Cypher code to run an end-to-end example, from loading the source data into Neo4j to training a model and using it for predictions.

Please note that, if you use the Cora graph loader or the graph construction method on an AuraDS instance, you cannot write the data to the Neo4j database.

The CSV files can be found at the following URIs:

CORA_CONTENT = "https://data.neo4j.com/cora/cora.content"
CORA_CITES = "https://data.neo4j.com/cora/cora.cites"

Upon loading, we need an additional preprocessing step to convert the subject field (a string in the dataset) into an integer, because node properties must be numerical in order to be projected into a graph. Although we could assign consecutive IDs starting from 0, we give the first subject the ID 100 to later show how the class labels are represented in the model.

We also select a number of nodes to be held out to test the model after it has been trained. NOTE: This is not related to the train/test split ratio used by the pipeline.

SUBJECT_TO_ID = {
    "Neural_Networks": 100,
    "Rule_Learning": 1,
    "Reinforcement_Learning": 2,
    "Probabilistic_Methods": 3,
    "Theory": 4,
    "Genetic_Algorithms": 5,
    "Case_Based": 6,
}

HOLDOUT_NODES = 10

We can now load the CSV files using the LOAD CSV Cypher statement and some basic data transformation:

# Define a string representation of the SUBJECT_TO_ID map using backticks
subject_map = json.dumps(SUBJECT_TO_ID).replace('"', "`")

# Cypher command to load the nodes using `LOAD CSV`, taking care of
# converting the string `subject` field into an integer and
# replacing the node label for the holdout nodes
load_nodes = f"""
    LOAD CSV FROM "{CORA_CONTENT}" AS row
    WITH
      {subject_map} AS subject_to_id,
      toInteger(row[0]) AS extId,
      row[1] AS subject,
      toIntegerList(row[2..]) AS features
    MERGE (p:Paper {{extId: extId, subject: subject_to_id[subject], features: features}})
    WITH p LIMIT {HOLDOUT_NODES}
    REMOVE p:Paper
    SET p:UnclassifiedPaper
"""

# Cypher command to load the relationships using `LOAD CSV`
load_relationships = f"""
    LOAD CSV FROM "{CORA_CITES}" AS row
    MATCH (n), (m)
    WHERE n.extId = toInteger(row[0]) AND m.extId = toInteger(row[1])
    MERGE (n)-[:CITES]->(m)
"""

# Load nodes and relationships on Neo4j
gds.run_cypher(load_nodes)
gds.run_cypher(load_relationships)
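
To make the `subject_map` step concrete, the following sketch prints the kind of string that is spliced into the node-loading query. Cypher map keys are written as identifiers (optionally backtick-quoted) rather than as double-quoted strings, which is why the quotes emitted by `json.dumps` are replaced. The two-entry dictionary is a shortened stand-in for `SUBJECT_TO_ID`, used only for illustration.

```python
import json

# Shortened stand-in for SUBJECT_TO_ID, for illustration only
subject_to_id = {"Neural_Networks": 100, "Rule_Learning": 1}

# json.dumps emits double-quoted keys; swapping the quotes for backticks
# turns the JSON object into a Cypher map literal
subject_map = json.dumps(subject_to_id).replace('"', "`")
print(subject_map)  # {`Neural_Networks`: 100, `Rule_Learning`: 1}
```

Note that this replacement is only safe because neither the keys nor the values contain double quotes of their own.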

With the data loaded on Neo4j, we can now project a graph including all the nodes and the CITES relationship as undirected (and with SINGLE aggregation, to skip repeated relationships as a result of adding the inverse direction).

# Create the projected graph containing both classified and unclassified nodes
G, _ = gds.graph.project(
    "cora-graph",
    {"Paper": {"properties": ["features", "subject"]}, "UnclassifiedPaper": {"properties": ["features"]}},
    {"CITES": {"orientation": "UNDIRECTED", "aggregation": "SINGLE"}},
)

We can finally check the number of nodes and relationships in the newly-projected graph to make sure it has been created correctly:

assert G.node_count() == 2708
assert G.relationship_count() == 10556
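
The relationship count is easier to interpret with a toy example. The sketch below mimics, in plain Python, what an UNDIRECTED projection with SINGLE aggregation does to a list of directed citations: every relationship is also stored in the inverse direction, and duplicates are collapsed. It is an illustration under these assumptions, not GDS code, and the edge list is made up.

```python
# Made-up directed citations (source, target); papers 1 and 2 cite each other
cites = [(1, 2), (2, 1), (1, 3), (3, 4)]

# UNDIRECTED: add the inverse of every relationship.
# SINGLE: a set keeps at most one relationship per (source, target) pair.
projected = set()
for source, target in cites:
    projected.add((source, target))
    projected.add((target, source))

print(len(projected))  # 6: three distinct pairs, each counted in both directions
```

By the same logic, the 10556 relationships checked above would correspond to 5278 distinct pairs of connected papers.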

3. Pipeline catalog basics

Once the dataset has been loaded, we can define a node classification machine learning pipeline.

# Create the pipeline
node_pipeline, _ = gds.beta.pipeline.nodeClassification.create("cora-pipeline")

We can check that the pipeline has actually been created with the list method:

# List all pipelines
gds.beta.pipeline.list()

# Alternatively, get the details of a specific pipeline object
gds.beta.pipeline.list(node_pipeline)

4. Configuring the pipeline

We can now configure the pipeline. As a reminder, we need to:

  1. Select a subset of the available node properties to be used as features for the machine learning model

  2. Configure the train/test split and the number of folds for k-fold cross-validation (optional)

  3. Configure the candidate models for training

  4. Configure autotuning (optional)

In this example we use Logistic Regression as a candidate model for the training, but other algorithms (such as Random Forest) are available as well. We also set some reasonable starting parameters that can be further tuned according to the needed metrics.

Some hyperparameters such as penalty can be single values or ranges. If they are expressed as ranges, autotuning is used to search their best value.

The configureAutoTuning method can be used to set the number of model candidates to try. Here we choose 5 to keep the training time short.

# "Mark" some node properties that will be used as features
node_pipeline.selectFeatures(["features"])

# If needed, change the train/test split ratio and the number of folds
# for k-fold cross-validation
node_pipeline.configureSplit(testFraction=0.2, validationFolds=5)

# Add a model candidate to train
node_pipeline.addLogisticRegression(maxEpochs=200, penalty=(0.0, 0.5))

# Explicitly set the number of trials for autotuning (default = 10)
node_pipeline.configureAutoTuning(maxTrials=5)
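
As a rough illustration of what `configureSplit` controls, the sketch below holds out a test fraction and cuts the remaining training nodes into validation folds. It is a plain-Python approximation: the shuffling, rounding and any stratification performed by GDS may differ, and the function name `split_nodes` is hypothetical.

```python
import random


def split_nodes(node_ids, test_fraction, validation_folds, seed=42):
    """Split node IDs into a test set and k train/validation folds."""
    ids = list(node_ids)
    random.Random(seed).shuffle(ids)

    # Hold out the test set first; it is never used during model selection
    test_size = int(len(ids) * test_fraction)
    test, train = ids[:test_size], ids[test_size:]

    # Each fold uses one slice of the train set for validation
    # and the remaining slices for training
    folds = []
    for k in range(validation_folds):
        validation = train[k::validation_folds]
        training = [n for i, n in enumerate(train) if i % validation_folds != k]
        folds.append((training, validation))
    return test, train, folds


# 2698 = 2708 nodes in the graph minus the 10 held-out UnclassifiedPaper nodes
test, train, folds = split_nodes(range(2698), test_fraction=0.2, validation_folds=5)
print(len(test), len(train))  # 539 2159
print([len(validation) for _, validation in folds])  # [432, 432, 432, 432, 431]
```

Each candidate model is scored on the validation slices, and the test set is only used once to evaluate the winning model.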

5. Training the pipeline

The configured pipeline is now ready to select and train a model. We also run a training estimate, to make sure there are enough resources to run the actual training afterwards.

The Node Classification model supports several evaluation metrics. Here we use the global metric F1_WEIGHTED.

NOTE: The concurrency parameter is explicitly set to 4 (the default value) for demonstration purposes. The maximum concurrency in the library is limited to 4 for Neo4j Community Edition.

# Estimate the resources needed for training the model
node_pipeline.train_estimate(
    G,
    targetNodeLabels=["Paper"],
    modelName="cora-pipeline-model",
    targetProperty="subject",
    metrics=["F1_WEIGHTED"],
    randomSeed=42,
    concurrency=4,
)
# Perform the actual training
model, stats = node_pipeline.train(
    G,
    targetNodeLabels=["Paper"],
    modelName="cora-pipeline-model",
    targetProperty="subject",
    metrics=["F1_WEIGHTED"],
    randomSeed=42,
    concurrency=4,
)

We can inspect the result of the training, for example to print the evaluation metrics of the trained model.

# Uncomment to print all stats
# print(stats.to_json(indent=2))

# Print F1_WEIGHTED metric
stats["modelInfo"]["metrics"]["F1_WEIGHTED"]["test"]
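
For reference, a weighted F1 score is the average of the per-class F1 scores, each weighted by the number of true examples of that class. A minimal plain-Python sketch follows; the labels are made up, and the function illustrates the definition of the metric rather than the GDS implementation.

```python
from collections import Counter


def f1_weighted(y_true, y_pred):
    """Support-weighted average of the per-class F1 scores."""
    support = Counter(y_true)
    total = 0.0
    for cls, count in support.items():
        tp = sum(1 for t, p in zip(y_true, y_pred) if t == cls and p == cls)
        fp = sum(1 for t, p in zip(y_true, y_pred) if t != cls and p == cls)
        fn = sum(1 for t, p in zip(y_true, y_pred) if t == cls and p != cls)
        # F1 = 2TP / (2TP + FP + FN); the denominator is non-zero
        # because every class in `support` has at least one true example
        f1 = 2 * tp / (2 * tp + fp + fn)
        total += count * f1
    return total / len(y_true)


# Made-up true and predicted class IDs
y_true = [100, 100, 1, 1, 2]
y_pred = [100, 1, 1, 1, 2]
print(round(f1_weighted(y_true, y_pred), 4))  # 0.7867
```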

6. Using the model for prediction

After training, the model is ready to classify unclassified data.

One simple way to use the predict mode is to stream the result of the prediction. This can be impractical when a graph is very large, so it should only be used for experimentation.

predicted = model.predict_stream(
    G, modelName="cora-pipeline-model", includePredictedProbabilities=True, targetNodeLabels=["UnclassifiedPaper"]
)

The result of the prediction is a Pandas DataFrame containing the predicted class and the predicted probabilities for all the classes for each node.

predicted

The order of the classes in the predictedProbabilities field is given in the model information, and can be used to retrieve the predicted probability for the predicted class.

Please note that the order in which the classes appear in the predictedProbabilities field is not guaranteed, so each probability should be looked up via the index of its class in the model's class list rather than by an assumed position.

# List of class labels
classes = stats["modelInfo"]["classes"]
print("Class labels:", classes)

# Calculate the confidence percentage for the predicted class
predicted["confidence"] = predicted.apply(
    lambda row: np.floor(row["predictedProbabilities"][classes.index(row["predictedClass"])] * 100), axis=1
)

predicted
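
The lookup can be checked in isolation on a made-up prediction row. The class list below assumes that the model reports the seven subject IDs in ascending order, which should be verified against the class labels printed above.

```python
import math

# Assumed class order, to be checked against stats["modelInfo"]["classes"]
classes = [1, 2, 3, 4, 5, 6, 100]

# Made-up prediction for a single node
row = {
    "predictedClass": 100,
    "predictedProbabilities": [0.02, 0.03, 0.05, 0.04, 0.01, 0.10, 0.75],
}

# The class ID 100 is not a valid list position; its index in `classes` is
position = classes.index(row["predictedClass"])
confidence = math.floor(row["predictedProbabilities"][position] * 100)
print(position, confidence)  # 6 75
```

Indexing the probabilities directly with the class ID would raise an `IndexError` here, which is why the subject IDs were deliberately chosen not to coincide with list positions.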

7. Adding a data preprocessing step

The quality of the model can potentially be increased by adding more features or by using different features altogether. One way is to use algorithms such as FastRP that create embeddings based on both node properties and graph features, which can be added via the addNodeProperty pipeline method. Such properties are “transient”, in that they are automatically created and removed by the pipeline itself.

In this example we also use the contextNodeLabels parameter to explicitly set the types of nodes we calculate the embeddings for, and we include both the classified and the unclassified nodes. This is useful because the more nodes are used, the better the generated embeddings are. Although it may seem counterintuitive, unclassified nodes do not need to be completely unobserved during training (so, for instance, information on their neighbours can be retained). More information can be found in graph ML publications such as the Graph Representation Learning Book.

node_pipeline_fastrp, _ = gds.beta.pipeline.nodeClassification.create("cora-pipeline-fastrp")

# Add a step in the pipeline that mutates the graph
node_pipeline_fastrp.addNodeProperty(
    "fastRP",
    mutateProperty="embedding",
    embeddingDimension=512,
    propertyRatio=1.0,
    randomSeed=42,
    featureProperties=["features"],
    contextNodeLabels=["Paper", "UnclassifiedPaper"],
)

# With the node embeddings available as features, we no longer use the original raw `features`.
node_pipeline_fastrp.selectFeatures(["embedding"])

# Configure the pipeline as before
node_pipeline_fastrp.configureSplit(testFraction=0.2, validationFolds=5)
node_pipeline_fastrp.addLogisticRegression(maxEpochs=200, penalty=(0.0, 0.5))
node_pipeline_fastrp.configureAutoTuning(maxTrials=5)
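
To build some intuition for what an embedding step of this kind computes, the sketch below projects each node's feature vector with a random matrix and then averages the result over the node's neighbours. It is a heavily simplified, single-iteration illustration in plain Python, not the FastRP algorithm as implemented in GDS; the function name, the toy graph and the choice of a {-1, 0, +1} projection matrix are all assumptions.

```python
import math
import random


def random_projection_embeddings(features, neighbours, dimension, seed=42):
    """Project node features to `dimension` values, then average over neighbours."""
    rng = random.Random(seed)
    n_features = len(next(iter(features.values())))

    # Random projection matrix with entries in {-1, 0, +1}
    projection = [[rng.choice((-1, 0, 1)) for _ in range(dimension)] for _ in range(n_features)]

    # Step 1: project each node's own feature vector
    projected = {
        node: [sum(x * projection[i][d] for i, x in enumerate(vector)) for d in range(dimension)]
        for node, vector in features.items()
    }

    # Step 2: mix in graph structure by averaging with the neighbours' projections,
    # then scale each embedding to unit length (all-zero vectors are left as they are)
    embeddings = {}
    for node, own in projected.items():
        vectors = [own] + [projected[m] for m in neighbours.get(node, [])]
        mean = [sum(values) / len(vectors) for values in zip(*vectors)]
        norm = math.sqrt(sum(v * v for v in mean)) or 1.0
        embeddings[node] = [v / norm for v in mean]
    return embeddings


# Toy graph: three papers with binary features, connected in a chain a - b - c
features = {"a": [1, 0, 1], "b": [0, 1, 1], "c": [1, 1, 0]}
neighbours = {"a": ["b"], "b": ["a", "c"], "c": ["b"]}
embeddings = random_projection_embeddings(features, neighbours, dimension=4)
print({node: len(vector) for node, vector in embeddings.items()})
```

Because every node's embedding depends on its neighbours, including the unclassified nodes in the context gives the classified nodes more neighbourhood information to draw on.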

The training then proceeds as in the previous section:

# Perform the actual training
model_fastrp, stats_fastrp = node_pipeline_fastrp.train(
    G,
    targetNodeLabels=["Paper"],
    modelName="cora-pipeline-model-fastrp",
    targetProperty="subject",
    metrics=["F1_WEIGHTED"],
    randomSeed=42,
    concurrency=4,
)

The F1_WEIGHTED metric is better with embeddings:

print(stats_fastrp["modelInfo"]["metrics"]["F1_WEIGHTED"]["test"])
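
To compare the two training runs programmatically rather than by eye, a small helper along the following lines could be used. The function names are hypothetical, and the two dictionaries are made-up stand-ins that only reproduce the nesting of `stats` and `stats_fastrp`; their values are not results from this notebook.

```python
def metric_on_test_set(stats, metric="F1_WEIGHTED"):
    """Extract the test score for one metric from a training result."""
    return stats["modelInfo"]["metrics"][metric]["test"]


def improvement(baseline_stats, new_stats, metric="F1_WEIGHTED"):
    """Absolute difference between the new and the baseline test scores."""
    return metric_on_test_set(new_stats, metric) - metric_on_test_set(baseline_stats, metric)


# Made-up stand-ins with the same nesting as `stats` and `stats_fastrp`
baseline = {"modelInfo": {"metrics": {"F1_WEIGHTED": {"test": 0.70}}}}
with_embeddings = {"modelInfo": {"metrics": {"F1_WEIGHTED": {"test": 0.85}}}}
print(round(improvement(baseline, with_embeddings), 2))  # 0.15
```

In the notebook, the call would be `improvement(stats, stats_fastrp)`.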

The classification using predict_stream can be run in the same way:

predicted_fastrp = model_fastrp.predict_stream(
    G,
    modelName="cora-pipeline-model-fastrp",
    includePredictedProbabilities=True,
    targetNodeLabels=["UnclassifiedPaper"],
)
print(len(predicted_fastrp))

Instead of streaming the results, the prediction can be run in mutate mode to be more performant, especially when the predicted values are used multiple times. The predicted values can then be retrieved with the gds.graph.nodeProperty.stream method, restricted to the UnclassifiedPaper label.

model_fastrp.predict_mutate(
    G,
    mutateProperty="predictedClass",
    modelName="cora-pipeline-model-fastrp",
    predictedProbabilityProperty="predictedProbabilities",
    targetNodeLabels=["UnclassifiedPaper"],
)

predicted_fastrp = gds.graph.nodeProperty.stream(G, "predictedClass", ["UnclassifiedPaper"])
predicted_fastrp

This is useful to compare the result of classification with the original subject value of the test nodes, which must be retrieved from the Neo4j database since it has been excluded from the projected graph.

# Retrieve node information from Neo4j using the node IDs from the prediction result
nodes = gds.util.asNodes(predicted_fastrp.nodeId.to_list())

# Create a new DataFrame containing node IDs along with node properties
nodes_df = pd.DataFrame([(node.id, node["subject"]) for node in nodes], columns=["nodeId", "subject"])

# Merge with the prediction result on node IDs, to check the predicted value
# against the original subject
#
# NOTE: This could also be replaced by just appending `node["subject"]` as a
# Series since the node order would not change, but a proper merge (or join)
# is clearer and less prone to errors.
predicted_fastrp.merge(nodes_df, on="nodeId")
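
The comparison can also be summarised as a single number, and the numeric IDs translated back to subject names. The sketch below does both on made-up rows that stand in for the merged DataFrame; it assumes the streamed prediction sits in a column named `propertyValue`, which should be checked against the actual output.

```python
# Same mapping as SUBJECT_TO_ID above, inverted to translate IDs back to names
SUBJECT_TO_ID = {
    "Neural_Networks": 100,
    "Rule_Learning": 1,
    "Reinforcement_Learning": 2,
    "Probabilistic_Methods": 3,
    "Theory": 4,
    "Genetic_Algorithms": 5,
    "Case_Based": 6,
}
ID_TO_SUBJECT = {subject_id: name for name, subject_id in SUBJECT_TO_ID.items()}

# Made-up rows standing in for the merged prediction result
rows = [
    {"nodeId": 0, "propertyValue": 100, "subject": 100},
    {"nodeId": 1, "propertyValue": 1, "subject": 1},
    {"nodeId": 2, "propertyValue": 5, "subject": 2},
]

# Count the rows where the predicted class matches the original subject
correct = sum(1 for row in rows if row["propertyValue"] == row["subject"])
print(f"{correct}/{len(rows)} correct")  # 2/3 correct

# Print predicted and original subject names side by side
for row in rows:
    print(row["nodeId"], ID_TO_SUBJECT[row["propertyValue"]], ID_TO_SUBJECT[row["subject"]])
```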

As we can see, the prediction for all the test nodes is accurate.

8. Writing result back to Neo4j

With the predicted classes stored in the projected graph, we can now write them back to the Neo4j database.

Please note that this step is not applicable if you are running this notebook on AuraDS.

gds.graph.nodeProperties.write(
    G,
    node_properties=["predictedClass"],
    node_labels=["UnclassifiedPaper"],
)

9. Cleanup

When the graph, the model and the pipeline are no longer needed, they should be dropped to free up memory. This only needs to be done if the Neo4j or AuraDS instance is not restarted, since a restart would clean up all the in-memory content anyway.

model.drop()
model_fastrp.drop()
node_pipeline.drop()
node_pipeline_fastrp.drop()

G.drop()

The Neo4j database, on the other hand, needs to be cleaned up explicitly if the data is no longer needed:

gds.run_cypher("MATCH (n) WHERE n:Paper OR n:UnclassifiedPaper DETACH DELETE n")

It is good practice to close the client as well:

gds.close()