aboutsummaryrefslogtreecommitdiffstats
path: root/yt/cpp
diff options
context:
space:
mode:
authornadya73 <nadya73@yandex-team.com>2023-09-14 21:38:24 +0300
committernadya73 <nadya73@yandex-team.com>2023-09-14 21:58:35 +0300
commit66ab4dcacadc42a49d252c99d3f7666f13a5abe1 (patch)
tree94f38106381f9b5d36da767491c344972a28c547 /yt/cpp
parent59dbf512fa4bb44d4873c2dd926eb95a57895472 (diff)
downloadydb-66ab4dcacadc42a49d252c99d3f7666f13a5abe1.tar.gz
[yt/cpp/mapreduce] YT-19268: Lock memory for parallel writer
Diffstat (limited to 'yt/cpp')
-rw-r--r--yt/cpp/mapreduce/client/client_writer.cpp9
-rw-r--r--yt/cpp/mapreduce/client/client_writer.h5
-rw-r--r--yt/cpp/mapreduce/client/retryful_writer.cpp5
-rw-r--r--yt/cpp/mapreduce/client/retryful_writer.h2
-rw-r--r--yt/cpp/mapreduce/client/retryless_writer.cpp5
-rw-r--r--yt/cpp/mapreduce/client/retryless_writer.h7
-rw-r--r--yt/cpp/mapreduce/interface/io-inl.h15
-rw-r--r--yt/cpp/mapreduce/interface/io.h12
-rw-r--r--yt/cpp/mapreduce/io/job_writer.cpp5
-rw-r--r--yt/cpp/mapreduce/io/job_writer.h1
-rw-r--r--yt/cpp/mapreduce/io/node_table_writer.cpp5
-rw-r--r--yt/cpp/mapreduce/io/node_table_writer.h1
-rw-r--r--yt/cpp/mapreduce/io/proto_table_writer.cpp10
-rw-r--r--yt/cpp/mapreduce/io/proto_table_writer.h2
-rw-r--r--yt/cpp/mapreduce/io/yamr_table_writer.cpp5
-rw-r--r--yt/cpp/mapreduce/io/yamr_table_writer.h1
16 files changed, 85 insertions, 5 deletions
diff --git a/yt/cpp/mapreduce/client/client_writer.cpp b/yt/cpp/mapreduce/client/client_writer.cpp
index 357abd32eb..7141015492 100644
--- a/yt/cpp/mapreduce/client/client_writer.cpp
+++ b/yt/cpp/mapreduce/client/client_writer.cpp
@@ -19,7 +19,7 @@ TClientWriter::TClientWriter(
const TTransactionId& transactionId,
const TMaybe<TFormat>& format,
const TTableWriterOptions& options)
- : BUFFER_SIZE(options.BufferSize_)
+ : BufferSize_(options.BufferSize_)
{
if (options.SingleHttpRequest_) {
RawWriter_.Reset(new TRetrylessWriter(
@@ -28,7 +28,7 @@ TClientWriter::TClientWriter(
GetWriteTableCommand(context.Config->ApiVersion),
format,
path,
- BUFFER_SIZE,
+ BufferSize_,
options));
} else {
RawWriter_.Reset(new TRetryfulWriter(
@@ -64,6 +64,11 @@ void TClientWriter::Abort()
RawWriter_->Abort();
}
+size_t TClientWriter::GetBufferMemoryUsage() const
+{
+ return RawWriter_->GetBufferMemoryUsage();
+}
+
////////////////////////////////////////////////////////////////////////////////
} // namespace NYT
diff --git a/yt/cpp/mapreduce/client/client_writer.h b/yt/cpp/mapreduce/client/client_writer.h
index 010a88a8ff..8ee912d065 100644
--- a/yt/cpp/mapreduce/client/client_writer.h
+++ b/yt/cpp/mapreduce/client/client_writer.h
@@ -29,11 +29,12 @@ public:
IOutputStream* GetStream(size_t tableIndex) const override;
void OnRowFinished(size_t tableIndex) override;
void Abort() override;
+ size_t GetBufferMemoryUsage() const override;
private:
- ::TIntrusivePtr<TRawTableWriter> RawWriter_;
+ const size_t BufferSize_ = 64 << 20;
- const size_t BUFFER_SIZE = 64 << 20;
+ ::TIntrusivePtr<TRawTableWriter> RawWriter_;
};
////////////////////////////////////////////////////////////////////////////////
diff --git a/yt/cpp/mapreduce/client/retryful_writer.cpp b/yt/cpp/mapreduce/client/retryful_writer.cpp
index 12b2939ffa..057fb5e7f0 100644
--- a/yt/cpp/mapreduce/client/retryful_writer.cpp
+++ b/yt/cpp/mapreduce/client/retryful_writer.cpp
@@ -145,6 +145,11 @@ void TRetryfulWriter::Abort()
WriterState_ = Completed;
}
+size_t TRetryfulWriter::GetBufferMemoryUsage() const
+{
+ return BufferSize_ * 4;
+}
+
size_t TRetryfulWriter::GetBufferSize(const TMaybe<TWriterOptions>& writerOptions)
{
auto retryBlockSize = TMaybe<size_t>();
diff --git a/yt/cpp/mapreduce/client/retryful_writer.h b/yt/cpp/mapreduce/client/retryful_writer.h
index 38e351977d..0725a36aac 100644
--- a/yt/cpp/mapreduce/client/retryful_writer.h
+++ b/yt/cpp/mapreduce/client/retryful_writer.h
@@ -75,6 +75,8 @@ public:
void NotifyRowEnd() override;
void Abort() override;
+ size_t GetBufferMemoryUsage() const override;
+
size_t GetRetryBlockRemainingSize() const
{
return (BufferSize_ > Buffer_.size()) ? (BufferSize_ - Buffer_.size()) : 0;
diff --git a/yt/cpp/mapreduce/client/retryless_writer.cpp b/yt/cpp/mapreduce/client/retryless_writer.cpp
index 4c25c1a1dd..e3cf7cba06 100644
--- a/yt/cpp/mapreduce/client/retryless_writer.cpp
+++ b/yt/cpp/mapreduce/client/retryless_writer.cpp
@@ -40,6 +40,11 @@ void TRetrylessWriter::Abort()
Running_ = false;
}
+size_t TRetrylessWriter::GetBufferMemoryUsage() const
+{
+ return BufferSize_;
+}
+
////////////////////////////////////////////////////////////////////////////////
} // namespace NYT
diff --git a/yt/cpp/mapreduce/client/retryless_writer.h b/yt/cpp/mapreduce/client/retryless_writer.h
index baf49a258f..d1511fc476 100644
--- a/yt/cpp/mapreduce/client/retryless_writer.h
+++ b/yt/cpp/mapreduce/client/retryless_writer.h
@@ -35,6 +35,7 @@ public:
const TRichYPath& path,
size_t bufferSize,
const TWriterOptions& options)
+ : BufferSize_(bufferSize)
{
THttpHeader header("PUT", command);
header.SetInputFormat(format);
@@ -51,18 +52,22 @@ public:
auto hostName = GetProxyForHeavyRequest(context);
Request_ = context.HttpClient->StartRequest(GetFullUrl(hostName, context, header), requestId, header);
- BufferedOutput_.Reset(new TBufferedOutput(Request_->GetStream(), bufferSize));
+ BufferedOutput_.Reset(new TBufferedOutput(Request_->GetStream(), BufferSize_));
}
~TRetrylessWriter() override;
void NotifyRowEnd() override;
void Abort() override;
+ size_t GetBufferMemoryUsage() const override;
+
protected:
void DoWrite(const void* buf, size_t len) override;
void DoFinish() override;
private:
+ const size_t BufferSize_ = 0;
+
bool Running_ = true;
NHttpClient::IHttpRequestPtr Request_;
THolder<TBufferedOutput> BufferedOutput_;
diff --git a/yt/cpp/mapreduce/interface/io-inl.h b/yt/cpp/mapreduce/interface/io-inl.h
index c35ebb7481..542749c679 100644
--- a/yt/cpp/mapreduce/interface/io-inl.h
+++ b/yt/cpp/mapreduce/interface/io-inl.h
@@ -740,6 +740,11 @@ struct IWriterImplBase
}
}
+ virtual size_t GetBufferMemoryUsage() const
+ {
+ return 0;
+ }
+
virtual size_t GetTableCount() const = 0;
virtual void FinishTable(size_t tableIndex) = 0;
virtual void Abort()
@@ -816,6 +821,11 @@ public:
}
}
+ size_t GetBufferMemoryUsage() const
+ {
+ return DoGetBufferMemoryUsage();
+ }
+
protected:
template <class U>
void DoAddRow(const U& row, size_t tableIndex = 0, size_t rowWeight = 0)
@@ -869,6 +879,11 @@ protected:
Writer_->AddRowBatch(std::move(rowBatch), tableIndex, rowBatchWeight);
}
+ size_t DoGetBufferMemoryUsage() const
+ {
+ return Writer_->GetBufferMemoryUsage();
+ }
+
::TIntrusivePtr<IWriterImpl> GetWriterImpl()
{
return Writer_;
diff --git a/yt/cpp/mapreduce/interface/io.h b/yt/cpp/mapreduce/interface/io.h
index e2b20a1802..dbd3a2cb70 100644
--- a/yt/cpp/mapreduce/interface/io.h
+++ b/yt/cpp/mapreduce/interface/io.h
@@ -168,6 +168,11 @@ public:
/// By default it does nothing, but implementations are welcome to override this method.
virtual void Abort()
{ }
+
+ virtual size_t GetBufferMemoryUsage() const
+ {
+ return 0;
+ }
};
/// @brief Interface to deal with multiple raw output streams.
@@ -191,6 +196,11 @@ public:
/// By default it does nothing, but implementations are welcome to override this method.
virtual void Abort()
{ }
+
+ virtual size_t GetBufferMemoryUsage() const
+ {
+ return 0;
+ }
};
////////////////////////////////////////////////////////////////////////////////
@@ -378,6 +388,8 @@ public:
/// Stop writing data as soon as possible (without flushing data, e.g. before aborting parent transaction).
void Finish();
+
+ size_t GetBufferMemoryUsage() const;
};
////////////////////////////////////////////////////////////////////////////////
diff --git a/yt/cpp/mapreduce/io/job_writer.cpp b/yt/cpp/mapreduce/io/job_writer.cpp
index d08bb0a665..cbd3f16c58 100644
--- a/yt/cpp/mapreduce/io/job_writer.cpp
+++ b/yt/cpp/mapreduce/io/job_writer.cpp
@@ -56,6 +56,11 @@ IOutputStream* TJobWriter::GetStream(size_t tableIndex) const
void TJobWriter::OnRowFinished(size_t)
{ }
+size_t TJobWriter::GetBufferMemoryUsage() const
+{
+ return TStream::BUFFER_SIZE * GetStreamCount();
+}
+
////////////////////////////////////////////////////////////////////////////////
THolder<IProxyOutput> CreateRawJobWriter(size_t outputTableCount)
diff --git a/yt/cpp/mapreduce/io/job_writer.h b/yt/cpp/mapreduce/io/job_writer.h
index 9b24650640..9ea8402345 100644
--- a/yt/cpp/mapreduce/io/job_writer.h
+++ b/yt/cpp/mapreduce/io/job_writer.h
@@ -18,6 +18,7 @@ public:
explicit TJobWriter(size_t outputTableCount);
explicit TJobWriter(const TVector<TFile>& fileList);
+ size_t GetBufferMemoryUsage() const override;
size_t GetStreamCount() const override;
IOutputStream* GetStream(size_t tableIndex) const override;
void OnRowFinished(size_t tableIndex) override;
diff --git a/yt/cpp/mapreduce/io/node_table_writer.cpp b/yt/cpp/mapreduce/io/node_table_writer.cpp
index dcb5a0f5b5..916dec7ae4 100644
--- a/yt/cpp/mapreduce/io/node_table_writer.cpp
+++ b/yt/cpp/mapreduce/io/node_table_writer.cpp
@@ -24,6 +24,11 @@ TNodeTableWriter::TNodeTableWriter(THolder<IProxyOutput> output, NYson::EYsonFor
TNodeTableWriter::~TNodeTableWriter()
{ }
+size_t TNodeTableWriter::GetBufferMemoryUsage() const
+{
+ return Output_->GetBufferMemoryUsage();
+}
+
size_t TNodeTableWriter::GetTableCount() const
{
return Output_->GetStreamCount();
diff --git a/yt/cpp/mapreduce/io/node_table_writer.h b/yt/cpp/mapreduce/io/node_table_writer.h
index 4bf8cb2fe7..155bec076d 100644
--- a/yt/cpp/mapreduce/io/node_table_writer.h
+++ b/yt/cpp/mapreduce/io/node_table_writer.h
@@ -19,6 +19,7 @@ public:
void AddRow(const TNode& row, size_t tableIndex) override;
void AddRow(TNode&& row, size_t tableIndex) override;
+ size_t GetBufferMemoryUsage() const override;
size_t GetTableCount() const override;
void FinishTable(size_t) override;
void Abort() override;
diff --git a/yt/cpp/mapreduce/io/proto_table_writer.cpp b/yt/cpp/mapreduce/io/proto_table_writer.cpp
index 1ce7811625..160ea78f50 100644
--- a/yt/cpp/mapreduce/io/proto_table_writer.cpp
+++ b/yt/cpp/mapreduce/io/proto_table_writer.cpp
@@ -105,6 +105,11 @@ TProtoTableWriter::TProtoTableWriter(
TProtoTableWriter::~TProtoTableWriter()
{ }
+size_t TProtoTableWriter::GetBufferMemoryUsage() const
+{
+ return NodeWriter_->GetBufferMemoryUsage();
+}
+
size_t TProtoTableWriter::GetTableCount() const
{
return NodeWriter_->GetTableCount();
@@ -143,6 +148,11 @@ TLenvalProtoTableWriter::TLenvalProtoTableWriter(
TLenvalProtoTableWriter::~TLenvalProtoTableWriter()
{ }
+size_t TLenvalProtoTableWriter::GetBufferMemoryUsage() const
+{
+ return Output_->GetBufferMemoryUsage();
+}
+
size_t TLenvalProtoTableWriter::GetTableCount() const
{
return Output_->GetStreamCount();
diff --git a/yt/cpp/mapreduce/io/proto_table_writer.h b/yt/cpp/mapreduce/io/proto_table_writer.h
index a6df69e6ae..d00e77091c 100644
--- a/yt/cpp/mapreduce/io/proto_table_writer.h
+++ b/yt/cpp/mapreduce/io/proto_table_writer.h
@@ -21,6 +21,7 @@ public:
void AddRow(const Message& row, size_t tableIndex) override;
void AddRow(Message&& row, size_t tableIndex) override;
+ size_t GetBufferMemoryUsage() const override;
size_t GetTableCount() const override;
void FinishTable(size_t) override;
void Abort() override;
@@ -44,6 +45,7 @@ public:
void AddRow(const Message& row, size_t tableIndex) override;
void AddRow(Message&& row, size_t tableIndex) override;
+ size_t GetBufferMemoryUsage() const override;
size_t GetTableCount() const override;
void FinishTable(size_t) override;
void Abort() override;
diff --git a/yt/cpp/mapreduce/io/yamr_table_writer.cpp b/yt/cpp/mapreduce/io/yamr_table_writer.cpp
index cce7ceb0f0..fe31eb5543 100644
--- a/yt/cpp/mapreduce/io/yamr_table_writer.cpp
+++ b/yt/cpp/mapreduce/io/yamr_table_writer.cpp
@@ -13,6 +13,11 @@ TYaMRTableWriter::TYaMRTableWriter(THolder<IProxyOutput> output)
TYaMRTableWriter::~TYaMRTableWriter()
{ }
+size_t TYaMRTableWriter::GetBufferMemoryUsage() const
+{
+ return Output_->GetBufferMemoryUsage();
+}
+
size_t TYaMRTableWriter::GetTableCount() const
{
return Output_->GetStreamCount();
diff --git a/yt/cpp/mapreduce/io/yamr_table_writer.h b/yt/cpp/mapreduce/io/yamr_table_writer.h
index cf88eaf287..7f72c8005a 100644
--- a/yt/cpp/mapreduce/io/yamr_table_writer.h
+++ b/yt/cpp/mapreduce/io/yamr_table_writer.h
@@ -18,6 +18,7 @@ public:
void AddRow(const TYaMRRow& row, size_t tableIndex) override;
void AddRow(TYaMRRow&& row, size_t tableIndex) override;
+ size_t GetBufferMemoryUsage() const override;
size_t GetTableCount() const override;
void FinishTable(size_t) override;
void Abort() override;