summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorNikolay Shestakov <[email protected]>2026-05-19 17:11:47 +0500
committerGitHub <[email protected]>2026-05-19 17:11:47 +0500
commit6cfd6eccef2143f752356af9a2b00a5cc54fee89 (patch)
tree26f67db02e0912b63180e0cd1330643a83bb3dfd
parent774093255c06a377bcc214a7379839afec6e74a6 (diff)
Refactoring: move serialization and deserialization from http_req.cpp (#40332)
-rw-r--r--ydb/core/http_proxy/auth_actors.cpp2
-rw-r--r--ydb/core/http_proxy/auth_actors.h17
-rw-r--r--ydb/core/http_proxy/auth_factory.cpp17
-rw-r--r--ydb/core/http_proxy/auth_factory.h11
-rw-r--r--ydb/core/http_proxy/controller_base.cpp50
-rw-r--r--ydb/core/http_proxy/controller_base.h27
-rw-r--r--ydb/core/http_proxy/controller_registry.cpp62
-rw-r--r--ydb/core/http_proxy/datastreams.cpp119
-rw-r--r--ydb/core/http_proxy/datastreams.h2
-rw-r--r--ydb/core/http_proxy/datastreams_serialization.cpp48
-rw-r--r--ydb/core/http_proxy/datastreams_serialization.h51
-rw-r--r--ydb/core/http_proxy/discovery_actor.cpp2
-rw-r--r--ydb/core/http_proxy/exceptions_mapping.cpp1
-rw-r--r--ydb/core/http_proxy/http_req.cpp341
-rw-r--r--ydb/core/http_proxy/http_req.h49
-rw-r--r--ydb/core/http_proxy/http_service.cpp19
-rw-r--r--ydb/core/http_proxy/serialization.cpp91
-rw-r--r--ydb/core/http_proxy/serialization.h18
-rw-r--r--ydb/core/http_proxy/sqs.cpp170
-rw-r--r--ydb/core/http_proxy/sqs.h2
-rw-r--r--ydb/core/http_proxy/sqs_serialization.cpp36
-rw-r--r--ydb/core/http_proxy/sqs_serialization.h45
-rw-r--r--ydb/core/http_proxy/ut/datastreams_fixture/datastreams_fixture.cpp2
-rw-r--r--ydb/core/http_proxy/ut/kinesis_ut.cpp12
-rw-r--r--ydb/core/http_proxy/utils.cpp3
-rw-r--r--ydb/core/http_proxy/utils.h8
-rw-r--r--ydb/core/http_proxy/ya.make13
-rw-r--r--ydb/core/http_proxy/ymq.cpp205
-rw-r--r--ydb/core/http_proxy/ymq.h2
-rw-r--r--ydb/core/kqp/ut/discovery/kqp_discovery_ut.cpp1
30 files changed, 880 insertions, 546 deletions
diff --git a/ydb/core/http_proxy/auth_actors.cpp b/ydb/core/http_proxy/auth_actors.cpp
index 7a7d65729a4..0eb101ab26a 100644
--- a/ydb/core/http_proxy/auth_actors.cpp
+++ b/ydb/core/http_proxy/auth_actors.cpp
@@ -1,5 +1,7 @@
#include "auth_actors.h"
+#include "http_req.h"
+
#include <ydb/core/base/path.h>
#include <ydb/core/base/ticket_parser.h>
#include <ydb/core/protos/config.pb.h>
diff --git a/ydb/core/http_proxy/auth_actors.h b/ydb/core/http_proxy/auth_actors.h
index 58b156e2118..d8f89e15c8c 100644
--- a/ydb/core/http_proxy/auth_actors.h
+++ b/ydb/core/http_proxy/auth_actors.h
@@ -1,13 +1,20 @@
#pragma once
-#include "http_req.h"
-
#include <ydb/core/protos/serverless_proxy_config.pb.h>
#include <ydb/library/actors/core/actorsystem_fwd.h>
+#include <util/generic/ptr.h>
+
+namespace NKikimr::NSQS {
+ class TAwsRequestSignV4;
+}
+
namespace NKikimr::NHttpProxy {
- NActors::IActor* CreateAccessServiceActor(const NKikimrConfig::TServerlessProxyConfig& config);
- NActors::IActor* CreateIamTokenServiceActor(const NKikimrConfig::TServerlessProxyConfig& config);
- NActors::IActor* CreateIamAuthActor(const NActors::TActorId sender, THttpRequestContext& context, THolder<NKikimr::NSQS::TAwsRequestSignV4> signature);
+struct THttpRequestContext;
+
+NActors::IActor* CreateAccessServiceActor(const NKikimrConfig::TServerlessProxyConfig& config);
+NActors::IActor* CreateIamTokenServiceActor(const NKikimrConfig::TServerlessProxyConfig& config);
+NActors::IActor* CreateIamAuthActor(const NActors::TActorId sender, THttpRequestContext& context, THolder<NKikimr::NSQS::TAwsRequestSignV4> signature);
+
} // namespace NKikimr::NHttpProxy
diff --git a/ydb/core/http_proxy/auth_factory.cpp b/ydb/core/http_proxy/auth_factory.cpp
index 641581b99f5..d22f5b7cd30 100644
--- a/ydb/core/http_proxy/auth_factory.cpp
+++ b/ydb/core/http_proxy/auth_factory.cpp
@@ -1,17 +1,16 @@
-#include "auth_actors.h"
#include "auth_factory.h"
+#include "auth_actors.h"
+#include "http_service.h"
+#include "metrics_actor.h"
+
#include <ydb/core/base/feature_flags.h>
-#include <ydb/core/http_proxy/http_service.h>
-#include <ydb/core/http_proxy/metrics_actor.h>
-#include <ydb/core/protos/config.pb.h>
#include <ydb/library/actors/http/http_proxy.h>
#include <ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/iam_private/iam.h>
#include <ydb/public/sdk/cpp/src/client/types/core_facility/simple_core_facility.h>
namespace NKikimr::NHttpProxy {
-
void TIamAuthFactory::InitTenantDiscovery(
NActors::TActorSystemSetup::TLocalServices&,
const TAppData&,
@@ -23,7 +22,7 @@ void TIamAuthFactory::Initialize(
NActors::TActorSystemSetup::TLocalServices& localServices,
const TAppData& appData,
const THttpConfig& httpConfig,
- const NKikimrConfig::TGRpcConfig& grpcConfig)
+ const NKikimrConfig::TGRpcConfig& grpcConfig)
{
if (!httpConfig.GetEnabled()) {
return;
@@ -43,7 +42,7 @@ void TIamAuthFactory::Initialize(
if (httpConfig.GetYandexCloudMode() && httpConfig.GetYandexCloudServiceRegion().empty()) {
Cout << "HttpProxy: YandexCloudServiceRegion must not be empty" << Endl;
}
- IActor* actor = NKikimr::NHttpProxy::CreateAccessServiceActor(config);
+ NActors::IActor* actor = NKikimr::NHttpProxy::CreateAccessServiceActor(config);
localServices.push_back(std::pair<TActorId, TActorSetupCmd>(
NKikimr::NHttpProxy::MakeAccessServiceID(),
TActorSetupCmd(actor, TMailboxType::HTSwap, appData.UserPoolId)));
@@ -56,7 +55,7 @@ void TIamAuthFactory::Initialize(
const NYdb::TCredentialsProviderFactoryPtr credentialsProviderFactory = jwtFilename.empty()
? NYdb::CreateInsecureCredentialsProviderFactory()
: NYdb::CreateIamJwtFileCredentialsProviderFactoryPrivate(
- {{.Endpoint = iamExternalEndpoint}, jwtFilename} );
+ {{.Endpoint = iamExternalEndpoint}, jwtFilename});
const std::shared_ptr<NYdb::ICoreFacility> coreFacility = jwtFilename.empty()
? nullptr
@@ -94,7 +93,7 @@ void TIamAuthFactory::Initialize(
TActorSetupCmd(actor, TMailboxType::HTSwap, appData.UserPoolId)));
}
-NActors::IActor* TIamAuthFactory::CreateAuthActor(const TActorId sender, THttpRequestContext& context, THolder<NKikimr::NSQS::TAwsRequestSignV4>&& signature) const
+NActors::IActor* TIamAuthFactory::CreateAuthActor(const NActors::TActorId sender, THttpRequestContext& context, THolder<NKikimr::NSQS::TAwsRequestSignV4>&& signature) const
{
return CreateIamAuthActor(sender, context, std::move(signature));
}
diff --git a/ydb/core/http_proxy/auth_factory.h b/ydb/core/http_proxy/auth_factory.h
index c8acb2cc43e..dfc139a5099 100644
--- a/ydb/core/http_proxy/auth_factory.h
+++ b/ydb/core/http_proxy/auth_factory.h
@@ -1,12 +1,15 @@
#pragma once
-#include "http_req.h"
-
#include <ydb/core/base/appdata.h>
+#include <ydb/core/protos/config.pb.h>
+#include <ydb/library/actors/core/actorsystem.h>
#include <ydb/library/http_proxy/authorization/signature.h>
+#include <util/generic/ptr.h>
+
namespace NKikimr::NHttpProxy {
+struct THttpRequestContext;
class IAuthFactory {
public:
@@ -18,7 +21,7 @@ public:
const THttpConfig& config,
const NKikimrConfig::TGRpcConfig& grpcConfig) = 0;
- virtual NActors::IActor* CreateAuthActor(const TActorId sender, THttpRequestContext& context, THolder<NKikimr::NSQS::TAwsRequestSignV4>&& signature) const = 0;
+ virtual NActors::IActor* CreateAuthActor(const NActors::TActorId sender, THttpRequestContext& context, THolder<NKikimr::NSQS::TAwsRequestSignV4>&& signature) const = 0;
virtual ~IAuthFactory() = default;
};
@@ -34,7 +37,7 @@ public:
const THttpConfig& config,
const NKikimrConfig::TGRpcConfig& grpcConfig) final;
- NActors::IActor* CreateAuthActor(const TActorId sender, THttpRequestContext& context, THolder<NKikimr::NSQS::TAwsRequestSignV4>&& signature) const final;
+ NActors::IActor* CreateAuthActor(const NActors::TActorId sender, THttpRequestContext& context, THolder<NKikimr::NSQS::TAwsRequestSignV4>&& signature) const final;
virtual void InitTenantDiscovery(NActors::TActorSystemSetup::TLocalServices&,
const TAppData& appData,
diff --git a/ydb/core/http_proxy/controller_base.cpp b/ydb/core/http_proxy/controller_base.cpp
new file mode 100644
index 00000000000..7a9daa3d66d
--- /dev/null
+++ b/ydb/core/http_proxy/controller_base.cpp
@@ -0,0 +1,50 @@
+#include "controller_base.h"
+
+namespace NKikimr::NHttpProxy {
+
+ bool TBaseHttpController::Execute(
+ THttpRequestContext&& context,
+ THolder<NKikimr::NSQS::TAwsRequestSignV4> signature
+ ) const {
+ const auto& ctx = TlsActivationContext->AsActorContext();
+
+ const auto name = context.MethodName;
+ const auto contentType = context.ContentType;
+ const auto apiVersion = context.ApiVersion;
+
+ auto proc = GetProcessor(name, context);
+ if (proc.has_value()) {
+ proc.value()->Execute(std::move(context), std::move(signature), ctx);
+ return true;
+ } else {
+ switch (proc.error()) {
+ case IHttpController::EError::MethodNotFound:
+ context.DoReply(MakeError(contentType, NYdb::EStatus::UNSUPPORTED,
+ TStringBuilder() << "Unknown method name " << name.Quote(), static_cast<size_t>(NYds::EErrorCodes::MISSING_ACTION)));
+ return false;
+ case IHttpController::EError::ServiceDisabled:
+ context.DoReply(MakeError(contentType, NYdb::EStatus::BAD_REQUEST,
+ TStringBuilder() << apiVersion << " is disabled", static_cast<size_t>(NYds::EErrorCodes::NOT_FOUND)));
+ return false;
+ }
+ }
+
+ return false;
+ }
+
+ std::expected<IHttpRequestProcessor*, IHttpController::EError> TBaseHttpController::GetProcessor(
+ const TString& name,
+ const THttpRequestContext& context
+ ) const {
+ if (!IsEnabled(context.ServiceConfig.GetHttpConfig())) {
+ return std::unexpected(IHttpController::EError::ServiceDisabled);
+ }
+
+ if (auto proc = Name2Processor.find(name); proc != Name2Processor.end()) {
+ return proc->second.get();
+ }
+
+ return std::unexpected(IHttpController::EError::MethodNotFound);
+ }
+
+} // namespace NKikimr::NHttpProxy \ No newline at end of file
diff --git a/ydb/core/http_proxy/controller_base.h b/ydb/core/http_proxy/controller_base.h
new file mode 100644
index 00000000000..54bc80510a8
--- /dev/null
+++ b/ydb/core/http_proxy/controller_base.h
@@ -0,0 +1,27 @@
+#pragma once
+
+#include "http_req.h"
+
+namespace NKikimr::NHttpProxy {
+
+ class TBaseHttpController: public IHttpController {
+ public:
+ bool Execute(
+ THttpRequestContext&& context,
+ THolder<NKikimr::NSQS::TAwsRequestSignV4> signature
+ ) const override;
+
+ virtual bool IsEnabled(const NKikimrConfig::THttpProxyConfig&) const = 0;
+
+ protected:
+ std::expected<IHttpRequestProcessor*, IHttpController::EError> GetProcessor(
+ const TString& name,
+ const THttpRequestContext& context
+ ) const;
+
+ protected:
+ absl::flat_hash_map<TString, std::unique_ptr<IHttpRequestProcessor>> Name2Processor;
+
+ };
+
+} // namespace NKikimr::NHttpProxy
diff --git a/ydb/core/http_proxy/controller_registry.cpp b/ydb/core/http_proxy/controller_registry.cpp
new file mode 100644
index 00000000000..901c7172f06
--- /dev/null
+++ b/ydb/core/http_proxy/controller_registry.cpp
@@ -0,0 +1,62 @@
+#include "datastreams.h"
+#include "http_req.h"
+#include "sqs.h"
+#include "ymq.h"
+
+#include <ydb/core/protos/config.pb.h>
+#include <ydb/core/protos/serverless_proxy_config.pb.h>
+
+#include <util/generic/algorithm.h>
+
+namespace NKikimr::NHttpProxy {
+
+ namespace {
+
+ class TSqsControllerProxy: public IHttpController {
+ public:
+ THttpResponseData MakeError(MimeTypes contentType, NYdb::EStatus Status, const TStringBuf message, size_t issueCode) const override {
+ return GetSqsHttpController()->MakeError(contentType, Status, message, issueCode);
+ }
+
+ bool IsPossible(const TStringBuf apiVersion, const NKikimrConfig::TServerlessProxyConfig& config) const override {
+ return GetController(config)->IsPossible(apiVersion, config);
+ }
+
+ bool Execute(
+ THttpRequestContext&& context,
+ THolder<NKikimr::NSQS::TAwsRequestSignV4> signature
+ ) const override {
+ return GetController(context.ServiceConfig)->Execute(std::move(context), std::move(signature));
+ }
+
+ private:
+ const IHttpController* GetController(const NKikimrConfig::TServerlessProxyConfig& config) const {
+ return config.GetHttpConfig().GetYmqEnabled() ? GetYmqHttpController() : GetSqsHttpController();
+ }
+ };
+
+ TSqsControllerProxy SqsControllerProxyInstance;
+
+ const std::array<const IHttpController*, 2> Controllers = {
+ &SqsControllerProxyInstance,
+ GetDataStreamsHttpController()
+ };
+
+ THttpControllerRegistry Instance;
+
+ } // namespace
+
+ const THttpControllerRegistry& GetHttpControllerRegistry() {
+ return Instance;
+ }
+
+ const IHttpController* THttpControllerRegistry::GetController(const TStringBuf apiVersion, const NKikimrConfig::TServerlessProxyConfig& config) const {
+ for (const auto& controller : Controllers) {
+ if (controller->IsPossible(apiVersion, config)) {
+ return controller;
+ }
+ }
+ return nullptr;
+ }
+
+} // namespace NKikimr::NHttpProxy
diff --git a/ydb/core/http_proxy/datastreams.cpp b/ydb/core/http_proxy/datastreams.cpp
index d0562858e77..a96872a3a3b 100644
--- a/ydb/core/http_proxy/datastreams.cpp
+++ b/ydb/core/http_proxy/datastreams.cpp
@@ -1,8 +1,11 @@
+#include "datastreams.h"
+
#include "auth_factory.h"
+#include "controller_base.h"
#include "custom_metrics.h"
+#include "datastreams_serialization.h"
#include "exceptions_mapping.h"
#include "http_req.h"
-#include "json_proto_conversion.h"
#include "utils.h"
#include <ydb/core/base/appdata.h>
@@ -86,9 +89,10 @@ namespace NKikimr::NHttpProxy {
template<class TProtoService, class TProtoRequest, class TProtoResponse, class TProtoResult, class TProtoCall, class TRpcEv>
class THttpRequestProcessor : public TBaseHttpRequestProcessor<TProtoService, TProtoRequest, TProtoResponse, TProtoResult, TProtoCall, TRpcEv>{
- using TProcessorBase = TBaseHttpRequestProcessor<TProtoService, TProtoRequest, TProtoResponse, TProtoResult, TProtoCall, TRpcEv>;
+ using TProcessorBase = TBaseHttpRequestProcessor<TProtoService, TProtoRequest, TProtoResponse, TProtoResult, TProtoCall, TRpcEv>;
public:
- THttpRequestProcessor(TString method, TProtoCall protoCall) : TProcessorBase(method, protoCall)
+ THttpRequestProcessor(TString method, TProtoCall protoCall)
+ : TProcessorBase(method, protoCall)
{
}
@@ -276,6 +280,8 @@ namespace NKikimr::NHttpProxy {
}
void ReplyWithError(const TActorContext& ctx, NYdb::EStatus status, const TString& errorText, size_t issueCode = ISSUE_CODE_GENERIC) {
+ auto exception = MapToException(status, Method, issueCode);
+
/* deprecated metric: */ ctx.Send(MakeMetricsServiceID(),
new TEvServerlessProxy::TEvCounter{
1, true, true,
@@ -284,7 +290,7 @@ namespace NKikimr::NHttpProxy {
{"folder", HttpContext.FolderId},
{"database", HttpContext.DatabaseId},
{"stream", HttpContext.StreamName},
- {"code", TStringBuilder() << (int)MapToException(status, Method, issueCode).second},
+ {"code", TStringBuilder() << (int)exception.second},
{"name", "api.http.errors_per_second"}}
});
@@ -297,37 +303,42 @@ namespace NKikimr::NHttpProxy {
{"folder_id", HttpContext.FolderId},
{"database_id", HttpContext.DatabaseId},
{"topic", HttpContext.StreamName},
- {"code", TStringBuilder() << (int)MapToException(status, Method, issueCode).second},
+ {"code", TStringBuilder() << (int)exception.second},
{"name", "api.http.data_streams.response.count"}}
});
- HttpContext.ResponseData.Status = status;
- HttpContext.ResponseData.ErrorText = errorText;
- ReplyToHttpContext(ctx, issueCode);
+ ReplyToHttpContext({
+ .HttpCode = exception.second,
+ .ContentType = HttpContext.ContentType,
+ .Message = exception.first,
+ .Body = NDataStreams::Serialize(HttpContext.ContentType, {
+ .Exception = exception,
+ .ErrorText = errorText,
+ })
+ }, status, issueCode, errorText);
ctx.Send(AuthActor, new TEvents::TEvPoisonPill());
TBase::Die(ctx);
}
- void ReplyToHttpContext(const TActorContext& ctx, std::optional<size_t> issueCode = std::nullopt) {
+ void ReplyToHttpContext(THttpResponseData&& data, NYdb::EStatus status, std::optional<size_t> issueCode = std::nullopt, TStringBuf errorText = "OK") {
+ const TActorContext& ctx = TlsActivationContext->AsActorContext();
+
ReportLatencyCounters(ctx);
- LogHttpRequestResponse(ctx, issueCode);
+ LogHttpRequestResponse(ctx, status, issueCode, errorText);
- if (issueCode.has_value()) {
- HttpContext.DoReply(ctx, issueCode.value());
- } else {
- HttpContext.DoReply(ctx);
- }
+ HttpContext.DoReply(std::move(data));
}
- void LogHttpRequestResponse(const TActorContext& ctx, const std::optional<size_t> issueCode) {
- const int httpCode = issueCode ? MapToException(HttpContext.ResponseData.Status, Method, *issueCode).second : 200;
+ void LogHttpRequestResponse(const TActorContext& ctx, NYdb::EStatus status, const std::optional<size_t> issueCode, TStringBuf errorText) {
+ const int httpCode = issueCode ? MapToException(status, Method, *issueCode).second : 200;
const bool isServerError = IsServerError(httpCode);
auto priority = isServerError ? NActors::NLog::PRI_WARN : NActors::NLog::PRI_INFO;
LOG_LOG_S_SAMPLED_BY(ctx, priority, NKikimrServices::HTTP_PROXY,
NSqsTopic::SampleIdFromRequestId(HttpContext.RequestId),
- "Request [" << HttpContext.RequestId << "] " << LogHttpRequestResponseCommonInfoString(HttpContext, StartTime, "Kinesis", HttpContext.StreamName, Method, {}, httpCode, HttpContext.ResponseData.ErrorText));
+ "Request [" << HttpContext.RequestId << "] " <<
+ LogHttpRequestResponseCommonInfoString(HttpContext, StartTime, "Kinesis", HttpContext.StreamName, Method, {}, httpCode, errorText));
}
void ReportInputCounters(const TActorContext& ctx) {
@@ -388,8 +399,6 @@ namespace NKikimr::NHttpProxy {
void HandleGrpcResponse(TEvServerlessProxy::TEvGrpcRequestResult::TPtr ev,
const TActorContext& ctx) {
if (ev->Get()->Status->IsSuccess()) {
- ProtoToJson(*ev->Get()->Message, HttpContext.ResponseData.Body,
- HttpContext.ContentType == MIME_CBOR);
FillOutputCustomMetrics<TProtoResult>(
*(dynamic_cast<TProtoResult*>(ev->Get()->Message.Get())), HttpContext, ctx);
/* deprecated metric: */ ctx.Send(MakeMetricsServiceID(),
@@ -408,7 +417,12 @@ namespace NKikimr::NHttpProxy {
{"code", "200"},
{"name", "api.http.data_streams.response.count"}}
});
- ReplyToHttpContext(ctx);
+ ReplyToHttpContext({
+ .HttpCode = 200,
+ .ContentType = HttpContext.ContentType,
+ .Message = "",
+ .Body = NDataStreams::Serialize(HttpContext.ContentType, *ev->Get()->Message)
+ }, NYdb::EStatus::SUCCESS);
} else {
auto retryClass =
NYdb::NTopic::GetRetryErrorClass(ev->Get()->Status->GetStatus());
@@ -449,7 +463,7 @@ namespace NKikimr::NHttpProxy {
void Bootstrap(const TActorContext& ctx) {
StartTime = ctx.Now();
try {
- HttpContext.RequestBodyToProto(&Request);
+ NDataStreams::Deserialize<TProtoRequest>(HttpContext.ContentType, Request, HttpContext.Request->Body);
} catch (const NKikimr::NSQS::TSQSException& e) {
NYds::EErrorCodes issueCode = NYds::EErrorCodes::OK;
if (e.ErrorClass.ErrorCode == "MissingParameter")
@@ -513,17 +527,18 @@ namespace NKikimr::NHttpProxy {
};
- class TController : public IHttpController {
+ class TController : public TBaseHttpController {
public:
TController() {
- #define DECLARE_DATASTREAMS_PROCESSOR(name) Name2Processor[#name] = MakeHolder<THttpRequestProcessor<\
- DataStreamsService, \
- name##Request, \
- name##Response, \
- name##Result, \
- decltype(&Ydb::DataStreams::V1::DataStreamsService::Stub::Async##name), \
- NKikimr::NGRpcService::TEvDataStreams##name##Request>> \
- (#name, &Ydb::DataStreams::V1::DataStreamsService::Stub::Async##name);
+ #define DECLARE_DATASTREAMS_PROCESSOR(name) Name2Processor[#name] = \
+ std::make_unique<THttpRequestProcessor< \
+ DataStreamsService, \
+ name##Request, \
+ name##Response, \
+ name##Result, \
+ decltype(&Ydb::DataStreams::V1::DataStreamsService::Stub::Async##name), \
+ NKikimr::NGRpcService::TEvDataStreams##name##Request \
+ >>(#name, &Ydb::DataStreams::V1::DataStreamsService::Stub::Async##name) \
DECLARE_DATASTREAMS_PROCESSOR(PutRecords);
DECLARE_DATASTREAMS_PROCESSOR(CreateStream);
@@ -556,35 +571,37 @@ namespace NKikimr::NHttpProxy {
DECLARE_DATASTREAMS_PROCESSOR(StopStreamEncryption);
#undef DECLARE_DATASTREAMS_PROCESSOR
-
}
- std::expected<IHttpRequestProcessor*, IHttpController::EError> GetProcessor(
- const TString& name,
- const THttpRequestContext& context
- ) const override {
- if (context.ApiVersion != "kinesisApi") {
- return std::unexpected(IHttpController::EError::NotMyProtocol);
- }
-
- if (auto proc = Name2Processor.find(name); proc != Name2Processor.end()) {
- return std::expected<IHttpRequestProcessor*, IHttpController::EError>(proc->second.Get());
- }
+ THttpResponseData MakeError(MimeTypes contentType, NYdb::EStatus Status, const TStringBuf message, size_t issueCode) const override {
+ const auto exception = MapToException(Status, "", issueCode);
+ return {
+ .HttpCode = exception.second,
+ .ContentType = contentType,
+ .Message = exception.first,
+ .Body = NDataStreams::Serialize(contentType, NDataStreams::TErrorResponse{
+ .Exception = exception,
+ .ErrorText = TString(message),
+ })
+ };
+ }
- return std::unexpected(IHttpController::EError::MethodNotFound);
+ bool IsEnabled(const NKikimrConfig::THttpProxyConfig& config) const override {
+ return config.GetDataStreamsEnabled();
}
- private:
- THashMap<TString, THolder<IHttpRequestProcessor>> Name2Processor;
+ bool IsPossible(const TStringBuf apiVersion, const NKikimrConfig::TServerlessProxyConfig&) const override {
+ return apiVersion == "kinesisApi";
+ }
};
+ TController ControllerInstance;
+
} // namespace
- std::shared_ptr<const IHttpController> CreateDataStreamsHttpController(const NKikimrConfig::TServerlessProxyConfig& config) {
- if (config.GetHttpConfig().GetDataStreamsEnabled()) {
- return std::make_shared<TController>();
- }
- return {};
+ const IHttpController* GetDataStreamsHttpController() {
+ return &ControllerInstance;
}
} // namespace NKikimr::NHttpProxy
+
diff --git a/ydb/core/http_proxy/datastreams.h b/ydb/core/http_proxy/datastreams.h
index 5b5652dcfbb..b7f14455929 100644
--- a/ydb/core/http_proxy/datastreams.h
+++ b/ydb/core/http_proxy/datastreams.h
@@ -4,6 +4,6 @@
namespace NKikimr::NHttpProxy {
-std::shared_ptr<const IHttpController> CreateDataStreamsHttpController(const NKikimrConfig::TServerlessProxyConfig& config);
+const IHttpController* GetDataStreamsHttpController();
} // namespace NKikimr::NHttpProxy
diff --git a/ydb/core/http_proxy/datastreams_serialization.cpp b/ydb/core/http_proxy/datastreams_serialization.cpp
new file mode 100644
index 00000000000..b6b5656a3b0
--- /dev/null
+++ b/ydb/core/http_proxy/datastreams_serialization.cpp
@@ -0,0 +1,48 @@
+#include "datastreams_serialization.h"
+
+namespace NKikimr::NHttpProxy::NDataStreams {
+
+ template<>
+ void PrepareValue<Ydb::DataStreams::V1::ListStreamsRequest>(Ydb::DataStreams::V1::ListStreamsRequest& value) {
+ value.set_recurse(true);
+ }
+
+ TString Serialize(const MimeTypes mimeType, const NProtoBuf::Message& value) {
+ switch (mimeType) {
+ case MIME_CBOR:
+ return SerializeCbor(value);
+ case MIME_JSON:
+ [[fallthrough]];
+ default:
+ return SerializeJson(value);
+ }
+ }
+
+ TString Serialize(const MimeTypes mimeType, TErrorResponse&& value) {
+ NJson::TJsonValue json;
+ json.SetType(NJson::JSON_MAP);
+ json["message"] = value.ErrorText;
+ json["__type"] = value.Exception.first;
+
+ switch (mimeType) {
+ case MIME_CBOR:
+ return SerializeCbor(json);
+ case MIME_JSON:
+ [[fallthrough]];
+ default:
+ return SerializeJson(json);
+ }
+ }
+
+} // namespace NKikimr::NHttpProxy::NDataStreams
+
+namespace NKikimr::NHttpProxy {
+
+ TString BuildError(MimeTypes mimeType, HttpCodes httpCode, const TString& errorName, const TString& errorText) {
+ return Serialize(mimeType, NDataStreams::TErrorResponse{
+ .Exception = TException{errorName, httpCode},
+ .ErrorText = errorText
+ });
+ }
+
+} // namespace NKikimr::NHttpProxy
diff --git a/ydb/core/http_proxy/datastreams_serialization.h b/ydb/core/http_proxy/datastreams_serialization.h
new file mode 100644
index 00000000000..84cd9757206
--- /dev/null
+++ b/ydb/core/http_proxy/datastreams_serialization.h
@@ -0,0 +1,51 @@
+#pragma once
+
+#include "exceptions_mapping.h"
+#include "json_proto_conversion.h"
+#include "serialization.h"
+
+#include <ydb/library/http_proxy/error/error.h>
+#include <ydb/public/api/protos/draft/datastreams.pb.h>
+
+namespace NKikimr::NHttpProxy::NDataStreams {
+
+ template<typename TValue>
+ void PrepareValue(TValue& value) {
+ Y_UNUSED(value);
+ }
+
+ template<>
+ void PrepareValue<Ydb::DataStreams::V1::ListStreamsRequest>(Ydb::DataStreams::V1::ListStreamsRequest& value);
+
+ template<typename TValue>
+ void Deserialize(const MimeTypes mimeType, TValue& value, const TStringBuf& input)
+ requires std::is_base_of_v<NProtoBuf::Message, TValue> {
+
+ if (input.empty()) {
+ throw NKikimr::NSQS::TSQSException(NKikimr::NSQS::NErrors::MALFORMED_QUERY_STRING) << "Empty body";
+ }
+
+ switch (mimeType) {
+ case MIME_CBOR:
+ PrepareValue(value);
+ DeserializeCbor(value, input);
+ break;
+ case MIME_JSON:
+ PrepareValue(value);
+ DeserializeJson(value, input);
+ break;
+ default:
+ throw NKikimr::NSQS::TSQSException(NKikimr::NSQS::NErrors::MALFORMED_QUERY_STRING) <<
+ "Unknown ContentType";
+ }
+ };
+
+ TString Serialize(const MimeTypes mimeType, const NProtoBuf::Message& value);
+
+ struct TErrorResponse {
+ TException Exception;
+ TString ErrorText;
+ };
+ TString Serialize(const MimeTypes mimeType, TErrorResponse&& value);
+
+} // namespace NKikimr::NHttpProxy::NDataStreams
diff --git a/ydb/core/http_proxy/discovery_actor.cpp b/ydb/core/http_proxy/discovery_actor.cpp
index 317d593d542..ca7624294e0 100644
--- a/ydb/core/http_proxy/discovery_actor.cpp
+++ b/ydb/core/http_proxy/discovery_actor.cpp
@@ -5,7 +5,6 @@
#include <ydb/library/actors/core/events.h>
#include <ydb/library/actors/core/hfunc.h>
#include <ydb/library/actors/core/log.h>
-#include <ydb/public/api/grpc/ydb_discovery_v1.grpc.pb.h>
#include <library/cpp/cache/cache.h>
@@ -177,3 +176,4 @@ namespace NKikimr::NHttpProxy {
return new TDiscoveryProxyActor(credentialsProvider, config);
}
} // namespace NKikimr::NHttpProxy
+
diff --git a/ydb/core/http_proxy/exceptions_mapping.cpp b/ydb/core/http_proxy/exceptions_mapping.cpp
index 081207cd5d3..dc850d74520 100644
--- a/ydb/core/http_proxy/exceptions_mapping.cpp
+++ b/ydb/core/http_proxy/exceptions_mapping.cpp
@@ -174,3 +174,4 @@ TException UnsupportedExceptions(const TString& method, NYds::EErrorCodes issueC
}
} // namespace NKikimr::NHttpProxy
+
diff --git a/ydb/core/http_proxy/http_req.cpp b/ydb/core/http_proxy/http_req.cpp
index 8f8e0c0dc9a..ccaba669bae 100644
--- a/ydb/core/http_proxy/http_req.cpp
+++ b/ydb/core/http_proxy/http_req.cpp
@@ -1,75 +1,24 @@
-#include "auth_factory.h"
-#include "custom_metrics.h"
-#include "datastreams.h"
-#include "exceptions_mapping.h"
#include "http_req.h"
-#include "json_proto_conversion.h"
-#include "sqs.h"
+
+#include "auth_factory.h"
#include "utils.h"
-#include "ymq.h"
-#include <ydb/core/base/appdata.h>
-#include <ydb/core/base/path.h>
-#include <ydb/core/grpc_caching/cached_grpc_request_actor.h>
-#include <ydb/core/grpc_services/local_rpc/local_rpc.h>
-#include <ydb/core/protos/serverless_proxy_config.pb.h>
-#include <ydb/core/security/ticket_parser_impl.h>
-#include <ydb/core/viewer/json/json.h>
-#include <ydb/core/ymq/actor/auth_multi_factory.h>
-#include <ydb/core/ymq/actor/serviceid.h>
#include <ydb/library/actors/http/http_proxy.h>
-#include <ydb/library/folder_service/events.h>
-#include <ydb/library/folder_service/folder_service.h>
-#include <ydb/library/grpc/actor_client/grpc_service_cache.h>
#include <ydb/library/http_proxy/authorization/auth_helpers.h>
#include <ydb/library/http_proxy/error/error.h>
-#include <ydb/library/ycloud/api/access_service.h>
-#include <ydb/library/ycloud/api/iam_token_service.h>
-#include <ydb/library/ycloud/impl/access_service.h>
-#include <ydb/library/ycloud/impl/iam_token_service.h>
-#include <ydb/public/api/grpc/draft/ydb_sqs_topic_v1.grpc.pb.h>
-#include <ydb/public/sdk/cpp/adapters/issue/issue.h>
-#include <ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/datastreams/datastreams.h>
-#include <ydb/public/sdk/cpp/src/client/topic/impl/common.h>
-#include <ydb/services/datastreams/datastreams_proxy.h>
-#include <ydb/services/datastreams/next_token.h>
-#include <ydb/services/datastreams/shard_iterator.h>
-#include <ydb/services/lib/sharding/sharding.h>
-#include <ydb/services/sqs_topic/queue_url/utils.h>
-#include <ydb/services/sqs_topic/sqs_topic_proxy.h>
-#include <ydb/services/sqs_topic/utils.h>
-#include <ydb/services/ymq/grpc_service.h>
-#include <ydb/services/ymq/rpc_params.h>
-#include <ydb/services/ymq/utils.h>
-#include <ydb/services/ymq/ymq_proxy.h>
-
-#include <yql/essentials/public/issue/yql_issue_message.h>
#include <library/cpp/cgiparam/cgiparam.h>
-#include <library/cpp/digest/old_crc/crc.h>
#include <library/cpp/http/misc/parsed_request.h>
#include <library/cpp/http/server/response.h>
-#include <library/cpp/json/json_reader.h>
-#include <library/cpp/json/json_writer.h>
-#include <library/cpp/protobuf/json/json_output_create.h>
-#include <library/cpp/protobuf/json/proto2json.h>
-#include <library/cpp/protobuf/json/proto2json_printer.h>
#include <library/cpp/uri/uri.h>
-#include <util/generic/guid.h>
-#include <util/stream/file.h>
#include <util/string/ascii.h>
-#include <util/string/cast.h>
-#include <util/string/join.h>
#include <util/string/vector.h>
-#include <nlohmann/json.hpp>
namespace NKikimr::NHttpProxy {
using namespace google::protobuf;
- using namespace Ydb::DataStreams::V1;
- using namespace NYdb::NDataStreams::V1;
constexpr TStringBuf IAM_HEADER = "x-yacloud-subjecttoken";
constexpr TStringBuf SECURITY_TOKEN_HEADER = "x-amz-security-token";
@@ -82,63 +31,24 @@ namespace NKikimr::NHttpProxy {
constexpr TStringBuf REQUEST_CONTENT_TYPE_HEADER = "content-type";
constexpr TStringBuf CREDENTIAL_PARAM = "Credential";
- std::vector<std::shared_ptr<const IHttpController>> BuildControllers(const NKikimrConfig::TServerlessProxyConfig& config) {
- auto controllers = std::vector<std::shared_ptr<const IHttpController>>{
- CreateSqsHttpController(config),
- CreateYmqHttpController(config),
- CreateDataStreamsHttpController(config)
- } | std::views::filter([](const auto& controller) { return controller != nullptr; });
-
- return std::vector<std::shared_ptr<const IHttpController>>(controllers.begin(), controllers.end());
- }
-
- THttpRequestProcessors::THttpRequestProcessors(const NKikimrConfig::TServerlessProxyConfig& config)
- : Controllers(BuildControllers(config))
- {
- }
-
- void SetApiVersionDisabledErrorText(THttpRequestContext& context) {
- context.ResponseData.ErrorText = (TStringBuilder() << context.ApiVersion << " is disabled");
+ THttpRequestProcessors::THttpRequestProcessors(const NKikimrConfig::TServerlessProxyConfig&) {
}
- bool THttpRequestProcessors::Execute(const TString& name, THttpRequestContext&& context,
+ bool THttpRequestProcessors::Execute(const TString&, THttpRequestContext&& context,
THolder<NKikimr::NSQS::TAwsRequestSignV4> signature,
- const TActorContext& ctx) {
+ const TActorContext&) {
- for (const auto& controller : Controllers) {
- auto proc = controller->GetProcessor(name, context);
- if (proc.has_value()) {
- proc.value()->Execute(std::move(context), std::move(signature), ctx);
- return true;
- } else {
- switch (proc.error()) {
- case IHttpController::EError::NotMyProtocol:
- continue;
- case IHttpController::EError::MethodNotFound:
- context.ResponseData.Status = NYdb::EStatus::UNSUPPORTED;
- context.ResponseData.ErrorText = TStringBuilder() << "Unknown method name " << name.Quote();
- context.DoReply(ctx, static_cast<size_t>(NYds::EErrorCodes::MISSING_ACTION));
- return false;
- }
- }
+ const auto* controller = GetHttpControllerRegistry().GetController(context.ApiVersion, context.ServiceConfig);
+ if (controller) {
+ return controller->Execute(std::move(context), std::move(signature));
}
- if (name.empty()) {
- context.ResponseData.Status = NYdb::EStatus::UNSUPPORTED;
- context.ResponseData.ErrorText = TStringBuilder() << "Unknown method name " << name.Quote();
- context.DoReply(ctx, static_cast<size_t>(NYds::EErrorCodes::MISSING_ACTION));
- return false;
- }
-
- context.ResponseData.IsYmq = context.ApiVersion == "AmazonSQS";
- if (context.ResponseData.IsYmq) {
- context.ResponseData.UseYmqStatusCode = true;
- context.ResponseData.YmqHttpCode = 400;
- } else {
- context.ResponseData.Status = NYdb::EStatus::BAD_REQUEST;
- }
- SetApiVersionDisabledErrorText(context);
- context.DoReply(ctx);
+ context.DoReply(THttpResponseData{
+ .HttpCode = 404,
+ .ContentType = MimeTypes::MIME_TEXT,
+ .Message = "Not Found",
+ .Body = "Not Found"
+ });
return false;
}
@@ -196,110 +106,38 @@ namespace NKikimr::NHttpProxy {
return signature;
}
- void THttpRequestContext::DoReply(const TActorContext& ctx, size_t issueCode) {
- auto createResponse = [this](const auto& request,
- TStringBuf status,
- TStringBuf message,
- TStringBuf contentType,
- TStringBuf body) {
- NHttp::THttpOutgoingResponsePtr response =
- new NHttp::THttpOutgoingResponse(request, "HTTP", "1.1", status, message);
- response->Set<&NHttp::THttpResponse::Connection>(request->GetConnection());
- response->Set(REQUEST_ID_HEADER_EXT, RequestId);
- if (!contentType.empty() && !body.empty()) {
- response->Set<&NHttp::THttpResponse::ContentType>(contentType);
- if (!request->Endpoint->CompressContentTypes.empty()) {
- contentType = NHttp::Trim(contentType.Before(';'), ' ');
- if (Count(request->Endpoint->CompressContentTypes, contentType) != 0) {
- response->EnableCompression();
- }
- }
- }
+ void THttpRequestContext::DoReply(THttpResponseData&& data) {
+ auto ctx = TlsActivationContext->AsActorContext();
+ LOG_SP_INFO_S(ctx, NKikimrServices::HTTP_PROXY,
+ "reply with status: " << data.HttpCode << " message: " << data.Message);
- if (response->IsNeedBody() || !body.empty()) {
- if (request->Method == "HEAD") {
- response->Set<&NHttp::THttpResponse::ContentLength>(ToString(body.size()));
- } else {
- response->SetBody(body);
+ NHttp::THttpOutgoingResponsePtr response = new NHttp::THttpOutgoingResponse(
+ Request,
+ "HTTP",
+ "1.1",
+ TStringBuilder() << data.HttpCode,
+ data.Message
+ );
+ response->Set<&NHttp::THttpResponse::Connection>(Request->GetConnection());
+ response->Set(REQUEST_ID_HEADER_EXT, RequestId);
+ if (!data.Body.empty()) {
+ const auto contentType = AsAwsContentType(data.ContentType);
+ response->Set<&NHttp::THttpResponse::ContentType>(contentType);
+ if (!Request->Endpoint->CompressContentTypes.empty()) {
+ TStringBuf buffer = contentType;
+ auto contentType = NHttp::Trim(buffer.Before(';'), ' ');
+ if (Count(Request->Endpoint->CompressContentTypes, contentType) != 0) {
+ response->EnableCompression();
}
}
- return response;
- };
- auto strByMimeAws = [](MimeTypes contentType) {
- switch (contentType) {
- case MIME_JSON:
- return "application/x-amz-json-1.1";
- case MIME_CBOR:
- return "application/x-amz-cbor-1.1";
- default:
- return strByMime(contentType);
- }
- };
-
- if (ResponseData.Status == NYdb::EStatus::SUCCESS) {
- LOG_SP_INFO_S(ctx, NKikimrServices::HTTP_PROXY, "reply ok");
- } else {
- LOG_SP_INFO_S(ctx, NKikimrServices::HTTP_PROXY,
- "reply with status: " << ResponseData.Status <<
- " message: " << ResponseData.ErrorText);
- ResponseData.Body.SetType(NJson::JSON_MAP);
- ResponseData.Body["message"] = ResponseData.ErrorText;
- if (ResponseData.UseYmqStatusCode) {
- ResponseData.Body["__type"] = ResponseData.YmqStatusCode;
- } else {
- ResponseData.Body["__type"] = MapToException(ResponseData.Status, MethodName, issueCode).first;
- }
}
- TString errorName;
- ui32 httpCode;
- if (ResponseData.UseYmqStatusCode) {
- httpCode = ResponseData.YmqHttpCode;
- errorName = ResponseData.YmqStatusCode;
- } else {
- std::tie(errorName, httpCode) = MapToException(ResponseData.Status, MethodName, issueCode);
- }
- auto response = createResponse(
- Request,
- TStringBuilder() << (ui32)httpCode,
- errorName,
- strByMimeAws(ContentType),
- ResponseData.DumpBody(ContentType)
- );
-
- if (ResponseData.IsYmq && ServiceConfig.GetHttpConfig().GetYandexCloudMode()) {
- // Send request attributes to the metering actor
- auto reportRequestAttributes = MakeHolder<NSQS::TSqsEvents::TEvReportProcessedRequestAttributes>();
-
- auto& requestAttributes = reportRequestAttributes->Data;
-
- requestAttributes.HttpStatusCode = httpCode;
- requestAttributes.IsFifo = ResponseData.YmqIsFifo;
- requestAttributes.FolderId = FolderId;
- requestAttributes.RequestSizeInBytes = Request->Size();
- requestAttributes.ResponseSizeInBytes = response->Size();
- requestAttributes.SourceAddress = SourceAddress;
- requestAttributes.ResourceId = ResourceId;
- requestAttributes.Action = NSQS::ActionFromString(MethodName);
- for (const auto& [k, v] : ResponseData.QueueTags) {
- requestAttributes.QueueTags[k] = v;
+ if (response->IsNeedBody() || !data.Body.empty()) {
+ if (Request->Method == "HEAD") {
+ response->Set<&NHttp::THttpResponse::ContentLength>(ToString(data.Body.size()));
+ } else {
+ response->SetBody(data.Body);
}
-
- LOG_SP_DEBUG_S(
- ctx,
- NKikimrServices::HTTP_PROXY,
- TStringBuilder() << "Send metering event."
- << " HttpStatusCode: " << requestAttributes.HttpStatusCode
- << " IsFifo: " << requestAttributes.IsFifo
- << " FolderId: " << requestAttributes.FolderId
- << " RequestSizeInBytes: " << requestAttributes.RequestSizeInBytes
- << " ResponseSizeInBytes: " << requestAttributes.ResponseSizeInBytes
- << " SourceAddress: " << requestAttributes.SourceAddress
- << " ResourceId: " << requestAttributes.ResourceId
- << " Action: " << requestAttributes.Action
- );
-
- ctx.Send(NSQS::MakeSqsMeteringServiceID(), reportRequestAttributes.Release());
}
ctx.Send(Sender, new NHttp::TEvHttpProxy::TEvHttpOutgoingResponse(response));
@@ -331,7 +169,7 @@ namespace NKikimr::NHttpProxy {
for (const auto& header : headers.Headers) {
if (AsciiEqualsIgnoreCase(header.first, IAM_HEADER)) {
IamToken = header.second;
- } else if(AsciiEqualsIgnoreCase(header.first, SECURITY_TOKEN_HEADER)) {
+ } else if (AsciiEqualsIgnoreCase(header.first, SECURITY_TOKEN_HEADER)) {
SecurityToken = header.second;
} else if (AsciiEqualsIgnoreCase(header.first, AUTHORIZATION_HEADER)) {
if (header.second.StartsWith("Bearer ")) {
@@ -359,98 +197,25 @@ namespace NKikimr::NHttpProxy {
RequestId = GenerateRequestId(sourceReqId);
}
- TString THttpResponseData::DumpBody(MimeTypes contentType) {
- // according to https://json.nlohmann.me/features/binary_formats/cbor/#serialization
- auto cborBinaryTagBySize = [](size_t size) -> ui8 {
- if (size <= 23) {
- return 0x40 + static_cast<ui32>(size);
- } else if (size <= 255) {
- return 0x58;
- } else if (size <= 65536) {
- return 0x59;
- }
-
- return 0x5A;
- };
+ TString AsAwsContentType(MimeTypes contentType) {
switch (contentType) {
- case MIME_CBOR: {
- bool gotData = false;
- std::function<bool(int, nlohmann::json::parse_event_t, nlohmann::basic_json<>&)> bz =
- [&gotData, &cborBinaryTagBySize](int, nlohmann::json::parse_event_t event, nlohmann::json& parsed) {
- if (event == nlohmann::json::parse_event_t::key and parsed == nlohmann::json("Data")) {
- gotData = true;
- return true;
- }
- if (event == nlohmann::json::parse_event_t::value and gotData) {
- gotData = false;
- std::string data = parsed.get<std::string>();
- parsed = nlohmann::json::binary({data.begin(), data.end()},
- cborBinaryTagBySize(data.size()));
- return true;
- }
- return true;
- };
-
- auto toCborStr = NJson::WriteJson(Body, false);
- auto json =
- nlohmann::json::parse(TStringBuf(toCborStr).begin(), TStringBuf(toCborStr).end(), bz, false);
- auto toCbor = nlohmann::json::to_cbor(json);
- return {(char*)&toCbor[0], toCbor.size()};
- }
- default: {
- case MIME_JSON:
- return NJson::WriteJson(Body, false);
- }
- }
- }
-
- void THttpRequestContext::RequestBodyToProto(NProtoBuf::Message* request) {
- TStringBuf requestStr = Request->Body;
- if (requestStr.empty()) {
- throw NKikimr::NSQS::TSQSException(NKikimr::NSQS::NErrors::MALFORMED_QUERY_STRING) <<
- "Empty body";
- }
-
- // recursive is default setting
- if (auto listStreamsRequest = dynamic_cast<Ydb::DataStreams::V1::ListStreamsRequest*>(request)) {
- listStreamsRequest->set_recurse(true);
- }
-
- switch (ContentType) {
- case MIME_CBOR: {
- auto fromCbor = nlohmann::json::from_cbor(requestStr.begin(), requestStr.end(),
- true, false,
- nlohmann::json::cbor_tag_handler_t::ignore);
- if (fromCbor.is_discarded()) {
- throw NKikimr::NSQS::TSQSException(NKikimr::NSQS::NErrors::MALFORMED_QUERY_STRING) <<
- "Can not parse request body from CBOR";
- } else {
- NlohmannJsonToProto(fromCbor, request);
- }
- break;
- }
- case MIME_JSON: {
- auto fromJson = nlohmann::json::parse(requestStr, nullptr, false);
- if (fromJson.is_discarded()) {
- throw NKikimr::NSQS::TSQSException(NKikimr::NSQS::NErrors::MALFORMED_QUERY_STRING) <<
- "Can not parse request body from JSON";
- } else {
- NlohmannJsonToProto(fromJson, request);
- }
- break;
- }
- default:
- throw NKikimr::NSQS::TSQSException(NKikimr::NSQS::NErrors::MALFORMED_QUERY_STRING) <<
- "Unknown ContentType";
+ case MIME_JSON:
+ return "application/x-amz-json-1.1";
+ case MIME_CBOR:
+ return "application/x-amz-cbor-1.1";
+ default:
+ return strByMime(contentType);
}
}
-
} // namespace NKikimr::NHttpProxy
template <>
void Out<NKikimr::NHttpProxy::THttpResponseData>(IOutputStream& o, const NKikimr::NHttpProxy::THttpResponseData& p) {
- TString s = TStringBuilder() << "NYdb status: " << std::to_string(static_cast<size_t>(p.Status)) <<
- ". Body: " << NJson::WriteJson(p.Body) << ". Error text: " << p.ErrorText;
+ TString s = TStringBuilder() << "NYdb status: " << p.HttpCode <<
+ ". Content type: " << p.ContentType <<
+ ". Message: " << p.Message <<
+ ". Body: " << p.Body;
+
o.Write(s.data(), s.length());
}
diff --git a/ydb/core/http_proxy/http_req.h b/ydb/core/http_proxy/http_req.h
index 182dc46c462..fa3d5651f11 100644
--- a/ydb/core/http_proxy/http_req.h
+++ b/ydb/core/http_proxy/http_req.h
@@ -10,8 +10,6 @@
#include <ydb/services/datastreams/codes/datastreams_codes.h>
#include <library/cpp/http/server/http.h>
-#include <library/cpp/json/json_reader.h>
-#include <library/cpp/json/json_value.h>
#include <util/stream/output.h>
#include <util/string/builder.h>
@@ -47,19 +45,11 @@ private:
ui32 UsedRetries{0};
};
-
struct THttpResponseData {
- bool IsYmq = false;
- bool UseYmqStatusCode = false;
- NYdb::EStatus Status{NYdb::EStatus::SUCCESS};
- NJson::TJsonValue Body;
- TString ErrorText{"OK"};
- TString YmqStatusCode;
- ui32 YmqHttpCode = 500;
- bool YmqIsFifo = false;
- THashMap<TString, TString> QueueTags;
-
- TString DumpBody(MimeTypes contentType);
+ ui32 HttpCode;
+ MimeTypes ContentType = MimeTypes::MIME_TEXT;
+ TString Message;
+ TString Body;
};
struct THttpRequestContext {
@@ -74,7 +64,6 @@ struct THttpRequestContext {
NYdb::TDriver* Driver;
std::shared_ptr<NYdb::ICredentialsProvider> ServiceAccountCredentialsProvider;
- THttpResponseData ResponseData;
TString ServiceAccountId;
TString RequestId;
TString DiscoveryEndpoint;
@@ -98,9 +87,9 @@ struct THttpRequestContext {
}
THolder<NKikimr::NSQS::TAwsRequestSignV4> GetSignature();
- void DoReply(const TActorContext& ctx, size_t issueCode = ISSUE_CODE_GENERIC);
void ParseHeaders(TStringBuf headers);
- void RequestBodyToProto(NProtoBuf::Message* request);
+
+ void DoReply(THttpResponseData&& data);
};
class IHttpRequestProcessor {
@@ -142,18 +131,29 @@ protected:
class IHttpController {
public:
enum class EError {
- NotMyProtocol,
- MethodNotFound
+ MethodNotFound,
+ ServiceDisabled
};
virtual ~IHttpController() = default;
- virtual std::expected<IHttpRequestProcessor*, EError> GetProcessor(
- const TString& name,
- const THttpRequestContext& context
+ virtual bool Execute(
+ THttpRequestContext&& context,
+ THolder<NKikimr::NSQS::TAwsRequestSignV4> signature
) const = 0;
+
+ virtual THttpResponseData MakeError(MimeTypes contentType, NYdb::EStatus Status, const TStringBuf message, size_t issueCode) const = 0;
+
+ virtual bool IsPossible(const TStringBuf apiVersion, const NKikimrConfig::TServerlessProxyConfig& config) const = 0;
+};
+
+class THttpControllerRegistry {
+public:
+ const IHttpController* GetController(const TStringBuf apiVersion, const NKikimrConfig::TServerlessProxyConfig& config) const;
};
+const THttpControllerRegistry& GetHttpControllerRegistry();
+
class THttpRequestProcessors {
public:
using TService = Ydb::DataStreams::V1::DataStreamsService;
@@ -165,11 +165,10 @@ public:
bool Execute(const TString& name, THttpRequestContext&& params,
THolder<NKikimr::NSQS::TAwsRequestSignV4> signature,
const TActorContext& ctx);
-
-private:
- const std::vector<std::shared_ptr<const IHttpController>> Controllers;
};
+TString AsAwsContentType(MimeTypes contentType);
+
} // namespace NKikimr::NHttpProxy
template <>
diff --git a/ydb/core/http_proxy/http_service.cpp b/ydb/core/http_proxy/http_service.cpp
index aa022488d43..95559991598 100644
--- a/ydb/core/http_proxy/http_service.cpp
+++ b/ydb/core/http_proxy/http_service.cpp
@@ -1,6 +1,7 @@
-#include "http_req.h"
#include "http_service.h"
+#include "http_req.h"
+
#include <ydb/core/protos/config.pb.h>
#include <ydb/library/actors/core/actor_bootstrapped.h>
#include <ydb/library/actors/core/events.h>
@@ -9,13 +10,15 @@
#include <ydb/library/actors/http/http_proxy.h>
#include <ydb/library/http_proxy/error/error.h>
-#include <util/string/ascii.h>
#include <util/stream/file.h>
+#include <util/string/ascii.h>
namespace NKikimr::NHttpProxy {
using namespace NActors;
+ TString BuildError(MimeTypes mimeType, HttpCodes httpCode, const TString& errorName, const TString& errorText);
+
class THttpProxyActor : public NActors::TActorBootstrapped<THttpProxyActor> {
using TBase = NActors::TActorBootstrapped<THttpProxyActor>;
public:
@@ -101,15 +104,18 @@ namespace NKikimr::NHttpProxy {
" database [" << context.DatabasePath << "]" <<
" requestId: " << context.RequestId);
+ auto contentType = context.ContentType;
try {
auto signature = context.GetSignature();
auto methodName = context.MethodName;
Processors->Execute(std::move(methodName), std::move(context), std::move(signature), ctx);
} catch (const NKikimr::NSQS::TSQSException& e) {
- context.ResponseData.Status = NYdb::EStatus::BAD_REQUEST;
- context.ResponseData.ErrorText = e.what();
- context.DoReply(ctx, static_cast<size_t>(NYds::EErrorCodes::ACCESS_DENIED));
- return;
+ context.DoReply({
+ .HttpCode = HTTP_BAD_REQUEST,
+ .ContentType = contentType,
+ .Message = "AccessDeniedException",
+ .Body = BuildError(contentType, HTTP_BAD_REQUEST, "AccessDeniedException", e.what())
+ });
}
}
@@ -118,3 +124,4 @@ namespace NKikimr::NHttpProxy {
}
} // namespace NKikimr::NHttpProxy
+
diff --git a/ydb/core/http_proxy/serialization.cpp b/ydb/core/http_proxy/serialization.cpp
new file mode 100644
index 00000000000..29bc1c77a25
--- /dev/null
+++ b/ydb/core/http_proxy/serialization.cpp
@@ -0,0 +1,91 @@
+#include "serialization.h"
+#include "json_proto_conversion.h"
+
+#include <nlohmann/json.hpp>
+
+
+namespace NKikimr::NHttpProxy {
+
+void DeserializeCbor(NProtoBuf::Message& message, const TStringBuf& input) {
+ auto fromCbor = nlohmann::json::from_cbor(
+ input.begin(),
+ input.end(),
+ true, // strict mode
+ false, // allow exceptions
+ nlohmann::json::cbor_tag_handler_t::ignore
+ );
+ if (fromCbor.is_discarded()) {
+ throw NKikimr::NSQS::TSQSException(NKikimr::NSQS::NErrors::MALFORMED_QUERY_STRING) <<
+ "Can not parse request body from CBOR";
+ } else {
+ NlohmannJsonToProto(fromCbor, &message);
+ }
+}
+
+void DeserializeJson(NProtoBuf::Message& message, const TStringBuf& input) {
+ auto fromJson = nlohmann::json::parse(input, nullptr, false);
+ if (fromJson.is_discarded()) {
+ throw NKikimr::NSQS::TSQSException(NKikimr::NSQS::NErrors::MALFORMED_QUERY_STRING) <<
+ "Can not parse request body from JSON";
+ } else {
+ NlohmannJsonToProto(fromJson, &message);
+ }
+}
+
+TString SerializeCbor(const NProtoBuf::Message& message) {
+ NJson::TJsonValue result;
+ ProtoToJson(message, result, true);
+ return SerializeCbor(result);
+}
+
+TString SerializeJson(const NProtoBuf::Message& message) {
+ NJson::TJsonValue result;
+ ProtoToJson(message, result, false);
+ return SerializeJson(result);
+}
+
+TString SerializeCbor(const NJson::TJsonValue& message) {
+ // according to https://json.nlohmann.me/features/binary_formats/cbor/#serialization
+ auto cborBinaryTagBySize = [](size_t size) -> ui8 {
+ if (size <= 23) {
+ return 0x40 + static_cast<ui32>(size);
+ } else if (size <= 255) {
+ return 0x58;
+ } else if (size <= 65535) {
+ return 0x59;
+ }
+
+ return 0x5A;
+ };
+
+ bool gotData = false;
+ std::function<bool(int, nlohmann::json::parse_event_t, nlohmann::basic_json<>&)> bz =
+ [&gotData, &cborBinaryTagBySize](int, nlohmann::json::parse_event_t event, nlohmann::json& parsed) {
+ if (event == nlohmann::json::parse_event_t::key and parsed == nlohmann::json("Data")) {
+ gotData = true;
+ return true;
+ }
+ if (event == nlohmann::json::parse_event_t::value and gotData) {
+ gotData = false;
+ std::string data = parsed.get<std::string>();
+ parsed = nlohmann::json::binary({data.begin(), data.end()},
+ cborBinaryTagBySize(data.size()));
+ return true;
+ }
+ return true;
+ };
+
+ auto toCborStr = NJson::WriteJson(message, false);
+ TStringBuf toCborBuf(toCborStr);
+
+ auto json = nlohmann::json::parse(toCborBuf.begin(), toCborBuf.end(), bz, false);
+ auto toCbor = nlohmann::json::to_cbor(json);
+
+ return {(char*)&toCbor[0], toCbor.size()};
+}
+
+TString SerializeJson(const NJson::TJsonValue& message) {
+ return NJson::WriteJson(message, false);
+}
+
+} // namespace NKikimr::NHttpProxy
diff --git a/ydb/core/http_proxy/serialization.h b/ydb/core/http_proxy/serialization.h
new file mode 100644
index 00000000000..5196e4c02f7
--- /dev/null
+++ b/ydb/core/http_proxy/serialization.h
@@ -0,0 +1,18 @@
+#pragma once
+
+#include <contrib/libs/protobuf/src/google/protobuf/message.h>
+#include <library/cpp/json/writer/json_value.h>
+#include <library/cpp/mime/types/mime.h>
+
+namespace NKikimr::NHttpProxy {
+
+void DeserializeCbor(NProtoBuf::Message& message, const TStringBuf& input);
+void DeserializeJson(NProtoBuf::Message& message, const TStringBuf& input);
+
+TString SerializeCbor(const NProtoBuf::Message& message);
+TString SerializeJson(const NProtoBuf::Message& message);
+
+TString SerializeCbor(const NJson::TJsonValue& message);
+TString SerializeJson(const NJson::TJsonValue& message);
+
+} // namespace NKikimr::NHttpProxy
diff --git a/ydb/core/http_proxy/sqs.cpp b/ydb/core/http_proxy/sqs.cpp
index 481ab381898..ba2b6642af6 100644
--- a/ydb/core/http_proxy/sqs.cpp
+++ b/ydb/core/http_proxy/sqs.cpp
@@ -1,8 +1,11 @@
+#include "sqs.h"
+
#include "auth_factory.h"
+#include "controller_base.h"
#include "custom_metrics.h"
#include "exceptions_mapping.h"
#include "http_req.h"
-#include "json_proto_conversion.h"
+#include "sqs_serialization.h"
#include "utils.h"
#include <ydb/core/base/appdata.h>
@@ -26,8 +29,6 @@
#include <yql/essentials/public/issue/yql_issue_message.h>
-#include <util/string/cast.h>
-
namespace NKikimr::NHttpProxy {
namespace {
@@ -51,7 +52,7 @@ namespace NKikimr::NHttpProxy {
return std::unexpected(std::move(parsedQueueUrl).error());
}
return parsedQueueUrl->Database;
- }
+ }
template<class TProtoService, class TProtoRequest, class TProtoResponse, class TProtoResult, class TProtoCall, class TRpcEv>
class TSqsTopicHttpRequestProcessor : public TBaseHttpRequestProcessor<TProtoService, TProtoRequest, TProtoResponse, TProtoResult, TProtoCall, TRpcEv>{
@@ -171,16 +172,25 @@ namespace NKikimr::NHttpProxy {
}
void ReplyWithYdbError(const TActorContext& ctx, NYdb::EStatus status, const TString& errorText, size_t issueCode = ISSUE_CODE_GENERIC) {
- HttpContext.ResponseData.Status = status;
- HttpContext.ResponseData.ErrorText = errorText;
+ const auto [errorName, httpCode] = MapToException(status, Method, issueCode);
+
ctx.Send(MakeMetricsServiceID(),
new TEvServerlessProxy::TEvCounter{
1, true, true,
AddCommonLabels({
- {"code", TStringBuilder() << (int)MapToException(status, Method, issueCode).second},
+ {"code", TStringBuilder() << (int)httpCode},
{"name", "api.sqs.response.count"},
})});
- ReplyToHttpContext(ctx, 0, issueCode);
+
+ ReplyToHttpContext({
+ .HttpCode = httpCode,
+ .ContentType = HttpContext.ContentType,
+ .Message = errorName,
+ .Body = NSQS::Serialize(HttpContext.ContentType, {
+ .StatusCode = errorName,
+ .ErrorText = errorText,
+ })
+ }, errorText.size(), errorText);
ctx.Send(AuthActor, new TEvents::TEvPoisonPill());
@@ -192,12 +202,7 @@ namespace NKikimr::NHttpProxy {
ui32 httpStatusCode,
const TString& ymqStatusCode,
const TString& errorText) {
- HttpContext.ResponseData.IsYmq = false;
- HttpContext.ResponseData.UseYmqStatusCode = true;
- HttpContext.ResponseData.Status = NYdb::EStatus::STATUS_UNDEFINED;
- HttpContext.ResponseData.YmqHttpCode = httpStatusCode;
- HttpContext.ResponseData.YmqStatusCode = ymqStatusCode;
- HttpContext.ResponseData.ErrorText = errorText;
+
ctx.Send(MakeMetricsServiceID(),
new TEvServerlessProxy::TEvCounter{
1, true, true,
@@ -205,32 +210,38 @@ namespace NKikimr::NHttpProxy {
{"code", ToString(httpStatusCode)},
{"name", "api.sqs.response.count"},
})});
- ReplyToHttpContext(ctx, errorText.size(), std::nullopt);
+
+ ReplyToHttpContext({
+ .HttpCode = httpStatusCode,
+ .ContentType = HttpContext.ContentType,
+ .Message = ymqStatusCode,
+ .Body = NSQS::Serialize(HttpContext.ContentType, {
+ .StatusCode = ymqStatusCode,
+ .ErrorText = errorText,
+ })
+ }, errorText.size(), errorText);
ctx.Send(AuthActor, new TEvents::TEvPoisonPill());
TBase::Die(ctx);
}
- void ReplyToHttpContext(const TActorContext& ctx, size_t messageSize, std::optional<size_t> issueCode) {
+ void ReplyToHttpContext(THttpResponseData&& data, size_t messageSize, TStringBuf errorText = "") {
+ const TActorContext& ctx = TlsActivationContext->AsActorContext();
+
ReportLatencyCounters(ctx);
- ReportResponseSizeCounters(TStringBuilder() << HttpContext.ResponseData.YmqHttpCode, messageSize, ctx);
- LogHttpRequestResponse(ctx);
+ ReportResponseSizeCounters(TStringBuilder() << data.HttpCode, messageSize, ctx);
+ LogHttpRequestResponse(ctx, data.HttpCode, errorText);
- if (issueCode.has_value()) {
- HttpContext.DoReply(ctx, issueCode.value());
- } else {
- HttpContext.DoReply(ctx);
- }
+ HttpContext.DoReply(std::move(data));
}
- void LogHttpRequestResponse(const TActorContext& ctx) {
- const int httpCode = HttpContext.ResponseData.UseYmqStatusCode ? HttpContext.ResponseData.YmqHttpCode : 200;
+ void LogHttpRequestResponse(const TActorContext& ctx, int httpCode, TStringBuf errorText) {
const bool isServerError = IsServerError(httpCode);
auto priority = isServerError ? NActors::NLog::PRI_WARN : NActors::NLog::PRI_INFO;
LOG_LOG_S_SAMPLED_BY(ctx, priority, NKikimrServices::SQS,
NSqsTopic::SampleIdFromRequestId(HttpContext.RequestId),
- "Request [" << HttpContext.RequestId << "] " << LogHttpRequestResponseCommonInfoString(HttpContext, StartTime, "SqsTopic", TopicPath, Method, UserSid_, httpCode, HttpContext.ResponseData.ErrorText));
+ "Request [" << HttpContext.RequestId << "] " << LogHttpRequestResponseCommonInfoString(HttpContext, StartTime, "SqsTopic", TopicPath, Method, UserSid_, httpCode, errorText));
}
void ReportInputCounters(const TActorContext& ctx) {
@@ -265,17 +276,23 @@ namespace NKikimr::NHttpProxy {
void HandleGrpcResponse(TEvServerlessProxy::TEvGrpcRequestResult::TPtr ev,
const TActorContext& ctx) {
if (ev->Get()->Status->IsSuccess()) {
- ProtoToJson(*ev->Get()->Message, HttpContext.ResponseData.Body,
- HttpContext.ContentType == MIME_CBOR);
FillOutputCustomMetrics<TProtoResult>(
- *(dynamic_cast<TProtoResult*>(ev->Get()->Message.Get())), HttpContext, ctx);
+ *(dynamic_cast<TProtoResult*>(ev->Get()->Message.Get())),
+ HttpContext,
+ ctx
+ );
ctx.Send(MakeMetricsServiceID(),
new TEvServerlessProxy::TEvCounter{
1, true, true,
AddCommonLabels({
{"code", "200"},
{"name", "api.sqs.response.count"}})});
- ReplyToHttpContext(ctx, ev->Get()->Message->ByteSizeLong(), std::nullopt);
+ ReplyToHttpContext({
+ .HttpCode = 200,
+ .ContentType = HttpContext.ContentType,
+ .Message = "",
+ .Body = NSQS::Serialize(HttpContext.ContentType, *ev->Get()->Message)
+ }, ev->Get()->Message->ByteSizeLong());
} else {
auto retryClass =
NYdb::NTopic::GetRetryErrorClass(ev->Get()->Status->GetStatus());
@@ -299,8 +316,8 @@ namespace NKikimr::NHttpProxy {
auto issues = ev->Get()->Status->GetIssues();
auto [error, errorCode] = issues.Empty()
? std::make_tuple(
- NSQS::NErrors::INTERNAL_FAILURE.ErrorCode,
- NSQS::NErrors::INTERNAL_FAILURE.HttpStatusCode)
+ NKikimr::NSQS::NErrors::INTERNAL_FAILURE.ErrorCode,
+ NKikimr::NSQS::NErrors::INTERNAL_FAILURE.HttpStatusCode)
: NKikimr::NSQS::TErrorClass::GetErrorAndCode(issues.begin()->GetCode());
LOG_SP_DEBUG_S(
@@ -313,7 +330,7 @@ namespace NKikimr::NHttpProxy {
ctx,
errorCode,
error,
- TString{!issues.Empty() ? issues.begin()->GetMessage() : NSQS::NErrors::INTERNAL_FAILURE.ErrorCode}
+ TString{!issues.Empty() ? issues.begin()->GetMessage() : NKikimr::NSQS::NErrors::INTERNAL_FAILURE.ErrorCode}
);
}
}
@@ -343,7 +360,7 @@ namespace NKikimr::NHttpProxy {
void Bootstrap(const TActorContext& ctx) {
StartTime = ctx.Now();
try {
- HttpContext.RequestBodyToProto(&Request);
+ NSQS::Deserialize<TProtoRequest>(HttpContext.ContentType, Request, HttpContext.Request->Body);
} catch (const NKikimr::NSQS::TSQSException& e) {
NYds::EErrorCodes issueCode = NYds::EErrorCodes::OK;
if (e.ErrorClass.ErrorCode == "MissingParameter")
@@ -388,9 +405,9 @@ namespace NKikimr::NHttpProxy {
if (AppData(ctx)->EnforceUserTokenRequirement || AppData(ctx)->PQConfig.GetRequireCredentialsInNewProtocol()) {
return ReplyWithMessageQueueError(
ctx,
- NSQS::NErrors::INCOMPLETE_SIGNATURE.HttpStatusCode,
- NSQS::NErrors::INCOMPLETE_SIGNATURE.ErrorCode,
- NSQS::NErrors::INCOMPLETE_SIGNATURE.DefaultMessage);
+ NKikimr::NSQS::NErrors::INCOMPLETE_SIGNATURE.HttpStatusCode,
+ NKikimr::NSQS::NErrors::INCOMPLETE_SIGNATURE.ErrorCode,
+ NKikimr::NSQS::NErrors::INCOMPLETE_SIGNATURE.DefaultMessage);
}
SendGrpcRequestNoDriver(ctx);
}
@@ -422,31 +439,33 @@ namespace NKikimr::NHttpProxy {
};
};
- class TController : public IHttpController {
+ class TController : public TBaseHttpController {
public:
TController() {
- #define DECLARE_SQS_TOPIC_PROCESSOR_QUEUE_UNKNOWN(name) Name2Processor[#name] = MakeHolder<TSqsTopicHttpRequestProcessor< \
- Ydb::SqsTopic::V1::SqsTopicService, \
- Ydb::Ymq::V1::name##Request, \
- Ydb::Ymq::V1::name##Response, \
- Ydb::Ymq::V1::name##Result, \
- decltype(&Ydb::SqsTopic::V1::SqsTopicService::Stub::AsyncSqsTopic##name), \
- NKikimr::NGRpcService::TEvSqsTopic##name##Request>> \
- (#name, &Ydb::SqsTopic::V1::SqsTopicService::Stub::AsyncSqsTopic##name)
+ #define DECLARE_SQS_TOPIC_PROCESSOR_QUEUE_UNKNOWN(name) Name2Processor[#name] = \
+ std::make_unique<TSqsTopicHttpRequestProcessor< \
+ Ydb::SqsTopic::V1::SqsTopicService, \
+ Ydb::Ymq::V1::name##Request, \
+ Ydb::Ymq::V1::name##Response, \
+ Ydb::Ymq::V1::name##Result, \
+ decltype(&Ydb::SqsTopic::V1::SqsTopicService::Stub::AsyncSqsTopic##name), \
+ NKikimr::NGRpcService::TEvSqsTopic##name##Request \
+ >>(#name, &Ydb::SqsTopic::V1::SqsTopicService::Stub::AsyncSqsTopic##name) \
DECLARE_SQS_TOPIC_PROCESSOR_QUEUE_UNKNOWN(GetQueueUrl);
DECLARE_SQS_TOPIC_PROCESSOR_QUEUE_UNKNOWN(ListQueues);
#undef DECLARE_SQS_TOPIC_PROCESSOR_QUEUE_UNKNOWN
- #define DECLARE_SQS_TOPIC_PROCESSOR_QUEUE_KNOWN(name) Name2Processor[#name] = MakeHolder<TSqsTopicHttpRequestProcessor< \
- Ydb::SqsTopic::V1::SqsTopicService, \
- Ydb::Ymq::V1::name##Request, \
- Ydb::Ymq::V1::name##Response, \
- Ydb::Ymq::V1::name##Result, \
- decltype(&Ydb::SqsTopic::V1::SqsTopicService::Stub::AsyncSqsTopic##name), \
- NKikimr::NGRpcService::TEvSqsTopic##name##Request>> \
- (#name, &Ydb::SqsTopic::V1::SqsTopicService::Stub::AsyncSqsTopic##name)
+ #define DECLARE_SQS_TOPIC_PROCESSOR_QUEUE_KNOWN(name) Name2Processor[#name] = \
+ std::make_unique<TSqsTopicHttpRequestProcessor< \
+ Ydb::SqsTopic::V1::SqsTopicService, \
+ Ydb::Ymq::V1::name##Request, \
+ Ydb::Ymq::V1::name##Response, \
+ Ydb::Ymq::V1::name##Result, \
+ decltype(&Ydb::SqsTopic::V1::SqsTopicService::Stub::AsyncSqsTopic##name), \
+ NKikimr::NGRpcService::TEvSqsTopic##name##Request \
+ >>(#name, &Ydb::SqsTopic::V1::SqsTopicService::Stub::AsyncSqsTopic##name) \
DECLARE_SQS_TOPIC_PROCESSOR_QUEUE_KNOWN(CreateQueue);
DECLARE_SQS_TOPIC_PROCESSOR_QUEUE_KNOWN(DeleteMessage);
@@ -463,33 +482,36 @@ namespace NKikimr::NHttpProxy {
#undef DECLARE_SQS_TOPIC_PROCESSOR_QUEUE_KNOWN
}
-
- std::expected<IHttpRequestProcessor*, IHttpController::EError> GetProcessor(
- const TString& name,
- const THttpRequestContext& context
- ) const override {
- if (context.ApiVersion != "AmazonSQS") {
- return std::unexpected(IHttpController::EError::NotMyProtocol);
- }
- if (auto proc = Name2Processor.find(name); proc != Name2Processor.end()) {
- return std::expected<IHttpRequestProcessor*, IHttpController::EError>(proc->second.Get());
- }
+ THttpResponseData MakeError(MimeTypes contentType, NYdb::EStatus Status, const TStringBuf message, size_t issueCode) const override {
+ const auto [errorName, httpCode] = MapToException(Status, "", issueCode);
+ return {
+ .HttpCode = httpCode,
+ .ContentType = contentType,
+ .Message = errorName,
+ .Body = NSQS::Serialize(contentType, NSQS::TErrorResponse{
+ .StatusCode = errorName,
+ .ErrorText = TString(message),
+ })
+ };
+ }
+
+ bool IsEnabled(const NKikimrConfig::THttpProxyConfig& config) const override {
+ return config.GetSqsTopicEnabled();
+ }
- return std::unexpected(IHttpController::EError::MethodNotFound);
+ bool IsPossible(const TStringBuf apiVersion, const NKikimrConfig::TServerlessProxyConfig&) const override {
+ return apiVersion == "AmazonSQS";
}
-
- private:
- THashMap<TString, THolder<IHttpRequestProcessor>> Name2Processor;
};
+ TController ControllerInstance;
+
} // namespace
- std::shared_ptr<const IHttpController> CreateSqsHttpController(const NKikimrConfig::TServerlessProxyConfig& config) {
- if (config.GetHttpConfig().GetSqsTopicEnabled()) {
- return std::make_shared<TController>();
- }
- return {};
+ const IHttpController* GetSqsHttpController() {
+ return &ControllerInstance;
}
} // namespace NKikimr::NHttpProxy
+
diff --git a/ydb/core/http_proxy/sqs.h b/ydb/core/http_proxy/sqs.h
index 0d681501bd3..f777e7a0e6b 100644
--- a/ydb/core/http_proxy/sqs.h
+++ b/ydb/core/http_proxy/sqs.h
@@ -4,6 +4,6 @@
namespace NKikimr::NHttpProxy {
-std::shared_ptr<const IHttpController> CreateSqsHttpController(const NKikimrConfig::TServerlessProxyConfig& config);
+const IHttpController* GetSqsHttpController();
} // namespace NKikimr::NHttpProxy
diff --git a/ydb/core/http_proxy/sqs_serialization.cpp b/ydb/core/http_proxy/sqs_serialization.cpp
new file mode 100644
index 00000000000..8086d19c9b1
--- /dev/null
+++ b/ydb/core/http_proxy/sqs_serialization.cpp
@@ -0,0 +1,36 @@
+#include "sqs_serialization.h"
+
+namespace NKikimr::NHttpProxy::NSQS {
+
+ TString Serialize(const MimeTypes mimeType, const NProtoBuf::Message& value) {
+ switch (mimeType) {
+ case MIME_XML:
+ // TODO implement
+ Y_ENSURE(false);
+ case MIME_CBOR:
+ return SerializeCbor(value);
+ case MIME_JSON:
+ [[fallthrough]];
+ default:
+ return SerializeJson(value);
+ }
+ }
+
+
+ TString Serialize(const MimeTypes mimeType, TErrorResponse&& value) {
+ NJson::TJsonValue json;
+ json.SetType(NJson::JSON_MAP);
+ json["message"] = value.ErrorText;
+ json["__type"] = value.StatusCode;
+
+ switch (mimeType) {
+ case MIME_CBOR:
+ return SerializeCbor(json);
+ case MIME_JSON:
+ [[fallthrough]];
+ default:
+ return SerializeJson(json);
+ }
+ }
+
+} // namespace NKikimr::NHttpProxy::NSQS
diff --git a/ydb/core/http_proxy/sqs_serialization.h b/ydb/core/http_proxy/sqs_serialization.h
new file mode 100644
index 00000000000..75594f43126
--- /dev/null
+++ b/ydb/core/http_proxy/sqs_serialization.h
@@ -0,0 +1,45 @@
+#pragma once
+
+#include "serialization.h"
+
+#include <ydb/library/http_proxy/error/error.h>
+
+namespace NKikimr::NHttpProxy::NSQS {
+
+ template<typename TValue>
+ void PrepareValue(TValue& value) {
+ Y_UNUSED(value);
+ }
+
+ template<typename TValue>
+ void Deserialize(const MimeTypes mimeType, TValue& value, const TStringBuf& input)
+ requires std::is_base_of_v<NProtoBuf::Message, TValue> {
+
+ if (input.empty()) {
+ throw NKikimr::NSQS::TSQSException(NKikimr::NSQS::NErrors::MALFORMED_QUERY_STRING) << "Empty body";
+ }
+
+ switch (mimeType) {
+ case MIME_CBOR:
+ PrepareValue(value);
+ DeserializeCbor(value, input);
+ break;
+ case MIME_JSON:
+ PrepareValue(value);
+ DeserializeJson(value, input);
+ break;
+ default:
+ throw NKikimr::NSQS::TSQSException(NKikimr::NSQS::NErrors::MALFORMED_QUERY_STRING) <<
+ "Unknown ContentType";
+ }
+ };
+
+ TString Serialize(const MimeTypes mimeType, const NProtoBuf::Message& value);
+
+ struct TErrorResponse {
+ TString StatusCode;
+ TString ErrorText;
+ };
+ TString Serialize(const MimeTypes mimeType, TErrorResponse&& value);
+
+} // namespace NKikimr::NHttpProxy::NSQS
diff --git a/ydb/core/http_proxy/ut/datastreams_fixture/datastreams_fixture.cpp b/ydb/core/http_proxy/ut/datastreams_fixture/datastreams_fixture.cpp
index 337fbcb4ee3..0905db1a478 100644
--- a/ydb/core/http_proxy/ut/datastreams_fixture/datastreams_fixture.cpp
+++ b/ydb/core/http_proxy/ut/datastreams_fixture/datastreams_fixture.cpp
@@ -807,7 +807,7 @@ void THttpProxyTestMock::InitHttpServer(bool yandexCloudMode, bool enableSqsTopi
config.SetTestMode(true);
config.MutableHttpConfig()->SetPort(HttpServicePort);
config.MutableHttpConfig()->SetYandexCloudMode(yandexCloudMode);
- config.MutableHttpConfig()->SetYmqEnabled(true);
+ config.MutableHttpConfig()->SetYmqEnabled(!enableSqsTopic);
config.MutableHttpConfig()->SetSqsTopicEnabled(enableSqsTopic);
SqsTopicMode = enableSqsTopic;
diff --git a/ydb/core/http_proxy/ut/kinesis_ut.cpp b/ydb/core/http_proxy/ut/kinesis_ut.cpp
index a31f221a8a9..8471f456b18 100644
--- a/ydb/core/http_proxy/ut/kinesis_ut.cpp
+++ b/ydb/core/http_proxy/ut/kinesis_ut.cpp
@@ -190,22 +190,22 @@ Y_UNIT_TEST_SUITE(TestKinesisHttpProxy) {
request = CreateDescribeStreamRequest();
request["StreamName"] = "teststream";
res = SendHttpRequest("/Root", ".", request, FormAuthorizationStr("ru-central-1"));
- UNIT_ASSERT_VALUES_EQUAL(res.HttpCode, 400);
- UNIT_ASSERT_VALUES_EQUAL(res.Description, "MissingAction");
+ UNIT_ASSERT_VALUES_EQUAL(res.HttpCode, 404);
+ UNIT_ASSERT_VALUES_EQUAL(res.Description, "Not Found");
}
{
request = CreateDescribeStreamRequest();
request["StreamName"] = "teststream";
res = SendHttpRequest("/Root", "", request, FormAuthorizationStr("ru-central-1"));
- UNIT_ASSERT_VALUES_EQUAL(res.HttpCode, 400);
- UNIT_ASSERT_VALUES_EQUAL(res.Description, "MissingAction");
+ UNIT_ASSERT_VALUES_EQUAL(res.HttpCode, 404);
+ UNIT_ASSERT_VALUES_EQUAL(res.Description, "Not Found");
}
{
request = CreateDescribeStreamRequest();
request["StreamName"] = "teststream";
res = SendHttpRequest("/Root", ".DescribeStream", request, FormAuthorizationStr("ru-central-1"));
- UNIT_ASSERT_VALUES_EQUAL(res.HttpCode, 400);
- UNIT_ASSERT_VALUES_EQUAL(res.Description, "MissingAction");
+ UNIT_ASSERT_VALUES_EQUAL(res.HttpCode, 404);
+ UNIT_ASSERT_VALUES_EQUAL(res.Description, "Not Found");
}
}
diff --git a/ydb/core/http_proxy/utils.cpp b/ydb/core/http_proxy/utils.cpp
index 90496640dfc..2519b0556de 100644
--- a/ydb/core/http_proxy/utils.cpp
+++ b/ydb/core/http_proxy/utils.cpp
@@ -1,6 +1,7 @@
-#include "exceptions_mapping.h"
#include "utils.h"
+#include "http_req.h"
+
namespace NKikimr::NHttpProxy {
TException MapToException(NYdb::EStatus status, const TString& method, size_t issueCode) {
diff --git a/ydb/core/http_proxy/utils.h b/ydb/core/http_proxy/utils.h
index 119d82d75d4..0ba7b49ba10 100644
--- a/ydb/core/http_proxy/utils.h
+++ b/ydb/core/http_proxy/utils.h
@@ -1,13 +1,17 @@
#pragma once
#include "exceptions_mapping.h"
-#include "http_req.h"
+
+#include <ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/types/status_codes.h>
#include <util/datetime/base.h>
namespace NKikimr::NHttpProxy {
-TException MapToException(NYdb::EStatus status, const TString& method, size_t issueCode = ISSUE_CODE_ERROR);
+struct THttpRequestContext;
+
+TException MapToException(NYdb::EStatus status, const TString& method, size_t issueCode);
+
TString LogHttpRequestResponseCommonInfoString(const THttpRequestContext& httpContext, TInstant startTime, TStringBuf api, TStringBuf topicPath, TStringBuf method, TStringBuf userSid, int httpCode, TStringBuf httpResponseMessage);
} // namespace NKikimr::NHttpProxy
diff --git a/ydb/core/http_proxy/ya.make b/ydb/core/http_proxy/ya.make
index 6b07d755c78..2ac9615485e 100644
--- a/ydb/core/http_proxy/ya.make
+++ b/ydb/core/http_proxy/ya.make
@@ -9,8 +9,14 @@ SRCS(
auth_factory.h
auth_actors.cpp
auth_actors.h
+ controller_base.cpp
+ controller_base.h
+ controller_registry.cpp
custom_metrics.h
datastreams.cpp
+ datastreams.h
+ datastreams_serialization.cpp
+ datastreams_serialization.h
discovery_actor.cpp
discovery_actor.h
events.h
@@ -26,9 +32,16 @@ SRCS(
json_proto_conversion.cpp
metrics_actor.cpp
metrics_actor.h
+ serialization.cpp
+ serialization.h
sqs.cpp
+ sqs.h
+ sqs_serialization.cpp
+ sqs_serialization.h
utils.cpp
+ utils.h
ymq.cpp
+ ymq.h
)
PEERDIR(
diff --git a/ydb/core/http_proxy/ymq.cpp b/ydb/core/http_proxy/ymq.cpp
index f57753f998f..63c774365c2 100644
--- a/ydb/core/http_proxy/ymq.cpp
+++ b/ydb/core/http_proxy/ymq.cpp
@@ -1,5 +1,9 @@
+#include "ymq.h"
+
+#include "controller_base.h"
#include "http_req.h"
-#include "json_proto_conversion.h"
+#include "sqs_serialization.h"
+#include "utils.h"
#include <ydb/core/grpc_services/local_rpc/local_rpc.h>
#include <ydb/core/ymq/actor/auth_multi_factory.h>
@@ -18,6 +22,9 @@
#include <yql/essentials/public/issue/yql_issue_message.h>
+#include <expected>
+#include <functional>
+
namespace NKikimr::NHttpProxy {
namespace {
@@ -144,10 +151,19 @@ namespace NKikimr::NHttpProxy {
NYdb::EStatus status,
const TString& errorText,
size_t issueCode = ISSUE_CODE_GENERIC) {
- HttpContext.ResponseData.Status = status;
- HttpContext.ResponseData.ErrorText = errorText;
- ReplyToHttpContext(ctx, issueCode);
+ const auto [errorName, httpCode] = MapToException(status, Method, issueCode);
+
+ ReplyToHttpContext({
+ .HttpCode = httpCode,
+ .ContentType = HttpContext.ContentType,
+ .Message = errorName,
+ .Body = NSQS::Serialize(HttpContext.ContentType, {
+ .StatusCode = errorName,
+ .ErrorText = errorText,
+ })
+ }, {});
+
ctx.Send(AuthActor, new TEvents::TEvPoisonPill());
@@ -159,28 +175,69 @@ namespace NKikimr::NHttpProxy {
ui32 httpStatusCode,
const TString& ymqStatusCode,
const TString& errorText) {
- HttpContext.ResponseData.IsYmq = true;
- HttpContext.ResponseData.UseYmqStatusCode = true;
- HttpContext.ResponseData.Status = NYdb::EStatus::STATUS_UNDEFINED;
- HttpContext.ResponseData.YmqHttpCode = httpStatusCode;
- HttpContext.ResponseData.YmqStatusCode = ymqStatusCode;
- HttpContext.ResponseData.ErrorText = errorText;
- ReplyToHttpContext(ctx);
+ ReplyToHttpContext({
+ .HttpCode = httpStatusCode,
+ .ContentType = HttpContext.ContentType,
+ .Message = ymqStatusCode,
+ .Body = NSQS::Serialize(HttpContext.ContentType, {
+ .StatusCode = ymqStatusCode,
+ .ErrorText = errorText,
+ })
+ }, {});
ctx.Send(AuthActor, new TEvents::TEvPoisonPill());
TBase::Die(ctx);
}
- void ReplyToHttpContext(const TActorContext& ctx, std::optional<size_t> issueCode = std::nullopt) {
- if (issueCode.has_value()) {
- HttpContext.DoReply(ctx, issueCode.value());
- } else {
- HttpContext.DoReply(ctx);
+ void DoMetering(const THttpResponseData& data, THolder<THashMap<TString, TString>>&& queueTags, const TActorContext& ctx) {
+ if (HttpContext.ServiceConfig.GetHttpConfig().GetYandexCloudMode()) {
+ // Send request attributes to the metering actor
+ auto reportRequestAttributes = MakeHolder<::NKikimr::NSQS::TSqsEvents::TEvReportProcessedRequestAttributes>();
+
+ auto& requestAttributes = reportRequestAttributes->Data;
+
+ requestAttributes.HttpStatusCode = data.HttpCode;
+ requestAttributes.IsFifo = IsFifo;
+ requestAttributes.FolderId = FolderId;
+ requestAttributes.RequestSizeInBytes = HttpContext.Request->Size();
+ requestAttributes.ResponseSizeInBytes = data.Body.size();
+ requestAttributes.SourceAddress = HttpContext.SourceAddress;
+ requestAttributes.ResourceId = ResourceId;
+ requestAttributes.Action = ::NKikimr::NSQS::ActionFromString(HttpContext.MethodName);
+ if (queueTags) {
+ for (auto&& [k, v] : *queueTags) {
+ requestAttributes.QueueTags[std::move(k)] = std::move(v);
+ }
+ }
+
+ LOG_SP_DEBUG_S(
+ ctx,
+ NKikimrServices::HTTP_PROXY,
+ TStringBuilder() << "Send metering event."
+ << " HttpStatusCode: " << requestAttributes.HttpStatusCode
+ << " IsFifo: " << requestAttributes.IsFifo
+ << " FolderId: " << requestAttributes.FolderId
+ << " RequestSizeInBytes: " << requestAttributes.RequestSizeInBytes
+ << " ResponseSizeInBytes: " << requestAttributes.ResponseSizeInBytes
+ << " SourceAddress: " << requestAttributes.SourceAddress
+ << " ResourceId: " << requestAttributes.ResourceId
+ << " Action: " << requestAttributes.Action
+ );
+
+ ctx.Send(::NKikimr::NSQS::MakeSqsMeteringServiceID(), reportRequestAttributes.Release());
}
}
+ void ReplyToHttpContext(THttpResponseData&& data, THolder<THashMap<TString, TString>>&& queueTags) {
+ const TActorContext& ctx = TlsActivationContext->AsActorContext();
+
+ DoMetering(data, std::move(queueTags), ctx);
+
+ HttpContext.DoReply(std::move(data));
+ }
+
void HandleGrpcResponse(TEvServerlessProxy::TEvGrpcRequestResult::TPtr ev,
const TActorContext& ctx) {
if (ev->Get()->Status->IsSuccess()) {
@@ -189,18 +246,15 @@ namespace NKikimr::NHttpProxy {
NKikimrServices::HTTP_PROXY,
"Got succesfult GRPC response."
);
- ProtoToJson(
- *ev->Get()->Message,
- HttpContext.ResponseData.Body,
- HttpContext.ContentType == MIME_CBOR
+
+ ReplyToHttpContext({
+ .HttpCode = 200,
+ .ContentType = HttpContext.ContentType,
+ .Message = "",
+ .Body = NSQS::Serialize(HttpContext.ContentType, *ev->Get()->Message)
+ },
+ std::move(ev->Get()->QueueTags)
);
- HttpContext.ResponseData.IsYmq = true;
- HttpContext.ResponseData.UseYmqStatusCode = true;
- HttpContext.ResponseData.YmqHttpCode = 200;
- if (ev->Get()->QueueTags) {
- HttpContext.ResponseData.QueueTags = std::move(*ev->Get()->QueueTags);
- }
- ReplyToHttpContext(ctx);
} else {
auto retryClass = NYdb::NTopic::GetRetryErrorClass(ev->Get()->Status->GetStatus());
@@ -227,8 +281,8 @@ namespace NKikimr::NHttpProxy {
auto issues = ev->Get()->Status->GetIssues();
auto errorAndCode = issues.Empty()
? std::make_tuple(
- NSQS::NErrors::INTERNAL_FAILURE.ErrorCode,
- NSQS::NErrors::INTERNAL_FAILURE.HttpStatusCode)
+ NKikimr::NSQS::NErrors::INTERNAL_FAILURE.ErrorCode,
+ NKikimr::NSQS::NErrors::INTERNAL_FAILURE.HttpStatusCode)
: NKikimr::NSQS::TErrorClass::GetErrorAndCode(issues.begin()->GetCode());
LOG_SP_DEBUG_S(
@@ -292,7 +346,7 @@ namespace NKikimr::NHttpProxy {
PoolId = ctx.SelfID.PoolID();
StartTime = ctx.Now();
try {
- HttpContext.RequestBodyToProto(&Request);
+ NSQS::Deserialize<TProtoRequest>(HttpContext.ContentType, Request, HttpContext.Request->Body);
auto queueUrl = QueueUrlExtractor(Request);
if (!queueUrl.empty()) {
auto cloudIdAndResourceId = NKikimr::NYmq::CloudIdAndResourceIdFromQueueUrl(queueUrl);
@@ -301,7 +355,7 @@ namespace NKikimr::NHttpProxy {
}
CloudId = cloudIdAndResourceId.first;
HttpContext.ResourceId = ResourceId = cloudIdAndResourceId.second;
- HttpContext.ResponseData.YmqIsFifo = AsciiHasSuffixIgnoreCase(queueUrl, ".fifo");
+ IsFifo = AsciiHasSuffixIgnoreCase(queueUrl, ".fifo");
}
} catch (const NKikimr::NSQS::TSQSException& e) {
NYds::EErrorCodes issueCode = NYds::EErrorCodes::OK;
@@ -337,10 +391,10 @@ namespace NKikimr::NHttpProxy {
} else {
auto requestHolder = MakeHolder<NKikimrClient::TSqsRequest>();
- NSQS::EAction action = NSQS::ActionFromString(Method);
+ NKikimr::NSQS::EAction action = NKikimr::NSQS::ActionFromString(Method);
requestHolder->SetRequestId(HttpContext.RequestId);
- NSQS::TAuthActorData data {
+ NKikimr::NSQS::TAuthActorData data {
.SQSRequest = std::move(requestHolder),
.UserSidCallback = [](const TString& userSid) { Y_UNUSED(userSid); },
.EnableQueueLeader = true,
@@ -352,7 +406,7 @@ namespace NKikimr::NHttpProxy {
.AWSSignature = std::move(HttpContext.GetSignature()),
.IAMToken = HttpContext.IamToken,
.FolderID = HttpContext.FolderId,
- .RequestFormat = NSQS::TAuthActorData::Json,
+ .RequestFormat = NKikimr::NSQS::TAuthActorData::Json,
.Requester = ctx.SelfID
};
@@ -386,22 +440,25 @@ namespace NKikimr::NHttpProxy {
TString CloudId;
TString ResourceId;
TString UserSid;
+ bool IsFifo = false;
};
std::function<TString(TProtoRequest&)> QueueUrlExtractor;
};
- class TController : public IHttpController {
+ class TController: public TBaseHttpController {
public:
TController() {
- #define DECLARE_YMQ_PROCESSOR_QUEUE_UNKNOWN(name) Name2Processor[#name] = MakeHolder<TYmqHttpRequestProcessor< \
- Ydb::Ymq::V1::YmqService, \
- Ydb::Ymq::V1::name##Request, \
- Ydb::Ymq::V1::name##Response, \
- Ydb::Ymq::V1::name##Result, \
- decltype(&Ydb::Ymq::V1::YmqService::Stub::AsyncYmq##name), \
- NKikimr::NGRpcService::TEvYmq##name##Request>> \
- (#name, &Ydb::Ymq::V1::YmqService::Stub::AsyncYmq##name, [](Ydb::Ymq::V1::name##Request&){return "";});
+
+ #define DECLARE_YMQ_PROCESSOR_QUEUE_UNKNOWN(name) Name2Processor[#name] = \
+ std::make_unique<TYmqHttpRequestProcessor< \
+ Ydb::Ymq::V1::YmqService, \
+ Ydb::Ymq::V1::name##Request, \
+ Ydb::Ymq::V1::name##Response, \
+ Ydb::Ymq::V1::name##Result, \
+ decltype(&Ydb::Ymq::V1::YmqService::Stub::AsyncYmq##name), \
+ NKikimr::NGRpcService::TEvYmq##name##Request \
+ >>(#name, &Ydb::Ymq::V1::YmqService::Stub::AsyncYmq##name, [](Ydb::Ymq::V1::name##Request&){return "";})
DECLARE_YMQ_PROCESSOR_QUEUE_UNKNOWN(GetQueueUrl);
DECLARE_YMQ_PROCESSOR_QUEUE_UNKNOWN(CreateQueue);
@@ -409,14 +466,18 @@ namespace NKikimr::NHttpProxy {
#undef DECLARE_YMQ_PROCESSOR_QUEUE_UNKNOWN
- #define DECLARE_YMQ_PROCESSOR_QUEUE_KNOWN(name) Name2Processor[#name] = MakeHolder<TYmqHttpRequestProcessor< \
- Ydb::Ymq::V1::YmqService, \
- Ydb::Ymq::V1::name##Request, \
- Ydb::Ymq::V1::name##Response, \
- Ydb::Ymq::V1::name##Result,\
- decltype(&Ydb::Ymq::V1::YmqService::Stub::AsyncYmq##name), \
- NKikimr::NGRpcService::TEvYmq##name##Request>> \
- (#name, &Ydb::Ymq::V1::YmqService::Stub::AsyncYmq##name, [](Ydb::Ymq::V1::name##Request& request){return request.Getqueue_url();});
+ #define DECLARE_YMQ_PROCESSOR_QUEUE_KNOWN(name) Name2Processor[#name] = \
+ std::make_unique<TYmqHttpRequestProcessor< \
+ Ydb::Ymq::V1::YmqService, \
+ Ydb::Ymq::V1::name##Request, \
+ Ydb::Ymq::V1::name##Response, \
+ Ydb::Ymq::V1::name##Result, \
+ decltype(&Ydb::Ymq::V1::YmqService::Stub::AsyncYmq##name), \
+ NKikimr::NGRpcService::TEvYmq##name##Request \
+ >>(#name, &Ydb::Ymq::V1::YmqService::Stub::AsyncYmq##name, [](Ydb::Ymq::V1::name##Request& request){ \
+ return request.Getqueue_url(); \
+ })
+
DECLARE_YMQ_PROCESSOR_QUEUE_KNOWN(SendMessage);
DECLARE_YMQ_PROCESSOR_QUEUE_KNOWN(ReceiveMessage);
DECLARE_YMQ_PROCESSOR_QUEUE_KNOWN(GetQueueAttributes);
@@ -434,34 +495,38 @@ namespace NKikimr::NHttpProxy {
DECLARE_YMQ_PROCESSOR_QUEUE_KNOWN(UntagQueue);
#undef DECLARE_YMQ_PROCESSOR_QUEUE_KNOWN
+
}
- std::expected<IHttpRequestProcessor*, IHttpController::EError> GetProcessor(
- const TString& name,
- const THttpRequestContext& context
- ) const override {
- if (context.ApiVersion != "AmazonSQS") {
- return std::unexpected(IHttpController::EError::NotMyProtocol);
- }
+ THttpResponseData MakeError(MimeTypes contentType, NYdb::EStatus Status, const TStringBuf message, size_t issueCode) const override {
+ const auto [errorName, httpCode] = MapToException(Status, "", issueCode);
+ return {
+ .HttpCode = httpCode,
+ .ContentType = contentType,
+ .Message = errorName,
+ .Body = NSQS::Serialize(contentType, NSQS::TErrorResponse{
+ .StatusCode = errorName,
+ .ErrorText = TString(message),
+ })
+ };
+ }
- if (auto proc = Name2Processor.find(name); proc != Name2Processor.end()) {
- return std::expected<IHttpRequestProcessor*, IHttpController::EError>(proc->second.Get());
- }
+ bool IsEnabled(const NKikimrConfig::THttpProxyConfig& config) const override {
+ return config.GetYmqEnabled();
+ }
- return std::unexpected(IHttpController::EError::MethodNotFound);
+ bool IsPossible(const TStringBuf apiVersion, const NKikimrConfig::TServerlessProxyConfig&) const override {
+ return apiVersion == "AmazonSQS";
}
+ };
- private:
- THashMap<TString, THolder<IHttpRequestProcessor>> Name2Processor;
- };
+ TController ControllerInstance;
} // namespace
- std::shared_ptr<const IHttpController> CreateYmqHttpController(const NKikimrConfig::TServerlessProxyConfig& config) {
- if (config.GetHttpConfig().GetYmqEnabled()) {
- return std::make_shared<TController>();
- }
- return {};
+ const IHttpController* GetYmqHttpController() {
+ return &ControllerInstance;
}
} // namespace NKikimr::NHttpProxy
+
diff --git a/ydb/core/http_proxy/ymq.h b/ydb/core/http_proxy/ymq.h
index 67da4e08466..5eab412fd37 100644
--- a/ydb/core/http_proxy/ymq.h
+++ b/ydb/core/http_proxy/ymq.h
@@ -4,6 +4,6 @@
namespace NKikimr::NHttpProxy {
-std::shared_ptr<const IHttpController> CreateYmqHttpController(const NKikimrConfig::TServerlessProxyConfig& config);
+const IHttpController* GetYmqHttpController();
} // namespace NKikimr::NHttpProxy
diff --git a/ydb/core/kqp/ut/discovery/kqp_discovery_ut.cpp b/ydb/core/kqp/ut/discovery/kqp_discovery_ut.cpp
index 60fadd1c2b7..6818a8cfc69 100644
--- a/ydb/core/kqp/ut/discovery/kqp_discovery_ut.cpp
+++ b/ydb/core/kqp/ut/discovery/kqp_discovery_ut.cpp
@@ -1,6 +1,7 @@
#include <ydb/core/discovery/discovery.h>
#include <ydb/core/kqp/ut/common/kqp_ut_common.h>
+#include <ydb/public/api/grpc/ydb_discovery_v1.grpc.pb.h>
#include <library/cpp/testing/unittest/registar.h>