diff options
author | ermolovd <ermolovd@yandex-team.com> | 2023-11-28 09:30:16 +0300 |
---|---|---|
committer | ermolovd <ermolovd@yandex-team.com> | 2023-11-28 09:59:46 +0300 |
commit | 5e58015f2e6fddae74e51d9de0870e5f7119cc12 (patch) | |
tree | 0f686ad847d54a66716d717404a1d615139ab22d | |
parent | 86cac93557a380155b8a87abd5802e5d40c027bf (diff) | |
download | ydb-5e58015f2e6fddae74e51d9de0870e5f7119cc12.tar.gz |
YT-19269: table writer implementation that doesn't wait for complete buffer before sending to network
-rw-r--r-- | yt/cpp/mapreduce/client/CMakeLists.darwin-arm64.txt | 1 | ||||
-rw-r--r-- | yt/cpp/mapreduce/client/CMakeLists.darwin-x86_64.txt | 1 | ||||
-rw-r--r-- | yt/cpp/mapreduce/client/CMakeLists.linux-aarch64.txt | 1 | ||||
-rw-r--r-- | yt/cpp/mapreduce/client/CMakeLists.linux-x86_64.txt | 1 | ||||
-rw-r--r-- | yt/cpp/mapreduce/client/CMakeLists.windows-x86_64.txt | 1 | ||||
-rw-r--r-- | yt/cpp/mapreduce/client/client_writer.cpp | 36 | ||||
-rw-r--r-- | yt/cpp/mapreduce/client/client_writer.h | 1 | ||||
-rw-r--r-- | yt/cpp/mapreduce/client/retry_heavy_write_request.cpp | 107 | ||||
-rw-r--r-- | yt/cpp/mapreduce/client/retry_heavy_write_request.h | 48 | ||||
-rw-r--r-- | yt/cpp/mapreduce/client/retryful_writer.h | 2 | ||||
-rw-r--r-- | yt/cpp/mapreduce/client/retryful_writer_v2.cpp | 384 | ||||
-rw-r--r-- | yt/cpp/mapreduce/client/retryful_writer_v2.h | 58 | ||||
-rw-r--r-- | yt/cpp/mapreduce/client/ya.make | 1 | ||||
-rw-r--r-- | yt/cpp/mapreduce/interface/config.h | 18 | ||||
-rw-r--r-- | yt/cpp/mapreduce/io/helpers.h | 10 |
15 files changed, 657 insertions, 13 deletions
diff --git a/yt/cpp/mapreduce/client/CMakeLists.darwin-arm64.txt b/yt/cpp/mapreduce/client/CMakeLists.darwin-arm64.txt index fc711ccea8..0056b9c641 100644 --- a/yt/cpp/mapreduce/client/CMakeLists.darwin-arm64.txt +++ b/yt/cpp/mapreduce/client/CMakeLists.darwin-arm64.txt @@ -56,6 +56,7 @@ target_sources(cpp-mapreduce-client PRIVATE ${CMAKE_SOURCE_DIR}/yt/cpp/mapreduce/client/retry_heavy_write_request.cpp ${CMAKE_SOURCE_DIR}/yt/cpp/mapreduce/client/retry_transaction.cpp ${CMAKE_SOURCE_DIR}/yt/cpp/mapreduce/client/retryful_writer.cpp + ${CMAKE_SOURCE_DIR}/yt/cpp/mapreduce/client/retryful_writer_v2.cpp ${CMAKE_SOURCE_DIR}/yt/cpp/mapreduce/client/retryless_writer.cpp ${CMAKE_SOURCE_DIR}/yt/cpp/mapreduce/client/skiff.cpp ${CMAKE_SOURCE_DIR}/yt/cpp/mapreduce/client/structured_table_formats.cpp diff --git a/yt/cpp/mapreduce/client/CMakeLists.darwin-x86_64.txt b/yt/cpp/mapreduce/client/CMakeLists.darwin-x86_64.txt index fc711ccea8..0056b9c641 100644 --- a/yt/cpp/mapreduce/client/CMakeLists.darwin-x86_64.txt +++ b/yt/cpp/mapreduce/client/CMakeLists.darwin-x86_64.txt @@ -56,6 +56,7 @@ target_sources(cpp-mapreduce-client PRIVATE ${CMAKE_SOURCE_DIR}/yt/cpp/mapreduce/client/retry_heavy_write_request.cpp ${CMAKE_SOURCE_DIR}/yt/cpp/mapreduce/client/retry_transaction.cpp ${CMAKE_SOURCE_DIR}/yt/cpp/mapreduce/client/retryful_writer.cpp + ${CMAKE_SOURCE_DIR}/yt/cpp/mapreduce/client/retryful_writer_v2.cpp ${CMAKE_SOURCE_DIR}/yt/cpp/mapreduce/client/retryless_writer.cpp ${CMAKE_SOURCE_DIR}/yt/cpp/mapreduce/client/skiff.cpp ${CMAKE_SOURCE_DIR}/yt/cpp/mapreduce/client/structured_table_formats.cpp diff --git a/yt/cpp/mapreduce/client/CMakeLists.linux-aarch64.txt b/yt/cpp/mapreduce/client/CMakeLists.linux-aarch64.txt index f841673139..69b08c73ce 100644 --- a/yt/cpp/mapreduce/client/CMakeLists.linux-aarch64.txt +++ b/yt/cpp/mapreduce/client/CMakeLists.linux-aarch64.txt @@ -56,6 +56,7 @@ target_sources(cpp-mapreduce-client PRIVATE ${CMAKE_SOURCE_DIR}/yt/cpp/mapreduce/client/retry_heavy_write_request.cpp ${CMAKE_SOURCE_DIR}/yt/cpp/mapreduce/client/retry_transaction.cpp ${CMAKE_SOURCE_DIR}/yt/cpp/mapreduce/client/retryful_writer.cpp + ${CMAKE_SOURCE_DIR}/yt/cpp/mapreduce/client/retryful_writer_v2.cpp ${CMAKE_SOURCE_DIR}/yt/cpp/mapreduce/client/retryless_writer.cpp ${CMAKE_SOURCE_DIR}/yt/cpp/mapreduce/client/skiff.cpp ${CMAKE_SOURCE_DIR}/yt/cpp/mapreduce/client/structured_table_formats.cpp diff --git a/yt/cpp/mapreduce/client/CMakeLists.linux-x86_64.txt b/yt/cpp/mapreduce/client/CMakeLists.linux-x86_64.txt index 064b063845..5658f88328 100644 --- a/yt/cpp/mapreduce/client/CMakeLists.linux-x86_64.txt +++ b/yt/cpp/mapreduce/client/CMakeLists.linux-x86_64.txt @@ -57,6 +57,7 @@ target_sources(cpp-mapreduce-client PRIVATE ${CMAKE_SOURCE_DIR}/yt/cpp/mapreduce/client/retry_heavy_write_request.cpp ${CMAKE_SOURCE_DIR}/yt/cpp/mapreduce/client/retry_transaction.cpp ${CMAKE_SOURCE_DIR}/yt/cpp/mapreduce/client/retryful_writer.cpp + ${CMAKE_SOURCE_DIR}/yt/cpp/mapreduce/client/retryful_writer_v2.cpp ${CMAKE_SOURCE_DIR}/yt/cpp/mapreduce/client/retryless_writer.cpp ${CMAKE_SOURCE_DIR}/yt/cpp/mapreduce/client/skiff.cpp ${CMAKE_SOURCE_DIR}/yt/cpp/mapreduce/client/structured_table_formats.cpp diff --git a/yt/cpp/mapreduce/client/CMakeLists.windows-x86_64.txt b/yt/cpp/mapreduce/client/CMakeLists.windows-x86_64.txt index 3082662ccb..4feb80136b 100644 --- a/yt/cpp/mapreduce/client/CMakeLists.windows-x86_64.txt +++ b/yt/cpp/mapreduce/client/CMakeLists.windows-x86_64.txt @@ -53,6 +53,7 @@ target_sources(cpp-mapreduce-client PRIVATE ${CMAKE_SOURCE_DIR}/yt/cpp/mapreduce/client/retry_heavy_write_request.cpp ${CMAKE_SOURCE_DIR}/yt/cpp/mapreduce/client/retry_transaction.cpp ${CMAKE_SOURCE_DIR}/yt/cpp/mapreduce/client/retryful_writer.cpp + ${CMAKE_SOURCE_DIR}/yt/cpp/mapreduce/client/retryful_writer_v2.cpp ${CMAKE_SOURCE_DIR}/yt/cpp/mapreduce/client/retryless_writer.cpp ${CMAKE_SOURCE_DIR}/yt/cpp/mapreduce/client/skiff.cpp ${CMAKE_SOURCE_DIR}/yt/cpp/mapreduce/client/structured_table_formats.cpp diff --git a/yt/cpp/mapreduce/client/client_writer.cpp b/yt/cpp/mapreduce/client/client_writer.cpp index 7141015492..2122d0d8db 100644 --- a/yt/cpp/mapreduce/client/client_writer.cpp +++ b/yt/cpp/mapreduce/client/client_writer.cpp @@ -2,6 +2,7 @@ #include "retryful_writer.h" #include "retryless_writer.h" +#include "retryful_writer_v2.h" #include <yt/cpp/mapreduce/interface/io.h> #include <yt/cpp/mapreduce/common/fwd.h> @@ -31,15 +32,32 @@ TClientWriter::TClientWriter( BufferSize_, options)); } else { - RawWriter_.Reset(new TRetryfulWriter( - std::move(clientRetryPolicy), - std::move(transactionPinger), - context, - transactionId, - GetWriteTableCommand(context.Config->ApiVersion), - format, - path, - options)); + bool useV2Writer = context.Config->TableWriterVersion == ETableWriterVersion::V2; + if (useV2Writer) { + auto serializedWriterOptions = FormIORequestParameters(options); + + RawWriter_ = MakeIntrusive<NPrivate::TRetryfulWriterV2>( + std::move(clientRetryPolicy), + std::move(transactionPinger), + context, + transactionId, + GetWriteTableCommand(context.Config->ApiVersion), + format, + path, + serializedWriterOptions, + static_cast<ssize_t>(options.BufferSize_), + options.CreateTransaction_); + } else { + RawWriter_.Reset(new TRetryfulWriter( + std::move(clientRetryPolicy), + std::move(transactionPinger), + context, + transactionId, + GetWriteTableCommand(context.Config->ApiVersion), + format, + path, + options)); + } } } diff --git a/yt/cpp/mapreduce/client/client_writer.h b/yt/cpp/mapreduce/client/client_writer.h index 8ee912d065..56605a3f31 100644 --- a/yt/cpp/mapreduce/client/client_writer.h +++ b/yt/cpp/mapreduce/client/client_writer.h @@ -8,7 +8,6 @@ namespace NYT { struct TTableWriterOptions; -class TRetryfulWriter; //////////////////////////////////////////////////////////////////////////////// diff --git a/yt/cpp/mapreduce/client/retry_heavy_write_request.cpp b/yt/cpp/mapreduce/client/retry_heavy_write_request.cpp index 54a2be02e3..253d7c0d44 100644 --- a/yt/cpp/mapreduce/client/retry_heavy_write_request.cpp +++ b/yt/cpp/mapreduce/client/retry_heavy_write_request.cpp @@ -16,6 +16,8 @@ #include <yt/cpp/mapreduce/http/requests.h> #include <yt/cpp/mapreduce/http/retry_request.h> +#include <util/stream/null.h> + namespace NYT { using ::ToString; @@ -93,4 +95,109 @@ void RetryHeavyWriteRequest( //////////////////////////////////////////////////////////////////////////////// +THeavyRequestRetrier::THeavyRequestRetrier(TParameters parameters) + : Parameters_(std::move(parameters)) + , RequestRetryPolicy_(Parameters_.ClientRetryPolicy->CreatePolicyForGenericRequest()) + , StreamFactory_([] { + return MakeHolder<TNullInput>(); + }) +{ + Retry([] { }); +} + +THeavyRequestRetrier::~THeavyRequestRetrier() = default; + +void THeavyRequestRetrier::Update(THeavyRequestRetrier::TStreamFactory streamFactory) +{ + StreamFactory_ = streamFactory; + Retry([this] { + auto stream = StreamFactory_(); + stream->Skip(Attempt_->Offset); + auto transfered = stream->ReadAll(*Attempt_->Request->GetStream()); + Attempt_->Offset += transfered; + }); +} + +void THeavyRequestRetrier::Finish() +{ + Retry([this] { + Attempt_->Request->Finish()->GetResponse(); + Attempt_->Transaction->Commit(); + Attempt_.reset(); + }); +} + +void THeavyRequestRetrier::Retry(const std::function<void()> &function) +{ + while (true) { + try { + if (!Attempt_) { + TryStartAttempt(); + } + function(); + return; + } catch (const std::exception& ex) { + YT_LOG_ERROR("RSP %v - attempt %v failed", + Attempt_->RequestId, + RequestRetryPolicy_->GetAttemptDescription()); + Attempt_.reset(); + + TMaybe<TDuration> backoffDuration; + if (const auto *errorResponse = dynamic_cast<const TErrorResponse *>(&ex)) { + if (!IsRetriable(*errorResponse)) { + throw; + } + backoffDuration = RequestRetryPolicy_->OnRetriableError(*errorResponse); + } else { + if (!IsRetriable(ex)) { + throw; + } + backoffDuration = RequestRetryPolicy_->OnGenericError(ex); + } + + if (!backoffDuration) { + throw; + } + NDetail::TWaitProxy::Get()->Sleep(*backoffDuration); + } + } +} + +void THeavyRequestRetrier::TryStartAttempt() +{ + Attempt_ = std::make_unique<TAttempt>(); + Attempt_->Transaction = std::make_unique<TPingableTransaction>( + Parameters_.ClientRetryPolicy, Parameters_.Context, + Parameters_.TransactionId, + Parameters_.TransactionPinger->GetChildTxPinger(), + TStartTransactionOptions()); + + auto header = Parameters_.Header; + if (Parameters_.Context.ServiceTicketAuth) { + header.SetServiceTicket(Parameters_.Context.ServiceTicketAuth->Ptr->IssueServiceTicket()); + } else { + header.SetToken(Parameters_.Context.Token); + } + + if (Parameters_.Context.ImpersonationUser) { + header.SetImpersonationUser(*Parameters_.Context.ImpersonationUser); + } + auto hostName = GetProxyForHeavyRequest(Parameters_.Context); + Attempt_->RequestId = CreateGuidAsString(); + + UpdateHeaderForProxyIfNeed(hostName, Parameters_.Context, header); + + header.AddTransactionId(Attempt_->Transaction->GetId(), /* overwrite = */ true); + header.SetRequestCompression(ToString(Parameters_.Context.Config->ContentEncoding)); + + Attempt_->Request = Parameters_.Context.HttpClient->StartRequest( + GetFullUrlForProxy(hostName, Parameters_.Context, header), + Attempt_->RequestId, header); + + auto stream = StreamFactory_(); + stream->ReadAll(*Attempt_->Request->GetStream()); +} + +//////////////////////////////////////////////////////////////////////////////// + } // namespace NYT diff --git a/yt/cpp/mapreduce/client/retry_heavy_write_request.h b/yt/cpp/mapreduce/client/retry_heavy_write_request.h index 647cad302c..853306ff42 100644 --- a/yt/cpp/mapreduce/client/retry_heavy_write_request.h +++ b/yt/cpp/mapreduce/client/retry_heavy_write_request.h @@ -1,13 +1,59 @@ #pragma once +#include <yt/cpp/mapreduce/client/transaction.h> #include <yt/cpp/mapreduce/common/fwd.h> +#include <yt/cpp/mapreduce/http/context.h> #include <yt/cpp/mapreduce/http/requests.h> namespace NYT { /////////////////////////////////////////////////////////////////////////////// +class THeavyRequestRetrier +{ +public: + struct TParameters + { + IClientRetryPolicyPtr ClientRetryPolicy; + ITransactionPingerPtr TransactionPinger; + TClientContext Context; + TTransactionId TransactionId; + THttpHeader Header; + }; + + using TStreamFactory = std::function<THolder<IInputStream>()>; + +public: + explicit THeavyRequestRetrier(TParameters parameters); + ~THeavyRequestRetrier(); + + void Update(TStreamFactory streamFactory); + void Finish(); + + private: + void Retry(const std::function<void()>& function); + + void TryStartAttempt(); + + private: + const TParameters Parameters_; + const IRequestRetryPolicyPtr RequestRetryPolicy_; + + struct TAttempt + { + std::unique_ptr<TPingableTransaction> Transaction; + TString RequestId; + NHttpClient::IHttpRequestPtr Request; + ssize_t Offset = 0; + }; + std::unique_ptr<TAttempt> Attempt_; + + TStreamFactory StreamFactory_; +}; + +/////////////////////////////////////////////////////////////////////////////// + void RetryHeavyWriteRequest( const IClientRetryPolicyPtr& clientRetryPolicy, const ITransactionPingerPtr& transactionPinger, @@ -16,6 +62,6 @@ void RetryHeavyWriteRequest( THttpHeader& header, std::function<THolder<IInputStream>()> streamMaker); -/////////////////////////////////////////////////////////////////////////////// +//////////////////////////////////////////////////////////////////////////////// } // namespace NYT diff --git a/yt/cpp/mapreduce/client/retryful_writer.h b/yt/cpp/mapreduce/client/retryful_writer.h index 0725a36aac..8c6de1c266 100644 --- a/yt/cpp/mapreduce/client/retryful_writer.h +++ b/yt/cpp/mapreduce/client/retryful_writer.h @@ -18,8 +18,6 @@ #include <util/system/thread.h> #include <util/system/event.h> -#include <atomic> - namespace NYT { //////////////////////////////////////////////////////////////////////////////// diff --git a/yt/cpp/mapreduce/client/retryful_writer_v2.cpp b/yt/cpp/mapreduce/client/retryful_writer_v2.cpp new file mode 100644 index 0000000000..b8a2bd2164 --- /dev/null +++ b/yt/cpp/mapreduce/client/retryful_writer_v2.cpp @@ -0,0 +1,384 @@ +#include "retryful_writer_v2.h" + +#include <yt/cpp/mapreduce/client/retry_heavy_write_request.h> +#include <yt/cpp/mapreduce/client/transaction.h> +#include <yt/cpp/mapreduce/client/transaction_pinger.h> +#include <yt/cpp/mapreduce/common/fwd.h> +#include <yt/cpp/mapreduce/common/helpers.h> +#include <yt/cpp/mapreduce/common/retry_lib.h> +#include <yt/cpp/mapreduce/common/wait_proxy.h> +#include <yt/cpp/mapreduce/http/context.h> +#include <yt/cpp/mapreduce/http/helpers.h> +#include <yt/cpp/mapreduce/http/http.h> + +#include <util/system/condvar.h> + +#include <queue> + +namespace NYT::NPrivate { + +//////////////////////////////////////////////////////////////////////////////// + +class TRetryfulWriterV2::TSentBuffer +{ +public: + TSentBuffer() = default; + TSentBuffer(const TSentBuffer& ) = delete; + + std::pair<std::shared_ptr<char[]>, ssize_t> Snapshot() const + { + return {Buffer_, Size_}; + } + + void Clear() + { + Size_ = 0; + } + + ssize_t Size() const + { + return Size_; + } + + void Append(const void* data, ssize_t size) + { + auto newSize = Size_ + size; + if (newSize < Capacity_) { + memcpy(Buffer_.get() + Size_, data, size); + } else { + // Closest power of 2 exceeding new size + auto newCapacity = 1 << (MostSignificantBit(newSize) + 1); + newCapacity = Max<ssize_t>(64, newCapacity); + auto newBuffer = std::make_shared<char[]>(newCapacity); + memcpy(newBuffer.get(), Buffer_.get(), Size_); + memcpy(newBuffer.get() + Size_, data, size); + Buffer_ = newBuffer; + Capacity_ = newCapacity; + } + Size_ = newSize; + } + +private: + std::shared_ptr<char[]> Buffer_ = nullptr; + ssize_t Size_ = 0; + ssize_t Capacity_ = 0; +}; + +//////////////////////////////////////////////////////////////////////////////// + +class TRetryfulWriterV2::TSender +{ +public: + TSender(TRichYPath path, THeavyRequestRetrier::TParameters parameters) + : SenderThread_( + [this, path=std::move(path), parameters=std::move(parameters)] { + ThreadMain(std::move(path), parameters); + }) + { + SenderThread_.SetCurrentThreadName("retryful-writer-v2-sender"); + SenderThread_.Start(); + } + + ~TSender() + { + Abort(); + SenderThread_.Join(); + } + + void Abort() + { + auto g = Guard(Lock_); + SetFinishedState(EState::Aborted, g); + } + + void Finish() + { + { + auto g = Guard(Lock_); + SetFinishedState(EState::Completed, g); + } + SenderThread_.Join(); + CheckNoError(); + + Y_ABORT_UNLESS(TaskIdQueue_.empty()); + Y_ABORT_UNLESS(TaskMap_.empty()); + } + + // Return future that is complete once upload of this buffer is successfully complete + std::pair<NThreading::TFuture<void>, int> StartBlock() + { + auto g = Guard(Lock_); + + CheckNoError(); + + auto taskId = NextTaskId_++; + const auto& [it, inserted] = TaskMap_.emplace(taskId, TWriteTask{}); + Y_ABORT_UNLESS(inserted); + TaskIdQueue_.push(taskId); + HaveMoreData_.Signal(); + it->second.SendingComplete = NThreading::NewPromise(); + return {it->second.SendingComplete, taskId}; + } + + void UpdateBlock(int taskId, const TSentBuffer& buffer, bool complete) + { + auto snapshot = buffer.Snapshot(); + + { + auto g = Guard(Lock_); + + CheckNoError(); + + auto it = TaskMap_.find(taskId); + Y_ABORT_UNLESS(it != TaskMap_.end()); + auto& writeTask = it->second; + writeTask.Data = std::move(snapshot.first); + writeTask.Size = snapshot.second; + writeTask.BufferComplete = complete; + + if (TaskIdQueue_.empty() || TaskIdQueue_.back() != taskId) { + TaskIdQueue_.push(taskId); + } + + HaveMoreData_.Signal(); + } + } + +private: + enum class EState; + +private: + void CheckNoError() + { + if (Error_) { + std::rethrow_exception(Error_); + } + } + + void SetFinishedState(EState state, TGuard<TMutex>&) + { + if (State_ == EState::Running) { + State_ = state; + } + HaveMoreData_.Signal(); + } + + void ThreadMain(TRichYPath path, const THeavyRequestRetrier::TParameters& parameters) + { + THolder<THeavyRequestRetrier> retrier; + + auto firstRequestParameters = parameters; + auto restRequestParameters = parameters; + + { + TNode firstPath = PathToNode(path); + firstRequestParameters.Header.MergeParameters(TNode()("path", firstPath), /*overwrite*/ true); + + TNode restPath = PathToNode(TRichYPath(path.Path_).Append(true)); + restRequestParameters.Header.MergeParameters(TNode()("path", restPath), /*overwrite*/ true); + } + + const auto* currentParameters = &firstRequestParameters; + + while (true) { + int taskId = 0; + TWriteTask task; + { + auto g = Guard(Lock_); + while (State_ == EState::Running && TaskIdQueue_.empty()) { + HaveMoreData_.Wait(Lock_); + } + + if ( + State_ == EState::Aborted || + State_ == EState::Completed && TaskIdQueue_.empty() + ) { + break; + } + + taskId = TaskIdQueue_.front(); + TaskIdQueue_.pop(); + if (auto it = TaskMap_.find(taskId); it != TaskMap_.end()) { + task = it->second; + } else { + Y_ABORT(); + } + } + + try { + if (!retrier) { + retrier = MakeHolder<THeavyRequestRetrier>(*currentParameters); + } + retrier->Update([task=task] { + return MakeHolder<TMemoryInput>(task.Data.get(), task.Size); + }); + if (task.BufferComplete) { + retrier->Finish(); + retrier.Reset(); + } + } catch (const std::exception& ex) { + task.SendingComplete.SetException(std::current_exception()); + auto g = Guard(Lock_); + Error_ = std::current_exception(); + return; + } + + if (task.BufferComplete) { + retrier.Reset(); + task.SendingComplete.SetValue(); + currentParameters = &restRequestParameters; + + auto g = Guard(Lock_); + auto erased = TaskMap_.erase(taskId); + Y_ABORT_UNLESS(erased == 1); + } + } + + if (State_ == EState::Completed) { + auto g = Guard(Lock_); + Y_ABORT_UNLESS(TaskIdQueue_.empty()); + Y_ABORT_UNLESS(TaskMap_.empty()); + } + } + +private: + struct TWriteTask + { + NThreading::TPromise<void> SendingComplete; + std::shared_ptr<char[]> Data; + ssize_t Size = 0; + bool BufferComplete = false; + }; + + TMutex Lock_; + TCondVar HaveMoreData_; + + TThread SenderThread_; + + THashMap<int, TWriteTask> TaskMap_; + std::queue<int> TaskIdQueue_; + + std::exception_ptr Error_; + enum class EState { + Running, + Completed, + Aborted, + }; + std::atomic<EState> State_ = EState::Running; + int NextTaskId_ = 0; +}; + +//////////////////////////////////////////////////////////////////////////////// + +struct TRetryfulWriterV2::TSendTask +{ + TSentBuffer Buffer; + NThreading::TFuture<void> SentFuture = NThreading::MakeFuture(); + int TaskId = 0; +}; + +//////////////////////////////////////////////////////////////////////////////// + +TRetryfulWriterV2::TRetryfulWriterV2( + IClientRetryPolicyPtr clientRetryPolicy, + ITransactionPingerPtr transactionPinger, + const TClientContext& context, + const TTransactionId& parentId, + const TString& command, + const TMaybe<TFormat>& format, + const TRichYPath& path, + const TNode& serializedWriterOptions, + ssize_t bufferSize, + bool createTransaction) + : BufferSize_(bufferSize) + , Current_(MakeHolder<TSendTask>()) + , Previous_(MakeHolder<TSendTask>()) +{ + THttpHeader httpHeader("PUT", command); + httpHeader.SetInputFormat(format); + httpHeader.MergeParameters(serializedWriterOptions); + + if (createTransaction) { + WriteTransaction_ = MakeHolder<TPingableTransaction>( + clientRetryPolicy, + context, + parentId, + transactionPinger->GetChildTxPinger(), + TStartTransactionOptions() + ); + auto append = path.Append_.GetOrElse(false); + auto lockMode = (append ? LM_SHARED : LM_EXCLUSIVE); + NDetail::NRawClient::Lock( + clientRetryPolicy->CreatePolicyForGenericRequest(), + context, + WriteTransaction_->GetId(), + path.Path_, + lockMode + ); + } + + THeavyRequestRetrier::TParameters parameters = { + .ClientRetryPolicy = clientRetryPolicy, + .TransactionPinger = transactionPinger, + .Context = context, + .TransactionId = WriteTransaction_ ? WriteTransaction_->GetId() : parentId, + .Header = std::move(httpHeader), + }; + + Sender_ = MakeHolder<TSender>(path, parameters); + + DoStartBatch(); +} + +void TRetryfulWriterV2::Abort() +{ + if (Sender_) { + Sender_->Abort(); + } +} + +void TRetryfulWriterV2::DoFinish() +{ + if (Sender_) { + Sender_->UpdateBlock(Current_->TaskId, Current_->Buffer, true); + Sender_->Finish(); + Sender_.Reset(); + } + if (WriteTransaction_) { + WriteTransaction_->Commit(); + } +} + +void TRetryfulWriterV2::DoStartBatch() +{ + Previous_->SentFuture.Wait(); + + std::swap(Previous_, Current_); + auto&& [future, taskId] = Sender_->StartBlock(); + Current_->SentFuture = future; + Current_->TaskId = taskId; + Current_->Buffer.Clear(); + NextSizeToSend_ = SendStep_; +} + +void TRetryfulWriterV2::DoWrite(const void* buf, size_t len) +{ + Current_->Buffer.Append(buf, len); + auto currentSize = Current_->Buffer.Size(); + if (currentSize >= NextSizeToSend_) { + Sender_->UpdateBlock(Current_->TaskId, Current_->Buffer, false); + NextSizeToSend_ = currentSize + SendStep_; + } +} + +void TRetryfulWriterV2::NotifyRowEnd() +{ + if (Current_->Buffer.Size() >= BufferSize_) { + Sender_->UpdateBlock(Current_->TaskId, Current_->Buffer, true); + DoStartBatch(); + } +} + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT::NPrivate diff --git a/yt/cpp/mapreduce/client/retryful_writer_v2.h b/yt/cpp/mapreduce/client/retryful_writer_v2.h new file mode 100644 index 0000000000..c344fd0c6c --- /dev/null +++ b/yt/cpp/mapreduce/client/retryful_writer_v2.h @@ -0,0 +1,58 @@ +#pragma once + +#include <library/cpp/threading/future/core/future.h> + +#include <yt/cpp/mapreduce/client/transaction.h> +#include <yt/cpp/mapreduce/common/fwd.h> +#include <yt/cpp/mapreduce/http/context.h> +#include <yt/cpp/mapreduce/interface/io.h> + +#include <util/generic/size_literals.h> + +namespace NYT::NPrivate { + +//////////////////////////////////////////////////////////////////////////////// + +class TRetryfulWriterV2 + : public TRawTableWriter +{ +public: + TRetryfulWriterV2( + IClientRetryPolicyPtr clientRetryPolicy, + ITransactionPingerPtr transactionPinger, + const TClientContext& context, + const TTransactionId& parentId, + const TString& command, + const TMaybe<TFormat>& format, + const TRichYPath& path, + const TNode& serializedWriterOptions, + ssize_t bufferSize, + bool createTranasaction); + + void NotifyRowEnd() override; + void Abort() override; + +protected: + void DoWrite(const void* buf, size_t len) override; + void DoFinish() override; + + void DoStartBatch(); + +private: + class TSentBuffer; + class TSender; + struct TSendTask; + + const ssize_t BufferSize_; + const ssize_t SendStep_ = 64_KB; + ssize_t NextSizeToSend_; + THolder<TSender> Sender_; + THolder<TPingableTransaction> WriteTransaction_; + + THolder<TSendTask> Current_; + THolder<TSendTask> Previous_; +}; + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT::NPrivate diff --git a/yt/cpp/mapreduce/client/ya.make b/yt/cpp/mapreduce/client/ya.make index 58a14cc060..c159492808 100644 --- a/yt/cpp/mapreduce/client/ya.make +++ b/yt/cpp/mapreduce/client/ya.make @@ -22,6 +22,7 @@ SRCS( retry_heavy_write_request.cpp retry_transaction.cpp retryful_writer.cpp + retryful_writer_v2.cpp retryless_writer.cpp skiff.cpp structured_table_formats.cpp diff --git a/yt/cpp/mapreduce/interface/config.h b/yt/cpp/mapreduce/interface/config.h index e0868b60b4..b6d34f8895 100644 --- a/yt/cpp/mapreduce/interface/config.h +++ b/yt/cpp/mapreduce/interface/config.h @@ -54,6 +54,21 @@ DEFINE_ENUM(EUploadDeduplicationMode, //////////////////////////////////////////////////////////////////////////////// +/// Enum describing possible versions of table writer implemetation. +enum class ETableWriterVersion +{ + /// Allow library to choose version of writer. + Auto, + + /// Stable but slower version of writer. + V1, + + /// Unstable but faster version of writer (going to be default in the future). + V2, +}; + +//////////////////////////////////////////////////////////////////////////////// + struct TConfig : public TThrRefBase { @@ -177,6 +192,9 @@ struct TConfig // (cf. https://ytsaurus.tech/docs/en/user-guide/proxy/http-reference#framing). THashSet<TString> CommandsWithFraming; + /// Which implemetation of table writer to use. + ETableWriterVersion TableWriterVersion = ETableWriterVersion::Auto; + static bool GetBool(const char* var, bool defaultValue = false); static int GetInt(const char* var, int defaultValue); static TDuration GetDuration(const char* var, TDuration defaultValue); diff --git a/yt/cpp/mapreduce/io/helpers.h b/yt/cpp/mapreduce/io/helpers.h index 5dbbf20906..0733ff417c 100644 --- a/yt/cpp/mapreduce/io/helpers.h +++ b/yt/cpp/mapreduce/io/helpers.h @@ -33,6 +33,16 @@ struct TIOOptionsTraits<TTableWriterOptions> }; template <class TOptions> +TNode FormIORequestParameters(const TOptions& options) +{ + TNode params = TNode::CreateMap(); + if (options.Config_) { + params[TIOOptionsTraits<TOptions>::ConfigName] = *options.Config_; + } + return params; +} + +template <class TOptions> TNode FormIORequestParameters( const TRichYPath& path, const TOptions& options) |