diff options
author | tesseract <tesseract@yandex-team.com> | 2023-03-24 17:07:36 +0300 |
---|---|---|
committer | tesseract <tesseract@yandex-team.com> | 2023-03-24 17:07:36 +0300 |
commit | 833d012d60f469736200a888fc73dda823a9be4b (patch) | |
tree | 43189964556d15bc601c4a647aed37255ba166f4 | |
parent | 592893d573feb609a369e392a16db268c39edb80 (diff) | |
download | ydb-833d012d60f469736200a888fc73dda823a9be4b.tar.gz |
Написать тест на datastreams put-records запрос
1) --В SchemeShard-е не учитывалась StateVersion , из-за которой не обрабатывалось изменение DiskQuotaExceeded для корневого SubDomain. ИМХО, это неожиданное поведение.--
Договорились. что неочевидное поведение является незадокументированной фичей, и не надо чинить ничего для корневого субдомена. Переписал тесты.
2) В SchemeShard-е поправлено логирование - слияне слова path и его значения
3) В ReadBalancer уточнена логика для запуска и перезапуска отслеживания изменений SubDomen-а при обновлении конфигурации ReadBalancer-а
4) Добавлены тесты на запись в топик при DiskQuotaExceeded=true
-rw-r--r-- | ydb/core/persqueue/read_balancer.cpp | 2 | ||||
-rw-r--r-- | ydb/core/persqueue/type_codecs_defs.h | 4 | ||||
-rw-r--r-- | ydb/core/persqueue/type_coders.h | 2 | ||||
-rw-r--r-- | ydb/core/persqueue/ut/partition_ut.cpp | 114 | ||||
-rw-r--r-- | ydb/core/tx/schemeshard/schemeshard__operation_create_subdomain.cpp | 2 | ||||
-rw-r--r-- | ydb/services/datastreams/datastreams_ut.cpp | 1 |
6 files changed, 116 insertions, 9 deletions
diff --git a/ydb/core/persqueue/read_balancer.cpp b/ydb/core/persqueue/read_balancer.cpp index f692d87b4e..2491ddc335 100644 --- a/ydb/core/persqueue/read_balancer.cpp +++ b/ydb/core/persqueue/read_balancer.cpp @@ -615,7 +615,7 @@ void TPersQueueReadBalancer::Handle(TEvPersQueue::TEvUpdateBalancerConfig::TPtr Execute(new TTxWrite(this, std::move(deletedPartitions), std::move(newPartitions), std::move(newTablets), std::move(newGroups), std::move(reallocatedTablets)), ctx); - if (WatchingSubDomainPathId && SubDomainPathId && *WatchingSubDomainPathId != *SubDomainPathId) { + if (SubDomainPathId && (!WatchingSubDomainPathId || *WatchingSubDomainPathId != *SubDomainPathId)) { StartWatchingSubDomainPathId(); } } diff --git a/ydb/core/persqueue/type_codecs_defs.h b/ydb/core/persqueue/type_codecs_defs.h index 90c55d4b89..6b35c2b23b 100644 --- a/ydb/core/persqueue/type_codecs_defs.h +++ b/ydb/core/persqueue/type_codecs_defs.h @@ -134,8 +134,8 @@ protected: virtual void DoAddNull() = 0; protected: - ui32 ValuesConsumed; - size_t ConsumedSize; + ui32 ValuesConsumed = 0; + size_t ConsumedSize = 0; }; /***************************************************************************//** diff --git a/ydb/core/persqueue/type_coders.h b/ydb/core/persqueue/type_coders.h index 26c2566d32..32569eba8c 100644 --- a/ydb/core/persqueue/type_coders.h +++ b/ydb/core/persqueue/type_coders.h @@ -151,7 +151,7 @@ public: inline size_t Save(TFlatBlobDataOutputStream* output, TType value) { const auto outValue = static_cast<i64>(value); // TODO: fix out_long(i32) - char varIntOut[sizeof(outValue) + 1]; + char varIntOut[sizeof(outValue) + 1] {}; auto bytes = out_long(outValue, varIntOut); output->Write(varIntOut, bytes); return bytes; diff --git a/ydb/core/persqueue/ut/partition_ut.cpp b/ydb/core/persqueue/ut/partition_ut.cpp index dca1599b8d..ddb894bf81 100644 --- a/ydb/core/persqueue/ut/partition_ut.cpp +++ b/ydb/core/persqueue/ut/partition_ut.cpp @@ -205,7 +205,7 @@ protected: void SendSubDomainStatus(bool subDomainOutOfSpace = false); void SendReserveBytes(const ui64 cookie, const ui32 size, const TString& ownerCookie, const ui64 messageNo, bool lastRequest = false); void SendChangeOwner(const ui64 cookie, const TString& owner, const TActorId& pipeClient, const bool force = true); - void SendWrite(const ui64 cookie, const ui64 messageNo, const TString& ownerCookie, const TMaybe<ui64> offset, const TString& data); + void SendWrite(const ui64 cookie, const ui64 messageNo, const TString& ownerCookie, const TMaybe<ui64> offset, const TString& data, bool ignoreQuotaDeadline = false); TMaybe<TTestContext> Ctx; TMaybe<TFinalizer> Finalizer; @@ -480,7 +480,7 @@ void TPartitionFixture::SendReserveBytes(const ui64 cookie, const ui32 size, con Ctx->Runtime->SingleSys()->Send(new IEventHandleFat(ActorId, Ctx->Edge, event.Release())); } -void TPartitionFixture::SendWrite(const ui64 cookie, const ui64 messageNo, const TString& ownerCookie, const TMaybe<ui64> offset, const TString& data) +void TPartitionFixture::SendWrite(const ui64 cookie, const ui64 messageNo, const TString& ownerCookie, const TMaybe<ui64> offset, const TString& data, bool ignoreQuotaDeadline) { TEvPQ::TEvWrite::TMsg msg; msg.SourceId = "SourceId"; @@ -496,7 +496,7 @@ void TPartitionFixture::SendWrite(const ui64 cookie, const ui64 messageNo, const msg.PartitionKey = "PartitionKey"; msg.ExplicitHashKey = "ExplicitHashKey"; msg.External = false; - msg.IgnoreQuotaDeadline = false; + msg.IgnoreQuotaDeadline = ignoreQuotaDeadline; TVector<TEvPQ::TEvWrite::TMsg> msgs; msgs.push_back(msg); @@ -1364,6 +1364,57 @@ Y_UNIT_TEST_F(ReserveSubDomainOutOfSpace, TPartitionFixture) Y_UNIT_TEST_F(WriteSubDomainOutOfSpace, TPartitionFixture) { Ctx->Runtime->GetAppData().FeatureFlags.SetEnableTopicDiskSubDomainQuota(true); + Ctx->Runtime->GetAppData().PQConfig.MutableQuotingConfig()->SetQuotaWaitDurationMs(300); + + CreatePartition({ + .Partition=1, + .Begin=0, .End=10, + // + // partition configuration + // + .Config={.Version=1, .Consumers={{.Consumer="client-1", .Offset=3}}} + }, + // + // tablet configuration + // + {.Version=2, .Consumers={{.Consumer="client-1"}}}); + + SendSubDomainStatus(true); + + ui64 cookie = 1; + ui64 messageNo = 0; + + SendChangeOwner(cookie, "owner1", Ctx->Edge, true); + auto ownerEvent = Ctx->Runtime->GrabEdgeEvent<TEvPQ::TEvProxyResponse>(TDuration::Seconds(1)); + UNIT_ASSERT(ownerEvent != nullptr); + auto ownerCookie = ownerEvent->Response.GetPartitionResponse().GetCmdGetOwnershipResult().GetOwnerCookie(); + + TAutoPtr<IEventHandle> handle; + std::function<bool(const TEvPQ::TEvError&)> truth = [&](const TEvPQ::TEvError& e) { return cookie == e.Cookie; }; + + TString data = "data for write"; + + // First message will be processed because used storage 0 and limit 0. That is, the limit is not exceeded. + SendWrite(++cookie, messageNo, ownerCookie, (messageNo + 1) * 100, data); + messageNo++; + + SendDiskStatusResponse(); + + // Second message will not be processed because the limit is exceeded. + SendWrite(++cookie, messageNo, ownerCookie, (messageNo + 1) * 100, data); + messageNo++; + + SendDiskStatusResponse(); + auto event = Ctx->Runtime->GrabEdgeEventIf<TEvPQ::TEvError>(handle, truth, TDuration::Seconds(1)); + UNIT_ASSERT(event != nullptr); + UNIT_ASSERT_EQUAL(NPersQueue::NErrorCode::OVERLOAD, event->ErrorCode); +} + +Y_UNIT_TEST_F(WriteSubDomainOutOfSpace_DisableExpiration, TPartitionFixture) +{ + Ctx->Runtime->GetAppData().FeatureFlags.SetEnableTopicDiskSubDomainQuota(true); + // disable write request expiration while thes wait quota + Ctx->Runtime->GetAppData().PQConfig.MutableQuotingConfig()->SetQuotaWaitDurationMs(0); CreatePartition({ .Partition=1, @@ -1413,6 +1464,63 @@ Y_UNIT_TEST_F(WriteSubDomainOutOfSpace, TPartitionFixture) event = Ctx->Runtime->GrabEdgeEventIf<TEvPQ::TEvProxyResponse>(handle, truth, TDuration::Seconds(1)); UNIT_ASSERT(event != nullptr); + UNIT_ASSERT_EQUAL(NMsgBusProxy::MSTATUS_OK, event->Response.GetStatus()); +} + +Y_UNIT_TEST_F(WriteSubDomainOutOfSpace_IgnoreQuotaDeadline, TPartitionFixture) +{ + Ctx->Runtime->GetAppData().FeatureFlags.SetEnableTopicDiskSubDomainQuota(true); + Ctx->Runtime->GetAppData().PQConfig.MutableQuotingConfig()->SetQuotaWaitDurationMs(300); + + CreatePartition({ + .Partition=1, + .Begin=0, .End=10, + // + // partition configuration + // + .Config={.Version=1, .Consumers={{.Consumer="client-1", .Offset=3}}} + }, + // + // tablet configuration + // + {.Version=2, .Consumers={{.Consumer="client-1"}}}); + + SendSubDomainStatus(true); + + ui64 cookie = 1; + ui64 messageNo = 0; + + SendChangeOwner(cookie, "owner1", Ctx->Edge, true); + auto ownerEvent = Ctx->Runtime->GrabEdgeEvent<TEvPQ::TEvProxyResponse>(TDuration::Seconds(1)); + UNIT_ASSERT(ownerEvent != nullptr); + auto ownerCookie = ownerEvent->Response.GetPartitionResponse().GetCmdGetOwnershipResult().GetOwnerCookie(); + + TAutoPtr<IEventHandle> handle; + std::function<bool(const TEvPQ::TEvProxyResponse&)> truth = [&](const TEvPQ::TEvProxyResponse& e) { return cookie == e.Cookie; }; + + TString data = "data for write"; + + // First message will be processed because used storage 0 and limit 0. That is, the limit is not exceeded. + SendWrite(++cookie, messageNo, ownerCookie, (messageNo + 1) * 100, data, true); + messageNo++; + + SendDiskStatusResponse(); + + // Second message will not be processed because the limit is exceeded. + SendWrite(++cookie, messageNo, ownerCookie, (messageNo + 1) * 100, data, true); + messageNo++; + + SendDiskStatusResponse(); + auto event = Ctx->Runtime->GrabEdgeEventIf<TEvPQ::TEvProxyResponse>(handle, truth, TDuration::Seconds(1)); + UNIT_ASSERT(event == nullptr); + + // SudDomain quota available - second message will be processed.. + SendSubDomainStatus(false); + SendDiskStatusResponse(); + + event = Ctx->Runtime->GrabEdgeEventIf<TEvPQ::TEvProxyResponse>(handle, truth, TDuration::Seconds(1)); + UNIT_ASSERT(event != nullptr); + UNIT_ASSERT_EQUAL(NMsgBusProxy::MSTATUS_OK, event->Response.GetStatus()); } } diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_create_subdomain.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_create_subdomain.cpp index 0cc2f9cb8b..f5a58df674 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_create_subdomain.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_create_subdomain.cpp @@ -86,7 +86,7 @@ public: LOG_NOTICE_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "TCreateSubDomain Propose" - << ", path" << parentPathStr << "/" << name + << ", path: " << parentPathStr << "/" << name << ", opId: " << OperationId << ", at schemeshard: " << ssId); diff --git a/ydb/services/datastreams/datastreams_ut.cpp b/ydb/services/datastreams/datastreams_ut.cpp index c7899e0750..a13cec91c5 100644 --- a/ydb/services/datastreams/datastreams_ut.cpp +++ b/ydb/services/datastreams/datastreams_ut.cpp @@ -79,7 +79,6 @@ public: limit->SetMinStorageMegabytes(50_KB); limit->SetMaxStorageMegabytes(1_MB); - MeteringFile = MakeHolder<TTempFileHandle>(); appConfig.MutableMeteringConfig()->SetMeteringFilePath(MeteringFile->Name()); |