aboutsummaryrefslogtreecommitdiffstats
path: root/yt/cpp/mapreduce/client/transaction.cpp
diff options
context:
space:
mode:
authormax42 <max42@yandex-team.com>2023-07-29 00:02:16 +0300
committermax42 <max42@yandex-team.com>2023-07-29 00:02:16 +0300
commit73b89de71748a21e102d27b9f3ed1bf658766cb5 (patch)
tree188bbd2d622fa91cdcbb1b6d6d77fbc84a0646f5 /yt/cpp/mapreduce/client/transaction.cpp
parent528e321bcc2a2b67b53aeba58c3bd88305a141ee (diff)
downloadydb-73b89de71748a21e102d27b9f3ed1bf658766cb5.tar.gz
YT-19210: expose YQL shared library for YT.
After this, a new target libyqlplugin.so appears. in open-source cmake build. Diff in open-source YDB repo looks like the following: https://paste.yandex-team.ru/f302bdb4-7ef2-4362-91c7-6ca45f329264
Diffstat (limited to 'yt/cpp/mapreduce/client/transaction.cpp')
-rw-r--r--yt/cpp/mapreduce/client/transaction.cpp195
1 files changed, 195 insertions, 0 deletions
diff --git a/yt/cpp/mapreduce/client/transaction.cpp b/yt/cpp/mapreduce/client/transaction.cpp
new file mode 100644
index 00000000000..0aa1a7a1c39
--- /dev/null
+++ b/yt/cpp/mapreduce/client/transaction.cpp
@@ -0,0 +1,195 @@
+#include "transaction.h"
+
+#include "transaction_pinger.h"
+
+#include <yt/cpp/mapreduce/interface/config.h>
+#include <yt/cpp/mapreduce/interface/error_codes.h>
+
+#include <yt/cpp/mapreduce/common/wait_proxy.h>
+#include <yt/cpp/mapreduce/common/retry_lib.h>
+
+#include <yt/cpp/mapreduce/http/requests.h>
+#include <yt/cpp/mapreduce/http/retry_request.h>
+
+#include <yt/cpp/mapreduce/raw_client/raw_requests.h>
+
+#include <util/datetime/base.h>
+
+#include <util/generic/scope.h>
+
+#include <util/random/random.h>
+
+#include <util/string/builder.h>
+
+namespace NYT {
+
+////////////////////////////////////////////////////////////////////////////////
+
+TPingableTransaction::TPingableTransaction(
+ const IClientRetryPolicyPtr& retryPolicy,
+ const TClientContext& context,
+ const TTransactionId& parentId,
+ ITransactionPingerPtr transactionPinger,
+ const TStartTransactionOptions& options)
+ : ClientRetryPolicy_(retryPolicy)
+ , Context_(context)
+ , AbortableRegistry_(NDetail::TAbortableRegistry::Get())
+ , AbortOnTermination_(true)
+ , AutoPingable_(options.AutoPingable_)
+ , Pinger_(std::move(transactionPinger))
+{
+ auto transactionId = NDetail::NRawClient::StartTransaction(
+ ClientRetryPolicy_->CreatePolicyForGenericRequest(),
+ context,
+ parentId,
+ options);
+
+ auto actualTimeout = options.Timeout_.GetOrElse(Context_.Config->TxTimeout);
+ Init(context, transactionId, actualTimeout);
+}
+
+TPingableTransaction::TPingableTransaction(
+ const IClientRetryPolicyPtr& retryPolicy,
+ const TClientContext& context,
+ const TTransactionId& transactionId,
+ ITransactionPingerPtr transactionPinger,
+ const TAttachTransactionOptions& options)
+ : ClientRetryPolicy_(retryPolicy)
+ , Context_(context)
+ , AbortableRegistry_(NDetail::TAbortableRegistry::Get())
+ , AbortOnTermination_(options.AbortOnTermination_)
+ , AutoPingable_(options.AutoPingable_)
+ , Pinger_(std::move(transactionPinger))
+{
+ auto timeoutNode = NDetail::NRawClient::TryGet(
+ ClientRetryPolicy_->CreatePolicyForGenericRequest(),
+ context,
+ TTransactionId(),
+ "#" + GetGuidAsString(transactionId) + "/@timeout",
+ TGetOptions());
+ if (timeoutNode.IsUndefined()) {
+ throw yexception() << "Transaction " << GetGuidAsString(transactionId) << " does not exist";
+ }
+ auto timeout = TDuration::MilliSeconds(timeoutNode.AsInt64());
+ Init(context, transactionId, timeout);
+}
+
+void TPingableTransaction::Init(
+ const TClientContext& context,
+ const TTransactionId& transactionId,
+ TDuration timeout)
+{
+ TransactionId_ = transactionId;
+
+ if (AbortOnTermination_) {
+ AbortableRegistry_->Add(
+ TransactionId_,
+ ::MakeIntrusive<NDetail::TTransactionAbortable>(context, TransactionId_));
+ }
+
+ if (AutoPingable_) {
+ // Compute 'MaxPingInterval_' and 'MinPingInterval_' such that 'pingInterval == (max + min) / 2'.
+ auto pingInterval = Context_.Config->PingInterval;
+ auto safeTimeout = timeout - TDuration::Seconds(5);
+ MaxPingInterval_ = Max(pingInterval, Min(safeTimeout, pingInterval * 1.5));
+ MinPingInterval_ = pingInterval - (MaxPingInterval_ - pingInterval);
+
+ Pinger_->RegisterTransaction(*this);
+ }
+}
+
+TPingableTransaction::~TPingableTransaction()
+{
+ try {
+ Stop(AbortOnTermination_ ? EStopAction::Abort : EStopAction::Detach);
+ } catch (...) {
+ }
+}
+
+const TTransactionId TPingableTransaction::GetId() const
+{
+ return TransactionId_;
+}
+
+const std::pair<TDuration, TDuration> TPingableTransaction::GetPingInterval() const {
+ return {MinPingInterval_, MaxPingInterval_};
+}
+
+const TClientContext TPingableTransaction::GetContext() const {
+ return Context_;
+}
+
+void TPingableTransaction::Commit()
+{
+ Stop(EStopAction::Commit);
+}
+
+void TPingableTransaction::Abort()
+{
+ Stop(EStopAction::Abort);
+}
+
+void TPingableTransaction::Detach()
+{
+ Stop(EStopAction::Detach);
+}
+
+void TPingableTransaction::Stop(EStopAction action)
+{
+ if (Finalized_) {
+ return;
+ }
+
+ Y_DEFER {
+ Finalized_ = true;
+ if (AutoPingable_ && Pinger_->HasTransaction(*this)) {
+ Pinger_->RemoveTransaction(*this);
+ }
+ };
+
+ switch (action) {
+ case EStopAction::Commit:
+ NDetail::NRawClient::CommitTransaction(
+ ClientRetryPolicy_->CreatePolicyForGenericRequest(),
+ Context_,
+ TransactionId_);
+ break;
+ case EStopAction::Abort:
+ NDetail::NRawClient::AbortTransaction(
+ ClientRetryPolicy_->CreatePolicyForGenericRequest(),
+ Context_,
+ TransactionId_);
+ break;
+ case EStopAction::Detach:
+ // Do nothing.
+ break;
+ }
+
+ AbortableRegistry_->Remove(TransactionId_);
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+TYPath Snapshot(
+ const IClientRetryPolicyPtr& clientRetryPolicy,
+ const TClientContext& context,
+ const TTransactionId& transactionId,
+ const TYPath& path)
+{
+ auto lockId = NDetail::NRawClient::Lock(
+ clientRetryPolicy->CreatePolicyForGenericRequest(),
+ context,
+ transactionId,
+ path,
+ ELockMode::LM_SNAPSHOT);
+ auto lockedNodeId = NDetail::NRawClient::Get(
+ clientRetryPolicy->CreatePolicyForGenericRequest(),
+ context,
+ transactionId,
+ ::TStringBuilder() << '#' << GetGuidAsString(lockId) << "/@node_id");
+ return "#" + lockedNodeId.AsString();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT