aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authortesseract <tesseract@yandex-team.com>2023-08-28 09:14:05 +0300
committertesseract <tesseract@yandex-team.com>2023-08-28 09:33:12 +0300
commit0216115f0371d3fe312ee23618317b75f391fb4c (patch)
treee96000f669ffb23a2aa5430ac1ff5a2a1da1935d
parent57be712e6b3bf6c7a5005f003f69d9f527d5cfc5 (diff)
downloadydb-0216115f0371d3fe312ee23618317b75f391fb4c.tar.gz
Limit SourceId length to 2048 chars and check KV key length
-rw-r--r--ydb/core/keyvalue/keyvalue_state.cpp125
-rw-r--r--ydb/core/keyvalue/keyvalue_state.h18
-rw-r--r--ydb/core/keyvalue/keyvalue_ut.cpp85
-rw-r--r--ydb/core/persqueue/pq_impl.cpp5
-rw-r--r--ydb/core/persqueue/ut/pq_ut.cpp8
-rw-r--r--ydb/public/api/protos/ydb_persqueue_v1.proto6
-rw-r--r--ydb/public/api/protos/ydb_topic.proto10
-rw-r--r--ydb/services/persqueue_v1/persqueue_compat_ut.cpp59
8 files changed, 223 insertions, 93 deletions
diff --git a/ydb/core/keyvalue/keyvalue_state.cpp b/ydb/core/keyvalue/keyvalue_state.cpp
index f05fc122e1..2b014a12e6 100644
--- a/ydb/core/keyvalue/keyvalue_state.cpp
+++ b/ydb/core/keyvalue/keyvalue_state.cpp
@@ -61,6 +61,12 @@ constexpr ui64 ReadResultSizeEstimationNewApi = 1 + 5 // Key id, length
constexpr ui64 ErrorMessageSizeEstimation = 128;
+constexpr size_t MaxKeySize = 4000;
+
+bool IsKeyLengthValid(const TString& key) {
+ return key.length() <= MaxKeySize;
+}
+
// Guideline:
// Check SetError calls: there must be no changes made to the DB before SetError call (!)
@@ -1416,13 +1422,21 @@ void TKeyValueState::CmdCmds(THolder<TIntermediate> &intermediate, ISimpleDb &db
}
TKeyValueState::TCheckResult TKeyValueState::CheckCmd(const TIntermediate::TCopyRange &cmd, TKeySet& keys,
- ui32 /*index*/) const
+ ui32 index) const
{
TVector<TString> nkeys;
auto range = GetRange(cmd.Range, keys);
for (auto it = range.first; it != range.second; ++it) {
if (it->StartsWith(cmd.PrefixToRemove)) {
- nkeys.push_back(cmd.PrefixToAdd + it->substr(cmd.PrefixToRemove.size()));
+ auto newKey = cmd.PrefixToAdd + it->substr(cmd.PrefixToRemove.size());
+ if (!IsKeyLengthValid(newKey)) {
+ TStringStream str;
+ str << "KeyValue# " << TabletId
+ << " NewKey# " << EscapeC(newKey) << " in CmdCopyRange(" << index << ") has length " << newKey.length() << " but max is " << MaxKeySize
+ << " Marker# KV24";
+ return {NKikimrKeyValue::Statuses::RSTATUS_INTERNAL_ERROR, str.Str()};
+ }
+ nkeys.push_back(newKey);
}
}
keys.insert(nkeys.begin(), nkeys.end());
@@ -1438,8 +1452,16 @@ TKeyValueState::TCheckResult TKeyValueState::CheckCmd(const TIntermediate::TRena
str << "KeyValue# " << TabletId
<< " OldKey# " << EscapeC(cmd.OldKey) << " does not exist in CmdRename(" << index << ")"
<< " Marker# KV18";
- return {false, str.Str()};
+ return {NKikimrKeyValue::Statuses::RSTATUS_NOT_FOUND, str.Str()};
+ }
+ if (!IsKeyLengthValid(cmd.NewKey)) {
+ TStringStream str;
+ str << "KeyValue# " << TabletId
+ << " NewKey# " << EscapeC(cmd.NewKey) << " in CmdRename(" << index << ") has length " << cmd.NewKey.length() << " but max is " << MaxKeySize
+ << " Marker# KV23";
+ return {NKikimrKeyValue::Statuses::RSTATUS_INTERNAL_ERROR, str.Str()};
}
+
keys.erase(it);
keys.insert(cmd.NewKey);
return {};
@@ -1455,13 +1477,21 @@ TKeyValueState::TCheckResult TKeyValueState::CheckCmd(const TIntermediate::TConc
str << "KeyValue# " << TabletId
<< " InputKey# " << EscapeC(key) << " does not exist in CmdConcat(" << index << ")"
<< " Marker# KV19";
- return {false, str.Str()};
+ return {NKikimrKeyValue::Statuses::RSTATUS_NOT_FOUND, str.Str()};
}
if (!cmd.KeepInputs) {
keys.erase(it);
}
}
+ if (!IsKeyLengthValid(cmd.OutputKey)) {
+ TStringStream str;
+ str << "KeyValue# " << TabletId
+ << " OutputKey length in CmdConcat(" << index << ") is " << cmd.OutputKey.length() << " but max is " << MaxKeySize
+ << " Marker# KV25";
+ return {NKikimrKeyValue::Statuses::RSTATUS_INTERNAL_ERROR, str.Str()};
+ }
+
keys.insert(cmd.OutputKey);
return {};
}
@@ -1475,43 +1505,32 @@ TKeyValueState::TCheckResult TKeyValueState::CheckCmd(const TIntermediate::TDele
}
TKeyValueState::TCheckResult TKeyValueState::CheckCmd(const TIntermediate::TWrite &cmd, TKeySet& keys,
- ui32 /*index*/) const
+ ui32 index) const
{
+ if (!IsKeyLengthValid(cmd.Key)) {
+ TStringStream str;
+ str << "KeyValue# " << TabletId
+ << " Key length in CmdWrite(" << index << ") is " << cmd.Key.length() << " but max is " << MaxKeySize
+ << " Marker# KV26";
+ return {NKikimrKeyValue::Statuses::RSTATUS_INTERNAL_ERROR, str.Str()};
+ }
keys.insert(cmd.Key);
return {};
}
-bool TKeyValueState::CheckCmdCopyRanges(THolder<TIntermediate>& intermediate, const TActorContext& /*ctx*/,
- TKeySet& keys, const TTabletStorageInfo* /*info*/)
-{
- for (const auto& cmd : intermediate->CopyRanges) {
- CheckCmd(cmd, keys, 0);
- }
- return true;
+TKeyValueState::TCheckResult TKeyValueState::CheckCmd(const TIntermediate::TGetStatus &/*cmd*/, TKeySet& /*keys*/, ui32 /*index*/) const {
+ return {};
}
-bool TKeyValueState::CheckCmdRenames(THolder<TIntermediate>& intermediate, const TActorContext& ctx, TKeySet& keys,
- const TTabletStorageInfo *info)
-{
- ui32 index = 0;
- for (const auto& cmd : intermediate->Renames) {
- const auto &[ok, msg] = CheckCmd(cmd, keys, index++);
- if (!ok) {
- ReplyError(ctx, msg, NMsgBusProxy::MSTATUS_ERROR, NKikimrKeyValue::Statuses::RSTATUS_NOT_FOUND, intermediate, info);
- return false;
- }
- }
- return true;
-}
+template<class Cmd>
+bool TKeyValueState::CheckCmds(THolder<TIntermediate>& intermediate, const TDeque<Cmd>& cmds, const TActorContext& ctx,
+ TKeySet& keys, const TTabletStorageInfo* info) {
-bool TKeyValueState::CheckCmdConcats(THolder<TIntermediate>& intermediate, const TActorContext& ctx, TKeySet& keys,
- const TTabletStorageInfo *info)
-{
ui32 index = 0;
- for (const auto& cmd : intermediate->Concats) {
- const auto &[ok, msg] = CheckCmd(cmd, keys, index++);
- if (!ok) {
- ReplyError(ctx, msg, NMsgBusProxy::MSTATUS_ERROR, NKikimrKeyValue::Statuses::RSTATUS_NOT_FOUND, intermediate, info);
+ for (const auto& cmd : cmds) {
+ const auto &[status, msg] = CheckCmd(cmd, keys, index++);
+ if (NKikimrKeyValue::Statuses::RSTATUS_OK != status) {
+ ReplyError(ctx, msg, NMsgBusProxy::MSTATUS_ERROR, status, intermediate, info);
return false;
}
}
@@ -1519,30 +1538,6 @@ bool TKeyValueState::CheckCmdConcats(THolder<TIntermediate>& intermediate, const
return true;
}
-bool TKeyValueState::CheckCmdDeletes(THolder<TIntermediate>& intermediate, const TActorContext& /*ctx*/, TKeySet& keys,
- const TTabletStorageInfo* /*info*/)
-{
- for (const auto& cmd : intermediate->Deletes) {
- CheckCmd(cmd, keys, 0);
- }
- return true;
-}
-
-bool TKeyValueState::CheckCmdWrites(THolder<TIntermediate>& intermediate, const TActorContext& /*ctx*/, TKeySet& keys,
- const TTabletStorageInfo* /*info*/)
-{
- for (const auto& cmd : intermediate->Writes) {
- CheckCmd(cmd, keys, 0);
- }
- return true;
-}
-
-bool TKeyValueState::CheckCmdGetStatus(THolder<TIntermediate>& /*intermediate*/, const TActorContext& /*ctx*/,
- TKeySet& /*keys*/, const TTabletStorageInfo* /*info*/)
-{
- return true;
-}
-
bool TKeyValueState::CheckCmds(THolder<TIntermediate>& intermediate, const TActorContext& ctx, TKeySet& keys,
const TTabletStorageInfo* info)
{
@@ -1564,9 +1559,9 @@ bool TKeyValueState::CheckCmds(THolder<TIntermediate>& intermediate, const TActo
};
for (const auto& cmd : intermediate->Commands) {
- const auto &[ok, msg] = std::visit(visitor, cmd);
- if (!ok) {
- ReplyError(ctx, msg, NMsgBusProxy::MSTATUS_ERROR, NKikimrKeyValue::Statuses::RSTATUS_NOT_FOUND, intermediate, info);
+ const auto &[status, msg] = std::visit(visitor, cmd);
+ if (NKikimrKeyValue::Statuses::RSTATUS_OK != status) {
+ ReplyError(ctx, msg, NMsgBusProxy::MSTATUS_ERROR, status, intermediate, info);
return false;
}
}
@@ -1590,13 +1585,13 @@ void TKeyValueState::ProcessCmds(THolder<TIntermediate> &intermediate, ISimpleDb
success = false;
}
- success = success && CheckCmdCopyRanges(intermediate, ctx, keys, info);
- success = success && CheckCmdRenames(intermediate, ctx, keys, info);
- success = success && CheckCmdConcats(intermediate, ctx, keys, info);
- success = success && CheckCmdDeletes(intermediate, ctx, keys, info);
- success = success && CheckCmdWrites(intermediate, ctx, keys, info);
+ success = success && CheckCmds(intermediate, intermediate->CopyRanges, ctx, keys, info);
+ success = success && CheckCmds(intermediate, intermediate->Renames, ctx, keys, info);
+ success = success && CheckCmds(intermediate, intermediate->Concats, ctx, keys, info);
+ success = success && CheckCmds(intermediate, intermediate->Deletes, ctx, keys, info);
+ success = success && CheckCmds(intermediate, intermediate->Writes, ctx, keys, info);
success = success && CheckCmds(intermediate, ctx, keys, info);
- success = success && CheckCmdGetStatus(intermediate, ctx, keys, info);
+ success = success && CheckCmds(intermediate, intermediate->GetStatuses, ctx, keys, info);
if (!success) {
DropRefCountsOnErrorInTx(std::exchange(intermediate->RefCountsIncr, {}), db, ctx, intermediate->RequestUid);
} else {
diff --git a/ydb/core/keyvalue/keyvalue_state.h b/ydb/core/keyvalue/keyvalue_state.h
index 1e52702512..0e57426ec8 100644
--- a/ydb/core/keyvalue/keyvalue_state.h
+++ b/ydb/core/keyvalue/keyvalue_state.h
@@ -402,7 +402,7 @@ public:
bool IncrementGeneration(THolder<TIntermediate> &intermediate, ISimpleDb &db, const TActorContext &ctx);
struct TCheckResult {
- bool Result = true;
+ NKikimrKeyValue::Statuses::ReplyStatus Status = NKikimrKeyValue::Statuses::RSTATUS_OK;
TString ErrorMsg;
};
@@ -411,19 +411,11 @@ public:
TCheckResult CheckCmd(const TIntermediate::TWrite &cmd, TKeySet& keys, ui32 index) const;
TCheckResult CheckCmd(const TIntermediate::TCopyRange &cmd, TKeySet& keys, ui32 index) const;
TCheckResult CheckCmd(const TIntermediate::TConcat &cmd, TKeySet& keys, ui32 index) const;
+ TCheckResult CheckCmd(const TIntermediate::TGetStatus &cmd, TKeySet& keys, ui32 index) const;
- bool CheckCmdRenames(THolder<TIntermediate>& intermediate, const TActorContext& ctx, TKeySet& keys,
- const TTabletStorageInfo *info);
- bool CheckCmdDeletes(THolder<TIntermediate>& intermediate, const TActorContext& ctx, TKeySet& keys,
- const TTabletStorageInfo *info);
- bool CheckCmdWrites(THolder<TIntermediate>& intermediate, const TActorContext& ctx, TKeySet& keys,
- const TTabletStorageInfo *info);
- bool CheckCmdCopyRanges(THolder<TIntermediate>& intermediate, const TActorContext& ctx, TKeySet& keys,
- const TTabletStorageInfo *info);
- bool CheckCmdConcats(THolder<TIntermediate>& intermediate, const TActorContext& ctx, TKeySet& keys,
- const TTabletStorageInfo *info);
- bool CheckCmdGetStatus(THolder<TIntermediate>& /*intermediate*/, const TActorContext& /*ctx*/,
- TKeySet& /*keys*/, const TTabletStorageInfo* /*info*/);
+ template<class Cmd>
+ bool CheckCmds(THolder<TIntermediate>& intermediate, const TDeque<Cmd>& cmds, const TActorContext& ctx,
+ TKeySet& keys, const TTabletStorageInfo* info);
bool CheckCmds(THolder<TIntermediate>& intermediate, const TActorContext& /*ctx*/, TKeySet& keys,
const TTabletStorageInfo* /*info*/);
diff --git a/ydb/core/keyvalue/keyvalue_ut.cpp b/ydb/core/keyvalue/keyvalue_ut.cpp
index 2f2d8118f0..860eb40908 100644
--- a/ydb/core/keyvalue/keyvalue_ut.cpp
+++ b/ydb/core/keyvalue/keyvalue_ut.cpp
@@ -738,7 +738,7 @@ struct TKeyRenamePair {
TString NewKey;
};
-
+template <NKikimrKeyValue::Statuses::ReplyStatus ExpectedStatus = NKikimrKeyValue::Statuses::RSTATUS_OK>
void ExecuteRename(TTestContext &tc, const TDeque<TKeyRenamePair> &pairs, ui64 lockedGeneration)
{
TDesiredPair<TEvKeyValue::TEvExecuteTransaction> dp;
@@ -755,11 +755,12 @@ void ExecuteRename(TTestContext &tc, const TDeque<TKeyRenamePair> &pairs, ui64 l
dp.Request.set_lock_generation(lockedGeneration);
ExecuteEvent(dp, tc);
- UNIT_ASSERT_C(dp.Response.status() == NKikimrKeyValue::Statuses::RSTATUS_OK,
+ UNIT_ASSERT_C(dp.Response.status() == ExpectedStatus,
"got# " << NKikimrKeyValue::Statuses_ReplyStatus_Name(dp.Response.status())
<< " msg# " << dp.Response.msg());
}
+template <bool IsSuccess = true>
void ExecuteConcat(TTestContext &tc, const TString &newKey, const TDeque<TString> &inputKeys, ui64 lockedGeneration,
bool keepKeys)
{
@@ -778,9 +779,15 @@ void ExecuteConcat(TTestContext &tc, const TString &newKey, const TDeque<TString
dp.Request.set_lock_generation(lockedGeneration);
ExecuteEvent(dp, tc);
- UNIT_ASSERT_C(dp.Response.status() == NKikimrKeyValue::Statuses::RSTATUS_OK,
- "got# " << NKikimrKeyValue::Statuses_ReplyStatus_Name(dp.Response.status())
- << " msg# " << dp.Response.msg());
+ if constexpr (IsSuccess) {
+ UNIT_ASSERT_C(dp.Response.status() == NKikimrKeyValue::Statuses::RSTATUS_OK,
+ "got# " << NKikimrKeyValue::Statuses_ReplyStatus_Name(dp.Response.status())
+ << " msg# " << dp.Response.msg());
+ } else {
+ UNIT_ASSERT_C(dp.Response.status() == NKikimrKeyValue::Statuses::RSTATUS_INTERNAL_ERROR,
+ "got# " << NKikimrKeyValue::Statuses_ReplyStatus_Name(dp.Response.status())
+ << " msg# " << dp.Response.msg());
+ }
}
@@ -2438,5 +2445,73 @@ Y_UNIT_TEST(TestLargeWriteAndDelete) {
});
}
+Y_UNIT_TEST(TestWriteLongKey) {
+ TTestContext tc;
+ RunTestWithReboots(tc.TabletIds, [&]() {
+ return tc.InitialEventsFilter.Prepare();
+ }, [&](const TString &dispatchName, std::function<void(TTestActorRuntime&)> setup, bool &activeZone) {
+ TFinalizer finalizer(tc);
+ tc.Prepare(dispatchName, setup, activeZone);
+ ExecuteObtainLock(tc, 1);
+
+ TDeque<TKeyValuePair> keys;
+ keys.push_back({TString{10_KB, '_'}, ""});
+
+ ExecuteWrite<NKikimrKeyValue::Statuses::RSTATUS_INTERNAL_ERROR>(tc, keys, 1, 2, NKikimrKeyValue::Priorities::PRIORITY_REALTIME);
+ });
+}
+
+Y_UNIT_TEST(TestRenameToLongKey) {
+ TTestContext tc;
+ RunTestWithReboots(tc.TabletIds, [&]() {
+ return tc.InitialEventsFilter.Prepare();
+ }, [&](const TString &dispatchName, std::function<void(TTestActorRuntime&)> setup, bool &activeZone) {
+ TFinalizer finalizer(tc);
+ tc.Prepare(dispatchName, setup, activeZone);
+ ExecuteObtainLock(tc, 1);
+
+ TDeque<TKeyValuePair> keys;
+ keys.push_back({"oldKey", ""});
+
+ ExecuteWrite(tc, keys, 1, 2, NKikimrKeyValue::Priorities::PRIORITY_REALTIME);
+ ExecuteRename<NKikimrKeyValue::Statuses::RSTATUS_INTERNAL_ERROR>(tc, { {"oldKey", TString{10_KB, '_'}} }, NKikimrKeyValue::Priorities::PRIORITY_REALTIME);
+ });
+}
+
+Y_UNIT_TEST(TestCopyRangeToLongKey) {
+ TTestContext tc;
+ RunTestWithReboots(tc.TabletIds, [&]() {
+ return tc.InitialEventsFilter.Prepare();
+ }, [&](const TString &dispatchName, std::function<void(TTestActorRuntime&)> setup, bool &activeZone) {
+ TFinalizer finalizer(tc);
+ tc.Prepare(dispatchName, setup, activeZone);
+ ExecuteObtainLock(tc, 1);
+
+ TDeque<TKeyValuePair> keys;
+ keys.push_back({"oldKey", ""});
+
+ ExecuteWrite(tc, keys, 1, 2, NKikimrKeyValue::Priorities::PRIORITY_REALTIME);
+ ExecuteCopyRange<false>(tc, "", EBorderKind::Without, "", EBorderKind::Without, 1, TString{10_KB, '_'}, "");
+ });
+}
+
+Y_UNIT_TEST(TestConcatToLongKey) {
+ TTestContext tc;
+ RunTestWithReboots(tc.TabletIds, [&]() {
+ return tc.InitialEventsFilter.Prepare();
+ }, [&](const TString &dispatchName, std::function<void(TTestActorRuntime&)> setup, bool &activeZone) {
+ TFinalizer finalizer(tc);
+ tc.Prepare(dispatchName, setup, activeZone);
+ ExecuteObtainLock(tc, 1);
+
+ TDeque<TKeyValuePair> keys;
+ keys.push_back({"oldKey1", "1"});
+ keys.push_back({"oldKey2", "2"});
+
+ ExecuteWrite(tc, keys, 1, 2, NKikimrKeyValue::Priorities::PRIORITY_REALTIME);
+ ExecuteConcat<false>(tc, TString{10_KB, '_'}, {"oldKey1", "oldKey2"}, 1, 1);
+ });
+}
+
} // TKeyValueTest
} // NKikimr
diff --git a/ydb/core/persqueue/pq_impl.cpp b/ydb/core/persqueue/pq_impl.cpp
index 881d9d7c33..880a278046 100644
--- a/ydb/core/persqueue/pq_impl.cpp
+++ b/ydb/core/persqueue/pq_impl.cpp
@@ -6,6 +6,7 @@
#include <ydb/core/base/tx_processing.h>
#include <ydb/core/persqueue/config/config.h>
#include <ydb/core/persqueue/partition_key_range/partition_key_range.h>
+#include <ydb/core/persqueue/writer/source_id_encoding.h>
#include <ydb/core/protos/pqconfig.pb.h>
#include <ydb/core/protos/counters_keyvalue.pb.h>
#include <ydb/core/metering/metering.h>
@@ -28,7 +29,7 @@ static constexpr TDuration TOTAL_TIMEOUT = TDuration::Seconds(120);
static constexpr char TMP_REQUEST_MARKER[] = "__TMP__REQUEST__MARKER__";
static constexpr ui32 CACHE_SIZE = 100_MB;
static constexpr ui32 MAX_BYTES = 25_MB;
-static constexpr ui32 MAX_SOURCE_ID_LENGTH = 10_KB;
+static constexpr ui32 MAX_SOURCE_ID_LENGTH = 2048;
struct TPartitionInfo {
TPartitionInfo(const TActorId& actor, TMaybe<TPartitionKeyRange>&& keyRange,
@@ -1766,7 +1767,7 @@ void TPersQueue::HandleWriteRequest(const ui64 responseCookie, const TActorId& p
errorStr = "TotalSize must be filled for first part";
} else if (cmd.HasTotalSize() && static_cast<size_t>(cmd.GetTotalSize()) <= cmd.GetData().size()) { // TotalSize must be > size of each part
errorStr = "TotalSize is incorrect";
- } else if (cmd.GetSourceId().size() > MAX_SOURCE_ID_LENGTH) {
+ } else if (cmd.HasSourceId() && !cmd.GetSourceId().empty() && NPQ::NSourceIdEncoding::Decode(cmd.GetSourceId()).length() > MAX_SOURCE_ID_LENGTH) {
errorStr = "Too big SourceId";
} else if (mirroredPartition && !cmd.GetDisableDeduplication()) {
errorStr = "Write to mirrored topic is forbiden";
diff --git a/ydb/core/persqueue/ut/pq_ut.cpp b/ydb/core/persqueue/ut/pq_ut.cpp
index a47ceaa1b4..c34e8d3ecc 100644
--- a/ydb/core/persqueue/ut/pq_ut.cpp
+++ b/ydb/core/persqueue/ut/pq_ut.cpp
@@ -1200,6 +1200,14 @@ void TestWritePQImpl(bool fast) {
//read from gap
CmdRead(0, 500, Max<i32>(), Max<i32>(), 6, false, tc, {1000,1001,2000,2001,3000,3002});
+
+ // Write long sourceId
+ // The write should not be executed because the SourceId exceeds the maximum allowed size
+ CmdWrite(0, TString(10_KB, '_'), data1, tc, true, {}, false, "", -1, 10000);
+
+ // Write long sourceId
+ // The write must be completed successfully because the SourceId has the maximum allowed size
+ CmdWrite(0, TString(2_KB, '_'), data1, tc, false, {}, false, "", -1, 10000);
});
}
diff --git a/ydb/public/api/protos/ydb_persqueue_v1.proto b/ydb/public/api/protos/ydb_persqueue_v1.proto
index af0f7c1ed0..5262d50f30 100644
--- a/ydb/public/api/protos/ydb_persqueue_v1.proto
+++ b/ydb/public/api/protos/ydb_persqueue_v1.proto
@@ -63,7 +63,7 @@ message StreamingWriteClientMessage {
// Path of topic to write to.
string topic = 1;
// message group identifier of client data stream a.k.a. sourceId.
- string message_group_id = 2;
+ string message_group_id = 2 [(Ydb.length).le = 2048];
// Some user metadata attached to this write session.
map<string, string> session_meta = 3;
// Partition group to write to.
@@ -553,7 +553,7 @@ message StreamingReadServerMessage {
// Representation of sequence of client messages from one write session.
message Batch {
// Source identifier provided by client for this batch of client messages.
- bytes message_group_id = 2;
+ bytes message_group_id = 2 [(Ydb.length).le = 2048];
// Client metadata attached to write session, the same for all messages in batch.
MetaValue session_meta = 3;
// Persist timestamp on server for batch.
@@ -882,7 +882,7 @@ message MigrationStreamingReadServerMessage {
// Representation of sequence of client messages from one write session.
message Batch {
// Source identifier provided by client for this batch of client messages.
- bytes source_id = 2;
+ bytes source_id = 2 [(Ydb.length).le = 2048];
// Client metadata attached to write session, the same for all messages in batch.
repeated KeyValue extra_fields = 3;
// Persist timestamp on server for batch.
diff --git a/ydb/public/api/protos/ydb_topic.proto b/ydb/public/api/protos/ydb_topic.proto
index b5a95b5f1d..c4c0606055 100644
--- a/ydb/public/api/protos/ydb_topic.proto
+++ b/ydb/public/api/protos/ydb_topic.proto
@@ -104,7 +104,7 @@ message StreamWriteMessage {
string path = 1;
// Producer identifier of client data stream.
// Used for message deduplication by sequence numbers.
- string producer_id = 2;
+ string producer_id = 2 [(Ydb.length).le = 2048];
// User metadata attached to this write session.
// Reader will get this session meta data with each message read.
map<string, string> write_session_meta = 3;
@@ -112,7 +112,7 @@ message StreamWriteMessage {
// If neither is set, no guarantees on ordering or partitions to write to.
oneof partitioning {
// All messages with given pair (producer_id, message_group_id) go to single partition in order of writes.
- string message_group_id = 4;
+ string message_group_id = 4 [(Ydb.length).le = 2048];
// Explicit partition id to write to.
int64 partition_id = 5;
// Explicit partition location to write to.
@@ -163,7 +163,7 @@ message StreamWriteMessage {
// Per-message override for respective write session settings.
oneof partitioning {
// All messages with given pair (producer_id, message_group_id) go to single partition in order of writes.
- string message_group_id = 5;
+ string message_group_id = 5 [(Ydb.length).le = 2048];
// Explicit partition id to write to.
int64 partition_id = 6;
// Explicit partition location to write to.
@@ -377,7 +377,7 @@ message StreamReadMessage {
int64 uncompressed_size = 6;
// Filled if message_group_id was set on message write.
- string message_group_id = 7;
+ string message_group_id = 7 [(Ydb.length).le = 2048];
repeated MetadataItem metadata_items = 8;
}
@@ -388,7 +388,7 @@ message StreamReadMessage {
repeated MessageData message_data = 1;
// Producer identifier provided by client for this batch of client messages.
- string producer_id = 2;
+ string producer_id = 2 [(Ydb.length).le = 2048];
// Client metadata attached to write session, the same for all messages in batch.
map<string, string> write_session_meta = 3;
diff --git a/ydb/services/persqueue_v1/persqueue_compat_ut.cpp b/ydb/services/persqueue_v1/persqueue_compat_ut.cpp
index e42e787a20..2238ff89e6 100644
--- a/ydb/services/persqueue_v1/persqueue_compat_ut.cpp
+++ b/ydb/services/persqueue_v1/persqueue_compat_ut.cpp
@@ -240,6 +240,65 @@ Y_UNIT_TEST_SUITE(TPQCompatTest) {
}
+ Y_UNIT_TEST(LongProducerAndLongMessageGroupId) {
+ TPQv1CompatTestBase testServer;
+ std::shared_ptr<grpc::Channel> Channel_;
+ Channel_ = grpc::CreateChannel(
+ "localhost:" + ToString(testServer.Server->GrpcPort),
+ grpc::InsecureChannelCredentials()
+ );
+ auto TopicStubP_ = Ydb::Topic::V1::TopicService::NewStub(Channel_);
+
+ struct Data {
+ TString ProducerId;
+ bool error;
+ };
+
+ std::vector<Data> data = {
+ {"producer-1", false},
+ { TString(2_KB, '-'), false},
+ { TString(2_KB + 1, '-'), true},
+ };
+
+ for(const auto& [producerId, error] : data) {
+ Cerr << ">>>>> Case producerId.length()=" << producerId.length() << Endl;
+
+ grpc::ClientContext wcontext;
+ wcontext.AddMetadata("x-ydb-database", "/Root/LbCommunal/account");
+
+ auto writeStream = TopicStubP_->StreamWrite(&wcontext);
+ UNIT_ASSERT(writeStream);
+
+ Ydb::Topic::StreamWriteMessage::FromClient req;
+ Ydb::Topic::StreamWriteMessage::FromServer resp;
+
+ req.mutable_init_request()->set_path("topic2");
+ if (!producerId.Empty()) {
+ req.mutable_init_request()->set_producer_id(producerId);
+ req.mutable_init_request()->set_message_group_id(producerId);
+ }
+
+ UNIT_ASSERT(writeStream->Write(req));
+ UNIT_ASSERT(writeStream->Read(&resp));
+ Cerr << ">>>>> Response = " << resp.server_message_case() << Endl;
+ UNIT_ASSERT(resp.server_message_case() == Ydb::Topic::StreamWriteMessage::FromServer::kInitResponse);
+ req.Clear();
+
+ Cerr << ">>>>> Write message" << Endl;
+ auto* write = req.mutable_write_request();
+ write->set_codec(Ydb::Topic::CODEC_RAW);
+
+ auto* msg = write->add_messages();
+ msg->set_seq_no(1);
+ msg->set_data("x");
+ UNIT_ASSERT(writeStream->Write(req));
+ UNIT_ASSERT(writeStream->Read(&resp));
+ Cerr << "===Got response: " << resp.ShortDebugString() << Endl;
+ UNIT_ASSERT(resp.server_message_case() == (error ? Ydb::Topic::StreamWriteMessage::FromServer::SERVER_MESSAGE_NOT_SET : Ydb::Topic::StreamWriteMessage::FromServer::kWriteResponse));
+ resp.Clear();
+ }
+ }
+
Y_UNIT_TEST(ReadWriteSessions) {
TPQv1CompatTestBase testServer;
std::shared_ptr<grpc::Channel> Channel_;