summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authoralexvru <[email protected]>2023-09-28 17:38:09 +0300
committeralexvru <[email protected]>2023-09-28 18:15:01 +0300
commite0c2bf53d562d569da2da261bf16192f270a24b1 (patch)
tree06bfdd86554111deeb660d255d6028e3857cbf1a
parentff1b1bc5292f7765c9e6c084291ca72d82f6b551 (diff)
Fix decommit machinery in BlobDepot KIKIMR-14867
-rw-r--r--ydb/core/blob_depot/agent.cpp13
-rw-r--r--ydb/core/blob_depot/agent/storage_discover.cpp5
-rw-r--r--ydb/core/blob_depot/blob_depot.cpp50
-rw-r--r--ydb/core/blob_depot/blob_depot_tablet.h4
-rw-r--r--ydb/core/blob_depot/data_trash.cpp2
-rw-r--r--ydb/core/blobstorage/dsproxy/dsproxy_assimilate.cpp37
6 files changed, 81 insertions, 30 deletions
diff --git a/ydb/core/blob_depot/agent.cpp b/ydb/core/blob_depot/agent.cpp
index 5e59be28a0f..8deb40da82e 100644
--- a/ydb/core/blob_depot/agent.cpp
+++ b/ydb/core/blob_depot/agent.cpp
@@ -220,17 +220,10 @@ namespace NKikimr::NBlobDepot {
if (!ReadyForAgentQueries()) {
return;
}
-
for (auto& [pipeServerId, info] : PipeServers) {
- if (info.ProcessThroughQueue) {
- if (info.PostponeQ.empty()) {
- info.ProcessThroughQueue = false;
- } else {
- for (auto& ev : std::exchange(info.PostponeQ, {})) {
- TActivationContext::Send(ev.release());
- }
- TActivationContext::Send(new IEventHandle(TEvPrivate::EvProcessRegisterAgentQ, 0, SelfId(), {}, nullptr, 0));
- }
+ for (auto& ev : std::exchange(info.PostponeQ, {})) {
+ TActivationContext::Send(ev.release());
+ ++info.InFlightDeliveries;
}
}
}
diff --git a/ydb/core/blob_depot/agent/storage_discover.cpp b/ydb/core/blob_depot/agent/storage_discover.cpp
index 9e5c29c97b8..bfa14780bf5 100644
--- a/ydb/core/blob_depot/agent/storage_discover.cpp
+++ b/ydb/core/blob_depot/agent/storage_discover.cpp
@@ -58,6 +58,7 @@ namespace NKikimr::NBlobDepot {
}
void IssueResolve() {
+ STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA49, "IssueResolve", (AgentId, Agent.LogId), (QueryId, GetQueryId()));
Agent.Issue(Resolve, this, nullptr);
}
@@ -65,8 +66,12 @@ namespace NKikimr::NBlobDepot {
if (std::holds_alternative<TTabletDisconnected>(response)) {
return EndWithError(NKikimrProto::ERROR, "BlobDepot tablet disconnected");
} else if (auto *p = std::get_if<TEvBlobStorage::TEvGetResult*>(&response)) {
+ STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA50, "TEvGetResult", (AgentId, Agent.LogId),
+ (QueryId, GetQueryId()), (Response, *p));
TQuery::HandleGetResult(context, **p);
} else if (auto *p = std::get_if<TEvBlobDepot::TEvResolveResult*>(&response)) {
+ STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA51, "TEvResolveResult", (AgentId, Agent.LogId),
+ (QueryId, GetQueryId()), (Response, (*p)->Record));
if (context) {
TQuery::HandleResolveResult(std::move(context), **p);
} else {
diff --git a/ydb/core/blob_depot/blob_depot.cpp b/ydb/core/blob_depot/blob_depot.cpp
index c3bb44b7813..8caeb91f777 100644
--- a/ydb/core/blob_depot/blob_depot.cpp
+++ b/ydb/core/blob_depot/blob_depot.cpp
@@ -46,6 +46,27 @@ namespace NKikimr::NBlobDepot {
STFUNC(TBlobDepot::StateWork) {
try {
+ auto handleDelivery = [this](auto& ev) {
+ const auto it = PipeServers.find(ev->Recipient);
+ if (it == PipeServers.end()) {
+ return;
+ }
+ auto& info = it->second;
+
+ Y_VERIFY(info.InFlightDeliveries);
+ --info.InFlightDeliveries;
+
+ // return original event type
+ ev->Rewrite(ev->Type, ev->GetRecipientRewrite());
+
+ // ensure correct ordering of incoming messages
+ Y_VERIFY_S(ev->Cookie == info.NextExpectedMsgId, "message reordering detected Cookie# " << ev->Cookie
+ << " NextExpectedMsgId# " << info.NextExpectedMsgId << " Type# " << Sprintf("%08" PRIx32,
+ ev->GetTypeRewrite()) << " Id# " << GetLogId());
+ ++info.NextExpectedMsgId;
+ HandleFromAgent(ev);
+ };
+
auto handleFromAgentPipe = [this](auto& ev) {
const auto it = PipeServers.find(ev->Recipient);
if (it == PipeServers.end()) {
@@ -54,20 +75,21 @@ namespace NKikimr::NBlobDepot {
auto& info = it->second;
STLOG(PRI_DEBUG, BLOB_DEPOT, BDT69, "HandleFromAgentPipe", (Id, GetLogId()), (RequestId, ev->Cookie),
- (Sender, ev->Sender), (PipeServerId, ev->Recipient), (ProcessThroughQueue, info.ProcessThroughQueue),
- (NextExpectedMsgId, info.NextExpectedMsgId), (PostponeQ.size, info.PostponeQ.size()));
+ (Sender, ev->Sender), (PipeServerId, ev->Recipient), (NextExpectedMsgId, info.NextExpectedMsgId),
+ (PostponeQ.size, info.PostponeQ.size()), (InFlightDeliveries, info.InFlightDeliveries),
+ (ReadyForAgentQueries, ReadyForAgentQueries()), (Type, ev->Type));
- if (info.ProcessThroughQueue || !ReadyForAgentQueries()) {
- info.PostponeQ.emplace_back(ev.Release());
- info.ProcessThroughQueue = true;
- } else {
- // ensure correct ordering of incoming messages
- Y_VERIFY_S(ev->Cookie == info.NextExpectedMsgId, "message reordering detected Cookie# " << ev->Cookie
- << " NextExpectedMsgId# " << info.NextExpectedMsgId << " Type# " << Sprintf("%08" PRIx32,
- ev->GetTypeRewrite()) << " Id# " << GetLogId());
- ++info.NextExpectedMsgId;
+ Y_VERIFY(ev->Type == ev->GetTypeRewrite());
+ ev->Rewrite(TEvPrivate::EvDeliver, ev->GetRecipientRewrite());
- HandleFromAgent(ev);
+ if (!ReadyForAgentQueries()) { // we can't handle agent queries now -- enqueue this message
+ info.PostponeQ.emplace_back(ev.Release());
+ } else if (!info.PostponeQ.empty()) {
+ Y_FAIL("PostponeQ can't be nonempty while agent is running");
+ } else if (info.InFlightDeliveries++) {
+ TActivationContext::Send(ev.Release());
+ } else { // handle event as delivery one
+ StateWork(ev);
}
};
@@ -84,9 +106,9 @@ namespace NKikimr::NBlobDepot {
fFunc(TEvBlobDepot::EvPushNotifyResult, handleFromAgentPipe);
fFunc(TEvBlobDepot::EvCollectGarbage, handleFromAgentPipe);
- hFunc(TEvBlobDepot::TEvPushMetrics, Handle);
+ fFunc(TEvPrivate::EvDeliver, handleDelivery);
- cFunc(TEvPrivate::EvProcessRegisterAgentQ, ProcessRegisterAgentQ);
+ hFunc(TEvBlobDepot::TEvPushMetrics, Handle);
hFunc(TEvBlobStorage::TEvCollectGarbageResult, Data->Handle);
hFunc(TEvBlobStorage::TEvGetResult, Data->UncertaintyResolver->Handle);
diff --git a/ydb/core/blob_depot/blob_depot_tablet.h b/ydb/core/blob_depot/blob_depot_tablet.h
index 1c704700046..296872f990d 100644
--- a/ydb/core/blob_depot/blob_depot_tablet.h
+++ b/ydb/core/blob_depot/blob_depot_tablet.h
@@ -27,8 +27,8 @@ namespace NKikimr::NBlobDepot {
EvCommitCertainKeys,
EvDoGroupMetricsExchange,
EvKickSpaceMonitor,
- EvProcessRegisterAgentQ,
EvUpdateThroughputs,
+ EvDeliver,
};
};
@@ -78,7 +78,7 @@ namespace NKikimr::NBlobDepot {
std::optional<ui32> NodeId; // as reported by RegisterAgent
ui64 NextExpectedMsgId = 1;
std::deque<std::unique_ptr<IEventHandle>> PostponeQ;
- bool ProcessThroughQueue = false;
+ size_t InFlightDeliveries = 0;
};
THashMap<TActorId, TPipeServerContext> PipeServers;
diff --git a/ydb/core/blob_depot/data_trash.cpp b/ydb/core/blob_depot/data_trash.cpp
index 62e3d625e91..d6f4fc99247 100644
--- a/ydb/core/blob_depot/data_trash.cpp
+++ b/ydb/core/blob_depot/data_trash.cpp
@@ -124,7 +124,7 @@ namespace NKikimr::NBlobDepot {
doNotKeep_.release();
record.CollectGarbageRequestInFlight = true;
- record.PerGenerationCounter += ev->Collect ? ev->PerGenerationCounterStepSize() : 0;
+ record.PerGenerationCounter += ev->Collect;
record.TrashInFlight.swap(trashInFlight);
record.IssuedGenStep = nextGenStep;
diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_assimilate.cpp b/ydb/core/blobstorage/dsproxy/dsproxy_assimilate.cpp
index 3e76e63db80..240ebc258ec 100644
--- a/ydb/core/blobstorage/dsproxy/dsproxy_assimilate.cpp
+++ b/ydb/core/blobstorage/dsproxy/dsproxy_assimilate.cpp
@@ -279,19 +279,31 @@ public:
}
void Bootstrap() {
- Become(&TThis::StateWork);
+ A_LOG_INFO_S("BPA01", "bootstrap"
+ << " ActorId# " << SelfId()
+ << " Group# " << Info->GroupID
+ << " RestartCounter# " << RestartCounter);
+
+ Become(&TThis::StateWork, TDuration::Seconds(10), new TEvents::TEvWakeup);
for (ui32 i = 0; i < PerVDiskInfo.size(); ++i) {
Request(i);
}
}
+ void HandleWakeup() {
+ A_LOG_NOTICE_S("BPA25", "assimilation is way too long");
+ }
+
STATEFN(StateWork) {
if (ProcessEvent(ev)) {
return;
}
switch (ev->GetTypeRewrite()) {
hFunc(TEvBlobStorage::TEvVAssimilateResult, Handle);
+ cFunc(TEvents::TSystem::Wakeup, HandleWakeup);
+ default:
+ Y_VERIFY_DEBUG(false);
}
}
@@ -306,19 +318,34 @@ public:
maxOpt(SkipBarriersUpTo, info.LastProcessedBarrier),
maxOpt(SkipBlobsUpTo, info.LastProcessedBlob)), 0);
+ A_LOG_DEBUG_S("BPA03", "Request orderNumber# " << orderNumber << " VDiskId# " << Info->GetVDiskId(orderNumber));
+
++RequestsInFlight;
}
void Handle(TEvBlobStorage::TEvVAssimilateResult::TPtr ev) {
- --RequestsInFlight;
+ ProcessReplyFromQueue(ev);
const auto& record = ev->Get()->Record;
const TVDiskID vdiskId = VDiskIDFromVDiskID(record.GetVDiskID());
const ui32 orderNumber = Info->GetTopology().GetOrderNumber(vdiskId);
Y_VERIFY(orderNumber < PerVDiskInfo.size());
+ A_LOG_DEBUG_S("BPA02", "Handle TEvVAssimilateResult"
+ << " Status# " << NKikimrProto::EReplyStatus_Name(record.GetStatus())
+ << " ErrorReason# '" << record.GetErrorReason() << "'"
+ << " VDiskId# " << vdiskId
+ << " Blocks# " << record.BlocksSize()
+ << " Barriers# " << record.BarriersSize()
+ << " Blobs# " << record.BlobsSize()
+ << " RequestsInFlight# " << RequestsInFlight);
+
+ Y_VERIFY(RequestsInFlight);
+ --RequestsInFlight;
+
if (record.GetStatus() == NKikimrProto::OK) {
auto& info = PerVDiskInfo[orderNumber];
+ Y_VERIFY(!info.HasItemsToMerge());
info.PushDataFromMessage(record, *this, Info->Type);
if (info.HasItemsToMerge()) {
Heap.push_back(&info);
@@ -334,7 +361,7 @@ public:
}
void Merge() {
- std::vector<ui32> requests;
+ TStackVec<ui8, 32> requests;
const TBlobStorageGroupInfo::TTopology *top = &Info->GetTopology();
TBlobStorageGroupInfo::TGroupVDisks disksWithData(top);
@@ -350,12 +377,14 @@ public:
ReplyAndDie(NKikimrProto::ERROR);
} else {
// answer with what we have already collected
+ A_LOG_DEBUG_S("BPA06", "SendResponseAndDie (no items to merge)");
SendResponseAndDie(std::move(Result));
}
return;
}
while (requests.empty()) {
if (Heap.empty()) {
+ A_LOG_DEBUG_S("BPA07", "SendResponseAndDie (heap empty)");
SendResponseAndDie(std::move(Result));
return;
}
@@ -398,6 +427,7 @@ public:
}
if (Result->Blocks.size() + Result->Barriers.size() + Result->Blobs.size() >= 10'000) {
+ A_LOG_DEBUG_S("BPA05", "SendResponseAndDie (10k)");
SendResponseAndDie(std::move(Result));
} else {
for (const ui32 orderNumber : requests) {
@@ -414,6 +444,7 @@ public:
}
void ReplyAndDie(NKikimrProto::EReplyStatus status) {
+ A_LOG_DEBUG_S("BPA04", "ReplyAndDie status# " << NKikimrProto::EReplyStatus_Name(status));
SendResponseAndDie(std::make_unique<TEvBlobStorage::TEvAssimilateResult>(status, ErrorReason));
}
};