diff options
author | alexvru <alexvru@ydb.tech> | 2022-08-24 12:58:27 +0300 |
---|---|---|
committer | alexvru <alexvru@ydb.tech> | 2022-08-24 12:58:27 +0300 |
commit | 904057920e77da1419bb62de4edc86c122be917b (patch) | |
tree | 4afcba0e086868a6de4a5e65fd380595bd139d4d | |
parent | b5e423791064ac4a6403e9ab60a78d9987bca786 (diff) | |
download | ydb-904057920e77da1419bb62de4edc86c122be917b.tar.gz |
BlobDepot work in progress
-rw-r--r-- | ydb/core/base/blobstorage.h | 20 | ||||
-rw-r--r-- | ydb/core/blob_depot/agent/agent_impl.h | 23 | ||||
-rw-r--r-- | ydb/core/blob_depot/agent/comm.cpp | 4 | ||||
-rw-r--r-- | ydb/core/blob_depot/agent/query.cpp | 46 | ||||
-rw-r--r-- | ydb/core/blob_depot/data_resolve.cpp | 5 | ||||
-rw-r--r-- | ydb/core/blob_depot/garbage_collection.cpp | 5 | ||||
-rw-r--r-- | ydb/core/blob_depot/garbage_collection.h | 11 | ||||
-rw-r--r-- | ydb/core/blob_depot/op_commit_blob_seq.cpp | 25 |
8 files changed, 108 insertions, 31 deletions
diff --git a/ydb/core/base/blobstorage.h b/ydb/core/base/blobstorage.h index 6bb4523ac96..cde55cb5f95 100644 --- a/ydb/core/base/blobstorage.h +++ b/ydb/core/base/blobstorage.h @@ -2047,8 +2047,8 @@ struct TEvBlobStorage { struct TEvAssimilateResult : TEventLocal<TEvAssimilateResult, EvAssimilateResult> { struct TBlock { - ui64 TabletId; - ui32 BlockedGeneration; + ui64 TabletId = 0; + ui32 BlockedGeneration = 0; TString ToString() const { TStringStream str; @@ -2063,10 +2063,10 @@ struct TEvBlobStorage { struct TBarrier { struct TValue { - ui32 RecordGeneration; - ui32 PerGenerationCounter; - ui32 CollectGeneration; - ui32 CollectStep; + ui32 RecordGeneration = 0; + ui32 PerGenerationCounter = 0; + ui32 CollectGeneration = 0; + ui32 CollectStep = 0; void Output(IOutputStream& s) const { if (RecordGeneration || PerGenerationCounter || CollectGeneration || CollectGeneration) { @@ -2076,8 +2076,8 @@ struct TEvBlobStorage { } }; - ui64 TabletId; - ui8 Channel; + ui64 TabletId = 0; + ui8 Channel = 0; TValue Soft; TValue Hard; @@ -2098,8 +2098,8 @@ struct TEvBlobStorage { struct TBlob { TLogoBlobID Id; - bool Keep; - bool DoNotKeep; + bool Keep = false; + bool DoNotKeep = false; TString ToString() const { TStringStream str; diff --git a/ydb/core/blob_depot/agent/agent_impl.h b/ydb/core/blob_depot/agent/agent_impl.h index 623c4741680..b679bd01f22 100644 --- a/ydb/core/blob_depot/agent/agent_impl.h +++ b/ydb/core/blob_depot/agent/agent_impl.h @@ -82,6 +82,13 @@ namespace NKikimr::NBlobDepot { ui64 TabletId = Max<ui64>(); TActorId PipeId; + private: + struct TEvPrivate { + enum { + EvQueryWatchdog = EventSpaceBegin(TEvents::ES_PRIVATE), + }; + }; + public: TBlobDepotAgent(ui32 virtualGroupId, TIntrusivePtr<TBlobStorageGroupInfo> info, TActorId proxyId); ~TBlobDepotAgent(); @@ -111,6 +118,8 @@ namespace NKikimr::NBlobDepot { ENUMERATE_INCOMING_EVENTS(FORWARD_STORAGE_PROXY) hFunc(TEvBlobStorage::TEvBunchOfEvents, Handle); + + fFunc(TEvPrivate::EvQueryWatchdog, HandleQueryWatchdog); ); #undef FORWARD_STORAGE_PROXY @@ -215,15 +224,15 @@ namespace NKikimr::NBlobDepot { protected: std::unique_ptr<IEventHandle> Event; // original query event const ui64 QueryId; + const TMonotonic StartTime; + + static constexpr TDuration WatchdogDuration = TDuration::Seconds(10); public: - TQuery(TBlobDepotAgent& agent, std::unique_ptr<IEventHandle> event) - : TRequestSender(agent) - , Event(std::move(event)) - , QueryId(RandomNumber<ui64>()) - {} + TQuery(TBlobDepotAgent& agent, std::unique_ptr<IEventHandle> event); + virtual ~TQuery(); - virtual ~TQuery() = default; + void CheckQueryExecutionTime(); void EndWithError(NKikimrProto::EReplyStatus status, const TString& errorReason); void EndWithSuccess(std::unique_ptr<IEventBase> response); @@ -243,7 +252,9 @@ namespace NKikimr::NBlobDepot { std::deque<std::unique_ptr<IEventHandle>> PendingEventQ; TIntrusiveListWithAutoDelete<TQuery, TQuery::TDeleter, TExecutingQueries> ExecutingQueries; + THashMultiMap<ui64, TQuery*> QueryIdToQuery; + void HandleQueryWatchdog(TAutoPtr<IEventHandle> ev); void HandleStorageProxy(TAutoPtr<IEventHandle> ev); void Handle(TEvBlobStorage::TEvBunchOfEvents::TPtr ev); TQuery *CreateQuery(TAutoPtr<IEventHandle> ev); diff --git a/ydb/core/blob_depot/agent/comm.cpp b/ydb/core/blob_depot/agent/comm.cpp index 91ccc11fd66..1436ff38d69 100644 --- a/ydb/core/blob_depot/agent/comm.cpp +++ b/ydb/core/blob_depot/agent/comm.cpp @@ -111,8 +111,10 @@ namespace NKikimr::NBlobDepot { } void TBlobDepotAgent::OnDisconnect() { - for (auto& [id, request] : std::exchange(TabletRequestInFlight, {})) { + while (TabletRequestInFlight) { + auto [id, request] = std::move(*TabletRequestInFlight.begin()); request.Sender->OnRequestComplete(id, TTabletDisconnected{}); + TabletRequestInFlight.erase(id); } for (auto& [_, kind] : ChannelKinds) { diff --git a/ydb/core/blob_depot/agent/query.cpp b/ydb/core/blob_depot/agent/query.cpp index 6870cae93f2..b5ed19b9454 100644 --- a/ydb/core/blob_depot/agent/query.cpp +++ b/ydb/core/blob_depot/agent/query.cpp @@ -33,9 +33,51 @@ namespace NKikimr::NBlobDepot { Y_FAIL(); } + void TBlobDepotAgent::HandleQueryWatchdog(TAutoPtr<IEventHandle> ev) { + for (auto [first, last] = QueryIdToQuery.equal_range(ev->Cookie); first != last; ++first) { + first->second->CheckQueryExecutionTime(); + } + } + + TBlobDepotAgent::TQuery::TQuery(TBlobDepotAgent& agent, std::unique_ptr<IEventHandle> event) + : TRequestSender(agent) + , Event(std::move(event)) + , QueryId(RandomNumber<ui64>()) + , StartTime(TActivationContext::Monotonic()) + { + Agent.QueryIdToQuery.emplace(QueryId, this); + TActivationContext::Schedule(WatchdogDuration, new IEventHandle(TEvPrivate::EvQueryWatchdog, 0, + Agent.SelfId(), {}, nullptr, QueryId)); + } + + TBlobDepotAgent::TQuery::~TQuery() { + if (TDuration duration(TActivationContext::Monotonic() - StartTime); duration >= WatchdogDuration) { + STLOG(PRI_WARN, BLOB_DEPOT_AGENT, BDA00, "query execution took too much time", + (VirtualGroupId, Agent.VirtualGroupId), (QueryId, QueryId), (Duration, duration)); + } + + for (auto [first, last] = Agent.QueryIdToQuery.equal_range(QueryId); first != last; ++first) { + if (first->first == QueryId && first->second == this) { + Agent.QueryIdToQuery.erase(first); + return; + } + } + Y_FAIL(); + } + + void TBlobDepotAgent::TQuery::CheckQueryExecutionTime() { + if (TDuration duration(TActivationContext::Monotonic() - StartTime); duration >= WatchdogDuration) { + STLOG(PRI_WARN, BLOB_DEPOT_AGENT, BDA23, "query is still executing", (VirtualGroupId, Agent.VirtualGroupId), + (QueryId, QueryId), (Duration, duration)); + TActivationContext::Schedule(WatchdogDuration, new IEventHandle(TEvPrivate::EvQueryWatchdog, 0, + Agent.SelfId(), {}, nullptr, QueryId)); + } + } + void TBlobDepotAgent::TQuery::EndWithError(NKikimrProto::EReplyStatus status, const TString& errorReason) { STLOG(PRI_INFO, BLOB_DEPOT_AGENT, BDA14, "query ends with error", (VirtualGroupId, Agent.VirtualGroupId), - (QueryId, QueryId), (Status, status), (ErrorReason, errorReason)); + (QueryId, QueryId), (Status, status), (ErrorReason, errorReason), + (Duration, TActivationContext::Monotonic() - StartTime)); std::unique_ptr<IEventBase> response; switch (Event->GetTypeRewrite()) { @@ -54,7 +96,7 @@ namespace NKikimr::NBlobDepot { void TBlobDepotAgent::TQuery::EndWithSuccess(std::unique_ptr<IEventBase> response) { STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA15, "query ends with success", (VirtualGroupId, Agent.VirtualGroupId), - (QueryId, QueryId), (Response, response->ToString())); + (QueryId, QueryId), (Response, response->ToString()), (Duration, TActivationContext::Monotonic() - StartTime)); Agent.SelfId().Send(Event->Sender, response.release(), 0, Event->Cookie); delete this; } diff --git a/ydb/core/blob_depot/data_resolve.cpp b/ydb/core/blob_depot/data_resolve.cpp index 6b14a919d56..97bd23e98a1 100644 --- a/ydb/core/blob_depot/data_resolve.cpp +++ b/ydb/core/blob_depot/data_resolve.cpp @@ -234,6 +234,11 @@ namespace NKikimr::NBlobDepot { } } }); + if (value.OriginalBlobId) { + auto *out = item.AddValueChain(); + out->SetGroupId(Self->Config.GetDecommitGroupId()); + LogoBlobIDFromLogoBlobID(*value.OriginalBlobId, out->MutableBlobId()); + } if (value.Meta) { item.SetMeta(value.Meta.data(), value.Meta.size()); } diff --git a/ydb/core/blob_depot/garbage_collection.cpp b/ydb/core/blob_depot/garbage_collection.cpp index 34f057652c1..71119ed8b95 100644 --- a/ydb/core/blob_depot/garbage_collection.cpp +++ b/ydb/core/blob_depot/garbage_collection.cpp @@ -235,11 +235,6 @@ namespace NKikimr::NBlobDepot { } } - bool TBlobDepot::TBarrierServer::CheckBlobForBarrier(TLogoBlobID id) const { - const auto it = Barriers.find(std::make_tuple(id.TabletID(), id.Channel())); - return it == Barriers.end() || TGenStep(id) > Max(it->second.Soft, it->second.Hard); - } - void TBlobDepot::TBarrierServer::GetBlobBarrierRelation(TLogoBlobID id, bool *underSoft, bool *underHard) const { const auto it = Barriers.find(std::make_tuple(id.TabletID(), id.Channel())); const TGenStep genStep(id); diff --git a/ydb/core/blob_depot/garbage_collection.h b/ydb/core/blob_depot/garbage_collection.h index d12c4abb574..578f69ccba7 100644 --- a/ydb/core/blob_depot/garbage_collection.h +++ b/ydb/core/blob_depot/garbage_collection.h @@ -29,10 +29,19 @@ namespace NKikimr::NBlobDepot { void AddBarrierOnLoad(ui64 tabletId, ui8 channel, TGenStep softGenCtr, TGenStep soft, TGenStep hardGenCtr, TGenStep hard); void AddBarrierOnDecommit(const TEvBlobStorage::TEvAssimilateResult::TBarrier& barrier, NTabletFlatExecutor::TTransactionContext& txc); void Handle(TEvBlobDepot::TEvCollectGarbage::TPtr ev); - bool CheckBlobForBarrier(TLogoBlobID id) const; void GetBlobBarrierRelation(TLogoBlobID id, bool *underSoft, bool *underHard) const; void OnDataLoaded(); + TString ToStringBarrier(ui64 tabletId, ui8 channel, bool hard) const { + if (const auto it = Barriers.find(std::make_tuple(tabletId, channel)); it == Barriers.end()) { + return "<none>"; + } else if (auto& b = it->second; hard) { + return TStringBuilder() << "hard{" << b.HardGenCtr.ToString() << "=>" << b.Hard.ToString() << "}"; + } else { + return TStringBuilder() << "soft{" << b.SoftGenCtr.ToString() << "=>" << b.Soft.ToString() << "}"; + } + } + template<typename TCallback> void Enumerate(TCallback&& callback) { for (const auto& [key, value] : Barriers) { diff --git a/ydb/core/blob_depot/op_commit_blob_seq.cpp b/ydb/core/blob_depot/op_commit_blob_seq.cpp index 714142fddf2..fe83420eb30 100644 --- a/ydb/core/blob_depot/op_commit_blob_seq.cpp +++ b/ydb/core/blob_depot/op_commit_blob_seq.cpp @@ -59,10 +59,11 @@ namespace NKikimr::NBlobDepot { continue; } - if (!CheckKeyAgainstBarrier(key, value)) { + TString error; + if (!CheckKeyAgainstBarrier(key, value, &error)) { responseItem->SetStatus(NKikimrProto::ERROR); responseItem->SetErrorReason(TStringBuilder() << "BlobId# " << key.ToString() - << " is being put beyond the barrier"); + << " is being put beyond the barrier: " << error); continue; } @@ -96,11 +97,23 @@ namespace NKikimr::NBlobDepot { } } - bool CheckKeyAgainstBarrier(const TData::TKey& key, const NKikimrBlobDepot::TValue& value) { + bool CheckKeyAgainstBarrier(const TData::TKey& key, const NKikimrBlobDepot::TValue& value, + TString *error) { const auto& v = key.AsVariant(); - const auto *id = std::get_if<TLogoBlobID>(&v); - return !id || Self->BarrierServer->CheckBlobForBarrier(*id) || - value.GetKeepState() == NKikimrBlobDepot::EKeepState::Keep; + if (const auto *id = std::get_if<TLogoBlobID>(&v)) { + bool underSoft, underHard; + Self->BarrierServer->GetBlobBarrierRelation(*id, &underSoft, &underHard); + if (underHard) { + *error = TStringBuilder() << "under barrier# " << Self->BarrierServer->ToStringBarrier( + id->TabletID(), id->Channel(), true); + return false; + } else if (underSoft && value.GetKeepState() != NKikimrBlobDepot::EKeepState::Keep) { + *error = TStringBuilder() << "under barrier# " << Self->BarrierServer->ToStringBarrier( + id->TabletID(), id->Channel(), false); + return false; + } + } + return true; } void Complete(const TActorContext&) override { |