diff options
author | alexnick <[email protected]> | 2022-10-12 18:54:20 +0300 |
---|---|---|
committer | alexnick <[email protected]> | 2022-10-12 18:54:20 +0300 |
commit | 89955aa4582c9c9fa083bc5d7722e1a168f53922 (patch) | |
tree | 061c7c1dab46a0d845958cd7ac9009146e6de251 | |
parent | 8e9ae82ce24b775c308380715bdac8ab4799343f (diff) |
fix for cdc topic creation
fix for internal state
fix for verify
fix for cdc topic creation
-rw-r--r-- | ydb/core/persqueue/pq_impl.cpp | 29 |
1 files changed, 22 insertions, 7 deletions
diff --git a/ydb/core/persqueue/pq_impl.cpp b/ydb/core/persqueue/pq_impl.cpp index d4feed02f46..3544393bc84 100644 --- a/ydb/core/persqueue/pq_impl.cpp +++ b/ydb/core/persqueue/pq_impl.cpp @@ -686,16 +686,29 @@ void TPersQueue::ApplyNewConfigAndReply(const TActorContext& ctx) void TPersQueue::HandleConfigWriteResponse(const NKikimrClient::TResponse& resp, const TActorContext& ctx) { if (resp.GetStatus() != NMsgBusProxy::MSTATUS_OK || - resp.WriteResultSize() != 1 || - resp.GetWriteResult(0).GetStatus() != NKikimrProto::OK) - { + resp.WriteResultSize() < 1) { LOG_ERROR_S(ctx, NKikimrServices::PERSQUEUE, "Tablet " << TabletID() << " Config write error: " << resp.DebugString() << " " << ctx.SelfID); ctx.Send(ctx.SelfID, new TEvents::TEvPoisonPill()); return; } + for (const auto& res : resp.GetWriteResult()) { + if (res.GetStatus() != NKikimrProto::OK) { + LOG_ERROR_S(ctx, NKikimrServices::PERSQUEUE, "Tablet " << TabletID() + << " Config write error: " << resp.DebugString() << " " << ctx.SelfID); + ctx.Send(ctx.SelfID, new TEvents::TEvPoisonPill()); + return; + } + } + + if (resp.WriteResultSize() > 1) { + LOG_INFO_S(ctx, NKikimrServices::PERSQUEUE, "Tablet " << TabletID() + << " restarting - have some registering of message groups"); + ctx.Send(ctx.SelfID, new TEvents::TEvPoisonPill()); + return; + } - Y_VERIFY(resp.WriteResultSize() == 1); + Y_VERIFY(resp.WriteResultSize() >= 1); Y_VERIFY(resp.GetWriteResult(0).GetStatus() == NKikimrProto::OK); if (ConfigInited && PartitionsInited == Partitions.size()) //all partitions are working well - can apply new config ApplyNewConfigAndReply(ctx); @@ -1280,12 +1293,14 @@ void TPersQueue::ProcessUpdateConfigRequest(TAutoPtr<TEvPersQueue::TEvUpdateConf } sourceIdWriter.RegisterSourceId(mg.GetId(), 0, 0, ctx.Now(), std::move(keyRange)); + } - for (const auto& partition : cfg.GetPartitions()) { - sourceIdWriter.FillRequest(request.Get(), partition.GetPartitionId()); - } + for (const auto& partition : cfg.GetPartitions()) { + sourceIdWriter.FillRequest(request.Get(), partition.GetPartitionId()); } + Y_VERIFY((ui64)request->Record.GetCmdWrite().size() == (ui64)bootstrapCfg.GetExplicitMessageGroups().size() * cfg.PartitionsSize() + 1); + NewConfig = cfg; ctx.Send(ctx.SelfID, request.Release()); } |