package org.neo4j.example;
import org.neo4j.driver.*;
import org.neo4j.driver.Record;
import picocli.CommandLine;
import picocli.CommandLine.Command;
import java.io.Closeable;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import static picocli.CommandLine.*;
class CDCService implements Closeable {
private final Driver driver;
private final String database;
private final AtomicReference<String> cursor = new AtomicReference<>(null);
private final ScheduledThreadPoolExecutor scheduler;
private List<Map<String, Object>> selectors = List.of();
public CDCService(String uri, String database, String user, String password) {
driver = GraphDatabase.driver(uri, AuthTokens.basic(user, password));
scheduler = new ScheduledThreadPoolExecutor(1);
this.database = database;
}
@Override
public void close(){
driver.close();
scheduler.shutdown();
}
public void setSelectors(List<Map<String, Object>> selectors) {
this.selectors = selectors;
}
private void applyChange(Record change) { (1)
var eventType = change.get("event").asMap().get("eventType");
var operation = change.get("event").asMap().get("operation");
System.out.println("Element of type '%s' changed with operation '%s'".formatted(eventType, operation));
System.out.println(change);
}
private synchronized void queryChanges() { (2)
try (var session = driver.session(SessionConfig.forDatabase(database))) {
var current = currentChangeID();
session.executeRead(tx -> {
var result = tx.run(new Query("CALL db.cdc.query($from, $selectors)", Map.of("from", cursor.get(), "selectors", selectors)));
if (!result.hasNext()) {
cursor.set(current); (3)
} else {
while (result.hasNext()) {
var change = result.next();
applyChange(change); (4)
cursor.set(change.get("id").asString()); (5)
}
}
return cursor.get();
});
} catch (Exception e) {
System.err.println(e.getMessage());
throw new RuntimeException("Error querying/processing changes.", e);
}
}
private String earliestChangeID() { (6)
try (var session = driver.session(SessionConfig.forDatabase(database))) {
return session.executeRead(tx -> {
var result = tx.run(new Query("CALL db.cdc.earliest"));
return result.single().get("id").asString();
});
}
}
private String currentChangeID() { (7)
try (var session = driver.session(SessionConfig.forDatabase(database))) {
return session.executeRead(tx -> {
var result = tx.run(new Query("CALL db.cdc.current"));
return result.single().get("id").asString();
});
}
}
public void start(String from) {
cursor.set(from == null ? currentChangeID() : from);
scheduler.scheduleWithFixedDelay(this::queryChanges, 0, 500, TimeUnit.MILLISECONDS); (8)
Runtime.getRuntime().addShutdownHook(new Thread(scheduler::shutdownNow));
}
public void awaitTermination(int timeout, TimeUnit unit) throws InterruptedException {
scheduler.awaitTermination(timeout, unit);
}
}
/**
 * Command-line entry point: builds a {@code CDCService} from the options below, starts polling
 * and blocks for up to one day while changes are printed.
 */
@Command(name = "Neo4j CDC example usage", mixinStandardHelpOptions = true, version = "1.0", description = "Connects to neo4j and queries for change events through cdc procedures.")
public class Example implements Callable<Integer> {
    @Option(names = {"-a", "--address"}, description = "Where to find the neo4j instance to connect to")
    private String address = "bolt://localhost:7687";

    @Option(names = {"-d", "--database"}, description = "Which database to access")
    private String database = "neo4j";

    @Option(names = {"-u", "--username"}, description = "Username for authenticating against the neo4j server")
    private String username = "neo4j";

    @Option(names = {"-p", "--password"}, description = "Password for authenticating against the neo4j server")
    private String password = "passw0rd";

    @Option(names = {"-f", "--from"}, description = "Cursor value to start streaming from")
    private String from = null;

    /**
     * Runs the example until the service terminates, the one-day wait elapses, or the thread is
     * interrupted.
     *
     * @return {@code 0} when the wait ends normally
     * @throws RuntimeException wrapping the {@link InterruptedException} if the wait is interrupted
     */
    @Override
    public Integer call() {
        // try-with-resources so the driver and scheduler are released on every exit path.
        try (var cdcService = new CDCService(address, database, username, password)) {
            // No selectors are configured here. To add one, list selector maps in the call,
            // e.g. Map.of("select", "n") (see the Neo4j CDC selector documentation for its meaning).
            List<Map<String, Object>> selectors = List.of();
            cdcService.setSelectors(selectors);
            cdcService.start(from);
            System.out.println("started querying changes...");
            try {
                cdcService.awaitTermination(1, TimeUnit.DAYS);
            } catch (InterruptedException e) {
                // Restore the interrupt flag before propagating so callers can still observe it.
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
            System.out.println("quitting...");
            System.out.flush();
            return 0;
        }
    }

    /** Parses the command line with picocli and exits with the code returned by {@link #call()}. */
    public static void main(String... args) {
        int exitCode = new CommandLine(new Example()).execute(args);
        System.exit(exitCode);
    }
}