diff options
| author | nadya73 <[email protected]> | 2023-09-14 21:38:24 +0300 | 
|---|---|---|
| committer | nadya73 <[email protected]> | 2023-09-14 21:58:35 +0300 | 
| commit | 66ab4dcacadc42a49d252c99d3f7666f13a5abe1 (patch) | |
| tree | 94f38106381f9b5d36da767491c344972a28c547 /yt/cpp | |
| parent | 59dbf512fa4bb44d4873c2dd926eb95a57895472 (diff) | |
[yt/cpp/mapreduce] YT-19268: Lock memory for parallel writer
Diffstat (limited to 'yt/cpp')
| -rw-r--r-- | yt/cpp/mapreduce/client/client_writer.cpp | 9 | ||||
| -rw-r--r-- | yt/cpp/mapreduce/client/client_writer.h | 5 | ||||
| -rw-r--r-- | yt/cpp/mapreduce/client/retryful_writer.cpp | 5 | ||||
| -rw-r--r-- | yt/cpp/mapreduce/client/retryful_writer.h | 2 | ||||
| -rw-r--r-- | yt/cpp/mapreduce/client/retryless_writer.cpp | 5 | ||||
| -rw-r--r-- | yt/cpp/mapreduce/client/retryless_writer.h | 7 | ||||
| -rw-r--r-- | yt/cpp/mapreduce/interface/io-inl.h | 15 | ||||
| -rw-r--r-- | yt/cpp/mapreduce/interface/io.h | 12 | ||||
| -rw-r--r-- | yt/cpp/mapreduce/io/job_writer.cpp | 5 | ||||
| -rw-r--r-- | yt/cpp/mapreduce/io/job_writer.h | 1 | ||||
| -rw-r--r-- | yt/cpp/mapreduce/io/node_table_writer.cpp | 5 | ||||
| -rw-r--r-- | yt/cpp/mapreduce/io/node_table_writer.h | 1 | ||||
| -rw-r--r-- | yt/cpp/mapreduce/io/proto_table_writer.cpp | 10 | ||||
| -rw-r--r-- | yt/cpp/mapreduce/io/proto_table_writer.h | 2 | ||||
| -rw-r--r-- | yt/cpp/mapreduce/io/yamr_table_writer.cpp | 5 | ||||
| -rw-r--r-- | yt/cpp/mapreduce/io/yamr_table_writer.h | 1 | 
16 files changed, 85 insertions, 5 deletions
| diff --git a/yt/cpp/mapreduce/client/client_writer.cpp b/yt/cpp/mapreduce/client/client_writer.cpp index 357abd32eb1..71410154926 100644 --- a/yt/cpp/mapreduce/client/client_writer.cpp +++ b/yt/cpp/mapreduce/client/client_writer.cpp @@ -19,7 +19,7 @@ TClientWriter::TClientWriter(      const TTransactionId& transactionId,      const TMaybe<TFormat>& format,      const TTableWriterOptions& options) -    : BUFFER_SIZE(options.BufferSize_) +    : BufferSize_(options.BufferSize_)  {      if (options.SingleHttpRequest_) {          RawWriter_.Reset(new TRetrylessWriter( @@ -28,7 +28,7 @@ TClientWriter::TClientWriter(              GetWriteTableCommand(context.Config->ApiVersion),              format,              path, -            BUFFER_SIZE, +            BufferSize_,              options));      } else {          RawWriter_.Reset(new TRetryfulWriter( @@ -64,6 +64,11 @@ void TClientWriter::Abort()      RawWriter_->Abort();  } +size_t TClientWriter::GetBufferMemoryUsage() const +{ +    return RawWriter_->GetBufferMemoryUsage(); +} +  ////////////////////////////////////////////////////////////////////////////////  } // namespace NYT diff --git a/yt/cpp/mapreduce/client/client_writer.h b/yt/cpp/mapreduce/client/client_writer.h index 010a88a8ffb..8ee912d0656 100644 --- a/yt/cpp/mapreduce/client/client_writer.h +++ b/yt/cpp/mapreduce/client/client_writer.h @@ -29,11 +29,12 @@ public:      IOutputStream* GetStream(size_t tableIndex) const override;      void OnRowFinished(size_t tableIndex) override;      void Abort() override; +    size_t GetBufferMemoryUsage() const override;  private: -    ::TIntrusivePtr<TRawTableWriter> RawWriter_; +    const size_t BufferSize_ = 64 << 20; -    const size_t BUFFER_SIZE = 64 << 20; +    ::TIntrusivePtr<TRawTableWriter> RawWriter_;  };  //////////////////////////////////////////////////////////////////////////////// diff --git a/yt/cpp/mapreduce/client/retryful_writer.cpp b/yt/cpp/mapreduce/client/retryful_writer.cpp index 12b2939ffad..057fb5e7f0e 100644 --- a/yt/cpp/mapreduce/client/retryful_writer.cpp +++ b/yt/cpp/mapreduce/client/retryful_writer.cpp @@ -145,6 +145,11 @@ void TRetryfulWriter::Abort()      WriterState_ = Completed;  } +size_t TRetryfulWriter::GetBufferMemoryUsage() const +{ +    return BufferSize_ * 4; +} +  size_t TRetryfulWriter::GetBufferSize(const TMaybe<TWriterOptions>& writerOptions)  {      auto retryBlockSize = TMaybe<size_t>(); diff --git a/yt/cpp/mapreduce/client/retryful_writer.h b/yt/cpp/mapreduce/client/retryful_writer.h index 38e351977d6..0725a36aacb 100644 --- a/yt/cpp/mapreduce/client/retryful_writer.h +++ b/yt/cpp/mapreduce/client/retryful_writer.h @@ -75,6 +75,8 @@ public:      void NotifyRowEnd() override;      void Abort() override; +    size_t GetBufferMemoryUsage() const override; +      size_t GetRetryBlockRemainingSize() const      {        return (BufferSize_ > Buffer_.size()) ? (BufferSize_ - Buffer_.size()) : 0; diff --git a/yt/cpp/mapreduce/client/retryless_writer.cpp b/yt/cpp/mapreduce/client/retryless_writer.cpp index 4c25c1a1dde..e3cf7cba06f 100644 --- a/yt/cpp/mapreduce/client/retryless_writer.cpp +++ b/yt/cpp/mapreduce/client/retryless_writer.cpp @@ -40,6 +40,11 @@ void TRetrylessWriter::Abort()      Running_ = false;  } +size_t TRetrylessWriter::GetBufferMemoryUsage() const +{ +    return BufferSize_; +} +  ////////////////////////////////////////////////////////////////////////////////  } // namespace NYT diff --git a/yt/cpp/mapreduce/client/retryless_writer.h b/yt/cpp/mapreduce/client/retryless_writer.h index baf49a258f6..d1511fc4765 100644 --- a/yt/cpp/mapreduce/client/retryless_writer.h +++ b/yt/cpp/mapreduce/client/retryless_writer.h @@ -35,6 +35,7 @@ public:          const TRichYPath& path,          size_t bufferSize,          const TWriterOptions& options) +        : BufferSize_(bufferSize)      {          THttpHeader header("PUT", command);          header.SetInputFormat(format); @@ -51,18 +52,22 @@ public:          auto hostName = GetProxyForHeavyRequest(context);          Request_ = context.HttpClient->StartRequest(GetFullUrl(hostName, context, header), requestId, header); -        BufferedOutput_.Reset(new TBufferedOutput(Request_->GetStream(), bufferSize)); +        BufferedOutput_.Reset(new TBufferedOutput(Request_->GetStream(), BufferSize_));      }      ~TRetrylessWriter() override;      void NotifyRowEnd() override;      void Abort() override; +    size_t GetBufferMemoryUsage() const override; +  protected:      void DoWrite(const void* buf, size_t len) override;      void DoFinish() override;  private: +    const size_t BufferSize_ = 0; +      bool Running_ = true;      NHttpClient::IHttpRequestPtr Request_;      THolder<TBufferedOutput> BufferedOutput_; diff --git a/yt/cpp/mapreduce/interface/io-inl.h b/yt/cpp/mapreduce/interface/io-inl.h index c35ebb74811..542749c679b 100644 --- a/yt/cpp/mapreduce/interface/io-inl.h +++ b/yt/cpp/mapreduce/interface/io-inl.h @@ -740,6 +740,11 @@ struct IWriterImplBase          }      } +    virtual size_t GetBufferMemoryUsage() const +    { +        return 0; +    } +      virtual size_t GetTableCount() const = 0;      virtual void FinishTable(size_t tableIndex) = 0;      virtual void Abort() @@ -816,6 +821,11 @@ public:          }      } +    size_t GetBufferMemoryUsage() const +    { +        return DoGetBufferMemoryUsage(); +    } +  protected:      template <class U>      void DoAddRow(const U& row, size_t tableIndex = 0, size_t rowWeight = 0) @@ -869,6 +879,11 @@ protected:          Writer_->AddRowBatch(std::move(rowBatch), tableIndex, rowBatchWeight);      } +    size_t DoGetBufferMemoryUsage() const +    { +        return Writer_->GetBufferMemoryUsage(); +    } +      ::TIntrusivePtr<IWriterImpl> GetWriterImpl()      {          return Writer_; diff --git a/yt/cpp/mapreduce/interface/io.h b/yt/cpp/mapreduce/interface/io.h index e2b20a18029..dbd3a2cb707 100644 --- a/yt/cpp/mapreduce/interface/io.h +++ b/yt/cpp/mapreduce/interface/io.h @@ -168,6 +168,11 @@ public:      /// By default it does nothing, but implementations are welcome to override this method.      virtual void Abort()      { } + +    virtual size_t GetBufferMemoryUsage() const +    { +        return 0; +    }  };  /// @brief Interface to deal with multiple raw output streams. @@ -191,6 +196,11 @@ public:      /// By default it does nothing, but implementations are welcome to override this method.      virtual void Abort()      { } + +    virtual size_t GetBufferMemoryUsage() const +    { +        return 0; +    }  };  //////////////////////////////////////////////////////////////////////////////// @@ -378,6 +388,8 @@ public:      /// Stop writing data as soon as possible (without flushing data, e.g. before aborting parent transaction).      void Finish(); + +    size_t GetBufferMemoryUsage() const;  };  //////////////////////////////////////////////////////////////////////////////// diff --git a/yt/cpp/mapreduce/io/job_writer.cpp b/yt/cpp/mapreduce/io/job_writer.cpp index d08bb0a6651..cbd3f16c582 100644 --- a/yt/cpp/mapreduce/io/job_writer.cpp +++ b/yt/cpp/mapreduce/io/job_writer.cpp @@ -56,6 +56,11 @@ IOutputStream* TJobWriter::GetStream(size_t tableIndex) const  void TJobWriter::OnRowFinished(size_t)  { } +size_t TJobWriter::GetBufferMemoryUsage() const +{ +    return TStream::BUFFER_SIZE * GetStreamCount(); +} +  ////////////////////////////////////////////////////////////////////////////////  THolder<IProxyOutput> CreateRawJobWriter(size_t outputTableCount) diff --git a/yt/cpp/mapreduce/io/job_writer.h b/yt/cpp/mapreduce/io/job_writer.h index 9b24650640e..9ea84023457 100644 --- a/yt/cpp/mapreduce/io/job_writer.h +++ b/yt/cpp/mapreduce/io/job_writer.h @@ -18,6 +18,7 @@ public:      explicit TJobWriter(size_t outputTableCount);      explicit TJobWriter(const TVector<TFile>& fileList); +    size_t GetBufferMemoryUsage() const override;      size_t GetStreamCount() const override;      IOutputStream* GetStream(size_t tableIndex) const override;      void OnRowFinished(size_t tableIndex) override; diff --git a/yt/cpp/mapreduce/io/node_table_writer.cpp b/yt/cpp/mapreduce/io/node_table_writer.cpp index dcb5a0f5b5e..916dec7ae4e 100644 --- a/yt/cpp/mapreduce/io/node_table_writer.cpp +++ b/yt/cpp/mapreduce/io/node_table_writer.cpp @@ -24,6 +24,11 @@ TNodeTableWriter::TNodeTableWriter(THolder<IProxyOutput> output, NYson::EYsonFor  TNodeTableWriter::~TNodeTableWriter()  { } +size_t TNodeTableWriter::GetBufferMemoryUsage() const +{ +    return Output_->GetBufferMemoryUsage(); +} +  size_t TNodeTableWriter::GetTableCount() const  {      return Output_->GetStreamCount(); diff --git a/yt/cpp/mapreduce/io/node_table_writer.h b/yt/cpp/mapreduce/io/node_table_writer.h index 4bf8cb2fe7a..155bec076de 100644 --- a/yt/cpp/mapreduce/io/node_table_writer.h +++ b/yt/cpp/mapreduce/io/node_table_writer.h @@ -19,6 +19,7 @@ public:      void AddRow(const TNode& row, size_t tableIndex) override;      void AddRow(TNode&& row, size_t tableIndex) override; +    size_t GetBufferMemoryUsage() const override;      size_t GetTableCount() const override;      void FinishTable(size_t) override;      void Abort() override; diff --git a/yt/cpp/mapreduce/io/proto_table_writer.cpp b/yt/cpp/mapreduce/io/proto_table_writer.cpp index 1ce78116251..160ea78f50d 100644 --- a/yt/cpp/mapreduce/io/proto_table_writer.cpp +++ b/yt/cpp/mapreduce/io/proto_table_writer.cpp @@ -105,6 +105,11 @@ TProtoTableWriter::TProtoTableWriter(  TProtoTableWriter::~TProtoTableWriter()  { } +size_t TProtoTableWriter::GetBufferMemoryUsage() const +{ +    return NodeWriter_->GetBufferMemoryUsage(); +} +  size_t TProtoTableWriter::GetTableCount() const  {      return NodeWriter_->GetTableCount(); @@ -143,6 +148,11 @@ TLenvalProtoTableWriter::TLenvalProtoTableWriter(  TLenvalProtoTableWriter::~TLenvalProtoTableWriter()  { } +size_t TLenvalProtoTableWriter::GetBufferMemoryUsage() const +{ +    return Output_->GetBufferMemoryUsage(); +} +  size_t TLenvalProtoTableWriter::GetTableCount() const  {      return Output_->GetStreamCount(); diff --git a/yt/cpp/mapreduce/io/proto_table_writer.h b/yt/cpp/mapreduce/io/proto_table_writer.h index a6df69e6ae0..d00e77091ce 100644 --- a/yt/cpp/mapreduce/io/proto_table_writer.h +++ b/yt/cpp/mapreduce/io/proto_table_writer.h @@ -21,6 +21,7 @@ public:      void AddRow(const Message& row, size_t tableIndex) override;      void AddRow(Message&& row, size_t tableIndex) override; +    size_t GetBufferMemoryUsage() const override;      size_t GetTableCount() const override;      void FinishTable(size_t) override;      void Abort() override; @@ -44,6 +45,7 @@ public:      void AddRow(const Message& row, size_t tableIndex) override;      void AddRow(Message&& row, size_t tableIndex) override; +    size_t GetBufferMemoryUsage() const override;      size_t GetTableCount() const override;      void FinishTable(size_t) override;      void Abort() override; diff --git a/yt/cpp/mapreduce/io/yamr_table_writer.cpp b/yt/cpp/mapreduce/io/yamr_table_writer.cpp index cce7ceb0f0b..fe31eb55431 100644 --- a/yt/cpp/mapreduce/io/yamr_table_writer.cpp +++ b/yt/cpp/mapreduce/io/yamr_table_writer.cpp @@ -13,6 +13,11 @@ TYaMRTableWriter::TYaMRTableWriter(THolder<IProxyOutput> output)  TYaMRTableWriter::~TYaMRTableWriter()  { } +size_t TYaMRTableWriter::GetBufferMemoryUsage() const +{ +    return Output_->GetBufferMemoryUsage(); +} +  size_t TYaMRTableWriter::GetTableCount() const  {      return Output_->GetStreamCount(); diff --git a/yt/cpp/mapreduce/io/yamr_table_writer.h b/yt/cpp/mapreduce/io/yamr_table_writer.h index cf88eaf2878..7f72c8005af 100644 --- a/yt/cpp/mapreduce/io/yamr_table_writer.h +++ b/yt/cpp/mapreduce/io/yamr_table_writer.h @@ -18,6 +18,7 @@ public:      void AddRow(const TYaMRRow& row, size_t tableIndex) override;      void AddRow(TYaMRRow&& row, size_t tableIndex) override; +    size_t GetBufferMemoryUsage() const override;      size_t GetTableCount() const override;      void FinishTable(size_t) override;      void Abort() override; | 
