summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorapollo1321 <[email protected]>2024-09-18 21:18:13 +0300
committerapollo1321 <[email protected]>2024-09-18 21:32:42 +0300
commit1ff5503fb75269187625fb885f3b7961259c5b05 (patch)
tree1ec000db962a775adc478908da1889ecb99ea84e
parentf566662cd4a87a146ac5eeed9e3912a7217a9695 (diff)
YT-21709: Introduce TRowBatchWriter for shuffle service
commit_hash:3f47f41fc3e5b49173d99678eed1ff4c51f2b749
-rw-r--r--yt/yt/client/api/rpc_proxy/row_batch_writer.cpp68
-rw-r--r--yt/yt/client/api/rpc_proxy/row_batch_writer.h41
-rw-r--r--yt/yt/client/api/rpc_proxy/table_writer.cpp57
-rw-r--r--yt/yt/client/api/table_writer.h2
-rw-r--r--yt/yt/client/ya.make1
5 files changed, 115 insertions, 54 deletions
diff --git a/yt/yt/client/api/rpc_proxy/row_batch_writer.cpp b/yt/yt/client/api/rpc_proxy/row_batch_writer.cpp
new file mode 100644
index 00000000000..7d1cbe5a4de
--- /dev/null
+++ b/yt/yt/client/api/rpc_proxy/row_batch_writer.cpp
@@ -0,0 +1,68 @@
+#include "row_batch_writer.h"
+#include "row_stream.h"
+#include "wire_row_stream.h"
+
+#include <yt/yt/client/table_client/name_table.h>
+#include <yt/yt/client/table_client/row_batch.h>
+
+#include <yt/yt/core/concurrency/async_stream.h>
+
+namespace NYT::NApi::NRpcProxy {
+
+using namespace NConcurrency;
+using namespace NTableClient;
+
+////////////////////////////////////////////////////////////////////////////////
+
+TRowBatchWriter::TRowBatchWriter(IAsyncZeroCopyOutputStreamPtr underlying)
+ : Underlying_(std::move(underlying))
+ , Encoder_(CreateWireRowStreamEncoder(NameTable_))
+{
+ YT_VERIFY(Underlying_);
+ NameTable_->SetEnableColumnNameValidation();
+}
+
+bool TRowBatchWriter::Write(TRange<TUnversionedRow> rows)
+{
+ YT_VERIFY(!Closed_);
+ YT_VERIFY(ReadyEvent_.IsSet() && ReadyEvent_.Get().IsOK());
+
+ auto batch = CreateBatchFromUnversionedRows(TSharedRange<TUnversionedRow>(rows, nullptr));
+
+ auto block = Encoder_->Encode(batch, nullptr);
+
+ ReadyEvent_ = NewPromise<void>();
+ ReadyEvent_.TrySetFrom(Underlying_->Write(std::move(block)));
+
+ return ReadyEvent_.IsSet() && ReadyEvent_.Get().IsOK();
+}
+
+TFuture<void> TRowBatchWriter::GetReadyEvent()
+{
+ return ReadyEvent_;
+}
+
+TFuture<void> TRowBatchWriter::Close()
+{
+ YT_VERIFY(!Closed_);
+ Closed_ = true;
+
+ return Underlying_->Close();
+}
+
+const TNameTablePtr& TRowBatchWriter::GetNameTable() const
+{
+ return NameTable_;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+IRowBatchWriterPtr CreateRowBatchWriter(
+ IAsyncZeroCopyOutputStreamPtr outputStream)
+{
+ return New<TRowBatchWriter>(std::move(outputStream));
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT::NApi::NRpcProxy
diff --git a/yt/yt/client/api/rpc_proxy/row_batch_writer.h b/yt/yt/client/api/rpc_proxy/row_batch_writer.h
new file mode 100644
index 00000000000..1476f08ca66
--- /dev/null
+++ b/yt/yt/client/api/rpc_proxy/row_batch_writer.h
@@ -0,0 +1,41 @@
+#pragma once
+
+#include "public.h"
+
+#include <yt/yt/client/api/row_batch_writer.h>
+
+namespace NYT::NApi::NRpcProxy {
+
+////////////////////////////////////////////////////////////////////////////////
+
+class TRowBatchWriter
+ : public virtual IRowBatchWriter
+{
+public:
+ explicit TRowBatchWriter(NConcurrency::IAsyncZeroCopyOutputStreamPtr underlying);
+
+ bool Write(TRange<NTableClient::TUnversionedRow> rows) override;
+
+ TFuture<void> GetReadyEvent() override;
+
+ TFuture<void> Close() override;
+
+ const NTableClient::TNameTablePtr& GetNameTable() const override;
+
+private:
+ const NConcurrency::IAsyncZeroCopyOutputStreamPtr Underlying_;
+ const NTableClient::TNameTablePtr NameTable_ = New<NTableClient::TNameTable>();
+ const IRowStreamEncoderPtr Encoder_;
+
+ TPromise<void> ReadyEvent_ = MakePromise<void>(TError());
+ bool Closed_ = false;
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
+IRowBatchWriterPtr CreateRowBatchWriter(
+ NConcurrency::IAsyncZeroCopyOutputStreamPtr outputStream);
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT::NApi::NRpcProxy
diff --git a/yt/yt/client/api/rpc_proxy/table_writer.cpp b/yt/yt/client/api/rpc_proxy/table_writer.cpp
index 31197018f60..d957c670234 100644
--- a/yt/yt/client/api/rpc_proxy/table_writer.cpp
+++ b/yt/yt/client/api/rpc_proxy/table_writer.cpp
@@ -1,15 +1,8 @@
#include "table_writer.h"
-#include "helpers.h"
-#include "row_stream.h"
-#include "wire_row_stream.h"
+#include "row_batch_writer.h"
#include <yt/yt/client/api/table_writer.h>
-
-#include <yt/yt/client/table_client/name_table.h>
#include <yt/yt/client/table_client/schema.h>
-#include <yt/yt/client/table_client/row_batch.h>
-
-#include <yt/yt/core/rpc/stream.h>
namespace NYT::NApi::NRpcProxy {
@@ -19,51 +12,16 @@ using namespace NTableClient;
////////////////////////////////////////////////////////////////////////////////
class TTableWriter
- : public ITableWriter
+ : public TRowBatchWriter
+ , public ITableWriter
{
public:
TTableWriter(
IAsyncZeroCopyOutputStreamPtr underlying,
TTableSchemaPtr schema)
- : Underlying_(std::move(underlying))
+ : TRowBatchWriter(std::move(underlying))
, Schema_(std::move(schema))
- , Encoder_(CreateWireRowStreamEncoder(NameTable_))
{
- YT_VERIFY(Underlying_);
- NameTable_->SetEnableColumnNameValidation();
- }
-
- bool Write(TRange<TUnversionedRow> rows) override
- {
- YT_VERIFY(!Closed_);
- YT_VERIFY(ReadyEvent_.IsSet() && ReadyEvent_.Get().IsOK());
-
- auto batch = CreateBatchFromUnversionedRows(TSharedRange<TUnversionedRow>(rows, nullptr));
-
- auto block = Encoder_->Encode(batch, nullptr);
-
- ReadyEvent_ = NewPromise<void>();
- ReadyEvent_.TrySetFrom(Underlying_->Write(std::move(block)));
-
- return ReadyEvent_.IsSet() && ReadyEvent_.Get().IsOK();
- }
-
- TFuture<void> GetReadyEvent() override
- {
- return ReadyEvent_;
- }
-
- TFuture<void> Close() override
- {
- YT_VERIFY(!Closed_);
- Closed_ = true;
-
- return Underlying_->Close();
- }
-
- const TNameTablePtr& GetNameTable() const override
- {
- return NameTable_;
}
const TTableSchemaPtr& GetSchema() const override
@@ -72,14 +30,7 @@ public:
}
private:
- const IAsyncZeroCopyOutputStreamPtr Underlying_;
const TTableSchemaPtr Schema_;
-
- const TNameTablePtr NameTable_ = New<TNameTable>();
- const IRowStreamEncoderPtr Encoder_;
-
- TPromise<void> ReadyEvent_ = MakePromise<void>(TError());
- bool Closed_ = false;
};
ITableWriterPtr CreateTableWriter(
diff --git a/yt/yt/client/api/table_writer.h b/yt/yt/client/api/table_writer.h
index fd22dbd1a06..f4d42c3c78e 100644
--- a/yt/yt/client/api/table_writer.h
+++ b/yt/yt/client/api/table_writer.h
@@ -8,7 +8,7 @@ namespace NYT::NApi {
////////////////////////////////////////////////////////////////////////////////
struct ITableWriter
- : public IRowBatchWriter
+ : public virtual IRowBatchWriter
{
//! Returns the schema to be used for constructing rows.
virtual const NTableClient::TTableSchemaPtr& GetSchema() const = 0;
diff --git a/yt/yt/client/ya.make b/yt/yt/client/ya.make
index bcd88f84d2d..a0cc4caf88f 100644
--- a/yt/yt/client/ya.make
+++ b/yt/yt/client/ya.make
@@ -53,6 +53,7 @@ SRCS(
api/rpc_proxy/transaction_impl.cpp
api/rpc_proxy/row_batch_reader.cpp
api/rpc_proxy/row_stream.cpp
+ api/rpc_proxy/row_batch_writer.cpp
api/rpc_proxy/wire_row_stream.cpp
bundle_controller_client/bundle_controller_client.cpp