diff options
author | Nikita Sokolov <faucct@tracto.ai> | 2025-02-27 17:58:17 +0300 |
---|---|---|
committer | robot-piglet <robot-piglet@yandex-team.com> | 2025-02-27 18:20:00 +0300 |
commit | 4daa87047aa4f7ea66820e82d8b76b5350f8722a (patch) | |
tree | 6a4e17d8f861b93553479bed4a0984c56f95eb7e | |
parent | e09cc30c9f0165e294c13c19bcee8ab26767ec47 (diff) | |
download | ydb-4daa87047aa4f7ea66820e82d8b76b5350f8722a.tar.gz |
Allow sending/reading values larger than 16MB to RPC proxy via wire protocol to/from methods dealing with static tables
Historically, it data written through RPC proxies had pass dynamic table row validation standards, like max 16MB value size. This is inconvenient for static tables.
SPYT needs to write large values – ytsaurus/ytsaurus-spyt#43.
Disable wire input Max*ValueLength validation on RPC proxies.
The methods affected by this change are WriteTable and ReadShuffleData in rpc proxy server and TTableReader in rpc proxy client.
* Changelog entry
Type: feature
Component: proxy
Allow sending/reading values larger than 16MB to RPC proxy via wire protocol to/from methods dealing with static tables
---
Pull Request resolved: https://github.com/ytsaurus/ytsaurus/pull/1019
commit_hash:264beb456994ba459ef06d8f5c25bf6d52abc08b
-rw-r--r-- | yt/yt/client/api/rpc_proxy/row_batch_reader.cpp | 2 | ||||
-rw-r--r-- | yt/yt/client/api/rpc_proxy/wire_row_stream.cpp | 15 | ||||
-rw-r--r-- | yt/yt/client/api/rpc_proxy/wire_row_stream.h | 5 | ||||
-rw-r--r-- | yt/yt/client/table_client/wire_protocol.cpp | 11 | ||||
-rw-r--r-- | yt/yt/client/table_client/wire_protocol.h | 2 |
5 files changed, 28 insertions, 7 deletions
diff --git a/yt/yt/client/api/rpc_proxy/row_batch_reader.cpp b/yt/yt/client/api/rpc_proxy/row_batch_reader.cpp index b74ebb5ab2..a3aaf98bec 100644 --- a/yt/yt/client/api/rpc_proxy/row_batch_reader.cpp +++ b/yt/yt/client/api/rpc_proxy/row_batch_reader.cpp @@ -18,7 +18,7 @@ TRowBatchReader::TRowBatchReader( IAsyncZeroCopyInputStreamPtr underlying, bool isStreamWithStatistics) : Underlying_(std::move(underlying)) - , Decoder_(CreateWireRowStreamDecoder(NameTable_)) + , Decoder_(CreateWireRowStreamDecoder(NameTable_, CreateUnlimitedWireProtocolOptions())) , IsStreamWithStatistics_(isStreamWithStatistics) { YT_VERIFY(Underlying_); diff --git a/yt/yt/client/api/rpc_proxy/wire_row_stream.cpp b/yt/yt/client/api/rpc_proxy/wire_row_stream.cpp index 12b1dcf2d9..d25eef2c88 100644 --- a/yt/yt/client/api/rpc_proxy/wire_row_stream.cpp +++ b/yt/yt/client/api/rpc_proxy/wire_row_stream.cpp @@ -74,8 +74,11 @@ class TWireRowStreamDecoder : public IRowStreamDecoder { public: - explicit TWireRowStreamDecoder(TNameTablePtr nameTable) + explicit TWireRowStreamDecoder( + TNameTablePtr nameTable, + TWireProtocolOptions wireProtocolOptions = {}) : NameTable_(std::move(nameTable)) + , WireProtocolOptions_(std::move(wireProtocolOptions)) { Descriptor_.set_wire_format_version(NApi::NRpcProxy::CurrentWireFormatVersion); Descriptor_.set_rowset_kind(NApi::NRpcProxy::NProto::RK_UNVERSIONED); @@ -86,7 +89,7 @@ public: const NProto::TRowsetDescriptor& descriptorDelta) override { struct TWireRowStreamDecoderTag { }; - auto reader = CreateWireProtocolReader(payloadRef, New<TRowBuffer>(TWireRowStreamDecoderTag())); + auto reader = CreateWireProtocolReader(payloadRef, New<TRowBuffer>(TWireRowStreamDecoderTag()), WireProtocolOptions_); auto rows = reader->ReadUnversionedRowset(true); auto oldNameTableSize = Descriptor_.name_table_entries_size(); @@ -125,18 +128,20 @@ public: private: const TNameTablePtr NameTable_; + const TWireProtocolOptions WireProtocolOptions_; NApi::NRpcProxy::NProto::TRowsetDescriptor Descriptor_; TNameTableToSchemaIdMapping IdMapping_; bool HasNontrivialIdMapping_ = false; }; -IRowStreamDecoderPtr CreateWireRowStreamDecoder(TNameTablePtr nameTable) +IRowStreamDecoderPtr CreateWireRowStreamDecoder( + TNameTablePtr nameTable, + TWireProtocolOptions wireProtocolOptions) { - return New<TWireRowStreamDecoder>(std::move(nameTable)); + return New<TWireRowStreamDecoder>(std::move(nameTable), std::move(wireProtocolOptions)); } //////////////////////////////////////////////////////////////////////////////// } // namespace NYT::NApi::NRpcProxy - diff --git a/yt/yt/client/api/rpc_proxy/wire_row_stream.h b/yt/yt/client/api/rpc_proxy/wire_row_stream.h index 528a15fdeb..aff0ff3097 100644 --- a/yt/yt/client/api/rpc_proxy/wire_row_stream.h +++ b/yt/yt/client/api/rpc_proxy/wire_row_stream.h @@ -3,13 +3,16 @@ #include "public.h" #include <yt/yt/client/table_client/public.h> +#include <yt/yt/client/table_client/wire_protocol.h> namespace NYT::NApi::NRpcProxy { //////////////////////////////////////////////////////////////////////////////// IRowStreamEncoderPtr CreateWireRowStreamEncoder(NTableClient::TNameTablePtr nameTable); -IRowStreamDecoderPtr CreateWireRowStreamDecoder(NTableClient::TNameTablePtr nameTable); +IRowStreamDecoderPtr CreateWireRowStreamDecoder( + NTableClient::TNameTablePtr nameTable, + NTableClient::TWireProtocolOptions wireProtocolOptions = {}); //////////////////////////////////////////////////////////////////////////////// diff --git a/yt/yt/client/table_client/wire_protocol.cpp b/yt/yt/client/table_client/wire_protocol.cpp index b80de65602..e7b744c91e 100644 --- a/yt/yt/client/table_client/wire_protocol.cpp +++ b/yt/yt/client/table_client/wire_protocol.cpp @@ -1055,6 +1055,17 @@ auto IWireProtocolReader::GetSchemaData(const TTableSchema& schema) -> TSchemaDa //////////////////////////////////////////////////////////////////////////////// +TWireProtocolOptions CreateUnlimitedWireProtocolOptions() +{ + return { + .MaxStringValueLength = std::numeric_limits<i64>::max(), + .MaxAnyValueLength = std::numeric_limits<i64>::max(), + .MaxCompositeValueLength = std::numeric_limits<i64>::max(), + }; +} + +//////////////////////////////////////////////////////////////////////////////// + std::unique_ptr<IWireProtocolReader> CreateWireProtocolReader(TSharedRef data, TRowBufferPtr rowBuffer, TWireProtocolOptions options) { return std::make_unique<TWireProtocolReader>(std::move(data), std::move(rowBuffer), std::move(options)); diff --git a/yt/yt/client/table_client/wire_protocol.h b/yt/yt/client/table_client/wire_protocol.h index f701aaa49e..b4eb802f51 100644 --- a/yt/yt/client/table_client/wire_protocol.h +++ b/yt/yt/client/table_client/wire_protocol.h @@ -288,6 +288,8 @@ struct TWireProtocolOptions i64 MaxVersionedRowDataWeight = NTableClient::MaxServerVersionedRowDataWeight; }; +TWireProtocolOptions CreateUnlimitedWireProtocolOptions(); + //////////////////////////////////////////////////////////////////////////////// //! Creates wire protocol reader. |