diff options
author | alexvru <[email protected]> | 2023-09-28 17:38:09 +0300 |
---|---|---|
committer | alexvru <[email protected]> | 2023-09-28 18:15:01 +0300 |
commit | e0c2bf53d562d569da2da261bf16192f270a24b1 (patch) | |
tree | 06bfdd86554111deeb660d255d6028e3857cbf1a | |
parent | ff1b1bc5292f7765c9e6c084291ca72d82f6b551 (diff) |
Fix decommit machinery in BlobDepot KIKIMR-14867
-rw-r--r-- | ydb/core/blob_depot/agent.cpp | 13 | ||||
-rw-r--r-- | ydb/core/blob_depot/agent/storage_discover.cpp | 5 | ||||
-rw-r--r-- | ydb/core/blob_depot/blob_depot.cpp | 50 | ||||
-rw-r--r-- | ydb/core/blob_depot/blob_depot_tablet.h | 4 | ||||
-rw-r--r-- | ydb/core/blob_depot/data_trash.cpp | 2 | ||||
-rw-r--r-- | ydb/core/blobstorage/dsproxy/dsproxy_assimilate.cpp | 37 |
6 files changed, 81 insertions, 30 deletions
diff --git a/ydb/core/blob_depot/agent.cpp b/ydb/core/blob_depot/agent.cpp index 5e59be28a0f..8deb40da82e 100644 --- a/ydb/core/blob_depot/agent.cpp +++ b/ydb/core/blob_depot/agent.cpp @@ -220,17 +220,10 @@ namespace NKikimr::NBlobDepot { if (!ReadyForAgentQueries()) { return; } - for (auto& [pipeServerId, info] : PipeServers) { - if (info.ProcessThroughQueue) { - if (info.PostponeQ.empty()) { - info.ProcessThroughQueue = false; - } else { - for (auto& ev : std::exchange(info.PostponeQ, {})) { - TActivationContext::Send(ev.release()); - } - TActivationContext::Send(new IEventHandle(TEvPrivate::EvProcessRegisterAgentQ, 0, SelfId(), {}, nullptr, 0)); - } + for (auto& ev : std::exchange(info.PostponeQ, {})) { + TActivationContext::Send(ev.release()); + ++info.InFlightDeliveries; } } } diff --git a/ydb/core/blob_depot/agent/storage_discover.cpp b/ydb/core/blob_depot/agent/storage_discover.cpp index 9e5c29c97b8..bfa14780bf5 100644 --- a/ydb/core/blob_depot/agent/storage_discover.cpp +++ b/ydb/core/blob_depot/agent/storage_discover.cpp @@ -58,6 +58,7 @@ namespace NKikimr::NBlobDepot { } void IssueResolve() { + STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA49, "IssueResolve", (AgentId, Agent.LogId), (QueryId, GetQueryId())); Agent.Issue(Resolve, this, nullptr); } @@ -65,8 +66,12 @@ namespace NKikimr::NBlobDepot { if (std::holds_alternative<TTabletDisconnected>(response)) { return EndWithError(NKikimrProto::ERROR, "BlobDepot tablet disconnected"); } else if (auto *p = std::get_if<TEvBlobStorage::TEvGetResult*>(&response)) { + STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA50, "TEvGetResult", (AgentId, Agent.LogId), + (QueryId, GetQueryId()), (Response, *p)); TQuery::HandleGetResult(context, **p); } else if (auto *p = std::get_if<TEvBlobDepot::TEvResolveResult*>(&response)) { + STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA51, "TEvResolveResult", (AgentId, Agent.LogId), + (QueryId, GetQueryId()), (Response, (*p)->Record)); if (context) { TQuery::HandleResolveResult(std::move(context), **p); } else { diff --git a/ydb/core/blob_depot/blob_depot.cpp b/ydb/core/blob_depot/blob_depot.cpp index c3bb44b7813..8caeb91f777 100644 --- a/ydb/core/blob_depot/blob_depot.cpp +++ b/ydb/core/blob_depot/blob_depot.cpp @@ -46,6 +46,27 @@ namespace NKikimr::NBlobDepot { STFUNC(TBlobDepot::StateWork) { try { + auto handleDelivery = [this](auto& ev) { + const auto it = PipeServers.find(ev->Recipient); + if (it == PipeServers.end()) { + return; + } + auto& info = it->second; + + Y_VERIFY(info.InFlightDeliveries); + --info.InFlightDeliveries; + + // return original event type + ev->Rewrite(ev->Type, ev->GetRecipientRewrite()); + + // ensure correct ordering of incoming messages + Y_VERIFY_S(ev->Cookie == info.NextExpectedMsgId, "message reordering detected Cookie# " << ev->Cookie + << " NextExpectedMsgId# " << info.NextExpectedMsgId << " Type# " << Sprintf("%08" PRIx32, + ev->GetTypeRewrite()) << " Id# " << GetLogId()); + ++info.NextExpectedMsgId; + HandleFromAgent(ev); + }; + auto handleFromAgentPipe = [this](auto& ev) { const auto it = PipeServers.find(ev->Recipient); if (it == PipeServers.end()) { @@ -54,20 +75,21 @@ namespace NKikimr::NBlobDepot { auto& info = it->second; STLOG(PRI_DEBUG, BLOB_DEPOT, BDT69, "HandleFromAgentPipe", (Id, GetLogId()), (RequestId, ev->Cookie), - (Sender, ev->Sender), (PipeServerId, ev->Recipient), (ProcessThroughQueue, info.ProcessThroughQueue), - (NextExpectedMsgId, info.NextExpectedMsgId), (PostponeQ.size, info.PostponeQ.size())); + (Sender, ev->Sender), (PipeServerId, ev->Recipient), (NextExpectedMsgId, info.NextExpectedMsgId), + (PostponeQ.size, info.PostponeQ.size()), (InFlightDeliveries, info.InFlightDeliveries), + (ReadyForAgentQueries, ReadyForAgentQueries()), (Type, ev->Type)); - if (info.ProcessThroughQueue || !ReadyForAgentQueries()) { - info.PostponeQ.emplace_back(ev.Release()); - info.ProcessThroughQueue = true; - } else { - // ensure correct ordering of incoming messages - Y_VERIFY_S(ev->Cookie == info.NextExpectedMsgId, "message reordering detected Cookie# " << ev->Cookie - << " NextExpectedMsgId# " << info.NextExpectedMsgId << " Type# " << Sprintf("%08" PRIx32, - ev->GetTypeRewrite()) << " Id# " << GetLogId()); - ++info.NextExpectedMsgId; + Y_VERIFY(ev->Type == ev->GetTypeRewrite()); + ev->Rewrite(TEvPrivate::EvDeliver, ev->GetRecipientRewrite()); - HandleFromAgent(ev); + if (!ReadyForAgentQueries()) { // we can't handle agent queries now -- enqueue this message + info.PostponeQ.emplace_back(ev.Release()); + } else if (!info.PostponeQ.empty()) { + Y_FAIL("PostponeQ can't be nonempty while agent is running"); + } else if (info.InFlightDeliveries++) { + TActivationContext::Send(ev.Release()); + } else { // handle event as delivery one + StateWork(ev); } }; @@ -84,9 +106,9 @@ namespace NKikimr::NBlobDepot { fFunc(TEvBlobDepot::EvPushNotifyResult, handleFromAgentPipe); fFunc(TEvBlobDepot::EvCollectGarbage, handleFromAgentPipe); - hFunc(TEvBlobDepot::TEvPushMetrics, Handle); + fFunc(TEvPrivate::EvDeliver, handleDelivery); - cFunc(TEvPrivate::EvProcessRegisterAgentQ, ProcessRegisterAgentQ); + hFunc(TEvBlobDepot::TEvPushMetrics, Handle); hFunc(TEvBlobStorage::TEvCollectGarbageResult, Data->Handle); hFunc(TEvBlobStorage::TEvGetResult, Data->UncertaintyResolver->Handle); diff --git a/ydb/core/blob_depot/blob_depot_tablet.h b/ydb/core/blob_depot/blob_depot_tablet.h index 1c704700046..296872f990d 100644 --- a/ydb/core/blob_depot/blob_depot_tablet.h +++ b/ydb/core/blob_depot/blob_depot_tablet.h @@ -27,8 +27,8 @@ namespace NKikimr::NBlobDepot { EvCommitCertainKeys, EvDoGroupMetricsExchange, EvKickSpaceMonitor, - EvProcessRegisterAgentQ, EvUpdateThroughputs, + EvDeliver, }; }; @@ -78,7 +78,7 @@ namespace NKikimr::NBlobDepot { std::optional<ui32> NodeId; // as reported by RegisterAgent ui64 NextExpectedMsgId = 1; std::deque<std::unique_ptr<IEventHandle>> PostponeQ; - bool ProcessThroughQueue = false; + size_t InFlightDeliveries = 0; }; THashMap<TActorId, TPipeServerContext> PipeServers; diff --git a/ydb/core/blob_depot/data_trash.cpp b/ydb/core/blob_depot/data_trash.cpp index 62e3d625e91..d6f4fc99247 100644 --- a/ydb/core/blob_depot/data_trash.cpp +++ b/ydb/core/blob_depot/data_trash.cpp @@ -124,7 +124,7 @@ namespace NKikimr::NBlobDepot { doNotKeep_.release(); record.CollectGarbageRequestInFlight = true; - record.PerGenerationCounter += ev->Collect ? ev->PerGenerationCounterStepSize() : 0; + record.PerGenerationCounter += ev->Collect; record.TrashInFlight.swap(trashInFlight); record.IssuedGenStep = nextGenStep; diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_assimilate.cpp b/ydb/core/blobstorage/dsproxy/dsproxy_assimilate.cpp index 3e76e63db80..240ebc258ec 100644 --- a/ydb/core/blobstorage/dsproxy/dsproxy_assimilate.cpp +++ b/ydb/core/blobstorage/dsproxy/dsproxy_assimilate.cpp @@ -279,19 +279,31 @@ public: } void Bootstrap() { - Become(&TThis::StateWork); + A_LOG_INFO_S("BPA01", "bootstrap" + << " ActorId# " << SelfId() + << " Group# " << Info->GroupID + << " RestartCounter# " << RestartCounter); + + Become(&TThis::StateWork, TDuration::Seconds(10), new TEvents::TEvWakeup); for (ui32 i = 0; i < PerVDiskInfo.size(); ++i) { Request(i); } } + void HandleWakeup() { + A_LOG_NOTICE_S("BPA25", "assimilation is way too long"); + } + STATEFN(StateWork) { if (ProcessEvent(ev)) { return; } switch (ev->GetTypeRewrite()) { hFunc(TEvBlobStorage::TEvVAssimilateResult, Handle); + cFunc(TEvents::TSystem::Wakeup, HandleWakeup); + default: + Y_VERIFY_DEBUG(false); } } @@ -306,19 +318,34 @@ public: maxOpt(SkipBarriersUpTo, info.LastProcessedBarrier), maxOpt(SkipBlobsUpTo, info.LastProcessedBlob)), 0); + A_LOG_DEBUG_S("BPA03", "Request orderNumber# " << orderNumber << " VDiskId# " << Info->GetVDiskId(orderNumber)); + ++RequestsInFlight; } void Handle(TEvBlobStorage::TEvVAssimilateResult::TPtr ev) { - --RequestsInFlight; + ProcessReplyFromQueue(ev); const auto& record = ev->Get()->Record; const TVDiskID vdiskId = VDiskIDFromVDiskID(record.GetVDiskID()); const ui32 orderNumber = Info->GetTopology().GetOrderNumber(vdiskId); Y_VERIFY(orderNumber < PerVDiskInfo.size()); + A_LOG_DEBUG_S("BPA02", "Handle TEvVAssimilateResult" + << " Status# " << NKikimrProto::EReplyStatus_Name(record.GetStatus()) + << " ErrorReason# '" << record.GetErrorReason() << "'" + << " VDiskId# " << vdiskId + << " Blocks# " << record.BlocksSize() + << " Barriers# " << record.BarriersSize() + << " Blobs# " << record.BlobsSize() + << " RequestsInFlight# " << RequestsInFlight); + + Y_VERIFY(RequestsInFlight); + --RequestsInFlight; + if (record.GetStatus() == NKikimrProto::OK) { auto& info = PerVDiskInfo[orderNumber]; + Y_VERIFY(!info.HasItemsToMerge()); info.PushDataFromMessage(record, *this, Info->Type); if (info.HasItemsToMerge()) { Heap.push_back(&info); @@ -334,7 +361,7 @@ public: } void Merge() { - std::vector<ui32> requests; + TStackVec<ui8, 32> requests; const TBlobStorageGroupInfo::TTopology *top = &Info->GetTopology(); TBlobStorageGroupInfo::TGroupVDisks disksWithData(top); @@ -350,12 +377,14 @@ public: ReplyAndDie(NKikimrProto::ERROR); } else { // answer with what we have already collected + A_LOG_DEBUG_S("BPA06", "SendResponseAndDie (no items to merge)"); SendResponseAndDie(std::move(Result)); } return; } while (requests.empty()) { if (Heap.empty()) { + A_LOG_DEBUG_S("BPA07", "SendResponseAndDie (heap empty)"); SendResponseAndDie(std::move(Result)); return; } @@ -398,6 +427,7 @@ public: } if (Result->Blocks.size() + Result->Barriers.size() + Result->Blobs.size() >= 10'000) { + A_LOG_DEBUG_S("BPA05", "SendResponseAndDie (10k)"); SendResponseAndDie(std::move(Result)); } else { for (const ui32 orderNumber : requests) { @@ -414,6 +444,7 @@ public: } void ReplyAndDie(NKikimrProto::EReplyStatus status) { + A_LOG_DEBUG_S("BPA04", "ReplyAndDie status# " << NKikimrProto::EReplyStatus_Name(status)); SendResponseAndDie(std::make_unique<TEvBlobStorage::TEvAssimilateResult>(status, ErrorReason)); } }; |