diff options
author | Daniil Cherednik <dan.cherednik@gmail.com> | 2022-10-22 22:03:02 +0300 |
---|---|---|
committer | Daniil Cherednik <dan.cherednik@gmail.com> | 2022-10-22 22:03:02 +0300 |
commit | 11bc4015b8010ae201bf3eb33db7dba425aca35e (patch) | |
tree | d588389611dfce6ca3cfaf78152eef9c205ca630 | |
parent | 0b931ad6e6868bca7e7ec4617999c0d6befd7003 (diff) | |
download | ydb-11bc4015b8010ae201bf3eb33db7dba425aca35e.tar.gz |
Ydb stable 22-4-31 (22.4.31)
x-stable-origin-commit: 2bc59c7eeae4a8f3d396867de193d1375dd388ce
35 files changed, 876 insertions, 150 deletions
diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_patch.cpp b/ydb/core/blobstorage/dsproxy/dsproxy_patch.cpp index b246214495..a368b0ea39 100644 --- a/ydb/core/blobstorage/dsproxy/dsproxy_patch.cpp +++ b/ydb/core/blobstorage/dsproxy/dsproxy_patch.cpp @@ -3,6 +3,7 @@ #include "root_cause.h" #include <ydb/core/blobstorage/lwtrace_probes/blobstorage_probes.h> #include <ydb/core/blobstorage/groupinfo/blobstorage_groupinfo_partlayout.h> +#include <ydb/core/util/stlog.h> #include <util/generic/ymath.h> #include <util/system/datetime.h> @@ -71,6 +72,20 @@ class TBlobStorageGroupPatchRequest : public TBlobStorageGroupRequestActor<TBlob TBlobStorageGroupInfo::TVDiskIds VDisks; bool UseVPatch = false; + bool IsGoodPatchedBlobId = false; + bool IsAllowedErasure = false; + +#define PATCH_LOG(priority, service, marker, msg, ...) \ + STLOG(priority, service, marker, msg, \ + (ActorId, SelfId()), \ + (Group, Info->GroupID), \ + (DiffCount, DiffCount), \ + (OriginalBlob, OriginalId), \ + (PatchedBlob, PatchedId), \ + (Deadline, Deadline), \ + (RestartCounter, RestartCounter), \ + __VA_ARGS__) \ +// PATCH_LOG public: static constexpr NKikimrServices::TActivity::EType ActorActivityType() { @@ -109,6 +124,10 @@ public: } void ReplyAndDie(NKikimrProto::EReplyStatus status) { + PATCH_LOG(PRI_DEBUG, BS_PROXY_PATCH, BPPA02, "ReplyAndDie", + (Status, status), + (ErrorReason, ErrorReason)); + std::unique_ptr<TEvBlobStorage::TEvPatchResult> result = std::make_unique<TEvBlobStorage::TEvPatchResult>(status, PatchedId, StatusFlags, Info->GroupID, ApproximateFreeSpaceShare); result->ErrorReason = ErrorReason; @@ -139,6 +158,10 @@ public: TEvBlobStorage::TEvGetResult *result = ev->Get(); Orbit = std::move(result->Orbit); + PATCH_LOG(PRI_DEBUG, BS_PROXY_PATCH, BPPA30, "Received TEvGetResult", + (Status, result->Status), + (ErrorReason, result->ErrorReason)); + ui32 patchedIdHash = PatchedId.Hash(); bool incorrectCookie = ev->Cookie != patchedIdHash; bool fail = incorrectCookie @@ -161,7 +184,6 @@ 
public: << getResponseStatus << " GetErrorReason# " << result->ErrorReason; } - R_LOG_ERROR_S("BPPA04", ErrorReason); ReplyAndDie(NKikimrProto::ERROR); return; } @@ -179,6 +201,10 @@ public: TEvBlobStorage::TEvPutResult *result = ev->Get(); Orbit = std::move(result->Orbit); + PATCH_LOG(PRI_DEBUG, BS_PROXY_PATCH, BPPA29, "Received TEvPutResult", + (Status, result->Status), + (ErrorReason, result->ErrorReason)); + StatusFlags = result->StatusFlags; ApproximateFreeSpaceShare = result->ApproximateFreeSpaceShare; @@ -194,7 +220,6 @@ public: << " PutStatus# " << NKikimrProto::EReplyStatus_Name(result->Status) << " PutErrorReason# " << result->ErrorReason; } - R_LOG_ERROR_S("BPPA03", ErrorReason); ReplyAndDie(NKikimrProto::ERROR); return; } @@ -217,9 +242,11 @@ public: void Handle(TEvBlobStorage::TEvVMovedPatchResult::TPtr &ev) { TEvBlobStorage::TEvVMovedPatchResult *result = ev->Get(); - A_LOG_DEBUG_S("BPPA02", "received " << ev->Get()->ToString() - << " from# " << VDiskIDFromVDiskID(ev->Get()->Record.GetVDiskID())); NKikimrBlobStorage::TEvVMovedPatchResult &record = result->Record; + PATCH_LOG(PRI_DEBUG, BS_PROXY_PATCH, BPPA28, "Received TEvVMovedPatchResult", + (Status, record.GetStatus()), + (ErrorReason, record.GetErrorReason()), + (VDiskId, VDiskIDFromVDiskID(record.GetVDiskID()))); PullOutStatusFlagsAndFressSpace(record); Orbit = std::move(result->Orbit); @@ -240,10 +267,9 @@ public: << " VMovedPatchStatus# " << NKikimrProto::EReplyStatus_Name(record.GetStatus()) << subErrorReason; } - A_LOG_INFO_S("BPPA05", "VMovedPatch failed, NaivePatch started;" - << " OriginalId# " << OriginalId - << " PatchedId# " << PatchedId - << " ErrorReason# " << ErrorReason); + PATCH_LOG(PRI_DEBUG, BS_PROXY_PATCH, BPPA27, "Start Naive strategy from hadling TEvVMovedPatchResult", + (Status, record.GetStatus()), + (ErrorReason, ErrorReason)); StartNaivePatch(); return; } @@ -284,26 +310,22 @@ public: OkVDisksWithParts.push_back(subgroupIdx); } - A_LOG_INFO_S("BPPA07", "received 
VPatchFoundParts" - << " Status# " << status - << " ActorId# " << SelfId() - << " Group# " << Info->GroupID - << " OriginalBlob# " << OriginalId - << " PatchedBlob# " << PatchedId - << " Deadline# " << Deadline - << " SubgroupIdx# " << (ui32)subgroupIdx - << " PartsCount# " << record.OriginalPartsSize() - << " ReceivedFoundParts# " << ReceivedFoundParts << '/' << SendedStarts - << " ErrorReason# " << errorReason); + PATCH_LOG(PRI_DEBUG, BS_PROXY_PATCH, BPPA26, "Received VPatchFoundParts", + (Status, status), + (SubgroupIdx, (ui32)subgroupIdx), + (ReceivedResults, static_cast<TString>(TStringBuilder() << ReceivedResults << '/' << Info->Type.TotalPartCount())), + (ErrorReason, errorReason)); if (ReceivedFoundParts == SendedStarts) { bool continueVPatch = VerifyPartPlacement(); if (continueVPatch) { continueVPatch = ContinueVPatch(); } else { + PATCH_LOG(PRI_DEBUG, BS_PROXY_PATCH, BPPA27, "Failed VerifyPartPlacement"); Mon->VPatchPartPlacementVerifyFailed->Inc(); } if (!continueVPatch) { + PATCH_LOG(PRI_DEBUG, BS_PROXY_PATCH, BPPA27, "Start Fallback strategy from hadling TEvVPatchFoundParts"); StopVPatch(); StartFallback(); } @@ -321,28 +343,27 @@ public: errorReason = record.GetErrorReason(); } - A_LOG_INFO_S("BPPA06", "received VPatchResult" - << " Status# " << status - << " ActorId# " << SelfId() - << " Group# " << Info->GroupID - << " OriginalBlob# " << OriginalId - << " PatchedBlob# " << PatchedId - << " Deadline# " << Deadline - << " ReceivedResults# " << ReceivedResults << '/' << Info->Type.TotalPartCount() - << " ErrorReason# " << errorReason); - Y_VERIFY(record.HasCookie()); ui8 subgroupIdx = record.GetCookie(); + PATCH_LOG(PRI_DEBUG, BS_PROXY_PATCH, BPPA23, "Received VPatchResult", + (Status, status), + (SubgroupIdx, (ui32)subgroupIdx), + (ReceivedResults, static_cast<TString>(TStringBuilder() << ReceivedResults << '/' << Info->Type.TotalPartCount())), + (ErrorReason, errorReason)); + bool wasReceived = std::exchange(ReceivedResponseFlags[subgroupIdx], 
true); Y_VERIFY(!wasReceived); if (status != NKikimrProto::OK) { + PATCH_LOG(PRI_DEBUG, BS_PROXY_PATCH, BPPA24, "Start Fallback strategy from handling VPatchResult", + (ReceivedResults, TStringBuilder() << ReceivedResults << '/' << Info->Type.TotalPartCount())); StartFallback(); return; } if (ReceivedResults == Info->Type.TotalPartCount()) { + PATCH_LOG(PRI_DEBUG, BS_PROXY_PATCH, BPPA25, "Got all succesful responses, make own success response"); ReplyAndDie(NKikimrProto::OK); } } @@ -357,6 +378,7 @@ public: } if (countByDC[0] && countByDC[1] && countByDC[2]) { + PATCH_LOG(PRI_DEBUG, BS_PROXY_PATCH, BPPA22, "VerifyPartPlacement {mirror-3-dc} found all 3 disks"); return true; } @@ -366,6 +388,8 @@ public: x2Count++; } } + PATCH_LOG(PRI_DEBUG, BS_PROXY_PATCH, BPPA23, "VerifyPartPlacement {mirror-3-dc}", + (X2Count, x2Count)); return x2Count >= 2; } @@ -374,15 +398,21 @@ public: return VerifyPartPlacementForMirror3dc(); } else { TSubgroupPartLayout layout; - - for (auto &[subgroupIdx, partId] : FoundParts) { - layout.AddItem(subgroupIdx, partId - 1, Info->Type); + for (auto &placement : FoundParts) { + PATCH_LOG(PRI_TRACE, BS_PROXY_PATCH, BPPA31, "Get part", + (SubgroupIdx, (ui32)placement.VDiskIdxInSubgroup), + (PartId, (ui32)placement.PartId)); + layout.AddItem(placement.VDiskIdxInSubgroup, placement.PartId - 1, Info->Type); } + PATCH_LOG(PRI_DEBUG, BS_PROXY_PATCH, BPPA21, "VerifyPartPlacement", + (EffectiveReplicas, layout.CountEffectiveReplicas(Info->Type)), + (TotalPartount, Info->Type.TotalPartCount())); return layout.CountEffectiveReplicas(Info->Type) == Info->Type.TotalPartCount(); } } void SendStopDiffs() { + PATCH_LOG(PRI_DEBUG, BS_PROXY_PATCH, BPPA18, "Send stop diffs"); TDeque<std::unique_ptr<TEvBlobStorage::TEvVPatchDiff>> events; for (ui32 vdiskIdx = 0; vdiskIdx < VDisks.size(); ++vdiskIdx) { if (!ErrorResponseFlags[vdiskIdx] && !EmptyResponseFlags[vdiskIdx] && ReceivedResponseFlags[vdiskIdx]) { @@ -390,6 +420,9 @@ public: OriginalId, PatchedId, 
VDisks[vdiskIdx], 0, Deadline, vdiskIdx); ev->SetForceEnd(); events.emplace_back(std::move(ev)); + PATCH_LOG(PRI_DEBUG, BS_PROXY_PATCH, BPPA19, "Send stop message", + (VDiskIdxInSubgroup, vdiskIdx), + (VDiskId, VDisks[vdiskIdx])); } } SendToQueues(events, false); @@ -456,7 +489,12 @@ public: for (const TPartPlacement &parity : parityPlacements) { ev->AddXorReceiver(VDisks[parity.VDiskIdxInSubgroup], parity.PartId); } - + PATCH_LOG(PRI_DEBUG, BS_PROXY_PATCH, BPPA20, "Send TEvVPatchDiff", + (VDiskIdxInSubgroup, idxInSubgroup), + (PatchedVDiskIdxInSubgroup, patchedIdxInSubgroup), + (DiffsForPart, diffsForPart.size()), + (ParityPlacements, parityPlacements.size()), + (WaitedXorDiffs, waitedXorDiffs)); events.push_back(std::move(ev)); } SendToQueues(events, false); @@ -487,12 +525,8 @@ public: } void StartMovedPatch() { - A_LOG_DEBUG_S("BPPA12", "StartMovedPatch" - << " ActorId# " << SelfId() - << " Group# " << Info->GroupID - << " OriginalBlob# " << OriginalId - << " PatchedBlob# " << PatchedId - << " Deadline# " << Deadline); + PATCH_LOG(PRI_DEBUG, BS_PROXY_PATCH, BPPA09, "Start Moved strategy", + (SendedStarts, SendedStarts)); Become(&TThis::MovedPatchState); ui32 subgroupIdx = 0; @@ -517,12 +551,7 @@ public: } void StartNaivePatch() { - A_LOG_DEBUG_S("BPPA11", "StartNaivePatch" - << " ActorId# " << SelfId() - << " Group# " << Info->GroupID - << " OriginalBlob# " << OriginalId - << " PatchedBlob# " << PatchedId - << " Deadline# " << Deadline); + PATCH_LOG(PRI_DEBUG, BS_PROXY_PATCH, BPPA07, "Start Naive strategy"); Become(&TThis::NaiveState); auto get = std::make_unique<TEvBlobStorage::TEvGet>(OriginalId, 0, OriginalId.BlobSize(), Deadline, NKikimrBlobStorage::AsyncRead); @@ -537,8 +566,12 @@ public: void StartFallback() { Mon->PatchesWithFallback->Inc(); if (WithMovingPatchRequestToStaticNode && UseVPatch) { + PATCH_LOG(PRI_DEBUG, BS_PROXY_PATCH, BPPA05, "Start Moved strategy from fallback"); StartMovedPatch(); } else { + PATCH_LOG(PRI_DEBUG, BS_PROXY_PATCH, BPPA06, 
"Start Naive strategy from fallback", + (WithMovingPatchRequestToStaticNode, WithMovingPatchRequestToStaticNode), + (UseVPatch, UseVPatch)); StartNaivePatch(); } } @@ -560,13 +593,8 @@ public: SendedStarts++; } - A_LOG_DEBUG_S("BPPA08", "StartVPatcn" - << " ActorId# " << SelfId() - << " Group# " << Info->GroupID - << " OriginalBlob# " << OriginalId - << " PatchedBlob# " << PatchedId - << " Deadline# " << Deadline - << " SendedStarts# " << SendedStarts); + PATCH_LOG(PRI_DEBUG, BS_PROXY_PATCH, BPPA08, "Start VPatch strategy", + (SendedStarts, SendedStarts)); SendToQueues(events, false); } @@ -611,6 +639,7 @@ public: } bool ContinueVPatchForMirror3dc() { + PATCH_LOG(PRI_DEBUG, BS_PROXY_PATCH, BPPA10, "Continue VPatch {mirror-3-dc}"); constexpr ui32 DCCount = 3; constexpr ui32 VDiskByDC = 3; ui32 countByDC[DCCount] = {0, 0, 0}; @@ -624,9 +653,14 @@ public: } if (countByDC[0] && countByDC[1] && countByDC[2]) { + PATCH_LOG(PRI_DEBUG, BS_PROXY_PATCH, BPPA11, "Found disks {mirror-3-dc} on each dc", + (DiskFromFirstDC, diskByDC[0][0].ToString()), + (DiskFromSecondDC, diskByDC[0][0].ToString()), + (DiskFromThirdDC, diskByDC[2][0].ToString())); SendDiffs({diskByDC[0][0], diskByDC[1][0], diskByDC[2][0]}); return true; } + PATCH_LOG(PRI_DEBUG, BS_PROXY_PATCH, BPPA12, "Didn't find disks {mirror-3-dc} on each dc"); ui32 x2Count = 0; for (ui32 dcIdx = 0; dcIdx < DCCount; ++dcIdx) { @@ -635,6 +669,7 @@ public: } } if (x2Count < 2) { + PATCH_LOG(PRI_DEBUG, BS_PROXY_PATCH, BPPA13, "Didn't find disks {mirror-3-dc}"); return false; } TStackVec<TPartPlacement, TypicalPartsInBlob> placements; @@ -645,17 +680,17 @@ public: } } SendDiffs(placements); + PATCH_LOG(PRI_DEBUG, BS_PROXY_PATCH, BPPA14, "Found disks {mirror-3-dc} x2 mode", + (FirstDiskFromFirstDC, placements[0]), + (SecondDiskFromFirstDC, placements[1]), + (FirstDiskFromSecondDC, placements[2]), + (SecondDiskFromSecondDC, placements[3])); return true; } bool ContinueVPatch() { - A_LOG_DEBUG_S("BPPA09", "ContinueVPatch" - << " 
ActorId# " << SelfId() - << " Group# " << Info->GroupID - << " OriginalBlob# " << OriginalId - << " PatchedBlob# " << PatchedId - << " FoundParts# " << ConvertFoundPartsToString() - << " Deadline# " << Deadline); + PATCH_LOG(PRI_DEBUG, BS_PROXY_PATCH, BPPA15, "Continue VPatch strategy", + (FoundParts, ConvertFoundPartsToString())); if (Info->Type.GetErasure() == TErasureType::ErasureMirror3dc) { return ContinueVPatchForMirror3dc(); @@ -685,10 +720,11 @@ public: TStackVec<ui32, TypicalHandoffCount> choosenHandoffForParts(handoffParts.size()); if (handoffParts.size()) { bool find = FindHandoffs(handoffForParts, handoffParts, &choosenHandoffForParts); - Y_VERIFY_DEBUG_S(find, "handoffParts# " << FormatList(handoffParts) - << " FoundParts# " << ConvertFoundPartsToString() - << " choosenHandoffForParts# " << FormatList(choosenHandoffForParts) - << " inPrimary# " << FormatList(inPrimary)); + PATCH_LOG(PRI_DEBUG, BS_PROXY_PATCH, BPPA16, "Find handoff parts", + (HandoffParts, FormatList(handoffParts)), + (FoundParts, ConvertFoundPartsToString()), + (choosenHandoffForParts, FormatList(choosenHandoffForParts)), + (IsPrimary, FormatList(inPrimary))); if (!find) { Mon->VPatchContinueFailed->Inc(); return false; @@ -700,12 +736,7 @@ public: } void StopVPatch() { - A_LOG_DEBUG_S("BPPA10", "StopVPatch" - << " ActorId# " << SelfId() - << " Group# " << Info->GroupID - << " OriginalBlob# " << OriginalId - << " PatchedBlob# " << PatchedId - << " Deadline# " << Deadline); + PATCH_LOG(PRI_DEBUG, BS_PROXY_PATCH, BPPA17, "Stop VPatch strategy"); SendStopDiffs(); ReceivedResponseFlags.assign(VDisks.size(), false); } @@ -741,14 +772,8 @@ public: } void Bootstrap() { - A_LOG_INFO_S("BPPA01", "bootstrap" - << " ActorId# " << SelfId() - << " Group# " << Info->GroupID - << " DiffCount# " << DiffCount - << " OriginalBlob# " << OriginalId - << " PatchedBlob# " << PatchedId - << " Deadline# " << Deadline - << " RestartCounter# " << RestartCounter); + PATCH_LOG(PRI_DEBUG, BS_PROXY_PATCH, BPPA01, 
"Actor bootstrapped"); + TLogoBlobID truePatchedBlobId = PatchedId; bool result = true; if (Info->Type.ErasureFamily() == TErasureType::ErasureParityBlock) { @@ -770,13 +795,20 @@ public: return; } - bool isAllowedErasure = Info->Type.ErasureFamily() == TErasureType::ErasureParityBlock + IsGoodPatchedBlobId = result; + IsAllowedErasure = Info->Type.ErasureFamily() == TErasureType::ErasureParityBlock || Info->Type.GetErasure() == TErasureType::ErasureNone || Info->Type.GetErasure() == TErasureType::ErasureMirror3 || Info->Type.GetErasure() == TErasureType::ErasureMirror3dc; - if (result && isAllowedErasure && UseVPatch && OriginalGroupId == Info->GroupID) { + if (IsGoodPatchedBlobId && IsAllowedErasure && UseVPatch && OriginalGroupId == Info->GroupID) { + PATCH_LOG(PRI_DEBUG, BS_PROXY_PATCH, BPPA03, "Start VPatch strategy from bootstrap"); StartVPatch(); } else { + PATCH_LOG(PRI_DEBUG, BS_PROXY_PATCH, BPPA04, "Start Fallback strategy from bootstrap", + (IsGoodPatchedBlobId, IsGoodPatchedBlobId), + (IsAllowedErasure, IsAllowedErasure), + (UseVPatch, UseVPatch), + (IsSameGroup, OriginalGroupId == Info->GroupID)); StartFallback(); } } diff --git a/ydb/core/blobstorage/nodewarden/node_warden_impl.cpp b/ydb/core/blobstorage/nodewarden/node_warden_impl.cpp index 02586ae450..925e46d0d1 100644 --- a/ydb/core/blobstorage/nodewarden/node_warden_impl.cpp +++ b/ydb/core/blobstorage/nodewarden/node_warden_impl.cpp @@ -401,8 +401,14 @@ void TNodeWarden::Handle(TEvStatusUpdate::TPtr ev) { auto *msg = ev->Get(); const TVSlotId vslotId(msg->NodeId, msg->PDiskId, msg->VSlotId); if (const auto it = LocalVDisks.find(vslotId); it != LocalVDisks.end() && it->second.Status != msg->Status) { - it->second.Status = msg->Status; + auto& vdisk = it->second; + vdisk.Status = msg->Status; SendDiskMetrics(false); + + if (msg->Status == NKikimrBlobStorage::EVDiskStatus::READY && vdisk.WhiteboardVDiskId) { + Send(WhiteboardId, new 
NNodeWhiteboard::TEvWhiteboard::TEvVDiskDropDonors(*vdisk.WhiteboardVDiskId, + vdisk.WhiteboardInstanceGuid, NNodeWhiteboard::TEvWhiteboard::TEvVDiskDropDonors::TDropAllDonors())); + } } } diff --git a/ydb/core/blobstorage/nodewarden/node_warden_impl.h b/ydb/core/blobstorage/nodewarden/node_warden_impl.h index 9b72c8dfdc..5863448155 100644 --- a/ydb/core/blobstorage/nodewarden/node_warden_impl.h +++ b/ydb/core/blobstorage/nodewarden/node_warden_impl.h @@ -224,6 +224,12 @@ namespace NKikimr::NStorage { return MakeBlobStorageVDiskID(NodeId, PDiskId, VDiskSlotId); } + void Serialize(NKikimrBlobStorage::TVSlotId *proto) const { + proto->SetNodeId(NodeId); + proto->SetPDiskId(PDiskId); + proto->SetVSlotId(VDiskSlotId); + } + auto AsTuple() const { return std::make_tuple(NodeId, PDiskId, VDiskSlotId); } friend bool operator <(const TVSlotId& x, const TVSlotId& y) { return x.AsTuple() < y.AsTuple(); } friend bool operator <=(const TVSlotId& x, const TVSlotId& y) { return x.AsTuple() <= y.AsTuple(); } @@ -253,6 +259,7 @@ namespace NKikimr::NStorage { // Last VDiskId reported to Node Whiteboard. 
std::optional<TVDiskID> WhiteboardVDiskId; + ui64 WhiteboardInstanceGuid; bool SlayInFlight = false; diff --git a/ydb/core/blobstorage/nodewarden/node_warden_vdisk.cpp b/ydb/core/blobstorage/nodewarden/node_warden_vdisk.cpp index 8e6bc26986..22a59222f8 100644 --- a/ydb/core/blobstorage/nodewarden/node_warden_vdisk.cpp +++ b/ydb/core/blobstorage/nodewarden/node_warden_vdisk.cpp @@ -175,6 +175,7 @@ namespace NKikimr::NStorage { Send(WhiteboardId, new NNodeWhiteboard::TEvWhiteboard::TEvVDiskStateUpdate(vdiskId, groupInfo->GetStoragePoolName(), vslotId.PDiskId, vslotId.VDiskSlotId, pdiskGuid, kind, donorMode, whiteboardInstanceGuid, std::move(donors))); vdisk.WhiteboardVDiskId.emplace(vdiskId); + vdisk.WhiteboardInstanceGuid = whiteboardInstanceGuid; // create an actor auto *as = TActivationContext::ActorSystem(); @@ -281,9 +282,19 @@ namespace NKikimr::NStorage { void TNodeWarden::Handle(TEvBlobStorage::TEvDropDonor::TPtr ev) { auto *msg = ev->Get(); - STLOG(PRI_INFO, BS_NODE, NW34, "TEvDropDonor", (VSlotId, TVSlotId(msg->NodeId, msg->PDiskId, msg->VSlotId)), - (VDiskId, msg->VDiskId)); + const TVSlotId vslotId(msg->NodeId, msg->PDiskId, msg->VSlotId); + STLOG(PRI_INFO, BS_NODE, NW34, "TEvDropDonor", (VSlotId, vslotId), (VDiskId, msg->VDiskId)); SendDropDonorQuery(msg->NodeId, msg->PDiskId, msg->VSlotId, msg->VDiskId); + + if (const auto it = LocalVDisks.find(vslotId); it != LocalVDisks.end()) { + const auto& vdisk = it->second; + if (vdisk.WhiteboardVDiskId) { + NKikimrBlobStorage::TVSlotId id; + vslotId.Serialize(&id); + Send(WhiteboardId, new NNodeWhiteboard::TEvWhiteboard::TEvVDiskDropDonors(*vdisk.WhiteboardVDiskId, + vdisk.WhiteboardInstanceGuid, {id})); + } + } } void TNodeWarden::UpdateGroupInfoForDisk(TVDiskRecord& vdisk, const TIntrusivePtr<TBlobStorageGroupInfo>& newInfo) { diff --git a/ydb/core/grpc_services/CMakeLists.txt b/ydb/core/grpc_services/CMakeLists.txt index 9299bc2887..626dda12a5 100644 --- a/ydb/core/grpc_services/CMakeLists.txt +++ 
b/ydb/core/grpc_services/CMakeLists.txt @@ -53,6 +53,7 @@ target_link_libraries(ydb-core-grpc_services PUBLIC cpp-client-resources ) target_sources(ydb-core-grpc_services PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/grpc_services/audit_log.cpp ${CMAKE_SOURCE_DIR}/ydb/core/grpc_services/grpc_endpoint_publish_actor.cpp ${CMAKE_SOURCE_DIR}/ydb/core/grpc_services/grpc_helper.cpp ${CMAKE_SOURCE_DIR}/ydb/core/grpc_services/grpc_mon.cpp diff --git a/ydb/core/grpc_services/audit_log.cpp b/ydb/core/grpc_services/audit_log.cpp new file mode 100644 index 0000000000..8d353d0517 --- /dev/null +++ b/ydb/core/grpc_services/audit_log.cpp @@ -0,0 +1,21 @@ +#include "defs.h" +#include "audit_log.h" + +#include "base/base.h" + +namespace NKikimr { +namespace NGRpcService { + +void AuditLog(const IRequestProxyCtx* reqCtx, const TString& database, + const TString& subject, const TActorContext& ctx) +{ + LOG_NOTICE_S(ctx, NKikimrServices::GRPC_SERVER, "AUDIT: " + << "request name: " << reqCtx->GetRequestName() + << ", database: " << database + << ", peer: " << reqCtx->GetPeerName() + << ", subject: " << subject); +} + +} +} + diff --git a/ydb/core/grpc_services/audit_log.h b/ydb/core/grpc_services/audit_log.h new file mode 100644 index 0000000000..47742db4ee --- /dev/null +++ b/ydb/core/grpc_services/audit_log.h @@ -0,0 +1,12 @@ +#pragma once + +namespace NKikimr { +namespace NGRpcService { + +class IRequestProxyCtx; + +void AuditLog(const IRequestProxyCtx* reqCtx, const TString& database, + const TString& subject, const TActorContext& ctx); + +} +} diff --git a/ydb/core/grpc_services/grpc_request_check_actor.h b/ydb/core/grpc_services/grpc_request_check_actor.h index 715172355c..fe0902b4e4 100644 --- a/ydb/core/grpc_services/grpc_request_check_actor.h +++ b/ydb/core/grpc_services/grpc_request_check_actor.h @@ -1,5 +1,6 @@ #pragma once #include "defs.h" +#include "audit_log.h" #include "service_ratelimiter_events.h" #include "local_rate_limiter.h" #include "operation_helpers.h" @@ -112,6 
+113,10 @@ public: } } + if (AppData(ctx)->FeatureFlags.GetEnableGrpcAudit()) { + AuditLog(GrpcRequestBaseCtx_, CheckedDatabaseName_, GetSubject(), ctx); + } + // Simple rps limitation static NRpcService::TRlConfig rpsRlConfig( "serverless_rt_coordination_node_path", @@ -205,6 +210,11 @@ public: } private: + TString GetSubject() const { + const auto sid = TBase::GetUserSID(); + return sid ? sid : "no subject"; + } + static NYql::TIssues GetRlIssues(const Ydb::RateLimiter::AcquireResourceResponse& resp) { NYql::TIssues opIssues; NYql::IssuesFromMessage(resp.operation().issues(), opIssues); diff --git a/ydb/core/mind/hive/balancer.cpp b/ydb/core/mind/hive/balancer.cpp index 8521e30499..9106fceccd 100644 --- a/ydb/core/mind/hive/balancer.cpp +++ b/ydb/core/mind/hive/balancer.cpp @@ -146,6 +146,8 @@ protected: if (RecheckOnFinish && MaxMovements != 0 && Movements >= MaxMovements) { BLOG_D("Balancer initiated recheck"); Hive->ProcessTabletBalancer(); + } else { + Send(Hive->SelfId(), new TEvPrivate::TEvBalancerOut()); } return IActor::PassAway(); } @@ -239,6 +241,7 @@ protected: tablets.emplace_back(tablet); } } + BLOG_TRACE("Balancer on node " << node->Id << ": " << tablets.size() << "/" << nodeTablets.size() << " tablets is suitable for balancing"); if (!tablets.empty()) { switch (Hive->GetTabletBalanceStrategy()) { case NKikimrConfig::THiveConfig::HIVE_TABLET_BALANCE_STRATEGY_OLD_WEIGHTED_RANDOM: diff --git a/ydb/core/mind/hive/hive_events.h b/ydb/core/mind/hive/hive_events.h index 9e26edc7a8..b846a32cbe 100644 --- a/ydb/core/mind/hive/hive_events.h +++ b/ydb/core/mind/hive/hive_events.h @@ -23,6 +23,7 @@ struct TEvPrivate { EvUnlockTabletReconnectTimeout, EvProcessPendingOperations, EvRestartComplete, + EvBalancerOut, EvEnd }; @@ -37,7 +38,7 @@ struct TEvPrivate { }; struct TEvProcessBootQueue : TEventLocal<TEvProcessBootQueue, EvProcessBootQueue> {}; - + struct TEvPostponeProcessBootQueue : TEventLocal<TEvPostponeProcessBootQueue, EvPostponeProcessBootQueue> {}; 
struct TEvProcessDisconnectNode : TEventLocal<TEvProcessDisconnectNode, EvProcessDisconnectNode> { @@ -78,6 +79,8 @@ struct TEvPrivate { }; struct TEvProcessPendingOperations : TEventLocal<TEvProcessPendingOperations, EvProcessPendingOperations> {}; + + struct TEvBalancerOut : TEventLocal<TEvBalancerOut, EvBalancerOut> {}; }; } // NHive diff --git a/ydb/core/mind/hive/hive_impl.cpp b/ydb/core/mind/hive/hive_impl.cpp index efd7f36bb1..2dfb33dbb9 100644 --- a/ydb/core/mind/hive/hive_impl.cpp +++ b/ydb/core/mind/hive/hive_impl.cpp @@ -308,6 +308,10 @@ void THive::Handle(TEvPrivate::TEvProcessPendingOperations::TPtr&) { BLOG_D("Handle ProcessPendingOperations"); } +void THive::Handle(TEvPrivate::TEvBalancerOut::TPtr&) { + BLOG_D("Handle BalancerOut"); +} + void THive::Handle(TEvHive::TEvBootTablet::TPtr& ev) { TTabletId tabletId = ev->Get()->Record.GetTabletID(); TTabletInfo* tablet = FindTablet(tabletId); @@ -554,6 +558,14 @@ void THive::BuildCurrentConfig() { for (const NKikimrConfig::THiveTabletPreference& tabletPreference : CurrentConfig.GetDefaultTabletPreference()) { DefaultDataCentersPreference[tabletPreference.GetType()] = tabletPreference.GetDataCentersPreference(); } + BalancerIgnoreTabletTypes.clear(); + for (auto i : CurrentConfig.GetBalancerIgnoreTabletTypes()) { + const auto type = TTabletTypes::EType(i); + if (IsValidTabletType(type)) { + BalancerIgnoreTabletTypes.emplace_back(type); + } + } + MakeTabletTypeSet(BalancerIgnoreTabletTypes); } void THive::Cleanup() { @@ -1612,6 +1624,7 @@ void THive::FillTabletInfo(NKikimrHive::TEvResponseHiveInfo& response, ui64 tabl auto& tabletInfo = *response.AddTablets(); tabletInfo.SetTabletID(tabletId); tabletInfo.SetTabletType(info->Type); + tabletInfo.SetObjectId(info->ObjectId); tabletInfo.SetState(static_cast<ui32>(info->State)); tabletInfo.SetTabletBootMode(info->BootMode); tabletInfo.SetVolatileState(info->GetVolatileState()); @@ -1620,6 +1633,9 @@ void THive::FillTabletInfo(NKikimrHive::TEvResponseHiveInfo& 
response, ui64 tabl tabletInfo.MutableTabletOwner()->SetOwnerIdx(info->Owner.second); tabletInfo.SetGeneration(info->KnownGeneration); tabletInfo.MutableObjectDomain()->CopyFrom(info->ObjectDomain); + if (info->BalancerPolicy != NKikimrHive::EBalancerPolicy::POLICY_BALANCE) { + tabletInfo.SetBalancerPolicy(info->BalancerPolicy); + } if (!info->IsRunning()) { tabletInfo.SetLastAliveTimestamp(info->Statistics.GetLastAliveTimestamp()); } @@ -2437,6 +2453,7 @@ STFUNC(THive::StateWork) { hFunc(NSysView::TEvSysView::TEvGetTabletsRequest, Handle); hFunc(TEvHive::TEvRequestTabletOwners, Handle); hFunc(TEvHive::TEvTabletOwnersReply, Handle); + hFunc(TEvPrivate::TEvBalancerOut, Handle); default: if (!HandleDefaultEvents(ev, ctx)) { BLOG_W("THive::StateWork unhandled event type: " << ev->GetTypeRewrite() diff --git a/ydb/core/mind/hive/hive_impl.h b/ydb/core/mind/hive/hive_impl.h index 85f6ea922d..ad25889305 100644 --- a/ydb/core/mind/hive/hive_impl.h +++ b/ydb/core/mind/hive/hive_impl.h @@ -140,6 +140,8 @@ TString GetConditionalRedString(const TString& str, bool condition); TString GetDataCenterName(ui64 dataCenterId); TString LongToShortTabletName(const TString& longTabletName); TString GetLocationString(const NActors::TNodeLocation& location); +void MakeTabletTypeSet(std::vector<TTabletTypes::EType>& list); +bool IsValidTabletType(TTabletTypes::EType type); class THive : public TActor<THive>, public TTabletExecutedFlat, public THiveSharedSettings { public: @@ -404,6 +406,9 @@ protected: std::unordered_map<TDataCenterId, std::unordered_set<TNodeId>> RegisteredDataCenterNodes; std::unordered_set<TNodeId> ConnectedNodes; + // normalized to be sorted list of unique values + std::vector<TTabletTypes::EType> BalancerIgnoreTabletTypes; // built from CurrentConfig + // to be removed later bool TabletOwnersSynced = false; // to be removed later @@ -482,6 +487,7 @@ protected: void Handle(TEvPrivate::TEvProcessTabletBalancer::TPtr&); void 
Handle(TEvPrivate::TEvUnlockTabletReconnectTimeout::TPtr&); void Handle(TEvPrivate::TEvProcessPendingOperations::TPtr&); + void Handle(TEvPrivate::TEvBalancerOut::TPtr&); void Handle(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev); void Handle(NConsole::TEvConsole::TEvConfigNotificationRequest::TPtr& ev); void Handle(NConsole::TEvConfigsDispatcher::TEvSetConfigSubscriptionResponse::TPtr& ev); @@ -748,6 +754,12 @@ public: return initialMaximum; } + bool IsInBalancerIgnoreList(TTabletTypes::EType type) const { + const auto& ignoreList = BalancerIgnoreTabletTypes; + auto found = std::find(ignoreList.begin(), ignoreList.end(), type); + return (found != ignoreList.end()); + } + static void ActualizeRestartStatistics(google::protobuf::RepeatedField<google::protobuf::uint64>& restartTimestamps, ui64 barrier); static bool IsSystemTablet(TTabletTypes::EType type); diff --git a/ydb/core/mind/hive/hive_schema.h b/ydb/core/mind/hive/hive_schema.h index 2b664b6fdc..8fdd9cf6c7 100644 --- a/ydb/core/mind/hive/hive_schema.h +++ b/ydb/core/mind/hive/hive_schema.h @@ -93,6 +93,8 @@ struct Schema : NIceDb::Schema { struct DataCentersPreference : Column<121, NScheme::NTypeIds::String> { using Type = NKikimrHive::TDataCentersPreference; }; struct AllowedDataCenterIds : Column<122, NScheme::NTypeIds::String> { using Type = TVector<TString>; }; + struct BalancerPolicy : Column<123, NScheme::NTypeIds::Uint64> { using Type = NKikimrHive::EBalancerPolicy; static constexpr NKikimrHive::EBalancerPolicy Default = NKikimrHive::EBalancerPolicy::POLICY_BALANCE; }; + using TKey = TableKey<ID>; using TColumns = TableColumns< ID, @@ -118,7 +120,8 @@ struct Schema : NIceDb::Schema { ReassignReason, Statistics, DataCentersPreference, - AllowedDataCenterIds + AllowedDataCenterIds, + BalancerPolicy >; }; diff --git a/ydb/core/mind/hive/hive_statics.cpp b/ydb/core/mind/hive/hive_statics.cpp index 1f5e264393..4eae631b94 100644 --- a/ydb/core/mind/hive/hive_statics.cpp +++ 
b/ydb/core/mind/hive/hive_statics.cpp @@ -340,5 +340,16 @@ TString GetLocationString(const NActors::TNodeLocation& location) { return proto.ShortDebugString(); } +void MakeTabletTypeSet(std::vector<TTabletTypes::EType>& list) { + std::sort(list.begin(), list.end()); + list.erase(std::unique(list.begin(), list.end()), list.end()); +} + +bool IsValidTabletType(TTabletTypes::EType type) { + return (type > TTabletTypes::Unknown + && type < TTabletTypes::Reserved40 + ); +} + } // NHive } // NKikimr diff --git a/ydb/core/mind/hive/hive_ut.cpp b/ydb/core/mind/hive/hive_ut.cpp index e20b07b98a..24ecde610e 100644 --- a/ydb/core/mind/hive/hive_ut.cpp +++ b/ydb/core/mind/hive/hive_ut.cpp @@ -20,6 +20,8 @@ #include <ydb/core/tx/schemeshard/schemeshard.h> #include <ydb/core/tx/mediator/mediator.h> +#include <ydb/core/mind/hive/hive_events.h> + #include <library/cpp/actors/interconnect/interconnect_impl.h> #include <library/cpp/malloc/api/malloc.h> @@ -245,7 +247,7 @@ namespace { } } - void SetupServices(TTestActorRuntime &runtime, bool isLocalEnabled) { + void SetupServices(TTestActorRuntime &runtime, bool isLocalEnabled, const std::function<void(TAppPrepare&)> & appConfigSetup) { TAppPrepare app; SetupDomainInfo(runtime, app); @@ -255,6 +257,10 @@ namespace { app.SetRequestSequenceSize(10); app.SetHiveStoragePoolFreshPeriod(0); + if (appConfigSetup) { + appConfigSetup(app); + } + SetupNodeWarden(runtime); SetupPDisk(runtime); @@ -352,12 +358,12 @@ namespace { UNIT_ASSERT(configureResponse->Record.GetResponse().GetSuccess()); } - void Setup(TTestActorRuntime& runtime, bool isLocalEnabled = true, ui32 numGroups = 1) { + void Setup(TTestActorRuntime& runtime, bool isLocalEnabled = true, ui32 numGroups = 1, const std::function<void(TAppPrepare&)> & appConfigSetup = nullptr) { using namespace NMalloc; TMallocInfo mallocInfo = MallocInfo(); mallocInfo.SetParam("FillMemoryOnAllocation", "false"); SetupLogging(runtime); - SetupServices(runtime, isLocalEnabled); + 
SetupServices(runtime, isLocalEnabled, appConfigSetup); SetupBoxAndStoragePool(runtime, numGroups); } @@ -3596,6 +3602,319 @@ Y_UNIT_TEST_SUITE(THiveTest) { } } + Y_UNIT_TEST(TestHiveBalancerIgnoreTablet) { + // Test plan: + // - create configuration where: + // - there is single node which run several tablets with different BalancerPolicy + // - and all tablets report very high resource usage + // (so that balancer wants to unload the node but have no space to move tablets to) + // - then add enough empty nodes + // - test that balancer moved out all tablets except those with BalancerPolicy=BALANCER_IGNORE + // - change BalancerPolicy to BALANCER_BALANCE for all remaining tablets + // - test that balancer also moved out former BALANCER_IGNORE tablets + // + static const int NUM_NODES = 4; + static const int NUM_TABLETS = 3; + static const ui64 SINGLE_TABLET_NETWORK_USAGE = 5000000; + + TTestBasicRuntime runtime(NUM_NODES, false); + + Setup(runtime, true, 1, [](TAppPrepare& app) { + app.HiveConfig.SetMaxMovementsOnAutoBalancer(100); + app.HiveConfig.SetMinPeriodBetweenBalance(0.1); + app.HiveConfig.SetTabletKickCooldownPeriod(0); + app.HiveConfig.SetResourceChangeReactionPeriod(0); + // this value of MaxNodeUsageToKick is selected specifically to make test scenario work + // in link with number of tablets and values of network usage metrics used below + app.HiveConfig.SetMaxNodeUsageToKick(0.01); + }); + + TActorId senderA = runtime.AllocateEdgeActor(); + const ui64 hiveTablet = MakeDefaultHiveID(0); + + CreateTestBootstrapper(runtime, CreateTestTabletInfo(hiveTablet, TTabletTypes::Hive), &CreateDefaultHive); + + // wait for creation of nodes + { + TDispatchOptions options; + options.FinalEvents.emplace_back(TEvLocal::EvStatus, NUM_NODES); + runtime.DispatchEvents(options); + } + + // stop all but one local services to emulate single node configuration + for (int i = 1; i < NUM_NODES; ++i) { + SendKillLocal(runtime, i); + } + + struct TTabletMiniInfo { + ui64 
TabletId; + ui64 ObjectId; + ui32 NodeIndex; + NKikimrHive::EBalancerPolicy BalancerPolicy; + }; + auto getTabletInfos = [&runtime, senderA] (ui64 hiveTablet) { + runtime.SendToPipe(hiveTablet, senderA, new TEvHive::TEvRequestHiveInfo()); + TAutoPtr<IEventHandle> handle; + TEvHive::TEvResponseHiveInfo* response = runtime.GrabEdgeEventRethrow<TEvHive::TEvResponseHiveInfo>(handle); + const int nodeBase = runtime.GetNodeId(0); + std::vector<TTabletMiniInfo> tabletInfos; + for (const NKikimrHive::TTabletInfo& tablet : response->Record.GetTablets()) { + int nodeIndex = (int)tablet.GetNodeID() - nodeBase; + UNIT_ASSERT_C(nodeIndex >= 0 && nodeIndex < NUM_NODES, "nodeId# " << tablet.GetNodeID() << " nodeBase# " << nodeBase); + tabletInfos.push_back({tablet.GetTabletID(), tablet.GetObjectId(), tablet.GetNodeID() - nodeBase, tablet.GetBalancerPolicy()}); + } + std::reverse(tabletInfos.begin(), tabletInfos.end()); + return tabletInfos; + }; + auto reportTabletMetrics = [&runtime, senderA, hiveTablet](ui64 tabletId, ui64 network, bool sync) { + THolder<TEvHive::TEvTabletMetrics> metrics = MakeHolder<TEvHive::TEvTabletMetrics>(); + NKikimrHive::TTabletMetrics* metric = metrics->Record.AddTabletMetrics(); + metric->SetTabletID(tabletId); + metric->MutableResourceUsage()->SetNetwork(network); + + runtime.SendToPipe(hiveTablet, senderA, metrics.Release()); + + if (sync) { + TAutoPtr<IEventHandle> handle; + auto* response = runtime.GrabEdgeEvent<TEvLocal::TEvTabletMetricsAck>(handle); + Y_UNUSED(response); + } + }; + + const ui64 testerTablet = MakeDefaultHiveID(1); + const TTabletTypes::EType tabletType = TTabletTypes::Dummy; + + Ctest << "Step A: create tablets" << Endl; + + // create NUM_TABLETS tablets, some with BalancerPolicy set to "ignore" + for (int i = 0; i < NUM_TABLETS; ++i) { + THolder<TEvHive::TEvCreateTablet> ev(new TEvHive::TEvCreateTablet(testerTablet, 100500 + i, tabletType, BINDED_CHANNELS)); + ev->Record.SetObjectId(i); + switch (i % NUM_TABLETS) { + case 0: // 
policy not explicitly set + break; + case 1: // policy explicitly set to default value + ev->Record.SetBalancerPolicy(NKikimrHive::EBalancerPolicy::POLICY_BALANCE); + break; + case 2: // policy explicitly set to ignore + ev->Record.SetBalancerPolicy(NKikimrHive::EBalancerPolicy::POLICY_IGNORE); + break; + } + ui64 tabletId = SendCreateTestTablet(runtime, hiveTablet, testerTablet, std::move(ev), 0, true); + MakeSureTabletIsUp(runtime, tabletId, 0); + } + + Ctest << "Step A: get tablets info" << Endl; + auto tabletInfos_A = getTabletInfos(hiveTablet); + + // check that tablets retain their BalancerPolicy flags... + for (const auto& i : tabletInfos_A) { + Ctest << "Step A: tablet index " << i.ObjectId << ", tablet id " << i.TabletId << ", node index " << i.NodeIndex << ", balancer policy " << NKikimrHive::EBalancerPolicy_Name(i.BalancerPolicy) << Endl; + switch (i.ObjectId % NUM_TABLETS) { + case 0: + case 1: + UNIT_ASSERT_EQUAL_C(i.BalancerPolicy, NKikimrHive::EBalancerPolicy::POLICY_BALANCE, "objectId# " << i.ObjectId << " value# " << (ui64)i.BalancerPolicy << " name# " << NKikimrHive::EBalancerPolicy_Name(i.BalancerPolicy)); + break; + case 2: + UNIT_ASSERT_EQUAL_C(i.BalancerPolicy, NKikimrHive::EBalancerPolicy::POLICY_IGNORE, "value# " << (ui64)i.BalancerPolicy << " name# " << NKikimrHive::EBalancerPolicy_Name(i.BalancerPolicy)); + break; + } + } + // ...and that all tablets are distributed on a single node + { + std::array<int, NUM_NODES> nodeTablets = {}; + for (auto& i : tabletInfos_A) { + ++nodeTablets[i.NodeIndex]; + } + Ctest << "Step A: tablet distribution"; + for (auto i : nodeTablets) { + Ctest << " " << i; + } + Ctest << Endl; + auto minmax = std::minmax_element(nodeTablets.begin(), nodeTablets.end()); + UNIT_ASSERT_VALUES_EQUAL(*minmax.first, 0); + UNIT_ASSERT_VALUES_EQUAL(*minmax.second, NUM_TABLETS); + } + + Ctest << "Step B: report tablets metrics" << Endl; + + // report raised tablet metrics (to kickoff the balancer) + for (const auto& i: 
tabletInfos_A) { + reportTabletMetrics(i.TabletId, SINGLE_TABLET_NETWORK_USAGE, true); + } + + Ctest << "Step B: wait for balancer to complete" << Endl; + { + TDispatchOptions options; + options.FinalEvents.emplace_back(NHive::TEvPrivate::EvBalancerOut); + runtime.DispatchEvents(options, TDuration::Seconds(10)); + } + + Ctest << "Step B: get tablets info" << Endl; + auto tabletInfos_B = getTabletInfos(hiveTablet); + + // check that all tablet are still on a single node + { + std::array<int, NUM_NODES> nodeTablets = {}; + for (auto& i : tabletInfos_B) { + ++nodeTablets[i.NodeIndex]; + } + Ctest << "Step B: tablet distribution"; + for (auto i : nodeTablets) { + Ctest << " " << i; + } + Ctest << Endl; + auto minmax = std::minmax_element(nodeTablets.begin(), nodeTablets.end()); + UNIT_ASSERT_VALUES_EQUAL(*minmax.first, 0); + UNIT_ASSERT_VALUES_EQUAL(*minmax.second, NUM_TABLETS); + } + + Ctest << "Step C: add empty nodes" << Endl; + for (int i = 1; i < NUM_NODES; ++i) { + CreateLocal(runtime, i); + } + // wait for creation of nodes + { + TDispatchOptions options; + options.FinalEvents.emplace_back(TEvLocal::EvStatus, NUM_NODES - 1); + runtime.DispatchEvents(options); + } + + Ctest << "Step C: touch tablets metrics" << Endl; + // touch tablet metrics (to kickoff the balancer) + for (const auto& i: tabletInfos_B) { + reportTabletMetrics(i.TabletId, 0, true); + } + + Ctest << "Step C: wait for balancer to complete" << Endl; + { + TDispatchOptions options; + options.FinalEvents.emplace_back(NHive::TEvPrivate::EvBalancerOut); + runtime.DispatchEvents(options, TDuration::Seconds(10)); + } + + Ctest << "Step C: get tablets info" << Endl; + auto tabletInfos_C = getTabletInfos(hiveTablet); + + // check that ignored tablets stayed as they are... 
+ for (const auto& i : tabletInfos_C) { + Ctest << "Step C: tablet index " << i.ObjectId << ", tablet id " << i.TabletId << ", node index " << i.NodeIndex << ", balancer policy " << NKikimrHive::EBalancerPolicy_Name(i.BalancerPolicy) << Endl; + switch (i.ObjectId % NUM_TABLETS) { + case 0: + case 1: + break; + case 2: + UNIT_ASSERT_EQUAL_C(i.BalancerPolicy, NKikimrHive::EBalancerPolicy::POLICY_IGNORE, "value# " << (ui64)i.BalancerPolicy << " name# " << NKikimrHive::EBalancerPolicy_Name(i.BalancerPolicy)); + ui32 oldNodeIndex = tabletInfos_B[i.ObjectId].NodeIndex; + ui32 newNodeIndex = i.NodeIndex; + UNIT_ASSERT_VALUES_EQUAL(oldNodeIndex, newNodeIndex); + break; + } + } + // ...but ordinary tablets did move out to other nodes + { + std::array<int, NUM_NODES> nodeTablets = {}; + for (auto& i : tabletInfos_C) { + ++nodeTablets[i.NodeIndex]; + } + Ctest << "Step C: tablet distribution"; + for (auto i : nodeTablets) { + Ctest << " " << i; + } + Ctest << Endl; + auto minmax = std::minmax_element(nodeTablets.begin(), nodeTablets.end()); + UNIT_ASSERT_VALUES_EQUAL(*minmax.first, 0); + UNIT_ASSERT_VALUES_EQUAL(*minmax.second, 1); + UNIT_ASSERT_VALUES_EQUAL(nodeTablets[0], 1); + } + + Ctest << "Step D: change tablets BalancerPolicy" << Endl; + + // set all tablets with BalancerPolicy "ignore" back to "balance" + for (int i = 0; i < NUM_TABLETS; ++i) { + switch(i % NUM_TABLETS) { + case 0: + case 1: + break; + case 2: + THolder<TEvHive::TEvCreateTablet> ev(new TEvHive::TEvCreateTablet(testerTablet, 100500 + i, tabletType, BINDED_CHANNELS)); + ev->Record.SetObjectId(i); + ev->Record.SetBalancerPolicy(NKikimrHive::EBalancerPolicy::POLICY_BALANCE); + ui64 tabletId = SendCreateTestTablet(runtime, hiveTablet, testerTablet, std::move(ev), 0, false); + Y_UNUSED(tabletId); + break; + } + } + + Ctest << "Step D: get tablets info" << Endl; + auto tabletInfos_D = getTabletInfos(hiveTablet); + + // check that all BalancerPolicy "ignore" flags are dropped + for (const auto& i : 
tabletInfos_D) { + Ctest << "Step D: tablet index " << i.ObjectId << ", tablet id " << i.TabletId << ", node index " << i.NodeIndex << ", balancer policy " << NKikimrHive::EBalancerPolicy_Name(i.BalancerPolicy) << Endl; + UNIT_ASSERT_EQUAL_C(i.BalancerPolicy, NKikimrHive::EBalancerPolicy::POLICY_BALANCE, "objectId# " << i.ObjectId << " value# " << (ui64)i.BalancerPolicy << " name# " << NKikimrHive::EBalancerPolicy_Name(i.BalancerPolicy)); + } + + Ctest << "Step D: raise metrics for previously ignored tablets" << Endl; + for (const auto& i: tabletInfos_D) { + switch(i.ObjectId % NUM_TABLETS) { + case 0: + case 1: + break; + case 2: + reportTabletMetrics(i.TabletId, NUM_TABLETS * SINGLE_TABLET_NETWORK_USAGE, true); + break; + } + } + + Ctest << "Step D: wait for balancer to complete" << Endl; + { + TDispatchOptions options; + options.FinalEvents.emplace_back(NHive::TEvPrivate::EvBalancerOut); + runtime.DispatchEvents(options, TDuration::Seconds(10)); + } + + Ctest << "Step E: get tablets info" << Endl; + auto tabletInfos_E = getTabletInfos(hiveTablet); + + // check that (some) former ignored tablets have moved now... 
+ { + bool ignoredTabletsAreMoved = false; + for (const auto& i : tabletInfos_E) { + Ctest << "Step E: tablet index " << i.ObjectId << ", tablet id " << i.TabletId << ", node index " << i.NodeIndex << ", balancer policy " << NKikimrHive::EBalancerPolicy_Name(i.BalancerPolicy) << Endl; + switch (i.ObjectId % NUM_TABLETS) { + case 0: + case 1: + break; + case 2: + ui32 oldNodeIndex = tabletInfos_A[i.ObjectId].NodeIndex; + ui32 newNodeIndex = i.NodeIndex; + if (oldNodeIndex != newNodeIndex) { + ignoredTabletsAreMoved = true; + } + break; + } + } + UNIT_ASSERT_VALUES_EQUAL(ignoredTabletsAreMoved, true); + } + // ...and that the original node is completely void of tablets + { + std::array<int, NUM_NODES> nodeTablets = {}; + for (auto& i : tabletInfos_E) { + ++nodeTablets[i.NodeIndex]; + } + Ctest << "Step E: tablet distribution"; + for (auto i : nodeTablets) { + Ctest << " " << i; + } + Ctest << Endl; + auto minmax = std::minmax_element(nodeTablets.begin(), nodeTablets.end()); + UNIT_ASSERT_VALUES_EQUAL(*minmax.first, 0); + UNIT_ASSERT_VALUES_EQUAL(*minmax.second, 1); + UNIT_ASSERT_VALUES_EQUAL(nodeTablets[0], 0); + } + } + Y_UNIT_TEST(TestRestartTablets) { TTestBasicRuntime runtime(3, false); Setup(runtime, true); diff --git a/ydb/core/mind/hive/monitoring.cpp b/ydb/core/mind/hive/monitoring.cpp index ba1fea2c46..8770c39c4d 100644 --- a/ydb/core/mind/hive/monitoring.cpp +++ b/ydb/core/mind/hive/monitoring.cpp @@ -730,6 +730,29 @@ public: UpdateConfig(db, "NodeSelectStrategy"); UpdateConfig(db, "CheckMoveExpediency"); + if (params.contains("BalancerIgnoreTabletTypes")) { + TVector<TString> tabletTypeNames = SplitString(params.Get("BalancerIgnoreTabletTypes"), ";"); + std::vector<TTabletTypes::EType> newTypeList; + for (const auto& name : tabletTypeNames) { + TTabletTypes::EType type = TTabletTypes::StrToType(Strip(name)); + if (IsValidTabletType(type)) { + newTypeList.emplace_back(type); + } + } + MakeTabletTypeSet(newTypeList); + if (newTypeList != 
Self->BalancerIgnoreTabletTypes) { + // replace DatabaseConfig.BalancerIgnoreTabletTypes inplace + auto* field = Self->DatabaseConfig.MutableBalancerIgnoreTabletTypes(); + field->Reserve(newTypeList.size()); + field->Clear(); + for (auto i : newTypeList) { + field->Add(i); + } + ChangeRequest = true; + // Self->BalancerIgnoreTabletTypes will be replaced by Self->BuildCurrentConfig() + } + } + if (ChangeRequest) { Self->BuildCurrentConfig(); db.Table<Schema::State>().Key(TSchemeIds::State::DefaultState).Update<Schema::State::Config>(Self->DatabaseConfig); @@ -880,6 +903,65 @@ public: } } + void ShowConfigForBalancerIgnoreTabletTypes(IOutputStream& out) { + // value of protobuf type "repeated field of ETabletTypes::EType" + // is represented as a single string build from list delimited type names + + auto makeListString = [] (const NKikimrConfig::THiveConfig& config) { + std::vector<TTabletTypes::EType> types; + for (auto i : config.GetBalancerIgnoreTabletTypes()) { + const auto type = TTabletTypes::EType(i); + if (IsValidTabletType(type)) { + types.emplace_back(type); + } + } + MakeTabletTypeSet(types); + TVector<TString> names; + for (auto i : types) { + names.emplace_back(TTabletTypes::TypeToStr(i)); + } + return JoinStrings(names, ";"); + }; + + const TString param("BalancerIgnoreTabletTypes"); + + NKikimrConfig::THiveConfig builtinConfig; + auto builtinDefault = makeListString(builtinConfig); + auto clusterDefault = makeListString(Self->ClusterConfig); + auto currentValue = makeListString(Self->CurrentConfig); + + bool localOverrided = (currentValue != clusterDefault); + + out << "<div class='row'>"; + { + // mark if value is changed locally + out << "<div class='col-sm-3' style='padding-top:12px;text-align:right'>" + << "<label for='" << param << "'" + << (localOverrided ? 
"" : "' style='font-weight:normal'") + << ">" << param << ":</label>" + << "</div>"; + // editable current value + out << "<div class='col-sm-2' style='padding-top:5px'>" + << "<input id='" << param << "' style='max-width:170px;margin-top:7px' onkeydown='edit(this);' onchange='edit(this);'" + << " value='" << currentValue << "'>" + << "</div>"; + // apply button + out << "<div class='col-sm-1'><button type='button' class='btn' style='margin-top:5px' onclick='applyVal(this, \"" << param << "\");' disabled='true'>Apply</button></div>"; + // reset button + out << "<div class='col-sm-1'><button type='button' class='btn' style='margin-top:5px' onclick='resetVal(this, \"" << param << "\");' " << (localOverrided ? "" : "disabled='true'") << ">Reset</button></div>"; + // show cluster default + out << "<div id='CMS" << param << "' class='col-sm-2' style='padding-top:12px'>" + << clusterDefault + << "</div>"; + // show builtin default + out << "<div id='Default" << param << "' class='col-sm-2' style='padding-top:12px'>" + << builtinDefault + << "</div>"; + } + out << "</div>"; + } + + void RenderHTMLPage(IOutputStream& out, const TActorContext&/* ctx*/) { out << "<head></head><body>"; out << "<script>$('.container > h2').html('Settings');</script>"; @@ -916,6 +998,7 @@ public: ShowConfig(out, "MaxMovementsOnAutoBalancer"); ShowConfig(out, "ContinueAutoBalancer"); ShowConfig(out, "CheckMoveExpediency"); + ShowConfigForBalancerIgnoreTabletTypes(out); out << "<div class='row' style='margin-top:40px'>"; out << "<div class='col-sm-2' style='padding-top:30px;text-align:right'><label for='allowedMetrics'>AllowedMetrics:</label></div>"; @@ -956,10 +1039,8 @@ public: } out << "</table></div>"; out << "<div class='col-sm-2' style='padding-top:22px'><button type='button' class='btn' style='margin-top:5px' onclick='applyTab(this);' disabled='true'>Apply</button></div>"; - out << "</div>"; - out << "</div>"; out << R"___( @@ -1027,6 +1108,7 @@ public: error: function() { 
$(button).addClass('btn-danger'); } }); } + </script> )___"; @@ -1061,6 +1143,7 @@ public: Y_UNUSED(ctx); } + //TODO: move to hive_statics.cpp as utility function static TString GetTabletType(TTabletTypes::EType type) { switch(type) { case TTabletTypes::SchemeShard: @@ -2880,6 +2963,7 @@ public: result["VolatileStateChangeTime"] = tablet.VolatileStateChangeTime.ToString(); result["TabletRole"] = TTabletInfo::ETabletRoleName(tablet.TabletRole); result["LastBalancerDecisionTime"] = tablet.LastBalancerDecisionTime.ToString(); + result["BalancerPolicy"] = NKikimrHive::EBalancerPolicy_Name(tablet.BalancerPolicy); result["NodeId"] = tablet.NodeId; result["LastNodeId"] = tablet.LastNodeId; result["PreferredNodeId"] = tablet.PreferredNodeId; diff --git a/ydb/core/mind/hive/tablet_info.cpp b/ydb/core/mind/hive/tablet_info.cpp index 81af51c3a9..81ad491b22 100644 --- a/ydb/core/mind/hive/tablet_info.cpp +++ b/ydb/core/mind/hive/tablet_info.cpp @@ -21,6 +21,7 @@ TTabletInfo::TTabletInfo(ETabletRole role, THive& hive) , ResourceValues() , ResourceMetricsAggregates(Hive.GetDefaultResourceMetricsAggregates()) , Weight(0) + , BalancerPolicy(EBalancerPolicy::POLICY_BALANCE) {} const TLeaderTabletInfo& TTabletInfo::GetLeader() const { @@ -181,7 +182,9 @@ bool TTabletInfo::IsStopped() const { } bool TTabletInfo::IsGoodForBalancer(TInstant now) const { - return now - LastBalancerDecisionTime > Hive.GetTabletKickCooldownPeriod(); + return (BalancerPolicy == EBalancerPolicy::POLICY_BALANCE) + && !Hive.IsInBalancerIgnoreList(GetTabletType()) + && (now - LastBalancerDecisionTime > Hive.GetTabletKickCooldownPeriod()); } bool TTabletInfo::InitiateBoot() { diff --git a/ydb/core/mind/hive/tablet_info.h b/ydb/core/mind/hive/tablet_info.h index aa03ecd357..5ab320e759 100644 --- a/ydb/core/mind/hive/tablet_info.h +++ b/ydb/core/mind/hive/tablet_info.h @@ -95,6 +95,7 @@ struct TTabletInfo { friend class TTxMonEvent_TabletInfo; public: using EVolatileState = NKikimrHive::ETabletVolatileState; + 
using EBalancerPolicy = NKikimrHive::EBalancerPolicy; enum class ETabletRole { Leader, @@ -158,6 +159,7 @@ public: double Weight; mutable TString BootState; TInstant PostponedStart; + EBalancerPolicy BalancerPolicy; TTabletInfo(ETabletRole role, THive& hive); TTabletInfo(const TTabletInfo&) = delete; diff --git a/ydb/core/mind/hive/tx__create_tablet.cpp b/ydb/core/mind/hive/tx__create_tablet.cpp index 5606a10f22..6590c0abf1 100644 --- a/ydb/core/mind/hive/tx__create_tablet.cpp +++ b/ydb/core/mind/hive/tx__create_tablet.cpp @@ -29,6 +29,8 @@ class TTxCreateTablet : public TTransactionBase<THive> { NKikimrHive::ETabletBootMode BootMode; NKikimrHive::TForwardRequest ForwardRequest; + NKikimrHive::EBalancerPolicy BalancerPolicy; + TSideEffects SideEffects; public: @@ -48,6 +50,7 @@ public: , BoundChannels(RequestData.GetBindedChannels().begin(), RequestData.GetBindedChannels().end()) , AllowedDomains(RequestData.GetAllowedDomains().begin(), RequestData.GetAllowedDomains().end()) , BootMode(RequestData.GetTabletBootMode()) + , BalancerPolicy(RequestData.GetBalancerPolicy()) { const ui32 allowedNodeIdsSize = RequestData.AllowedNodeIDsSize(); AllowedNodeIds.reserve(allowedNodeIdsSize); @@ -270,6 +273,8 @@ public: db.Table<Schema::Tablet>().Key(TabletId).Update<Schema::Tablet::ObjectID>(ObjectId); tablet->BootMode = BootMode; db.Table<Schema::Tablet>().Key(TabletId).Update<Schema::Tablet::BootMode>(BootMode); + tablet->BalancerPolicy = BalancerPolicy; + db.Table<Schema::Tablet>().Key(TabletId).Update<Schema::Tablet::BalancerPolicy>(BalancerPolicy); UpdateChannelsBinding(*tablet, db); @@ -368,6 +373,7 @@ public: tablet.ObjectId = ObjectId; tablet.AssignDomains(ObjectDomain, AllowedDomains); tablet.Statistics.SetLastAliveTimestamp(now.MilliSeconds()); + tablet.BalancerPolicy = BalancerPolicy; TVector<ui32> allowedDataCenters; for (const TDataCenterId& dc : tablet.AllowedDataCenters) { @@ -387,7 +393,8 @@ public: NIceDb::TUpdate<Schema::Tablet::BootMode>(tablet.BootMode), 
NIceDb::TUpdate<Schema::Tablet::ObjectID>(tablet.ObjectId), NIceDb::TUpdate<Schema::Tablet::ObjectDomain>(ObjectDomain), - NIceDb::TUpdate<Schema::Tablet::Statistics>(tablet.Statistics)); + NIceDb::TUpdate<Schema::Tablet::Statistics>(tablet.Statistics), + NIceDb::TUpdate<Schema::Tablet::BalancerPolicy>(tablet.BalancerPolicy)); Self->PendingCreateTablets.erase({OwnerId, OwnerIdx}); diff --git a/ydb/core/node_whiteboard/node_whiteboard.h b/ydb/core/node_whiteboard/node_whiteboard.h index 8769068f37..9230290af2 100644 --- a/ydb/core/node_whiteboard/node_whiteboard.h +++ b/ydb/core/node_whiteboard/node_whiteboard.h @@ -57,6 +57,7 @@ struct TEvWhiteboard{ EvSignalBodyResponse, EvPDiskStateDelete, EvVDiskStateGenerationChange, + EvVDiskDropDonors, EvEnd }; @@ -220,6 +221,27 @@ struct TEvWhiteboard{ {} }; + struct TEvVDiskDropDonors : TEventLocal<TEvVDiskDropDonors, EvVDiskDropDonors> { + const TVDiskID VDiskId; + const ui64 InstanceGuid; + const std::vector<NKikimrBlobStorage::TVSlotId> DropDonors; + const bool DropAllDonors = false; + + TEvVDiskDropDonors(TVDiskID vdiskId, ui64 instanceGuid, std::vector<NKikimrBlobStorage::TVSlotId> dropDonors) + : VDiskId(vdiskId) + , InstanceGuid(instanceGuid) + , DropDonors(std::move(dropDonors)) + {} + + struct TDropAllDonors {}; + + TEvVDiskDropDonors(TVDiskID vdiskId, ui64 instanceGuid, TDropAllDonors) + : VDiskId(vdiskId) + , InstanceGuid(instanceGuid) + , DropAllDonors(true) + {} + }; + struct TEvPDiskStateDelete : TEventPB<TEvPDiskStateDelete, NKikimrWhiteboard::TPDiskStateInfo, EvPDiskStateDelete> { TEvPDiskStateDelete() = default; diff --git a/ydb/core/persqueue/pq_impl.cpp b/ydb/core/persqueue/pq_impl.cpp index ecdbf13e2c..c066022327 100644 --- a/ydb/core/persqueue/pq_impl.cpp +++ b/ydb/core/persqueue/pq_impl.cpp @@ -682,16 +682,29 @@ void TPersQueue::ApplyNewConfigAndReply(const TActorContext& ctx) void TPersQueue::HandleConfigWriteResponse(const NKikimrClient::TResponse& resp, const TActorContext& ctx) { if 
(resp.GetStatus() != NMsgBusProxy::MSTATUS_OK || - resp.WriteResultSize() != 1 || - resp.GetWriteResult(0).GetStatus() != NKikimrProto::OK) - { + resp.WriteResultSize() < 1) { LOG_ERROR_S(ctx, NKikimrServices::PERSQUEUE, "Tablet " << TabletID() << " Config write error: " << resp.DebugString() << " " << ctx.SelfID); ctx.Send(ctx.SelfID, new TEvents::TEvPoisonPill()); return; } + for (const auto& res : resp.GetWriteResult()) { + if (res.GetStatus() != NKikimrProto::OK) { + LOG_ERROR_S(ctx, NKikimrServices::PERSQUEUE, "Tablet " << TabletID() + << " Config write error: " << resp.DebugString() << " " << ctx.SelfID); + ctx.Send(ctx.SelfID, new TEvents::TEvPoisonPill()); + return; + } + } + + if (resp.WriteResultSize() > 1) { + LOG_INFO_S(ctx, NKikimrServices::PERSQUEUE, "Tablet " << TabletID() + << " restarting - have some registering of message groups"); + ctx.Send(ctx.SelfID, new TEvents::TEvPoisonPill()); + return; + } - Y_VERIFY(resp.WriteResultSize() == 1); + Y_VERIFY(resp.WriteResultSize() >= 1); Y_VERIFY(resp.GetWriteResult(0).GetStatus() == NKikimrProto::OK); if (ConfigInited && PartitionsInited == Partitions.size()) //all partitions are working well - can apply new config ApplyNewConfigAndReply(ctx); @@ -1269,12 +1282,14 @@ void TPersQueue::ProcessUpdateConfigRequest(TAutoPtr<TEvPersQueue::TEvUpdateConf } sourceIdWriter.RegisterSourceId(mg.GetId(), 0, 0, ctx.Now(), std::move(keyRange)); + } - for (const auto& partition : cfg.GetPartitions()) { - sourceIdWriter.FillRequest(request.Get(), partition.GetPartitionId()); - } + for (const auto& partition : cfg.GetPartitions()) { + sourceIdWriter.FillRequest(request.Get(), partition.GetPartitionId()); } + Y_VERIFY((ui64)request->Record.GetCmdWrite().size() == (ui64)bootstrapCfg.GetExplicitMessageGroups().size() * cfg.PartitionsSize() + 1); + NewConfig = cfg; ctx.Send(ctx.SelfID, request.Release()); } diff --git a/ydb/core/protos/config.proto b/ydb/core/protos/config.proto index 50a7de9572..eb50087482 100644 --- 
a/ydb/core/protos/config.proto +++ b/ydb/core/protos/config.proto @@ -707,6 +707,7 @@ message TFeatureFlags { optional bool EnablePredicateExtractForDataQueries = 68 [default = false]; optional bool EnableMoveIndex = 70 [default = false]; optional bool EnableNotNullDataColumns = 73 [default = false]; + optional bool EnableGrpcAudit = 74 [default = false]; } @@ -1419,6 +1420,7 @@ message THiveConfig { optional bool CheckMoveExpediency = 46 [default = true]; optional uint64 StoragePoolFreshPeriod = 47 [default = 60000]; // milliseconds optional string PoolsToMonitorForUsage = 48 [default = "System,User,IC"]; + repeated NKikimrTabletBase.TTabletTypes.EType BalancerIgnoreTabletTypes = 49; } message TDataShardConfig { diff --git a/ydb/core/protos/hive.proto b/ydb/core/protos/hive.proto index 0b6bac0755..659a158eaf 100644 --- a/ydb/core/protos/hive.proto +++ b/ydb/core/protos/hive.proto @@ -16,6 +16,11 @@ enum ETabletBootMode { TABLET_BOOT_MODE_EXTERNAL = 1; } +enum EBalancerPolicy { + POLICY_BALANCE = 0; + POLICY_IGNORE = 1; +} + enum EErrorReason { // Default unspecified error reason ERROR_REASON_UNKNOWN = 0; @@ -95,6 +100,7 @@ message TEvCreateTablet { repeated NKikimrSubDomains.TDomainKey AllowedDomains = 23; optional TDataCentersPreference DataCentersPreference = 24; repeated string AllowedDataCenters = 25; + optional EBalancerPolicy BalancerPolicy = 26; optional uint32 ChannelsProfile = 5 [deprecated = true]; // DEPRECATED optional uint32 Flags = 6 [deprecated = true]; // DEPRECATED @@ -458,6 +464,7 @@ message TTabletInfo { optional NKikimrTabletBase.TMetrics ResourceUsage = 21; optional uint32 RestartsPerPeriod = 22; optional uint64 LastAliveTimestamp = 23; + optional EBalancerPolicy BalancerPolicy = 24; } message TEvSeizeTabletsReply { diff --git a/ydb/core/tablet/node_whiteboard.cpp b/ydb/core/tablet/node_whiteboard.cpp index 46be0f95e0..0677480cf3 100644 --- a/ydb/core/tablet/node_whiteboard.cpp +++ b/ydb/core/tablet/node_whiteboard.cpp @@ -392,6 +392,7 @@ 
protected: HFunc(TEvWhiteboard::TEvVDiskStateGenerationChange, Handle); HFunc(TEvWhiteboard::TEvVDiskStateDelete, Handle); HFunc(TEvWhiteboard::TEvVDiskStateRequest, Handle); + HFunc(TEvWhiteboard::TEvVDiskDropDonors, Handle); HFunc(TEvWhiteboard::TEvBSGroupStateUpdate, Handle); HFunc(TEvWhiteboard::TEvBSGroupStateDelete, Handle); HFunc(TEvWhiteboard::TEvBSGroupStateRequest, Handle); @@ -455,6 +456,7 @@ protected: } else if (const auto it = VDiskStateInfo.find(key); it != VDiskStateInfo.end() && it->second.GetInstanceGuid() == record.GetInstanceGuid()) { auto& value = it->second; + if (CheckedMerge(value, record) >= 100) { value.SetChangeTime(ctx.Now().MilliSeconds()); UpdateSystemState(ctx); @@ -479,6 +481,37 @@ protected: } } + void Handle(TEvWhiteboard::TEvVDiskDropDonors::TPtr& ev, const TActorContext& ctx) { + auto& msg = *ev->Get(); + if (const auto it = VDiskStateInfo.find(msg.VDiskId); it != VDiskStateInfo.end() && + it->second.GetInstanceGuid() == msg.InstanceGuid) { + auto& value = it->second; + bool change = false; + + if (msg.DropAllDonors) { + change = !value.GetDonors().empty(); + value.ClearDonors(); + } else { + for (const auto& donor : msg.DropDonors) { + auto *donors = value.MutableDonors(); + for (int i = 0; i < donors->size(); ++i) { + auto& x = donors->at(i); + if (x.GetNodeId() == donor.GetNodeId() && x.GetPDiskId() == donor.GetPDiskId() && x.GetVSlotId() == donor.GetVSlotId()) { + donors->DeleteSubrange(i, 1); + change = true; + break; + } + } + } + } + + if (change) { + value.SetChangeTime(ctx.Now().MilliSeconds()); + UpdateSystemState(ctx); + } + } + } + void Handle(TEvWhiteboard::TEvBSGroupStateUpdate::TPtr &ev, const TActorContext &ctx) { auto& bSGroupStateInfo = BSGroupStateInfo[ev->Get()->Record.GetGroupID()]; if (CheckedMerge(bSGroupStateInfo, ev->Get()->Record) >= 100) { diff --git a/ydb/core/tablet/tablet_counters_aggregator.cpp b/ydb/core/tablet/tablet_counters_aggregator.cpp index 94a0aea199..7749e41106 100644 --- 
a/ydb/core/tablet/tablet_counters_aggregator.cpp +++ b/ydb/core/tablet/tablet_counters_aggregator.cpp @@ -701,7 +701,7 @@ public: // TTabletMon(::NMonitoring::TDynamicCounterPtr counters, bool isFollower, TActorId dbWatcherActorId) : Counters(GetServiceCounters(counters, isFollower ? "followers" : "tablets")) - , AllTypes(Counters.Get(), "type", "all") + , AllTypes(MakeIntrusive<TTabletCountersForTabletType>(Counters.Get(), "type", "all")) , IsFollower(isFollower) , DbWatcherActorId(dbWatcherActorId) { @@ -714,9 +714,9 @@ public: const TTabletCountersBase* executorCounters, const TTabletCountersBase* appCounters, const TActorContext& ctx) { - AllTypes.Apply(tabletID, executorCounters, nullptr, tabletType); + AllTypes->Apply(tabletID, executorCounters, nullptr, tabletType); // - auto* typeCounters = GetOrAddCountersByTabletType(tabletType, CountersByTabletType, Counters); + auto typeCounters = GetOrAddCountersByTabletType(tabletType, CountersByTabletType, Counters); if (typeCounters) { typeCounters->Apply(tabletID, executorCounters, appCounters, tabletType); } @@ -783,7 +783,7 @@ public: } void ForgetTablet(ui64 tabletID, TTabletTypes::EType tabletType, TPathId tenantPathId) { - AllTypes.Forget(tabletID); + AllTypes->Forget(tabletID); // and now erase from every other path auto iterTabletType = CountersByTabletType.find(tabletType); if (iterTabletType != CountersByTabletType.end()) { @@ -912,13 +912,17 @@ public: } void RecalcAll() { - AllTypes.RecalcAll(); - for (auto& c : CountersByTabletType) { - c.second->RecalcAll(); + AllTypes->RecalcAll(); + for (auto& [_, counters] : CountersByTabletType) { + counters->RecalcAll(); } if (YdbCounters) { - YdbCounters->Initialize(Counters, CountersByTabletType); + auto hasDatashard = (bool)FindCountersByTabletType( + TTabletTypes::DataShard, CountersByTabletType); + auto hasSchemeshard = (bool)FindCountersByTabletType( + TTabletTypes::SchemeShard, CountersByTabletType); + YdbCounters->Initialize(Counters, hasDatashard, 
hasSchemeshard); YdbCounters->Transform(); } } @@ -929,7 +933,7 @@ public: private: // subgroups - class TTabletCountersForTabletType { + class TTabletCountersForTabletType : public TThrRefBase { public: // TTabletCountersForTabletType(::NMonitoring::TDynamicCounters* owner, const char* category, const char* name) @@ -1299,25 +1303,26 @@ private: TSolomonCounters TabletAppCounters; }; - typedef TMap<TTabletTypes::EType, TAutoPtr<TTabletCountersForTabletType> > TCountersByTabletType; + using TTabletCountersForTabletTypePtr = TIntrusivePtr<TTabletCountersForTabletType>; + typedef TMap<TTabletTypes::EType, TTabletCountersForTabletTypePtr> TCountersByTabletType; - static TTabletCountersForTabletType* FindCountersByTabletType( + static TTabletCountersForTabletTypePtr FindCountersByTabletType( TTabletTypes::EType tabletType, TCountersByTabletType& countersByTabletType) { auto iterTabletType = countersByTabletType.find(tabletType); if (iterTabletType != countersByTabletType.end()) { - return iterTabletType->second.Get(); + return iterTabletType->second; } return {}; } - static TTabletCountersForTabletType* GetOrAddCountersByTabletType( + static TTabletCountersForTabletTypePtr GetOrAddCountersByTabletType( TTabletTypes::EType tabletType, TCountersByTabletType& countersByTabletType, ::NMonitoring::TDynamicCounterPtr counters) { - auto* typeCounters = FindCountersByTabletType(tabletType, countersByTabletType); + auto typeCounters = FindCountersByTabletType(tabletType, countersByTabletType); if (!typeCounters) { TString tabletTypeStr = TTabletTypes::TypeToStr(tabletType); typeCounters = new TTabletCountersForTabletType( @@ -1443,14 +1448,8 @@ private: "table.datashard.used_core_percents", NMonitoring::LinearHistogram(12, 0, 10), false); }; - void Initialize( - ::NMonitoring::TDynamicCounterPtr counters, - TCountersByTabletType& countersByTabletType) - { - auto datashard = FindCountersByTabletType( - TTabletTypes::DataShard, countersByTabletType); - - if (datashard && 
!RowUpdates) { + void Initialize(::NMonitoring::TDynamicCounterPtr counters, bool hasDatashard, bool hasSchemeshard) { + if (hasDatashard && !RowUpdates) { auto datashardGroup = counters->GetSubgroup("type", "DataShard"); auto appGroup = datashardGroup->GetSubgroup("category", "app"); @@ -1474,10 +1473,7 @@ private: ConsumedCpuHistogram = execGroup->FindHistogram("HIST(ConsumedCPU)"); } - auto schemeshard = FindCountersByTabletType( - TTabletTypes::SchemeShard, countersByTabletType); - - if (schemeshard && !DiskSpaceTablesTotalBytes) { + if (hasSchemeshard && !DiskSpaceTablesTotalBytes) { auto schemeshardGroup = counters->GetSubgroup("type", "SchemeShard"); auto appGroup = schemeshardGroup->GetSubgroup("category", "app"); @@ -1552,7 +1548,6 @@ public: public: TTabletCountersForDb() : SolomonCounters(new ::NMonitoring::TDynamicCounters) - , AllTypes(SolomonCounters.Get(), "type", "all") {} TTabletCountersForDb(::NMonitoring::TDynamicCounterPtr externalGroup, @@ -1560,30 +1555,24 @@ public: THolder<TTabletCountersBase> executorCounters) : SolomonCounters(internalGroup) , ExecutorCounters(std::move(executorCounters)) - , AllTypes(SolomonCounters.Get(), "type", "all") { YdbCounters = MakeIntrusive<TYdbTabletCounters>(externalGroup); } void ToProto(NKikimr::NSysView::TDbServiceCounters& counters) override { - auto* proto = counters.FindOrAddTabletCounters(TTabletTypes::Unknown); - AllTypes.ToProto(*proto); - - for (auto& [type, tabletCounters] : CountersByTabletType) { - auto* proto = counters.FindOrAddTabletCounters(type); - tabletCounters->ToProto(*proto); + for (auto& bucket : CountersByTabletType.Buckets) { + TWriteGuard guard(bucket.GetLock()); + for (auto& [type, tabletCounters] : bucket.GetMap()) { + auto* proto = counters.FindOrAddTabletCounters(type); + tabletCounters->ToProto(*proto); + } } } void FromProto(NKikimr::NSysView::TDbServiceCounters& counters) override { for (auto& proto : *counters.Proto().MutableTabletCounters()) { auto type = proto.GetType(); - 
TTabletCountersForTabletType* tabletCounters = {}; - if (type == TTabletTypes::Unknown) { - tabletCounters = &AllTypes; - } else { - tabletCounters = GetOrAddCountersByTabletType(type, CountersByTabletType, SolomonCounters); - } + auto tabletCounters = GetOrAddCounters(type); if (tabletCounters) { if (!tabletCounters->IsInitialized()) { Y_VERIFY(ExecutorCounters.Get()); @@ -1594,7 +1583,9 @@ public: } } if (YdbCounters) { - YdbCounters->Initialize(SolomonCounters, CountersByTabletType); + auto hasDatashard = (bool)GetCounters(TTabletTypes::DataShard); + auto hasSchemeshard = (bool)GetCounters(TTabletTypes::SchemeShard); + YdbCounters->Initialize(SolomonCounters, hasDatashard, hasSchemeshard); YdbCounters->Transform(); } } @@ -1603,25 +1594,66 @@ public: const TTabletCountersBase* appCounters, TTabletTypes::EType type, const TTabletCountersBase* limitedAppCounters) { - AllTypes.Apply(tabletId, executorCounters, nullptr, type); - auto* tabletCounters = GetOrAddCountersByTabletType(type, CountersByTabletType, SolomonCounters); - if (tabletCounters) { - tabletCounters->Apply(tabletId, executorCounters, appCounters, type, limitedAppCounters); + auto allTypes = GetOrAddCounters(TTabletTypes::Unknown); + { + TWriteGuard guard(CountersByTabletType.GetBucketForKey(TTabletTypes::Unknown).GetLock()); + allTypes->Apply(tabletId, executorCounters, nullptr, type); + } + auto typeCounters = GetOrAddCounters(type); + { + TWriteGuard guard(CountersByTabletType.GetBucketForKey(type).GetLock()); + typeCounters->Apply(tabletId, executorCounters, appCounters, type, limitedAppCounters); } } void Forget(ui64 tabletId, TTabletTypes::EType type) { - if (auto it = CountersByTabletType.find(type); it != CountersByTabletType.end()) { - it->second->Forget(tabletId); + auto allTypes = GetCounters(TTabletTypes::Unknown); + if (allTypes) { + TWriteGuard guard(CountersByTabletType.GetBucketForKey(TTabletTypes::Unknown).GetLock()); + allTypes->Forget(tabletId); + } + auto typeCounters = 
GetCounters(type); + if (typeCounters) { + TWriteGuard guard(CountersByTabletType.GetBucketForKey(type).GetLock()); + typeCounters->Forget(tabletId); + } + } + + void RecalcAll() { + for (auto& bucket : CountersByTabletType.Buckets) { + TWriteGuard guard(bucket.GetLock()); + for (auto& [_, tabletCounters] : bucket.GetMap()) { + tabletCounters->RecalcAll(); + } + } + } + + private: + TTabletCountersForTabletTypePtr GetCounters(TTabletTypes::EType tabletType) { + TTabletCountersForTabletTypePtr res; + CountersByTabletType.Get(tabletType, res); + return res; + } + + TTabletCountersForTabletTypePtr GetOrAddCounters(TTabletTypes::EType tabletType) { + auto res = GetCounters(tabletType); + if (res) { + return res; } + res = CountersByTabletType.InsertIfAbsentWithInit(tabletType, [this, tabletType] { + TString type = (tabletType == TTabletTypes::Unknown) ? + "all" : TTabletTypes::TypeToStr(tabletType); + return MakeIntrusive<TTabletCountersForTabletType>( + SolomonCounters.Get(), "type", type.data()); + }); + return res; } private: ::NMonitoring::TDynamicCounterPtr SolomonCounters; THolder<TTabletCountersBase> ExecutorCounters; - TTabletCountersForTabletType AllTypes; - TCountersByTabletType CountersByTabletType; + TConcurrentRWHashMap<TTabletTypes::EType, TTabletCountersForTabletTypePtr, 16> CountersByTabletType; TYdbTabletCountersPtr YdbCounters; }; @@ -1666,7 +1698,7 @@ private: private: // ::NMonitoring::TDynamicCounterPtr Counters; - TTabletCountersForTabletType AllTypes; + TTabletCountersForTabletTypePtr AllTypes; bool IsFollower = false; typedef THashMap<TPathId, TIntrusivePtr<TTabletCountersForDb>> TCountersByPathId; diff --git a/ydb/core/testlib/basics/feature_flags.h b/ydb/core/testlib/basics/feature_flags.h index 7bf2b23d89..95f2d8c0e9 100644 --- a/ydb/core/testlib/basics/feature_flags.h +++ b/ydb/core/testlib/basics/feature_flags.h @@ -38,6 +38,7 @@ public: FEATURE_FLAG_SETTER(EnableMoveIndex) FEATURE_FLAG_SETTER(EnableNotNullDataColumns) 
FEATURE_FLAG_SETTER(EnableArrowFormatAtDatashard) + FEATURE_FLAG_SETTER(EnableGrpcAudit) TDerived& SetEnableMvcc(std::optional<bool> value) { if (value) { diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_create_olap_store.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_create_olap_store.cpp index ad91e488ec..0e26490418 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_create_olap_store.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_create_olap_store.cpp @@ -170,6 +170,12 @@ TOlapStoreInfo::TPtr CreateOlapStore(const NKikimrSchemeOp::TColumnStoreDescript return nullptr; } + if (op.GetColumnShardCount() == 0) { + status = NKikimrScheme::StatusSchemeError; + errStr = Sprintf("trying to create OLAP store without zero shards"); + return nullptr; + } + op.SetNextSchemaPresetId(1); op.SetNextTtlSettingsPresetId(1); diff --git a/ydb/core/viewer/viewer.cpp b/ydb/core/viewer/viewer.cpp index 7d7729b76f..12e2c9bb97 100644 --- a/ydb/core/viewer/viewer.cpp +++ b/ydb/core/viewer/viewer.cpp @@ -451,6 +451,7 @@ private: } if (IsMatchesWildcard(filename, "monitoring*/resources/js/*") || IsMatchesWildcard(filename, "monitoring*/resources/css/*") + || IsMatchesWildcard(filename, "monitoring*/resources/media/*") || IsMatchesWildcard(filename, "monitoring*/resources/assets/fonts/*") || IsMatchesWildcard(filename, "monitoring*/resources/favicon.png")) { auto resPos = filename.find("/resources/"); diff --git a/ydb/core/ymq/actor/queue_schema.cpp b/ydb/core/ymq/actor/queue_schema.cpp index 90bcb11c45..616843185e 100644 --- a/ydb/core/ymq/actor/queue_schema.cpp +++ b/ydb/core/ymq/actor/queue_schema.cpp @@ -1065,10 +1065,11 @@ void TCreateQueueSchemaActorV2::MatchQueueAttributes( ) { Become(&TCreateQueueSchemaActorV2::MatchAttributes); + const TString existingQueueName = IsCloudMode_ ? 
ExistingQueueResourceId_ : QueuePath_.QueueName; TDbQueriesMaker queryMaker( Cfg().GetRoot(), QueuePath_.UserName, - QueuePath_.QueueName, + existingQueueName, currentVersion, IsFifo_, 0, @@ -1085,7 +1086,7 @@ void TCreateQueueSchemaActorV2::MatchQueueAttributes( TParameters(trans->MutableParams()->MutableProto()) .Uint64("QUEUE_ID_NUMBER", currentVersion) .Uint64("QUEUE_ID_NUMBER_HASH", GetKeysHash(currentVersion)) - .Utf8("NAME", IsCloudMode_ ? ExistingQueueResourceId_ : QueuePath_.QueueName) + .Utf8("NAME", existingQueueName) .Bool("FIFO", IsFifo_) .Uint64("SHARDS", RequiredShardsCount_) .Uint64("PARTITIONS", Request_.GetPartitions()) diff --git a/ydb/public/api/protos/CMakeLists.txt b/ydb/public/api/protos/CMakeLists.txt index 9096201adf..a3e2b117d7 100644 --- a/ydb/public/api/protos/CMakeLists.txt +++ b/ydb/public/api/protos/CMakeLists.txt @@ -65,6 +65,11 @@ generate_enum_serilization(api-protos INCLUDE_HEADERS ydb/public/api/protos/draft/datastreams.pb.h ) +generate_enum_serilization(api-protos + ${CMAKE_BINARY_DIR}/ydb/public/api/protos/ydb_topic.pb.h + INCLUDE_HEADERS + ydb/public/api/protos/ydb_topic.pb.h +) target_proto_addincls(api-protos ./ ${CMAKE_SOURCE_DIR}/ diff --git a/ydb/services/persqueue_v1/actors/write_session_actor.h b/ydb/services/persqueue_v1/actors/write_session_actor.h index 99b026acee..44e5d71a23 100644 --- a/ydb/services/persqueue_v1/actors/write_session_actor.h +++ b/ydb/services/persqueue_v1/actors/write_session_actor.h @@ -256,6 +256,7 @@ private: NKikimr::NPQ::TMultiCounter SessionsActive; NKikimr::NPQ::TMultiCounter Errors; + std::vector<NKikimr::NPQ::TMultiCounter> CodecCounters; TIntrusivePtr<NACLib::TUserToken> Token; TString Auth; diff --git a/ydb/services/persqueue_v1/actors/write_session_actor.ipp b/ydb/services/persqueue_v1/actors/write_session_actor.ipp index 357f003ebd..48571cacec 100644 --- a/ydb/services/persqueue_v1/actors/write_session_actor.ipp +++ b/ydb/services/persqueue_v1/actors/write_session_actor.ipp @@ -490,6 
+490,16 @@ void TWriteSessionActor<UseMigrationProtocol>::SetupCounters() SessionsActive = NKikimr::NPQ::TMultiCounter(subGroup, aggr, {}, {"SessionsActive"}, false); Errors = NKikimr::NPQ::TMultiCounter(subGroup, aggr, {}, {"Errors"}, true); + CodecCounters.push_back(NKikimr::NPQ::TMultiCounter(subGroup, aggr, {{"codec", "user"}}, {"MessagesWrittenByCodec"}, true)); + + auto allNames = GetEnumAllCppNames<Ydb::Topic::Codec>(); + allNames.erase(allNames.begin()); + allNames.pop_back(); + allNames.pop_back(); + for (auto &name : allNames) { + auto nm = to_lower(name).substr(18); + CodecCounters.push_back(NKikimr::NPQ::TMultiCounter(subGroup, aggr, {{"codec", nm}}, {"MessagesWrittenByCodec"}, true)); + } SessionsCreated.Inc(); SessionsActive.Inc(); } @@ -511,6 +521,7 @@ void TWriteSessionActor<UseMigrationProtocol>::SetupCounters(const TString& clou SessionsActive = NKikimr::NPQ::TMultiCounter(subGroup, aggr, {}, {"stream.internal_write.sessions_active"}, false, "name"); Errors = NKikimr::NPQ::TMultiCounter(subGroup, aggr, {}, {"stream.internal_write.errors_per_second"}, true, "name"); + SessionsCreated.Inc(); SessionsActive.Inc(); } @@ -1388,6 +1399,10 @@ void TWriteSessionActor<UseMigrationProtocol>::Handle(typename TEvWrite::TPtr& e CloseSession(TStringBuilder() << "bad write request - 'blocks_headers' at position " << messageIndex << " is invalid: " << error, PersQueue::ErrorCode::BAD_REQUEST, ctx); return false; } + ui32 intCodec = ((ui32)codecID + 1) < CodecCounters.size() ? 
((ui32)codecID + 1) : 0; + if (CodecCounters.size() > intCodec) { + CodecCounters[intCodec].Inc(); + } if (data.blocks_message_counts(messageIndex) != 1) { CloseSession(TStringBuilder() << "bad write request - 'blocks_message_counts' at position " << messageIndex << " is " << data.blocks_message_counts(messageIndex) @@ -1426,6 +1441,10 @@ void TWriteSessionActor<UseMigrationProtocol>::Handle(typename TEvWrite::TPtr& e CloseSession(TStringBuilder() << "bad write request - codec is invalid: " << error, PersQueue::ErrorCode::BAD_REQUEST, ctx); return false; } + ui32 intCodec = codecID < CodecCounters.size() ? codecID : 0; + if (CodecCounters.size() > intCodec) { + CodecCounters[intCodec].Inc(); + } return true; }; diff --git a/ydb/services/persqueue_v1/persqueue_ut.cpp b/ydb/services/persqueue_v1/persqueue_ut.cpp index d334982ad1..8e9affa949 100644 --- a/ydb/services/persqueue_v1/persqueue_ut.cpp +++ b/ydb/services/persqueue_v1/persqueue_ut.cpp @@ -2323,6 +2323,7 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) { { "BytesInflight", "BytesInflightTotal", + "MessagesWrittenByCodec", "Errors", "SessionsActive", "SessionsCreated", diff --git a/ydb/services/ydb/ydb_logstore_ut.cpp b/ydb/services/ydb/ydb_logstore_ut.cpp index f2e2407cab..9fca08cd0d 100644 --- a/ydb/services/ydb/ydb_logstore_ut.cpp +++ b/ydb/services/ydb/ydb_logstore_ut.cpp @@ -101,6 +101,16 @@ Y_UNIT_TEST_SUITE(YdbLogStore) { auto res = logStoreClient.DropLogStore("/Root/LogStore").GetValueSync(); UNIT_ASSERT_VALUES_EQUAL_C(res.GetStatus(), EStatus::SUCCESS, res.GetIssues().ToString()); } + + // negative + { + NYdb::NLogStore::TSchema logSchema(TestSchemaColumns(), TestSchemaKey()); + THashMap<TString, NYdb::NLogStore::TSchema> schemaPresets; + schemaPresets["default"] = logSchema; + NYdb::NLogStore::TLogStoreDescription storeDescr(0, schemaPresets); + auto res = logStoreClient.CreateLogStore("/Root/LogStore1", std::move(storeDescr)).GetValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(res.GetStatus(), EStatus::SCHEME_ERROR, 
res.GetIssues().ToString()); + } } Y_UNIT_TEST(LogStoreTiers) { diff --git a/ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_hive_/flat_hive.schema b/ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_hive_/flat_hive.schema index f55ef6ca43..3d579d78f8 100644 --- a/ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_hive_/flat_hive.schema +++ b/ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_hive_/flat_hive.schema @@ -214,6 +214,11 @@ "ColumnType": "String" }, { + "ColumnId": 123, + "ColumnName": "BalancerPolicy", + "ColumnType": "Uint64" + }, + { "ColumnId": 7, "ColumnName": "AllowedNodes", "ColumnType": "String" @@ -286,6 +291,7 @@ 121, 5, 122, + 123, 7, 11, 13, |