summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__operation_common.cpp12
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__operation_common.h2
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp19
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__operation_side_effects.cpp1
-rw-r--r--ydb/core/tx/schemeshard/ut_cdc_stream/ut_cdc_stream.cpp42
-rw-r--r--ydb/core/tx/schemeshard/ut_cdc_stream_reboots/ut_cdc_stream_reboots.cpp50
6 files changed, 120 insertions, 6 deletions
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_common.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_common.cpp
index 498063a8c72..0068d86fc9b 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation_common.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__operation_common.cpp
@@ -428,10 +428,7 @@ TDone::TDone(const TOperationId& id)
IgnoreMessages(DebugHint(), AllIncomingEvents());
}
-bool TDone::ProgressState(TOperationContext& context) {
- LOG_INFO_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
- "[" << context.SS->SelfTabletId() << "] " << DebugHint() << " ProgressState");
-
+bool TDone::Process(TOperationContext& context) {
const auto* txState = context.SS->FindTx(OperationId);
const auto& pathId = txState->TargetPathId;
@@ -480,6 +477,13 @@ bool TDone::ProgressState(TOperationContext& context) {
return true;
}
+bool TDone::ProgressState(TOperationContext& context) {
+ LOG_INFO_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
+ "[" << context.SS->SelfTabletId() << "] " << DebugHint() << " ProgressState");
+
+ return Process(context);
+}
+
namespace {
template <typename T, typename TFuncCheck, typename TFuncToString>
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_common.h b/ydb/core/tx/schemeshard/schemeshard__operation_common.h
index 01e9068db9d..e6182d631d1 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation_common.h
+++ b/ydb/core/tx/schemeshard/schemeshard__operation_common.h
@@ -116,6 +116,8 @@ protected:
<< " opId# " << OperationId;
}
+ bool Process(TOperationContext& context);
+
public:
explicit TDone(const TOperationId& id);
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp
index 41a18a4cd89..05409e95174 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp
@@ -381,10 +381,25 @@ public:
class TDoneWithInitialScan: public TDone {
public:
- using TDone::TDone;
+ explicit TDoneWithInitialScan(const TOperationId& id)
+ : TDone(id)
+ {
+ auto events = AllIncomingEvents();
+ events.erase(TEvPrivate::TEvCompleteBarrier::EventType);
+ IgnoreMessages(DebugHint(), events);
+ }
bool ProgressState(TOperationContext& context) override {
- if (!TDone::ProgressState(context)) {
+ LOG_I(DebugHint() << "ProgressState");
+
+ context.OnComplete.Barrier(OperationId, "DoneBarrier");
+ return false;
+ }
+
+ bool HandleReply(TEvPrivate::TEvCompleteBarrier::TPtr&, TOperationContext& context) override {
+ LOG_I(DebugHint() << "HandleReply TEvCompleteBarrier");
+
+ if (!TDone::Process(context)) {
return false;
}
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_side_effects.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_side_effects.cpp
index ef5070e3ace..a638bf2fec9 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation_side_effects.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__operation_side_effects.cpp
@@ -165,6 +165,7 @@ void TSideEffects::ApplyOnExecute(TSchemeShard* ss, NTabletFlatExecutor::TTransa
DoDoneParts(ss, ctx);
DoSetBarriers(ss, ctx);
DoCheckBarriers(ss, txc, ctx);
+ DoDoneParts(ss, ctx);
DoWaitShardCreated(ss, ctx);
DoActivateShardCreated(ss, ctx);
diff --git a/ydb/core/tx/schemeshard/ut_cdc_stream/ut_cdc_stream.cpp b/ydb/core/tx/schemeshard/ut_cdc_stream/ut_cdc_stream.cpp
index f8111093902..2fcde7dc4da 100644
--- a/ydb/core/tx/schemeshard/ut_cdc_stream/ut_cdc_stream.cpp
+++ b/ydb/core/tx/schemeshard/ut_cdc_stream/ut_cdc_stream.cpp
@@ -1588,6 +1588,48 @@ Y_UNIT_TEST_SUITE(TCdcStreamWithInitialScanTests) {
runtime.Send(blockedAlterStream.Release(), 0, true);
}
+ void PqTransactions(bool enable) {
+ TTestBasicRuntime runtime;
+ TTestEnv env(runtime, TTestEnvOptions()
+ .EnableChangefeedInitialScan(true)
+ .EnablePQConfigTransactionsAtSchemeShard(enable));
+ ui64 txId = 100;
+
+ TestCreateTable(runtime, ++txId, "/MyRoot", R"(
+ Name: "Table"
+ Columns { Name: "key" Type: "Uint64" }
+ Columns { Name: "value" Type: "Uint64" }
+ KeyColumnNames: ["key"]
+ )");
+ env.TestWaitNotification(runtime, txId);
+
+ TestCreateCdcStream(runtime, ++txId, "/MyRoot", R"(
+ TableName: "Table"
+ StreamDescription {
+ Name: "Stream"
+ Mode: ECdcStreamModeKeysOnly
+ Format: ECdcStreamFormatProto
+ State: ECdcStreamStateScan
+ }
+ )");
+ env.TestWaitNotification(runtime, txId);
+
+ NKikimrSchemeOp::ECdcStreamState state;
+ do {
+ runtime.SimulateSleep(TDuration::Seconds(1));
+ state = DescribePrivatePath(runtime, "/MyRoot/Table/Stream")
+ .GetPathDescription().GetCdcStreamDescription().GetState();
+ } while (state != NKikimrSchemeOp::ECdcStreamStateReady);
+ }
+
+ Y_UNIT_TEST(WithoutPqTransactions) {
+ PqTransactions(false);
+ }
+
+ Y_UNIT_TEST(WithPqTransactions) {
+ PqTransactions(true);
+ }
+
Y_UNIT_TEST(AlterStream) {
TTestBasicRuntime runtime;
TTestEnv env(runtime, TTestEnvOptions()
diff --git a/ydb/core/tx/schemeshard/ut_cdc_stream_reboots/ut_cdc_stream_reboots.cpp b/ydb/core/tx/schemeshard/ut_cdc_stream_reboots/ut_cdc_stream_reboots.cpp
index 47209679f22..19f1f52d318 100644
--- a/ydb/core/tx/schemeshard/ut_cdc_stream_reboots/ut_cdc_stream_reboots.cpp
+++ b/ydb/core/tx/schemeshard/ut_cdc_stream_reboots/ut_cdc_stream_reboots.cpp
@@ -819,4 +819,54 @@ Y_UNIT_TEST_SUITE(TCdcStreamWithRebootsTests) {
});
}
+ template <typename T>
+ void PqTransactions(bool enable) {
+ T t;
+ t.GetTestEnvOptions()
+ .EnableChangefeedInitialScan(true)
+ .EnablePQConfigTransactionsAtSchemeShard(enable);
+
+ t.Run([&](TTestActorRuntime& runtime, bool& activeZone) {
+ {
+ TInactiveZone inactive(activeZone);
+ TestCreateTable(runtime, ++t.TxId, "/MyRoot", R"(
+ Name: "Table"
+ Columns { Name: "key" Type: "Uint64" }
+ Columns { Name: "value" Type: "Uint64" }
+ KeyColumnNames: ["key"]
+ )");
+ t.TestEnv->TestWaitNotification(runtime, t.TxId);
+ }
+
+ TestCreateCdcStream(runtime, ++t.TxId, "/MyRoot", R"(
+ TableName: "Table"
+ StreamDescription {
+ Name: "Stream"
+ Mode: ECdcStreamModeKeysOnly
+ Format: ECdcStreamFormatProto
+ State: ECdcStreamStateScan
+ }
+ )");
+ t.TestEnv->TestWaitNotification(runtime, t.TxId);
+
+ {
+ TInactiveZone inactive(activeZone);
+ NKikimrSchemeOp::ECdcStreamState state;
+ do {
+ runtime.SimulateSleep(TDuration::Seconds(1));
+ state = DescribePrivatePath(runtime, "/MyRoot/Table/Stream")
+ .GetPathDescription().GetCdcStreamDescription().GetState();
+ } while (state != NKikimrSchemeOp::ECdcStreamStateReady);
+ }
+ });
+ }
+
+ Y_UNIT_TEST_WITH_REBOOTS(WithoutPqTransactions) {
+ PqTransactions<T>(false);
+ }
+
+ Y_UNIT_TEST_WITH_REBOOTS(WithPqTransactions) {
+ PqTransactions<T>(true);
+ }
+
} // TCdcStreamWithRebootsTests