aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authoralexvru <alexvru@ydb.tech>2022-08-24 12:58:27 +0300
committeralexvru <alexvru@ydb.tech>2022-08-24 12:58:27 +0300
commit904057920e77da1419bb62de4edc86c122be917b (patch)
tree4afcba0e086868a6de4a5e65fd380595bd139d4d
parentb5e423791064ac4a6403e9ab60a78d9987bca786 (diff)
downloadydb-904057920e77da1419bb62de4edc86c122be917b.tar.gz
BlobDepot work in progress
-rw-r--r--ydb/core/base/blobstorage.h20
-rw-r--r--ydb/core/blob_depot/agent/agent_impl.h23
-rw-r--r--ydb/core/blob_depot/agent/comm.cpp4
-rw-r--r--ydb/core/blob_depot/agent/query.cpp46
-rw-r--r--ydb/core/blob_depot/data_resolve.cpp5
-rw-r--r--ydb/core/blob_depot/garbage_collection.cpp5
-rw-r--r--ydb/core/blob_depot/garbage_collection.h11
-rw-r--r--ydb/core/blob_depot/op_commit_blob_seq.cpp25
8 files changed, 108 insertions, 31 deletions
diff --git a/ydb/core/base/blobstorage.h b/ydb/core/base/blobstorage.h
index 6bb4523ac96..cde55cb5f95 100644
--- a/ydb/core/base/blobstorage.h
+++ b/ydb/core/base/blobstorage.h
@@ -2047,8 +2047,8 @@ struct TEvBlobStorage {
struct TEvAssimilateResult : TEventLocal<TEvAssimilateResult, EvAssimilateResult> {
struct TBlock {
- ui64 TabletId;
- ui32 BlockedGeneration;
+ ui64 TabletId = 0;
+ ui32 BlockedGeneration = 0;
TString ToString() const {
TStringStream str;
@@ -2063,10 +2063,10 @@ struct TEvBlobStorage {
struct TBarrier {
struct TValue {
- ui32 RecordGeneration;
- ui32 PerGenerationCounter;
- ui32 CollectGeneration;
- ui32 CollectStep;
+ ui32 RecordGeneration = 0;
+ ui32 PerGenerationCounter = 0;
+ ui32 CollectGeneration = 0;
+ ui32 CollectStep = 0;
void Output(IOutputStream& s) const {
if (RecordGeneration || PerGenerationCounter || CollectGeneration || CollectGeneration) {
@@ -2076,8 +2076,8 @@ struct TEvBlobStorage {
}
};
- ui64 TabletId;
- ui8 Channel;
+ ui64 TabletId = 0;
+ ui8 Channel = 0;
TValue Soft;
TValue Hard;
@@ -2098,8 +2098,8 @@ struct TEvBlobStorage {
struct TBlob {
TLogoBlobID Id;
- bool Keep;
- bool DoNotKeep;
+ bool Keep = false;
+ bool DoNotKeep = false;
TString ToString() const {
TStringStream str;
diff --git a/ydb/core/blob_depot/agent/agent_impl.h b/ydb/core/blob_depot/agent/agent_impl.h
index 623c4741680..b679bd01f22 100644
--- a/ydb/core/blob_depot/agent/agent_impl.h
+++ b/ydb/core/blob_depot/agent/agent_impl.h
@@ -82,6 +82,13 @@ namespace NKikimr::NBlobDepot {
ui64 TabletId = Max<ui64>();
TActorId PipeId;
+ private:
+ struct TEvPrivate {
+ enum {
+ EvQueryWatchdog = EventSpaceBegin(TEvents::ES_PRIVATE),
+ };
+ };
+
public:
TBlobDepotAgent(ui32 virtualGroupId, TIntrusivePtr<TBlobStorageGroupInfo> info, TActorId proxyId);
~TBlobDepotAgent();
@@ -111,6 +118,8 @@ namespace NKikimr::NBlobDepot {
ENUMERATE_INCOMING_EVENTS(FORWARD_STORAGE_PROXY)
hFunc(TEvBlobStorage::TEvBunchOfEvents, Handle);
+
+ fFunc(TEvPrivate::EvQueryWatchdog, HandleQueryWatchdog);
);
#undef FORWARD_STORAGE_PROXY
@@ -215,15 +224,15 @@ namespace NKikimr::NBlobDepot {
protected:
std::unique_ptr<IEventHandle> Event; // original query event
const ui64 QueryId;
+ const TMonotonic StartTime;
+
+ static constexpr TDuration WatchdogDuration = TDuration::Seconds(10);
public:
- TQuery(TBlobDepotAgent& agent, std::unique_ptr<IEventHandle> event)
- : TRequestSender(agent)
- , Event(std::move(event))
- , QueryId(RandomNumber<ui64>())
- {}
+ TQuery(TBlobDepotAgent& agent, std::unique_ptr<IEventHandle> event);
+ virtual ~TQuery();
- virtual ~TQuery() = default;
+ void CheckQueryExecutionTime();
void EndWithError(NKikimrProto::EReplyStatus status, const TString& errorReason);
void EndWithSuccess(std::unique_ptr<IEventBase> response);
@@ -243,7 +252,9 @@ namespace NKikimr::NBlobDepot {
std::deque<std::unique_ptr<IEventHandle>> PendingEventQ;
TIntrusiveListWithAutoDelete<TQuery, TQuery::TDeleter, TExecutingQueries> ExecutingQueries;
+ THashMultiMap<ui64, TQuery*> QueryIdToQuery;
+ void HandleQueryWatchdog(TAutoPtr<IEventHandle> ev);
void HandleStorageProxy(TAutoPtr<IEventHandle> ev);
void Handle(TEvBlobStorage::TEvBunchOfEvents::TPtr ev);
TQuery *CreateQuery(TAutoPtr<IEventHandle> ev);
diff --git a/ydb/core/blob_depot/agent/comm.cpp b/ydb/core/blob_depot/agent/comm.cpp
index 91ccc11fd66..1436ff38d69 100644
--- a/ydb/core/blob_depot/agent/comm.cpp
+++ b/ydb/core/blob_depot/agent/comm.cpp
@@ -111,8 +111,10 @@ namespace NKikimr::NBlobDepot {
}
void TBlobDepotAgent::OnDisconnect() {
- for (auto& [id, request] : std::exchange(TabletRequestInFlight, {})) {
+ while (TabletRequestInFlight) {
+ auto [id, request] = std::move(*TabletRequestInFlight.begin());
request.Sender->OnRequestComplete(id, TTabletDisconnected{});
+ TabletRequestInFlight.erase(id);
}
for (auto& [_, kind] : ChannelKinds) {
diff --git a/ydb/core/blob_depot/agent/query.cpp b/ydb/core/blob_depot/agent/query.cpp
index 6870cae93f2..b5ed19b9454 100644
--- a/ydb/core/blob_depot/agent/query.cpp
+++ b/ydb/core/blob_depot/agent/query.cpp
@@ -33,9 +33,51 @@ namespace NKikimr::NBlobDepot {
Y_FAIL();
}
+ void TBlobDepotAgent::HandleQueryWatchdog(TAutoPtr<IEventHandle> ev) {
+ for (auto [first, last] = QueryIdToQuery.equal_range(ev->Cookie); first != last; ++first) {
+ first->second->CheckQueryExecutionTime();
+ }
+ }
+
+ TBlobDepotAgent::TQuery::TQuery(TBlobDepotAgent& agent, std::unique_ptr<IEventHandle> event)
+ : TRequestSender(agent)
+ , Event(std::move(event))
+ , QueryId(RandomNumber<ui64>())
+ , StartTime(TActivationContext::Monotonic())
+ {
+ Agent.QueryIdToQuery.emplace(QueryId, this);
+ TActivationContext::Schedule(WatchdogDuration, new IEventHandle(TEvPrivate::EvQueryWatchdog, 0,
+ Agent.SelfId(), {}, nullptr, QueryId));
+ }
+
+ TBlobDepotAgent::TQuery::~TQuery() {
+ if (TDuration duration(TActivationContext::Monotonic() - StartTime); duration >= WatchdogDuration) {
+ STLOG(PRI_WARN, BLOB_DEPOT_AGENT, BDA00, "query execution took too much time",
+ (VirtualGroupId, Agent.VirtualGroupId), (QueryId, QueryId), (Duration, duration));
+ }
+
+ for (auto [first, last] = Agent.QueryIdToQuery.equal_range(QueryId); first != last; ++first) {
+ if (first->first == QueryId && first->second == this) {
+ Agent.QueryIdToQuery.erase(first);
+ return;
+ }
+ }
+ Y_FAIL();
+ }
+
+ void TBlobDepotAgent::TQuery::CheckQueryExecutionTime() {
+ if (TDuration duration(TActivationContext::Monotonic() - StartTime); duration >= WatchdogDuration) {
+ STLOG(PRI_WARN, BLOB_DEPOT_AGENT, BDA23, "query is still executing", (VirtualGroupId, Agent.VirtualGroupId),
+ (QueryId, QueryId), (Duration, duration));
+ TActivationContext::Schedule(WatchdogDuration, new IEventHandle(TEvPrivate::EvQueryWatchdog, 0,
+ Agent.SelfId(), {}, nullptr, QueryId));
+ }
+ }
+
void TBlobDepotAgent::TQuery::EndWithError(NKikimrProto::EReplyStatus status, const TString& errorReason) {
STLOG(PRI_INFO, BLOB_DEPOT_AGENT, BDA14, "query ends with error", (VirtualGroupId, Agent.VirtualGroupId),
- (QueryId, QueryId), (Status, status), (ErrorReason, errorReason));
+ (QueryId, QueryId), (Status, status), (ErrorReason, errorReason),
+ (Duration, TActivationContext::Monotonic() - StartTime));
std::unique_ptr<IEventBase> response;
switch (Event->GetTypeRewrite()) {
@@ -54,7 +96,7 @@ namespace NKikimr::NBlobDepot {
void TBlobDepotAgent::TQuery::EndWithSuccess(std::unique_ptr<IEventBase> response) {
STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA15, "query ends with success", (VirtualGroupId, Agent.VirtualGroupId),
- (QueryId, QueryId), (Response, response->ToString()));
+ (QueryId, QueryId), (Response, response->ToString()), (Duration, TActivationContext::Monotonic() - StartTime));
Agent.SelfId().Send(Event->Sender, response.release(), 0, Event->Cookie);
delete this;
}
diff --git a/ydb/core/blob_depot/data_resolve.cpp b/ydb/core/blob_depot/data_resolve.cpp
index 6b14a919d56..97bd23e98a1 100644
--- a/ydb/core/blob_depot/data_resolve.cpp
+++ b/ydb/core/blob_depot/data_resolve.cpp
@@ -234,6 +234,11 @@ namespace NKikimr::NBlobDepot {
}
}
});
+ if (value.OriginalBlobId) {
+ auto *out = item.AddValueChain();
+ out->SetGroupId(Self->Config.GetDecommitGroupId());
+ LogoBlobIDFromLogoBlobID(*value.OriginalBlobId, out->MutableBlobId());
+ }
if (value.Meta) {
item.SetMeta(value.Meta.data(), value.Meta.size());
}
diff --git a/ydb/core/blob_depot/garbage_collection.cpp b/ydb/core/blob_depot/garbage_collection.cpp
index 34f057652c1..71119ed8b95 100644
--- a/ydb/core/blob_depot/garbage_collection.cpp
+++ b/ydb/core/blob_depot/garbage_collection.cpp
@@ -235,11 +235,6 @@ namespace NKikimr::NBlobDepot {
}
}
- bool TBlobDepot::TBarrierServer::CheckBlobForBarrier(TLogoBlobID id) const {
- const auto it = Barriers.find(std::make_tuple(id.TabletID(), id.Channel()));
- return it == Barriers.end() || TGenStep(id) > Max(it->second.Soft, it->second.Hard);
- }
-
void TBlobDepot::TBarrierServer::GetBlobBarrierRelation(TLogoBlobID id, bool *underSoft, bool *underHard) const {
const auto it = Barriers.find(std::make_tuple(id.TabletID(), id.Channel()));
const TGenStep genStep(id);
diff --git a/ydb/core/blob_depot/garbage_collection.h b/ydb/core/blob_depot/garbage_collection.h
index d12c4abb574..578f69ccba7 100644
--- a/ydb/core/blob_depot/garbage_collection.h
+++ b/ydb/core/blob_depot/garbage_collection.h
@@ -29,10 +29,19 @@ namespace NKikimr::NBlobDepot {
void AddBarrierOnLoad(ui64 tabletId, ui8 channel, TGenStep softGenCtr, TGenStep soft, TGenStep hardGenCtr, TGenStep hard);
void AddBarrierOnDecommit(const TEvBlobStorage::TEvAssimilateResult::TBarrier& barrier, NTabletFlatExecutor::TTransactionContext& txc);
void Handle(TEvBlobDepot::TEvCollectGarbage::TPtr ev);
- bool CheckBlobForBarrier(TLogoBlobID id) const;
void GetBlobBarrierRelation(TLogoBlobID id, bool *underSoft, bool *underHard) const;
void OnDataLoaded();
+ TString ToStringBarrier(ui64 tabletId, ui8 channel, bool hard) const {
+ if (const auto it = Barriers.find(std::make_tuple(tabletId, channel)); it == Barriers.end()) {
+ return "<none>";
+ } else if (auto& b = it->second; hard) {
+ return TStringBuilder() << "hard{" << b.HardGenCtr.ToString() << "=>" << b.Hard.ToString() << "}";
+ } else {
+ return TStringBuilder() << "soft{" << b.SoftGenCtr.ToString() << "=>" << b.Soft.ToString() << "}";
+ }
+ }
+
template<typename TCallback>
void Enumerate(TCallback&& callback) {
for (const auto& [key, value] : Barriers) {
diff --git a/ydb/core/blob_depot/op_commit_blob_seq.cpp b/ydb/core/blob_depot/op_commit_blob_seq.cpp
index 714142fddf2..fe83420eb30 100644
--- a/ydb/core/blob_depot/op_commit_blob_seq.cpp
+++ b/ydb/core/blob_depot/op_commit_blob_seq.cpp
@@ -59,10 +59,11 @@ namespace NKikimr::NBlobDepot {
continue;
}
- if (!CheckKeyAgainstBarrier(key, value)) {
+ TString error;
+ if (!CheckKeyAgainstBarrier(key, value, &error)) {
responseItem->SetStatus(NKikimrProto::ERROR);
responseItem->SetErrorReason(TStringBuilder() << "BlobId# " << key.ToString()
- << " is being put beyond the barrier");
+ << " is being put beyond the barrier: " << error);
continue;
}
@@ -96,11 +97,23 @@ namespace NKikimr::NBlobDepot {
}
}
- bool CheckKeyAgainstBarrier(const TData::TKey& key, const NKikimrBlobDepot::TValue& value) {
+ bool CheckKeyAgainstBarrier(const TData::TKey& key, const NKikimrBlobDepot::TValue& value,
+ TString *error) {
const auto& v = key.AsVariant();
- const auto *id = std::get_if<TLogoBlobID>(&v);
- return !id || Self->BarrierServer->CheckBlobForBarrier(*id) ||
- value.GetKeepState() == NKikimrBlobDepot::EKeepState::Keep;
+ if (const auto *id = std::get_if<TLogoBlobID>(&v)) {
+ bool underSoft, underHard;
+ Self->BarrierServer->GetBlobBarrierRelation(*id, &underSoft, &underHard);
+ if (underHard) {
+ *error = TStringBuilder() << "under barrier# " << Self->BarrierServer->ToStringBarrier(
+ id->TabletID(), id->Channel(), true);
+ return false;
+ } else if (underSoft && value.GetKeepState() != NKikimrBlobDepot::EKeepState::Keep) {
+ *error = TStringBuilder() << "under barrier# " << Self->BarrierServer->ToStringBarrier(
+ id->TabletID(), id->Channel(), false);
+ return false;
+ }
+ }
+ return true;
}
void Complete(const TActorContext&) override {