diff options
author | Alexey Uzhegov <uzhegov37@yahoo.com> | 2024-01-30 16:49:49 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-01-30 15:49:49 +0200 |
commit | 9da224918cbef0fd8d3ee032ad41f8e18e631e60 (patch) | |
tree | f5a22099c66a2bde56ab0a7f89c9899ddd9be36c | |
parent | 3311d10b22651f987a52656368ee1d1050ccdacf (diff) | |
download | ydb-9da224918cbef0fd8d3ee032ad41f8e18e631e60.tar.gz |
[YQ-1997] Refactor create external data source / external table schemeshard operations (#1119)
7 files changed, 773 insertions, 467 deletions
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_common_external_data_source.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_common_external_data_source.cpp new file mode 100644 index 0000000000..b9ce6755aa --- /dev/null +++ b/ydb/core/tx/schemeshard/schemeshard__operation_common_external_data_source.cpp @@ -0,0 +1,112 @@ +#include "schemeshard__operation_common_external_data_source.h" + +#include <utility> + +namespace NKikimr::NSchemeShard::NExternalDataSource { + +constexpr uint32_t MAX_FIELD_SIZE = 1000; +constexpr uint32_t MAX_PROTOBUF_SIZE = 2 * 1024 * 1024; // 2 MiB + +bool ValidateLocationAndInstallation(const TString& location, + const TString& installation, + TString& errStr) { + if (location.Size() > MAX_FIELD_SIZE) { + errStr = + Sprintf("Maximum length of location must be less or equal equal to %u but got %lu", + MAX_FIELD_SIZE, + location.Size()); + return false; + } + if (installation.Size() > MAX_FIELD_SIZE) { + errStr = Sprintf( + "Maximum length of installation must be less or equal equal to %u but got %lu", + MAX_FIELD_SIZE, + installation.Size()); + return false; + } + return true; +} + +bool CheckAuth(const TString& authMethod, + const TVector<TString>& availableAuthMethods, + TString& errStr) { + if (Find(availableAuthMethods, authMethod) == availableAuthMethods.end()) { + errStr = TStringBuilder{} << authMethod << " isn't supported for this source type"; + return false; + } + + return true; +} + +bool ValidateProperties(const NKikimrSchemeOp::TExternalDataSourceProperties& properties, + TString& errStr) { + if (properties.ByteSizeLong() > MAX_PROTOBUF_SIZE) { + errStr = + Sprintf("Maximum size of properties must be less or equal equal to %u but got %lu", + MAX_PROTOBUF_SIZE, + properties.ByteSizeLong()); + return false; + } + return true; +} + +bool ValidateAuth(const NKikimrSchemeOp::TAuth& auth, + const NExternalSource::IExternalSource::TPtr& source, + TString& errStr) { + if (auth.ByteSizeLong() > MAX_PROTOBUF_SIZE) { + errStr = Sprintf( + "Maximum size of authorization information must be less or equal equal to %u but got %lu", + MAX_PROTOBUF_SIZE, + auth.ByteSizeLong()); + return false; + } + const auto availableAuthMethods = source->GetAuthMethods(); + switch (auth.identity_case()) { + case NKikimrSchemeOp::TAuth::IDENTITY_NOT_SET: { + errStr = "Authorization method isn't specified"; + return false; + } + case NKikimrSchemeOp::TAuth::kServiceAccount: + return CheckAuth("SERVICE_ACCOUNT", availableAuthMethods, errStr); + case NKikimrSchemeOp::TAuth::kMdbBasic: + return CheckAuth("MDB_BASIC", availableAuthMethods, errStr); + case NKikimrSchemeOp::TAuth::kBasic: + return CheckAuth("BASIC", availableAuthMethods, errStr); + case NKikimrSchemeOp::TAuth::kAws: + return CheckAuth("AWS", availableAuthMethods, errStr); + case NKikimrSchemeOp::TAuth::kNone: + return CheckAuth("NONE", availableAuthMethods, errStr); + } + return false; +} + +bool Validate(const NKikimrSchemeOp::TExternalDataSourceDescription& desc, + const NExternalSource::IExternalSourceFactory::TPtr& factory, + TString& errStr) { + try { + const auto source = factory->GetOrCreate(desc.GetSourceType()); + source->ValidateExternalDataSource(desc.SerializeAsString()); + return ValidateLocationAndInstallation(desc.GetLocation(), + desc.GetInstallation(), + errStr) && + ValidateAuth(desc.GetAuth(), source, errStr) && + ValidateProperties(desc.GetProperties(), errStr); + } catch (...) { + errStr = CurrentExceptionMessage(); + return false; + } +} + +TExternalDataSourceInfo::TPtr CreateExternalDataSource( + const NKikimrSchemeOp::TExternalDataSourceDescription& desc, ui64 alterVersion) { + TExternalDataSourceInfo::TPtr externalDataSoureInfo = new TExternalDataSourceInfo; + externalDataSoureInfo->SourceType = desc.GetSourceType(); + externalDataSoureInfo->Location = desc.GetLocation(); + externalDataSoureInfo->Installation = desc.GetInstallation(); + externalDataSoureInfo->AlterVersion = alterVersion; + externalDataSoureInfo->Auth.CopyFrom(desc.GetAuth()); + externalDataSoureInfo->Properties.CopyFrom(desc.GetProperties()); + return externalDataSoureInfo; +} + +} // namespace NKikimr::NSchemeShard::NExternalDataSource diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_common_external_data_source.h b/ydb/core/tx/schemeshard/schemeshard__operation_common_external_data_source.h new file mode 100644 index 0000000000..79f1217492 --- /dev/null +++ b/ydb/core/tx/schemeshard/schemeshard__operation_common_external_data_source.h @@ -0,0 +1,47 @@ +#pragma once + +#include "schemeshard__operation_part.h" +#include "schemeshard_impl.h" + +#include <ydb/core/tablet_flat/test/libs/table/test_iter.h> + +#include <utility> + +#define LOG_I(stream) LOG_INFO_S (context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "[" << context.SS->TabletID() << "] " << stream) +#define LOG_N(stream) LOG_NOTICE_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "[" << context.SS->TabletID() << "] " << stream) +#define RETURN_RESULT_UNLESS(x) if (!(x)) return result; + +namespace NKikimr::NSchemeShard::NExternalDataSource { + +inline TPath::TChecker IsParentPathValid(const TPath& parentPath) { + auto checks = parentPath.Check(); + checks.NotUnderDomainUpgrade() + .IsAtLocalSchemeShard() + .IsResolved() + .NotDeleted() + .NotUnderDeleting() + .IsCommonSensePath() + .IsLikeDirectory(); + + return std::move(checks); +} + +inline bool IsParentPathValid(const THolder<TProposeResponse>& result, + const TPath& parentPath) { + const auto checks = IsParentPathValid(parentPath); + + if (!checks) { + result->SetError(checks.GetStatus(), checks.GetError()); + } + + return static_cast<bool>(checks); +} + +bool Validate(const NKikimrSchemeOp::TExternalDataSourceDescription& desc, + const NExternalSource::IExternalSourceFactory::TPtr& factory, + TString& errStr); + +TExternalDataSourceInfo::TPtr CreateExternalDataSource( + const NKikimrSchemeOp::TExternalDataSourceDescription& desc, ui64 alterVersion); + +} // namespace NKikimr::NSchemeShard::NExternalDataSource diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_common_external_table.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_common_external_table.cpp new file mode 100644 index 0000000000..8bed0aa2df --- /dev/null +++ b/ydb/core/tx/schemeshard/schemeshard__operation_common_external_table.cpp @@ -0,0 +1,169 @@ +#include "schemeshard__operation_common_external_table.h" + +#include <utility> + +namespace NKikimr::NSchemeShard::NExternalTable { + +constexpr uint32_t MAX_FIELD_SIZE = 1000; +constexpr uint32_t MAX_PROTOBUF_SIZE = 2 * 1024 * 1024; // 2 MiB + +bool ValidateSourceType(const TString& sourceType, TString& errStr) { + // Only object storage supported today + if (sourceType != "ObjectStorage") { + errStr = "Only ObjectStorage source type supported but got " + sourceType; + return false; + } + return true; +} + +bool ValidateLocation(const TString& location, TString& errStr) { + if (!location) { + errStr = "Location must not be empty"; + return false; + } + if (location.Size() > MAX_FIELD_SIZE) { + errStr = Sprintf("Maximum length of location must be less or equal equal to %u but got %lu", MAX_FIELD_SIZE, location.Size()); + return false; + } + return true; +} + +bool ValidateContent(const TString& content, TString& errStr) { + if (content.Size() > MAX_PROTOBUF_SIZE) { + errStr = Sprintf("Maximum size of content must be less or equal equal to %u but got %lu", MAX_PROTOBUF_SIZE, content.Size()); + return false; + } + return true; +} + +bool ValidateDataSourcePath(const TString& dataSourcePath, TString& errStr) { + if (!dataSourcePath) { + errStr = "Data source path must not be empty"; + return false; + } + return true; +} + +bool Validate(const TString& sourceType, const NKikimrSchemeOp::TExternalTableDescription& desc, TString& errStr) { + return ValidateSourceType(sourceType, errStr) + && ValidateLocation(desc.GetLocation(), errStr) + && ValidateContent(desc.GetContent(), errStr) + && ValidateDataSourcePath(desc.GetDataSourcePath(), errStr); +} + +Ydb::Type CreateYdbType(const NScheme::TTypeInfo& typeInfo, bool notNull) { + Ydb::Type ydbType; + if (typeInfo.GetTypeId() == NScheme::NTypeIds::Pg) { + auto* typeDesc = typeInfo.GetTypeDesc(); + auto* pg = ydbType.mutable_pg_type(); + pg->set_type_name(NPg::PgTypeNameFromTypeDesc(typeDesc)); + pg->set_oid(NPg::PgTypeIdFromTypeDesc(typeDesc)); + } else { + auto& item = notNull + ? ydbType + : *ydbType.mutable_optional_type()->mutable_item(); + item.set_type_id(static_cast<Ydb::Type::PrimitiveTypeId>(typeInfo.GetTypeId())); + } + return ydbType; +} + +std::pair<TExternalTableInfo::TPtr, TMaybe<TString>> CreateExternalTable( + const TString& sourceType, + const NKikimrSchemeOp::TExternalTableDescription& desc, + const NExternalSource::IExternalSourceFactory::TPtr& factory, + ui64 alterVersion) { + TString errStr; + + if (!desc.ColumnsSize()) { + errStr = "The schema must have at least one column"; + return std::make_pair(nullptr, errStr); + } + + TExternalTableInfo::TPtr externalTableInfo = new TExternalTableInfo; + const NScheme::TTypeRegistry* typeRegistry = AppData()->TypeRegistry; + + if (desc.GetSourceType() != "General") { + errStr = "Only general data source has been supported as request"; + return std::make_pair(nullptr, errStr); + } + + externalTableInfo->DataSourcePath = desc.GetDataSourcePath(); + externalTableInfo->Location = desc.GetLocation(); + externalTableInfo->AlterVersion = alterVersion; + externalTableInfo->SourceType = sourceType; + + NKikimrExternalSources::TSchema schema; + uint64_t nextColumnId = 1; + for (const auto& col : desc.GetColumns()) { + TString colName = col.GetName(); + + if (!colName) { + errStr = "Columns cannot have an empty name"; + return std::make_pair(nullptr, errStr); + } + + if (col.HasTypeId()) { + errStr = TStringBuilder() << "Cannot set TypeId for column '" << colName << "', use Type"; + return std::make_pair(nullptr, errStr); + } + + if (!col.HasType()) { + errStr = TStringBuilder() << "Missing Type for column '" << colName << "'"; + return std::make_pair(nullptr, errStr); + } + + auto typeName = NMiniKQL::AdaptLegacyYqlType(col.GetType()); + const NScheme::IType* type = typeRegistry->GetType(typeName); + + NScheme::TTypeInfo typeInfo; + if (type) { + // Only allow YQL types + if (!NScheme::NTypeIds::IsYqlType(type->GetTypeId())) { + errStr = Sprintf("Type '%s' specified for column '%s' is no longer supported", col.GetType().data(), colName.data()); + return std::make_pair(nullptr, errStr); + } + typeInfo = NScheme::TTypeInfo(type->GetTypeId()); + } else { + auto* typeDesc = NPg::TypeDescFromPgTypeName(typeName); + if (!typeDesc) { + errStr = Sprintf("Type '%s' specified for column '%s' is not supported by storage", col.GetType().data(), colName.data()); + return std::make_pair(nullptr, errStr); + } + typeInfo = NScheme::TTypeInfo(NScheme::NTypeIds::Pg, typeDesc); + } + + ui32 colId = col.HasId() ? col.GetId() : nextColumnId; + if (externalTableInfo->Columns.contains(colId)) { + errStr = Sprintf("Duplicate column id: %" PRIu32, colId); + return std::make_pair(nullptr, errStr); + } + + nextColumnId = colId + 1 > nextColumnId ? colId + 1 : nextColumnId; + + TTableInfo::TColumn& column = externalTableInfo->Columns[colId]; + column = TTableInfo::TColumn(colName, colId, typeInfo, "", col.GetNotNull()); // TODO: do we need typeMod here? + + auto& schemaColumn= *schema.add_column(); + schemaColumn.set_name(colName); + *schemaColumn.mutable_type() = CreateYdbType(typeInfo, col.GetNotNull()); + } + + try { + NKikimrExternalSources::TGeneral general; + general.ParseFromStringOrThrow(desc.GetContent()); + const auto source = factory->GetOrCreate(sourceType); + if (!source->HasExternalTable()) { + errStr = TStringBuilder{} << "External table isn't supported for " << sourceType; + return std::make_pair(nullptr, errStr); + } + externalTableInfo->Content = source->Pack(schema, general); + } catch (...) { + errStr = CurrentExceptionMessage(); + return std::make_pair(nullptr, errStr); + } + + return std::make_pair(externalTableInfo, Nothing()); +} + + +} // namespace NKikimr::NSchemeShard::NExternalDataSource diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_common_external_table.h b/ydb/core/tx/schemeshard/schemeshard__operation_common_external_table.h new file mode 100644 index 0000000000..9b74b2e658 --- /dev/null +++ b/ydb/core/tx/schemeshard/schemeshard__operation_common_external_table.h @@ -0,0 +1,49 @@ +#pragma once + +#include "schemeshard__operation_part.h" +#include "schemeshard_impl.h" + +#include <ydb/core/tablet_flat/test/libs/table/test_iter.h> + +#include <utility> + +#define LOG_I(stream) LOG_INFO_S (context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "[" << context.SS->TabletID() << "] " << stream) +#define LOG_N(stream) LOG_NOTICE_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "[" << context.SS->TabletID() << "] " << stream) +#define RETURN_RESULT_UNLESS(x) if (!(x)) return result; + +namespace NKikimr::NSchemeShard::NExternalTable { + +inline TPath::TChecker IsParentPathValid(const TPath& parentPath) { + return parentPath.Check() + .NotUnderDomainUpgrade() + .IsAtLocalSchemeShard() + .IsResolved() + .NotDeleted() + .NotUnderDeleting() + .IsCommonSensePath() + .IsLikeDirectory(); +} + +inline bool IsParentPathValid(const THolder<TProposeResponse>& result, + const TPath& parentPath) { + const auto checks = IsParentPathValid(parentPath); + + if (!checks) { + result->SetError(checks.GetStatus(), checks.GetError()); + } + + return static_cast<bool>(checks); +} + +bool Validate(const TString& sourceType, + const NKikimrSchemeOp::TExternalTableDescription& desc, + TString& errStr); + +std::pair<TExternalTableInfo::TPtr, TMaybe<TString>> CreateExternalTable( + const TString& sourceType, + const NKikimrSchemeOp::TExternalTableDescription& desc, + const NExternalSource::IExternalSourceFactory::TPtr& factory, + ui64 alterVersion); + + +} // namespace NKikimr::NSchemeShard::NExternalDataSource diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_create_external_data_source.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_create_external_data_source.cpp index 9fbd47b0ad..85780c4888 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_create_external_data_source.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_create_external_data_source.cpp @@ -1,101 +1,15 @@ +#include "schemeshard__operation_common_external_data_source.h" #include "schemeshard__operation_part.h" #include "schemeshard__operation_common.h" #include "schemeshard_impl.h" #include <ydb/core/base/subdomain.h> -#define LOG_I(stream) LOG_INFO_S (context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "[" << context.SS->TabletID() << "] " << stream) -#define LOG_N(stream) LOG_NOTICE_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "[" << context.SS->TabletID() << "] " << stream) - namespace { using namespace NKikimr; using namespace NSchemeShard; -constexpr uint32_t MAX_FIELD_SIZE = 1000; -constexpr uint32_t MAX_PROTOBUF_SIZE = 2 * 1024 * 1024; // 2 MiB - -bool ValidateLocationAndInstallation(const TString& location, const TString& installation, TString& errStr) { - if (location.Size() > MAX_FIELD_SIZE) { - errStr = Sprintf("Maximum length of location must be less or equal equal to %u but got %lu", MAX_FIELD_SIZE, location.Size()); - return false; - } - if (installation.Size() > MAX_FIELD_SIZE) { - errStr = Sprintf("Maximum length of installation must be less or equal equal to %u but got %lu", MAX_FIELD_SIZE, installation.Size()); - return false; - } - return true; -} - -bool CheckAuth(const TString& authMethod, const TVector<TString>& availableAuthMethods, TString& errStr) { - if (Find(availableAuthMethods, authMethod) == availableAuthMethods.end()) { - errStr = TStringBuilder{} << authMethod << " isn't supported for this source type"; - return false; - } - - return true; -} - -bool ValidateProperties(const NKikimrSchemeOp::TExternalDataSourceProperties& properties, TString& errStr) { - if (properties.ByteSizeLong() > MAX_PROTOBUF_SIZE) { - errStr = Sprintf("Maximum size of properties must be less or equal equal to %u but got %lu", MAX_PROTOBUF_SIZE, properties.ByteSizeLong()); - return false; - } - return true; -} - -bool ValidateAuth(const NKikimrSchemeOp::TAuth& auth, const NKikimr::NExternalSource::IExternalSource::TPtr& source, TString& errStr) { - if (auth.ByteSizeLong() > MAX_PROTOBUF_SIZE) { - errStr = Sprintf("Maximum size of authorization information must be less or equal equal to %u but got %lu", MAX_PROTOBUF_SIZE, auth.ByteSizeLong()); - return false; - } - const auto availableAuthMethods = source->GetAuthMethods(); - switch (auth.identity_case()) { - case NKikimrSchemeOp::TAuth::IDENTITY_NOT_SET: { - errStr = "Authorization method isn't specified"; - return false; - } - case NKikimrSchemeOp::TAuth::kServiceAccount: - return CheckAuth("SERVICE_ACCOUNT", availableAuthMethods, errStr); - case NKikimrSchemeOp::TAuth::kMdbBasic: - return CheckAuth("MDB_BASIC", availableAuthMethods, errStr); - case NKikimrSchemeOp::TAuth::kBasic: - return CheckAuth("BASIC", availableAuthMethods, errStr); - case NKikimrSchemeOp::TAuth::kAws: - return CheckAuth("AWS", availableAuthMethods, errStr); - case NKikimrSchemeOp::TAuth::kNone: - return CheckAuth("NONE", availableAuthMethods, errStr); - } - return false; -} - -bool Validate(const NKikimrSchemeOp::TExternalDataSourceDescription& desc, const NKikimr::NExternalSource::IExternalSourceFactory::TPtr& factory, TString& errStr) { - try { - auto source = factory->GetOrCreate(desc.GetSourceType()); - source->ValidateExternalDataSource(desc.SerializeAsString()); - return ValidateLocationAndInstallation(desc.GetLocation(), desc.GetInstallation(), errStr) - && ValidateAuth(desc.GetAuth(), source, errStr) - && ValidateProperties(desc.GetProperties(), errStr); - } catch (...) { - errStr = CurrentExceptionMessage(); - return false; - } -} - -TExternalDataSourceInfo::TPtr CreateExternalDataSource(const NKikimrSchemeOp::TExternalDataSourceDescription& desc, const NKikimr::NExternalSource::IExternalSourceFactory::TPtr& factory, TString& errStr) { - if (!Validate(desc, factory, errStr)) { - return nullptr; - } - TExternalDataSourceInfo::TPtr externalDataSoureInfo = new TExternalDataSourceInfo; - externalDataSoureInfo->SourceType = desc.GetSourceType(); - externalDataSoureInfo->Location = desc.GetLocation(); - externalDataSoureInfo->Installation = desc.GetInstallation(); - externalDataSoureInfo->AlterVersion = 1; - externalDataSoureInfo->Auth.CopyFrom(desc.GetAuth()); - externalDataSoureInfo->Properties.CopyFrom(desc.GetProperties()); - return externalDataSoureInfo; -} - class TPropose: public TSubOperationState { private: const TOperationId OperationId; @@ -108,7 +22,7 @@ private: public: explicit TPropose(TOperationId id) - : OperationId(id) + : OperationId(std::move(id)) { } @@ -118,13 +32,13 @@ public: LOG_I(DebugHint() << "HandleReply TEvOperationPlan" << ": step# " << step); - TTxState* txState = context.SS->FindTx(OperationId); + const TTxState* txState = context.SS->FindTx(OperationId); Y_ABORT_UNLESS(txState); Y_ABORT_UNLESS(txState->TxType == TTxState::TxCreateExternalDataSource); - auto pathId = txState->TargetPathId; - auto path = TPath::Init(pathId, context.SS); - TPathElement::TPtr pathPtr = context.SS->PathsById.at(pathId); + const auto pathId = txState->TargetPathId; + const auto path = TPath::Init(pathId, context.SS); + const TPathElement::TPtr pathPtr = context.SS->PathsById.at(pathId); context.SS->TabletCounters->Simple()[COUNTER_EXTERNAL_DATA_SOURCE_COUNT].Add(1); @@ -145,7 +59,7 @@ public: bool ProgressState(TOperationContext& context) override { LOG_I(DebugHint() << "ProgressState"); - TTxState* txState = context.SS->FindTx(OperationId); + const TTxState* txState = context.SS->FindTx(OperationId); Y_ABORT_UNLESS(txState); Y_ABORT_UNLESS(txState->TxType == TTxState::TxCreateExternalDataSource); @@ -154,11 +68,8 @@ public: } }; - -class TCreateExternalDataSource: public TSubOperation { - static TTxState::ETxState NextState() { - return TTxState::Propose; - } +class TCreateExternalDataSource : public TSubOperation { + static TTxState::ETxState NextState() { return TTxState::Propose; } TTxState::ETxState NextState(TTxState::ETxState state) const override { switch (state) { @@ -182,130 +93,189 @@ class TCreateExternalDataSource: public TSubOperation { } } -public: - using TSubOperation::TSubOperation; - - THolder<TProposeResponse> Propose(const TString& owner, TOperationContext& context) override { - const auto ssId = context.SS->SelfTabletId(); - - const auto acceptExisted = !Transaction.GetFailOnExist(); - const TString& parentPathStr = Transaction.GetWorkingDir(); - const auto& externalDataSoureDescription = Transaction.GetCreateExternalDataSource(); - const TString& name = externalDataSoureDescription.GetName(); - - - LOG_N("TCreateExternalDataSource Propose" - << ": opId# " << OperationId - << ", path# " << parentPathStr << "/" << name); - - auto result = MakeHolder<TProposeResponse>(NKikimrScheme::StatusAccepted, ui64(OperationId.GetTxId()), ui64(ssId)); - - TPath parentPath = TPath::Resolve(parentPathStr, context.SS); - { - auto checks = parentPath.Check(); + static bool IsDestinationPathValid(const THolder<TProposeResponse>& result, + const TPath& dstPath, + const TString& acl, + bool acceptExisted) { + const auto checks = dstPath.Check(); + checks.IsAtLocalSchemeShard(); + if (dstPath.IsResolved()) { checks - .NotUnderDomainUpgrade() - .IsAtLocalSchemeShard() .IsResolved() - .NotDeleted() .NotUnderDeleting() - .IsCommonSensePath() - .IsLikeDirectory(); - - if (!checks) { - result->SetError(checks.GetStatus(), checks.GetError()); - return result; - } + .FailOnExist(TPathElement::EPathType::EPathTypeExternalDataSource, acceptExisted); + } else { + checks + .NotEmpty() + .NotResolved(); } - const TString acl = Transaction.GetModifyACL().GetDiffACL(); + if (checks) { + checks + .IsValidLeafName() + .DepthLimit() + .PathsLimit() + .DirChildrenLimit() + .IsValidACL(acl); + } - TPath dstPath = parentPath.Child(name); - { - auto checks = dstPath.Check(); - checks.IsAtLocalSchemeShard(); + if (!checks) { + result->SetError(checks.GetStatus(), checks.GetError()); if (dstPath.IsResolved()) { - checks - .IsResolved() - .NotUnderDeleting() - .FailOnExist(TPathElement::EPathType::EPathTypeExternalDataSource, acceptExisted); - } else { - checks - .NotEmpty() - .NotResolved(); - } - - if (checks) { - checks - .IsValidLeafName() - .DepthLimit() - .PathsLimit() - .DirChildrenLimit() - .IsValidACL(acl); - } - - if (!checks) { - result->SetError(checks.GetStatus(), checks.GetError()); - if (dstPath.IsResolved()) { - result->SetPathCreateTxId(ui64(dstPath.Base()->CreateTxId)); - result->SetPathId(dstPath.Base()->PathId.LocalPathId); - } - return result; + result->SetPathCreateTxId(static_cast<ui64>(dstPath.Base()->CreateTxId)); + result->SetPathId(dstPath.Base()->PathId.LocalPathId); } } - TString errStr; - if (!context.SS->CheckApplyIf(Transaction, errStr)) { - result->SetError(NKikimrScheme::StatusPreconditionFailed, errStr); - return result; + return static_cast<bool>(checks); + } + + bool IsApplyIfChecksPassed(const THolder<TProposeResponse>& result, + const TOperationContext& context) const { + TString errorMessage; + if (!context.SS->CheckApplyIf(Transaction, errorMessage)) { + result->SetError(NKikimrScheme::StatusPreconditionFailed, errorMessage); + return false; } + return true; + } - TExternalDataSourceInfo::TPtr externalDataSoureInfo = CreateExternalDataSource(externalDataSoureDescription, context.SS->ExternalSourceFactory, errStr); - if (!externalDataSoureInfo) { - result->SetError(NKikimrScheme::StatusSchemeError, errStr); - return result; + static bool IsDescriptionValid( + const THolder<TProposeResponse>& result, + const NKikimrSchemeOp::TExternalDataSourceDescription& desc, + const NExternalSource::IExternalSourceFactory::TPtr& factory) { + TString errorMessage; + if (!NExternalDataSource::Validate(desc, factory, errorMessage)) { + result->SetError(NKikimrScheme::StatusSchemeError, errorMessage); + return false; } + return true; + } + static void AddPathInSchemeShard( + const THolder<TProposeResponse>& result, TPath& dstPath, const TString& owner) { dstPath.MaterializeLeaf(owner); result->SetPathId(dstPath.Base()->PathId.LocalPathId); + } - TPathElement::TPtr externalDataSoure = dstPath.Base(); - externalDataSoure->CreateTxId = OperationId.GetTxId(); - externalDataSoure->LastTxId = OperationId.GetTxId(); - externalDataSoure->PathState = TPathElement::EPathState::EPathStateCreate; - externalDataSoure->PathType = TPathElement::EPathType::EPathTypeExternalDataSource; + TPathElement::TPtr CreateExternalDataSourcePathElement(const TPath& dstPath) const { + TPathElement::TPtr externalDataSource = dstPath.Base(); - TTxState& txState = context.SS->CreateTx(OperationId, TTxState::TxCreateExternalDataSource, externalDataSoure->PathId); - txState.Shards.clear(); + externalDataSource->CreateTxId = OperationId.GetTxId(); + externalDataSource->PathType = TPathElement::EPathType::EPathTypeExternalDataSource; + externalDataSource->PathState = TPathElement::EPathState::EPathStateCreate; + externalDataSource->LastTxId = OperationId.GetTxId(); - NIceDb::TNiceDb db(context.GetDB()); + return externalDataSource; + } + + void CreateTransaction(const TOperationContext &context, + const TPathId &externalDataSourcePathId) const { + TTxState& txState = context.SS->CreateTx(OperationId, + TTxState::TxCreateExternalDataSource, + externalDataSourcePathId); + txState.Shards.clear(); + } + void RegisterParentPathDependencies(const TOperationContext& context, + const TPath& parentPath) const { if (parentPath.Base()->HasActiveChanges()) { - TTxId parentTxId = parentPath.Base()->PlannedToCreate() ? parentPath.Base()->CreateTxId : parentPath.Base()->LastTxId; + const TTxId parentTxId = parentPath.Base()->PlannedToCreate() + ? parentPath.Base()->CreateTxId + : parentPath.Base()->LastTxId; context.OnComplete.Dependence(parentTxId, OperationId.GetTxId()); } + } + void AdvanceTransactionStateToPropose(const TOperationContext& context, + NIceDb::TNiceDb& db) const { context.SS->ChangeTxState(db, OperationId, TTxState::Propose); context.OnComplete.ActivateTx(OperationId); + } - context.SS->ExternalDataSources[externalDataSoure->PathId] = externalDataSoureInfo; - context.SS->TabletCounters->Simple()[COUNTER_EXTERNAL_DATA_SOURCE_COUNT].Add(1); - context.SS->IncrementPathDbRefCount(externalDataSoure->PathId); + void PersistExternalDataSource( + const TOperationContext& context, + NIceDb::TNiceDb& db, + const TPathElement::TPtr& externalDataSourcePath, + const TExternalDataSourceInfo::TPtr& externalDataSourceInfo, + const TString& acl) const { + const auto& externalDataSourcePathId = externalDataSourcePath->PathId; - context.SS->PersistPath(db, externalDataSoure->PathId); + context.SS->ExternalDataSources[externalDataSourcePathId] = externalDataSourceInfo; + context.SS->IncrementPathDbRefCount(externalDataSourcePathId); + context.SS->PersistPath(db, externalDataSourcePathId); if (!acl.empty()) { - externalDataSoure->ApplyACL(acl); - context.SS->PersistACL(db, externalDataSoure); + externalDataSourcePath->ApplyACL(acl); + context.SS->PersistACL(db, externalDataSourcePath); } - context.SS->PersistExternalDataSource(db, externalDataSoure->PathId, externalDataSoureInfo); + context.SS->PersistExternalDataSource(db, + externalDataSourcePathId, + externalDataSourceInfo); context.SS->PersistTxState(db, OperationId); + } - IncParentDirAlterVersionWithRepublishSafeWithUndo(OperationId, dstPath, context.SS, context.OnComplete); - + static void UpdatePathSizeCounts(const TPath& parentPath, const TPath& dstPath) { dstPath.DomainInfo()->IncPathsInside(); parentPath.Base()->IncAliveChildren(); + } + +public: + using TSubOperation::TSubOperation; + + THolder<TProposeResponse> Propose(const TString& owner, + TOperationContext& context) override { + const auto acceptExisted = !Transaction.GetFailOnExist(); + const auto ssId = context.SS->SelfTabletId(); + const TString& parentPathStr = Transaction.GetWorkingDir(); + const auto& externalDataSourceDescription = + Transaction.GetCreateExternalDataSource(); + const TString& name = externalDataSourceDescription.GetName(); + + LOG_N("TCreateExternalDataSource Propose" + << ": opId# " << OperationId << ", path# " << parentPathStr << "/" << name); + + auto result = MakeHolder<TProposeResponse>(NKikimrScheme::StatusAccepted, + static_cast<ui64>(OperationId.GetTxId()), + static_cast<ui64>(ssId)); + + const TPath parentPath = TPath::Resolve(parentPathStr, context.SS); + RETURN_RESULT_UNLESS(NExternalDataSource::IsParentPathValid(result, parentPath)); + + const TString acl = Transaction.GetModifyACL().GetDiffACL(); + TPath dstPath = parentPath.Child(name); + + RETURN_RESULT_UNLESS(IsDestinationPathValid(result, dstPath, acl, acceptExisted)); + RETURN_RESULT_UNLESS(IsApplyIfChecksPassed(result, context)); + RETURN_RESULT_UNLESS(IsDescriptionValid(result, + externalDataSourceDescription, + context.SS->ExternalSourceFactory)); + + const TExternalDataSourceInfo::TPtr externalDataSourceInfo = + NExternalDataSource::CreateExternalDataSource(externalDataSourceDescription, 1); + Y_ABORT_UNLESS(externalDataSourceInfo); + + AddPathInSchemeShard(result, dstPath, owner); + const TPathElement::TPtr externalDataSource = + CreateExternalDataSourcePathElement(dstPath); + CreateTransaction(context, externalDataSource->PathId); + + NIceDb::TNiceDb db(context.GetDB()); + + RegisterParentPathDependencies(context, parentPath); + + AdvanceTransactionStateToPropose(context, db); + + PersistExternalDataSource(context, db, externalDataSource, + externalDataSourceInfo, acl); + + IncParentDirAlterVersionWithRepublishSafeWithUndo(OperationId, + dstPath, + context.SS, + context.OnComplete); + + UpdatePathSizeCounts(parentPath, dstPath); SetState(NextState()); return result; @@ -313,19 +283,18 @@ public: void AbortPropose(TOperationContext& context) override { LOG_N("TCreateExternalDataSource AbortPropose" - << ": opId# " << OperationId); + << ": opId# " << OperationId); Y_ABORT("no AbortPropose for TCreateExternalDataSource"); } void AbortUnsafe(TTxId forceDropTxId, TOperationContext& context) override { LOG_N("TCreateExternalDataSource AbortUnsafe" - << ": opId# " << OperationId - << ", txId# " << forceDropTxId); + << ": opId# " << OperationId << ", txId# " << forceDropTxId); context.OnComplete.DoneOperation(OperationId); } }; -} +} // namespace namespace NKikimr::NSchemeShard { diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_create_external_table.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_create_external_table.cpp index 10a46d30a8..a82af9859b 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_create_external_table.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_create_external_table.cpp @@ -1,192 +1,55 @@ +#include "schemeshard__operation_common_external_table.h" #include "schemeshard__operation_part.h" #include "schemeshard__operation_common.h" #include "schemeshard_impl.h" -#include <ydb/core/base/subdomain.h> - -#define LOG_I(stream) LOG_INFO_S (context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "[" << context.SS->TabletID() << "] " << stream) -#define LOG_N(stream) LOG_NOTICE_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "[" << context.SS->TabletID() << "] " << stream) - +#include <utility> namespace { using namespace NKikimr; using namespace NSchemeShard; -constexpr uint32_t MAX_FIELD_SIZE = 1000; -constexpr uint32_t MAX_PROTOBUF_SIZE = 2 * 1024 * 1024; // 2 MiB - -bool ValidateSourceType(const TString& sourceType, TString& errStr) { - // Only object storage supported today - if (sourceType != "ObjectStorage") { - errStr = "Only ObjectStorage source type supported but got " + sourceType; - return false; - } - return true; -} - -bool ValidateLocation(const TString& location, TString& errStr) { - if (!location) { - errStr = "Location must not be empty"; - return false; - } - if (location.Size() > MAX_FIELD_SIZE) { - errStr = Sprintf("Maximum length of location must be less or equal equal to %u but got %lu", MAX_FIELD_SIZE, location.Size()); - return false; - } - return true; -} - -bool ValidateContent(const TString& content, TString& errStr) { - if (content.Size() > MAX_PROTOBUF_SIZE) { - errStr = Sprintf("Maximum size of content must be less or equal equal to %u but got %lu", MAX_PROTOBUF_SIZE, content.Size()); - return false; - } - return true; -} - -bool ValidateDataSourcePath(const TString& dataSourcePath, TString& errStr) { - if (!dataSourcePath) { - errStr = "Data source path must not be empty"; - return false; - } - return true; -} - -bool Validate(const TString& sourceType, const NKikimrSchemeOp::TExternalTableDescription& desc, TString& errStr) { - return ValidateSourceType(sourceType, errStr) - && ValidateLocation(desc.GetLocation(), errStr) - && ValidateContent(desc.GetContent(), errStr) - && ValidateDataSourcePath(desc.GetDataSourcePath(), errStr); -} - -Ydb::Type CreateYdbType(const NScheme::TTypeInfo& typeInfo, bool notNull) { - Ydb::Type ydbType; - if (typeInfo.GetTypeId() == NScheme::NTypeIds::Pg) { - auto* typeDesc = typeInfo.GetTypeDesc(); - auto* pg = ydbType.mutable_pg_type(); - pg->set_type_name(NPg::PgTypeNameFromTypeDesc(typeDesc)); - pg->set_oid(NPg::PgTypeIdFromTypeDesc(typeDesc)); - } else { - auto& item = notNull - ? ydbType - : *ydbType.mutable_optional_type()->mutable_item(); - item.set_type_id((Ydb::Type::PrimitiveTypeId)typeInfo.GetTypeId()); - } - return ydbType; -} - -TExternalTableInfo::TPtr CreateExternalTable(const TString& sourceType, const NKikimrSchemeOp::TExternalTableDescription& desc, const NKikimr::NExternalSource::IExternalSourceFactory::TPtr& factory, TString& errStr) { - if (!Validate(sourceType, desc, errStr)) { - return nullptr; - } +class TPropose: public TSubOperationState { +private: + const TOperationId OperationId; - if (!desc.ColumnsSize()) { - errStr = "The schema must have at least one column"; - return nullptr; + TString DebugHint() const override { + return TStringBuilder() + << "TCreateExternalTable TPropose" + << ", operationId: " << OperationId; } - TExternalTableInfo::TPtr externalTableInfo = new TExternalTableInfo; - const NScheme::TTypeRegistry* typeRegistry = AppData()->TypeRegistry; - - if (desc.GetSourceType() != "General") { - errStr = "Only general data source has been supported as request"; - return nullptr; + void IncrementExternalTableCounter(const TOperationContext& context) const { + context.SS->TabletCounters->Simple()[COUNTER_EXTERNAL_TABLE_COUNT].Add(1); } - externalTableInfo->DataSourcePath = desc.GetDataSourcePath(); - externalTableInfo->Location = desc.GetLocation(); - externalTableInfo->AlterVersion = 1; - externalTableInfo->SourceType = sourceType; - - NKikimrExternalSources::TSchema schema; - uint64_t nextColumnId = 1; - for (const auto& col : desc.GetColumns()) { - TString colName = col.GetName(); - - if (!colName) { - errStr = "Columns cannot have an empty name"; - return nullptr; - } - - if (col.HasTypeId()) { - errStr = TStringBuilder() << "Cannot set TypeId for column '" << colName << "', use Type"; - return nullptr; - } - - if (!col.HasType()) { - errStr = TStringBuilder() << "Missing Type for column '" << colName << "'"; - return nullptr; - } - - auto typeName = NMiniKQL::AdaptLegacyYqlType(col.GetType()); - const NScheme::IType* type = typeRegistry->GetType(typeName); - - NScheme::TTypeInfo typeInfo; - if (type) { - // Only allow YQL types - if (!NScheme::NTypeIds::IsYqlType(type->GetTypeId())) { - errStr = Sprintf("Type '%s' specified for column '%s' is no longer supported", col.GetType().data(), colName.data()); - return nullptr; - } - typeInfo = NScheme::TTypeInfo(type->GetTypeId()); - } else { - auto* typeDesc = NPg::TypeDescFromPgTypeName(typeName); - if (!typeDesc) { - errStr = Sprintf("Type '%s' specified for column '%s' is not supported by storage", col.GetType().data(), colName.data()); - return nullptr; - } - typeInfo = NScheme::TTypeInfo(NScheme::NTypeIds::Pg, typeDesc); - } - - ui32 colId = col.HasId() ? col.GetId() : nextColumnId; - if (externalTableInfo->Columns.contains(colId)) { - errStr = Sprintf("Duplicate column id: %" PRIu32, colId); - return nullptr; - } - - nextColumnId = colId + 1 > nextColumnId ? colId + 1 : nextColumnId; - - TTableInfo::TColumn& column = externalTableInfo->Columns[colId]; - column = TTableInfo::TColumn(colName, colId, typeInfo, "", col.GetNotNull()); // TODO: do we need typeMod here? - - auto& schemaColumn= *schema.add_column(); - schemaColumn.set_name(colName); - *schemaColumn.mutable_type() = CreateYdbType(typeInfo, col.GetNotNull()); + void SetAndPersistCreateStep(const TOperationContext& context, + NIceDb::TNiceDb& db, + const TPath& path, + const TPathId& pathId, + const TStepId& step) const { + path->StepCreated = step; + context.SS->PersistCreateStep(db, pathId, step); } - try { - NKikimrExternalSources::TGeneral general; - general.ParseFromStringOrThrow(desc.GetContent()); - auto source = factory->GetOrCreate(sourceType); - if (!source->HasExternalTable()) { - errStr = TStringBuilder{} << "External table isn't supported for " << sourceType; - return nullptr; - } - externalTableInfo->Content = source->Pack(schema, general); - } catch (...) { - errStr = CurrentExceptionMessage(); - return nullptr; + void ClearDescribePathCaches(const TOperationContext& context, + const TPathElement::TPtr& pathPtr, + const TPathElement::TPtr& dataSourcePathPtr) const { + context.SS->ClearDescribePathCaches(pathPtr); + context.SS->ClearDescribePathCaches(dataSourcePathPtr); } - return externalTableInfo; -} - -class TPropose: public TSubOperationState { -private: - const TOperationId OperationId; - - TString DebugHint() const override { - return TStringBuilder() - << "TCreateExternalTable TPropose" - << ", operationId: " << OperationId; + void PublishToSchemeBoard(const TOperationContext& context, + const TPathId& pathId, + const TPathId& dataSourcePathId) const { + context.OnComplete.PublishToSchemeBoard(OperationId, pathId); + context.OnComplete.PublishToSchemeBoard(OperationId, dataSourcePathId); } public: explicit TPropose(TOperationId id) - : OperationId(id) - { - } + : OperationId(std::move(id)) { } bool HandleReply(TEvPrivate::TEvOperationPlan::TPtr& ev, TOperationContext& context) override { const TStepId step = TStepId(ev->Get()->StepId); @@ -194,29 +57,27 @@ public: LOG_I(DebugHint() << " HandleReply TEvOperationPlan" << ": step# " << step); - TTxState* txState = context.SS->FindTx(OperationId); + const TTxState* txState = context.SS->FindTx(OperationId); Y_ABORT_UNLESS(txState); Y_ABORT_UNLESS(txState->TxType == TTxState::TxCreateExternalTable); - auto pathId = txState->TargetPathId; - auto dataSourcePathId = txState->SourcePathId; - auto path = TPath::Init(pathId, context.SS); - TPathElement::TPtr pathPtr = context.SS->PathsById.at(pathId); - TPathElement::TPtr dataSourcePathPtr = context.SS->PathsById.at(dataSourcePathId); + const auto pathId = txState->TargetPathId; + const auto dataSourcePathId = txState->SourcePathId; + const auto path = TPath::Init(pathId, context.SS); + const TPathElement::TPtr pathPtr = context.SS->PathsById.at(pathId); + const TPathElement::TPtr dataSourcePathPtr = + context.SS->PathsById.at(dataSourcePathId); - context.SS->TabletCounters->Simple()[COUNTER_EXTERNAL_TABLE_COUNT].Add(1); + IncrementExternalTableCounter(context); NIceDb::TNiceDb db(context.GetDB()); - path->StepCreated = step; - context.SS->PersistCreateStep(db, pathId, step); + SetAndPersistCreateStep(context, db, path, pathId, step); IncParentDirAlterVersionWithRepublish(OperationId, path, context); - context.SS->ClearDescribePathCaches(pathPtr); - context.SS->ClearDescribePathCaches(dataSourcePathPtr); - context.OnComplete.PublishToSchemeBoard(OperationId, pathId); - context.OnComplete.PublishToSchemeBoard(OperationId, dataSourcePathId); + ClearDescribePathCaches(context, pathPtr, dataSourcePathPtr); + PublishToSchemeBoard(context, pathId, dataSourcePathId); context.SS->ChangeTxState(db, OperationId, TTxState::Done); return true; @@ -225,7 +86,7 @@ public: bool ProgressState(TOperationContext& context) override { LOG_I(DebugHint() << " ProgressState"); - TTxState* txState = context.SS->FindTx(OperationId); + const TTxState* txState = context.SS->FindTx(OperationId); Y_ABORT_UNLESS(txState); Y_ABORT_UNLESS(txState->TxType == TTxState::TxCreateExternalTable); @@ -236,6 +97,7 @@ public: class TCreateExternalTable: public TSubOperation { +private: static TTxState::ETxState NextState() { return TTxState::Propose; } @@ -262,138 +124,152 @@ class TCreateExternalTable: public TSubOperation { } } -public: - using TSubOperation::TSubOperation; - - THolder<TProposeResponse> Propose(const TString& owner, TOperationContext& context) override { - const auto ssId = context.SS->SelfTabletId(); - - const auto acceptExisted = !Transaction.GetFailOnExist(); - const TString& parentPathStr = Transaction.GetWorkingDir(); - const auto& externalTableDescription = Transaction.GetCreateExternalTable(); - const TString& name = externalTableDescription.GetName(); - - LOG_N("TCreateExternalTable Propose" - << ": opId# " << OperationId - << ", path# " << parentPathStr << "/" << name); - - auto result = MakeHolder<TProposeResponse>(NKikimrScheme::StatusAccepted, ui64(OperationId.GetTxId()), ui64(ssId)); - - TPath parentPath = TPath::Resolve(parentPathStr, context.SS); - { - auto checks = parentPath.Check(); + static bool IsDestinationPathValid(const THolder<TProposeResponse>& result, + const TPath& dstPath, + const TString& acl, + bool acceptExisted) { + const auto checks = dstPath.Check(); + checks.IsAtLocalSchemeShard(); + if (dstPath.IsResolved()) { checks - .NotUnderDomainUpgrade() - .IsAtLocalSchemeShard() .IsResolved() - .NotDeleted() .NotUnderDeleting() - .IsCommonSensePath() - .IsLikeDirectory(); - - if (!checks) { - result->SetError(checks.GetStatus(), checks.GetError()); - return result; - } + .FailOnExist(TPathElement::EPathType::EPathTypeExternalTable, acceptExisted); + } else { + checks + .NotEmpty() + .NotResolved(); } - const TString acl = Transaction.GetModifyACL().GetDiffACL(); + if (checks) { + checks + .IsValidLeafName() + .DepthLimit() + .PathsLimit() + .DirChildrenLimit() + .IsValidACL(acl); + } - TPath dstPath = parentPath.Child(name); - { - auto checks = dstPath.Check(); - checks.IsAtLocalSchemeShard(); + if (!checks) { + result->SetError(checks.GetStatus(), checks.GetError()); if (dstPath.IsResolved()) { - checks - .IsResolved() - .NotUnderDeleting() - .FailOnExist(TPathElement::EPathType::EPathTypeExternalTable, acceptExisted); - } else { - checks - .NotEmpty() - .NotResolved(); + result->SetPathCreateTxId(static_cast<ui64>(dstPath.Base()->CreateTxId)); + result->SetPathId(dstPath.Base()->PathId.LocalPathId); } + } - if (checks) { - checks - .IsValidLeafName() - .DepthLimit() - .PathsLimit() - .DirChildrenLimit() - .IsValidACL(acl); - } + return static_cast<bool>(checks); + } - if (!checks) { - result->SetError(checks.GetStatus(), checks.GetError()); - if (dstPath.IsResolved()) { - result->SetPathCreateTxId(ui64(dstPath.Base()->CreateTxId)); - result->SetPathId(dstPath.Base()->PathId.LocalPathId); - } - return result; - } + static bool IsDataSourcePathValid(const THolder<TProposeResponse>& result, const TPath& dataSourcePath) { + const auto checks = dataSourcePath.Check(); + checks + .NotUnderDomainUpgrade() + .IsAtLocalSchemeShard() + .IsResolved() + .NotDeleted() + .NotUnderDeleting() + .IsCommonSensePath() + .IsExternalDataSource() + .NotUnderOperation(); + + if (!checks) { + result->SetError(checks.GetStatus(), checks.GetError()); } - TPath dataSourcePath = TPath::Resolve(externalTableDescription.GetDataSourcePath(), context.SS); - { - auto checks = dataSourcePath.Check(); - checks - .NotUnderDomainUpgrade() - .IsAtLocalSchemeShard() - .IsResolved() - .NotDeleted() - .NotUnderDeleting() - .IsCommonSensePath() - .IsExternalDataSource() - .NotUnderOperation(); + return static_cast<bool>(checks); + } - if (!checks) { - result->SetError(checks.GetStatus(), checks.GetError()); - return result; - } + bool IsApplyIfChecksPassed(const THolder<TProposeResponse>& result, + const TOperationContext& context) const { + TString errorMessage; + if (!context.SS->CheckApplyIf(Transaction, errorMessage)) { + result->SetError(NKikimrScheme::StatusPreconditionFailed, errorMessage); + return false; } - TExternalDataSourceInfo::TPtr externalDataSource = context.SS->ExternalDataSources.Value(dataSourcePath->PathId, nullptr); + return true; + } + + static bool IsDataSourceValid(const THolder<TProposeResponse>& result, + const TExternalDataSourceInfo::TPtr& externalDataSource) { if (!externalDataSource) { result->SetError(NKikimrScheme::StatusSchemeError, "Data source doesn't exist"); - return result; - } - TString errStr; - if (!context.SS->CheckApplyIf(Transaction, errStr)) { - result->SetError(NKikimrScheme::StatusPreconditionFailed, errStr); - return result; + return false; } + return true; + } - TExternalTableInfo::TPtr externalTableInfo = CreateExternalTable(externalDataSource->SourceType, externalTableDescription, context.SS->ExternalSourceFactory, errStr); - if (!externalTableInfo) { - result->SetError(NKikimrScheme::StatusSchemeError, errStr); - return result; + static bool IsExternalTableDescriptionValid( + const THolder<TProposeResponse>& result, + const TString& sourceType, + const NKikimrSchemeOp::TExternalTableDescription& desc) { + if (TString errorMessage; !NExternalTable::Validate(sourceType, desc, errorMessage)) { + result->SetError(NKikimrScheme::StatusSchemeError, errorMessage); + return false; } + return true; + } + + static void AddPathInSchemeShard(const THolder<TProposeResponse> &result, + TPath &dstPath, const TString &owner) { dstPath.MaterializeLeaf(owner); result->SetPathId(dstPath.Base()->PathId.LocalPathId); + } + TPathElement::TPtr CreateExternalTablePathElement(const TPath& dstPath) const { TPathElement::TPtr externalTable = dstPath.Base(); - externalTable->CreateTxId = OperationId.GetTxId(); - externalTable->LastTxId = OperationId.GetTxId(); + externalTable->CreateTxId = OperationId.GetTxId(); + externalTable->PathType = TPathElement::EPathType::EPathTypeExternalTable; externalTable->PathState = TPathElement::EPathState::EPathStateCreate; - externalTable->PathType = TPathElement::EPathType::EPathTypeExternalTable; + externalTable->LastTxId = OperationId.GetTxId(); - TTxState& txState = context.SS->CreateTx(OperationId, TTxState::TxCreateExternalTable, externalTable->PathId, dataSourcePath->PathId); - txState.Shards.clear(); + return externalTable; + } - NIceDb::TNiceDb db(context.GetDB()); + void CreateTransaction(const TOperationContext &context, + const TPathId &externalTablePathId, + const TPathId &externalDataSourcePathId) const { + TTxState &txState = + context.SS->CreateTx(OperationId, TTxState::TxCreateExternalTable, + externalTablePathId, externalDataSourcePathId); + txState.Shards.clear(); + } + void RegisterParentPathDependencies(const TOperationContext& context, + const TPath& parentPath) const { if (parentPath.Base()->HasActiveChanges()) { - TTxId parentTxId = parentPath.Base()->PlannedToCreate() ? parentPath.Base()->CreateTxId : parentPath.Base()->LastTxId; + const auto parentTxId = parentPath.Base()->PlannedToCreate() + ? parentPath.Base()->CreateTxId + : parentPath.Base()->LastTxId; context.OnComplete.Dependence(parentTxId, OperationId.GetTxId()); } + } + void AdvanceTransactionStateToPropose(const TOperationContext& context, + NIceDb::TNiceDb& db) const { context.SS->ChangeTxState(db, OperationId, TTxState::Propose); context.OnComplete.ActivateTx(OperationId); + } + static void LinkExternalDataSourceWithExternalTable( + const TExternalDataSourceInfo::TPtr& externalDataSource, + const TPathElement::TPtr& externalTable, + const TPath& dstPath) { auto& reference = *externalDataSource->ExternalTableReferences.AddReferences(); reference.SetPath(dstPath.PathString()); PathIdFromPathId(externalTable->PathId, reference.MutablePathId()); + } + + void PersistExternalTable( + const TOperationContext& context, + NIceDb::TNiceDb& db, + const TPathElement::TPtr& externalTable, + const TExternalTableInfo::TPtr& externalTableInfo, + const TPathId& externalDataSourcePathId, + const TExternalDataSourceInfo::TPtr& externalDataSource, + const TString& acl) const { context.SS->ExternalTables[externalTable->PathId] = externalTableInfo; context.SS->IncrementPathDbRefCount(externalTable->PathId); @@ -404,14 +280,96 @@ public: context.SS->PersistACL(db, externalTable); } - context.SS->PersistExternalDataSource(db, dataSourcePath->PathId, externalDataSource); + context.SS->PersistExternalDataSource(db, externalDataSourcePathId, externalDataSource); context.SS->PersistExternalTable(db, externalTable->PathId, externalTableInfo); context.SS->PersistTxState(db, OperationId); + } - IncParentDirAlterVersionWithRepublishSafeWithUndo(OperationId, dstPath, context.SS, context.OnComplete); - + static void UpdatePathSizeCounts(const TPath& parentPath, + const TPath& dstPath) { dstPath.DomainInfo()->IncPathsInside(); parentPath.Base()->IncAliveChildren(); + } + +public: + using TSubOperation::TSubOperation; + + THolder<TProposeResponse> Propose(const TString& owner, TOperationContext& context) override { + const auto ssId = context.SS->SelfTabletId(); + + const auto acceptExisted = !Transaction.GetFailOnExist(); + const TString& parentPathStr = Transaction.GetWorkingDir(); + const auto& externalTableDescription = Transaction.GetCreateExternalTable(); + const TString& name = externalTableDescription.GetName(); + + LOG_N("TCreateExternalTable Propose" + << ": opId# " << OperationId << ", path# " << parentPathStr << "/" << name); + + auto result = MakeHolder<TProposeResponse>(NKikimrScheme::StatusAccepted, + static_cast<ui64>(OperationId.GetTxId()), + static_cast<ui64>(ssId)); + + const auto parentPath = TPath::Resolve(parentPathStr, context.SS); + RETURN_RESULT_UNLESS(NExternalTable::IsParentPathValid(result, parentPath)); + + const TString acl = Transaction.GetModifyACL().GetDiffACL(); + TPath dstPath = parentPath.Child(name); + RETURN_RESULT_UNLESS(IsDestinationPathValid( + result, dstPath, acl, acceptExisted)); + + const auto dataSourcePath = + TPath::Resolve(externalTableDescription.GetDataSourcePath(), context.SS); + RETURN_RESULT_UNLESS(IsDataSourcePathValid(result, dataSourcePath)); + + const auto externalDataSource = + context.SS->ExternalDataSources.Value(dataSourcePath->PathId, nullptr); + RETURN_RESULT_UNLESS(IsDataSourceValid(result, externalDataSource)); + + RETURN_RESULT_UNLESS(IsApplyIfChecksPassed(result, context)); + + RETURN_RESULT_UNLESS(IsExternalTableDescriptionValid(result, + externalDataSource->SourceType, + externalTableDescription)); + + auto [externalTableInfo, maybeError] = + NExternalTable::CreateExternalTable(externalDataSource->SourceType, + externalTableDescription, + context.SS->ExternalSourceFactory, + 1); + if (maybeError) { + result->SetError(NKikimrScheme::StatusSchemeError, *maybeError); + return result; + } + Y_ABORT_UNLESS(externalTableInfo); + + AddPathInSchemeShard(result, dstPath, owner); + + const auto externalTable = CreateExternalTablePathElement(dstPath); + CreateTransaction(context, externalTable->PathId, dataSourcePath->PathId); + + NIceDb::TNiceDb db(context.GetDB()); + + RegisterParentPathDependencies(context, parentPath); + AdvanceTransactionStateToPropose(context, db); + + LinkExternalDataSourceWithExternalTable(externalDataSource, + externalTable, + dstPath); + + PersistExternalTable(context, + db, + externalTable, + externalTableInfo, + dataSourcePath->PathId, + externalDataSource, + acl); + + IncParentDirAlterVersionWithRepublishSafeWithUndo(OperationId, + dstPath, + context.SS, + context.OnComplete); + + UpdatePathSizeCounts(parentPath, dstPath); SetState(NextState()); return result; @@ -441,7 +399,7 @@ ISubOperation::TPtr CreateNewExternalTable(TOperationId id, const TTxTransaction ISubOperation::TPtr CreateNewExternalTable(TOperationId id, TTxState::ETxState state) { Y_ABORT_UNLESS(state != TTxState::Invalid); - return MakeSubOperation<TCreateExternalTable>(id, state); + return MakeSubOperation<TCreateExternalTable>(std::move(id), state); } } diff --git a/ydb/core/tx/schemeshard/ya.make b/ydb/core/tx/schemeshard/ya.make index d1e8f55992..3d2a5004cc 100644 --- a/ydb/core/tx/schemeshard/ya.make +++ b/ydb/core/tx/schemeshard/ya.make @@ -94,6 +94,8 @@ SRCS( schemeshard__operation_cancel_tx.cpp schemeshard__operation_common.cpp schemeshard__operation_common.h + schemeshard__operation_common_external_data_source.cpp + schemeshard__operation_common_external_table.cpp schemeshard__operation_common_subdomain.h schemeshard__operation_consistent_copy_tables.cpp schemeshard__operation_copy_table.cpp |