aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authormokhotskii <mokhotskii@yandex-team.ru>2022-03-01 18:07:27 +0300
committerDaniil Cherednik <dcherednik@yandex-team.ru>2022-03-01 18:07:27 +0300
commit384afaad2b4d2381d7bc1f003c6d7626d55f5a03 (patch)
tree8bdf0020df7c0c01caa1b40d386269cd89c07fbe
parente7de3116bb9e9e10fcc25dbad25e0a595e1324b4 (diff)
downloadydb-384afaad2b4d2381d7bc1f003c6d7626d55f5a03.tar.gz
[merge to 22-2] Add tests on pqv0/1 counters22.2.5
LOGBROKER-7220 Add pq v1 counters test REVIEW: 2266837 REVIEW: 2354266 x-ydb-stable-ref: 3c295a4e79d3ac0dd216667ae906252e7b53e9c3
-rw-r--r--ydb/core/base/counters.cpp3
-rw-r--r--ydb/core/persqueue/partition.cpp102
-rw-r--r--ydb/core/persqueue/percentile_counter.cpp6
-rw-r--r--ydb/core/persqueue/percentile_counter.h3
-rw-r--r--ydb/core/persqueue/pq_impl.cpp1
-rw-r--r--ydb/core/persqueue/user_info.cpp9
-rw-r--r--ydb/core/persqueue/user_info.h58
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__init.cpp2
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__operation_alter_pq.cpp19
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__operation_create_pq.cpp21
-rw-r--r--ydb/library/persqueue/tests/counters.h197
-rw-r--r--ydb/public/api/protos/draft/datastreams.proto2
-rw-r--r--ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/test_server.h4
-rw-r--r--ydb/services/persqueue_v1/grpc_pq_read_actor.cpp136
-rw-r--r--ydb/services/persqueue_v1/grpc_pq_write_actor.cpp18
-rw-r--r--ydb/services/persqueue_v1/persqueue_new_schemecache_ut.cpp200
-rw-r--r--ydb/services/persqueue_v1/persqueue_ut.cpp218
17 files changed, 709 insertions, 290 deletions
diff --git a/ydb/core/base/counters.cpp b/ydb/core/base/counters.cpp
index b192fa36a1..298ec7e192 100644
--- a/ydb/core/base/counters.cpp
+++ b/ydb/core/base/counters.cpp
@@ -39,11 +39,12 @@ static const THashSet<TString> DATABASE_SERVICES
TString("pqproxy|readSession"),
TString("pqproxy|schemecache"),
TString("pqproxy|mirrorWriteTimeLag"),
+ TString("datastreams"),
}};
static const THashSet<TString> DATABASE_ATTRIBUTE_SERVICES
- = {{ TString("ydb") }};
+ = {{ TString("ydb"), TString("datastreams") }};
static const THashSet<TString> DATABASE_ATTRIBUTE_LABELS
= {{ TString("cloud_id"),
diff --git a/ydb/core/persqueue/partition.cpp b/ydb/core/persqueue/partition.cpp
index 1e775741d4..4da1293a83 100644
--- a/ydb/core/persqueue/partition.cpp
+++ b/ydb/core/persqueue/partition.cpp
@@ -450,6 +450,17 @@ void TPartition::FillReadFromTimestamps(const NKikimrPQ::TPQTabletConfig& config
}
}
+namespace {
+ TString getStreamName(const TActorContext &ctx, const TString& topicPath, const TString& dbPath) {
+ const bool itsFirstClassCitizen = true;
+ auto converterFactory = std::make_shared<NPersQueue::TTopicNamesConverterFactory>(
+ itsFirstClassCitizen, AppData(ctx)->PQConfig.GetRoot()
+ );
+ auto converter = converterFactory->MakeTopicNameConverter(topicPath, "", dbPath);
+ return converter->GetClientsideName();
+ }
+}
+
TPartition::TPartition(ui64 tabletId, ui32 partition, const TActorId& tablet, const TActorId& blobCache,
const TString& topicName, const TString& topicPath, const bool localDC, TString dcId,
const NKikimrPQ::TPQTabletConfig& config, const TTabletCountersBase& counters,
@@ -475,7 +486,8 @@ TPartition::TPartition(ui64 tabletId, ui32 partition, const TActorId& tablet, co
, CloudId(config.GetYcCloudId())
, DbId(config.GetYdbDatabaseId())
, FolderId(config.GetYcFolderId())
- , UsersInfoStorage(DCId, TabletID, TopicName, Partition, counters, Config, CloudId, DbId, FolderId)
+ , UsersInfoStorage(DCId, TabletID, TopicName, Partition, counters, Config, CloudId, DbId, FolderId,
+ getStreamName(ctx, TopicPath, Config.GetYdbDatabasePath()))
, ReadingTimestamp(false)
, SetOffsetCookie(0)
, Cookie(0)
@@ -510,12 +522,10 @@ TPartition::TPartition(ui64 tabletId, ui32 partition, const TActorId& tablet, co
ManageWriteTimestampEstimate = LocalDC;
}
-
WriteTimestampEstimate = ManageWriteTimestampEstimate ? ctx.Now() : TInstant::Zero();
CalcTopicWriteQuotaParams();
-
Counters.Populate(counters);
}
@@ -717,7 +727,6 @@ void TPartition::HandleMonitoring(TEvPQ::TEvMonRequest::TPtr& ev, const TActorCo
void TPartition::Bootstrap(const TActorContext& ctx)
{
-
UsersInfoStorage.Init(Tablet, SelfId());
Y_VERIFY(AppData(ctx)->PQConfig.GetMaxBlobsPerLevel() > 0);
@@ -770,7 +779,7 @@ void TPartition::Bootstrap(const TActorContext& ctx)
void TPartition::SetupTopicCounters(const TActorContext& ctx) {
auto counters = AppData(ctx)->Counters;
- auto labels = NKikimr::NPQ::GetLabels(TopicName);
+ const auto labels = NKikimr::NPQ::GetLabels(TopicName);
const TString suffix = LocalDC ? "Original" : "Mirrored";
WriteBufferIsFullCounter.SetCounter(
@@ -779,38 +788,33 @@ void TPartition::SetupTopicCounters(const TActorContext& ctx) {
{"Partition", ToString<ui32>(Partition)}},
{"sensor", "BufferFullTime" + suffix, true});
+ auto subGroup = GetServiceCounters(counters, "pqproxy|writeTimeLag");
InputTimeLag = THolder<NKikimr::NPQ::TPercentileCounter>(new NKikimr::NPQ::TPercentileCounter(
- GetServiceCounters(counters, "pqproxy|writeTimeLag"), GetLabels(TopicName),
- {{"sensor", "TimeLags" + suffix}}, "Interval",
- TVector<std::pair<ui64, TString>>{
- {100, "100ms"}, {200, "200ms"}, {500, "500ms"}, {1000, "1000ms"},
- {2000, "2000ms"}, {5000, "5000ms"}, {10'000, "10000ms"}, {30'000, "30000ms"},
- {60'000, "60000ms"}, {180'000,"180000ms"}, {9'999'999, "999999ms"}}, true));
+ subGroup, labels, {{"sensor", "TimeLags" + suffix}}, "Interval",
+ TVector<std::pair<ui64, TString>>{
+ {100, "100ms"}, {200, "200ms"}, {500, "500ms"}, {1000, "1000ms"},
+ {2000, "2000ms"}, {5000, "5000ms"}, {10'000, "10000ms"}, {30'000, "30000ms"},
+ {60'000, "60000ms"}, {180'000,"180000ms"}, {9'999'999, "999999ms"}}, true));
+ subGroup = GetServiceCounters(counters, "pqproxy|writeInfo");
MessageSize = THolder<NKikimr::NPQ::TPercentileCounter>(new NKikimr::NPQ::TPercentileCounter(
- GetServiceCounters(counters, "pqproxy|writeInfo"), GetLabels(TopicName),
- {{"sensor", "MessageSize" + suffix}}, "Size",
- TVector<std::pair<ui64, TString>>{
- {1024, "1kb"}, {5120, "5kb"}, {10240, "10kb"},
- {20'480, "20kb"}, {51'200, "50kb"}, {102'400, "100kb"}, {204'800, "200kb"},
- {524'288, "512kb"},{1'048'576, "1024kb"}, {2'097'152,"2048kb"}, {5'242'880, "5120kb"},
- {10'485'760, "10240kb"}, {67'108'864, "65536kb"}, {999'999'999, "99999999kb"}}, true));
-
- BytesWritten = NKikimr::NPQ::TMultiCounter(GetServiceCounters(counters, "pqproxy|writeSession"),
- GetLabels(TopicName), {}, {"BytesWritten" + suffix}, true);
- BytesWrittenUncompressed = NKikimr::NPQ::TMultiCounter(GetServiceCounters(counters, "pqproxy|writeSession"),
- GetLabels(TopicName), {}, {"UncompressedBytesWritten" + suffix}, true);
-
- BytesWrittenComp = NKikimr::NPQ::TMultiCounter(GetServiceCounters(counters, "pqproxy|writeSession"),
- GetLabels(TopicName), {}, {"CompactedBytesWritten" + suffix}, true);
-
- MsgsWritten = NKikimr::NPQ::TMultiCounter(GetServiceCounters(counters, "pqproxy|writeSession"),
- GetLabels(TopicName), {}, {"MessagesWritten" + suffix}, true);
+ subGroup, labels, {{"sensor", "MessageSize" + suffix}}, "Size",
+ TVector<std::pair<ui64, TString>>{
+ {1024, "1kb"}, {5120, "5kb"}, {10240, "10kb"},
+ {20'480, "20kb"}, {51'200, "50kb"}, {102'400, "100kb"}, {204'800, "200kb"},
+ {524'288, "512kb"},{1'048'576, "1024kb"}, {2'097'152,"2048kb"}, {5'242'880, "5120kb"},
+ {10'485'760, "10240kb"}, {67'108'864, "65536kb"}, {999'999'999, "99999999kb"}}, true));
+
+ subGroup = GetServiceCounters(counters, "pqproxy|writeSession");
+ BytesWritten = NKikimr::NPQ::TMultiCounter(subGroup, labels, {}, {"BytesWritten" + suffix}, true);
+ BytesWrittenUncompressed = NKikimr::NPQ::TMultiCounter(subGroup, labels, {}, {"UncompressedBytesWritten" + suffix}, true);
+ BytesWrittenComp = NKikimr::NPQ::TMultiCounter(subGroup, labels, {}, {"CompactedBytesWritten" + suffix}, true);
+ MsgsWritten = NKikimr::NPQ::TMultiCounter(subGroup, labels, {}, {"MessagesWritten" + suffix}, true);
TVector<NPQ::TLabelsInfo> aggr = {{{{"Account", NPersQueue::GetAccount(TopicName)}}, {"total"}}};
ui32 border = AppData(ctx)->PQConfig.GetWriteLatencyBigMs();
- auto subGroup = GetServiceCounters(counters, "pqproxy|SLI");
+ subGroup = GetServiceCounters(counters, "pqproxy|SLI");
WriteLatency = NKikimr::NPQ::CreateSLIDurationCounter(subGroup, aggr, "Write", border,
{100, 200, 500, 1000, 1500, 2000,
5000, 10'000, 30'000, 99'999'999});
@@ -819,7 +823,7 @@ void TPartition::SetupTopicCounters(const TActorContext& ctx) {
if (IsQuotingEnabled() && !TopicWriteQuotaResourcePath.empty()) {
TopicWriteQuotaWaitCounter = THolder<NKikimr::NPQ::TPercentileCounter>(
new NKikimr::NPQ::TPercentileCounter(
- GetServiceCounters(counters, "pqproxy|topicWriteQuotaWait"), GetLabels(TopicName),
+ GetServiceCounters(counters, "pqproxy|topicWriteQuotaWait"), labels,
{{"sensor", "TopicWriteQuotaWait" + suffix}}, "Interval",
TVector<std::pair<ui64, TString>>{
{0, "0ms"}, {1, "1ms"}, {5, "5ms"}, {10, "10ms"},
@@ -830,7 +834,7 @@ void TPartition::SetupTopicCounters(const TActorContext& ctx) {
PartitionWriteQuotaWaitCounter = THolder<NKikimr::NPQ::TPercentileCounter>(
new NKikimr::NPQ::TPercentileCounter(GetServiceCounters(counters, "pqproxy|partitionWriteQuotaWait"),
- GetLabels(TopicName), {{"sensor", "PartitionWriteQuotaWait" + suffix}}, "Interval",
+ labels, {{"sensor", "PartitionWriteQuotaWait" + suffix}}, "Interval",
TVector<std::pair<ui64, TString>>{
{0, "0ms"}, {1, "1ms"}, {5, "5ms"}, {10, "10ms"},
{20, "20ms"}, {50, "50ms"}, {100, "100ms"}, {500, "500ms"},
@@ -839,18 +843,22 @@ void TPartition::SetupTopicCounters(const TActorContext& ctx) {
}
void TPartition::SetupStreamCounters(const TActorContext& ctx) {
+ const auto topicName = getStreamName(ctx, TopicPath, Config.GetYdbDatabasePath());
+ const auto labels = NKikimr::NPQ::GetLabelsForStream(topicName, CloudId, DbId, FolderId);
auto counters = AppData(ctx)->Counters;
- auto labels = NKikimr::NPQ::GetLabelsForStream(TopicName, CloudId, DbId, FolderId);
WriteBufferIsFullCounter.SetCounter(
- GetCountersForStream(counters, "writingTime"),
- {{"host", DCId},
- {"partition", ToString<ui32>(Partition)},
- {"stream", TopicName}},
+ GetCountersForStream(counters),
+ {{"database", DbId},
+ {"cloud", CloudId},
+ {"folder", FolderId},
+ {"stream", topicName},
+ {"host", DCId},
+ {"shard", ToString<ui32>(Partition)}},
{"name", "stream.internal_write.buffer_brimmed_duration_ms", true});
InputTimeLag = THolder<NKikimr::NPQ::TPercentileCounter>(new NKikimr::NPQ::TPercentileCounter(
- NKikimr::NPQ::GetCountersForStream(counters, "writeTimeLag"), labels,
+ NKikimr::NPQ::GetCountersForStream(counters), labels,
{{"name", "stream.internal_write.time_lags_milliseconds"}}, "bin",
TVector<std::pair<ui64, TString>>{
{100, "100"}, {200, "200"}, {500, "500"},
@@ -859,7 +867,7 @@ void TPartition::SetupStreamCounters(const TActorContext& ctx) {
{180'000,"180000"}, {9'999'999, "999999"}}, true));
MessageSize = THolder<NKikimr::NPQ::TPercentileCounter>(new NKikimr::NPQ::TPercentileCounter(
- NKikimr::NPQ::GetCountersForStream(counters, "writeInfo"), labels,
+ NKikimr::NPQ::GetCountersForStream(counters), labels,
{{"name", "stream.internal_write.record_size_bytes"}}, "bin",
TVector<std::pair<ui64, TString>>{
{1024, "1024"}, {5120, "5120"}, {10'240, "10240"},
@@ -869,22 +877,22 @@ void TPartition::SetupStreamCounters(const TActorContext& ctx) {
{67'108'864, "67108864"}, {999'999'999, "99999999"}}, true));
BytesWritten = NKikimr::NPQ::TMultiCounter(
- NKikimr::NPQ::GetCountersForStream(counters, "writeSession"), labels, {},
+ NKikimr::NPQ::GetCountersForStream(counters), labels, {},
{"stream.internal_write.bytes_per_second",
"stream.incoming_bytes_per_second"} , true, "name");
MsgsWritten = NKikimr::NPQ::TMultiCounter(
- NKikimr::NPQ::GetCountersForStream(counters, "writeSession"), labels, {},
+ NKikimr::NPQ::GetCountersForStream(counters), labels, {},
{"stream.internal_write.records_per_second",
"stream.incoming_records_per_second"}, true, "name");
BytesWrittenUncompressed = NKikimr::NPQ::TMultiCounter(
- NKikimr::NPQ::GetCountersForStream(counters, "writeSession"), labels, {},
+ NKikimr::NPQ::GetCountersForStream(counters), labels, {},
{"stream.internal_write.uncompressed_bytes_per_second"}, true, "name");
BytesWrittenComp = NKikimr::NPQ::TMultiCounter(
- NKikimr::NPQ::GetCountersForStream(counters, "writeSession"), labels, {},
+ NKikimr::NPQ::GetCountersForStream(counters), labels, {},
{"stream.internal_write.compacted_bytes_per_second"}, true, "name");
- TVector<NPQ::TLabelsInfo> aggr = {{{{"Account", NPersQueue::GetAccount(TopicName)}}, {"total"}}};
+ TVector<NPQ::TLabelsInfo> aggr = {{{{"Account", NPersQueue::GetAccount(topicName)}}, {"total"}}};
ui32 border = AppData(ctx)->PQConfig.GetWriteLatencyBigMs();
auto subGroup = GetServiceCounters(counters, "pqproxy|SLI");
WriteLatency = NKikimr::NPQ::CreateSLIDurationCounter(subGroup, aggr, "Write", border,
@@ -895,7 +903,7 @@ void TPartition::SetupStreamCounters(const TActorContext& ctx) {
if (IsQuotingEnabled() && !TopicWriteQuotaResourcePath.empty()) {
TopicWriteQuotaWaitCounter = THolder<NKikimr::NPQ::TPercentileCounter>(
new NKikimr::NPQ::TPercentileCounter(
- GetCountersForStream(counters, "topicWriteQuotaWait"), labels,
+ NKikimr::NPQ::GetCountersForStream(counters), labels,
{{"name", "stream.internal_write.topic_write_quota_wait_milliseconds"}}, "bin",
TVector<std::pair<ui64, TString>>{
{0, "0"}, {1, "1"}, {5, "5"}, {10, "10"},
@@ -906,7 +914,7 @@ void TPartition::SetupStreamCounters(const TActorContext& ctx) {
PartitionWriteQuotaWaitCounter = THolder<NKikimr::NPQ::TPercentileCounter>(
new NKikimr::NPQ::TPercentileCounter(
- GetCountersForStream(counters, "partitionWriteQuotaWait"), labels,
+ NKikimr::NPQ::GetCountersForStream(counters), labels,
{{"name", "stream.internal_write.partition_write_quota_wait_milliseconds"}}, "bin",
TVector<std::pair<ui64, TString>>{
{0, "0"}, {1, "1"}, {5, "5"}, {10, "10"},
@@ -2851,7 +2859,7 @@ void TPartition::ReadTimestampForOffset(const TString& user, TUserInfo& userInfo
THolder<TEvPQ::TEvRead> event = MakeHolder<TEvPQ::TEvRead>(0, userInfo.Offset, 0, 1, "",
user, 0, MAX_BLOB_PART_SIZE * 2, 0, 0, "",
- userInfo.DoExternalRead);
+ false);
ctx.Send(ctx.SelfID, event.Release());
Counters.Cumulative()[COUNTER_PQ_WRITE_TIMESTAMP_CACHE_MISS].Increment(1);
}
diff --git a/ydb/core/persqueue/percentile_counter.cpp b/ydb/core/persqueue/percentile_counter.cpp
index a6414e70bd..b89a5f04b9 100644
--- a/ydb/core/persqueue/percentile_counter.cpp
+++ b/ydb/core/persqueue/percentile_counter.cpp
@@ -28,11 +28,9 @@ NMonitoring::TDynamicCounterPtr GetCounters(NMonitoring::TDynamicCounterPtr coun
->GetSubgroup("Topic", realTopic);
}
-NMonitoring::TDynamicCounterPtr GetCountersForStream(NMonitoring::TDynamicCounterPtr counters,
- const TString& subsystem)
+NMonitoring::TDynamicCounterPtr GetCountersForStream(NMonitoring::TDynamicCounterPtr counters)
{
- return counters->GetSubgroup("counters", "pqproxy")
- ->GetSubgroup("subsystem", subsystem);
+ return counters->GetSubgroup("counters", "datastreams");
}
TVector<TLabelsInfo> GetLabels(const TString& topic)
diff --git a/ydb/core/persqueue/percentile_counter.h b/ydb/core/persqueue/percentile_counter.h
index 74ee43d287..409534077c 100644
--- a/ydb/core/persqueue/percentile_counter.h
+++ b/ydb/core/persqueue/percentile_counter.h
@@ -8,8 +8,7 @@ namespace NPQ {
NMonitoring::TDynamicCounterPtr GetCounters(NMonitoring::TDynamicCounterPtr counters,
const TString& subsystem,
const TString& topic);
-NMonitoring::TDynamicCounterPtr GetCountersForStream(NMonitoring::TDynamicCounterPtr counters,
- const TString& subsystem);
+NMonitoring::TDynamicCounterPtr GetCountersForStream(NMonitoring::TDynamicCounterPtr counters);
struct TLabelsInfo {
TVector<std::pair<TString,TString>> Labels;
diff --git a/ydb/core/persqueue/pq_impl.cpp b/ydb/core/persqueue/pq_impl.cpp
index caba19df02..b7bb33a700 100644
--- a/ydb/core/persqueue/pq_impl.cpp
+++ b/ydb/core/persqueue/pq_impl.cpp
@@ -704,6 +704,7 @@ void TPersQueue::ReadConfig(const NKikimrClient::TKeyValueResponse::TReadResult&
}
TopicName = Config.GetTopicName();
+ TopicPath = Config.GetTopicPath();
LocalDC = Config.GetLocalDC();
KeySchema.clear();
diff --git a/ydb/core/persqueue/user_info.cpp b/ydb/core/persqueue/user_info.cpp
index 26811765e2..bdf1d51a05 100644
--- a/ydb/core/persqueue/user_info.cpp
+++ b/ydb/core/persqueue/user_info.cpp
@@ -37,7 +37,8 @@ TUsersInfoStorage::TUsersInfoStorage(
const NKikimrPQ::TPQTabletConfig& config,
const TString& cloudId,
const TString& dbId,
- const TString& folderId
+ const TString& folderId,
+ const TString& streamName
)
: DCId(std::move(dcId))
, TabletId(tabletId)
@@ -47,6 +48,7 @@ TUsersInfoStorage::TUsersInfoStorage(
, CloudId(cloudId)
, DbId(dbId)
, FolderId(folderId)
+ , StreamName(streamName)
{
Counters.Populate(counters);
}
@@ -162,8 +164,9 @@ TUserInfo& TUsersInfoStorage::Create(
auto result = UsersInfo.emplace(
std::piecewise_construct,
std::forward_as_tuple(user),
- std::forward_as_tuple(ctx, CreateReadSpeedLimiter(user), user, readRuleGeneration, important, TopicName, Partition, session,
- gen, step, offset, readOffsetRewindSum, DCId, readFromTimestamp, CloudId, DbId, FolderId, burst, speed)
+ std::forward_as_tuple(ctx, CreateReadSpeedLimiter(user), user, readRuleGeneration, important,
+ TopicName, Partition, session, gen, step, offset, readOffsetRewindSum, DCId,
+ readFromTimestamp, CloudId, DbId, FolderId, burst, speed, StreamName)
);
Y_VERIFY(result.second);
return result.first->second;
diff --git a/ydb/core/persqueue/user_info.h b/ydb/core/persqueue/user_info.h
index 258e947c46..1752ea0b3b 100644
--- a/ydb/core/persqueue/user_info.h
+++ b/ydb/core/persqueue/user_info.h
@@ -228,7 +228,7 @@ struct TUserInfo {
}
void ForgetSubscription(const TInstant& now) {
- if(Subscriptions > 0)
+ if (Subscriptions > 0)
--Subscriptions;
UpdateReadingTimeAndState(now);
}
@@ -290,7 +290,7 @@ struct TUserInfo {
const ui64 readRuleGeneration, const bool important, const TString& topic, const ui32 partition, const TString &session,
ui32 gen, ui32 step, i64 offset, const ui64 readOffsetRewindSum, const TString& dcId,
TInstant readFromTimestamp, const TString& cloudId, const TString& dbId, const TString& folderId,
- ui64 burst = 1'000'000'000, ui64 speed = 1'000'000'000
+ ui64 burst = 1'000'000'000, ui64 speed = 1'000'000'000, const TString& streamName = "undefined"
)
: ReadSpeedLimiter(std::move(readSpeedLimiter))
, Session(session)
@@ -324,7 +324,7 @@ struct TUserInfo {
{
if (AppData(ctx)->Counters) {
if (AppData()->PQConfig.GetTopicsAreFirstClassCitizen()) {
- SetupStreamCounters(ctx, dcId, ToString<ui32>(partition), topic, cloudId, dbId, folderId);
+ SetupStreamCounters(ctx, dcId, ToString<ui32>(partition), streamName, cloudId, dbId, folderId);
} else {
if (topic.find("--") == TString::npos)
return;
@@ -336,38 +336,38 @@ struct TUserInfo {
void SetupStreamCounters(const TActorContext& ctx, const TString& dcId, const TString& partition,
const TString& topic, const TString& cloudId,
const TString& dbId, const TString& folderId) {
- auto subgroup = [&](const TString& subsystem) {
- return NKikimr::NPQ::GetCountersForStream(AppData(ctx)->Counters, subsystem);
- };
- auto additionalLabels = [&](const TVector<std::pair<TString, TString>>& subgroups = {}) {
+ auto additionalLabels = [](const TVector<std::pair<TString, TString>>& subgroups = {}) {
TVector<std::pair<TString, TString>> result;
- if (User != CLIENTID_TO_READ_INTERNALLY)
- result.push_back({"consumer", User});
-
- for (const auto& sb : subgroups) {
- result.push_back(sb);
- }
+ std::copy_if(subgroups.begin(), subgroups.end(), std::back_inserter(result),
+ [] (const auto& sb) {
+ return sb.first != "consumer" ||
+ sb.second != CLIENTID_TO_READ_INTERNALLY;
+ });
return result;
};
const TVector<NPQ::TLabelsInfo> aggregates =
NKikimr::NPQ::GetLabelsForStream(topic, cloudId, dbId, folderId);
- if (DoExternalRead) {
- BytesRead = TMultiCounter(subgroup("readSession"), aggregates, additionalLabels(),
- {"stream.internal_read.bytes_per_second",
- "stream.outcoming_bytes_per_second"}, true);
- MsgsRead = TMultiCounter(subgroup("readSession"), aggregates, additionalLabels(),
- {"stream.internal_read.records_per_second",
- "stream.outcoming_records_per_second"}, true);
- }
-
- Counter.SetCounter(subgroup("readingTime"),
- additionalLabels({{"host", dcId}, {"shard", partition}, {"stream", topic}}),
+ BytesRead = TMultiCounter(NKikimr::NPQ::GetCountersForStream(AppData(ctx)->Counters),
+ aggregates, additionalLabels({{"consumer", User}}),
+ {"stream.internal_read.bytes_per_second",
+ "stream.outgoing_bytes_per_second"}, true, "name");
+ MsgsRead = TMultiCounter(NKikimr::NPQ::GetCountersForStream(AppData(ctx)->Counters),
+ aggregates, additionalLabels({{"consumer", User}}),
+ {"stream.internal_read.records_per_second",
+ "stream.outgoing_records_per_second"}, true, "name");
+
+ Counter.SetCounter(NKikimr::NPQ::GetCountersForStream(AppData(ctx)->Counters),
+ additionalLabels({{"database", dbId}, {"cloud", cloudId}, {"folder", folderId},
+ {"stream", topic}, {"consumer", User}, {"host", dcId},
+ {"shard", partition}}),
{"name", "stream.await_operating_milliseconds", true});
- ReadTimeLag.reset(new TPercentileCounter(subgroup("readTimeLag"), aggregates,
- additionalLabels({{"name", "stream.internal_read.time_lags_milliseconds"}}), "bin",
- TVector<std::pair<ui64, TString>>{{100, "100"}, {200, "200"}, {500, "500"},
+ ReadTimeLag.reset(new TPercentileCounter(
+ NKikimr::NPQ::GetCountersForStream(AppData(ctx)->Counters), aggregates,
+ additionalLabels({{"consumer", User},
+ {"name", "stream.internal_read.time_lags_milliseconds"}}), "bin",
+ TVector<std::pair<ui64, TString>>{{100, "100"}, {200, "200"}, {500, "500"},
{1000, "1000"}, {2000, "2000"},
{5000, "5000"}, {10'000, "10000"},
{30'000, "30000"}, {60'000, "60000"},
@@ -509,7 +509,8 @@ class TUsersInfoStorage {
public:
TUsersInfoStorage(TString dcId, ui64 tabletId, const TString& topicName, ui32 partition,
const TTabletCountersBase& counters, const NKikimrPQ::TPQTabletConfig& config,
- const TString& CloudId, const TString& DbId, const TString& FolderId);
+ const TString& CloudId, const TString& DbId, const TString& FolderId,
+ const TString& streamName);
void Init(TActorId tabletActor, TActorId partitionActor);
@@ -553,6 +554,7 @@ private:
TString CloudId;
TString DbId;
TString FolderId;
+ TString StreamName;
};
} //NPQ
diff --git a/ydb/core/tx/schemeshard/schemeshard__init.cpp b/ydb/core/tx/schemeshard/schemeshard__init.cpp
index 049afed7ac..ffdfc48114 100644
--- a/ydb/core/tx/schemeshard/schemeshard__init.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__init.cpp
@@ -1374,7 +1374,7 @@ struct TSchemeShard::TTxInit : public TTransactionBase<TSchemeShard> {
Self->RootPathElemets = SplitPath(rootPath->Name);
Y_VERIFY(!rootPath->StepDropped);
- Self->PathsById[rootPath->PathId ] = rootPath;
+ Self->PathsById[rootPath->PathId] = rootPath;
pathesRows.pop_front();
}
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_alter_pq.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_alter_pq.cpp
index c7480d3bee..9263fbc929 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation_alter_pq.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__operation_alter_pq.cpp
@@ -75,6 +75,7 @@ public:
}
TPersQueueGroupInfo::TPtr ParseParams(
+ TOperationContext& context,
NKikimrPQ::TPQTabletConfig* tabletConfig,
const NKikimrSchemeOp::TPersQueueGroupDescription& alter,
TString& errStr)
@@ -138,6 +139,22 @@ public:
return nullptr;
}
+ const TPathElement::TPtr dbRootEl = context.SS->PathsById.at(context.SS->RootPathId());
+ if (dbRootEl->UserAttrs->Attrs.contains("cloud_id")) {
+ auto cloudId = dbRootEl->UserAttrs->Attrs.at("cloud_id");
+ alterConfig.SetYcCloudId(cloudId);
+ }
+ if (dbRootEl->UserAttrs->Attrs.contains("folder_id")) {
+ auto folderId = dbRootEl->UserAttrs->Attrs.at("folder_id");
+ alterConfig.SetYcFolderId(folderId);
+ }
+ if (dbRootEl->UserAttrs->Attrs.contains("database_id")) {
+ auto databaseId = dbRootEl->UserAttrs->Attrs.at("database_id");
+ alterConfig.SetYdbDatabaseId(databaseId);
+ }
+ const TString databasePath = TPath::Init(context.SS->RootPathId(), context.SS).PathString();
+ alterConfig.SetYdbDatabasePath(databasePath);
+
alterConfig.MutablePartitionKeySchema()->Swap(tabletConfig->MutablePartitionKeySchema());
Y_PROTOBUF_SUPPRESS_NODISCARD alterConfig.SerializeToString(&params->TabletConfig);
alterConfig.Swap(tabletConfig);
@@ -436,7 +453,7 @@ public:
newTabletConfig = tabletConfig;
- TPersQueueGroupInfo::TPtr alterData = ParseParams(&newTabletConfig, alter, errStr);
+ TPersQueueGroupInfo::TPtr alterData = ParseParams(context, &newTabletConfig, alter, errStr);
if (!alterData) {
result->SetError(NKikimrScheme::StatusInvalidParameter, errStr);
return result;
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_create_pq.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_create_pq.cpp
index 87c56a7466..22ba356eeb 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation_create_pq.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__operation_create_pq.cpp
@@ -12,7 +12,8 @@ namespace {
using namespace NKikimr;
using namespace NSchemeShard;
-TPersQueueGroupInfo::TPtr CreatePersQueueGroup(const NKikimrSchemeOp::TPersQueueGroupDescription& op,
+TPersQueueGroupInfo::TPtr CreatePersQueueGroup(TOperationContext& context,
+ const NKikimrSchemeOp::TPersQueueGroupDescription& op,
TEvSchemeShard::EStatus& status, TString& errStr)
{
TPersQueueGroupInfo::TPtr pqGroupInfo = new TPersQueueGroupInfo;
@@ -146,6 +147,22 @@ TPersQueueGroupInfo::TPtr CreatePersQueueGroup(const NKikimrSchemeOp::TPersQueue
return nullptr;
}
+ const TPathElement::TPtr dbRootEl = context.SS->PathsById.at(context.SS->RootPathId());
+ if (dbRootEl->UserAttrs->Attrs.contains("cloud_id")) {
+ auto cloudId = dbRootEl->UserAttrs->Attrs.at("cloud_id");
+ tabletConfig.SetYcCloudId(cloudId);
+ }
+ if (dbRootEl->UserAttrs->Attrs.contains("folder_id")) {
+ auto folderId = dbRootEl->UserAttrs->Attrs.at("folder_id");
+ tabletConfig.SetYcFolderId(folderId);
+ }
+ if (dbRootEl->UserAttrs->Attrs.contains("database_id")) {
+ auto databaseId = dbRootEl->UserAttrs->Attrs.at("database_id");
+ tabletConfig.SetYdbDatabaseId(databaseId);
+ }
+ const TString databasePath = TPath::Init(context.SS->RootPathId(), context.SS).PathString();
+ tabletConfig.SetYdbDatabasePath(databasePath);
+
Y_PROTOBUF_SUPPRESS_NODISCARD tabletConfig.SerializeToString(&pqGroupInfo->TabletConfig);
if (op.HasBootstrapConfig()) {
@@ -360,7 +377,7 @@ public:
}
TPersQueueGroupInfo::TPtr pqGroup = CreatePersQueueGroup(
- createDEscription, status, errStr);
+ context, createDEscription, status, errStr);
if (!pqGroup.Get()) {
result->SetError(status, errStr);
diff --git a/ydb/library/persqueue/tests/counters.h b/ydb/library/persqueue/tests/counters.h
index 71e802965d..72bd545e45 100644
--- a/ydb/library/persqueue/tests/counters.h
+++ b/ydb/library/persqueue/tests/counters.h
@@ -1,71 +1,126 @@
-#pragma once
-
-#include <library/cpp/testing/unittest/registar.h>
-#include <library/cpp/http/io/stream.h>
-#include <library/cpp/json/json_reader.h>
-#include <library/cpp/json/json_value.h>
-
-#include <util/string/builder.h>
-#include <util/string/vector.h>
-#include <util/string/join.h>
-#include <util/network/socket.h>
-
-
-
-namespace NKikimr::NPersQueueTests {
-
-NJson::TJsonValue GetCounters(ui16 port, const TString& counters, const TString& subsystem, const TString& topicPath, const TString& clientDc = "", const TString& originalDc = "") {
- TString dcFilter = "";
- {
- auto prepareDC = [](TString dc) {
- dc.front() = std::toupper(dc.front());
- return dc;
- };
- if (originalDc) {
- dcFilter = TStringBuilder() << "/OriginDC=" << prepareDC(originalDc);
- } else if (clientDc) {
- dcFilter = TStringBuilder() << "/ClientDC=" << prepareDC(clientDc);
- }
- }
- TVector<TString> pathItems = SplitString(topicPath, "/");
- UNIT_ASSERT(pathItems.size() >= 2);
- TString account = pathItems.front();
- TString producer = JoinRange("@", pathItems.begin(), pathItems.end() - 1);
- TString query = TStringBuilder() << "/counters/counters=" << counters
- << "/subsystem=" << subsystem << "/Account=" << account << "/Producer=" << producer
- << "/Topic=" << Join("--", producer, pathItems.back()) << "/TopicPath=" << JoinRange("%2F", pathItems.begin(), pathItems.end())
- << dcFilter << "/json";
-
-
- TNetworkAddress addr("localhost", port);
- TString q2 = TStringBuilder() << "/counters/counters=" << counters
- << "/subsystem=" << subsystem;// << "/Account=" << account << "/Producer=" << producer;
- //q2 += "/json";
-
- /*Cerr << "q2: " << q2 << Endl;
- TSocket s2(addr);
- SendMinimalHttpRequest(s2, "localhost", q2);
- TSocketInput si2(s2);
- THttpInput input2(&si2);
- Cerr << "===Counters response: " << input2.ReadAll() << Endl;*/
-
-
-
- Cerr << "===Request counters with query: " << query << Endl;
- TSocket s(addr);
- SendMinimalHttpRequest(s, "localhost", query);
- TSocketInput si(s);
- THttpInput input(&si);
- TString firstLine = input.FirstLine();
- //Cerr << "Counters2: '" << firstLine << "' content: " << input.ReadAll() << Endl;
-
- unsigned httpCode = ParseHttpRetCode(firstLine);
- UNIT_ASSERT_VALUES_EQUAL(httpCode, 200u);
- NJson::TJsonValue value;
- UNIT_ASSERT(NJson::ReadJsonTree(&input, &value));
-
- Cerr << "counters: " << value.GetStringRobust() << "\n";
- return value;
-}
-
-} // NKikimr::NPersQueueTests
+#pragma once
+
+#include <library/cpp/testing/unittest/registar.h>
+#include <library/cpp/http/io/stream.h>
+#include <library/cpp/json/json_reader.h>
+#include <library/cpp/json/json_value.h>
+
+#include <util/string/builder.h>
+#include <util/string/vector.h>
+#include <util/string/join.h>
+#include <util/network/socket.h>
+
+
+namespace NKikimr::NPersQueueTests {
+
+namespace {
+ NJson::TJsonValue SendQuery(ui16 port, const TString& query, bool mayFail = false) {
+ Cerr << "===Request counters with query: " << query << Endl;
+ TNetworkAddress addr("localhost", port);
+ TSocket s(addr);
+ SendMinimalHttpRequest(s, "localhost", query);
+ TSocketInput si(s);
+ THttpInput input(&si);
+ TString firstLine = input.FirstLine();
+
+ const auto httpCode = ParseHttpRetCode(firstLine);
+ if (mayFail && httpCode != 200u) {
+ return {};
+ } else {
+ UNIT_ASSERT_VALUES_EQUAL(httpCode, 200u);
+ }
+ NJson::TJsonValue value;
+ UNIT_ASSERT(NJson::ReadJsonTree(&input, &value));
+
+ Cerr << "counters: " << value.GetStringRobust() << "\n";
+ return value;
+ }
+}
+
+NJson::TJsonValue GetCountersLegacy(ui16 port, const TString& counters, const TString& subsystem,
+ const TString& topicPath, const TString& clientDc = "",
+ const TString& originalDc = "", const TString& client = "",
+ const TString& consumerPath = "") {
+ TString dcFilter = "";
+ {
+ if (originalDc) {
+ dcFilter = TStringBuilder() << "/OriginDC=" << originalDc;
+ } else if (clientDc) {
+ dcFilter = TStringBuilder() << "/ClientDC=" << clientDc;
+ }
+ }
+ TVector<TString> pathItems = SplitString(topicPath, "/");
+ UNIT_ASSERT(pathItems.size() >= 2);
+ TString account = pathItems.front();
+ TString producer = JoinRange("@", pathItems.begin(), pathItems.end() - 1);
+ TStringBuilder queryBuilder = TStringBuilder() <<
+ "/counters/counters=" << counters <<
+ "/subsystem=" << subsystem <<
+ "/Account=" << account <<
+ "/Producer=" << producer <<
+ "/Topic=" << Join("--", producer, pathItems.back()) <<
+ "/TopicPath=" << JoinRange("%2F", pathItems.begin(), pathItems.end()) <<
+ dcFilter;
+
+ if (consumerPath) {
+ auto consumerPathItems = SplitString(consumerPath, "/");
+ queryBuilder <<
+ "/Client=" << client <<
+ "/ConsumerPath=" << JoinRange("%2F", consumerPathItems.begin(), consumerPathItems.end());
+ }
+ queryBuilder << "/json";
+
+ return SendQuery(port, queryBuilder);
+}
+
+ NJson::TJsonValue GetClientCountersLegacy(ui16 port, const TString& counters, const TString& subsystem,
+ const TString& client, const TString& consumerPath) {
+ TVector<TString> consumerPathItems = SplitString(consumerPath, "/");
+ UNIT_ASSERT(consumerPathItems.size() >= 2);
+ TStringBuilder queryBuilder = TStringBuilder() <<
+ "/counters/counters=" << counters <<
+ "/subsystem=" << subsystem <<
+ "/Client=" << client <<
+ "/ConsumerPath=" << JoinRange("%2F", consumerPathItems.begin(), consumerPathItems.end()) <<
+ "/json";
+
+ return SendQuery(port, queryBuilder);
+}
+
+NJson::TJsonValue GetCounters1stClass(ui16 port, const TString& counters,
+ const TString& cloudId, const TString& databaseId,
+ const TString& folderId, const TString& streamName,
+ const TString& consumer, const TString& host,
+ const TString& shard) {
+ bool mayFail = false;
+ TVector<TString> pathItems = SplitString(streamName, "/");
+ TStringBuilder queryBuilder;
+ queryBuilder <<
+ "/counters/counters=" << counters <<
+ "/database=" << databaseId <<
+ "/cloud=" << cloudId <<
+ "/folder=" << folderId <<
+ "/stream=" << JoinRange("%2F", pathItems.begin(), pathItems.end());
+
+ if (consumer) {
+ queryBuilder <<
+ "/consumer=" << consumer;
+ }
+
+ if (host) {
+ queryBuilder <<
+ "/host=" << host;
+ }
+
+ if (shard) {
+ queryBuilder <<
+ "/shard=" << shard;
+ mayFail = true;
+ }
+
+ queryBuilder << "/json";
+
+ return SendQuery(port, queryBuilder, mayFail);
+}
+
+} // NKikimr::NPersQueueTests
diff --git a/ydb/public/api/protos/draft/datastreams.proto b/ydb/public/api/protos/draft/datastreams.proto
index 5c1354288a..4e80c1cbb8 100644
--- a/ydb/public/api/protos/draft/datastreams.proto
+++ b/ydb/public/api/protos/draft/datastreams.proto
@@ -110,7 +110,7 @@ message StreamDescription {
// Represents range of possible sequence numbers for the shard
message SequenceNumberRange {
string starting_sequence_number = 1;
- string ending_sequence_number = 2;
+ string ending_sequence_number = 2 [(FieldTransformer) = TRANSFORM_EMPTY_TO_NOTHING];
}
// Represents shard details
diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/test_server.h b/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/test_server.h
index 250ce03620..40559602f6 100644
--- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/test_server.h
+++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/test_server.h
@@ -39,11 +39,11 @@ public:
StartServer();
}
- void StartServer(bool doClientInit = true) {
+ void StartServer(bool doClientInit = true, TMaybe<TString> databaseName = Nothing()) {
PrepareNetDataFile();
CleverServer = MakeHolder<NKikimr::Tests::TServer>(ServerSettings);
CleverServer->EnableGRpc(GrpcServerOptions);
- AnnoyingClient = MakeHolder<NKikimr::NPersQueueTests::TFlatMsgBusPQClient>(ServerSettings, GrpcPort);
+ AnnoyingClient = MakeHolder<NKikimr::NPersQueueTests::TFlatMsgBusPQClient>(ServerSettings, GrpcPort, databaseName);
EnableLogs(LOGGED_SERVICES);
if (doClientInit) {
AnnoyingClient->FullInit();
diff --git a/ydb/services/persqueue_v1/grpc_pq_read_actor.cpp b/ydb/services/persqueue_v1/grpc_pq_read_actor.cpp
index 509eb2a231..784b004fd8 100644
--- a/ydb/services/persqueue_v1/grpc_pq_read_actor.cpp
+++ b/ydb/services/persqueue_v1/grpc_pq_read_actor.cpp
@@ -142,8 +142,7 @@ private:
void Handle(TEvPQProxy::TEvReleasePartition::TPtr& ev, const NActors::TActorContext& ctx);
void Handle(TEvPQProxy::TEvLockPartition::TPtr& ev, const NActors::TActorContext& ctx);
- void Handle(TEvPQProxy::TEvGetStatus::
- TPtr& ev, const NActors::TActorContext& ctx);
+ void Handle(TEvPQProxy::TEvGetStatus::TPtr& ev, const NActors::TActorContext& ctx);
void Handle(TEvPQProxy::TEvDeadlineExceeded::TPtr& ev, const NActors::TActorContext& ctx);
@@ -270,7 +269,6 @@ TReadSessionActor::TReadSessionActor(
, TopicsHandler(topicsHandler)
{
Y_ASSERT(Request);
- ++(*GetServiceCounters(Counters, "pqproxy|readSession")->GetCounter("SessionsCreatedTotal", true));
}
@@ -279,6 +277,11 @@ TReadSessionActor::~TReadSessionActor() = default;
void TReadSessionActor::Bootstrap(const TActorContext& ctx) {
Y_VERIFY(Request);
+ if (!AppData(ctx)->PQConfig.GetTopicsAreFirstClassCitizen()) {
+ ++(*GetServiceCounters(Counters, "pqproxy|readSession")
+ ->GetNamedCounter("sensor", "SessionsCreatedTotal", true));
+ }
+
Request->GetStreamCtx()->Attach(ctx.SelfID);
if (!Request->GetStreamCtx()->Read()) {
LOG_INFO_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " grpc read failed at start");
@@ -464,13 +467,13 @@ void TReadSessionActor::Die(const TActorContext& ctx) {
}
LOG_INFO_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " is DEAD");
- if (SessionsActive) {
- --(*SessionsActive);
- }
if (BytesInflight) {
(*BytesInflight) -= BytesInflight_;
}
- if (SessionsActive) { //PartsPerSession is inited too
+ if (SessionsActive) {
+ --(*SessionsActive);
+ }
+ if (SessionsActive) {
PartsPerSession.DecFor(Partitions.size(), 1);
}
@@ -549,11 +552,13 @@ void TReadSessionActor::Handle(TEvPQProxy::TEvStartRead::TPtr& ev, const TActorC
if (it == Partitions.end() || it->second.Releasing) {
//do nothing - already released partition
LOG_WARN_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " got NOTACTUAL StartRead from client for " << ev->Get()->Partition
- << " at offset " << ev->Get()->ReadOffset);
+ << " at offset " << ev->Get()->ReadOffset);
return;
}
- LOG_INFO_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " got StartRead from client for " << ev->Get()->Partition
- << " at readOffset " << ev->Get()->ReadOffset << " commitOffset " << ev->Get()->CommitOffset);
+ LOG_INFO_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " got StartRead from client for "
+ << ev->Get()->Partition <<
+ " at readOffset " << ev->Get()->ReadOffset <<
+ " commitOffset " << ev->Get()->CommitOffset);
//proxy request to partition - allow initing
//TODO: add here VerifyReadOffset too and check it againts Committed position
@@ -597,10 +602,14 @@ void TReadSessionActor::DropPartition(THashMap<ui64, TPartitionActorInfo>::itera
Y_VERIFY(res);
}
- PartsPerSession.DecFor(Partitions.size(), 1);
+ if (SessionsActive) {
+ PartsPerSession.DecFor(Partitions.size(), 1);
+ }
BalancerGeneration.erase(it->first);
Partitions.erase(it);
- PartsPerSession.IncFor(Partitions.size(), 1);
+ if (SessionsActive) {
+ PartsPerSession.IncFor(Partitions.size(), 1);
+ }
}
@@ -696,7 +705,11 @@ void TReadSessionActor::Handle(TEvPQProxy::TEvReadInit::TPtr& ev, const TActorCo
}
ClientId = NPersQueue::ConvertNewConsumerName(init.consumer(), ctx);
- ClientPath = init.consumer();
+ if (AppData(ctx)->PQConfig.GetTopicsAreFirstClassCitizen()) {
+ ClientPath = init.consumer();
+ } else {
+ ClientPath = NPersQueue::NormalizeFullPath(NPersQueue::MakeConsumerPath(init.consumer()));
+ }
TStringBuilder session;
session << ClientPath << "_" << ctx.SelfID.NodeId() << "_" << Cookie << "_" << TAppData::RandomProvider->GenRand64() << "_v1";
@@ -750,7 +763,10 @@ void TReadSessionActor::Handle(TEvPQProxy::TEvReadInit::TPtr& ev, const TActorCo
}
LOG_INFO_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " init: " << event->Request << " from " << PeerName);
- SetupCounters();
+
+ if (!AppData(ctx)->PQConfig.GetTopicsAreFirstClassCitizen()) {
+ SetupCounters();
+ }
if (Request->GetInternalToken().empty()) {
if (AppData(ctx)->PQConfig.GetRequireCredentialsInNewProtocol()) {
@@ -810,23 +826,31 @@ void TReadSessionActor::RegisterSessions(const TActorContext& ctx) {
void TReadSessionActor::SetupCounters()
{
- auto subGroup = GetServiceCounters(Counters, "pqproxy|readSession")->GetSubgroup("Client", ClientId)->GetSubgroup("ConsumerPath", ClientPath);
- SessionsCreated = subGroup->GetExpiringCounter("SessionsCreated", true);
- SessionsActive = subGroup->GetExpiringCounter("SessionsActive", false);
- Errors = subGroup->GetExpiringCounter("Errors", true);
- PipeReconnects = subGroup->GetExpiringCounter("PipeReconnects", true);
-
- BytesInflight = subGroup->GetExpiringCounter("BytesInflight", false);
+ if (SessionsCreated) {
+ return;
+ }
- PartsPerSession = NKikimr::NPQ::TPercentileCounter(subGroup->GetSubgroup("sensor", "PartsPerSession"), {}, {}, "Count",
- TVector<std::pair<ui64, TString>>{{1, "1"}, {2, "2"}, {5, "5"},
- {10, "10"}, {20, "20"}, {50, "50"}, {70, "70"},
- {100, "100"}, {150, "150"}, {300,"300"}, {99999999, "99999999"}}, false);
+ auto subGroup = GetServiceCounters(Counters, "pqproxy|readSession");
+ subGroup = subGroup->GetSubgroup("Client", ClientId)->GetSubgroup("ConsumerPath", ClientPath);
+ const TString name = "sensor";
+
+ BytesInflight = subGroup->GetExpiringNamedCounter(name, "BytesInflight", false);
+ Errors = subGroup->GetExpiringNamedCounter(name, "Errors", true);
+ PipeReconnects = subGroup->GetExpiringNamedCounter(name, "PipeReconnects", true);
+ SessionsActive = subGroup->GetExpiringNamedCounter(name, "SessionsActive", false);
+ SessionsCreated = subGroup->GetExpiringNamedCounter(name, "SessionsCreated", true);
+ PartsPerSession = NKikimr::NPQ::TPercentileCounter(
+ subGroup->GetSubgroup(name, "PartsPerSession"),
+ {}, {}, "Count",
+ TVector<std::pair<ui64, TString>>{{1, "1"}, {2, "2"}, {5, "5"},
+ {10, "10"}, {20, "20"}, {50, "50"},
+ {70, "70"}, {100, "100"}, {150, "150"},
+ {300,"300"}, {99999999, "99999999"}},
+ false, true);
++(*SessionsCreated);
++(*SessionsActive);
PartsPerSession.IncFor(Partitions.size(), 1); //for 0
-
}
@@ -856,19 +880,19 @@ void TReadSessionActor::SetupTopicCounters(const TString& topic, const TString&
const TString& dbId, const TString& folderId)
{
auto& topicCounters = TopicCounters[topic];
- auto subGroup = NKikimr::NPQ::GetCountersForStream(Counters, "readSession");
+ auto subGroup = NKikimr::NPQ::GetCountersForStream(Counters);
//client/consumerPath Account/Producer OriginDC Topic/TopicPath
TVector<NPQ::TLabelsInfo> aggr = NKikimr::NPQ::GetLabelsForStream(topic, cloudId, dbId, folderId);
- TVector<std::pair<TString, TString>> cons{};
+ TVector<std::pair<TString, TString>> cons = {{"consumer", ClientPath}};
- topicCounters.PartitionsLocked = NKikimr::NPQ::TMultiCounter(subGroup, aggr, cons, {"stream.internal_read.partitions_locked_per_second"}, true);
- topicCounters.PartitionsReleased = NKikimr::NPQ::TMultiCounter(subGroup, aggr, cons, {"stream.internal_read.partitions_released_per_second"}, true);
- topicCounters.PartitionsToBeReleased = NKikimr::NPQ::TMultiCounter(subGroup, aggr, cons, {"stream.internal_read.partitions_to_be_released"}, false);
- topicCounters.PartitionsToBeLocked = NKikimr::NPQ::TMultiCounter(subGroup, aggr, cons, {"stream.internal_read.partitions_to_be_locked"}, false);
- topicCounters.PartitionsInfly = NKikimr::NPQ::TMultiCounter(subGroup, aggr, cons, {"stream.internal_read.partitions_locked"}, false);
- topicCounters.Errors = NKikimr::NPQ::TMultiCounter(subGroup, aggr, cons, {"stream.internal_read.partitions_errors_per_second"}, true);
- topicCounters.Commits = NKikimr::NPQ::TMultiCounter(subGroup, aggr, cons, {"stream.internal_read.commits_per_second"}, true);
- topicCounters.WaitsForData = NKikimr::NPQ::TMultiCounter(subGroup, aggr, cons, {"stream.internal_read.waits_for_data"}, true);
+ topicCounters.PartitionsLocked = NKikimr::NPQ::TMultiCounter(subGroup, aggr, cons, {"stream.internal_read.partitions_locked_per_second"}, true, "name");
+ topicCounters.PartitionsReleased = NKikimr::NPQ::TMultiCounter(subGroup, aggr, cons, {"stream.internal_read.partitions_released_per_second"}, true, "name");
+ topicCounters.PartitionsToBeReleased = NKikimr::NPQ::TMultiCounter(subGroup, aggr, cons, {"stream.internal_read.partitions_to_be_released"}, false, "name");
+ topicCounters.PartitionsToBeLocked = NKikimr::NPQ::TMultiCounter(subGroup, aggr, cons, {"stream.internal_read.partitions_to_be_locked"}, false, "name");
+ topicCounters.PartitionsInfly = NKikimr::NPQ::TMultiCounter(subGroup, aggr, cons, {"stream.internal_read.partitions_locked"}, false, "name");
+ topicCounters.Errors = NKikimr::NPQ::TMultiCounter(subGroup, aggr, cons, {"stream.internal_read.partitions_errors_per_second"}, true, "name");
+ topicCounters.Commits = NKikimr::NPQ::TMultiCounter(subGroup, aggr, cons, {"stream.internal_read.commits_per_second"}, true, "name");
+ topicCounters.WaitsForData = NKikimr::NPQ::TMultiCounter(subGroup, aggr, cons, {"stream.internal_read.waits_for_data"}, true, "name");
topicCounters.CommitLatency = CommitLatency;
topicCounters.SLIBigLatency = SLIBigLatency;
@@ -931,6 +955,9 @@ void TReadSessionActor::Handle(TEvPQProxy::TEvAuthResultOk::TPtr& ev, const TAct
auto& topicHolder = Topics[t.TopicNameConverter->GetClientsideName()];
topicHolder.TabletID = t.TabletID;
topicHolder.TopicNameConverter = t.TopicNameConverter;
+ topicHolder.CloudId = t.CloudId;
+ topicHolder.DbId = t.DbId;
+ topicHolder.FolderId = t.FolderId;
FullPathToConverter[t.TopicNameConverter->GetPrimaryPath()] = t.TopicNameConverter;
}
@@ -1012,11 +1039,15 @@ void TReadSessionActor::Handle(TEvPersQueue::TEvLockPartition::TPtr& ev, const T
record.GetStep(), record.GetTabletId(), it->second, CommitsDisabled, ClientDC);
TActorId actorId = ctx.Register(partitionActor);
- PartsPerSession.DecFor(Partitions.size(), 1);
+ if (SessionsActive) {
+ PartsPerSession.DecFor(Partitions.size(), 1);
+ }
Y_VERIFY(record.GetGeneration() > 0);
auto pp = Partitions.insert(std::make_pair(assignId, TPartitionActorInfo{actorId, partitionId, ctx}));
Y_VERIFY(pp.second);
- PartsPerSession.IncFor(Partitions.size(), 1);
+ if (SessionsActive) {
+ PartsPerSession.IncFor(Partitions.size(), 1);
+ }
bool res = ActualPartitionActors.insert(actorId).second;
Y_VERIFY(res);
@@ -1234,7 +1265,7 @@ void TReadSessionActor::CloseSession(const TString& errorReason, const PersQueue
}
if (Errors) {
++(*Errors);
- } else {
+ } else if (!AppData(ctx)->PQConfig.GetTopicsAreFirstClassCitizen()) {
++(*GetServiceCounters(Counters, "pqproxy|readSession")->GetCounter("Errors", true));
}
@@ -1349,8 +1380,12 @@ bool TReadSessionActor::ProcessBalancerDead(const ui64 tablet, const TActorConte
clientConfig.RetryPolicy = RetryPolicyForPipes;
t.second.PipeClient = ctx.RegisterWithSameMailbox(NTabletPipe::CreateClient(ctx.SelfID, t.second.TabletID, clientConfig));
if (InitDone) {
- ++(*PipeReconnects);
- ++(*Errors);
+ if (PipeReconnects) {
+ ++(*PipeReconnects);
+ }
+ if (Errors) {
+ ++(*Errors);
+ }
RegisterSession(t.second.PipeClient, t.first, t.second.Groups, ctx);
}
@@ -1447,7 +1482,9 @@ void TReadSessionActor::Handle(TEvPQProxy::TEvReadResponse::TPtr& ev, const TAct
--formedResponse->RequestsInfly;
BytesInflight_ += diff;
- (*BytesInflight) += diff;
+ if (BytesInflight) {
+ (*BytesInflight) += diff;
+ }
if (formedResponse->RequestsInfly == 0) {
ProcessAnswer(ctx, formedResponse);
@@ -1458,7 +1495,9 @@ bool TReadSessionActor::WriteResponse(PersQueue::V1::MigrationStreamingReadServe
ui64 sz = response.ByteSize();
ActiveWrites.push(sz);
BytesInflight_ += sz;
- if (BytesInflight) (*BytesInflight) += sz;
+ if (BytesInflight) {
+ (*BytesInflight) += sz;
+ }
return finish ? Request->GetStreamCtx()->WriteAndFinish(std::move(response), grpc::Status::OK) : Request->GetStreamCtx()->Write(std::move(response));
}
@@ -1490,7 +1529,9 @@ void TReadSessionActor::ProcessAnswer(const TActorContext& ctx, TFormedReadRespo
}
BytesInflight_ -= diff;
- (*BytesInflight) -= diff;
+ if (BytesInflight) {
+ (*BytesInflight) -= diff;
+ }
for (auto& pp : formedResponse->PartitionsTookPartInControlMessages) {
auto it = PartitionToControlMessages.find(pp);
@@ -1613,7 +1654,9 @@ void TReadSessionActor::ProcessReads(const TActorContext& ctx) {
i64 diff = formedResponse->Response.ByteSize();
BytesInflight_ += diff;
formedResponse->ByteSize = diff;
- (*BytesInflight) += diff;
+ if (BytesInflight) {
+ (*BytesInflight) += diff;
+ }
Reads.pop_front();
}
}
@@ -2629,6 +2672,9 @@ bool TReadInitAndAuthActor::ProcessTopicSchemeCacheResponse(
Y_VERIFY(entry.PQGroupInfo); // checked at ProcessMetaCacheTopicResponse()
auto& pqDescr = entry.PQGroupInfo->Description;
topicsIter->second.TabletID = pqDescr.GetBalancerTabletID();
+ topicsIter->second.CloudId = pqDescr.GetPQTabletConfig().GetYcCloudId();
+ topicsIter->second.DbId = pqDescr.GetPQTabletConfig().GetYdbDatabaseId();
+ topicsIter->second.FolderId = pqDescr.GetPQTabletConfig().GetYcFolderId();
return CheckTopicACL(entry, topicsIter->first, ctx);
}
diff --git a/ydb/services/persqueue_v1/grpc_pq_write_actor.cpp b/ydb/services/persqueue_v1/grpc_pq_write_actor.cpp
index 98334996a3..4964c33828 100644
--- a/ydb/services/persqueue_v1/grpc_pq_write_actor.cpp
+++ b/ydb/services/persqueue_v1/grpc_pq_write_actor.cpp
@@ -124,7 +124,6 @@ TWriteSessionActor::TWriteSessionActor(
, SourceIdUpdatesInflight(0)
{
Y_ASSERT(Request);
- ++(*GetServiceCounters(Counters, "pqproxy|writeSession")->GetCounter("SessionsCreatedTotal", true));
}
@@ -344,7 +343,7 @@ void TWriteSessionActor::SetupCounters()
{
//now topic is checked, can create group for real topic, not garbage
auto subGroup = GetServiceCounters(Counters, "pqproxy|writeSession");
- TVector<NPQ::TLabelsInfo> aggr = NKikimr::NPQ::GetLabels(LocalDC, TopicConverter->GetClientsideName());
+ TVector<NPQ::TLabelsInfo> aggr = NKikimr::NPQ::GetLabels(TopicConverter->GetClientsideName());
BytesInflight = NKikimr::NPQ::TMultiCounter(subGroup, aggr, {}, {"BytesInflight"}, false);
BytesInflightTotal = NKikimr::NPQ::TMultiCounter(subGroup, aggr, {}, {"BytesInflightTotal"}, false);
@@ -356,18 +355,17 @@ void TWriteSessionActor::SetupCounters()
SessionsActive.Inc();
}
-void TWriteSessionActor::SetupCounters(const TString& cloudId, const TString& dbId,
- const TString& folderId)
+void TWriteSessionActor::SetupCounters(const TString& cloudId, const TString& dbId, const TString& folderId)
{
//now topic is checked, can create group for real topic, not garbage
- auto subGroup = NKikimr::NPQ::GetCountersForStream(Counters, "writeSession");
+ auto subGroup = NKikimr::NPQ::GetCountersForStream(Counters);
TVector<NPQ::TLabelsInfo> aggr = NKikimr::NPQ::GetLabelsForStream(TopicConverter->GetClientsideName(), cloudId, dbId, folderId);
- BytesInflight = NKikimr::NPQ::TMultiCounter(subGroup, aggr, {}, {"stream.internal_write.bytes_proceeding"}, false);
- BytesInflightTotal = NKikimr::NPQ::TMultiCounter(subGroup, aggr, {}, {"stream.internal_write.bytes_proceeding_total"}, false);
- SessionsCreated = NKikimr::NPQ::TMultiCounter(subGroup, aggr, {}, {"stream.internal_write.sessions_created_per_second"}, true);
- SessionsActive = NKikimr::NPQ::TMultiCounter(subGroup, aggr, {}, {"stream.internal_write.sessions_active"}, false);
- Errors = NKikimr::NPQ::TMultiCounter(subGroup, aggr, {}, {"stream.internal_write.errors_per_second"}, true);
+ BytesInflight = NKikimr::NPQ::TMultiCounter(subGroup, aggr, {}, {"stream.internal_write.bytes_proceeding"}, false, "name");
+ BytesInflightTotal = NKikimr::NPQ::TMultiCounter(subGroup, aggr, {}, {"stream.internal_write.bytes_proceeding_total"}, false, "name");
+ SessionsCreated = NKikimr::NPQ::TMultiCounter(subGroup, aggr, {}, {"stream.internal_write.sessions_created_per_second"}, true, "name");
+ SessionsActive = NKikimr::NPQ::TMultiCounter(subGroup, aggr, {}, {"stream.internal_write.sessions_active"}, false, "name");
+ Errors = NKikimr::NPQ::TMultiCounter(subGroup, aggr, {}, {"stream.internal_write.errors_per_second"}, true, "name");
SessionsCreated.Inc();
SessionsActive.Inc();
diff --git a/ydb/services/persqueue_v1/persqueue_new_schemecache_ut.cpp b/ydb/services/persqueue_v1/persqueue_new_schemecache_ut.cpp
index 8e209dbd9d..257648f9c2 100644
--- a/ydb/services/persqueue_v1/persqueue_new_schemecache_ut.cpp
+++ b/ydb/services/persqueue_v1/persqueue_new_schemecache_ut.cpp
@@ -10,6 +10,7 @@
#include <ydb/library/aclib/aclib.h>
#include <ydb/library/persqueue/obfuscate/obfuscate.h>
+#include <ydb/library/persqueue/tests/counters.h>
#include <ydb/library/persqueue/topic_parser/topic_parser.h>
#include <library/cpp/testing/unittest/tests_data.h>
@@ -42,30 +43,6 @@ namespace NKikimr::NPersQueueTests {
using namespace NYdb::NPersQueue;
using namespace NPersQueue;
- NJson::TJsonValue GetCountersNewSchemeCache(ui16 port, const TString& counters, const TString& subsystem, const TString& topicPath) {
- TString escapedPath = "%2F" + JoinStrings(SplitString(topicPath, "/"), "%2F");
- TString query = TStringBuilder() << "/counters/counters=" << counters
- << "/subsystem=" << subsystem
- << "/Topic=" << escapedPath << "/json";
-
- Cerr << "Will execute query " << query << Endl;
- TNetworkAddress addr("localhost", port);
- TSocket s(addr);
-
- SendMinimalHttpRequest(s, "localhost", query);
- TSocketInput si(s);
- THttpInput input(&si);
- Cerr << input.ReadAll() << Endl;
- unsigned httpCode = ParseHttpRetCode(input.FirstLine());
- UNIT_ASSERT_VALUES_EQUAL(httpCode, 200u);
-
- NJson::TJsonValue value;
- UNIT_ASSERT(NJson::ReadJsonTree(&input, &value));
-
- Cerr << "counters: " << value.GetStringRobust() << "\n";
- return value;
- }
-
Y_UNIT_TEST_SUITE(TPersQueueNewSchemeCacheTest) {
void PrepareForGrpcNoDC(TFlatMsgBusPQClient& annoyingClient) {
@@ -122,7 +99,8 @@ namespace NKikimr::NPersQueueTests {
auto persQueueClient = MakeHolder<NYdb::NPersQueue::TPersQueueClient>(*ydbDriver);
{
- auto res = persQueueClient->AddReadRule("/Root/account2/topic2", TAddReadRuleSettings().ReadRule(TReadRuleSettings().ConsumerName("user1")));
+ auto res = persQueueClient->AddReadRule("/Root/account2/topic2",
+ TAddReadRuleSettings().ReadRule(TReadRuleSettings().ConsumerName("user1")));
res.Wait();
UNIT_ASSERT(res.GetValue().IsSuccess());
}
@@ -161,7 +139,171 @@ namespace NKikimr::NPersQueueTests {
testReadFromTopic("account2/topic2");
}
- }
+ Y_UNIT_TEST(TestWriteStat1stClass) {
+ auto testWriteStat1stClass = [](const TString& consumerName) {
+ TTestServer server(false);
+ server.ServerSettings.PQConfig.SetTopicsAreFirstClassCitizen(true);
+ server.StartServer();
+ server.EnableLogs({NKikimrServices::PQ_READ_PROXY, NKikimrServices::TX_PROXY_SCHEME_CACHE});
+
+ const TString topicName{"account2/topic2"};
+ const TString fullTopicName{"/Root/account2/topic2"};
+ const TString folderId{"somefolder"};
+ const TString cloudId{"somecloud"};
+ const TString databaseId{"root"};
+ UNIT_ASSERT_VALUES_EQUAL(NMsgBusProxy::MSTATUS_OK,
+ server.AnnoyingClient->AlterUserAttributes("/", "Root",
+ {{"folder_id", folderId},
+ {"cloud_id", cloudId},
+ {"database_id", databaseId}}));
+
+ server.AnnoyingClient->SetNoConfigMode();
+ server.AnnoyingClient->FullInit();
+ server.AnnoyingClient->InitUserRegistry();
+ server.AnnoyingClient->MkDir("/Root", "account2");
+ server.AnnoyingClient->CreateTopicNoLegacy(fullTopicName, 5);
+
+ NYdb::TDriverConfig driverCfg;
+
+ driverCfg.SetEndpoint(TStringBuilder() << "localhost:" << server.GrpcPort).SetLog(CreateLogBackend("cerr", ELogPriority::TLOG_DEBUG)).SetDatabase("/Root");
+
+ const auto monPort = TPortManager().GetPort();
+ auto Counters = server.CleverServer->GetGRpcServerRootCounters();
+ NActors::TMon Monitoring({monPort, "localhost", 3, "root", "localhost", {}, {}, {}});
+ Monitoring.RegisterCountersPage("counters", "Counters", Counters);
+ Monitoring.Start();
+
+ auto ydbDriver = MakeHolder<NYdb::TDriver>(driverCfg);
+ auto persQueueClient = MakeHolder<NYdb::NPersQueue::TPersQueueClient>(*ydbDriver);
+
+ {
+ auto res = persQueueClient->AddReadRule(fullTopicName,
+ TAddReadRuleSettings().ReadRule(TReadRuleSettings().ConsumerName(consumerName)));
+ res.Wait();
+ UNIT_ASSERT(res.GetValue().IsSuccess());
+ }
+
+ WaitACLModification();
+
+ auto checkCounters =
+ [cloudId, folderId, databaseId](auto monPort,
+ const std::set<std::string>& canonicalSensorNames,
+ const TString& stream, const TString& consumer,
+ const TString& host, const TString& shard) {
+ auto counters = GetCounters1stClass(monPort, "datastreams", cloudId, databaseId,
+ folderId, stream, consumer, host, shard);
+ const auto sensors = counters["sensors"].GetArray();
+ std::set<std::string> sensorNames;
+ std::transform(sensors.begin(), sensors.end(),
+ std::inserter(sensorNames, sensorNames.begin()),
+ [](auto& el) {
+ return el["labels"]["name"].GetString();
+ });
+ auto equal = sensorNames == canonicalSensorNames;
+ UNIT_ASSERT(equal);
+ };
+
+ {
+ auto writer = CreateSimpleWriter(*ydbDriver, fullTopicName, "123", 1);
+ for (int i = 0; i < 4; ++i) {
+ bool res = writer->Write(TString(10, 'a'));
+ UNIT_ASSERT(res);
+ }
+
+ NYdb::NPersQueue::TReadSessionSettings settings;
+ settings.ConsumerName(consumerName).AppendTopics(topicName);
+ auto reader = CreateReader(*ydbDriver, settings);
+
+ auto msg = GetNextMessageSkipAssignment(reader);
+ UNIT_ASSERT(msg);
+
+ checkCounters(monPort,
+ {
+ "stream.internal_read.commits_per_second",
+ "stream.internal_read.partitions_errors_per_second",
+ "stream.internal_read.partitions_locked",
+ "stream.internal_read.partitions_locked_per_second",
+ "stream.internal_read.partitions_released_per_second",
+ "stream.internal_read.partitions_to_be_locked",
+ "stream.internal_read.partitions_to_be_released",
+ "stream.internal_read.waits_for_data",
+ "stream.internal_write.bytes_proceeding",
+ "stream.internal_write.bytes_proceeding_total",
+ "stream.internal_write.errors_per_second",
+ "stream.internal_write.sessions_active",
+ "stream.internal_write.sessions_created_per_second",
+ },
+ topicName, "", "", ""
+ );
+
+ checkCounters(monPort,
+ {
+ "stream.internal_read.commits_per_second",
+ "stream.internal_read.partitions_errors_per_second",
+ "stream.internal_read.partitions_locked",
+ "stream.internal_read.partitions_locked_per_second",
+ "stream.internal_read.partitions_released_per_second",
+ "stream.internal_read.partitions_to_be_locked",
+ "stream.internal_read.partitions_to_be_released",
+ "stream.internal_read.waits_for_data",
+ },
+ topicName, consumerName, "", ""
+ );
+
+ checkCounters(server.CleverServer->GetRuntime()->GetMonPort(),
+ {
+ "stream.internal_read.time_lags_milliseconds",
+ "stream.incoming_bytes_per_second",
+ "stream.incoming_records_per_second",
+ "stream.internal_write.bytes_per_second",
+ "stream.internal_write.compacted_bytes_per_second",
+ "stream.internal_write.partition_write_quota_wait_milliseconds",
+ "stream.internal_write.record_size_bytes",
+ "stream.internal_write.records_per_second",
+ "stream.internal_write.time_lags_milliseconds",
+ "stream.internal_write.uncompressed_bytes_per_second",
+ "stream.await_operating_milliseconds",
+ "stream.internal_write.buffer_brimmed_duration_ms",
+ "stream.internal_read.bytes_per_second",
+ "stream.internal_read.records_per_second",
+ "stream.outgoing_bytes_per_second",
+ "stream.outgoing_records_per_second",
+ },
+ topicName, "", "", ""
+ );
+
+ checkCounters(server.CleverServer->GetRuntime()->GetMonPort(),
+ {
+ "stream.internal_read.time_lags_milliseconds",
+ "stream.await_operating_milliseconds",
+ "stream.internal_read.bytes_per_second",
+ "stream.internal_read.records_per_second",
+ "stream.outgoing_bytes_per_second",
+ "stream.outgoing_records_per_second",
+ },
+ topicName, consumerName, "", ""
+ );
+
+ checkCounters(server.CleverServer->GetRuntime()->GetMonPort(),
+ {
+ "stream.await_operating_milliseconds"
+ },
+ topicName, consumerName, "1", ""
+ );
+
+ checkCounters(server.CleverServer->GetRuntime()->GetMonPort(),
+ {
+ "stream.internal_write.buffer_brimmed_duration_ms"
+ },
+ topicName, "", "1", ""
+ );
+ }
+ };
+
+ testWriteStat1stClass("user1");
+ testWriteStat1stClass("some@random@consumer");
+ }
+ } // Y_UNIT_TEST_SUITE(TPersQueueNewSchemeCacheTest)
Y_UNIT_TEST_SUITE(TPersqueueDataPlaneTestSuite) {
@@ -186,7 +328,7 @@ namespace NKikimr::NPersQueueTests {
auto writer = server.PersQueueClient->CreateSimpleBlockingWriteSession(TWriteSessionSettings()
.Path(topic).MessageGroupId("my_group_1")
.ClusterDiscoveryMode(EClusterDiscoveryMode::Off)
- .RetryPolicy(IRetryPolicy::GetNoRetryPolicy()));
+ .RetryPolicy(NYdb::NPersQueue::IRetryPolicy::GetNoRetryPolicy()));
Cerr << "InitSeqNO " << writer->GetInitSeqNo() << "\n";
writer->Write("somedata", 1);
writer->Close();
@@ -194,7 +336,7 @@ namespace NKikimr::NPersQueueTests {
{
auto reader = server.PersQueueClient->CreateReadSession(TReadSessionSettings().ConsumerName("non_existing")
.AppendTopics(topic).DisableClusterDiscovery(true)
- .RetryPolicy(IRetryPolicy::GetNoRetryPolicy()));
+ .RetryPolicy(NYdb::NPersQueue::IRetryPolicy::GetNoRetryPolicy()));
auto future = reader->WaitEvent();
@@ -211,7 +353,7 @@ namespace NKikimr::NPersQueueTests {
{
auto reader = server.PersQueueClient->CreateReadSession(TReadSessionSettings().ConsumerName(consumer)
.AppendTopics(topic).DisableClusterDiscovery(true)
- .RetryPolicy(IRetryPolicy::GetNoRetryPolicy()));
+ .RetryPolicy(NYdb::NPersQueue::IRetryPolicy::GetNoRetryPolicy()));
auto future = reader->WaitEvent();
diff --git a/ydb/services/persqueue_v1/persqueue_ut.cpp b/ydb/services/persqueue_v1/persqueue_ut.cpp
index 42ea25e7c1..64c0957f14 100644
--- a/ydb/services/persqueue_v1/persqueue_ut.cpp
+++ b/ydb/services/persqueue_v1/persqueue_ut.cpp
@@ -1316,71 +1316,202 @@ namespace {
Y_UNIT_TEST(TestWriteStat) {
- NPersQueue::TTestServer server(PQSettings(0, 1, true, "10"), false);
- auto netDataUpdated = server.PrepareNetDataFile(FormNetData());
- UNIT_ASSERT(netDataUpdated);
- server.StartServer();
+ auto testWriteStat = [](const TString& originallyProvidedConsumerName,
+ const TString& consumerName,
+ const TString& consumerPath) {
+ auto checkCounters = [](auto monPort, const TString& session,
+ const std::set<std::string>& canonicalSensorNames,
+ const TString& clientDc, const TString& originDc,
+ const TString& client, const TString& consumerPath) {
+ NJson::TJsonValue counters;
+ if (clientDc.empty() && originDc.empty()) {
+ counters = GetClientCountersLegacy(monPort, "pqproxy", session, client, consumerPath);
+ } else {
+ counters = GetCountersLegacy(monPort, "pqproxy", session, "account/topic1",
+ clientDc, originDc, client, consumerPath);
+ }
+ const auto sensors = counters["sensors"].GetArray();
+ std::set<std::string> sensorNames;
+ std::transform(sensors.begin(), sensors.end(),
+ std::inserter(sensorNames, sensorNames.begin()),
+ [](auto& el) {
+ return el["labels"]["sensor"].GetString();
+ });
+ auto equal = sensorNames == canonicalSensorNames;
+ UNIT_ASSERT(equal);
+ };
- server.EnableLogs({ NKikimrServices::PQ_WRITE_PROXY, NKikimrServices::NET_CLASSIFIER });
- server.EnableLogs({ NKikimrServices::PERSQUEUE }, NActors::NLog::PRI_ERROR);
+ NPersQueue::TTestServer server(PQSettings(0, 1, true, "10"), false);
+ auto netDataUpdated = server.PrepareNetDataFile(FormNetData());
+ UNIT_ASSERT(netDataUpdated);
+ server.StartServer();
- auto sender = server.CleverServer->GetRuntime()->AllocateEdgeActor();
+ const auto monPort = TPortManager().GetPort();
+ auto Counters = server.CleverServer->GetGRpcServerRootCounters();
+ NActors::TMon Monitoring({monPort, "localhost", 3, "root", "localhost", {}, {}, {}});
+ Monitoring.RegisterCountersPage("counters", "Counters", Counters);
+ Monitoring.Start();
- GetClassifierUpdate(*server.CleverServer, sender); //wait for initializing
+ server.EnableLogs({ NKikimrServices::PQ_WRITE_PROXY, NKikimrServices::NET_CLASSIFIER });
+ server.EnableLogs({ NKikimrServices::PERSQUEUE }, NActors::NLog::PRI_ERROR);
- server.AnnoyingClient->CreateTopic("rt3.dc1--account--topic1", 10, 10000, 10000, 2000);
- server.AnnoyingClient->CreateTopic(DEFAULT_TOPIC_NAME, 10, 10000, 10000, 2000);
+ auto sender = server.CleverServer->GetRuntime()->AllocateEdgeActor();
- auto driver = server.AnnoyingClient->GetDriver();
+ GetClassifierUpdate(*server.CleverServer, sender); //wait for initializing
- auto writer = CreateWriter(*driver, "account/topic1", "base64:AAAAaaaa____----12", 0, "raw");
+ server.AnnoyingClient->CreateTopic("rt3.dc1--account--topic1", 10, 10000, 10000, 2000);
- auto msg = writer->GetEvent(true);
- UNIT_ASSERT(msg); // ReadyToAcceptEvent
+ auto driver = server.AnnoyingClient->GetDriver();
- auto ev = std::get_if<NYdb::NPersQueue::TWriteSessionEvent::TReadyToAcceptEvent>(&*msg);
- UNIT_ASSERT(ev);
+ auto writer = CreateWriter(*driver, "account/topic1", "base64:AAAAaaaa____----12", 0, "raw");
- TInstant st(TInstant::Now());
- for (ui32 i = 1; i <= 5; ++i) {
- writer->Write(std::move(ev->ContinuationToken), TString(2000, 'a'));
- msg = writer->GetEvent(true);
+ auto msg = writer->GetEvent(true);
UNIT_ASSERT(msg); // ReadyToAcceptEvent
- ev = std::get_if<NYdb::NPersQueue::TWriteSessionEvent::TReadyToAcceptEvent>(&*msg);
+ auto ev = std::get_if<NYdb::NPersQueue::TWriteSessionEvent::TReadyToAcceptEvent>(&*msg);
UNIT_ASSERT(ev);
- msg = writer->GetEvent(true);
+ TInstant st(TInstant::Now());
+ for (ui32 i = 1; i <= 5; ++i) {
+ writer->Write(std::move(ev->ContinuationToken), TString(2000, 'a'));
+ msg = writer->GetEvent(true);
+ UNIT_ASSERT(msg); // ReadyToAcceptEvent
- Cerr << DebugString(*msg) << "\n";
+ ev = std::get_if<NYdb::NPersQueue::TWriteSessionEvent::TReadyToAcceptEvent>(&*msg);
+ UNIT_ASSERT(ev);
- auto ack = std::get_if<NYdb::NPersQueue::TWriteSessionEvent::TAcksEvent>(&*msg);
- UNIT_ASSERT(ack);
+ msg = writer->GetEvent(true);
+
+ Cerr << DebugString(*msg) << "\n";
- if (i == 5) {
- UNIT_ASSERT(TInstant::Now() - st > TDuration::Seconds(3));
- // TODO: Describe this assert in comment
- UNIT_ASSERT(!ack->Acks.empty());
- UNIT_ASSERT(ack->Acks.back().Stat);
- UNIT_ASSERT(ack->Acks.back().Stat->TotalTimeInPartitionQueue >= ack->Acks.back().Stat->PartitionQuotedTime);
- UNIT_ASSERT(ack->Acks.back().Stat->TotalTimeInPartitionQueue <= ack->Acks.back().Stat->PartitionQuotedTime + TDuration::Seconds(10));
+ auto ack = std::get_if<NYdb::NPersQueue::TWriteSessionEvent::TAcksEvent>(&*msg);
+ UNIT_ASSERT(ack);
+ if (i == 5) {
+ UNIT_ASSERT(TInstant::Now() - st > TDuration::Seconds(3));
+ UNIT_ASSERT(!ack->Acks.empty());
+ UNIT_ASSERT(ack->Acks.back().Stat);
+ }
}
- }
- GetCounters(server.CleverServer->GetRuntime()->GetMonPort(), "pqproxy", "writeSession", "account/topic1", "Vla");
- {
- NYdb::NPersQueue::TReadSessionSettings settings;
- settings.ConsumerName("shared/user").AppendTopics(TString("account/topic1")).ReadOriginal({"dc1"});
+ checkCounters(server.CleverServer->GetRuntime()->GetMonPort(),
+ "writeSession",
+ {
+ "BytesWrittenOriginal",
+ "CompactedBytesWrittenOriginal",
+ "MessagesWrittenOriginal",
+ "UncompressedBytesWrittenOriginal"
+ },
+ "", "cluster", "", ""
+ );
+
+ checkCounters(monPort,
+ "writeSession",
+ {
+ "BytesInflight",
+ "BytesInflightTotal",
+ "Errors",
+ "SessionsActive",
+ "SessionsCreated",
+ // "WithoutAuth" - this counter just not present in this test
+ },
+ "", "cluster", "", ""
+ );
- auto reader = CreateReader(*driver, settings);
+ {
+ NYdb::NPersQueue::TReadSessionSettings settings;
+ settings.ConsumerName(originallyProvidedConsumerName)
+ .AppendTopics(TString("account/topic1")).ReadOriginal({"dc1"});
- auto msg = GetNextMessageSkipAssignment(reader);
- UNIT_ASSERT(msg);
+ auto reader = CreateReader(*driver, settings);
- Cerr << NYdb::NPersQueue::DebugString(*msg) << "\n";
- }
- GetCounters(server.CleverServer->GetRuntime()->GetMonPort(), "pqproxy", "readSession", "account/topic1", "Vla");
+ auto msg = GetNextMessageSkipAssignment(reader);
+ UNIT_ASSERT(msg);
+
+ Cerr << NYdb::NPersQueue::DebugString(*msg) << "\n";
+
+ checkCounters(monPort,
+ "readSession",
+ {
+ "Commits",
+ "PartitionsErrors",
+ "PartitionsInfly",
+ "PartitionsLocked",
+ "PartitionsReleased",
+ "PartitionsToBeLocked",
+ "PartitionsToBeReleased",
+ "WaitsForData"
+ },
+ "", "cluster", "", ""
+ );
+
+ checkCounters(monPort,
+ "readSession",
+ {
+ "BytesInflight",
+ "Errors",
+ "PipeReconnects",
+ "SessionsActive",
+ "SessionsCreated",
+ "PartsPerSession"
+ },
+ "", "", consumerName, consumerPath
+ );
+
+ checkCounters(server.CleverServer->GetRuntime()->GetMonPort(),
+ "readSession",
+ {
+ "BytesReadFromDC"
+ },
+ "Vla", "", "", ""
+ );
+
+ checkCounters(server.CleverServer->GetRuntime()->GetMonPort(),
+ "readSession",
+ {
+ "BytesRead",
+ "MessagesRead"
+ },
+ "", "Dc1", "", ""
+ );
+
+ checkCounters(server.CleverServer->GetRuntime()->GetMonPort(),
+ "readSession",
+ {
+ "BytesRead",
+ "MessagesRead"
+ },
+ "", "cluster", "", ""
+ );
+
+ checkCounters(server.CleverServer->GetRuntime()->GetMonPort(),
+ "readSession",
+ {
+ "BytesRead",
+ "MessagesRead"
+ },
+ "", "cluster", "", ""
+ );
+
+ checkCounters(server.CleverServer->GetRuntime()->GetMonPort(),
+ "readSession",
+ {
+ "BytesRead",
+ "MessagesRead"
+ },
+ "", "Dc1", consumerName, consumerPath
+ );
+ }
+ };
+
+ testWriteStat("some@random@consumer", "some@random@consumer", "some/random/consumer");
+ testWriteStat("some@user", "some@user", "some/user");
+ testWriteStat("shared@user", "shared@user", "shared/user");
+ testWriteStat("shared/user", "user", "shared/user");
+ testWriteStat("user", "user", "shared/user");
+ testWriteStat("some@random/consumer", "some@random@consumer", "some/random/consumer");
+ testWriteStat("/some/user", "some@user", "some/user");
}
+
Y_UNIT_TEST(TestWriteSessionsConflicts) {
NPersQueue::TTestServer server;
server.AnnoyingClient->CreateTopic(DEFAULT_TOPIC_NAME, 1);
@@ -2561,6 +2692,7 @@ namespace {
Codecs: "zstd"
}
ReadRuleVersions: 567
+ YdbDatabasePath: "/Root"
}
ErrorCode: OK
}