about | summary | refs | log | tree | commit | diff | stats
diff options
context:
space:
mode:
author alexvru <alexvru@ydb.tech> 2023-10-02 18:52:24 +0300
committer alexvru <alexvru@ydb.tech> 2023-10-02 19:27:45 +0300
commit de63c80b75948ecc13894854514d147840ff8430 (patch)
tree 6a87c0f3364aa10cfafbe61ac7923a5ff3bc50e8
parent 3244b37d549dcd72f6f588c1aa01406f33ad1dc2 (diff)
download ydb-de63c80b75948ecc13894854514d147840ff8430.tar.gz
BlobDepot decommit fixes KIKIMR-14867
-rw-r--r-- ydb/core/blob_depot/agent/comm.cpp 6
-rw-r--r-- ydb/core/blob_depot/blob_depot.cpp 4
-rw-r--r-- ydb/core/blob_depot/data_decommit.cpp 39
-rw-r--r-- ydb/core/blobstorage/dsproxy/dsproxy_assimilate.cpp 19
-rw-r--r-- ydb/core/blobstorage/dsproxy/dsproxy_range.cpp 9
5 files changed, 59 insertions, 18 deletions
diff --git a/ydb/core/blob_depot/agent/comm.cpp b/ydb/core/blob_depot/agent/comm.cpp
index 843bb4a0ba..54b89c4996 100644
--- a/ydb/core/blob_depot/agent/comm.cpp
+++ b/ydb/core/blob_depot/agent/comm.cpp
@@ -29,15 +29,13 @@ namespace NKikimr::NBlobDepot {
PipeId = Register(NTabletPipe::CreateClient(SelfId(), TabletId, NTabletPipe::TClientRetryPolicy::WithRetries()));
NextTabletRequestId = 1;
const ui64 id = NextTabletRequestId++;
- STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA05, "ConnectToBlobDepot", (AgentId, LogId),
- (PipeId, PipeId), (RequestId, id));
+ STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA05, "ConnectToBlobDepot", (AgentId, LogId), (PipeId, PipeId), (RequestId, id));
NTabletPipe::SendData(SelfId(), PipeId, new TEvBlobDepot::TEvRegisterAgent(VirtualGroupId, AgentInstanceId), id);
RegisterRequest(id, this, nullptr, {}, true);
}
void TBlobDepotAgent::Handle(TRequestContext::TPtr /*context*/, NKikimrBlobDepot::TEvRegisterAgentResult& msg) {
- STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA06, "TEvRegisterAgentResult", (AgentId, LogId),
- (Msg, msg));
+ STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA06, "TEvRegisterAgentResult", (AgentId, LogId), (Msg, msg));
BlobDepotGeneration = msg.GetGeneration();
DecommitGroupId = msg.HasDecommitGroupId() ? std::make_optional(msg.GetDecommitGroupId()) : std::nullopt;
diff --git a/ydb/core/blob_depot/blob_depot.cpp b/ydb/core/blob_depot/blob_depot.cpp
index 8caeb91f77..ec40c696c9 100644
--- a/ydb/core/blob_depot/blob_depot.cpp
+++ b/ydb/core/blob_depot/blob_depot.cpp
@@ -49,6 +49,8 @@ namespace NKikimr::NBlobDepot {
auto handleDelivery = [this](auto& ev) {
const auto it = PipeServers.find(ev->Recipient);
if (it == PipeServers.end()) {
+ STLOG(PRI_DEBUG, BLOB_DEPOT, BDT29, "HandleDelivery dropped", (Id, GetLogId()),
+ (RequestId, ev->Cookie), (Sender, ev->Sender), (PipeServerId, ev->Recipient), (Type, ev->Type));
return;
}
auto& info = it->second;
@@ -70,6 +72,8 @@ namespace NKikimr::NBlobDepot {
auto handleFromAgentPipe = [this](auto& ev) {
const auto it = PipeServers.find(ev->Recipient);
if (it == PipeServers.end()) {
+ STLOG(PRI_DEBUG, BLOB_DEPOT, BDT23, "HandleFromAgentPipe dropped", (Id, GetLogId()),
+ (RequestId, ev->Cookie), (Sender, ev->Sender), (PipeServerId, ev->Recipient), (Type, ev->Type));
return; // this may be a race with TEvServerDisconnected and postpone queue; it's okay to have this
}
auto& info = it->second;
diff --git a/ydb/core/blob_depot/data_decommit.cpp b/ydb/core/blob_depot/data_decommit.cpp
index 991510217b..93d8baf06e 100644
--- a/ydb/core/blob_depot/data_decommit.cpp
+++ b/ydb/core/blob_depot/data_decommit.cpp
@@ -106,8 +106,9 @@ namespace NKikimr::NBlobDepot {
TCoroTx::RestartTx();
}
const TValue *value = Self->Data->FindKey(key);
- const bool doGet = (!value && Self->Data->LastAssimilatedBlobId < key.GetBlobId()) // value not yet assimilated
- || (value && value->GoingToAssimilate && item.GetMustRestoreFirst()); // value has no local data yet
+ const bool notYetAssimilated = Self->Data->LastAssimilatedBlobId < key.GetBlobId();
+ const bool doGet = !value ? notYetAssimilated :
+ value->GoingToAssimilate ? item.GetMustRestoreFirst() : notYetAssimilated;
if (doGet) {
IssueGet(key.GetBlobId(), item.GetMustRestoreFirst());
}
@@ -161,12 +162,14 @@ namespace NKikimr::NBlobDepot {
void Handle(TEvBlobStorage::TEvRangeResult::TPtr ev) {
auto& msg = *ev->Get();
STLOG(PRI_DEBUG, BLOB_DEPOT, BDT55, "TEvRangeResult", (Id, Self->GetLogId()), (Sender, Ev->Sender),
- (Cookie, Ev->Cookie), (Msg, msg));
+ (Cookie, Ev->Cookie), (Msg, msg), (GetsInFlight, GetsInFlight), (RangesInFlight, RangesInFlight),
+ (TxInFlight, TxInFlight), (PutsInFlight, PutsInFlight), (GetQ.size, GetQ.size()));
if (msg.Status == NKikimrProto::OK) {
for (const auto& r : msg.Responses) {
if (ev->Cookie) {
- if (const TValue *value = Self->Data->FindKey(TKey(r.Id)); !value || value->GoingToAssimilate) {
+ if (const TValue *value = Self->Data->FindKey(TKey(r.Id)); !value || value->GoingToAssimilate ||
+ Self->Data->LastAssimilatedBlobId < r.Id) {
IssueGet(r.Id, true /*mustRestoreFirst*/);
}
} else {
@@ -174,9 +177,15 @@ namespace NKikimr::NBlobDepot {
}
}
} else {
- return FinishWithError(NLog::PRI_NOTICE, TStringBuilder() << "TEvRange query failed: " << msg.ErrorReason);
+ TStringBuilder err;
+ err << "TEvRange query failed: " << NKikimrProto::EReplyStatus_Name(msg.Status);
+ if (msg.ErrorReason) {
+ err << " (" << msg.ErrorReason << ')';
+ }
+ return FinishWithError(NLog::PRI_NOTICE, err);
}
+ Y_VERIFY(RangesInFlight);
--RangesInFlight;
CheckIfDone();
}
@@ -215,7 +224,8 @@ namespace NKikimr::NBlobDepot {
void Handle(TEvBlobStorage::TEvGetResult::TPtr ev) {
auto& msg = *ev->Get();
STLOG(PRI_DEBUG, BLOB_DEPOT, BDT87, "TEvGetResult", (Id, Self->GetLogId()), (Sender, Ev->Sender),
- (Cookie, Ev->Cookie), (Msg, msg));
+ (Cookie, Ev->Cookie), (Msg, msg), (GetsInFlight, GetsInFlight), (RangesInFlight, RangesInFlight),
+ (TxInFlight, TxInFlight), (PutsInFlight, PutsInFlight), (GetQ.size, GetQ.size()));
for (ui32 i = 0; i < msg.ResponseSz; ++i) {
auto& r = msg.Responses[i];
@@ -235,6 +245,8 @@ namespace NKikimr::NBlobDepot {
}
}
+ Y_VERIFY(GetsInFlight);
+ Y_VERIFY(GetBytesInFlight >= ev->Cookie);
--GetsInFlight;
GetBytesInFlight -= ev->Cookie;
@@ -263,8 +275,6 @@ namespace NKikimr::NBlobDepot {
++PutsInFlight;
} else { // we couldn't restore this blob -- there was no place to write it to
ResolutionErrors.insert(key.GetBlobId());
- ++PutsInFlight;
- HandleTxComplete();
}
}
@@ -280,17 +290,28 @@ namespace NKikimr::NBlobDepot {
const bool doNotKeep = ev->Cookie >> 1 & 1;
STLOG(PRI_DEBUG, BLOB_DEPOT, BDT88, "got TEvPutResult", (Id, Self->GetLogId()), (Sender, Ev->Sender),
- (Cookie, Ev->Cookie), (Msg, msg), (Key, key), (Keep, keep), (DoNotKeep, doNotKeep));
+ (Cookie, Ev->Cookie), (Msg, msg), (Key, key), (Keep, keep), (DoNotKeep, doNotKeep),
+ (GetsInFlight, GetsInFlight), (RangesInFlight, RangesInFlight), (TxInFlight, TxInFlight),
+ (PutsInFlight, PutsInFlight), (GetQ.size, GetQ.size()));
if (msg.Status != NKikimrProto::OK) { // do not reply OK to this item
ResolutionErrors.insert(key.GetBlobId());
}
+ Y_VERIFY(PutsInFlight);
+ --PutsInFlight;
+
Self->Data->ExecuteTxCommitAssimilatedBlob(msg.Status, TBlobSeqId::FromLogoBlobId(msg.Id), std::move(key),
TEvPrivate::EvTxComplete, SelfId(), 0, keep, doNotKeep);
+ ++TxInFlight;
}
void HandleTxComplete() {
+ STLOG(PRI_DEBUG, BLOB_DEPOT, BDT84, "HandleTxComplete", (Id, Self->GetLogId()), (Sender, Ev->Sender),
+ (Cookie, Ev->Cookie), (GetsInFlight, GetsInFlight), (RangesInFlight, RangesInFlight),
+ (TxInFlight, TxInFlight), (PutsInFlight, PutsInFlight), (GetQ.size, GetQ.size()));
+
+ Y_VERIFY(TxInFlight);
--TxInFlight;
CheckIfDone();
}
diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_assimilate.cpp b/ydb/core/blobstorage/dsproxy/dsproxy_assimilate.cpp
index 240ebc258e..cc93c11c78 100644
--- a/ydb/core/blobstorage/dsproxy/dsproxy_assimilate.cpp
+++ b/ydb/core/blobstorage/dsproxy/dsproxy_assimilate.cpp
@@ -104,6 +104,8 @@ class TBlobStorageGroupAssimilateRequest : public TBlobStorageGroupRequestActor<
using TItemVariant = std::variant<TBlock, TBarrier, TBlob>;
struct TPerVDiskInfo {
+ std::optional<TString> ErrorReason;
+
std::optional<ui64> LastProcessedBlock;
std::optional<std::tuple<ui64, ui8>> LastProcessedBarrier;
std::optional<TLogoBlobID> LastProcessedBlob;
@@ -343,9 +345,9 @@ public:
Y_VERIFY(RequestsInFlight);
--RequestsInFlight;
+ auto& info = PerVDiskInfo[orderNumber];
+ Y_VERIFY(!info.HasItemsToMerge());
if (record.GetStatus() == NKikimrProto::OK) {
- auto& info = PerVDiskInfo[orderNumber];
- Y_VERIFY(!info.HasItemsToMerge());
info.PushDataFromMessage(record, *this, Info->Type);
if (info.HasItemsToMerge()) {
Heap.push_back(&info);
@@ -353,6 +355,11 @@ public:
} else if (!info.Finished()) {
Request(orderNumber);
}
+ } else {
+ info.ErrorReason = TStringBuilder() << vdiskId << ": " << NKikimrProto::EReplyStatus_Name(record.GetStatus());
+ if (record.GetErrorReason()) {
+ *info.ErrorReason += " (" + record.GetErrorReason() + ')';
+ }
}
if (!RequestsInFlight) {
@@ -445,6 +452,14 @@ public:
void ReplyAndDie(NKikimrProto::EReplyStatus status) {
A_LOG_DEBUG_S("BPA04", "ReplyAndDie status# " << NKikimrProto::EReplyStatus_Name(status));
+ for (const auto& item : PerVDiskInfo) {
+ if (item.ErrorReason) {
+ if (ErrorReason) {
+ ErrorReason += ", ";
+ }
+ ErrorReason += *item.ErrorReason;
+ }
+ }
SendResponseAndDie(std::make_unique<TEvBlobStorage::TEvAssimilateResult>(status, ErrorReason));
}
};
diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_range.cpp b/ydb/core/blobstorage/dsproxy/dsproxy_range.cpp
index b5c0d75ff9..bbfe6c5128 100644
--- a/ydb/core/blobstorage/dsproxy/dsproxy_range.cpp
+++ b/ydb/core/blobstorage/dsproxy/dsproxy_range.cpp
@@ -241,9 +241,11 @@ class TBlobStorageGroupRangeRequest : public TBlobStorageGroupRequestActor<TBlob
}
Y_VERIFY(query == queries.Get() + queryCount);
- // register query in wilson and send it to DS proxy
+ // register query in wilson and send it to DS proxy; issue non-index query when MustRestoreFirst is false to
+ // prevent IndexRestoreGet invocation
auto get = std::make_unique<TEvBlobStorage::TEvGet>(queries, queryCount, Deadline,
- NKikimrBlobStorage::EGetHandleClass::FastRead, MustRestoreFirst, IsIndexOnly, TEvBlobStorage::TEvGet::TForceBlockTabletData(TabletId, ForceBlockedGeneration));
+ NKikimrBlobStorage::EGetHandleClass::FastRead, MustRestoreFirst, MustRestoreFirst ? IsIndexOnly : false,
+ TEvBlobStorage::TEvGet::TForceBlockTabletData(TabletId, ForceBlockedGeneration));
get->IsInternal = true;
get->Decommission = Decommission;
@@ -284,7 +286,8 @@ class TBlobStorageGroupRangeRequest : public TBlobStorageGroupRequestActor<TBlob
Y_VERIFY(response.Id == BlobsToGet[i].BlobId);
if (getResult.Responses[i].Status == NKikimrProto::OK) {
- result->Responses.emplace_back(response.Id, response.Buffer.ConvertToString(), response.Keep, response.DoNotKeep);
+ result->Responses.emplace_back(response.Id, IsIndexOnly ? TString() : response.Buffer.ConvertToString(),
+ response.Keep, response.DoNotKeep);
} else if (getResult.Responses[i].Status != NKikimrProto::NODATA || BlobsToGet[i].RequiredToBePresent) {
// it's okay to get NODATA if blob wasn't confirmed -- this blob is simply thrown out of resulting
// set; otherwise we return error about lost data