aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorqyryq <qyryq@ydb.tech>2025-03-04 13:14:04 +0300
committerGitHub <noreply@github.com>2025-03-04 13:14:04 +0300
commit312918ad13acd489a5fee189fa4b7477b61f3fbc (patch)
treee1b036be03f5e53c51a8c11f8c30afcc74cfc56d
parent269d7aed2238603cb3b18970b49f4d9f786ca1c8 (diff)
downloadydb-312918ad13acd489a5fee189fa4b7477b61f3fbc.tar.gz
If optional is empty, get proper JSON value (#15168)
-rw-r--r--ydb/core/ymq/actor/index_events_processor.cpp4
-rw-r--r--ydb/core/ymq/actor/yc_search_ut/index_events_processor_ut.cpp49
-rw-r--r--ydb/core/ymq/queues/common/queries.cpp6
3 files changed, 37 insertions, 22 deletions
diff --git a/ydb/core/ymq/actor/index_events_processor.cpp b/ydb/core/ymq/actor/index_events_processor.cpp
index d5a601be6b..77c2beffde 100644
--- a/ydb/core/ymq/actor/index_events_processor.cpp
+++ b/ydb/core/ymq/actor/index_events_processor.cpp
@@ -155,7 +155,7 @@ void TSearchEventsProcessor::OnQueuesListQueryComplete(NKqp::TEvKqp::TEvQueryRes
auto customName = *parser.ColumnParser(2).GetOptionalUtf8();
auto createTs = *parser.ColumnParser(3).GetOptionalUint64();
auto folderId = *parser.ColumnParser(4).GetOptionalUtf8();
- auto tags = *parser.ColumnParser(5).GetOptionalUtf8();
+ auto tags = parser.ColumnParser(5).GetOptionalUtf8().value_or("{}");
auto insResult = ExistingQueues.insert(std::make_pair(
queueName, TQueueEvent{EQueueEventType::Existed, createTs, customName, cloudId, folderId, tags}
));
@@ -197,7 +197,7 @@ void TSearchEventsProcessor::OnEventsListingDone(NKqp::TEvKqp::TEvQueryResponse:
auto customName = *parser.ColumnParser(3).GetOptionalUtf8();
auto timestamp = *parser.ColumnParser(4).GetOptionalUint64();
auto folderId = *parser.ColumnParser(5).GetOptionalUtf8();
- auto labels = *parser.ColumnParser(6).GetOptionalUtf8();
+ auto labels = parser.ColumnParser(6).GetOptionalUtf8().value_or("{}");
auto& qEvents = QueuesEvents[queueName];
auto insResult = qEvents.insert(std::make_pair(
timestamp, TQueueEvent{EQueueEventType(evType), timestamp, customName, cloudId, folderId, labels}
diff --git a/ydb/core/ymq/actor/yc_search_ut/index_events_processor_ut.cpp b/ydb/core/ymq/actor/yc_search_ut/index_events_processor_ut.cpp
index 24e06e3ae9..c0520cc5e4 100644
--- a/ydb/core/ymq/actor/yc_search_ut/index_events_processor_ut.cpp
+++ b/ydb/core/ymq/actor/yc_search_ut/index_events_processor_ut.cpp
@@ -130,7 +130,7 @@ private:
UNIT_ASSERT(statusVal.IsSuccess());
}
void AddEvent(
- const TString& account, const TString& queueName, const EEvType& type, TInstant ts = TInstant::Zero(), TString labels = "{}")
+ const TString& account, const TString& queueName, const EEvType& type, TInstant ts = TInstant::Zero(), TMaybe<TString> labels = "{}")
{
if (ts == TInstant::Zero())
ts = CurrTs;
@@ -143,12 +143,12 @@ private:
<< "\"myQueueCustomName\", "
<< ts.MilliSeconds() << ", "
<< "\"myFolder\", "
- << "\"" << labels << "\""
+ << (labels.Defined() ? "\"" + labels.GetRef() + "\"" : "NULL")
<< ");";
ExecDataQuery(queryBuilder.c_str());
}
- void AddQueue(const TString& account, const TString& queueName, TInstant ts = TInstant::Zero(), TString tags = "{}") {
+ void AddQueue(const TString& account, const TString& queueName, TInstant ts = TInstant::Zero(), TMaybe<TString> tags = "{}") {
if (ts == TInstant::Zero())
ts = CurrTs;
TStringBuilder queryBuilder;
@@ -159,12 +159,12 @@ private:
<< "\"myQueueCustomName\", "
<< ts.MilliSeconds() << ", "
<< "\"myFolder\", "
- << "\"" << tags << "\""
+ << (tags.Defined() ? "\"" + tags.GetRef() + "\"" : "NULL")
<< ");";
ExecDataQuery(queryBuilder.c_str());
}
- void AddQueuesBatch(const TString& account, const TString& queueNameBase, ui64 count, ui64 startIndex = 0, TString tags = "{}") {
+ void AddQueuesBatch(const TString& account, const TString& queueNameBase, ui64 count, ui64 startIndex = 0, TMaybe<TString> tags = "{}") {
Cerr << "===Started add queue batch\n";
TDeque<NYdb::NTable::TAsyncDataQueryResult> results;
ui64 maxInflight = 1;
@@ -181,7 +181,7 @@ private:
<< "\"myQueueCustomName\", "
<< CurrTs.MilliSeconds() << ", "
<< "\"myFolder\", "
- << "\"" << tags << "\""
+ << (tags.Defined() ? "\"" + tags.GetRef() + "\"" : "NULL")
<< ");";
auto preparedResult = session.PrepareDataQuery(queryBuilder.c_str()).GetValueSync();
@@ -323,41 +323,56 @@ private:
TTestRunner("CreateIndexProcessor", this);
}
- void TestSingleCreateQueueEvent() {
+ void CheckSingleCreateQueueEvent(bool nullLabels) {
TTestRunner runner{"SingleCreateQueueEvent", this};
const TString labels = "{\"k1\": \"v1\"}";
const TString escapedLabels = EscapeC(labels);
- runner.AddEvent( "cloud1", "queue1", EEvType::Created, {}, escapedLabels);
+ runner.AddEvent("cloud1", "queue1", EEvType::Created, {}, nullLabels ? Nothing() : TMaybe<TString>(escapedLabels));
runner.DispatchEvents();
auto messages = runner.EventsWriter->GetMessages();
UNIT_ASSERT_VALUES_EQUAL(messages.size(), 2); // Events, reindex
- CheckEventsLine(messages[0], EEvType::Created, {}, labels);
- CheckEventsLine(messages[1], EEvType::Existed, {}, labels);
+ CheckEventsLine(messages[0], EEvType::Created, {}, nullLabels ? "{}" : labels);
+ CheckEventsLine(messages[1], EEvType::Existed, {}, nullLabels ? "{}" : labels);
UNIT_ASSERT_VALUES_EQUAL(runner.CountEvents(), 0);
}
- void TestReindexSingleQueue() {
+ void TestSingleCreateQueueEvent() {
+ CheckSingleCreateQueueEvent(false);
+ CheckSingleCreateQueueEvent(true);
+ }
+
+ void CheckReindexSingleQueue(bool nullLabels) {
TTestRunner runner{"ReindexSingleQueue", this};
const TString labels = "{\"k1\": \"v1\"}";
const TString escapedLabels = EscapeC(labels);
- runner.AddQueue("cloud1", "queue1", {}, escapedLabels);
+ runner.AddQueue("cloud1", "queue1", {}, nullLabels ? Nothing() : TMaybe<TString>(escapedLabels));
runner.DispatchEvents();
auto messages = runner.EventsWriter->GetMessages();
UNIT_ASSERT_VALUES_EQUAL(messages.size(), 1);
- CheckEventsLine(messages[0], EEvType::Existed, {}, labels);
+ CheckEventsLine(messages[0], EEvType::Existed, {}, nullLabels ? "{}" : labels);
}
- void TestDeletedQueueNotReindexed() {
+ void TestReindexSingleQueue() {
+ CheckReindexSingleQueue(false);
+ CheckReindexSingleQueue(true);
+ }
+
+ void CheckDeletedQueueNotReindexed(bool nullLabels) {
TTestRunner runner{"DeletedQueueNotReindexed", this};
const TString labels = "{\"k1\": \"v1\"}";
const TString escapedLabels = EscapeC(labels);
- runner.AddQueue("cloud1", "queue2", runner.PrevTs, escapedLabels);
- runner.AddEvent("cloud1", "queue2", EEvType::Deleted, {}, escapedLabels);
+ runner.AddQueue("cloud1", "queue2", runner.PrevTs, nullLabels ? Nothing() : TMaybe<TString>(escapedLabels));
+ runner.AddEvent("cloud1", "queue2", EEvType::Deleted, {}, nullLabels ? Nothing() : TMaybe<TString>(escapedLabels));
Sleep(TDuration::Seconds(1));
runner.DispatchEvents();
auto messages = runner.EventsWriter->GetMessages();
UNIT_ASSERT_VALUES_EQUAL(messages.size(), 1);
- CheckEventsLine(messages[0], EEvType::Deleted, {}, labels);
+ CheckEventsLine(messages[0], EEvType::Deleted, {}, nullLabels ? "{}" : labels);
+ }
+
+ void TestDeletedQueueNotReindexed() {
+ CheckDeletedQueueNotReindexed(false);
+ CheckDeletedQueueNotReindexed(true);
}
void TestManyMessages() {
diff --git a/ydb/core/ymq/queues/common/queries.cpp b/ydb/core/ymq/queues/common/queries.cpp
index cff1815af5..99e2f66010 100644
--- a/ydb/core/ymq/queues/common/queries.cpp
+++ b/ydb/core/ymq/queues/common/queries.cpp
@@ -96,11 +96,11 @@ extern const char* const MatchQueueAttributesQuery = R"__(
(And
(And
(And (Equal (Member queuesRead 'Shards) shards)
- (Equal (Member queuesRead 'Tags) tags))
+ (Equal (Coalesce (Member queuesRead 'Tags) (Utf8String '"{}")) tags))
(Equal (Member queuesRead 'Partitions) partitions))
(Equal (Member queuesRead 'FifoQueue) fifo))
(Equal (Coalesce (Member queuesRead 'DlqName) (Utf8String '"")) dlqName))
- (Bool 'true)))
+ (Bool 'false)))
(let attrRow '(
)__" ATTRS_KEYS_PARAM R"__(
@@ -122,7 +122,7 @@ extern const char* const MatchQueueAttributesQuery = R"__(
(Equal (Member attrRead 'MessageRetentionPeriod) retention)))
(Equal (Member attrRead 'VisibilityTimeout) visibility))
(Equal (Coalesce (Member attrRead 'MaxReceiveCount) (Uint64 '0)) maxReceiveCount))
- (Bool 'true)))
+ (Bool 'false)))
(let sameVersion
(Equal currentVersion expectedVersion))