diff options
author | andrewproni <andrewproni@yandex-team.com> | 2023-08-22 16:59:45 +0300 |
---|---|---|
committer | andrewproni <andrewproni@yandex-team.com> | 2023-08-22 17:36:02 +0300 |
commit | d40726ba705519837b014cb71110b86258906f0d (patch) | |
tree | 496a6cdf0860dce49d9f343ea5f34a0952a138bc | |
parent | 3af30f8d6f0ef3a03c4f64174e2296d97043c958 (diff) | |
download | ydb-d40726ba705519837b014cb71110b86258906f0d.tar.gz |
KIKIMR-18756: ExternalTable/ExternalSource version check
-rw-r--r-- | ydb/core/kqp/common/kqp_resolve.h | 6 | ||||
-rw-r--r-- | ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp | 1 | ||||
-rw-r--r-- | ydb/core/kqp/provider/yql_kikimr_datasink.cpp | 2 | ||||
-rw-r--r-- | ydb/core/kqp/provider/yql_kikimr_datasource.cpp | 6 | ||||
-rw-r--r-- | ydb/core/kqp/query_compiler/kqp_query_compiler.cpp | 14 | ||||
-rw-r--r-- | ydb/core/kqp/session_actor/kqp_query_state.h | 25 | ||||
-rw-r--r-- | ydb/core/kqp/session_actor/kqp_worker_common.cpp | 9 | ||||
-rw-r--r-- | ydb/core/kqp/ut/federated_query/kqp_federated_query_ut.cpp | 159 | ||||
-rw-r--r-- | ydb/core/protos/kqp_physical.proto | 1 | ||||
-rw-r--r-- | ydb/core/tx/scheme_board/cache.cpp | 3 |
10 files changed, 211 insertions, 15 deletions
diff --git a/ydb/core/kqp/common/kqp_resolve.h b/ydb/core/kqp/common/kqp_resolve.h index aca17f50a7b..0c9553e89d9 100644 --- a/ydb/core/kqp/common/kqp_resolve.h +++ b/ydb/core/kqp/common/kqp_resolve.h @@ -19,7 +19,8 @@ enum class ETableKind { Unknown = 0, Datashard, SysView, - Olap + Olap, + External }; class TKqpTableKeys { @@ -99,6 +100,9 @@ public: case NKqpProto::TABLE_KIND_SYS_VIEW: TableKind = ETableKind::SysView; break; + case NKqpProto::TABLE_KIND_EXTERNAL: + TableKind = ETableKind::External; + return; default: YQL_ENSURE(false, "Unexpected phy table kind: " << (i64) phyTable.GetKind()); } diff --git a/ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp b/ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp index 735da43d1fa..219cfaeca6e 100644 --- a/ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp +++ b/ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp @@ -854,6 +854,7 @@ void FillTaskMeta(const TStageInfo& stageInfo, const TTask& task, NYql::NDqProto switch (tableInfo->TableKind) { case ETableKind::Unknown: + case ETableKind::External: case ETableKind::SysView: { protoTaskMeta.SetDataFormat(NKikimrTxDataShard::EScanDataFormat::CELLVEC); break; diff --git a/ydb/core/kqp/provider/yql_kikimr_datasink.cpp b/ydb/core/kqp/provider/yql_kikimr_datasink.cpp index 4425e5ca4e7..63bf0fcc28d 100644 --- a/ydb/core/kqp/provider/yql_kikimr_datasink.cpp +++ b/ydb/core/kqp/provider/yql_kikimr_datasink.cpp @@ -518,7 +518,7 @@ public: YQL_CVLOG(NLog::ELevel::ERROR, NLog::EComponent::ProviderKikimr) << "Skip RewriteIO for external entity: unknown entity type: " << (int)tableDesc.Metadata->ExternalSource.SourceType; return nullptr; } - + ctx.Step.Repeat(TExprStep::DiscoveryIO) .Repeat(TExprStep::Epochs) .Repeat(TExprStep::Intents) diff --git a/ydb/core/kqp/provider/yql_kikimr_datasource.cpp b/ydb/core/kqp/provider/yql_kikimr_datasource.cpp index 3ae9aa60946..fb93b5ff730 100644 --- a/ydb/core/kqp/provider/yql_kikimr_datasource.cpp +++ b/ydb/core/kqp/provider/yql_kikimr_datasource.cpp @@ -59,6 +59,7 @@ TExprNode::TPtr BuildExternalTableSettings(TPositionHandle pos, TExprContext& ct items.emplace_back(ctx.NewList(pos, std::move(children))); } + return ctx.NewList(pos, std::move(items)); } @@ -689,7 +690,7 @@ public: .Repeat(TExprStep::RewriteIO); TExprNode::TPtr path = ctx.NewCallable(node->Pos(), "String", { ctx.NewAtom(node->Pos(), tableDesc.Metadata->ExternalSource.TableLocation) }); auto table = ctx.NewList(node->Pos(), {ctx.NewAtom(node->Pos(), "table"), path}); - auto key = ctx.NewCallable(node->Pos(), "Key", {table}); + auto newKey = ctx.NewCallable(node->Pos(), "Key", {table}); auto newRead = Build<TCoRead>(ctx, node->Pos()) .World(read->Child(0)) .DataSource( @@ -701,9 +702,10 @@ public: .Done().Ptr() ) .FreeArgs() - .Add(ctx.NewCallable(node->Pos(), "MrTableConcat", {key})) + .Add(ctx.NewCallable(node->Pos(), "MrTableConcat", {newKey})) .Add(ctx.NewCallable(node->Pos(), "Void", {})) .Add(BuildExternalTableSettings(node->Pos(), ctx, tableDesc.Metadata->Columns, source, tableDesc.Metadata->ExternalSource.TableContent)) + .Build() .Done().Ptr(); auto retChildren = node->ChildrenList(); diff --git a/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp b/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp index b7da08f0106..84f448d36f5 100644 --- a/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp +++ b/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp @@ -107,6 +107,8 @@ NKqpProto::EKqpPhyTableKind GetPhyTableKind(EKikimrTableKind kind) { return NKqpProto::TABLE_KIND_OLAP; case EKikimrTableKind::SysView: return NKqpProto::TABLE_KIND_SYS_VIEW; + case EKikimrTableKind::External: + return NKqpProto::TABLE_KIND_EXTERNAL; default: return NKqpProto::TABLE_KIND_UNSPECIFIED; } @@ -786,6 +788,18 @@ private: FillTable(*tableMeta, std::move(tableColumns), *txProto.AddTables()); } + for (const auto& [a, desc] : TablesData->GetTables()) { + auto tableMeta = desc.Metadata; + YQL_ENSURE(tableMeta); + if (desc.Metadata->Kind == NYql::EKikimrTableKind::External) { + THashSet<TStringBuf> columns; + for (const auto& [col, _]: tableMeta->Columns){ + columns.emplace(col); + } + FillTable(*tableMeta, std::move(columns), *txProto.AddTables()); + } + } + for (const auto& secretName : SecretNames) { txProto.AddSecretNames(secretName); } diff --git a/ydb/core/kqp/session_actor/kqp_query_state.h b/ydb/core/kqp/session_actor/kqp_query_state.h index f2013bb807e..92319c8971f 100644 --- a/ydb/core/kqp/session_actor/kqp_query_state.h +++ b/ydb/core/kqp/session_actor/kqp_query_state.h @@ -178,17 +178,16 @@ public: // todo: gvit // fill this hash set only once on query compilation. void FillTables(const NKqpProto::TKqpPhyTx& phyTx) { + auto addTable = [&](const NKqpProto::TKqpPhyTableId& table) { + NKikimr::TTableId tableId(table.GetOwnerId(), table.GetTableId()); + auto it = TableVersions.find(tableId); + if (it != TableVersions.end()) { + Y_ENSURE(it->second == table.GetVersion()); + } else { + TableVersions.emplace(tableId, table.GetVersion()); + } + }; for (const auto& stage : phyTx.GetStages()) { - - auto addTable = [&](const NKqpProto::TKqpPhyTableId& table) { - NKikimr::TTableId tableId(table.GetOwnerId(), table.GetTableId()); - auto it = TableVersions.find(tableId); - if (it != TableVersions.end()) { - Y_ENSURE(it->second == table.GetVersion()); - } else { - TableVersions.emplace(tableId, table.GetVersion()); - } - }; for (const auto& tableOp : stage.GetTableOps()) { addTable(tableOp.GetTable()); } @@ -209,6 +208,12 @@ public: } } } + + for (const auto& table : phyTx.GetTables()) { + if (table.GetKind() == NKqpProto::EKqpPhyTableKind::TABLE_KIND_EXTERNAL) { + addTable(table.GetId()); + } + } } bool NeedCheckTableVersions() const { diff --git a/ydb/core/kqp/session_actor/kqp_worker_common.cpp b/ydb/core/kqp/session_actor/kqp_worker_common.cpp index ed0dcaaa078..901fed55c01 100644 --- a/ydb/core/kqp/session_actor/kqp_worker_common.cpp +++ b/ydb/core/kqp/session_actor/kqp_worker_common.cpp @@ -178,6 +178,15 @@ bool CanCacheQuery(const NKqpProto::TKqpPhyQuery& query) { if (tx.GetType() == NKqpProto::TKqpPhyTx::TYPE_SCHEME) { return false; } + + for (const auto& stage : tx.GetStages()) { + for (const auto& source : stage.GetSources()) { + // S3 provider stores S3 paths to read in AST, so we can't cache such queries + if (source.HasExternalSource() && source.GetExternalSource().GetType() == "S3Source") { + return false; + } + } + } } return true; diff --git a/ydb/core/kqp/ut/federated_query/kqp_federated_query_ut.cpp b/ydb/core/kqp/ut/federated_query/kqp_federated_query_ut.cpp index 42608e7fa3c..93e644cba81 100644 --- a/ydb/core/kqp/ut/federated_query/kqp_federated_query_ut.cpp +++ b/ydb/core/kqp/ut/federated_query/kqp_federated_query_ut.cpp @@ -12,6 +12,7 @@ #include <ydb/library/yql/utils/log/log.h> #include <ydb/public/sdk/cpp/client/ydb_operation/operation.h> +#include <ydb/public/sdk/cpp/client/ydb_proto/accessor.h> #include <ydb/public/sdk/cpp/client/ydb_types/operation/operation.h> #include <fmt/format.h> @@ -313,6 +314,63 @@ Y_UNIT_TEST_SUITE(KqpFederatedQuery) { } } + Y_UNIT_TEST(ExecuteScriptWithS3ReadNotCached) { + using namespace fmt::literals; + const TString externalDataSourceName = "/Root/external_data_source"; + const TString externalTableName = "/Root/test_binding_resolve"; + const TString bucket = "test_bucket1"; + const TString object = "test_object"; + + CreateBucketWithObject(bucket, object, TEST_CONTENT); + + auto kikimr = DefaultKikimrRunner(); + kikimr.GetTestServer().GetRuntime()->GetAppData(0).FeatureFlags.SetEnableExternalDataSources(true); + + auto tc = kikimr.GetTableClient(); + auto session = tc.CreateSession().GetValueSync().GetSession(); + const TString query = fmt::format(R"( + CREATE EXTERNAL DATA SOURCE `{external_source}` WITH ( + SOURCE_TYPE="ObjectStorage", + LOCATION="{location}", + AUTH_METHOD="NONE" + ); + CREATE EXTERNAL TABLE `{external_table}` ( + key Utf8 NOT NULL, + value Utf8 NOT NULL + ) WITH ( + DATA_SOURCE="{external_source}", + LOCATION="{object}", + FORMAT="json_each_row" + );)", + "external_source"_a = externalDataSourceName, + "external_table"_a = externalTableName, + "location"_a = GetBucketLocation(bucket), + "object"_a = object + ); + auto result = session.ExecuteSchemeQuery(query).GetValueSync(); + UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString()); + + auto settings = TExecuteScriptSettings().StatsMode(Ydb::Query::STATS_MODE_BASIC); + + const TString sql = fmt::format(R"( + SELECT * FROM `{external_table}` + )", "external_table"_a=externalTableName); + + auto db = kikimr.GetQueryClient(); + auto scriptExecutionOperation = db.ExecuteScript(sql, settings).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(scriptExecutionOperation.Status().GetStatus(), EStatus::SUCCESS, scriptExecutionOperation.Status().GetIssues().ToString()); + UNIT_ASSERT(scriptExecutionOperation.Metadata().ExecutionId); + + NYdb::NQuery::TScriptExecutionOperation readyOp = WaitScriptExecutionOperation(scriptExecutionOperation.Id(), kikimr.GetDriver()); + UNIT_ASSERT_EQUAL(readyOp.Metadata().ExecStatus, EExecStatus::Completed); + UNIT_ASSERT_EQUAL(readyOp.Metadata().ExecStats.compilation().from_cache(), false); + + scriptExecutionOperation = db.ExecuteScript(sql, settings).ExtractValueSync(); + readyOp = WaitScriptExecutionOperation(scriptExecutionOperation.Id(), kikimr.GetDriver()); + UNIT_ASSERT_EQUAL(readyOp.Metadata().ExecStatus, EExecStatus::Completed); + UNIT_ASSERT_EQUAL(readyOp.Metadata().ExecStats.compilation().from_cache(), false); + } + Y_UNIT_TEST(ExecuteScriptWithDataSource) { using namespace fmt::literals; const TString externalDataSourceName = "/Root/external_data_source"; @@ -830,6 +888,107 @@ Y_UNIT_TEST_SUITE(KqpFederatedQuery) { UNIT_ASSERT_STRING_CONTAINS(content, "2\thello world\n"); } + void ExecuteInsertQuery(TQueryClient& client, const TString& writeTableName, const TString& readTableName, bool expectCached) { + using namespace fmt::literals; + const TString sql = fmt::format(R"( + INSERT INTO `{write_table}` + SELECT * FROM `{read_table}`; + )", + "write_table"_a = writeTableName, + "read_table"_a = readTableName); + auto settings = TExecuteQuerySettings().StatsMode(EStatsMode::Basic); + auto resultFuture = client.ExecuteQuery(sql, NYdb::NQuery::TTxControl::BeginTx().CommitTx(), settings); + resultFuture.Wait(); + UNIT_ASSERT_C(resultFuture.GetValueSync().IsSuccess(), resultFuture.GetValueSync().GetIssues().ToString()); + auto& stats = NYdb::TProtoAccessor::GetProto(*resultFuture.GetValueSync().GetStats()); + UNIT_ASSERT_EQUAL_C(stats.compilation().from_cache(), expectCached, "expected: " << expectCached); + } + + Y_UNIT_TEST(InsertIntoBucketCaching) { + using namespace fmt::literals; + const TString writeDataSourceName = "/Root/write_data_source"; + const TString writeTableName = "/Root/write_binding"; + const TString writeBucket = "test_bucket_cache"; + const TString writeObject = "test_object_write/"; + const TString writeAnotherObject = "test_another_object_write/"; + const TString readTableName = "/Root/read_table"; + { + Aws::S3::S3Client s3Client = MakeS3Client(); + CreateBucket(writeBucket, s3Client); + } + + auto kikimr = DefaultKikimrRunner(); + kikimr.GetTestServer().GetRuntime()->GetAppData(0).FeatureFlags.SetEnableExternalDataSources(true); + + auto tc = kikimr.GetTableClient(); + auto session = tc.CreateSession().GetValueSync().GetSession(); + { + const TString query = fmt::format(R"( + CREATE EXTERNAL DATA SOURCE `{write_source}` WITH ( + SOURCE_TYPE="ObjectStorage", + LOCATION="{write_location}", + AUTH_METHOD="NONE" + ); + CREATE EXTERNAL TABLE `{write_table}` ( + key Utf8 NOT NULL, + value Utf8 NOT NULL + ) WITH ( + DATA_SOURCE="{write_source}", + LOCATION="{write_object}", + FORMAT="tsv_with_names" + ); + + CREATE TABLE `{read_table}` ( + key Utf8 NOT NULL, + value Utf8 NOT NULL, + PRIMARY KEY (key) + ); + )", + "write_source"_a = writeDataSourceName, + "write_table"_a = writeTableName, + "write_location"_a = GetBucketLocation(writeBucket), + "write_object"_a = writeObject, + "read_table"_a = readTableName); + + + auto result = session.ExecuteSchemeQuery(query).GetValueSync(); + UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString()); + } + { + const TString query = fmt::format(R"( + REPLACE INTO `{read_table}` (key, value) VALUES + ("1", "one") + )", + "read_table"_a = readTableName); + auto result = session.ExecuteDataQuery(query, NYdb::NTable::TTxControl::BeginTx(NYdb::NTable::TTxSettings::SerializableRW()).CommitTx()).GetValueSync(); + UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString()); + } + auto db = kikimr.GetQueryClient(); + ExecuteInsertQuery(db, writeTableName, readTableName, false); + ExecuteInsertQuery(db, writeTableName, readTableName, true); + { + const TString modifyQuery = fmt::format(R"( + DROP EXTERNAL TABLE `{write_table}`; + CREATE EXTERNAL TABLE `{write_table}` ( + key Utf8 NOT NULL, + value Utf8 NOT NULL + ) WITH ( + DATA_SOURCE="{write_source}", + LOCATION="{write_object}", + FORMAT="tsv_with_names" + ); + )", + "write_table"_a = writeTableName, + "write_object"_a = writeAnotherObject, + "write_source"_a = writeDataSourceName); + auto result = session.ExecuteSchemeQuery(modifyQuery).GetValueSync(); + UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString()); + } + ExecuteInsertQuery(db, writeTableName, readTableName, false); + + UNIT_ASSERT_EQUAL(GetObjectKeys(writeBucket).size(), 3); + } + Y_UNIT_TEST(JoinTwoSources) { using namespace fmt::literals; const TString dataSource = "/Root/data_source"; diff --git a/ydb/core/protos/kqp_physical.proto b/ydb/core/protos/kqp_physical.proto index fd208e72a30..3f44bc24818 100644 --- a/ydb/core/protos/kqp_physical.proto +++ b/ydb/core/protos/kqp_physical.proto @@ -74,6 +74,7 @@ enum EKqpPhyTableKind { TABLE_KIND_DS = 1; TABLE_KIND_OLAP = 2; TABLE_KIND_SYS_VIEW = 3; + TABLE_KIND_EXTERNAL = 4; } message TKqpPhyTableId { diff --git a/ydb/core/tx/scheme_board/cache.cpp b/ydb/core/tx/scheme_board/cache.cpp index 77e76a54a2d..a88e2c3cbd1 100644 --- a/ydb/core/tx/scheme_board/cache.cpp +++ b/ydb/core/tx/scheme_board/cache.cpp @@ -1716,7 +1716,8 @@ class TSchemeCache: public TMonitorableActor<TSchemeCache> { return SetError(context, entry, TNavigate::EStatus::PathErrorUnknown); } - const bool isTable = Kind == TNavigate::KindTable || Kind == TNavigate::KindColumnTable; + const bool isTable = Kind == TNavigate::KindTable || Kind == TNavigate::KindColumnTable || + Kind == TNavigate::KindExternalTable || Kind == TNavigate::KindExternalDataSource; const bool isTopic = Kind == TNavigate::KindTopic || Kind == TNavigate::KindCdcStream; if (entry.Operation == TNavigate::OpTable && !isTable) { |