diff options
author | max42 <max42@yandex-team.com> | 2023-06-30 03:37:03 +0300 |
---|---|---|
committer | max42 <max42@yandex-team.com> | 2023-06-30 03:37:03 +0300 |
commit | fac2bd72b4b31ec3238292caf8fb2a8aaa6d6c4a (patch) | |
tree | b8cbc1deb00309c7f1a7ab6df520a76cf0b5c6d7 /yt/cpp/mapreduce/client/retryful_writer.cpp | |
parent | 7bf166b1a7ed0af927f230022b245af618e998c1 (diff) | |
download | ydb-fac2bd72b4b31ec3238292caf8fb2a8aaa6d6c4a.tar.gz |
YT-19324: move YT provider to ydb/library/yql
This commit is formed by the following script: https://paste.yandex-team.ru/6f92e4b8-efc5-4d34-948b-15ee2accd7e7/text.
This commit has zero effect on all projects that depend on YQL.
The summary of changes:
- `yql/providers/yt -> ydb/library/yql/providers/yt `- the whole implementation of YT provider is moved into YDB code base for further export as a part of YT YQL plugin shared library;
- `yql/providers/stat/{expr_nodes,uploader} -> ydb/library/yql/providers/stat/{expr_nodes,uploader}` - a small interface without implementation and the description of stat expr nodes;
- `yql/core/extract_predicate/ut -> ydb/library/yql/core/extract_predicate/ut`;
- `yql/core/{ut,ut_common} -> ydb/library/yql/core/{ut,ut_common}`;
- `yql/core` is gone;
- `yql/library/url_preprocessing -> ydb/library/yql/core/url_preprocessing`.
**NB**: all new targets inside `ydb/` are under `IF (NOT CMAKE_EXPORT)` clause which disables them from open-source cmake generation and ya make build. They will be enabled in the subsequent commits.
Diffstat (limited to 'yt/cpp/mapreduce/client/retryful_writer.cpp')
-rw-r--r-- | yt/cpp/mapreduce/client/retryful_writer.cpp | 163 |
1 files changed, 163 insertions, 0 deletions
diff --git a/yt/cpp/mapreduce/client/retryful_writer.cpp b/yt/cpp/mapreduce/client/retryful_writer.cpp new file mode 100644 index 0000000000..12b2939ffa --- /dev/null +++ b/yt/cpp/mapreduce/client/retryful_writer.cpp @@ -0,0 +1,163 @@ +#include "retryful_writer.h" + +#include "retry_heavy_write_request.h" + +#include <yt/cpp/mapreduce/http/requests.h> + +#include <yt/cpp/mapreduce/interface/errors.h> +#include <yt/cpp/mapreduce/interface/finish_or_die.h> + +#include <yt/cpp/mapreduce/interface/logging/yt_log.h> + +#include <util/generic/size_literals.h> + +namespace NYT { + +//////////////////////////////////////////////////////////////////////////////// + +TRetryfulWriter::~TRetryfulWriter() +{ + NDetail::FinishOrDie(this, "TRetryfulWriter"); +} + +void TRetryfulWriter::CheckWriterState() +{ + switch (WriterState_) { + case Ok: + break; + case Completed: + ythrow TApiUsageError() << "Cannot use table writer that is finished"; + case Error: + ythrow TApiUsageError() << "Cannot use table writer that finished with error"; + } +} + +void TRetryfulWriter::NotifyRowEnd() +{ + CheckWriterState(); + if (Buffer_.Size() >= BufferSize_) { + FlushBuffer(false); + } +} + +void TRetryfulWriter::DoWrite(const void* buf, size_t len) +{ + CheckWriterState(); + while (Buffer_.Size() + len > Buffer_.Capacity()) { + Buffer_.Reserve(Buffer_.Capacity() * 2); + } + Buffer_.Append(static_cast<const char*>(buf), len); +} + +void TRetryfulWriter::DoFinish() +{ + if (WriterState_ != Ok) { + return; + } + FlushBuffer(true); + if (Started_) { + FilledBuffers_.Stop(); + Thread_.Join(); + } + if (Exception_) { + WriterState_ = Error; + std::rethrow_exception(Exception_); + } + if (WriteTransaction_) { + WriteTransaction_->Commit(); + } + WriterState_ = Completed; +} + +void TRetryfulWriter::FlushBuffer(bool lastBlock) +{ + if (!Started_) { + if (lastBlock) { + try { + Send(Buffer_); + } catch (...) { + WriterState_ = Error; + throw; + } + return; + } else { + Started_ = true; + Thread_.Start(); + } + } + + auto emptyBuffer = EmptyBuffers_.Pop(); + if (!emptyBuffer) { + WriterState_ = Error; + std::rethrow_exception(Exception_); + } + FilledBuffers_.Push(std::move(Buffer_)); + Buffer_ = std::move(emptyBuffer.GetRef()); +} + +void TRetryfulWriter::Send(const TBuffer& buffer) +{ + THttpHeader header("PUT", Command_); + header.SetInputFormat(Format_); + header.MergeParameters(Parameters_); + + auto streamMaker = [&buffer] () { + return MakeHolder<TBufferInput>(buffer); + }; + + auto transactionId = (WriteTransaction_ ? WriteTransaction_->GetId() : ParentTransactionId_); + RetryHeavyWriteRequest(ClientRetryPolicy_, TransactionPinger_, Context_, transactionId, header, streamMaker); + + Parameters_ = SecondaryParameters_; // all blocks except the first one are appended +} + +void TRetryfulWriter::SendThread() +{ + while (auto maybeBuffer = FilledBuffers_.Pop()) { + auto& buffer = maybeBuffer.GetRef(); + try { + Send(buffer); + } catch (const std::exception&) { + Exception_ = std::current_exception(); + EmptyBuffers_.Stop(); + break; + } + buffer.Clear(); + EmptyBuffers_.Push(std::move(buffer)); + } +} + +void* TRetryfulWriter::SendThread(void* opaque) +{ + static_cast<TRetryfulWriter*>(opaque)->SendThread(); + return nullptr; +} + +void TRetryfulWriter::Abort() +{ + if (Started_) { + FilledBuffers_.Stop(); + Thread_.Join(); + } + if (WriteTransaction_) { + WriteTransaction_->Abort(); + } + WriterState_ = Completed; +} + +size_t TRetryfulWriter::GetBufferSize(const TMaybe<TWriterOptions>& writerOptions) +{ + auto retryBlockSize = TMaybe<size_t>(); + if (writerOptions) { + if (writerOptions->RetryBlockSize_) { + retryBlockSize = *writerOptions->RetryBlockSize_; + } else if (writerOptions->DesiredChunkSize_) { + retryBlockSize = *writerOptions->DesiredChunkSize_; + } + } + return retryBlockSize.GetOrElse(64_MB); +} + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT |