aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorapollo1321 <apollo1321@yandex-team.com>2024-08-13 13:27:07 +0300
committerapollo1321 <apollo1321@yandex-team.com>2024-08-13 14:36:55 +0300
commit3025080eee9655a8bc0333e6367cdada2e3539d2 (patch)
tree38a5bf0d0d311a7d0e60cb0f868010f6ae4f002c
parent1270994679ea8f9d37c655302c61f28781bb316b (diff)
downloadydb-3025080eee9655a8bc0333e6367cdada2e3539d2.tar.gz
YT-21709: Make table client functions more general
Enables the reuse of functions in the Shuffle Service. a702881968d19ae9c32fb0aa6cb2560bcf3b52dc
-rw-r--r--yt/yt/client/api/rpc_proxy/client_base.cpp27
-rw-r--r--yt/yt/client/api/rpc_proxy/table_reader.cpp37
-rw-r--r--yt/yt/client/api/rpc_proxy/table_reader.h6
-rw-r--r--yt/yt/client/api/rpc_proxy/table_writer.cpp20
-rw-r--r--yt/yt/client/api/rpc_proxy/table_writer.h9
5 files changed, 53 insertions, 46 deletions
diff --git a/yt/yt/client/api/rpc_proxy/client_base.cpp b/yt/yt/client/api/rpc_proxy/client_base.cpp
index c603c7dbeb5..4b36ee2e0ef 100644
--- a/yt/yt/client/api/rpc_proxy/client_base.cpp
+++ b/yt/yt/client/api/rpc_proxy/client_base.cpp
@@ -39,13 +39,14 @@ namespace NYT::NApi::NRpcProxy {
////////////////////////////////////////////////////////////////////////////////
-using namespace NYPath;
-using namespace NYson;
+using namespace NConcurrency;
+using namespace NObjectClient;
using namespace NTableClient;
using namespace NTabletClient;
-using namespace NObjectClient;
using namespace NTransactionClient;
+using namespace NYPath;
using namespace NYTree;
+using namespace NYson;
using NYT::ToProto;
using NYT::FromProto;
@@ -747,7 +748,10 @@ TFuture<ITableReaderPtr> TClientBase::CreateTableReader(
ToProto(req->mutable_transactional_options(), options);
ToProto(req->mutable_suppressable_access_tracking_options(), options);
- return NRpcProxy::CreateTableReader(std::move(req));
+ return NRpc::CreateRpcClientInputStream(std::move(req))
+ .Apply(BIND([=] (IAsyncZeroCopyInputStreamPtr inputStream) {
+ return NRpcProxy::CreateTableReader(std::move(inputStream));
+ }));
}
TFuture<ITableWriterPtr> TClientBase::CreateTableWriter(
@@ -766,7 +770,20 @@ TFuture<ITableWriterPtr> TClientBase::CreateTableWriter(
ToProto(req->mutable_transactional_options(), options);
- return NRpcProxy::CreateTableWriter(std::move(req));
+ auto schema = New<TTableSchema>();
+ return NRpc::CreateRpcClientOutputStream(
+ std::move(req),
+ BIND ([=] (const TSharedRef& metaRef) {
+ NApi::NRpcProxy::NProto::TWriteTableMeta meta;
+ if (!TryDeserializeProto(&meta, metaRef)) {
+ THROW_ERROR_EXCEPTION("Failed to deserialize schema for table writer");
+ }
+
+ FromProto(schema.Get(), meta.schema());
+ }))
+ .Apply(BIND([=] (IAsyncZeroCopyOutputStreamPtr outputStream) {
+ return NRpcProxy::CreateTableWriter(std::move(outputStream), std::move(schema));
+ })).As<ITableWriterPtr>();
}
////////////////////////////////////////////////////////////////////////////////
diff --git a/yt/yt/client/api/rpc_proxy/table_reader.cpp b/yt/yt/client/api/rpc_proxy/table_reader.cpp
index 7703fc4c5ec..6ac4eedc2a8 100644
--- a/yt/yt/client/api/rpc_proxy/table_reader.cpp
+++ b/yt/yt/client/api/rpc_proxy/table_reader.cpp
@@ -228,28 +228,25 @@ private:
}
};
-TFuture<ITableReaderPtr> CreateTableReader(TApiServiceProxy::TReqReadTablePtr request)
+TFuture<ITableReaderPtr> CreateTableReader(IAsyncZeroCopyInputStreamPtr inputStream)
{
- return NRpc::CreateRpcClientInputStream(std::move(request))
- .Apply(BIND([=] (const IAsyncZeroCopyInputStreamPtr& inputStream) {
- return inputStream->Read().Apply(BIND([=] (const TSharedRef& metaRef) {
- NApi::NRpcProxy::NProto::TRspReadTableMeta meta;
- if (!TryDeserializeProto(&meta, metaRef)) {
- THROW_ERROR_EXCEPTION("Failed to deserialize table reader meta information");
- }
+ return inputStream->Read().Apply(BIND([=] (const TSharedRef& metaRef) {
+ NApi::NRpcProxy::NProto::TRspReadTableMeta meta;
+ if (!TryDeserializeProto(&meta, metaRef)) {
+ THROW_ERROR_EXCEPTION("Failed to deserialize table reader meta information");
+ }
- i64 startRowIndex = meta.start_row_index();
- auto omittedInaccessibleColumns = FromProto<std::vector<TString>>(
- meta.omitted_inaccessible_columns());
- auto schema = NYT::FromProto<TTableSchemaPtr>(meta.schema());
- return New<TTableReader>(
- inputStream,
- startRowIndex,
- std::move(omittedInaccessibleColumns),
- std::move(schema),
- meta.statistics());
- })).As<ITableReaderPtr>();
- }));
+ i64 startRowIndex = meta.start_row_index();
+ auto omittedInaccessibleColumns = FromProto<std::vector<TString>>(
+ meta.omitted_inaccessible_columns());
+ auto schema = NYT::FromProto<TTableSchemaPtr>(meta.schema());
+ return New<TTableReader>(
+ inputStream,
+ startRowIndex,
+ std::move(omittedInaccessibleColumns),
+ std::move(schema),
+ meta.statistics());
+ })).As<ITableReaderPtr>();
}
////////////////////////////////////////////////////////////////////////////////
diff --git a/yt/yt/client/api/rpc_proxy/table_reader.h b/yt/yt/client/api/rpc_proxy/table_reader.h
index cc84ef2ab0f..dded41df4c1 100644
--- a/yt/yt/client/api/rpc_proxy/table_reader.h
+++ b/yt/yt/client/api/rpc_proxy/table_reader.h
@@ -1,13 +1,15 @@
#pragma once
-#include "api_service_proxy.h"
+#include <yt/yt/client/api/public.h>
+
+#include <yt/yt/core/concurrency/public.h>
namespace NYT::NApi::NRpcProxy {
////////////////////////////////////////////////////////////////////////////////
TFuture<ITableReaderPtr> CreateTableReader(
- TApiServiceProxy::TReqReadTablePtr request);
+ NConcurrency::IAsyncZeroCopyInputStreamPtr inputStream);
////////////////////////////////////////////////////////////////////////////////
diff --git a/yt/yt/client/api/rpc_proxy/table_writer.cpp b/yt/yt/client/api/rpc_proxy/table_writer.cpp
index 40e696bf05d..31197018f60 100644
--- a/yt/yt/client/api/rpc_proxy/table_writer.cpp
+++ b/yt/yt/client/api/rpc_proxy/table_writer.cpp
@@ -82,23 +82,11 @@ private:
bool Closed_ = false;
};
-TFuture<ITableWriterPtr> CreateTableWriter(
- TApiServiceProxy::TReqWriteTablePtr request)
+ITableWriterPtr CreateTableWriter(
+ IAsyncZeroCopyOutputStreamPtr outputStream,
+ TTableSchemaPtr schema)
{
- auto schema = New<TTableSchema>();
- return NRpc::CreateRpcClientOutputStream(
- std::move(request),
- BIND ([=] (const TSharedRef& metaRef) {
- NApi::NRpcProxy::NProto::TWriteTableMeta meta;
- if (!TryDeserializeProto(&meta, metaRef)) {
- THROW_ERROR_EXCEPTION("Failed to deserialize schema for table writer");
- }
-
- FromProto(schema.Get(), meta.schema());
- }))
- .Apply(BIND([=] (const IAsyncZeroCopyOutputStreamPtr& outputStream) {
- return New<TTableWriter>(outputStream, std::move(schema));
- })).As<ITableWriterPtr>();
+ return New<TTableWriter>(std::move(outputStream), std::move(schema));
}
////////////////////////////////////////////////////////////////////////////////
diff --git a/yt/yt/client/api/rpc_proxy/table_writer.h b/yt/yt/client/api/rpc_proxy/table_writer.h
index ce67e7ed0d0..21e20c4ba4b 100644
--- a/yt/yt/client/api/rpc_proxy/table_writer.h
+++ b/yt/yt/client/api/rpc_proxy/table_writer.h
@@ -1,13 +1,16 @@
#pragma once
-#include "api_service_proxy.h"
+#include <yt/yt/client/api/public.h>
+
+#include <yt/yt/core/concurrency/public.h>
namespace NYT::NApi::NRpcProxy {
////////////////////////////////////////////////////////////////////////////////
-TFuture<ITableWriterPtr> CreateTableWriter(
- TApiServiceProxy::TReqWriteTablePtr request);
+ITableWriterPtr CreateTableWriter(
+ NConcurrency::IAsyncZeroCopyOutputStreamPtr outputStream,
+ NTableClient::TTableSchemaPtr tableSchema);
////////////////////////////////////////////////////////////////////////////////