aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorNikita Sokolov <faucct@tracto.ai>2025-02-27 17:58:17 +0300
committerrobot-piglet <robot-piglet@yandex-team.com>2025-02-27 18:20:00 +0300
commit4daa87047aa4f7ea66820e82d8b76b5350f8722a (patch)
tree6a4e17d8f861b93553479bed4a0984c56f95eb7e
parente09cc30c9f0165e294c13c19bcee8ab26767ec47 (diff)
downloadydb-4daa87047aa4f7ea66820e82d8b76b5350f8722a.tar.gz
Allow sending/reading values larger than 16MB to RPC proxy via wire protocol to/from methods dealing with static tables
Historically, it data written through RPC proxies had pass dynamic table row validation standards, like max 16MB value size. This is inconvenient for static tables. SPYT needs to write large values – ytsaurus/ytsaurus-spyt#43. Disable wire input Max*ValueLength validation on RPC proxies. The methods affected by this change are WriteTable and ReadShuffleData in rpc proxy server and TTableReader in rpc proxy client. * Changelog entry Type: feature Component: proxy Allow sending/reading values larger than 16MB to RPC proxy via wire protocol to/from methods dealing with static tables --- Pull Request resolved: https://github.com/ytsaurus/ytsaurus/pull/1019 commit_hash:264beb456994ba459ef06d8f5c25bf6d52abc08b
-rw-r--r--yt/yt/client/api/rpc_proxy/row_batch_reader.cpp2
-rw-r--r--yt/yt/client/api/rpc_proxy/wire_row_stream.cpp15
-rw-r--r--yt/yt/client/api/rpc_proxy/wire_row_stream.h5
-rw-r--r--yt/yt/client/table_client/wire_protocol.cpp11
-rw-r--r--yt/yt/client/table_client/wire_protocol.h2
5 files changed, 28 insertions, 7 deletions
diff --git a/yt/yt/client/api/rpc_proxy/row_batch_reader.cpp b/yt/yt/client/api/rpc_proxy/row_batch_reader.cpp
index b74ebb5ab2..a3aaf98bec 100644
--- a/yt/yt/client/api/rpc_proxy/row_batch_reader.cpp
+++ b/yt/yt/client/api/rpc_proxy/row_batch_reader.cpp
@@ -18,7 +18,7 @@ TRowBatchReader::TRowBatchReader(
IAsyncZeroCopyInputStreamPtr underlying,
bool isStreamWithStatistics)
: Underlying_(std::move(underlying))
- , Decoder_(CreateWireRowStreamDecoder(NameTable_))
+ , Decoder_(CreateWireRowStreamDecoder(NameTable_, CreateUnlimitedWireProtocolOptions()))
, IsStreamWithStatistics_(isStreamWithStatistics)
{
YT_VERIFY(Underlying_);
diff --git a/yt/yt/client/api/rpc_proxy/wire_row_stream.cpp b/yt/yt/client/api/rpc_proxy/wire_row_stream.cpp
index 12b1dcf2d9..d25eef2c88 100644
--- a/yt/yt/client/api/rpc_proxy/wire_row_stream.cpp
+++ b/yt/yt/client/api/rpc_proxy/wire_row_stream.cpp
@@ -74,8 +74,11 @@ class TWireRowStreamDecoder
: public IRowStreamDecoder
{
public:
- explicit TWireRowStreamDecoder(TNameTablePtr nameTable)
+ explicit TWireRowStreamDecoder(
+ TNameTablePtr nameTable,
+ TWireProtocolOptions wireProtocolOptions = {})
: NameTable_(std::move(nameTable))
+ , WireProtocolOptions_(std::move(wireProtocolOptions))
{
Descriptor_.set_wire_format_version(NApi::NRpcProxy::CurrentWireFormatVersion);
Descriptor_.set_rowset_kind(NApi::NRpcProxy::NProto::RK_UNVERSIONED);
@@ -86,7 +89,7 @@ public:
const NProto::TRowsetDescriptor& descriptorDelta) override
{
struct TWireRowStreamDecoderTag { };
- auto reader = CreateWireProtocolReader(payloadRef, New<TRowBuffer>(TWireRowStreamDecoderTag()));
+ auto reader = CreateWireProtocolReader(payloadRef, New<TRowBuffer>(TWireRowStreamDecoderTag()), WireProtocolOptions_);
auto rows = reader->ReadUnversionedRowset(true);
auto oldNameTableSize = Descriptor_.name_table_entries_size();
@@ -125,18 +128,20 @@ public:
private:
const TNameTablePtr NameTable_;
+ const TWireProtocolOptions WireProtocolOptions_;
NApi::NRpcProxy::NProto::TRowsetDescriptor Descriptor_;
TNameTableToSchemaIdMapping IdMapping_;
bool HasNontrivialIdMapping_ = false;
};
-IRowStreamDecoderPtr CreateWireRowStreamDecoder(TNameTablePtr nameTable)
+IRowStreamDecoderPtr CreateWireRowStreamDecoder(
+ TNameTablePtr nameTable,
+ TWireProtocolOptions wireProtocolOptions)
{
- return New<TWireRowStreamDecoder>(std::move(nameTable));
+ return New<TWireRowStreamDecoder>(std::move(nameTable), std::move(wireProtocolOptions));
}
////////////////////////////////////////////////////////////////////////////////
} // namespace NYT::NApi::NRpcProxy
-
diff --git a/yt/yt/client/api/rpc_proxy/wire_row_stream.h b/yt/yt/client/api/rpc_proxy/wire_row_stream.h
index 528a15fdeb..aff0ff3097 100644
--- a/yt/yt/client/api/rpc_proxy/wire_row_stream.h
+++ b/yt/yt/client/api/rpc_proxy/wire_row_stream.h
@@ -3,13 +3,16 @@
#include "public.h"
#include <yt/yt/client/table_client/public.h>
+#include <yt/yt/client/table_client/wire_protocol.h>
namespace NYT::NApi::NRpcProxy {
////////////////////////////////////////////////////////////////////////////////
IRowStreamEncoderPtr CreateWireRowStreamEncoder(NTableClient::TNameTablePtr nameTable);
-IRowStreamDecoderPtr CreateWireRowStreamDecoder(NTableClient::TNameTablePtr nameTable);
+IRowStreamDecoderPtr CreateWireRowStreamDecoder(
+ NTableClient::TNameTablePtr nameTable,
+ NTableClient::TWireProtocolOptions wireProtocolOptions = {});
////////////////////////////////////////////////////////////////////////////////
diff --git a/yt/yt/client/table_client/wire_protocol.cpp b/yt/yt/client/table_client/wire_protocol.cpp
index b80de65602..e7b744c91e 100644
--- a/yt/yt/client/table_client/wire_protocol.cpp
+++ b/yt/yt/client/table_client/wire_protocol.cpp
@@ -1055,6 +1055,17 @@ auto IWireProtocolReader::GetSchemaData(const TTableSchema& schema) -> TSchemaDa
////////////////////////////////////////////////////////////////////////////////
+TWireProtocolOptions CreateUnlimitedWireProtocolOptions()
+{
+ return {
+ .MaxStringValueLength = std::numeric_limits<i64>::max(),
+ .MaxAnyValueLength = std::numeric_limits<i64>::max(),
+ .MaxCompositeValueLength = std::numeric_limits<i64>::max(),
+ };
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
std::unique_ptr<IWireProtocolReader> CreateWireProtocolReader(TSharedRef data, TRowBufferPtr rowBuffer, TWireProtocolOptions options)
{
return std::make_unique<TWireProtocolReader>(std::move(data), std::move(rowBuffer), std::move(options));
diff --git a/yt/yt/client/table_client/wire_protocol.h b/yt/yt/client/table_client/wire_protocol.h
index f701aaa49e..b4eb802f51 100644
--- a/yt/yt/client/table_client/wire_protocol.h
+++ b/yt/yt/client/table_client/wire_protocol.h
@@ -288,6 +288,8 @@ struct TWireProtocolOptions
i64 MaxVersionedRowDataWeight = NTableClient::MaxServerVersionedRowDataWeight;
};
+TWireProtocolOptions CreateUnlimitedWireProtocolOptions();
+
////////////////////////////////////////////////////////////////////////////////
//! Creates wire protocol reader.