diff options
| author | alexnick <[email protected]> | 2023-06-23 10:09:56 +0300 |
|---|---|---|
| committer | alexnick <[email protected]> | 2023-06-23 10:09:56 +0300 |
| commit | 50d94d3a778c92c8dc867eadab99af96bb257e9d (patch) | |
| tree | b856f88d7bf40e6d85b9f93a65b004a61d5a5a44 | |
| parent | 0a42fb7c5128e888198fa592bdb3670ed67ec3cf (diff) | |
fix yds counters inside ydb
fix counters
| -rw-r--r-- | ydb/core/http_proxy/custom_metrics.h | 4 | ||||
| -rw-r--r-- | ydb/core/http_proxy/events.h | 20 | ||||
| -rw-r--r-- | ydb/core/http_proxy/http_req.cpp | 130 | ||||
| -rw-r--r-- | ydb/core/http_proxy/http_req.h | 2 | ||||
| -rw-r--r-- | ydb/core/http_proxy/http_service.cpp | 2 | ||||
| -rw-r--r-- | ydb/services/datastreams/datastreams_ut.cpp | 2 | ||||
| -rw-r--r-- | ydb/services/datastreams/put_records_actor.h | 4 |
7 files changed, 99 insertions, 65 deletions
diff --git a/ydb/core/http_proxy/custom_metrics.h b/ydb/core/http_proxy/custom_metrics.h index 75b6d84aa9b..026e9f750cc 100644 --- a/ydb/core/http_proxy/custom_metrics.h +++ b/ydb/core/http_proxy/custom_metrics.h @@ -28,12 +28,12 @@ TVector<std::pair<TString, TString>> BuildLabels(const TString& method, const TH {"name", name}}; } if (method.empty()) { - return {{"database", httpContext.DatabaseName}, {"cloud_id", httpContext.CloudId}, + return {{"database", httpContext.DatabasePath}, {"cloud_id", httpContext.CloudId}, {"folder_id", httpContext.FolderId}, {"database_id", httpContext.DatabaseId}, {"topic", httpContext.StreamName}, {"name", name}}; } - return {{"database", httpContext.DatabaseName}, {"method", method}, {"cloud_id", httpContext.CloudId}, + return {{"database", httpContext.DatabasePath}, {"method", method}, {"cloud_id", httpContext.CloudId}, {"folder_id", httpContext.FolderId}, {"database_id", httpContext.DatabaseId}, {"topic", httpContext.StreamName}, {"name", name}}; } diff --git a/ydb/core/http_proxy/events.h b/ydb/core/http_proxy/events.h index 4acb6376ed6..010a18e73b5 100644 --- a/ydb/core/http_proxy/events.h +++ b/ydb/core/http_proxy/events.h @@ -41,7 +41,6 @@ namespace NKikimr::NHttpProxy { EvUpdateDatabasesEvent, EvListEndpointsRequest, EvListEndpointsResponse, - EvError, EvErrorWithIssue, EvCounter, EvHistCounter, @@ -120,10 +119,13 @@ namespace NKikimr::NHttpProxy { TString SerializedUserToken; - TEvToken(const TString& serviceAccountId, const TString& iamToken, const TString& serializedUserToken = "") + TDatabase Database; + + TEvToken(const TString& serviceAccountId, const TString& iamToken, const TString& serializedUserToken, const TDatabase& database) : ServiceAccountId(serviceAccountId) , IamToken(iamToken) , SerializedUserToken(serializedUserToken) + , Database(database) {} }; @@ -131,25 +133,17 @@ namespace NKikimr::NHttpProxy { TEvClientReady() {} }; - struct TEvError : public TEventLocal<TEvError, EvError> { - NYdb::EStatus Status; - TString Response; - - TEvError(const NYdb::EStatus status, const TString& response) - : Status(status) - , Response(response) - {} - }; - struct TEvErrorWithIssue : public TEventLocal<TEvErrorWithIssue, EvErrorWithIssue> { NYdb::EStatus Status; size_t IssueCode; TString Response; + TDatabase Database; - TEvErrorWithIssue(const NYdb::EStatus status, const TString& response, size_t issueCode=0) + TEvErrorWithIssue(const NYdb::EStatus status, const TString& response, const TDatabase& database, size_t issueCode) : Status(status) , IssueCode(issueCode) , Response(response) + , Database(database) {} }; }; diff --git a/ydb/core/http_proxy/http_req.cpp b/ydb/core/http_proxy/http_req.cpp index 28b28face9f..5c82d8c67a6 100644 --- a/ydb/core/http_proxy/http_req.cpp +++ b/ydb/core/http_proxy/http_req.cpp @@ -143,17 +143,17 @@ namespace NKikimr::NHttpProxy { } template<class TProto> - TString TruncateStreamName(const TProto& req, const TString& database) + TString TruncateStreamName(const TProto& req, const TString& databasePath) { constexpr bool has_stream_name = requires(const TProto& t) { t.stream_name(); }; if constexpr (has_stream_name) { - Y_VERIFY(req.stream_name().StartsWith(database)); - return req.stream_name().substr(database.size(), -1); + Y_VERIFY(req.stream_name().StartsWith(databasePath)); + return req.stream_name().substr(databasePath.size(), -1); } - return ExtractStreamNameWithoutProtoField<TProto>(req).substr(database.size(), -1); + return ExtractStreamNameWithoutProtoField<TProto>(req).substr(databasePath.size(), -1); } constexpr TStringBuf IAM_HEADER = "x-yacloud-subjecttoken"; @@ -231,7 +231,6 @@ namespace NKikimr::NHttpProxy { HFunc(TEvents::TEvWakeup, HandleTimeout); HFunc(TEvServerlessProxy::TEvClientReady, HandleClientReady); HFunc(TEvServerlessProxy::TEvDiscoverDatabaseEndpointResult, Handle); - HFunc(TEvServerlessProxy::TEvError, HandleError); HFunc(TEvServerlessProxy::TEvErrorWithIssue, HandleErrorWithIssue); HFunc(TEvServerlessProxy::TEvGrpcRequestResult, HandleGrpcResponse); HFunc(TEvServerlessProxy::TEvToken, HandleToken); @@ -247,7 +246,7 @@ namespace NKikimr::NHttpProxy { RequestState = StateAuthorization; auto request = MakeHolder<TEvServerlessProxy::TEvDiscoverDatabaseEndpointRequest>(); - request->DatabasePath = HttpContext.DatabaseName; + request->DatabasePath = HttpContext.DatabasePath; ctx.Send(MakeTenantDiscoveryID(), std::move(request)); } @@ -256,17 +255,17 @@ namespace NKikimr::NHttpProxy { RequestState = StateListEndpoints; LOG_SP_INFO_S(ctx, NKikimrServices::HTTP_PROXY, "create client to '" << HttpContext.DiscoveryEndpoint << - "' database: '" << HttpContext.DatabaseName << + "' database: '" << HttpContext.DatabasePath << "' iam token size: " << HttpContext.IamToken.size()); auto clientSettings = NYdb::TCommonClientSettings() .DiscoveryEndpoint(HttpContext.DiscoveryEndpoint) - .Database(HttpContext.DatabaseName) + .Database(HttpContext.DatabasePath) .AuthToken(HttpContext.IamToken) .DiscoveryMode(NYdb::EDiscoveryMode::Async); - if (!HttpContext.DatabaseName.empty() && !HttpContext.ServiceConfig.GetTestMode()) { - clientSettings.Database(HttpContext.DatabaseName); + if (!HttpContext.DatabasePath.empty() && !HttpContext.ServiceConfig.GetTestMode()) { + clientSettings.Database(HttpContext.DatabasePath); } Y_VERIFY(!Client); Client.Reset(new TDataStreamsClient(*HttpContext.Driver, clientSettings)); @@ -285,10 +284,10 @@ namespace NKikimr::NHttpProxy { RequestState = StateGrpcRequest; LOG_SP_INFO_S(ctx, NKikimrServices::HTTP_PROXY, "sending grpc request to '" << HttpContext.DiscoveryEndpoint << - "' database: '" << HttpContext.DatabaseName << + "' database: '" << HttpContext.DatabasePath << "' iam token size: " << HttpContext.IamToken.size()); - RpcFuture = NRpcService::DoLocalRpc<TRpcEv>(std::move(Request), HttpContext.DatabaseName, + RpcFuture = NRpcService::DoLocalRpc<TRpcEv>(std::move(Request), HttpContext.DatabasePath, HttpContext.SerializedUserToken, ctx.ActorSystem()); RpcFuture.Subscribe([actorId = ctx.SelfID, actorSystem = ctx.ActorSystem()] (const NThreading::TFuture<TProtoResponse>& future) { @@ -313,7 +312,7 @@ namespace NKikimr::NHttpProxy { RequestState = StateGrpcRequest; LOG_SP_INFO_S(ctx, NKikimrServices::HTTP_PROXY, "sending grpc request to '" << HttpContext.DiscoveryEndpoint << - "' database: '" << HttpContext.DatabaseName << + "' database: '" << HttpContext.DatabasePath << "' iam token size: " << HttpContext.IamToken.size()); Y_VERIFY(Client); @@ -345,6 +344,23 @@ namespace NKikimr::NHttpProxy { Y_UNUSED(ev); } + void TryUpdateDbInfo(const TDatabase& db, const TActorContext& ctx) { + if (db.Path) { + HttpContext.DatabasePath = db.Path; + HttpContext.DatabaseId = db.Id; + HttpContext.CloudId = db.CloudId; + HttpContext.FolderId = db.FolderId; + if (ExtractStreamName<TProtoRequest>(Request).StartsWith(HttpContext.DatabasePath + "/")) { + HttpContext.StreamName = + TruncateStreamName<TProtoRequest>(Request, HttpContext.DatabasePath + "/"); + } else { + HttpContext.StreamName = ExtractStreamName<TProtoRequest>(Request); + } + + } + ReportInputCounters(ctx); + } + void HandleToken(TEvServerlessProxy::TEvToken::TPtr& ev, const TActorContext& ctx) { HttpContext.ServiceAccountId = ev->Get()->ServiceAccountId; HttpContext.IamToken = ev->Get()->IamToken; @@ -353,15 +369,14 @@ namespace NKikimr::NHttpProxy { if (HttpContext.Driver) { SendYdbDriverRequest(ctx); } else { + TryUpdateDbInfo(ev->Get()->Database, ctx); SendGrpcRequestNoDriver(ctx); } } - void HandleError(TEvServerlessProxy::TEvError::TPtr& ev, const TActorContext& ctx) { - ReplyWithError(ctx, ev->Get()->Status, ev->Get()->Response); - } void HandleErrorWithIssue(TEvServerlessProxy::TEvErrorWithIssue::TPtr& ev, const TActorContext& ctx) { + TryUpdateDbInfo(ev->Get()->Database, ctx); ReplyWithError(ctx, ev->Get()->Status, ev->Get()->Response, ev->Get()->IssueCode); } @@ -377,6 +392,20 @@ namespace NKikimr::NHttpProxy { {"code", TStringBuilder() << (int)MapToException(status, Method, issueCode).second}, {"name", "api.http.errors_per_second"}} }); + + ctx.Send(MakeMetricsServiceID(), + new TEvServerlessProxy::TEvCounter{ + 1, true, true, + {{"database", HttpContext.DatabasePath}, + {"method", Method}, + {"cloud_id", HttpContext.CloudId}, + {"folder_id", HttpContext.FolderId}, + {"database_id", HttpContext.DatabaseId}, + {"topic", HttpContext.StreamName}, + {"code", TStringBuilder() << (int)MapToException(status, Method, issueCode).second}, + {"name", "api.http.data_streams.response.count"}} + }); + HttpContext.ResponseData.Status = status; HttpContext.ResponseData.ErrorText = errorText; HttpContext.DoReply(ctx, issueCode); @@ -386,6 +415,24 @@ namespace NKikimr::NHttpProxy { TBase::Die(ctx); } + void ReportInputCounters(const TActorContext& ctx) { + + if (InputCountersReported) { + return; + } + InputCountersReported = true; + + FillInputCustomMetrics<TProtoRequest>(Request, HttpContext, ctx); + /* deprecated metric: */ ctx.Send(MakeMetricsServiceID(), + new TEvServerlessProxy::TEvCounter{1, true, true, + BuildLabels(Method, HttpContext, "api.http.requests_per_second", setStreamPrefix) + }); + ctx.Send(MakeMetricsServiceID(), + new TEvServerlessProxy::TEvCounter{1, true, true, + BuildLabels(Method, HttpContext, "api.http.data_streams.request.count") + }); + } + void Handle(TEvServerlessProxy::TEvDiscoverDatabaseEndpointResult::TPtr ev, const TActorContext& ctx) { if (ev->Get()->DatabaseInfo) { @@ -394,25 +441,15 @@ namespace NKikimr::NHttpProxy { HttpContext.CloudId = db->CloudId; HttpContext.DatabaseId = db->Id; HttpContext.DiscoveryEndpoint = db->Endpoint; - HttpContext.DatabaseName = db->Path; + HttpContext.DatabasePath = db->Path; - if (ExtractStreamName<TProtoRequest>(Request).StartsWith(HttpContext.DatabaseName + "/")) { + if (ExtractStreamName<TProtoRequest>(Request).StartsWith(HttpContext.DatabasePath + "/")) { HttpContext.StreamName = - TruncateStreamName<TProtoRequest>(Request, HttpContext.DatabaseName + "/"); + TruncateStreamName<TProtoRequest>(Request, HttpContext.DatabasePath + "/"); } else { HttpContext.StreamName = ExtractStreamName<TProtoRequest>(Request); } - - FillInputCustomMetrics<TProtoRequest>(Request, HttpContext, ctx); - /* deprecated metric: */ ctx.Send(MakeMetricsServiceID(), - new TEvServerlessProxy::TEvCounter{1, true, true, - BuildLabels(Method, HttpContext, "api.http.requests_per_second", setStreamPrefix) - }); - ctx.Send(MakeMetricsServiceID(), - new TEvServerlessProxy::TEvCounter{1, true, true, - BuildLabels(Method, HttpContext, "api.http.data_streams.request.count") - }); - //TODO: add api.http.request.count + ReportInputCounters(ctx); CreateClient(ctx); return; } @@ -441,7 +478,6 @@ namespace NKikimr::NHttpProxy { FillOutputCustomMetrics<TProtoResult>( *(dynamic_cast<TProtoResult*>(ev->Get()->Message.Get())), HttpContext, ctx); ReportLatencyCounters(ctx); - /* deprecated metric: */ ctx.Send(MakeMetricsServiceID(), new TEvServerlessProxy::TEvCounter{1, true, true, BuildLabels(Method, HttpContext, "api.http.success_per_second", setStreamPrefix) @@ -449,7 +485,7 @@ namespace NKikimr::NHttpProxy { ctx.Send(MakeMetricsServiceID(), new TEvServerlessProxy::TEvCounter{ 1, true, true, - {{"database", HttpContext.DatabaseName}, + {{"database", HttpContext.DatabasePath}, {"method", Method}, {"cloud_id", HttpContext.CloudId}, {"folder_id", HttpContext.FolderId}, @@ -506,17 +542,17 @@ namespace NKikimr::NHttpProxy { } catch (const std::exception& e) { LOG_SP_WARN_S(ctx, NKikimrServices::HTTP_PROXY, "got new request with incorrect json from [" << HttpContext.SourceAddress << "] " << - "database '" << HttpContext.DatabaseName << "'"); + "database '" << HttpContext.DatabasePath << "'"); return ReplyWithError(ctx, NYdb::EStatus::BAD_REQUEST, e.what(), static_cast<size_t>(NYds::EErrorCodes::INVALID_ARGUMENT)); } - if (HttpContext.DatabaseName.empty()) { - HttpContext.DatabaseName = ExtractStreamName<TProtoRequest>(Request); + if (HttpContext.DatabasePath.empty()) { + HttpContext.DatabasePath = ExtractStreamName<TProtoRequest>(Request); } LOG_SP_INFO_S(ctx, NKikimrServices::HTTP_PROXY, "got new request from [" << HttpContext.SourceAddress << "] " << - "database '" << HttpContext.DatabaseName << "' " << + "database '" << HttpContext.DatabasePath << "' " << "stream '" << ExtractStreamName<TProtoRequest>(Request) << "'"); // Use Signature or no sdk mode - then need to auth anyway @@ -554,6 +590,7 @@ namespace NKikimr::NHttpProxy { THolder<TDataStreamsClient> Client; TActorId AuthActor; + bool InputCountersReported = false; }; private: @@ -653,9 +690,9 @@ namespace NKikimr::NHttpProxy { SourceAddress = address; } - DatabaseName = Request->URL; - if (DatabaseName == "/") { - DatabaseName = ""; + DatabasePath = Request->URL; + if (DatabasePath == "/") { + DatabasePath = ""; } //TODO: find out databaseId ParseHeaders(Request->Headers); @@ -869,7 +906,7 @@ namespace NKikimr::NHttpProxy { , ServiceConfig(context.ServiceConfig) , IamToken(context.IamToken) , Authorize(!context.Driver) - , Database(context.DatabaseName) + , DatabasePath(context.DatabasePath) , StreamName(context.StreamName) { } @@ -896,7 +933,7 @@ namespace NKikimr::NHttpProxy { void SendDescribeRequest(const TActorContext& ctx) { auto schemeCacheRequest = std::make_unique<NSchemeCache::TSchemeCacheNavigate>(); NSchemeCache::TSchemeCacheNavigate::TEntry entry; - entry.Path = NKikimr::SplitPath(Database); + entry.Path = NKikimr::SplitPath(DatabasePath); entry.Operation = NSchemeCache::TSchemeCacheNavigate::OpPath; entry.SyncVersion = false; schemeCacheRequest->ResultSet.emplace_back(entry); @@ -907,7 +944,7 @@ namespace NKikimr::NHttpProxy { const NSchemeCache::TSchemeCacheNavigate* navigate = ev->Get()->Request.Get(); if (navigate->ErrorCount) { return ReplyWithError( - ctx, NYdb::EStatus::SCHEME_ERROR, TStringBuilder() << "Database with path '" << Database << "' doesn't exists", + ctx, NYdb::EStatus::SCHEME_ERROR, TStringBuilder() << "Database with path '" << DatabasePath << "' doesn't exists", NYds::EErrorCodes::NOT_FOUND ); } @@ -917,6 +954,7 @@ namespace NKikimr::NHttpProxy { FolderId = description.GetPQTabletConfig().GetYcFolderId(); CloudId = description.GetPQTabletConfig().GetYcCloudId(); DatabaseId = description.GetPQTabletConfig().GetYdbDatabaseId(); + DatabasePath = description.GetPQTabletConfig().GetYdbDatabasePath(); } for (const auto& attr : navigate->ResultSet.front().Attributes) { if (attr.first == "folder_id") FolderId = attr.second; @@ -935,7 +973,7 @@ namespace NKikimr::NHttpProxy { if (ev->Get()->Error) { return ReplyWithError(ctx, NYdb::EStatus::UNAUTHORIZED, ev->Get()->Error.Message); }; - ctx.Send(Sender, new TEvServerlessProxy::TEvToken(ev->Get()->Token->GetUserSID(), "", ev->Get()->SerializedToken)); + ctx.Send(Sender, new TEvServerlessProxy::TEvToken(ev->Get()->Token->GetUserSID(), "", ev->Get()->SerializedToken, {"", DatabaseId, DatabasePath, CloudId, FolderId})); LOG_SP_DEBUG_S(ctx, NKikimrServices::HTTP_PROXY, "Authorized successfully"); @@ -1067,7 +1105,7 @@ namespace NKikimr::NHttpProxy { void ReplyWithError(const TActorContext& ctx, NYdb::EStatus status, const TString& errorText, NYds::EErrorCodes issueCode = NYds::EErrorCodes::GENERIC_ERROR) { - ctx.Send(Sender, new TEvServerlessProxy::TEvErrorWithIssue(status, errorText, static_cast<size_t>(issueCode))); + ctx.Send(Sender, new TEvServerlessProxy::TEvErrorWithIssue(status, errorText, {"", DatabaseId, DatabasePath, CloudId, FolderId}, static_cast<size_t>(issueCode))); TBase::Die(ctx); } @@ -1090,7 +1128,7 @@ namespace NKikimr::NHttpProxy { Y_VERIFY(!ev->Get()->Response.iam_token().empty()); ctx.Send(Sender, - new TEvServerlessProxy::TEvToken(ServiceAccountId, ev->Get()->Response.iam_token())); + new TEvServerlessProxy::TEvToken(ServiceAccountId, ev->Get()->Response.iam_token(), "", {})); LOG_SP_DEBUG_S(ctx, NKikimrServices::HTTP_PROXY, "IAM token generated"); @@ -1126,7 +1164,7 @@ namespace NKikimr::NHttpProxy { TString FolderId; TString CloudId; TString DatabaseId; - TString Database; + TString DatabasePath; TString StreamName; }; diff --git a/ydb/core/http_proxy/http_req.h b/ydb/core/http_proxy/http_req.h index 60437ff3b2b..6effd742973 100644 --- a/ydb/core/http_proxy/http_req.h +++ b/ydb/core/http_proxy/http_req.h @@ -75,7 +75,7 @@ struct THttpRequestContext { TString ServiceAccountId; TString RequestId; TString DiscoveryEndpoint; - TString DatabaseName; + TString DatabasePath; TString DatabaseId; // not in context TString FolderId; // not in context TString CloudId; // not in context diff --git a/ydb/core/http_proxy/http_service.cpp b/ydb/core/http_proxy/http_service.cpp index a08d764bc55..656ec9be035 100644 --- a/ydb/core/http_proxy/http_service.cpp +++ b/ydb/core/http_proxy/http_service.cpp @@ -94,7 +94,7 @@ namespace NKikimr::NHttpProxy { " incoming request from [" << context.SourceAddress << "]" << " request [" << context.MethodName << "]" << " url [" << context.Request->URL << "]" << - " database [" << context.DatabaseName << "]" << + " database [" << context.DatabasePath << "]" << " requestId: " << context.RequestId); try { diff --git a/ydb/services/datastreams/datastreams_ut.cpp b/ydb/services/datastreams/datastreams_ut.cpp index eaa46ca2a1e..c939a1572a0 100644 --- a/ydb/services/datastreams/datastreams_ut.cpp +++ b/ydb/services/datastreams/datastreams_ut.cpp @@ -1388,7 +1388,7 @@ Y_UNIT_TEST_SUITE(DataStreams) { UNIT_ASSERT_VALUES_EQUAL(item.GetData(), item.GetPartitionKey()); auto hashKey = item.GetExplicitHash().empty() ? HexBytesToDecimal(MD5::Calc(item.GetPartitionKey())) : BytesToDecimal(item.GetExplicitHash()); UNIT_ASSERT_VALUES_EQUAL(NKikimr::NDataStreams::V1::ShardFromDecimal(hashKey, 5), item.GetPartitionStream()->GetPartitionId()); - UNIT_ASSERT(!item.GetIp().empty()); + UNIT_ASSERT(item.GetIp().empty()); if (item.GetData() == dataStr) { UNIT_ASSERT_VALUES_EQUAL(item.GetExplicitHash(), dataStr); } diff --git a/ydb/services/datastreams/put_records_actor.h b/ydb/services/datastreams/put_records_actor.h index 917c9b07014..cac15961120 100644 --- a/ydb/services/datastreams/put_records_actor.h +++ b/ydb/services/datastreams/put_records_actor.h @@ -28,7 +28,9 @@ namespace NKikimr::NDataStreams::V1 { TString GetSerializedData(const TPutRecordsItem& item) { NKikimrPQClient::TDataChunk proto; - proto.SetIp(item.Ip); + //TODO: get ip from client, not grpc; + // proto.SetIp(item.Ip); + proto.SetCodec(0); // NPersQueue::CODEC_RAW proto.SetData(item.Data); |
