diff options
author | tesseract <tesseract@yandex-team.com> | 2023-03-17 16:18:23 +0300 |
---|---|---|
committer | tesseract <tesseract@yandex-team.com> | 2023-03-17 16:18:23 +0300 |
commit | 0443bb163c49758b592a0fbaa67a26b791e233f1 (patch) | |
tree | e931e918795cecae1483ff0c5390c87eeb755d08 | |
parent | d20dfd4837a75ee004cde1ad32f1593453a24d9e (diff) | |
download | ydb-0443bb163c49758b592a0fbaa67a26b791e233f1.tar.gz |
Добавить тесты на обработку в partition ситуации, когда закончилось место в SubDomain
Reserve test
intermediate
-rw-r--r-- | ydb/core/persqueue/partition.cpp | 3 | ||||
-rw-r--r-- | ydb/core/persqueue/ut/common/pq_ut_common.cpp | 5 | ||||
-rw-r--r-- | ydb/core/persqueue/ut/common/pq_ut_common.h | 2 | ||||
-rw-r--r-- | ydb/core/persqueue/ut/make_config.cpp | 2 | ||||
-rw-r--r-- | ydb/core/persqueue/ut/partition_ut.cpp | 148 |
5 files changed, 155 insertions, 5 deletions
diff --git a/ydb/core/persqueue/partition.cpp b/ydb/core/persqueue/partition.cpp index e8f07bbe09..17dc5592bc 100644 --- a/ydb/core/persqueue/partition.cpp +++ b/ydb/core/persqueue/partition.cpp @@ -4858,6 +4858,7 @@ void TPartition::HandleOnWrite(TEvPQ::TEvWrite::TPtr& ev, const TActorContext& c ui32 sz = std::accumulate(ev->Get()->Msgs.begin(), ev->Get()->Msgs.end(), 0u, [](ui32 sum, const TEvPQ::TEvWrite::TMsg& msg){ return sum + msg.Data.size(); }); + bool mirroredPartition = Config.GetPartitionConfig().HasMirrorFrom(); if (mirroredPartition && !ev->Get()->OwnerCookie.empty()) { @@ -5731,7 +5732,7 @@ void TPartition::HandleWrites(const TActorContext& ctx) { bool haveData = false; bool haveCheckDisk = false; - if (!Requests.empty() && (DiskIsFull || WaitingForSubDomainQuota(ctx))) { + if (!Requests.empty() && DiskIsFull) { CancelAllWritesOnIdle(ctx); AddCheckDiskRequest(request.Get(), Config.GetPartitionConfig().GetNumChannels()); haveCheckDisk = true; diff --git a/ydb/core/persqueue/ut/common/pq_ut_common.cpp b/ydb/core/persqueue/ut/common/pq_ut_common.cpp index e59a7d1c2d..d6be9c2d75 100644 --- a/ydb/core/persqueue/ut/common/pq_ut_common.cpp +++ b/ydb/core/persqueue/ut/common/pq_ut_common.cpp @@ -972,14 +972,13 @@ void FillDeprecatedUserInfo(NKikimrClient::TKeyValueRequest_TCmdWrite* write, co write->SetValue(idataDeprecated.Data(), idataDeprecated.Size()); } -TEvPersQueue::TEvPeriodicTopicStats* GetReadBalancerPeriodicTopicStats(TTestActorRuntime& runtime, ui64 balancerId) { +THolder<TEvPersQueue::TEvPeriodicTopicStats> GetReadBalancerPeriodicTopicStats(TTestActorRuntime& runtime, ui64 balancerId) { runtime.ResetScheduledCount(); TActorId sender = runtime.AllocateEdgeActor(); runtime.SendToPipe(balancerId, sender, new TEvPersQueue::TEvStatus(), 0, GetPipeConfigWithRetries()); - TAutoPtr<IEventHandle> handle; - return runtime.GrabEdgeEvent<TEvPersQueue::TEvPeriodicTopicStats>(handle, TDuration::Seconds(2)); + return runtime.GrabEdgeEvent<TEvPersQueue::TEvPeriodicTopicStats>(TDuration::Seconds(2)); } } // namespace NKikimr::NPQ diff --git a/ydb/core/persqueue/ut/common/pq_ut_common.h b/ydb/core/persqueue/ut/common/pq_ut_common.h index fedb783c0f..2f3fdf8a95 100644 --- a/ydb/core/persqueue/ut/common/pq_ut_common.h +++ b/ydb/core/persqueue/ut/common/pq_ut_common.h @@ -482,6 +482,6 @@ void CmdWrite( bool treatBadOffsetAsError = true, bool disableDeduplication = false); -TEvPersQueue::TEvPeriodicTopicStats* GetReadBalancerPeriodicTopicStats(TTestActorRuntime& runtime, ui64 balancerId); +THolder<TEvPersQueue::TEvPeriodicTopicStats> GetReadBalancerPeriodicTopicStats(TTestActorRuntime& runtime, ui64 balancerId); } // namespace NKikimr::NPQ diff --git a/ydb/core/persqueue/ut/make_config.cpp b/ydb/core/persqueue/ut/make_config.cpp index b2d16c6300..ceeed50b44 100644 --- a/ydb/core/persqueue/ut/make_config.cpp +++ b/ydb/core/persqueue/ut/make_config.cpp @@ -25,6 +25,8 @@ NKikimrPQ::TPQTabletConfig MakeConfig(ui64 version, config.SetLocalDC(true); config.SetYdbDatabasePath(""); + config.SetMeteringMode(NKikimrPQ::TPQTabletConfig::METERING_MODE_REQUEST_UNITS); + return config; } diff --git a/ydb/core/persqueue/ut/partition_ut.cpp b/ydb/core/persqueue/ut/partition_ut.cpp index ad3f976ce7..dca1599b8d 100644 --- a/ydb/core/persqueue/ut/partition_ut.cpp +++ b/ydb/core/persqueue/ut/partition_ut.cpp @@ -202,6 +202,11 @@ protected: ui64 begin, ui64 end, TMaybe<bool> predicate = Nothing()); + void SendSubDomainStatus(bool subDomainOutOfSpace = false); + void SendReserveBytes(const ui64 cookie, const ui32 size, const TString& ownerCookie, const ui64 messageNo, bool lastRequest = false); + void SendChangeOwner(const ui64 cookie, const TString& owner, const TActorId& pipeClient, const bool force = true); + void SendWrite(const ui64 cookie, const ui64 messageNo, const TString& ownerCookie, const TMaybe<ui64> offset, const TString& data); + TMaybe<TTestContext> Ctx; TMaybe<TFinalizer> Finalizer; @@ -461,6 +466,51 @@ void TPartitionFixture::SendCmdWriteResponse(NMsgBusProxy::EResponseStatus statu Ctx->Runtime->SingleSys()->Send(new IEventHandleFat(ActorId, Ctx->Edge, event.Release())); } +void TPartitionFixture::SendSubDomainStatus(bool subDomainOutOfSpace) +{ + auto event = MakeHolder<TEvPQ::TEvSubDomainStatus>(); + event->Record.SetSubDomainOutOfSpace(subDomainOutOfSpace); + + Ctx->Runtime->SingleSys()->Send(new IEventHandleFat(ActorId, Ctx->Edge, event.Release())); +} + +void TPartitionFixture::SendReserveBytes(const ui64 cookie, const ui32 size, const TString& ownerCookie, const ui64 messageNo, bool lastRequest) +{ + auto event = MakeHolder<TEvPQ::TEvReserveBytes>(cookie, size, ownerCookie, messageNo, lastRequest); + Ctx->Runtime->SingleSys()->Send(new IEventHandleFat(ActorId, Ctx->Edge, event.Release())); +} + +void TPartitionFixture::SendWrite(const ui64 cookie, const ui64 messageNo, const TString& ownerCookie, const TMaybe<ui64> offset, const TString& data) +{ + TEvPQ::TEvWrite::TMsg msg; + msg.SourceId = "SourceId"; + msg.SeqNo = messageNo; + msg.PartNo = 0; + msg.TotalParts = 1; + msg.TotalSize = data.size(); + msg.CreateTimestamp = TMonotonic::Now().Seconds(); + msg.ReceiveTimestamp = TMonotonic::Now().Seconds(); + msg.DisableDeduplication = false; + msg.Data = data; + msg.UncompressedSize = data.size(); + msg.PartitionKey = "PartitionKey"; + msg.ExplicitHashKey = "ExplicitHashKey"; + msg.External = false; + msg.IgnoreQuotaDeadline = false; + + TVector<TEvPQ::TEvWrite::TMsg> msgs; + msgs.push_back(msg); + + auto event = MakeHolder<TEvPQ::TEvWrite>(cookie, messageNo, ownerCookie, offset, std::move(msgs), false); + Ctx->Runtime->SingleSys()->Send(new IEventHandleFat(ActorId, Ctx->Edge, event.Release())); +} + +void TPartitionFixture::SendChangeOwner(const ui64 cookie, const TString& owner, const TActorId& pipeClient, const bool force) +{ + auto event = MakeHolder<TEvPQ::TEvChangeOwner>(cookie, owner, pipeClient, Ctx->Edge, force); + Ctx->Runtime->SingleSys()->Send(new IEventHandleFat(ActorId, Ctx->Edge, event.Release())); +} + void TPartitionFixture::WaitProxyResponse(const TProxyResponseMatcher& matcher) { auto event = Ctx->Runtime->GrabEdgeEvent<TEvPQ::TEvProxyResponse>(); @@ -1267,6 +1317,104 @@ Y_UNIT_TEST_F(TabletConfig_Is_Newer_That_PartitionConfig, TPartitionFixture) SendCmdWriteResponse(NMsgBusProxy::MSTATUS_OK); } +Y_UNIT_TEST_F(ReserveSubDomainOutOfSpace, TPartitionFixture) +{ + Ctx->Runtime->GetAppData().FeatureFlags.SetEnableTopicDiskSubDomainQuota(true); + + CreatePartition({ + .Partition=1, + .Begin=0, .End=10, + // + // partition configuration + // + .Config={.Version=1, .Consumers={{.Consumer="client-1", .Offset=3}}} + }, + // + // tablet configuration + // + {.Version=2, .Consumers={{.Consumer="client-1"}}}); + + SendSubDomainStatus(true); + + ui64 cookie = 1; + ui64 messageNo = 0; + + SendChangeOwner(cookie, "owner1", Ctx->Edge); + auto ownerEvent = Ctx->Runtime->GrabEdgeEvent<TEvPQ::TEvProxyResponse>(TDuration::Seconds(1)); + UNIT_ASSERT(ownerEvent != nullptr); + auto ownerCookie = ownerEvent->Response.GetPartitionResponse().GetCmdGetOwnershipResult().GetOwnerCookie(); + + TAutoPtr<IEventHandle> handle; + std::function<bool(const TEvPQ::TEvProxyResponse&)> truth = [&](const TEvPQ::TEvProxyResponse& e) { return cookie == e.Cookie; }; + + // First message will be processed because used storage 0 and limit 0. That is, the limit is not exceeded. + SendReserveBytes(++cookie, 7, ownerCookie, messageNo++); + + // Second message will not be processed because the limit is exceeded. + SendReserveBytes(++cookie, 13, ownerCookie, messageNo++); + auto reserveEvent = Ctx->Runtime->GrabEdgeEventIf<TEvPQ::TEvProxyResponse>(handle, truth, TDuration::Seconds(1)); + UNIT_ASSERT(reserveEvent == nullptr); + + // SudDomain quota available - second message will be processed.. + SendSubDomainStatus(false); + reserveEvent = Ctx->Runtime->GrabEdgeEventIf<TEvPQ::TEvProxyResponse>(handle, truth, TDuration::Seconds(1)); + UNIT_ASSERT(reserveEvent != nullptr); +} + +Y_UNIT_TEST_F(WriteSubDomainOutOfSpace, TPartitionFixture) +{ + Ctx->Runtime->GetAppData().FeatureFlags.SetEnableTopicDiskSubDomainQuota(true); + + CreatePartition({ + .Partition=1, + .Begin=0, .End=10, + // + // partition configuration + // + .Config={.Version=1, .Consumers={{.Consumer="client-1", .Offset=3}}} + }, + // + // tablet configuration + // + {.Version=2, .Consumers={{.Consumer="client-1"}}}); + + SendSubDomainStatus(true); + + ui64 cookie = 1; + ui64 messageNo = 0; + + SendChangeOwner(cookie, "owner1", Ctx->Edge, true); + auto ownerEvent = Ctx->Runtime->GrabEdgeEvent<TEvPQ::TEvProxyResponse>(TDuration::Seconds(1)); + UNIT_ASSERT(ownerEvent != nullptr); + auto ownerCookie = ownerEvent->Response.GetPartitionResponse().GetCmdGetOwnershipResult().GetOwnerCookie(); + + TAutoPtr<IEventHandle> handle; + std::function<bool(const TEvPQ::TEvProxyResponse&)> truth = [&](const TEvPQ::TEvProxyResponse& e) { return cookie == e.Cookie; }; + + TString data = "data for write"; + + // First message will be processed because used storage 0 and limit 0. That is, the limit is not exceeded. + SendWrite(++cookie, messageNo, ownerCookie, (messageNo + 1) * 100, data); + messageNo++; + + SendDiskStatusResponse(); + + // Second message will not be processed because the limit is exceeded. + SendWrite(++cookie, messageNo, ownerCookie, (messageNo + 1) * 100, data); + messageNo++; + + SendDiskStatusResponse(); + auto event = Ctx->Runtime->GrabEdgeEventIf<TEvPQ::TEvProxyResponse>(handle, truth, TDuration::Seconds(1)); + UNIT_ASSERT(event == nullptr); + + // SudDomain quota available - second message will be processed.. + SendSubDomainStatus(false); + SendDiskStatusResponse(); + + event = Ctx->Runtime->GrabEdgeEventIf<TEvPQ::TEvProxyResponse>(handle, truth, TDuration::Seconds(1)); + UNIT_ASSERT(event != nullptr); +} + } } |