aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorermolovd <ermolovd@yandex-team.com>2023-11-28 09:30:16 +0300
committerermolovd <ermolovd@yandex-team.com>2023-11-28 09:59:46 +0300
commit5e58015f2e6fddae74e51d9de0870e5f7119cc12 (patch)
tree0f686ad847d54a66716d717404a1d615139ab22d
parent86cac93557a380155b8a87abd5802e5d40c027bf (diff)
downloadydb-5e58015f2e6fddae74e51d9de0870e5f7119cc12.tar.gz
YT-19269: table writer implementation that doesn't wait for complete buffer before sending to network
-rw-r--r--yt/cpp/mapreduce/client/CMakeLists.darwin-arm64.txt1
-rw-r--r--yt/cpp/mapreduce/client/CMakeLists.darwin-x86_64.txt1
-rw-r--r--yt/cpp/mapreduce/client/CMakeLists.linux-aarch64.txt1
-rw-r--r--yt/cpp/mapreduce/client/CMakeLists.linux-x86_64.txt1
-rw-r--r--yt/cpp/mapreduce/client/CMakeLists.windows-x86_64.txt1
-rw-r--r--yt/cpp/mapreduce/client/client_writer.cpp36
-rw-r--r--yt/cpp/mapreduce/client/client_writer.h1
-rw-r--r--yt/cpp/mapreduce/client/retry_heavy_write_request.cpp107
-rw-r--r--yt/cpp/mapreduce/client/retry_heavy_write_request.h48
-rw-r--r--yt/cpp/mapreduce/client/retryful_writer.h2
-rw-r--r--yt/cpp/mapreduce/client/retryful_writer_v2.cpp384
-rw-r--r--yt/cpp/mapreduce/client/retryful_writer_v2.h58
-rw-r--r--yt/cpp/mapreduce/client/ya.make1
-rw-r--r--yt/cpp/mapreduce/interface/config.h18
-rw-r--r--yt/cpp/mapreduce/io/helpers.h10
15 files changed, 657 insertions, 13 deletions
diff --git a/yt/cpp/mapreduce/client/CMakeLists.darwin-arm64.txt b/yt/cpp/mapreduce/client/CMakeLists.darwin-arm64.txt
index fc711ccea8..0056b9c641 100644
--- a/yt/cpp/mapreduce/client/CMakeLists.darwin-arm64.txt
+++ b/yt/cpp/mapreduce/client/CMakeLists.darwin-arm64.txt
@@ -56,6 +56,7 @@ target_sources(cpp-mapreduce-client PRIVATE
${CMAKE_SOURCE_DIR}/yt/cpp/mapreduce/client/retry_heavy_write_request.cpp
${CMAKE_SOURCE_DIR}/yt/cpp/mapreduce/client/retry_transaction.cpp
${CMAKE_SOURCE_DIR}/yt/cpp/mapreduce/client/retryful_writer.cpp
+ ${CMAKE_SOURCE_DIR}/yt/cpp/mapreduce/client/retryful_writer_v2.cpp
${CMAKE_SOURCE_DIR}/yt/cpp/mapreduce/client/retryless_writer.cpp
${CMAKE_SOURCE_DIR}/yt/cpp/mapreduce/client/skiff.cpp
${CMAKE_SOURCE_DIR}/yt/cpp/mapreduce/client/structured_table_formats.cpp
diff --git a/yt/cpp/mapreduce/client/CMakeLists.darwin-x86_64.txt b/yt/cpp/mapreduce/client/CMakeLists.darwin-x86_64.txt
index fc711ccea8..0056b9c641 100644
--- a/yt/cpp/mapreduce/client/CMakeLists.darwin-x86_64.txt
+++ b/yt/cpp/mapreduce/client/CMakeLists.darwin-x86_64.txt
@@ -56,6 +56,7 @@ target_sources(cpp-mapreduce-client PRIVATE
${CMAKE_SOURCE_DIR}/yt/cpp/mapreduce/client/retry_heavy_write_request.cpp
${CMAKE_SOURCE_DIR}/yt/cpp/mapreduce/client/retry_transaction.cpp
${CMAKE_SOURCE_DIR}/yt/cpp/mapreduce/client/retryful_writer.cpp
+ ${CMAKE_SOURCE_DIR}/yt/cpp/mapreduce/client/retryful_writer_v2.cpp
${CMAKE_SOURCE_DIR}/yt/cpp/mapreduce/client/retryless_writer.cpp
${CMAKE_SOURCE_DIR}/yt/cpp/mapreduce/client/skiff.cpp
${CMAKE_SOURCE_DIR}/yt/cpp/mapreduce/client/structured_table_formats.cpp
diff --git a/yt/cpp/mapreduce/client/CMakeLists.linux-aarch64.txt b/yt/cpp/mapreduce/client/CMakeLists.linux-aarch64.txt
index f841673139..69b08c73ce 100644
--- a/yt/cpp/mapreduce/client/CMakeLists.linux-aarch64.txt
+++ b/yt/cpp/mapreduce/client/CMakeLists.linux-aarch64.txt
@@ -56,6 +56,7 @@ target_sources(cpp-mapreduce-client PRIVATE
${CMAKE_SOURCE_DIR}/yt/cpp/mapreduce/client/retry_heavy_write_request.cpp
${CMAKE_SOURCE_DIR}/yt/cpp/mapreduce/client/retry_transaction.cpp
${CMAKE_SOURCE_DIR}/yt/cpp/mapreduce/client/retryful_writer.cpp
+ ${CMAKE_SOURCE_DIR}/yt/cpp/mapreduce/client/retryful_writer_v2.cpp
${CMAKE_SOURCE_DIR}/yt/cpp/mapreduce/client/retryless_writer.cpp
${CMAKE_SOURCE_DIR}/yt/cpp/mapreduce/client/skiff.cpp
${CMAKE_SOURCE_DIR}/yt/cpp/mapreduce/client/structured_table_formats.cpp
diff --git a/yt/cpp/mapreduce/client/CMakeLists.linux-x86_64.txt b/yt/cpp/mapreduce/client/CMakeLists.linux-x86_64.txt
index 064b063845..5658f88328 100644
--- a/yt/cpp/mapreduce/client/CMakeLists.linux-x86_64.txt
+++ b/yt/cpp/mapreduce/client/CMakeLists.linux-x86_64.txt
@@ -57,6 +57,7 @@ target_sources(cpp-mapreduce-client PRIVATE
${CMAKE_SOURCE_DIR}/yt/cpp/mapreduce/client/retry_heavy_write_request.cpp
${CMAKE_SOURCE_DIR}/yt/cpp/mapreduce/client/retry_transaction.cpp
${CMAKE_SOURCE_DIR}/yt/cpp/mapreduce/client/retryful_writer.cpp
+ ${CMAKE_SOURCE_DIR}/yt/cpp/mapreduce/client/retryful_writer_v2.cpp
${CMAKE_SOURCE_DIR}/yt/cpp/mapreduce/client/retryless_writer.cpp
${CMAKE_SOURCE_DIR}/yt/cpp/mapreduce/client/skiff.cpp
${CMAKE_SOURCE_DIR}/yt/cpp/mapreduce/client/structured_table_formats.cpp
diff --git a/yt/cpp/mapreduce/client/CMakeLists.windows-x86_64.txt b/yt/cpp/mapreduce/client/CMakeLists.windows-x86_64.txt
index 3082662ccb..4feb80136b 100644
--- a/yt/cpp/mapreduce/client/CMakeLists.windows-x86_64.txt
+++ b/yt/cpp/mapreduce/client/CMakeLists.windows-x86_64.txt
@@ -53,6 +53,7 @@ target_sources(cpp-mapreduce-client PRIVATE
${CMAKE_SOURCE_DIR}/yt/cpp/mapreduce/client/retry_heavy_write_request.cpp
${CMAKE_SOURCE_DIR}/yt/cpp/mapreduce/client/retry_transaction.cpp
${CMAKE_SOURCE_DIR}/yt/cpp/mapreduce/client/retryful_writer.cpp
+ ${CMAKE_SOURCE_DIR}/yt/cpp/mapreduce/client/retryful_writer_v2.cpp
${CMAKE_SOURCE_DIR}/yt/cpp/mapreduce/client/retryless_writer.cpp
${CMAKE_SOURCE_DIR}/yt/cpp/mapreduce/client/skiff.cpp
${CMAKE_SOURCE_DIR}/yt/cpp/mapreduce/client/structured_table_formats.cpp
diff --git a/yt/cpp/mapreduce/client/client_writer.cpp b/yt/cpp/mapreduce/client/client_writer.cpp
index 7141015492..2122d0d8db 100644
--- a/yt/cpp/mapreduce/client/client_writer.cpp
+++ b/yt/cpp/mapreduce/client/client_writer.cpp
@@ -2,6 +2,7 @@
#include "retryful_writer.h"
#include "retryless_writer.h"
+#include "retryful_writer_v2.h"
#include <yt/cpp/mapreduce/interface/io.h>
#include <yt/cpp/mapreduce/common/fwd.h>
@@ -31,15 +32,32 @@ TClientWriter::TClientWriter(
BufferSize_,
options));
} else {
- RawWriter_.Reset(new TRetryfulWriter(
- std::move(clientRetryPolicy),
- std::move(transactionPinger),
- context,
- transactionId,
- GetWriteTableCommand(context.Config->ApiVersion),
- format,
- path,
- options));
+ bool useV2Writer = context.Config->TableWriterVersion == ETableWriterVersion::V2;
+ if (useV2Writer) {
+ auto serializedWriterOptions = FormIORequestParameters(options);
+
+ RawWriter_ = MakeIntrusive<NPrivate::TRetryfulWriterV2>(
+ std::move(clientRetryPolicy),
+ std::move(transactionPinger),
+ context,
+ transactionId,
+ GetWriteTableCommand(context.Config->ApiVersion),
+ format,
+ path,
+ serializedWriterOptions,
+ static_cast<ssize_t>(options.BufferSize_),
+ options.CreateTransaction_);
+ } else {
+ RawWriter_.Reset(new TRetryfulWriter(
+ std::move(clientRetryPolicy),
+ std::move(transactionPinger),
+ context,
+ transactionId,
+ GetWriteTableCommand(context.Config->ApiVersion),
+ format,
+ path,
+ options));
+ }
}
}
diff --git a/yt/cpp/mapreduce/client/client_writer.h b/yt/cpp/mapreduce/client/client_writer.h
index 8ee912d065..56605a3f31 100644
--- a/yt/cpp/mapreduce/client/client_writer.h
+++ b/yt/cpp/mapreduce/client/client_writer.h
@@ -8,7 +8,6 @@
namespace NYT {
struct TTableWriterOptions;
-class TRetryfulWriter;
////////////////////////////////////////////////////////////////////////////////
diff --git a/yt/cpp/mapreduce/client/retry_heavy_write_request.cpp b/yt/cpp/mapreduce/client/retry_heavy_write_request.cpp
index 54a2be02e3..253d7c0d44 100644
--- a/yt/cpp/mapreduce/client/retry_heavy_write_request.cpp
+++ b/yt/cpp/mapreduce/client/retry_heavy_write_request.cpp
@@ -16,6 +16,8 @@
#include <yt/cpp/mapreduce/http/requests.h>
#include <yt/cpp/mapreduce/http/retry_request.h>
+#include <util/stream/null.h>
+
namespace NYT {
using ::ToString;
@@ -93,4 +95,109 @@ void RetryHeavyWriteRequest(
////////////////////////////////////////////////////////////////////////////////
+// Builds the retrier and immediately opens the first attempt through Retry(),
+// before any payload has been supplied.
+THeavyRequestRetrier::THeavyRequestRetrier(TParameters parameters)
+    : Parameters_(std::move(parameters))
+    , RequestRetryPolicy_(Parameters_.ClientRetryPolicy->CreatePolicyForGenericRequest())
+    , StreamFactory_([] {
+        // Nothing has been written yet, so a started attempt replays an empty stream.
+        return MakeHolder<TNullInput>();
+    })
+{
+    // No payload action here: Retry() only has to get an attempt started.
+    const auto nothingToSend = [] { };
+    Retry(nothingToSend);
+}
+
+THeavyRequestRetrier::~THeavyRequestRetrier() = default;
+
+// Replaces the data source and pushes the part of it that the current attempt
+// has not written yet into the attempt's request stream.
+//
+// `streamFactory` is stored in StreamFactory_ and invoked anew for every send,
+// including the replay done when an attempt is restarted.
+void THeavyRequestRetrier::Update(THeavyRequestRetrier::TStreamFactory streamFactory)
+{
+    StreamFactory_ = streamFactory;
+
+    const auto sendTail = [this] {
+        auto& attempt = *Attempt_;
+        auto input = StreamFactory_();
+        // Skip the prefix this attempt has already accounted for.
+        input->Skip(attempt.Offset);
+        auto written = input->ReadAll(*attempt.Request->GetStream());
+        attempt.Offset += written;
+    };
+    Retry(sendTail);
+}
+
+// Completes the upload: finishes the HTTP request and fetches its response,
+// then commits the attempt's transaction. The whole sequence runs under Retry().
+void THeavyRequestRetrier::Finish()
+{
+    const auto finalize = [this] {
+        Attempt_->Request->Finish()->GetResponse();
+        Attempt_->Transaction->Commit();
+        // The attempt is done; release it together with its request and transaction.
+        Attempt_.reset();
+    };
+    Retry(finalize);
+}
+
+// Runs `function` against a live attempt. On failure the attempt is dropped and,
+// while the retry policy allows it, a new attempt is started and `function` is
+// called again after the backoff returned by the policy.
+//
+// Rethrows the caught exception when it is not retriable or when the policy
+// returns no backoff duration.
+void THeavyRequestRetrier::Retry(const std::function<void()> &function)
+{
+    for (;;) {
+        try {
+            if (!Attempt_) {
+                TryStartAttempt();
+            }
+            function();
+            return;
+        } catch (const std::exception& ex) {
+            YT_LOG_ERROR("RSP %v - attempt %v failed",
+                Attempt_->RequestId,
+                RequestRetryPolicy_->GetAttemptDescription());
+            // Discard the failed attempt; the next iteration starts a new one.
+            Attempt_.reset();
+
+            // TErrorResponse gets its own retriability check and policy callback.
+            const auto* errorResponse = dynamic_cast<const TErrorResponse*>(&ex);
+            const bool retriable = errorResponse
+                ? IsRetriable(*errorResponse)
+                : IsRetriable(ex);
+            if (!retriable) {
+                throw;
+            }
+
+            TMaybe<TDuration> backoffDuration;
+            if (errorResponse) {
+                backoffDuration = RequestRetryPolicy_->OnRetriableError(*errorResponse);
+            } else {
+                backoffDuration = RequestRetryPolicy_->OnGenericError(ex);
+            }
+            if (!backoffDuration) {
+                throw;
+            }
+            NDetail::TWaitProxy::Get()->Sleep(*backoffDuration);
+        }
+    }
+}
+
+// Starts a new upload attempt: creates a TPingableTransaction under
+// Parameters_.TransactionId, opens an HTTP request bound to that transaction and
+// replays the stream produced by StreamFactory_ into the request.
+//
+// On return Attempt_->Offset equals the number of bytes replayed, so the next
+// incremental send continues after them instead of sending them a second time.
+void THeavyRequestRetrier::TryStartAttempt()
+{
+    Attempt_ = std::make_unique<TAttempt>();
+    Attempt_->Transaction = std::make_unique<TPingableTransaction>(
+        Parameters_.ClientRetryPolicy, Parameters_.Context,
+        Parameters_.TransactionId,
+        Parameters_.TransactionPinger->GetChildTxPinger(),
+        TStartTransactionOptions());
+
+    // Work on a copy: credentials and the transaction id are specific to this attempt.
+    auto header = Parameters_.Header;
+    if (Parameters_.Context.ServiceTicketAuth) {
+        header.SetServiceTicket(Parameters_.Context.ServiceTicketAuth->Ptr->IssueServiceTicket());
+    } else {
+        header.SetToken(Parameters_.Context.Token);
+    }
+
+    if (Parameters_.Context.ImpersonationUser) {
+        header.SetImpersonationUser(*Parameters_.Context.ImpersonationUser);
+    }
+    auto hostName = GetProxyForHeavyRequest(Parameters_.Context);
+    Attempt_->RequestId = CreateGuidAsString();
+
+    UpdateHeaderForProxyIfNeed(hostName, Parameters_.Context, header);
+
+    header.AddTransactionId(Attempt_->Transaction->GetId(), /* overwrite = */ true);
+    header.SetRequestCompression(ToString(Parameters_.Context.Config->ContentEncoding));
+
+    Attempt_->Request = Parameters_.Context.HttpClient->StartRequest(
+        GetFullUrlForProxy(hostName, Parameters_.Context, header),
+        Attempt_->RequestId, header);
+
+    // Replay everything known so far and record how much went out. Update() skips
+    // Attempt_->Offset bytes before sending; leaving the offset at 0 here would make
+    // a retried Update() transmit the same data twice within one request.
+    auto stream = StreamFactory_();
+    Attempt_->Offset = static_cast<ssize_t>(stream->ReadAll(*Attempt_->Request->GetStream()));
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
} // namespace NYT
diff --git a/yt/cpp/mapreduce/client/retry_heavy_write_request.h b/yt/cpp/mapreduce/client/retry_heavy_write_request.h
index 647cad302c..853306ff42 100644
--- a/yt/cpp/mapreduce/client/retry_heavy_write_request.h
+++ b/yt/cpp/mapreduce/client/retry_heavy_write_request.h
@@ -1,13 +1,59 @@
#pragma once
+#include <yt/cpp/mapreduce/client/transaction.h>
#include <yt/cpp/mapreduce/common/fwd.h>
+#include <yt/cpp/mapreduce/http/context.h>
#include <yt/cpp/mapreduce/http/requests.h>
namespace NYT {
///////////////////////////////////////////////////////////////////////////////
+// Uploads a growing payload over a single HTTP request and restarts the upload
+// (new transaction, new request, data replayed from the stream factory) when an
+// attempt fails with a retriable error.
+class THeavyRequestRetrier
+{
+public:
+ // Everything needed to start an attempt; copied into the retrier on construction.
+ struct TParameters
+ {
+ IClientRetryPolicyPtr ClientRetryPolicy;
+ ITransactionPingerPtr TransactionPinger;
+ TClientContext Context;
+ // Passed to the TPingableTransaction created for each attempt.
+ TTransactionId TransactionId;
+ // Base header; each attempt sends a copy with credentials and transaction id added.
+ THttpHeader Header;
+ };
+
+ // Produces a new input stream over the payload each time it is called.
+ using TStreamFactory = std::function<THolder<IInputStream>()>;
+
+public:
+ // Starts the first attempt right away.
+ explicit THeavyRequestRetrier(TParameters parameters);
+ ~THeavyRequestRetrier();
+
+ // Stores the factory and sends the part of its stream beyond the current offset.
+ void Update(TStreamFactory streamFactory);
+ // Finishes the request, fetches the response and commits the attempt's transaction.
+ void Finish();
+
+ private:
+ // Calls `function`, restarting the attempt while the retry policy allows it.
+ void Retry(const std::function<void()>& function);
+
+ void TryStartAttempt();
+
+ private:
+ const TParameters Parameters_;
+ const IRequestRetryPolicyPtr RequestRetryPolicy_;
+
+ // State of one upload attempt; replaced as a whole on retry.
+ struct TAttempt
+ {
+ std::unique_ptr<TPingableTransaction> Transaction;
+ TString RequestId;
+ NHttpClient::IHttpRequestPtr Request;
+ // Number of payload bytes Update() skips before sending more data.
+ ssize_t Offset = 0;
+ };
+ // Null when no attempt is active (after a failure or after Finish()).
+ std::unique_ptr<TAttempt> Attempt_;
+
+ TStreamFactory StreamFactory_;
+};
+
+///////////////////////////////////////////////////////////////////////////////
+
void RetryHeavyWriteRequest(
const IClientRetryPolicyPtr& clientRetryPolicy,
const ITransactionPingerPtr& transactionPinger,
@@ -16,6 +62,6 @@ void RetryHeavyWriteRequest(
THttpHeader& header,
std::function<THolder<IInputStream>()> streamMaker);
-///////////////////////////////////////////////////////////////////////////////
+////////////////////////////////////////////////////////////////////////////////
} // namespace NYT
diff --git a/yt/cpp/mapreduce/client/retryful_writer.h b/yt/cpp/mapreduce/client/retryful_writer.h
index 0725a36aac..8c6de1c266 100644
--- a/yt/cpp/mapreduce/client/retryful_writer.h
+++ b/yt/cpp/mapreduce/client/retryful_writer.h
@@ -18,8 +18,6 @@
#include <util/system/thread.h>
#include <util/system/event.h>
-#include <atomic>
-
namespace NYT {
////////////////////////////////////////////////////////////////////////////////
diff --git a/yt/cpp/mapreduce/client/retryful_writer_v2.cpp b/yt/cpp/mapreduce/client/retryful_writer_v2.cpp
new file mode 100644
index 0000000000..b8a2bd2164
--- /dev/null
+++ b/yt/cpp/mapreduce/client/retryful_writer_v2.cpp
@@ -0,0 +1,384 @@
+#include "retryful_writer_v2.h"
+
+#include <yt/cpp/mapreduce/client/retry_heavy_write_request.h>
+#include <yt/cpp/mapreduce/client/transaction.h>
+#include <yt/cpp/mapreduce/client/transaction_pinger.h>
+#include <yt/cpp/mapreduce/common/fwd.h>
+#include <yt/cpp/mapreduce/common/helpers.h>
+#include <yt/cpp/mapreduce/common/retry_lib.h>
+#include <yt/cpp/mapreduce/common/wait_proxy.h>
+#include <yt/cpp/mapreduce/http/context.h>
+#include <yt/cpp/mapreduce/http/helpers.h>
+#include <yt/cpp/mapreduce/http/http.h>
+
+#include <util/system/condvar.h>
+
+#include <queue>
+
+namespace NYT::NPrivate {
+
+////////////////////////////////////////////////////////////////////////////////
+
+// Append-only byte buffer whose storage can be shared with a reader via Snapshot().
+//
+// Append() never rewrites bytes below the current size: it either writes past them
+// in place or moves to a newly allocated array. A snapshot's [0, size) range
+// therefore stays intact until Clear() is followed by further appends.
+class TRetryfulWriterV2::TSentBuffer
+{
+public:
+    TSentBuffer() = default;
+    TSentBuffer(const TSentBuffer& ) = delete;
+
+    // Returns the current storage (shared, not copied) and the number of valid bytes.
+    std::pair<std::shared_ptr<char[]>, ssize_t> Snapshot() const
+    {
+        return {Buffer_, Size_};
+    }
+
+    // Forgets the content but keeps the allocated storage for reuse.
+    void Clear()
+    {
+        Size_ = 0;
+    }
+
+    ssize_t Size() const
+    {
+        return Size_;
+    }
+
+    // Appends `size` bytes from `data`, growing the storage when they do not fit.
+    void Append(const void* data, ssize_t size)
+    {
+        auto newSize = Size_ + size;
+        if (newSize < Capacity_) {
+            memcpy(Buffer_.get() + Size_, data, size);
+        } else {
+            // Closest power of 2 exceeding new size.
+            // The shift is done in ssize_t: a plain `1 << n` is an int and overflows
+            // once the buffer reaches 1 GiB.
+            auto newCapacity = static_cast<ssize_t>(1) << (MostSignificantBit(newSize) + 1);
+            newCapacity = Max<ssize_t>(64, newCapacity);
+            auto newBuffer = std::make_shared<char[]>(newCapacity);
+            // Buffer_ is null before the first allocation, and memcpy must not be
+            // given a null pointer even for a zero length.
+            if (Size_ > 0) {
+                memcpy(newBuffer.get(), Buffer_.get(), Size_);
+            }
+            memcpy(newBuffer.get() + Size_, data, size);
+            Buffer_ = newBuffer;
+            Capacity_ = newCapacity;
+        }
+        Size_ = newSize;
+    }
+
+private:
+    std::shared_ptr<char[]> Buffer_ = nullptr;
+    ssize_t Size_ = 0;
+    ssize_t Capacity_ = 0;
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
+// Owns the background thread that uploads blocks through THeavyRequestRetrier.
+//
+// The writer side registers a block with StartBlock() and republishes its growing
+// content with UpdateBlock(); the sender thread takes task ids from a FIFO queue and
+// pushes the latest published state of each task. An upload error is stored in
+// Error_ and rethrown from the next StartBlock()/UpdateBlock()/Finish() call.
+class TRetryfulWriterV2::TSender
+{
+public:
+    TSender(TRichYPath path, THeavyRequestRetrier::TParameters parameters)
+        : SenderThread_(
+            [this, path=std::move(path), parameters=std::move(parameters)] {
+                // TThread::SetCurrentThreadName is static and renames the calling
+                // thread, so it has to run here, on the sender thread itself. Calling
+                // it from the constructor renames the thread that creates the writer.
+                TThread::SetCurrentThreadName("retryful-writer-v2-sender");
+                ThreadMain(std::move(path), parameters);
+            })
+    {
+        // Started from the constructor body, i.e. after every member is initialized.
+        SenderThread_.Start();
+    }
+
+    ~TSender()
+    {
+        Abort();
+        SenderThread_.Join();
+    }
+
+    // Asks the sender thread to stop without uploading what is still queued.
+    void Abort()
+    {
+        auto g = Guard(Lock_);
+        SetFinishedState(EState::Aborted, g);
+    }
+
+    // Lets the sender thread drain the queue, joins it and rethrows a stored upload error.
+    void Finish()
+    {
+        {
+            auto g = Guard(Lock_);
+            SetFinishedState(EState::Completed, g);
+        }
+        SenderThread_.Join();
+        CheckNoError();
+
+        Y_ABORT_UNLESS(TaskIdQueue_.empty());
+        Y_ABORT_UNLESS(TaskMap_.empty());
+    }
+
+    // Return future that is complete once upload of this buffer is successfully complete
+    std::pair<NThreading::TFuture<void>, int> StartBlock()
+    {
+        auto g = Guard(Lock_);
+
+        CheckNoError();
+
+        auto taskId = NextTaskId_++;
+        const auto& [it, inserted] = TaskMap_.emplace(taskId, TWriteTask{});
+        Y_ABORT_UNLESS(inserted);
+        TaskIdQueue_.push(taskId);
+        HaveMoreData_.Signal();
+        it->second.SendingComplete = NThreading::NewPromise();
+        return {it->second.SendingComplete, taskId};
+    }
+
+    // Publishes the current content of `buffer` for task `taskId`.
+    // `complete` marks the block as final, so the sender finishes its upload.
+    void UpdateBlock(int taskId, const TSentBuffer& buffer, bool complete)
+    {
+        auto snapshot = buffer.Snapshot();
+
+        {
+            auto g = Guard(Lock_);
+
+            CheckNoError();
+
+            auto it = TaskMap_.find(taskId);
+            Y_ABORT_UNLESS(it != TaskMap_.end());
+            auto& writeTask = it->second;
+            writeTask.Data = std::move(snapshot.first);
+            writeTask.Size = snapshot.second;
+            writeTask.BufferComplete = complete;
+
+            // Avoid queueing the same task twice in a row: a pending entry will
+            // already pick up the state stored above.
+            if (TaskIdQueue_.empty() || TaskIdQueue_.back() != taskId) {
+                TaskIdQueue_.push(taskId);
+            }
+
+            HaveMoreData_.Signal();
+        }
+    }
+
+private:
+    enum class EState;
+
+private:
+    // Rethrows the error recorded by the sender thread, if any.
+    void CheckNoError()
+    {
+        if (Error_) {
+            std::rethrow_exception(Error_);
+        }
+    }
+
+    // Moves Running to `state` (a later call cannot override an earlier final state)
+    // and wakes the sender thread. The guard parameter documents that Lock_ is held.
+    void SetFinishedState(EState state, TGuard<TMutex>&)
+    {
+        if (State_ == EState::Running) {
+            State_ = state;
+        }
+        HaveMoreData_.Signal();
+    }
+
+    // Body of the sender thread.
+    void ThreadMain(TRichYPath path, const THeavyRequestRetrier::TParameters& parameters)
+    {
+        THolder<THeavyRequestRetrier> retrier;
+
+        // The first block is written to `path` as given; every later block goes to
+        // the same path with append enabled.
+        auto firstRequestParameters = parameters;
+        auto restRequestParameters = parameters;
+
+        {
+            TNode firstPath = PathToNode(path);
+            firstRequestParameters.Header.MergeParameters(TNode()("path", firstPath), /*overwrite*/ true);
+
+            TNode restPath = PathToNode(TRichYPath(path.Path_).Append(true));
+            restRequestParameters.Header.MergeParameters(TNode()("path", restPath), /*overwrite*/ true);
+        }
+
+        const auto* currentParameters = &firstRequestParameters;
+
+        while (true) {
+            int taskId = 0;
+            TWriteTask task;
+            {
+                auto g = Guard(Lock_);
+                while (State_ == EState::Running && TaskIdQueue_.empty()) {
+                    HaveMoreData_.Wait(Lock_);
+                }
+
+                // Abort stops immediately; completion stops once the queue is drained.
+                if (
+                    State_ == EState::Aborted ||
+                    State_ == EState::Completed && TaskIdQueue_.empty()
+                ) {
+                    break;
+                }
+
+                taskId = TaskIdQueue_.front();
+                TaskIdQueue_.pop();
+                // Copy the task so the upload below runs without holding Lock_.
+                if (auto it = TaskMap_.find(taskId); it != TaskMap_.end()) {
+                    task = it->second;
+                } else {
+                    Y_ABORT();
+                }
+            }
+
+            try {
+                if (!retrier) {
+                    retrier = MakeHolder<THeavyRequestRetrier>(*currentParameters);
+                }
+                retrier->Update([task=task] {
+                    return MakeHolder<TMemoryInput>(task.Data.get(), task.Size);
+                });
+                if (task.BufferComplete) {
+                    retrier->Finish();
+                    retrier.Reset();
+                }
+            } catch (const std::exception& ex) {
+                // Fail the block's future and record the error for the writer side.
+                task.SendingComplete.SetException(std::current_exception());
+                auto g = Guard(Lock_);
+                Error_ = std::current_exception();
+                return;
+            }
+
+            if (task.BufferComplete) {
+                retrier.Reset();
+                task.SendingComplete.SetValue();
+                currentParameters = &restRequestParameters;
+
+                auto g = Guard(Lock_);
+                auto erased = TaskMap_.erase(taskId);
+                Y_ABORT_UNLESS(erased == 1);
+            }
+        }
+
+        if (State_ == EState::Completed) {
+            auto g = Guard(Lock_);
+            Y_ABORT_UNLESS(TaskIdQueue_.empty());
+            Y_ABORT_UNLESS(TaskMap_.empty());
+        }
+    }
+
+private:
+    // Latest published state of one block.
+    struct TWriteTask
+    {
+        NThreading::TPromise<void> SendingComplete;
+        std::shared_ptr<char[]> Data;
+        ssize_t Size = 0;
+        bool BufferComplete = false;
+    };
+
+    TMutex Lock_;
+    TCondVar HaveMoreData_;
+
+    TThread SenderThread_;
+
+    THashMap<int, TWriteTask> TaskMap_;
+    std::queue<int> TaskIdQueue_;
+
+    std::exception_ptr Error_;
+    enum class EState {
+        Running,
+        Completed,
+        Aborted,
+    };
+    std::atomic<EState> State_ = EState::Running;
+    int NextTaskId_ = 0;
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
+// One block being accumulated by the writer, together with its upload handle.
+struct TRetryfulWriterV2::TSendTask
+{
+ // Bytes written into this block so far.
+ TSentBuffer Buffer;
+ // Upload future of the block, assigned from TSender::StartBlock().
+ // NOTE(review): initialised with MakeFuture(), presumably an already-complete
+ // future so the first Wait() in DoStartBatch() does not block — confirm.
+ NThreading::TFuture<void> SentFuture = NThreading::MakeFuture();
+ // Task id assigned by TSender::StartBlock() for this block.
+ int TaskId = 0;
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
+// Prepares the upload header, optionally wraps the whole write into its own
+// transaction (locking the target path), starts the sender and opens the first block.
+TRetryfulWriterV2::TRetryfulWriterV2(
+    IClientRetryPolicyPtr clientRetryPolicy,
+    ITransactionPingerPtr transactionPinger,
+    const TClientContext& context,
+    const TTransactionId& parentId,
+    const TString& command,
+    const TMaybe<TFormat>& format,
+    const TRichYPath& path,
+    const TNode& serializedWriterOptions,
+    ssize_t bufferSize,
+    bool createTransaction)
+    : BufferSize_(bufferSize)
+    , Current_(MakeHolder<TSendTask>())
+    , Previous_(MakeHolder<TSendTask>())
+{
+    THttpHeader uploadHeader("PUT", command);
+    uploadHeader.SetInputFormat(format);
+    uploadHeader.MergeParameters(serializedWriterOptions);
+
+    if (createTransaction) {
+        WriteTransaction_ = MakeHolder<TPingableTransaction>(
+            clientRetryPolicy,
+            context,
+            parentId,
+            transactionPinger->GetChildTxPinger(),
+            TStartTransactionOptions()
+        );
+        // Appending takes a shared lock, overwriting an exclusive one.
+        const bool isAppend = path.Append_.GetOrElse(false);
+        NDetail::NRawClient::Lock(
+            clientRetryPolicy->CreatePolicyForGenericRequest(),
+            context,
+            WriteTransaction_->GetId(),
+            path.Path_,
+            isAppend ? LM_SHARED : LM_EXCLUSIVE
+        );
+    }
+
+    // Uploads run under the write transaction when there is one, otherwise under the parent.
+    THeavyRequestRetrier::TParameters senderParameters = {
+        .ClientRetryPolicy = clientRetryPolicy,
+        .TransactionPinger = transactionPinger,
+        .Context = context,
+        .TransactionId = WriteTransaction_ ? WriteTransaction_->GetId() : parentId,
+        .Header = std::move(uploadHeader),
+    };
+
+    Sender_ = MakeHolder<TSender>(path, senderParameters);
+
+    DoStartBatch();
+}
+
+// Forwards the abort to the sender; does nothing once the sender has been released.
+void TRetryfulWriterV2::Abort()
+{
+    if (!Sender_) {
+        return;
+    }
+    Sender_->Abort();
+}
+
+// Publishes the current block as complete, waits for the sender to finish and
+// release, then commits the write transaction if this writer owns one.
+void TRetryfulWriterV2::DoFinish()
+{
+    if (Sender_) {
+        auto& lastBlock = *Current_;
+        Sender_->UpdateBlock(lastBlock.TaskId, lastBlock.Buffer, /*complete*/ true);
+        Sender_->Finish();
+        // Releasing the sender makes a repeated DoFinish()/Abort() skip this part.
+        Sender_.Reset();
+    }
+    if (WriteTransaction_) {
+        WriteTransaction_->Commit();
+    }
+}
+
+// Rotates the two send tasks and registers a new block with the sender.
+void TRetryfulWriterV2::DoStartBatch()
+{
+    // The older task's buffer is about to be reused; wait until the sender is
+    // done with it first.
+    Previous_->SentFuture.Wait();
+    std::swap(Previous_, Current_);
+
+    auto started = Sender_->StartBlock();
+    auto& block = *Current_;
+    block.SentFuture = started.first;
+    block.TaskId = started.second;
+    block.Buffer.Clear();
+
+    NextSizeToSend_ = SendStep_;
+}
+
+// Appends the bytes to the current block and republishes the block to the
+// sender each time another SendStep_ bytes have accumulated.
+void TRetryfulWriterV2::DoWrite(const void* buf, size_t len)
+{
+    auto& block = *Current_;
+    block.Buffer.Append(buf, len);
+
+    const auto bufferedSize = block.Buffer.Size();
+    if (bufferedSize < NextSizeToSend_) {
+        return;
+    }
+    // Publish the new data without closing the block.
+    Sender_->UpdateBlock(block.TaskId, block.Buffer, false);
+    NextSizeToSend_ = bufferedSize + SendStep_;
+}
+
+// Called at a row boundary: once the current block has reached BufferSize_ it is
+// published as complete and a new block is started.
+void TRetryfulWriterV2::NotifyRowEnd()
+{
+    auto& block = *Current_;
+    if (block.Buffer.Size() < BufferSize_) {
+        return;
+    }
+    Sender_->UpdateBlock(block.TaskId, block.Buffer, true);
+    DoStartBatch();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT::NPrivate
diff --git a/yt/cpp/mapreduce/client/retryful_writer_v2.h b/yt/cpp/mapreduce/client/retryful_writer_v2.h
new file mode 100644
index 0000000000..c344fd0c6c
--- /dev/null
+++ b/yt/cpp/mapreduce/client/retryful_writer_v2.h
@@ -0,0 +1,58 @@
+#pragma once
+
+#include <library/cpp/threading/future/core/future.h>
+
+#include <yt/cpp/mapreduce/client/transaction.h>
+#include <yt/cpp/mapreduce/common/fwd.h>
+#include <yt/cpp/mapreduce/http/context.h>
+#include <yt/cpp/mapreduce/interface/io.h>
+
+#include <util/generic/size_literals.h>
+
+namespace NYT::NPrivate {
+
+////////////////////////////////////////////////////////////////////////////////
+
+// Raw table writer that hands buffered data to a background sender in steps of
+// SendStep_ bytes rather than waiting for a whole buffer to fill up.
+class TRetryfulWriterV2
+    : public TRawTableWriter
+{
+public:
+    // `bufferSize` is the block size at which NotifyRowEnd() closes the current
+    // block and starts a new one. When `createTransaction` is set, the writer
+    // starts its own transaction under `parentId`, locks the path in it and
+    // commits it in DoFinish().
+    TRetryfulWriterV2(
+        IClientRetryPolicyPtr clientRetryPolicy,
+        ITransactionPingerPtr transactionPinger,
+        const TClientContext& context,
+        const TTransactionId& parentId,
+        const TString& command,
+        const TMaybe<TFormat>& format,
+        const TRichYPath& path,
+        const TNode& serializedWriterOptions,
+        ssize_t bufferSize,
+        bool createTransaction);
+
+    void NotifyRowEnd() override;
+    void Abort() override;
+
+protected:
+    void DoWrite(const void* buf, size_t len) override;
+    void DoFinish() override;
+
+    // Rotates Current_/Previous_ and registers a new block with the sender.
+    void DoStartBatch();
+
+private:
+    class TSentBuffer;
+    class TSender;
+    struct TSendTask;
+
+    const ssize_t BufferSize_;
+    // Amount of newly buffered data after which the block is republished to the sender.
+    const ssize_t SendStep_ = 64_KB;
+    // Buffer size at which DoWrite() republishes next; set by DoStartBatch().
+    ssize_t NextSizeToSend_;
+    THolder<TSender> Sender_;
+    // Set only when the writer was asked to create its own transaction.
+    THolder<TPingableTransaction> WriteTransaction_;
+
+    THolder<TSendTask> Current_;
+    THolder<TSendTask> Previous_;
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT::NPrivate
diff --git a/yt/cpp/mapreduce/client/ya.make b/yt/cpp/mapreduce/client/ya.make
index 58a14cc060..c159492808 100644
--- a/yt/cpp/mapreduce/client/ya.make
+++ b/yt/cpp/mapreduce/client/ya.make
@@ -22,6 +22,7 @@ SRCS(
retry_heavy_write_request.cpp
retry_transaction.cpp
retryful_writer.cpp
+ retryful_writer_v2.cpp
retryless_writer.cpp
skiff.cpp
structured_table_formats.cpp
diff --git a/yt/cpp/mapreduce/interface/config.h b/yt/cpp/mapreduce/interface/config.h
index e0868b60b4..b6d34f8895 100644
--- a/yt/cpp/mapreduce/interface/config.h
+++ b/yt/cpp/mapreduce/interface/config.h
@@ -54,6 +54,21 @@ DEFINE_ENUM(EUploadDeduplicationMode,
////////////////////////////////////////////////////////////////////////////////
+/// Enum describing possible versions of table writer implementation.
+enum class ETableWriterVersion
+{
+ /// Allow library to choose version of writer.
+ Auto,
+
+ /// Stable but slower version of writer.
+ V1,
+
+ /// Unstable but faster version of writer (going to be default in the future).
+ V2,
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
struct TConfig
: public TThrRefBase
{
@@ -177,6 +192,9 @@ struct TConfig
// (cf. https://ytsaurus.tech/docs/en/user-guide/proxy/http-reference#framing).
THashSet<TString> CommandsWithFraming;
+ /// Which implementation of table writer to use.
+ ETableWriterVersion TableWriterVersion = ETableWriterVersion::Auto;
+
static bool GetBool(const char* var, bool defaultValue = false);
static int GetInt(const char* var, int defaultValue);
static TDuration GetDuration(const char* var, TDuration defaultValue);
diff --git a/yt/cpp/mapreduce/io/helpers.h b/yt/cpp/mapreduce/io/helpers.h
index 5dbbf20906..0733ff417c 100644
--- a/yt/cpp/mapreduce/io/helpers.h
+++ b/yt/cpp/mapreduce/io/helpers.h
@@ -33,6 +33,16 @@ struct TIOOptionsTraits<TTableWriterOptions>
};
template <class TOptions>
+TNode FormIORequestParameters(const TOptions& options)
+{
+    // Path-less variant: the result is a map holding only the options' config
+    // (when one is set) under the traits-defined key.
+    auto result = TNode::CreateMap();
+    if (!options.Config_) {
+        return result;
+    }
+    result[TIOOptionsTraits<TOptions>::ConfigName] = *options.Config_;
+    return result;
+}
+
+template <class TOptions>
TNode FormIORequestParameters(
const TRichYPath& path,
const TOptions& options)