author     ilnaz <[email protected]>    2023-01-23 12:27:43 +0300
committer  ilnaz <[email protected]>    2023-01-23 12:27:43 +0300
commit     006419e12383729f3d660ad51f5a84832d6e3780
tree       f33f2657602af967e75c639749eb3b89974fb6eb
parent     915620f6a01dbf2419d64239fc7014076c5b911b
Initial scan implementation
39 files changed, 1728 insertions, 58 deletions
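
Before the patch body, a condensed usage sketch of the feature this commit introduces, assembled from the `InitialScan` unit test added below in `datashard_ut_change_exchange.cpp`. Every helper used here (`SimpleTable`, `Updates`, `WithInitialScan`, `AsyncAlterAddStream`, `WaitForContent`) comes from the datashard test harness touched by this diff; treat the snippet as an illustration of the intended flow, not standalone sample code.

```cpp
// Condensed from the InitialScan test below. The server must be started with
// EnableChangefeedInitialScan; WithInitialScan() puts the new stream into the
// Scan state, so pre-existing rows are emitted from a snapshot before live updates.
TPortManager portManager;
TServer::TPtr server = new TServer(TServerSettings(portManager.GetPort(2134), {}, DefaultPQConfig())
    .SetUseRealThreads(false)
    .SetDomainName("Root")
    .SetEnableChangefeedInitialScan(true)
);

auto& runtime = *server->GetRuntime();
const auto edgeActor = runtime.AllocateEdgeActor();
InitRoot(server, edgeActor);
CreateShardedTable(server, edgeActor, "/Root", "Table", SimpleTable());

// Rows that exist before the changefeed is created ...
ExecSQL(server, edgeActor, R"(
    UPSERT INTO `/Root/Table` (key, value) VALUES (1, 10), (2, 20), (3, 30);
)");

// ... are delivered by the initial scan once the stream is added.
WaitTxNotification(server, edgeActor, AsyncAlterAddStream(server, "/Root", "Table",
    WithInitialScan(Updates(NKikimrSchemeOp::ECdcStreamFormatJson))));

WaitForContent(server, edgeActor, "/Root/Table/Stream", {
    R"({"update":{"value":10},"key":[1]})",
    R"({"update":{"value":20},"key":[2]})",
    R"({"update":{"value":30},"key":[3]})",
});
```

Internally, the schemeshard drives the scan per shard with the new `TEvCdcStreamScanRequest`/`TEvCdcStreamScanResponse` pair, the datashard executes it through the resource broker's new `queue_cdc_initial_scan` queue (`cdc_initial_scan` task), and once every shard reports `DONE` the stream is switched from `Scan` to `Ready` via `AlterCdcStream`/`GetReady`.
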
diff --git a/ydb/core/base/appdata.h b/ydb/core/base/appdata.h index d71dbedd5e3..0eb5954ed40 100644 --- a/ydb/core/base/appdata.h +++ b/ydb/core/base/appdata.h @@ -148,6 +148,7 @@ struct TAppData { bool AllowShadowDataInSchemeShardForTests = false; bool EnableMvccSnapshotWithLegacyDomainRoot = false; bool UsePartitionStatsCollectorForTests = false; + bool DisableCdcAutoSwitchingToReadyStateForTests = false; TVector<TString> AdministrationAllowedSIDs; // users/groups which allowed to perform administrative tasks TVector<TString> DefaultUserSIDs; TString AllAuthenticatedUsers = "all-users@well-known"; diff --git a/ydb/core/protos/config.proto b/ydb/core/protos/config.proto index b5ee255ef7e..eded529f541 100644 --- a/ydb/core/protos/config.proto +++ b/ydb/core/protos/config.proto @@ -1522,6 +1522,8 @@ message TDataShardConfig { optional uint64 TtlReadAheadHi = 14 [default = 1048576]; optional uint64 IdleMemCompactionIntervalSeconds = 15 [default = 60]; optional uint64 RestoreReadBufferSizeLimit = 16 [default = 268435456]; // 256 MB + optional string CdcInitialScanTaskName = 17 [default = "cdc_initial_scan"]; + optional uint32 CdcInitialScanTaskPriority = 18 [default = 10]; } message TSchemeShardConfig { diff --git a/ydb/core/protos/counters_datashard.proto b/ydb/core/protos/counters_datashard.proto index 3c61ce420d9..1b3ac6a39ab 100644 --- a/ydb/core/protos/counters_datashard.proto +++ b/ydb/core/protos/counters_datashard.proto @@ -449,4 +449,6 @@ enum ETxTypes { TXTYPE_REMOVE_LOCK_CHANGE_RECORDS = 72 [(TxTypeOpts) = {Name: "TxRemoveLockChangeRecords"}]; TXTYPE_VOLATILE_TX_COMMIT = 73 [(TxTypeOpts) = {Name: "TxVolatileTxCommit"}]; TXTYPE_VOLATILE_TX_ABORT = 74 [(TxTypeOpts) = {Name: "TxVolatileTxAbort"}]; + TXTYPE_CDC_STREAM_SCAN_RUN = 75 [(TxTypeOpts) = {Name: "TTxCdcStreamScanRun"}]; + TXTYPE_CDC_STREAM_SCAN_PROGRESS = 76 [(TxTypeOpts) = {Name: "TTxCdcStreamScanProgress"}]; } diff --git a/ydb/core/protos/counters_schemeshard.proto b/ydb/core/protos/counters_schemeshard.proto index f3bc4c20bba..9c3ed4d8797 100644 --- a/ydb/core/protos/counters_schemeshard.proto +++ b/ydb/core/protos/counters_schemeshard.proto @@ -483,4 +483,6 @@ enum ETxTypes { TXTYPE_BLOB_DEPOT_CONFIG_RESULT = 80 [(TxTypeOpts) = {Name: "TxBlobDepotConfigResult"}]; TXTYPE_ADD_BACKGROUND_TASK_RESULT = 81 [(TxTypeOpts) = {Name: "TxAddBackgroundTaskResult"}]; + + TXTYPE_CDC_STREAM_SCAN_PROGRESS = 82 [(TxTypeOpts) = {Name: "TxCdcStreamScanProgress"}]; } diff --git a/ydb/core/protos/out/out.cpp b/ydb/core/protos/out/out.cpp index 6e05ddb6f11..08ba6b36949 100644 --- a/ydb/core/protos/out/out.cpp +++ b/ydb/core/protos/out/out.cpp @@ -161,6 +161,10 @@ Y_DECLARE_OUT_SPEC(, NKikimrTxDataShard::TEvCompactTableResult::EStatus, stream, stream << NKikimrTxDataShard::TEvCompactTableResult::EStatus_Name(value); } +Y_DECLARE_OUT_SPEC(, NKikimrTxDataShard::TEvCdcStreamScanResponse::EStatus, stream, value) { + stream << NKikimrTxDataShard::TEvCdcStreamScanResponse::EStatus_Name(value); +} + Y_DECLARE_OUT_SPEC(, NKikimrKqp::EQueryAction, stream, value) { stream << NKikimrKqp::EQueryAction_Name(value); } diff --git a/ydb/core/protos/services.proto b/ydb/core/protos/services.proto index 3d241f4acb0..a5b9d59c624 100644 --- a/ydb/core/protos/services.proto +++ b/ydb/core/protos/services.proto @@ -960,5 +960,7 @@ message TActivity { METADATA_SCHEME_DESCRIPTION_ACTOR = 599; SCHEMESHARD_BACKGROUND_COMPACTION = 600; SCHEMESHARD_BORROWED_COMPACTION = 601; + CDC_STREAM_SCAN_ACTOR = 602; + SCHEMESHARD_CDC_STREAM_SCAN_FINALIZER = 603; }; }; diff --git 
a/ydb/core/protos/tx_datashard.proto b/ydb/core/protos/tx_datashard.proto index ed3a9e25a35..450c6635cf0 100644 --- a/ydb/core/protos/tx_datashard.proto +++ b/ydb/core/protos/tx_datashard.proto @@ -1,4 +1,3 @@ - option cc_enable_arenas = true; import "library/cpp/actors/protos/actors.proto"; @@ -1417,6 +1416,40 @@ message TEvBuildIndexProgressResponse { optional uint64 RequestSeqNoRound = 13; } +message TEvCdcStreamScanRequest { + message TLimits { + optional uint32 BatchMaxBytes = 1 [default = 512000]; + optional uint32 BatchMinRows = 2 [default = 10]; + optional uint32 BatchMaxRows = 3 [default = 1000]; + }; + + optional NKikimrProto.TPathID TablePathId = 1; // which table should be scanned + optional uint64 TableSchemaVersion = 2; + optional NKikimrProto.TPathID StreamPathId = 3; + optional uint64 SnapshotStep = 4; + optional uint64 SnapshotTxId = 5; + optional TLimits Limits = 6; +} + +message TEvCdcStreamScanResponse { + enum EStatus { + PENDING = 0; + ACCEPTED = 1; + IN_PROGRESS = 2; + DONE = 3; + BAD_REQUEST = 4; + SCHEME_ERROR = 5; + OVERLOADED = 6; + ABORTED = 7; + } + + optional uint64 TabletId = 1; + optional NKikimrProto.TPathID TablePathId = 2; + optional NKikimrProto.TPathID StreamPathId = 3; + optional EStatus Status = 4; + optional string ErrorDescription = 5; +} + message TEvKqpScan { optional uint64 TxId = 1; optional uint64 ScanId = 2; diff --git a/ydb/core/tablet/resource_broker.cpp b/ydb/core/tablet/resource_broker.cpp index 4ff2abcd099..1124484e954 100644 --- a/ydb/core/tablet/resource_broker.cpp +++ b/ydb/core/tablet/resource_broker.cpp @@ -1350,6 +1350,11 @@ NKikimrResourceBroker::TResourceBrokerConfig MakeDefaultConfig() queue->SetWeight(100); queue->MutableLimit()->SetCpu(1); + queue = config.AddQueues(); + queue->SetName("queue_cdc_initial_scan"); + queue->SetWeight(100); + queue->MutableLimit()->SetCpu(4); + auto task = config.AddTasks(); task->SetName(NLocalDb::UnknownTaskName); task->SetQueueName(NLocalDb::DefaultQueueName); @@ -1445,6 +1450,11 @@ NKikimrResourceBroker::TResourceBrokerConfig MakeDefaultConfig() task->SetQueueName("queue_datashard_build_stats"); task->SetDefaultDuration(TDuration::Seconds(5).GetValue()); + task = config.AddTasks(); + task->SetName("cdc_initial_scan"); + task->SetQueueName("queue_cdc_initial_scan"); + task->SetDefaultDuration(TDuration::Minutes(10).GetValue()); + config.MutableResourceLimit()->SetCpu(TotalCPU); config.MutableResourceLimit()->SetMemory(TotalMemory); diff --git a/ydb/core/tx/datashard/CMakeLists.darwin.txt b/ydb/core/tx/datashard/CMakeLists.darwin.txt index 22870fc7545..10c3d2f14db 100644 --- a/ydb/core/tx/datashard/CMakeLists.darwin.txt +++ b/ydb/core/tx/datashard/CMakeLists.darwin.txt @@ -89,6 +89,7 @@ target_sources(core-tx-datashard PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/build_distributed_erase_tx_out_rs_unit.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/build_kqp_data_tx_out_rs_unit.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/build_scheme_tx_out_rs_unit.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/cdc_stream_scan.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/change_collector_async_index.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/change_collector_base.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/change_collector_cdc_stream.cpp diff --git a/ydb/core/tx/datashard/CMakeLists.linux-aarch64.txt b/ydb/core/tx/datashard/CMakeLists.linux-aarch64.txt index d8d6b37b56e..9360c5258ab 100644 --- a/ydb/core/tx/datashard/CMakeLists.linux-aarch64.txt +++ b/ydb/core/tx/datashard/CMakeLists.linux-aarch64.txt 
@@ -90,6 +90,7 @@ target_sources(core-tx-datashard PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/build_distributed_erase_tx_out_rs_unit.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/build_kqp_data_tx_out_rs_unit.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/build_scheme_tx_out_rs_unit.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/cdc_stream_scan.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/change_collector_async_index.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/change_collector_base.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/change_collector_cdc_stream.cpp diff --git a/ydb/core/tx/datashard/CMakeLists.linux.txt b/ydb/core/tx/datashard/CMakeLists.linux.txt index d8d6b37b56e..9360c5258ab 100644 --- a/ydb/core/tx/datashard/CMakeLists.linux.txt +++ b/ydb/core/tx/datashard/CMakeLists.linux.txt @@ -90,6 +90,7 @@ target_sources(core-tx-datashard PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/build_distributed_erase_tx_out_rs_unit.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/build_kqp_data_tx_out_rs_unit.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/build_scheme_tx_out_rs_unit.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/cdc_stream_scan.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/change_collector_async_index.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/change_collector_base.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/change_collector_cdc_stream.cpp diff --git a/ydb/core/tx/datashard/alter_cdc_stream_unit.cpp b/ydb/core/tx/datashard/alter_cdc_stream_unit.cpp index 595405245e2..3f3451601e5 100644 --- a/ydb/core/tx/datashard/alter_cdc_stream_unit.cpp +++ b/ydb/core/tx/datashard/alter_cdc_stream_unit.cpp @@ -68,6 +68,15 @@ public: DataShard.AddSchemaSnapshot(pathId, version, op->GetStep(), op->GetTxId(), txc, ctx); } + auto& scanManager = DataShard.GetCdcStreamScanManager(); + scanManager.Forget(txc.DB, pathId, streamPathId); + if (scanManager.GetStreamPathId() == streamPathId) { + if (const auto scanId = scanManager.GetScanId()) { + DataShard.CancelScan(tableInfo->LocalTid, scanId); + } + scanManager.Clear(); + } + BuildResult(op, NKikimrTxDataShard::TEvProposeTransactionResult::COMPLETE); op->Result()->SetStepOrderId(op->GetStepOrder().ToPair()); diff --git a/ydb/core/tx/datashard/cdc_stream_scan.cpp b/ydb/core/tx/datashard/cdc_stream_scan.cpp new file mode 100644 index 00000000000..2ed87e17375 --- /dev/null +++ b/ydb/core/tx/datashard/cdc_stream_scan.cpp @@ -0,0 +1,596 @@ +#include "change_record_body_serializer.h" +#include "datashard_impl.h" + +#include <ydb/core/scheme/scheme_tablecell.h> + +#include <util/generic/maybe.h> +#include <util/string/builder.h> + +#define LOG_D(stream) LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, "[CdcStreamScan] " << stream) +#define LOG_I(stream) LOG_INFO_S(ctx, NKikimrServices::TX_DATASHARD, "[CdcStreamScan] " << stream) +#define LOG_W(stream) LOG_WARN_S(ctx, NKikimrServices::TX_DATASHARD, "[CdcStreamScan] " << stream) + +namespace NKikimr::NDataShard { + +using namespace NActors; +using namespace NTable; +using namespace NTabletFlatExecutor; + +TCdcStreamScanManager::TCdcStreamScanManager() { + Clear(); +} + +void TCdcStreamScanManager::Enqueue(ui64 txId, ui64 scanId, const TPathId& streamPathId) { + TxId = txId; + ScanId = scanId; + StreamPathId = streamPathId; +} + +void TCdcStreamScanManager::Register(const TActorId& actorId) { + ActorId = actorId; +} + +void TCdcStreamScanManager::Clear() { + TxId = 0; + ScanId = 0; + ActorId = TActorId(); + StreamPathId = TPathId(); +} + +void TCdcStreamScanManager::Forget(NTable::TDatabase& db, 
const TPathId& tablePathId, const TPathId& streamPathId) { + NIceDb::TNiceDb nicedb(db); + RemoveLastKey(nicedb, tablePathId, streamPathId); +} + +void TCdcStreamScanManager::PersistLastKey(NIceDb::TNiceDb& db, const TSerializedCellVec& value, + const TPathId& tablePathId, const TPathId& streamPathId) +{ + using Schema = TDataShard::Schema; + db.Table<Schema::CdcStreamScans>() + .Key(tablePathId.OwnerId, tablePathId.LocalPathId, streamPathId.OwnerId, streamPathId.LocalPathId) + .Update<Schema::CdcStreamScans::LastKey>(value.GetBuffer()); +} + +bool TCdcStreamScanManager::LoadLastKey(NIceDb::TNiceDb& db, TMaybe<TSerializedCellVec>& result, + const TPathId& tablePathId, const TPathId& streamPathId) +{ + using Schema = TDataShard::Schema; + + auto rowset = db.Table<Schema::CdcStreamScans>() + .Key(tablePathId.OwnerId, tablePathId.LocalPathId, streamPathId.OwnerId, streamPathId.LocalPathId) + .Select(); + + if (!rowset.IsReady()) { + return false; + } + + if (rowset.IsValid()) { + result.ConstructInPlace(); + Y_VERIFY(TSerializedCellVec::TryParse(rowset.GetValue<Schema::CdcStreamScans::LastKey>(), *result)); + } + + return true; +} + +void TCdcStreamScanManager::RemoveLastKey(NIceDb::TNiceDb& db, + const TPathId& tablePathId, const TPathId& streamPathId) +{ + using Schema = TDataShard::Schema; + db.Table<Schema::CdcStreamScans>() + .Key(tablePathId.OwnerId, tablePathId.LocalPathId, streamPathId.OwnerId, streamPathId.LocalPathId) + .Delete(); +} + +class TDataShard::TTxCdcStreamScanProgress + : public TTransactionBase<TDataShard> + , protected TChangeRecordBodySerializer +{ + TDataShard::TEvPrivate::TEvCdcStreamScanProgress::TPtr Request; + THolder<TDataShard::TEvPrivate::TEvCdcStreamScanContinue> Response; + TVector<NMiniKQL::IChangeCollector::TChange> ChangeRecords; + + static TVector<TRawTypeValue> MakeKey(TArrayRef<const TCell> cells, TUserTable::TCPtr table) { + TVector<TRawTypeValue> key(Reserve(cells.size())); + + Y_VERIFY(cells.size() == table->KeyColumnTypes.size()); + for (TPos pos = 0; pos < cells.size(); ++pos) { + key.emplace_back(cells.at(pos).AsRef(), table->KeyColumnTypes.at(pos)); + } + + return key; + } + + static TVector<TUpdateOp> MakeUpdates(TArrayRef<const TCell> cells, TArrayRef<const TTag> tags, TUserTable::TCPtr table) { + TVector<TUpdateOp> updates(Reserve(cells.size())); + + Y_VERIFY(cells.size() == tags.size()); + for (TPos pos = 0; pos < cells.size(); ++pos) { + const auto tag = tags.at(pos); + auto it = table->Columns.find(tag); + Y_VERIFY(it != table->Columns.end()); + updates.emplace_back(tag, ECellOp::Set, TRawTypeValue(cells.at(pos).AsRef(), it->second.Type)); + } + + return updates; + } + + static TRowState MakeRow(TArrayRef<const TCell> cells) { + TRowState row(cells.size()); + + row.Touch(ERowOp::Upsert); + for (TPos pos = 0; pos < cells.size(); ++pos) { + row.Set(pos, ECellOp::Set, cells.at(pos)); + } + + return row; + } + +public: + explicit TTxCdcStreamScanProgress(TDataShard* self, TDataShard::TEvPrivate::TEvCdcStreamScanProgress::TPtr ev) + : TBase(self) + , Request(ev) + { + } + + TTxType GetTxType() const override { return TXTYPE_CDC_STREAM_SCAN_PROGRESS; } + + bool Execute(TTransactionContext& txc, const TActorContext& ctx) override { + const auto& ev = *Request->Get(); + const auto& tablePathId = ev.TablePathId; + const auto& streamPathId = ev.StreamPathId; + const auto& readVersion = ev.ReadVersion; + const auto& valueTags = ev.ValueTags; + + LOG_D("Progress" + << ": streamPathId# " << streamPathId); + + if (Self->CheckChangesQueueOverflow()) { + 
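// Change queue is overloaded: finish this Execute() without building a Response;
// Complete() below detects the missing Response and re-schedules the progress tx.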
return true; + } + + Y_VERIFY(Self->GetUserTables().contains(tablePathId.LocalPathId)); + auto table = Self->GetUserTables().at(tablePathId.LocalPathId); + + auto it = table->CdcStreams.find(streamPathId); + Y_VERIFY(it != table->CdcStreams.end()); + + NIceDb::TNiceDb db(txc.DB); + for (const auto& [k, v] : ev.Rows) { + const auto key = MakeKey(k.GetCells(), table); + const auto& keyTags = table->KeyColumnIds; + + TRowState row(0); + TSelectStats stats; + auto ready = txc.DB.Select(table->LocalTid, key, {}, row, stats, 0, readVersion); + if (ready == EReady::Page) { + return false; + } + + if (ready == EReady::Gone || stats.InvisibleRowSkips) { + continue; + } + + NKikimrChangeExchange::TChangeRecord::TDataChange body; + switch (it->second.Mode) { + case NKikimrSchemeOp::ECdcStreamModeKeysOnly: + Serialize(body, ERowOp::Upsert, key, keyTags, {}); + break; + case NKikimrSchemeOp::ECdcStreamModeUpdate: + Serialize(body, ERowOp::Upsert, key, keyTags, MakeUpdates(v.GetCells(), valueTags, table)); + break; + case NKikimrSchemeOp::ECdcStreamModeNewImage: + case NKikimrSchemeOp::ECdcStreamModeNewAndOldImages: { + const auto newImage = MakeRow(v.GetCells()); + Serialize(body, ERowOp::Upsert, key, keyTags, nullptr, &newImage, valueTags); + break; + } + case NKikimrSchemeOp::ECdcStreamModeOldImage: { + const auto oldImage = MakeRow(v.GetCells()); + Serialize(body, ERowOp::Upsert, key, keyTags, &oldImage, nullptr, valueTags); + break; + } + default: + Y_FAIL_S("Invalid stream mode: " << static_cast<ui32>(it->second.Mode)); + } + + auto record = TChangeRecordBuilder(TChangeRecord::EKind::CdcDataChange) + .WithOrder(Self->AllocateChangeRecordOrder(db)) + .WithGroup(0) + .WithStep(readVersion.Step) + .WithTxId(readVersion.TxId) + .WithPathId(streamPathId) + .WithTableId(tablePathId) + .WithSchemaVersion(table->GetTableSchemaVersion()) + .WithBody(body.SerializeAsString()) + .Build(); + + ChangeRecords.push_back(NMiniKQL::IChangeCollector::TChange{ + .Order = record.GetOrder(), + .Group = record.GetGroup(), + .Step = record.GetStep(), + .TxId = record.GetTxId(), + .PathId = record.GetPathId(), + .BodySize = record.GetBody().size(), + .TableId = record.GetTableId(), + .SchemaVersion = record.GetSchemaVersion(), + }); + + Self->PersistChangeRecord(db, record); + } + + if (ev.Rows) { + const auto& [key, _] = ev.Rows.back(); + Self->CdcStreamScanManager.PersistLastKey(db, key, tablePathId, streamPathId); + } + + Response = MakeHolder<TDataShard::TEvPrivate::TEvCdcStreamScanContinue>(); + return true; + } + + void Complete(const TActorContext& ctx) override { + if (Response) { + LOG_I("Enqueue " << ChangeRecords.size() << " change record(s)" + << ": streamPathId# " << Request->Get()->StreamPathId); + + Self->EnqueueChangeRecords(std::move(ChangeRecords)); + ctx.Send(Request->Sender, Response.Release()); + } else { + LOG_I("Re-run progress tx" + << ": streamPathId# " << Request->Get()->StreamPathId); + + // re-schedule tx + Self->Execute(new TDataShard::TTxCdcStreamScanProgress(Self, Request), ctx); + } + } + +}; // TTxCdcStreamScanProgress + +class TCdcStreamScan: public IActorCallback, public IScan { + struct TDataShardId { + TActorId ActorId; + ui64 TabletId; + }; + + struct TLimits { + ui32 BatchMaxBytes; + ui32 BatchMinRows; + ui32 BatchMaxRows; + + TLimits(const NKikimrTxDataShard::TEvCdcStreamScanRequest::TLimits& proto) + : BatchMaxBytes(proto.GetBatchMaxBytes()) + , BatchMinRows(proto.GetBatchMinRows()) + , BatchMaxRows(proto.GetBatchMaxRows()) + { + } + }; + + class TBuffer { + public: + void 
AddRow(TArrayRef<const TCell> key, TArrayRef<const TCell> value) { + const auto& [k, v] = Data.emplace_back( + TSerializedCellVec(TSerializedCellVec::Serialize(key)), + TSerializedCellVec(TSerializedCellVec::Serialize(value)) + ); + ByteSize += k.GetBuffer().size() + v.GetBuffer().size(); + } + + auto&& Flush() { + ByteSize = 0; + return std::move(Data); + } + + ui64 Bytes() const { + return ByteSize; + } + + ui64 Rows() const { + return Data.size(); + } + + explicit operator bool() const { + return !Data.empty(); + } + + private: + TVector<std::pair<TSerializedCellVec, TSerializedCellVec>> Data; // key & value (if any) + ui64 ByteSize = 0; + }; + + STATEFN(StateWork) { + switch (ev->GetTypeRewrite()) { + hFunc(TEvDataShard::TEvCdcStreamScanRequest, Handle); + hFunc(TDataShard::TEvPrivate::TEvCdcStreamScanContinue, Handle); + } + } + + void Handle(TEvDataShard::TEvCdcStreamScanRequest::TPtr& ev) { + ReplyTo = ev->Sender; + Reply(NKikimrTxDataShard::TEvCdcStreamScanResponse::IN_PROGRESS); + } + + void Handle(TDataShard::TEvPrivate::TEvCdcStreamScanContinue::TPtr&) { + Driver->Touch(NoMoreData ? EScan::Final : EScan::Feed); + } + + void Reply(NKikimrTxDataShard::TEvCdcStreamScanResponse::EStatus status, const TString& error = {}) { + auto response = MakeHolder<TEvDataShard::TEvCdcStreamScanResponse>(); + + response->Record.SetTabletId(DataShard.TabletId); + PathIdFromPathId(TablePathId, response->Record.MutableTablePathId()); + PathIdFromPathId(StreamPathId, response->Record.MutableStreamPathId()); + response->Record.SetStatus(status); + response->Record.SetErrorDescription(error); + + Send(ReplyTo, std::move(response)); + } + +public: + explicit TCdcStreamScan(const TDataShard* const self, const TActorId& replyTo, ui64 txId, + const TPathId& tablePathId, const TPathId& streamPathId, const TRowVersion& readVersion, + const TVector<TTag>& valueTags, const TMaybe<TSerializedCellVec>& lastKey, const TLimits& limits) + : IActorCallback(static_cast<TReceiveFunc>(&TCdcStreamScan::StateWork), NKikimrServices::TActivity::CDC_STREAM_SCAN_ACTOR) + , DataShard{self->SelfId(), self->TabletID()} + , ReplyTo(replyTo) + , TxId(txId) + , TablePathId(tablePathId) + , StreamPathId(streamPathId) + , ReadVersion(readVersion) + , ValueTags(valueTags) + , LastKey(lastKey) + , Limits(limits) + , Driver(nullptr) + , NoMoreData(false) + { + } + + void Describe(IOutputStream& o) const noexcept override { + o << "CdcStreamScan {" + << " TxId: " << TxId + << " TablePathId: " << TablePathId + << " StreamPathId: " << StreamPathId + << " }"; + } + + IScan::TInitialState Prepare(IDriver* driver, TIntrusiveConstPtr<TScheme> scheme) noexcept override { + TlsActivationContext->AsActorContext().RegisterWithSameMailbox(this); + Driver = driver; + Y_VERIFY(!LastKey || LastKey->GetCells().size() == scheme->Tags(true).size()); + return {EScan::Feed, {}}; + } + + void Registered(TActorSystem* sys, const TActorId&) override { + sys->Send(DataShard.ActorId, new TDataShard::TEvPrivate::TEvCdcStreamScanRegistered(TxId, SelfId())); + } + + EScan Seek(TLead& lead, ui64) noexcept override { + if (LastKey) { + lead.To(ValueTags, LastKey->GetCells(), ESeek::Upper); + } else { + lead.To(ValueTags, {}, ESeek::Lower); + } + + return EScan::Feed; + } + + EScan Feed(TArrayRef<const TCell> key, const TRow& row) noexcept override { + Buffer.AddRow(key, *row); + if (Buffer.Bytes() < Limits.BatchMaxBytes) { + if (Buffer.Rows() < Limits.BatchMaxRows) { + return EScan::Feed; + } + } else { + if (Buffer.Rows() < Limits.BatchMinRows) { + return 
EScan::Feed; + } + } + + Send(DataShard.ActorId, new TDataShard::TEvPrivate::TEvCdcStreamScanProgress( + TablePathId, StreamPathId, ReadVersion, ValueTags, std::move(Buffer.Flush()) + )); + return EScan::Sleep; + } + + EScan Exhausted() noexcept override { + NoMoreData = true; + + if (!Buffer) { + return EScan::Final; + } + + Send(DataShard.ActorId, new TDataShard::TEvPrivate::TEvCdcStreamScanProgress( + TablePathId, StreamPathId, ReadVersion, ValueTags, std::move(Buffer.Flush()) + )); + return EScan::Sleep; + } + + TAutoPtr<IDestructable> Finish(EAbort abort) noexcept override { + if (abort != EAbort::None) { + Reply(NKikimrTxDataShard::TEvCdcStreamScanResponse::ABORTED); + } else { + Reply(NKikimrTxDataShard::TEvCdcStreamScanResponse::DONE); + } + + PassAway(); + return nullptr; + } + +private: + const TDataShardId DataShard; + TActorId ReplyTo; + const ui64 TxId; + const TPathId TablePathId; + const TPathId StreamPathId; + const TRowVersion ReadVersion; + const TVector<TTag> ValueTags; + const TMaybe<TSerializedCellVec> LastKey; + const TLimits Limits; + + IDriver* Driver; + bool NoMoreData; + TBuffer Buffer; + +}; // TCdcStreamScan + +class TDataShard::TTxCdcStreamScanRun: public TTransactionBase<TDataShard> { + TEvDataShard::TEvCdcStreamScanRequest::TPtr Request; + THolder<IEventHandle> Response; // response to sender or forward to scanner + + THolder<IEventHandle> MakeResponse(const TActorContext& ctx, + NKikimrTxDataShard::TEvCdcStreamScanResponse::EStatus status, const TString& error = {}) const + { + return MakeHolder<IEventHandle>(Request->Sender, ctx.SelfID, new TEvDataShard::TEvCdcStreamScanResponse( + Request->Get()->Record, Self->TabletID(), status, error + )); + } + +public: + explicit TTxCdcStreamScanRun(TDataShard* self, TEvDataShard::TEvCdcStreamScanRequest::TPtr ev) + : TBase(self) + , Request(ev) + { + } + + TTxType GetTxType() const override { return TXTYPE_CDC_STREAM_SCAN_RUN; } + + bool Execute(TTransactionContext& txc, const TActorContext& ctx) override { + const auto& record = Request->Get()->Record; + + LOG_D("Run" + << ": ev# " << record.ShortDebugString()); + + const auto tablePathId = PathIdFromPathId(record.GetTablePathId()); + if (!Self->GetUserTables().contains(tablePathId.LocalPathId)) { + Response = MakeResponse(ctx, NKikimrTxDataShard::TEvCdcStreamScanResponse::BAD_REQUEST, + TStringBuilder() << "Unknown table" + << ": tablePathId# " << tablePathId); + return true; + } + + auto table = Self->GetUserTables().at(tablePathId.LocalPathId); + if (record.GetTableSchemaVersion() != table->GetTableSchemaVersion()) { + Response = MakeResponse(ctx, NKikimrTxDataShard::TEvCdcStreamScanResponse::SCHEME_ERROR, + TStringBuilder() << "Schema version mismatch" + << ": tablePathId# " << tablePathId + << ", got# " << record.GetTableSchemaVersion() + << ", expected# " << table->GetTableSchemaVersion()); + return true; + } + + const auto streamPathId = PathIdFromPathId(record.GetStreamPathId()); + auto it = table->CdcStreams.find(streamPathId); + if (it == table->CdcStreams.end()) { + Response = MakeResponse(ctx, NKikimrTxDataShard::TEvCdcStreamScanResponse::SCHEME_ERROR, + TStringBuilder() << "Unknown stream" + << ": tablePathId# " << tablePathId + << ", streamPathId# " << streamPathId); + return true; + } + + if (it->second.State != NKikimrSchemeOp::ECdcStreamStateScan) { + Response = MakeResponse(ctx, NKikimrTxDataShard::TEvCdcStreamScanResponse::SCHEME_ERROR, + TStringBuilder() << "Unexpected stream state" + << ": tablePathId# " << tablePathId + << ", streamPathId# " << 
streamPathId + << ", state# " << it->second.State); + return true; + } + + if (Self->CdcStreamScanManager.GetScanId()) { + if (Self->CdcStreamScanManager.GetStreamPathId() == streamPathId) { + if (const auto& to = Self->CdcStreamScanManager.GetActorId()) { + Response = Request->Forward(to); + } else { + // nop, scan actor will report state when it starts + } + return true; + } + + Response = MakeResponse(ctx, NKikimrTxDataShard::TEvCdcStreamScanResponse::OVERLOADED); + return true; + } + + if (!record.HasSnapshotStep()) { + Response = MakeResponse(ctx, NKikimrTxDataShard::TEvCdcStreamScanResponse::BAD_REQUEST, + "SnapshotStep was not specified"); + return true; + } + + if (!record.HasSnapshotTxId()) { + Response = MakeResponse(ctx, NKikimrTxDataShard::TEvCdcStreamScanResponse::BAD_REQUEST, + "SnapshotTxId was not specified"); + return true; + } + + const TSnapshotKey snapshotKey(tablePathId, record.GetSnapshotStep(), record.GetSnapshotTxId()); + if (!Self->SnapshotManager.FindAvailable(snapshotKey)) { + Response = MakeResponse(ctx, NKikimrTxDataShard::TEvCdcStreamScanResponse::BAD_REQUEST, + TStringBuilder() << "Snapshot was not found" + << ": key# " << snapshotKey.ToTuple()); + return true; + } + + TVector<TTag> valueTags; + if (it->second.Mode != NKikimrSchemeOp::ECdcStreamModeKeysOnly) { + valueTags.reserve(table->Columns.size() - 1); + for (const auto& [tag, column] : table->Columns) { + if (!column.IsKey) { + valueTags.push_back(tag); + } + } + } + + NIceDb::TNiceDb db(txc.DB); + TMaybe<TSerializedCellVec> lastKey; + if (!Self->CdcStreamScanManager.LoadLastKey(db, lastKey, tablePathId, streamPathId)) { + return false; + } + + auto* appData = AppData(ctx); + const auto& taskName = appData->DataShardConfig.GetCdcInitialScanTaskName(); + const auto taskPrio = appData->DataShardConfig.GetCdcInitialScanTaskPriority(); + + const auto readVersion = TRowVersion(snapshotKey.Step, snapshotKey.TxId); + const ui64 localTxId = ++Self->NextTieBreakerIndex; + auto scan = MakeHolder<TCdcStreamScan>(Self, Request->Sender, localTxId, + tablePathId, streamPathId, readVersion, valueTags, lastKey, record.GetLimits()); + const ui64 scanId = Self->QueueScan(table->LocalTid, scan.Release(), localTxId, + TScanOptions() + .SetResourceBroker(taskName, taskPrio) + .SetSnapshotRowVersion(readVersion) + ); + Self->CdcStreamScanManager.Enqueue(localTxId, scanId, streamPathId); + + LOG_I("Run scan" + << ": streamPathId# " << streamPathId); + + Response = MakeResponse(ctx, NKikimrTxDataShard::TEvCdcStreamScanResponse::ACCEPTED); + return true; + } + + void Complete(const TActorContext& ctx) override { + if (Response) { + ctx.Send(Response.Release()); + } + } + +}; // TTxCdcStreamScanRun + +void TDataShard::Handle(TEvDataShard::TEvCdcStreamScanRequest::TPtr& ev, const TActorContext& ctx) { + Execute(new TTxCdcStreamScanRun(this, ev), ctx); +} + +void TDataShard::Handle(TEvPrivate::TEvCdcStreamScanRegistered::TPtr& ev, const TActorContext& ctx) { + if (CdcStreamScanManager.GetTxId() != ev->Get()->TxId) { + LOG_W("Unknown cdc stream scan actor registered" + << ": at: " << TabletID()); + return; + } + + CdcStreamScanManager.Register(ev->Get()->ActorId); +} + +void TDataShard::Handle(TEvPrivate::TEvCdcStreamScanProgress::TPtr& ev, const TActorContext& ctx) { + Execute(new TTxCdcStreamScanProgress(this, ev), ctx); +} + +} diff --git a/ydb/core/tx/datashard/cdc_stream_scan.h b/ydb/core/tx/datashard/cdc_stream_scan.h new file mode 100644 index 00000000000..ab44ae929b9 --- /dev/null +++ 
b/ydb/core/tx/datashard/cdc_stream_scan.h @@ -0,0 +1,37 @@ +#pragma once + +#include <library/cpp/actors/core/actor.h> +#include <ydb/core/base/pathid.h> +#include <ydb/core/tablet_flat/flat_cxx_database.h> + +namespace NKikimr::NDataShard { + +class TCdcStreamScanManager { +public: + TCdcStreamScanManager(); + + void Enqueue(ui64 txId, ui64 scanId, const TPathId& streamPathId); + void Register(const NActors::TActorId& actorId); + void Clear(); + void Forget(NTable::TDatabase& db, const TPathId& tablePathId, const TPathId& streamPathId); + + ui64 GetTxId() const { return TxId; } + ui64 GetScanId() const { return ScanId; } + const NActors::TActorId& GetActorId() const { return ActorId; } + const TPathId& GetStreamPathId() const { return StreamPathId; } + + void PersistLastKey(NIceDb::TNiceDb& db, const TSerializedCellVec& value, + const TPathId& tablePathId, const TPathId& streamPathId); + bool LoadLastKey(NIceDb::TNiceDb& db, TMaybe<TSerializedCellVec>& result, + const TPathId& tablePathId, const TPathId& streamPathId); + void RemoveLastKey(NIceDb::TNiceDb& db, + const TPathId& tablePathId, const TPathId& streamPathId); + +private: + ui64 TxId; + ui64 ScanId; + NActors::TActorId ActorId; + TPathId StreamPathId; +}; + +} diff --git a/ydb/core/tx/datashard/datashard.cpp b/ydb/core/tx/datashard/datashard.cpp index a8f6565a106..d92e82eef82 100644 --- a/ydb/core/tx/datashard/datashard.cpp +++ b/ydb/core/tx/datashard/datashard.cpp @@ -3675,6 +3675,12 @@ void TDataShard::ScanComplete(NTable::EAbort, << ", at: "<< TabletID()); InFlightCondErase.Clear(); + } else if (CdcStreamScanManager.GetScanId() && CdcStreamScanManager.GetTxId() == cookie) { + LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, "Cdc stream scan complete" + << ": cookie: " << cookie + << ", at: "<< TabletID()); + + CdcStreamScanManager.Clear(); } else if (!Pipeline.FinishStreamingTx(cookie)) { LOG_ERROR_S(ctx, NKikimrServices::TX_DATASHARD, "Scan complete at " << TabletID() << " for unknown tx " << cookie); diff --git a/ydb/core/tx/datashard/datashard.h b/ydb/core/tx/datashard/datashard.h index a2237a97ca0..1bdad5517bf 100644 --- a/ydb/core/tx/datashard/datashard.h +++ b/ydb/core/tx/datashard/datashard.h @@ -313,6 +313,9 @@ struct TEvDataShard { EvGetOpenTxs, /* for tests */ EvGetOpenTxsResult, /* for tests */ + EvCdcStreamScanRequest, + EvCdcStreamScanResponse, + EvEnd }; @@ -1582,6 +1585,31 @@ struct TEvDataShard { { } }; + struct TEvCdcStreamScanRequest + : public TEventPB<TEvCdcStreamScanRequest, + NKikimrTxDataShard::TEvCdcStreamScanRequest, + EvCdcStreamScanRequest> + { + }; + + struct TEvCdcStreamScanResponse + : public TEventPB<TEvCdcStreamScanResponse, + NKikimrTxDataShard::TEvCdcStreamScanResponse, + EvCdcStreamScanResponse> + { + TEvCdcStreamScanResponse() = default; + + explicit TEvCdcStreamScanResponse( + const NKikimrTxDataShard::TEvCdcStreamScanRequest& request, ui64 tabletId, + NKikimrTxDataShard::TEvCdcStreamScanResponse::EStatus status, const TString& error = {}) + { + Record.SetTabletId(tabletId); + Record.MutableTablePathId()->CopyFrom(request.GetTablePathId()); + Record.MutableStreamPathId()->CopyFrom(request.GetStreamPathId()); + Record.SetStatus(status); + Record.SetErrorDescription(error); + } + }; }; IActor* CreateDataShard(const TActorId &tablet, TTabletStorageInfo *info); diff --git a/ydb/core/tx/datashard/datashard_impl.h b/ydb/core/tx/datashard/datashard_impl.h index 874fa1f8853..caeb7087d4b 100644 --- a/ydb/core/tx/datashard/datashard_impl.h +++ b/ydb/core/tx/datashard/datashard_impl.h @@ -14,6 +14,7 @@ 
#include "datashard_repl_offsets.h" #include "datashard_repl_offsets_client.h" #include "datashard_repl_offsets_server.h" +#include "cdc_stream_scan.h" #include "change_exchange.h" #include "change_record.h" #include "progress_queue.h" @@ -217,6 +218,8 @@ class TDataShard class TTxRemoveLockChangeRecords; class TTxVolatileTxCommit; class TTxVolatileTxAbort; + class TTxCdcStreamScanRun; + class TTxCdcStreamScanProgress; template <typename T> friend class TTxDirectBase; class TTxUploadRows; @@ -262,6 +265,7 @@ class TDataShard friend class TSnapshotManager; friend class TSchemaSnapshotManager; friend class TVolatileTxManager; + friend class TCdcStreamScanManager; friend class TReplicationSourceOffsetsClient; friend class TReplicationSourceOffsetsServer; @@ -271,6 +275,7 @@ class TDataShard friend class TBuildIndexScan; friend class TReadColumnsScan; friend class TCondEraseScan; + friend class TCdcStreamScan; friend class TDatashardKeySampler; friend class TS3UploadsManager; @@ -323,6 +328,9 @@ class TDataShard EvReplicationSourceOffsets, EvMediatorRestoreBackup, EvRemoveLockChangeRecords, + EvCdcStreamScanRegistered, + EvCdcStreamScanProgress, + EvCdcStreamScanContinue, EvEnd }; @@ -466,6 +474,41 @@ class TDataShard struct TEvMediatorRestoreBackup : public TEventLocal<TEvMediatorRestoreBackup, EvMediatorRestoreBackup> {}; struct TEvRemoveLockChangeRecords : public TEventLocal<TEvRemoveLockChangeRecords, EvRemoveLockChangeRecords> {}; + + struct TEvCdcStreamScanRegistered : public TEventLocal<TEvCdcStreamScanRegistered, EvCdcStreamScanRegistered> { + explicit TEvCdcStreamScanRegistered(ui64 txId, const TActorId& actorId) + : TxId(txId) + , ActorId(actorId) + { + } + + const ui64 TxId; + const TActorId ActorId; + }; + + struct TEvCdcStreamScanProgress : public TEventLocal<TEvCdcStreamScanProgress, EvCdcStreamScanProgress> { + explicit TEvCdcStreamScanProgress( + const TPathId& tablePathId, + const TPathId& streamPathId, + const TRowVersion& readVersion, + const TVector<ui32>& valueTags, + TVector<std::pair<TSerializedCellVec, TSerializedCellVec>>&& rows) + : TablePathId(tablePathId) + , StreamPathId(streamPathId) + , ReadVersion(readVersion) + , ValueTags(valueTags) + , Rows(std::move(rows)) + { + } + + const TPathId TablePathId; + const TPathId StreamPathId; + const TRowVersion ReadVersion; + const TVector<ui32> ValueTags; + TVector<std::pair<TSerializedCellVec, TSerializedCellVec>> Rows; + }; + + struct TEvCdcStreamScanContinue : public TEventLocal<TEvCdcStreamScanContinue, EvCdcStreamScanContinue> {}; }; struct Schema : NIceDb::Schema { @@ -906,6 +949,17 @@ class TDataShard using TColumns = TableColumns<TxId, ShardId>; }; + struct CdcStreamScans : Table<34> { + struct TableOwnerId : Column<1, NScheme::NTypeIds::Uint64> {}; + struct TablePathId : Column<2, NScheme::NTypeIds::Uint64> {}; + struct StreamOwnerId : Column<3, NScheme::NTypeIds::Uint64> {}; + struct StreamPathId : Column<4, NScheme::NTypeIds::Uint64> {}; + struct LastKey : Column<5, NScheme::NTypeIds::String> {}; + + using TKey = TableKey<TableOwnerId, TablePathId, StreamOwnerId, StreamPathId>; + using TColumns = TableColumns<TableOwnerId, TablePathId, StreamOwnerId, StreamPathId, LastKey>; + }; + using TTables = SchemaTables<Sys, UserTables, TxMain, TxDetails, InReadSets, OutReadSets, PlanQueue, DeadlineQueue, SchemaOperations, SplitSrcSnapshots, SplitDstReceivedSnapshots, TxArtifacts, ScanProgress, Snapshots, S3Uploads, S3Downloads, ChangeRecords, ChangeRecordDetails, ChangeSenders, S3UploadedParts, @@ -913,7 +967,7 @@ class 
TDataShard ReplicationSourceOffsets, ReplicationSources, DstReplicationSourceOffsetsReceived, UserTablesStats, SchemaSnapshots, Locks, LockRanges, LockConflicts, LockChangeRecords, LockChangeRecordDetails, ChangeRecordCommits, - TxVolatileDetails, TxVolatileParticipants>; + TxVolatileDetails, TxVolatileParticipants, CdcStreamScans>; // These settings are persisted on each Init. So we use empty settings in order not to overwrite what // was changed by the user @@ -1117,6 +1171,9 @@ class TDataShard void Handle(TEvDataShard::TEvStoreS3DownloadInfo::TPtr& ev, const TActorContext& ctx); void Handle(TEvDataShard::TEvUnsafeUploadRowsRequest::TPtr& ev, const TActorContext& ctx); void Handle(TEvDataShard::TEvBuildIndexCreateRequest::TPtr& ev, const TActorContext& ctx); + void Handle(TEvDataShard::TEvCdcStreamScanRequest::TPtr& ev, const TActorContext& ctx); + void Handle(TEvPrivate::TEvCdcStreamScanRegistered::TPtr& ev, const TActorContext& ctx); + void Handle(TEvPrivate::TEvCdcStreamScanProgress::TPtr& ev, const TActorContext& ctx); void Handle(TEvPrivate::TEvAsyncJobComplete::TPtr& ev, const TActorContext& ctx); void Handle(TEvDataShard::TEvCancelBackup::TPtr &ev, const TActorContext &ctx); @@ -1645,6 +1702,11 @@ public: void ScheduleRemoveLockChanges(ui64 lockId); void ScheduleRemoveAbandonedLockChanges(); + static void PersistCdcStreamScanLastKey(NIceDb::TNiceDb& db, const TSerializedCellVec& value, + const TPathId& tablePathId, const TPathId& streamPathId); + static bool LoadCdcStreamScanLastKey(NIceDb::TNiceDb& db, TMaybe<TSerializedCellVec>& result, + const TPathId& tablePathId, const TPathId& streamPathId); + static void RemoveCdcStreamScanLastKey(NIceDb::TNiceDb& db, const TPathId& tablePathId, const TPathId& streamPathId); static void PersistSchemeTxResult(NIceDb::TNiceDb &db, const TSchemaOperation& op); void NotifySchemeshard(const TActorContext& ctx, ui64 txId = 0); @@ -1662,6 +1724,8 @@ public: TVolatileTxManager& GetVolatileTxManager() { return VolatileTxManager; } const TVolatileTxManager& GetVolatileTxManager() const { return VolatileTxManager; } + TCdcStreamScanManager& GetCdcStreamScanManager() { return CdcStreamScanManager; } + const TCdcStreamScanManager& GetCdcStreamScanManager() const { return CdcStreamScanManager; } template <typename... Args> bool PromoteCompleteEdge(Args&&... 
args) { @@ -2284,6 +2348,7 @@ private: TSnapshotManager SnapshotManager; TSchemaSnapshotManager SchemaSnapshotManager; TVolatileTxManager VolatileTxManager; + TCdcStreamScanManager CdcStreamScanManager; TReplicationSourceOffsetsServerLink ReplicationSourceOffsetsServer; @@ -2648,6 +2713,9 @@ protected: HFunc(TEvDataShard::TEvRefreshVolatileSnapshotRequest, Handle); HFunc(TEvDataShard::TEvDiscardVolatileSnapshotRequest, Handle); HFuncTraced(TEvDataShard::TEvBuildIndexCreateRequest, Handle); + HFunc(TEvDataShard::TEvCdcStreamScanRequest, Handle); + HFunc(TEvPrivate::TEvCdcStreamScanRegistered, Handle); + HFunc(TEvPrivate::TEvCdcStreamScanProgress, Handle); HFunc(TEvPrivate::TEvAsyncJobComplete, Handle); HFunc(TEvPrivate::TEvPeriodicWakeup, DoPeriodicTasks); HFunc(TEvents::TEvUndelivered, Handle); diff --git a/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp b/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp index fd3be2d402e..691a0b8e6ea 100644 --- a/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp +++ b/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp @@ -786,7 +786,6 @@ Y_UNIT_TEST_SUITE(Cdc) { } }; - TShardedTableOptions SimpleTable() { return TShardedTableOptions() .Columns({ @@ -795,33 +794,40 @@ Y_UNIT_TEST_SUITE(Cdc) { }); } - TCdcStream KeysOnly(NKikimrSchemeOp::ECdcStreamFormat format, const TString& name = "Stream", bool vt = false) { + TCdcStream KeysOnly(NKikimrSchemeOp::ECdcStreamFormat format, const TString& name = "Stream") { return TCdcStream{ .Name = name, .Mode = NKikimrSchemeOp::ECdcStreamModeKeysOnly, .Format = format, - .VirtualTimestamps = vt, }; } - TCdcStream Updates(NKikimrSchemeOp::ECdcStreamFormat format, const TString& name = "Stream", bool vt = false) { + TCdcStream Updates(NKikimrSchemeOp::ECdcStreamFormat format, const TString& name = "Stream") { return TCdcStream{ .Name = name, .Mode = NKikimrSchemeOp::ECdcStreamModeUpdate, .Format = format, - .VirtualTimestamps = vt, }; } - TCdcStream NewAndOldImages(NKikimrSchemeOp::ECdcStreamFormat format, const TString& name = "Stream", bool vt = false) { + TCdcStream NewAndOldImages(NKikimrSchemeOp::ECdcStreamFormat format, const TString& name = "Stream") { return TCdcStream{ .Name = name, .Mode = NKikimrSchemeOp::ECdcStreamModeNewAndOldImages, .Format = format, - .VirtualTimestamps = vt, }; } + TCdcStream WithVirtualTimestamps(TCdcStream streamDesc) { + streamDesc.VirtualTimestamps = true; + return streamDesc; + } + + TCdcStream WithInitialScan(TCdcStream streamDesc) { + streamDesc.InitialState = NKikimrSchemeOp::ECdcStreamStateScan; + return streamDesc; + } + TString CalcPartitionKey(const TString& data) { NJson::TJsonValue json; UNIT_ASSERT(NJson::ReadJsonTree(data, &json)); @@ -1093,7 +1099,6 @@ Y_UNIT_TEST_SUITE(Cdc) { } }; - #define Y_UNIT_TEST_TRIPLET(N, VAR1, VAR2, VAR3) \ template<typename TRunner> void N(NUnitTest::TTestContext&); \ struct TTestRegistration##N { \ @@ -1164,7 +1169,7 @@ Y_UNIT_TEST_SUITE(Cdc) { } Y_UNIT_TEST_TRIPLET(VirtualTimestamps, PqRunner, YdsRunner, TopicRunner) { - TRunner::Read(SimpleTable(), KeysOnly(NKikimrSchemeOp::ECdcStreamFormatJson, "Stream", true), {R"( + TRunner::Read(SimpleTable(), WithVirtualTimestamps(KeysOnly(NKikimrSchemeOp::ECdcStreamFormatJson)), {R"( UPSERT INTO `/Root/Table` (key, value) VALUES (1, 10), (2, 20), @@ -1916,6 +1921,126 @@ Y_UNIT_TEST_SUITE(Cdc) { } } + Y_UNIT_TEST(InitialScan) { + TPortManager portManager; + TServer::TPtr server = new TServer(TServerSettings(portManager.GetPort(2134), {}, DefaultPQConfig()) + 
.SetUseRealThreads(false) + .SetDomainName("Root") + .SetEnableChangefeedInitialScan(true) + ); + + auto& runtime = *server->GetRuntime(); + const auto edgeActor = runtime.AllocateEdgeActor(); + + SetupLogging(runtime); + InitRoot(server, edgeActor); + CreateShardedTable(server, edgeActor, "/Root", "Table", SimpleTable()); + + ExecSQL(server, edgeActor, R"( + UPSERT INTO `/Root/Table` (key, value) VALUES + (1, 10), + (2, 20), + (3, 30); + )"); + + WaitTxNotification(server, edgeActor, AsyncAlterAddStream(server, "/Root", "Table", + WithInitialScan(Updates(NKikimrSchemeOp::ECdcStreamFormatJson)))); + + WaitForContent(server, edgeActor, "/Root/Table/Stream", { + R"({"update":{"value":10},"key":[1]})", + R"({"update":{"value":20},"key":[2]})", + R"({"update":{"value":30},"key":[3]})", + }); + + ExecSQL(server, edgeActor, R"( + UPSERT INTO `/Root/Table` (key, value) VALUES + (1, 100), + (2, 200), + (3, 300); + )"); + + WaitForContent(server, edgeActor, "/Root/Table/Stream", { + R"({"update":{"value":10},"key":[1]})", + R"({"update":{"value":20},"key":[2]})", + R"({"update":{"value":30},"key":[3]})", + R"({"update":{"value":100},"key":[1]})", + R"({"update":{"value":200},"key":[2]})", + R"({"update":{"value":300},"key":[3]})", + }); + } + + Y_UNIT_TEST(InitialScanSkipUpdatedRows) { + TPortManager portManager; + TServer::TPtr server = new TServer(TServerSettings(portManager.GetPort(2134), {}, DefaultPQConfig()) + .SetUseRealThreads(false) + .SetDomainName("Root") + .SetEnableChangefeedInitialScan(true) + ); + + auto& runtime = *server->GetRuntime(); + const auto edgeActor = runtime.AllocateEdgeActor(); + + SetupLogging(runtime); + InitRoot(server, edgeActor); + CreateShardedTable(server, edgeActor, "/Root", "Table", SimpleTable()); + + ExecSQL(server, edgeActor, R"( + UPSERT INTO `/Root/Table` (key, value) VALUES + (1, 10), + (2, 20), + (3, 30); + )"); + + TVector<THolder<IEventHandle>> delayed; + auto prevObserver = runtime.SetObserverFunc([&](TTestActorRuntimeBase&, TAutoPtr<IEventHandle>& ev) { + if (ev->GetTypeRewrite() == TEvDataShard::EvCdcStreamScanRequest) { + delayed.emplace_back(ev.Release()); + return TTestActorRuntime::EEventAction::DROP; + } + + return TTestActorRuntime::EEventAction::PROCESS; + }); + + WaitTxNotification(server, edgeActor, AsyncAlterAddStream(server, "/Root", "Table", + WithInitialScan(Updates(NKikimrSchemeOp::ECdcStreamFormatJson)))); + + if (delayed.empty()) { + TDispatchOptions opts; + opts.FinalEvents.emplace_back([&delayed](IEventHandle&) { + return !delayed.empty(); + }); + runtime.DispatchEvents(opts); + } + + ExecSQL(server, edgeActor, R"( + UPSERT INTO `/Root/Table` (key, value) VALUES + (1, 100), + (4, 40); + )"); + + ExecSQL(server, edgeActor, R"( + DELETE FROM `/Root/Table` WHERE key = 2; + )"); + + WaitForContent(server, edgeActor, "/Root/Table/Stream", { + R"({"update":{"value":100},"key":[1]})", + R"({"update":{"value":40},"key":[4]})", + R"({"erase":{},"key":[2]})", + }); + + runtime.SetObserverFunc(prevObserver); + for (auto& ev : std::exchange(delayed, TVector<THolder<IEventHandle>>())) { + runtime.Send(ev.Release(), 0, true); + } + + WaitForContent(server, edgeActor, "/Root/Table/Stream", { + R"({"update":{"value":100},"key":[1]})", + R"({"update":{"value":40},"key":[4]})", + R"({"erase":{},"key":[2]})", + R"({"update":{"value":30},"key":[3]})", + }); + } + } // Cdc } // NKikimr diff --git a/ydb/core/tx/datashard/datashard_ut_common.cpp b/ydb/core/tx/datashard/datashard_ut_common.cpp index d5a9599dace..79124ae262c 100644 --- 
a/ydb/core/tx/datashard/datashard_ut_common.cpp +++ b/ydb/core/tx/datashard/datashard_ut_common.cpp @@ -1640,6 +1640,9 @@ ui64 AsyncAlterAddStream( desc.MutableStreamDescription()->SetMode(streamDesc.Mode); desc.MutableStreamDescription()->SetFormat(streamDesc.Format); desc.MutableStreamDescription()->SetVirtualTimestamps(streamDesc.VirtualTimestamps); + if (streamDesc.InitialState) { + desc.MutableStreamDescription()->SetState(*streamDesc.InitialState); + } return RunSchemeTx(*server->GetRuntime(), std::move(request)); } diff --git a/ydb/core/tx/datashard/datashard_ut_common.h b/ydb/core/tx/datashard/datashard_ut_common.h index 0e8c4a14413..9bac26dbfe1 100644 --- a/ydb/core/tx/datashard/datashard_ut_common.h +++ b/ydb/core/tx/datashard/datashard_ut_common.h @@ -397,10 +397,12 @@ struct TShardedTableOptions { struct TCdcStream { using EMode = NKikimrSchemeOp::ECdcStreamMode; using EFormat = NKikimrSchemeOp::ECdcStreamFormat; + using EState = NKikimrSchemeOp::ECdcStreamState; TString Name; EMode Mode; EFormat Format; + TMaybe<EState> InitialState; bool VirtualTimestamps = false; }; diff --git a/ydb/core/tx/datashard/drop_cdc_stream_unit.cpp b/ydb/core/tx/datashard/drop_cdc_stream_unit.cpp index e7a2c14f510..44912cd4633 100644 --- a/ydb/core/tx/datashard/drop_cdc_stream_unit.cpp +++ b/ydb/core/tx/datashard/drop_cdc_stream_unit.cpp @@ -54,6 +54,15 @@ public: DataShard.GetSnapshotManager().RemoveSnapshot(txc.DB, key); } + auto& scanManager = DataShard.GetCdcStreamScanManager(); + scanManager.Forget(txc.DB, pathId, streamPathId); + if (scanManager.GetStreamPathId() == streamPathId) { + if (const auto scanId = scanManager.GetScanId()) { + DataShard.CancelScan(tableInfo->LocalTid, scanId); + } + scanManager.Clear(); + } + RemoveSender.Reset(new TEvChangeExchange::TEvRemoveSender(streamPathId)); BuildResult(op, NKikimrTxDataShard::TEvProposeTransactionResult::COMPLETE); diff --git a/ydb/core/tx/schemeshard/CMakeLists.darwin.txt b/ydb/core/tx/schemeshard/CMakeLists.darwin.txt index 30d43d36c9b..a6d51a786d9 100644 --- a/ydb/core/tx/schemeshard/CMakeLists.darwin.txt +++ b/ydb/core/tx/schemeshard/CMakeLists.darwin.txt @@ -199,6 +199,7 @@ target_sources(core-tx-schemeshard PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/schemeshard_audit_log_fragment.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/schemeshard_impl.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/schemeshard_billing_helpers.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/schemeshard_cdc_stream_scan.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/schemeshard_domain_links.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/schemeshard_effective_acl.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/schemeshard_identificators.cpp diff --git a/ydb/core/tx/schemeshard/CMakeLists.linux-aarch64.txt b/ydb/core/tx/schemeshard/CMakeLists.linux-aarch64.txt index cad15eddcf8..5ed31fe71d1 100644 --- a/ydb/core/tx/schemeshard/CMakeLists.linux-aarch64.txt +++ b/ydb/core/tx/schemeshard/CMakeLists.linux-aarch64.txt @@ -200,6 +200,7 @@ target_sources(core-tx-schemeshard PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/schemeshard_audit_log_fragment.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/schemeshard_impl.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/schemeshard_billing_helpers.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/schemeshard_cdc_stream_scan.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/schemeshard_domain_links.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/schemeshard_effective_acl.cpp 
${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/schemeshard_identificators.cpp diff --git a/ydb/core/tx/schemeshard/CMakeLists.linux.txt b/ydb/core/tx/schemeshard/CMakeLists.linux.txt index cad15eddcf8..5ed31fe71d1 100644 --- a/ydb/core/tx/schemeshard/CMakeLists.linux.txt +++ b/ydb/core/tx/schemeshard/CMakeLists.linux.txt @@ -200,6 +200,7 @@ target_sources(core-tx-schemeshard PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/schemeshard_audit_log_fragment.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/schemeshard_impl.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/schemeshard_billing_helpers.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/schemeshard_cdc_stream_scan.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/schemeshard_domain_links.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/schemeshard_effective_acl.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/schemeshard_identificators.cpp diff --git a/ydb/core/tx/schemeshard/schemeshard__init.cpp b/ydb/core/tx/schemeshard/schemeshard__init.cpp index 734335427d7..293bcdfffa4 100644 --- a/ydb/core/tx/schemeshard/schemeshard__init.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__init.cpp @@ -19,6 +19,7 @@ struct TSchemeShard::TTxInit : public TTransactionBase<TSchemeShard> { TDeque<TPathId> BlockStoreVolumesToClean; TVector<ui64> ExportsToResume; TVector<ui64> ImportsToResume; + TVector<TPathId> CdcStreamScansToResume; bool Broken = false; explicit TTxInit(TSelf *self) @@ -2825,6 +2826,17 @@ struct TSchemeShard::TTxInit : public TTransactionBase<TSchemeShard> { Self->CdcStreams[pathId] = new TCdcStreamInfo(alterVersion, mode, format, vt, state); Self->IncrementPathDbRefCount(pathId); + if (state == NKikimrSchemeOp::ECdcStreamStateScan) { + Y_VERIFY_S(Self->PathsById.contains(path->ParentPathId), "Parent path is not found" + << ", cdc stream pathId: " << pathId + << ", parent pathId: " << path->ParentPathId); + auto parent = Self->PathsById.at(path->ParentPathId); + + if (parent->NormalState()) { + CdcStreamScansToResume.push_back(pathId); + } + } + if (!rowset.Next()) { return false; } @@ -2886,6 +2898,38 @@ struct TSchemeShard::TTxInit : public TTransactionBase<TSchemeShard> { } } + // Read CdcStreamScanShardStatus + { + auto rowset = db.Table<Schema::CdcStreamScanShardStatus>().Range().Select(); + if (!rowset.IsReady()) { + return false; + } + + while (!rowset.EndOfSet()) { + auto pathId = TPathId( + rowset.GetValue<Schema::CdcStreamScanShardStatus::OwnerPathId>(), + rowset.GetValue<Schema::CdcStreamScanShardStatus::LocalPathId>() + ); + auto shardIdx = TShardIdx( + rowset.GetValue<Schema::CdcStreamScanShardStatus::OwnerShardIdx>(), + rowset.GetValue<Schema::CdcStreamScanShardStatus::LocalShardIdx>() + ); + auto status = rowset.GetValue<Schema::CdcStreamScanShardStatus::Status>(); + + Y_VERIFY_S(Self->CdcStreams.contains(pathId), "Cdc stream not found" + << ": pathId# " << pathId); + + auto stream = Self->CdcStreams.at(pathId); + stream->ScanShards.emplace(shardIdx, status); + + if (status != NKikimrTxDataShard::TEvCdcStreamScanResponse::DONE) { + stream->PendingShards.insert(shardIdx); + } else { + stream->DoneShards.insert(shardIdx); + } + } + } + // Read DomainsPools { auto rowset = db.Table<Schema::StoragePools>().Range().Select(); @@ -4727,12 +4771,14 @@ struct TSchemeShard::TTxInit : public TTransactionBase<TSchemeShard> { return; } - Self->ActivateAfterInitialization( - ctx, - std::move(delayPublications), - ExportsToResume, ImportsToResume, - std::move(TablesToClean), std::move(BlockStoreVolumesToClean) - ); + 
Self->ActivateAfterInitialization(ctx, { + .DelayPublications = std::move(delayPublications), + .ExportIds = ExportsToResume, + .ImportsIds = ImportsToResume, + .CdcStreamScans = std::move(CdcStreamScansToResume), + .TablesToClean = std::move(TablesToClean), + .BlockStoreVolumesToClean = std::move(BlockStoreVolumesToClean), + }); } }; diff --git a/ydb/core/tx/schemeshard/schemeshard__init_root.cpp b/ydb/core/tx/schemeshard/schemeshard__init_root.cpp index 5201abd44b7..e8d12429402 100644 --- a/ydb/core/tx/schemeshard/schemeshard__init_root.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__init_root.cpp @@ -144,8 +144,7 @@ struct TSchemeShard::TTxInitRoot : public TSchemeShard::TRwTxBase { << ", at schemeshard: " << Self->TabletID()); Self->SignalTabletActive(ctx); - - Self->ActivateAfterInitialization(ctx); + Self->ActivateAfterInitialization(ctx, {}); } }; @@ -440,7 +439,9 @@ struct TSchemeShard::TTxInitTenantSchemeShard : public TSchemeShard::TRwTxBase { auto publications = TSideEffects::TPublications(); publications[TTxId()] = TSideEffects::TPublications::mapped_type{Self->RootPathId()}; - Self->ActivateAfterInitialization(ctx, std::move(publications)); + Self->ActivateAfterInitialization(ctx, { + .DelayPublications = std::move(publications), + }); } }; diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_alter_cdc_stream.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_alter_cdc_stream.cpp index 2fc32251499..754f1efe4f5 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_alter_cdc_stream.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_alter_cdc_stream.cpp @@ -150,20 +150,20 @@ public: Y_VERIFY(context.SS->CdcStreams.contains(streamPath.Base()->PathId)); auto stream = context.SS->CdcStreams.at(streamPath.Base()->PathId); + TCdcStreamInfo::EState requiredState = TCdcStreamInfo::EState::ECdcStreamStateInvalid; TCdcStreamInfo::EState newState = TCdcStreamInfo::EState::ECdcStreamStateInvalid; switch (op.GetActionCase()) { case NKikimrSchemeOp::TAlterCdcStream::kDisable: - newState = TCdcStreamInfo::EState::ECdcStreamStateDisabled; + requiredState = TCdcStreamInfo::EState::ECdcStreamStateDisabled; + if (stream->State == TCdcStreamInfo::EState::ECdcStreamStateReady) { + newState = requiredState; + } break; case NKikimrSchemeOp::TAlterCdcStream::kGetReady: + requiredState = TCdcStreamInfo::EState::ECdcStreamStateReady; if (stream->State == TCdcStreamInfo::EState::ECdcStreamStateScan) { - newState = TCdcStreamInfo::EState::ECdcStreamStateReady; - } else { - result->SetError(NKikimrScheme::StatusPreconditionFailed, TStringBuilder() - << "Cannot switch to ready state" - << ": current# " << stream->State); - return result; + newState = requiredState; } break; default: @@ -172,6 +172,14 @@ public: return result; } + if (newState == TCdcStreamInfo::EState::ECdcStreamStateInvalid) { + result->SetError(NKikimrScheme::StatusPreconditionFailed, TStringBuilder() + << "Cannot switch state" + << ": from# " << stream->State + << ", to# " << requiredState); + return result; + } + auto guard = context.DbGuard(); context.MemChanges.GrabPath(context.SS, streamPath.Base()->PathId); context.MemChanges.GrabCdcStream(context.SS, streamPath.Base()->PathId); @@ -183,8 +191,6 @@ public: auto streamAlter = stream->CreateNextVersion(); Y_VERIFY(streamAlter); - - Y_VERIFY(newState != TCdcStreamInfo::EState::ECdcStreamStateInvalid); streamAlter->State = newState; Y_VERIFY(!context.SS->FindTx(OperationId)); diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_common.h 
b/ydb/core/tx/schemeshard/schemeshard__operation_common.h index 1441944725c..fe08e99eb32 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_common.h +++ b/ydb/core/tx/schemeshard/schemeshard__operation_common.h @@ -416,7 +416,7 @@ public: class TDone: public TSubOperationState { -private: +protected: TOperationId OperationId; TString DebugHint() const override { diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp index 1dc4a69f177..de9d29af56c 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp @@ -291,7 +291,7 @@ protected: Y_VERIFY(stream->AlterData); context.SS->DescribeCdcStream(childPathId, childName, stream->AlterData, *notice.MutableStreamDescription()); - if (stream->AlterData->State == NKikimrSchemeOp::ECdcStreamStateScan) { + if (stream->AlterData->State == TCdcStreamInfo::EState::ECdcStreamStateScan) { notice.SetSnapshotName("ChangefeedInitialScan"); } } @@ -307,7 +307,9 @@ public: using NCdcStreamState::TProposeAtTable::TProposeAtTable; bool HandleReply(TEvPrivate::TEvOperationPlan::TPtr& ev, TOperationContext& context) override { - NCdcStreamState::TProposeAtTable::HandleReply(ev, context); + if (!NCdcStreamState::TProposeAtTable::HandleReply(ev, context)) { + return false; + } const auto step = TStepId(ev->Get()->StepId); NIceDb::TNiceDb db(context.GetDB()); @@ -320,6 +322,55 @@ public: }; // TProposeAtTableWithInitialScan +class TDoneWithInitialScan: public TDone { +public: + using TDone::TDone; + + bool ProgressState(TOperationContext& context) override { + if (!TDone::ProgressState(context)) { + return false; + } + + const auto* txState = context.SS->FindTx(OperationId); + Y_VERIFY(txState); + Y_VERIFY(txState->TxType == TTxState::TxCreateCdcStreamAtTableWithInitialScan); + const auto& pathId = txState->TargetPathId; + + Y_VERIFY(context.SS->PathsById.contains(pathId)); + auto path = context.SS->PathsById.at(pathId); + + TMaybe<TPathId> streamPathId; + for (const auto& [_, childPathId] : path->GetChildren()) { + Y_VERIFY(context.SS->PathsById.contains(childPathId)); + auto childPath = context.SS->PathsById.at(childPathId); + + if (childPath->CreateTxId != OperationId.GetTxId()) { + continue; + } + + Y_VERIFY(childPath->IsCdcStream() && !childPath->Dropped()); + Y_VERIFY(context.SS->CdcStreams.contains(childPathId)); + auto stream = context.SS->CdcStreams.at(childPathId); + + Y_VERIFY(stream->State == TCdcStreamInfo::EState::ECdcStreamStateScan); + Y_VERIFY_S(!streamPathId, "Too many cdc streams are planned to fill with initial scan" + << ": found# " << *streamPathId + << ", another# " << childPathId); + streamPathId = childPathId; + } + + if (AppData()->DisableCdcAutoSwitchingToReadyStateForTests) { + return true; + } + + Y_VERIFY(streamPathId); + context.OnComplete.Send(context.SS->SelfId(), new TEvPrivate::TEvRunCdcStreamScan(*streamPathId)); + + return true; + } + +}; // TDoneWithInitialScan + class TNewCdcStreamAtTable: public TSubOperation { static TTxState::ETxState NextState() { return TTxState::ConfigureParts; @@ -353,7 +404,11 @@ class TNewCdcStreamAtTable: public TSubOperation { case TTxState::ProposedWaitParts: return MakeHolder<NTableState::TProposedWaitParts>(OperationId); case TTxState::Done: - return MakeHolder<TDone>(OperationId); + if (InitialScan) { + return MakeHolder<TDoneWithInitialScan>(OperationId); + } else { + return 
MakeHolder<TDone>(OperationId); + } default: return nullptr; } diff --git a/ydb/core/tx/schemeshard/schemeshard_cdc_stream_scan.cpp b/ydb/core/tx/schemeshard/schemeshard_cdc_stream_scan.cpp new file mode 100644 index 00000000000..2f0bfcbc640 --- /dev/null +++ b/ydb/core/tx/schemeshard/schemeshard_cdc_stream_scan.cpp @@ -0,0 +1,392 @@ +#include "schemeshard_impl.h" + +#include <ydb/core/tx/tx_proxy/proxy.h> + +#include <util/generic/deque.h> + +#if defined LOG_D || \ + defined LOG_W || \ + defined LOG_E +#error log macro redefinition +#endif + +#define LOG_D(stream) LOG_DEBUG_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "[CdcStreamScan] " << stream) +#define LOG_W(stream) LOG_WARN_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "[CdcStreamScan] " << stream) +#define LOG_E(stream) LOG_ERROR_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "[CdcStreamScan] " << stream) + +namespace NKikimr::NSchemeShard { + +using namespace NTabletFlatExecutor; + +class TCdcStreamScanFinalizer: public TActorBootstrapped<TCdcStreamScanFinalizer> { +public: + static constexpr NKikimrServices::TActivity::EType ActorActivityType() { + return NKikimrServices::TActivity::SCHEMESHARD_CDC_STREAM_SCAN_FINALIZER; + } + + explicit TCdcStreamScanFinalizer(const TActorId& ssActorId, THolder<TEvSchemeShard::TEvModifySchemeTransaction>&& req) + : SSActorId(ssActorId) + , Request(std::move(req)) // template without txId + { + } + + void Bootstrap() { + AllocateTxId(); + Become(&TCdcStreamScanFinalizer::StateWork); + } + + STATEFN(StateWork) { + switch (ev->GetTypeRewrite()) { + hFunc(TEvTxUserProxy::TEvAllocateTxIdResult, Handle) + sFunc(TEvents::TEvPoison, PassAway); + } + } + +private: + void AllocateTxId() { + Send(MakeTxProxyID(), new TEvTxUserProxy::TEvAllocateTxId); + } + + void Handle(TEvTxUserProxy::TEvAllocateTxIdResult::TPtr& ev) { + Request->Record.SetTxId(ev->Get()->TxId); + Send(SSActorId, Request.Release()); + } + +private: + const TActorId SSActorId; + THolder<TEvSchemeShard::TEvModifySchemeTransaction> Request; + +}; // TCdcStreamScanFinalizer + +struct TSchemeShard::TCdcStreamScan::TTxProgress: public TTransactionBase<TSchemeShard> { + // params + TEvPrivate::TEvRunCdcStreamScan::TPtr RunCdcStreamScan = nullptr; + TEvDataShard::TEvCdcStreamScanResponse::TPtr CdcStreamScanResponse = nullptr; + struct { + TPathId StreamPathId; + TTabletId TabletId; + explicit operator bool() const { return StreamPathId && TabletId; } + } PipeRetry; + + // side effects + TDeque<std::tuple<TPathId, TTabletId, THolder<IEventBase>>> ScanRequests; + TPathId StreamToProgress; + THolder<TEvSchemeShard::TEvModifySchemeTransaction> Finalize; + +public: + explicit TTxProgress(TSelf* self, TEvPrivate::TEvRunCdcStreamScan::TPtr& ev) + : TBase(self) + , RunCdcStreamScan(ev) + { + } + + explicit TTxProgress(TSelf* self, TEvDataShard::TEvCdcStreamScanResponse::TPtr& ev) + : TBase(self) + , CdcStreamScanResponse(ev) + { + } + + explicit TTxProgress(TSelf* self, const TPathId& streamPathId, TTabletId tabletId) + : TBase(self) + , PipeRetry({streamPathId, tabletId}) + { + } + + TTxType GetTxType() const override { + return TXTYPE_CDC_STREAM_SCAN_PROGRESS; + } + + bool Execute(TTransactionContext& txc, const TActorContext& ctx) override { + if (RunCdcStreamScan) { + return OnRunCdcStreamScan(txc, ctx); + } else if (CdcStreamScanResponse) { + return OnCdcStreamScanResponse(txc, ctx); + } else if (PipeRetry) { + return OnPipeRetry(txc, ctx); + } else { + Y_FAIL("unreachable"); + } + } + + void Complete(const TActorContext& ctx) override { + for (auto& 
[streamPathId, tabletId, ev] : ScanRequests) { + Self->CdcStreamScanPipes.Create(streamPathId, tabletId, std::move(ev), ctx); + } + + if (StreamToProgress) { + ctx.Send(ctx.SelfID, new TEvPrivate::TEvRunCdcStreamScan(StreamToProgress)); + } + + if (Finalize) { + Self->CdcStreamScanFinalizer = ctx.Register(new TCdcStreamScanFinalizer(ctx.SelfID, std::move(Finalize))); + } + } + +private: + bool OnRunCdcStreamScan(TTransactionContext& txc, const TActorContext& ctx) { + const auto& streamPathId = RunCdcStreamScan->Get()->StreamPathId; + + LOG_D("Run" + << ": streamPathId# " << streamPathId); + + if (!Self->CdcStreams.contains(streamPathId)) { + LOG_W("Cannot run" + << ": streamPathId# " << streamPathId + << ", reason# " << "stream doesn't exist"); + return true; + } + + auto streamInfo = Self->CdcStreams.at(streamPathId); + if (streamInfo->State != TCdcStreamInfo::EState::ECdcStreamStateScan) { + LOG_W("Cannot run" + << ": streamPathId# " << streamPathId + << ", reason# " << "unexpected state"); + return true; + } + + Y_VERIFY(Self->PathsById.contains(streamPathId)); + auto streamPath = Self->PathsById.at(streamPathId); + + Y_VERIFY(Self->PathsById.contains(streamPathId)); + const auto& tablePathId = Self->PathsById.at(streamPathId)->ParentPathId; + + Y_VERIFY(Self->Tables.contains(tablePathId)); + auto table = Self->Tables.at(tablePathId); + + if (streamInfo->ScanShards.empty()) { + NIceDb::TNiceDb db(txc.DB); + for (const auto& shard : table->GetPartitions()) { + const auto status = TCdcStreamInfo::TShardStatus(NKikimrTxDataShard::TEvCdcStreamScanResponse::PENDING); + streamInfo->ScanShards.emplace(shard.ShardIdx, status); + streamInfo->PendingShards.insert(shard.ShardIdx); + Self->PersistCdcStreamScanShardStatus(db, streamPathId, shard.ShardIdx, status); + } + } + + while (!streamInfo->PendingShards.empty()) { + if (streamInfo->InProgressShards.size() >= streamInfo->MaxInProgressShards) { + break; + } + + auto it = streamInfo->PendingShards.begin(); + + Y_VERIFY(Self->ShardInfos.contains(*it)); + const auto tabletId = Self->ShardInfos.at(*it).TabletID; + + streamInfo->InProgressShards.insert(*it); + streamInfo->PendingShards.erase(it); + + auto ev = MakeHolder<TEvDataShard::TEvCdcStreamScanRequest>(); + PathIdFromPathId(tablePathId, ev->Record.MutableTablePathId()); + ev->Record.SetTableSchemaVersion(table->AlterVersion); + PathIdFromPathId(streamPathId, ev->Record.MutableStreamPathId()); + ev->Record.SetSnapshotStep(ui64(streamPath->StepCreated)); + ev->Record.SetSnapshotTxId(ui64(streamPath->CreateTxId)); + ScanRequests.emplace_back(streamPathId, tabletId, std::move(ev)); + } + + if (streamInfo->DoneShards.size() == streamInfo->ScanShards.size()) { + const auto path = TPath::Init(streamPathId, Self); + + Finalize = MakeHolder<TEvSchemeShard::TEvModifySchemeTransaction>(); + auto& tx = *Finalize->Record.AddTransaction(); + tx.SetOperationType(NKikimrSchemeOp::ESchemeOpAlterCdcStream); + tx.SetWorkingDir(path.Parent().Parent().PathString()); // stream -> table -> working dir + tx.SetFailOnExist(false); + + auto& op = *tx.MutableAlterCdcStream(); + op.SetTableName(path.Parent().LeafName()); + op.SetStreamName(path.LeafName()); + op.MutableGetReady()->SetLockTxId(ui64(streamPath->CreateTxId)); + tx.MutableLockGuard()->SetOwnerTxId(ui64(streamPath->CreateTxId)); + } + + return true; + } + + bool OnCdcStreamScanResponse(TTransactionContext& txc, const TActorContext& ctx) { + const auto& record = CdcStreamScanResponse->Get()->Record; + + LOG_D("Response" + << ": ev# " << 
record.ShortDebugString()); + + const auto streamPathId = PathIdFromPathId(record.GetStreamPathId()); + if (!Self->CdcStreams.contains(streamPathId)) { + LOG_W("Cannot process response" + << ": streamPathId# " << streamPathId + << ", reason# " << "stream doesn't exist"); + return true; + } + + auto streamInfo = Self->CdcStreams.at(streamPathId); + if (streamInfo->State != TCdcStreamInfo::EState::ECdcStreamStateScan) { + LOG_W("Cannot process response" + << ": streamPathId# " << streamPathId + << ", reason# " << "unexpected state"); + return true; + } + + const auto tabletId = TTabletId(record.GetTabletId()); + const auto shardIdx = Self->GetShardIdx(tabletId); + if (shardIdx == InvalidShardIdx) { + LOG_E("Cannot process response" + << ": streamPathId# " << streamPathId + << ", tabletId# " << tabletId + << ", reason# " << "tablet not found"); + return true; + } + + auto it = streamInfo->ScanShards.find(shardIdx); + if (it == streamInfo->ScanShards.end()) { + LOG_E("Cannot process response" + << ": streamPathId# " << streamPathId + << ", shardIdx# " << shardIdx + << ", reason# " << "shard not found"); + return true; + } + + auto& status = it->second; + if (!streamInfo->InProgressShards.contains(shardIdx)) { + LOG_W("Shard status mismatch" + << ": streamPathId# " << streamPathId + << ", shardIdx# " << shardIdx + << ", got# " << record.GetStatus() + << ", current# " << status.Status); + return true; + } + + switch (record.GetStatus()) { + case NKikimrTxDataShard::TEvCdcStreamScanResponse::ACCEPTED: + case NKikimrTxDataShard::TEvCdcStreamScanResponse::IN_PROGRESS: + break; + + case NKikimrTxDataShard::TEvCdcStreamScanResponse::DONE: + status.Status = record.GetStatus(); + streamInfo->DoneShards.insert(shardIdx); + streamInfo->InProgressShards.erase(shardIdx); + Self->CdcStreamScanPipes.Close(streamPathId, tabletId, ctx); + StreamToProgress = streamPathId; + break; + + case NKikimrTxDataShard::TEvCdcStreamScanResponse::OVERLOADED: + case NKikimrTxDataShard::TEvCdcStreamScanResponse::ABORTED: + streamInfo->PendingShards.insert(shardIdx); + streamInfo->InProgressShards.erase(shardIdx); + Self->CdcStreamScanPipes.Close(streamPathId, tabletId, ctx); + StreamToProgress = streamPathId; + break; + + case NKikimrTxDataShard::TEvCdcStreamScanResponse::BAD_REQUEST: + case NKikimrTxDataShard::TEvCdcStreamScanResponse::SCHEME_ERROR: + Y_FAIL("unreachable"); + + default: + LOG_E("Unexpected response status" + << ": status# " << static_cast<int>(record.GetStatus()) + << ", error# " << record.GetErrorDescription()); + return true; + } + + NIceDb::TNiceDb db(txc.DB); + Self->PersistCdcStreamScanShardStatus(db, streamPathId, shardIdx, status); + + if (streamInfo->DoneShards.size() == streamInfo->ScanShards.size()) { + StreamToProgress = streamPathId; + } + + return true; + } + + bool OnPipeRetry(TTransactionContext&, const TActorContext& ctx) { + const auto& streamPathId = PipeRetry.StreamPathId; + const auto& tabletId = PipeRetry.TabletId; + + LOG_D("Pipe retry" + << ": streamPathId# " << streamPathId + << ", tabletId# " << tabletId); + + if (!Self->CdcStreams.contains(streamPathId)) { + LOG_W("Cannot retry" + << ": streamPathId# " << streamPathId + << ", reason# " << "stream doesn't exist"); + return true; + } + + auto streamInfo = Self->CdcStreams.at(streamPathId); + if (streamInfo->State != TCdcStreamInfo::EState::ECdcStreamStateScan) { + LOG_W("Cannot retry" + << ": streamPathId# " << streamPathId + << ", reason# " << "unexpected state"); + return true; + } + + const auto shardIdx = 
Self->GetShardIdx(tabletId); + if (shardIdx == InvalidShardIdx) { + LOG_E("Cannot retry" + << ": streamPathId# " << streamPathId + << ", tabletId# " << tabletId + << ", reason# " << "tablet not found"); + return true; + } + + auto it = streamInfo->InProgressShards.find(shardIdx); + if (it == streamInfo->InProgressShards.end()) { + LOG_E("Cannot retry" + << ": streamPathId# " << streamPathId + << ", shardIdx# " << shardIdx + << ", reason# " << "shard not found"); + return true; + } + + streamInfo->PendingShards.insert(*it); + streamInfo->InProgressShards.erase(it); + Self->CdcStreamScanPipes.Close(streamPathId, tabletId, ctx); + StreamToProgress = streamPathId; + + return true; + } +}; + +ITransaction* TSchemeShard::CreateTxProgressCdcStreamScan(TEvPrivate::TEvRunCdcStreamScan::TPtr& ev) { + return new TCdcStreamScan::TTxProgress(this, ev); +} + +ITransaction* TSchemeShard::CreateTxProgressCdcStreamScan(TEvDataShard::TEvCdcStreamScanResponse::TPtr& ev) { + return new TCdcStreamScan::TTxProgress(this, ev); +} + +ITransaction* TSchemeShard::CreatePipeRetry(const TPathId& streamPathId, TTabletId tabletId) { + return new TCdcStreamScan::TTxProgress(this, streamPathId, tabletId); +} + +void TSchemeShard::Handle(TEvPrivate::TEvRunCdcStreamScan::TPtr& ev, const TActorContext& ctx) { + Execute(CreateTxProgressCdcStreamScan(ev), ctx); +} + +void TSchemeShard::Handle(TEvDataShard::TEvCdcStreamScanResponse::TPtr& ev, const TActorContext& ctx) { + Execute(CreateTxProgressCdcStreamScan(ev), ctx); +} + +void TSchemeShard::ResumeCdcStreamScans(const TVector<TPathId>& ids, const TActorContext& ctx) { + for (const auto& id : ids) { + Send(ctx.SelfID, new TEvPrivate::TEvRunCdcStreamScan(id)); + } +} + +void TSchemeShard::PersistCdcStreamScanShardStatus(NIceDb::TNiceDb& db, const TPathId& streamPathId, + const TShardIdx& shardIdx, const TCdcStreamInfo::TShardStatus& status) +{ + db.Table<Schema::CdcStreamScanShardStatus>() + .Key(streamPathId.OwnerId, streamPathId.LocalPathId, shardIdx.GetOwnerId(), shardIdx.GetLocalId()) + .Update( + NIceDb::TUpdate<Schema::CdcStreamScanShardStatus::Status>(status.Status) + ); +} + +void TSchemeShard::RemoveCdcStreamScanShardStatus(NIceDb::TNiceDb& db, const TPathId& streamPathId, const TShardIdx& shardIdx) { + db.Table<Schema::CdcStreamScanShardStatus>() + .Key(streamPathId.OwnerId, streamPathId.LocalPathId, shardIdx.GetOwnerId(), shardIdx.GetLocalId()) + .Delete(); +} + +} diff --git a/ydb/core/tx/schemeshard/schemeshard_impl.cpp b/ydb/core/tx/schemeshard/schemeshard_impl.cpp index fba9c2c0676..5804e7c5b61 100644 --- a/ydb/core/tx/schemeshard/schemeshard_impl.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_impl.cpp @@ -56,14 +56,7 @@ bool ResolvePoolNames( const TSchemeLimits TSchemeShard::DefaultLimits = {}; -void TSchemeShard::ActivateAfterInitialization(const TActorContext& ctx, - TSideEffects::TPublications&& delayPublications, - const TVector<ui64>& exportIds, - const TVector<ui64>& importsIds, - TVector<TPathId>&& tablesToClean, - TDeque<TPathId>&& blockStoreVolumesToClean - ) -{ +void TSchemeShard::ActivateAfterInitialization(const TActorContext& ctx, TActivationOpts&& opts) { TPathId subDomainPathId = GetCurrentSubDomainPathId(); TSubDomainInfo::TPtr domainPtr = ResolveDomainInfo(subDomainPathId); LoginProvider.Audience = TPath::Init(subDomainPathId, this).PathString(); @@ -74,14 +67,14 @@ void TSchemeShard::ActivateAfterInitialization(const TActorContext& ctx, GetDomainKey(subDomainPathId), sysViewProcessorId ? 
sysViewProcessorId.GetValue() : 0); Send(SysPartitionStatsCollector, evInit.Release()); - Execute(CreateTxInitPopulator(std::move(delayPublications)), ctx); + Execute(CreateTxInitPopulator(std::move(opts.DelayPublications)), ctx); - if (tablesToClean) { - Execute(CreateTxCleanTables(std::move(tablesToClean)), ctx); + if (opts.TablesToClean) { + Execute(CreateTxCleanTables(std::move(opts.TablesToClean)), ctx); } - if (blockStoreVolumesToClean) { - Execute(CreateTxCleanBlockStoreVolumes(std::move(blockStoreVolumesToClean)), ctx); + if (opts.BlockStoreVolumesToClean) { + Execute(CreateTxCleanBlockStoreVolumes(std::move(opts.BlockStoreVolumesToClean)), ctx); } if (IsDomainSchemeShard) { @@ -114,8 +107,9 @@ void TSchemeShard::ActivateAfterInitialization(const TActorContext& ctx, SVPMigrator = Register(CreateSVPMigrator(TabletID(), SelfId(), std::move(migrations)).Release()); } - ResumeExports(exportIds, ctx); - ResumeImports(importsIds, ctx); + ResumeExports(opts.ExportIds, ctx); + ResumeImports(opts.ImportsIds, ctx); + ResumeCdcStreamScans(opts.CdcStreamScans, ctx); ParentDomainLink.SendSync(ctx); @@ -1623,6 +1617,10 @@ void TSchemeShard::PersistRemoveCdcStream(NIceDb::TNiceDb &db, const TPathId& pa db.Table<Schema::CdcStream>().Key(pathId.OwnerId, pathId.LocalPathId).Delete(); + for (const auto& [shardIdx, _] : stream->ScanShards) { + RemoveCdcStreamScanShardStatus(db, pathId, shardIdx); + } + CdcStreams.erase(pathId); DecrementPathDbRefCount(pathId); } @@ -3969,7 +3967,12 @@ void TSchemeShard::Die(const TActorContext &ctx) { ctx.Send(SVPMigrator, new TEvents::TEvPoisonPill()); } + if (CdcStreamScanFinalizer) { + ctx.Send(CdcStreamScanFinalizer, new TEvents::TEvPoisonPill()); + } + IndexBuildPipes.Shutdown(ctx); + CdcStreamScanPipes.Shutdown(ctx); ShardDeleter.Shutdown(ctx); ParentDomainLink.Shutdown(ctx); @@ -4255,6 +4258,11 @@ void TSchemeShard::StateWork(STFUNC_SIG) { HFuncTraced(TEvPrivate::TEvIndexBuildingMakeABill, Handle); // } // NIndexBuilder + //namespace NCdcStreamScan { + HFuncTraced(TEvPrivate::TEvRunCdcStreamScan, Handle); + HFuncTraced(TEvDataShard::TEvCdcStreamScanResponse, Handle); + // } // NCdcStreamScan + // namespace NLongRunningCommon { HFuncTraced(TEvTxAllocatorClient::TEvAllocateResult, Handle); HFuncTraced(TEvSchemeShard::TEvModifySchemeTransactionResult, Handle); @@ -4856,6 +4864,11 @@ void TSchemeShard::Handle(TEvTabletPipe::TEvClientConnected::TPtr &ev, const TAc return; } + if (CdcStreamScanPipes.Has(clientId)) { + Execute(CreatePipeRetry(CdcStreamScanPipes.GetOwnerId(clientId), CdcStreamScanPipes.GetTabletId(clientId)), ctx); + return; + } + if (ShardDeleter.Has(tabletId, clientId)) { ShardDeleter.ResendDeleteRequests(TTabletId(ev->Get()->TabletId), ShardInfos, ctx); return; @@ -4900,6 +4913,11 @@ void TSchemeShard::Handle(TEvTabletPipe::TEvClientDestroyed::TPtr &ev, const TAc return; } + if (CdcStreamScanPipes.Has(clientId)) { + Execute(CreatePipeRetry(CdcStreamScanPipes.GetOwnerId(clientId), CdcStreamScanPipes.GetTabletId(clientId)), ctx); + return; + } + if (ShardDeleter.Has(tabletId, clientId)) { ShardDeleter.ResendDeleteRequests(tabletId, ShardInfos, ctx); return; diff --git a/ydb/core/tx/schemeshard/schemeshard_impl.h b/ydb/core/tx/schemeshard/schemeshard_impl.h index 5568c57df74..d53c3c090dc 100644 --- a/ydb/core/tx/schemeshard/schemeshard_impl.h +++ b/ydb/core/tx/schemeshard/schemeshard_impl.h @@ -270,6 +270,7 @@ public: TActorId SysPartitionStatsCollector; TActorId SVPMigrator; + TActorId CdcStreamScanFinalizer; TDuration StatsMaxExecuteTime; TDuration 
StatsBatchTimeout; @@ -753,13 +754,15 @@ public: struct TTxInitTenantSchemeShard; NTabletFlatExecutor::ITransaction* CreateTxInitTenantSchemeShard(TEvSchemeShard::TEvInitTenantSchemeShard::TPtr &ev); - void ActivateAfterInitialization(const TActorContext &ctx, - TSideEffects::TPublications&& delayPublications = {}, - const TVector<ui64>& exportIds = {}, - const TVector<ui64>& importsIds = {}, - TVector<TPathId>&& tablesToClean = {}, - TDeque<TPathId>&& blockStoreVolumesToClean = {} - ); + struct TActivationOpts { + TSideEffects::TPublications DelayPublications; + TVector<ui64> ExportIds; + TVector<ui64> ImportsIds; + TVector<TPathId> CdcStreamScans; + TVector<TPathId> TablesToClean; + TDeque<TPathId> BlockStoreVolumesToClean; + }; + void ActivateAfterInitialization(const TActorContext& ctx, TActivationOpts&& opts); struct TTxInitPopulator; NTabletFlatExecutor::ITransaction* CreateTxInitPopulator(TSideEffects::TPublications&& publications); @@ -1195,7 +1198,6 @@ public: NTabletFlatExecutor::ITransaction* CreatePipeRetry(TIndexBuildId indexBuildId, TTabletId tabletId); NTabletFlatExecutor::ITransaction* CreateTxBilling(TEvPrivate::TEvIndexBuildingMakeABill::TPtr& ev); - void Handle(TEvIndexBuilder::TEvCreateRequest::TPtr& ev, const TActorContext& ctx); void Handle(TEvIndexBuilder::TEvGetRequest::TPtr& ev, const TActorContext& ctx); void Handle(TEvIndexBuilder::TEvCancelRequest::TPtr& ev, const TActorContext& ctx); @@ -1211,6 +1213,27 @@ public: // } //NIndexBuilder + // namespace NCdcStreamScan { + struct TCdcStreamScan { + struct TTxProgress; + }; + + TDedicatedPipePool<TPathId> CdcStreamScanPipes; + + NTabletFlatExecutor::ITransaction* CreateTxProgressCdcStreamScan(TEvPrivate::TEvRunCdcStreamScan::TPtr& ev); + NTabletFlatExecutor::ITransaction* CreateTxProgressCdcStreamScan(TEvDataShard::TEvCdcStreamScanResponse::TPtr& ev); + NTabletFlatExecutor::ITransaction* CreatePipeRetry(const TPathId& streamPathId, TTabletId tabletId); + + void Handle(TEvPrivate::TEvRunCdcStreamScan::TPtr& ev, const TActorContext& ctx); + void Handle(TEvDataShard::TEvCdcStreamScanResponse::TPtr& ev, const TActorContext& ctx); + + void ResumeCdcStreamScans(const TVector<TPathId>& ids, const TActorContext& ctx); + + void PersistCdcStreamScanShardStatus(NIceDb::TNiceDb& db, const TPathId& streamPathId, const TShardIdx& shardIdx, + const TCdcStreamInfo::TShardStatus& status); + void RemoveCdcStreamScanShardStatus(NIceDb::TNiceDb& db, const TPathId& streamPathId, const TShardIdx& shardIdx); + // } // NCdcStreamScan + public: void ChangeStreamShardsCount(i64 delta) override; void ChangeStreamShardsQuota(i64 delta) override; diff --git a/ydb/core/tx/schemeshard/schemeshard_info_types.h b/ydb/core/tx/schemeshard/schemeshard_info_types.h index 4292ead6b92..304b441cd13 100644 --- a/ydb/core/tx/schemeshard/schemeshard_info_types.h +++ b/ydb/core/tx/schemeshard/schemeshard_info_types.h @@ -2309,6 +2309,17 @@ struct TCdcStreamInfo : public TSimpleRefCount<TCdcStreamInfo> { using EFormat = NKikimrSchemeOp::ECdcStreamFormat; using EState = NKikimrSchemeOp::ECdcStreamState; + // shards of the table + struct TShardStatus { + NKikimrTxDataShard::TEvCdcStreamScanResponse::EStatus Status; + + explicit TShardStatus(NKikimrTxDataShard::TEvCdcStreamScanResponse::EStatus status) + : Status(status) + {} + }; + + static constexpr ui32 MaxInProgressShards = 10; + TCdcStreamInfo(ui64 version, EMode mode, EFormat format, bool vt, EState state) : AlterVersion(version) , Mode(mode) @@ -2349,6 +2360,11 @@ struct TCdcStreamInfo : public 
TSimpleRefCount<TCdcStreamInfo> { EState State; TCdcStreamInfo::TPtr AlterData = nullptr; + + TMap<TShardIdx, TShardStatus> ScanShards; + THashSet<TShardIdx> PendingShards; + THashSet<TShardIdx> InProgressShards; + THashSet<TShardIdx> DoneShards; }; struct TSequenceInfo : public TSimpleRefCount<TSequenceInfo> { diff --git a/ydb/core/tx/schemeshard/schemeshard_private.h b/ydb/core/tx/schemeshard/schemeshard_private.h index feace051751..6a2171865c5 100644 --- a/ydb/core/tx/schemeshard/schemeshard_private.h +++ b/ydb/core/tx/schemeshard/schemeshard_private.h @@ -26,6 +26,7 @@ struct TEvPrivate { EvCompleteBarrier, EvPersistStats, EvConsoleConfigsTimeout, + EvRunCdcStreamScan, EvEnd }; @@ -165,6 +166,14 @@ struct TEvPrivate { struct TEvConsoleConfigsTimeout: public TEventLocal<TEvConsoleConfigsTimeout, EvConsoleConfigsTimeout> { }; + struct TEvRunCdcStreamScan: public TEventLocal<TEvRunCdcStreamScan, EvRunCdcStreamScan> { + const TPathId StreamPathId; + + TEvRunCdcStreamScan(const TPathId& streamPathId) + : StreamPathId(streamPathId) + {} + }; + }; // TEvPrivate } // NSchemeShard diff --git a/ydb/core/tx/schemeshard/schemeshard_schema.h b/ydb/core/tx/schemeshard/schemeshard_schema.h index ed451025d63..007959c4860 100644 --- a/ydb/core/tx/schemeshard/schemeshard_schema.h +++ b/ydb/core/tx/schemeshard/schemeshard_schema.h @@ -1552,6 +1552,26 @@ struct Schema : NIceDb::Schema { using TColumns = TableColumns<OwnerPathId, LocalPathId, AlterVersion, State, Mode, Format, VirtualTimestamps>; }; + struct CdcStreamScanShardStatus : Table<103> { + // path id of cdc stream + struct OwnerPathId : Column<1, NScheme::NTypeIds::Uint64> { using Type = TOwnerId; }; + struct LocalPathId : Column<2, NScheme::NTypeIds::Uint64> { using Type = TLocalPathId; }; + // shard idx of datashard + struct OwnerShardIdx : Column<3, NScheme::NTypeIds::Uint64> { using Type = TOwnerId; }; + struct LocalShardIdx : Column<4, NScheme::NTypeIds::Uint64> { using Type = TLocalShardIdx; }; + + struct Status : Column<5, NScheme::NTypeIds::Uint32> { using Type = NKikimrTxDataShard::TEvCdcStreamScanResponse::EStatus; }; + + using TKey = TableKey<OwnerPathId, LocalPathId, OwnerShardIdx, LocalShardIdx>; + using TColumns = TableColumns< + OwnerPathId, + LocalPathId, + OwnerShardIdx, + LocalShardIdx, + Status + >; + }; + struct Sequences : Table<97> { struct PathId : Column<1, NScheme::NTypeIds::Uint64> { using Type = TLocalPathId; }; struct AlterVersion : Column<2, NScheme::NTypeIds::Uint64> {}; @@ -1700,7 +1720,8 @@ struct Schema : NIceDb::Schema { SequencesAlters, Replications, ReplicationsAlterData, - BlobDepots + BlobDepots, + CdcStreamScanShardStatus >; static constexpr ui64 SysParam_NextPathId = 1; diff --git a/ydb/core/tx/schemeshard/ut_cdc_stream.cpp b/ydb/core/tx/schemeshard/ut_cdc_stream.cpp index e4a40a535aa..c9225677822 100644 --- a/ydb/core/tx/schemeshard/ut_cdc_stream.cpp +++ b/ydb/core/tx/schemeshard/ut_cdc_stream.cpp @@ -886,6 +886,7 @@ Y_UNIT_TEST_SUITE(TCdcStreamWithInitialScanTests) { TTestEnv env(runtime, TTestEnvOptions() .EnableProtoSourceIdInfo(true) .EnableChangefeedInitialScan(true)); + runtime.GetAppData().DisableCdcAutoSwitchingToReadyStateForTests = true; ui64 txId = 100; TestCreateTable(runtime, ++txId, "/MyRoot", R"( @@ -920,6 +921,13 @@ Y_UNIT_TEST_SUITE(TCdcStreamWithInitialScanTests) { TestModificationResult(runtime, txId, expectedStatus); }; + // try to disable + testAlterCdcStream(++txId, "/MyRoot", R"( + TableName: "Table" + StreamName: "Stream" + Disable {} + )", lockTxId, 
NKikimrScheme::StatusPreconditionFailed); + // without guard & lockTxId testAlterCdcStream(++txId, "/MyRoot", R"( TableName: "Table" @@ -979,6 +987,7 @@ Y_UNIT_TEST_SUITE(TCdcStreamWithInitialScanTests) { TTestEnv env(runtime, TTestEnvOptions() .EnableProtoSourceIdInfo(true) .EnableChangefeedInitialScan(true)); + runtime.GetAppData().DisableCdcAutoSwitchingToReadyStateForTests = true; ui64 txId = 100; TestCreateTable(runtime, ++txId, "/MyRoot", R"( diff --git a/ydb/core/tx/schemeshard/ut_cdc_stream_reboots.cpp b/ydb/core/tx/schemeshard/ut_cdc_stream_reboots.cpp index d067dccb5af..a90a8b6705b 100644 --- a/ydb/core/tx/schemeshard/ut_cdc_stream_reboots.cpp +++ b/ydb/core/tx/schemeshard/ut_cdc_stream_reboots.cpp @@ -12,6 +12,8 @@ Y_UNIT_TEST_SUITE(TCdcStreamWithRebootsTests) { t.Run([&](TTestActorRuntime& runtime, bool& activeZone) { { TInactiveZone inactive(activeZone); + runtime.GetAppData().DisableCdcAutoSwitchingToReadyStateForTests = true; + TestCreateTable(runtime, ++t.TxId, "/MyRoot", R"( Name: "Table" Columns { Name: "key" Type: "Uint64" } @@ -43,7 +45,6 @@ Y_UNIT_TEST_SUITE(TCdcStreamWithRebootsTests) { TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/Stream"), { NLs::PathExist, - NLs::StreamState(state.GetOrElse(NKikimrSchemeOp::ECdcStreamStateReady)), NLs::StreamVirtualTimestamps(vt), }); }); @@ -123,6 +124,8 @@ Y_UNIT_TEST_SUITE(TCdcStreamWithRebootsTests) { t.Run([&](TTestActorRuntime& runtime, bool& activeZone) { { TInactiveZone inactive(activeZone); + runtime.GetAppData().DisableCdcAutoSwitchingToReadyStateForTests = true; + TestCreateTable(runtime, ++t.TxId, "/MyRoot", R"( Name: "Table" Columns { Name: "key" Type: "Uint64" } @@ -182,6 +185,8 @@ Y_UNIT_TEST_SUITE(TCdcStreamWithRebootsTests) { t.Run([&](TTestActorRuntime& runtime, bool& activeZone) { { TInactiveZone inactive(activeZone); + runtime.GetAppData().DisableCdcAutoSwitchingToReadyStateForTests = true; + TestCreateTable(runtime, ++t.TxId, "/MyRoot", R"( Name: "Table" Columns { Name: "key" Type: "Uint64" } diff --git a/ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_datashard_/flat_datashard.schema b/ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_datashard_/flat_datashard.schema index 20d676d546b..73c678da014 100644 --- a/ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_datashard_/flat_datashard.schema +++ b/ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_datashard_/flat_datashard.schema @@ -1814,6 +1814,68 @@ } }, { + "TableId": 34, + "TableName": "CdcStreamScans", + "TableKey": [ + 1, + 2, + 3, + 4 + ], + "ColumnsAdded": [ + { + "ColumnId": 1, + "ColumnName": "TableOwnerId", + "ColumnType": "Uint64" + }, + { + "ColumnId": 2, + "ColumnName": "TablePathId", + "ColumnType": "Uint64" + }, + { + "ColumnId": 3, + "ColumnName": "StreamOwnerId", + "ColumnType": "Uint64" + }, + { + "ColumnId": 4, + "ColumnName": "StreamPathId", + "ColumnType": "Uint64" + }, + { + "ColumnId": 5, + "ColumnName": "LastKey", + "ColumnType": "String" + } + ], + "ColumnsDropped": [], + "ColumnFamilies": { + "0": { + "Columns": [ + 1, + 2, + 3, + 4, + 5 + ], + "RoomID": 0, + "Codec": 0, + "InMemory": false, + "Cache": 0, + "Small": 4294967295, + "Large": 4294967295 + } + }, + "Rooms": { + "0": { + "Main": 1, + "Outer": 1, + "Blobs": 1 + } + } + }, + { "TableId": 101, "TableName": "LockChangeRecords", 
"TableKey": [ diff --git a/ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_schemeshard_/flat_schemeshard.schema b/ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_schemeshard_/flat_schemeshard.schema index 771cf1fd6f3..93f379652fa 100644 --- a/ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_schemeshard_/flat_schemeshard.schema +++ b/ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_schemeshard_/flat_schemeshard.schema @@ -6708,5 +6708,67 @@ "Blobs": 1 } } + }, + { + "TableId": 103, + "TableName": "CdcStreamScanShardStatus", + "TableKey": [ + 1, + 2, + 3, + 4 + ], + "ColumnsAdded": [ + { + "ColumnId": 1, + "ColumnName": "OwnerPathId", + "ColumnType": "Uint64" + }, + { + "ColumnId": 2, + "ColumnName": "LocalPathId", + "ColumnType": "Uint64" + }, + { + "ColumnId": 3, + "ColumnName": "OwnerShardIdx", + "ColumnType": "Uint64" + }, + { + "ColumnId": 4, + "ColumnName": "LocalShardIdx", + "ColumnType": "Uint64" + }, + { + "ColumnId": 5, + "ColumnName": "Status", + "ColumnType": "Uint32" + } + ], + "ColumnsDropped": [], + "ColumnFamilies": { + "0": { + "Columns": [ + 1, + 2, + 3, + 4, + 5 + ], + "RoomID": 0, + "Codec": 0, + "InMemory": false, + "Cache": 0, + "Small": 4294967295, + "Large": 4294967295 + } + }, + "Rooms": { + "0": { + "Main": 1, + "Outer": 1, + "Blobs": 1 + } + } } ]
\ No newline at end of file
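
For readers skimming this diff, the heart of the new SchemeShard-side bookkeeping is the per-stream shard state kept by TTxProgress and TCdcStreamInfo: shards start in PendingShards, at most MaxInProgressShards (10) are moved to InProgressShards at a time, DONE responses move them to DoneShards, OVERLOADED/ABORTED responses and broken pipes push them back to PendingShards, and once every shard is done an AlterCdcStream/GetReady transaction switches the stream to the Ready state. The following is a minimal, self-contained sketch of that state machine using plain STL only; the class name ShardScanTracker and its method names are invented for illustration and are not part of the YDB codebase.

#include <cstddef>
#include <cstdint>
#include <iostream>
#include <set>
#include <vector>

// Mirrors the Pending/InProgress/Done transitions and the MaxInProgressShards
// throttle implemented by TTxProgress above (illustrative only).
class ShardScanTracker {
public:
    explicit ShardScanTracker(const std::vector<uint64_t>& shards, std::size_t maxInProgress = 10)
        : MaxInProgress(maxInProgress)
    {
        for (uint64_t shard : shards) {
            Pending.insert(shard);
        }
    }

    // Analogue of the dispatch loop in OnRunCdcStreamScan: move shards from
    // Pending to InProgress until the throttle is hit and return the shards
    // for which a scan request should be sent.
    std::vector<uint64_t> DispatchNext() {
        std::vector<uint64_t> toRequest;
        while (!Pending.empty() && InProgress.size() < MaxInProgress) {
            auto it = Pending.begin();
            InProgress.insert(*it);
            toRequest.push_back(*it);
            Pending.erase(it);
        }
        return toRequest;
    }

    // DONE response: the shard is finished for good.
    void OnDone(uint64_t shard) {
        if (InProgress.erase(shard)) {
            Done.insert(shard);
        }
    }

    // OVERLOADED/ABORTED response or a broken pipe: return the shard to
    // Pending so a later DispatchNext() retries it.
    void OnRetry(uint64_t shard) {
        if (InProgress.erase(shard)) {
            Pending.insert(shard);
        }
    }

    // All shards done: the stream can be switched to Ready (the real code
    // builds an AlterCdcStream/GetReady transaction at this point).
    bool ReadyToFinalize() const {
        return Pending.empty() && InProgress.empty();
    }

private:
    const std::size_t MaxInProgress;
    std::set<uint64_t> Pending;
    std::set<uint64_t> InProgress;
    std::set<uint64_t> Done;
};

int main() {
    ShardScanTracker tracker({1, 2, 3}, /*maxInProgress=*/2);

    auto batch = tracker.DispatchNext();          // shards 1 and 2 start scanning
    std::cout << "dispatched " << batch.size() << " shards\n";

    tracker.OnRetry(1);                           // shard 1 reported OVERLOADED
    tracker.OnDone(2);                            // shard 2 finished

    batch = tracker.DispatchNext();               // shards 1 and 3 start scanning
    tracker.OnDone(1);
    tracker.OnDone(3);

    std::cout << std::boolalpha << tracker.ReadyToFinalize() << "\n"; // true
    return 0;
}

In the actual commit this bookkeeping is persisted per shard in the new CdcStreamScanShardStatus table, so the scan resumes from the recorded statuses after a SchemeShard restart via ResumeCdcStreamScans.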