diff options
author | qyryq <qyryq@ydb.tech> | 2025-03-04 13:14:04 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2025-03-04 13:14:04 +0300 |
commit | 312918ad13acd489a5fee189fa4b7477b61f3fbc (patch) | |
tree | e1b036be03f5e53c51a8c11f8c30afcc74cfc56d | |
parent | 269d7aed2238603cb3b18970b49f4d9f786ca1c8 (diff) | |
download | ydb-312918ad13acd489a5fee189fa4b7477b61f3fbc.tar.gz |
If optional is empty, get proper JSON value (#15168)
-rw-r--r-- | ydb/core/ymq/actor/index_events_processor.cpp | 4 | ||||
-rw-r--r-- | ydb/core/ymq/actor/yc_search_ut/index_events_processor_ut.cpp | 49 | ||||
-rw-r--r-- | ydb/core/ymq/queues/common/queries.cpp | 6 |
3 files changed, 37 insertions, 22 deletions
diff --git a/ydb/core/ymq/actor/index_events_processor.cpp b/ydb/core/ymq/actor/index_events_processor.cpp index d5a601be6b..77c2beffde 100644 --- a/ydb/core/ymq/actor/index_events_processor.cpp +++ b/ydb/core/ymq/actor/index_events_processor.cpp @@ -155,7 +155,7 @@ void TSearchEventsProcessor::OnQueuesListQueryComplete(NKqp::TEvKqp::TEvQueryRes auto customName = *parser.ColumnParser(2).GetOptionalUtf8(); auto createTs = *parser.ColumnParser(3).GetOptionalUint64(); auto folderId = *parser.ColumnParser(4).GetOptionalUtf8(); - auto tags = *parser.ColumnParser(5).GetOptionalUtf8(); + auto tags = parser.ColumnParser(5).GetOptionalUtf8().value_or("{}"); auto insResult = ExistingQueues.insert(std::make_pair( queueName, TQueueEvent{EQueueEventType::Existed, createTs, customName, cloudId, folderId, tags} )); @@ -197,7 +197,7 @@ void TSearchEventsProcessor::OnEventsListingDone(NKqp::TEvKqp::TEvQueryResponse: auto customName = *parser.ColumnParser(3).GetOptionalUtf8(); auto timestamp = *parser.ColumnParser(4).GetOptionalUint64(); auto folderId = *parser.ColumnParser(5).GetOptionalUtf8(); - auto labels = *parser.ColumnParser(6).GetOptionalUtf8(); + auto labels = parser.ColumnParser(6).GetOptionalUtf8().value_or("{}"); auto& qEvents = QueuesEvents[queueName]; auto insResult = qEvents.insert(std::make_pair( timestamp, TQueueEvent{EQueueEventType(evType), timestamp, customName, cloudId, folderId, labels} diff --git a/ydb/core/ymq/actor/yc_search_ut/index_events_processor_ut.cpp b/ydb/core/ymq/actor/yc_search_ut/index_events_processor_ut.cpp index 24e06e3ae9..c0520cc5e4 100644 --- a/ydb/core/ymq/actor/yc_search_ut/index_events_processor_ut.cpp +++ b/ydb/core/ymq/actor/yc_search_ut/index_events_processor_ut.cpp @@ -130,7 +130,7 @@ private: UNIT_ASSERT(statusVal.IsSuccess()); } void AddEvent( - const TString& account, const TString& queueName, const EEvType& type, TInstant ts = TInstant::Zero(), TString labels = "{}") + const TString& account, const TString& queueName, const EEvType& type, TInstant ts = TInstant::Zero(), TMaybe<TString> labels = "{}") { if (ts == TInstant::Zero()) ts = CurrTs; @@ -143,12 +143,12 @@ private: << "\"myQueueCustomName\", " << ts.MilliSeconds() << ", " << "\"myFolder\", " - << "\"" << labels << "\"" + << (labels.Defined() ? "\"" + labels.GetRef() + "\"" : "NULL") << ");"; ExecDataQuery(queryBuilder.c_str()); } - void AddQueue(const TString& account, const TString& queueName, TInstant ts = TInstant::Zero(), TString tags = "{}") { + void AddQueue(const TString& account, const TString& queueName, TInstant ts = TInstant::Zero(), TMaybe<TString> tags = "{}") { if (ts == TInstant::Zero()) ts = CurrTs; TStringBuilder queryBuilder; @@ -159,12 +159,12 @@ private: << "\"myQueueCustomName\", " << ts.MilliSeconds() << ", " << "\"myFolder\", " - << "\"" << tags << "\"" + << (tags.Defined() ? "\"" + tags.GetRef() + "\"" : "NULL") << ");"; ExecDataQuery(queryBuilder.c_str()); } - void AddQueuesBatch(const TString& account, const TString& queueNameBase, ui64 count, ui64 startIndex = 0, TString tags = "{}") { + void AddQueuesBatch(const TString& account, const TString& queueNameBase, ui64 count, ui64 startIndex = 0, TMaybe<TString> tags = "{}") { Cerr << "===Started add queue batch\n"; TDeque<NYdb::NTable::TAsyncDataQueryResult> results; ui64 maxInflight = 1; @@ -181,7 +181,7 @@ private: << "\"myQueueCustomName\", " << CurrTs.MilliSeconds() << ", " << "\"myFolder\", " - << "\"" << tags << "\"" + << (tags.Defined() ? "\"" + tags.GetRef() + "\"" : "NULL") << ");"; auto preparedResult = session.PrepareDataQuery(queryBuilder.c_str()).GetValueSync(); @@ -323,41 +323,56 @@ private: TTestRunner("CreateIndexProcessor", this); } - void TestSingleCreateQueueEvent() { + void CheckSingleCreateQueueEvent(bool nullLabels) { TTestRunner runner{"SingleCreateQueueEvent", this}; const TString labels = "{\"k1\": \"v1\"}"; const TString escapedLabels = EscapeC(labels); - runner.AddEvent( "cloud1", "queue1", EEvType::Created, {}, escapedLabels); + runner.AddEvent("cloud1", "queue1", EEvType::Created, {}, nullLabels ? Nothing() : TMaybe<TString>(escapedLabels)); runner.DispatchEvents(); auto messages = runner.EventsWriter->GetMessages(); UNIT_ASSERT_VALUES_EQUAL(messages.size(), 2); // Events, reindex - CheckEventsLine(messages[0], EEvType::Created, {}, labels); - CheckEventsLine(messages[1], EEvType::Existed, {}, labels); + CheckEventsLine(messages[0], EEvType::Created, {}, nullLabels ? "{}" : labels); + CheckEventsLine(messages[1], EEvType::Existed, {}, nullLabels ? "{}" : labels); UNIT_ASSERT_VALUES_EQUAL(runner.CountEvents(), 0); } - void TestReindexSingleQueue() { + void TestSingleCreateQueueEvent() { + CheckSingleCreateQueueEvent(false); + CheckSingleCreateQueueEvent(true); + } + + void CheckReindexSingleQueue(bool nullLabels) { TTestRunner runner{"ReindexSingleQueue", this}; const TString labels = "{\"k1\": \"v1\"}"; const TString escapedLabels = EscapeC(labels); - runner.AddQueue("cloud1", "queue1", {}, escapedLabels); + runner.AddQueue("cloud1", "queue1", {}, nullLabels ? Nothing() : TMaybe<TString>(escapedLabels)); runner.DispatchEvents(); auto messages = runner.EventsWriter->GetMessages(); UNIT_ASSERT_VALUES_EQUAL(messages.size(), 1); - CheckEventsLine(messages[0], EEvType::Existed, {}, labels); + CheckEventsLine(messages[0], EEvType::Existed, {}, nullLabels ? "{}" : labels); } - void TestDeletedQueueNotReindexed() { + void TestReindexSingleQueue() { + CheckReindexSingleQueue(false); + CheckReindexSingleQueue(true); + } + + void CheckDeletedQueueNotReindexed(bool nullLabels) { TTestRunner runner{"DeletedQueueNotReindexed", this}; const TString labels = "{\"k1\": \"v1\"}"; const TString escapedLabels = EscapeC(labels); - runner.AddQueue("cloud1", "queue2", runner.PrevTs, escapedLabels); - runner.AddEvent("cloud1", "queue2", EEvType::Deleted, {}, escapedLabels); + runner.AddQueue("cloud1", "queue2", runner.PrevTs, nullLabels ? Nothing() : TMaybe<TString>(escapedLabels)); + runner.AddEvent("cloud1", "queue2", EEvType::Deleted, {}, nullLabels ? Nothing() : TMaybe<TString>(escapedLabels)); Sleep(TDuration::Seconds(1)); runner.DispatchEvents(); auto messages = runner.EventsWriter->GetMessages(); UNIT_ASSERT_VALUES_EQUAL(messages.size(), 1); - CheckEventsLine(messages[0], EEvType::Deleted, {}, labels); + CheckEventsLine(messages[0], EEvType::Deleted, {}, nullLabels ? "{}" : labels); + } + + void TestDeletedQueueNotReindexed() { + CheckDeletedQueueNotReindexed(false); + CheckDeletedQueueNotReindexed(true); } void TestManyMessages() { diff --git a/ydb/core/ymq/queues/common/queries.cpp b/ydb/core/ymq/queues/common/queries.cpp index cff1815af5..99e2f66010 100644 --- a/ydb/core/ymq/queues/common/queries.cpp +++ b/ydb/core/ymq/queues/common/queries.cpp @@ -96,11 +96,11 @@ extern const char* const MatchQueueAttributesQuery = R"__( (And (And (And (Equal (Member queuesRead 'Shards) shards) - (Equal (Member queuesRead 'Tags) tags)) + (Equal (Coalesce (Member queuesRead 'Tags) (Utf8String '"{}")) tags)) (Equal (Member queuesRead 'Partitions) partitions)) (Equal (Member queuesRead 'FifoQueue) fifo)) (Equal (Coalesce (Member queuesRead 'DlqName) (Utf8String '"")) dlqName)) - (Bool 'true))) + (Bool 'false))) (let attrRow '( )__" ATTRS_KEYS_PARAM R"__( @@ -122,7 +122,7 @@ extern const char* const MatchQueueAttributesQuery = R"__( (Equal (Member attrRead 'MessageRetentionPeriod) retention))) (Equal (Member attrRead 'VisibilityTimeout) visibility)) (Equal (Coalesce (Member attrRead 'MaxReceiveCount) (Uint64 '0)) maxReceiveCount)) - (Bool 'true))) + (Bool 'false))) (let sameVersion (Equal currentVersion expectedVersion)) |