diff options
author | Aleksandr Dmitriev <monster@ydb.tech> | 2025-04-28 23:45:22 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2025-04-28 23:45:22 +0300 |
commit | 86255fd7869f0b22f7e48cf2146156eca530352f (patch) | |
tree | 92a9af7703c46045a86a4f478b7b029092042e99 | |
parent | 0514f287ac7ea44cfde374f3e08fbbc4eaa8920e (diff) | |
download | ydb-86255fd7869f0b22f7e48cf2146156eca530352f.tar.gz |
introduce new CheckIntegrity query in DSProxy (#17626)
-rw-r--r-- | ydb/core/base/blobstorage.cpp | 13 | ||||
-rw-r--r-- | ydb/core/base/blobstorage.h | 92 | ||||
-rw-r--r-- | ydb/core/blobstorage/dsproxy/dsproxy.h | 11 | ||||
-rw-r--r-- | ydb/core/blobstorage/dsproxy/dsproxy_check_integrity_get.cpp | 198 | ||||
-rw-r--r-- | ydb/core/blobstorage/dsproxy/dsproxy_impl.h | 4 | ||||
-rw-r--r-- | ydb/core/blobstorage/dsproxy/dsproxy_mon.cpp | 5 | ||||
-rw-r--r-- | ydb/core/blobstorage/dsproxy/dsproxy_mon.h | 8 | ||||
-rw-r--r-- | ydb/core/blobstorage/dsproxy/dsproxy_nodemon.cpp | 1 | ||||
-rw-r--r-- | ydb/core/blobstorage/dsproxy/dsproxy_nodemon.h | 2 | ||||
-rw-r--r-- | ydb/core/blobstorage/dsproxy/dsproxy_nodemonactor.cpp | 1 | ||||
-rw-r--r-- | ydb/core/blobstorage/dsproxy/dsproxy_request.cpp | 25 | ||||
-rw-r--r-- | ydb/core/blobstorage/dsproxy/mock/dsproxy_mock.cpp | 6 | ||||
-rw-r--r-- | ydb/core/blobstorage/dsproxy/mock/model.h | 8 | ||||
-rw-r--r-- | ydb/core/blobstorage/dsproxy/ut_fat/dsproxy_ut.cpp | 10 | ||||
-rw-r--r-- | ydb/core/blobstorage/dsproxy/ya.make | 1 | ||||
-rw-r--r-- | ydb/library/services/services.proto | 2 |
16 files changed, 382 insertions, 5 deletions
diff --git a/ydb/core/base/blobstorage.cpp b/ydb/core/base/blobstorage.cpp index 765dd34d351..dad40ebca01 100644 --- a/ydb/core/base/blobstorage.cpp +++ b/ydb/core/base/blobstorage.cpp @@ -105,6 +105,19 @@ std::unique_ptr<TEvBlobStorage::TEvGetResult> TEvBlobStorage::TEvGet::MakeErrorR return res; } +void TEvBlobStorage::TEvCheckIntegrity::ToSpan(NWilson::TSpan& span) const { + span + .Attribute("Id", Id.ToString()) + .Attribute("GetHandleClass", NKikimrBlobStorage::EGetHandleClass_Name(GetHandleClass)); +} + +std::unique_ptr<TEvBlobStorage::TEvCheckIntegrityResult> TEvBlobStorage::TEvCheckIntegrity::MakeErrorResponse( + NKikimrProto::EReplyStatus status, const TString& errorReason, TGroupId /*groupId*/) { + auto res = std::make_unique<TEvCheckIntegrityResult>(status); + res->ErrorReason = errorReason; + return res; +} + void TEvBlobStorage::TEvBlock::ToSpan(NWilson::TSpan& span) const { span .Attribute("TabletId", ::ToString(TabletId)) diff --git a/ydb/core/base/blobstorage.h b/ydb/core/base/blobstorage.h index 295252cdd29..289bf901228 100644 --- a/ydb/core/base/blobstorage.h +++ b/ydb/core/base/blobstorage.h @@ -494,6 +494,7 @@ struct TEvBlobStorage { EvGetQueuesInfo, // for debugging purposes EvGetBlock, + EvCheckIntegrity, // EvPutResult = EvPut + 512, /// 268 632 576 @@ -511,6 +512,7 @@ struct TEvBlobStorage { EvQueuesInfo, // for debugging purposes EvGetBlockResult, + EvCheckIntegrityResult, // proxy <-> vdisk interface EvVPut = EvPut + 2 * 512, /// 268 633 088 @@ -951,6 +953,7 @@ struct TEvBlobStorage { struct TEvPutResult; struct TEvGetResult; + struct TEvCheckIntegrityResult; struct TEvGetBlockResult; struct TEvBlockResult; struct TEvDiscoverResult; @@ -1367,6 +1370,95 @@ struct TEvBlobStorage { } }; + struct TEvCheckIntegrity : public TEventLocal<TEvCheckIntegrity, EvCheckIntegrity> { + TLogoBlobID Id; + TInstant Deadline; + NKikimrBlobStorage::EGetHandleClass GetHandleClass; + + ui32 RestartCounter = 0; + std::shared_ptr<TExecutionRelay> ExecutionRelay; + + TEvCheckIntegrity( + const TLogoBlobID& id, + TInstant deadline, + NKikimrBlobStorage::EGetHandleClass getHandleClass) + : Id(id) + , Deadline(deadline) + , GetHandleClass(getHandleClass) + {} + + TString Print(bool /*isFull*/) const { + TStringStream str; + str << "TEvCheckIntegrity {" + << " Id# " << Id + << " Deadline# " << Deadline + << " GetHandleClass# " << NKikimrBlobStorage::EGetHandleClass_Name(GetHandleClass) + << " }"; + return str.Str(); + } + + TString ToString() const { + return Print(false); + } + + ui32 CalculateSize() const { + return sizeof(*this); + } + + void ToSpan(NWilson::TSpan& span) const; + + std::unique_ptr<TEvCheckIntegrityResult> MakeErrorResponse( + NKikimrProto::EReplyStatus status, const TString& errorReason, TGroupId groupId); + }; + + struct TEvCheckIntegrityResult : public TEventLocal<TEvCheckIntegrityResult, EvCheckIntegrityResult> { + TLogoBlobID Id; + NKikimrProto::EReplyStatus Status; + // OK - we were able to check the integrity + // any other status - some problem prevents the check, ErrorReason contains detailed info + // for example if the group is disintegrated, the status is ERROR + + TString ErrorReason; + + enum EPlacementStatus { + PS_OK = 1, // blob parts are placed according to fail model + PS_ERROR = 2, // blob parts are definitely placed incorrectly or there are missing parts for sure + PS_UNKNOWN = 3, // status is unknown because of missing disks or network problems + PS_NOT_YET = 4, // there are missing parts but status may become OK after replication + }; + EPlacementStatus PlacementStatus; + + // TODO: calculate data status + enum EDataStatus { + DS_OK = 1, // all data parts contain valid data + DS_ERROR = 2, // some parts definitely contain invalid data + DS_UNKNOWN = 3, // status is unknown because of missing disks or network problems + }; + EDataStatus DataStatus; + + std::shared_ptr<TExecutionRelay> ExecutionRelay; + + TEvCheckIntegrityResult(NKikimrProto::EReplyStatus status) + : Status(status) + {} + + TString Print(bool /*isFull*/) const { + TStringStream str; + str << "TEvCheckIntegrityResult {" + << " Id# " << Id + << " Status# " << NKikimrProto::EReplyStatus_Name(Status) + << " ErrorReason# " << ErrorReason + << " PlacementStatus# " << (int)PlacementStatus + << " DataStatus# " << (int)DataStatus + << " }"; + return str.Str(); + } + + TString ToString() const { + return Print(false); + } + }; + struct TEvGetBlock : public TEventLocal<TEvGetBlock, EvGetBlock> { const ui64 TabletId; const TInstant Deadline; diff --git a/ydb/core/blobstorage/dsproxy/dsproxy.h b/ydb/core/blobstorage/dsproxy/dsproxy.h index 3b9c0c87f5a..1bf26b4ffab 100644 --- a/ydb/core/blobstorage/dsproxy/dsproxy.h +++ b/ydb/core/blobstorage/dsproxy/dsproxy.h @@ -132,6 +132,7 @@ NActors::NLog::EPriority PriorityForStatusInbound(NKikimrProto::EReplyStatus sta XX(TEvBlobStorage::TEvStatus) \ XX(TEvBlobStorage::TEvPatch) \ XX(TEvBlobStorage::TEvAssimilate) \ + XX(TEvBlobStorage::TEvCheckIntegrity) \ // #define DSPROXY_ENUM_DISK_EVENTS(XX) \ @@ -427,6 +428,16 @@ struct TBlobStorageGroupRestoreGetParameters { }; IActor* CreateBlobStorageGroupIndexRestoreGetRequest(TBlobStorageGroupRestoreGetParameters params); +struct TBlobStorageGroupCheckIntegrityParameters { + TBlobStorageGroupRequestActor::TCommonParameters<TEvBlobStorage::TEvCheckIntegrity> Common; + TBlobStorageGroupRequestActor::TTypeSpecificParameters TypeSpecific = { + .LogComponent = NKikimrServices::BS_PROXY_CHECKINTEGRITY, + .Name = "DSProxy.CheckIntegrity", + .Activity = NKikimrServices::TActivity::BS_PROXY_CHECKINTEGRITY_ACTOR, + }; +}; +IActor* CreateBlobStorageGroupCheckIntegrityRequest(TBlobStorageGroupCheckIntegrityParameters params); + struct TBlobStorageGroupDiscoverParameters { TBlobStorageGroupRequestActor::TCommonParameters<TEvBlobStorage::TEvDiscover> Common; TBlobStorageGroupRequestActor::TTypeSpecificParameters TypeSpecific = { diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_check_integrity_get.cpp b/ydb/core/blobstorage/dsproxy/dsproxy_check_integrity_get.cpp new file mode 100644 index 00000000000..ec6eb5268b3 --- /dev/null +++ b/ydb/core/blobstorage/dsproxy/dsproxy_check_integrity_get.cpp @@ -0,0 +1,198 @@ +#include "dsproxy.h" +#include "dsproxy_mon.h" +#include "dsproxy_quorum_tracker.h" +#include "dsproxy_blob_tracker.h" +#include <ydb/core/blobstorage/vdisk/common/vdisk_events.h> + +namespace NKikimr { + + +class TBlobStorageGroupCheckIntegrityRequest : public TBlobStorageGroupRequestActor { + const TLogoBlobID Id; + const TInstant Deadline; + const NKikimrBlobStorage::EGetHandleClass GetHandleClass; + + TGroupQuorumTracker QuorumTracker; + std::unique_ptr<TBlobStatusTracker> BlobStatus; // treats NOT_YET as ERROR + std::unique_ptr<TBlobStatusTracker> BlobStatusOptimistic; // treats NOT_YET as currently replicating -> OK in the future + + ui32 VGetsInFlight = 0; + + using TEvCheckIntegrityResult = TEvBlobStorage::TEvCheckIntegrityResult; + std::unique_ptr<TEvCheckIntegrityResult> PendingResult; + + void ReplyAndDie(NKikimrProto::EReplyStatus status) override { + if (status != NKikimrProto::OK) { + PendingResult.reset(new TEvCheckIntegrityResult(status)); + PendingResult->ErrorReason = ErrorReason; + PendingResult->PlacementStatus = TEvCheckIntegrityResult::PS_UNKNOWN; + PendingResult->DataStatus = TEvCheckIntegrityResult::DS_UNKNOWN; + PendingResult->Id = Id; + } + + SendResponseAndDie(std::move(PendingResult)); + } + + TString DumpBlobStatus() const { + TStringStream str; + BlobStatus->Output(str, Info.Get()); + return str.Str(); + } + + void Handle(TEvBlobStorage::TEvVGetResult::TPtr &ev) { + ProcessReplyFromQueue(ev->Get()); + + const NKikimrBlobStorage::TEvVGetResult& record = ev->Get()->Record; + + if (!record.HasStatus()) { + ErrorReason = "erron in TEvVGetResult - no status"; + ReplyAndDie(NKikimrProto::ERROR); + return; + } + NKikimrProto::EReplyStatus status = record.GetStatus(); + + if (!record.HasVDiskID()) { + ErrorReason = "erron in TEvVGetResult - no VDisk id"; + ReplyAndDie(NKikimrProto::ERROR); + return; + } + const TVDiskID vDiskId = VDiskIDFromVDiskID(record.GetVDiskID()); + + Y_ABORT_UNLESS(VGetsInFlight > 0); + --VGetsInFlight; + + switch (NKikimrProto::EReplyStatus newStatus = QuorumTracker.ProcessReply(vDiskId, status)) { + case NKikimrProto::ERROR: + ErrorReason = "Group is disintegrated or has network problems"; + ReplyAndDie(NKikimrProto::ERROR); + return; + + case NKikimrProto::OK: + case NKikimrProto::UNKNOWN: + break; + + default: + Y_ABORT("unexpected newStatus# %s", NKikimrProto::EReplyStatus_Name(newStatus).data()); + } + + for (size_t i = 0; i < record.ResultSize(); ++i) { + const auto& result = record.GetResult(i); + BlobStatus->UpdateFromResponseData(result, vDiskId, Info.Get()); + + if (result.GetStatus() == NKikimrProto::NOT_YET) { + NKikimrBlobStorage::TQueryResult okResult = result; + okResult.SetStatus(NKikimrProto::OK); + BlobStatusOptimistic->UpdateFromResponseData(okResult, vDiskId, Info.Get()); + } else { + BlobStatusOptimistic->UpdateFromResponseData(result, vDiskId, Info.Get()); + } + } + + if (!VGetsInFlight) { + Analyze(); + } + } + + void Analyze() { + PendingResult.reset(new TEvBlobStorage::TEvCheckIntegrityResult(NKikimrProto::OK)); + PendingResult->Id = Id; + PendingResult->DataStatus = TEvCheckIntegrityResult::DS_UNKNOWN; // TODO + + TBlobStorageGroupInfo::EBlobState state = BlobStatus->GetBlobState(Info.Get(), nullptr); + + switch (state) { + case TBlobStorageGroupInfo::EBS_DISINTEGRATED: + ErrorReason = "Group is disintegrated or has network problems"; + ReplyAndDie(NKikimrProto::ERROR); + return; + + case TBlobStorageGroupInfo::EBS_FULL: + PendingResult->PlacementStatus = TEvCheckIntegrityResult::PS_OK; + break; + + case TBlobStorageGroupInfo::EBS_UNRECOVERABLE_FRAGMENTARY: + PendingResult->PlacementStatus = TEvCheckIntegrityResult::PS_ERROR; + break; + + case TBlobStorageGroupInfo::EBS_RECOVERABLE_FRAGMENTARY: { + TBlobStorageGroupInfo::EBlobState stateOptimistic = + BlobStatusOptimistic->GetBlobState(Info.Get(), nullptr); + + if (stateOptimistic == TBlobStorageGroupInfo::EBS_FULL) { + PendingResult->PlacementStatus = TEvCheckIntegrityResult::PS_NOT_YET; + } else { + PendingResult->PlacementStatus = TEvCheckIntegrityResult::PS_ERROR; + } + break; + } + case TBlobStorageGroupInfo::EBS_RECOVERABLE_DOUBTED: + PendingResult->PlacementStatus = TEvCheckIntegrityResult::PS_UNKNOWN; + break; + } + + ReplyAndDie(NKikimrProto::OK); + } + + std::unique_ptr<IEventBase> RestartQuery(ui32 counter) override { + ++*Mon->NodeMon->RestartCheckIntegrity; + + auto ev = std::make_unique<TEvBlobStorage::TEvCheckIntegrity>( + Id, Deadline, GetHandleClass); + ev->RestartCounter = counter; + return ev; + } + +public: + ::NMonitoring::TDynamicCounters::TCounterPtr& GetActiveCounter() const override { + return Mon->ActiveCheckIntegrity; + } + + ERequestType GetRequestType() const override { + return ERequestType::CheckIntegrity; + } + + TBlobStorageGroupCheckIntegrityRequest(TBlobStorageGroupCheckIntegrityParameters& params) + : TBlobStorageGroupRequestActor(params) + , Id(params.Common.Event->Id) + , Deadline(params.Common.Event->Deadline) + , GetHandleClass(params.Common.Event->GetHandleClass) + , QuorumTracker(Info.Get()) + {} + + void Bootstrap() override { + BlobStatus.reset(new TBlobStatusTracker(Id, Info.Get())); + BlobStatusOptimistic.reset(new TBlobStatusTracker(Id, Info.Get())); + + for (const auto& vdisk : Info->GetVDisks()) { + auto vDiskId = Info->GetVDiskId(vdisk.OrderNumber); + + if (!Info->BelongsToSubgroup(vDiskId, Id.Hash())) { + continue; + } + + auto vGet = TEvBlobStorage::TEvVGet::CreateExtremeDataQuery( + vDiskId, Deadline, GetHandleClass, TEvBlobStorage::TEvVGet::EFlags::ShowInternals); + vGet->AddExtremeQuery(Id, 0, 0); + + SendToQueue(std::move(vGet), 0); + ++VGetsInFlight; + } + + Become(&TBlobStorageGroupCheckIntegrityRequest::StateWait); + } + + STATEFN(StateWait) { + if (ProcessEvent(ev)) { + return; + } + switch (ev->GetTypeRewrite()) { + hFunc(TEvBlobStorage::TEvVGetResult, Handle); + } + } +}; + +IActor* CreateBlobStorageGroupCheckIntegrityRequest(TBlobStorageGroupCheckIntegrityParameters params) { + return new TBlobStorageGroupCheckIntegrityRequest(params); +} + +} //NKikimr diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_impl.h b/ydb/core/blobstorage/dsproxy/dsproxy_impl.h index 713179af159..3f185a99d8b 100644 --- a/ydb/core/blobstorage/dsproxy/dsproxy_impl.h +++ b/ydb/core/blobstorage/dsproxy/dsproxy_impl.h @@ -153,6 +153,8 @@ class TBlobStorageGroupProxy : public TActorBootstrapped<TBlobStorageGroupProxy> Mon->EventPatch->Inc(); } else if constexpr (std::is_same_v<TEvent, TEvBlobStorage::TEvAssimilate>) { Mon->EventAssimilate->Inc(); + } else if constexpr (std::is_same_v<TEvent, TEvBlobStorage::TEvCheckIntegrity>) { + Mon->EventCheckIntegrity->Inc(); } } @@ -270,6 +272,7 @@ class TBlobStorageGroupProxy : public TActorBootstrapped<TBlobStorageGroupProxy> void HandleNormal(TEvBlobStorage::TEvCollectGarbage::TPtr &ev); void HandleNormal(TEvBlobStorage::TEvStatus::TPtr &ev); void HandleNormal(TEvBlobStorage::TEvAssimilate::TPtr &ev); + void HandleNormal(TEvBlobStorage::TEvCheckIntegrity::TPtr &ev); void Handle(TEvBlobStorage::TEvBunchOfEvents::TPtr ev); void Handle(TEvDeathNote::TPtr ev); void Handle(TEvGetQueuesInfo::TPtr ev); @@ -380,6 +383,7 @@ public: hFunc(TEvBlobStorage::TEvStatus, HANDLER); \ hFunc(TEvBlobStorage::TEvPatch, HANDLER); \ hFunc(TEvBlobStorage::TEvAssimilate, HANDLER); \ + hFunc(TEvBlobStorage::TEvCheckIntegrity, HANDLER); \ /**/ STFUNC(StateUnconfigured) { diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_mon.cpp b/ydb/core/blobstorage/dsproxy/dsproxy_mon.cpp index 22ca4793da6..2136161a646 100644 --- a/ydb/core/blobstorage/dsproxy/dsproxy_mon.cpp +++ b/ydb/core/blobstorage/dsproxy/dsproxy_mon.cpp @@ -49,6 +49,7 @@ TBlobStorageGroupProxyMon::TBlobStorageGroupProxyMon(const TIntrusivePtr<::NMoni EventStopGetBatching = EventGroup->GetCounter("EvStopGetBatching", true); EventPatch = EventGroup->GetCounter("EvPatch", true); EventAssimilate = EventGroup->GetCounter("EvAssimilate", true); + EventCheckIntegrity = EventGroup->GetCounter("EvCheckIntegrity", true); PutsSentViaPutBatching = EventGroup->GetCounter("PutsSentViaPutBatching", true); PutBatchesSent = EventGroup->GetCounter("PutBatchesSent", true); @@ -76,6 +77,7 @@ TBlobStorageGroupProxyMon::TBlobStorageGroupProxyMon(const TIntrusivePtr<::NMoni ActiveStatus = ActiveRequestsGroup->GetCounter("ActiveStatus"); ActivePatch = ActiveRequestsGroup->GetCounter("ActivePatch"); ActiveAssimilate = ActiveRequestsGroup->GetCounter("ActiveAssimilate"); + ActiveCheckIntegrity = ActiveRequestsGroup->GetCounter("ActiveCheckIntegrity"); // special patch counters VPatchContinueFailed = ActiveRequestsGroup->GetCounter("VPatchContinueFailed"); @@ -94,6 +96,7 @@ TBlobStorageGroupProxyMon::TBlobStorageGroupProxyMon(const TIntrusivePtr<::NMoni StatusGroup.Init(group->GetSubgroup("request", "status")); AssimilateGroup.Init(group->GetSubgroup("request", "assimilate")); BlockGroup.Init(group->GetSubgroup("request", "block")); + CheckIntegrityGroup.Init(group->GetSubgroup("request", "checkIntegrity")); } ActiveMultiGet = ActiveRequestsGroup->GetCounter("ActiveMultiGet"); @@ -111,6 +114,7 @@ TBlobStorageGroupProxyMon::TBlobStorageGroupProxyMon(const TIntrusivePtr<::NMoni RespStatStatus.emplace(respStatGroup->GetSubgroup("request", "status")); RespStatPatch.emplace(respStatGroup->GetSubgroup("request", "patch")); RespStatAssimilate.emplace(respStatGroup->GetSubgroup("request", "assimilate")); + RespStatCheckIntegrity.emplace(respStatGroup->GetSubgroup("request", "checkIntegrity")); } void TBlobStorageGroupProxyMon::BecomeFull() { @@ -216,4 +220,3 @@ void TBlobStorageGroupProxyMon::ThroughputUpdate() { } // NKikimr - diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_mon.h b/ydb/core/blobstorage/dsproxy/dsproxy_mon.h index 96911c6fb15..8a90ebd5efb 100644 --- a/ydb/core/blobstorage/dsproxy/dsproxy_mon.h +++ b/ydb/core/blobstorage/dsproxy/dsproxy_mon.h @@ -31,6 +31,7 @@ enum class ERequestType { Assimilate, Block, GetBlock, + CheckIntegrity, }; struct TRequestMonGroup { @@ -198,6 +199,7 @@ protected: TRequestMonGroup AssimilateGroup; TRequestMonGroup BlockGroup; TRequestMonGroup GetBlockGroup; + TRequestMonGroup CheckIntegrityGroup; public: TBlobStorageGroupProxyTimeStats TimeStats; @@ -224,6 +226,7 @@ public: ::NMonitoring::TDynamicCounters::TCounterPtr EventStopGetBatching; ::NMonitoring::TDynamicCounters::TCounterPtr EventPatch; ::NMonitoring::TDynamicCounters::TCounterPtr EventAssimilate; + ::NMonitoring::TDynamicCounters::TCounterPtr EventCheckIntegrity; ::NMonitoring::TDynamicCounters::TCounterPtr PutsSentViaPutBatching; ::NMonitoring::TDynamicCounters::TCounterPtr PutBatchesSent; @@ -245,6 +248,7 @@ public: ::NMonitoring::TDynamicCounters::TCounterPtr ActiveStatus; ::NMonitoring::TDynamicCounters::TCounterPtr ActivePatch; ::NMonitoring::TDynamicCounters::TCounterPtr ActiveAssimilate; + ::NMonitoring::TDynamicCounters::TCounterPtr ActiveCheckIntegrity; std::optional<TResponseStatusGroup> RespStatPut; std::optional<TResponseStatusGroup> RespStatGet; @@ -256,6 +260,7 @@ public: std::optional<TResponseStatusGroup> RespStatStatus; std::optional<TResponseStatusGroup> RespStatPatch; std::optional<TResponseStatusGroup> RespStatAssimilate; + std::optional<TResponseStatusGroup> RespStatCheckIntegrity; // special patch counters ::NMonitoring::TDynamicCounters::TCounterPtr VPatchContinueFailed; @@ -274,7 +279,7 @@ public: case ERequestType::Assimilate: return AssimilateGroup; case ERequestType::Block: return BlockGroup; case ERequestType::GetBlock: return GetBlockGroup; - + case ERequestType::CheckIntegrity: return CheckIntegrityGroup; } Y_ABORT(); } @@ -381,4 +386,3 @@ public: }; } // NKikimr - diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_nodemon.cpp b/ydb/core/blobstorage/dsproxy/dsproxy_nodemon.cpp index a0ae27f7060..9cad7f755cd 100644 --- a/ydb/core/blobstorage/dsproxy/dsproxy_nodemon.cpp +++ b/ydb/core/blobstorage/dsproxy/dsproxy_nodemon.cpp @@ -60,6 +60,7 @@ TDsProxyNodeMon::TDsProxyNodeMon(TIntrusivePtr<::NMonitoring::TDynamicCounters> RestartIndexRestoreGet = group->GetCounter("EvIndexRestoreGet", true); RestartStatus = group->GetCounter("EvStatus", true); RestartAssimilate = group->GetCounter("EvAssimilate", true); + RestartCheckIntegrity = group->GetCounter("EvCheckIntegrity", true); } { diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_nodemon.h b/ydb/core/blobstorage/dsproxy/dsproxy_nodemon.h index 3ed2154c73f..d9e4fa1ac0a 100644 --- a/ydb/core/blobstorage/dsproxy/dsproxy_nodemon.h +++ b/ydb/core/blobstorage/dsproxy/dsproxy_nodemon.h @@ -77,6 +77,7 @@ struct TDsProxyNodeMon : public TThrRefBase { ::NMonitoring::TDynamicCounters::TCounterPtr RestartStatus; ::NMonitoring::TDynamicCounters::TCounterPtr RestartPatch; ::NMonitoring::TDynamicCounters::TCounterPtr RestartAssimilate; + ::NMonitoring::TDynamicCounters::TCounterPtr RestartCheckIntegrity; std::array<::NMonitoring::TDynamicCounters::TCounterPtr, 4> RestartHisto; @@ -130,4 +131,3 @@ struct TDsProxyNodeMon : public TThrRefBase { }; } // NKikimr - diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_nodemonactor.cpp b/ydb/core/blobstorage/dsproxy/dsproxy_nodemonactor.cpp index 0c86e35abe4..b1c4c8a700e 100644 --- a/ydb/core/blobstorage/dsproxy/dsproxy_nodemonactor.cpp +++ b/ydb/core/blobstorage/dsproxy/dsproxy_nodemonactor.cpp @@ -107,4 +107,3 @@ IActor* CreateDsProxyNodeMon(TIntrusivePtr<TDsProxyNodeMon> mon) { } } // NKikimr - diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_request.cpp b/ydb/core/blobstorage/dsproxy/dsproxy_request.cpp index 80768959533..50101eacec5 100644 --- a/ydb/core/blobstorage/dsproxy/dsproxy_request.cpp +++ b/ydb/core/blobstorage/dsproxy/dsproxy_request.cpp @@ -42,6 +42,7 @@ namespace NKikimr { EnsureMonitoring(true); LWTRACK(DSProxyGetHandle, ev->Get()->Orbit); EnableWilsonTracing(ev, Mon->GetSamplePPM); + if (ev->Get()->IsIndexOnly) { Mon->EventIndexRestoreGet->Inc(); PushRequest(CreateBlobStorageGroupIndexRestoreGetRequest( @@ -251,6 +252,30 @@ namespace NKikimr { } } + void TBlobStorageGroupProxy::HandleNormal(TEvBlobStorage::TEvCheckIntegrity::TPtr &ev) { + EnsureMonitoring(true); + + Mon->EventCheckIntegrity->Inc(); + PushRequest(CreateBlobStorageGroupCheckIntegrityRequest( + TBlobStorageGroupCheckIntegrityParameters{ + .Common = { + .GroupInfo = Info, + .GroupQueues = Sessions->GroupQueues, + .Mon = Mon, + .Source = ev->Sender, + .Cookie = ev->Cookie, + .Now = TActivationContext::Monotonic(), + .StoragePoolCounters = StoragePoolCounters, + .RestartCounter = ev->Get()->RestartCounter, + .TraceId = std::move(ev->TraceId), + .Event = ev->Get(), + .ExecutionRelay = ev->Get()->ExecutionRelay, + } + }), + ev->Get()->Deadline + ); + } + void TBlobStorageGroupProxy::HandleNormal(TEvBlobStorage::TEvBlock::TPtr &ev) { EnsureMonitoring(ev->Get()->IsMonitored); Mon->EventBlock->Inc(); diff --git a/ydb/core/blobstorage/dsproxy/mock/dsproxy_mock.cpp b/ydb/core/blobstorage/dsproxy/mock/dsproxy_mock.cpp index 1e3da2b8424..912c6eb1a67 100644 --- a/ydb/core/blobstorage/dsproxy/mock/dsproxy_mock.cpp +++ b/ydb/core/blobstorage/dsproxy/mock/dsproxy_mock.cpp @@ -65,6 +65,11 @@ namespace NKikimr { Send(ev->Sender, CopyExecutionRelay(ev->Get(), Model->Handle(ev->Get())), 0, ev->Cookie); } + void Handle(TEvBlobStorage::TEvCheckIntegrity::TPtr& ev) { + STLOG(PRI_DEBUG, BS_PROXY, BSPM12, "TEvCheckIntegrity", (Msg, ev->Get()->ToString())); + Send(ev->Sender, CopyExecutionRelay(ev->Get(), Model->Handle(ev->Get())), 0, ev->Cookie); + } + template<typename TOut, typename TIn> TOut *CopyExecutionRelay(TIn *in, TOut *out) { out->ExecutionRelay = std::move(in->ExecutionRelay); @@ -92,6 +97,7 @@ namespace NKikimr { hFunc(TEvBlobStorage::TEvCollectGarbage, Handle); hFunc(TEvBlobStorage::TEvStatus, Handle); hFunc(TEvBlobStorage::TEvPatch, Handle); + hFunc(TEvBlobStorage::TEvCheckIntegrity, Handle); hFunc(TEvents::TEvPoisonPill, HandlePoison); hFunc(TEvBlobStorage::TEvConfigureProxy, Handle); diff --git a/ydb/core/blobstorage/dsproxy/mock/model.h b/ydb/core/blobstorage/dsproxy/mock/model.h index c9e744347cd..f49fa1d6544 100644 --- a/ydb/core/blobstorage/dsproxy/mock/model.h +++ b/ydb/core/blobstorage/dsproxy/mock/model.h @@ -413,6 +413,14 @@ namespace NFake { msg->RecordGeneration, msg->PerGenerationCounter, msg->Channel); } + TEvBlobStorage::TEvCheckIntegrityResult* Handle(TEvBlobStorage::TEvCheckIntegrity *msg) { + auto* result = new TEvBlobStorage::TEvCheckIntegrityResult(NKikimrProto::OK); + result->Id = msg->Id; + result->PlacementStatus = TEvBlobStorage::TEvCheckIntegrityResult::PS_UNKNOWN; + result->DataStatus = TEvBlobStorage::TEvCheckIntegrityResult::DS_UNKNOWN; + return result; + } + public: // Non-event model interaction methods TStorageStatusFlags GetStorageStatusFlags() const noexcept { return StorageStatusFlags; diff --git a/ydb/core/blobstorage/dsproxy/ut_fat/dsproxy_ut.cpp b/ydb/core/blobstorage/dsproxy/ut_fat/dsproxy_ut.cpp index ad735311744..45a80e973cb 100644 --- a/ydb/core/blobstorage/dsproxy/ut_fat/dsproxy_ut.cpp +++ b/ydb/core/blobstorage/dsproxy/ut_fat/dsproxy_ut.cpp @@ -225,6 +225,7 @@ protected: , MessageStatusResult , MessageBlockResult , MessageGetBlockResult + , MessageCheckIntegrityResult , MessageStartProfilerResult , MessageStopProfilerResult , MessageVStatusResult @@ -516,6 +517,14 @@ protected: ActTestFSM(ctx); } + void HandleCheckIntegrityResult(TEvBlobStorage::TEvCheckIntegrityResult::TPtr &ev, const TActorContext &ctx) { + LastResponse.Message = TResponseData::MessageCheckIntegrityResult; + TEvBlobStorage::TEvCheckIntegrityResult *msg = ev->Get(); + VERBOSE_COUT("HandleCheckIntegrityResult: " << StatusToString(msg->Status)); + LastResponse.Status = msg->Status; + ActTestFSM(ctx); + } + void HandleVGetResult(TEvBlobStorage::TEvVGetResult::TPtr &ev, const TActorContext &ctx) { LastResponse.Message = TResponseData::MessageVGetResult; const NKikimrBlobStorage::TEvVGetResult &record = ev->Get()->Record; @@ -668,6 +677,7 @@ public: HFunc(TEvBlobStorage::TEvCollectGarbageResult, HandleCollectGarbageResult); HFunc(TEvBlobStorage::TEvBlockResult, HandleBlockResult); HFunc(TEvBlobStorage::TEvGetBlockResult, HandleGetBlockResult); + HFunc(TEvBlobStorage::TEvCheckIntegrityResult, HandleCheckIntegrityResult); HFunc(TEvProfiler::TEvStartResult, HandleStartProfilerResult); HFunc(TEvProfiler::TEvStopResult, HandleStopProfilerResult); HFunc(TEvProxyQueueState, HandleProxyQueueState); diff --git a/ydb/core/blobstorage/dsproxy/ya.make b/ydb/core/blobstorage/dsproxy/ya.make index 3ed67cf9929..e0b3f0dddf0 100644 --- a/ydb/core/blobstorage/dsproxy/ya.make +++ b/ydb/core/blobstorage/dsproxy/ya.make @@ -13,6 +13,7 @@ SRCS( dsproxy_assimilate.cpp dsproxy_block.cpp dsproxy_collect.cpp + dsproxy_check_integrity_get.cpp dsproxy_discover.cpp dsproxy_discover_m3dc.cpp dsproxy_discover_m3of4.cpp diff --git a/ydb/library/services/services.proto b/ydb/library/services/services.proto index d2d16afe39f..3cbd2d878cd 100644 --- a/ydb/library/services/services.proto +++ b/ydb/library/services/services.proto @@ -55,6 +55,7 @@ enum EServiceKikimr { BS_VDISK_BALANCING = 2600; BS_PROXY_GETBLOCK = 2601; BS_SHRED = 2602; + BS_PROXY_CHECKINTEGRITY = 2603; // DATASHARD section // TX_DATASHARD = 290; // @@ -1086,5 +1087,6 @@ message TActivity { DATA_ERASURE = 658; TENANT_DATA_ERASURE = 659; BS_SYNC_BROKER = 660; + BS_PROXY_CHECKINTEGRITY_ACTOR = 661; }; }; |