diff options
author | max42 <max42@yandex-team.com> | 2023-06-30 03:37:03 +0300 |
---|---|---|
committer | max42 <max42@yandex-team.com> | 2023-06-30 03:37:03 +0300 |
commit | fac2bd72b4b31ec3238292caf8fb2a8aaa6d6c4a (patch) | |
tree | b8cbc1deb00309c7f1a7ab6df520a76cf0b5c6d7 /yt/cpp/mapreduce/client/retryful_writer.h | |
parent | 7bf166b1a7ed0af927f230022b245af618e998c1 (diff) | |
download | ydb-fac2bd72b4b31ec3238292caf8fb2a8aaa6d6c4a.tar.gz |
YT-19324: move YT provider to ydb/library/yql
This commit is formed by the following script: https://paste.yandex-team.ru/6f92e4b8-efc5-4d34-948b-15ee2accd7e7/text.
This commit has zero effect on all projects that depend on YQL.
The summary of changes:
- `yql/providers/yt -> ydb/library/yql/providers/yt `- the whole implementation of YT provider is moved into YDB code base for further export as a part of YT YQL plugin shared library;
- `yql/providers/stat/{expr_nodes,uploader} -> ydb/library/yql/providers/stat/{expr_nodes,uploader}` - a small interface without implementation and the description of stat expr nodes;
- `yql/core/extract_predicate/ut -> ydb/library/yql/core/extract_predicate/ut`;
- `yql/core/{ut,ut_common} -> ydb/library/yql/core/{ut,ut_common}`;
- `yql/core` is gone;
- `yql/library/url_preprocessing -> ydb/library/yql/core/url_preprocessing`.
**NB**: all new targets inside `ydb/` are under `IF (NOT CMAKE_EXPORT)` clause which disables them from open-source cmake generation and ya make build. They will be enabled in the subsequent commits.
Diffstat (limited to 'yt/cpp/mapreduce/client/retryful_writer.h')
-rw-r--r-- | yt/cpp/mapreduce/client/retryful_writer.h | 130 |
1 files changed, 130 insertions, 0 deletions
diff --git a/yt/cpp/mapreduce/client/retryful_writer.h b/yt/cpp/mapreduce/client/retryful_writer.h new file mode 100644 index 0000000000..38e351977d --- /dev/null +++ b/yt/cpp/mapreduce/client/retryful_writer.h @@ -0,0 +1,130 @@ +#pragma once + +#include "transaction.h" +#include "transaction_pinger.h" + +#include <yt/cpp/mapreduce/common/retry_lib.h> +#include <yt/cpp/mapreduce/http/http.h> +#include <yt/cpp/mapreduce/interface/common.h> +#include <yt/cpp/mapreduce/interface/io.h> +#include <yt/cpp/mapreduce/io/helpers.h> +#include <yt/cpp/mapreduce/raw_client/raw_requests.h> + +#include <library/cpp/threading/blocking_queue/blocking_queue.h> + +#include <util/stream/output.h> +#include <util/generic/buffer.h> +#include <util/stream/buffer.h> +#include <util/system/thread.h> +#include <util/system/event.h> + +#include <atomic> + +namespace NYT { + +//////////////////////////////////////////////////////////////////////////////// + +class TRetryfulWriter + : public TRawTableWriter +{ +public: + template <class TWriterOptions> + TRetryfulWriter( + IClientRetryPolicyPtr clientRetryPolicy, + ITransactionPingerPtr transactionPinger, + const TClientContext& context, + const TTransactionId& parentId, + const TString& command, + const TMaybe<TFormat>& format, + const TRichYPath& path, + const TWriterOptions& options) + : ClientRetryPolicy_(std::move(clientRetryPolicy)) + , TransactionPinger_(std::move(transactionPinger)) + , Context_(context) + , Command_(command) + , Format_(format) + , BufferSize_(GetBufferSize(options.WriterOptions_)) + , ParentTransactionId_(parentId) + , WriteTransaction_() + , FilledBuffers_(2) + , EmptyBuffers_(2) + , Buffer_(BufferSize_ * 2) + , Thread_(TThread::TParams{SendThread, this}.SetName("retryful_writer")) + { + Parameters_ = FormIORequestParameters(path, options); + + auto secondaryPath = path; + secondaryPath.Append_ = true; + secondaryPath.Schema_.Clear(); + secondaryPath.CompressionCodec_.Clear(); + secondaryPath.ErasureCodec_.Clear(); + secondaryPath.OptimizeFor_.Clear(); + SecondaryParameters_ = FormIORequestParameters(secondaryPath, options); + + if (options.CreateTransaction_) { + WriteTransaction_.ConstructInPlace(ClientRetryPolicy_, context, parentId, TransactionPinger_->GetChildTxPinger(), TStartTransactionOptions()); + auto append = path.Append_.GetOrElse(false); + auto lockMode = (append ? LM_SHARED : LM_EXCLUSIVE); + NDetail::NRawClient::Lock(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, WriteTransaction_->GetId(), path.Path_, lockMode); + } + + EmptyBuffers_.Push(TBuffer(BufferSize_ * 2)); + } + + ~TRetryfulWriter() override; + void NotifyRowEnd() override; + void Abort() override; + + size_t GetRetryBlockRemainingSize() const + { + return (BufferSize_ > Buffer_.size()) ? (BufferSize_ - Buffer_.size()) : 0; + } + +protected: + void DoWrite(const void* buf, size_t len) override; + void DoFinish() override; + +private: + static size_t GetBufferSize(const TMaybe<TWriterOptions>& writerOptions); + +private: + const IClientRetryPolicyPtr ClientRetryPolicy_; + const ITransactionPingerPtr TransactionPinger_; + const TClientContext Context_; + TString Command_; + TMaybe<TFormat> Format_; + const size_t BufferSize_; + + TNode Parameters_; + TNode SecondaryParameters_; + + TTransactionId ParentTransactionId_; + TMaybe<TPingableTransaction> WriteTransaction_; + + ::NThreading::TBlockingQueue<TBuffer> FilledBuffers_; + ::NThreading::TBlockingQueue<TBuffer> EmptyBuffers_; + + TBuffer Buffer_; + + TThread Thread_; + bool Started_ = false; + std::exception_ptr Exception_ = nullptr; + + enum EWriterState { + Ok, + Completed, + Error, + } WriterState_ = Ok; + +private: + void FlushBuffer(bool lastBlock); + void Send(const TBuffer& buffer); + void CheckWriterState(); + + void SendThread(); + static void* SendThread(void* opaque); +}; + +//////////////////////////////////////////////////////////////////////////////// + +} |