aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authormokhotskii <mokhotskii@yandex-team.ru>2022-05-26 16:22:26 +0300
committermokhotskii <mokhotskii@yandex-team.ru>2022-05-26 16:22:26 +0300
commit79dae787b59bbf5e3c408af595165012389ffba9 (patch)
treeb88c465112adc489544c6c15e385e222df589df0
parenta7b08e6b8df6e316f6630f8d8ddbf7c99f28bcdf (diff)
downloadydb-79dae787b59bbf5e3c408af595165012389ffba9.tar.gz
KIKIMR-13251 Restrict internal_read counters for no consumer case
Restrict internal_read counter for no consumer case Add http_proxy test with reference counters' json files ref:14c4a707fb2a91f4171d1ac54c7e30f9331656cc
-rw-r--r--ydb/core/http_proxy/http_req.cpp127
-rw-r--r--ydb/core/persqueue/partition.cpp7
-rw-r--r--ydb/core/persqueue/user_info.h58
-rw-r--r--ydb/core/tx/datashard/datashard_ut_change_exchange.cpp2
-rw-r--r--ydb/library/persqueue/tests/counters.h2
-rw-r--r--ydb/library/persqueue/topic_parser/counters.cpp2
-rw-r--r--ydb/services/datastreams/datastreams_proxy.cpp2
-rw-r--r--ydb/services/ydb/ydb_common_ut.h7
8 files changed, 99 insertions, 108 deletions
diff --git a/ydb/core/http_proxy/http_req.cpp b/ydb/core/http_proxy/http_req.cpp
index ea0b2f7aa2..5c5569980f 100644
--- a/ydb/core/http_proxy/http_req.cpp
+++ b/ydb/core/http_proxy/http_req.cpp
@@ -618,12 +618,12 @@ BuildLabels("", httpContext, "stream.put_records.failed_records_per_second")
STFUNC(StateWork)
{
switch (ev->GetTypeRewrite()) {
- HFunc(TEvServerlessProxy::TEvGrpcRequestResult, HandleGrpcResponse);
- HFunc(TEvServerlessProxy::TEvDiscoverDatabaseEndpointResult, Handle);
HFunc(TEvents::TEvWakeup, HandleTimeout);
- HFunc(TEvServerlessProxy::TEvToken, HandleToken);
- HFunc(TEvServerlessProxy::TEvError, HandleError);
HFunc(TEvServerlessProxy::TEvClientReady, HandleClientReady);
+ HFunc(TEvServerlessProxy::TEvDiscoverDatabaseEndpointResult, Handle);
+ HFunc(TEvServerlessProxy::TEvError, HandleError);
+ HFunc(TEvServerlessProxy::TEvGrpcRequestResult, HandleGrpcResponse);
+ HFunc(TEvServerlessProxy::TEvToken, HandleToken);
default:
HandleUnexpectedEvent(ev, ctx);
break;
@@ -654,23 +654,49 @@ BuildLabels("", httpContext, "stream.put_records.failed_records_per_second")
.AuthToken(HttpContext.IamToken)
.DiscoveryMode(NYdb::EDiscoveryMode::Async);
- if (!HttpContext.DatabaseName.empty()) {
- if (!HttpContext.ServiceConfig.GetTestMode()) {
- clientSettings.Database(HttpContext.DatabaseName);
- }
+ if (!HttpContext.DatabaseName.empty() && !HttpContext.ServiceConfig.GetTestMode()) {
+ clientSettings.Database(HttpContext.DatabaseName);
}
Y_VERIFY(!Client);
Client.Reset(new TDataStreamsClient(*HttpContext.Driver, clientSettings));
DiscoveryFuture = MakeHolder<NThreading::TFuture<void>>(Client->DiscoveryCompleted());
- DiscoveryFuture->Subscribe([actorId = ctx.SelfID, actorSystem = ctx.ActorSystem()](const NThreading::TFuture<void>&) {
- actorSystem->Send(actorId, new TEvServerlessProxy::TEvClientReady());
- });
+ DiscoveryFuture->Subscribe(
+ [actorId = ctx.SelfID, actorSystem = ctx.ActorSystem()] (const NThreading::TFuture<void>&) {
+ actorSystem->Send(actorId, new TEvServerlessProxy::TEvClientReady());
+ });
}
void HandleClientReady(TEvServerlessProxy::TEvClientReady::TPtr&, const TActorContext& ctx){
- SendGrpcRequest(ctx);
+ HttpContext.Driver ? SendGrpcRequest(ctx) : SendGrpcRequestNoDriver(ctx);
}
+ void SendGrpcRequestNoDriver(const TActorContext& ctx) {
+ RequestState = StateGrpcRequest;
+ LOG_SP_INFO_S(ctx, NKikimrServices::HTTP_PROXY,
+ "sending grpc request to '" << HttpContext.DiscoveryEndpoint <<
+ "' database: '" << HttpContext.DatabaseName <<
+ "' iam token size: " << HttpContext.IamToken.size());
+
+ RpcFuture = NRpcService::DoLocalRpc<TRpcEv>(std::move(Request), HttpContext.DatabaseName,
+ HttpContext.SerializedUserToken, ctx.ActorSystem());
+ RpcFuture.Subscribe([actorId = ctx.SelfID, actorSystem = ctx.ActorSystem()]
+ (const NThreading::TFuture<TProtoResponse>& future) {
+ auto& response = future.GetValueSync();
+ auto result = MakeHolder<TEvServerlessProxy::TEvGrpcRequestResult>();
+ Y_VERIFY(response.operation().ready());
+ if (response.operation().status() == Ydb::StatusIds::SUCCESS) {
+ TProtoResult rs;
+ response.operation().result().UnpackTo(&rs);
+ result->Message = MakeHolder<TProtoResult>(rs);
+ }
+ NYql::TIssues issues;
+ NYql::IssuesFromMessage(response.operation().issues(), issues);
+ result->Status = MakeHolder<NYdb::TStatus>(NYdb::EStatus(response.operation().status()),
+ std::move(issues));
+ actorSystem->Send(actorId, result.Release());
+ });
+ return;
+ }
void SendGrpcRequest(const TActorContext& ctx) {
RequestState = StateGrpcRequest;
@@ -678,42 +704,29 @@ BuildLabels("", httpContext, "stream.put_records.failed_records_per_second")
"sending grpc request to '" << HttpContext.DiscoveryEndpoint <<
"' database: '" << HttpContext.DatabaseName <<
"' iam token size: " << HttpContext.IamToken.size());
- if (!HttpContext.Driver) {
- RpcFuture = NRpcService::DoLocalRpc<TRpcEv>(std::move(Request), HttpContext.DatabaseName, HttpContext.SerializedUserToken, ctx.ActorSystem());
- RpcFuture.Subscribe([actorId = ctx.SelfID, actorSystem = ctx.ActorSystem()](const NThreading::TFuture<TProtoResponse>& future) {
- auto& response = future.GetValueSync();
- auto result = MakeHolder<TEvServerlessProxy::TEvGrpcRequestResult>();
- Y_VERIFY(response.operation().ready());
- if (response.operation().status() == Ydb::StatusIds::SUCCESS) {
- TProtoResult rs;
- response.operation().result().UnpackTo(&rs);
- result->Message = MakeHolder<TProtoResult>(rs);
- }
- NYql::TIssues issues;
- NYql::IssuesFromMessage(response.operation().issues(), issues);
- result->Status = MakeHolder<NYdb::TStatus>(NYdb::EStatus(response.operation().status()), std::move(issues));
- actorSystem->Send(actorId, result.Release());
- });
- return;
- }
+
Y_VERIFY(Client);
Y_VERIFY(DiscoveryFuture->HasValue());
TProtoResponse response;
- LOG_SP_DEBUG_S(ctx, NKikimrServices::HTTP_PROXY, "sending grpc request " << Request.DebugString());
-
- Future = MakeHolder<NThreading::TFuture<TProtoResultWrapper<TProtoResult>>>(Client->template DoProtoRequest<TProtoRequest, TProtoResponse, TProtoResult, TProtoCall>(std::move(Request), ProtoCall));
- Future->Subscribe([actorId = ctx.SelfID, actorSystem = ctx.ActorSystem()](const NThreading::TFuture<TProtoResultWrapper<TProtoResult>>& future) {
- auto& response = future.GetValueSync();
- auto result = MakeHolder<TEvServerlessProxy::TEvGrpcRequestResult>();
- if (response.IsSuccess()) {
- result->Message = MakeHolder<TProtoResult>(response.GetResult());
- }
- result->Status = MakeHolder<NYdb::TStatus>(response);
- actorSystem->Send(actorId, result.Release());
- });
+ LOG_SP_DEBUG_S(ctx, NKikimrServices::HTTP_PROXY,
+ "sending grpc request " << Request.DebugString());
+ Future = MakeHolder<NThreading::TFuture<TProtoResultWrapper<TProtoResult>>>(
+ Client->template DoProtoRequest<TProtoRequest, TProtoResponse, TProtoResult,
+ TProtoCall>(std::move(Request), ProtoCall));
+ Future->Subscribe(
+ [actorId = ctx.SelfID, actorSystem = ctx.ActorSystem()]
+ (const NThreading::TFuture<TProtoResultWrapper<TProtoResult>>& future) {
+ auto& response = future.GetValueSync();
+ auto result = MakeHolder<TEvServerlessProxy::TEvGrpcRequestResult>();
+ if (response.IsSuccess()) {
+ result->Message = MakeHolder<TProtoResult>(response.GetResult());
+ }
+ result->Status = MakeHolder<NYdb::TStatus>(response);
+ actorSystem->Send(actorId, result.Release());
+ });
}
void HandleUnexpectedEvent(const TAutoPtr<NActors::IEventHandle>& ev, const TActorContext& ctx) {
@@ -728,7 +741,7 @@ BuildLabels("", httpContext, "stream.put_records.failed_records_per_second")
if (HttpContext.Driver) {
SendYdbDriverRequest(ctx);
} else {
- SendGrpcRequest(ctx);
+ SendGrpcRequestNoDriver(ctx);
}
}
@@ -758,7 +771,8 @@ BuildLabels("", httpContext, "stream.put_records.failed_records_per_second")
TBase::Die(ctx);
}
- void Handle(TEvServerlessProxy::TEvDiscoverDatabaseEndpointResult::TPtr ev, const TActorContext& ctx) {
+ void Handle(TEvServerlessProxy::TEvDiscoverDatabaseEndpointResult::TPtr ev,
+ const TActorContext& ctx) {
if (ev->Get()->DatabaseInfo) {
auto& db = ev->Get()->DatabaseInfo;
HttpContext.FolderId = db->FolderId;
@@ -775,8 +789,10 @@ BuildLabels("", httpContext, "stream.put_records.failed_records_per_second")
}
FillInputCustomMetrics<TProtoRequest>(Request, HttpContext, ctx);
- ctx.Send(MakeMetricsServiceID(), new TEvServerlessProxy::TEvCounter{1, true, true, BuildLabels(Method, HttpContext, "api.http.requests_per_second")
- });
+ ctx.Send(MakeMetricsServiceID(),
+ new TEvServerlessProxy::TEvCounter{1, true, true,
+ BuildLabels(Method, HttpContext, "api.http.requests_per_second")
+ });
CreateClient(ctx);
return;
}
@@ -786,7 +802,8 @@ BuildLabels("", httpContext, "stream.put_records.failed_records_per_second")
void ReportLatencyCounters(const TActorContext& ctx) {
TDuration dur = ctx.Now() - StartTime;
- ctx.Send(MakeMetricsServiceID(), new TEvServerlessProxy::TEvHistCounter{static_cast<i64>(dur.MilliSeconds()), 1,
+ ctx.Send(MakeMetricsServiceID(),
+ new TEvServerlessProxy::TEvHistCounter{static_cast<i64>(dur.MilliSeconds()), 1,
BuildLabels(Method, HttpContext, "api.http.requests_duration_milliseconds")
});
}
@@ -797,7 +814,8 @@ BuildLabels("", httpContext, "stream.put_records.failed_records_per_second")
// return http response;
if (ev->Get()->Status->IsSuccess()) {
ProtoToJson(*ev->Get()->Message, HttpContext.ResponseData.ResponseBody);
- FillOutputCustomMetrics<TProtoResult>(*(dynamic_cast<TProtoResult*>(ev->Get()->Message.Get())), HttpContext, ctx);
+ FillOutputCustomMetrics<TProtoResult>(
+ *(dynamic_cast<TProtoResult*>(ev->Get()->Message.Get())), HttpContext, ctx);
ReportLatencyCounters(ctx);
ctx.Send(MakeMetricsServiceID(),
@@ -815,7 +833,7 @@ BuildLabels("", httpContext, "stream.put_records.failed_records_per_second")
case ERetryErrorClass::LongRetry:
RetryCounter.Click();
if (RetryCounter.HasAttemps()) {
- return SendGrpcRequest(ctx);
+ return HttpContext.Driver ? SendGrpcRequest(ctx) : SendGrpcRequestNoDriver(ctx);
}
case ERetryErrorClass::NoRetry: {
TString errorText;
@@ -856,11 +874,14 @@ BuildLabels("", httpContext, "stream.put_records.failed_records_per_second")
"database '" << HttpContext.DatabaseName << "' " <<
"stream '" << ExtractStreamName<TProtoRequest>(Request) << "'");
- if (HttpContext.IamToken.empty() || !HttpContext.Driver) { //use Signature or no sdk mode - then need to auth anyway
- if (HttpContext.IamToken.empty() && !Signature) { //Test mode - no driver and no creds
- SendGrpcRequest(ctx);
+ // Use Signature or no sdk mode - then need to auth anyway
+ if (HttpContext.IamToken.empty() || !HttpContext.Driver) {
+ // Test mode - no driver and no creds
+ if (HttpContext.IamToken.empty() && !Signature) {
+ SendGrpcRequestNoDriver(ctx);
} else {
- AuthActor = ctx.Register(AppData(ctx)->DataStreamsAuthFactory->CreateAuthActor(ctx.SelfID, HttpContext, std::move(Signature)));
+ AuthActor = ctx.Register(AppData(ctx)->DataStreamsAuthFactory->CreateAuthActor(
+ ctx.SelfID, HttpContext, std::move(Signature)));
}
} else {
SendYdbDriverRequest(ctx);
diff --git a/ydb/core/persqueue/partition.cpp b/ydb/core/persqueue/partition.cpp
index 728a8c7028..9b26dc3080 100644
--- a/ydb/core/persqueue/partition.cpp
+++ b/ydb/core/persqueue/partition.cpp
@@ -469,7 +469,7 @@ TPartition::TPartition(ui64 tabletId, ui32 partition, const TActorId& tablet, co
, Tablet(tablet)
, BlobCache(blobCache)
, InitState(WaitDiskStatus)
- , PartitionedBlob(partition, 0, 0, 0, 0, 0, Head, NewHead, true, false, 8 << 20)
+ , PartitionedBlob(partition, 0, 0, 0, 0, 0, Head, NewHead, true, false, 8_MB)
, NewHeadKey{TKey{}, 0, TInstant::Zero(), 0}
, BodySize(0)
, MaxWriteResponsesSize(0)
@@ -843,9 +843,9 @@ void TPartition::SetupStreamCounters(const TActorContext& ctx) {
WriteBufferIsFullCounter.SetCounter(
NPersQueue::GetCountersForStream(counters),
- {{"database", DbId},
- {"cloud", CloudId},
+ {{"cloud", CloudId},
{"folder", FolderId},
+ {"database", DbId},
{"stream", TopicConverter->GetFederationPath()},
{"host", DCId},
{"shard", ToString<ui32>(Partition)}},
@@ -2963,6 +2963,7 @@ void TPartition::ReadTimestampForOffset(const TString& user, TUserInfo& userInfo
THolder<TEvPQ::TEvRead> event = MakeHolder<TEvPQ::TEvRead>(0, userInfo.Offset, 0, 1, "",
user, 0, MAX_BLOB_PART_SIZE * 2, 0, 0, "",
false);
+
ctx.Send(ctx.SelfID, event.Release());
Counters.Cumulative()[COUNTER_PQ_WRITE_TIMESTAMP_CACHE_MISS].Increment(1);
}
diff --git a/ydb/core/persqueue/user_info.h b/ydb/core/persqueue/user_info.h
index 3325f6147f..214404ddaa 100644
--- a/ydb/core/persqueue/user_info.h
+++ b/ydb/core/persqueue/user_info.h
@@ -27,7 +27,7 @@ namespace NDeprecatedUserData {
static const ui32 MAX_USER_TS_CACHE_SIZE = 10'000;
static const ui64 MIN_TIMESTAMP_MS = 1'000'000'000'000ll; // around 2002 year
-static const TString CLIENTID_TO_READ_INTERNALLY = "$without_consumer";
+static const TString CLIENTID_WITHOUT_CONSUMER = "$without_consumer";
typedef TProtobufTabletLabeledCounters<EClientLabeledCounters_descriptor> TUserLabeledCounters;
@@ -200,38 +200,12 @@ struct TUserInfo {
NSlidingWindow::TSlidingWindow<NSlidingWindow::TMaxOperation<ui64>> WriteLagMs;
std::shared_ptr<TPercentileCounter> ReadTimeLag;
- bool DoExternalRead = false;
+ bool DoInternalRead = false;
bool WriteInProgress = false;
bool Parsed = false;
- TUserInfo(THolder<TReadSpeedLimiterHolder> readSpeedLimiter, const TString& user,
- const ui64 readRuleGeneration, bool important, const NPersQueue::TTopicConverterPtr& topicConverter,
- const ui32 partition, bool doExternalRead,
- ui64 burst = 1'000'000'000, ui64 speed = 1'000'000'000)
- : ReadSpeedLimiter(std::move(readSpeedLimiter))
- , Important(important)
- , LabeledCounters(
- topicConverter->IsFirstClass() ? nullptr : new TUserLabeledCounters(
- user + "/" + (important ? "1" : "0") + "/" + topicConverter->GetClientsideName(), partition
- )
- )
- , User(user)
- , ReadRuleGeneration(readRuleGeneration)
- , TopicConverter(topicConverter)
- , ReadQuota(burst, speed, TAppData::TimeProvider->Now())
- , Counter(nullptr)
- , BytesRead()
- , MsgsRead()
- , ActiveReads(0)
- , Subscriptions(0)
- , EndOffset(0)
- , WriteLagMs(TDuration::Minutes(1), 100)
- , DoExternalRead(doExternalRead)
- {
- }
-
void ForgetSubscription(const TInstant& now) {
if (Subscriptions > 0)
--Subscriptions;
@@ -333,10 +307,13 @@ struct TUserInfo {
, AvgReadBytes{{TDuration::Seconds(1), 1000}, {TDuration::Minutes(1), 1000},
{TDuration::Hours(1), 2000}, {TDuration::Days(1), 2000}}
, WriteLagMs(TDuration::Minutes(1), 100)
+ , DoInternalRead(user != CLIENTID_WITHOUT_CONSUMER)
{
if (AppData(ctx)->Counters) {
if (AppData()->PQConfig.GetTopicsAreFirstClassCitizen()) {
- SetupStreamCounters(ctx, dcId, ToString<ui32>(partition), cloudId, dbId, folderId);
+ if (DoInternalRead) {
+ SetupStreamCounters(ctx, dcId, ToString<ui32>(partition), cloudId, dbId, folderId);
+ }
} else {
SetupTopicCounters(ctx, dcId, ToString<ui32>(partition));
}
@@ -348,38 +325,27 @@ struct TUserInfo {
const TString& cloudId, const TString& dbId, const TString& folderId
) {
auto subgroup = NPersQueue::GetCountersForStream(AppData(ctx)->Counters);
- auto additionalLabels = [&](const TVector<std::pair<TString, TString>>& subgroups = {}) {
- TVector<std::pair<TString, TString>> result;
- std::copy_if(subgroups.begin(), subgroups.end(), std::back_inserter(result),
- [] (const auto& sb) {
- return sb.first != "consumer" ||
- sb.second != CLIENTID_TO_READ_INTERNALLY;
- });
- return result;
- };
auto aggregates =
NPersQueue::GetLabelsForStream(TopicConverter, cloudId, dbId, folderId);
BytesRead = TMultiCounter(subgroup,
- aggregates, additionalLabels({{"consumer", User}}),
+ aggregates, {{"consumer", User}},
{"stream.internal_read.bytes_per_second",
"stream.outgoing_bytes_per_second"}, true, "name");
MsgsRead = TMultiCounter(subgroup,
- aggregates, additionalLabels({{"consumer", User}}),
+ aggregates, {{"consumer", User}},
{"stream.internal_read.records_per_second",
"stream.outgoing_records_per_second"}, true, "name");
Counter.SetCounter(subgroup,
- additionalLabels({{"database", dbId}, {"cloud", cloudId}, {"folder", folderId},
- {"stream", TopicConverter->GetFederationPath()},
- {"consumer", User}, {"host", dcId},
- {"shard", partition}}),
+ {{"cloud", cloudId}, {"folder", folderId}, {"database", dbId},
+ {"stream", TopicConverter->GetFederationPath()},
+ {"consumer", User}, {"host", dcId}, {"shard", partition}},
{"name", "stream.await_operating_milliseconds", true});
ReadTimeLag.reset(new TPercentileCounter(
NPersQueue::GetCountersForStream(AppData(ctx)->Counters), aggregates,
- additionalLabels({{"consumer", User},
- {"name", "stream.internal_read.time_lags_milliseconds"}}), "bin",
+ {{"consumer", User}, {"name", "stream.internal_read.time_lags_milliseconds"}}, "bin",
TVector<std::pair<ui64, TString>>{{100, "100"}, {200, "200"}, {500, "500"},
{1000, "1000"}, {2000, "2000"},
{5000, "5000"}, {10'000, "10000"},
diff --git a/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp b/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp
index 850e526c85..830b89e2d6 100644
--- a/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp
+++ b/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp
@@ -1172,7 +1172,7 @@ Y_UNIT_TEST_SUITE(Cdc) {
request.MutablePartitionRequest()->SetPartition(partitionId);
auto& cmd = *request.MutablePartitionRequest()->MutableCmdRead();
- cmd.SetClientId(NKikimr::NPQ::CLIENTID_TO_READ_INTERNALLY);
+ cmd.SetClientId(NKikimr::NPQ::CLIENTID_WITHOUT_CONSUMER);
cmd.SetCount(10000);
cmd.SetOffset(0);
cmd.SetReadTimestampMs(0);
diff --git a/ydb/library/persqueue/tests/counters.h b/ydb/library/persqueue/tests/counters.h
index e2f24235e1..2a095df62e 100644
--- a/ydb/library/persqueue/tests/counters.h
+++ b/ydb/library/persqueue/tests/counters.h
@@ -97,9 +97,9 @@ NJson::TJsonValue GetCounters1stClass(ui16 port, const TString& counters,
TStringBuilder queryBuilder;
queryBuilder <<
"/counters/counters=" << counters <<
- "/database=" << databaseId <<
"/cloud=" << cloudId <<
"/folder=" << folderId <<
+ "/database=" << databaseId <<
"/stream=" << JoinRange("%2F", pathItems.begin(), pathItems.end());
if (consumer) {
diff --git a/ydb/library/persqueue/topic_parser/counters.cpp b/ydb/library/persqueue/topic_parser/counters.cpp
index 50df9c60e3..2d80eef499 100644
--- a/ydb/library/persqueue/topic_parser/counters.cpp
+++ b/ydb/library/persqueue/topic_parser/counters.cpp
@@ -40,9 +40,9 @@ TVector<TPQLabelsInfo> GetLabels(const TTopicConverterPtr& topic)
TVector<TPQLabelsInfo> GetLabelsForStream(const TTopicConverterPtr& topic, const TString& cloudId,
const TString& dbId, const TString& folderId) {
TVector<TPQLabelsInfo> res = {
- {{{"database", dbId}}, {dbId}},
{{{"cloud", cloudId}}, {cloudId}},
{{{"folder", folderId}}, {folderId}},
+ {{{"database", dbId}}, {dbId}},
{{{"stream", topic->GetClientsideName()}}, {topic->GetClientsideName()}}};
return res;
}
diff --git a/ydb/services/datastreams/datastreams_proxy.cpp b/ydb/services/datastreams/datastreams_proxy.cpp
index 8e3f7f547e..fba19c7d00 100644
--- a/ydb/services/datastreams/datastreams_proxy.cpp
+++ b/ydb/services/datastreams/datastreams_proxy.cpp
@@ -1284,7 +1284,7 @@ namespace NKikimr::NDataStreams::V1 {
ActorIdToProto(PipeClient, request.MutablePartitionRequest()->MutablePipeClient());
auto cmdRead = request.MutablePartitionRequest()->MutableCmdRead();
- cmdRead->SetClientId(NKikimr::NPQ::CLIENTID_TO_READ_INTERNALLY);
+ cmdRead->SetClientId(NKikimr::NPQ::CLIENTID_WITHOUT_CONSUMER);
cmdRead->SetCount(Limit);
cmdRead->SetOffset(ShardIterator.GetSequenceNumber());
cmdRead->SetReadTimestampMs(ShardIterator.GetReadTimestamp());
diff --git a/ydb/services/ydb/ydb_common_ut.h b/ydb/services/ydb/ydb_common_ut.h
index 426bdd213b..c2966ba744 100644
--- a/ydb/services/ydb/ydb_common_ut.h
+++ b/ydb/services/ydb/ydb_common_ut.h
@@ -48,7 +48,8 @@ public:
TAutoPtr<TLogBackend> logBackend = {},
bool enableYq = false,
TAppPrepare::TFnReg udfFrFactory = nullptr,
- std::function<void(TServerSettings& settings)> builder = nullptr)
+ std::function<void(TServerSettings& settings)> builder = nullptr,
+ ui16 dynamicNodeCount = 2, ui16 nodeCount = 0)
{
ui16 port = PortManager.GetPort(2134);
ui16 grpc = PortManager.GetPort(2135);
@@ -56,7 +57,9 @@ public:
ServerSettings->SetGrpcPort(grpc);
ServerSettings->SetLogBackend(logBackend);
ServerSettings->SetDomainName("Root");
- ServerSettings->SetDynamicNodeCount(2);
+ ServerSettings->SetDynamicNodeCount(dynamicNodeCount);
+ if (nodeCount > 0)
+ ServerSettings->SetNodeCount(nodeCount);
if (TestSettings::PrecreatePools) {
ServerSettings->AddStoragePool("ssd");
ServerSettings->AddStoragePool("hdd");