diff options
author | mregrock <mregrock@ydb.tech> | 2024-12-11 15:32:21 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-12-11 15:32:21 +0300 |
commit | 4ca0cb63a3c717a033e7153b55192eebcf97eb31 (patch) | |
tree | 2145671501b5b5c5fb45ea30b9cbc0e0ca8d122a | |
parent | 410cfb65f31f95776ceba2c28a6a9df5aa7188b3 (diff) | |
download | ydb-4ca0cb63a3c717a033e7153b55192eebcf97eb31.tar.gz |
Add GetBlock request (#12431)
Тесты уже замьючены
29 files changed, 379 insertions, 1 deletions
diff --git a/ydb/core/base/blobstorage.cpp b/ydb/core/base/blobstorage.cpp index b891d7777d1..765dd34d351 100644 --- a/ydb/core/base/blobstorage.cpp +++ b/ydb/core/base/blobstorage.cpp @@ -118,6 +118,18 @@ std::unique_ptr<TEvBlobStorage::TEvBlockResult> TEvBlobStorage::TEvBlock::MakeEr return res; } +void TEvBlobStorage::TEvGetBlock::ToSpan(NWilson::TSpan& span) const { + span + .Attribute("TabletId", ::ToString(TabletId)); +} + +std::unique_ptr<TEvBlobStorage::TEvGetBlockResult> TEvBlobStorage::TEvGetBlock::MakeErrorResponse( + NKikimrProto::EReplyStatus status, const TString& errorReason, TGroupId /*groupId*/) { + auto res = std::make_unique<TEvGetBlockResult>(status, TabletId, 0); + res->ErrorReason = errorReason; + return res; +} + void TEvBlobStorage::TEvPatch::ToSpan(NWilson::TSpan& span) const { span .Attribute("OriginalGroupId", OriginalGroupId) diff --git a/ydb/core/base/blobstorage.h b/ydb/core/base/blobstorage.h index fb48e5185b6..f473d118eb9 100644 --- a/ydb/core/base/blobstorage.h +++ b/ydb/core/base/blobstorage.h @@ -493,6 +493,7 @@ struct TEvBlobStorage { EvAssimilate, EvGetQueuesInfo, // for debugging purposes + EvGetBlock, // EvPutResult = EvPut + 512, /// 268 632 576 @@ -509,6 +510,7 @@ struct TEvBlobStorage { EvAssimilateResult, EvQueuesInfo, // for debugging purposes + EvGetBlockResult, // proxy <-> vdisk interface EvVPut = EvPut + 2 * 512, /// 268 633 088 @@ -915,6 +917,7 @@ struct TEvBlobStorage { struct TEvPutResult; struct TEvGetResult; + struct TEvGetBlockResult; struct TEvBlockResult; struct TEvDiscoverResult; struct TEvRangeResult; @@ -1330,6 +1333,68 @@ struct TEvBlobStorage { } }; + struct TEvGetBlock : public TEventLocal<TEvGetBlock, EvGetBlock> { + const ui64 TabletId; + const TInstant Deadline; + ui32 RestartCounter = 0; + std::shared_ptr<TExecutionRelay> ExecutionRelay; + + TEvGetBlock(ui64 tabletId, TInstant deadline) + : TabletId(tabletId) + , Deadline(deadline) + {} + + TString Print(bool /*isFull*/) const { + TStringStream str; + str << "TEvGetBlock {TabletId# " << TabletId + << " Deadline# " << Deadline + << "}"; + return str.Str(); + } + + TString ToString() const { + return Print(false); + } + + ui32 CalculateSize() const { + return sizeof(*this); + } + + void ToSpan(NWilson::TSpan& span) const; + + std::unique_ptr<TEvGetBlockResult> MakeErrorResponse(NKikimrProto::EReplyStatus status, const TString& errorReason, + TGroupId groupId); + }; + + struct TEvGetBlockResult : public TEventLocal<TEvGetBlockResult, EvGetBlockResult> { + NKikimrProto::EReplyStatus Status; + ui64 TabletId; + ui32 BlockedGeneration; + TString ErrorReason; + std::shared_ptr<TExecutionRelay> ExecutionRelay; + + TEvGetBlockResult(NKikimrProto::EReplyStatus status, ui64 tabletId, ui32 blockedGeneration) + : Status(status) + , TabletId(tabletId) + , BlockedGeneration(blockedGeneration) + {} + + TString Print(bool /*isFull*/) const { + TStringStream str; + str << "TEvGetBlockResult {Status# " << NKikimrProto::EReplyStatus_Name(Status).data(); + str << " TabletId# " << TabletId << " BlockedGeneration# " << BlockedGeneration; + if (ErrorReason.size()) { + str << " ErrorReason# \"" << ErrorReason << "\""; + } + str << "}"; + return str.Str(); + } + + TString ToString() const { + return Print(false); + } + }; + struct TEvBlock : public TEventLocal<TEvBlock, EvBlock> { const ui64 TabletId; const ui32 Generation; diff --git a/ydb/core/blob_depot/agent/agent_impl.h b/ydb/core/blob_depot/agent/agent_impl.h index dc267c84c19..2add1b6637c 100644 --- a/ydb/core/blob_depot/agent/agent_impl.h +++ b/ydb/core/blob_depot/agent/agent_impl.h @@ -11,6 +11,7 @@ namespace NKikimr::NBlobDepot { XX(EvPut) \ XX(EvGet) \ XX(EvBlock) \ + XX(EvGetBlock) \ XX(EvDiscover) \ XX(EvRange) \ XX(EvCollectGarbage) \ @@ -124,6 +125,7 @@ namespace NKikimr::NBlobDepot { TEvBlobDepot::TEvRegisterAgentResult*, TEvBlobDepot::TEvAllocateIdsResult*, TEvBlobDepot::TEvBlockResult*, + TEvBlobDepot::TEvGetBlockResult*, TEvBlobDepot::TEvQueryBlocksResult*, TEvBlobDepot::TEvCollectGarbageResult*, TEvBlobDepot::TEvCommitBlobSeqResult*, @@ -228,6 +230,7 @@ namespace NKikimr::NBlobDepot { hFunc(TEvBlobDepot::TEvRegisterAgentResult, HandleTabletResponse); hFunc(TEvBlobDepot::TEvAllocateIdsResult, HandleTabletResponse); hFunc(TEvBlobDepot::TEvBlockResult, HandleTabletResponse); + hFunc(TEvBlobDepot::TEvGetBlockResult, HandleTabletResponse); hFunc(TEvBlobDepot::TEvQueryBlocksResult, HandleTabletResponse); hFunc(TEvBlobDepot::TEvCollectGarbageResult, HandleTabletResponse); hFunc(TEvBlobDepot::TEvCommitBlobSeqResult, HandleTabletResponse); diff --git a/ydb/core/blob_depot/agent/comm.cpp b/ydb/core/blob_depot/agent/comm.cpp index 1b92ee886ad..193d537fea2 100644 --- a/ydb/core/blob_depot/agent/comm.cpp +++ b/ydb/core/blob_depot/agent/comm.cpp @@ -168,6 +168,7 @@ namespace NKikimr::NBlobDepot { template ui64 TBlobDepotAgent::Issue(NKikimrBlobDepot::TEvCollectGarbage msg, TRequestSender *sender, TRequestContext::TPtr context); template ui64 TBlobDepotAgent::Issue(NKikimrBlobDepot::TEvQueryBlocks msg, TRequestSender *sender, TRequestContext::TPtr context); template ui64 TBlobDepotAgent::Issue(NKikimrBlobDepot::TEvBlock msg, TRequestSender *sender, TRequestContext::TPtr context); + template ui64 TBlobDepotAgent::Issue(NKikimrBlobDepot::TEvGetBlock msg, TRequestSender *sender, TRequestContext::TPtr context); template ui64 TBlobDepotAgent::Issue(NKikimrBlobDepot::TEvResolve msg, TRequestSender *sender, TRequestContext::TPtr context); template ui64 TBlobDepotAgent::Issue(NKikimrBlobDepot::TEvCommitBlobSeq msg, TRequestSender *sender, TRequestContext::TPtr context); template ui64 TBlobDepotAgent::Issue(NKikimrBlobDepot::TEvDiscardSpoiledBlobSeq msg, TRequestSender *sender, TRequestContext::TPtr context); diff --git a/ydb/core/blob_depot/agent/request.cpp b/ydb/core/blob_depot/agent/request.cpp index 6527bf6f7e2..087ec669bf7 100644 --- a/ydb/core/blob_depot/agent/request.cpp +++ b/ydb/core/blob_depot/agent/request.cpp @@ -93,6 +93,7 @@ namespace NKikimr::NBlobDepot { template void TBlobDepotAgent::HandleTabletResponse(TEvBlobDepot::TEvRegisterAgentResult::TPtr ev); template void TBlobDepotAgent::HandleTabletResponse(TEvBlobDepot::TEvAllocateIdsResult::TPtr ev); template void TBlobDepotAgent::HandleTabletResponse(TEvBlobDepot::TEvBlockResult::TPtr ev); + template void TBlobDepotAgent::HandleTabletResponse(TEvBlobDepot::TEvGetBlockResult::TPtr ev); template void TBlobDepotAgent::HandleTabletResponse(TEvBlobDepot::TEvQueryBlocksResult::TPtr ev); template void TBlobDepotAgent::HandleTabletResponse(TEvBlobDepot::TEvCollectGarbageResult::TPtr ev); template void TBlobDepotAgent::HandleTabletResponse(TEvBlobDepot::TEvCommitBlobSeqResult::TPtr ev); diff --git a/ydb/core/blob_depot/agent/storage_get_block.cpp b/ydb/core/blob_depot/agent/storage_get_block.cpp new file mode 100644 index 00000000000..31e39bd6f00 --- /dev/null +++ b/ydb/core/blob_depot/agent/storage_get_block.cpp @@ -0,0 +1,23 @@ +#include "agent_impl.h" +#include "blocks.h" + +namespace NKikimr::NBlobDepot { + + template<> + TBlobDepotAgent::TQuery *TBlobDepotAgent::CreateQuery<TEvBlobStorage::EvGetBlock>(std::unique_ptr<IEventHandle> ev) { + class TGetBlockQuery : public TBlobStorageQuery<TEvBlobStorage::TEvGetBlock> { + public: + using TBlobStorageQuery::TBlobStorageQuery; + + void Initiate() override { + // TODO: implement GetBlock logic for BlobDepot + } + + void ProcessResponse(ui64 /*id*/, TRequestContext::TPtr context, TResponse response) override { + // TODO: implement GetBlock logic for BlobDepot + } + }; + return new TGetBlockQuery(*this, std::move(ev)); + } + +} // NKikimr::NBlobDepot diff --git a/ydb/core/blob_depot/agent/ya.make b/ydb/core/blob_depot/agent/ya.make index e09528669db..82195569e12 100644 --- a/ydb/core/blob_depot/agent/ya.make +++ b/ydb/core/blob_depot/agent/ya.make @@ -24,6 +24,7 @@ LIBRARY() # DS Proxy queries storage_put.cpp storage_get.cpp + storage_get_block.cpp storage_block.cpp storage_discover.cpp storage_range.cpp diff --git a/ydb/core/blob_depot/blob_depot.cpp b/ydb/core/blob_depot/blob_depot.cpp index 19cc2596242..0cf46be7154 100644 --- a/ydb/core/blob_depot/blob_depot.cpp +++ b/ydb/core/blob_depot/blob_depot.cpp @@ -36,6 +36,7 @@ namespace NKikimr::NBlobDepot { hFunc(TEvBlobDepot::TEvDiscardSpoiledBlobSeq, Handle); hFunc(TEvBlobDepot::TEvResolve, Data->Handle); hFunc(TEvBlobDepot::TEvBlock, BlocksManager->Handle); + hFunc(TEvBlobDepot::TEvGetBlock, BlocksManager->Handle); hFunc(TEvBlobDepot::TEvQueryBlocks, BlocksManager->Handle); hFunc(TEvBlobDepot::TEvCollectGarbage, BarrierServer->Handle); hFunc(TEvBlobDepot::TEvPushNotifyResult, Handle); @@ -107,6 +108,7 @@ namespace NKikimr::NBlobDepot { fFunc(TEvBlobDepot::EvDiscardSpoiledBlobSeq, handleFromAgentPipe); fFunc(TEvBlobDepot::EvResolve, handleFromAgentPipe); fFunc(TEvBlobDepot::EvBlock, handleFromAgentPipe); + fFunc(TEvBlobDepot::EvGetBlock, handleFromAgentPipe); fFunc(TEvBlobDepot::EvQueryBlocks, handleFromAgentPipe); fFunc(TEvBlobDepot::EvPushNotifyResult, handleFromAgentPipe); fFunc(TEvBlobDepot::EvCollectGarbage, handleFromAgentPipe); diff --git a/ydb/core/blob_depot/blocks.cpp b/ydb/core/blob_depot/blocks.cpp index bc6172c0bbe..6dc49379f7f 100644 --- a/ydb/core/blob_depot/blocks.cpp +++ b/ydb/core/blob_depot/blocks.cpp @@ -313,6 +313,10 @@ namespace NKikimr::NBlobDepot { TActivationContext::Send(response.release()); // not sent if the request got processed and response now is nullptr } + void TBlobDepot::TBlocksManager::Handle(TEvBlobDepot::TEvGetBlock::TPtr ev) { + // TODO: implement GetBlock logic for BlobDepot + } + void TBlobDepot::TBlocksManager::Handle(TEvBlobDepot::TEvQueryBlocks::TPtr ev) { TAgent& agent = Self->GetAgent(ev->Recipient); const ui32 agentId = agent.Connection->NodeId; diff --git a/ydb/core/blob_depot/blocks.h b/ydb/core/blob_depot/blocks.h index decb038d0e2..2bb7c926464 100644 --- a/ydb/core/blob_depot/blocks.h +++ b/ydb/core/blob_depot/blocks.h @@ -44,6 +44,7 @@ namespace NKikimr::NBlobDepot { void OnBlockCommitted(ui64 tabletId, ui32 blockedGeneration, ui32 nodeId, ui64 issuerGuid, std::unique_ptr<IEventHandle> response); void Handle(TEvBlobDepot::TEvBlock::TPtr ev); + void Handle(TEvBlobDepot::TEvGetBlock::TPtr ev); void Handle(TEvBlobDepot::TEvQueryBlocks::TPtr ev); bool CheckBlock(ui64 tabletId, ui32 generation) const; diff --git a/ydb/core/blob_depot/events.h b/ydb/core/blob_depot/events.h index aa001274f42..290372d9a98 100644 --- a/ydb/core/blob_depot/events.h +++ b/ydb/core/blob_depot/events.h @@ -17,7 +17,9 @@ namespace NKikimr { EvPushNotify, EvPushNotifyResult, EvBlock, + EvGetBlock, EvBlockResult, + EvGetBlockResult, EvQueryBlocks, EvQueryBlocksResult, EvCollectGarbage, @@ -63,6 +65,8 @@ namespace NKikimr { BLOBDEPOT_EVENT_PB_NO_ARGS(EvPushNotifyResult); BLOBDEPOT_EVENT_PB(EvBlock, TabletId, BlockedGeneration, IssuerGuid); BLOBDEPOT_EVENT_PB(EvBlockResult, Status, ErrorReason, TimeToLiveMs); + BLOBDEPOT_EVENT_PB(EvGetBlock, TabletId); + BLOBDEPOT_EVENT_PB(EvGetBlockResult, Status, ErrorReason, TabletId, BlockedGeneration); BLOBDEPOT_EVENT_PB_NO_ARGS(EvQueryBlocks); BLOBDEPOT_EVENT_PB_NO_ARGS(EvQueryBlocksResult); BLOBDEPOT_EVENT_PB_NO_ARGS(EvCollectGarbage); @@ -81,6 +85,7 @@ namespace NKikimr { template<> struct TResponseFor<TEvRegisterAgent> { using Type = TEvRegisterAgentResult; }; template<> struct TResponseFor<TEvAllocateIds> { using Type = TEvAllocateIdsResult; }; template<> struct TResponseFor<TEvBlock> { using Type = TEvBlockResult; }; + template<> struct TResponseFor<TEvGetBlock> { using Type = TEvGetBlockResult; }; template<> struct TResponseFor<TEvQueryBlocks> { using Type = TEvQueryBlocksResult; }; template<> struct TResponseFor<TEvCollectGarbage> { using Type = TEvCollectGarbageResult; }; template<> struct TResponseFor<TEvCommitBlobSeq> { using Type = TEvCommitBlobSeqResult; }; @@ -103,6 +108,7 @@ namespace NKikimr { template<> struct TEventFor<NKikimrBlobDepot::TEvCollectGarbage> { using Type = TEvCollectGarbage; }; template<> struct TEventFor<NKikimrBlobDepot::TEvQueryBlocks> { using Type = TEvQueryBlocks; }; template<> struct TEventFor<NKikimrBlobDepot::TEvBlock> { using Type = TEvBlock; }; + template<> struct TEventFor<NKikimrBlobDepot::TEvGetBlock> { using Type = TEvGetBlock; }; template<> struct TEventFor<NKikimrBlobDepot::TEvResolve> { using Type = TEvResolve; }; template<> struct TEventFor<NKikimrBlobDepot::TEvCommitBlobSeq> { using Type = TEvCommitBlobSeq; }; template<> struct TEventFor<NKikimrBlobDepot::TEvDiscardSpoiledBlobSeq> { using Type = TEvDiscardSpoiledBlobSeq; }; diff --git a/ydb/core/blobstorage/dsproxy/dsproxy.h b/ydb/core/blobstorage/dsproxy/dsproxy.h index e0ae85c22da..0b3479eb855 100644 --- a/ydb/core/blobstorage/dsproxy/dsproxy.h +++ b/ydb/core/blobstorage/dsproxy/dsproxy.h @@ -143,6 +143,7 @@ NActors::NLog::EPriority PriorityForStatusInbound(NKikimrProto::EReplyStatus sta XX(TEvBlobStorage::TEvPut) \ XX(TEvBlobStorage::TEvGet) \ XX(TEvBlobStorage::TEvBlock) \ + XX(TEvBlobStorage::TEvGetBlock) \ XX(TEvBlobStorage::TEvDiscover) \ XX(TEvBlobStorage::TEvRange) \ XX(TEvBlobStorage::TEvCollectGarbage) \ @@ -486,6 +487,16 @@ struct TBlobStorageGroupBlockParameters { }; IActor* CreateBlobStorageGroupBlockRequest(TBlobStorageGroupBlockParameters params); +struct TBlobStorageGroupGetBlockParameters { + TBlobStorageGroupRequestActor::TCommonParameters<TEvBlobStorage::TEvGetBlock> Common; + TBlobStorageGroupRequestActor::TTypeSpecificParameters TypeSpecific = { + .LogComponent = NKikimrServices::BS_PROXY_GETBLOCK, + .Name = "DSProxy.GetBlock", + .Activity = NKikimrServices::TActivity::BS_GROUP_GETBLOCK, + }; +}; +IActor* CreateBlobStorageGroupGetBlockRequest(TBlobStorageGroupGetBlockParameters params); + struct TBlobStorageGroupStatusParameters { TBlobStorageGroupRequestActor::TCommonParameters<TEvBlobStorage::TEvStatus> Common; TBlobStorageGroupRequestActor::TTypeSpecificParameters TypeSpecific = { diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_get_block.cpp b/ydb/core/blobstorage/dsproxy/dsproxy_get_block.cpp new file mode 100644 index 00000000000..d04f67f74bc --- /dev/null +++ b/ydb/core/blobstorage/dsproxy/dsproxy_get_block.cpp @@ -0,0 +1,128 @@ +#include "dsproxy.h" +#include "dsproxy_mon.h" +#include "dsproxy_quorum_tracker.h" +#include <ydb/core/blobstorage/base/blobstorage_events.h> +#include <ydb/core/blobstorage/vdisk/common/vdisk_events.h> + +namespace NKikimr { +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// +// GET BLOCK request +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + +class TBlobStorageGroupGetBlockRequest : public TBlobStorageGroupRequestActor { + const ui64 TabletId; + ui64 Generation; + const TInstant Deadline; + ui64 Requests = 0; + ui64 Responses = 0; + TGroupQuorumTracker QuorumTracker; + + void Handle(TEvBlobStorage::TEvVGetBlockResult::TPtr &ev) { + ProcessReplyFromQueue(ev->Get()); + const NKikimrBlobStorage::TEvVGetBlockResult &record = ev->Get()->Record; + Y_ABORT_UNLESS(record.HasStatus()); + NKikimrProto::EReplyStatus status = record.GetStatus(); + Y_ABORT_UNLESS(record.HasVDiskID()); + const TVDiskID vdisk = VDiskIDFromVDiskID(record.GetVDiskID()); + + DSP_LOG_LOG_S(PriorityForStatusInbound(status), "DSPGB01", "Handle TEvVGetBlockResult" + << " status# " << NKikimrProto::EReplyStatus_Name(status) + << " From# " << vdisk.ToString() + << " NodeId# " << Info->GetActorId(vdisk).NodeId()); + + if (record.HasGeneration()) { + Generation = Max<ui32>(Generation, record.GetGeneration()); + } + if (status == NKikimrProto::NODATA) { + status = NKikimrProto::OK; // assume OK for quorum tracker + } + ++Responses; + + switch (const NKikimrProto::EReplyStatus overallStatus = QuorumTracker.ProcessReply(vdisk, status)) { + case NKikimrProto::OK: + if (Responses == Requests) { + ReplyAndDie(NKikimrProto::OK); + } + break; + + case NKikimrProto::ERROR: + ReplyAndDie(NKikimrProto::ERROR); + break; + + default: + break; + } + } + + void ReplyAndDie(NKikimrProto::EReplyStatus status) override { + auto result = std::make_unique<TEvBlobStorage::TEvGetBlockResult>(status, TabletId, Generation); + result->ErrorReason = ErrorReason; + DSP_LOG_DEBUG_S("DSPGB02", "ReplyAndDie Result# " << result->Print(false)); + SendResponseAndDie(std::move(result)); + } + + void SendGetBlockRequest(const TVDiskID& vdiskId) { + DSP_LOG_DEBUG_S("DSPB03", "Sending TEvVBlock Tablet# " << TabletId + << " Generation# " << Generation + << " vdiskId# " << vdiskId + << " node# " << Info->GetActorId(vdiskId).NodeId()); + + auto msg = std::make_unique<TEvBlobStorage::TEvVGetBlock>(TabletId, vdiskId, Deadline); + SendToQueue(std::move(msg), 0); + } + + std::unique_ptr<IEventBase> RestartQuery(ui32 counter) override { + ++*Mon->NodeMon->RestartGetBlock; + auto ev = std::make_unique<TEvBlobStorage::TEvGetBlock>(TabletId, Deadline); + ev->RestartCounter = counter; + return ev; + } +public: + ::NMonitoring::TDynamicCounters::TCounterPtr& GetActiveCounter() const override { + return Mon->ActiveGetBlock; + } + + ERequestType GetRequestType() const override { + return ERequestType::GetBlock; + } + + TBlobStorageGroupGetBlockRequest(TBlobStorageGroupGetBlockParameters& params) + : TBlobStorageGroupRequestActor(params) + , TabletId(params.Common.Event->TabletId) + , Deadline(params.Common.Event->Deadline) + , QuorumTracker(Info.Get()) + {} + + void Bootstrap() override { + DSP_LOG_INFO_S("DSPGB04", "bootstrap" + << " ActorId# " << SelfId() + << " Group# " << Info->GroupID + << " Deadline# " << Deadline + << " RestartCounter# " << RestartCounter); + for (const auto& vdisk : Info->GetVDisks()) { + SendGetBlockRequest(Info->GetVDiskId(vdisk.OrderNumber)); + ++Requests; + } + + Become(&TBlobStorageGroupGetBlockRequest::StateWait); + + if (Requests == 0) { + ReplyAndDie(NKikimrProto::OK); + } + } + + STATEFN(StateWait) { + if (ProcessEvent(ev)) { + return; + } + switch (ev->GetTypeRewrite()) { + hFunc(TEvBlobStorage::TEvVGetBlockResult, Handle); + } + } +}; + +IActor* CreateBlobStorageGroupGetBlockRequest(TBlobStorageGroupGetBlockParameters params) { + return new TBlobStorageGroupGetBlockRequest(params); +} + +} // NKikimr diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_impl.h b/ydb/core/blobstorage/dsproxy/dsproxy_impl.h index 3d7fb25129e..c5cd9bf062c 100644 --- a/ydb/core/blobstorage/dsproxy/dsproxy_impl.h +++ b/ydb/core/blobstorage/dsproxy/dsproxy_impl.h @@ -139,6 +139,8 @@ class TBlobStorageGroupProxy : public TActorBootstrapped<TBlobStorageGroupProxy> Mon->EventGet->Inc(); } else if constexpr (std::is_same_v<TEvent, TEvBlobStorage::TEvBlock>) { Mon->EventBlock->Inc(); + } else if constexpr (std::is_same_v<TEvent, TEvBlobStorage::TEvGetBlock>) { + Mon->EventGetBlock->Inc(); } else if constexpr (std::is_same_v<TEvent, TEvBlobStorage::TEvDiscover>) { Mon->EventDiscover->Inc(); } else if constexpr (std::is_same_v<TEvent, TEvBlobStorage::TEvRange>) { @@ -258,6 +260,7 @@ class TBlobStorageGroupProxy : public TActorBootstrapped<TBlobStorageGroupProxy> void PushRequest(IActor *actor, TInstant deadline); void CheckDeadlines(); void HandleNormal(TEvBlobStorage::TEvGet::TPtr &ev); + void HandleNormal(TEvBlobStorage::TEvGetBlock::TPtr &ev); void HandleNormal(TEvBlobStorage::TEvPut::TPtr &ev); void HandleNormal(TEvBlobStorage::TEvBlock::TPtr &ev); void HandleNormal(TEvBlobStorage::TEvPatch::TPtr &ev); @@ -371,6 +374,7 @@ public: #define HANDLE_EVENTS(HANDLER) \ hFunc(TEvBlobStorage::TEvPut, HANDLER); \ hFunc(TEvBlobStorage::TEvGet, HANDLER); \ + hFunc(TEvBlobStorage::TEvGetBlock, HANDLER); \ hFunc(TEvBlobStorage::TEvBlock, HANDLER); \ hFunc(TEvBlobStorage::TEvDiscover, HANDLER); \ hFunc(TEvBlobStorage::TEvRange, HANDLER); \ diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_mon.cpp b/ydb/core/blobstorage/dsproxy/dsproxy_mon.cpp index bf35722ac6a..22ca4793da6 100644 --- a/ydb/core/blobstorage/dsproxy/dsproxy_mon.cpp +++ b/ydb/core/blobstorage/dsproxy/dsproxy_mon.cpp @@ -37,6 +37,7 @@ TBlobStorageGroupProxyMon::TBlobStorageGroupProxyMon(const TIntrusivePtr<::NMoni EventGet = EventGroup->GetCounter("EvGet", true); EventGetResBytes = EventGroup->GetCounter("EvGetResBytes", true); EventBlock = EventGroup->GetCounter("EvBlock", true); + EventGetBlock = EventGroup->GetCounter("EvGetBlock", true); EventDiscover = EventGroup->GetCounter("EvDiscover", true); EventRange = EventGroup->GetCounter("EvRange", true); EventCollectGarbage = EventGroup->GetCounter("EvCollectGarbage", true); @@ -68,6 +69,7 @@ TBlobStorageGroupProxyMon::TBlobStorageGroupProxyMon(const TIntrusivePtr<::NMoni ActiveGet = ActiveRequestsGroup->GetCounter("ActiveGet"); ActiveGetCapacity = ActiveRequestsGroup->GetCounter("ActiveGetCapacity"); ActiveBlock = ActiveRequestsGroup->GetCounter("ActiveBlock"); + ActiveGetBlock = ActiveRequestsGroup->GetCounter("ActiveGetBlock"); ActiveDiscover = ActiveRequestsGroup->GetCounter("ActiveDiscover"); ActiveRange = ActiveRequestsGroup->GetCounter("ActiveRange"); ActiveCollectGarbage = ActiveRequestsGroup->GetCounter("ActiveCollectGarbage"); @@ -101,6 +103,7 @@ TBlobStorageGroupProxyMon::TBlobStorageGroupProxyMon(const TIntrusivePtr<::NMoni auto respStatGroup = NodeMon->Group->GetSubgroup("subsystem", "responseStatus"); RespStatPut.emplace(respStatGroup->GetSubgroup("request", "put")); RespStatGet.emplace(respStatGroup->GetSubgroup("request", "get")); + RespStatGetBlock.emplace(respStatGroup->GetSubgroup("request", "getBlock")); RespStatBlock.emplace(respStatGroup->GetSubgroup("request", "block")); RespStatDiscover.emplace(respStatGroup->GetSubgroup("request", "discover")); RespStatRange.emplace(respStatGroup->GetSubgroup("request", "range")); diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_mon.h b/ydb/core/blobstorage/dsproxy/dsproxy_mon.h index b7a0a7a6955..96911c6fb15 100644 --- a/ydb/core/blobstorage/dsproxy/dsproxy_mon.h +++ b/ydb/core/blobstorage/dsproxy/dsproxy_mon.h @@ -30,6 +30,7 @@ enum class ERequestType { Status, Assimilate, Block, + GetBlock, }; struct TRequestMonGroup { @@ -196,6 +197,7 @@ protected: TRequestMonGroup StatusGroup; TRequestMonGroup AssimilateGroup; TRequestMonGroup BlockGroup; + TRequestMonGroup GetBlockGroup; public: TBlobStorageGroupProxyTimeStats TimeStats; @@ -210,6 +212,7 @@ public: // event counters ::NMonitoring::TDynamicCounters::TCounterPtr EventGet; ::NMonitoring::TDynamicCounters::TCounterPtr EventBlock; + ::NMonitoring::TDynamicCounters::TCounterPtr EventGetBlock; ::NMonitoring::TDynamicCounters::TCounterPtr EventDiscover; ::NMonitoring::TDynamicCounters::TCounterPtr EventRange; ::NMonitoring::TDynamicCounters::TCounterPtr EventCollectGarbage; @@ -230,6 +233,7 @@ public: ::NMonitoring::TDynamicCounters::TCounterPtr ActivePut; ::NMonitoring::TDynamicCounters::TCounterPtr ActivePutCapacity; ::NMonitoring::TDynamicCounters::TCounterPtr ActiveGet; + ::NMonitoring::TDynamicCounters::TCounterPtr ActiveGetBlock; ::NMonitoring::TDynamicCounters::TCounterPtr ActiveGetCapacity; ::NMonitoring::TDynamicCounters::TCounterPtr ActiveBlock; ::NMonitoring::TDynamicCounters::TCounterPtr ActiveDiscover; @@ -244,6 +248,7 @@ public: std::optional<TResponseStatusGroup> RespStatPut; std::optional<TResponseStatusGroup> RespStatGet; + std::optional<TResponseStatusGroup> RespStatGetBlock; std::optional<TResponseStatusGroup> RespStatBlock; std::optional<TResponseStatusGroup> RespStatDiscover; std::optional<TResponseStatusGroup> RespStatRange; @@ -268,6 +273,8 @@ public: case ERequestType::Status: return StatusGroup; case ERequestType::Assimilate: return AssimilateGroup; case ERequestType::Block: return BlockGroup; + case ERequestType::GetBlock: return GetBlockGroup; + } Y_ABORT(); } diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_nodemon.cpp b/ydb/core/blobstorage/dsproxy/dsproxy_nodemon.cpp index 24bbbb04563..a0ae27f7060 100644 --- a/ydb/core/blobstorage/dsproxy/dsproxy_nodemon.cpp +++ b/ydb/core/blobstorage/dsproxy/dsproxy_nodemon.cpp @@ -51,6 +51,7 @@ TDsProxyNodeMon::TDsProxyNodeMon(TIntrusivePtr<::NMonitoring::TDynamicCounters> auto group = Group->GetSubgroup("subsystem", "restart"); RestartPut = group->GetCounter("EvPut", true); RestartGet = group->GetCounter("EvGet", true); + RestartGetBlock = group->GetCounter("EvGetBlock", true); RestartPatch = group->GetCounter("EvPatch", true); RestartBlock = group->GetCounter("EvBlock", true); RestartDiscover = group->GetCounter("EvDiscover", true); diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_nodemon.h b/ydb/core/blobstorage/dsproxy/dsproxy_nodemon.h index 0c395cd5e14..3ed2154c73f 100644 --- a/ydb/core/blobstorage/dsproxy/dsproxy_nodemon.h +++ b/ydb/core/blobstorage/dsproxy/dsproxy_nodemon.h @@ -68,6 +68,7 @@ struct TDsProxyNodeMon : public TThrRefBase { // restart counters ::NMonitoring::TDynamicCounters::TCounterPtr RestartPut; ::NMonitoring::TDynamicCounters::TCounterPtr RestartGet; + ::NMonitoring::TDynamicCounters::TCounterPtr RestartGetBlock; ::NMonitoring::TDynamicCounters::TCounterPtr RestartBlock; ::NMonitoring::TDynamicCounters::TCounterPtr RestartDiscover; ::NMonitoring::TDynamicCounters::TCounterPtr RestartRange; diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_request.cpp b/ydb/core/blobstorage/dsproxy/dsproxy_request.cpp index a8bc3b422a5..0626a4070cf 100644 --- a/ydb/core/blobstorage/dsproxy/dsproxy_request.cpp +++ b/ydb/core/blobstorage/dsproxy/dsproxy_request.cpp @@ -259,6 +259,29 @@ namespace NKikimr { ); } + void TBlobStorageGroupProxy::HandleNormal(TEvBlobStorage::TEvGetBlock::TPtr &ev) { + EnsureMonitoring(true); + Mon->EventGetBlock->Inc(); + PushRequest(CreateBlobStorageGroupGetBlockRequest( + TBlobStorageGroupGetBlockParameters{ + .Common = { + .GroupInfo = Info, + .GroupQueues = Sessions->GroupQueues, + .Mon = Mon, + .Source = ev->Sender, + .Cookie = ev->Cookie, + .Now = TActivationContext::Monotonic(), + .StoragePoolCounters = StoragePoolCounters, + .RestartCounter = ev->Get()->RestartCounter, + .TraceId = std::move(ev->TraceId), + .Event = ev->Get(), + .ExecutionRelay = ev->Get()->ExecutionRelay + } + }), + ev->Get()->Deadline + ); + } + void TBlobStorageGroupProxy::HandleNormal(TEvBlobStorage::TEvPatch::TPtr &ev) { if (IsLimitedKeyless) { ErrorDescription = "Created as LIMITED without keys. It happens when tenant keys are missing on the node."; @@ -808,6 +831,7 @@ namespace NKikimr { XX(Put) XX(Get) + XX(GetBlock) XX(Block) XX(Discover) XX(Range) diff --git a/ydb/core/blobstorage/dsproxy/mock/dsproxy_mock.cpp b/ydb/core/blobstorage/dsproxy/mock/dsproxy_mock.cpp index 29c62d7d5a7..1e3da2b8424 100644 --- a/ydb/core/blobstorage/dsproxy/mock/dsproxy_mock.cpp +++ b/ydb/core/blobstorage/dsproxy/mock/dsproxy_mock.cpp @@ -60,6 +60,11 @@ namespace NKikimr { Send(ev->Sender, CopyExecutionRelay(ev->Get(), Model->Handle(ev->Get())), 0, ev->Cookie); } + void Handle(TEvBlobStorage::TEvGetBlock::TPtr& ev) { + STLOG(PRI_DEBUG, BS_PROXY, BSPM11, "TEvGetBlock", (Msg, ev->Get()->ToString())); + Send(ev->Sender, CopyExecutionRelay(ev->Get(), Model->Handle(ev->Get())), 0, ev->Cookie); + } + template<typename TOut, typename TIn> TOut *CopyExecutionRelay(TIn *in, TOut *out) { out->ExecutionRelay = std::move(in->ExecutionRelay); @@ -81,6 +86,7 @@ namespace NKikimr { hFunc(TEvBlobStorage::TEvPut, Handle); hFunc(TEvBlobStorage::TEvGet, Handle); hFunc(TEvBlobStorage::TEvBlock, Handle); + hFunc(TEvBlobStorage::TEvGetBlock, Handle); hFunc(TEvBlobStorage::TEvDiscover, Handle); hFunc(TEvBlobStorage::TEvRange, Handle); hFunc(TEvBlobStorage::TEvCollectGarbage, Handle); diff --git a/ydb/core/blobstorage/dsproxy/mock/model.h b/ydb/core/blobstorage/dsproxy/mock/model.h index 07974f44289..6a3eafdcaa4 100644 --- a/ydb/core/blobstorage/dsproxy/mock/model.h +++ b/ydb/core/blobstorage/dsproxy/mock/model.h @@ -223,6 +223,20 @@ namespace NFake { return new TEvBlobStorage::TEvBlockResult(status); } + TEvBlobStorage::TEvGetBlockResult* Handle(TEvBlobStorage::TEvGetBlock *msg) { + NKikimrProto::EReplyStatus status = NKikimrProto::OK; + ui32 generation = 0; + + auto it = Blocks.find(msg->TabletId); + if (it != Blocks.end()) { + generation = it->second; + } else { + status = NKikimrProto::NODATA; + } + + return new TEvBlobStorage::TEvGetBlockResult(status, msg->TabletId, generation); + } + TEvBlobStorage::TEvDiscoverResult* Handle(TEvBlobStorage::TEvDiscover *msg) { ui32 blockedGeneration = 0; if (msg->DiscoverBlockedGeneration) { diff --git a/ydb/core/blobstorage/dsproxy/ut_fat/dsproxy_ut.cpp b/ydb/core/blobstorage/dsproxy/ut_fat/dsproxy_ut.cpp index 9e9d61fa2a6..ad0f494106f 100644 --- a/ydb/core/blobstorage/dsproxy/ut_fat/dsproxy_ut.cpp +++ b/ydb/core/blobstorage/dsproxy/ut_fat/dsproxy_ut.cpp @@ -217,11 +217,13 @@ protected: , MessageVGetResult , MessageVPutResult , MessageVBlockResult + , MessageVGetBlockResult , MessageRangeResult , MessageDiscoverResult , MessageCollectGarbageResult , MessageStatusResult , MessageBlockResult + , MessageGetBlockResult , MessageStartProfilerResult , MessageStopProfilerResult , MessageVStatusResult @@ -421,6 +423,14 @@ protected: ActTestFSM(ctx); } + void HandleGetBlockResult(TEvBlobStorage::TEvGetBlockResult::TPtr &ev, const TActorContext &ctx) { + LastResponse.Message = TResponseData::MessageGetBlockResult; + TEvBlobStorage::TEvGetBlockResult *msg = ev->Get(); + VERBOSE_COUT("HandleGetBlockResult: " << StatusToString(msg->Status)); + LastResponse.Status = msg->Status; + ActTestFSM(ctx); + } + void HandlePutResult(TEvBlobStorage::TEvPutResult::TPtr &ev, const TActorContext &ctx) { LastResponse.Message = TResponseData::MessagePutResult; TEvBlobStorage::TEvPutResult *msg = ev->Get(); @@ -562,6 +572,16 @@ protected: ActTestFSM(ctx); } + void HandleVGetBlockResult(TEvBlobStorage::TEvVGetBlockResult::TPtr &ev, const TActorContext &ctx) { + LastResponse.Message = TResponseData::MessageVGetBlockResult; + const NKikimrBlobStorage::TEvVGetBlockResult &record = ev->Get()->Record; + + VERBOSE_COUT("HandleVGetBlockResult: " << StatusToString(record.GetStatus())); + + LastResponse.Status = record.GetStatus(); + ActTestFSM(ctx); + } + void HandleVStatusResult(TEvBlobStorage::TEvVStatusResult::TPtr &ev, const TActorContext &ctx) { LastResponse.Message = TResponseData::MessageVStatusResult; const NKikimrBlobStorage::TEvVStatusResult &record = ev->Get()->Record; @@ -639,12 +659,14 @@ public: HFunc(TEvBlobStorage::TEvVGetResult, HandleVGetResult); HFunc(TEvBlobStorage::TEvVPutResult, HandleVPutResult); HFunc(TEvBlobStorage::TEvVBlockResult, HandleVBlockResult); + HFunc(TEvBlobStorage::TEvVGetBlockResult, HandleVGetBlockResult); HFunc(TEvBlobStorage::TEvVStatusResult, HandleVStatusResult); HFunc(TEvBlobStorage::TEvStatusResult, HandleStatusResult); HFunc(TEvBlobStorage::TEvVCompactResult, HandleVCompactResult); HFunc(TEvBlobStorage::TEvDiscoverResult, HandleDiscoverResult); HFunc(TEvBlobStorage::TEvCollectGarbageResult, HandleCollectGarbageResult); HFunc(TEvBlobStorage::TEvBlockResult, HandleBlockResult); + HFunc(TEvBlobStorage::TEvGetBlockResult, HandleGetBlockResult); HFunc(TEvProfiler::TEvStartResult, HandleStartProfilerResult); HFunc(TEvProfiler::TEvStopResult, HandleStopProfilerResult); HFunc(TEvProxyQueueState, HandleProxyQueueState); diff --git a/ydb/core/blobstorage/dsproxy/ya.make b/ydb/core/blobstorage/dsproxy/ya.make index 89259dc7588..46938b6b954 100644 --- a/ydb/core/blobstorage/dsproxy/ya.make +++ b/ydb/core/blobstorage/dsproxy/ya.make @@ -17,6 +17,7 @@ SRCS( dsproxy_discover_m3dc.cpp dsproxy_discover_m3of4.cpp dsproxy_get.cpp + dsproxy_get_block.cpp dsproxy_get_impl.cpp dsproxy_get_impl.h dsproxy_indexrestoreget.cpp diff --git a/ydb/core/blobstorage/nodewarden/node_warden_impl.cpp b/ydb/core/blobstorage/nodewarden/node_warden_impl.cpp index 7b6d7912140..92d6535a63d 100644 --- a/ydb/core/blobstorage/nodewarden/node_warden_impl.cpp +++ b/ydb/core/blobstorage/nodewarden/node_warden_impl.cpp @@ -66,6 +66,7 @@ STATEFN(TNodeWarden::StateOnline) { switch (ev->GetTypeRewrite()) { fFunc(TEvBlobStorage::TEvPut::EventType, HandleForwarded); fFunc(TEvBlobStorage::TEvGet::EventType, HandleForwarded); + fFunc(TEvBlobStorage::TEvGetBlock::EventType, HandleForwarded); fFunc(TEvBlobStorage::TEvBlock::EventType, HandleForwarded); fFunc(TEvBlobStorage::TEvPatch::EventType, HandleForwarded); fFunc(TEvBlobStorage::TEvDiscover::EventType, HandleForwarded); diff --git a/ydb/core/blobstorage/testing/group_overseer/group_overseer.cpp b/ydb/core/blobstorage/testing/group_overseer/group_overseer.cpp index b01ffaafe08..0ef628e584f 100644 --- a/ydb/core/blobstorage/testing/group_overseer/group_overseer.cpp +++ b/ydb/core/blobstorage/testing/group_overseer/group_overseer.cpp @@ -19,6 +19,7 @@ namespace NKikimr::NTesting { switch (ev.GetTypeRewrite()) { // all of these events may alter storage state #define QUERY(EV) case TEvBlobStorage::EV: return ExamineResultEvent<TEvBlobStorage::T##EV>(nodeId, ev); QUERY(EvBlockResult) + QUERY(EvGetBlockResult) QUERY(EvPutResult) QUERY(EvPatchResult) QUERY(EvInplacePatchResult) @@ -34,6 +35,7 @@ namespace NKikimr::NTesting { switch (ev.GetTypeRewrite()) { // all of these events may alter storage state #define RESULT(EV) case TEvBlobStorage::EV: return ExamineQueryEvent<TEvBlobStorage::T##EV>(nodeId, ev, TEvBlobStorage::EV##Result); RESULT(EvBlock) + RESULT(EvGetBlock) RESULT(EvPut) RESULT(EvPatch) RESULT(EvInplacePatch) @@ -101,7 +103,8 @@ namespace NKikimr::NTesting { else if constexpr (T::EventType != TEvBlobStorage::EvBlockResult && T::EventType != TEvBlobStorage::EvInplacePatchResult && T::EventType != TEvBlobStorage::EvCollectGarbageResult && - T::EventType != TEvBlobStorage::EvDiscoverResult) { + T::EventType != TEvBlobStorage::EvDiscoverResult && + T::EventType != TEvBlobStorage::EvGetBlockResult) { Y_ABORT_UNLESS(groupId == msg.GroupId.GetRawId()); } diff --git a/ydb/core/blobstorage/ut_blobstorage/get_block.cpp b/ydb/core/blobstorage/ut_blobstorage/get_block.cpp new file mode 100644 index 00000000000..63691e36fdc --- /dev/null +++ b/ydb/core/blobstorage/ut_blobstorage/get_block.cpp @@ -0,0 +1,19 @@ +#include <ydb/core/blobstorage/ut_blobstorage/lib/env.h> + +Y_UNIT_TEST_SUITE(GetBlock) { + Y_UNIT_TEST(EmptyGetBlockCmd) { + TEnvironmentSetup env({ + .Erasure = TBlobStorageGroupType::Erasure4Plus2Block, + }); + auto& runtime = env.Runtime; + env.CreateBoxAndPool(1, 1); + auto info = env.GetGroupInfo(env.GetGroups().front()); + auto ev = std::make_unique<TEvBlobStorage::TEvGetBlock>(1u, TInstant::Max()); + const TActorId edge = runtime->AllocateEdgeActor(1, __FILE__, __LINE__); + runtime->WrapInActorContext(edge, [&] { + SendToBSProxy(edge, info->GroupID, ev.release()); + }); + auto res = env.WaitForEdgeActorEvent<TEvBlobStorage::TEvGetBlockResult>(edge); + UNIT_ASSERT_VALUES_EQUAL(res->Get()->Status, NKikimrProto::OK); + } +} diff --git a/ydb/core/blobstorage/ut_blobstorage/ya.make b/ydb/core/blobstorage/ut_blobstorage/ya.make index 8dbb935faed..94e2b71de65 100644 --- a/ydb/core/blobstorage/ut_blobstorage/ya.make +++ b/ydb/core/blobstorage/ut_blobstorage/ya.make @@ -28,6 +28,7 @@ SRCS( gc.cpp gc_quorum_3dc.cpp get.cpp + get_block.cpp group_reconfiguration.cpp incorrect_queries.cpp index_restore_get.cpp diff --git a/ydb/core/protos/blob_depot.proto b/ydb/core/protos/blob_depot.proto index c7fe27a8ad0..0f42d17a5d8 100644 --- a/ydb/core/protos/blob_depot.proto +++ b/ydb/core/protos/blob_depot.proto @@ -112,6 +112,17 @@ message TEvBlockResult { optional uint32 TimeToLiveMs = 3; } +message TEvGetBlock { + optional fixed64 TabletId = 1; +} + +message TEvGetBlockResult { + optional NKikimrProto.EReplyStatus Status = 1; + optional string ErrorReason = 2; + optional fixed64 TabletId = 3; + optional uint32 BlockedGeneration = 4; +} + message TEvPushNotify { message TBlockedTablet { optional fixed64 TabletId = 1; diff --git a/ydb/library/services/services.proto b/ydb/library/services/services.proto index 76cf699b2da..5bd248386be 100644 --- a/ydb/library/services/services.proto +++ b/ydb/library/services/services.proto @@ -52,6 +52,7 @@ enum EServiceKikimr { BS_VDISK_DEFRAG = 346; BS_PROXY_ASSIMILATE = 347; BS_VDISK_BALANCING = 2600; + BS_PROXY_GETBLOCK = 2601; // DATASHARD section // TX_DATASHARD = 290; // @@ -1068,5 +1069,6 @@ message TActivity { RESHUFFLE_KMEANS_SCAN_ACTOR = 650; FEATURE_FLAGS_CONFIGURATOR = 651; DATASHARD_READ_SCAN = 652; + BS_GROUP_GETBLOCK = 653; }; }; |