diff options
author | Nikolay Shestakov <tesseract@ydb.tech> | 2025-02-28 02:18:16 +0500 |
---|---|---|
committer | GitHub <noreply@github.com> | 2025-02-28 00:18:16 +0300 |
commit | ce94a364f46bc962c4ff261a631ccea8f717ce4d (patch) | |
tree | 2c9a233c5d7c348d2fb7fced1dfcc1ec3be4ee63 | |
parent | e8002d3a737240cf8751a577f634e7b73b68ada1 (diff) | |
download | ydb-ce94a364f46bc962c4ff261a631ccea8f717ce4d.tar.gz |
Alter transfer from a topic to a table (#15024)
15 files changed, 205 insertions, 41 deletions
diff --git a/ydb/core/tx/replication/controller/dst_alterer.cpp b/ydb/core/tx/replication/controller/dst_alterer.cpp index 7a197f0c80..1bdfb49923 100644 --- a/ydb/core/tx/replication/controller/dst_alterer.cpp +++ b/ydb/core/tx/replication/controller/dst_alterer.cpp @@ -43,12 +43,13 @@ class TDstAlterer: public TActorBootstrapped<TDstAlterer> { switch (Kind) { case TReplication::ETargetKind::Table: case TReplication::ETargetKind::IndexTable: - case TReplication::ETargetKind::Transfer: tx.SetOperationType(NKikimrSchemeOp::ESchemeOpAlterTable); DstPathId.ToProto(tx.MutableAlterTable()->MutablePathId()); tx.MutableAlterTable()->MutableReplicationConfig()->SetMode( NKikimrSchemeOp::TTableReplicationConfig::REPLICATION_MODE_NONE); break; + case TReplication::ETargetKind::Transfer: + break; } Send(PipeCache, new TEvPipeCache::TEvForward(ev.Release(), SchemeShardId, true)); @@ -153,7 +154,13 @@ public: if (!DstPathId) { Success(); } else { - AllocateTxId(); + switch (Kind) { + case TReplication::ETargetKind::Table: + case TReplication::ETargetKind::IndexTable: + return AllocateTxId(); + case TReplication::ETargetKind::Transfer: + return Success(); + } } } diff --git a/ydb/core/tx/replication/controller/event_util.cpp b/ydb/core/tx/replication/controller/event_util.cpp index 4b9d766b36..4882593286 100644 --- a/ydb/core/tx/replication/controller/event_util.cpp +++ b/ydb/core/tx/replication/controller/event_util.cpp @@ -51,7 +51,7 @@ THolder<TEvService::TEvRunWorker> MakeRunWorkerEv( break; } case TReplication::ETargetKind::Transfer: { - auto p = std::dynamic_pointer_cast<TTargetTransfer::TTransferConfig>(config); + auto p = std::dynamic_pointer_cast<const TTargetTransfer::TTransferConfig>(config); auto& writerSettings = *record.MutableCommand()->MutableTransferWriter(); dstPathId.ToProto(writerSettings.MutablePathId()); writerSettings.SetTransformLambda(p->GetTransformLambda()); diff --git a/ydb/core/tx/replication/controller/replication.cpp b/ydb/core/tx/replication/controller/replication.cpp index 2e235267a6..b0417987f8 100644 --- a/ydb/core/tx/replication/controller/replication.cpp +++ b/ydb/core/tx/replication/controller/replication.cpp @@ -228,6 +228,7 @@ private: NKikimrReplication::TReplicationConfig Config; EState State = EState::Ready; TString Issue; + EState DesiredState = EState::Ready; ui64 NextTargetId = 1; THashMap<ui64, TTarget> Targets; THashSet<ui64> PendingAlterTargets; @@ -329,6 +330,14 @@ const TString& TReplication::GetIssue() const { return Impl->Issue; } +TReplication::EState TReplication::GetDesiredState() const { + return Impl->DesiredState; +} + +void TReplication::SetDesiredState(EState state) { + Impl->DesiredState = state; +} + void TReplication::SetNextTargetId(ui64 value) { Impl->NextTargetId = value; } diff --git a/ydb/core/tx/replication/controller/replication.h b/ydb/core/tx/replication/controller/replication.h index 22252c82e1..c95222d382 100644 --- a/ydb/core/tx/replication/controller/replication.h +++ b/ydb/core/tx/replication/controller/replication.h @@ -56,7 +56,7 @@ public: class ITarget { public: struct IConfig { - using TPtr = std::shared_ptr<IConfig>; + using TPtr = std::shared_ptr<const IConfig>; virtual ~IConfig() = default; @@ -98,6 +98,8 @@ public: virtual void Progress(const TActorContext& ctx) = 0; virtual void Shutdown(const TActorContext& ctx) = 0; + virtual void UpdateConfig(const NKikimrReplication::TReplicationConfig&) = 0; + protected: virtual IActor* CreateWorkerRegistar(const TActorContext& ctx) const = 0; }; @@ -135,6 +137,8 @@ public: const NKikimrReplication::TReplicationConfig& GetConfig() const; void SetState(EState state, TString issue = {}); EState GetState() const; + EState GetDesiredState() const; + void SetDesiredState(EState state); const TString& GetIssue() const; const TMaybe<TDuration> GetLag() const; diff --git a/ydb/core/tx/replication/controller/schema.h b/ydb/core/tx/replication/controller/schema.h index 861e7e61a0..b30ab4a839 100644 --- a/ydb/core/tx/replication/controller/schema.h +++ b/ydb/core/tx/replication/controller/schema.h @@ -25,9 +25,10 @@ struct TControllerSchema: NIceDb::Schema { struct State: Column<5, NScheme::NTypeIds::Uint8> { using Type = TReplication::EState; }; struct Issue: Column<6, NScheme::NTypeIds::Utf8> {}; struct NextTargetId: Column<7, NScheme::NTypeIds::Uint64> { static constexpr Type Default = 1; }; + struct DesiredState: Column<8, NScheme::NTypeIds::Uint8> { using Type = TReplication::EState; }; using TKey = TableKey<Id>; - using TColumns = TableColumns<Id, PathOwnerId, PathLocalId, Config, State, Issue, NextTargetId>; + using TColumns = TableColumns<Id, PathOwnerId, PathLocalId, Config, State, Issue, NextTargetId, DesiredState>; }; struct Targets: Table<3> { diff --git a/ydb/core/tx/replication/controller/target_base.cpp b/ydb/core/tx/replication/controller/target_base.cpp index 881e033ac5..59ff79a38b 100644 --- a/ydb/core/tx/replication/controller/target_base.cpp +++ b/ydb/core/tx/replication/controller/target_base.cpp @@ -73,6 +73,9 @@ void TTargetBase::SetDstState(const EDstState value) { return Replication->AddPendingAlterTarget(Id); case EDstState::Done: return Replication->RemovePendingAlterTarget(Id); + case EDstState::Ready: + PendingRemoveWorkers = false; + break; default: break; } @@ -196,4 +199,7 @@ void TTargetBase::Shutdown(const TActorContext& ctx) { } } +void TTargetBase::UpdateConfig(const NKikimrReplication::TReplicationConfig&) { +} + } diff --git a/ydb/core/tx/replication/controller/target_base.h b/ydb/core/tx/replication/controller/target_base.h index c627afc410..f3a474752b 100644 --- a/ydb/core/tx/replication/controller/target_base.h +++ b/ydb/core/tx/replication/controller/target_base.h @@ -71,11 +71,12 @@ public: void Progress(const TActorContext& ctx) override; void Shutdown(const TActorContext& ctx) override; + void UpdateConfig(const NKikimrReplication::TReplicationConfig&) override; + private: TReplication* const Replication; const ui64 Id; const ETargetKind Kind; - const IConfig::TPtr Config; EDstState DstState = EDstState::Creating; TPathId DstPathId; @@ -90,6 +91,9 @@ private: THashMap<ui64, TWorker> Workers; bool PendingRemoveWorkers = false; +protected: + IConfig::TPtr Config; + }; // TTargetBase } diff --git a/ydb/core/tx/replication/controller/target_discoverer_ut.cpp b/ydb/core/tx/replication/controller/target_discoverer_ut.cpp index 025941ba02..72e4a43c32 100644 --- a/ydb/core/tx/replication/controller/target_discoverer_ut.cpp +++ b/ydb/core/tx/replication/controller/target_discoverer_ut.cpp @@ -124,7 +124,7 @@ Y_UNIT_TEST_SUITE(TargetDiscoverer) { UNIT_ASSERT_VALUES_EQUAL(toAdd.at(0).Config->GetSrcPath(), "/Root/Topic"); UNIT_ASSERT_VALUES_EQUAL(toAdd.at(0).Config->GetDstPath(), "/Root/Replicated/Table"); UNIT_ASSERT_VALUES_EQUAL(toAdd.at(0).Kind, TReplication::ETargetKind::Transfer); - auto p = std::dynamic_pointer_cast<TTargetTransfer::TTransferConfig>(toAdd.at(0).Config); + auto p = std::dynamic_pointer_cast<const TTargetTransfer::TTransferConfig>(toAdd.at(0).Config); UNIT_ASSERT(p); UNIT_ASSERT_VALUES_EQUAL(p->GetTransformLambda(), "lambda body"); } diff --git a/ydb/core/tx/replication/controller/target_table.cpp b/ydb/core/tx/replication/controller/target_table.cpp index b9c4a773c1..e60b71693e 100644 --- a/ydb/core/tx/replication/controller/target_table.cpp +++ b/ydb/core/tx/replication/controller/target_table.cpp @@ -141,6 +141,14 @@ TTargetTransfer::TTargetTransfer(TReplication* replication, ui64 id, const IConf { } +void TTargetTransfer::UpdateConfig(const NKikimrReplication::TReplicationConfig& cfg) { + auto& t = cfg.GetTransferSpecific().GetTargets(0); + Config = std::make_shared<TTargetTransfer::TTransferConfig>( + GetConfig()->GetSrcPath(), + GetConfig()->GetDstPath(), + t.GetTransformLambda()); +} + TString TTargetTransfer::BuildStreamPath() const { return CanonizePath(GetSrcPath()); } diff --git a/ydb/core/tx/replication/controller/target_table.h b/ydb/core/tx/replication/controller/target_table.h index 1ca7aa1ca0..6b43fd2fc1 100644 --- a/ydb/core/tx/replication/controller/target_table.h +++ b/ydb/core/tx/replication/controller/target_table.h @@ -66,6 +66,7 @@ public: explicit TTargetTransfer(TReplication* replication, ui64 id, const IConfig::TPtr& config); + void UpdateConfig(const NKikimrReplication::TReplicationConfig&) override; protected: TString BuildStreamPath() const override; diff --git a/ydb/core/tx/replication/controller/tx_alter_dst_result.cpp b/ydb/core/tx/replication/controller/tx_alter_dst_result.cpp index 940b6ba90b..4154b62792 100644 --- a/ydb/core/tx/replication/controller/tx_alter_dst_result.cpp +++ b/ydb/core/tx/replication/controller/tx_alter_dst_result.cpp @@ -4,6 +4,7 @@ namespace NKikimr::NReplication::NController { class TController::TTxAlterDstResult: public TTxBase { TEvPrivate::TEvAlterDstResult::TPtr Ev; + TReplication::TPtr Replication; public: explicit TTxAlterDstResult(TController* self, TEvPrivate::TEvAlterDstResult::TPtr& ev) @@ -22,14 +23,14 @@ public: const auto rid = Ev->Get()->ReplicationId; const auto tid = Ev->Get()->TargetId; - auto replication = Self->Find(rid); - if (!replication) { + Replication = Self->Find(rid); + if (!Replication) { CLOG_W(ctx, "Unknown replication" << ": rid# " << rid); return true; } - auto* target = replication->FindTarget(tid); + auto* target = Replication->FindTarget(tid); if (!target) { CLOG_W(ctx, "Unknown target" << ": rid# " << rid @@ -46,16 +47,20 @@ public: } if (Ev->Get()->IsSuccess()) { - target->SetDstState(TReplication::EDstState::Done); + target->SetDstState(NextState(Replication->GetDesiredState())); + target->UpdateConfig(Replication->GetConfig()); CLOG_N(ctx, "Target dst altered" << ": rid# " << rid << ", tid# " << tid); - if (replication->CheckAlterDone()) { + if (Replication->CheckAlterDone()) { CLOG_N(ctx, "Replication altered" << ": rid# " << rid); - replication->SetState(TReplication::EState::Done); + Replication->SetState(Replication->GetDesiredState()); + if (Replication->GetState() != TReplication::EState::Ready) { + Replication.Reset(); + } } } else { target->SetDstState(TReplication::EDstState::Error); @@ -63,7 +68,7 @@ public: << ": " << NKikimrScheme::EStatus_Name(Ev->Get()->Status) << ", " << Ev->Get()->Error); - replication->SetState(TReplication::EState::Error, TStringBuilder() << "Error in target #" << target->GetId() + Replication->SetState(TReplication::EState::Error, TStringBuilder() << "Error in target #" << target->GetId() << ": " << target->GetIssue()); CLOG_E(ctx, "Alter dst error" @@ -75,8 +80,8 @@ public: NIceDb::TNiceDb db(txc.DB); db.Table<Schema::Replications>().Key(rid).Update( - NIceDb::TUpdate<Schema::Replications::State>(replication->GetState()), - NIceDb::TUpdate<Schema::Replications::Issue>(replication->GetIssue()) + NIceDb::TUpdate<Schema::Replications::State>(Replication->GetState()), + NIceDb::TUpdate<Schema::Replications::Issue>(Replication->GetIssue()) ); db.Table<Schema::Targets>().Key(rid, tid).Update( NIceDb::TUpdate<Schema::Targets::DstState>(target->GetDstState()), @@ -86,8 +91,25 @@ public: return true; } + TReplication::EDstState NextState(TReplication::EState state) { + switch (state) { + case TReplication::EState::Done: + return TReplication::EDstState::Done; + case TReplication::EState::Ready: + return TReplication::EDstState::Ready; + case TReplication::EState::Error: + return TReplication::EDstState::Error; + case TReplication::EState::Removing: + return TReplication::EDstState::Removing; + } + } + void Complete(const TActorContext& ctx) override { CLOG_D(ctx, "Complete"); + + if (Replication) { + Replication->Progress(ctx); + } } }; // TTxAlterDstResult diff --git a/ydb/core/tx/replication/controller/tx_alter_replication.cpp b/ydb/core/tx/replication/controller/tx_alter_replication.cpp index bf50c9548e..d1688184c0 100644 --- a/ydb/core/tx/replication/controller/tx_alter_replication.cpp +++ b/ydb/core/tx/replication/controller/tx_alter_replication.cpp @@ -37,27 +37,48 @@ public: return true; } + bool alter = false; + + const auto& oldConfig = Replication->GetConfig(); + const auto& newConfig = record.GetConfig(); + + if (oldConfig.HasTransferSpecific()) { + auto& oldLambda = oldConfig.GetTransferSpecific().GetTargets(0).GetTransformLambda(); + auto& newLambda = newConfig.GetTransferSpecific().GetTargets(0).GetTransformLambda(); + + alter = oldLambda != newLambda; + } + + auto desiredState = Replication->GetState(); + if (record.HasSwitchState()) { + switch (record.GetSwitchState().GetStateCase()) { + case NKikimrReplication::TReplicationState::kDone: + desiredState = TReplication::EState::Done; + alter = true; + break; + default: + Y_ABORT("Invalid state"); + } + } + + if (alter) { + Replication->SetDesiredState(desiredState); + } + Replication->SetConfig(std::move(*record.MutableConfig())); NIceDb::TNiceDb db(txc.DB); db.Table<Schema::Replications>().Key(Replication->GetId()).Update( - NIceDb::TUpdate<Schema::Replications::Config>(record.GetConfig().SerializeAsString()) + NIceDb::TUpdate<Schema::Replications::Config>(record.GetConfig().SerializeAsString()), + NIceDb::TUpdate<Schema::Replications::DesiredState>(desiredState) ); - if (!record.HasSwitchState()) { + if (!alter) { Result->Record.SetStatus(NKikimrReplication::TEvAlterReplicationResult::SUCCESS); return true; } - switch (record.GetSwitchState().GetStateCase()) { - case NKikimrReplication::TReplicationState::kDone: - break; - default: - Y_ABORT("Invalid state"); - } - Result->Record.SetStatus(NKikimrReplication::TEvAlterReplicationResult::SUCCESS); - bool alter = false; for (ui64 tid = 0; tid < Replication->GetNextTargetId(); ++tid) { auto* target = Replication->FindTarget(tid); if (!target) { diff --git a/ydb/core/tx/replication/controller/tx_discovery_targets_result.cpp b/ydb/core/tx/replication/controller/tx_discovery_targets_result.cpp index 8649dc0610..cd247e2b5d 100644 --- a/ydb/core/tx/replication/controller/tx_discovery_targets_result.cpp +++ b/ydb/core/tx/replication/controller/tx_discovery_targets_result.cpp @@ -47,7 +47,7 @@ public: const auto tid = Replication->AddTarget(target.Kind, target.Config); TString transformLambda; - if (auto p = std::dynamic_pointer_cast<TTargetTransfer::TTransferConfig>(target.Config)) { + if (auto p = std::dynamic_pointer_cast<const TTargetTransfer::TTransferConfig>(target.Config)) { transformLambda = p->GetTransformLambda(); } diff --git a/ydb/core/tx/replication/controller/tx_init.cpp b/ydb/core/tx/replication/controller/tx_init.cpp index af05d350a1..99db3921c9 100644 --- a/ydb/core/tx/replication/controller/tx_init.cpp +++ b/ydb/core/tx/replication/controller/tx_init.cpp @@ -53,10 +53,12 @@ class TController::TTxInit: public TTxBase { const auto state = rowset.GetValue<Schema::Replications::State>(); const auto issue = rowset.GetValue<Schema::Replications::Issue>(); const auto nextTid = rowset.GetValue<Schema::Replications::NextTargetId>(); + const auto desiredState = rowset.GetValue<Schema::Replications::DesiredState>(); auto replication = Self->Add(rid, pathId, config); replication->SetState(state, issue); replication->SetNextTargetId(nextTid); + replication->SetDesiredState(desiredState); if (!rowset.Next()) { return false; diff --git a/ydb/tests/functional/transfer/main.cpp b/ydb/tests/functional/transfer/main.cpp index 6a96711a1a..b24dc77ed9 100644 --- a/ydb/tests/functional/transfer/main.cpp +++ b/ydb/tests/functional/transfer/main.cpp @@ -114,10 +114,11 @@ TMessage _withMessageGroupId(const TString& messageGroupId) { } struct TConfig { - const char* TableDDL; - const char* Lambda; + TString TableDDL; + const TString Lambda; const TVector<TMessage> Messages; TVector<std::pair<TString, std::shared_ptr<IChecker>>> Expectations; + const TVector<TString> AlterLambdas; }; @@ -142,7 +143,7 @@ struct MainTestCase { auto topicClient = TTopicClient(driver); { - auto tableDDL = Sprintf(Config.TableDDL, TableName.data()); + auto tableDDL = Sprintf(Config.TableDDL.data(), TableName.data()); auto res = session.ExecuteQuery(tableDDL, TTxControl::NoTx()).GetValueSync(); UNIT_ASSERT_C(res.IsSuccess(), res.GetIssues().ToString()); } @@ -157,18 +158,39 @@ struct MainTestCase { UNIT_ASSERT_C(res.IsSuccess(), res.GetIssues().ToString()); } - { - auto res = session.ExecuteQuery(Sprintf(R"( - %s; + TVector<TString> lambdas; + lambdas.insert(lambdas.end(), Config.AlterLambdas.begin(), Config.AlterLambdas.end()); + lambdas.push_back(Config.Lambda); + + for (size_t i = 0; i < lambdas.size(); ++i) { + auto lambda = lambdas[i]; + if (!i) { + auto res = session.ExecuteQuery(Sprintf(R"( + %s; + + CREATE TRANSFER `%s` + FROM `%s` TO `%s` USING $l + WITH ( + CONNECTION_STRING = 'grpc://%s' + -- , TOKEN = 'user@builtin' + ); + )", lambda.data(), TransferName.data(), TopicName.data(), TableName.data(), connectionString.data()), TTxControl::NoTx()).GetValueSync(); + UNIT_ASSERT_C(res.IsSuccess(), res.GetIssues().ToString()); + } else { + Sleep(TDuration::Seconds(1)); - CREATE TRANSFER `%s` - FROM `%s` TO `%s` USING $l - WITH ( - CONNECTION_STRING = 'grpc://%s' - -- , TOKEN = 'user@builtin' - ); - )", Config.Lambda, TransferName.data(), TopicName.data(), TableName.data(), connectionString.data()), TTxControl::NoTx()).GetValueSync(); - UNIT_ASSERT_C(res.IsSuccess(), res.GetIssues().ToString()); + auto res = session.ExecuteQuery(Sprintf(R"( + %s; + + ALTER TRANSFER `%s` + SET USING $l; + )", lambda.data(), TransferName.data()), TTxControl::NoTx()).GetValueSync(); + UNIT_ASSERT_C(res.IsSuccess(), res.GetIssues().ToString()); + + if (i == lambdas.size() - 1) { + Sleep(TDuration::Seconds(1)); + } + } } { @@ -665,5 +687,62 @@ Y_UNIT_TEST_SUITE(Transfer) }).Run(); } + Y_UNIT_TEST(AlterLambda) + { + MainTestCase({ + .TableDDL = R"( + CREATE TABLE `%s` ( + Key Uint64 NOT NULL, + Message Utf8 NOT NULL, + PRIMARY KEY (Key) + ) WITH ( + STORE = COLUMN + ); + )", + + .Lambda = R"( + $l = ($x) -> { + return [ + <| + Key:CAST($x._offset AS Uint64), + Message:CAST($x._data || " new lambda" AS Utf8) + |> + ]; + }; + )", + + .Messages = {{"Message-1"}}, + + .Expectations = { + _C("Message", TString("Message-1 new lambda")), + }, + + .AlterLambdas = { + R"( + $l = ($x) -> { + return [ + <| + Key:CAST($x._offset AS Uint64), + Message:CAST($x._data || " 1 lambda" AS Utf8) + |> + ]; + }; + )", + R"( + $l = ($x) -> { + return [ + <| + Key:CAST($x._offset AS Uint64), + Message:CAST($x._data || " 2 lambda" AS Utf8) + |> + ]; + }; + )", + } + + }).Run(); + } + + } |