aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAlexey Uzhegov <uzhegov37@yahoo.com>2024-01-30 16:49:49 +0300
committerGitHub <noreply@github.com>2024-01-30 15:49:49 +0200
commit9da224918cbef0fd8d3ee032ad41f8e18e631e60 (patch)
treef5a22099c66a2bde56ab0a7f89c9899ddd9be36c
parent3311d10b22651f987a52656368ee1d1050ccdacf (diff)
downloadydb-9da224918cbef0fd8d3ee032ad41f8e18e631e60.tar.gz
[YQ-1997] Refactor create external data source / external table schemeshard operations (#1119)
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__operation_common_external_data_source.cpp112
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__operation_common_external_data_source.h47
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__operation_common_external_table.cpp169
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__operation_common_external_table.h49
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__operation_create_external_data_source.cpp349
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__operation_create_external_table.cpp512
-rw-r--r--ydb/core/tx/schemeshard/ya.make2
7 files changed, 773 insertions, 467 deletions
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_common_external_data_source.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_common_external_data_source.cpp
new file mode 100644
index 0000000000..b9ce6755aa
--- /dev/null
+++ b/ydb/core/tx/schemeshard/schemeshard__operation_common_external_data_source.cpp
@@ -0,0 +1,112 @@
+#include "schemeshard__operation_common_external_data_source.h"
+
+#include <utility>
+
+namespace NKikimr::NSchemeShard::NExternalDataSource {
+
+constexpr uint32_t MAX_FIELD_SIZE = 1000;
+constexpr uint32_t MAX_PROTOBUF_SIZE = 2 * 1024 * 1024; // 2 MiB
+
+bool ValidateLocationAndInstallation(const TString& location,
+ const TString& installation,
+ TString& errStr) {
+ if (location.Size() > MAX_FIELD_SIZE) {
+ errStr =
+ Sprintf("Maximum length of location must be less or equal equal to %u but got %lu",
+ MAX_FIELD_SIZE,
+ location.Size());
+ return false;
+ }
+ if (installation.Size() > MAX_FIELD_SIZE) {
+ errStr = Sprintf(
+ "Maximum length of installation must be less or equal equal to %u but got %lu",
+ MAX_FIELD_SIZE,
+ installation.Size());
+ return false;
+ }
+ return true;
+}
+
+bool CheckAuth(const TString& authMethod,
+ const TVector<TString>& availableAuthMethods,
+ TString& errStr) {
+ if (Find(availableAuthMethods, authMethod) == availableAuthMethods.end()) {
+ errStr = TStringBuilder{} << authMethod << " isn't supported for this source type";
+ return false;
+ }
+
+ return true;
+}
+
+bool ValidateProperties(const NKikimrSchemeOp::TExternalDataSourceProperties& properties,
+ TString& errStr) {
+ if (properties.ByteSizeLong() > MAX_PROTOBUF_SIZE) {
+ errStr =
+ Sprintf("Maximum size of properties must be less or equal equal to %u but got %lu",
+ MAX_PROTOBUF_SIZE,
+ properties.ByteSizeLong());
+ return false;
+ }
+ return true;
+}
+
+bool ValidateAuth(const NKikimrSchemeOp::TAuth& auth,
+ const NExternalSource::IExternalSource::TPtr& source,
+ TString& errStr) {
+ if (auth.ByteSizeLong() > MAX_PROTOBUF_SIZE) {
+ errStr = Sprintf(
+ "Maximum size of authorization information must be less or equal equal to %u but got %lu",
+ MAX_PROTOBUF_SIZE,
+ auth.ByteSizeLong());
+ return false;
+ }
+ const auto availableAuthMethods = source->GetAuthMethods();
+ switch (auth.identity_case()) {
+ case NKikimrSchemeOp::TAuth::IDENTITY_NOT_SET: {
+ errStr = "Authorization method isn't specified";
+ return false;
+ }
+ case NKikimrSchemeOp::TAuth::kServiceAccount:
+ return CheckAuth("SERVICE_ACCOUNT", availableAuthMethods, errStr);
+ case NKikimrSchemeOp::TAuth::kMdbBasic:
+ return CheckAuth("MDB_BASIC", availableAuthMethods, errStr);
+ case NKikimrSchemeOp::TAuth::kBasic:
+ return CheckAuth("BASIC", availableAuthMethods, errStr);
+ case NKikimrSchemeOp::TAuth::kAws:
+ return CheckAuth("AWS", availableAuthMethods, errStr);
+ case NKikimrSchemeOp::TAuth::kNone:
+ return CheckAuth("NONE", availableAuthMethods, errStr);
+ }
+ return false;
+}
+
+bool Validate(const NKikimrSchemeOp::TExternalDataSourceDescription& desc,
+ const NExternalSource::IExternalSourceFactory::TPtr& factory,
+ TString& errStr) {
+ try {
+ const auto source = factory->GetOrCreate(desc.GetSourceType());
+ source->ValidateExternalDataSource(desc.SerializeAsString());
+ return ValidateLocationAndInstallation(desc.GetLocation(),
+ desc.GetInstallation(),
+ errStr) &&
+ ValidateAuth(desc.GetAuth(), source, errStr) &&
+ ValidateProperties(desc.GetProperties(), errStr);
+ } catch (...) {
+ errStr = CurrentExceptionMessage();
+ return false;
+ }
+}
+
+TExternalDataSourceInfo::TPtr CreateExternalDataSource(
+ const NKikimrSchemeOp::TExternalDataSourceDescription& desc, ui64 alterVersion) {
+ TExternalDataSourceInfo::TPtr externalDataSoureInfo = new TExternalDataSourceInfo;
+ externalDataSoureInfo->SourceType = desc.GetSourceType();
+ externalDataSoureInfo->Location = desc.GetLocation();
+ externalDataSoureInfo->Installation = desc.GetInstallation();
+ externalDataSoureInfo->AlterVersion = alterVersion;
+ externalDataSoureInfo->Auth.CopyFrom(desc.GetAuth());
+ externalDataSoureInfo->Properties.CopyFrom(desc.GetProperties());
+ return externalDataSoureInfo;
+}
+
+} // namespace NKikimr::NSchemeShard::NExternalDataSource
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_common_external_data_source.h b/ydb/core/tx/schemeshard/schemeshard__operation_common_external_data_source.h
new file mode 100644
index 0000000000..79f1217492
--- /dev/null
+++ b/ydb/core/tx/schemeshard/schemeshard__operation_common_external_data_source.h
@@ -0,0 +1,47 @@
+#pragma once
+
+#include "schemeshard__operation_part.h"
+#include "schemeshard_impl.h"
+
+#include <ydb/core/tablet_flat/test/libs/table/test_iter.h>
+
+#include <utility>
+
+#define LOG_I(stream) LOG_INFO_S (context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "[" << context.SS->TabletID() << "] " << stream)
+#define LOG_N(stream) LOG_NOTICE_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "[" << context.SS->TabletID() << "] " << stream)
+#define RETURN_RESULT_UNLESS(x) if (!(x)) return result;
+
+namespace NKikimr::NSchemeShard::NExternalDataSource {
+
+inline TPath::TChecker IsParentPathValid(const TPath& parentPath) {
+ auto checks = parentPath.Check();
+ checks.NotUnderDomainUpgrade()
+ .IsAtLocalSchemeShard()
+ .IsResolved()
+ .NotDeleted()
+ .NotUnderDeleting()
+ .IsCommonSensePath()
+ .IsLikeDirectory();
+
+ return std::move(checks);
+}
+
+inline bool IsParentPathValid(const THolder<TProposeResponse>& result,
+ const TPath& parentPath) {
+ const auto checks = IsParentPathValid(parentPath);
+
+ if (!checks) {
+ result->SetError(checks.GetStatus(), checks.GetError());
+ }
+
+ return static_cast<bool>(checks);
+}
+
+bool Validate(const NKikimrSchemeOp::TExternalDataSourceDescription& desc,
+ const NExternalSource::IExternalSourceFactory::TPtr& factory,
+ TString& errStr);
+
+TExternalDataSourceInfo::TPtr CreateExternalDataSource(
+ const NKikimrSchemeOp::TExternalDataSourceDescription& desc, ui64 alterVersion);
+
+} // namespace NKikimr::NSchemeShard::NExternalDataSource
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_common_external_table.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_common_external_table.cpp
new file mode 100644
index 0000000000..8bed0aa2df
--- /dev/null
+++ b/ydb/core/tx/schemeshard/schemeshard__operation_common_external_table.cpp
@@ -0,0 +1,169 @@
+#include "schemeshard__operation_common_external_table.h"
+
+#include <utility>
+
+namespace NKikimr::NSchemeShard::NExternalTable {
+
+constexpr uint32_t MAX_FIELD_SIZE = 1000;
+constexpr uint32_t MAX_PROTOBUF_SIZE = 2 * 1024 * 1024; // 2 MiB
+
+bool ValidateSourceType(const TString& sourceType, TString& errStr) {
+ // Only object storage supported today
+ if (sourceType != "ObjectStorage") {
+ errStr = "Only ObjectStorage source type supported but got " + sourceType;
+ return false;
+ }
+ return true;
+}
+
+bool ValidateLocation(const TString& location, TString& errStr) {
+ if (!location) {
+ errStr = "Location must not be empty";
+ return false;
+ }
+ if (location.Size() > MAX_FIELD_SIZE) {
+ errStr = Sprintf("Maximum length of location must be less or equal equal to %u but got %lu", MAX_FIELD_SIZE, location.Size());
+ return false;
+ }
+ return true;
+}
+
+bool ValidateContent(const TString& content, TString& errStr) {
+ if (content.Size() > MAX_PROTOBUF_SIZE) {
+ errStr = Sprintf("Maximum size of content must be less or equal equal to %u but got %lu", MAX_PROTOBUF_SIZE, content.Size());
+ return false;
+ }
+ return true;
+}
+
+bool ValidateDataSourcePath(const TString& dataSourcePath, TString& errStr) {
+ if (!dataSourcePath) {
+ errStr = "Data source path must not be empty";
+ return false;
+ }
+ return true;
+}
+
+bool Validate(const TString& sourceType, const NKikimrSchemeOp::TExternalTableDescription& desc, TString& errStr) {
+ return ValidateSourceType(sourceType, errStr)
+ && ValidateLocation(desc.GetLocation(), errStr)
+ && ValidateContent(desc.GetContent(), errStr)
+ && ValidateDataSourcePath(desc.GetDataSourcePath(), errStr);
+}
+
+Ydb::Type CreateYdbType(const NScheme::TTypeInfo& typeInfo, bool notNull) {
+ Ydb::Type ydbType;
+ if (typeInfo.GetTypeId() == NScheme::NTypeIds::Pg) {
+ auto* typeDesc = typeInfo.GetTypeDesc();
+ auto* pg = ydbType.mutable_pg_type();
+ pg->set_type_name(NPg::PgTypeNameFromTypeDesc(typeDesc));
+ pg->set_oid(NPg::PgTypeIdFromTypeDesc(typeDesc));
+ } else {
+ auto& item = notNull
+ ? ydbType
+ : *ydbType.mutable_optional_type()->mutable_item();
+ item.set_type_id(static_cast<Ydb::Type::PrimitiveTypeId>(typeInfo.GetTypeId()));
+ }
+ return ydbType;
+}
+
+std::pair<TExternalTableInfo::TPtr, TMaybe<TString>> CreateExternalTable(
+ const TString& sourceType,
+ const NKikimrSchemeOp::TExternalTableDescription& desc,
+ const NExternalSource::IExternalSourceFactory::TPtr& factory,
+ ui64 alterVersion) {
+ TString errStr;
+
+ if (!desc.ColumnsSize()) {
+ errStr = "The schema must have at least one column";
+ return std::make_pair(nullptr, errStr);
+ }
+
+ TExternalTableInfo::TPtr externalTableInfo = new TExternalTableInfo;
+ const NScheme::TTypeRegistry* typeRegistry = AppData()->TypeRegistry;
+
+ if (desc.GetSourceType() != "General") {
+ errStr = "Only general data source has been supported as request";
+ return std::make_pair(nullptr, errStr);
+ }
+
+ externalTableInfo->DataSourcePath = desc.GetDataSourcePath();
+ externalTableInfo->Location = desc.GetLocation();
+ externalTableInfo->AlterVersion = alterVersion;
+ externalTableInfo->SourceType = sourceType;
+
+ NKikimrExternalSources::TSchema schema;
+ uint64_t nextColumnId = 1;
+ for (const auto& col : desc.GetColumns()) {
+ TString colName = col.GetName();
+
+ if (!colName) {
+ errStr = "Columns cannot have an empty name";
+ return std::make_pair(nullptr, errStr);
+ }
+
+ if (col.HasTypeId()) {
+ errStr = TStringBuilder() << "Cannot set TypeId for column '" << colName << "', use Type";
+ return std::make_pair(nullptr, errStr);
+ }
+
+ if (!col.HasType()) {
+ errStr = TStringBuilder() << "Missing Type for column '" << colName << "'";
+ return std::make_pair(nullptr, errStr);
+ }
+
+ auto typeName = NMiniKQL::AdaptLegacyYqlType(col.GetType());
+ const NScheme::IType* type = typeRegistry->GetType(typeName);
+
+ NScheme::TTypeInfo typeInfo;
+ if (type) {
+ // Only allow YQL types
+ if (!NScheme::NTypeIds::IsYqlType(type->GetTypeId())) {
+ errStr = Sprintf("Type '%s' specified for column '%s' is no longer supported", col.GetType().data(), colName.data());
+ return std::make_pair(nullptr, errStr);
+ }
+ typeInfo = NScheme::TTypeInfo(type->GetTypeId());
+ } else {
+ auto* typeDesc = NPg::TypeDescFromPgTypeName(typeName);
+ if (!typeDesc) {
+ errStr = Sprintf("Type '%s' specified for column '%s' is not supported by storage", col.GetType().data(), colName.data());
+ return std::make_pair(nullptr, errStr);
+ }
+ typeInfo = NScheme::TTypeInfo(NScheme::NTypeIds::Pg, typeDesc);
+ }
+
+ ui32 colId = col.HasId() ? col.GetId() : nextColumnId;
+ if (externalTableInfo->Columns.contains(colId)) {
+ errStr = Sprintf("Duplicate column id: %" PRIu32, colId);
+ return std::make_pair(nullptr, errStr);
+ }
+
+ nextColumnId = colId + 1 > nextColumnId ? colId + 1 : nextColumnId;
+
+ TTableInfo::TColumn& column = externalTableInfo->Columns[colId];
+ column = TTableInfo::TColumn(colName, colId, typeInfo, "", col.GetNotNull()); // TODO: do we need typeMod here?
+
+ auto& schemaColumn= *schema.add_column();
+ schemaColumn.set_name(colName);
+ *schemaColumn.mutable_type() = CreateYdbType(typeInfo, col.GetNotNull());
+ }
+
+ try {
+ NKikimrExternalSources::TGeneral general;
+ general.ParseFromStringOrThrow(desc.GetContent());
+ const auto source = factory->GetOrCreate(sourceType);
+ if (!source->HasExternalTable()) {
+ errStr = TStringBuilder{} << "External table isn't supported for " << sourceType;
+ return std::make_pair(nullptr, errStr);
+ }
+ externalTableInfo->Content = source->Pack(schema, general);
+ } catch (...) {
+ errStr = CurrentExceptionMessage();
+ return std::make_pair(nullptr, errStr);
+ }
+
+ return std::make_pair(externalTableInfo, Nothing());
+}
+
+
+} // namespace NKikimr::NSchemeShard::NExternalDataSource
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_common_external_table.h b/ydb/core/tx/schemeshard/schemeshard__operation_common_external_table.h
new file mode 100644
index 0000000000..9b74b2e658
--- /dev/null
+++ b/ydb/core/tx/schemeshard/schemeshard__operation_common_external_table.h
@@ -0,0 +1,49 @@
+#pragma once
+
+#include "schemeshard__operation_part.h"
+#include "schemeshard_impl.h"
+
+#include <ydb/core/tablet_flat/test/libs/table/test_iter.h>
+
+#include <utility>
+
+#define LOG_I(stream) LOG_INFO_S (context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "[" << context.SS->TabletID() << "] " << stream)
+#define LOG_N(stream) LOG_NOTICE_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "[" << context.SS->TabletID() << "] " << stream)
+#define RETURN_RESULT_UNLESS(x) if (!(x)) return result;
+
+namespace NKikimr::NSchemeShard::NExternalTable {
+
+inline TPath::TChecker IsParentPathValid(const TPath& parentPath) {
+ return parentPath.Check()
+ .NotUnderDomainUpgrade()
+ .IsAtLocalSchemeShard()
+ .IsResolved()
+ .NotDeleted()
+ .NotUnderDeleting()
+ .IsCommonSensePath()
+ .IsLikeDirectory();
+}
+
+inline bool IsParentPathValid(const THolder<TProposeResponse>& result,
+ const TPath& parentPath) {
+ const auto checks = IsParentPathValid(parentPath);
+
+ if (!checks) {
+ result->SetError(checks.GetStatus(), checks.GetError());
+ }
+
+ return static_cast<bool>(checks);
+}
+
+bool Validate(const TString& sourceType,
+ const NKikimrSchemeOp::TExternalTableDescription& desc,
+ TString& errStr);
+
+std::pair<TExternalTableInfo::TPtr, TMaybe<TString>> CreateExternalTable(
+ const TString& sourceType,
+ const NKikimrSchemeOp::TExternalTableDescription& desc,
+ const NExternalSource::IExternalSourceFactory::TPtr& factory,
+ ui64 alterVersion);
+
+
+} // namespace NKikimr::NSchemeShard::NExternalDataSource
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_create_external_data_source.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_create_external_data_source.cpp
index 9fbd47b0ad..85780c4888 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation_create_external_data_source.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__operation_create_external_data_source.cpp
@@ -1,101 +1,15 @@
+#include "schemeshard__operation_common_external_data_source.h"
#include "schemeshard__operation_part.h"
#include "schemeshard__operation_common.h"
#include "schemeshard_impl.h"
#include <ydb/core/base/subdomain.h>
-#define LOG_I(stream) LOG_INFO_S (context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "[" << context.SS->TabletID() << "] " << stream)
-#define LOG_N(stream) LOG_NOTICE_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "[" << context.SS->TabletID() << "] " << stream)
-
namespace {
using namespace NKikimr;
using namespace NSchemeShard;
-constexpr uint32_t MAX_FIELD_SIZE = 1000;
-constexpr uint32_t MAX_PROTOBUF_SIZE = 2 * 1024 * 1024; // 2 MiB
-
-bool ValidateLocationAndInstallation(const TString& location, const TString& installation, TString& errStr) {
- if (location.Size() > MAX_FIELD_SIZE) {
- errStr = Sprintf("Maximum length of location must be less or equal equal to %u but got %lu", MAX_FIELD_SIZE, location.Size());
- return false;
- }
- if (installation.Size() > MAX_FIELD_SIZE) {
- errStr = Sprintf("Maximum length of installation must be less or equal equal to %u but got %lu", MAX_FIELD_SIZE, installation.Size());
- return false;
- }
- return true;
-}
-
-bool CheckAuth(const TString& authMethod, const TVector<TString>& availableAuthMethods, TString& errStr) {
- if (Find(availableAuthMethods, authMethod) == availableAuthMethods.end()) {
- errStr = TStringBuilder{} << authMethod << " isn't supported for this source type";
- return false;
- }
-
- return true;
-}
-
-bool ValidateProperties(const NKikimrSchemeOp::TExternalDataSourceProperties& properties, TString& errStr) {
- if (properties.ByteSizeLong() > MAX_PROTOBUF_SIZE) {
- errStr = Sprintf("Maximum size of properties must be less or equal equal to %u but got %lu", MAX_PROTOBUF_SIZE, properties.ByteSizeLong());
- return false;
- }
- return true;
-}
-
-bool ValidateAuth(const NKikimrSchemeOp::TAuth& auth, const NKikimr::NExternalSource::IExternalSource::TPtr& source, TString& errStr) {
- if (auth.ByteSizeLong() > MAX_PROTOBUF_SIZE) {
- errStr = Sprintf("Maximum size of authorization information must be less or equal equal to %u but got %lu", MAX_PROTOBUF_SIZE, auth.ByteSizeLong());
- return false;
- }
- const auto availableAuthMethods = source->GetAuthMethods();
- switch (auth.identity_case()) {
- case NKikimrSchemeOp::TAuth::IDENTITY_NOT_SET: {
- errStr = "Authorization method isn't specified";
- return false;
- }
- case NKikimrSchemeOp::TAuth::kServiceAccount:
- return CheckAuth("SERVICE_ACCOUNT", availableAuthMethods, errStr);
- case NKikimrSchemeOp::TAuth::kMdbBasic:
- return CheckAuth("MDB_BASIC", availableAuthMethods, errStr);
- case NKikimrSchemeOp::TAuth::kBasic:
- return CheckAuth("BASIC", availableAuthMethods, errStr);
- case NKikimrSchemeOp::TAuth::kAws:
- return CheckAuth("AWS", availableAuthMethods, errStr);
- case NKikimrSchemeOp::TAuth::kNone:
- return CheckAuth("NONE", availableAuthMethods, errStr);
- }
- return false;
-}
-
-bool Validate(const NKikimrSchemeOp::TExternalDataSourceDescription& desc, const NKikimr::NExternalSource::IExternalSourceFactory::TPtr& factory, TString& errStr) {
- try {
- auto source = factory->GetOrCreate(desc.GetSourceType());
- source->ValidateExternalDataSource(desc.SerializeAsString());
- return ValidateLocationAndInstallation(desc.GetLocation(), desc.GetInstallation(), errStr)
- && ValidateAuth(desc.GetAuth(), source, errStr)
- && ValidateProperties(desc.GetProperties(), errStr);
- } catch (...) {
- errStr = CurrentExceptionMessage();
- return false;
- }
-}
-
-TExternalDataSourceInfo::TPtr CreateExternalDataSource(const NKikimrSchemeOp::TExternalDataSourceDescription& desc, const NKikimr::NExternalSource::IExternalSourceFactory::TPtr& factory, TString& errStr) {
- if (!Validate(desc, factory, errStr)) {
- return nullptr;
- }
- TExternalDataSourceInfo::TPtr externalDataSoureInfo = new TExternalDataSourceInfo;
- externalDataSoureInfo->SourceType = desc.GetSourceType();
- externalDataSoureInfo->Location = desc.GetLocation();
- externalDataSoureInfo->Installation = desc.GetInstallation();
- externalDataSoureInfo->AlterVersion = 1;
- externalDataSoureInfo->Auth.CopyFrom(desc.GetAuth());
- externalDataSoureInfo->Properties.CopyFrom(desc.GetProperties());
- return externalDataSoureInfo;
-}
-
class TPropose: public TSubOperationState {
private:
const TOperationId OperationId;
@@ -108,7 +22,7 @@ private:
public:
explicit TPropose(TOperationId id)
- : OperationId(id)
+ : OperationId(std::move(id))
{
}
@@ -118,13 +32,13 @@ public:
LOG_I(DebugHint() << "HandleReply TEvOperationPlan"
<< ": step# " << step);
- TTxState* txState = context.SS->FindTx(OperationId);
+ const TTxState* txState = context.SS->FindTx(OperationId);
Y_ABORT_UNLESS(txState);
Y_ABORT_UNLESS(txState->TxType == TTxState::TxCreateExternalDataSource);
- auto pathId = txState->TargetPathId;
- auto path = TPath::Init(pathId, context.SS);
- TPathElement::TPtr pathPtr = context.SS->PathsById.at(pathId);
+ const auto pathId = txState->TargetPathId;
+ const auto path = TPath::Init(pathId, context.SS);
+ const TPathElement::TPtr pathPtr = context.SS->PathsById.at(pathId);
context.SS->TabletCounters->Simple()[COUNTER_EXTERNAL_DATA_SOURCE_COUNT].Add(1);
@@ -145,7 +59,7 @@ public:
bool ProgressState(TOperationContext& context) override {
LOG_I(DebugHint() << "ProgressState");
- TTxState* txState = context.SS->FindTx(OperationId);
+ const TTxState* txState = context.SS->FindTx(OperationId);
Y_ABORT_UNLESS(txState);
Y_ABORT_UNLESS(txState->TxType == TTxState::TxCreateExternalDataSource);
@@ -154,11 +68,8 @@ public:
}
};
-
-class TCreateExternalDataSource: public TSubOperation {
- static TTxState::ETxState NextState() {
- return TTxState::Propose;
- }
+class TCreateExternalDataSource : public TSubOperation {
+ static TTxState::ETxState NextState() { return TTxState::Propose; }
TTxState::ETxState NextState(TTxState::ETxState state) const override {
switch (state) {
@@ -182,130 +93,189 @@ class TCreateExternalDataSource: public TSubOperation {
}
}
-public:
- using TSubOperation::TSubOperation;
-
- THolder<TProposeResponse> Propose(const TString& owner, TOperationContext& context) override {
- const auto ssId = context.SS->SelfTabletId();
-
- const auto acceptExisted = !Transaction.GetFailOnExist();
- const TString& parentPathStr = Transaction.GetWorkingDir();
- const auto& externalDataSoureDescription = Transaction.GetCreateExternalDataSource();
- const TString& name = externalDataSoureDescription.GetName();
-
-
- LOG_N("TCreateExternalDataSource Propose"
- << ": opId# " << OperationId
- << ", path# " << parentPathStr << "/" << name);
-
- auto result = MakeHolder<TProposeResponse>(NKikimrScheme::StatusAccepted, ui64(OperationId.GetTxId()), ui64(ssId));
-
- TPath parentPath = TPath::Resolve(parentPathStr, context.SS);
- {
- auto checks = parentPath.Check();
+ static bool IsDestinationPathValid(const THolder<TProposeResponse>& result,
+ const TPath& dstPath,
+ const TString& acl,
+ bool acceptExisted) {
+ const auto checks = dstPath.Check();
+ checks.IsAtLocalSchemeShard();
+ if (dstPath.IsResolved()) {
checks
- .NotUnderDomainUpgrade()
- .IsAtLocalSchemeShard()
.IsResolved()
- .NotDeleted()
.NotUnderDeleting()
- .IsCommonSensePath()
- .IsLikeDirectory();
-
- if (!checks) {
- result->SetError(checks.GetStatus(), checks.GetError());
- return result;
- }
+ .FailOnExist(TPathElement::EPathType::EPathTypeExternalDataSource, acceptExisted);
+ } else {
+ checks
+ .NotEmpty()
+ .NotResolved();
}
- const TString acl = Transaction.GetModifyACL().GetDiffACL();
+ if (checks) {
+ checks
+ .IsValidLeafName()
+ .DepthLimit()
+ .PathsLimit()
+ .DirChildrenLimit()
+ .IsValidACL(acl);
+ }
- TPath dstPath = parentPath.Child(name);
- {
- auto checks = dstPath.Check();
- checks.IsAtLocalSchemeShard();
+ if (!checks) {
+ result->SetError(checks.GetStatus(), checks.GetError());
if (dstPath.IsResolved()) {
- checks
- .IsResolved()
- .NotUnderDeleting()
- .FailOnExist(TPathElement::EPathType::EPathTypeExternalDataSource, acceptExisted);
- } else {
- checks
- .NotEmpty()
- .NotResolved();
- }
-
- if (checks) {
- checks
- .IsValidLeafName()
- .DepthLimit()
- .PathsLimit()
- .DirChildrenLimit()
- .IsValidACL(acl);
- }
-
- if (!checks) {
- result->SetError(checks.GetStatus(), checks.GetError());
- if (dstPath.IsResolved()) {
- result->SetPathCreateTxId(ui64(dstPath.Base()->CreateTxId));
- result->SetPathId(dstPath.Base()->PathId.LocalPathId);
- }
- return result;
+ result->SetPathCreateTxId(static_cast<ui64>(dstPath.Base()->CreateTxId));
+ result->SetPathId(dstPath.Base()->PathId.LocalPathId);
}
}
- TString errStr;
- if (!context.SS->CheckApplyIf(Transaction, errStr)) {
- result->SetError(NKikimrScheme::StatusPreconditionFailed, errStr);
- return result;
+ return static_cast<bool>(checks);
+ }
+
+ bool IsApplyIfChecksPassed(const THolder<TProposeResponse>& result,
+ const TOperationContext& context) const {
+ TString errorMessage;
+ if (!context.SS->CheckApplyIf(Transaction, errorMessage)) {
+ result->SetError(NKikimrScheme::StatusPreconditionFailed, errorMessage);
+ return false;
}
+ return true;
+ }
- TExternalDataSourceInfo::TPtr externalDataSoureInfo = CreateExternalDataSource(externalDataSoureDescription, context.SS->ExternalSourceFactory, errStr);
- if (!externalDataSoureInfo) {
- result->SetError(NKikimrScheme::StatusSchemeError, errStr);
- return result;
+ static bool IsDescriptionValid(
+ const THolder<TProposeResponse>& result,
+ const NKikimrSchemeOp::TExternalDataSourceDescription& desc,
+ const NExternalSource::IExternalSourceFactory::TPtr& factory) {
+ TString errorMessage;
+ if (!NExternalDataSource::Validate(desc, factory, errorMessage)) {
+ result->SetError(NKikimrScheme::StatusSchemeError, errorMessage);
+ return false;
}
+ return true;
+ }
+ static void AddPathInSchemeShard(
+ const THolder<TProposeResponse>& result, TPath& dstPath, const TString& owner) {
dstPath.MaterializeLeaf(owner);
result->SetPathId(dstPath.Base()->PathId.LocalPathId);
+ }
- TPathElement::TPtr externalDataSoure = dstPath.Base();
- externalDataSoure->CreateTxId = OperationId.GetTxId();
- externalDataSoure->LastTxId = OperationId.GetTxId();
- externalDataSoure->PathState = TPathElement::EPathState::EPathStateCreate;
- externalDataSoure->PathType = TPathElement::EPathType::EPathTypeExternalDataSource;
+ TPathElement::TPtr CreateExternalDataSourcePathElement(const TPath& dstPath) const {
+ TPathElement::TPtr externalDataSource = dstPath.Base();
- TTxState& txState = context.SS->CreateTx(OperationId, TTxState::TxCreateExternalDataSource, externalDataSoure->PathId);
- txState.Shards.clear();
+ externalDataSource->CreateTxId = OperationId.GetTxId();
+ externalDataSource->PathType = TPathElement::EPathType::EPathTypeExternalDataSource;
+ externalDataSource->PathState = TPathElement::EPathState::EPathStateCreate;
+ externalDataSource->LastTxId = OperationId.GetTxId();
- NIceDb::TNiceDb db(context.GetDB());
+ return externalDataSource;
+ }
+
+ void CreateTransaction(const TOperationContext &context,
+ const TPathId &externalDataSourcePathId) const {
+ TTxState& txState = context.SS->CreateTx(OperationId,
+ TTxState::TxCreateExternalDataSource,
+ externalDataSourcePathId);
+ txState.Shards.clear();
+ }
+ void RegisterParentPathDependencies(const TOperationContext& context,
+ const TPath& parentPath) const {
if (parentPath.Base()->HasActiveChanges()) {
- TTxId parentTxId = parentPath.Base()->PlannedToCreate() ? parentPath.Base()->CreateTxId : parentPath.Base()->LastTxId;
+ const TTxId parentTxId = parentPath.Base()->PlannedToCreate()
+ ? parentPath.Base()->CreateTxId
+ : parentPath.Base()->LastTxId;
context.OnComplete.Dependence(parentTxId, OperationId.GetTxId());
}
+ }
+ void AdvanceTransactionStateToPropose(const TOperationContext& context,
+ NIceDb::TNiceDb& db) const {
context.SS->ChangeTxState(db, OperationId, TTxState::Propose);
context.OnComplete.ActivateTx(OperationId);
+ }
- context.SS->ExternalDataSources[externalDataSoure->PathId] = externalDataSoureInfo;
- context.SS->TabletCounters->Simple()[COUNTER_EXTERNAL_DATA_SOURCE_COUNT].Add(1);
- context.SS->IncrementPathDbRefCount(externalDataSoure->PathId);
+ void PersistExternalDataSource(
+ const TOperationContext& context,
+ NIceDb::TNiceDb& db,
+ const TPathElement::TPtr& externalDataSourcePath,
+ const TExternalDataSourceInfo::TPtr& externalDataSourceInfo,
+ const TString& acl) const {
+ const auto& externalDataSourcePathId = externalDataSourcePath->PathId;
- context.SS->PersistPath(db, externalDataSoure->PathId);
+ context.SS->ExternalDataSources[externalDataSourcePathId] = externalDataSourceInfo;
+ context.SS->IncrementPathDbRefCount(externalDataSourcePathId);
+ context.SS->PersistPath(db, externalDataSourcePathId);
if (!acl.empty()) {
- externalDataSoure->ApplyACL(acl);
- context.SS->PersistACL(db, externalDataSoure);
+ externalDataSourcePath->ApplyACL(acl);
+ context.SS->PersistACL(db, externalDataSourcePath);
}
- context.SS->PersistExternalDataSource(db, externalDataSoure->PathId, externalDataSoureInfo);
+ context.SS->PersistExternalDataSource(db,
+ externalDataSourcePathId,
+ externalDataSourceInfo);
context.SS->PersistTxState(db, OperationId);
+ }
- IncParentDirAlterVersionWithRepublishSafeWithUndo(OperationId, dstPath, context.SS, context.OnComplete);
-
+ static void UpdatePathSizeCounts(const TPath& parentPath, const TPath& dstPath) {
dstPath.DomainInfo()->IncPathsInside();
parentPath.Base()->IncAliveChildren();
+ }
+
+public:
+ using TSubOperation::TSubOperation;
+
+ THolder<TProposeResponse> Propose(const TString& owner,
+ TOperationContext& context) override {
+ const auto acceptExisted = !Transaction.GetFailOnExist();
+ const auto ssId = context.SS->SelfTabletId();
+ const TString& parentPathStr = Transaction.GetWorkingDir();
+ const auto& externalDataSourceDescription =
+ Transaction.GetCreateExternalDataSource();
+ const TString& name = externalDataSourceDescription.GetName();
+
+ LOG_N("TCreateExternalDataSource Propose"
+ << ": opId# " << OperationId << ", path# " << parentPathStr << "/" << name);
+
+ auto result = MakeHolder<TProposeResponse>(NKikimrScheme::StatusAccepted,
+ static_cast<ui64>(OperationId.GetTxId()),
+ static_cast<ui64>(ssId));
+
+ const TPath parentPath = TPath::Resolve(parentPathStr, context.SS);
+ RETURN_RESULT_UNLESS(NExternalDataSource::IsParentPathValid(result, parentPath));
+
+ const TString acl = Transaction.GetModifyACL().GetDiffACL();
+ TPath dstPath = parentPath.Child(name);
+
+ RETURN_RESULT_UNLESS(IsDestinationPathValid(result, dstPath, acl, acceptExisted));
+ RETURN_RESULT_UNLESS(IsApplyIfChecksPassed(result, context));
+ RETURN_RESULT_UNLESS(IsDescriptionValid(result,
+ externalDataSourceDescription,
+ context.SS->ExternalSourceFactory));
+
+ const TExternalDataSourceInfo::TPtr externalDataSourceInfo =
+ NExternalDataSource::CreateExternalDataSource(externalDataSourceDescription, 1);
+ Y_ABORT_UNLESS(externalDataSourceInfo);
+
+ AddPathInSchemeShard(result, dstPath, owner);
+ const TPathElement::TPtr externalDataSource =
+ CreateExternalDataSourcePathElement(dstPath);
+ CreateTransaction(context, externalDataSource->PathId);
+
+ NIceDb::TNiceDb db(context.GetDB());
+
+ RegisterParentPathDependencies(context, parentPath);
+
+ AdvanceTransactionStateToPropose(context, db);
+
+ PersistExternalDataSource(context, db, externalDataSource,
+ externalDataSourceInfo, acl);
+
+ IncParentDirAlterVersionWithRepublishSafeWithUndo(OperationId,
+ dstPath,
+ context.SS,
+ context.OnComplete);
+
+ UpdatePathSizeCounts(parentPath, dstPath);
SetState(NextState());
return result;
@@ -313,19 +283,18 @@ public:
void AbortPropose(TOperationContext& context) override {
LOG_N("TCreateExternalDataSource AbortPropose"
- << ": opId# " << OperationId);
+ << ": opId# " << OperationId);
Y_ABORT("no AbortPropose for TCreateExternalDataSource");
}
void AbortUnsafe(TTxId forceDropTxId, TOperationContext& context) override {
LOG_N("TCreateExternalDataSource AbortUnsafe"
- << ": opId# " << OperationId
- << ", txId# " << forceDropTxId);
+ << ": opId# " << OperationId << ", txId# " << forceDropTxId);
context.OnComplete.DoneOperation(OperationId);
}
};
-}
+} // namespace
namespace NKikimr::NSchemeShard {
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_create_external_table.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_create_external_table.cpp
index 10a46d30a8..a82af9859b 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation_create_external_table.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__operation_create_external_table.cpp
@@ -1,192 +1,55 @@
+#include "schemeshard__operation_common_external_table.h"
#include "schemeshard__operation_part.h"
#include "schemeshard__operation_common.h"
#include "schemeshard_impl.h"
-#include <ydb/core/base/subdomain.h>
-
-#define LOG_I(stream) LOG_INFO_S (context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "[" << context.SS->TabletID() << "] " << stream)
-#define LOG_N(stream) LOG_NOTICE_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "[" << context.SS->TabletID() << "] " << stream)
-
+#include <utility>
namespace {
using namespace NKikimr;
using namespace NSchemeShard;
-constexpr uint32_t MAX_FIELD_SIZE = 1000;
-constexpr uint32_t MAX_PROTOBUF_SIZE = 2 * 1024 * 1024; // 2 MiB
-
-bool ValidateSourceType(const TString& sourceType, TString& errStr) {
- // Only object storage supported today
- if (sourceType != "ObjectStorage") {
- errStr = "Only ObjectStorage source type supported but got " + sourceType;
- return false;
- }
- return true;
-}
-
-bool ValidateLocation(const TString& location, TString& errStr) {
- if (!location) {
- errStr = "Location must not be empty";
- return false;
- }
- if (location.Size() > MAX_FIELD_SIZE) {
- errStr = Sprintf("Maximum length of location must be less or equal equal to %u but got %lu", MAX_FIELD_SIZE, location.Size());
- return false;
- }
- return true;
-}
-
-bool ValidateContent(const TString& content, TString& errStr) {
- if (content.Size() > MAX_PROTOBUF_SIZE) {
- errStr = Sprintf("Maximum size of content must be less or equal equal to %u but got %lu", MAX_PROTOBUF_SIZE, content.Size());
- return false;
- }
- return true;
-}
-
-bool ValidateDataSourcePath(const TString& dataSourcePath, TString& errStr) {
- if (!dataSourcePath) {
- errStr = "Data source path must not be empty";
- return false;
- }
- return true;
-}
-
-bool Validate(const TString& sourceType, const NKikimrSchemeOp::TExternalTableDescription& desc, TString& errStr) {
- return ValidateSourceType(sourceType, errStr)
- && ValidateLocation(desc.GetLocation(), errStr)
- && ValidateContent(desc.GetContent(), errStr)
- && ValidateDataSourcePath(desc.GetDataSourcePath(), errStr);
-}
-
-Ydb::Type CreateYdbType(const NScheme::TTypeInfo& typeInfo, bool notNull) {
- Ydb::Type ydbType;
- if (typeInfo.GetTypeId() == NScheme::NTypeIds::Pg) {
- auto* typeDesc = typeInfo.GetTypeDesc();
- auto* pg = ydbType.mutable_pg_type();
- pg->set_type_name(NPg::PgTypeNameFromTypeDesc(typeDesc));
- pg->set_oid(NPg::PgTypeIdFromTypeDesc(typeDesc));
- } else {
- auto& item = notNull
- ? ydbType
- : *ydbType.mutable_optional_type()->mutable_item();
- item.set_type_id((Ydb::Type::PrimitiveTypeId)typeInfo.GetTypeId());
- }
- return ydbType;
-}
-
-TExternalTableInfo::TPtr CreateExternalTable(const TString& sourceType, const NKikimrSchemeOp::TExternalTableDescription& desc, const NKikimr::NExternalSource::IExternalSourceFactory::TPtr& factory, TString& errStr) {
- if (!Validate(sourceType, desc, errStr)) {
- return nullptr;
- }
+class TPropose: public TSubOperationState {
+private:
+ const TOperationId OperationId;
- if (!desc.ColumnsSize()) {
- errStr = "The schema must have at least one column";
- return nullptr;
+ TString DebugHint() const override {
+ return TStringBuilder()
+ << "TCreateExternalTable TPropose"
+ << ", operationId: " << OperationId;
}
- TExternalTableInfo::TPtr externalTableInfo = new TExternalTableInfo;
- const NScheme::TTypeRegistry* typeRegistry = AppData()->TypeRegistry;
-
- if (desc.GetSourceType() != "General") {
- errStr = "Only general data source has been supported as request";
- return nullptr;
+ void IncrementExternalTableCounter(const TOperationContext& context) const {
+ context.SS->TabletCounters->Simple()[COUNTER_EXTERNAL_TABLE_COUNT].Add(1);
}
- externalTableInfo->DataSourcePath = desc.GetDataSourcePath();
- externalTableInfo->Location = desc.GetLocation();
- externalTableInfo->AlterVersion = 1;
- externalTableInfo->SourceType = sourceType;
-
- NKikimrExternalSources::TSchema schema;
- uint64_t nextColumnId = 1;
- for (const auto& col : desc.GetColumns()) {
- TString colName = col.GetName();
-
- if (!colName) {
- errStr = "Columns cannot have an empty name";
- return nullptr;
- }
-
- if (col.HasTypeId()) {
- errStr = TStringBuilder() << "Cannot set TypeId for column '" << colName << "', use Type";
- return nullptr;
- }
-
- if (!col.HasType()) {
- errStr = TStringBuilder() << "Missing Type for column '" << colName << "'";
- return nullptr;
- }
-
- auto typeName = NMiniKQL::AdaptLegacyYqlType(col.GetType());
- const NScheme::IType* type = typeRegistry->GetType(typeName);
-
- NScheme::TTypeInfo typeInfo;
- if (type) {
- // Only allow YQL types
- if (!NScheme::NTypeIds::IsYqlType(type->GetTypeId())) {
- errStr = Sprintf("Type '%s' specified for column '%s' is no longer supported", col.GetType().data(), colName.data());
- return nullptr;
- }
- typeInfo = NScheme::TTypeInfo(type->GetTypeId());
- } else {
- auto* typeDesc = NPg::TypeDescFromPgTypeName(typeName);
- if (!typeDesc) {
- errStr = Sprintf("Type '%s' specified for column '%s' is not supported by storage", col.GetType().data(), colName.data());
- return nullptr;
- }
- typeInfo = NScheme::TTypeInfo(NScheme::NTypeIds::Pg, typeDesc);
- }
-
- ui32 colId = col.HasId() ? col.GetId() : nextColumnId;
- if (externalTableInfo->Columns.contains(colId)) {
- errStr = Sprintf("Duplicate column id: %" PRIu32, colId);
- return nullptr;
- }
-
- nextColumnId = colId + 1 > nextColumnId ? colId + 1 : nextColumnId;
-
- TTableInfo::TColumn& column = externalTableInfo->Columns[colId];
- column = TTableInfo::TColumn(colName, colId, typeInfo, "", col.GetNotNull()); // TODO: do we need typeMod here?
-
- auto& schemaColumn= *schema.add_column();
- schemaColumn.set_name(colName);
- *schemaColumn.mutable_type() = CreateYdbType(typeInfo, col.GetNotNull());
+ void SetAndPersistCreateStep(const TOperationContext& context,
+ NIceDb::TNiceDb& db,
+ const TPath& path,
+ const TPathId& pathId,
+ const TStepId& step) const {
+ path->StepCreated = step;
+ context.SS->PersistCreateStep(db, pathId, step);
}
- try {
- NKikimrExternalSources::TGeneral general;
- general.ParseFromStringOrThrow(desc.GetContent());
- auto source = factory->GetOrCreate(sourceType);
- if (!source->HasExternalTable()) {
- errStr = TStringBuilder{} << "External table isn't supported for " << sourceType;
- return nullptr;
- }
- externalTableInfo->Content = source->Pack(schema, general);
- } catch (...) {
- errStr = CurrentExceptionMessage();
- return nullptr;
+ void ClearDescribePathCaches(const TOperationContext& context,
+ const TPathElement::TPtr& pathPtr,
+ const TPathElement::TPtr& dataSourcePathPtr) const {
+ context.SS->ClearDescribePathCaches(pathPtr);
+ context.SS->ClearDescribePathCaches(dataSourcePathPtr);
}
- return externalTableInfo;
-}
-
-class TPropose: public TSubOperationState {
-private:
- const TOperationId OperationId;
-
- TString DebugHint() const override {
- return TStringBuilder()
- << "TCreateExternalTable TPropose"
- << ", operationId: " << OperationId;
+ void PublishToSchemeBoard(const TOperationContext& context,
+ const TPathId& pathId,
+ const TPathId& dataSourcePathId) const {
+ context.OnComplete.PublishToSchemeBoard(OperationId, pathId);
+ context.OnComplete.PublishToSchemeBoard(OperationId, dataSourcePathId);
}
public:
explicit TPropose(TOperationId id)
- : OperationId(id)
- {
- }
+ : OperationId(std::move(id)) { }
bool HandleReply(TEvPrivate::TEvOperationPlan::TPtr& ev, TOperationContext& context) override {
const TStepId step = TStepId(ev->Get()->StepId);
@@ -194,29 +57,27 @@ public:
LOG_I(DebugHint() << " HandleReply TEvOperationPlan"
<< ": step# " << step);
- TTxState* txState = context.SS->FindTx(OperationId);
+ const TTxState* txState = context.SS->FindTx(OperationId);
Y_ABORT_UNLESS(txState);
Y_ABORT_UNLESS(txState->TxType == TTxState::TxCreateExternalTable);
- auto pathId = txState->TargetPathId;
- auto dataSourcePathId = txState->SourcePathId;
- auto path = TPath::Init(pathId, context.SS);
- TPathElement::TPtr pathPtr = context.SS->PathsById.at(pathId);
- TPathElement::TPtr dataSourcePathPtr = context.SS->PathsById.at(dataSourcePathId);
+ const auto pathId = txState->TargetPathId;
+ const auto dataSourcePathId = txState->SourcePathId;
+ const auto path = TPath::Init(pathId, context.SS);
+ const TPathElement::TPtr pathPtr = context.SS->PathsById.at(pathId);
+ const TPathElement::TPtr dataSourcePathPtr =
+ context.SS->PathsById.at(dataSourcePathId);
- context.SS->TabletCounters->Simple()[COUNTER_EXTERNAL_TABLE_COUNT].Add(1);
+ IncrementExternalTableCounter(context);
NIceDb::TNiceDb db(context.GetDB());
- path->StepCreated = step;
- context.SS->PersistCreateStep(db, pathId, step);
+ SetAndPersistCreateStep(context, db, path, pathId, step);
IncParentDirAlterVersionWithRepublish(OperationId, path, context);
- context.SS->ClearDescribePathCaches(pathPtr);
- context.SS->ClearDescribePathCaches(dataSourcePathPtr);
- context.OnComplete.PublishToSchemeBoard(OperationId, pathId);
- context.OnComplete.PublishToSchemeBoard(OperationId, dataSourcePathId);
+ ClearDescribePathCaches(context, pathPtr, dataSourcePathPtr);
+ PublishToSchemeBoard(context, pathId, dataSourcePathId);
context.SS->ChangeTxState(db, OperationId, TTxState::Done);
return true;
@@ -225,7 +86,7 @@ public:
bool ProgressState(TOperationContext& context) override {
LOG_I(DebugHint() << " ProgressState");
- TTxState* txState = context.SS->FindTx(OperationId);
+ const TTxState* txState = context.SS->FindTx(OperationId);
Y_ABORT_UNLESS(txState);
Y_ABORT_UNLESS(txState->TxType == TTxState::TxCreateExternalTable);
@@ -236,6 +97,7 @@ public:
class TCreateExternalTable: public TSubOperation {
+private:
static TTxState::ETxState NextState() {
return TTxState::Propose;
}
@@ -262,138 +124,152 @@ class TCreateExternalTable: public TSubOperation {
}
}
-public:
- using TSubOperation::TSubOperation;
-
- THolder<TProposeResponse> Propose(const TString& owner, TOperationContext& context) override {
- const auto ssId = context.SS->SelfTabletId();
-
- const auto acceptExisted = !Transaction.GetFailOnExist();
- const TString& parentPathStr = Transaction.GetWorkingDir();
- const auto& externalTableDescription = Transaction.GetCreateExternalTable();
- const TString& name = externalTableDescription.GetName();
-
- LOG_N("TCreateExternalTable Propose"
- << ": opId# " << OperationId
- << ", path# " << parentPathStr << "/" << name);
-
- auto result = MakeHolder<TProposeResponse>(NKikimrScheme::StatusAccepted, ui64(OperationId.GetTxId()), ui64(ssId));
-
- TPath parentPath = TPath::Resolve(parentPathStr, context.SS);
- {
- auto checks = parentPath.Check();
+ static bool IsDestinationPathValid(const THolder<TProposeResponse>& result,
+ const TPath& dstPath,
+ const TString& acl,
+ bool acceptExisted) {
+ const auto checks = dstPath.Check();
+ checks.IsAtLocalSchemeShard();
+ if (dstPath.IsResolved()) {
checks
- .NotUnderDomainUpgrade()
- .IsAtLocalSchemeShard()
.IsResolved()
- .NotDeleted()
.NotUnderDeleting()
- .IsCommonSensePath()
- .IsLikeDirectory();
-
- if (!checks) {
- result->SetError(checks.GetStatus(), checks.GetError());
- return result;
- }
+ .FailOnExist(TPathElement::EPathType::EPathTypeExternalTable, acceptExisted);
+ } else {
+ checks
+ .NotEmpty()
+ .NotResolved();
}
- const TString acl = Transaction.GetModifyACL().GetDiffACL();
+ if (checks) {
+ checks
+ .IsValidLeafName()
+ .DepthLimit()
+ .PathsLimit()
+ .DirChildrenLimit()
+ .IsValidACL(acl);
+ }
- TPath dstPath = parentPath.Child(name);
- {
- auto checks = dstPath.Check();
- checks.IsAtLocalSchemeShard();
+ if (!checks) {
+ result->SetError(checks.GetStatus(), checks.GetError());
if (dstPath.IsResolved()) {
- checks
- .IsResolved()
- .NotUnderDeleting()
- .FailOnExist(TPathElement::EPathType::EPathTypeExternalTable, acceptExisted);
- } else {
- checks
- .NotEmpty()
- .NotResolved();
+ result->SetPathCreateTxId(static_cast<ui64>(dstPath.Base()->CreateTxId));
+ result->SetPathId(dstPath.Base()->PathId.LocalPathId);
}
+ }
- if (checks) {
- checks
- .IsValidLeafName()
- .DepthLimit()
- .PathsLimit()
- .DirChildrenLimit()
- .IsValidACL(acl);
- }
+ return static_cast<bool>(checks);
+ }
- if (!checks) {
- result->SetError(checks.GetStatus(), checks.GetError());
- if (dstPath.IsResolved()) {
- result->SetPathCreateTxId(ui64(dstPath.Base()->CreateTxId));
- result->SetPathId(dstPath.Base()->PathId.LocalPathId);
- }
- return result;
- }
+ static bool IsDataSourcePathValid(const THolder<TProposeResponse>& result, const TPath& dataSourcePath) {
+ const auto checks = dataSourcePath.Check();
+ checks
+ .NotUnderDomainUpgrade()
+ .IsAtLocalSchemeShard()
+ .IsResolved()
+ .NotDeleted()
+ .NotUnderDeleting()
+ .IsCommonSensePath()
+ .IsExternalDataSource()
+ .NotUnderOperation();
+
+ if (!checks) {
+ result->SetError(checks.GetStatus(), checks.GetError());
}
- TPath dataSourcePath = TPath::Resolve(externalTableDescription.GetDataSourcePath(), context.SS);
- {
- auto checks = dataSourcePath.Check();
- checks
- .NotUnderDomainUpgrade()
- .IsAtLocalSchemeShard()
- .IsResolved()
- .NotDeleted()
- .NotUnderDeleting()
- .IsCommonSensePath()
- .IsExternalDataSource()
- .NotUnderOperation();
+ return static_cast<bool>(checks);
+ }
- if (!checks) {
- result->SetError(checks.GetStatus(), checks.GetError());
- return result;
- }
+ bool IsApplyIfChecksPassed(const THolder<TProposeResponse>& result,
+ const TOperationContext& context) const {
+ TString errorMessage;
+ if (!context.SS->CheckApplyIf(Transaction, errorMessage)) {
+ result->SetError(NKikimrScheme::StatusPreconditionFailed, errorMessage);
+ return false;
}
- TExternalDataSourceInfo::TPtr externalDataSource = context.SS->ExternalDataSources.Value(dataSourcePath->PathId, nullptr);
+ return true;
+ }
+
+ static bool IsDataSourceValid(const THolder<TProposeResponse>& result,
+ const TExternalDataSourceInfo::TPtr& externalDataSource) {
if (!externalDataSource) {
result->SetError(NKikimrScheme::StatusSchemeError, "Data source doesn't exist");
- return result;
- }
- TString errStr;
- if (!context.SS->CheckApplyIf(Transaction, errStr)) {
- result->SetError(NKikimrScheme::StatusPreconditionFailed, errStr);
- return result;
+ return false;
}
+ return true;
+ }
- TExternalTableInfo::TPtr externalTableInfo = CreateExternalTable(externalDataSource->SourceType, externalTableDescription, context.SS->ExternalSourceFactory, errStr);
- if (!externalTableInfo) {
- result->SetError(NKikimrScheme::StatusSchemeError, errStr);
- return result;
+ static bool IsExternalTableDescriptionValid(
+ const THolder<TProposeResponse>& result,
+ const TString& sourceType,
+ const NKikimrSchemeOp::TExternalTableDescription& desc) {
+ if (TString errorMessage; !NExternalTable::Validate(sourceType, desc, errorMessage)) {
+ result->SetError(NKikimrScheme::StatusSchemeError, errorMessage);
+ return false;
}
+ return true;
+ }
+
+ static void AddPathInSchemeShard(const THolder<TProposeResponse> &result,
+ TPath &dstPath, const TString &owner) {
dstPath.MaterializeLeaf(owner);
result->SetPathId(dstPath.Base()->PathId.LocalPathId);
+ }
+ TPathElement::TPtr CreateExternalTablePathElement(const TPath& dstPath) const {
TPathElement::TPtr externalTable = dstPath.Base();
- externalTable->CreateTxId = OperationId.GetTxId();
- externalTable->LastTxId = OperationId.GetTxId();
+ externalTable->CreateTxId = OperationId.GetTxId();
+ externalTable->PathType = TPathElement::EPathType::EPathTypeExternalTable;
externalTable->PathState = TPathElement::EPathState::EPathStateCreate;
- externalTable->PathType = TPathElement::EPathType::EPathTypeExternalTable;
+ externalTable->LastTxId = OperationId.GetTxId();
- TTxState& txState = context.SS->CreateTx(OperationId, TTxState::TxCreateExternalTable, externalTable->PathId, dataSourcePath->PathId);
- txState.Shards.clear();
+ return externalTable;
+ }
- NIceDb::TNiceDb db(context.GetDB());
+ void CreateTransaction(const TOperationContext &context,
+ const TPathId &externalTablePathId,
+ const TPathId &externalDataSourcePathId) const {
+ TTxState &txState =
+ context.SS->CreateTx(OperationId, TTxState::TxCreateExternalTable,
+ externalTablePathId, externalDataSourcePathId);
+ txState.Shards.clear();
+ }
+ void RegisterParentPathDependencies(const TOperationContext& context,
+ const TPath& parentPath) const {
if (parentPath.Base()->HasActiveChanges()) {
- TTxId parentTxId = parentPath.Base()->PlannedToCreate() ? parentPath.Base()->CreateTxId : parentPath.Base()->LastTxId;
+ const auto parentTxId = parentPath.Base()->PlannedToCreate()
+ ? parentPath.Base()->CreateTxId
+ : parentPath.Base()->LastTxId;
context.OnComplete.Dependence(parentTxId, OperationId.GetTxId());
}
+ }
+ void AdvanceTransactionStateToPropose(const TOperationContext& context,
+ NIceDb::TNiceDb& db) const {
context.SS->ChangeTxState(db, OperationId, TTxState::Propose);
context.OnComplete.ActivateTx(OperationId);
+ }
+ static void LinkExternalDataSourceWithExternalTable(
+ const TExternalDataSourceInfo::TPtr& externalDataSource,
+ const TPathElement::TPtr& externalTable,
+ const TPath& dstPath) {
auto& reference = *externalDataSource->ExternalTableReferences.AddReferences();
reference.SetPath(dstPath.PathString());
PathIdFromPathId(externalTable->PathId, reference.MutablePathId());
+ }
+
+ void PersistExternalTable(
+ const TOperationContext& context,
+ NIceDb::TNiceDb& db,
+ const TPathElement::TPtr& externalTable,
+ const TExternalTableInfo::TPtr& externalTableInfo,
+ const TPathId& externalDataSourcePathId,
+ const TExternalDataSourceInfo::TPtr& externalDataSource,
+ const TString& acl) const {
context.SS->ExternalTables[externalTable->PathId] = externalTableInfo;
context.SS->IncrementPathDbRefCount(externalTable->PathId);
@@ -404,14 +280,96 @@ public:
context.SS->PersistACL(db, externalTable);
}
- context.SS->PersistExternalDataSource(db, dataSourcePath->PathId, externalDataSource);
+ context.SS->PersistExternalDataSource(db, externalDataSourcePathId, externalDataSource);
context.SS->PersistExternalTable(db, externalTable->PathId, externalTableInfo);
context.SS->PersistTxState(db, OperationId);
+ }
- IncParentDirAlterVersionWithRepublishSafeWithUndo(OperationId, dstPath, context.SS, context.OnComplete);
-
+ static void UpdatePathSizeCounts(const TPath& parentPath,
+ const TPath& dstPath) {
dstPath.DomainInfo()->IncPathsInside();
parentPath.Base()->IncAliveChildren();
+ }
+
+public:
+ using TSubOperation::TSubOperation;
+
+ THolder<TProposeResponse> Propose(const TString& owner, TOperationContext& context) override {
+ const auto ssId = context.SS->SelfTabletId();
+
+ const auto acceptExisted = !Transaction.GetFailOnExist();
+ const TString& parentPathStr = Transaction.GetWorkingDir();
+ const auto& externalTableDescription = Transaction.GetCreateExternalTable();
+ const TString& name = externalTableDescription.GetName();
+
+ LOG_N("TCreateExternalTable Propose"
+ << ": opId# " << OperationId << ", path# " << parentPathStr << "/" << name);
+
+ auto result = MakeHolder<TProposeResponse>(NKikimrScheme::StatusAccepted,
+ static_cast<ui64>(OperationId.GetTxId()),
+ static_cast<ui64>(ssId));
+
+ const auto parentPath = TPath::Resolve(parentPathStr, context.SS);
+ RETURN_RESULT_UNLESS(NExternalTable::IsParentPathValid(result, parentPath));
+
+ const TString acl = Transaction.GetModifyACL().GetDiffACL();
+ TPath dstPath = parentPath.Child(name);
+ RETURN_RESULT_UNLESS(IsDestinationPathValid(
+ result, dstPath, acl, acceptExisted));
+
+ const auto dataSourcePath =
+ TPath::Resolve(externalTableDescription.GetDataSourcePath(), context.SS);
+ RETURN_RESULT_UNLESS(IsDataSourcePathValid(result, dataSourcePath));
+
+ const auto externalDataSource =
+ context.SS->ExternalDataSources.Value(dataSourcePath->PathId, nullptr);
+ RETURN_RESULT_UNLESS(IsDataSourceValid(result, externalDataSource));
+
+ RETURN_RESULT_UNLESS(IsApplyIfChecksPassed(result, context));
+
+ RETURN_RESULT_UNLESS(IsExternalTableDescriptionValid(result,
+ externalDataSource->SourceType,
+ externalTableDescription));
+
+ auto [externalTableInfo, maybeError] =
+ NExternalTable::CreateExternalTable(externalDataSource->SourceType,
+ externalTableDescription,
+ context.SS->ExternalSourceFactory,
+ 1);
+ if (maybeError) {
+ result->SetError(NKikimrScheme::StatusSchemeError, *maybeError);
+ return result;
+ }
+ Y_ABORT_UNLESS(externalTableInfo);
+
+ AddPathInSchemeShard(result, dstPath, owner);
+
+ const auto externalTable = CreateExternalTablePathElement(dstPath);
+ CreateTransaction(context, externalTable->PathId, dataSourcePath->PathId);
+
+ NIceDb::TNiceDb db(context.GetDB());
+
+ RegisterParentPathDependencies(context, parentPath);
+ AdvanceTransactionStateToPropose(context, db);
+
+ LinkExternalDataSourceWithExternalTable(externalDataSource,
+ externalTable,
+ dstPath);
+
+ PersistExternalTable(context,
+ db,
+ externalTable,
+ externalTableInfo,
+ dataSourcePath->PathId,
+ externalDataSource,
+ acl);
+
+ IncParentDirAlterVersionWithRepublishSafeWithUndo(OperationId,
+ dstPath,
+ context.SS,
+ context.OnComplete);
+
+ UpdatePathSizeCounts(parentPath, dstPath);
SetState(NextState());
return result;
@@ -441,7 +399,7 @@ ISubOperation::TPtr CreateNewExternalTable(TOperationId id, const TTxTransaction
ISubOperation::TPtr CreateNewExternalTable(TOperationId id, TTxState::ETxState state) {
Y_ABORT_UNLESS(state != TTxState::Invalid);
- return MakeSubOperation<TCreateExternalTable>(id, state);
+ return MakeSubOperation<TCreateExternalTable>(std::move(id), state);
}
}
diff --git a/ydb/core/tx/schemeshard/ya.make b/ydb/core/tx/schemeshard/ya.make
index d1e8f55992..3d2a5004cc 100644
--- a/ydb/core/tx/schemeshard/ya.make
+++ b/ydb/core/tx/schemeshard/ya.make
@@ -94,6 +94,8 @@ SRCS(
schemeshard__operation_cancel_tx.cpp
schemeshard__operation_common.cpp
schemeshard__operation_common.h
+ schemeshard__operation_common_external_data_source.cpp
+ schemeshard__operation_common_external_table.cpp
schemeshard__operation_common_subdomain.h
schemeshard__operation_consistent_copy_tables.cpp
schemeshard__operation_copy_table.cpp