diff options
author | hcpp <[email protected]> | 2023-02-17 17:45:06 +0300 |
---|---|---|
committer | hcpp <[email protected]> | 2023-02-17 17:45:06 +0300 |
commit | 53b03848d2c7424227e949900d76e6941650326d (patch) | |
tree | bd1fb72912a16b6bcca85fc8b55617217c4bebce | |
parent | 3e7759b1382ed1cfdf47c30c48eeeac42501191d (diff) |
external data source has been added to scheme shard
52 files changed, 2498 insertions, 136 deletions
diff --git a/ydb/core/kqp/provider/yql_kikimr_gateway_ut.cpp b/ydb/core/kqp/provider/yql_kikimr_gateway_ut.cpp index b78d5fb48a3..ca9a199ef2f 100644 --- a/ydb/core/kqp/provider/yql_kikimr_gateway_ut.cpp +++ b/ydb/core/kqp/provider/yql_kikimr_gateway_ut.cpp @@ -370,17 +370,20 @@ Y_UNIT_TEST_SUITE(KikimrIcGateway) { } Y_UNIT_TEST(TestCreateExternalTable) { + return; // skip TODO: This will be fixed after adding the external data source in gateway. Will be done in a separate review TKikimrRunner kikimr(NKqp::TKikimrSettings().SetWithSampleTables(false)); TestCreateExternalTable(*kikimr.GetTestServer().GetRuntime(), GetIcGateway(kikimr.GetTestServer()), "/Root/f1/f2/external_table"); } Y_UNIT_TEST(TestCreateSameExternalTable) { + return; // skip TODO: This will be fixed after adding the external data source in gateway. Will be done in a separate review TKikimrRunner kikimr(NKqp::TKikimrSettings().SetWithSampleTables(false)); TestCreateExternalTable(*kikimr.GetTestServer().GetRuntime(), GetIcGateway(kikimr.GetTestServer()), "/Root/f1/f2/external_table"); TestCreateExternalTable(*kikimr.GetTestServer().GetRuntime(), GetIcGateway(kikimr.GetTestServer()), "/Root/f1/f2/external_table"); } Y_UNIT_TEST(TestDropExternalTable) { + return; // skip TODO: This will be fixed after adding the external data source in gateway. Will be done in a separate review TKikimrRunner kikimr(NKqp::TKikimrSettings().SetWithSampleTables(false)); TestCreateExternalTable(*kikimr.GetTestServer().GetRuntime(), GetIcGateway(kikimr.GetTestServer()), "/Root/f1/f2/external_table"); TestDropExternalTable(*kikimr.GetTestServer().GetRuntime(), GetIcGateway(kikimr.GetTestServer()), "/Root/f1/f2/external_table"); diff --git a/ydb/core/protos/counters_schemeshard.proto b/ydb/core/protos/counters_schemeshard.proto index cf5067a51c7..0406f2c0641 100644 --- a/ydb/core/protos/counters_schemeshard.proto +++ b/ydb/core/protos/counters_schemeshard.proto @@ -178,6 +178,11 @@ enum ESimpleCounters { COUNTER_IN_FLIGHT_OPS_TxCreateExternalTable = 144 [(CounterOpts) = {Name: "InFlightOps/CreateExternalTable"}]; COUNTER_IN_FLIGHT_OPS_TxDropExternalTable = 145 [(CounterOpts) = {Name: "InFlightOps/DropExternalTable"}]; COUNTER_IN_FLIGHT_OPS_TxAlterExternalTable = 146 [(CounterOpts) = {Name: "InFlightOps/AlterExternalTable"}]; + + COUNTER_EXTERNAL_DATA_SOURCE_COUNT = 147 [(CounterOpts) = {Name: "ExternalDataSourceCount"}]; + COUNTER_IN_FLIGHT_OPS_TxCreateExternalDataSource = 148 [(CounterOpts) = {Name: "InFlightOps/CreateExternalDataSource"}]; + COUNTER_IN_FLIGHT_OPS_TxDropExternalDataSource = 149 [(CounterOpts) = {Name: "InFlightOps/DropExternalDataSource"}]; + COUNTER_IN_FLIGHT_OPS_TxAlterExternalDataSource = 150 [(CounterOpts) = {Name: "InFlightOps/AlterExternalDataSource"}]; } enum ECumulativeCounters { @@ -288,6 +293,10 @@ enum ECumulativeCounters { COUNTER_FINISHED_OPS_TxCreateExternalTable = 88 [(CounterOpts) = {Name: "FinishedOps/CreateExternalTable"}]; COUNTER_FINISHED_OPS_TxDropExternalTable = 89 [(CounterOpts) = {Name: "FinishedOps/DropExternalTable"}]; COUNTER_FINISHED_OPS_TxAlterExternalTable = 90 [(CounterOpts) = {Name: "FinishedOps/AlterExternalTable"}]; + + COUNTER_FINISHED_OPS_TxCreateExternalDataSource = 91 [(CounterOpts) = {Name: "FinishedOps/CreateExternalDataSource"}]; + COUNTER_FINISHED_OPS_TxDropExternalDataSource = 92 [(CounterOpts) = {Name: "FinishedOps/DropExternalDataSource"}]; + COUNTER_FINISHED_OPS_TxAlterExternalDataSource = 93 [(CounterOpts) = {Name: "FinishedOps/AlterExternalDataSource"}]; } enum EPercentileCounters { diff --git a/ydb/core/protos/flat_scheme_op.proto b/ydb/core/protos/flat_scheme_op.proto index ae6158d65a4..3d6a0be1d8d 100644 --- a/ydb/core/protos/flat_scheme_op.proto +++ b/ydb/core/protos/flat_scheme_op.proto @@ -1302,6 +1302,11 @@ enum EOperationType { ESchemeOpCreateExternalTable = 86; ESchemeOpDropExternalTable = 87; ESchemeOpAlterExternalTable = 88; + + // External Data Source + ESchemeOpCreateExternalDataSource = 89; + ESchemeOpDropExternalDataSource = 90; + ESchemeOpAlterExternalDataSource = 91; } message TApplyIf { @@ -1412,6 +1417,8 @@ message TModifyScheme { optional TPersQueueGroupDeallocate DeallocatePersQueueGroup = 57; optional TExternalTableDescription CreateExternalTable = 58; + + optional TExternalDataSourceDescription CreateExternalDataSource = 59; } // "Script", used by client to parse text files with multiple DDL commands @@ -1468,6 +1475,7 @@ enum EPathType { EPathTypeReplication = 16; EPathTypeBlobDepot = 17; EPathTypeExternalTable = 18; + EPathTypeExternalDataSource = 19; } enum EPathSubType { @@ -1520,6 +1528,7 @@ message TPathVersion { optional uint64 ReplicationVersion = 25; optional uint64 BlobDepotVersion = 26; optional uint64 ExternalTableVersion = 27; + optional uint64 ExternalDataSourceVersion = 28; } // Describes single path @@ -1606,6 +1615,7 @@ message TPathDescription { optional TReplicationDescription ReplicationDescription = 25; optional TBlobDepotDescription BlobDepotDescription = 26; optional TExternalTableDescription ExternalTableDescription = 27; + optional TExternalDataSourceDescription ExternalDataSourceDescription = 28; } // For persisting AlterTable Tx description in Schemeshard internal DB @@ -1660,3 +1670,31 @@ message TExternalTableDescription { repeated TColumnDescription Columns = 7; optional bytes Content = 8; } + +// Access without authorization +message NoneAuth { +} + +message TAuth { + oneof identity { + NoneAuth None = 3; + } +} + +message TExternalTableReferences { + message TReference { + optional string Path = 1; + optional NKikimrProto.TPathID PathId = 2; + } + repeated TReference References = 1; +} + +message TExternalDataSourceDescription { + optional string Name = 1; + optional NKikimrProto.TPathID PathId = 2; + optional uint64 Version = 3; + optional string SourceType = 4; + optional string Location = 5; + optional string Installation = 6; + optional TAuth Auth = 7; +} diff --git a/ydb/core/tx/scheme_board/cache.cpp b/ydb/core/tx/scheme_board/cache.cpp index e0ab9b6470f..2360d09a3f8 100644 --- a/ydb/core/tx/scheme_board/cache.cpp +++ b/ydb/core/tx/scheme_board/cache.cpp @@ -737,6 +737,7 @@ class TSchemeCache: public TMonitorableActor<TSchemeCache> { ReplicationInfo.Drop(); BlobDepotInfo.Drop(); ExternalTableInfo.Drop(); + ExternalDataSourceInfo.Drop(); } void FillTableInfo(const NKikimrSchemeOp::TPathDescription& pathDesc) { @@ -1166,6 +1167,7 @@ class TSchemeCache: public TMonitorableActor<TSchemeCache> { DESCRIPTION_PART(ReplicationInfo); DESCRIPTION_PART(BlobDepotInfo); DESCRIPTION_PART(ExternalTableInfo); + DESCRIPTION_PART(ExternalDataSourceInfo); #undef DESCRIPTION_PART @@ -1476,6 +1478,10 @@ class TSchemeCache: public TMonitorableActor<TSchemeCache> { Kind = TNavigate::KindExternalTable; FillInfo(Kind, ExternalTableInfo, std::move(*pathDesc.MutableExternalTableDescription())); break; + case NKikimrSchemeOp::EPathTypeExternalDataSource: + Kind = TNavigate::KindExternalDataSource; + FillInfo(Kind, ExternalDataSourceInfo, std::move(*pathDesc.MutableExternalDataSourceDescription())); + break; case NKikimrSchemeOp::EPathTypeInvalid: Y_VERIFY_DEBUG(false, "Invalid path type"); break; @@ -1536,6 +1542,9 @@ class TSchemeCache: public TMonitorableActor<TSchemeCache> { case NKikimrSchemeOp::EPathTypeExternalTable: ListNodeEntry->Children.emplace_back(name, pathId, TNavigate::KindExternalTable); break; + case NKikimrSchemeOp::EPathTypeExternalDataSource: + ListNodeEntry->Children.emplace_back(name, pathId, TNavigate::KindExternalDataSource); + break; case NKikimrSchemeOp::EPathTypeTableIndex: case NKikimrSchemeOp::EPathTypeInvalid: Y_VERIFY_DEBUG(false, "Invalid path type"); @@ -1745,6 +1754,7 @@ class TSchemeCache: public TMonitorableActor<TSchemeCache> { entry.ReplicationInfo = ReplicationInfo; entry.BlobDepotInfo = BlobDepotInfo; entry.ExternalTableInfo = ExternalTableInfo; + entry.ExternalDataSourceInfo = ExternalDataSourceInfo; } bool CheckColumns(TResolveContext* context, TResolve::TEntry& entry, @@ -2022,6 +2032,9 @@ class TSchemeCache: public TMonitorableActor<TSchemeCache> { // ExternalTable specific TIntrusivePtr<TNavigate::TExternalTableInfo> ExternalTableInfo; + // ExternalDataSource specific + TIntrusivePtr<TNavigate::TExternalDataSourceInfo> ExternalDataSourceInfo; + }; // TCacheItem struct TMerger { diff --git a/ydb/core/tx/scheme_cache/scheme_cache.h b/ydb/core/tx/scheme_cache/scheme_cache.h index 913cc74f7e7..2da2c173d29 100644 --- a/ydb/core/tx/scheme_cache/scheme_cache.h +++ b/ydb/core/tx/scheme_cache/scheme_cache.h @@ -130,6 +130,7 @@ struct TSchemeCacheNavigate { KindReplication = 15, KindBlobDepot = 16, KindExternalTable = 17, + KindExternalDataSource = 17, }; struct TListNodeEntry : public TAtomicRefCount<TListNodeEntry> { @@ -217,6 +218,11 @@ struct TSchemeCacheNavigate { NKikimrSchemeOp::TExternalTableDescription Description; }; + struct TExternalDataSourceInfo : public TAtomicRefCount<TExternalDataSourceInfo> { + EKind Kind = KindUnknown; + NKikimrSchemeOp::TExternalDataSourceDescription Description; + }; + struct TEntry { enum class ERequestType : ui8 { ByPath, @@ -263,6 +269,7 @@ struct TSchemeCacheNavigate { TIntrusiveConstPtr<TReplicationInfo> ReplicationInfo; TIntrusiveConstPtr<TBlobDepotInfo> BlobDepotInfo; TIntrusiveConstPtr<TExternalTableInfo> ExternalTableInfo; + TIntrusiveConstPtr<TExternalDataSourceInfo> ExternalDataSourceInfo; TString ToString() const; TString ToString(const NScheme::TTypeRegistry& typeRegistry) const; diff --git a/ydb/core/tx/schemeshard/CMakeLists.darwin.txt b/ydb/core/tx/schemeshard/CMakeLists.darwin.txt index 9db71ca37fd..abd0c5b9b21 100644 --- a/ydb/core/tx/schemeshard/CMakeLists.darwin.txt +++ b/ydb/core/tx/schemeshard/CMakeLists.darwin.txt @@ -17,6 +17,8 @@ add_subdirectory(ut_cdc_stream_reboots) add_subdirectory(ut_compaction) add_subdirectory(ut_export) add_subdirectory(ut_export_reboots_s3) +add_subdirectory(ut_external_data_source) +add_subdirectory(ut_external_data_source_reboots) add_subdirectory(ut_external_table) add_subdirectory(ut_external_table_reboots) add_subdirectory(ut_extsubdomain) @@ -138,6 +140,7 @@ target_sources(core-tx-schemeshard PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/schemeshard__operation_copy_table.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/schemeshard__operation_create_backup.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/schemeshard__operation_create_bsv.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/schemeshard__operation_create_external_data_source.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/schemeshard__operation_create_external_table.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/schemeshard__operation_create_extsubdomain.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/schemeshard__operation_create_fs.cpp @@ -156,6 +159,7 @@ target_sources(core-tx-schemeshard PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/schemeshard__operation_create_subdomain.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/schemeshard__operation_create_table.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/schemeshard__operation_drop_bsv.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/schemeshard__operation_drop_external_data_source.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/schemeshard__operation_drop_external_table.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/schemeshard__operation_drop_extsubdomain.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/schemeshard__operation_drop_fs.cpp diff --git a/ydb/core/tx/schemeshard/CMakeLists.linux-aarch64.txt b/ydb/core/tx/schemeshard/CMakeLists.linux-aarch64.txt index 49c3a0454e8..b24c64ba3a9 100644 --- a/ydb/core/tx/schemeshard/CMakeLists.linux-aarch64.txt +++ b/ydb/core/tx/schemeshard/CMakeLists.linux-aarch64.txt @@ -17,6 +17,8 @@ add_subdirectory(ut_cdc_stream_reboots) add_subdirectory(ut_compaction) add_subdirectory(ut_export) add_subdirectory(ut_export_reboots_s3) +add_subdirectory(ut_external_data_source) +add_subdirectory(ut_external_data_source_reboots) add_subdirectory(ut_external_table) add_subdirectory(ut_external_table_reboots) add_subdirectory(ut_extsubdomain) @@ -139,6 +141,7 @@ target_sources(core-tx-schemeshard PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/schemeshard__operation_copy_table.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/schemeshard__operation_create_backup.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/schemeshard__operation_create_bsv.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/schemeshard__operation_create_external_data_source.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/schemeshard__operation_create_external_table.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/schemeshard__operation_create_extsubdomain.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/schemeshard__operation_create_fs.cpp @@ -157,6 +160,7 @@ target_sources(core-tx-schemeshard PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/schemeshard__operation_create_subdomain.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/schemeshard__operation_create_table.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/schemeshard__operation_drop_bsv.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/schemeshard__operation_drop_external_data_source.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/schemeshard__operation_drop_external_table.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/schemeshard__operation_drop_extsubdomain.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/schemeshard__operation_drop_fs.cpp diff --git a/ydb/core/tx/schemeshard/CMakeLists.linux.txt b/ydb/core/tx/schemeshard/CMakeLists.linux.txt index 49c3a0454e8..b24c64ba3a9 100644 --- a/ydb/core/tx/schemeshard/CMakeLists.linux.txt +++ b/ydb/core/tx/schemeshard/CMakeLists.linux.txt @@ -17,6 +17,8 @@ add_subdirectory(ut_cdc_stream_reboots) add_subdirectory(ut_compaction) add_subdirectory(ut_export) add_subdirectory(ut_export_reboots_s3) +add_subdirectory(ut_external_data_source) +add_subdirectory(ut_external_data_source_reboots) add_subdirectory(ut_external_table) add_subdirectory(ut_external_table_reboots) add_subdirectory(ut_extsubdomain) @@ -139,6 +141,7 @@ target_sources(core-tx-schemeshard PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/schemeshard__operation_copy_table.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/schemeshard__operation_create_backup.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/schemeshard__operation_create_bsv.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/schemeshard__operation_create_external_data_source.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/schemeshard__operation_create_external_table.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/schemeshard__operation_create_extsubdomain.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/schemeshard__operation_create_fs.cpp @@ -157,6 +160,7 @@ target_sources(core-tx-schemeshard PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/schemeshard__operation_create_subdomain.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/schemeshard__operation_create_table.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/schemeshard__operation_drop_bsv.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/schemeshard__operation_drop_external_data_source.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/schemeshard__operation_drop_external_table.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/schemeshard__operation_drop_extsubdomain.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/schemeshard__operation_drop_fs.cpp diff --git a/ydb/core/tx/schemeshard/schemeshard__init.cpp b/ydb/core/tx/schemeshard/schemeshard__init.cpp index 9a9fa7b8ee3..afc3caa417f 100644 --- a/ydb/core/tx/schemeshard/schemeshard__init.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__init.cpp @@ -1906,6 +1906,32 @@ struct TSchemeShard::TTxInit : public TTransactionBase<TSchemeShard> { } } + // Externel Data Source + { + auto rowset = db.Table<Schema::ExternalDataSource>().Range().Select(); + if (!rowset.IsReady()) + return false; + + while (!rowset.EndOfSet()) { + TOwnerId ownerPathId = rowset.GetValue<Schema::ExternalDataSource::OwnerPathId>(); + TLocalPathId localPathId = rowset.GetValue<Schema::ExternalDataSource::LocalPathId>(); + TPathId pathId(ownerPathId, localPathId); + + auto& externalDataSource = Self->ExternalDataSources[pathId] = new TExternalDataSourceInfo(); + externalDataSource->AlterVersion = rowset.GetValue<Schema::ExternalDataSource::AlterVersion>(); + externalDataSource->SourceType = rowset.GetValue<Schema::ExternalDataSource::SourceType>(); + externalDataSource->Location = rowset.GetValue<Schema::ExternalDataSource::Location>(); + externalDataSource->Installation = rowset.GetValue<Schema::ExternalDataSource::Installation>(); + Y_PROTOBUF_SUPPRESS_NODISCARD externalDataSource->Auth.ParseFromString(rowset.GetValue<Schema::ExternalDataSource::Auth>()); + Y_PROTOBUF_SUPPRESS_NODISCARD externalDataSource->ExternalTableReferences.ParseFromString(rowset.GetValue<Schema::ExternalDataSource::ExternalTableReferences>()); + + Self->IncrementPathDbRefCount(pathId); + + if (!rowset.Next()) + return false; + } + } + // Read table columns { TColumnRows columnRows; diff --git a/ydb/core/tx/schemeshard/schemeshard__operation.cpp b/ydb/core/tx/schemeshard/schemeshard__operation.cpp index a2e1d72ecb3..77349b8361a 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation.cpp @@ -769,6 +769,9 @@ TOperation::TSplitTransactionsResult TOperation::SplitIntoTransactions(const TTx case NKikimrSchemeOp::EOperationType::ESchemeOpCreateExternalTable: create.MutableCreateExternalTable()->SetName(name); break; + case NKikimrSchemeOp::EOperationType::ESchemeOpCreateExternalDataSource: + create.MutableCreateExternalDataSource()->SetName(name); + break; default: Y_UNREACHABLE(); } @@ -1019,6 +1022,12 @@ ISubOperation::TPtr TOperation::RestorePart(TTxState::ETxType txType, TTxState:: return CreateDropExternalTable(NextPartId(), txState); case TTxState::ETxType::TxAlterExternalTable: Y_FAIL("TODO: implement"); + case TTxState::ETxType::TxCreateExternalDataSource: + return CreateNewExternalDataSource(NextPartId(), txState); + case TTxState::ETxType::TxDropExternalDataSource: + return CreateDropExternalDataSource(NextPartId(), txState); + case TTxState::ETxType::TxAlterExternalDataSource: + Y_FAIL("TODO: implement"); case TTxState::ETxType::TxInvalid: Y_UNREACHABLE(); } @@ -1227,6 +1236,14 @@ ISubOperation::TPtr TOperation::ConstructPart(NKikimrSchemeOp::EOperationType op return CreateDropExternalTable(NextPartId(), tx); case NKikimrSchemeOp::EOperationType::ESchemeOpAlterExternalTable: Y_FAIL("TODO: implement"); + + // ExternalDataSource + case NKikimrSchemeOp::EOperationType::ESchemeOpCreateExternalDataSource: + return CreateNewExternalDataSource(NextPartId(), tx); + case NKikimrSchemeOp::EOperationType::ESchemeOpDropExternalDataSource: + return CreateDropExternalDataSource(NextPartId(), tx); + case NKikimrSchemeOp::EOperationType::ESchemeOpAlterExternalDataSource: + Y_FAIL("TODO: implement"); } Y_UNREACHABLE(); diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_create_external_data_source.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_create_external_data_source.cpp new file mode 100644 index 00000000000..c15a012a63a --- /dev/null +++ b/ydb/core/tx/schemeshard/schemeshard__operation_create_external_data_source.cpp @@ -0,0 +1,321 @@ +#include "schemeshard__operation_part.h" +#include "schemeshard__operation_common.h" +#include "schemeshard_impl.h" + +#include <ydb/core/base/subdomain.h> + +#define LOG_I(stream) LOG_INFO_S (context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "[" << context.SS->TabletID() << "] " << stream) +#define LOG_N(stream) LOG_NOTICE_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "[" << context.SS->TabletID() << "] " << stream) + +namespace { + +using namespace NKikimr; +using namespace NSchemeShard; + +constexpr uint32_t MAX_FIELD_SIZE = 1000; +constexpr uint32_t MAX_PROTOBUF_SIZE = 2 * 1024 * 1024; // 2 MiB + +bool ValidateSourceType(const TString& sourceType, TString& errStr) { + // Only object storage supported today + if (sourceType != "ObjectStorage") { + errStr = "Only ObjectStorage source type supported but got " + sourceType; + return false; + } + return true; +} + +bool ValidateLocationAndInstallation(const TString& location, const TString& installation, TString& errStr) { + if (!location && !installation) { + errStr = "Location or installation must not be empty"; + return false; + } + if (location.Size() > MAX_FIELD_SIZE) { + errStr = Sprintf("Maximum length of location must be less or equal equal to %u but got %lu", MAX_FIELD_SIZE, location.Size()); + return false; + } + if (installation.Size() > MAX_FIELD_SIZE) { + errStr = Sprintf("Maximum length of installation must be less or equal equal to %u but got %lu", MAX_FIELD_SIZE, installation.Size()); + return false; + } + return true; +} + +bool ValidateAuth(const NKikimrSchemeOp::TAuth& auth, TString& errStr) { + if (auth.ByteSizeLong() > MAX_PROTOBUF_SIZE) { + errStr = Sprintf("Maximum size of authorization information must be less or equal equal to %u but got %lu", MAX_PROTOBUF_SIZE, auth.ByteSizeLong()); + return false; + } + switch (auth.identity_case()) { + case NKikimrSchemeOp::TAuth::IDENTITY_NOT_SET: { + errStr = "Authorization method not specified"; + return false; + } + case NKikimrSchemeOp::TAuth::kNone: { + return true; + } + } + return false; +} + +bool Validate(const NKikimrSchemeOp::TExternalDataSourceDescription& desc, TString& errStr) { + return ValidateSourceType(desc.GetSourceType(), errStr) + && ValidateLocationAndInstallation(desc.GetLocation(), desc.GetInstallation(), errStr) + && ValidateAuth(desc.GetAuth(), errStr); +} + +TExternalDataSourceInfo::TPtr CreateExternalDataSource(const NKikimrSchemeOp::TExternalDataSourceDescription& desc, TString& errStr) { + if (!Validate(desc, errStr)) { + return nullptr; + } + TExternalDataSourceInfo::TPtr externalDataSoureInfo = new TExternalDataSourceInfo; + externalDataSoureInfo->SourceType = desc.GetSourceType(); + externalDataSoureInfo->Location = desc.GetLocation(); + externalDataSoureInfo->Installation = desc.GetInstallation(); + externalDataSoureInfo->AlterVersion = 1; + externalDataSoureInfo->Auth.CopyFrom(desc.GetAuth()); + return externalDataSoureInfo; +} + +class TPropose: public TSubOperationState { +private: + const TOperationId OperationId; + + TString DebugHint() const override { + return TStringBuilder() + << "TCreateExternalDataSource TPropose" + << ", operationId: " << OperationId; + } + +public: + explicit TPropose(TOperationId id) + : OperationId(id) + { + } + + bool HandleReply(TEvPrivate::TEvOperationPlan::TPtr& ev, TOperationContext& context) override { + const TStepId step = TStepId(ev->Get()->StepId); + + LOG_I(DebugHint() << "HandleReply TEvOperationPlan" + << ": step# " << step); + + TTxState* txState = context.SS->FindTx(OperationId); + Y_VERIFY(txState); + Y_VERIFY(txState->TxType == TTxState::TxCreateExternalDataSource); + + auto pathId = txState->TargetPathId; + auto path = TPath::Init(pathId, context.SS); + TPathElement::TPtr pathPtr = context.SS->PathsById.at(pathId); + + context.SS->TabletCounters->Simple()[COUNTER_EXTERNAL_DATA_SOURCE_COUNT].Add(1); + + NIceDb::TNiceDb db(context.GetDB()); + + path->StepCreated = step; + context.SS->PersistCreateStep(db, pathId, step); + + IncParentDirAlterVersionWithRepublish(OperationId, path, context); + + context.SS->ClearDescribePathCaches(pathPtr); + context.OnComplete.PublishToSchemeBoard(OperationId, pathId); + + context.SS->ChangeTxState(db, OperationId, TTxState::Done); + return true; + } + + bool ProgressState(TOperationContext& context) override { + LOG_I(DebugHint() << "ProgressState"); + + TTxState* txState = context.SS->FindTx(OperationId); + Y_VERIFY(txState); + Y_VERIFY(txState->TxType == TTxState::TxCreateExternalDataSource); + + context.OnComplete.ProposeToCoordinator(OperationId, txState->TargetPathId, TStepId(0)); + return false; + } +}; + + +class TCreateExternalDataSource: public TSubOperation { + static TTxState::ETxState NextState() { + return TTxState::Propose; + } + + TTxState::ETxState NextState(TTxState::ETxState state) const override { + switch (state) { + case TTxState::Waiting: + case TTxState::Propose: + return TTxState::Done; + default: + return TTxState::Invalid; + } + } + + TSubOperationState::TPtr SelectStateFunc(TTxState::ETxState state) override { + switch (state) { + case TTxState::Waiting: + case TTxState::Propose: + return MakeHolder<TPropose>(OperationId); + case TTxState::Done: + return MakeHolder<TDone>(OperationId); + default: + return nullptr; + } + } + +public: + using TSubOperation::TSubOperation; + + THolder<TProposeResponse> Propose(const TString& owner, TOperationContext& context) override { + const auto ssId = context.SS->SelfTabletId(); + + const auto acceptExisted = !Transaction.GetFailOnExist(); + const TString& parentPathStr = Transaction.GetWorkingDir(); + const auto& externalDataSoureDescription = Transaction.GetCreateExternalDataSource(); + const TString& name = externalDataSoureDescription.GetName(); + + + LOG_N("TCreateExternalDataSource Propose" + << ": opId# " << OperationId + << ", path# " << parentPathStr << "/" << name); + + auto result = MakeHolder<TProposeResponse>(NKikimrScheme::StatusAccepted, ui64(OperationId.GetTxId()), ui64(ssId)); + + TPath parentPath = TPath::Resolve(parentPathStr, context.SS); + { + auto checks = parentPath.Check(); + checks + .NotUnderDomainUpgrade() + .IsAtLocalSchemeShard() + .IsResolved() + .NotDeleted() + .NotUnderDeleting() + .IsCommonSensePath() + .IsLikeDirectory(); + + if (!checks) { + result->SetError(checks.GetStatus(), checks.GetError()); + return result; + } + } + + const TString acl = Transaction.GetModifyACL().GetDiffACL(); + + TPath dstPath = parentPath.Child(name); + { + auto checks = dstPath.Check(); + checks.IsAtLocalSchemeShard(); + if (dstPath.IsResolved()) { + checks + .IsResolved() + .NotUnderDeleting() + .FailOnExist(TPathElement::EPathType::EPathTypeExternalDataSource, acceptExisted); + } else { + checks + .NotEmpty() + .NotResolved(); + } + + if (checks) { + checks + .IsValidLeafName() + .DepthLimit() + .PathsLimit() + .DirChildrenLimit() + .IsValidACL(acl); + } + + if (!checks) { + result->SetError(checks.GetStatus(), checks.GetError()); + if (dstPath.IsResolved()) { + result->SetPathCreateTxId(ui64(dstPath.Base()->CreateTxId)); + result->SetPathId(dstPath.Base()->PathId.LocalPathId); + } + return result; + } + } + + TString errStr; + if (!context.SS->CheckApplyIf(Transaction, errStr)) { + result->SetError(NKikimrScheme::StatusPreconditionFailed, errStr); + return result; + } + + TExternalDataSourceInfo::TPtr externalDataSoureInfo = CreateExternalDataSource(externalDataSoureDescription, errStr); + if (!externalDataSoureInfo) { + result->SetError(NKikimrScheme::StatusSchemeError, errStr); + return result; + } + + dstPath.MaterializeLeaf(owner); + result->SetPathId(dstPath.Base()->PathId.LocalPathId); + + TPathElement::TPtr externalDataSoure = dstPath.Base(); + externalDataSoure->CreateTxId = OperationId.GetTxId(); + externalDataSoure->LastTxId = OperationId.GetTxId(); + externalDataSoure->PathState = TPathElement::EPathState::EPathStateCreate; + externalDataSoure->PathType = TPathElement::EPathType::EPathTypeExternalDataSource; + + TTxState& txState = context.SS->CreateTx(OperationId, TTxState::TxCreateExternalDataSource, externalDataSoure->PathId); + txState.Shards.clear(); + + NIceDb::TNiceDb db(context.GetDB()); + + if (parentPath.Base()->HasActiveChanges()) { + TTxId parentTxId = parentPath.Base()->PlannedToCreate() ? parentPath.Base()->CreateTxId : parentPath.Base()->LastTxId; + context.OnComplete.Dependence(parentTxId, OperationId.GetTxId()); + } + + context.SS->ChangeTxState(db, OperationId, TTxState::Propose); + context.OnComplete.ActivateTx(OperationId); + + context.SS->ExternalDataSources[externalDataSoure->PathId] = externalDataSoureInfo; + context.SS->TabletCounters->Simple()[COUNTER_EXTERNAL_DATA_SOURCE_COUNT].Add(1); + context.SS->IncrementPathDbRefCount(externalDataSoure->PathId); + + context.SS->PersistPath(db, externalDataSoure->PathId); + + if (!acl.empty()) { + externalDataSoure->ApplyACL(acl); + context.SS->PersistACL(db, externalDataSoure); + } + + context.SS->PersistExternalDataSource(db, externalDataSoure->PathId, externalDataSoureInfo); + context.SS->PersistTxState(db, OperationId); + + IncParentDirAlterVersionWithRepublishSafeWithUndo(OperationId, dstPath, context.SS, context.OnComplete); + + dstPath.DomainInfo()->IncPathsInside(); + parentPath.Base()->IncAliveChildren(); + + SetState(NextState()); + return result; + } + + void AbortPropose(TOperationContext& context) override { + LOG_N("TCreateExternalDataSource AbortPropose" + << ": opId# " << OperationId); + Y_FAIL("no AbortPropose for TCreateExternalDataSource"); + } + + void AbortUnsafe(TTxId forceDropTxId, TOperationContext& context) override { + LOG_N("TCreateExternalDataSource AbortUnsafe" + << ": opId# " << OperationId + << ", txId# " << forceDropTxId); + context.OnComplete.DoneOperation(OperationId); + } +}; + +} + +namespace NKikimr::NSchemeShard { + +ISubOperation::TPtr CreateNewExternalDataSource(TOperationId id, const TTxTransaction& tx) { + return MakeSubOperation<TCreateExternalDataSource>(id, tx); +} + +ISubOperation::TPtr CreateNewExternalDataSource(TOperationId id, TTxState::ETxState state) { + Y_VERIFY(state != TTxState::Invalid); + return MakeSubOperation<TCreateExternalDataSource>(id, state); +} + +} diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_create_external_table.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_create_external_table.cpp index 4529e8c41bf..cb1af0c455d 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_create_external_table.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_create_external_table.cpp @@ -13,6 +13,53 @@ namespace { using namespace NKikimr; using namespace NSchemeShard; +constexpr uint32_t MAX_FIELD_SIZE = 1000; +constexpr uint32_t MAX_PROTOBUF_SIZE = 2 * 1024 * 1024; // 2 MiB + +bool ValidateSourceType(const TString& sourceType, TString& errStr) { + // Only object storage supported today + if (sourceType != "ObjectStorage") { + errStr = "Only ObjectStorage source type supported but got " + sourceType; + return false; + } + return true; +} + +bool ValidateLocation(const TString& location, TString& errStr) { + if (!location) { + errStr = "Location must not be empty"; + return false; + } + if (location.Size() > MAX_FIELD_SIZE) { + errStr = Sprintf("Maximum length of location must be less or equal equal to %u but got %lu", MAX_FIELD_SIZE, location.Size()); + return false; + } + return true; +} + +bool ValidateContent(const TString& content, TString& errStr) { + if (content.Size() > MAX_PROTOBUF_SIZE) { + errStr = Sprintf("Maximum size of content must be less or equal equal to %u but got %lu", MAX_PROTOBUF_SIZE, content.Size()); + return false; + } + return true; +} + +bool ValidateDataSourcePath(const TString& dataSourcePath, TString& errStr) { + if (!dataSourcePath) { + errStr = "Data source path must not be empty"; + return false; + } + return true; +} + +bool Validate(const TString& sourceType, const NKikimrSchemeOp::TExternalTableDescription& desc, TString& errStr) { + return ValidateSourceType(sourceType, errStr) + && ValidateLocation(desc.GetLocation(), errStr) + && ValidateContent(desc.GetContent(), errStr) + && ValidateDataSourcePath(desc.GetDataSourcePath(), errStr); +} + bool IsAllowedType(ui32 typeId) { if (!NScheme::NTypeIds::IsYqlType(typeId)) { return false; @@ -30,7 +77,16 @@ bool IsAllowedType(ui32 typeId) { return true; } -TExternalTableInfo::TPtr CreateExternalTable(const NKikimrSchemeOp::TExternalTableDescription& desc, TString& errStr) { +TExternalTableInfo::TPtr CreateExternalTable(const TString& sourceType, const NKikimrSchemeOp::TExternalTableDescription& desc, TString& errStr) { + if (!Validate(sourceType, desc, errStr)) { + return nullptr; + } + + if (!desc.ColumnsSize()) { + errStr = "The schema must have at least one column"; + return nullptr; + } + TExternalTableInfo::TPtr externalTableInfo = new TExternalTableInfo; const NScheme::TTypeRegistry* typeRegistry = AppData()->TypeRegistry; @@ -38,6 +94,7 @@ TExternalTableInfo::TPtr CreateExternalTable(const NKikimrSchemeOp::TExternalTab externalTableInfo->Location = desc.GetLocation(); externalTableInfo->AlterVersion = 1; externalTableInfo->Content = desc.GetContent(); + externalTableInfo->SourceType = sourceType; uint64_t nextColumnId = 1; for (const auto& col : desc.GetColumns()) { @@ -102,7 +159,7 @@ TExternalTableInfo::TPtr CreateExternalTable(const NKikimrSchemeOp::TExternalTab class TPropose: public TSubOperationState { private: - TOperationId OperationId; + const TOperationId OperationId; TString DebugHint() const override { return TStringBuilder() @@ -111,7 +168,7 @@ private: } public: - TPropose(TOperationId id) + explicit TPropose(TOperationId id) : OperationId(id) { } @@ -119,7 +176,7 @@ public: bool HandleReply(TEvPrivate::TEvOperationPlan::TPtr& ev, TOperationContext& context) override { const TStepId step = TStepId(ev->Get()->StepId); - LOG_I(DebugHint() << "HandleReply TEvOperationPlan" + LOG_I(DebugHint() << " HandleReply TEvOperationPlan" << ": step# " << step); TTxState* txState = context.SS->FindTx(OperationId); @@ -127,8 +184,10 @@ public: Y_VERIFY(txState->TxType == TTxState::TxCreateExternalTable); auto pathId = txState->TargetPathId; + auto dataSourcePathId = txState->SourcePathId; auto path = TPath::Init(pathId, context.SS); TPathElement::TPtr pathPtr = context.SS->PathsById.at(pathId); + TPathElement::TPtr dataSourcePathPtr = context.SS->PathsById.at(dataSourcePathId); context.SS->TabletCounters->Simple()[COUNTER_EXTERNAL_TABLE_COUNT].Add(1); @@ -140,14 +199,16 @@ public: IncParentDirAlterVersionWithRepublish(OperationId, path, context); context.SS->ClearDescribePathCaches(pathPtr); + context.SS->ClearDescribePathCaches(dataSourcePathPtr); context.OnComplete.PublishToSchemeBoard(OperationId, pathId); + context.OnComplete.PublishToSchemeBoard(OperationId, dataSourcePathId); context.SS->ChangeTxState(db, OperationId, TTxState::Done); return true; } bool ProgressState(TOperationContext& context) override { - LOG_I(DebugHint() << "ProgressState"); + LOG_I(DebugHint() << " ProgressState"); TTxState* txState = context.SS->FindTx(OperationId); Y_VERIFY(txState); @@ -197,16 +258,15 @@ public: const auto& externalTableDescription = Transaction.GetCreateExternalTable(); const TString& name = externalTableDescription.GetName(); - LOG_N("TCreateExternalTable Propose" << ": opId# " << OperationId << ", path# " << parentPathStr << "/" << name); auto result = MakeHolder<TProposeResponse>(NKikimrScheme::StatusAccepted, ui64(OperationId.GetTxId()), ui64(ssId)); - NSchemeShard::TPath parentPath = NSchemeShard::TPath::Resolve(parentPathStr, context.SS); + TPath parentPath = TPath::Resolve(parentPathStr, context.SS); { - NSchemeShard::TPath::TChecker checks = parentPath.Check(); + auto checks = parentPath.Check(); checks .NotUnderDomainUpgrade() .IsAtLocalSchemeShard() @@ -224,9 +284,9 @@ public: const TString acl = Transaction.GetModifyACL().GetDiffACL(); - NSchemeShard::TPath dstPath = parentPath.Child(name); + TPath dstPath = parentPath.Child(name); { - NSchemeShard::TPath::TChecker checks = dstPath.Check(); + auto checks = dstPath.Check(); checks.IsAtLocalSchemeShard(); if (dstPath.IsResolved()) { checks @@ -258,13 +318,37 @@ public: } } + TPath dataSourcePath = TPath::Resolve(externalTableDescription.GetDataSourcePath(), context.SS); + { + auto checks = dataSourcePath.Check(); + checks + .NotUnderDomainUpgrade() + .IsAtLocalSchemeShard() + .IsResolved() + .NotDeleted() + .NotUnderDeleting() + .IsCommonSensePath() + .IsExternalDataSource() + .NotUnderOperation(); + + if (!checks) { + result->SetError(checks.GetStatus(), checks.GetError()); + return result; + } + } + + TExternalDataSourceInfo::TPtr externalDataSource = context.SS->ExternalDataSources.Value(dataSourcePath->PathId, nullptr); + if (!externalDataSource) { + result->SetError(NKikimrScheme::StatusSchemeError, "Data source doesn't exist"); + return result; + } TString errStr; if (!context.SS->CheckApplyIf(Transaction, errStr)) { result->SetError(NKikimrScheme::StatusPreconditionFailed, errStr); return result; } - TExternalTableInfo::TPtr externalTableInfo = CreateExternalTable(externalTableDescription, errStr); + TExternalTableInfo::TPtr externalTableInfo = CreateExternalTable(externalDataSource->SourceType, externalTableDescription, errStr); if (!externalTableInfo) { result->SetError(NKikimrScheme::StatusSchemeError, errStr); return result; @@ -279,7 +363,7 @@ public: externalTable->PathState = TPathElement::EPathState::EPathStateCreate; externalTable->PathType = TPathElement::EPathType::EPathTypeExternalTable; - TTxState& txState = context.SS->CreateTx(OperationId, TTxState::TxCreateExternalTable, externalTable->PathId); + TTxState& txState = context.SS->CreateTx(OperationId, TTxState::TxCreateExternalTable, externalTable->PathId, dataSourcePath->PathId); txState.Shards.clear(); NIceDb::TNiceDb db(context.GetDB()); @@ -292,6 +376,9 @@ public: context.SS->ChangeTxState(db, OperationId, TTxState::Propose); context.OnComplete.ActivateTx(OperationId); + auto& reference = *externalDataSource->ExternalTableReferences.AddReferences(); + reference.SetPath(externalTableInfo->DataSourcePath); + PathIdFromPathId(externalTable->PathId, reference.MutablePathId()); context.SS->ExternalTables[externalTable->PathId] = externalTableInfo; context.SS->IncrementPathDbRefCount(externalTable->PathId); @@ -302,6 +389,7 @@ public: context.SS->PersistACL(db, externalTable); } + context.SS->PersistExternalDataSource(db, dataSourcePath->PathId, externalDataSource); context.SS->PersistExternalTable(db, externalTable->PathId, externalTableInfo); context.SS->PersistTxState(db, OperationId); diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_drop_external_data_source.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_drop_external_data_source.cpp new file mode 100644 index 00000000000..908e62f359d --- /dev/null +++ b/ydb/core/tx/schemeshard/schemeshard__operation_drop_external_data_source.cpp @@ -0,0 +1,218 @@ +#include "schemeshard__operation_common.h" +#include "schemeshard__operation_part.h" +#include "schemeshard_impl.h" + +#define LOG_I(stream) LOG_INFO_S (context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "[" << context.SS->TabletID() << "] " << stream) +#define LOG_N(stream) LOG_NOTICE_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "[" << context.SS->TabletID() << "] " << stream) + +namespace { + +using namespace NKikimr; +using namespace NSchemeShard; + +class TPropose: public TSubOperationState { + TString DebugHint() const override { + return TStringBuilder() + << "TDropExternalDataSource TPropose" + << " opId# " << OperationId << " "; + } + +public: + explicit TPropose(TOperationId id) + : OperationId(id) + { } + + bool ProgressState(TOperationContext& context) override { + LOG_I(DebugHint() << "ProgressState"); + + const auto* txState = context.SS->FindTx(OperationId); + Y_VERIFY(txState); + Y_VERIFY(txState->TxType == TTxState::TxDropExternalDataSource); + + context.OnComplete.ProposeToCoordinator(OperationId, txState->TargetPathId, TStepId(0)); + return false; + } + + bool HandleReply(TEvPrivate::TEvOperationPlan::TPtr& ev, TOperationContext& context) override { + const auto step = TStepId(ev->Get()->StepId); + + LOG_I(DebugHint() << "HandleReply TEvOperationPlan" + << ": step# " << step); + + NIceDb::TNiceDb db(context.GetDB()); + + TTxState* txState = context.SS->FindTx(OperationId); + Y_VERIFY(txState); + TPathId pathId = txState->TargetPathId; + auto path = context.SS->PathsById.at(pathId); + auto parentDir = context.SS->PathsById.at(path->ParentPathId); + + Y_VERIFY(!path->Dropped()); + path->SetDropped(step, OperationId.GetTxId()); + context.SS->PersistDropStep(db, pathId, step, OperationId); + auto domainInfo = context.SS->ResolveDomainInfo(pathId); + domainInfo->DecPathsInside(); + parentDir->DecAliveChildren(); + + context.SS->TabletCounters->Simple()[COUNTER_EXTERNAL_DATA_SOURCE_COUNT].Sub(1); + context.SS->PersistRemoveExternalDataSource(db, pathId); + + ++parentDir->DirAlterVersion; + context.SS->PersistPathDirAlterVersion(db, parentDir); + context.SS->ClearDescribePathCaches(parentDir); + context.SS->ClearDescribePathCaches(path); + + if (!context.SS->DisablePublicationsOfDropping) { + context.OnComplete.PublishToSchemeBoard(OperationId, parentDir->PathId); + context.OnComplete.PublishToSchemeBoard(OperationId, pathId); + } + + context.SS->ChangeTxState(db, OperationId, TTxState::Done); + + return true; + } + +private: + const TOperationId OperationId; +}; // TPropose + +class TDropExternalDataSource: public TSubOperation { + TTxState::ETxState NextState() const { + return TTxState::Propose; + } + + TTxState::ETxState NextState(TTxState::ETxState state) const override { + switch (state) { + case TTxState::Propose: + return TTxState::Done; + default: + return TTxState::Invalid; + } + } + + TSubOperationState::TPtr SelectStateFunc(TTxState::ETxState state) override { + switch (state) { + case TTxState::Propose: + return MakeHolder<TPropose>(OperationId); + case TTxState::Done: + return MakeHolder<TDone>(OperationId); + default: + return nullptr; + } + } + +public: + using TSubOperation::TSubOperation; + + THolder<TProposeResponse> Propose(const TString&, TOperationContext& context) override { + const ui64 ssId = context.SS->TabletID(); + const auto& drop = Transaction.GetDrop(); + + const TString& workingDir = Transaction.GetWorkingDir(); + const TString& name = drop.GetName(); + + LOG_N("TDropExternalDataSource Propose" + << ": opId# " << OperationId + << ", path# " << workingDir << "/" << name); + + auto result = MakeHolder<TProposeResponse>(NKikimrScheme::StatusAccepted, ui64(OperationId.GetTxId()), ssId); + + TPath path = drop.HasId() + ? TPath::Init(context.SS->MakeLocalId(drop.GetId()), context.SS) + : TPath::Resolve(workingDir, context.SS).Dive(name); + { + auto checks = path.Check(); + checks + .NotEmpty() + .NotUnderDomainUpgrade() + .IsAtLocalSchemeShard() + .IsResolved() + .NotDeleted() + .NotUnderDeleting() + .IsExternalDataSource() + .NotUnderOperation() + .IsCommonSensePath(); + + if (!checks) { + result->SetError(checks.GetStatus(), checks.GetError()); + if (path.IsResolved() && path.Base()->IsExternalDataSource() && (path.Base()->PlannedToDrop() || path.Base()->Dropped())) { + result->SetPathDropTxId(ui64(path.Base()->DropTxId)); + result->SetPathId(path.Base()->PathId.LocalPathId); + } + return result; + } + } + + TExternalDataSourceInfo::TPtr externalDataSource = context.SS->ExternalDataSources.Value(path->PathId, nullptr); + if (!externalDataSource) { + result->SetError(NKikimrScheme::StatusSchemeError, "Data source doesn't exist"); + return result; + } + if (externalDataSource->ExternalTableReferences.ReferencesSize()) { + result->SetError(NKikimrScheme::StatusSchemeError, "Other entities depend on this data source, please remove them at the beginning: " + externalDataSource->ExternalTableReferences.GetReferences(0).GetPath()); + return result; + } + + TString errStr; + if (!context.SS->CheckApplyIf(Transaction, errStr)) { + result->SetError(NKikimrScheme::StatusPreconditionFailed, errStr); + return result; + } + + const auto pathId = path.Base()->PathId; + result->SetPathId(pathId.LocalPathId); + + auto guard = context.DbGuard(); + context.MemChanges.GrabNewTxState(context.SS, OperationId); + context.MemChanges.GrabPath(context.SS, pathId); + context.MemChanges.GrabPath(context.SS, path->ParentPathId); + context.MemChanges.GrabExternalDataSource(context.SS, pathId); + + context.DbChanges.PersistTxState(OperationId); + context.DbChanges.PersistPath(pathId); + context.DbChanges.PersistPath(path->ParentPathId); + + Y_VERIFY(!context.SS->FindTx(OperationId)); + TTxState& txState = context.SS->CreateTx(OperationId, TTxState::TxDropExternalDataSource, path.Base()->PathId); + txState.State = TTxState::Propose; + txState.MinStep = TStepId(1); + + path.Base()->PathState = TPathElement::EPathState::EPathStateDrop; + path.Base()->DropTxId = OperationId.GetTxId(); + path.Base()->LastTxId = OperationId.GetTxId(); + + IncParentDirAlterVersionWithRepublishSafeWithUndo(OperationId, path, context.SS, context.OnComplete); + + context.OnComplete.ActivateTx(OperationId); + + SetState(NextState()); + return result; + } + + void AbortPropose(TOperationContext& context) override { + LOG_N("TDropExternalDataSource AbortPropose" + << ": opId# " << OperationId); + } + + void AbortUnsafe(TTxId forceDropTxId, TOperationContext& context) override { + LOG_N("TDropExternalDataSource AbortUnsafe" + << ": opId# " << OperationId + << ", txId# " << forceDropTxId); + context.OnComplete.DoneOperation(OperationId); + } +}; + +} + +namespace NKikimr::NSchemeShard { + +ISubOperation::TPtr CreateDropExternalDataSource(TOperationId id, const TTxTransaction& tx) { + return MakeSubOperation<TDropExternalDataSource>(id, tx); +} + +ISubOperation::TPtr CreateDropExternalDataSource(TOperationId id, TTxState::ETxState state) { + Y_VERIFY(state != TTxState::Invalid); + return MakeSubOperation<TDropExternalDataSource>(id, state); +} + +} diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_drop_external_table.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_drop_external_table.cpp index 889236ec350..3d3c5e316bf 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_drop_external_table.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_drop_external_table.cpp @@ -44,7 +44,9 @@ public: TTxState* txState = context.SS->FindTx(OperationId); Y_VERIFY(txState); TPathId pathId = txState->TargetPathId; + TPathId dataSourcePathId = txState->SourcePathId; auto path = context.SS->PathsById.at(pathId); + auto dataSourcePath = context.SS->PathsById.at(dataSourcePathId); auto parentDir = context.SS->PathsById.at(path->ParentPathId); Y_VERIFY(!path->Dropped()); @@ -54,17 +56,24 @@ public: domainInfo->DecPathsInside(); parentDir->DecAliveChildren(); + TExternalDataSourceInfo::TPtr externalDataSourceInfo = context.SS->ExternalDataSources.Value(dataSourcePathId, nullptr); + Y_VERIFY(externalDataSourceInfo); + EraseIf(*externalDataSourceInfo->ExternalTableReferences.MutableReferences(), [pathId](const NKikimrSchemeOp::TExternalTableReferences::TReference& reference) { return PathIdFromPathId(reference.GetPathId()) == pathId; }); + context.SS->TabletCounters->Simple()[COUNTER_EXTERNAL_TABLE_COUNT].Sub(1); + context.SS->PersistExternalDataSource(db, dataSourcePathId, externalDataSourceInfo); context.SS->PersistRemoveExternalTable(db, pathId); ++parentDir->DirAlterVersion; context.SS->PersistPathDirAlterVersion(db, parentDir); context.SS->ClearDescribePathCaches(parentDir); context.SS->ClearDescribePathCaches(path); + context.SS->ClearDescribePathCaches(dataSourcePath); if (!context.SS->DisablePublicationsOfDropping) { context.OnComplete.PublishToSchemeBoard(OperationId, parentDir->PathId); context.OnComplete.PublishToSchemeBoard(OperationId, pathId); + context.OnComplete.PublishToSchemeBoard(OperationId, dataSourcePathId); } context.SS->ChangeTxState(db, OperationId, TTxState::Done); @@ -74,7 +83,6 @@ public: private: const TOperationId OperationId; - }; // TPropose class TDropExternalTable: public TSubOperation { @@ -122,7 +130,7 @@ public: ? TPath::Init(context.SS->MakeLocalId(drop.GetId()), context.SS) : TPath::Resolve(workingDir, context.SS).Dive(name); { - TPath::TChecker checks = path.Check(); + auto checks = path.Check(); checks .NotEmpty() .NotUnderDomainUpgrade() @@ -153,18 +161,44 @@ public: const auto pathId = path.Base()->PathId; result->SetPathId(pathId.LocalPathId); + TExternalTableInfo::TPtr externalTableInfo = context.SS->ExternalTables.Value(pathId, nullptr); + if (!externalTableInfo) { + result->SetError(NKikimrScheme::StatusSchemeError, "External table info doesn't exist"); + return result; + } + TPath dataSourcePath = TPath::Resolve(externalTableInfo->DataSourcePath, context.SS); + { + auto checks = dataSourcePath.Check(); + checks + .NotUnderDomainUpgrade() + .IsAtLocalSchemeShard() + .IsResolved() + .NotDeleted() + .NotUnderDeleting() + .IsCommonSensePath() + .IsExternalDataSource(); + + if (!checks) { + result->SetError(checks.GetStatus(), checks.GetError()); + return result; + } + } + auto guard = context.DbGuard(); context.MemChanges.GrabNewTxState(context.SS, OperationId); context.MemChanges.GrabPath(context.SS, pathId); context.MemChanges.GrabPath(context.SS, path->ParentPathId); + context.MemChanges.GrabPath(context.SS, dataSourcePath->PathId); context.MemChanges.GrabExternalTable(context.SS, pathId); + context.MemChanges.GrabExternalDataSource(context.SS, dataSourcePath->PathId); context.DbChanges.PersistTxState(OperationId); context.DbChanges.PersistPath(pathId); context.DbChanges.PersistPath(path->ParentPathId); + context.DbChanges.PersistPath(dataSourcePath->PathId); Y_VERIFY(!context.SS->FindTx(OperationId)); - TTxState& txState = context.SS->CreateTx(OperationId, TTxState::TxDropExternalTable, path.Base()->PathId); + TTxState& txState = context.SS->CreateTx(OperationId, TTxState::TxDropExternalTable, path.Base()->PathId, dataSourcePath->PathId); txState.State = TTxState::Propose; txState.MinStep = TStepId(1); diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_memory_changes.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_memory_changes.cpp index da529e7c359..e2dac02352e 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_memory_changes.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_memory_changes.cpp @@ -82,14 +82,14 @@ void TMemoryChanges::GrabLongLock(TSchemeShard* ss, const TPathId& pathId, TTxId LockedPaths.emplace(pathId, lockTxId); // will be restored on UnDo() } -void TMemoryChanges::GrabNewExternalTable(TSchemeShard* ss, const TPathId& pathId) { - GrabNew(pathId, ss->ExternalTables, ExternalTables); -} - void TMemoryChanges::GrabExternalTable(TSchemeShard* ss, const TPathId& pathId) { Grab<TExternalTableInfo>(pathId, ss->ExternalTables, ExternalTables); } +void TMemoryChanges::GrabExternalDataSource(TSchemeShard* ss, const TPathId& pathId) { + Grab<TExternalDataSourceInfo>(pathId, ss->ExternalDataSources, ExternalDataSources); +} + void TMemoryChanges::UnDo(TSchemeShard* ss) { // be aware of the order of grab & undo ops // stack is the best way to manage it right @@ -199,6 +199,16 @@ void TMemoryChanges::UnDo(TSchemeShard* ss) { } ExternalTables.pop(); } + + while (ExternalDataSources) { + const auto& [id, elem] = ExternalDataSources.top(); + if (elem) { + ss->ExternalDataSources[id] = elem; + } else { + ss->ExternalDataSources.erase(id); + } + ExternalDataSources.pop(); + } } } diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_memory_changes.h b/ydb/core/tx/schemeshard/schemeshard__operation_memory_changes.h index 6c9032b12b3..6793b470820 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_memory_changes.h +++ b/ydb/core/tx/schemeshard/schemeshard__operation_memory_changes.h @@ -42,6 +42,9 @@ class TMemoryChanges: public TSimpleRefCount<TMemoryChanges> { using TExternalTableState = std::pair<TPathId, TExternalTableInfo::TPtr>; TStack<TExternalTableState> ExternalTables; + using TExternalDataSourceState = std::pair<TPathId, TExternalDataSourceInfo::TPtr>; + TStack<TExternalDataSourceState> ExternalDataSources; + public: ~TMemoryChanges() = default; @@ -69,9 +72,10 @@ public: void GrabNewLongLock(TSchemeShard* ss, const TPathId& pathId); void GrabLongLock(TSchemeShard* ss, const TPathId& pathId, TTxId lockTxId); - void GrabNewExternalTable(TSchemeShard* ss, const TPathId& pathId); void GrabExternalTable(TSchemeShard* ss, const TPathId& pathId); + void GrabExternalDataSource(TSchemeShard* ss, const TPathId& pathId); + void UnDo(TSchemeShard* ss); }; diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_part.h b/ydb/core/tx/schemeshard/schemeshard__operation_part.h index 60c503293d9..60c7e295cb9 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_part.h +++ b/ydb/core/tx/schemeshard/schemeshard__operation_part.h @@ -371,6 +371,14 @@ ISubOperation::TPtr CreateNewExternalTable(TOperationId id, TTxState::ETxState s ISubOperation::TPtr CreateDropExternalTable(TOperationId id, const TTxTransaction& tx); ISubOperation::TPtr CreateDropExternalTable(TOperationId id, TTxState::ETxState state); +// External Data Source +// Create +ISubOperation::TPtr CreateNewExternalDataSource(TOperationId id, const TTxTransaction& tx); +ISubOperation::TPtr CreateNewExternalDataSource(TOperationId id, TTxState::ETxState state); +// Drop +ISubOperation::TPtr CreateDropExternalDataSource(TOperationId id, const TTxTransaction& tx); +ISubOperation::TPtr CreateDropExternalDataSource(TOperationId id, TTxState::ETxState state); + /// CDC // Create TVector<ISubOperation::TPtr> CreateNewCdcStream(TOperationId id, const TTxTransaction& tx, TOperationContext& context); diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_upgrade_subdomain.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_upgrade_subdomain.cpp index c380d6ceaea..8057e55018c 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_upgrade_subdomain.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_upgrade_subdomain.cpp @@ -304,6 +304,7 @@ public: switch (path.Base()->PathType) { case NKikimrSchemeOp::EPathType::EPathTypeDir: case NKikimrSchemeOp::EPathType::EPathTypeExternalTable: + case NKikimrSchemeOp::EPathType::EPathTypeExternalDataSource: Y_VERIFY(!path.Base()->IsRoot()); //no shards break; diff --git a/ydb/core/tx/schemeshard/schemeshard_audit_log_fragment.cpp b/ydb/core/tx/schemeshard/schemeshard_audit_log_fragment.cpp index a65626eef27..6044c00fa81 100644 --- a/ydb/core/tx/schemeshard/schemeshard_audit_log_fragment.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_audit_log_fragment.cpp @@ -192,6 +192,12 @@ TString DefineUserOperationName(NKikimrSchemeOp::EOperationType type) { return "DROP EXTERNAL TABLE"; case NKikimrSchemeOp::EOperationType::ESchemeOpAlterExternalTable: return "ALTER EXTERNAL TABLE"; + case NKikimrSchemeOp::EOperationType::ESchemeOpCreateExternalDataSource: + return "CREATE EXTERNAL DATA SOURCE"; + case NKikimrSchemeOp::EOperationType::ESchemeOpDropExternalDataSource: + return "DROP EXTERNAL DATA SOURCE"; + case NKikimrSchemeOp::EOperationType::ESchemeOpAlterExternalDataSource: + return "ALTER EXTERNAL DATA SOURCE"; } Y_FAIL("switch should cover all operation types"); } @@ -460,6 +466,15 @@ TVector<TString> ExtractChangingPaths(const NKikimrSchemeOp::TModifyScheme& tx) case NKikimrSchemeOp::EOperationType::ESchemeOpAlterExternalTable: // TODO: unimplemented break; + case NKikimrSchemeOp::EOperationType::ESchemeOpCreateExternalDataSource: + result.emplace_back(NKikimr::JoinPath({tx.GetWorkingDir(), tx.GetCreateExternalDataSource().GetName()})); + break; + case NKikimrSchemeOp::EOperationType::ESchemeOpDropExternalDataSource: + result.emplace_back(NKikimr::JoinPath({tx.GetWorkingDir(), tx.GetDrop().GetName()})); + break; + case NKikimrSchemeOp::EOperationType::ESchemeOpAlterExternalDataSource: + // TODO: unimplemented + break; } return result; diff --git a/ydb/core/tx/schemeshard/schemeshard_impl.cpp b/ydb/core/tx/schemeshard/schemeshard_impl.cpp index 7d02c3cdd4f..8cd87155d11 100644 --- a/ydb/core/tx/schemeshard/schemeshard_impl.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_impl.cpp @@ -1344,6 +1344,7 @@ TPathElement::EPathState TSchemeShard::CalcPathState(TTxState::ETxType txType, T case TTxState::TxCreateReplication: case TTxState::TxCreateBlobDepot: case TTxState::TxCreateExternalTable: + case TTxState::TxCreateExternalDataSource: return TPathElement::EPathState::EPathStateCreate; case TTxState::TxAlterPQGroup: case TTxState::TxAlterTable: @@ -1375,6 +1376,7 @@ TPathElement::EPathState TSchemeShard::CalcPathState(TTxState::ETxType txType, T case TTxState::TxAlterBlobDepot: case TTxState::TxUpdateMainTableOnIndexMove: case TTxState::TxAlterExternalTable: + case TTxState::TxAlterExternalDataSource: return TPathElement::EPathState::EPathStateAlter; case TTxState::TxDropTable: case TTxState::TxDropPQGroup: @@ -1394,6 +1396,7 @@ TPathElement::EPathState TSchemeShard::CalcPathState(TTxState::ETxType txType, T case TTxState::TxDropReplication: case TTxState::TxDropBlobDepot: case TTxState::TxDropExternalTable: + case TTxState::TxDropExternalDataSource: return TPathElement::EPathState::EPathStateDrop; case TTxState::TxBackup: return TPathElement::EPathState::EPathStateBackup; @@ -2611,6 +2614,30 @@ void TSchemeShard::PersistRemoveExternalTable(NIceDb::TNiceDb& db, TPathId pathI db.Table<Schema::ExternalTable>().Key(pathId.OwnerId, pathId.LocalPathId).Delete(); } +void TSchemeShard::PersistExternalDataSource(NIceDb::TNiceDb &db, TPathId pathId, const TExternalDataSourceInfo::TPtr externalDataSourceInfo) { + Y_VERIFY(IsLocalId(pathId)); + + db.Table<Schema::ExternalDataSource>().Key(pathId.OwnerId, pathId.LocalPathId).Update( + NIceDb::TUpdate<Schema::ExternalDataSource::AlterVersion>{externalDataSourceInfo->AlterVersion}, + NIceDb::TUpdate<Schema::ExternalDataSource::SourceType>{externalDataSourceInfo->SourceType}, + NIceDb::TUpdate<Schema::ExternalDataSource::Location>{externalDataSourceInfo->Location}, + NIceDb::TUpdate<Schema::ExternalDataSource::Installation>{externalDataSourceInfo->Installation}, + NIceDb::TUpdate<Schema::ExternalDataSource::Auth>{externalDataSourceInfo->Auth.SerializeAsString()}, + NIceDb::TUpdate<Schema::ExternalDataSource::ExternalTableReferences>{externalDataSourceInfo->ExternalTableReferences.SerializeAsString()} + ); +} + +void TSchemeShard::PersistRemoveExternalDataSource(NIceDb::TNiceDb& db, TPathId pathId) +{ + Y_VERIFY(IsLocalId(pathId)); + if (ExternalDataSources.contains(pathId)) { + ExternalDataSources.erase(pathId); + DecrementPathDbRefCount(pathId); + } + + db.Table<Schema::ExternalDataSource>().Key(pathId.OwnerId, pathId.LocalPathId).Delete(); +} + void TSchemeShard::PersistRemoveRtmrVolume(NIceDb::TNiceDb &db, TPathId pathId) { Y_VERIFY(IsLocalId(pathId)); @@ -3849,6 +3876,13 @@ NKikimrSchemeOp::TPathVersion TSchemeShard::GetPathVersion(const TPath& path) co generalVersion += result.GetExternalTableVersion(); break; } + case NKikimrSchemeOp::EPathType::EPathTypeExternalDataSource: { + auto it = ExternalDataSources.find(pathId); + Y_VERIFY(it != ExternalDataSources.end()); + result.SetExternalDataSourceVersion(it->second->AlterVersion); + generalVersion += result.GetExternalDataSourceVersion(); + break; + } case NKikimrSchemeOp::EPathType::EPathTypeInvalid: { Y_UNREACHABLE(); @@ -4600,6 +4634,9 @@ void TSchemeShard::UncountNode(TPathElement::TPtr node) { case TPathElement::EPathType::EPathTypeExternalTable: TabletCounters->Simple()[COUNTER_EXTERNAL_TABLE_COUNT].Sub(1); break; + case TPathElement::EPathType::EPathTypeExternalDataSource: + TabletCounters->Simple()[COUNTER_EXTERNAL_DATA_SOURCE_COUNT].Sub(1); + break; case TPathElement::EPathType::EPathTypeInvalid: Y_FAIL("impossible path type"); } diff --git a/ydb/core/tx/schemeshard/schemeshard_impl.h b/ydb/core/tx/schemeshard/schemeshard_impl.h index a1d9d959aff..757d7018df6 100644 --- a/ydb/core/tx/schemeshard/schemeshard_impl.h +++ b/ydb/core/tx/schemeshard/schemeshard_impl.h @@ -210,6 +210,7 @@ public: THashMap<TPathId, TKesusInfo::TPtr> KesusInfos; THashMap<TPathId, TOlapStoreInfo::TPtr> OlapStores; THashMap<TPathId, TExternalTableInfo::TPtr> ExternalTables; + THashMap<TPathId, TExternalDataSourceInfo::TPtr> ExternalDataSources; TTablesStorage ColumnTables; @@ -745,6 +746,10 @@ public: void PersistExternalTable(NIceDb::TNiceDb &db, TPathId pathId, const TExternalTableInfo::TPtr externalTable); void PersistRemoveExternalTable(NIceDb::TNiceDb& db, TPathId pathId); + // ExternalDataSource + void PersistExternalDataSource(NIceDb::TNiceDb &db, TPathId pathId, const TExternalDataSourceInfo::TPtr externalDataSource); + void PersistRemoveExternalDataSource(NIceDb::TNiceDb& db, TPathId pathId); + TTabletId GetGlobalHive(const TActorContext& ctx) const; enum class EHiveSelection : uint8_t { diff --git a/ydb/core/tx/schemeshard/schemeshard_info_types.h b/ydb/core/tx/schemeshard/schemeshard_info_types.h index 30daf3e98f6..023610be4eb 100644 --- a/ydb/core/tx/schemeshard/schemeshard_info_types.h +++ b/ydb/core/tx/schemeshard/schemeshard_info_types.h @@ -2900,6 +2900,17 @@ struct TExternalTableInfo: TSimpleRefCount<TExternalTableInfo> { TString Content; }; +struct TExternalDataSourceInfo: TSimpleRefCount<TExternalDataSourceInfo> { + using TPtr = TIntrusivePtr<TExternalDataSourceInfo>; + + ui64 AlterVersion = 0; + TString SourceType; + TString Location; + TString Installation; + NKikimrSchemeOp::TAuth Auth; + NKikimrSchemeOp::TExternalTableReferences ExternalTableReferences; +}; + bool ValidateTtlSettings(const NKikimrSchemeOp::TTTLSettings& ttl, const THashMap<ui32, TTableInfo::TColumn>& sourceColumns, const THashMap<ui32, TTableInfo::TColumn>& alterColumns, diff --git a/ydb/core/tx/schemeshard/schemeshard_path.cpp b/ydb/core/tx/schemeshard/schemeshard_path.cpp index c68fbbec347..28beabc6d32 100644 --- a/ydb/core/tx/schemeshard/schemeshard_path.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_path.cpp @@ -755,6 +755,19 @@ const TPath::TChecker& TPath::TChecker::IsExternalTable(EStatus status) const { << " (" << BasicPathInfo(Path.Base()) << ")"); } +const TPath::TChecker& TPath::TChecker::IsExternalDataSource(EStatus status) const { + if (Failed) { + return *this; + } + + if (Path.Base()->IsExternalDataSource()) { + return *this; + } + + return Fail(status, TStringBuilder() << "path is not a external data source" + << " (" << BasicPathInfo(Path.Base()) << ")"); +} + const TPath::TChecker& TPath::TChecker::PathShardsLimit(ui64 delta, EStatus status) const { if (Failed) { return *this; diff --git a/ydb/core/tx/schemeshard/schemeshard_path.h b/ydb/core/tx/schemeshard/schemeshard_path.h index 172dbcc3af6..88dcd3c5bc5 100644 --- a/ydb/core/tx/schemeshard/schemeshard_path.h +++ b/ydb/core/tx/schemeshard/schemeshard_path.h @@ -86,6 +86,7 @@ public: const TChecker& PQPartitionsLimit(ui64 delta = 1, EStatus status = EStatus::StatusResourceExhausted) const; const TChecker& PQReservedStorageLimit(ui64 delta = 1, EStatus status = EStatus::StatusResourceExhausted) const; const TChecker& IsExternalTable(EStatus status = EStatus::StatusNameConflict) const; + const TChecker& IsExternalDataSource(EStatus status = EStatus::StatusNameConflict) const; }; public: diff --git a/ydb/core/tx/schemeshard/schemeshard_path_describer.cpp b/ydb/core/tx/schemeshard/schemeshard_path_describer.cpp index 1c96351be54..3e81f0fb918 100644 --- a/ydb/core/tx/schemeshard/schemeshard_path_describer.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_path_describer.cpp @@ -813,6 +813,21 @@ void TPathDescriber::DescribeExternalTable(const TActorContext& ctx, TPathId pat entry->SetContent(externalTableInfo->Content); } +void TPathDescriber::DescribeExternalDataSource(const TActorContext&, TPathId pathId, TPathElement::TPtr pathEl) { + auto it = Self->ExternalDataSources.FindPtr(pathId); + Y_VERIFY(it, "ExternalDataSource is not found"); + TExternalDataSourceInfo::TPtr externalDataSourceInfo = *it; + + auto entry = Result->Record.MutablePathDescription()->MutableExternalDataSourceDescription(); + entry->SetName(pathEl->Name); + PathIdFromPathId(pathId, entry->MutablePathId()); + entry->SetVersion(externalDataSourceInfo->AlterVersion); + entry->SetSourceType(externalDataSourceInfo->SourceType); + entry->SetLocation(externalDataSourceInfo->Location); + entry->SetInstallation(externalDataSourceInfo->Installation); + entry->MutableAuth()->CopyFrom(externalDataSourceInfo->Auth); +} + THolder<TEvSchemeShard::TEvDescribeSchemeResultBuilder> TPathDescriber::Describe(const TActorContext& ctx) { TPathId pathId = Params.HasPathId() ? TPathId(Params.GetSchemeshardId(), Params.GetPathId()) : InvalidPathId; TString pathStr = Params.GetPath(); @@ -943,6 +958,9 @@ THolder<TEvSchemeShard::TEvDescribeSchemeResultBuilder> TPathDescriber::Describe case NKikimrSchemeOp::EPathTypeExternalTable: DescribeExternalTable(ctx, base->PathId, base); break; + case NKikimrSchemeOp::EPathTypeExternalDataSource: + DescribeExternalDataSource(ctx, base->PathId, base); + break; case NKikimrSchemeOp::EPathTypeInvalid: Y_UNREACHABLE(); } diff --git a/ydb/core/tx/schemeshard/schemeshard_path_describer.h b/ydb/core/tx/schemeshard/schemeshard_path_describer.h index 05f44ceb3f0..2e40315cfcd 100644 --- a/ydb/core/tx/schemeshard/schemeshard_path_describer.h +++ b/ydb/core/tx/schemeshard/schemeshard_path_describer.h @@ -44,6 +44,7 @@ class TPathDescriber { void DescribeReplication(TPathId pathId, TPathElement::TPtr pathEl); void DescribeBlobDepot(const TPath& path); void DescribeExternalTable(const TActorContext& ctx, TPathId pathId, TPathElement::TPtr pathEl); + void DescribeExternalDataSource(const TActorContext& ctx, TPathId pathId, TPathElement::TPtr pathEl); public: explicit TPathDescriber(TSchemeShard* self, NKikimrSchemeOp::TDescribePath&& params) diff --git a/ydb/core/tx/schemeshard/schemeshard_path_element.cpp b/ydb/core/tx/schemeshard/schemeshard_path_element.cpp index 91d14a7a132..480c22777fe 100644 --- a/ydb/core/tx/schemeshard/schemeshard_path_element.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_path_element.cpp @@ -168,6 +168,10 @@ bool TPathElement::IsExternalTable() const { return PathType == EPathType::EPathTypeExternalTable; } +bool TPathElement::IsExternalDataSource() const { + return PathType == EPathType::EPathTypeExternalDataSource; +} + void TPathElement::SetDropped(TStepId step, TTxId txId) { PathState = EPathState::EPathStateNotExist; StepDropped = step; diff --git a/ydb/core/tx/schemeshard/schemeshard_path_element.h b/ydb/core/tx/schemeshard/schemeshard_path_element.h index 25edaff694b..fb09051616b 100644 --- a/ydb/core/tx/schemeshard/schemeshard_path_element.h +++ b/ydb/core/tx/schemeshard/schemeshard_path_element.h @@ -379,6 +379,7 @@ public: bool HasActiveChanges() const; bool IsCreateFinished() const; bool IsExternalTable() const; + bool IsExternalDataSource() const; TVirtualTimestamp GetCreateTS() const; TVirtualTimestamp GetDropTS() const; void SetDropped(TStepId step, TTxId txId); diff --git a/ydb/core/tx/schemeshard/schemeshard_schema.h b/ydb/core/tx/schemeshard/schemeshard_schema.h index 509f6a1418f..8e2e2a1879d 100644 --- a/ydb/core/tx/schemeshard/schemeshard_schema.h +++ b/ydb/core/tx/schemeshard/schemeshard_schema.h @@ -1632,6 +1632,20 @@ struct Schema : NIceDb::Schema { using TColumns = TableColumns<OwnerPathId, LocalPathId, SourceType, DataSourcePath, Location, AlterVersion, Content>; }; + struct ExternalDataSource : Table<105> { + struct OwnerPathId : Column<1, NScheme::NTypeIds::Uint64> { using Type = TOwnerId; }; + struct LocalPathId : Column<2, NScheme::NTypeIds::Uint64> { using Type = TLocalPathId; }; + struct AlterVersion : Column<3, NScheme::NTypeIds::Uint64> {}; + struct SourceType : Column<4, NScheme::NTypeIds::Utf8> {}; + struct Location : Column<5, NScheme::NTypeIds::Utf8> {}; + struct Installation : Column<6, NScheme::NTypeIds::Utf8> {}; + struct Auth : Column<7, NScheme::NTypeIds::String> {}; + struct ExternalTableReferences : Column<8, NScheme::NTypeIds::String> {}; + + using TKey = TableKey<OwnerPathId, LocalPathId>; + using TColumns = TableColumns<OwnerPathId, LocalPathId, AlterVersion, SourceType, Location, Installation, Auth, ExternalTableReferences>; + }; + using TTables = SchemaTables< Paths, TxInFlight, @@ -1735,7 +1749,8 @@ struct Schema : NIceDb::Schema { ReplicationsAlterData, BlobDepots, CdcStreamScanShardStatus, - ExternalTable + ExternalTable, + ExternalDataSource >; static constexpr ui64 SysParam_NextPathId = 1; diff --git a/ydb/core/tx/schemeshard/schemeshard_tx_infly.h b/ydb/core/tx/schemeshard/schemeshard_tx_infly.h index 55055400a60..12b3fddec5e 100644 --- a/ydb/core/tx/schemeshard/schemeshard_tx_infly.h +++ b/ydb/core/tx/schemeshard/schemeshard_tx_infly.h @@ -123,6 +123,9 @@ struct TTxState { item(TxCreateExternalTable, 77) \ item(TxDropExternalTable, 78) \ item(TxAlterExternalTable, 79) \ + item(TxCreateExternalDataSource, 80) \ + item(TxDropExternalDataSource, 81) \ + item(TxAlterExternalDataSource, 82) \ // TX_STATE_TYPE_ENUM @@ -331,6 +334,7 @@ struct TTxState { case TxCreateReplication: case TxCreateBlobDepot: case TxCreateExternalTable: + case TxCreateExternalDataSource: return true; case TxInitializeBuildIndex: //this is more like alter case TxCreateCdcStreamAtTable: @@ -362,6 +366,7 @@ struct TTxState { case TxDropBlobDepot: case TxUpdateMainTableOnIndexMove: case TxDropExternalTable: + case TxDropExternalDataSource: return false; case TxAlterPQGroup: case TxAlterTable: @@ -391,6 +396,7 @@ struct TTxState { case TxAlterReplication: case TxAlterBlobDepot: case TxAlterExternalTable: + case TxAlterExternalDataSource: return false; case TxMoveTable: case TxMoveTableIndex: @@ -421,6 +427,7 @@ struct TTxState { case TxDropReplication: case TxDropBlobDepot: case TxDropExternalTable: + case TxDropExternalDataSource: return true; case TxMkDir: case TxCreateTable: @@ -453,6 +460,7 @@ struct TTxState { case TxDropCdcStreamAtTableDropSnapshot: case TxUpdateMainTableOnIndexMove: case TxCreateExternalTable: + case TxCreateExternalDataSource: return false; case TxAlterPQGroup: case TxAlterTable: @@ -482,6 +490,7 @@ struct TTxState { case TxAlterReplication: case TxAlterBlobDepot: case TxAlterExternalTable: + case TxAlterExternalDataSource: return false; case TxMoveTable: case TxMoveTableIndex: @@ -516,6 +525,7 @@ struct TTxState { case TxRmDir: case TxFinalizeBuildIndex: case TxDropExternalTable: + case TxDropExternalDataSource: return false; case TxMkDir: case TxCreateTable: @@ -546,6 +556,7 @@ struct TTxState { case TxDropCdcStreamAtTableDropSnapshot: case TxUpdateMainTableOnIndexMove: case TxCreateExternalTable: + case TxCreateExternalDataSource: return false; case TxAlterPQGroup: case TxAlterTable: @@ -576,6 +587,7 @@ struct TTxState { case TxAlterReplication: case TxAlterBlobDepot: case TxAlterExternalTable: + case TxAlterExternalDataSource: return false; case TxInvalid: Y_VERIFY_DEBUG("UNREACHABLE"); @@ -671,6 +683,8 @@ struct TTxState { case NKikimrSchemeOp::ESchemeOpDeallocatePersQueueGroup: return TxInvalid; case NKikimrSchemeOp::ESchemeOpCreateExternalTable: return TxCreateExternalTable; case NKikimrSchemeOp::ESchemeOpAlterExternalTable: return TxAlterExternalTable; + case NKikimrSchemeOp::ESchemeOpCreateExternalDataSource: return TxCreateExternalDataSource; + case NKikimrSchemeOp::ESchemeOpAlterExternalDataSource: return TxAlterExternalDataSource; default: return TxInvalid; } } diff --git a/ydb/core/tx/schemeshard/ut_external_data_source.cpp b/ydb/core/tx/schemeshard/ut_external_data_source.cpp new file mode 100644 index 00000000000..9900addb53a --- /dev/null +++ b/ydb/core/tx/schemeshard/ut_external_data_source.cpp @@ -0,0 +1,385 @@ +#include <ydb/core/tx/schemeshard/ut_helpers/helpers.h> + +using namespace NKikimr::NSchemeShard; +using namespace NKikimr; +using namespace NKikimrSchemeOp; +using namespace NSchemeShardUT_Private; + +Y_UNIT_TEST_SUITE(TExternalDataSourceTest) { + Y_UNIT_TEST(CreateExternalDataSource) { + TTestBasicRuntime runtime; + TTestEnv env(runtime); + ui64 txId = 100; + + TestCreateExternalDataSource(runtime, txId++, "/MyRoot",R"( + Name: "MyExternalDataSource" + SourceType: "ObjectStorage" + Location: "https://s3.cloud.net/my_bucket" + Auth { + None { + } + } + )", {NKikimrScheme::StatusAccepted}); + + env.TestWaitNotification(runtime, 100); + + TestLs(runtime, "/MyRoot/MyExternalDataSource", false, NLs::PathExist); + } + + Y_UNIT_TEST(DropExternalDataSource) { + TTestBasicRuntime runtime; + TTestEnv env(runtime); + ui64 txId = 100; + + TestCreateExternalDataSource(runtime, txId++, "/MyRoot",R"( + Name: "MyExternalDataSource" + SourceType: "ObjectStorage" + Location: "https://s3.cloud.net/my_bucket" + Auth { + None { + } + } + )",{NKikimrScheme::StatusAccepted}); + + env.TestWaitNotification(runtime, 100); + + TestLs(runtime, "/MyRoot/MyExternalDataSource", false, NLs::PathExist); + + TestDropExternalDataSource(runtime, ++txId, "/MyRoot", "MyExternalDataSource"); + env.TestWaitNotification(runtime, txId); + + TestLs(runtime, "/MyRoot/MyExternalDataSource", false, NLs::PathNotExist); + } + + using TRuntimeTxFn = std::function<void(TTestBasicRuntime&, ui64)>; + + void DropTwice(const TString& path, TRuntimeTxFn createFn, TRuntimeTxFn dropFn) { + TTestBasicRuntime runtime; + TTestEnv env(runtime); + ui64 txId = 100; + + createFn(runtime, ++txId); + env.TestWaitNotification(runtime, txId); + + dropFn(runtime, ++txId); + dropFn(runtime, ++txId); + TestModificationResult(runtime, txId - 1); + + auto ev = runtime.GrabEdgeEvent<TEvSchemeShard::TEvModifySchemeTransactionResult>(); + UNIT_ASSERT(ev); + + const auto& record = ev->Record; + UNIT_ASSERT_VALUES_EQUAL(record.GetTxId(), txId); + UNIT_ASSERT_VALUES_EQUAL(record.GetStatus(), NKikimrScheme::StatusMultipleModifications); + UNIT_ASSERT_VALUES_EQUAL(record.GetPathDropTxId(), txId - 1); + + env.TestWaitNotification(runtime, txId - 1); + TestDescribeResult(DescribePath(runtime, path), { + NLs::PathNotExist + }); + } + + Y_UNIT_TEST(DropTableTwice) { + auto createFn = [](TTestBasicRuntime& runtime, ui64 txId) { + TestCreateExternalDataSource(runtime, txId, "/MyRoot", R"( + Name: "MyExternalDataSource" + SourceType: "ObjectStorage" + Location: "https://s3.cloud.net/my_bucket" + Auth { + None { + } + } + )"); + }; + + auto dropFn = [](TTestBasicRuntime& runtime, ui64 txId) { + AsyncDropExternalDataSource(runtime, txId, "/MyRoot", "MyExternalDataSource"); + }; + + DropTwice("/MyRoot/MyExternalDataSource", createFn, dropFn); + } + + Y_UNIT_TEST(ParallelCreateExternalDataSource) { + TTestBasicRuntime runtime; + TTestEnv env(runtime); + ui64 txId = 123; + + AsyncMkDir(runtime, ++txId, "/MyRoot", "DirA"); + AsyncCreateExternalDataSource(runtime, ++txId, "/MyRoot/DirA",R"( + Name: "MyExternalDataSource1" + SourceType: "ObjectStorage" + Location: "https://s3.cloud.net/my_bucket" + Auth { + None { + } + } + )"); + AsyncCreateExternalDataSource(runtime, ++txId, "/MyRoot/DirA", R"( + Name: "MyExternalDataSource2" + SourceType: "ObjectStorage" + Location: "https://s3.cloud.net/my_bucket" + Auth { + None { + } + } + )"); + TestModificationResult(runtime, txId-2, NKikimrScheme::StatusAccepted); + TestModificationResult(runtime, txId-1, NKikimrScheme::StatusAccepted); + TestModificationResult(runtime, txId, NKikimrScheme::StatusAccepted); + + env.TestWaitNotification(runtime, {txId, txId-1, txId-2}); + + TestDescribe(runtime, "/MyRoot/DirA/MyExternalDataSource1"); + TestDescribe(runtime, "/MyRoot/DirA/MyExternalDataSource2"); + + TestDescribeResult(DescribePath(runtime, "/MyRoot/DirA"), + {NLs::PathVersionEqual(7)}); + TestDescribeResult(DescribePath(runtime, "/MyRoot/DirA/MyExternalDataSource1"), + {NLs::PathVersionEqual(2)}); + TestDescribeResult(DescribePath(runtime, "/MyRoot/DirA/MyExternalDataSource2"), + {NLs::PathVersionEqual(2)}); + } + + Y_UNIT_TEST(ParallelCreateSameExternalDataSource) { + using ESts = NKikimrScheme::EStatus; + + TTestBasicRuntime runtime; + TTestEnv env(runtime); + ui64 txId = 123; + + TString dataSourceConfig = R"( + Name: "NilNoviSubLuna" + SourceType: "ObjectStorage" + Location: "https://s3.cloud.net/my_bucket" + Auth { + None { + } + } + )"; + + AsyncCreateExternalDataSource(runtime, ++txId, "/MyRoot", dataSourceConfig); + AsyncCreateExternalDataSource(runtime, ++txId, "/MyRoot", dataSourceConfig); + AsyncCreateExternalDataSource(runtime, ++txId, "/MyRoot", dataSourceConfig); + + ui64 sts[3]; + sts[0] = TestModificationResults(runtime, txId-2, {ESts::StatusAccepted, ESts::StatusMultipleModifications, ESts::StatusAlreadyExists}); + sts[1] = TestModificationResults(runtime, txId-1, {ESts::StatusAccepted, ESts::StatusMultipleModifications, ESts::StatusAlreadyExists}); + sts[2] = TestModificationResults(runtime, txId, {ESts::StatusAccepted, ESts::StatusMultipleModifications, ESts::StatusAlreadyExists}); + + for (ui32 i=0; i<3; ++i) { + if (sts[i] == ESts::StatusAlreadyExists) { + TestDescribeResult(DescribePath(runtime, "/MyRoot/NilNoviSubLuna"), + {NLs::Finished, + NLs::IsExternalDataSource}); + } + + if (sts[i] == ESts::StatusMultipleModifications) { + TestDescribeResult(DescribePath(runtime, "/MyRoot/NilNoviSubLuna"), + {NLs::Finished, + NLs::IsExternalDataSource}); + } + } + + env.TestWaitNotification(runtime, {txId-2, txId-1, txId}); + + TestDescribeResult(DescribePath(runtime, "/MyRoot/NilNoviSubLuna"), + {NLs::Finished, + NLs::IsExternalDataSource, + NLs::PathVersionEqual(2)}); + + TestCreateExternalDataSource(runtime, ++txId, "/MyRoot", dataSourceConfig, {ESts::StatusAlreadyExists}); + + } + + + Y_UNIT_TEST(ReadOnlyMode) { + TTestBasicRuntime runtime; + TTestEnv env(runtime); + ui64 txId = 123; + + AsyncMkDir(runtime, ++txId, "/MyRoot", "SubDirA"); + AsyncCreateExternalDataSource(runtime, ++txId, "/MyRoot",R"( + Name: "MyExternalDataSource" + SourceType: "ObjectStorage" + Location: "https://s3.cloud.net/my_bucket" + Auth { + None { + } + } + )"); + // Set ReadOnly + SetSchemeshardReadOnlyMode(runtime, true); + TActorId sender = runtime.AllocateEdgeActor(); + RebootTablet(runtime, TTestTxConfig::SchemeShard, sender); + + // Verify that table creation successfully finished + env.TestWaitNotification(runtime, txId); + + // Check that describe works + TestDescribeResult(DescribePath(runtime, "/MyRoot/SubDirA"), + {NLs::Finished}); + TestDescribeResult(DescribePath(runtime, "/MyRoot/MyExternalDataSource"), + {NLs::Finished, + NLs::IsExternalDataSource}); + + // Check that new modifications fail + TestMkDir(runtime, ++txId, "/MyRoot", "SubDirBBBB", {NKikimrScheme::StatusReadOnly}); + TestCreateExternalDataSource(runtime, ++txId, "/MyRoot",R"( + Name: "MyExternalDataSource2" + SourceType: "ObjectStorage" + Location: "https://s3.cloud.net/my_bucket" + Auth { + None { + } + } + )", {NKikimrScheme::StatusReadOnly}); + + // Disable ReadOnly + SetSchemeshardReadOnlyMode(runtime, false); + sender = runtime.AllocateEdgeActor(); + RebootTablet(runtime, TTestTxConfig::SchemeShard, sender); + + // Check that modifications now work again + TestMkDir(runtime, ++txId, "/MyRoot", "SubDirBBBB"); + } + + Y_UNIT_TEST(SchemeErrors) { + TTestBasicRuntime runtime; + TTestEnv env(runtime); + ui64 txId = 123; + + TestMkDir(runtime, ++txId, "/MyRoot", "DirA"); + env.TestWaitNotification(runtime, txId); + + TestCreateExternalDataSource(runtime, ++txId, "/MyRoot/DirA",R"( + Name: "MyExternalDataSource" + SourceType: "DataStream" + Location: "https://s3.cloud.net/my_bucket" + Auth { + None { + } + } + )", {{NKikimrScheme::StatusSchemeError, "Only ObjectStorage source type supported but got DataStream"}}); + TestCreateExternalDataSource(runtime, ++txId, "/MyRoot/DirA",R"( + Name: "MyExternalDataSource" + SourceType: "ObjectStorage" + Location: "" + Auth { + None { + } + } + )", {{NKikimrScheme::StatusSchemeError, "Location or installation must not be empty"}}); + TestCreateExternalDataSource(runtime, ++txId, "/MyRoot/DirA",R"( + Name: "MyExternalDataSource" + SourceType: "ObjectStorage" + Location: "https://s3.cloud.net/my_bucket" + )", {{NKikimrScheme::StatusSchemeError, "Authorization method not specified"}}); + TestCreateExternalDataSource(runtime, ++txId, "/MyRoot/DirA", Sprintf(R"( + Name: "MyExternalDataSource" + SourceType: "ObjectStorage" + Location: "%s" + Auth { + None { + } + } + )", TString{1001, 'a'}.c_str()), {{NKikimrScheme::StatusSchemeError, "Maximum length of location must be less or equal equal to 1000 but got 1001"}}); + TestCreateExternalDataSource(runtime, ++txId, "/MyRoot/DirA", Sprintf(R"( + Name: "MyExternalDataSource" + SourceType: "ObjectStorage" + Installation: "%s" + Auth { + None { + } + } + )", TString{1001, 'a'}.c_str()), {{NKikimrScheme::StatusSchemeError, "Maximum length of installation must be less or equal equal to 1000 but got 1001"}}); + TestCreateExternalDataSource(runtime, ++txId, "/MyRoot/DirA",R"( + Name: "" + SourceType: "ObjectStorage" + Location: "https://s3.cloud.net/my_bucket" + Auth { + None { + } + } + )", {{NKikimrScheme::StatusSchemeError, "error: path part shouldn't be empty"}}); + } + + Y_UNIT_TEST(PreventDeletionOfDependentDataSources) { + TTestBasicRuntime runtime; + TTestEnv env(runtime); + ui64 txId = 100; + + TestCreateExternalDataSource(runtime, txId++, "/MyRoot",R"( + Name: "ExternalDataSource" + SourceType: "ObjectStorage" + Location: "https://s3.cloud.net/my_bucket" + Auth { + None { + } + } + )", {NKikimrScheme::StatusAccepted}); + + env.TestWaitNotification(runtime, 100); + + TestLs(runtime, "/MyRoot/ExternalDataSource", false, NLs::PathExist); + + TestCreateExternalTable(runtime, txId++, "/MyRoot", R"( + Name: "ExternalTable" + DataSourcePath: "/MyRoot/ExternalDataSource" + Location: "/" + Columns { Name: "key" Type: "Uint64" } + )", {NKikimrScheme::StatusAccepted}); + + env.TestWaitNotification(runtime, txId - 1); + + TestLs(runtime, "/MyRoot/ExternalTable", false, NLs::PathExist); + + + TestDropExternalDataSource(runtime, ++txId, "/MyRoot", "ExternalDataSource", + {{NKikimrScheme::StatusSchemeError, "Other entities depend on this data source, please remove them at the beginning: /MyRoot/ExternalDataSource"}}); + env.TestWaitNotification(runtime, txId); + + TestLs(runtime, "/MyRoot/ExternalDataSource", false, NLs::PathExist); + } + + Y_UNIT_TEST(RemovingReferencesFromDataSources) { + TTestBasicRuntime runtime; + TTestEnv env(runtime); + ui64 txId = 100; + + TestCreateExternalDataSource(runtime, txId++, "/MyRoot",R"( + Name: "ExternalDataSource" + SourceType: "ObjectStorage" + Location: "https://s3.cloud.net/my_bucket" + Auth { + None { + } + } + )", {NKikimrScheme::StatusAccepted}); + + env.TestWaitNotification(runtime, 100); + + TestLs(runtime, "/MyRoot/ExternalDataSource", false, NLs::PathExist); + + TestCreateExternalTable(runtime, txId++, "/MyRoot", R"( + Name: "ExternalTable" + DataSourcePath: "/MyRoot/ExternalDataSource" + Location: "/" + Columns { Name: "key" Type: "Uint64" } + )", {NKikimrScheme::StatusAccepted}); + + env.TestWaitNotification(runtime, txId - 1); + + TestLs(runtime, "/MyRoot/ExternalTable", false, NLs::PathExist); + + TestDropExternalTable(runtime, ++txId, "/MyRoot", "ExternalTable", + {NKikimrScheme::StatusAccepted}); + + TestLs(runtime, "/MyRoot/ExternalTable", false, NLs::PathNotExist); + + TestDropExternalDataSource(runtime, ++txId, "/MyRoot", "ExternalDataSource", + {NKikimrScheme::StatusAccepted}); + env.TestWaitNotification(runtime, txId); + + TestLs(runtime, "/MyRoot/ExternalDataSource", false, NLs::PathNotExist); + } +} diff --git a/ydb/core/tx/schemeshard/ut_external_data_source/CMakeLists.darwin.txt b/ydb/core/tx/schemeshard/ut_external_data_source/CMakeLists.darwin.txt new file mode 100644 index 00000000000..5b341469b91 --- /dev/null +++ b/ydb/core/tx/schemeshard/ut_external_data_source/CMakeLists.darwin.txt @@ -0,0 +1,82 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_executable(ydb-core-tx-schemeshard-ut_external_data_source) +target_compile_options(ydb-core-tx-schemeshard-ut_external_data_source PRIVATE + -DUSE_CURRENT_UDF_ABI_VERSION +) +target_include_directories(ydb-core-tx-schemeshard-ut_external_data_source PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard +) +target_link_libraries(ydb-core-tx-schemeshard-ut_external_data_source PUBLIC + contrib-libs-cxxsupp + yutil + library-cpp-cpuid_check + cpp-testing-unittest_main + core-tx-schemeshard + library-cpp-getopt + cpp-regex-pcre + library-cpp-svnversion + core-testlib-default + ydb-core-tx + tx-schemeshard-ut_helpers + udf-service-exception_policy +) +target_link_options(ydb-core-tx-schemeshard-ut_external_data_source PRIVATE + -Wl,-no_deduplicate + -Wl,-sdk_version,10.15 + -fPIC + -fPIC + -framework + CoreFoundation +) +target_sources(ydb-core-tx-schemeshard-ut_external_data_source PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/ut_external_data_source.cpp +) +set_property( + TARGET + ydb-core-tx-schemeshard-ut_external_data_source + PROPERTY + SPLIT_FACTOR + 10 +) +add_yunittest( + NAME + ydb-core-tx-schemeshard-ut_external_data_source + TEST_TARGET + ydb-core-tx-schemeshard-ut_external_data_source + TEST_ARG + --print-before-suite + --print-before-test + --fork-tests + --print-times + --show-fails +) +set_yunittest_property( + TEST + ydb-core-tx-schemeshard-ut_external_data_source + PROPERTY + LABELS + MEDIUM +) +set_yunittest_property( + TEST + ydb-core-tx-schemeshard-ut_external_data_source + PROPERTY + PROCESSORS + 1 +) +set_yunittest_property( + TEST + ydb-core-tx-schemeshard-ut_external_data_source + PROPERTY + TIMEOUT + 600 +) +vcs_info(ydb-core-tx-schemeshard-ut_external_data_source) diff --git a/ydb/core/tx/schemeshard/ut_external_data_source/CMakeLists.linux-aarch64.txt b/ydb/core/tx/schemeshard/ut_external_data_source/CMakeLists.linux-aarch64.txt new file mode 100644 index 00000000000..beee61c6360 --- /dev/null +++ b/ydb/core/tx/schemeshard/ut_external_data_source/CMakeLists.linux-aarch64.txt @@ -0,0 +1,85 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_executable(ydb-core-tx-schemeshard-ut_external_data_source) +target_compile_options(ydb-core-tx-schemeshard-ut_external_data_source PRIVATE + -DUSE_CURRENT_UDF_ABI_VERSION +) +target_include_directories(ydb-core-tx-schemeshard-ut_external_data_source PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard +) +target_link_libraries(ydb-core-tx-schemeshard-ut_external_data_source PUBLIC + contrib-libs-linux-headers + contrib-libs-cxxsupp + yutil + library-cpp-lfalloc + cpp-testing-unittest_main + core-tx-schemeshard + library-cpp-getopt + cpp-regex-pcre + library-cpp-svnversion + core-testlib-default + ydb-core-tx + tx-schemeshard-ut_helpers + udf-service-exception_policy +) +target_link_options(ydb-core-tx-schemeshard-ut_external_data_source PRIVATE + -ldl + -lrt + -Wl,--no-as-needed + -fPIC + -fPIC + -lpthread + -lrt + -ldl +) +target_sources(ydb-core-tx-schemeshard-ut_external_data_source PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/ut_external_data_source.cpp +) +set_property( + TARGET + ydb-core-tx-schemeshard-ut_external_data_source + PROPERTY + SPLIT_FACTOR + 10 +) +add_yunittest( + NAME + ydb-core-tx-schemeshard-ut_external_data_source + TEST_TARGET + ydb-core-tx-schemeshard-ut_external_data_source + TEST_ARG + --print-before-suite + --print-before-test + --fork-tests + --print-times + --show-fails +) +set_yunittest_property( + TEST + ydb-core-tx-schemeshard-ut_external_data_source + PROPERTY + LABELS + MEDIUM +) +set_yunittest_property( + TEST + ydb-core-tx-schemeshard-ut_external_data_source + PROPERTY + PROCESSORS + 1 +) +set_yunittest_property( + TEST + ydb-core-tx-schemeshard-ut_external_data_source + PROPERTY + TIMEOUT + 600 +) +vcs_info(ydb-core-tx-schemeshard-ut_external_data_source) diff --git a/ydb/core/tx/schemeshard/ut_external_data_source/CMakeLists.linux.txt b/ydb/core/tx/schemeshard/ut_external_data_source/CMakeLists.linux.txt new file mode 100644 index 00000000000..037aa8a2ce7 --- /dev/null +++ b/ydb/core/tx/schemeshard/ut_external_data_source/CMakeLists.linux.txt @@ -0,0 +1,87 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_executable(ydb-core-tx-schemeshard-ut_external_data_source) +target_compile_options(ydb-core-tx-schemeshard-ut_external_data_source PRIVATE + -DUSE_CURRENT_UDF_ABI_VERSION +) +target_include_directories(ydb-core-tx-schemeshard-ut_external_data_source PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard +) +target_link_libraries(ydb-core-tx-schemeshard-ut_external_data_source PUBLIC + contrib-libs-linux-headers + contrib-libs-cxxsupp + yutil + cpp-malloc-tcmalloc + libs-tcmalloc-no_percpu_cache + library-cpp-cpuid_check + cpp-testing-unittest_main + core-tx-schemeshard + library-cpp-getopt + cpp-regex-pcre + library-cpp-svnversion + core-testlib-default + ydb-core-tx + tx-schemeshard-ut_helpers + udf-service-exception_policy +) +target_link_options(ydb-core-tx-schemeshard-ut_external_data_source PRIVATE + -ldl + -lrt + -Wl,--no-as-needed + -fPIC + -fPIC + -lpthread + -lrt + -ldl +) +target_sources(ydb-core-tx-schemeshard-ut_external_data_source PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/ut_external_data_source.cpp +) +set_property( + TARGET + ydb-core-tx-schemeshard-ut_external_data_source + PROPERTY + SPLIT_FACTOR + 10 +) +add_yunittest( + NAME + ydb-core-tx-schemeshard-ut_external_data_source + TEST_TARGET + ydb-core-tx-schemeshard-ut_external_data_source + TEST_ARG + --print-before-suite + --print-before-test + --fork-tests + --print-times + --show-fails +) +set_yunittest_property( + TEST + ydb-core-tx-schemeshard-ut_external_data_source + PROPERTY + LABELS + MEDIUM +) +set_yunittest_property( + TEST + ydb-core-tx-schemeshard-ut_external_data_source + PROPERTY + PROCESSORS + 1 +) +set_yunittest_property( + TEST + ydb-core-tx-schemeshard-ut_external_data_source + PROPERTY + TIMEOUT + 600 +) +vcs_info(ydb-core-tx-schemeshard-ut_external_data_source) diff --git a/ydb/core/tx/schemeshard/ut_external_data_source/CMakeLists.txt b/ydb/core/tx/schemeshard/ut_external_data_source/CMakeLists.txt new file mode 100644 index 00000000000..5bb4faffb40 --- /dev/null +++ b/ydb/core/tx/schemeshard/ut_external_data_source/CMakeLists.txt @@ -0,0 +1,15 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + +if (CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND UNIX AND NOT APPLE AND NOT ANDROID) + include(CMakeLists.linux-aarch64.txt) +elseif (APPLE AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64") + include(CMakeLists.darwin.txt) +elseif (CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND UNIX AND NOT APPLE AND NOT ANDROID) + include(CMakeLists.linux.txt) +endif() diff --git a/ydb/core/tx/schemeshard/ut_external_data_source_reboots.cpp b/ydb/core/tx/schemeshard/ut_external_data_source_reboots.cpp new file mode 100644 index 00000000000..88c15ac601b --- /dev/null +++ b/ydb/core/tx/schemeshard/ut_external_data_source_reboots.cpp @@ -0,0 +1,257 @@ +#include <ydb/core/tx/schemeshard/ut_helpers/helpers.h> + +#include <ydb/core/tx/datashard/datashard.h> +#include <ydb/core/protos/flat_scheme_op.pb.h> + +#include <google/protobuf/text_format.h> + +using namespace NKikimr; +using namespace NSchemeShard; +using namespace NSchemeShardUT_Private; + +Y_UNIT_TEST_SUITE(TExternalDataSourceTestReboots) { + Y_UNIT_TEST(CreateExternalDataSourceWithReboots) { + TTestWithReboots t; + t.Run([&](TTestActorRuntime& runtime, bool& activeZone) { + AsyncMkDir(runtime, ++t.TxId, "/MyRoot", "DirExternalDataSource"); + + AsyncCreateExternalDataSource(runtime, ++t.TxId, "/MyRoot/DirExternalDataSource", R"( + Name: "MyExternalDataSource" + SourceType: "ObjectStorage" + Location: "https://s3.cloud.net/my_bucket" + Auth { + None { + } + } + )"); + + t.TestEnv->TestWaitNotification(runtime, {t.TxId, t.TxId-1}); + + { + TInactiveZone inactive(activeZone); + auto describeResult = DescribePath(runtime, "/MyRoot/DirExternalDataSource/MyExternalDataSource"); + TestDescribeResult(describeResult, {NLs::Finished}); + + UNIT_ASSERT(describeResult.GetPathDescription().HasExternalDataSourceDescription()); + const auto& externalDataSourceDescription = describeResult.GetPathDescription().GetExternalDataSourceDescription(); + UNIT_ASSERT_VALUES_EQUAL(externalDataSourceDescription.GetName(), "MyExternalDataSource"); + UNIT_ASSERT_VALUES_EQUAL(externalDataSourceDescription.GetSourceType(), "ObjectStorage"); + UNIT_ASSERT_VALUES_EQUAL(externalDataSourceDescription.GetVersion(), 1); + UNIT_ASSERT_VALUES_EQUAL(externalDataSourceDescription.GetLocation(), "https://s3.cloud.net/my_bucket"); + UNIT_ASSERT_EQUAL(externalDataSourceDescription.GetAuth().identity_case(), NKikimrSchemeOp::TAuth::kNone); + } + }); + } + + Y_UNIT_TEST(ParallelCreateDrop) { + TTestWithReboots t; + t.Run([&](TTestActorRuntime& runtime, bool& activeZone) { + AsyncCreateExternalDataSource(runtime, ++t.TxId, "/MyRoot", R"( + Name: "DropMe" + SourceType: "ObjectStorage" + Location: "https://s3.cloud.net/my_bucket" + Auth { + None { + } + } + )"); + AsyncDropExternalDataSource(runtime, ++t.TxId, "/MyRoot", "DropMe"); + t.TestEnv->TestWaitNotification(runtime, t.TxId-1); + + + TestDropExternalDataSource(runtime, ++t.TxId, "/MyRoot", "DropMe"); + t.TestEnv->TestWaitNotification(runtime, t.TxId); + + { + TInactiveZone inactive(activeZone); + TestDescribeResult(DescribePath(runtime, "/MyRoot/DropMe"), + {NLs::PathNotExist}); + } + }); + } + + Y_UNIT_TEST(SimpleDropExternalDataSourceWithReboots) { + TTestWithReboots t; + t.Run([&](TTestActorRuntime& runtime, bool& activeZone) { + { + TInactiveZone inactive(activeZone); + TestCreateExternalDataSource(runtime, ++t.TxId, "/MyRoot",R"( + Name: "ExternalDataSource" + SourceType: "ObjectStorage" + Location: "https://s3.cloud.net/my_bucket" + Auth { + None { + } + } + )"); + t.TestEnv->TestWaitNotification(runtime, t.TxId); + } + + TestDropExternalDataSource(runtime, ++t.TxId, "/MyRoot", "ExternalDataSource"); + t.TestEnv->TestWaitNotification(runtime, t.TxId); + + { + TInactiveZone inactive(activeZone); + TestDescribeResult(DescribePath(runtime, "/MyRoot/ExternalDataSource"), + {NLs::PathNotExist}); + } + }); + } + + Y_UNIT_TEST(SimpleDropExternalDataSourceWithReboots2) { + TTestWithReboots t; + t.Run([&](TTestActorRuntime& runtime, bool& activeZone) { + { + TInactiveZone inactive(activeZone); + TestCreateExternalDataSource(runtime, ++t.TxId, "/MyRoot",R"( + Name: "ExternalDataSource" + SourceType: "ObjectStorage" + Location: "https://s3.cloud.net/my_bucket" + Auth { + None { + } + } + )"); + t.TestEnv->TestWaitNotification(runtime, t.TxId); + } + + TestDropExternalDataSource(runtime, ++t.TxId, "/MyRoot", "ExternalDataSource"); + t.TestEnv->TestWaitNotification(runtime, t.TxId); + + { + TInactiveZone inactive(activeZone); + TestDescribeResult(DescribePath(runtime, "/MyRoot/ExternalDataSource"), + {NLs::PathNotExist}); + } + }); + } + + + Y_UNIT_TEST(DropExternalDataSourceWithReboots) { + TTestWithReboots t; + t.Run([&](TTestActorRuntime& runtime, bool& activeZone) { + { + TInactiveZone inactive(activeZone); + TestCreateExternalDataSource(runtime, t.TxId, "/MyRoot",R"( + Name: "ExternalDataSource" + SourceType: "ObjectStorage" + Location: "https://s3.cloud.net/my_bucket" + Auth { + None { + } + } + )"); + t.TestEnv->TestWaitNotification(runtime, t.TxId); + } + + TestDropExternalDataSource(runtime, ++t.TxId, "/MyRoot", "ExternalDataSource"); + t.TestEnv->TestWaitNotification(runtime, t.TxId); + + { + TInactiveZone inactive(activeZone); + TestDescribeResult(DescribePath(runtime, "/MyRoot/ExternalDataSource"), + {NLs::PathNotExist}); + + TestCreateExternalDataSource(runtime, ++t.TxId, "/MyRoot", R"( + Name: "ExternalDataSource" + SourceType: "ObjectStorage" + Location: "https://s3.cloud.net/my_bucket" + Auth { + None { + } + } + )"); + t.TestEnv->TestWaitNotification(runtime, t.TxId); + + TestDropExternalDataSource(runtime, ++t.TxId, "/MyRoot", "ExternalDataSource"); + t.TestEnv->TestWaitNotification(runtime, t.TxId); + + TestDescribeResult(DescribePath(runtime, "/MyRoot/Table"), + {NLs::PathNotExist}); + } + }); + } + + + Y_UNIT_TEST(CreateDroppedExternalDataSourceWithReboots) { + TTestWithReboots t; + t.Run([&](TTestActorRuntime& runtime, bool& activeZone) { + { + TInactiveZone inactive(activeZone); + TestCreateExternalDataSource(runtime, ++t.TxId, "/MyRoot", R"( + Name: "ExternalDataSource" + SourceType: "ObjectStorage" + Location: "https://s3.cloud.net/my_bucket" + Auth { + None { + } + } + )"); + t.TestEnv->TestWaitNotification(runtime, t.TxId); + + TestDropExternalDataSource(runtime, ++t.TxId, "/MyRoot", "ExternalDataSource"); + t.TestEnv->TestWaitNotification(runtime, t.TxId); + } + + TestCreateExternalDataSource(runtime, ++t.TxId, "/MyRoot", R"( + Name: "ExternalDataSource" + SourceType: "ObjectStorage" + Location: "https://s3.cloud.net/my_bucket" + Auth { + None { + } + } + )"); + t.TestEnv->TestWaitNotification(runtime, t.TxId); + + { + TInactiveZone inactive(activeZone); + + TestDropExternalDataSource(runtime, ++t.TxId, "/MyRoot", "ExternalDataSource"); + t.TestEnv->TestWaitNotification(runtime, t.TxId); + } + }); + } + + Y_UNIT_TEST(CreateDroppedExternalDataSourceAndDropWithReboots) { + TTestWithReboots t; + t.Run([&](TTestActorRuntime& runtime, bool& activeZone) { + { + TInactiveZone inactive(activeZone); + TestCreateExternalDataSource(runtime, ++t.TxId, "/MyRoot", R"( + Name: "ExternalDataSource" + SourceType: "ObjectStorage" + Location: "https://s3.cloud.net/my_bucket" + Auth { + None { + } + } + )"); + t.TestEnv->TestWaitNotification(runtime, t.TxId); + + TestDropExternalDataSource(runtime, ++t.TxId, "/MyRoot", "ExternalDataSource"); + t.TestEnv->TestWaitNotification(runtime, t.TxId); + + TestCreateExternalDataSource(runtime, ++t.TxId, "/MyRoot", R"( + Name: "ExternalDataSource" + SourceType: "ObjectStorage" + Location: "https://s3.cloud.net/my_bucket" + Auth { + None { + } + } + )"); + t.TestEnv->TestWaitNotification(runtime, t.TxId); + } + + TestDropExternalDataSource(runtime, ++t.TxId, "/MyRoot", "ExternalDataSource"); + t.TestEnv->TestWaitNotification(runtime, t.TxId); + + { + TInactiveZone inactive(activeZone); + TestDescribeResult(DescribePath(runtime, "/MyRoot/ExternalDataSource"), + {NLs::PathNotExist}); + } + }); + } +} diff --git a/ydb/core/tx/schemeshard/ut_external_data_source_reboots/CMakeLists.darwin.txt b/ydb/core/tx/schemeshard/ut_external_data_source_reboots/CMakeLists.darwin.txt new file mode 100644 index 00000000000..7f155a3be9e --- /dev/null +++ b/ydb/core/tx/schemeshard/ut_external_data_source_reboots/CMakeLists.darwin.txt @@ -0,0 +1,82 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_executable(schemeshard-ut_external_data_source_reboots) +target_compile_options(schemeshard-ut_external_data_source_reboots PRIVATE + -DUSE_CURRENT_UDF_ABI_VERSION +) +target_include_directories(schemeshard-ut_external_data_source_reboots PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard +) +target_link_libraries(schemeshard-ut_external_data_source_reboots PUBLIC + contrib-libs-cxxsupp + yutil + library-cpp-cpuid_check + cpp-testing-unittest_main + core-tx-schemeshard + library-cpp-getopt + cpp-regex-pcre + library-cpp-svnversion + core-testlib-default + ydb-core-tx + tx-schemeshard-ut_helpers + udf-service-exception_policy +) +target_link_options(schemeshard-ut_external_data_source_reboots PRIVATE + -Wl,-no_deduplicate + -Wl,-sdk_version,10.15 + -fPIC + -fPIC + -framework + CoreFoundation +) +target_sources(schemeshard-ut_external_data_source_reboots PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/ut_external_data_source_reboots.cpp +) +set_property( + TARGET + schemeshard-ut_external_data_source_reboots + PROPERTY + SPLIT_FACTOR + 60 +) +add_yunittest( + NAME + schemeshard-ut_external_data_source_reboots + TEST_TARGET + schemeshard-ut_external_data_source_reboots + TEST_ARG + --print-before-suite + --print-before-test + --fork-tests + --print-times + --show-fails +) +set_yunittest_property( + TEST + schemeshard-ut_external_data_source_reboots + PROPERTY + LABELS + MEDIUM +) +set_yunittest_property( + TEST + schemeshard-ut_external_data_source_reboots + PROPERTY + PROCESSORS + 1 +) +set_yunittest_property( + TEST + schemeshard-ut_external_data_source_reboots + PROPERTY + TIMEOUT + 600 +) +vcs_info(schemeshard-ut_external_data_source_reboots) diff --git a/ydb/core/tx/schemeshard/ut_external_data_source_reboots/CMakeLists.linux-aarch64.txt b/ydb/core/tx/schemeshard/ut_external_data_source_reboots/CMakeLists.linux-aarch64.txt new file mode 100644 index 00000000000..beee2a7b5f0 --- /dev/null +++ b/ydb/core/tx/schemeshard/ut_external_data_source_reboots/CMakeLists.linux-aarch64.txt @@ -0,0 +1,85 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_executable(schemeshard-ut_external_data_source_reboots) +target_compile_options(schemeshard-ut_external_data_source_reboots PRIVATE + -DUSE_CURRENT_UDF_ABI_VERSION +) +target_include_directories(schemeshard-ut_external_data_source_reboots PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard +) +target_link_libraries(schemeshard-ut_external_data_source_reboots PUBLIC + contrib-libs-linux-headers + contrib-libs-cxxsupp + yutil + library-cpp-lfalloc + cpp-testing-unittest_main + core-tx-schemeshard + library-cpp-getopt + cpp-regex-pcre + library-cpp-svnversion + core-testlib-default + ydb-core-tx + tx-schemeshard-ut_helpers + udf-service-exception_policy +) +target_link_options(schemeshard-ut_external_data_source_reboots PRIVATE + -ldl + -lrt + -Wl,--no-as-needed + -fPIC + -fPIC + -lpthread + -lrt + -ldl +) +target_sources(schemeshard-ut_external_data_source_reboots PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/ut_external_data_source_reboots.cpp +) +set_property( + TARGET + schemeshard-ut_external_data_source_reboots + PROPERTY + SPLIT_FACTOR + 60 +) +add_yunittest( + NAME + schemeshard-ut_external_data_source_reboots + TEST_TARGET + schemeshard-ut_external_data_source_reboots + TEST_ARG + --print-before-suite + --print-before-test + --fork-tests + --print-times + --show-fails +) +set_yunittest_property( + TEST + schemeshard-ut_external_data_source_reboots + PROPERTY + LABELS + MEDIUM +) +set_yunittest_property( + TEST + schemeshard-ut_external_data_source_reboots + PROPERTY + PROCESSORS + 1 +) +set_yunittest_property( + TEST + schemeshard-ut_external_data_source_reboots + PROPERTY + TIMEOUT + 600 +) +vcs_info(schemeshard-ut_external_data_source_reboots) diff --git a/ydb/core/tx/schemeshard/ut_external_data_source_reboots/CMakeLists.linux.txt b/ydb/core/tx/schemeshard/ut_external_data_source_reboots/CMakeLists.linux.txt new file mode 100644 index 00000000000..973e1a080e7 --- /dev/null +++ b/ydb/core/tx/schemeshard/ut_external_data_source_reboots/CMakeLists.linux.txt @@ -0,0 +1,87 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_executable(schemeshard-ut_external_data_source_reboots) +target_compile_options(schemeshard-ut_external_data_source_reboots PRIVATE + -DUSE_CURRENT_UDF_ABI_VERSION +) +target_include_directories(schemeshard-ut_external_data_source_reboots PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard +) +target_link_libraries(schemeshard-ut_external_data_source_reboots PUBLIC + contrib-libs-linux-headers + contrib-libs-cxxsupp + yutil + cpp-malloc-tcmalloc + libs-tcmalloc-no_percpu_cache + library-cpp-cpuid_check + cpp-testing-unittest_main + core-tx-schemeshard + library-cpp-getopt + cpp-regex-pcre + library-cpp-svnversion + core-testlib-default + ydb-core-tx + tx-schemeshard-ut_helpers + udf-service-exception_policy +) +target_link_options(schemeshard-ut_external_data_source_reboots PRIVATE + -ldl + -lrt + -Wl,--no-as-needed + -fPIC + -fPIC + -lpthread + -lrt + -ldl +) +target_sources(schemeshard-ut_external_data_source_reboots PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/ut_external_data_source_reboots.cpp +) +set_property( + TARGET + schemeshard-ut_external_data_source_reboots + PROPERTY + SPLIT_FACTOR + 60 +) +add_yunittest( + NAME + schemeshard-ut_external_data_source_reboots + TEST_TARGET + schemeshard-ut_external_data_source_reboots + TEST_ARG + --print-before-suite + --print-before-test + --fork-tests + --print-times + --show-fails +) +set_yunittest_property( + TEST + schemeshard-ut_external_data_source_reboots + PROPERTY + LABELS + MEDIUM +) +set_yunittest_property( + TEST + schemeshard-ut_external_data_source_reboots + PROPERTY + PROCESSORS + 1 +) +set_yunittest_property( + TEST + schemeshard-ut_external_data_source_reboots + PROPERTY + TIMEOUT + 600 +) +vcs_info(schemeshard-ut_external_data_source_reboots) diff --git a/ydb/core/tx/schemeshard/ut_external_data_source_reboots/CMakeLists.txt b/ydb/core/tx/schemeshard/ut_external_data_source_reboots/CMakeLists.txt new file mode 100644 index 00000000000..5bb4faffb40 --- /dev/null +++ b/ydb/core/tx/schemeshard/ut_external_data_source_reboots/CMakeLists.txt @@ -0,0 +1,15 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + +if (CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND UNIX AND NOT APPLE AND NOT ANDROID) + include(CMakeLists.linux-aarch64.txt) +elseif (APPLE AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64") + include(CMakeLists.darwin.txt) +elseif (CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND UNIX AND NOT APPLE AND NOT ANDROID) + include(CMakeLists.linux.txt) +endif() diff --git a/ydb/core/tx/schemeshard/ut_external_table.cpp b/ydb/core/tx/schemeshard/ut_external_table.cpp index e9eb58d7c4a..e8a355cbdba 100644 --- a/ydb/core/tx/schemeshard/ut_external_table.cpp +++ b/ydb/core/tx/schemeshard/ut_external_table.cpp @@ -5,19 +5,42 @@ using namespace NKikimr; using namespace NKikimrSchemeOp; using namespace NSchemeShardUT_Private; +namespace { + +void CreateExternalDataSource(TTestBasicRuntime& runtime, TTestEnv& env, ui64 txId) { + TestCreateExternalDataSource(runtime, txId, "/MyRoot",R"( + Name: "ExternalDataSource" + SourceType: "ObjectStorage" + Location: "https://s3.cloud.net/my_bucket" + Auth { + None { + } + } + )", {NKikimrScheme::StatusAccepted}); + + env.TestWaitNotification(runtime, txId); + + TestLs(runtime, "/MyRoot/ExternalDataSource", false, NLs::PathExist); +} + +} + Y_UNIT_TEST_SUITE(TExternalTableTest) { Y_UNIT_TEST(CreateExternalTable) { TTestBasicRuntime runtime; TTestEnv env(runtime); ui64 txId = 100; + CreateExternalDataSource(runtime, env, txId++); + TestCreateExternalTable(runtime, txId++, "/MyRoot", R"( + Name: "ExternalTable" + DataSourcePath: "/MyRoot/ExternalDataSource" + Location: "/" + Columns { Name: "key" Type: "Uint64" } + )", {NKikimrScheme::StatusAccepted}); - TestCreateExternalTable(runtime, txId++, "/MyRoot", - R"(Name: "external_table1")", - {NKikimrScheme::StatusAccepted}); - - env.TestWaitNotification(runtime, 100); + env.TestWaitNotification(runtime, txId - 1); - TestLs(runtime, "/MyRoot/external_table1", false, NLs::PathExist); + TestLs(runtime, "/MyRoot/ExternalTable", false, NLs::PathExist); } Y_UNIT_TEST(DropExternalTable) { @@ -25,18 +48,22 @@ Y_UNIT_TEST_SUITE(TExternalTableTest) { TTestEnv env(runtime); ui64 txId = 100; - TestCreateExternalTable(runtime, txId++, "/MyRoot", - R"(Name: "external_table1")", - {NKikimrScheme::StatusAccepted}); + CreateExternalDataSource(runtime, env, txId++); + TestCreateExternalTable(runtime, txId++, "/MyRoot", R"( + Name: "ExternalTable" + DataSourcePath: "/MyRoot/ExternalDataSource" + Location: "/" + Columns { Name: "key" Type: "Uint64" } + )", {NKikimrScheme::StatusAccepted}); - env.TestWaitNotification(runtime, 100); + env.TestWaitNotification(runtime, txId - 1); - TestLs(runtime, "/MyRoot/external_table1", false, NLs::PathExist); + TestLs(runtime, "/MyRoot/ExternalTable", false, NLs::PathExist); - TestDropExternalTable(runtime, ++txId, "/MyRoot", "external_table1"); + TestDropExternalTable(runtime, ++txId, "/MyRoot", "ExternalTable"); env.TestWaitNotification(runtime, txId); - TestLs(runtime, "/MyRoot/external_table1", false, NLs::PathNotExist); + TestLs(runtime, "/MyRoot/ExternalTable", false, NLs::PathNotExist); } using TRuntimeTxFn = std::function<void(TTestBasicRuntime&, ui64)>; @@ -46,6 +73,7 @@ Y_UNIT_TEST_SUITE(TExternalTableTest) { TTestEnv env(runtime); ui64 txId = 100; + CreateExternalDataSource(runtime, env, txId++); createFn(runtime, ++txId); env.TestWaitNotification(runtime, txId); @@ -70,10 +98,12 @@ Y_UNIT_TEST_SUITE(TExternalTableTest) { Y_UNIT_TEST(DropTableTwice) { auto createFn = [](TTestBasicRuntime& runtime, ui64 txId) { TestCreateExternalTable(runtime, txId, "/MyRoot", R"( - Name: "ExternalTable" - Columns { Name: "key" Type: "Uint64" } - Columns { Name: "value" Type: "Utf8" } - )"); + Name: "ExternalTable" + DataSourcePath: "/MyRoot/ExternalDataSource" + Location: "/" + Columns { Name: "key" Type: "Uint64" } + Columns { Name: "value" Type: "Utf8" } + )"); }; auto dropFn = [](TTestBasicRuntime& runtime, ui64 txId) { @@ -88,17 +118,24 @@ Y_UNIT_TEST_SUITE(TExternalTableTest) { TTestEnv env(runtime); ui64 txId = 123; + CreateExternalDataSource(runtime, env, txId++); AsyncMkDir(runtime, ++txId, "/MyRoot", "DirA"); - AsyncCreateExternalTable(runtime, ++txId, "/MyRoot/DirA", - R"(Name: "ExternalTable1" - Columns { Name: "RowId" Type: "Uint64"} - Columns { Name: "Value" Type: "Utf8"})"); - AsyncCreateExternalTable(runtime, ++txId, "/MyRoot/DirA", - R"(Name: "ExternalTable2" - Columns { Name: "key1" Type: "Uint32"} - Columns { Name: "key2" Type: "Utf8"} - Columns { Name: "RowId" Type: "Uint64"} - Columns { Name: "Value" Type: "Utf8"})"); + AsyncCreateExternalTable(runtime, ++txId, "/MyRoot/DirA", R"( + Name: "ExternalTable1" + DataSourcePath: "/MyRoot/ExternalDataSource" + Location: "/" + Columns { Name: "RowId" Type: "Uint64"} + Columns { Name: "Value" Type: "Utf8"} + )"); + AsyncCreateExternalTable(runtime, ++txId, "/MyRoot/DirA", R"( + Name: "ExternalTable2" + DataSourcePath: "/MyRoot/ExternalDataSource" + Location: "/" + Columns { Name: "key1" Type: "Uint32"} + Columns { Name: "key2" Type: "Utf8"} + Columns { Name: "RowId" Type: "Uint64"} + Columns { Name: "Value" Type: "Utf8"} + )"); TestModificationResult(runtime, txId-2, NKikimrScheme::StatusAccepted); TestModificationResult(runtime, txId-1, NKikimrScheme::StatusAccepted); TestModificationResult(runtime, txId, NKikimrScheme::StatusAccepted); @@ -123,10 +160,14 @@ Y_UNIT_TEST_SUITE(TExternalTableTest) { TTestEnv env(runtime); ui64 txId = 123; - TString tableConfig = R"(Name: "NilNoviSubLuna" - Columns { Name: "key" Type: "Uint64"} - Columns { Name: "value" Type: "Uint64"})"; - + TString tableConfig = R"( + Name: "NilNoviSubLuna" + DataSourcePath: "/MyRoot/ExternalDataSource" + Location: "/" + Columns { Name: "key" Type: "Uint64"} + Columns { Name: "value" Type: "Uint64"} + )"; + CreateExternalDataSource(runtime, env, txId++); AsyncCreateExternalTable(runtime, ++txId, "/MyRoot", tableConfig); AsyncCreateExternalTable(runtime, ++txId, "/MyRoot", tableConfig); AsyncCreateExternalTable(runtime, ++txId, "/MyRoot", tableConfig); @@ -167,11 +208,15 @@ Y_UNIT_TEST_SUITE(TExternalTableTest) { TTestEnv env(runtime); ui64 txId = 123; + CreateExternalDataSource(runtime, env, txId++); AsyncMkDir(runtime, ++txId, "/MyRoot", "SubDirA"); - AsyncCreateExternalTable(runtime, ++txId, "/MyRoot", - R"(Name: "ExternalTable1" - Columns { Name: "RowId" Type: "Uint64"} - Columns { Name: "Value" Type: "Utf8"})"); + AsyncCreateExternalTable(runtime, ++txId, "/MyRoot", R"( + Name: "ExternalTable1" + DataSourcePath: "/MyRoot/ExternalDataSource" + Location: "/" + Columns { Name: "RowId" Type: "Uint64"} + Columns { Name: "Value" Type: "Utf8"} + )"); // Set ReadOnly SetSchemeshardReadOnlyMode(runtime, true); TActorId sender = runtime.AllocateEdgeActor(); @@ -189,11 +234,13 @@ Y_UNIT_TEST_SUITE(TExternalTableTest) { // Check that new modifications fail TestMkDir(runtime, ++txId, "/MyRoot", "SubDirBBBB", {NKikimrScheme::StatusReadOnly}); - TestCreateExternalTable(runtime, ++txId, "/MyRoot", - R"(Name: "ExternalTable1" - Columns { Name: "RowId" Type: "Uint64"} - Columns { Name: "Value" Type: "Utf8"})", - {NKikimrScheme::StatusReadOnly}); + TestCreateExternalTable(runtime, ++txId, "/MyRoot", R"( + Name: "ExternalTable1" + DataSourcePath: "/MyRoot/ExternalDataSource" + Location: "/" + Columns { Name: "RowId" Type: "Uint64"} + Columns { Name: "Value" Type: "Utf8"} + )", {NKikimrScheme::StatusReadOnly}); // Disable ReadOnly SetSchemeshardReadOnlyMode(runtime, false); @@ -211,27 +258,44 @@ Y_UNIT_TEST_SUITE(TExternalTableTest) { TestMkDir(runtime, ++txId, "/MyRoot", "DirA"); env.TestWaitNotification(runtime, txId); - - TestCreateExternalTable(runtime, ++txId, "/MyRoot/DirA", - R"(Name: "Table2" - Columns { Name: "RowId" Type: "BlaBlaType"})", - {NKikimrScheme::StatusSchemeError}); - TestCreateExternalTable(runtime, ++txId, "/MyRoot/DirA", - R"(Name: "Table2" - Columns { Name: "" Type: "Uint64"})", - {NKikimrScheme::StatusSchemeError}); - TestCreateExternalTable(runtime, ++txId, "/MyRoot/DirA", - R"(Name: "Table2" - Columns { Name: "RowId" TypeId: 27})", - {NKikimrScheme::StatusSchemeError}); - TestCreateExternalTable(runtime, ++txId, "/MyRoot/DirA", - R"(Name: "Table2" - Columns { Name: "RowId" })", - {NKikimrScheme::StatusSchemeError}); - TestCreateExternalTable(runtime, ++txId, "/MyRoot/DirA", - R"(Name: "Table2" - Columns { Name: "RowId" Type: "Uint64" Id: 2} - Columns { Name: "RowId2" Type: "Uint64" Id: 2 })", - {NKikimrScheme::StatusSchemeError}); + CreateExternalDataSource(runtime, env, txId++); + TestCreateExternalTable(runtime, ++txId, "/MyRoot/DirA", R"( + Name: "Table2" + DataSourcePath: "/MyRoot/ExternalDataSource" + Location: "/" + Columns { Name: "RowId" Type: "BlaBlaType"} + )", {{NKikimrScheme::StatusSchemeError, "Type 'BlaBlaType' specified for column 'RowId' is not supported"}}); + TestCreateExternalTable(runtime, ++txId, "/MyRoot/DirA", R"( + Name: "Table2" + DataSourcePath: "/MyRoot/ExternalDataSource" + Location: "/" + Columns { Name: "" Type: "Uint64"} + )", {{NKikimrScheme::StatusSchemeError, "Columns cannot have an empty name"}}); + TestCreateExternalTable(runtime, ++txId, "/MyRoot/DirA", R"( + Name: "Table2" + DataSourcePath: "/MyRoot/ExternalDataSource" + Location: "/" + Columns { Name: "RowId" TypeId: 27} + )", {{NKikimrScheme::StatusSchemeError, "a"}}); + TestCreateExternalTable(runtime, ++txId, "/MyRoot/DirA", R"( + Name: "Table2" + DataSourcePath: "/MyRoot/ExternalDataSource" + Location: "/" + Columns { Name: "RowId" } + )", {{NKikimrScheme::StatusSchemeError, "Missing Type for column 'RowId'"}}); + TestCreateExternalTable(runtime, ++txId, "/MyRoot/DirA", R"( + Name: "Table2" + DataSourcePath: "/MyRoot/ExternalDataSource" + Location: "/" + Columns { Name: "RowId" Type: "Uint64" Id: 2} + Columns { Name: "RowId2" Type: "Uint64" Id: 2 } + )", {{NKikimrScheme::StatusSchemeError, "Duplicate column id: 2"}}); + TestCreateExternalTable(runtime, ++txId, "/MyRoot/DirA", R"( + Name: "Table2" + DataSourcePath: "/MyRoot/ExternalDataSource1" + Location: "/" + Columns { Name: "RowId" Type: "Uint64"} + Columns { Name: "Value" Type: "Utf8"} + )", {{NKikimrScheme::StatusPathDoesNotExist, "Check failed: path: '/MyRoot/ExternalDataSource1'"}}); } } diff --git a/ydb/core/tx/schemeshard/ut_external_table_reboots.cpp b/ydb/core/tx/schemeshard/ut_external_table_reboots.cpp index ea276bf9603..e24ca8a8629 100644 --- a/ydb/core/tx/schemeshard/ut_external_table_reboots.cpp +++ b/ydb/core/tx/schemeshard/ut_external_table_reboots.cpp @@ -9,30 +9,53 @@ using namespace NKikimr; using namespace NSchemeShard; using namespace NSchemeShardUT_Private; +namespace { + +void CreateExternalDataSource(TTestActorRuntime& runtime, TTestEnv& env, ui64 txId) { + AsyncCreateExternalDataSource(runtime, txId, "/MyRoot",R"( + Name: "ExternalDataSource" + SourceType: "ObjectStorage" + Location: "https://s3.cloud.net/my_bucket" + Auth { + None { + } + } + )"); + + env.TestWaitNotification(runtime, txId); + + TestLs(runtime, "/MyRoot/ExternalDataSource", false, NLs::PathExist); +} + +} + Y_UNIT_TEST_SUITE(TExternalTableTestReboots) { Y_UNIT_TEST(CreateExternalTableWithReboots) { TTestWithReboots t; t.Run([&](TTestActorRuntime& runtime, bool& activeZone) { + CreateExternalDataSource(runtime, *t.TestEnv, ++t.TxId); AsyncMkDir(runtime, ++t.TxId, "/MyRoot", "DirExternalTable"); - AsyncCreateExternalTable(runtime, ++t.TxId, "/MyRoot/DirExternalTable", R"( - Name: "external_table1" - DataSourcePath: "/MySource" - Columns { Name: "a" Type: "Int32" NotNull: true } - Columns { Name: "b" Type: "Int32" NotNull: true } - )"); + Name: "ExternalTable" + DataSourcePath: "/MyRoot/ExternalDataSource" + Location: "/" + Columns { Name: "a" Type: "Int32" NotNull: true } + Columns { Name: "b" Type: "Int32" NotNull: true } + )"); - t.TestEnv->TestWaitNotification(runtime, {t.TxId, t.TxId-1}); + t.TestEnv->TestWaitNotification(runtime, {t.TxId-2, t.TxId-1, t.TxId}); { TInactiveZone inactive(activeZone); - auto describeResult = DescribePath(runtime, "/MyRoot/DirExternalTable/external_table1"); + auto describeResult = DescribePath(runtime, "/MyRoot/DirExternalTable/ExternalTable"); TestDescribeResult(describeResult, {NLs::Finished}); UNIT_ASSERT(describeResult.GetPathDescription().HasExternalTableDescription()); const auto& externalTableDescription = describeResult.GetPathDescription().GetExternalTableDescription(); - UNIT_ASSERT_VALUES_EQUAL(externalTableDescription.GetName(), "external_table1"); - UNIT_ASSERT_VALUES_EQUAL(externalTableDescription.GetDataSourcePath(), "/MySource"); + UNIT_ASSERT_VALUES_EQUAL(externalTableDescription.GetName(), "ExternalTable"); + UNIT_ASSERT_VALUES_EQUAL(externalTableDescription.GetDataSourcePath(), "/MyRoot/ExternalDataSource"); + UNIT_ASSERT_VALUES_EQUAL(externalTableDescription.GetLocation(), "/"); + UNIT_ASSERT_VALUES_EQUAL(externalTableDescription.GetSourceType(), "ObjectStorage"); UNIT_ASSERT_VALUES_EQUAL(externalTableDescription.GetVersion(), 1); auto& columns = externalTableDescription.GetColumns(); UNIT_ASSERT_VALUES_EQUAL(columns.size(), 2); @@ -46,17 +69,21 @@ Y_UNIT_TEST_SUITE(TExternalTableTestReboots) { }); } - Y_UNIT_TEST(ParallelCreateDrop) { //+ + Y_UNIT_TEST(ParallelCreateDrop) { + using ESts = NKikimrScheme::EStatus; TTestWithReboots t; t.Run([&](TTestActorRuntime& runtime, bool& activeZone) { + CreateExternalDataSource(runtime, *t.TestEnv, t.TxId); AsyncCreateExternalTable(runtime, ++t.TxId, "/MyRoot", R"( - Name: "DropMe" - Columns { Name: "RowId" Type: "Uint64" } - Columns { Name: "Value" Type: "Utf8" } - )"); + Name: "DropMe" + DataSourcePath: "/MyRoot/ExternalDataSource" + Location: "/" + Columns { Name: "RowId" Type: "Uint64" } + Columns { Name: "Value" Type: "Utf8" } + )"); AsyncDropExternalTable(runtime, ++t.TxId, "/MyRoot", "DropMe"); - t.TestEnv->TestWaitNotification(runtime, t.TxId-1); - + TestModificationResults(runtime, t.TxId - 1, {ESts::StatusAccepted}); + t.TestEnv->TestWaitNotification(runtime, t.TxId - 1); TestDropExternalTable(runtime, ++t.TxId, "/MyRoot", "DropMe"); t.TestEnv->TestWaitNotification(runtime, t.TxId); @@ -72,12 +99,17 @@ Y_UNIT_TEST_SUITE(TExternalTableTestReboots) { Y_UNIT_TEST(SimpleDropExternalTableWithReboots) { TTestWithReboots t; t.Run([&](TTestActorRuntime& runtime, bool& activeZone) { + CreateExternalDataSource(runtime, *t.TestEnv, ++t.TxId); { TInactiveZone inactive(activeZone); - TestCreateExternalTable(runtime, ++t.TxId, "/MyRoot", - "Name: \"ExternalTable\"" - "Columns { Name: \"RowId\" Type: \"Uint64\"}" - "Columns { Name: \"Value\" Type: \"Utf8\"}"); + TestCreateExternalTable(runtime, ++t.TxId, "/MyRoot",R"( + Name: "ExternalTable" + DataSourcePath: "/MyRoot/ExternalDataSource" + Location: "/" + Columns { Name: "a" Type: "Int32" NotNull: true } + Columns { Name: "b" Type: "Int32" NotNull: true } + )"); + t.TestEnv->TestWaitNotification(runtime, t.TxId); } @@ -92,15 +124,19 @@ Y_UNIT_TEST_SUITE(TExternalTableTestReboots) { }); } - Y_UNIT_TEST(SimpleDropExternalTableWithReboots2) { //+ + Y_UNIT_TEST(SimpleDropExternalTableWithReboots2) { TTestWithReboots t; t.Run([&](TTestActorRuntime& runtime, bool& activeZone) { + CreateExternalDataSource(runtime, *t.TestEnv, ++t.TxId); { TInactiveZone inactive(activeZone); - TestCreateExternalTable(runtime, ++t.TxId, "/MyRoot", - "Name: \"ExternalTable\"" - "Columns { Name: \"RowId\" Type: \"Uint64\"}" - "Columns { Name: \"Value\" Type: \"Utf8\"}"); + TestCreateExternalTable(runtime, ++t.TxId, "/MyRoot",R"( + Name: "ExternalTable" + DataSourcePath: "/MyRoot/ExternalDataSource" + Location: "/" + Columns { Name: "a" Type: "Int32" NotNull: true } + Columns { Name: "b" Type: "Int32" NotNull: true } + )"); t.TestEnv->TestWaitNotification(runtime, t.TxId); } @@ -118,12 +154,16 @@ Y_UNIT_TEST_SUITE(TExternalTableTestReboots) { Y_UNIT_TEST(DropExternalTableWithReboots) { TTestWithReboots t; t.Run([&](TTestActorRuntime& runtime, bool& activeZone) { + CreateExternalDataSource(runtime, *t.TestEnv, ++t.TxId); { TInactiveZone inactive(activeZone); - TestCreateExternalTable(runtime, t.TxId, "/MyRoot", - R"(Name: "ExternalTable" - Columns { Name: "RowId" Type: "Uint64"} - Columns { Name: "Value" Type: "Utf8"})"); + TestCreateExternalTable(runtime, t.TxId, "/MyRoot", R"( + Name: "ExternalTable" + DataSourcePath: "/MyRoot/ExternalDataSource" + Location: "/" + Columns { Name: "RowId" Type: "Uint64"} + Columns { Name: "Value" Type: "Utf8"} + )"); t.TestEnv->TestWaitNotification(runtime, t.TxId); } @@ -135,16 +175,19 @@ Y_UNIT_TEST_SUITE(TExternalTableTestReboots) { TestDescribeResult(DescribePath(runtime, "/MyRoot/ExternalTable"), {NLs::PathNotExist}); - TestCreateExternalTable(runtime, ++t.TxId, "/MyRoot", - R"(Name: "ExternalTable" - Columns { Name: "RowId" Type: "Uint64"} - Columns { Name: "Value" Type: "Utf8"})"); + TestCreateExternalTable(runtime, ++t.TxId, "/MyRoot", R"( + Name: "ExternalTable" + DataSourcePath: "/MyRoot/ExternalDataSource" + Location: "/" + Columns { Name: "RowId" Type: "Uint64"} + Columns { Name: "Value" Type: "Utf8"} + )"); t.TestEnv->TestWaitNotification(runtime, t.TxId); TestDropExternalTable(runtime, ++t.TxId, "/MyRoot", "ExternalTable"); t.TestEnv->TestWaitNotification(runtime, t.TxId); - TestDescribeResult(DescribePath(runtime, "/MyRoot/Table"), + TestDescribeResult(DescribePath(runtime, "/MyRoot/ExternalTable"), {NLs::PathNotExist}); } }); @@ -153,22 +196,29 @@ Y_UNIT_TEST_SUITE(TExternalTableTestReboots) { Y_UNIT_TEST(CreateDroppedExternalTableWithReboots) { TTestWithReboots t; t.Run([&](TTestActorRuntime& runtime, bool& activeZone) { + CreateExternalDataSource(runtime, *t.TestEnv, ++t.TxId); { TInactiveZone inactive(activeZone); - TestCreateExternalTable(runtime, ++t.TxId, "/MyRoot", - R"(Name: "ExternalTable" - Columns { Name: "RowId" Type: "Uint64"} - Columns { Name: "Value" Type: "Utf8"})"); + TestCreateExternalTable(runtime, ++t.TxId, "/MyRoot", R"( + Name: "ExternalTable" + DataSourcePath: "/MyRoot/ExternalDataSource" + Location: "/" + Columns { Name: "RowId" Type: "Uint64"} + Columns { Name: "Value" Type: "Utf8"} + )"); t.TestEnv->TestWaitNotification(runtime, t.TxId); TestDropExternalTable(runtime, ++t.TxId, "/MyRoot", "ExternalTable"); t.TestEnv->TestWaitNotification(runtime, t.TxId); } - TestCreateExternalTable(runtime, ++t.TxId, "/MyRoot", - R"(Name: "ExternalTable" - Columns { Name: "RowId" Type: "Uint64"} - Columns { Name: "Value" Type: "Utf8"})"); + TestCreateExternalTable(runtime, ++t.TxId, "/MyRoot", R"( + Name: "ExternalTable" + DataSourcePath: "/MyRoot/ExternalDataSource" + Location: "/" + Columns { Name: "RowId" Type: "Uint64"} + Columns { Name: "Value" Type: "Utf8"} + )"); t.TestEnv->TestWaitNotification(runtime, t.TxId); { @@ -183,21 +233,28 @@ Y_UNIT_TEST_SUITE(TExternalTableTestReboots) { Y_UNIT_TEST(CreateDroppedExternalTableAndDropWithReboots) { //+ TTestWithReboots t; t.Run([&](TTestActorRuntime& runtime, bool& activeZone) { + CreateExternalDataSource(runtime, *t.TestEnv, ++t.TxId); { TInactiveZone inactive(activeZone); - TestCreateExternalTable(runtime, ++t.TxId, "/MyRoot", - R"(Name: "ExternalTable" - Columns { Name: "RowId" Type: "Uint64"} - Columns { Name: "Value" Type: "Utf8"})"); + TestCreateExternalTable(runtime, ++t.TxId, "/MyRoot", R"( + Name: "ExternalTable" + DataSourcePath: "/MyRoot/ExternalDataSource" + Location: "/" + Columns { Name: "RowId" Type: "Uint64"} + Columns { Name: "Value" Type: "Utf8"} + )"); t.TestEnv->TestWaitNotification(runtime, t.TxId); TestDropExternalTable(runtime, ++t.TxId, "/MyRoot", "ExternalTable"); t.TestEnv->TestWaitNotification(runtime, t.TxId); - TestCreateExternalTable(runtime, ++t.TxId, "/MyRoot", - R"(Name: "ExternalTable" - Columns { Name: "RowId" Type: "Uint64"} - Columns { Name: "Value" Type: "Utf8"})"); + TestCreateExternalTable(runtime, ++t.TxId, "/MyRoot", R"( + Name: "ExternalTable" + DataSourcePath: "/MyRoot/ExternalDataSource" + Location: "/" + Columns { Name: "RowId" Type: "Uint64"} + Columns { Name: "Value" Type: "Utf8"} + )"); t.TestEnv->TestWaitNotification(runtime, t.TxId); } diff --git a/ydb/core/tx/schemeshard/ut_helpers/helpers.cpp b/ydb/core/tx/schemeshard/ut_helpers/helpers.cpp index 4fa6ca245b1..d5b550e6d38 100644 --- a/ydb/core/tx/schemeshard/ut_helpers/helpers.cpp +++ b/ydb/core/tx/schemeshard/ut_helpers/helpers.cpp @@ -893,6 +893,11 @@ namespace NSchemeShardUT_Private { GENERIC_HELPERS(DropExternalTable, NKikimrSchemeOp::EOperationType::ESchemeOpDropExternalTable, &NKikimrSchemeOp::TModifyScheme::MutableDrop) DROP_BY_PATH_ID_HELPERS(DropExternalTable, NKikimrSchemeOp::EOperationType::ESchemeOpDropExternalTable) + // external data source + GENERIC_HELPERS(CreateExternalDataSource, NKikimrSchemeOp::EOperationType::ESchemeOpCreateExternalDataSource, &NKikimrSchemeOp::TModifyScheme::MutableCreateExternalDataSource) + GENERIC_HELPERS(DropExternalDataSource, NKikimrSchemeOp::EOperationType::ESchemeOpDropExternalDataSource, &NKikimrSchemeOp::TModifyScheme::MutableDrop) + DROP_BY_PATH_ID_HELPERS(DropExternalDataSource, NKikimrSchemeOp::EOperationType::ESchemeOpDropExternalDataSource) + #undef DROP_BY_PATH_ID_HELPERS #undef GENERIC_WITH_ATTRS_HELPERS #undef GENERIC_HELPERS diff --git a/ydb/core/tx/schemeshard/ut_helpers/helpers.h b/ydb/core/tx/schemeshard/ut_helpers/helpers.h index 49c2f892fba..c58fe760692 100644 --- a/ydb/core/tx/schemeshard/ut_helpers/helpers.h +++ b/ydb/core/tx/schemeshard/ut_helpers/helpers.h @@ -181,6 +181,11 @@ namespace NSchemeShardUT_Private { GENERIC_HELPERS(DropExternalTable); DROP_BY_PATH_ID_HELPERS(DropExternalTable); + // external data source + GENERIC_HELPERS(CreateExternalDataSource); + GENERIC_HELPERS(DropExternalDataSource); + DROP_BY_PATH_ID_HELPERS(DropExternalDataSource); + // backup & restore GENERIC_HELPERS(Backup); GENERIC_HELPERS(BackupToYt); diff --git a/ydb/core/tx/schemeshard/ut_helpers/ls_checks.cpp b/ydb/core/tx/schemeshard/ut_helpers/ls_checks.cpp index f2918d23027..a8dd706e723 100644 --- a/ydb/core/tx/schemeshard/ut_helpers/ls_checks.cpp +++ b/ydb/core/tx/schemeshard/ut_helpers/ls_checks.cpp @@ -396,6 +396,13 @@ void IsExternalTable(const NKikimrScheme::TEvDescribeSchemeResult& record) { UNIT_ASSERT_VALUES_EQUAL(selfPath.GetPathType(), NKikimrSchemeOp::EPathTypeExternalTable); } +void IsExternalDataSource(const NKikimrScheme::TEvDescribeSchemeResult& record) { + UNIT_ASSERT_VALUES_EQUAL(record.GetStatus(), NKikimrScheme::StatusSuccess); + const auto& pathDescr = record.GetPathDescription(); + const auto& selfPath = pathDescr.GetSelf(); + UNIT_ASSERT_VALUES_EQUAL(selfPath.GetPathType(), NKikimrSchemeOp::EPathTypeExternalDataSource); +} + TCheckFunc CheckColumns(const TString& name, const TSet<TString>& columns, const TSet<TString>& droppedColumns, const TSet<TString> keyColumns, NKikimrSchemeOp::EPathState pathState) { return [=] (const NKikimrScheme::TEvDescribeSchemeResult& record) { diff --git a/ydb/core/tx/schemeshard/ut_helpers/ls_checks.h b/ydb/core/tx/schemeshard/ut_helpers/ls_checks.h index e21efb9d90c..95fd8cc1286 100644 --- a/ydb/core/tx/schemeshard/ut_helpers/ls_checks.h +++ b/ydb/core/tx/schemeshard/ut_helpers/ls_checks.h @@ -83,6 +83,7 @@ namespace NLs { void IsTable(const NKikimrScheme::TEvDescribeSchemeResult& record); void IsExternalTable(const NKikimrScheme::TEvDescribeSchemeResult& record); + void IsExternalDataSource(const NKikimrScheme::TEvDescribeSchemeResult& record); TCheckFunc CheckColumns(const TString& name, const TSet<TString>& columns, const TSet<TString>& droppedColumns, const TSet<TString> keyColumns, NKikimrSchemeOp::EPathState pathState = NKikimrSchemeOp::EPathState::EPathStateNoChanges); void CheckBoundaries(const NKikimrScheme::TEvDescribeSchemeResult& record); diff --git a/ydb/core/tx/tx_proxy/schemereq.cpp b/ydb/core/tx/tx_proxy/schemereq.cpp index 9729908410b..ab0a8f162a7 100644 --- a/ydb/core/tx/tx_proxy/schemereq.cpp +++ b/ydb/core/tx/tx_proxy/schemereq.cpp @@ -152,6 +152,8 @@ struct TBaseSchemeReq: public TActorBootstrapped<TDerived> { case NKikimrSchemeOp::ESchemeOpDropBlobDepot: case NKikimrSchemeOp::ESchemeOpDropExternalTable: return *modifyScheme.MutableDrop()->MutableName(); + case NKikimrSchemeOp::ESchemeOpDropExternalDataSource: + return *modifyScheme.MutableDrop()->MutableName(); case NKikimrSchemeOp::ESchemeOpAlterTable: return *modifyScheme.MutableAlterTable()->MutableName(); @@ -331,6 +333,12 @@ struct TBaseSchemeReq: public TActorBootstrapped<TDerived> { case NKikimrSchemeOp::ESchemeOpAlterExternalTable: Y_FAIL("no implementation for ESchemeOpAlterExternalTable"); + + case NKikimrSchemeOp::ESchemeOpCreateExternalDataSource: + return *modifyScheme.MutableCreateExternalDataSource()->MutableName(); + + case NKikimrSchemeOp::ESchemeOpAlterExternalDataSource: + Y_FAIL("no implementation for ESchemeOpAlterExternalDataSource"); } } @@ -351,6 +359,7 @@ struct TBaseSchemeReq: public TActorBootstrapped<TDerived> { case NKikimrSchemeOp::ESchemeOpCreateColumnStore: case NKikimrSchemeOp::ESchemeOpCreateColumnTable: case NKikimrSchemeOp::ESchemeOpCreateExternalTable: + case NKikimrSchemeOp::ESchemeOpCreateExternalDataSource: return true; default: return false; @@ -573,6 +582,7 @@ struct TBaseSchemeReq: public TActorBootstrapped<TDerived> { case NKikimrSchemeOp::ESchemeOpAlterReplication: case NKikimrSchemeOp::ESchemeOpAlterBlobDepot: case NKikimrSchemeOp::ESchemeOpAlterExternalTable: + case NKikimrSchemeOp::ESchemeOpAlterExternalDataSource: { auto toResolve = TPathToResolve(pbModifyScheme.GetOperationType()); toResolve.Path = Merge(workingDir, SplitPath(GetPathNameForScheme(pbModifyScheme))); @@ -593,7 +603,8 @@ struct TBaseSchemeReq: public TActorBootstrapped<TDerived> { case NKikimrSchemeOp::ESchemeOpDropSequence: case NKikimrSchemeOp::ESchemeOpDropReplication: case NKikimrSchemeOp::ESchemeOpDropBlobDepot: - case NKikimrSchemeOp::ESchemeOpDropExternalTable: { + case NKikimrSchemeOp::ESchemeOpDropExternalTable: + case NKikimrSchemeOp::ESchemeOpDropExternalDataSource: { auto toResolve = TPathToResolve(pbModifyScheme.GetOperationType()); toResolve.Path = Merge(workingDir, SplitPath(GetPathNameForScheme(pbModifyScheme))); toResolve.RequiredAccess = NACLib::EAccessRights::RemoveSchema; @@ -652,6 +663,7 @@ struct TBaseSchemeReq: public TActorBootstrapped<TDerived> { case NKikimrSchemeOp::ESchemeOpCreateReplication: case NKikimrSchemeOp::ESchemeOpCreateBlobDepot: case NKikimrSchemeOp::ESchemeOpCreateExternalTable: + case NKikimrSchemeOp::ESchemeOpCreateExternalDataSource: { auto toResolve = TPathToResolve(pbModifyScheme.GetOperationType()); toResolve.Path = workingDir; diff --git a/ydb/core/viewer/browse.h b/ydb/core/viewer/browse.h index 930c5e971f1..6744dafe049 100644 --- a/ydb/core/viewer/browse.h +++ b/ydb/core/viewer/browse.h @@ -96,6 +96,8 @@ public: return NKikimrViewer::EObjectType::BlobDepot; case NKikimrSchemeOp::EPathType::EPathTypeExternalTable: return NKikimrViewer::EObjectType::ExternalTable; + case NKikimrSchemeOp::EPathType::EPathTypeExternalDataSource: + return NKikimrViewer::EObjectType::ExternalDataSource; case NKikimrSchemeOp::EPathType::EPathTypeExtSubDomain: case NKikimrSchemeOp::EPathType::EPathTypeTableIndex: case NKikimrSchemeOp::EPathType::EPathTypeInvalid: diff --git a/ydb/core/viewer/protos/viewer.proto b/ydb/core/viewer/protos/viewer.proto index 8190cff37a1..f5ee46e4978 100644 --- a/ydb/core/viewer/protos/viewer.proto +++ b/ydb/core/viewer/protos/viewer.proto @@ -36,6 +36,7 @@ enum EObjectType { Replication = 22; BlobDepot = 23; ExternalTable = 24; + ExternalDataSource = 25; } message TBrowseInfo { diff --git a/ydb/public/lib/deprecated/kicli/kicli.h b/ydb/public/lib/deprecated/kicli/kicli.h index 1a079fc52ba..699c6bac05f 100644 --- a/ydb/public/lib/deprecated/kicli/kicli.h +++ b/ydb/public/lib/deprecated/kicli/kicli.h @@ -580,7 +580,8 @@ public: Sequence, Replication, BlobDepot, - ExternalTable + ExternalTable, + ExternalDataSource }; TSchemaObject(TSchemaObject&&) = default; diff --git a/ydb/public/lib/deprecated/kicli/schema.cpp b/ydb/public/lib/deprecated/kicli/schema.cpp index a32f82d4032..69a7c4e32e5 100644 --- a/ydb/public/lib/deprecated/kicli/schema.cpp +++ b/ydb/public/lib/deprecated/kicli/schema.cpp @@ -125,6 +125,9 @@ void TSchemaObject::Drop() { case EPathType::ExternalTable: drop.SetOperationType(NKikimrSchemeOp::EOperationType::ESchemeOpDropExternalTable); break; + case EPathType::ExternalDataSource: + drop.SetOperationType(NKikimrSchemeOp::EOperationType::ESchemeOpDropExternalDataSource); + break; case EPathType::Unknown: case EPathType::SubDomain: case EPathType::RtmrVolume: @@ -212,6 +215,8 @@ static TSchemaObject::EPathType GetType(const NKikimrSchemeOp::TDirEntry& entry) return TSchemaObject::EPathType::BlobDepot; case NKikimrSchemeOp::EPathTypeExternalTable: return TSchemaObject::EPathType::ExternalTable; + case NKikimrSchemeOp::EPathTypeExternalDataSource: + return TSchemaObject::EPathType::ExternalDataSource; case NKikimrSchemeOp::EPathTypeTableIndex: case NKikimrSchemeOp::EPathTypeExtSubDomain: case NKikimrSchemeOp::EPathTypeCdcStream: diff --git a/ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_schemeshard_/flat_schemeshard.schema b/ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_schemeshard_/flat_schemeshard.schema index 0c181e773f5..1e2fe85de68 100644 --- a/ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_schemeshard_/flat_schemeshard.schema +++ b/ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_schemeshard_/flat_schemeshard.schema @@ -6842,5 +6842,83 @@ "Blobs": 1 } } + }, + { + "TableId": 105, + "TableName": "ExternalDataSource", + "TableKey": [ + 1, + 2 + ], + "ColumnsAdded": [ + { + "ColumnId": 1, + "ColumnName": "OwnerPathId", + "ColumnType": "Uint64" + }, + { + "ColumnId": 2, + "ColumnName": "LocalPathId", + "ColumnType": "Uint64" + }, + { + "ColumnId": 3, + "ColumnName": "AlterVersion", + "ColumnType": "Uint64" + }, + { + "ColumnId": 4, + "ColumnName": "SourceType", + "ColumnType": "Utf8" + }, + { + "ColumnId": 5, + "ColumnName": "Location", + "ColumnType": "Utf8" + }, + { + "ColumnId": 6, + "ColumnName": "Installation", + "ColumnType": "Utf8" + }, + { + "ColumnId": 7, + "ColumnName": "Auth", + "ColumnType": "String" + }, + { + "ColumnId": 8, + "ColumnName": "ExternalTableReferences", + "ColumnType": "String" + } + ], + "ColumnsDropped": [], + "ColumnFamilies": { + "0": { + "Columns": [ + 1, + 2, + 3, + 4, + 5, + 6, + 7, + 8 + ], + "RoomID": 0, + "Codec": 0, + "InMemory": false, + "Cache": 0, + "Small": 4294967295, + "Large": 4294967295 + } + }, + "Rooms": { + "0": { + "Main": 1, + "Outer": 1, + "Blobs": 1 + } + } } ]
\ No newline at end of file |