summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authoralexnick <[email protected]>2023-06-23 10:09:56 +0300
committeralexnick <[email protected]>2023-06-23 10:09:56 +0300
commit50d94d3a778c92c8dc867eadab99af96bb257e9d (patch)
treeb856f88d7bf40e6d85b9f93a65b004a61d5a5a44
parent0a42fb7c5128e888198fa592bdb3670ed67ec3cf (diff)
fix yds counters inside ydb
fix counters
-rw-r--r--ydb/core/http_proxy/custom_metrics.h4
-rw-r--r--ydb/core/http_proxy/events.h20
-rw-r--r--ydb/core/http_proxy/http_req.cpp130
-rw-r--r--ydb/core/http_proxy/http_req.h2
-rw-r--r--ydb/core/http_proxy/http_service.cpp2
-rw-r--r--ydb/services/datastreams/datastreams_ut.cpp2
-rw-r--r--ydb/services/datastreams/put_records_actor.h4
7 files changed, 99 insertions, 65 deletions
diff --git a/ydb/core/http_proxy/custom_metrics.h b/ydb/core/http_proxy/custom_metrics.h
index 75b6d84aa9b..026e9f750cc 100644
--- a/ydb/core/http_proxy/custom_metrics.h
+++ b/ydb/core/http_proxy/custom_metrics.h
@@ -28,12 +28,12 @@ TVector<std::pair<TString, TString>> BuildLabels(const TString& method, const TH
{"name", name}};
}
if (method.empty()) {
- return {{"database", httpContext.DatabaseName}, {"cloud_id", httpContext.CloudId},
+ return {{"database", httpContext.DatabasePath}, {"cloud_id", httpContext.CloudId},
{"folder_id", httpContext.FolderId}, {"database_id", httpContext.DatabaseId},
{"topic", httpContext.StreamName}, {"name", name}};
}
- return {{"database", httpContext.DatabaseName}, {"method", method}, {"cloud_id", httpContext.CloudId},
+ return {{"database", httpContext.DatabasePath}, {"method", method}, {"cloud_id", httpContext.CloudId},
{"folder_id", httpContext.FolderId}, {"database_id", httpContext.DatabaseId},
{"topic", httpContext.StreamName}, {"name", name}};
}
diff --git a/ydb/core/http_proxy/events.h b/ydb/core/http_proxy/events.h
index 4acb6376ed6..010a18e73b5 100644
--- a/ydb/core/http_proxy/events.h
+++ b/ydb/core/http_proxy/events.h
@@ -41,7 +41,6 @@ namespace NKikimr::NHttpProxy {
EvUpdateDatabasesEvent,
EvListEndpointsRequest,
EvListEndpointsResponse,
- EvError,
EvErrorWithIssue,
EvCounter,
EvHistCounter,
@@ -120,10 +119,13 @@ namespace NKikimr::NHttpProxy {
TString SerializedUserToken;
- TEvToken(const TString& serviceAccountId, const TString& iamToken, const TString& serializedUserToken = "")
+ TDatabase Database;
+
+ TEvToken(const TString& serviceAccountId, const TString& iamToken, const TString& serializedUserToken, const TDatabase& database)
: ServiceAccountId(serviceAccountId)
, IamToken(iamToken)
, SerializedUserToken(serializedUserToken)
+ , Database(database)
{}
};
@@ -131,25 +133,17 @@ namespace NKikimr::NHttpProxy {
TEvClientReady() {}
};
- struct TEvError : public TEventLocal<TEvError, EvError> {
- NYdb::EStatus Status;
- TString Response;
-
- TEvError(const NYdb::EStatus status, const TString& response)
- : Status(status)
- , Response(response)
- {}
- };
-
struct TEvErrorWithIssue : public TEventLocal<TEvErrorWithIssue, EvErrorWithIssue> {
NYdb::EStatus Status;
size_t IssueCode;
TString Response;
+ TDatabase Database;
- TEvErrorWithIssue(const NYdb::EStatus status, const TString& response, size_t issueCode=0)
+ TEvErrorWithIssue(const NYdb::EStatus status, const TString& response, const TDatabase& database, size_t issueCode)
: Status(status)
, IssueCode(issueCode)
, Response(response)
+ , Database(database)
{}
};
};
diff --git a/ydb/core/http_proxy/http_req.cpp b/ydb/core/http_proxy/http_req.cpp
index 28b28face9f..5c82d8c67a6 100644
--- a/ydb/core/http_proxy/http_req.cpp
+++ b/ydb/core/http_proxy/http_req.cpp
@@ -143,17 +143,17 @@ namespace NKikimr::NHttpProxy {
}
template<class TProto>
- TString TruncateStreamName(const TProto& req, const TString& database)
+ TString TruncateStreamName(const TProto& req, const TString& databasePath)
{
constexpr bool has_stream_name = requires(const TProto& t) {
t.stream_name();
};
if constexpr (has_stream_name) {
- Y_VERIFY(req.stream_name().StartsWith(database));
- return req.stream_name().substr(database.size(), -1);
+ Y_VERIFY(req.stream_name().StartsWith(databasePath));
+ return req.stream_name().substr(databasePath.size(), -1);
}
- return ExtractStreamNameWithoutProtoField<TProto>(req).substr(database.size(), -1);
+ return ExtractStreamNameWithoutProtoField<TProto>(req).substr(databasePath.size(), -1);
}
constexpr TStringBuf IAM_HEADER = "x-yacloud-subjecttoken";
@@ -231,7 +231,6 @@ namespace NKikimr::NHttpProxy {
HFunc(TEvents::TEvWakeup, HandleTimeout);
HFunc(TEvServerlessProxy::TEvClientReady, HandleClientReady);
HFunc(TEvServerlessProxy::TEvDiscoverDatabaseEndpointResult, Handle);
- HFunc(TEvServerlessProxy::TEvError, HandleError);
HFunc(TEvServerlessProxy::TEvErrorWithIssue, HandleErrorWithIssue);
HFunc(TEvServerlessProxy::TEvGrpcRequestResult, HandleGrpcResponse);
HFunc(TEvServerlessProxy::TEvToken, HandleToken);
@@ -247,7 +246,7 @@ namespace NKikimr::NHttpProxy {
RequestState = StateAuthorization;
auto request = MakeHolder<TEvServerlessProxy::TEvDiscoverDatabaseEndpointRequest>();
- request->DatabasePath = HttpContext.DatabaseName;
+ request->DatabasePath = HttpContext.DatabasePath;
ctx.Send(MakeTenantDiscoveryID(), std::move(request));
}
@@ -256,17 +255,17 @@ namespace NKikimr::NHttpProxy {
RequestState = StateListEndpoints;
LOG_SP_INFO_S(ctx, NKikimrServices::HTTP_PROXY,
"create client to '" << HttpContext.DiscoveryEndpoint <<
- "' database: '" << HttpContext.DatabaseName <<
+ "' database: '" << HttpContext.DatabasePath <<
"' iam token size: " << HttpContext.IamToken.size());
auto clientSettings = NYdb::TCommonClientSettings()
.DiscoveryEndpoint(HttpContext.DiscoveryEndpoint)
- .Database(HttpContext.DatabaseName)
+ .Database(HttpContext.DatabasePath)
.AuthToken(HttpContext.IamToken)
.DiscoveryMode(NYdb::EDiscoveryMode::Async);
- if (!HttpContext.DatabaseName.empty() && !HttpContext.ServiceConfig.GetTestMode()) {
- clientSettings.Database(HttpContext.DatabaseName);
+ if (!HttpContext.DatabasePath.empty() && !HttpContext.ServiceConfig.GetTestMode()) {
+ clientSettings.Database(HttpContext.DatabasePath);
}
Y_VERIFY(!Client);
Client.Reset(new TDataStreamsClient(*HttpContext.Driver, clientSettings));
@@ -285,10 +284,10 @@ namespace NKikimr::NHttpProxy {
RequestState = StateGrpcRequest;
LOG_SP_INFO_S(ctx, NKikimrServices::HTTP_PROXY,
"sending grpc request to '" << HttpContext.DiscoveryEndpoint <<
- "' database: '" << HttpContext.DatabaseName <<
+ "' database: '" << HttpContext.DatabasePath <<
"' iam token size: " << HttpContext.IamToken.size());
- RpcFuture = NRpcService::DoLocalRpc<TRpcEv>(std::move(Request), HttpContext.DatabaseName,
+ RpcFuture = NRpcService::DoLocalRpc<TRpcEv>(std::move(Request), HttpContext.DatabasePath,
HttpContext.SerializedUserToken, ctx.ActorSystem());
RpcFuture.Subscribe([actorId = ctx.SelfID, actorSystem = ctx.ActorSystem()]
(const NThreading::TFuture<TProtoResponse>& future) {
@@ -313,7 +312,7 @@ namespace NKikimr::NHttpProxy {
RequestState = StateGrpcRequest;
LOG_SP_INFO_S(ctx, NKikimrServices::HTTP_PROXY,
"sending grpc request to '" << HttpContext.DiscoveryEndpoint <<
- "' database: '" << HttpContext.DatabaseName <<
+ "' database: '" << HttpContext.DatabasePath <<
"' iam token size: " << HttpContext.IamToken.size());
Y_VERIFY(Client);
@@ -345,6 +344,23 @@ namespace NKikimr::NHttpProxy {
Y_UNUSED(ev);
}
+ void TryUpdateDbInfo(const TDatabase& db, const TActorContext& ctx) {
+ if (db.Path) {
+ HttpContext.DatabasePath = db.Path;
+ HttpContext.DatabaseId = db.Id;
+ HttpContext.CloudId = db.CloudId;
+ HttpContext.FolderId = db.FolderId;
+ if (ExtractStreamName<TProtoRequest>(Request).StartsWith(HttpContext.DatabasePath + "/")) {
+ HttpContext.StreamName =
+ TruncateStreamName<TProtoRequest>(Request, HttpContext.DatabasePath + "/");
+ } else {
+ HttpContext.StreamName = ExtractStreamName<TProtoRequest>(Request);
+ }
+
+ }
+ ReportInputCounters(ctx);
+ }
+
void HandleToken(TEvServerlessProxy::TEvToken::TPtr& ev, const TActorContext& ctx) {
HttpContext.ServiceAccountId = ev->Get()->ServiceAccountId;
HttpContext.IamToken = ev->Get()->IamToken;
@@ -353,15 +369,14 @@ namespace NKikimr::NHttpProxy {
if (HttpContext.Driver) {
SendYdbDriverRequest(ctx);
} else {
+ TryUpdateDbInfo(ev->Get()->Database, ctx);
SendGrpcRequestNoDriver(ctx);
}
}
- void HandleError(TEvServerlessProxy::TEvError::TPtr& ev, const TActorContext& ctx) {
- ReplyWithError(ctx, ev->Get()->Status, ev->Get()->Response);
- }
void HandleErrorWithIssue(TEvServerlessProxy::TEvErrorWithIssue::TPtr& ev, const TActorContext& ctx) {
+ TryUpdateDbInfo(ev->Get()->Database, ctx);
ReplyWithError(ctx, ev->Get()->Status, ev->Get()->Response, ev->Get()->IssueCode);
}
@@ -377,6 +392,20 @@ namespace NKikimr::NHttpProxy {
{"code", TStringBuilder() << (int)MapToException(status, Method, issueCode).second},
{"name", "api.http.errors_per_second"}}
});
+
+ ctx.Send(MakeMetricsServiceID(),
+ new TEvServerlessProxy::TEvCounter{
+ 1, true, true,
+ {{"database", HttpContext.DatabasePath},
+ {"method", Method},
+ {"cloud_id", HttpContext.CloudId},
+ {"folder_id", HttpContext.FolderId},
+ {"database_id", HttpContext.DatabaseId},
+ {"topic", HttpContext.StreamName},
+ {"code", TStringBuilder() << (int)MapToException(status, Method, issueCode).second},
+ {"name", "api.http.data_streams.response.count"}}
+ });
+
HttpContext.ResponseData.Status = status;
HttpContext.ResponseData.ErrorText = errorText;
HttpContext.DoReply(ctx, issueCode);
@@ -386,6 +415,24 @@ namespace NKikimr::NHttpProxy {
TBase::Die(ctx);
}
+ void ReportInputCounters(const TActorContext& ctx) {
+
+ if (InputCountersReported) {
+ return;
+ }
+ InputCountersReported = true;
+
+ FillInputCustomMetrics<TProtoRequest>(Request, HttpContext, ctx);
+ /* deprecated metric: */ ctx.Send(MakeMetricsServiceID(),
+ new TEvServerlessProxy::TEvCounter{1, true, true,
+ BuildLabels(Method, HttpContext, "api.http.requests_per_second", setStreamPrefix)
+ });
+ ctx.Send(MakeMetricsServiceID(),
+ new TEvServerlessProxy::TEvCounter{1, true, true,
+ BuildLabels(Method, HttpContext, "api.http.data_streams.request.count")
+ });
+ }
+
void Handle(TEvServerlessProxy::TEvDiscoverDatabaseEndpointResult::TPtr ev,
const TActorContext& ctx) {
if (ev->Get()->DatabaseInfo) {
@@ -394,25 +441,15 @@ namespace NKikimr::NHttpProxy {
HttpContext.CloudId = db->CloudId;
HttpContext.DatabaseId = db->Id;
HttpContext.DiscoveryEndpoint = db->Endpoint;
- HttpContext.DatabaseName = db->Path;
+ HttpContext.DatabasePath = db->Path;
- if (ExtractStreamName<TProtoRequest>(Request).StartsWith(HttpContext.DatabaseName + "/")) {
+ if (ExtractStreamName<TProtoRequest>(Request).StartsWith(HttpContext.DatabasePath + "/")) {
HttpContext.StreamName =
- TruncateStreamName<TProtoRequest>(Request, HttpContext.DatabaseName + "/");
+ TruncateStreamName<TProtoRequest>(Request, HttpContext.DatabasePath + "/");
} else {
HttpContext.StreamName = ExtractStreamName<TProtoRequest>(Request);
}
-
- FillInputCustomMetrics<TProtoRequest>(Request, HttpContext, ctx);
- /* deprecated metric: */ ctx.Send(MakeMetricsServiceID(),
- new TEvServerlessProxy::TEvCounter{1, true, true,
- BuildLabels(Method, HttpContext, "api.http.requests_per_second", setStreamPrefix)
- });
- ctx.Send(MakeMetricsServiceID(),
- new TEvServerlessProxy::TEvCounter{1, true, true,
- BuildLabels(Method, HttpContext, "api.http.data_streams.request.count")
- });
- //TODO: add api.http.request.count
+ ReportInputCounters(ctx);
CreateClient(ctx);
return;
}
@@ -441,7 +478,6 @@ namespace NKikimr::NHttpProxy {
FillOutputCustomMetrics<TProtoResult>(
*(dynamic_cast<TProtoResult*>(ev->Get()->Message.Get())), HttpContext, ctx);
ReportLatencyCounters(ctx);
-
/* deprecated metric: */ ctx.Send(MakeMetricsServiceID(),
new TEvServerlessProxy::TEvCounter{1, true, true,
BuildLabels(Method, HttpContext, "api.http.success_per_second", setStreamPrefix)
@@ -449,7 +485,7 @@ namespace NKikimr::NHttpProxy {
ctx.Send(MakeMetricsServiceID(),
new TEvServerlessProxy::TEvCounter{
1, true, true,
- {{"database", HttpContext.DatabaseName},
+ {{"database", HttpContext.DatabasePath},
{"method", Method},
{"cloud_id", HttpContext.CloudId},
{"folder_id", HttpContext.FolderId},
@@ -506,17 +542,17 @@ namespace NKikimr::NHttpProxy {
} catch (const std::exception& e) {
LOG_SP_WARN_S(ctx, NKikimrServices::HTTP_PROXY,
"got new request with incorrect json from [" << HttpContext.SourceAddress << "] " <<
- "database '" << HttpContext.DatabaseName << "'");
+ "database '" << HttpContext.DatabasePath << "'");
return ReplyWithError(ctx, NYdb::EStatus::BAD_REQUEST, e.what(), static_cast<size_t>(NYds::EErrorCodes::INVALID_ARGUMENT));
}
- if (HttpContext.DatabaseName.empty()) {
- HttpContext.DatabaseName = ExtractStreamName<TProtoRequest>(Request);
+ if (HttpContext.DatabasePath.empty()) {
+ HttpContext.DatabasePath = ExtractStreamName<TProtoRequest>(Request);
}
LOG_SP_INFO_S(ctx, NKikimrServices::HTTP_PROXY,
"got new request from [" << HttpContext.SourceAddress << "] " <<
- "database '" << HttpContext.DatabaseName << "' " <<
+ "database '" << HttpContext.DatabasePath << "' " <<
"stream '" << ExtractStreamName<TProtoRequest>(Request) << "'");
// Use Signature or no sdk mode - then need to auth anyway
@@ -554,6 +590,7 @@ namespace NKikimr::NHttpProxy {
THolder<TDataStreamsClient> Client;
TActorId AuthActor;
+ bool InputCountersReported = false;
};
private:
@@ -653,9 +690,9 @@ namespace NKikimr::NHttpProxy {
SourceAddress = address;
}
- DatabaseName = Request->URL;
- if (DatabaseName == "/") {
- DatabaseName = "";
+ DatabasePath = Request->URL;
+ if (DatabasePath == "/") {
+ DatabasePath = "";
}
//TODO: find out databaseId
ParseHeaders(Request->Headers);
@@ -869,7 +906,7 @@ namespace NKikimr::NHttpProxy {
, ServiceConfig(context.ServiceConfig)
, IamToken(context.IamToken)
, Authorize(!context.Driver)
- , Database(context.DatabaseName)
+ , DatabasePath(context.DatabasePath)
, StreamName(context.StreamName)
{
}
@@ -896,7 +933,7 @@ namespace NKikimr::NHttpProxy {
void SendDescribeRequest(const TActorContext& ctx) {
auto schemeCacheRequest = std::make_unique<NSchemeCache::TSchemeCacheNavigate>();
NSchemeCache::TSchemeCacheNavigate::TEntry entry;
- entry.Path = NKikimr::SplitPath(Database);
+ entry.Path = NKikimr::SplitPath(DatabasePath);
entry.Operation = NSchemeCache::TSchemeCacheNavigate::OpPath;
entry.SyncVersion = false;
schemeCacheRequest->ResultSet.emplace_back(entry);
@@ -907,7 +944,7 @@ namespace NKikimr::NHttpProxy {
const NSchemeCache::TSchemeCacheNavigate* navigate = ev->Get()->Request.Get();
if (navigate->ErrorCount) {
return ReplyWithError(
- ctx, NYdb::EStatus::SCHEME_ERROR, TStringBuilder() << "Database with path '" << Database << "' doesn't exists",
+ ctx, NYdb::EStatus::SCHEME_ERROR, TStringBuilder() << "Database with path '" << DatabasePath << "' doesn't exists",
NYds::EErrorCodes::NOT_FOUND
);
}
@@ -917,6 +954,7 @@ namespace NKikimr::NHttpProxy {
FolderId = description.GetPQTabletConfig().GetYcFolderId();
CloudId = description.GetPQTabletConfig().GetYcCloudId();
DatabaseId = description.GetPQTabletConfig().GetYdbDatabaseId();
+ DatabasePath = description.GetPQTabletConfig().GetYdbDatabasePath();
}
for (const auto& attr : navigate->ResultSet.front().Attributes) {
if (attr.first == "folder_id") FolderId = attr.second;
@@ -935,7 +973,7 @@ namespace NKikimr::NHttpProxy {
if (ev->Get()->Error) {
return ReplyWithError(ctx, NYdb::EStatus::UNAUTHORIZED, ev->Get()->Error.Message);
};
- ctx.Send(Sender, new TEvServerlessProxy::TEvToken(ev->Get()->Token->GetUserSID(), "", ev->Get()->SerializedToken));
+ ctx.Send(Sender, new TEvServerlessProxy::TEvToken(ev->Get()->Token->GetUserSID(), "", ev->Get()->SerializedToken, {"", DatabaseId, DatabasePath, CloudId, FolderId}));
LOG_SP_DEBUG_S(ctx, NKikimrServices::HTTP_PROXY, "Authorized successfully");
@@ -1067,7 +1105,7 @@ namespace NKikimr::NHttpProxy {
void ReplyWithError(const TActorContext& ctx, NYdb::EStatus status, const TString& errorText,
NYds::EErrorCodes issueCode = NYds::EErrorCodes::GENERIC_ERROR) {
- ctx.Send(Sender, new TEvServerlessProxy::TEvErrorWithIssue(status, errorText, static_cast<size_t>(issueCode)));
+ ctx.Send(Sender, new TEvServerlessProxy::TEvErrorWithIssue(status, errorText, {"", DatabaseId, DatabasePath, CloudId, FolderId}, static_cast<size_t>(issueCode)));
TBase::Die(ctx);
}
@@ -1090,7 +1128,7 @@ namespace NKikimr::NHttpProxy {
Y_VERIFY(!ev->Get()->Response.iam_token().empty());
ctx.Send(Sender,
- new TEvServerlessProxy::TEvToken(ServiceAccountId, ev->Get()->Response.iam_token()));
+ new TEvServerlessProxy::TEvToken(ServiceAccountId, ev->Get()->Response.iam_token(), "", {}));
LOG_SP_DEBUG_S(ctx, NKikimrServices::HTTP_PROXY, "IAM token generated");
@@ -1126,7 +1164,7 @@ namespace NKikimr::NHttpProxy {
TString FolderId;
TString CloudId;
TString DatabaseId;
- TString Database;
+ TString DatabasePath;
TString StreamName;
};
diff --git a/ydb/core/http_proxy/http_req.h b/ydb/core/http_proxy/http_req.h
index 60437ff3b2b..6effd742973 100644
--- a/ydb/core/http_proxy/http_req.h
+++ b/ydb/core/http_proxy/http_req.h
@@ -75,7 +75,7 @@ struct THttpRequestContext {
TString ServiceAccountId;
TString RequestId;
TString DiscoveryEndpoint;
- TString DatabaseName;
+ TString DatabasePath;
TString DatabaseId; // not in context
TString FolderId; // not in context
TString CloudId; // not in context
diff --git a/ydb/core/http_proxy/http_service.cpp b/ydb/core/http_proxy/http_service.cpp
index a08d764bc55..656ec9be035 100644
--- a/ydb/core/http_proxy/http_service.cpp
+++ b/ydb/core/http_proxy/http_service.cpp
@@ -94,7 +94,7 @@ namespace NKikimr::NHttpProxy {
" incoming request from [" << context.SourceAddress << "]" <<
" request [" << context.MethodName << "]" <<
" url [" << context.Request->URL << "]" <<
- " database [" << context.DatabaseName << "]" <<
+ " database [" << context.DatabasePath << "]" <<
" requestId: " << context.RequestId);
try {
diff --git a/ydb/services/datastreams/datastreams_ut.cpp b/ydb/services/datastreams/datastreams_ut.cpp
index eaa46ca2a1e..c939a1572a0 100644
--- a/ydb/services/datastreams/datastreams_ut.cpp
+++ b/ydb/services/datastreams/datastreams_ut.cpp
@@ -1388,7 +1388,7 @@ Y_UNIT_TEST_SUITE(DataStreams) {
UNIT_ASSERT_VALUES_EQUAL(item.GetData(), item.GetPartitionKey());
auto hashKey = item.GetExplicitHash().empty() ? HexBytesToDecimal(MD5::Calc(item.GetPartitionKey())) : BytesToDecimal(item.GetExplicitHash());
UNIT_ASSERT_VALUES_EQUAL(NKikimr::NDataStreams::V1::ShardFromDecimal(hashKey, 5), item.GetPartitionStream()->GetPartitionId());
- UNIT_ASSERT(!item.GetIp().empty());
+ UNIT_ASSERT(item.GetIp().empty());
if (item.GetData() == dataStr) {
UNIT_ASSERT_VALUES_EQUAL(item.GetExplicitHash(), dataStr);
}
diff --git a/ydb/services/datastreams/put_records_actor.h b/ydb/services/datastreams/put_records_actor.h
index 917c9b07014..cac15961120 100644
--- a/ydb/services/datastreams/put_records_actor.h
+++ b/ydb/services/datastreams/put_records_actor.h
@@ -28,7 +28,9 @@ namespace NKikimr::NDataStreams::V1 {
TString GetSerializedData(const TPutRecordsItem& item) {
NKikimrPQClient::TDataChunk proto;
- proto.SetIp(item.Ip);
+ //TODO: get ip from client, not grpc;
+ // proto.SetIp(item.Ip);
+
proto.SetCodec(0); // NPersQueue::CODEC_RAW
proto.SetData(item.Data);