'use strict'; Object.defineProperty(exports, "__esModule", { value: true }); exports.Dechunker = exports.Chunker = undefined; var _getPrototypeOf = require('babel-runtime/core-js/object/get-prototype-of'); var _getPrototypeOf2 = _interopRequireDefault(_getPrototypeOf); var _classCallCheck2 = require('babel-runtime/helpers/classCallCheck'); var _classCallCheck3 = _interopRequireDefault(_classCallCheck2); var _createClass2 = require('babel-runtime/helpers/createClass'); var _createClass3 = _interopRequireDefault(_createClass2); var _possibleConstructorReturn2 = require('babel-runtime/helpers/possibleConstructorReturn'); var _possibleConstructorReturn3 = _interopRequireDefault(_possibleConstructorReturn2); var _inherits2 = require('babel-runtime/helpers/inherits'); var _inherits3 = _interopRequireDefault(_inherits2); var _buf = require('./buf'); function _interopRequireDefault(obj) { return obj && obj.__esModule ? obj : { default: obj }; } var _CHUNK_HEADER_SIZE = 2, _MESSAGE_BOUNDARY = 0x00, _DEFAULT_BUFFER_SIZE = 1400; // http://stackoverflow.com/questions/2613734/maximum-packet-size-for-a-tcp-connection /** * Looks like a writable buffer, chunks output transparently into a channel below. * @access private */ /** * Copyright (c) 2002-2018 "Neo4j," * Neo4j Sweden AB [http://neo4j.com] * * This file is part of Neo4j. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ var Chunker = function (_BaseBuffer) { (0, _inherits3.default)(Chunker, _BaseBuffer); function Chunker(channel, bufferSize) { (0, _classCallCheck3.default)(this, Chunker); var _this = (0, _possibleConstructorReturn3.default)(this, (Chunker.__proto__ || (0, _getPrototypeOf2.default)(Chunker)).call(this, 0)); _this._bufferSize = bufferSize || _DEFAULT_BUFFER_SIZE; _this._ch = channel; _this._buffer = (0, _buf.alloc)(_this._bufferSize); _this._currentChunkStart = 0; _this._chunkOpen = false; return _this; } (0, _createClass3.default)(Chunker, [{ key: 'putUInt8', value: function putUInt8(position, val) { this._ensure(1); this._buffer.writeUInt8(val); } }, { key: 'putInt8', value: function putInt8(position, val) { this._ensure(1); this._buffer.writeInt8(val); } }, { key: 'putFloat64', value: function putFloat64(position, val) { this._ensure(8); this._buffer.writeFloat64(val); } }, { key: 'putBytes', value: function putBytes(position, data) { // TODO: If data is larger than our chunk size or so, we're very likely better off just passing this buffer on // rather than doing the copy here TODO: *however* note that we need some way to find out when the data has been // written (and thus the buffer can be re-used) if we take that approach while (data.remaining() > 0) { // Ensure there is an open chunk, and that it has at least one byte of space left this._ensure(1); if (this._buffer.remaining() > data.remaining()) { this._buffer.writeBytes(data); } else { this._buffer.writeBytes(data.readSlice(this._buffer.remaining())); } } return this; } }, { key: 'flush', value: function flush() { if (this._buffer.position > 0) { this._closeChunkIfOpen(); // Local copy and clear the buffer field. This ensures that the buffer is not re-released if the flush call fails var out = this._buffer; this._buffer = null; this._ch.write(out.getSlice(0, out.position)); // Alloc a new output buffer. We assume we're using NodeJS's buffer pooling under the hood here! this._buffer = (0, _buf.alloc)(this._bufferSize); this._chunkOpen = false; } return this; } /** * Bolt messages are encoded in one or more chunks, and the boundary between two messages * is encoded as a 0-length chunk, `00 00`. This inserts such a message boundary, closing * any currently open chunk as needed */ }, { key: 'messageBoundary', value: function messageBoundary() { this._closeChunkIfOpen(); if (this._buffer.remaining() < _CHUNK_HEADER_SIZE) { this.flush(); } // Write message boundary this._buffer.writeInt16(_MESSAGE_BOUNDARY); } /** Ensure at least the given size is available for writing */ }, { key: '_ensure', value: function _ensure(size) { var toWriteSize = this._chunkOpen ? size : size + _CHUNK_HEADER_SIZE; if (this._buffer.remaining() < toWriteSize) { this.flush(); } if (!this._chunkOpen) { this._currentChunkStart = this._buffer.position; this._buffer.position = this._buffer.position + _CHUNK_HEADER_SIZE; this._chunkOpen = true; } } }, { key: '_closeChunkIfOpen', value: function _closeChunkIfOpen() { if (this._chunkOpen) { var chunkSize = this._buffer.position - (this._currentChunkStart + _CHUNK_HEADER_SIZE); this._buffer.putUInt16(this._currentChunkStart, chunkSize); this._chunkOpen = false; } } }]); return Chunker; }(_buf.BaseBuffer); /** * Combines chunks until a complete message is gathered up, and then forwards that * message to an 'onmessage' listener. * @access private */ var Dechunker = function () { function Dechunker() { (0, _classCallCheck3.default)(this, Dechunker); this._currentMessage = []; this._partialChunkHeader = 0; this._state = this.AWAITING_CHUNK; } (0, _createClass3.default)(Dechunker, [{ key: 'AWAITING_CHUNK', value: function AWAITING_CHUNK(buf) { if (buf.remaining() >= 2) { // Whole header available, read that return this._onHeader(buf.readUInt16()); } else { // Only one byte available, read that and wait for the second byte this._partialChunkHeader = buf.readUInt8() << 8; return this.IN_HEADER; } } }, { key: 'IN_HEADER', value: function IN_HEADER(buf) { // First header byte read, now we read the next one return this._onHeader((this._partialChunkHeader | buf.readUInt8()) & 0xFFFF); } }, { key: 'IN_CHUNK', value: function IN_CHUNK(buf) { if (this._chunkSize <= buf.remaining()) { // Current packet is larger than current chunk, or same size: this._currentMessage.push(buf.readSlice(this._chunkSize)); return this.AWAITING_CHUNK; } else { // Current packet is smaller than the chunk we're reading, split the current chunk itself up this._chunkSize -= buf.remaining(); this._currentMessage.push(buf.readSlice(buf.remaining())); return this.IN_CHUNK; } } }, { key: 'CLOSED', value: function CLOSED(buf) {} // no-op /** Called when a complete chunk header has been received */ }, { key: '_onHeader', value: function _onHeader(header) { if (header == 0) { // Message boundary var message = void 0; if (this._currentMessage.length == 1) { message = this._currentMessage[0]; } else { message = new _buf.CombinedBuffer(this._currentMessage); } this._currentMessage = []; this.onmessage(message); return this.AWAITING_CHUNK; } else { this._chunkSize = header; return this.IN_CHUNK; } } }, { key: 'write', value: function write(buf) { while (buf.hasRemaining()) { this._state = this._state(buf); } } }]); return Dechunker; }(); exports.Chunker = Chunker; exports.Dechunker = Dechunker;