diff options
author | ermolovd <ermolovd@yandex-team.com> | 2024-04-17 09:35:35 +0300 |
---|---|---|
committer | ermolovd <ermolovd@yandex-team.com> | 2024-04-17 09:45:11 +0300 |
commit | 78f427d10a1aa4f27b0b4eff5a510a214a5a93a2 (patch) | |
tree | f548f02858e9ef2e0e879e7b3c9b56aa192cb45a | |
parent | 535850f0ad768874a11d81e6494a29b6885565da (diff) | |
download | ydb-78f427d10a1aa4f27b0b4eff5a510a214a5a93a2.tar.gz |
Fix more problems with RetryfulWriterV2
404e999bcffb20d5497161a98f48f566b5245704
-rw-r--r-- | yt/cpp/mapreduce/client/client.cpp | 2 | ||||
-rw-r--r-- | yt/cpp/mapreduce/client/client_writer.cpp | 11 | ||||
-rw-r--r-- | yt/cpp/mapreduce/client/client_writer.h | 5 | ||||
-rw-r--r-- | yt/cpp/mapreduce/client/retryful_writer_v2.cpp | 38 | ||||
-rw-r--r-- | yt/cpp/mapreduce/interface/io-inl.h | 7 |
5 files changed, 43 insertions, 20 deletions
diff --git a/yt/cpp/mapreduce/client/client.cpp b/yt/cpp/mapreduce/client/client.cpp index caa0ab270c..9e2885bbe0 100644 --- a/yt/cpp/mapreduce/client/client.cpp +++ b/yt/cpp/mapreduce/client/client.cpp @@ -52,8 +52,6 @@ #include <util/string/type.h> #include <util/system/env.h> -#include <exception> - using namespace NYT::NDetail::NRawClient; namespace NYT { diff --git a/yt/cpp/mapreduce/client/client_writer.cpp b/yt/cpp/mapreduce/client/client_writer.cpp index 2122d0d8db..ee14ee0eb2 100644 --- a/yt/cpp/mapreduce/client/client_writer.cpp +++ b/yt/cpp/mapreduce/client/client_writer.cpp @@ -21,6 +21,7 @@ TClientWriter::TClientWriter( const TMaybe<TFormat>& format, const TTableWriterOptions& options) : BufferSize_(options.BufferSize_) + , AutoFinish_(options.AutoFinish_) { if (options.SingleHttpRequest_) { RawWriter_.Reset(new TRetrylessWriter( @@ -61,6 +62,16 @@ TClientWriter::TClientWriter( } } +TClientWriter::~TClientWriter() +{ + NDetail::FinishOrDie(this, AutoFinish_, "TClientWriter"); +} + +void TClientWriter::Finish() +{ + RawWriter_->Finish(); +} + size_t TClientWriter::GetStreamCount() const { return 1; diff --git a/yt/cpp/mapreduce/client/client_writer.h b/yt/cpp/mapreduce/client/client_writer.h index 56605a3f31..19a4977817 100644 --- a/yt/cpp/mapreduce/client/client_writer.h +++ b/yt/cpp/mapreduce/client/client_writer.h @@ -24,14 +24,19 @@ public: const TMaybe<TFormat>& format, const TTableWriterOptions& options); + ~TClientWriter(); + size_t GetStreamCount() const override; IOutputStream* GetStream(size_t tableIndex) const override; void OnRowFinished(size_t tableIndex) override; void Abort() override; size_t GetBufferMemoryUsage() const override; + void Finish(); + private: const size_t BufferSize_ = 64 << 20; + const bool AutoFinish_; ::TIntrusivePtr<TRawTableWriter> RawWriter_; }; diff --git a/yt/cpp/mapreduce/client/retryful_writer_v2.cpp b/yt/cpp/mapreduce/client/retryful_writer_v2.cpp index 40297500ae..49f02e0405 100644 --- a/yt/cpp/mapreduce/client/retryful_writer_v2.cpp +++ b/yt/cpp/mapreduce/client/retryful_writer_v2.cpp @@ -1,5 +1,6 @@ #include "retryful_writer_v2.h" +#include <util/generic/scope.h> #include <yt/cpp/mapreduce/client/retry_heavy_write_request.h> #include <yt/cpp/mapreduce/client/transaction.h> #include <yt/cpp/mapreduce/client/transaction_pinger.h> @@ -32,6 +33,9 @@ public: void Clear() { + // This method can be called only if no other object is holding snapshot. + Y_ABORT_IF(Buffer_.use_count() != 1); + Size_ = 0; } @@ -86,6 +90,12 @@ public: SenderThread_.Join(); } + bool IsRunning() const + { + auto g = Guard(Lock_); + return State_.load() == EState::Running && !Error_; + } + void Abort() { auto g = Guard(Lock_); @@ -114,7 +124,7 @@ public: auto taskId = NextTaskId_++; const auto& [it, inserted] = TaskMap_.emplace(taskId, TWriteTask{}); - Y_ABORT_UNLESS(inserted); + Y_ABORT_IF(!inserted); TaskIdQueue_.push(taskId); HaveMoreData_.Signal(); it->second.SendingComplete = NThreading::NewPromise(); @@ -131,7 +141,7 @@ public: CheckNoError(); auto it = TaskMap_.find(taskId); - Y_ABORT_UNLESS(it != TaskMap_.end()); + Y_ABORT_IF(it == TaskMap_.end()); auto& writeTask = it->second; writeTask.Data = std::move(snapshot.first); writeTask.Size = snapshot.second; @@ -333,8 +343,13 @@ TRetryfulWriterV2::TRetryfulWriterV2( void TRetryfulWriterV2::Abort() { - if (Sender_) { - Sender_->Abort(); + auto sender = std::move(Sender_); + auto writeTransaction = std::move(WriteTransaction_); + if (sender) { + sender->Abort(); + if (writeTransaction) { + writeTransaction->Abort(); + } } } @@ -345,13 +360,14 @@ size_t TRetryfulWriterV2::GetBufferMemoryUsage() const void TRetryfulWriterV2::DoFinish() { - if (Sender_) { - Sender_->UpdateBlock(Current_->TaskId, Current_->Buffer, true); - Sender_->Finish(); - Sender_.Reset(); - } - if (WriteTransaction_) { - WriteTransaction_->Commit(); + auto sender = std::move(Sender_); + auto writeTransaction = std::move(WriteTransaction_); + if (sender && sender->IsRunning()) { + sender->UpdateBlock(Current_->TaskId, Current_->Buffer, true); + sender->Finish(); + if (writeTransaction) { + writeTransaction->Commit(); + } } } diff --git a/yt/cpp/mapreduce/interface/io-inl.h b/yt/cpp/mapreduce/interface/io-inl.h index 7324eaca20..ffaf71f0a2 100644 --- a/yt/cpp/mapreduce/interface/io-inl.h +++ b/yt/cpp/mapreduce/interface/io-inl.h @@ -781,13 +781,6 @@ public: , Locks_(MakeAtomicShared<TVector<TAdaptiveLock>>(writer->GetTableCount())) { } - ~TTableWriterBase() override - { - if (Locks_.RefCount() == 1) { - NDetail::FinishOrDie(this, /*autoFinish*/ true, "TTableWriterBase"); - } - } - void Abort() { Writer_->Abort(); |