aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authordcherednik <dcherednik@ydb.tech>2023-10-11 13:06:41 +0300
committerdcherednik <dcherednik@ydb.tech>2023-10-11 13:35:51 +0300
commitf23fad2ba0d00dc204697a8e10c333690d083b0e (patch)
tree1151005ac1406df83ba4bae3f891c66aa15f4c47
parentc5f748cdb834e1f931d33ddca00cc8cd308985ec (diff)
downloadydb-f23fad2ba0d00dc204697a8e10c333690d083b0e.tar.gz
Set request unit trailing header for ScanQuery/ScanYql sctipts. KIKIMR-19243
-rw-r--r--library/cpp/grpc/client/grpc_client_low.h11
-rw-r--r--ydb/core/grpc_services/rpc_stream_execute_scan_query.cpp3
-rw-r--r--ydb/core/grpc_services/rpc_stream_execute_yql_script.cpp2
-rw-r--r--ydb/core/kqp/ut/common/kqp_ut_common.cpp5
-rw-r--r--ydb/core/kqp/ut/common/kqp_ut_common.h1
-rw-r--r--ydb/core/kqp/ut/cost/kqp_cost_ut.cpp28
-rw-r--r--ydb/public/sdk/cpp/client/impl/ydb_internal/plain_status/status.cpp6
7 files changed, 53 insertions, 3 deletions
diff --git a/library/cpp/grpc/client/grpc_client_low.h b/library/cpp/grpc/client/grpc_client_low.h
index e3f786242d..acec2de075 100644
--- a/library/cpp/grpc/client/grpc_client_low.h
+++ b/library/cpp/grpc/client/grpc_client_low.h
@@ -132,6 +132,7 @@ struct TGrpcStatus {
TString Details;
int GRpcStatusCode;
bool InternalError;
+ std::multimap<TString, TString> ServerTrailingMetadata;
TGrpcStatus()
: GRpcStatusCode(grpc::StatusCode::OK)
@@ -809,6 +810,11 @@ private:
} else if (readCallback) {
if (status.Ok()) {
status = TGrpcStatus(grpc::StatusCode::OUT_OF_RANGE, "Read EOF");
+ for (const auto& [name, value] : Context.GetServerTrailingMetadata()) {
+ status.ServerTrailingMetadata.emplace(
+ TString(name.begin(), name.end()),
+ TString(value.begin(), value.end()));
+ }
}
readCallback(std::move(status));
} else if (finishCallback) {
@@ -1205,6 +1211,11 @@ private:
} else if (readCallback) {
if (status.Ok()) {
status = TGrpcStatus(grpc::StatusCode::OUT_OF_RANGE, "Read EOF");
+ for (const auto& [name, value] : Context.GetServerTrailingMetadata()) {
+ status.ServerTrailingMetadata.emplace(
+ TString(name.begin(), name.end()),
+ TString(value.begin(), value.end()));
+ }
}
readCallback(std::move(status));
} else if (finishCallback) {
diff --git a/ydb/core/grpc_services/rpc_stream_execute_scan_query.cpp b/ydb/core/grpc_services/rpc_stream_execute_scan_query.cpp
index 40b18a3f89..0643c7a8bd 100644
--- a/ydb/core/grpc_services/rpc_stream_execute_scan_query.cpp
+++ b/ydb/core/grpc_services/rpc_stream_execute_scan_query.cpp
@@ -284,6 +284,8 @@ private:
NYql::IssuesFromMessage(issueMessage, issues);
if (record.GetYdbStatus() == Ydb::StatusIds::SUCCESS) {
+ Request_->SetRuHeader(record.GetConsumedRu());
+
Ydb::Table::ExecuteScanQueryPartialResponse response;
TString out;
auto& kqpResponse = record.GetResponse();
@@ -316,7 +318,6 @@ private:
Request_->SendSerializedResult(std::move(out), record.GetYdbStatus());
}
}
-
ReplyFinishStream(record.GetYdbStatus(), issues);
}
diff --git a/ydb/core/grpc_services/rpc_stream_execute_yql_script.cpp b/ydb/core/grpc_services/rpc_stream_execute_yql_script.cpp
index ae335e6dc9..e3c7291fa5 100644
--- a/ydb/core/grpc_services/rpc_stream_execute_yql_script.cpp
+++ b/ydb/core/grpc_services/rpc_stream_execute_yql_script.cpp
@@ -344,6 +344,8 @@ private:
NYql::IssuesFromMessage(issueMessage, issues);
if (record.GetYdbStatus() == Ydb::StatusIds::SUCCESS) {
+ Request_->SetRuHeader(record.GetConsumedRu());
+
Ydb::Scripting::ExecuteYqlPartialResponse response;
TString out;
auto& kqpResponse = record.GetResponse();
diff --git a/ydb/core/kqp/ut/common/kqp_ut_common.cpp b/ydb/core/kqp/ut/common/kqp_ut_common.cpp
index 122f7dd286..f0db5fea88 100644
--- a/ydb/core/kqp/ut/common/kqp_ut_common.cpp
+++ b/ydb/core/kqp/ut/common/kqp_ut_common.cpp
@@ -738,6 +738,11 @@ TCollectedStreamResult CollectStreamResultImpl(TIterator& it) {
auto streamPart = it.ReadNext().GetValueSync();
if (!streamPart.IsSuccess()) {
UNIT_ASSERT_C(streamPart.EOS(), streamPart.GetIssues().ToString());
+ const auto& meta = streamPart.GetResponseMetadata();
+ auto mit = meta.find("x-ydb-consumed-units");
+ if (mit != meta.end()) {
+ res.ConsumedRuFromHeader = std::stol(mit->second);
+ }
break;
}
diff --git a/ydb/core/kqp/ut/common/kqp_ut_common.h b/ydb/core/kqp/ut/common/kqp_ut_common.h
index afa9ac0150..e739e2e96e 100644
--- a/ydb/core/kqp/ut/common/kqp_ut_common.h
+++ b/ydb/core/kqp/ut/common/kqp_ut_common.h
@@ -189,6 +189,7 @@ struct TCollectedStreamResult {
TMaybe<TString> PlanJson;
TMaybe<Ydb::TableStats::QueryStats> QueryStats;
ui64 RowsCount = 0;
+ ui64 ConsumedRuFromHeader = 0;
};
template<typename TIterator>
diff --git a/ydb/core/kqp/ut/cost/kqp_cost_ut.cpp b/ydb/core/kqp/ut/cost/kqp_cost_ut.cpp
index 597bc523a3..b55b9642e7 100644
--- a/ydb/core/kqp/ut/cost/kqp_cost_ut.cpp
+++ b/ydb/core/kqp/ut/cost/kqp_cost_ut.cpp
@@ -140,13 +140,15 @@ Y_UNIT_TEST_SUITE(KqpCost) {
UNIT_ASSERT_VALUES_EQUAL(it.GetStatus(), EStatus::SUCCESS);
auto res = CollectStreamResult(it);
+ UNIT_ASSERT(res.ConsumedRuFromHeader > 0);
+
CompareYson(R"(
[
[[3500u];["None"];[1u];["Anna"]]
]
)", res.ResultSetYson);
-
-/* const auto& stats = *res.QueryStats;
+/*
+ const auto& stats = *res.QueryStats;
Cerr << stats.DebugString() << Endl;
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access(0).reads().rows(), 1);
@@ -154,7 +156,29 @@ Y_UNIT_TEST_SUITE(KqpCost) {
*/
}
+ Y_UNIT_TEST_TWIN(ScanScriptingRangeFullScan, SourceRead) {
+ TKikimrRunner kikimr(GetAppConfig(SourceRead));
+
+ NYdb::NScripting::TScriptingClient client(kikimr.GetDriver());
+ auto query = Q_(R"(
+ SELECT * FROM `/Root/Test` WHERE Amount < 5000ul ORDER BY Group LIMIT 1;
+ )");
+
+ NYdb::NScripting::TExecuteYqlRequestSettings execSettings;
+ execSettings.CollectQueryStats(ECollectQueryStatsMode::Basic);
+ auto it = client.StreamExecuteYqlScript(query, execSettings).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL(it.GetStatus(), EStatus::SUCCESS);
+ auto res = CollectStreamResult(it);
+
+ UNIT_ASSERT(res.ConsumedRuFromHeader > 0);
+
+ CompareYson(R"(
+ [
+ [[3500u];["None"];[1u];["Anna"]]
+ ]
+ )", res.ResultSetYson);
+ }
}
}
diff --git a/ydb/public/sdk/cpp/client/impl/ydb_internal/plain_status/status.cpp b/ydb/public/sdk/cpp/client/impl/ydb_internal/plain_status/status.cpp
index 0449c78552..1620cc8e18 100644
--- a/ydb/public/sdk/cpp/client/impl/ydb_internal/plain_status/status.cpp
+++ b/ydb/public/sdk/cpp/client/impl/ydb_internal/plain_status/status.cpp
@@ -54,6 +54,12 @@ TPlainStatus::TPlainStatus(
if (msg) {
Issues.AddIssue(NYql::TIssue(msg));
}
+ for (const auto& [name, value] : grpcStatus.ServerTrailingMetadata) {
+ Metadata.emplace(
+ TStringType(name.begin(), name.end()),
+ TStringType(value.begin(), value.end())
+ );
+ }
}
TPlainStatus TPlainStatus::Internal(const TStringType& message) {