about summary refs log tree commit diff stats
diff options
context:
space:
mode:
author apollo1321 <apollo1321@yandex-team.com> 2024-09-18 20:45:37 +0300
committer apollo1321 <apollo1321@yandex-team.com> 2024-09-18 21:01:11 +0300
commit f566662cd4a87a146ac5eeed9e3912a7217a9695 (patch)
tree 2affb74c07b69ca51a2caa8f696bc0b287d336ad
parent d52033f579a6fb762b53f11cfb88dbd23b50ce5c (diff)
download ydb-f566662cd4a87a146ac5eeed9e3912a7217a9695.tar.gz
YT-21709: Introduce TRowBatchReader for shuffle service
commit_hash:19d310f65fbda04a047ea4fce7a93284c7c3cc44
-rw-r--r-- yt/yt/client/api/rpc_proxy/row_batch_reader.cpp 156
-rw-r--r-- yt/yt/client/api/rpc_proxy/row_batch_reader.h 58
-rw-r--r-- yt/yt/client/api/rpc_proxy/table_reader.cpp 167
-rw-r--r-- yt/yt/client/api/table_reader.h 2
-rw-r--r-- yt/yt/client/ya.make 1
5 files changed, 224 insertions, 160 deletions
diff --git a/yt/yt/client/api/rpc_proxy/row_batch_reader.cpp b/yt/yt/client/api/rpc_proxy/row_batch_reader.cpp
new file mode 100644
index 0000000000..22223f6945
--- /dev/null
+++ b/yt/yt/client/api/rpc_proxy/row_batch_reader.cpp
@@ -0,0 +1,156 @@
+#include "row_batch_reader.h"
+#include "helpers.h"
+#include "row_stream.h"
+#include "wire_row_stream.h"
+
+#include <yt/yt/client/table_client/name_table.h>
+
+#include <yt_proto/yt/client/api/rpc_proxy/proto/api_service.pb.h>
+
+namespace NYT::NApi::NRpcProxy {
+
+using namespace NConcurrency;
+using namespace NTableClient;
+
+////////////////////////////////////////////////////////////////////////////////
+
+TRowBatchReader::TRowBatchReader( // Wraps `underlying` and immediately requests its first block of rows.
+ IAsyncZeroCopyInputStreamPtr underlying,
+ bool isStreamWithStatistics) // when true, GetRows() also parses per-block statistics and passes them to ApplyStatistics()
+ : Underlying_(std::move(underlying))
+ , Decoder_(CreateWireRowStreamDecoder(NameTable_)) // the decoder is created over this reader's own NameTable_
+ , IsStreamWithStatistics_(isStreamWithStatistics)
+{
+ YT_VERIFY(Underlying_); // the stream must be non-null
+
+ RowsFuture_ = GetRows(); // start fetching the first block right away
+ ReadyEvent_.TrySetFrom(RowsFuture_); // readiness follows the outcome of that first fetch
+}
+
+IUnversionedRowBatchPtr TRowBatchReader::Read(const TRowBatchReadOptions& options) // Collects rows from already-fetched blocks up to the limits in `options`; returns nullptr when no rows were collected.
+{
+ StoredRows_.clear(); // release the block references kept for the previous call
+
+ if (!ReadyEvent_.IsSet() || !ReadyEvent_.Get().IsOK()) { // nothing fetched yet, or the last fetch failed
+ return CreateEmptyUnversionedRowBatch(); // empty (non-null) batch; the caller is expected to consult GetReadyEvent()
+ }
+
+ if (!Finished_) { // once the end has been seen, ReadyEvent_ is left as is
+ ReadyEvent_ = NewPromise<void>(); // fresh readiness promise for this round
+ }
+
+ std::vector<TUnversionedRow> rows;
+ rows.reserve(options.MaxRowsPerRead);
+ i64 dataWeight = 0; // data weight accumulated within this call only
+
+ while (RowsFuture_ && // consume only blocks whose fetch has already completed successfully,
+ RowsFuture_.IsSet() &&
+ RowsFuture_.Get().IsOK() &&
+ !Finished_ && // stop once the end of the stream was seen,
+ std::ssize(rows) < options.MaxRowsPerRead && // or once either per-read limit is reached
+ dataWeight < options.MaxDataWeightPerRead)
+ {
+ const auto& currentRows = RowsFuture_.Get().Value();
+
+ if (currentRows.Empty()) { // an empty block is treated as the end of the stream
+ ReadyEvent_.Set();
+ Finished_ = true;
+ continue; // the loop condition now fails on !Finished_
+ }
+
+ while (CurrentRowsOffset_ < std::ssize(currentRows) && // resume from where the previous call stopped in this block
+ std::ssize(rows) < options.MaxRowsPerRead &&
+ dataWeight < options.MaxDataWeightPerRead)
+ {
+ auto row = currentRows[CurrentRowsOffset_++];
+ rows.push_back(row);
+ dataWeight += GetDataWeight(row);
+ }
+
+ StoredRows_.push_back(currentRows); // keep the block referenced by this reader until the next Read() call
+
+ if (CurrentRowsOffset_ == std::ssize(currentRows)) { // block fully consumed: request the next one
+ RowsFuture_ = GetRows();
+ CurrentRowsOffset_ = 0;
+ }
+ }
+
+ RowCount_ += rows.size(); // running totals across all Read() calls
+ DataWeight_ += dataWeight;
+
+ ReadyEvent_.TrySetFrom(RowsFuture_); // readiness follows the pending fetch; presumably a no-op if ReadyEvent_ was already set above — confirm
+ return rows.empty()
+ ? nullptr // nothing was collected in this call
+ : CreateBatchFromUnversionedRows(MakeSharedRange(std::move(rows), MakeStrong(this))); // the returned range holds a strong reference to this reader
+}
+
+TFuture<void> TRowBatchReader::GetReadyEvent() const // Returns the future of the current ReadyEvent_ promise (Read() may replace that promise).
+{
+ return ReadyEvent_;
+}
+
+const TNameTablePtr& TRowBatchReader::GetNameTable() const // Returns the name table that Decoder_ was created with.
+{
+ return NameTable_;
+}
+
+TFuture<TSharedRange<TUnversionedRow>> TRowBatchReader::GetRows() // Reads one block from Underlying_ and decodes it into a range of unversioned rows.
+{
+ return Underlying_->Read()
+ .Apply(BIND([this, weakThis = MakeWeak(this)](const TSharedRef& block) { // raw `this` is only used after weakThis has been locked below
+ auto this_ = weakThis.Lock(); // keeps the reader alive for the duration of the callback
+ if (!this_) {
+ THROW_ERROR_EXCEPTION(NYT::EErrorCode::Canceled, "Reader destroyed"); // the reader is gone; fail the resulting future
+ }
+
+ NProto::TRowsetDescriptor descriptor;
+ NProto::TRowsetStatistics statistics; // stays default-constructed when the stream carries no statistics
+ auto payloadRef = DeserializeRowStreamBlockEnvelope(
+ block,
+ &descriptor,
+ IsStreamWithStatistics_ ? &statistics : nullptr); // statistics are requested only for streams that carry them
+
+ ValidateRowsetDescriptor(
+ descriptor,
+ CurrentWireFormatVersion,
+ NProto::RK_UNVERSIONED,
+ NProto::ERowsetFormat::RF_YT_WIRE);
+
+ if (descriptor.rowset_format() != NApi::NRpcProxy::NProto::RF_YT_WIRE) { // NOTE(review): RF_YT_WIRE is already passed to ValidateRowsetDescriptor above; this check may be redundant — confirm
+ THROW_ERROR_EXCEPTION(
+ "Unsupported rowset format %Qv",
+ NApi::NRpcProxy::NProto::ERowsetFormat_Name(descriptor.rowset_format()));
+ }
+
+ auto batch = Decoder_->Decode(payloadRef, descriptor);
+ auto rows = batch->MaterializeRows();
+
+ if (IsStreamWithStatistics_) {
+ ApplyStatistics(statistics); // virtual hook, invoked from this callback rather than from Read(); NOTE(review): confirm thread/ordering against readers of the statistics
+ }
+
+ if (rows.Empty()) { // empty rowset: the result resolves only after ExpectEndOfStream on the stream completes
+ return ExpectEndOfStream(Underlying_).Apply(BIND([=] { // NOTE(review): lambda is not mutable, so the by-copy `rows` is const inside it
+ return std::move(rows); // NOTE(review): moving from a const copy cannot actually move — confirm this is intended
+ }));
+ }
+ return MakeFuture(std::move(rows)); // non-empty rowset: already-completed future
+ }));
+}
+
+void TRowBatchReader::ApplyStatistics(const NProto::TRowsetStatistics& /*statistics*/) // Default statistics hook: intentionally does nothing; subclasses may override.
+{
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+IRowBatchReaderPtr CreateRowBatchReader( // Factory: builds a TRowBatchReader over `inputStream`.
+ IAsyncZeroCopyInputStreamPtr inputStream,
+ bool isStreamWithStatistics) // forwarded unchanged to the TRowBatchReader constructor
+{
+ return New<TRowBatchReader>(std::move(inputStream), isStreamWithStatistics);
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT::NApi::NRpcProxy
diff --git a/yt/yt/client/api/rpc_proxy/row_batch_reader.h b/yt/yt/client/api/rpc_proxy/row_batch_reader.h
new file mode 100644
index 0000000000..b7e30c02b6
--- /dev/null
+++ b/yt/yt/client/api/rpc_proxy/row_batch_reader.h
@@ -0,0 +1,58 @@
+#pragma once
+
+#include <yt/yt/client/api/row_batch_reader.h>
+#include <yt/yt/client/api/rpc_proxy/public.h>
+
+#include <yt/yt_proto/yt/client/api/rpc_proxy/proto/api_service.pb.h>
+
+namespace NYT::NApi::NRpcProxy {
+
+////////////////////////////////////////////////////////////////////////////////
+
+class TRowBatchReader // Row batch reader that decodes rows from an asynchronous zero-copy input stream.
+ : public virtual IRowBatchReader // virtual base: TTableReader in this commit derives from both this class and ITableReader
+{
+public:
+ TRowBatchReader( // starts fetching the first block during construction
+ NConcurrency::IAsyncZeroCopyInputStreamPtr underlying,
+ bool isStreamWithStatistics);
+
+ NTableClient::IUnversionedRowBatchPtr Read(const NTableClient::TRowBatchReadOptions& options) override; // returns rows from already-fetched blocks; nullptr when none were collected
+
+ TFuture<void> GetReadyEvent() const override; // future of the current readiness promise
+
+ const NTableClient::TNameTablePtr& GetNameTable() const override; // name table the decoder was created with
+
+protected:
+ i64 RowCount_ = 0; // total number of rows returned by Read() so far
+ i64 DataWeight_ = 0; // total data weight of the rows returned by Read() so far
+
+ virtual void ApplyStatistics(const NProto::TRowsetStatistics& statistics); // called per decoded block when isStreamWithStatistics is true; the base implementation does nothing
+
+private:
+ const NConcurrency::IAsyncZeroCopyInputStreamPtr Underlying_; // source of encoded row blocks
+ const NTableClient::TNameTablePtr NameTable_ = New<NTableClient::TNameTable>(); // declared before Decoder_, whose initializer uses it
+ const IRowStreamDecoderPtr Decoder_;
+
+ TFuture<TSharedRange<NTableClient::TUnversionedRow>> RowsFuture_; // the block currently being fetched or consumed
+ TPromise<void> ReadyEvent_ = NewPromise<void>(); // backs GetReadyEvent(); Read() replaces it while the stream is not finished
+
+ std::vector<TSharedRange<NTableClient::TUnversionedRow>> StoredRows_; // blocks referenced by the most recent Read() call
+
+ bool Finished_ = false; // set once an empty block has been observed by Read()
+ i64 CurrentRowsOffset_ = 0; // index of the next unread row within the current block
+
+ const bool IsStreamWithStatistics_; // whether blocks are parsed together with statistics
+
+ TFuture<TSharedRange<NTableClient::TUnversionedRow>> GetRows(); // reads and decodes one block from Underlying_
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
+IRowBatchReaderPtr CreateRowBatchReader( // Creates a TRowBatchReader over `inputStream`.
+ NConcurrency::IAsyncZeroCopyInputStreamPtr inputStream,
+ bool isStreamWithStatistics); // whether each block of the stream is parsed together with rowset statistics
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT::NApi::NRpcProxy
diff --git a/yt/yt/client/api/rpc_proxy/table_reader.cpp b/yt/yt/client/api/rpc_proxy/table_reader.cpp
index 7bf5238da3..bb1f571332 100644
--- a/yt/yt/client/api/rpc_proxy/table_reader.cpp
+++ b/yt/yt/client/api/rpc_proxy/table_reader.cpp
@@ -1,20 +1,12 @@
#include "table_reader.h"
+#include "row_batch_reader.h"
#include "helpers.h"
-#include "row_stream.h"
-#include "wire_row_stream.h"
-#include <yt/yt/client/api/rowset.h>
#include <yt/yt/client/api/table_reader.h>
+#include <yt/yt/client/table_client/schema.h>
#include <yt/yt_proto/yt/client/chunk_client/proto/data_statistics.pb.h>
-#include <yt/yt/client/table_client/name_table.h>
-#include <yt/yt/client/table_client/unversioned_reader.h>
-#include <yt/yt/client/table_client/unversioned_row.h>
-#include <yt/yt/client/table_client/row_batch.h>
-
-#include <yt/yt/core/rpc/stream.h>
-
namespace NYT::NApi::NRpcProxy {
using namespace NConcurrency;
@@ -23,7 +15,8 @@ using namespace NTableClient;
////////////////////////////////////////////////////////////////////////////////
class TTableReader
- : public ITableReader
+ : public TRowBatchReader
+ , public ITableReader
{
public:
TTableReader(
@@ -31,19 +24,13 @@ public:
i64 startRowIndex,
const std::vector<TString>& omittedInaccessibleColumns,
TTableSchemaPtr schema,
- const NApi::NRpcProxy::NProto::TRowsetStatistics& statistics)
- : Underlying_(std::move(underlying))
+ const NProto::TRowsetStatistics& statistics)
+ : TRowBatchReader(std::move(underlying), /*isStreamWithStatistics*/ true)
, StartRowIndex_(startRowIndex)
, TableSchema_(std::move(schema))
, OmittedInaccessibleColumns_(omittedInaccessibleColumns)
- , Decoder_(CreateWireRowStreamDecoder(NameTable_))
{
- YT_VERIFY(Underlying_);
-
- ApplyReaderStatistics(statistics);
-
- RowsWithStatisticsFuture_ = GetRowsWithStatistics();
- ReadyEvent_.TrySetFrom(RowsWithStatisticsFuture_);
+ ApplyStatistics(statistics);
}
i64 GetStartRowIndex() const override
@@ -65,76 +52,6 @@ public:
return dataStatistics;
}
- TFuture<void> GetReadyEvent() const override
- {
- return ReadyEvent_;
- }
-
- IUnversionedRowBatchPtr Read(const TRowBatchReadOptions& options) override
- {
- StoredRows_.clear();
-
- if (!ReadyEvent_.IsSet() || !ReadyEvent_.Get().IsOK()) {
- return CreateEmptyUnversionedRowBatch();
- }
-
- if (!Finished_) {
- ReadyEvent_ = NewPromise<void>();
- }
-
- std::vector<TUnversionedRow> rows;
- rows.reserve(options.MaxRowsPerRead);
- i64 dataWeight = 0;
-
- while (RowsWithStatisticsFuture_ &&
- RowsWithStatisticsFuture_.IsSet() &&
- RowsWithStatisticsFuture_.Get().IsOK() &&
- !Finished_ &&
- std::ssize(rows) < options.MaxRowsPerRead &&
- dataWeight < options.MaxDataWeightPerRead)
- {
- const auto& currentRows = RowsWithStatisticsFuture_.Get().Value().Rows;
- const auto& currentStatistics = RowsWithStatisticsFuture_.Get().Value().Statistics;
-
- if (currentRows.Empty()) {
- ReadyEvent_.Set();
- Finished_ = true;
- ApplyReaderStatistics(currentStatistics);
- continue;
- }
-
- while (CurrentRowsOffset_ < std::ssize(currentRows) &&
- std::ssize(rows) < options.MaxRowsPerRead &&
- dataWeight < options.MaxDataWeightPerRead)
- {
- auto row = currentRows[CurrentRowsOffset_++];
- rows.push_back(row);
- dataWeight += GetDataWeight(row);
- }
-
- StoredRows_.push_back(currentRows);
- ApplyReaderStatistics(currentStatistics);
-
- if (CurrentRowsOffset_ == std::ssize(currentRows)) {
- RowsWithStatisticsFuture_ = GetRowsWithStatistics();
- CurrentRowsOffset_ = 0;
- }
- }
-
- RowCount_ += rows.size();
- DataWeight_ += dataWeight;
-
- ReadyEvent_.TrySetFrom(RowsWithStatisticsFuture_);
- return rows.empty()
- ? nullptr
- : CreateBatchFromUnversionedRows(MakeSharedRange(std::move(rows), MakeStrong(this)));
- }
-
- const TNameTablePtr& GetNameTable() const override
- {
- return NameTable_;
- }
-
const TTableSchemaPtr& GetTableSchema() const override
{
return TableSchema_;
@@ -146,86 +63,18 @@ public:
}
private:
- struct TRowsWithStatistics
- {
- TSharedRange<TUnversionedRow> Rows;
- NApi::NRpcProxy::NProto::TRowsetStatistics Statistics;
- };
-
- const IAsyncZeroCopyInputStreamPtr Underlying_;
const i64 StartRowIndex_;
const TTableSchemaPtr TableSchema_;
const std::vector<TString> OmittedInaccessibleColumns_;
- const TNameTablePtr NameTable_ = New<TNameTable>();
- const IRowStreamDecoderPtr Decoder_;
-
NChunkClient::NProto::TDataStatistics DataStatistics_;
i64 TotalRowCount_;
- i64 RowCount_ = 0;
- i64 DataWeight_ = 0;
-
- TNameTableToSchemaIdMapping IdMapping_;
-
- TPromise<void> ReadyEvent_ = NewPromise<void>();
-
- std::vector<TSharedRange<TUnversionedRow>> StoredRows_;
- TFuture<TRowsWithStatistics> RowsWithStatisticsFuture_;
- i64 CurrentRowsOffset_ = 0;
-
- bool Finished_ = false;
-
- void ApplyReaderStatistics(const NApi::NRpcProxy::NProto::TRowsetStatistics& statistics)
+ void ApplyStatistics(const NProto::TRowsetStatistics& statistics) override // Overrides the TRowBatchReader hook: stores the total row count and data statistics from `statistics`.
{
TotalRowCount_ = statistics.total_row_count();
DataStatistics_ = statistics.data_statistics();
}
-
- TFuture<TRowsWithStatistics> GetRowsWithStatistics()
- {
- return Underlying_->Read()
- .Apply(BIND([this, weakThis = MakeWeak(this)] (const TSharedRef& block) {
- auto this_ = weakThis.Lock();
- if (!this_) {
- THROW_ERROR_EXCEPTION(NYT::EErrorCode::Canceled, "Reader destroyed");
- }
-
- NApi::NRpcProxy::NProto::TRowsetDescriptor descriptor;
- NApi::NRpcProxy::NProto::TRowsetStatistics statistics;
- auto payloadRef = DeserializeRowStreamBlockEnvelope(block, &descriptor, &statistics);
-
- ValidateRowsetDescriptor(
- descriptor,
- NApi::NRpcProxy::CurrentWireFormatVersion,
- NApi::NRpcProxy::NProto::RK_UNVERSIONED,
- NApi::NRpcProxy::NProto::ERowsetFormat::RF_YT_WIRE);
-
- auto decoder = GetOrCreateDecoder(descriptor.rowset_format());
- auto batch = decoder->Decode(payloadRef, descriptor);
- auto rows = batch->MaterializeRows();
- auto rowsWithStatistics = TRowsWithStatistics{
- std::move(rows),
- std::move(statistics)
- };
-
- if (rowsWithStatistics.Rows.Empty()) {
- return ExpectEndOfStream(Underlying_).Apply(BIND([=] {
- return std::move(rowsWithStatistics);
- }));
- }
- return MakeFuture(std::move(rowsWithStatistics));
- }));
- }
-
- IRowStreamDecoderPtr GetOrCreateDecoder(NApi::NRpcProxy::NProto::ERowsetFormat format)
- {
- if (format != NApi::NRpcProxy::NProto::RF_YT_WIRE) {
- THROW_ERROR_EXCEPTION("Unsupported rowset format %Qv",
- NApi::NRpcProxy::NProto::ERowsetFormat_Name(format));
- }
- return Decoder_;
- }
};
TFuture<ITableReaderPtr> CreateTableReader(IAsyncZeroCopyInputStreamPtr inputStream)
diff --git a/yt/yt/client/api/table_reader.h b/yt/yt/client/api/table_reader.h
index 927f8dcb9f..b26e970657 100644
--- a/yt/yt/client/api/table_reader.h
+++ b/yt/yt/client/api/table_reader.h
@@ -10,7 +10,7 @@ namespace NYT::NApi {
////////////////////////////////////////////////////////////////////////////////
struct ITableReader
- : public IRowBatchReader
+ : public virtual IRowBatchReader
{
//! Returns the starting row index within the table.
virtual i64 GetStartRowIndex() const = 0;
diff --git a/yt/yt/client/ya.make b/yt/yt/client/ya.make
index ec0e33b3a8..bcd88f84d2 100644
--- a/yt/yt/client/ya.make
+++ b/yt/yt/client/ya.make
@@ -51,6 +51,7 @@ SRCS(
api/rpc_proxy/timestamp_provider.cpp
api/rpc_proxy/transaction.cpp
api/rpc_proxy/transaction_impl.cpp
+ api/rpc_proxy/row_batch_reader.cpp
api/rpc_proxy/row_stream.cpp
api/rpc_proxy/wire_row_stream.cpp