aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorqyryq <qyryq@ydb.tech>2024-12-16 15:16:13 +0300
committerGitHub <noreply@github.com>2024-12-16 15:16:13 +0300
commitbd3e3a57ab761fac861ef03a1a009d9c3c99a5ea (patch)
treed9ad8b426b04bfb227a15331a5949ae1550faf7b
parentb99f85da727900e683ddbc9ad1989d82b3dd9bda (diff)
downloadydb-bd3e3a57ab761fac861ef03a1a009d9c3c99a5ea.tar.gz
YMQ JSON API: send metering events (#12599)
-rw-r--r--ydb/core/http_proxy/http_req.cpp41
-rw-r--r--ydb/core/http_proxy/http_req.h2
-rw-r--r--ydb/core/http_proxy/ut/datastreams_fixture.h36
-rw-r--r--ydb/core/http_proxy/ut/ymq_ut.h15
-rw-r--r--ydb/core/ymq/http/http.cpp12
5 files changed, 99 insertions, 7 deletions
diff --git a/ydb/core/http_proxy/http_req.cpp b/ydb/core/http_proxy/http_req.cpp
index 3ae5ba82f25..1d617ed1290 100644
--- a/ydb/core/http_proxy/http_req.cpp
+++ b/ydb/core/http_proxy/http_req.cpp
@@ -62,6 +62,7 @@
#include <ydb/library/folder_service/events.h>
#include <ydb/core/ymq/actor/auth_multi_factory.h>
+#include <ydb/core/ymq/actor/serviceid.h>
#include <ydb/library/http_proxy/error/error.h>
@@ -438,8 +439,8 @@ namespace NKikimr::NHttpProxy {
<< " CloudId: " << ev->Get()->CloudId
<< " UserSid: " << ev->Get()->Sid;
);
- FolderId = ev->Get()->FolderId;
- CloudId = ev->Get()->CloudId;
+ HttpContext.FolderId = FolderId = ev->Get()->FolderId;
+ HttpContext.CloudId = CloudId = ev->Get()->CloudId;
UserSid = ev->Get()->Sid;
SendGrpcRequestNoDriver(ctx);
} else {
@@ -473,7 +474,8 @@ namespace NKikimr::NHttpProxy {
return ReplyWithError(ctx, NYdb::EStatus::BAD_REQUEST, "Invalid queue url");
}
CloudId = cloudIdAndResourceId.first;
- ResourceId = cloudIdAndResourceId.second;
+ HttpContext.ResourceId = ResourceId = cloudIdAndResourceId.second;
+ HttpContext.ResponseData.YmqIsFifo = queueUrl.EndsWith(".fifo");
}
} catch (const NKikimr::NSQS::TSQSException& e) {
NYds::EErrorCodes issueCode = NYds::EErrorCodes::OK;
@@ -508,6 +510,7 @@ namespace NKikimr::NHttpProxy {
SendGrpcRequestNoDriver(ctx);
} else {
auto requestHolder = MakeHolder<NKikimrClient::TSqsRequest>();
+ // TODO? action = NSQS::ActionFromString(Method);
NSQS::EAction action = NSQS::EAction::Unknown;
if (Method == "CreateQueue") {
action = NSQS::EAction::CreateQueue;
@@ -1248,6 +1251,38 @@ namespace NKikimr::NHttpProxy {
ResponseData.DumpBody(ContentType)
);
+ if (ResponseData.IsYmq && ServiceConfig.GetHttpConfig().GetYandexCloudMode()) {
+ // Send request attributes to the metering actor
+ auto reportRequestAttributes = MakeHolder<NSQS::TSqsEvents::TEvReportProcessedRequestAttributes>();
+
+ auto& requestAttributes = reportRequestAttributes->Data;
+
+ requestAttributes.HttpStatusCode = httpCode;
+ requestAttributes.IsFifo = ResponseData.YmqIsFifo;
+ requestAttributes.FolderId = FolderId;
+ requestAttributes.RequestSizeInBytes = Request->Size();
+ requestAttributes.ResponseSizeInBytes = response->Size();
+ requestAttributes.SourceAddress = SourceAddress;
+ requestAttributes.ResourceId = ResourceId;
+ requestAttributes.Action = NSQS::ActionFromString(MethodName);
+
+ LOG_SP_DEBUG_S(
+ ctx,
+ NKikimrServices::HTTP_PROXY,
+ TStringBuilder() << "Send metering event."
+ << " HttpStatusCode: " << requestAttributes.HttpStatusCode
+ << " IsFifo: " << requestAttributes.IsFifo
+ << " FolderId: " << requestAttributes.FolderId
+ << " RequestSizeInBytes: " << requestAttributes.RequestSizeInBytes
+ << " ResponseSizeInBytes: " << requestAttributes.ResponseSizeInBytes
+ << " SourceAddress: " << requestAttributes.SourceAddress
+ << " ResourceId: " << requestAttributes.ResourceId
+ << " Action: " << requestAttributes.Action
+ );
+
+ ctx.Send(NSQS::MakeSqsMeteringServiceID(), reportRequestAttributes.Release());
+ }
+
ctx.Send(Sender, new NHttp::TEvHttpProxy::TEvHttpOutgoingResponse(response));
}
diff --git a/ydb/core/http_proxy/http_req.h b/ydb/core/http_proxy/http_req.h
index e7dbe4d2e9e..7683f7634f1 100644
--- a/ydb/core/http_proxy/http_req.h
+++ b/ydb/core/http_proxy/http_req.h
@@ -58,6 +58,7 @@ struct THttpResponseData {
TString ErrorText{"OK"};
TString YmqStatusCode;
ui32 YmqHttpCode;
+ bool YmqIsFifo;
TString DumpBody(MimeTypes contentType);
};
@@ -83,6 +84,7 @@ struct THttpRequestContext {
TString FolderId; // not in context
TString CloudId; // not in context
TString StreamName; // not in context
+ TString ResourceId;
TString SourceAddress;
TString MethodName; // used once
TString ApiVersion; // used once
diff --git a/ydb/core/http_proxy/ut/datastreams_fixture.h b/ydb/core/http_proxy/ut/datastreams_fixture.h
index 1bf4b3ff602..ed8ab1c05d2 100644
--- a/ydb/core/http_proxy/ut/datastreams_fixture.h
+++ b/ydb/core/http_proxy/ut/datastreams_fixture.h
@@ -5,6 +5,7 @@
#include <ydb/library/grpc/server/actors/logger.h>
#include <library/cpp/http/misc/parsed_request.h>
#include <library/cpp/json/json_writer.h>
+#include <library/cpp/logger/global/global.h>
#include <library/cpp/resource/resource.h>
#include <library/cpp/testing/unittest/registar.h>
@@ -69,7 +70,6 @@ T GetByPath(const NJson::TJsonValue& msg, TStringBuf path) {
}
class THttpProxyTestMock : public NUnitTest::TBaseFixture {
- friend class THttpProxyTestMockForSQS;
public:
THttpProxyTestMock() = default;
~THttpProxyTestMock() = default;
@@ -82,10 +82,10 @@ public:
InitAll();
}
- void InitAll(bool yandexCloudMode = true) {
+ void InitAll(bool yandexCloudMode = true, bool enableMetering = false) {
AccessServicePort = PortManager.GetPort(8443);
AccessServiceEndpoint = "127.0.0.1:" + ToString(AccessServicePort);
- InitKikimr(yandexCloudMode);
+ InitKikimr(yandexCloudMode, enableMetering);
InitAccessServiceService();
InitHttpServer(yandexCloudMode);
}
@@ -470,7 +470,7 @@ private:
return resultSet;
}
- void InitKikimr(bool yandexCloudMode) {
+ void InitKikimr(bool yandexCloudMode, bool enableMetering) {
AuthFactory = std::make_shared<TIamAuthFactory>();
NKikimrConfig::TAppConfig appConfig;
appConfig.MutablePQConfig()->SetTopicsAreFirstClassCitizen(true);
@@ -484,6 +484,21 @@ private:
appConfig.MutableSqsConfig()->SetYandexCloudMode(yandexCloudMode);
appConfig.MutableSqsConfig()->SetEnableDeadLetterQueues(true);
+ if (enableMetering) {
+ auto& sqsConfig = *appConfig.MutableSqsConfig();
+
+ sqsConfig.SetMeteringFlushingIntervalMs(100);
+ sqsConfig.SetMeteringLogFilePath("sqs_metering.log");
+ TFsPath(sqsConfig.GetMeteringLogFilePath()).DeleteIfExists();
+
+ sqsConfig.AddMeteringCloudNetCidr("5.45.196.0/24");
+ sqsConfig.AddMeteringCloudNetCidr("2a0d:d6c0::/29");
+ sqsConfig.AddMeteringYandexNetCidr("127.0.0.0/8");
+ sqsConfig.AddMeteringYandexNetCidr("5.45.217.0/24");
+
+ DoInitGlobalLog(CreateOwningThreadedLogBackend(sqsConfig.GetMeteringLogFilePath(), 0));
+ }
+
auto limit = appConfig.MutablePQConfig()->AddValidRetentionLimits();
limit->SetMinPeriodSeconds(0);
limit->SetMaxPeriodSeconds(TDuration::Days(1).Seconds());
@@ -519,6 +534,13 @@ private:
ActorRuntime->SetLogPriority(NActorsServices::EServiceCommon::HTTP, NLog::PRI_DEBUG);
ActorRuntime->SetLogPriority(NKikimrServices::TICKET_PARSER, NLog::PRI_TRACE);
+ if (enableMetering) {
+ ActorRuntime->RegisterService(
+ NSQS::MakeSqsMeteringServiceID(),
+ ActorRuntime->Register(NSQS::CreateSqsMeteringService())
+ );
+ }
+
NYdb::TClient client(*(KikimrServer->ServerSettings));
UNIT_ASSERT_VALUES_EQUAL(NMsgBusProxy::MSTATUS_OK,
client.AlterUserAttributes("/", "Root", {{"folder_id", "folder4"},
@@ -924,3 +946,9 @@ class THttpProxyTestMockForSQS : public THttpProxyTestMock {
InitAll(false);
}
};
+
+class THttpProxyTestMockWithMetering : public THttpProxyTestMock {
+ void SetUp(NUnitTest::TTestContext&) override {
+ InitAll(true, true);
+ }
+};
diff --git a/ydb/core/http_proxy/ut/ymq_ut.h b/ydb/core/http_proxy/ut/ymq_ut.h
index b51bbcac49d..4cf9c03f60d 100644
--- a/ydb/core/http_proxy/ut/ymq_ut.h
+++ b/ydb/core/http_proxy/ut/ymq_ut.h
@@ -181,6 +181,21 @@ Y_UNIT_TEST_SUITE(TestYmqHttpProxy) {
});
}
+ Y_UNIT_TEST_F(BillingRecordsForJsonApi, THttpProxyTestMockWithMetering) {
+ auto json = CreateQueue({{"QueueName", "ExampleQueueName"}});
+ auto queueUrl = GetByPath<TString>(json, "QueueUrl");
+
+ json = SendMessage({
+ {"QueueUrl", queueUrl},
+ {"MessageBody", "MessageBody-0"}
+ });
+
+ // TODO:
+ // Sleep(TDuration::Seconds(500));
+ // TVector<NSc::TValue> records = LoadBillingRecords(sqsConfig.GetMeteringLogFilePath());
+ // CheckBillingRecord(records, expectedRecords);
+ }
+
Y_UNIT_TEST_F(TestSendMessageEmptyQueueUrl, THttpProxyTestMockForSQS) {
// We had a bug that crashed the server if QueueUrl was empty in a request.
SendMessage({
diff --git a/ydb/core/ymq/http/http.cpp b/ydb/core/ymq/http/http.cpp
index 7cb47162509..b98b124ec70 100644
--- a/ydb/core/ymq/http/http.cpp
+++ b/ydb/core/ymq/http/http.cpp
@@ -164,6 +164,18 @@ void THttpRequest::WriteResponse(const TReplyParams& replyParams, const TSqsHttp
requestAttributes.ResourceId = response.ResourceId;
requestAttributes.Action = Action_;
+ RLOG_SQS_BASE_DEBUG(*Parent_->ActorSystem_,
+ TStringBuilder() << "Send metering event."
+ << " HttpStatusCode: " << requestAttributes.HttpStatusCode
+ << " IsFifo: " << requestAttributes.IsFifo
+ << " FolderId: " << requestAttributes.FolderId
+ << " RequestSizeInBytes: " << requestAttributes.RequestSizeInBytes
+ << " ResponseSizeInBytes: " << requestAttributes.ResponseSizeInBytes
+ << " SourceAddress: " << requestAttributes.SourceAddress
+ << " ResourceId: " << requestAttributes.ResourceId
+ << " Action: " << requestAttributes.Action
+ );
+
Parent_->ActorSystem_->Send(MakeSqsMeteringServiceID(), reportRequestAttributes.Release());
}