diff options
author | Alexey Uzhegov <auzhegov@ydb.tech> | 2024-01-31 17:27:20 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-01-31 16:27:20 +0200 |
commit | bf52c5de107689f1f7e1a2bb575fd3e6599adaad (patch) | |
tree | 540e4dd3d098057492c4df1b8813458ff7c5a541 | |
parent | 52be5dbdd420165c68e7e90ba8f1d2f00da041f6 (diff) | |
download | ydb-24.1.1.tar.gz |
[YQ-1997] Support for ReplaceIfExists flag in SchemeShard for External Data Source and External Table (#1431)24.1.1
18 files changed, 1018 insertions, 18 deletions
diff --git a/ydb/core/protos/feature_flags.proto b/ydb/core/protos/feature_flags.proto index 52d1cf02c4..dcb82a099d 100644 --- a/ydb/core/protos/feature_flags.proto +++ b/ydb/core/protos/feature_flags.proto @@ -128,4 +128,5 @@ message TFeatureFlags { optional bool EnableServerlessExclusiveDynamicNodes = 113 [default = false]; optional bool EnableAccessServiceBulkAuthorization = 114 [default = false]; optional bool EnableAddColumsWithDefaults = 115 [ default = false]; + optional bool EnableReplaceIfExistsForExternalEntities = 116 [ default = false]; } diff --git a/ydb/core/testlib/basics/feature_flags.h b/ydb/core/testlib/basics/feature_flags.h index 22086cb955..5e82db9e8f 100644 --- a/ydb/core/testlib/basics/feature_flags.h +++ b/ydb/core/testlib/basics/feature_flags.h @@ -57,6 +57,7 @@ public: FEATURE_FLAG_SETTER(EnableServerlessExclusiveDynamicNodes) FEATURE_FLAG_SETTER(EnableAccessServiceBulkAuthorization) FEATURE_FLAG_SETTER(EnableAddColumsWithDefaults) + FEATURE_FLAG_SETTER(EnableReplaceIfExistsForExternalEntities) #undef FEATURE_FLAG_SETTER }; diff --git a/ydb/core/tx/schemeshard/schemeshard__operation.cpp b/ydb/core/tx/schemeshard/schemeshard__operation.cpp index 33a4df1979..6767716210 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation.cpp @@ -1062,14 +1062,14 @@ ISubOperation::TPtr TOperation::RestorePart(TTxState::ETxType txType, TTxState:: case TTxState::ETxType::TxDropExternalTable: return CreateDropExternalTable(NextPartId(), txState); case TTxState::ETxType::TxAlterExternalTable: - Y_ABORT("TODO: implement"); + return CreateAlterExternalTable(NextPartId(), txState); case TTxState::ETxType::TxCreateExternalDataSource: return CreateNewExternalDataSource(NextPartId(), txState); case TTxState::ETxType::TxDropExternalDataSource: return CreateDropExternalDataSource(NextPartId(), txState); case TTxState::ETxType::TxAlterExternalDataSource: - Y_ABORT("TODO: implement"); - + return CreateAlterExternalDataSource(NextPartId(), txState); + // View case TTxState::ETxType::TxCreateView: return CreateNewView(NextPartId(), txState); @@ -1282,7 +1282,7 @@ ISubOperation::TPtr TOperation::ConstructPart(NKikimrSchemeOp::EOperationType op // ExternalTable case NKikimrSchemeOp::EOperationType::ESchemeOpCreateExternalTable: - return CreateNewExternalTable(NextPartId(), tx); + Y_ABORT("operation is handled before"); case NKikimrSchemeOp::EOperationType::ESchemeOpDropExternalTable: return CreateDropExternalTable(NextPartId(), tx); case NKikimrSchemeOp::EOperationType::ESchemeOpAlterExternalTable: @@ -1290,7 +1290,7 @@ ISubOperation::TPtr TOperation::ConstructPart(NKikimrSchemeOp::EOperationType op // ExternalDataSource case NKikimrSchemeOp::EOperationType::ESchemeOpCreateExternalDataSource: - return CreateNewExternalDataSource(NextPartId(), tx); + Y_ABORT("operation is handled before"); case NKikimrSchemeOp::EOperationType::ESchemeOpDropExternalDataSource: return CreateDropExternalDataSource(NextPartId(), tx); case NKikimrSchemeOp::EOperationType::ESchemeOpAlterExternalDataSource: @@ -1351,6 +1351,10 @@ TVector<ISubOperation::TPtr> TOperation::ConstructParts(const TTxTransaction& tx return CreateConsistentMoveIndex(NextPartId(), tx, context); case NKikimrSchemeOp::EOperationType::ESchemeOpAlterExtSubDomain: return CreateCompatibleAlterExtSubDomain(NextPartId(), tx, context); + case NKikimrSchemeOp::EOperationType::ESchemeOpCreateExternalDataSource: + return CreateNewExternalDataSource(NextPartId(), tx, context); + case NKikimrSchemeOp::EOperationType::ESchemeOpCreateExternalTable: + return CreateNewExternalTable(NextPartId(), tx, context); default: return {ConstructPart(opType, tx)}; } diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_alter_external_data_source.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_alter_external_data_source.cpp new file mode 100644 index 0000000000..b7c8cb88e8 --- /dev/null +++ b/ydb/core/tx/schemeshard/schemeshard__operation_alter_external_data_source.cpp @@ -0,0 +1,286 @@ +#include "schemeshard__operation_common_external_data_source.h" +#include "schemeshard__operation_part.h" +#include "schemeshard__operation_common.h" +#include "schemeshard_impl.h" + +#include <utility> + +namespace { + +using namespace NKikimr; +using namespace NSchemeShard; + +class TPropose: public TSubOperationState { +private: + const TOperationId OperationId; + + TString DebugHint() const override { + return TStringBuilder() + << "TAlterExternalDataSource TPropose" + << ", operationId: " << OperationId; + } + +public: + explicit TPropose(TOperationId id) + : OperationId(std::move(id)) + { + } + + bool HandleReply(TEvPrivate::TEvOperationPlan::TPtr& ev, TOperationContext& context) override { + const TStepId step = TStepId(ev->Get()->StepId); + + LOG_I(DebugHint() << "HandleReply TEvOperationPlan" + << ": step# " << step); + + const TTxState* txState = context.SS->FindTx(OperationId); + Y_ABORT_UNLESS(txState); + Y_ABORT_UNLESS(txState->TxType == TTxState::TxAlterExternalDataSource); + + const auto pathId = txState->TargetPathId; + const auto path = TPath::Init(pathId, context.SS); + const TPathElement::TPtr pathPtr = context.SS->PathsById.at(pathId); + + NIceDb::TNiceDb db(context.GetDB()); + + IncParentDirAlterVersionWithRepublish(OperationId, path, context); + + context.SS->ClearDescribePathCaches(pathPtr); + context.OnComplete.PublishToSchemeBoard(OperationId, pathId); + + context.SS->ChangeTxState(db, OperationId, TTxState::Done); + return true; + } + + bool ProgressState(TOperationContext& context) override { + LOG_I(DebugHint() << "ProgressState"); + + const TTxState* txState = context.SS->FindTx(OperationId); + Y_ABORT_UNLESS(txState); + Y_ABORT_UNLESS(txState->TxType == TTxState::TxAlterExternalDataSource); + + context.OnComplete.ProposeToCoordinator(OperationId, txState->TargetPathId, TStepId(0)); + return false; + } +}; + +class TAlterExternalDataSource : public TSubOperation { + static TTxState::ETxState NextState() { return TTxState::Propose; } + + TTxState::ETxState NextState(TTxState::ETxState state) const override { + switch (state) { + case TTxState::Waiting: + case TTxState::Propose: + return TTxState::Done; + default: + return TTxState::Invalid; + } + } + + TSubOperationState::TPtr SelectStateFunc(TTxState::ETxState state) override { + switch (state) { + case TTxState::Waiting: + case TTxState::Propose: + return MakeHolder<TPropose>(OperationId); + case TTxState::Done: + return MakeHolder<TDone>(OperationId); + default: + return nullptr; + } + } + + static bool IsDestinationPathValid(const THolder<TProposeResponse>& result, + const TPath& dstPath, + const TString& acl) { + const auto checks = dstPath.Check(); + checks.IsAtLocalSchemeShard() + .IsResolved() + .NotUnderDeleting() + .FailOnWrongType(TPathElement::EPathType::EPathTypeExternalDataSource) + .IsValidLeafName() + .DepthLimit() + .PathsLimit() + .DirChildrenLimit() + .IsValidACL(acl); + + if (!checks) { + result->SetError(checks.GetStatus(), checks.GetError()); + if (dstPath.IsResolved()) { + result->SetPathCreateTxId(static_cast<ui64>(dstPath.Base()->CreateTxId)); + result->SetPathId(dstPath.Base()->PathId.LocalPathId); + } + } + + return static_cast<bool>(checks); + } + + bool IsApplyIfChecksPassed(const THolder<TProposeResponse>& result, + const TOperationContext& context) const { + TString errorMessage; + if (!context.SS->CheckApplyIf(Transaction, errorMessage)) { + result->SetError(NKikimrScheme::StatusPreconditionFailed, errorMessage); + return false; + } + return true; + } + + static bool IsDescriptionValid( + const THolder<TProposeResponse>& result, + const NKikimrSchemeOp::TExternalDataSourceDescription& desc, + const NExternalSource::IExternalSourceFactory::TPtr& factory) { + TString errorMessage; + if (!NExternalDataSource::Validate(desc, factory, errorMessage)) { + result->SetError(NKikimrScheme::StatusSchemeError, errorMessage); + return false; + } + return true; + } + + static void AddPathInSchemeShard( + const THolder<TProposeResponse>& result, const TPath& dstPath) { + result->SetPathId(dstPath.Base()->PathId.LocalPathId); + } + + TPathElement::TPtr ReplaceExternalDataSourcePathElement(const TPath& dstPath) const { + TPathElement::TPtr externalDataSource = dstPath.Base(); + + externalDataSource->PathState = TPathElement::EPathState::EPathStateAlter; + externalDataSource->LastTxId = OperationId.GetTxId(); + + return externalDataSource; + } + + void CreateTransaction(const TOperationContext& context, + const TPathId& externalDataSourcePathId) const { + TTxState& txState = context.SS->CreateTx(OperationId, + TTxState::TxAlterExternalDataSource, + externalDataSourcePathId); + txState.Shards.clear(); + } + + void RegisterParentPathDependencies(const TOperationContext& context, + const TPath& parentPath) const { + if (parentPath.Base()->HasActiveChanges()) { + const TTxId parentTxId = parentPath.Base()->PlannedToCreate() + ? parentPath.Base()->CreateTxId + : parentPath.Base()->LastTxId; + context.OnComplete.Dependence(parentTxId, OperationId.GetTxId()); + } + } + + void AdvanceTransactionStateToPropose(const TOperationContext& context, + NIceDb::TNiceDb& db) const { + context.SS->ChangeTxState(db, OperationId, TTxState::Propose); + context.OnComplete.ActivateTx(OperationId); + } + + void PersistExternalDataSource( + const TOperationContext& context, + NIceDb::TNiceDb& db, + const TPathElement::TPtr& externalDataSourcePath, + const TExternalDataSourceInfo::TPtr& externalDataSourceInfo, + const TString& acl) const { + const auto& externalDataSourcePathId = externalDataSourcePath->PathId; + + context.SS->ExternalDataSources[externalDataSourcePathId] = externalDataSourceInfo; + context.SS->PersistPath(db, externalDataSourcePathId); + + if (!acl.empty()) { + externalDataSourcePath->ApplyACL(acl); + context.SS->PersistACL(db, externalDataSourcePath); + } + + context.SS->PersistExternalDataSource(db, + externalDataSourcePathId, + externalDataSourceInfo); + context.SS->PersistTxState(db, OperationId); + } + +public: + using TSubOperation::TSubOperation; + + THolder<TProposeResponse> Propose(const TString& owner, + TOperationContext& context) override { + Y_UNUSED(owner); + const auto ssId = context.SS->SelfTabletId(); + const TString& parentPathStr = Transaction.GetWorkingDir(); + const auto& externalDataSourceDescription = + Transaction.GetCreateExternalDataSource(); + const TString& name = externalDataSourceDescription.GetName(); + + LOG_N("TAlterExternalDataSource Propose" + << ": opId# " << OperationId << ", path# " << parentPathStr << "/" << name); + + auto result = MakeHolder<TProposeResponse>(NKikimrScheme::StatusAccepted, + static_cast<ui64>(OperationId.GetTxId()), + static_cast<ui64>(ssId)); + + const TPath parentPath = TPath::Resolve(parentPathStr, context.SS); + RETURN_RESULT_UNLESS(NExternalDataSource::IsParentPathValid(result, parentPath)); + + const TString acl = Transaction.GetModifyACL().GetDiffACL(); + const TPath dstPath = parentPath.Child(name); + + RETURN_RESULT_UNLESS(IsDestinationPathValid(result, dstPath, acl)); + RETURN_RESULT_UNLESS(IsApplyIfChecksPassed(result, context)); + RETURN_RESULT_UNLESS(IsDescriptionValid(result, + externalDataSourceDescription, + context.SS->ExternalSourceFactory)); + + const auto oldExternalDataSourceInfo = + context.SS->ExternalDataSources.Value(dstPath->PathId, nullptr); + Y_ABORT_UNLESS(oldExternalDataSourceInfo); + const TExternalDataSourceInfo::TPtr externalDataSourceInfo = + NExternalDataSource::CreateExternalDataSource(externalDataSourceDescription, + oldExternalDataSourceInfo->AlterVersion + 1); + Y_ABORT_UNLESS(externalDataSourceInfo); + + AddPathInSchemeShard(result, dstPath); + const TPathElement::TPtr externalDataSource = + ReplaceExternalDataSourcePathElement(dstPath); + CreateTransaction(context, externalDataSource->PathId); + + NIceDb::TNiceDb db(context.GetDB()); + + RegisterParentPathDependencies(context, parentPath); + + AdvanceTransactionStateToPropose(context, db); + + PersistExternalDataSource( + context, db, externalDataSource, externalDataSourceInfo, acl); + + IncParentDirAlterVersionWithRepublishSafeWithUndo(OperationId, + dstPath, + context.SS, + context.OnComplete); + + SetState(NextState()); + return result; + } + + void AbortPropose(TOperationContext& context) override { + LOG_N("TAlterExternalDataSource AbortPropose" + << ": opId# " << OperationId); + Y_ABORT("no AbortPropose for TAlterExternalDataSource"); + } + + void AbortUnsafe(TTxId forceDropTxId, TOperationContext& context) override { + LOG_N("TAlterExternalDataSource AbortUnsafe" + << ": opId# " << OperationId << ", txId# " << forceDropTxId); + context.OnComplete.DoneOperation(OperationId); + } +}; + +} // namespace + +namespace NKikimr::NSchemeShard { + +ISubOperation::TPtr CreateAlterExternalDataSource(TOperationId id, const TTxTransaction& tx) { + return MakeSubOperation<TAlterExternalDataSource>(id, tx); +} + +ISubOperation::TPtr CreateAlterExternalDataSource(TOperationId id, TTxState::ETxState state) { + Y_ABORT_UNLESS(state != TTxState::Invalid); + return MakeSubOperation<TAlterExternalDataSource>(id, state); +} + +} diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_alter_external_table.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_alter_external_table.cpp new file mode 100644 index 0000000000..5b52536ed0 --- /dev/null +++ b/ydb/core/tx/schemeshard/schemeshard__operation_alter_external_table.cpp @@ -0,0 +1,421 @@ +#include "schemeshard__operation_common_external_table.h" +#include "schemeshard__operation_part.h" +#include "schemeshard__operation_common.h" +#include "schemeshard_impl.h" + +#include <utility> + +namespace { + +using namespace NKikimr; +using namespace NSchemeShard; + +class TPropose: public TSubOperationState { +private: + const TOperationId OperationId; + bool IsSameDataSource = false; + TPathId OldSourcePathId = InvalidPathId; + + TString DebugHint() const override { + return TStringBuilder() + << "TAlterExternalTable TPropose" + << ", operationId: " << OperationId; + } + + void ClearDescribePathCaches(const TOperationContext& context, + const TPathElement::TPtr& pathPtr, + const TPathElement::TPtr& dataSourcePathPtr, + const TPathElement::TPtr& oldDataSourcePathPtr) const { + context.SS->ClearDescribePathCaches(pathPtr); + if (!IsSameDataSource) { + context.SS->ClearDescribePathCaches(dataSourcePathPtr); + context.SS->ClearDescribePathCaches(oldDataSourcePathPtr); + } + } + + void PublishToSchemeBoard(const TOperationContext& context, + const TPathId& pathId, + const TPathId& dataSourcePathId) const { + context.OnComplete.PublishToSchemeBoard(OperationId, pathId); + if (!IsSameDataSource) { + context.OnComplete.PublishToSchemeBoard(OperationId, dataSourcePathId); + context.OnComplete.PublishToSchemeBoard(OperationId, OldSourcePathId); + } + } + +public: + explicit TPropose(TOperationId id, + bool isSameDataSource, + const TPathId oldSourcePathId) + : OperationId(std::move(id)) + , IsSameDataSource(isSameDataSource) + , OldSourcePathId(oldSourcePathId) { } + + bool HandleReply(TEvPrivate::TEvOperationPlan::TPtr& ev, TOperationContext& context) override { + const TStepId step = TStepId(ev->Get()->StepId); + + LOG_I(DebugHint() << " HandleReply TEvOperationPlan" + << ": step# " << step); + + const TTxState* txState = context.SS->FindTx(OperationId); + Y_ABORT_UNLESS(txState); + Y_ABORT_UNLESS(txState->TxType == TTxState::TxAlterExternalTable); + + const auto pathId = txState->TargetPathId; + const auto dataSourcePathId = txState->SourcePathId; + const auto path = TPath::Init(pathId, context.SS); + const TPathElement::TPtr pathPtr = context.SS->PathsById.at(pathId); + const TPathElement::TPtr dataSourcePathPtr = + context.SS->PathsById.at(dataSourcePathId); + TPathElement::TPtr oldDataSourcePathPtr; + if (!IsSameDataSource) { + oldDataSourcePathPtr = context.SS->PathsById.at(OldSourcePathId); + } + + NIceDb::TNiceDb db(context.GetDB()); + + IncParentDirAlterVersionWithRepublish(OperationId, path, context); + + ClearDescribePathCaches(context, pathPtr, dataSourcePathPtr, oldDataSourcePathPtr); + PublishToSchemeBoard(context, pathId, dataSourcePathId); + + context.SS->ChangeTxState(db, OperationId, TTxState::Done); + return true; + } + + bool ProgressState(TOperationContext& context) override { + LOG_I(DebugHint() << " ProgressState"); + + const TTxState* txState = context.SS->FindTx(OperationId); + Y_ABORT_UNLESS(txState); + Y_ABORT_UNLESS(txState->TxType == TTxState::TxAlterExternalTable); + + context.OnComplete.ProposeToCoordinator(OperationId, txState->TargetPathId, TStepId(0)); + return false; + } +}; + + +class TAlterExternalTable: public TSubOperation { +private: + bool IsSameDataSource = true; + TPathId OldDataSourcePathId = InvalidPathId; + + static TTxState::ETxState NextState() { + return TTxState::Propose; + } + + TTxState::ETxState NextState(TTxState::ETxState state) const override { + switch (state) { + case TTxState::Waiting: + case TTxState::Propose: + return TTxState::Done; + default: + return TTxState::Invalid; + } + } + + TSubOperationState::TPtr SelectStateFunc(TTxState::ETxState state) override { + switch (state) { + case TTxState::Waiting: + case TTxState::Propose: + return MakeHolder<TPropose>(OperationId, IsSameDataSource, OldDataSourcePathId); + case TTxState::Done: + return MakeHolder<TDone>(OperationId); + default: + return nullptr; + } + } + + static bool IsDestinationPathValid(const THolder<TProposeResponse>& result, + const TPath& dstPath, + const TString& acl) { + const auto checks = dstPath.Check(); + checks.IsAtLocalSchemeShard() + .IsResolved() + .NotUnderDeleting() + .FailOnWrongType(TPathElement::EPathType::EPathTypeExternalTable) + .IsValidLeafName() + .DepthLimit() + .PathsLimit() + .DirChildrenLimit() + .IsValidACL(acl); + + if (!checks) { + result->SetError(checks.GetStatus(), checks.GetError()); + if (dstPath.IsResolved()) { + result->SetPathCreateTxId(static_cast<ui64>(dstPath.Base()->CreateTxId)); + result->SetPathId(dstPath.Base()->PathId.LocalPathId); + } + } + + return static_cast<bool>(checks); + } + + static bool IsDataSourcePathValid(const THolder<TProposeResponse>& result, const TPath& dataSourcePath) { + const auto checks = dataSourcePath.Check(); + checks + .NotUnderDomainUpgrade() + .IsAtLocalSchemeShard() + .IsResolved() + .NotDeleted() + .NotUnderDeleting() + .IsCommonSensePath() + .IsExternalDataSource() + .NotUnderOperation(); + + if (!checks) { + result->SetError(checks.GetStatus(), checks.GetError()); + } + + return static_cast<bool>(checks); + } + + bool IsApplyIfChecksPassed(const THolder<TProposeResponse>& result, + const TOperationContext& context) const { + TString errorMessage; + if (!context.SS->CheckApplyIf(Transaction, errorMessage)) { + result->SetError(NKikimrScheme::StatusPreconditionFailed, errorMessage); + return false; + } + + return true; + } + + static bool IsDataSourceValid(const THolder<TProposeResponse>& result, + const TExternalDataSourceInfo::TPtr& externalDataSource) { + if (!externalDataSource) { + result->SetError(NKikimrScheme::StatusSchemeError, "Data source doesn't exist"); + return false; + } + return true; + } + + static bool IsExternalTableDescriptionValid( + const THolder<TProposeResponse>& result, + const TString& sourceType, + const NKikimrSchemeOp::TExternalTableDescription& desc) { + if (TString errorMessage; !NExternalTable::Validate(sourceType, desc, errorMessage)) { + result->SetError(NKikimrScheme::StatusSchemeError, errorMessage); + return false; + } + + return true; + } + + static void AddPathInSchemeShard(const THolder<TProposeResponse>& result, + TPath& dstPath) { + result->SetPathId(dstPath.Base()->PathId.LocalPathId); + } + + TPathElement::TPtr ReplaceExternalTablePathElement(const TPath& dstPath) const { + TPathElement::TPtr externalTable = dstPath.Base(); + externalTable->PathState = TPathElement::EPathState::EPathStateAlter; + externalTable->LastTxId = OperationId.GetTxId(); + + return externalTable; + } + + void CreateTransaction(const TOperationContext& context, + const TPathId& externalTablePathId, + const TPathId& externalDataSourcePathId) const { + TTxState& txState = context.SS->CreateTx(OperationId, + TTxState::TxAlterExternalTable, + externalTablePathId, + externalDataSourcePathId); + txState.Shards.clear(); + } + + void RegisterParentPathDependencies(const TOperationContext& context, + const TPath& parentPath) const { + if (parentPath.Base()->HasActiveChanges()) { + const auto parentTxId = parentPath.Base()->PlannedToCreate() + ? parentPath.Base()->CreateTxId + : parentPath.Base()->LastTxId; + context.OnComplete.Dependence(parentTxId, OperationId.GetTxId()); + } + } + + void AdvanceTransactionStateToPropose(const TOperationContext& context, + NIceDb::TNiceDb& db) const { + context.SS->ChangeTxState(db, OperationId, TTxState::Propose); + context.OnComplete.ActivateTx(OperationId); + } + + static void LinkExternalDataSourceWithExternalTable( + const TExternalDataSourceInfo::TPtr& externalDataSource, + const TPathElement::TPtr& externalTable, + const TPath& dstPath, + const TExternalDataSourceInfo::TPtr& oldDataSource, + bool isSameDataSource) { + + if (!isSameDataSource) { + auto& reference = *externalDataSource->ExternalTableReferences.AddReferences(); + reference.SetPath(dstPath.PathString()); + PathIdFromPathId(externalTable->PathId, reference.MutablePathId()); + + EraseIf(*oldDataSource->ExternalTableReferences.MutableReferences(), + [pathId = externalTable->PathId]( + const NKikimrSchemeOp::TExternalTableReferences::TReference& reference) { + return PathIdFromPathId(reference.GetPathId()) == pathId; + }); + } + } + + void PersistExternalTable( + const TOperationContext& context, + NIceDb::TNiceDb& db, + const TPathElement::TPtr& externalTable, + const TExternalTableInfo::TPtr& externalTableInfo, + const TPathId& externalDataSourcePathId, + const TExternalDataSourceInfo::TPtr& externalDataSource, + const TPathId& oldExternalDataSourcePathId, + const TExternalDataSourceInfo::TPtr& oldExternalDataSource, + const TString& acl, + bool isSameDataSource) const { + context.SS->ExternalTables[externalTable->PathId] = externalTableInfo; + + context.SS->PersistPath(db, externalTable->PathId); + + if (!acl.empty()) { + externalTable->ApplyACL(acl); + context.SS->PersistACL(db, externalTable); + } + + if (!isSameDataSource) { + context.SS->PersistExternalDataSource(db, externalDataSourcePathId, externalDataSource); + context.SS->PersistExternalDataSource(db, oldExternalDataSourcePathId, oldExternalDataSource); + } + context.SS->PersistExternalTable(db, externalTable->PathId, externalTableInfo); + context.SS->PersistTxState(db, OperationId); + } + +public: + using TSubOperation::TSubOperation; + + THolder<TProposeResponse> Propose(const TString& owner, TOperationContext& context) override { + Y_UNUSED(owner); + const auto ssId = context.SS->SelfTabletId(); + + const TString& parentPathStr = Transaction.GetWorkingDir(); + const auto& externalTableDescription = Transaction.GetCreateExternalTable(); + const TString& name = externalTableDescription.GetName(); + + LOG_N("TAlterExternalTable Propose" + << ": opId# " << OperationId + << ", path# " << parentPathStr << "/" << name << ", ReplaceIfExists:" << externalTableDescription.GetReplaceIfExists()); + + auto result = MakeHolder<TProposeResponse>(NKikimrScheme::StatusAccepted, + static_cast<ui64>(OperationId.GetTxId()), + static_cast<ui64>(ssId)); + + const auto parentPath = TPath::Resolve(parentPathStr, context.SS); + RETURN_RESULT_UNLESS(NExternalTable::IsParentPathValid(result, parentPath)); + + const TString acl = Transaction.GetModifyACL().GetDiffACL(); + TPath dstPath = parentPath.Child(name); + RETURN_RESULT_UNLESS(IsDestinationPathValid(result, dstPath, acl)); + + const auto dataSourcePath = + TPath::Resolve(externalTableDescription.GetDataSourcePath(), context.SS); + RETURN_RESULT_UNLESS(IsDataSourcePathValid(result, dataSourcePath)); + + const auto externalDataSource = + context.SS->ExternalDataSources.Value(dataSourcePath->PathId, nullptr); + RETURN_RESULT_UNLESS(IsDataSourceValid(result, externalDataSource)); + + RETURN_RESULT_UNLESS(IsApplyIfChecksPassed(result, context)); + + RETURN_RESULT_UNLESS(IsExternalTableDescriptionValid(result, + externalDataSource->SourceType, + externalTableDescription)); + + /// Extract old data source + TExternalDataSourceInfo::TPtr oldDataSource; + { + const auto oldExternalTableRecord = context.SS->ExternalTables.Value(dstPath->PathId, nullptr); + Y_ABORT_UNLESS(oldExternalTableRecord); + const auto oldDataSourcePath = TPath::Resolve(oldExternalTableRecord->DataSourcePath, context.SS); + RETURN_RESULT_UNLESS(IsDataSourcePathValid(result, oldDataSourcePath)); + + OldDataSourcePathId = oldDataSourcePath->PathId; + IsSameDataSource = oldDataSourcePath.PathString() == dataSourcePath.PathString(); + if (!IsSameDataSource) { + oldDataSource = context.SS->ExternalDataSources.Value(oldDataSourcePath->PathId, nullptr); + Y_ABORT_UNLESS(oldDataSource); + } + } + /// Extract old data source end + + const auto oldExternalTableInfo = + context.SS->ExternalTables.Value(dstPath->PathId, nullptr); + Y_ABORT_UNLESS(oldExternalTableInfo); + auto [externalTableInfo, maybeError] = + NExternalTable::CreateExternalTable(externalDataSource->SourceType, + externalTableDescription, + context.SS->ExternalSourceFactory, + oldExternalTableInfo->AlterVersion + 1); + if (maybeError) { + result->SetError(NKikimrScheme::StatusSchemeError, *maybeError); + return result; + } + Y_ABORT_UNLESS(externalTableInfo); + + AddPathInSchemeShard(result, dstPath); + + const auto externalTable = ReplaceExternalTablePathElement(dstPath); + CreateTransaction(context, externalTable->PathId, dataSourcePath->PathId); + + NIceDb::TNiceDb db(context.GetDB()); + + RegisterParentPathDependencies(context, parentPath); + AdvanceTransactionStateToPropose(context, db); + + LinkExternalDataSourceWithExternalTable(externalDataSource, + externalTable, + dstPath, + oldDataSource, + IsSameDataSource); + + PersistExternalTable(context, db, externalTable, externalTableInfo, + dataSourcePath->PathId, externalDataSource, + OldDataSourcePathId, oldDataSource, acl, + IsSameDataSource); + + IncParentDirAlterVersionWithRepublishSafeWithUndo(OperationId, + dstPath, + context.SS, + context.OnComplete); + + SetState(NextState()); + return result; + } + + void AbortPropose(TOperationContext& context) override { + LOG_N("TAlterExternalTable AbortPropose" + << ": opId# " << OperationId); + Y_ABORT("no AbortPropose for TAlterExternalTable"); + } + + void AbortUnsafe(TTxId forceDropTxId, TOperationContext& context) override { + LOG_N("TAlterExternalTable AbortUnsafe" + << ": opId# " << OperationId + << ", txId# " << forceDropTxId); + context.OnComplete.DoneOperation(OperationId); + } +}; + +} + +namespace NKikimr::NSchemeShard { + +ISubOperation::TPtr CreateAlterExternalTable(TOperationId id, const TTxTransaction& tx) { + return MakeSubOperation<TAlterExternalTable>(std::move(id), tx); +} + +ISubOperation::TPtr CreateAlterExternalTable(TOperationId id, TTxState::ETxState state) { + Y_ABORT_UNLESS(state != TTxState::Invalid); + return MakeSubOperation<TAlterExternalTable>(std::move(id), state); +} + +} diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_create_external_data_source.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_create_external_data_source.cpp index 85780c4888..bebd242390 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_create_external_data_source.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_create_external_data_source.cpp @@ -298,10 +298,50 @@ public: namespace NKikimr::NSchemeShard { -ISubOperation::TPtr CreateNewExternalDataSource(TOperationId id, const TTxTransaction& tx) { - return MakeSubOperation<TCreateExternalDataSource>(id, tx); -} +TVector<ISubOperation::TPtr> CreateNewExternalDataSource(TOperationId id, + const TTxTransaction& tx, + TOperationContext& context) { + Y_ABORT_UNLESS(tx.GetOperationType() == NKikimrSchemeOp::ESchemeOpCreateExternalDataSource); + + LOG_I("CreateNewExternalDataSource, opId " << id << ", feature flag EnableReplaceIfExistsForExternalEntities " + << context.SS->EnableReplaceIfExistsForExternalEntities << ", tx " + << tx.ShortDebugString()); + + auto errorResult = [&id](NKikimrScheme::EStatus status, const TStringBuf& msg) -> TVector<ISubOperation::TPtr> { + return {CreateReject(id, status, TStringBuilder() << "Invalid TCreateExternalDataSource request: " << msg)}; + }; + const auto &operation = tx.GetCreateExternalDataSource(); + const auto replaceIfExists = operation.GetReplaceIfExists(); + const TString &name = operation.GetName(); + + if (replaceIfExists && !context.SS->EnableReplaceIfExistsForExternalEntities) { + return errorResult(NKikimrScheme::StatusPreconditionFailed, "Unsupported: feature flag EnableReplaceIfExistsForExternalEntities is off"); + } + + const TString& parentPathStr = tx.GetWorkingDir(); + const TPath parentPath = TPath::Resolve(parentPathStr, context.SS); + + { + const auto checks = NExternalDataSource::IsParentPathValid(parentPath); + if (!checks) { + return errorResult(checks.GetStatus(), checks.GetError()); + } + } + + if (replaceIfExists) { + const TPath dstPath = parentPath.Child(name); + const auto isAlreadyExists = + dstPath.Check() + .IsResolved() + .NotUnderDeleting(); + if (isAlreadyExists) { + return {CreateAlterExternalDataSource(id, tx)}; + } + } + + return {MakeSubOperation<TCreateExternalDataSource>(id, tx)}; +} ISubOperation::TPtr CreateNewExternalDataSource(TOperationId id, TTxState::ETxState state) { Y_ABORT_UNLESS(state != TTxState::Invalid); return MakeSubOperation<TCreateExternalDataSource>(id, state); diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_create_external_table.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_create_external_table.cpp index a82af9859b..702e8c6995 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_create_external_table.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_create_external_table.cpp @@ -192,7 +192,7 @@ private: } static bool IsDataSourceValid(const THolder<TProposeResponse>& result, - const TExternalDataSourceInfo::TPtr& externalDataSource) { + const TExternalDataSourceInfo::TPtr& externalDataSource) { if (!externalDataSource) { result->SetError(NKikimrScheme::StatusSchemeError, "Data source doesn't exist"); return false; @@ -393,8 +393,47 @@ public: namespace NKikimr::NSchemeShard { -ISubOperation::TPtr CreateNewExternalTable(TOperationId id, const TTxTransaction& tx) { - return MakeSubOperation<TCreateExternalTable>(id, tx); +TVector<ISubOperation::TPtr> CreateNewExternalTable(TOperationId id, const TTxTransaction& tx, TOperationContext& context) { + Y_ABORT_UNLESS(tx.GetOperationType() == NKikimrSchemeOp::ESchemeOpCreateExternalTable); + + LOG_I("CreateNewExternalTable, opId " << id << ", feature flag EnableReplaceIfExistsForExternalEntities " + << context.SS->EnableReplaceIfExistsForExternalEntities << ", tx " + << tx.ShortDebugString()); + + auto errorResult = [&id](NKikimrScheme::EStatus status, const TStringBuf& msg) -> TVector<ISubOperation::TPtr> { + return {CreateReject(id, status, TStringBuilder() << "Invalid TCreateExternalTable request: " << msg)}; + }; + + const auto &operation = tx.GetCreateExternalTable(); + const auto replaceIfExists = operation.GetReplaceIfExists(); + const TString &name = operation.GetName(); + + if (replaceIfExists && !context.SS->EnableReplaceIfExistsForExternalEntities) { + return errorResult(NKikimrScheme::StatusPreconditionFailed, "Unsupported: feature flag EnableReplaceIfExistsForExternalEntities is off"); + } + + const TString& parentPathStr = tx.GetWorkingDir(); + const TPath parentPath = TPath::Resolve(parentPathStr, context.SS); + + { + const auto checks = NExternalTable::IsParentPathValid(parentPath); + if (!checks) { + return errorResult(checks.GetStatus(), checks.GetError()); + } + } + + if (replaceIfExists) { + const TPath dstPath = parentPath.Child(name); + const auto isAlreadyExists = + dstPath.Check() + .IsResolved() + .NotUnderDeleting(); + if (isAlreadyExists) { + return {CreateAlterExternalTable(id, tx)}; + } + } + + return {MakeSubOperation<TCreateExternalTable>(id, tx)}; } ISubOperation::TPtr CreateNewExternalTable(TOperationId id, TTxState::ETxState state) { diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_part.h b/ydb/core/tx/schemeshard/schemeshard__operation_part.h index 4c592f5790..0a9cf8b6b8 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_part.h +++ b/ydb/core/tx/schemeshard/schemeshard__operation_part.h @@ -369,16 +369,22 @@ ISubOperation::TPtr CreateUpdateMainTableOnIndexMove(TOperationId id, TTxState:: // External Table // Create -ISubOperation::TPtr CreateNewExternalTable(TOperationId id, const TTxTransaction& tx); +TVector<ISubOperation::TPtr> CreateNewExternalTable(TOperationId id, const TTxTransaction& tx, TOperationContext& context); ISubOperation::TPtr CreateNewExternalTable(TOperationId id, TTxState::ETxState state); +// Alter +ISubOperation::TPtr CreateAlterExternalTable(TOperationId id, const TTxTransaction& tx); +ISubOperation::TPtr CreateAlterExternalTable(TOperationId id, TTxState::ETxState state); // Drop ISubOperation::TPtr CreateDropExternalTable(TOperationId id, const TTxTransaction& tx); ISubOperation::TPtr CreateDropExternalTable(TOperationId id, TTxState::ETxState state); // External Data Source // Create -ISubOperation::TPtr CreateNewExternalDataSource(TOperationId id, const TTxTransaction& tx); +TVector<ISubOperation::TPtr> CreateNewExternalDataSource(TOperationId id, const TTxTransaction& tx, TOperationContext& context); ISubOperation::TPtr CreateNewExternalDataSource(TOperationId id, TTxState::ETxState state); +// Alter +ISubOperation::TPtr CreateAlterExternalDataSource(TOperationId id, const TTxTransaction& tx); +ISubOperation::TPtr CreateAlterExternalDataSource(TOperationId id, TTxState::ETxState state); // Drop ISubOperation::TPtr CreateDropExternalDataSource(TOperationId id, const TTxTransaction& tx); ISubOperation::TPtr CreateDropExternalDataSource(TOperationId id, TTxState::ETxState state); diff --git a/ydb/core/tx/schemeshard/schemeshard_audit_log_fragment.cpp b/ydb/core/tx/schemeshard/schemeshard_audit_log_fragment.cpp index 7a31db7018..12c04ccfe5 100644 --- a/ydb/core/tx/schemeshard/schemeshard_audit_log_fragment.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_audit_log_fragment.cpp @@ -636,7 +636,7 @@ TAuditLogFragment MakeAuditLogFragment(const NKikimrSchemeOp::TModifyScheme& tx) auto [aclAdd, aclRemove] = ExtractACLChange(tx); auto [userAttrsAdd, userAttrsRemove] = ExtractUserAttrChange(tx); auto [loginUser, loginGroup, loginMember] = ExtractLoginChange(tx); - + return { .Operation = DefineUserOperationName(tx), .Paths = ExtractChangingPaths(tx), diff --git a/ydb/core/tx/schemeshard/schemeshard_impl.cpp b/ydb/core/tx/schemeshard/schemeshard_impl.cpp index 5498c8eee7..c55bca5942 100644 --- a/ydb/core/tx/schemeshard/schemeshard_impl.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_impl.cpp @@ -4322,6 +4322,7 @@ void TSchemeShard::OnActivateExecutor(const TActorContext &ctx) { EnableTablePgTypes = appData->FeatureFlags.GetEnableTablePgTypes(); EnableServerlessExclusiveDynamicNodes = appData->FeatureFlags.GetEnableServerlessExclusiveDynamicNodes(); EnableAddColumsWithDefaults = appData->FeatureFlags.GetEnableAddColumsWithDefaults(); + EnableReplaceIfExistsForExternalEntities = appData->FeatureFlags.GetEnableReplaceIfExistsForExternalEntities(); EnableTempTables = appData->FeatureFlags.GetEnableTempTables(); ConfigureCompactionQueues(appData->CompactionConfig, ctx); @@ -6846,6 +6847,7 @@ void TSchemeShard::ApplyConsoleConfigs(const NKikimrConfig::TFeatureFlags& featu EnableServerlessExclusiveDynamicNodes = featureFlags.GetEnableServerlessExclusiveDynamicNodes(); EnableAddColumsWithDefaults = featureFlags.GetEnableAddColumsWithDefaults(); EnableTempTables = featureFlags.GetEnableTempTables(); + EnableReplaceIfExistsForExternalEntities = featureFlags.GetEnableReplaceIfExistsForExternalEntities(); } void TSchemeShard::ConfigureStatsBatching(const NKikimrConfig::TSchemeShardConfig& config, const TActorContext& ctx) { diff --git a/ydb/core/tx/schemeshard/schemeshard_impl.h b/ydb/core/tx/schemeshard/schemeshard_impl.h index 870af7b9c6..2c1a2e6db0 100644 --- a/ydb/core/tx/schemeshard/schemeshard_impl.h +++ b/ydb/core/tx/schemeshard/schemeshard_impl.h @@ -304,6 +304,7 @@ public: bool EnableTablePgTypes = false; bool EnableServerlessExclusiveDynamicNodes = false; bool EnableAddColumsWithDefaults = false; + bool EnableReplaceIfExistsForExternalEntities = false; bool EnableTempTables = false; TShardDeleter ShardDeleter; diff --git a/ydb/core/tx/schemeshard/schemeshard_path.cpp b/ydb/core/tx/schemeshard/schemeshard_path.cpp index 78c87a86f8..cf5674c4ad 100644 --- a/ydb/core/tx/schemeshard/schemeshard_path.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_path.cpp @@ -565,6 +565,37 @@ const TPath::TChecker& TPath::TChecker::IsTheSameDomain(const TPath& another, ES << ", another path: " << another.PathString()); } +const TPath::TChecker& TPath::TChecker::FailOnWrongType(const TSet<TPathElement::EPathType>& expectedTypes) const { + if (Failed) { + return *this; + } + + if (!Path.IsResolved()) { + return *this; + } + + if (Path.IsDeleted()) { + return *this; + } + + if (!expectedTypes.contains(Path.Base()->PathType)) { + return Fail(EStatus::StatusNameConflict, TStringBuilder() << "unexpected path type" + << " (" << BasicPathInfo(Path.Base()) << ")" + << ", expected types: " << JoinSeq(", ", expectedTypes)); + } + + if (!Path.Base()->IsCreateFinished()) { + return Fail(EStatus::StatusMultipleModifications, TStringBuilder() << "path exists but creating right now" + << " (" << BasicPathInfo(Path.Base()) << ")"); + } + + return *this; +} + +const TPath::TChecker& TPath::TChecker::FailOnWrongType(TPathElement::EPathType expectedType) const { + return FailOnWrongType(TSet<TPathElement::EPathType>{expectedType}); +} + const TPath::TChecker& TPath::TChecker::FailOnExist(const TSet<TPathElement::EPathType>& expectedTypes, bool acceptAlreadyExist) const { if (Failed) { return *this; @@ -585,7 +616,7 @@ const TPath::TChecker& TPath::TChecker::FailOnExist(const TSet<TPathElement::EPa } if (!Path.Base()->IsCreateFinished()) { - return Fail(EStatus::StatusMultipleModifications, TStringBuilder() << "path exist but creating right now" + return Fail(EStatus::StatusMultipleModifications, TStringBuilder() << "path exists but creating right now" << " (" << BasicPathInfo(Path.Base()) << ")"); } diff --git a/ydb/core/tx/schemeshard/schemeshard_path.h b/ydb/core/tx/schemeshard/schemeshard_path.h index 93257d943f..2087a853f0 100644 --- a/ydb/core/tx/schemeshard/schemeshard_path.h +++ b/ydb/core/tx/schemeshard/schemeshard_path.h @@ -75,6 +75,8 @@ public: const TChecker& IsLikeDirectory(EStatus status = EStatus::StatusPathIsNotDirectory) const; const TChecker& IsDirectory(EStatus status = EStatus::StatusPathIsNotDirectory) const; const TChecker& IsTheSameDomain(const TPath& another, EStatus status = EStatus::StatusInvalidParameter) const; + const TChecker& FailOnWrongType(const TSet<TPathElement::EPathType>& expectedTypes) const; + const TChecker& FailOnWrongType(TPathElement::EPathType expectedType) const; const TChecker& FailOnExist(const TSet<TPathElement::EPathType>& expectedTypes, bool acceptAlreadyExist) const; const TChecker& FailOnExist(TPathElement::EPathType expectedType, bool acceptAlreadyExist) const; const TChecker& IsValidLeafName(EStatus status = EStatus::StatusSchemeError) const; diff --git a/ydb/core/tx/schemeshard/ut_external_data_source/ut_external_data_source.cpp b/ydb/core/tx/schemeshard/ut_external_data_source/ut_external_data_source.cpp index c4c6d3dad5..35dc0610e2 100644 --- a/ydb/core/tx/schemeshard/ut_external_data_source/ut_external_data_source.cpp +++ b/ydb/core/tx/schemeshard/ut_external_data_source/ut_external_data_source.cpp @@ -403,4 +403,80 @@ Y_UNIT_TEST_SUITE(TExternalDataSourceTest) { TestLs(runtime, "/MyRoot/ExternalDataSource", false, NLs::PathNotExist); } + + Y_UNIT_TEST(ReplaceExternalDataSourceIfNotExists) { + TTestBasicRuntime runtime; + TTestEnv env(runtime, TTestEnvOptions().EnableReplaceIfExistsForExternalEntities(true)); + ui64 txId = 100; + + TestCreateExternalDataSource(runtime, ++txId, "/MyRoot",R"( + Name: "MyExternalDataSource" + SourceType: "ObjectStorage" + Location: "https://s3.cloud.net/my_bucket" + Auth { + None { + } + } + ReplaceIfExists: true + )",{NKikimrScheme::StatusAccepted}); + + env.TestWaitNotification(runtime, txId); + + { + auto describeResult = DescribePath(runtime, "/MyRoot/MyExternalDataSource"); + TestDescribeResult(describeResult, {NLs::PathExist}); + UNIT_ASSERT(describeResult.GetPathDescription().HasExternalDataSourceDescription()); + const auto& externalDataSourceDescription = describeResult.GetPathDescription().GetExternalDataSourceDescription(); + UNIT_ASSERT_VALUES_EQUAL(externalDataSourceDescription.GetName(), "MyExternalDataSource"); + UNIT_ASSERT_VALUES_EQUAL(externalDataSourceDescription.GetSourceType(), "ObjectStorage"); + UNIT_ASSERT_VALUES_EQUAL(externalDataSourceDescription.GetVersion(), 1); + UNIT_ASSERT_VALUES_EQUAL(externalDataSourceDescription.GetLocation(), "https://s3.cloud.net/my_bucket"); + UNIT_ASSERT_EQUAL(externalDataSourceDescription.GetAuth().identity_case(), NKikimrSchemeOp::TAuth::kNone); + } + + TestCreateExternalDataSource(runtime, ++txId, "/MyRoot",R"( + Name: "MyExternalDataSource" + SourceType: "ObjectStorage" + Location: "https://s3.cloud.net/my_new_bucket" + Auth { + None { + } + } + ReplaceIfExists: true + )",{NKikimrScheme::StatusAccepted}); + env.TestWaitNotification(runtime, txId); + + { + auto describeResult = DescribePath(runtime, "/MyRoot/MyExternalDataSource"); + TestDescribeResult(describeResult, {NLs::PathExist}); + UNIT_ASSERT(describeResult.GetPathDescription().HasExternalDataSourceDescription()); + const auto& externalDataSourceDescription = describeResult.GetPathDescription().GetExternalDataSourceDescription(); + UNIT_ASSERT_VALUES_EQUAL(externalDataSourceDescription.GetName(), "MyExternalDataSource"); + UNIT_ASSERT_VALUES_EQUAL(externalDataSourceDescription.GetSourceType(), "ObjectStorage"); + UNIT_ASSERT_VALUES_EQUAL(externalDataSourceDescription.GetVersion(), 2); + UNIT_ASSERT_VALUES_EQUAL(externalDataSourceDescription.GetLocation(), "https://s3.cloud.net/my_new_bucket"); + UNIT_ASSERT_EQUAL(externalDataSourceDescription.GetAuth().identity_case(), NKikimrSchemeOp::TAuth::kNone); + } + } + + Y_UNIT_TEST(ReplaceExternalDataSourceIfNotExistsShouldFailIfFeatureFlagIsNotSet) { + TTestBasicRuntime runtime; + TTestEnv env(runtime, TTestEnvOptions().EnableReplaceIfExistsForExternalEntities(false)); + ui64 txId = 100; + + TestCreateExternalDataSource(runtime, ++txId, "/MyRoot",R"( + Name: "MyExternalDataSource" + SourceType: "ObjectStorage" + Location: "https://s3.cloud.net/my_bucket" + Auth { + None { + } + } + ReplaceIfExists: true + )",{{NKikimrScheme::StatusPreconditionFailed, "Unsupported: feature flag EnableReplaceIfExistsForExternalEntities is off"}}); + + env.TestWaitNotification(runtime, txId); + + TestLs(runtime, "/MyRoot/MyExternalDataSource", false, NLs::PathNotExist); + } } diff --git a/ydb/core/tx/schemeshard/ut_external_table/ut_external_table.cpp b/ydb/core/tx/schemeshard/ut_external_table/ut_external_table.cpp index 7c8921a281..bcb9809c55 100644 --- a/ydb/core/tx/schemeshard/ut_external_table/ut_external_table.cpp +++ b/ydb/core/tx/schemeshard/ut_external_table/ut_external_table.cpp @@ -312,4 +312,90 @@ Y_UNIT_TEST_SUITE(TExternalTableTest) { Columns { Name: "Value" Type: "Utf8"} )", {{NKikimrScheme::StatusPathDoesNotExist, "Check failed: path: '/MyRoot/ExternalDataSource1'"}}); } + + Y_UNIT_TEST(ReplaceExternalTableIfNotExists) { + TTestBasicRuntime runtime; + TTestEnv env(runtime, TTestEnvOptions().EnableReplaceIfExistsForExternalEntities(true)); + ui64 txId = 100; + + CreateExternalDataSource(runtime, env, ++txId); + TestCreateExternalTable(runtime, ++txId, "/MyRoot", R"( + Name: "ExternalTable" + SourceType: "General" + DataSourcePath: "/MyRoot/ExternalDataSource" + Location: "/" + Columns { Name: "key" Type: "Uint64" } + ReplaceIfExists: true + )", {NKikimrScheme::StatusAccepted}); + + env.TestWaitNotification(runtime, txId); + + { + auto describeResult = DescribePath(runtime, "/MyRoot/ExternalTable"); + TestDescribeResult(describeResult, {NLs::PathExist}); + UNIT_ASSERT(describeResult.GetPathDescription().HasExternalTableDescription()); + const auto& externalTableDescription = describeResult.GetPathDescription().GetExternalTableDescription(); + UNIT_ASSERT_VALUES_EQUAL(externalTableDescription.GetName(), "ExternalTable"); + UNIT_ASSERT_VALUES_EQUAL(externalTableDescription.GetDataSourcePath(), "/MyRoot/ExternalDataSource"); + UNIT_ASSERT_VALUES_EQUAL(externalTableDescription.GetLocation(), "/"); + UNIT_ASSERT_VALUES_EQUAL(externalTableDescription.GetSourceType(), "ObjectStorage"); + UNIT_ASSERT_VALUES_EQUAL(externalTableDescription.GetVersion(), 1); + auto& columns = externalTableDescription.GetColumns(); + UNIT_ASSERT_VALUES_EQUAL(columns.size(), 1); + UNIT_ASSERT_VALUES_EQUAL(columns.Get(0).GetName(), "key"); + UNIT_ASSERT_VALUES_EQUAL(columns.Get(0).GetType(), "Uint64"); + UNIT_ASSERT_VALUES_EQUAL(columns.Get(0).GetNotNull(), false); + } + + TestCreateExternalTable(runtime, ++txId, "/MyRoot", R"( + Name: "ExternalTable" + SourceType: "General" + DataSourcePath: "/MyRoot/ExternalDataSource" + Location: "/new_location" + Columns { Name: "key" Type: "Uint64" } + Columns { Name: "value" Type: "Uint64" } + ReplaceIfExists: true + )", {NKikimrScheme::StatusAccepted}); + env.TestWaitNotification(runtime, txId); + + { + auto describeResult = DescribePath(runtime, "/MyRoot/ExternalTable"); + TestDescribeResult(describeResult, {NLs::PathExist}); + UNIT_ASSERT(describeResult.GetPathDescription().HasExternalTableDescription()); + const auto& externalTableDescription = describeResult.GetPathDescription().GetExternalTableDescription(); + UNIT_ASSERT_VALUES_EQUAL(externalTableDescription.GetName(), "ExternalTable"); + UNIT_ASSERT_VALUES_EQUAL(externalTableDescription.GetDataSourcePath(), "/MyRoot/ExternalDataSource"); + UNIT_ASSERT_VALUES_EQUAL(externalTableDescription.GetLocation(), "/new_location"); + UNIT_ASSERT_VALUES_EQUAL(externalTableDescription.GetSourceType(), "ObjectStorage"); + UNIT_ASSERT_VALUES_EQUAL(externalTableDescription.GetVersion(), 2); + auto& columns = externalTableDescription.GetColumns(); + UNIT_ASSERT_VALUES_EQUAL(columns.size(), 2); + UNIT_ASSERT_VALUES_EQUAL(columns.Get(0).GetName(), "key"); + UNIT_ASSERT_VALUES_EQUAL(columns.Get(0).GetType(), "Uint64"); + UNIT_ASSERT_VALUES_EQUAL(columns.Get(0).GetNotNull(), false); + UNIT_ASSERT_VALUES_EQUAL(columns.Get(1).GetName(), "value"); + UNIT_ASSERT_VALUES_EQUAL(columns.Get(1).GetType(), "Uint64"); + UNIT_ASSERT_VALUES_EQUAL(columns.Get(1).GetNotNull(), false); + } + } + + Y_UNIT_TEST(ReplaceExternalTableIfNotExistsShouldFailIfFeatureFlagIsNotSet) { + TTestBasicRuntime runtime; + TTestEnv env(runtime, TTestEnvOptions().EnableReplaceIfExistsForExternalEntities(false)); + ui64 txId = 100; + + CreateExternalDataSource(runtime, env, ++txId); + TestCreateExternalTable(runtime, ++txId, "/MyRoot", R"( + Name: "ExternalTable" + SourceType: "General" + DataSourcePath: "/MyRoot/ExternalDataSource" + Location: "/" + Columns { Name: "key" Type: "Uint64" } + ReplaceIfExists: true + )", {{NKikimrScheme::StatusPreconditionFailed, "Unsupported: feature flag EnableReplaceIfExistsForExternalEntities is off"}}); + + env.TestWaitNotification(runtime, txId); + + TestLs(runtime, "/MyRoot/ExternalTable", false, NLs::PathNotExist); + } } diff --git a/ydb/core/tx/schemeshard/ut_helpers/test_env.cpp b/ydb/core/tx/schemeshard/ut_helpers/test_env.cpp index f8eb235471..1247239013 100644 --- a/ydb/core/tx/schemeshard/ut_helpers/test_env.cpp +++ b/ydb/core/tx/schemeshard/ut_helpers/test_env.cpp @@ -537,6 +537,7 @@ NSchemeShardUT_Private::TTestEnv::TTestEnv(TTestActorRuntime& runtime, const TTe app.SetEnableTablePgTypes(opts.EnableTablePgTypes_); app.SetEnableServerlessExclusiveDynamicNodes(opts.EnableServerlessExclusiveDynamicNodes_); app.SetEnableAddColumsWithDefaults(opts.EnableAddColumsWithDefaults_); + app.SetEnableReplaceIfExistsForExternalEntities(opts.EnableReplaceIfExistsForExternalEntities_); app.ColumnShardConfig.SetDisabledOnSchemeShard(false); @@ -826,8 +827,8 @@ std::function<NActors::IActor *(const NActors::TActorId &, NKikimr::TTabletStora } void NSchemeShardUT_Private::TTestEnv::TestServerlessComputeResourcesModeInHive(TTestActorRuntime& runtime, - const TString& path, NKikimrSubDomains::EServerlessComputeResourcesMode serverlessComputeResourcesMode, ui64 hive) -{ + const TString& path, NKikimrSubDomains::EServerlessComputeResourcesMode serverlessComputeResourcesMode, ui64 hive) +{ auto record = DescribePath(runtime, path); const auto& pathDescr = record.GetPathDescription(); const TSubDomainKey subdomainKey(pathDescr.GetDomainDescription().GetDomainKey()); diff --git a/ydb/core/tx/schemeshard/ut_helpers/test_env.h b/ydb/core/tx/schemeshard/ut_helpers/test_env.h index d0ff68ece8..9c300ee098 100644 --- a/ydb/core/tx/schemeshard/ut_helpers/test_env.h +++ b/ydb/core/tx/schemeshard/ut_helpers/test_env.h @@ -59,6 +59,7 @@ namespace NSchemeShardUT_Private { OPTION(std::optional<bool>, EnableTablePgTypes, std::nullopt); OPTION(std::optional<bool>, EnableServerlessExclusiveDynamicNodes, std::nullopt); OPTION(std::optional<bool>, EnableAddColumsWithDefaults, std::nullopt); + OPTION(std::optional<bool>, EnableReplaceIfExistsForExternalEntities, std::nullopt); #undef OPTION }; @@ -116,7 +117,7 @@ namespace NSchemeShardUT_Private { void SimulateSleep(TTestActorRuntime& runtime, TDuration duration); - void TestServerlessComputeResourcesModeInHive(TTestActorRuntime& runtime, const TString& path, + void TestServerlessComputeResourcesModeInHive(TTestActorRuntime& runtime, const TString& path, NKikimrSubDomains::EServerlessComputeResourcesMode serverlessComputeResourcesMode, ui64 hive = TTestTxConfig::Hive); diff --git a/ydb/core/tx/schemeshard/ya.make b/ydb/core/tx/schemeshard/ya.make index f1e2176a38..f049575c2c 100644 --- a/ydb/core/tx/schemeshard/ya.make +++ b/ydb/core/tx/schemeshard/ya.make @@ -82,6 +82,8 @@ SRCS( schemeshard__operation_memory_changes.cpp schemeshard__operation_db_changes.cpp schemeshard__operation_alter_bsv.cpp + schemeshard__operation_alter_external_data_source.cpp + schemeshard__operation_alter_external_table.cpp schemeshard__operation_alter_extsubdomain.cpp schemeshard__operation_alter_fs.cpp schemeshard__operation_alter_index.cpp |