aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorBulat <brgayazov@yandex-team.ru>2025-02-27 19:55:21 +0300
committerGitHub <noreply@github.com>2025-02-27 19:55:21 +0300
commita601ca88db3e7cc4e307b944230477fdd9682714 (patch)
tree7888bf72fd110a17fd09dc591f00f637cacb2787
parent1d43db6321dabc26c9bca604408733782700dfd8 (diff)
downloadydb-a601ca88db3e7cc4e307b944230477fdd9682714.tar.gz
Added Ru return in GetConsumedRu method with streaming calls in C++ SDK (#15095)
-rw-r--r--ydb/core/kqp/ut/common/kqp_ut_common.cpp2
-rw-r--r--ydb/public/sdk/cpp/src/client/impl/ydb_internal/grpc_connections/grpc_connections.h4
-rw-r--r--ydb/public/sdk/cpp/src/client/impl/ydb_internal/plain_status/status.cpp18
-rw-r--r--ydb/public/sdk/cpp/src/client/impl/ydb_internal/plain_status/status.h11
-rw-r--r--ydb/public/sdk/cpp/src/client/impl/ydb_internal/plain_status/ya.make1
-rw-r--r--ydb/public/sdk/cpp/src/client/table/impl/readers.cpp1
-rw-r--r--ydb/public/sdk/cpp/src/client/types/status/status.cpp2
7 files changed, 29 insertions, 10 deletions
diff --git a/ydb/core/kqp/ut/common/kqp_ut_common.cpp b/ydb/core/kqp/ut/common/kqp_ut_common.cpp
index bbbd3b39fc..035cf7aa8e 100644
--- a/ydb/core/kqp/ut/common/kqp_ut_common.cpp
+++ b/ydb/core/kqp/ut/common/kqp_ut_common.cpp
@@ -1008,6 +1008,8 @@ TCollectedStreamResult CollectStreamResultImpl(TIterator& it) {
if (mit != meta.end()) {
res.ConsumedRuFromHeader = std::stol(mit->second);
}
+ UNIT_ASSERT_EQUAL_C(res.ConsumedRuFromHeader, streamPart.GetConsumedRu(),
+ "Request unit values from headers and TStatus are differ");
break;
}
diff --git a/ydb/public/sdk/cpp/src/client/impl/ydb_internal/grpc_connections/grpc_connections.h b/ydb/public/sdk/cpp/src/client/impl/ydb_internal/grpc_connections/grpc_connections.h
index 6a90a9e758..05fa1a6b67 100644
--- a/ydb/public/sdk/cpp/src/client/impl/ydb_internal/grpc_connections/grpc_connections.h
+++ b/ydb/public/sdk/cpp/src/client/impl/ydb_internal/grpc_connections/grpc_connections.h
@@ -459,7 +459,6 @@ public:
}
};
processor->AddFinishedCallback(std::move(finishedCallback));
- // TODO: Add headers for streaming calls.
TPlainStatus status(std::move(grpcStatus), endpoint.GetEndpoint(), {});
responseCb(std::move(status), std::move(processor));
} else {
@@ -468,7 +467,6 @@ public:
if (grpcStatus.GRpcStatusCode != grpc::StatusCode::CANCELLED) {
dbState->EndpointPool.BanEndpoint(endpoint.GetEndpoint());
}
- // TODO: Add headers for streaming calls.
TPlainStatus status(std::move(grpcStatus), endpoint.GetEndpoint(), {});
responseCb(std::move(status), nullptr);
}
@@ -560,7 +558,6 @@ public:
}
};
processor->AddFinishedCallback(std::move(finishedCallback));
- // TODO: Add headers for streaming calls.
TPlainStatus status(std::move(grpcStatus), endpoint.GetEndpoint(), {});
connectedCallback(std::move(status), std::move(processor));
} else {
@@ -569,7 +566,6 @@ public:
if (grpcStatus.GRpcStatusCode != grpc::StatusCode::CANCELLED) {
dbState->EndpointPool.BanEndpoint(endpoint.GetEndpoint());
}
- // TODO: Add headers for streaming calls.
TPlainStatus status(std::move(grpcStatus), endpoint.GetEndpoint(), {});
connectedCallback(std::move(status), nullptr);
}
diff --git a/ydb/public/sdk/cpp/src/client/impl/ydb_internal/plain_status/status.cpp b/ydb/public/sdk/cpp/src/client/impl/ydb_internal/plain_status/status.cpp
index bb29e6583e..a0dcfb5fc4 100644
--- a/ydb/public/sdk/cpp/src/client/impl/ydb_internal/plain_status/status.cpp
+++ b/ydb/public/sdk/cpp/src/client/impl/ydb_internal/plain_status/status.cpp
@@ -1,6 +1,8 @@
#define INCLUDE_YDB_INTERNAL_H
#include "status.h"
+#include <ydb-cpp-sdk/client/resources/ydb_resources.h>
+
#include <util/string/builder.h>
namespace NYdb::inline V3 {
@@ -62,10 +64,26 @@ TPlainStatus::TPlainStatus(
std::string(value.begin(), value.end())
);
}
+
+ InitCostInfo();
}
TPlainStatus TPlainStatus::Internal(const std::string& message) {
return { EStatus::CLIENT_INTERNAL_ERROR, "Internal client error: " + message };
}
+void TPlainStatus::InitCostInfo() {
+ if (auto metaIt = Metadata.find(YDB_CONSUMED_UNITS_HEADER); metaIt != Metadata.end()) {
+ try {
+ CostInfo.set_consumed_units(std::stod(metaIt->second));
+ } catch (std::exception& e) {
+ if (Ok()) {
+ Status = EStatus::CLIENT_INTERNAL_ERROR;
+ }
+
+ Issues.AddIssue(NIssue::TIssue{"Failed to parse CostInfo from Metadata: " + std::string{e.what()}});
+ }
+ }
+}
+
} // namespace NYdb
diff --git a/ydb/public/sdk/cpp/src/client/impl/ydb_internal/plain_status/status.h b/ydb/public/sdk/cpp/src/client/impl/ydb_internal/plain_status/status.h
index cf5fe5110b..f6c0f0b227 100644
--- a/ydb/public/sdk/cpp/src/client/impl/ydb_internal/plain_status/status.h
+++ b/ydb/public/sdk/cpp/src/client/impl/ydb_internal/plain_status/status.h
@@ -18,7 +18,7 @@ struct TPlainStatus {
NYdb::NIssue::TIssues Issues;
std::string Endpoint;
std::multimap<std::string, std::string> Metadata;
- Ydb::CostInfo ConstInfo;
+ Ydb::CostInfo CostInfo;
TPlainStatus()
: Status(EStatus::SUCCESS)
@@ -35,7 +35,9 @@ struct TPlainStatus {
, Issues(std::move(issues))
, Endpoint(endpoint)
, Metadata(std::move(metadata))
- { }
+ {
+ InitCostInfo();
+ }
TPlainStatus(EStatus status, const std::string& message)
: Status(status)
@@ -51,7 +53,7 @@ struct TPlainStatus {
template<class T>
void SetCostInfo(T&& costInfo) {
- ConstInfo = std::forward<T>(costInfo);
+ CostInfo = std::forward<T>(costInfo);
}
bool Ok() const {
@@ -73,7 +75,8 @@ struct TPlainStatus {
return ret;
}
-
+private:
+ void InitCostInfo();
};
} // namespace NYdb
diff --git a/ydb/public/sdk/cpp/src/client/impl/ydb_internal/plain_status/ya.make b/ydb/public/sdk/cpp/src/client/impl/ydb_internal/plain_status/ya.make
index 630e2aab56..6bf23008fe 100644
--- a/ydb/public/sdk/cpp/src/client/impl/ydb_internal/plain_status/ya.make
+++ b/ydb/public/sdk/cpp/src/client/impl/ydb_internal/plain_status/ya.make
@@ -8,6 +8,7 @@ SRCS(
PEERDIR(
contrib/libs/protobuf
+ ydb/public/sdk/cpp/src/client/resources
ydb/public/sdk/cpp/src/library/grpc/client
ydb/public/sdk/cpp/src/library/issue
)
diff --git a/ydb/public/sdk/cpp/src/client/table/impl/readers.cpp b/ydb/public/sdk/cpp/src/client/table/impl/readers.cpp
index acb5f6dd58..1d15c4d9aa 100644
--- a/ydb/public/sdk/cpp/src/client/table/impl/readers.cpp
+++ b/ydb/public/sdk/cpp/src/client/table/impl/readers.cpp
@@ -78,7 +78,6 @@ TAsyncScanQueryPart TScanQueryPartIterator::TReaderImpl::ReadNext(std::shared_pt
NYdb::NIssue::TIssues issues;
NYdb::NIssue::IssuesFromMessage(self->Response_.issues(), issues);
EStatus clientStatus = static_cast<EStatus>(self->Response_.status());
- // TODO: Add headers for streaming calls.
TPlainStatus plainStatus{clientStatus, std::move(issues), self->Endpoint_, {}};
TStatus status{std::move(plainStatus)};
std::optional<TQueryStats> queryStats;
diff --git a/ydb/public/sdk/cpp/src/client/types/status/status.cpp b/ydb/public/sdk/cpp/src/client/types/status/status.cpp
index 1764499e40..dd5238a479 100644
--- a/ydb/public/sdk/cpp/src/client/types/status/status.cpp
+++ b/ydb/public/sdk/cpp/src/client/types/status/status.cpp
@@ -71,7 +71,7 @@ const std::multimap<std::string, std::string>& TStatus::GetResponseMetadata() co
}
float TStatus::GetConsumedRu() const {
- return Impl_->Status.ConstInfo.consumed_units();
+ return Impl_->Status.CostInfo.consumed_units();
}
void TStatus::Out(IOutputStream& out) const {