diff options
author | galaxycrab <UgnineSirdis@ydb.tech> | 2023-08-08 12:14:21 +0300 |
---|---|---|
committer | galaxycrab <UgnineSirdis@ydb.tech> | 2023-08-08 13:19:14 +0300 |
commit | 6531528c7f69c09862eb0c8449c970aa98e0a623 (patch) | |
tree | 48ca34daecc5c3c9c362011535ac6b24b3bb7ef4 | |
parent | 3f450d2fb597681b15b0848b7cccfaa899143a8c (diff) | |
download | ydb-6531528c7f69c09862eb0c8449c970aa98e0a623.tar.gz |
Fix build dq task with channels and source
-rw-r--r-- | ydb/core/kqp/executer_actor/kqp_executer_impl.h | 3 | ||||
-rw-r--r-- | ydb/core/kqp/ut/federated_query/kqp_federated_query_ut.cpp | 112 | ||||
-rw-r--r-- | ydb/public/sdk/cpp/client/ydb_query/query.cpp | 4 |
3 files changed, 106 insertions, 13 deletions
diff --git a/ydb/core/kqp/executer_actor/kqp_executer_impl.h b/ydb/core/kqp/executer_actor/kqp_executer_impl.h index 61f2db18f8..3558d2bc73 100644 --- a/ydb/core/kqp/executer_actor/kqp_executer_impl.h +++ b/ydb/core/kqp/executer_actor/kqp_executer_impl.h @@ -756,8 +756,7 @@ protected: const auto& stage = stageInfo.Meta.GetStage(stageInfo.Id); YQL_ENSURE(stage.GetSources(0).HasExternalSource()); - YQL_ENSURE(stage.InputsSize() == 0 && stage.SourcesSize() == 1, - "multiple sources or sources mixed with connections"); + YQL_ENSURE(stage.SourcesSize() == 1, "multiple sources in one task are not supported"); const auto& stageSource = stage.GetSources(0); const auto& externalSource = stageSource.GetExternalSource(); diff --git a/ydb/core/kqp/ut/federated_query/kqp_federated_query_ut.cpp b/ydb/core/kqp/ut/federated_query/kqp_federated_query_ut.cpp index 18e0b3766b..0ec78b3749 100644 --- a/ydb/core/kqp/ut/federated_query/kqp_federated_query_ut.cpp +++ b/ydb/core/kqp/ut/federated_query/kqp_federated_query_ut.cpp @@ -29,8 +29,15 @@ R"({"key": "1", "value": "trololo"} {"key": "2", "value": "hello world"} )"sv; +constexpr TStringBuf TEST_CONTENT_KEYS = +R"({"key": "1"} +{"key": "3"} +)"sv; + const TString TEST_SCHEMA = R"(["StructType";[["key";["DataType";"Utf8";];];["value";["DataType";"Utf8";];];];])"; +const TString TEST_SCHEMA_IDS = R"(["StructType";[["key";["DataType";"Utf8";];];];])"; + bool InitAwsApi() { Aws::InitAPI(Aws::SDKOptions()); return true; @@ -64,19 +71,26 @@ void CreateBucket(const TString& bucket) { CreateBucket(bucket, s3Client); } -void CreateBucketWithObject(const TString& bucket, const TString& object, const TStringBuf& content, Aws::S3::S3Client& s3Client) { - CreateBucket(bucket, s3Client); +void UploadObject(const TString& bucket, const TString& object, const TStringBuf& content, Aws::S3::S3Client& s3Client) { + Aws::S3::Model::PutObjectRequest req; + req.WithBucket(bucket).WithKey(object); - { - Aws::S3::Model::PutObjectRequest req; - req.WithBucket(bucket).WithKey(object); + auto inputStream = std::make_shared<std::stringstream>(); + *inputStream << content; + req.SetBody(inputStream); + const Aws::S3::Model::PutObjectOutcome result = s3Client.PutObject(req); + UNIT_ASSERT_C(result.IsSuccess(), "Error uploading object \"" << object << "\" to a bucket \"" << bucket << "\": " << result.GetError().GetExceptionName() << ": " << result.GetError().GetMessage()); +} - auto inputStream = std::make_shared<std::stringstream>(); - *inputStream << content; - req.SetBody(inputStream); - const Aws::S3::Model::PutObjectOutcome result = s3Client.PutObject(req); - UNIT_ASSERT_C(result.IsSuccess(), "Error uploading object \"" << object << "\" to a bucket \"" << bucket << "\": " << result.GetError().GetExceptionName() << ": " << result.GetError().GetMessage()); - } +void UploadObject(const TString& bucket, const TString& object, const TStringBuf& content) { + Aws::S3::S3Client s3Client = MakeS3Client(); + + UploadObject(bucket, object, content, s3Client); +} + +void CreateBucketWithObject(const TString& bucket, const TString& object, const TStringBuf& content, Aws::S3::S3Client& s3Client) { + CreateBucket(bucket, s3Client); + UploadObject(bucket, object, content, s3Client); } void CreateBucketWithObject(const TString& bucket, const TString& object, const TStringBuf& content) { @@ -815,6 +829,82 @@ Y_UNIT_TEST_SUITE(KqpFederatedQuery) { UNIT_ASSERT_STRING_CONTAINS(content, "1\ttrololo\n"); UNIT_ASSERT_STRING_CONTAINS(content, "2\thello world\n"); } + + Y_UNIT_TEST(JoinTwoSources) { + using namespace fmt::literals; + const TString dataSource = "/Root/data_source"; + const TString bucket = "test_bucket_mixed"; + const TString dataTable = "/Root/data"; + const TString dataObject = "data"; + const TString keysTable = "/Root/keys"; + const TString keysObject = "keys"; + + { + Aws::S3::S3Client s3Client = MakeS3Client(); + CreateBucket(bucket, s3Client); + UploadObject(bucket, dataObject, TEST_CONTENT, s3Client); + UploadObject(bucket, keysObject, TEST_CONTENT_KEYS, s3Client); + } + + auto kikimr = DefaultKikimrRunner(); + kikimr.GetTestServer().GetRuntime()->GetAppData(0).FeatureFlags.SetEnableExternalDataSources(true); + + auto tc = kikimr.GetTableClient(); + auto session = tc.CreateSession().GetValueSync().GetSession(); + const TString query = fmt::format(R"( + CREATE EXTERNAL DATA SOURCE `{data_source}` WITH ( + SOURCE_TYPE="ObjectStorage", + LOCATION="{bucket_location}", + AUTH_METHOD="NONE" + ); + + CREATE EXTERNAL TABLE `{data_table}` ( + key Utf8 NOT NULL, + value Utf8 NOT NULL + ) WITH ( + DATA_SOURCE="{data_source}", + LOCATION="{data_object}", + FORMAT="json_each_row" + ); + + CREATE EXTERNAL TABLE `{keys_table}` ( + key Utf8 NOT NULL + ) WITH ( + DATA_SOURCE="{data_source}", + LOCATION="{keys_object}", + FORMAT="json_each_row" + ); + )", + "data_source"_a = dataSource, + "bucket_location"_a = GetBucketLocation(bucket), + "data_table"_a = dataTable, + "data_object"_a = dataObject, + "keys_table"_a = keysTable, + "keys_object"_a = keysObject + ); + auto schemeQueryesult = session.ExecuteSchemeQuery(query).GetValueSync(); + UNIT_ASSERT_C(schemeQueryesult.GetStatus() == NYdb::EStatus::SUCCESS, schemeQueryesult.GetIssues().ToString()); + + const TString sql = fmt::format(R"( + SELECT * FROM `{data_table}` + WHERE key IN ( + SELECT key FROM `{keys_table}` + ) + )", + "data_table"_a = dataTable, + "keys_table"_a = keysTable); + + auto db = kikimr.GetQueryClient(); + auto resultFuture = db.ExecuteQuery(sql, NYdb::NQuery::TTxControl::BeginTx().CommitTx()); + resultFuture.Wait(); + UNIT_ASSERT_C(resultFuture.GetValueSync().IsSuccess(), resultFuture.GetValueSync().GetIssues().ToString()); + auto result = resultFuture.GetValueSync().GetResultSetParser(0); + UNIT_ASSERT_VALUES_EQUAL(result.RowsCount(), 1); + UNIT_ASSERT(result.TryNextRow()); + UNIT_ASSERT_VALUES_EQUAL(result.ColumnParser("key").GetUtf8(), "1"); + UNIT_ASSERT_VALUES_EQUAL(result.ColumnParser("value").GetUtf8(), "trololo"); + UNIT_ASSERT(!result.TryNextRow()); + } } } // namespace NKqp diff --git a/ydb/public/sdk/cpp/client/ydb_query/query.cpp b/ydb/public/sdk/cpp/client/ydb_query/query.cpp index 7da774d5e7..9fae3953e9 100644 --- a/ydb/public/sdk/cpp/client/ydb_query/query.cpp +++ b/ydb/public/sdk/cpp/client/ydb_query/query.cpp @@ -14,6 +14,10 @@ TResultSet TExecuteQueryResult::GetResultSet(size_t resultIndex) const { return ResultSets_[resultIndex]; } +TResultSetParser TExecuteQueryResult::GetResultSetParser(size_t resultIndex) const { + return TResultSetParser(GetResultSet(resultIndex)); +} + TScriptExecutionOperation::TScriptExecutionOperation(TStatus&& status, Ydb::Operations::Operation&& operation) : TOperation(std::move(status), std::move(operation)) { |