'use strict'; Object.defineProperty(exports, "__esModule", { value: true }); exports.Connection = exports.connect = undefined; var _promise = require('babel-runtime/core-js/promise'); var _promise2 = _interopRequireDefault(_promise); var _classCallCheck2 = require('babel-runtime/helpers/classCallCheck'); var _classCallCheck3 = _interopRequireDefault(_classCallCheck2); var _createClass2 = require('babel-runtime/helpers/createClass'); var _createClass3 = _interopRequireDefault(_createClass2); var _stringify = require('babel-runtime/core-js/json/stringify'); var _stringify2 = _interopRequireDefault(_stringify); var _chWebsocket = require('./ch-websocket'); var _chWebsocket2 = _interopRequireDefault(_chWebsocket); var _chNode = require('./ch-node'); var _chNode2 = _interopRequireDefault(_chNode); var _chunking = require('./chunking'); var _packstreamUtil = require('./packstream-util'); var _packstreamUtil2 = _interopRequireDefault(_packstreamUtil); var _buf = require('./buf'); var _error = require('./../error'); var _chConfig = require('./ch-config'); var _chConfig2 = _interopRequireDefault(_chConfig); var _urlUtil = require('./url-util'); var _urlUtil2 = _interopRequireDefault(_urlUtil); var _streamObserver = require('./stream-observer'); var _streamObserver2 = _interopRequireDefault(_streamObserver); var _serverVersion = require('./server-version'); function _interopRequireDefault(obj) { return obj && obj.__esModule ? obj : { default: obj }; } /** * Copyright (c) 2002-2018 "Neo4j," * Neo4j Sweden AB [http://neo4j.com] * * This file is part of Neo4j. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ var Channel = void 0; if (_chNode2.default.available) { Channel = _chNode2.default.channel; } else if (_chWebsocket2.default.available) { Channel = _chWebsocket2.default.channel; } else { throw (0, _error.newError)("Fatal: No compatible transport available. Need to run on a platform with the WebSocket API."); } var // Signature bytes for each message type INIT = 0x01, // 0000 0001 // INIT ACK_FAILURE = 0x0E, // 0000 1110 // ACK_FAILURE RESET = 0x0F, // 0000 1111 // RESET RUN = 0x10, // 0001 0000 // RUN DISCARD_ALL = 0x2F, // 0010 1111 // DISCARD * PULL_ALL = 0x3F, // 0011 1111 // PULL * SUCCESS = 0x70, // 0111 0000 // SUCCESS RECORD = 0x71, // 0111 0001 // RECORD IGNORED = 0x7E, // 0111 1110 // IGNORED FAILURE = 0x7F, // 0111 1111 // FAILURE //sent before version negotiation MAGIC_PREAMBLE = 0x6060B017, DEBUG = false; /** * Very rudimentary log handling, should probably be replaced by something proper at some point. * @param actor the part that sent the message, 'S' for server and 'C' for client * @param msg the bolt message */ function log(actor, msg) { if (DEBUG) { for (var i = 2; i < arguments.length; i++) { msg += " " + (0, _stringify2.default)(arguments[i]); } console.log(actor + ":" + msg); } } function NO_OP() {} var NO_OP_OBSERVER = { onNext: NO_OP, onCompleted: NO_OP, onError: NO_OP }; /** * A connection manages sending and receiving messages over a channel. A * connector is very closely tied to the Bolt protocol, it implements the * same message structure with very little frills. This means Connectors are * naturally tied to a specific version of the protocol, and we expect * another layer will be needed to support multiple versions. * * The connector tries to batch outbound messages by requiring its users * to call 'sync' when messages need to be sent, and it routes response * messages back to the originators of the requests that created those * response messages. * @access private */ var Connection = function () { /** * @constructor * @param {NodeChannel|WebSocketChannel} channel - channel with a 'write' function and a 'onmessage' callback property. * @param {string} hostPort - the hostname and port to connect to. * @param {boolean} disableLosslessIntegers if this connection should convert all received integers to native JS numbers. */ function Connection(channel, hostPort) { var _this = this; var disableLosslessIntegers = arguments.length > 2 && arguments[2] !== undefined ? arguments[2] : false; (0, _classCallCheck3.default)(this, Connection); /** * An ordered queue of observers, each exchange response (zero or more * RECORD messages followed by a SUCCESS message) we receive will be routed * to the next pending observer. */ this.hostPort = hostPort; this.server = { address: hostPort }; this.creationTimestamp = Date.now(); this._disableLosslessIntegers = disableLosslessIntegers; this._pendingObservers = []; this._currentObserver = undefined; this._ch = channel; this._dechunker = new _chunking.Dechunker(); this._chunker = new _chunking.Chunker(channel); // initially assume that database supports latest Bolt version, create latest packer and unpacker this._packer = _packstreamUtil2.default.createLatestPacker(this._chunker); this._unpacker = _packstreamUtil2.default.createLatestUnpacker(disableLosslessIntegers); this._ackFailureMuted = false; this._currentFailure = null; this._state = new ConnectionState(this); // Set to true on fatal errors, to get this out of session pool. this._isBroken = false; // TODO: Using `onmessage` and `onerror` came from the WebSocket API, // it reads poorly and has several annoying drawbacks. Swap to having // Channel extend EventEmitter instead, then we can use `on('data',..)` this._ch.onmessage = function (buf) { var proposed = buf.readInt32(); if (proposed == 1 || proposed == 2) { _this._initializeProtocol(proposed, buf); } else if (proposed == 1213486160) { //server responded 1213486160 == 0x48545450 == "HTTP" _this._handleFatalError((0, _error.newError)('Server responded HTTP. Make sure you are not trying to connect to the http endpoint ' + '(HTTP defaults to port 7474 whereas BOLT defaults to port 7687)')); } else { _this._handleFatalError((0, _error.newError)('Unknown Bolt protocol version: ' + proposed)); } }; // Listen to connection errors. Important note though; // In some cases we will get a channel that is already broken (for instance, // if the user passes invalid configuration options). In this case, onerror // will have "already triggered" before we add out listener here. So the line // below also checks that the channel is not already failed. This could be nicely // encapsulated into Channel if we used `on('error', ..)` rather than `onerror=..` // as outlined in the comment about `onmessage` further up in this file. this._ch.onerror = this._handleFatalError.bind(this); if (this._ch._error) { this._handleFatalError(this._ch._error); } this._dechunker.onmessage = function (buf) { _this._handleMessage(_this._unpacker.unpack(buf)); }; var handshake = (0, _buf.alloc)(5 * 4); //magic preamble handshake.writeInt32(MAGIC_PREAMBLE); //proposed versions handshake.writeInt32(2); handshake.writeInt32(1); handshake.writeInt32(0); handshake.writeInt32(0); handshake.reset(); this._ch.write(handshake); } /** * Complete protocol initialization. * @param {number} version the selected protocol version. * @param {BaseBuffer} buffer the handshake response buffer. * @private */ (0, _createClass3.default)(Connection, [{ key: '_initializeProtocol', value: function _initializeProtocol(version, buffer) { var _this2 = this; // re-create packer and unpacker because version might be lower than we initially assumed this._packer = _packstreamUtil2.default.createPackerForProtocolVersion(version, this._chunker); this._unpacker = _packstreamUtil2.default.createUnpackerForProtocolVersion(version, this._disableLosslessIntegers); // Ok, protocol running. Simply forward all messages to the dechunker this._ch.onmessage = function (buf) { return _this2._dechunker.write(buf); }; if (buffer.hasRemaining()) { this._dechunker.write(buffer.readSlice(buffer.remaining())); } } /** * "Fatal" means the connection is dead. Only call this if something * happens that cannot be recovered from. This will lead to all subscribers * failing, and the connection getting ejected from the session pool. * * @param err an error object, forwarded to all current and future subscribers * @protected */ }, { key: '_handleFatalError', value: function _handleFatalError(err) { this._isBroken = true; this._error = err; if (this._currentObserver && this._currentObserver.onError) { this._currentObserver.onError(err); } while (this._pendingObservers.length > 0) { var observer = this._pendingObservers.shift(); if (observer && observer.onError) { observer.onError(err); } } } }, { key: '_handleMessage', value: function _handleMessage(msg) { if (this._isBroken) { // ignore all incoming messages when this connection is broken. all previously pending observers failed // with the fatal error. all future observers will fail with same fatal error. return; } var payload = msg.fields[0]; switch (msg.signature) { case RECORD: log("S", "RECORD", msg); this._currentObserver.onNext(payload); break; case SUCCESS: log("S", "SUCCESS", msg); try { this._currentObserver.onCompleted(payload); } finally { this._updateCurrentObserver(); } break; case FAILURE: log("S", "FAILURE", msg); try { this._currentFailure = (0, _error.newError)(payload.message, payload.code); this._currentObserver.onError(this._currentFailure); } finally { this._updateCurrentObserver(); // Things are now broken. Pending observers will get FAILURE messages routed until we are done handling this failure. this._ackFailureIfNeeded(); } break; case IGNORED: log("S", "IGNORED", msg); try { if (this._currentFailure && this._currentObserver.onError) this._currentObserver.onError(this._currentFailure);else if (this._currentObserver.onError) this._currentObserver.onError((0, _error.newError)('Ignored either because of an error or RESET')); } finally { this._updateCurrentObserver(); } break; default: this._handleFatalError((0, _error.newError)("Unknown Bolt protocol message: " + msg)); } } /** Queue an INIT-message to be sent to the database */ }, { key: 'initialize', value: function initialize(clientName, token, observer) { var _this3 = this; log("C", "INIT", clientName, token); var initObserver = this._state.wrap(observer); var queued = this._queueObserver(initObserver); if (queued) { this._packer.packStruct(INIT, [this._packable(clientName), this._packable(token)], function (err) { return _this3._handleFatalError(err); }); this._chunker.messageBoundary(); this.sync(); } } /** Queue a RUN-message to be sent to the database */ }, { key: 'run', value: function run(statement, params, observer) { var _this4 = this; log("C", "RUN", statement, params); var queued = this._queueObserver(observer); if (queued) { this._packer.packStruct(RUN, [this._packable(statement), this._packable(params)], function (err) { return _this4._handleFatalError(err); }); this._chunker.messageBoundary(); } } /** Queue a PULL_ALL-message to be sent to the database */ }, { key: 'pullAll', value: function pullAll(observer) { var _this5 = this; log("C", "PULL_ALL"); var queued = this._queueObserver(observer); if (queued) { this._packer.packStruct(PULL_ALL, [], function (err) { return _this5._handleFatalError(err); }); this._chunker.messageBoundary(); } } /** Queue a DISCARD_ALL-message to be sent to the database */ }, { key: 'discardAll', value: function discardAll(observer) { var _this6 = this; log("C", "DISCARD_ALL"); var queued = this._queueObserver(observer); if (queued) { this._packer.packStruct(DISCARD_ALL, [], function (err) { return _this6._handleFatalError(err); }); this._chunker.messageBoundary(); } } /** * Send a RESET-message to the database. Mutes failure handling. * Message is immediately flushed to the network. Separate {@link Connection#sync()} call is not required. * @return {Promise} promise resolved when SUCCESS-message response arrives, or failed when other response messages arrives. */ }, { key: 'resetAndFlush', value: function resetAndFlush() { var _this7 = this; log('C', 'RESET'); this._ackFailureMuted = true; return new _promise2.default(function (resolve, reject) { var observer = { onNext: function onNext(record) { var neo4jError = _this7._handleProtocolError('Received RECORD as a response for RESET: ' + (0, _stringify2.default)(record)); reject(neo4jError); }, onError: function onError(error) { if (_this7._isBroken) { // handling a fatal error, no need to raise a protocol violation reject(error); } else { var neo4jError = _this7._handleProtocolError('Received FAILURE as a response for RESET: ' + error); reject(neo4jError); } }, onCompleted: function onCompleted() { _this7._ackFailureMuted = false; resolve(); } }; var queued = _this7._queueObserver(observer); if (queued) { _this7._packer.packStruct(RESET, [], function (err) { return _this7._handleFatalError(err); }); _this7._chunker.messageBoundary(); _this7.sync(); } }); } }, { key: '_ackFailureIfNeeded', value: function _ackFailureIfNeeded() { var _this8 = this; if (this._ackFailureMuted) { return; } log('C', 'ACK_FAILURE'); var observer = { onNext: function onNext(record) { _this8._handleProtocolError('Received RECORD as a response for ACK_FAILURE: ' + (0, _stringify2.default)(record)); }, onError: function onError(error) { if (!_this8._isBroken && !_this8._ackFailureMuted) { // not handling a fatal error and RESET did not cause the given error - looks like a protocol violation _this8._handleProtocolError('Received FAILURE as a response for ACK_FAILURE: ' + error); } else { _this8._currentFailure = null; } }, onCompleted: function onCompleted() { _this8._currentFailure = null; } }; var queued = this._queueObserver(observer); if (queued) { this._packer.packStruct(ACK_FAILURE, [], function (err) { return _this8._handleFatalError(err); }); this._chunker.messageBoundary(); this.sync(); } } }, { key: '_queueObserver', value: function _queueObserver(observer) { if (this._isBroken) { if (observer && observer.onError) { observer.onError(this._error); } return false; } observer = observer || NO_OP_OBSERVER; observer.onCompleted = observer.onCompleted || NO_OP; observer.onError = observer.onError || NO_OP; observer.onNext = observer.onNext || NO_OP; if (this._currentObserver === undefined) { this._currentObserver = observer; } else { this._pendingObservers.push(observer); } return true; } /** * Get promise resolved when connection initialization succeed or rejected when it fails. * Connection is initialized using {@link initialize} function. * @return {Promise} the result of connection initialization. */ }, { key: 'initializationCompleted', value: function initializationCompleted() { return this._state.initializationCompleted(); } /* * Pop next pending observer form the list of observers and make it current observer. * @protected */ }, { key: '_updateCurrentObserver', value: function _updateCurrentObserver() { this._currentObserver = this._pendingObservers.shift(); } /** * Synchronize - flush all queued outgoing messages and route their responses * to their respective handlers. */ }, { key: 'sync', value: function sync() { this._chunker.flush(); } /** Check if this connection is in working condition */ }, { key: 'isOpen', value: function isOpen() { return !this._isBroken && this._ch._open; } }, { key: 'isEncrypted', value: function isEncrypted() { return this._ch.isEncrypted(); } /** * Call close on the channel. * @param {function} cb - Function to call on close. */ }, { key: 'close', value: function close(cb) { this._ch.close(cb); } }, { key: '_packable', value: function _packable(value) { var _this9 = this; return this._packer.packable(value, function (err) { return _this9._handleFatalError(err); }); } /** * @protected */ }, { key: '_markInitialized', value: function _markInitialized(metadata) { var serverVersion = metadata ? metadata.server : null; if (!this.server.version) { this.server.version = serverVersion; var version = _serverVersion.ServerVersion.fromString(serverVersion); if (version.compareTo(_serverVersion.VERSION_3_2_0) < 0) { this._packer.disableByteArrays(); } } } }, { key: '_handleProtocolError', value: function _handleProtocolError(message) { this._ackFailureMuted = false; this._currentFailure = null; this._updateCurrentObserver(); var error = (0, _error.newError)(message, _error.PROTOCOL_ERROR); this._handleFatalError(error); return error; } }]); return Connection; }(); var ConnectionState = function () { /** * @constructor * @param {Connection} connection the connection to track state for. */ function ConnectionState(connection) { var _this10 = this; (0, _classCallCheck3.default)(this, ConnectionState); this._connection = connection; this._initRequested = false; this._initError = null; this._resolveInitPromise = null; this._rejectInitPromise = null; this._initPromise = new _promise2.default(function (resolve, reject) { _this10._resolveInitPromise = resolve; _this10._rejectInitPromise = reject; }); } /** * Wrap the given observer to track connection's initialization state. Connection is closed by the server if * processing of INIT message fails so returned observer will handle initialization failure as a fatal error. * @param {StreamObserver} observer the observer used for INIT message. * @return {StreamObserver} updated observer. */ (0, _createClass3.default)(ConnectionState, [{ key: 'wrap', value: function wrap(observer) { var _this11 = this; return { onNext: function onNext(record) { if (observer && observer.onNext) { observer.onNext(record); } }, onError: function onError(error) { _this11._processFailure(error); _this11._connection._updateCurrentObserver(); // make sure this same observer will not be called again try { if (observer && observer.onError) { observer.onError(error); } } finally { _this11._connection._handleFatalError(error); } }, onCompleted: function onCompleted(metaData) { _this11._connection._markInitialized(metaData); _this11._resolveInitPromise(_this11._connection); if (observer && observer.onCompleted) { observer.onCompleted(metaData); } } }; } /** * Get promise resolved when connection initialization succeed or rejected when it fails. * @return {Promise} the result of connection initialization. */ }, { key: 'initializationCompleted', value: function initializationCompleted() { this._initRequested = true; if (this._initError) { var error = this._initError; this._initError = null; // to reject initPromise only once this._rejectInitPromise(error); } return this._initPromise; } /** * @private */ }, { key: '_processFailure', value: function _processFailure(error) { if (this._initRequested) { // someone is waiting for initialization to complete, reject the promise this._rejectInitPromise(error); } else { // no one is waiting for initialization, memorize the error but do not reject the promise // to avoid unnecessary unhandled promise rejection warnings this._initError = error; } } }]); return ConnectionState; }(); /** * Crete new connection to the provided address. * @access private * @param {string} hostPort - the Bolt endpoint to connect to * @param {object} config - this driver configuration * @param {string=null} connectionErrorCode - error code for errors raised on connection errors * @return {Connection} - New connection */ function connect(hostPort) { var config = arguments.length > 1 && arguments[1] !== undefined ? arguments[1] : {}; var connectionErrorCode = arguments.length > 2 && arguments[2] !== undefined ? arguments[2] : null; var Ch = config.channel || Channel; var parsedAddress = _urlUtil2.default.parseDatabaseUrl(hostPort); var channelConfig = new _chConfig2.default(parsedAddress, config, connectionErrorCode); return new Connection(new Ch(channelConfig), parsedAddress.hostAndPort, config.disableLosslessIntegers); } exports.connect = connect; exports.Connection = Connection;