// lib6/session.js
/**
* Copyright (c) "Neo4j"
* Neo4j Sweden AB [https://neo4j.com]
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
// Down-level async/await helper: drives a generator function to completion,
// treating every yielded value as something to await, and returns a promise
// for the generator's return value. An already-installed helper on `this`
// takes precedence over this local definition.
var __awaiter = (this && this.__awaiter) || function (thisArg, _arguments, P, generator) {
    // Pass through values that are already promises of type P; wrap anything else.
    const toPromise = (candidate) => {
        if (candidate instanceof P) {
            return candidate;
        }
        return new P((settle) => {
            settle(candidate);
        });
    };
    // Fall back to the built-in Promise when no promise constructor was supplied.
    if (!P) {
        P = Promise;
    }
    return new P((resolve, reject) => {
        // Either finish with the generator's return value or wait on the yielded one.
        const advance = (iteration) => {
            if (iteration.done) {
                resolve(iteration.value);
            }
            else {
                toPromise(iteration.value).then(resumeWithValue, resumeWithError);
            }
        };
        // Feed a fulfilled value back into the generator.
        const resumeWithValue = (value) => {
            try {
                advance(generator.next(value));
            }
            catch (failure) {
                reject(failure);
            }
        };
        // Throw a rejection reason into the generator so its try/catch can see it.
        const resumeWithError = (reason) => {
            try {
                advance(generator["throw"](reason));
            }
            catch (failure) {
                reject(failure);
            }
        };
        // Instantiate the generator with the caller's `this`/arguments and start it.
        generator = generator.apply(thisArg, _arguments || []);
        advance(generator.next());
    });
};
/* eslint-disable @typescript-eslint/promise-function-async */
import { FailedObserver } from './internal/observers';
import { validateQueryAndParameters } from './internal/util';
import { FETCH_ALL, ACCESS_MODE_READ, ACCESS_MODE_WRITE, TELEMETRY_APIS } from './internal/constants';
import { newError } from './error';
import Result from './result';
import { ConnectionHolder } from './internal/connection-holder';
import { TransactionExecutor } from './internal/transaction-executor';
import { Bookmarks } from './internal/bookmarks';
import { TxConfig } from './internal/tx-config';
import TransactionPromise from './transaction-promise';
import ManagedTransaction from './transaction-managed';
/**
* A Session instance is used for handling the connection and
* sending queries through the connection.
* In a single session, multiple queries will be executed serially.
* In order to execute parallel queries, multiple sessions are required.
* @access public
*/
class Session {
    /**
     * @constructor
     * @protected
     * @param {Object} args
     * @param {string} args.mode the default access mode for this session.
     * @param {ConnectionProvider} args.connectionProvider - The connection provider to acquire connections from.
     * @param {Bookmarks} args.bookmarks - The initial bookmarks for this session.
     * @param {string} args.database the database name
     * @param {Object} args.config={} - This driver configuration.
     * @param {boolean} args.reactive - Whether this session should create reactive streams
     * @param {number} args.fetchSize - Defines how many records is pulled in each pulling batch
     * @param {string} args.impersonatedUser - The username which the user wants to impersonate for the duration of the session.
     * @param {Object} [args.bookmarkManager] - Optional bookmark manager. When present, its `getBookmarks()` is
     * consulted before work is sent and its `updateBookmarks()` is called when new bookmarks are received.
     * @param {AuthToken} args.auth - the target auth for the to-be-acquired connection
     * @param {NotificationFilter} args.notificationFilter - The notification filter used for this session.
     * @param {Object} args.log - Logger handed to both connection holders and to every TxConfig this session creates.
     */
    constructor({ mode, connectionProvider, bookmarks, database, config, reactive, fetchSize, impersonatedUser, bookmarkManager, notificationFilter, auth, log }) {
        this._mode = mode;
        this._database = database;
        this._reactive = reactive;
        this._fetchSize = fetchSize;
        // Bound once so the same callbacks can be shared by both connection holders.
        this._onDatabaseNameResolved = this._onDatabaseNameResolved.bind(this);
        this._getConnectionAcquistionBookmarks = this._getConnectionAcquistionBookmarks.bind(this);
        this._readConnectionHolder = new ConnectionHolder({
            mode: ACCESS_MODE_READ,
            auth,
            database,
            bookmarks,
            connectionProvider,
            impersonatedUser,
            onDatabaseNameResolved: this._onDatabaseNameResolved,
            getConnectionAcquistionBookmarks: this._getConnectionAcquistionBookmarks,
            log
        });
        this._writeConnectionHolder = new ConnectionHolder({
            mode: ACCESS_MODE_WRITE,
            auth,
            database,
            bookmarks,
            connectionProvider,
            impersonatedUser,
            onDatabaseNameResolved: this._onDatabaseNameResolved,
            getConnectionAcquistionBookmarks: this._getConnectionAcquistionBookmarks,
            log
        });
        this._open = true;
        this._hasTx = false;
        this._impersonatedUser = impersonatedUser;
        this._lastBookmarks = bookmarks !== null && bookmarks !== void 0 ? bookmarks : Bookmarks.empty();
        this._configuredBookmarks = this._lastBookmarks;
        this._transactionExecutor = _createTransactionExecutor(config);
        // Only an empty-string database name is treated as still needing resolution.
        this._databaseNameResolved = this._database !== '';
        const calculatedWatermaks = this._calculateWatermaks();
        this._lowRecordWatermark = calculatedWatermaks.low;
        this._highRecordWatermark = calculatedWatermaks.high;
        // Results of auto-commit queries; they are cancelled when the session is closed.
        this._results = [];
        this._bookmarkManager = bookmarkManager;
        this._notificationFilter = notificationFilter;
        this._log = log;
    }
    /**
     * Run Cypher query
     * Could be called with a query object i.e.: `{text: "MATCH ...", parameters: {param: 1}}`
     * or with the query and parameters as separate arguments.
     *
     * @public
     * @param {mixed} query - Cypher query to execute
     * @param {Object} parameters - Map with parameters to use in query
     * @param {TransactionConfig} [transactionConfig] - Configuration for the new auto-commit transaction.
     * @return {Result} New Result.
     */
    run(query, parameters, transactionConfig) {
        const { validatedQuery, params } = validateQueryAndParameters(query, parameters);
        const autoCommitTxConfig = (transactionConfig != null)
            ? new TxConfig(transactionConfig, this._log)
            : TxConfig.empty();
        const result = this._run(validatedQuery, params, (connection) => __awaiter(this, void 0, void 0, function* () {
            const bookmarks = yield this._bookmarks();
            // Re-check after the asynchronous bookmark lookup: the session may have been closed meanwhile.
            this._assertSessionIsOpen();
            return connection.run(validatedQuery, params, {
                bookmarks,
                txConfig: autoCommitTxConfig,
                mode: this._mode,
                database: this._database,
                apiTelemetryConfig: {
                    api: TELEMETRY_APIS.AUTO_COMMIT_TRANSACTION
                },
                impersonatedUser: this._impersonatedUser,
                afterComplete: (meta) => this._onCompleteCallback(meta, bookmarks),
                reactive: this._reactive,
                fetchSize: this._fetchSize,
                lowRecordWatermark: this._lowRecordWatermark,
                highRecordWatermark: this._highRecordWatermark,
                notificationFilter: this._notificationFilter
            });
        }));
        // Remembered so that close() can cancel it.
        this._results.push(result);
        return result;
    }
    /**
     * Acquire a connection, hand it to `customRunner` and wrap the outcome in a {@link Result}.
     *
     * A failure to obtain the connection or to start the query does not reject here; it is
     * converted into a FailedObserver that is given to the Result instead.
     *
     * @private
     * @param {mixed} query - The (already validated) query, passed through to the Result.
     * @param {Object} parameters - The query parameters, passed through to the Result.
     * @param {function(connection: Connection): Promise} customRunner - Function which runs the query on the connection.
     * @returns {Result} New Result.
     */
    _run(query, parameters, customRunner) {
        const { connectionHolder, resultPromise } = this._acquireAndConsumeConnection(customRunner);
        const observerPromise = resultPromise.catch(error => Promise.resolve(new FailedObserver({ error })));
        const watermarks = { high: this._highRecordWatermark, low: this._lowRecordWatermark };
        return new Result(observerPromise, query, parameters, connectionHolder, watermarks);
    }
    /**
     * This method is used by Rediscovery on the neo4j-driver-bolt-protocol package.
     *
     * The connection is released once the consumer's promise fulfils; a rejection is
     * propagated to the caller without releasing here.
     *
     * @private
     * @param {function()} connectionConsumer The method which will use the connection
     * @returns {Promise<T>} A connection promise
     */
    _acquireConnection(connectionConsumer) {
        const { connectionHolder, resultPromise } = this._acquireAndConsumeConnection(connectionConsumer);
        return resultPromise.then((result) => __awaiter(this, void 0, void 0, function* () {
            yield connectionHolder.releaseConnection();
            return result;
        }));
    }
    /**
     * Acquires a {@link Connection}, consume it and return a promise of the result along with
     * the {@link ConnectionHolder} used in the process.
     *
     * The returned promise rejects when the session is closed, or when the session has an
     * open transaction / the holder reports that no new connection was initialized.
     *
     * @private
     * @param connectionConsumer
     * @returns {object} The connection holder and connection promise.
     */
    _acquireAndConsumeConnection(connectionConsumer) {
        let resultPromise;
        const connectionHolder = this._connectionHolderWithMode(this._mode);
        if (!this._open) {
            resultPromise = Promise.reject(newError('Cannot run query in a closed session.'));
        }
        else if (!this._hasTx && connectionHolder.initializeConnection()) {
            resultPromise = connectionHolder
                .getConnection()
                // The connection is expected to be non-null at this point because
                // initializeConnection() returned a truthy value.
                // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
                .then(connection => connectionConsumer(connection));
        }
        else {
            resultPromise = Promise.reject(newError('Queries cannot be run directly on a ' +
                'session with an open transaction; either run from within the ' +
                'transaction or use a different session.'));
        }
        return { connectionHolder, resultPromise };
    }
    /**
     * Begin a new transaction in this session. A session can have at most one transaction running at a time, if you
     * want to run multiple concurrent transactions, you should use multiple concurrent sessions.
     *
     * While a transaction is open the session cannot be used to run queries outside the transaction.
     *
     * @param {TransactionConfig} [transactionConfig] - Configuration for the new transaction.
     * @returns {TransactionPromise} New Transaction.
     * @throws {Error} Synchronously, when the session is closed or already has an open transaction.
     */
    beginTransaction(transactionConfig) {
        // NOTE(review): an earlier comment here described backwards-compatible handling of a
        // bookmarks argument ({string|string[]}). No such type check exists in this method:
        // any non-null argument is interpreted as a transaction configuration.
        const arg = transactionConfig;
        let txConfig = TxConfig.empty();
        if (arg != null) {
            txConfig = new TxConfig(arg, this._log);
        }
        return this._beginTransaction(this._mode, txConfig, { api: TELEMETRY_APIS.UNMANAGED_TRANSACTION });
    }
    /**
     * Create and begin a transaction on the connection holder matching `accessMode`.
     *
     * @private
     * @param {string} accessMode - READ or WRITE; `null`/`undefined` falls back to WRITE.
     * @param {TxConfig} txConfig - Configuration of the transaction.
     * @param {Object} apiTelemetryConfig - Telemetry configuration forwarded to the transaction.
     * @returns {TransactionPromise} The new transaction.
     * @throws {Error} When the session is closed, already has an open transaction, or the mode is illegal.
     */
    _beginTransaction(accessMode, txConfig, apiTelemetryConfig) {
        if (!this._open) {
            throw newError('Cannot begin a transaction on a closed session.');
        }
        if (this._hasTx) {
            throw newError('You cannot begin a transaction on a session with an open transaction; ' +
                'either run from within the transaction or use a different session.');
        }
        const mode = Session._validateSessionMode(accessMode);
        const connectionHolder = this._connectionHolderWithMode(mode);
        connectionHolder.initializeConnection();
        // Cleared again by _transactionClosed(), which the transaction receives as onClose.
        this._hasTx = true;
        const tx = new TransactionPromise({
            connectionHolder,
            impersonatedUser: this._impersonatedUser,
            onClose: this._transactionClosed.bind(this),
            onBookmarks: (newBm, oldBm, db) => this._updateBookmarks(newBm, oldBm, db),
            onConnection: this._assertSessionIsOpen.bind(this),
            reactive: this._reactive,
            fetchSize: this._fetchSize,
            lowRecordWatermark: this._lowRecordWatermark,
            highRecordWatermark: this._highRecordWatermark,
            notificationFilter: this._notificationFilter,
            apiTelemetryConfig
        });
        tx._begin(() => this._bookmarks(), txConfig);
        return tx;
    }
    /**
     * Throw if this session has been closed.
     *
     * @private
     * @returns {void}
     * @throws {Error} When the session is no longer open.
     */
    _assertSessionIsOpen() {
        if (!this._open) {
            throw newError('You cannot run more transactions on a closed session.');
        }
    }
    /**
     * Mark that the session no longer has an open transaction.
     *
     * @private
     * @returns {void}
     */
    _transactionClosed() {
        this._hasTx = false;
    }
    /**
     * Return the bookmarks received following the last completed {@link Transaction}.
     *
     * @deprecated This method will be removed in version 6.0. Please, use Session#lastBookmarks instead.
     *
     * @return {string[]} A reference to a previous transaction.
     * @see {@link Session#lastBookmarks}
     */
    lastBookmark() {
        return this.lastBookmarks();
    }
    /**
     * Return the bookmarks received following the last completed {@link Transaction}.
     *
     * @return {string[]} A reference to a previous transaction.
     */
    lastBookmarks() {
        return this._lastBookmarks.values();
    }
    /**
     * Resolve the bookmarks to send with the next query or transaction.
     *
     * When there is no bookmark manager (or it yields `undefined`) the session's last bookmarks
     * are used; otherwise the manager's bookmarks are combined with the configured ones.
     *
     * @private
     * @returns {Promise<Bookmarks>} The bookmarks to use.
     */
    _bookmarks() {
        var _a;
        return __awaiter(this, void 0, void 0, function* () {
            const bookmarks = yield ((_a = this._bookmarkManager) === null || _a === void 0 ? void 0 : _a.getBookmarks());
            if (bookmarks === undefined) {
                return this._lastBookmarks;
            }
            return new Bookmarks([...bookmarks, ...this._configuredBookmarks]);
        });
    }
    /**
     * Execute given unit of work in a {@link READ} transaction.
     *
     * Transaction will automatically be committed unless the given function throws or returns a rejected promise.
     * Some failures of the given function or the commit itself will be retried with exponential backoff with initial
     * delay of 1 second and maximum retry time of 30 seconds. Maximum retry time is configurable via driver config's
     * `maxTransactionRetryTime` property in milliseconds.
     *
     * @deprecated This method will be removed in version 6.0. Please, use Session#executeRead instead.
     *
     * @param {function(tx: Transaction): Promise} transactionWork - Callback that executes operations against
     * a given {@link Transaction}.
     * @param {TransactionConfig} [transactionConfig] - Configuration for all transactions started to execute the unit of work.
     * @return {Promise} Resolved promise as returned by the given function or rejected promise when given
     * function or commit fails.
     * @see {@link Session#executeRead}
     */
    readTransaction(transactionWork, transactionConfig) {
        const config = new TxConfig(transactionConfig, this._log);
        return this._runTransaction(ACCESS_MODE_READ, config, transactionWork);
    }
    /**
     * Execute given unit of work in a {@link WRITE} transaction.
     *
     * Transaction will automatically be committed unless the given function throws or returns a rejected promise.
     * Some failures of the given function or the commit itself will be retried with exponential backoff with initial
     * delay of 1 second and maximum retry time of 30 seconds. Maximum retry time is configurable via driver config's
     * `maxTransactionRetryTime` property in milliseconds.
     *
     * @deprecated This method will be removed in version 6.0. Please, use Session#executeWrite instead.
     *
     * @param {function(tx: Transaction): Promise} transactionWork - Callback that executes operations against
     * a given {@link Transaction}.
     * @param {TransactionConfig} [transactionConfig] - Configuration for all transactions started to execute the unit of work.
     * @return {Promise} Resolved promise as returned by the given function or rejected promise when given
     * function or commit fails.
     * @see {@link Session#executeWrite}
     */
    writeTransaction(transactionWork, transactionConfig) {
        const config = new TxConfig(transactionConfig, this._log);
        return this._runTransaction(ACCESS_MODE_WRITE, config, transactionWork);
    }
    /**
     * Run `transactionWork` through the transaction executor, giving the work the
     * transaction object itself (no ManagedTransaction wrapper is applied).
     *
     * @private
     * @param {string} accessMode - READ or WRITE.
     * @param {TxConfig} transactionConfig - Configuration for every transaction begun by the executor.
     * @param {function(tx: Transaction): Promise} transactionWork - The unit of work.
     * @returns {Promise} Whatever the transaction executor returns for this work.
     */
    _runTransaction(accessMode, transactionConfig, transactionWork) {
        return this._transactionExecutor.execute((apiTelemetryConfig) => this._beginTransaction(accessMode, transactionConfig, apiTelemetryConfig), transactionWork);
    }
    /**
     * Execute given unit of work in a {@link READ} transaction.
     *
     * Transaction will automatically be committed unless the given function throws or returns a rejected promise.
     * Some failures of the given function or the commit itself will be retried with exponential backoff with initial
     * delay of 1 second and maximum retry time of 30 seconds. Maximum retry time is configurable via driver config's
     * `maxTransactionRetryTime` property in milliseconds.
     *
     * @param {function(tx: ManagedTransaction): Promise} transactionWork - Callback that executes operations against
     * a given {@link Transaction}.
     * @param {TransactionConfig} [transactionConfig] - Configuration for all transactions started to execute the unit of work.
     * @return {Promise} Resolved promise as returned by the given function or rejected promise when given
     * function or commit fails.
     */
    executeRead(transactionWork, transactionConfig) {
        const config = new TxConfig(transactionConfig, this._log);
        return this._executeInTransaction(ACCESS_MODE_READ, config, transactionWork);
    }
    /**
     * Execute given unit of work in a {@link WRITE} transaction.
     *
     * Transaction will automatically be committed unless the given function throws or returns a rejected promise.
     * Some failures of the given function or the commit itself will be retried with exponential backoff with initial
     * delay of 1 second and maximum retry time of 30 seconds. Maximum retry time is configurable via driver config's
     * `maxTransactionRetryTime` property in milliseconds.
     *
     * @param {function(tx: ManagedTransaction): Promise} transactionWork - Callback that executes operations against
     * a given {@link Transaction}.
     * @param {TransactionConfig} [transactionConfig] - Configuration for all transactions started to execute the unit of work.
     * @return {Promise} Resolved promise as returned by the given function or rejected promise when given
     * function or commit fails.
     */
    executeWrite(transactionWork, transactionConfig) {
        const config = new TxConfig(transactionConfig, this._log);
        return this._executeInTransaction(ACCESS_MODE_WRITE, config, transactionWork);
    }
    /**
     * Same as `_runTransaction`, but additionally passes `ManagedTransaction.fromTransaction`
     * to the executor as the transaction wrapper.
     *
     * @private
     * @param {SessionMode} accessMode
     * @param {TxConfig} transactionConfig
     * @param {ManagedTransactionWork} transactionWork
     * @returns {Promise}
     */
    _executeInTransaction(accessMode, transactionConfig, transactionWork) {
        return this._transactionExecutor.execute((apiTelemetryConfig) => this._beginTransaction(accessMode, transactionConfig, apiTelemetryConfig), transactionWork, ManagedTransaction.fromTransaction);
    }
    /**
     * Sets the resolved database name in the session context.
     *
     * Only the first resolution takes effect; later calls are ignored. A `null`/`undefined`
     * name is stored as the empty string.
     *
     * @private
     * @param {string|undefined} database The resolved database name
     * @returns {void}
     */
    _onDatabaseNameResolved(database) {
        if (!this._databaseNameResolved) {
            const normalizedDatabase = database !== null && database !== void 0 ? database : '';
            this._database = normalizedDatabase;
            this._readConnectionHolder.setDatabase(normalizedDatabase);
            this._writeConnectionHolder.setDatabase(normalizedDatabase);
            this._databaseNameResolved = true;
        }
    }
    /**
     * Resolve the bookmarks handed to the connection holders for connection acquisition.
     *
     * Mirrors `_bookmarks()`, except that the configured bookmarks are placed before the
     * bookmark manager's ones when both are combined.
     *
     * @private
     * @returns {Promise<Bookmarks>} The bookmarks to use while acquiring a connection.
     */
    _getConnectionAcquistionBookmarks() {
        var _a;
        return __awaiter(this, void 0, void 0, function* () {
            const bookmarks = yield ((_a = this._bookmarkManager) === null || _a === void 0 ? void 0 : _a.getBookmarks());
            if (bookmarks === undefined) {
                return this._lastBookmarks;
            }
            return new Bookmarks([...this._configuredBookmarks, ...bookmarks]);
        });
    }
    /**
     * Update value of the last bookmarks.
     *
     * Nothing happens when `newBookmarks` is null or empty. Otherwise the bookmark manager
     * (if any) is notified, the new bookmarks become the session's last bookmarks and the
     * initially configured bookmarks are dropped.
     *
     * @private
     * @param {Bookmarks} newBookmarks - The new bookmarks.
     * @param {Bookmarks} [previousBookmarks] - The bookmarks that were sent with the completed work.
     * @param {string} [database] - Accepted for the callback signature; not used by this method.
     * @returns {void}
     */
    _updateBookmarks(newBookmarks, previousBookmarks, database) {
        var _a, _b, _c;
        if ((newBookmarks != null) && !newBookmarks.isEmpty()) {
            // Best effort: a failure to update the bookmark manager is deliberately ignored.
            (_a = this._bookmarkManager) === null || _a === void 0 ? void 0 : _a.updateBookmarks((_b = previousBookmarks === null || previousBookmarks === void 0 ? void 0 : previousBookmarks.values()) !== null && _b !== void 0 ? _b : [], (_c = newBookmarks === null || newBookmarks === void 0 ? void 0 : newBookmarks.values()) !== null && _c !== void 0 ? _c : []).catch(() => { });
            this._lastBookmarks = newBookmarks;
            this._configuredBookmarks = Bookmarks.empty();
        }
    }
    /**
     * Close this session.
     *
     * Cancels the results created by {@link Session#run}, closes the transaction executor and
     * closes both connection holders. Calling it on an already closed session does nothing.
     *
     * @return {Promise} Fulfilled once both connection holders were closed; rejected when closing
     * a holder fails (if both fail, the write holder's error is the one reported).
     */
    close() {
        return __awaiter(this, void 0, void 0, function* () {
            if (this._open) {
                this._open = false;
                this._results.forEach(result => result._cancel());
                this._transactionExecutor.close();
                try {
                    yield this._readConnectionHolder.close(this._hasTx);
                }
                finally {
                    // The session is already marked closed, so a later close() would be a no-op:
                    // the write holder must be closed now even if closing the read holder failed.
                    yield this._writeConnectionHolder.close(this._hasTx);
                }
            }
        });
    }
    // eslint-disable-next-line
    // @ts-ignore
    [Symbol.asyncDispose]() {
        return this.close();
    }
    /**
     * Pick the connection holder for the given access mode.
     *
     * @private
     * @param {string} mode - READ or WRITE access mode.
     * @returns {ConnectionHolder} The matching connection holder.
     * @throws {Error} When the mode is neither READ nor WRITE.
     */
    _connectionHolderWithMode(mode) {
        if (mode === ACCESS_MODE_READ) {
            return this._readConnectionHolder;
        }
        else if (mode === ACCESS_MODE_WRITE) {
            return this._writeConnectionHolder;
        }
        else {
            throw newError('Unknown access mode: ' + mode);
        }
    }
    /**
     * Record the bookmark reported in the metadata of a completed auto-commit query.
     *
     * @private
     * @param {Object} meta Connection metadata; `meta.bookmark` and `meta.db` are read.
     * @param {Bookmarks} previousBookmarks The bookmarks that were sent with the query.
     * @returns {void}
     */
    _onCompleteCallback(meta, previousBookmarks) {
        this._updateBookmarks(new Bookmarks(meta.bookmark), previousBookmarks, meta.db);
    }
    /**
     * Derive the record watermarks from the fetch size: 30% / 70% of the fetch size, or
     * `Number.MAX_VALUE` for both when everything is fetched at once.
     *
     * @private
     * @returns {{low: number, high: number}} The low and high record watermarks.
     */
    _calculateWatermaks() {
        if (this._fetchSize === FETCH_ALL) {
            return {
                low: Number.MAX_VALUE,
                high: Number.MAX_VALUE // we shall never reach this number to disable auto pull
            };
        }
        return {
            low: 0.3 * this._fetchSize,
            high: 0.7 * this._fetchSize
        };
    }
    /**
     * Configure the transaction executor
     *
     * This used by {@link Driver#executeQuery}
     * @private
     * @param {boolean} pipelined - Stored as the executor's `pipelineBegin`.
     * @param {*} telemetryApi - Stored as the executor's `telemetryApi`.
     * @returns {void}
     */
    _configureTransactionExecutor(pipelined, telemetryApi) {
        this._transactionExecutor.pipelineBegin = pipelined;
        this._transactionExecutor.telemetryApi = telemetryApi;
    }
    /**
     * Validate an access mode, defaulting `null`/`undefined` to WRITE.
     *
     * @protected
     * @param {string} [rawMode] - The requested access mode.
     * @returns {string} The validated access mode.
     * @throws {Error} When the mode is neither READ nor WRITE.
     */
    static _validateSessionMode(rawMode) {
        const mode = rawMode !== null && rawMode !== void 0 ? rawMode : ACCESS_MODE_WRITE;
        if (mode !== ACCESS_MODE_READ && mode !== ACCESS_MODE_WRITE) {
            throw newError('Illegal session mode ' + mode);
        }
        return mode;
    }
}
/**
* @private
* @param {object} config
* @returns {TransactionExecutor} The transaction executor
*/
function _createTransactionExecutor(config) {
    // No retry time is passed (null) unless the config supplies a non-nullish value.
    let maxRetryTimeMs = null;
    if (config !== null && config !== undefined) {
        const configuredRetryTime = config.maxTransactionRetryTime;
        if (configuredRetryTime !== null && configuredRetryTime !== undefined) {
            maxRetryTimeMs = configuredRetryTime;
        }
    }
    return new TransactionExecutor(maxRetryTimeMs);
}
export default Session;