aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authortesseract <tesseract@yandex-team.com>2023-03-17 16:18:23 +0300
committertesseract <tesseract@yandex-team.com>2023-03-17 16:18:23 +0300
commit0443bb163c49758b592a0fbaa67a26b791e233f1 (patch)
treee931e918795cecae1483ff0c5390c87eeb755d08
parentd20dfd4837a75ee004cde1ad32f1593453a24d9e (diff)
downloadydb-0443bb163c49758b592a0fbaa67a26b791e233f1.tar.gz
Добавить тесты на обработку в partition ситуации, когда закончилось место в SubDomain
Reserve test intermediate
-rw-r--r--ydb/core/persqueue/partition.cpp3
-rw-r--r--ydb/core/persqueue/ut/common/pq_ut_common.cpp5
-rw-r--r--ydb/core/persqueue/ut/common/pq_ut_common.h2
-rw-r--r--ydb/core/persqueue/ut/make_config.cpp2
-rw-r--r--ydb/core/persqueue/ut/partition_ut.cpp148
5 files changed, 155 insertions, 5 deletions
diff --git a/ydb/core/persqueue/partition.cpp b/ydb/core/persqueue/partition.cpp
index e8f07bbe09..17dc5592bc 100644
--- a/ydb/core/persqueue/partition.cpp
+++ b/ydb/core/persqueue/partition.cpp
@@ -4858,6 +4858,7 @@ void TPartition::HandleOnWrite(TEvPQ::TEvWrite::TPtr& ev, const TActorContext& c
ui32 sz = std::accumulate(ev->Get()->Msgs.begin(), ev->Get()->Msgs.end(), 0u, [](ui32 sum, const TEvPQ::TEvWrite::TMsg& msg){
return sum + msg.Data.size();
});
+
bool mirroredPartition = Config.GetPartitionConfig().HasMirrorFrom();
if (mirroredPartition && !ev->Get()->OwnerCookie.empty()) {
@@ -5731,7 +5732,7 @@ void TPartition::HandleWrites(const TActorContext& ctx) {
bool haveData = false;
bool haveCheckDisk = false;
- if (!Requests.empty() && (DiskIsFull || WaitingForSubDomainQuota(ctx))) {
+ if (!Requests.empty() && DiskIsFull) {
CancelAllWritesOnIdle(ctx);
AddCheckDiskRequest(request.Get(), Config.GetPartitionConfig().GetNumChannels());
haveCheckDisk = true;
diff --git a/ydb/core/persqueue/ut/common/pq_ut_common.cpp b/ydb/core/persqueue/ut/common/pq_ut_common.cpp
index e59a7d1c2d..d6be9c2d75 100644
--- a/ydb/core/persqueue/ut/common/pq_ut_common.cpp
+++ b/ydb/core/persqueue/ut/common/pq_ut_common.cpp
@@ -972,14 +972,13 @@ void FillDeprecatedUserInfo(NKikimrClient::TKeyValueRequest_TCmdWrite* write, co
write->SetValue(idataDeprecated.Data(), idataDeprecated.Size());
}
-TEvPersQueue::TEvPeriodicTopicStats* GetReadBalancerPeriodicTopicStats(TTestActorRuntime& runtime, ui64 balancerId) {
+THolder<TEvPersQueue::TEvPeriodicTopicStats> GetReadBalancerPeriodicTopicStats(TTestActorRuntime& runtime, ui64 balancerId) {
runtime.ResetScheduledCount();
TActorId sender = runtime.AllocateEdgeActor();
runtime.SendToPipe(balancerId, sender, new TEvPersQueue::TEvStatus(), 0, GetPipeConfigWithRetries());
- TAutoPtr<IEventHandle> handle;
- return runtime.GrabEdgeEvent<TEvPersQueue::TEvPeriodicTopicStats>(handle, TDuration::Seconds(2));
+ return runtime.GrabEdgeEvent<TEvPersQueue::TEvPeriodicTopicStats>(TDuration::Seconds(2));
}
} // namespace NKikimr::NPQ
diff --git a/ydb/core/persqueue/ut/common/pq_ut_common.h b/ydb/core/persqueue/ut/common/pq_ut_common.h
index fedb783c0f..2f3fdf8a95 100644
--- a/ydb/core/persqueue/ut/common/pq_ut_common.h
+++ b/ydb/core/persqueue/ut/common/pq_ut_common.h
@@ -482,6 +482,6 @@ void CmdWrite(
bool treatBadOffsetAsError = true,
bool disableDeduplication = false);
-TEvPersQueue::TEvPeriodicTopicStats* GetReadBalancerPeriodicTopicStats(TTestActorRuntime& runtime, ui64 balancerId);
+THolder<TEvPersQueue::TEvPeriodicTopicStats> GetReadBalancerPeriodicTopicStats(TTestActorRuntime& runtime, ui64 balancerId);
} // namespace NKikimr::NPQ
diff --git a/ydb/core/persqueue/ut/make_config.cpp b/ydb/core/persqueue/ut/make_config.cpp
index b2d16c6300..ceeed50b44 100644
--- a/ydb/core/persqueue/ut/make_config.cpp
+++ b/ydb/core/persqueue/ut/make_config.cpp
@@ -25,6 +25,8 @@ NKikimrPQ::TPQTabletConfig MakeConfig(ui64 version,
config.SetLocalDC(true);
config.SetYdbDatabasePath("");
+ config.SetMeteringMode(NKikimrPQ::TPQTabletConfig::METERING_MODE_REQUEST_UNITS);
+
return config;
}
diff --git a/ydb/core/persqueue/ut/partition_ut.cpp b/ydb/core/persqueue/ut/partition_ut.cpp
index ad3f976ce7..dca1599b8d 100644
--- a/ydb/core/persqueue/ut/partition_ut.cpp
+++ b/ydb/core/persqueue/ut/partition_ut.cpp
@@ -202,6 +202,11 @@ protected:
ui64 begin, ui64 end,
TMaybe<bool> predicate = Nothing());
+ void SendSubDomainStatus(bool subDomainOutOfSpace = false);
+ void SendReserveBytes(const ui64 cookie, const ui32 size, const TString& ownerCookie, const ui64 messageNo, bool lastRequest = false);
+ void SendChangeOwner(const ui64 cookie, const TString& owner, const TActorId& pipeClient, const bool force = true);
+ void SendWrite(const ui64 cookie, const ui64 messageNo, const TString& ownerCookie, const TMaybe<ui64> offset, const TString& data);
+
TMaybe<TTestContext> Ctx;
TMaybe<TFinalizer> Finalizer;
@@ -461,6 +466,51 @@ void TPartitionFixture::SendCmdWriteResponse(NMsgBusProxy::EResponseStatus statu
Ctx->Runtime->SingleSys()->Send(new IEventHandleFat(ActorId, Ctx->Edge, event.Release()));
}
+void TPartitionFixture::SendSubDomainStatus(bool subDomainOutOfSpace)
+{
+ auto event = MakeHolder<TEvPQ::TEvSubDomainStatus>();
+ event->Record.SetSubDomainOutOfSpace(subDomainOutOfSpace);
+
+ Ctx->Runtime->SingleSys()->Send(new IEventHandleFat(ActorId, Ctx->Edge, event.Release()));
+}
+
+void TPartitionFixture::SendReserveBytes(const ui64 cookie, const ui32 size, const TString& ownerCookie, const ui64 messageNo, bool lastRequest)
+{
+ auto event = MakeHolder<TEvPQ::TEvReserveBytes>(cookie, size, ownerCookie, messageNo, lastRequest);
+ Ctx->Runtime->SingleSys()->Send(new IEventHandleFat(ActorId, Ctx->Edge, event.Release()));
+}
+
+void TPartitionFixture::SendWrite(const ui64 cookie, const ui64 messageNo, const TString& ownerCookie, const TMaybe<ui64> offset, const TString& data)
+{
+ TEvPQ::TEvWrite::TMsg msg;
+ msg.SourceId = "SourceId";
+ msg.SeqNo = messageNo;
+ msg.PartNo = 0;
+ msg.TotalParts = 1;
+ msg.TotalSize = data.size();
+ msg.CreateTimestamp = TMonotonic::Now().Seconds();
+ msg.ReceiveTimestamp = TMonotonic::Now().Seconds();
+ msg.DisableDeduplication = false;
+ msg.Data = data;
+ msg.UncompressedSize = data.size();
+ msg.PartitionKey = "PartitionKey";
+ msg.ExplicitHashKey = "ExplicitHashKey";
+ msg.External = false;
+ msg.IgnoreQuotaDeadline = false;
+
+ TVector<TEvPQ::TEvWrite::TMsg> msgs;
+ msgs.push_back(msg);
+
+ auto event = MakeHolder<TEvPQ::TEvWrite>(cookie, messageNo, ownerCookie, offset, std::move(msgs), false);
+ Ctx->Runtime->SingleSys()->Send(new IEventHandleFat(ActorId, Ctx->Edge, event.Release()));
+}
+
+void TPartitionFixture::SendChangeOwner(const ui64 cookie, const TString& owner, const TActorId& pipeClient, const bool force)
+{
+ auto event = MakeHolder<TEvPQ::TEvChangeOwner>(cookie, owner, pipeClient, Ctx->Edge, force);
+ Ctx->Runtime->SingleSys()->Send(new IEventHandleFat(ActorId, Ctx->Edge, event.Release()));
+}
+
void TPartitionFixture::WaitProxyResponse(const TProxyResponseMatcher& matcher)
{
auto event = Ctx->Runtime->GrabEdgeEvent<TEvPQ::TEvProxyResponse>();
@@ -1267,6 +1317,104 @@ Y_UNIT_TEST_F(TabletConfig_Is_Newer_That_PartitionConfig, TPartitionFixture)
SendCmdWriteResponse(NMsgBusProxy::MSTATUS_OK);
}
+Y_UNIT_TEST_F(ReserveSubDomainOutOfSpace, TPartitionFixture)
+{
+ Ctx->Runtime->GetAppData().FeatureFlags.SetEnableTopicDiskSubDomainQuota(true);
+
+ CreatePartition({
+ .Partition=1,
+ .Begin=0, .End=10,
+ //
+ // partition configuration
+ //
+ .Config={.Version=1, .Consumers={{.Consumer="client-1", .Offset=3}}}
+ },
+ //
+ // tablet configuration
+ //
+ {.Version=2, .Consumers={{.Consumer="client-1"}}});
+
+ SendSubDomainStatus(true);
+
+ ui64 cookie = 1;
+ ui64 messageNo = 0;
+
+ SendChangeOwner(cookie, "owner1", Ctx->Edge);
+ auto ownerEvent = Ctx->Runtime->GrabEdgeEvent<TEvPQ::TEvProxyResponse>(TDuration::Seconds(1));
+ UNIT_ASSERT(ownerEvent != nullptr);
+ auto ownerCookie = ownerEvent->Response.GetPartitionResponse().GetCmdGetOwnershipResult().GetOwnerCookie();
+
+ TAutoPtr<IEventHandle> handle;
+ std::function<bool(const TEvPQ::TEvProxyResponse&)> truth = [&](const TEvPQ::TEvProxyResponse& e) { return cookie == e.Cookie; };
+
+ // First message will be processed because used storage 0 and limit 0. That is, the limit is not exceeded.
+ SendReserveBytes(++cookie, 7, ownerCookie, messageNo++);
+
+ // Second message will not be processed because the limit is exceeded.
+ SendReserveBytes(++cookie, 13, ownerCookie, messageNo++);
+ auto reserveEvent = Ctx->Runtime->GrabEdgeEventIf<TEvPQ::TEvProxyResponse>(handle, truth, TDuration::Seconds(1));
+ UNIT_ASSERT(reserveEvent == nullptr);
+
+ // SudDomain quota available - second message will be processed..
+ SendSubDomainStatus(false);
+ reserveEvent = Ctx->Runtime->GrabEdgeEventIf<TEvPQ::TEvProxyResponse>(handle, truth, TDuration::Seconds(1));
+ UNIT_ASSERT(reserveEvent != nullptr);
+}
+
+Y_UNIT_TEST_F(WriteSubDomainOutOfSpace, TPartitionFixture)
+{
+ Ctx->Runtime->GetAppData().FeatureFlags.SetEnableTopicDiskSubDomainQuota(true);
+
+ CreatePartition({
+ .Partition=1,
+ .Begin=0, .End=10,
+ //
+ // partition configuration
+ //
+ .Config={.Version=1, .Consumers={{.Consumer="client-1", .Offset=3}}}
+ },
+ //
+ // tablet configuration
+ //
+ {.Version=2, .Consumers={{.Consumer="client-1"}}});
+
+ SendSubDomainStatus(true);
+
+ ui64 cookie = 1;
+ ui64 messageNo = 0;
+
+ SendChangeOwner(cookie, "owner1", Ctx->Edge, true);
+ auto ownerEvent = Ctx->Runtime->GrabEdgeEvent<TEvPQ::TEvProxyResponse>(TDuration::Seconds(1));
+ UNIT_ASSERT(ownerEvent != nullptr);
+ auto ownerCookie = ownerEvent->Response.GetPartitionResponse().GetCmdGetOwnershipResult().GetOwnerCookie();
+
+ TAutoPtr<IEventHandle> handle;
+ std::function<bool(const TEvPQ::TEvProxyResponse&)> truth = [&](const TEvPQ::TEvProxyResponse& e) { return cookie == e.Cookie; };
+
+ TString data = "data for write";
+
+ // First message will be processed because used storage 0 and limit 0. That is, the limit is not exceeded.
+ SendWrite(++cookie, messageNo, ownerCookie, (messageNo + 1) * 100, data);
+ messageNo++;
+
+ SendDiskStatusResponse();
+
+ // Second message will not be processed because the limit is exceeded.
+ SendWrite(++cookie, messageNo, ownerCookie, (messageNo + 1) * 100, data);
+ messageNo++;
+
+ SendDiskStatusResponse();
+ auto event = Ctx->Runtime->GrabEdgeEventIf<TEvPQ::TEvProxyResponse>(handle, truth, TDuration::Seconds(1));
+ UNIT_ASSERT(event == nullptr);
+
+ // SudDomain quota available - second message will be processed..
+ SendSubDomainStatus(false);
+ SendDiskStatusResponse();
+
+ event = Ctx->Runtime->GrabEdgeEventIf<TEvPQ::TEvProxyResponse>(handle, truth, TDuration::Seconds(1));
+ UNIT_ASSERT(event != nullptr);
+}
+
}
}