diff options
author | abcdef <[email protected]> | 2023-02-16 13:31:24 +0300 |
---|---|---|
committer | abcdef <[email protected]> | 2023-02-16 13:31:24 +0300 |
commit | df5f90063aea983d2bcf315c8ffdee35d173b8c5 (patch) | |
tree | 021cbe7beb5644e397f8efc1004a93c33db33b32 | |
parent | 61239a868c9b1e58172ca90cd8b78fbe8d279695 (diff) |
-rw-r--r-- | ydb/core/persqueue/partition.cpp | 252 | ||||
-rw-r--r-- | ydb/core/persqueue/partition.h | 24 | ||||
-rw-r--r-- | ydb/core/persqueue/ut/partition_ut.cpp | 303 |
3 files changed, 453 insertions, 126 deletions
diff --git a/ydb/core/persqueue/partition.cpp b/ydb/core/persqueue/partition.cpp index 66b5d08f51a..c79fa14bbe9 100644 --- a/ydb/core/persqueue/partition.cpp +++ b/ydb/core/persqueue/partition.cpp @@ -434,19 +434,22 @@ void RequestDataRange(const TActorContext& ctx, const TActorId& dst, ui32 partit void TPartition::FillReadFromTimestamps(const NKikimrPQ::TPQTabletConfig& config, const TActorContext& ctx) { TSet<TString> hasReadRule; - for (auto& userInfo : UsersInfoStorage.GetAll()) { + for (auto& userInfo : UsersInfoStorage->GetAll()) { userInfo.second.ReadFromTimestamp = TInstant::Zero(); userInfo.second.HasReadRule = false; hasReadRule.insert(userInfo.first); } for (ui32 i = 0; i < config.ReadRulesSize(); ++i) { const auto& consumer = config.GetReadRules(i); - auto& userInfo = UsersInfoStorage.GetOrCreate(consumer, ctx, 0); + auto& userInfo = UsersInfoStorage->GetOrCreate(consumer, ctx, 0); userInfo.HasReadRule = true; ui64 rrGen = i < config.ReadRuleGenerationsSize() ? config.GetReadRuleGenerations(i) : 0; if (userInfo.ReadRuleGeneration != rrGen) { THolder<TEvPQ::TEvSetClientInfo> event = MakeHolder<TEvPQ::TEvSetClientInfo>(0, consumer, 0, "", 0, 0, TEvPQ::TEvSetClientInfo::ESCI_INIT_READ_RULE, rrGen); + // + // TODO(abcdef): заменить на вызов ProcessUserAct + // AddUserAct(event.Release()); userInfo.Session = ""; userInfo.Offset = 0; @@ -462,7 +465,7 @@ void TPartition::FillReadFromTimestamps(const NKikimrPQ::TPQTabletConfig& config userInfo.ReadFromTimestamp = ts; } for (auto& consumer : hasReadRule) { - auto& userInfo = UsersInfoStorage.GetOrCreate(consumer, ctx); + auto& userInfo = UsersInfoStorage->GetOrCreate(consumer, ctx); THolder<TEvPQ::TEvSetClientInfo> event = MakeHolder<TEvPQ::TEvSetClientInfo>(0, consumer, 0, "", 0, 0, TEvPQ::TEvSetClientInfo::ESCI_DROP_READ_RULE, 0); if (!userInfo.Important && userInfo.LabeledCounters) { @@ -471,18 +474,22 @@ void TPartition::FillReadFromTimestamps(const NKikimrPQ::TPQTabletConfig& config userInfo.Session = ""; userInfo.Offset = 0; userInfo.Step = userInfo.Generation = 0; + // + // TODO(abcdef): заменить на вызов ProcessUserAct + // AddUserAct(event.Release()); } } TPartition::TPartition(ui64 tabletId, ui32 partition, const TActorId& tablet, const TActorId& blobCache, const NPersQueue::TTopicConverterPtr& topicConverter, bool isLocalDC, TString dcId, bool isServerless, - const NKikimrPQ::TPQTabletConfig& config, const TTabletCountersBase& counters, + const NKikimrPQ::TPQTabletConfig& tabletConfig, const TTabletCountersBase& counters, bool newPartition, TVector<TTransaction> distrTxs) : TabletID(tabletId) , Partition(partition) - , Config(config) + , TabletConfig(tabletConfig) + , Counters(counters) , TopicConverter(topicConverter) , IsLocalDC(isLocalDC) , DCId(std::move(dcId)) @@ -491,21 +498,13 @@ TPartition::TPartition(ui64 tabletId, ui32 partition, const TActorId& tablet, co , WriteInflightSize(0) , Tablet(tablet) , BlobCache(blobCache) - , InitState(WaitDiskStatus) + , InitState(WaitConfig) , PartitionedBlob(partition, 0, 0, 0, 0, 0, Head, NewHead, true, false, 8_MB) , NewHeadKey{TKey{}, 0, TInstant::Zero(), 0} , BodySize(0) , MaxWriteResponsesSize(0) , GapSize(0) - , CloudId(config.GetYcCloudId()) - , DbId(config.GetYdbDatabaseId()) - , DbPath(config.GetYdbDatabasePath()) , IsServerless(isServerless) - , FolderId(config.GetYcFolderId()) - , UsersInfoStorage( - DCId, TabletID, TopicConverter, Partition, counters, Config, - CloudId, DbId, config.GetYdbDatabasePath(), IsServerless, FolderId - ) , ReadingTimestamp(false) , Cookie(0) , InitDuration(TDuration::Zero()) @@ -524,17 +523,10 @@ TPartition::TPartition(ui64 tabletId, ui32 partition, const TActorId& tablet, co , AvgQuotaBytes{{TDuration::Seconds(1), 1000}, {TDuration::Minutes(1), 1000}, {TDuration::Hours(1), 2000}, {TDuration::Days(1), 2000}} , ReservedSize(0) , Channel(0) - , TotalChannelWritesByHead(Config.GetPartitionConfig().GetNumChannels(), 0) , WriteBufferIsFullCounter(nullptr) , WriteLagMs(TDuration::Minutes(1), 100) { - if (Config.GetPartitionConfig().HasMirrorFrom()) { - ManageWriteTimestampEstimate = !Config.GetPartitionConfig().GetMirrorFrom().GetSyncWriteTime(); - } else { - ManageWriteTimestampEstimate = IsLocalDC; - } - - TabletCounters.Populate(counters); + TabletCounters.Populate(Counters); if (!distrTxs.empty()) { std::move(distrTxs.begin(), distrTxs.end(), @@ -712,7 +704,7 @@ void TPartition::HandleMonitoring(TEvPQ::TEvMonRequest::TPtr& ev, const TActorCo } } TABLEBODY() { - for (auto& d: UsersInfoStorage.GetAll()) { + for (auto& d: UsersInfoStorage->GetAll()) { TABLER() { TABLED() {out << EncodeHtmlPcdata(d.first);} TABLED() {out << d.second.Offset;} @@ -736,8 +728,52 @@ void TPartition::HandleMonitoring(TEvPQ::TEvMonRequest::TPtr& ev, const TActorCo ctx.Send(ev->Sender, new TEvPQ::TEvMonResponse(Partition, res, out.Str())); } +void TPartition::RequestConfig(const TActorContext& ctx) +{ + auto event = MakeHolder<TEvKeyValue::TEvRequest>(); + auto read = event->Record.AddCmdRead(); + read->SetKey(GetKeyConfig()); + ctx.Send(Tablet, event.Release()); +} + +void TPartition::HandleConfig(const NKikimrClient::TResponse& res, const TActorContext& ctx) +{ + auto& response = res.GetReadResult(0); + + switch (response.GetStatus()) { + case NKikimrProto::OK: + Y_VERIFY(Config.ParseFromString(response.GetValue())); + Y_VERIFY(Config.GetVersion() <= TabletConfig.GetVersion()); + if (Config.GetVersion() < TabletConfig.GetVersion()) { + auto event = MakeHolder<TEvPQ::TEvChangePartitionConfig>(TopicConverter, + TabletConfig); + PushFrontDistrTx(event.Release()); + } + break; + case NKikimrProto::NODATA: + Config = TabletConfig; + break; + case NKikimrProto::ERROR: + LOG_ERROR_S(ctx, NKikimrServices::PERSQUEUE, + "Partition " << Partition << " can't read config"); + ctx.Send(Tablet, new TEvents::TEvPoisonPill()); + break; + default: + Cerr << "ERROR " << response.GetStatus() << "\n"; + Y_FAIL("bad status"); + }; + + InitState = WaitDiskStatus; + Initialize(ctx); +} void TPartition::Bootstrap(const TActorContext& ctx) { + Y_VERIFY(InitState == WaitConfig); + RequestConfig(ctx); + Become(&TThis::StateInit); +} + +void TPartition::Initialize(const TActorContext& ctx) { CreationTime = ctx.Now(); WriteCycleStartTime = ctx.Now(); WriteQuota.ConstructInPlace(Config.GetPartitionConfig().GetBurstSize(), @@ -747,6 +783,11 @@ void TPartition::Bootstrap(const TActorContext& ctx) { LastUsedStorageMeterTimestamp = ctx.Now(); WriteTimestampEstimate = ManageWriteTimestampEstimate ? ctx.Now() : TInstant::Zero(); + CloudId = Config.GetYcCloudId(); + DbId = Config.GetYdbDatabaseId(); + DbPath = Config.GetYdbDatabasePath(); + FolderId = Config.GetYcFolderId(); + CalcTopicWriteQuotaParams(AppData()->PQConfig, IsLocalDC, TopicConverter, @@ -755,6 +796,25 @@ void TPartition::Bootstrap(const TActorContext& ctx) { TopicWriteQuoterPath, TopicWriteQuotaResourcePath); + UsersInfoStorage.ConstructInPlace(DCId, + TabletID, + TopicConverter, + Partition, + Counters, + Config, + CloudId, + DbId, + Config.GetYdbDatabasePath(), + IsServerless, + FolderId); + TotalChannelWritesByHead.resize(Config.GetPartitionConfig().GetNumChannels()); + + if (Config.GetPartitionConfig().HasMirrorFrom()) { + ManageWriteTimestampEstimate = !Config.GetPartitionConfig().GetMirrorFrom().GetSyncWriteTime(); + } else { + ManageWriteTimestampEstimate = IsLocalDC; + } + if (AppData()->PQConfig.GetTopicsAreFirstClassCitizen()) { PartitionCountersLabeled.Reset(new TPartitionLabeledCounters(TopicConverter->GetClientsideName(), Partition, @@ -764,7 +824,7 @@ void TPartition::Bootstrap(const TActorContext& ctx) { Partition)); } - UsersInfoStorage.Init(Tablet, SelfId(), ctx); + UsersInfoStorage->Init(Tablet, SelfId(), ctx); Y_VERIFY(AppData(ctx)->PQConfig.GetMaxBlobsPerLevel() > 0); ui32 border = LEVEL0; @@ -787,7 +847,7 @@ void TPartition::Bootstrap(const TActorContext& ctx) { } for (const auto& readQuota : Config.GetPartitionConfig().GetReadQuota()) { - auto &userInfo = UsersInfoStorage.GetOrCreate(readQuota.GetClientId(), ctx); + auto &userInfo = UsersInfoStorage->GetOrCreate(readQuota.GetClientId(), ctx); userInfo.ReadQuota.UpdateConfig(readQuota.GetBurstSize(), readQuota.GetSpeedInBytesPerSecond()); } @@ -982,7 +1042,7 @@ void TPartition::ProcessHasDataRequests(const TActorContext& ctx) { res->Record.SetCookie(*(it->Cookie)); ctx.Send(it->Sender, res.Release()); if (!it->ClientId.empty()) { - auto& userInfo = UsersInfoStorage.GetOrCreate(it->ClientId, ctx); + auto& userInfo = UsersInfoStorage->GetOrCreate(it->ClientId, ctx); userInfo.ForgetSubscription(ctx.Now()); } it = HasDataRequests.erase(it); @@ -1002,7 +1062,7 @@ void TPartition::ProcessHasDataRequests(const TActorContext& ctx) { res->Record.SetCookie(*(it->Request.Cookie)); ctx.Send(it->Request.Sender, res.Release()); if (!it->Request.ClientId.empty()) { - auto& userInfo = UsersInfoStorage.GetOrCreate(it->Request.ClientId, ctx); + auto& userInfo = UsersInfoStorage->GetOrCreate(it->Request.ClientId, ctx); userInfo.ForgetSubscription(ctx.Now()); } HasDataRequests.erase(jt); @@ -1020,7 +1080,7 @@ void TPartition::UpdateAvailableSize(const TActorContext& ctx) { auto now = ctx.Now(); WriteQuota->Update(now); - for (auto &c : UsersInfoStorage.GetAll()) { + for (auto &c : UsersInfoStorage->GetAll()) { while (true) { c.second.ReadQuota.Update(now); if (!c.second.ReadQuota.CanExaust() && !c.second.ReadRequests.empty()) { @@ -1077,7 +1137,7 @@ void TPartition::HandleWakeup(const TActorContext& ctx) { ProcessHasDataRequests(ctx); auto now = ctx.Now(); - for (auto& userInfo : UsersInfoStorage.GetAll()) { + for (auto& userInfo : UsersInfoStorage->GetAll()) { userInfo.second.UpdateReadingTimeAndState(now); for (auto& avg : userInfo.second.AvgReadBytes) { avg.Update(now); @@ -1162,7 +1222,7 @@ bool TPartition::CleanUpBlobs(TEvKeyValue::TEvRequest *request, bool hasWrites, const auto& partConfig = Config.GetPartitionConfig(); ui64 minOffset = EndOffset; for (const auto& importantClientId : partConfig.GetImportantClientId()) { - TUserInfo* userInfo = UsersInfoStorage.GetIfExists(importantClientId); + TUserInfo* userInfo = UsersInfoStorage->GetIfExists(importantClientId); ui64 curOffset = StartOffset; if (userInfo && userInfo->Offset >= 0) //-1 means no offset curOffset = userInfo->Offset; @@ -1285,7 +1345,7 @@ void TPartition::Handle(TEvPersQueue::TEvHasDataInfo::TPtr& ev, const TActorCont Y_VERIFY(res.second); if (InitDone && record.HasClientId() && !record.GetClientId().empty()) { - auto& userInfo = UsersInfoStorage.GetOrCreate(record.GetClientId(), ctx); + auto& userInfo = UsersInfoStorage->GetOrCreate(record.GetClientId(), ctx); ++userInfo.Subscriptions; userInfo.UpdateReadOffset((i64)EndOffset - 1, ctx.Now(), ctx.Now(), ctx.Now()); userInfo.UpdateReadingTimeAndState(ctx.Now()); @@ -1302,7 +1362,7 @@ void TPartition::Handle(TEvPQ::TEvMirrorerCounters::TPtr& ev, const TActorContex } void TPartition::Handle(NReadSpeedLimiterEvents::TEvCounters::TPtr& ev, const TActorContext& /*ctx*/) { - auto userInfo = UsersInfoStorage.GetIfExists(ev->Get()->User); + auto userInfo = UsersInfoStorage->GetIfExists(ev->Get()->User); if (userInfo && userInfo->ReadSpeedLimiter) { auto diff = ev->Get()->Counters.MakeDiffForAggr(userInfo->ReadSpeedLimiter->Baseline); TabletCounters.Populate(*diff.Get()); @@ -1336,7 +1396,10 @@ void TPartition::Handle(TEvents::TEvPoisonPill::TPtr&, const TActorContext& ctx) if (Mirrorer) { Send(Mirrorer->Actor, new TEvents::TEvPoisonPill()); } - UsersInfoStorage.Clear(ctx); + + if (UsersInfoStorage.Defined()) { + UsersInfoStorage->Clear(ctx); + } Die(ctx); } @@ -1504,9 +1567,9 @@ void TPartition::HandleInfoRangeRead(const NKikimrClient::TKeyValueResponse::TRe } else if ((*key)[TKeyPrefix::MarkPosition()] == TKeyPrefix::MarkProtoSourceId) { SourceIdStorage.LoadSourceIdInfo(*key, pair.GetValue(), ctx.Now()); } else if ((*key)[TKeyPrefix::MarkPosition()] == TKeyPrefix::MarkUser) { - UsersInfoStorage.Parse(*key, pair.GetValue(), ctx); + UsersInfoStorage->Parse(*key, pair.GetValue(), ctx); } else if ((*key)[TKeyPrefix::MarkPosition()] == TKeyPrefix::MarkUserDeprecated) { - UsersInfoStorage.ParseDeprecated(*key, pair.GetValue(), ctx); + UsersInfoStorage->ParseDeprecated(*key, pair.GetValue(), ctx); } } //make next step @@ -1729,6 +1792,10 @@ void TPartition::HandleOnInit(TEvKeyValue::TEvResponse::TPtr& ev, const TActorCo DiskIsFull = !diskIsOk; switch(InitState) { + case WaitConfig: + Y_VERIFY(response.ReadResultSize() == 1); + HandleConfig(response, ctx); + break; case WaitDiskStatus: Y_VERIFY(response.GetStatusResultSize()); HandleGetDiskStatus(response, ctx); @@ -1757,7 +1824,7 @@ void TPartition::HandleOnInit(TEvKeyValue::TEvResponse::TPtr& ev, const TActorCo void TPartition::InitComplete(const TActorContext& ctx) { if (StartOffset == EndOffset && EndOffset == 0) { - for (auto& [user, info] : UsersInfoStorage.GetAll()) { + for (auto& [user, info] : UsersInfoStorage->GetAll()) { if (info.Offset > 0 && StartOffset < (ui64)info.Offset) { Head.Offset = EndOffset = StartOffset = info.Offset; } @@ -1810,7 +1877,7 @@ void TPartition::InitComplete(const TActorContext& ctx) { InitUserInfoForImportantClients(ctx); - for (auto& userInfoPair : UsersInfoStorage.GetAll()) { + for (auto& userInfoPair : UsersInfoStorage->GetAll()) { Y_VERIFY(userInfoPair.second.Offset >= 0); ReadTimestampForOffset(userInfoPair.first, userInfoPair.second, ctx); } @@ -1834,7 +1901,7 @@ void TPartition::InitComplete(const TActorContext& ctx) { void TPartition::UpdateUserInfoEndOffset(const TInstant& now) { - for (auto& userInfo : UsersInfoStorage.GetAll()) { + for (auto& userInfo : UsersInfoStorage->GetAll()) { userInfo.second.EndOffset = (i64)EndOffset; userInfo.second.UpdateReadingTimeAndState(now); } @@ -1885,20 +1952,20 @@ void TPartition::InitUserInfoForImportantClients(const TActorContext& ctx) { TSet<TString> important; for (const auto& importantUser : Config.GetPartitionConfig().GetImportantClientId()) { important.insert(importantUser); - TUserInfo* userInfo = UsersInfoStorage.GetIfExists(importantUser); + TUserInfo* userInfo = UsersInfoStorage->GetIfExists(importantUser); if (userInfo && !userInfo->Important && userInfo->LabeledCounters) { ctx.Send(Tablet, new TEvPQ::TEvPartitionLabeledCountersDrop(Partition, userInfo->LabeledCounters->GetGroup())); userInfo->SetImportant(true); continue; } if (!userInfo) { - userInfo = &UsersInfoStorage.Create(ctx, importantUser, 0, true, "", 0, 0, 0, 0, TInstant::Zero()); + userInfo = &UsersInfoStorage->Create(ctx, importantUser, 0, true, "", 0, 0, 0, 0, TInstant::Zero()); } if (userInfo->Offset < (i64)StartOffset) userInfo->Offset = StartOffset; ReadTimestampForOffset(importantUser, *userInfo, ctx); } - for (auto& userInfoPair : UsersInfoStorage.GetAll()) { + for (auto& userInfoPair : UsersInfoStorage->GetAll()) { if (!important.contains(userInfoPair.first) && userInfoPair.second.Important && userInfoPair.second.LabeledCounters) { ctx.Send( Tablet, @@ -1911,7 +1978,7 @@ void TPartition::InitUserInfoForImportantClients(const TActorContext& ctx) { void TPartition::Handle(TEvPQ::TEvChangePartitionConfig::TPtr& ev, const TActorContext& ctx) { - AddDistrTx(ev->Release()); + PushBackDistrTx(ev->Release()); ProcessTxsAndUserActs(ctx); } @@ -2019,7 +2086,7 @@ void TPartition::Handle(TEvPQ::TEvPartitionOffsets::TPtr& ev, const TActorContex result.SetWriteTimestampEstimateMS(WriteTimestampEstimate.MilliSeconds()); if (!ev->Get()->ClientId.empty()) { - TUserInfo* userInfo = UsersInfoStorage.GetIfExists(ev->Get()->ClientId); + TUserInfo* userInfo = UsersInfoStorage->GetIfExists(ev->Get()->ClientId); if (userInfo) { i64 offset = Max<i64>(userInfo->Offset, 0); result.SetClientOffset(userInfo->Offset); @@ -2081,7 +2148,7 @@ void TPartition::Handle(TEvPQ::TEvPartitionStatus::TPtr& ev, const TActorContext TVector<ui64> resSpeed; resSpeed.resize(4); ui64 maxQuota = 0; - for (auto& userInfoPair : UsersInfoStorage.GetAll()) { + for (auto& userInfoPair : UsersInfoStorage->GetAll()) { auto& userInfo = userInfoPair.second; if (ev->Get()->ClientId.empty() || ev->Get()->ClientId == userInfo.User) { Y_VERIFY(userInfo.AvgReadBytes.size() == 4); @@ -2166,7 +2233,7 @@ void TPartition::Handle(TEvPQ::TEvGetPartitionClientInfo::TPtr& ev, const TActor result.SetStartOffset(StartOffset); result.SetEndOffset(EndOffset); result.SetResponseTimestamp(ctx.Now().MilliSeconds()); - for (auto& pr : UsersInfoStorage.GetAll()) { + for (auto& pr : UsersInfoStorage->GetAll()) { TUserInfo& userInfo(pr.second); NKikimrPQ::TClientInfo& clientInfo = *result.AddClientInfo(); clientInfo.SetClientId(pr.first); @@ -2239,7 +2306,7 @@ TInstant TPartition::GetWriteTimeEstimate(ui64 offset) const { void TPartition::Handle(TEvPQ::TEvGetClientOffset::TPtr& ev, const TActorContext& ctx) { - auto& userInfo = UsersInfoStorage.GetOrCreate(ev->Get()->ClientId, ctx); + auto& userInfo = UsersInfoStorage->GetOrCreate(ev->Get()->ClientId, ctx); Y_VERIFY(userInfo.Offset >= -1, "Unexpected Offset: %" PRIi64, userInfo.Offset); ui64 offset = Max<i64>(userInfo.Offset, 0); auto ts = GetTime(userInfo, offset); @@ -2284,7 +2351,7 @@ void TPartition::Handle(TEvPersQueue::TEvProposeTransaction::TPtr& ev, const TAc void TPartition::Handle(TEvPQ::TEvTxCalcPredicate::TPtr& ev, const TActorContext& ctx) { - AddDistrTx(ev->Release()); + PushBackDistrTx(ev->Release()); ProcessTxsAndUserActs(ctx); } @@ -2366,7 +2433,7 @@ void TPartition::Handle(TEvPQ::TEvBlobResponse::TPtr& ev, const TActorContext& c //make readinfo class TReadAnswer answer(info.FormAnswer( - ctx, *ev->Get(), EndOffset, Partition, &UsersInfoStorage.GetOrCreate(info.User, ctx), + ctx, *ev->Get(), EndOffset, Partition, &UsersInfoStorage->GetOrCreate(info.User, ctx), info.Destination, GetSizeLag(info.Offset), Tablet, Config.GetMeteringMode() )); @@ -2631,7 +2698,7 @@ void TPartition::Handle(TEvPQ::TEvReadTimeout::TPtr& ev, const TActorContext& ct ctx.Send(Tablet, answer.Event.Release()); LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, " waiting read cookie " << ev->Get()->Cookie << " partition " << Partition << " read timeout for " << res->User << " offset " << res->Offset); - auto& userInfo = UsersInfoStorage.GetOrCreate(res->User, ctx); + auto& userInfo = UsersInfoStorage->GetOrCreate(res->User, ctx); userInfo.ForgetSubscription(ctx.Now()); OnReadRequestFinished(std::move(res.GetRef()), answer.Size); @@ -2791,7 +2858,7 @@ void TPartition::Handle(TEvPQ::TEvRead::TPtr& ev, const TActorContext& ctx) { Y_VERIFY(read->Offset <= EndOffset); - auto& userInfo = UsersInfoStorage.GetOrCreate(user, ctx); + auto& userInfo = UsersInfoStorage->GetOrCreate(user, ctx); if (!read->SessionId.empty()) { if (userInfo.Session != read->SessionId) { @@ -2817,7 +2884,7 @@ void TPartition::Handle(NReadSpeedLimiterEvents::TEvResponse::TPtr& ev, const TA void TPartition::DoRead(TEvPQ::TEvRead::TPtr ev, TDuration waitQuotaTime, const TActorContext& ctx) { auto read = ev->Get(); const TString& user = read->ClientId; - auto& userInfo = UsersInfoStorage.GetOrCreate(user, ctx); + auto& userInfo = UsersInfoStorage->GetOrCreate(user, ctx); ui64 offset = read->Offset; if (read->PartNo == 0 && (read->MaxTimeLagMs > 0 || read->ReadTimestampMs > 0 || userInfo.ReadFromTimestamp > TInstant::MilliSeconds(1))) { @@ -2865,7 +2932,7 @@ void TPartition::DoRead(TEvPQ::TEvRead::TPtr ev, TDuration waitQuotaTime, const } void TPartition::OnReadRequestFinished(TReadInfo&& info, ui64 answerSize) { - auto userInfo = UsersInfoStorage.GetIfExists(info.User); + auto userInfo = UsersInfoStorage->GetIfExists(info.User); Y_VERIFY(userInfo); if (Config.GetMeteringMode() == NKikimrPQ::TPQTabletConfig::METERING_MODE_REQUEST_UNITS) { @@ -3063,7 +3130,7 @@ void TPartition::ReadTimestampForOffset(const TString& user, TUserInfo& userInfo } void TPartition::ProcessTimestampsForNewData(const ui64 prevEndOffset, const TActorContext& ctx) { - for (auto& userInfoPair : UsersInfoStorage.GetAll()) { + for (auto& userInfoPair : UsersInfoStorage->GetAll()) { if (userInfoPair.second.Offset >= (i64)prevEndOffset && userInfoPair.second.Offset < (i64)EndOffset) { ReadTimestampForOffset(userInfoPair.first, userInfoPair.second, ctx); } @@ -3072,7 +3139,7 @@ void TPartition::ProcessTimestampsForNewData(const ui64 prevEndOffset, const TAc void TPartition::Handle(TEvPQ::TEvProxyResponse::TPtr& ev, const TActorContext& ctx) { ReadingTimestamp = false; - auto userInfo = UsersInfoStorage.GetIfExists(ReadingForUser); + auto userInfo = UsersInfoStorage->GetIfExists(ReadingForUser); if (!userInfo || userInfo->ReadRuleGeneration != ReadingForUserReadRuleGeneration) { LOG_INFO_S( ctx, NKikimrServices::PERSQUEUE, @@ -3138,7 +3205,7 @@ void TPartition::ProcessTimestampRead(const TActorContext& ctx) { TString user = UpdateUserInfoTimestamp.front().first; ui64 readRuleGeneration = UpdateUserInfoTimestamp.front().second; UpdateUserInfoTimestamp.pop_front(); - auto userInfo = UsersInfoStorage.GetIfExists(user); + auto userInfo = UsersInfoStorage->GetIfExists(user); if (!userInfo || !userInfo->ReadScheduled || userInfo->ReadRuleGeneration != readRuleGeneration) continue; userInfo->ReadScheduled = false; @@ -3152,7 +3219,7 @@ void TPartition::ProcessTimestampRead(const TActorContext& ctx) { void TPartition::Handle(TEvPQ::TEvError::TPtr& ev, const TActorContext& ctx) { ReadingTimestamp = false; - auto userInfo = UsersInfoStorage.GetIfExists(ReadingForUser); + auto userInfo = UsersInfoStorage->GetIfExists(ReadingForUser); if (!userInfo || userInfo->ReadRuleGeneration != ReadingForUserReadRuleGeneration) { ProcessTimestampRead(ctx); return; @@ -3293,7 +3360,7 @@ void TPartition::ReportCounters(const TActorContext& ctx) { } // per client counters const auto now = ctx.Now(); - for (auto& userInfoPair : UsersInfoStorage.GetAll()) { + for (auto& userInfoPair : UsersInfoStorage->GetAll()) { auto& userInfo = userInfoPair.second; if (!userInfo.LabeledCounters) continue; @@ -3590,7 +3657,7 @@ void TPartition::HandleSetOffsetResponse(ui64 cookie, const TActorContext& ctx) for (auto& user : AffectedUsers) { if (auto* actual = GetPendingUserIfExists(user)) { - TUserInfo& userInfo = UsersInfoStorage.GetOrCreate(user, ctx); + TUserInfo& userInfo = UsersInfoStorage->GetOrCreate(user, ctx); bool offsetHasChanged = (userInfo.Offset != actual->Offset); userInfo.Session = actual->Session; @@ -3614,12 +3681,12 @@ void TPartition::HandleSetOffsetResponse(ui64 cookie, const TActorContext& ctx) TabletCounters.Cumulative()[COUNTER_PQ_WRITE_TIMESTAMP_CACHE_HIT].Increment(1); } } else { - UsersInfoStorage.Remove(user, ctx); + UsersInfoStorage->Remove(user, ctx); } } for (auto& [consumer, important] : PendingSetImportant) { - if (auto* userInfo = UsersInfoStorage.GetIfExists(consumer); userInfo) { + if (auto* userInfo = UsersInfoStorage->GetIfExists(consumer); userInfo) { userInfo->SetImportant(important); } } @@ -3642,14 +3709,19 @@ void TPartition::HandleSetOffsetResponse(ui64 cookie, const TActorContext& ctx) ProcessTxsAndUserActs(ctx); } -void TPartition::AddDistrTx(TSimpleSharedPtr<TEvPQ::TEvTxCalcPredicate> event) +void TPartition::PushBackDistrTx(TSimpleSharedPtr<TEvPQ::TEvTxCalcPredicate> event) { DistrTxs.emplace_back(std::move(event)); } -void TPartition::AddDistrTx(TSimpleSharedPtr<TEvPQ::TEvChangePartitionConfig> event) +void TPartition::PushBackDistrTx(TSimpleSharedPtr<TEvPQ::TEvChangePartitionConfig> event) { - DistrTxs.emplace_back(std::move(event)); + DistrTxs.emplace_back(std::move(event), true); +} + +void TPartition::PushFrontDistrTx(TSimpleSharedPtr<TEvPQ::TEvChangePartitionConfig> event) +{ + DistrTxs.emplace_front(std::move(event), false); } void TPartition::AddImmediateTx(TSimpleSharedPtr<TEvPersQueue::TEvProposeTransaction> tx) @@ -3712,7 +3784,7 @@ void TPartition::ContinueProcessTxsAndUserActs(const TActorContext& ctx) if (!DistrTxs.empty()) { ProcessDistrTxs(ctx); - if (!DistrTxs.empty()) { + if (TxInProgress) { return; } } @@ -3728,6 +3800,7 @@ void TPartition::ContinueProcessTxsAndUserActs(const TActorContext& ctx) *PlanStep, *TxId); } AddCmdWriteUserInfos(request->Record); + AddCmdWriteConfig(request->Record); ctx.Send(Tablet, request.Release()); UsersInfoWriteInProgress = true; @@ -3762,7 +3835,7 @@ bool TPartition::BeginTransaction(const TEvPQ::TEvTxCalcPredicate& tx, break; } - if (!UsersInfoStorage.GetIfExists(consumer)) { + if (!UsersInfoStorage->GetIfExists(consumer)) { predicate = false; break; } @@ -3861,7 +3934,7 @@ void TPartition::BeginChangePartitionConfig(const TEvPQ::TEvChangePartitionConfi TSet<TString> hasReadRule; - for (auto& [consumer, info] : UsersInfoStorage.GetAll()) { + for (auto& [consumer, info] : UsersInfoStorage->GetAll()) { PendingReadFromTimestamp[consumer] = TInstant::Zero(); hasReadRule.insert(consumer); @@ -3927,17 +4000,17 @@ void TPartition::EndChangePartitionConfig(const TEvPQ::TEvChangePartitionConfig& Y_VERIFY(Config.GetPartitionConfig().GetTotalPartitions() > 0); - UsersInfoStorage.UpdateConfig(event.Config); + UsersInfoStorage->UpdateConfig(event.Config); WriteQuota->UpdateConfig(Config.GetPartitionConfig().GetBurstSize(), Config.GetPartitionConfig().GetWriteSpeedInBytesPerSecond()); if (AppData(ctx)->PQConfig.GetQuotingConfig().GetPartitionReadQuotaIsTwiceWriteQuota()) { - for (auto& userInfo : UsersInfoStorage.GetAll()) { + for (auto& userInfo : UsersInfoStorage->GetAll()) { userInfo.second.ReadQuota.UpdateConfig(Config.GetPartitionConfig().GetBurstSize() * 2, Config.GetPartitionConfig().GetWriteSpeedInBytesPerSecond() * 2); } } for (const auto& readQuota : Config.GetPartitionConfig().GetReadQuota()) { - auto& userInfo = UsersInfoStorage.GetOrCreate(readQuota.GetClientId(), ctx); + auto& userInfo = UsersInfoStorage->GetOrCreate(readQuota.GetClientId(), ctx); userInfo.ReadQuota.UpdateConfig(readQuota.GetBurstSize(), readQuota.GetSpeedInBytesPerSecond()); } @@ -3959,10 +4032,17 @@ void TPartition::EndChangePartitionConfig(const TEvPQ::TEvChangePartitionConfig& } } - SchedulePartitionConfigChanged(); + if (SendChangeConfigReply) { + SchedulePartitionConfigChanged(); + } ReportCounters(ctx); } +TString TPartition::GetKeyConfig() const +{ + return Sprintf("_config_%u", Partition); +} + void TPartition::InitPendingUserInfoForImportantClients(const TEvPQ::TEvChangePartitionConfig& event, const TActorContext& ctx) { const NKikimrPQ::TPQTabletConfig& config = event.Config; @@ -3991,7 +4071,7 @@ void TPartition::InitPendingUserInfoForImportantClients(const TEvPQ::TEvChangePa //ReadTimestampForOffset(consumer, *userInfo, ctx); } - for (auto& [consumer, userInfo] : UsersInfoStorage.GetAll()) { + for (auto& [consumer, userInfo] : UsersInfoStorage->GetAll()) { if (!important.contains(consumer) && userInfo.Important && userInfo.LabeledCounters) { ScheduleDropPartitionLabeledCounters(userInfo.LabeledCounters->GetGroup()); PendingSetImportant[consumer] = false; @@ -4020,6 +4100,7 @@ void TPartition::ProcessDistrTx(const TActorContext& ctx) Y_VERIFY(!ChangeConfig); ChangeConfig = t.ChangeConfig; + SendChangeConfigReply = t.SendReply; BeginChangePartitionConfig(*ChangeConfig, ctx); RemoveDistrTx(); @@ -4440,17 +4521,34 @@ void TPartition::AddCmdWriteUserInfos(NKikimrClient::TKeyValueRequest& request) } } +void TPartition::AddCmdWriteConfig(NKikimrClient::TKeyValueRequest& request) +{ + if (!ChangeConfig) { + return; + } + + TString key = GetKeyConfig(); + + TString data; + Y_VERIFY(ChangeConfig->Config.SerializeToString(&data)); + + auto write = request.AddCmdWrite(); + write->SetKey(key.Data(), key.Size()); + write->SetValue(data.Data(), data.Size()); + write->SetStorageChannel(NKikimrClient::TKeyValueRequest::INLINE); +} + TUserInfo& TPartition::GetOrCreatePendingUser(const TString& user, const TActorContext& ctx, TMaybe<ui64> readRuleGeneration) { - TUserInfo* userInfo = UsersInfoStorage.GetIfExists(user); + TUserInfo* userInfo = UsersInfoStorage->GetIfExists(user); auto i = PendingUsersInfo.find(user); if (i == PendingUsersInfo.end()) { - auto [p, _] = PendingUsersInfo.emplace(user, UsersInfoStorage.CreateUserInfo(user, - ctx, - readRuleGeneration)); + auto [p, _] = PendingUsersInfo.emplace(user, UsersInfoStorage->CreateUserInfo(user, + ctx, + readRuleGeneration)); if (userInfo) { p->second.Session = userInfo->Session; @@ -5542,7 +5640,7 @@ void TPartition::ProcessRead(const TActorContext& ctx, TReadInfo&& info, const u ui32 size = 0; Y_VERIFY(!info.User.empty()); - auto& userInfo = UsersInfoStorage.GetOrCreate(info.User, ctx); + auto& userInfo = UsersInfoStorage->GetOrCreate(info.User, ctx); if (subscription) { userInfo.ForgetSubscription(ctx.Now()); @@ -5573,7 +5671,7 @@ void TPartition::ProcessRead(const TActorContext& ctx, TReadInfo&& info, const u LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "Reading cookie " << cookie << ". All data is from uncompacted head."); TReadAnswer answer(info.FormAnswer( - ctx, EndOffset, Partition, &UsersInfoStorage.GetOrCreate(info.User, ctx), + ctx, EndOffset, Partition, &UsersInfoStorage->GetOrCreate(info.User, ctx), info.Destination, GetSizeLag(info.Offset), Tablet, Config.GetMeteringMode() )); const auto& resp = dynamic_cast<TEvPQ::TEvProxyResponse*>(answer.Event.Get())->Response; diff --git a/ydb/core/persqueue/partition.h b/ydb/core/persqueue/partition.h index 4a68000f90d..880e5aea9f9 100644 --- a/ydb/core/persqueue/partition.h +++ b/ydb/core/persqueue/partition.h @@ -48,8 +48,10 @@ struct TTransaction { Y_VERIFY(Tx); } - explicit TTransaction(TSimpleSharedPtr<TEvPQ::TEvChangePartitionConfig> changeConfig) : - ChangeConfig(changeConfig) + TTransaction(TSimpleSharedPtr<TEvPQ::TEvChangePartitionConfig> changeConfig, + bool sendReply) : + ChangeConfig(changeConfig), + SendReply(sendReply) { Y_VERIFY(ChangeConfig); } @@ -58,6 +60,7 @@ struct TTransaction { TMaybe<bool> Predicate; TSimpleSharedPtr<TEvPQ::TEvChangePartitionConfig> ChangeConfig; + bool SendReply; }; class TPartition : public TActorBootstrapped<TPartition> { @@ -205,8 +208,9 @@ private: void ProcessTxsAndUserActs(const TActorContext& ctx); void ContinueProcessTxsAndUserActs(const TActorContext& ctx); - void AddDistrTx(TSimpleSharedPtr<TEvPQ::TEvTxCalcPredicate> event); - void AddDistrTx(TSimpleSharedPtr<TEvPQ::TEvChangePartitionConfig> event); + void PushBackDistrTx(TSimpleSharedPtr<TEvPQ::TEvTxCalcPredicate> event); + void PushBackDistrTx(TSimpleSharedPtr<TEvPQ::TEvChangePartitionConfig> event); + void PushFrontDistrTx(TSimpleSharedPtr<TEvPQ::TEvChangePartitionConfig> event); void RemoveDistrTx(); void ProcessDistrTxs(const TActorContext& ctx); void ProcessDistrTx(const TActorContext& ctx); @@ -249,6 +253,7 @@ private: void AddCmdWriteTxMeta(NKikimrClient::TKeyValueRequest& request, ui64 step, ui64 txId); void AddCmdWriteUserInfos(NKikimrClient::TKeyValueRequest& request); + void AddCmdWriteConfig(NKikimrClient::TKeyValueRequest& request); void AddCmdDeleteRange(NKikimrClient::TKeyValueRequest& request, const TKeyPrefix& ikey, const TKeyPrefix& ikeyDeprecated); @@ -277,10 +282,15 @@ private: const TActorContext& ctx); void EndChangePartitionConfig(const TEvPQ::TEvChangePartitionConfig& event, const TActorContext& ctx); + TString GetKeyConfig() const; void InitPendingUserInfoForImportantClients(const TEvPQ::TEvChangePartitionConfig& event, const TActorContext& ctx); + void RequestConfig(const TActorContext& ctx); + void HandleConfig(const NKikimrClient::TResponse& res, const TActorContext& ctx); + void Initialize(const TActorContext& ctx); + public: static constexpr NKikimrServices::TActivity::EType ActorActivityType() { return NKikimrServices::TActivity::PERSQUEUE_PARTITION_ACTOR; @@ -454,6 +464,7 @@ private: } private: enum EInitState { + WaitConfig, WaitDiskStatus, WaitInfoRange, WaitDataRange, @@ -465,6 +476,8 @@ private: ui64 TabletID; ui32 Partition; NKikimrPQ::TPQTabletConfig Config; + NKikimrPQ::TPQTabletConfig TabletConfig; + const TTabletCountersBase& Counters; NPersQueue::TTopicConverterPtr TopicConverter; bool IsLocalDC; TString DCId; @@ -514,7 +527,7 @@ private: bool IsServerless; TString FolderId; - TUsersInfoStorage UsersInfoStorage; + TMaybe<TUsersInfoStorage> UsersInfoStorage; // // user actions and transactions @@ -535,6 +548,7 @@ private: TMaybe<ui64> TxId; bool TxIdHasChanged = false; TSimpleSharedPtr<TEvPQ::TEvChangePartitionConfig> ChangeConfig; + bool SendChangeConfigReply = true; // // // diff --git a/ydb/core/persqueue/ut/partition_ut.cpp b/ydb/core/persqueue/ut/partition_ut.cpp index 2db2d2145e8..d941812800e 100644 --- a/ydb/core/persqueue/ut/partition_ut.cpp +++ b/ydb/core/persqueue/ut/partition_ut.cpp @@ -25,15 +25,6 @@ Y_UNIT_TEST_SUITE(TPartitionTests) { namespace NHelpers { -struct TCreatePartitionParams { - ui32 Partition = 1; - ui64 Begin = 0; - ui64 End = 0; - TMaybe<ui64> PlanStep; - TMaybe<ui64> TxId; - TVector<TTransaction> Transactions; -}; - struct TCreateConsumerParams { TString Consumer; ui64 Offset = 0; @@ -44,15 +35,39 @@ struct TCreateConsumerParams { ui64 ReadRuleGeneration = 0; }; +struct TConfigParams { + ui64 Version = 0; + TVector<TCreateConsumerParams> Consumers; +}; + +struct TCreatePartitionParams { + ui32 Partition = 1; + ui64 Begin = 0; + ui64 End = 0; + TMaybe<ui64> PlanStep; + TMaybe<ui64> TxId; + TVector<TTransaction> Transactions; + TConfigParams Config; +}; + } class TPartitionFixture : public NUnitTest::TBaseFixture { protected: struct TUserInfoMatcher { + TMaybe<TString> Consumer; TMaybe<TString> Session; TMaybe<ui64> Offset; TMaybe<ui32> Generation; TMaybe<ui32> Step; + TMaybe<ui64> ReadRuleGeneration; + }; + + struct TDeleteRangeMatcher { + TMaybe<char> TypeInfo; + TMaybe<ui32> Partition; + TMaybe<char> Mark; + TMaybe<TString> Consumer; }; struct TCmdWriteMatcher { @@ -60,6 +75,7 @@ protected: TMaybe<ui64> PlanStep; TMaybe<ui64> TxId; THashMap<size_t, TUserInfoMatcher> UserInfos; + THashMap<size_t, TDeleteRangeMatcher> DeleteRanges; }; struct TProxyResponseMatcher { @@ -93,6 +109,10 @@ protected: TMaybe<ui32> Partition; }; + struct TChangePartitionConfigMatcher { + TMaybe<ui32> Partition; + }; + struct TTxOperationMatcher { TMaybe<ui32> Partition; TMaybe<TString> Consumer; @@ -110,16 +130,17 @@ protected: using TCreatePartitionParams = NHelpers::TCreatePartitionParams; using TCreateConsumerParams = NHelpers::TCreateConsumerParams; + using TConfigParams = NHelpers::TConfigParams; void SetUp(NUnitTest::TTestContext&) override; void TearDown(NUnitTest::TTestContext&) override; void CreatePartitionActor(ui32 partition, - const TVector<TCreateConsumerParams>& consumers, + const TConfigParams& config, bool newPartition, TVector<TTransaction> txs); void CreatePartition(const TCreatePartitionParams& params = {}, - const TVector<TCreateConsumerParams>& consumers = {}); + const TConfigParams& config = {}); void CreateSession(const TString& clientId, const TString& sessionId, @@ -148,6 +169,8 @@ protected: void WaitProxyResponse(const TProxyResponseMatcher &matcher = {}); void WaitErrorResponse(const TErrorMatcher& matcher = {}); + void WaitConfigRequest(); + void SendConfigResponse(const TConfigParams& config); void WaitDiskStatusRequest(); void SendDiskStatusResponse(); void WaitMetaReadRequest(); @@ -179,15 +202,26 @@ protected: void SendRollbackTx(ui64 step, ui64 txId); void WaitCommitTxDone(const TCommitTxDoneMatcher& matcher = {}); + void SendChangePartitionConfig(const TConfigParams& config = {}); + void WaitPartitionConfigChanged(const TChangePartitionConfigMatcher& matcher = {}); + TTransaction MakeTransaction(ui64 step, ui64 txId, TString consumer, ui64 begin, ui64 end, TMaybe<bool> predicate = Nothing()); + static NKikimrPQ::TPQTabletConfig MakeConfig(ui64 version, + const TVector<TCreateConsumerParams>& consumers); + TMaybe<TTestContext> Ctx; TMaybe<TFinalizer> Finalizer; TActorId ActorId; + + NPersQueue::TTopicConverterPtr TopicConverter; + NKikimrPQ::TPQTabletConfig Config; + + TAutoPtr<TTabletCountersBase> TabletCounters; }; void TPartitionFixture::SetUp(NUnitTest::TTestContext&) @@ -203,8 +237,10 @@ void TPartitionFixture::TearDown(NUnitTest::TTestContext&) { } + + void TPartitionFixture::CreatePartitionActor(ui32 id, - const TVector<TCreateConsumerParams>& consumers, + const TConfigParams& config, bool newPartition, TVector<TTransaction> txs) { @@ -225,46 +261,42 @@ void TPartitionFixture::CreatePartitionActor(ui32 id, >; TAutoPtr<TCounters> counters(new TCounters()); - TAutoPtr<TTabletCountersBase> tabletCounters = counters->GetSecondTabletCounters().Release(); - - NPersQueue::TTopicNamesConverterFactory factory(true, "/Root/PQ", "dc1"); - NPersQueue::TTopicConverterPtr topicConverter; - NKikimrPQ::TPQTabletConfig config; + TabletCounters = counters->GetSecondTabletCounters().Release(); - for (auto& c : consumers) { - config.AddReadRules(c.Consumer); - } - - config.SetTopicName("rt3.dc1--account--topic"); - config.SetTopicPath("/Root/PQ/rt3.dc1--account--topic"); - config.SetFederationAccount("account"); - config.SetLocalDC(true); - config.SetYdbDatabasePath(""); + Config = MakeConfig(config.Version, + config.Consumers); - topicConverter = factory.MakeTopicConverter(config); + NPersQueue::TTopicNamesConverterFactory factory(true, "/Root/PQ", "dc1"); + TopicConverter = factory.MakeTopicConverter(Config); auto actor = new NPQ::TPartition(Ctx->TabletId, id, Ctx->Edge, Ctx->Edge, - topicConverter, + TopicConverter, true, "dcId", false, - config, - *tabletCounters, + Config, + *TabletCounters, newPartition, std::move(txs)); ActorId = Ctx->Runtime->Register(actor); } void TPartitionFixture::CreatePartition(const TCreatePartitionParams& params, - const TVector<TCreateConsumerParams>& consumers) + const TConfigParams& config) { if ((params.Begin == 0) && (params.End == 0)) { - CreatePartitionActor(params.Partition, consumers, true, {}); + CreatePartitionActor(params.Partition, config, true, {}); + + WaitConfigRequest(); + SendConfigResponse(params.Config); } else { - CreatePartitionActor(params.Partition, consumers, false, params.Transactions); + CreatePartitionActor(params.Partition, config, false, params.Transactions); + + WaitConfigRequest(); + SendConfigResponse(params.Config); WaitDiskStatusRequest(); SendDiskStatusResponse(); @@ -273,7 +305,7 @@ void TPartitionFixture::CreatePartition(const TCreatePartitionParams& params, SendMetaReadResponse(params.PlanStep, params.TxId); WaitInfoRangeRequest(); - SendInfoRangeResponse(params.Partition, consumers); + SendInfoRangeResponse(params.Partition, params.Config.Consumers); WaitDataRangeRequest(); SendDataRangeResponse(params.Begin, params.End); @@ -349,8 +381,13 @@ void TPartitionFixture::WaitCmdWrite(const TCmdWriteMatcher& matcher) UNIT_ASSERT_VALUES_EQUAL(event->Record.GetCookie(), 1); // SET_OFFSET_COOKIE if (matcher.Count.Defined()) { - UNIT_ASSERT_VALUES_EQUAL(*matcher.Count, event->Record.CmdWriteSize()); + UNIT_ASSERT_VALUES_EQUAL(*matcher.Count, + event->Record.CmdWriteSize() + event->Record.CmdDeleteRangeSize()); } + + // + // TxMeta + // if (matcher.PlanStep.Defined()) { NKikimrPQ::TPartitionTxMeta meta; UNIT_ASSERT(meta.ParseFromString(event->Record.GetCmdWrite(0).GetValue())); @@ -363,10 +400,12 @@ void TPartitionFixture::WaitCmdWrite(const TCmdWriteMatcher& matcher) UNIT_ASSERT_VALUES_EQUAL(*matcher.TxId, meta.GetTxId()); } + + // + // CmdWrite + // for (auto& [index, userInfo] : matcher.UserInfos) { - if (matcher.Count.Defined()) { - UNIT_ASSERT(index < *matcher.Count); - } + UNIT_ASSERT(index < event->Record.CmdWriteSize()); NKikimrPQ::TUserInfo ud; UNIT_ASSERT(ud.ParseFromString(event->Record.GetCmdWrite(index).GetValue())); @@ -387,6 +426,31 @@ void TPartitionFixture::WaitCmdWrite(const TCmdWriteMatcher& matcher) UNIT_ASSERT(ud.HasOffset()); UNIT_ASSERT_VALUES_EQUAL(*userInfo.Offset, ud.GetOffset()); } + if (userInfo.ReadRuleGeneration) { + UNIT_ASSERT(ud.HasReadRuleGeneration()); + UNIT_ASSERT_VALUES_EQUAL(*userInfo.ReadRuleGeneration, ud.GetReadRuleGeneration()); + } + } + + // + // CmdDeleteRange + // + for (auto& [index, deleteRange] : matcher.DeleteRanges) { + UNIT_ASSERT(index < event->Record.CmdDeleteRangeSize()); + UNIT_ASSERT(event->Record.GetCmdDeleteRange(index).HasRange()); + + auto& range = event->Record.GetCmdDeleteRange(index).GetRange(); + TString key = range.GetFrom(); + UNIT_ASSERT(key.Size() > (1 + 10 + 1)); // type + partition + mark + consumer + + if (deleteRange.Partition.Defined()) { + auto partition = FromString<ui32>(key.substr(1, 10)); + UNIT_ASSERT_VALUES_EQUAL(*deleteRange.Partition, partition); + } + if (deleteRange.Consumer.Defined()) { + TString consumer = key.substr(12); + UNIT_ASSERT_VALUES_EQUAL(*deleteRange.Consumer, consumer); + } } } @@ -453,6 +517,35 @@ void TPartitionFixture::WaitErrorResponse(const TErrorMatcher& matcher) } } +void TPartitionFixture::WaitConfigRequest() +{ + auto event = Ctx->Runtime->GrabEdgeEvent<TEvKeyValue::TEvRequest>(); + UNIT_ASSERT(event != nullptr); + + UNIT_ASSERT_VALUES_EQUAL(event->Record.CmdReadSize(), 1); +} + +void TPartitionFixture::SendConfigResponse(const TConfigParams& config) +{ + auto event = MakeHolder<TEvKeyValue::TEvResponse>(); + event->Record.SetStatus(NMsgBusProxy::MSTATUS_OK); + + auto read = event->Record.AddReadResult(); + if (config.Consumers.empty()) { + read->SetStatus(NKikimrProto::NODATA); + } else { + read->SetStatus(NKikimrProto::OK); + + TString out; + Y_VERIFY(MakeConfig(config.Version, + config.Consumers).SerializeToString(&out)); + + read->SetValue(out); + } + + Ctx->Runtime->SingleSys()->Send(new IEventHandle(ActorId, Ctx->Edge, event.Release())); +} + void TPartitionFixture::WaitDiskStatusRequest() { auto event = Ctx->Runtime->GrabEdgeEvent<TEvKeyValue::TEvRequest>(); @@ -688,6 +781,23 @@ void TPartitionFixture::WaitCommitTxDone(const TCommitTxDoneMatcher& matcher) } } +void TPartitionFixture::SendChangePartitionConfig(const TConfigParams& config) +{ + auto event = MakeHolder<TEvPQ::TEvChangePartitionConfig>(TopicConverter, MakeConfig(config.Version, + config.Consumers)); + Ctx->Runtime->SingleSys()->Send(new IEventHandle(ActorId, Ctx->Edge, event.Release())); +} + +void TPartitionFixture::WaitPartitionConfigChanged(const TChangePartitionConfigMatcher& matcher) +{ + auto event = Ctx->Runtime->GrabEdgeEvent<TEvPQ::TEvPartitionConfigChanged>(); + UNIT_ASSERT(event != nullptr); + + if (matcher.Partition) { + UNIT_ASSERT_VALUES_EQUAL(*matcher.Partition, event->Partition); + } +} + TTransaction TPartitionFixture::MakeTransaction(ui64 step, ui64 txId, TString consumer, ui64 begin, ui64 end, @@ -699,6 +809,27 @@ TTransaction TPartitionFixture::MakeTransaction(ui64 step, ui64 txId, return TTransaction(event, predicate); } +NKikimrPQ::TPQTabletConfig TPartitionFixture::MakeConfig(ui64 version, + const TVector<TCreateConsumerParams>& consumers) +{ + NKikimrPQ::TPQTabletConfig config; + + config.SetVersion(version); + + for (auto& c : consumers) { + config.AddReadRules(c.Consumer); + config.AddReadRuleGenerations(c.Generation); + } + + config.SetTopicName("rt3.dc1--account--topic"); + config.SetTopicPath("/Root/PQ/rt3.dc1--account--topic"); + config.SetFederationAccount("account"); + config.SetLocalDC(true); + config.SetYdbDatabasePath(""); + + return config; +} + Y_UNIT_TEST_F(Batching, TPartitionFixture) { CreatePartition(); @@ -986,12 +1117,14 @@ Y_UNIT_TEST_F(AfterRestart_1, TPartitionFixture) txs.push_back(MakeTransaction(step, 11111, consumer, 0, 2, true)); txs.push_back(MakeTransaction(step, 22222, consumer, 2, 4)); - CreatePartition({.Partition=partition, + CreatePartition({ + .Partition=partition, .Begin=begin, .End=end, .PlanStep=step, .TxId=10000, - .Transactions=std::move(txs)}, - {{.Consumer=consumer, .Offset=0, .Session=session}}); + .Transactions=std::move(txs), + .Config={.Consumers={{.Consumer=consumer, .Offset=0, .Session=session}}} + }); SendCommitTx(step, 11111); @@ -1015,12 +1148,14 @@ Y_UNIT_TEST_F(AfterRestart_2, TPartitionFixture) txs.push_back(MakeTransaction(step, 11111, consumer, 0, 2)); txs.push_back(MakeTransaction(step, 22222, consumer, 2, 4)); - CreatePartition({.Partition=partition, + CreatePartition({ + .Partition=partition, .Begin=begin, .End=end, .PlanStep=step, .TxId=10000, - .Transactions=std::move(txs)}, - {{.Consumer=consumer, .Offset=0, .Session=session}}); + .Transactions=std::move(txs), + .Config={.Consumers={{.Consumer=consumer, .Offset=0, .Session=session}}} + }); WaitCalcPredicateResult({.Step=step, .TxId=11111, .Partition=partition, .Predicate=true}); } @@ -1085,6 +1220,86 @@ Y_UNIT_TEST_F(CorrectRange_Rollback, TPartitionFixture) WaitCalcPredicateResult({.Step=step, .TxId=txId_2, .Partition=partition, .Predicate=true}); } +Y_UNIT_TEST_F(ChangeConfig, TPartitionFixture) +{ + const ui32 partition = 3; + const ui64 begin = 0; + const ui64 end = 10; + const TString client = "client"; + const TString session = "session"; + + const ui64 step = 12345; + const ui64 txId_1 = 67890; + const ui64 txId_2 = 67891; + + CreatePartition({ + .Partition=partition, .Begin=begin, .End=end, + .Config={.Consumers={ + {.Consumer="client-1", .Offset=0, .Session="session-1"}, + {.Consumer="client-2", .Offset=0, .Session="session-2"}, + {.Consumer="client-3", .Offset=0, .Session="session-3"} + }} + }); + + SendCalcPredicate(step, txId_1, "client-1", 0, 2); + SendChangePartitionConfig({.Version=2, + .Consumers={ + {.Consumer="client-1", .Generation=0}, + {.Consumer="client-3", .Generation=7} + }}); + // + // consumer 'client-2' will be deleted + // + SendCalcPredicate(step, txId_2, "client-2", 0, 2); + + WaitCalcPredicateResult({.Step=step, .TxId=txId_1, .Partition=partition, .Predicate=true}); + SendCommitTx(step, txId_1); + + // + // consumer 'client-2' was deleted + // + WaitCalcPredicateResult({.Step=step, .TxId=txId_2, .Partition=partition, .Predicate=false}); + SendRollbackTx(step, txId_2); + + WaitCmdWrite({.Count=8, + .PlanStep=step, .TxId=txId_2, + .UserInfos={ + {1, {.Consumer="client-1", .Session="", .Offset=2}}, + {3, {.Consumer="client-3", .Session="", .Offset=0, .ReadRuleGeneration=7}} + }, + .DeleteRanges={ + {0, {.Partition=3, .Consumer="client-2"}} + }}); + SendCmdWriteResponse(NMsgBusProxy::MSTATUS_OK); + + WaitPartitionConfigChanged({.Partition=partition}); +} + +Y_UNIT_TEST_F(TabletConfig_Is_Newer_That_PartitionConfig, TPartitionFixture) +{ + CreatePartition({ + .Partition=3, + .Begin=0, .End=10, + // + // конфиг партиции + // + .Config={.Version=1, .Consumers={{.Consumer="client-1", .Offset=3}}} + }, + // + // конфиг таблетки + // + {.Version=2, .Consumers={{.Consumer="client-2"}}}); + + WaitCmdWrite({.Count=5, + .UserInfos={ + {0, {.Consumer="client-2", .Session="", .Offset=0, .ReadRuleGeneration=0}} + }, + .DeleteRanges={ + {0, {.Partition=3, .Consumer="client-1"}} + }}); + SendCmdWriteResponse(NMsgBusProxy::MSTATUS_OK); +} + } } |