summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authoralexnick <[email protected]>2022-10-12 18:54:20 +0300
committeralexnick <[email protected]>2022-10-12 18:54:20 +0300
commit89955aa4582c9c9fa083bc5d7722e1a168f53922 (patch)
tree061c7c1dab46a0d845958cd7ac9009146e6de251
parent8e9ae82ce24b775c308380715bdac8ab4799343f (diff)
fix for cdc topic creation
fix for internal state fix for verify fix for cdc topic creation
-rw-r--r--ydb/core/persqueue/pq_impl.cpp29
1 files changed, 22 insertions, 7 deletions
diff --git a/ydb/core/persqueue/pq_impl.cpp b/ydb/core/persqueue/pq_impl.cpp
index d4feed02f46..3544393bc84 100644
--- a/ydb/core/persqueue/pq_impl.cpp
+++ b/ydb/core/persqueue/pq_impl.cpp
@@ -686,16 +686,29 @@ void TPersQueue::ApplyNewConfigAndReply(const TActorContext& ctx)
void TPersQueue::HandleConfigWriteResponse(const NKikimrClient::TResponse& resp, const TActorContext& ctx)
{
if (resp.GetStatus() != NMsgBusProxy::MSTATUS_OK ||
- resp.WriteResultSize() != 1 ||
- resp.GetWriteResult(0).GetStatus() != NKikimrProto::OK)
- {
+ resp.WriteResultSize() < 1) {
LOG_ERROR_S(ctx, NKikimrServices::PERSQUEUE, "Tablet " << TabletID()
<< " Config write error: " << resp.DebugString() << " " << ctx.SelfID);
ctx.Send(ctx.SelfID, new TEvents::TEvPoisonPill());
return;
}
+ for (const auto& res : resp.GetWriteResult()) {
+ if (res.GetStatus() != NKikimrProto::OK) {
+ LOG_ERROR_S(ctx, NKikimrServices::PERSQUEUE, "Tablet " << TabletID()
+ << " Config write error: " << resp.DebugString() << " " << ctx.SelfID);
+ ctx.Send(ctx.SelfID, new TEvents::TEvPoisonPill());
+ return;
+ }
+ }
+
+ if (resp.WriteResultSize() > 1) {
+ LOG_INFO_S(ctx, NKikimrServices::PERSQUEUE, "Tablet " << TabletID()
+ << " restarting - have some registering of message groups");
+ ctx.Send(ctx.SelfID, new TEvents::TEvPoisonPill());
+ return;
+ }
- Y_VERIFY(resp.WriteResultSize() == 1);
+ Y_VERIFY(resp.WriteResultSize() >= 1);
Y_VERIFY(resp.GetWriteResult(0).GetStatus() == NKikimrProto::OK);
if (ConfigInited && PartitionsInited == Partitions.size()) //all partitions are working well - can apply new config
ApplyNewConfigAndReply(ctx);
@@ -1280,12 +1293,14 @@ void TPersQueue::ProcessUpdateConfigRequest(TAutoPtr<TEvPersQueue::TEvUpdateConf
}
sourceIdWriter.RegisterSourceId(mg.GetId(), 0, 0, ctx.Now(), std::move(keyRange));
+ }
- for (const auto& partition : cfg.GetPartitions()) {
- sourceIdWriter.FillRequest(request.Get(), partition.GetPartitionId());
- }
+ for (const auto& partition : cfg.GetPartitions()) {
+ sourceIdWriter.FillRequest(request.Get(), partition.GetPartitionId());
}
+ Y_VERIFY((ui64)request->Record.GetCmdWrite().size() == (ui64)bootstrapCfg.GetExplicitMessageGroups().size() * cfg.PartitionsSize() + 1);
+
NewConfig = cfg;
ctx.Send(ctx.SelfID, request.Release());
}