diff options
author | alexvru <alexvru@ydb.tech> | 2022-09-07 10:04:52 +0300 |
---|---|---|
committer | alexvru <alexvru@ydb.tech> | 2022-09-07 10:04:52 +0300 |
commit | 3187d8a424b58ab2c83831abec061ad5ac3f3e64 (patch) | |
tree | 9ae3686bb4008b31b9aa4bc17b9976977fd5cd67 | |
parent | de0b37fee68ce7ed01fa9bd894573880926acd7c (diff) | |
download | ydb-3187d8a424b58ab2c83831abec061ad5ac3f3e64.tar.gz |
Fix empty resolve result query
-rw-r--r-- | ydb/core/blob_depot/agent/agent.cpp | 2 | ||||
-rw-r--r-- | ydb/core/blob_depot/agent/agent_impl.h | 2 | ||||
-rw-r--r-- | ydb/core/blob_depot/agent/blob_mapping_cache.cpp | 44 | ||||
-rw-r--r-- | ydb/core/blob_depot/agent/blob_mapping_cache.h | 4 | ||||
-rw-r--r-- | ydb/core/blob_depot/agent/comm.cpp | 1 | ||||
-rw-r--r-- | ydb/core/blob_depot/agent/storage_discover.cpp | 2 | ||||
-rw-r--r-- | ydb/core/blob_depot/agent/storage_get.cpp | 2 | ||||
-rw-r--r-- | ydb/core/blobstorage/ut_blobstorage/blob_depot_event_managers.cpp | 40 | ||||
-rw-r--r-- | ydb/core/util/stlog.h | 27 |
9 files changed, 74 insertions, 50 deletions
diff --git a/ydb/core/blob_depot/agent/agent.cpp b/ydb/core/blob_depot/agent/agent.cpp index b16abfc0dc9..ac492180798 100644 --- a/ydb/core/blob_depot/agent/agent.cpp +++ b/ydb/core/blob_depot/agent/agent.cpp @@ -27,7 +27,7 @@ namespace NKikimr::NBlobDepot { void TBlobDepotAgent::Bootstrap() { Become(&TThis::StateFunc); - if (TabletId) { + if (TabletId && TabletId != Max<ui64>()) { ConnectToBlobDepot(); } } diff --git a/ydb/core/blob_depot/agent/agent_impl.h b/ydb/core/blob_depot/agent/agent_impl.h index b679bd01f22..d0c97d7dcbe 100644 --- a/ydb/core/blob_depot/agent/agent_impl.h +++ b/ydb/core/blob_depot/agent/agent_impl.h @@ -133,7 +133,7 @@ namespace NKikimr::NBlobDepot { Y_VERIFY(info->BlobDepotId); if (TabletId != *info->BlobDepotId) { TabletId = *info->BlobDepotId; - if (TabletId) { + if (TabletId && TabletId != Max<ui64>()) { ConnectToBlobDepot(); } diff --git a/ydb/core/blob_depot/agent/blob_mapping_cache.cpp b/ydb/core/blob_depot/agent/blob_mapping_cache.cpp index e6427a3ce87..cb20a439d58 100644 --- a/ydb/core/blob_depot/agent/blob_mapping_cache.cpp +++ b/ydb/core/blob_depot/agent/blob_mapping_cache.cpp @@ -2,23 +2,47 @@ namespace NKikimr::NBlobDepot { - void TBlobDepotAgent::TBlobMappingCache::HandleResolveResult(const NKikimrBlobDepot::TEvResolveResult& msg) { - for (const auto& item : msg.GetResolvedKeys()) { - TString key = item.GetKey(); + struct TResolveContext : TRequestContext { + TString Key; + + TResolveContext(TString key) + : Key(std::move(key)) + {} + }; + + void TBlobDepotAgent::TBlobMappingCache::HandleResolveResult(const NKikimrBlobDepot::TEvResolveResult& msg, + TRequestContext::TPtr context) { + STLOG(PRI_DEBUG, BLOB_DEPOT, BDA28, "HandleResolveResult", (VirtualGroupId, Agent.VirtualGroupId), (Msg, msg)); + + auto process = [&](TString key, const NKikimrBlobDepot::TEvResolveResult::TResolvedKey *item) { const auto [it, inserted] = Cache.try_emplace(std::move(key)); auto& entry = it->second; if (inserted) { entry.Key = it->first; } - entry.Values = item.GetValueChain(); + if (item) { + Y_VERIFY(it->first == item->GetKey()); + entry.Values = item->GetValueChain(); + } else { + entry.Values.Clear(); + } Queue.PushBack(&entry); - entry.ResolveInFlight = false; - for (TQueryWaitingForKey& item : std::exchange(entry.QueriesWaitingForKey, {})) { - Agent.OnRequestComplete(item.Id, TKeyResolved{&entry.Values}, Agent.OtherRequestInFlight); + Agent.OnRequestComplete(item.Id, TKeyResolved{entry.Values.empty() ? nullptr : &entry.Values}, + Agent.OtherRequestInFlight); + } + }; + + for (const auto& item : msg.GetResolvedKeys()) { + process(item.GetKey(), &item); + if (context && context->Obtain<TResolveContext>().Key == item.GetKey()) { + context.reset(); } } + if (context) { + process(context->Obtain<TResolveContext>().Key, nullptr); + } } const TResolvedValueChain *TBlobDepotAgent::TBlobMappingCache::ResolveKey(TString key, TQuery *query, @@ -45,7 +69,7 @@ namespace NKikimr::NBlobDepot { item->SetTabletId(id.TabletID()); } - Agent.Issue(std::move(msg), this, nullptr); + Agent.Issue(std::move(msg), this, std::make_unique<TResolveContext>(it->first)); } const ui64 id = Agent.NextRequestId++; @@ -58,9 +82,9 @@ namespace NKikimr::NBlobDepot { return nullptr; } - void TBlobDepotAgent::TBlobMappingCache::ProcessResponse(ui64 /*tag*/, TRequestContext::TPtr /*context*/, TResponse response) { + void TBlobDepotAgent::TBlobMappingCache::ProcessResponse(ui64 /*tag*/, TRequestContext::TPtr context, TResponse response) { if (auto *p = std::get_if<TEvBlobDepot::TEvResolveResult*>(&response)) { - HandleResolveResult((*p)->Record); + HandleResolveResult((*p)->Record, std::move(context)); } else if (std::holds_alternative<TTabletDisconnected>(response)) { for (auto& [key, entry] : Cache) { if (entry.ResolveInFlight) { diff --git a/ydb/core/blob_depot/agent/blob_mapping_cache.h b/ydb/core/blob_depot/agent/blob_mapping_cache.h index 3309bc97999..989bde19041 100644 --- a/ydb/core/blob_depot/agent/blob_mapping_cache.h +++ b/ydb/core/blob_depot/agent/blob_mapping_cache.h @@ -33,9 +33,9 @@ namespace NKikimr::NBlobDepot { : TRequestSender(agent) {} - void HandleResolveResult(const NKikimrBlobDepot::TEvResolveResult& msg); + void HandleResolveResult(const NKikimrBlobDepot::TEvResolveResult& msg, TRequestContext::TPtr context); const TResolvedValueChain *ResolveKey(TString key, TQuery *query, TRequestContext::TPtr context); - void ProcessResponse(ui64 /*tag*/, TRequestContext::TPtr /*context*/, TResponse response); + void ProcessResponse(ui64 /*tag*/, TRequestContext::TPtr /*context*/, TResponse response) override; }; } // NKikimr::NBlobDepot diff --git a/ydb/core/blob_depot/agent/comm.cpp b/ydb/core/blob_depot/agent/comm.cpp index 1436ff38d69..2ed57bc8673 100644 --- a/ydb/core/blob_depot/agent/comm.cpp +++ b/ydb/core/blob_depot/agent/comm.cpp @@ -17,6 +17,7 @@ namespace NKikimr::NBlobDepot { } void TBlobDepotAgent::ConnectToBlobDepot() { + Y_VERIFY(!PipeId); PipeId = Register(NTabletPipe::CreateClient(SelfId(), TabletId, NTabletPipe::TClientRetryPolicy::WithRetries())); const ui64 id = NextRequestId++; STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA05, "ConnectToBlobDepot", (VirtualGroupId, VirtualGroupId), diff --git a/ydb/core/blob_depot/agent/storage_discover.cpp b/ydb/core/blob_depot/agent/storage_discover.cpp index 1ba881e64b2..2055a48ec76 100644 --- a/ydb/core/blob_depot/agent/storage_discover.cpp +++ b/ydb/core/blob_depot/agent/storage_discover.cpp @@ -96,7 +96,7 @@ namespace NKikimr::NBlobDepot { STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA19, "HandleResolveResult", (VirtualGroupId, Agent.VirtualGroupId), (QueryId, QueryId), (Msg, msg.Record)); - Agent.BlobMappingCache.HandleResolveResult(msg.Record); + Agent.BlobMappingCache.HandleResolveResult(msg.Record, nullptr); const NKikimrProto::EReplyStatus status = msg.Record.GetStatus(); if (status != NKikimrProto::OK && status != NKikimrProto::OVERRUN) { diff --git a/ydb/core/blob_depot/agent/storage_get.cpp b/ydb/core/blob_depot/agent/storage_get.cpp index 2ee16378740..4f88bc947f7 100644 --- a/ydb/core/blob_depot/agent/storage_get.cpp +++ b/ydb/core/blob_depot/agent/storage_get.cpp @@ -56,6 +56,8 @@ namespace NKikimr::NBlobDepot { bool ProcessSingleResult(ui32 queryIdx, const TResolvedValueChain *value) { auto& msg = GetQuery(); + STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA27, "ProcessSingleResult", (VirtualGroupId, Agent.VirtualGroupId), + (QueryId, GetQueryId()), (QueryIdx, queryIdx), (Value, value)); if (!value) { Response->Responses[queryIdx].Status = NKikimrProto::NODATA; diff --git a/ydb/core/blobstorage/ut_blobstorage/blob_depot_event_managers.cpp b/ydb/core/blobstorage/ut_blobstorage/blob_depot_event_managers.cpp index 1bde6436202..1ea731ca2b8 100644 --- a/ydb/core/blobstorage/ut_blobstorage/blob_depot_event_managers.cpp +++ b/ydb/core/blobstorage/ut_blobstorage/blob_depot_event_managers.cpp @@ -129,17 +129,9 @@ void SendTEvGet(TEnvironmentSetup& env, TActorId sender, ui32 groupId, TLogoBlob } TAutoPtr<TEventHandle<TEvBlobStorage::TEvGetResult>> CaptureTEvGetResult(TEnvironmentSetup& env, TActorId sender, bool termOnCapture, bool withDeadline) { - TInstant deadline = TInstant::Max(); - if (withDeadline) { - env.Runtime->WrapInActorContext(sender, [&] { - deadline = TActivationContext::Now() + TDuration::Seconds(1); - }); - } - + const TInstant deadline = withDeadline ? env.Runtime->GetClock() + TDuration::Seconds(10) : TInstant::Max(); auto res = env.WaitForEdgeActorEvent<TEvBlobStorage::TEvGetResult>(sender, termOnCapture, deadline); - // if (res.Get() == nullptr) { Cerr << "TEvGet didn't return" << Endl; return nullptr; } // <- Temporary solution - UNIT_ASSERT(res); #ifdef LOG_GET @@ -227,17 +219,9 @@ void SendTEvGet(TEnvironmentSetup& env, TActorId sender, ui32 groupId, std::vect } TAutoPtr<TEventHandle<TEvBlobStorage::TEvGetResult>> CaptureMultiTEvGetResult(TEnvironmentSetup& env, TActorId sender, bool termOnCapture, bool withDeadline) { - TInstant deadline = TInstant::Max(); - if (withDeadline) { - env.Runtime->WrapInActorContext(sender, [&] { - deadline = TActivationContext::Now() + TDuration::Seconds(1); - }); - } - + const TInstant deadline = withDeadline ? env.Runtime->GetClock() + TDuration::Seconds(10) : TInstant::Max(); auto res = env.WaitForEdgeActorEvent<TEvBlobStorage::TEvGetResult>(sender, termOnCapture, deadline); - // if (!res) { Cerr << "TEvDiscover didn't return" << Endl; return nullptr; } // <- Temporary Solution - UNIT_ASSERT(res); #ifdef LOG_MULTIGET @@ -313,13 +297,7 @@ void SendTEvRange(TEnvironmentSetup& env, TActorId sender, ui32 groupId, ui64 ta } TAutoPtr<TEventHandle<TEvBlobStorage::TEvRangeResult>> CaptureTEvRangeResult(TEnvironmentSetup& env, TActorId sender, bool termOnCapture, bool withDeadline) { - TInstant deadline = TInstant::Max(); - if (withDeadline) { - env.Runtime->WrapInActorContext(sender, [&] { - deadline = TActivationContext::Now() + TDuration::Seconds(1); - }); - } - + const TInstant deadline = withDeadline ? env.Runtime->GetClock() + TDuration::Seconds(10) : TInstant::Max(); auto res = env.WaitForEdgeActorEvent<TEvBlobStorage::TEvRangeResult>(sender, termOnCapture, deadline); #ifdef LOG_RANGE @@ -414,17 +392,9 @@ void SendTEvDiscover(TEnvironmentSetup& env, TActorId sender, ui32 groupId, ui64 } TAutoPtr<TEventHandle<TEvBlobStorage::TEvDiscoverResult>> CaptureTEvDiscoverResult(TEnvironmentSetup& env, TActorId sender, bool termOnCapture, bool withDeadline) { - TInstant deadline = TInstant::Max(); - if (withDeadline) { - env.Runtime->WrapInActorContext(sender, [&] { - deadline = TActivationContext::Now() + TDuration::Seconds(1); - }); - } - + const TInstant deadline = withDeadline ? env.Runtime->GetClock() + TDuration::Seconds(10) : TInstant::Max(); auto res = env.WaitForEdgeActorEvent<TEvBlobStorage::TEvDiscoverResult>(sender, termOnCapture, deadline); - // if (!res) { Cerr << "TEvDiscover didn't return" << Endl; return nullptr; } // <- Temporary Solution - UNIT_ASSERT_C(res, "Timeout - no TEvDiscoverResult received"); #ifdef LOG_DISCOVER @@ -689,4 +659,4 @@ void VerifiedBlock(TEnvironmentSetup& env, ui32 nodeId, ui32 groupId, ui64 table SendTEvBlock(env, sender, groupId, tabletId, generation); auto res = CaptureTEvBlockResult(env, sender, true); VerifyTEvBlockResult(res.Release(), tabletId, generation, state); -}
\ No newline at end of file +} diff --git a/ydb/core/util/stlog.h b/ydb/core/util/stlog.h index de6cea21310..dfe8fa79502 100644 --- a/ydb/core/util/stlog.h +++ b/ydb/core/util/stlog.h @@ -98,6 +98,13 @@ namespace NKikimr::NStLog { static constexpr bool value = decltype(check<T>(0))::value; }; + template<typename T> struct TIsIterable { static constexpr bool value = false; }; + template<typename T, typename Y> struct TIsIterable<std::deque<T, Y>> { static constexpr bool value = true; }; + template<typename T, typename Y> struct TIsIterable<std::list<T, Y>> { static constexpr bool value = true; }; + template<typename T, typename Y> struct TIsIterable<std::vector<T, Y>> { static constexpr bool value = true; }; + template<typename T> struct TIsIterable<NProtoBuf::RepeatedField<T>> { static constexpr bool value = true; }; + template<typename T> struct TIsIterable<NProtoBuf::RepeatedPtrField<T>> { static constexpr bool value = true; }; + template<typename Base, typename T> class TBoundParam : public Base { T Value; @@ -147,6 +154,26 @@ namespace NKikimr::NStLog { } } else if constexpr (THasToStringMethod<TValue>::value) { s << value.ToString(); + } else if constexpr (std::is_pointer_v<TValue> && !std::is_same_v<std::remove_cv_t<std::remove_pointer_t<TValue>>, char>) { + if (value) { + OutputParam(s, *value); + } else { + s << "<null>"; + } + } else if constexpr (TIsIterable<TValue>::value) { + auto begin = std::begin(value); + auto end = std::end(value); + bool first = true; + s << "["; + for (; begin != end; ++begin) { + if (first) { + first = false; + } else { + s << " "; + } + OutputParam(s, *begin); + } + s << "]"; } else { s << value; } |