/**
 * Copyright (c) 2002-2018 "Neo4j,"
 * Neo4j Sweden AB [http://neo4j.com]
 *
 * This file is part of Neo4j.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
'use strict';

Object.defineProperty(exports, "__esModule", { value: true });

var _getPrototypeOf = require('babel-runtime/core-js/object/get-prototype-of');
var _getPrototypeOf2 = _interopRequireDefault(_getPrototypeOf);

var _possibleConstructorReturn2 = require('babel-runtime/helpers/possibleConstructorReturn');
var _possibleConstructorReturn3 = _interopRequireDefault(_possibleConstructorReturn2);

var _get2 = require('babel-runtime/helpers/get');
var _get3 = _interopRequireDefault(_get2);

var _inherits2 = require('babel-runtime/helpers/inherits');
var _inherits3 = _interopRequireDefault(_inherits2);

var _promise = require('babel-runtime/core-js/promise');
var _promise2 = _interopRequireDefault(_promise);

var _classCallCheck2 = require('babel-runtime/helpers/classCallCheck');
var _classCallCheck3 = _interopRequireDefault(_classCallCheck2);

var _createClass2 = require('babel-runtime/helpers/createClass');
var _createClass3 = _interopRequireDefault(_createClass2);

var _streamObserver = require('./internal/stream-observer');
var _streamObserver2 = _interopRequireDefault(_streamObserver);

var _result = require('./result');
var _result2 = _interopRequireDefault(_result);

var _util = require('./internal/util');

var _connectionHolder = require('./internal/connection-holder');

var _bookmark = require('./internal/bookmark');
var _bookmark2 = _interopRequireDefault(_bookmark);

/**
 * Babel interop helper: returns ES-module namespaces untouched and wraps
 * anything else as `{ default: obj }`.
 * @param {*} obj - the value returned by `require`.
 * @return {Object} an object exposing the module under `default`.
 * @private
 */
function _interopRequireDefault(obj) {
  return obj && obj.__esModule ? obj : { default: obj };
}

/**
 * Represents a transaction in the Neo4j database.
 *
 * @access public
 */
