diff options
author | Ilnaz Nizametdinov <ilnaz@ydb.tech> | 2025-05-29 19:38:48 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2025-05-29 16:38:48 +0000 |
commit | 9fd893ca3552e0a08bd357deaaa032cad266b2bc (patch) | |
tree | 8179e329ba91906f48cfaeef495be7e2c7ce3edc | |
parent | b08e92fc20e246e1e2b672a605138865ee6aa038 (diff) | |
download | ydb-9fd893ca3552e0a08bd357deaaa032cad266b2bc.tar.gz |
(refactoring) CDC stream settings (#18958)
-rw-r--r-- | ydb/core/tx/schemeshard/schemeshard__init.cpp | 35 | ||||
-rw-r--r-- | ydb/core/tx/schemeshard/schemeshard_info_types.h | 80 | ||||
-rw-r--r-- | ydb/core/tx/schemeshard/schemeshard_path_describer.cpp | 14 |
3 files changed, 72 insertions, 57 deletions
diff --git a/ydb/core/tx/schemeshard/schemeshard__init.cpp b/ydb/core/tx/schemeshard/schemeshard__init.cpp index f05a0dd1a6e..284f9cf38a5 100644 --- a/ydb/core/tx/schemeshard/schemeshard__init.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__init.cpp @@ -3044,12 +3044,13 @@ struct TSchemeShard::TTxInit : public TTransactionBase<TSchemeShard> { TLocalPathId(rowset.GetValue<Schema::CdcStream::LocalPathId>()) ); auto alterVersion = rowset.GetValue<Schema::CdcStream::AlterVersion>(); - auto mode = rowset.GetValue<Schema::CdcStream::Mode>(); - auto format = rowset.GetValue<Schema::CdcStream::Format>(); - auto vt = rowset.GetValueOrDefault<Schema::CdcStream::VirtualTimestamps>(false); - auto rt = TDuration::MilliSeconds(rowset.GetValueOrDefault<Schema::CdcStream::ResolvedTimestampsIntervalMs>(0)); - auto awsRegion = rowset.GetValue<Schema::CdcStream::AwsRegion>(); - auto state = rowset.GetValue<Schema::CdcStream::State>(); + auto settings = TCdcStreamSettings() + .WithMode(rowset.GetValue<Schema::CdcStream::Mode>()) + .WithFormat(rowset.GetValue<Schema::CdcStream::Format>()) + .WithVirtualTimestamps(rowset.GetValueOrDefault<Schema::CdcStream::VirtualTimestamps>(false)) + .WithResolvedTimestamps(TDuration::MilliSeconds(rowset.GetValueOrDefault<Schema::CdcStream::ResolvedTimestampsIntervalMs>(0))) + .WithAwsRegion(rowset.GetValue<Schema::CdcStream::AwsRegion>()) + .WithState(rowset.GetValue<Schema::CdcStream::State>()); Y_VERIFY_S(Self->PathsById.contains(pathId), "Path doesn't exist, pathId: " << pathId); auto path = Self->PathsById.at(pathId); @@ -3059,10 +3060,10 @@ struct TSchemeShard::TTxInit : public TTransactionBase<TSchemeShard> { << ", path type: " << NKikimrSchemeOp::EPathType_Name(path->PathType)); Y_ABORT_UNLESS(!Self->CdcStreams.contains(pathId)); - Self->CdcStreams[pathId] = new TCdcStreamInfo(alterVersion, mode, format, vt, rt, awsRegion, state); + const auto& stream = Self->CdcStreams[pathId] = new TCdcStreamInfo(alterVersion, std::move(settings)); Self->IncrementPathDbRefCount(pathId); - if (state == NKikimrSchemeOp::ECdcStreamStateScan) { + if (stream->State == NKikimrSchemeOp::ECdcStreamStateScan) { Y_VERIFY_S(Self->PathsById.contains(path->ParentPathId), "Parent path is not found" << ", cdc stream pathId: " << pathId << ", parent pathId: " << path->ParentPathId); @@ -3087,14 +3088,14 @@ struct TSchemeShard::TTxInit : public TTransactionBase<TSchemeShard> { TOwnerId(rowset.GetValue<Schema::CdcStreamAlterData::OwnerPathId>()), TLocalPathId(rowset.GetValue<Schema::CdcStreamAlterData::LocalPathId>()) ); - auto alterVersion = rowset.GetValue<Schema::CdcStreamAlterData::AlterVersion>(); - auto mode = rowset.GetValue<Schema::CdcStreamAlterData::Mode>(); - auto format = rowset.GetValue<Schema::CdcStreamAlterData::Format>(); - auto vt = rowset.GetValueOrDefault<Schema::CdcStreamAlterData::VirtualTimestamps>(false); - auto rt = TDuration::MilliSeconds(rowset.GetValueOrDefault<Schema::CdcStreamAlterData::ResolvedTimestampsIntervalMs>(0)); - auto awsRegion = rowset.GetValue<Schema::CdcStreamAlterData::AwsRegion>(); - auto state = rowset.GetValue<Schema::CdcStreamAlterData::State>(); + auto settings = TCdcStreamSettings() + .WithMode(rowset.GetValue<Schema::CdcStreamAlterData::Mode>()) + .WithFormat(rowset.GetValue<Schema::CdcStreamAlterData::Format>()) + .WithVirtualTimestamps(rowset.GetValueOrDefault<Schema::CdcStreamAlterData::VirtualTimestamps>(false)) + .WithResolvedTimestamps(TDuration::MilliSeconds(rowset.GetValueOrDefault<Schema::CdcStreamAlterData::ResolvedTimestampsIntervalMs>(0))) + .WithAwsRegion(rowset.GetValue<Schema::CdcStreamAlterData::AwsRegion>()) + .WithState(rowset.GetValue<Schema::CdcStreamAlterData::State>()); Y_VERIFY_S(Self->PathsById.contains(pathId), "Path doesn't exist, pathId: " << pathId); auto path = Self->PathsById.at(pathId); @@ -3105,14 +3106,14 @@ struct TSchemeShard::TTxInit : public TTransactionBase<TSchemeShard> { if (!Self->CdcStreams.contains(pathId)) { Y_ABORT_UNLESS(alterVersion == 1); - Self->CdcStreams[pathId] = TCdcStreamInfo::New(mode, format, vt, rt, awsRegion); + Self->CdcStreams[pathId] = TCdcStreamInfo::New(settings); Self->IncrementPathDbRefCount(pathId); } auto stream = Self->CdcStreams.at(pathId); Y_ABORT_UNLESS(stream->AlterData == nullptr); Y_ABORT_UNLESS(stream->AlterVersion < alterVersion); - stream->AlterData = new TCdcStreamInfo(alterVersion, mode, format, vt, rt, awsRegion, state); + stream->AlterData = new TCdcStreamInfo(alterVersion, std::move(settings)); Y_VERIFY_S(Self->PathsById.contains(path->ParentPathId), "Parent path is not found" << ", cdc stream pathId: " << pathId diff --git a/ydb/core/tx/schemeshard/schemeshard_info_types.h b/ydb/core/tx/schemeshard/schemeshard_info_types.h index 8f805bcf622..39a5f7819e8 100644 --- a/ydb/core/tx/schemeshard/schemeshard_info_types.h +++ b/ydb/core/tx/schemeshard/schemeshard_info_types.h @@ -2491,12 +2491,35 @@ struct TTableIndexInfo : public TSimpleRefCount<TTableIndexInfo> { std::variant<std::monostate, NKikimrSchemeOp::TVectorIndexKmeansTreeDescription> SpecializedIndexDescription; }; -struct TCdcStreamInfo : public TSimpleRefCount<TCdcStreamInfo> { - using TPtr = TIntrusivePtr<TCdcStreamInfo>; +struct TCdcStreamSettings { + using TSelf = TCdcStreamSettings; using EMode = NKikimrSchemeOp::ECdcStreamMode; using EFormat = NKikimrSchemeOp::ECdcStreamFormat; using EState = NKikimrSchemeOp::ECdcStreamState; + #define OPTION(type, name) \ + TSelf&& With##name(type value) && { \ + name = std::move(value); \ + return std::move(*this); \ + } \ + type name; + + OPTION(EMode, Mode); + OPTION(EFormat, Format); + OPTION(bool, VirtualTimestamps); + OPTION(TDuration, ResolvedTimestamps); + OPTION(TString, AwsRegion); + OPTION(EState, State); + + #undef OPTION +}; + +struct TCdcStreamInfo + : public TCdcStreamSettings + , public TSimpleRefCount<TCdcStreamInfo> +{ + using TPtr = TIntrusivePtr<TCdcStreamInfo>; + // shards of the table struct TShardStatus { NKikimrTxDataShard::TEvCdcStreamScanResponse::EStatus Status; @@ -2506,14 +2529,9 @@ struct TCdcStreamInfo : public TSimpleRefCount<TCdcStreamInfo> { {} }; - TCdcStreamInfo(ui64 version, EMode mode, EFormat format, bool vt, const TDuration& rt, const TString& awsRegion, EState state) - : AlterVersion(version) - , Mode(mode) - , Format(format) - , VirtualTimestamps(vt) - , ResolvedTimestamps(rt) - , AwsRegion(awsRegion) - , State(state) + TCdcStreamInfo(ui64 version, TCdcStreamSettings&& settings) + : TCdcStreamSettings(std::move(settings)) + , AlterVersion(version) {} TCdcStreamInfo(const TCdcStreamInfo&) = default; @@ -2526,13 +2544,18 @@ struct TCdcStreamInfo : public TSimpleRefCount<TCdcStreamInfo> { return result; } - static TPtr New(EMode mode, EFormat format, bool vt, const TDuration& rt, const TString& awsRegion) { - return new TCdcStreamInfo(0, mode, format, vt, rt, awsRegion, EState::ECdcStreamStateInvalid); + static TPtr New(TCdcStreamSettings settings) { + settings.State = EState::ECdcStreamStateInvalid; + return new TCdcStreamInfo(0, std::move(settings)); } static TPtr Create(const NKikimrSchemeOp::TCdcStreamDescription& desc) { - TPtr result = New(desc.GetMode(), desc.GetFormat(), desc.GetVirtualTimestamps(), - TDuration::MilliSeconds(desc.GetResolvedTimestampsIntervalMs()), desc.GetAwsRegion()); + TPtr result = New(TCdcStreamSettings() + .WithMode(desc.GetMode()) + .WithFormat(desc.GetFormat()) + .WithVirtualTimestamps(desc.GetVirtualTimestamps()) + .WithResolvedTimestamps(TDuration::MilliSeconds(desc.GetResolvedTimestampsIntervalMs())) + .WithAwsRegion(desc.GetAwsRegion())); TPtr alterData = result->CreateNextVersion(); alterData->State = EState::ECdcStreamStateReady; if (desc.HasState()) { @@ -2542,28 +2565,31 @@ struct TCdcStreamInfo : public TSimpleRefCount<TCdcStreamInfo> { return result; } + void Serialize(NKikimrSchemeOp::TCdcStreamDescription& desc) const { + desc.SetSchemaVersion(AlterVersion); + desc.SetMode(Mode); + desc.SetFormat(Format); + desc.SetVirtualTimestamps(VirtualTimestamps); + desc.SetResolvedTimestampsIntervalMs(ResolvedTimestamps.MilliSeconds()); + desc.SetAwsRegion(AwsRegion); + desc.SetState(State); + if (ScanShards) { + auto& scanProgress = *desc.MutableScanProgress(); + scanProgress.SetShardsTotal(ScanShards.size()); + scanProgress.SetShardsCompleted(DoneShards.size()); + } + } + void FinishAlter() { Y_ABORT_UNLESS(AlterData); AlterVersion = AlterData->AlterVersion; - Mode = AlterData->Mode; - Format = AlterData->Format; - VirtualTimestamps = AlterData->VirtualTimestamps; - ResolvedTimestamps = AlterData->ResolvedTimestamps; - AwsRegion = AlterData->AwsRegion; - State = AlterData->State; + static_cast<TCdcStreamSettings&>(*this) = static_cast<TCdcStreamSettings&>(*AlterData); AlterData.Reset(); } ui64 AlterVersion = 1; - EMode Mode; - EFormat Format; - bool VirtualTimestamps; - TDuration ResolvedTimestamps; - TString AwsRegion; - EState State; - TCdcStreamInfo::TPtr AlterData = nullptr; TMap<TShardIdx, TShardStatus> ScanShards; diff --git a/ydb/core/tx/schemeshard/schemeshard_path_describer.cpp b/ydb/core/tx/schemeshard/schemeshard_path_describer.cpp index d8feadd91f8..97558a7bff4 100644 --- a/ydb/core/tx/schemeshard/schemeshard_path_describer.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_path_describer.cpp @@ -1471,20 +1471,8 @@ void TSchemeShard::DescribeCdcStream(const TPathId& pathId, const TString& name, << ", name# " << name); desc.SetName(name); - desc.SetMode(info->Mode); - desc.SetFormat(info->Format); - desc.SetVirtualTimestamps(info->VirtualTimestamps); - desc.SetResolvedTimestampsIntervalMs(info->ResolvedTimestamps.MilliSeconds()); - desc.SetAwsRegion(info->AwsRegion); pathId.ToProto(desc.MutablePathId()); - desc.SetState(info->State); - desc.SetSchemaVersion(info->AlterVersion); - - if (info->ScanShards) { - auto& scanProgress = *desc.MutableScanProgress(); - scanProgress.SetShardsTotal(info->ScanShards.size()); - scanProgress.SetShardsCompleted(info->DoneShards.size()); - } + info->Serialize(desc); Y_ABORT_UNLESS(PathsById.contains(pathId)); auto path = PathsById.at(pathId); |