author     ilnaz <[email protected]>  2023-01-23 12:27:43 +0300
committer  ilnaz <[email protected]>  2023-01-23 12:27:43 +0300
commit     006419e12383729f3d660ad51f5a84832d6e3780 (patch)
tree       f33f2657602af967e75c639749eb3b89974fb6eb
parent     915620f6a01dbf2419d64239fc7014076c5b911b (diff)
Initial scan implementation
-rw-r--r--  ydb/core/base/appdata.h  1
-rw-r--r--  ydb/core/protos/config.proto  2
-rw-r--r--  ydb/core/protos/counters_datashard.proto  2
-rw-r--r--  ydb/core/protos/counters_schemeshard.proto  2
-rw-r--r--  ydb/core/protos/out/out.cpp  4
-rw-r--r--  ydb/core/protos/services.proto  2
-rw-r--r--  ydb/core/protos/tx_datashard.proto  35
-rw-r--r--  ydb/core/tablet/resource_broker.cpp  10
-rw-r--r--  ydb/core/tx/datashard/CMakeLists.darwin.txt  1
-rw-r--r--  ydb/core/tx/datashard/CMakeLists.linux-aarch64.txt  1
-rw-r--r--  ydb/core/tx/datashard/CMakeLists.linux.txt  1
-rw-r--r--  ydb/core/tx/datashard/alter_cdc_stream_unit.cpp  9
-rw-r--r--  ydb/core/tx/datashard/cdc_stream_scan.cpp  596
-rw-r--r--  ydb/core/tx/datashard/cdc_stream_scan.h  37
-rw-r--r--  ydb/core/tx/datashard/datashard.cpp  6
-rw-r--r--  ydb/core/tx/datashard/datashard.h  28
-rw-r--r--  ydb/core/tx/datashard/datashard_impl.h  70
-rw-r--r--  ydb/core/tx/datashard/datashard_ut_change_exchange.cpp  143
-rw-r--r--  ydb/core/tx/datashard/datashard_ut_common.cpp  3
-rw-r--r--  ydb/core/tx/datashard/datashard_ut_common.h  2
-rw-r--r--  ydb/core/tx/datashard/drop_cdc_stream_unit.cpp  9
-rw-r--r--  ydb/core/tx/schemeshard/CMakeLists.darwin.txt  1
-rw-r--r--  ydb/core/tx/schemeshard/CMakeLists.linux-aarch64.txt  1
-rw-r--r--  ydb/core/tx/schemeshard/CMakeLists.linux.txt  1
-rw-r--r--  ydb/core/tx/schemeshard/schemeshard__init.cpp  58
-rw-r--r--  ydb/core/tx/schemeshard/schemeshard__init_root.cpp  7
-rw-r--r--  ydb/core/tx/schemeshard/schemeshard__operation_alter_cdc_stream.cpp  24
-rw-r--r--  ydb/core/tx/schemeshard/schemeshard__operation_common.h  2
-rw-r--r--  ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp  61
-rw-r--r--  ydb/core/tx/schemeshard/schemeshard_cdc_stream_scan.cpp  392
-rw-r--r--  ydb/core/tx/schemeshard/schemeshard_impl.cpp  48
-rw-r--r--  ydb/core/tx/schemeshard/schemeshard_impl.h  39
-rw-r--r--  ydb/core/tx/schemeshard/schemeshard_info_types.h  16
-rw-r--r--  ydb/core/tx/schemeshard/schemeshard_private.h  9
-rw-r--r--  ydb/core/tx/schemeshard/schemeshard_schema.h  23
-rw-r--r--  ydb/core/tx/schemeshard/ut_cdc_stream.cpp  9
-rw-r--r--  ydb/core/tx/schemeshard/ut_cdc_stream_reboots.cpp  7
-rw-r--r--  ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_datashard_/flat_datashard.schema  62
-rw-r--r--  ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_schemeshard_/flat_schemeshard.schema  62
39 files changed, 1728 insertions, 58 deletions
diff --git a/ydb/core/base/appdata.h b/ydb/core/base/appdata.h
index d71dbedd5e3..0eb5954ed40 100644
--- a/ydb/core/base/appdata.h
+++ b/ydb/core/base/appdata.h
@@ -148,6 +148,7 @@ struct TAppData {
bool AllowShadowDataInSchemeShardForTests = false;
bool EnableMvccSnapshotWithLegacyDomainRoot = false;
bool UsePartitionStatsCollectorForTests = false;
+ bool DisableCdcAutoSwitchingToReadyStateForTests = false;
TVector<TString> AdministrationAllowedSIDs; // users/groups which allowed to perform administrative tasks
TVector<TString> DefaultUserSIDs;
TString AllAuthenticatedUsers = "all-users@well-known";
diff --git a/ydb/core/protos/config.proto b/ydb/core/protos/config.proto
index b5ee255ef7e..eded529f541 100644
--- a/ydb/core/protos/config.proto
+++ b/ydb/core/protos/config.proto
@@ -1522,6 +1522,8 @@ message TDataShardConfig {
optional uint64 TtlReadAheadHi = 14 [default = 1048576];
optional uint64 IdleMemCompactionIntervalSeconds = 15 [default = 60];
optional uint64 RestoreReadBufferSizeLimit = 16 [default = 268435456]; // 256 MB
+ optional string CdcInitialScanTaskName = 17 [default = "cdc_initial_scan"];
+ optional uint32 CdcInitialScanTaskPriority = 18 [default = 10];
}
message TSchemeShardConfig {
diff --git a/ydb/core/protos/counters_datashard.proto b/ydb/core/protos/counters_datashard.proto
index 3c61ce420d9..1b3ac6a39ab 100644
--- a/ydb/core/protos/counters_datashard.proto
+++ b/ydb/core/protos/counters_datashard.proto
@@ -449,4 +449,6 @@ enum ETxTypes {
TXTYPE_REMOVE_LOCK_CHANGE_RECORDS = 72 [(TxTypeOpts) = {Name: "TxRemoveLockChangeRecords"}];
TXTYPE_VOLATILE_TX_COMMIT = 73 [(TxTypeOpts) = {Name: "TxVolatileTxCommit"}];
TXTYPE_VOLATILE_TX_ABORT = 74 [(TxTypeOpts) = {Name: "TxVolatileTxAbort"}];
+ TXTYPE_CDC_STREAM_SCAN_RUN = 75 [(TxTypeOpts) = {Name: "TTxCdcStreamScanRun"}];
+ TXTYPE_CDC_STREAM_SCAN_PROGRESS = 76 [(TxTypeOpts) = {Name: "TTxCdcStreamScanProgress"}];
}
diff --git a/ydb/core/protos/counters_schemeshard.proto b/ydb/core/protos/counters_schemeshard.proto
index f3bc4c20bba..9c3ed4d8797 100644
--- a/ydb/core/protos/counters_schemeshard.proto
+++ b/ydb/core/protos/counters_schemeshard.proto
@@ -483,4 +483,6 @@ enum ETxTypes {
TXTYPE_BLOB_DEPOT_CONFIG_RESULT = 80 [(TxTypeOpts) = {Name: "TxBlobDepotConfigResult"}];
TXTYPE_ADD_BACKGROUND_TASK_RESULT = 81 [(TxTypeOpts) = {Name: "TxAddBackgroundTaskResult"}];
+
+ TXTYPE_CDC_STREAM_SCAN_PROGRESS = 82 [(TxTypeOpts) = {Name: "TxCdcStreamScanProgress"}];
}
diff --git a/ydb/core/protos/out/out.cpp b/ydb/core/protos/out/out.cpp
index 6e05ddb6f11..08ba6b36949 100644
--- a/ydb/core/protos/out/out.cpp
+++ b/ydb/core/protos/out/out.cpp
@@ -161,6 +161,10 @@ Y_DECLARE_OUT_SPEC(, NKikimrTxDataShard::TEvCompactTableResult::EStatus, stream,
stream << NKikimrTxDataShard::TEvCompactTableResult::EStatus_Name(value);
}
+Y_DECLARE_OUT_SPEC(, NKikimrTxDataShard::TEvCdcStreamScanResponse::EStatus, stream, value) {
+ stream << NKikimrTxDataShard::TEvCdcStreamScanResponse::EStatus_Name(value);
+}
+
Y_DECLARE_OUT_SPEC(, NKikimrKqp::EQueryAction, stream, value) {
stream << NKikimrKqp::EQueryAction_Name(value);
}
diff --git a/ydb/core/protos/services.proto b/ydb/core/protos/services.proto
index 3d241f4acb0..a5b9d59c624 100644
--- a/ydb/core/protos/services.proto
+++ b/ydb/core/protos/services.proto
@@ -960,5 +960,7 @@ message TActivity {
METADATA_SCHEME_DESCRIPTION_ACTOR = 599;
SCHEMESHARD_BACKGROUND_COMPACTION = 600;
SCHEMESHARD_BORROWED_COMPACTION = 601;
+ CDC_STREAM_SCAN_ACTOR = 602;
+ SCHEMESHARD_CDC_STREAM_SCAN_FINALIZER = 603;
};
};
diff --git a/ydb/core/protos/tx_datashard.proto b/ydb/core/protos/tx_datashard.proto
index ed3a9e25a35..450c6635cf0 100644
--- a/ydb/core/protos/tx_datashard.proto
+++ b/ydb/core/protos/tx_datashard.proto
@@ -1,4 +1,3 @@
-
option cc_enable_arenas = true;
import "library/cpp/actors/protos/actors.proto";
@@ -1417,6 +1416,40 @@ message TEvBuildIndexProgressResponse {
optional uint64 RequestSeqNoRound = 13;
}
+message TEvCdcStreamScanRequest {
+ message TLimits {
+ optional uint32 BatchMaxBytes = 1 [default = 512000];
+ optional uint32 BatchMinRows = 2 [default = 10];
+ optional uint32 BatchMaxRows = 3 [default = 1000];
+ };
+
+ optional NKikimrProto.TPathID TablePathId = 1; // which table should be scanned
+ optional uint64 TableSchemaVersion = 2;
+ optional NKikimrProto.TPathID StreamPathId = 3;
+ optional uint64 SnapshotStep = 4;
+ optional uint64 SnapshotTxId = 5;
+ optional TLimits Limits = 6;
+}
+
+message TEvCdcStreamScanResponse {
+ enum EStatus {
+ PENDING = 0;
+ ACCEPTED = 1;
+ IN_PROGRESS = 2;
+ DONE = 3;
+ BAD_REQUEST = 4;
+ SCHEME_ERROR = 5;
+ OVERLOADED = 6;
+ ABORTED = 7;
+ }
+
+ optional uint64 TabletId = 1;
+ optional NKikimrProto.TPathID TablePathId = 2;
+ optional NKikimrProto.TPathID StreamPathId = 3;
+ optional EStatus Status = 4;
+ optional string ErrorDescription = 5;
+}
+
message TEvKqpScan {
optional uint64 TxId = 1;
optional uint64 ScanId = 2;
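The two messages above define the datashard-side contract for the initial scan. A minimal sender-side sketch of composing the request (illustrative only, not part of this patch; tablePathId, streamPathId, the snapshot values and datashardActorId are placeholders):

    // Illustrative sketch: filling the new request message before sending it
    // to a datashard. All variables below are placeholders.
    auto request = MakeHolder<TEvDataShard::TEvCdcStreamScanRequest>();
    auto& record = request->Record;
    PathIdFromPathId(tablePathId, record.MutableTablePathId());
    record.SetTableSchemaVersion(tableSchemaVersion);
    PathIdFromPathId(streamPathId, record.MutableStreamPathId());
    record.SetSnapshotStep(snapshotStep);
    record.SetSnapshotTxId(snapshotTxId);
    // Limits may be left unset: with the defaults above a batch is flushed at
    // 1000 rows, or at 512000 bytes once at least 10 rows are buffered.
    ctx.Send(datashardActorId, request.Release());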
diff --git a/ydb/core/tablet/resource_broker.cpp b/ydb/core/tablet/resource_broker.cpp
index 4ff2abcd099..1124484e954 100644
--- a/ydb/core/tablet/resource_broker.cpp
+++ b/ydb/core/tablet/resource_broker.cpp
@@ -1350,6 +1350,11 @@ NKikimrResourceBroker::TResourceBrokerConfig MakeDefaultConfig()
queue->SetWeight(100);
queue->MutableLimit()->SetCpu(1);
+ queue = config.AddQueues();
+ queue->SetName("queue_cdc_initial_scan");
+ queue->SetWeight(100);
+ queue->MutableLimit()->SetCpu(4);
+
auto task = config.AddTasks();
task->SetName(NLocalDb::UnknownTaskName);
task->SetQueueName(NLocalDb::DefaultQueueName);
@@ -1445,6 +1450,11 @@ NKikimrResourceBroker::TResourceBrokerConfig MakeDefaultConfig()
task->SetQueueName("queue_datashard_build_stats");
task->SetDefaultDuration(TDuration::Seconds(5).GetValue());
+ task = config.AddTasks();
+ task->SetName("cdc_initial_scan");
+ task->SetQueueName("queue_cdc_initial_scan");
+ task->SetDefaultDuration(TDuration::Minutes(10).GetValue());
+
config.MutableResourceLimit()->SetCpu(TotalCPU);
config.MutableResourceLimit()->SetMemory(TotalMemory);
diff --git a/ydb/core/tx/datashard/CMakeLists.darwin.txt b/ydb/core/tx/datashard/CMakeLists.darwin.txt
index 22870fc7545..10c3d2f14db 100644
--- a/ydb/core/tx/datashard/CMakeLists.darwin.txt
+++ b/ydb/core/tx/datashard/CMakeLists.darwin.txt
@@ -89,6 +89,7 @@ target_sources(core-tx-datashard PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/build_distributed_erase_tx_out_rs_unit.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/build_kqp_data_tx_out_rs_unit.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/build_scheme_tx_out_rs_unit.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/cdc_stream_scan.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/change_collector_async_index.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/change_collector_base.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/change_collector_cdc_stream.cpp
diff --git a/ydb/core/tx/datashard/CMakeLists.linux-aarch64.txt b/ydb/core/tx/datashard/CMakeLists.linux-aarch64.txt
index d8d6b37b56e..9360c5258ab 100644
--- a/ydb/core/tx/datashard/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/tx/datashard/CMakeLists.linux-aarch64.txt
@@ -90,6 +90,7 @@ target_sources(core-tx-datashard PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/build_distributed_erase_tx_out_rs_unit.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/build_kqp_data_tx_out_rs_unit.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/build_scheme_tx_out_rs_unit.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/cdc_stream_scan.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/change_collector_async_index.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/change_collector_base.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/change_collector_cdc_stream.cpp
diff --git a/ydb/core/tx/datashard/CMakeLists.linux.txt b/ydb/core/tx/datashard/CMakeLists.linux.txt
index d8d6b37b56e..9360c5258ab 100644
--- a/ydb/core/tx/datashard/CMakeLists.linux.txt
+++ b/ydb/core/tx/datashard/CMakeLists.linux.txt
@@ -90,6 +90,7 @@ target_sources(core-tx-datashard PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/build_distributed_erase_tx_out_rs_unit.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/build_kqp_data_tx_out_rs_unit.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/build_scheme_tx_out_rs_unit.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/cdc_stream_scan.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/change_collector_async_index.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/change_collector_base.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/change_collector_cdc_stream.cpp
diff --git a/ydb/core/tx/datashard/alter_cdc_stream_unit.cpp b/ydb/core/tx/datashard/alter_cdc_stream_unit.cpp
index 595405245e2..3f3451601e5 100644
--- a/ydb/core/tx/datashard/alter_cdc_stream_unit.cpp
+++ b/ydb/core/tx/datashard/alter_cdc_stream_unit.cpp
@@ -68,6 +68,15 @@ public:
DataShard.AddSchemaSnapshot(pathId, version, op->GetStep(), op->GetTxId(), txc, ctx);
}
+ auto& scanManager = DataShard.GetCdcStreamScanManager();
+ scanManager.Forget(txc.DB, pathId, streamPathId);
+ if (scanManager.GetStreamPathId() == streamPathId) {
+ if (const auto scanId = scanManager.GetScanId()) {
+ DataShard.CancelScan(tableInfo->LocalTid, scanId);
+ }
+ scanManager.Clear();
+ }
+
BuildResult(op, NKikimrTxDataShard::TEvProposeTransactionResult::COMPLETE);
op->Result()->SetStepOrderId(op->GetStepOrder().ToPair());
diff --git a/ydb/core/tx/datashard/cdc_stream_scan.cpp b/ydb/core/tx/datashard/cdc_stream_scan.cpp
new file mode 100644
index 00000000000..2ed87e17375
--- /dev/null
+++ b/ydb/core/tx/datashard/cdc_stream_scan.cpp
@@ -0,0 +1,596 @@
+#include "change_record_body_serializer.h"
+#include "datashard_impl.h"
+
+#include <ydb/core/scheme/scheme_tablecell.h>
+
+#include <util/generic/maybe.h>
+#include <util/string/builder.h>
+
+#define LOG_D(stream) LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, "[CdcStreamScan] " << stream)
+#define LOG_I(stream) LOG_INFO_S(ctx, NKikimrServices::TX_DATASHARD, "[CdcStreamScan] " << stream)
+#define LOG_W(stream) LOG_WARN_S(ctx, NKikimrServices::TX_DATASHARD, "[CdcStreamScan] " << stream)
+
+namespace NKikimr::NDataShard {
+
+using namespace NActors;
+using namespace NTable;
+using namespace NTabletFlatExecutor;
+
+TCdcStreamScanManager::TCdcStreamScanManager() {
+ Clear();
+}
+
+void TCdcStreamScanManager::Enqueue(ui64 txId, ui64 scanId, const TPathId& streamPathId) {
+ TxId = txId;
+ ScanId = scanId;
+ StreamPathId = streamPathId;
+}
+
+void TCdcStreamScanManager::Register(const TActorId& actorId) {
+ ActorId = actorId;
+}
+
+void TCdcStreamScanManager::Clear() {
+ TxId = 0;
+ ScanId = 0;
+ ActorId = TActorId();
+ StreamPathId = TPathId();
+}
+
+void TCdcStreamScanManager::Forget(NTable::TDatabase& db, const TPathId& tablePathId, const TPathId& streamPathId) {
+ NIceDb::TNiceDb nicedb(db);
+ RemoveLastKey(nicedb, tablePathId, streamPathId);
+}
+
+void TCdcStreamScanManager::PersistLastKey(NIceDb::TNiceDb& db, const TSerializedCellVec& value,
+ const TPathId& tablePathId, const TPathId& streamPathId)
+{
+ using Schema = TDataShard::Schema;
+ db.Table<Schema::CdcStreamScans>()
+ .Key(tablePathId.OwnerId, tablePathId.LocalPathId, streamPathId.OwnerId, streamPathId.LocalPathId)
+ .Update<Schema::CdcStreamScans::LastKey>(value.GetBuffer());
+}
+
+bool TCdcStreamScanManager::LoadLastKey(NIceDb::TNiceDb& db, TMaybe<TSerializedCellVec>& result,
+ const TPathId& tablePathId, const TPathId& streamPathId)
+{
+ using Schema = TDataShard::Schema;
+
+ auto rowset = db.Table<Schema::CdcStreamScans>()
+ .Key(tablePathId.OwnerId, tablePathId.LocalPathId, streamPathId.OwnerId, streamPathId.LocalPathId)
+ .Select();
+
+ if (!rowset.IsReady()) {
+ return false;
+ }
+
+ if (rowset.IsValid()) {
+ result.ConstructInPlace();
+ Y_VERIFY(TSerializedCellVec::TryParse(rowset.GetValue<Schema::CdcStreamScans::LastKey>(), *result));
+ }
+
+ return true;
+}
+
+void TCdcStreamScanManager::RemoveLastKey(NIceDb::TNiceDb& db,
+ const TPathId& tablePathId, const TPathId& streamPathId)
+{
+ using Schema = TDataShard::Schema;
+ db.Table<Schema::CdcStreamScans>()
+ .Key(tablePathId.OwnerId, tablePathId.LocalPathId, streamPathId.OwnerId, streamPathId.LocalPathId)
+ .Delete();
+}
+
+class TDataShard::TTxCdcStreamScanProgress
+ : public TTransactionBase<TDataShard>
+ , protected TChangeRecordBodySerializer
+{
+ TDataShard::TEvPrivate::TEvCdcStreamScanProgress::TPtr Request;
+ THolder<TDataShard::TEvPrivate::TEvCdcStreamScanContinue> Response;
+ TVector<NMiniKQL::IChangeCollector::TChange> ChangeRecords;
+
+ static TVector<TRawTypeValue> MakeKey(TArrayRef<const TCell> cells, TUserTable::TCPtr table) {
+ TVector<TRawTypeValue> key(Reserve(cells.size()));
+
+ Y_VERIFY(cells.size() == table->KeyColumnTypes.size());
+ for (TPos pos = 0; pos < cells.size(); ++pos) {
+ key.emplace_back(cells.at(pos).AsRef(), table->KeyColumnTypes.at(pos));
+ }
+
+ return key;
+ }
+
+ static TVector<TUpdateOp> MakeUpdates(TArrayRef<const TCell> cells, TArrayRef<const TTag> tags, TUserTable::TCPtr table) {
+ TVector<TUpdateOp> updates(Reserve(cells.size()));
+
+ Y_VERIFY(cells.size() == tags.size());
+ for (TPos pos = 0; pos < cells.size(); ++pos) {
+ const auto tag = tags.at(pos);
+ auto it = table->Columns.find(tag);
+ Y_VERIFY(it != table->Columns.end());
+ updates.emplace_back(tag, ECellOp::Set, TRawTypeValue(cells.at(pos).AsRef(), it->second.Type));
+ }
+
+ return updates;
+ }
+
+ static TRowState MakeRow(TArrayRef<const TCell> cells) {
+ TRowState row(cells.size());
+
+ row.Touch(ERowOp::Upsert);
+ for (TPos pos = 0; pos < cells.size(); ++pos) {
+ row.Set(pos, ECellOp::Set, cells.at(pos));
+ }
+
+ return row;
+ }
+
+public:
+ explicit TTxCdcStreamScanProgress(TDataShard* self, TDataShard::TEvPrivate::TEvCdcStreamScanProgress::TPtr ev)
+ : TBase(self)
+ , Request(ev)
+ {
+ }
+
+ TTxType GetTxType() const override { return TXTYPE_CDC_STREAM_SCAN_PROGRESS; }
+
+ bool Execute(TTransactionContext& txc, const TActorContext& ctx) override {
+ const auto& ev = *Request->Get();
+ const auto& tablePathId = ev.TablePathId;
+ const auto& streamPathId = ev.StreamPathId;
+ const auto& readVersion = ev.ReadVersion;
+ const auto& valueTags = ev.ValueTags;
+
+ LOG_D("Progress"
+ << ": streamPathId# " << streamPathId);
+
+ if (Self->CheckChangesQueueOverflow()) {
+ return true;
+ }
+
+ Y_VERIFY(Self->GetUserTables().contains(tablePathId.LocalPathId));
+ auto table = Self->GetUserTables().at(tablePathId.LocalPathId);
+
+ auto it = table->CdcStreams.find(streamPathId);
+ Y_VERIFY(it != table->CdcStreams.end());
+
+ NIceDb::TNiceDb db(txc.DB);
+ for (const auto& [k, v] : ev.Rows) {
+ const auto key = MakeKey(k.GetCells(), table);
+ const auto& keyTags = table->KeyColumnIds;
+
+ TRowState row(0);
+ TSelectStats stats;
+ auto ready = txc.DB.Select(table->LocalTid, key, {}, row, stats, 0, readVersion);
+ if (ready == EReady::Page) {
+ return false;
+ }
+
+ if (ready == EReady::Gone || stats.InvisibleRowSkips) {
+ continue;
+ }
+
+ NKikimrChangeExchange::TChangeRecord::TDataChange body;
+ switch (it->second.Mode) {
+ case NKikimrSchemeOp::ECdcStreamModeKeysOnly:
+ Serialize(body, ERowOp::Upsert, key, keyTags, {});
+ break;
+ case NKikimrSchemeOp::ECdcStreamModeUpdate:
+ Serialize(body, ERowOp::Upsert, key, keyTags, MakeUpdates(v.GetCells(), valueTags, table));
+ break;
+ case NKikimrSchemeOp::ECdcStreamModeNewImage:
+ case NKikimrSchemeOp::ECdcStreamModeNewAndOldImages: {
+ const auto newImage = MakeRow(v.GetCells());
+ Serialize(body, ERowOp::Upsert, key, keyTags, nullptr, &newImage, valueTags);
+ break;
+ }
+ case NKikimrSchemeOp::ECdcStreamModeOldImage: {
+ const auto oldImage = MakeRow(v.GetCells());
+ Serialize(body, ERowOp::Upsert, key, keyTags, &oldImage, nullptr, valueTags);
+ break;
+ }
+ default:
+ Y_FAIL_S("Invalid stream mode: " << static_cast<ui32>(it->second.Mode));
+ }
+
+ auto record = TChangeRecordBuilder(TChangeRecord::EKind::CdcDataChange)
+ .WithOrder(Self->AllocateChangeRecordOrder(db))
+ .WithGroup(0)
+ .WithStep(readVersion.Step)
+ .WithTxId(readVersion.TxId)
+ .WithPathId(streamPathId)
+ .WithTableId(tablePathId)
+ .WithSchemaVersion(table->GetTableSchemaVersion())
+ .WithBody(body.SerializeAsString())
+ .Build();
+
+ ChangeRecords.push_back(NMiniKQL::IChangeCollector::TChange{
+ .Order = record.GetOrder(),
+ .Group = record.GetGroup(),
+ .Step = record.GetStep(),
+ .TxId = record.GetTxId(),
+ .PathId = record.GetPathId(),
+ .BodySize = record.GetBody().size(),
+ .TableId = record.GetTableId(),
+ .SchemaVersion = record.GetSchemaVersion(),
+ });
+
+ Self->PersistChangeRecord(db, record);
+ }
+
+ if (ev.Rows) {
+ const auto& [key, _] = ev.Rows.back();
+ Self->CdcStreamScanManager.PersistLastKey(db, key, tablePathId, streamPathId);
+ }
+
+ Response = MakeHolder<TDataShard::TEvPrivate::TEvCdcStreamScanContinue>();
+ return true;
+ }
+
+ void Complete(const TActorContext& ctx) override {
+ if (Response) {
+ LOG_I("Enqueue " << ChangeRecords.size() << " change record(s)"
+ << ": streamPathId# " << Request->Get()->StreamPathId);
+
+ Self->EnqueueChangeRecords(std::move(ChangeRecords));
+ ctx.Send(Request->Sender, Response.Release());
+ } else {
+ LOG_I("Re-run progress tx"
+ << ": streamPathId# " << Request->Get()->StreamPathId);
+
+ // re-schedule tx
+ Self->Execute(new TDataShard::TTxCdcStreamScanProgress(Self, Request), ctx);
+ }
+ }
+
+}; // TTxCdcStreamScanProgress
+
+class TCdcStreamScan: public IActorCallback, public IScan {
+ struct TDataShardId {
+ TActorId ActorId;
+ ui64 TabletId;
+ };
+
+ struct TLimits {
+ ui32 BatchMaxBytes;
+ ui32 BatchMinRows;
+ ui32 BatchMaxRows;
+
+ TLimits(const NKikimrTxDataShard::TEvCdcStreamScanRequest::TLimits& proto)
+ : BatchMaxBytes(proto.GetBatchMaxBytes())
+ , BatchMinRows(proto.GetBatchMinRows())
+ , BatchMaxRows(proto.GetBatchMaxRows())
+ {
+ }
+ };
+
+ class TBuffer {
+ public:
+ void AddRow(TArrayRef<const TCell> key, TArrayRef<const TCell> value) {
+ const auto& [k, v] = Data.emplace_back(
+ TSerializedCellVec(TSerializedCellVec::Serialize(key)),
+ TSerializedCellVec(TSerializedCellVec::Serialize(value))
+ );
+ ByteSize += k.GetBuffer().size() + v.GetBuffer().size();
+ }
+
+ auto&& Flush() {
+ ByteSize = 0;
+ return std::move(Data);
+ }
+
+ ui64 Bytes() const {
+ return ByteSize;
+ }
+
+ ui64 Rows() const {
+ return Data.size();
+ }
+
+ explicit operator bool() const {
+ return !Data.empty();
+ }
+
+ private:
+ TVector<std::pair<TSerializedCellVec, TSerializedCellVec>> Data; // key & value (if any)
+ ui64 ByteSize = 0;
+ };
+
+ STATEFN(StateWork) {
+ switch (ev->GetTypeRewrite()) {
+ hFunc(TEvDataShard::TEvCdcStreamScanRequest, Handle);
+ hFunc(TDataShard::TEvPrivate::TEvCdcStreamScanContinue, Handle);
+ }
+ }
+
+ void Handle(TEvDataShard::TEvCdcStreamScanRequest::TPtr& ev) {
+ ReplyTo = ev->Sender;
+ Reply(NKikimrTxDataShard::TEvCdcStreamScanResponse::IN_PROGRESS);
+ }
+
+ void Handle(TDataShard::TEvPrivate::TEvCdcStreamScanContinue::TPtr&) {
+ Driver->Touch(NoMoreData ? EScan::Final : EScan::Feed);
+ }
+
+ void Reply(NKikimrTxDataShard::TEvCdcStreamScanResponse::EStatus status, const TString& error = {}) {
+ auto response = MakeHolder<TEvDataShard::TEvCdcStreamScanResponse>();
+
+ response->Record.SetTabletId(DataShard.TabletId);
+ PathIdFromPathId(TablePathId, response->Record.MutableTablePathId());
+ PathIdFromPathId(StreamPathId, response->Record.MutableStreamPathId());
+ response->Record.SetStatus(status);
+ response->Record.SetErrorDescription(error);
+
+ Send(ReplyTo, std::move(response));
+ }
+
+public:
+ explicit TCdcStreamScan(const TDataShard* const self, const TActorId& replyTo, ui64 txId,
+ const TPathId& tablePathId, const TPathId& streamPathId, const TRowVersion& readVersion,
+ const TVector<TTag>& valueTags, const TMaybe<TSerializedCellVec>& lastKey, const TLimits& limits)
+ : IActorCallback(static_cast<TReceiveFunc>(&TCdcStreamScan::StateWork), NKikimrServices::TActivity::CDC_STREAM_SCAN_ACTOR)
+ , DataShard{self->SelfId(), self->TabletID()}
+ , ReplyTo(replyTo)
+ , TxId(txId)
+ , TablePathId(tablePathId)
+ , StreamPathId(streamPathId)
+ , ReadVersion(readVersion)
+ , ValueTags(valueTags)
+ , LastKey(lastKey)
+ , Limits(limits)
+ , Driver(nullptr)
+ , NoMoreData(false)
+ {
+ }
+
+ void Describe(IOutputStream& o) const noexcept override {
+ o << "CdcStreamScan {"
+ << " TxId: " << TxId
+ << " TablePathId: " << TablePathId
+ << " StreamPathId: " << StreamPathId
+ << " }";
+ }
+
+ IScan::TInitialState Prepare(IDriver* driver, TIntrusiveConstPtr<TScheme> scheme) noexcept override {
+ TlsActivationContext->AsActorContext().RegisterWithSameMailbox(this);
+ Driver = driver;
+ Y_VERIFY(!LastKey || LastKey->GetCells().size() == scheme->Tags(true).size());
+ return {EScan::Feed, {}};
+ }
+
+ void Registered(TActorSystem* sys, const TActorId&) override {
+ sys->Send(DataShard.ActorId, new TDataShard::TEvPrivate::TEvCdcStreamScanRegistered(TxId, SelfId()));
+ }
+
+ EScan Seek(TLead& lead, ui64) noexcept override {
+ if (LastKey) {
+ lead.To(ValueTags, LastKey->GetCells(), ESeek::Upper);
+ } else {
+ lead.To(ValueTags, {}, ESeek::Lower);
+ }
+
+ return EScan::Feed;
+ }
+
+ EScan Feed(TArrayRef<const TCell> key, const TRow& row) noexcept override {
+ Buffer.AddRow(key, *row);
+ if (Buffer.Bytes() < Limits.BatchMaxBytes) {
+ if (Buffer.Rows() < Limits.BatchMaxRows) {
+ return EScan::Feed;
+ }
+ } else {
+ if (Buffer.Rows() < Limits.BatchMinRows) {
+ return EScan::Feed;
+ }
+ }
+
+ Send(DataShard.ActorId, new TDataShard::TEvPrivate::TEvCdcStreamScanProgress(
+ TablePathId, StreamPathId, ReadVersion, ValueTags, std::move(Buffer.Flush())
+ ));
+ return EScan::Sleep;
+ }
+
+ EScan Exhausted() noexcept override {
+ NoMoreData = true;
+
+ if (!Buffer) {
+ return EScan::Final;
+ }
+
+ Send(DataShard.ActorId, new TDataShard::TEvPrivate::TEvCdcStreamScanProgress(
+ TablePathId, StreamPathId, ReadVersion, ValueTags, std::move(Buffer.Flush())
+ ));
+ return EScan::Sleep;
+ }
+
+ TAutoPtr<IDestructable> Finish(EAbort abort) noexcept override {
+ if (abort != EAbort::None) {
+ Reply(NKikimrTxDataShard::TEvCdcStreamScanResponse::ABORTED);
+ } else {
+ Reply(NKikimrTxDataShard::TEvCdcStreamScanResponse::DONE);
+ }
+
+ PassAway();
+ return nullptr;
+ }
+
+private:
+ const TDataShardId DataShard;
+ TActorId ReplyTo;
+ const ui64 TxId;
+ const TPathId TablePathId;
+ const TPathId StreamPathId;
+ const TRowVersion ReadVersion;
+ const TVector<TTag> ValueTags;
+ const TMaybe<TSerializedCellVec> LastKey;
+ const TLimits Limits;
+
+ IDriver* Driver;
+ bool NoMoreData;
+ TBuffer Buffer;
+
+}; // TCdcStreamScan
+
+class TDataShard::TTxCdcStreamScanRun: public TTransactionBase<TDataShard> {
+ TEvDataShard::TEvCdcStreamScanRequest::TPtr Request;
+ THolder<IEventHandle> Response; // response to sender or forward to scanner
+
+ THolder<IEventHandle> MakeResponse(const TActorContext& ctx,
+ NKikimrTxDataShard::TEvCdcStreamScanResponse::EStatus status, const TString& error = {}) const
+ {
+ return MakeHolder<IEventHandle>(Request->Sender, ctx.SelfID, new TEvDataShard::TEvCdcStreamScanResponse(
+ Request->Get()->Record, Self->TabletID(), status, error
+ ));
+ }
+
+public:
+ explicit TTxCdcStreamScanRun(TDataShard* self, TEvDataShard::TEvCdcStreamScanRequest::TPtr ev)
+ : TBase(self)
+ , Request(ev)
+ {
+ }
+
+ TTxType GetTxType() const override { return TXTYPE_CDC_STREAM_SCAN_RUN; }
+
+ bool Execute(TTransactionContext& txc, const TActorContext& ctx) override {
+ const auto& record = Request->Get()->Record;
+
+ LOG_D("Run"
+ << ": ev# " << record.ShortDebugString());
+
+ const auto tablePathId = PathIdFromPathId(record.GetTablePathId());
+ if (!Self->GetUserTables().contains(tablePathId.LocalPathId)) {
+ Response = MakeResponse(ctx, NKikimrTxDataShard::TEvCdcStreamScanResponse::BAD_REQUEST,
+ TStringBuilder() << "Unknown table"
+ << ": tablePathId# " << tablePathId);
+ return true;
+ }
+
+ auto table = Self->GetUserTables().at(tablePathId.LocalPathId);
+ if (record.GetTableSchemaVersion() != table->GetTableSchemaVersion()) {
+ Response = MakeResponse(ctx, NKikimrTxDataShard::TEvCdcStreamScanResponse::SCHEME_ERROR,
+ TStringBuilder() << "Schema version mismatch"
+ << ": tablePathId# " << tablePathId
+ << ", got# " << record.GetTableSchemaVersion()
+ << ", expected# " << table->GetTableSchemaVersion());
+ return true;
+ }
+
+ const auto streamPathId = PathIdFromPathId(record.GetStreamPathId());
+ auto it = table->CdcStreams.find(streamPathId);
+ if (it == table->CdcStreams.end()) {
+ Response = MakeResponse(ctx, NKikimrTxDataShard::TEvCdcStreamScanResponse::SCHEME_ERROR,
+ TStringBuilder() << "Unknown stream"
+ << ": tablePathId# " << tablePathId
+ << ", streamPathId# " << streamPathId);
+ return true;
+ }
+
+ if (it->second.State != NKikimrSchemeOp::ECdcStreamStateScan) {
+ Response = MakeResponse(ctx, NKikimrTxDataShard::TEvCdcStreamScanResponse::SCHEME_ERROR,
+ TStringBuilder() << "Unexpected stream state"
+ << ": tablePathId# " << tablePathId
+ << ", streamPathId# " << streamPathId
+ << ", state# " << it->second.State);
+ return true;
+ }
+
+ if (Self->CdcStreamScanManager.GetScanId()) {
+ if (Self->CdcStreamScanManager.GetStreamPathId() == streamPathId) {
+ if (const auto& to = Self->CdcStreamScanManager.GetActorId()) {
+ Response = Request->Forward(to);
+ } else {
+ // nop, scan actor will report state when it starts
+ }
+ return true;
+ }
+
+ Response = MakeResponse(ctx, NKikimrTxDataShard::TEvCdcStreamScanResponse::OVERLOADED);
+ return true;
+ }
+
+ if (!record.HasSnapshotStep()) {
+ Response = MakeResponse(ctx, NKikimrTxDataShard::TEvCdcStreamScanResponse::BAD_REQUEST,
+ "SnapshotStep was not specified");
+ return true;
+ }
+
+ if (!record.HasSnapshotTxId()) {
+ Response = MakeResponse(ctx, NKikimrTxDataShard::TEvCdcStreamScanResponse::BAD_REQUEST,
+ "SnapshotTxId was not specified");
+ return true;
+ }
+
+ const TSnapshotKey snapshotKey(tablePathId, record.GetSnapshotStep(), record.GetSnapshotTxId());
+ if (!Self->SnapshotManager.FindAvailable(snapshotKey)) {
+ Response = MakeResponse(ctx, NKikimrTxDataShard::TEvCdcStreamScanResponse::BAD_REQUEST,
+ TStringBuilder() << "Snapshot was not found"
+ << ": key# " << snapshotKey.ToTuple());
+ return true;
+ }
+
+ TVector<TTag> valueTags;
+ if (it->second.Mode != NKikimrSchemeOp::ECdcStreamModeKeysOnly) {
+ valueTags.reserve(table->Columns.size() - 1);
+ for (const auto& [tag, column] : table->Columns) {
+ if (!column.IsKey) {
+ valueTags.push_back(tag);
+ }
+ }
+ }
+
+ NIceDb::TNiceDb db(txc.DB);
+ TMaybe<TSerializedCellVec> lastKey;
+ if (!Self->CdcStreamScanManager.LoadLastKey(db, lastKey, tablePathId, streamPathId)) {
+ return false;
+ }
+
+ auto* appData = AppData(ctx);
+ const auto& taskName = appData->DataShardConfig.GetCdcInitialScanTaskName();
+ const auto taskPrio = appData->DataShardConfig.GetCdcInitialScanTaskPriority();
+
+ const auto readVersion = TRowVersion(snapshotKey.Step, snapshotKey.TxId);
+ const ui64 localTxId = ++Self->NextTieBreakerIndex;
+ auto scan = MakeHolder<TCdcStreamScan>(Self, Request->Sender, localTxId,
+ tablePathId, streamPathId, readVersion, valueTags, lastKey, record.GetLimits());
+ const ui64 scanId = Self->QueueScan(table->LocalTid, scan.Release(), localTxId,
+ TScanOptions()
+ .SetResourceBroker(taskName, taskPrio)
+ .SetSnapshotRowVersion(readVersion)
+ );
+ Self->CdcStreamScanManager.Enqueue(localTxId, scanId, streamPathId);
+
+ LOG_I("Run scan"
+ << ": streamPathId# " << streamPathId);
+
+ Response = MakeResponse(ctx, NKikimrTxDataShard::TEvCdcStreamScanResponse::ACCEPTED);
+ return true;
+ }
+
+ void Complete(const TActorContext& ctx) override {
+ if (Response) {
+ ctx.Send(Response.Release());
+ }
+ }
+
+}; // TTxCdcStreamScanRun
+
+void TDataShard::Handle(TEvDataShard::TEvCdcStreamScanRequest::TPtr& ev, const TActorContext& ctx) {
+ Execute(new TTxCdcStreamScanRun(this, ev), ctx);
+}
+
+void TDataShard::Handle(TEvPrivate::TEvCdcStreamScanRegistered::TPtr& ev, const TActorContext& ctx) {
+ if (CdcStreamScanManager.GetTxId() != ev->Get()->TxId) {
+ LOG_W("Unknown cdc stream scan actor registered"
+ << ": at: " << TabletID());
+ return;
+ }
+
+ CdcStreamScanManager.Register(ev->Get()->ActorId);
+}
+
+void TDataShard::Handle(TEvPrivate::TEvCdcStreamScanProgress::TPtr& ev, const TActorContext& ctx) {
+ Execute(new TTxCdcStreamScanProgress(this, ev), ctx);
+}
+
+}
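The file above wires three parties together: the run/progress transactions on the datashard, the scan actor driven by the local executor, and the resource broker task that schedules it. A condensed view of the loop (illustrative comments only, not verbatim code):

    // 1. TTxCdcStreamScanRun validates the request (table, stream state, schema
    //    version, snapshot), resolves value tags, loads the persisted LastKey and
    //    queues TCdcStreamScan under the "cdc_initial_scan" resource broker task.
    // 2. TCdcStreamScan::Feed() buffers rows until the batch limits are hit, then
    //    sends TEvCdcStreamScanProgress to the datashard and sleeps.
    // 3. TTxCdcStreamScanProgress re-reads each buffered key at the snapshot
    //    version, skips rows already changed after the snapshot (EReady::Gone or
    //    InvisibleRowSkips), serializes and persists change records, stores the
    //    new LastKey and replies with TEvCdcStreamScanContinue.
    // 4. On Exhausted() the remaining buffer is flushed; Finish() reports DONE
    //    (or ABORTED) back to the original requester.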
diff --git a/ydb/core/tx/datashard/cdc_stream_scan.h b/ydb/core/tx/datashard/cdc_stream_scan.h
new file mode 100644
index 00000000000..ab44ae929b9
--- /dev/null
+++ b/ydb/core/tx/datashard/cdc_stream_scan.h
@@ -0,0 +1,37 @@
+#pragma once
+
+#include <library/cpp/actors/core/actor.h>
+#include <ydb/core/base/pathid.h>
+#include <ydb/core/tablet_flat/flat_cxx_database.h>
+
+namespace NKikimr::NDataShard {
+
+class TCdcStreamScanManager {
+public:
+ TCdcStreamScanManager();
+
+ void Enqueue(ui64 txId, ui64 scanId, const TPathId& streamPathId);
+ void Register(const NActors::TActorId& actorId);
+ void Clear();
+ void Forget(NTable::TDatabase& db, const TPathId& tablePathId, const TPathId& streamPathId);
+
+ ui64 GetTxId() const { return TxId; }
+ ui64 GetScanId() const { return ScanId; }
+ const NActors::TActorId& GetActorId() const { return ActorId; }
+ const TPathId& GetStreamPathId() const { return StreamPathId; }
+
+ void PersistLastKey(NIceDb::TNiceDb& db, const TSerializedCellVec& value,
+ const TPathId& tablePathId, const TPathId& streamPathId);
+ bool LoadLastKey(NIceDb::TNiceDb& db, TMaybe<TSerializedCellVec>& result,
+ const TPathId& tablePathId, const TPathId& streamPathId);
+ void RemoveLastKey(NIceDb::TNiceDb& db,
+ const TPathId& tablePathId, const TPathId& streamPathId);
+
+private:
+ ui64 TxId;
+ ui64 ScanId;
+ NActors::TActorId ActorId;
+ TPathId StreamPathId;
+};
+
+}
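The manager tracks at most one scan per shard. Its lifecycle, as used by the .cpp above (simplified sketch, not verbatim):

    // Simplified lifecycle sketch based on the code above.
    TCdcStreamScanManager mgr;                     // constructor calls Clear()
    mgr.Enqueue(localTxId, scanId, streamPathId);  // TTxCdcStreamScanRun: scan queued
    mgr.Register(scanActorId);                     // on TEvCdcStreamScanRegistered
    // ... progress txs call PersistLastKey() so an interrupted scan can resume ...
    mgr.Clear();                                   // TDataShard::ScanComplete, or when
                                                   // the stream is altered/dropped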
diff --git a/ydb/core/tx/datashard/datashard.cpp b/ydb/core/tx/datashard/datashard.cpp
index a8f6565a106..d92e82eef82 100644
--- a/ydb/core/tx/datashard/datashard.cpp
+++ b/ydb/core/tx/datashard/datashard.cpp
@@ -3675,6 +3675,12 @@ void TDataShard::ScanComplete(NTable::EAbort,
<< ", at: "<< TabletID());
InFlightCondErase.Clear();
+ } else if (CdcStreamScanManager.GetScanId() && CdcStreamScanManager.GetTxId() == cookie) {
+ LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, "Cdc stream scan complete"
+ << ": cookie: " << cookie
+ << ", at: "<< TabletID());
+
+ CdcStreamScanManager.Clear();
} else if (!Pipeline.FinishStreamingTx(cookie)) {
LOG_ERROR_S(ctx, NKikimrServices::TX_DATASHARD,
"Scan complete at " << TabletID() << " for unknown tx " << cookie);
diff --git a/ydb/core/tx/datashard/datashard.h b/ydb/core/tx/datashard/datashard.h
index a2237a97ca0..1bdad5517bf 100644
--- a/ydb/core/tx/datashard/datashard.h
+++ b/ydb/core/tx/datashard/datashard.h
@@ -313,6 +313,9 @@ struct TEvDataShard {
EvGetOpenTxs, /* for tests */
EvGetOpenTxsResult, /* for tests */
+ EvCdcStreamScanRequest,
+ EvCdcStreamScanResponse,
+
EvEnd
};
@@ -1582,6 +1585,31 @@ struct TEvDataShard {
{ }
};
+ struct TEvCdcStreamScanRequest
+ : public TEventPB<TEvCdcStreamScanRequest,
+ NKikimrTxDataShard::TEvCdcStreamScanRequest,
+ EvCdcStreamScanRequest>
+ {
+ };
+
+ struct TEvCdcStreamScanResponse
+ : public TEventPB<TEvCdcStreamScanResponse,
+ NKikimrTxDataShard::TEvCdcStreamScanResponse,
+ EvCdcStreamScanResponse>
+ {
+ TEvCdcStreamScanResponse() = default;
+
+ explicit TEvCdcStreamScanResponse(
+ const NKikimrTxDataShard::TEvCdcStreamScanRequest& request, ui64 tabletId,
+ NKikimrTxDataShard::TEvCdcStreamScanResponse::EStatus status, const TString& error = {})
+ {
+ Record.SetTabletId(tabletId);
+ Record.MutableTablePathId()->CopyFrom(request.GetTablePathId());
+ Record.MutableStreamPathId()->CopyFrom(request.GetStreamPathId());
+ Record.SetStatus(status);
+ Record.SetErrorDescription(error);
+ }
+ };
};
IActor* CreateDataShard(const TActorId &tablet, TTabletStorageInfo *info);
diff --git a/ydb/core/tx/datashard/datashard_impl.h b/ydb/core/tx/datashard/datashard_impl.h
index 874fa1f8853..caeb7087d4b 100644
--- a/ydb/core/tx/datashard/datashard_impl.h
+++ b/ydb/core/tx/datashard/datashard_impl.h
@@ -14,6 +14,7 @@
#include "datashard_repl_offsets.h"
#include "datashard_repl_offsets_client.h"
#include "datashard_repl_offsets_server.h"
+#include "cdc_stream_scan.h"
#include "change_exchange.h"
#include "change_record.h"
#include "progress_queue.h"
@@ -217,6 +218,8 @@ class TDataShard
class TTxRemoveLockChangeRecords;
class TTxVolatileTxCommit;
class TTxVolatileTxAbort;
+ class TTxCdcStreamScanRun;
+ class TTxCdcStreamScanProgress;
template <typename T> friend class TTxDirectBase;
class TTxUploadRows;
@@ -262,6 +265,7 @@ class TDataShard
friend class TSnapshotManager;
friend class TSchemaSnapshotManager;
friend class TVolatileTxManager;
+ friend class TCdcStreamScanManager;
friend class TReplicationSourceOffsetsClient;
friend class TReplicationSourceOffsetsServer;
@@ -271,6 +275,7 @@ class TDataShard
friend class TBuildIndexScan;
friend class TReadColumnsScan;
friend class TCondEraseScan;
+ friend class TCdcStreamScan;
friend class TDatashardKeySampler;
friend class TS3UploadsManager;
@@ -323,6 +328,9 @@ class TDataShard
EvReplicationSourceOffsets,
EvMediatorRestoreBackup,
EvRemoveLockChangeRecords,
+ EvCdcStreamScanRegistered,
+ EvCdcStreamScanProgress,
+ EvCdcStreamScanContinue,
EvEnd
};
@@ -466,6 +474,41 @@ class TDataShard
struct TEvMediatorRestoreBackup : public TEventLocal<TEvMediatorRestoreBackup, EvMediatorRestoreBackup> {};
struct TEvRemoveLockChangeRecords : public TEventLocal<TEvRemoveLockChangeRecords, EvRemoveLockChangeRecords> {};
+
+ struct TEvCdcStreamScanRegistered : public TEventLocal<TEvCdcStreamScanRegistered, EvCdcStreamScanRegistered> {
+ explicit TEvCdcStreamScanRegistered(ui64 txId, const TActorId& actorId)
+ : TxId(txId)
+ , ActorId(actorId)
+ {
+ }
+
+ const ui64 TxId;
+ const TActorId ActorId;
+ };
+
+ struct TEvCdcStreamScanProgress : public TEventLocal<TEvCdcStreamScanProgress, EvCdcStreamScanProgress> {
+ explicit TEvCdcStreamScanProgress(
+ const TPathId& tablePathId,
+ const TPathId& streamPathId,
+ const TRowVersion& readVersion,
+ const TVector<ui32>& valueTags,
+ TVector<std::pair<TSerializedCellVec, TSerializedCellVec>>&& rows)
+ : TablePathId(tablePathId)
+ , StreamPathId(streamPathId)
+ , ReadVersion(readVersion)
+ , ValueTags(valueTags)
+ , Rows(std::move(rows))
+ {
+ }
+
+ const TPathId TablePathId;
+ const TPathId StreamPathId;
+ const TRowVersion ReadVersion;
+ const TVector<ui32> ValueTags;
+ TVector<std::pair<TSerializedCellVec, TSerializedCellVec>> Rows;
+ };
+
+ struct TEvCdcStreamScanContinue : public TEventLocal<TEvCdcStreamScanContinue, EvCdcStreamScanContinue> {};
};
struct Schema : NIceDb::Schema {
@@ -906,6 +949,17 @@ class TDataShard
using TColumns = TableColumns<TxId, ShardId>;
};
+ struct CdcStreamScans : Table<34> {
+ struct TableOwnerId : Column<1, NScheme::NTypeIds::Uint64> {};
+ struct TablePathId : Column<2, NScheme::NTypeIds::Uint64> {};
+ struct StreamOwnerId : Column<3, NScheme::NTypeIds::Uint64> {};
+ struct StreamPathId : Column<4, NScheme::NTypeIds::Uint64> {};
+ struct LastKey : Column<5, NScheme::NTypeIds::String> {};
+
+ using TKey = TableKey<TableOwnerId, TablePathId, StreamOwnerId, StreamPathId>;
+ using TColumns = TableColumns<TableOwnerId, TablePathId, StreamOwnerId, StreamPathId, LastKey>;
+ };
+
using TTables = SchemaTables<Sys, UserTables, TxMain, TxDetails, InReadSets, OutReadSets, PlanQueue,
DeadlineQueue, SchemaOperations, SplitSrcSnapshots, SplitDstReceivedSnapshots, TxArtifacts, ScanProgress,
Snapshots, S3Uploads, S3Downloads, ChangeRecords, ChangeRecordDetails, ChangeSenders, S3UploadedParts,
@@ -913,7 +967,7 @@ class TDataShard
ReplicationSourceOffsets, ReplicationSources, DstReplicationSourceOffsetsReceived,
UserTablesStats, SchemaSnapshots, Locks, LockRanges, LockConflicts,
LockChangeRecords, LockChangeRecordDetails, ChangeRecordCommits,
- TxVolatileDetails, TxVolatileParticipants>;
+ TxVolatileDetails, TxVolatileParticipants, CdcStreamScans>;
// These settings are persisted on each Init. So we use empty settings in order not to overwrite what
// was changed by the user
@@ -1117,6 +1171,9 @@ class TDataShard
void Handle(TEvDataShard::TEvStoreS3DownloadInfo::TPtr& ev, const TActorContext& ctx);
void Handle(TEvDataShard::TEvUnsafeUploadRowsRequest::TPtr& ev, const TActorContext& ctx);
void Handle(TEvDataShard::TEvBuildIndexCreateRequest::TPtr& ev, const TActorContext& ctx);
+ void Handle(TEvDataShard::TEvCdcStreamScanRequest::TPtr& ev, const TActorContext& ctx);
+ void Handle(TEvPrivate::TEvCdcStreamScanRegistered::TPtr& ev, const TActorContext& ctx);
+ void Handle(TEvPrivate::TEvCdcStreamScanProgress::TPtr& ev, const TActorContext& ctx);
void Handle(TEvPrivate::TEvAsyncJobComplete::TPtr& ev, const TActorContext& ctx);
void Handle(TEvDataShard::TEvCancelBackup::TPtr &ev, const TActorContext &ctx);
@@ -1645,6 +1702,11 @@ public:
void ScheduleRemoveLockChanges(ui64 lockId);
void ScheduleRemoveAbandonedLockChanges();
+ static void PersistCdcStreamScanLastKey(NIceDb::TNiceDb& db, const TSerializedCellVec& value,
+ const TPathId& tablePathId, const TPathId& streamPathId);
+ static bool LoadCdcStreamScanLastKey(NIceDb::TNiceDb& db, TMaybe<TSerializedCellVec>& result,
+ const TPathId& tablePathId, const TPathId& streamPathId);
+ static void RemoveCdcStreamScanLastKey(NIceDb::TNiceDb& db, const TPathId& tablePathId, const TPathId& streamPathId);
static void PersistSchemeTxResult(NIceDb::TNiceDb &db, const TSchemaOperation& op);
void NotifySchemeshard(const TActorContext& ctx, ui64 txId = 0);
@@ -1662,6 +1724,8 @@ public:
TVolatileTxManager& GetVolatileTxManager() { return VolatileTxManager; }
const TVolatileTxManager& GetVolatileTxManager() const { return VolatileTxManager; }
+ TCdcStreamScanManager& GetCdcStreamScanManager() { return CdcStreamScanManager; }
+ const TCdcStreamScanManager& GetCdcStreamScanManager() const { return CdcStreamScanManager; }
template <typename... Args>
bool PromoteCompleteEdge(Args&&... args) {
@@ -2284,6 +2348,7 @@ private:
TSnapshotManager SnapshotManager;
TSchemaSnapshotManager SchemaSnapshotManager;
TVolatileTxManager VolatileTxManager;
+ TCdcStreamScanManager CdcStreamScanManager;
TReplicationSourceOffsetsServerLink ReplicationSourceOffsetsServer;
@@ -2648,6 +2713,9 @@ protected:
HFunc(TEvDataShard::TEvRefreshVolatileSnapshotRequest, Handle);
HFunc(TEvDataShard::TEvDiscardVolatileSnapshotRequest, Handle);
HFuncTraced(TEvDataShard::TEvBuildIndexCreateRequest, Handle);
+ HFunc(TEvDataShard::TEvCdcStreamScanRequest, Handle);
+ HFunc(TEvPrivate::TEvCdcStreamScanRegistered, Handle);
+ HFunc(TEvPrivate::TEvCdcStreamScanProgress, Handle);
HFunc(TEvPrivate::TEvAsyncJobComplete, Handle);
HFunc(TEvPrivate::TEvPeriodicWakeup, DoPeriodicTasks);
HFunc(TEvents::TEvUndelivered, Handle);
diff --git a/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp b/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp
index fd3be2d402e..691a0b8e6ea 100644
--- a/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp
+++ b/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp
@@ -786,7 +786,6 @@ Y_UNIT_TEST_SUITE(Cdc) {
}
};
-
TShardedTableOptions SimpleTable() {
return TShardedTableOptions()
.Columns({
@@ -795,33 +794,40 @@ Y_UNIT_TEST_SUITE(Cdc) {
});
}
- TCdcStream KeysOnly(NKikimrSchemeOp::ECdcStreamFormat format, const TString& name = "Stream", bool vt = false) {
+ TCdcStream KeysOnly(NKikimrSchemeOp::ECdcStreamFormat format, const TString& name = "Stream") {
return TCdcStream{
.Name = name,
.Mode = NKikimrSchemeOp::ECdcStreamModeKeysOnly,
.Format = format,
- .VirtualTimestamps = vt,
};
}
- TCdcStream Updates(NKikimrSchemeOp::ECdcStreamFormat format, const TString& name = "Stream", bool vt = false) {
+ TCdcStream Updates(NKikimrSchemeOp::ECdcStreamFormat format, const TString& name = "Stream") {
return TCdcStream{
.Name = name,
.Mode = NKikimrSchemeOp::ECdcStreamModeUpdate,
.Format = format,
- .VirtualTimestamps = vt,
};
}
- TCdcStream NewAndOldImages(NKikimrSchemeOp::ECdcStreamFormat format, const TString& name = "Stream", bool vt = false) {
+ TCdcStream NewAndOldImages(NKikimrSchemeOp::ECdcStreamFormat format, const TString& name = "Stream") {
return TCdcStream{
.Name = name,
.Mode = NKikimrSchemeOp::ECdcStreamModeNewAndOldImages,
.Format = format,
- .VirtualTimestamps = vt,
};
}
+ TCdcStream WithVirtualTimestamps(TCdcStream streamDesc) {
+ streamDesc.VirtualTimestamps = true;
+ return streamDesc;
+ }
+
+ TCdcStream WithInitialScan(TCdcStream streamDesc) {
+ streamDesc.InitialState = NKikimrSchemeOp::ECdcStreamStateScan;
+ return streamDesc;
+ }
+
TString CalcPartitionKey(const TString& data) {
NJson::TJsonValue json;
UNIT_ASSERT(NJson::ReadJsonTree(data, &json));
@@ -1093,7 +1099,6 @@ Y_UNIT_TEST_SUITE(Cdc) {
}
};
-
#define Y_UNIT_TEST_TRIPLET(N, VAR1, VAR2, VAR3) \
template<typename TRunner> void N(NUnitTest::TTestContext&); \
struct TTestRegistration##N { \
@@ -1164,7 +1169,7 @@ Y_UNIT_TEST_SUITE(Cdc) {
}
Y_UNIT_TEST_TRIPLET(VirtualTimestamps, PqRunner, YdsRunner, TopicRunner) {
- TRunner::Read(SimpleTable(), KeysOnly(NKikimrSchemeOp::ECdcStreamFormatJson, "Stream", true), {R"(
+ TRunner::Read(SimpleTable(), WithVirtualTimestamps(KeysOnly(NKikimrSchemeOp::ECdcStreamFormatJson)), {R"(
UPSERT INTO `/Root/Table` (key, value) VALUES
(1, 10),
(2, 20),
@@ -1916,6 +1921,126 @@ Y_UNIT_TEST_SUITE(Cdc) {
}
}
+ Y_UNIT_TEST(InitialScan) {
+ TPortManager portManager;
+ TServer::TPtr server = new TServer(TServerSettings(portManager.GetPort(2134), {}, DefaultPQConfig())
+ .SetUseRealThreads(false)
+ .SetDomainName("Root")
+ .SetEnableChangefeedInitialScan(true)
+ );
+
+ auto& runtime = *server->GetRuntime();
+ const auto edgeActor = runtime.AllocateEdgeActor();
+
+ SetupLogging(runtime);
+ InitRoot(server, edgeActor);
+ CreateShardedTable(server, edgeActor, "/Root", "Table", SimpleTable());
+
+ ExecSQL(server, edgeActor, R"(
+ UPSERT INTO `/Root/Table` (key, value) VALUES
+ (1, 10),
+ (2, 20),
+ (3, 30);
+ )");
+
+ WaitTxNotification(server, edgeActor, AsyncAlterAddStream(server, "/Root", "Table",
+ WithInitialScan(Updates(NKikimrSchemeOp::ECdcStreamFormatJson))));
+
+ WaitForContent(server, edgeActor, "/Root/Table/Stream", {
+ R"({"update":{"value":10},"key":[1]})",
+ R"({"update":{"value":20},"key":[2]})",
+ R"({"update":{"value":30},"key":[3]})",
+ });
+
+ ExecSQL(server, edgeActor, R"(
+ UPSERT INTO `/Root/Table` (key, value) VALUES
+ (1, 100),
+ (2, 200),
+ (3, 300);
+ )");
+
+ WaitForContent(server, edgeActor, "/Root/Table/Stream", {
+ R"({"update":{"value":10},"key":[1]})",
+ R"({"update":{"value":20},"key":[2]})",
+ R"({"update":{"value":30},"key":[3]})",
+ R"({"update":{"value":100},"key":[1]})",
+ R"({"update":{"value":200},"key":[2]})",
+ R"({"update":{"value":300},"key":[3]})",
+ });
+ }
+
+ Y_UNIT_TEST(InitialScanSkipUpdatedRows) {
+ TPortManager portManager;
+ TServer::TPtr server = new TServer(TServerSettings(portManager.GetPort(2134), {}, DefaultPQConfig())
+ .SetUseRealThreads(false)
+ .SetDomainName("Root")
+ .SetEnableChangefeedInitialScan(true)
+ );
+
+ auto& runtime = *server->GetRuntime();
+ const auto edgeActor = runtime.AllocateEdgeActor();
+
+ SetupLogging(runtime);
+ InitRoot(server, edgeActor);
+ CreateShardedTable(server, edgeActor, "/Root", "Table", SimpleTable());
+
+ ExecSQL(server, edgeActor, R"(
+ UPSERT INTO `/Root/Table` (key, value) VALUES
+ (1, 10),
+ (2, 20),
+ (3, 30);
+ )");
+
+ TVector<THolder<IEventHandle>> delayed;
+ auto prevObserver = runtime.SetObserverFunc([&](TTestActorRuntimeBase&, TAutoPtr<IEventHandle>& ev) {
+ if (ev->GetTypeRewrite() == TEvDataShard::EvCdcStreamScanRequest) {
+ delayed.emplace_back(ev.Release());
+ return TTestActorRuntime::EEventAction::DROP;
+ }
+
+ return TTestActorRuntime::EEventAction::PROCESS;
+ });
+
+ WaitTxNotification(server, edgeActor, AsyncAlterAddStream(server, "/Root", "Table",
+ WithInitialScan(Updates(NKikimrSchemeOp::ECdcStreamFormatJson))));
+
+ if (delayed.empty()) {
+ TDispatchOptions opts;
+ opts.FinalEvents.emplace_back([&delayed](IEventHandle&) {
+ return !delayed.empty();
+ });
+ runtime.DispatchEvents(opts);
+ }
+
+ ExecSQL(server, edgeActor, R"(
+ UPSERT INTO `/Root/Table` (key, value) VALUES
+ (1, 100),
+ (4, 40);
+ )");
+
+ ExecSQL(server, edgeActor, R"(
+ DELETE FROM `/Root/Table` WHERE key = 2;
+ )");
+
+ WaitForContent(server, edgeActor, "/Root/Table/Stream", {
+ R"({"update":{"value":100},"key":[1]})",
+ R"({"update":{"value":40},"key":[4]})",
+ R"({"erase":{},"key":[2]})",
+ });
+
+ runtime.SetObserverFunc(prevObserver);
+ for (auto& ev : std::exchange(delayed, TVector<THolder<IEventHandle>>())) {
+ runtime.Send(ev.Release(), 0, true);
+ }
+
+ WaitForContent(server, edgeActor, "/Root/Table/Stream", {
+ R"({"update":{"value":100},"key":[1]})",
+ R"({"update":{"value":40},"key":[4]})",
+ R"({"erase":{},"key":[2]})",
+ R"({"update":{"value":30},"key":[3]})",
+ });
+ }
+
} // Cdc
} // NKikimr
diff --git a/ydb/core/tx/datashard/datashard_ut_common.cpp b/ydb/core/tx/datashard/datashard_ut_common.cpp
index d5a9599dace..79124ae262c 100644
--- a/ydb/core/tx/datashard/datashard_ut_common.cpp
+++ b/ydb/core/tx/datashard/datashard_ut_common.cpp
@@ -1640,6 +1640,9 @@ ui64 AsyncAlterAddStream(
desc.MutableStreamDescription()->SetMode(streamDesc.Mode);
desc.MutableStreamDescription()->SetFormat(streamDesc.Format);
desc.MutableStreamDescription()->SetVirtualTimestamps(streamDesc.VirtualTimestamps);
+ if (streamDesc.InitialState) {
+ desc.MutableStreamDescription()->SetState(*streamDesc.InitialState);
+ }
return RunSchemeTx(*server->GetRuntime(), std::move(request));
}
diff --git a/ydb/core/tx/datashard/datashard_ut_common.h b/ydb/core/tx/datashard/datashard_ut_common.h
index 0e8c4a14413..9bac26dbfe1 100644
--- a/ydb/core/tx/datashard/datashard_ut_common.h
+++ b/ydb/core/tx/datashard/datashard_ut_common.h
@@ -397,10 +397,12 @@ struct TShardedTableOptions {
struct TCdcStream {
using EMode = NKikimrSchemeOp::ECdcStreamMode;
using EFormat = NKikimrSchemeOp::ECdcStreamFormat;
+ using EState = NKikimrSchemeOp::ECdcStreamState;
TString Name;
EMode Mode;
EFormat Format;
+ TMaybe<EState> InitialState;
bool VirtualTimestamps = false;
};
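The new InitialState field lets the test helpers request a changefeed that starts in the scan state; WithInitialScan(Updates(NKikimrSchemeOp::ECdcStreamFormatJson)) from the updated tests expands roughly to:

    // Expanded form of the descriptor used by the InitialScan tests.
    TShardedTableOptions::TCdcStream stream{
        .Name = "Stream",
        .Mode = NKikimrSchemeOp::ECdcStreamModeUpdate,
        .Format = NKikimrSchemeOp::ECdcStreamFormatJson,
        .InitialState = NKikimrSchemeOp::ECdcStreamStateScan,
    };
    // AsyncAlterAddStream() copies InitialState into the scheme request, so the
    // stream is created in ECdcStreamStateScan and later switched to the ready
    // state once the scan completes.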
diff --git a/ydb/core/tx/datashard/drop_cdc_stream_unit.cpp b/ydb/core/tx/datashard/drop_cdc_stream_unit.cpp
index e7a2c14f510..44912cd4633 100644
--- a/ydb/core/tx/datashard/drop_cdc_stream_unit.cpp
+++ b/ydb/core/tx/datashard/drop_cdc_stream_unit.cpp
@@ -54,6 +54,15 @@ public:
DataShard.GetSnapshotManager().RemoveSnapshot(txc.DB, key);
}
+ auto& scanManager = DataShard.GetCdcStreamScanManager();
+ scanManager.Forget(txc.DB, pathId, streamPathId);
+ if (scanManager.GetStreamPathId() == streamPathId) {
+ if (const auto scanId = scanManager.GetScanId()) {
+ DataShard.CancelScan(tableInfo->LocalTid, scanId);
+ }
+ scanManager.Clear();
+ }
+
RemoveSender.Reset(new TEvChangeExchange::TEvRemoveSender(streamPathId));
BuildResult(op, NKikimrTxDataShard::TEvProposeTransactionResult::COMPLETE);
diff --git a/ydb/core/tx/schemeshard/CMakeLists.darwin.txt b/ydb/core/tx/schemeshard/CMakeLists.darwin.txt
index 30d43d36c9b..a6d51a786d9 100644
--- a/ydb/core/tx/schemeshard/CMakeLists.darwin.txt
+++ b/ydb/core/tx/schemeshard/CMakeLists.darwin.txt
@@ -199,6 +199,7 @@ target_sources(core-tx-schemeshard PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/schemeshard_audit_log_fragment.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/schemeshard_impl.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/schemeshard_billing_helpers.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/schemeshard_cdc_stream_scan.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/schemeshard_domain_links.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/schemeshard_effective_acl.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/schemeshard_identificators.cpp
diff --git a/ydb/core/tx/schemeshard/CMakeLists.linux-aarch64.txt b/ydb/core/tx/schemeshard/CMakeLists.linux-aarch64.txt
index cad15eddcf8..5ed31fe71d1 100644
--- a/ydb/core/tx/schemeshard/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/tx/schemeshard/CMakeLists.linux-aarch64.txt
@@ -200,6 +200,7 @@ target_sources(core-tx-schemeshard PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/schemeshard_audit_log_fragment.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/schemeshard_impl.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/schemeshard_billing_helpers.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/schemeshard_cdc_stream_scan.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/schemeshard_domain_links.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/schemeshard_effective_acl.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/schemeshard_identificators.cpp
diff --git a/ydb/core/tx/schemeshard/CMakeLists.linux.txt b/ydb/core/tx/schemeshard/CMakeLists.linux.txt
index cad15eddcf8..5ed31fe71d1 100644
--- a/ydb/core/tx/schemeshard/CMakeLists.linux.txt
+++ b/ydb/core/tx/schemeshard/CMakeLists.linux.txt
@@ -200,6 +200,7 @@ target_sources(core-tx-schemeshard PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/schemeshard_audit_log_fragment.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/schemeshard_impl.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/schemeshard_billing_helpers.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/schemeshard_cdc_stream_scan.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/schemeshard_domain_links.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/schemeshard_effective_acl.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/schemeshard_identificators.cpp
diff --git a/ydb/core/tx/schemeshard/schemeshard__init.cpp b/ydb/core/tx/schemeshard/schemeshard__init.cpp
index 734335427d7..293bcdfffa4 100644
--- a/ydb/core/tx/schemeshard/schemeshard__init.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__init.cpp
@@ -19,6 +19,7 @@ struct TSchemeShard::TTxInit : public TTransactionBase<TSchemeShard> {
TDeque<TPathId> BlockStoreVolumesToClean;
TVector<ui64> ExportsToResume;
TVector<ui64> ImportsToResume;
+ TVector<TPathId> CdcStreamScansToResume;
bool Broken = false;
explicit TTxInit(TSelf *self)
@@ -2825,6 +2826,17 @@ struct TSchemeShard::TTxInit : public TTransactionBase<TSchemeShard> {
Self->CdcStreams[pathId] = new TCdcStreamInfo(alterVersion, mode, format, vt, state);
Self->IncrementPathDbRefCount(pathId);
+ if (state == NKikimrSchemeOp::ECdcStreamStateScan) {
+ Y_VERIFY_S(Self->PathsById.contains(path->ParentPathId), "Parent path is not found"
+ << ", cdc stream pathId: " << pathId
+ << ", parent pathId: " << path->ParentPathId);
+ auto parent = Self->PathsById.at(path->ParentPathId);
+
+ if (parent->NormalState()) {
+ CdcStreamScansToResume.push_back(pathId);
+ }
+ }
+
if (!rowset.Next()) {
return false;
}
@@ -2886,6 +2898,38 @@ struct TSchemeShard::TTxInit : public TTransactionBase<TSchemeShard> {
}
}
+ // Read CdcStreamScanShardStatus
+ {
+ auto rowset = db.Table<Schema::CdcStreamScanShardStatus>().Range().Select();
+ if (!rowset.IsReady()) {
+ return false;
+ }
+
+ while (!rowset.EndOfSet()) {
+ auto pathId = TPathId(
+ rowset.GetValue<Schema::CdcStreamScanShardStatus::OwnerPathId>(),
+ rowset.GetValue<Schema::CdcStreamScanShardStatus::LocalPathId>()
+ );
+ auto shardIdx = TShardIdx(
+ rowset.GetValue<Schema::CdcStreamScanShardStatus::OwnerShardIdx>(),
+ rowset.GetValue<Schema::CdcStreamScanShardStatus::LocalShardIdx>()
+ );
+ auto status = rowset.GetValue<Schema::CdcStreamScanShardStatus::Status>();
+
+ Y_VERIFY_S(Self->CdcStreams.contains(pathId), "Cdc stream not found"
+ << ": pathId# " << pathId);
+
+ auto stream = Self->CdcStreams.at(pathId);
+ stream->ScanShards.emplace(shardIdx, status);
+
+ if (status != NKikimrTxDataShard::TEvCdcStreamScanResponse::DONE) {
+ stream->PendingShards.insert(shardIdx);
+ } else {
+ stream->DoneShards.insert(shardIdx);
+ }
+ }
+ }
+
// Read DomainsPools
{
auto rowset = db.Table<Schema::StoragePools>().Range().Select();
@@ -4727,12 +4771,14 @@ struct TSchemeShard::TTxInit : public TTransactionBase<TSchemeShard> {
return;
}
- Self->ActivateAfterInitialization(
- ctx,
- std::move(delayPublications),
- ExportsToResume, ImportsToResume,
- std::move(TablesToClean), std::move(BlockStoreVolumesToClean)
- );
+ Self->ActivateAfterInitialization(ctx, {
+ .DelayPublications = std::move(delayPublications),
+ .ExportIds = ExportsToResume,
+ .ImportsIds = ImportsToResume,
+ .CdcStreamScans = std::move(CdcStreamScansToResume),
+ .TablesToClean = std::move(TablesToClean),
+ .BlockStoreVolumesToClean = std::move(BlockStoreVolumesToClean),
+ });
}
};
diff --git a/ydb/core/tx/schemeshard/schemeshard__init_root.cpp b/ydb/core/tx/schemeshard/schemeshard__init_root.cpp
index 5201abd44b7..e8d12429402 100644
--- a/ydb/core/tx/schemeshard/schemeshard__init_root.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__init_root.cpp
@@ -144,8 +144,7 @@ struct TSchemeShard::TTxInitRoot : public TSchemeShard::TRwTxBase {
<< ", at schemeshard: " << Self->TabletID());
Self->SignalTabletActive(ctx);
-
- Self->ActivateAfterInitialization(ctx);
+ Self->ActivateAfterInitialization(ctx, {});
}
};
@@ -440,7 +439,9 @@ struct TSchemeShard::TTxInitTenantSchemeShard : public TSchemeShard::TRwTxBase {
auto publications = TSideEffects::TPublications();
publications[TTxId()] = TSideEffects::TPublications::mapped_type{Self->RootPathId()};
- Self->ActivateAfterInitialization(ctx, std::move(publications));
+ Self->ActivateAfterInitialization(ctx, {
+ .DelayPublications = std::move(publications),
+ });
}
};
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_alter_cdc_stream.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_alter_cdc_stream.cpp
index 2fc32251499..754f1efe4f5 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation_alter_cdc_stream.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__operation_alter_cdc_stream.cpp
@@ -150,20 +150,20 @@ public:
Y_VERIFY(context.SS->CdcStreams.contains(streamPath.Base()->PathId));
auto stream = context.SS->CdcStreams.at(streamPath.Base()->PathId);
+ TCdcStreamInfo::EState requiredState = TCdcStreamInfo::EState::ECdcStreamStateInvalid;
TCdcStreamInfo::EState newState = TCdcStreamInfo::EState::ECdcStreamStateInvalid;
switch (op.GetActionCase()) {
case NKikimrSchemeOp::TAlterCdcStream::kDisable:
- newState = TCdcStreamInfo::EState::ECdcStreamStateDisabled;
+ requiredState = TCdcStreamInfo::EState::ECdcStreamStateDisabled;
+ if (stream->State == TCdcStreamInfo::EState::ECdcStreamStateReady) {
+ newState = requiredState;
+ }
break;
case NKikimrSchemeOp::TAlterCdcStream::kGetReady:
+ requiredState = TCdcStreamInfo::EState::ECdcStreamStateReady;
if (stream->State == TCdcStreamInfo::EState::ECdcStreamStateScan) {
- newState = TCdcStreamInfo::EState::ECdcStreamStateReady;
- } else {
- result->SetError(NKikimrScheme::StatusPreconditionFailed, TStringBuilder()
- << "Cannot switch to ready state"
- << ": current# " << stream->State);
- return result;
+ newState = requiredState;
}
break;
default:
@@ -172,6 +172,14 @@ public:
return result;
}
+ if (newState == TCdcStreamInfo::EState::ECdcStreamStateInvalid) {
+ result->SetError(NKikimrScheme::StatusPreconditionFailed, TStringBuilder()
+ << "Cannot switch state"
+ << ": from# " << stream->State
+ << ", to# " << requiredState);
+ return result;
+ }
+
auto guard = context.DbGuard();
context.MemChanges.GrabPath(context.SS, streamPath.Base()->PathId);
context.MemChanges.GrabCdcStream(context.SS, streamPath.Base()->PathId);
@@ -183,8 +191,6 @@ public:
auto streamAlter = stream->CreateNextVersion();
Y_VERIFY(streamAlter);
-
- Y_VERIFY(newState != TCdcStreamInfo::EState::ECdcStreamStateInvalid);
streamAlter->State = newState;
Y_VERIFY(!context.SS->FindTx(OperationId));
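The reworked alter logic first derives the required target state for the action and accepts it only from the matching source state (kDisable from Ready, kGetReady from Scan), otherwise failing with a single precondition error that names both states. A hedged, self-contained sketch of that transition rule with simplified enums (not the NKikimrSchemeOp types):

```cpp
#include <optional>

enum class EStreamState { Ready, Disabled, Scan };
enum class EAction { Disable, GetReady };

// Returns the new state when the transition is allowed, std::nullopt otherwise;
// the caller turns nullopt into StatusPreconditionFailed.
std::optional<EStreamState> NextState(EStreamState current, EAction action) {
    switch (action) {
    case EAction::Disable:
        if (current == EStreamState::Ready) {
            return EStreamState::Disabled;
        }
        break;
    case EAction::GetReady:
        if (current == EStreamState::Scan) {
            return EStreamState::Ready;
        }
        break;
    }
    return std::nullopt;
}
```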
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_common.h b/ydb/core/tx/schemeshard/schemeshard__operation_common.h
index 1441944725c..fe08e99eb32 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation_common.h
+++ b/ydb/core/tx/schemeshard/schemeshard__operation_common.h
@@ -416,7 +416,7 @@ public:
class TDone: public TSubOperationState {
-private:
+protected: // allow derived states (e.g. TDoneWithInitialScan) to reuse OperationId
TOperationId OperationId;
TString DebugHint() const override {
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp
index 1dc4a69f177..de9d29af56c 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp
@@ -291,7 +291,7 @@ protected:
Y_VERIFY(stream->AlterData);
context.SS->DescribeCdcStream(childPathId, childName, stream->AlterData, *notice.MutableStreamDescription());
- if (stream->AlterData->State == NKikimrSchemeOp::ECdcStreamStateScan) {
+ if (stream->AlterData->State == TCdcStreamInfo::EState::ECdcStreamStateScan) {
notice.SetSnapshotName("ChangefeedInitialScan");
}
}
@@ -307,7 +307,9 @@ public:
using NCdcStreamState::TProposeAtTable::TProposeAtTable;
bool HandleReply(TEvPrivate::TEvOperationPlan::TPtr& ev, TOperationContext& context) override {
- NCdcStreamState::TProposeAtTable::HandleReply(ev, context);
+ if (!NCdcStreamState::TProposeAtTable::HandleReply(ev, context)) {
+ return false;
+ }
const auto step = TStepId(ev->Get()->StepId);
NIceDb::TNiceDb db(context.GetDB());
@@ -320,6 +322,55 @@ public:
}; // TProposeAtTableWithInitialScan
+class TDoneWithInitialScan: public TDone {
+public:
+ using TDone::TDone;
+
+ bool ProgressState(TOperationContext& context) override {
+ if (!TDone::ProgressState(context)) {
+ return false;
+ }
+
+ const auto* txState = context.SS->FindTx(OperationId);
+ Y_VERIFY(txState);
+ Y_VERIFY(txState->TxType == TTxState::TxCreateCdcStreamAtTableWithInitialScan);
+ const auto& pathId = txState->TargetPathId;
+
+ Y_VERIFY(context.SS->PathsById.contains(pathId));
+ auto path = context.SS->PathsById.at(pathId);
+
+ TMaybe<TPathId> streamPathId;
+ for (const auto& [_, childPathId] : path->GetChildren()) {
+ Y_VERIFY(context.SS->PathsById.contains(childPathId));
+ auto childPath = context.SS->PathsById.at(childPathId);
+
+ if (childPath->CreateTxId != OperationId.GetTxId()) {
+ continue;
+ }
+
+ Y_VERIFY(childPath->IsCdcStream() && !childPath->Dropped());
+ Y_VERIFY(context.SS->CdcStreams.contains(childPathId));
+ auto stream = context.SS->CdcStreams.at(childPathId);
+
+ Y_VERIFY(stream->State == TCdcStreamInfo::EState::ECdcStreamStateScan);
+ Y_VERIFY_S(!streamPathId, "Too many cdc streams are planned to fill with initial scan"
+ << ": found# " << *streamPathId
+ << ", another# " << childPathId);
+ streamPathId = childPathId;
+ }
+
+ if (AppData()->DisableCdcAutoSwitchingToReadyStateForTests) {
+ return true;
+ }
+
+ Y_VERIFY(streamPathId);
+ context.OnComplete.Send(context.SS->SelfId(), new TEvPrivate::TEvRunCdcStreamScan(*streamPathId));
+
+ return true;
+ }
+
+}; // TDoneWithInitialScan
+
class TNewCdcStreamAtTable: public TSubOperation {
static TTxState::ETxState NextState() {
return TTxState::ConfigureParts;
@@ -353,7 +404,11 @@ class TNewCdcStreamAtTable: public TSubOperation {
case TTxState::ProposedWaitParts:
return MakeHolder<NTableState::TProposedWaitParts>(OperationId);
case TTxState::Done:
- return MakeHolder<TDone>(OperationId);
+ if (InitialScan) {
+ return MakeHolder<TDoneWithInitialScan>(OperationId);
+ } else {
+ return MakeHolder<TDone>(OperationId);
+ }
default:
return nullptr;
}
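The new Done-state handling is selected per operation: when the stream was created with an initial scan, the sub-operation finishes through TDoneWithInitialScan, which additionally locates the freshly created stream and posts TEvPrivate::TEvRunCdcStreamScan to the schemeshard itself (unless the test-only flag suppresses it). A rough sketch of that factory-plus-hook shape, using stand-in types instead of the real TSubOperationState hierarchy:

```cpp
#include <memory>

struct IDoneState {
    virtual ~IDoneState() = default;
    virtual bool ProgressState() = 0;
};

struct TDone: IDoneState {
    bool ProgressState() override {
        // complete the sub-operation as before
        return true;
    }
};

struct TDoneWithInitialScan: TDone {
    bool ProgressState() override {
        if (!TDone::ProgressState()) {
            return false;
        }
        // extra step: the real code resolves the stream path created by the
        // current TxId and sends TEvRunCdcStreamScan to SelfId()
        return true;
    }
};

std::unique_ptr<IDoneState> MakeDoneState(bool initialScan) {
    if (initialScan) {
        return std::make_unique<TDoneWithInitialScan>();
    }
    return std::make_unique<TDone>();
}
```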
diff --git a/ydb/core/tx/schemeshard/schemeshard_cdc_stream_scan.cpp b/ydb/core/tx/schemeshard/schemeshard_cdc_stream_scan.cpp
new file mode 100644
index 00000000000..2f0bfcbc640
--- /dev/null
+++ b/ydb/core/tx/schemeshard/schemeshard_cdc_stream_scan.cpp
@@ -0,0 +1,392 @@
+#include "schemeshard_impl.h"
+
+#include <ydb/core/tx/tx_proxy/proxy.h>
+
+#include <util/generic/deque.h>
+
+#if defined LOG_D || \
+ defined LOG_W || \
+ defined LOG_E
+#error log macro redefinition
+#endif
+
+#define LOG_D(stream) LOG_DEBUG_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "[CdcStreamScan] " << stream)
+#define LOG_W(stream) LOG_WARN_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "[CdcStreamScan] " << stream)
+#define LOG_E(stream) LOG_ERROR_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "[CdcStreamScan] " << stream)
+
+namespace NKikimr::NSchemeShard {
+
+using namespace NTabletFlatExecutor;
+
+class TCdcStreamScanFinalizer: public TActorBootstrapped<TCdcStreamScanFinalizer> {
+public:
+ static constexpr NKikimrServices::TActivity::EType ActorActivityType() {
+ return NKikimrServices::TActivity::SCHEMESHARD_CDC_STREAM_SCAN_FINALIZER;
+ }
+
+ explicit TCdcStreamScanFinalizer(const TActorId& ssActorId, THolder<TEvSchemeShard::TEvModifySchemeTransaction>&& req)
+ : SSActorId(ssActorId)
+ , Request(std::move(req)) // template without txId
+ {
+ }
+
+ void Bootstrap() {
+ AllocateTxId();
+ Become(&TCdcStreamScanFinalizer::StateWork);
+ }
+
+ STATEFN(StateWork) {
+ switch (ev->GetTypeRewrite()) {
+ hFunc(TEvTxUserProxy::TEvAllocateTxIdResult, Handle);
+ sFunc(TEvents::TEvPoison, PassAway);
+ }
+ }
+
+private:
+ void AllocateTxId() {
+ Send(MakeTxProxyID(), new TEvTxUserProxy::TEvAllocateTxId);
+ }
+
+ void Handle(TEvTxUserProxy::TEvAllocateTxIdResult::TPtr& ev) {
+ Request->Record.SetTxId(ev->Get()->TxId);
+ Send(SSActorId, Request.Release());
+ }
+
+private:
+ const TActorId SSActorId;
+ THolder<TEvSchemeShard::TEvModifySchemeTransaction> Request;
+
+}; // TCdcStreamScanFinalizer
+
+struct TSchemeShard::TCdcStreamScan::TTxProgress: public TTransactionBase<TSchemeShard> {
+ // params
+ TEvPrivate::TEvRunCdcStreamScan::TPtr RunCdcStreamScan = nullptr;
+ TEvDataShard::TEvCdcStreamScanResponse::TPtr CdcStreamScanResponse = nullptr;
+ struct {
+ TPathId StreamPathId;
+ TTabletId TabletId;
+ explicit operator bool() const { return StreamPathId && TabletId; }
+ } PipeRetry;
+
+ // side effects
+ TDeque<std::tuple<TPathId, TTabletId, THolder<IEventBase>>> ScanRequests;
+ TPathId StreamToProgress;
+ THolder<TEvSchemeShard::TEvModifySchemeTransaction> Finalize;
+
+public:
+ explicit TTxProgress(TSelf* self, TEvPrivate::TEvRunCdcStreamScan::TPtr& ev)
+ : TBase(self)
+ , RunCdcStreamScan(ev)
+ {
+ }
+
+ explicit TTxProgress(TSelf* self, TEvDataShard::TEvCdcStreamScanResponse::TPtr& ev)
+ : TBase(self)
+ , CdcStreamScanResponse(ev)
+ {
+ }
+
+ explicit TTxProgress(TSelf* self, const TPathId& streamPathId, TTabletId tabletId)
+ : TBase(self)
+ , PipeRetry({streamPathId, tabletId})
+ {
+ }
+
+ TTxType GetTxType() const override {
+ return TXTYPE_CDC_STREAM_SCAN_PROGRESS;
+ }
+
+ bool Execute(TTransactionContext& txc, const TActorContext& ctx) override {
+ if (RunCdcStreamScan) {
+ return OnRunCdcStreamScan(txc, ctx);
+ } else if (CdcStreamScanResponse) {
+ return OnCdcStreamScanResponse(txc, ctx);
+ } else if (PipeRetry) {
+ return OnPipeRetry(txc, ctx);
+ } else {
+ Y_FAIL("unreachable");
+ }
+ }
+
+ void Complete(const TActorContext& ctx) override {
+ for (auto& [streamPathId, tabletId, ev] : ScanRequests) {
+ Self->CdcStreamScanPipes.Create(streamPathId, tabletId, std::move(ev), ctx);
+ }
+
+ if (StreamToProgress) {
+ ctx.Send(ctx.SelfID, new TEvPrivate::TEvRunCdcStreamScan(StreamToProgress));
+ }
+
+ if (Finalize) {
+ Self->CdcStreamScanFinalizer = ctx.Register(new TCdcStreamScanFinalizer(ctx.SelfID, std::move(Finalize)));
+ }
+ }
+
+private:
+ bool OnRunCdcStreamScan(TTransactionContext& txc, const TActorContext& ctx) {
+ const auto& streamPathId = RunCdcStreamScan->Get()->StreamPathId;
+
+ LOG_D("Run"
+ << ": streamPathId# " << streamPathId);
+
+ if (!Self->CdcStreams.contains(streamPathId)) {
+ LOG_W("Cannot run"
+ << ": streamPathId# " << streamPathId
+ << ", reason# " << "stream doesn't exist");
+ return true;
+ }
+
+ auto streamInfo = Self->CdcStreams.at(streamPathId);
+ if (streamInfo->State != TCdcStreamInfo::EState::ECdcStreamStateScan) {
+ LOG_W("Cannot run"
+ << ": streamPathId# " << streamPathId
+ << ", reason# " << "unexpected state");
+ return true;
+ }
+
+ Y_VERIFY(Self->PathsById.contains(streamPathId));
+ auto streamPath = Self->PathsById.at(streamPathId);
+
+ const auto& tablePathId = streamPath->ParentPathId;
+
+ Y_VERIFY(Self->Tables.contains(tablePathId));
+ auto table = Self->Tables.at(tablePathId);
+
+ if (streamInfo->ScanShards.empty()) {
+ NIceDb::TNiceDb db(txc.DB);
+ for (const auto& shard : table->GetPartitions()) {
+ const auto status = TCdcStreamInfo::TShardStatus(NKikimrTxDataShard::TEvCdcStreamScanResponse::PENDING);
+ streamInfo->ScanShards.emplace(shard.ShardIdx, status);
+ streamInfo->PendingShards.insert(shard.ShardIdx);
+ Self->PersistCdcStreamScanShardStatus(db, streamPathId, shard.ShardIdx, status);
+ }
+ }
+
+ while (!streamInfo->PendingShards.empty()) {
+ if (streamInfo->InProgressShards.size() >= streamInfo->MaxInProgressShards) {
+ break;
+ }
+
+ auto it = streamInfo->PendingShards.begin();
+
+ Y_VERIFY(Self->ShardInfos.contains(*it));
+ const auto tabletId = Self->ShardInfos.at(*it).TabletID;
+
+ streamInfo->InProgressShards.insert(*it);
+ streamInfo->PendingShards.erase(it);
+
+ auto ev = MakeHolder<TEvDataShard::TEvCdcStreamScanRequest>();
+ PathIdFromPathId(tablePathId, ev->Record.MutableTablePathId());
+ ev->Record.SetTableSchemaVersion(table->AlterVersion);
+ PathIdFromPathId(streamPathId, ev->Record.MutableStreamPathId());
+ ev->Record.SetSnapshotStep(ui64(streamPath->StepCreated));
+ ev->Record.SetSnapshotTxId(ui64(streamPath->CreateTxId));
+ ScanRequests.emplace_back(streamPathId, tabletId, std::move(ev));
+ }
+
+ if (streamInfo->DoneShards.size() == streamInfo->ScanShards.size()) {
+ const auto path = TPath::Init(streamPathId, Self);
+
+ Finalize = MakeHolder<TEvSchemeShard::TEvModifySchemeTransaction>();
+ auto& tx = *Finalize->Record.AddTransaction();
+ tx.SetOperationType(NKikimrSchemeOp::ESchemeOpAlterCdcStream);
+ tx.SetWorkingDir(path.Parent().Parent().PathString()); // stream -> table -> working dir
+ tx.SetFailOnExist(false);
+
+ auto& op = *tx.MutableAlterCdcStream();
+ op.SetTableName(path.Parent().LeafName());
+ op.SetStreamName(path.LeafName());
+ op.MutableGetReady()->SetLockTxId(ui64(streamPath->CreateTxId));
+ tx.MutableLockGuard()->SetOwnerTxId(ui64(streamPath->CreateTxId));
+ }
+
+ return true;
+ }
+
+ bool OnCdcStreamScanResponse(TTransactionContext& txc, const TActorContext& ctx) {
+ const auto& record = CdcStreamScanResponse->Get()->Record;
+
+ LOG_D("Response"
+ << ": ev# " << record.ShortDebugString());
+
+ const auto streamPathId = PathIdFromPathId(record.GetStreamPathId());
+ if (!Self->CdcStreams.contains(streamPathId)) {
+ LOG_W("Cannot process response"
+ << ": streamPathId# " << streamPathId
+ << ", reason# " << "stream doesn't exist");
+ return true;
+ }
+
+ auto streamInfo = Self->CdcStreams.at(streamPathId);
+ if (streamInfo->State != TCdcStreamInfo::EState::ECdcStreamStateScan) {
+ LOG_W("Cannot process response"
+ << ": streamPathId# " << streamPathId
+ << ", reason# " << "unexpected state");
+ return true;
+ }
+
+ const auto tabletId = TTabletId(record.GetTabletId());
+ const auto shardIdx = Self->GetShardIdx(tabletId);
+ if (shardIdx == InvalidShardIdx) {
+ LOG_E("Cannot process response"
+ << ": streamPathId# " << streamPathId
+ << ", tabletId# " << tabletId
+ << ", reason# " << "tablet not found");
+ return true;
+ }
+
+ auto it = streamInfo->ScanShards.find(shardIdx);
+ if (it == streamInfo->ScanShards.end()) {
+ LOG_E("Cannot process response"
+ << ": streamPathId# " << streamPathId
+ << ", shardIdx# " << shardIdx
+ << ", reason# " << "shard not found");
+ return true;
+ }
+
+ auto& status = it->second;
+ if (!streamInfo->InProgressShards.contains(shardIdx)) {
+ LOG_W("Shard status mismatch"
+ << ": streamPathId# " << streamPathId
+ << ", shardIdx# " << shardIdx
+ << ", got# " << record.GetStatus()
+ << ", current# " << status.Status);
+ return true;
+ }
+
+ switch (record.GetStatus()) {
+ case NKikimrTxDataShard::TEvCdcStreamScanResponse::ACCEPTED:
+ case NKikimrTxDataShard::TEvCdcStreamScanResponse::IN_PROGRESS:
+ break;
+
+ case NKikimrTxDataShard::TEvCdcStreamScanResponse::DONE:
+ status.Status = record.GetStatus();
+ streamInfo->DoneShards.insert(shardIdx);
+ streamInfo->InProgressShards.erase(shardIdx);
+ Self->CdcStreamScanPipes.Close(streamPathId, tabletId, ctx);
+ StreamToProgress = streamPathId;
+ break;
+
+ case NKikimrTxDataShard::TEvCdcStreamScanResponse::OVERLOADED:
+ case NKikimrTxDataShard::TEvCdcStreamScanResponse::ABORTED:
+ streamInfo->PendingShards.insert(shardIdx);
+ streamInfo->InProgressShards.erase(shardIdx);
+ Self->CdcStreamScanPipes.Close(streamPathId, tabletId, ctx);
+ StreamToProgress = streamPathId;
+ break;
+
+ case NKikimrTxDataShard::TEvCdcStreamScanResponse::BAD_REQUEST:
+ case NKikimrTxDataShard::TEvCdcStreamScanResponse::SCHEME_ERROR:
+ Y_FAIL("unreachable");
+
+ default:
+ LOG_E("Unexpected response status"
+ << ": status# " << static_cast<int>(record.GetStatus())
+ << ", error# " << record.GetErrorDescription());
+ return true;
+ }
+
+ NIceDb::TNiceDb db(txc.DB);
+ Self->PersistCdcStreamScanShardStatus(db, streamPathId, shardIdx, status);
+
+ if (streamInfo->DoneShards.size() == streamInfo->ScanShards.size()) {
+ StreamToProgress = streamPathId;
+ }
+
+ return true;
+ }
+
+ bool OnPipeRetry(TTransactionContext&, const TActorContext& ctx) {
+ const auto& streamPathId = PipeRetry.StreamPathId;
+ const auto& tabletId = PipeRetry.TabletId;
+
+ LOG_D("Pipe retry"
+ << ": streamPathId# " << streamPathId
+ << ", tabletId# " << tabletId);
+
+ if (!Self->CdcStreams.contains(streamPathId)) {
+ LOG_W("Cannot retry"
+ << ": streamPathId# " << streamPathId
+ << ", reason# " << "stream doesn't exist");
+ return true;
+ }
+
+ auto streamInfo = Self->CdcStreams.at(streamPathId);
+ if (streamInfo->State != TCdcStreamInfo::EState::ECdcStreamStateScan) {
+ LOG_W("Cannot retry"
+ << ": streamPathId# " << streamPathId
+ << ", reason# " << "unexpected state");
+ return true;
+ }
+
+ const auto shardIdx = Self->GetShardIdx(tabletId);
+ if (shardIdx == InvalidShardIdx) {
+ LOG_E("Cannot retry"
+ << ": streamPathId# " << streamPathId
+ << ", tabletId# " << tabletId
+ << ", reason# " << "tablet not found");
+ return true;
+ }
+
+ auto it = streamInfo->InProgressShards.find(shardIdx);
+ if (it == streamInfo->InProgressShards.end()) {
+ LOG_E("Cannot retry"
+ << ": streamPathId# " << streamPathId
+ << ", shardIdx# " << shardIdx
+ << ", reason# " << "shard not found");
+ return true;
+ }
+
+ streamInfo->PendingShards.insert(*it);
+ streamInfo->InProgressShards.erase(it);
+ Self->CdcStreamScanPipes.Close(streamPathId, tabletId, ctx);
+ StreamToProgress = streamPathId;
+
+ return true;
+ }
+};
+
+ITransaction* TSchemeShard::CreateTxProgressCdcStreamScan(TEvPrivate::TEvRunCdcStreamScan::TPtr& ev) {
+ return new TCdcStreamScan::TTxProgress(this, ev);
+}
+
+ITransaction* TSchemeShard::CreateTxProgressCdcStreamScan(TEvDataShard::TEvCdcStreamScanResponse::TPtr& ev) {
+ return new TCdcStreamScan::TTxProgress(this, ev);
+}
+
+ITransaction* TSchemeShard::CreatePipeRetry(const TPathId& streamPathId, TTabletId tabletId) {
+ return new TCdcStreamScan::TTxProgress(this, streamPathId, tabletId);
+}
+
+void TSchemeShard::Handle(TEvPrivate::TEvRunCdcStreamScan::TPtr& ev, const TActorContext& ctx) {
+ Execute(CreateTxProgressCdcStreamScan(ev), ctx);
+}
+
+void TSchemeShard::Handle(TEvDataShard::TEvCdcStreamScanResponse::TPtr& ev, const TActorContext& ctx) {
+ Execute(CreateTxProgressCdcStreamScan(ev), ctx);
+}
+
+void TSchemeShard::ResumeCdcStreamScans(const TVector<TPathId>& ids, const TActorContext& ctx) {
+ for (const auto& id : ids) {
+ Send(ctx.SelfID, new TEvPrivate::TEvRunCdcStreamScan(id));
+ }
+}
+
+void TSchemeShard::PersistCdcStreamScanShardStatus(NIceDb::TNiceDb& db, const TPathId& streamPathId,
+ const TShardIdx& shardIdx, const TCdcStreamInfo::TShardStatus& status)
+{
+ db.Table<Schema::CdcStreamScanShardStatus>()
+ .Key(streamPathId.OwnerId, streamPathId.LocalPathId, shardIdx.GetOwnerId(), shardIdx.GetLocalId())
+ .Update(
+ NIceDb::TUpdate<Schema::CdcStreamScanShardStatus::Status>(status.Status)
+ );
+}
+
+void TSchemeShard::RemoveCdcStreamScanShardStatus(NIceDb::TNiceDb& db, const TPathId& streamPathId, const TShardIdx& shardIdx) {
+ db.Table<Schema::CdcStreamScanShardStatus>()
+ .Key(streamPathId.OwnerId, streamPathId.LocalPathId, shardIdx.GetOwnerId(), shardIdx.GetLocalId())
+ .Delete();
+}
+
+}
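The core of TTxProgress is a windowed dispatcher: shards start in PendingShards, at most MaxInProgressShards of them hold a scan request at a time, DONE moves a shard to DoneShards, and OVERLOADED/ABORTED or a broken pipe moves it back to PendingShards, with a follow-up TEvRunCdcStreamScan refilling the window. A standalone sketch of that bookkeeping with plain containers (illustrative only, not the schemeshard types):

```cpp
#include <cstddef>
#include <cstdint>
#include <set>
#include <vector>

struct TScanState {
    static constexpr size_t MaxInProgress = 10;
    std::set<uint64_t> Pending;
    std::set<uint64_t> InProgress;
    std::set<uint64_t> Done;
};

// Move shards from Pending to InProgress until the window is full and return
// the shards for which a scan request should be sent now.
std::vector<uint64_t> DispatchNext(TScanState& s) {
    std::vector<uint64_t> toRequest;
    while (!s.Pending.empty() && s.InProgress.size() < TScanState::MaxInProgress) {
        auto it = s.Pending.begin();
        s.InProgress.insert(*it);
        toRequest.push_back(*it);
        s.Pending.erase(it);
    }
    return toRequest;
}

// DONE releases the slot for good ...
void OnShardDone(TScanState& s, uint64_t shard) {
    s.InProgress.erase(shard);
    s.Done.insert(shard);
}

// ... while OVERLOADED / ABORTED / a broken pipe re-queues the shard.
void OnShardRetry(TScanState& s, uint64_t shard) {
    s.InProgress.erase(shard);
    s.Pending.insert(shard);
}

// Once every shard is done, the real code builds the AlterCdcStream(GetReady)
// request and hands it to TCdcStreamScanFinalizer for txid allocation.
bool ScanComplete(const TScanState& s) {
    return s.Pending.empty() && s.InProgress.empty() && !s.Done.empty();
}
```

After each OnShardDone/OnShardRetry the owner would call DispatchNext again, which corresponds to the StreamToProgress re-post performed in Complete().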
diff --git a/ydb/core/tx/schemeshard/schemeshard_impl.cpp b/ydb/core/tx/schemeshard/schemeshard_impl.cpp
index fba9c2c0676..5804e7c5b61 100644
--- a/ydb/core/tx/schemeshard/schemeshard_impl.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard_impl.cpp
@@ -56,14 +56,7 @@ bool ResolvePoolNames(
const TSchemeLimits TSchemeShard::DefaultLimits = {};
-void TSchemeShard::ActivateAfterInitialization(const TActorContext& ctx,
- TSideEffects::TPublications&& delayPublications,
- const TVector<ui64>& exportIds,
- const TVector<ui64>& importsIds,
- TVector<TPathId>&& tablesToClean,
- TDeque<TPathId>&& blockStoreVolumesToClean
- )
-{
+void TSchemeShard::ActivateAfterInitialization(const TActorContext& ctx, TActivationOpts&& opts) {
TPathId subDomainPathId = GetCurrentSubDomainPathId();
TSubDomainInfo::TPtr domainPtr = ResolveDomainInfo(subDomainPathId);
LoginProvider.Audience = TPath::Init(subDomainPathId, this).PathString();
@@ -74,14 +67,14 @@ void TSchemeShard::ActivateAfterInitialization(const TActorContext& ctx,
GetDomainKey(subDomainPathId), sysViewProcessorId ? sysViewProcessorId.GetValue() : 0);
Send(SysPartitionStatsCollector, evInit.Release());
- Execute(CreateTxInitPopulator(std::move(delayPublications)), ctx);
+ Execute(CreateTxInitPopulator(std::move(opts.DelayPublications)), ctx);
- if (tablesToClean) {
- Execute(CreateTxCleanTables(std::move(tablesToClean)), ctx);
+ if (opts.TablesToClean) {
+ Execute(CreateTxCleanTables(std::move(opts.TablesToClean)), ctx);
}
- if (blockStoreVolumesToClean) {
- Execute(CreateTxCleanBlockStoreVolumes(std::move(blockStoreVolumesToClean)), ctx);
+ if (opts.BlockStoreVolumesToClean) {
+ Execute(CreateTxCleanBlockStoreVolumes(std::move(opts.BlockStoreVolumesToClean)), ctx);
}
if (IsDomainSchemeShard) {
@@ -114,8 +107,9 @@ void TSchemeShard::ActivateAfterInitialization(const TActorContext& ctx,
SVPMigrator = Register(CreateSVPMigrator(TabletID(), SelfId(), std::move(migrations)).Release());
}
- ResumeExports(exportIds, ctx);
- ResumeImports(importsIds, ctx);
+ ResumeExports(opts.ExportIds, ctx);
+ ResumeImports(opts.ImportsIds, ctx);
+ ResumeCdcStreamScans(opts.CdcStreamScans, ctx);
ParentDomainLink.SendSync(ctx);
@@ -1623,6 +1617,10 @@ void TSchemeShard::PersistRemoveCdcStream(NIceDb::TNiceDb &db, const TPathId& pa
db.Table<Schema::CdcStream>().Key(pathId.OwnerId, pathId.LocalPathId).Delete();
+ for (const auto& [shardIdx, _] : stream->ScanShards) {
+ RemoveCdcStreamScanShardStatus(db, pathId, shardIdx);
+ }
+
CdcStreams.erase(pathId);
DecrementPathDbRefCount(pathId);
}
@@ -3969,7 +3967,12 @@ void TSchemeShard::Die(const TActorContext &ctx) {
ctx.Send(SVPMigrator, new TEvents::TEvPoisonPill());
}
+ if (CdcStreamScanFinalizer) {
+ ctx.Send(CdcStreamScanFinalizer, new TEvents::TEvPoisonPill());
+ }
+
IndexBuildPipes.Shutdown(ctx);
+ CdcStreamScanPipes.Shutdown(ctx);
ShardDeleter.Shutdown(ctx);
ParentDomainLink.Shutdown(ctx);
@@ -4255,6 +4258,11 @@ void TSchemeShard::StateWork(STFUNC_SIG) {
HFuncTraced(TEvPrivate::TEvIndexBuildingMakeABill, Handle);
// } // NIndexBuilder
+ // namespace NCdcStreamScan {
+ HFuncTraced(TEvPrivate::TEvRunCdcStreamScan, Handle);
+ HFuncTraced(TEvDataShard::TEvCdcStreamScanResponse, Handle);
+ // } // NCdcStreamScan
+
// namespace NLongRunningCommon {
HFuncTraced(TEvTxAllocatorClient::TEvAllocateResult, Handle);
HFuncTraced(TEvSchemeShard::TEvModifySchemeTransactionResult, Handle);
@@ -4856,6 +4864,11 @@ void TSchemeShard::Handle(TEvTabletPipe::TEvClientConnected::TPtr &ev, const TAc
return;
}
+ if (CdcStreamScanPipes.Has(clientId)) {
+ Execute(CreatePipeRetry(CdcStreamScanPipes.GetOwnerId(clientId), CdcStreamScanPipes.GetTabletId(clientId)), ctx);
+ return;
+ }
+
if (ShardDeleter.Has(tabletId, clientId)) {
ShardDeleter.ResendDeleteRequests(TTabletId(ev->Get()->TabletId), ShardInfos, ctx);
return;
@@ -4900,6 +4913,11 @@ void TSchemeShard::Handle(TEvTabletPipe::TEvClientDestroyed::TPtr &ev, const TAc
return;
}
+ if (CdcStreamScanPipes.Has(clientId)) {
+ Execute(CreatePipeRetry(CdcStreamScanPipes.GetOwnerId(clientId), CdcStreamScanPipes.GetTabletId(clientId)), ctx);
+ return;
+ }
+
if (ShardDeleter.Has(tabletId, clientId)) {
ShardDeleter.ResendDeleteRequests(tabletId, ShardInfos, ctx);
return;
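Both pipe handlers above route a broken client back to its owning subsystem: if the pipe belongs to CdcStreamScanPipes, a pipe-retry transaction re-queues the affected shard. A rough sketch of that routing decision with a hypothetical, simplified registry (not the real TDedicatedPipePool API):

```cpp
#include <cstdint>
#include <map>
#include <optional>

// Hypothetical stand-in for the pipe pool: maps a pipe client id to the
// stream/tablet pair it was opened for.
struct TPipeRegistry {
    struct TTarget {
        uint64_t StreamPathId;
        uint64_t TabletId;
    };
    std::map<uint64_t, TTarget> ByClientId;

    std::optional<TTarget> Find(uint64_t clientId) const {
        auto it = ByClientId.find(clientId);
        if (it == ByClientId.end()) {
            return std::nullopt;
        }
        return it->second;
    }
};

// On TEvClientConnected(error) / TEvClientDestroyed the schemeshard asks each
// pipe owner in turn; if the CDC-scan pool owns the pipe, a pipe-retry
// transaction re-queues the shard (see OnPipeRetry in the new file above).
bool RouteBrokenPipe(const TPipeRegistry& cdcScanPipes, uint64_t clientId) {
    if (auto target = cdcScanPipes.Find(clientId)) {
        // Execute(CreatePipeRetry(target->StreamPathId, target->TabletId), ctx);
        return true;
    }
    // fall through to the other owners (index build pipes, shard deleter, ...)
    return false;
}
```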
diff --git a/ydb/core/tx/schemeshard/schemeshard_impl.h b/ydb/core/tx/schemeshard/schemeshard_impl.h
index 5568c57df74..d53c3c090dc 100644
--- a/ydb/core/tx/schemeshard/schemeshard_impl.h
+++ b/ydb/core/tx/schemeshard/schemeshard_impl.h
@@ -270,6 +270,7 @@ public:
TActorId SysPartitionStatsCollector;
TActorId SVPMigrator;
+ TActorId CdcStreamScanFinalizer;
TDuration StatsMaxExecuteTime;
TDuration StatsBatchTimeout;
@@ -753,13 +754,15 @@ public:
struct TTxInitTenantSchemeShard;
NTabletFlatExecutor::ITransaction* CreateTxInitTenantSchemeShard(TEvSchemeShard::TEvInitTenantSchemeShard::TPtr &ev);
- void ActivateAfterInitialization(const TActorContext &ctx,
- TSideEffects::TPublications&& delayPublications = {},
- const TVector<ui64>& exportIds = {},
- const TVector<ui64>& importsIds = {},
- TVector<TPathId>&& tablesToClean = {},
- TDeque<TPathId>&& blockStoreVolumesToClean = {}
- );
+ struct TActivationOpts {
+ TSideEffects::TPublications DelayPublications;
+ TVector<ui64> ExportIds;
+ TVector<ui64> ImportsIds;
+ TVector<TPathId> CdcStreamScans;
+ TVector<TPathId> TablesToClean;
+ TDeque<TPathId> BlockStoreVolumesToClean;
+ };
+ void ActivateAfterInitialization(const TActorContext& ctx, TActivationOpts&& opts);
struct TTxInitPopulator;
NTabletFlatExecutor::ITransaction* CreateTxInitPopulator(TSideEffects::TPublications&& publications);
@@ -1195,7 +1198,6 @@ public:
NTabletFlatExecutor::ITransaction* CreatePipeRetry(TIndexBuildId indexBuildId, TTabletId tabletId);
NTabletFlatExecutor::ITransaction* CreateTxBilling(TEvPrivate::TEvIndexBuildingMakeABill::TPtr& ev);
-
void Handle(TEvIndexBuilder::TEvCreateRequest::TPtr& ev, const TActorContext& ctx);
void Handle(TEvIndexBuilder::TEvGetRequest::TPtr& ev, const TActorContext& ctx);
void Handle(TEvIndexBuilder::TEvCancelRequest::TPtr& ev, const TActorContext& ctx);
@@ -1211,6 +1213,27 @@ public:
// } //NIndexBuilder
+ // namespace NCdcStreamScan {
+ struct TCdcStreamScan {
+ struct TTxProgress;
+ };
+
+ TDedicatedPipePool<TPathId> CdcStreamScanPipes;
+
+ NTabletFlatExecutor::ITransaction* CreateTxProgressCdcStreamScan(TEvPrivate::TEvRunCdcStreamScan::TPtr& ev);
+ NTabletFlatExecutor::ITransaction* CreateTxProgressCdcStreamScan(TEvDataShard::TEvCdcStreamScanResponse::TPtr& ev);
+ NTabletFlatExecutor::ITransaction* CreatePipeRetry(const TPathId& streamPathId, TTabletId tabletId);
+
+ void Handle(TEvPrivate::TEvRunCdcStreamScan::TPtr& ev, const TActorContext& ctx);
+ void Handle(TEvDataShard::TEvCdcStreamScanResponse::TPtr& ev, const TActorContext& ctx);
+
+ void ResumeCdcStreamScans(const TVector<TPathId>& ids, const TActorContext& ctx);
+
+ void PersistCdcStreamScanShardStatus(NIceDb::TNiceDb& db, const TPathId& streamPathId, const TShardIdx& shardIdx,
+ const TCdcStreamInfo::TShardStatus& status);
+ void RemoveCdcStreamScanShardStatus(NIceDb::TNiceDb& db, const TPathId& streamPathId, const TShardIdx& shardIdx);
+ // } // NCdcStreamScan
+
public:
void ChangeStreamShardsCount(i64 delta) override;
void ChangeStreamShardsQuota(i64 delta) override;
diff --git a/ydb/core/tx/schemeshard/schemeshard_info_types.h b/ydb/core/tx/schemeshard/schemeshard_info_types.h
index 4292ead6b92..304b441cd13 100644
--- a/ydb/core/tx/schemeshard/schemeshard_info_types.h
+++ b/ydb/core/tx/schemeshard/schemeshard_info_types.h
@@ -2309,6 +2309,17 @@ struct TCdcStreamInfo : public TSimpleRefCount<TCdcStreamInfo> {
using EFormat = NKikimrSchemeOp::ECdcStreamFormat;
using EState = NKikimrSchemeOp::ECdcStreamState;
+ // scan status of the table's shards
+ struct TShardStatus {
+ NKikimrTxDataShard::TEvCdcStreamScanResponse::EStatus Status;
+
+ explicit TShardStatus(NKikimrTxDataShard::TEvCdcStreamScanResponse::EStatus status)
+ : Status(status)
+ {}
+ };
+
+ static constexpr ui32 MaxInProgressShards = 10;
+
TCdcStreamInfo(ui64 version, EMode mode, EFormat format, bool vt, EState state)
: AlterVersion(version)
, Mode(mode)
@@ -2349,6 +2360,11 @@ struct TCdcStreamInfo : public TSimpleRefCount<TCdcStreamInfo> {
EState State;
TCdcStreamInfo::TPtr AlterData = nullptr;
+
+ TMap<TShardIdx, TShardStatus> ScanShards;
+ THashSet<TShardIdx> PendingShards;
+ THashSet<TShardIdx> InProgressShards;
+ THashSet<TShardIdx> DoneShards;
};
struct TSequenceInfo : public TSimpleRefCount<TSequenceInfo> {
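The four new members of TCdcStreamInfo carry the scan bookkeeping: ScanShards records the last reported status per shard, while PendingShards, InProgressShards and DoneShards partition those shards by where they are in the scan; finalization starts once DoneShards covers ScanShards. A small debug-style sketch of that invariant using plain STL containers instead of TMap/THashSet:

```cpp
#include <cstdint>
#include <map>
#include <set>

struct TScanBookkeeping {
    std::map<uint64_t, int> ScanShards;      // shardIdx -> last reported status
    std::set<uint64_t> PendingShards;
    std::set<uint64_t> InProgressShards;
    std::set<uint64_t> DoneShards;

    // Invariant implied by the code above: the three sets partition ScanShards.
    bool Consistent() const {
        if (PendingShards.size() + InProgressShards.size() + DoneShards.size() != ScanShards.size()) {
            return false;
        }
        for (const auto& [idx, status] : ScanShards) {
            const auto hits = PendingShards.count(idx) + InProgressShards.count(idx) + DoneShards.count(idx);
            if (hits != 1) {
                return false;
            }
        }
        return true;
    }

    bool ScanComplete() const {
        return !ScanShards.empty() && DoneShards.size() == ScanShards.size();
    }
};
```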
diff --git a/ydb/core/tx/schemeshard/schemeshard_private.h b/ydb/core/tx/schemeshard/schemeshard_private.h
index feace051751..6a2171865c5 100644
--- a/ydb/core/tx/schemeshard/schemeshard_private.h
+++ b/ydb/core/tx/schemeshard/schemeshard_private.h
@@ -26,6 +26,7 @@ struct TEvPrivate {
EvCompleteBarrier,
EvPersistStats,
EvConsoleConfigsTimeout,
+ EvRunCdcStreamScan,
EvEnd
};
@@ -165,6 +166,14 @@ struct TEvPrivate {
struct TEvConsoleConfigsTimeout: public TEventLocal<TEvConsoleConfigsTimeout, EvConsoleConfigsTimeout> {
};
+ struct TEvRunCdcStreamScan: public TEventLocal<TEvRunCdcStreamScan, EvRunCdcStreamScan> {
+ const TPathId StreamPathId;
+
+ TEvRunCdcStreamScan(const TPathId& streamPathId)
+ : StreamPathId(streamPathId)
+ {}
+ };
+
}; // TEvPrivate
} // NSchemeShard
diff --git a/ydb/core/tx/schemeshard/schemeshard_schema.h b/ydb/core/tx/schemeshard/schemeshard_schema.h
index ed451025d63..007959c4860 100644
--- a/ydb/core/tx/schemeshard/schemeshard_schema.h
+++ b/ydb/core/tx/schemeshard/schemeshard_schema.h
@@ -1552,6 +1552,26 @@ struct Schema : NIceDb::Schema {
using TColumns = TableColumns<OwnerPathId, LocalPathId, AlterVersion, State, Mode, Format, VirtualTimestamps>;
};
+ struct CdcStreamScanShardStatus : Table<103> {
+ // path id of cdc stream
+ struct OwnerPathId : Column<1, NScheme::NTypeIds::Uint64> { using Type = TOwnerId; };
+ struct LocalPathId : Column<2, NScheme::NTypeIds::Uint64> { using Type = TLocalPathId; };
+ // shard idx of datashard
+ struct OwnerShardIdx : Column<3, NScheme::NTypeIds::Uint64> { using Type = TOwnerId; };
+ struct LocalShardIdx : Column<4, NScheme::NTypeIds::Uint64> { using Type = TLocalShardIdx; };
+
+ struct Status : Column<5, NScheme::NTypeIds::Uint32> { using Type = NKikimrTxDataShard::TEvCdcStreamScanResponse::EStatus; };
+
+ using TKey = TableKey<OwnerPathId, LocalPathId, OwnerShardIdx, LocalShardIdx>;
+ using TColumns = TableColumns<
+ OwnerPathId,
+ LocalPathId,
+ OwnerShardIdx,
+ LocalShardIdx,
+ Status
+ >;
+ };
+
struct Sequences : Table<97> {
struct PathId : Column<1, NScheme::NTypeIds::Uint64> { using Type = TLocalPathId; };
struct AlterVersion : Column<2, NScheme::NTypeIds::Uint64> {};
@@ -1700,7 +1720,8 @@ struct Schema : NIceDb::Schema {
SequencesAlters,
Replications,
ReplicationsAlterData,
- BlobDepots
+ BlobDepots,
+ CdcStreamScanShardStatus
>;
static constexpr ui64 SysParam_NextPathId = 1;
diff --git a/ydb/core/tx/schemeshard/ut_cdc_stream.cpp b/ydb/core/tx/schemeshard/ut_cdc_stream.cpp
index e4a40a535aa..c9225677822 100644
--- a/ydb/core/tx/schemeshard/ut_cdc_stream.cpp
+++ b/ydb/core/tx/schemeshard/ut_cdc_stream.cpp
@@ -886,6 +886,7 @@ Y_UNIT_TEST_SUITE(TCdcStreamWithInitialScanTests) {
TTestEnv env(runtime, TTestEnvOptions()
.EnableProtoSourceIdInfo(true)
.EnableChangefeedInitialScan(true));
+ runtime.GetAppData().DisableCdcAutoSwitchingToReadyStateForTests = true;
ui64 txId = 100;
TestCreateTable(runtime, ++txId, "/MyRoot", R"(
@@ -920,6 +921,13 @@ Y_UNIT_TEST_SUITE(TCdcStreamWithInitialScanTests) {
TestModificationResult(runtime, txId, expectedStatus);
};
+ // try to disable
+ testAlterCdcStream(++txId, "/MyRoot", R"(
+ TableName: "Table"
+ StreamName: "Stream"
+ Disable {}
+ )", lockTxId, NKikimrScheme::StatusPreconditionFailed);
+
// without guard & lockTxId
testAlterCdcStream(++txId, "/MyRoot", R"(
TableName: "Table"
@@ -979,6 +987,7 @@ Y_UNIT_TEST_SUITE(TCdcStreamWithInitialScanTests) {
TTestEnv env(runtime, TTestEnvOptions()
.EnableProtoSourceIdInfo(true)
.EnableChangefeedInitialScan(true));
+ runtime.GetAppData().DisableCdcAutoSwitchingToReadyStateForTests = true;
ui64 txId = 100;
TestCreateTable(runtime, ++txId, "/MyRoot", R"(
diff --git a/ydb/core/tx/schemeshard/ut_cdc_stream_reboots.cpp b/ydb/core/tx/schemeshard/ut_cdc_stream_reboots.cpp
index d067dccb5af..a90a8b6705b 100644
--- a/ydb/core/tx/schemeshard/ut_cdc_stream_reboots.cpp
+++ b/ydb/core/tx/schemeshard/ut_cdc_stream_reboots.cpp
@@ -12,6 +12,8 @@ Y_UNIT_TEST_SUITE(TCdcStreamWithRebootsTests) {
t.Run([&](TTestActorRuntime& runtime, bool& activeZone) {
{
TInactiveZone inactive(activeZone);
+ runtime.GetAppData().DisableCdcAutoSwitchingToReadyStateForTests = true;
+
TestCreateTable(runtime, ++t.TxId, "/MyRoot", R"(
Name: "Table"
Columns { Name: "key" Type: "Uint64" }
@@ -43,7 +45,6 @@ Y_UNIT_TEST_SUITE(TCdcStreamWithRebootsTests) {
TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/Stream"), {
NLs::PathExist,
- NLs::StreamState(state.GetOrElse(NKikimrSchemeOp::ECdcStreamStateReady)),
NLs::StreamVirtualTimestamps(vt),
});
});
@@ -123,6 +124,8 @@ Y_UNIT_TEST_SUITE(TCdcStreamWithRebootsTests) {
t.Run([&](TTestActorRuntime& runtime, bool& activeZone) {
{
TInactiveZone inactive(activeZone);
+ runtime.GetAppData().DisableCdcAutoSwitchingToReadyStateForTests = true;
+
TestCreateTable(runtime, ++t.TxId, "/MyRoot", R"(
Name: "Table"
Columns { Name: "key" Type: "Uint64" }
@@ -182,6 +185,8 @@ Y_UNIT_TEST_SUITE(TCdcStreamWithRebootsTests) {
t.Run([&](TTestActorRuntime& runtime, bool& activeZone) {
{
TInactiveZone inactive(activeZone);
+ runtime.GetAppData().DisableCdcAutoSwitchingToReadyStateForTests = true;
+
TestCreateTable(runtime, ++t.TxId, "/MyRoot", R"(
Name: "Table"
Columns { Name: "key" Type: "Uint64" }
diff --git a/ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_datashard_/flat_datashard.schema b/ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_datashard_/flat_datashard.schema
index 20d676d546b..73c678da014 100644
--- a/ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_datashard_/flat_datashard.schema
+++ b/ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_datashard_/flat_datashard.schema
@@ -1814,6 +1814,68 @@
}
},
{
+ "TableId": 34,
+ "TableName": "CdcStreamScans",
+ "TableKey": [
+ 1,
+ 2,
+ 3,
+ 4
+ ],
+ "ColumnsAdded": [
+ {
+ "ColumnId": 1,
+ "ColumnName": "TableOwnerId",
+ "ColumnType": "Uint64"
+ },
+ {
+ "ColumnId": 2,
+ "ColumnName": "TablePathId",
+ "ColumnType": "Uint64"
+ },
+ {
+ "ColumnId": 3,
+ "ColumnName": "StreamOwnerId",
+ "ColumnType": "Uint64"
+ },
+ {
+ "ColumnId": 4,
+ "ColumnName": "StreamPathId",
+ "ColumnType": "Uint64"
+ },
+ {
+ "ColumnId": 5,
+ "ColumnName": "LastKey",
+ "ColumnType": "String"
+ }
+ ],
+ "ColumnsDropped": [],
+ "ColumnFamilies": {
+ "0": {
+ "Columns": [
+ 1,
+ 2,
+ 3,
+ 4,
+ 5
+ ],
+ "RoomID": 0,
+ "Codec": 0,
+ "InMemory": false,
+ "Cache": 0,
+ "Small": 4294967295,
+ "Large": 4294967295
+ }
+ },
+ "Rooms": {
+ "0": {
+ "Main": 1,
+ "Outer": 1,
+ "Blobs": 1
+ }
+ }
+ },
+ {
"TableId": 101,
"TableName": "LockChangeRecords",
"TableKey": [
diff --git a/ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_schemeshard_/flat_schemeshard.schema b/ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_schemeshard_/flat_schemeshard.schema
index 771cf1fd6f3..93f379652fa 100644
--- a/ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_schemeshard_/flat_schemeshard.schema
+++ b/ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_schemeshard_/flat_schemeshard.schema
@@ -6708,5 +6708,67 @@
"Blobs": 1
}
}
+ },
+ {
+ "TableId": 103,
+ "TableName": "CdcStreamScanShardStatus",
+ "TableKey": [
+ 1,
+ 2,
+ 3,
+ 4
+ ],
+ "ColumnsAdded": [
+ {
+ "ColumnId": 1,
+ "ColumnName": "OwnerPathId",
+ "ColumnType": "Uint64"
+ },
+ {
+ "ColumnId": 2,
+ "ColumnName": "LocalPathId",
+ "ColumnType": "Uint64"
+ },
+ {
+ "ColumnId": 3,
+ "ColumnName": "OwnerShardIdx",
+ "ColumnType": "Uint64"
+ },
+ {
+ "ColumnId": 4,
+ "ColumnName": "LocalShardIdx",
+ "ColumnType": "Uint64"
+ },
+ {
+ "ColumnId": 5,
+ "ColumnName": "Status",
+ "ColumnType": "Uint32"
+ }
+ ],
+ "ColumnsDropped": [],
+ "ColumnFamilies": {
+ "0": {
+ "Columns": [
+ 1,
+ 2,
+ 3,
+ 4,
+ 5
+ ],
+ "RoomID": 0,
+ "Codec": 0,
+ "InMemory": false,
+ "Cache": 0,
+ "Small": 4294967295,
+ "Large": 4294967295
+ }
+ },
+ "Rooms": {
+ "0": {
+ "Main": 1,
+ "Outer": 1,
+ "Blobs": 1
+ }
+ }
}
] \ No newline at end of file