aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorhcpp <hcpp@ydb.tech>2023-02-21 19:54:41 +0300
committerhcpp <hcpp@ydb.tech>2023-02-21 19:54:41 +0300
commit6e33b317bd70bd0662ae81effbec343f6d39af0c (patch)
tree9fbc8d5fa63b4605d21a84372d1c670d7c825136
parent40717b186524f698361f094ebd1cf2753419a334 (diff)
downloadydb-6e33b317bd70bd0662ae81effbec343f6d39af0c.tar.gz
external data source has been added to kqp
-rw-r--r--ydb/core/kqp/gateway/kqp_ic_gateway.cpp108
-rw-r--r--ydb/core/kqp/provider/yql_kikimr_gateway.h22
-rw-r--r--ydb/core/kqp/provider/yql_kikimr_gateway_ut.cpp52
-rw-r--r--ydb/core/tx/scheme_cache/scheme_cache.h2
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__operation.cpp3
5 files changed, 175 insertions, 12 deletions
diff --git a/ydb/core/kqp/gateway/kqp_ic_gateway.cpp b/ydb/core/kqp/gateway/kqp_ic_gateway.cpp
index a46e2fbfa3..7bd1a951b6 100644
--- a/ydb/core/kqp/gateway/kqp_ic_gateway.cpp
+++ b/ydb/core/kqp/gateway/kqp_ic_gateway.cpp
@@ -1269,8 +1269,8 @@ public:
schemeTx.SetWorkingDir(pathPair.first);
schemeTx.SetOperationType(NKikimrSchemeOp::ESchemeOpCreateExternalTable);
- NKikimrSchemeOp::TExternalTableDescription& tableDesc = *schemeTx.MutableCreateExternalTable();
- FillCreateExternalTableColumnDesc(tableDesc, pathPair.second, settings);
+ NKikimrSchemeOp::TExternalTableDescription& externalTableDesc = *schemeTx.MutableCreateExternalTable();
+ FillCreateExternalTableColumnDesc(externalTableDesc, pathPair.second, settings);
return SendSchemeRequest(ev.Release());
}
catch (yexception& e) {
@@ -1363,6 +1363,84 @@ public:
}
}
+ TFuture<TGenericResult> CreateExternalDataSource(const TString& cluster,
+ const NYql::TCreateExternalDataSourceSettings& settings,
+ bool createDir) override {
+ using TRequest = TEvTxUserProxy::TEvProposeTransaction;
+
+ try {
+ if (!CheckCluster(cluster)) {
+ return InvalidCluster<TGenericResult>(cluster);
+ }
+
+ std::pair<TString, TString> pathPair;
+ {
+ TString error;
+ if (!GetPathPair(settings.ExternalDataSource, pathPair, error, createDir)) {
+ return MakeFuture(ResultFromError<TGenericResult>(error));
+ }
+ }
+
+ auto ev = MakeHolder<TRequest>();
+ ev->Record.SetDatabaseName(Database);
+ if (UserToken) {
+ ev->Record.SetUserToken(UserToken->Serialized);
+ }
+ auto& schemeTx = *ev->Record.MutableTransaction()->MutableModifyScheme();
+ schemeTx.SetWorkingDir(pathPair.first);
+ schemeTx.SetOperationType(NKikimrSchemeOp::ESchemeOpCreateExternalDataSource);
+
+ NKikimrSchemeOp::TExternalDataSourceDescription& dataSourceDesc = *schemeTx.MutableCreateExternalDataSource();
+ FillCreateExternalDataSourceDesc(dataSourceDesc, pathPair.second, settings);
+ return SendSchemeRequest(ev.Release());
+ }
+ catch (yexception& e) {
+ return MakeFuture(ResultFromException<TGenericResult>(e));
+ }
+ }
+
+ TFuture<TGenericResult> AlterExternalDataSource(const TString& cluster,
+ const NYql::TAlterExternalDataSourceSettings& settings) override {
+ Y_UNUSED(cluster, settings);
+ return MakeErrorFuture<TGenericResult>(std::make_exception_ptr(yexception() << "The alter is not supported for the external data source"));
+ }
+
+ TFuture<TGenericResult> DropExternalDataSource(const TString& cluster,
+ const NYql::TDropExternalDataSourceSettings& settings) override {
+ using TRequest = TEvTxUserProxy::TEvProposeTransaction;
+
+ try {
+ if (!CheckCluster(cluster)) {
+ return InvalidCluster<TGenericResult>(cluster);
+ }
+
+ std::pair<TString, TString> pathPair;
+ {
+ TString error;
+ if (!GetPathPair(settings.ExternalDataSource, pathPair, error, false)) {
+ return MakeFuture(ResultFromError<TGenericResult>(error));
+ }
+ }
+
+ auto ev = MakeHolder<TRequest>();
+ ev->Record.SetDatabaseName(Database);
+ if (UserToken) {
+ ev->Record.SetUserToken(UserToken->Serialized);
+ }
+
+ auto& schemeTx = *ev->Record.MutableTransaction()->MutableModifyScheme();
+ schemeTx.SetWorkingDir(pathPair.first);
+ schemeTx.SetOperationType(NKikimrSchemeOp::ESchemeOpDropExternalDataSource);
+
+ NKikimrSchemeOp::TDrop& drop = *schemeTx.MutableDrop();
+ drop.SetName(pathPair.second);
+ return SendSchemeRequest(ev.Release());
+ }
+ catch (yexception& e) {
+ return MakeFuture(ResultFromException<TGenericResult>(e));
+ }
+ }
+
TFuture<TGenericResult> AlterUser(const TString& cluster, const NYql::TAlterUserSettings& settings) override {
using TRequest = TEvTxUserProxy::TEvProposeTransaction;
@@ -2130,7 +2208,7 @@ private:
}
static void FillCreateTableColumnDesc(NKikimrSchemeOp::TTableDescription& tableDesc,
- const TString& name, NYql::TKikimrTableMetadataPtr metadata)
+ const TString& name, NYql::TKikimrTableMetadataPtr metadata)
{
tableDesc.SetName(name);
@@ -2174,26 +2252,40 @@ private:
schema.SetEngine(NKikimrSchemeOp::EColumnTableEngine::COLUMN_ENGINE_REPLACING_TIMESERIES);
}
- static void FillCreateExternalTableColumnDesc(NKikimrSchemeOp::TExternalTableDescription& externalableDesc,
+ static void FillCreateExternalTableColumnDesc(NKikimrSchemeOp::TExternalTableDescription& externalTableDesc,
const TString& name,
const NYql::TCreateExternalTableSettings& settings)
{
- externalableDesc.SetName(name);
- externalableDesc.SetDataSourcePath(settings.DataSourcePath);
- externalableDesc.SetLocation(settings.Location);
+ externalTableDesc.SetName(name);
+ externalTableDesc.SetDataSourcePath(settings.DataSourcePath);
+ externalTableDesc.SetLocation(settings.Location);
Y_ENSURE(settings.ColumnOrder.size() == settings.Columns.size());
for (const auto& name : settings.ColumnOrder) {
auto columnIt = settings.Columns.find(name);
Y_ENSURE(columnIt != settings.Columns.end());
- TColumnDescription& columnDesc = *externalableDesc.AddColumns();
+ TColumnDescription& columnDesc = *externalTableDesc.AddColumns();
columnDesc.SetName(columnIt->second.Name);
columnDesc.SetType(columnIt->second.Type);
columnDesc.SetNotNull(columnIt->second.NotNull);
}
}
+ static void FillCreateExternalDataSourceDesc(NKikimrSchemeOp::TExternalDataSourceDescription& externaDataSourceDesc,
+ const TString& name,
+ const NYql::TCreateExternalDataSourceSettings& settings)
+ {
+ externaDataSourceDesc.SetName(name);
+ externaDataSourceDesc.SetSourceType(settings.SourceType);
+ externaDataSourceDesc.SetLocation(settings.Location);
+ externaDataSourceDesc.SetInstallation(settings.Installation);
+
+ if (settings.AuthMethod == "NONE") {
+ externaDataSourceDesc.MutableAuth()->MutableNone();
+ }
+ }
+
static void FillParameters(TQueryData::TPtr params, NKikimrMiniKQL::TParams& output) {
if (!params) {
return;
diff --git a/ydb/core/kqp/provider/yql_kikimr_gateway.h b/ydb/core/kqp/provider/yql_kikimr_gateway.h
index 6145d2c9f7..19820705fc 100644
--- a/ydb/core/kqp/provider/yql_kikimr_gateway.h
+++ b/ydb/core/kqp/provider/yql_kikimr_gateway.h
@@ -521,6 +521,22 @@ struct TDropExternalTableSettings {
TString ExternalTable;
};
+struct TCreateExternalDataSourceSettings {
+ TString ExternalDataSource;
+ TString SourceType;
+ TString Location;
+ TString Installation;
+ TString AuthMethod;
+};
+
+struct TAlterExternalDataSourceSettings {
+ TString ExternalDataSource;
+};
+
+struct TDropExternalDataSourceSettings {
+ TString ExternalDataSource;
+};
+
struct TKikimrListPathItem {
TKikimrListPathItem(TString name, bool isDirectory) {
Name = name;
@@ -682,6 +698,12 @@ public:
virtual NThreading::TFuture<TGenericResult> DropExternalTable(const TString& cluster, const TDropExternalTableSettings& settings) = 0;
+ virtual NThreading::TFuture<TGenericResult> CreateExternalDataSource(const TString& cluster, const TCreateExternalDataSourceSettings& settings, bool createDir) = 0;
+
+ virtual NThreading::TFuture<TGenericResult> AlterExternalDataSource(const TString& cluster, const TAlterExternalDataSourceSettings& settings) = 0;
+
+ virtual NThreading::TFuture<TGenericResult> DropExternalDataSource(const TString& cluster, const TDropExternalDataSourceSettings& settings) = 0;
+
virtual TVector<TString> GetCollectedSchemeData() = 0;
public:
diff --git a/ydb/core/kqp/provider/yql_kikimr_gateway_ut.cpp b/ydb/core/kqp/provider/yql_kikimr_gateway_ut.cpp
index ca9a199ef2..62e117c6a4 100644
--- a/ydb/core/kqp/provider/yql_kikimr_gateway_ut.cpp
+++ b/ydb/core/kqp/provider/yql_kikimr_gateway_ut.cpp
@@ -262,10 +262,37 @@ void TestDropTableCommon(TIntrusivePtr<IKikimrGateway> gateway) {
UNIT_ASSERT(!loadResponse.Metadata->DoesExist);
}
+void TestCreateExternalDataSource(TTestActorRuntime& runtime, TIntrusivePtr<IKikimrGateway> gateway, const TString& path) {
+ NYql::TCreateExternalDataSourceSettings settings;
+ settings.ExternalDataSource = path;
+ settings.SourceType = "ObjectStorage";
+ settings.AuthMethod = "NONE";
+ settings.Installation = "cloud";
+
+ auto responseFuture = gateway->CreateExternalDataSource(TestCluster, settings, true);
+ responseFuture.Wait();
+ auto response = responseFuture.GetValue();
+ response.Issues().PrintTo(Cerr);
+
+ UNIT_ASSERT_C(response.Success(), response.Issues().ToString());
+
+ auto externalDataSourceDesc = Navigate(runtime, runtime.AllocateEdgeActor(), path, NSchemeCache::TSchemeCacheNavigate::EOp::OpUnknown);
+ const auto& externalDataSource = externalDataSourceDesc->ResultSet.at(0);
+ UNIT_ASSERT_EQUAL(externalDataSource.Kind, NSchemeCache::TSchemeCacheNavigate::EKind::KindExternalDataSource);
+ UNIT_ASSERT(externalDataSource.ExternalDataSourceInfo);
+ UNIT_ASSERT_VALUES_EQUAL(externalDataSource.ExternalDataSourceInfo->Description.GetSourceType(), "ObjectStorage");
+ UNIT_ASSERT_VALUES_EQUAL(externalDataSource.ExternalDataSourceInfo->Description.GetInstallation(), "cloud");
+ UNIT_ASSERT_VALUES_EQUAL(externalDataSource.ExternalDataSourceInfo->Description.GetLocation(), "");
+ UNIT_ASSERT_VALUES_EQUAL(externalDataSource.ExternalDataSourceInfo->Description.GetName(), SplitPath(path).back());
+ UNIT_ASSERT(externalDataSource.ExternalDataSourceInfo->Description.GetAuth().HasNone());
+}
+
void TestCreateExternalTable(TTestActorRuntime& runtime, TIntrusivePtr<IKikimrGateway> gateway, const TString& path) {
NYql::TCreateExternalTableSettings settings;
settings.ExternalTable = path;
+ settings.DataSourcePath = "/Root/f1/f2/external_data_source";
+ settings.Location = "/";
settings.Columns.insert(std::make_pair("Column1", TKikimrColumnMetadata{"Column1", 0, "Uint32", false}));
settings.ColumnOrder.push_back("Column1");
@@ -301,6 +328,19 @@ void TestDropExternalTable(TTestActorRuntime& runtime, TIntrusivePtr<IKikimrGate
UNIT_ASSERT_EQUAL(externalTable.Kind, NSchemeCache::TSchemeCacheNavigate::EKind::KindUnknown);
}
+void TestDropExternalDataSource(TTestActorRuntime& runtime, TIntrusivePtr<IKikimrGateway> gateway, const TString& path) {
+ auto responseFuture = gateway->DropExternalDataSource(TestCluster, TDropExternalDataSourceSettings{.ExternalDataSource=path});
+ responseFuture.Wait();
+ auto response = responseFuture.GetValue();
+ response.Issues().PrintTo(Cerr);
+ UNIT_ASSERT(response.Success());
+
+ auto externalTableDesc = Navigate(runtime, runtime.AllocateEdgeActor(), path, NSchemeCache::TSchemeCacheNavigate::EOp::OpUnknown);
+ const auto& externalTable = externalTableDesc->ResultSet.at(0);
+ UNIT_ASSERT_EQUAL(externalTableDesc->ErrorCount, 1);
+ UNIT_ASSERT_EQUAL(externalTable.Kind, NSchemeCache::TSchemeCacheNavigate::EKind::KindUnknown);
+}
+
} // namespace
@@ -370,24 +410,30 @@ Y_UNIT_TEST_SUITE(KikimrIcGateway) {
}
Y_UNIT_TEST(TestCreateExternalTable) {
- return; // skip TODO: This will be fixed after adding the external data source in gateway. Will be done in a separate review
TKikimrRunner kikimr(NKqp::TKikimrSettings().SetWithSampleTables(false));
+ TestCreateExternalDataSource(*kikimr.GetTestServer().GetRuntime(), GetIcGateway(kikimr.GetTestServer()), "/Root/f1/f2/external_data_source");
TestCreateExternalTable(*kikimr.GetTestServer().GetRuntime(), GetIcGateway(kikimr.GetTestServer()), "/Root/f1/f2/external_table");
}
Y_UNIT_TEST(TestCreateSameExternalTable) {
- return; // skip TODO: This will be fixed after adding the external data source in gateway. Will be done in a separate review
TKikimrRunner kikimr(NKqp::TKikimrSettings().SetWithSampleTables(false));
+ TestCreateExternalDataSource(*kikimr.GetTestServer().GetRuntime(), GetIcGateway(kikimr.GetTestServer()), "/Root/f1/f2/external_data_source");
TestCreateExternalTable(*kikimr.GetTestServer().GetRuntime(), GetIcGateway(kikimr.GetTestServer()), "/Root/f1/f2/external_table");
TestCreateExternalTable(*kikimr.GetTestServer().GetRuntime(), GetIcGateway(kikimr.GetTestServer()), "/Root/f1/f2/external_table");
}
Y_UNIT_TEST(TestDropExternalTable) {
- return; // skip TODO: This will be fixed after adding the external data source in gateway. Will be done in a separate review
TKikimrRunner kikimr(NKqp::TKikimrSettings().SetWithSampleTables(false));
+ TestCreateExternalDataSource(*kikimr.GetTestServer().GetRuntime(), GetIcGateway(kikimr.GetTestServer()), "/Root/f1/f2/external_data_source");
TestCreateExternalTable(*kikimr.GetTestServer().GetRuntime(), GetIcGateway(kikimr.GetTestServer()), "/Root/f1/f2/external_table");
TestDropExternalTable(*kikimr.GetTestServer().GetRuntime(), GetIcGateway(kikimr.GetTestServer()), "/Root/f1/f2/external_table");
}
+
+ Y_UNIT_TEST(TestDropExternalDataSource) {
+ TKikimrRunner kikimr(NKqp::TKikimrSettings().SetWithSampleTables(false));
+ TestCreateExternalDataSource(*kikimr.GetTestServer().GetRuntime(), GetIcGateway(kikimr.GetTestServer()), "/Root/f1/f2/external_data_source");
+ TestDropExternalDataSource(*kikimr.GetTestServer().GetRuntime(), GetIcGateway(kikimr.GetTestServer()), "/Root/f1/f2/external_data_source");
+ }
}
} // namespace NYql
diff --git a/ydb/core/tx/scheme_cache/scheme_cache.h b/ydb/core/tx/scheme_cache/scheme_cache.h
index 2da2c173d2..fb67d89742 100644
--- a/ydb/core/tx/scheme_cache/scheme_cache.h
+++ b/ydb/core/tx/scheme_cache/scheme_cache.h
@@ -130,7 +130,7 @@ struct TSchemeCacheNavigate {
KindReplication = 15,
KindBlobDepot = 16,
KindExternalTable = 17,
- KindExternalDataSource = 17,
+ KindExternalDataSource = 18,
};
struct TListNodeEntry : public TAtomicRefCount<TListNodeEntry> {
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation.cpp b/ydb/core/tx/schemeshard/schemeshard__operation.cpp
index 77349b8361..345b0c965c 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__operation.cpp
@@ -672,6 +672,9 @@ TOperation::TSplitTransactionsResult TOperation::SplitIntoTransactions(const TTx
case NKikimrSchemeOp::EOperationType::ESchemeOpCreateExternalTable:
targetName = tx.GetCreateExternalTable().GetName();
break;
+ case NKikimrSchemeOp::EOperationType::ESchemeOpCreateExternalDataSource:
+ targetName = tx.GetCreateExternalDataSource().GetName();
+ break;
default:
result.Transactions.push_back(tx);
return result;