aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authortesseract <tesseract@yandex-team.com>2023-03-24 17:07:36 +0300
committertesseract <tesseract@yandex-team.com>2023-03-24 17:07:36 +0300
commit833d012d60f469736200a888fc73dda823a9be4b (patch)
tree43189964556d15bc601c4a647aed37255ba166f4
parent592893d573feb609a369e392a16db268c39edb80 (diff)
downloadydb-833d012d60f469736200a888fc73dda823a9be4b.tar.gz
Написать тест на datastreams put-records запрос
1) --В SchemeShard-е не учитывалась StateVersion , из-за которой не обрабатывалось изменение DiskQuotaExceeded для корневого SubDomain. ИМХО, это неожиданное поведение.-- Договорились. что неочевидное поведение является незадокументированной фичей, и не надо чинить ничего для корневого субдомена. Переписал тесты. 2) В SchemeShard-е поправлено логирование - слияне слова path и его значения 3) В ReadBalancer уточнена логика для запуска и перезапуска отслеживания изменений SubDomen-а при обновлении конфигурации ReadBalancer-а 4) Добавлены тесты на запись в топик при DiskQuotaExceeded=true
-rw-r--r--ydb/core/persqueue/read_balancer.cpp2
-rw-r--r--ydb/core/persqueue/type_codecs_defs.h4
-rw-r--r--ydb/core/persqueue/type_coders.h2
-rw-r--r--ydb/core/persqueue/ut/partition_ut.cpp114
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__operation_create_subdomain.cpp2
-rw-r--r--ydb/services/datastreams/datastreams_ut.cpp1
6 files changed, 116 insertions, 9 deletions
diff --git a/ydb/core/persqueue/read_balancer.cpp b/ydb/core/persqueue/read_balancer.cpp
index f692d87b4e..2491ddc335 100644
--- a/ydb/core/persqueue/read_balancer.cpp
+++ b/ydb/core/persqueue/read_balancer.cpp
@@ -615,7 +615,7 @@ void TPersQueueReadBalancer::Handle(TEvPersQueue::TEvUpdateBalancerConfig::TPtr
Execute(new TTxWrite(this, std::move(deletedPartitions), std::move(newPartitions), std::move(newTablets), std::move(newGroups), std::move(reallocatedTablets)), ctx);
- if (WatchingSubDomainPathId && SubDomainPathId && *WatchingSubDomainPathId != *SubDomainPathId) {
+ if (SubDomainPathId && (!WatchingSubDomainPathId || *WatchingSubDomainPathId != *SubDomainPathId)) {
StartWatchingSubDomainPathId();
}
}
diff --git a/ydb/core/persqueue/type_codecs_defs.h b/ydb/core/persqueue/type_codecs_defs.h
index 90c55d4b89..6b35c2b23b 100644
--- a/ydb/core/persqueue/type_codecs_defs.h
+++ b/ydb/core/persqueue/type_codecs_defs.h
@@ -134,8 +134,8 @@ protected:
virtual void DoAddNull() = 0;
protected:
- ui32 ValuesConsumed;
- size_t ConsumedSize;
+ ui32 ValuesConsumed = 0;
+ size_t ConsumedSize = 0;
};
/***************************************************************************//**
diff --git a/ydb/core/persqueue/type_coders.h b/ydb/core/persqueue/type_coders.h
index 26c2566d32..32569eba8c 100644
--- a/ydb/core/persqueue/type_coders.h
+++ b/ydb/core/persqueue/type_coders.h
@@ -151,7 +151,7 @@ public:
inline size_t Save(TFlatBlobDataOutputStream* output, TType value) {
const auto outValue = static_cast<i64>(value); // TODO: fix out_long(i32)
- char varIntOut[sizeof(outValue) + 1];
+ char varIntOut[sizeof(outValue) + 1] {};
auto bytes = out_long(outValue, varIntOut);
output->Write(varIntOut, bytes);
return bytes;
diff --git a/ydb/core/persqueue/ut/partition_ut.cpp b/ydb/core/persqueue/ut/partition_ut.cpp
index dca1599b8d..ddb894bf81 100644
--- a/ydb/core/persqueue/ut/partition_ut.cpp
+++ b/ydb/core/persqueue/ut/partition_ut.cpp
@@ -205,7 +205,7 @@ protected:
void SendSubDomainStatus(bool subDomainOutOfSpace = false);
void SendReserveBytes(const ui64 cookie, const ui32 size, const TString& ownerCookie, const ui64 messageNo, bool lastRequest = false);
void SendChangeOwner(const ui64 cookie, const TString& owner, const TActorId& pipeClient, const bool force = true);
- void SendWrite(const ui64 cookie, const ui64 messageNo, const TString& ownerCookie, const TMaybe<ui64> offset, const TString& data);
+ void SendWrite(const ui64 cookie, const ui64 messageNo, const TString& ownerCookie, const TMaybe<ui64> offset, const TString& data, bool ignoreQuotaDeadline = false);
TMaybe<TTestContext> Ctx;
TMaybe<TFinalizer> Finalizer;
@@ -480,7 +480,7 @@ void TPartitionFixture::SendReserveBytes(const ui64 cookie, const ui32 size, con
Ctx->Runtime->SingleSys()->Send(new IEventHandleFat(ActorId, Ctx->Edge, event.Release()));
}
-void TPartitionFixture::SendWrite(const ui64 cookie, const ui64 messageNo, const TString& ownerCookie, const TMaybe<ui64> offset, const TString& data)
+void TPartitionFixture::SendWrite(const ui64 cookie, const ui64 messageNo, const TString& ownerCookie, const TMaybe<ui64> offset, const TString& data, bool ignoreQuotaDeadline)
{
TEvPQ::TEvWrite::TMsg msg;
msg.SourceId = "SourceId";
@@ -496,7 +496,7 @@ void TPartitionFixture::SendWrite(const ui64 cookie, const ui64 messageNo, const
msg.PartitionKey = "PartitionKey";
msg.ExplicitHashKey = "ExplicitHashKey";
msg.External = false;
- msg.IgnoreQuotaDeadline = false;
+ msg.IgnoreQuotaDeadline = ignoreQuotaDeadline;
TVector<TEvPQ::TEvWrite::TMsg> msgs;
msgs.push_back(msg);
@@ -1364,6 +1364,57 @@ Y_UNIT_TEST_F(ReserveSubDomainOutOfSpace, TPartitionFixture)
Y_UNIT_TEST_F(WriteSubDomainOutOfSpace, TPartitionFixture)
{
Ctx->Runtime->GetAppData().FeatureFlags.SetEnableTopicDiskSubDomainQuota(true);
+ Ctx->Runtime->GetAppData().PQConfig.MutableQuotingConfig()->SetQuotaWaitDurationMs(300);
+
+ CreatePartition({
+ .Partition=1,
+ .Begin=0, .End=10,
+ //
+ // partition configuration
+ //
+ .Config={.Version=1, .Consumers={{.Consumer="client-1", .Offset=3}}}
+ },
+ //
+ // tablet configuration
+ //
+ {.Version=2, .Consumers={{.Consumer="client-1"}}});
+
+ SendSubDomainStatus(true);
+
+ ui64 cookie = 1;
+ ui64 messageNo = 0;
+
+ SendChangeOwner(cookie, "owner1", Ctx->Edge, true);
+ auto ownerEvent = Ctx->Runtime->GrabEdgeEvent<TEvPQ::TEvProxyResponse>(TDuration::Seconds(1));
+ UNIT_ASSERT(ownerEvent != nullptr);
+ auto ownerCookie = ownerEvent->Response.GetPartitionResponse().GetCmdGetOwnershipResult().GetOwnerCookie();
+
+ TAutoPtr<IEventHandle> handle;
+ std::function<bool(const TEvPQ::TEvError&)> truth = [&](const TEvPQ::TEvError& e) { return cookie == e.Cookie; };
+
+ TString data = "data for write";
+
+ // First message will be processed because used storage 0 and limit 0. That is, the limit is not exceeded.
+ SendWrite(++cookie, messageNo, ownerCookie, (messageNo + 1) * 100, data);
+ messageNo++;
+
+ SendDiskStatusResponse();
+
+ // Second message will not be processed because the limit is exceeded.
+ SendWrite(++cookie, messageNo, ownerCookie, (messageNo + 1) * 100, data);
+ messageNo++;
+
+ SendDiskStatusResponse();
+ auto event = Ctx->Runtime->GrabEdgeEventIf<TEvPQ::TEvError>(handle, truth, TDuration::Seconds(1));
+ UNIT_ASSERT(event != nullptr);
+ UNIT_ASSERT_EQUAL(NPersQueue::NErrorCode::OVERLOAD, event->ErrorCode);
+}
+
+Y_UNIT_TEST_F(WriteSubDomainOutOfSpace_DisableExpiration, TPartitionFixture)
+{
+ Ctx->Runtime->GetAppData().FeatureFlags.SetEnableTopicDiskSubDomainQuota(true);
+ // disable write request expiration while thes wait quota
+ Ctx->Runtime->GetAppData().PQConfig.MutableQuotingConfig()->SetQuotaWaitDurationMs(0);
CreatePartition({
.Partition=1,
@@ -1413,6 +1464,63 @@ Y_UNIT_TEST_F(WriteSubDomainOutOfSpace, TPartitionFixture)
event = Ctx->Runtime->GrabEdgeEventIf<TEvPQ::TEvProxyResponse>(handle, truth, TDuration::Seconds(1));
UNIT_ASSERT(event != nullptr);
+ UNIT_ASSERT_EQUAL(NMsgBusProxy::MSTATUS_OK, event->Response.GetStatus());
+}
+
+Y_UNIT_TEST_F(WriteSubDomainOutOfSpace_IgnoreQuotaDeadline, TPartitionFixture)
+{
+ Ctx->Runtime->GetAppData().FeatureFlags.SetEnableTopicDiskSubDomainQuota(true);
+ Ctx->Runtime->GetAppData().PQConfig.MutableQuotingConfig()->SetQuotaWaitDurationMs(300);
+
+ CreatePartition({
+ .Partition=1,
+ .Begin=0, .End=10,
+ //
+ // partition configuration
+ //
+ .Config={.Version=1, .Consumers={{.Consumer="client-1", .Offset=3}}}
+ },
+ //
+ // tablet configuration
+ //
+ {.Version=2, .Consumers={{.Consumer="client-1"}}});
+
+ SendSubDomainStatus(true);
+
+ ui64 cookie = 1;
+ ui64 messageNo = 0;
+
+ SendChangeOwner(cookie, "owner1", Ctx->Edge, true);
+ auto ownerEvent = Ctx->Runtime->GrabEdgeEvent<TEvPQ::TEvProxyResponse>(TDuration::Seconds(1));
+ UNIT_ASSERT(ownerEvent != nullptr);
+ auto ownerCookie = ownerEvent->Response.GetPartitionResponse().GetCmdGetOwnershipResult().GetOwnerCookie();
+
+ TAutoPtr<IEventHandle> handle;
+ std::function<bool(const TEvPQ::TEvProxyResponse&)> truth = [&](const TEvPQ::TEvProxyResponse& e) { return cookie == e.Cookie; };
+
+ TString data = "data for write";
+
+ // First message will be processed because used storage 0 and limit 0. That is, the limit is not exceeded.
+ SendWrite(++cookie, messageNo, ownerCookie, (messageNo + 1) * 100, data, true);
+ messageNo++;
+
+ SendDiskStatusResponse();
+
+ // Second message will not be processed because the limit is exceeded.
+ SendWrite(++cookie, messageNo, ownerCookie, (messageNo + 1) * 100, data, true);
+ messageNo++;
+
+ SendDiskStatusResponse();
+ auto event = Ctx->Runtime->GrabEdgeEventIf<TEvPQ::TEvProxyResponse>(handle, truth, TDuration::Seconds(1));
+ UNIT_ASSERT(event == nullptr);
+
+ // SudDomain quota available - second message will be processed..
+ SendSubDomainStatus(false);
+ SendDiskStatusResponse();
+
+ event = Ctx->Runtime->GrabEdgeEventIf<TEvPQ::TEvProxyResponse>(handle, truth, TDuration::Seconds(1));
+ UNIT_ASSERT(event != nullptr);
+ UNIT_ASSERT_EQUAL(NMsgBusProxy::MSTATUS_OK, event->Response.GetStatus());
}
}
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_create_subdomain.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_create_subdomain.cpp
index 0cc2f9cb8b..f5a58df674 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation_create_subdomain.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__operation_create_subdomain.cpp
@@ -86,7 +86,7 @@ public:
LOG_NOTICE_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
"TCreateSubDomain Propose"
- << ", path" << parentPathStr << "/" << name
+ << ", path: " << parentPathStr << "/" << name
<< ", opId: " << OperationId
<< ", at schemeshard: " << ssId);
diff --git a/ydb/services/datastreams/datastreams_ut.cpp b/ydb/services/datastreams/datastreams_ut.cpp
index c7899e0750..a13cec91c5 100644
--- a/ydb/services/datastreams/datastreams_ut.cpp
+++ b/ydb/services/datastreams/datastreams_ut.cpp
@@ -79,7 +79,6 @@ public:
limit->SetMinStorageMegabytes(50_KB);
limit->SetMaxStorageMegabytes(1_MB);
-
MeteringFile = MakeHolder<TTempFileHandle>();
appConfig.MutableMeteringConfig()->SetMeteringFilePath(MeteringFile->Name());