aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAleksandr Dmitriev <monster@ydb.tech>2025-04-28 23:45:22 +0300
committerGitHub <noreply@github.com>2025-04-28 23:45:22 +0300
commit86255fd7869f0b22f7e48cf2146156eca530352f (patch)
tree92a9af7703c46045a86a4f478b7b029092042e99
parent0514f287ac7ea44cfde374f3e08fbbc4eaa8920e (diff)
downloadydb-86255fd7869f0b22f7e48cf2146156eca530352f.tar.gz
introduce new CheckIntegrity query in DSProxy (#17626)
-rw-r--r--ydb/core/base/blobstorage.cpp13
-rw-r--r--ydb/core/base/blobstorage.h92
-rw-r--r--ydb/core/blobstorage/dsproxy/dsproxy.h11
-rw-r--r--ydb/core/blobstorage/dsproxy/dsproxy_check_integrity_get.cpp198
-rw-r--r--ydb/core/blobstorage/dsproxy/dsproxy_impl.h4
-rw-r--r--ydb/core/blobstorage/dsproxy/dsproxy_mon.cpp5
-rw-r--r--ydb/core/blobstorage/dsproxy/dsproxy_mon.h8
-rw-r--r--ydb/core/blobstorage/dsproxy/dsproxy_nodemon.cpp1
-rw-r--r--ydb/core/blobstorage/dsproxy/dsproxy_nodemon.h2
-rw-r--r--ydb/core/blobstorage/dsproxy/dsproxy_nodemonactor.cpp1
-rw-r--r--ydb/core/blobstorage/dsproxy/dsproxy_request.cpp25
-rw-r--r--ydb/core/blobstorage/dsproxy/mock/dsproxy_mock.cpp6
-rw-r--r--ydb/core/blobstorage/dsproxy/mock/model.h8
-rw-r--r--ydb/core/blobstorage/dsproxy/ut_fat/dsproxy_ut.cpp10
-rw-r--r--ydb/core/blobstorage/dsproxy/ya.make1
-rw-r--r--ydb/library/services/services.proto2
16 files changed, 382 insertions, 5 deletions
diff --git a/ydb/core/base/blobstorage.cpp b/ydb/core/base/blobstorage.cpp
index 765dd34d351..dad40ebca01 100644
--- a/ydb/core/base/blobstorage.cpp
+++ b/ydb/core/base/blobstorage.cpp
@@ -105,6 +105,19 @@ std::unique_ptr<TEvBlobStorage::TEvGetResult> TEvBlobStorage::TEvGet::MakeErrorR
return res;
}
+void TEvBlobStorage::TEvCheckIntegrity::ToSpan(NWilson::TSpan& span) const {
+ span
+ .Attribute("Id", Id.ToString())
+ .Attribute("GetHandleClass", NKikimrBlobStorage::EGetHandleClass_Name(GetHandleClass));
+}
+
+std::unique_ptr<TEvBlobStorage::TEvCheckIntegrityResult> TEvBlobStorage::TEvCheckIntegrity::MakeErrorResponse(
+ NKikimrProto::EReplyStatus status, const TString& errorReason, TGroupId /*groupId*/) {
+ auto res = std::make_unique<TEvCheckIntegrityResult>(status);
+ res->ErrorReason = errorReason;
+ return res;
+}
+
void TEvBlobStorage::TEvBlock::ToSpan(NWilson::TSpan& span) const {
span
.Attribute("TabletId", ::ToString(TabletId))
diff --git a/ydb/core/base/blobstorage.h b/ydb/core/base/blobstorage.h
index 295252cdd29..289bf901228 100644
--- a/ydb/core/base/blobstorage.h
+++ b/ydb/core/base/blobstorage.h
@@ -494,6 +494,7 @@ struct TEvBlobStorage {
EvGetQueuesInfo, // for debugging purposes
EvGetBlock,
+ EvCheckIntegrity,
//
EvPutResult = EvPut + 512, /// 268 632 576
@@ -511,6 +512,7 @@ struct TEvBlobStorage {
EvQueuesInfo, // for debugging purposes
EvGetBlockResult,
+ EvCheckIntegrityResult,
// proxy <-> vdisk interface
EvVPut = EvPut + 2 * 512, /// 268 633 088
@@ -951,6 +953,7 @@ struct TEvBlobStorage {
struct TEvPutResult;
struct TEvGetResult;
+ struct TEvCheckIntegrityResult;
struct TEvGetBlockResult;
struct TEvBlockResult;
struct TEvDiscoverResult;
@@ -1367,6 +1370,95 @@ struct TEvBlobStorage {
}
};
+ struct TEvCheckIntegrity : public TEventLocal<TEvCheckIntegrity, EvCheckIntegrity> {
+ TLogoBlobID Id;
+ TInstant Deadline;
+ NKikimrBlobStorage::EGetHandleClass GetHandleClass;
+
+ ui32 RestartCounter = 0;
+ std::shared_ptr<TExecutionRelay> ExecutionRelay;
+
+ TEvCheckIntegrity(
+ const TLogoBlobID& id,
+ TInstant deadline,
+ NKikimrBlobStorage::EGetHandleClass getHandleClass)
+ : Id(id)
+ , Deadline(deadline)
+ , GetHandleClass(getHandleClass)
+ {}
+
+ TString Print(bool /*isFull*/) const {
+ TStringStream str;
+ str << "TEvCheckIntegrity {"
+ << " Id# " << Id
+ << " Deadline# " << Deadline
+ << " GetHandleClass# " << NKikimrBlobStorage::EGetHandleClass_Name(GetHandleClass)
+ << " }";
+ return str.Str();
+ }
+
+ TString ToString() const {
+ return Print(false);
+ }
+
+ ui32 CalculateSize() const {
+ return sizeof(*this);
+ }
+
+ void ToSpan(NWilson::TSpan& span) const;
+
+ std::unique_ptr<TEvCheckIntegrityResult> MakeErrorResponse(
+ NKikimrProto::EReplyStatus status, const TString& errorReason, TGroupId groupId);
+ };
+
+ struct TEvCheckIntegrityResult : public TEventLocal<TEvCheckIntegrityResult, EvCheckIntegrityResult> {
+ TLogoBlobID Id;
+ NKikimrProto::EReplyStatus Status;
+ // OK - we were able to check the integrity
+ // any other status - some problem prevents the check, ErrorReason contains detailed info
+ // for example if the group is disintegrated, the status is ERROR
+
+ TString ErrorReason;
+
+ enum EPlacementStatus {
+ PS_OK = 1, // blob parts are placed according to fail model
+ PS_ERROR = 2, // blob parts are definitely placed incorrectly or there are missing parts for sure
+ PS_UNKNOWN = 3, // status is unknown because of missing disks or network problems
+ PS_NOT_YET = 4, // there are missing parts but status may become OK after replication
+ };
+ EPlacementStatus PlacementStatus;
+
+ // TODO: calculate data status
+ enum EDataStatus {
+ DS_OK = 1, // all data parts contain valid data
+ DS_ERROR = 2, // some parts definitely contain invalid data
+ DS_UNKNOWN = 3, // status is unknown because of missing disks or network problems
+ };
+ EDataStatus DataStatus;
+
+ std::shared_ptr<TExecutionRelay> ExecutionRelay;
+
+ TEvCheckIntegrityResult(NKikimrProto::EReplyStatus status)
+ : Status(status)
+ {}
+
+ TString Print(bool /*isFull*/) const {
+ TStringStream str;
+ str << "TEvCheckIntegrityResult {"
+ << " Id# " << Id
+ << " Status# " << NKikimrProto::EReplyStatus_Name(Status)
+ << " ErrorReason# " << ErrorReason
+ << " PlacementStatus# " << (int)PlacementStatus
+ << " DataStatus# " << (int)DataStatus
+ << " }";
+ return str.Str();
+ }
+
+ TString ToString() const {
+ return Print(false);
+ }
+ };
+
struct TEvGetBlock : public TEventLocal<TEvGetBlock, EvGetBlock> {
const ui64 TabletId;
const TInstant Deadline;
diff --git a/ydb/core/blobstorage/dsproxy/dsproxy.h b/ydb/core/blobstorage/dsproxy/dsproxy.h
index 3b9c0c87f5a..1bf26b4ffab 100644
--- a/ydb/core/blobstorage/dsproxy/dsproxy.h
+++ b/ydb/core/blobstorage/dsproxy/dsproxy.h
@@ -132,6 +132,7 @@ NActors::NLog::EPriority PriorityForStatusInbound(NKikimrProto::EReplyStatus sta
XX(TEvBlobStorage::TEvStatus) \
XX(TEvBlobStorage::TEvPatch) \
XX(TEvBlobStorage::TEvAssimilate) \
+ XX(TEvBlobStorage::TEvCheckIntegrity) \
//
#define DSPROXY_ENUM_DISK_EVENTS(XX) \
@@ -427,6 +428,16 @@ struct TBlobStorageGroupRestoreGetParameters {
};
IActor* CreateBlobStorageGroupIndexRestoreGetRequest(TBlobStorageGroupRestoreGetParameters params);
+struct TBlobStorageGroupCheckIntegrityParameters {
+ TBlobStorageGroupRequestActor::TCommonParameters<TEvBlobStorage::TEvCheckIntegrity> Common;
+ TBlobStorageGroupRequestActor::TTypeSpecificParameters TypeSpecific = {
+ .LogComponent = NKikimrServices::BS_PROXY_CHECKINTEGRITY,
+ .Name = "DSProxy.CheckIntegrity",
+ .Activity = NKikimrServices::TActivity::BS_PROXY_CHECKINTEGRITY_ACTOR,
+ };
+};
+IActor* CreateBlobStorageGroupCheckIntegrityRequest(TBlobStorageGroupCheckIntegrityParameters params);
+
struct TBlobStorageGroupDiscoverParameters {
TBlobStorageGroupRequestActor::TCommonParameters<TEvBlobStorage::TEvDiscover> Common;
TBlobStorageGroupRequestActor::TTypeSpecificParameters TypeSpecific = {
diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_check_integrity_get.cpp b/ydb/core/blobstorage/dsproxy/dsproxy_check_integrity_get.cpp
new file mode 100644
index 00000000000..ec6eb5268b3
--- /dev/null
+++ b/ydb/core/blobstorage/dsproxy/dsproxy_check_integrity_get.cpp
@@ -0,0 +1,198 @@
+#include "dsproxy.h"
+#include "dsproxy_mon.h"
+#include "dsproxy_quorum_tracker.h"
+#include "dsproxy_blob_tracker.h"
+#include <ydb/core/blobstorage/vdisk/common/vdisk_events.h>
+
+namespace NKikimr {
+
+
+class TBlobStorageGroupCheckIntegrityRequest : public TBlobStorageGroupRequestActor {
+ const TLogoBlobID Id;
+ const TInstant Deadline;
+ const NKikimrBlobStorage::EGetHandleClass GetHandleClass;
+
+ TGroupQuorumTracker QuorumTracker;
+ std::unique_ptr<TBlobStatusTracker> BlobStatus; // treats NOT_YET as ERROR
+ std::unique_ptr<TBlobStatusTracker> BlobStatusOptimistic; // treats NOT_YET as currently replicating -> OK in the future
+
+ ui32 VGetsInFlight = 0;
+
+ using TEvCheckIntegrityResult = TEvBlobStorage::TEvCheckIntegrityResult;
+ std::unique_ptr<TEvCheckIntegrityResult> PendingResult;
+
+ void ReplyAndDie(NKikimrProto::EReplyStatus status) override {
+ if (status != NKikimrProto::OK) {
+ PendingResult.reset(new TEvCheckIntegrityResult(status));
+ PendingResult->ErrorReason = ErrorReason;
+ PendingResult->PlacementStatus = TEvCheckIntegrityResult::PS_UNKNOWN;
+ PendingResult->DataStatus = TEvCheckIntegrityResult::DS_UNKNOWN;
+ PendingResult->Id = Id;
+ }
+
+ SendResponseAndDie(std::move(PendingResult));
+ }
+
+ TString DumpBlobStatus() const {
+ TStringStream str;
+ BlobStatus->Output(str, Info.Get());
+ return str.Str();
+ }
+
+ void Handle(TEvBlobStorage::TEvVGetResult::TPtr &ev) {
+ ProcessReplyFromQueue(ev->Get());
+
+ const NKikimrBlobStorage::TEvVGetResult& record = ev->Get()->Record;
+
+ if (!record.HasStatus()) {
+ ErrorReason = "erron in TEvVGetResult - no status";
+ ReplyAndDie(NKikimrProto::ERROR);
+ return;
+ }
+ NKikimrProto::EReplyStatus status = record.GetStatus();
+
+ if (!record.HasVDiskID()) {
+ ErrorReason = "erron in TEvVGetResult - no VDisk id";
+ ReplyAndDie(NKikimrProto::ERROR);
+ return;
+ }
+ const TVDiskID vDiskId = VDiskIDFromVDiskID(record.GetVDiskID());
+
+ Y_ABORT_UNLESS(VGetsInFlight > 0);
+ --VGetsInFlight;
+
+ switch (NKikimrProto::EReplyStatus newStatus = QuorumTracker.ProcessReply(vDiskId, status)) {
+ case NKikimrProto::ERROR:
+ ErrorReason = "Group is disintegrated or has network problems";
+ ReplyAndDie(NKikimrProto::ERROR);
+ return;
+
+ case NKikimrProto::OK:
+ case NKikimrProto::UNKNOWN:
+ break;
+
+ default:
+ Y_ABORT("unexpected newStatus# %s", NKikimrProto::EReplyStatus_Name(newStatus).data());
+ }
+
+ for (size_t i = 0; i < record.ResultSize(); ++i) {
+ const auto& result = record.GetResult(i);
+ BlobStatus->UpdateFromResponseData(result, vDiskId, Info.Get());
+
+ if (result.GetStatus() == NKikimrProto::NOT_YET) {
+ NKikimrBlobStorage::TQueryResult okResult = result;
+ okResult.SetStatus(NKikimrProto::OK);
+ BlobStatusOptimistic->UpdateFromResponseData(okResult, vDiskId, Info.Get());
+ } else {
+ BlobStatusOptimistic->UpdateFromResponseData(result, vDiskId, Info.Get());
+ }
+ }
+
+ if (!VGetsInFlight) {
+ Analyze();
+ }
+ }
+
+ void Analyze() {
+ PendingResult.reset(new TEvBlobStorage::TEvCheckIntegrityResult(NKikimrProto::OK));
+ PendingResult->Id = Id;
+ PendingResult->DataStatus = TEvCheckIntegrityResult::DS_UNKNOWN; // TODO
+
+ TBlobStorageGroupInfo::EBlobState state = BlobStatus->GetBlobState(Info.Get(), nullptr);
+
+ switch (state) {
+ case TBlobStorageGroupInfo::EBS_DISINTEGRATED:
+ ErrorReason = "Group is disintegrated or has network problems";
+ ReplyAndDie(NKikimrProto::ERROR);
+ return;
+
+ case TBlobStorageGroupInfo::EBS_FULL:
+ PendingResult->PlacementStatus = TEvCheckIntegrityResult::PS_OK;
+ break;
+
+ case TBlobStorageGroupInfo::EBS_UNRECOVERABLE_FRAGMENTARY:
+ PendingResult->PlacementStatus = TEvCheckIntegrityResult::PS_ERROR;
+ break;
+
+ case TBlobStorageGroupInfo::EBS_RECOVERABLE_FRAGMENTARY: {
+ TBlobStorageGroupInfo::EBlobState stateOptimistic =
+ BlobStatusOptimistic->GetBlobState(Info.Get(), nullptr);
+
+ if (stateOptimistic == TBlobStorageGroupInfo::EBS_FULL) {
+ PendingResult->PlacementStatus = TEvCheckIntegrityResult::PS_NOT_YET;
+ } else {
+ PendingResult->PlacementStatus = TEvCheckIntegrityResult::PS_ERROR;
+ }
+ break;
+ }
+ case TBlobStorageGroupInfo::EBS_RECOVERABLE_DOUBTED:
+ PendingResult->PlacementStatus = TEvCheckIntegrityResult::PS_UNKNOWN;
+ break;
+ }
+
+ ReplyAndDie(NKikimrProto::OK);
+ }
+
+ std::unique_ptr<IEventBase> RestartQuery(ui32 counter) override {
+ ++*Mon->NodeMon->RestartCheckIntegrity;
+
+ auto ev = std::make_unique<TEvBlobStorage::TEvCheckIntegrity>(
+ Id, Deadline, GetHandleClass);
+ ev->RestartCounter = counter;
+ return ev;
+ }
+
+public:
+ ::NMonitoring::TDynamicCounters::TCounterPtr& GetActiveCounter() const override {
+ return Mon->ActiveCheckIntegrity;
+ }
+
+ ERequestType GetRequestType() const override {
+ return ERequestType::CheckIntegrity;
+ }
+
+ TBlobStorageGroupCheckIntegrityRequest(TBlobStorageGroupCheckIntegrityParameters& params)
+ : TBlobStorageGroupRequestActor(params)
+ , Id(params.Common.Event->Id)
+ , Deadline(params.Common.Event->Deadline)
+ , GetHandleClass(params.Common.Event->GetHandleClass)
+ , QuorumTracker(Info.Get())
+ {}
+
+ void Bootstrap() override {
+ BlobStatus.reset(new TBlobStatusTracker(Id, Info.Get()));
+ BlobStatusOptimistic.reset(new TBlobStatusTracker(Id, Info.Get()));
+
+ for (const auto& vdisk : Info->GetVDisks()) {
+ auto vDiskId = Info->GetVDiskId(vdisk.OrderNumber);
+
+ if (!Info->BelongsToSubgroup(vDiskId, Id.Hash())) {
+ continue;
+ }
+
+ auto vGet = TEvBlobStorage::TEvVGet::CreateExtremeDataQuery(
+ vDiskId, Deadline, GetHandleClass, TEvBlobStorage::TEvVGet::EFlags::ShowInternals);
+ vGet->AddExtremeQuery(Id, 0, 0);
+
+ SendToQueue(std::move(vGet), 0);
+ ++VGetsInFlight;
+ }
+
+ Become(&TBlobStorageGroupCheckIntegrityRequest::StateWait);
+ }
+
+ STATEFN(StateWait) {
+ if (ProcessEvent(ev)) {
+ return;
+ }
+ switch (ev->GetTypeRewrite()) {
+ hFunc(TEvBlobStorage::TEvVGetResult, Handle);
+ }
+ }
+};
+
+IActor* CreateBlobStorageGroupCheckIntegrityRequest(TBlobStorageGroupCheckIntegrityParameters params) {
+ return new TBlobStorageGroupCheckIntegrityRequest(params);
+}
+
+} //NKikimr
diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_impl.h b/ydb/core/blobstorage/dsproxy/dsproxy_impl.h
index 713179af159..3f185a99d8b 100644
--- a/ydb/core/blobstorage/dsproxy/dsproxy_impl.h
+++ b/ydb/core/blobstorage/dsproxy/dsproxy_impl.h
@@ -153,6 +153,8 @@ class TBlobStorageGroupProxy : public TActorBootstrapped<TBlobStorageGroupProxy>
Mon->EventPatch->Inc();
} else if constexpr (std::is_same_v<TEvent, TEvBlobStorage::TEvAssimilate>) {
Mon->EventAssimilate->Inc();
+ } else if constexpr (std::is_same_v<TEvent, TEvBlobStorage::TEvCheckIntegrity>) {
+ Mon->EventCheckIntegrity->Inc();
}
}
@@ -270,6 +272,7 @@ class TBlobStorageGroupProxy : public TActorBootstrapped<TBlobStorageGroupProxy>
void HandleNormal(TEvBlobStorage::TEvCollectGarbage::TPtr &ev);
void HandleNormal(TEvBlobStorage::TEvStatus::TPtr &ev);
void HandleNormal(TEvBlobStorage::TEvAssimilate::TPtr &ev);
+ void HandleNormal(TEvBlobStorage::TEvCheckIntegrity::TPtr &ev);
void Handle(TEvBlobStorage::TEvBunchOfEvents::TPtr ev);
void Handle(TEvDeathNote::TPtr ev);
void Handle(TEvGetQueuesInfo::TPtr ev);
@@ -380,6 +383,7 @@ public:
hFunc(TEvBlobStorage::TEvStatus, HANDLER); \
hFunc(TEvBlobStorage::TEvPatch, HANDLER); \
hFunc(TEvBlobStorage::TEvAssimilate, HANDLER); \
+ hFunc(TEvBlobStorage::TEvCheckIntegrity, HANDLER); \
/**/
STFUNC(StateUnconfigured) {
diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_mon.cpp b/ydb/core/blobstorage/dsproxy/dsproxy_mon.cpp
index 22ca4793da6..2136161a646 100644
--- a/ydb/core/blobstorage/dsproxy/dsproxy_mon.cpp
+++ b/ydb/core/blobstorage/dsproxy/dsproxy_mon.cpp
@@ -49,6 +49,7 @@ TBlobStorageGroupProxyMon::TBlobStorageGroupProxyMon(const TIntrusivePtr<::NMoni
EventStopGetBatching = EventGroup->GetCounter("EvStopGetBatching", true);
EventPatch = EventGroup->GetCounter("EvPatch", true);
EventAssimilate = EventGroup->GetCounter("EvAssimilate", true);
+ EventCheckIntegrity = EventGroup->GetCounter("EvCheckIntegrity", true);
PutsSentViaPutBatching = EventGroup->GetCounter("PutsSentViaPutBatching", true);
PutBatchesSent = EventGroup->GetCounter("PutBatchesSent", true);
@@ -76,6 +77,7 @@ TBlobStorageGroupProxyMon::TBlobStorageGroupProxyMon(const TIntrusivePtr<::NMoni
ActiveStatus = ActiveRequestsGroup->GetCounter("ActiveStatus");
ActivePatch = ActiveRequestsGroup->GetCounter("ActivePatch");
ActiveAssimilate = ActiveRequestsGroup->GetCounter("ActiveAssimilate");
+ ActiveCheckIntegrity = ActiveRequestsGroup->GetCounter("ActiveCheckIntegrity");
// special patch counters
VPatchContinueFailed = ActiveRequestsGroup->GetCounter("VPatchContinueFailed");
@@ -94,6 +96,7 @@ TBlobStorageGroupProxyMon::TBlobStorageGroupProxyMon(const TIntrusivePtr<::NMoni
StatusGroup.Init(group->GetSubgroup("request", "status"));
AssimilateGroup.Init(group->GetSubgroup("request", "assimilate"));
BlockGroup.Init(group->GetSubgroup("request", "block"));
+ CheckIntegrityGroup.Init(group->GetSubgroup("request", "checkIntegrity"));
}
ActiveMultiGet = ActiveRequestsGroup->GetCounter("ActiveMultiGet");
@@ -111,6 +114,7 @@ TBlobStorageGroupProxyMon::TBlobStorageGroupProxyMon(const TIntrusivePtr<::NMoni
RespStatStatus.emplace(respStatGroup->GetSubgroup("request", "status"));
RespStatPatch.emplace(respStatGroup->GetSubgroup("request", "patch"));
RespStatAssimilate.emplace(respStatGroup->GetSubgroup("request", "assimilate"));
+ RespStatCheckIntegrity.emplace(respStatGroup->GetSubgroup("request", "checkIntegrity"));
}
void TBlobStorageGroupProxyMon::BecomeFull() {
@@ -216,4 +220,3 @@ void TBlobStorageGroupProxyMon::ThroughputUpdate() {
} // NKikimr
-
diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_mon.h b/ydb/core/blobstorage/dsproxy/dsproxy_mon.h
index 96911c6fb15..8a90ebd5efb 100644
--- a/ydb/core/blobstorage/dsproxy/dsproxy_mon.h
+++ b/ydb/core/blobstorage/dsproxy/dsproxy_mon.h
@@ -31,6 +31,7 @@ enum class ERequestType {
Assimilate,
Block,
GetBlock,
+ CheckIntegrity,
};
struct TRequestMonGroup {
@@ -198,6 +199,7 @@ protected:
TRequestMonGroup AssimilateGroup;
TRequestMonGroup BlockGroup;
TRequestMonGroup GetBlockGroup;
+ TRequestMonGroup CheckIntegrityGroup;
public:
TBlobStorageGroupProxyTimeStats TimeStats;
@@ -224,6 +226,7 @@ public:
::NMonitoring::TDynamicCounters::TCounterPtr EventStopGetBatching;
::NMonitoring::TDynamicCounters::TCounterPtr EventPatch;
::NMonitoring::TDynamicCounters::TCounterPtr EventAssimilate;
+ ::NMonitoring::TDynamicCounters::TCounterPtr EventCheckIntegrity;
::NMonitoring::TDynamicCounters::TCounterPtr PutsSentViaPutBatching;
::NMonitoring::TDynamicCounters::TCounterPtr PutBatchesSent;
@@ -245,6 +248,7 @@ public:
::NMonitoring::TDynamicCounters::TCounterPtr ActiveStatus;
::NMonitoring::TDynamicCounters::TCounterPtr ActivePatch;
::NMonitoring::TDynamicCounters::TCounterPtr ActiveAssimilate;
+ ::NMonitoring::TDynamicCounters::TCounterPtr ActiveCheckIntegrity;
std::optional<TResponseStatusGroup> RespStatPut;
std::optional<TResponseStatusGroup> RespStatGet;
@@ -256,6 +260,7 @@ public:
std::optional<TResponseStatusGroup> RespStatStatus;
std::optional<TResponseStatusGroup> RespStatPatch;
std::optional<TResponseStatusGroup> RespStatAssimilate;
+ std::optional<TResponseStatusGroup> RespStatCheckIntegrity;
// special patch counters
::NMonitoring::TDynamicCounters::TCounterPtr VPatchContinueFailed;
@@ -274,7 +279,7 @@ public:
case ERequestType::Assimilate: return AssimilateGroup;
case ERequestType::Block: return BlockGroup;
case ERequestType::GetBlock: return GetBlockGroup;
-
+ case ERequestType::CheckIntegrity: return CheckIntegrityGroup;
}
Y_ABORT();
}
@@ -381,4 +386,3 @@ public:
};
} // NKikimr
-
diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_nodemon.cpp b/ydb/core/blobstorage/dsproxy/dsproxy_nodemon.cpp
index a0ae27f7060..9cad7f755cd 100644
--- a/ydb/core/blobstorage/dsproxy/dsproxy_nodemon.cpp
+++ b/ydb/core/blobstorage/dsproxy/dsproxy_nodemon.cpp
@@ -60,6 +60,7 @@ TDsProxyNodeMon::TDsProxyNodeMon(TIntrusivePtr<::NMonitoring::TDynamicCounters>
RestartIndexRestoreGet = group->GetCounter("EvIndexRestoreGet", true);
RestartStatus = group->GetCounter("EvStatus", true);
RestartAssimilate = group->GetCounter("EvAssimilate", true);
+ RestartCheckIntegrity = group->GetCounter("EvCheckIntegrity", true);
}
{
diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_nodemon.h b/ydb/core/blobstorage/dsproxy/dsproxy_nodemon.h
index 3ed2154c73f..d9e4fa1ac0a 100644
--- a/ydb/core/blobstorage/dsproxy/dsproxy_nodemon.h
+++ b/ydb/core/blobstorage/dsproxy/dsproxy_nodemon.h
@@ -77,6 +77,7 @@ struct TDsProxyNodeMon : public TThrRefBase {
::NMonitoring::TDynamicCounters::TCounterPtr RestartStatus;
::NMonitoring::TDynamicCounters::TCounterPtr RestartPatch;
::NMonitoring::TDynamicCounters::TCounterPtr RestartAssimilate;
+ ::NMonitoring::TDynamicCounters::TCounterPtr RestartCheckIntegrity;
std::array<::NMonitoring::TDynamicCounters::TCounterPtr, 4> RestartHisto;
@@ -130,4 +131,3 @@ struct TDsProxyNodeMon : public TThrRefBase {
};
} // NKikimr
-
diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_nodemonactor.cpp b/ydb/core/blobstorage/dsproxy/dsproxy_nodemonactor.cpp
index 0c86e35abe4..b1c4c8a700e 100644
--- a/ydb/core/blobstorage/dsproxy/dsproxy_nodemonactor.cpp
+++ b/ydb/core/blobstorage/dsproxy/dsproxy_nodemonactor.cpp
@@ -107,4 +107,3 @@ IActor* CreateDsProxyNodeMon(TIntrusivePtr<TDsProxyNodeMon> mon) {
}
} // NKikimr
-
diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_request.cpp b/ydb/core/blobstorage/dsproxy/dsproxy_request.cpp
index 80768959533..50101eacec5 100644
--- a/ydb/core/blobstorage/dsproxy/dsproxy_request.cpp
+++ b/ydb/core/blobstorage/dsproxy/dsproxy_request.cpp
@@ -42,6 +42,7 @@ namespace NKikimr {
EnsureMonitoring(true);
LWTRACK(DSProxyGetHandle, ev->Get()->Orbit);
EnableWilsonTracing(ev, Mon->GetSamplePPM);
+
if (ev->Get()->IsIndexOnly) {
Mon->EventIndexRestoreGet->Inc();
PushRequest(CreateBlobStorageGroupIndexRestoreGetRequest(
@@ -251,6 +252,30 @@ namespace NKikimr {
}
}
+ void TBlobStorageGroupProxy::HandleNormal(TEvBlobStorage::TEvCheckIntegrity::TPtr &ev) {
+ EnsureMonitoring(true);
+
+ Mon->EventCheckIntegrity->Inc();
+ PushRequest(CreateBlobStorageGroupCheckIntegrityRequest(
+ TBlobStorageGroupCheckIntegrityParameters{
+ .Common = {
+ .GroupInfo = Info,
+ .GroupQueues = Sessions->GroupQueues,
+ .Mon = Mon,
+ .Source = ev->Sender,
+ .Cookie = ev->Cookie,
+ .Now = TActivationContext::Monotonic(),
+ .StoragePoolCounters = StoragePoolCounters,
+ .RestartCounter = ev->Get()->RestartCounter,
+ .TraceId = std::move(ev->TraceId),
+ .Event = ev->Get(),
+ .ExecutionRelay = ev->Get()->ExecutionRelay,
+ }
+ }),
+ ev->Get()->Deadline
+ );
+ }
+
void TBlobStorageGroupProxy::HandleNormal(TEvBlobStorage::TEvBlock::TPtr &ev) {
EnsureMonitoring(ev->Get()->IsMonitored);
Mon->EventBlock->Inc();
diff --git a/ydb/core/blobstorage/dsproxy/mock/dsproxy_mock.cpp b/ydb/core/blobstorage/dsproxy/mock/dsproxy_mock.cpp
index 1e3da2b8424..912c6eb1a67 100644
--- a/ydb/core/blobstorage/dsproxy/mock/dsproxy_mock.cpp
+++ b/ydb/core/blobstorage/dsproxy/mock/dsproxy_mock.cpp
@@ -65,6 +65,11 @@ namespace NKikimr {
Send(ev->Sender, CopyExecutionRelay(ev->Get(), Model->Handle(ev->Get())), 0, ev->Cookie);
}
+ void Handle(TEvBlobStorage::TEvCheckIntegrity::TPtr& ev) {
+ STLOG(PRI_DEBUG, BS_PROXY, BSPM12, "TEvCheckIntegrity", (Msg, ev->Get()->ToString()));
+ Send(ev->Sender, CopyExecutionRelay(ev->Get(), Model->Handle(ev->Get())), 0, ev->Cookie);
+ }
+
template<typename TOut, typename TIn>
TOut *CopyExecutionRelay(TIn *in, TOut *out) {
out->ExecutionRelay = std::move(in->ExecutionRelay);
@@ -92,6 +97,7 @@ namespace NKikimr {
hFunc(TEvBlobStorage::TEvCollectGarbage, Handle);
hFunc(TEvBlobStorage::TEvStatus, Handle);
hFunc(TEvBlobStorage::TEvPatch, Handle);
+ hFunc(TEvBlobStorage::TEvCheckIntegrity, Handle);
hFunc(TEvents::TEvPoisonPill, HandlePoison);
hFunc(TEvBlobStorage::TEvConfigureProxy, Handle);
diff --git a/ydb/core/blobstorage/dsproxy/mock/model.h b/ydb/core/blobstorage/dsproxy/mock/model.h
index c9e744347cd..f49fa1d6544 100644
--- a/ydb/core/blobstorage/dsproxy/mock/model.h
+++ b/ydb/core/blobstorage/dsproxy/mock/model.h
@@ -413,6 +413,14 @@ namespace NFake {
msg->RecordGeneration, msg->PerGenerationCounter, msg->Channel);
}
+ TEvBlobStorage::TEvCheckIntegrityResult* Handle(TEvBlobStorage::TEvCheckIntegrity *msg) {
+ auto* result = new TEvBlobStorage::TEvCheckIntegrityResult(NKikimrProto::OK);
+ result->Id = msg->Id;
+ result->PlacementStatus = TEvBlobStorage::TEvCheckIntegrityResult::PS_UNKNOWN;
+ result->DataStatus = TEvBlobStorage::TEvCheckIntegrityResult::DS_UNKNOWN;
+ return result;
+ }
+
public: // Non-event model interaction methods
TStorageStatusFlags GetStorageStatusFlags() const noexcept {
return StorageStatusFlags;
diff --git a/ydb/core/blobstorage/dsproxy/ut_fat/dsproxy_ut.cpp b/ydb/core/blobstorage/dsproxy/ut_fat/dsproxy_ut.cpp
index ad735311744..45a80e973cb 100644
--- a/ydb/core/blobstorage/dsproxy/ut_fat/dsproxy_ut.cpp
+++ b/ydb/core/blobstorage/dsproxy/ut_fat/dsproxy_ut.cpp
@@ -225,6 +225,7 @@ protected:
, MessageStatusResult
, MessageBlockResult
, MessageGetBlockResult
+ , MessageCheckIntegrityResult
, MessageStartProfilerResult
, MessageStopProfilerResult
, MessageVStatusResult
@@ -516,6 +517,14 @@ protected:
ActTestFSM(ctx);
}
+ void HandleCheckIntegrityResult(TEvBlobStorage::TEvCheckIntegrityResult::TPtr &ev, const TActorContext &ctx) {
+ LastResponse.Message = TResponseData::MessageCheckIntegrityResult;
+ TEvBlobStorage::TEvCheckIntegrityResult *msg = ev->Get();
+ VERBOSE_COUT("HandleCheckIntegrityResult: " << StatusToString(msg->Status));
+ LastResponse.Status = msg->Status;
+ ActTestFSM(ctx);
+ }
+
void HandleVGetResult(TEvBlobStorage::TEvVGetResult::TPtr &ev, const TActorContext &ctx) {
LastResponse.Message = TResponseData::MessageVGetResult;
const NKikimrBlobStorage::TEvVGetResult &record = ev->Get()->Record;
@@ -668,6 +677,7 @@ public:
HFunc(TEvBlobStorage::TEvCollectGarbageResult, HandleCollectGarbageResult);
HFunc(TEvBlobStorage::TEvBlockResult, HandleBlockResult);
HFunc(TEvBlobStorage::TEvGetBlockResult, HandleGetBlockResult);
+ HFunc(TEvBlobStorage::TEvCheckIntegrityResult, HandleCheckIntegrityResult);
HFunc(TEvProfiler::TEvStartResult, HandleStartProfilerResult);
HFunc(TEvProfiler::TEvStopResult, HandleStopProfilerResult);
HFunc(TEvProxyQueueState, HandleProxyQueueState);
diff --git a/ydb/core/blobstorage/dsproxy/ya.make b/ydb/core/blobstorage/dsproxy/ya.make
index 3ed67cf9929..e0b3f0dddf0 100644
--- a/ydb/core/blobstorage/dsproxy/ya.make
+++ b/ydb/core/blobstorage/dsproxy/ya.make
@@ -13,6 +13,7 @@ SRCS(
dsproxy_assimilate.cpp
dsproxy_block.cpp
dsproxy_collect.cpp
+ dsproxy_check_integrity_get.cpp
dsproxy_discover.cpp
dsproxy_discover_m3dc.cpp
dsproxy_discover_m3of4.cpp
diff --git a/ydb/library/services/services.proto b/ydb/library/services/services.proto
index d2d16afe39f..3cbd2d878cd 100644
--- a/ydb/library/services/services.proto
+++ b/ydb/library/services/services.proto
@@ -55,6 +55,7 @@ enum EServiceKikimr {
BS_VDISK_BALANCING = 2600;
BS_PROXY_GETBLOCK = 2601;
BS_SHRED = 2602;
+ BS_PROXY_CHECKINTEGRITY = 2603;
// DATASHARD section //
TX_DATASHARD = 290; //
@@ -1086,5 +1087,6 @@ message TActivity {
DATA_ERASURE = 658;
TENANT_DATA_ERASURE = 659;
BS_SYNC_BROKER = 660;
+ BS_PROXY_CHECKINTEGRITY_ACTOR = 661;
};
};