author | mokhotskii <mokhotskii@yandex-team.ru> | 2022-05-26 16:22:26 +0300
committer | mokhotskii <mokhotskii@yandex-team.ru> | 2022-05-26 16:22:26 +0300
commit | 79dae787b59bbf5e3c408af595165012389ffba9 (patch)
tree | b88c465112adc489544c6c15e385e222df589df0
parent | a7b08e6b8df6e316f6630f8d8ddbf7c99f28bcdf (diff)
download | ydb-79dae787b59bbf5e3c408af595165012389ffba9.tar.gz
KIKIMR-13251 Restrict internal_read counters for no consumer case
Restrict the internal_read counters for the no-consumer case.
Add an http_proxy test with reference counters' JSON files.
ref:14c4a707fb2a91f4171d1ac54c7e30f9331656cc
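
The gist of the change: when a read comes in without a real consumer, the partition uses the reserved `$without_consumer` client id, and per-consumer `stream.internal_read.*` counters should not be registered for it. The sketch below is a minimal, hypothetical illustration of that gating (simplified stand-in types, not the actual YDB classes); in the diff itself the same idea appears as the new `DoInternalRead` flag in `TUserInfo`.

```cpp
#include <iostream>
#include <string>

// Reserved client id used when a read has no real consumer (name taken from the
// diff below); everything else here is a simplified, hypothetical stand-in.
static const std::string CLIENTID_WITHOUT_CONSUMER = "$without_consumer";

struct TUserInfoSketch {
    std::string User;
    bool DoInternalRead = false;  // stands in for the flag that replaced DoExternalRead

    explicit TUserInfoSketch(const std::string& user)
        : User(user)
        // A consumer-less read session must not produce per-consumer counters.
        , DoInternalRead(user != CLIENTID_WITHOUT_CONSUMER)
    {}

    void SetupStreamCounters() const {
        if (!DoInternalRead) {
            return;  // no consumer -> skip stream.internal_read.* registration
        }
        std::cout << "register stream.internal_read.* for consumer '" << User << "'\n";
    }
};

int main() {
    TUserInfoSketch{"some_consumer"}.SetupStreamCounters();           // counters registered
    TUserInfoSketch{CLIENTID_WITHOUT_CONSUMER}.SetupStreamCounters(); // skipped
}
```

The http_proxy test mentioned above presumably pins the expected counter set via those reference JSON files, so a regression that reintroduces consumer-less counters would surface there.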
-rw-r--r-- | ydb/core/http_proxy/http_req.cpp | 127
-rw-r--r-- | ydb/core/persqueue/partition.cpp | 7
-rw-r--r-- | ydb/core/persqueue/user_info.h | 58
-rw-r--r-- | ydb/core/tx/datashard/datashard_ut_change_exchange.cpp | 2
-rw-r--r-- | ydb/library/persqueue/tests/counters.h | 2
-rw-r--r-- | ydb/library/persqueue/topic_parser/counters.cpp | 2
-rw-r--r-- | ydb/services/datastreams/datastreams_proxy.cpp | 2
-rw-r--r-- | ydb/services/ydb/ydb_common_ut.h | 7
8 files changed, 99 insertions, 108 deletions
diff --git a/ydb/core/http_proxy/http_req.cpp b/ydb/core/http_proxy/http_req.cpp
index ea0b2f7aa2..5c5569980f 100644
--- a/ydb/core/http_proxy/http_req.cpp
+++ b/ydb/core/http_proxy/http_req.cpp
@@ -618,12 +618,12 @@ BuildLabels("", httpContext, "stream.put_records.failed_records_per_second")
 
     STFUNC(StateWork) {
         switch (ev->GetTypeRewrite()) {
-            HFunc(TEvServerlessProxy::TEvGrpcRequestResult, HandleGrpcResponse);
-            HFunc(TEvServerlessProxy::TEvDiscoverDatabaseEndpointResult, Handle);
             HFunc(TEvents::TEvWakeup, HandleTimeout);
-            HFunc(TEvServerlessProxy::TEvToken, HandleToken);
-            HFunc(TEvServerlessProxy::TEvError, HandleError);
             HFunc(TEvServerlessProxy::TEvClientReady, HandleClientReady);
+            HFunc(TEvServerlessProxy::TEvDiscoverDatabaseEndpointResult, Handle);
+            HFunc(TEvServerlessProxy::TEvError, HandleError);
+            HFunc(TEvServerlessProxy::TEvGrpcRequestResult, HandleGrpcResponse);
+            HFunc(TEvServerlessProxy::TEvToken, HandleToken);
             default:
                 HandleUnexpectedEvent(ev, ctx);
                 break;
@@ -654,23 +654,49 @@ BuildLabels("", httpContext, "stream.put_records.failed_records_per_second")
             .AuthToken(HttpContext.IamToken)
             .DiscoveryMode(NYdb::EDiscoveryMode::Async);
 
-        if (!HttpContext.DatabaseName.empty()) {
-            if (!HttpContext.ServiceConfig.GetTestMode()) {
-                clientSettings.Database(HttpContext.DatabaseName);
-            }
+        if (!HttpContext.DatabaseName.empty() && !HttpContext.ServiceConfig.GetTestMode()) {
+            clientSettings.Database(HttpContext.DatabaseName);
         }
         Y_VERIFY(!Client);
         Client.Reset(new TDataStreamsClient(*HttpContext.Driver, clientSettings));
         DiscoveryFuture = MakeHolder<NThreading::TFuture<void>>(Client->DiscoveryCompleted());
-        DiscoveryFuture->Subscribe([actorId = ctx.SelfID, actorSystem = ctx.ActorSystem()](const NThreading::TFuture<void>&) {
-            actorSystem->Send(actorId, new TEvServerlessProxy::TEvClientReady());
-        });
+        DiscoveryFuture->Subscribe(
+            [actorId = ctx.SelfID, actorSystem = ctx.ActorSystem()] (const NThreading::TFuture<void>&) {
+                actorSystem->Send(actorId, new TEvServerlessProxy::TEvClientReady());
+            });
     }
 
     void HandleClientReady(TEvServerlessProxy::TEvClientReady::TPtr&, const TActorContext& ctx){
-        SendGrpcRequest(ctx);
+        HttpContext.Driver ? SendGrpcRequest(ctx) : SendGrpcRequestNoDriver(ctx);
     }
 
+    void SendGrpcRequestNoDriver(const TActorContext& ctx) {
+        RequestState = StateGrpcRequest;
+        LOG_SP_INFO_S(ctx, NKikimrServices::HTTP_PROXY,
+                      "sending grpc request to '" << HttpContext.DiscoveryEndpoint <<
+                      "' database: '" << HttpContext.DatabaseName <<
+                      "' iam token size: " << HttpContext.IamToken.size());
+
+        RpcFuture = NRpcService::DoLocalRpc<TRpcEv>(std::move(Request), HttpContext.DatabaseName,
+                                                    HttpContext.SerializedUserToken, ctx.ActorSystem());
+        RpcFuture.Subscribe([actorId = ctx.SelfID, actorSystem = ctx.ActorSystem()]
+                            (const NThreading::TFuture<TProtoResponse>& future) {
+            auto& response = future.GetValueSync();
+            auto result = MakeHolder<TEvServerlessProxy::TEvGrpcRequestResult>();
+            Y_VERIFY(response.operation().ready());
+            if (response.operation().status() == Ydb::StatusIds::SUCCESS) {
+                TProtoResult rs;
+                response.operation().result().UnpackTo(&rs);
+                result->Message = MakeHolder<TProtoResult>(rs);
+            }
+            NYql::TIssues issues;
+            NYql::IssuesFromMessage(response.operation().issues(), issues);
+            result->Status = MakeHolder<NYdb::TStatus>(NYdb::EStatus(response.operation().status()),
+                                                       std::move(issues));
+            actorSystem->Send(actorId, result.Release());
+        });
+        return;
+    }
 
     void SendGrpcRequest(const TActorContext& ctx) {
         RequestState = StateGrpcRequest;
@@ -678,42 +704,29 @@ BuildLabels("", httpContext, "stream.put_records.failed_records_per_second")
                       "sending grpc request to '" << HttpContext.DiscoveryEndpoint <<
                       "' database: '" << HttpContext.DatabaseName <<
                       "' iam token size: " << HttpContext.IamToken.size());
-        if (!HttpContext.Driver) {
-            RpcFuture = NRpcService::DoLocalRpc<TRpcEv>(std::move(Request), HttpContext.DatabaseName, HttpContext.SerializedUserToken, ctx.ActorSystem());
-            RpcFuture.Subscribe([actorId = ctx.SelfID, actorSystem = ctx.ActorSystem()](const NThreading::TFuture<TProtoResponse>& future) {
-                auto& response = future.GetValueSync();
-                auto result = MakeHolder<TEvServerlessProxy::TEvGrpcRequestResult>();
-                Y_VERIFY(response.operation().ready());
-                if (response.operation().status() == Ydb::StatusIds::SUCCESS) {
-                    TProtoResult rs;
-                    response.operation().result().UnpackTo(&rs);
-                    result->Message = MakeHolder<TProtoResult>(rs);
-                }
-                NYql::TIssues issues;
-                NYql::IssuesFromMessage(response.operation().issues(), issues);
-                result->Status = MakeHolder<NYdb::TStatus>(NYdb::EStatus(response.operation().status()), std::move(issues));
-                actorSystem->Send(actorId, result.Release());
-            });
-            return;
-        }
+
         Y_VERIFY(Client);
         Y_VERIFY(DiscoveryFuture->HasValue());
 
         TProtoResponse response;
 
-        LOG_SP_DEBUG_S(ctx, NKikimrServices::HTTP_PROXY, "sending grpc request " << Request.DebugString());
-
-        Future = MakeHolder<NThreading::TFuture<TProtoResultWrapper<TProtoResult>>>(Client->template DoProtoRequest<TProtoRequest, TProtoResponse, TProtoResult, TProtoCall>(std::move(Request), ProtoCall));
-        Future->Subscribe([actorId = ctx.SelfID, actorSystem = ctx.ActorSystem()](const NThreading::TFuture<TProtoResultWrapper<TProtoResult>>& future) {
-            auto& response = future.GetValueSync();
-            auto result = MakeHolder<TEvServerlessProxy::TEvGrpcRequestResult>();
-            if (response.IsSuccess()) {
-                result->Message = MakeHolder<TProtoResult>(response.GetResult());
-            }
-            result->Status = MakeHolder<NYdb::TStatus>(response);
-            actorSystem->Send(actorId, result.Release());
-        });
+        LOG_SP_DEBUG_S(ctx, NKikimrServices::HTTP_PROXY,
+                       "sending grpc request " << Request.DebugString());
+        Future = MakeHolder<NThreading::TFuture<TProtoResultWrapper<TProtoResult>>>(
+            Client->template DoProtoRequest<TProtoRequest, TProtoResponse, TProtoResult,
+                                            TProtoCall>(std::move(Request), ProtoCall));
+        Future->Subscribe(
+            [actorId = ctx.SelfID, actorSystem = ctx.ActorSystem()]
+            (const NThreading::TFuture<TProtoResultWrapper<TProtoResult>>& future) {
+                auto& response = future.GetValueSync();
+                auto result = MakeHolder<TEvServerlessProxy::TEvGrpcRequestResult>();
+                if (response.IsSuccess()) {
+                    result->Message = MakeHolder<TProtoResult>(response.GetResult());
+                }
+                result->Status = MakeHolder<NYdb::TStatus>(response);
+                actorSystem->Send(actorId, result.Release());
+            });
     }
 
     void HandleUnexpectedEvent(const TAutoPtr<NActors::IEventHandle>& ev, const TActorContext& ctx) {
@@ -728,7 +741,7 @@ BuildLabels("", httpContext, "stream.put_records.failed_records_per_second")
         if (HttpContext.Driver) {
             SendYdbDriverRequest(ctx);
         } else {
-            SendGrpcRequest(ctx);
+            SendGrpcRequestNoDriver(ctx);
         }
     }
 
@@ -758,7 +771,8 @@ BuildLabels("", httpContext, "stream.put_records.failed_records_per_second")
         TBase::Die(ctx);
     }
 
-    void Handle(TEvServerlessProxy::TEvDiscoverDatabaseEndpointResult::TPtr ev, const TActorContext& ctx) {
+    void Handle(TEvServerlessProxy::TEvDiscoverDatabaseEndpointResult::TPtr ev,
+                const TActorContext& ctx) {
         if (ev->Get()->DatabaseInfo) {
             auto& db = ev->Get()->DatabaseInfo;
             HttpContext.FolderId = db->FolderId;
@@ -775,8 +789,10 @@ BuildLabels("", httpContext, "stream.put_records.failed_records_per_second")
         }
 
         FillInputCustomMetrics<TProtoRequest>(Request, HttpContext, ctx);
-        ctx.Send(MakeMetricsServiceID(), new TEvServerlessProxy::TEvCounter{1, true, true, BuildLabels(Method, HttpContext, "api.http.requests_per_second")
-        });
+        ctx.Send(MakeMetricsServiceID(),
+                 new TEvServerlessProxy::TEvCounter{1, true, true,
+                     BuildLabels(Method, HttpContext, "api.http.requests_per_second")
+                 });
         CreateClient(ctx);
         return;
     }
@@ -786,7 +802,8 @@ BuildLabels("", httpContext, "stream.put_records.failed_records_per_second")
 
     void ReportLatencyCounters(const TActorContext& ctx) {
         TDuration dur = ctx.Now() - StartTime;
-        ctx.Send(MakeMetricsServiceID(), new TEvServerlessProxy::TEvHistCounter{static_cast<i64>(dur.MilliSeconds()), 1,
+        ctx.Send(MakeMetricsServiceID(),
+                 new TEvServerlessProxy::TEvHistCounter{static_cast<i64>(dur.MilliSeconds()), 1,
                      BuildLabels(Method, HttpContext, "api.http.requests_duration_milliseconds")
                  });
     }
@@ -797,7 +814,8 @@ BuildLabels("", httpContext, "stream.put_records.failed_records_per_second")
         // return http response;
         if (ev->Get()->Status->IsSuccess()) {
             ProtoToJson(*ev->Get()->Message, HttpContext.ResponseData.ResponseBody);
-            FillOutputCustomMetrics<TProtoResult>(*(dynamic_cast<TProtoResult*>(ev->Get()->Message.Get())), HttpContext, ctx);
+            FillOutputCustomMetrics<TProtoResult>(
+                *(dynamic_cast<TProtoResult*>(ev->Get()->Message.Get())), HttpContext, ctx);
             ReportLatencyCounters(ctx);
 
             ctx.Send(MakeMetricsServiceID(),
@@ -815,7 +833,7 @@ BuildLabels("", httpContext, "stream.put_records.failed_records_per_second")
         case ERetryErrorClass::LongRetry:
             RetryCounter.Click();
             if (RetryCounter.HasAttemps()) {
-                return SendGrpcRequest(ctx);
+                return HttpContext.Driver ? SendGrpcRequest(ctx) : SendGrpcRequestNoDriver(ctx);
             }
         case ERetryErrorClass::NoRetry: {
             TString errorText;
@@ -856,11 +874,14 @@ BuildLabels("", httpContext, "stream.put_records.failed_records_per_second")
                       "database '" << HttpContext.DatabaseName << "' " <<
                       "stream '" << ExtractStreamName<TProtoRequest>(Request) << "'");
 
-        if (HttpContext.IamToken.empty() || !HttpContext.Driver) { //use Signature or no sdk mode - then need to auth anyway
-            if (HttpContext.IamToken.empty() && !Signature) { //Test mode - no driver and no creds
-                SendGrpcRequest(ctx);
+        // Use Signature or no sdk mode - then need to auth anyway
+        if (HttpContext.IamToken.empty() || !HttpContext.Driver) {
+            // Test mode - no driver and no creds
+            if (HttpContext.IamToken.empty() && !Signature) {
+                SendGrpcRequestNoDriver(ctx);
             } else {
-                AuthActor = ctx.Register(AppData(ctx)->DataStreamsAuthFactory->CreateAuthActor(ctx.SelfID, HttpContext, std::move(Signature)));
+                AuthActor = ctx.Register(AppData(ctx)->DataStreamsAuthFactory->CreateAuthActor(
+                    ctx.SelfID, HttpContext, std::move(Signature)));
             }
         } else {
             SendYdbDriverRequest(ctx);
diff --git a/ydb/core/persqueue/partition.cpp b/ydb/core/persqueue/partition.cpp
index 728a8c7028..9b26dc3080 100644
--- a/ydb/core/persqueue/partition.cpp
+++ b/ydb/core/persqueue/partition.cpp
@@ -469,7 +469,7 @@ TPartition::TPartition(ui64 tabletId, ui32 partition, const TActorId& tablet, co
     , Tablet(tablet)
     , BlobCache(blobCache)
     , InitState(WaitDiskStatus)
-    , PartitionedBlob(partition, 0, 0, 0, 0, 0, Head, NewHead, true, false, 8 << 20)
+    , PartitionedBlob(partition, 0, 0, 0, 0, 0, Head, NewHead, true, false, 8_MB)
     , NewHeadKey{TKey{}, 0, TInstant::Zero(), 0}
     , BodySize(0)
     , MaxWriteResponsesSize(0)
@@ -843,9 +843,9 @@ void TPartition::SetupStreamCounters(const TActorContext& ctx) {
 
     WriteBufferIsFullCounter.SetCounter(
         NPersQueue::GetCountersForStream(counters),
-        {{"database", DbId},
-         {"cloud", CloudId},
+        {{"cloud", CloudId},
          {"folder", FolderId},
+         {"database", DbId},
          {"stream", TopicConverter->GetFederationPath()},
          {"host", DCId},
          {"shard", ToString<ui32>(Partition)}},
@@ -2963,6 +2963,7 @@ void TPartition::ReadTimestampForOffset(const TString& user, TUserInfo& userInfo
     THolder<TEvPQ::TEvRead> event = MakeHolder<TEvPQ::TEvRead>(0, userInfo.Offset, 0, 1, "",
                                                                user, 0, MAX_BLOB_PART_SIZE * 2, 0, 0, "", false);
+
     ctx.Send(ctx.SelfID, event.Release());
     Counters.Cumulative()[COUNTER_PQ_WRITE_TIMESTAMP_CACHE_MISS].Increment(1);
 }
diff --git a/ydb/core/persqueue/user_info.h b/ydb/core/persqueue/user_info.h
index 3325f6147f..214404ddaa 100644
--- a/ydb/core/persqueue/user_info.h
+++ b/ydb/core/persqueue/user_info.h
@@ -27,7 +27,7 @@ namespace NDeprecatedUserData {
 static const ui32 MAX_USER_TS_CACHE_SIZE = 10'000;
 static const ui64 MIN_TIMESTAMP_MS = 1'000'000'000'000ll; // around 2002 year
 
-static const TString CLIENTID_TO_READ_INTERNALLY = "$without_consumer";
+static const TString CLIENTID_WITHOUT_CONSUMER = "$without_consumer";
 
 typedef TProtobufTabletLabeledCounters<EClientLabeledCounters_descriptor> TUserLabeledCounters;
@@ -200,38 +200,12 @@ struct TUserInfo {
     NSlidingWindow::TSlidingWindow<NSlidingWindow::TMaxOperation<ui64>> WriteLagMs;
 
     std::shared_ptr<TPercentileCounter> ReadTimeLag;
-    bool DoExternalRead = false;
+    bool DoInternalRead = false;
 
     bool WriteInProgress = false;
 
     bool Parsed = false;
 
-    TUserInfo(THolder<TReadSpeedLimiterHolder> readSpeedLimiter, const TString& user,
-              const ui64 readRuleGeneration, bool important, const NPersQueue::TTopicConverterPtr& topicConverter,
-              const ui32 partition, bool doExternalRead,
-              ui64 burst = 1'000'000'000, ui64 speed = 1'000'000'000)
-        : ReadSpeedLimiter(std::move(readSpeedLimiter))
-        , Important(important)
-        , LabeledCounters(
-            topicConverter->IsFirstClass() ? nullptr : new TUserLabeledCounters(
-                user + "/" + (important ? "1" : "0") + "/" + topicConverter->GetClientsideName(), partition
-            )
-        )
-        , User(user)
-        , ReadRuleGeneration(readRuleGeneration)
-        , TopicConverter(topicConverter)
-        , ReadQuota(burst, speed, TAppData::TimeProvider->Now())
-        , Counter(nullptr)
-        , BytesRead()
-        , MsgsRead()
-        , ActiveReads(0)
-        , Subscriptions(0)
-        , EndOffset(0)
-        , WriteLagMs(TDuration::Minutes(1), 100)
-        , DoExternalRead(doExternalRead)
-    {
-    }
-
     void ForgetSubscription(const TInstant& now) {
         if (Subscriptions > 0)
             --Subscriptions;
@@ -333,10 +307,13 @@ struct TUserInfo {
         , AvgReadBytes{{TDuration::Seconds(1), 1000}, {TDuration::Minutes(1), 1000},
                        {TDuration::Hours(1), 2000}, {TDuration::Days(1), 2000}}
         , WriteLagMs(TDuration::Minutes(1), 100)
+        , DoInternalRead(user != CLIENTID_WITHOUT_CONSUMER)
     {
         if (AppData(ctx)->Counters) {
             if (AppData()->PQConfig.GetTopicsAreFirstClassCitizen()) {
-                SetupStreamCounters(ctx, dcId, ToString<ui32>(partition), cloudId, dbId, folderId);
+                if (DoInternalRead) {
+                    SetupStreamCounters(ctx, dcId, ToString<ui32>(partition), cloudId, dbId, folderId);
+                }
             } else {
                 SetupTopicCounters(ctx, dcId, ToString<ui32>(partition));
             }
@@ -348,38 +325,27 @@ struct TUserInfo {
                              const TString& cloudId, const TString& dbId, const TString& folderId
     ) {
         auto subgroup = NPersQueue::GetCountersForStream(AppData(ctx)->Counters);
-        auto additionalLabels = [&](const TVector<std::pair<TString, TString>>& subgroups = {}) {
-            TVector<std::pair<TString, TString>> result;
-            std::copy_if(subgroups.begin(), subgroups.end(), std::back_inserter(result),
-                [] (const auto& sb) {
-                    return sb.first != "consumer" ||
-                           sb.second != CLIENTID_TO_READ_INTERNALLY;
-                });
-            return result;
-        };
         auto aggregates = NPersQueue::GetLabelsForStream(TopicConverter, cloudId, dbId, folderId);
 
         BytesRead = TMultiCounter(subgroup,
-                                  aggregates, additionalLabels({{"consumer", User}}),
+                                  aggregates, {{"consumer", User}},
                                   {"stream.internal_read.bytes_per_second",
                                    "stream.outgoing_bytes_per_second"}, true, "name");
         MsgsRead = TMultiCounter(subgroup,
-                                 aggregates, additionalLabels({{"consumer", User}}),
+                                 aggregates, {{"consumer", User}},
                                  {"stream.internal_read.records_per_second",
                                   "stream.outgoing_records_per_second"}, true, "name");
 
         Counter.SetCounter(subgroup,
-                           additionalLabels({{"database", dbId}, {"cloud", cloudId}, {"folder", folderId},
-                                             {"stream", TopicConverter->GetFederationPath()},
-                                             {"consumer", User}, {"host", dcId},
-                                             {"shard", partition}}),
+                           {{"cloud", cloudId}, {"folder", folderId}, {"database", dbId},
+                            {"stream", TopicConverter->GetFederationPath()},
+                            {"consumer", User}, {"host", dcId}, {"shard", partition}},
                            {"name", "stream.await_operating_milliseconds", true});
 
         ReadTimeLag.reset(new TPercentileCounter(
            NPersQueue::GetCountersForStream(AppData(ctx)->Counters), aggregates,
-            additionalLabels({{"consumer", User},
-                              {"name", "stream.internal_read.time_lags_milliseconds"}}), "bin",
+            {{"consumer", User}, {"name", "stream.internal_read.time_lags_milliseconds"}}, "bin",
            TVector<std::pair<ui64, TString>>{{100, "100"}, {200, "200"}, {500, "500"},
                                              {1000, "1000"}, {2000, "2000"}, {5000, "5000"},
                                              {10'000, "10000"},
diff --git a/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp b/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp
index 850e526c85..830b89e2d6 100644
--- a/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp
+++ b/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp
@@ -1172,7 +1172,7 @@ Y_UNIT_TEST_SUITE(Cdc) {
             request.MutablePartitionRequest()->SetPartition(partitionId);
 
             auto& cmd = *request.MutablePartitionRequest()->MutableCmdRead();
-            cmd.SetClientId(NKikimr::NPQ::CLIENTID_TO_READ_INTERNALLY);
+            cmd.SetClientId(NKikimr::NPQ::CLIENTID_WITHOUT_CONSUMER);
             cmd.SetCount(10000);
             cmd.SetOffset(0);
             cmd.SetReadTimestampMs(0);
diff --git a/ydb/library/persqueue/tests/counters.h b/ydb/library/persqueue/tests/counters.h
index e2f24235e1..2a095df62e 100644
--- a/ydb/library/persqueue/tests/counters.h
+++ b/ydb/library/persqueue/tests/counters.h
@@ -97,9 +97,9 @@ NJson::TJsonValue GetCounters1stClass(ui16 port, const TString& counters,
     TStringBuilder queryBuilder;
     queryBuilder <<
         "/counters/counters=" << counters <<
-        "/database=" << databaseId <<
         "/cloud=" << cloudId <<
         "/folder=" << folderId <<
+        "/database=" << databaseId <<
         "/stream=" << JoinRange("%2F", pathItems.begin(), pathItems.end());
 
     if (consumer) {
diff --git a/ydb/library/persqueue/topic_parser/counters.cpp b/ydb/library/persqueue/topic_parser/counters.cpp
index 50df9c60e3..2d80eef499 100644
--- a/ydb/library/persqueue/topic_parser/counters.cpp
+++ b/ydb/library/persqueue/topic_parser/counters.cpp
@@ -40,9 +40,9 @@ TVector<TPQLabelsInfo> GetLabels(const TTopicConverterPtr& topic)
 TVector<TPQLabelsInfo> GetLabelsForStream(const TTopicConverterPtr& topic, const TString& cloudId,
                                           const TString& dbId, const TString& folderId) {
     TVector<TPQLabelsInfo> res = {
-        {{{"database", dbId}}, {dbId}},
         {{{"cloud", cloudId}}, {cloudId}},
         {{{"folder", folderId}}, {folderId}},
+        {{{"database", dbId}}, {dbId}},
         {{{"stream", topic->GetClientsideName()}}, {topic->GetClientsideName()}}};
     return res;
 }
diff --git a/ydb/services/datastreams/datastreams_proxy.cpp b/ydb/services/datastreams/datastreams_proxy.cpp
index 8e3f7f547e..fba19c7d00 100644
--- a/ydb/services/datastreams/datastreams_proxy.cpp
+++ b/ydb/services/datastreams/datastreams_proxy.cpp
@@ -1284,7 +1284,7 @@ namespace NKikimr::NDataStreams::V1 {
             ActorIdToProto(PipeClient, request.MutablePartitionRequest()->MutablePipeClient());
 
             auto cmdRead = request.MutablePartitionRequest()->MutableCmdRead();
-            cmdRead->SetClientId(NKikimr::NPQ::CLIENTID_TO_READ_INTERNALLY);
+            cmdRead->SetClientId(NKikimr::NPQ::CLIENTID_WITHOUT_CONSUMER);
            cmdRead->SetCount(Limit);
            cmdRead->SetOffset(ShardIterator.GetSequenceNumber());
            cmdRead->SetReadTimestampMs(ShardIterator.GetReadTimestamp());
diff --git a/ydb/services/ydb/ydb_common_ut.h b/ydb/services/ydb/ydb_common_ut.h
index 426bdd213b..c2966ba744 100644
--- a/ydb/services/ydb/ydb_common_ut.h
+++ b/ydb/services/ydb/ydb_common_ut.h
@@ -48,7 +48,8 @@ public:
                 TAutoPtr<TLogBackend> logBackend = {},
                 bool enableYq = false,
                 TAppPrepare::TFnReg udfFrFactory = nullptr,
-                std::function<void(TServerSettings& settings)> builder = nullptr)
+                std::function<void(TServerSettings& settings)> builder = nullptr,
+                ui16 dynamicNodeCount = 2, ui16 nodeCount = 0)
     {
         ui16 port = PortManager.GetPort(2134);
         ui16 grpc = PortManager.GetPort(2135);
@@ -56,7 +57,9 @@ public:
         ServerSettings->SetGrpcPort(grpc);
         ServerSettings->SetLogBackend(logBackend);
         ServerSettings->SetDomainName("Root");
-        ServerSettings->SetDynamicNodeCount(2);
+        ServerSettings->SetDynamicNodeCount(dynamicNodeCount);
+        if (nodeCount > 0)
+            ServerSettings->SetNodeCount(nodeCount);
         if (TestSettings::PrecreatePools) {
             ServerSettings->AddStoragePool("ssd");
             ServerSettings->AddStoragePool("hdd");
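
Separately, the http_proxy part of the diff splits the request path into SendGrpcRequest (SDK driver configured) and SendGrpcRequestNoDriver (local RPC, no driver). Below is a minimal sketch of that dispatch pattern with hypothetical names, not the real proxy types or calls:

```cpp
#include <iostream>

// Hypothetical stand-ins; the real code chooses between SendGrpcRequest()
// (driver available) and SendGrpcRequestNoDriver() (local RPC path).
struct THttpContextSketch {
    bool HasDriver = false;
};

void SendViaDriver()   { std::cout << "grpc request through the SDK driver\n"; }
void SendViaLocalRpc() { std::cout << "local rpc request, no driver configured\n"; }

// Mirrors the new HandleClientReady/retry logic: pick the transport once,
// based on whether a driver was configured for the proxy.
void HandleClientReady(const THttpContextSketch& ctx) {
    ctx.HasDriver ? SendViaDriver() : SendViaLocalRpc();
}

int main() {
    HandleClientReady(THttpContextSketch{true});   // driver path
    HandleClientReady(THttpContextSketch{false});  // no-driver path
}
```

The retry branch in the diff uses the same ternary, so a proxy started without a driver never falls back onto the driver-based path.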