diff options
author | dcherednik <dcherednik@ydb.tech> | 2023-10-11 13:06:41 +0300 |
---|---|---|
committer | dcherednik <dcherednik@ydb.tech> | 2023-10-11 13:35:51 +0300 |
commit | f23fad2ba0d00dc204697a8e10c333690d083b0e (patch) | |
tree | 1151005ac1406df83ba4bae3f891c66aa15f4c47 | |
parent | c5f748cdb834e1f931d33ddca00cc8cd308985ec (diff) | |
download | ydb-f23fad2ba0d00dc204697a8e10c333690d083b0e.tar.gz |
Set request unit trailing header for ScanQuery/ScanYql sctipts. KIKIMR-19243
-rw-r--r-- | library/cpp/grpc/client/grpc_client_low.h | 11 | ||||
-rw-r--r-- | ydb/core/grpc_services/rpc_stream_execute_scan_query.cpp | 3 | ||||
-rw-r--r-- | ydb/core/grpc_services/rpc_stream_execute_yql_script.cpp | 2 | ||||
-rw-r--r-- | ydb/core/kqp/ut/common/kqp_ut_common.cpp | 5 | ||||
-rw-r--r-- | ydb/core/kqp/ut/common/kqp_ut_common.h | 1 | ||||
-rw-r--r-- | ydb/core/kqp/ut/cost/kqp_cost_ut.cpp | 28 | ||||
-rw-r--r-- | ydb/public/sdk/cpp/client/impl/ydb_internal/plain_status/status.cpp | 6 |
7 files changed, 53 insertions, 3 deletions
diff --git a/library/cpp/grpc/client/grpc_client_low.h b/library/cpp/grpc/client/grpc_client_low.h index e3f786242d..acec2de075 100644 --- a/library/cpp/grpc/client/grpc_client_low.h +++ b/library/cpp/grpc/client/grpc_client_low.h @@ -132,6 +132,7 @@ struct TGrpcStatus { TString Details; int GRpcStatusCode; bool InternalError; + std::multimap<TString, TString> ServerTrailingMetadata; TGrpcStatus() : GRpcStatusCode(grpc::StatusCode::OK) @@ -809,6 +810,11 @@ private: } else if (readCallback) { if (status.Ok()) { status = TGrpcStatus(grpc::StatusCode::OUT_OF_RANGE, "Read EOF"); + for (const auto& [name, value] : Context.GetServerTrailingMetadata()) { + status.ServerTrailingMetadata.emplace( + TString(name.begin(), name.end()), + TString(value.begin(), value.end())); + } } readCallback(std::move(status)); } else if (finishCallback) { @@ -1205,6 +1211,11 @@ private: } else if (readCallback) { if (status.Ok()) { status = TGrpcStatus(grpc::StatusCode::OUT_OF_RANGE, "Read EOF"); + for (const auto& [name, value] : Context.GetServerTrailingMetadata()) { + status.ServerTrailingMetadata.emplace( + TString(name.begin(), name.end()), + TString(value.begin(), value.end())); + } } readCallback(std::move(status)); } else if (finishCallback) { diff --git a/ydb/core/grpc_services/rpc_stream_execute_scan_query.cpp b/ydb/core/grpc_services/rpc_stream_execute_scan_query.cpp index 40b18a3f89..0643c7a8bd 100644 --- a/ydb/core/grpc_services/rpc_stream_execute_scan_query.cpp +++ b/ydb/core/grpc_services/rpc_stream_execute_scan_query.cpp @@ -284,6 +284,8 @@ private: NYql::IssuesFromMessage(issueMessage, issues); if (record.GetYdbStatus() == Ydb::StatusIds::SUCCESS) { + Request_->SetRuHeader(record.GetConsumedRu()); + Ydb::Table::ExecuteScanQueryPartialResponse response; TString out; auto& kqpResponse = record.GetResponse(); @@ -316,7 +318,6 @@ private: Request_->SendSerializedResult(std::move(out), record.GetYdbStatus()); } } - ReplyFinishStream(record.GetYdbStatus(), issues); } diff --git a/ydb/core/grpc_services/rpc_stream_execute_yql_script.cpp b/ydb/core/grpc_services/rpc_stream_execute_yql_script.cpp index ae335e6dc9..e3c7291fa5 100644 --- a/ydb/core/grpc_services/rpc_stream_execute_yql_script.cpp +++ b/ydb/core/grpc_services/rpc_stream_execute_yql_script.cpp @@ -344,6 +344,8 @@ private: NYql::IssuesFromMessage(issueMessage, issues); if (record.GetYdbStatus() == Ydb::StatusIds::SUCCESS) { + Request_->SetRuHeader(record.GetConsumedRu()); + Ydb::Scripting::ExecuteYqlPartialResponse response; TString out; auto& kqpResponse = record.GetResponse(); diff --git a/ydb/core/kqp/ut/common/kqp_ut_common.cpp b/ydb/core/kqp/ut/common/kqp_ut_common.cpp index 122f7dd286..f0db5fea88 100644 --- a/ydb/core/kqp/ut/common/kqp_ut_common.cpp +++ b/ydb/core/kqp/ut/common/kqp_ut_common.cpp @@ -738,6 +738,11 @@ TCollectedStreamResult CollectStreamResultImpl(TIterator& it) { auto streamPart = it.ReadNext().GetValueSync(); if (!streamPart.IsSuccess()) { UNIT_ASSERT_C(streamPart.EOS(), streamPart.GetIssues().ToString()); + const auto& meta = streamPart.GetResponseMetadata(); + auto mit = meta.find("x-ydb-consumed-units"); + if (mit != meta.end()) { + res.ConsumedRuFromHeader = std::stol(mit->second); + } break; } diff --git a/ydb/core/kqp/ut/common/kqp_ut_common.h b/ydb/core/kqp/ut/common/kqp_ut_common.h index afa9ac0150..e739e2e96e 100644 --- a/ydb/core/kqp/ut/common/kqp_ut_common.h +++ b/ydb/core/kqp/ut/common/kqp_ut_common.h @@ -189,6 +189,7 @@ struct TCollectedStreamResult { TMaybe<TString> PlanJson; TMaybe<Ydb::TableStats::QueryStats> QueryStats; ui64 RowsCount = 0; + ui64 ConsumedRuFromHeader = 0; }; template<typename TIterator> diff --git a/ydb/core/kqp/ut/cost/kqp_cost_ut.cpp b/ydb/core/kqp/ut/cost/kqp_cost_ut.cpp index 597bc523a3..b55b9642e7 100644 --- a/ydb/core/kqp/ut/cost/kqp_cost_ut.cpp +++ b/ydb/core/kqp/ut/cost/kqp_cost_ut.cpp @@ -140,13 +140,15 @@ Y_UNIT_TEST_SUITE(KqpCost) { UNIT_ASSERT_VALUES_EQUAL(it.GetStatus(), EStatus::SUCCESS); auto res = CollectStreamResult(it); + UNIT_ASSERT(res.ConsumedRuFromHeader > 0); + CompareYson(R"( [ [[3500u];["None"];[1u];["Anna"]] ] )", res.ResultSetYson); - -/* const auto& stats = *res.QueryStats; +/* + const auto& stats = *res.QueryStats; Cerr << stats.DebugString() << Endl; UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access(0).reads().rows(), 1); @@ -154,7 +156,29 @@ Y_UNIT_TEST_SUITE(KqpCost) { */ } + Y_UNIT_TEST_TWIN(ScanScriptingRangeFullScan, SourceRead) { + TKikimrRunner kikimr(GetAppConfig(SourceRead)); + + NYdb::NScripting::TScriptingClient client(kikimr.GetDriver()); + auto query = Q_(R"( + SELECT * FROM `/Root/Test` WHERE Amount < 5000ul ORDER BY Group LIMIT 1; + )"); + + NYdb::NScripting::TExecuteYqlRequestSettings execSettings; + execSettings.CollectQueryStats(ECollectQueryStatsMode::Basic); + auto it = client.StreamExecuteYqlScript(query, execSettings).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL(it.GetStatus(), EStatus::SUCCESS); + auto res = CollectStreamResult(it); + + UNIT_ASSERT(res.ConsumedRuFromHeader > 0); + + CompareYson(R"( + [ + [[3500u];["None"];[1u];["Anna"]] + ] + )", res.ResultSetYson); + } } } diff --git a/ydb/public/sdk/cpp/client/impl/ydb_internal/plain_status/status.cpp b/ydb/public/sdk/cpp/client/impl/ydb_internal/plain_status/status.cpp index 0449c78552..1620cc8e18 100644 --- a/ydb/public/sdk/cpp/client/impl/ydb_internal/plain_status/status.cpp +++ b/ydb/public/sdk/cpp/client/impl/ydb_internal/plain_status/status.cpp @@ -54,6 +54,12 @@ TPlainStatus::TPlainStatus( if (msg) { Issues.AddIssue(NYql::TIssue(msg)); } + for (const auto& [name, value] : grpcStatus.ServerTrailingMetadata) { + Metadata.emplace( + TStringType(name.begin(), name.end()), + TStringType(value.begin(), value.end()) + ); + } } TPlainStatus TPlainStatus::Internal(const TStringType& message) { |