diff options
author | azevaykin <azevaykin@yandex-team.com> | 2023-07-13 16:18:58 +0300 |
---|---|---|
committer | azevaykin <azevaykin@yandex-team.com> | 2023-07-13 16:18:58 +0300 |
commit | 7eb5a247ce7b3bc787d68274e49c37eda39cdf1c (patch) | |
tree | 0f05c2f08304c6f2b937aa162a6d8ad4c114fc84 | |
parent | 3466d4a0b4bce07d32a57bbf155ede5c670eccdf (diff) | |
download | ydb-7eb5a247ce7b3bc787d68274e49c37eda39cdf1c.tar.gz |
Read channel count from KV tablet
-rw-r--r-- | ydb/core/persqueue/partition.cpp | 7 | ||||
-rw-r--r-- | ydb/core/persqueue/partition.h | 3 | ||||
-rw-r--r-- | ydb/core/persqueue/partition_init.cpp | 4 | ||||
-rw-r--r-- | ydb/core/persqueue/partition_write.cpp | 2 | ||||
-rw-r--r-- | ydb/core/persqueue/pq_impl.cpp | 4 | ||||
-rw-r--r-- | ydb/core/persqueue/ut/partition_ut.cpp | 1 | ||||
-rw-r--r-- | ydb/core/protos/pqconfig.proto | 2 | ||||
-rw-r--r-- | ydb/services/lib/actors/pq_schema_actor.cpp | 2 | ||||
-rw-r--r-- | ydb/services/persqueue_v1/persqueue_ut.cpp | 1 |
9 files changed, 15 insertions, 11 deletions
diff --git a/ydb/core/persqueue/partition.cpp b/ydb/core/persqueue/partition.cpp index d5eb10c59f..5e1c76e8c6 100644 --- a/ydb/core/persqueue/partition.cpp +++ b/ydb/core/persqueue/partition.cpp @@ -102,7 +102,7 @@ void AddCheckDiskRequest(TEvKeyValue::TEvRequest *request, ui32 numChannels) { TPartition::TPartition(ui64 tabletId, ui32 partition, const TActorId& tablet, const TActorId& blobCache, const NPersQueue::TTopicConverterPtr& topicConverter, TString dcId, bool isServerless, - const NKikimrPQ::TPQTabletConfig& tabletConfig, const TTabletCountersBase& counters, bool subDomainOutOfSpace, + const NKikimrPQ::TPQTabletConfig& tabletConfig, const TTabletCountersBase& counters, bool subDomainOutOfSpace, ui32 numChannels, bool newPartition, TVector<TTransaction> distrTxs) : Initializer(this) @@ -143,6 +143,7 @@ TPartition::TPartition(ui64 tabletId, ui32 partition, const TActorId& tablet, co , AvgQuotaBytes{{TDuration::Seconds(1), 1000}, {TDuration::Minutes(1), 1000}, {TDuration::Hours(1), 2000}, {TDuration::Days(1), 2000}} , ReservedSize(0) , Channel(0) + , NumChannels(numChannels) , WriteBufferIsFullCounter(nullptr) , WriteLagMs(TDuration::Minutes(1), 100) { @@ -246,7 +247,7 @@ void TPartition::HandleWakeup(const TActorContext& ctx) { THolder <TEvKeyValue::TEvRequest> request = MakeHolder<TEvKeyValue::TEvRequest>(); bool haveChanges = CleanUp(request.Get(), ctx); if (DiskIsFull) { - AddCheckDiskRequest(request.Get(), Config.GetPartitionConfig().GetNumChannels()); + AddCheckDiskRequest(request.Get(), NumChannels); haveChanges = true; } @@ -2264,7 +2265,7 @@ ui32 TPartition::NextChannel(bool isHead, ui32 blobSize) { }; ui32 res = Channel; - Channel = (Channel + 1) % Config.GetPartitionConfig().GetNumChannels(); + Channel = (Channel + 1) % NumChannels; return res; } diff --git a/ydb/core/persqueue/partition.h b/ydb/core/persqueue/partition.h index 1dc8a8cb01..5784df51e9 100644 --- a/ydb/core/persqueue/partition.h +++ b/ydb/core/persqueue/partition.h @@ -327,7 +327,7 @@ public: TPartition(ui64 tabletId, ui32 partition, const TActorId& tablet, const TActorId& blobCache, const NPersQueue::TTopicConverterPtr& topicConverter, TString dcId, bool isServerless, - const NKikimrPQ::TPQTabletConfig& config, const TTabletCountersBase& counters, bool SubDomainOutOfSpace, + const NKikimrPQ::TPQTabletConfig& config, const TTabletCountersBase& counters, bool SubDomainOutOfSpace, ui32 numChannels, bool newPartition = false, TVector<TTransaction> distrTxs = {}); @@ -637,6 +637,7 @@ private: std::deque<THolder<TEvPQ::TEvReserveBytes>> ReserveRequests; ui32 Channel; + ui32 NumChannels; TVector<ui32> TotalChannelWritesByHead; TWorkingTimeCounter WriteBufferIsFullCounter; diff --git a/ydb/core/persqueue/partition_init.cpp b/ydb/core/persqueue/partition_init.cpp index aa7bbd6b81..d47b026eda 100644 --- a/ydb/core/persqueue/partition_init.cpp +++ b/ydb/core/persqueue/partition_init.cpp @@ -221,7 +221,7 @@ TInitDiskStatusStep::TInitDiskStatusStep(TInitializer* initializer) void TInitDiskStatusStep::Execute(const TActorContext& ctx) { THolder<TEvKeyValue::TEvRequest> request(new TEvKeyValue::TEvRequest); - AddCheckDiskRequest(request.Get(), Partition()->Config.GetPartitionConfig().GetNumChannels()); + AddCheckDiskRequest(request.Get(), Partition()->NumChannels); ctx.Send(Partition()->Tablet, request.Release()); } @@ -691,7 +691,7 @@ void TPartition::Initialize(const TActorContext& ctx) { Config.GetYdbDatabasePath(), IsServerless, FolderId); - TotalChannelWritesByHead.resize(Config.GetPartitionConfig().GetNumChannels()); + TotalChannelWritesByHead.resize(NumChannels); if (Config.GetPartitionConfig().HasMirrorFrom()) { ManageWriteTimestampEstimate = !Config.GetPartitionConfig().GetMirrorFrom().GetSyncWriteTime(); diff --git a/ydb/core/persqueue/partition_write.cpp b/ydb/core/persqueue/partition_write.cpp index 232c50230a..ec498d1806 100644 --- a/ydb/core/persqueue/partition_write.cpp +++ b/ydb/core/persqueue/partition_write.cpp @@ -1395,7 +1395,7 @@ void TPartition::HandleWrites(const TActorContext& ctx) { if (!Requests.empty() && DiskIsFull) { CancelAllWritesOnIdle(ctx); - AddCheckDiskRequest(request.Get(), Config.GetPartitionConfig().GetNumChannels()); + AddCheckDiskRequest(request.Get(), NumChannels); haveCheckDisk = true; } else { haveData = ProcessWrites(request.Get(), now, ctx); diff --git a/ydb/core/persqueue/pq_impl.cpp b/ydb/core/persqueue/pq_impl.cpp index a556d295e8..202a30dda3 100644 --- a/ydb/core/persqueue/pq_impl.cpp +++ b/ydb/core/persqueue/pq_impl.cpp @@ -3273,6 +3273,9 @@ TPartition* TPersQueue::CreatePartitionActor(ui32 partitionId, bool newPartition, const TActorContext& ctx) { + int channels = Info()->Channels.size() - NKeyValue::BLOB_CHANNEL; // channels 0,1 are reserved in tablet + Y_VERIFY(channels > 0); + return new TPartition(TabletID(), partitionId, ctx.SelfID, @@ -3283,6 +3286,7 @@ TPartition* TPersQueue::CreatePartitionActor(ui32 partitionId, config, *Counters, SubDomainOutOfSpace, + (ui32)channels, newPartition); } diff --git a/ydb/core/persqueue/ut/partition_ut.cpp b/ydb/core/persqueue/ut/partition_ut.cpp index d3cba84fd8..efe3ad3d29 100644 --- a/ydb/core/persqueue/ut/partition_ut.cpp +++ b/ydb/core/persqueue/ut/partition_ut.cpp @@ -272,6 +272,7 @@ void TPartitionFixture::CreatePartitionActor(ui32 id, Config, *TabletCounters, false, + 1, newPartition, std::move(txs)); ActorId = Ctx->Runtime->Register(actor); diff --git a/ydb/core/protos/pqconfig.proto b/ydb/core/protos/pqconfig.proto index a8edbae082..e709a35314 100644 --- a/ydb/core/protos/pqconfig.proto +++ b/ydb/core/protos/pqconfig.proto @@ -251,7 +251,7 @@ message TPartitionConfig { optional uint64 MaxWriteInflightSize = 9 [default = 30000000]; //near 30mb optional uint64 BorderWriteInflightSize = 12 [default = 10000000]; //near 10mb - optional uint32 NumChannels = 10 [default = 10]; + reserved 10; // (deprecated) NumChannels; optional uint32 TotalPartitions = 13 [default = 1]; diff --git a/ydb/services/lib/actors/pq_schema_actor.cpp b/ydb/services/lib/actors/pq_schema_actor.cpp index 9de74a9c83..9ce30ad5db 100644 --- a/ydb/services/lib/actors/pq_schema_actor.cpp +++ b/ydb/services/lib/actors/pq_schema_actor.cpp @@ -671,7 +671,6 @@ namespace NKikimr::NGRpcProxy::V1 { const auto& channelProfiles = pqConfig.GetChannelProfiles(); if (channelProfiles.size() > 2) { - partConfig->SetNumChannels(channelProfiles.size() - 2); // channels 0,1 are reserved in tablet partConfig->MutableExplicitChannelProfiles()->CopyFrom(channelProfiles); } if (settings.max_partition_storage_size() < 0) { @@ -993,7 +992,6 @@ namespace NKikimr::NGRpcProxy::V1 { const auto& channelProfiles = pqConfig.GetChannelProfiles(); if (channelProfiles.size() > 2) { - partConfig->SetNumChannels(channelProfiles.size() - 2); // channels 0,1 are reserved in tablet partConfig->MutableExplicitChannelProfiles()->CopyFrom(channelProfiles); } if (request.has_retention_period()) { diff --git a/ydb/services/persqueue_v1/persqueue_ut.cpp b/ydb/services/persqueue_v1/persqueue_ut.cpp index b01b3dfde8..7b3ec66d9e 100644 --- a/ydb/services/persqueue_v1/persqueue_ut.cpp +++ b/ydb/services/persqueue_v1/persqueue_ut.cpp @@ -4460,7 +4460,6 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) { SourceIdLifetimeSeconds: 1382400 WriteSpeedInBytesPerSecond: 123 BurstSize: 1000 - NumChannels: 10 ExplicitChannelProfiles { PoolKind: "test" } |