src/session-rx.js
/**
* Copyright (c) "Neo4j"
* Neo4j Sweden AB [https://neo4j.com]
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import { defer, Observable, of, throwError } from 'rxjs'
import { mergeMap, catchError, concatWith } from 'rxjs/operators'
import RxResult from './result-rx'
// eslint-disable-next-line no-unused-vars
import { Session, internal } from 'neo4j-driver-core'
import RxTransaction from './transaction-rx'
import RxManagedTransaction from './transaction-managed-rx'
import RxRetryLogic from './internal/retry-logic-rx'
const {
constants: { ACCESS_MODE_READ, ACCESS_MODE_WRITE, TELEMETRY_APIS },
txConfig: { TxConfig }
} = internal
/**
* A Reactive session, which provides the same functionality as {@link Session} but through a Reactive API.
*/
export default class RxSession {
/**
* Constructs a reactive session with given default session instance and provided driver configuration.
*
* @protected
* @param {Object} param - Object parameter
* @param {Session} param.session - The underlying session instance to relay requests
*/
constructor ({ session, config, log } = {}) {
this._session = session
this._retryLogic = _createRetryLogic(config)
this._log = log
}
/**
* Creates a reactive result that will execute the query with the provided parameters and the provided
* transaction configuration that applies to the underlying auto-commit transaction.
*
* @public
* @param {string} query - Query to be executed.
* @param {Object} parameters - Parameter values to use in query execution.
* @param {TransactionConfig} transactionConfig - Configuration for the new auto-commit transaction.
* @returns {RxResult} - A reactive result
*/
run (query, parameters, transactionConfig) {
return new RxResult(
new Observable(observer => {
try {
observer.next(this._session.run(query, parameters, transactionConfig))
observer.complete()
} catch (err) {
observer.error(err)
}
return () => {}
})
)
}
/**
* Starts a new explicit transaction with the provided transaction configuration.
*
* @public
* @param {TransactionConfig} transactionConfig - Configuration for the new transaction.
* @returns {Observable<RxTransaction>} - A reactive stream that will generate at most **one** RxTransaction instance.
*/
beginTransaction (transactionConfig) {
return this._beginTransaction(this._session._mode, transactionConfig, { api: TELEMETRY_APIS.UNMANAGED_TRANSACTION })
}
/**
* Executes the provided unit of work in a {@link READ} reactive transaction which is created with the provided
* transaction configuration.
* @public
* @deprecated This method will be removed in version 6.0. Please, use {@link RxSession#executeRead} instead.
* @param {function(txc: RxTransaction): Observable} work - A unit of work to be executed.
* @param {TransactionConfig} transactionConfig - Configuration for the enclosing transaction created by the driver.
* @returns {Observable} - A reactive stream returned by the unit of work.
*/
readTransaction (work, transactionConfig) {
return this._runTransaction(ACCESS_MODE_READ, work, transactionConfig)
}
/**
* Executes the provided unit of work in a {@link WRITE} reactive transaction which is created with the provided
* transaction configuration.
* @public
* @deprecated This method will be removed in version 6.0. Please, use {@link RxSession#executeWrite} instead.
* @param {function(txc: RxTransaction): Observable} work - A unit of work to be executed.
* @param {TransactionConfig} transactionConfig - Configuration for the enclosing transaction created by the driver.
* @returns {Observable} - A reactive stream returned by the unit of work.
*/
writeTransaction (work, transactionConfig) {
return this._runTransaction(ACCESS_MODE_WRITE, work, transactionConfig)
}
/**
* Executes the provided unit of work in a {@link READ} reactive transaction which is created with the provided
* transaction configuration.
* @public
* @param {function(txc: RxManagedTransaction): Observable} work - A unit of work to be executed.
* @param {TransactionConfig} transactionConfig - Configuration for the enclosing transaction created by the driver.
* @returns {Observable} - A reactive stream returned by the unit of work.
*/
executeRead (work, transactionConfig) {
return this._executeInTransaction(ACCESS_MODE_READ, work, transactionConfig)
}
/**
* Executes the provided unit of work in a {@link WRITE} reactive transaction which is created with the provided
* transaction configuration.
* @public
* @param {function(txc: RxManagedTransaction): Observable} work - A unit of work to be executed.
* @param {TransactionConfig} transactionConfig - Configuration for the enclosing transaction created by the driver.
* @returns {Observable} - A reactive stream returned by the unit of work.
*/
executeWrite (work, transactionConfig) {
return this._executeInTransaction(ACCESS_MODE_WRITE, work, transactionConfig)
}
/**
* @private
* @param {function(txc: RxManagedTransaction): Observable} work
* @param {TransactionConfig} transactionConfig
* @returns {Observable}
*/
_executeInTransaction (accessMode, work, transactionConfig) {
const wrapper = txc => new RxManagedTransaction({
run: txc.run.bind(txc)
})
return this._runTransaction(accessMode, work, transactionConfig, wrapper)
}
/**
* Closes this reactive session.
*
* @public
* @returns {Observable} - An empty reactive stream
*/
close () {
return new Observable(observer => {
this._session
.close()
.then(() => {
observer.complete()
})
.catch(err => observer.error(err))
})
}
[Symbol.asyncDispose] () {
return this.close()
}
/**
* Returns the bookmarks received following the last successfully completed query, which is executed
* either in an {@link RxTransaction} obtained from this session instance or directly through one of
* the {@link RxSession#run} method of this session instance.
*
* If no bookmarks were received or if this transaction was rolled back, the bookmarks value will not be
* changed.
*
* @deprecated This method will be removed in 6.0 version. Please, use {@link RxSession#lastBookmarks} instead.
*
* @public
* @returns {string[]}
*/
lastBookmark () {
return this.lastBookmarks()
}
/**
* Returns the bookmarks received following the last successfully completed query, which is executed
* either in an {@link RxTransaction} obtained from this session instance or directly through one of
* the {@link RxSession#run} method of this session instance.
*
* If no bookmarks were received or if this transaction was rolled back, the bookmarks value will not be
* changed.
*
* @public
* @returns {string[]}
*/
lastBookmarks () {
return this._session.lastBookmarks()
}
/**
* @private
*/
_beginTransaction (accessMode, transactionConfig, apiTelemetryConfig) {
let txConfig = TxConfig.empty()
if (transactionConfig) {
txConfig = new TxConfig(transactionConfig, this._log)
}
return new Observable(observer => {
try {
this._session._beginTransaction(accessMode, txConfig, apiTelemetryConfig)
.then(tx => {
observer.next(
new RxTransaction(tx)
)
observer.complete()
})
.catch(err => observer.error(err))
} catch (err) {
observer.error(err)
}
return () => {}
})
}
/**
* @private
*/
_runTransaction (accessMode, work, transactionConfig, transactionWrapper = (tx) => tx) {
let txConfig = TxConfig.empty()
if (transactionConfig) {
txConfig = new TxConfig(transactionConfig)
}
const context = {
apiTelemetryConfig: {
api: TELEMETRY_APIS.MANAGED_TRANSACTION,
onTelemetrySuccess: () => {
context.apiTelemetryConfig = undefined
}
}
}
return this._retryLogic.retry(
of(1).pipe(
mergeMap(() => this._beginTransaction(accessMode, txConfig, context.apiTelemetryConfig)),
mergeMap(txc =>
defer(() => {
try {
return work(transactionWrapper(txc))
} catch (err) {
return throwError(() => err)
}
}).pipe(
catchError(err => txc.rollback().pipe(concatWith(throwError(() => err)))),
concatWith(txc.commit())
)
)
)
)
}
}
function _createRetryLogic (config) {
const maxRetryTimeout =
config && config.maxTransactionRetryTime
? config.maxTransactionRetryTime
: null
return new RxRetryLogic({ maxRetryTimeout })
}