diff options
author | alexvru <alexvru@ydb.tech> | 2023-01-24 11:19:11 +0300 |
---|---|---|
committer | alexvru <alexvru@ydb.tech> | 2023-01-24 11:19:11 +0300 |
commit | 2c3b726f8b11cffca0dcb9a41aa14c7647e8f837 (patch) | |
tree | 284cbad3b8d8c33e906758a6d42f0ac33ab89179 | |
parent | 54fe5017c76997ad63a4d6031b8f662504e271a5 (diff) | |
download | ydb-2c3b726f8b11cffca0dcb9a41aa14c7647e8f837.tar.gz |
Fix BlobDepot and unit tests
-rw-r--r-- | ydb/core/blob_depot/agent/agent.cpp | 3 | ||||
-rw-r--r-- | ydb/core/blob_depot/agent/agent_impl.h | 7 | ||||
-rw-r--r-- | ydb/core/blob_depot/agent/blob_mapping_cache.cpp | 4 | ||||
-rw-r--r-- | ydb/core/blob_depot/agent/blocks.cpp | 8 | ||||
-rw-r--r-- | ydb/core/blob_depot/agent/comm.cpp | 22 | ||||
-rw-r--r-- | ydb/core/blob_depot/agent/query.cpp | 10 | ||||
-rw-r--r-- | ydb/core/blob_depot/agent/read.cpp | 17 | ||||
-rw-r--r-- | ydb/core/blob_depot/agent/request.cpp | 4 | ||||
-rw-r--r-- | ydb/core/blob_depot/agent/storage_discover.cpp | 27 | ||||
-rw-r--r-- | ydb/core/blob_depot/agent/storage_get.cpp | 35 | ||||
-rw-r--r-- | ydb/core/blob_depot/agent/storage_put.cpp | 10 | ||||
-rw-r--r-- | ydb/core/blob_depot/agent/storage_range.cpp | 58 | ||||
-rw-r--r-- | ydb/core/blob_depot/blocks.cpp | 5 | ||||
-rw-r--r-- | ydb/core/mind/bscontroller/config.cpp | 2 | ||||
-rw-r--r-- | ydb/core/mind/bscontroller/impl.h | 2 | ||||
-rw-r--r-- | ydb/core/mind/bscontroller/virtual_group.cpp | 1 |
16 files changed, 128 insertions, 87 deletions
diff --git a/ydb/core/blob_depot/agent/agent.cpp b/ydb/core/blob_depot/agent/agent.cpp index e9d8c022b6..3f02eda753 100644 --- a/ydb/core/blob_depot/agent/agent.cpp +++ b/ydb/core/blob_depot/agent/agent.cpp @@ -18,6 +18,9 @@ namespace NKikimr::NBlobDepot { if (info) { Y_VERIFY(info->BlobDepotId); TabletId = *info->BlobDepotId; + LogId = TStringBuilder() << '{' << TabletId << '@' << virtualGroupId << '}'; + } else { + LogId = TStringBuilder() << '{' << '?' << '@' << virtualGroupId << "}"; } } diff --git a/ydb/core/blob_depot/agent/agent_impl.h b/ydb/core/blob_depot/agent/agent_impl.h index fcfdd67e9d..fa79201822 100644 --- a/ydb/core/blob_depot/agent/agent_impl.h +++ b/ydb/core/blob_depot/agent/agent_impl.h @@ -131,6 +131,9 @@ namespace NKikimr::NBlobDepot { }; public: + TString LogId; + + public: static constexpr NKikimrServices::TActivity::EType ActorActivityType() { return NKikimrServices::TActivity::BLOB_DEPOT_AGENT_ACTOR; } @@ -185,6 +188,7 @@ namespace NKikimr::NBlobDepot { Y_VERIFY(info->BlobDepotId); if (TabletId != *info->BlobDepotId) { TabletId = *info->BlobDepotId; + LogId = TStringBuilder() << '{' << TabletId << '@' << VirtualGroupId << '}'; if (TabletId && TabletId != Max<ui64>()) { ConnectToBlobDepot(); } @@ -394,7 +398,7 @@ namespace NKikimr::NBlobDepot { struct TReadContext; struct TReadArg { - const NProtoBuf::RepeatedPtrField<NKikimrBlobDepot::TResolvedValueChain>& Values; + const TResolvedValueChain& Values; NKikimrBlobStorage::EGetHandleClass GetHandleClass; bool MustRestoreFirst = false; TQuery *Query = nullptr; @@ -405,6 +409,7 @@ namespace NKikimr::NBlobDepot { }; bool IssueRead(const TReadArg& arg, TString& error); + static TString GetValueChainId(const TResolvedValueChain& valueChain); void HandleGetResult(const TRequestContext::TPtr& context, TEvBlobStorage::TEvGetResult& msg); diff --git a/ydb/core/blob_depot/agent/blob_mapping_cache.cpp b/ydb/core/blob_depot/agent/blob_mapping_cache.cpp index 0acdb5ccc3..49e9d467e5 100644 --- a/ydb/core/blob_depot/agent/blob_mapping_cache.cpp +++ b/ydb/core/blob_depot/agent/blob_mapping_cache.cpp @@ -12,7 +12,7 @@ namespace NKikimr::NBlobDepot { void TBlobDepotAgent::TBlobMappingCache::HandleResolveResult(ui64 tag, const NKikimrBlobDepot::TEvResolveResult& msg, TRequestContext::TPtr context) { - STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA28, "HandleResolveResult", (VirtualGroupId, Agent.VirtualGroupId), + STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA28, "HandleResolveResult", (AgentId, Agent.LogId), (Cookie, tag), (Msg, msg)); auto process = [&](TString key, const NKikimrBlobDepot::TEvResolveResult::TResolvedKey *item) { @@ -84,7 +84,7 @@ namespace NKikimr::NBlobDepot { HandleResolveResult(tag, (*p)->Record, std::move(context)); } else if (std::holds_alternative<TTabletDisconnected>(response)) { STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA38, "TBlobMappingCache::TTabletDisconnected", - (VirtualGroupId, Agent.VirtualGroupId), (Cookie, tag)); + (AgentId, Agent.LogId), (Cookie, tag)); if (auto resolveContext = std::dynamic_pointer_cast<TResolveContext>(context)) { if (const auto it = Cache.find(resolveContext->Key); it != Cache.end() && it->second.ResolveInFlight) { for (const ui64 id : std::exchange(it->second.PendingQueries, {})) { diff --git a/ydb/core/blob_depot/agent/blocks.cpp b/ydb/core/blob_depot/agent/blocks.cpp index 490bc26795..04dab7c1eb 100644 --- a/ydb/core/blob_depot/agent/blocks.cpp +++ b/ydb/core/blob_depot/agent/blocks.cpp @@ -26,7 +26,7 @@ namespace NKikimr::NBlobDepot { if (status == NKikimrProto::UNKNOWN) { block.PendingBlockChecks.PushBack(query); } - STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA01, "CheckBlockForTablet", (VirtualGroupId, Agent.VirtualGroupId), + STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA01, "CheckBlockForTablet", (AgentId, Agent.LogId), (QueryId, query->GetQueryId()), (TabletId, tabletId), (Generation, generation), (Status, status), (Now, now), (ExpirationTimestamp, block.ExpirationTimestamp), (RefreshQueried, refreshQueried), (RefreshId, block.RefreshId)); @@ -39,7 +39,7 @@ namespace NKikimr::NBlobDepot { } else if (std::holds_alternative<TTabletDisconnected>(response)) { auto& queryBlockContext = context->Obtain<TQueryBlockContext>(); auto& block = Blocks[queryBlockContext.TabletId]; - STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA36, "TBlocksManager::TTabletDisconnected", (VirtualGroupId, Agent.VirtualGroupId), + STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA36, "TBlocksManager::TTabletDisconnected", (AgentId, Agent.LogId), (TabletId, queryBlockContext.TabletId), (RefreshId, block.RefreshId)); block.RefreshId = 0; IssueOnUpdateBlock(block); @@ -50,7 +50,7 @@ namespace NKikimr::NBlobDepot { void TBlobDepotAgent::TBlocksManager::Handle(TRequestContext::TPtr context, NKikimrBlobDepot::TEvQueryBlocksResult& msg) { auto& queryBlockContext = context->Obtain<TQueryBlockContext>(); - STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA02, "TEvQueryBlocksResult", (VirtualGroupId, Agent.VirtualGroupId), + STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA02, "TEvQueryBlocksResult", (AgentId, Agent.LogId), (Msg, msg), (TabletId, queryBlockContext.TabletId)); auto& block = Blocks[queryBlockContext.TabletId]; Y_VERIFY(block.RefreshId); @@ -95,7 +95,7 @@ namespace NKikimr::NBlobDepot { for (const auto& tablet : tablets) { if (const auto it = Blocks.find(tablet.GetTabletId()); it != Blocks.end()) { auto& block = it->second; - STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA37, "OnBlockedTablets", (VirtualGroupId, Agent.VirtualGroupId), + STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA37, "OnBlockedTablets", (AgentId, Agent.LogId), (TabletId, it->first), (RefreshId, block.RefreshId), (BlockedGeneration, tablet.GetBlockedGeneration()), (IssuerGuid, tablet.GetIssuerGuid())); block.BlockedGeneration = tablet.GetBlockedGeneration(); diff --git a/ydb/core/blob_depot/agent/comm.cpp b/ydb/core/blob_depot/agent/comm.cpp index ea5ea51235..a678898486 100644 --- a/ydb/core/blob_depot/agent/comm.cpp +++ b/ydb/core/blob_depot/agent/comm.cpp @@ -5,7 +5,7 @@ namespace NKikimr::NBlobDepot { void TBlobDepotAgent::Handle(TEvTabletPipe::TEvClientConnected::TPtr ev) { auto& msg = *ev->Get(); - STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA03, "TEvClientConnected", (VirtualGroupId, VirtualGroupId), + STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA03, "TEvClientConnected", (AgentId, LogId), (TabletId, msg.TabletId), (Status, msg.Status), (ClientId, msg.ClientId), (ServerId, msg.ServerId)); Y_VERIFY_DEBUG_S(msg.Status == NKikimrProto::OK, "Status# " << NKikimrProto::EReplyStatus_Name(msg.Status)); if (msg.Status != NKikimrProto::OK) { @@ -17,7 +17,7 @@ namespace NKikimr::NBlobDepot { void TBlobDepotAgent::Handle(TEvTabletPipe::TEvClientDestroyed::TPtr ev) { auto& msg = *ev->Get(); - STLOG(PRI_INFO, BLOB_DEPOT_AGENT, BDA04, "TEvClientDestroyed", (VirtualGroupId, VirtualGroupId), + STLOG(PRI_INFO, BLOB_DEPOT_AGENT, BDA04, "TEvClientDestroyed", (AgentId, LogId), (ClientId, msg.ClientId), (ServerId, msg.ServerId)); PipeId = PipeServerId = {}; OnDisconnect(); @@ -29,14 +29,14 @@ namespace NKikimr::NBlobDepot { PipeId = Register(NTabletPipe::CreateClient(SelfId(), TabletId, NTabletPipe::TClientRetryPolicy::WithRetries())); NextTabletRequestId = 1; const ui64 id = NextTabletRequestId++; - STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA05, "ConnectToBlobDepot", (VirtualGroupId, VirtualGroupId), + STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA05, "ConnectToBlobDepot", (AgentId, LogId), (PipeId, PipeId), (RequestId, id)); NTabletPipe::SendData(SelfId(), PipeId, new TEvBlobDepot::TEvRegisterAgent(VirtualGroupId, AgentInstanceId), id); RegisterRequest(id, this, nullptr, {}, true); } void TBlobDepotAgent::Handle(TRequestContext::TPtr /*context*/, NKikimrBlobDepot::TEvRegisterAgentResult& msg) { - STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA06, "TEvRegisterAgentResult", (VirtualGroupId, VirtualGroupId), + STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA06, "TEvRegisterAgentResult", (AgentId, LogId), (Msg, msg)); BlobDepotGeneration = msg.GetGeneration(); DecommitGroupId = msg.HasDecommitGroupId() ? std::make_optional(msg.GetDecommitGroupId()) : std::nullopt; @@ -68,7 +68,7 @@ namespace NKikimr::NBlobDepot { } for (const NKikimrBlobDepot::TChannelKind::E kind : vanishedKinds) { - STLOG(PRI_INFO, BLOB_DEPOT_AGENT, BDA07, "kind vanished", (VirtualGroupId, VirtualGroupId), (Kind, kind)); + STLOG(PRI_INFO, BLOB_DEPOT_AGENT, BDA07, "kind vanished", (AgentId, LogId), (Kind, kind)); ChannelKinds.erase(kind); } @@ -94,7 +94,7 @@ namespace NKikimr::NBlobDepot { void TBlobDepotAgent::IssueAllocateIdsIfNeeded(TChannelKind& kind) { if (!kind.IdAllocInFlight && kind.GetNumAvailableItems() < 100 && PipeId) { const ui64 id = NextTabletRequestId++; - STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA08, "IssueAllocateIdsIfNeeded", (VirtualGroupId, VirtualGroupId), + STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA08, "IssueAllocateIdsIfNeeded", (AgentId, LogId), (ChannelKind, NKikimrBlobDepot::TChannelKind::E_Name(kind.Kind)), (IdAllocInFlight, kind.IdAllocInFlight), (NumAvailableItems, kind.GetNumAvailableItems()), (RequestId, id)); @@ -121,7 +121,7 @@ namespace NKikimr::NBlobDepot { kind.IssueGivenIdRange(msg.GetGivenIdRange()); } - STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA09, "TEvAllocateIdsResult", (VirtualGroupId, VirtualGroupId), (Msg, msg), + STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA09, "TEvAllocateIdsResult", (AgentId, LogId), (Msg, msg), (NumAvailableItems, kind.GetNumAvailableItems())); } @@ -174,7 +174,7 @@ namespace NKikimr::NBlobDepot { ui64 TBlobDepotAgent::Issue(std::unique_ptr<IEventBase> ev, TRequestSender *sender, TRequestContext::TPtr context) { const ui64 id = NextTabletRequestId++; - STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA10, "Issue", (VirtualGroupId, VirtualGroupId), (RequestId, id), (Msg, ev->ToString())); + STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA10, "Issue", (AgentId, LogId), (RequestId, id), (Msg, ev->ToString())); NTabletPipe::SendData(SelfId(), PipeId, ev.release(), id); RegisterRequest(id, sender, std::move(context), {}, true); return id; @@ -182,7 +182,7 @@ namespace NKikimr::NBlobDepot { void TBlobDepotAgent::Handle(TEvBlobDepot::TEvPushNotify::TPtr ev) { auto& msg = ev->Get()->Record; - STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA11, "TEvPushNotify", (VirtualGroupId, VirtualGroupId), (Msg, msg), + STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA11, "TEvPushNotify", (AgentId, LogId), (Msg, msg), (Id, ev->Cookie), (Sender, ev->Sender), (PipeServerId, PipeServerId), (Match, ev->Sender == PipeServerId)); if (ev->Sender != PipeServerId) { return; // race with previous connection @@ -209,7 +209,7 @@ namespace NKikimr::NBlobDepot { it->ToProto(response->Record.AddWritesInFlight()); } - STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA12, "TrimChannel", (VirtualGroupId, VirtualGroupId), + STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA12, "TrimChannel", (AgentId, LogId), (Channel, int(channel)), (NumAvailableItemsBefore, numAvailableItemsBefore), (NumAvailableItemsAfter, kind.GetNumAvailableItems())); } @@ -223,7 +223,7 @@ namespace NKikimr::NBlobDepot { // it is essential to send response through the pipe -- otherwise we can break order with, for example, commits: // this message can outrun previously sent commit and lead to data loss - STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA33, "sending TEvPushNotifyResult", (VirtualGroupId, VirtualGroupId), + STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA33, "sending TEvPushNotifyResult", (AgentId, LogId), (RequestId, NextTabletRequestId)); NTabletPipe::SendData(SelfId(), PipeId, response.release(), NextTabletRequestId++); diff --git a/ydb/core/blob_depot/agent/query.cpp b/ydb/core/blob_depot/agent/query.cpp index 5dd0f18364..2e366a0a6c 100644 --- a/ydb/core/blob_depot/agent/query.cpp +++ b/ydb/core/blob_depot/agent/query.cpp @@ -92,7 +92,7 @@ namespace NKikimr::NBlobDepot { void TBlobDepotAgent::ProcessStorageEvent(std::unique_ptr<IEventHandle> ev) { TQuery *query = CreateQuery<0>(std::move(ev)); - STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA13, "new query", (VirtualGroupId, VirtualGroupId), + STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA13, "new query", (AgentId, LogId), (QueryId, query->GetQueryId()), (Name, query->GetName())); if (!TabletId) { query->EndWithError(NKikimrProto::ERROR, "group is in error state"); @@ -145,14 +145,14 @@ namespace NKikimr::NBlobDepot { TBlobDepotAgent::TQuery::~TQuery() { if (TDuration duration(TActivationContext::Monotonic() - StartTime); duration >= WatchdogDuration) { STLOG(WatchdogPriority, BLOB_DEPOT_AGENT, BDA00, "query execution took too much time", - (VirtualGroupId, Agent.VirtualGroupId), (QueryId, GetQueryId()), (Duration, duration)); + (AgentId, Agent.LogId), (QueryId, GetQueryId()), (Duration, duration)); } Agent.QueryWatchdogMap.erase(QueryWatchdogMapIter); } void TBlobDepotAgent::TQuery::CheckQueryExecutionTime(TMonotonic now) { const auto prio = std::exchange(WatchdogPriority, NLog::PRI_NOTICE); - STLOG(prio, BLOB_DEPOT_AGENT, BDA23, "query is still executing", (VirtualGroupId, Agent.VirtualGroupId), + STLOG(prio, BLOB_DEPOT_AGENT, BDA23, "query is still executing", (AgentId, Agent.LogId), (QueryId, GetQueryId()), (Duration, now - StartTime)); auto nh = Agent.QueryWatchdogMap.extract(QueryWatchdogMapIter); nh.key() = now + WatchdogDuration; @@ -160,7 +160,7 @@ namespace NKikimr::NBlobDepot { } void TBlobDepotAgent::TQuery::EndWithError(NKikimrProto::EReplyStatus status, const TString& errorReason) { - STLOG(PRI_INFO, BLOB_DEPOT_AGENT, BDA14, "query ends with error", (VirtualGroupId, Agent.VirtualGroupId), + STLOG(PRI_INFO, BLOB_DEPOT_AGENT, BDA14, "query ends with error", (AgentId, Agent.LogId), (QueryId, GetQueryId()), (Status, status), (ErrorReason, errorReason), (Duration, TActivationContext::Monotonic() - StartTime)); @@ -181,7 +181,7 @@ namespace NKikimr::NBlobDepot { } void TBlobDepotAgent::TQuery::EndWithSuccess(std::unique_ptr<IEventBase> response) { - STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA15, "query ends with success", (VirtualGroupId, Agent.VirtualGroupId), + STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA15, "query ends with success", (AgentId, Agent.LogId), (QueryId, GetQueryId()), (Response, response->ToString()), (Duration, TActivationContext::Monotonic() - StartTime)); Agent.SelfId().Send(Event->Sender, response.release(), 0, Event->Cookie); OnDestroy(true); diff --git a/ydb/core/blob_depot/agent/read.cpp b/ydb/core/blob_depot/agent/read.cpp index 43ab0ff651..c4888b6d24 100644 --- a/ydb/core/blob_depot/agent/read.cpp +++ b/ydb/core/blob_depot/agent/read.cpp @@ -58,7 +58,7 @@ namespace NKikimr::NBlobDepot { if (end <= begin || blobId.BlobSize() < end) { error = "incorrect SubrangeBegin/SubrangeEnd pair"; - STLOG(PRI_CRIT, BLOB_DEPOT_AGENT, BDA24, error, (VirtualGroupId, VirtualGroupId), (TabletId, TabletId), + STLOG(PRI_CRIT, BLOB_DEPOT_AGENT, BDA24, error, (AgentId, LogId), (TabletId, TabletId), (Values, FormatList(arg.Values))); return false; } @@ -90,7 +90,7 @@ namespace NKikimr::NBlobDepot { if (size) { error = "incorrect offset/size provided"; - STLOG(PRI_ERROR, BLOB_DEPOT_AGENT, BDA25, error, (VirtualGroupId, VirtualGroupId), (TabletId, TabletId), + STLOG(PRI_ERROR, BLOB_DEPOT_AGENT, BDA25, error, (AgentId, LogId), (TabletId, TabletId), (Offset, arg.Offset), (Size, arg.Size), (Values, FormatList(arg.Values))); return false; } @@ -129,6 +129,19 @@ namespace NKikimr::NBlobDepot { return true; } + TString TBlobDepotAgent::GetValueChainId(const TResolvedValueChain& valueChain) { + constexpr ui8 separator = 7; + TString s; + for (auto it = valueChain.begin(); it != valueChain.end(); ++it) { + if (it != valueChain.begin()) { + s += separator; + } + const bool success = it->AppendToString(&s); + Y_VERIFY_DEBUG(success); + } + return s; + } + void TBlobDepotAgent::HandleGetResult(const TRequestContext::TPtr& context, TEvBlobStorage::TEvGetResult& msg) { auto& partContext = context->Obtain<TReadContext::TPartContext>(); auto& readContext = *partContext.Read; diff --git a/ydb/core/blob_depot/agent/request.cpp b/ydb/core/blob_depot/agent/request.cpp index 0cff513036..dc830f5cec 100644 --- a/ydb/core/blob_depot/agent/request.cpp +++ b/ydb/core/blob_depot/agent/request.cpp @@ -72,7 +72,7 @@ namespace NKikimr::NBlobDepot { template<typename TEvent> void TBlobDepotAgent::HandleTabletResponse(TAutoPtr<TEventHandle<TEvent>> ev) { - STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA16, "HandleTabletResponse", (VirtualGroupId, VirtualGroupId), + STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA16, "HandleTabletResponse", (AgentId, LogId), (Id, ev->Cookie), (Type, TypeName<TEvent>()), (Sender, ev->Sender), (PipeServerId, PipeServerId), (Match, ev->Sender == PipeServerId)); if (ev->Sender == PipeServerId) { @@ -91,7 +91,7 @@ namespace NKikimr::NBlobDepot { template<typename TEvent> void TBlobDepotAgent::HandleOtherResponse(TAutoPtr<TEventHandle<TEvent>> ev) { - STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA17, "HandleOtherResponse", (VirtualGroupId, VirtualGroupId), + STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA17, "HandleOtherResponse", (AgentId, LogId), (Id, ev->Cookie), (Type, TypeName<TEvent>())); OnRequestComplete(ev->Cookie, ev->Get(), OtherRequestInFlight); } diff --git a/ydb/core/blob_depot/agent/storage_discover.cpp b/ydb/core/blob_depot/agent/storage_discover.cpp index 92b45b6b06..db1ea8f96b 100644 --- a/ydb/core/blob_depot/agent/storage_discover.cpp +++ b/ydb/core/blob_depot/agent/storage_discover.cpp @@ -16,6 +16,9 @@ namespace NKikimr::NBlobDepot { TString Buffer; ui32 BlockedGeneration = 0; + std::unordered_set<TString> ValueChainsWithNodata; + TString ValueChain; + public: using TBlobStorageQuery::TBlobStorageQuery; @@ -70,7 +73,7 @@ namespace NKikimr::NBlobDepot { } void OnUpdateBlock() override { - STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA18, "OnUpdateBlock", (VirtualGroupId, Agent.VirtualGroupId), + STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA18, "OnUpdateBlock", (AgentId, Agent.LogId), (QueryId, GetQueryId())); const auto status = Agent.BlocksManager.CheckBlockForTablet(Request.TabletId, Max<ui32>(), this, &BlockedGeneration); @@ -85,7 +88,7 @@ namespace NKikimr::NBlobDepot { } void HandleResolveResult(ui64 id, TRequestContext::TPtr context, TEvBlobDepot::TEvResolveResult& msg) { - STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA19, "HandleResolveResult", (VirtualGroupId, Agent.VirtualGroupId), + STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA19, "HandleResolveResult", (AgentId, Agent.LogId), (QueryId, GetQueryId()), (Msg, msg.Record)); Agent.BlobMappingCache.HandleResolveResult(id, msg.Record, nullptr); @@ -102,8 +105,12 @@ namespace NKikimr::NBlobDepot { return EndWithError(NKikimrProto::ERROR, TStringBuilder() << "failed to resolve blob# " << Id << ": " << item.GetErrorReason()); } - Y_VERIFY(item.ValueChainSize() == 1); if (Request.ReadBody) { + if (!item.ValueChainSize()) { + // FIXME(alexvru): hypothetically this can be considered normal and we may continue scan + return EndWithError(NKikimrProto::ERROR, TStringBuilder() << "empty ValueChain"); + } + ValueChain = GetValueChainId(item.GetValueChain()); TReadArg arg{ item.GetValueChain(), NKikimrBlobStorage::Discover, @@ -135,7 +142,7 @@ namespace NKikimr::NBlobDepot { } void OnRead(ui64 /*tag*/, NKikimrProto::EReplyStatus status, TString dataOrErrorReason) override { - STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA20, "OnRead", (VirtualGroupId, Agent.VirtualGroupId), + STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA20, "OnRead", (AgentId, Agent.LogId), (QueryId, GetQueryId()), (Status, status)); if (status == NKikimrProto::OK) { @@ -143,9 +150,15 @@ namespace NKikimr::NBlobDepot { DoneWithData = true; CheckIfDone(); } else if (status == NKikimrProto::NODATA) { - // this may indicate a data race between locator and key value, we have to restart our resolution query - IssueResolve(); - // FIXME: infinite cycle? + if (ValueChainsWithNodata.insert(std::exchange(ValueChain, {})).second) { + // this may indicate a data race between locator and key value, we have to restart our resolution query + IssueResolve(); + } else { + Y_VERIFY_DEBUG_S(false, "data is lost AgentId# " << Agent.LogId << " BlobId# " << Id); + STLOG(PRI_CRIT, BLOB_DEPOT_AGENT, BDA39, "failed to Discover blob -- data is lost", + (AgentId, Agent.LogId), (BlobId, Id)); + status = NKikimrProto::ERROR; + } } else { EndWithError(status, dataOrErrorReason); } diff --git a/ydb/core/blob_depot/agent/storage_get.cpp b/ydb/core/blob_depot/agent/storage_get.cpp index 07eb534735..a923a5ec53 100644 --- a/ydb/core/blob_depot/agent/storage_get.cpp +++ b/ydb/core/blob_depot/agent/storage_get.cpp @@ -4,14 +4,13 @@ namespace NKikimr::NBlobDepot { - static constexpr ui32 MaxNodataTryCount = 3; - template<> TBlobDepotAgent::TQuery *TBlobDepotAgent::CreateQuery<TEvBlobStorage::EvGet>(std::unique_ptr<IEventHandle> ev) { class TGetQuery : public TBlobStorageQuery<TEvBlobStorage::TEvGet> { std::unique_ptr<TEvBlobStorage::TEvGetResult> Response; ui32 AnswersRemain; - std::unordered_map<ui32, ui32> RetryCount; + std::vector<TString> ValueChainsInFlight; + std::unordered_set<std::tuple<ui64, TString>> ValueChainsWithNodata; struct TResolveKeyContext : TRequestContext { ui32 QueryIdx; @@ -36,6 +35,7 @@ namespace NKikimr::NBlobDepot { Response = std::make_unique<TEvBlobStorage::TEvGetResult>(NKikimrProto::OK, Request.QuerySize, Agent.VirtualGroupId); AnswersRemain = Request.QuerySize; + ValueChainsInFlight.resize(Request.QuerySize); if (Request.ReaderTabletData) { auto status = Agent.BlocksManager.CheckBlockForTablet(Request.ReaderTabletData->Id, Request.ReaderTabletData->Generation, this, nullptr); @@ -57,10 +57,10 @@ namespace NKikimr::NBlobDepot { if (const TResolvedValueChain *value = Agent.BlobMappingCache.ResolveKey(blobId, this, std::make_shared<TResolveKeyContext>(i))) { if (!ProcessSingleResult(i, value, std::nullopt)) { - return; + return; // error occured } } else { - STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA29, "resolve pending", (VirtualGroupId, Agent.VirtualGroupId), + STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA29, "resolve pending", (AgentId, Agent.LogId), (QueryId, GetQueryId()), (QueryIdx, i), (BlobId, query.Id)); } } @@ -69,7 +69,7 @@ namespace NKikimr::NBlobDepot { } bool ProcessSingleResult(ui32 queryIdx, const TResolvedValueChain *value, const std::optional<TString>& errorReason) { - STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA27, "ProcessSingleResult", (VirtualGroupId, Agent.VirtualGroupId), + STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA27, "ProcessSingleResult", (AgentId, Agent.LogId), (QueryId, GetQueryId()), (QueryIdx, queryIdx), (Value, value), (ErrorReason, errorReason)); auto& r = Response->Responses[queryIdx]; @@ -77,13 +77,14 @@ namespace NKikimr::NBlobDepot { if (errorReason) { r.Status = NKikimrProto::ERROR; --AnswersRemain; - } else if (!value) { + } else if (!value || value->empty()) { r.Status = NKikimrProto::NODATA; --AnswersRemain; } else if (Request.IsIndexOnly) { r.Status = NKikimrProto::OK; --AnswersRemain; } else { + ValueChainsInFlight[queryIdx] = GetValueChainId(*value); TReadArg arg{ *value, Request.GetHandleClass, @@ -110,7 +111,7 @@ namespace NKikimr::NBlobDepot { str << ']'; return str.Str(); }; - STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA34, "IssueRead", (VirtualGroupId, Agent.VirtualGroupId), + STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA34, "IssueRead", (AgentId, Agent.LogId), (Offset, arg.Offset), (Size, arg.Size), (ValueChain, makeValueChain()), (Tag, arg.Tag)); const bool success = Agent.IssueRead(arg, error); if (!success) { @@ -122,20 +123,13 @@ namespace NKikimr::NBlobDepot { } void OnRead(ui64 tag, NKikimrProto::EReplyStatus status, TString buffer) override { - STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA35, "OnRead", (VirtualGroupId, Agent.VirtualGroupId), + STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA35, "OnRead", (AgentId, Agent.LogId), (Tag, tag), (Status, status), (Buffer.size, status == NKikimrProto::OK ? buffer.size() : 0), (ErrorReason, status != NKikimrProto::OK ? buffer : "")); if (status == NKikimrProto::NODATA) { // we have to retry this read, this may be a race between blob movement const auto& q = Request.Queries[tag]; - - if (++RetryCount[tag] == MaxNodataTryCount) { - STLOG(PRI_ERROR, BLOB_DEPOT_AGENT, BDA39, "NODATA retry count exceeded for blob -- it may be lost", - (VirtualGroupId, Agent.VirtualGroupId), - (BlobId, q.Id), - (Status, status)); - status = NKikimrProto::ERROR; - } else { + if (ValueChainsWithNodata.emplace(tag, std::exchange(ValueChainsInFlight[tag], {})).second) { // real race const TResolvedValueChain *value = Agent.BlobMappingCache.ResolveKey( q.Id.AsBinaryString(), this, @@ -143,6 +137,11 @@ namespace NKikimr::NBlobDepot { true); Y_VERIFY(!value); return; + } else { + Y_VERIFY_DEBUG_S(false, "data is lost AgentId# " << Agent.LogId << " BlobId# " << q.Id); + STLOG(PRI_CRIT, BLOB_DEPOT_AGENT, BDA41, "failed to Get blob -- data is lost", + (AgentId, Agent.LogId), (BlobId, q.Id)); + status = NKikimrProto::ERROR; } } @@ -202,7 +201,7 @@ namespace NKikimr::NBlobDepot { Agent.HandleGetResult(context, **p); } else if (std::holds_alternative<TTabletDisconnected>(response)) { if (auto *resolveContext = dynamic_cast<TResolveKeyContext*>(context.get())) { - STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA26, "TTabletDisconnected", (VirtualGroupId, Agent.VirtualGroupId), + STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA26, "TTabletDisconnected", (AgentId, Agent.LogId), (QueryId, GetQueryId()), (QueryIdx, resolveContext->QueryIdx)); Response->Responses[resolveContext->QueryIdx].Status = NKikimrProto::ERROR; --AnswersRemain; diff --git a/ydb/core/blob_depot/agent/storage_put.cpp b/ydb/core/blob_depot/agent/storage_put.cpp index cf65a13cbe..8185b5d568 100644 --- a/ydb/core/blob_depot/agent/storage_put.cpp +++ b/ydb/core/blob_depot/agent/storage_put.cpp @@ -80,7 +80,7 @@ namespace NKikimr::NBlobDepot { auto& kind = it->second; std::optional<TBlobSeqId> blobSeqId = kind.Allocate(Agent); - STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA21, "allocated BlobSeqId", (VirtualGroupId, Agent.VirtualGroupId), + STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA21, "allocated BlobSeqId", (AgentId, Agent.LogId), (QueryId, GetQueryId()), (BlobSeqId, blobSeqId), (BlobId, Request.Id)); if (!blobSeqId) { return kind.EnqueueQueryWaitingForId(this); @@ -157,7 +157,7 @@ namespace NKikimr::NBlobDepot { item->ClearUncertainWrite(); } - STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA30, "IssueCommitBlobSeq", (VirtualGroupId, Agent.VirtualGroupId), + STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA30, "IssueCommitBlobSeq", (AgentId, Agent.LogId), (QueryId, GetQueryId()), (UncertainWrite, uncertainWrite), (Msg, CommitBlobSeq)); Agent.Issue(CommitBlobSeq, this, nullptr); @@ -167,7 +167,7 @@ namespace NKikimr::NBlobDepot { } void RemoveBlobSeqFromInFlight() { - STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA32, "RemoveBlobSeqFromInFlight", (VirtualGroupId, Agent.VirtualGroupId), + STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA32, "RemoveBlobSeqFromInFlight", (AgentId, Agent.LogId), (QueryId, GetQueryId())); Y_VERIFY(IsInFlight); @@ -205,7 +205,7 @@ namespace NKikimr::NBlobDepot { } void HandlePutResult(TRequestContext::TPtr /*context*/, TEvBlobStorage::TEvPutResult& msg) { - STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA22, "TEvPutResult", (VirtualGroupId, Agent.VirtualGroupId), + STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA22, "TEvPutResult", (AgentId, Agent.LogId), (QueryId, GetQueryId()), (Msg, msg)); BDEV_QUERY(BDEV11, "TEvPut_resultFromProxy", (BlobId, msg.Id), (Status, msg.Status), @@ -235,7 +235,7 @@ namespace NKikimr::NBlobDepot { } void HandleCommitBlobSeqResult(TRequestContext::TPtr /*context*/, NKikimrBlobDepot::TEvCommitBlobSeqResult& msg) { - STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA31, "TEvCommitBlobSeqResult", (VirtualGroupId, Agent.VirtualGroupId), + STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA31, "TEvCommitBlobSeqResult", (AgentId, Agent.LogId), (QueryId, GetQueryId()), (Msg, msg)); Y_VERIFY(WaitingForCommitBlobSeq); diff --git a/ydb/core/blob_depot/agent/storage_range.cpp b/ydb/core/blob_depot/agent/storage_range.cpp index 1c113adf25..4c958f3367 100644 --- a/ydb/core/blob_depot/agent/storage_range.cpp +++ b/ydb/core/blob_depot/agent/storage_range.cpp @@ -5,11 +5,17 @@ namespace NKikimr::NBlobDepot { template<> TBlobDepotAgent::TQuery *TBlobDepotAgent::CreateQuery<TEvBlobStorage::EvRange>(std::unique_ptr<IEventHandle> ev) { class TRangeQuery : public TBlobStorageQuery<TEvBlobStorage::TEvRange> { + struct TRead { + TLogoBlobID Id; + TString ValueChain; + }; + std::unique_ptr<TEvBlobStorage::TEvRangeResult> Response; ui32 ReadsInFlight = 0; ui32 ResolvesInFlight = 0; std::map<TLogoBlobID, TString> FoundBlobs; - std::vector<TLogoBlobID> Reads; + std::vector<TRead> Reads; + std::unordered_set<std::tuple<ui64, TString>> ValueChainsWithNodata; // processed value chains with this status bool Reverse = false; bool Finished = false; @@ -52,7 +58,7 @@ namespace NKikimr::NBlobDepot { void IssueResolve(ui64 tag) { NKikimrBlobDepot::TEvResolve resolve; auto *item = resolve.AddItems(); - item->SetExactKey(Reads[tag].AsBinaryString()); + item->SetExactKey(Reads[tag].Id.AsBinaryString()); item->SetTabletId(Request.TabletId); item->SetMustRestoreFirst(Request.MustRestoreFirst); item->SetCookie(tag); @@ -80,28 +86,23 @@ namespace NKikimr::NBlobDepot { return EndWithError(msg.GetStatus(), msg.GetErrorReason()); } -#ifndef NDEBUG - THashSet<TLogoBlobID> ids; -#endif - for (const auto& key : msg.GetResolvedKeys()) { const TString& blobId = key.GetKey(); auto id = TLogoBlobID::FromBinary(blobId); -#ifndef NDEBUG - const bool inserted = ids.insert(id).second; - Y_VERIFY(inserted); -#endif - if (key.HasErrorReason()) { return EndWithError(NKikimrProto::ERROR, TStringBuilder() << "failed to resolve blob# " << id << ": " << key.GetErrorReason()); - } else if (!Request.IsIndexOnly) { + } else if (Request.IsIndexOnly) { + FoundBlobs.try_emplace(id); + } else if (key.ValueChainSize()) { const ui64 tag = key.HasCookie() ? key.GetCookie() : Reads.size(); + TString valueChain = GetValueChainId(key.GetValueChain()); if (tag == Reads.size()) { - Reads.push_back(id); + Reads.push_back(TRead{id, std::move(valueChain)}); } else { - Y_VERIFY(Reads[tag] == id); + Y_VERIFY(Reads[tag].Id == id); + Reads[tag].ValueChain = std::move(valueChain); } TReadArg arg{ key.GetValueChain(), @@ -118,8 +119,6 @@ namespace NKikimr::NBlobDepot { return EndWithError(NKikimrProto::ERROR, TStringBuilder() << "failed to read discovered blob: " << error); } - } else { - FoundBlobs.try_emplace(id); } } @@ -133,20 +132,33 @@ namespace NKikimr::NBlobDepot { void OnRead(ui64 tag, NKikimrProto::EReplyStatus status, TString dataOrErrorReason) override { --ReadsInFlight; + Y_VERIFY(tag < Reads.size()); + TRead& read = Reads[tag]; + Y_VERIFY(read.ValueChain); + switch (status) { case NKikimrProto::OK: { - const bool inserted = FoundBlobs.try_emplace(Reads[tag], std::move(dataOrErrorReason)).second; + Y_VERIFY(dataOrErrorReason.size() == read.Id.BlobSize()); + const bool inserted = FoundBlobs.try_emplace(read.Id, std::move(dataOrErrorReason)).second; Y_VERIFY(inserted); break; } case NKikimrProto::NODATA: - IssueResolve(tag); + if (ValueChainsWithNodata.emplace(tag, std::exchange(read.ValueChain, {})).second) { // real race + IssueResolve(tag); + } else { + Y_VERIFY_DEBUG_S(false, "data is lost AgentId# " << Agent.LogId << " BlobId# " << read.Id); + STLOG(PRI_CRIT, BLOB_DEPOT_AGENT, BDA40, "failed to ReadRange blob -- data is lost", + (AgentId, Agent.LogId), (BlobId, read.Id)); + return EndWithError(status, TStringBuilder() << "failed to retrieve BlobId# " + << read.Id << " data is lost"); + } break; default: return EndWithError(status, TStringBuilder() << "failed to retrieve BlobId# " - << Reads[tag] << " Error# " << dataOrErrorReason); + << read.Id << " Error# " << dataOrErrorReason); } CheckAndFinish(); @@ -155,12 +167,8 @@ namespace NKikimr::NBlobDepot { void CheckAndFinish() { if (!ReadsInFlight && !ResolvesInFlight && !Finished) { for (auto& [id, buffer] : FoundBlobs) { - if (!Request.IsIndexOnly) { - Y_VERIFY_S(buffer.size() == id.BlobSize(), "Id# " << id << " Buffer.size# " << buffer.size()); - } - if (buffer || Request.IsIndexOnly) { - Response->Responses.emplace_back(id, std::move(buffer)); - } + Y_VERIFY_S(buffer.size() == Request.IsIndexOnly ? 0 : id.BlobSize(), "Id# " << id << " Buffer.size# " << buffer.size()); + Response->Responses.emplace_back(id, std::move(buffer)); } if (Reverse) { std::reverse(Response->Responses.begin(), Response->Responses.end()); diff --git a/ydb/core/blob_depot/blocks.cpp b/ydb/core/blob_depot/blocks.cpp index a5ee2c14ba..001d5890eb 100644 --- a/ydb/core/blob_depot/blocks.cpp +++ b/ydb/core/blob_depot/blocks.cpp @@ -121,7 +121,7 @@ namespace NKikimr::NBlobDepot { // skip the origin agent continue; } - if (info.ExpirationTimestamp <= now) { // includes case when agent is connected right now + if (now < info.ExpirationTimestamp) { // includes case when agent is connected right now TAgent& agent = Self->GetAgent(agentId); // enqueue push notification @@ -134,7 +134,6 @@ namespace NKikimr::NBlobDepot { // add node to wait list; also start timer to remove this node from the wait queue NodesWaitingForPushResult.insert(agentId); - Y_VERIFY(info.ExpirationTimestamp <= now); TActivationContext::Schedule(info.ExpirationTimestamp, new IEventHandle(TEvPrivate::EvCheckWaitingNode, 0, SelfId(), {}, nullptr, agentId)); @@ -161,7 +160,7 @@ namespace NKikimr::NBlobDepot { if (NodesWaitingForPushResult.contains(agentId)) { const TMonotonic now = TActivationContext::Monotonic(); const auto& info = Self->BlocksManager->Blocks[TabletId].PerAgentInfo[agentId]; - if (info.ExpirationTimestamp <= now) { // node still can write data for this tablet, reschedule timer + if (now < info.ExpirationTimestamp) { // node still can write data for this tablet, reschedule timer TActivationContext::Schedule(info.ExpirationTimestamp, new IEventHandle(TEvPrivate::EvCheckWaitingNode, 0, SelfId(), {}, nullptr, agentId)); } else { diff --git a/ydb/core/mind/bscontroller/config.cpp b/ydb/core/mind/bscontroller/config.cpp index b123a57e2d..1c9d6663f4 100644 --- a/ydb/core/mind/bscontroller/config.cpp +++ b/ydb/core/mind/bscontroller/config.cpp @@ -295,7 +295,7 @@ namespace NKikimr::NBsController { continue; } auto& group = overlay->second; - if (base->second->Generation != group->Generation || group->MoodChanged) { + if ((base->second->Generation != group->Generation || group->MoodChanged) && group->VDisksInGroup) { // process only groups with changed content; create topology for group auto& topology = *group->Topology; // fill in vector of failed disks (that are not fully operational) diff --git a/ydb/core/mind/bscontroller/impl.h b/ydb/core/mind/bscontroller/impl.h index 65ca1b30a8..e848848d3d 100644 --- a/ydb/core/mind/bscontroller/impl.h +++ b/ydb/core/mind/bscontroller/impl.h @@ -528,7 +528,7 @@ public: const ui32 NumVDisksPerFailDomain = 0; // topology according to the geometry - const std::shared_ptr<TBlobStorageGroupInfo::TTopology> Topology; + std::shared_ptr<TBlobStorageGroupInfo::TTopology> Topology; struct TGroupStatus { // status derived from the actual state of VDisks (IsReady() to be exact) diff --git a/ydb/core/mind/bscontroller/virtual_group.cpp b/ydb/core/mind/bscontroller/virtual_group.cpp index 63a00cba73..5e25b1d46b 100644 --- a/ydb/core/mind/bscontroller/virtual_group.cpp +++ b/ydb/core/mind/bscontroller/virtual_group.cpp @@ -528,6 +528,7 @@ namespace NKikimr::NBsController { group->VDisksInGroup.clear(); group->DecommitStatus = NKikimrBlobStorage::TGroupDecommitStatus::DONE; group->ContentChanged = true; + group->Topology = std::make_shared<TBlobStorageGroupInfo::TTopology>(group->Topology->GType, 0, 0, 0); } STLOG(PRI_INFO, BS_CONTROLLER, BSCVGxx, "decommission update processed", (Status, Status), |