diff options
author | mokhotskii <mokhotskii@yandex-team.ru> | 2022-03-01 18:07:27 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-03-01 18:07:27 +0300 |
commit | 384afaad2b4d2381d7bc1f003c6d7626d55f5a03 (patch) | |
tree | 8bdf0020df7c0c01caa1b40d386269cd89c07fbe | |
parent | e7de3116bb9e9e10fcc25dbad25e0a595e1324b4 (diff) | |
download | ydb-22.2.5.tar.gz |
[merge to 22-2] Add tests on pqv0/1 counters (tag: 22.2.5)
LOGBROKER-7220 Add pq v1 counters test
REVIEW: 2266837
REVIEW: 2354266
x-ydb-stable-ref: 3c295a4e79d3ac0dd216667ae906252e7b53e9c3
-rw-r--r-- | ydb/core/base/counters.cpp | 3 | ||||
-rw-r--r-- | ydb/core/persqueue/partition.cpp | 102 | ||||
-rw-r--r-- | ydb/core/persqueue/percentile_counter.cpp | 6 | ||||
-rw-r--r-- | ydb/core/persqueue/percentile_counter.h | 3 | ||||
-rw-r--r-- | ydb/core/persqueue/pq_impl.cpp | 1 | ||||
-rw-r--r-- | ydb/core/persqueue/user_info.cpp | 9 | ||||
-rw-r--r-- | ydb/core/persqueue/user_info.h | 58 | ||||
-rw-r--r-- | ydb/core/tx/schemeshard/schemeshard__init.cpp | 2 | ||||
-rw-r--r-- | ydb/core/tx/schemeshard/schemeshard__operation_alter_pq.cpp | 19 | ||||
-rw-r--r-- | ydb/core/tx/schemeshard/schemeshard__operation_create_pq.cpp | 21 | ||||
-rw-r--r-- | ydb/library/persqueue/tests/counters.h | 197 | ||||
-rw-r--r-- | ydb/public/api/protos/draft/datastreams.proto | 2 | ||||
-rw-r--r-- | ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/test_server.h | 4 | ||||
-rw-r--r-- | ydb/services/persqueue_v1/grpc_pq_read_actor.cpp | 136 | ||||
-rw-r--r-- | ydb/services/persqueue_v1/grpc_pq_write_actor.cpp | 18 | ||||
-rw-r--r-- | ydb/services/persqueue_v1/persqueue_new_schemecache_ut.cpp | 200 | ||||
-rw-r--r-- | ydb/services/persqueue_v1/persqueue_ut.cpp | 218 |
17 files changed, 709 insertions, 290 deletions
diff --git a/ydb/core/base/counters.cpp b/ydb/core/base/counters.cpp index b192fa36a1..298ec7e192 100644 --- a/ydb/core/base/counters.cpp +++ b/ydb/core/base/counters.cpp @@ -39,11 +39,12 @@ static const THashSet<TString> DATABASE_SERVICES TString("pqproxy|readSession"), TString("pqproxy|schemecache"), TString("pqproxy|mirrorWriteTimeLag"), + TString("datastreams"), }}; static const THashSet<TString> DATABASE_ATTRIBUTE_SERVICES - = {{ TString("ydb") }}; + = {{ TString("ydb"), TString("datastreams") }}; static const THashSet<TString> DATABASE_ATTRIBUTE_LABELS = {{ TString("cloud_id"), diff --git a/ydb/core/persqueue/partition.cpp b/ydb/core/persqueue/partition.cpp index 1e775741d4..4da1293a83 100644 --- a/ydb/core/persqueue/partition.cpp +++ b/ydb/core/persqueue/partition.cpp @@ -450,6 +450,17 @@ void TPartition::FillReadFromTimestamps(const NKikimrPQ::TPQTabletConfig& config } } +namespace { + TString getStreamName(const TActorContext &ctx, const TString& topicPath, const TString& dbPath) { + const bool itsFirstClassCitizen = true; + auto converterFactory = std::make_shared<NPersQueue::TTopicNamesConverterFactory>( + itsFirstClassCitizen, AppData(ctx)->PQConfig.GetRoot() + ); + auto converter = converterFactory->MakeTopicNameConverter(topicPath, "", dbPath); + return converter->GetClientsideName(); + } +} + TPartition::TPartition(ui64 tabletId, ui32 partition, const TActorId& tablet, const TActorId& blobCache, const TString& topicName, const TString& topicPath, const bool localDC, TString dcId, const NKikimrPQ::TPQTabletConfig& config, const TTabletCountersBase& counters, @@ -475,7 +486,8 @@ TPartition::TPartition(ui64 tabletId, ui32 partition, const TActorId& tablet, co , CloudId(config.GetYcCloudId()) , DbId(config.GetYdbDatabaseId()) , FolderId(config.GetYcFolderId()) - , UsersInfoStorage(DCId, TabletID, TopicName, Partition, counters, Config, CloudId, DbId, FolderId) + , UsersInfoStorage(DCId, TabletID, TopicName, Partition, counters, Config, CloudId, DbId, 
FolderId, + getStreamName(ctx, TopicPath, Config.GetYdbDatabasePath())) , ReadingTimestamp(false) , SetOffsetCookie(0) , Cookie(0) @@ -510,12 +522,10 @@ TPartition::TPartition(ui64 tabletId, ui32 partition, const TActorId& tablet, co ManageWriteTimestampEstimate = LocalDC; } - WriteTimestampEstimate = ManageWriteTimestampEstimate ? ctx.Now() : TInstant::Zero(); CalcTopicWriteQuotaParams(); - Counters.Populate(counters); } @@ -717,7 +727,6 @@ void TPartition::HandleMonitoring(TEvPQ::TEvMonRequest::TPtr& ev, const TActorCo void TPartition::Bootstrap(const TActorContext& ctx) { - UsersInfoStorage.Init(Tablet, SelfId()); Y_VERIFY(AppData(ctx)->PQConfig.GetMaxBlobsPerLevel() > 0); @@ -770,7 +779,7 @@ void TPartition::Bootstrap(const TActorContext& ctx) void TPartition::SetupTopicCounters(const TActorContext& ctx) { auto counters = AppData(ctx)->Counters; - auto labels = NKikimr::NPQ::GetLabels(TopicName); + const auto labels = NKikimr::NPQ::GetLabels(TopicName); const TString suffix = LocalDC ? 
"Original" : "Mirrored"; WriteBufferIsFullCounter.SetCounter( @@ -779,38 +788,33 @@ void TPartition::SetupTopicCounters(const TActorContext& ctx) { {"Partition", ToString<ui32>(Partition)}}, {"sensor", "BufferFullTime" + suffix, true}); + auto subGroup = GetServiceCounters(counters, "pqproxy|writeTimeLag"); InputTimeLag = THolder<NKikimr::NPQ::TPercentileCounter>(new NKikimr::NPQ::TPercentileCounter( - GetServiceCounters(counters, "pqproxy|writeTimeLag"), GetLabels(TopicName), - {{"sensor", "TimeLags" + suffix}}, "Interval", - TVector<std::pair<ui64, TString>>{ - {100, "100ms"}, {200, "200ms"}, {500, "500ms"}, {1000, "1000ms"}, - {2000, "2000ms"}, {5000, "5000ms"}, {10'000, "10000ms"}, {30'000, "30000ms"}, - {60'000, "60000ms"}, {180'000,"180000ms"}, {9'999'999, "999999ms"}}, true)); + subGroup, labels, {{"sensor", "TimeLags" + suffix}}, "Interval", + TVector<std::pair<ui64, TString>>{ + {100, "100ms"}, {200, "200ms"}, {500, "500ms"}, {1000, "1000ms"}, + {2000, "2000ms"}, {5000, "5000ms"}, {10'000, "10000ms"}, {30'000, "30000ms"}, + {60'000, "60000ms"}, {180'000,"180000ms"}, {9'999'999, "999999ms"}}, true)); + subGroup = GetServiceCounters(counters, "pqproxy|writeInfo"); MessageSize = THolder<NKikimr::NPQ::TPercentileCounter>(new NKikimr::NPQ::TPercentileCounter( - GetServiceCounters(counters, "pqproxy|writeInfo"), GetLabels(TopicName), - {{"sensor", "MessageSize" + suffix}}, "Size", - TVector<std::pair<ui64, TString>>{ - {1024, "1kb"}, {5120, "5kb"}, {10240, "10kb"}, - {20'480, "20kb"}, {51'200, "50kb"}, {102'400, "100kb"}, {204'800, "200kb"}, - {524'288, "512kb"},{1'048'576, "1024kb"}, {2'097'152,"2048kb"}, {5'242'880, "5120kb"}, - {10'485'760, "10240kb"}, {67'108'864, "65536kb"}, {999'999'999, "99999999kb"}}, true)); - - BytesWritten = NKikimr::NPQ::TMultiCounter(GetServiceCounters(counters, "pqproxy|writeSession"), - GetLabels(TopicName), {}, {"BytesWritten" + suffix}, true); - BytesWrittenUncompressed = NKikimr::NPQ::TMultiCounter(GetServiceCounters(counters, 
"pqproxy|writeSession"), - GetLabels(TopicName), {}, {"UncompressedBytesWritten" + suffix}, true); - - BytesWrittenComp = NKikimr::NPQ::TMultiCounter(GetServiceCounters(counters, "pqproxy|writeSession"), - GetLabels(TopicName), {}, {"CompactedBytesWritten" + suffix}, true); - - MsgsWritten = NKikimr::NPQ::TMultiCounter(GetServiceCounters(counters, "pqproxy|writeSession"), - GetLabels(TopicName), {}, {"MessagesWritten" + suffix}, true); + subGroup, labels, {{"sensor", "MessageSize" + suffix}}, "Size", + TVector<std::pair<ui64, TString>>{ + {1024, "1kb"}, {5120, "5kb"}, {10240, "10kb"}, + {20'480, "20kb"}, {51'200, "50kb"}, {102'400, "100kb"}, {204'800, "200kb"}, + {524'288, "512kb"},{1'048'576, "1024kb"}, {2'097'152,"2048kb"}, {5'242'880, "5120kb"}, + {10'485'760, "10240kb"}, {67'108'864, "65536kb"}, {999'999'999, "99999999kb"}}, true)); + + subGroup = GetServiceCounters(counters, "pqproxy|writeSession"); + BytesWritten = NKikimr::NPQ::TMultiCounter(subGroup, labels, {}, {"BytesWritten" + suffix}, true); + BytesWrittenUncompressed = NKikimr::NPQ::TMultiCounter(subGroup, labels, {}, {"UncompressedBytesWritten" + suffix}, true); + BytesWrittenComp = NKikimr::NPQ::TMultiCounter(subGroup, labels, {}, {"CompactedBytesWritten" + suffix}, true); + MsgsWritten = NKikimr::NPQ::TMultiCounter(subGroup, labels, {}, {"MessagesWritten" + suffix}, true); TVector<NPQ::TLabelsInfo> aggr = {{{{"Account", NPersQueue::GetAccount(TopicName)}}, {"total"}}}; ui32 border = AppData(ctx)->PQConfig.GetWriteLatencyBigMs(); - auto subGroup = GetServiceCounters(counters, "pqproxy|SLI"); + subGroup = GetServiceCounters(counters, "pqproxy|SLI"); WriteLatency = NKikimr::NPQ::CreateSLIDurationCounter(subGroup, aggr, "Write", border, {100, 200, 500, 1000, 1500, 2000, 5000, 10'000, 30'000, 99'999'999}); @@ -819,7 +823,7 @@ void TPartition::SetupTopicCounters(const TActorContext& ctx) { if (IsQuotingEnabled() && !TopicWriteQuotaResourcePath.empty()) { TopicWriteQuotaWaitCounter = 
THolder<NKikimr::NPQ::TPercentileCounter>( new NKikimr::NPQ::TPercentileCounter( - GetServiceCounters(counters, "pqproxy|topicWriteQuotaWait"), GetLabels(TopicName), + GetServiceCounters(counters, "pqproxy|topicWriteQuotaWait"), labels, {{"sensor", "TopicWriteQuotaWait" + suffix}}, "Interval", TVector<std::pair<ui64, TString>>{ {0, "0ms"}, {1, "1ms"}, {5, "5ms"}, {10, "10ms"}, @@ -830,7 +834,7 @@ void TPartition::SetupTopicCounters(const TActorContext& ctx) { PartitionWriteQuotaWaitCounter = THolder<NKikimr::NPQ::TPercentileCounter>( new NKikimr::NPQ::TPercentileCounter(GetServiceCounters(counters, "pqproxy|partitionWriteQuotaWait"), - GetLabels(TopicName), {{"sensor", "PartitionWriteQuotaWait" + suffix}}, "Interval", + labels, {{"sensor", "PartitionWriteQuotaWait" + suffix}}, "Interval", TVector<std::pair<ui64, TString>>{ {0, "0ms"}, {1, "1ms"}, {5, "5ms"}, {10, "10ms"}, {20, "20ms"}, {50, "50ms"}, {100, "100ms"}, {500, "500ms"}, @@ -839,18 +843,22 @@ void TPartition::SetupTopicCounters(const TActorContext& ctx) { } void TPartition::SetupStreamCounters(const TActorContext& ctx) { + const auto topicName = getStreamName(ctx, TopicPath, Config.GetYdbDatabasePath()); + const auto labels = NKikimr::NPQ::GetLabelsForStream(topicName, CloudId, DbId, FolderId); auto counters = AppData(ctx)->Counters; - auto labels = NKikimr::NPQ::GetLabelsForStream(TopicName, CloudId, DbId, FolderId); WriteBufferIsFullCounter.SetCounter( - GetCountersForStream(counters, "writingTime"), - {{"host", DCId}, - {"partition", ToString<ui32>(Partition)}, - {"stream", TopicName}}, + GetCountersForStream(counters), + {{"database", DbId}, + {"cloud", CloudId}, + {"folder", FolderId}, + {"stream", topicName}, + {"host", DCId}, + {"shard", ToString<ui32>(Partition)}}, {"name", "stream.internal_write.buffer_brimmed_duration_ms", true}); InputTimeLag = THolder<NKikimr::NPQ::TPercentileCounter>(new NKikimr::NPQ::TPercentileCounter( - NKikimr::NPQ::GetCountersForStream(counters, "writeTimeLag"), labels, 
+ NKikimr::NPQ::GetCountersForStream(counters), labels, {{"name", "stream.internal_write.time_lags_milliseconds"}}, "bin", TVector<std::pair<ui64, TString>>{ {100, "100"}, {200, "200"}, {500, "500"}, @@ -859,7 +867,7 @@ void TPartition::SetupStreamCounters(const TActorContext& ctx) { {180'000,"180000"}, {9'999'999, "999999"}}, true)); MessageSize = THolder<NKikimr::NPQ::TPercentileCounter>(new NKikimr::NPQ::TPercentileCounter( - NKikimr::NPQ::GetCountersForStream(counters, "writeInfo"), labels, + NKikimr::NPQ::GetCountersForStream(counters), labels, {{"name", "stream.internal_write.record_size_bytes"}}, "bin", TVector<std::pair<ui64, TString>>{ {1024, "1024"}, {5120, "5120"}, {10'240, "10240"}, @@ -869,22 +877,22 @@ void TPartition::SetupStreamCounters(const TActorContext& ctx) { {67'108'864, "67108864"}, {999'999'999, "99999999"}}, true)); BytesWritten = NKikimr::NPQ::TMultiCounter( - NKikimr::NPQ::GetCountersForStream(counters, "writeSession"), labels, {}, + NKikimr::NPQ::GetCountersForStream(counters), labels, {}, {"stream.internal_write.bytes_per_second", "stream.incoming_bytes_per_second"} , true, "name"); MsgsWritten = NKikimr::NPQ::TMultiCounter( - NKikimr::NPQ::GetCountersForStream(counters, "writeSession"), labels, {}, + NKikimr::NPQ::GetCountersForStream(counters), labels, {}, {"stream.internal_write.records_per_second", "stream.incoming_records_per_second"}, true, "name"); BytesWrittenUncompressed = NKikimr::NPQ::TMultiCounter( - NKikimr::NPQ::GetCountersForStream(counters, "writeSession"), labels, {}, + NKikimr::NPQ::GetCountersForStream(counters), labels, {}, {"stream.internal_write.uncompressed_bytes_per_second"}, true, "name"); BytesWrittenComp = NKikimr::NPQ::TMultiCounter( - NKikimr::NPQ::GetCountersForStream(counters, "writeSession"), labels, {}, + NKikimr::NPQ::GetCountersForStream(counters), labels, {}, {"stream.internal_write.compacted_bytes_per_second"}, true, "name"); - TVector<NPQ::TLabelsInfo> aggr = {{{{"Account", 
NPersQueue::GetAccount(TopicName)}}, {"total"}}}; + TVector<NPQ::TLabelsInfo> aggr = {{{{"Account", NPersQueue::GetAccount(topicName)}}, {"total"}}}; ui32 border = AppData(ctx)->PQConfig.GetWriteLatencyBigMs(); auto subGroup = GetServiceCounters(counters, "pqproxy|SLI"); WriteLatency = NKikimr::NPQ::CreateSLIDurationCounter(subGroup, aggr, "Write", border, @@ -895,7 +903,7 @@ void TPartition::SetupStreamCounters(const TActorContext& ctx) { if (IsQuotingEnabled() && !TopicWriteQuotaResourcePath.empty()) { TopicWriteQuotaWaitCounter = THolder<NKikimr::NPQ::TPercentileCounter>( new NKikimr::NPQ::TPercentileCounter( - GetCountersForStream(counters, "topicWriteQuotaWait"), labels, + NKikimr::NPQ::GetCountersForStream(counters), labels, {{"name", "stream.internal_write.topic_write_quota_wait_milliseconds"}}, "bin", TVector<std::pair<ui64, TString>>{ {0, "0"}, {1, "1"}, {5, "5"}, {10, "10"}, @@ -906,7 +914,7 @@ void TPartition::SetupStreamCounters(const TActorContext& ctx) { PartitionWriteQuotaWaitCounter = THolder<NKikimr::NPQ::TPercentileCounter>( new NKikimr::NPQ::TPercentileCounter( - GetCountersForStream(counters, "partitionWriteQuotaWait"), labels, + NKikimr::NPQ::GetCountersForStream(counters), labels, {{"name", "stream.internal_write.partition_write_quota_wait_milliseconds"}}, "bin", TVector<std::pair<ui64, TString>>{ {0, "0"}, {1, "1"}, {5, "5"}, {10, "10"}, @@ -2851,7 +2859,7 @@ void TPartition::ReadTimestampForOffset(const TString& user, TUserInfo& userInfo THolder<TEvPQ::TEvRead> event = MakeHolder<TEvPQ::TEvRead>(0, userInfo.Offset, 0, 1, "", user, 0, MAX_BLOB_PART_SIZE * 2, 0, 0, "", - userInfo.DoExternalRead); + false); ctx.Send(ctx.SelfID, event.Release()); Counters.Cumulative()[COUNTER_PQ_WRITE_TIMESTAMP_CACHE_MISS].Increment(1); } diff --git a/ydb/core/persqueue/percentile_counter.cpp b/ydb/core/persqueue/percentile_counter.cpp index a6414e70bd..b89a5f04b9 100644 --- a/ydb/core/persqueue/percentile_counter.cpp +++ 
b/ydb/core/persqueue/percentile_counter.cpp @@ -28,11 +28,9 @@ NMonitoring::TDynamicCounterPtr GetCounters(NMonitoring::TDynamicCounterPtr coun ->GetSubgroup("Topic", realTopic); } -NMonitoring::TDynamicCounterPtr GetCountersForStream(NMonitoring::TDynamicCounterPtr counters, - const TString& subsystem) +NMonitoring::TDynamicCounterPtr GetCountersForStream(NMonitoring::TDynamicCounterPtr counters) { - return counters->GetSubgroup("counters", "pqproxy") - ->GetSubgroup("subsystem", subsystem); + return counters->GetSubgroup("counters", "datastreams"); } TVector<TLabelsInfo> GetLabels(const TString& topic) diff --git a/ydb/core/persqueue/percentile_counter.h b/ydb/core/persqueue/percentile_counter.h index 74ee43d287..409534077c 100644 --- a/ydb/core/persqueue/percentile_counter.h +++ b/ydb/core/persqueue/percentile_counter.h @@ -8,8 +8,7 @@ namespace NPQ { NMonitoring::TDynamicCounterPtr GetCounters(NMonitoring::TDynamicCounterPtr counters, const TString& subsystem, const TString& topic); -NMonitoring::TDynamicCounterPtr GetCountersForStream(NMonitoring::TDynamicCounterPtr counters, - const TString& subsystem); +NMonitoring::TDynamicCounterPtr GetCountersForStream(NMonitoring::TDynamicCounterPtr counters); struct TLabelsInfo { TVector<std::pair<TString,TString>> Labels; diff --git a/ydb/core/persqueue/pq_impl.cpp b/ydb/core/persqueue/pq_impl.cpp index caba19df02..b7bb33a700 100644 --- a/ydb/core/persqueue/pq_impl.cpp +++ b/ydb/core/persqueue/pq_impl.cpp @@ -704,6 +704,7 @@ void TPersQueue::ReadConfig(const NKikimrClient::TKeyValueResponse::TReadResult& } TopicName = Config.GetTopicName(); + TopicPath = Config.GetTopicPath(); LocalDC = Config.GetLocalDC(); KeySchema.clear(); diff --git a/ydb/core/persqueue/user_info.cpp b/ydb/core/persqueue/user_info.cpp index 26811765e2..bdf1d51a05 100644 --- a/ydb/core/persqueue/user_info.cpp +++ b/ydb/core/persqueue/user_info.cpp @@ -37,7 +37,8 @@ TUsersInfoStorage::TUsersInfoStorage( const NKikimrPQ::TPQTabletConfig& config, const 
TString& cloudId, const TString& dbId, - const TString& folderId + const TString& folderId, + const TString& streamName ) : DCId(std::move(dcId)) , TabletId(tabletId) @@ -47,6 +48,7 @@ TUsersInfoStorage::TUsersInfoStorage( , CloudId(cloudId) , DbId(dbId) , FolderId(folderId) + , StreamName(streamName) { Counters.Populate(counters); } @@ -162,8 +164,9 @@ TUserInfo& TUsersInfoStorage::Create( auto result = UsersInfo.emplace( std::piecewise_construct, std::forward_as_tuple(user), - std::forward_as_tuple(ctx, CreateReadSpeedLimiter(user), user, readRuleGeneration, important, TopicName, Partition, session, - gen, step, offset, readOffsetRewindSum, DCId, readFromTimestamp, CloudId, DbId, FolderId, burst, speed) + std::forward_as_tuple(ctx, CreateReadSpeedLimiter(user), user, readRuleGeneration, important, + TopicName, Partition, session, gen, step, offset, readOffsetRewindSum, DCId, + readFromTimestamp, CloudId, DbId, FolderId, burst, speed, StreamName) ); Y_VERIFY(result.second); return result.first->second; diff --git a/ydb/core/persqueue/user_info.h b/ydb/core/persqueue/user_info.h index 258e947c46..1752ea0b3b 100644 --- a/ydb/core/persqueue/user_info.h +++ b/ydb/core/persqueue/user_info.h @@ -228,7 +228,7 @@ struct TUserInfo { } void ForgetSubscription(const TInstant& now) { - if(Subscriptions > 0) + if (Subscriptions > 0) --Subscriptions; UpdateReadingTimeAndState(now); } @@ -290,7 +290,7 @@ struct TUserInfo { const ui64 readRuleGeneration, const bool important, const TString& topic, const ui32 partition, const TString &session, ui32 gen, ui32 step, i64 offset, const ui64 readOffsetRewindSum, const TString& dcId, TInstant readFromTimestamp, const TString& cloudId, const TString& dbId, const TString& folderId, - ui64 burst = 1'000'000'000, ui64 speed = 1'000'000'000 + ui64 burst = 1'000'000'000, ui64 speed = 1'000'000'000, const TString& streamName = "undefined" ) : ReadSpeedLimiter(std::move(readSpeedLimiter)) , Session(session) @@ -324,7 +324,7 @@ struct TUserInfo 
{ { if (AppData(ctx)->Counters) { if (AppData()->PQConfig.GetTopicsAreFirstClassCitizen()) { - SetupStreamCounters(ctx, dcId, ToString<ui32>(partition), topic, cloudId, dbId, folderId); + SetupStreamCounters(ctx, dcId, ToString<ui32>(partition), streamName, cloudId, dbId, folderId); } else { if (topic.find("--") == TString::npos) return; @@ -336,38 +336,38 @@ struct TUserInfo { void SetupStreamCounters(const TActorContext& ctx, const TString& dcId, const TString& partition, const TString& topic, const TString& cloudId, const TString& dbId, const TString& folderId) { - auto subgroup = [&](const TString& subsystem) { - return NKikimr::NPQ::GetCountersForStream(AppData(ctx)->Counters, subsystem); - }; - auto additionalLabels = [&](const TVector<std::pair<TString, TString>>& subgroups = {}) { + auto additionalLabels = [](const TVector<std::pair<TString, TString>>& subgroups = {}) { TVector<std::pair<TString, TString>> result; - if (User != CLIENTID_TO_READ_INTERNALLY) - result.push_back({"consumer", User}); - - for (const auto& sb : subgroups) { - result.push_back(sb); - } + std::copy_if(subgroups.begin(), subgroups.end(), std::back_inserter(result), + [] (const auto& sb) { + return sb.first != "consumer" || + sb.second != CLIENTID_TO_READ_INTERNALLY; + }); return result; }; const TVector<NPQ::TLabelsInfo> aggregates = NKikimr::NPQ::GetLabelsForStream(topic, cloudId, dbId, folderId); - if (DoExternalRead) { - BytesRead = TMultiCounter(subgroup("readSession"), aggregates, additionalLabels(), - {"stream.internal_read.bytes_per_second", - "stream.outcoming_bytes_per_second"}, true); - MsgsRead = TMultiCounter(subgroup("readSession"), aggregates, additionalLabels(), - {"stream.internal_read.records_per_second", - "stream.outcoming_records_per_second"}, true); - } - - Counter.SetCounter(subgroup("readingTime"), - additionalLabels({{"host", dcId}, {"shard", partition}, {"stream", topic}}), + BytesRead = 
TMultiCounter(NKikimr::NPQ::GetCountersForStream(AppData(ctx)->Counters), + aggregates, additionalLabels({{"consumer", User}}), + {"stream.internal_read.bytes_per_second", + "stream.outgoing_bytes_per_second"}, true, "name"); + MsgsRead = TMultiCounter(NKikimr::NPQ::GetCountersForStream(AppData(ctx)->Counters), + aggregates, additionalLabels({{"consumer", User}}), + {"stream.internal_read.records_per_second", + "stream.outgoing_records_per_second"}, true, "name"); + + Counter.SetCounter(NKikimr::NPQ::GetCountersForStream(AppData(ctx)->Counters), + additionalLabels({{"database", dbId}, {"cloud", cloudId}, {"folder", folderId}, + {"stream", topic}, {"consumer", User}, {"host", dcId}, + {"shard", partition}}), {"name", "stream.await_operating_milliseconds", true}); - ReadTimeLag.reset(new TPercentileCounter(subgroup("readTimeLag"), aggregates, - additionalLabels({{"name", "stream.internal_read.time_lags_milliseconds"}}), "bin", - TVector<std::pair<ui64, TString>>{{100, "100"}, {200, "200"}, {500, "500"}, + ReadTimeLag.reset(new TPercentileCounter( + NKikimr::NPQ::GetCountersForStream(AppData(ctx)->Counters), aggregates, + additionalLabels({{"consumer", User}, + {"name", "stream.internal_read.time_lags_milliseconds"}}), "bin", + TVector<std::pair<ui64, TString>>{{100, "100"}, {200, "200"}, {500, "500"}, {1000, "1000"}, {2000, "2000"}, {5000, "5000"}, {10'000, "10000"}, {30'000, "30000"}, {60'000, "60000"}, @@ -509,7 +509,8 @@ class TUsersInfoStorage { public: TUsersInfoStorage(TString dcId, ui64 tabletId, const TString& topicName, ui32 partition, const TTabletCountersBase& counters, const NKikimrPQ::TPQTabletConfig& config, - const TString& CloudId, const TString& DbId, const TString& FolderId); + const TString& CloudId, const TString& DbId, const TString& FolderId, + const TString& streamName); void Init(TActorId tabletActor, TActorId partitionActor); @@ -553,6 +554,7 @@ private: TString CloudId; TString DbId; TString FolderId; + TString StreamName; }; } //NPQ diff 
--git a/ydb/core/tx/schemeshard/schemeshard__init.cpp b/ydb/core/tx/schemeshard/schemeshard__init.cpp index 049afed7ac..ffdfc48114 100644 --- a/ydb/core/tx/schemeshard/schemeshard__init.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__init.cpp @@ -1374,7 +1374,7 @@ struct TSchemeShard::TTxInit : public TTransactionBase<TSchemeShard> { Self->RootPathElemets = SplitPath(rootPath->Name); Y_VERIFY(!rootPath->StepDropped); - Self->PathsById[rootPath->PathId ] = rootPath; + Self->PathsById[rootPath->PathId] = rootPath; pathesRows.pop_front(); } diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_alter_pq.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_alter_pq.cpp index c7480d3bee..9263fbc929 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_alter_pq.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_alter_pq.cpp @@ -75,6 +75,7 @@ public: } TPersQueueGroupInfo::TPtr ParseParams( + TOperationContext& context, NKikimrPQ::TPQTabletConfig* tabletConfig, const NKikimrSchemeOp::TPersQueueGroupDescription& alter, TString& errStr) @@ -138,6 +139,22 @@ public: return nullptr; } + const TPathElement::TPtr dbRootEl = context.SS->PathsById.at(context.SS->RootPathId()); + if (dbRootEl->UserAttrs->Attrs.contains("cloud_id")) { + auto cloudId = dbRootEl->UserAttrs->Attrs.at("cloud_id"); + alterConfig.SetYcCloudId(cloudId); + } + if (dbRootEl->UserAttrs->Attrs.contains("folder_id")) { + auto folderId = dbRootEl->UserAttrs->Attrs.at("folder_id"); + alterConfig.SetYcFolderId(folderId); + } + if (dbRootEl->UserAttrs->Attrs.contains("database_id")) { + auto databaseId = dbRootEl->UserAttrs->Attrs.at("database_id"); + alterConfig.SetYdbDatabaseId(databaseId); + } + const TString databasePath = TPath::Init(context.SS->RootPathId(), context.SS).PathString(); + alterConfig.SetYdbDatabasePath(databasePath); + alterConfig.MutablePartitionKeySchema()->Swap(tabletConfig->MutablePartitionKeySchema()); Y_PROTOBUF_SUPPRESS_NODISCARD 
alterConfig.SerializeToString(&params->TabletConfig); alterConfig.Swap(tabletConfig); @@ -436,7 +453,7 @@ public: newTabletConfig = tabletConfig; - TPersQueueGroupInfo::TPtr alterData = ParseParams(&newTabletConfig, alter, errStr); + TPersQueueGroupInfo::TPtr alterData = ParseParams(context, &newTabletConfig, alter, errStr); if (!alterData) { result->SetError(NKikimrScheme::StatusInvalidParameter, errStr); return result; diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_create_pq.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_create_pq.cpp index 87c56a7466..22ba356eeb 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_create_pq.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_create_pq.cpp @@ -12,7 +12,8 @@ namespace { using namespace NKikimr; using namespace NSchemeShard; -TPersQueueGroupInfo::TPtr CreatePersQueueGroup(const NKikimrSchemeOp::TPersQueueGroupDescription& op, +TPersQueueGroupInfo::TPtr CreatePersQueueGroup(TOperationContext& context, + const NKikimrSchemeOp::TPersQueueGroupDescription& op, TEvSchemeShard::EStatus& status, TString& errStr) { TPersQueueGroupInfo::TPtr pqGroupInfo = new TPersQueueGroupInfo; @@ -146,6 +147,22 @@ TPersQueueGroupInfo::TPtr CreatePersQueueGroup(const NKikimrSchemeOp::TPersQueue return nullptr; } + const TPathElement::TPtr dbRootEl = context.SS->PathsById.at(context.SS->RootPathId()); + if (dbRootEl->UserAttrs->Attrs.contains("cloud_id")) { + auto cloudId = dbRootEl->UserAttrs->Attrs.at("cloud_id"); + tabletConfig.SetYcCloudId(cloudId); + } + if (dbRootEl->UserAttrs->Attrs.contains("folder_id")) { + auto folderId = dbRootEl->UserAttrs->Attrs.at("folder_id"); + tabletConfig.SetYcFolderId(folderId); + } + if (dbRootEl->UserAttrs->Attrs.contains("database_id")) { + auto databaseId = dbRootEl->UserAttrs->Attrs.at("database_id"); + tabletConfig.SetYdbDatabaseId(databaseId); + } + const TString databasePath = TPath::Init(context.SS->RootPathId(), context.SS).PathString(); + 
tabletConfig.SetYdbDatabasePath(databasePath); + Y_PROTOBUF_SUPPRESS_NODISCARD tabletConfig.SerializeToString(&pqGroupInfo->TabletConfig); if (op.HasBootstrapConfig()) { @@ -360,7 +377,7 @@ public: } TPersQueueGroupInfo::TPtr pqGroup = CreatePersQueueGroup( - createDEscription, status, errStr); + context, createDEscription, status, errStr); if (!pqGroup.Get()) { result->SetError(status, errStr); diff --git a/ydb/library/persqueue/tests/counters.h b/ydb/library/persqueue/tests/counters.h index 71e802965d..72bd545e45 100644 --- a/ydb/library/persqueue/tests/counters.h +++ b/ydb/library/persqueue/tests/counters.h @@ -1,71 +1,126 @@ -#pragma once
-
-#include <library/cpp/testing/unittest/registar.h>
-#include <library/cpp/http/io/stream.h>
-#include <library/cpp/json/json_reader.h>
-#include <library/cpp/json/json_value.h>
-
-#include <util/string/builder.h>
-#include <util/string/vector.h>
-#include <util/string/join.h>
-#include <util/network/socket.h>
-
-
-
-namespace NKikimr::NPersQueueTests {
-
-NJson::TJsonValue GetCounters(ui16 port, const TString& counters, const TString& subsystem, const TString& topicPath, const TString& clientDc = "", const TString& originalDc = "") {
- TString dcFilter = "";
- {
- auto prepareDC = [](TString dc) {
- dc.front() = std::toupper(dc.front());
- return dc;
- };
- if (originalDc) {
- dcFilter = TStringBuilder() << "/OriginDC=" << prepareDC(originalDc);
- } else if (clientDc) {
- dcFilter = TStringBuilder() << "/ClientDC=" << prepareDC(clientDc);
- }
- }
- TVector<TString> pathItems = SplitString(topicPath, "/");
- UNIT_ASSERT(pathItems.size() >= 2);
- TString account = pathItems.front();
- TString producer = JoinRange("@", pathItems.begin(), pathItems.end() - 1);
- TString query = TStringBuilder() << "/counters/counters=" << counters
- << "/subsystem=" << subsystem << "/Account=" << account << "/Producer=" << producer
- << "/Topic=" << Join("--", producer, pathItems.back()) << "/TopicPath=" << JoinRange("%2F", pathItems.begin(), pathItems.end())
- << dcFilter << "/json";
-
-
- TNetworkAddress addr("localhost", port);
- TString q2 = TStringBuilder() << "/counters/counters=" << counters
- << "/subsystem=" << subsystem;// << "/Account=" << account << "/Producer=" << producer;
- //q2 += "/json";
-
- /*Cerr << "q2: " << q2 << Endl;
- TSocket s2(addr);
- SendMinimalHttpRequest(s2, "localhost", q2);
- TSocketInput si2(s2);
- THttpInput input2(&si2);
- Cerr << "===Counters response: " << input2.ReadAll() << Endl;*/
-
-
-
- Cerr << "===Request counters with query: " << query << Endl;
- TSocket s(addr);
- SendMinimalHttpRequest(s, "localhost", query);
- TSocketInput si(s);
- THttpInput input(&si);
- TString firstLine = input.FirstLine();
- //Cerr << "Counters2: '" << firstLine << "' content: " << input.ReadAll() << Endl;
-
- unsigned httpCode = ParseHttpRetCode(firstLine);
- UNIT_ASSERT_VALUES_EQUAL(httpCode, 200u);
- NJson::TJsonValue value;
- UNIT_ASSERT(NJson::ReadJsonTree(&input, &value));
-
- Cerr << "counters: " << value.GetStringRobust() << "\n";
- return value;
-}
-
-} // NKikimr::NPersQueueTests
+#pragma once + +#include <library/cpp/testing/unittest/registar.h> +#include <library/cpp/http/io/stream.h> +#include <library/cpp/json/json_reader.h> +#include <library/cpp/json/json_value.h> + +#include <util/string/builder.h> +#include <util/string/vector.h> +#include <util/string/join.h> +#include <util/network/socket.h> + + +namespace NKikimr::NPersQueueTests { + +namespace { + NJson::TJsonValue SendQuery(ui16 port, const TString& query, bool mayFail = false) { + Cerr << "===Request counters with query: " << query << Endl; + TNetworkAddress addr("localhost", port); + TSocket s(addr); + SendMinimalHttpRequest(s, "localhost", query); + TSocketInput si(s); + THttpInput input(&si); + TString firstLine = input.FirstLine(); + + const auto httpCode = ParseHttpRetCode(firstLine); + if (mayFail && httpCode != 200u) { + return {}; + } else { + UNIT_ASSERT_VALUES_EQUAL(httpCode, 200u); + } + NJson::TJsonValue value; + UNIT_ASSERT(NJson::ReadJsonTree(&input, &value)); + + Cerr << "counters: " << value.GetStringRobust() << "\n"; + return value; + } +} + +NJson::TJsonValue GetCountersLegacy(ui16 port, const TString& counters, const TString& subsystem, + const TString& topicPath, const TString& clientDc = "", + const TString& originalDc = "", const TString& client = "", + const TString& consumerPath = "") { + TString dcFilter = ""; + { + if (originalDc) { + dcFilter = TStringBuilder() << "/OriginDC=" << originalDc; + } else if (clientDc) { + dcFilter = TStringBuilder() << "/ClientDC=" << clientDc; + } + } + TVector<TString> pathItems = SplitString(topicPath, "/"); + UNIT_ASSERT(pathItems.size() >= 2); + TString account = pathItems.front(); + TString producer = JoinRange("@", pathItems.begin(), pathItems.end() - 1); + TStringBuilder queryBuilder = TStringBuilder() << + "/counters/counters=" << counters << + "/subsystem=" << subsystem << + "/Account=" << account << + "/Producer=" << producer << + "/Topic=" << Join("--", producer, pathItems.back()) << + "/TopicPath=" << 
JoinRange("%2F", pathItems.begin(), pathItems.end()) << + dcFilter; + + if (consumerPath) { + auto consumerPathItems = SplitString(consumerPath, "/"); + queryBuilder << + "/Client=" << client << + "/ConsumerPath=" << JoinRange("%2F", consumerPathItems.begin(), consumerPathItems.end()); + } + queryBuilder << "/json"; + + return SendQuery(port, queryBuilder); +} + + NJson::TJsonValue GetClientCountersLegacy(ui16 port, const TString& counters, const TString& subsystem, + const TString& client, const TString& consumerPath) { + TVector<TString> consumerPathItems = SplitString(consumerPath, "/"); + UNIT_ASSERT(consumerPathItems.size() >= 2); + TStringBuilder queryBuilder = TStringBuilder() << + "/counters/counters=" << counters << + "/subsystem=" << subsystem << + "/Client=" << client << + "/ConsumerPath=" << JoinRange("%2F", consumerPathItems.begin(), consumerPathItems.end()) << + "/json"; + + return SendQuery(port, queryBuilder); +} + +NJson::TJsonValue GetCounters1stClass(ui16 port, const TString& counters, + const TString& cloudId, const TString& databaseId, + const TString& folderId, const TString& streamName, + const TString& consumer, const TString& host, + const TString& shard) { + bool mayFail = false; + TVector<TString> pathItems = SplitString(streamName, "/"); + TStringBuilder queryBuilder; + queryBuilder << + "/counters/counters=" << counters << + "/database=" << databaseId << + "/cloud=" << cloudId << + "/folder=" << folderId << + "/stream=" << JoinRange("%2F", pathItems.begin(), pathItems.end()); + + if (consumer) { + queryBuilder << + "/consumer=" << consumer; + } + + if (host) { + queryBuilder << + "/host=" << host; + } + + if (shard) { + queryBuilder << + "/shard=" << shard; + mayFail = true; + } + + queryBuilder << "/json"; + + return SendQuery(port, queryBuilder, mayFail); +} + +} // NKikimr::NPersQueueTests diff --git a/ydb/public/api/protos/draft/datastreams.proto b/ydb/public/api/protos/draft/datastreams.proto index 5c1354288a..4e80c1cbb8 100644 --- 
a/ydb/public/api/protos/draft/datastreams.proto +++ b/ydb/public/api/protos/draft/datastreams.proto @@ -110,7 +110,7 @@ message StreamDescription { // Represents range of possible sequence numbers for the shard message SequenceNumberRange { string starting_sequence_number = 1; - string ending_sequence_number = 2; + string ending_sequence_number = 2 [(FieldTransformer) = TRANSFORM_EMPTY_TO_NOTHING]; } // Represents shard details diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/test_server.h b/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/test_server.h index 250ce03620..40559602f6 100644 --- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/test_server.h +++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/test_server.h @@ -39,11 +39,11 @@ public: StartServer(); } - void StartServer(bool doClientInit = true) { + void StartServer(bool doClientInit = true, TMaybe<TString> databaseName = Nothing()) { PrepareNetDataFile(); CleverServer = MakeHolder<NKikimr::Tests::TServer>(ServerSettings); CleverServer->EnableGRpc(GrpcServerOptions); - AnnoyingClient = MakeHolder<NKikimr::NPersQueueTests::TFlatMsgBusPQClient>(ServerSettings, GrpcPort); + AnnoyingClient = MakeHolder<NKikimr::NPersQueueTests::TFlatMsgBusPQClient>(ServerSettings, GrpcPort, databaseName); EnableLogs(LOGGED_SERVICES); if (doClientInit) { AnnoyingClient->FullInit(); diff --git a/ydb/services/persqueue_v1/grpc_pq_read_actor.cpp b/ydb/services/persqueue_v1/grpc_pq_read_actor.cpp index 509eb2a231..784b004fd8 100644 --- a/ydb/services/persqueue_v1/grpc_pq_read_actor.cpp +++ b/ydb/services/persqueue_v1/grpc_pq_read_actor.cpp @@ -142,8 +142,7 @@ private: void Handle(TEvPQProxy::TEvReleasePartition::TPtr& ev, const NActors::TActorContext& ctx); void Handle(TEvPQProxy::TEvLockPartition::TPtr& ev, const NActors::TActorContext& ctx); - void Handle(TEvPQProxy::TEvGetStatus:: - TPtr& ev, const NActors::TActorContext& ctx); + void Handle(TEvPQProxy::TEvGetStatus::TPtr& ev, 
const NActors::TActorContext& ctx); void Handle(TEvPQProxy::TEvDeadlineExceeded::TPtr& ev, const NActors::TActorContext& ctx); @@ -270,7 +269,6 @@ TReadSessionActor::TReadSessionActor( , TopicsHandler(topicsHandler) { Y_ASSERT(Request); - ++(*GetServiceCounters(Counters, "pqproxy|readSession")->GetCounter("SessionsCreatedTotal", true)); } @@ -279,6 +277,11 @@ TReadSessionActor::~TReadSessionActor() = default; void TReadSessionActor::Bootstrap(const TActorContext& ctx) { Y_VERIFY(Request); + if (!AppData(ctx)->PQConfig.GetTopicsAreFirstClassCitizen()) { + ++(*GetServiceCounters(Counters, "pqproxy|readSession") + ->GetNamedCounter("sensor", "SessionsCreatedTotal", true)); + } + Request->GetStreamCtx()->Attach(ctx.SelfID); if (!Request->GetStreamCtx()->Read()) { LOG_INFO_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " grpc read failed at start"); @@ -464,13 +467,13 @@ void TReadSessionActor::Die(const TActorContext& ctx) { } LOG_INFO_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " is DEAD"); - if (SessionsActive) { - --(*SessionsActive); - } if (BytesInflight) { (*BytesInflight) -= BytesInflight_; } - if (SessionsActive) { //PartsPerSession is inited too + if (SessionsActive) { + --(*SessionsActive); + } + if (SessionsActive) { PartsPerSession.DecFor(Partitions.size(), 1); } @@ -549,11 +552,13 @@ void TReadSessionActor::Handle(TEvPQProxy::TEvStartRead::TPtr& ev, const TActorC if (it == Partitions.end() || it->second.Releasing) { //do nothing - already released partition LOG_WARN_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " got NOTACTUAL StartRead from client for " << ev->Get()->Partition - << " at offset " << ev->Get()->ReadOffset); + << " at offset " << ev->Get()->ReadOffset); return; } - LOG_INFO_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " got StartRead from client for " << ev->Get()->Partition - << " at readOffset " << ev->Get()->ReadOffset << " commitOffset " << ev->Get()->CommitOffset); + LOG_INFO_S(ctx, 
NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " got StartRead from client for " + << ev->Get()->Partition << + " at readOffset " << ev->Get()->ReadOffset << + " commitOffset " << ev->Get()->CommitOffset); //proxy request to partition - allow initing //TODO: add here VerifyReadOffset too and check it againts Committed position @@ -597,10 +602,14 @@ void TReadSessionActor::DropPartition(THashMap<ui64, TPartitionActorInfo>::itera Y_VERIFY(res); } - PartsPerSession.DecFor(Partitions.size(), 1); + if (SessionsActive) { + PartsPerSession.DecFor(Partitions.size(), 1); + } BalancerGeneration.erase(it->first); Partitions.erase(it); - PartsPerSession.IncFor(Partitions.size(), 1); + if (SessionsActive) { + PartsPerSession.IncFor(Partitions.size(), 1); + } } @@ -696,7 +705,11 @@ void TReadSessionActor::Handle(TEvPQProxy::TEvReadInit::TPtr& ev, const TActorCo } ClientId = NPersQueue::ConvertNewConsumerName(init.consumer(), ctx); - ClientPath = init.consumer(); + if (AppData(ctx)->PQConfig.GetTopicsAreFirstClassCitizen()) { + ClientPath = init.consumer(); + } else { + ClientPath = NPersQueue::NormalizeFullPath(NPersQueue::MakeConsumerPath(init.consumer())); + } TStringBuilder session; session << ClientPath << "_" << ctx.SelfID.NodeId() << "_" << Cookie << "_" << TAppData::RandomProvider->GenRand64() << "_v1"; @@ -750,7 +763,10 @@ void TReadSessionActor::Handle(TEvPQProxy::TEvReadInit::TPtr& ev, const TActorCo } LOG_INFO_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " init: " << event->Request << " from " << PeerName); - SetupCounters(); + + if (!AppData(ctx)->PQConfig.GetTopicsAreFirstClassCitizen()) { + SetupCounters(); + } if (Request->GetInternalToken().empty()) { if (AppData(ctx)->PQConfig.GetRequireCredentialsInNewProtocol()) { @@ -810,23 +826,31 @@ void TReadSessionActor::RegisterSessions(const TActorContext& ctx) { void TReadSessionActor::SetupCounters() { - auto subGroup = GetServiceCounters(Counters, "pqproxy|readSession")->GetSubgroup("Client", 
ClientId)->GetSubgroup("ConsumerPath", ClientPath); - SessionsCreated = subGroup->GetExpiringCounter("SessionsCreated", true); - SessionsActive = subGroup->GetExpiringCounter("SessionsActive", false); - Errors = subGroup->GetExpiringCounter("Errors", true); - PipeReconnects = subGroup->GetExpiringCounter("PipeReconnects", true); - - BytesInflight = subGroup->GetExpiringCounter("BytesInflight", false); + if (SessionsCreated) { + return; + } - PartsPerSession = NKikimr::NPQ::TPercentileCounter(subGroup->GetSubgroup("sensor", "PartsPerSession"), {}, {}, "Count", - TVector<std::pair<ui64, TString>>{{1, "1"}, {2, "2"}, {5, "5"}, - {10, "10"}, {20, "20"}, {50, "50"}, {70, "70"}, - {100, "100"}, {150, "150"}, {300,"300"}, {99999999, "99999999"}}, false); + auto subGroup = GetServiceCounters(Counters, "pqproxy|readSession"); + subGroup = subGroup->GetSubgroup("Client", ClientId)->GetSubgroup("ConsumerPath", ClientPath); + const TString name = "sensor"; + + BytesInflight = subGroup->GetExpiringNamedCounter(name, "BytesInflight", false); + Errors = subGroup->GetExpiringNamedCounter(name, "Errors", true); + PipeReconnects = subGroup->GetExpiringNamedCounter(name, "PipeReconnects", true); + SessionsActive = subGroup->GetExpiringNamedCounter(name, "SessionsActive", false); + SessionsCreated = subGroup->GetExpiringNamedCounter(name, "SessionsCreated", true); + PartsPerSession = NKikimr::NPQ::TPercentileCounter( + subGroup->GetSubgroup(name, "PartsPerSession"), + {}, {}, "Count", + TVector<std::pair<ui64, TString>>{{1, "1"}, {2, "2"}, {5, "5"}, + {10, "10"}, {20, "20"}, {50, "50"}, + {70, "70"}, {100, "100"}, {150, "150"}, + {300,"300"}, {99999999, "99999999"}}, + false, true); ++(*SessionsCreated); ++(*SessionsActive); PartsPerSession.IncFor(Partitions.size(), 1); //for 0 - } @@ -856,19 +880,19 @@ void TReadSessionActor::SetupTopicCounters(const TString& topic, const TString& const TString& dbId, const TString& folderId) { auto& topicCounters = TopicCounters[topic]; - auto 
subGroup = NKikimr::NPQ::GetCountersForStream(Counters, "readSession"); + auto subGroup = NKikimr::NPQ::GetCountersForStream(Counters); //client/consumerPath Account/Producer OriginDC Topic/TopicPath TVector<NPQ::TLabelsInfo> aggr = NKikimr::NPQ::GetLabelsForStream(topic, cloudId, dbId, folderId); - TVector<std::pair<TString, TString>> cons{}; + TVector<std::pair<TString, TString>> cons = {{"consumer", ClientPath}}; - topicCounters.PartitionsLocked = NKikimr::NPQ::TMultiCounter(subGroup, aggr, cons, {"stream.internal_read.partitions_locked_per_second"}, true); - topicCounters.PartitionsReleased = NKikimr::NPQ::TMultiCounter(subGroup, aggr, cons, {"stream.internal_read.partitions_released_per_second"}, true); - topicCounters.PartitionsToBeReleased = NKikimr::NPQ::TMultiCounter(subGroup, aggr, cons, {"stream.internal_read.partitions_to_be_released"}, false); - topicCounters.PartitionsToBeLocked = NKikimr::NPQ::TMultiCounter(subGroup, aggr, cons, {"stream.internal_read.partitions_to_be_locked"}, false); - topicCounters.PartitionsInfly = NKikimr::NPQ::TMultiCounter(subGroup, aggr, cons, {"stream.internal_read.partitions_locked"}, false); - topicCounters.Errors = NKikimr::NPQ::TMultiCounter(subGroup, aggr, cons, {"stream.internal_read.partitions_errors_per_second"}, true); - topicCounters.Commits = NKikimr::NPQ::TMultiCounter(subGroup, aggr, cons, {"stream.internal_read.commits_per_second"}, true); - topicCounters.WaitsForData = NKikimr::NPQ::TMultiCounter(subGroup, aggr, cons, {"stream.internal_read.waits_for_data"}, true); + topicCounters.PartitionsLocked = NKikimr::NPQ::TMultiCounter(subGroup, aggr, cons, {"stream.internal_read.partitions_locked_per_second"}, true, "name"); + topicCounters.PartitionsReleased = NKikimr::NPQ::TMultiCounter(subGroup, aggr, cons, {"stream.internal_read.partitions_released_per_second"}, true, "name"); + topicCounters.PartitionsToBeReleased = NKikimr::NPQ::TMultiCounter(subGroup, aggr, cons, 
{"stream.internal_read.partitions_to_be_released"}, false, "name"); + topicCounters.PartitionsToBeLocked = NKikimr::NPQ::TMultiCounter(subGroup, aggr, cons, {"stream.internal_read.partitions_to_be_locked"}, false, "name"); + topicCounters.PartitionsInfly = NKikimr::NPQ::TMultiCounter(subGroup, aggr, cons, {"stream.internal_read.partitions_locked"}, false, "name"); + topicCounters.Errors = NKikimr::NPQ::TMultiCounter(subGroup, aggr, cons, {"stream.internal_read.partitions_errors_per_second"}, true, "name"); + topicCounters.Commits = NKikimr::NPQ::TMultiCounter(subGroup, aggr, cons, {"stream.internal_read.commits_per_second"}, true, "name"); + topicCounters.WaitsForData = NKikimr::NPQ::TMultiCounter(subGroup, aggr, cons, {"stream.internal_read.waits_for_data"}, true, "name"); topicCounters.CommitLatency = CommitLatency; topicCounters.SLIBigLatency = SLIBigLatency; @@ -931,6 +955,9 @@ void TReadSessionActor::Handle(TEvPQProxy::TEvAuthResultOk::TPtr& ev, const TAct auto& topicHolder = Topics[t.TopicNameConverter->GetClientsideName()]; topicHolder.TabletID = t.TabletID; topicHolder.TopicNameConverter = t.TopicNameConverter; + topicHolder.CloudId = t.CloudId; + topicHolder.DbId = t.DbId; + topicHolder.FolderId = t.FolderId; FullPathToConverter[t.TopicNameConverter->GetPrimaryPath()] = t.TopicNameConverter; } @@ -1012,11 +1039,15 @@ void TReadSessionActor::Handle(TEvPersQueue::TEvLockPartition::TPtr& ev, const T record.GetStep(), record.GetTabletId(), it->second, CommitsDisabled, ClientDC); TActorId actorId = ctx.Register(partitionActor); - PartsPerSession.DecFor(Partitions.size(), 1); + if (SessionsActive) { + PartsPerSession.DecFor(Partitions.size(), 1); + } Y_VERIFY(record.GetGeneration() > 0); auto pp = Partitions.insert(std::make_pair(assignId, TPartitionActorInfo{actorId, partitionId, ctx})); Y_VERIFY(pp.second); - PartsPerSession.IncFor(Partitions.size(), 1); + if (SessionsActive) { + PartsPerSession.IncFor(Partitions.size(), 1); + } bool res = 
ActualPartitionActors.insert(actorId).second; Y_VERIFY(res); @@ -1234,7 +1265,7 @@ void TReadSessionActor::CloseSession(const TString& errorReason, const PersQueue } if (Errors) { ++(*Errors); - } else { + } else if (!AppData(ctx)->PQConfig.GetTopicsAreFirstClassCitizen()) { ++(*GetServiceCounters(Counters, "pqproxy|readSession")->GetCounter("Errors", true)); } @@ -1349,8 +1380,12 @@ bool TReadSessionActor::ProcessBalancerDead(const ui64 tablet, const TActorConte clientConfig.RetryPolicy = RetryPolicyForPipes; t.second.PipeClient = ctx.RegisterWithSameMailbox(NTabletPipe::CreateClient(ctx.SelfID, t.second.TabletID, clientConfig)); if (InitDone) { - ++(*PipeReconnects); - ++(*Errors); + if (PipeReconnects) { + ++(*PipeReconnects); + } + if (Errors) { + ++(*Errors); + } RegisterSession(t.second.PipeClient, t.first, t.second.Groups, ctx); } @@ -1447,7 +1482,9 @@ void TReadSessionActor::Handle(TEvPQProxy::TEvReadResponse::TPtr& ev, const TAct --formedResponse->RequestsInfly; BytesInflight_ += diff; - (*BytesInflight) += diff; + if (BytesInflight) { + (*BytesInflight) += diff; + } if (formedResponse->RequestsInfly == 0) { ProcessAnswer(ctx, formedResponse); @@ -1458,7 +1495,9 @@ bool TReadSessionActor::WriteResponse(PersQueue::V1::MigrationStreamingReadServe ui64 sz = response.ByteSize(); ActiveWrites.push(sz); BytesInflight_ += sz; - if (BytesInflight) (*BytesInflight) += sz; + if (BytesInflight) { + (*BytesInflight) += sz; + } return finish ? 
Request->GetStreamCtx()->WriteAndFinish(std::move(response), grpc::Status::OK) : Request->GetStreamCtx()->Write(std::move(response)); } @@ -1490,7 +1529,9 @@ void TReadSessionActor::ProcessAnswer(const TActorContext& ctx, TFormedReadRespo } BytesInflight_ -= diff; - (*BytesInflight) -= diff; + if (BytesInflight) { + (*BytesInflight) -= diff; + } for (auto& pp : formedResponse->PartitionsTookPartInControlMessages) { auto it = PartitionToControlMessages.find(pp); @@ -1613,7 +1654,9 @@ void TReadSessionActor::ProcessReads(const TActorContext& ctx) { i64 diff = formedResponse->Response.ByteSize(); BytesInflight_ += diff; formedResponse->ByteSize = diff; - (*BytesInflight) += diff; + if (BytesInflight) { + (*BytesInflight) += diff; + } Reads.pop_front(); } } @@ -2629,6 +2672,9 @@ bool TReadInitAndAuthActor::ProcessTopicSchemeCacheResponse( Y_VERIFY(entry.PQGroupInfo); // checked at ProcessMetaCacheTopicResponse() auto& pqDescr = entry.PQGroupInfo->Description; topicsIter->second.TabletID = pqDescr.GetBalancerTabletID(); + topicsIter->second.CloudId = pqDescr.GetPQTabletConfig().GetYcCloudId(); + topicsIter->second.DbId = pqDescr.GetPQTabletConfig().GetYdbDatabaseId(); + topicsIter->second.FolderId = pqDescr.GetPQTabletConfig().GetYcFolderId(); return CheckTopicACL(entry, topicsIter->first, ctx); } diff --git a/ydb/services/persqueue_v1/grpc_pq_write_actor.cpp b/ydb/services/persqueue_v1/grpc_pq_write_actor.cpp index 98334996a3..4964c33828 100644 --- a/ydb/services/persqueue_v1/grpc_pq_write_actor.cpp +++ b/ydb/services/persqueue_v1/grpc_pq_write_actor.cpp @@ -124,7 +124,6 @@ TWriteSessionActor::TWriteSessionActor( , SourceIdUpdatesInflight(0) { Y_ASSERT(Request); - ++(*GetServiceCounters(Counters, "pqproxy|writeSession")->GetCounter("SessionsCreatedTotal", true)); } @@ -344,7 +343,7 @@ void TWriteSessionActor::SetupCounters() { //now topic is checked, can create group for real topic, not garbage auto subGroup = GetServiceCounters(Counters, "pqproxy|writeSession"); - 
TVector<NPQ::TLabelsInfo> aggr = NKikimr::NPQ::GetLabels(LocalDC, TopicConverter->GetClientsideName()); + TVector<NPQ::TLabelsInfo> aggr = NKikimr::NPQ::GetLabels(TopicConverter->GetClientsideName()); BytesInflight = NKikimr::NPQ::TMultiCounter(subGroup, aggr, {}, {"BytesInflight"}, false); BytesInflightTotal = NKikimr::NPQ::TMultiCounter(subGroup, aggr, {}, {"BytesInflightTotal"}, false); @@ -356,18 +355,17 @@ void TWriteSessionActor::SetupCounters() SessionsActive.Inc(); } -void TWriteSessionActor::SetupCounters(const TString& cloudId, const TString& dbId, - const TString& folderId) +void TWriteSessionActor::SetupCounters(const TString& cloudId, const TString& dbId, const TString& folderId) { //now topic is checked, can create group for real topic, not garbage - auto subGroup = NKikimr::NPQ::GetCountersForStream(Counters, "writeSession"); + auto subGroup = NKikimr::NPQ::GetCountersForStream(Counters); TVector<NPQ::TLabelsInfo> aggr = NKikimr::NPQ::GetLabelsForStream(TopicConverter->GetClientsideName(), cloudId, dbId, folderId); - BytesInflight = NKikimr::NPQ::TMultiCounter(subGroup, aggr, {}, {"stream.internal_write.bytes_proceeding"}, false); - BytesInflightTotal = NKikimr::NPQ::TMultiCounter(subGroup, aggr, {}, {"stream.internal_write.bytes_proceeding_total"}, false); - SessionsCreated = NKikimr::NPQ::TMultiCounter(subGroup, aggr, {}, {"stream.internal_write.sessions_created_per_second"}, true); - SessionsActive = NKikimr::NPQ::TMultiCounter(subGroup, aggr, {}, {"stream.internal_write.sessions_active"}, false); - Errors = NKikimr::NPQ::TMultiCounter(subGroup, aggr, {}, {"stream.internal_write.errors_per_second"}, true); + BytesInflight = NKikimr::NPQ::TMultiCounter(subGroup, aggr, {}, {"stream.internal_write.bytes_proceeding"}, false, "name"); + BytesInflightTotal = NKikimr::NPQ::TMultiCounter(subGroup, aggr, {}, {"stream.internal_write.bytes_proceeding_total"}, false, "name"); + SessionsCreated = NKikimr::NPQ::TMultiCounter(subGroup, aggr, {}, 
{"stream.internal_write.sessions_created_per_second"}, true, "name"); + SessionsActive = NKikimr::NPQ::TMultiCounter(subGroup, aggr, {}, {"stream.internal_write.sessions_active"}, false, "name"); + Errors = NKikimr::NPQ::TMultiCounter(subGroup, aggr, {}, {"stream.internal_write.errors_per_second"}, true, "name"); SessionsCreated.Inc(); SessionsActive.Inc(); diff --git a/ydb/services/persqueue_v1/persqueue_new_schemecache_ut.cpp b/ydb/services/persqueue_v1/persqueue_new_schemecache_ut.cpp index 8e209dbd9d..257648f9c2 100644 --- a/ydb/services/persqueue_v1/persqueue_new_schemecache_ut.cpp +++ b/ydb/services/persqueue_v1/persqueue_new_schemecache_ut.cpp @@ -10,6 +10,7 @@ #include <ydb/library/aclib/aclib.h> #include <ydb/library/persqueue/obfuscate/obfuscate.h> +#include <ydb/library/persqueue/tests/counters.h> #include <ydb/library/persqueue/topic_parser/topic_parser.h> #include <library/cpp/testing/unittest/tests_data.h> @@ -42,30 +43,6 @@ namespace NKikimr::NPersQueueTests { using namespace NYdb::NPersQueue; using namespace NPersQueue; - NJson::TJsonValue GetCountersNewSchemeCache(ui16 port, const TString& counters, const TString& subsystem, const TString& topicPath) { - TString escapedPath = "%2F" + JoinStrings(SplitString(topicPath, "/"), "%2F"); - TString query = TStringBuilder() << "/counters/counters=" << counters - << "/subsystem=" << subsystem - << "/Topic=" << escapedPath << "/json"; - - Cerr << "Will execute query " << query << Endl; - TNetworkAddress addr("localhost", port); - TSocket s(addr); - - SendMinimalHttpRequest(s, "localhost", query); - TSocketInput si(s); - THttpInput input(&si); - Cerr << input.ReadAll() << Endl; - unsigned httpCode = ParseHttpRetCode(input.FirstLine()); - UNIT_ASSERT_VALUES_EQUAL(httpCode, 200u); - - NJson::TJsonValue value; - UNIT_ASSERT(NJson::ReadJsonTree(&input, &value)); - - Cerr << "counters: " << value.GetStringRobust() << "\n"; - return value; - } - Y_UNIT_TEST_SUITE(TPersQueueNewSchemeCacheTest) { void 
PrepareForGrpcNoDC(TFlatMsgBusPQClient& annoyingClient) { @@ -122,7 +99,8 @@ namespace NKikimr::NPersQueueTests { auto persQueueClient = MakeHolder<NYdb::NPersQueue::TPersQueueClient>(*ydbDriver); { - auto res = persQueueClient->AddReadRule("/Root/account2/topic2", TAddReadRuleSettings().ReadRule(TReadRuleSettings().ConsumerName("user1"))); + auto res = persQueueClient->AddReadRule("/Root/account2/topic2", + TAddReadRuleSettings().ReadRule(TReadRuleSettings().ConsumerName("user1"))); res.Wait(); UNIT_ASSERT(res.GetValue().IsSuccess()); } @@ -161,7 +139,171 @@ namespace NKikimr::NPersQueueTests { testReadFromTopic("account2/topic2"); } - } + Y_UNIT_TEST(TestWriteStat1stClass) { + auto testWriteStat1stClass = [](const TString& consumerName) { + TTestServer server(false); + server.ServerSettings.PQConfig.SetTopicsAreFirstClassCitizen(true); + server.StartServer(); + server.EnableLogs({NKikimrServices::PQ_READ_PROXY, NKikimrServices::TX_PROXY_SCHEME_CACHE}); + + const TString topicName{"account2/topic2"}; + const TString fullTopicName{"/Root/account2/topic2"}; + const TString folderId{"somefolder"}; + const TString cloudId{"somecloud"}; + const TString databaseId{"root"}; + UNIT_ASSERT_VALUES_EQUAL(NMsgBusProxy::MSTATUS_OK, + server.AnnoyingClient->AlterUserAttributes("/", "Root", + {{"folder_id", folderId}, + {"cloud_id", cloudId}, + {"database_id", databaseId}})); + + server.AnnoyingClient->SetNoConfigMode(); + server.AnnoyingClient->FullInit(); + server.AnnoyingClient->InitUserRegistry(); + server.AnnoyingClient->MkDir("/Root", "account2"); + server.AnnoyingClient->CreateTopicNoLegacy(fullTopicName, 5); + + NYdb::TDriverConfig driverCfg; + + driverCfg.SetEndpoint(TStringBuilder() << "localhost:" << server.GrpcPort).SetLog(CreateLogBackend("cerr", ELogPriority::TLOG_DEBUG)).SetDatabase("/Root"); + + const auto monPort = TPortManager().GetPort(); + auto Counters = server.CleverServer->GetGRpcServerRootCounters(); + NActors::TMon Monitoring({monPort, "localhost", 3, 
"root", "localhost", {}, {}, {}}); + Monitoring.RegisterCountersPage("counters", "Counters", Counters); + Monitoring.Start(); + + auto ydbDriver = MakeHolder<NYdb::TDriver>(driverCfg); + auto persQueueClient = MakeHolder<NYdb::NPersQueue::TPersQueueClient>(*ydbDriver); + + { + auto res = persQueueClient->AddReadRule(fullTopicName, + TAddReadRuleSettings().ReadRule(TReadRuleSettings().ConsumerName(consumerName))); + res.Wait(); + UNIT_ASSERT(res.GetValue().IsSuccess()); + } + + WaitACLModification(); + + auto checkCounters = + [cloudId, folderId, databaseId](auto monPort, + const std::set<std::string>& canonicalSensorNames, + const TString& stream, const TString& consumer, + const TString& host, const TString& shard) { + auto counters = GetCounters1stClass(monPort, "datastreams", cloudId, databaseId, + folderId, stream, consumer, host, shard); + const auto sensors = counters["sensors"].GetArray(); + std::set<std::string> sensorNames; + std::transform(sensors.begin(), sensors.end(), + std::inserter(sensorNames, sensorNames.begin()), + [](auto& el) { + return el["labels"]["name"].GetString(); + }); + auto equal = sensorNames == canonicalSensorNames; + UNIT_ASSERT(equal); + }; + + { + auto writer = CreateSimpleWriter(*ydbDriver, fullTopicName, "123", 1); + for (int i = 0; i < 4; ++i) { + bool res = writer->Write(TString(10, 'a')); + UNIT_ASSERT(res); + } + + NYdb::NPersQueue::TReadSessionSettings settings; + settings.ConsumerName(consumerName).AppendTopics(topicName); + auto reader = CreateReader(*ydbDriver, settings); + + auto msg = GetNextMessageSkipAssignment(reader); + UNIT_ASSERT(msg); + + checkCounters(monPort, + { + "stream.internal_read.commits_per_second", + "stream.internal_read.partitions_errors_per_second", + "stream.internal_read.partitions_locked", + "stream.internal_read.partitions_locked_per_second", + "stream.internal_read.partitions_released_per_second", + "stream.internal_read.partitions_to_be_locked", + 
"stream.internal_read.partitions_to_be_released", + "stream.internal_read.waits_for_data", + "stream.internal_write.bytes_proceeding", + "stream.internal_write.bytes_proceeding_total", + "stream.internal_write.errors_per_second", + "stream.internal_write.sessions_active", + "stream.internal_write.sessions_created_per_second", + }, + topicName, "", "", "" + ); + + checkCounters(monPort, + { + "stream.internal_read.commits_per_second", + "stream.internal_read.partitions_errors_per_second", + "stream.internal_read.partitions_locked", + "stream.internal_read.partitions_locked_per_second", + "stream.internal_read.partitions_released_per_second", + "stream.internal_read.partitions_to_be_locked", + "stream.internal_read.partitions_to_be_released", + "stream.internal_read.waits_for_data", + }, + topicName, consumerName, "", "" + ); + + checkCounters(server.CleverServer->GetRuntime()->GetMonPort(), + { + "stream.internal_read.time_lags_milliseconds", + "stream.incoming_bytes_per_second", + "stream.incoming_records_per_second", + "stream.internal_write.bytes_per_second", + "stream.internal_write.compacted_bytes_per_second", + "stream.internal_write.partition_write_quota_wait_milliseconds", + "stream.internal_write.record_size_bytes", + "stream.internal_write.records_per_second", + "stream.internal_write.time_lags_milliseconds", + "stream.internal_write.uncompressed_bytes_per_second", + "stream.await_operating_milliseconds", + "stream.internal_write.buffer_brimmed_duration_ms", + "stream.internal_read.bytes_per_second", + "stream.internal_read.records_per_second", + "stream.outgoing_bytes_per_second", + "stream.outgoing_records_per_second", + }, + topicName, "", "", "" + ); + + checkCounters(server.CleverServer->GetRuntime()->GetMonPort(), + { + "stream.internal_read.time_lags_milliseconds", + "stream.await_operating_milliseconds", + "stream.internal_read.bytes_per_second", + "stream.internal_read.records_per_second", + "stream.outgoing_bytes_per_second", + 
"stream.outgoing_records_per_second", + }, + topicName, consumerName, "", "" + ); + + checkCounters(server.CleverServer->GetRuntime()->GetMonPort(), + { + "stream.await_operating_milliseconds" + }, + topicName, consumerName, "1", "" + ); + + checkCounters(server.CleverServer->GetRuntime()->GetMonPort(), + { + "stream.internal_write.buffer_brimmed_duration_ms" + }, + topicName, "", "1", "" + ); + } + }; + + testWriteStat1stClass("user1"); + testWriteStat1stClass("some@random@consumer"); + } + } // Y_UNIT_TEST_SUITE(TPersQueueNewSchemeCacheTest) Y_UNIT_TEST_SUITE(TPersqueueDataPlaneTestSuite) { @@ -186,7 +328,7 @@ namespace NKikimr::NPersQueueTests { auto writer = server.PersQueueClient->CreateSimpleBlockingWriteSession(TWriteSessionSettings() .Path(topic).MessageGroupId("my_group_1") .ClusterDiscoveryMode(EClusterDiscoveryMode::Off) - .RetryPolicy(IRetryPolicy::GetNoRetryPolicy())); + .RetryPolicy(NYdb::NPersQueue::IRetryPolicy::GetNoRetryPolicy())); Cerr << "InitSeqNO " << writer->GetInitSeqNo() << "\n"; writer->Write("somedata", 1); writer->Close(); @@ -194,7 +336,7 @@ namespace NKikimr::NPersQueueTests { { auto reader = server.PersQueueClient->CreateReadSession(TReadSessionSettings().ConsumerName("non_existing") .AppendTopics(topic).DisableClusterDiscovery(true) - .RetryPolicy(IRetryPolicy::GetNoRetryPolicy())); + .RetryPolicy(NYdb::NPersQueue::IRetryPolicy::GetNoRetryPolicy())); auto future = reader->WaitEvent(); @@ -211,7 +353,7 @@ namespace NKikimr::NPersQueueTests { { auto reader = server.PersQueueClient->CreateReadSession(TReadSessionSettings().ConsumerName(consumer) .AppendTopics(topic).DisableClusterDiscovery(true) - .RetryPolicy(IRetryPolicy::GetNoRetryPolicy())); + .RetryPolicy(NYdb::NPersQueue::IRetryPolicy::GetNoRetryPolicy())); auto future = reader->WaitEvent(); diff --git a/ydb/services/persqueue_v1/persqueue_ut.cpp b/ydb/services/persqueue_v1/persqueue_ut.cpp index 42ea25e7c1..64c0957f14 100644 --- a/ydb/services/persqueue_v1/persqueue_ut.cpp +++ 
b/ydb/services/persqueue_v1/persqueue_ut.cpp @@ -1316,71 +1316,202 @@ namespace { Y_UNIT_TEST(TestWriteStat) { - NPersQueue::TTestServer server(PQSettings(0, 1, true, "10"), false); - auto netDataUpdated = server.PrepareNetDataFile(FormNetData()); - UNIT_ASSERT(netDataUpdated); - server.StartServer(); + auto testWriteStat = [](const TString& originallyProvidedConsumerName, + const TString& consumerName, + const TString& consumerPath) { + auto checkCounters = [](auto monPort, const TString& session, + const std::set<std::string>& canonicalSensorNames, + const TString& clientDc, const TString& originDc, + const TString& client, const TString& consumerPath) { + NJson::TJsonValue counters; + if (clientDc.empty() && originDc.empty()) { + counters = GetClientCountersLegacy(monPort, "pqproxy", session, client, consumerPath); + } else { + counters = GetCountersLegacy(monPort, "pqproxy", session, "account/topic1", + clientDc, originDc, client, consumerPath); + } + const auto sensors = counters["sensors"].GetArray(); + std::set<std::string> sensorNames; + std::transform(sensors.begin(), sensors.end(), + std::inserter(sensorNames, sensorNames.begin()), + [](auto& el) { + return el["labels"]["sensor"].GetString(); + }); + auto equal = sensorNames == canonicalSensorNames; + UNIT_ASSERT(equal); + }; - server.EnableLogs({ NKikimrServices::PQ_WRITE_PROXY, NKikimrServices::NET_CLASSIFIER }); - server.EnableLogs({ NKikimrServices::PERSQUEUE }, NActors::NLog::PRI_ERROR); + NPersQueue::TTestServer server(PQSettings(0, 1, true, "10"), false); + auto netDataUpdated = server.PrepareNetDataFile(FormNetData()); + UNIT_ASSERT(netDataUpdated); + server.StartServer(); - auto sender = server.CleverServer->GetRuntime()->AllocateEdgeActor(); + const auto monPort = TPortManager().GetPort(); + auto Counters = server.CleverServer->GetGRpcServerRootCounters(); + NActors::TMon Monitoring({monPort, "localhost", 3, "root", "localhost", {}, {}, {}}); + Monitoring.RegisterCountersPage("counters", 
"Counters", Counters); + Monitoring.Start(); - GetClassifierUpdate(*server.CleverServer, sender); //wait for initializing + server.EnableLogs({ NKikimrServices::PQ_WRITE_PROXY, NKikimrServices::NET_CLASSIFIER }); + server.EnableLogs({ NKikimrServices::PERSQUEUE }, NActors::NLog::PRI_ERROR); - server.AnnoyingClient->CreateTopic("rt3.dc1--account--topic1", 10, 10000, 10000, 2000); - server.AnnoyingClient->CreateTopic(DEFAULT_TOPIC_NAME, 10, 10000, 10000, 2000); + auto sender = server.CleverServer->GetRuntime()->AllocateEdgeActor(); - auto driver = server.AnnoyingClient->GetDriver(); + GetClassifierUpdate(*server.CleverServer, sender); //wait for initializing - auto writer = CreateWriter(*driver, "account/topic1", "base64:AAAAaaaa____----12", 0, "raw"); + server.AnnoyingClient->CreateTopic("rt3.dc1--account--topic1", 10, 10000, 10000, 2000); - auto msg = writer->GetEvent(true); - UNIT_ASSERT(msg); // ReadyToAcceptEvent + auto driver = server.AnnoyingClient->GetDriver(); - auto ev = std::get_if<NYdb::NPersQueue::TWriteSessionEvent::TReadyToAcceptEvent>(&*msg); - UNIT_ASSERT(ev); + auto writer = CreateWriter(*driver, "account/topic1", "base64:AAAAaaaa____----12", 0, "raw"); - TInstant st(TInstant::Now()); - for (ui32 i = 1; i <= 5; ++i) { - writer->Write(std::move(ev->ContinuationToken), TString(2000, 'a')); - msg = writer->GetEvent(true); + auto msg = writer->GetEvent(true); UNIT_ASSERT(msg); // ReadyToAcceptEvent - ev = std::get_if<NYdb::NPersQueue::TWriteSessionEvent::TReadyToAcceptEvent>(&*msg); + auto ev = std::get_if<NYdb::NPersQueue::TWriteSessionEvent::TReadyToAcceptEvent>(&*msg); UNIT_ASSERT(ev); - msg = writer->GetEvent(true); + TInstant st(TInstant::Now()); + for (ui32 i = 1; i <= 5; ++i) { + writer->Write(std::move(ev->ContinuationToken), TString(2000, 'a')); + msg = writer->GetEvent(true); + UNIT_ASSERT(msg); // ReadyToAcceptEvent - Cerr << DebugString(*msg) << "\n"; + ev = std::get_if<NYdb::NPersQueue::TWriteSessionEvent::TReadyToAcceptEvent>(&*msg); + 
UNIT_ASSERT(ev); - auto ack = std::get_if<NYdb::NPersQueue::TWriteSessionEvent::TAcksEvent>(&*msg); - UNIT_ASSERT(ack); + msg = writer->GetEvent(true); + + Cerr << DebugString(*msg) << "\n"; - if (i == 5) { - UNIT_ASSERT(TInstant::Now() - st > TDuration::Seconds(3)); - // TODO: Describe this assert in comment - UNIT_ASSERT(!ack->Acks.empty()); - UNIT_ASSERT(ack->Acks.back().Stat); - UNIT_ASSERT(ack->Acks.back().Stat->TotalTimeInPartitionQueue >= ack->Acks.back().Stat->PartitionQuotedTime); - UNIT_ASSERT(ack->Acks.back().Stat->TotalTimeInPartitionQueue <= ack->Acks.back().Stat->PartitionQuotedTime + TDuration::Seconds(10)); + auto ack = std::get_if<NYdb::NPersQueue::TWriteSessionEvent::TAcksEvent>(&*msg); + UNIT_ASSERT(ack); + if (i == 5) { + UNIT_ASSERT(TInstant::Now() - st > TDuration::Seconds(3)); + UNIT_ASSERT(!ack->Acks.empty()); + UNIT_ASSERT(ack->Acks.back().Stat); + } } - } - GetCounters(server.CleverServer->GetRuntime()->GetMonPort(), "pqproxy", "writeSession", "account/topic1", "Vla"); - { - NYdb::NPersQueue::TReadSessionSettings settings; - settings.ConsumerName("shared/user").AppendTopics(TString("account/topic1")).ReadOriginal({"dc1"}); + checkCounters(server.CleverServer->GetRuntime()->GetMonPort(), + "writeSession", + { + "BytesWrittenOriginal", + "CompactedBytesWrittenOriginal", + "MessagesWrittenOriginal", + "UncompressedBytesWrittenOriginal" + }, + "", "cluster", "", "" + ); + + checkCounters(monPort, + "writeSession", + { + "BytesInflight", + "BytesInflightTotal", + "Errors", + "SessionsActive", + "SessionsCreated", + // "WithoutAuth" - this counter just not present in this test + }, + "", "cluster", "", "" + ); - auto reader = CreateReader(*driver, settings); + { + NYdb::NPersQueue::TReadSessionSettings settings; + settings.ConsumerName(originallyProvidedConsumerName) + .AppendTopics(TString("account/topic1")).ReadOriginal({"dc1"}); - auto msg = GetNextMessageSkipAssignment(reader); - UNIT_ASSERT(msg); + auto reader = CreateReader(*driver, 
settings); - Cerr << NYdb::NPersQueue::DebugString(*msg) << "\n"; - } - GetCounters(server.CleverServer->GetRuntime()->GetMonPort(), "pqproxy", "readSession", "account/topic1", "Vla"); + auto msg = GetNextMessageSkipAssignment(reader); + UNIT_ASSERT(msg); + + Cerr << NYdb::NPersQueue::DebugString(*msg) << "\n"; + + checkCounters(monPort, + "readSession", + { + "Commits", + "PartitionsErrors", + "PartitionsInfly", + "PartitionsLocked", + "PartitionsReleased", + "PartitionsToBeLocked", + "PartitionsToBeReleased", + "WaitsForData" + }, + "", "cluster", "", "" + ); + + checkCounters(monPort, + "readSession", + { + "BytesInflight", + "Errors", + "PipeReconnects", + "SessionsActive", + "SessionsCreated", + "PartsPerSession" + }, + "", "", consumerName, consumerPath + ); + + checkCounters(server.CleverServer->GetRuntime()->GetMonPort(), + "readSession", + { + "BytesReadFromDC" + }, + "Vla", "", "", "" + ); + + checkCounters(server.CleverServer->GetRuntime()->GetMonPort(), + "readSession", + { + "BytesRead", + "MessagesRead" + }, + "", "Dc1", "", "" + ); + + checkCounters(server.CleverServer->GetRuntime()->GetMonPort(), + "readSession", + { + "BytesRead", + "MessagesRead" + }, + "", "cluster", "", "" + ); + + checkCounters(server.CleverServer->GetRuntime()->GetMonPort(), + "readSession", + { + "BytesRead", + "MessagesRead" + }, + "", "cluster", "", "" + ); + + checkCounters(server.CleverServer->GetRuntime()->GetMonPort(), + "readSession", + { + "BytesRead", + "MessagesRead" + }, + "", "Dc1", consumerName, consumerPath + ); + } + }; + + testWriteStat("some@random@consumer", "some@random@consumer", "some/random/consumer"); + testWriteStat("some@user", "some@user", "some/user"); + testWriteStat("shared@user", "shared@user", "shared/user"); + testWriteStat("shared/user", "user", "shared/user"); + testWriteStat("user", "user", "shared/user"); + testWriteStat("some@random/consumer", "some@random@consumer", "some/random/consumer"); + testWriteStat("/some/user", "some@user", 
"some/user"); } + Y_UNIT_TEST(TestWriteSessionsConflicts) { NPersQueue::TTestServer server; server.AnnoyingClient->CreateTopic(DEFAULT_TOPIC_NAME, 1); @@ -2561,6 +2692,7 @@ namespace { Codecs: "zstd" } ReadRuleVersions: 567 + YdbDatabasePath: "/Root" } ErrorCode: OK } |