diff options
author | qyryq <qyryq@ydb.tech> | 2024-12-16 15:16:13 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-12-16 15:16:13 +0300 |
commit | bd3e3a57ab761fac861ef03a1a009d9c3c99a5ea (patch) | |
tree | d9ad8b426b04bfb227a15331a5949ae1550faf7b | |
parent | b99f85da727900e683ddbc9ad1989d82b3dd9bda (diff) | |
download | ydb-bd3e3a57ab761fac861ef03a1a009d9c3c99a5ea.tar.gz |
YMQ JSON API: send metering events (#12599)
-rw-r--r-- | ydb/core/http_proxy/http_req.cpp | 41 | ||||
-rw-r--r-- | ydb/core/http_proxy/http_req.h | 2 | ||||
-rw-r--r-- | ydb/core/http_proxy/ut/datastreams_fixture.h | 36 | ||||
-rw-r--r-- | ydb/core/http_proxy/ut/ymq_ut.h | 15 | ||||
-rw-r--r-- | ydb/core/ymq/http/http.cpp | 12 |
5 files changed, 99 insertions, 7 deletions
diff --git a/ydb/core/http_proxy/http_req.cpp b/ydb/core/http_proxy/http_req.cpp index 3ae5ba82f25..1d617ed1290 100644 --- a/ydb/core/http_proxy/http_req.cpp +++ b/ydb/core/http_proxy/http_req.cpp @@ -62,6 +62,7 @@ #include <ydb/library/folder_service/events.h> #include <ydb/core/ymq/actor/auth_multi_factory.h> +#include <ydb/core/ymq/actor/serviceid.h> #include <ydb/library/http_proxy/error/error.h> @@ -438,8 +439,8 @@ namespace NKikimr::NHttpProxy { << " CloudId: " << ev->Get()->CloudId << " UserSid: " << ev->Get()->Sid; ); - FolderId = ev->Get()->FolderId; - CloudId = ev->Get()->CloudId; + HttpContext.FolderId = FolderId = ev->Get()->FolderId; + HttpContext.CloudId = CloudId = ev->Get()->CloudId; UserSid = ev->Get()->Sid; SendGrpcRequestNoDriver(ctx); } else { @@ -473,7 +474,8 @@ namespace NKikimr::NHttpProxy { return ReplyWithError(ctx, NYdb::EStatus::BAD_REQUEST, "Invalid queue url"); } CloudId = cloudIdAndResourceId.first; - ResourceId = cloudIdAndResourceId.second; + HttpContext.ResourceId = ResourceId = cloudIdAndResourceId.second; + HttpContext.ResponseData.YmqIsFifo = queueUrl.EndsWith(".fifo"); } } catch (const NKikimr::NSQS::TSQSException& e) { NYds::EErrorCodes issueCode = NYds::EErrorCodes::OK; @@ -508,6 +510,7 @@ namespace NKikimr::NHttpProxy { SendGrpcRequestNoDriver(ctx); } else { auto requestHolder = MakeHolder<NKikimrClient::TSqsRequest>(); + // TODO? action = NSQS::ActionFromString(Method); NSQS::EAction action = NSQS::EAction::Unknown; if (Method == "CreateQueue") { action = NSQS::EAction::CreateQueue; @@ -1248,6 +1251,38 @@ namespace NKikimr::NHttpProxy { ResponseData.DumpBody(ContentType) ); + if (ResponseData.IsYmq && ServiceConfig.GetHttpConfig().GetYandexCloudMode()) { + // Send request attributes to the metering actor + auto reportRequestAttributes = MakeHolder<NSQS::TSqsEvents::TEvReportProcessedRequestAttributes>(); + + auto& requestAttributes = reportRequestAttributes->Data; + + requestAttributes.HttpStatusCode = httpCode; + requestAttributes.IsFifo = ResponseData.YmqIsFifo; + requestAttributes.FolderId = FolderId; + requestAttributes.RequestSizeInBytes = Request->Size(); + requestAttributes.ResponseSizeInBytes = response->Size(); + requestAttributes.SourceAddress = SourceAddress; + requestAttributes.ResourceId = ResourceId; + requestAttributes.Action = NSQS::ActionFromString(MethodName); + + LOG_SP_DEBUG_S( + ctx, + NKikimrServices::HTTP_PROXY, + TStringBuilder() << "Send metering event." + << " HttpStatusCode: " << requestAttributes.HttpStatusCode + << " IsFifo: " << requestAttributes.IsFifo + << " FolderId: " << requestAttributes.FolderId + << " RequestSizeInBytes: " << requestAttributes.RequestSizeInBytes + << " ResponseSizeInBytes: " << requestAttributes.ResponseSizeInBytes + << " SourceAddress: " << requestAttributes.SourceAddress + << " ResourceId: " << requestAttributes.ResourceId + << " Action: " << requestAttributes.Action + ); + + ctx.Send(NSQS::MakeSqsMeteringServiceID(), reportRequestAttributes.Release()); + } + ctx.Send(Sender, new NHttp::TEvHttpProxy::TEvHttpOutgoingResponse(response)); } diff --git a/ydb/core/http_proxy/http_req.h b/ydb/core/http_proxy/http_req.h index e7dbe4d2e9e..7683f7634f1 100644 --- a/ydb/core/http_proxy/http_req.h +++ b/ydb/core/http_proxy/http_req.h @@ -58,6 +58,7 @@ struct THttpResponseData { TString ErrorText{"OK"}; TString YmqStatusCode; ui32 YmqHttpCode; + bool YmqIsFifo; TString DumpBody(MimeTypes contentType); }; @@ -83,6 +84,7 @@ struct THttpRequestContext { TString FolderId; // not in context TString CloudId; // not in context TString StreamName; // not in context + TString ResourceId; TString SourceAddress; TString MethodName; // used once TString ApiVersion; // used once diff --git a/ydb/core/http_proxy/ut/datastreams_fixture.h b/ydb/core/http_proxy/ut/datastreams_fixture.h index 1bf4b3ff602..ed8ab1c05d2 100644 --- a/ydb/core/http_proxy/ut/datastreams_fixture.h +++ b/ydb/core/http_proxy/ut/datastreams_fixture.h @@ -5,6 +5,7 @@ #include <ydb/library/grpc/server/actors/logger.h> #include <library/cpp/http/misc/parsed_request.h> #include <library/cpp/json/json_writer.h> +#include <library/cpp/logger/global/global.h> #include <library/cpp/resource/resource.h> #include <library/cpp/testing/unittest/registar.h> @@ -69,7 +70,6 @@ T GetByPath(const NJson::TJsonValue& msg, TStringBuf path) { } class THttpProxyTestMock : public NUnitTest::TBaseFixture { - friend class THttpProxyTestMockForSQS; public: THttpProxyTestMock() = default; ~THttpProxyTestMock() = default; @@ -82,10 +82,10 @@ public: InitAll(); } - void InitAll(bool yandexCloudMode = true) { + void InitAll(bool yandexCloudMode = true, bool enableMetering = false) { AccessServicePort = PortManager.GetPort(8443); AccessServiceEndpoint = "127.0.0.1:" + ToString(AccessServicePort); - InitKikimr(yandexCloudMode); + InitKikimr(yandexCloudMode, enableMetering); InitAccessServiceService(); InitHttpServer(yandexCloudMode); } @@ -470,7 +470,7 @@ private: return resultSet; } - void InitKikimr(bool yandexCloudMode) { + void InitKikimr(bool yandexCloudMode, bool enableMetering) { AuthFactory = std::make_shared<TIamAuthFactory>(); NKikimrConfig::TAppConfig appConfig; appConfig.MutablePQConfig()->SetTopicsAreFirstClassCitizen(true); @@ -484,6 +484,21 @@ private: appConfig.MutableSqsConfig()->SetYandexCloudMode(yandexCloudMode); appConfig.MutableSqsConfig()->SetEnableDeadLetterQueues(true); + if (enableMetering) { + auto& sqsConfig = *appConfig.MutableSqsConfig(); + + sqsConfig.SetMeteringFlushingIntervalMs(100); + sqsConfig.SetMeteringLogFilePath("sqs_metering.log"); + TFsPath(sqsConfig.GetMeteringLogFilePath()).DeleteIfExists(); + + sqsConfig.AddMeteringCloudNetCidr("5.45.196.0/24"); + sqsConfig.AddMeteringCloudNetCidr("2a0d:d6c0::/29"); + sqsConfig.AddMeteringYandexNetCidr("127.0.0.0/8"); + sqsConfig.AddMeteringYandexNetCidr("5.45.217.0/24"); + + DoInitGlobalLog(CreateOwningThreadedLogBackend(sqsConfig.GetMeteringLogFilePath(), 0)); + } + auto limit = appConfig.MutablePQConfig()->AddValidRetentionLimits(); limit->SetMinPeriodSeconds(0); limit->SetMaxPeriodSeconds(TDuration::Days(1).Seconds()); @@ -519,6 +534,13 @@ private: ActorRuntime->SetLogPriority(NActorsServices::EServiceCommon::HTTP, NLog::PRI_DEBUG); ActorRuntime->SetLogPriority(NKikimrServices::TICKET_PARSER, NLog::PRI_TRACE); + if (enableMetering) { + ActorRuntime->RegisterService( + NSQS::MakeSqsMeteringServiceID(), + ActorRuntime->Register(NSQS::CreateSqsMeteringService()) + ); + } + NYdb::TClient client(*(KikimrServer->ServerSettings)); UNIT_ASSERT_VALUES_EQUAL(NMsgBusProxy::MSTATUS_OK, client.AlterUserAttributes("/", "Root", {{"folder_id", "folder4"}, @@ -924,3 +946,9 @@ class THttpProxyTestMockForSQS : public THttpProxyTestMock { InitAll(false); } }; + +class THttpProxyTestMockWithMetering : public THttpProxyTestMock { + void SetUp(NUnitTest::TTestContext&) override { + InitAll(true, true); + } +}; diff --git a/ydb/core/http_proxy/ut/ymq_ut.h b/ydb/core/http_proxy/ut/ymq_ut.h index b51bbcac49d..4cf9c03f60d 100644 --- a/ydb/core/http_proxy/ut/ymq_ut.h +++ b/ydb/core/http_proxy/ut/ymq_ut.h @@ -181,6 +181,21 @@ Y_UNIT_TEST_SUITE(TestYmqHttpProxy) { }); } + Y_UNIT_TEST_F(BillingRecordsForJsonApi, THttpProxyTestMockWithMetering) { + auto json = CreateQueue({{"QueueName", "ExampleQueueName"}}); + auto queueUrl = GetByPath<TString>(json, "QueueUrl"); + + json = SendMessage({ + {"QueueUrl", queueUrl}, + {"MessageBody", "MessageBody-0"} + }); + + // TODO: + // Sleep(TDuration::Seconds(500)); + // TVector<NSc::TValue> records = LoadBillingRecords(sqsConfig.GetMeteringLogFilePath()); + // CheckBillingRecord(records, expectedRecords); + } + Y_UNIT_TEST_F(TestSendMessageEmptyQueueUrl, THttpProxyTestMockForSQS) { // We had a bug that crashed the server if QueueUrl was empty in a request. SendMessage({ diff --git a/ydb/core/ymq/http/http.cpp b/ydb/core/ymq/http/http.cpp index 7cb47162509..b98b124ec70 100644 --- a/ydb/core/ymq/http/http.cpp +++ b/ydb/core/ymq/http/http.cpp @@ -164,6 +164,18 @@ void THttpRequest::WriteResponse(const TReplyParams& replyParams, const TSqsHttp requestAttributes.ResourceId = response.ResourceId; requestAttributes.Action = Action_; + RLOG_SQS_BASE_DEBUG(*Parent_->ActorSystem_, + TStringBuilder() << "Send metering event." + << " HttpStatusCode: " << requestAttributes.HttpStatusCode + << " IsFifo: " << requestAttributes.IsFifo + << " FolderId: " << requestAttributes.FolderId + << " RequestSizeInBytes: " << requestAttributes.RequestSizeInBytes + << " ResponseSizeInBytes: " << requestAttributes.ResponseSizeInBytes + << " SourceAddress: " << requestAttributes.SourceAddress + << " ResourceId: " << requestAttributes.ResourceId + << " Action: " << requestAttributes.Action + ); + Parent_->ActorSystem_->Send(MakeSqsMeteringServiceID(), reportRequestAttributes.Release()); } |