summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorabcdef <[email protected]>2023-02-16 13:31:24 +0300
committerabcdef <[email protected]>2023-02-16 13:31:24 +0300
commitdf5f90063aea983d2bcf315c8ffdee35d173b8c5 (patch)
tree021cbe7beb5644e397f8efc1004a93c33db33b32
parent61239a868c9b1e58172ca90cd8b78fbe8d279695 (diff)
-rw-r--r--ydb/core/persqueue/partition.cpp252
-rw-r--r--ydb/core/persqueue/partition.h24
-rw-r--r--ydb/core/persqueue/ut/partition_ut.cpp303
3 files changed, 453 insertions, 126 deletions
diff --git a/ydb/core/persqueue/partition.cpp b/ydb/core/persqueue/partition.cpp
index 66b5d08f51a..c79fa14bbe9 100644
--- a/ydb/core/persqueue/partition.cpp
+++ b/ydb/core/persqueue/partition.cpp
@@ -434,19 +434,22 @@ void RequestDataRange(const TActorContext& ctx, const TActorId& dst, ui32 partit
void TPartition::FillReadFromTimestamps(const NKikimrPQ::TPQTabletConfig& config, const TActorContext& ctx) {
TSet<TString> hasReadRule;
- for (auto& userInfo : UsersInfoStorage.GetAll()) {
+ for (auto& userInfo : UsersInfoStorage->GetAll()) {
userInfo.second.ReadFromTimestamp = TInstant::Zero();
userInfo.second.HasReadRule = false;
hasReadRule.insert(userInfo.first);
}
for (ui32 i = 0; i < config.ReadRulesSize(); ++i) {
const auto& consumer = config.GetReadRules(i);
- auto& userInfo = UsersInfoStorage.GetOrCreate(consumer, ctx, 0);
+ auto& userInfo = UsersInfoStorage->GetOrCreate(consumer, ctx, 0);
userInfo.HasReadRule = true;
ui64 rrGen = i < config.ReadRuleGenerationsSize() ? config.GetReadRuleGenerations(i) : 0;
if (userInfo.ReadRuleGeneration != rrGen) {
THolder<TEvPQ::TEvSetClientInfo> event = MakeHolder<TEvPQ::TEvSetClientInfo>(0, consumer, 0, "", 0, 0,
TEvPQ::TEvSetClientInfo::ESCI_INIT_READ_RULE, rrGen);
+ //
+ // TODO(abcdef): заменить на вызов ProcessUserAct
+ //
AddUserAct(event.Release());
userInfo.Session = "";
userInfo.Offset = 0;
@@ -462,7 +465,7 @@ void TPartition::FillReadFromTimestamps(const NKikimrPQ::TPQTabletConfig& config
userInfo.ReadFromTimestamp = ts;
}
for (auto& consumer : hasReadRule) {
- auto& userInfo = UsersInfoStorage.GetOrCreate(consumer, ctx);
+ auto& userInfo = UsersInfoStorage->GetOrCreate(consumer, ctx);
THolder<TEvPQ::TEvSetClientInfo> event = MakeHolder<TEvPQ::TEvSetClientInfo>(0, consumer,
0, "", 0, 0, TEvPQ::TEvSetClientInfo::ESCI_DROP_READ_RULE, 0);
if (!userInfo.Important && userInfo.LabeledCounters) {
@@ -471,18 +474,22 @@ void TPartition::FillReadFromTimestamps(const NKikimrPQ::TPQTabletConfig& config
userInfo.Session = "";
userInfo.Offset = 0;
userInfo.Step = userInfo.Generation = 0;
+ //
+ // TODO(abcdef): заменить на вызов ProcessUserAct
+ //
AddUserAct(event.Release());
}
}
TPartition::TPartition(ui64 tabletId, ui32 partition, const TActorId& tablet, const TActorId& blobCache,
const NPersQueue::TTopicConverterPtr& topicConverter, bool isLocalDC, TString dcId, bool isServerless,
- const NKikimrPQ::TPQTabletConfig& config, const TTabletCountersBase& counters,
+ const NKikimrPQ::TPQTabletConfig& tabletConfig, const TTabletCountersBase& counters,
bool newPartition,
TVector<TTransaction> distrTxs)
: TabletID(tabletId)
, Partition(partition)
- , Config(config)
+ , TabletConfig(tabletConfig)
+ , Counters(counters)
, TopicConverter(topicConverter)
, IsLocalDC(isLocalDC)
, DCId(std::move(dcId))
@@ -491,21 +498,13 @@ TPartition::TPartition(ui64 tabletId, ui32 partition, const TActorId& tablet, co
, WriteInflightSize(0)
, Tablet(tablet)
, BlobCache(blobCache)
- , InitState(WaitDiskStatus)
+ , InitState(WaitConfig)
, PartitionedBlob(partition, 0, 0, 0, 0, 0, Head, NewHead, true, false, 8_MB)
, NewHeadKey{TKey{}, 0, TInstant::Zero(), 0}
, BodySize(0)
, MaxWriteResponsesSize(0)
, GapSize(0)
- , CloudId(config.GetYcCloudId())
- , DbId(config.GetYdbDatabaseId())
- , DbPath(config.GetYdbDatabasePath())
, IsServerless(isServerless)
- , FolderId(config.GetYcFolderId())
- , UsersInfoStorage(
- DCId, TabletID, TopicConverter, Partition, counters, Config,
- CloudId, DbId, config.GetYdbDatabasePath(), IsServerless, FolderId
- )
, ReadingTimestamp(false)
, Cookie(0)
, InitDuration(TDuration::Zero())
@@ -524,17 +523,10 @@ TPartition::TPartition(ui64 tabletId, ui32 partition, const TActorId& tablet, co
, AvgQuotaBytes{{TDuration::Seconds(1), 1000}, {TDuration::Minutes(1), 1000}, {TDuration::Hours(1), 2000}, {TDuration::Days(1), 2000}}
, ReservedSize(0)
, Channel(0)
- , TotalChannelWritesByHead(Config.GetPartitionConfig().GetNumChannels(), 0)
, WriteBufferIsFullCounter(nullptr)
, WriteLagMs(TDuration::Minutes(1), 100)
{
- if (Config.GetPartitionConfig().HasMirrorFrom()) {
- ManageWriteTimestampEstimate = !Config.GetPartitionConfig().GetMirrorFrom().GetSyncWriteTime();
- } else {
- ManageWriteTimestampEstimate = IsLocalDC;
- }
-
- TabletCounters.Populate(counters);
+ TabletCounters.Populate(Counters);
if (!distrTxs.empty()) {
std::move(distrTxs.begin(), distrTxs.end(),
@@ -712,7 +704,7 @@ void TPartition::HandleMonitoring(TEvPQ::TEvMonRequest::TPtr& ev, const TActorCo
}
}
TABLEBODY() {
- for (auto& d: UsersInfoStorage.GetAll()) {
+ for (auto& d: UsersInfoStorage->GetAll()) {
TABLER() {
TABLED() {out << EncodeHtmlPcdata(d.first);}
TABLED() {out << d.second.Offset;}
@@ -736,8 +728,52 @@ void TPartition::HandleMonitoring(TEvPQ::TEvMonRequest::TPtr& ev, const TActorCo
ctx.Send(ev->Sender, new TEvPQ::TEvMonResponse(Partition, res, out.Str()));
}
+void TPartition::RequestConfig(const TActorContext& ctx)
+{
+ auto event = MakeHolder<TEvKeyValue::TEvRequest>();
+ auto read = event->Record.AddCmdRead();
+ read->SetKey(GetKeyConfig());
+ ctx.Send(Tablet, event.Release());
+}
+
+void TPartition::HandleConfig(const NKikimrClient::TResponse& res, const TActorContext& ctx)
+{
+ auto& response = res.GetReadResult(0);
+
+ switch (response.GetStatus()) {
+ case NKikimrProto::OK:
+ Y_VERIFY(Config.ParseFromString(response.GetValue()));
+ Y_VERIFY(Config.GetVersion() <= TabletConfig.GetVersion());
+ if (Config.GetVersion() < TabletConfig.GetVersion()) {
+ auto event = MakeHolder<TEvPQ::TEvChangePartitionConfig>(TopicConverter,
+ TabletConfig);
+ PushFrontDistrTx(event.Release());
+ }
+ break;
+ case NKikimrProto::NODATA:
+ Config = TabletConfig;
+ break;
+ case NKikimrProto::ERROR:
+ LOG_ERROR_S(ctx, NKikimrServices::PERSQUEUE,
+ "Partition " << Partition << " can't read config");
+ ctx.Send(Tablet, new TEvents::TEvPoisonPill());
+ break;
+ default:
+ Cerr << "ERROR " << response.GetStatus() << "\n";
+ Y_FAIL("bad status");
+ };
+
+ InitState = WaitDiskStatus;
+ Initialize(ctx);
+}
void TPartition::Bootstrap(const TActorContext& ctx) {
+ Y_VERIFY(InitState == WaitConfig);
+ RequestConfig(ctx);
+ Become(&TThis::StateInit);
+}
+
+void TPartition::Initialize(const TActorContext& ctx) {
CreationTime = ctx.Now();
WriteCycleStartTime = ctx.Now();
WriteQuota.ConstructInPlace(Config.GetPartitionConfig().GetBurstSize(),
@@ -747,6 +783,11 @@ void TPartition::Bootstrap(const TActorContext& ctx) {
LastUsedStorageMeterTimestamp = ctx.Now();
WriteTimestampEstimate = ManageWriteTimestampEstimate ? ctx.Now() : TInstant::Zero();
+ CloudId = Config.GetYcCloudId();
+ DbId = Config.GetYdbDatabaseId();
+ DbPath = Config.GetYdbDatabasePath();
+ FolderId = Config.GetYcFolderId();
+
CalcTopicWriteQuotaParams(AppData()->PQConfig,
IsLocalDC,
TopicConverter,
@@ -755,6 +796,25 @@ void TPartition::Bootstrap(const TActorContext& ctx) {
TopicWriteQuoterPath,
TopicWriteQuotaResourcePath);
+ UsersInfoStorage.ConstructInPlace(DCId,
+ TabletID,
+ TopicConverter,
+ Partition,
+ Counters,
+ Config,
+ CloudId,
+ DbId,
+ Config.GetYdbDatabasePath(),
+ IsServerless,
+ FolderId);
+ TotalChannelWritesByHead.resize(Config.GetPartitionConfig().GetNumChannels());
+
+ if (Config.GetPartitionConfig().HasMirrorFrom()) {
+ ManageWriteTimestampEstimate = !Config.GetPartitionConfig().GetMirrorFrom().GetSyncWriteTime();
+ } else {
+ ManageWriteTimestampEstimate = IsLocalDC;
+ }
+
if (AppData()->PQConfig.GetTopicsAreFirstClassCitizen()) {
PartitionCountersLabeled.Reset(new TPartitionLabeledCounters(TopicConverter->GetClientsideName(),
Partition,
@@ -764,7 +824,7 @@ void TPartition::Bootstrap(const TActorContext& ctx) {
Partition));
}
- UsersInfoStorage.Init(Tablet, SelfId(), ctx);
+ UsersInfoStorage->Init(Tablet, SelfId(), ctx);
Y_VERIFY(AppData(ctx)->PQConfig.GetMaxBlobsPerLevel() > 0);
ui32 border = LEVEL0;
@@ -787,7 +847,7 @@ void TPartition::Bootstrap(const TActorContext& ctx) {
}
for (const auto& readQuota : Config.GetPartitionConfig().GetReadQuota()) {
- auto &userInfo = UsersInfoStorage.GetOrCreate(readQuota.GetClientId(), ctx);
+ auto &userInfo = UsersInfoStorage->GetOrCreate(readQuota.GetClientId(), ctx);
userInfo.ReadQuota.UpdateConfig(readQuota.GetBurstSize(), readQuota.GetSpeedInBytesPerSecond());
}
@@ -982,7 +1042,7 @@ void TPartition::ProcessHasDataRequests(const TActorContext& ctx) {
res->Record.SetCookie(*(it->Cookie));
ctx.Send(it->Sender, res.Release());
if (!it->ClientId.empty()) {
- auto& userInfo = UsersInfoStorage.GetOrCreate(it->ClientId, ctx);
+ auto& userInfo = UsersInfoStorage->GetOrCreate(it->ClientId, ctx);
userInfo.ForgetSubscription(ctx.Now());
}
it = HasDataRequests.erase(it);
@@ -1002,7 +1062,7 @@ void TPartition::ProcessHasDataRequests(const TActorContext& ctx) {
res->Record.SetCookie(*(it->Request.Cookie));
ctx.Send(it->Request.Sender, res.Release());
if (!it->Request.ClientId.empty()) {
- auto& userInfo = UsersInfoStorage.GetOrCreate(it->Request.ClientId, ctx);
+ auto& userInfo = UsersInfoStorage->GetOrCreate(it->Request.ClientId, ctx);
userInfo.ForgetSubscription(ctx.Now());
}
HasDataRequests.erase(jt);
@@ -1020,7 +1080,7 @@ void TPartition::UpdateAvailableSize(const TActorContext& ctx) {
auto now = ctx.Now();
WriteQuota->Update(now);
- for (auto &c : UsersInfoStorage.GetAll()) {
+ for (auto &c : UsersInfoStorage->GetAll()) {
while (true) {
c.second.ReadQuota.Update(now);
if (!c.second.ReadQuota.CanExaust() && !c.second.ReadRequests.empty()) {
@@ -1077,7 +1137,7 @@ void TPartition::HandleWakeup(const TActorContext& ctx) {
ProcessHasDataRequests(ctx);
auto now = ctx.Now();
- for (auto& userInfo : UsersInfoStorage.GetAll()) {
+ for (auto& userInfo : UsersInfoStorage->GetAll()) {
userInfo.second.UpdateReadingTimeAndState(now);
for (auto& avg : userInfo.second.AvgReadBytes) {
avg.Update(now);
@@ -1162,7 +1222,7 @@ bool TPartition::CleanUpBlobs(TEvKeyValue::TEvRequest *request, bool hasWrites,
const auto& partConfig = Config.GetPartitionConfig();
ui64 minOffset = EndOffset;
for (const auto& importantClientId : partConfig.GetImportantClientId()) {
- TUserInfo* userInfo = UsersInfoStorage.GetIfExists(importantClientId);
+ TUserInfo* userInfo = UsersInfoStorage->GetIfExists(importantClientId);
ui64 curOffset = StartOffset;
if (userInfo && userInfo->Offset >= 0) //-1 means no offset
curOffset = userInfo->Offset;
@@ -1285,7 +1345,7 @@ void TPartition::Handle(TEvPersQueue::TEvHasDataInfo::TPtr& ev, const TActorCont
Y_VERIFY(res.second);
if (InitDone && record.HasClientId() && !record.GetClientId().empty()) {
- auto& userInfo = UsersInfoStorage.GetOrCreate(record.GetClientId(), ctx);
+ auto& userInfo = UsersInfoStorage->GetOrCreate(record.GetClientId(), ctx);
++userInfo.Subscriptions;
userInfo.UpdateReadOffset((i64)EndOffset - 1, ctx.Now(), ctx.Now(), ctx.Now());
userInfo.UpdateReadingTimeAndState(ctx.Now());
@@ -1302,7 +1362,7 @@ void TPartition::Handle(TEvPQ::TEvMirrorerCounters::TPtr& ev, const TActorContex
}
void TPartition::Handle(NReadSpeedLimiterEvents::TEvCounters::TPtr& ev, const TActorContext& /*ctx*/) {
- auto userInfo = UsersInfoStorage.GetIfExists(ev->Get()->User);
+ auto userInfo = UsersInfoStorage->GetIfExists(ev->Get()->User);
if (userInfo && userInfo->ReadSpeedLimiter) {
auto diff = ev->Get()->Counters.MakeDiffForAggr(userInfo->ReadSpeedLimiter->Baseline);
TabletCounters.Populate(*diff.Get());
@@ -1336,7 +1396,10 @@ void TPartition::Handle(TEvents::TEvPoisonPill::TPtr&, const TActorContext& ctx)
if (Mirrorer) {
Send(Mirrorer->Actor, new TEvents::TEvPoisonPill());
}
- UsersInfoStorage.Clear(ctx);
+
+ if (UsersInfoStorage.Defined()) {
+ UsersInfoStorage->Clear(ctx);
+ }
Die(ctx);
}
@@ -1504,9 +1567,9 @@ void TPartition::HandleInfoRangeRead(const NKikimrClient::TKeyValueResponse::TRe
} else if ((*key)[TKeyPrefix::MarkPosition()] == TKeyPrefix::MarkProtoSourceId) {
SourceIdStorage.LoadSourceIdInfo(*key, pair.GetValue(), ctx.Now());
} else if ((*key)[TKeyPrefix::MarkPosition()] == TKeyPrefix::MarkUser) {
- UsersInfoStorage.Parse(*key, pair.GetValue(), ctx);
+ UsersInfoStorage->Parse(*key, pair.GetValue(), ctx);
} else if ((*key)[TKeyPrefix::MarkPosition()] == TKeyPrefix::MarkUserDeprecated) {
- UsersInfoStorage.ParseDeprecated(*key, pair.GetValue(), ctx);
+ UsersInfoStorage->ParseDeprecated(*key, pair.GetValue(), ctx);
}
}
//make next step
@@ -1729,6 +1792,10 @@ void TPartition::HandleOnInit(TEvKeyValue::TEvResponse::TPtr& ev, const TActorCo
DiskIsFull = !diskIsOk;
switch(InitState) {
+ case WaitConfig:
+ Y_VERIFY(response.ReadResultSize() == 1);
+ HandleConfig(response, ctx);
+ break;
case WaitDiskStatus:
Y_VERIFY(response.GetStatusResultSize());
HandleGetDiskStatus(response, ctx);
@@ -1757,7 +1824,7 @@ void TPartition::HandleOnInit(TEvKeyValue::TEvResponse::TPtr& ev, const TActorCo
void TPartition::InitComplete(const TActorContext& ctx) {
if (StartOffset == EndOffset && EndOffset == 0) {
- for (auto& [user, info] : UsersInfoStorage.GetAll()) {
+ for (auto& [user, info] : UsersInfoStorage->GetAll()) {
if (info.Offset > 0 && StartOffset < (ui64)info.Offset) {
Head.Offset = EndOffset = StartOffset = info.Offset;
}
@@ -1810,7 +1877,7 @@ void TPartition::InitComplete(const TActorContext& ctx) {
InitUserInfoForImportantClients(ctx);
- for (auto& userInfoPair : UsersInfoStorage.GetAll()) {
+ for (auto& userInfoPair : UsersInfoStorage->GetAll()) {
Y_VERIFY(userInfoPair.second.Offset >= 0);
ReadTimestampForOffset(userInfoPair.first, userInfoPair.second, ctx);
}
@@ -1834,7 +1901,7 @@ void TPartition::InitComplete(const TActorContext& ctx) {
void TPartition::UpdateUserInfoEndOffset(const TInstant& now) {
- for (auto& userInfo : UsersInfoStorage.GetAll()) {
+ for (auto& userInfo : UsersInfoStorage->GetAll()) {
userInfo.second.EndOffset = (i64)EndOffset;
userInfo.second.UpdateReadingTimeAndState(now);
}
@@ -1885,20 +1952,20 @@ void TPartition::InitUserInfoForImportantClients(const TActorContext& ctx) {
TSet<TString> important;
for (const auto& importantUser : Config.GetPartitionConfig().GetImportantClientId()) {
important.insert(importantUser);
- TUserInfo* userInfo = UsersInfoStorage.GetIfExists(importantUser);
+ TUserInfo* userInfo = UsersInfoStorage->GetIfExists(importantUser);
if (userInfo && !userInfo->Important && userInfo->LabeledCounters) {
ctx.Send(Tablet, new TEvPQ::TEvPartitionLabeledCountersDrop(Partition, userInfo->LabeledCounters->GetGroup()));
userInfo->SetImportant(true);
continue;
}
if (!userInfo) {
- userInfo = &UsersInfoStorage.Create(ctx, importantUser, 0, true, "", 0, 0, 0, 0, TInstant::Zero());
+ userInfo = &UsersInfoStorage->Create(ctx, importantUser, 0, true, "", 0, 0, 0, 0, TInstant::Zero());
}
if (userInfo->Offset < (i64)StartOffset)
userInfo->Offset = StartOffset;
ReadTimestampForOffset(importantUser, *userInfo, ctx);
}
- for (auto& userInfoPair : UsersInfoStorage.GetAll()) {
+ for (auto& userInfoPair : UsersInfoStorage->GetAll()) {
if (!important.contains(userInfoPair.first) && userInfoPair.second.Important && userInfoPair.second.LabeledCounters) {
ctx.Send(
Tablet,
@@ -1911,7 +1978,7 @@ void TPartition::InitUserInfoForImportantClients(const TActorContext& ctx) {
void TPartition::Handle(TEvPQ::TEvChangePartitionConfig::TPtr& ev, const TActorContext& ctx) {
- AddDistrTx(ev->Release());
+ PushBackDistrTx(ev->Release());
ProcessTxsAndUserActs(ctx);
}
@@ -2019,7 +2086,7 @@ void TPartition::Handle(TEvPQ::TEvPartitionOffsets::TPtr& ev, const TActorContex
result.SetWriteTimestampEstimateMS(WriteTimestampEstimate.MilliSeconds());
if (!ev->Get()->ClientId.empty()) {
- TUserInfo* userInfo = UsersInfoStorage.GetIfExists(ev->Get()->ClientId);
+ TUserInfo* userInfo = UsersInfoStorage->GetIfExists(ev->Get()->ClientId);
if (userInfo) {
i64 offset = Max<i64>(userInfo->Offset, 0);
result.SetClientOffset(userInfo->Offset);
@@ -2081,7 +2148,7 @@ void TPartition::Handle(TEvPQ::TEvPartitionStatus::TPtr& ev, const TActorContext
TVector<ui64> resSpeed;
resSpeed.resize(4);
ui64 maxQuota = 0;
- for (auto& userInfoPair : UsersInfoStorage.GetAll()) {
+ for (auto& userInfoPair : UsersInfoStorage->GetAll()) {
auto& userInfo = userInfoPair.second;
if (ev->Get()->ClientId.empty() || ev->Get()->ClientId == userInfo.User) {
Y_VERIFY(userInfo.AvgReadBytes.size() == 4);
@@ -2166,7 +2233,7 @@ void TPartition::Handle(TEvPQ::TEvGetPartitionClientInfo::TPtr& ev, const TActor
result.SetStartOffset(StartOffset);
result.SetEndOffset(EndOffset);
result.SetResponseTimestamp(ctx.Now().MilliSeconds());
- for (auto& pr : UsersInfoStorage.GetAll()) {
+ for (auto& pr : UsersInfoStorage->GetAll()) {
TUserInfo& userInfo(pr.second);
NKikimrPQ::TClientInfo& clientInfo = *result.AddClientInfo();
clientInfo.SetClientId(pr.first);
@@ -2239,7 +2306,7 @@ TInstant TPartition::GetWriteTimeEstimate(ui64 offset) const {
void TPartition::Handle(TEvPQ::TEvGetClientOffset::TPtr& ev, const TActorContext& ctx) {
- auto& userInfo = UsersInfoStorage.GetOrCreate(ev->Get()->ClientId, ctx);
+ auto& userInfo = UsersInfoStorage->GetOrCreate(ev->Get()->ClientId, ctx);
Y_VERIFY(userInfo.Offset >= -1, "Unexpected Offset: %" PRIi64, userInfo.Offset);
ui64 offset = Max<i64>(userInfo.Offset, 0);
auto ts = GetTime(userInfo, offset);
@@ -2284,7 +2351,7 @@ void TPartition::Handle(TEvPersQueue::TEvProposeTransaction::TPtr& ev, const TAc
void TPartition::Handle(TEvPQ::TEvTxCalcPredicate::TPtr& ev, const TActorContext& ctx)
{
- AddDistrTx(ev->Release());
+ PushBackDistrTx(ev->Release());
ProcessTxsAndUserActs(ctx);
}
@@ -2366,7 +2433,7 @@ void TPartition::Handle(TEvPQ::TEvBlobResponse::TPtr& ev, const TActorContext& c
//make readinfo class
TReadAnswer answer(info.FormAnswer(
- ctx, *ev->Get(), EndOffset, Partition, &UsersInfoStorage.GetOrCreate(info.User, ctx),
+ ctx, *ev->Get(), EndOffset, Partition, &UsersInfoStorage->GetOrCreate(info.User, ctx),
info.Destination, GetSizeLag(info.Offset), Tablet, Config.GetMeteringMode()
));
@@ -2631,7 +2698,7 @@ void TPartition::Handle(TEvPQ::TEvReadTimeout::TPtr& ev, const TActorContext& ct
ctx.Send(Tablet, answer.Event.Release());
LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, " waiting read cookie " << ev->Get()->Cookie
<< " partition " << Partition << " read timeout for " << res->User << " offset " << res->Offset);
- auto& userInfo = UsersInfoStorage.GetOrCreate(res->User, ctx);
+ auto& userInfo = UsersInfoStorage->GetOrCreate(res->User, ctx);
userInfo.ForgetSubscription(ctx.Now());
OnReadRequestFinished(std::move(res.GetRef()), answer.Size);
@@ -2791,7 +2858,7 @@ void TPartition::Handle(TEvPQ::TEvRead::TPtr& ev, const TActorContext& ctx) {
Y_VERIFY(read->Offset <= EndOffset);
- auto& userInfo = UsersInfoStorage.GetOrCreate(user, ctx);
+ auto& userInfo = UsersInfoStorage->GetOrCreate(user, ctx);
if (!read->SessionId.empty()) {
if (userInfo.Session != read->SessionId) {
@@ -2817,7 +2884,7 @@ void TPartition::Handle(NReadSpeedLimiterEvents::TEvResponse::TPtr& ev, const TA
void TPartition::DoRead(TEvPQ::TEvRead::TPtr ev, TDuration waitQuotaTime, const TActorContext& ctx) {
auto read = ev->Get();
const TString& user = read->ClientId;
- auto& userInfo = UsersInfoStorage.GetOrCreate(user, ctx);
+ auto& userInfo = UsersInfoStorage->GetOrCreate(user, ctx);
ui64 offset = read->Offset;
if (read->PartNo == 0 && (read->MaxTimeLagMs > 0 || read->ReadTimestampMs > 0 || userInfo.ReadFromTimestamp > TInstant::MilliSeconds(1))) {
@@ -2865,7 +2932,7 @@ void TPartition::DoRead(TEvPQ::TEvRead::TPtr ev, TDuration waitQuotaTime, const
}
void TPartition::OnReadRequestFinished(TReadInfo&& info, ui64 answerSize) {
- auto userInfo = UsersInfoStorage.GetIfExists(info.User);
+ auto userInfo = UsersInfoStorage->GetIfExists(info.User);
Y_VERIFY(userInfo);
if (Config.GetMeteringMode() == NKikimrPQ::TPQTabletConfig::METERING_MODE_REQUEST_UNITS) {
@@ -3063,7 +3130,7 @@ void TPartition::ReadTimestampForOffset(const TString& user, TUserInfo& userInfo
}
void TPartition::ProcessTimestampsForNewData(const ui64 prevEndOffset, const TActorContext& ctx) {
- for (auto& userInfoPair : UsersInfoStorage.GetAll()) {
+ for (auto& userInfoPair : UsersInfoStorage->GetAll()) {
if (userInfoPair.second.Offset >= (i64)prevEndOffset && userInfoPair.second.Offset < (i64)EndOffset) {
ReadTimestampForOffset(userInfoPair.first, userInfoPair.second, ctx);
}
@@ -3072,7 +3139,7 @@ void TPartition::ProcessTimestampsForNewData(const ui64 prevEndOffset, const TAc
void TPartition::Handle(TEvPQ::TEvProxyResponse::TPtr& ev, const TActorContext& ctx) {
ReadingTimestamp = false;
- auto userInfo = UsersInfoStorage.GetIfExists(ReadingForUser);
+ auto userInfo = UsersInfoStorage->GetIfExists(ReadingForUser);
if (!userInfo || userInfo->ReadRuleGeneration != ReadingForUserReadRuleGeneration) {
LOG_INFO_S(
ctx, NKikimrServices::PERSQUEUE,
@@ -3138,7 +3205,7 @@ void TPartition::ProcessTimestampRead(const TActorContext& ctx) {
TString user = UpdateUserInfoTimestamp.front().first;
ui64 readRuleGeneration = UpdateUserInfoTimestamp.front().second;
UpdateUserInfoTimestamp.pop_front();
- auto userInfo = UsersInfoStorage.GetIfExists(user);
+ auto userInfo = UsersInfoStorage->GetIfExists(user);
if (!userInfo || !userInfo->ReadScheduled || userInfo->ReadRuleGeneration != readRuleGeneration)
continue;
userInfo->ReadScheduled = false;
@@ -3152,7 +3219,7 @@ void TPartition::ProcessTimestampRead(const TActorContext& ctx) {
void TPartition::Handle(TEvPQ::TEvError::TPtr& ev, const TActorContext& ctx) {
ReadingTimestamp = false;
- auto userInfo = UsersInfoStorage.GetIfExists(ReadingForUser);
+ auto userInfo = UsersInfoStorage->GetIfExists(ReadingForUser);
if (!userInfo || userInfo->ReadRuleGeneration != ReadingForUserReadRuleGeneration) {
ProcessTimestampRead(ctx);
return;
@@ -3293,7 +3360,7 @@ void TPartition::ReportCounters(const TActorContext& ctx) {
}
// per client counters
const auto now = ctx.Now();
- for (auto& userInfoPair : UsersInfoStorage.GetAll()) {
+ for (auto& userInfoPair : UsersInfoStorage->GetAll()) {
auto& userInfo = userInfoPair.second;
if (!userInfo.LabeledCounters)
continue;
@@ -3590,7 +3657,7 @@ void TPartition::HandleSetOffsetResponse(ui64 cookie, const TActorContext& ctx)
for (auto& user : AffectedUsers) {
if (auto* actual = GetPendingUserIfExists(user)) {
- TUserInfo& userInfo = UsersInfoStorage.GetOrCreate(user, ctx);
+ TUserInfo& userInfo = UsersInfoStorage->GetOrCreate(user, ctx);
bool offsetHasChanged = (userInfo.Offset != actual->Offset);
userInfo.Session = actual->Session;
@@ -3614,12 +3681,12 @@ void TPartition::HandleSetOffsetResponse(ui64 cookie, const TActorContext& ctx)
TabletCounters.Cumulative()[COUNTER_PQ_WRITE_TIMESTAMP_CACHE_HIT].Increment(1);
}
} else {
- UsersInfoStorage.Remove(user, ctx);
+ UsersInfoStorage->Remove(user, ctx);
}
}
for (auto& [consumer, important] : PendingSetImportant) {
- if (auto* userInfo = UsersInfoStorage.GetIfExists(consumer); userInfo) {
+ if (auto* userInfo = UsersInfoStorage->GetIfExists(consumer); userInfo) {
userInfo->SetImportant(important);
}
}
@@ -3642,14 +3709,19 @@ void TPartition::HandleSetOffsetResponse(ui64 cookie, const TActorContext& ctx)
ProcessTxsAndUserActs(ctx);
}
-void TPartition::AddDistrTx(TSimpleSharedPtr<TEvPQ::TEvTxCalcPredicate> event)
+void TPartition::PushBackDistrTx(TSimpleSharedPtr<TEvPQ::TEvTxCalcPredicate> event)
{
DistrTxs.emplace_back(std::move(event));
}
-void TPartition::AddDistrTx(TSimpleSharedPtr<TEvPQ::TEvChangePartitionConfig> event)
+void TPartition::PushBackDistrTx(TSimpleSharedPtr<TEvPQ::TEvChangePartitionConfig> event)
{
- DistrTxs.emplace_back(std::move(event));
+ DistrTxs.emplace_back(std::move(event), true);
+}
+
+void TPartition::PushFrontDistrTx(TSimpleSharedPtr<TEvPQ::TEvChangePartitionConfig> event)
+{
+ DistrTxs.emplace_front(std::move(event), false);
}
void TPartition::AddImmediateTx(TSimpleSharedPtr<TEvPersQueue::TEvProposeTransaction> tx)
@@ -3712,7 +3784,7 @@ void TPartition::ContinueProcessTxsAndUserActs(const TActorContext& ctx)
if (!DistrTxs.empty()) {
ProcessDistrTxs(ctx);
- if (!DistrTxs.empty()) {
+ if (TxInProgress) {
return;
}
}
@@ -3728,6 +3800,7 @@ void TPartition::ContinueProcessTxsAndUserActs(const TActorContext& ctx)
*PlanStep, *TxId);
}
AddCmdWriteUserInfos(request->Record);
+ AddCmdWriteConfig(request->Record);
ctx.Send(Tablet, request.Release());
UsersInfoWriteInProgress = true;
@@ -3762,7 +3835,7 @@ bool TPartition::BeginTransaction(const TEvPQ::TEvTxCalcPredicate& tx,
break;
}
- if (!UsersInfoStorage.GetIfExists(consumer)) {
+ if (!UsersInfoStorage->GetIfExists(consumer)) {
predicate = false;
break;
}
@@ -3861,7 +3934,7 @@ void TPartition::BeginChangePartitionConfig(const TEvPQ::TEvChangePartitionConfi
TSet<TString> hasReadRule;
- for (auto& [consumer, info] : UsersInfoStorage.GetAll()) {
+ for (auto& [consumer, info] : UsersInfoStorage->GetAll()) {
PendingReadFromTimestamp[consumer] = TInstant::Zero();
hasReadRule.insert(consumer);
@@ -3927,17 +4000,17 @@ void TPartition::EndChangePartitionConfig(const TEvPQ::TEvChangePartitionConfig&
Y_VERIFY(Config.GetPartitionConfig().GetTotalPartitions() > 0);
- UsersInfoStorage.UpdateConfig(event.Config);
+ UsersInfoStorage->UpdateConfig(event.Config);
WriteQuota->UpdateConfig(Config.GetPartitionConfig().GetBurstSize(), Config.GetPartitionConfig().GetWriteSpeedInBytesPerSecond());
if (AppData(ctx)->PQConfig.GetQuotingConfig().GetPartitionReadQuotaIsTwiceWriteQuota()) {
- for (auto& userInfo : UsersInfoStorage.GetAll()) {
+ for (auto& userInfo : UsersInfoStorage->GetAll()) {
userInfo.second.ReadQuota.UpdateConfig(Config.GetPartitionConfig().GetBurstSize() * 2, Config.GetPartitionConfig().GetWriteSpeedInBytesPerSecond() * 2);
}
}
for (const auto& readQuota : Config.GetPartitionConfig().GetReadQuota()) {
- auto& userInfo = UsersInfoStorage.GetOrCreate(readQuota.GetClientId(), ctx);
+ auto& userInfo = UsersInfoStorage->GetOrCreate(readQuota.GetClientId(), ctx);
userInfo.ReadQuota.UpdateConfig(readQuota.GetBurstSize(), readQuota.GetSpeedInBytesPerSecond());
}
@@ -3959,10 +4032,17 @@ void TPartition::EndChangePartitionConfig(const TEvPQ::TEvChangePartitionConfig&
}
}
- SchedulePartitionConfigChanged();
+ if (SendChangeConfigReply) {
+ SchedulePartitionConfigChanged();
+ }
ReportCounters(ctx);
}
+TString TPartition::GetKeyConfig() const
+{
+ return Sprintf("_config_%u", Partition);
+}
+
void TPartition::InitPendingUserInfoForImportantClients(const TEvPQ::TEvChangePartitionConfig& event,
const TActorContext& ctx) {
const NKikimrPQ::TPQTabletConfig& config = event.Config;
@@ -3991,7 +4071,7 @@ void TPartition::InitPendingUserInfoForImportantClients(const TEvPQ::TEvChangePa
//ReadTimestampForOffset(consumer, *userInfo, ctx);
}
- for (auto& [consumer, userInfo] : UsersInfoStorage.GetAll()) {
+ for (auto& [consumer, userInfo] : UsersInfoStorage->GetAll()) {
if (!important.contains(consumer) && userInfo.Important && userInfo.LabeledCounters) {
ScheduleDropPartitionLabeledCounters(userInfo.LabeledCounters->GetGroup());
PendingSetImportant[consumer] = false;
@@ -4020,6 +4100,7 @@ void TPartition::ProcessDistrTx(const TActorContext& ctx)
Y_VERIFY(!ChangeConfig);
ChangeConfig = t.ChangeConfig;
+ SendChangeConfigReply = t.SendReply;
BeginChangePartitionConfig(*ChangeConfig, ctx);
RemoveDistrTx();
@@ -4440,17 +4521,34 @@ void TPartition::AddCmdWriteUserInfos(NKikimrClient::TKeyValueRequest& request)
}
}
+void TPartition::AddCmdWriteConfig(NKikimrClient::TKeyValueRequest& request)
+{
+ if (!ChangeConfig) {
+ return;
+ }
+
+ TString key = GetKeyConfig();
+
+ TString data;
+ Y_VERIFY(ChangeConfig->Config.SerializeToString(&data));
+
+ auto write = request.AddCmdWrite();
+ write->SetKey(key.Data(), key.Size());
+ write->SetValue(data.Data(), data.Size());
+ write->SetStorageChannel(NKikimrClient::TKeyValueRequest::INLINE);
+}
+
TUserInfo& TPartition::GetOrCreatePendingUser(const TString& user,
const TActorContext& ctx,
TMaybe<ui64> readRuleGeneration)
{
- TUserInfo* userInfo = UsersInfoStorage.GetIfExists(user);
+ TUserInfo* userInfo = UsersInfoStorage->GetIfExists(user);
auto i = PendingUsersInfo.find(user);
if (i == PendingUsersInfo.end()) {
- auto [p, _] = PendingUsersInfo.emplace(user, UsersInfoStorage.CreateUserInfo(user,
- ctx,
- readRuleGeneration));
+ auto [p, _] = PendingUsersInfo.emplace(user, UsersInfoStorage->CreateUserInfo(user,
+ ctx,
+ readRuleGeneration));
if (userInfo) {
p->second.Session = userInfo->Session;
@@ -5542,7 +5640,7 @@ void TPartition::ProcessRead(const TActorContext& ctx, TReadInfo&& info, const u
ui32 size = 0;
Y_VERIFY(!info.User.empty());
- auto& userInfo = UsersInfoStorage.GetOrCreate(info.User, ctx);
+ auto& userInfo = UsersInfoStorage->GetOrCreate(info.User, ctx);
if (subscription) {
userInfo.ForgetSubscription(ctx.Now());
@@ -5573,7 +5671,7 @@ void TPartition::ProcessRead(const TActorContext& ctx, TReadInfo&& info, const u
LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "Reading cookie " << cookie << ". All data is from uncompacted head.");
TReadAnswer answer(info.FormAnswer(
- ctx, EndOffset, Partition, &UsersInfoStorage.GetOrCreate(info.User, ctx),
+ ctx, EndOffset, Partition, &UsersInfoStorage->GetOrCreate(info.User, ctx),
info.Destination, GetSizeLag(info.Offset), Tablet, Config.GetMeteringMode()
));
const auto& resp = dynamic_cast<TEvPQ::TEvProxyResponse*>(answer.Event.Get())->Response;
diff --git a/ydb/core/persqueue/partition.h b/ydb/core/persqueue/partition.h
index 4a68000f90d..880e5aea9f9 100644
--- a/ydb/core/persqueue/partition.h
+++ b/ydb/core/persqueue/partition.h
@@ -48,8 +48,10 @@ struct TTransaction {
Y_VERIFY(Tx);
}
- explicit TTransaction(TSimpleSharedPtr<TEvPQ::TEvChangePartitionConfig> changeConfig) :
- ChangeConfig(changeConfig)
+ TTransaction(TSimpleSharedPtr<TEvPQ::TEvChangePartitionConfig> changeConfig,
+ bool sendReply) :
+ ChangeConfig(changeConfig),
+ SendReply(sendReply)
{
Y_VERIFY(ChangeConfig);
}
@@ -58,6 +60,7 @@ struct TTransaction {
TMaybe<bool> Predicate;
TSimpleSharedPtr<TEvPQ::TEvChangePartitionConfig> ChangeConfig;
+ bool SendReply;
};
class TPartition : public TActorBootstrapped<TPartition> {
@@ -205,8 +208,9 @@ private:
void ProcessTxsAndUserActs(const TActorContext& ctx);
void ContinueProcessTxsAndUserActs(const TActorContext& ctx);
- void AddDistrTx(TSimpleSharedPtr<TEvPQ::TEvTxCalcPredicate> event);
- void AddDistrTx(TSimpleSharedPtr<TEvPQ::TEvChangePartitionConfig> event);
+ void PushBackDistrTx(TSimpleSharedPtr<TEvPQ::TEvTxCalcPredicate> event);
+ void PushBackDistrTx(TSimpleSharedPtr<TEvPQ::TEvChangePartitionConfig> event);
+ void PushFrontDistrTx(TSimpleSharedPtr<TEvPQ::TEvChangePartitionConfig> event);
void RemoveDistrTx();
void ProcessDistrTxs(const TActorContext& ctx);
void ProcessDistrTx(const TActorContext& ctx);
@@ -249,6 +253,7 @@ private:
void AddCmdWriteTxMeta(NKikimrClient::TKeyValueRequest& request,
ui64 step, ui64 txId);
void AddCmdWriteUserInfos(NKikimrClient::TKeyValueRequest& request);
+ void AddCmdWriteConfig(NKikimrClient::TKeyValueRequest& request);
void AddCmdDeleteRange(NKikimrClient::TKeyValueRequest& request,
const TKeyPrefix& ikey, const TKeyPrefix& ikeyDeprecated);
@@ -277,10 +282,15 @@ private:
const TActorContext& ctx);
void EndChangePartitionConfig(const TEvPQ::TEvChangePartitionConfig& event,
const TActorContext& ctx);
+ TString GetKeyConfig() const;
void InitPendingUserInfoForImportantClients(const TEvPQ::TEvChangePartitionConfig& event,
const TActorContext& ctx);
+ void RequestConfig(const TActorContext& ctx);
+ void HandleConfig(const NKikimrClient::TResponse& res, const TActorContext& ctx);
+ void Initialize(const TActorContext& ctx);
+
public:
static constexpr NKikimrServices::TActivity::EType ActorActivityType() {
return NKikimrServices::TActivity::PERSQUEUE_PARTITION_ACTOR;
@@ -454,6 +464,7 @@ private:
}
private:
enum EInitState {
+ WaitConfig,
WaitDiskStatus,
WaitInfoRange,
WaitDataRange,
@@ -465,6 +476,8 @@ private:
ui64 TabletID;
ui32 Partition;
NKikimrPQ::TPQTabletConfig Config;
+ NKikimrPQ::TPQTabletConfig TabletConfig;
+ const TTabletCountersBase& Counters;
NPersQueue::TTopicConverterPtr TopicConverter;
bool IsLocalDC;
TString DCId;
@@ -514,7 +527,7 @@ private:
bool IsServerless;
TString FolderId;
- TUsersInfoStorage UsersInfoStorage;
+ TMaybe<TUsersInfoStorage> UsersInfoStorage;
//
// user actions and transactions
@@ -535,6 +548,7 @@ private:
TMaybe<ui64> TxId;
bool TxIdHasChanged = false;
TSimpleSharedPtr<TEvPQ::TEvChangePartitionConfig> ChangeConfig;
+ bool SendChangeConfigReply = true;
//
//
//
diff --git a/ydb/core/persqueue/ut/partition_ut.cpp b/ydb/core/persqueue/ut/partition_ut.cpp
index 2db2d2145e8..d941812800e 100644
--- a/ydb/core/persqueue/ut/partition_ut.cpp
+++ b/ydb/core/persqueue/ut/partition_ut.cpp
@@ -25,15 +25,6 @@ Y_UNIT_TEST_SUITE(TPartitionTests) {
namespace NHelpers {
-struct TCreatePartitionParams {
- ui32 Partition = 1;
- ui64 Begin = 0;
- ui64 End = 0;
- TMaybe<ui64> PlanStep;
- TMaybe<ui64> TxId;
- TVector<TTransaction> Transactions;
-};
-
struct TCreateConsumerParams {
TString Consumer;
ui64 Offset = 0;
@@ -44,15 +35,39 @@ struct TCreateConsumerParams {
ui64 ReadRuleGeneration = 0;
};
+struct TConfigParams {
+ ui64 Version = 0;
+ TVector<TCreateConsumerParams> Consumers;
+};
+
+struct TCreatePartitionParams {
+ ui32 Partition = 1;
+ ui64 Begin = 0;
+ ui64 End = 0;
+ TMaybe<ui64> PlanStep;
+ TMaybe<ui64> TxId;
+ TVector<TTransaction> Transactions;
+ TConfigParams Config;
+};
+
}
class TPartitionFixture : public NUnitTest::TBaseFixture {
protected:
struct TUserInfoMatcher {
+ TMaybe<TString> Consumer;
TMaybe<TString> Session;
TMaybe<ui64> Offset;
TMaybe<ui32> Generation;
TMaybe<ui32> Step;
+ TMaybe<ui64> ReadRuleGeneration;
+ };
+
+ struct TDeleteRangeMatcher {
+ TMaybe<char> TypeInfo;
+ TMaybe<ui32> Partition;
+ TMaybe<char> Mark;
+ TMaybe<TString> Consumer;
};
struct TCmdWriteMatcher {
@@ -60,6 +75,7 @@ protected:
TMaybe<ui64> PlanStep;
TMaybe<ui64> TxId;
THashMap<size_t, TUserInfoMatcher> UserInfos;
+ THashMap<size_t, TDeleteRangeMatcher> DeleteRanges;
};
struct TProxyResponseMatcher {
@@ -93,6 +109,10 @@ protected:
TMaybe<ui32> Partition;
};
+ struct TChangePartitionConfigMatcher {
+ TMaybe<ui32> Partition;
+ };
+
struct TTxOperationMatcher {
TMaybe<ui32> Partition;
TMaybe<TString> Consumer;
@@ -110,16 +130,17 @@ protected:
using TCreatePartitionParams = NHelpers::TCreatePartitionParams;
using TCreateConsumerParams = NHelpers::TCreateConsumerParams;
+ using TConfigParams = NHelpers::TConfigParams;
void SetUp(NUnitTest::TTestContext&) override;
void TearDown(NUnitTest::TTestContext&) override;
void CreatePartitionActor(ui32 partition,
- const TVector<TCreateConsumerParams>& consumers,
+ const TConfigParams& config,
bool newPartition,
TVector<TTransaction> txs);
void CreatePartition(const TCreatePartitionParams& params = {},
- const TVector<TCreateConsumerParams>& consumers = {});
+ const TConfigParams& config = {});
void CreateSession(const TString& clientId,
const TString& sessionId,
@@ -148,6 +169,8 @@ protected:
void WaitProxyResponse(const TProxyResponseMatcher &matcher = {});
void WaitErrorResponse(const TErrorMatcher& matcher = {});
+ void WaitConfigRequest();
+ void SendConfigResponse(const TConfigParams& config);
void WaitDiskStatusRequest();
void SendDiskStatusResponse();
void WaitMetaReadRequest();
@@ -179,15 +202,26 @@ protected:
void SendRollbackTx(ui64 step, ui64 txId);
void WaitCommitTxDone(const TCommitTxDoneMatcher& matcher = {});
+ void SendChangePartitionConfig(const TConfigParams& config = {});
+ void WaitPartitionConfigChanged(const TChangePartitionConfigMatcher& matcher = {});
+
TTransaction MakeTransaction(ui64 step, ui64 txId,
TString consumer,
ui64 begin, ui64 end,
TMaybe<bool> predicate = Nothing());
+ static NKikimrPQ::TPQTabletConfig MakeConfig(ui64 version,
+ const TVector<TCreateConsumerParams>& consumers);
+
TMaybe<TTestContext> Ctx;
TMaybe<TFinalizer> Finalizer;
TActorId ActorId;
+
+ NPersQueue::TTopicConverterPtr TopicConverter;
+ NKikimrPQ::TPQTabletConfig Config;
+
+ TAutoPtr<TTabletCountersBase> TabletCounters;
};
void TPartitionFixture::SetUp(NUnitTest::TTestContext&)
@@ -203,8 +237,10 @@ void TPartitionFixture::TearDown(NUnitTest::TTestContext&)
{
}
+
+
void TPartitionFixture::CreatePartitionActor(ui32 id,
- const TVector<TCreateConsumerParams>& consumers,
+ const TConfigParams& config,
bool newPartition,
TVector<TTransaction> txs)
{
@@ -225,46 +261,42 @@ void TPartitionFixture::CreatePartitionActor(ui32 id,
>;
TAutoPtr<TCounters> counters(new TCounters());
- TAutoPtr<TTabletCountersBase> tabletCounters = counters->GetSecondTabletCounters().Release();
-
- NPersQueue::TTopicNamesConverterFactory factory(true, "/Root/PQ", "dc1");
- NPersQueue::TTopicConverterPtr topicConverter;
- NKikimrPQ::TPQTabletConfig config;
+ TabletCounters = counters->GetSecondTabletCounters().Release();
- for (auto& c : consumers) {
- config.AddReadRules(c.Consumer);
- }
-
- config.SetTopicName("rt3.dc1--account--topic");
- config.SetTopicPath("/Root/PQ/rt3.dc1--account--topic");
- config.SetFederationAccount("account");
- config.SetLocalDC(true);
- config.SetYdbDatabasePath("");
+ Config = MakeConfig(config.Version,
+ config.Consumers);
- topicConverter = factory.MakeTopicConverter(config);
+ NPersQueue::TTopicNamesConverterFactory factory(true, "/Root/PQ", "dc1");
+ TopicConverter = factory.MakeTopicConverter(Config);
auto actor = new NPQ::TPartition(Ctx->TabletId,
id,
Ctx->Edge,
Ctx->Edge,
- topicConverter,
+ TopicConverter,
true,
"dcId",
false,
- config,
- *tabletCounters,
+ Config,
+ *TabletCounters,
newPartition,
std::move(txs));
ActorId = Ctx->Runtime->Register(actor);
}
void TPartitionFixture::CreatePartition(const TCreatePartitionParams& params,
- const TVector<TCreateConsumerParams>& consumers)
+ const TConfigParams& config)
{
if ((params.Begin == 0) && (params.End == 0)) {
- CreatePartitionActor(params.Partition, consumers, true, {});
+ CreatePartitionActor(params.Partition, config, true, {});
+
+ WaitConfigRequest();
+ SendConfigResponse(params.Config);
} else {
- CreatePartitionActor(params.Partition, consumers, false, params.Transactions);
+ CreatePartitionActor(params.Partition, config, false, params.Transactions);
+
+ WaitConfigRequest();
+ SendConfigResponse(params.Config);
WaitDiskStatusRequest();
SendDiskStatusResponse();
@@ -273,7 +305,7 @@ void TPartitionFixture::CreatePartition(const TCreatePartitionParams& params,
SendMetaReadResponse(params.PlanStep, params.TxId);
WaitInfoRangeRequest();
- SendInfoRangeResponse(params.Partition, consumers);
+ SendInfoRangeResponse(params.Partition, params.Config.Consumers);
WaitDataRangeRequest();
SendDataRangeResponse(params.Begin, params.End);
@@ -349,8 +381,13 @@ void TPartitionFixture::WaitCmdWrite(const TCmdWriteMatcher& matcher)
UNIT_ASSERT_VALUES_EQUAL(event->Record.GetCookie(), 1); // SET_OFFSET_COOKIE
if (matcher.Count.Defined()) {
- UNIT_ASSERT_VALUES_EQUAL(*matcher.Count, event->Record.CmdWriteSize());
+ UNIT_ASSERT_VALUES_EQUAL(*matcher.Count,
+ event->Record.CmdWriteSize() + event->Record.CmdDeleteRangeSize());
}
+
+ //
+ // TxMeta
+ //
if (matcher.PlanStep.Defined()) {
NKikimrPQ::TPartitionTxMeta meta;
UNIT_ASSERT(meta.ParseFromString(event->Record.GetCmdWrite(0).GetValue()));
@@ -363,10 +400,12 @@ void TPartitionFixture::WaitCmdWrite(const TCmdWriteMatcher& matcher)
UNIT_ASSERT_VALUES_EQUAL(*matcher.TxId, meta.GetTxId());
}
+
+ //
+ // CmdWrite
+ //
for (auto& [index, userInfo] : matcher.UserInfos) {
- if (matcher.Count.Defined()) {
- UNIT_ASSERT(index < *matcher.Count);
- }
+ UNIT_ASSERT(index < event->Record.CmdWriteSize());
NKikimrPQ::TUserInfo ud;
UNIT_ASSERT(ud.ParseFromString(event->Record.GetCmdWrite(index).GetValue()));
@@ -387,6 +426,31 @@ void TPartitionFixture::WaitCmdWrite(const TCmdWriteMatcher& matcher)
UNIT_ASSERT(ud.HasOffset());
UNIT_ASSERT_VALUES_EQUAL(*userInfo.Offset, ud.GetOffset());
}
+ if (userInfo.ReadRuleGeneration) {
+ UNIT_ASSERT(ud.HasReadRuleGeneration());
+ UNIT_ASSERT_VALUES_EQUAL(*userInfo.ReadRuleGeneration, ud.GetReadRuleGeneration());
+ }
+ }
+
+ //
+ // CmdDeleteRange
+ //
+ for (auto& [index, deleteRange] : matcher.DeleteRanges) {
+ UNIT_ASSERT(index < event->Record.CmdDeleteRangeSize());
+ UNIT_ASSERT(event->Record.GetCmdDeleteRange(index).HasRange());
+
+ auto& range = event->Record.GetCmdDeleteRange(index).GetRange();
+ TString key = range.GetFrom();
+ UNIT_ASSERT(key.Size() > (1 + 10 + 1)); // type + partition + mark + consumer
+
+ if (deleteRange.Partition.Defined()) {
+ auto partition = FromString<ui32>(key.substr(1, 10));
+ UNIT_ASSERT_VALUES_EQUAL(*deleteRange.Partition, partition);
+ }
+ if (deleteRange.Consumer.Defined()) {
+ TString consumer = key.substr(12);
+ UNIT_ASSERT_VALUES_EQUAL(*deleteRange.Consumer, consumer);
+ }
}
}
@@ -453,6 +517,35 @@ void TPartitionFixture::WaitErrorResponse(const TErrorMatcher& matcher)
}
}
+void TPartitionFixture::WaitConfigRequest()
+{
+ auto event = Ctx->Runtime->GrabEdgeEvent<TEvKeyValue::TEvRequest>();
+ UNIT_ASSERT(event != nullptr);
+
+ UNIT_ASSERT_VALUES_EQUAL(event->Record.CmdReadSize(), 1);
+}
+
+void TPartitionFixture::SendConfigResponse(const TConfigParams& config)
+{
+ auto event = MakeHolder<TEvKeyValue::TEvResponse>();
+ event->Record.SetStatus(NMsgBusProxy::MSTATUS_OK);
+
+ auto read = event->Record.AddReadResult();
+ if (config.Consumers.empty()) {
+ read->SetStatus(NKikimrProto::NODATA);
+ } else {
+ read->SetStatus(NKikimrProto::OK);
+
+ TString out;
+ Y_VERIFY(MakeConfig(config.Version,
+ config.Consumers).SerializeToString(&out));
+
+ read->SetValue(out);
+ }
+
+ Ctx->Runtime->SingleSys()->Send(new IEventHandle(ActorId, Ctx->Edge, event.Release()));
+}
+
void TPartitionFixture::WaitDiskStatusRequest()
{
auto event = Ctx->Runtime->GrabEdgeEvent<TEvKeyValue::TEvRequest>();
@@ -688,6 +781,23 @@ void TPartitionFixture::WaitCommitTxDone(const TCommitTxDoneMatcher& matcher)
}
}
+void TPartitionFixture::SendChangePartitionConfig(const TConfigParams& config)
+{
+ auto event = MakeHolder<TEvPQ::TEvChangePartitionConfig>(TopicConverter, MakeConfig(config.Version,
+ config.Consumers));
+ Ctx->Runtime->SingleSys()->Send(new IEventHandle(ActorId, Ctx->Edge, event.Release()));
+}
+
+void TPartitionFixture::WaitPartitionConfigChanged(const TChangePartitionConfigMatcher& matcher)
+{
+ auto event = Ctx->Runtime->GrabEdgeEvent<TEvPQ::TEvPartitionConfigChanged>();
+ UNIT_ASSERT(event != nullptr);
+
+ if (matcher.Partition) {
+ UNIT_ASSERT_VALUES_EQUAL(*matcher.Partition, event->Partition);
+ }
+}
+
TTransaction TPartitionFixture::MakeTransaction(ui64 step, ui64 txId,
TString consumer,
ui64 begin, ui64 end,
@@ -699,6 +809,27 @@ TTransaction TPartitionFixture::MakeTransaction(ui64 step, ui64 txId,
return TTransaction(event, predicate);
}
+NKikimrPQ::TPQTabletConfig TPartitionFixture::MakeConfig(ui64 version,
+ const TVector<TCreateConsumerParams>& consumers)
+{
+ NKikimrPQ::TPQTabletConfig config;
+
+ config.SetVersion(version);
+
+ for (auto& c : consumers) {
+ config.AddReadRules(c.Consumer);
+ config.AddReadRuleGenerations(c.Generation);
+ }
+
+ config.SetTopicName("rt3.dc1--account--topic");
+ config.SetTopicPath("/Root/PQ/rt3.dc1--account--topic");
+ config.SetFederationAccount("account");
+ config.SetLocalDC(true);
+ config.SetYdbDatabasePath("");
+
+ return config;
+}
+
Y_UNIT_TEST_F(Batching, TPartitionFixture)
{
CreatePartition();
@@ -986,12 +1117,14 @@ Y_UNIT_TEST_F(AfterRestart_1, TPartitionFixture)
txs.push_back(MakeTransaction(step, 11111, consumer, 0, 2, true));
txs.push_back(MakeTransaction(step, 22222, consumer, 2, 4));
- CreatePartition({.Partition=partition,
+ CreatePartition({
+ .Partition=partition,
.Begin=begin,
.End=end,
.PlanStep=step, .TxId=10000,
- .Transactions=std::move(txs)},
- {{.Consumer=consumer, .Offset=0, .Session=session}});
+ .Transactions=std::move(txs),
+ .Config={.Consumers={{.Consumer=consumer, .Offset=0, .Session=session}}}
+ });
SendCommitTx(step, 11111);
@@ -1015,12 +1148,14 @@ Y_UNIT_TEST_F(AfterRestart_2, TPartitionFixture)
txs.push_back(MakeTransaction(step, 11111, consumer, 0, 2));
txs.push_back(MakeTransaction(step, 22222, consumer, 2, 4));
- CreatePartition({.Partition=partition,
+ CreatePartition({
+ .Partition=partition,
.Begin=begin,
.End=end,
.PlanStep=step, .TxId=10000,
- .Transactions=std::move(txs)},
- {{.Consumer=consumer, .Offset=0, .Session=session}});
+ .Transactions=std::move(txs),
+ .Config={.Consumers={{.Consumer=consumer, .Offset=0, .Session=session}}}
+ });
WaitCalcPredicateResult({.Step=step, .TxId=11111, .Partition=partition, .Predicate=true});
}
@@ -1085,6 +1220,86 @@ Y_UNIT_TEST_F(CorrectRange_Rollback, TPartitionFixture)
WaitCalcPredicateResult({.Step=step, .TxId=txId_2, .Partition=partition, .Predicate=true});
}
+Y_UNIT_TEST_F(ChangeConfig, TPartitionFixture)
+{
+ const ui32 partition = 3;
+ const ui64 begin = 0;
+ const ui64 end = 10;
+ const TString client = "client";
+ const TString session = "session";
+
+ const ui64 step = 12345;
+ const ui64 txId_1 = 67890;
+ const ui64 txId_2 = 67891;
+
+ CreatePartition({
+ .Partition=partition, .Begin=begin, .End=end,
+ .Config={.Consumers={
+ {.Consumer="client-1", .Offset=0, .Session="session-1"},
+ {.Consumer="client-2", .Offset=0, .Session="session-2"},
+ {.Consumer="client-3", .Offset=0, .Session="session-3"}
+ }}
+ });
+
+ SendCalcPredicate(step, txId_1, "client-1", 0, 2);
+ SendChangePartitionConfig({.Version=2,
+ .Consumers={
+ {.Consumer="client-1", .Generation=0},
+ {.Consumer="client-3", .Generation=7}
+ }});
+ //
+ // consumer 'client-2' will be deleted
+ //
+ SendCalcPredicate(step, txId_2, "client-2", 0, 2);
+
+ WaitCalcPredicateResult({.Step=step, .TxId=txId_1, .Partition=partition, .Predicate=true});
+ SendCommitTx(step, txId_1);
+
+ //
+ // consumer 'client-2' was deleted
+ //
+ WaitCalcPredicateResult({.Step=step, .TxId=txId_2, .Partition=partition, .Predicate=false});
+ SendRollbackTx(step, txId_2);
+
+ WaitCmdWrite({.Count=8,
+ .PlanStep=step, .TxId=txId_2,
+ .UserInfos={
+ {1, {.Consumer="client-1", .Session="", .Offset=2}},
+ {3, {.Consumer="client-3", .Session="", .Offset=0, .ReadRuleGeneration=7}}
+ },
+ .DeleteRanges={
+ {0, {.Partition=3, .Consumer="client-2"}}
+ }});
+ SendCmdWriteResponse(NMsgBusProxy::MSTATUS_OK);
+
+ WaitPartitionConfigChanged({.Partition=partition});
+}
+
+Y_UNIT_TEST_F(TabletConfig_Is_Newer_That_PartitionConfig, TPartitionFixture)
+{
+ CreatePartition({
+ .Partition=3,
+ .Begin=0, .End=10,
+ //
+ // конфиг партиции
+ //
+ .Config={.Version=1, .Consumers={{.Consumer="client-1", .Offset=3}}}
+ },
+ //
+ // конфиг таблетки
+ //
+ {.Version=2, .Consumers={{.Consumer="client-2"}}});
+
+ WaitCmdWrite({.Count=5,
+ .UserInfos={
+ {0, {.Consumer="client-2", .Session="", .Offset=0, .ReadRuleGeneration=0}}
+ },
+ .DeleteRanges={
+ {0, {.Partition=3, .Consumer="client-1"}}
+ }});
+ SendCmdWriteResponse(NMsgBusProxy::MSTATUS_OK);
+}
+
}
}