diff options
| author | apollo1321 <[email protected]> | 2024-09-18 21:18:13 +0300 |
|---|---|---|
| committer | apollo1321 <[email protected]> | 2024-09-18 21:32:42 +0300 |
| commit | 1ff5503fb75269187625fb885f3b7961259c5b05 (patch) | |
| tree | 1ec000db962a775adc478908da1889ecb99ea84e | |
| parent | f566662cd4a87a146ac5eeed9e3912a7217a9695 (diff) | |
YT-21709: Introduce TRowBatchWriter for shuffle service
commit_hash:3f47f41fc3e5b49173d99678eed1ff4c51f2b749
| -rw-r--r-- | yt/yt/client/api/rpc_proxy/row_batch_writer.cpp | 68 | ||||
| -rw-r--r-- | yt/yt/client/api/rpc_proxy/row_batch_writer.h | 41 | ||||
| -rw-r--r-- | yt/yt/client/api/rpc_proxy/table_writer.cpp | 57 | ||||
| -rw-r--r-- | yt/yt/client/api/table_writer.h | 2 | ||||
| -rw-r--r-- | yt/yt/client/ya.make | 1 |
5 files changed, 115 insertions, 54 deletions
diff --git a/yt/yt/client/api/rpc_proxy/row_batch_writer.cpp b/yt/yt/client/api/rpc_proxy/row_batch_writer.cpp new file mode 100644 index 00000000000..7d1cbe5a4de --- /dev/null +++ b/yt/yt/client/api/rpc_proxy/row_batch_writer.cpp @@ -0,0 +1,68 @@ +#include "row_batch_writer.h" +#include "row_stream.h" +#include "wire_row_stream.h" + +#include <yt/yt/client/table_client/name_table.h> +#include <yt/yt/client/table_client/row_batch.h> + +#include <yt/yt/core/concurrency/async_stream.h> + +namespace NYT::NApi::NRpcProxy { + +using namespace NConcurrency; +using namespace NTableClient; + +//////////////////////////////////////////////////////////////////////////////// + +TRowBatchWriter::TRowBatchWriter(IAsyncZeroCopyOutputStreamPtr underlying) + : Underlying_(std::move(underlying)) + , Encoder_(CreateWireRowStreamEncoder(NameTable_)) +{ + YT_VERIFY(Underlying_); + NameTable_->SetEnableColumnNameValidation(); +} + +bool TRowBatchWriter::Write(TRange<TUnversionedRow> rows) +{ + YT_VERIFY(!Closed_); + YT_VERIFY(ReadyEvent_.IsSet() && ReadyEvent_.Get().IsOK()); + + auto batch = CreateBatchFromUnversionedRows(TSharedRange<TUnversionedRow>(rows, nullptr)); + + auto block = Encoder_->Encode(batch, nullptr); + + ReadyEvent_ = NewPromise<void>(); + ReadyEvent_.TrySetFrom(Underlying_->Write(std::move(block))); + + return ReadyEvent_.IsSet() && ReadyEvent_.Get().IsOK(); +} + +TFuture<void> TRowBatchWriter::GetReadyEvent() +{ + return ReadyEvent_; +} + +TFuture<void> TRowBatchWriter::Close() +{ + YT_VERIFY(!Closed_); + Closed_ = true; + + return Underlying_->Close(); +} + +const TNameTablePtr& TRowBatchWriter::GetNameTable() const +{ + return NameTable_; +} + +//////////////////////////////////////////////////////////////////////////////// + +IRowBatchWriterPtr CreateRowBatchWriter( + IAsyncZeroCopyOutputStreamPtr outputStream) +{ + return New<TRowBatchWriter>(std::move(outputStream)); +} + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT::NApi::NRpcProxy diff --git a/yt/yt/client/api/rpc_proxy/row_batch_writer.h b/yt/yt/client/api/rpc_proxy/row_batch_writer.h new file mode 100644 index 00000000000..1476f08ca66 --- /dev/null +++ b/yt/yt/client/api/rpc_proxy/row_batch_writer.h @@ -0,0 +1,41 @@ +#pragma once + +#include "public.h" + +#include <yt/yt/client/api/row_batch_writer.h> + +namespace NYT::NApi::NRpcProxy { + +//////////////////////////////////////////////////////////////////////////////// + +class TRowBatchWriter + : public virtual IRowBatchWriter +{ +public: + explicit TRowBatchWriter(NConcurrency::IAsyncZeroCopyOutputStreamPtr underlying); + + bool Write(TRange<NTableClient::TUnversionedRow> rows) override; + + TFuture<void> GetReadyEvent() override; + + TFuture<void> Close() override; + + const NTableClient::TNameTablePtr& GetNameTable() const override; + +private: + const NConcurrency::IAsyncZeroCopyOutputStreamPtr Underlying_; + const NTableClient::TNameTablePtr NameTable_ = New<NTableClient::TNameTable>(); + const IRowStreamEncoderPtr Encoder_; + + TPromise<void> ReadyEvent_ = MakePromise<void>(TError()); + bool Closed_ = false; +}; + +//////////////////////////////////////////////////////////////////////////////// + +IRowBatchWriterPtr CreateRowBatchWriter( + NConcurrency::IAsyncZeroCopyOutputStreamPtr outputStream); + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT::NApi::NRpcProxy diff --git a/yt/yt/client/api/rpc_proxy/table_writer.cpp b/yt/yt/client/api/rpc_proxy/table_writer.cpp index 31197018f60..d957c670234 100644 --- a/yt/yt/client/api/rpc_proxy/table_writer.cpp +++ b/yt/yt/client/api/rpc_proxy/table_writer.cpp @@ -1,15 +1,8 @@ #include "table_writer.h" -#include "helpers.h" -#include "row_stream.h" -#include "wire_row_stream.h" +#include "row_batch_writer.h" #include <yt/yt/client/api/table_writer.h> - -#include <yt/yt/client/table_client/name_table.h> #include <yt/yt/client/table_client/schema.h> -#include <yt/yt/client/table_client/row_batch.h> - -#include <yt/yt/core/rpc/stream.h> namespace NYT::NApi::NRpcProxy { @@ -19,51 +12,16 @@ using namespace NTableClient; //////////////////////////////////////////////////////////////////////////////// class TTableWriter - : public ITableWriter + : public TRowBatchWriter + , public ITableWriter { public: TTableWriter( IAsyncZeroCopyOutputStreamPtr underlying, TTableSchemaPtr schema) - : Underlying_(std::move(underlying)) + : TRowBatchWriter(std::move(underlying)) , Schema_(std::move(schema)) - , Encoder_(CreateWireRowStreamEncoder(NameTable_)) { - YT_VERIFY(Underlying_); - NameTable_->SetEnableColumnNameValidation(); - } - - bool Write(TRange<TUnversionedRow> rows) override - { - YT_VERIFY(!Closed_); - YT_VERIFY(ReadyEvent_.IsSet() && ReadyEvent_.Get().IsOK()); - - auto batch = CreateBatchFromUnversionedRows(TSharedRange<TUnversionedRow>(rows, nullptr)); - - auto block = Encoder_->Encode(batch, nullptr); - - ReadyEvent_ = NewPromise<void>(); - ReadyEvent_.TrySetFrom(Underlying_->Write(std::move(block))); - - return ReadyEvent_.IsSet() && ReadyEvent_.Get().IsOK(); - } - - TFuture<void> GetReadyEvent() override - { - return ReadyEvent_; - } - - TFuture<void> Close() override - { - YT_VERIFY(!Closed_); - Closed_ = true; - - return Underlying_->Close(); - } - - const TNameTablePtr& GetNameTable() const override - { - return NameTable_; } const TTableSchemaPtr& GetSchema() const override @@ -72,14 +30,7 @@ public: } private: - const IAsyncZeroCopyOutputStreamPtr Underlying_; const TTableSchemaPtr Schema_; - - const TNameTablePtr NameTable_ = New<TNameTable>(); - const IRowStreamEncoderPtr Encoder_; - - TPromise<void> ReadyEvent_ = MakePromise<void>(TError()); - bool Closed_ = false; }; ITableWriterPtr CreateTableWriter( diff --git a/yt/yt/client/api/table_writer.h b/yt/yt/client/api/table_writer.h index fd22dbd1a06..f4d42c3c78e 100644 --- a/yt/yt/client/api/table_writer.h +++ b/yt/yt/client/api/table_writer.h @@ -8,7 +8,7 @@ namespace NYT::NApi { //////////////////////////////////////////////////////////////////////////////// struct ITableWriter - : public IRowBatchWriter + : public virtual IRowBatchWriter { //! Returns the schema to be used for constructing rows. virtual const NTableClient::TTableSchemaPtr& GetSchema() const = 0; diff --git a/yt/yt/client/ya.make b/yt/yt/client/ya.make index bcd88f84d2d..a0cc4caf88f 100644 --- a/yt/yt/client/ya.make +++ b/yt/yt/client/ya.make @@ -53,6 +53,7 @@ SRCS( api/rpc_proxy/transaction_impl.cpp api/rpc_proxy/row_batch_reader.cpp api/rpc_proxy/row_stream.cpp + api/rpc_proxy/row_batch_writer.cpp api/rpc_proxy/wire_row_stream.cpp bundle_controller_client/bundle_controller_client.cpp |
