diff options
author | alexvru <alexvru@ydb.tech> | 2022-12-21 11:32:11 +0300 |
---|---|---|
committer | alexvru <alexvru@ydb.tech> | 2022-12-21 11:32:11 +0300 |
commit | d4e640830ffe49640c9b4833529acc8ebefb7c11 (patch) | |
tree | 6fdeaf885bb0331171719d0c3a4c49aebfc692ef | |
parent | 412f8d7d894ddc6bb04fc7f36729d4f36d9aea50 (diff) | |
download | ydb-d4e640830ffe49640c9b4833529acc8ebefb7c11.tar.gz |
Refactor BlobDepot agent response processing
-rw-r--r-- | ydb/core/blob_depot/agent/agent.cpp | 5 | ||||
-rw-r--r-- | ydb/core/blob_depot/agent/agent_impl.h | 49 | ||||
-rw-r--r-- | ydb/core/blob_depot/agent/comm.cpp | 8 | ||||
-rw-r--r-- | ydb/core/blob_depot/agent/query.cpp | 1 | ||||
-rw-r--r-- | ydb/core/blob_depot/agent/request.cpp | 65 | ||||
-rw-r--r-- | ydb/core/blob_depot/agent/storage_get.cpp | 23 | ||||
-rw-r--r-- | ydb/core/blob_depot/data.cpp | 6 | ||||
-rw-r--r-- | ydb/core/blob_depot/data.h | 7 | ||||
-rw-r--r-- | ydb/core/blob_depot/data_resolve.cpp | 4 | ||||
-rw-r--r-- | ydb/core/blob_depot/data_trash.cpp | 58 |
10 files changed, 142 insertions, 84 deletions
diff --git a/ydb/core/blob_depot/agent/agent.cpp b/ydb/core/blob_depot/agent/agent.cpp index c762ac6631..e9d8c022b6 100644 --- a/ydb/core/blob_depot/agent/agent.cpp +++ b/ydb/core/blob_depot/agent/agent.cpp @@ -21,8 +21,9 @@ namespace NKikimr::NBlobDepot { } } - TBlobDepotAgent::~TBlobDepotAgent() - {} + TBlobDepotAgent::~TBlobDepotAgent() { + TRequestSender::ClearRequestsInFlight(); + } void TBlobDepotAgent::Bootstrap() { Become(&TThis::StateFunc); diff --git a/ydb/core/blob_depot/agent/agent_impl.h b/ydb/core/blob_depot/agent/agent_impl.h index f707e35070..01f2901593 100644 --- a/ydb/core/blob_depot/agent/agent_impl.h +++ b/ydb/core/blob_depot/agent/agent_impl.h @@ -38,8 +38,38 @@ namespace NKikimr::NBlobDepot { std::optional<TString> ErrorReason; }; + class TRequestSender; + + struct TRequestInFlight + : TIntrusiveListItem<TRequestInFlight> + , TNonCopyable + { + using TCancelCallback = std::function<void()>; + + const ui64 Id; + TRequestSender* const Sender = {}; + TRequestContext::TPtr Context; + TCancelCallback CancelCallback; + const bool ToBlobDepotTablet = {}; + + TRequestInFlight(ui64 id) + : Id(id) + {} + + TRequestInFlight(ui64 id, TRequestSender *sender, TRequestContext::TPtr context, TCancelCallback cancelCallback, + bool toBlobDepotTablet); + + struct THash { + size_t operator ()(const TRequestInFlight& x) const { return std::hash<ui64>()(x.Id); } + }; + + friend bool operator ==(const TRequestInFlight& x, const TRequestInFlight& y) { + return x.Id == y.Id; + } + }; + class TRequestSender { - THashMap<ui64, TRequestContext::TPtr> RequestsInFlight; + TIntrusiveList<TRequestInFlight> RequestsInFlight; protected: TBlobDepotAgent& Agent; @@ -69,11 +99,15 @@ namespace NKikimr::NBlobDepot { public: TRequestSender(TBlobDepotAgent& agent); virtual ~TRequestSender(); - void RegisterRequest(ui64 id, TRequestContext::TPtr context); - void OnRequestComplete(ui64 id, TResponse response); + void ClearRequestsInFlight(); + void OnRequestComplete(TRequestInFlight& requestInFlight, TResponse response); protected: virtual void ProcessResponse(ui64 id, TRequestContext::TPtr context, TResponse response) = 0; + + private: + friend struct TRequestInFlight; + void RegisterRequestInFlight(TRequestInFlight *requestInFlight); }; class TBlobDepotAgent @@ -167,14 +201,7 @@ namespace NKikimr::NBlobDepot { //////////////////////////////////////////////////////////////////////////////////////////////////////////////// // Request/response delivery logic - struct TRequestInFlight { - using TCancelCallback = std::function<void()>; - - TRequestSender *Sender; - TCancelCallback CancelCallback; - }; - - using TRequestsInFlight = THashMap<ui64, TRequestInFlight>; + using TRequestsInFlight = std::unordered_set<TRequestInFlight, TRequestInFlight::THash>; ui64 NextTabletRequestId = 1; TRequestsInFlight TabletRequestInFlight; diff --git a/ydb/core/blob_depot/agent/comm.cpp b/ydb/core/blob_depot/agent/comm.cpp index 43b6227162..97ad1fd1bf 100644 --- a/ydb/core/blob_depot/agent/comm.cpp +++ b/ydb/core/blob_depot/agent/comm.cpp @@ -130,10 +130,10 @@ namespace NKikimr::NBlobDepot { } void TBlobDepotAgent::OnDisconnect() { - while (TabletRequestInFlight) { - auto [id, request] = std::move(*TabletRequestInFlight.begin()); - request.Sender->OnRequestComplete(id, TTabletDisconnected{}); - TabletRequestInFlight.erase(id); + while (!TabletRequestInFlight.empty()) { + auto node = TabletRequestInFlight.extract(TabletRequestInFlight.begin()); + auto& requestInFlight = node.value(); + requestInFlight.Sender->OnRequestComplete(requestInFlight, TTabletDisconnected{}); } for (auto& [_, kind] : ChannelKinds) { diff --git a/ydb/core/blob_depot/agent/query.cpp b/ydb/core/blob_depot/agent/query.cpp index 28bf510147..bc2b8353ed 100644 --- a/ydb/core/blob_depot/agent/query.cpp +++ b/ydb/core/blob_depot/agent/query.cpp @@ -152,6 +152,7 @@ namespace NKikimr::NBlobDepot { Destroyed = true; Agent.ExecutingQueries.Remove(this); Agent.DeletePendingQueries.PushBack(this); + TRequestSender::ClearRequestsInFlight(); } TString TBlobDepotAgent::TQuery::GetQueryId() const { diff --git a/ydb/core/blob_depot/agent/request.cpp b/ydb/core/blob_depot/agent/request.cpp index 2c5ffa143a..9f49b16af2 100644 --- a/ydb/core/blob_depot/agent/request.cpp +++ b/ydb/core/blob_depot/agent/request.cpp @@ -2,6 +2,17 @@ namespace NKikimr::NBlobDepot { + TRequestInFlight::TRequestInFlight(ui64 id, TRequestSender *sender, TRequestContext::TPtr context, + TCancelCallback cancelCallback, bool toBlobDepotTablet) + : Id(id) + , Sender(sender) + , Context(std::move(context)) + , CancelCallback(std::move(cancelCallback)) + , ToBlobDepotTablet(toBlobDepotTablet) + { + Sender->RegisterRequestInFlight(this); + } + //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// // TRequestSender class @@ -10,37 +21,28 @@ namespace NKikimr::NBlobDepot { {} TRequestSender::~TRequestSender() { - if (this != &Agent) { - for (const auto& [id, context] : RequestsInFlight) { - const ui64 id_ = id; - auto tryToProcess = [&](auto& map) { - if (const auto it = map.find(id_); it != map.end()) { - TBlobDepotAgent::TRequestInFlight& request = it->second; - if (request.CancelCallback) { - request.CancelCallback(); - } - map.erase(it); - return true; - } else { - return false; - } - }; - tryToProcess(Agent.TabletRequestInFlight) || tryToProcess(Agent.OtherRequestInFlight); + ClearRequestsInFlight(); + } + + void TRequestSender::ClearRequestsInFlight() { + RequestsInFlight.ForEach([this](TRequestInFlight *requestInFlight) { + auto& map = requestInFlight->ToBlobDepotTablet ? Agent.TabletRequestInFlight : Agent.OtherRequestInFlight; + auto node = map.extract(requestInFlight->Id); + Y_VERIFY(node); + + if (requestInFlight->CancelCallback) { + requestInFlight->CancelCallback(); } - } + }); } - void TRequestSender::RegisterRequest(ui64 id, TRequestContext::TPtr context) { - const auto [_, inserted] = RequestsInFlight.emplace(id, std::move(context)); - Y_VERIFY(inserted); + void TRequestSender::OnRequestComplete(TRequestInFlight& requestInFlight, TResponse response) { + requestInFlight.Unlink(); + ProcessResponse(requestInFlight.Id, std::move(requestInFlight.Context), std::move(response)); } - void TRequestSender::OnRequestComplete(ui64 id, TResponse response) { - const auto it = RequestsInFlight.find(id); - Y_VERIFY(it != RequestsInFlight.end()); - TRequestContext::TPtr context = std::move(it->second); - RequestsInFlight.erase(it); - ProcessResponse(id, std::move(context), std::move(response)); + void TRequestSender::RegisterRequestInFlight(TRequestInFlight *requestInFlight) { + RequestsInFlight.PushBack(requestInFlight); } TString TRequestSender::ToString(const TResponse& response) { @@ -63,9 +65,9 @@ namespace NKikimr::NBlobDepot { void TBlobDepotAgent::RegisterRequest(ui64 id, TRequestSender *sender, TRequestContext::TPtr context, TRequestInFlight::TCancelCallback cancelCallback, bool toBlobDepotTablet) { TRequestsInFlight& map = toBlobDepotTablet ? TabletRequestInFlight : OtherRequestInFlight; - const auto [_, inserted] = map.emplace(id, TRequestInFlight{sender, std::move(cancelCallback)}); + const bool inserted = map.emplace(id, sender, std::move(context), std::move(cancelCallback), + toBlobDepotTablet).second; Y_VERIFY(inserted); - sender->RegisterRequest(id, std::move(context)); } template<typename TEvent> @@ -94,10 +96,9 @@ namespace NKikimr::NBlobDepot { template void TBlobDepotAgent::HandleOtherResponse(TEvBlobStorage::TEvPutResult::TPtr ev); void TBlobDepotAgent::OnRequestComplete(ui64 id, TResponse response, TRequestsInFlight& map) { - if (const auto it = map.find(id); it != map.end()) { - TRequestInFlight request = std::move(it->second); - map.erase(it); - request.Sender->OnRequestComplete(id, std::move(response)); + if (auto node = map.extract(id)) { + auto& requestInFlight = node.value(); + requestInFlight.Sender->OnRequestComplete(requestInFlight, std::move(response)); } } diff --git a/ydb/core/blob_depot/agent/storage_get.cpp b/ydb/core/blob_depot/agent/storage_get.cpp index 4c5be62edd..2b142b6b3f 100644 --- a/ydb/core/blob_depot/agent/storage_get.cpp +++ b/ydb/core/blob_depot/agent/storage_get.cpp @@ -89,7 +89,7 @@ namespace NKikimr::NBlobDepot { } else if (Request.IsIndexOnly) { r.Status = NKikimrProto::OK; --AnswersRemain; - } else if (value) { + } else { TReadArg arg{ *value, Request.GetHandleClass, @@ -100,6 +100,24 @@ namespace NKikimr::NBlobDepot { queryIdx, Request.ReaderTabletData}; TString error; + auto makeValueChain = [&] { + TStringStream str; + str << '['; + for (int i = 0; i < value->size(); ++i) { + const auto& item = value->at(i); + if (i != 0) { + str << ' '; + } + const auto blobId = LogoBlobIDFromLogoBlobID(item.GetBlobId()); + const ui64 subrangeBegin = item.GetSubrangeBegin(); + const ui64 subrangeEnd = item.HasSubrangeEnd() ? item.GetSubrangeEnd() : blobId.BlobSize(); + str << blobId << '@' << item.GetGroupId() << '{' << subrangeBegin << '-' << (subrangeEnd - 1) << '}'; + } + str << ']'; + return str.Str(); + }; + STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA34, "IssueRead", (VirtualGroupId, Agent.VirtualGroupId), + (Offset, arg.Offset), (Size, arg.Size), (ValueChain, makeValueChain()), (Tag, arg.Tag)); const bool success = Agent.IssueRead(arg, error); if (!success) { EndWithError(NKikimrProto::ERROR, std::move(error)); @@ -110,6 +128,9 @@ namespace NKikimr::NBlobDepot { } void OnRead(ui64 tag, NKikimrProto::EReplyStatus status, TString buffer) override { + STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA35, "OnRead", (VirtualGroupId, Agent.VirtualGroupId), + (Tag, tag), (Status, status), (Buffer.size, status == NKikimrProto::OK ? buffer.size() : 0), + (ErrorReason, status != NKikimrProto::OK ? buffer : "")); auto& resp = Response->Responses[tag]; Y_VERIFY(resp.Status == NKikimrProto::UNKNOWN); resp.Status = status; diff --git a/ydb/core/blob_depot/data.cpp b/ydb/core/blob_depot/data.cpp index cb2f3f7e28..a96a188726 100644 --- a/ydb/core/blob_depot/data.cpp +++ b/ydb/core/blob_depot/data.cpp @@ -265,6 +265,12 @@ namespace NKikimr::NBlobDepot { return it->second; } + TData::TRecordsPerChannelGroup& TData::GetRecordsPerChannelGroup(ui8 channel, ui32 groupId) { + const auto it = RecordsPerChannelGroup.find(std::make_tuple(channel, groupId)); + Y_VERIFY(it != RecordsPerChannelGroup.end()); + return it->second; + } + void TData::AddDataOnLoad(TKey key, TString value, bool uncertainWrite) { NKikimrBlobDepot::TValue proto; const bool success = proto.ParseFromString(value); diff --git a/ydb/core/blob_depot/data.h b/ydb/core/blob_depot/data.h index 21574f2228..9a13f08ade 100644 --- a/ydb/core/blob_depot/data.h +++ b/ydb/core/blob_depot/data.h @@ -382,8 +382,12 @@ namespace NKikimr::NBlobDepot { std::deque<TKey> KeysMadeCertain; // but not yet committed bool CommitCertainKeysScheduled = false; + struct TCollectCmd { + ui64 QueryId; + ui32 GroupId; + }; ui64 LastCollectCmdId = 0; - std::unordered_map<ui64, ui32> CollectCmdToGroup; + std::unordered_map<ui64, TCollectCmd> CollectCmds; public: TData(TBlobDepot *self); @@ -443,6 +447,7 @@ namespace NKikimr::NBlobDepot { void HandleCommitCertainKeys(); TRecordsPerChannelGroup& GetRecordsPerChannelGroup(TLogoBlobID id); + TRecordsPerChannelGroup& GetRecordsPerChannelGroup(ui8 channel, ui32 groupId); void AddDataOnLoad(TKey key, TString value, bool uncertainWrite); void AddDataOnDecommit(const TEvBlobStorage::TEvAssimilateResult::TBlob& blob, diff --git a/ydb/core/blob_depot/data_resolve.cpp b/ydb/core/blob_depot/data_resolve.cpp index b88e499668..2b7729b744 100644 --- a/ydb/core/blob_depot/data_resolve.cpp +++ b/ydb/core/blob_depot/data_resolve.cpp @@ -181,10 +181,10 @@ namespace NKikimr::NBlobDepot { } } - if (end && Self->Data->LastLoadedKey && *end <= Self->Data->LastLoadedKey) { + if (end && Self->Data->LastLoadedKey && *end <= *Self->Data->LastLoadedKey) { // we have everything we need contained in memory, skip this item continue; - } else if (Self->Data->LastLoadedKey && (!begin || *begin <= Self->Data->LastLoadedKey)) { + } else if (Self->Data->LastLoadedKey && (!begin || *begin <= Self->Data->LastLoadedKey) && !(flags & EScanFlags::REVERSE)) { // we can scan only some part from memory -- do it auto callback = [&](const TKey& key, const TValue&) { LastScannedKey = key; diff --git a/ydb/core/blob_depot/data_trash.cpp b/ydb/core/blob_depot/data_trash.cpp index 0f1167db59..ff653a2c27 100644 --- a/ydb/core/blob_depot/data_trash.cpp +++ b/ydb/core/blob_depot/data_trash.cpp @@ -29,7 +29,7 @@ namespace NKikimr::NBlobDepot { Y_VERIFY(record.Channel < Self->Channels.size()); auto& channel = Self->Channels[record.Channel]; - TGenStep nextGenStep(*--record.Trash.end()); + TGenStep nextGenStep = Max(record.IssuedGenStep, TGenStep(*--record.Trash.end())); std::set<TLogoBlobID>::iterator trashEndIter = record.Trash.end(); // step we are going to invalidate (including blobs with this one) @@ -74,24 +74,23 @@ namespace NKikimr::NBlobDepot { const TLogoBlobID maxId(Self->TabletID(), leastExpectedBlobId.Generation, leastExpectedBlobId.Step, record.Channel, 0, 0); trashEndIter = record.Trash.lower_bound(maxId); - if (trashEndIter != record.Trash.begin()) { - nextGenStep = TGenStep(*std::prev(trashEndIter)); - } else { - nextGenStep = {}; - } + nextGenStep = Max(record.IssuedGenStep, + trashEndIter != record.Trash.begin() + ? TGenStep(*std::prev(trashEndIter)) + : TGenStep()); } + Y_VERIFY(nextGenStep < TGenStep(leastExpectedBlobId)); + TVector<TLogoBlobID> keep; TVector<TLogoBlobID> doNotKeep; - std::vector<TLogoBlobID> trashInFlight; for (auto it = record.Trash.begin(); it != trashEndIter; ++it) { - if (const TGenStep genStep(*it); genStep <= record.LastConfirmedGenStep) { + if (const TGenStep genStep(*it); genStep <= record.IssuedGenStep) { doNotKeep.push_back(*it); } else if (nextGenStep < genStep) { Y_FAIL(); } - trashInFlight.push_back(*it); } const TLogoBlobID keepFrom(Self->TabletID(), record.LastConfirmedGenStep.Generation(), @@ -102,7 +101,11 @@ namespace NKikimr::NBlobDepot { keep.push_back(*it); } + // trash items that will be deleted during this round + std::vector<TLogoBlobID> trashInFlight(record.Trash.begin(), trashEndIter); + const bool collect = nextGenStep > record.LastConfirmedGenStep; + Y_VERIFY(nextGenStep >= record.IssuedGenStep); if (trashInFlight.empty()) { Y_VERIFY(keep.empty()); // nothing to do here @@ -120,7 +123,7 @@ namespace NKikimr::NBlobDepot { record.CollectGarbageRequestInFlight = true; record.PerGenerationCounter += ev->Collect ? ev->PerGenerationCounterStepSize() : 0; record.TrashInFlight.swap(trashInFlight); - record.IssuedGenStep = Max(nextGenStep, record.LastConfirmedGenStep); + record.IssuedGenStep = nextGenStep; STLOG(PRI_DEBUG, BLOB_DEPOT, BDT11, "issuing TEvCollectGarbage", (Id, Self->GetLogId()), (Channel, int(record.Channel)), (GroupId, record.GroupId), (Msg, ev->ToString()), @@ -128,36 +131,33 @@ namespace NKikimr::NBlobDepot { (LeastExpectedBlobId, leastExpectedBlobId), (TrashInFlight.size, record.TrashInFlight.size())); const ui64 id = ++LastCollectCmdId; - CollectCmdToGroup.emplace(id, record.GroupId); + const ui64 queryId = RandomNumber<ui64>(); + CollectCmds.emplace(id, TCollectCmd{.QueryId = queryId, .GroupId = record.GroupId}); if (IS_LOG_PRIORITY_ENABLED(*TlsActivationContext, NLog::PRI_TRACE, NKikimrServices::BLOB_DEPOT_EVENTS)) { if (ev->Keep) { for (const TLogoBlobID& blobId : *ev->Keep) { Y_VERIFY(blobId.Channel() == record.Channel); BDEV(BDEV00, "TrashManager_issueKeep", (BDT, Self->TabletID()), (GroupId, record.GroupId), - (Channel, int(record.Channel)), (BlobId, blobId), (Cookie, id)); + (Channel, int(record.Channel)), (Q, queryId), (Cookie, id), (BlobId, blobId)); } } if (ev->DoNotKeep) { for (const TLogoBlobID& blobId : *ev->DoNotKeep) { Y_VERIFY(blobId.Channel() == record.Channel); BDEV(BDEV01, "TrashManager_issueDoNotKeep", (BDT, Self->TabletID()), (GroupId, record.GroupId), - (Channel, int(record.Channel)), (BlobId, blobId), (Cookie, id)); + (Channel, int(record.Channel)), (Q, queryId), (Cookie, id), (BlobId, blobId)); } } if (collect) { BDEV(BDEV02, "TrashManager_issueCollect", (BDT, Self->TabletID()), (GroupId, record.GroupId), - (Channel, int(ev->Channel)), (RecordGeneration, ev->RecordGeneration), + (Channel, int(ev->Channel)), (Q, queryId), (Cookie, id), (RecordGeneration, ev->RecordGeneration), (PerGenerationCounter, ev->PerGenerationCounter), (Hard, ev->Hard), - (CollectGeneration, ev->CollectGeneration), (CollectStep, ev->CollectStep), (Cookie, id)); + (CollectGeneration, ev->CollectGeneration), (CollectStep, ev->CollectStep)); } } - if (collect) { - ExecuteIssueGC(record.Channel, record.GroupId, record.IssuedGenStep, std::move(ev), id); - } else { - SendToBSProxy(Self->SelfId(), record.GroupId, ev.release(), id); - } + ExecuteIssueGC(record.Channel, record.GroupId, record.IssuedGenStep, std::move(ev), id); } for (auto& [agentId, ev] : outbox) { @@ -175,20 +175,18 @@ namespace NKikimr::NBlobDepot { } void TData::Handle(TEvBlobStorage::TEvCollectGarbageResult::TPtr ev) { - auto cmd = CollectCmdToGroup.extract(ev->Cookie); + auto cmd = CollectCmds.extract(ev->Cookie); Y_VERIFY(cmd); - const ui32 groupId = cmd.mapped(); + const TCollectCmd& info = cmd.mapped(); + const ui32 groupId = info.GroupId; STLOG(PRI_DEBUG, BLOB_DEPOT, BDT12, "TEvCollectGarbageResult", (Id, Self->GetLogId()), (Channel, ev->Get()->Channel), (GroupId, groupId), (Msg, ev->Get()->ToString())); BDEV(BDEV03, "TrashManager_collectResult", (BDT, Self->TabletID()), (GroupId, groupId), (Channel, ev->Get()->Channel), - (Cookie, ev->Cookie), (Status, ev->Get()->Status), (ErrorReason, ev->Get()->ErrorReason)); + (Q, info.QueryId), (Cookie, ev->Cookie), (Status, ev->Get()->Status), (ErrorReason, ev->Get()->ErrorReason)); - const auto& key = std::make_tuple(ev->Get()->Channel, groupId); - const auto it = RecordsPerChannelGroup.find(key); - Y_VERIFY(it != RecordsPerChannelGroup.end()); - auto& record = it->second; + TRecordsPerChannelGroup& record = GetRecordsPerChannelGroup(ev->Get()->Channel, groupId); if (ev->Get()->Status == NKikimrProto::OK) { Y_VERIFY(record.CollectGarbageRequestInFlight); record.OnSuccessfulCollect(this); @@ -201,10 +199,8 @@ namespace NKikimr::NBlobDepot { } void TData::OnCommitConfirmedGC(ui8 channel, ui32 groupId) { - const auto& key = std::make_tuple(channel, groupId); - const auto it = RecordsPerChannelGroup.find(key); - Y_VERIFY(it != RecordsPerChannelGroup.end()); - it->second.ClearInFlight(this); + TRecordsPerChannelGroup& record = GetRecordsPerChannelGroup(channel, groupId); + record.ClearInFlight(this); } } // NKikimr::NBlobDepot |