aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorermolovd <ermolovd@yandex-team.com>2024-04-17 09:35:35 +0300
committerermolovd <ermolovd@yandex-team.com>2024-04-17 09:45:11 +0300
commit78f427d10a1aa4f27b0b4eff5a510a214a5a93a2 (patch)
treef548f02858e9ef2e0e879e7b3c9b56aa192cb45a
parent535850f0ad768874a11d81e6494a29b6885565da (diff)
downloadydb-78f427d10a1aa4f27b0b4eff5a510a214a5a93a2.tar.gz
Fix more problems with RetryfulWriterV2
404e999bcffb20d5497161a98f48f566b5245704
-rw-r--r--yt/cpp/mapreduce/client/client.cpp2
-rw-r--r--yt/cpp/mapreduce/client/client_writer.cpp11
-rw-r--r--yt/cpp/mapreduce/client/client_writer.h5
-rw-r--r--yt/cpp/mapreduce/client/retryful_writer_v2.cpp38
-rw-r--r--yt/cpp/mapreduce/interface/io-inl.h7
5 files changed, 43 insertions, 20 deletions
diff --git a/yt/cpp/mapreduce/client/client.cpp b/yt/cpp/mapreduce/client/client.cpp
index caa0ab270c..9e2885bbe0 100644
--- a/yt/cpp/mapreduce/client/client.cpp
+++ b/yt/cpp/mapreduce/client/client.cpp
@@ -52,8 +52,6 @@
#include <util/string/type.h>
#include <util/system/env.h>
-#include <exception>
-
using namespace NYT::NDetail::NRawClient;
namespace NYT {
diff --git a/yt/cpp/mapreduce/client/client_writer.cpp b/yt/cpp/mapreduce/client/client_writer.cpp
index 2122d0d8db..ee14ee0eb2 100644
--- a/yt/cpp/mapreduce/client/client_writer.cpp
+++ b/yt/cpp/mapreduce/client/client_writer.cpp
@@ -21,6 +21,7 @@ TClientWriter::TClientWriter(
const TMaybe<TFormat>& format,
const TTableWriterOptions& options)
: BufferSize_(options.BufferSize_)
+ , AutoFinish_(options.AutoFinish_)
{
if (options.SingleHttpRequest_) {
RawWriter_.Reset(new TRetrylessWriter(
@@ -61,6 +62,16 @@ TClientWriter::TClientWriter(
}
}
+TClientWriter::~TClientWriter()
+{
+ NDetail::FinishOrDie(this, AutoFinish_, "TClientWriter");
+}
+
+void TClientWriter::Finish()
+{
+ RawWriter_->Finish();
+}
+
size_t TClientWriter::GetStreamCount() const
{
return 1;
diff --git a/yt/cpp/mapreduce/client/client_writer.h b/yt/cpp/mapreduce/client/client_writer.h
index 56605a3f31..19a4977817 100644
--- a/yt/cpp/mapreduce/client/client_writer.h
+++ b/yt/cpp/mapreduce/client/client_writer.h
@@ -24,14 +24,19 @@ public:
const TMaybe<TFormat>& format,
const TTableWriterOptions& options);
+ ~TClientWriter();
+
size_t GetStreamCount() const override;
IOutputStream* GetStream(size_t tableIndex) const override;
void OnRowFinished(size_t tableIndex) override;
void Abort() override;
size_t GetBufferMemoryUsage() const override;
+ void Finish();
+
private:
const size_t BufferSize_ = 64 << 20;
+ const bool AutoFinish_;
::TIntrusivePtr<TRawTableWriter> RawWriter_;
};
diff --git a/yt/cpp/mapreduce/client/retryful_writer_v2.cpp b/yt/cpp/mapreduce/client/retryful_writer_v2.cpp
index 40297500ae..49f02e0405 100644
--- a/yt/cpp/mapreduce/client/retryful_writer_v2.cpp
+++ b/yt/cpp/mapreduce/client/retryful_writer_v2.cpp
@@ -1,5 +1,6 @@
#include "retryful_writer_v2.h"
+#include <util/generic/scope.h>
#include <yt/cpp/mapreduce/client/retry_heavy_write_request.h>
#include <yt/cpp/mapreduce/client/transaction.h>
#include <yt/cpp/mapreduce/client/transaction_pinger.h>
@@ -32,6 +33,9 @@ public:
void Clear()
{
+ // This method can be called only if no other object is holding snapshot.
+ Y_ABORT_IF(Buffer_.use_count() != 1);
+
Size_ = 0;
}
@@ -86,6 +90,12 @@ public:
SenderThread_.Join();
}
+ bool IsRunning() const
+ {
+ auto g = Guard(Lock_);
+ return State_.load() == EState::Running && !Error_;
+ }
+
void Abort()
{
auto g = Guard(Lock_);
@@ -114,7 +124,7 @@ public:
auto taskId = NextTaskId_++;
const auto& [it, inserted] = TaskMap_.emplace(taskId, TWriteTask{});
- Y_ABORT_UNLESS(inserted);
+ Y_ABORT_IF(!inserted);
TaskIdQueue_.push(taskId);
HaveMoreData_.Signal();
it->second.SendingComplete = NThreading::NewPromise();
@@ -131,7 +141,7 @@ public:
CheckNoError();
auto it = TaskMap_.find(taskId);
- Y_ABORT_UNLESS(it != TaskMap_.end());
+ Y_ABORT_IF(it == TaskMap_.end());
auto& writeTask = it->second;
writeTask.Data = std::move(snapshot.first);
writeTask.Size = snapshot.second;
@@ -333,8 +343,13 @@ TRetryfulWriterV2::TRetryfulWriterV2(
void TRetryfulWriterV2::Abort()
{
- if (Sender_) {
- Sender_->Abort();
+ auto sender = std::move(Sender_);
+ auto writeTransaction = std::move(WriteTransaction_);
+ if (sender) {
+ sender->Abort();
+ if (writeTransaction) {
+ writeTransaction->Abort();
+ }
}
}
@@ -345,13 +360,14 @@ size_t TRetryfulWriterV2::GetBufferMemoryUsage() const
void TRetryfulWriterV2::DoFinish()
{
- if (Sender_) {
- Sender_->UpdateBlock(Current_->TaskId, Current_->Buffer, true);
- Sender_->Finish();
- Sender_.Reset();
- }
- if (WriteTransaction_) {
- WriteTransaction_->Commit();
+ auto sender = std::move(Sender_);
+ auto writeTransaction = std::move(WriteTransaction_);
+ if (sender && sender->IsRunning()) {
+ sender->UpdateBlock(Current_->TaskId, Current_->Buffer, true);
+ sender->Finish();
+ if (writeTransaction) {
+ writeTransaction->Commit();
+ }
}
}
diff --git a/yt/cpp/mapreduce/interface/io-inl.h b/yt/cpp/mapreduce/interface/io-inl.h
index 7324eaca20..ffaf71f0a2 100644
--- a/yt/cpp/mapreduce/interface/io-inl.h
+++ b/yt/cpp/mapreduce/interface/io-inl.h
@@ -781,13 +781,6 @@ public:
, Locks_(MakeAtomicShared<TVector<TAdaptiveLock>>(writer->GetTableCount()))
{ }
- ~TTableWriterBase() override
- {
- if (Locks_.RefCount() == 1) {
- NDetail::FinishOrDie(this, /*autoFinish*/ true, "TTableWriterBase");
- }
- }
-
void Abort()
{
Writer_->Abort();