diff options
author | alexvru <alexvru@ydb.tech> | 2022-11-09 22:45:41 +0300 |
---|---|---|
committer | alexvru <alexvru@ydb.tech> | 2022-11-09 22:45:41 +0300 |
commit | 1e91ddd433d2fe0ff5e1a45163e52e9c4d9bd6a0 (patch) | |
tree | d44925985d9f1213c5e43cbbbf58146013ff3726 | |
parent | f08ddb7b69dd9da61be6f99f8c0e4b22c5bac4dd (diff) | |
download | ydb-1e91ddd433d2fe0ff5e1a45163e52e9c4d9bd6a0.tar.gz |
Fix uncertainty resolver
-rw-r--r-- | ydb/core/blob_depot/agent.cpp | 9 | ||||
-rw-r--r-- | ydb/core/blob_depot/agent/agent.cpp | 1 | ||||
-rw-r--r-- | ydb/core/blob_depot/agent/agent_impl.h | 27 | ||||
-rw-r--r-- | ydb/core/blob_depot/agent/blob_mapping_cache.cpp | 7 | ||||
-rw-r--r-- | ydb/core/blob_depot/agent/query.cpp | 96 | ||||
-rw-r--r-- | ydb/core/blob_depot/agent/storage_discover.cpp | 10 | ||||
-rw-r--r-- | ydb/core/blob_depot/agent/storage_get.cpp | 18 | ||||
-rw-r--r-- | ydb/core/blob_depot/agent/storage_put.cpp | 10 | ||||
-rw-r--r-- | ydb/core/blob_depot/agent/storage_range.cpp | 5 | ||||
-rw-r--r-- | ydb/core/blob_depot/data_resolve.cpp | 35 | ||||
-rw-r--r-- | ydb/core/blob_depot/data_resolve.h | 13 | ||||
-rw-r--r-- | ydb/core/blob_depot/data_trash.cpp | 2 | ||||
-rw-r--r-- | ydb/core/blob_depot/data_uncertain.cpp | 116 | ||||
-rw-r--r-- | ydb/core/blob_depot/data_uncertain.h | 9 | ||||
-rw-r--r-- | ydb/core/blob_depot/op_commit_blob_seq.cpp | 26 | ||||
-rw-r--r-- | ydb/core/protos/blob_depot.proto | 1 |
16 files changed, 275 insertions, 110 deletions
diff --git a/ydb/core/blob_depot/agent.cpp b/ydb/core/blob_depot/agent.cpp index e28a2c62f8..a4c849aa9b 100644 --- a/ydb/core/blob_depot/agent.cpp +++ b/ydb/core/blob_depot/agent.cpp @@ -239,16 +239,18 @@ namespace NKikimr::NBlobDepot { void TBlobDepot::Handle(TEvBlobDepot::TEvPushNotifyResult::TPtr ev) { class TTxInvokeCallback : public NTabletFlatExecutor::TTransactionBase<TBlobDepot> { + const ui32 NodeId; TEvBlobDepot::TEvPushNotifyResult::TPtr Ev; public: - TTxInvokeCallback(TBlobDepot *self, TEvBlobDepot::TEvPushNotifyResult::TPtr ev) + TTxInvokeCallback(TBlobDepot *self, ui32 nodeId, TEvBlobDepot::TEvPushNotifyResult::TPtr ev) : TTransactionBase(self) + , NodeId(nodeId) , Ev(ev) {} bool Execute(TTransactionContext& /*txc*/, const TActorContext&) override { - TAgent& agent = Self->GetAgent(Ev->Recipient); + TAgent& agent = Self->GetAgent(NodeId); if (const auto it = agent.PushCallbacks.find(Ev->Cookie); it != agent.PushCallbacks.end()) { auto callback = std::move(it->second); agent.PushCallbacks.erase(it); @@ -260,7 +262,8 @@ namespace NKikimr::NBlobDepot { void Complete(const TActorContext&) override {} }; - Execute(std::make_unique<TTxInvokeCallback>(this, ev)); + TAgent& agent = GetAgent(ev->Recipient); + Execute(std::make_unique<TTxInvokeCallback>(this, agent.ConnectedNodeId, ev)); } void TBlobDepot::ProcessRegisterAgentQ() { diff --git a/ydb/core/blob_depot/agent/agent.cpp b/ydb/core/blob_depot/agent/agent.cpp index 89a0f1787a..c762ac6631 100644 --- a/ydb/core/blob_depot/agent/agent.cpp +++ b/ydb/core/blob_depot/agent/agent.cpp @@ -32,6 +32,7 @@ namespace NKikimr::NBlobDepot { } HandleQueryWatchdog(); + HandlePendingEventQueueWatchdog(); } IActor *CreateBlobDepotAgent(ui32 virtualGroupId, TIntrusivePtr<TBlobStorageGroupInfo> info, TActorId proxyId) { diff --git a/ydb/core/blob_depot/agent/agent_impl.h b/ydb/core/blob_depot/agent/agent_impl.h index 8745e873ab..4b7a9b7086 100644 --- a/ydb/core/blob_depot/agent/agent_impl.h +++ b/ydb/core/blob_depot/agent/agent_impl.h @@ -32,7 +32,11 @@ namespace NKikimr::NBlobDepot { }; struct TTabletDisconnected {}; - struct TKeyResolved { const TResolvedValueChain* ValueChain; }; + + struct TKeyResolved { + const TResolvedValueChain* ValueChain; + std::optional<TString> ErrorReason; + }; class TRequestSender { THashMap<ui64, TRequestContext::TPtr> RequestsInFlight; @@ -87,6 +91,7 @@ namespace NKikimr::NBlobDepot { enum { EvQueryWatchdog = EventSpaceBegin(TEvents::ES_PRIVATE), EvProcessPendingEvent, + EvPendingEventQueueWatchdog, }; }; @@ -124,6 +129,7 @@ namespace NKikimr::NBlobDepot { ENUMERATE_INCOMING_EVENTS(FORWARD_STORAGE_PROXY) hFunc(TEvBlobStorage::TEvBunchOfEvents, Handle); cFunc(TEvPrivate::EvProcessPendingEvent, HandlePendingEvent); + cFunc(TEvPrivate::EvPendingEventQueueWatchdog, HandlePendingEventQueueWatchdog); cFunc(TEvPrivate::EvQueryWatchdog, HandleQueryWatchdog); ); @@ -226,6 +232,7 @@ namespace NKikimr::NBlobDepot { protected: std::unique_ptr<IEventHandle> Event; // original query event const ui64 QueryId; + mutable TString QueryIdString; const TMonotonic StartTime; std::multimap<TMonotonic, TQuery*>::iterator QueryWatchdogMapIter; NLog::EPriority WatchdogPriority = NLog::PRI_WARN; @@ -241,7 +248,7 @@ namespace NKikimr::NBlobDepot { void EndWithError(NKikimrProto::EReplyStatus status, const TString& errorReason); void EndWithSuccess(std::unique_ptr<IEventBase> response); TString GetName() const; - ui64 GetQueryId() const { return QueryId; } + TString GetQueryId() const; virtual ui64 GetTabletId() const { return 0; } virtual void Initiate() = 0; @@ -268,16 +275,26 @@ namespace NKikimr::NBlobDepot { TEvent& Request; }; - std::deque<std::unique_ptr<IEventHandle>> PendingEventQ; + struct TPendingEvent { + std::unique_ptr<IEventHandle> Event; + size_t Size; + TMonotonic ExpirationTimestamp; + }; + + std::deque<TPendingEvent> PendingEventQ; + size_t PendingEventBytes = 0; + static constexpr size_t MaxPendingEventBytes = 32'000'000; // ~32 MB + static constexpr TDuration EventExpirationTime = TDuration::Seconds(5); TIntrusiveListWithAutoDelete<TQuery, TQuery::TDeleter, TExecutingQueries> ExecutingQueries; std::multimap<TMonotonic, TQuery*> QueryWatchdogMap; - void HandleQueryWatchdog(); + template<ui32 EventType> TQuery *CreateQuery(std::unique_ptr<IEventHandle> ev); void HandleStorageProxy(TAutoPtr<IEventHandle> ev); void HandlePendingEvent(); void ProcessStorageEvent(std::unique_ptr<IEventHandle> ev); + void HandlePendingEventQueueWatchdog(); void Handle(TEvBlobStorage::TEvBunchOfEvents::TPtr ev); - template<ui32 EventType> TQuery *CreateQuery(std::unique_ptr<IEventHandle> ev); + void HandleQueryWatchdog(); //////////////////////////////////////////////////////////////////////////////////////////////////////////////// diff --git a/ydb/core/blob_depot/agent/blob_mapping_cache.cpp b/ydb/core/blob_depot/agent/blob_mapping_cache.cpp index 6625332412..4a65f9c0b7 100644 --- a/ydb/core/blob_depot/agent/blob_mapping_cache.cpp +++ b/ydb/core/blob_depot/agent/blob_mapping_cache.cpp @@ -12,7 +12,7 @@ namespace NKikimr::NBlobDepot { void TBlobDepotAgent::TBlobMappingCache::HandleResolveResult(const NKikimrBlobDepot::TEvResolveResult& msg, TRequestContext::TPtr context) { - STLOG(PRI_DEBUG, BLOB_DEPOT, BDA28, "HandleResolveResult", (VirtualGroupId, Agent.VirtualGroupId), (Msg, msg)); + STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA28, "HandleResolveResult", (VirtualGroupId, Agent.VirtualGroupId), (Msg, msg)); auto process = [&](TString key, const NKikimrBlobDepot::TEvResolveResult::TResolvedKey *item) { const auto [it, inserted] = Cache.try_emplace(std::move(key)); @@ -28,8 +28,9 @@ namespace NKikimr::NBlobDepot { } Queue.PushBack(&entry); entry.ResolveInFlight = false; - for (TQueryWaitingForKey& item : std::exchange(entry.QueriesWaitingForKey, {})) { - Agent.OnRequestComplete(item.Id, TKeyResolved{entry.Values.empty() ? nullptr : &entry.Values}, + for (TQueryWaitingForKey& q : std::exchange(entry.QueriesWaitingForKey, {})) { + Agent.OnRequestComplete(q.Id, TKeyResolved{entry.Values.empty() ? nullptr : &entry.Values, + item && item->HasErrorReason() ? std::make_optional(item->GetErrorReason()) : std::nullopt}, Agent.OtherRequestInFlight); } }; diff --git a/ydb/core/blob_depot/agent/query.cpp b/ydb/core/blob_depot/agent/query.cpp index 8cdf457bcd..6969dda9bb 100644 --- a/ydb/core/blob_depot/agent/query.cpp +++ b/ydb/core/blob_depot/agent/query.cpp @@ -2,47 +2,59 @@ namespace NKikimr::NBlobDepot { + template<> + TBlobDepotAgent::TQuery *TBlobDepotAgent::CreateQuery<0>(std::unique_ptr<IEventHandle> ev) { + switch (ev->GetTypeRewrite()) { +#define XX(TYPE) case TEvBlobStorage::TYPE: return CreateQuery<TEvBlobStorage::TYPE>(std::move(ev)); + ENUMERATE_INCOMING_EVENTS(XX) +#undef XX + } + Y_FAIL(); + } + void TBlobDepotAgent::HandleStorageProxy(TAutoPtr<IEventHandle> ev) { + std::unique_ptr<IEventHandle> p(ev.Release()); + if (TabletId == Max<ui64>() || !PendingEventQ.empty()) { - // TODO: memory usage control - PendingEventQ.emplace_back(ev.Release()); + size_t size = Max<size_t>(); + switch (p->GetTypeRewrite()) { +#define XX(TYPE) case TEvBlobStorage::TYPE: size = p->Get<TEvBlobStorage::T##TYPE>()->CalculateSize(); break; + ENUMERATE_INCOMING_EVENTS(XX) +#undef XX + } + Y_VERIFY(size != Max<size_t>()); + + if (size + PendingEventBytes > MaxPendingEventBytes) { + CreateQuery<0>(std::move(p))->EndWithError(NKikimrProto::ERROR, "pending event queue overflow"); + } else { + PendingEventBytes += size; + PendingEventQ.push_back(TPendingEvent{std::move(p), size, TMonotonic::Now() + EventExpirationTime}); + } } else { - ProcessStorageEvent(std::unique_ptr<IEventHandle>(ev.Release())); + ProcessStorageEvent(std::move(p)); } } void TBlobDepotAgent::HandlePendingEvent() { - THPTimer timer; - - do { - if (!PendingEventQ.empty()) { - ProcessStorageEvent(std::move(PendingEventQ.front())); - PendingEventQ.pop_front(); - } else { + for (THPTimer timer; !PendingEventQ.empty(); ) { + TPendingEvent& item = PendingEventQ.front(); + ProcessStorageEvent(std::move(item.Event)); + Y_VERIFY(PendingEventBytes >= item.Size); + PendingEventBytes -= item.Size; + PendingEventQ.pop_front(); + Y_VERIFY(!PendingEventQ.empty() || !PendingEventBytes); + + if (TDuration::Seconds(timer.Passed()) >= TDuration::MicroSeconds(100)) { + TActivationContext::Send(new IEventHandle(TEvPrivate::EvProcessPendingEvent, 0, SelfId(), {}, nullptr, 0)); break; } - } while (TDuration::Seconds(timer.Passed()) <= TDuration::MicroSeconds(100)); - - if (!PendingEventQ.empty()) { - TActivationContext::Send(new IEventHandle(TEvPrivate::EvProcessPendingEvent, 0, SelfId(), {}, nullptr, 0)); } } void TBlobDepotAgent::ProcessStorageEvent(std::unique_ptr<IEventHandle> ev) { - TQuery *query = nullptr; - - switch (ev->GetTypeRewrite()) { -#define XX(TYPE) \ - case TEvBlobStorage::TYPE: query = CreateQuery<TEvBlobStorage::TYPE>(std::move(ev)); break; - - ENUMERATE_INCOMING_EVENTS(XX) -#undef XX - } - - Y_VERIFY(query); - + TQuery *query = CreateQuery<0>(std::move(ev)); STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA13, "new query", (VirtualGroupId, VirtualGroupId), - (QueryId, query->GetQueryId()), (TabletId, query->GetTabletId()), (Name, query->GetName())); + (QueryId, query->GetQueryId()), (Name, query->GetName())); if (!TabletId) { query->EndWithError(NKikimrProto::ERROR, "group is in error state"); } else { @@ -50,6 +62,18 @@ namespace NKikimr::NBlobDepot { } } + void TBlobDepotAgent::HandlePendingEventQueueWatchdog() { + const TMonotonic now = TActivationContext::Monotonic(); + std::deque<TPendingEvent>::iterator it; + for (it = PendingEventQ.begin(); it != PendingEventQ.end() && it->ExpirationTimestamp <= now; ++it) { + CreateQuery<0>(std::move(it->Event))->EndWithError(NKikimrProto::ERROR, "pending event queue timeout"); + } + PendingEventQ.erase(PendingEventQ.begin(), it); + + TActivationContext::Schedule(TDuration::Seconds(1), new IEventHandle(TEvPrivate::EvPendingEventQueueWatchdog, 0, + SelfId(), {}, nullptr, 0)); + } + void TBlobDepotAgent::Handle(TEvBlobStorage::TEvBunchOfEvents::TPtr ev) { ev->Get()->Process(this); } @@ -79,7 +103,7 @@ namespace NKikimr::NBlobDepot { TBlobDepotAgent::TQuery::~TQuery() { if (TDuration duration(TActivationContext::Monotonic() - StartTime); duration >= WatchdogDuration) { STLOG(WatchdogPriority, BLOB_DEPOT_AGENT, BDA00, "query execution took too much time", - (VirtualGroupId, Agent.VirtualGroupId), (QueryId, QueryId), (Duration, duration)); + (VirtualGroupId, Agent.VirtualGroupId), (QueryId, GetQueryId()), (Duration, duration)); } Agent.QueryWatchdogMap.erase(QueryWatchdogMapIter); } @@ -95,7 +119,7 @@ namespace NKikimr::NBlobDepot { void TBlobDepotAgent::TQuery::EndWithError(NKikimrProto::EReplyStatus status, const TString& errorReason) { STLOG(PRI_INFO, BLOB_DEPOT_AGENT, BDA14, "query ends with error", (VirtualGroupId, Agent.VirtualGroupId), - (QueryId, QueryId), (Status, status), (ErrorReason, errorReason), + (QueryId, GetQueryId()), (Status, status), (ErrorReason, errorReason), (Duration, TActivationContext::Monotonic() - StartTime)); std::unique_ptr<IEventBase> response; @@ -116,12 +140,24 @@ namespace NKikimr::NBlobDepot { void TBlobDepotAgent::TQuery::EndWithSuccess(std::unique_ptr<IEventBase> response) { STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA15, "query ends with success", (VirtualGroupId, Agent.VirtualGroupId), - (QueryId, QueryId), (Response, response->ToString()), (Duration, TActivationContext::Monotonic() - StartTime)); + (QueryId, GetQueryId()), (Response, response->ToString()), (Duration, TActivationContext::Monotonic() - StartTime)); Agent.SelfId().Send(Event->Sender, response.release(), 0, Event->Cookie); OnDestroy(true); delete this; } + TString TBlobDepotAgent::TQuery::GetQueryId() const { + if (!QueryIdString) { + TStringStream s; + s << Hex(QueryId); + if (const ui64 tabletId = GetTabletId()) { + s << '@' << tabletId; + } + QueryIdString = std::move(s.Str()); + } + return QueryIdString; + } + TString TBlobDepotAgent::TQuery::GetName() const { switch (Event->GetTypeRewrite()) { #define XX(TYPE) case TEvBlobStorage::TYPE: return #TYPE; diff --git a/ydb/core/blob_depot/agent/storage_discover.cpp b/ydb/core/blob_depot/agent/storage_discover.cpp index 8f59c53a93..a65c558f78 100644 --- a/ydb/core/blob_depot/agent/storage_discover.cpp +++ b/ydb/core/blob_depot/agent/storage_discover.cpp @@ -76,7 +76,7 @@ namespace NKikimr::NBlobDepot { void OnUpdateBlock(bool success) override { STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA18, "OnUpdateBlock", (VirtualGroupId, Agent.VirtualGroupId), - (QueryId, QueryId), (Success, success)); + (QueryId, GetQueryId()), (Success, success)); if (!success) { return EndWithError(NKikimrProto::ERROR, "BlobDepot tablet disconnected"); @@ -95,7 +95,7 @@ namespace NKikimr::NBlobDepot { void HandleResolveResult(ui64 id, TRequestContext::TPtr context, TEvBlobDepot::TEvResolveResult& msg) { STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA19, "HandleResolveResult", (VirtualGroupId, Agent.VirtualGroupId), - (QueryId, QueryId), (Msg, msg.Record)); + (QueryId, GetQueryId()), (Msg, msg.Record)); Agent.BlobMappingCache.HandleResolveResult(msg.Record, nullptr); @@ -107,6 +107,10 @@ namespace NKikimr::NBlobDepot { if (status == NKikimrProto::OK) { for (const auto& item : msg.Record.GetResolvedKeys()) { Id = TLogoBlobID::FromBinary(item.GetKey()); + if (item.HasErrorReason()) { + return EndWithError(NKikimrProto::ERROR, TStringBuilder() << "failed to resolve blob# " << Id + << ": " << item.GetErrorReason()); + } Y_VERIFY(item.ValueChainSize() == 1); if (ReadBody) { TReadArg arg{ @@ -141,7 +145,7 @@ namespace NKikimr::NBlobDepot { void OnRead(ui64 /*tag*/, NKikimrProto::EReplyStatus status, TString dataOrErrorReason) override { STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA20, "OnRead", (VirtualGroupId, Agent.VirtualGroupId), - (QueryId, QueryId), (Status, status)); + (QueryId, GetQueryId()), (Status, status)); if (status == NKikimrProto::OK) { Buffer = std::move(dataOrErrorReason); diff --git a/ydb/core/blob_depot/agent/storage_get.cpp b/ydb/core/blob_depot/agent/storage_get.cpp index 7e4e3adf46..5a66a5853f 100644 --- a/ydb/core/blob_depot/agent/storage_get.cpp +++ b/ydb/core/blob_depot/agent/storage_get.cpp @@ -54,7 +54,7 @@ namespace NKikimr::NBlobDepot { TString blobId = query.Id.AsBinaryString(); if (const TResolvedValueChain *value = Agent.BlobMappingCache.ResolveKey(blobId, this, std::make_shared<TResolveKeyContext>(i))) { - if (!ProcessSingleResult(i, value)) { + if (!ProcessSingleResult(i, value, std::nullopt)) { return; } } else { @@ -64,15 +64,19 @@ namespace NKikimr::NBlobDepot { } } - bool ProcessSingleResult(ui32 queryIdx, const TResolvedValueChain *value) { + bool ProcessSingleResult(ui32 queryIdx, const TResolvedValueChain *value, const std::optional<TString>& errorReason) { STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA27, "ProcessSingleResult", (VirtualGroupId, Agent.VirtualGroupId), - (QueryId, GetQueryId()), (QueryIdx, queryIdx), (Value, value)); + (QueryId, GetQueryId()), (QueryIdx, queryIdx), (Value, value), (ErrorReason, errorReason)); - if (!value) { - Response->Responses[queryIdx].Status = NKikimrProto::NODATA; + auto& r = Response->Responses[queryIdx]; + if (errorReason) { + r.Status = NKikimrProto::ERROR; + --AnswersRemain; + } else if (!value) { + r.Status = NKikimrProto::NODATA; --AnswersRemain; } else if (Request.IsIndexOnly) { - Response->Responses[queryIdx].Status = NKikimrProto::OK; + r.Status = NKikimrProto::OK; --AnswersRemain; } else if (value) { TReadArg arg{ @@ -111,7 +115,7 @@ namespace NKikimr::NBlobDepot { void ProcessResponse(ui64 /*id*/, TRequestContext::TPtr context, TResponse response) override { if (auto *p = std::get_if<TKeyResolved>(&response)) { - ProcessSingleResult(context->Obtain<TResolveKeyContext>().QueryIdx, p->ValueChain); + ProcessSingleResult(context->Obtain<TResolveKeyContext>().QueryIdx, p->ValueChain, p->ErrorReason); } else if (auto *p = std::get_if<TEvBlobStorage::TEvGetResult*>(&response)) { Agent.HandleGetResult(context, **p); } else if (std::holds_alternative<TTabletDisconnected>(response)) { diff --git a/ydb/core/blob_depot/agent/storage_put.cpp b/ydb/core/blob_depot/agent/storage_put.cpp index c47fd238fa..226ab8f241 100644 --- a/ydb/core/blob_depot/agent/storage_put.cpp +++ b/ydb/core/blob_depot/agent/storage_put.cpp @@ -153,6 +153,10 @@ namespace NKikimr::NBlobDepot { } else { item->ClearUncertainWrite(); } + + STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA30, "IssueCommitBlobSeq", (VirtualGroupId, Agent.VirtualGroupId), + (QueryId, GetQueryId()), (UncertainWrite, uncertainWrite), (Msg, CommitBlobSeq)); + Agent.Issue(CommitBlobSeq, this, nullptr); Y_VERIFY(!WaitingForCommitBlobSeq); @@ -160,6 +164,9 @@ namespace NKikimr::NBlobDepot { } void RemoveBlobSeqFromInFlight() { + STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA32, "RemoveBlobSeqFromInFlight", (VirtualGroupId, Agent.VirtualGroupId), + (QueryId, GetQueryId())); + Y_VERIFY(IsInFlight); IsInFlight = false; @@ -222,6 +229,9 @@ namespace NKikimr::NBlobDepot { } void HandleCommitBlobSeqResult(TRequestContext::TPtr /*context*/, NKikimrBlobDepot::TEvCommitBlobSeqResult& msg) { + STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA31, "TEvCommitBlobSeqResult", (VirtualGroupId, Agent.VirtualGroupId), + (QueryId, GetQueryId()), (Msg, msg)); + Y_VERIFY(WaitingForCommitBlobSeq); WaitingForCommitBlobSeq = false; diff --git a/ydb/core/blob_depot/agent/storage_range.cpp b/ydb/core/blob_depot/agent/storage_range.cpp index 3869754d50..5c889ff675 100644 --- a/ydb/core/blob_depot/agent/storage_range.cpp +++ b/ydb/core/blob_depot/agent/storage_range.cpp @@ -101,7 +101,10 @@ namespace NKikimr::NBlobDepot { Response->Responses.emplace_back(id, TString()); } - if (!Request.IsIndexOnly) { + if (key.HasErrorReason()) { + return EndWithError(NKikimrProto::ERROR, TStringBuilder() << "failed to resolve blob# " << id + << ": " << key.GetErrorReason()); + } else if (!Request.IsIndexOnly) { TReadArg arg{ key.GetValueChain(), NKikimrBlobStorage::EGetHandleClass::FastRead, diff --git a/ydb/core/blob_depot/data_resolve.cpp b/ydb/core/blob_depot/data_resolve.cpp index 8c5a4f6b7c..e53468c524 100644 --- a/ydb/core/blob_depot/data_resolve.cpp +++ b/ydb/core/blob_depot/data_resolve.cpp @@ -13,13 +13,15 @@ namespace NKikimr::NBlobDepot { , InterconnectSession(ev.InterconnectSession) {} - void TData::TResolveResultAccumulator::AddItem(NKikimrBlobDepot::TEvResolveResult::TResolvedKey&& item) { + void TData::TResolveResultAccumulator::AddItem(NKikimrBlobDepot::TEvResolveResult::TResolvedKey&& item, + const NKikimrBlobDepot::TBlobDepotConfig& config) { + KeyToIndex.emplace(TKey::FromBinaryKey(item.GetKey(), config), Items.size()); Items.push_back(std::move(item)); + KeysToFilterOut.push_back(false); } void TData::TResolveResultAccumulator::Send(TActorIdentity selfId, NKikimrProto::EReplyStatus status, - std::optional<TString> errorReason, const std::unordered_set<TKey> *filter, - const NKikimrBlobDepot::TBlobDepotConfig *config) { + std::optional<TString> errorReason) { auto sendResult = [&](std::unique_ptr<TEvBlobDepot::TEvResolveResult> ev) { auto handle = std::make_unique<IEventHandle>(Sender, selfId, ev.release(), 0, Cookie); if (InterconnectSession) { @@ -35,15 +37,13 @@ namespace NKikimr::NBlobDepot { size_t lastResponseSize; std::unique_ptr<TEvBlobDepot::TEvResolveResult> ev; - for (auto& item : Items) { - if (filter) { - Y_VERIFY_DEBUG(config); - const TKey key(TKey::FromBinaryKey(item.GetKey(), *config)); - if (filter->contains(key)) { - continue; - } + size_t index = 0; + for (auto it = Items.begin(); it != Items.end(); ++it, ++index) { + if (KeysToFilterOut[index]) { + continue; } + auto& item = *it; const size_t itemSize = item.ByteSizeLong(); if (!ev || lastResponseSize + itemSize > EventMaxByteSize) { if (ev) { @@ -68,6 +68,19 @@ namespace NKikimr::NBlobDepot { return std::exchange(Items, {}); } + void TData::TResolveResultAccumulator::AddKeyWithNoData(const TKey& key) { + const auto it = KeyToIndex.find(key); + Y_VERIFY(it != KeyToIndex.end()); + KeysToFilterOut[it->second] = true; + } + + void TData::TResolveResultAccumulator::AddKeyWithError(const TKey& key, const TString& errorReason) { + const auto it = KeyToIndex.find(key); + Y_VERIFY(it != KeyToIndex.end()); + auto& item = Items[it->second]; + item.SetErrorReason(item.HasErrorReason() ? TStringBuilder() << item.GetErrorReason() << ", " << errorReason : errorReason); + } + class TData::TTxResolve : public NTabletFlatExecutor::TTransactionBase<TBlobDepot> { std::unique_ptr<TEvBlobDepot::TEvResolve::THandle> Request; int ItemIndex = 0; @@ -321,7 +334,7 @@ namespace NKikimr::NBlobDepot { (Key, key), (Value, value), (Item, item), (Sender, Request->Sender), (Cookie, Request->Cookie)); } - Result.AddItem(std::move(item)); + Result.AddItem(std::move(item), Self->Config); if (value.UncertainWrite && !value.ValueChain.empty()) { Uncertainties.push_back(key); } diff --git a/ydb/core/blob_depot/data_resolve.h b/ydb/core/blob_depot/data_resolve.h index 81b306b874..2d1f033591 100644 --- a/ydb/core/blob_depot/data_resolve.h +++ b/ydb/core/blob_depot/data_resolve.h @@ -9,14 +9,21 @@ namespace NKikimr::NBlobDepot { const ui64 Cookie; const TActorId InterconnectSession; std::deque<NKikimrBlobDepot::TEvResolveResult::TResolvedKey> Items; + std::unordered_map<TKey, size_t> KeyToIndex; + std::vector<bool> KeysToFilterOut; public: TResolveResultAccumulator(TEventHandle<TEvBlobDepot::TEvResolve>& ev); - void AddItem(NKikimrBlobDepot::TEvResolveResult::TResolvedKey&& item); - void Send(TActorIdentity selfId, NKikimrProto::EReplyStatus status, std::optional<TString> errorReason, - const std::unordered_set<TKey> *filter = nullptr, const NKikimrBlobDepot::TBlobDepotConfig *config = nullptr); + void AddItem(NKikimrBlobDepot::TEvResolveResult::TResolvedKey&& item, const NKikimrBlobDepot::TBlobDepotConfig& config); + void Send(TActorIdentity selfId, NKikimrProto::EReplyStatus status, std::optional<TString> errorReason); std::deque<NKikimrBlobDepot::TEvResolveResult::TResolvedKey> ReleaseItems(); + + void AddKeyWithNoData(const TKey& key); + void AddKeyWithError(const TKey& key, const TString& errorReason); + + const TActorId& GetSender() const { return Sender; } + ui64 GetCookie() const { return Cookie; } }; } // NKikimr::NBlobDepot diff --git a/ydb/core/blob_depot/data_trash.cpp b/ydb/core/blob_depot/data_trash.cpp index 91b68d0947..4957a88cc7 100644 --- a/ydb/core/blob_depot/data_trash.cpp +++ b/ydb/core/blob_depot/data_trash.cpp @@ -120,7 +120,7 @@ namespace NKikimr::NBlobDepot { STLOG(PRI_DEBUG, BLOB_DEPOT, BDT11, "issuing TEvCollectGarbage", (Id, Self->GetLogId()), (Channel, int(record.Channel)), (GroupId, record.GroupId), (Msg, ev->ToString()), (LastConfirmedGenStep, record.LastConfirmedGenStep), (IssuedGenStep, record.IssuedGenStep), - (TrashInFlight.size, record.TrashInFlight.size())); + (LeastExpectedBlobId, record.LeastExpectedBlobId), (TrashInFlight.size, record.TrashInFlight.size())); const ui64 id = ++LastCollectCmdId; CollectCmdToGroup.emplace(id, record.GroupId); diff --git a/ydb/core/blob_depot/data_uncertain.cpp b/ydb/core/blob_depot/data_uncertain.cpp index 5404c3455d..2402916351 100644 --- a/ydb/core/blob_depot/data_uncertain.cpp +++ b/ydb/core/blob_depot/data_uncertain.cpp @@ -18,20 +18,27 @@ namespace NKikimr::NBlobDepot { Y_VERIFY(!uncertainties.empty()); auto entry = MakeIntrusive<TResolveOnHold>(std::move(result)); + for (const TKey& key : uncertainties) { if (const TValue *value = Self->Data->FindKey(key); value && value->UncertainWrite && !value->ValueChain.empty()) { const auto [it, _] = Keys.try_emplace(key); it->second.DependentRequests.push_back(entry); ++entry->NumUncertainKeys; + STLOG(PRI_DEBUG, BLOB_DEPOT, BDT61, "uncertain key", (Id, Self->GetLogId()), + (Sender, entry->Result.GetSender()), (Cookie, entry->Result.GetCookie()), (Key, key)); CheckAndFinishKeyIfPossible(&*it); } else { // this value is not uncertainly written anymore, we can issue response // FIXME: handle race when underlying value gets changed here and we reply with old value chain + STLOG(PRI_DEBUG, BLOB_DEPOT, BDT62, "racing uncertain key", (Id, Self->GetLogId()), + (Sender, entry->Result.GetSender()), (Cookie, entry->Result.GetCookie()), (Key, key)); } } if (entry->NumUncertainKeys == 0) { // we had no more uncertain keys to resolve + STLOG(PRI_DEBUG, BLOB_DEPOT, BDT63, "uncertainty resolver finished with noop", (Id, Self->GetLogId()), + (Sender, entry->Result.GetSender()), (Cookie, entry->Result.GetCookie())); entry->Result.Send(Self->SelfId(), NKikimrProto::OK, std::nullopt); } else { NumKeysQueried += entry->NumUncertainKeys; @@ -39,17 +46,17 @@ namespace NKikimr::NBlobDepot { } void TData::TUncertaintyResolver::MakeKeyCertain(const TKey& key) { - FinishKey(key, true); + FinishKey(key, NKikimrProto::OK, {}); } void TData::TUncertaintyResolver::DropBlobs(const std::vector<TLogoBlobID>& blobIds) { for (const TLogoBlobID& id : blobIds) { - FinishBlob(id, EKeyBlobState::WASNT_WRITTEN); + FinishBlob(id, EKeyBlobState::WASNT_WRITTEN, {}); } } void TData::TUncertaintyResolver::DropKey(const TKey& key) { - FinishKey(key, false); + FinishKey(key, NKikimrProto::NODATA, {}); ++NumKeysDropped; } @@ -58,11 +65,14 @@ namespace NKikimr::NBlobDepot { Y_VERIFY(msg.ResponseSz == 1); auto& resp = msg.Responses[0]; FinishBlob(resp.Id, resp.Status == NKikimrProto::OK ? EKeyBlobState::CONFIRMED : - resp.Status == NKikimrProto::NODATA ? EKeyBlobState::WASNT_WRITTEN : - EKeyBlobState::ERROR); + resp.Status == NKikimrProto::NODATA ? EKeyBlobState::WASNT_WRITTEN : + EKeyBlobState::ERROR, msg.ErrorReason ? msg.ErrorReason : "EvGet failed"); } - void TData::TUncertaintyResolver::FinishBlob(TLogoBlobID id, EKeyBlobState state) { + void TData::TUncertaintyResolver::FinishBlob(TLogoBlobID id, EKeyBlobState state, const TString& errorReason) { + STLOG(PRI_DEBUG, BLOB_DEPOT, BDT64, "TUncertaintyResolver::FinishBlob", (Id, Self->GetLogId()), (BlobId, id), + (State, state), (Contained, Blobs.contains(id))); + const auto blobIt = Blobs.find(id); if (blobIt == Blobs.end()) { return; @@ -75,7 +85,7 @@ namespace NKikimr::NBlobDepot { const auto blobStateIt = keyContext.BlobState.find(id); Y_VERIFY(blobStateIt != keyContext.BlobState.end()); - blobStateIt->second = state; + blobStateIt->second = {state, errorReason}; CheckAndFinishKeyIfPossible(keyRecord); } @@ -84,14 +94,18 @@ namespace NKikimr::NBlobDepot { void TData::TUncertaintyResolver::CheckAndFinishKeyIfPossible(TKeys::value_type *keyRecord) { auto& [key, keyContext] = *keyRecord; - bool okay = true; - if (const TValue *value = Self->Data->FindKey(key); value && !value->ValueChain.empty()) { Y_VERIFY(value->UncertainWrite); // otherwise we must have already received push notification + bool wait = false; + bool nodata = false; + bool error = false; + TStringStream errorReason; + EnumerateBlobsForValueChain(value->ValueChain, Self->TabletID(), [&](TLogoBlobID id, ui32, ui32) { auto& [key, keyContext] = *keyRecord; - switch (EKeyBlobState& state = keyContext.BlobState[id]) { + auto& [state, blobErrorReason] = keyContext.BlobState[id]; + switch (state) { case EKeyBlobState::INITIAL: { // have to additionally query this blob and wait for it TBlobContext& blobContext = Blobs[id]; @@ -99,19 +113,21 @@ namespace NKikimr::NBlobDepot { Y_VERIFY(inserted); if (blobContext.ReferringKeys.size() == 1) { const ui32 groupId = Self->Info()->GroupFor(id.Channel(), id.Generation()); + STLOG(PRI_DEBUG, BLOB_DEPOT, BDT65, "TUncertaintyResolver sending Get", (Id, Self->GetLogId()), + (BlobId, id), (Key, key), (GroupId, groupId)); SendToBSProxy(Self->SelfId(), groupId, new TEvBlobStorage::TEvGet(id, 0, 0, TInstant::Max(), NKikimrBlobStorage::EGetHandleClass::FastRead, true, true)); ++NumGetsIssued; } - okay = false; state = EKeyBlobState::QUERY_IN_FLIGHT; + wait = true; break; } case EKeyBlobState::QUERY_IN_FLIGHT: // still have to wait for this one - okay = false; + wait = true; break; case EKeyBlobState::CONFIRMED: @@ -120,48 +136,77 @@ namespace NKikimr::NBlobDepot { case EKeyBlobState::WASNT_WRITTEN: // the blob hasn't been written completely; this may also be a race when it is being written - // right now, but we are asking for the data too early (like in scan request) - okay = false; + // right now, but we are asking for the data too early (like in scan request); however this means + // that blob couldn't have been reported as OK to the agent, and we may respond with NODATA to it + nodata = true; break; case EKeyBlobState::ERROR: - // we can't figure out this blob's state - okay = false; + // we can't figure out this blob's state; this means we have to respond with ERROR for this + // particular blob + if (error) { + errorReason << ", "; + } + errorReason << id << ": " << blobErrorReason; + error = true; break; } }); - } else { // key has been deleted, we have to drop it from the response - okay = false; - FinishKey(key, false); - } - if (okay) { - Self->Data->MakeKeyCertain(key); + if (error) { + FinishKey(key, NKikimrProto::ERROR, errorReason.Str()); + } else if (nodata) { + FinishKey(key, NKikimrProto::NODATA, {}); + } else if (wait) { + // just do nothing, wait for the request to fulfill + } else { + Self->Data->MakeKeyCertain(key); + } + } else { // key has been deleted, we have to drop it from the response + FinishKey(key, NKikimrProto::NODATA, {}); } } - void TData::TUncertaintyResolver::FinishKey(const TKey& key, bool success) { + void TData::TUncertaintyResolver::FinishKey(const TKey& key, NKikimrProto::EReplyStatus status, + const TString& errorReason) { + STLOG(PRI_DEBUG, BLOB_DEPOT, BDT66, "TUncertaintyResolver::FinishKey", (Id, Self->GetLogId()), (Key, key), + (Status, status)); + const auto keyIt = Keys.find(key); if (keyIt == Keys.end()) { return; } - ++(success ? NumKeysResolved : NumKeysUnresolved); + ++(status == NKikimrProto::OK ? NumKeysResolved : NumKeysUnresolved); auto item = Keys.extract(keyIt); auto& keyContext = item.mapped(); for (auto& request : keyContext.DependentRequests) { - if (!success) { - request->KeysToBeFilteredOut.insert(key); + switch (status) { + case NKikimrProto::OK: + break; + + case NKikimrProto::NODATA: + request->Result.AddKeyWithNoData(key); + break; + + case NKikimrProto::ERROR: + request->Result.AddKeyWithError(key, errorReason); + break; + + default: + Y_FAIL(); } if (--request->NumUncertainKeys == 0) { // we can finish the request - request->Result.Send(Self->SelfId(), NKikimrProto::OK, std::nullopt, &request->KeysToBeFilteredOut, - &Self->Config); + STLOG(PRI_DEBUG, BLOB_DEPOT, BDT67, "uncertainty resolver finished", (Id, Self->GetLogId()), + (Sender, request->Result.GetSender()), (Cookie, request->Result.GetCookie())); + request->Result.Send(Self->SelfId(), NKikimrProto::OK, std::nullopt); } } - for (const auto& [id, state] : keyContext.BlobState) { + for (const auto& [id, s] : keyContext.BlobState) { + const auto& [state, errorReason] = s; if (state == EKeyBlobState::QUERY_IN_FLIGHT) { const auto blobIt = Blobs.find(id); Y_VERIFY(blobIt != Blobs.end()); @@ -190,3 +235,16 @@ namespace NKikimr::NBlobDepot { } } // NKikimr::NBlobDepot + +template<> +void Out<NKikimr::NBlobDepot::TData::TUncertaintyResolver::EKeyBlobState>(IOutputStream& s, + NKikimr::NBlobDepot::TData::TUncertaintyResolver::EKeyBlobState value) { + using E = decltype(value); + switch (value) { + case E::INITIAL: s << "INITIAL"; break; + case E::QUERY_IN_FLIGHT: s << "QUERY_IN_FLIGHT"; break; + case E::CONFIRMED: s << "CONFIRMED"; break; + case E::WASNT_WRITTEN: s << "WASNT_WRITTEN"; break; + case E::ERROR: s << "ERROR"; break; + } +} diff --git a/ydb/core/blob_depot/data_uncertain.h b/ydb/core/blob_depot/data_uncertain.h index da42cfc47c..c4d55cafb3 100644 --- a/ydb/core/blob_depot/data_uncertain.h +++ b/ydb/core/blob_depot/data_uncertain.h @@ -15,7 +15,6 @@ namespace NKikimr::NBlobDepot { struct TResolveOnHold : TSimpleRefCount<TResolveOnHold> { TResolveResultAccumulator Result; ui32 NumUncertainKeys = 0; - std::unordered_set<TKey> KeysToBeFilteredOut; TResolveOnHold(TResolveResultAccumulator&& result) : Result(std::move(result)) @@ -35,7 +34,7 @@ namespace NKikimr::NBlobDepot { std::vector<TIntrusivePtr<TResolveOnHold>> DependentRequests; // blob queries issued and replied - std::unordered_map<TLogoBlobID, EKeyBlobState> BlobState; + std::unordered_map<TLogoBlobID, std::tuple<EKeyBlobState, TString>> BlobState; }; using TKeys = std::map<TKey, TKeyContext>; @@ -54,6 +53,8 @@ namespace NKikimr::NBlobDepot { ui64 NumKeysUnresolved = 0; ui64 NumKeysDropped = 0; + friend void Out(IOutputStream& s, EKeyBlobState value); + public: TUncertaintyResolver(TBlobDepot *self); void PushResultWithUncertainties(TResolveResultAccumulator&& result, std::deque<TKey>&& uncertainties); @@ -65,9 +66,9 @@ namespace NKikimr::NBlobDepot { void RenderMainPage(IOutputStream& s); private: - void FinishBlob(TLogoBlobID id, EKeyBlobState state); + void FinishBlob(TLogoBlobID id, EKeyBlobState state, const TString& errorReason); void CheckAndFinishKeyIfPossible(TKeys::value_type *keyRecord); - void FinishKey(const TKey& key, bool success); + void FinishKey(const TKey& key, NKikimrProto::EReplyStatus status, const TString& errorReason); }; } // NKikimr::NBlobDepot diff --git a/ydb/core/blob_depot/op_commit_blob_seq.cpp b/ydb/core/blob_depot/op_commit_blob_seq.cpp index a8501a57d1..00273e7d14 100644 --- a/ydb/core/blob_depot/op_commit_blob_seq.cpp +++ b/ydb/core/blob_depot/op_commit_blob_seq.cpp @@ -7,12 +7,14 @@ namespace NKikimr::NBlobDepot { void TBlobDepot::Handle(TEvBlobDepot::TEvCommitBlobSeq::TPtr ev) { class TTxCommitBlobSeq : public NTabletFlatExecutor::TTransactionBase<TBlobDepot> { + const ui32 NodeId; std::unique_ptr<TEvBlobDepot::TEvCommitBlobSeq::THandle> Request; std::unique_ptr<IEventHandle> Response; public: - TTxCommitBlobSeq(TBlobDepot *self, std::unique_ptr<TEvBlobDepot::TEvCommitBlobSeq::THandle> request) + TTxCommitBlobSeq(TBlobDepot *self, ui32 nodeId, std::unique_ptr<TEvBlobDepot::TEvCommitBlobSeq::THandle> request) : TTransactionBase(self) + , NodeId(nodeId) , Request(std::move(request)) {} @@ -22,7 +24,7 @@ namespace NKikimr::NBlobDepot { NKikimrBlobDepot::TEvCommitBlobSeqResult *responseRecord; std::tie(Response, responseRecord) = TEvBlobDepot::MakeResponseFor(*Request, Self->SelfId()); - TAgent& agent = Self->GetAgent(Request->Recipient); + TAgent& agent = Self->GetAgent(NodeId); const ui32 generation = Self->Executor()->Generation(); for (const auto& item : Request->Get()->Record.GetItems()) { @@ -41,15 +43,12 @@ namespace NKikimr::NBlobDepot { const auto blobSeqId = TBlobSeqId::FromProto(blobLocator.GetBlobSeqId()); const bool canBeCollected = Self->Data->CanBeCollected(blobLocator.GetGroupId(), blobSeqId); + STLOG(PRI_DEBUG, BLOB_DEPOT, BDT68, "TTxCommitBlobSeq process key", (Id, Self->GetLogId()), + (Key, key), (Item, item), (CanBeCollected, canBeCollected), (Generation, generation)); + if (blobSeqId.Generation == generation) { // check for internal sanity -- we can't issue barriers on given ids without confirmed trimming Y_VERIFY_S(!canBeCollected || item.GetCommitNotify(), "BlobSeqId# " << blobSeqId.ToString()); - - // mark given blob as committed only when it was issued in current generation -- only for this - // generation we have correct GivenIdRanges - if (!item.GetCommitNotify()) { - MarkGivenIdCommitted(agent, blobSeqId); - } } else if (canBeCollected) { // we can't accept this record, because it is potentially under already issued barrier responseItem->SetStatus(NKikimrProto::ERROR); @@ -77,6 +76,12 @@ namespace NKikimr::NBlobDepot { } } else { Self->Data->UpdateKey(key, item, item.GetUncertainWrite(), txc, this); + if (blobSeqId.Generation == generation) { + // mark given blob as committed only when it was issued in current generation -- only for this + // generation we have correct GivenIdRanges; and we can do this only after updating key as the + // callee function may trigger garbage collection + MarkGivenIdCommitted(agent, blobSeqId); + } } } @@ -128,8 +133,9 @@ namespace NKikimr::NBlobDepot { } }; - Execute(std::make_unique<TTxCommitBlobSeq>(this, std::unique_ptr<TEvBlobDepot::TEvCommitBlobSeq::THandle>( - ev.Release()))); + TAgent& agent = GetAgent(ev->Recipient); + Execute(std::make_unique<TTxCommitBlobSeq>(this, agent.ConnectedNodeId, + std::unique_ptr<TEvBlobDepot::TEvCommitBlobSeq::THandle>(ev.Release()))); } void TBlobDepot::Handle(TEvBlobDepot::TEvDiscardSpoiledBlobSeq::TPtr ev) { diff --git a/ydb/core/protos/blob_depot.proto b/ydb/core/protos/blob_depot.proto index 270869d509..ade8c57124 100644 --- a/ydb/core/protos/blob_depot.proto +++ b/ydb/core/protos/blob_depot.proto @@ -217,6 +217,7 @@ message TEvResolveResult { repeated TResolvedValueChain ValueChain = 3; optional bytes Meta = 4; repeated uint64 Owners = 5; + optional string ErrorReason = 6; // if set, this means value wasn't resolved due to error } optional NKikimrProto.EReplyStatus Status = 1; // OVERRUN means there are more messages on the way optional string ErrorReason = 2; |