src/transaction-rx.js
/**
* Copyright (c) "Neo4j"
* Neo4j Sweden AB [https://neo4j.com]
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import { Observable } from 'rxjs'
import RxResult from './result-rx'
// eslint-disable-next-line no-unused-vars
import Transaction from 'neo4j-driver-core'
/**
* A reactive transaction, which provides the same functionality as {@link Transaction} but through a Reactive API.
*/
export default class RxTransaction {
/**
* @constructor
* @protected
* @param {Transaction} txc - The underlying transaction instance to relay requests
*/
constructor (txc) {
this._txc = txc
}
/**
* Creates a reactive result that will execute the query in this transaction, with the provided parameters.
*
* @public
* @param {string} query - Query to be executed.
* @param {Object} parameters - Parameter values to use in query execution.
* @returns {RxResult} - A reactive result
*/
run (query, parameters) {
return new RxResult(
new Observable(observer => {
try {
observer.next(this._txc.run(query, parameters))
observer.complete()
} catch (err) {
observer.error(err)
}
return () => {}
})
)
}
/**
* Commits the transaction.
*
* @public
* @returns {Observable} - An empty observable
*/
commit () {
return new Observable(observer => {
this._txc
.commit()
.then(() => {
observer.complete()
})
.catch(err => observer.error(err))
})
}
/**
* Rolls back the transaction.
*
* @public
* @returns {Observable} - An empty observable
*/
rollback () {
return new Observable(observer => {
this._txc
.rollback()
.then(() => {
observer.complete()
})
.catch(err => observer.error(err))
})
}
/**
* Check if this transaction is active, which means commit and rollback did not happen.
* @return {boolean} `true` when not committed and not rolled back, `false` otherwise.
*/
isOpen () {
return this._txc.isOpen()
}
/**
* Closes the transaction
*
* This method will roll back the transaction if it is not already committed or rolled back.
*
* @returns {Observable} - An empty observable
*/
close () {
return new Observable(observer => {
this._txc
.close()
.then(() => {
observer.complete()
})
.catch(err => observer.error(err))
})
}
}