aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authoralexvru <alexvru@ydb.tech>2022-11-08 15:11:50 +0300
committeralexvru <alexvru@ydb.tech>2022-11-08 15:11:50 +0300
commit2ac9ef76f85e700ca2b257601886b7b04f53bb61 (patch)
tree2cbdde9068de2c83238fbea09b01f3ecd571e496
parentb35fb596af217a9bfca255332f929cf6a92f2c20 (diff)
downloadydb-2ac9ef76f85e700ca2b257601886b7b04f53bb61.tar.gz
Improve BlobDepot agent logging
-rw-r--r--ydb/core/blob_depot/agent/agent_impl.h13
-rw-r--r--ydb/core/blob_depot/agent/query.cpp2
-rw-r--r--ydb/core/blob_depot/agent/storage_assimilate.cpp4
-rw-r--r--ydb/core/blob_depot/agent/storage_block.cpp25
-rw-r--r--ydb/core/blob_depot/agent/storage_collect_garbage.cpp40
-rw-r--r--ydb/core/blob_depot/agent/storage_discover.cpp18
-rw-r--r--ydb/core/blob_depot/agent/storage_get.cpp45
-rw-r--r--ydb/core/blob_depot/agent/storage_patch.cpp4
-rw-r--r--ydb/core/blob_depot/agent/storage_put.cpp61
-rw-r--r--ydb/core/blob_depot/agent/storage_range.cpp41
-rw-r--r--ydb/core/blob_depot/agent/storage_status.cpp4
11 files changed, 133 insertions, 124 deletions
diff --git a/ydb/core/blob_depot/agent/agent_impl.h b/ydb/core/blob_depot/agent/agent_impl.h
index fb5bbe6ae27..5b9fd324e02 100644
--- a/ydb/core/blob_depot/agent/agent_impl.h
+++ b/ydb/core/blob_depot/agent/agent_impl.h
@@ -241,6 +241,7 @@ namespace NKikimr::NBlobDepot {
void EndWithSuccess(std::unique_ptr<IEventBase> response);
TString GetName() const;
ui64 GetQueryId() const { return QueryId; }
+ virtual ui64 GetTabletId() const { return 0; }
virtual void Initiate() = 0;
virtual void OnUpdateBlock(bool /*success*/) {}
@@ -254,6 +255,18 @@ namespace NKikimr::NBlobDepot {
};
};
+ template<typename TEvent>
+ class TBlobStorageQuery : public TQuery {
+ public:
+ TBlobStorageQuery(TBlobDepotAgent& agent, std::unique_ptr<IEventHandle> event)
+ : TQuery(agent, std::move(event))
+ , Request(*Event->Get<TEvent>())
+ {}
+
+ protected:
+ TEvent& Request;
+ };
+
std::deque<std::unique_ptr<IEventHandle>> PendingEventQ;
TIntrusiveListWithAutoDelete<TQuery, TQuery::TDeleter, TExecutingQueries> ExecutingQueries;
THashMultiMap<ui64, TQuery*> QueryIdToQuery;
diff --git a/ydb/core/blob_depot/agent/query.cpp b/ydb/core/blob_depot/agent/query.cpp
index 1b9a45b3f27..b1c628b97e2 100644
--- a/ydb/core/blob_depot/agent/query.cpp
+++ b/ydb/core/blob_depot/agent/query.cpp
@@ -9,7 +9,7 @@ namespace NKikimr::NBlobDepot {
} else {
auto *query = CreateQuery(ev);
STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA13, "new query", (VirtualGroupId, VirtualGroupId),
- (QueryId, query->GetQueryId()), (Name, query->GetName()));
+ (QueryId, query->GetQueryId()), (TabletId, query->GetTabletId()), (Name, query->GetName()));
if (!TabletId) {
query->EndWithError(NKikimrProto::ERROR, "group is in error state");
} else {
diff --git a/ydb/core/blob_depot/agent/storage_assimilate.cpp b/ydb/core/blob_depot/agent/storage_assimilate.cpp
index 05964b1ca03..16283f3623f 100644
--- a/ydb/core/blob_depot/agent/storage_assimilate.cpp
+++ b/ydb/core/blob_depot/agent/storage_assimilate.cpp
@@ -4,9 +4,9 @@ namespace NKikimr::NBlobDepot {
template<>
TBlobDepotAgent::TQuery *TBlobDepotAgent::CreateQuery<TEvBlobStorage::EvAssimilate>(std::unique_ptr<IEventHandle> ev) {
- class TAssimilateQuery : public TQuery {
+ class TAssimilateQuery : public TBlobStorageQuery<TEvBlobStorage::TEvAssimilate> {
public:
- using TQuery::TQuery;
+ using TBlobStorageQuery::TBlobStorageQuery;
void Initiate() override {
Y_VERIFY(Agent.ProxyId);
diff --git a/ydb/core/blob_depot/agent/storage_block.cpp b/ydb/core/blob_depot/agent/storage_block.cpp
index defebab3370..0bf35d59611 100644
--- a/ydb/core/blob_depot/agent/storage_block.cpp
+++ b/ydb/core/blob_depot/agent/storage_block.cpp
@@ -5,7 +5,7 @@ namespace NKikimr::NBlobDepot {
template<>
TBlobDepotAgent::TQuery *TBlobDepotAgent::CreateQuery<TEvBlobStorage::EvBlock>(std::unique_ptr<IEventHandle> ev) {
- class TBlockQuery : public TQuery {
+ class TBlockQuery : public TBlobStorageQuery<TEvBlobStorage::TEvBlock> {
struct TBlockContext : TRequestContext {
TMonotonic Timestamp;
@@ -15,24 +15,22 @@ namespace NKikimr::NBlobDepot {
};
public:
- using TQuery::TQuery;
+ using TBlobStorageQuery::TBlobStorageQuery;
void Initiate() override {
- auto& msg = *Event->Get<TEvBlobStorage::TEvBlock>();
-
// lookup existing blocks to try fail-fast
- const auto& [blockedGeneration, issuerGuid] = Agent.BlocksManager.GetBlockForTablet(msg.TabletId);
- if (msg.Generation < blockedGeneration || (msg.Generation == blockedGeneration && (
- msg.IssuerGuid != issuerGuid || !msg.IssuerGuid || !issuerGuid))) {
+ const auto& [blockedGeneration, issuerGuid] = Agent.BlocksManager.GetBlockForTablet(Request.TabletId);
+ if (Request.Generation < blockedGeneration || (Request.Generation == blockedGeneration && (
+ Request.IssuerGuid != issuerGuid || !Request.IssuerGuid || !issuerGuid))) {
// we don't consider ExpirationTimestamp here because blocked generation may only increase
return EndWithError(NKikimrProto::ALREADY, "block race detected");
}
// issue request to the tablet
NKikimrBlobDepot::TEvBlock block;
- block.SetTabletId(msg.TabletId);
- block.SetBlockedGeneration(msg.Generation);
- block.SetIssuerGuid(msg.IssuerGuid);
+ block.SetTabletId(Request.TabletId);
+ block.SetBlockedGeneration(Request.Generation);
+ block.SetIssuerGuid(Request.IssuerGuid);
Agent.Issue(std::move(block), this, std::make_shared<TBlockContext>(TActivationContext::Monotonic()));
}
@@ -54,12 +52,15 @@ namespace NKikimr::NBlobDepot {
} else {
// update blocks cache
auto& blockContext = context->Obtain<TBlockContext>();
- auto& query = *Event->Get<TEvBlobStorage::TEvBlock>();
- Agent.BlocksManager.SetBlockForTablet(query.TabletId, query.Generation, blockContext.Timestamp,
+ Agent.BlocksManager.SetBlockForTablet(Request.TabletId, Request.Generation, blockContext.Timestamp,
TDuration::MilliSeconds(msg.Record.GetTimeToLiveMs()));
EndWithSuccess(std::make_unique<TEvBlobStorage::TEvBlockResult>(NKikimrProto::OK));
}
}
+
+ ui64 GetTabletId() const override {
+ return Request.TabletId;
+ }
};
return new TBlockQuery(*this, std::move(ev));
diff --git a/ydb/core/blob_depot/agent/storage_collect_garbage.cpp b/ydb/core/blob_depot/agent/storage_collect_garbage.cpp
index 94115450075..c4db1e609c4 100644
--- a/ydb/core/blob_depot/agent/storage_collect_garbage.cpp
+++ b/ydb/core/blob_depot/agent/storage_collect_garbage.cpp
@@ -5,7 +5,7 @@ namespace NKikimr::NBlobDepot {
template<>
TBlobDepotAgent::TQuery *TBlobDepotAgent::CreateQuery<TEvBlobStorage::EvCollectGarbage>(std::unique_ptr<IEventHandle> ev) {
- class TCollectGarbageQuery : public TQuery {
+ class TCollectGarbageQuery : public TBlobStorageQuery<TEvBlobStorage::TEvCollectGarbage> {
ui32 BlockChecksRemain = 3;
ui32 KeepIndex = 0;
ui32 NumKeep;
@@ -16,15 +16,13 @@ namespace NKikimr::NBlobDepot {
bool QueryInFlight = false;
public:
- using TQuery::TQuery;
+ using TBlobStorageQuery::TBlobStorageQuery;
void Initiate() override {
- auto& msg = *Event->Get<TEvBlobStorage::TEvCollectGarbage>();
+ NumKeep = Request.Keep ? Request.Keep->size() : 0;
+ NumDoNotKeep = Request.DoNotKeep ? Request.DoNotKeep->size() : 0;
- NumKeep = msg.Keep ? msg.Keep->size() : 0;
- NumDoNotKeep = msg.DoNotKeep ? msg.DoNotKeep->size() : 0;
-
- const auto status = Agent.BlocksManager.CheckBlockForTablet(msg.TabletId, msg.RecordGeneration, this, nullptr);
+ const auto status = Agent.BlocksManager.CheckBlockForTablet(Request.TabletId, Request.RecordGeneration, this, nullptr);
if (status == NKikimrProto::OK) {
IssueCollectGarbage();
} else if (status != NKikimrProto::UNKNOWN) {
@@ -35,31 +33,30 @@ namespace NKikimr::NBlobDepot {
}
void IssueCollectGarbage() {
- auto& msg = *Event->Get<TEvBlobStorage::TEvCollectGarbage>();
NKikimrBlobDepot::TEvCollectGarbage record;
ui32 numItemsIssued = 0;
for (; KeepIndex < NumKeep && numItemsIssued < MaxCollectGarbageFlagsPerMessage; ++KeepIndex) {
- LogoBlobIDFromLogoBlobID((*msg.Keep)[KeepIndex], record.AddKeep());
+ LogoBlobIDFromLogoBlobID((*Request.Keep)[KeepIndex], record.AddKeep());
++numItemsIssued;
}
for (; DoNotKeepIndex < NumDoNotKeep && numItemsIssued < MaxCollectGarbageFlagsPerMessage; ++DoNotKeepIndex) {
- LogoBlobIDFromLogoBlobID((*msg.DoNotKeep)[DoNotKeepIndex], record.AddDoNotKeep());
+ LogoBlobIDFromLogoBlobID((*Request.DoNotKeep)[DoNotKeepIndex], record.AddDoNotKeep());
++numItemsIssued;
}
IsLast = KeepIndex == NumKeep && DoNotKeepIndex == NumDoNotKeep;
- record.SetTabletId(msg.TabletId);
- record.SetGeneration(msg.RecordGeneration);
- record.SetPerGenerationCounter(msg.PerGenerationCounter + CounterShift);
- record.SetChannel(msg.Channel);
+ record.SetTabletId(Request.TabletId);
+ record.SetGeneration(Request.RecordGeneration);
+ record.SetPerGenerationCounter(Request.PerGenerationCounter + CounterShift);
+ record.SetChannel(Request.Channel);
- if (msg.Collect && IsLast) {
- record.SetHard(msg.Hard);
- record.SetCollectGeneration(msg.CollectGeneration);
- record.SetCollectStep(msg.CollectStep);
+ if (Request.Collect && IsLast) {
+ record.SetHard(Request.Hard);
+ record.SetCollectGeneration(Request.CollectGeneration);
+ record.SetCollectStep(Request.CollectStep);
}
Agent.Issue(std::move(record), this, nullptr);
@@ -97,13 +94,16 @@ namespace NKikimr::NBlobDepot {
} else if (const auto status = msg.GetStatus(); status != NKikimrProto::OK) {
EndWithError(status, msg.GetErrorReason());
} else if (IsLast) {
- auto& msg = *Event->Get<TEvBlobStorage::TEvCollectGarbage>();
EndWithSuccess(std::make_unique<TEvBlobStorage::TEvCollectGarbageResult>(NKikimrProto::OK,
- msg.TabletId, msg.RecordGeneration, msg.PerGenerationCounter, msg.Channel));
+ Request.TabletId, Request.RecordGeneration, Request.PerGenerationCounter, Request.Channel));
} else {
IssueCollectGarbage();
}
}
+
+ ui64 GetTabletId() const override {
+ return Request.TabletId;
+ }
};
return new TCollectGarbageQuery(*this, std::move(ev));
diff --git a/ydb/core/blob_depot/agent/storage_discover.cpp b/ydb/core/blob_depot/agent/storage_discover.cpp
index b53c5eab346..8f59c53a93e 100644
--- a/ydb/core/blob_depot/agent/storage_discover.cpp
+++ b/ydb/core/blob_depot/agent/storage_discover.cpp
@@ -6,7 +6,7 @@ namespace NKikimr::NBlobDepot {
template<>
TBlobDepotAgent::TQuery *TBlobDepotAgent::CreateQuery<TEvBlobStorage::EvDiscover>(std::unique_ptr<IEventHandle> ev) {
- class TDiscoverQuery : public TQuery {
+ class TDiscoverQuery : public TBlobStorageQuery<TEvBlobStorage::TEvDiscover> {
ui64 TabletId = 0;
bool ReadBody;
ui32 MinGeneration = 0;
@@ -21,18 +21,16 @@ namespace NKikimr::NBlobDepot {
ui32 BlockedGeneration = 0;
public:
- using TQuery::TQuery;
+ using TBlobStorageQuery::TBlobStorageQuery;
void Initiate() override {
- auto& msg = *Event->Get<TEvBlobStorage::TEvDiscover>();
-
- TabletId = msg.TabletId;
- ReadBody = msg.ReadBody;
- MinGeneration = msg.MinGeneration;
+ TabletId = Request.TabletId;
+ ReadBody = Request.ReadBody;
+ MinGeneration = Request.MinGeneration;
IssueResolve();
- if (msg.DiscoverBlockedGeneration) {
+ if (Request.DiscoverBlockedGeneration) {
const auto status = Agent.BlocksManager.CheckBlockForTablet(TabletId, Max<ui32>(), this, &BlockedGeneration);
if (status == NKikimrProto::OK) {
DoneWithBlockedGeneration = true;
@@ -165,6 +163,10 @@ namespace NKikimr::NBlobDepot {
: std::make_unique<TEvBlobStorage::TEvDiscoverResult>(NKikimrProto::NODATA, MinGeneration, BlockedGeneration));
}
}
+
+ ui64 GetTabletId() const override {
+ return Request.TabletId;
+ }
};
return new TDiscoverQuery(*this, std::move(ev));
diff --git a/ydb/core/blob_depot/agent/storage_get.cpp b/ydb/core/blob_depot/agent/storage_get.cpp
index ef84be70527..d4b08c13057 100644
--- a/ydb/core/blob_depot/agent/storage_get.cpp
+++ b/ydb/core/blob_depot/agent/storage_get.cpp
@@ -6,7 +6,7 @@ namespace NKikimr::NBlobDepot {
template<>
TBlobDepotAgent::TQuery *TBlobDepotAgent::CreateQuery<TEvBlobStorage::EvGet>(std::unique_ptr<IEventHandle> ev) {
- class TGetQuery : public TQuery {
+ class TGetQuery : public TBlobStorageQuery<TEvBlobStorage::TEvGet> {
std::unique_ptr<TEvBlobStorage::TEvGetResult> Response;
ui32 AnswersRemain;
@@ -19,12 +19,10 @@ namespace NKikimr::NBlobDepot {
};
public:
- using TQuery::TQuery;
+ using TBlobStorageQuery::TBlobStorageQuery;
void Initiate() override {
- auto& msg = GetQuery();
-
- if (msg.Decommission) {
+ if (Request.Decommission) {
// just forward this message to underlying proxy
Y_VERIFY(Agent.ProxyId);
const bool sent = TActivationContext::Send(Event->Forward(Agent.ProxyId));
@@ -33,20 +31,20 @@ namespace NKikimr::NBlobDepot {
return;
}
- Response = std::make_unique<TEvBlobStorage::TEvGetResult>(NKikimrProto::OK, msg.QuerySize,
+ Response = std::make_unique<TEvBlobStorage::TEvGetResult>(NKikimrProto::OK, Request.QuerySize,
Agent.VirtualGroupId);
- AnswersRemain = msg.QuerySize;
+ AnswersRemain = Request.QuerySize;
- if (msg.ReaderTabletData) {
- auto status = Agent.BlocksManager.CheckBlockForTablet(msg.ReaderTabletData->Id, msg.ReaderTabletData->Generation, this, nullptr);
+ if (Request.ReaderTabletData) {
+ auto status = Agent.BlocksManager.CheckBlockForTablet(Request.ReaderTabletData->Id, Request.ReaderTabletData->Generation, this, nullptr);
if (status == NKikimrProto::BLOCKED) {
EndWithError(status, "Fail TEvGet due to BLOCKED tablet generation");
return;
}
}
- for (ui32 i = 0; i < msg.QuerySize; ++i) {
- auto& query = msg.Queries[i];
+ for (ui32 i = 0; i < Request.QuerySize; ++i) {
+ auto& query = Request.Queries[i];
auto& response = Response->Responses[i];
response.Id = query.Id;
@@ -64,26 +62,25 @@ namespace NKikimr::NBlobDepot {
}
bool ProcessSingleResult(ui32 queryIdx, const TResolvedValueChain *value) {
- auto& msg = GetQuery();
STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA27, "ProcessSingleResult", (VirtualGroupId, Agent.VirtualGroupId),
(QueryId, GetQueryId()), (QueryIdx, queryIdx), (Value, value));
if (!value) {
Response->Responses[queryIdx].Status = NKikimrProto::NODATA;
--AnswersRemain;
- } else if (msg.IsIndexOnly) {
+ } else if (Request.IsIndexOnly) {
Response->Responses[queryIdx].Status = NKikimrProto::OK;
--AnswersRemain;
} else if (value) {
TReadArg arg{
*value,
- msg.GetHandleClass,
- msg.MustRestoreFirst,
+ Request.GetHandleClass,
+ Request.MustRestoreFirst,
this,
- msg.Queries[queryIdx].Shift,
- msg.Queries[queryIdx].Size,
+ Request.Queries[queryIdx].Shift,
+ Request.Queries[queryIdx].Size,
queryIdx,
- msg.ReaderTabletData};
+ Request.ReaderTabletData};
TString error;
const bool success = Agent.IssueRead(arg, error);
if (!success) {
@@ -126,8 +123,16 @@ namespace NKikimr::NBlobDepot {
}
}
- TEvBlobStorage::TEvGet& GetQuery() const {
- return *Event->Get<TEvBlobStorage::TEvGet>();
+ ui64 GetTabletId() const override {
+ ui64 value = 0;
+ for (ui32 i = 0; i < Request.QuerySize; ++i) {
+ auto& req = Request.Queries[i];
+ if (value && value != req.Id.TabletID()) {
+ return 0;
+ }
+ value = req.Id.TabletID();
+ }
+ return value;
}
};
diff --git a/ydb/core/blob_depot/agent/storage_patch.cpp b/ydb/core/blob_depot/agent/storage_patch.cpp
index bc6cdfcf911..b25443d501a 100644
--- a/ydb/core/blob_depot/agent/storage_patch.cpp
+++ b/ydb/core/blob_depot/agent/storage_patch.cpp
@@ -4,9 +4,9 @@ namespace NKikimr::NBlobDepot {
template<>
TBlobDepotAgent::TQuery *TBlobDepotAgent::CreateQuery<TEvBlobStorage::EvPatch>(std::unique_ptr<IEventHandle> ev) {
- class TPatchQuery : public TQuery {
+ class TPatchQuery : public TBlobStorageQuery<TEvBlobStorage::TEvPatch> {
public:
- using TQuery::TQuery;
+ using TBlobStorageQuery::TBlobStorageQuery;
void Initiate() override {
EndWithError(NKikimrProto::ERROR, "not implemented");
diff --git a/ydb/core/blob_depot/agent/storage_put.cpp b/ydb/core/blob_depot/agent/storage_put.cpp
index a97a38b59a7..c47fd238fac 100644
--- a/ydb/core/blob_depot/agent/storage_put.cpp
+++ b/ydb/core/blob_depot/agent/storage_put.cpp
@@ -5,7 +5,7 @@ namespace NKikimr::NBlobDepot {
template<>
TBlobDepotAgent::TQuery *TBlobDepotAgent::CreateQuery<TEvBlobStorage::EvPut>(std::unique_ptr<IEventHandle> ev) {
- class TPutQuery : public TQuery {
+ class TPutQuery : public TBlobStorageQuery<TEvBlobStorage::TEvPut> {
const bool SuppressFooter = true;
const bool IssueUncertainWrites = true;
@@ -18,7 +18,7 @@ namespace NKikimr::NBlobDepot {
TBlobSeqId BlobSeqId;
public:
- using TQuery::TQuery;
+ using TBlobStorageQuery::TBlobStorageQuery;
void OnDestroy(bool success) override {
if (IsInFlight) {
@@ -31,29 +31,27 @@ namespace NKikimr::NBlobDepot {
}
void Initiate() override {
- auto& msg = GetQuery();
- if (msg.Buffer.size() > MaxBlobSize) {
+ if (Request.Buffer.size() > MaxBlobSize) {
return EndWithError(NKikimrProto::ERROR, "blob is way too big");
- } else if (msg.Buffer.size() != msg.Id.BlobSize()) {
+ } else if (Request.Buffer.size() != Request.Id.BlobSize()) {
return EndWithError(NKikimrProto::ERROR, "blob size mismatch");
- } else if (!msg.Buffer) {
+ } else if (!Request.Buffer) {
return EndWithError(NKikimrProto::ERROR, "no blob data");
- } else if (!msg.Id) {
+ } else if (!Request.Id) {
return EndWithError(NKikimrProto::ERROR, "blob id is zero");
}
- BlockChecksRemain.resize(1 + msg.ExtraBlockChecks.size(), 3); // set number of tries for every block
+ BlockChecksRemain.resize(1 + Request.ExtraBlockChecks.size(), 3); // set number of tries for every block
CheckBlocks();
}
void CheckBlocks() {
- auto& msg = GetQuery();
bool someBlocksMissing = false;
- for (size_t i = 0; i <= msg.ExtraBlockChecks.size(); ++i) {
- const auto *blkp = i ? &msg.ExtraBlockChecks[i - 1] : nullptr;
- const ui64 tabletId = blkp ? blkp->first : msg.Id.TabletID();
- const ui32 generation = blkp ? blkp->second : msg.Id.Generation();
- const auto status = msg.Decommission
+ for (size_t i = 0; i <= Request.ExtraBlockChecks.size(); ++i) {
+ const auto *blkp = i ? &Request.ExtraBlockChecks[i - 1] : nullptr;
+ const ui64 tabletId = blkp ? blkp->first : Request.Id.TabletID();
+ const ui32 generation = blkp ? blkp->second : Request.Id.Generation();
+ const auto status = Request.Decommission
? NKikimrProto::OK // suppress blocks check when copying blob from decommitted group
: Agent.BlocksManager.CheckBlockForTablet(tabletId, generation, this, nullptr);
if (status == NKikimrProto::OK) {
@@ -74,8 +72,6 @@ namespace NKikimr::NBlobDepot {
void IssuePuts() {
Y_VERIFY(!PutsIssued);
- auto& msg = GetQuery();
-
const auto it = Agent.ChannelKinds.find(NKikimrBlobDepot::TChannelKind::Data);
if (it == Agent.ChannelKinds.end()) {
return EndWithError(NKikimrProto::ERROR, "no Data channels");
@@ -84,7 +80,7 @@ namespace NKikimr::NBlobDepot {
std::optional<TBlobSeqId> blobSeqId = kind.Allocate(Agent);
STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA21, "allocated BlobSeqId", (VirtualGroupId, Agent.VirtualGroupId),
- (QueryId, GetQueryId()), (BlobSeqId, blobSeqId), (BlobId, msg.Id));
+ (QueryId, GetQueryId()), (BlobSeqId, blobSeqId), (BlobId, Request.Id));
if (!blobSeqId) {
return kind.EnqueueQueryWaitingForId(this);
}
@@ -98,11 +94,11 @@ namespace NKikimr::NBlobDepot {
Y_VERIFY(CommitBlobSeq.ItemsSize() == 0);
auto *commitItem = CommitBlobSeq.AddItems();
- commitItem->SetKey(msg.Id.AsBinaryString());
+ commitItem->SetKey(Request.Id.AsBinaryString());
auto *locator = commitItem->MutableBlobLocator();
BlobSeqId.ToProto(locator->MutableBlobSeqId());
- //locator->SetChecksum(Crc32c(msg.Buffer.data(), msg.Buffer.size()));
- locator->SetTotalDataLen(msg.Buffer.size());
+ //locator->SetChecksum(Crc32c(Request.Buffer.data(), Request.Buffer.size()));
+ locator->SetTotalDataLen(Request.Buffer.size());
if (!SuppressFooter) {
locator->SetFooterLen(sizeof(TVirtualGroupBlobFooter));
}
@@ -112,17 +108,17 @@ namespace NKikimr::NBlobDepot {
footerData = TContiguousData::Uninitialized(sizeof(TVirtualGroupBlobFooter));
auto& footer = *reinterpret_cast<TVirtualGroupBlobFooter*>(footerData.UnsafeGetDataMut());
memset(&footer, 0, sizeof(footer));
- footer.StoredBlobId = msg.Id;
+ footer.StoredBlobId = Request.Id;
}
auto put = [&](EBlobType type, TContiguousData&& buffer) {
const auto& [id, groupId] = kind.MakeBlobId(Agent, BlobSeqId, type, 0, buffer.size());
Y_VERIFY(!locator->HasGroupId() || locator->GetGroupId() == groupId);
locator->SetGroupId(groupId);
- auto ev = std::make_unique<TEvBlobStorage::TEvPut>(id, std::move(buffer), msg.Deadline, msg.HandleClass, msg.Tactic);
- ev->ExtraBlockChecks = msg.ExtraBlockChecks;
- if (!msg.Decommission) { // do not check original blob against blocks when writing decommission copy
- ev->ExtraBlockChecks.emplace_back(msg.Id.TabletID(), msg.Id.Generation());
+ auto ev = std::make_unique<TEvBlobStorage::TEvPut>(id, std::move(buffer), Request.Deadline, Request.HandleClass, Request.Tactic);
+ ev->ExtraBlockChecks = Request.ExtraBlockChecks;
+ if (!Request.Decommission) { // do not check original blob against blocks when writing decommission copy
+ ev->ExtraBlockChecks.emplace_back(Request.Id.TabletID(), Request.Id.Generation());
}
Agent.SendToProxy(groupId, std::move(ev), this, nullptr);
++PutsInFlight;
@@ -130,16 +126,16 @@ namespace NKikimr::NBlobDepot {
if (SuppressFooter) {
// write the blob as is, we don't need footer for this kind
- put(EBlobType::VG_DATA_BLOB, TContiguousData(std::move(msg.Buffer)));
- } else if (msg.Buffer.size() + sizeof(TVirtualGroupBlobFooter) <= MaxBlobSize) {
+ put(EBlobType::VG_DATA_BLOB, TContiguousData(std::move(Request.Buffer)));
+ } else if (Request.Buffer.size() + sizeof(TVirtualGroupBlobFooter) <= MaxBlobSize) {
// write single blob with footer
- TRope buffer = TRope(std::move(msg.Buffer));
+ TRope buffer = TRope(std::move(Request.Buffer));
buffer.Insert(buffer.End(), std::move(footerData));
buffer.Compact();
put(EBlobType::VG_COMPOSITE_BLOB, TContiguousData(std::move(buffer)));
} else {
// write data blob and blob with footer
- put(EBlobType::VG_DATA_BLOB, TContiguousData(std::move(msg.Buffer)));
+ put(EBlobType::VG_DATA_BLOB, TContiguousData(std::move(Request.Buffer)));
put(EBlobType::VG_FOOTER_BLOB, TContiguousData(std::move(footerData)));
}
@@ -252,13 +248,12 @@ namespace NKikimr::NBlobDepot {
IssueCommitBlobSeq(false);
}
- auto& msg = GetQuery();
- TQuery::EndWithSuccess(std::make_unique<TEvBlobStorage::TEvPutResult>(NKikimrProto::OK, msg.Id,
+ TQuery::EndWithSuccess(std::make_unique<TEvBlobStorage::TEvPutResult>(NKikimrProto::OK, Request.Id,
Agent.GetStorageStatusFlags(), Agent.VirtualGroupId, Agent.GetApproximateFreeSpaceShare()));
}
- TEvBlobStorage::TEvPut& GetQuery() const {
- return *Event->Get<TEvBlobStorage::TEvPut>();
+ ui64 GetTabletId() const override {
+ return Request.Id.TabletID();
}
};
diff --git a/ydb/core/blob_depot/agent/storage_range.cpp b/ydb/core/blob_depot/agent/storage_range.cpp
index 20a8141b4d0..3869754d50b 100644
--- a/ydb/core/blob_depot/agent/storage_range.cpp
+++ b/ydb/core/blob_depot/agent/storage_range.cpp
@@ -4,7 +4,7 @@ namespace NKikimr::NBlobDepot {
template<>
TBlobDepotAgent::TQuery *TBlobDepotAgent::CreateQuery<TEvBlobStorage::EvRange>(std::unique_ptr<IEventHandle> ev) {
- class TRangeQuery : public TQuery {
+ class TRangeQuery : public TBlobStorageQuery<TEvBlobStorage::TEvRange> {
std::unique_ptr<TEvBlobStorage::TEvRangeResult> Response;
ui32 ReadsInFlight = 0;
ui32 ResolvesInFlight = 0;
@@ -18,33 +18,29 @@ namespace NKikimr::NBlobDepot {
};
public:
- using TQuery::TQuery;
+ using TBlobStorageQuery::TBlobStorageQuery;
void Initiate() override {
- auto& msg = GetQuery();
-
- if (msg.Decommission) {
+ if (Request.Decommission) {
Y_VERIFY(Agent.ProxyId);
STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA26, "forwarding TEvRange", (VirtualGroupId, Agent.VirtualGroupId),
- (TabletId, Agent.TabletId), (Msg, msg), (ProxyId, Agent.ProxyId));
+ (TabletId, Agent.TabletId), (Msg, Request), (ProxyId, Agent.ProxyId));
const bool sent = TActivationContext::Send(Event->Forward(Agent.ProxyId));
Y_VERIFY(sent);
delete this;
return;
}
- Response = std::make_unique<TEvBlobStorage::TEvRangeResult>(NKikimrProto::OK, msg.From, msg.To,
+ Response = std::make_unique<TEvBlobStorage::TEvRangeResult>(NKikimrProto::OK, Request.From, Request.To,
Agent.VirtualGroupId);
IssueResolve();
}
void IssueResolve() {
- auto& msg = GetQuery();
-
- TString from = msg.From.AsBinaryString();
- TString to = msg.To.AsBinaryString();
- const bool reverse = msg.To < msg.From;
+ TString from = Request.From.AsBinaryString();
+ TString to = Request.To.AsBinaryString();
+ const bool reverse = Request.To < Request.From;
if (reverse) {
std::swap(from, to);
}
@@ -57,20 +53,19 @@ namespace NKikimr::NBlobDepot {
range->SetEndingKey(to);
range->SetIncludeEnding(true);
range->SetReverse(reverse);
- item->SetTabletId(msg.TabletId);
- item->SetMustRestoreFirst(msg.MustRestoreFirst);
+ item->SetTabletId(Request.TabletId);
+ item->SetMustRestoreFirst(Request.MustRestoreFirst);
Agent.Issue(std::move(resolve), this, nullptr);
++ResolvesInFlight;
}
void IssueResolve(TLogoBlobID id, size_t index) {
- auto& msg = GetQuery();
NKikimrBlobDepot::TEvResolve resolve;
auto *item = resolve.AddItems();
item->SetExactKey(id.AsBinaryString());
- item->SetTabletId(msg.TabletId);
- item->SetMustRestoreFirst(msg.MustRestoreFirst);
+ item->SetTabletId(Request.TabletId);
+ item->SetMustRestoreFirst(Request.MustRestoreFirst);
Agent.Issue(std::move(resolve), this, std::make_shared<TExtraResolveContext>(index));
++ResolvesInFlight;
@@ -89,8 +84,6 @@ namespace NKikimr::NBlobDepot {
}
void HandleResolveResult(ui64 id, TRequestContext::TPtr context, NKikimrBlobDepot::TEvResolveResult& msg) {
- auto& query = GetQuery();
-
--ResolvesInFlight;
if (msg.GetStatus() != NKikimrProto::OK && msg.GetStatus() != NKikimrProto::OVERRUN) {
@@ -108,11 +101,11 @@ namespace NKikimr::NBlobDepot {
Response->Responses.emplace_back(id, TString());
}
- if (!query.IsIndexOnly) {
+ if (!Request.IsIndexOnly) {
TReadArg arg{
key.GetValueChain(),
NKikimrBlobStorage::EGetHandleClass::FastRead,
- query.MustRestoreFirst,
+ Request.MustRestoreFirst,
this,
0,
0,
@@ -124,7 +117,7 @@ namespace NKikimr::NBlobDepot {
<< error);
}
++ReadsInFlight;
- } else if (query.MustRestoreFirst) {
+ } else if (Request.MustRestoreFirst) {
Y_FAIL("not implemented yet");
}
}
@@ -163,8 +156,8 @@ namespace NKikimr::NBlobDepot {
}
}
- TEvBlobStorage::TEvRange& GetQuery() const {
- return *Event->Get<TEvBlobStorage::TEvRange>();
+ ui64 GetTabletId() const override {
+ return Request.TabletId;
}
};
diff --git a/ydb/core/blob_depot/agent/storage_status.cpp b/ydb/core/blob_depot/agent/storage_status.cpp
index 8b12ee0e889..8e52166a9ef 100644
--- a/ydb/core/blob_depot/agent/storage_status.cpp
+++ b/ydb/core/blob_depot/agent/storage_status.cpp
@@ -4,9 +4,9 @@ namespace NKikimr::NBlobDepot {
template<>
TBlobDepotAgent::TQuery *TBlobDepotAgent::CreateQuery<TEvBlobStorage::EvStatus>(std::unique_ptr<IEventHandle> ev) {
- class TStatusQuery : public TQuery {
+ class TStatusQuery : public TBlobStorageQuery<TEvBlobStorage::TEvStatus> {
public:
- using TQuery::TQuery;
+ using TBlobStorageQuery::TBlobStorageQuery;
void Initiate() override {
EndWithSuccess(std::make_unique<TEvBlobStorage::TEvStatusResult>(NKikimrProto::OK,