diff options
author | tesseract <tesseract@yandex-team.com> | 2023-08-28 09:14:05 +0300 |
---|---|---|
committer | tesseract <tesseract@yandex-team.com> | 2023-08-28 09:33:12 +0300 |
commit | 0216115f0371d3fe312ee23618317b75f391fb4c (patch) | |
tree | e96000f669ffb23a2aa5430ac1ff5a2a1da1935d | |
parent | 57be712e6b3bf6c7a5005f003f69d9f527d5cfc5 (diff) | |
download | ydb-0216115f0371d3fe312ee23618317b75f391fb4c.tar.gz |
Limit SourceId length to 2048 chars and check KV key length
-rw-r--r-- | ydb/core/keyvalue/keyvalue_state.cpp | 125 | ||||
-rw-r--r-- | ydb/core/keyvalue/keyvalue_state.h | 18 | ||||
-rw-r--r-- | ydb/core/keyvalue/keyvalue_ut.cpp | 85 | ||||
-rw-r--r-- | ydb/core/persqueue/pq_impl.cpp | 5 | ||||
-rw-r--r-- | ydb/core/persqueue/ut/pq_ut.cpp | 8 | ||||
-rw-r--r-- | ydb/public/api/protos/ydb_persqueue_v1.proto | 6 | ||||
-rw-r--r-- | ydb/public/api/protos/ydb_topic.proto | 10 | ||||
-rw-r--r-- | ydb/services/persqueue_v1/persqueue_compat_ut.cpp | 59 |
8 files changed, 223 insertions, 93 deletions
diff --git a/ydb/core/keyvalue/keyvalue_state.cpp b/ydb/core/keyvalue/keyvalue_state.cpp index f05fc122e1..2b014a12e6 100644 --- a/ydb/core/keyvalue/keyvalue_state.cpp +++ b/ydb/core/keyvalue/keyvalue_state.cpp @@ -61,6 +61,12 @@ constexpr ui64 ReadResultSizeEstimationNewApi = 1 + 5 // Key id, length constexpr ui64 ErrorMessageSizeEstimation = 128; +constexpr size_t MaxKeySize = 4000; + +bool IsKeyLengthValid(const TString& key) { + return key.length() <= MaxKeySize; +} + // Guideline: // Check SetError calls: there must be no changes made to the DB before SetError call (!) @@ -1416,13 +1422,21 @@ void TKeyValueState::CmdCmds(THolder<TIntermediate> &intermediate, ISimpleDb &db } TKeyValueState::TCheckResult TKeyValueState::CheckCmd(const TIntermediate::TCopyRange &cmd, TKeySet& keys, - ui32 /*index*/) const + ui32 index) const { TVector<TString> nkeys; auto range = GetRange(cmd.Range, keys); for (auto it = range.first; it != range.second; ++it) { if (it->StartsWith(cmd.PrefixToRemove)) { - nkeys.push_back(cmd.PrefixToAdd + it->substr(cmd.PrefixToRemove.size())); + auto newKey = cmd.PrefixToAdd + it->substr(cmd.PrefixToRemove.size()); + if (!IsKeyLengthValid(newKey)) { + TStringStream str; + str << "KeyValue# " << TabletId + << " NewKey# " << EscapeC(newKey) << " in CmdCopyRange(" << index << ") has length " << newKey.length() << " but max is " << MaxKeySize + << " Marker# KV24"; + return {NKikimrKeyValue::Statuses::RSTATUS_INTERNAL_ERROR, str.Str()}; + } + nkeys.push_back(newKey); } } keys.insert(nkeys.begin(), nkeys.end()); @@ -1438,8 +1452,16 @@ TKeyValueState::TCheckResult TKeyValueState::CheckCmd(const TIntermediate::TRena str << "KeyValue# " << TabletId << " OldKey# " << EscapeC(cmd.OldKey) << " does not exist in CmdRename(" << index << ")" << " Marker# KV18"; - return {false, str.Str()}; + return {NKikimrKeyValue::Statuses::RSTATUS_NOT_FOUND, str.Str()}; + } + if (!IsKeyLengthValid(cmd.NewKey)) { + TStringStream str; + str << "KeyValue# " << TabletId + << " NewKey# " << EscapeC(cmd.NewKey) << " in CmdRename(" << index << ") has length " << cmd.NewKey.length() << " but max is " << MaxKeySize + << " Marker# KV23"; + return {NKikimrKeyValue::Statuses::RSTATUS_INTERNAL_ERROR, str.Str()}; } + keys.erase(it); keys.insert(cmd.NewKey); return {}; @@ -1455,13 +1477,21 @@ TKeyValueState::TCheckResult TKeyValueState::CheckCmd(const TIntermediate::TConc str << "KeyValue# " << TabletId << " InputKey# " << EscapeC(key) << " does not exist in CmdConcat(" << index << ")" << " Marker# KV19"; - return {false, str.Str()}; + return {NKikimrKeyValue::Statuses::RSTATUS_NOT_FOUND, str.Str()}; } if (!cmd.KeepInputs) { keys.erase(it); } } + if (!IsKeyLengthValid(cmd.OutputKey)) { + TStringStream str; + str << "KeyValue# " << TabletId + << " OutputKey length in CmdConcat(" << index << ") is " << cmd.OutputKey.length() << " but max is " << MaxKeySize + << " Marker# KV25"; + return {NKikimrKeyValue::Statuses::RSTATUS_INTERNAL_ERROR, str.Str()}; + } + keys.insert(cmd.OutputKey); return {}; } @@ -1475,43 +1505,32 @@ TKeyValueState::TCheckResult TKeyValueState::CheckCmd(const TIntermediate::TDele } TKeyValueState::TCheckResult TKeyValueState::CheckCmd(const TIntermediate::TWrite &cmd, TKeySet& keys, - ui32 /*index*/) const + ui32 index) const { + if (!IsKeyLengthValid(cmd.Key)) { + TStringStream str; + str << "KeyValue# " << TabletId + << " Key length in CmdWrite(" << index << ") is " << cmd.Key.length() << " but max is " << MaxKeySize + << " Marker# KV26"; + return {NKikimrKeyValue::Statuses::RSTATUS_INTERNAL_ERROR, str.Str()}; + } keys.insert(cmd.Key); return {}; } -bool TKeyValueState::CheckCmdCopyRanges(THolder<TIntermediate>& intermediate, const TActorContext& /*ctx*/, - TKeySet& keys, const TTabletStorageInfo* /*info*/) -{ - for (const auto& cmd : intermediate->CopyRanges) { - CheckCmd(cmd, keys, 0); - } - return true; +TKeyValueState::TCheckResult TKeyValueState::CheckCmd(const TIntermediate::TGetStatus &/*cmd*/, TKeySet& /*keys*/, ui32 /*index*/) const { + return {}; } -bool TKeyValueState::CheckCmdRenames(THolder<TIntermediate>& intermediate, const TActorContext& ctx, TKeySet& keys, - const TTabletStorageInfo *info) -{ - ui32 index = 0; - for (const auto& cmd : intermediate->Renames) { - const auto &[ok, msg] = CheckCmd(cmd, keys, index++); - if (!ok) { - ReplyError(ctx, msg, NMsgBusProxy::MSTATUS_ERROR, NKikimrKeyValue::Statuses::RSTATUS_NOT_FOUND, intermediate, info); - return false; - } - } - return true; -} +template<class Cmd> +bool TKeyValueState::CheckCmds(THolder<TIntermediate>& intermediate, const TDeque<Cmd>& cmds, const TActorContext& ctx, + TKeySet& keys, const TTabletStorageInfo* info) { -bool TKeyValueState::CheckCmdConcats(THolder<TIntermediate>& intermediate, const TActorContext& ctx, TKeySet& keys, - const TTabletStorageInfo *info) -{ ui32 index = 0; - for (const auto& cmd : intermediate->Concats) { - const auto &[ok, msg] = CheckCmd(cmd, keys, index++); - if (!ok) { - ReplyError(ctx, msg, NMsgBusProxy::MSTATUS_ERROR, NKikimrKeyValue::Statuses::RSTATUS_NOT_FOUND, intermediate, info); + for (const auto& cmd : cmds) { + const auto &[status, msg] = CheckCmd(cmd, keys, index++); + if (NKikimrKeyValue::Statuses::RSTATUS_OK != status) { + ReplyError(ctx, msg, NMsgBusProxy::MSTATUS_ERROR, status, intermediate, info); return false; } } @@ -1519,30 +1538,6 @@ bool TKeyValueState::CheckCmdConcats(THolder<TIntermediate>& intermediate, const return true; } -bool TKeyValueState::CheckCmdDeletes(THolder<TIntermediate>& intermediate, const TActorContext& /*ctx*/, TKeySet& keys, - const TTabletStorageInfo* /*info*/) -{ - for (const auto& cmd : intermediate->Deletes) { - CheckCmd(cmd, keys, 0); - } - return true; -} - -bool TKeyValueState::CheckCmdWrites(THolder<TIntermediate>& intermediate, const TActorContext& /*ctx*/, TKeySet& keys, - const TTabletStorageInfo* /*info*/) -{ - for (const auto& cmd : intermediate->Writes) { - CheckCmd(cmd, keys, 0); - } - return true; -} - -bool TKeyValueState::CheckCmdGetStatus(THolder<TIntermediate>& /*intermediate*/, const TActorContext& /*ctx*/, - TKeySet& /*keys*/, const TTabletStorageInfo* /*info*/) -{ - return true; -} - bool TKeyValueState::CheckCmds(THolder<TIntermediate>& intermediate, const TActorContext& ctx, TKeySet& keys, const TTabletStorageInfo* info) { @@ -1564,9 +1559,9 @@ bool TKeyValueState::CheckCmds(THolder<TIntermediate>& intermediate, const TActo }; for (const auto& cmd : intermediate->Commands) { - const auto &[ok, msg] = std::visit(visitor, cmd); - if (!ok) { - ReplyError(ctx, msg, NMsgBusProxy::MSTATUS_ERROR, NKikimrKeyValue::Statuses::RSTATUS_NOT_FOUND, intermediate, info); + const auto &[status, msg] = std::visit(visitor, cmd); + if (NKikimrKeyValue::Statuses::RSTATUS_OK != status) { + ReplyError(ctx, msg, NMsgBusProxy::MSTATUS_ERROR, status, intermediate, info); return false; } } @@ -1590,13 +1585,13 @@ void TKeyValueState::ProcessCmds(THolder<TIntermediate> &intermediate, ISimpleDb success = false; } - success = success && CheckCmdCopyRanges(intermediate, ctx, keys, info); - success = success && CheckCmdRenames(intermediate, ctx, keys, info); - success = success && CheckCmdConcats(intermediate, ctx, keys, info); - success = success && CheckCmdDeletes(intermediate, ctx, keys, info); - success = success && CheckCmdWrites(intermediate, ctx, keys, info); + success = success && CheckCmds(intermediate, intermediate->CopyRanges, ctx, keys, info); + success = success && CheckCmds(intermediate, intermediate->Renames, ctx, keys, info); + success = success && CheckCmds(intermediate, intermediate->Concats, ctx, keys, info); + success = success && CheckCmds(intermediate, intermediate->Deletes, ctx, keys, info); + success = success && CheckCmds(intermediate, intermediate->Writes, ctx, keys, info); success = success && CheckCmds(intermediate, ctx, keys, info); - success = success && CheckCmdGetStatus(intermediate, ctx, keys, info); + success = success && CheckCmds(intermediate, intermediate->GetStatuses, ctx, keys, info); if (!success) { DropRefCountsOnErrorInTx(std::exchange(intermediate->RefCountsIncr, {}), db, ctx, intermediate->RequestUid); } else { diff --git a/ydb/core/keyvalue/keyvalue_state.h b/ydb/core/keyvalue/keyvalue_state.h index 1e52702512..0e57426ec8 100644 --- a/ydb/core/keyvalue/keyvalue_state.h +++ b/ydb/core/keyvalue/keyvalue_state.h @@ -402,7 +402,7 @@ public: bool IncrementGeneration(THolder<TIntermediate> &intermediate, ISimpleDb &db, const TActorContext &ctx); struct TCheckResult { - bool Result = true; + NKikimrKeyValue::Statuses::ReplyStatus Status = NKikimrKeyValue::Statuses::RSTATUS_OK; TString ErrorMsg; }; @@ -411,19 +411,11 @@ public: TCheckResult CheckCmd(const TIntermediate::TWrite &cmd, TKeySet& keys, ui32 index) const; TCheckResult CheckCmd(const TIntermediate::TCopyRange &cmd, TKeySet& keys, ui32 index) const; TCheckResult CheckCmd(const TIntermediate::TConcat &cmd, TKeySet& keys, ui32 index) const; + TCheckResult CheckCmd(const TIntermediate::TGetStatus &cmd, TKeySet& keys, ui32 index) const; - bool CheckCmdRenames(THolder<TIntermediate>& intermediate, const TActorContext& ctx, TKeySet& keys, - const TTabletStorageInfo *info); - bool CheckCmdDeletes(THolder<TIntermediate>& intermediate, const TActorContext& ctx, TKeySet& keys, - const TTabletStorageInfo *info); - bool CheckCmdWrites(THolder<TIntermediate>& intermediate, const TActorContext& ctx, TKeySet& keys, - const TTabletStorageInfo *info); - bool CheckCmdCopyRanges(THolder<TIntermediate>& intermediate, const TActorContext& ctx, TKeySet& keys, - const TTabletStorageInfo *info); - bool CheckCmdConcats(THolder<TIntermediate>& intermediate, const TActorContext& ctx, TKeySet& keys, - const TTabletStorageInfo *info); - bool CheckCmdGetStatus(THolder<TIntermediate>& /*intermediate*/, const TActorContext& /*ctx*/, - TKeySet& /*keys*/, const TTabletStorageInfo* /*info*/); + template<class Cmd> + bool CheckCmds(THolder<TIntermediate>& intermediate, const TDeque<Cmd>& cmds, const TActorContext& ctx, + TKeySet& keys, const TTabletStorageInfo* info); bool CheckCmds(THolder<TIntermediate>& intermediate, const TActorContext& /*ctx*/, TKeySet& keys, const TTabletStorageInfo* /*info*/); diff --git a/ydb/core/keyvalue/keyvalue_ut.cpp b/ydb/core/keyvalue/keyvalue_ut.cpp index 2f2d8118f0..860eb40908 100644 --- a/ydb/core/keyvalue/keyvalue_ut.cpp +++ b/ydb/core/keyvalue/keyvalue_ut.cpp @@ -738,7 +738,7 @@ struct TKeyRenamePair { TString NewKey; }; - +template <NKikimrKeyValue::Statuses::ReplyStatus ExpectedStatus = NKikimrKeyValue::Statuses::RSTATUS_OK> void ExecuteRename(TTestContext &tc, const TDeque<TKeyRenamePair> &pairs, ui64 lockedGeneration) { TDesiredPair<TEvKeyValue::TEvExecuteTransaction> dp; @@ -755,11 +755,12 @@ void ExecuteRename(TTestContext &tc, const TDeque<TKeyRenamePair> &pairs, ui64 l dp.Request.set_lock_generation(lockedGeneration); ExecuteEvent(dp, tc); - UNIT_ASSERT_C(dp.Response.status() == NKikimrKeyValue::Statuses::RSTATUS_OK, + UNIT_ASSERT_C(dp.Response.status() == ExpectedStatus, "got# " << NKikimrKeyValue::Statuses_ReplyStatus_Name(dp.Response.status()) << " msg# " << dp.Response.msg()); } +template <bool IsSuccess = true> void ExecuteConcat(TTestContext &tc, const TString &newKey, const TDeque<TString> &inputKeys, ui64 lockedGeneration, bool keepKeys) { @@ -778,9 +779,15 @@ void ExecuteConcat(TTestContext &tc, const TString &newKey, const TDeque<TString dp.Request.set_lock_generation(lockedGeneration); ExecuteEvent(dp, tc); - UNIT_ASSERT_C(dp.Response.status() == NKikimrKeyValue::Statuses::RSTATUS_OK, - "got# " << NKikimrKeyValue::Statuses_ReplyStatus_Name(dp.Response.status()) - << " msg# " << dp.Response.msg()); + if constexpr (IsSuccess) { + UNIT_ASSERT_C(dp.Response.status() == NKikimrKeyValue::Statuses::RSTATUS_OK, + "got# " << NKikimrKeyValue::Statuses_ReplyStatus_Name(dp.Response.status()) + << " msg# " << dp.Response.msg()); + } else { + UNIT_ASSERT_C(dp.Response.status() == NKikimrKeyValue::Statuses::RSTATUS_INTERNAL_ERROR, + "got# " << NKikimrKeyValue::Statuses_ReplyStatus_Name(dp.Response.status()) + << " msg# " << dp.Response.msg()); + } } @@ -2438,5 +2445,73 @@ Y_UNIT_TEST(TestLargeWriteAndDelete) { }); } +Y_UNIT_TEST(TestWriteLongKey) { + TTestContext tc; + RunTestWithReboots(tc.TabletIds, [&]() { + return tc.InitialEventsFilter.Prepare(); + }, [&](const TString &dispatchName, std::function<void(TTestActorRuntime&)> setup, bool &activeZone) { + TFinalizer finalizer(tc); + tc.Prepare(dispatchName, setup, activeZone); + ExecuteObtainLock(tc, 1); + + TDeque<TKeyValuePair> keys; + keys.push_back({TString{10_KB, '_'}, ""}); + + ExecuteWrite<NKikimrKeyValue::Statuses::RSTATUS_INTERNAL_ERROR>(tc, keys, 1, 2, NKikimrKeyValue::Priorities::PRIORITY_REALTIME); + }); +} + +Y_UNIT_TEST(TestRenameToLongKey) { + TTestContext tc; + RunTestWithReboots(tc.TabletIds, [&]() { + return tc.InitialEventsFilter.Prepare(); + }, [&](const TString &dispatchName, std::function<void(TTestActorRuntime&)> setup, bool &activeZone) { + TFinalizer finalizer(tc); + tc.Prepare(dispatchName, setup, activeZone); + ExecuteObtainLock(tc, 1); + + TDeque<TKeyValuePair> keys; + keys.push_back({"oldKey", ""}); + + ExecuteWrite(tc, keys, 1, 2, NKikimrKeyValue::Priorities::PRIORITY_REALTIME); + ExecuteRename<NKikimrKeyValue::Statuses::RSTATUS_INTERNAL_ERROR>(tc, { {"oldKey", TString{10_KB, '_'}} }, NKikimrKeyValue::Priorities::PRIORITY_REALTIME); + }); +} + +Y_UNIT_TEST(TestCopyRangeToLongKey) { + TTestContext tc; + RunTestWithReboots(tc.TabletIds, [&]() { + return tc.InitialEventsFilter.Prepare(); + }, [&](const TString &dispatchName, std::function<void(TTestActorRuntime&)> setup, bool &activeZone) { + TFinalizer finalizer(tc); + tc.Prepare(dispatchName, setup, activeZone); + ExecuteObtainLock(tc, 1); + + TDeque<TKeyValuePair> keys; + keys.push_back({"oldKey", ""}); + + ExecuteWrite(tc, keys, 1, 2, NKikimrKeyValue::Priorities::PRIORITY_REALTIME); + ExecuteCopyRange<false>(tc, "", EBorderKind::Without, "", EBorderKind::Without, 1, TString{10_KB, '_'}, ""); + }); +} + +Y_UNIT_TEST(TestConcatToLongKey) { + TTestContext tc; + RunTestWithReboots(tc.TabletIds, [&]() { + return tc.InitialEventsFilter.Prepare(); + }, [&](const TString &dispatchName, std::function<void(TTestActorRuntime&)> setup, bool &activeZone) { + TFinalizer finalizer(tc); + tc.Prepare(dispatchName, setup, activeZone); + ExecuteObtainLock(tc, 1); + + TDeque<TKeyValuePair> keys; + keys.push_back({"oldKey1", "1"}); + keys.push_back({"oldKey2", "2"}); + + ExecuteWrite(tc, keys, 1, 2, NKikimrKeyValue::Priorities::PRIORITY_REALTIME); + ExecuteConcat<false>(tc, TString{10_KB, '_'}, {"oldKey1", "oldKey2"}, 1, 1); + }); +} + } // TKeyValueTest } // NKikimr diff --git a/ydb/core/persqueue/pq_impl.cpp b/ydb/core/persqueue/pq_impl.cpp index 881d9d7c33..880a278046 100644 --- a/ydb/core/persqueue/pq_impl.cpp +++ b/ydb/core/persqueue/pq_impl.cpp @@ -6,6 +6,7 @@ #include <ydb/core/base/tx_processing.h> #include <ydb/core/persqueue/config/config.h> #include <ydb/core/persqueue/partition_key_range/partition_key_range.h> +#include <ydb/core/persqueue/writer/source_id_encoding.h> #include <ydb/core/protos/pqconfig.pb.h> #include <ydb/core/protos/counters_keyvalue.pb.h> #include <ydb/core/metering/metering.h> @@ -28,7 +29,7 @@ static constexpr TDuration TOTAL_TIMEOUT = TDuration::Seconds(120); static constexpr char TMP_REQUEST_MARKER[] = "__TMP__REQUEST__MARKER__"; static constexpr ui32 CACHE_SIZE = 100_MB; static constexpr ui32 MAX_BYTES = 25_MB; -static constexpr ui32 MAX_SOURCE_ID_LENGTH = 10_KB; +static constexpr ui32 MAX_SOURCE_ID_LENGTH = 2048; struct TPartitionInfo { TPartitionInfo(const TActorId& actor, TMaybe<TPartitionKeyRange>&& keyRange, @@ -1766,7 +1767,7 @@ void TPersQueue::HandleWriteRequest(const ui64 responseCookie, const TActorId& p errorStr = "TotalSize must be filled for first part"; } else if (cmd.HasTotalSize() && static_cast<size_t>(cmd.GetTotalSize()) <= cmd.GetData().size()) { // TotalSize must be > size of each part errorStr = "TotalSize is incorrect"; - } else if (cmd.GetSourceId().size() > MAX_SOURCE_ID_LENGTH) { + } else if (cmd.HasSourceId() && !cmd.GetSourceId().empty() && NPQ::NSourceIdEncoding::Decode(cmd.GetSourceId()).length() > MAX_SOURCE_ID_LENGTH) { errorStr = "Too big SourceId"; } else if (mirroredPartition && !cmd.GetDisableDeduplication()) { errorStr = "Write to mirrored topic is forbiden"; diff --git a/ydb/core/persqueue/ut/pq_ut.cpp b/ydb/core/persqueue/ut/pq_ut.cpp index a47ceaa1b4..c34e8d3ecc 100644 --- a/ydb/core/persqueue/ut/pq_ut.cpp +++ b/ydb/core/persqueue/ut/pq_ut.cpp @@ -1200,6 +1200,14 @@ void TestWritePQImpl(bool fast) { //read from gap CmdRead(0, 500, Max<i32>(), Max<i32>(), 6, false, tc, {1000,1001,2000,2001,3000,3002}); + + // Write long sourceId + // The write should not be executed because the SourceId exceeds the maximum allowed size + CmdWrite(0, TString(10_KB, '_'), data1, tc, true, {}, false, "", -1, 10000); + + // Write long sourceId + // The write must be completed successfully because the SourceId has the maximum allowed size + CmdWrite(0, TString(2_KB, '_'), data1, tc, false, {}, false, "", -1, 10000); }); } diff --git a/ydb/public/api/protos/ydb_persqueue_v1.proto b/ydb/public/api/protos/ydb_persqueue_v1.proto index af0f7c1ed0..5262d50f30 100644 --- a/ydb/public/api/protos/ydb_persqueue_v1.proto +++ b/ydb/public/api/protos/ydb_persqueue_v1.proto @@ -63,7 +63,7 @@ message StreamingWriteClientMessage { // Path of topic to write to. string topic = 1; // message group identifier of client data stream a.k.a. sourceId. - string message_group_id = 2; + string message_group_id = 2 [(Ydb.length).le = 2048]; // Some user metadata attached to this write session. map<string, string> session_meta = 3; // Partition group to write to. @@ -553,7 +553,7 @@ message StreamingReadServerMessage { // Representation of sequence of client messages from one write session. message Batch { // Source identifier provided by client for this batch of client messages. - bytes message_group_id = 2; + bytes message_group_id = 2 [(Ydb.length).le = 2048]; // Client metadata attached to write session, the same for all messages in batch. MetaValue session_meta = 3; // Persist timestamp on server for batch. @@ -882,7 +882,7 @@ message MigrationStreamingReadServerMessage { // Representation of sequence of client messages from one write session. message Batch { // Source identifier provided by client for this batch of client messages. - bytes source_id = 2; + bytes source_id = 2 [(Ydb.length).le = 2048]; // Client metadata attached to write session, the same for all messages in batch. repeated KeyValue extra_fields = 3; // Persist timestamp on server for batch. diff --git a/ydb/public/api/protos/ydb_topic.proto b/ydb/public/api/protos/ydb_topic.proto index b5a95b5f1d..c4c0606055 100644 --- a/ydb/public/api/protos/ydb_topic.proto +++ b/ydb/public/api/protos/ydb_topic.proto @@ -104,7 +104,7 @@ message StreamWriteMessage { string path = 1; // Producer identifier of client data stream. // Used for message deduplication by sequence numbers. - string producer_id = 2; + string producer_id = 2 [(Ydb.length).le = 2048]; // User metadata attached to this write session. // Reader will get this session meta data with each message read. map<string, string> write_session_meta = 3; @@ -112,7 +112,7 @@ message StreamWriteMessage { // If neither is set, no guarantees on ordering or partitions to write to. oneof partitioning { // All messages with given pair (producer_id, message_group_id) go to single partition in order of writes. - string message_group_id = 4; + string message_group_id = 4 [(Ydb.length).le = 2048]; // Explicit partition id to write to. int64 partition_id = 5; // Explicit partition location to write to. @@ -163,7 +163,7 @@ message StreamWriteMessage { // Per-message override for respective write session settings. oneof partitioning { // All messages with given pair (producer_id, message_group_id) go to single partition in order of writes. - string message_group_id = 5; + string message_group_id = 5 [(Ydb.length).le = 2048]; // Explicit partition id to write to. int64 partition_id = 6; // Explicit partition location to write to. @@ -377,7 +377,7 @@ message StreamReadMessage { int64 uncompressed_size = 6; // Filled if message_group_id was set on message write. - string message_group_id = 7; + string message_group_id = 7 [(Ydb.length).le = 2048]; repeated MetadataItem metadata_items = 8; } @@ -388,7 +388,7 @@ message StreamReadMessage { repeated MessageData message_data = 1; // Producer identifier provided by client for this batch of client messages. - string producer_id = 2; + string producer_id = 2 [(Ydb.length).le = 2048]; // Client metadata attached to write session, the same for all messages in batch. map<string, string> write_session_meta = 3; diff --git a/ydb/services/persqueue_v1/persqueue_compat_ut.cpp b/ydb/services/persqueue_v1/persqueue_compat_ut.cpp index e42e787a20..2238ff89e6 100644 --- a/ydb/services/persqueue_v1/persqueue_compat_ut.cpp +++ b/ydb/services/persqueue_v1/persqueue_compat_ut.cpp @@ -240,6 +240,65 @@ Y_UNIT_TEST_SUITE(TPQCompatTest) { } + Y_UNIT_TEST(LongProducerAndLongMessageGroupId) { + TPQv1CompatTestBase testServer; + std::shared_ptr<grpc::Channel> Channel_; + Channel_ = grpc::CreateChannel( + "localhost:" + ToString(testServer.Server->GrpcPort), + grpc::InsecureChannelCredentials() + ); + auto TopicStubP_ = Ydb::Topic::V1::TopicService::NewStub(Channel_); + + struct Data { + TString ProducerId; + bool error; + }; + + std::vector<Data> data = { + {"producer-1", false}, + { TString(2_KB, '-'), false}, + { TString(2_KB + 1, '-'), true}, + }; + + for(const auto& [producerId, error] : data) { + Cerr << ">>>>> Case producerId.length()=" << producerId.length() << Endl; + + grpc::ClientContext wcontext; + wcontext.AddMetadata("x-ydb-database", "/Root/LbCommunal/account"); + + auto writeStream = TopicStubP_->StreamWrite(&wcontext); + UNIT_ASSERT(writeStream); + + Ydb::Topic::StreamWriteMessage::FromClient req; + Ydb::Topic::StreamWriteMessage::FromServer resp; + + req.mutable_init_request()->set_path("topic2"); + if (!producerId.Empty()) { + req.mutable_init_request()->set_producer_id(producerId); + req.mutable_init_request()->set_message_group_id(producerId); + } + + UNIT_ASSERT(writeStream->Write(req)); + UNIT_ASSERT(writeStream->Read(&resp)); + Cerr << ">>>>> Response = " << resp.server_message_case() << Endl; + UNIT_ASSERT(resp.server_message_case() == Ydb::Topic::StreamWriteMessage::FromServer::kInitResponse); + req.Clear(); + + Cerr << ">>>>> Write message" << Endl; + auto* write = req.mutable_write_request(); + write->set_codec(Ydb::Topic::CODEC_RAW); + + auto* msg = write->add_messages(); + msg->set_seq_no(1); + msg->set_data("x"); + UNIT_ASSERT(writeStream->Write(req)); + UNIT_ASSERT(writeStream->Read(&resp)); + Cerr << "===Got response: " << resp.ShortDebugString() << Endl; + UNIT_ASSERT(resp.server_message_case() == (error ? Ydb::Topic::StreamWriteMessage::FromServer::SERVER_MESSAGE_NOT_SET : Ydb::Topic::StreamWriteMessage::FromServer::kWriteResponse)); + resp.Clear(); + } + } + Y_UNIT_TEST(ReadWriteSessions) { TPQv1CompatTestBase testServer; std::shared_ptr<grpc::Channel> Channel_; |