diff options
6 files changed, 120 insertions, 6 deletions
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_common.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_common.cpp index 498063a8c72..0068d86fc9b 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_common.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_common.cpp @@ -428,10 +428,7 @@ TDone::TDone(const TOperationId& id) IgnoreMessages(DebugHint(), AllIncomingEvents()); } -bool TDone::ProgressState(TOperationContext& context) { - LOG_INFO_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, - "[" << context.SS->SelfTabletId() << "] " << DebugHint() << " ProgressState"); - +bool TDone::Process(TOperationContext& context) { const auto* txState = context.SS->FindTx(OperationId); const auto& pathId = txState->TargetPathId; @@ -480,6 +477,13 @@ bool TDone::ProgressState(TOperationContext& context) { return true; } +bool TDone::ProgressState(TOperationContext& context) { + LOG_INFO_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, + "[" << context.SS->SelfTabletId() << "] " << DebugHint() << " ProgressState"); + + return Process(context); +} + namespace { template <typename T, typename TFuncCheck, typename TFuncToString> diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_common.h b/ydb/core/tx/schemeshard/schemeshard__operation_common.h index 01e9068db9d..e6182d631d1 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_common.h +++ b/ydb/core/tx/schemeshard/schemeshard__operation_common.h @@ -116,6 +116,8 @@ protected: << " opId# " << OperationId; } + bool Process(TOperationContext& context); + public: explicit TDone(const TOperationId& id); diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp index 41a18a4cd89..05409e95174 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp @@ -381,10 +381,25 @@ public: class TDoneWithInitialScan: public TDone { public: - using TDone::TDone; + explicit TDoneWithInitialScan(const TOperationId& id) + : TDone(id) + { + auto events = AllIncomingEvents(); + events.erase(TEvPrivate::TEvCompleteBarrier::EventType); + IgnoreMessages(DebugHint(), events); + } bool ProgressState(TOperationContext& context) override { - if (!TDone::ProgressState(context)) { + LOG_I(DebugHint() << "ProgressState"); + + context.OnComplete.Barrier(OperationId, "DoneBarrier"); + return false; + } + + bool HandleReply(TEvPrivate::TEvCompleteBarrier::TPtr&, TOperationContext& context) override { + LOG_I(DebugHint() << "HandleReply TEvCompleteBarrier"); + + if (!TDone::Process(context)) { return false; } diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_side_effects.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_side_effects.cpp index ef5070e3ace..a638bf2fec9 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_side_effects.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_side_effects.cpp @@ -165,6 +165,7 @@ void TSideEffects::ApplyOnExecute(TSchemeShard* ss, NTabletFlatExecutor::TTransa DoDoneParts(ss, ctx); DoSetBarriers(ss, ctx); DoCheckBarriers(ss, txc, ctx); + DoDoneParts(ss, ctx); DoWaitShardCreated(ss, ctx); DoActivateShardCreated(ss, ctx); diff --git a/ydb/core/tx/schemeshard/ut_cdc_stream/ut_cdc_stream.cpp b/ydb/core/tx/schemeshard/ut_cdc_stream/ut_cdc_stream.cpp index f8111093902..2fcde7dc4da 100644 --- a/ydb/core/tx/schemeshard/ut_cdc_stream/ut_cdc_stream.cpp +++ b/ydb/core/tx/schemeshard/ut_cdc_stream/ut_cdc_stream.cpp @@ -1588,6 +1588,48 @@ Y_UNIT_TEST_SUITE(TCdcStreamWithInitialScanTests) { runtime.Send(blockedAlterStream.Release(), 0, true); } + void PqTransactions(bool enable) { + TTestBasicRuntime runtime; + TTestEnv env(runtime, TTestEnvOptions() + .EnableChangefeedInitialScan(true) + .EnablePQConfigTransactionsAtSchemeShard(enable)); + ui64 txId = 100; + + TestCreateTable(runtime, ++txId, "/MyRoot", R"( + Name: "Table" + Columns { Name: "key" Type: "Uint64" } + Columns { Name: "value" Type: "Uint64" } + KeyColumnNames: ["key"] + )"); + env.TestWaitNotification(runtime, txId); + + TestCreateCdcStream(runtime, ++txId, "/MyRoot", R"( + TableName: "Table" + StreamDescription { + Name: "Stream" + Mode: ECdcStreamModeKeysOnly + Format: ECdcStreamFormatProto + State: ECdcStreamStateScan + } + )"); + env.TestWaitNotification(runtime, txId); + + NKikimrSchemeOp::ECdcStreamState state; + do { + runtime.SimulateSleep(TDuration::Seconds(1)); + state = DescribePrivatePath(runtime, "/MyRoot/Table/Stream") + .GetPathDescription().GetCdcStreamDescription().GetState(); + } while (state != NKikimrSchemeOp::ECdcStreamStateReady); + } + + Y_UNIT_TEST(WithoutPqTransactions) { + PqTransactions(false); + } + + Y_UNIT_TEST(WithPqTransactions) { + PqTransactions(true); + } + Y_UNIT_TEST(AlterStream) { TTestBasicRuntime runtime; TTestEnv env(runtime, TTestEnvOptions() diff --git a/ydb/core/tx/schemeshard/ut_cdc_stream_reboots/ut_cdc_stream_reboots.cpp b/ydb/core/tx/schemeshard/ut_cdc_stream_reboots/ut_cdc_stream_reboots.cpp index 47209679f22..19f1f52d318 100644 --- a/ydb/core/tx/schemeshard/ut_cdc_stream_reboots/ut_cdc_stream_reboots.cpp +++ b/ydb/core/tx/schemeshard/ut_cdc_stream_reboots/ut_cdc_stream_reboots.cpp @@ -819,4 +819,54 @@ Y_UNIT_TEST_SUITE(TCdcStreamWithRebootsTests) { }); } + template <typename T> + void PqTransactions(bool enable) { + T t; + t.GetTestEnvOptions() + .EnableChangefeedInitialScan(true) + .EnablePQConfigTransactionsAtSchemeShard(enable); + + t.Run([&](TTestActorRuntime& runtime, bool& activeZone) { + { + TInactiveZone inactive(activeZone); + TestCreateTable(runtime, ++t.TxId, "/MyRoot", R"( + Name: "Table" + Columns { Name: "key" Type: "Uint64" } + Columns { Name: "value" Type: "Uint64" } + KeyColumnNames: ["key"] + )"); + t.TestEnv->TestWaitNotification(runtime, t.TxId); + } + + TestCreateCdcStream(runtime, ++t.TxId, "/MyRoot", R"( + TableName: "Table" + StreamDescription { + Name: "Stream" + Mode: ECdcStreamModeKeysOnly + Format: ECdcStreamFormatProto + State: ECdcStreamStateScan + } + )"); + t.TestEnv->TestWaitNotification(runtime, t.TxId); + + { + TInactiveZone inactive(activeZone); + NKikimrSchemeOp::ECdcStreamState state; + do { + runtime.SimulateSleep(TDuration::Seconds(1)); + state = DescribePrivatePath(runtime, "/MyRoot/Table/Stream") + .GetPathDescription().GetCdcStreamDescription().GetState(); + } while (state != NKikimrSchemeOp::ECdcStreamStateReady); + } + }); + } + + Y_UNIT_TEST_WITH_REBOOTS(WithoutPqTransactions) { + PqTransactions<T>(false); + } + + Y_UNIT_TEST_WITH_REBOOTS(WithPqTransactions) { + PqTransactions<T>(true); + } + } // TCdcStreamWithRebootsTests |
