aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorsavnik <savnik@yandex-team.com>2023-08-31 10:12:48 +0300
committersavnik <savnik@yandex-team.com>2023-08-31 10:27:02 +0300
commitcee0e3df5d707640983570d06892747720e0bb61 (patch)
tree23a94ac7ffcda52f153bb504ba9c1bc3bc443e5e
parenteda974389da3036fc702fce50ab75ad71ad13bfb (diff)
downloadydb-cee0e3df5d707640983570d06892747720e0bb61.tar.gz
Kafka metrics
-rw-r--r--ydb/core/driver_lib/run/kikimr_services_initializers.cpp9
-rw-r--r--ydb/core/kafka_proxy/CMakeLists.darwin-x86_64.txt2
-rw-r--r--ydb/core/kafka_proxy/CMakeLists.linux-aarch64.txt2
-rw-r--r--ydb/core/kafka_proxy/CMakeLists.linux-x86_64.txt2
-rw-r--r--ydb/core/kafka_proxy/CMakeLists.windows-x86_64.txt2
-rw-r--r--ydb/core/kafka_proxy/actors/actors.h5
-rw-r--r--ydb/core/kafka_proxy/actors/kafka_api_versions_actor.cpp5
-rw-r--r--ydb/core/kafka_proxy/actors/kafka_init_producer_id_actor.cpp4
-rw-r--r--ydb/core/kafka_proxy/actors/kafka_metadata_actor.cpp5
-rw-r--r--ydb/core/kafka_proxy/actors/kafka_metadata_actor.h1
-rw-r--r--ydb/core/kafka_proxy/actors/kafka_metrics_actor.cpp79
-rw-r--r--ydb/core/kafka_proxy/actors/kafka_metrics_actor.h17
-rw-r--r--ydb/core/kafka_proxy/actors/kafka_produce_actor.cpp44
-rw-r--r--ydb/core/kafka_proxy/actors/kafka_produce_actor.h1
-rw-r--r--ydb/core/kafka_proxy/actors/kafka_sasl_auth_actor.cpp138
-rw-r--r--ydb/core/kafka_proxy/actors/kafka_sasl_auth_actor.h19
-rw-r--r--ydb/core/kafka_proxy/actors/kafka_sasl_handshake_actor.cpp2
-rw-r--r--ydb/core/kafka_proxy/kafka_connection.cpp63
-rw-r--r--ydb/core/kafka_proxy/kafka_events.h41
-rw-r--r--ydb/core/kafka_proxy/kafka_messages.cpp12
-rw-r--r--ydb/core/kafka_proxy/kafka_messages.h2
-rw-r--r--ydb/core/kafka_proxy/kafka_metrics.cpp17
-rw-r--r--ydb/core/kafka_proxy/kafka_metrics.h11
-rw-r--r--ydb/core/kafka_proxy/ya.make2
-rw-r--r--ydb/core/testlib/test_client.cpp6
25 files changed, 347 insertions, 144 deletions
diff --git a/ydb/core/driver_lib/run/kikimr_services_initializers.cpp b/ydb/core/driver_lib/run/kikimr_services_initializers.cpp
index 607364d78ef..919b2a9a6e9 100644
--- a/ydb/core/driver_lib/run/kikimr_services_initializers.cpp
+++ b/ydb/core/driver_lib/run/kikimr_services_initializers.cpp
@@ -65,6 +65,8 @@
#include <ydb/core/health_check/health_check.h>
+#include <ydb/core/kafka_proxy/actors/kafka_metrics_actor.h>
+#include <ydb/core/kafka_proxy/kafka_metrics.h>
#include <ydb/core/kafka_proxy/kafka_proxy.h>
#include <ydb/core/kqp/common/kqp.h>
@@ -2635,6 +2637,13 @@ void TKafkaProxyServiceInitializer::InitializeServices(NActors::TActorSystemSetu
TActorSetupCmd(NKafka::CreateKafkaListener(MakePollerActorId(), settings, Config.GetKafkaProxyConfig()),
TMailboxType::HTSwap, appData->UserPoolId)
);
+
+ IActor* metricsActor = CreateKafkaMetricsActor(NKafka::TKafkaMetricsSettings{appData->Counters->GetSubgroup("counters", "kafka_proxy")});
+ setup->LocalServices.emplace_back(
+ NKafka::MakeKafkaMetricsServiceID(),
+ TActorSetupCmd(metricsActor,
+ TMailboxType::HTSwap, appData->UserPoolId)
+ );
}
}
diff --git a/ydb/core/kafka_proxy/CMakeLists.darwin-x86_64.txt b/ydb/core/kafka_proxy/CMakeLists.darwin-x86_64.txt
index 4a2aad8d4a4..b5e422d9e17 100644
--- a/ydb/core/kafka_proxy/CMakeLists.darwin-x86_64.txt
+++ b/ydb/core/kafka_proxy/CMakeLists.darwin-x86_64.txt
@@ -26,8 +26,10 @@ target_sources(ydb-core-kafka_proxy PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/actors/kafka_produce_actor.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/actors/kafka_sasl_auth_actor.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/actors/kafka_sasl_handshake_actor.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/actors/kafka_metrics_actor.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_connection.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_messages.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_messages_int.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_records.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_metrics.cpp
)
diff --git a/ydb/core/kafka_proxy/CMakeLists.linux-aarch64.txt b/ydb/core/kafka_proxy/CMakeLists.linux-aarch64.txt
index 3c0e1dff11f..64ae0928906 100644
--- a/ydb/core/kafka_proxy/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/kafka_proxy/CMakeLists.linux-aarch64.txt
@@ -27,8 +27,10 @@ target_sources(ydb-core-kafka_proxy PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/actors/kafka_produce_actor.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/actors/kafka_sasl_auth_actor.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/actors/kafka_sasl_handshake_actor.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/actors/kafka_metrics_actor.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_connection.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_messages.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_messages_int.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_records.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_metrics.cpp
)
diff --git a/ydb/core/kafka_proxy/CMakeLists.linux-x86_64.txt b/ydb/core/kafka_proxy/CMakeLists.linux-x86_64.txt
index 3c0e1dff11f..64ae0928906 100644
--- a/ydb/core/kafka_proxy/CMakeLists.linux-x86_64.txt
+++ b/ydb/core/kafka_proxy/CMakeLists.linux-x86_64.txt
@@ -27,8 +27,10 @@ target_sources(ydb-core-kafka_proxy PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/actors/kafka_produce_actor.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/actors/kafka_sasl_auth_actor.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/actors/kafka_sasl_handshake_actor.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/actors/kafka_metrics_actor.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_connection.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_messages.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_messages_int.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_records.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_metrics.cpp
)
diff --git a/ydb/core/kafka_proxy/CMakeLists.windows-x86_64.txt b/ydb/core/kafka_proxy/CMakeLists.windows-x86_64.txt
index 4a2aad8d4a4..b5e422d9e17 100644
--- a/ydb/core/kafka_proxy/CMakeLists.windows-x86_64.txt
+++ b/ydb/core/kafka_proxy/CMakeLists.windows-x86_64.txt
@@ -26,8 +26,10 @@ target_sources(ydb-core-kafka_proxy PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/actors/kafka_produce_actor.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/actors/kafka_sasl_auth_actor.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/actors/kafka_sasl_handshake_actor.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/actors/kafka_metrics_actor.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_connection.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_messages.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_messages_int.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_records.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_metrics.cpp
)
diff --git a/ydb/core/kafka_proxy/actors/actors.h b/ydb/core/kafka_proxy/actors/actors.h
index 526328bddb9..88fe30d8c93 100644
--- a/ydb/core/kafka_proxy/actors/actors.h
+++ b/ydb/core/kafka_proxy/actors/actors.h
@@ -30,7 +30,10 @@ struct TContext {
EAuthSteps AuthenticationStep = EAuthSteps::WAIT_HANDSHAKE;
TString SaslMechanism;
- TString Database;
+ TString DatabasePath;
+ TString FolderId;
+ TString CloudId;
+ TString DatabaseId;
TIntrusiveConstPtr<NACLib::TUserToken> UserToken;
TString ClientDC;
diff --git a/ydb/core/kafka_proxy/actors/kafka_api_versions_actor.cpp b/ydb/core/kafka_proxy/actors/kafka_api_versions_actor.cpp
index bb24a616156..14a0e4931a0 100644
--- a/ydb/core/kafka_proxy/actors/kafka_api_versions_actor.cpp
+++ b/ydb/core/kafka_proxy/actors/kafka_api_versions_actor.cpp
@@ -10,6 +10,7 @@ NActors::IActor* CreateKafkaApiVersionsActor(const TContext::TPtr context, const
TApiVersionsResponseData::TPtr GetApiVersions() {
TApiVersionsResponseData::TPtr response = std::make_shared<TApiVersionsResponseData>();
+ response->ErrorCode = EKafkaErrors::NONE_ERROR;
response->ApiKeys.resize(6);
response->ApiKeys[0].ApiKey = PRODUCE;
@@ -41,8 +42,8 @@ TApiVersionsResponseData::TPtr GetApiVersions() {
void TKafkaApiVersionsActor::Bootstrap(const NActors::TActorContext& ctx) {
Y_UNUSED(Message);
-
- Send(Context->ConnectionId, new TEvKafka::TEvResponse(CorrelationId, GetApiVersions()));
+ auto apiVersions = GetApiVersions();
+ Send(Context->ConnectionId, new TEvKafka::TEvResponse(CorrelationId, apiVersions, static_cast<EKafkaErrors>(apiVersions->ErrorCode)));
Die(ctx);
}
diff --git a/ydb/core/kafka_proxy/actors/kafka_init_producer_id_actor.cpp b/ydb/core/kafka_proxy/actors/kafka_init_producer_id_actor.cpp
index 71bd8503fca..481c21869c9 100644
--- a/ydb/core/kafka_proxy/actors/kafka_init_producer_id_actor.cpp
+++ b/ydb/core/kafka_proxy/actors/kafka_init_producer_id_actor.cpp
@@ -22,8 +22,8 @@ TInitProducerIdResponseData::TPtr GetResponse(const NActors::TActorContext& ctx)
void TKafkaInitProducerIdActor::Bootstrap(const NActors::TActorContext& ctx) {
Y_UNUSED(Message);
-
- Send(Context->ConnectionId, new TEvKafka::TEvResponse(CorrelationId, GetResponse(ctx)));
+ auto response = GetResponse(ctx);
+ Send(Context->ConnectionId, new TEvKafka::TEvResponse(CorrelationId, response, static_cast<EKafkaErrors>(response->ErrorCode)));
Die(ctx);
}
diff --git a/ydb/core/kafka_proxy/actors/kafka_metadata_actor.cpp b/ydb/core/kafka_proxy/actors/kafka_metadata_actor.cpp
index 0951a66e05c..15e8a2a5fd8 100644
--- a/ydb/core/kafka_proxy/actors/kafka_metadata_actor.cpp
+++ b/ydb/core/kafka_proxy/actors/kafka_metadata_actor.cpp
@@ -48,7 +48,7 @@ TActorId TKafkaMetadataActor::SendTopicRequest(const TMetadataRequestData::TMeta
TGetPartitionsLocationRequest locationRequest{};
locationRequest.Topic = topicRequest.Name.value();
locationRequest.Token = Context->UserToken->GetSerializedToken();
- locationRequest.Database = Context->Database;
+ locationRequest.Database = Context->DatabasePath;
PendingResponses++;
@@ -59,6 +59,7 @@ void TKafkaMetadataActor::AddTopicError(
TMetadataResponseData::TMetadataResponseTopic& topic, EKafkaErrors errorCode
) {
topic.ErrorCode = errorCode;
+ ErrorCode = errorCode;
}
void TKafkaMetadataActor::AddTopicResponse(TMetadataResponseData::TMetadataResponseTopic& topic, TEvLocationResponse* response) {
@@ -134,7 +135,7 @@ void TKafkaMetadataActor::HandleResponse(TEvLocationResponse::TPtr ev, const TAc
void TKafkaMetadataActor::RespondIfRequired(const TActorContext& ctx) {
if (PendingResponses == 0) {
- Send(Context->ConnectionId, new TEvKafka::TEvResponse(CorrelationId, Response));
+ Send(Context->ConnectionId, new TEvKafka::TEvResponse(CorrelationId, Response, ErrorCode));
Die(ctx);
}
}
diff --git a/ydb/core/kafka_proxy/actors/kafka_metadata_actor.h b/ydb/core/kafka_proxy/actors/kafka_metadata_actor.h
index d55659d0ace..343c9377d0d 100644
--- a/ydb/core/kafka_proxy/actors/kafka_metadata_actor.h
+++ b/ydb/core/kafka_proxy/actors/kafka_metadata_actor.h
@@ -45,6 +45,7 @@ private:
TMetadataResponseData::TPtr Response;
THashMap<TActorId, TVector<ui64>> TopicIndexes;
THashSet<ui64> AllClusterNodes;
+ EKafkaErrors ErrorCode = EKafkaErrors::NONE_ERROR;
};
} // namespace NKafka
diff --git a/ydb/core/kafka_proxy/actors/kafka_metrics_actor.cpp b/ydb/core/kafka_proxy/actors/kafka_metrics_actor.cpp
new file mode 100644
index 00000000000..de410274923
--- /dev/null
+++ b/ydb/core/kafka_proxy/actors/kafka_metrics_actor.cpp
@@ -0,0 +1,79 @@
+#include "kafka_metrics_actor.h"
+
+#include <library/cpp/actors/core/actor_bootstrapped.h>
+#include <library/cpp/actors/core/events.h>
+#include <library/cpp/actors/core/hfunc.h>
+#include <library/cpp/actors/core/log.h>
+#include <library/cpp/cache/cache.h>
+
+#include <library/cpp/monlib/metrics/histogram_collector.h>
+
+namespace NKafka {
+
+ using namespace NActors;
+
+ class TKafkaMetricsActor : public NActors::TActorBootstrapped<TKafkaMetricsActor> {
+ using TBase = NActors::TActorBootstrapped<TKafkaMetricsActor>;
+ public:
+ explicit TKafkaMetricsActor(const TKafkaMetricsSettings& settings)
+ : Settings(settings)
+ {
+ }
+
+ void Bootstrap(const TActorContext&) {
+ TBase::Become(&TKafkaMetricsActor::StateWork);
+ }
+
+ TStringBuilder LogPrefix() const {
+ return {};
+ }
+
+ private:
+ STFUNC(StateWork) {
+ switch (ev->GetTypeRewrite()) {
+ HFunc(TEvKafka::TEvUpdateCounter, Handle);
+ HFunc(TEvKafka::TEvUpdateHistCounter, Handle);
+
+ }
+ }
+
+ void Handle(TEvKafka::TEvUpdateCounter::TPtr& ev, const TActorContext& ctx);
+ void Handle(TEvKafka::TEvUpdateHistCounter::TPtr& ev, const TActorContext& ctx);
+ TIntrusivePtr<NMonitoring::TDynamicCounters> GetGroupFromLabels(const TVector<std::pair<TString, TString>>& labels);
+
+ private:
+ TKafkaMetricsSettings Settings;
+ };
+
+ TIntrusivePtr<NMonitoring::TDynamicCounters> TKafkaMetricsActor::GetGroupFromLabels(const TVector<std::pair<TString, TString>>& labels) {
+ Y_VERIFY(labels.size() > 1);
+ auto group = Settings.Counters->GetSubgroup(labels[0].first, labels[0].second);
+ for (ui32 i = 1; i + 1 < labels.size(); ++i) {
+ if (labels[i].second.empty())
+ continue;
+ group = group->GetSubgroup(labels[i].first, labels[i].second);
+ }
+ return group;
+ }
+
+ void TKafkaMetricsActor::Handle(TEvKafka::TEvUpdateCounter::TPtr& ev, const TActorContext&) {
+ auto labels = ev->Get()->Labels;
+ auto group = GetGroupFromLabels(labels);
+ auto counter = group->GetNamedCounter(labels.back().first, labels.back().second, true);
+ *counter += ev->Get()->Delta;
+ }
+
+ void TKafkaMetricsActor::Handle(TEvKafka::TEvUpdateHistCounter::TPtr& ev, const TActorContext&) {
+ auto labels = ev->Get()->Labels;
+ auto group = GetGroupFromLabels(labels);
+ auto counter = group->GetNamedHistogram(labels.back().first, labels.back().second,
+ NMonitoring::ExplicitHistogram({100, 200, 500, 1000, 2000, 5000, 10000, 30000}));
+ counter->Collect(ev->Get()->Value, ev->Get()->Count);
+ }
+
+
+ NActors::IActor* CreateKafkaMetricsActor(const TKafkaMetricsSettings& settings) {
+ return new TKafkaMetricsActor(settings);
+ }
+
+} // namespace NKafka
diff --git a/ydb/core/kafka_proxy/actors/kafka_metrics_actor.h b/ydb/core/kafka_proxy/actors/kafka_metrics_actor.h
new file mode 100644
index 00000000000..123d568119c
--- /dev/null
+++ b/ydb/core/kafka_proxy/actors/kafka_metrics_actor.h
@@ -0,0 +1,17 @@
+#pragma once
+
+#include <ydb/core/kafka_proxy/kafka_events.h>
+
+#include <library/cpp/monlib/dynamic_counters/counters.h>
+
+#include <library/cpp/actors/core/actor.h>
+
+namespace NKafka {
+
+ struct TKafkaMetricsSettings {
+ TIntrusivePtr<::NMonitoring::TDynamicCounters> Counters;
+ };
+
+ NActors::IActor* CreateKafkaMetricsActor(const TKafkaMetricsSettings& settings);
+
+} // namespace NKafka
diff --git a/ydb/core/kafka_proxy/actors/kafka_produce_actor.cpp b/ydb/core/kafka_proxy/actors/kafka_produce_actor.cpp
index 5fd4d2b2da6..926ede3640c 100644
--- a/ydb/core/kafka_proxy/actors/kafka_produce_actor.cpp
+++ b/ydb/core/kafka_proxy/actors/kafka_produce_actor.cpp
@@ -1,4 +1,6 @@
#include "kafka_produce_actor.h"
+#include "../kafka_metrics.h"
+
#include <contrib/libs/protobuf/src/google/protobuf/util/time_util.h>
@@ -46,6 +48,11 @@ void TKafkaProduceActor::LogEvent(IEventHandle& ev) {
KAFKA_LOG_T("Received event: " << ev.GetTypeName());
}
+void TKafkaProduceActor::SendMetrics(const TString& topicName, size_t delta, const TString& name, const TActorContext& ctx) {
+ ctx.Send(MakeKafkaMetricsServiceID(), new TEvKafka::TEvUpdateCounter(delta, BuildLabels(Context, "", topicName, TStringBuilder() << "api.kafka.produce." << name, "")));
+ ctx.Send(MakeKafkaMetricsServiceID(), new TEvKafka::TEvUpdateCounter(delta, BuildLabels(Context, "", topicName, "api.kafka.produce.total_messages", "")));
+}
+
void TKafkaProduceActor::Bootstrap(const NActors::TActorContext& /*ctx*/) {
Schedule(WAKEUP_INTERVAL, new TEvKafka::TEvWakeup());
Become(&TKafkaProduceActor::StateWork);
@@ -231,7 +238,7 @@ size_t TKafkaProduceActor::EnqueueInitialization() {
for(const auto& e : Requests) {
auto* r = e->Get()->Request;
for(const auto& topicData : r->TopicData) {
- const auto& topicPath = NormalizePath(Context->Database, *topicData.Name);
+ const auto& topicPath = NormalizePath(Context->DatabasePath, *topicData.Name);
if (!Topics.contains(topicPath)) {
requireInitialization = true;
TopicsForInitialization.insert(topicPath);
@@ -324,7 +331,7 @@ void TKafkaProduceActor::ProcessRequest(TPendingRequest::TPtr pendingRequest, co
size_t position = 0;
for(const auto& topicData : r->TopicData) {
- const TString& topicPath = NormalizePath(Context->Database, *topicData.Name);
+ const TString& topicPath = NormalizePath(Context->DatabasePath, *topicData.Name);
for(const auto& partitionData : topicData.PartitionData) {
const auto partitionId = partitionData.Index;
@@ -438,35 +445,29 @@ EKafkaErrors Convert(TEvPartitionWriter::TEvWriteResponse::EErrors value) {
void TKafkaProduceActor::SendResults(const TActorContext& ctx) {
auto expireTime = ctx.Now() - REQUEST_EXPIRATION_INTERVAL;
-
KAFKA_LOG_T("Sending results. QueueSize= " << PendingRequests.size() << ", ExpirationTime=" << expireTime);
// We send the results in the order of receipt of the request
while (!PendingRequests.empty()) {
auto pendingRequest = PendingRequests.front();
-
+ auto* request = pendingRequest->Request->Get()->Request;
+ auto correlationId = pendingRequest->Request->Get()->CorrelationId;
+ EKafkaErrors metricsErrorCode = EKafkaErrors::NONE_ERROR;
+
// We send the response by timeout. This is possible, for example, if the event was lost or the PartitionWrite died.
bool expired = expireTime > pendingRequest->StartTime;
- if (!expired && !pendingRequest->WaitResultCookies.empty()) {
- return;
- }
-
- auto* r = pendingRequest->Request->Get()->Request;
- auto correlationId = pendingRequest->Request->Get()->CorrelationId;
-
KAFKA_LOG_D("Send result for correlationId " << correlationId << ". Expired=" << expired);
- const auto topicsCount = r->TopicData.size();
+ const auto topicsCount = request->TopicData.size();
auto response = std::make_shared<TProduceResponseData>();
response->Responses.resize(topicsCount);
size_t position = 0;
for(size_t i = 0; i < topicsCount; ++i) {
- const auto& topicData = r->TopicData[i];
+ const auto& topicData = request->TopicData[i];
const auto partitionCount = topicData.PartitionData.size();
-
auto& topicResponse = response->Responses[i];
topicResponse.Name = topicData.Name;
topicResponse.PartitionResponses.resize(partitionCount);
@@ -475,35 +476,40 @@ void TKafkaProduceActor::SendResults(const TActorContext& ctx) {
const auto& partitionData = topicData.PartitionData[j];
auto& partitionResponse = topicResponse.PartitionResponses[j];
const auto& result = pendingRequest->Results[position++];
-
+ size_t recordsCount = partitionData.Records.has_value() ? partitionData.Records->Records.size() : 0;
partitionResponse.Index = partitionData.Index;
-
- if (EKafkaErrors::NONE_ERROR != result.ErrorCode) {
+ if (expired || pendingRequest->WaitResultCookies.empty()) {
+ SendMetrics(TStringBuilder() << topicData.Name, recordsCount, "failed_messages", ctx);
+ } else if (EKafkaErrors::NONE_ERROR != result.ErrorCode ) {
KAFKA_LOG_T("Partition result with error: ErrorCode=" << static_cast<int>(result.ErrorCode) << ", ErrorMessage=" << result.ErrorMessage);
partitionResponse.ErrorCode = result.ErrorCode;
+ metricsErrorCode = result.ErrorCode;
partitionResponse.ErrorMessage = result.ErrorMessage;
+ SendMetrics(TStringBuilder() << topicData.Name, recordsCount, "failed_messages", ctx);
} else {
auto* msg = result.Value->Get();
if (msg->IsSuccess()) {
KAFKA_LOG_T("Partition result success.");
partitionResponse.ErrorCode = EKafkaErrors::NONE_ERROR;
auto& writeResults = msg->Record.GetPartitionResponse().GetCmdWriteResult();
-
if (!writeResults.empty()) {
+ SendMetrics(TStringBuilder() << topicData.Name, writeResults.size(), "successful_messages", ctx);
auto& lastResult = writeResults.at(writeResults.size() - 1);
partitionResponse.LogAppendTimeMs = lastResult.GetWriteTimestampMS();
partitionResponse.BaseOffset = lastResult.GetSeqNo();
}
} else {
KAFKA_LOG_T("Partition result with error: ErrorCode=" << static_cast<int>(Convert(msg->GetError().Code)) << ", ErrorMessage=" << msg->GetError().Reason);
+ SendMetrics(TStringBuilder() << topicData.Name, recordsCount, "failed_messages", ctx);
partitionResponse.ErrorCode = Convert(msg->GetError().Code);
+ metricsErrorCode = Convert(msg->GetError().Code);
partitionResponse.ErrorMessage = msg->GetError().Reason;
}
}
}
}
- Send(Context->ConnectionId, new TEvKafka::TEvResponse(correlationId, response));
+ Send(Context->ConnectionId, new TEvKafka::TEvResponse(correlationId, response, metricsErrorCode));
if (!pendingRequest->WaitAcceptingCookies.empty()) {
if (!expired) {
diff --git a/ydb/core/kafka_proxy/actors/kafka_produce_actor.h b/ydb/core/kafka_proxy/actors/kafka_produce_actor.h
index 70f4d09ab9d..822c880fc30 100644
--- a/ydb/core/kafka_proxy/actors/kafka_produce_actor.h
+++ b/ydb/core/kafka_proxy/actors/kafka_produce_actor.h
@@ -131,6 +131,7 @@ private:
TString LogPrefix();
void LogEvent(IEventHandle& ev);
+ void SendMetrics(const TString& topicName, size_t delta, const TString& name, const TActorContext& ctx);
private:
const TContext::TPtr Context;
diff --git a/ydb/core/kafka_proxy/actors/kafka_sasl_auth_actor.cpp b/ydb/core/kafka_proxy/actors/kafka_sasl_auth_actor.cpp
index e0c3bab0ce8..4d7f52b957a 100644
--- a/ydb/core/kafka_proxy/actors/kafka_sasl_auth_actor.cpp
+++ b/ydb/core/kafka_proxy/actors/kafka_sasl_auth_actor.cpp
@@ -10,7 +10,7 @@
namespace NKafka {
-static constexpr char ERROR_AUTH_BYTES[] = "";
+static constexpr char EMPTY_AUTH_BYTES[] = "";
NActors::IActor* CreateKafkaSaslAuthActor(const TContext::TPtr context, const ui64 correlationId, const NKikimr::NRawSocket::TSocketDescriptor::TSocketAddressType address, const TSaslAuthenticateRequestData* message) {
return new TKafkaSaslAuthActor(context, correlationId, address, message);
@@ -18,14 +18,14 @@ NActors::IActor* CreateKafkaSaslAuthActor(const TContext::TPtr context, const ui
void TKafkaSaslAuthActor::Bootstrap(const NActors::TActorContext& ctx) {
if (Context->AuthenticationStep != EAuthSteps::WAIT_AUTH) {
- SendAuthFailedAndDie(EKafkaErrors::ILLEGAL_SASL_STATE,
+ SendResponseAndDie(EKafkaErrors::ILLEGAL_SASL_STATE,
"Request is not valid given the current SASL state.",
TStringBuilder() << "Current step: " << static_cast<int>(Context->AuthenticationStep),
ctx);
return;
}
if (Context->SaslMechanism != "PLAIN") {
- SendAuthFailedAndDie(EKafkaErrors::UNSUPPORTED_SASL_MECHANISM,
+ SendResponseAndDie(EKafkaErrors::UNSUPPORTED_SASL_MECHANISM,
"Does not support the requested SASL mechanism.",
TStringBuilder() << "Requested mechanism '" << Context->SaslMechanism << "'",
ctx);
@@ -35,33 +35,25 @@ void TKafkaSaslAuthActor::Bootstrap(const NActors::TActorContext& ctx) {
StartPlainAuth(ctx);
}
-void TKafkaSaslAuthActor::PassAway() {
- if (SubscriberId) {
- Send(SubscriberId, new TEvents::TEvPoison());
- }
- TActorBootstrapped::PassAway();
-}
-
void TKafkaSaslAuthActor::StartPlainAuth(const NActors::TActorContext& ctx) {
TAuthData authData;
if (!TryParseAuthDataTo(authData, ctx)) {
return;
}
- Database = CanonizePath(authData.Database);
+
+ DatabasePath = CanonizePath(authData.Database);
SendLoginRequest(authData, ctx);
SendDescribeRequest(ctx);
}
void TKafkaSaslAuthActor::Handle(NKikimr::TEvTicketParser::TEvAuthorizeTicketResult::TPtr& ev, const NActors::TActorContext& ctx) {
if (ev->Get()->Error) {
- SendAuthFailedAndDie(EKafkaErrors::SASL_AUTHENTICATION_FAILED, "", ev->Get()->Error.Message, ctx);
+ SendResponseAndDie(EKafkaErrors::SASL_AUTHENTICATION_FAILED, "", ev->Get()->Error.Message, ctx);
return;
}
Authentificated = true;
-
- Token = ev->Get()->Token;
-
+ UserToken = ev->Get()->Token;
ReplyIfReady(ctx);
}
@@ -78,33 +70,47 @@ void TKafkaSaslAuthActor::ReplyIfReady(const NActors::TActorContext& ctx) {
return;
}
- KAFKA_LOG_D("Authentificated success. Database='" << Database << "', "
+ SendResponseAndDie(EKafkaErrors::NONE_ERROR, "", "", ctx);
+}
+
+void TKafkaSaslAuthActor::SendResponseAndDie(EKafkaErrors errorCode, const TString& errorMessage, const TString& details, const NActors::TActorContext& ctx) {
+ auto isFailed = errorCode != EKafkaErrors::NONE_ERROR;
+
+ auto responseToClient = std::make_shared<TSaslAuthenticateResponseData>();
+ responseToClient->ErrorCode = errorCode;
+ responseToClient->AuthBytes = TKafkaRawBytes(EMPTY_AUTH_BYTES, sizeof(EMPTY_AUTH_BYTES));
+
+ if (isFailed) {
+ KAFKA_LOG_ERROR("Authentication failure. " << errorMessage << " " << details);
+ responseToClient->ErrorMessage = TStringBuilder() << "Authentication failure. " << errorMessage;
+
+ auto evResponse = std::make_shared<TEvKafka::TEvResponse>(CorrelationId, responseToClient, errorCode);
+ auto authResult = new TEvKafka::TEvAuthResult(EAuthSteps::FAILED, evResponse, errorMessage);
+ Send(Context->ConnectionId, authResult);
+ } else {
+ KAFKA_LOG_D("Authentificated success. Database='" << DatabasePath << "', "
<< "FolderId='" << FolderId << "', "
<< "ServiceAccountId='" << ServiceAccountId << "', "
<< "DatabaseId='" << DatabaseId << "', "
<< "Coordinator='" << Coordinator << "', "
<< "ResourcePath='" << ResourcePath << "'");
+ responseToClient->ErrorMessage = "";
- auto responseToClient = std::make_shared<TSaslAuthenticateResponseData>();
- responseToClient->ErrorCode = EKafkaErrors::NONE_ERROR;
- responseToClient->ErrorMessage = "";
- responseToClient->AuthBytes = TKafkaRawBytes(ERROR_AUTH_BYTES, sizeof(ERROR_AUTH_BYTES));
-
- auto evResponse = std::make_shared<TEvKafka::TEvResponse>(CorrelationId, responseToClient);
-
- auto authResult = new TEvKafka::TEvAuthResult(EAuthSteps::SUCCESS, evResponse, Token, Database, FolderId, ServiceAccountId, DatabaseId, Coordinator, ResourcePath);
- Send(Context->ConnectionId, authResult);
-
+ auto evResponse = std::make_shared<TEvKafka::TEvResponse>(CorrelationId, responseToClient, errorCode);
+ auto authResult = new TEvKafka::TEvAuthResult(EAuthSteps::SUCCESS, evResponse, UserToken, DatabasePath, DatabaseId, FolderId, CloudId, ServiceAccountId, Coordinator, ResourcePath, errorMessage);
+ Send(Context->ConnectionId, authResult);
+ }
+
Die(ctx);
}
void TKafkaSaslAuthActor::Handle(TEvPrivate::TEvAuthFailed::TPtr& ev, const NActors::TActorContext& ctx) {
- SendAuthFailedAndDie(EKafkaErrors::SASL_AUTHENTICATION_FAILED, "", ev->Get()->ErrorMessage, ctx);
+ SendResponseAndDie(EKafkaErrors::SASL_AUTHENTICATION_FAILED, "", ev->Get()->ErrorMessage, ctx);
}
bool TKafkaSaslAuthActor::TryParseAuthDataTo(TKafkaSaslAuthActor::TAuthData& authData, const NActors::TActorContext& ctx) {
if (!AuthenticateRequestData->AuthBytes.has_value()) {
- SendAuthFailedAndDie(EKafkaErrors::SASL_AUTHENTICATION_FAILED, "", "AuthBytes is empty.", ctx);
+ SendResponseAndDie(EKafkaErrors::SASL_AUTHENTICATION_FAILED, "", "AuthBytes is empty.", ctx);
return false;
}
@@ -112,7 +118,7 @@ bool TKafkaSaslAuthActor::TryParseAuthDataTo(TKafkaSaslAuthActor::TAuthData& aut
TString auth(rawAuthBytes.data(), rawAuthBytes.size());
TVector<TString> tokens = StringSplitter(auth).Split('\0');
if (tokens.size() != 3) {
- SendAuthFailedAndDie(EKafkaErrors::SASL_AUTHENTICATION_FAILED, TStringBuilder() << "Invalid SASL/PLAIN response: expected 3 tokens, got " << tokens.size(), "", ctx);
+ SendResponseAndDie(EKafkaErrors::SASL_AUTHENTICATION_FAILED, TStringBuilder() << "Invalid SASL/PLAIN response: expected 3 tokens, got " << tokens.size(), "", ctx);
return false;
}
@@ -121,7 +127,7 @@ bool TKafkaSaslAuthActor::TryParseAuthDataTo(TKafkaSaslAuthActor::TAuthData& aut
auto password = tokens[2];
size_t atPos = userAndDatabase.rfind('@');
if (atPos == TString::npos) {
- SendAuthFailedAndDie(EKafkaErrors::SASL_AUTHENTICATION_FAILED, "Database not provided.", "", ctx);
+ SendResponseAndDie(EKafkaErrors::SASL_AUTHENTICATION_FAILED, "Database not provided.", "", ctx);
return false;
}
@@ -131,21 +137,6 @@ bool TKafkaSaslAuthActor::TryParseAuthDataTo(TKafkaSaslAuthActor::TAuthData& aut
return true;
}
-void TKafkaSaslAuthActor::SendAuthFailedAndDie(EKafkaErrors errorCode, const TString& errorMessage, const TString& details, const NActors::TActorContext& ctx) {
- KAFKA_LOG_ERROR("Authentication failure. " << errorMessage << " " << details);
-
- auto responseToClient = std::make_shared<TSaslAuthenticateResponseData>();
- responseToClient->ErrorCode = errorCode;
- responseToClient->ErrorMessage = TStringBuilder() << "Authentication failure. " << errorMessage;
- responseToClient->AuthBytes = TKafkaRawBytes(ERROR_AUTH_BYTES, sizeof(ERROR_AUTH_BYTES));
-
- auto evResponse = std::make_shared<TEvKafka::TEvResponse>(CorrelationId, responseToClient);
- auto authResult = new TEvKafka::TEvAuthResult(EAuthSteps::FAILED, evResponse, errorMessage);
- Send(Context->ConnectionId, authResult);
-
- Die(ctx);
-}
-
void TKafkaSaslAuthActor::SendLoginRequest(TKafkaSaslAuthActor::TAuthData authData, const NActors::TActorContext& ctx) {
Ydb::Auth::LoginRequest request;
request.set_user(authData.UserName);
@@ -174,48 +165,33 @@ void TKafkaSaslAuthActor::SendLoginRequest(TKafkaSaslAuthActor::TAuthData authDa
});
}
-void TKafkaSaslAuthActor::SendDescribeRequest(const NActors::TActorContext& /*ctx*/) {
- if (Database.Empty()) {
- Described = true;
- return;
- }
-
- KAFKA_LOG_D("Describe database '" << Database << "'");
- SubscriberId = Register(CreateSchemeBoardSubscriber(SelfId(), Database));
+void TKafkaSaslAuthActor::SendDescribeRequest(const TActorContext& ctx) {
+ auto schemeCacheRequest = std::make_unique<NKikimr::NSchemeCache::TSchemeCacheNavigate>();
+ NKikimr::NSchemeCache::TSchemeCacheNavigate::TEntry entry;
+ entry.Path = NKikimr::SplitPath(DatabasePath);
+ entry.Operation = NKikimr::NSchemeCache::TSchemeCacheNavigate::OpPath;
+ entry.SyncVersion = false;
+ schemeCacheRequest->ResultSet.emplace_back(entry);
+ ctx.Send(NKikimr::MakeSchemeCacheID(), MakeHolder<NKikimr::TEvTxProxySchemeCache::TEvNavigateKeySet>(schemeCacheRequest.release()));
}
-void TKafkaSaslAuthActor::Handle(TSchemeBoardEvents::TEvNotifyUpdate::TPtr& ev, const TActorContext& ctx) {
- Described = true;
-
- auto* result = ev->Get();
- auto status = result->DescribeSchemeResult.GetStatus();
- if (status != NKikimrScheme::EStatus::StatusSuccess) {
- KAFKA_LOG_ERROR("Describe database '" << Database << "' error: " << status);
- ReplyIfReady(ctx);
+void TKafkaSaslAuthActor::Handle(NKikimr::TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev, const TActorContext& ctx) {
+ const NKikimr::NSchemeCache::TSchemeCacheNavigate* navigate = ev->Get()->Request.Get();
+ if (navigate->ErrorCount) {
+ SendResponseAndDie(EKafkaErrors::SASL_AUTHENTICATION_FAILED, "", TStringBuilder() << "Database with path '" << DatabasePath << "' doesn't exists", ctx);
return;
}
-
- for(const auto& attr : result->DescribeSchemeResult.GetPathDescription().GetUserAttributes()) {
- const auto& key = attr.GetKey();
- const auto& value = attr.GetValue();
-
- KAFKA_LOG_D("Database attribute key=" << key << ", value=" << value);
-
- if (key == "folder_id") {
- FolderId = value;
- } else if (key == "service_account_id") {
- ServiceAccountId = value;
- } else if (key == "database_id") {
- DatabaseId = value;
- } else if (key == "serverless_rt_coordination_node_path") {
- Coordinator = value;
- } else if (key == "serverless_rt_base_resource_ru") {
- ResourcePath = value;
- }
+ Y_VERIFY(navigate->ResultSet.size() == 1);
+ for (const auto& attr : navigate->ResultSet.front().Attributes) {
+ if (attr.first == "folder_id") FolderId = attr.second;
+ if (attr.first == "cloud_id") CloudId = attr.second;
+ if (attr.first == "database_id") DatabaseId = attr.second;
+ if (attr.first == "service_account_id") ServiceAccountId = attr.second;
+ if (attr.first == "serverless_rt_coordination_node_path") Coordinator = attr.second;
+ if (attr.first == "serverless_rt_base_resource_ru") ResourcePath = attr.second;
}
-
+ Described = true;
ReplyIfReady(ctx);
}
-
} // NKafka
diff --git a/ydb/core/kafka_proxy/actors/kafka_sasl_auth_actor.h b/ydb/core/kafka_proxy/actors/kafka_sasl_auth_actor.h
index 8c3007096f5..b03b8869fda 100644
--- a/ydb/core/kafka_proxy/actors/kafka_sasl_auth_actor.h
+++ b/ydb/core/kafka_proxy/actors/kafka_sasl_auth_actor.h
@@ -5,6 +5,8 @@
#include <ydb/core/tx/scheme_board/events.h>
#include <ydb/core/tx/scheme_cache/scheme_cache.h>
#include <ydb/public/api/grpc/ydb_auth_v1.grpc.pb.h>
+#include <ydb/core/tx/scheme_cache/scheme_cache.h>
+#include <ydb/public/api/grpc/ydb_auth_v1.grpc.pb.h>
#include <library/cpp/actors/core/actor_bootstrapped.h>
#include "actors.h"
@@ -57,26 +59,23 @@ private:
HFunc(NKikimr::TEvTicketParser::TEvAuthorizeTicketResult, Handle);
HFunc(TEvPrivate::TEvTokenReady, Handle);
HFunc(TEvPrivate::TEvAuthFailed, Handle);
- HFunc(TSchemeBoardEvents::TEvNotifyUpdate, Handle);
+ HFunc(TEvTxProxySchemeCache::TEvNavigateKeySetResult, Handle);
}
}
void Handle(NKikimr::TEvTicketParser::TEvAuthorizeTicketResult::TPtr& ev, const NActors::TActorContext& ctx);
void Handle(TEvPrivate::TEvTokenReady::TPtr& ev, const NActors::TActorContext& ctx);
void Handle(TEvPrivate::TEvAuthFailed::TPtr& ev, const NActors::TActorContext& ctx);
- void Handle(TSchemeBoardEvents::TEvNotifyUpdate::TPtr& ev, const TActorContext& ctx);
+ void Handle(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev, const TActorContext& ctx);
void StartPlainAuth(const NActors::TActorContext& ctx);
void SendLoginRequest(TKafkaSaslAuthActor::TAuthData authData, const NActors::TActorContext& ctx);
void SendDescribeRequest(const NActors::TActorContext& ctx);
- void SendAuthFailedAndDie(EKafkaErrors errorCode, const TString& errorMessage, const TString& details, const NActors::TActorContext& ctx);
bool TryParseAuthDataTo(TKafkaSaslAuthActor::TAuthData& authData, const NActors::TActorContext& ctx);
-
+ void SendResponseAndDie(EKafkaErrors errorCode, const TString& errorMessage, const TString& details, const NActors::TActorContext& ctx);
+
void ReplyIfReady(const NActors::TActorContext& ctx);
-protected:
- void PassAway() override;
-
private:
const TContext::TPtr Context;
const ui64 CorrelationId;
@@ -84,17 +83,17 @@ private:
const TSaslAuthenticateRequestData* AuthenticateRequestData;
const NKikimr::NRawSocket::TNetworkConfig::TSocketAddressType Address;
- TString Database;
- TIntrusiveConstPtr<NACLib::TUserToken> Token;
+ TString DatabasePath;
+ TIntrusiveConstPtr<NACLib::TUserToken> UserToken;
TString FolderId;
TString ServiceAccountId;
TString DatabaseId;
TString Coordinator;
TString ResourcePath;
+ TString CloudId;
bool Authentificated = false;
bool Described = false;
- TActorId SubscriberId;
};
} // NKafka
diff --git a/ydb/core/kafka_proxy/actors/kafka_sasl_handshake_actor.cpp b/ydb/core/kafka_proxy/actors/kafka_sasl_handshake_actor.cpp
index c768987ae13..931bad1d3d9 100644
--- a/ydb/core/kafka_proxy/actors/kafka_sasl_handshake_actor.cpp
+++ b/ydb/core/kafka_proxy/actors/kafka_sasl_handshake_actor.cpp
@@ -34,7 +34,7 @@ void TKafkaSaslHandshakeActor::SendResponse(const TString& errorMessage, EKafkaE
responseToClient->ErrorCode = kafkaError;
responseToClient->Mechanisms.insert(responseToClient->Mechanisms.end(), SUPPORTED_SASL_MECHANISMS.begin(), SUPPORTED_SASL_MECHANISMS.end());
- auto evResponse = std::make_shared<TEvKafka::TEvResponse>(CorrelationId, responseToClient);
+ auto evResponse = std::make_shared<TEvKafka::TEvResponse>(CorrelationId, responseToClient, kafkaError);
auto handshakeResult = new TEvKafka::TEvHandshakeResult(authStep, evResponse, saslMechanism, errorMessage);
Send(Context->ConnectionId, handshakeResult);
}
diff --git a/ydb/core/kafka_proxy/kafka_connection.cpp b/ydb/core/kafka_proxy/kafka_connection.cpp
index 061af4d3e0d..8f7ea0ce1bb 100644
--- a/ydb/core/kafka_proxy/kafka_connection.cpp
+++ b/ydb/core/kafka_proxy/kafka_connection.cpp
@@ -6,6 +6,7 @@
#include "kafka_events.h"
#include "kafka_log_impl.h"
#include "actors/actors.h"
+#include "kafka_metrics.h"
#include <strstream>
#include <sstream>
@@ -38,7 +39,11 @@ public:
TRequestHeaderData Header;
std::unique_ptr<TApiMessage> Message;
+ TInstant StartTime;
+ TString Method;
+
TApiMessage::TPtr Response;
+ EKafkaErrors ResponseErrorCode;
};
static constexpr TDuration InactivityTimeout = TDuration::Minutes(10);
@@ -161,6 +166,24 @@ protected:
return sb;
}
+ void SendRequestMetrics(const TActorContext& ctx) {
+ if (Context) {
+ ctx.Send(MakeKafkaMetricsServiceID(),
+ new TEvKafka::TEvUpdateCounter(1, BuildLabels(Context, Request->Method, "", "api.kafka.request.count", "")));
+ ctx.Send(MakeKafkaMetricsServiceID(),
+ new TEvKafka::TEvUpdateCounter(Request->Size, BuildLabels(Context, Request->Method, "", "api.kafka.request.bytes", "")));
+ }
+ }
+
+ void SendResponseMetrics(const TString method, const TInstant requestStartTime, EKafkaErrors errorCode, const TActorContext& ctx) {
+ TDuration duration = TInstant::Now() - requestStartTime;
+ ctx.Send(MakeKafkaMetricsServiceID(),
+ new TEvKafka::TEvUpdateHistCounter(static_cast<i64>(duration.MilliSeconds()), 1, BuildLabels(Context, method, "", "api.kafka.response.duration_milliseconds", "")
+ ));
+ ctx.Send(MakeKafkaMetricsServiceID(),
+ new TEvKafka::TEvUpdateCounter(1, BuildLabels(Context, method, "", "api.kafka.response.count", TStringBuilder() << (i16)errorCode)));
+ }
+
void OnAccept() {
InactivityTimer.Reset();
TBase::Become(&TKafkaConnection::StateConnected);
@@ -220,13 +243,13 @@ protected:
KAFKA_LOG_D("process message: ApiKey=" << Request->Header.RequestApiKey << ", ExpectedSize=" << Request->ExpectedSize
<< ", Size=" << Request->Size);
- Msg::TPtr r = Request;
+ Request->Method = EApiKeyNames.find(static_cast<EApiKey>(Request->Header.RequestApiKey))->second;
- PendingRequestsQueue.push_back(r);
- PendingRequests[r->Header.CorrelationId] = r;
+ PendingRequestsQueue.push_back(Request);
+ PendingRequests[Request->Header.CorrelationId] = Request;
TApiMessage* message = Request->Message.get();
-
+ SendRequestMetrics(ctx);
switch (Request->Header.RequestApiKey) {
case PRODUCE:
HandleMessage(&Request->Header, dynamic_cast<TProduceRequestData*>(message), ctx);
@@ -260,12 +283,12 @@ protected:
void Handle(TEvKafka::TEvResponse::TPtr response, const TActorContext& ctx) {
auto r = response->Get();
- Reply(r->CorrelationId, r->Response, ctx);
+ Reply(r->CorrelationId, r->Response, r->ErrorCode, ctx);
}
void Handle(TEvKafka::TEvAuthResult::TPtr ev, const TActorContext& ctx) {
auto event = ev->Get();
- Reply(event->ClientResponse->CorrelationId, event->ClientResponse->Response, ctx);
+ Reply(event->ClientResponse->CorrelationId, event->ClientResponse->Response, event->ClientResponse->ErrorCode, ctx);
auto authStep = event->AuthStep;
if (authStep == EAuthSteps::FAILED) {
@@ -275,16 +298,19 @@ protected:
}
Context->UserToken = event->UserToken;
- Context->Database = event->Database;
+ Context->DatabasePath = event->DatabasePath;
Context->AuthenticationStep = authStep;
- Context->RlContext = {event->Coordinator, event->ResourcePath, event->Database, event->UserToken->GetSerializedToken()};
+ Context->RlContext = {event->Coordinator, event->ResourcePath, event->DatabasePath, event->UserToken->GetSerializedToken()};
+ Context->DatabaseId = event->DatabaseId;
+ Context->CloudId = event->CloudId;
+ Context->FolderId = event->FolderId;
KAFKA_LOG_D("Authentificated successful. SID=" << Context->UserToken->GetUserSID());
}
void Handle(TEvKafka::TEvHandshakeResult::TPtr ev, const TActorContext& ctx) {
auto event = ev->Get();
- Reply(event->ClientResponse->CorrelationId, event->ClientResponse->Response, ctx);
+ Reply(event->ClientResponse->CorrelationId, event->ClientResponse->Response, event->ClientResponse->ErrorCode, ctx);
auto authStep = event->AuthStep;
if (authStep == EAuthSteps::FAILED) {
@@ -297,7 +323,7 @@ protected:
Context->AuthenticationStep = authStep;
}
- void Reply(const ui64 correlationId, TApiMessage::TPtr response, const TActorContext& ctx) {
+ void Reply(const ui64 correlationId, TApiMessage::TPtr response, EKafkaErrors errorCode, const TActorContext& ctx) {
auto it = PendingRequests.find(correlationId);
if (it == PendingRequests.end()) {
KAFKA_LOG_ERROR("Unexpected correlationId " << correlationId);
@@ -306,21 +332,22 @@ protected:
auto request = it->second;
request->Response = response;
+ request->ResponseErrorCode = errorCode;
request->Buffer.Clear();
- ProcessReplyQueue();
+ ProcessReplyQueue(ctx);
DoRead(ctx);
}
- void ProcessReplyQueue() {
+ void ProcessReplyQueue(const TActorContext& ctx) {
while(!PendingRequestsQueue.empty()) {
auto& request = PendingRequestsQueue.front();
if (request->Response.get() == nullptr) {
break;
}
- Reply(&request->Header, request->Response.get());
+ Reply(&request->Header, request->Response.get(), request->Method, request->StartTime, request->ResponseErrorCode, ctx);
InflightSize -= request->ExpectedSize;
@@ -329,7 +356,7 @@ protected:
}
}
- void Reply(const TRequestHeaderData* header, const TApiMessage* reply) {
+ void Reply(const TRequestHeaderData* header, const TApiMessage* reply, const TString method, const TInstant requestStartTime, EKafkaErrors errorCode, const TActorContext& ctx) {
TKafkaVersion headerVersion = ResponseHeaderVersion(header->RequestApiKey, header->RequestApiVersion);
TKafkaVersion version = header->RequestApiVersion;
@@ -340,7 +367,7 @@ protected:
TBufferedWriter buffer(Socket.Get(), Context->Config.GetPacketSize());
TKafkaWritable writable(buffer);
-
+ SendResponseMetrics(method, requestStartTime, errorCode, ctx);
try {
writable << size;
responseHeader.Write(writable, headerVersion);
@@ -360,7 +387,6 @@ protected:
void DoRead(const TActorContext& ctx) {
KAFKA_LOG_T("DoRead: Demand=" << Demand.Length << ", Step=" << static_cast<i32>(Step));
-
for (;;) {
while (Demand) {
ssize_t received = 0;
@@ -459,9 +485,8 @@ protected:
break;
case MESSAGE_PROCESS:
- KAFKA_LOG_D("received message. ApiKey=" << Request->ApiKey << ", Version=" << Request->ApiVersion << ", CorrelationId=" << Request->CorrelationId);
-
- TKafkaReadable readable(Request->Buffer);
+ Request->StartTime = TInstant::Now();
+ KAFKA_LOG_D("received message. ApiKey=" << Request->ApiKey << ", Version=" << Request->ApiVersion << ", CorrelationId=" << Request->CorrelationId);
+ TKafkaReadable readable(Request->Buffer);
try {
Request->Message = CreateRequest(Request->ApiKey);
diff --git a/ydb/core/kafka_proxy/kafka_events.h b/ydb/core/kafka_proxy/kafka_events.h
index f9577438dfb..48f03f403da 100644
--- a/ydb/core/kafka_proxy/kafka_events.h
+++ b/ydb/core/kafka_proxy/kafka_events.h
@@ -18,6 +18,8 @@ struct TEvKafka {
EvAuthResult,
EvHandshakeResult,
EvWakeup,
+ EvUpdateCounter,
+ EvUpdateHistCounter,
EvResponse = EvRequest + 256,
EvInternalEvents = EvResponse + 256,
EvEnd
@@ -39,27 +41,31 @@ struct TEvKafka {
};
struct TEvResponse : public TEventLocal<TEvResponse, EvResponse> {
- TEvResponse(const ui64 correlationId, const TApiMessage::TPtr response)
+ TEvResponse(const ui64 correlationId, const TApiMessage::TPtr response, EKafkaErrors errorCode)
: CorrelationId(correlationId)
- , Response(std::move(response)) {
+ , Response(std::move(response))
+ , ErrorCode(errorCode) {
}
const ui64 CorrelationId;
const TApiMessage::TPtr Response;
+ const EKafkaErrors ErrorCode;
};
struct TEvAuthResult : public TEventLocal<TEvAuthResult, EvAuthResult> {
+
TEvAuthResult(EAuthSteps authStep, std::shared_ptr<TEvKafka::TEvResponse> clientResponse, TString error = "")
: AuthStep(authStep)
, Error(error)
, ClientResponse(clientResponse) {
}
- TEvAuthResult(EAuthSteps authStep, std::shared_ptr<TEvKafka::TEvResponse> clientResponse, TIntrusiveConstPtr<NACLib::TUserToken> token, TString database,
- TString folderId, TString serviceAccountId, TString databaseId, TString coordinator, TString resourcePath, TString error = "")
+ TEvAuthResult(EAuthSteps authStep, std::shared_ptr<TEvKafka::TEvResponse> clientResponse, TIntrusiveConstPtr<NACLib::TUserToken> token, TString databasePath, TString databaseId,
+ TString folderId, TString cloudId, TString serviceAccountId, TString coordinator, TString resourcePath, TString error = "")
: AuthStep(authStep)
, UserToken(token)
- , Database(database)
+ , DatabasePath(databasePath)
+ , CloudId(cloudId)
, FolderId(folderId)
, ServiceAccountId(serviceAccountId)
, DatabaseId(databaseId)
@@ -71,7 +77,8 @@ struct TEvKafka {
EAuthSteps AuthStep;
TIntrusiveConstPtr<NACLib::TUserToken> UserToken;
- TString Database;
+ TString DatabasePath;
+ TString CloudId;
TString FolderId;
TString ServiceAccountId;
TString DatabaseId;
@@ -97,6 +104,28 @@ struct TEvKafka {
std::shared_ptr<TEvKafka::TEvResponse> ClientResponse;
};
+ struct TEvUpdateCounter : public TEventLocal<TEvUpdateCounter, EvUpdateCounter> {
+ i64 Delta;
+ TVector<std::pair<TString, TString>> Labels;
+
+ TEvUpdateCounter(const i64 delta, const TVector<std::pair<TString, TString>> labels)
+ : Delta(delta)
+ , Labels(labels)
+ {}
+ };
+
+ struct TEvUpdateHistCounter : public TEventLocal<TEvUpdateHistCounter, EvUpdateHistCounter> {
+ i64 Value;
+ ui64 Count;
+ TVector<std::pair<TString, TString>> Labels;
+
+ TEvUpdateHistCounter(const i64 value, const ui64 count, const TVector<std::pair<TString, TString>> labels)
+ : Value(value)
+ , Count(count)
+ , Labels(labels)
+ {}
+ };
+
struct TEvWakeup : public TEventLocal<TEvWakeup, EvWakeup> {
};
};
diff --git a/ydb/core/kafka_proxy/kafka_messages.cpp b/ydb/core/kafka_proxy/kafka_messages.cpp
index 68d4a0777ca..52cb51d18f1 100644
--- a/ydb/core/kafka_proxy/kafka_messages.cpp
+++ b/ydb/core/kafka_proxy/kafka_messages.cpp
@@ -6,6 +6,18 @@
namespace NKafka {
+const std::unordered_map<EApiKey, TString> EApiKeyNames = {
+ {EApiKey::HEADER, "HEADER"},
+ {EApiKey::PRODUCE, "PRODUCE"},
+ {EApiKey::FETCH, "FETCH"},
+ {EApiKey::METADATA, "METADATA"},
+ {EApiKey::SASL_HANDSHAKE, "SASL_HANDSHAKE"},
+ {EApiKey::API_VERSIONS, "API_VERSIONS"},
+ {EApiKey::INIT_PRODUCER_ID, "INIT_PRODUCER_ID"},
+ {EApiKey::SASL_AUTHENTICATE, "SASL_AUTHENTICATE"},
+};
+
+
std::unique_ptr<TApiMessage> CreateRequest(i16 apiKey) {
switch (apiKey) {
case PRODUCE:
diff --git a/ydb/core/kafka_proxy/kafka_messages.h b/ydb/core/kafka_proxy/kafka_messages.h
index 0d63f9fd9e7..6c795e4bf75 100644
--- a/ydb/core/kafka_proxy/kafka_messages.h
+++ b/ydb/core/kafka_proxy/kafka_messages.h
@@ -25,7 +25,7 @@ enum EApiKey {
SASL_AUTHENTICATE = 36, // [ZK_BROKER, BROKER, CONTROLLER]
};
-
+extern const std::unordered_map<EApiKey, TString> EApiKeyNames;
diff --git a/ydb/core/kafka_proxy/kafka_metrics.cpp b/ydb/core/kafka_proxy/kafka_metrics.cpp
new file mode 100644
index 00000000000..b001d2e6c5c
--- /dev/null
+++ b/ydb/core/kafka_proxy/kafka_metrics.cpp
@@ -0,0 +1,17 @@
+#include "kafka_events.h"
+#include "kafka_metrics.h"
+
+namespace NKafka {
+
+TVector<std::pair<TString, TString>> BuildLabels(const NKafka::TContext::TPtr context, const TString& method, const TString& topic, const TString& name, const TString& errorCode) {
+ return {{"database", context->DatabasePath}, {"method", method}, {"cloud_id", context->CloudId},
+ {"folder_id", context->FolderId}, {"database_id", context->DatabaseId},
+ {"topic", topic}, {"errorCode", errorCode}, {"name", name}};
+}
+
+TActorId MakeKafkaMetricsServiceID() {
+ static const char x[12] = "kafka_mtrcs";
+ return TActorId(0, TStringBuf(x, 12));
+}
+
+} // namespace NKafka
diff --git a/ydb/core/kafka_proxy/kafka_metrics.h b/ydb/core/kafka_proxy/kafka_metrics.h
new file mode 100644
index 00000000000..a18797e0ba3
--- /dev/null
+++ b/ydb/core/kafka_proxy/kafka_metrics.h
@@ -0,0 +1,11 @@
+#pragma once
+
+#include "kafka_events.h"
+
+namespace NKafka {
+
+TVector<std::pair<TString, TString>> BuildLabels(const NKafka::TContext::TPtr context, const TString& method, const TString& topic, const TString& name, const TString& errorCode);
+
+TActorId MakeKafkaMetricsServiceID();
+
+} // namespace NKafka
diff --git a/ydb/core/kafka_proxy/ya.make b/ydb/core/kafka_proxy/ya.make
index 5696e1cdebd..832dfc03ed4 100644
--- a/ydb/core/kafka_proxy/ya.make
+++ b/ydb/core/kafka_proxy/ya.make
@@ -7,6 +7,7 @@ SRCS(
actors/kafka_produce_actor.cpp
actors/kafka_sasl_auth_actor.cpp
actors/kafka_sasl_handshake_actor.cpp
+ actors/kafka_metrics_actor.cpp
kafka_connection.cpp
kafka_connection.h
kafka_listener.h
@@ -19,6 +20,7 @@ SRCS(
kafka_messages_int.h
kafka_proxy.h
kafka_records.cpp
+ kafka_metrics.cpp
)
PEERDIR(
diff --git a/ydb/core/testlib/test_client.cpp b/ydb/core/testlib/test_client.cpp
index 4650b63bc9b..6d98f1b4343 100644
--- a/ydb/core/testlib/test_client.cpp
+++ b/ydb/core/testlib/test_client.cpp
@@ -46,7 +46,9 @@
#include <ydb/core/security/ldap_auth_provider.h>
#include <ydb/core/base/user_registry.h>
#include <ydb/core/health_check/health_check.h>
+#include <ydb/core/kafka_proxy/actors/kafka_metrics_actor.h>
#include <ydb/core/kafka_proxy/kafka_listener.h>
+#include <ydb/core/kafka_proxy/kafka_metrics.h>
#include <ydb/core/kqp/common/kqp.h>
#include <ydb/core/kqp/rm_service/kqp_rm_service.h>
#include <ydb/core/kqp/proxy_service/kqp_proxy_service.h>
@@ -940,6 +942,10 @@ namespace Tests {
IActor* actor = NKafka::CreateKafkaListener(MakePollerActorId(), settings, Settings->AppConfig.GetKafkaProxyConfig());
TActorId actorId = Runtime->Register(actor, nodeIdx);
Runtime->RegisterService(TActorId{}, actorId, nodeIdx);
+
+ IActor* metricsActor = CreateKafkaMetricsActor(NKafka::TKafkaMetricsSettings{Runtime->GetAppData().Counters->GetSubgroup("counters", "kafka_proxy")});
+ TActorId metricsActorId = Runtime->Register(metricsActor, nodeIdx);
+ Runtime->RegisterService(NKafka::MakeKafkaMetricsServiceID(), metricsActorId, nodeIdx);
}
if (Settings->EnableYq) {