diff options
author | Daniil Cherednik <dcherednik@ydb.tech> | 2023-11-01 20:14:01 +0300 |
---|---|---|
committer | dcherednik <dcherednik@ydb.tech> | 2023-11-01 20:37:14 +0300 |
commit | 0e41ec2b5cb2e914863a0514e0b48ffc6c821496 (patch) | |
tree | 5f4fd522ef1b0ff05fccb70645c1eae2a27e7241 | |
parent | 419bbddb7f1e7587741e0cf4bddd26d2485656ae (diff) | |
download | ydb-0e41ec2b5cb2e914863a0514e0b48ffc6c821496.tar.gz |
Fix cost for stream query service call. KIKIMR-19243
Fix cost for stream query service call. KIKIMR-19243
Pull Request resolved: https://github.com/ydb-platform/ydb/pull/421
-rw-r--r-- | ydb/core/grpc_services/query/rpc_execute_query.cpp | 4 | ||||
-rw-r--r-- | ydb/core/kqp/ut/common/kqp_ut_common.cpp | 48 | ||||
-rw-r--r-- | ydb/core/kqp/ut/cost/kqp_cost_ut.cpp | 45 | ||||
-rw-r--r-- | ydb/public/sdk/cpp/client/ydb_query/impl/exec_query.cpp | 1 |
4 files changed, 72 insertions, 26 deletions
diff --git a/ydb/core/grpc_services/query/rpc_execute_query.cpp b/ydb/core/grpc_services/query/rpc_execute_query.cpp index 2b07aae0a7..03eda749d8 100644 --- a/ydb/core/grpc_services/query/rpc_execute_query.cpp +++ b/ydb/core/grpc_services/query/rpc_execute_query.cpp @@ -356,6 +356,10 @@ private: const auto& issueMessage = record.GetResponse().GetQueryIssues(); NYql::IssuesFromMessage(issueMessage, issues); + if (record.GetYdbStatus() == Ydb::StatusIds::SUCCESS) { + Request_->SetRuHeader(record.GetConsumedRu()); + } + if (record.GetYdbStatus() == Ydb::StatusIds::SUCCESS && NeedReportStats(*Request_->GetProtoRequest())) { Ydb::Query::ExecuteQueryResponsePart response; response.set_status(Ydb::StatusIds::SUCCESS); diff --git a/ydb/core/kqp/ut/common/kqp_ut_common.cpp b/ydb/core/kqp/ut/common/kqp_ut_common.cpp index 1c7130f822..e880587900 100644 --- a/ydb/core/kqp/ut/common/kqp_ut_common.cpp +++ b/ydb/core/kqp/ut/common/kqp_ut_common.cpp @@ -739,6 +739,30 @@ TString StreamResultToYson(NYdb::NScripting::TYqlResultPartIterator& it, bool th return out.Str(); } +static void FillPlan(const NYdb::NTable::TScanQueryPart& streamPart, TCollectedStreamResult& res) { + if (streamPart.HasQueryStats() ) { + res.QueryStats = NYdb::TProtoAccessor::GetProto(streamPart.GetQueryStats()); + + auto plan = res.QueryStats->query_plan(); + if (!plan.empty()) { + res.PlanJson = plan; + } + } +} + +static void FillPlan(const NYdb::NScripting::TYqlResultPart& streamPart, TCollectedStreamResult& res) { + if (streamPart.HasQueryStats() ) { + res.QueryStats = NYdb::TProtoAccessor::GetProto(streamPart.GetQueryStats()); + + auto plan = res.QueryStats->query_plan(); + if (!plan.empty()) { + res.PlanJson = plan; + } + } +} + +static void FillPlan(const NYdb::NQuery::TExecuteQueryPart& /*streamPart*/, TCollectedStreamResult& /*res*/) {} + template<typename TIterator> TCollectedStreamResult CollectStreamResultImpl(TIterator& it) { TCollectedStreamResult res; @@ -770,6 +794,17 @@ TCollectedStreamResult CollectStreamResultImpl(TIterator& it) { } } + if constexpr (std::is_same_v<TIterator, NYdb::NQuery::TExecuteQueryIterator>) { + UNIT_ASSERT_C(streamPart.HasResultSet() || streamPart.GetStats(), + "Unexpected empty query service response."); + + if (streamPart.HasResultSet()) { + auto resultSet = streamPart.ExtractResultSet(); + PrintResultSet(resultSet, resultSetWriter); + res.RowsCount += resultSet.RowsCount(); + } + } + if constexpr (std::is_same_v<TIterator, NYdb::NScripting::TYqlResultPartIterator>) { if (streamPart.HasPartialResult()) { const auto& partialResult = streamPart.GetPartialResult(); @@ -780,15 +815,9 @@ TCollectedStreamResult CollectStreamResultImpl(TIterator& it) { } if constexpr (std::is_same_v<TIterator, NYdb::NTable::TScanQueryPartIterator> - || std::is_same_v<TIterator, NYdb::NScripting::TYqlResultPartIterator>) { - if (streamPart.HasQueryStats() ) { - res.QueryStats = NYdb::TProtoAccessor::GetProto(streamPart.GetQueryStats()); - - auto plan = res.QueryStats->query_plan(); - if (!plan.empty()) { - res.PlanJson = plan; - } - } + || std::is_same_v<TIterator, NYdb::NScripting::TYqlResultPartIterator> + || std::is_same_v<TIterator, NYdb::NQuery::TExecuteQueryIterator>) { + FillPlan(streamPart, res); } else { if (streamPart.HasPlan()) { res.PlanJson = streamPart.ExtractPlan(); @@ -809,6 +838,7 @@ TCollectedStreamResult CollectStreamResult(TIterator& it) { template TCollectedStreamResult CollectStreamResult(NYdb::NTable::TScanQueryPartIterator& it); template TCollectedStreamResult CollectStreamResult(NYdb::NScripting::TYqlResultPartIterator& it); +template TCollectedStreamResult CollectStreamResult(NYdb::NQuery::TExecuteQueryIterator& it); TString ReadTableToYson(NYdb::NTable::TSession session, const TString& table) { TReadTableSettings settings; diff --git a/ydb/core/kqp/ut/cost/kqp_cost_ut.cpp b/ydb/core/kqp/ut/cost/kqp_cost_ut.cpp index b55b9642e7..6211df3be3 100644 --- a/ydb/core/kqp/ut/cost/kqp_cost_ut.cpp +++ b/ydb/core/kqp/ut/cost/kqp_cost_ut.cpp @@ -124,14 +124,15 @@ Y_UNIT_TEST_SUITE(KqpCost) { UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access(0).reads().bytes(), 20); } + const static TString Query = R"(SELECT * FROM `/Root/Test` WHERE Amount < 5000ul ORDER BY Group LIMIT 1;)"; + const static TString Expected = R"([[[3500u];["None"];[1u];["Anna"]]])"; + Y_UNIT_TEST_TWIN(ScanQueryRangeFullScan, SourceRead) { TKikimrRunner kikimr(GetAppConfig(SourceRead)); auto db = kikimr.GetTableClient(); EnableDebugLogging(kikimr.GetTestServer().GetRuntime()); - auto query = Q_(R"( - SELECT * FROM `/Root/Test` WHERE Amount < 5000ul ORDER BY Group LIMIT 1; - )"); + auto query = Q_(Query); NYdb::NTable::TStreamExecScanQuerySettings execSettings; execSettings.CollectQueryStats(ECollectQueryStatsMode::Basic); @@ -142,11 +143,7 @@ Y_UNIT_TEST_SUITE(KqpCost) { UNIT_ASSERT(res.ConsumedRuFromHeader > 0); - CompareYson(R"( - [ - [[3500u];["None"];[1u];["Anna"]] - ] - )", res.ResultSetYson); + CompareYson(Expected, res.ResultSetYson); /* const auto& stats = *res.QueryStats; @@ -160,9 +157,7 @@ Y_UNIT_TEST_SUITE(KqpCost) { TKikimrRunner kikimr(GetAppConfig(SourceRead)); NYdb::NScripting::TScriptingClient client(kikimr.GetDriver()); - auto query = Q_(R"( - SELECT * FROM `/Root/Test` WHERE Amount < 5000ul ORDER BY Group LIMIT 1; - )"); + auto query = Q_(Query); NYdb::NScripting::TExecuteYqlRequestSettings execSettings; execSettings.CollectQueryStats(ECollectQueryStatsMode::Basic); @@ -173,12 +168,30 @@ Y_UNIT_TEST_SUITE(KqpCost) { UNIT_ASSERT(res.ConsumedRuFromHeader > 0); - CompareYson(R"( - [ - [[3500u];["None"];[1u];["Anna"]] - ] - )", res.ResultSetYson); + CompareYson(Expected, res.ResultSetYson); } + + Y_UNIT_TEST_TWIN(QuerySeviceRangeFullScan, SourceRead) { + TKikimrRunner kikimr(GetAppConfig(SourceRead)); + + NYdb::NQuery::TQueryClient client(kikimr.GetDriver()); + auto query = Q_(Query); + + NYdb::NQuery::TExecuteQuerySettings execSettings; + + auto it = client.StreamExecuteQuery( + query, + NYdb::NQuery::TTxControl::BeginTx().CommitTx(), + execSettings + ).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL(it.GetStatus(), EStatus::SUCCESS); + auto res = CollectStreamResult(it); + + UNIT_ASSERT(res.ConsumedRuFromHeader > 0); + + CompareYson(Expected, res.ResultSetYson); + } + } } diff --git a/ydb/public/sdk/cpp/client/ydb_query/impl/exec_query.cpp b/ydb/public/sdk/cpp/client/ydb_query/impl/exec_query.cpp index 21bd9e2125..39ba795499 100644 --- a/ydb/public/sdk/cpp/client/ydb_query/impl/exec_query.cpp +++ b/ydb/public/sdk/cpp/client/ydb_query/impl/exec_query.cpp @@ -65,7 +65,6 @@ public: NYql::TIssues issues; NYql::IssuesFromMessage(self->Response_.issues(), issues); EStatus clientStatus = static_cast<EStatus>(self->Response_.status()); - // TODO: Add headers for streaming calls. TPlainStatus plainStatus{clientStatus, std::move(issues), self->Endpoint_, {}}; TStatus status{std::move(plainStatus)}; |