aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorazevaykin <azevaykin@yandex-team.com>2023-07-13 16:18:58 +0300
committerazevaykin <azevaykin@yandex-team.com>2023-07-13 16:18:58 +0300
commit7eb5a247ce7b3bc787d68274e49c37eda39cdf1c (patch)
tree0f05c2f08304c6f2b937aa162a6d8ad4c114fc84
parent3466d4a0b4bce07d32a57bbf155ede5c670eccdf (diff)
downloadydb-7eb5a247ce7b3bc787d68274e49c37eda39cdf1c.tar.gz
Read channel count from KV tablet
-rw-r--r--ydb/core/persqueue/partition.cpp7
-rw-r--r--ydb/core/persqueue/partition.h3
-rw-r--r--ydb/core/persqueue/partition_init.cpp4
-rw-r--r--ydb/core/persqueue/partition_write.cpp2
-rw-r--r--ydb/core/persqueue/pq_impl.cpp4
-rw-r--r--ydb/core/persqueue/ut/partition_ut.cpp1
-rw-r--r--ydb/core/protos/pqconfig.proto2
-rw-r--r--ydb/services/lib/actors/pq_schema_actor.cpp2
-rw-r--r--ydb/services/persqueue_v1/persqueue_ut.cpp1
9 files changed, 15 insertions, 11 deletions
diff --git a/ydb/core/persqueue/partition.cpp b/ydb/core/persqueue/partition.cpp
index d5eb10c59f..5e1c76e8c6 100644
--- a/ydb/core/persqueue/partition.cpp
+++ b/ydb/core/persqueue/partition.cpp
@@ -102,7 +102,7 @@ void AddCheckDiskRequest(TEvKeyValue::TEvRequest *request, ui32 numChannels) {
TPartition::TPartition(ui64 tabletId, ui32 partition, const TActorId& tablet, const TActorId& blobCache,
const NPersQueue::TTopicConverterPtr& topicConverter, TString dcId, bool isServerless,
- const NKikimrPQ::TPQTabletConfig& tabletConfig, const TTabletCountersBase& counters, bool subDomainOutOfSpace,
+ const NKikimrPQ::TPQTabletConfig& tabletConfig, const TTabletCountersBase& counters, bool subDomainOutOfSpace, ui32 numChannels,
bool newPartition,
TVector<TTransaction> distrTxs)
: Initializer(this)
@@ -143,6 +143,7 @@ TPartition::TPartition(ui64 tabletId, ui32 partition, const TActorId& tablet, co
, AvgQuotaBytes{{TDuration::Seconds(1), 1000}, {TDuration::Minutes(1), 1000}, {TDuration::Hours(1), 2000}, {TDuration::Days(1), 2000}}
, ReservedSize(0)
, Channel(0)
+ , NumChannels(numChannels)
, WriteBufferIsFullCounter(nullptr)
, WriteLagMs(TDuration::Minutes(1), 100)
{
@@ -246,7 +247,7 @@ void TPartition::HandleWakeup(const TActorContext& ctx) {
THolder <TEvKeyValue::TEvRequest> request = MakeHolder<TEvKeyValue::TEvRequest>();
bool haveChanges = CleanUp(request.Get(), ctx);
if (DiskIsFull) {
- AddCheckDiskRequest(request.Get(), Config.GetPartitionConfig().GetNumChannels());
+ AddCheckDiskRequest(request.Get(), NumChannels);
haveChanges = true;
}
@@ -2264,7 +2265,7 @@ ui32 TPartition::NextChannel(bool isHead, ui32 blobSize) {
};
ui32 res = Channel;
- Channel = (Channel + 1) % Config.GetPartitionConfig().GetNumChannels();
+ Channel = (Channel + 1) % NumChannels;
return res;
}
diff --git a/ydb/core/persqueue/partition.h b/ydb/core/persqueue/partition.h
index 1dc8a8cb01..5784df51e9 100644
--- a/ydb/core/persqueue/partition.h
+++ b/ydb/core/persqueue/partition.h
@@ -327,7 +327,7 @@ public:
TPartition(ui64 tabletId, ui32 partition, const TActorId& tablet, const TActorId& blobCache,
const NPersQueue::TTopicConverterPtr& topicConverter, TString dcId, bool isServerless,
- const NKikimrPQ::TPQTabletConfig& config, const TTabletCountersBase& counters, bool SubDomainOutOfSpace,
+ const NKikimrPQ::TPQTabletConfig& config, const TTabletCountersBase& counters, bool SubDomainOutOfSpace, ui32 numChannels,
bool newPartition = false,
TVector<TTransaction> distrTxs = {});
@@ -637,6 +637,7 @@ private:
std::deque<THolder<TEvPQ::TEvReserveBytes>> ReserveRequests;
ui32 Channel;
+ ui32 NumChannels;
TVector<ui32> TotalChannelWritesByHead;
TWorkingTimeCounter WriteBufferIsFullCounter;
diff --git a/ydb/core/persqueue/partition_init.cpp b/ydb/core/persqueue/partition_init.cpp
index aa7bbd6b81..d47b026eda 100644
--- a/ydb/core/persqueue/partition_init.cpp
+++ b/ydb/core/persqueue/partition_init.cpp
@@ -221,7 +221,7 @@ TInitDiskStatusStep::TInitDiskStatusStep(TInitializer* initializer)
void TInitDiskStatusStep::Execute(const TActorContext& ctx) {
THolder<TEvKeyValue::TEvRequest> request(new TEvKeyValue::TEvRequest);
- AddCheckDiskRequest(request.Get(), Partition()->Config.GetPartitionConfig().GetNumChannels());
+ AddCheckDiskRequest(request.Get(), Partition()->NumChannels);
ctx.Send(Partition()->Tablet, request.Release());
}
@@ -691,7 +691,7 @@ void TPartition::Initialize(const TActorContext& ctx) {
Config.GetYdbDatabasePath(),
IsServerless,
FolderId);
- TotalChannelWritesByHead.resize(Config.GetPartitionConfig().GetNumChannels());
+ TotalChannelWritesByHead.resize(NumChannels);
if (Config.GetPartitionConfig().HasMirrorFrom()) {
ManageWriteTimestampEstimate = !Config.GetPartitionConfig().GetMirrorFrom().GetSyncWriteTime();
diff --git a/ydb/core/persqueue/partition_write.cpp b/ydb/core/persqueue/partition_write.cpp
index 232c50230a..ec498d1806 100644
--- a/ydb/core/persqueue/partition_write.cpp
+++ b/ydb/core/persqueue/partition_write.cpp
@@ -1395,7 +1395,7 @@ void TPartition::HandleWrites(const TActorContext& ctx) {
if (!Requests.empty() && DiskIsFull) {
CancelAllWritesOnIdle(ctx);
- AddCheckDiskRequest(request.Get(), Config.GetPartitionConfig().GetNumChannels());
+ AddCheckDiskRequest(request.Get(), NumChannels);
haveCheckDisk = true;
} else {
haveData = ProcessWrites(request.Get(), now, ctx);
diff --git a/ydb/core/persqueue/pq_impl.cpp b/ydb/core/persqueue/pq_impl.cpp
index a556d295e8..202a30dda3 100644
--- a/ydb/core/persqueue/pq_impl.cpp
+++ b/ydb/core/persqueue/pq_impl.cpp
@@ -3273,6 +3273,9 @@ TPartition* TPersQueue::CreatePartitionActor(ui32 partitionId,
bool newPartition,
const TActorContext& ctx)
{
+ int channels = Info()->Channels.size() - NKeyValue::BLOB_CHANNEL; // channels 0,1 are reserved in tablet
+ Y_VERIFY(channels > 0);
+
return new TPartition(TabletID(),
partitionId,
ctx.SelfID,
@@ -3283,6 +3286,7 @@ TPartition* TPersQueue::CreatePartitionActor(ui32 partitionId,
config,
*Counters,
SubDomainOutOfSpace,
+ (ui32)channels,
newPartition);
}
diff --git a/ydb/core/persqueue/ut/partition_ut.cpp b/ydb/core/persqueue/ut/partition_ut.cpp
index d3cba84fd8..efe3ad3d29 100644
--- a/ydb/core/persqueue/ut/partition_ut.cpp
+++ b/ydb/core/persqueue/ut/partition_ut.cpp
@@ -272,6 +272,7 @@ void TPartitionFixture::CreatePartitionActor(ui32 id,
Config,
*TabletCounters,
false,
+ 1,
newPartition,
std::move(txs));
ActorId = Ctx->Runtime->Register(actor);
diff --git a/ydb/core/protos/pqconfig.proto b/ydb/core/protos/pqconfig.proto
index a8edbae082..e709a35314 100644
--- a/ydb/core/protos/pqconfig.proto
+++ b/ydb/core/protos/pqconfig.proto
@@ -251,7 +251,7 @@ message TPartitionConfig {
optional uint64 MaxWriteInflightSize = 9 [default = 30000000]; //near 30mb
optional uint64 BorderWriteInflightSize = 12 [default = 10000000]; //near 10mb
- optional uint32 NumChannels = 10 [default = 10];
+ reserved 10; // (deprecated) NumChannels;
optional uint32 TotalPartitions = 13 [default = 1];
diff --git a/ydb/services/lib/actors/pq_schema_actor.cpp b/ydb/services/lib/actors/pq_schema_actor.cpp
index 9de74a9c83..9ce30ad5db 100644
--- a/ydb/services/lib/actors/pq_schema_actor.cpp
+++ b/ydb/services/lib/actors/pq_schema_actor.cpp
@@ -671,7 +671,6 @@ namespace NKikimr::NGRpcProxy::V1 {
const auto& channelProfiles = pqConfig.GetChannelProfiles();
if (channelProfiles.size() > 2) {
- partConfig->SetNumChannels(channelProfiles.size() - 2); // channels 0,1 are reserved in tablet
partConfig->MutableExplicitChannelProfiles()->CopyFrom(channelProfiles);
}
if (settings.max_partition_storage_size() < 0) {
@@ -993,7 +992,6 @@ namespace NKikimr::NGRpcProxy::V1 {
const auto& channelProfiles = pqConfig.GetChannelProfiles();
if (channelProfiles.size() > 2) {
- partConfig->SetNumChannels(channelProfiles.size() - 2); // channels 0,1 are reserved in tablet
partConfig->MutableExplicitChannelProfiles()->CopyFrom(channelProfiles);
}
if (request.has_retention_period()) {
diff --git a/ydb/services/persqueue_v1/persqueue_ut.cpp b/ydb/services/persqueue_v1/persqueue_ut.cpp
index b01b3dfde8..7b3ec66d9e 100644
--- a/ydb/services/persqueue_v1/persqueue_ut.cpp
+++ b/ydb/services/persqueue_v1/persqueue_ut.cpp
@@ -4460,7 +4460,6 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) {
SourceIdLifetimeSeconds: 1382400
WriteSpeedInBytesPerSecond: 123
BurstSize: 1000
- NumChannels: 10
ExplicitChannelProfiles {
PoolKind: "test"
}