diff options
author | galaxycrab <UgnineSirdis@ydb.tech> | 2023-11-23 11:26:33 +0300 |
---|---|---|
committer | galaxycrab <UgnineSirdis@ydb.tech> | 2023-11-23 12:01:57 +0300 |
commit | 44354d0fc55926c1d4510d1d2c9c9f6a1a5e9300 (patch) | |
tree | cb4d75cd1c6dbc3da0ed927337fd8d1b6ed9da84 /contrib/libs/libpqxx/src/transaction_base.cxx | |
parent | 0e69bf615395fdd48ecee032faaec81bc468b0b8 (diff) | |
download | ydb-44354d0fc55926c1d4510d1d2c9c9f6a1a5e9300.tar.gz |
YQ Connector:test INNER JOIN
Diffstat (limited to 'contrib/libs/libpqxx/src/transaction_base.cxx')
-rw-r--r-- | contrib/libs/libpqxx/src/transaction_base.cxx | 577 |
1 files changed, 577 insertions, 0 deletions
diff --git a/contrib/libs/libpqxx/src/transaction_base.cxx b/contrib/libs/libpqxx/src/transaction_base.cxx new file mode 100644 index 0000000000..84a7ab7e1b --- /dev/null +++ b/contrib/libs/libpqxx/src/transaction_base.cxx @@ -0,0 +1,577 @@ +/** Common code and definitions for the transaction classes. + * + * pqxx::transaction_base defines the interface for any abstract class that + * represents a database transaction. + * + * Copyright (c) 2000-2019, Jeroen T. Vermeulen. + * + * See COPYING for copyright license. If you did not receive a file called + * COPYING with this source code, please notify the distributor of this mistake, + * or contact the author. + */ +#include "pqxx/compiler-internal.hxx" + +#include <cstring> +#include <stdexcept> + +#include "pqxx/connection_base" +#include "pqxx/result" +#include "pqxx/transaction_base" + +#include "pqxx/internal/gates/connection-transaction.hxx" +#include "pqxx/internal/gates/connection-parameterized_invocation.hxx" +#include "pqxx/internal/gates/transaction-transactionfocus.hxx" + +#include "pqxx/internal/encodings.hxx" + + +using namespace pqxx::internal; + + +pqxx::internal::parameterized_invocation::parameterized_invocation( + connection_base &c, + const std::string &query) : + m_home{c}, + m_query{query} +{ +} + + +pqxx::result pqxx::internal::parameterized_invocation::exec() +{ + std::vector<const char *> values; + std::vector<int> lengths; + std::vector<int> binaries; + const int elements = marshall(values, lengths, binaries); + + return gate::connection_parameterized_invocation{m_home}.parameterized_exec( + m_query, + values.data(), + lengths.data(), + binaries.data(), + elements); +} + + +pqxx::transaction_base::transaction_base(connection_base &C, bool direct) : + namedclass{"transaction_base"}, + m_conn{C} +{ + if (direct) + { + gate::connection_transaction gate{conn()}; + gate.register_transaction(this); + m_registered = true; + } +} + + +pqxx::transaction_base::~transaction_base() +{ + try + { + reactivation_avoidance_clear(); + if (not m_pending_error.empty()) + process_notice("UNPROCESSED ERROR: " + m_pending_error + "\n"); + + if (m_registered) + { + m_conn.process_notice(description() + " was never closed properly!\n"); + gate::connection_transaction gate{conn()}; + gate.unregister_transaction(this); + } + } + catch (const std::exception &e) + { + try + { + process_notice(std::string{e.what()} + "\n"); + } + catch (const std::exception &) + { + process_notice(e.what()); + } + } +} + + +void pqxx::transaction_base::commit() +{ + CheckPendingError(); + + // Check previous status code. Caller should only call this function if + // we're in "implicit" state, but multiple commits are silently accepted. + switch (m_status) + { + case st_nascent: // Empty transaction. No skin off our nose. + return; + + case st_active: // Just fine. This is what we expect. + break; + + case st_aborted: + throw usage_error{"Attempt to commit previously aborted " + description()}; + + case st_committed: + // Transaction has been committed already. This is not exactly proper + // behaviour, but throwing an exception here would only give the impression + // that an abort is needed--which would only confuse things further at this + // stage. + // Therefore, multiple commits are accepted, though under protest. + m_conn.process_notice(description() + " committed more than once.\n"); + return; + + case st_in_doubt: + // Transaction may or may not have been committed. The only thing we can + // really do is keep telling the caller that the transaction is in doubt. + throw in_doubt_error{ + description() + " committed again while in an indeterminate state."}; + + default: + throw internal_error{"pqxx::transaction: invalid status code."}; + } + + // Tricky one. If stream is nested in transaction but inside the same scope, + // the commit() will come before the stream is closed. Which means the + // commit is premature. Punish this swiftly and without fail to discourage + // the habit from forming. + if (m_focus.get()) + throw failure{ + "Attempt to commit " + description() + " with " + + m_focus.get()->description() + " still open."}; + + // Check that we're still connected (as far as we know--this is not an + // absolute thing!) before trying to commit. If the connection was broken + // already, the commit would fail anyway but this way at least we don't remain + // in-doubt as to whether the backend got the commit order at all. + if (not m_conn.is_open()) + throw broken_connection{ + "Broken connection to backend; cannot complete transaction."}; + + try + { + do_commit(); + m_status = st_committed; + } + catch (const in_doubt_error &) + { + m_status = st_in_doubt; + throw; + } + catch (const std::exception &) + { + m_status = st_aborted; + throw; + } + + gate::connection_transaction gate{conn()}; + gate.add_variables(m_vars); + + End(); +} + + +void pqxx::transaction_base::abort() +{ + // Check previous status code. Quietly accept multiple aborts to + // simplify emergency bailout code. + switch (m_status) + { + case st_nascent: // Never began transaction. No need to issue rollback. + break; + + case st_active: + try { do_abort(); } catch (const std::exception &) { } + break; + + case st_aborted: + return; + + case st_committed: + throw usage_error{"Attempt to abort previously committed " + description()}; + + case st_in_doubt: + // Aborting an in-doubt transaction is probably a reasonably sane response + // to an insane situation. Log it, but do not complain. + m_conn.process_notice( + "Warning: " + description() + " aborted after going into " + "indeterminate state; it may have been executed anyway.\n"); + return; + + default: + throw internal_error{"Invalid transaction status."}; + } + + m_status = st_aborted; + End(); +} + + +std::string pqxx::transaction_base::esc_raw(const std::string &str) const +{ + const unsigned char *p = reinterpret_cast<const unsigned char *>(str.c_str()); + return conn().esc_raw(p, str.size()); +} + + +std::string pqxx::transaction_base::quote_raw(const std::string &str) const +{ + const unsigned char *p = reinterpret_cast<const unsigned char *>(str.c_str()); + return conn().quote_raw(p, str.size()); +} + + +void pqxx::transaction_base::activate() +{ + switch (m_status) + { + case st_nascent: + // Make sure transaction has begun before executing anything + Begin(); + break; + + case st_active: + break; + + case st_committed: + case st_aborted: + case st_in_doubt: + throw usage_error{ + "Attempt to activate " + description() + " " + "which is already closed."}; + + default: + throw internal_error{"pqxx::transaction: invalid status code."}; + } +} + + +pqxx::result pqxx::transaction_base::exec( + const std::string &Query, + const std::string &Desc) +{ + CheckPendingError(); + + const std::string N = (Desc.empty() ? "" : "'" + Desc + "' "); + + if (m_focus.get()) + throw usage_error{ + "Attempt to execute query " + N + + "on " + description() + " " + "with " + m_focus.get()->description() + " still open."}; + + try + { + activate(); + } + catch (const usage_error &e) + { + throw usage_error{"Error executing query " + N + ". " + e.what()}; + } + + // TODO: Pass Desc to do_exec(), and from there on down + return do_exec(Query.c_str()); +} + + +pqxx::result pqxx::transaction_base::exec_n( + size_t rows, + const std::string &Query, + const std::string &Desc) +{ + const result r = exec(Query, Desc); + if (r.size() != rows) + { + const std::string N = (Desc.empty() ? "" : "'" + Desc + "'"); + throw unexpected_rows{ + "Expected " + to_string(rows) + " row(s) of data " + "from query " + N + ", got " + to_string(r.size()) + "."}; + } + return r; +} + + +void pqxx::transaction_base::check_rowcount_prepared( + const std::string &statement, + size_t expected_rows, + size_t actual_rows) +{ + if (actual_rows != expected_rows) + { + throw unexpected_rows{ + "Expected " + to_string(expected_rows) + " row(s) of data " + "from prepared statement '" + statement + "', got " + + to_string(actual_rows) + "."}; + } +} + + +void pqxx::transaction_base::check_rowcount_params( + size_t expected_rows, + size_t actual_rows) +{ + if (actual_rows != expected_rows) + { + throw unexpected_rows{ + "Expected " + to_string(expected_rows) + " row(s) of data " + "from parameterised query, got " + to_string(actual_rows) + "."}; + } +} + + +pqxx::internal::parameterized_invocation +pqxx::transaction_base::parameterized(const std::string &query) +{ +#include "pqxx/internal/ignore-deprecated-pre.hxx" + return internal::parameterized_invocation{conn(), query}; +#include "pqxx/internal/ignore-deprecated-post.hxx" +} + + +pqxx::prepare::invocation +pqxx::transaction_base::prepared(const std::string &statement) +{ + try + { + activate(); + } + catch (const usage_error &e) + { + throw usage_error{ + "Error executing prepared statement " + statement + ". " + e.what()}; + } +#include "pqxx/internal/ignore-deprecated-pre.hxx" + return prepare::invocation{*this, statement}; +#include "pqxx/internal/ignore-deprecated-post.hxx" +} + + +pqxx::result pqxx::transaction_base::internal_exec_prepared( + const std::string &statement, + const internal::params &args, + result_format format) +{ + gate::connection_transaction gate{conn()}; + return gate.exec_prepared(statement, args, format); +} + + +pqxx::result pqxx::transaction_base::internal_exec_params( + const std::string &query, + const internal::params &args) +{ + gate::connection_transaction gate{conn()}; + return gate.exec_params(query, args); +} + + +void pqxx::transaction_base::set_variable( + const std::string &Var, + const std::string &Value) +{ + // Before committing to this new value, see what the backend thinks about it + gate::connection_transaction gate{conn()}; + gate.raw_set_var(Var, Value); + m_vars[Var] = Value; +} + + +std::string pqxx::transaction_base::get_variable(const std::string &Var) +{ + const std::map<std::string,std::string>::const_iterator i = m_vars.find(Var); + if (i != m_vars.end()) return i->second; + return gate::connection_transaction{conn()}.raw_get_var(Var); +} + + +void pqxx::transaction_base::Begin() +{ + if (m_status != st_nascent) + throw internal_error{ + "pqxx::transaction: Begin() called while not in nascent state."}; + + try + { + // Better handle any pending notifications before we begin + m_conn.get_notifs(); + + do_begin(); + m_status = st_active; + } + catch (const std::exception &) + { + End(); + throw; + } +} + + +void pqxx::transaction_base::End() noexcept +{ + try + { + try { CheckPendingError(); } + catch (const std::exception &e) { m_conn.process_notice(e.what()); } + + gate::connection_transaction gate{conn()}; + if (m_registered) + { + m_registered = false; + gate.unregister_transaction(this); + } + + if (m_status != st_active) return; + + if (m_focus.get()) + m_conn.process_notice( + "Closing " + description() + " with " + + m_focus.get()->description() + " still open.\n"); + + try { abort(); } + catch (const std::exception &e) { m_conn.process_notice(e.what()); } + + gate.take_reactivation_avoidance(m_reactivation_avoidance.get()); + m_reactivation_avoidance.clear(); + } + catch (const std::exception &e) + { + try { m_conn.process_notice(e.what()); } catch (const std::exception &) {} + } +} + + +void pqxx::transaction_base::register_focus(internal::transactionfocus *S) +{ + m_focus.register_guest(S); +} + + +void pqxx::transaction_base::unregister_focus(internal::transactionfocus *S) + noexcept +{ + try + { + m_focus.unregister_guest(S); + } + catch (const std::exception &e) + { + m_conn.process_notice(std::string{e.what()} + "\n"); + } +} + + +pqxx::result pqxx::transaction_base::direct_exec(const char C[], int Retries) +{ + CheckPendingError(); + return gate::connection_transaction{conn()}.exec(C, Retries); +} + + +void pqxx::transaction_base::register_pending_error(const std::string &Err) + noexcept +{ + if (m_pending_error.empty() and not Err.empty()) + { + try + { + m_pending_error = Err; + } + catch (const std::exception &e) + { + try + { + process_notice("UNABLE TO PROCESS ERROR\n"); + process_notice(e.what()); + process_notice("ERROR WAS:"); + process_notice(Err); + } + catch (...) + { + } + } + } +} + + +void pqxx::transaction_base::CheckPendingError() +{ + if (not m_pending_error.empty()) + { + const std::string Err{m_pending_error}; + m_pending_error.clear(); + throw failure{Err}; + } +} + + +namespace +{ +std::string MakeCopyString( + const std::string &Table, + const std::string &Columns) +{ + std::string Q = "COPY " + Table + " "; + if (not Columns.empty()) Q += "(" + Columns + ") "; + return Q; +} +} // namespace + + +void pqxx::transaction_base::BeginCopyRead( + const std::string &Table, + const std::string &Columns) +{ + exec(MakeCopyString(Table, Columns) + "TO STDOUT"); +} + + +void pqxx::transaction_base::BeginCopyWrite( + const std::string &Table, + const std::string &Columns) +{ + exec(MakeCopyString(Table, Columns) + "FROM STDIN"); +} + + +bool pqxx::transaction_base::read_copy_line(std::string &line) +{ + return gate::connection_transaction{conn()}.read_copy_line(line); +} + + +void pqxx::transaction_base::write_copy_line(const std::string &line) +{ + gate::connection_transaction gate{conn()}; + gate.write_copy_line(line); +} + + +void pqxx::transaction_base::end_copy_write() +{ + gate::connection_transaction gate{conn()}; + gate.end_copy_write(); +} + + +void pqxx::internal::transactionfocus::register_me() +{ + gate::transaction_transactionfocus gate{m_trans}; + gate.register_focus(this); + m_registered = true; +} + + +void pqxx::internal::transactionfocus::unregister_me() noexcept +{ + gate::transaction_transactionfocus gate{m_trans}; + gate.unregister_focus(this); + m_registered = false; +} + +void +pqxx::internal::transactionfocus::reg_pending_error(const std::string &err) + noexcept +{ + gate::transaction_transactionfocus gate{m_trans}; + gate.register_pending_error(err); +} |