diff options
author | max42 <max42@yandex-team.com> | 2023-07-29 00:02:16 +0300 |
---|---|---|
committer | max42 <max42@yandex-team.com> | 2023-07-29 00:02:16 +0300 |
commit | 73b89de71748a21e102d27b9f3ed1bf658766cb5 (patch) | |
tree | 188bbd2d622fa91cdcbb1b6d6d77fbc84a0646f5 /yt/cpp/mapreduce/client/transaction.cpp | |
parent | 528e321bcc2a2b67b53aeba58c3bd88305a141ee (diff) | |
download | ydb-73b89de71748a21e102d27b9f3ed1bf658766cb5.tar.gz |
YT-19210: expose YQL shared library for YT.
After this, a new target libyqlplugin.so appears. in open-source cmake build.
Diff in open-source YDB repo looks like the following: https://paste.yandex-team.ru/f302bdb4-7ef2-4362-91c7-6ca45f329264
Diffstat (limited to 'yt/cpp/mapreduce/client/transaction.cpp')
-rw-r--r-- | yt/cpp/mapreduce/client/transaction.cpp | 195 |
1 files changed, 195 insertions, 0 deletions
diff --git a/yt/cpp/mapreduce/client/transaction.cpp b/yt/cpp/mapreduce/client/transaction.cpp new file mode 100644 index 00000000000..0aa1a7a1c39 --- /dev/null +++ b/yt/cpp/mapreduce/client/transaction.cpp @@ -0,0 +1,195 @@ +#include "transaction.h" + +#include "transaction_pinger.h" + +#include <yt/cpp/mapreduce/interface/config.h> +#include <yt/cpp/mapreduce/interface/error_codes.h> + +#include <yt/cpp/mapreduce/common/wait_proxy.h> +#include <yt/cpp/mapreduce/common/retry_lib.h> + +#include <yt/cpp/mapreduce/http/requests.h> +#include <yt/cpp/mapreduce/http/retry_request.h> + +#include <yt/cpp/mapreduce/raw_client/raw_requests.h> + +#include <util/datetime/base.h> + +#include <util/generic/scope.h> + +#include <util/random/random.h> + +#include <util/string/builder.h> + +namespace NYT { + +//////////////////////////////////////////////////////////////////////////////// + +TPingableTransaction::TPingableTransaction( + const IClientRetryPolicyPtr& retryPolicy, + const TClientContext& context, + const TTransactionId& parentId, + ITransactionPingerPtr transactionPinger, + const TStartTransactionOptions& options) + : ClientRetryPolicy_(retryPolicy) + , Context_(context) + , AbortableRegistry_(NDetail::TAbortableRegistry::Get()) + , AbortOnTermination_(true) + , AutoPingable_(options.AutoPingable_) + , Pinger_(std::move(transactionPinger)) +{ + auto transactionId = NDetail::NRawClient::StartTransaction( + ClientRetryPolicy_->CreatePolicyForGenericRequest(), + context, + parentId, + options); + + auto actualTimeout = options.Timeout_.GetOrElse(Context_.Config->TxTimeout); + Init(context, transactionId, actualTimeout); +} + +TPingableTransaction::TPingableTransaction( + const IClientRetryPolicyPtr& retryPolicy, + const TClientContext& context, + const TTransactionId& transactionId, + ITransactionPingerPtr transactionPinger, + const TAttachTransactionOptions& options) + : ClientRetryPolicy_(retryPolicy) + , Context_(context) + , AbortableRegistry_(NDetail::TAbortableRegistry::Get()) + , AbortOnTermination_(options.AbortOnTermination_) + , AutoPingable_(options.AutoPingable_) + , Pinger_(std::move(transactionPinger)) +{ + auto timeoutNode = NDetail::NRawClient::TryGet( + ClientRetryPolicy_->CreatePolicyForGenericRequest(), + context, + TTransactionId(), + "#" + GetGuidAsString(transactionId) + "/@timeout", + TGetOptions()); + if (timeoutNode.IsUndefined()) { + throw yexception() << "Transaction " << GetGuidAsString(transactionId) << " does not exist"; + } + auto timeout = TDuration::MilliSeconds(timeoutNode.AsInt64()); + Init(context, transactionId, timeout); +} + +void TPingableTransaction::Init( + const TClientContext& context, + const TTransactionId& transactionId, + TDuration timeout) +{ + TransactionId_ = transactionId; + + if (AbortOnTermination_) { + AbortableRegistry_->Add( + TransactionId_, + ::MakeIntrusive<NDetail::TTransactionAbortable>(context, TransactionId_)); + } + + if (AutoPingable_) { + // Compute 'MaxPingInterval_' and 'MinPingInterval_' such that 'pingInterval == (max + min) / 2'. + auto pingInterval = Context_.Config->PingInterval; + auto safeTimeout = timeout - TDuration::Seconds(5); + MaxPingInterval_ = Max(pingInterval, Min(safeTimeout, pingInterval * 1.5)); + MinPingInterval_ = pingInterval - (MaxPingInterval_ - pingInterval); + + Pinger_->RegisterTransaction(*this); + } +} + +TPingableTransaction::~TPingableTransaction() +{ + try { + Stop(AbortOnTermination_ ? EStopAction::Abort : EStopAction::Detach); + } catch (...) { + } +} + +const TTransactionId TPingableTransaction::GetId() const +{ + return TransactionId_; +} + +const std::pair<TDuration, TDuration> TPingableTransaction::GetPingInterval() const { + return {MinPingInterval_, MaxPingInterval_}; +} + +const TClientContext TPingableTransaction::GetContext() const { + return Context_; +} + +void TPingableTransaction::Commit() +{ + Stop(EStopAction::Commit); +} + +void TPingableTransaction::Abort() +{ + Stop(EStopAction::Abort); +} + +void TPingableTransaction::Detach() +{ + Stop(EStopAction::Detach); +} + +void TPingableTransaction::Stop(EStopAction action) +{ + if (Finalized_) { + return; + } + + Y_DEFER { + Finalized_ = true; + if (AutoPingable_ && Pinger_->HasTransaction(*this)) { + Pinger_->RemoveTransaction(*this); + } + }; + + switch (action) { + case EStopAction::Commit: + NDetail::NRawClient::CommitTransaction( + ClientRetryPolicy_->CreatePolicyForGenericRequest(), + Context_, + TransactionId_); + break; + case EStopAction::Abort: + NDetail::NRawClient::AbortTransaction( + ClientRetryPolicy_->CreatePolicyForGenericRequest(), + Context_, + TransactionId_); + break; + case EStopAction::Detach: + // Do nothing. + break; + } + + AbortableRegistry_->Remove(TransactionId_); +} + +//////////////////////////////////////////////////////////////////////////////// + +TYPath Snapshot( + const IClientRetryPolicyPtr& clientRetryPolicy, + const TClientContext& context, + const TTransactionId& transactionId, + const TYPath& path) +{ + auto lockId = NDetail::NRawClient::Lock( + clientRetryPolicy->CreatePolicyForGenericRequest(), + context, + transactionId, + path, + ELockMode::LM_SNAPSHOT); + auto lockedNodeId = NDetail::NRawClient::Get( + clientRetryPolicy->CreatePolicyForGenericRequest(), + context, + transactionId, + ::TStringBuilder() << '#' << GetGuidAsString(lockId) << "/@node_id"); + return "#" + lockedNodeId.AsString(); +} + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT |