summaryrefslogtreecommitdiffstats
path: root/yt/cpp/mapreduce/client/transaction.cpp
diff options
context:
space:
mode:
authormax42 <[email protected]>2023-06-30 03:37:03 +0300
committermax42 <[email protected]>2023-06-30 03:37:03 +0300
commitfac2bd72b4b31ec3238292caf8fb2a8aaa6d6c4a (patch)
treeb8cbc1deb00309c7f1a7ab6df520a76cf0b5c6d7 /yt/cpp/mapreduce/client/transaction.cpp
parent7bf166b1a7ed0af927f230022b245af618e998c1 (diff)
YT-19324: move YT provider to ydb/library/yql
This commit is formed by the following script: https://paste.yandex-team.ru/6f92e4b8-efc5-4d34-948b-15ee2accd7e7/text. This commit has zero effect on all projects that depend on YQL. The summary of changes: - `yql/providers/yt -> ydb/library/yql/providers/yt `- the whole implementation of YT provider is moved into YDB code base for further export as a part of YT YQL plugin shared library; - `yql/providers/stat/{expr_nodes,uploader} -> ydb/library/yql/providers/stat/{expr_nodes,uploader}` - a small interface without implementation and the description of stat expr nodes; - `yql/core/extract_predicate/ut -> ydb/library/yql/core/extract_predicate/ut`; - `yql/core/{ut,ut_common} -> ydb/library/yql/core/{ut,ut_common}`; - `yql/core` is gone; - `yql/library/url_preprocessing -> ydb/library/yql/core/url_preprocessing`. **NB**: all new targets inside `ydb/` are under `IF (NOT CMAKE_EXPORT)` clause which disables them from open-source cmake generation and ya make build. They will be enabled in the subsequent commits.
Diffstat (limited to 'yt/cpp/mapreduce/client/transaction.cpp')
-rw-r--r--yt/cpp/mapreduce/client/transaction.cpp195
1 files changed, 195 insertions, 0 deletions
diff --git a/yt/cpp/mapreduce/client/transaction.cpp b/yt/cpp/mapreduce/client/transaction.cpp
new file mode 100644
index 00000000000..0aa1a7a1c39
--- /dev/null
+++ b/yt/cpp/mapreduce/client/transaction.cpp
@@ -0,0 +1,195 @@
+#include "transaction.h"
+
+#include "transaction_pinger.h"
+
+#include <yt/cpp/mapreduce/interface/config.h>
+#include <yt/cpp/mapreduce/interface/error_codes.h>
+
+#include <yt/cpp/mapreduce/common/wait_proxy.h>
+#include <yt/cpp/mapreduce/common/retry_lib.h>
+
+#include <yt/cpp/mapreduce/http/requests.h>
+#include <yt/cpp/mapreduce/http/retry_request.h>
+
+#include <yt/cpp/mapreduce/raw_client/raw_requests.h>
+
+#include <util/datetime/base.h>
+
+#include <util/generic/scope.h>
+
+#include <util/random/random.h>
+
+#include <util/string/builder.h>
+
+namespace NYT {
+
+////////////////////////////////////////////////////////////////////////////////
+
+TPingableTransaction::TPingableTransaction(
+ const IClientRetryPolicyPtr& retryPolicy,
+ const TClientContext& context,
+ const TTransactionId& parentId,
+ ITransactionPingerPtr transactionPinger,
+ const TStartTransactionOptions& options)
+ : ClientRetryPolicy_(retryPolicy)
+ , Context_(context)
+ , AbortableRegistry_(NDetail::TAbortableRegistry::Get())
+ , AbortOnTermination_(true)
+ , AutoPingable_(options.AutoPingable_)
+ , Pinger_(std::move(transactionPinger))
+{
+ auto transactionId = NDetail::NRawClient::StartTransaction(
+ ClientRetryPolicy_->CreatePolicyForGenericRequest(),
+ context,
+ parentId,
+ options);
+
+ auto actualTimeout = options.Timeout_.GetOrElse(Context_.Config->TxTimeout);
+ Init(context, transactionId, actualTimeout);
+}
+
+TPingableTransaction::TPingableTransaction(
+ const IClientRetryPolicyPtr& retryPolicy,
+ const TClientContext& context,
+ const TTransactionId& transactionId,
+ ITransactionPingerPtr transactionPinger,
+ const TAttachTransactionOptions& options)
+ : ClientRetryPolicy_(retryPolicy)
+ , Context_(context)
+ , AbortableRegistry_(NDetail::TAbortableRegistry::Get())
+ , AbortOnTermination_(options.AbortOnTermination_)
+ , AutoPingable_(options.AutoPingable_)
+ , Pinger_(std::move(transactionPinger))
+{
+ auto timeoutNode = NDetail::NRawClient::TryGet(
+ ClientRetryPolicy_->CreatePolicyForGenericRequest(),
+ context,
+ TTransactionId(),
+ "#" + GetGuidAsString(transactionId) + "/@timeout",
+ TGetOptions());
+ if (timeoutNode.IsUndefined()) {
+ throw yexception() << "Transaction " << GetGuidAsString(transactionId) << " does not exist";
+ }
+ auto timeout = TDuration::MilliSeconds(timeoutNode.AsInt64());
+ Init(context, transactionId, timeout);
+}
+
+void TPingableTransaction::Init(
+ const TClientContext& context,
+ const TTransactionId& transactionId,
+ TDuration timeout)
+{
+ TransactionId_ = transactionId;
+
+ if (AbortOnTermination_) {
+ AbortableRegistry_->Add(
+ TransactionId_,
+ ::MakeIntrusive<NDetail::TTransactionAbortable>(context, TransactionId_));
+ }
+
+ if (AutoPingable_) {
+ // Compute 'MaxPingInterval_' and 'MinPingInterval_' such that 'pingInterval == (max + min) / 2'.
+ auto pingInterval = Context_.Config->PingInterval;
+ auto safeTimeout = timeout - TDuration::Seconds(5);
+ MaxPingInterval_ = Max(pingInterval, Min(safeTimeout, pingInterval * 1.5));
+ MinPingInterval_ = pingInterval - (MaxPingInterval_ - pingInterval);
+
+ Pinger_->RegisterTransaction(*this);
+ }
+}
+
+TPingableTransaction::~TPingableTransaction()
+{
+ try {
+ Stop(AbortOnTermination_ ? EStopAction::Abort : EStopAction::Detach);
+ } catch (...) {
+ }
+}
+
+const TTransactionId TPingableTransaction::GetId() const
+{
+ return TransactionId_;
+}
+
+const std::pair<TDuration, TDuration> TPingableTransaction::GetPingInterval() const {
+ return {MinPingInterval_, MaxPingInterval_};
+}
+
+const TClientContext TPingableTransaction::GetContext() const {
+ return Context_;
+}
+
+void TPingableTransaction::Commit()
+{
+ Stop(EStopAction::Commit);
+}
+
+void TPingableTransaction::Abort()
+{
+ Stop(EStopAction::Abort);
+}
+
+void TPingableTransaction::Detach()
+{
+ Stop(EStopAction::Detach);
+}
+
+void TPingableTransaction::Stop(EStopAction action)
+{
+ if (Finalized_) {
+ return;
+ }
+
+ Y_DEFER {
+ Finalized_ = true;
+ if (AutoPingable_ && Pinger_->HasTransaction(*this)) {
+ Pinger_->RemoveTransaction(*this);
+ }
+ };
+
+ switch (action) {
+ case EStopAction::Commit:
+ NDetail::NRawClient::CommitTransaction(
+ ClientRetryPolicy_->CreatePolicyForGenericRequest(),
+ Context_,
+ TransactionId_);
+ break;
+ case EStopAction::Abort:
+ NDetail::NRawClient::AbortTransaction(
+ ClientRetryPolicy_->CreatePolicyForGenericRequest(),
+ Context_,
+ TransactionId_);
+ break;
+ case EStopAction::Detach:
+ // Do nothing.
+ break;
+ }
+
+ AbortableRegistry_->Remove(TransactionId_);
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+TYPath Snapshot(
+ const IClientRetryPolicyPtr& clientRetryPolicy,
+ const TClientContext& context,
+ const TTransactionId& transactionId,
+ const TYPath& path)
+{
+ auto lockId = NDetail::NRawClient::Lock(
+ clientRetryPolicy->CreatePolicyForGenericRequest(),
+ context,
+ transactionId,
+ path,
+ ELockMode::LM_SNAPSHOT);
+ auto lockedNodeId = NDetail::NRawClient::Get(
+ clientRetryPolicy->CreatePolicyForGenericRequest(),
+ context,
+ transactionId,
+ ::TStringBuilder() << '#' << GetGuidAsString(lockId) << "/@node_id");
+ return "#" + lockedNodeId.AsString();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT