diff options
author | alexvru <alexvru@ydb.tech> | 2022-11-08 15:11:50 +0300 |
---|---|---|
committer | alexvru <alexvru@ydb.tech> | 2022-11-08 15:11:50 +0300 |
commit | 2ac9ef76f85e700ca2b257601886b7b04f53bb61 (patch) | |
tree | 2cbdde9068de2c83238fbea09b01f3ecd571e496 | |
parent | b35fb596af217a9bfca255332f929cf6a92f2c20 (diff) | |
download | ydb-2ac9ef76f85e700ca2b257601886b7b04f53bb61.tar.gz |
Improve BlobDepot agent logging
-rw-r--r-- | ydb/core/blob_depot/agent/agent_impl.h | 13 | ||||
-rw-r--r-- | ydb/core/blob_depot/agent/query.cpp | 2 | ||||
-rw-r--r-- | ydb/core/blob_depot/agent/storage_assimilate.cpp | 4 | ||||
-rw-r--r-- | ydb/core/blob_depot/agent/storage_block.cpp | 25 | ||||
-rw-r--r-- | ydb/core/blob_depot/agent/storage_collect_garbage.cpp | 40 | ||||
-rw-r--r-- | ydb/core/blob_depot/agent/storage_discover.cpp | 18 | ||||
-rw-r--r-- | ydb/core/blob_depot/agent/storage_get.cpp | 45 | ||||
-rw-r--r-- | ydb/core/blob_depot/agent/storage_patch.cpp | 4 | ||||
-rw-r--r-- | ydb/core/blob_depot/agent/storage_put.cpp | 61 | ||||
-rw-r--r-- | ydb/core/blob_depot/agent/storage_range.cpp | 41 | ||||
-rw-r--r-- | ydb/core/blob_depot/agent/storage_status.cpp | 4 |
11 files changed, 133 insertions, 124 deletions
diff --git a/ydb/core/blob_depot/agent/agent_impl.h b/ydb/core/blob_depot/agent/agent_impl.h index fb5bbe6ae27..5b9fd324e02 100644 --- a/ydb/core/blob_depot/agent/agent_impl.h +++ b/ydb/core/blob_depot/agent/agent_impl.h @@ -241,6 +241,7 @@ namespace NKikimr::NBlobDepot { void EndWithSuccess(std::unique_ptr<IEventBase> response); TString GetName() const; ui64 GetQueryId() const { return QueryId; } + virtual ui64 GetTabletId() const { return 0; } virtual void Initiate() = 0; virtual void OnUpdateBlock(bool /*success*/) {} @@ -254,6 +255,18 @@ namespace NKikimr::NBlobDepot { }; }; + template<typename TEvent> + class TBlobStorageQuery : public TQuery { + public: + TBlobStorageQuery(TBlobDepotAgent& agent, std::unique_ptr<IEventHandle> event) + : TQuery(agent, std::move(event)) + , Request(*Event->Get<TEvent>()) + {} + + protected: + TEvent& Request; + }; + std::deque<std::unique_ptr<IEventHandle>> PendingEventQ; TIntrusiveListWithAutoDelete<TQuery, TQuery::TDeleter, TExecutingQueries> ExecutingQueries; THashMultiMap<ui64, TQuery*> QueryIdToQuery; diff --git a/ydb/core/blob_depot/agent/query.cpp b/ydb/core/blob_depot/agent/query.cpp index 1b9a45b3f27..b1c628b97e2 100644 --- a/ydb/core/blob_depot/agent/query.cpp +++ b/ydb/core/blob_depot/agent/query.cpp @@ -9,7 +9,7 @@ namespace NKikimr::NBlobDepot { } else { auto *query = CreateQuery(ev); STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA13, "new query", (VirtualGroupId, VirtualGroupId), - (QueryId, query->GetQueryId()), (Name, query->GetName())); + (QueryId, query->GetQueryId()), (TabletId, query->GetTabletId()), (Name, query->GetName())); if (!TabletId) { query->EndWithError(NKikimrProto::ERROR, "group is in error state"); } else { diff --git a/ydb/core/blob_depot/agent/storage_assimilate.cpp b/ydb/core/blob_depot/agent/storage_assimilate.cpp index 05964b1ca03..16283f3623f 100644 --- a/ydb/core/blob_depot/agent/storage_assimilate.cpp +++ b/ydb/core/blob_depot/agent/storage_assimilate.cpp @@ -4,9 +4,9 @@ namespace NKikimr::NBlobDepot { template<> TBlobDepotAgent::TQuery *TBlobDepotAgent::CreateQuery<TEvBlobStorage::EvAssimilate>(std::unique_ptr<IEventHandle> ev) { - class TAssimilateQuery : public TQuery { + class TAssimilateQuery : public TBlobStorageQuery<TEvBlobStorage::TEvAssimilate> { public: - using TQuery::TQuery; + using TBlobStorageQuery::TBlobStorageQuery; void Initiate() override { Y_VERIFY(Agent.ProxyId); diff --git a/ydb/core/blob_depot/agent/storage_block.cpp b/ydb/core/blob_depot/agent/storage_block.cpp index defebab3370..0bf35d59611 100644 --- a/ydb/core/blob_depot/agent/storage_block.cpp +++ b/ydb/core/blob_depot/agent/storage_block.cpp @@ -5,7 +5,7 @@ namespace NKikimr::NBlobDepot { template<> TBlobDepotAgent::TQuery *TBlobDepotAgent::CreateQuery<TEvBlobStorage::EvBlock>(std::unique_ptr<IEventHandle> ev) { - class TBlockQuery : public TQuery { + class TBlockQuery : public TBlobStorageQuery<TEvBlobStorage::TEvBlock> { struct TBlockContext : TRequestContext { TMonotonic Timestamp; @@ -15,24 +15,22 @@ namespace NKikimr::NBlobDepot { }; public: - using TQuery::TQuery; + using TBlobStorageQuery::TBlobStorageQuery; void Initiate() override { - auto& msg = *Event->Get<TEvBlobStorage::TEvBlock>(); - // lookup existing blocks to try fail-fast - const auto& [blockedGeneration, issuerGuid] = Agent.BlocksManager.GetBlockForTablet(msg.TabletId); - if (msg.Generation < blockedGeneration || (msg.Generation == blockedGeneration && ( - msg.IssuerGuid != issuerGuid || !msg.IssuerGuid || !issuerGuid))) { + const auto& [blockedGeneration, issuerGuid] = Agent.BlocksManager.GetBlockForTablet(Request.TabletId); + if (Request.Generation < blockedGeneration || (Request.Generation == blockedGeneration && ( + Request.IssuerGuid != issuerGuid || !Request.IssuerGuid || !issuerGuid))) { // we don't consider ExpirationTimestamp here because blocked generation may only increase return EndWithError(NKikimrProto::ALREADY, "block race detected"); } // issue request to the tablet NKikimrBlobDepot::TEvBlock block; - block.SetTabletId(msg.TabletId); - block.SetBlockedGeneration(msg.Generation); - block.SetIssuerGuid(msg.IssuerGuid); + block.SetTabletId(Request.TabletId); + block.SetBlockedGeneration(Request.Generation); + block.SetIssuerGuid(Request.IssuerGuid); Agent.Issue(std::move(block), this, std::make_shared<TBlockContext>(TActivationContext::Monotonic())); } @@ -54,12 +52,15 @@ namespace NKikimr::NBlobDepot { } else { // update blocks cache auto& blockContext = context->Obtain<TBlockContext>(); - auto& query = *Event->Get<TEvBlobStorage::TEvBlock>(); - Agent.BlocksManager.SetBlockForTablet(query.TabletId, query.Generation, blockContext.Timestamp, + Agent.BlocksManager.SetBlockForTablet(Request.TabletId, Request.Generation, blockContext.Timestamp, TDuration::MilliSeconds(msg.Record.GetTimeToLiveMs())); EndWithSuccess(std::make_unique<TEvBlobStorage::TEvBlockResult>(NKikimrProto::OK)); } } + + ui64 GetTabletId() const override { + return Request.TabletId; + } }; return new TBlockQuery(*this, std::move(ev)); diff --git a/ydb/core/blob_depot/agent/storage_collect_garbage.cpp b/ydb/core/blob_depot/agent/storage_collect_garbage.cpp index 94115450075..c4db1e609c4 100644 --- a/ydb/core/blob_depot/agent/storage_collect_garbage.cpp +++ b/ydb/core/blob_depot/agent/storage_collect_garbage.cpp @@ -5,7 +5,7 @@ namespace NKikimr::NBlobDepot { template<> TBlobDepotAgent::TQuery *TBlobDepotAgent::CreateQuery<TEvBlobStorage::EvCollectGarbage>(std::unique_ptr<IEventHandle> ev) { - class TCollectGarbageQuery : public TQuery { + class TCollectGarbageQuery : public TBlobStorageQuery<TEvBlobStorage::TEvCollectGarbage> { ui32 BlockChecksRemain = 3; ui32 KeepIndex = 0; ui32 NumKeep; @@ -16,15 +16,13 @@ namespace NKikimr::NBlobDepot { bool QueryInFlight = false; public: - using TQuery::TQuery; + using TBlobStorageQuery::TBlobStorageQuery; void Initiate() override { - auto& msg = *Event->Get<TEvBlobStorage::TEvCollectGarbage>(); + NumKeep = Request.Keep ? Request.Keep->size() : 0; + NumDoNotKeep = Request.DoNotKeep ? Request.DoNotKeep->size() : 0; - NumKeep = msg.Keep ? msg.Keep->size() : 0; - NumDoNotKeep = msg.DoNotKeep ? msg.DoNotKeep->size() : 0; - - const auto status = Agent.BlocksManager.CheckBlockForTablet(msg.TabletId, msg.RecordGeneration, this, nullptr); + const auto status = Agent.BlocksManager.CheckBlockForTablet(Request.TabletId, Request.RecordGeneration, this, nullptr); if (status == NKikimrProto::OK) { IssueCollectGarbage(); } else if (status != NKikimrProto::UNKNOWN) { @@ -35,31 +33,30 @@ namespace NKikimr::NBlobDepot { } void IssueCollectGarbage() { - auto& msg = *Event->Get<TEvBlobStorage::TEvCollectGarbage>(); NKikimrBlobDepot::TEvCollectGarbage record; ui32 numItemsIssued = 0; for (; KeepIndex < NumKeep && numItemsIssued < MaxCollectGarbageFlagsPerMessage; ++KeepIndex) { - LogoBlobIDFromLogoBlobID((*msg.Keep)[KeepIndex], record.AddKeep()); + LogoBlobIDFromLogoBlobID((*Request.Keep)[KeepIndex], record.AddKeep()); ++numItemsIssued; } for (; DoNotKeepIndex < NumDoNotKeep && numItemsIssued < MaxCollectGarbageFlagsPerMessage; ++DoNotKeepIndex) { - LogoBlobIDFromLogoBlobID((*msg.DoNotKeep)[DoNotKeepIndex], record.AddDoNotKeep()); + LogoBlobIDFromLogoBlobID((*Request.DoNotKeep)[DoNotKeepIndex], record.AddDoNotKeep()); ++numItemsIssued; } IsLast = KeepIndex == NumKeep && DoNotKeepIndex == NumDoNotKeep; - record.SetTabletId(msg.TabletId); - record.SetGeneration(msg.RecordGeneration); - record.SetPerGenerationCounter(msg.PerGenerationCounter + CounterShift); - record.SetChannel(msg.Channel); + record.SetTabletId(Request.TabletId); + record.SetGeneration(Request.RecordGeneration); + record.SetPerGenerationCounter(Request.PerGenerationCounter + CounterShift); + record.SetChannel(Request.Channel); - if (msg.Collect && IsLast) { - record.SetHard(msg.Hard); - record.SetCollectGeneration(msg.CollectGeneration); - record.SetCollectStep(msg.CollectStep); + if (Request.Collect && IsLast) { + record.SetHard(Request.Hard); + record.SetCollectGeneration(Request.CollectGeneration); + record.SetCollectStep(Request.CollectStep); } Agent.Issue(std::move(record), this, nullptr); @@ -97,13 +94,16 @@ namespace NKikimr::NBlobDepot { } else if (const auto status = msg.GetStatus(); status != NKikimrProto::OK) { EndWithError(status, msg.GetErrorReason()); } else if (IsLast) { - auto& msg = *Event->Get<TEvBlobStorage::TEvCollectGarbage>(); EndWithSuccess(std::make_unique<TEvBlobStorage::TEvCollectGarbageResult>(NKikimrProto::OK, - msg.TabletId, msg.RecordGeneration, msg.PerGenerationCounter, msg.Channel)); + Request.TabletId, Request.RecordGeneration, Request.PerGenerationCounter, Request.Channel)); } else { IssueCollectGarbage(); } } + + ui64 GetTabletId() const override { + return Request.TabletId; + } }; return new TCollectGarbageQuery(*this, std::move(ev)); diff --git a/ydb/core/blob_depot/agent/storage_discover.cpp b/ydb/core/blob_depot/agent/storage_discover.cpp index b53c5eab346..8f59c53a93e 100644 --- a/ydb/core/blob_depot/agent/storage_discover.cpp +++ b/ydb/core/blob_depot/agent/storage_discover.cpp @@ -6,7 +6,7 @@ namespace NKikimr::NBlobDepot { template<> TBlobDepotAgent::TQuery *TBlobDepotAgent::CreateQuery<TEvBlobStorage::EvDiscover>(std::unique_ptr<IEventHandle> ev) { - class TDiscoverQuery : public TQuery { + class TDiscoverQuery : public TBlobStorageQuery<TEvBlobStorage::TEvDiscover> { ui64 TabletId = 0; bool ReadBody; ui32 MinGeneration = 0; @@ -21,18 +21,16 @@ namespace NKikimr::NBlobDepot { ui32 BlockedGeneration = 0; public: - using TQuery::TQuery; + using TBlobStorageQuery::TBlobStorageQuery; void Initiate() override { - auto& msg = *Event->Get<TEvBlobStorage::TEvDiscover>(); - - TabletId = msg.TabletId; - ReadBody = msg.ReadBody; - MinGeneration = msg.MinGeneration; + TabletId = Request.TabletId; + ReadBody = Request.ReadBody; + MinGeneration = Request.MinGeneration; IssueResolve(); - if (msg.DiscoverBlockedGeneration) { + if (Request.DiscoverBlockedGeneration) { const auto status = Agent.BlocksManager.CheckBlockForTablet(TabletId, Max<ui32>(), this, &BlockedGeneration); if (status == NKikimrProto::OK) { DoneWithBlockedGeneration = true; @@ -165,6 +163,10 @@ namespace NKikimr::NBlobDepot { : std::make_unique<TEvBlobStorage::TEvDiscoverResult>(NKikimrProto::NODATA, MinGeneration, BlockedGeneration)); } } + + ui64 GetTabletId() const override { + return Request.TabletId; + } }; return new TDiscoverQuery(*this, std::move(ev)); diff --git a/ydb/core/blob_depot/agent/storage_get.cpp b/ydb/core/blob_depot/agent/storage_get.cpp index ef84be70527..d4b08c13057 100644 --- a/ydb/core/blob_depot/agent/storage_get.cpp +++ b/ydb/core/blob_depot/agent/storage_get.cpp @@ -6,7 +6,7 @@ namespace NKikimr::NBlobDepot { template<> TBlobDepotAgent::TQuery *TBlobDepotAgent::CreateQuery<TEvBlobStorage::EvGet>(std::unique_ptr<IEventHandle> ev) { - class TGetQuery : public TQuery { + class TGetQuery : public TBlobStorageQuery<TEvBlobStorage::TEvGet> { std::unique_ptr<TEvBlobStorage::TEvGetResult> Response; ui32 AnswersRemain; @@ -19,12 +19,10 @@ namespace NKikimr::NBlobDepot { }; public: - using TQuery::TQuery; + using TBlobStorageQuery::TBlobStorageQuery; void Initiate() override { - auto& msg = GetQuery(); - - if (msg.Decommission) { + if (Request.Decommission) { // just forward this message to underlying proxy Y_VERIFY(Agent.ProxyId); const bool sent = TActivationContext::Send(Event->Forward(Agent.ProxyId)); @@ -33,20 +31,20 @@ namespace NKikimr::NBlobDepot { return; } - Response = std::make_unique<TEvBlobStorage::TEvGetResult>(NKikimrProto::OK, msg.QuerySize, + Response = std::make_unique<TEvBlobStorage::TEvGetResult>(NKikimrProto::OK, Request.QuerySize, Agent.VirtualGroupId); - AnswersRemain = msg.QuerySize; + AnswersRemain = Request.QuerySize; - if (msg.ReaderTabletData) { - auto status = Agent.BlocksManager.CheckBlockForTablet(msg.ReaderTabletData->Id, msg.ReaderTabletData->Generation, this, nullptr); + if (Request.ReaderTabletData) { + auto status = Agent.BlocksManager.CheckBlockForTablet(Request.ReaderTabletData->Id, Request.ReaderTabletData->Generation, this, nullptr); if (status == NKikimrProto::BLOCKED) { EndWithError(status, "Fail TEvGet due to BLOCKED tablet generation"); return; } } - for (ui32 i = 0; i < msg.QuerySize; ++i) { - auto& query = msg.Queries[i]; + for (ui32 i = 0; i < Request.QuerySize; ++i) { + auto& query = Request.Queries[i]; auto& response = Response->Responses[i]; response.Id = query.Id; @@ -64,26 +62,25 @@ namespace NKikimr::NBlobDepot { } bool ProcessSingleResult(ui32 queryIdx, const TResolvedValueChain *value) { - auto& msg = GetQuery(); STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA27, "ProcessSingleResult", (VirtualGroupId, Agent.VirtualGroupId), (QueryId, GetQueryId()), (QueryIdx, queryIdx), (Value, value)); if (!value) { Response->Responses[queryIdx].Status = NKikimrProto::NODATA; --AnswersRemain; - } else if (msg.IsIndexOnly) { + } else if (Request.IsIndexOnly) { Response->Responses[queryIdx].Status = NKikimrProto::OK; --AnswersRemain; } else if (value) { TReadArg arg{ *value, - msg.GetHandleClass, - msg.MustRestoreFirst, + Request.GetHandleClass, + Request.MustRestoreFirst, this, - msg.Queries[queryIdx].Shift, - msg.Queries[queryIdx].Size, + Request.Queries[queryIdx].Shift, + Request.Queries[queryIdx].Size, queryIdx, - msg.ReaderTabletData}; + Request.ReaderTabletData}; TString error; const bool success = Agent.IssueRead(arg, error); if (!success) { @@ -126,8 +123,16 @@ namespace NKikimr::NBlobDepot { } } - TEvBlobStorage::TEvGet& GetQuery() const { - return *Event->Get<TEvBlobStorage::TEvGet>(); + ui64 GetTabletId() const override { + ui64 value = 0; + for (ui32 i = 0; i < Request.QuerySize; ++i) { + auto& req = Request.Queries[i]; + if (value && value != req.Id.TabletID()) { + return 0; + } + value = req.Id.TabletID(); + } + return value; } }; diff --git a/ydb/core/blob_depot/agent/storage_patch.cpp b/ydb/core/blob_depot/agent/storage_patch.cpp index bc6cdfcf911..b25443d501a 100644 --- a/ydb/core/blob_depot/agent/storage_patch.cpp +++ b/ydb/core/blob_depot/agent/storage_patch.cpp @@ -4,9 +4,9 @@ namespace NKikimr::NBlobDepot { template<> TBlobDepotAgent::TQuery *TBlobDepotAgent::CreateQuery<TEvBlobStorage::EvPatch>(std::unique_ptr<IEventHandle> ev) { - class TPatchQuery : public TQuery { + class TPatchQuery : public TBlobStorageQuery<TEvBlobStorage::TEvPatch> { public: - using TQuery::TQuery; + using TBlobStorageQuery::TBlobStorageQuery; void Initiate() override { EndWithError(NKikimrProto::ERROR, "not implemented"); diff --git a/ydb/core/blob_depot/agent/storage_put.cpp b/ydb/core/blob_depot/agent/storage_put.cpp index a97a38b59a7..c47fd238fac 100644 --- a/ydb/core/blob_depot/agent/storage_put.cpp +++ b/ydb/core/blob_depot/agent/storage_put.cpp @@ -5,7 +5,7 @@ namespace NKikimr::NBlobDepot { template<> TBlobDepotAgent::TQuery *TBlobDepotAgent::CreateQuery<TEvBlobStorage::EvPut>(std::unique_ptr<IEventHandle> ev) { - class TPutQuery : public TQuery { + class TPutQuery : public TBlobStorageQuery<TEvBlobStorage::TEvPut> { const bool SuppressFooter = true; const bool IssueUncertainWrites = true; @@ -18,7 +18,7 @@ namespace NKikimr::NBlobDepot { TBlobSeqId BlobSeqId; public: - using TQuery::TQuery; + using TBlobStorageQuery::TBlobStorageQuery; void OnDestroy(bool success) override { if (IsInFlight) { @@ -31,29 +31,27 @@ namespace NKikimr::NBlobDepot { } void Initiate() override { - auto& msg = GetQuery(); - if (msg.Buffer.size() > MaxBlobSize) { + if (Request.Buffer.size() > MaxBlobSize) { return EndWithError(NKikimrProto::ERROR, "blob is way too big"); - } else if (msg.Buffer.size() != msg.Id.BlobSize()) { + } else if (Request.Buffer.size() != Request.Id.BlobSize()) { return EndWithError(NKikimrProto::ERROR, "blob size mismatch"); - } else if (!msg.Buffer) { + } else if (!Request.Buffer) { return EndWithError(NKikimrProto::ERROR, "no blob data"); - } else if (!msg.Id) { + } else if (!Request.Id) { return EndWithError(NKikimrProto::ERROR, "blob id is zero"); } - BlockChecksRemain.resize(1 + msg.ExtraBlockChecks.size(), 3); // set number of tries for every block + BlockChecksRemain.resize(1 + Request.ExtraBlockChecks.size(), 3); // set number of tries for every block CheckBlocks(); } void CheckBlocks() { - auto& msg = GetQuery(); bool someBlocksMissing = false; - for (size_t i = 0; i <= msg.ExtraBlockChecks.size(); ++i) { - const auto *blkp = i ? &msg.ExtraBlockChecks[i - 1] : nullptr; - const ui64 tabletId = blkp ? blkp->first : msg.Id.TabletID(); - const ui32 generation = blkp ? blkp->second : msg.Id.Generation(); - const auto status = msg.Decommission + for (size_t i = 0; i <= Request.ExtraBlockChecks.size(); ++i) { + const auto *blkp = i ? &Request.ExtraBlockChecks[i - 1] : nullptr; + const ui64 tabletId = blkp ? blkp->first : Request.Id.TabletID(); + const ui32 generation = blkp ? blkp->second : Request.Id.Generation(); + const auto status = Request.Decommission ? NKikimrProto::OK // suppress blocks check when copying blob from decommitted group : Agent.BlocksManager.CheckBlockForTablet(tabletId, generation, this, nullptr); if (status == NKikimrProto::OK) { @@ -74,8 +72,6 @@ namespace NKikimr::NBlobDepot { void IssuePuts() { Y_VERIFY(!PutsIssued); - auto& msg = GetQuery(); - const auto it = Agent.ChannelKinds.find(NKikimrBlobDepot::TChannelKind::Data); if (it == Agent.ChannelKinds.end()) { return EndWithError(NKikimrProto::ERROR, "no Data channels"); @@ -84,7 +80,7 @@ namespace NKikimr::NBlobDepot { std::optional<TBlobSeqId> blobSeqId = kind.Allocate(Agent); STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA21, "allocated BlobSeqId", (VirtualGroupId, Agent.VirtualGroupId), - (QueryId, GetQueryId()), (BlobSeqId, blobSeqId), (BlobId, msg.Id)); + (QueryId, GetQueryId()), (BlobSeqId, blobSeqId), (BlobId, Request.Id)); if (!blobSeqId) { return kind.EnqueueQueryWaitingForId(this); } @@ -98,11 +94,11 @@ namespace NKikimr::NBlobDepot { Y_VERIFY(CommitBlobSeq.ItemsSize() == 0); auto *commitItem = CommitBlobSeq.AddItems(); - commitItem->SetKey(msg.Id.AsBinaryString()); + commitItem->SetKey(Request.Id.AsBinaryString()); auto *locator = commitItem->MutableBlobLocator(); BlobSeqId.ToProto(locator->MutableBlobSeqId()); - //locator->SetChecksum(Crc32c(msg.Buffer.data(), msg.Buffer.size())); - locator->SetTotalDataLen(msg.Buffer.size()); + //locator->SetChecksum(Crc32c(Request.Buffer.data(), Request.Buffer.size())); + locator->SetTotalDataLen(Request.Buffer.size()); if (!SuppressFooter) { locator->SetFooterLen(sizeof(TVirtualGroupBlobFooter)); } @@ -112,17 +108,17 @@ namespace NKikimr::NBlobDepot { footerData = TContiguousData::Uninitialized(sizeof(TVirtualGroupBlobFooter)); auto& footer = *reinterpret_cast<TVirtualGroupBlobFooter*>(footerData.UnsafeGetDataMut()); memset(&footer, 0, sizeof(footer)); - footer.StoredBlobId = msg.Id; + footer.StoredBlobId = Request.Id; } auto put = [&](EBlobType type, TContiguousData&& buffer) { const auto& [id, groupId] = kind.MakeBlobId(Agent, BlobSeqId, type, 0, buffer.size()); Y_VERIFY(!locator->HasGroupId() || locator->GetGroupId() == groupId); locator->SetGroupId(groupId); - auto ev = std::make_unique<TEvBlobStorage::TEvPut>(id, std::move(buffer), msg.Deadline, msg.HandleClass, msg.Tactic); - ev->ExtraBlockChecks = msg.ExtraBlockChecks; - if (!msg.Decommission) { // do not check original blob against blocks when writing decommission copy - ev->ExtraBlockChecks.emplace_back(msg.Id.TabletID(), msg.Id.Generation()); + auto ev = std::make_unique<TEvBlobStorage::TEvPut>(id, std::move(buffer), Request.Deadline, Request.HandleClass, Request.Tactic); + ev->ExtraBlockChecks = Request.ExtraBlockChecks; + if (!Request.Decommission) { // do not check original blob against blocks when writing decommission copy + ev->ExtraBlockChecks.emplace_back(Request.Id.TabletID(), Request.Id.Generation()); } Agent.SendToProxy(groupId, std::move(ev), this, nullptr); ++PutsInFlight; @@ -130,16 +126,16 @@ namespace NKikimr::NBlobDepot { if (SuppressFooter) { // write the blob as is, we don't need footer for this kind - put(EBlobType::VG_DATA_BLOB, TContiguousData(std::move(msg.Buffer))); - } else if (msg.Buffer.size() + sizeof(TVirtualGroupBlobFooter) <= MaxBlobSize) { + put(EBlobType::VG_DATA_BLOB, TContiguousData(std::move(Request.Buffer))); + } else if (Request.Buffer.size() + sizeof(TVirtualGroupBlobFooter) <= MaxBlobSize) { // write single blob with footer - TRope buffer = TRope(std::move(msg.Buffer)); + TRope buffer = TRope(std::move(Request.Buffer)); buffer.Insert(buffer.End(), std::move(footerData)); buffer.Compact(); put(EBlobType::VG_COMPOSITE_BLOB, TContiguousData(std::move(buffer))); } else { // write data blob and blob with footer - put(EBlobType::VG_DATA_BLOB, TContiguousData(std::move(msg.Buffer))); + put(EBlobType::VG_DATA_BLOB, TContiguousData(std::move(Request.Buffer))); put(EBlobType::VG_FOOTER_BLOB, TContiguousData(std::move(footerData))); } @@ -252,13 +248,12 @@ namespace NKikimr::NBlobDepot { IssueCommitBlobSeq(false); } - auto& msg = GetQuery(); - TQuery::EndWithSuccess(std::make_unique<TEvBlobStorage::TEvPutResult>(NKikimrProto::OK, msg.Id, + TQuery::EndWithSuccess(std::make_unique<TEvBlobStorage::TEvPutResult>(NKikimrProto::OK, Request.Id, Agent.GetStorageStatusFlags(), Agent.VirtualGroupId, Agent.GetApproximateFreeSpaceShare())); } - TEvBlobStorage::TEvPut& GetQuery() const { - return *Event->Get<TEvBlobStorage::TEvPut>(); + ui64 GetTabletId() const override { + return Request.Id.TabletID(); } }; diff --git a/ydb/core/blob_depot/agent/storage_range.cpp b/ydb/core/blob_depot/agent/storage_range.cpp index 20a8141b4d0..3869754d50b 100644 --- a/ydb/core/blob_depot/agent/storage_range.cpp +++ b/ydb/core/blob_depot/agent/storage_range.cpp @@ -4,7 +4,7 @@ namespace NKikimr::NBlobDepot { template<> TBlobDepotAgent::TQuery *TBlobDepotAgent::CreateQuery<TEvBlobStorage::EvRange>(std::unique_ptr<IEventHandle> ev) { - class TRangeQuery : public TQuery { + class TRangeQuery : public TBlobStorageQuery<TEvBlobStorage::TEvRange> { std::unique_ptr<TEvBlobStorage::TEvRangeResult> Response; ui32 ReadsInFlight = 0; ui32 ResolvesInFlight = 0; @@ -18,33 +18,29 @@ namespace NKikimr::NBlobDepot { }; public: - using TQuery::TQuery; + using TBlobStorageQuery::TBlobStorageQuery; void Initiate() override { - auto& msg = GetQuery(); - - if (msg.Decommission) { + if (Request.Decommission) { Y_VERIFY(Agent.ProxyId); STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA26, "forwarding TEvRange", (VirtualGroupId, Agent.VirtualGroupId), - (TabletId, Agent.TabletId), (Msg, msg), (ProxyId, Agent.ProxyId)); + (TabletId, Agent.TabletId), (Msg, Request), (ProxyId, Agent.ProxyId)); const bool sent = TActivationContext::Send(Event->Forward(Agent.ProxyId)); Y_VERIFY(sent); delete this; return; } - Response = std::make_unique<TEvBlobStorage::TEvRangeResult>(NKikimrProto::OK, msg.From, msg.To, + Response = std::make_unique<TEvBlobStorage::TEvRangeResult>(NKikimrProto::OK, Request.From, Request.To, Agent.VirtualGroupId); IssueResolve(); } void IssueResolve() { - auto& msg = GetQuery(); - - TString from = msg.From.AsBinaryString(); - TString to = msg.To.AsBinaryString(); - const bool reverse = msg.To < msg.From; + TString from = Request.From.AsBinaryString(); + TString to = Request.To.AsBinaryString(); + const bool reverse = Request.To < Request.From; if (reverse) { std::swap(from, to); } @@ -57,20 +53,19 @@ namespace NKikimr::NBlobDepot { range->SetEndingKey(to); range->SetIncludeEnding(true); range->SetReverse(reverse); - item->SetTabletId(msg.TabletId); - item->SetMustRestoreFirst(msg.MustRestoreFirst); + item->SetTabletId(Request.TabletId); + item->SetMustRestoreFirst(Request.MustRestoreFirst); Agent.Issue(std::move(resolve), this, nullptr); ++ResolvesInFlight; } void IssueResolve(TLogoBlobID id, size_t index) { - auto& msg = GetQuery(); NKikimrBlobDepot::TEvResolve resolve; auto *item = resolve.AddItems(); item->SetExactKey(id.AsBinaryString()); - item->SetTabletId(msg.TabletId); - item->SetMustRestoreFirst(msg.MustRestoreFirst); + item->SetTabletId(Request.TabletId); + item->SetMustRestoreFirst(Request.MustRestoreFirst); Agent.Issue(std::move(resolve), this, std::make_shared<TExtraResolveContext>(index)); ++ResolvesInFlight; @@ -89,8 +84,6 @@ namespace NKikimr::NBlobDepot { } void HandleResolveResult(ui64 id, TRequestContext::TPtr context, NKikimrBlobDepot::TEvResolveResult& msg) { - auto& query = GetQuery(); - --ResolvesInFlight; if (msg.GetStatus() != NKikimrProto::OK && msg.GetStatus() != NKikimrProto::OVERRUN) { @@ -108,11 +101,11 @@ namespace NKikimr::NBlobDepot { Response->Responses.emplace_back(id, TString()); } - if (!query.IsIndexOnly) { + if (!Request.IsIndexOnly) { TReadArg arg{ key.GetValueChain(), NKikimrBlobStorage::EGetHandleClass::FastRead, - query.MustRestoreFirst, + Request.MustRestoreFirst, this, 0, 0, @@ -124,7 +117,7 @@ namespace NKikimr::NBlobDepot { << error); } ++ReadsInFlight; - } else if (query.MustRestoreFirst) { + } else if (Request.MustRestoreFirst) { Y_FAIL("not implemented yet"); } } @@ -163,8 +156,8 @@ namespace NKikimr::NBlobDepot { } } - TEvBlobStorage::TEvRange& GetQuery() const { - return *Event->Get<TEvBlobStorage::TEvRange>(); + ui64 GetTabletId() const override { + return Request.TabletId; } }; diff --git a/ydb/core/blob_depot/agent/storage_status.cpp b/ydb/core/blob_depot/agent/storage_status.cpp index 8b12ee0e889..8e52166a9ef 100644 --- a/ydb/core/blob_depot/agent/storage_status.cpp +++ b/ydb/core/blob_depot/agent/storage_status.cpp @@ -4,9 +4,9 @@ namespace NKikimr::NBlobDepot { template<> TBlobDepotAgent::TQuery *TBlobDepotAgent::CreateQuery<TEvBlobStorage::EvStatus>(std::unique_ptr<IEventHandle> ev) { - class TStatusQuery : public TQuery { + class TStatusQuery : public TBlobStorageQuery<TEvBlobStorage::TEvStatus> { public: - using TQuery::TQuery; + using TBlobStorageQuery::TBlobStorageQuery; void Initiate() override { EndWithSuccess(std::make_unique<TEvBlobStorage::TEvStatusResult>(NKikimrProto::OK, |