using System.CommandLine;
using System.Text.Json;
using Neo4j.Driver;
#pragma warning disable CS4014
namespace Neo4j.CDC.Sample;
static class Program
{
/// <summary>
/// Polls the <c>db.cdc.query</c> procedure on a background task, prints every
/// returned record as indented JSON, and advances an in-memory cursor
/// (<c>_from</c>) to the <c>id</c> of the last record it has seen.
/// </summary>
class CDCSample
{
    // One shared instance: System.Text.Json caches serialization metadata per
    // JsonSerializerOptions instance, so building new options for every record
    // throws that cache away each time.
    private static readonly JsonSerializerOptions JsonOptions = new() { WriteIndented = true };

    // Pause between two consecutive polls.
    private static readonly TimeSpan PollInterval = TimeSpan.FromMilliseconds(500);

    private readonly IDriver _driver;
    private readonly string? _database;

    // Cursor: the change identifier the next poll starts from. Written by the
    // polling task and read again on the next iteration.
    private volatile string? _from;

    private readonly IEnumerable<object> _selectors;

    // Set once the polling loop has finished; WaitForExit blocks on it.
    private readonly ManualResetEventSlim _event;

    /// <summary>Creates a sample bound to <paramref name="driver"/>.</summary>
    /// <param name="driver">Driver used for every query; must not be null.</param>
    /// <param name="database">Database to target; null or empty leaves the driver's default in effect.</param>
    /// <param name="from">Change identifier to start from; null or empty makes <see cref="Start"/> fetch the current one.</param>
    /// <param name="selectors">Selectors passed to <c>db.cdc.query</c>; null is treated as an empty sequence.</param>
    /// <exception cref="ArgumentNullException"><paramref name="driver"/> is null.</exception>
    public CDCSample(IDriver driver, string? database, string? from, IEnumerable<object>? selectors)
    {
        _driver = driver ?? throw new ArgumentNullException(nameof(driver));
        _database = database;
        _from = from;
        _selectors = selectors ?? Enumerable.Empty<object>();
        _event = new ManualResetEventSlim();
    }

    /// <summary>
    /// Handles a single change record. This sample only prints the record's
    /// values as indented JSON to standard output.
    /// </summary>
    private static void ApplyChange(IRecord record)
    {
        var jsonText = JsonSerializer.Serialize(record.Values, JsonOptions);
        Console.WriteLine(jsonText);
    }

    /// <summary>
    /// Runs one poll: queries changes starting at the cursor, applies each of
    /// them and moves the cursor forward.
    /// </summary>
    /// <param name="cancellation">Stops the enumeration of the result stream.</param>
    private async Task QueryChanges(CancellationToken cancellation)
    {
        await using var session = _driver.AsyncSession(ConfigureSession(_database));

        // Read before the query runs, so that an empty poll can still move the
        // cursor to an identifier that was obtained no later than the query.
        var current = await CurrentChangeId();

        await session.ExecuteReadAsync(async tx =>
        {
            // Re-read the cursor inside the delegate: if the delegate is run
            // again it resumes after the last record already applied.
            var from = _from;
            var result = await tx.RunAsync("CALL db.cdc.query($from, $selectors)", new
            {
                from,
                selectors = _selectors,
            });

            var processed = 0;
            await foreach (var record in result.WithCancellation(cancellation))
            {
                // Called once per change record.
                ApplyChange(record);

                // Move the cursor forward to the record just applied.
                _from = record["id"].As<string>();
                processed++;
            }

            if (processed == 0)
            {
                // Nothing was returned: advance the cursor to the identifier
                // captured before the query instead of keeping the old one.
                _from = current;
            }
        });
    }

    /// <summary>
    /// Builds a session configuration callback that selects
    /// <paramref name="database"/> only when it is non-empty.
    /// </summary>
    private static Action<SessionConfigBuilder> ConfigureSession(string? database)
    {
        return sc =>
        {
            if (!string.IsNullOrEmpty(database))
            {
                sc.WithDatabase(database);
            }
        };
    }

    /// <summary>Returns the <c>id</c> yielded by <c>CALL db.cdc.earliest</c>.</summary>
    private Task<string> EarliestChangeId() => FetchChangeId("CALL db.cdc.earliest");

    /// <summary>Returns the <c>id</c> yielded by <c>CALL db.cdc.current</c>.</summary>
    private Task<string> CurrentChangeId() => FetchChangeId("CALL db.cdc.current");

    /// <summary>
    /// Executes <paramref name="procedureCall"/> and returns the <c>id</c>
    /// column of its first row.
    /// </summary>
    private async Task<string> FetchChangeId(string procedureCall)
    {
        var query = _driver.ExecutableQuery(procedureCall);

        // Target the same database as QueryChanges; without this the identifier
        // would come from the driver's default database even when another one
        // was requested.
        if (!string.IsNullOrEmpty(_database))
        {
            query = query.WithConfig(new QueryConfig(database: _database));
        }

        var response = await query
            .WithMap(record => record["id"].As<string>())
            .ExecuteAsync();
        return response.Result[0];
    }

