diff options
author | hcpp <hcpp@ydb.tech> | 2023-02-21 19:54:41 +0300 |
---|---|---|
committer | hcpp <hcpp@ydb.tech> | 2023-02-21 19:54:41 +0300 |
commit | 6e33b317bd70bd0662ae81effbec343f6d39af0c (patch) | |
tree | 9fbc8d5fa63b4605d21a84372d1c670d7c825136 | |
parent | 40717b186524f698361f094ebd1cf2753419a334 (diff) | |
download | ydb-6e33b317bd70bd0662ae81effbec343f6d39af0c.tar.gz |
external data source has been added to kqp
-rw-r--r-- | ydb/core/kqp/gateway/kqp_ic_gateway.cpp | 108 | ||||
-rw-r--r-- | ydb/core/kqp/provider/yql_kikimr_gateway.h | 22 | ||||
-rw-r--r-- | ydb/core/kqp/provider/yql_kikimr_gateway_ut.cpp | 52 | ||||
-rw-r--r-- | ydb/core/tx/scheme_cache/scheme_cache.h | 2 | ||||
-rw-r--r-- | ydb/core/tx/schemeshard/schemeshard__operation.cpp | 3 |
5 files changed, 175 insertions, 12 deletions
diff --git a/ydb/core/kqp/gateway/kqp_ic_gateway.cpp b/ydb/core/kqp/gateway/kqp_ic_gateway.cpp index a46e2fbfa3..7bd1a951b6 100644 --- a/ydb/core/kqp/gateway/kqp_ic_gateway.cpp +++ b/ydb/core/kqp/gateway/kqp_ic_gateway.cpp @@ -1269,8 +1269,8 @@ public: schemeTx.SetWorkingDir(pathPair.first); schemeTx.SetOperationType(NKikimrSchemeOp::ESchemeOpCreateExternalTable); - NKikimrSchemeOp::TExternalTableDescription& tableDesc = *schemeTx.MutableCreateExternalTable(); - FillCreateExternalTableColumnDesc(tableDesc, pathPair.second, settings); + NKikimrSchemeOp::TExternalTableDescription& externalTableDesc = *schemeTx.MutableCreateExternalTable(); + FillCreateExternalTableColumnDesc(externalTableDesc, pathPair.second, settings); return SendSchemeRequest(ev.Release()); } catch (yexception& e) { @@ -1363,6 +1363,84 @@ public: } } + TFuture<TGenericResult> CreateExternalDataSource(const TString& cluster, + const NYql::TCreateExternalDataSourceSettings& settings, + bool createDir) override { + using TRequest = TEvTxUserProxy::TEvProposeTransaction; + + try { + if (!CheckCluster(cluster)) { + return InvalidCluster<TGenericResult>(cluster); + } + + std::pair<TString, TString> pathPair; + { + TString error; + if (!GetPathPair(settings.ExternalDataSource, pathPair, error, createDir)) { + return MakeFuture(ResultFromError<TGenericResult>(error)); + } + } + + auto ev = MakeHolder<TRequest>(); + ev->Record.SetDatabaseName(Database); + if (UserToken) { + ev->Record.SetUserToken(UserToken->Serialized); + } + auto& schemeTx = *ev->Record.MutableTransaction()->MutableModifyScheme(); + schemeTx.SetWorkingDir(pathPair.first); + schemeTx.SetOperationType(NKikimrSchemeOp::ESchemeOpCreateExternalDataSource); + + NKikimrSchemeOp::TExternalDataSourceDescription& dataSourceDesc = *schemeTx.MutableCreateExternalDataSource(); + FillCreateExternalDataSourceDesc(dataSourceDesc, pathPair.second, settings); + return SendSchemeRequest(ev.Release()); + } + catch (yexception& e) { + return MakeFuture(ResultFromException<TGenericResult>(e)); + } + } + + TFuture<TGenericResult> AlterExternalDataSource(const TString& cluster, + const NYql::TAlterExternalDataSourceSettings& settings) override { + Y_UNUSED(cluster, settings); + return MakeErrorFuture<TGenericResult>(std::make_exception_ptr(yexception() << "The alter is not supported for the external data source")); + } + + TFuture<TGenericResult> DropExternalDataSource(const TString& cluster, + const NYql::TDropExternalDataSourceSettings& settings) override { + using TRequest = TEvTxUserProxy::TEvProposeTransaction; + + try { + if (!CheckCluster(cluster)) { + return InvalidCluster<TGenericResult>(cluster); + } + + std::pair<TString, TString> pathPair; + { + TString error; + if (!GetPathPair(settings.ExternalDataSource, pathPair, error, false)) { + return MakeFuture(ResultFromError<TGenericResult>(error)); + } + } + + auto ev = MakeHolder<TRequest>(); + ev->Record.SetDatabaseName(Database); + if (UserToken) { + ev->Record.SetUserToken(UserToken->Serialized); + } + + auto& schemeTx = *ev->Record.MutableTransaction()->MutableModifyScheme(); + schemeTx.SetWorkingDir(pathPair.first); + schemeTx.SetOperationType(NKikimrSchemeOp::ESchemeOpDropExternalDataSource); + + NKikimrSchemeOp::TDrop& drop = *schemeTx.MutableDrop(); + drop.SetName(pathPair.second); + return SendSchemeRequest(ev.Release()); + } + catch (yexception& e) { + return MakeFuture(ResultFromException<TGenericResult>(e)); + } + } + TFuture<TGenericResult> AlterUser(const TString& cluster, const NYql::TAlterUserSettings& settings) override { using TRequest = TEvTxUserProxy::TEvProposeTransaction; @@ -2130,7 +2208,7 @@ private: } static void FillCreateTableColumnDesc(NKikimrSchemeOp::TTableDescription& tableDesc, - const TString& name, NYql::TKikimrTableMetadataPtr metadata) + const TString& name, NYql::TKikimrTableMetadataPtr metadata) { tableDesc.SetName(name); @@ -2174,26 +2252,40 @@ private: schema.SetEngine(NKikimrSchemeOp::EColumnTableEngine::COLUMN_ENGINE_REPLACING_TIMESERIES); } - static void FillCreateExternalTableColumnDesc(NKikimrSchemeOp::TExternalTableDescription& externalableDesc, + static void FillCreateExternalTableColumnDesc(NKikimrSchemeOp::TExternalTableDescription& externalTableDesc, const TString& name, const NYql::TCreateExternalTableSettings& settings) { - externalableDesc.SetName(name); - externalableDesc.SetDataSourcePath(settings.DataSourcePath); - externalableDesc.SetLocation(settings.Location); + externalTableDesc.SetName(name); + externalTableDesc.SetDataSourcePath(settings.DataSourcePath); + externalTableDesc.SetLocation(settings.Location); Y_ENSURE(settings.ColumnOrder.size() == settings.Columns.size()); for (const auto& name : settings.ColumnOrder) { auto columnIt = settings.Columns.find(name); Y_ENSURE(columnIt != settings.Columns.end()); - TColumnDescription& columnDesc = *externalableDesc.AddColumns(); + TColumnDescription& columnDesc = *externalTableDesc.AddColumns(); columnDesc.SetName(columnIt->second.Name); columnDesc.SetType(columnIt->second.Type); columnDesc.SetNotNull(columnIt->second.NotNull); } } + static void FillCreateExternalDataSourceDesc(NKikimrSchemeOp::TExternalDataSourceDescription& externaDataSourceDesc, + const TString& name, + const NYql::TCreateExternalDataSourceSettings& settings) + { + externaDataSourceDesc.SetName(name); + externaDataSourceDesc.SetSourceType(settings.SourceType); + externaDataSourceDesc.SetLocation(settings.Location); + externaDataSourceDesc.SetInstallation(settings.Installation); + + if (settings.AuthMethod == "NONE") { + externaDataSourceDesc.MutableAuth()->MutableNone(); + } + } + static void FillParameters(TQueryData::TPtr params, NKikimrMiniKQL::TParams& output) { if (!params) { return; diff --git a/ydb/core/kqp/provider/yql_kikimr_gateway.h b/ydb/core/kqp/provider/yql_kikimr_gateway.h index 6145d2c9f7..19820705fc 100644 --- a/ydb/core/kqp/provider/yql_kikimr_gateway.h +++ b/ydb/core/kqp/provider/yql_kikimr_gateway.h @@ -521,6 +521,22 @@ struct TDropExternalTableSettings { TString ExternalTable; }; +struct TCreateExternalDataSourceSettings { + TString ExternalDataSource; + TString SourceType; + TString Location; + TString Installation; + TString AuthMethod; +}; + +struct TAlterExternalDataSourceSettings { + TString ExternalDataSource; +}; + +struct TDropExternalDataSourceSettings { + TString ExternalDataSource; +}; + struct TKikimrListPathItem { TKikimrListPathItem(TString name, bool isDirectory) { Name = name; @@ -682,6 +698,12 @@ public: virtual NThreading::TFuture<TGenericResult> DropExternalTable(const TString& cluster, const TDropExternalTableSettings& settings) = 0; + virtual NThreading::TFuture<TGenericResult> CreateExternalDataSource(const TString& cluster, const TCreateExternalDataSourceSettings& settings, bool createDir) = 0; + + virtual NThreading::TFuture<TGenericResult> AlterExternalDataSource(const TString& cluster, const TAlterExternalDataSourceSettings& settings) = 0; + + virtual NThreading::TFuture<TGenericResult> DropExternalDataSource(const TString& cluster, const TDropExternalDataSourceSettings& settings) = 0; + virtual TVector<TString> GetCollectedSchemeData() = 0; public: diff --git a/ydb/core/kqp/provider/yql_kikimr_gateway_ut.cpp b/ydb/core/kqp/provider/yql_kikimr_gateway_ut.cpp index ca9a199ef2..62e117c6a4 100644 --- a/ydb/core/kqp/provider/yql_kikimr_gateway_ut.cpp +++ b/ydb/core/kqp/provider/yql_kikimr_gateway_ut.cpp @@ -262,10 +262,37 @@ void TestDropTableCommon(TIntrusivePtr<IKikimrGateway> gateway) { UNIT_ASSERT(!loadResponse.Metadata->DoesExist); } +void TestCreateExternalDataSource(TTestActorRuntime& runtime, TIntrusivePtr<IKikimrGateway> gateway, const TString& path) { + NYql::TCreateExternalDataSourceSettings settings; + settings.ExternalDataSource = path; + settings.SourceType = "ObjectStorage"; + settings.AuthMethod = "NONE"; + settings.Installation = "cloud"; + + auto responseFuture = gateway->CreateExternalDataSource(TestCluster, settings, true); + responseFuture.Wait(); + auto response = responseFuture.GetValue(); + response.Issues().PrintTo(Cerr); + + UNIT_ASSERT_C(response.Success(), response.Issues().ToString()); + + auto externalDataSourceDesc = Navigate(runtime, runtime.AllocateEdgeActor(), path, NSchemeCache::TSchemeCacheNavigate::EOp::OpUnknown); + const auto& externalDataSource = externalDataSourceDesc->ResultSet.at(0); + UNIT_ASSERT_EQUAL(externalDataSource.Kind, NSchemeCache::TSchemeCacheNavigate::EKind::KindExternalDataSource); + UNIT_ASSERT(externalDataSource.ExternalDataSourceInfo); + UNIT_ASSERT_VALUES_EQUAL(externalDataSource.ExternalDataSourceInfo->Description.GetSourceType(), "ObjectStorage"); + UNIT_ASSERT_VALUES_EQUAL(externalDataSource.ExternalDataSourceInfo->Description.GetInstallation(), "cloud"); + UNIT_ASSERT_VALUES_EQUAL(externalDataSource.ExternalDataSourceInfo->Description.GetLocation(), ""); + UNIT_ASSERT_VALUES_EQUAL(externalDataSource.ExternalDataSourceInfo->Description.GetName(), SplitPath(path).back()); + UNIT_ASSERT(externalDataSource.ExternalDataSourceInfo->Description.GetAuth().HasNone()); +} + void TestCreateExternalTable(TTestActorRuntime& runtime, TIntrusivePtr<IKikimrGateway> gateway, const TString& path) { NYql::TCreateExternalTableSettings settings; settings.ExternalTable = path; + settings.DataSourcePath = "/Root/f1/f2/external_data_source"; + settings.Location = "/"; settings.Columns.insert(std::make_pair("Column1", TKikimrColumnMetadata{"Column1", 0, "Uint32", false})); settings.ColumnOrder.push_back("Column1"); @@ -301,6 +328,19 @@ void TestDropExternalTable(TTestActorRuntime& runtime, TIntrusivePtr<IKikimrGate UNIT_ASSERT_EQUAL(externalTable.Kind, NSchemeCache::TSchemeCacheNavigate::EKind::KindUnknown); } +void TestDropExternalDataSource(TTestActorRuntime& runtime, TIntrusivePtr<IKikimrGateway> gateway, const TString& path) { + auto responseFuture = gateway->DropExternalDataSource(TestCluster, TDropExternalDataSourceSettings{.ExternalDataSource=path}); + responseFuture.Wait(); + auto response = responseFuture.GetValue(); + response.Issues().PrintTo(Cerr); + UNIT_ASSERT(response.Success()); + + auto externalTableDesc = Navigate(runtime, runtime.AllocateEdgeActor(), path, NSchemeCache::TSchemeCacheNavigate::EOp::OpUnknown); + const auto& externalTable = externalTableDesc->ResultSet.at(0); + UNIT_ASSERT_EQUAL(externalTableDesc->ErrorCount, 1); + UNIT_ASSERT_EQUAL(externalTable.Kind, NSchemeCache::TSchemeCacheNavigate::EKind::KindUnknown); +} + } // namespace @@ -370,24 +410,30 @@ Y_UNIT_TEST_SUITE(KikimrIcGateway) { } Y_UNIT_TEST(TestCreateExternalTable) { - return; // skip TODO: This will be fixed after adding the external data source in gateway. Will be done in a separate review TKikimrRunner kikimr(NKqp::TKikimrSettings().SetWithSampleTables(false)); + TestCreateExternalDataSource(*kikimr.GetTestServer().GetRuntime(), GetIcGateway(kikimr.GetTestServer()), "/Root/f1/f2/external_data_source"); TestCreateExternalTable(*kikimr.GetTestServer().GetRuntime(), GetIcGateway(kikimr.GetTestServer()), "/Root/f1/f2/external_table"); } Y_UNIT_TEST(TestCreateSameExternalTable) { - return; // skip TODO: This will be fixed after adding the external data source in gateway. Will be done in a separate review TKikimrRunner kikimr(NKqp::TKikimrSettings().SetWithSampleTables(false)); + TestCreateExternalDataSource(*kikimr.GetTestServer().GetRuntime(), GetIcGateway(kikimr.GetTestServer()), "/Root/f1/f2/external_data_source"); TestCreateExternalTable(*kikimr.GetTestServer().GetRuntime(), GetIcGateway(kikimr.GetTestServer()), "/Root/f1/f2/external_table"); TestCreateExternalTable(*kikimr.GetTestServer().GetRuntime(), GetIcGateway(kikimr.GetTestServer()), "/Root/f1/f2/external_table"); } Y_UNIT_TEST(TestDropExternalTable) { - return; // skip TODO: This will be fixed after adding the external data source in gateway. Will be done in a separate review TKikimrRunner kikimr(NKqp::TKikimrSettings().SetWithSampleTables(false)); + TestCreateExternalDataSource(*kikimr.GetTestServer().GetRuntime(), GetIcGateway(kikimr.GetTestServer()), "/Root/f1/f2/external_data_source"); TestCreateExternalTable(*kikimr.GetTestServer().GetRuntime(), GetIcGateway(kikimr.GetTestServer()), "/Root/f1/f2/external_table"); TestDropExternalTable(*kikimr.GetTestServer().GetRuntime(), GetIcGateway(kikimr.GetTestServer()), "/Root/f1/f2/external_table"); } + + Y_UNIT_TEST(TestDropExternalDataSource) { + TKikimrRunner kikimr(NKqp::TKikimrSettings().SetWithSampleTables(false)); + TestCreateExternalDataSource(*kikimr.GetTestServer().GetRuntime(), GetIcGateway(kikimr.GetTestServer()), "/Root/f1/f2/external_data_source"); + TestDropExternalDataSource(*kikimr.GetTestServer().GetRuntime(), GetIcGateway(kikimr.GetTestServer()), "/Root/f1/f2/external_data_source"); + } } } // namespace NYql diff --git a/ydb/core/tx/scheme_cache/scheme_cache.h b/ydb/core/tx/scheme_cache/scheme_cache.h index 2da2c173d2..fb67d89742 100644 --- a/ydb/core/tx/scheme_cache/scheme_cache.h +++ b/ydb/core/tx/scheme_cache/scheme_cache.h @@ -130,7 +130,7 @@ struct TSchemeCacheNavigate { KindReplication = 15, KindBlobDepot = 16, KindExternalTable = 17, - KindExternalDataSource = 17, + KindExternalDataSource = 18, }; struct TListNodeEntry : public TAtomicRefCount<TListNodeEntry> { diff --git a/ydb/core/tx/schemeshard/schemeshard__operation.cpp b/ydb/core/tx/schemeshard/schemeshard__operation.cpp index 77349b8361..345b0c965c 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation.cpp @@ -672,6 +672,9 @@ TOperation::TSplitTransactionsResult TOperation::SplitIntoTransactions(const TTx case NKikimrSchemeOp::EOperationType::ESchemeOpCreateExternalTable: targetName = tx.GetCreateExternalTable().GetName(); break; + case NKikimrSchemeOp::EOperationType::ESchemeOpCreateExternalDataSource: + targetName = tx.GetCreateExternalDataSource().GetName(); + break; default: result.Transactions.push_back(tx); return result; |