// src/result-rx.js
/**
* Copyright (c) "Neo4j"
* Neo4j Sweden AB [https://neo4j.com]
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/* eslint-disable-next-line no-unused-vars */
import { newError, Record, ResultSummary } from 'neo4j-driver-core'
import { Observable, Subject, ReplaySubject, from } from 'rxjs'
import { mergeMap, publishReplay, refCount } from 'rxjs/operators'
/**
 * Streaming life-cycle of an RxResult.
 *
 * The numeric values are ordered (READY < STREAMING < COMPLETED) because the
 * class compares them with `<`. The object is frozen so that an accidental
 * assignment cannot silently change the state machine.
 */
const States = Object.freeze({
  READY: 0,
  STREAMING: 1,
  COMPLETED: 2
})
/**
* The reactive result interface.
*/
export default class RxResult {
  /**
   * @constructor
   * @protected
   * @param {Observable<Result>} result - An observable of single Result instance to relay requests.
   * @param {number} state - The streaming state
   */
  constructor (result, state) {
    const replayedResult = result.pipe(publishReplay(1), refCount())

    this._result = replayedResult
    this._keys = replayedResult.pipe(
      mergeMap(r => from(r.keys())),
      publishReplay(1),
      refCount()
    )
    // Created lazily by _setupRecordsStream once streaming starts.
    this._records = undefined
    this._controls = new StreamControl()
    this._summary = new ReplaySubject()
    // States.READY is 0, so `||` also covers an omitted state.
    this._state = state || States.READY
  }

  /**
   * Returns an observable that exposes a single item containing field names
   * returned by the executing query.
   *
   * Errors raised by actual query execution can surface on the returned
   * observable stream.
   *
   * @public
   * @returns {Observable<string[]>} - An observable stream (with exactly one element) of field names.
   */
  keys () {
    return this._keys
  }

  /**
   * Returns an observable that exposes each record returned by the executing query.
   *
   * Errors raised during the streaming phase can surface on the returned observable stream.
   *
   * The returned observable additionally carries a `push` function that
   * delegates to {@link RxResult#push}.
   *
   * @public
   * @returns {Observable<Record>} - An observable stream of records.
   */
  records () {
    const result = this._result.pipe(
      mergeMap(
        result =>
          new Observable(recordsObserver =>
            this._startStreaming({ result, recordsObserver })
          )
      )
    )
    // Delegate to the public push(); RxResult has no `_push` member, so the
    // previous `this._push()` threw a TypeError whenever it was invoked.
    result.push = () => this.push()
    return result
  }

  /**
   * Returns an observable that exposes a single item of {@link ResultSummary} that is generated by
   * the server after the streaming of the executing query is completed.
   *
   * *Subscribing to this stream before subscribing to records() stream causes the results to be discarded on the server.*
   *
   * @public
   * @returns {Observable<ResultSummary>} - An observable stream (with exactly one element) of result summary.
   */
  consume () {
    return this._result.pipe(
      mergeMap(
        result =>
          new Observable(summaryObserver =>
            this._startStreaming({ result, summaryObserver })
          )
      )
    )
  }

  /**
   * Pauses the automatic streaming of records.
   *
   * This method provides a way to control the flow of records.
   *
   * @experimental
   */
  pause () {
    this._controls.pause()
  }

  /**
   * Resumes the automatic streaming of records.
   *
   * This method won't need to be called in normal stream operation. It only applies to the case when the stream is paused.
   *
   * This method won't start consuming records if the {@link records} stream has not been subscribed to.
   * @experimental
   * @returns {Promise<void>} - A promise that resolves when the stream is resumed.
   */
  resume () {
    return this._controls.resume()
  }

  /**
   * Pushes the next record to the stream.
   *
   * This method automatically pauses the auto-streaming of records and then pushes the next record to the stream.
   *
   * To return to the automatic streaming of records, use the {@link resume} method.
   *
   * @experimental
   * @returns {Promise<void>} - A promise that resolves when the push is completed.
   */
  push () {
    return this._controls.push()
  }

  /**
   * Wires a records and/or summary observer to the underlying result.
   *
   * The first call moves this instance to STREAMING and creates the records
   * stream. If that first call carries no records observer, the result is
   * cancelled instead. A records observer arriving after streaming has
   * started receives an error.
   *
   * @private
   * @param {Object} param
   * @param {Result} param.result - The underlying result to stream from.
   * @param {Observer<Record>|null} [param.recordsObserver] - Observer to receive records.
   * @param {Observer<ResultSummary>|null} [param.summaryObserver] - Observer to receive the summary.
   * @returns {function(): void} Teardown that unsubscribes everything registered by this call.
   */
  _startStreaming ({
    result,
    recordsObserver = null,
    summaryObserver = null
  } = {}) {
    const subscriptions = []

    if (summaryObserver) {
      subscriptions.push(this._summary.subscribe(summaryObserver))
    }

    if (this._state < States.STREAMING) {
      this._state = States.STREAMING
      this._setupRecordsStream(result)
      if (recordsObserver) {
        subscriptions.push(this._records.subscribe(recordsObserver))
      } else {
        // Summary-only consumption: nobody will read the records.
        result._cancel()
      }

      // Cancel the underlying result when the starting subscription is torn down.
      subscriptions.push({
        unsubscribe: () => {
          if (result._cancel) {
            result._cancel()
          }
        }
      })
    } else if (recordsObserver) {
      recordsObserver.error(
        newError(
          'Streaming has already started/consumed with a previous records or summary subscription.'
        )
      )
    }

    return () => {
      subscriptions.forEach(s => s.unsubscribe())
    }
  }

