aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorDaniil Cherednik <dcherednik@ydb.tech>2023-11-01 20:14:01 +0300
committerdcherednik <dcherednik@ydb.tech>2023-11-01 20:37:14 +0300
commit0e41ec2b5cb2e914863a0514e0b48ffc6c821496 (patch)
tree5f4fd522ef1b0ff05fccb70645c1eae2a27e7241
parent419bbddb7f1e7587741e0cf4bddd26d2485656ae (diff)
downloadydb-0e41ec2b5cb2e914863a0514e0b48ffc6c821496.tar.gz
Fix cost for stream query service call. KIKIMR-19243
Fix cost for stream query service call. KIKIMR-19243 Pull Request resolved: https://github.com/ydb-platform/ydb/pull/421
-rw-r--r--ydb/core/grpc_services/query/rpc_execute_query.cpp4
-rw-r--r--ydb/core/kqp/ut/common/kqp_ut_common.cpp48
-rw-r--r--ydb/core/kqp/ut/cost/kqp_cost_ut.cpp45
-rw-r--r--ydb/public/sdk/cpp/client/ydb_query/impl/exec_query.cpp1
4 files changed, 72 insertions, 26 deletions
diff --git a/ydb/core/grpc_services/query/rpc_execute_query.cpp b/ydb/core/grpc_services/query/rpc_execute_query.cpp
index 2b07aae0a7..03eda749d8 100644
--- a/ydb/core/grpc_services/query/rpc_execute_query.cpp
+++ b/ydb/core/grpc_services/query/rpc_execute_query.cpp
@@ -356,6 +356,10 @@ private:
const auto& issueMessage = record.GetResponse().GetQueryIssues();
NYql::IssuesFromMessage(issueMessage, issues);
+ if (record.GetYdbStatus() == Ydb::StatusIds::SUCCESS) {
+ Request_->SetRuHeader(record.GetConsumedRu());
+ }
+
if (record.GetYdbStatus() == Ydb::StatusIds::SUCCESS && NeedReportStats(*Request_->GetProtoRequest())) {
Ydb::Query::ExecuteQueryResponsePart response;
response.set_status(Ydb::StatusIds::SUCCESS);
diff --git a/ydb/core/kqp/ut/common/kqp_ut_common.cpp b/ydb/core/kqp/ut/common/kqp_ut_common.cpp
index 1c7130f822..e880587900 100644
--- a/ydb/core/kqp/ut/common/kqp_ut_common.cpp
+++ b/ydb/core/kqp/ut/common/kqp_ut_common.cpp
@@ -739,6 +739,30 @@ TString StreamResultToYson(NYdb::NScripting::TYqlResultPartIterator& it, bool th
return out.Str();
}
+static void FillPlan(const NYdb::NTable::TScanQueryPart& streamPart, TCollectedStreamResult& res) {
+ if (streamPart.HasQueryStats() ) {
+ res.QueryStats = NYdb::TProtoAccessor::GetProto(streamPart.GetQueryStats());
+
+ auto plan = res.QueryStats->query_plan();
+ if (!plan.empty()) {
+ res.PlanJson = plan;
+ }
+ }
+}
+
+static void FillPlan(const NYdb::NScripting::TYqlResultPart& streamPart, TCollectedStreamResult& res) {
+ if (streamPart.HasQueryStats() ) {
+ res.QueryStats = NYdb::TProtoAccessor::GetProto(streamPart.GetQueryStats());
+
+ auto plan = res.QueryStats->query_plan();
+ if (!plan.empty()) {
+ res.PlanJson = plan;
+ }
+ }
+}
+
+static void FillPlan(const NYdb::NQuery::TExecuteQueryPart& /*streamPart*/, TCollectedStreamResult& /*res*/) {}
+
template<typename TIterator>
TCollectedStreamResult CollectStreamResultImpl(TIterator& it) {
TCollectedStreamResult res;
@@ -770,6 +794,17 @@ TCollectedStreamResult CollectStreamResultImpl(TIterator& it) {
}
}
+ if constexpr (std::is_same_v<TIterator, NYdb::NQuery::TExecuteQueryIterator>) {
+ UNIT_ASSERT_C(streamPart.HasResultSet() || streamPart.GetStats(),
+ "Unexpected empty query service response.");
+
+ if (streamPart.HasResultSet()) {
+ auto resultSet = streamPart.ExtractResultSet();
+ PrintResultSet(resultSet, resultSetWriter);
+ res.RowsCount += resultSet.RowsCount();
+ }
+ }
+
if constexpr (std::is_same_v<TIterator, NYdb::NScripting::TYqlResultPartIterator>) {
if (streamPart.HasPartialResult()) {
const auto& partialResult = streamPart.GetPartialResult();
@@ -780,15 +815,9 @@ TCollectedStreamResult CollectStreamResultImpl(TIterator& it) {
}
if constexpr (std::is_same_v<TIterator, NYdb::NTable::TScanQueryPartIterator>
- || std::is_same_v<TIterator, NYdb::NScripting::TYqlResultPartIterator>) {
- if (streamPart.HasQueryStats() ) {
- res.QueryStats = NYdb::TProtoAccessor::GetProto(streamPart.GetQueryStats());
-
- auto plan = res.QueryStats->query_plan();
- if (!plan.empty()) {
- res.PlanJson = plan;
- }
- }
+ || std::is_same_v<TIterator, NYdb::NScripting::TYqlResultPartIterator>
+ || std::is_same_v<TIterator, NYdb::NQuery::TExecuteQueryIterator>) {
+ FillPlan(streamPart, res);
} else {
if (streamPart.HasPlan()) {
res.PlanJson = streamPart.ExtractPlan();
@@ -809,6 +838,7 @@ TCollectedStreamResult CollectStreamResult(TIterator& it) {
template TCollectedStreamResult CollectStreamResult(NYdb::NTable::TScanQueryPartIterator& it);
template TCollectedStreamResult CollectStreamResult(NYdb::NScripting::TYqlResultPartIterator& it);
+template TCollectedStreamResult CollectStreamResult(NYdb::NQuery::TExecuteQueryIterator& it);
TString ReadTableToYson(NYdb::NTable::TSession session, const TString& table) {
TReadTableSettings settings;
diff --git a/ydb/core/kqp/ut/cost/kqp_cost_ut.cpp b/ydb/core/kqp/ut/cost/kqp_cost_ut.cpp
index b55b9642e7..6211df3be3 100644
--- a/ydb/core/kqp/ut/cost/kqp_cost_ut.cpp
+++ b/ydb/core/kqp/ut/cost/kqp_cost_ut.cpp
@@ -124,14 +124,15 @@ Y_UNIT_TEST_SUITE(KqpCost) {
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access(0).reads().bytes(), 20);
}
+ const static TString Query = R"(SELECT * FROM `/Root/Test` WHERE Amount < 5000ul ORDER BY Group LIMIT 1;)";
+ const static TString Expected = R"([[[3500u];["None"];[1u];["Anna"]]])";
+
Y_UNIT_TEST_TWIN(ScanQueryRangeFullScan, SourceRead) {
TKikimrRunner kikimr(GetAppConfig(SourceRead));
auto db = kikimr.GetTableClient();
EnableDebugLogging(kikimr.GetTestServer().GetRuntime());
- auto query = Q_(R"(
- SELECT * FROM `/Root/Test` WHERE Amount < 5000ul ORDER BY Group LIMIT 1;
- )");
+ auto query = Q_(Query);
NYdb::NTable::TStreamExecScanQuerySettings execSettings;
execSettings.CollectQueryStats(ECollectQueryStatsMode::Basic);
@@ -142,11 +143,7 @@ Y_UNIT_TEST_SUITE(KqpCost) {
UNIT_ASSERT(res.ConsumedRuFromHeader > 0);
- CompareYson(R"(
- [
- [[3500u];["None"];[1u];["Anna"]]
- ]
- )", res.ResultSetYson);
+ CompareYson(Expected, res.ResultSetYson);
/*
const auto& stats = *res.QueryStats;
@@ -160,9 +157,7 @@ Y_UNIT_TEST_SUITE(KqpCost) {
TKikimrRunner kikimr(GetAppConfig(SourceRead));
NYdb::NScripting::TScriptingClient client(kikimr.GetDriver());
- auto query = Q_(R"(
- SELECT * FROM `/Root/Test` WHERE Amount < 5000ul ORDER BY Group LIMIT 1;
- )");
+ auto query = Q_(Query);
NYdb::NScripting::TExecuteYqlRequestSettings execSettings;
execSettings.CollectQueryStats(ECollectQueryStatsMode::Basic);
@@ -173,12 +168,30 @@ Y_UNIT_TEST_SUITE(KqpCost) {
UNIT_ASSERT(res.ConsumedRuFromHeader > 0);
- CompareYson(R"(
- [
- [[3500u];["None"];[1u];["Anna"]]
- ]
- )", res.ResultSetYson);
+ CompareYson(Expected, res.ResultSetYson);
}
+
+ Y_UNIT_TEST_TWIN(QuerySeviceRangeFullScan, SourceRead) {
+ TKikimrRunner kikimr(GetAppConfig(SourceRead));
+
+ NYdb::NQuery::TQueryClient client(kikimr.GetDriver());
+ auto query = Q_(Query);
+
+ NYdb::NQuery::TExecuteQuerySettings execSettings;
+
+ auto it = client.StreamExecuteQuery(
+ query,
+ NYdb::NQuery::TTxControl::BeginTx().CommitTx(),
+ execSettings
+ ).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL(it.GetStatus(), EStatus::SUCCESS);
+ auto res = CollectStreamResult(it);
+
+ UNIT_ASSERT(res.ConsumedRuFromHeader > 0);
+
+ CompareYson(Expected, res.ResultSetYson);
+ }
+
}
}
diff --git a/ydb/public/sdk/cpp/client/ydb_query/impl/exec_query.cpp b/ydb/public/sdk/cpp/client/ydb_query/impl/exec_query.cpp
index 21bd9e2125..39ba795499 100644
--- a/ydb/public/sdk/cpp/client/ydb_query/impl/exec_query.cpp
+++ b/ydb/public/sdk/cpp/client/ydb_query/impl/exec_query.cpp
@@ -65,7 +65,6 @@ public:
NYql::TIssues issues;
NYql::IssuesFromMessage(self->Response_.issues(), issues);
EStatus clientStatus = static_cast<EStatus>(self->Response_.status());
- // TODO: Add headers for streaming calls.
TPlainStatus plainStatus{clientStatus, std::move(issues), self->Endpoint_, {}};
TStatus status{std::move(plainStatus)};