diff options
author | galaxycrab <UgnineSirdis@ydb.tech> | 2023-09-21 17:23:53 +0300 |
---|---|---|
committer | galaxycrab <UgnineSirdis@ydb.tech> | 2023-09-21 17:44:32 +0300 |
commit | 46510fe73a29904c64c8b51f5d182d4b160b68e9 (patch) | |
tree | fdfc90c28facde07bede0da76965861b2bd1475b | |
parent | 4dc08f464b9d71f6a75cf3ce9517fc733529e69b (diff) | |
download | ydb-46510fe73a29904c64c8b51f5d182d4b160b68e9.tar.gz |
Allow forbidden callables in effects queries without YDB (S3-only)
-rw-r--r-- | ydb/core/kqp/opt/kqp_opt_build_txs.cpp | 33 | ||||
-rw-r--r-- | ydb/core/kqp/ut/federated_query/s3/kqp_federated_query_ut.cpp | 117 |
2 files changed, 142 insertions, 8 deletions
diff --git a/ydb/core/kqp/opt/kqp_opt_build_txs.cpp b/ydb/core/kqp/opt/kqp_opt_build_txs.cpp index 3054575846a..a0238fca5aa 100644 --- a/ydb/core/kqp/opt/kqp_opt_build_txs.cpp +++ b/ydb/core/kqp/opt/kqp_opt_build_txs.cpp @@ -527,7 +527,7 @@ public: return TStatus::Error; } - if (!CheckEffectsTx(tx.Cast(), ctx)) { + if (!CheckEffectsTx(tx.Cast(), query, ctx)) { return TStatus::Error; } @@ -543,7 +543,34 @@ public: } private: - bool CheckEffectsTx(TKqpPhysicalTx tx, TExprContext& ctx) const { + bool HasTableEffects(const TKqlQuery& query) const { + for (const TExprBase& effect : query.Effects()) { + if (auto maybeSinkEffect = effect.Maybe<TKqpSinkEffect>()) { + // (KqpSinkEffect (DqStage (... ((DqSink '0 (DataSink '"kikimr") ...)))) '0) + auto sinkEffect = maybeSinkEffect.Cast(); + const size_t sinkIndex = FromString(TStringBuf(sinkEffect.SinkIndex())); + auto stageExpr = sinkEffect.Stage(); + auto maybeStageBase = stageExpr.Maybe<TDqStageBase>(); + YQL_ENSURE(maybeStageBase); + auto stage = maybeStageBase.Cast(); + YQL_ENSURE(stage.Outputs()); + auto outputs = stage.Outputs().Cast(); + YQL_ENSURE(sinkIndex < outputs.Size()); + auto maybeSink = outputs.Item(sinkIndex); + YQL_ENSURE(maybeSink.Maybe<TDqSink>()); + auto sink = maybeSink.Cast<TDqSink>(); + auto dataSink = TCoDataSink(sink.DataSink().Ptr()); + if (dataSink.Category() == YdbProviderName || dataSink.Category() == KikimrProviderName) { + return true; + } + } else { // Not a SinkEffect, => a YDB table effect + return true; + } + } + return false; + } + + bool CheckEffectsTx(TKqpPhysicalTx tx, const TKqlQuery& query, TExprContext& ctx) const { TMaybeNode<TExprBase> blackistedNode; VisitExpr(tx.Ptr(), [&blackistedNode](const TExprNode::TPtr& exprNode) { if (blackistedNode) { @@ -565,7 +592,7 @@ private: return true; }); - if (blackistedNode) { + if (blackistedNode && HasTableEffects(query)) { ctx.AddError(TIssue(ctx.GetPosition(blackistedNode.Cast().Pos()), TStringBuilder() << "Callable not expected in effects tx: " << blackistedNode.Cast<TCallable>().CallableName())); return false; diff --git a/ydb/core/kqp/ut/federated_query/s3/kqp_federated_query_ut.cpp b/ydb/core/kqp/ut/federated_query/s3/kqp_federated_query_ut.cpp index b0468977916..bc48c8e10d1 100644 --- a/ydb/core/kqp/ut/federated_query/s3/kqp_federated_query_ut.cpp +++ b/ydb/core/kqp/ut/federated_query/s3/kqp_federated_query_ut.cpp @@ -922,7 +922,7 @@ Y_UNIT_TEST_SUITE(KqpFederatedQuery) { "write_object"_a = writeObject, "read_table"_a = readTableName); - + auto result = session.ExecuteSchemeQuery(query).GetValueSync(); UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString()); } @@ -1004,7 +1004,7 @@ Y_UNIT_TEST_SUITE(KqpFederatedQuery) { auto resultFuture = db.ExecuteQuery(sql, NYdb::NQuery::TTxControl::BeginTx().CommitTx()); resultFuture.Wait(); UNIT_ASSERT_C(!resultFuture.GetValueSync().IsSuccess(), resultFuture.GetValueSync().GetIssues().ToString()); - + UNIT_ASSERT_NO_DIFF(resultFuture.GetValueSync().GetIssues().ToString(), "<main>: Error: Pre type annotation, code: 1020\n" " <main>:3:27: Error: Write mode 'update' is not supported for external entities\n"); } @@ -1129,7 +1129,7 @@ Y_UNIT_TEST_SUITE(KqpFederatedQuery) { auto scriptExecutionOperation = db.ExecuteScript(sql).ExtractValueSync(); UNIT_ASSERT_VALUES_EQUAL_C(scriptExecutionOperation.Status().GetStatus(), EStatus::SUCCESS, scriptExecutionOperation.Status().GetIssues().ToString()); - + NYdb::NQuery::TScriptExecutionOperation readyOp = WaitScriptExecutionOperation(scriptExecutionOperation.Id(), kikimr->GetDriver()); UNIT_ASSERT_EQUAL(readyOp.Metadata().ExecStatus, EExecStatus::Completed); TFetchScriptResultsResult results = db.FetchScriptResults(scriptExecutionOperation.Id(), 0).ExtractValueSync(); @@ -1198,7 +1198,7 @@ Y_UNIT_TEST_SUITE(KqpFederatedQuery) { auto scriptExecutionOperation = db.ExecuteScript(sql).ExtractValueSync(); UNIT_ASSERT_VALUES_EQUAL_C(scriptExecutionOperation.Status().GetStatus(), EStatus::SUCCESS, scriptExecutionOperation.Status().GetIssues().ToString()); - + NYdb::NQuery::TScriptExecutionOperation readyOp = WaitScriptExecutionOperation(scriptExecutionOperation.Id(), kikimr->GetDriver()); UNIT_ASSERT_EQUAL(readyOp.Metadata().ExecStatus, EExecStatus::Completed); TFetchScriptResultsResult results = db.FetchScriptResults(scriptExecutionOperation.Id(), 0).ExtractValueSync(); @@ -1259,7 +1259,7 @@ Y_UNIT_TEST_SUITE(KqpFederatedQuery) { auto scriptExecutionOperation = db.ExecuteScript(sql).ExtractValueSync(); UNIT_ASSERT_VALUES_EQUAL_C(scriptExecutionOperation.Status().GetStatus(), EStatus::SUCCESS, scriptExecutionOperation.Status().GetIssues().ToString()); - + NYdb::NQuery::TScriptExecutionOperation readyOp = WaitScriptExecutionOperation(scriptExecutionOperation.Id(), kikimr->GetDriver()); UNIT_ASSERT_EQUAL(readyOp.Metadata().ExecStatus, EExecStatus::Completed); TFetchScriptResultsResult results = db.FetchScriptResults(scriptExecutionOperation.Id(), 0).ExtractValueSync(); @@ -1274,6 +1274,113 @@ Y_UNIT_TEST_SUITE(KqpFederatedQuery) { UNIT_ASSERT_VALUES_EQUAL(resultSet.GetValue(0).GetProto().bytes_value(), content); } } + + Y_UNIT_TEST(ForbiddenCallablesForYdbTables) { + using namespace fmt::literals; + const TString readDataSourceName = "/Root/read_data_source"; + const TString readTableName = "/Root/read_table"; + const TString readBucket = "test_read_bucket_forbidden_callables"; + const TString readObject = "test_object_forbidden_callables"; + const TString writeDataSourceName = "/Root/write_data_source"; + const TString writeTableName = "/Root/write_table"; + const TString writeBucket = "test_write_bucket_forbidden_callables"; + const TString writeObject = "test_object_forbidden_callables/"; + const TString writeYdbTable = "/Root/test_ydb_table"; + + { + Aws::S3::S3Client s3Client = MakeS3Client(); + CreateBucketWithObject(readBucket, readObject, TEST_CONTENT, s3Client); + CreateBucket(writeBucket, s3Client); + } + + auto kikimr = MakeKikimrRunner(NYql::IHTTPGateway::Make()); + + auto tc = kikimr->GetTableClient(); + auto session = tc.CreateSession().GetValueSync().GetSession(); + const TString query = fmt::format(R"( + CREATE EXTERNAL DATA SOURCE `{read_source}` WITH ( + SOURCE_TYPE="ObjectStorage", + LOCATION="{read_location}", + AUTH_METHOD="NONE" + ); + CREATE EXTERNAL TABLE `{read_table}` ( + key Utf8, -- Nullable + value Utf8 -- Nullable + ) WITH ( + DATA_SOURCE="{read_source}", + LOCATION="{read_object}", + FORMAT="json_each_row" + ); + + CREATE EXTERNAL DATA SOURCE `{write_source}` WITH ( + SOURCE_TYPE="ObjectStorage", + LOCATION="{write_location}", + AUTH_METHOD="NONE" + ); + CREATE EXTERNAL TABLE `{write_table}` ( + key Utf8 NOT NULL, + value Utf8 NOT NULL + ) WITH ( + DATA_SOURCE="{write_source}", + LOCATION="{write_object}", + FORMAT="json_each_row" + ); + + CREATE TABLE `{write_ydb_table}` ( + key Utf8 NOT NULL, + value Utf8 NOT NULL, + PRIMARY KEY (key) + ); + )", + "read_source"_a = readDataSourceName, + "read_table"_a = readTableName, + "read_location"_a = GetBucketLocation(readBucket), + "read_object"_a = readObject, + "write_source"_a = writeDataSourceName, + "write_table"_a = writeTableName, + "write_location"_a = GetBucketLocation(writeBucket), + "write_object"_a = writeObject, + "write_ydb_table"_a = writeYdbTable + ); + auto result = session.ExecuteSchemeQuery(query).GetValueSync(); + UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString()); + + // Forbidden callable like Unwrap is allowed in S3-only queries, + // but not allowed in mixed queries. + { + const TString sql = fmt::format(R"( + INSERT INTO `{write_table}` + SELECT Unwrap(key) AS key, Unwrap(value) AS value FROM `{read_table}` + )", + "read_table"_a=readTableName, + "write_table"_a = writeTableName); + + auto db = kikimr->GetQueryClient(); + auto resultFuture = db.ExecuteQuery(sql, NYdb::NQuery::TTxControl::BeginTx().CommitTx()); + resultFuture.Wait(); + UNIT_ASSERT_C(resultFuture.GetValueSync().IsSuccess(), resultFuture.GetValueSync().GetIssues().ToString()); + } + + // Unwrap is used in query with effect applied to YDB table. + { + const TString sql = fmt::format(R"( + INSERT INTO `{write_table}` + SELECT Unwrap(key) AS key, Unwrap(value) AS value FROM `{read_table}`; + + DELETE FROM `{write_ydb_table}` + WHERE key = "42"; + )", + "read_table"_a=readTableName, + "write_table"_a = writeTableName, + "write_ydb_table"_a = writeYdbTable); + + auto db = kikimr->GetQueryClient(); + auto resultFuture = db.ExecuteQuery(sql, NYdb::NQuery::TTxControl::BeginTx().CommitTx()); + resultFuture.Wait(); + UNIT_ASSERT_C(!resultFuture.GetValueSync().IsSuccess(), resultFuture.GetValueSync().GetIssues().ToString()); + UNIT_ASSERT_STRING_CONTAINS(resultFuture.GetValueSync().GetIssues().ToString(), "Callable not expected in effects tx: Unwrap"); + } + } } } // namespace NKqp |