var Transaction = function () {
  /**
   * Stores the connection holder, queues `BEGIN` (run + pullAll, without a sync call)
   * on the acquired connection and marks the transaction as ACTIVE.
   *
   * @constructor
   * @param {ConnectionHolder} connectionHolder - the connection holder to get connection from.
   * @param {function()} onClose - Function to be called when transaction is committed or rolled back.
   * @param {function(error: Error): Error} errorTransformer callback use to transform error.
   * @param {Bookmark} bookmark bookmark for transaction begin.
   * @param {function(bookmark: Bookmark)} onBookmark callback invoked when new bookmark is produced.
   */
  function Transaction(connectionHolder, onClose, errorTransformer, bookmark, onBookmark) {
    (0, _classCallCheck3.default)(this, Transaction);

    this._connectionHolder = connectionHolder;

    // NOTE(review): this observer is constructed before `this._errorTransformer` is
    // assigned below, and the observer reads `tx._errorTransformer` in its constructor,
    // so the BEGIN observer falls back to the identity transformer. Confirm whether
    // that is intended before reordering.
    var beginObserver = new _TransactionStreamObserver(this);

    this._connectionHolder.getConnection(beginObserver).then(function (conn) {
      conn.run('BEGIN', bookmark.asBeginTransactionParameters(), beginObserver);
      conn.pullAll(beginObserver);
    }).catch(function (error) {
      return beginObserver.onError(error);
    });

    this._state = _states.ACTIVE;
    this._onClose = onClose;
    this._errorTransformer = errorTransformer;
    this._onBookmark = onBookmark;
  }

  (0, _createClass3.default)(Transaction, [{
    key: 'run',

    /**
     * Run Cypher statement
     * Could be called with a statement object i.e.: {text: "MATCH ...", parameters: {param: 1}}
     * or with the statement and parameters as separate arguments.
     * The call is delegated to the current state, so a transaction that is no longer
     * ACTIVE reports an error through the observer instead of contacting the server.
     * @param {mixed} statement - Cypher statement to execute
     * @param {Object} parameters - Map with parameters to use in statement
     * @return {Result} New Result
     */
    value: function run(statement, parameters) {
      var validated = (0, _util.validateStatementAndParameters)(statement, parameters);
      var observer = new _TransactionStreamObserver(this);

      return this._state.run(this._connectionHolder, observer, validated.query, validated.params);
    }
  }, {
    key: 'commit',

    /**
     * Commits the transaction and returns the result.
     *
     * After committing the transaction can no longer be used.
     *
     * @returns {Result} New Result
     */
    value: function commit() {
      var outcome = this._state.commit(this._connectionHolder, new _TransactionStreamObserver(this));
      this._state = outcome.state;

      // clean up
      this._onClose();

      return outcome.result;
    }
  }, {
    key: 'rollback',

    /**
     * Rollbacks the transaction.
     *
     * After rolling back, the transaction can no longer be used.
     *
     * @returns {Result} New Result
     */
    value: function rollback() {
      var outcome = this._state.rollback(this._connectionHolder, new _TransactionStreamObserver(this));
      this._state = outcome.state;

      // clean up
      this._onClose();

      return outcome.result;
    }
  }, {
    key: 'isOpen',

    /**
     * Check if this transaction is active, which means commit and rollback did not happen.
     * @return {boolean} true when not committed and not rolled back, false otherwise.
     */
    value: function isOpen() {
      // States are singleton objects, so identity comparison is what is meant here.
      return this._state === _states.ACTIVE;
    }
  }, {
    key: '_onError',

    /**
     * Moves the transaction to the FAILED state after an error was reported by one of
     * its stream observers.
     *
     * An open transaction is rolled back first (best effort; `rollback()` itself invokes
     * the `onClose` callback). A transaction that is not open is marked FAILED
     * immediately and `onClose` is invoked here.
     *
     * @return {Promise} resolved once the state has been changed to FAILED.
     * @private
     */
    value: function _onError() {
      var tx = this;

      if (!this.isOpen()) {
        // error happened in in-active transaction, just do the cleanup and change state to FAILED
        this._state = _states.FAILED;
        this._onClose();
        // no async actions needed - return resolved promise
        return _promise2.default.resolve();
      }

      // attempt to rollback, useful when Transaction#run() failed
      return this.rollback().catch(function (ignoredError) {
        // ignore all errors because it is best effort and transaction might already be rolled back
      }).then(function () {
        // after rollback attempt change this transaction's state to FAILED
        tx._state = _states.FAILED;
      });
    }
  }]);

  return Transaction;
}();

/**
 * Internal stream observer used for transactional results.
 *
 * Extends the imported StreamObserver: errors are first reported to the owning
 * transaction, and completion metadata is turned into a bookmark that is handed to the
 * transaction's `_onBookmark` callback.
 * @private
 */
var _TransactionStreamObserver = function (_StreamObserver) {
  (0, _inherits3.default)(_TransactionStreamObserver, _StreamObserver);

  /**
   * Looks up a method on the parent prototype, resolved at call time.
   * @param {_TransactionStreamObserver} self - the receiver used for the lookup.
   * @param {string} name - name of the parent method.
   * @return {function} the parent implementation (not yet bound to `self`).
   */
  function superMethod(self, name) {
    var parentProto = _TransactionStreamObserver.prototype.__proto__ ||
      (0, _getPrototypeOf2.default)(_TransactionStreamObserver.prototype);
    return (0, _get3.default)(parentProto, name, self);
  }

  /**
   * @constructor
   * @param {Transaction} tx - the transaction this observer reports to. Its
   *   `_errorTransformer` (or an identity function when it is not set) is passed to
   *   the parent constructor.
   */
  function _TransactionStreamObserver(tx) {
    (0, _classCallCheck3.default)(this, _TransactionStreamObserver);

    var parentCtor = _TransactionStreamObserver.__proto__ ||
      (0, _getPrototypeOf2.default)(_TransactionStreamObserver);
    var errorTransformer = tx._errorTransformer || function (err) {
      return err;
    };
    var self = (0, _possibleConstructorReturn3.default)(this, parentCtor.call(this, errorTransformer));

    self._tx = tx;
    // this is to avoid multiple calls to onError caused by IGNORED
    self._hasFailed = false;
    return self;
  }

  (0, _createClass3.default)(_TransactionStreamObserver, [{
    key: 'onError',

    /**
     * Lets the transaction handle the failure first, then forwards the error to the
     * parent observer and marks this observer as failed.
     *
     * The flag is only set after the transaction's `_onError()` promise resolves, so
     * calls arriving before that point each invoke `_onError()` again.
     * NOTE(review): the parent StreamObserver may consult `_hasFailed` itself; confirm
     * against its implementation before moving the assignment ahead of the parent call.
     *
     * @param {*} error - the error reported for this stream.
     */
    value: function onError(error) {
      var self = this;

      if (this._hasFailed) {
        return;
      }

      this._tx._onError().then(function () {
        superMethod(self, 'onError').call(self, error);
        self._hasFailed = true;
      });
    }
  }, {
    key: 'onCompleted',

    /**
     * Forwards completion to the parent observer, then wraps `meta.bookmark` in a
     * Bookmark and passes it to the transaction's `_onBookmark` callback.
     *
     * @param {Object} meta - completion metadata; its `bookmark` property is read.
     */
    value: function onCompleted(meta) {
      superMethod(this, 'onCompleted').call(this, meta);

      var bookmark = new _bookmark2.default(meta.bookmark);
      this._tx._onBookmark(bookmark);
    }
  }]);

  return _TransactionStreamObserver;
}(_streamObserver2.default);

