diff options
| author | dcherednik <[email protected]> | 2023-10-11 13:06:41 +0300 | 
|---|---|---|
| committer | dcherednik <[email protected]> | 2023-10-11 13:35:51 +0300 | 
| commit | f23fad2ba0d00dc204697a8e10c333690d083b0e (patch) | |
| tree | 1151005ac1406df83ba4bae3f891c66aa15f4c47 | |
| parent | c5f748cdb834e1f931d33ddca00cc8cd308985ec (diff) | |
Set request unit trailing header for ScanQuery/ScanYql sctipts. KIKIMR-19243
| -rw-r--r-- | library/cpp/grpc/client/grpc_client_low.h | 11 | ||||
| -rw-r--r-- | ydb/core/grpc_services/rpc_stream_execute_scan_query.cpp | 3 | ||||
| -rw-r--r-- | ydb/core/grpc_services/rpc_stream_execute_yql_script.cpp | 2 | ||||
| -rw-r--r-- | ydb/core/kqp/ut/common/kqp_ut_common.cpp | 5 | ||||
| -rw-r--r-- | ydb/core/kqp/ut/common/kqp_ut_common.h | 1 | ||||
| -rw-r--r-- | ydb/core/kqp/ut/cost/kqp_cost_ut.cpp | 28 | ||||
| -rw-r--r-- | ydb/public/sdk/cpp/client/impl/ydb_internal/plain_status/status.cpp | 6 | 
7 files changed, 53 insertions, 3 deletions
diff --git a/library/cpp/grpc/client/grpc_client_low.h b/library/cpp/grpc/client/grpc_client_low.h index e3f786242db..acec2de0758 100644 --- a/library/cpp/grpc/client/grpc_client_low.h +++ b/library/cpp/grpc/client/grpc_client_low.h @@ -132,6 +132,7 @@ struct TGrpcStatus {      TString Details;      int GRpcStatusCode;      bool InternalError; +    std::multimap<TString, TString> ServerTrailingMetadata;      TGrpcStatus()          : GRpcStatusCode(grpc::StatusCode::OK) @@ -809,6 +810,11 @@ private:          } else if (readCallback) {              if (status.Ok()) {                  status = TGrpcStatus(grpc::StatusCode::OUT_OF_RANGE, "Read EOF"); +                for (const auto& [name, value] : Context.GetServerTrailingMetadata()) { +                    status.ServerTrailingMetadata.emplace( +                        TString(name.begin(), name.end()), +                        TString(value.begin(), value.end())); +                }              }              readCallback(std::move(status));          } else if (finishCallback) { @@ -1205,6 +1211,11 @@ private:          } else if (readCallback) {              if (status.Ok()) {                  status = TGrpcStatus(grpc::StatusCode::OUT_OF_RANGE, "Read EOF"); +                for (const auto& [name, value] : Context.GetServerTrailingMetadata()) { +                    status.ServerTrailingMetadata.emplace( +                        TString(name.begin(), name.end()), +                        TString(value.begin(), value.end())); +                }              }              readCallback(std::move(status));          } else if (finishCallback) { diff --git a/ydb/core/grpc_services/rpc_stream_execute_scan_query.cpp b/ydb/core/grpc_services/rpc_stream_execute_scan_query.cpp index 40b18a3f89d..0643c7a8bd3 100644 --- a/ydb/core/grpc_services/rpc_stream_execute_scan_query.cpp +++ b/ydb/core/grpc_services/rpc_stream_execute_scan_query.cpp @@ -284,6 +284,8 @@ private:          NYql::IssuesFromMessage(issueMessage, issues);          if (record.GetYdbStatus() == Ydb::StatusIds::SUCCESS) { +            Request_->SetRuHeader(record.GetConsumedRu()); +              Ydb::Table::ExecuteScanQueryPartialResponse response;              TString out;              auto& kqpResponse = record.GetResponse(); @@ -316,7 +318,6 @@ private:                  Request_->SendSerializedResult(std::move(out), record.GetYdbStatus());              }          } -          ReplyFinishStream(record.GetYdbStatus(), issues);      } diff --git a/ydb/core/grpc_services/rpc_stream_execute_yql_script.cpp b/ydb/core/grpc_services/rpc_stream_execute_yql_script.cpp index ae335e6dc9f..e3c7291fa50 100644 --- a/ydb/core/grpc_services/rpc_stream_execute_yql_script.cpp +++ b/ydb/core/grpc_services/rpc_stream_execute_yql_script.cpp @@ -344,6 +344,8 @@ private:          NYql::IssuesFromMessage(issueMessage, issues);          if (record.GetYdbStatus() == Ydb::StatusIds::SUCCESS) { +            Request_->SetRuHeader(record.GetConsumedRu()); +              Ydb::Scripting::ExecuteYqlPartialResponse response;              TString out;              auto& kqpResponse = record.GetResponse(); diff --git a/ydb/core/kqp/ut/common/kqp_ut_common.cpp b/ydb/core/kqp/ut/common/kqp_ut_common.cpp index 122f7dd2867..f0db5fea88b 100644 --- a/ydb/core/kqp/ut/common/kqp_ut_common.cpp +++ b/ydb/core/kqp/ut/common/kqp_ut_common.cpp @@ -738,6 +738,11 @@ TCollectedStreamResult CollectStreamResultImpl(TIterator& it) {          auto streamPart = it.ReadNext().GetValueSync();          if (!streamPart.IsSuccess()) {              UNIT_ASSERT_C(streamPart.EOS(), streamPart.GetIssues().ToString()); +            const auto& meta = streamPart.GetResponseMetadata(); +            auto mit = meta.find("x-ydb-consumed-units"); +            if (mit != meta.end()) { +                res.ConsumedRuFromHeader = std::stol(mit->second); +            }              break;          } diff --git a/ydb/core/kqp/ut/common/kqp_ut_common.h b/ydb/core/kqp/ut/common/kqp_ut_common.h index afa9ac01508..e739e2e96ea 100644 --- a/ydb/core/kqp/ut/common/kqp_ut_common.h +++ b/ydb/core/kqp/ut/common/kqp_ut_common.h @@ -189,6 +189,7 @@ struct TCollectedStreamResult {      TMaybe<TString> PlanJson;      TMaybe<Ydb::TableStats::QueryStats> QueryStats;      ui64 RowsCount = 0; +    ui64 ConsumedRuFromHeader = 0;  };  template<typename TIterator> diff --git a/ydb/core/kqp/ut/cost/kqp_cost_ut.cpp b/ydb/core/kqp/ut/cost/kqp_cost_ut.cpp index 597bc523a32..b55b9642e79 100644 --- a/ydb/core/kqp/ut/cost/kqp_cost_ut.cpp +++ b/ydb/core/kqp/ut/cost/kqp_cost_ut.cpp @@ -140,13 +140,15 @@ Y_UNIT_TEST_SUITE(KqpCost) {          UNIT_ASSERT_VALUES_EQUAL(it.GetStatus(), EStatus::SUCCESS);          auto res = CollectStreamResult(it); +        UNIT_ASSERT(res.ConsumedRuFromHeader > 0); +          CompareYson(R"(              [                  [[3500u];["None"];[1u];["Anna"]]              ]          )", res.ResultSetYson); - -/*        const auto& stats = *res.QueryStats; +/* +        const auto& stats = *res.QueryStats;          Cerr << stats.DebugString() << Endl;          UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access(0).reads().rows(), 1); @@ -154,7 +156,29 @@ Y_UNIT_TEST_SUITE(KqpCost) {  */      } +    Y_UNIT_TEST_TWIN(ScanScriptingRangeFullScan, SourceRead) { +        TKikimrRunner kikimr(GetAppConfig(SourceRead)); + +        NYdb::NScripting::TScriptingClient client(kikimr.GetDriver()); +        auto query = Q_(R"( +            SELECT * FROM `/Root/Test` WHERE Amount < 5000ul ORDER BY Group LIMIT 1; +        )"); + +        NYdb::NScripting::TExecuteYqlRequestSettings execSettings; +        execSettings.CollectQueryStats(ECollectQueryStatsMode::Basic); +        auto it = client.StreamExecuteYqlScript(query, execSettings).ExtractValueSync(); +        UNIT_ASSERT_VALUES_EQUAL(it.GetStatus(), EStatus::SUCCESS); +        auto res = CollectStreamResult(it); + +        UNIT_ASSERT(res.ConsumedRuFromHeader > 0); + +        CompareYson(R"( +            [ +                [[3500u];["None"];[1u];["Anna"]] +            ] +        )", res.ResultSetYson); +    }  }  } diff --git a/ydb/public/sdk/cpp/client/impl/ydb_internal/plain_status/status.cpp b/ydb/public/sdk/cpp/client/impl/ydb_internal/plain_status/status.cpp index 0449c785523..1620cc8e18d 100644 --- a/ydb/public/sdk/cpp/client/impl/ydb_internal/plain_status/status.cpp +++ b/ydb/public/sdk/cpp/client/impl/ydb_internal/plain_status/status.cpp @@ -54,6 +54,12 @@ TPlainStatus::TPlainStatus(      if (msg) {          Issues.AddIssue(NYql::TIssue(msg));      } +    for (const auto& [name, value] : grpcStatus.ServerTrailingMetadata) { +        Metadata.emplace( +            TStringType(name.begin(), name.end()), +            TStringType(value.begin(), value.end()) +        ); +    }  }  TPlainStatus TPlainStatus::Internal(const TStringType& message) {  | 
