diff options
author | Vasily Gerasimov <UgnineSirdis@ydb.tech> | 2024-01-29 09:41:41 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-01-29 09:41:41 +0200 |
commit | a32284c27a7379de8cfb724ce429f89d5e5d2cb6 (patch) | |
tree | 77581d1c5036a4c48882ea3703b83dde8b272cf4 | |
parent | 5450e7b6cb4d07b476fdfa66a26b97546a16de6e (diff) | |
download | ydb-a32284c27a7379de8cfb724ce429f89d5e5d2cb6.tar.gz |
Forbid writing/reading directly to external data source (#1204)
-rw-r--r-- | ydb/core/kqp/compile_service/kqp_compile_actor.cpp | 2 | ||||
-rw-r--r-- | ydb/core/kqp/gateway/kqp_ic_gateway.cpp | 7 | ||||
-rw-r--r-- | ydb/core/kqp/gateway/kqp_metadata_loader.cpp | 60 | ||||
-rw-r--r-- | ydb/core/kqp/gateway/kqp_metadata_loader.h | 13 | ||||
-rw-r--r-- | ydb/core/kqp/provider/yql_kikimr_datasink.cpp | 5 | ||||
-rw-r--r-- | ydb/core/kqp/provider/yql_kikimr_datasource.cpp | 7 | ||||
-rw-r--r-- | ydb/core/kqp/provider/yql_kikimr_exec.cpp | 2 | ||||
-rw-r--r-- | ydb/core/kqp/provider/yql_kikimr_gateway_ut.cpp | 2 | ||||
-rw-r--r-- | ydb/core/kqp/session_actor/kqp_worker_actor.cpp | 2 | ||||
-rw-r--r-- | ydb/core/kqp/ut/federated_query/s3/kqp_federated_query_ut.cpp | 250 | ||||
-rw-r--r-- | ydb/core/kqp/ut/indexes/kqp_indexes_ut.cpp | 2 | ||||
-rw-r--r-- | ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp | 2 |
12 files changed, 318 insertions, 36 deletions
diff --git a/ydb/core/kqp/compile_service/kqp_compile_actor.cpp b/ydb/core/kqp/compile_service/kqp_compile_actor.cpp index f02e4abc9d..283c6a2cde 100644 --- a/ydb/core/kqp/compile_service/kqp_compile_actor.cpp +++ b/ydb/core/kqp/compile_service/kqp_compile_actor.cpp @@ -176,7 +176,7 @@ private: counters->TxProxyMon = new NTxProxy::TTxProxyMon(AppData(ctx)->Counters); std::shared_ptr<NYql::IKikimrGateway::IKqpTableMetadataLoader> loader = std::make_shared<TKqpTableMetadataLoader>( - TlsActivationContext->ActorSystem(), Config, true, TempTablesState, 2 * TDuration::Seconds(MetadataProviderConfig.GetRefreshPeriodSeconds())); + QueryId.Cluster, TlsActivationContext->ActorSystem(), Config, true, TempTablesState, 2 * TDuration::Seconds(MetadataProviderConfig.GetRefreshPeriodSeconds())); Gateway = CreateKikimrIcGateway(QueryId.Cluster, QueryId.Settings.QueryType, QueryId.Database, std::move(loader), ctx.ExecutorThread.ActorSystem, ctx.SelfID.NodeId(), counters, QueryServiceConfig); Gateway->SetToken(QueryId.Cluster, UserToken); diff --git a/ydb/core/kqp/gateway/kqp_ic_gateway.cpp b/ydb/core/kqp/gateway/kqp_ic_gateway.cpp index 4caa5e09f5..1b68a5cf1a 100644 --- a/ydb/core/kqp/gateway/kqp_ic_gateway.cpp +++ b/ydb/core/kqp/gateway/kqp_ic_gateway.cpp @@ -806,12 +806,7 @@ public: return InvalidCluster<TTableMetadataResult>(cluster); } - settings.WithExternalDatasources_ = !CheckCluster(cluster); - // In the case of reading from an external data source, - // we have a construction of the form: `/Root/external_data_source`.`/path_in_external_system` WITH (...) - // In this syntax, information about path_in_external_system is already known and we only need information about external_data_source. - // To do this, we go to the DefaultCluster and get information about external_data_source from scheme shard - return MetadataLoader->LoadTableMetadata(settings.WithExternalDatasources_ ? GetDefaultCluster() : cluster, settings.WithExternalDatasources_ ? cluster : table, settings, Database, UserToken); + return MetadataLoader->LoadTableMetadata(cluster, table, settings, Database, UserToken); } catch (yexception& e) { return MakeFuture(ResultFromException<TTableMetadataResult>(e)); } diff --git a/ydb/core/kqp/gateway/kqp_metadata_loader.cpp b/ydb/core/kqp/gateway/kqp_metadata_loader.cpp index e368f3f40a..58a942d412 100644 --- a/ydb/core/kqp/gateway/kqp_metadata_loader.cpp +++ b/ydb/core/kqp/gateway/kqp_metadata_loader.cpp @@ -227,6 +227,7 @@ TTableMetadataResult GetExternalTableMetadataResult(const NSchemeCache::TSchemeC tableMeta->PathId = NYql::TKikimrPathId(description.GetPathId().GetOwnerId(), description.GetPathId().GetLocalId()); tableMeta->SchemaVersion = description.GetVersion(); tableMeta->Kind = NYql::EKikimrTableKind::External; + tableMeta->TableType = NYql::ETableType::ExternalTable; tableMeta->Attributes = entry.Attributes; @@ -253,7 +254,7 @@ TTableMetadataResult GetExternalTableMetadataResult(const NSchemeCache::TSchemeC } TTableMetadataResult GetExternalDataSourceMetadataResult(const NSchemeCache::TSchemeCacheNavigate::TEntry& entry, - const TString& cluster, const TString& tableName) { + const TString& cluster, const TString& mainCluster, const TString& tableName) { const auto& description = entry.ExternalDataSourceInfo->Description; TTableMetadataResult result; result.SetSuccess(); @@ -263,6 +264,11 @@ TTableMetadataResult GetExternalDataSourceMetadataResult(const NSchemeCache::TSc tableMeta->PathId = NYql::TKikimrPathId(description.GetPathId().GetOwnerId(), description.GetPathId().GetLocalId()); tableMeta->SchemaVersion = description.GetVersion(); tableMeta->Kind = NYql::EKikimrTableKind::External; + if (cluster == mainCluster) { // resolved external data source itself + tableMeta->TableType = NYql::ETableType::Unknown; + } else { + tableMeta->TableType = NYql::ETableType::Table; // wanted to resolve table in external data source + } tableMeta->Attributes = entry.Attributes; @@ -300,7 +306,7 @@ TTableMetadataResult GetViewMetadataResult( } TTableMetadataResult GetLoadTableMetadataResult(const NSchemeCache::TSchemeCacheNavigate::TEntry& entry, - const TString& cluster, const TString& tableName, std::optional<TString> queryName = std::nullopt) { + const TString& cluster, const TString& mainCluster, const TString& tableName, std::optional<TString> queryName = std::nullopt) { using TResult = NYql::IKikimrGateway::TTableMetadataResult; using EStatus = NSchemeCache::TSchemeCacheNavigate::EStatus; using EKind = NSchemeCache::TSchemeCacheNavigate::EKind; @@ -339,7 +345,7 @@ TTableMetadataResult GetLoadTableMetadataResult(const NSchemeCache::TSchemeCache result = GetExternalTableMetadataResult(entry, cluster, tableName); break; case EKind::KindExternalDataSource: - result = GetExternalDataSourceMetadataResult(entry, cluster, tableName); + result = GetExternalDataSourceMetadataResult(entry, cluster, mainCluster, tableName); break; case EKind::KindView: result = GetViewMetadataResult(entry, cluster, tableName); @@ -697,16 +703,30 @@ NThreading::TFuture<TTableMetadataResult> TKqpTableMetadataLoader::LoadTableMeta using EStatus = NSchemeCache::TSchemeCacheNavigate::EStatus; using EKind = NSchemeCache::TSchemeCacheNavigate::EKind; - const auto externalEntryItem = CreateNavigateExternalEntry(id, settings.WithExternalDatasources_); - Y_ABORT_UNLESS(!settings.WithExternalDatasources_ || externalEntryItem, "External data source must be resolved using path only"); - auto resNavigate = settings.WithExternalDatasources_ ? *externalEntryItem : CreateNavigateEntry(id, + // In the case of reading from an external data source, + // we have a construction of the form: `/Root/external_data_source`.`/path_in_external_system` WITH (...) + // In this syntax, information about path_in_external_system is already known and we only need information about external_data_source. + // To do this, we go to the DefaultCluster and get information about external_data_source from scheme shard + const bool resolveEntityInsideDataSource = (cluster != Cluster); + TPath entityName = id; + if constexpr (std::is_same_v<TPath, TString>) { + if (resolveEntityInsideDataSource) { + entityName = cluster; + } + } else { + Y_ENSURE(!resolveEntityInsideDataSource); + } + + const auto externalEntryItem = CreateNavigateExternalEntry(entityName, resolveEntityInsideDataSource); + Y_ABORT_UNLESS(!resolveEntityInsideDataSource || externalEntryItem, "External data source must be resolved using path only"); + auto resNavigate = resolveEntityInsideDataSource ? *externalEntryItem : CreateNavigateEntry(entityName, settings, TempTablesState); const auto entry = resNavigate.Entry; const auto queryName = resNavigate.QueryName; - const auto externalEntry = settings.WithExternalDatasources_ ? std::optional<NavigateEntryResult>{} : externalEntryItem; - const ui64 expectedSchemaVersion = GetExpectedVersion(id); + const auto externalEntry = resolveEntityInsideDataSource ? std::optional<NavigateEntryResult>{} : externalEntryItem; + const ui64 expectedSchemaVersion = GetExpectedVersion(entityName); - LOG_DEBUG_S(*ActorSystem, NKikimrServices::KQP_GATEWAY, "Load table metadata from cache by path, request" << GetDebugString(id)); + LOG_DEBUG_S(*ActorSystem, NKikimrServices::KQP_GATEWAY, "Load table metadata from cache by path, request" << GetDebugString(entityName)); auto navigate = MakeHolder<TNavigate>(); navigate->ResultSet.emplace_back(entry); @@ -728,7 +748,7 @@ NThreading::TFuture<TTableMetadataResult> TKqpTableMetadataLoader::LoadTableMeta ActorSystem, schemeCacheId, ev.Release(), - [userToken, database, cluster, table, settings, expectedSchemaVersion, this, queryName] + [userToken, database, cluster, mainCluster = Cluster, table, settings, expectedSchemaVersion, this, queryName] (TPromise<TResult> promise, TResponse&& response) mutable { try { @@ -739,7 +759,7 @@ NThreading::TFuture<TTableMetadataResult> TKqpTableMetadataLoader::LoadTableMeta auto& entry = InferEntry(navigate.ResultSet); if (entry.Status != EStatus::Ok) { - promise.SetValue(GetLoadTableMetadataResult(entry, cluster, table)); + promise.SetValue(GetLoadTableMetadataResult(entry, cluster, mainCluster, table)); return; } @@ -759,9 +779,15 @@ NThreading::TFuture<TTableMetadataResult> TKqpTableMetadataLoader::LoadTableMeta } } + const bool resolveEntityInsideDataSource = (cluster != Cluster); + // resolveEntityInsideDataSource => entry.Kind == EKind::KindExternalDataSource + if (resolveEntityInsideDataSource && entry.Kind != EKind::KindExternalDataSource) { + throw yexception() << "\"" << CombinePath(entry.Path.begin(), entry.Path.end()) << "\" is expected to be external data source"; + } + switch (entry.Kind) { case EKind::KindExternalDataSource: { - auto externalDataSourceMetadata = GetLoadTableMetadataResult(entry, cluster, table); + auto externalDataSourceMetadata = GetLoadTableMetadataResult(entry, cluster, mainCluster, table); if (!externalDataSourceMetadata.Success()) { promise.SetValue(externalDataSourceMetadata); return; @@ -772,12 +798,12 @@ NThreading::TFuture<TTableMetadataResult> TKqpTableMetadataLoader::LoadTableMeta UpdateExternalDataSourceSecretsValue(externalDataSourceMetadata, result.GetValue()); promise.SetValue(externalDataSourceMetadata); }); + break; } - break; case EKind::KindExternalTable: { YQL_ENSURE(entry.ExternalTableInfo, "expected external table info"); const auto& dataSourcePath = entry.ExternalTableInfo->Description.GetDataSourcePath(); - auto externalTableMetadata = GetLoadTableMetadataResult(entry, cluster, table); + auto externalTableMetadata = GetLoadTableMetadataResult(entry, cluster, mainCluster, table); if (!externalTableMetadata.Success()) { promise.SetValue(externalTableMetadata); return; @@ -789,8 +815,8 @@ NThreading::TFuture<TTableMetadataResult> TKqpTableMetadataLoader::LoadTableMeta auto externalDataSourceMetadata = result.GetValue(); promise.SetValue(EnrichExternalTable(externalTableMetadata, externalDataSourceMetadata)); }); + break; } - break; case EKind::KindIndex: { Y_ENSURE(entry.ListNodeEntry, "expected children list"); Y_ENSURE(entry.ListNodeEntry->Children.size() == 1, "expected one child"); @@ -805,10 +831,10 @@ NThreading::TFuture<TTableMetadataResult> TKqpTableMetadataLoader::LoadTableMeta { promise.SetValue(result.GetValue()); }); + break; } - break; default: { - promise.SetValue(GetLoadTableMetadataResult(entry, cluster, table, queryName)); + promise.SetValue(GetLoadTableMetadataResult(entry, cluster, mainCluster, table, queryName)); } } } diff --git a/ydb/core/kqp/gateway/kqp_metadata_loader.h b/ydb/core/kqp/gateway/kqp_metadata_loader.h index 33340ab8ba..798e9cca88 100644 --- a/ydb/core/kqp/gateway/kqp_metadata_loader.h +++ b/ydb/core/kqp/gateway/kqp_metadata_loader.h @@ -14,17 +14,19 @@ namespace NKikimr::NKqp { class TKqpTableMetadataLoader : public NYql::IKikimrGateway::IKqpTableMetadataLoader { public: - explicit TKqpTableMetadataLoader(TActorSystem* actorSystem, - NYql::TKikimrConfiguration::TPtr config, - bool needCollectSchemeData = false, + explicit TKqpTableMetadataLoader(const TString& cluster, + TActorSystem* actorSystem, + NYql::TKikimrConfiguration::TPtr config, + bool needCollectSchemeData = false, TKqpTempTablesState::TConstPtr tempTablesState = nullptr, TDuration maximalSecretsSnapshotWaitTime = TDuration::Seconds(20)) - : NeedCollectSchemeData(needCollectSchemeData) + : Cluster(cluster) + , NeedCollectSchemeData(needCollectSchemeData) , ActorSystem(actorSystem) , Config(config) , TempTablesState(std::move(tempTablesState)) , MaximalSecretsSnapshotWaitTime(maximalSecretsSnapshotWaitTime) - {}; + {} NThreading::TFuture<NYql::IKikimrGateway::TTableMetadataResult> LoadTableMetadata( const TString& cluster, const TString& table, const NYql::IKikimrGateway::TLoadTableMetadataSettings& settings, const TString& database, @@ -56,6 +58,7 @@ private: void OnLoadedTableMetadata(NYql::IKikimrGateway::TTableMetadataResult& loadTableMetadataResult); + const TString Cluster; TVector<NKikimrKqp::TKqpTableMetadataProto> CollectedSchemeData; TMutex Lock; bool NeedCollectSchemeData; diff --git a/ydb/core/kqp/provider/yql_kikimr_datasink.cpp b/ydb/core/kqp/provider/yql_kikimr_datasink.cpp index 393862f3d7..e2df475c14 100644 --- a/ydb/core/kqp/provider/yql_kikimr_datasink.cpp +++ b/ydb/core/kqp/provider/yql_kikimr_datasink.cpp @@ -557,6 +557,11 @@ public: return true; } + if (tableDesc.Metadata->ExternalSource.SourceType == ESourceType::ExternalDataSource && tableDesc.Metadata->TableType == NYql::ETableType::Unknown) { + ctx.AddError(TIssue(ctx.GetPosition(node->Pos()), TStringBuilder() << "Attempt to write to external data source \"" << key.GetTablePath() << "\" without table. Please specify table to write to")); + return false; + } + if (tableDesc.Metadata->ExternalSource.SourceType != ESourceType::ExternalDataSource && tableDesc.Metadata->ExternalSource.SourceType != ESourceType::ExternalTable) { YQL_CVLOG(NLog::ELevel::ERROR, NLog::EComponent::ProviderKikimr) << "Skip RewriteIO for external entity: unknown entity type: " << (int)tableDesc.Metadata->ExternalSource.SourceType; return true; diff --git a/ydb/core/kqp/provider/yql_kikimr_datasource.cpp b/ydb/core/kqp/provider/yql_kikimr_datasource.cpp index 810057fe28..4e45f0a218 100644 --- a/ydb/core/kqp/provider/yql_kikimr_datasource.cpp +++ b/ydb/core/kqp/provider/yql_kikimr_datasource.cpp @@ -256,7 +256,7 @@ public: } } break; - default: + default: break; } *result = value; @@ -679,6 +679,11 @@ public: auto& tableDesc = SessionCtx->Tables().GetTable(cluster, tablePath); if (key.GetKeyType() == TKikimrKey::Type::Table) { if (tableDesc.Metadata->Kind == EKikimrTableKind::External) { + if (tableDesc.Metadata->ExternalSource.SourceType == ESourceType::ExternalDataSource && tableDesc.Metadata->TableType == NYql::ETableType::Unknown) { + ctx.AddError(TIssue(node->Pos(ctx), + TStringBuilder() << "Attempt to read from external data source \"" << tablePath << "\" without table. Please specify table to read from")); + return nullptr; + } if (tableDesc.Metadata->ExternalSource.SourceType == ESourceType::ExternalDataSource) { const auto& source = ExternalSourceFactory->GetOrCreate(tableDesc.Metadata->ExternalSource.Type); ctx.Step.Repeat(TExprStep::DiscoveryIO) diff --git a/ydb/core/kqp/provider/yql_kikimr_exec.cpp b/ydb/core/kqp/provider/yql_kikimr_exec.cpp index bd58a9c7e7..50c1de4dde 100644 --- a/ydb/core/kqp/provider/yql_kikimr_exec.cpp +++ b/ydb/core/kqp/provider/yql_kikimr_exec.cpp @@ -923,7 +923,7 @@ public: auto tableTypeItem = table.Metadata->TableType; if (tableTypeItem == ETableType::ExternalTable && !SessionCtx->Config().FeatureFlags.GetEnableExternalDataSources()) { ctx.AddError(TIssue(ctx.GetPosition(input->Pos()), - TStringBuilder() << "External table are disabled. Please contact your system administrator to enable it")); + TStringBuilder() << "External tables are disabled. Please contact your system administrator to enable it")); return SyncError(); } diff --git a/ydb/core/kqp/provider/yql_kikimr_gateway_ut.cpp b/ydb/core/kqp/provider/yql_kikimr_gateway_ut.cpp index f55ac55afd..a072b09923 100644 --- a/ydb/core/kqp/provider/yql_kikimr_gateway_ut.cpp +++ b/ydb/core/kqp/provider/yql_kikimr_gateway_ut.cpp @@ -73,7 +73,7 @@ TIntrusivePtr<IKqpGateway> GetIcGateway(Tests::TServer& server) { counters->Counters = new TKqpCounters(server.GetRuntime()->GetAppData(0).Counters); counters->TxProxyMon = new NTxProxy::TTxProxyMon(server.GetRuntime()->GetAppData(0).Counters); - std::shared_ptr<NYql::IKikimrGateway::IKqpTableMetadataLoader> loader = std::make_shared<TKqpTableMetadataLoader>(server.GetRuntime()->GetAnyNodeActorSystem(),TIntrusivePtr<NYql::TKikimrConfiguration>(nullptr), false); + std::shared_ptr<NYql::IKikimrGateway::IKqpTableMetadataLoader> loader = std::make_shared<TKqpTableMetadataLoader>(TestCluster, server.GetRuntime()->GetAnyNodeActorSystem(),TIntrusivePtr<NYql::TKikimrConfiguration>(nullptr), false); return CreateKikimrIcGateway(TestCluster, NKikimrKqp::QUERY_TYPE_SQL_GENERIC_QUERY, "/Root", std::move(loader), server.GetRuntime()->GetAnyNodeActorSystem(), server.GetRuntime()->GetNodeId(0), counters, server.GetSettings().AppConfig->GetQueryServiceConfig()); } diff --git a/ydb/core/kqp/session_actor/kqp_worker_actor.cpp b/ydb/core/kqp/session_actor/kqp_worker_actor.cpp index 956ba602d4..f2d7b7aba9 100644 --- a/ydb/core/kqp/session_actor/kqp_worker_actor.cpp +++ b/ydb/core/kqp/session_actor/kqp_worker_actor.cpp @@ -181,7 +181,7 @@ public: QueryState->RequestEv.reset(ev->Release().Release()); std::shared_ptr<NYql::IKikimrGateway::IKqpTableMetadataLoader> loader = std::make_shared<TKqpTableMetadataLoader>( - TlsActivationContext->ActorSystem(), Config, false, nullptr, 2 * TDuration::Seconds(MetadataProviderConfig.GetRefreshPeriodSeconds())); + Settings.Cluster, TlsActivationContext->ActorSystem(), Config, false, nullptr, 2 * TDuration::Seconds(MetadataProviderConfig.GetRefreshPeriodSeconds())); Gateway = CreateKikimrIcGateway(Settings.Cluster, QueryState->RequestEv->GetType(), Settings.Database, std::move(loader), ctx.ExecutorThread.ActorSystem, ctx.SelfID.NodeId(), RequestCounters, QueryServiceConfig); diff --git a/ydb/core/kqp/ut/federated_query/s3/kqp_federated_query_ut.cpp b/ydb/core/kqp/ut/federated_query/s3/kqp_federated_query_ut.cpp index 74ab9977ab..69d72ee6e0 100644 --- a/ydb/core/kqp/ut/federated_query/s3/kqp_federated_query_ut.cpp +++ b/ydb/core/kqp/ut/federated_query/s3/kqp_federated_query_ut.cpp @@ -723,7 +723,7 @@ Y_UNIT_TEST_SUITE(KqpFederatedQuery) { UNIT_ASSERT_EQUAL_C(stats.compilation().from_cache(), expectCached, "expected: " << expectCached); } - Y_UNIT_TEST(InsertIntoBucketCaching) { + Y_UNIT_TEST(InsertIntoBucketCaching) { const TString writeDataSourceName = "/Root/write_data_source"; const TString writeTableName = "/Root/write_binding"; const TString writeBucket = "test_bucket_cache"; @@ -1343,6 +1343,254 @@ Y_UNIT_TEST_SUITE(KqpFederatedQuery) { UNIT_ASSERT_C(scriptResult.IsSuccess(), scriptResult.GetIssues().ToString()); UNIT_ASSERT(scriptResult.GetPlan()); } + + Y_UNIT_TEST(ReadFromDataSourceWithoutTable) { + const TString externalDataSourceName = "/Root/external_data_source"; + const TString bucket = "test_bucket_inline_desc"; + const TString object = "test_object_inline_desc"; + + CreateBucketWithObject(bucket, object, TEST_CONTENT); + + auto kikimr = MakeKikimrRunner(NYql::IHTTPGateway::Make()); + + auto tc = kikimr->GetTableClient(); + auto session = tc.CreateSession().GetValueSync().GetSession(); + const TString query = fmt::format(R"sql( + CREATE EXTERNAL DATA SOURCE `{external_source}` WITH ( + SOURCE_TYPE="ObjectStorage", + LOCATION="{location}", + AUTH_METHOD="NONE" + );)sql", + "external_source"_a = externalDataSourceName, + "location"_a = GetBucketLocation(bucket), + "object"_a = object + ); + auto result = session.ExecuteSchemeQuery(query).GetValueSync(); + UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString()); + + { + const TString sql = fmt::format(R"sql( + SELECT * FROM `{external_data_source}`; + )sql", + "external_data_source"_a=externalDataSourceName); + + auto db = kikimr->GetQueryClient(); + auto scriptExecutionOperation = db.ExecuteScript(sql).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(scriptExecutionOperation.Status().GetStatus(), EStatus::SUCCESS, scriptExecutionOperation.Status().GetIssues().ToString()); + UNIT_ASSERT(scriptExecutionOperation.Metadata().ExecutionId); + + NYdb::NQuery::TScriptExecutionOperation readyOp = WaitScriptExecutionOperation(scriptExecutionOperation.Id(), kikimr->GetDriver()); + UNIT_ASSERT_EQUAL(readyOp.Metadata().ExecStatus, EExecStatus::Failed); + UNIT_ASSERT_STRING_CONTAINS(readyOp.Status().GetIssues().ToString(), "Attempt to read from external data source"); + } + + // select using inline syntax is well + { + const TString sql = fmt::format(R"sql( + SELECT * FROM `{external_data_source}`.`{obj_path}` + WITH ( + SCHEMA = ( + key Utf8 NOT NULL, + value Utf8 NOT NULL + ), + FORMAT = "json_each_row" + ) + )sql", + "external_data_source"_a=externalDataSourceName, + "obj_path"_a = object); + + auto db = kikimr->GetQueryClient(); + auto scriptExecutionOperation = db.ExecuteScript(sql).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(scriptExecutionOperation.Status().GetStatus(), EStatus::SUCCESS, scriptExecutionOperation.Status().GetIssues().ToString()); + UNIT_ASSERT(scriptExecutionOperation.Metadata().ExecutionId); + + NYdb::NQuery::TScriptExecutionOperation readyOp = WaitScriptExecutionOperation(scriptExecutionOperation.Id(), kikimr->GetDriver()); + UNIT_ASSERT_EQUAL(readyOp.Metadata().ExecStatus, EExecStatus::Completed); + TFetchScriptResultsResult results = db.FetchScriptResults(scriptExecutionOperation.Id(), 0).ExtractValueSync(); + UNIT_ASSERT_C(results.IsSuccess(), results.GetIssues().ToString()); + + TResultSetParser resultSet(results.ExtractResultSet()); + UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnsCount(), 2); + UNIT_ASSERT_VALUES_EQUAL(resultSet.RowsCount(), 2); + + UNIT_ASSERT(resultSet.TryNextRow()); + UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnParser(0).GetUtf8(), "1"); + UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnParser(1).GetUtf8(), "trololo"); + + UNIT_ASSERT(resultSet.TryNextRow()); + UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnParser(0).GetUtf8(), "2"); + UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnParser(1).GetUtf8(), "hello world"); + } + } + + Y_UNIT_TEST(InsertIntoDataSourceWithoutTable) { + const TString readDataSourceName = "/Root/read_data_source"; + const TString readTableName = "/Root/read_binding"; + const TString readBucket = "test_bucket_read_insert_into_data_source"; + const TString readObject = "test_object_read"; + const TString writeDataSourceName = "/Root/write_data_source"; + const TString writeTableName = "/Root/write_binding"; + const TString writeBucket = "test_bucket_write_insert_into_data_source"; + const TString writeObject = "test_object_write/"; + + { + Aws::S3::S3Client s3Client = MakeS3Client(); + CreateBucketWithObject(readBucket, readObject, TEST_CONTENT, s3Client); + CreateBucket(writeBucket, s3Client); + } + + auto kikimr = MakeKikimrRunner(NYql::IHTTPGateway::Make()); + + auto tc = kikimr->GetTableClient(); + auto session = tc.CreateSession().GetValueSync().GetSession(); + const TString query = fmt::format(R"sql( + CREATE EXTERNAL DATA SOURCE `{read_source}` WITH ( + SOURCE_TYPE="ObjectStorage", + LOCATION="{read_location}", + AUTH_METHOD="NONE" + ); + CREATE EXTERNAL TABLE `{read_table}` ( + key Utf8 NOT NULL, + value Utf8 NOT NULL + ) WITH ( + DATA_SOURCE="{read_source}", + LOCATION="{read_object}", + FORMAT="json_each_row" + ); + + CREATE EXTERNAL DATA SOURCE `{write_source}` WITH ( + SOURCE_TYPE="ObjectStorage", + LOCATION="{write_location}", + AUTH_METHOD="NONE" + ); + )sql", + "read_source"_a = readDataSourceName, + "read_table"_a = readTableName, + "read_location"_a = GetBucketLocation(readBucket), + "read_object"_a = readObject, + "write_source"_a = writeDataSourceName, + "write_table"_a = writeTableName, + "write_location"_a = GetBucketLocation(writeBucket) + ); + auto schemeResult = session.ExecuteSchemeQuery(query).GetValueSync(); + UNIT_ASSERT_C(schemeResult.GetStatus() == NYdb::EStatus::SUCCESS, schemeResult.GetIssues().ToString()); + + { + const TString sql = fmt::format(R"sql( + INSERT INTO `{write_source}` + SELECT * FROM `{read_table}` + )sql", + "read_table"_a=readTableName, + "write_source"_a = writeDataSourceName); + + auto db = kikimr->GetQueryClient(); + auto result = db.ExecuteQuery(sql, NYdb::NQuery::TTxControl::BeginTx().CommitTx()).GetValueSync(); + UNIT_ASSERT_C(!result.IsSuccess(), result.GetIssues().ToString()); + UNIT_ASSERT_EQUAL_C(result.GetStatus(), NYdb::EStatus::GENERIC_ERROR, static_cast<int>(result.GetStatus()) << ", " << result.GetIssues().ToString()); + UNIT_ASSERT_STRING_CONTAINS(result.GetIssues().ToString(), "Attempt to write to external data source"); + } + + // insert with inline syntax is well + { + Cerr << "Run inplace insert" << Endl; + const TString sql = fmt::format(R"sql( + INSERT INTO `{write_source}`.`{write_object}` + WITH ( + SCHEMA = ( + key Utf8 NOT NULL, + value Utf8 NOT NULL + ), + FORMAT = "json_each_row" + ) + SELECT * FROM `{read_table}` + )sql", + "read_table"_a=readTableName, + "write_source"_a = writeDataSourceName, + "write_object"_a = writeObject); + + auto db = kikimr->GetQueryClient(); + auto result = db.ExecuteQuery(sql, NYdb::NQuery::TTxControl::BeginTx().CommitTx()).GetValueSync(); + UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); + } + } + + Y_UNIT_TEST(SpecifyExternalTableInsteadOfExternalDataSource) { + const TString externalDataSourceName = "external_data_source"; + const TString externalTableName = "external_table"; + const TString bucket = "test_bucket_specify_external_table"; + const TString object = "test_object_specify_external_table"; + + CreateBucketWithObject(bucket, object, TEST_CONTENT); + + auto kikimr = MakeKikimrRunner(NYql::IHTTPGateway::Make()); + + auto tc = kikimr->GetTableClient(); + auto session = tc.CreateSession().GetValueSync().GetSession(); + const TString query = fmt::format(R"sql( + CREATE EXTERNAL DATA SOURCE `{external_source}` WITH ( + SOURCE_TYPE="ObjectStorage", + LOCATION="{location}", + AUTH_METHOD="NONE" + ); + + CREATE EXTERNAL TABLE `{external_table}` ( + key Utf8 NOT NULL, + value Utf8 NOT NULL + ) WITH ( + DATA_SOURCE="{external_source}", + LOCATION="{object}", + FORMAT="json_each_row" + ); + )sql", + "external_source"_a = externalDataSourceName, + "external_table"_a = externalTableName, + "location"_a = GetBucketLocation(bucket), + "object"_a = object + ); + auto result = session.ExecuteSchemeQuery(query).GetValueSync(); + UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString()); + + { + const TString sql = fmt::format(R"sql( + SELECT * FROM `{external_table}`.`{object}` + WITH ( + SCHEMA = ( + key Utf8 NOT NULL, + value Utf8 NOT NULL + ), + FORMAT = "json_each_row" + ); + )sql", + "external_table"_a=externalTableName, + "object"_a = object); + + auto db = kikimr->GetQueryClient(); + auto queryExecutionOperation = db.ExecuteQuery(sql, NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_UNEQUAL_C(queryExecutionOperation.GetStatus(), EStatus::SUCCESS, queryExecutionOperation.GetIssues().ToString()); + UNIT_ASSERT_STRING_CONTAINS(queryExecutionOperation.GetIssues().ToString(), "\"/Root/external_table\" is expected to be external data source"); + } + + { + const TString sql = fmt::format(R"sql( + INSERT INTO `{external_table}`.`{object}` + WITH ( + SCHEMA = ( + key Utf8 NOT NULL, + value Utf8 NOT NULL + ), + FORMAT = "json_each_row" + ) + SELECT * FROM `{external_table}` WHERE key = '42'; + )sql", + "external_table"_a=externalTableName, + "object"_a = object); + + auto db = kikimr->GetQueryClient(); + auto queryExecutionOperation = db.ExecuteQuery(sql, NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_UNEQUAL_C(queryExecutionOperation.GetStatus(), EStatus::SUCCESS, queryExecutionOperation.GetIssues().ToString()); + UNIT_ASSERT_STRING_CONTAINS(queryExecutionOperation.GetIssues().ToString(), "\"/Root/external_table\" is expected to be external data source"); + } + } } } // namespace NKikimr::NKqp diff --git a/ydb/core/kqp/ut/indexes/kqp_indexes_ut.cpp b/ydb/core/kqp/ut/indexes/kqp_indexes_ut.cpp index 3215bf2717..7e0fbc7c16 100644 --- a/ydb/core/kqp/ut/indexes/kqp_indexes_ut.cpp +++ b/ydb/core/kqp/ut/indexes/kqp_indexes_ut.cpp @@ -34,7 +34,7 @@ TIntrusivePtr<NKqp::IKqpGateway> GetIcGateway(Tests::TServer& server) { auto counters = MakeIntrusive<TKqpRequestCounters>(); counters->Counters = new TKqpCounters(server.GetRuntime()->GetAppData(0).Counters); counters->TxProxyMon = new NTxProxy::TTxProxyMon(server.GetRuntime()->GetAppData(0).Counters); - std::shared_ptr<NYql::IKikimrGateway::IKqpTableMetadataLoader> loader = std::make_shared<TKqpTableMetadataLoader>(server.GetRuntime()->GetAnyNodeActorSystem(),TIntrusivePtr<NYql::TKikimrConfiguration>(nullptr),false); + std::shared_ptr<NYql::IKikimrGateway::IKqpTableMetadataLoader> loader = std::make_shared<TKqpTableMetadataLoader>(TestCluster, server.GetRuntime()->GetAnyNodeActorSystem(),TIntrusivePtr<NYql::TKikimrConfiguration>(nullptr),false); return NKqp::CreateKikimrIcGateway(TestCluster, NKikimrKqp::QUERY_TYPE_SQL_GENERIC_QUERY, "/Root", std::move(loader), server.GetRuntime()->GetAnyNodeActorSystem(), server.GetRuntime()->GetNodeId(0), counters, server.GetSettings().AppConfig->GetQueryServiceConfig()); } diff --git a/ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp b/ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp index aecd1b44ba..1f53b50a83 100644 --- a/ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp +++ b/ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp @@ -4882,7 +4882,7 @@ Y_UNIT_TEST_SUITE(KqpScheme) { );)"; auto result = session.ExecuteSchemeQuery(query).GetValueSync(); UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::GENERIC_ERROR); - UNIT_ASSERT_STRING_CONTAINS(result.GetIssues().ToString(), "External table are disabled. Please contact your system administrator to enable it"); + UNIT_ASSERT_STRING_CONTAINS(result.GetIssues().ToString(), "External tables are disabled. Please contact your system administrator to enable it"); } Y_UNIT_TEST(CreateExternalTableCheckPrimaryKey) { |