/**
 * Internal state machine of the transaction.
 *
 * Every state exposes `commit`, `rollback` and `run`. `commit` and `rollback` return
 * `{result, state}` where `state` is the state the transaction should move to; `run`
 * returns the Result directly. Only ACTIVE talks to the connection; the other states
 * report an error on the observer and return a dummy Result.
 * @private
 */
var _states = {
  // The transaction is running with no explicit success or failure marked
  ACTIVE: {
    commit: function commit(connectionHolder, observer) {
      return {
        result: _runPullAll("COMMIT", connectionHolder, observer),
        state: _states.SUCCEEDED
      };
    },
    rollback: function rollback(connectionHolder, observer) {
      return {
        result: _runPullAll("ROLLBACK", connectionHolder, observer),
        state: _states.ROLLED_BACK
      };
    },
    run: function run(connectionHolder, observer, statement, parameters) {
      _sendAndSync(connectionHolder, observer, statement, parameters || {});

      // The Result receives the parameters exactly as the caller supplied them.
      return _newRunResult(observer, statement, parameters, function () {
        return observer.serverMetadata();
      });
    }
  },

  // An error has occurred, transaction can no longer be used and no more messages will
  // be sent for this transaction.
  FAILED: {
    commit: function commit(connectionHolder, observer) {
      observer.onError({
        error: "Cannot commit statements in this transaction, because previous statements in the " +
          "transaction has failed and the transaction has been rolled back. Please start a new" +
          " transaction to run another statement."
      });
      return { result: _newDummyResult(observer, "COMMIT", {}), state: _states.FAILED };
    },
    rollback: function rollback(connectionHolder, observer) {
      observer.onError({
        error: "Cannot rollback transaction, because previous statements in the " +
          "transaction has failed and the transaction has already been rolled back."
      });
      return { result: _newDummyResult(observer, "ROLLBACK", {}), state: _states.FAILED };
    },
    run: function run(connectionHolder, observer, statement, parameters) {
      observer.onError({
        error: "Cannot run statement, because previous statements in the " +
          "transaction has failed and the transaction has already been rolled back."
      });
      return _newDummyResult(observer, statement, parameters);
    }
  },

  // This transaction has successfully committed
  SUCCEEDED: {
    commit: function commit(connectionHolder, observer) {
      observer.onError({
        error: "Cannot commit statements in this transaction, because commit has already been successfully called on the transaction and transaction has been closed. Please start a new" +
          " transaction to run another statement."
      });
      return { result: _newDummyResult(observer, "COMMIT", {}), state: _states.SUCCEEDED };
    },
    rollback: function rollback(connectionHolder, observer) {
      observer.onError({
        error: "Cannot rollback transaction, because transaction has already been successfully closed."
      });
      return { result: _newDummyResult(observer, "ROLLBACK", {}), state: _states.SUCCEEDED };
    },
    run: function run(connectionHolder, observer, statement, parameters) {
      observer.onError({
        error: "Cannot run statement, because transaction has already been successfully closed."
      });
      return _newDummyResult(observer, statement, parameters);
    }
  },

  // This transaction has been rolled back
  ROLLED_BACK: {
    commit: function commit(connectionHolder, observer) {
      observer.onError({
        error: "Cannot commit this transaction, because it has already been rolled back."
      });
      return { result: _newDummyResult(observer, "COMMIT", {}), state: _states.ROLLED_BACK };
    },
    rollback: function rollback(connectionHolder, observer) {
      observer.onError({
        error: "Cannot rollback transaction, because transaction has already been rolled back."
      });
      return { result: _newDummyResult(observer, "ROLLBACK", {}), state: _states.ROLLED_BACK };
    },
    run: function run(connectionHolder, observer, statement, parameters) {
      observer.onError({
        error: "Cannot run statement, because transaction has already been rolled back."
      });
      return _newDummyResult(observer, statement, parameters);
    }
  }
};

