aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorIlnaz Nizametdinov <ilnaz@ydb.tech>2025-05-29 19:38:48 +0300
committerGitHub <noreply@github.com>2025-05-29 16:38:48 +0000
commit9fd893ca3552e0a08bd357deaaa032cad266b2bc (patch)
tree8179e329ba91906f48cfaeef495be7e2c7ce3edc
parentb08e92fc20e246e1e2b672a605138865ee6aa038 (diff)
downloadydb-9fd893ca3552e0a08bd357deaaa032cad266b2bc.tar.gz
(refactoring) CDC stream settings (#18958)
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__init.cpp35
-rw-r--r--ydb/core/tx/schemeshard/schemeshard_info_types.h80
-rw-r--r--ydb/core/tx/schemeshard/schemeshard_path_describer.cpp14
3 files changed, 72 insertions, 57 deletions
diff --git a/ydb/core/tx/schemeshard/schemeshard__init.cpp b/ydb/core/tx/schemeshard/schemeshard__init.cpp
index f05a0dd1a6e..284f9cf38a5 100644
--- a/ydb/core/tx/schemeshard/schemeshard__init.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__init.cpp
@@ -3044,12 +3044,13 @@ struct TSchemeShard::TTxInit : public TTransactionBase<TSchemeShard> {
TLocalPathId(rowset.GetValue<Schema::CdcStream::LocalPathId>())
);
auto alterVersion = rowset.GetValue<Schema::CdcStream::AlterVersion>();
- auto mode = rowset.GetValue<Schema::CdcStream::Mode>();
- auto format = rowset.GetValue<Schema::CdcStream::Format>();
- auto vt = rowset.GetValueOrDefault<Schema::CdcStream::VirtualTimestamps>(false);
- auto rt = TDuration::MilliSeconds(rowset.GetValueOrDefault<Schema::CdcStream::ResolvedTimestampsIntervalMs>(0));
- auto awsRegion = rowset.GetValue<Schema::CdcStream::AwsRegion>();
- auto state = rowset.GetValue<Schema::CdcStream::State>();
+ auto settings = TCdcStreamSettings()
+ .WithMode(rowset.GetValue<Schema::CdcStream::Mode>())
+ .WithFormat(rowset.GetValue<Schema::CdcStream::Format>())
+ .WithVirtualTimestamps(rowset.GetValueOrDefault<Schema::CdcStream::VirtualTimestamps>(false))
+ .WithResolvedTimestamps(TDuration::MilliSeconds(rowset.GetValueOrDefault<Schema::CdcStream::ResolvedTimestampsIntervalMs>(0)))
+ .WithAwsRegion(rowset.GetValue<Schema::CdcStream::AwsRegion>())
+ .WithState(rowset.GetValue<Schema::CdcStream::State>());
Y_VERIFY_S(Self->PathsById.contains(pathId), "Path doesn't exist, pathId: " << pathId);
auto path = Self->PathsById.at(pathId);
@@ -3059,10 +3060,10 @@ struct TSchemeShard::TTxInit : public TTransactionBase<TSchemeShard> {
<< ", path type: " << NKikimrSchemeOp::EPathType_Name(path->PathType));
Y_ABORT_UNLESS(!Self->CdcStreams.contains(pathId));
- Self->CdcStreams[pathId] = new TCdcStreamInfo(alterVersion, mode, format, vt, rt, awsRegion, state);
+ const auto& stream = Self->CdcStreams[pathId] = new TCdcStreamInfo(alterVersion, std::move(settings));
Self->IncrementPathDbRefCount(pathId);
- if (state == NKikimrSchemeOp::ECdcStreamStateScan) {
+ if (stream->State == NKikimrSchemeOp::ECdcStreamStateScan) {
Y_VERIFY_S(Self->PathsById.contains(path->ParentPathId), "Parent path is not found"
<< ", cdc stream pathId: " << pathId
<< ", parent pathId: " << path->ParentPathId);
@@ -3087,14 +3088,14 @@ struct TSchemeShard::TTxInit : public TTransactionBase<TSchemeShard> {
TOwnerId(rowset.GetValue<Schema::CdcStreamAlterData::OwnerPathId>()),
TLocalPathId(rowset.GetValue<Schema::CdcStreamAlterData::LocalPathId>())
);
-
auto alterVersion = rowset.GetValue<Schema::CdcStreamAlterData::AlterVersion>();
- auto mode = rowset.GetValue<Schema::CdcStreamAlterData::Mode>();
- auto format = rowset.GetValue<Schema::CdcStreamAlterData::Format>();
- auto vt = rowset.GetValueOrDefault<Schema::CdcStreamAlterData::VirtualTimestamps>(false);
- auto rt = TDuration::MilliSeconds(rowset.GetValueOrDefault<Schema::CdcStreamAlterData::ResolvedTimestampsIntervalMs>(0));
- auto awsRegion = rowset.GetValue<Schema::CdcStreamAlterData::AwsRegion>();
- auto state = rowset.GetValue<Schema::CdcStreamAlterData::State>();
+ auto settings = TCdcStreamSettings()
+ .WithMode(rowset.GetValue<Schema::CdcStreamAlterData::Mode>())
+ .WithFormat(rowset.GetValue<Schema::CdcStreamAlterData::Format>())
+ .WithVirtualTimestamps(rowset.GetValueOrDefault<Schema::CdcStreamAlterData::VirtualTimestamps>(false))
+ .WithResolvedTimestamps(TDuration::MilliSeconds(rowset.GetValueOrDefault<Schema::CdcStreamAlterData::ResolvedTimestampsIntervalMs>(0)))
+ .WithAwsRegion(rowset.GetValue<Schema::CdcStreamAlterData::AwsRegion>())
+ .WithState(rowset.GetValue<Schema::CdcStreamAlterData::State>());
Y_VERIFY_S(Self->PathsById.contains(pathId), "Path doesn't exist, pathId: " << pathId);
auto path = Self->PathsById.at(pathId);
@@ -3105,14 +3106,14 @@ struct TSchemeShard::TTxInit : public TTransactionBase<TSchemeShard> {
if (!Self->CdcStreams.contains(pathId)) {
Y_ABORT_UNLESS(alterVersion == 1);
- Self->CdcStreams[pathId] = TCdcStreamInfo::New(mode, format, vt, rt, awsRegion);
+ Self->CdcStreams[pathId] = TCdcStreamInfo::New(settings);
Self->IncrementPathDbRefCount(pathId);
}
auto stream = Self->CdcStreams.at(pathId);
Y_ABORT_UNLESS(stream->AlterData == nullptr);
Y_ABORT_UNLESS(stream->AlterVersion < alterVersion);
- stream->AlterData = new TCdcStreamInfo(alterVersion, mode, format, vt, rt, awsRegion, state);
+ stream->AlterData = new TCdcStreamInfo(alterVersion, std::move(settings));
Y_VERIFY_S(Self->PathsById.contains(path->ParentPathId), "Parent path is not found"
<< ", cdc stream pathId: " << pathId
diff --git a/ydb/core/tx/schemeshard/schemeshard_info_types.h b/ydb/core/tx/schemeshard/schemeshard_info_types.h
index 8f805bcf622..39a5f7819e8 100644
--- a/ydb/core/tx/schemeshard/schemeshard_info_types.h
+++ b/ydb/core/tx/schemeshard/schemeshard_info_types.h
@@ -2491,12 +2491,35 @@ struct TTableIndexInfo : public TSimpleRefCount<TTableIndexInfo> {
std::variant<std::monostate, NKikimrSchemeOp::TVectorIndexKmeansTreeDescription> SpecializedIndexDescription;
};
-struct TCdcStreamInfo : public TSimpleRefCount<TCdcStreamInfo> {
- using TPtr = TIntrusivePtr<TCdcStreamInfo>;
+struct TCdcStreamSettings {
+ using TSelf = TCdcStreamSettings;
using EMode = NKikimrSchemeOp::ECdcStreamMode;
using EFormat = NKikimrSchemeOp::ECdcStreamFormat;
using EState = NKikimrSchemeOp::ECdcStreamState;
+ #define OPTION(type, name) \
+ TSelf&& With##name(type value) && { \
+ name = std::move(value); \
+ return std::move(*this); \
+ } \
+ type name;
+
+ OPTION(EMode, Mode);
+ OPTION(EFormat, Format);
+ OPTION(bool, VirtualTimestamps);
+ OPTION(TDuration, ResolvedTimestamps);
+ OPTION(TString, AwsRegion);
+ OPTION(EState, State);
+
+ #undef OPTION
+};
+
+struct TCdcStreamInfo
+ : public TCdcStreamSettings
+ , public TSimpleRefCount<TCdcStreamInfo>
+{
+ using TPtr = TIntrusivePtr<TCdcStreamInfo>;
+
// shards of the table
struct TShardStatus {
NKikimrTxDataShard::TEvCdcStreamScanResponse::EStatus Status;
@@ -2506,14 +2529,9 @@ struct TCdcStreamInfo : public TSimpleRefCount<TCdcStreamInfo> {
{}
};
- TCdcStreamInfo(ui64 version, EMode mode, EFormat format, bool vt, const TDuration& rt, const TString& awsRegion, EState state)
- : AlterVersion(version)
- , Mode(mode)
- , Format(format)
- , VirtualTimestamps(vt)
- , ResolvedTimestamps(rt)
- , AwsRegion(awsRegion)
- , State(state)
+ TCdcStreamInfo(ui64 version, TCdcStreamSettings&& settings)
+ : TCdcStreamSettings(std::move(settings))
+ , AlterVersion(version)
{}
TCdcStreamInfo(const TCdcStreamInfo&) = default;
@@ -2526,13 +2544,18 @@ struct TCdcStreamInfo : public TSimpleRefCount<TCdcStreamInfo> {
return result;
}
- static TPtr New(EMode mode, EFormat format, bool vt, const TDuration& rt, const TString& awsRegion) {
- return new TCdcStreamInfo(0, mode, format, vt, rt, awsRegion, EState::ECdcStreamStateInvalid);
+ static TPtr New(TCdcStreamSettings settings) {
+ settings.State = EState::ECdcStreamStateInvalid;
+ return new TCdcStreamInfo(0, std::move(settings));
}
static TPtr Create(const NKikimrSchemeOp::TCdcStreamDescription& desc) {
- TPtr result = New(desc.GetMode(), desc.GetFormat(), desc.GetVirtualTimestamps(),
- TDuration::MilliSeconds(desc.GetResolvedTimestampsIntervalMs()), desc.GetAwsRegion());
+ TPtr result = New(TCdcStreamSettings()
+ .WithMode(desc.GetMode())
+ .WithFormat(desc.GetFormat())
+ .WithVirtualTimestamps(desc.GetVirtualTimestamps())
+ .WithResolvedTimestamps(TDuration::MilliSeconds(desc.GetResolvedTimestampsIntervalMs()))
+ .WithAwsRegion(desc.GetAwsRegion()));
TPtr alterData = result->CreateNextVersion();
alterData->State = EState::ECdcStreamStateReady;
if (desc.HasState()) {
@@ -2542,28 +2565,31 @@ struct TCdcStreamInfo : public TSimpleRefCount<TCdcStreamInfo> {
return result;
}
+ void Serialize(NKikimrSchemeOp::TCdcStreamDescription& desc) const {
+ desc.SetSchemaVersion(AlterVersion);
+ desc.SetMode(Mode);
+ desc.SetFormat(Format);
+ desc.SetVirtualTimestamps(VirtualTimestamps);
+ desc.SetResolvedTimestampsIntervalMs(ResolvedTimestamps.MilliSeconds());
+ desc.SetAwsRegion(AwsRegion);
+ desc.SetState(State);
+ if (ScanShards) {
+ auto& scanProgress = *desc.MutableScanProgress();
+ scanProgress.SetShardsTotal(ScanShards.size());
+ scanProgress.SetShardsCompleted(DoneShards.size());
+ }
+ }
+
void FinishAlter() {
Y_ABORT_UNLESS(AlterData);
AlterVersion = AlterData->AlterVersion;
- Mode = AlterData->Mode;
- Format = AlterData->Format;
- VirtualTimestamps = AlterData->VirtualTimestamps;
- ResolvedTimestamps = AlterData->ResolvedTimestamps;
- AwsRegion = AlterData->AwsRegion;
- State = AlterData->State;
+ static_cast<TCdcStreamSettings&>(*this) = static_cast<TCdcStreamSettings&>(*AlterData);
AlterData.Reset();
}
ui64 AlterVersion = 1;
- EMode Mode;
- EFormat Format;
- bool VirtualTimestamps;
- TDuration ResolvedTimestamps;
- TString AwsRegion;
- EState State;
-
TCdcStreamInfo::TPtr AlterData = nullptr;
TMap<TShardIdx, TShardStatus> ScanShards;
diff --git a/ydb/core/tx/schemeshard/schemeshard_path_describer.cpp b/ydb/core/tx/schemeshard/schemeshard_path_describer.cpp
index d8feadd91f8..97558a7bff4 100644
--- a/ydb/core/tx/schemeshard/schemeshard_path_describer.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard_path_describer.cpp
@@ -1471,20 +1471,8 @@ void TSchemeShard::DescribeCdcStream(const TPathId& pathId, const TString& name,
<< ", name# " << name);
desc.SetName(name);
- desc.SetMode(info->Mode);
- desc.SetFormat(info->Format);
- desc.SetVirtualTimestamps(info->VirtualTimestamps);
- desc.SetResolvedTimestampsIntervalMs(info->ResolvedTimestamps.MilliSeconds());
- desc.SetAwsRegion(info->AwsRegion);
pathId.ToProto(desc.MutablePathId());
- desc.SetState(info->State);
- desc.SetSchemaVersion(info->AlterVersion);
-
- if (info->ScanShards) {
- auto& scanProgress = *desc.MutableScanProgress();
- scanProgress.SetShardsTotal(info->ScanShards.size());
- scanProgress.SetShardsCompleted(info->DoneShards.size());
- }
+ info->Serialize(desc);
Y_ABORT_UNLESS(PathsById.contains(pathId));
auto path = PathsById.at(pathId);