diff options
author | savnik <savnik@yandex-team.com> | 2023-08-31 10:12:48 +0300 |
---|---|---|
committer | savnik <savnik@yandex-team.com> | 2023-08-31 10:27:02 +0300 |
commit | cee0e3df5d707640983570d06892747720e0bb61 (patch) | |
tree | 23a94ac7ffcda52f153bb504ba9c1bc3bc443e5e | |
parent | eda974389da3036fc702fce50ab75ad71ad13bfb (diff) | |
download | ydb-cee0e3df5d707640983570d06892747720e0bb61.tar.gz |
Kafka metrics
25 files changed, 347 insertions, 144 deletions
diff --git a/ydb/core/driver_lib/run/kikimr_services_initializers.cpp b/ydb/core/driver_lib/run/kikimr_services_initializers.cpp index 607364d78ef..919b2a9a6e9 100644 --- a/ydb/core/driver_lib/run/kikimr_services_initializers.cpp +++ b/ydb/core/driver_lib/run/kikimr_services_initializers.cpp @@ -65,6 +65,8 @@ #include <ydb/core/health_check/health_check.h> +#include <ydb/core/kafka_proxy/actors/kafka_metrics_actor.h> +#include <ydb/core/kafka_proxy/kafka_metrics.h> #include <ydb/core/kafka_proxy/kafka_proxy.h> #include <ydb/core/kqp/common/kqp.h> @@ -2635,6 +2637,13 @@ void TKafkaProxyServiceInitializer::InitializeServices(NActors::TActorSystemSetu TActorSetupCmd(NKafka::CreateKafkaListener(MakePollerActorId(), settings, Config.GetKafkaProxyConfig()), TMailboxType::HTSwap, appData->UserPoolId) ); + + IActor* metricsActor = CreateKafkaMetricsActor(NKafka::TKafkaMetricsSettings{appData->Counters->GetSubgroup("counters", "kafka_proxy")}); + setup->LocalServices.emplace_back( + NKafka::MakeKafkaMetricsServiceID(), + TActorSetupCmd(metricsActor, + TMailboxType::HTSwap, appData->UserPoolId) + ); } } diff --git a/ydb/core/kafka_proxy/CMakeLists.darwin-x86_64.txt b/ydb/core/kafka_proxy/CMakeLists.darwin-x86_64.txt index 4a2aad8d4a4..b5e422d9e17 100644 --- a/ydb/core/kafka_proxy/CMakeLists.darwin-x86_64.txt +++ b/ydb/core/kafka_proxy/CMakeLists.darwin-x86_64.txt @@ -26,8 +26,10 @@ target_sources(ydb-core-kafka_proxy PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/actors/kafka_produce_actor.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/actors/kafka_sasl_auth_actor.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/actors/kafka_sasl_handshake_actor.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/actors/kafka_metrics_actor.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_connection.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_messages.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_messages_int.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_records.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_metrics.cpp ) diff --git a/ydb/core/kafka_proxy/CMakeLists.linux-aarch64.txt b/ydb/core/kafka_proxy/CMakeLists.linux-aarch64.txt index 3c0e1dff11f..64ae0928906 100644 --- a/ydb/core/kafka_proxy/CMakeLists.linux-aarch64.txt +++ b/ydb/core/kafka_proxy/CMakeLists.linux-aarch64.txt @@ -27,8 +27,10 @@ target_sources(ydb-core-kafka_proxy PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/actors/kafka_produce_actor.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/actors/kafka_sasl_auth_actor.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/actors/kafka_sasl_handshake_actor.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/actors/kafka_metrics_actor.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_connection.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_messages.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_messages_int.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_records.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_metrics.cpp ) diff --git a/ydb/core/kafka_proxy/CMakeLists.linux-x86_64.txt b/ydb/core/kafka_proxy/CMakeLists.linux-x86_64.txt index 3c0e1dff11f..64ae0928906 100644 --- a/ydb/core/kafka_proxy/CMakeLists.linux-x86_64.txt +++ b/ydb/core/kafka_proxy/CMakeLists.linux-x86_64.txt @@ -27,8 +27,10 @@ target_sources(ydb-core-kafka_proxy PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/actors/kafka_produce_actor.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/actors/kafka_sasl_auth_actor.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/actors/kafka_sasl_handshake_actor.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/actors/kafka_metrics_actor.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_connection.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_messages.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_messages_int.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_records.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_metrics.cpp ) diff --git a/ydb/core/kafka_proxy/CMakeLists.windows-x86_64.txt b/ydb/core/kafka_proxy/CMakeLists.windows-x86_64.txt index 4a2aad8d4a4..b5e422d9e17 100644 --- a/ydb/core/kafka_proxy/CMakeLists.windows-x86_64.txt +++ b/ydb/core/kafka_proxy/CMakeLists.windows-x86_64.txt @@ -26,8 +26,10 @@ target_sources(ydb-core-kafka_proxy PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/actors/kafka_produce_actor.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/actors/kafka_sasl_auth_actor.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/actors/kafka_sasl_handshake_actor.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/actors/kafka_metrics_actor.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_connection.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_messages.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_messages_int.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_records.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_metrics.cpp ) diff --git a/ydb/core/kafka_proxy/actors/actors.h b/ydb/core/kafka_proxy/actors/actors.h index 526328bddb9..88fe30d8c93 100644 --- a/ydb/core/kafka_proxy/actors/actors.h +++ b/ydb/core/kafka_proxy/actors/actors.h @@ -30,7 +30,10 @@ struct TContext { EAuthSteps AuthenticationStep = EAuthSteps::WAIT_HANDSHAKE; TString SaslMechanism; - TString Database; + TString DatabasePath; + TString FolderId; + TString CloudId; + TString DatabaseId; TIntrusiveConstPtr<NACLib::TUserToken> UserToken; TString ClientDC; diff --git a/ydb/core/kafka_proxy/actors/kafka_api_versions_actor.cpp b/ydb/core/kafka_proxy/actors/kafka_api_versions_actor.cpp index bb24a616156..14a0e4931a0 100644 --- a/ydb/core/kafka_proxy/actors/kafka_api_versions_actor.cpp +++ b/ydb/core/kafka_proxy/actors/kafka_api_versions_actor.cpp @@ -10,6 +10,7 @@ NActors::IActor* CreateKafkaApiVersionsActor(const TContext::TPtr context, const TApiVersionsResponseData::TPtr GetApiVersions() { TApiVersionsResponseData::TPtr response = std::make_shared<TApiVersionsResponseData>(); + response->ErrorCode = EKafkaErrors::NONE_ERROR; response->ApiKeys.resize(6); response->ApiKeys[0].ApiKey = PRODUCE; @@ -41,8 +42,8 @@ TApiVersionsResponseData::TPtr GetApiVersions() { void TKafkaApiVersionsActor::Bootstrap(const NActors::TActorContext& ctx) { Y_UNUSED(Message); - - Send(Context->ConnectionId, new TEvKafka::TEvResponse(CorrelationId, GetApiVersions())); + auto apiVersions = GetApiVersions(); + Send(Context->ConnectionId, new TEvKafka::TEvResponse(CorrelationId, apiVersions, static_cast<EKafkaErrors>(apiVersions->ErrorCode))); Die(ctx); } diff --git a/ydb/core/kafka_proxy/actors/kafka_init_producer_id_actor.cpp b/ydb/core/kafka_proxy/actors/kafka_init_producer_id_actor.cpp index 71bd8503fca..481c21869c9 100644 --- a/ydb/core/kafka_proxy/actors/kafka_init_producer_id_actor.cpp +++ b/ydb/core/kafka_proxy/actors/kafka_init_producer_id_actor.cpp @@ -22,8 +22,8 @@ TInitProducerIdResponseData::TPtr GetResponse(const NActors::TActorContext& ctx) void TKafkaInitProducerIdActor::Bootstrap(const NActors::TActorContext& ctx) { Y_UNUSED(Message); - - Send(Context->ConnectionId, new TEvKafka::TEvResponse(CorrelationId, GetResponse(ctx))); + auto response = GetResponse(ctx); + Send(Context->ConnectionId, new TEvKafka::TEvResponse(CorrelationId, response, static_cast<EKafkaErrors>(response->ErrorCode))); Die(ctx); } diff --git a/ydb/core/kafka_proxy/actors/kafka_metadata_actor.cpp b/ydb/core/kafka_proxy/actors/kafka_metadata_actor.cpp index 0951a66e05c..15e8a2a5fd8 100644 --- a/ydb/core/kafka_proxy/actors/kafka_metadata_actor.cpp +++ b/ydb/core/kafka_proxy/actors/kafka_metadata_actor.cpp @@ -48,7 +48,7 @@ TActorId TKafkaMetadataActor::SendTopicRequest(const TMetadataRequestData::TMeta TGetPartitionsLocationRequest locationRequest{}; locationRequest.Topic = topicRequest.Name.value(); locationRequest.Token = Context->UserToken->GetSerializedToken(); - locationRequest.Database = Context->Database; + locationRequest.Database = Context->DatabasePath; PendingResponses++; @@ -59,6 +59,7 @@ void TKafkaMetadataActor::AddTopicError( TMetadataResponseData::TMetadataResponseTopic& topic, EKafkaErrors errorCode ) { topic.ErrorCode = errorCode; + ErrorCode = errorCode; } void TKafkaMetadataActor::AddTopicResponse(TMetadataResponseData::TMetadataResponseTopic& topic, TEvLocationResponse* response) { @@ -134,7 +135,7 @@ void TKafkaMetadataActor::HandleResponse(TEvLocationResponse::TPtr ev, const TAc void TKafkaMetadataActor::RespondIfRequired(const TActorContext& ctx) { if (PendingResponses == 0) { - Send(Context->ConnectionId, new TEvKafka::TEvResponse(CorrelationId, Response)); + Send(Context->ConnectionId, new TEvKafka::TEvResponse(CorrelationId, Response, ErrorCode)); Die(ctx); } } diff --git a/ydb/core/kafka_proxy/actors/kafka_metadata_actor.h b/ydb/core/kafka_proxy/actors/kafka_metadata_actor.h index d55659d0ace..343c9377d0d 100644 --- a/ydb/core/kafka_proxy/actors/kafka_metadata_actor.h +++ b/ydb/core/kafka_proxy/actors/kafka_metadata_actor.h @@ -45,6 +45,7 @@ private: TMetadataResponseData::TPtr Response; THashMap<TActorId, TVector<ui64>> TopicIndexes; THashSet<ui64> AllClusterNodes; + EKafkaErrors ErrorCode = EKafkaErrors::NONE_ERROR; }; } // namespace NKafka diff --git a/ydb/core/kafka_proxy/actors/kafka_metrics_actor.cpp b/ydb/core/kafka_proxy/actors/kafka_metrics_actor.cpp new file mode 100644 index 00000000000..de410274923 --- /dev/null +++ b/ydb/core/kafka_proxy/actors/kafka_metrics_actor.cpp @@ -0,0 +1,79 @@ +#include "kafka_metrics_actor.h" + +#include <library/cpp/actors/core/actor_bootstrapped.h> +#include <library/cpp/actors/core/events.h> +#include <library/cpp/actors/core/hfunc.h> +#include <library/cpp/actors/core/log.h> +#include <library/cpp/cache/cache.h> + +#include <library/cpp/monlib/metrics/histogram_collector.h> + +namespace NKafka { + + using namespace NActors; + + class TKafkaMetricsActor : public NActors::TActorBootstrapped<TKafkaMetricsActor> { + using TBase = NActors::TActorBootstrapped<TKafkaMetricsActor>; + public: + explicit TKafkaMetricsActor(const TKafkaMetricsSettings& settings) + : Settings(settings) + { + } + + void Bootstrap(const TActorContext&) { + TBase::Become(&TKafkaMetricsActor::StateWork); + } + + TStringBuilder LogPrefix() const { + return {}; + } + + private: + STFUNC(StateWork) { + switch (ev->GetTypeRewrite()) { + HFunc(TEvKafka::TEvUpdateCounter, Handle); + HFunc(TEvKafka::TEvUpdateHistCounter, Handle); + + } + } + + void Handle(TEvKafka::TEvUpdateCounter::TPtr& ev, const TActorContext& ctx); + void Handle(TEvKafka::TEvUpdateHistCounter::TPtr& ev, const TActorContext& ctx); + TIntrusivePtr<NMonitoring::TDynamicCounters> GetGroupFromLabels(const TVector<std::pair<TString, TString>>& labels); + + private: + TKafkaMetricsSettings Settings; + }; + + TIntrusivePtr<NMonitoring::TDynamicCounters> TKafkaMetricsActor::GetGroupFromLabels(const TVector<std::pair<TString, TString>>& labels) { + Y_VERIFY(labels.size() > 1); + auto group = Settings.Counters->GetSubgroup(labels[0].first, labels[0].second); + for (ui32 i = 1; i + 1 < labels.size(); ++i) { + if (labels[i].second.empty()) + continue; + group = group->GetSubgroup(labels[i].first, labels[i].second); + } + return group; + } + + void TKafkaMetricsActor::Handle(TEvKafka::TEvUpdateCounter::TPtr& ev, const TActorContext&) { + auto labels = ev->Get()->Labels; + auto group = GetGroupFromLabels(labels); + auto counter = group->GetNamedCounter(labels.back().first, labels.back().second, true); + *counter += ev->Get()->Delta; + } + + void TKafkaMetricsActor::Handle(TEvKafka::TEvUpdateHistCounter::TPtr& ev, const TActorContext&) { + auto labels = ev->Get()->Labels; + auto group = GetGroupFromLabels(labels); + auto counter = group->GetNamedHistogram(labels.back().first, labels.back().second, + NMonitoring::ExplicitHistogram({100, 200, 500, 1000, 2000, 5000, 10000, 30000})); + counter->Collect(ev->Get()->Value, ev->Get()->Count); + } + + + NActors::IActor* CreateKafkaMetricsActor(const TKafkaMetricsSettings& settings) { + return new TKafkaMetricsActor(settings); + } + +} // namespace NKafka diff --git a/ydb/core/kafka_proxy/actors/kafka_metrics_actor.h b/ydb/core/kafka_proxy/actors/kafka_metrics_actor.h new file mode 100644 index 00000000000..123d568119c --- /dev/null +++ b/ydb/core/kafka_proxy/actors/kafka_metrics_actor.h @@ -0,0 +1,17 @@ +#pragma once + +#include <ydb/core/kafka_proxy/kafka_events.h> + +#include <library/cpp/monlib/dynamic_counters/counters.h> + +#include <library/cpp/actors/core/actor.h> + +namespace NKafka { + + struct TKafkaMetricsSettings { + TIntrusivePtr<::NMonitoring::TDynamicCounters> Counters; + }; + + NActors::IActor* CreateKafkaMetricsActor(const TKafkaMetricsSettings& settings); + +} // namespace NKafka diff --git a/ydb/core/kafka_proxy/actors/kafka_produce_actor.cpp b/ydb/core/kafka_proxy/actors/kafka_produce_actor.cpp index 5fd4d2b2da6..926ede3640c 100644 --- a/ydb/core/kafka_proxy/actors/kafka_produce_actor.cpp +++ b/ydb/core/kafka_proxy/actors/kafka_produce_actor.cpp @@ -1,4 +1,6 @@ #include "kafka_produce_actor.h" +#include "../kafka_metrics.h" + #include <contrib/libs/protobuf/src/google/protobuf/util/time_util.h> @@ -46,6 +48,11 @@ void TKafkaProduceActor::LogEvent(IEventHandle& ev) { KAFKA_LOG_T("Received event: " << ev.GetTypeName()); } +void TKafkaProduceActor::SendMetrics(const TString& topicName, size_t delta, const TString& name, const TActorContext& ctx) { + ctx.Send(MakeKafkaMetricsServiceID(), new TEvKafka::TEvUpdateCounter(delta, BuildLabels(Context, "", topicName, TStringBuilder() << "api.kafka.produce." << name, ""))); + ctx.Send(MakeKafkaMetricsServiceID(), new TEvKafka::TEvUpdateCounter(delta, BuildLabels(Context, "", topicName, "api.kafka.produce.total_messages", ""))); +} + void TKafkaProduceActor::Bootstrap(const NActors::TActorContext& /*ctx*/) { Schedule(WAKEUP_INTERVAL, new TEvKafka::TEvWakeup()); Become(&TKafkaProduceActor::StateWork); @@ -231,7 +238,7 @@ size_t TKafkaProduceActor::EnqueueInitialization() { for(const auto& e : Requests) { auto* r = e->Get()->Request; for(const auto& topicData : r->TopicData) { - const auto& topicPath = NormalizePath(Context->Database, *topicData.Name); + const auto& topicPath = NormalizePath(Context->DatabasePath, *topicData.Name); if (!Topics.contains(topicPath)) { requireInitialization = true; TopicsForInitialization.insert(topicPath); @@ -324,7 +331,7 @@ void TKafkaProduceActor::ProcessRequest(TPendingRequest::TPtr pendingRequest, co size_t position = 0; for(const auto& topicData : r->TopicData) { - const TString& topicPath = NormalizePath(Context->Database, *topicData.Name); + const TString& topicPath = NormalizePath(Context->DatabasePath, *topicData.Name); for(const auto& partitionData : topicData.PartitionData) { const auto partitionId = partitionData.Index; @@ -438,35 +445,29 @@ EKafkaErrors Convert(TEvPartitionWriter::TEvWriteResponse::EErrors value) { void TKafkaProduceActor::SendResults(const TActorContext& ctx) { auto expireTime = ctx.Now() - REQUEST_EXPIRATION_INTERVAL; - KAFKA_LOG_T("Sending results. QueueSize= " << PendingRequests.size() << ", ExpirationTime=" << expireTime); // We send the results in the order of receipt of the request while (!PendingRequests.empty()) { auto pendingRequest = PendingRequests.front(); - + auto* request = pendingRequest->Request->Get()->Request; + auto correlationId = pendingRequest->Request->Get()->CorrelationId; + EKafkaErrors metricsErrorCode = EKafkaErrors::NONE_ERROR; + // We send the response by timeout. This is possible, for example, if the event was lost or the PartitionWrite died. bool expired = expireTime > pendingRequest->StartTime; - if (!expired && !pendingRequest->WaitResultCookies.empty()) { - return; - } - - auto* r = pendingRequest->Request->Get()->Request; - auto correlationId = pendingRequest->Request->Get()->CorrelationId; - KAFKA_LOG_D("Send result for correlationId " << correlationId << ". Expired=" << expired); - const auto topicsCount = r->TopicData.size(); + const auto topicsCount = request->TopicData.size(); auto response = std::make_shared<TProduceResponseData>(); response->Responses.resize(topicsCount); size_t position = 0; for(size_t i = 0; i < topicsCount; ++i) { - const auto& topicData = r->TopicData[i]; + const auto& topicData = request->TopicData[i]; const auto partitionCount = topicData.PartitionData.size(); - auto& topicResponse = response->Responses[i]; topicResponse.Name = topicData.Name; topicResponse.PartitionResponses.resize(partitionCount); @@ -475,35 +476,40 @@ void TKafkaProduceActor::SendResults(const TActorContext& ctx) { const auto& partitionData = topicData.PartitionData[j]; auto& partitionResponse = topicResponse.PartitionResponses[j]; const auto& result = pendingRequest->Results[position++]; - + size_t recordsCount = partitionData.Records.has_value() ? partitionData.Records->Records.size() : 0; partitionResponse.Index = partitionData.Index; - - if (EKafkaErrors::NONE_ERROR != result.ErrorCode) { + if (expired || pendingRequest->WaitResultCookies.empty()) { + SendMetrics(TStringBuilder() << topicData.Name, recordsCount, "failed_messages", ctx); + } else if (EKafkaErrors::NONE_ERROR != result.ErrorCode ) { KAFKA_LOG_T("Partition result with error: ErrorCode=" << static_cast<int>(result.ErrorCode) << ", ErrorMessage=" << result.ErrorMessage); partitionResponse.ErrorCode = result.ErrorCode; + metricsErrorCode = result.ErrorCode; partitionResponse.ErrorMessage = result.ErrorMessage; + SendMetrics(TStringBuilder() << topicData.Name, recordsCount, "failed_messages", ctx); } else { auto* msg = result.Value->Get(); if (msg->IsSuccess()) { KAFKA_LOG_T("Partition result success."); partitionResponse.ErrorCode = EKafkaErrors::NONE_ERROR; auto& writeResults = msg->Record.GetPartitionResponse().GetCmdWriteResult(); - if (!writeResults.empty()) { + SendMetrics(TStringBuilder() << topicData.Name, writeResults.size(), "successful_messages", ctx); auto& lastResult = writeResults.at(writeResults.size() - 1); partitionResponse.LogAppendTimeMs = lastResult.GetWriteTimestampMS(); partitionResponse.BaseOffset = lastResult.GetSeqNo(); } } else { KAFKA_LOG_T("Partition result with error: ErrorCode=" << static_cast<int>(Convert(msg->GetError().Code)) << ", ErrorMessage=" << msg->GetError().Reason); + SendMetrics(TStringBuilder() << topicData.Name, recordsCount, "failed_messages", ctx); partitionResponse.ErrorCode = Convert(msg->GetError().Code); + metricsErrorCode = Convert(msg->GetError().Code); partitionResponse.ErrorMessage = msg->GetError().Reason; } } } } - Send(Context->ConnectionId, new TEvKafka::TEvResponse(correlationId, response)); + Send(Context->ConnectionId, new TEvKafka::TEvResponse(correlationId, response, metricsErrorCode)); if (!pendingRequest->WaitAcceptingCookies.empty()) { if (!expired) { diff --git a/ydb/core/kafka_proxy/actors/kafka_produce_actor.h b/ydb/core/kafka_proxy/actors/kafka_produce_actor.h index 70f4d09ab9d..822c880fc30 100644 --- a/ydb/core/kafka_proxy/actors/kafka_produce_actor.h +++ b/ydb/core/kafka_proxy/actors/kafka_produce_actor.h @@ -131,6 +131,7 @@ private: TString LogPrefix(); void LogEvent(IEventHandle& ev); + void SendMetrics(const TString& topicName, size_t delta, const TString& name, const TActorContext& ctx); private: const TContext::TPtr Context; diff --git a/ydb/core/kafka_proxy/actors/kafka_sasl_auth_actor.cpp b/ydb/core/kafka_proxy/actors/kafka_sasl_auth_actor.cpp index e0c3bab0ce8..4d7f52b957a 100644 --- a/ydb/core/kafka_proxy/actors/kafka_sasl_auth_actor.cpp +++ b/ydb/core/kafka_proxy/actors/kafka_sasl_auth_actor.cpp @@ -10,7 +10,7 @@ namespace NKafka { -static constexpr char ERROR_AUTH_BYTES[] = ""; +static constexpr char EMPTY_AUTH_BYTES[] = ""; NActors::IActor* CreateKafkaSaslAuthActor(const TContext::TPtr context, const ui64 correlationId, const NKikimr::NRawSocket::TSocketDescriptor::TSocketAddressType address, const TSaslAuthenticateRequestData* message) { return new TKafkaSaslAuthActor(context, correlationId, address, message); @@ -18,14 +18,14 @@ NActors::IActor* CreateKafkaSaslAuthActor(const TContext::TPtr context, const ui void TKafkaSaslAuthActor::Bootstrap(const NActors::TActorContext& ctx) { if (Context->AuthenticationStep != EAuthSteps::WAIT_AUTH) { - SendAuthFailedAndDie(EKafkaErrors::ILLEGAL_SASL_STATE, + SendResponseAndDie(EKafkaErrors::ILLEGAL_SASL_STATE, "Request is not valid given the current SASL state.", TStringBuilder() << "Current step: " << static_cast<int>(Context->AuthenticationStep), ctx); return; } if (Context->SaslMechanism != "PLAIN") { - SendAuthFailedAndDie(EKafkaErrors::UNSUPPORTED_SASL_MECHANISM, + SendResponseAndDie(EKafkaErrors::UNSUPPORTED_SASL_MECHANISM, "Does not support the requested SASL mechanism.", TStringBuilder() << "Requested mechanism '" << Context->SaslMechanism << "'", ctx); @@ -35,33 +35,25 @@ void TKafkaSaslAuthActor::Bootstrap(const NActors::TActorContext& ctx) { StartPlainAuth(ctx); } -void TKafkaSaslAuthActor::PassAway() { - if (SubscriberId) { - Send(SubscriberId, new TEvents::TEvPoison()); - } - TActorBootstrapped::PassAway(); -} - void TKafkaSaslAuthActor::StartPlainAuth(const NActors::TActorContext& ctx) { TAuthData authData; if (!TryParseAuthDataTo(authData, ctx)) { return; } - Database = CanonizePath(authData.Database); + + DatabasePath = CanonizePath(authData.Database); SendLoginRequest(authData, ctx); SendDescribeRequest(ctx); } void TKafkaSaslAuthActor::Handle(NKikimr::TEvTicketParser::TEvAuthorizeTicketResult::TPtr& ev, const NActors::TActorContext& ctx) { if (ev->Get()->Error) { - SendAuthFailedAndDie(EKafkaErrors::SASL_AUTHENTICATION_FAILED, "", ev->Get()->Error.Message, ctx); + SendResponseAndDie(EKafkaErrors::SASL_AUTHENTICATION_FAILED, "", ev->Get()->Error.Message, ctx); return; } Authentificated = true; - - Token = ev->Get()->Token; - + UserToken = ev->Get()->Token; ReplyIfReady(ctx); } @@ -78,33 +70,47 @@ void TKafkaSaslAuthActor::ReplyIfReady(const NActors::TActorContext& ctx) { return; } - KAFKA_LOG_D("Authentificated success. Database='" << Database << "', " + SendResponseAndDie(EKafkaErrors::NONE_ERROR, "", "", ctx); +} + +void TKafkaSaslAuthActor::SendResponseAndDie(EKafkaErrors errorCode, const TString& errorMessage, const TString& details, const NActors::TActorContext& ctx) { + auto isFailed = errorCode != EKafkaErrors::NONE_ERROR; + + auto responseToClient = std::make_shared<TSaslAuthenticateResponseData>(); + responseToClient->ErrorCode = errorCode; + responseToClient->AuthBytes = TKafkaRawBytes(EMPTY_AUTH_BYTES, sizeof(EMPTY_AUTH_BYTES)); + + if (isFailed) { + KAFKA_LOG_ERROR("Authentication failure. " << errorMessage << " " << details); + responseToClient->ErrorMessage = TStringBuilder() << "Authentication failure. " << errorMessage; + + auto evResponse = std::make_shared<TEvKafka::TEvResponse>(CorrelationId, responseToClient, errorCode); + auto authResult = new TEvKafka::TEvAuthResult(EAuthSteps::FAILED, evResponse, errorMessage); + Send(Context->ConnectionId, authResult); + } else { + KAFKA_LOG_D("Authentificated success. Database='" << DatabasePath << "', " << "FolderId='" << FolderId << "', " << "ServiceAccountId='" << ServiceAccountId << "', " << "DatabaseId='" << DatabaseId << "', " << "Coordinator='" << Coordinator << "', " << "ResourcePath='" << ResourcePath << "'"); + responseToClient->ErrorMessage = ""; - auto responseToClient = std::make_shared<TSaslAuthenticateResponseData>(); - responseToClient->ErrorCode = EKafkaErrors::NONE_ERROR; - responseToClient->ErrorMessage = ""; - responseToClient->AuthBytes = TKafkaRawBytes(ERROR_AUTH_BYTES, sizeof(ERROR_AUTH_BYTES)); - - auto evResponse = std::make_shared<TEvKafka::TEvResponse>(CorrelationId, responseToClient); - - auto authResult = new TEvKafka::TEvAuthResult(EAuthSteps::SUCCESS, evResponse, Token, Database, FolderId, ServiceAccountId, DatabaseId, Coordinator, ResourcePath); - Send(Context->ConnectionId, authResult); - + auto evResponse = std::make_shared<TEvKafka::TEvResponse>(CorrelationId, responseToClient, errorCode); + auto authResult = new TEvKafka::TEvAuthResult(EAuthSteps::SUCCESS, evResponse, UserToken, DatabasePath, DatabaseId, FolderId, CloudId, ServiceAccountId, Coordinator, ResourcePath, errorMessage); + Send(Context->ConnectionId, authResult); + } + Die(ctx); } void TKafkaSaslAuthActor::Handle(TEvPrivate::TEvAuthFailed::TPtr& ev, const NActors::TActorContext& ctx) { - SendAuthFailedAndDie(EKafkaErrors::SASL_AUTHENTICATION_FAILED, "", ev->Get()->ErrorMessage, ctx); + SendResponseAndDie(EKafkaErrors::SASL_AUTHENTICATION_FAILED, "", ev->Get()->ErrorMessage, ctx); } bool TKafkaSaslAuthActor::TryParseAuthDataTo(TKafkaSaslAuthActor::TAuthData& authData, const NActors::TActorContext& ctx) { if (!AuthenticateRequestData->AuthBytes.has_value()) { - SendAuthFailedAndDie(EKafkaErrors::SASL_AUTHENTICATION_FAILED, "", "AuthBytes is empty.", ctx); + SendResponseAndDie(EKafkaErrors::SASL_AUTHENTICATION_FAILED, "", "AuthBytes is empty.", ctx); return false; } @@ -112,7 +118,7 @@ bool TKafkaSaslAuthActor::TryParseAuthDataTo(TKafkaSaslAuthActor::TAuthData& aut TString auth(rawAuthBytes.data(), rawAuthBytes.size()); TVector<TString> tokens = StringSplitter(auth).Split('\0'); if (tokens.size() != 3) { - SendAuthFailedAndDie(EKafkaErrors::SASL_AUTHENTICATION_FAILED, TStringBuilder() << "Invalid SASL/PLAIN response: expected 3 tokens, got " << tokens.size(), "", ctx); + SendResponseAndDie(EKafkaErrors::SASL_AUTHENTICATION_FAILED, TStringBuilder() << "Invalid SASL/PLAIN response: expected 3 tokens, got " << tokens.size(), "", ctx); return false; } @@ -121,7 +127,7 @@ bool TKafkaSaslAuthActor::TryParseAuthDataTo(TKafkaSaslAuthActor::TAuthData& aut auto password = tokens[2]; size_t atPos = userAndDatabase.rfind('@'); if (atPos == TString::npos) { - SendAuthFailedAndDie(EKafkaErrors::SASL_AUTHENTICATION_FAILED, "Database not provided.", "", ctx); + SendResponseAndDie(EKafkaErrors::SASL_AUTHENTICATION_FAILED, "Database not provided.", "", ctx); return false; } @@ -131,21 +137,6 @@ bool TKafkaSaslAuthActor::TryParseAuthDataTo(TKafkaSaslAuthActor::TAuthData& aut return true; } -void TKafkaSaslAuthActor::SendAuthFailedAndDie(EKafkaErrors errorCode, const TString& errorMessage, const TString& details, const NActors::TActorContext& ctx) { - KAFKA_LOG_ERROR("Authentication failure. " << errorMessage << " " << details); - - auto responseToClient = std::make_shared<TSaslAuthenticateResponseData>(); - responseToClient->ErrorCode = errorCode; - responseToClient->ErrorMessage = TStringBuilder() << "Authentication failure. " << errorMessage; - responseToClient->AuthBytes = TKafkaRawBytes(ERROR_AUTH_BYTES, sizeof(ERROR_AUTH_BYTES)); - - auto evResponse = std::make_shared<TEvKafka::TEvResponse>(CorrelationId, responseToClient); - auto authResult = new TEvKafka::TEvAuthResult(EAuthSteps::FAILED, evResponse, errorMessage); - Send(Context->ConnectionId, authResult); - - Die(ctx); -} - void TKafkaSaslAuthActor::SendLoginRequest(TKafkaSaslAuthActor::TAuthData authData, const NActors::TActorContext& ctx) { Ydb::Auth::LoginRequest request; request.set_user(authData.UserName); @@ -174,48 +165,33 @@ void TKafkaSaslAuthActor::SendLoginRequest(TKafkaSaslAuthActor::TAuthData authDa }); } -void TKafkaSaslAuthActor::SendDescribeRequest(const NActors::TActorContext& /*ctx*/) { - if (Database.Empty()) { - Described = true; - return; - } - - KAFKA_LOG_D("Describe database '" << Database << "'"); - SubscriberId = Register(CreateSchemeBoardSubscriber(SelfId(), Database)); +void TKafkaSaslAuthActor::SendDescribeRequest(const TActorContext& ctx) { + auto schemeCacheRequest = std::make_unique<NKikimr::NSchemeCache::TSchemeCacheNavigate>(); + NKikimr::NSchemeCache::TSchemeCacheNavigate::TEntry entry; + entry.Path = NKikimr::SplitPath(DatabasePath); + entry.Operation = NKikimr::NSchemeCache::TSchemeCacheNavigate::OpPath; + entry.SyncVersion = false; + schemeCacheRequest->ResultSet.emplace_back(entry); + ctx.Send(NKikimr::MakeSchemeCacheID(), MakeHolder<NKikimr::TEvTxProxySchemeCache::TEvNavigateKeySet>(schemeCacheRequest.release())); } -void TKafkaSaslAuthActor::Handle(TSchemeBoardEvents::TEvNotifyUpdate::TPtr& ev, const TActorContext& ctx) { - Described = true; - - auto* result = ev->Get(); - auto status = result->DescribeSchemeResult.GetStatus(); - if (status != NKikimrScheme::EStatus::StatusSuccess) { - KAFKA_LOG_ERROR("Describe database '" << Database << "' error: " << status); - ReplyIfReady(ctx); +void TKafkaSaslAuthActor::Handle(NKikimr::TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev, const TActorContext& ctx) { + const NKikimr::NSchemeCache::TSchemeCacheNavigate* navigate = ev->Get()->Request.Get(); + if (navigate->ErrorCount) { + SendResponseAndDie(EKafkaErrors::SASL_AUTHENTICATION_FAILED, "", TStringBuilder() << "Database with path '" << DatabasePath << "' doesn't exists", ctx); return; } - - for(const auto& attr : result->DescribeSchemeResult.GetPathDescription().GetUserAttributes()) { - const auto& key = attr.GetKey(); - const auto& value = attr.GetValue(); - - KAFKA_LOG_D("Database attribute key=" << key << ", value=" << value); - - if (key == "folder_id") { - FolderId = value; - } else if (key == "service_account_id") { - ServiceAccountId = value; - } else if (key == "database_id") { - DatabaseId = value; - } else if (key == "serverless_rt_coordination_node_path") { - Coordinator = value; - } else if (key == "serverless_rt_base_resource_ru") { - ResourcePath = value; - } + Y_VERIFY(navigate->ResultSet.size() == 1); + for (const auto& attr : navigate->ResultSet.front().Attributes) { + if (attr.first == "folder_id") FolderId = attr.second; + if (attr.first == "cloud_id") CloudId = attr.second; + if (attr.first == "database_id") DatabaseId = attr.second; + if (attr.first == "service_account_id") ServiceAccountId = attr.second; + if (attr.first == "serverless_rt_coordination_node_path") Coordinator = attr.second; + if (attr.first == "serverless_rt_base_resource_ru") ResourcePath = attr.second; } - + Described = true; ReplyIfReady(ctx); } - } // NKafka diff --git a/ydb/core/kafka_proxy/actors/kafka_sasl_auth_actor.h b/ydb/core/kafka_proxy/actors/kafka_sasl_auth_actor.h index 8c3007096f5..b03b8869fda 100644 --- a/ydb/core/kafka_proxy/actors/kafka_sasl_auth_actor.h +++ b/ydb/core/kafka_proxy/actors/kafka_sasl_auth_actor.h @@ -5,6 +5,8 @@ #include <ydb/core/tx/scheme_board/events.h> #include <ydb/core/tx/scheme_cache/scheme_cache.h> #include <ydb/public/api/grpc/ydb_auth_v1.grpc.pb.h> +#include <ydb/core/tx/scheme_cache/scheme_cache.h> +#include <ydb/public/api/grpc/ydb_auth_v1.grpc.pb.h> #include <library/cpp/actors/core/actor_bootstrapped.h> #include "actors.h" @@ -57,26 +59,23 @@ private: HFunc(NKikimr::TEvTicketParser::TEvAuthorizeTicketResult, Handle); HFunc(TEvPrivate::TEvTokenReady, Handle); HFunc(TEvPrivate::TEvAuthFailed, Handle); - HFunc(TSchemeBoardEvents::TEvNotifyUpdate, Handle); + HFunc(TEvTxProxySchemeCache::TEvNavigateKeySetResult, Handle); } } void Handle(NKikimr::TEvTicketParser::TEvAuthorizeTicketResult::TPtr& ev, const NActors::TActorContext& ctx); void Handle(TEvPrivate::TEvTokenReady::TPtr& ev, const NActors::TActorContext& ctx); void Handle(TEvPrivate::TEvAuthFailed::TPtr& ev, const NActors::TActorContext& ctx); - void Handle(TSchemeBoardEvents::TEvNotifyUpdate::TPtr& ev, const TActorContext& ctx); + void Handle(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev, const TActorContext& ctx); void StartPlainAuth(const NActors::TActorContext& ctx); void SendLoginRequest(TKafkaSaslAuthActor::TAuthData authData, const NActors::TActorContext& ctx); void SendDescribeRequest(const NActors::TActorContext& ctx); - void SendAuthFailedAndDie(EKafkaErrors errorCode, const TString& errorMessage, const TString& details, const NActors::TActorContext& ctx); bool TryParseAuthDataTo(TKafkaSaslAuthActor::TAuthData& authData, const NActors::TActorContext& ctx); - + void SendResponseAndDie(EKafkaErrors errorCode, const TString& errorMessage, const TString& details, const NActors::TActorContext& ctx); + void ReplyIfReady(const NActors::TActorContext& ctx); -protected: - void PassAway() override; - private: const TContext::TPtr Context; const ui64 CorrelationId; @@ -84,17 +83,17 @@ private: const TSaslAuthenticateRequestData* AuthenticateRequestData; const NKikimr::NRawSocket::TNetworkConfig::TSocketAddressType Address; - TString Database; - TIntrusiveConstPtr<NACLib::TUserToken> Token; + TString DatabasePath; + TIntrusiveConstPtr<NACLib::TUserToken> UserToken; TString FolderId; TString ServiceAccountId; TString DatabaseId; TString Coordinator; TString ResourcePath; + TString CloudId; bool Authentificated = false; bool Described = false; - TActorId SubscriberId; }; } // NKafka diff --git a/ydb/core/kafka_proxy/actors/kafka_sasl_handshake_actor.cpp b/ydb/core/kafka_proxy/actors/kafka_sasl_handshake_actor.cpp index c768987ae13..931bad1d3d9 100644 --- a/ydb/core/kafka_proxy/actors/kafka_sasl_handshake_actor.cpp +++ b/ydb/core/kafka_proxy/actors/kafka_sasl_handshake_actor.cpp @@ -34,7 +34,7 @@ void TKafkaSaslHandshakeActor::SendResponse(const TString& errorMessage, EKafkaE responseToClient->ErrorCode = kafkaError; responseToClient->Mechanisms.insert(responseToClient->Mechanisms.end(), SUPPORTED_SASL_MECHANISMS.begin(), SUPPORTED_SASL_MECHANISMS.end()); - auto evResponse = std::make_shared<TEvKafka::TEvResponse>(CorrelationId, responseToClient); + auto evResponse = std::make_shared<TEvKafka::TEvResponse>(CorrelationId, responseToClient, kafkaError); auto handshakeResult = new TEvKafka::TEvHandshakeResult(authStep, evResponse, saslMechanism, errorMessage); Send(Context->ConnectionId, handshakeResult); } diff --git a/ydb/core/kafka_proxy/kafka_connection.cpp b/ydb/core/kafka_proxy/kafka_connection.cpp index 061af4d3e0d..8f7ea0ce1bb 100644 --- a/ydb/core/kafka_proxy/kafka_connection.cpp +++ b/ydb/core/kafka_proxy/kafka_connection.cpp @@ -6,6 +6,7 @@ #include "kafka_events.h" #include "kafka_log_impl.h" #include "actors/actors.h" +#include "kafka_metrics.h" #include <strstream> #include <sstream> @@ -38,7 +39,11 @@ public: TRequestHeaderData Header; std::unique_ptr<TApiMessage> Message; + TInstant StartTime; + TString Method; + TApiMessage::TPtr Response; + EKafkaErrors ResponseErrorCode; }; static constexpr TDuration InactivityTimeout = TDuration::Minutes(10); @@ -161,6 +166,24 @@ protected: return sb; } + void SendRequestMetrics(const TActorContext& ctx) { + if (Context) { + ctx.Send(MakeKafkaMetricsServiceID(), + new TEvKafka::TEvUpdateCounter(1, BuildLabels(Context, Request->Method, "", "api.kafka.request.count", ""))); + ctx.Send(MakeKafkaMetricsServiceID(), + new TEvKafka::TEvUpdateCounter(Request->Size, BuildLabels(Context, Request->Method, "", "api.kafka.request.bytes", ""))); + } + } + + void SendResponseMetrics(const TString method, const TInstant requestStartTime, EKafkaErrors errorCode, const TActorContext& ctx) { + TDuration duration = TInstant::Now() - requestStartTime; + ctx.Send(MakeKafkaMetricsServiceID(), + new TEvKafka::TEvUpdateHistCounter(static_cast<i64>(duration.MilliSeconds()), 1, BuildLabels(Context, method, "", "api.kafka.response.duration_milliseconds", "") + )); + ctx.Send(MakeKafkaMetricsServiceID(), + new TEvKafka::TEvUpdateCounter(1, BuildLabels(Context, method, "", "api.kafka.response.count", TStringBuilder() << (i16)errorCode))); + } + void OnAccept() { InactivityTimer.Reset(); TBase::Become(&TKafkaConnection::StateConnected); @@ -220,13 +243,13 @@ protected: KAFKA_LOG_D("process message: ApiKey=" << Request->Header.RequestApiKey << ", ExpectedSize=" << Request->ExpectedSize << ", Size=" << Request->Size); - Msg::TPtr r = Request; + Request->Method = EApiKeyNames.find(static_cast<EApiKey>(Request->Header.RequestApiKey))->second; - PendingRequestsQueue.push_back(r); - PendingRequests[r->Header.CorrelationId] = r; + PendingRequestsQueue.push_back(Request); + PendingRequests[Request->Header.CorrelationId] = Request; TApiMessage* message = Request->Message.get(); - + SendRequestMetrics(ctx); switch (Request->Header.RequestApiKey) { case PRODUCE: HandleMessage(&Request->Header, dynamic_cast<TProduceRequestData*>(message), ctx); @@ -260,12 +283,12 @@ protected: void Handle(TEvKafka::TEvResponse::TPtr response, const TActorContext& ctx) { auto r = response->Get(); - Reply(r->CorrelationId, r->Response, ctx); + Reply(r->CorrelationId, r->Response, r->ErrorCode, ctx); } void Handle(TEvKafka::TEvAuthResult::TPtr ev, const TActorContext& ctx) { auto event = ev->Get(); - Reply(event->ClientResponse->CorrelationId, event->ClientResponse->Response, ctx); + Reply(event->ClientResponse->CorrelationId, event->ClientResponse->Response, event->ClientResponse->ErrorCode, ctx); auto authStep = event->AuthStep; if (authStep == EAuthSteps::FAILED) { @@ -275,16 +298,19 @@ protected: } Context->UserToken = event->UserToken; - Context->Database = event->Database; + Context->DatabasePath = event->DatabasePath; Context->AuthenticationStep = authStep; - Context->RlContext = {event->Coordinator, event->ResourcePath, event->Database, event->UserToken->GetSerializedToken()}; + Context->RlContext = {event->Coordinator, event->ResourcePath, event->DatabasePath, event->UserToken->GetSerializedToken()}; + Context->DatabaseId = event->DatabaseId; + Context->CloudId = event->CloudId; + Context->FolderId = event->FolderId; KAFKA_LOG_D("Authentificated successful. SID=" << Context->UserToken->GetUserSID()); } void Handle(TEvKafka::TEvHandshakeResult::TPtr ev, const TActorContext& ctx) { auto event = ev->Get(); - Reply(event->ClientResponse->CorrelationId, event->ClientResponse->Response, ctx); + Reply(event->ClientResponse->CorrelationId, event->ClientResponse->Response, event->ClientResponse->ErrorCode, ctx); auto authStep = event->AuthStep; if (authStep == EAuthSteps::FAILED) { @@ -297,7 +323,7 @@ protected: Context->AuthenticationStep = authStep; } - void Reply(const ui64 correlationId, TApiMessage::TPtr response, const TActorContext& ctx) { + void Reply(const ui64 correlationId, TApiMessage::TPtr response, EKafkaErrors errorCode, const TActorContext& ctx) { auto it = PendingRequests.find(correlationId); if (it == PendingRequests.end()) { KAFKA_LOG_ERROR("Unexpected correlationId " << correlationId); @@ -306,21 +332,22 @@ protected: auto request = it->second; request->Response = response; + request->ResponseErrorCode = errorCode; request->Buffer.Clear(); - ProcessReplyQueue(); + ProcessReplyQueue(ctx); DoRead(ctx); } - void ProcessReplyQueue() { + void ProcessReplyQueue(const TActorContext& ctx) { while(!PendingRequestsQueue.empty()) { auto& request = PendingRequestsQueue.front(); if (request->Response.get() == nullptr) { break; } - Reply(&request->Header, request->Response.get()); + Reply(&request->Header, request->Response.get(), request->Method, request->StartTime, request->ResponseErrorCode, ctx); InflightSize -= request->ExpectedSize; @@ -329,7 +356,7 @@ protected: } } - void Reply(const TRequestHeaderData* header, const TApiMessage* reply) { + void Reply(const TRequestHeaderData* header, const TApiMessage* reply, const TString method, const TInstant requestStartTime, EKafkaErrors errorCode, const TActorContext& ctx) { TKafkaVersion headerVersion = ResponseHeaderVersion(header->RequestApiKey, header->RequestApiVersion); TKafkaVersion version = header->RequestApiVersion; @@ -340,7 +367,7 @@ protected: TBufferedWriter buffer(Socket.Get(), Context->Config.GetPacketSize()); TKafkaWritable writable(buffer); - + SendResponseMetrics(method, requestStartTime, errorCode, ctx); try { writable << size; responseHeader.Write(writable, headerVersion); @@ -360,7 +387,6 @@ protected: void DoRead(const TActorContext& ctx) { KAFKA_LOG_T("DoRead: Demand=" << Demand.Length << ", Step=" << static_cast<i32>(Step)); - for (;;) { while (Demand) { ssize_t received = 0; @@ -459,9 +485,8 @@ protected: break; case MESSAGE_PROCESS: - KAFKA_LOG_D("received message. ApiKey=" << Request->ApiKey << ", Version=" << Request->ApiVersion << ", CorrelationId=" << Request->CorrelationId); - - TKafkaReadable readable(Request->Buffer); + Request->StartTime = TInstant::Now(); + KAFKA_LOG_D("received message. ApiKey=" << Request->ApiKey << ", Version=" << Request->ApiVersion << ", CorrelationId=" << Request->CorrelationId);TKafkaReadable readable(Request->Buffer); try { Request->Message = CreateRequest(Request->ApiKey); diff --git a/ydb/core/kafka_proxy/kafka_events.h b/ydb/core/kafka_proxy/kafka_events.h index f9577438dfb..48f03f403da 100644 --- a/ydb/core/kafka_proxy/kafka_events.h +++ b/ydb/core/kafka_proxy/kafka_events.h @@ -18,6 +18,8 @@ struct TEvKafka { EvAuthResult, EvHandshakeResult, EvWakeup, + EvUpdateCounter, + EvUpdateHistCounter, EvResponse = EvRequest + 256, EvInternalEvents = EvResponse + 256, EvEnd @@ -39,27 +41,31 @@ struct TEvKafka { }; struct TEvResponse : public TEventLocal<TEvResponse, EvResponse> { - TEvResponse(const ui64 correlationId, const TApiMessage::TPtr response) + TEvResponse(const ui64 correlationId, const TApiMessage::TPtr response, EKafkaErrors errorCode) : CorrelationId(correlationId) - , Response(std::move(response)) { + , Response(std::move(response)) + , ErrorCode(errorCode) { } const ui64 CorrelationId; const TApiMessage::TPtr Response; + const EKafkaErrors ErrorCode; }; struct TEvAuthResult : public TEventLocal<TEvAuthResult, EvAuthResult> { + TEvAuthResult(EAuthSteps authStep, std::shared_ptr<TEvKafka::TEvResponse> clientResponse, TString error = "") : AuthStep(authStep) , Error(error) , ClientResponse(clientResponse) { } - TEvAuthResult(EAuthSteps authStep, std::shared_ptr<TEvKafka::TEvResponse> clientResponse, TIntrusiveConstPtr<NACLib::TUserToken> token, TString database, - TString folderId, TString serviceAccountId, TString databaseId, TString coordinator, TString resourcePath, TString error = "") + TEvAuthResult(EAuthSteps authStep, std::shared_ptr<TEvKafka::TEvResponse> clientResponse, TIntrusiveConstPtr<NACLib::TUserToken> token, TString databasePath, TString databaseId, + TString folderId, TString cloudId, TString serviceAccountId, TString coordinator, TString resourcePath, TString error = "") : AuthStep(authStep) , UserToken(token) - , Database(database) + , DatabasePath(databasePath) + , CloudId(cloudId) , FolderId(folderId) , ServiceAccountId(serviceAccountId) , DatabaseId(databaseId) @@ -71,7 +77,8 @@ struct TEvKafka { EAuthSteps AuthStep; TIntrusiveConstPtr<NACLib::TUserToken> UserToken; - TString Database; + TString DatabasePath; + TString CloudId; TString FolderId; TString ServiceAccountId; TString DatabaseId; @@ -97,6 +104,28 @@ struct TEvKafka { std::shared_ptr<TEvKafka::TEvResponse> ClientResponse; }; + struct TEvUpdateCounter : public TEventLocal<TEvUpdateCounter, EvUpdateCounter> { + i64 Delta; + TVector<std::pair<TString, TString>> Labels; + + TEvUpdateCounter(const i64 delta, const TVector<std::pair<TString, TString>> labels) + : Delta(delta) + , Labels(labels) + {} + }; + + struct TEvUpdateHistCounter : public TEventLocal<TEvUpdateHistCounter, EvUpdateHistCounter> { + i64 Value; + ui64 Count; + TVector<std::pair<TString, TString>> Labels; + + TEvUpdateHistCounter(const i64 value, const ui64 count, const TVector<std::pair<TString, TString>> labels) + : Value(value) + , Count(count) + , Labels(labels) + {} + }; + struct TEvWakeup : public TEventLocal<TEvWakeup, EvWakeup> { }; }; diff --git a/ydb/core/kafka_proxy/kafka_messages.cpp b/ydb/core/kafka_proxy/kafka_messages.cpp index 68d4a0777ca..52cb51d18f1 100644 --- a/ydb/core/kafka_proxy/kafka_messages.cpp +++ b/ydb/core/kafka_proxy/kafka_messages.cpp @@ -6,6 +6,18 @@ namespace NKafka { +const std::unordered_map<EApiKey, TString> EApiKeyNames = { + {EApiKey::HEADER, "HEADER"}, + {EApiKey::PRODUCE, "PRODUCE"}, + {EApiKey::FETCH, "FETCH"}, + {EApiKey::METADATA, "METADATA"}, + {EApiKey::SASL_HANDSHAKE, "SASL_HANDSHAKE"}, + {EApiKey::API_VERSIONS, "API_VERSIONS"}, + {EApiKey::INIT_PRODUCER_ID, "INIT_PRODUCER_ID"}, + {EApiKey::SASL_AUTHENTICATE, "SASL_AUTHENTICATE"}, +}; + + std::unique_ptr<TApiMessage> CreateRequest(i16 apiKey) { switch (apiKey) { case PRODUCE: diff --git a/ydb/core/kafka_proxy/kafka_messages.h b/ydb/core/kafka_proxy/kafka_messages.h index 0d63f9fd9e7..6c795e4bf75 100644 --- a/ydb/core/kafka_proxy/kafka_messages.h +++ b/ydb/core/kafka_proxy/kafka_messages.h @@ -25,7 +25,7 @@ enum EApiKey { SASL_AUTHENTICATE = 36, // [ZK_BROKER, BROKER, CONTROLLER] }; - +extern const std::unordered_map<EApiKey, TString> EApiKeyNames; diff --git a/ydb/core/kafka_proxy/kafka_metrics.cpp b/ydb/core/kafka_proxy/kafka_metrics.cpp new file mode 100644 index 00000000000..b001d2e6c5c --- /dev/null +++ b/ydb/core/kafka_proxy/kafka_metrics.cpp @@ -0,0 +1,17 @@ +#include "kafka_events.h" +#include "kafka_metrics.h" + +namespace NKafka { + +TVector<std::pair<TString, TString>> BuildLabels(const NKafka::TContext::TPtr context, const TString& method, const TString& topic, const TString& name, const TString& errorCode) { + return {{"database", context->DatabasePath}, {"method", method}, {"cloud_id", context->CloudId}, + {"folder_id", context->FolderId}, {"database_id", context->DatabaseId}, + {"topic", topic}, {"errorCode", errorCode}, {"name", name}}; +} + +TActorId MakeKafkaMetricsServiceID() { + static const char x[12] = "kafka_mtrcs"; + return TActorId(0, TStringBuf(x, 12)); +} + +} // namespace NKafka diff --git a/ydb/core/kafka_proxy/kafka_metrics.h b/ydb/core/kafka_proxy/kafka_metrics.h new file mode 100644 index 00000000000..a18797e0ba3 --- /dev/null +++ b/ydb/core/kafka_proxy/kafka_metrics.h @@ -0,0 +1,11 @@ +#pragma once + +#include "kafka_events.h" + +namespace NKafka { + +TVector<std::pair<TString, TString>> BuildLabels(const NKafka::TContext::TPtr context, const TString& method, const TString& topic, const TString& name, const TString& errorCode); + +TActorId MakeKafkaMetricsServiceID(); + +} // namespace NKafka diff --git a/ydb/core/kafka_proxy/ya.make b/ydb/core/kafka_proxy/ya.make index 5696e1cdebd..832dfc03ed4 100644 --- a/ydb/core/kafka_proxy/ya.make +++ b/ydb/core/kafka_proxy/ya.make @@ -7,6 +7,7 @@ SRCS( actors/kafka_produce_actor.cpp actors/kafka_sasl_auth_actor.cpp actors/kafka_sasl_handshake_actor.cpp + actors/kafka_metrics_actor.cpp kafka_connection.cpp kafka_connection.h kafka_listener.h @@ -19,6 +20,7 @@ SRCS( kafka_messages_int.h kafka_proxy.h kafka_records.cpp + kafka_metrics.cpp ) PEERDIR( diff --git a/ydb/core/testlib/test_client.cpp b/ydb/core/testlib/test_client.cpp index 4650b63bc9b..6d98f1b4343 100644 --- a/ydb/core/testlib/test_client.cpp +++ b/ydb/core/testlib/test_client.cpp @@ -46,7 +46,9 @@ #include <ydb/core/security/ldap_auth_provider.h> #include <ydb/core/base/user_registry.h> #include <ydb/core/health_check/health_check.h> +#include <ydb/core/kafka_proxy/actors/kafka_metrics_actor.h> #include <ydb/core/kafka_proxy/kafka_listener.h> +#include <ydb/core/kafka_proxy/kafka_metrics.h> #include <ydb/core/kqp/common/kqp.h> #include <ydb/core/kqp/rm_service/kqp_rm_service.h> #include <ydb/core/kqp/proxy_service/kqp_proxy_service.h> @@ -940,6 +942,10 @@ namespace Tests { IActor* actor = NKafka::CreateKafkaListener(MakePollerActorId(), settings, Settings->AppConfig.GetKafkaProxyConfig()); TActorId actorId = Runtime->Register(actor, nodeIdx); Runtime->RegisterService(TActorId{}, actorId, nodeIdx); + + IActor* metricsActor = CreateKafkaMetricsActor(NKafka::TKafkaMetricsSettings{Runtime->GetAppData().Counters->GetSubgroup("counters", "kafka_proxy")}); + TActorId metricsActorId = Runtime->Register(metricsActor, nodeIdx); + Runtime->RegisterService(NKafka::MakeKafkaMetricsServiceID(), metricsActorId, nodeIdx); } if (Settings->EnableYq) { |