diff options
author | alexvru <alexvru@ydb.tech> | 2023-10-02 18:52:24 +0300 |
---|---|---|
committer | alexvru <alexvru@ydb.tech> | 2023-10-02 19:27:45 +0300 |
commit | de63c80b75948ecc13894854514d147840ff8430 (patch) | |
tree | 6a87c0f3364aa10cfafbe61ac7923a5ff3bc50e8 | |
parent | 3244b37d549dcd72f6f588c1aa01406f33ad1dc2 (diff) | |
download | ydb-de63c80b75948ecc13894854514d147840ff8430.tar.gz |
BlobDepot decommit fixes KIKIMR-14867
-rw-r--r-- | ydb/core/blob_depot/agent/comm.cpp | 6 | ||||
-rw-r--r-- | ydb/core/blob_depot/blob_depot.cpp | 4 | ||||
-rw-r--r-- | ydb/core/blob_depot/data_decommit.cpp | 39 | ||||
-rw-r--r-- | ydb/core/blobstorage/dsproxy/dsproxy_assimilate.cpp | 19 | ||||
-rw-r--r-- | ydb/core/blobstorage/dsproxy/dsproxy_range.cpp | 9 |
5 files changed, 59 insertions, 18 deletions
diff --git a/ydb/core/blob_depot/agent/comm.cpp b/ydb/core/blob_depot/agent/comm.cpp index 843bb4a0ba..54b89c4996 100644 --- a/ydb/core/blob_depot/agent/comm.cpp +++ b/ydb/core/blob_depot/agent/comm.cpp @@ -29,15 +29,13 @@ namespace NKikimr::NBlobDepot { PipeId = Register(NTabletPipe::CreateClient(SelfId(), TabletId, NTabletPipe::TClientRetryPolicy::WithRetries())); NextTabletRequestId = 1; const ui64 id = NextTabletRequestId++; - STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA05, "ConnectToBlobDepot", (AgentId, LogId), - (PipeId, PipeId), (RequestId, id)); + STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA05, "ConnectToBlobDepot", (AgentId, LogId), (PipeId, PipeId), (RequestId, id)); NTabletPipe::SendData(SelfId(), PipeId, new TEvBlobDepot::TEvRegisterAgent(VirtualGroupId, AgentInstanceId), id); RegisterRequest(id, this, nullptr, {}, true); } void TBlobDepotAgent::Handle(TRequestContext::TPtr /*context*/, NKikimrBlobDepot::TEvRegisterAgentResult& msg) { - STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA06, "TEvRegisterAgentResult", (AgentId, LogId), - (Msg, msg)); + STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA06, "TEvRegisterAgentResult", (AgentId, LogId), (Msg, msg)); BlobDepotGeneration = msg.GetGeneration(); DecommitGroupId = msg.HasDecommitGroupId() ? std::make_optional(msg.GetDecommitGroupId()) : std::nullopt; diff --git a/ydb/core/blob_depot/blob_depot.cpp b/ydb/core/blob_depot/blob_depot.cpp index 8caeb91f77..ec40c696c9 100644 --- a/ydb/core/blob_depot/blob_depot.cpp +++ b/ydb/core/blob_depot/blob_depot.cpp @@ -49,6 +49,8 @@ namespace NKikimr::NBlobDepot { auto handleDelivery = [this](auto& ev) { const auto it = PipeServers.find(ev->Recipient); if (it == PipeServers.end()) { + STLOG(PRI_DEBUG, BLOB_DEPOT, BDT29, "HandleDelivery dropped", (Id, GetLogId()), + (RequestId, ev->Cookie), (Sender, ev->Sender), (PipeServerId, ev->Recipient), (Type, ev->Type)); return; } auto& info = it->second; @@ -70,6 +72,8 @@ namespace NKikimr::NBlobDepot { auto handleFromAgentPipe = [this](auto& ev) { const auto it = PipeServers.find(ev->Recipient); if (it == PipeServers.end()) { + STLOG(PRI_DEBUG, BLOB_DEPOT, BDT23, "HandleFromAgentPipe dropped", (Id, GetLogId()), + (RequestId, ev->Cookie), (Sender, ev->Sender), (PipeServerId, ev->Recipient), (Type, ev->Type)); return; // this may be a race with TEvServerDisconnected and postpone queue; it's okay to have this } auto& info = it->second; diff --git a/ydb/core/blob_depot/data_decommit.cpp b/ydb/core/blob_depot/data_decommit.cpp index 991510217b..93d8baf06e 100644 --- a/ydb/core/blob_depot/data_decommit.cpp +++ b/ydb/core/blob_depot/data_decommit.cpp @@ -106,8 +106,9 @@ namespace NKikimr::NBlobDepot { TCoroTx::RestartTx(); } const TValue *value = Self->Data->FindKey(key); - const bool doGet = (!value && Self->Data->LastAssimilatedBlobId < key.GetBlobId()) // value not yet assimilated - || (value && value->GoingToAssimilate && item.GetMustRestoreFirst()); // value has no local data yet + const bool notYetAssimilated = Self->Data->LastAssimilatedBlobId < key.GetBlobId(); + const bool doGet = !value ? notYetAssimilated : + value->GoingToAssimilate ? item.GetMustRestoreFirst() : notYetAssimilated; if (doGet) { IssueGet(key.GetBlobId(), item.GetMustRestoreFirst()); } @@ -161,12 +162,14 @@ namespace NKikimr::NBlobDepot { void Handle(TEvBlobStorage::TEvRangeResult::TPtr ev) { auto& msg = *ev->Get(); STLOG(PRI_DEBUG, BLOB_DEPOT, BDT55, "TEvRangeResult", (Id, Self->GetLogId()), (Sender, Ev->Sender), - (Cookie, Ev->Cookie), (Msg, msg)); + (Cookie, Ev->Cookie), (Msg, msg), (GetsInFlight, GetsInFlight), (RangesInFlight, RangesInFlight), + (TxInFlight, TxInFlight), (PutsInFlight, PutsInFlight), (GetQ.size, GetQ.size())); if (msg.Status == NKikimrProto::OK) { for (const auto& r : msg.Responses) { if (ev->Cookie) { - if (const TValue *value = Self->Data->FindKey(TKey(r.Id)); !value || value->GoingToAssimilate) { + if (const TValue *value = Self->Data->FindKey(TKey(r.Id)); !value || value->GoingToAssimilate || + Self->Data->LastAssimilatedBlobId < r.Id) { IssueGet(r.Id, true /*mustRestoreFirst*/); } } else { @@ -174,9 +177,15 @@ namespace NKikimr::NBlobDepot { } } } else { - return FinishWithError(NLog::PRI_NOTICE, TStringBuilder() << "TEvRange query failed: " << msg.ErrorReason); + TStringBuilder err; + err << "TEvRange query failed: " << NKikimrProto::EReplyStatus_Name(msg.Status); + if (msg.ErrorReason) { + err << " (" << msg.ErrorReason << ')'; + } + return FinishWithError(NLog::PRI_NOTICE, err); } + Y_VERIFY(RangesInFlight); --RangesInFlight; CheckIfDone(); } @@ -215,7 +224,8 @@ namespace NKikimr::NBlobDepot { void Handle(TEvBlobStorage::TEvGetResult::TPtr ev) { auto& msg = *ev->Get(); STLOG(PRI_DEBUG, BLOB_DEPOT, BDT87, "TEvGetResult", (Id, Self->GetLogId()), (Sender, Ev->Sender), - (Cookie, Ev->Cookie), (Msg, msg)); + (Cookie, Ev->Cookie), (Msg, msg), (GetsInFlight, GetsInFlight), (RangesInFlight, RangesInFlight), + (TxInFlight, TxInFlight), (PutsInFlight, PutsInFlight), (GetQ.size, GetQ.size())); for (ui32 i = 0; i < msg.ResponseSz; ++i) { auto& r = msg.Responses[i]; @@ -235,6 +245,8 @@ namespace NKikimr::NBlobDepot { } } + Y_VERIFY(GetsInFlight); + Y_VERIFY(GetBytesInFlight >= ev->Cookie); --GetsInFlight; GetBytesInFlight -= ev->Cookie; @@ -263,8 +275,6 @@ namespace NKikimr::NBlobDepot { ++PutsInFlight; } else { // we couldn't restore this blob -- there was no place to write it to ResolutionErrors.insert(key.GetBlobId()); - ++PutsInFlight; - HandleTxComplete(); } } @@ -280,17 +290,28 @@ namespace NKikimr::NBlobDepot { const bool doNotKeep = ev->Cookie >> 1 & 1; STLOG(PRI_DEBUG, BLOB_DEPOT, BDT88, "got TEvPutResult", (Id, Self->GetLogId()), (Sender, Ev->Sender), - (Cookie, Ev->Cookie), (Msg, msg), (Key, key), (Keep, keep), (DoNotKeep, doNotKeep)); + (Cookie, Ev->Cookie), (Msg, msg), (Key, key), (Keep, keep), (DoNotKeep, doNotKeep), + (GetsInFlight, GetsInFlight), (RangesInFlight, RangesInFlight), (TxInFlight, TxInFlight), + (PutsInFlight, PutsInFlight), (GetQ.size, GetQ.size())); if (msg.Status != NKikimrProto::OK) { // do not reply OK to this item ResolutionErrors.insert(key.GetBlobId()); } + Y_VERIFY(PutsInFlight); + --PutsInFlight; + Self->Data->ExecuteTxCommitAssimilatedBlob(msg.Status, TBlobSeqId::FromLogoBlobId(msg.Id), std::move(key), TEvPrivate::EvTxComplete, SelfId(), 0, keep, doNotKeep); + ++TxInFlight; } void HandleTxComplete() { + STLOG(PRI_DEBUG, BLOB_DEPOT, BDT84, "HandleTxComplete", (Id, Self->GetLogId()), (Sender, Ev->Sender), + (Cookie, Ev->Cookie), (GetsInFlight, GetsInFlight), (RangesInFlight, RangesInFlight), + (TxInFlight, TxInFlight), (PutsInFlight, PutsInFlight), (GetQ.size, GetQ.size())); + + Y_VERIFY(TxInFlight); --TxInFlight; CheckIfDone(); } diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_assimilate.cpp b/ydb/core/blobstorage/dsproxy/dsproxy_assimilate.cpp index 240ebc258e..cc93c11c78 100644 --- a/ydb/core/blobstorage/dsproxy/dsproxy_assimilate.cpp +++ b/ydb/core/blobstorage/dsproxy/dsproxy_assimilate.cpp @@ -104,6 +104,8 @@ class TBlobStorageGroupAssimilateRequest : public TBlobStorageGroupRequestActor< using TItemVariant = std::variant<TBlock, TBarrier, TBlob>; struct TPerVDiskInfo { + std::optional<TString> ErrorReason; + std::optional<ui64> LastProcessedBlock; std::optional<std::tuple<ui64, ui8>> LastProcessedBarrier; std::optional<TLogoBlobID> LastProcessedBlob; @@ -343,9 +345,9 @@ public: Y_VERIFY(RequestsInFlight); --RequestsInFlight; + auto& info = PerVDiskInfo[orderNumber]; + Y_VERIFY(!info.HasItemsToMerge()); if (record.GetStatus() == NKikimrProto::OK) { - auto& info = PerVDiskInfo[orderNumber]; - Y_VERIFY(!info.HasItemsToMerge()); info.PushDataFromMessage(record, *this, Info->Type); if (info.HasItemsToMerge()) { Heap.push_back(&info); @@ -353,6 +355,11 @@ public: } else if (!info.Finished()) { Request(orderNumber); } + } else { + info.ErrorReason = TStringBuilder() << vdiskId << ": " << NKikimrProto::EReplyStatus_Name(record.GetStatus()); + if (record.GetErrorReason()) { + *info.ErrorReason += " (" + record.GetErrorReason() + ')'; + } } if (!RequestsInFlight) { @@ -445,6 +452,14 @@ public: void ReplyAndDie(NKikimrProto::EReplyStatus status) { A_LOG_DEBUG_S("BPA04", "ReplyAndDie status# " << NKikimrProto::EReplyStatus_Name(status)); + for (const auto& item : PerVDiskInfo) { + if (item.ErrorReason) { + if (ErrorReason) { + ErrorReason += ", "; + } + ErrorReason += *item.ErrorReason; + } + } SendResponseAndDie(std::make_unique<TEvBlobStorage::TEvAssimilateResult>(status, ErrorReason)); } }; diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_range.cpp b/ydb/core/blobstorage/dsproxy/dsproxy_range.cpp index b5c0d75ff9..bbfe6c5128 100644 --- a/ydb/core/blobstorage/dsproxy/dsproxy_range.cpp +++ b/ydb/core/blobstorage/dsproxy/dsproxy_range.cpp @@ -241,9 +241,11 @@ class TBlobStorageGroupRangeRequest : public TBlobStorageGroupRequestActor<TBlob } Y_VERIFY(query == queries.Get() + queryCount); - // register query in wilson and send it to DS proxy + // register query in wilson and send it to DS proxy; issue non-index query when MustRestoreFirst is false to + // prevent IndexRestoreGet invocation auto get = std::make_unique<TEvBlobStorage::TEvGet>(queries, queryCount, Deadline, - NKikimrBlobStorage::EGetHandleClass::FastRead, MustRestoreFirst, IsIndexOnly, TEvBlobStorage::TEvGet::TForceBlockTabletData(TabletId, ForceBlockedGeneration)); + NKikimrBlobStorage::EGetHandleClass::FastRead, MustRestoreFirst, MustRestoreFirst ? IsIndexOnly : false, + TEvBlobStorage::TEvGet::TForceBlockTabletData(TabletId, ForceBlockedGeneration)); get->IsInternal = true; get->Decommission = Decommission; @@ -284,7 +286,8 @@ class TBlobStorageGroupRangeRequest : public TBlobStorageGroupRequestActor<TBlob Y_VERIFY(response.Id == BlobsToGet[i].BlobId); if (getResult.Responses[i].Status == NKikimrProto::OK) { - result->Responses.emplace_back(response.Id, response.Buffer.ConvertToString(), response.Keep, response.DoNotKeep); + result->Responses.emplace_back(response.Id, IsIndexOnly ? TString() : response.Buffer.ConvertToString(), + response.Keep, response.DoNotKeep); } else if (getResult.Responses[i].Status != NKikimrProto::NODATA || BlobsToGet[i].RequiredToBePresent) { // it's okay to get NODATA if blob wasn't confirmed -- this blob is simply thrown out of resulting // set; otherwise we return error about lost data |