diff options
| author | max42 <[email protected]> | 2023-06-30 03:37:03 +0300 |
|---|---|---|
| committer | max42 <[email protected]> | 2023-06-30 03:37:03 +0300 |
| commit | fac2bd72b4b31ec3238292caf8fb2a8aaa6d6c4a (patch) | |
| tree | b8cbc1deb00309c7f1a7ab6df520a76cf0b5c6d7 /yt/cpp/mapreduce/client/transaction.cpp | |
| parent | 7bf166b1a7ed0af927f230022b245af618e998c1 (diff) | |
YT-19324: move YT provider to ydb/library/yql
This commit is formed by the following script: https://paste.yandex-team.ru/6f92e4b8-efc5-4d34-948b-15ee2accd7e7/text.
This commit has zero effect on all projects that depend on YQL.
The summary of changes:
- `yql/providers/yt -> ydb/library/yql/providers/yt `- the whole implementation of YT provider is moved into YDB code base for further export as a part of YT YQL plugin shared library;
- `yql/providers/stat/{expr_nodes,uploader} -> ydb/library/yql/providers/stat/{expr_nodes,uploader}` - a small interface without implementation and the description of stat expr nodes;
- `yql/core/extract_predicate/ut -> ydb/library/yql/core/extract_predicate/ut`;
- `yql/core/{ut,ut_common} -> ydb/library/yql/core/{ut,ut_common}`;
- `yql/core` is gone;
- `yql/library/url_preprocessing -> ydb/library/yql/core/url_preprocessing`.
**NB**: all new targets inside `ydb/` are under `IF (NOT CMAKE_EXPORT)` clause which disables them from open-source cmake generation and ya make build. They will be enabled in the subsequent commits.
Diffstat (limited to 'yt/cpp/mapreduce/client/transaction.cpp')
| -rw-r--r-- | yt/cpp/mapreduce/client/transaction.cpp | 195 |
1 files changed, 195 insertions, 0 deletions
diff --git a/yt/cpp/mapreduce/client/transaction.cpp b/yt/cpp/mapreduce/client/transaction.cpp new file mode 100644 index 00000000000..0aa1a7a1c39 --- /dev/null +++ b/yt/cpp/mapreduce/client/transaction.cpp @@ -0,0 +1,195 @@ +#include "transaction.h" + +#include "transaction_pinger.h" + +#include <yt/cpp/mapreduce/interface/config.h> +#include <yt/cpp/mapreduce/interface/error_codes.h> + +#include <yt/cpp/mapreduce/common/wait_proxy.h> +#include <yt/cpp/mapreduce/common/retry_lib.h> + +#include <yt/cpp/mapreduce/http/requests.h> +#include <yt/cpp/mapreduce/http/retry_request.h> + +#include <yt/cpp/mapreduce/raw_client/raw_requests.h> + +#include <util/datetime/base.h> + +#include <util/generic/scope.h> + +#include <util/random/random.h> + +#include <util/string/builder.h> + +namespace NYT { + +//////////////////////////////////////////////////////////////////////////////// + +TPingableTransaction::TPingableTransaction( + const IClientRetryPolicyPtr& retryPolicy, + const TClientContext& context, + const TTransactionId& parentId, + ITransactionPingerPtr transactionPinger, + const TStartTransactionOptions& options) + : ClientRetryPolicy_(retryPolicy) + , Context_(context) + , AbortableRegistry_(NDetail::TAbortableRegistry::Get()) + , AbortOnTermination_(true) + , AutoPingable_(options.AutoPingable_) + , Pinger_(std::move(transactionPinger)) +{ + auto transactionId = NDetail::NRawClient::StartTransaction( + ClientRetryPolicy_->CreatePolicyForGenericRequest(), + context, + parentId, + options); + + auto actualTimeout = options.Timeout_.GetOrElse(Context_.Config->TxTimeout); + Init(context, transactionId, actualTimeout); +} + +TPingableTransaction::TPingableTransaction( + const IClientRetryPolicyPtr& retryPolicy, + const TClientContext& context, + const TTransactionId& transactionId, + ITransactionPingerPtr transactionPinger, + const TAttachTransactionOptions& options) + : ClientRetryPolicy_(retryPolicy) + , Context_(context) + , AbortableRegistry_(NDetail::TAbortableRegistry::Get()) + , AbortOnTermination_(options.AbortOnTermination_) + , AutoPingable_(options.AutoPingable_) + , Pinger_(std::move(transactionPinger)) +{ + auto timeoutNode = NDetail::NRawClient::TryGet( + ClientRetryPolicy_->CreatePolicyForGenericRequest(), + context, + TTransactionId(), + "#" + GetGuidAsString(transactionId) + "/@timeout", + TGetOptions()); + if (timeoutNode.IsUndefined()) { + throw yexception() << "Transaction " << GetGuidAsString(transactionId) << " does not exist"; + } + auto timeout = TDuration::MilliSeconds(timeoutNode.AsInt64()); + Init(context, transactionId, timeout); +} + +void TPingableTransaction::Init( + const TClientContext& context, + const TTransactionId& transactionId, + TDuration timeout) +{ + TransactionId_ = transactionId; + + if (AbortOnTermination_) { + AbortableRegistry_->Add( + TransactionId_, + ::MakeIntrusive<NDetail::TTransactionAbortable>(context, TransactionId_)); + } + + if (AutoPingable_) { + // Compute 'MaxPingInterval_' and 'MinPingInterval_' such that 'pingInterval == (max + min) / 2'. + auto pingInterval = Context_.Config->PingInterval; + auto safeTimeout = timeout - TDuration::Seconds(5); + MaxPingInterval_ = Max(pingInterval, Min(safeTimeout, pingInterval * 1.5)); + MinPingInterval_ = pingInterval - (MaxPingInterval_ - pingInterval); + + Pinger_->RegisterTransaction(*this); + } +} + +TPingableTransaction::~TPingableTransaction() +{ + try { + Stop(AbortOnTermination_ ? EStopAction::Abort : EStopAction::Detach); + } catch (...) { + } +} + +const TTransactionId TPingableTransaction::GetId() const +{ + return TransactionId_; +} + +const std::pair<TDuration, TDuration> TPingableTransaction::GetPingInterval() const { + return {MinPingInterval_, MaxPingInterval_}; +} + +const TClientContext TPingableTransaction::GetContext() const { + return Context_; +} + +void TPingableTransaction::Commit() +{ + Stop(EStopAction::Commit); +} + +void TPingableTransaction::Abort() +{ + Stop(EStopAction::Abort); +} + +void TPingableTransaction::Detach() +{ + Stop(EStopAction::Detach); +} + +void TPingableTransaction::Stop(EStopAction action) +{ + if (Finalized_) { + return; + } + + Y_DEFER { + Finalized_ = true; + if (AutoPingable_ && Pinger_->HasTransaction(*this)) { + Pinger_->RemoveTransaction(*this); + } + }; + + switch (action) { + case EStopAction::Commit: + NDetail::NRawClient::CommitTransaction( + ClientRetryPolicy_->CreatePolicyForGenericRequest(), + Context_, + TransactionId_); + break; + case EStopAction::Abort: + NDetail::NRawClient::AbortTransaction( + ClientRetryPolicy_->CreatePolicyForGenericRequest(), + Context_, + TransactionId_); + break; + case EStopAction::Detach: + // Do nothing. + break; + } + + AbortableRegistry_->Remove(TransactionId_); +} + +//////////////////////////////////////////////////////////////////////////////// + +TYPath Snapshot( + const IClientRetryPolicyPtr& clientRetryPolicy, + const TClientContext& context, + const TTransactionId& transactionId, + const TYPath& path) +{ + auto lockId = NDetail::NRawClient::Lock( + clientRetryPolicy->CreatePolicyForGenericRequest(), + context, + transactionId, + path, + ELockMode::LM_SNAPSHOT); + auto lockedNodeId = NDetail::NRawClient::Get( + clientRetryPolicy->CreatePolicyForGenericRequest(), + context, + transactionId, + ::TStringBuilder() << '#' << GetGuidAsString(lockId) << "/@node_id"); + return "#" + lockedNodeId.AsString(); +} + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT |
