diff options
author | apollo1321 <apollo1321@yandex-team.com> | 2024-09-18 20:45:37 +0300 |
---|---|---|
committer | apollo1321 <apollo1321@yandex-team.com> | 2024-09-18 21:01:11 +0300 |
commit | f566662cd4a87a146ac5eeed9e3912a7217a9695 (patch) | |
tree | 2affb74c07b69ca51a2caa8f696bc0b287d336ad | |
parent | d52033f579a6fb762b53f11cfb88dbd23b50ce5c (diff) | |
download | ydb-f566662cd4a87a146ac5eeed9e3912a7217a9695.tar.gz |
YT-21709: Introduce TRowBatchReader for shuffle service
commit_hash:19d310f65fbda04a047ea4fce7a93284c7c3cc44
-rw-r--r-- | yt/yt/client/api/rpc_proxy/row_batch_reader.cpp | 156 | ||||
-rw-r--r-- | yt/yt/client/api/rpc_proxy/row_batch_reader.h | 58 | ||||
-rw-r--r-- | yt/yt/client/api/rpc_proxy/table_reader.cpp | 167 | ||||
-rw-r--r-- | yt/yt/client/api/table_reader.h | 2 | ||||
-rw-r--r-- | yt/yt/client/ya.make | 1 |
5 files changed, 224 insertions, 160 deletions
diff --git a/yt/yt/client/api/rpc_proxy/row_batch_reader.cpp b/yt/yt/client/api/rpc_proxy/row_batch_reader.cpp new file mode 100644 index 0000000000..22223f6945 --- /dev/null +++ b/yt/yt/client/api/rpc_proxy/row_batch_reader.cpp @@ -0,0 +1,156 @@ +#include "row_batch_reader.h" +#include "helpers.h" +#include "row_stream.h" +#include "wire_row_stream.h" + +#include <yt/yt/client/table_client/name_table.h> + +#include <yt_proto/yt/client/api/rpc_proxy/proto/api_service.pb.h> + +namespace NYT::NApi::NRpcProxy { + +using namespace NConcurrency; +using namespace NTableClient; + +//////////////////////////////////////////////////////////////////////////////// + +TRowBatchReader::TRowBatchReader( + IAsyncZeroCopyInputStreamPtr underlying, + bool isStreamWithStatistics) + : Underlying_(std::move(underlying)) + , Decoder_(CreateWireRowStreamDecoder(NameTable_)) + , IsStreamWithStatistics_(isStreamWithStatistics) +{ + YT_VERIFY(Underlying_); + + RowsFuture_ = GetRows(); + ReadyEvent_.TrySetFrom(RowsFuture_); +} + +IUnversionedRowBatchPtr TRowBatchReader::Read(const TRowBatchReadOptions& options) +{ + StoredRows_.clear(); + + if (!ReadyEvent_.IsSet() || !ReadyEvent_.Get().IsOK()) { + return CreateEmptyUnversionedRowBatch(); + } + + if (!Finished_) { + ReadyEvent_ = NewPromise<void>(); + } + + std::vector<TUnversionedRow> rows; + rows.reserve(options.MaxRowsPerRead); + i64 dataWeight = 0; + + while (RowsFuture_ && + RowsFuture_.IsSet() && + RowsFuture_.Get().IsOK() && + !Finished_ && + std::ssize(rows) < options.MaxRowsPerRead && + dataWeight < options.MaxDataWeightPerRead) + { + const auto& currentRows = RowsFuture_.Get().Value(); + + if (currentRows.Empty()) { + ReadyEvent_.Set(); + Finished_ = true; + continue; + } + + while (CurrentRowsOffset_ < std::ssize(currentRows) && + std::ssize(rows) < options.MaxRowsPerRead && + dataWeight < options.MaxDataWeightPerRead) + { + auto row = currentRows[CurrentRowsOffset_++]; + rows.push_back(row); + dataWeight += GetDataWeight(row); + } + + StoredRows_.push_back(currentRows); + + if (CurrentRowsOffset_ == std::ssize(currentRows)) { + RowsFuture_ = GetRows(); + CurrentRowsOffset_ = 0; + } + } + + RowCount_ += rows.size(); + DataWeight_ += dataWeight; + + ReadyEvent_.TrySetFrom(RowsFuture_); + return rows.empty() + ? nullptr + : CreateBatchFromUnversionedRows(MakeSharedRange(std::move(rows), MakeStrong(this))); +} + +TFuture<void> TRowBatchReader::GetReadyEvent() const +{ + return ReadyEvent_; +} + +const TNameTablePtr& TRowBatchReader::GetNameTable() const +{ + return NameTable_; +} + +TFuture<TSharedRange<TUnversionedRow>> TRowBatchReader::GetRows() +{ + return Underlying_->Read() + .Apply(BIND([this, weakThis = MakeWeak(this)](const TSharedRef& block) { + auto this_ = weakThis.Lock(); + if (!this_) { + THROW_ERROR_EXCEPTION(NYT::EErrorCode::Canceled, "Reader destroyed"); + } + + NProto::TRowsetDescriptor descriptor; + NProto::TRowsetStatistics statistics; + auto payloadRef = DeserializeRowStreamBlockEnvelope( + block, + &descriptor, + IsStreamWithStatistics_ ? &statistics : nullptr); + + ValidateRowsetDescriptor( + descriptor, + CurrentWireFormatVersion, + NProto::RK_UNVERSIONED, + NProto::ERowsetFormat::RF_YT_WIRE); + + if (descriptor.rowset_format() != NApi::NRpcProxy::NProto::RF_YT_WIRE) { + THROW_ERROR_EXCEPTION( + "Unsupported rowset format %Qv", + NApi::NRpcProxy::NProto::ERowsetFormat_Name(descriptor.rowset_format())); + } + + auto batch = Decoder_->Decode(payloadRef, descriptor); + auto rows = batch->MaterializeRows(); + + if (IsStreamWithStatistics_) { + ApplyStatistics(statistics); + } + + if (rows.Empty()) { + return ExpectEndOfStream(Underlying_).Apply(BIND([=] { + return std::move(rows); + })); + } + return MakeFuture(std::move(rows)); + })); +} + +void TRowBatchReader::ApplyStatistics(const NProto::TRowsetStatistics& /*statistics*/) +{ +} + +//////////////////////////////////////////////////////////////////////////////// + +IRowBatchReaderPtr CreateRowBatchReader( + IAsyncZeroCopyInputStreamPtr inputStream, + bool isStreamWithStatistics) +{ + return New<TRowBatchReader>(std::move(inputStream), isStreamWithStatistics); +} + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT::NApi::NRpcProxy diff --git a/yt/yt/client/api/rpc_proxy/row_batch_reader.h b/yt/yt/client/api/rpc_proxy/row_batch_reader.h new file mode 100644 index 0000000000..b7e30c02b6 --- /dev/null +++ b/yt/yt/client/api/rpc_proxy/row_batch_reader.h @@ -0,0 +1,58 @@ +#pragma once + +#include <yt/yt/client/api/row_batch_reader.h> +#include <yt/yt/client/api/rpc_proxy/public.h> + +#include <yt/yt_proto/yt/client/api/rpc_proxy/proto/api_service.pb.h> + +namespace NYT::NApi::NRpcProxy { + +//////////////////////////////////////////////////////////////////////////////// + +class TRowBatchReader + : public virtual IRowBatchReader +{ +public: + TRowBatchReader( + NConcurrency::IAsyncZeroCopyInputStreamPtr underlying, + bool isStreamWithStatistics); + + NTableClient::IUnversionedRowBatchPtr Read(const NTableClient::TRowBatchReadOptions& options) override; + + TFuture<void> GetReadyEvent() const override; + + const NTableClient::TNameTablePtr& GetNameTable() const override; + +protected: + i64 RowCount_ = 0; + i64 DataWeight_ = 0; + + virtual void ApplyStatistics(const NProto::TRowsetStatistics& statistics); + +private: + const NConcurrency::IAsyncZeroCopyInputStreamPtr Underlying_; + const NTableClient::TNameTablePtr NameTable_ = New<NTableClient::TNameTable>(); + const IRowStreamDecoderPtr Decoder_; + + TFuture<TSharedRange<NTableClient::TUnversionedRow>> RowsFuture_; + TPromise<void> ReadyEvent_ = NewPromise<void>(); + + std::vector<TSharedRange<NTableClient::TUnversionedRow>> StoredRows_; + + bool Finished_ = false; + i64 CurrentRowsOffset_ = 0; + + const bool IsStreamWithStatistics_; + + TFuture<TSharedRange<NTableClient::TUnversionedRow>> GetRows(); +}; + +//////////////////////////////////////////////////////////////////////////////// + +IRowBatchReaderPtr CreateRowBatchReader( + NConcurrency::IAsyncZeroCopyInputStreamPtr inputStream, + bool isStreamWithStatistics); + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT::NApi::NRpcProxy diff --git a/yt/yt/client/api/rpc_proxy/table_reader.cpp b/yt/yt/client/api/rpc_proxy/table_reader.cpp index 7bf5238da3..bb1f571332 100644 --- a/yt/yt/client/api/rpc_proxy/table_reader.cpp +++ b/yt/yt/client/api/rpc_proxy/table_reader.cpp @@ -1,20 +1,12 @@ #include "table_reader.h" +#include "row_batch_reader.h" #include "helpers.h" -#include "row_stream.h" -#include "wire_row_stream.h" -#include <yt/yt/client/api/rowset.h> #include <yt/yt/client/api/table_reader.h> +#include <yt/yt/client/table_client/schema.h> #include <yt/yt_proto/yt/client/chunk_client/proto/data_statistics.pb.h> -#include <yt/yt/client/table_client/name_table.h> -#include <yt/yt/client/table_client/unversioned_reader.h> -#include <yt/yt/client/table_client/unversioned_row.h> -#include <yt/yt/client/table_client/row_batch.h> - -#include <yt/yt/core/rpc/stream.h> - namespace NYT::NApi::NRpcProxy { using namespace NConcurrency; @@ -23,7 +15,8 @@ using namespace NTableClient; //////////////////////////////////////////////////////////////////////////////// class TTableReader - : public ITableReader + : public TRowBatchReader + , public ITableReader { public: TTableReader( @@ -31,19 +24,13 @@ public: i64 startRowIndex, const std::vector<TString>& omittedInaccessibleColumns, TTableSchemaPtr schema, - const NApi::NRpcProxy::NProto::TRowsetStatistics& statistics) - : Underlying_(std::move(underlying)) + const NProto::TRowsetStatistics& statistics) + : TRowBatchReader(std::move(underlying), /*isStreamWithStatistics*/ true) , StartRowIndex_(startRowIndex) , TableSchema_(std::move(schema)) , OmittedInaccessibleColumns_(omittedInaccessibleColumns) - , Decoder_(CreateWireRowStreamDecoder(NameTable_)) { - YT_VERIFY(Underlying_); - - ApplyReaderStatistics(statistics); - - RowsWithStatisticsFuture_ = GetRowsWithStatistics(); - ReadyEvent_.TrySetFrom(RowsWithStatisticsFuture_); + ApplyStatistics(statistics); } i64 GetStartRowIndex() const override @@ -65,76 +52,6 @@ public: return dataStatistics; } - TFuture<void> GetReadyEvent() const override - { - return ReadyEvent_; - } - - IUnversionedRowBatchPtr Read(const TRowBatchReadOptions& options) override - { - StoredRows_.clear(); - - if (!ReadyEvent_.IsSet() || !ReadyEvent_.Get().IsOK()) { - return CreateEmptyUnversionedRowBatch(); - } - - if (!Finished_) { - ReadyEvent_ = NewPromise<void>(); - } - - std::vector<TUnversionedRow> rows; - rows.reserve(options.MaxRowsPerRead); - i64 dataWeight = 0; - - while (RowsWithStatisticsFuture_ && - RowsWithStatisticsFuture_.IsSet() && - RowsWithStatisticsFuture_.Get().IsOK() && - !Finished_ && - std::ssize(rows) < options.MaxRowsPerRead && - dataWeight < options.MaxDataWeightPerRead) - { - const auto& currentRows = RowsWithStatisticsFuture_.Get().Value().Rows; - const auto& currentStatistics = RowsWithStatisticsFuture_.Get().Value().Statistics; - - if (currentRows.Empty()) { - ReadyEvent_.Set(); - Finished_ = true; - ApplyReaderStatistics(currentStatistics); - continue; - } - - while (CurrentRowsOffset_ < std::ssize(currentRows) && - std::ssize(rows) < options.MaxRowsPerRead && - dataWeight < options.MaxDataWeightPerRead) - { - auto row = currentRows[CurrentRowsOffset_++]; - rows.push_back(row); - dataWeight += GetDataWeight(row); - } - - StoredRows_.push_back(currentRows); - ApplyReaderStatistics(currentStatistics); - - if (CurrentRowsOffset_ == std::ssize(currentRows)) { - RowsWithStatisticsFuture_ = GetRowsWithStatistics(); - CurrentRowsOffset_ = 0; - } - } - - RowCount_ += rows.size(); - DataWeight_ += dataWeight; - - ReadyEvent_.TrySetFrom(RowsWithStatisticsFuture_); - return rows.empty() - ? nullptr - : CreateBatchFromUnversionedRows(MakeSharedRange(std::move(rows), MakeStrong(this))); - } - - const TNameTablePtr& GetNameTable() const override - { - return NameTable_; - } - const TTableSchemaPtr& GetTableSchema() const override { return TableSchema_; @@ -146,86 +63,18 @@ public: } private: - struct TRowsWithStatistics - { - TSharedRange<TUnversionedRow> Rows; - NApi::NRpcProxy::NProto::TRowsetStatistics Statistics; - }; - - const IAsyncZeroCopyInputStreamPtr Underlying_; const i64 StartRowIndex_; const TTableSchemaPtr TableSchema_; const std::vector<TString> OmittedInaccessibleColumns_; - const TNameTablePtr NameTable_ = New<TNameTable>(); - const IRowStreamDecoderPtr Decoder_; - NChunkClient::NProto::TDataStatistics DataStatistics_; i64 TotalRowCount_; - i64 RowCount_ = 0; - i64 DataWeight_ = 0; - - TNameTableToSchemaIdMapping IdMapping_; - - TPromise<void> ReadyEvent_ = NewPromise<void>(); - - std::vector<TSharedRange<TUnversionedRow>> StoredRows_; - TFuture<TRowsWithStatistics> RowsWithStatisticsFuture_; - i64 CurrentRowsOffset_ = 0; - - bool Finished_ = false; - - void ApplyReaderStatistics(const NApi::NRpcProxy::NProto::TRowsetStatistics& statistics) + void ApplyStatistics(const NProto::TRowsetStatistics& statistics) override { TotalRowCount_ = statistics.total_row_count(); DataStatistics_ = statistics.data_statistics(); } - - TFuture<TRowsWithStatistics> GetRowsWithStatistics() - { - return Underlying_->Read() - .Apply(BIND([this, weakThis = MakeWeak(this)] (const TSharedRef& block) { - auto this_ = weakThis.Lock(); - if (!this_) { - THROW_ERROR_EXCEPTION(NYT::EErrorCode::Canceled, "Reader destroyed"); - } - - NApi::NRpcProxy::NProto::TRowsetDescriptor descriptor; - NApi::NRpcProxy::NProto::TRowsetStatistics statistics; - auto payloadRef = DeserializeRowStreamBlockEnvelope(block, &descriptor, &statistics); - - ValidateRowsetDescriptor( - descriptor, - NApi::NRpcProxy::CurrentWireFormatVersion, - NApi::NRpcProxy::NProto::RK_UNVERSIONED, - NApi::NRpcProxy::NProto::ERowsetFormat::RF_YT_WIRE); - - auto decoder = GetOrCreateDecoder(descriptor.rowset_format()); - auto batch = decoder->Decode(payloadRef, descriptor); - auto rows = batch->MaterializeRows(); - auto rowsWithStatistics = TRowsWithStatistics{ - std::move(rows), - std::move(statistics) - }; - - if (rowsWithStatistics.Rows.Empty()) { - return ExpectEndOfStream(Underlying_).Apply(BIND([=] { - return std::move(rowsWithStatistics); - })); - } - return MakeFuture(std::move(rowsWithStatistics)); - })); - } - - IRowStreamDecoderPtr GetOrCreateDecoder(NApi::NRpcProxy::NProto::ERowsetFormat format) - { - if (format != NApi::NRpcProxy::NProto::RF_YT_WIRE) { - THROW_ERROR_EXCEPTION("Unsupported rowset format %Qv", - NApi::NRpcProxy::NProto::ERowsetFormat_Name(format)); - } - return Decoder_; - } }; TFuture<ITableReaderPtr> CreateTableReader(IAsyncZeroCopyInputStreamPtr inputStream) diff --git a/yt/yt/client/api/table_reader.h b/yt/yt/client/api/table_reader.h index 927f8dcb9f..b26e970657 100644 --- a/yt/yt/client/api/table_reader.h +++ b/yt/yt/client/api/table_reader.h @@ -10,7 +10,7 @@ namespace NYT::NApi { //////////////////////////////////////////////////////////////////////////////// struct ITableReader - : public IRowBatchReader + : public virtual IRowBatchReader { //! Returns the starting row index within the table. virtual i64 GetStartRowIndex() const = 0; diff --git a/yt/yt/client/ya.make b/yt/yt/client/ya.make index ec0e33b3a8..bcd88f84d2 100644 --- a/yt/yt/client/ya.make +++ b/yt/yt/client/ya.make @@ -51,6 +51,7 @@ SRCS( api/rpc_proxy/timestamp_provider.cpp api/rpc_proxy/transaction.cpp api/rpc_proxy/transaction_impl.cpp + api/rpc_proxy/row_batch_reader.cpp api/rpc_proxy/row_stream.cpp api/rpc_proxy/wire_row_stream.cpp |