aboutsummaryrefslogtreecommitdiffstats
path: root/yt
diff options
context:
space:
mode:
authorNikita Sokolov <faucct@tracto.ai>2025-02-27 17:58:17 +0300
committerrobot-piglet <robot-piglet@yandex-team.com>2025-02-27 18:20:00 +0300
commit4daa87047aa4f7ea66820e82d8b76b5350f8722a (patch)
tree6a4e17d8f861b93553479bed4a0984c56f95eb7e /yt
parente09cc30c9f0165e294c13c19bcee8ab26767ec47 (diff)
downloadydb-4daa87047aa4f7ea66820e82d8b76b5350f8722a.tar.gz
Allow sending/reading values larger than 16MB to RPC proxy via wire protocol to/from methods dealing with static tables
Historically, it data written through RPC proxies had pass dynamic table row validation standards, like max 16MB value size. This is inconvenient for static tables. SPYT needs to write large values – ytsaurus/ytsaurus-spyt#43. Disable wire input Max*ValueLength validation on RPC proxies. The methods affected by this change are WriteTable and ReadShuffleData in rpc proxy server and TTableReader in rpc proxy client. * Changelog entry Type: feature Component: proxy Allow sending/reading values larger than 16MB to RPC proxy via wire protocol to/from methods dealing with static tables --- Pull Request resolved: https://github.com/ytsaurus/ytsaurus/pull/1019 commit_hash:264beb456994ba459ef06d8f5c25bf6d52abc08b
Diffstat (limited to 'yt')
-rw-r--r--yt/yt/client/api/rpc_proxy/row_batch_reader.cpp2
-rw-r--r--yt/yt/client/api/rpc_proxy/wire_row_stream.cpp15
-rw-r--r--yt/yt/client/api/rpc_proxy/wire_row_stream.h5
-rw-r--r--yt/yt/client/table_client/wire_protocol.cpp11
-rw-r--r--yt/yt/client/table_client/wire_protocol.h2
5 files changed, 28 insertions, 7 deletions
diff --git a/yt/yt/client/api/rpc_proxy/row_batch_reader.cpp b/yt/yt/client/api/rpc_proxy/row_batch_reader.cpp
index b74ebb5ab2..a3aaf98bec 100644
--- a/yt/yt/client/api/rpc_proxy/row_batch_reader.cpp
+++ b/yt/yt/client/api/rpc_proxy/row_batch_reader.cpp
@@ -18,7 +18,7 @@ TRowBatchReader::TRowBatchReader(
IAsyncZeroCopyInputStreamPtr underlying,
bool isStreamWithStatistics)
: Underlying_(std::move(underlying))
- , Decoder_(CreateWireRowStreamDecoder(NameTable_))
+ , Decoder_(CreateWireRowStreamDecoder(NameTable_, CreateUnlimitedWireProtocolOptions()))
, IsStreamWithStatistics_(isStreamWithStatistics)
{
YT_VERIFY(Underlying_);
diff --git a/yt/yt/client/api/rpc_proxy/wire_row_stream.cpp b/yt/yt/client/api/rpc_proxy/wire_row_stream.cpp
index 12b1dcf2d9..d25eef2c88 100644
--- a/yt/yt/client/api/rpc_proxy/wire_row_stream.cpp
+++ b/yt/yt/client/api/rpc_proxy/wire_row_stream.cpp
@@ -74,8 +74,11 @@ class TWireRowStreamDecoder
: public IRowStreamDecoder
{
public:
- explicit TWireRowStreamDecoder(TNameTablePtr nameTable)
+ explicit TWireRowStreamDecoder(
+ TNameTablePtr nameTable,
+ TWireProtocolOptions wireProtocolOptions = {})
: NameTable_(std::move(nameTable))
+ , WireProtocolOptions_(std::move(wireProtocolOptions))
{
Descriptor_.set_wire_format_version(NApi::NRpcProxy::CurrentWireFormatVersion);
Descriptor_.set_rowset_kind(NApi::NRpcProxy::NProto::RK_UNVERSIONED);
@@ -86,7 +89,7 @@ public:
const NProto::TRowsetDescriptor& descriptorDelta) override
{
struct TWireRowStreamDecoderTag { };
- auto reader = CreateWireProtocolReader(payloadRef, New<TRowBuffer>(TWireRowStreamDecoderTag()));
+ auto reader = CreateWireProtocolReader(payloadRef, New<TRowBuffer>(TWireRowStreamDecoderTag()), WireProtocolOptions_);
auto rows = reader->ReadUnversionedRowset(true);
auto oldNameTableSize = Descriptor_.name_table_entries_size();
@@ -125,18 +128,20 @@ public:
private:
const TNameTablePtr NameTable_;
+ const TWireProtocolOptions WireProtocolOptions_;
NApi::NRpcProxy::NProto::TRowsetDescriptor Descriptor_;
TNameTableToSchemaIdMapping IdMapping_;
bool HasNontrivialIdMapping_ = false;
};
-IRowStreamDecoderPtr CreateWireRowStreamDecoder(TNameTablePtr nameTable)
+IRowStreamDecoderPtr CreateWireRowStreamDecoder(
+ TNameTablePtr nameTable,
+ TWireProtocolOptions wireProtocolOptions)
{
- return New<TWireRowStreamDecoder>(std::move(nameTable));
+ return New<TWireRowStreamDecoder>(std::move(nameTable), std::move(wireProtocolOptions));
}
////////////////////////////////////////////////////////////////////////////////
} // namespace NYT::NApi::NRpcProxy
-
diff --git a/yt/yt/client/api/rpc_proxy/wire_row_stream.h b/yt/yt/client/api/rpc_proxy/wire_row_stream.h
index 528a15fdeb..aff0ff3097 100644
--- a/yt/yt/client/api/rpc_proxy/wire_row_stream.h
+++ b/yt/yt/client/api/rpc_proxy/wire_row_stream.h
@@ -3,13 +3,16 @@
#include "public.h"
#include <yt/yt/client/table_client/public.h>
+#include <yt/yt/client/table_client/wire_protocol.h>
namespace NYT::NApi::NRpcProxy {
////////////////////////////////////////////////////////////////////////////////
IRowStreamEncoderPtr CreateWireRowStreamEncoder(NTableClient::TNameTablePtr nameTable);
-IRowStreamDecoderPtr CreateWireRowStreamDecoder(NTableClient::TNameTablePtr nameTable);
+IRowStreamDecoderPtr CreateWireRowStreamDecoder(
+ NTableClient::TNameTablePtr nameTable,
+ NTableClient::TWireProtocolOptions wireProtocolOptions = {});
////////////////////////////////////////////////////////////////////////////////
diff --git a/yt/yt/client/table_client/wire_protocol.cpp b/yt/yt/client/table_client/wire_protocol.cpp
index b80de65602..e7b744c91e 100644
--- a/yt/yt/client/table_client/wire_protocol.cpp
+++ b/yt/yt/client/table_client/wire_protocol.cpp
@@ -1055,6 +1055,17 @@ auto IWireProtocolReader::GetSchemaData(const TTableSchema& schema) -> TSchemaDa
////////////////////////////////////////////////////////////////////////////////
+TWireProtocolOptions CreateUnlimitedWireProtocolOptions()
+{
+ return {
+ .MaxStringValueLength = std::numeric_limits<i64>::max(),
+ .MaxAnyValueLength = std::numeric_limits<i64>::max(),
+ .MaxCompositeValueLength = std::numeric_limits<i64>::max(),
+ };
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
std::unique_ptr<IWireProtocolReader> CreateWireProtocolReader(TSharedRef data, TRowBufferPtr rowBuffer, TWireProtocolOptions options)
{
return std::make_unique<TWireProtocolReader>(std::move(data), std::move(rowBuffer), std::move(options));
diff --git a/yt/yt/client/table_client/wire_protocol.h b/yt/yt/client/table_client/wire_protocol.h
index f701aaa49e..b4eb802f51 100644
--- a/yt/yt/client/table_client/wire_protocol.h
+++ b/yt/yt/client/table_client/wire_protocol.h
@@ -288,6 +288,8 @@ struct TWireProtocolOptions
i64 MaxVersionedRowDataWeight = NTableClient::MaxServerVersionedRowDataWeight;
};
+TWireProtocolOptions CreateUnlimitedWireProtocolOptions();
+
////////////////////////////////////////////////////////////////////////////////
//! Creates wire protocol reader.