Files
2026-04-05 16:14:49 -04:00

727 lines
23 KiB
JavaScript

'use strict';
Object.defineProperty(exports, "__esModule", {
value: true
});
exports.Connection = exports.connect = undefined;
var _promise = require('babel-runtime/core-js/promise');
var _promise2 = _interopRequireDefault(_promise);
var _classCallCheck2 = require('babel-runtime/helpers/classCallCheck');
var _classCallCheck3 = _interopRequireDefault(_classCallCheck2);
var _createClass2 = require('babel-runtime/helpers/createClass');
var _createClass3 = _interopRequireDefault(_createClass2);
var _stringify = require('babel-runtime/core-js/json/stringify');
var _stringify2 = _interopRequireDefault(_stringify);
var _chWebsocket = require('./ch-websocket');
var _chWebsocket2 = _interopRequireDefault(_chWebsocket);
var _chNode = require('./ch-node');
var _chNode2 = _interopRequireDefault(_chNode);
var _chunking = require('./chunking');
var _packstreamUtil = require('./packstream-util');
var _packstreamUtil2 = _interopRequireDefault(_packstreamUtil);
var _buf = require('./buf');
var _error = require('./../error');
var _chConfig = require('./ch-config');
var _chConfig2 = _interopRequireDefault(_chConfig);
var _urlUtil = require('./url-util');
var _urlUtil2 = _interopRequireDefault(_urlUtil);
var _streamObserver = require('./stream-observer');
var _streamObserver2 = _interopRequireDefault(_streamObserver);
var _serverVersion = require('./server-version');
function _interopRequireDefault(obj) { return obj && obj.__esModule ? obj : { default: obj }; }
/**
* Copyright (c) 2002-2018 "Neo4j,"
* Neo4j Sweden AB [http://neo4j.com]
*
* This file is part of Neo4j.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
var Channel = void 0;
if (_chNode2.default.available) {
Channel = _chNode2.default.channel;
} else if (_chWebsocket2.default.available) {
Channel = _chWebsocket2.default.channel;
} else {
throw (0, _error.newError)("Fatal: No compatible transport available. Need to run on a platform with the WebSocket API.");
}
var
// Signature bytes for each message type
INIT = 0x01,
// 0000 0001 // INIT <user_agent>
ACK_FAILURE = 0x0E,
// 0000 1110 // ACK_FAILURE
RESET = 0x0F,
// 0000 1111 // RESET
RUN = 0x10,
// 0001 0000 // RUN <statement> <parameters>
DISCARD_ALL = 0x2F,
// 0010 1111 // DISCARD *
PULL_ALL = 0x3F,
// 0011 1111 // PULL *
SUCCESS = 0x70,
// 0111 0000 // SUCCESS <metadata>
RECORD = 0x71,
// 0111 0001 // RECORD <value>
IGNORED = 0x7E,
// 0111 1110 // IGNORED <metadata>
FAILURE = 0x7F,
// 0111 1111 // FAILURE <metadata>
//sent before version negotiation
MAGIC_PREAMBLE = 0x6060B017,
DEBUG = false;
/**
* Very rudimentary log handling, should probably be replaced by something proper at some point.
* @param actor the part that sent the message, 'S' for server and 'C' for client
* @param msg the bolt message
*/
function log(actor, msg) {
if (DEBUG) {
for (var i = 2; i < arguments.length; i++) {
msg += " " + (0, _stringify2.default)(arguments[i]);
}
console.log(actor + ":" + msg);
}
}
function NO_OP() {}
var NO_OP_OBSERVER = {
onNext: NO_OP,
onCompleted: NO_OP,
onError: NO_OP
};
/**
* A connection manages sending and receiving messages over a channel. A
* connector is very closely tied to the Bolt protocol, it implements the
* same message structure with very little frills. This means Connectors are
* naturally tied to a specific version of the protocol, and we expect
* another layer will be needed to support multiple versions.
*
* The connector tries to batch outbound messages by requiring its users
* to call 'sync' when messages need to be sent, and it routes response
* messages back to the originators of the requests that created those
* response messages.
* @access private
*/
var Connection = function () {
/**
* @constructor
* @param {NodeChannel|WebSocketChannel} channel - channel with a 'write' function and a 'onmessage' callback property.
* @param {string} hostPort - the hostname and port to connect to.
* @param {boolean} disableLosslessIntegers if this connection should convert all received integers to native JS numbers.
*/
function Connection(channel, hostPort) {
var _this = this;
var disableLosslessIntegers = arguments.length > 2 && arguments[2] !== undefined ? arguments[2] : false;
(0, _classCallCheck3.default)(this, Connection);
/**
* An ordered queue of observers, each exchange response (zero or more
* RECORD messages followed by a SUCCESS message) we receive will be routed
* to the next pending observer.
*/
this.hostPort = hostPort;
this.server = { address: hostPort };
this.creationTimestamp = Date.now();
this._disableLosslessIntegers = disableLosslessIntegers;
this._pendingObservers = [];
this._currentObserver = undefined;
this._ch = channel;
this._dechunker = new _chunking.Dechunker();
this._chunker = new _chunking.Chunker(channel);
// initially assume that database supports latest Bolt version, create latest packer and unpacker
this._packer = _packstreamUtil2.default.createLatestPacker(this._chunker);
this._unpacker = _packstreamUtil2.default.createLatestUnpacker(disableLosslessIntegers);
this._ackFailureMuted = false;
this._currentFailure = null;
this._state = new ConnectionState(this);
// Set to true on fatal errors, to get this out of session pool.
this._isBroken = false;
// TODO: Using `onmessage` and `onerror` came from the WebSocket API,
// it reads poorly and has several annoying drawbacks. Swap to having
// Channel extend EventEmitter instead, then we can use `on('data',..)`
this._ch.onmessage = function (buf) {
var proposed = buf.readInt32();
if (proposed == 1 || proposed == 2) {
_this._initializeProtocol(proposed, buf);
} else if (proposed == 1213486160) {
//server responded 1213486160 == 0x48545450 == "HTTP"
_this._handleFatalError((0, _error.newError)('Server responded HTTP. Make sure you are not trying to connect to the http endpoint ' + '(HTTP defaults to port 7474 whereas BOLT defaults to port 7687)'));
} else {
_this._handleFatalError((0, _error.newError)('Unknown Bolt protocol version: ' + proposed));
}
};
// Listen to connection errors. Important note though;
// In some cases we will get a channel that is already broken (for instance,
// if the user passes invalid configuration options). In this case, onerror
// will have "already triggered" before we add out listener here. So the line
// below also checks that the channel is not already failed. This could be nicely
// encapsulated into Channel if we used `on('error', ..)` rather than `onerror=..`
// as outlined in the comment about `onmessage` further up in this file.
this._ch.onerror = this._handleFatalError.bind(this);
if (this._ch._error) {
this._handleFatalError(this._ch._error);
}
this._dechunker.onmessage = function (buf) {
_this._handleMessage(_this._unpacker.unpack(buf));
};
var handshake = (0, _buf.alloc)(5 * 4);
//magic preamble
handshake.writeInt32(MAGIC_PREAMBLE);
//proposed versions
handshake.writeInt32(2);
handshake.writeInt32(1);
handshake.writeInt32(0);
handshake.writeInt32(0);
handshake.reset();
this._ch.write(handshake);
}
/**
* Complete protocol initialization.
* @param {number} version the selected protocol version.
* @param {BaseBuffer} buffer the handshake response buffer.
* @private
*/
(0, _createClass3.default)(Connection, [{
key: '_initializeProtocol',
value: function _initializeProtocol(version, buffer) {
var _this2 = this;
// re-create packer and unpacker because version might be lower than we initially assumed
this._packer = _packstreamUtil2.default.createPackerForProtocolVersion(version, this._chunker);
this._unpacker = _packstreamUtil2.default.createUnpackerForProtocolVersion(version, this._disableLosslessIntegers);
// Ok, protocol running. Simply forward all messages to the dechunker
this._ch.onmessage = function (buf) {
return _this2._dechunker.write(buf);
};
if (buffer.hasRemaining()) {
this._dechunker.write(buffer.readSlice(buffer.remaining()));
}
}
/**
* "Fatal" means the connection is dead. Only call this if something
* happens that cannot be recovered from. This will lead to all subscribers
* failing, and the connection getting ejected from the session pool.
*
* @param err an error object, forwarded to all current and future subscribers
* @protected
*/
}, {
key: '_handleFatalError',
value: function _handleFatalError(err) {
this._isBroken = true;
this._error = err;
if (this._currentObserver && this._currentObserver.onError) {
this._currentObserver.onError(err);
}
while (this._pendingObservers.length > 0) {
var observer = this._pendingObservers.shift();
if (observer && observer.onError) {
observer.onError(err);
}
}
}
}, {
key: '_handleMessage',
value: function _handleMessage(msg) {
if (this._isBroken) {
// ignore all incoming messages when this connection is broken. all previously pending observers failed
// with the fatal error. all future observers will fail with same fatal error.
return;
}
var payload = msg.fields[0];
switch (msg.signature) {
case RECORD:
log("S", "RECORD", msg);
this._currentObserver.onNext(payload);
break;
case SUCCESS:
log("S", "SUCCESS", msg);
try {
this._currentObserver.onCompleted(payload);
} finally {
this._updateCurrentObserver();
}
break;
case FAILURE:
log("S", "FAILURE", msg);
try {
this._currentFailure = (0, _error.newError)(payload.message, payload.code);
this._currentObserver.onError(this._currentFailure);
} finally {
this._updateCurrentObserver();
// Things are now broken. Pending observers will get FAILURE messages routed until we are done handling this failure.
this._ackFailureIfNeeded();
}
break;
case IGNORED:
log("S", "IGNORED", msg);
try {
if (this._currentFailure && this._currentObserver.onError) this._currentObserver.onError(this._currentFailure);else if (this._currentObserver.onError) this._currentObserver.onError((0, _error.newError)('Ignored either because of an error or RESET'));
} finally {
this._updateCurrentObserver();
}
break;
default:
this._handleFatalError((0, _error.newError)("Unknown Bolt protocol message: " + msg));
}
}
/** Queue an INIT-message to be sent to the database */
}, {
key: 'initialize',
value: function initialize(clientName, token, observer) {
var _this3 = this;
log("C", "INIT", clientName, token);
var initObserver = this._state.wrap(observer);
var queued = this._queueObserver(initObserver);
if (queued) {
this._packer.packStruct(INIT, [this._packable(clientName), this._packable(token)], function (err) {
return _this3._handleFatalError(err);
});
this._chunker.messageBoundary();
this.sync();
}
}
/** Queue a RUN-message to be sent to the database */
}, {
key: 'run',
value: function run(statement, params, observer) {
var _this4 = this;
log("C", "RUN", statement, params);
var queued = this._queueObserver(observer);
if (queued) {
this._packer.packStruct(RUN, [this._packable(statement), this._packable(params)], function (err) {
return _this4._handleFatalError(err);
});
this._chunker.messageBoundary();
}
}
/** Queue a PULL_ALL-message to be sent to the database */
}, {
key: 'pullAll',
value: function pullAll(observer) {
var _this5 = this;
log("C", "PULL_ALL");
var queued = this._queueObserver(observer);
if (queued) {
this._packer.packStruct(PULL_ALL, [], function (err) {
return _this5._handleFatalError(err);
});
this._chunker.messageBoundary();
}
}
/** Queue a DISCARD_ALL-message to be sent to the database */
}, {
key: 'discardAll',
value: function discardAll(observer) {
var _this6 = this;
log("C", "DISCARD_ALL");
var queued = this._queueObserver(observer);
if (queued) {
this._packer.packStruct(DISCARD_ALL, [], function (err) {
return _this6._handleFatalError(err);
});
this._chunker.messageBoundary();
}
}
/**
* Send a RESET-message to the database. Mutes failure handling.
* Message is immediately flushed to the network. Separate {@link Connection#sync()} call is not required.
* @return {Promise<void>} promise resolved when SUCCESS-message response arrives, or failed when other response messages arrives.
*/
}, {
key: 'resetAndFlush',
value: function resetAndFlush() {
var _this7 = this;
log('C', 'RESET');
this._ackFailureMuted = true;
return new _promise2.default(function (resolve, reject) {
var observer = {
onNext: function onNext(record) {
var neo4jError = _this7._handleProtocolError('Received RECORD as a response for RESET: ' + (0, _stringify2.default)(record));
reject(neo4jError);
},
onError: function onError(error) {
if (_this7._isBroken) {
// handling a fatal error, no need to raise a protocol violation
reject(error);
} else {
var neo4jError = _this7._handleProtocolError('Received FAILURE as a response for RESET: ' + error);
reject(neo4jError);
}
},
onCompleted: function onCompleted() {
_this7._ackFailureMuted = false;
resolve();
}
};
var queued = _this7._queueObserver(observer);
if (queued) {
_this7._packer.packStruct(RESET, [], function (err) {
return _this7._handleFatalError(err);
});
_this7._chunker.messageBoundary();
_this7.sync();
}
});
}
}, {
key: '_ackFailureIfNeeded',
value: function _ackFailureIfNeeded() {
var _this8 = this;
if (this._ackFailureMuted) {
return;
}
log('C', 'ACK_FAILURE');
var observer = {
onNext: function onNext(record) {
_this8._handleProtocolError('Received RECORD as a response for ACK_FAILURE: ' + (0, _stringify2.default)(record));
},
onError: function onError(error) {
if (!_this8._isBroken && !_this8._ackFailureMuted) {
// not handling a fatal error and RESET did not cause the given error - looks like a protocol violation
_this8._handleProtocolError('Received FAILURE as a response for ACK_FAILURE: ' + error);
} else {
_this8._currentFailure = null;
}
},
onCompleted: function onCompleted() {
_this8._currentFailure = null;
}
};
var queued = this._queueObserver(observer);
if (queued) {
this._packer.packStruct(ACK_FAILURE, [], function (err) {
return _this8._handleFatalError(err);
});
this._chunker.messageBoundary();
this.sync();
}
}
}, {
key: '_queueObserver',
value: function _queueObserver(observer) {
if (this._isBroken) {
if (observer && observer.onError) {
observer.onError(this._error);
}
return false;
}
observer = observer || NO_OP_OBSERVER;
observer.onCompleted = observer.onCompleted || NO_OP;
observer.onError = observer.onError || NO_OP;
observer.onNext = observer.onNext || NO_OP;
if (this._currentObserver === undefined) {
this._currentObserver = observer;
} else {
this._pendingObservers.push(observer);
}
return true;
}
/**
* Get promise resolved when connection initialization succeed or rejected when it fails.
* Connection is initialized using {@link initialize} function.
* @return {Promise<Connection>} the result of connection initialization.
*/
}, {
key: 'initializationCompleted',
value: function initializationCompleted() {
return this._state.initializationCompleted();
}
/*
* Pop next pending observer form the list of observers and make it current observer.
* @protected
*/
}, {
key: '_updateCurrentObserver',
value: function _updateCurrentObserver() {
this._currentObserver = this._pendingObservers.shift();
}
/**
* Synchronize - flush all queued outgoing messages and route their responses
* to their respective handlers.
*/
}, {
key: 'sync',
value: function sync() {
this._chunker.flush();
}
/** Check if this connection is in working condition */
}, {
key: 'isOpen',
value: function isOpen() {
return !this._isBroken && this._ch._open;
}
}, {
key: 'isEncrypted',
value: function isEncrypted() {
return this._ch.isEncrypted();
}
/**
* Call close on the channel.
* @param {function} cb - Function to call on close.
*/
}, {
key: 'close',
value: function close(cb) {
this._ch.close(cb);
}
}, {
key: '_packable',
value: function _packable(value) {
var _this9 = this;
return this._packer.packable(value, function (err) {
return _this9._handleFatalError(err);
});
}
/**
* @protected
*/
}, {
key: '_markInitialized',
value: function _markInitialized(metadata) {
var serverVersion = metadata ? metadata.server : null;
if (!this.server.version) {
this.server.version = serverVersion;
var version = _serverVersion.ServerVersion.fromString(serverVersion);
if (version.compareTo(_serverVersion.VERSION_3_2_0) < 0) {
this._packer.disableByteArrays();
}
}
}
}, {
key: '_handleProtocolError',
value: function _handleProtocolError(message) {
this._ackFailureMuted = false;
this._currentFailure = null;
this._updateCurrentObserver();
var error = (0, _error.newError)(message, _error.PROTOCOL_ERROR);
this._handleFatalError(error);
return error;
}
}]);
return Connection;
}();
var ConnectionState = function () {
/**
* @constructor
* @param {Connection} connection the connection to track state for.
*/
function ConnectionState(connection) {
var _this10 = this;
(0, _classCallCheck3.default)(this, ConnectionState);
this._connection = connection;
this._initRequested = false;
this._initError = null;
this._resolveInitPromise = null;
this._rejectInitPromise = null;
this._initPromise = new _promise2.default(function (resolve, reject) {
_this10._resolveInitPromise = resolve;
_this10._rejectInitPromise = reject;
});
}
/**
* Wrap the given observer to track connection's initialization state. Connection is closed by the server if
* processing of INIT message fails so returned observer will handle initialization failure as a fatal error.
* @param {StreamObserver} observer the observer used for INIT message.
* @return {StreamObserver} updated observer.
*/
(0, _createClass3.default)(ConnectionState, [{
key: 'wrap',
value: function wrap(observer) {
var _this11 = this;
return {
onNext: function onNext(record) {
if (observer && observer.onNext) {
observer.onNext(record);
}
},
onError: function onError(error) {
_this11._processFailure(error);
_this11._connection._updateCurrentObserver(); // make sure this same observer will not be called again
try {
if (observer && observer.onError) {
observer.onError(error);
}
} finally {
_this11._connection._handleFatalError(error);
}
},
onCompleted: function onCompleted(metaData) {
_this11._connection._markInitialized(metaData);
_this11._resolveInitPromise(_this11._connection);
if (observer && observer.onCompleted) {
observer.onCompleted(metaData);
}
}
};
}
/**
* Get promise resolved when connection initialization succeed or rejected when it fails.
* @return {Promise<Connection>} the result of connection initialization.
*/
}, {
key: 'initializationCompleted',
value: function initializationCompleted() {
this._initRequested = true;
if (this._initError) {
var error = this._initError;
this._initError = null; // to reject initPromise only once
this._rejectInitPromise(error);
}
return this._initPromise;
}
/**
* @private
*/
}, {
key: '_processFailure',
value: function _processFailure(error) {
if (this._initRequested) {
// someone is waiting for initialization to complete, reject the promise
this._rejectInitPromise(error);
} else {
// no one is waiting for initialization, memorize the error but do not reject the promise
// to avoid unnecessary unhandled promise rejection warnings
this._initError = error;
}
}
}]);
return ConnectionState;
}();
/**
* Crete new connection to the provided address.
* @access private
* @param {string} hostPort - the Bolt endpoint to connect to
* @param {object} config - this driver configuration
* @param {string=null} connectionErrorCode - error code for errors raised on connection errors
* @return {Connection} - New connection
*/
function connect(hostPort) {
var config = arguments.length > 1 && arguments[1] !== undefined ? arguments[1] : {};
var connectionErrorCode = arguments.length > 2 && arguments[2] !== undefined ? arguments[2] : null;
var Ch = config.channel || Channel;
var parsedAddress = _urlUtil2.default.parseDatabaseUrl(hostPort);
var channelConfig = new _chConfig2.default(parsedAddress, config, connectionErrorCode);
return new Connection(new Ch(channelConfig), parsedAddress.hostAndPort, config.disableLosslessIntegers);
}
exports.connect = connect;
exports.Connection = Connection;