diff options
author | apollo1321 <apollo1321@yandex-team.com> | 2024-08-13 13:27:07 +0300 |
---|---|---|
committer | apollo1321 <apollo1321@yandex-team.com> | 2024-08-13 14:36:55 +0300 |
commit | 3025080eee9655a8bc0333e6367cdada2e3539d2 (patch) | |
tree | 38a5bf0d0d311a7d0e60cb0f868010f6ae4f002c | |
parent | 1270994679ea8f9d37c655302c61f28781bb316b (diff) | |
download | ydb-3025080eee9655a8bc0333e6367cdada2e3539d2.tar.gz |
YT-21709: Make table client functions more general
Enables the reuse of functions in the Shuffle Service.
a702881968d19ae9c32fb0aa6cb2560bcf3b52dc
-rw-r--r-- | yt/yt/client/api/rpc_proxy/client_base.cpp | 27 | ||||
-rw-r--r-- | yt/yt/client/api/rpc_proxy/table_reader.cpp | 37 | ||||
-rw-r--r-- | yt/yt/client/api/rpc_proxy/table_reader.h | 6 | ||||
-rw-r--r-- | yt/yt/client/api/rpc_proxy/table_writer.cpp | 20 | ||||
-rw-r--r-- | yt/yt/client/api/rpc_proxy/table_writer.h | 9 |
5 files changed, 53 insertions, 46 deletions
diff --git a/yt/yt/client/api/rpc_proxy/client_base.cpp b/yt/yt/client/api/rpc_proxy/client_base.cpp index c603c7dbeb5..4b36ee2e0ef 100644 --- a/yt/yt/client/api/rpc_proxy/client_base.cpp +++ b/yt/yt/client/api/rpc_proxy/client_base.cpp @@ -39,13 +39,14 @@ namespace NYT::NApi::NRpcProxy { //////////////////////////////////////////////////////////////////////////////// -using namespace NYPath; -using namespace NYson; +using namespace NConcurrency; +using namespace NObjectClient; using namespace NTableClient; using namespace NTabletClient; -using namespace NObjectClient; using namespace NTransactionClient; +using namespace NYPath; using namespace NYTree; +using namespace NYson; using NYT::ToProto; using NYT::FromProto; @@ -747,7 +748,10 @@ TFuture<ITableReaderPtr> TClientBase::CreateTableReader( ToProto(req->mutable_transactional_options(), options); ToProto(req->mutable_suppressable_access_tracking_options(), options); - return NRpcProxy::CreateTableReader(std::move(req)); + return NRpc::CreateRpcClientInputStream(std::move(req)) + .Apply(BIND([=] (IAsyncZeroCopyInputStreamPtr inputStream) { + return NRpcProxy::CreateTableReader(std::move(inputStream)); + })); } TFuture<ITableWriterPtr> TClientBase::CreateTableWriter( @@ -766,7 +770,20 @@ TFuture<ITableWriterPtr> TClientBase::CreateTableWriter( ToProto(req->mutable_transactional_options(), options); - return NRpcProxy::CreateTableWriter(std::move(req)); + auto schema = New<TTableSchema>(); + return NRpc::CreateRpcClientOutputStream( + std::move(req), + BIND ([=] (const TSharedRef& metaRef) { + NApi::NRpcProxy::NProto::TWriteTableMeta meta; + if (!TryDeserializeProto(&meta, metaRef)) { + THROW_ERROR_EXCEPTION("Failed to deserialize schema for table writer"); + } + + FromProto(schema.Get(), meta.schema()); + })) + .Apply(BIND([=] (IAsyncZeroCopyOutputStreamPtr outputStream) { + return NRpcProxy::CreateTableWriter(std::move(outputStream), std::move(schema)); + })).As<ITableWriterPtr>(); } //////////////////////////////////////////////////////////////////////////////// diff --git a/yt/yt/client/api/rpc_proxy/table_reader.cpp b/yt/yt/client/api/rpc_proxy/table_reader.cpp index 7703fc4c5ec..6ac4eedc2a8 100644 --- a/yt/yt/client/api/rpc_proxy/table_reader.cpp +++ b/yt/yt/client/api/rpc_proxy/table_reader.cpp @@ -228,28 +228,25 @@ private: } }; -TFuture<ITableReaderPtr> CreateTableReader(TApiServiceProxy::TReqReadTablePtr request) +TFuture<ITableReaderPtr> CreateTableReader(IAsyncZeroCopyInputStreamPtr inputStream) { - return NRpc::CreateRpcClientInputStream(std::move(request)) - .Apply(BIND([=] (const IAsyncZeroCopyInputStreamPtr& inputStream) { - return inputStream->Read().Apply(BIND([=] (const TSharedRef& metaRef) { - NApi::NRpcProxy::NProto::TRspReadTableMeta meta; - if (!TryDeserializeProto(&meta, metaRef)) { - THROW_ERROR_EXCEPTION("Failed to deserialize table reader meta information"); - } + return inputStream->Read().Apply(BIND([=] (const TSharedRef& metaRef) { + NApi::NRpcProxy::NProto::TRspReadTableMeta meta; + if (!TryDeserializeProto(&meta, metaRef)) { + THROW_ERROR_EXCEPTION("Failed to deserialize table reader meta information"); + } - i64 startRowIndex = meta.start_row_index(); - auto omittedInaccessibleColumns = FromProto<std::vector<TString>>( - meta.omitted_inaccessible_columns()); - auto schema = NYT::FromProto<TTableSchemaPtr>(meta.schema()); - return New<TTableReader>( - inputStream, - startRowIndex, - std::move(omittedInaccessibleColumns), - std::move(schema), - meta.statistics()); - })).As<ITableReaderPtr>(); - })); + i64 startRowIndex = meta.start_row_index(); + auto omittedInaccessibleColumns = FromProto<std::vector<TString>>( + meta.omitted_inaccessible_columns()); + auto schema = NYT::FromProto<TTableSchemaPtr>(meta.schema()); + return New<TTableReader>( + inputStream, + startRowIndex, + std::move(omittedInaccessibleColumns), + std::move(schema), + meta.statistics()); + })).As<ITableReaderPtr>(); } //////////////////////////////////////////////////////////////////////////////// diff --git a/yt/yt/client/api/rpc_proxy/table_reader.h b/yt/yt/client/api/rpc_proxy/table_reader.h index cc84ef2ab0f..dded41df4c1 100644 --- a/yt/yt/client/api/rpc_proxy/table_reader.h +++ b/yt/yt/client/api/rpc_proxy/table_reader.h @@ -1,13 +1,15 @@ #pragma once -#include "api_service_proxy.h" +#include <yt/yt/client/api/public.h> + +#include <yt/yt/core/concurrency/public.h> namespace NYT::NApi::NRpcProxy { //////////////////////////////////////////////////////////////////////////////// TFuture<ITableReaderPtr> CreateTableReader( - TApiServiceProxy::TReqReadTablePtr request); + NConcurrency::IAsyncZeroCopyInputStreamPtr inputStream); //////////////////////////////////////////////////////////////////////////////// diff --git a/yt/yt/client/api/rpc_proxy/table_writer.cpp b/yt/yt/client/api/rpc_proxy/table_writer.cpp index 40e696bf05d..31197018f60 100644 --- a/yt/yt/client/api/rpc_proxy/table_writer.cpp +++ b/yt/yt/client/api/rpc_proxy/table_writer.cpp @@ -82,23 +82,11 @@ private: bool Closed_ = false; }; -TFuture<ITableWriterPtr> CreateTableWriter( - TApiServiceProxy::TReqWriteTablePtr request) +ITableWriterPtr CreateTableWriter( + IAsyncZeroCopyOutputStreamPtr outputStream, + TTableSchemaPtr schema) { - auto schema = New<TTableSchema>(); - return NRpc::CreateRpcClientOutputStream( - std::move(request), - BIND ([=] (const TSharedRef& metaRef) { - NApi::NRpcProxy::NProto::TWriteTableMeta meta; - if (!TryDeserializeProto(&meta, metaRef)) { - THROW_ERROR_EXCEPTION("Failed to deserialize schema for table writer"); - } - - FromProto(schema.Get(), meta.schema()); - })) - .Apply(BIND([=] (const IAsyncZeroCopyOutputStreamPtr& outputStream) { - return New<TTableWriter>(outputStream, std::move(schema)); - })).As<ITableWriterPtr>(); + return New<TTableWriter>(std::move(outputStream), std::move(schema)); } //////////////////////////////////////////////////////////////////////////////// diff --git a/yt/yt/client/api/rpc_proxy/table_writer.h b/yt/yt/client/api/rpc_proxy/table_writer.h index ce67e7ed0d0..21e20c4ba4b 100644 --- a/yt/yt/client/api/rpc_proxy/table_writer.h +++ b/yt/yt/client/api/rpc_proxy/table_writer.h @@ -1,13 +1,16 @@ #pragma once -#include "api_service_proxy.h" +#include <yt/yt/client/api/public.h> + +#include <yt/yt/core/concurrency/public.h> namespace NYT::NApi::NRpcProxy { //////////////////////////////////////////////////////////////////////////////// -TFuture<ITableWriterPtr> CreateTableWriter( - TApiServiceProxy::TReqWriteTablePtr request); +ITableWriterPtr CreateTableWriter( + NConcurrency::IAsyncZeroCopyOutputStreamPtr outputStream, + NTableClient::TTableSchemaPtr tableSchema); //////////////////////////////////////////////////////////////////////////////// |