aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorDaniil Cherednik <dan.cherednik@gmail.com>2022-10-22 22:03:02 +0300
committerDaniil Cherednik <dan.cherednik@gmail.com>2022-10-22 22:03:02 +0300
commit11bc4015b8010ae201bf3eb33db7dba425aca35e (patch)
treed588389611dfce6ca3cfaf78152eef9c205ca630
parent0b931ad6e6868bca7e7ec4617999c0d6befd7003 (diff)
downloadydb-11bc4015b8010ae201bf3eb33db7dba425aca35e.tar.gz
Ydb stable 22-4-3122.4.31
x-stable-origin-commit: 2bc59c7eeae4a8f3d396867de193d1375dd388ce
-rw-r--r--ydb/core/blobstorage/dsproxy/dsproxy_patch.cpp190
-rw-r--r--ydb/core/blobstorage/nodewarden/node_warden_impl.cpp8
-rw-r--r--ydb/core/blobstorage/nodewarden/node_warden_impl.h7
-rw-r--r--ydb/core/blobstorage/nodewarden/node_warden_vdisk.cpp15
-rw-r--r--ydb/core/grpc_services/CMakeLists.txt1
-rw-r--r--ydb/core/grpc_services/audit_log.cpp21
-rw-r--r--ydb/core/grpc_services/audit_log.h12
-rw-r--r--ydb/core/grpc_services/grpc_request_check_actor.h10
-rw-r--r--ydb/core/mind/hive/balancer.cpp3
-rw-r--r--ydb/core/mind/hive/hive_events.h5
-rw-r--r--ydb/core/mind/hive/hive_impl.cpp17
-rw-r--r--ydb/core/mind/hive/hive_impl.h12
-rw-r--r--ydb/core/mind/hive/hive_schema.h5
-rw-r--r--ydb/core/mind/hive/hive_statics.cpp11
-rw-r--r--ydb/core/mind/hive/hive_ut.cpp325
-rw-r--r--ydb/core/mind/hive/monitoring.cpp88
-rw-r--r--ydb/core/mind/hive/tablet_info.cpp5
-rw-r--r--ydb/core/mind/hive/tablet_info.h2
-rw-r--r--ydb/core/mind/hive/tx__create_tablet.cpp9
-rw-r--r--ydb/core/node_whiteboard/node_whiteboard.h22
-rw-r--r--ydb/core/persqueue/pq_impl.cpp29
-rw-r--r--ydb/core/protos/config.proto2
-rw-r--r--ydb/core/protos/hive.proto7
-rw-r--r--ydb/core/tablet/node_whiteboard.cpp33
-rw-r--r--ydb/core/tablet/tablet_counters_aggregator.cpp132
-rw-r--r--ydb/core/testlib/basics/feature_flags.h1
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__operation_create_olap_store.cpp6
-rw-r--r--ydb/core/viewer/viewer.cpp1
-rw-r--r--ydb/core/ymq/actor/queue_schema.cpp5
-rw-r--r--ydb/public/api/protos/CMakeLists.txt5
-rw-r--r--ydb/services/persqueue_v1/actors/write_session_actor.h1
-rw-r--r--ydb/services/persqueue_v1/actors/write_session_actor.ipp19
-rw-r--r--ydb/services/persqueue_v1/persqueue_ut.cpp1
-rw-r--r--ydb/services/ydb/ydb_logstore_ut.cpp10
-rw-r--r--ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_hive_/flat_hive.schema6
35 files changed, 876 insertions, 150 deletions
diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_patch.cpp b/ydb/core/blobstorage/dsproxy/dsproxy_patch.cpp
index b246214495..a368b0ea39 100644
--- a/ydb/core/blobstorage/dsproxy/dsproxy_patch.cpp
+++ b/ydb/core/blobstorage/dsproxy/dsproxy_patch.cpp
@@ -3,6 +3,7 @@
#include "root_cause.h"
#include <ydb/core/blobstorage/lwtrace_probes/blobstorage_probes.h>
#include <ydb/core/blobstorage/groupinfo/blobstorage_groupinfo_partlayout.h>
+#include <ydb/core/util/stlog.h>
#include <util/generic/ymath.h>
#include <util/system/datetime.h>
@@ -71,6 +72,20 @@ class TBlobStorageGroupPatchRequest : public TBlobStorageGroupRequestActor<TBlob
TBlobStorageGroupInfo::TVDiskIds VDisks;
bool UseVPatch = false;
+ bool IsGoodPatchedBlobId = false;
+ bool IsAllowedErasure = false;
+
+#define PATCH_LOG(priority, service, marker, msg, ...) \
+ STLOG(priority, service, marker, msg, \
+ (ActorId, SelfId()), \
+ (Group, Info->GroupID), \
+ (DiffCount, DiffCount), \
+ (OriginalBlob, OriginalId), \
+ (PatchedBlob, PatchedId), \
+ (Deadline, Deadline), \
+ (RestartCounter, RestartCounter), \
+ __VA_ARGS__) \
+// PATCH_LOG
public:
static constexpr NKikimrServices::TActivity::EType ActorActivityType() {
@@ -109,6 +124,10 @@ public:
}
void ReplyAndDie(NKikimrProto::EReplyStatus status) {
+ PATCH_LOG(PRI_DEBUG, BS_PROXY_PATCH, BPPA02, "ReplyAndDie",
+ (Status, status),
+ (ErrorReason, ErrorReason));
+
std::unique_ptr<TEvBlobStorage::TEvPatchResult> result = std::make_unique<TEvBlobStorage::TEvPatchResult>(status, PatchedId,
StatusFlags, Info->GroupID, ApproximateFreeSpaceShare);
result->ErrorReason = ErrorReason;
@@ -139,6 +158,10 @@ public:
TEvBlobStorage::TEvGetResult *result = ev->Get();
Orbit = std::move(result->Orbit);
+ PATCH_LOG(PRI_DEBUG, BS_PROXY_PATCH, BPPA30, "Received TEvGetResult",
+ (Status, result->Status),
+ (ErrorReason, result->ErrorReason));
+
ui32 patchedIdHash = PatchedId.Hash();
bool incorrectCookie = ev->Cookie != patchedIdHash;
bool fail = incorrectCookie
@@ -161,7 +184,6 @@ public:
<< getResponseStatus
<< " GetErrorReason# " << result->ErrorReason;
}
- R_LOG_ERROR_S("BPPA04", ErrorReason);
ReplyAndDie(NKikimrProto::ERROR);
return;
}
@@ -179,6 +201,10 @@ public:
TEvBlobStorage::TEvPutResult *result = ev->Get();
Orbit = std::move(result->Orbit);
+ PATCH_LOG(PRI_DEBUG, BS_PROXY_PATCH, BPPA29, "Received TEvPutResult",
+ (Status, result->Status),
+ (ErrorReason, result->ErrorReason));
+
StatusFlags = result->StatusFlags;
ApproximateFreeSpaceShare = result->ApproximateFreeSpaceShare;
@@ -194,7 +220,6 @@ public:
<< " PutStatus# " << NKikimrProto::EReplyStatus_Name(result->Status)
<< " PutErrorReason# " << result->ErrorReason;
}
- R_LOG_ERROR_S("BPPA03", ErrorReason);
ReplyAndDie(NKikimrProto::ERROR);
return;
}
@@ -217,9 +242,11 @@ public:
void Handle(TEvBlobStorage::TEvVMovedPatchResult::TPtr &ev) {
TEvBlobStorage::TEvVMovedPatchResult *result = ev->Get();
- A_LOG_DEBUG_S("BPPA02", "received " << ev->Get()->ToString()
- << " from# " << VDiskIDFromVDiskID(ev->Get()->Record.GetVDiskID()));
NKikimrBlobStorage::TEvVMovedPatchResult &record = result->Record;
+ PATCH_LOG(PRI_DEBUG, BS_PROXY_PATCH, BPPA28, "Received TEvVMovedPatchResult",
+ (Status, record.GetStatus()),
+ (ErrorReason, record.GetErrorReason()),
+ (VDiskId, VDiskIDFromVDiskID(record.GetVDiskID())));
PullOutStatusFlagsAndFressSpace(record);
Orbit = std::move(result->Orbit);
@@ -240,10 +267,9 @@ public:
<< " VMovedPatchStatus# " << NKikimrProto::EReplyStatus_Name(record.GetStatus())
<< subErrorReason;
}
- A_LOG_INFO_S("BPPA05", "VMovedPatch failed, NaivePatch started;"
- << " OriginalId# " << OriginalId
- << " PatchedId# " << PatchedId
- << " ErrorReason# " << ErrorReason);
+ PATCH_LOG(PRI_DEBUG, BS_PROXY_PATCH, BPPA27, "Start Naive strategy from hadling TEvVMovedPatchResult",
+ (Status, record.GetStatus()),
+ (ErrorReason, ErrorReason));
StartNaivePatch();
return;
}
@@ -284,26 +310,22 @@ public:
OkVDisksWithParts.push_back(subgroupIdx);
}
- A_LOG_INFO_S("BPPA07", "received VPatchFoundParts"
- << " Status# " << status
- << " ActorId# " << SelfId()
- << " Group# " << Info->GroupID
- << " OriginalBlob# " << OriginalId
- << " PatchedBlob# " << PatchedId
- << " Deadline# " << Deadline
- << " SubgroupIdx# " << (ui32)subgroupIdx
- << " PartsCount# " << record.OriginalPartsSize()
- << " ReceivedFoundParts# " << ReceivedFoundParts << '/' << SendedStarts
- << " ErrorReason# " << errorReason);
+ PATCH_LOG(PRI_DEBUG, BS_PROXY_PATCH, BPPA26, "Received VPatchFoundParts",
+ (Status, status),
+ (SubgroupIdx, (ui32)subgroupIdx),
+ (ReceivedResults, static_cast<TString>(TStringBuilder() << ReceivedResults << '/' << Info->Type.TotalPartCount())),
+ (ErrorReason, errorReason));
if (ReceivedFoundParts == SendedStarts) {
bool continueVPatch = VerifyPartPlacement();
if (continueVPatch) {
continueVPatch = ContinueVPatch();
} else {
+ PATCH_LOG(PRI_DEBUG, BS_PROXY_PATCH, BPPA27, "Failed VerifyPartPlacement");
Mon->VPatchPartPlacementVerifyFailed->Inc();
}
if (!continueVPatch) {
+ PATCH_LOG(PRI_DEBUG, BS_PROXY_PATCH, BPPA27, "Start Fallback strategy from hadling TEvVPatchFoundParts");
StopVPatch();
StartFallback();
}
@@ -321,28 +343,27 @@ public:
errorReason = record.GetErrorReason();
}
- A_LOG_INFO_S("BPPA06", "received VPatchResult"
- << " Status# " << status
- << " ActorId# " << SelfId()
- << " Group# " << Info->GroupID
- << " OriginalBlob# " << OriginalId
- << " PatchedBlob# " << PatchedId
- << " Deadline# " << Deadline
- << " ReceivedResults# " << ReceivedResults << '/' << Info->Type.TotalPartCount()
- << " ErrorReason# " << errorReason);
-
Y_VERIFY(record.HasCookie());
ui8 subgroupIdx = record.GetCookie();
+ PATCH_LOG(PRI_DEBUG, BS_PROXY_PATCH, BPPA23, "Received VPatchResult",
+ (Status, status),
+ (SubgroupIdx, (ui32)subgroupIdx),
+ (ReceivedResults, static_cast<TString>(TStringBuilder() << ReceivedResults << '/' << Info->Type.TotalPartCount())),
+ (ErrorReason, errorReason));
+
bool wasReceived = std::exchange(ReceivedResponseFlags[subgroupIdx], true);
Y_VERIFY(!wasReceived);
if (status != NKikimrProto::OK) {
+ PATCH_LOG(PRI_DEBUG, BS_PROXY_PATCH, BPPA24, "Start Fallback strategy from handling VPatchResult",
+ (ReceivedResults, TStringBuilder() << ReceivedResults << '/' << Info->Type.TotalPartCount()));
StartFallback();
return;
}
if (ReceivedResults == Info->Type.TotalPartCount()) {
+ PATCH_LOG(PRI_DEBUG, BS_PROXY_PATCH, BPPA25, "Got all succesful responses, make own success response");
ReplyAndDie(NKikimrProto::OK);
}
}
@@ -357,6 +378,7 @@ public:
}
if (countByDC[0] && countByDC[1] && countByDC[2]) {
+ PATCH_LOG(PRI_DEBUG, BS_PROXY_PATCH, BPPA22, "VerifyPartPlacement {mirror-3-dc} found all 3 disks");
return true;
}
@@ -366,6 +388,8 @@ public:
x2Count++;
}
}
+ PATCH_LOG(PRI_DEBUG, BS_PROXY_PATCH, BPPA23, "VerifyPartPlacement {mirror-3-dc}",
+ (X2Count, x2Count));
return x2Count >= 2;
}
@@ -374,15 +398,21 @@ public:
return VerifyPartPlacementForMirror3dc();
} else {
TSubgroupPartLayout layout;
-
- for (auto &[subgroupIdx, partId] : FoundParts) {
- layout.AddItem(subgroupIdx, partId - 1, Info->Type);
+ for (auto &placement : FoundParts) {
+ PATCH_LOG(PRI_TRACE, BS_PROXY_PATCH, BPPA31, "Get part",
+ (SubgroupIdx, (ui32)placement.VDiskIdxInSubgroup),
+ (PartId, (ui32)placement.PartId));
+ layout.AddItem(placement.VDiskIdxInSubgroup, placement.PartId - 1, Info->Type);
}
+ PATCH_LOG(PRI_DEBUG, BS_PROXY_PATCH, BPPA21, "VerifyPartPlacement",
+ (EffectiveReplicas, layout.CountEffectiveReplicas(Info->Type)),
+ (TotalPartount, Info->Type.TotalPartCount()));
return layout.CountEffectiveReplicas(Info->Type) == Info->Type.TotalPartCount();
}
}
void SendStopDiffs() {
+ PATCH_LOG(PRI_DEBUG, BS_PROXY_PATCH, BPPA18, "Send stop diffs");
TDeque<std::unique_ptr<TEvBlobStorage::TEvVPatchDiff>> events;
for (ui32 vdiskIdx = 0; vdiskIdx < VDisks.size(); ++vdiskIdx) {
if (!ErrorResponseFlags[vdiskIdx] && !EmptyResponseFlags[vdiskIdx] && ReceivedResponseFlags[vdiskIdx]) {
@@ -390,6 +420,9 @@ public:
OriginalId, PatchedId, VDisks[vdiskIdx], 0, Deadline, vdiskIdx);
ev->SetForceEnd();
events.emplace_back(std::move(ev));
+ PATCH_LOG(PRI_DEBUG, BS_PROXY_PATCH, BPPA19, "Send stop message",
+ (VDiskIdxInSubgroup, vdiskIdx),
+ (VDiskId, VDisks[vdiskIdx]));
}
}
SendToQueues(events, false);
@@ -456,7 +489,12 @@ public:
for (const TPartPlacement &parity : parityPlacements) {
ev->AddXorReceiver(VDisks[parity.VDiskIdxInSubgroup], parity.PartId);
}
-
+ PATCH_LOG(PRI_DEBUG, BS_PROXY_PATCH, BPPA20, "Send TEvVPatchDiff",
+ (VDiskIdxInSubgroup, idxInSubgroup),
+ (PatchedVDiskIdxInSubgroup, patchedIdxInSubgroup),
+ (DiffsForPart, diffsForPart.size()),
+ (ParityPlacements, parityPlacements.size()),
+ (WaitedXorDiffs, waitedXorDiffs));
events.push_back(std::move(ev));
}
SendToQueues(events, false);
@@ -487,12 +525,8 @@ public:
}
void StartMovedPatch() {
- A_LOG_DEBUG_S("BPPA12", "StartMovedPatch"
- << " ActorId# " << SelfId()
- << " Group# " << Info->GroupID
- << " OriginalBlob# " << OriginalId
- << " PatchedBlob# " << PatchedId
- << " Deadline# " << Deadline);
+ PATCH_LOG(PRI_DEBUG, BS_PROXY_PATCH, BPPA09, "Start Moved strategy",
+ (SendedStarts, SendedStarts));
Become(&TThis::MovedPatchState);
ui32 subgroupIdx = 0;
@@ -517,12 +551,7 @@ public:
}
void StartNaivePatch() {
- A_LOG_DEBUG_S("BPPA11", "StartNaivePatch"
- << " ActorId# " << SelfId()
- << " Group# " << Info->GroupID
- << " OriginalBlob# " << OriginalId
- << " PatchedBlob# " << PatchedId
- << " Deadline# " << Deadline);
+ PATCH_LOG(PRI_DEBUG, BS_PROXY_PATCH, BPPA07, "Start Naive strategy");
Become(&TThis::NaiveState);
auto get = std::make_unique<TEvBlobStorage::TEvGet>(OriginalId, 0, OriginalId.BlobSize(), Deadline,
NKikimrBlobStorage::AsyncRead);
@@ -537,8 +566,12 @@ public:
void StartFallback() {
Mon->PatchesWithFallback->Inc();
if (WithMovingPatchRequestToStaticNode && UseVPatch) {
+ PATCH_LOG(PRI_DEBUG, BS_PROXY_PATCH, BPPA05, "Start Moved strategy from fallback");
StartMovedPatch();
} else {
+ PATCH_LOG(PRI_DEBUG, BS_PROXY_PATCH, BPPA06, "Start Naive strategy from fallback",
+ (WithMovingPatchRequestToStaticNode, WithMovingPatchRequestToStaticNode),
+ (UseVPatch, UseVPatch));
StartNaivePatch();
}
}
@@ -560,13 +593,8 @@ public:
SendedStarts++;
}
- A_LOG_DEBUG_S("BPPA08", "StartVPatcn"
- << " ActorId# " << SelfId()
- << " Group# " << Info->GroupID
- << " OriginalBlob# " << OriginalId
- << " PatchedBlob# " << PatchedId
- << " Deadline# " << Deadline
- << " SendedStarts# " << SendedStarts);
+ PATCH_LOG(PRI_DEBUG, BS_PROXY_PATCH, BPPA08, "Start VPatch strategy",
+ (SendedStarts, SendedStarts));
SendToQueues(events, false);
}
@@ -611,6 +639,7 @@ public:
}
bool ContinueVPatchForMirror3dc() {
+ PATCH_LOG(PRI_DEBUG, BS_PROXY_PATCH, BPPA10, "Continue VPatch {mirror-3-dc}");
constexpr ui32 DCCount = 3;
constexpr ui32 VDiskByDC = 3;
ui32 countByDC[DCCount] = {0, 0, 0};
@@ -624,9 +653,14 @@ public:
}
if (countByDC[0] && countByDC[1] && countByDC[2]) {
+ PATCH_LOG(PRI_DEBUG, BS_PROXY_PATCH, BPPA11, "Found disks {mirror-3-dc} on each dc",
+ (DiskFromFirstDC, diskByDC[0][0].ToString()),
+ (DiskFromSecondDC, diskByDC[0][0].ToString()),
+ (DiskFromThirdDC, diskByDC[2][0].ToString()));
SendDiffs({diskByDC[0][0], diskByDC[1][0], diskByDC[2][0]});
return true;
}
+ PATCH_LOG(PRI_DEBUG, BS_PROXY_PATCH, BPPA12, "Didn't find disks {mirror-3-dc} on each dc");
ui32 x2Count = 0;
for (ui32 dcIdx = 0; dcIdx < DCCount; ++dcIdx) {
@@ -635,6 +669,7 @@ public:
}
}
if (x2Count < 2) {
+ PATCH_LOG(PRI_DEBUG, BS_PROXY_PATCH, BPPA13, "Didn't find disks {mirror-3-dc}");
return false;
}
TStackVec<TPartPlacement, TypicalPartsInBlob> placements;
@@ -645,17 +680,17 @@ public:
}
}
SendDiffs(placements);
+ PATCH_LOG(PRI_DEBUG, BS_PROXY_PATCH, BPPA14, "Found disks {mirror-3-dc} x2 mode",
+ (FirstDiskFromFirstDC, placements[0]),
+ (SecondDiskFromFirstDC, placements[1]),
+ (FirstDiskFromSecondDC, placements[2]),
+ (SecondDiskFromSecondDC, placements[3]));
return true;
}
bool ContinueVPatch() {
- A_LOG_DEBUG_S("BPPA09", "ContinueVPatch"
- << " ActorId# " << SelfId()
- << " Group# " << Info->GroupID
- << " OriginalBlob# " << OriginalId
- << " PatchedBlob# " << PatchedId
- << " FoundParts# " << ConvertFoundPartsToString()
- << " Deadline# " << Deadline);
+ PATCH_LOG(PRI_DEBUG, BS_PROXY_PATCH, BPPA15, "Continue VPatch strategy",
+ (FoundParts, ConvertFoundPartsToString()));
if (Info->Type.GetErasure() == TErasureType::ErasureMirror3dc) {
return ContinueVPatchForMirror3dc();
@@ -685,10 +720,11 @@ public:
TStackVec<ui32, TypicalHandoffCount> choosenHandoffForParts(handoffParts.size());
if (handoffParts.size()) {
bool find = FindHandoffs(handoffForParts, handoffParts, &choosenHandoffForParts);
- Y_VERIFY_DEBUG_S(find, "handoffParts# " << FormatList(handoffParts)
- << " FoundParts# " << ConvertFoundPartsToString()
- << " choosenHandoffForParts# " << FormatList(choosenHandoffForParts)
- << " inPrimary# " << FormatList(inPrimary));
+ PATCH_LOG(PRI_DEBUG, BS_PROXY_PATCH, BPPA16, "Find handoff parts",
+ (HandoffParts, FormatList(handoffParts)),
+ (FoundParts, ConvertFoundPartsToString()),
+ (choosenHandoffForParts, FormatList(choosenHandoffForParts)),
+ (IsPrimary, FormatList(inPrimary)));
if (!find) {
Mon->VPatchContinueFailed->Inc();
return false;
@@ -700,12 +736,7 @@ public:
}
void StopVPatch() {
- A_LOG_DEBUG_S("BPPA10", "StopVPatch"
- << " ActorId# " << SelfId()
- << " Group# " << Info->GroupID
- << " OriginalBlob# " << OriginalId
- << " PatchedBlob# " << PatchedId
- << " Deadline# " << Deadline);
+ PATCH_LOG(PRI_DEBUG, BS_PROXY_PATCH, BPPA17, "Stop VPatch strategy");
SendStopDiffs();
ReceivedResponseFlags.assign(VDisks.size(), false);
}
@@ -741,14 +772,8 @@ public:
}
void Bootstrap() {
- A_LOG_INFO_S("BPPA01", "bootstrap"
- << " ActorId# " << SelfId()
- << " Group# " << Info->GroupID
- << " DiffCount# " << DiffCount
- << " OriginalBlob# " << OriginalId
- << " PatchedBlob# " << PatchedId
- << " Deadline# " << Deadline
- << " RestartCounter# " << RestartCounter);
+ PATCH_LOG(PRI_DEBUG, BS_PROXY_PATCH, BPPA01, "Actor bootstrapped");
+
TLogoBlobID truePatchedBlobId = PatchedId;
bool result = true;
if (Info->Type.ErasureFamily() == TErasureType::ErasureParityBlock) {
@@ -770,13 +795,20 @@ public:
return;
}
- bool isAllowedErasure = Info->Type.ErasureFamily() == TErasureType::ErasureParityBlock
+ IsGoodPatchedBlobId = result;
+ IsAllowedErasure = Info->Type.ErasureFamily() == TErasureType::ErasureParityBlock
|| Info->Type.GetErasure() == TErasureType::ErasureNone
|| Info->Type.GetErasure() == TErasureType::ErasureMirror3
|| Info->Type.GetErasure() == TErasureType::ErasureMirror3dc;
- if (result && isAllowedErasure && UseVPatch && OriginalGroupId == Info->GroupID) {
+ if (IsGoodPatchedBlobId && IsAllowedErasure && UseVPatch && OriginalGroupId == Info->GroupID) {
+ PATCH_LOG(PRI_DEBUG, BS_PROXY_PATCH, BPPA03, "Start VPatch strategy from bootstrap");
StartVPatch();
} else {
+ PATCH_LOG(PRI_DEBUG, BS_PROXY_PATCH, BPPA04, "Start Fallback strategy from bootstrap",
+ (IsGoodPatchedBlobId, IsGoodPatchedBlobId),
+ (IsAllowedErasure, IsAllowedErasure),
+ (UseVPatch, UseVPatch),
+ (IsSameGroup, OriginalGroupId == Info->GroupID));
StartFallback();
}
}
diff --git a/ydb/core/blobstorage/nodewarden/node_warden_impl.cpp b/ydb/core/blobstorage/nodewarden/node_warden_impl.cpp
index 02586ae450..925e46d0d1 100644
--- a/ydb/core/blobstorage/nodewarden/node_warden_impl.cpp
+++ b/ydb/core/blobstorage/nodewarden/node_warden_impl.cpp
@@ -401,8 +401,14 @@ void TNodeWarden::Handle(TEvStatusUpdate::TPtr ev) {
auto *msg = ev->Get();
const TVSlotId vslotId(msg->NodeId, msg->PDiskId, msg->VSlotId);
if (const auto it = LocalVDisks.find(vslotId); it != LocalVDisks.end() && it->second.Status != msg->Status) {
- it->second.Status = msg->Status;
+ auto& vdisk = it->second;
+ vdisk.Status = msg->Status;
SendDiskMetrics(false);
+
+ if (msg->Status == NKikimrBlobStorage::EVDiskStatus::READY && vdisk.WhiteboardVDiskId) {
+ Send(WhiteboardId, new NNodeWhiteboard::TEvWhiteboard::TEvVDiskDropDonors(*vdisk.WhiteboardVDiskId,
+ vdisk.WhiteboardInstanceGuid, NNodeWhiteboard::TEvWhiteboard::TEvVDiskDropDonors::TDropAllDonors()));
+ }
}
}
diff --git a/ydb/core/blobstorage/nodewarden/node_warden_impl.h b/ydb/core/blobstorage/nodewarden/node_warden_impl.h
index 9b72c8dfdc..5863448155 100644
--- a/ydb/core/blobstorage/nodewarden/node_warden_impl.h
+++ b/ydb/core/blobstorage/nodewarden/node_warden_impl.h
@@ -224,6 +224,12 @@ namespace NKikimr::NStorage {
return MakeBlobStorageVDiskID(NodeId, PDiskId, VDiskSlotId);
}
+ void Serialize(NKikimrBlobStorage::TVSlotId *proto) const {
+ proto->SetNodeId(NodeId);
+ proto->SetPDiskId(PDiskId);
+ proto->SetVSlotId(VDiskSlotId);
+ }
+
auto AsTuple() const { return std::make_tuple(NodeId, PDiskId, VDiskSlotId); }
friend bool operator <(const TVSlotId& x, const TVSlotId& y) { return x.AsTuple() < y.AsTuple(); }
friend bool operator <=(const TVSlotId& x, const TVSlotId& y) { return x.AsTuple() <= y.AsTuple(); }
@@ -253,6 +259,7 @@ namespace NKikimr::NStorage {
// Last VDiskId reported to Node Whiteboard.
std::optional<TVDiskID> WhiteboardVDiskId;
+ ui64 WhiteboardInstanceGuid;
bool SlayInFlight = false;
diff --git a/ydb/core/blobstorage/nodewarden/node_warden_vdisk.cpp b/ydb/core/blobstorage/nodewarden/node_warden_vdisk.cpp
index 8e6bc26986..22a59222f8 100644
--- a/ydb/core/blobstorage/nodewarden/node_warden_vdisk.cpp
+++ b/ydb/core/blobstorage/nodewarden/node_warden_vdisk.cpp
@@ -175,6 +175,7 @@ namespace NKikimr::NStorage {
Send(WhiteboardId, new NNodeWhiteboard::TEvWhiteboard::TEvVDiskStateUpdate(vdiskId, groupInfo->GetStoragePoolName(),
vslotId.PDiskId, vslotId.VDiskSlotId, pdiskGuid, kind, donorMode, whiteboardInstanceGuid, std::move(donors)));
vdisk.WhiteboardVDiskId.emplace(vdiskId);
+ vdisk.WhiteboardInstanceGuid = whiteboardInstanceGuid;
// create an actor
auto *as = TActivationContext::ActorSystem();
@@ -281,9 +282,19 @@ namespace NKikimr::NStorage {
void TNodeWarden::Handle(TEvBlobStorage::TEvDropDonor::TPtr ev) {
auto *msg = ev->Get();
- STLOG(PRI_INFO, BS_NODE, NW34, "TEvDropDonor", (VSlotId, TVSlotId(msg->NodeId, msg->PDiskId, msg->VSlotId)),
- (VDiskId, msg->VDiskId));
+ const TVSlotId vslotId(msg->NodeId, msg->PDiskId, msg->VSlotId);
+ STLOG(PRI_INFO, BS_NODE, NW34, "TEvDropDonor", (VSlotId, vslotId), (VDiskId, msg->VDiskId));
SendDropDonorQuery(msg->NodeId, msg->PDiskId, msg->VSlotId, msg->VDiskId);
+
+ if (const auto it = LocalVDisks.find(vslotId); it != LocalVDisks.end()) {
+ const auto& vdisk = it->second;
+ if (vdisk.WhiteboardVDiskId) {
+ NKikimrBlobStorage::TVSlotId id;
+ vslotId.Serialize(&id);
+ Send(WhiteboardId, new NNodeWhiteboard::TEvWhiteboard::TEvVDiskDropDonors(*vdisk.WhiteboardVDiskId,
+ vdisk.WhiteboardInstanceGuid, {id}));
+ }
+ }
}
void TNodeWarden::UpdateGroupInfoForDisk(TVDiskRecord& vdisk, const TIntrusivePtr<TBlobStorageGroupInfo>& newInfo) {
diff --git a/ydb/core/grpc_services/CMakeLists.txt b/ydb/core/grpc_services/CMakeLists.txt
index 9299bc2887..626dda12a5 100644
--- a/ydb/core/grpc_services/CMakeLists.txt
+++ b/ydb/core/grpc_services/CMakeLists.txt
@@ -53,6 +53,7 @@ target_link_libraries(ydb-core-grpc_services PUBLIC
cpp-client-resources
)
target_sources(ydb-core-grpc_services PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/grpc_services/audit_log.cpp
${CMAKE_SOURCE_DIR}/ydb/core/grpc_services/grpc_endpoint_publish_actor.cpp
${CMAKE_SOURCE_DIR}/ydb/core/grpc_services/grpc_helper.cpp
${CMAKE_SOURCE_DIR}/ydb/core/grpc_services/grpc_mon.cpp
diff --git a/ydb/core/grpc_services/audit_log.cpp b/ydb/core/grpc_services/audit_log.cpp
new file mode 100644
index 0000000000..8d353d0517
--- /dev/null
+++ b/ydb/core/grpc_services/audit_log.cpp
@@ -0,0 +1,21 @@
+#include "defs.h"
+#include "audit_log.h"
+
+#include "base/base.h"
+
+namespace NKikimr {
+namespace NGRpcService {
+
+void AuditLog(const IRequestProxyCtx* reqCtx, const TString& database,
+ const TString& subject, const TActorContext& ctx)
+{
+ LOG_NOTICE_S(ctx, NKikimrServices::GRPC_SERVER, "AUDIT: "
+ << "request name: " << reqCtx->GetRequestName()
+ << ", database: " << database
+ << ", peer: " << reqCtx->GetPeerName()
+ << ", subject: " << subject);
+}
+
+}
+}
+
diff --git a/ydb/core/grpc_services/audit_log.h b/ydb/core/grpc_services/audit_log.h
new file mode 100644
index 0000000000..47742db4ee
--- /dev/null
+++ b/ydb/core/grpc_services/audit_log.h
@@ -0,0 +1,12 @@
+#pragma once
+
+namespace NKikimr {
+namespace NGRpcService {
+
+class IRequestProxyCtx;
+
+void AuditLog(const IRequestProxyCtx* reqCtx, const TString& database,
+ const TString& subject, const TActorContext& ctx);
+
+}
+}
diff --git a/ydb/core/grpc_services/grpc_request_check_actor.h b/ydb/core/grpc_services/grpc_request_check_actor.h
index 715172355c..fe0902b4e4 100644
--- a/ydb/core/grpc_services/grpc_request_check_actor.h
+++ b/ydb/core/grpc_services/grpc_request_check_actor.h
@@ -1,5 +1,6 @@
#pragma once
#include "defs.h"
+#include "audit_log.h"
#include "service_ratelimiter_events.h"
#include "local_rate_limiter.h"
#include "operation_helpers.h"
@@ -112,6 +113,10 @@ public:
}
}
+ if (AppData(ctx)->FeatureFlags.GetEnableGrpcAudit()) {
+ AuditLog(GrpcRequestBaseCtx_, CheckedDatabaseName_, GetSubject(), ctx);
+ }
+
// Simple rps limitation
static NRpcService::TRlConfig rpsRlConfig(
"serverless_rt_coordination_node_path",
@@ -205,6 +210,11 @@ public:
}
private:
+ TString GetSubject() const {
+ const auto sid = TBase::GetUserSID();
+ return sid ? sid : "no subject";
+ }
+
static NYql::TIssues GetRlIssues(const Ydb::RateLimiter::AcquireResourceResponse& resp) {
NYql::TIssues opIssues;
NYql::IssuesFromMessage(resp.operation().issues(), opIssues);
diff --git a/ydb/core/mind/hive/balancer.cpp b/ydb/core/mind/hive/balancer.cpp
index 8521e30499..9106fceccd 100644
--- a/ydb/core/mind/hive/balancer.cpp
+++ b/ydb/core/mind/hive/balancer.cpp
@@ -146,6 +146,8 @@ protected:
if (RecheckOnFinish && MaxMovements != 0 && Movements >= MaxMovements) {
BLOG_D("Balancer initiated recheck");
Hive->ProcessTabletBalancer();
+ } else {
+ Send(Hive->SelfId(), new TEvPrivate::TEvBalancerOut());
}
return IActor::PassAway();
}
@@ -239,6 +241,7 @@ protected:
tablets.emplace_back(tablet);
}
}
+ BLOG_TRACE("Balancer on node " << node->Id << ": " << tablets.size() << "/" << nodeTablets.size() << " tablets is suitable for balancing");
if (!tablets.empty()) {
switch (Hive->GetTabletBalanceStrategy()) {
case NKikimrConfig::THiveConfig::HIVE_TABLET_BALANCE_STRATEGY_OLD_WEIGHTED_RANDOM:
diff --git a/ydb/core/mind/hive/hive_events.h b/ydb/core/mind/hive/hive_events.h
index 9e26edc7a8..b846a32cbe 100644
--- a/ydb/core/mind/hive/hive_events.h
+++ b/ydb/core/mind/hive/hive_events.h
@@ -23,6 +23,7 @@ struct TEvPrivate {
EvUnlockTabletReconnectTimeout,
EvProcessPendingOperations,
EvRestartComplete,
+ EvBalancerOut,
EvEnd
};
@@ -37,7 +38,7 @@ struct TEvPrivate {
};
struct TEvProcessBootQueue : TEventLocal<TEvProcessBootQueue, EvProcessBootQueue> {};
-
+
struct TEvPostponeProcessBootQueue : TEventLocal<TEvPostponeProcessBootQueue, EvPostponeProcessBootQueue> {};
struct TEvProcessDisconnectNode : TEventLocal<TEvProcessDisconnectNode, EvProcessDisconnectNode> {
@@ -78,6 +79,8 @@ struct TEvPrivate {
};
struct TEvProcessPendingOperations : TEventLocal<TEvProcessPendingOperations, EvProcessPendingOperations> {};
+
+ struct TEvBalancerOut : TEventLocal<TEvBalancerOut, EvBalancerOut> {};
};
} // NHive
diff --git a/ydb/core/mind/hive/hive_impl.cpp b/ydb/core/mind/hive/hive_impl.cpp
index efd7f36bb1..2dfb33dbb9 100644
--- a/ydb/core/mind/hive/hive_impl.cpp
+++ b/ydb/core/mind/hive/hive_impl.cpp
@@ -308,6 +308,10 @@ void THive::Handle(TEvPrivate::TEvProcessPendingOperations::TPtr&) {
BLOG_D("Handle ProcessPendingOperations");
}
+void THive::Handle(TEvPrivate::TEvBalancerOut::TPtr&) {
+ BLOG_D("Handle BalancerOut");
+}
+
void THive::Handle(TEvHive::TEvBootTablet::TPtr& ev) {
TTabletId tabletId = ev->Get()->Record.GetTabletID();
TTabletInfo* tablet = FindTablet(tabletId);
@@ -554,6 +558,14 @@ void THive::BuildCurrentConfig() {
for (const NKikimrConfig::THiveTabletPreference& tabletPreference : CurrentConfig.GetDefaultTabletPreference()) {
DefaultDataCentersPreference[tabletPreference.GetType()] = tabletPreference.GetDataCentersPreference();
}
+ BalancerIgnoreTabletTypes.clear();
+ for (auto i : CurrentConfig.GetBalancerIgnoreTabletTypes()) {
+ const auto type = TTabletTypes::EType(i);
+ if (IsValidTabletType(type)) {
+ BalancerIgnoreTabletTypes.emplace_back(type);
+ }
+ }
+ MakeTabletTypeSet(BalancerIgnoreTabletTypes);
}
void THive::Cleanup() {
@@ -1612,6 +1624,7 @@ void THive::FillTabletInfo(NKikimrHive::TEvResponseHiveInfo& response, ui64 tabl
auto& tabletInfo = *response.AddTablets();
tabletInfo.SetTabletID(tabletId);
tabletInfo.SetTabletType(info->Type);
+ tabletInfo.SetObjectId(info->ObjectId);
tabletInfo.SetState(static_cast<ui32>(info->State));
tabletInfo.SetTabletBootMode(info->BootMode);
tabletInfo.SetVolatileState(info->GetVolatileState());
@@ -1620,6 +1633,9 @@ void THive::FillTabletInfo(NKikimrHive::TEvResponseHiveInfo& response, ui64 tabl
tabletInfo.MutableTabletOwner()->SetOwnerIdx(info->Owner.second);
tabletInfo.SetGeneration(info->KnownGeneration);
tabletInfo.MutableObjectDomain()->CopyFrom(info->ObjectDomain);
+ if (info->BalancerPolicy != NKikimrHive::EBalancerPolicy::POLICY_BALANCE) {
+ tabletInfo.SetBalancerPolicy(info->BalancerPolicy);
+ }
if (!info->IsRunning()) {
tabletInfo.SetLastAliveTimestamp(info->Statistics.GetLastAliveTimestamp());
}
@@ -2437,6 +2453,7 @@ STFUNC(THive::StateWork) {
hFunc(NSysView::TEvSysView::TEvGetTabletsRequest, Handle);
hFunc(TEvHive::TEvRequestTabletOwners, Handle);
hFunc(TEvHive::TEvTabletOwnersReply, Handle);
+ hFunc(TEvPrivate::TEvBalancerOut, Handle);
default:
if (!HandleDefaultEvents(ev, ctx)) {
BLOG_W("THive::StateWork unhandled event type: " << ev->GetTypeRewrite()
diff --git a/ydb/core/mind/hive/hive_impl.h b/ydb/core/mind/hive/hive_impl.h
index 85f6ea922d..ad25889305 100644
--- a/ydb/core/mind/hive/hive_impl.h
+++ b/ydb/core/mind/hive/hive_impl.h
@@ -140,6 +140,8 @@ TString GetConditionalRedString(const TString& str, bool condition);
TString GetDataCenterName(ui64 dataCenterId);
TString LongToShortTabletName(const TString& longTabletName);
TString GetLocationString(const NActors::TNodeLocation& location);
+void MakeTabletTypeSet(std::vector<TTabletTypes::EType>& list);
+bool IsValidTabletType(TTabletTypes::EType type);
class THive : public TActor<THive>, public TTabletExecutedFlat, public THiveSharedSettings {
public:
@@ -404,6 +406,9 @@ protected:
std::unordered_map<TDataCenterId, std::unordered_set<TNodeId>> RegisteredDataCenterNodes;
std::unordered_set<TNodeId> ConnectedNodes;
+ // normalized to be sorted list of unique values
+ std::vector<TTabletTypes::EType> BalancerIgnoreTabletTypes; // built from CurrentConfig
+
// to be removed later
bool TabletOwnersSynced = false;
// to be removed later
@@ -482,6 +487,7 @@ protected:
void Handle(TEvPrivate::TEvProcessTabletBalancer::TPtr&);
void Handle(TEvPrivate::TEvUnlockTabletReconnectTimeout::TPtr&);
void Handle(TEvPrivate::TEvProcessPendingOperations::TPtr&);
+ void Handle(TEvPrivate::TEvBalancerOut::TPtr&);
void Handle(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev);
void Handle(NConsole::TEvConsole::TEvConfigNotificationRequest::TPtr& ev);
void Handle(NConsole::TEvConfigsDispatcher::TEvSetConfigSubscriptionResponse::TPtr& ev);
@@ -748,6 +754,12 @@ public:
return initialMaximum;
}
+ bool IsInBalancerIgnoreList(TTabletTypes::EType type) const {
+ const auto& ignoreList = BalancerIgnoreTabletTypes;
+ auto found = std::find(ignoreList.begin(), ignoreList.end(), type);
+ return (found != ignoreList.end());
+ }
+
static void ActualizeRestartStatistics(google::protobuf::RepeatedField<google::protobuf::uint64>& restartTimestamps, ui64 barrier);
static bool IsSystemTablet(TTabletTypes::EType type);
diff --git a/ydb/core/mind/hive/hive_schema.h b/ydb/core/mind/hive/hive_schema.h
index 2b664b6fdc..8fdd9cf6c7 100644
--- a/ydb/core/mind/hive/hive_schema.h
+++ b/ydb/core/mind/hive/hive_schema.h
@@ -93,6 +93,8 @@ struct Schema : NIceDb::Schema {
struct DataCentersPreference : Column<121, NScheme::NTypeIds::String> { using Type = NKikimrHive::TDataCentersPreference; };
struct AllowedDataCenterIds : Column<122, NScheme::NTypeIds::String> { using Type = TVector<TString>; };
+ struct BalancerPolicy : Column<123, NScheme::NTypeIds::Uint64> { using Type = NKikimrHive::EBalancerPolicy; static constexpr NKikimrHive::EBalancerPolicy Default = NKikimrHive::EBalancerPolicy::POLICY_BALANCE; };
+
using TKey = TableKey<ID>;
using TColumns = TableColumns<
ID,
@@ -118,7 +120,8 @@ struct Schema : NIceDb::Schema {
ReassignReason,
Statistics,
DataCentersPreference,
- AllowedDataCenterIds
+ AllowedDataCenterIds,
+ BalancerPolicy
>;
};
diff --git a/ydb/core/mind/hive/hive_statics.cpp b/ydb/core/mind/hive/hive_statics.cpp
index 1f5e264393..4eae631b94 100644
--- a/ydb/core/mind/hive/hive_statics.cpp
+++ b/ydb/core/mind/hive/hive_statics.cpp
@@ -340,5 +340,16 @@ TString GetLocationString(const NActors::TNodeLocation& location) {
return proto.ShortDebugString();
}
+void MakeTabletTypeSet(std::vector<TTabletTypes::EType>& list) {
+ std::sort(list.begin(), list.end());
+ list.erase(std::unique(list.begin(), list.end()), list.end());
+}
+
+bool IsValidTabletType(TTabletTypes::EType type) {
+ return (type > TTabletTypes::Unknown
+ && type < TTabletTypes::Reserved40
+ );
+}
+
} // NHive
} // NKikimr
diff --git a/ydb/core/mind/hive/hive_ut.cpp b/ydb/core/mind/hive/hive_ut.cpp
index e20b07b98a..24ecde610e 100644
--- a/ydb/core/mind/hive/hive_ut.cpp
+++ b/ydb/core/mind/hive/hive_ut.cpp
@@ -20,6 +20,8 @@
#include <ydb/core/tx/schemeshard/schemeshard.h>
#include <ydb/core/tx/mediator/mediator.h>
+#include <ydb/core/mind/hive/hive_events.h>
+
#include <library/cpp/actors/interconnect/interconnect_impl.h>
#include <library/cpp/malloc/api/malloc.h>
@@ -245,7 +247,7 @@ namespace {
}
}
- void SetupServices(TTestActorRuntime &runtime, bool isLocalEnabled) {
+ void SetupServices(TTestActorRuntime &runtime, bool isLocalEnabled, const std::function<void(TAppPrepare&)> & appConfigSetup) {
TAppPrepare app;
SetupDomainInfo(runtime, app);
@@ -255,6 +257,10 @@ namespace {
app.SetRequestSequenceSize(10);
app.SetHiveStoragePoolFreshPeriod(0);
+ if (appConfigSetup) {
+ appConfigSetup(app);
+ }
+
SetupNodeWarden(runtime);
SetupPDisk(runtime);
@@ -352,12 +358,12 @@ namespace {
UNIT_ASSERT(configureResponse->Record.GetResponse().GetSuccess());
}
- void Setup(TTestActorRuntime& runtime, bool isLocalEnabled = true, ui32 numGroups = 1) {
+ void Setup(TTestActorRuntime& runtime, bool isLocalEnabled = true, ui32 numGroups = 1, const std::function<void(TAppPrepare&)> & appConfigSetup = nullptr) {
using namespace NMalloc;
TMallocInfo mallocInfo = MallocInfo();
mallocInfo.SetParam("FillMemoryOnAllocation", "false");
SetupLogging(runtime);
- SetupServices(runtime, isLocalEnabled);
+ SetupServices(runtime, isLocalEnabled, appConfigSetup);
SetupBoxAndStoragePool(runtime, numGroups);
}
@@ -3596,6 +3602,319 @@ Y_UNIT_TEST_SUITE(THiveTest) {
}
}
+ Y_UNIT_TEST(TestHiveBalancerIgnoreTablet) {
+ // Test plan:
+ // - create configuration where:
+ // - there is single node which run several tablets with different BalancerPolicy
+ // - and all tablets report very high resource usage
+ // (so that balancer wants to unload the node but have no space to move tablets to)
+ // - then add enough empty nodes
+ // - test that balancer moved out all tablets except those with BalancerPolicy=BALANCER_IGNORE
+ // - change BalancerPolicy to BALANCER_BALANCE for all remaining tablets
+ // - test that balancer also moved out former BALANCER_IGNORE tablets
+ //
+ static const int NUM_NODES = 4;
+ static const int NUM_TABLETS = 3;
+ static const ui64 SINGLE_TABLET_NETWORK_USAGE = 5000000;
+
+ TTestBasicRuntime runtime(NUM_NODES, false);
+
+ Setup(runtime, true, 1, [](TAppPrepare& app) {
+ app.HiveConfig.SetMaxMovementsOnAutoBalancer(100);
+ app.HiveConfig.SetMinPeriodBetweenBalance(0.1);
+ app.HiveConfig.SetTabletKickCooldownPeriod(0);
+ app.HiveConfig.SetResourceChangeReactionPeriod(0);
+ // this value of MaxNodeUsageToKick is selected specifically to make test scenario work
+ // in link with number of tablets and values of network usage metrics used below
+ app.HiveConfig.SetMaxNodeUsageToKick(0.01);
+ });
+
+ TActorId senderA = runtime.AllocateEdgeActor();
+ const ui64 hiveTablet = MakeDefaultHiveID(0);
+
+ CreateTestBootstrapper(runtime, CreateTestTabletInfo(hiveTablet, TTabletTypes::Hive), &CreateDefaultHive);
+
+ // wait for creation of nodes
+ {
+ TDispatchOptions options;
+ options.FinalEvents.emplace_back(TEvLocal::EvStatus, NUM_NODES);
+ runtime.DispatchEvents(options);
+ }
+
+ // stop all but one local services to emulate single node configuration
+ for (int i = 1; i < NUM_NODES; ++i) {
+ SendKillLocal(runtime, i);
+ }
+
+ struct TTabletMiniInfo {
+ ui64 TabletId;
+ ui64 ObjectId;
+ ui32 NodeIndex;
+ NKikimrHive::EBalancerPolicy BalancerPolicy;
+ };
+ auto getTabletInfos = [&runtime, senderA] (ui64 hiveTablet) {
+ runtime.SendToPipe(hiveTablet, senderA, new TEvHive::TEvRequestHiveInfo());
+ TAutoPtr<IEventHandle> handle;
+ TEvHive::TEvResponseHiveInfo* response = runtime.GrabEdgeEventRethrow<TEvHive::TEvResponseHiveInfo>(handle);
+ const int nodeBase = runtime.GetNodeId(0);
+ std::vector<TTabletMiniInfo> tabletInfos;
+ for (const NKikimrHive::TTabletInfo& tablet : response->Record.GetTablets()) {
+ int nodeIndex = (int)tablet.GetNodeID() - nodeBase;
+ UNIT_ASSERT_C(nodeIndex >= 0 && nodeIndex < NUM_NODES, "nodeId# " << tablet.GetNodeID() << " nodeBase# " << nodeBase);
+ tabletInfos.push_back({tablet.GetTabletID(), tablet.GetObjectId(), tablet.GetNodeID() - nodeBase, tablet.GetBalancerPolicy()});
+ }
+ std::reverse(tabletInfos.begin(), tabletInfos.end());
+ return tabletInfos;
+ };
+ auto reportTabletMetrics = [&runtime, senderA, hiveTablet](ui64 tabletId, ui64 network, bool sync) {
+ THolder<TEvHive::TEvTabletMetrics> metrics = MakeHolder<TEvHive::TEvTabletMetrics>();
+ NKikimrHive::TTabletMetrics* metric = metrics->Record.AddTabletMetrics();
+ metric->SetTabletID(tabletId);
+ metric->MutableResourceUsage()->SetNetwork(network);
+
+ runtime.SendToPipe(hiveTablet, senderA, metrics.Release());
+
+ if (sync) {
+ TAutoPtr<IEventHandle> handle;
+ auto* response = runtime.GrabEdgeEvent<TEvLocal::TEvTabletMetricsAck>(handle);
+ Y_UNUSED(response);
+ }
+ };
+
+ const ui64 testerTablet = MakeDefaultHiveID(1);
+ const TTabletTypes::EType tabletType = TTabletTypes::Dummy;
+
+ Ctest << "Step A: create tablets" << Endl;
+
+ // create NUM_TABLETS tablets, some with BalancerPolicy set to "ignore"
+ for (int i = 0; i < NUM_TABLETS; ++i) {
+ THolder<TEvHive::TEvCreateTablet> ev(new TEvHive::TEvCreateTablet(testerTablet, 100500 + i, tabletType, BINDED_CHANNELS));
+ ev->Record.SetObjectId(i);
+ switch (i % NUM_TABLETS) {
+ case 0: // policy not explicitly set
+ break;
+ case 1: // policy explicitly set to default value
+ ev->Record.SetBalancerPolicy(NKikimrHive::EBalancerPolicy::POLICY_BALANCE);
+ break;
+ case 2: // policy explicitly set to ignore
+ ev->Record.SetBalancerPolicy(NKikimrHive::EBalancerPolicy::POLICY_IGNORE);
+ break;
+ }
+ ui64 tabletId = SendCreateTestTablet(runtime, hiveTablet, testerTablet, std::move(ev), 0, true);
+ MakeSureTabletIsUp(runtime, tabletId, 0);
+ }
+
+ Ctest << "Step A: get tablets info" << Endl;
+ auto tabletInfos_A = getTabletInfos(hiveTablet);
+
+ // check that tablets retain their BalancerPolicy flags...
+ for (const auto& i : tabletInfos_A) {
+ Ctest << "Step A: tablet index " << i.ObjectId << ", tablet id " << i.TabletId << ", node index " << i.NodeIndex << ", balancer policy " << NKikimrHive::EBalancerPolicy_Name(i.BalancerPolicy) << Endl;
+ switch (i.ObjectId % NUM_TABLETS) {
+ case 0:
+ case 1:
+ UNIT_ASSERT_EQUAL_C(i.BalancerPolicy, NKikimrHive::EBalancerPolicy::POLICY_BALANCE, "objectId# " << i.ObjectId << " value# " << (ui64)i.BalancerPolicy << " name# " << NKikimrHive::EBalancerPolicy_Name(i.BalancerPolicy));
+ break;
+ case 2:
+ UNIT_ASSERT_EQUAL_C(i.BalancerPolicy, NKikimrHive::EBalancerPolicy::POLICY_IGNORE, "value# " << (ui64)i.BalancerPolicy << " name# " << NKikimrHive::EBalancerPolicy_Name(i.BalancerPolicy));
+ break;
+ }
+ }
+ // ...and that all tablets are distributed on a single node
+ {
+ std::array<int, NUM_NODES> nodeTablets = {};
+ for (auto& i : tabletInfos_A) {
+ ++nodeTablets[i.NodeIndex];
+ }
+ Ctest << "Step A: tablet distribution";
+ for (auto i : nodeTablets) {
+ Ctest << " " << i;
+ }
+ Ctest << Endl;
+ auto minmax = std::minmax_element(nodeTablets.begin(), nodeTablets.end());
+ UNIT_ASSERT_VALUES_EQUAL(*minmax.first, 0);
+ UNIT_ASSERT_VALUES_EQUAL(*minmax.second, NUM_TABLETS);
+ }
+
+ Ctest << "Step B: report tablets metrics" << Endl;
+
+ // report raised tablet metrics (to kickoff the balancer)
+ for (const auto& i: tabletInfos_A) {
+ reportTabletMetrics(i.TabletId, SINGLE_TABLET_NETWORK_USAGE, true);
+ }
+
+ Ctest << "Step B: wait for balancer to complete" << Endl;
+ {
+ TDispatchOptions options;
+ options.FinalEvents.emplace_back(NHive::TEvPrivate::EvBalancerOut);
+ runtime.DispatchEvents(options, TDuration::Seconds(10));
+ }
+
+ Ctest << "Step B: get tablets info" << Endl;
+ auto tabletInfos_B = getTabletInfos(hiveTablet);
+
+ // check that all tablet are still on a single node
+ {
+ std::array<int, NUM_NODES> nodeTablets = {};
+ for (auto& i : tabletInfos_B) {
+ ++nodeTablets[i.NodeIndex];
+ }
+ Ctest << "Step B: tablet distribution";
+ for (auto i : nodeTablets) {
+ Ctest << " " << i;
+ }
+ Ctest << Endl;
+ auto minmax = std::minmax_element(nodeTablets.begin(), nodeTablets.end());
+ UNIT_ASSERT_VALUES_EQUAL(*minmax.first, 0);
+ UNIT_ASSERT_VALUES_EQUAL(*minmax.second, NUM_TABLETS);
+ }
+
+ Ctest << "Step C: add empty nodes" << Endl;
+ for (int i = 1; i < NUM_NODES; ++i) {
+ CreateLocal(runtime, i);
+ }
+ // wait for creation of nodes
+ {
+ TDispatchOptions options;
+ options.FinalEvents.emplace_back(TEvLocal::EvStatus, NUM_NODES - 1);
+ runtime.DispatchEvents(options);
+ }
+
+ Ctest << "Step C: touch tablets metrics" << Endl;
+ // touch tablet metrics (to kickoff the balancer)
+ for (const auto& i: tabletInfos_B) {
+ reportTabletMetrics(i.TabletId, 0, true);
+ }
+
+ Ctest << "Step C: wait for balancer to complete" << Endl;
+ {
+ TDispatchOptions options;
+ options.FinalEvents.emplace_back(NHive::TEvPrivate::EvBalancerOut);
+ runtime.DispatchEvents(options, TDuration::Seconds(10));
+ }
+
+ Ctest << "Step C: get tablets info" << Endl;
+ auto tabletInfos_C = getTabletInfos(hiveTablet);
+
+ // check that ignored tablets stayed as they are...
+ for (const auto& i : tabletInfos_C) {
+ Ctest << "Step C: tablet index " << i.ObjectId << ", tablet id " << i.TabletId << ", node index " << i.NodeIndex << ", balancer policy " << NKikimrHive::EBalancerPolicy_Name(i.BalancerPolicy) << Endl;
+ switch (i.ObjectId % NUM_TABLETS) {
+ case 0:
+ case 1:
+ break;
+ case 2:
+ UNIT_ASSERT_EQUAL_C(i.BalancerPolicy, NKikimrHive::EBalancerPolicy::POLICY_IGNORE, "value# " << (ui64)i.BalancerPolicy << " name# " << NKikimrHive::EBalancerPolicy_Name(i.BalancerPolicy));
+ ui32 oldNodeIndex = tabletInfos_B[i.ObjectId].NodeIndex;
+ ui32 newNodeIndex = i.NodeIndex;
+ UNIT_ASSERT_VALUES_EQUAL(oldNodeIndex, newNodeIndex);
+ break;
+ }
+ }
+ // ...but ordinary tablets did move out to other nodes
+ {
+ std::array<int, NUM_NODES> nodeTablets = {};
+ for (auto& i : tabletInfos_C) {
+ ++nodeTablets[i.NodeIndex];
+ }
+ Ctest << "Step C: tablet distribution";
+ for (auto i : nodeTablets) {
+ Ctest << " " << i;
+ }
+ Ctest << Endl;
+ auto minmax = std::minmax_element(nodeTablets.begin(), nodeTablets.end());
+ UNIT_ASSERT_VALUES_EQUAL(*minmax.first, 0);
+ UNIT_ASSERT_VALUES_EQUAL(*minmax.second, 1);
+ UNIT_ASSERT_VALUES_EQUAL(nodeTablets[0], 1);
+ }
+
+ Ctest << "Step D: change tablets BalancerPolicy" << Endl;
+
+ // set all tablets with BalancerPolicy "ignore" back to "balance"
+ for (int i = 0; i < NUM_TABLETS; ++i) {
+ switch(i % NUM_TABLETS) {
+ case 0:
+ case 1:
+ break;
+ case 2:
+ THolder<TEvHive::TEvCreateTablet> ev(new TEvHive::TEvCreateTablet(testerTablet, 100500 + i, tabletType, BINDED_CHANNELS));
+ ev->Record.SetObjectId(i);
+ ev->Record.SetBalancerPolicy(NKikimrHive::EBalancerPolicy::POLICY_BALANCE);
+ ui64 tabletId = SendCreateTestTablet(runtime, hiveTablet, testerTablet, std::move(ev), 0, false);
+ Y_UNUSED(tabletId);
+ break;
+ }
+ }
+
+ Ctest << "Step D: get tablets info" << Endl;
+ auto tabletInfos_D = getTabletInfos(hiveTablet);
+
+ // check that all BalancerPolicy "ignore" flags are dropped
+ for (const auto& i : tabletInfos_D) {
+ Ctest << "Step D: tablet index " << i.ObjectId << ", tablet id " << i.TabletId << ", node index " << i.NodeIndex << ", balancer policy " << NKikimrHive::EBalancerPolicy_Name(i.BalancerPolicy) << Endl;
+ UNIT_ASSERT_EQUAL_C(i.BalancerPolicy, NKikimrHive::EBalancerPolicy::POLICY_BALANCE, "objectId# " << i.ObjectId << " value# " << (ui64)i.BalancerPolicy << " name# " << NKikimrHive::EBalancerPolicy_Name(i.BalancerPolicy));
+ }
+
+ Ctest << "Step D: raise metrics for previously ignored tablets" << Endl;
+ for (const auto& i: tabletInfos_D) {
+ switch(i.ObjectId % NUM_TABLETS) {
+ case 0:
+ case 1:
+ break;
+ case 2:
+ reportTabletMetrics(i.TabletId, NUM_TABLETS * SINGLE_TABLET_NETWORK_USAGE, true);
+ break;
+ }
+ }
+
+ Ctest << "Step D: wait for balancer to complete" << Endl;
+ {
+ TDispatchOptions options;
+ options.FinalEvents.emplace_back(NHive::TEvPrivate::EvBalancerOut);
+ runtime.DispatchEvents(options, TDuration::Seconds(10));
+ }
+
+ Ctest << "Step E: get tablets info" << Endl;
+ auto tabletInfos_E = getTabletInfos(hiveTablet);
+
+ // check that (some) former ignored tablets have moved now...
+ {
+ bool ignoredTabletsAreMoved = false;
+ for (const auto& i : tabletInfos_E) {
+ Ctest << "Step E: tablet index " << i.ObjectId << ", tablet id " << i.TabletId << ", node index " << i.NodeIndex << ", balancer policy " << NKikimrHive::EBalancerPolicy_Name(i.BalancerPolicy) << Endl;
+ switch (i.ObjectId % NUM_TABLETS) {
+ case 0:
+ case 1:
+ break;
+ case 2:
+ ui32 oldNodeIndex = tabletInfos_A[i.ObjectId].NodeIndex;
+ ui32 newNodeIndex = i.NodeIndex;
+ if (oldNodeIndex != newNodeIndex) {
+ ignoredTabletsAreMoved = true;
+ }
+ break;
+ }
+ }
+ UNIT_ASSERT_VALUES_EQUAL(ignoredTabletsAreMoved, true);
+ }
+ // ...and that the original node is completely void of tablets
+ {
+ std::array<int, NUM_NODES> nodeTablets = {};
+ for (auto& i : tabletInfos_E) {
+ ++nodeTablets[i.NodeIndex];
+ }
+ Ctest << "Step E: tablet distribution";
+ for (auto i : nodeTablets) {
+ Ctest << " " << i;
+ }
+ Ctest << Endl;
+ auto minmax = std::minmax_element(nodeTablets.begin(), nodeTablets.end());
+ UNIT_ASSERT_VALUES_EQUAL(*minmax.first, 0);
+ UNIT_ASSERT_VALUES_EQUAL(*minmax.second, 1);
+ UNIT_ASSERT_VALUES_EQUAL(nodeTablets[0], 0);
+ }
+ }
+
Y_UNIT_TEST(TestRestartTablets) {
TTestBasicRuntime runtime(3, false);
Setup(runtime, true);
diff --git a/ydb/core/mind/hive/monitoring.cpp b/ydb/core/mind/hive/monitoring.cpp
index ba1fea2c46..8770c39c4d 100644
--- a/ydb/core/mind/hive/monitoring.cpp
+++ b/ydb/core/mind/hive/monitoring.cpp
@@ -730,6 +730,29 @@ public:
UpdateConfig(db, "NodeSelectStrategy");
UpdateConfig(db, "CheckMoveExpediency");
+ if (params.contains("BalancerIgnoreTabletTypes")) {
+ TVector<TString> tabletTypeNames = SplitString(params.Get("BalancerIgnoreTabletTypes"), ";");
+ std::vector<TTabletTypes::EType> newTypeList;
+ for (const auto& name : tabletTypeNames) {
+ TTabletTypes::EType type = TTabletTypes::StrToType(Strip(name));
+ if (IsValidTabletType(type)) {
+ newTypeList.emplace_back(type);
+ }
+ }
+ MakeTabletTypeSet(newTypeList);
+ if (newTypeList != Self->BalancerIgnoreTabletTypes) {
+ // replace DatabaseConfig.BalancerIgnoreTabletTypes inplace
+ auto* field = Self->DatabaseConfig.MutableBalancerIgnoreTabletTypes();
+ field->Reserve(newTypeList.size());
+ field->Clear();
+ for (auto i : newTypeList) {
+ field->Add(i);
+ }
+ ChangeRequest = true;
+ // Self->BalancerIgnoreTabletTypes will be replaced by Self->BuildCurrentConfig()
+ }
+ }
+
if (ChangeRequest) {
Self->BuildCurrentConfig();
db.Table<Schema::State>().Key(TSchemeIds::State::DefaultState).Update<Schema::State::Config>(Self->DatabaseConfig);
@@ -880,6 +903,65 @@ public:
}
}
+ void ShowConfigForBalancerIgnoreTabletTypes(IOutputStream& out) {
+ // value of protobuf type "repeated field of ETabletTypes::EType"
+ // is represented as a single string build from list delimited type names
+
+ auto makeListString = [] (const NKikimrConfig::THiveConfig& config) {
+ std::vector<TTabletTypes::EType> types;
+ for (auto i : config.GetBalancerIgnoreTabletTypes()) {
+ const auto type = TTabletTypes::EType(i);
+ if (IsValidTabletType(type)) {
+ types.emplace_back(type);
+ }
+ }
+ MakeTabletTypeSet(types);
+ TVector<TString> names;
+ for (auto i : types) {
+ names.emplace_back(TTabletTypes::TypeToStr(i));
+ }
+ return JoinStrings(names, ";");
+ };
+
+ const TString param("BalancerIgnoreTabletTypes");
+
+ NKikimrConfig::THiveConfig builtinConfig;
+ auto builtinDefault = makeListString(builtinConfig);
+ auto clusterDefault = makeListString(Self->ClusterConfig);
+ auto currentValue = makeListString(Self->CurrentConfig);
+
+ bool localOverrided = (currentValue != clusterDefault);
+
+ out << "<div class='row'>";
+ {
+ // mark if value is changed locally
+ out << "<div class='col-sm-3' style='padding-top:12px;text-align:right'>"
+ << "<label for='" << param << "'"
+ << (localOverrided ? "" : "' style='font-weight:normal'")
+ << ">" << param << ":</label>"
+ << "</div>";
+ // editable current value
+ out << "<div class='col-sm-2' style='padding-top:5px'>"
+ << "<input id='" << param << "' style='max-width:170px;margin-top:7px' onkeydown='edit(this);' onchange='edit(this);'"
+ << " value='" << currentValue << "'>"
+ << "</div>";
+ // apply button
+ out << "<div class='col-sm-1'><button type='button' class='btn' style='margin-top:5px' onclick='applyVal(this, \"" << param << "\");' disabled='true'>Apply</button></div>";
+ // reset button
+ out << "<div class='col-sm-1'><button type='button' class='btn' style='margin-top:5px' onclick='resetVal(this, \"" << param << "\");' " << (localOverrided ? "" : "disabled='true'") << ">Reset</button></div>";
+ // show cluster default
+ out << "<div id='CMS" << param << "' class='col-sm-2' style='padding-top:12px'>"
+ << clusterDefault
+ << "</div>";
+ // show builtin default
+ out << "<div id='Default" << param << "' class='col-sm-2' style='padding-top:12px'>"
+ << builtinDefault
+ << "</div>";
+ }
+ out << "</div>";
+ }
+
+
void RenderHTMLPage(IOutputStream& out, const TActorContext&/* ctx*/) {
out << "<head></head><body>";
out << "<script>$('.container > h2').html('Settings');</script>";
@@ -916,6 +998,7 @@ public:
ShowConfig(out, "MaxMovementsOnAutoBalancer");
ShowConfig(out, "ContinueAutoBalancer");
ShowConfig(out, "CheckMoveExpediency");
+ ShowConfigForBalancerIgnoreTabletTypes(out);
out << "<div class='row' style='margin-top:40px'>";
out << "<div class='col-sm-2' style='padding-top:30px;text-align:right'><label for='allowedMetrics'>AllowedMetrics:</label></div>";
@@ -956,10 +1039,8 @@ public:
}
out << "</table></div>";
out << "<div class='col-sm-2' style='padding-top:22px'><button type='button' class='btn' style='margin-top:5px' onclick='applyTab(this);' disabled='true'>Apply</button></div>";
-
out << "</div>";
-
out << "</div>";
out << R"___(
@@ -1027,6 +1108,7 @@ public:
error: function() { $(button).addClass('btn-danger'); }
});
}
+
</script>
)___";
@@ -1061,6 +1143,7 @@ public:
Y_UNUSED(ctx);
}
+ //TODO: move to hive_statics.cpp as utility function
static TString GetTabletType(TTabletTypes::EType type) {
switch(type) {
case TTabletTypes::SchemeShard:
@@ -2880,6 +2963,7 @@ public:
result["VolatileStateChangeTime"] = tablet.VolatileStateChangeTime.ToString();
result["TabletRole"] = TTabletInfo::ETabletRoleName(tablet.TabletRole);
result["LastBalancerDecisionTime"] = tablet.LastBalancerDecisionTime.ToString();
+ result["BalancerPolicy"] = NKikimrHive::EBalancerPolicy_Name(tablet.BalancerPolicy);
result["NodeId"] = tablet.NodeId;
result["LastNodeId"] = tablet.LastNodeId;
result["PreferredNodeId"] = tablet.PreferredNodeId;
diff --git a/ydb/core/mind/hive/tablet_info.cpp b/ydb/core/mind/hive/tablet_info.cpp
index 81af51c3a9..81ad491b22 100644
--- a/ydb/core/mind/hive/tablet_info.cpp
+++ b/ydb/core/mind/hive/tablet_info.cpp
@@ -21,6 +21,7 @@ TTabletInfo::TTabletInfo(ETabletRole role, THive& hive)
, ResourceValues()
, ResourceMetricsAggregates(Hive.GetDefaultResourceMetricsAggregates())
, Weight(0)
+ , BalancerPolicy(EBalancerPolicy::POLICY_BALANCE)
{}
const TLeaderTabletInfo& TTabletInfo::GetLeader() const {
@@ -181,7 +182,9 @@ bool TTabletInfo::IsStopped() const {
}
bool TTabletInfo::IsGoodForBalancer(TInstant now) const {
- return now - LastBalancerDecisionTime > Hive.GetTabletKickCooldownPeriod();
+ return (BalancerPolicy == EBalancerPolicy::POLICY_BALANCE)
+ && !Hive.IsInBalancerIgnoreList(GetTabletType())
+ && (now - LastBalancerDecisionTime > Hive.GetTabletKickCooldownPeriod());
}
bool TTabletInfo::InitiateBoot() {
diff --git a/ydb/core/mind/hive/tablet_info.h b/ydb/core/mind/hive/tablet_info.h
index aa03ecd357..5ab320e759 100644
--- a/ydb/core/mind/hive/tablet_info.h
+++ b/ydb/core/mind/hive/tablet_info.h
@@ -95,6 +95,7 @@ struct TTabletInfo {
friend class TTxMonEvent_TabletInfo;
public:
using EVolatileState = NKikimrHive::ETabletVolatileState;
+ using EBalancerPolicy = NKikimrHive::EBalancerPolicy;
enum class ETabletRole {
Leader,
@@ -158,6 +159,7 @@ public:
double Weight;
mutable TString BootState;
TInstant PostponedStart;
+ EBalancerPolicy BalancerPolicy;
TTabletInfo(ETabletRole role, THive& hive);
TTabletInfo(const TTabletInfo&) = delete;
diff --git a/ydb/core/mind/hive/tx__create_tablet.cpp b/ydb/core/mind/hive/tx__create_tablet.cpp
index 5606a10f22..6590c0abf1 100644
--- a/ydb/core/mind/hive/tx__create_tablet.cpp
+++ b/ydb/core/mind/hive/tx__create_tablet.cpp
@@ -29,6 +29,8 @@ class TTxCreateTablet : public TTransactionBase<THive> {
NKikimrHive::ETabletBootMode BootMode;
NKikimrHive::TForwardRequest ForwardRequest;
+ NKikimrHive::EBalancerPolicy BalancerPolicy;
+
TSideEffects SideEffects;
public:
@@ -48,6 +50,7 @@ public:
, BoundChannels(RequestData.GetBindedChannels().begin(), RequestData.GetBindedChannels().end())
, AllowedDomains(RequestData.GetAllowedDomains().begin(), RequestData.GetAllowedDomains().end())
, BootMode(RequestData.GetTabletBootMode())
+ , BalancerPolicy(RequestData.GetBalancerPolicy())
{
const ui32 allowedNodeIdsSize = RequestData.AllowedNodeIDsSize();
AllowedNodeIds.reserve(allowedNodeIdsSize);
@@ -270,6 +273,8 @@ public:
db.Table<Schema::Tablet>().Key(TabletId).Update<Schema::Tablet::ObjectID>(ObjectId);
tablet->BootMode = BootMode;
db.Table<Schema::Tablet>().Key(TabletId).Update<Schema::Tablet::BootMode>(BootMode);
+ tablet->BalancerPolicy = BalancerPolicy;
+ db.Table<Schema::Tablet>().Key(TabletId).Update<Schema::Tablet::BalancerPolicy>(BalancerPolicy);
UpdateChannelsBinding(*tablet, db);
@@ -368,6 +373,7 @@ public:
tablet.ObjectId = ObjectId;
tablet.AssignDomains(ObjectDomain, AllowedDomains);
tablet.Statistics.SetLastAliveTimestamp(now.MilliSeconds());
+ tablet.BalancerPolicy = BalancerPolicy;
TVector<ui32> allowedDataCenters;
for (const TDataCenterId& dc : tablet.AllowedDataCenters) {
@@ -387,7 +393,8 @@ public:
NIceDb::TUpdate<Schema::Tablet::BootMode>(tablet.BootMode),
NIceDb::TUpdate<Schema::Tablet::ObjectID>(tablet.ObjectId),
NIceDb::TUpdate<Schema::Tablet::ObjectDomain>(ObjectDomain),
- NIceDb::TUpdate<Schema::Tablet::Statistics>(tablet.Statistics));
+ NIceDb::TUpdate<Schema::Tablet::Statistics>(tablet.Statistics),
+ NIceDb::TUpdate<Schema::Tablet::BalancerPolicy>(tablet.BalancerPolicy));
Self->PendingCreateTablets.erase({OwnerId, OwnerIdx});
diff --git a/ydb/core/node_whiteboard/node_whiteboard.h b/ydb/core/node_whiteboard/node_whiteboard.h
index 8769068f37..9230290af2 100644
--- a/ydb/core/node_whiteboard/node_whiteboard.h
+++ b/ydb/core/node_whiteboard/node_whiteboard.h
@@ -57,6 +57,7 @@ struct TEvWhiteboard{
EvSignalBodyResponse,
EvPDiskStateDelete,
EvVDiskStateGenerationChange,
+ EvVDiskDropDonors,
EvEnd
};
@@ -220,6 +221,27 @@ struct TEvWhiteboard{
{}
};
+ struct TEvVDiskDropDonors : TEventLocal<TEvVDiskDropDonors, EvVDiskDropDonors> {
+ const TVDiskID VDiskId;
+ const ui64 InstanceGuid;
+ const std::vector<NKikimrBlobStorage::TVSlotId> DropDonors;
+ const bool DropAllDonors = false;
+
+ TEvVDiskDropDonors(TVDiskID vdiskId, ui64 instanceGuid, std::vector<NKikimrBlobStorage::TVSlotId> dropDonors)
+ : VDiskId(vdiskId)
+ , InstanceGuid(instanceGuid)
+ , DropDonors(std::move(dropDonors))
+ {}
+
+ struct TDropAllDonors {};
+
+ TEvVDiskDropDonors(TVDiskID vdiskId, ui64 instanceGuid, TDropAllDonors)
+ : VDiskId(vdiskId)
+ , InstanceGuid(instanceGuid)
+ , DropAllDonors(true)
+ {}
+ };
+
struct TEvPDiskStateDelete : TEventPB<TEvPDiskStateDelete, NKikimrWhiteboard::TPDiskStateInfo, EvPDiskStateDelete> {
TEvPDiskStateDelete() = default;
diff --git a/ydb/core/persqueue/pq_impl.cpp b/ydb/core/persqueue/pq_impl.cpp
index ecdbf13e2c..c066022327 100644
--- a/ydb/core/persqueue/pq_impl.cpp
+++ b/ydb/core/persqueue/pq_impl.cpp
@@ -682,16 +682,29 @@ void TPersQueue::ApplyNewConfigAndReply(const TActorContext& ctx)
void TPersQueue::HandleConfigWriteResponse(const NKikimrClient::TResponse& resp, const TActorContext& ctx)
{
if (resp.GetStatus() != NMsgBusProxy::MSTATUS_OK ||
- resp.WriteResultSize() != 1 ||
- resp.GetWriteResult(0).GetStatus() != NKikimrProto::OK)
- {
+ resp.WriteResultSize() < 1) {
LOG_ERROR_S(ctx, NKikimrServices::PERSQUEUE, "Tablet " << TabletID()
<< " Config write error: " << resp.DebugString() << " " << ctx.SelfID);
ctx.Send(ctx.SelfID, new TEvents::TEvPoisonPill());
return;
}
+ for (const auto& res : resp.GetWriteResult()) {
+ if (res.GetStatus() != NKikimrProto::OK) {
+ LOG_ERROR_S(ctx, NKikimrServices::PERSQUEUE, "Tablet " << TabletID()
+ << " Config write error: " << resp.DebugString() << " " << ctx.SelfID);
+ ctx.Send(ctx.SelfID, new TEvents::TEvPoisonPill());
+ return;
+ }
+ }
+
+ if (resp.WriteResultSize() > 1) {
+ LOG_INFO_S(ctx, NKikimrServices::PERSQUEUE, "Tablet " << TabletID()
+ << " restarting - have some registering of message groups");
+ ctx.Send(ctx.SelfID, new TEvents::TEvPoisonPill());
+ return;
+ }
- Y_VERIFY(resp.WriteResultSize() == 1);
+ Y_VERIFY(resp.WriteResultSize() >= 1);
Y_VERIFY(resp.GetWriteResult(0).GetStatus() == NKikimrProto::OK);
if (ConfigInited && PartitionsInited == Partitions.size()) //all partitions are working well - can apply new config
ApplyNewConfigAndReply(ctx);
@@ -1269,12 +1282,14 @@ void TPersQueue::ProcessUpdateConfigRequest(TAutoPtr<TEvPersQueue::TEvUpdateConf
}
sourceIdWriter.RegisterSourceId(mg.GetId(), 0, 0, ctx.Now(), std::move(keyRange));
+ }
- for (const auto& partition : cfg.GetPartitions()) {
- sourceIdWriter.FillRequest(request.Get(), partition.GetPartitionId());
- }
+ for (const auto& partition : cfg.GetPartitions()) {
+ sourceIdWriter.FillRequest(request.Get(), partition.GetPartitionId());
}
+ Y_VERIFY((ui64)request->Record.GetCmdWrite().size() == (ui64)bootstrapCfg.GetExplicitMessageGroups().size() * cfg.PartitionsSize() + 1);
+
NewConfig = cfg;
ctx.Send(ctx.SelfID, request.Release());
}
diff --git a/ydb/core/protos/config.proto b/ydb/core/protos/config.proto
index 50a7de9572..eb50087482 100644
--- a/ydb/core/protos/config.proto
+++ b/ydb/core/protos/config.proto
@@ -707,6 +707,7 @@ message TFeatureFlags {
optional bool EnablePredicateExtractForDataQueries = 68 [default = false];
optional bool EnableMoveIndex = 70 [default = false];
optional bool EnableNotNullDataColumns = 73 [default = false];
+ optional bool EnableGrpcAudit = 74 [default = false];
}
@@ -1419,6 +1420,7 @@ message THiveConfig {
optional bool CheckMoveExpediency = 46 [default = true];
optional uint64 StoragePoolFreshPeriod = 47 [default = 60000]; // milliseconds
optional string PoolsToMonitorForUsage = 48 [default = "System,User,IC"];
+ repeated NKikimrTabletBase.TTabletTypes.EType BalancerIgnoreTabletTypes = 49;
}
message TDataShardConfig {
diff --git a/ydb/core/protos/hive.proto b/ydb/core/protos/hive.proto
index 0b6bac0755..659a158eaf 100644
--- a/ydb/core/protos/hive.proto
+++ b/ydb/core/protos/hive.proto
@@ -16,6 +16,11 @@ enum ETabletBootMode {
TABLET_BOOT_MODE_EXTERNAL = 1;
}
+enum EBalancerPolicy {
+ POLICY_BALANCE = 0;
+ POLICY_IGNORE = 1;
+}
+
enum EErrorReason {
// Default unspecified error reason
ERROR_REASON_UNKNOWN = 0;
@@ -95,6 +100,7 @@ message TEvCreateTablet {
repeated NKikimrSubDomains.TDomainKey AllowedDomains = 23;
optional TDataCentersPreference DataCentersPreference = 24;
repeated string AllowedDataCenters = 25;
+ optional EBalancerPolicy BalancerPolicy = 26;
optional uint32 ChannelsProfile = 5 [deprecated = true]; // DEPRECATED
optional uint32 Flags = 6 [deprecated = true]; // DEPRECATED
@@ -458,6 +464,7 @@ message TTabletInfo {
optional NKikimrTabletBase.TMetrics ResourceUsage = 21;
optional uint32 RestartsPerPeriod = 22;
optional uint64 LastAliveTimestamp = 23;
+ optional EBalancerPolicy BalancerPolicy = 24;
}
message TEvSeizeTabletsReply {
diff --git a/ydb/core/tablet/node_whiteboard.cpp b/ydb/core/tablet/node_whiteboard.cpp
index 46be0f95e0..0677480cf3 100644
--- a/ydb/core/tablet/node_whiteboard.cpp
+++ b/ydb/core/tablet/node_whiteboard.cpp
@@ -392,6 +392,7 @@ protected:
HFunc(TEvWhiteboard::TEvVDiskStateGenerationChange, Handle);
HFunc(TEvWhiteboard::TEvVDiskStateDelete, Handle);
HFunc(TEvWhiteboard::TEvVDiskStateRequest, Handle);
+ HFunc(TEvWhiteboard::TEvVDiskDropDonors, Handle);
HFunc(TEvWhiteboard::TEvBSGroupStateUpdate, Handle);
HFunc(TEvWhiteboard::TEvBSGroupStateDelete, Handle);
HFunc(TEvWhiteboard::TEvBSGroupStateRequest, Handle);
@@ -455,6 +456,7 @@ protected:
} else if (const auto it = VDiskStateInfo.find(key); it != VDiskStateInfo.end() &&
it->second.GetInstanceGuid() == record.GetInstanceGuid()) {
auto& value = it->second;
+
if (CheckedMerge(value, record) >= 100) {
value.SetChangeTime(ctx.Now().MilliSeconds());
UpdateSystemState(ctx);
@@ -479,6 +481,37 @@ protected:
}
}
+ void Handle(TEvWhiteboard::TEvVDiskDropDonors::TPtr& ev, const TActorContext& ctx) {
+ auto& msg = *ev->Get();
+ if (const auto it = VDiskStateInfo.find(msg.VDiskId); it != VDiskStateInfo.end() &&
+ it->second.GetInstanceGuid() == msg.InstanceGuid) {
+ auto& value = it->second;
+ bool change = false;
+
+ if (msg.DropAllDonors) {
+ change = !value.GetDonors().empty();
+ value.ClearDonors();
+ } else {
+ for (const auto& donor : msg.DropDonors) {
+ auto *donors = value.MutableDonors();
+ for (int i = 0; i < donors->size(); ++i) {
+ auto& x = donors->at(i);
+ if (x.GetNodeId() == donor.GetNodeId() && x.GetPDiskId() == donor.GetPDiskId() && x.GetVSlotId() == donor.GetVSlotId()) {
+ donors->DeleteSubrange(i, 1);
+ change = true;
+ break;
+ }
+ }
+ }
+ }
+
+ if (change) {
+ value.SetChangeTime(ctx.Now().MilliSeconds());
+ UpdateSystemState(ctx);
+ }
+ }
+ }
+
void Handle(TEvWhiteboard::TEvBSGroupStateUpdate::TPtr &ev, const TActorContext &ctx) {
auto& bSGroupStateInfo = BSGroupStateInfo[ev->Get()->Record.GetGroupID()];
if (CheckedMerge(bSGroupStateInfo, ev->Get()->Record) >= 100) {
diff --git a/ydb/core/tablet/tablet_counters_aggregator.cpp b/ydb/core/tablet/tablet_counters_aggregator.cpp
index 94a0aea199..7749e41106 100644
--- a/ydb/core/tablet/tablet_counters_aggregator.cpp
+++ b/ydb/core/tablet/tablet_counters_aggregator.cpp
@@ -701,7 +701,7 @@ public:
//
TTabletMon(::NMonitoring::TDynamicCounterPtr counters, bool isFollower, TActorId dbWatcherActorId)
: Counters(GetServiceCounters(counters, isFollower ? "followers" : "tablets"))
- , AllTypes(Counters.Get(), "type", "all")
+ , AllTypes(MakeIntrusive<TTabletCountersForTabletType>(Counters.Get(), "type", "all"))
, IsFollower(isFollower)
, DbWatcherActorId(dbWatcherActorId)
{
@@ -714,9 +714,9 @@ public:
const TTabletCountersBase* executorCounters, const TTabletCountersBase* appCounters,
const TActorContext& ctx)
{
- AllTypes.Apply(tabletID, executorCounters, nullptr, tabletType);
+ AllTypes->Apply(tabletID, executorCounters, nullptr, tabletType);
//
- auto* typeCounters = GetOrAddCountersByTabletType(tabletType, CountersByTabletType, Counters);
+ auto typeCounters = GetOrAddCountersByTabletType(tabletType, CountersByTabletType, Counters);
if (typeCounters) {
typeCounters->Apply(tabletID, executorCounters, appCounters, tabletType);
}
@@ -783,7 +783,7 @@ public:
}
void ForgetTablet(ui64 tabletID, TTabletTypes::EType tabletType, TPathId tenantPathId) {
- AllTypes.Forget(tabletID);
+ AllTypes->Forget(tabletID);
// and now erase from every other path
auto iterTabletType = CountersByTabletType.find(tabletType);
if (iterTabletType != CountersByTabletType.end()) {
@@ -912,13 +912,17 @@ public:
}
void RecalcAll() {
- AllTypes.RecalcAll();
- for (auto& c : CountersByTabletType) {
- c.second->RecalcAll();
+ AllTypes->RecalcAll();
+ for (auto& [_, counters] : CountersByTabletType) {
+ counters->RecalcAll();
}
if (YdbCounters) {
- YdbCounters->Initialize(Counters, CountersByTabletType);
+ auto hasDatashard = (bool)FindCountersByTabletType(
+ TTabletTypes::DataShard, CountersByTabletType);
+ auto hasSchemeshard = (bool)FindCountersByTabletType(
+ TTabletTypes::SchemeShard, CountersByTabletType);
+ YdbCounters->Initialize(Counters, hasDatashard, hasSchemeshard);
YdbCounters->Transform();
}
}
@@ -929,7 +933,7 @@ public:
private:
// subgroups
- class TTabletCountersForTabletType {
+ class TTabletCountersForTabletType : public TThrRefBase {
public:
//
TTabletCountersForTabletType(::NMonitoring::TDynamicCounters* owner, const char* category, const char* name)
@@ -1299,25 +1303,26 @@ private:
TSolomonCounters TabletAppCounters;
};
- typedef TMap<TTabletTypes::EType, TAutoPtr<TTabletCountersForTabletType> > TCountersByTabletType;
+ using TTabletCountersForTabletTypePtr = TIntrusivePtr<TTabletCountersForTabletType>;
+ typedef TMap<TTabletTypes::EType, TTabletCountersForTabletTypePtr> TCountersByTabletType;
- static TTabletCountersForTabletType* FindCountersByTabletType(
+ static TTabletCountersForTabletTypePtr FindCountersByTabletType(
TTabletTypes::EType tabletType,
TCountersByTabletType& countersByTabletType)
{
auto iterTabletType = countersByTabletType.find(tabletType);
if (iterTabletType != countersByTabletType.end()) {
- return iterTabletType->second.Get();
+ return iterTabletType->second;
}
return {};
}
- static TTabletCountersForTabletType* GetOrAddCountersByTabletType(
+ static TTabletCountersForTabletTypePtr GetOrAddCountersByTabletType(
TTabletTypes::EType tabletType,
TCountersByTabletType& countersByTabletType,
::NMonitoring::TDynamicCounterPtr counters)
{
- auto* typeCounters = FindCountersByTabletType(tabletType, countersByTabletType);
+ auto typeCounters = FindCountersByTabletType(tabletType, countersByTabletType);
if (!typeCounters) {
TString tabletTypeStr = TTabletTypes::TypeToStr(tabletType);
typeCounters = new TTabletCountersForTabletType(
@@ -1443,14 +1448,8 @@ private:
"table.datashard.used_core_percents", NMonitoring::LinearHistogram(12, 0, 10), false);
};
- void Initialize(
- ::NMonitoring::TDynamicCounterPtr counters,
- TCountersByTabletType& countersByTabletType)
- {
- auto datashard = FindCountersByTabletType(
- TTabletTypes::DataShard, countersByTabletType);
-
- if (datashard && !RowUpdates) {
+ void Initialize(::NMonitoring::TDynamicCounterPtr counters, bool hasDatashard, bool hasSchemeshard) {
+ if (hasDatashard && !RowUpdates) {
auto datashardGroup = counters->GetSubgroup("type", "DataShard");
auto appGroup = datashardGroup->GetSubgroup("category", "app");
@@ -1474,10 +1473,7 @@ private:
ConsumedCpuHistogram = execGroup->FindHistogram("HIST(ConsumedCPU)");
}
- auto schemeshard = FindCountersByTabletType(
- TTabletTypes::SchemeShard, countersByTabletType);
-
- if (schemeshard && !DiskSpaceTablesTotalBytes) {
+ if (hasSchemeshard && !DiskSpaceTablesTotalBytes) {
auto schemeshardGroup = counters->GetSubgroup("type", "SchemeShard");
auto appGroup = schemeshardGroup->GetSubgroup("category", "app");
@@ -1552,7 +1548,6 @@ public:
public:
TTabletCountersForDb()
: SolomonCounters(new ::NMonitoring::TDynamicCounters)
- , AllTypes(SolomonCounters.Get(), "type", "all")
{}
TTabletCountersForDb(::NMonitoring::TDynamicCounterPtr externalGroup,
@@ -1560,30 +1555,24 @@ public:
THolder<TTabletCountersBase> executorCounters)
: SolomonCounters(internalGroup)
, ExecutorCounters(std::move(executorCounters))
- , AllTypes(SolomonCounters.Get(), "type", "all")
{
YdbCounters = MakeIntrusive<TYdbTabletCounters>(externalGroup);
}
void ToProto(NKikimr::NSysView::TDbServiceCounters& counters) override {
- auto* proto = counters.FindOrAddTabletCounters(TTabletTypes::Unknown);
- AllTypes.ToProto(*proto);
-
- for (auto& [type, tabletCounters] : CountersByTabletType) {
- auto* proto = counters.FindOrAddTabletCounters(type);
- tabletCounters->ToProto(*proto);
+ for (auto& bucket : CountersByTabletType.Buckets) {
+ TWriteGuard guard(bucket.GetLock());
+ for (auto& [type, tabletCounters] : bucket.GetMap()) {
+ auto* proto = counters.FindOrAddTabletCounters(type);
+ tabletCounters->ToProto(*proto);
+ }
}
}
void FromProto(NKikimr::NSysView::TDbServiceCounters& counters) override {
for (auto& proto : *counters.Proto().MutableTabletCounters()) {
auto type = proto.GetType();
- TTabletCountersForTabletType* tabletCounters = {};
- if (type == TTabletTypes::Unknown) {
- tabletCounters = &AllTypes;
- } else {
- tabletCounters = GetOrAddCountersByTabletType(type, CountersByTabletType, SolomonCounters);
- }
+ auto tabletCounters = GetOrAddCounters(type);
if (tabletCounters) {
if (!tabletCounters->IsInitialized()) {
Y_VERIFY(ExecutorCounters.Get());
@@ -1594,7 +1583,9 @@ public:
}
}
if (YdbCounters) {
- YdbCounters->Initialize(SolomonCounters, CountersByTabletType);
+ auto hasDatashard = (bool)GetCounters(TTabletTypes::DataShard);
+ auto hasSchemeshard = (bool)GetCounters(TTabletTypes::SchemeShard);
+ YdbCounters->Initialize(SolomonCounters, hasDatashard, hasSchemeshard);
YdbCounters->Transform();
}
}
@@ -1603,25 +1594,66 @@ public:
const TTabletCountersBase* appCounters, TTabletTypes::EType type,
const TTabletCountersBase* limitedAppCounters)
{
- AllTypes.Apply(tabletId, executorCounters, nullptr, type);
- auto* tabletCounters = GetOrAddCountersByTabletType(type, CountersByTabletType, SolomonCounters);
- if (tabletCounters) {
- tabletCounters->Apply(tabletId, executorCounters, appCounters, type, limitedAppCounters);
+ auto allTypes = GetOrAddCounters(TTabletTypes::Unknown);
+ {
+ TWriteGuard guard(CountersByTabletType.GetBucketForKey(TTabletTypes::Unknown).GetLock());
+ allTypes->Apply(tabletId, executorCounters, nullptr, type);
+ }
+ auto typeCounters = GetOrAddCounters(type);
+ {
+ TWriteGuard guard(CountersByTabletType.GetBucketForKey(type).GetLock());
+ typeCounters->Apply(tabletId, executorCounters, appCounters, type, limitedAppCounters);
}
}
void Forget(ui64 tabletId, TTabletTypes::EType type) {
- if (auto it = CountersByTabletType.find(type); it != CountersByTabletType.end()) {
- it->second->Forget(tabletId);
+ auto allTypes = GetCounters(TTabletTypes::Unknown);
+ if (allTypes) {
+ TWriteGuard guard(CountersByTabletType.GetBucketForKey(TTabletTypes::Unknown).GetLock());
+ allTypes->Forget(tabletId);
+ }
+ auto typeCounters = GetCounters(type);
+ if (typeCounters) {
+ TWriteGuard guard(CountersByTabletType.GetBucketForKey(type).GetLock());
+ typeCounters->Forget(tabletId);
+ }
+ }
+
+ void RecalcAll() {
+ for (auto& bucket : CountersByTabletType.Buckets) {
+ TWriteGuard guard(bucket.GetLock());
+ for (auto& [_, tabletCounters] : bucket.GetMap()) {
+ tabletCounters->RecalcAll();
+ }
+ }
+ }
+
+ private:
+ TTabletCountersForTabletTypePtr GetCounters(TTabletTypes::EType tabletType) {
+ TTabletCountersForTabletTypePtr res;
+ CountersByTabletType.Get(tabletType, res);
+ return res;
+ }
+
+ TTabletCountersForTabletTypePtr GetOrAddCounters(TTabletTypes::EType tabletType) {
+ auto res = GetCounters(tabletType);
+ if (res) {
+ return res;
}
+ res = CountersByTabletType.InsertIfAbsentWithInit(tabletType, [this, tabletType] {
+ TString type = (tabletType == TTabletTypes::Unknown) ?
+ "all" : TTabletTypes::TypeToStr(tabletType);
+ return MakeIntrusive<TTabletCountersForTabletType>(
+ SolomonCounters.Get(), "type", type.data());
+ });
+ return res;
}
private:
::NMonitoring::TDynamicCounterPtr SolomonCounters;
THolder<TTabletCountersBase> ExecutorCounters;
- TTabletCountersForTabletType AllTypes;
- TCountersByTabletType CountersByTabletType;
+ TConcurrentRWHashMap<TTabletTypes::EType, TTabletCountersForTabletTypePtr, 16> CountersByTabletType;
TYdbTabletCountersPtr YdbCounters;
};
@@ -1666,7 +1698,7 @@ private:
private:
//
::NMonitoring::TDynamicCounterPtr Counters;
- TTabletCountersForTabletType AllTypes;
+ TTabletCountersForTabletTypePtr AllTypes;
bool IsFollower = false;
typedef THashMap<TPathId, TIntrusivePtr<TTabletCountersForDb>> TCountersByPathId;
diff --git a/ydb/core/testlib/basics/feature_flags.h b/ydb/core/testlib/basics/feature_flags.h
index 7bf2b23d89..95f2d8c0e9 100644
--- a/ydb/core/testlib/basics/feature_flags.h
+++ b/ydb/core/testlib/basics/feature_flags.h
@@ -38,6 +38,7 @@ public:
FEATURE_FLAG_SETTER(EnableMoveIndex)
FEATURE_FLAG_SETTER(EnableNotNullDataColumns)
FEATURE_FLAG_SETTER(EnableArrowFormatAtDatashard)
+ FEATURE_FLAG_SETTER(EnableGrpcAudit)
TDerived& SetEnableMvcc(std::optional<bool> value) {
if (value) {
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_create_olap_store.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_create_olap_store.cpp
index ad91e488ec..0e26490418 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation_create_olap_store.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__operation_create_olap_store.cpp
@@ -170,6 +170,12 @@ TOlapStoreInfo::TPtr CreateOlapStore(const NKikimrSchemeOp::TColumnStoreDescript
return nullptr;
}
+ if (op.GetColumnShardCount() == 0) {
+ status = NKikimrScheme::StatusSchemeError;
+ errStr = Sprintf("trying to create OLAP store without zero shards");
+ return nullptr;
+ }
+
op.SetNextSchemaPresetId(1);
op.SetNextTtlSettingsPresetId(1);
diff --git a/ydb/core/viewer/viewer.cpp b/ydb/core/viewer/viewer.cpp
index 7d7729b76f..12e2c9bb97 100644
--- a/ydb/core/viewer/viewer.cpp
+++ b/ydb/core/viewer/viewer.cpp
@@ -451,6 +451,7 @@ private:
}
if (IsMatchesWildcard(filename, "monitoring*/resources/js/*")
|| IsMatchesWildcard(filename, "monitoring*/resources/css/*")
+ || IsMatchesWildcard(filename, "monitoring*/resources/media/*")
|| IsMatchesWildcard(filename, "monitoring*/resources/assets/fonts/*")
|| IsMatchesWildcard(filename, "monitoring*/resources/favicon.png")) {
auto resPos = filename.find("/resources/");
diff --git a/ydb/core/ymq/actor/queue_schema.cpp b/ydb/core/ymq/actor/queue_schema.cpp
index 90bcb11c45..616843185e 100644
--- a/ydb/core/ymq/actor/queue_schema.cpp
+++ b/ydb/core/ymq/actor/queue_schema.cpp
@@ -1065,10 +1065,11 @@ void TCreateQueueSchemaActorV2::MatchQueueAttributes(
) {
Become(&TCreateQueueSchemaActorV2::MatchAttributes);
+ const TString existingQueueName = IsCloudMode_ ? ExistingQueueResourceId_ : QueuePath_.QueueName;
TDbQueriesMaker queryMaker(
Cfg().GetRoot(),
QueuePath_.UserName,
- QueuePath_.QueueName,
+ existingQueueName,
currentVersion,
IsFifo_,
0,
@@ -1085,7 +1086,7 @@ void TCreateQueueSchemaActorV2::MatchQueueAttributes(
TParameters(trans->MutableParams()->MutableProto())
.Uint64("QUEUE_ID_NUMBER", currentVersion)
.Uint64("QUEUE_ID_NUMBER_HASH", GetKeysHash(currentVersion))
- .Utf8("NAME", IsCloudMode_ ? ExistingQueueResourceId_ : QueuePath_.QueueName)
+ .Utf8("NAME", existingQueueName)
.Bool("FIFO", IsFifo_)
.Uint64("SHARDS", RequiredShardsCount_)
.Uint64("PARTITIONS", Request_.GetPartitions())
diff --git a/ydb/public/api/protos/CMakeLists.txt b/ydb/public/api/protos/CMakeLists.txt
index 9096201adf..a3e2b117d7 100644
--- a/ydb/public/api/protos/CMakeLists.txt
+++ b/ydb/public/api/protos/CMakeLists.txt
@@ -65,6 +65,11 @@ generate_enum_serilization(api-protos
INCLUDE_HEADERS
ydb/public/api/protos/draft/datastreams.pb.h
)
+generate_enum_serilization(api-protos
+ ${CMAKE_BINARY_DIR}/ydb/public/api/protos/ydb_topic.pb.h
+ INCLUDE_HEADERS
+ ydb/public/api/protos/ydb_topic.pb.h
+)
target_proto_addincls(api-protos
./
${CMAKE_SOURCE_DIR}/
diff --git a/ydb/services/persqueue_v1/actors/write_session_actor.h b/ydb/services/persqueue_v1/actors/write_session_actor.h
index 99b026acee..44e5d71a23 100644
--- a/ydb/services/persqueue_v1/actors/write_session_actor.h
+++ b/ydb/services/persqueue_v1/actors/write_session_actor.h
@@ -256,6 +256,7 @@ private:
NKikimr::NPQ::TMultiCounter SessionsActive;
NKikimr::NPQ::TMultiCounter Errors;
+ std::vector<NKikimr::NPQ::TMultiCounter> CodecCounters;
TIntrusivePtr<NACLib::TUserToken> Token;
TString Auth;
diff --git a/ydb/services/persqueue_v1/actors/write_session_actor.ipp b/ydb/services/persqueue_v1/actors/write_session_actor.ipp
index 357f003ebd..48571cacec 100644
--- a/ydb/services/persqueue_v1/actors/write_session_actor.ipp
+++ b/ydb/services/persqueue_v1/actors/write_session_actor.ipp
@@ -490,6 +490,16 @@ void TWriteSessionActor<UseMigrationProtocol>::SetupCounters()
SessionsActive = NKikimr::NPQ::TMultiCounter(subGroup, aggr, {}, {"SessionsActive"}, false);
Errors = NKikimr::NPQ::TMultiCounter(subGroup, aggr, {}, {"Errors"}, true);
+ CodecCounters.push_back(NKikimr::NPQ::TMultiCounter(subGroup, aggr, {{"codec", "user"}}, {"MessagesWrittenByCodec"}, true));
+
+ auto allNames = GetEnumAllCppNames<Ydb::Topic::Codec>();
+ allNames.erase(allNames.begin());
+ allNames.pop_back();
+ allNames.pop_back();
+ for (auto &name : allNames) {
+ auto nm = to_lower(name).substr(18);
+ CodecCounters.push_back(NKikimr::NPQ::TMultiCounter(subGroup, aggr, {{"codec", nm}}, {"MessagesWrittenByCodec"}, true));
+ }
SessionsCreated.Inc();
SessionsActive.Inc();
}
@@ -511,6 +521,7 @@ void TWriteSessionActor<UseMigrationProtocol>::SetupCounters(const TString& clou
SessionsActive = NKikimr::NPQ::TMultiCounter(subGroup, aggr, {}, {"stream.internal_write.sessions_active"}, false, "name");
Errors = NKikimr::NPQ::TMultiCounter(subGroup, aggr, {}, {"stream.internal_write.errors_per_second"}, true, "name");
+
SessionsCreated.Inc();
SessionsActive.Inc();
}
@@ -1388,6 +1399,10 @@ void TWriteSessionActor<UseMigrationProtocol>::Handle(typename TEvWrite::TPtr& e
CloseSession(TStringBuilder() << "bad write request - 'blocks_headers' at position " << messageIndex << " is invalid: " << error, PersQueue::ErrorCode::BAD_REQUEST, ctx);
return false;
}
+ ui32 intCodec = ((ui32)codecID + 1) < CodecCounters.size() ? ((ui32)codecID + 1) : 0;
+ if (CodecCounters.size() > intCodec) {
+ CodecCounters[intCodec].Inc();
+ }
if (data.blocks_message_counts(messageIndex) != 1) {
CloseSession(TStringBuilder() << "bad write request - 'blocks_message_counts' at position " << messageIndex << " is " << data.blocks_message_counts(messageIndex)
@@ -1426,6 +1441,10 @@ void TWriteSessionActor<UseMigrationProtocol>::Handle(typename TEvWrite::TPtr& e
CloseSession(TStringBuilder() << "bad write request - codec is invalid: " << error, PersQueue::ErrorCode::BAD_REQUEST, ctx);
return false;
}
+ ui32 intCodec = codecID < CodecCounters.size() ? codecID : 0;
+ if (CodecCounters.size() > intCodec) {
+ CodecCounters[intCodec].Inc();
+ }
return true;
};
diff --git a/ydb/services/persqueue_v1/persqueue_ut.cpp b/ydb/services/persqueue_v1/persqueue_ut.cpp
index d334982ad1..8e9affa949 100644
--- a/ydb/services/persqueue_v1/persqueue_ut.cpp
+++ b/ydb/services/persqueue_v1/persqueue_ut.cpp
@@ -2323,6 +2323,7 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) {
{
"BytesInflight",
"BytesInflightTotal",
+ "MessagesWrittenByCodec",
"Errors",
"SessionsActive",
"SessionsCreated",
diff --git a/ydb/services/ydb/ydb_logstore_ut.cpp b/ydb/services/ydb/ydb_logstore_ut.cpp
index f2e2407cab..9fca08cd0d 100644
--- a/ydb/services/ydb/ydb_logstore_ut.cpp
+++ b/ydb/services/ydb/ydb_logstore_ut.cpp
@@ -101,6 +101,16 @@ Y_UNIT_TEST_SUITE(YdbLogStore) {
auto res = logStoreClient.DropLogStore("/Root/LogStore").GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(res.GetStatus(), EStatus::SUCCESS, res.GetIssues().ToString());
}
+
+ // negative
+ {
+ NYdb::NLogStore::TSchema logSchema(TestSchemaColumns(), TestSchemaKey());
+ THashMap<TString, NYdb::NLogStore::TSchema> schemaPresets;
+ schemaPresets["default"] = logSchema;
+ NYdb::NLogStore::TLogStoreDescription storeDescr(0, schemaPresets);
+ auto res = logStoreClient.CreateLogStore("/Root/LogStore1", std::move(storeDescr)).GetValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(res.GetStatus(), EStatus::SCHEME_ERROR, res.GetIssues().ToString());
+ }
}
Y_UNIT_TEST(LogStoreTiers) {
diff --git a/ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_hive_/flat_hive.schema b/ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_hive_/flat_hive.schema
index f55ef6ca43..3d579d78f8 100644
--- a/ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_hive_/flat_hive.schema
+++ b/ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_hive_/flat_hive.schema
@@ -214,6 +214,11 @@
"ColumnType": "String"
},
{
+ "ColumnId": 123,
+ "ColumnName": "BalancerPolicy",
+ "ColumnType": "Uint64"
+ },
+ {
"ColumnId": 7,
"ColumnName": "AllowedNodes",
"ColumnType": "String"
@@ -286,6 +291,7 @@
121,
5,
122,
+ 123,
7,
11,
13,