    /// <summary>
    /// Initialises the cursor when none was supplied and starts the background
    /// polling loop. Returns as soon as the loop has been scheduled.
    /// </summary>
    /// <param name="cancellation">Cancelling this token ends the polling loop.</param>
    public async Task Start(CancellationToken cancellation)
    {
        if (string.IsNullOrEmpty(_from))
        {
            _from = await CurrentChangeId();
        }

        _event.Reset();

        // The token is deliberately NOT passed to Task.Run: a token that is
        // already cancelled would prevent the delegate from ever running, the
        // finally block would never set _event, and WaitForExit would hang.
        _ = Task.Run(() => PollChanges(cancellation));
    }

    /// <summary>
    /// Polls until <paramref name="cancellation"/> is signalled or a poll
    /// fails, then signals <c>_event</c>.
    /// </summary>
    private async Task PollChanges(CancellationToken cancellation)
    {
        try
        {
            while (!cancellation.IsCancellationRequested)
            {
                await QueryChanges(cancellation);

                // Wait before the next poll.
                await Task.Delay(PollInterval, cancellation);
            }
        }
        catch (OperationCanceledException) when (cancellation.IsCancellationRequested)
        {
            // Requested shutdown; nothing to report.
        }
        catch (Exception e)
        {
            // This task is not awaited by anyone, so report the failure here
            // instead of letting it disappear.
            await Console.Error.WriteLineAsync("Error: " + e);
        }
        finally
        {
            _event.Set();
        }
    }

    /// <summary>Blocks the calling thread until the polling loop has ended.</summary>
    public void WaitForExit()
    {
        _event.Wait();
    }
}
/// <summary>
/// Entry point: declares the command-line options, wires the root command to
/// <c>DoRootCommand</c> and invokes it.
/// </summary>
/// <param name="args">Raw command-line arguments.</param>
/// <returns>The value returned by <c>cmd.InvokeAsync</c>.</returns>
static async Task<int> Main(string[] args)
{
    var uriOption = new Option<string>("--address", () => "bolt://localhost:7687", "Bolt URI");
    uriOption.AddAlias("-a");

    var databaseOption = new Option<string?>("--database", () => "", "Database");
    databaseOption.AddAlias("-d");

    var usernameOption = new Option<string>("--username", () => "neo4j", "Username");
    usernameOption.AddAlias("-u");

    // NOTE(review): the default password is a sample-only convenience; pass
    // --password explicitly for anything that is not a local test instance.
    var passwordOption = new Option<string>("--password", () => "passw0rd", "Password");
    passwordOption.AddAlias("-p");

    var fromOption = new Option<string?>("--from", () => null, "Change identifier to query changes from");
    fromOption.AddAlias("-f");

    var cmd = new RootCommand("Sample CDC application");
    cmd.AddOption(uriOption);
    cmd.AddOption(databaseOption);
    cmd.AddOption(usernameOption);
    cmd.AddOption(passwordOption);
    cmd.AddOption(fromOption);

    // Asynchronous handler: awaiting DoRootCommand (instead of blocking on it
    // with Wait) lets it run to completion after cancellation, so the driver
    // is disposed and the closing message is printed before the process ends.
    cmd.SetHandler(async ctx =>
    {
        var cancellation = ctx.GetCancellationToken();
        var uri = ctx.ParseResult.GetValueForOption(uriOption);
        var database = ctx.ParseResult.GetValueForOption(databaseOption);
        var username = ctx.ParseResult.GetValueForOption(usernameOption);
        var password = ctx.ParseResult.GetValueForOption(passwordOption);
        var from = ctx.ParseResult.GetValueForOption(fromOption);

        // uri, username and password all have non-null defaults declared above.
        await DoRootCommand(cancellation, uri!, username!, password!, database, from);
    });

    return await cmd.InvokeAsync(args);
}
/// <summary>
/// Connects to the server, starts a <c>CDCSample</c> and blocks until it has
/// exited. Any exception is written to standard error rather than rethrown.
/// </summary>
/// <param name="cancellation">Token handed to <c>CDCSample.Start</c>.</param>
/// <param name="uri">Address passed to <c>GraphDatabase.Driver</c>.</param>
/// <param name="username">User name for basic authentication.</param>
/// <param name="password">Password for basic authentication.</param>
/// <param name="database">Database name forwarded to <c>CDCSample</c>.</param>
/// <param name="from">Starting change identifier forwarded to <c>CDCSample</c>.</param>
private static async Task DoRootCommand(CancellationToken cancellation, string uri, string username,
    string password,
    string? database, string? from)
{
    try
    {
        // No selectors are configured by default. To add one, put an entry
        // such as the following into the list:
        //
        //     new
        //     {
        //         select = "n", labels = new[] { "Person", "Employee" }
        //     },
        var selectors = new List<object>();

        var credentials = AuthTokens.Basic(username, password);
        await using var driver = GraphDatabase.Driver(uri, credentials);

        var sample = new CDCSample(driver, database, from, selectors);
        await sample.Start(cancellation);
        await Console.Out.WriteLineAsync("starting...");

        // Blocks this thread until the sample signals that it has stopped.
        sample.WaitForExit();
        await Console.Out.WriteLineAsync("quitting...");
    }
    catch (Exception failure)
    {
        await Console.Error.WriteLineAsync($"Error: {failure}");
    }
}
}