/**
 * Acquires a connection from the holder and issues `run`, `pullAll` and `sync` for the
 * given statement. Failures to acquire the connection, or exceptions thrown while
 * issuing the calls, are routed to `observer.onError`.
 * @param {ConnectionHolder} connectionHolder - source of the connection.
 * @param {StreamObserver} observer - observer receiving the responses and errors.
 * @param {string} statement - the statement or control message to run.
 * @param {object} parameters - parameters sent along with the statement.
 * @private
 */
function _sendAndSync(connectionHolder, observer, statement, parameters) {
  connectionHolder.getConnection(observer).then(function (conn) {
    conn.run(statement, parameters, observer);
    conn.pullAll(observer);
    conn.sync();
  }).catch(function (error) {
    return observer.onError(error);
  });
}

/**
 * Sends a parameterless control message (used for "COMMIT" and "ROLLBACK") and returns
 * its Result.
 * @param {string} msg - the message to run.
 * @param {ConnectionHolder} connectionHolder - source of the connection; also handed to the Result.
 * @param {StreamObserver} observer - observer for the created result.
 * @return {Result} new result.
 * @private
 */
function _runPullAll(msg, connectionHolder, observer) {
  _sendAndSync(connectionHolder, observer, msg, {});

  // for commit & rollback we need result that uses real connection holder and notifies it when
  // connection is not needed and can be safely released to the pool
  return new _result2.default(observer, msg, {}, emptyMetadataSupplier, connectionHolder);
}

/**
 * Creates a {@link Result} with empty connection holder.
 * Should be used as a result for running cypher statements. They can result in metadata but should not
 * influence real connection holder to release connections because single transaction can have
 * {@link Transaction#run} called multiple times.
 * @param {StreamObserver} observer - an observer for the created result.
 * @param {string} statement - the cypher statement that produced the result.
 * @param {object} parameters - the parameters for cypher statement that produced the result.
 * @param {function} metadataSupplier - the function that returns a metadata object.
 * @return {Result} new result.
 * @private
 */
function _newRunResult(observer, statement, parameters, metadataSupplier) {
  return new _result2.default(
    observer, statement, parameters, metadataSupplier, _connectionHolder.EMPTY_CONNECTION_HOLDER
  );
}

/**
 * Creates a {@link Result} without metadata supplier and with empty connection holder.
 * For cases when result represents an intermediate or failed action, does not require any metadata and does not
 * need to influence real connection holder to release connections.
 * @param {StreamObserver} observer - an observer for the created result.
 * @param {string} statement - the cypher statement that produced the result.
 * @param {object} parameters - the parameters for cypher statement that produced the result.
 * @return {Result} new result.
 * @private
 */
function _newDummyResult(observer, statement, parameters) {
  return new _result2.default(
    observer, statement, parameters, emptyMetadataSupplier, _connectionHolder.EMPTY_CONNECTION_HOLDER
  );
}

/**
 * Metadata supplier for results that carry no metadata.
 * @return {Object} a new empty object on every call.
 * @private
 */
function emptyMetadataSupplier() {
  return {};
}

exports.default = Transaction;