diff options
author | nadya73 <nadya73@yandex-team.com> | 2023-09-14 21:38:24 +0300 |
---|---|---|
committer | nadya73 <nadya73@yandex-team.com> | 2023-09-14 21:58:35 +0300 |
commit | 66ab4dcacadc42a49d252c99d3f7666f13a5abe1 (patch) | |
tree | 94f38106381f9b5d36da767491c344972a28c547 /yt/cpp | |
parent | 59dbf512fa4bb44d4873c2dd926eb95a57895472 (diff) | |
download | ydb-66ab4dcacadc42a49d252c99d3f7666f13a5abe1.tar.gz |
[yt/cpp/mapreduce] YT-19268: Lock memory for parallel writer
Diffstat (limited to 'yt/cpp')
-rw-r--r-- | yt/cpp/mapreduce/client/client_writer.cpp | 9 | ||||
-rw-r--r-- | yt/cpp/mapreduce/client/client_writer.h | 5 | ||||
-rw-r--r-- | yt/cpp/mapreduce/client/retryful_writer.cpp | 5 | ||||
-rw-r--r-- | yt/cpp/mapreduce/client/retryful_writer.h | 2 | ||||
-rw-r--r-- | yt/cpp/mapreduce/client/retryless_writer.cpp | 5 | ||||
-rw-r--r-- | yt/cpp/mapreduce/client/retryless_writer.h | 7 | ||||
-rw-r--r-- | yt/cpp/mapreduce/interface/io-inl.h | 15 | ||||
-rw-r--r-- | yt/cpp/mapreduce/interface/io.h | 12 | ||||
-rw-r--r-- | yt/cpp/mapreduce/io/job_writer.cpp | 5 | ||||
-rw-r--r-- | yt/cpp/mapreduce/io/job_writer.h | 1 | ||||
-rw-r--r-- | yt/cpp/mapreduce/io/node_table_writer.cpp | 5 | ||||
-rw-r--r-- | yt/cpp/mapreduce/io/node_table_writer.h | 1 | ||||
-rw-r--r-- | yt/cpp/mapreduce/io/proto_table_writer.cpp | 10 | ||||
-rw-r--r-- | yt/cpp/mapreduce/io/proto_table_writer.h | 2 | ||||
-rw-r--r-- | yt/cpp/mapreduce/io/yamr_table_writer.cpp | 5 | ||||
-rw-r--r-- | yt/cpp/mapreduce/io/yamr_table_writer.h | 1 |
16 files changed, 85 insertions, 5 deletions
diff --git a/yt/cpp/mapreduce/client/client_writer.cpp b/yt/cpp/mapreduce/client/client_writer.cpp index 357abd32eb..7141015492 100644 --- a/yt/cpp/mapreduce/client/client_writer.cpp +++ b/yt/cpp/mapreduce/client/client_writer.cpp @@ -19,7 +19,7 @@ TClientWriter::TClientWriter( const TTransactionId& transactionId, const TMaybe<TFormat>& format, const TTableWriterOptions& options) - : BUFFER_SIZE(options.BufferSize_) + : BufferSize_(options.BufferSize_) { if (options.SingleHttpRequest_) { RawWriter_.Reset(new TRetrylessWriter( @@ -28,7 +28,7 @@ TClientWriter::TClientWriter( GetWriteTableCommand(context.Config->ApiVersion), format, path, - BUFFER_SIZE, + BufferSize_, options)); } else { RawWriter_.Reset(new TRetryfulWriter( @@ -64,6 +64,11 @@ void TClientWriter::Abort() RawWriter_->Abort(); } +size_t TClientWriter::GetBufferMemoryUsage() const +{ + return RawWriter_->GetBufferMemoryUsage(); +} + //////////////////////////////////////////////////////////////////////////////// } // namespace NYT diff --git a/yt/cpp/mapreduce/client/client_writer.h b/yt/cpp/mapreduce/client/client_writer.h index 010a88a8ff..8ee912d065 100644 --- a/yt/cpp/mapreduce/client/client_writer.h +++ b/yt/cpp/mapreduce/client/client_writer.h @@ -29,11 +29,12 @@ public: IOutputStream* GetStream(size_t tableIndex) const override; void OnRowFinished(size_t tableIndex) override; void Abort() override; + size_t GetBufferMemoryUsage() const override; private: - ::TIntrusivePtr<TRawTableWriter> RawWriter_; + const size_t BufferSize_ = 64 << 20; - const size_t BUFFER_SIZE = 64 << 20; + ::TIntrusivePtr<TRawTableWriter> RawWriter_; }; //////////////////////////////////////////////////////////////////////////////// diff --git a/yt/cpp/mapreduce/client/retryful_writer.cpp b/yt/cpp/mapreduce/client/retryful_writer.cpp index 12b2939ffa..057fb5e7f0 100644 --- a/yt/cpp/mapreduce/client/retryful_writer.cpp +++ b/yt/cpp/mapreduce/client/retryful_writer.cpp @@ -145,6 +145,11 @@ void TRetryfulWriter::Abort() WriterState_ = Completed; } +size_t TRetryfulWriter::GetBufferMemoryUsage() const +{ + return BufferSize_ * 4; +} + size_t TRetryfulWriter::GetBufferSize(const TMaybe<TWriterOptions>& writerOptions) { auto retryBlockSize = TMaybe<size_t>(); diff --git a/yt/cpp/mapreduce/client/retryful_writer.h b/yt/cpp/mapreduce/client/retryful_writer.h index 38e351977d..0725a36aac 100644 --- a/yt/cpp/mapreduce/client/retryful_writer.h +++ b/yt/cpp/mapreduce/client/retryful_writer.h @@ -75,6 +75,8 @@ public: void NotifyRowEnd() override; void Abort() override; + size_t GetBufferMemoryUsage() const override; + size_t GetRetryBlockRemainingSize() const { return (BufferSize_ > Buffer_.size()) ? (BufferSize_ - Buffer_.size()) : 0; diff --git a/yt/cpp/mapreduce/client/retryless_writer.cpp b/yt/cpp/mapreduce/client/retryless_writer.cpp index 4c25c1a1dd..e3cf7cba06 100644 --- a/yt/cpp/mapreduce/client/retryless_writer.cpp +++ b/yt/cpp/mapreduce/client/retryless_writer.cpp @@ -40,6 +40,11 @@ void TRetrylessWriter::Abort() Running_ = false; } +size_t TRetrylessWriter::GetBufferMemoryUsage() const +{ + return BufferSize_; +} + //////////////////////////////////////////////////////////////////////////////// } // namespace NYT diff --git a/yt/cpp/mapreduce/client/retryless_writer.h b/yt/cpp/mapreduce/client/retryless_writer.h index baf49a258f..d1511fc476 100644 --- a/yt/cpp/mapreduce/client/retryless_writer.h +++ b/yt/cpp/mapreduce/client/retryless_writer.h @@ -35,6 +35,7 @@ public: const TRichYPath& path, size_t bufferSize, const TWriterOptions& options) + : BufferSize_(bufferSize) { THttpHeader header("PUT", command); header.SetInputFormat(format); @@ -51,18 +52,22 @@ public: auto hostName = GetProxyForHeavyRequest(context); Request_ = context.HttpClient->StartRequest(GetFullUrl(hostName, context, header), requestId, header); - BufferedOutput_.Reset(new TBufferedOutput(Request_->GetStream(), bufferSize)); + BufferedOutput_.Reset(new TBufferedOutput(Request_->GetStream(), BufferSize_)); } ~TRetrylessWriter() override; void NotifyRowEnd() override; void Abort() override; + size_t GetBufferMemoryUsage() const override; + protected: void DoWrite(const void* buf, size_t len) override; void DoFinish() override; private: + const size_t BufferSize_ = 0; + bool Running_ = true; NHttpClient::IHttpRequestPtr Request_; THolder<TBufferedOutput> BufferedOutput_; diff --git a/yt/cpp/mapreduce/interface/io-inl.h b/yt/cpp/mapreduce/interface/io-inl.h index c35ebb7481..542749c679 100644 --- a/yt/cpp/mapreduce/interface/io-inl.h +++ b/yt/cpp/mapreduce/interface/io-inl.h @@ -740,6 +740,11 @@ struct IWriterImplBase } } + virtual size_t GetBufferMemoryUsage() const + { + return 0; + } + virtual size_t GetTableCount() const = 0; virtual void FinishTable(size_t tableIndex) = 0; virtual void Abort() @@ -816,6 +821,11 @@ public: } } + size_t GetBufferMemoryUsage() const + { + return DoGetBufferMemoryUsage(); + } + protected: template <class U> void DoAddRow(const U& row, size_t tableIndex = 0, size_t rowWeight = 0) @@ -869,6 +879,11 @@ protected: Writer_->AddRowBatch(std::move(rowBatch), tableIndex, rowBatchWeight); } + size_t DoGetBufferMemoryUsage() const + { + return Writer_->GetBufferMemoryUsage(); + } + ::TIntrusivePtr<IWriterImpl> GetWriterImpl() { return Writer_; diff --git a/yt/cpp/mapreduce/interface/io.h b/yt/cpp/mapreduce/interface/io.h index e2b20a1802..dbd3a2cb70 100644 --- a/yt/cpp/mapreduce/interface/io.h +++ b/yt/cpp/mapreduce/interface/io.h @@ -168,6 +168,11 @@ public: /// By default it does nothing, but implementations are welcome to override this method. virtual void Abort() { } + + virtual size_t GetBufferMemoryUsage() const + { + return 0; + } }; /// @brief Interface to deal with multiple raw output streams. @@ -191,6 +196,11 @@ public: /// By default it does nothing, but implementations are welcome to override this method. virtual void Abort() { } + + virtual size_t GetBufferMemoryUsage() const + { + return 0; + } }; //////////////////////////////////////////////////////////////////////////////// @@ -378,6 +388,8 @@ public: /// Stop writing data as soon as possible (without flushing data, e.g. before aborting parent transaction). void Finish(); + + size_t GetBufferMemoryUsage() const; }; //////////////////////////////////////////////////////////////////////////////// diff --git a/yt/cpp/mapreduce/io/job_writer.cpp b/yt/cpp/mapreduce/io/job_writer.cpp index d08bb0a665..cbd3f16c58 100644 --- a/yt/cpp/mapreduce/io/job_writer.cpp +++ b/yt/cpp/mapreduce/io/job_writer.cpp @@ -56,6 +56,11 @@ IOutputStream* TJobWriter::GetStream(size_t tableIndex) const void TJobWriter::OnRowFinished(size_t) { } +size_t TJobWriter::GetBufferMemoryUsage() const +{ + return TStream::BUFFER_SIZE * GetStreamCount(); +} + //////////////////////////////////////////////////////////////////////////////// THolder<IProxyOutput> CreateRawJobWriter(size_t outputTableCount) diff --git a/yt/cpp/mapreduce/io/job_writer.h b/yt/cpp/mapreduce/io/job_writer.h index 9b24650640..9ea8402345 100644 --- a/yt/cpp/mapreduce/io/job_writer.h +++ b/yt/cpp/mapreduce/io/job_writer.h @@ -18,6 +18,7 @@ public: explicit TJobWriter(size_t outputTableCount); explicit TJobWriter(const TVector<TFile>& fileList); + size_t GetBufferMemoryUsage() const override; size_t GetStreamCount() const override; IOutputStream* GetStream(size_t tableIndex) const override; void OnRowFinished(size_t tableIndex) override; diff --git a/yt/cpp/mapreduce/io/node_table_writer.cpp b/yt/cpp/mapreduce/io/node_table_writer.cpp index dcb5a0f5b5..916dec7ae4 100644 --- a/yt/cpp/mapreduce/io/node_table_writer.cpp +++ b/yt/cpp/mapreduce/io/node_table_writer.cpp @@ -24,6 +24,11 @@ TNodeTableWriter::TNodeTableWriter(THolder<IProxyOutput> output, NYson::EYsonFor TNodeTableWriter::~TNodeTableWriter() { } +size_t TNodeTableWriter::GetBufferMemoryUsage() const +{ + return Output_->GetBufferMemoryUsage(); +} + size_t TNodeTableWriter::GetTableCount() const { return Output_->GetStreamCount(); diff --git a/yt/cpp/mapreduce/io/node_table_writer.h b/yt/cpp/mapreduce/io/node_table_writer.h index 4bf8cb2fe7..155bec076d 100644 --- a/yt/cpp/mapreduce/io/node_table_writer.h +++ b/yt/cpp/mapreduce/io/node_table_writer.h @@ -19,6 +19,7 @@ public: void AddRow(const TNode& row, size_t tableIndex) override; void AddRow(TNode&& row, size_t tableIndex) override; + size_t GetBufferMemoryUsage() const override; size_t GetTableCount() const override; void FinishTable(size_t) override; void Abort() override; diff --git a/yt/cpp/mapreduce/io/proto_table_writer.cpp b/yt/cpp/mapreduce/io/proto_table_writer.cpp index 1ce7811625..160ea78f50 100644 --- a/yt/cpp/mapreduce/io/proto_table_writer.cpp +++ b/yt/cpp/mapreduce/io/proto_table_writer.cpp @@ -105,6 +105,11 @@ TProtoTableWriter::TProtoTableWriter( TProtoTableWriter::~TProtoTableWriter() { } +size_t TProtoTableWriter::GetBufferMemoryUsage() const +{ + return NodeWriter_->GetBufferMemoryUsage(); +} + size_t TProtoTableWriter::GetTableCount() const { return NodeWriter_->GetTableCount(); @@ -143,6 +148,11 @@ TLenvalProtoTableWriter::TLenvalProtoTableWriter( TLenvalProtoTableWriter::~TLenvalProtoTableWriter() { } +size_t TLenvalProtoTableWriter::GetBufferMemoryUsage() const +{ + return Output_->GetBufferMemoryUsage(); +} + size_t TLenvalProtoTableWriter::GetTableCount() const { return Output_->GetStreamCount(); diff --git a/yt/cpp/mapreduce/io/proto_table_writer.h b/yt/cpp/mapreduce/io/proto_table_writer.h index a6df69e6ae..d00e77091c 100644 --- a/yt/cpp/mapreduce/io/proto_table_writer.h +++ b/yt/cpp/mapreduce/io/proto_table_writer.h @@ -21,6 +21,7 @@ public: void AddRow(const Message& row, size_t tableIndex) override; void AddRow(Message&& row, size_t tableIndex) override; + size_t GetBufferMemoryUsage() const override; size_t GetTableCount() const override; void FinishTable(size_t) override; void Abort() override; @@ -44,6 +45,7 @@ public: void AddRow(const Message& row, size_t tableIndex) override; void AddRow(Message&& row, size_t tableIndex) override; + size_t GetBufferMemoryUsage() const override; size_t GetTableCount() const override; void FinishTable(size_t) override; void Abort() override; diff --git a/yt/cpp/mapreduce/io/yamr_table_writer.cpp b/yt/cpp/mapreduce/io/yamr_table_writer.cpp index cce7ceb0f0..fe31eb5543 100644 --- a/yt/cpp/mapreduce/io/yamr_table_writer.cpp +++ b/yt/cpp/mapreduce/io/yamr_table_writer.cpp @@ -13,6 +13,11 @@ TYaMRTableWriter::TYaMRTableWriter(THolder<IProxyOutput> output) TYaMRTableWriter::~TYaMRTableWriter() { } +size_t TYaMRTableWriter::GetBufferMemoryUsage() const +{ + return Output_->GetBufferMemoryUsage(); +} + size_t TYaMRTableWriter::GetTableCount() const { return Output_->GetStreamCount(); diff --git a/yt/cpp/mapreduce/io/yamr_table_writer.h b/yt/cpp/mapreduce/io/yamr_table_writer.h index cf88eaf287..7f72c8005a 100644 --- a/yt/cpp/mapreduce/io/yamr_table_writer.h +++ b/yt/cpp/mapreduce/io/yamr_table_writer.h @@ -18,6 +18,7 @@ public: void AddRow(const TYaMRRow& row, size_t tableIndex) override; void AddRow(TYaMRRow&& row, size_t tableIndex) override; + size_t GetBufferMemoryUsage() const override; size_t GetTableCount() const override; void FinishTable(size_t) override; void Abort() override; |