  /**
   * Create a {@link Observable} for the current {@link RxResult}
   *
   * Each emitted RxResult wraps the same underlying result and is created
   * with this instance's streaming state at the time of emission.
   *
   * @package
   * @experimental
   * @since 5.0
   * @return {Observable<RxResult>}
   */
  _toObservable () {
    function wrap (result) {
      return new Observable(observer => {
        observer.next(result)
        observer.complete()
      })
    }
    return new Observable(observer => {
      this._result.subscribe({
        complete: () => observer.complete(),
        // The state belongs to the RxResult constructor; it used to be passed
        // as a stray second argument to observer.next and was lost.
        next: result => observer.next(new RxResult(wrap(result), this._state)),
        error: e => observer.error(e)
      })
    })
  }

  /**
   * Lazily creates the records subject fed from the result's async iterator.
   *
   * On completion or error the instance moves to COMPLETED and the outcome
   * is forwarded to the summary subject.
   *
   * @private
   * @param {Result} result - The underlying result to iterate.
   * @returns {Subject<Record>} The (possibly already existing) records subject.
   */
  _setupRecordsStream (result) {
    if (this._records) {
      return this._records
    }

    this._records = createFullyControlledSubject(
      result[Symbol.asyncIterator](),
      {
        complete: async () => {
          this._state = States.COMPLETED
          this._summary.next(await result.summary())
          this._summary.complete()
        },
        error: error => {
          this._state = States.COMPLETED
          this._summary.error(error)
        }
      },
      this._controls
    )
    return this._records
  }
}
/**
 * Creates a subject that is fed from an async iterator under the control of a
 * stream control object.
 *
 * One value is requested immediately. After each value is emitted, the next
 * one is requested automatically unless `streamControl.paused` is set. The
 * function used to request a single further value is published through
 * `streamControl.pusher`.
 *
 * `streamControl.pushing` is kept `true` for as long as a request to the
 * iterator is outstanding, including requests chained automatically.
 *
 * @private
 * @param {AsyncIterator} iterator - Source of `{ done, value }` results.
 * @param {{complete: function(): *, error: function(*): *}} completeObserver -
 *   Notified once when the iterator is exhausted or fails.
 * @param {StreamControl} [streamControl] - Pause/push state; a fresh one is created when omitted.
 * @returns {Subject} Subject emitting every value produced by the iterator.
 */
function createFullyControlledSubject (
  iterator,
  completeObserver,
  streamControl = new StreamControl()
) {
  const subject = new Subject()

  const pushNextValue = async result => {
    // Set once the follow-up fetch has been started. That fetch has already
    // marked the control as pushing, so this call must not clear the flag.
    let handedOver = false
    try {
      streamControl.pushing = true
      const { done, value } = await result
      if (done) {
        subject.complete()
        completeObserver.complete()
      } else {
        subject.next(value)
        if (!streamControl.paused) {
          pushNextValue(iterator.next())
            .catch(() => {})
          handedOver = true
        }
      }
    } catch (error) {
      subject.error(error)
      completeObserver.error(error)
    } finally {
      // Clearing the flag unconditionally made `pushing` read false while the
      // chained fetch was still in flight, allowing a second concurrent loop.
      if (!handedOver) {
        streamControl.pushing = false
      }
    }
  }

  async function push (value) {
    await pushNextValue(iterator.next(value))
  }

  streamControl.pusher = push
  push()

  return subject
}
/**
 * Holds the pause/push state of a record stream together with the function
 * used to request one more value from it.
 */
class StreamControl {
  /**
   * @param {function(): Promise<*>} [push] - Function that requests the next value; defaults to an async no-op.
   */
  constructor (push = async () => {}) {
    Object.assign(this, {
      _paused: false,
      _pushing: false,
      _push: push
    })
  }

  /** @returns {boolean} Whether automatic streaming is currently paused. */
  get paused () {
    return this._paused
  }

  /** @returns {boolean} The in-flight marker last stored through the setter. */
  get pushing () {
    return this._pushing
  }

  /** @param {boolean} pushing - New value of the in-flight marker. */
  set pushing (pushing) {
    this._pushing = pushing
  }

  /** @returns {function(): Promise<*>} The function currently used to request a value. */
  get pusher () {
    return this._push
  }

  /** @param {function(): Promise<*>} push - Replacement for the request function. */
  set pusher (push) {
    this._push = push
  }

  /** Marks the stream as paused. */
  pause () {
    this._paused = true
  }

  /**
   * Clears the paused flag and, if the stream was paused and nothing is
   * marked as in flight, requests one value to get the stream going again.
   *
   * @returns {Promise<void>}
   */
  async resume () {
    const needsKick = this._paused && !this._pushing
    this._paused = false
    if (needsKick) {
      await this._push()
    }
  }

  /**
   * Pauses the stream and then requests exactly one value.
   *
   * @returns {Promise<*>} Whatever the request function resolves to.
   */
  async push () {
    this.pause()
    const outcome = await this._push()
    return outcome
  }
}