diff options
author | lpetrov02 <lpetrov02@yandex-team.com> | 2023-02-13 18:09:09 +0300 |
---|---|---|
committer | lpetrov02 <lpetrov02@yandex-team.com> | 2023-02-13 18:09:09 +0300 |
commit | cbe7a80bc6a10c43817fb9d3c5e8dca885ca253e (patch) | |
tree | 6fe555b338e2b7ca28780cee0d89ab62785b96da | |
parent | b5b3d520c01646610f376c55455d9cad735e3e0d (diff) | |
download | ydb-cbe7a80bc6a10c43817fb9d3c5e8dca885ca253e.tar.gz |
Introduce tests on error codes
Saves minor changes to pull from trunk
Saves changes
Adds alternative (to think) variants of http codes for ydb statuses
22 files changed, 743 insertions, 240 deletions
diff --git a/ydb/core/http_proxy/CMakeLists.darwin.txt b/ydb/core/http_proxy/CMakeLists.darwin.txt index d76dd4343b6..8a70b7e5ed3 100644 --- a/ydb/core/http_proxy/CMakeLists.darwin.txt +++ b/ydb/core/http_proxy/CMakeLists.darwin.txt @@ -33,10 +33,12 @@ target_link_libraries(ydb-core-http_proxy PUBLIC cpp-client-iam_private ydb-services-datastreams services-persqueue_v1-actors + api-protos ) target_sources(ydb-core-http_proxy PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/http_proxy/auth_factory.cpp ${CMAKE_SOURCE_DIR}/ydb/core/http_proxy/discovery_actor.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/http_proxy/exceptions_mapping.cpp ${CMAKE_SOURCE_DIR}/ydb/core/http_proxy/grpc_service.cpp ${CMAKE_SOURCE_DIR}/ydb/core/http_proxy/http_req.cpp ${CMAKE_SOURCE_DIR}/ydb/core/http_proxy/http_service.cpp diff --git a/ydb/core/http_proxy/CMakeLists.linux-aarch64.txt b/ydb/core/http_proxy/CMakeLists.linux-aarch64.txt index 35e6158b498..d70963e423e 100644 --- a/ydb/core/http_proxy/CMakeLists.linux-aarch64.txt +++ b/ydb/core/http_proxy/CMakeLists.linux-aarch64.txt @@ -34,10 +34,12 @@ target_link_libraries(ydb-core-http_proxy PUBLIC cpp-client-iam_private ydb-services-datastreams services-persqueue_v1-actors + api-protos ) target_sources(ydb-core-http_proxy PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/http_proxy/auth_factory.cpp ${CMAKE_SOURCE_DIR}/ydb/core/http_proxy/discovery_actor.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/http_proxy/exceptions_mapping.cpp ${CMAKE_SOURCE_DIR}/ydb/core/http_proxy/grpc_service.cpp ${CMAKE_SOURCE_DIR}/ydb/core/http_proxy/http_req.cpp ${CMAKE_SOURCE_DIR}/ydb/core/http_proxy/http_service.cpp diff --git a/ydb/core/http_proxy/CMakeLists.linux.txt b/ydb/core/http_proxy/CMakeLists.linux.txt index 35e6158b498..d70963e423e 100644 --- a/ydb/core/http_proxy/CMakeLists.linux.txt +++ b/ydb/core/http_proxy/CMakeLists.linux.txt @@ -34,10 +34,12 @@ target_link_libraries(ydb-core-http_proxy PUBLIC cpp-client-iam_private ydb-services-datastreams services-persqueue_v1-actors + api-protos ) target_sources(ydb-core-http_proxy PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/http_proxy/auth_factory.cpp ${CMAKE_SOURCE_DIR}/ydb/core/http_proxy/discovery_actor.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/http_proxy/exceptions_mapping.cpp ${CMAKE_SOURCE_DIR}/ydb/core/http_proxy/grpc_service.cpp ${CMAKE_SOURCE_DIR}/ydb/core/http_proxy/http_req.cpp ${CMAKE_SOURCE_DIR}/ydb/core/http_proxy/http_service.cpp diff --git a/ydb/core/http_proxy/events.h b/ydb/core/http_proxy/events.h index c859597ebe2..4acb6376ed6 100644 --- a/ydb/core/http_proxy/events.h +++ b/ydb/core/http_proxy/events.h @@ -42,6 +42,7 @@ namespace NKikimr::NHttpProxy { EvListEndpointsRequest, EvListEndpointsResponse, EvError, + EvErrorWithIssue, EvCounter, EvHistCounter, EvToken, @@ -139,6 +140,18 @@ namespace NKikimr::NHttpProxy { , Response(response) {} }; + + struct TEvErrorWithIssue : public TEventLocal<TEvErrorWithIssue, EvErrorWithIssue> { + NYdb::EStatus Status; + size_t IssueCode; + TString Response; + + TEvErrorWithIssue(const NYdb::EStatus status, const TString& response, size_t issueCode=0) + : Status(status) + , IssueCode(issueCode) + , Response(response) + {} + }; }; diff --git a/ydb/core/http_proxy/exceptions_mapping.cpp b/ydb/core/http_proxy/exceptions_mapping.cpp new file mode 100644 index 00000000000..5fcbc1d8d60 --- /dev/null +++ b/ydb/core/http_proxy/exceptions_mapping.cpp @@ -0,0 +1,175 @@ +#include "exceptions_mapping.h" + +#include <unordered_map> + + +namespace NKikimr::NHttpProxy { + +enum class EMethodId : ui8 { + CREATE_STREAM = 100, + DELETE_STREAM = 101, + UPDATE_SHARD_COUNT = 102, + UPDATE_STREAM_MODE = 103, + UPDATE_STREAM = 104, + SET_WRITE_QUOTA = 105, + SET_STREAM_RETENTION_PERIOD = 106, + DESCRIBE_STREAM = 107, + LIST_STREAMS = 108, + LIST_STREAM_CONSUMERS = 109, + REGISTER_STREAM_CONSUMER = 110, + DEREGISTER_STREAM_CONSUMER = 111, + GET_SHARD_ITERATOR = 112, + GET_RECORDS = 113, + LIST_SHARDS = 114, + DESCRIBE_STREAM_SUMMARY = 115 +}; + +static std::unordered_map<TString, EMethodId> getEMethodId = { + {"CreateStream", EMethodId::CREATE_STREAM}, + {"DeleteStream", EMethodId::DELETE_STREAM}, + {"UpdateShardCount", EMethodId::UPDATE_SHARD_COUNT}, + {"UpdateStreamMode", EMethodId::UPDATE_STREAM_MODE}, + {"UpdateStream", EMethodId::UPDATE_STREAM}, + {"SetWriteQuota", EMethodId::SET_WRITE_QUOTA}, + {"SetStreamRetentionPeriod", EMethodId::SET_STREAM_RETENTION_PERIOD}, + {"DescribeStream", EMethodId::DESCRIBE_STREAM}, + {"ListStreams", EMethodId::LIST_STREAMS}, + {"ListStreamConsumers", EMethodId::LIST_STREAM_CONSUMERS}, + {"RegisterStreamConsumer", EMethodId::REGISTER_STREAM_CONSUMER}, + {"DeregisterStreamConsumer", EMethodId::DEREGISTER_STREAM_CONSUMER}, + {"GetShardIterator", EMethodId::GET_SHARD_ITERATOR}, + {"GetRecords", EMethodId::GET_RECORDS}, + {"ListShards", EMethodId::LIST_SHARDS}, + {"DescribeStreamSummary", EMethodId::DESCRIBE_STREAM_SUMMARY} +}; + + +TException AlreadyExistsExceptions(const TString& method, NYds::EErrorCodes issueCode) { + Y_UNUSED(method); + Y_UNUSED(issueCode); + return TException("ResourceInUseException", HTTP_BAD_REQUEST); +} + +TException BadRequestExceptions(const TString& method, NYds::EErrorCodes issueCode) { + EMethodId Method = getEMethodId[method]; + + switch (issueCode) { + case NYds::EErrorCodes::ACCESS_DENIED: + return TException("AccessDeniedException", HTTP_BAD_REQUEST); + case NYds::EErrorCodes::INVALID_ARGUMENT: + return TException("InvalidArgumentException", HTTP_BAD_REQUEST); + case NYds::EErrorCodes::VALIDATION_ERROR: + return TException("ValidationException", HTTP_BAD_REQUEST); + case NYds::EErrorCodes::MISSING_PARAMETER: + return TException("MissingParameter", HTTP_BAD_REQUEST); + case NYds::EErrorCodes::IN_USE: + return TException("ResourceInUseException", HTTP_BAD_REQUEST); + case NYds::EErrorCodes::GENERIC_ERROR: + switch (Method) { + case EMethodId::LIST_STREAMS: + return TException("ValidationException", HTTP_BAD_REQUEST); + case EMethodId::DESCRIBE_STREAM: + return TException("ResourceNotFoundException", HTTP_BAD_REQUEST); + case EMethodId::CREATE_STREAM: + return TException("LimitExceededException", HTTP_BAD_REQUEST); + default: + return TException("ValidationException", HTTP_BAD_REQUEST); + } + case NYds::EErrorCodes::EXPIRED_TOKEN: + return TException("ExpiredNextTokenException", HTTP_BAD_REQUEST); + case NYds::EErrorCodes::EXPIRED_ITERATOR: + return TException("ExpiredIteratorException", HTTP_BAD_REQUEST); + case NYds::EErrorCodes::NOT_FOUND: + return TException("ResourceNotFoundException", HTTP_BAD_REQUEST); + case NYds::EErrorCodes::BAD_REQUEST: + return TException("ValidationException", HTTP_BAD_REQUEST); + case NYds::EErrorCodes::INCOMPLETE_SIGNATURE: + return TException("IncompleteSignature", HTTP_BAD_REQUEST); + case NYds::EErrorCodes::INVALID_PARAMETER_COMBINATION: + return TException("InvalidParameterCombination", HTTP_BAD_REQUEST); + default: + return TException("UnknownError", HTTP_BAD_REQUEST); + } +} + +TException GenericErrorExceptions(const TString& method, NYds::EErrorCodes issueCode) { + Y_UNUSED(issueCode); + EMethodId Method = getEMethodId[method]; + switch (Method) { + case EMethodId::CREATE_STREAM: + return TException("LimitExceededException", HTTP_BAD_REQUEST); + default: + return TException("GenericError", HTTP_BAD_REQUEST); + } +} + +TException InternalErrorExceptions(const TString& method, NYds::EErrorCodes issueCode) { + Y_UNUSED(method); + Y_UNUSED(issueCode); + return TException("InternalError", HTTP_INTERNAL_SERVER_ERROR); +} + +TException NotFoundExceptions(const TString& method, NYds::EErrorCodes issueCode) { + Y_UNUSED(method); + switch (issueCode) { + case NYds::EErrorCodes::INVALID_ARGUMENT: + return TException("InvalidArgumentException", HTTP_BAD_REQUEST); + default: + return TException("ResourceNotFoundException", HTTP_BAD_REQUEST); + } +} + +TException OverloadedExceptions(const TString& method, NYds::EErrorCodes issueCode) { + Y_UNUSED(method); + switch (issueCode) { + case NYds::EErrorCodes::ERROR: + return TException("ResourceInUseException", HTTP_BAD_REQUEST); + default: + return TException("ThrottlingException", HTTP_BAD_REQUEST); + } +} + +TException PreconditionFailedExceptions(const TString& method, NYds::EErrorCodes issueCode) { + Y_UNUSED(method); + Y_UNUSED(issueCode); + return TException("ValidationException", HTTP_BAD_REQUEST); +} + +TException SchemeErrorExceptions(const TString& method, NYds::EErrorCodes issueCode) { + Y_UNUSED(method); + switch (issueCode) { + case NYds::EErrorCodes::INVALID_ARGUMENT: + return TException("InvalidArgumentException", HTTP_BAD_REQUEST); + case NYds::EErrorCodes::VALIDATION_ERROR: + return TException("ValidationException", HTTP_BAD_REQUEST); + case NYds::EErrorCodes::NOT_FOUND: + case NYds::EErrorCodes::ACCESS_DENIED: + return TException("ResourceNotFoundException", HTTP_BAD_REQUEST); + default: + return TException("InvalidArgumentException", HTTP_BAD_REQUEST); + } +} + +TException UnauthorizedExceptions(const TString& method, NYds::EErrorCodes issueCode) { + Y_UNUSED(method); + switch (issueCode) { + case NYds::EErrorCodes::INCOMPLETE_SIGNATURE: + return TException("IncompleteSignature", HTTP_BAD_REQUEST); + case NYds::EErrorCodes::MISSING_AUTHENTICATION_TOKEN: + return TException("MissingAuthenticationToken", HTTP_BAD_REQUEST); + default: + return TException("AccessDeniedException", HTTP_BAD_REQUEST); + } +} + +TException UnsupportedExceptions(const TString& method, NYds::EErrorCodes issueCode) { + Y_UNUSED(method); + switch (issueCode) { + case NYds::EErrorCodes::MISSING_ACTION: + return TException("MissingAction", HTTP_BAD_REQUEST); + default: + return TException("InvalidAction", HTTP_BAD_REQUEST); + } +} + +} diff --git a/ydb/core/http_proxy/exceptions_mapping.h b/ydb/core/http_proxy/exceptions_mapping.h new file mode 100644 index 00000000000..93c3f389530 --- /dev/null +++ b/ydb/core/http_proxy/exceptions_mapping.h @@ -0,0 +1,24 @@ +#pragma once + +#include <ydb/services/datastreams/datastreams_codes.h> + +#include <library/cpp/http/misc/httpcodes.h> + + +using TException = std::pair<TString, HttpCodes>; + + +namespace NKikimr::NHttpProxy { + + TException AlreadyExistsExceptions(const TString& method, NYds::EErrorCodes issueCode); + TException BadRequestExceptions(const TString& method, NYds::EErrorCodes issueCode); + TException GenericErrorExceptions(const TString& method, NYds::EErrorCodes issueCode); + TException InternalErrorExceptions(const TString& method, NYds::EErrorCodes issueCode); + TException NotFoundExceptions(const TString& method, NYds::EErrorCodes issueCode); + TException OverloadedExceptions(const TString& method, NYds::EErrorCodes issueCode); + TException PreconditionFailedExceptions(const TString& method, NYds::EErrorCodes issueCode); + TException SchemeErrorExceptions(const TString& method, NYds::EErrorCodes issueCode); + TException UnauthorizedExceptions(const TString& method, NYds::EErrorCodes issueCode); + TException UnsupportedExceptions(const TString& method, NYds::EErrorCodes issueCode); + +}
\ No newline at end of file diff --git a/ydb/core/http_proxy/http_req.cpp b/ydb/core/http_proxy/http_req.cpp index 339cc354604..68e03b198b3 100644 --- a/ydb/core/http_proxy/http_req.cpp +++ b/ydb/core/http_proxy/http_req.cpp @@ -3,6 +3,7 @@ #include "http_req.h" #include "json_proto_conversion.h" #include "custom_metrics.h" +#include "exceptions_mapping.h" #include <library/cpp/actors/http/http_proxy.h> #include <library/cpp/cgiparam/cgiparam.h> @@ -60,85 +61,50 @@ namespace NKikimr::NHttpProxy { using namespace Ydb::DataStreams::V1; using namespace NYdb::NDataStreams::V1; - TString StatusToErrorType(NYdb::EStatus status) { + TException MapToException(NYdb::EStatus status, const TString& method, size_t issueCode = ISSUE_CODE_ERROR) { + auto IssueCode = static_cast<NYds::EErrorCodes>(issueCode); + switch(status) { case NYdb::EStatus::SUCCESS: - return "OK"; + return TException("", HTTP_OK); case NYdb::EStatus::BAD_REQUEST: - return "InvalidParameterValueException"; //TODO: bring here issues and parse from them - case NYdb::EStatus::CLIENT_UNAUTHENTICATED: + return BadRequestExceptions(method, IssueCode); case NYdb::EStatus::UNAUTHORIZED: - return "AccessDeniedException"; + return UnauthorizedExceptions(method, IssueCode); case NYdb::EStatus::INTERNAL_ERROR: - return "InternalFailureException"; - case NYdb::EStatus::ABORTED: - return "RequestExpiredException"; //TODO: find better code - case NYdb::EStatus::UNAVAILABLE: - return "ServiceUnavailableException"; + return InternalErrorExceptions(method, IssueCode); case NYdb::EStatus::OVERLOADED: - return "ThrottlingException"; - case NYdb::EStatus::SCHEME_ERROR: - return "ResourceNotFoundException"; + return OverloadedExceptions(method, IssueCode); case NYdb::EStatus::GENERIC_ERROR: - return "InternalFailureException"; //TODO: find better code - case NYdb::EStatus::TIMEOUT: - return "RequestTimeoutException"; - case NYdb::EStatus::BAD_SESSION: - return "AccessDeniedException"; + return GenericErrorExceptions(method, IssueCode); case NYdb::EStatus::PRECONDITION_FAILED: + return PreconditionFailedExceptions(method, IssueCode); case NYdb::EStatus::ALREADY_EXISTS: - return "ValidationErrorException"; //TODO: find better code + return AlreadyExistsExceptions(method, IssueCode); + case NYdb::EStatus::SCHEME_ERROR: + return SchemeErrorExceptions(method, IssueCode); case NYdb::EStatus::NOT_FOUND: - return "ResourceNotFoundException"; - case NYdb::EStatus::SESSION_EXPIRED: - return "AccessDeniedException"; + return NotFoundExceptions(method, IssueCode); case NYdb::EStatus::UNSUPPORTED: - return "InvalidActionException"; - default: - return "InternalFailureException"; - } - - } - - HttpCodes StatusToHttpCode(NYdb::EStatus status) { - switch(status) { - case NYdb::EStatus::SUCCESS: - return HTTP_OK; - case NYdb::EStatus::UNSUPPORTED: - case NYdb::EStatus::BAD_REQUEST: - return HTTP_BAD_REQUEST; + return UnsupportedExceptions(method, IssueCode); case NYdb::EStatus::CLIENT_UNAUTHENTICATED: - case NYdb::EStatus::UNAUTHORIZED: - return HTTP_FORBIDDEN; - case NYdb::EStatus::INTERNAL_ERROR: - return HTTP_INTERNAL_SERVER_ERROR; + return TException("Unauthenticated", HTTP_BAD_REQUEST); case NYdb::EStatus::ABORTED: - return HTTP_CONFLICT; + return TException("Aborted", HTTP_BAD_REQUEST); case NYdb::EStatus::UNAVAILABLE: - return HTTP_SERVICE_UNAVAILABLE; - case NYdb::EStatus::OVERLOADED: - return HTTP_BAD_REQUEST; - case NYdb::EStatus::SCHEME_ERROR: - return HTTP_NOT_FOUND; - case NYdb::EStatus::GENERIC_ERROR: - return HTTP_BAD_REQUEST; + return TException("Unavailable", HTTP_BAD_REQUEST); case NYdb::EStatus::TIMEOUT: - return HTTP_GATEWAY_TIME_OUT; + return TException("RequestExpired", HTTP_BAD_REQUEST); case NYdb::EStatus::BAD_SESSION: - return HTTP_UNAUTHORIZED; - case NYdb::EStatus::PRECONDITION_FAILED: - return HTTP_PRECONDITION_FAILED; - case NYdb::EStatus::ALREADY_EXISTS: - return HTTP_CONFLICT; - case NYdb::EStatus::NOT_FOUND: - return HTTP_NOT_FOUND; + return TException("BadSession", HTTP_BAD_REQUEST); case NYdb::EStatus::SESSION_EXPIRED: - return HTTP_UNAUTHORIZED; + return TException("SessionExpired", HTTP_BAD_REQUEST); default: - return HTTP_INTERNAL_SERVER_ERROR; + return TException("InternalException", HTTP_INTERNAL_SERVER_ERROR); } } + template<class TProto> TString ExtractStreamNameWithoutProtoField(const TProto& req) { @@ -266,6 +232,7 @@ namespace NKikimr::NHttpProxy { HFunc(TEvServerlessProxy::TEvClientReady, HandleClientReady); HFunc(TEvServerlessProxy::TEvDiscoverDatabaseEndpointResult, Handle); HFunc(TEvServerlessProxy::TEvError, HandleError); + HFunc(TEvServerlessProxy::TEvErrorWithIssue, HandleErrorWithIssue); HFunc(TEvServerlessProxy::TEvGrpcRequestResult, HandleGrpcResponse); HFunc(TEvServerlessProxy::TEvToken, HandleToken); default: @@ -367,6 +334,7 @@ namespace NKikimr::NHttpProxy { auto result = MakeHolder<TEvServerlessProxy::TEvGrpcRequestResult>(); if (response.IsSuccess()) { result->Message = MakeHolder<TProtoResult>(response.GetResult()); + } result->Status = MakeHolder<NYdb::TStatus>(response); actorSystem->Send(actorId, result.Release()); @@ -393,7 +361,11 @@ namespace NKikimr::NHttpProxy { ReplyWithError(ctx, ev->Get()->Status, ev->Get()->Response); } - void ReplyWithError(const TActorContext& ctx, NYdb::EStatus status, const TString& errorText) { + void HandleErrorWithIssue(TEvServerlessProxy::TEvErrorWithIssue::TPtr& ev, const TActorContext& ctx) { + ReplyWithError(ctx, ev->Get()->Status, ev->Get()->Response, ev->Get()->IssueCode); + } + + void ReplyWithError(const TActorContext& ctx, NYdb::EStatus status, const TString& errorText, size_t issueCode = ISSUE_CODE_GENERIC) { /* deprecated metric: */ ctx.Send(MakeMetricsServiceID(), new TEvServerlessProxy::TEvCounter{ 1, true, true, @@ -402,26 +374,12 @@ namespace NKikimr::NHttpProxy { {"folder", HttpContext.FolderId}, {"database", HttpContext.DatabaseId}, {"stream", HttpContext.StreamName}, - {"code", TStringBuilder() << (int)StatusToHttpCode(status)}, + {"code", TStringBuilder() << (int)MapToException(status, Method, issueCode).second}, {"name", "api.http.errors_per_second"}} }); - - ctx.Send(MakeMetricsServiceID(), - new TEvServerlessProxy::TEvCounter{ - 1, true, true, - {{"database", HttpContext.DatabaseName}, - {"method", Method}, - {"cloud_id", HttpContext.CloudId}, - {"folder_id", HttpContext.FolderId}, - {"database_id", HttpContext.DatabaseId}, - {"topic", HttpContext.StreamName}, - {"code", TStringBuilder() << (int)StatusToHttpCode(status)}, - {"name", "api.http.data_streams.response.count"}} - }); - //TODO: add api.http.response.count HttpContext.ResponseData.Status = status; HttpContext.ResponseData.ErrorText = errorText; - HttpContext.DoReply(ctx); + HttpContext.DoReply(ctx, issueCode); ctx.Send(AuthActor, new TEvents::TEvPoisonPill()); @@ -459,7 +417,7 @@ namespace NKikimr::NHttpProxy { return; } - return ReplyWithError(ctx, ev->Get()->Status, ev->Get()->Message); + ReplyWithError(ctx, ev->Get()->Status, ev->Get()->Message); } void ReportLatencyCounters(const TActorContext& ctx) { @@ -491,7 +449,7 @@ namespace NKikimr::NHttpProxy { ctx.Send(MakeMetricsServiceID(), new TEvServerlessProxy::TEvCounter{ 1, true, true, - {{"database", HttpContext.DatabaseName}, + {{"database", HttpContext.DatabaseName}, {"method", Method}, {"cloud_id", HttpContext.CloudId}, {"folder_id", HttpContext.FolderId}, @@ -517,7 +475,11 @@ namespace NKikimr::NHttpProxy { TStringOutput stringOutput(errorText); ev->Get()->Status->GetIssues().PrintTo(stringOutput); RetryCounter.Void(); - return ReplyWithError(ctx, ev->Get()->Status->GetStatus(), errorText); + auto issues = ev->Get()->Status->GetIssues(); + size_t issueCode = ( + issues && issues.begin()->IssueCode != ISSUE_CODE_OK + ) ? issues.begin()->IssueCode : ISSUE_CODE_GENERIC; + return ReplyWithError(ctx, ev->Get()->Status->GetStatus(), errorText, issueCode); } } } @@ -534,12 +496,18 @@ namespace NKikimr::NHttpProxy { StartTime = ctx.Now(); try { HttpContext.RequestBodyToProto(&Request); - } catch (std::exception& e) { + } catch (const NKikimr::NSQS::TSQSException& e) { + NYds::EErrorCodes issueCode = NYds::EErrorCodes::OK; + if (e.ErrorClass.ErrorCode == "MissingParameter") + issueCode = NYds::EErrorCodes::MISSING_PARAMETER; + else if (e.ErrorClass.ErrorCode == "InvalidQueryParameter" || e.ErrorClass.ErrorCode == "MalformedQueryString") + issueCode = NYds::EErrorCodes::INVALID_ARGUMENT; + return ReplyWithError(ctx, NYdb::EStatus::BAD_REQUEST, e.what(), static_cast<size_t>(issueCode)); + } catch (const std::exception& e) { LOG_SP_WARN_S(ctx, NKikimrServices::HTTP_PROXY, "got new request with incorrect json from [" << HttpContext.SourceAddress << "] " << "database '" << HttpContext.DatabaseName << "'"); - - return ReplyWithError(ctx, NYdb::EStatus::BAD_REQUEST, e.what()); + return ReplyWithError(ctx, NYdb::EStatus::BAD_REQUEST, e.what(), static_cast<size_t>(NYds::EErrorCodes::INVALID_ARGUMENT)); } if (HttpContext.DatabaseName.empty()) { @@ -645,9 +613,16 @@ namespace NKikimr::NHttpProxy { proc->second->Execute(std::move(context), std::move(signature), ctx); return true; } - context.ResponseData.Status = NYdb::EStatus::BAD_REQUEST; - context.ResponseData.ErrorText = TStringBuilder() << "Unknown method name " << name; - context.DoReply(ctx); + else if (name.empty()) { + context.ResponseData.Status = NYdb::EStatus::UNSUPPORTED; + context.ResponseData.ErrorText = TStringBuilder() << "Unknown method name " << name; + context.DoReply(ctx, static_cast<size_t>(NYds::EErrorCodes::MISSING_ACTION)); + } + else { + context.ResponseData.Status = NYdb::EStatus::UNSUPPORTED; + context.ResponseData.ErrorText = TStringBuilder() << "Missing method name " << name; + context.DoReply(ctx); + } return false; } @@ -700,7 +675,7 @@ namespace NKikimr::NHttpProxy { return signature; } - void THttpRequestContext::DoReply(const TActorContext& ctx) { + void THttpRequestContext::DoReply(const TActorContext& ctx, size_t issueCode) { auto createResponse = [this](const auto& request, TStringBuf status, TStringBuf message, @@ -751,13 +726,14 @@ namespace NKikimr::NHttpProxy { ResponseData.Body.SetType(NJson::JSON_MAP); ResponseData.Body["message"] = ResponseData.ErrorText; - ResponseData.Body["__type"] = StatusToErrorType(ResponseData.Status); + ResponseData.Body["__type"] = MapToException(ResponseData.Status, MethodName, issueCode).first; } + auto [errorName, httpCode] = MapToException(ResponseData.Status, MethodName, issueCode); auto response = createResponse( Request, - TStringBuilder() << (ui32)StatusToHttpCode(ResponseData.Status), - StatusToErrorType(ResponseData.Status), + TStringBuilder() << (ui32)httpCode, + errorName, strByMimeAws(ContentType), ResponseData.DumpBody(ContentType) ); @@ -782,8 +758,8 @@ namespace NKikimr::NHttpProxy { } else if (AsciiEqualsIgnoreCase(header.first, REQUEST_TARGET_HEADER)) { TString requestTarget = TString(header.second); TVector<TString> parts = SplitString(requestTarget, "."); - ApiVersion = parts[0]; - MethodName = parts[1]; + ApiVersion = parts.size() > 0 ? parts[0] : ""; + MethodName = parts.size() > 1 ? parts[1] : ""; } else if (AsciiEqualsIgnoreCase(header.first, REQUEST_CONTENT_TYPE_HEADER)) { ContentType = mimeByStr(header.second); } else if (AsciiEqualsIgnoreCase(header.first, REQUEST_DATE_HEADER)) { @@ -930,7 +906,10 @@ namespace NKikimr::NHttpProxy { void HandleCacheNavigateResponse(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev, const TActorContext& ctx) { const NSchemeCache::TSchemeCacheNavigate* navigate = ev->Get()->Request.Get(); if (navigate->ErrorCount) { - return ReplyWithError(ctx, NYdb::EStatus::SCHEME_ERROR, TStringBuilder() << "Database with path '" << Database << "' doesn't exists"); + return ReplyWithError( + ctx, NYdb::EStatus::SCHEME_ERROR, TStringBuilder() << "Database with path '" << Database << "' doesn't exists", + NYds::EErrorCodes::NOT_FOUND + ); } Y_VERIFY(navigate->ResultSet.size() == 1); if (navigate->ResultSet.front().PQGroupInfo) { @@ -967,7 +946,9 @@ namespace NKikimr::NHttpProxy { TInstant signedAt; if (!Signature.Get() && IamToken.empty()) { return ReplyWithError(ctx, NYdb::EStatus::UNAUTHORIZED, - "Neither Credentials nor IAM token was provided"); + "Neither Credentials nor IAM token was provided", + NYds::EErrorCodes::INCOMPLETE_SIGNATURE + ); } if (Signature) { bool found = false; @@ -980,12 +961,23 @@ namespace NKikimr::NHttpProxy { if (!found) { return ReplyWithError(ctx, NYdb::EStatus::UNAUTHORIZED, TStringBuilder() << "Wrong service region: got " << Signature->GetRegion() << - " expected " << ServiceConfig.GetHttpConfig().GetYandexCloudServiceRegion(0)); + " expected " << ServiceConfig.GetHttpConfig().GetYandexCloudServiceRegion(0), + NYds::EErrorCodes::INCOMPLETE_SIGNATURE + ); } if (!TInstant::TryParseIso8601(Signature->GetSigningTimestamp(), signedAt)) { return ReplyWithError(ctx, NYdb::EStatus::BAD_REQUEST, - "Failed to parse Signature timestamp"); + "Failed to parse Signature timestamp", + NYds::EErrorCodes::INCOMPLETE_SIGNATURE + ); + } + + if (Signature->GetAccessKeyId().empty()) { + return ReplyWithError(ctx, NYdb::EStatus::UNAUTHORIZED, + "Access key id should be provided", + NYds::EErrorCodes::MISSING_AUTHENTICATION_TOKEN + ); } } @@ -1073,8 +1065,9 @@ namespace NKikimr::NHttpProxy { ctx.Send(MakeIamTokenServiceID(), std::move(request)); } - void ReplyWithError(const TActorContext& ctx, NYdb::EStatus status, const TString& errorText) { - ctx.Send(Sender, new TEvServerlessProxy::TEvError(status, errorText)); + void ReplyWithError(const TActorContext& ctx, NYdb::EStatus status, const TString& errorText, + NYds::EErrorCodes issueCode = NYds::EErrorCodes::GENERIC_ERROR) { + ctx.Send(Sender, new TEvServerlessProxy::TEvErrorWithIssue(status, errorText, static_cast<size_t>(issueCode))); TBase::Die(ctx); } @@ -1169,3 +1162,10 @@ namespace NKikimr::NHttpProxy { } // namespace NKikimr::NHttpProxy + +template <> +void Out<NKikimr::NHttpProxy::THttpResponseData>(IOutputStream& o, const NKikimr::NHttpProxy::THttpResponseData& p) { + TString s = TStringBuilder() << "NYdb status: " << std::to_string(static_cast<size_t>(p.Status)) << + ". Body: " << NJson::WriteJson(p.Body) << ". Error text: " << p.ErrorText; + o.Write(s.data(), s.length()); +} diff --git a/ydb/core/http_proxy/http_req.h b/ydb/core/http_proxy/http_req.h index 8973107348a..60437ff3b2b 100644 --- a/ydb/core/http_proxy/http_req.h +++ b/ydb/core/http_proxy/http_req.h @@ -2,6 +2,8 @@ #include "events.h" +#include <ydb/services/datastreams/datastreams_codes.h> + #include <ydb/core/protos/serverless_proxy_config.pb.h> #include <ydb/core/protos/serverless_proxy_config.pb.h> @@ -15,14 +17,16 @@ #include <library/cpp/json/json_value.h> #include <library/cpp/json/json_reader.h> +#include <util/stream/output.h> #include <util/string/builder.h> +#define ISSUE_CODE_OK 0 +#define ISSUE_CODE_GENERIC 500030 +#define ISSUE_CODE_ERROR 500100 -namespace NKikimr::NHttpProxy { -HttpCodes StatusToHttpCode(NYdb::EStatus status); -TString StatusToErrorType(NYdb::EStatus status); +namespace NKikimr::NHttpProxy { class TRetryCounter { public: @@ -88,7 +92,7 @@ struct THttpRequestContext { } THolder<NKikimr::NSQS::TAwsRequestSignV4> GetSignature(); - void DoReply(const TActorContext& ctx); + void DoReply(const TActorContext& ctx, size_t issueCode = ISSUE_CODE_GENERIC); void ParseHeaders(TStringBuf headers); void RequestBodyToProto(NProtoBuf::Message* request); }; @@ -125,3 +129,6 @@ NActors::IActor* CreateIamAuthActor(const TActorId sender, THttpRequestContext& } // namespace NKinesis::NHttpProxy + +template <> +void Out<NKikimr::NHttpProxy::THttpResponseData>(IOutputStream& o, const NKikimr::NHttpProxy::THttpResponseData& p); diff --git a/ydb/core/http_proxy/http_service.cpp b/ydb/core/http_proxy/http_service.cpp index 026c9c73247..a08d764bc55 100644 --- a/ydb/core/http_proxy/http_service.cpp +++ b/ydb/core/http_proxy/http_service.cpp @@ -100,10 +100,10 @@ namespace NKikimr::NHttpProxy { try { auto signature = context.GetSignature(); Processors->Execute(context.MethodName, std::move(context), std::move(signature), ctx); - } catch (NKikimr::NSQS::TSQSException& e) { + } catch (const NKikimr::NSQS::TSQSException& e) { context.ResponseData.Status = NYdb::EStatus::BAD_REQUEST; context.ResponseData.ErrorText = e.what(); - context.DoReply(ctx); + context.DoReply(ctx, static_cast<size_t>(NYds::EErrorCodes::ACCESS_DENIED)); return; } } diff --git a/ydb/core/http_proxy/json_proto_conversion.h b/ydb/core/http_proxy/json_proto_conversion.h index f2601459ec5..e6c0d6d2119 100644 --- a/ydb/core/http_proxy/json_proto_conversion.h +++ b/ydb/core/http_proxy/json_proto_conversion.h @@ -8,6 +8,7 @@ #include <library/cpp/string_utils/base64/base64.h> #include <ydb/library/naming_conventions/naming_conventions.h> #include <ydb/public/sdk/cpp/client/ydb_datastreams/datastreams.h> +#include <ydb/library/http_proxy/error/error.h> #include <nlohmann/json.hpp> @@ -139,11 +140,20 @@ inline void ProtoToJson(const NProtoBuf::Message& resp, NJson::TJsonValue& value inline void JsonToProto(const NJson::TJsonValue& jsonValue, NProtoBuf::Message* message, ui32 depth = 0) { Y_ENSURE(depth < 101, "Json depth is > 100"); - Y_ENSURE(jsonValue.IsMap(), "Top level of json value is not a map"); + Y_ENSURE_EX( + !jsonValue.IsNull(), + NKikimr::NSQS::TSQSException(NKikimr::NSQS::NErrors::MISSING_PARAMETER) << + "Top level of json value is not a map" + ); auto* desc = message->GetDescriptor(); auto* reflection = message->GetReflection(); for (const auto& [key, value] : jsonValue.GetMap()) { auto* fieldDescriptor = desc->FindFieldByName(NNaming::CamelToSnakeCase(key)); + Y_ENSURE_EX( + fieldDescriptor, + NKikimr::NSQS::TSQSException(NKikimr::NSQS::NErrors::INVALID_QUERY_PARAMETER) << + "Unexpected json key: " << key + ); Y_ENSURE(fieldDescriptor, "Unexpected json key: " + key); auto transformer = Ydb::DataStreams::V1::TRANSFORM_NONE; if (fieldDescriptor->options().HasExtension(Ydb::DataStreams::V1::FieldTransformer)) { @@ -293,7 +303,11 @@ inline void JsonToProto(const NJson::TJsonValue& jsonValue, NProtoBuf::Message* inline void NlohmannJsonToProto(const nlohmann::json& jsonValue, NProtoBuf::Message* message, ui32 depth = 0) { Y_ENSURE(depth < 101, "Json depth is > 100"); - Y_ENSURE(jsonValue.is_object(), "Top level of json value is not a map"); + Y_ENSURE_EX( + !jsonValue.is_null(), + NKikimr::NSQS::TSQSException(NKikimr::NSQS::NErrors::MISSING_PARAMETER) << + "Top level of json value is not a map" + ); auto* desc = message->GetDescriptor(); auto* reflection = message->GetReflection(); for (const auto& [key, value] : jsonValue.get<std::unordered_map<std::string, nlohmann::json>>()) { diff --git a/ydb/core/public_http/CMakeLists.darwin.txt b/ydb/core/public_http/CMakeLists.darwin.txt index b674ecc8872..adb69457848 100644 --- a/ydb/core/public_http/CMakeLists.darwin.txt +++ b/ydb/core/public_http/CMakeLists.darwin.txt @@ -28,6 +28,7 @@ target_link_libraries(ydb-core-public_http PUBLIC core-viewer-json yq-libs-result_formatter yql-public-issue + cpp-client-ydb_types ) target_sources(ydb-core-public_http PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/public_http/http_req.cpp @@ -55,6 +56,7 @@ target_link_libraries(ydb-core-public_http.global PUBLIC core-viewer-json yq-libs-result_formatter yql-public-issue + cpp-client-ydb_types ) target_sources(ydb-core-public_http.global PRIVATE ${CMAKE_BINARY_DIR}/ydb/core/public_http/61d421c52ac59775eb29fb236d3d6f5a.cpp diff --git a/ydb/core/public_http/CMakeLists.linux-aarch64.txt b/ydb/core/public_http/CMakeLists.linux-aarch64.txt index 790494f5f93..50759814942 100644 --- a/ydb/core/public_http/CMakeLists.linux-aarch64.txt +++ b/ydb/core/public_http/CMakeLists.linux-aarch64.txt @@ -29,6 +29,7 @@ target_link_libraries(ydb-core-public_http PUBLIC core-viewer-json yq-libs-result_formatter yql-public-issue + cpp-client-ydb_types ) target_sources(ydb-core-public_http PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/public_http/http_req.cpp @@ -57,6 +58,7 @@ target_link_libraries(ydb-core-public_http.global PUBLIC core-viewer-json yq-libs-result_formatter yql-public-issue + cpp-client-ydb_types ) target_sources(ydb-core-public_http.global PRIVATE ${CMAKE_BINARY_DIR}/ydb/core/public_http/61d421c52ac59775eb29fb236d3d6f5a.cpp diff --git a/ydb/core/public_http/CMakeLists.linux.txt b/ydb/core/public_http/CMakeLists.linux.txt index 790494f5f93..50759814942 100644 --- a/ydb/core/public_http/CMakeLists.linux.txt +++ b/ydb/core/public_http/CMakeLists.linux.txt @@ -29,6 +29,7 @@ target_link_libraries(ydb-core-public_http PUBLIC core-viewer-json yq-libs-result_formatter yql-public-issue + cpp-client-ydb_types ) target_sources(ydb-core-public_http PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/public_http/http_req.cpp @@ -57,6 +58,7 @@ target_link_libraries(ydb-core-public_http.global PUBLIC core-viewer-json yq-libs-result_formatter yql-public-issue + cpp-client-ydb_types ) target_sources(ydb-core-public_http.global PRIVATE ${CMAKE_BINARY_DIR}/ydb/core/public_http/61d421c52ac59775eb29fb236d3d6f5a.cpp diff --git a/ydb/core/public_http/http_req.cpp b/ydb/core/public_http/http_req.cpp index f98eef0da4c..0444ade1957 100644 --- a/ydb/core/public_http/http_req.cpp +++ b/ydb/core/public_http/http_req.cpp @@ -1,6 +1,7 @@ #include "http_req.h" #include <library/cpp/actors/http/http_proxy.h> +#include <ydb/public/sdk/cpp/client/ydb_types/status_codes.h> #include <ydb/core/http_proxy/http_req.h> #include <util/generic/guid.h> @@ -12,10 +13,91 @@ namespace NKikimr::NPublicHttp { constexpr TStringBuf REQUEST_CONTENT_TYPE_HEADER = "content-type"; constexpr TStringBuf REQUEST_FORWARDED_FOR = "x-forwarded-for"; constexpr TStringBuf IDEMPOTENCY_KEY_HEADER = "idempotency-key"; - + constexpr TStringBuf APPLICATION_JSON = "application/json"; constexpr TStringBuf TEXT_PLAIN_UTF8 = "text/plain;charset=UTF-8"; + + TString StatusToErrorType(NYdb::EStatus status) { + switch(status) { + case NYdb::EStatus::SUCCESS: + return "OK"; + case NYdb::EStatus::BAD_REQUEST: + return "InvalidParameterValueException"; //TODO: bring here issues and parse from them + case NYdb::EStatus::CLIENT_UNAUTHENTICATED: + case NYdb::EStatus::UNAUTHORIZED: + return "AccessDeniedException"; + case NYdb::EStatus::INTERNAL_ERROR: + return "InternalFailureException"; + case NYdb::EStatus::ABORTED: + return "RequestExpiredException"; //TODO: find better code + case NYdb::EStatus::UNAVAILABLE: + return "ServiceUnavailableException"; + case NYdb::EStatus::OVERLOADED: + return "ThrottlingException"; + case NYdb::EStatus::SCHEME_ERROR: + return "ResourceNotFoundException"; + case NYdb::EStatus::GENERIC_ERROR: + return "InternalFailureException"; //TODO: find better code + case NYdb::EStatus::TIMEOUT: + return "RequestTimeoutException"; + case NYdb::EStatus::BAD_SESSION: + return "AccessDeniedException"; + case NYdb::EStatus::PRECONDITION_FAILED: + case NYdb::EStatus::ALREADY_EXISTS: + return "ValidationErrorException"; //TODO: find better code + case NYdb::EStatus::NOT_FOUND: + return "ResourceNotFoundException"; + case NYdb::EStatus::SESSION_EXPIRED: + return "AccessDeniedException"; + case NYdb::EStatus::UNSUPPORTED: + return "InvalidActionException"; + default: + return "InternalFailureException"; + } + + } + + HttpCodes StatusToHttpCode(NYdb::EStatus status) { + switch(status) { + case NYdb::EStatus::SUCCESS: + return HTTP_OK; + case NYdb::EStatus::UNSUPPORTED: + case NYdb::EStatus::BAD_REQUEST: + return HTTP_BAD_REQUEST; + case NYdb::EStatus::CLIENT_UNAUTHENTICATED: + case NYdb::EStatus::UNAUTHORIZED: + return HTTP_FORBIDDEN; + case NYdb::EStatus::INTERNAL_ERROR: + return HTTP_INTERNAL_SERVER_ERROR; + case NYdb::EStatus::ABORTED: + return HTTP_CONFLICT; + case NYdb::EStatus::UNAVAILABLE: + return HTTP_SERVICE_UNAVAILABLE; + case NYdb::EStatus::OVERLOADED: + return HTTP_BAD_REQUEST; + case NYdb::EStatus::SCHEME_ERROR: + return HTTP_NOT_FOUND; + case NYdb::EStatus::GENERIC_ERROR: + return HTTP_BAD_REQUEST; + case NYdb::EStatus::TIMEOUT: + return HTTP_GATEWAY_TIME_OUT; + case NYdb::EStatus::BAD_SESSION: + return HTTP_UNAUTHORIZED; + case NYdb::EStatus::PRECONDITION_FAILED: + return HTTP_PRECONDITION_FAILED; + case NYdb::EStatus::ALREADY_EXISTS: + return HTTP_CONFLICT; + case NYdb::EStatus::NOT_FOUND: + return HTTP_NOT_FOUND; + case NYdb::EStatus::SESSION_EXPIRED: + return HTTP_UNAUTHORIZED; + default: + return HTTP_INTERNAL_SERVER_ERROR; + } + } + + TString GenerateRequestId(const TString& sourceReqId) { if (sourceReqId.empty()) { return CreateGuidAsString(); @@ -98,8 +180,8 @@ namespace NKikimr::NPublicHttp { void THttpRequestContext::DoResponseBadRequest(Ydb::StatusIds::StatusCode status, const TString& errorText, TStringBuf contentType) const { const NYdb::EStatus ydbStatus = static_cast<NYdb::EStatus>(status); - const TString httpCodeStr = ToString((int)NKikimr::NHttpProxy::StatusToHttpCode(ydbStatus)); - DoResponse(httpCodeStr, NKikimr::NHttpProxy::StatusToErrorType(ydbStatus), errorText, contentType); + const TString httpCodeStr = ToString((int)StatusToHttpCode(ydbStatus)); + DoResponse(httpCodeStr, StatusToErrorType(ydbStatus), errorText, contentType); } void THttpRequestContext::ResponseOK() const { diff --git a/ydb/public/api/protos/persqueue_error_codes_v1.proto b/ydb/public/api/protos/persqueue_error_codes_v1.proto index 0db64f69fed..3620c4799bd 100644 --- a/ydb/public/api/protos/persqueue_error_codes_v1.proto +++ b/ydb/public/api/protos/persqueue_error_codes_v1.proto @@ -44,4 +44,8 @@ enum ErrorCode { SET_OFFSET_ERROR_COMMIT_TO_PAST = 500025; ERROR = 500100; + + INVALID_ARGUMENT = 500040; + VALIDATION_ERROR = 500080; + } diff --git a/ydb/services/datastreams/datastreams_codes.h b/ydb/services/datastreams/datastreams_codes.h new file mode 100644 index 00000000000..f1435a90a83 --- /dev/null +++ b/ydb/services/datastreams/datastreams_codes.h @@ -0,0 +1,35 @@ +#pragma once + +#include <util/stream/output.h> +#include <util/system/types.h> + + +namespace NYds { + +enum class EErrorCodes : size_t { + // Server statuses + OK = 0, // compatible with PersQueue::ErrorCode + + BAD_REQUEST = 500003, // compatible with PersQueue::ErrorCode + ERROR = 500100, // compatible with PersQueue::ErrorCode + ACCESS_DENIED = 500018, // compatible with PersQueue::ErrorCode + + GENERIC_ERROR = 500030, + INVALID_ARGUMENT = 500040, + MISSING_PARAMETER = 500050, + NOT_FOUND = 500060, + IN_USE = 500070, + + VALIDATION_ERROR = 500080, + MISSING_ACTION = 500090, + + INVALID_PARAMETER_COMBINATION = 500110, + + EXPIRED_ITERATOR = 500120, + EXPIRED_TOKEN = 500130, + + INCOMPLETE_SIGNATURE = 500140, + MISSING_AUTHENTICATION_TOKEN = 500150, +}; + +} diff --git a/ydb/services/datastreams/datastreams_proxy.cpp b/ydb/services/datastreams/datastreams_proxy.cpp index 6f56ad5bb0a..a12a8e1288b 100644 --- a/ydb/services/datastreams/datastreams_proxy.cpp +++ b/ydb/services/datastreams/datastreams_proxy.cpp @@ -1,4 +1,5 @@ #include "datastreams_proxy.h" +#include "datastreams_codes.h" #include "put_records_actor.h" #include "shard_iterator.h" #include "next_token.h" @@ -34,7 +35,6 @@ namespace NKikimr::NDataStreams::V1 { using namespace NGRpcService; using namespace NGRpcProxy::V1; - namespace { template <class TRequest> @@ -64,6 +64,7 @@ namespace NKikimr::NDataStreams::V1 { } } + class TCreateStreamActor : public TPQGrpcSchemaBase<TCreateStreamActor, NKikimr::NGRpcService::TEvDataStreamsCreateStreamRequest> { using TBase = TPQGrpcSchemaBase<TCreateStreamActor, TEvDataStreamsCreateStreamRequest>; using TProtoRequest = typename TBase::TProtoRequest; @@ -138,13 +139,13 @@ namespace NKikimr::NDataStreams::V1 { topicRequest.set_metering_mode(Ydb::Topic::METERING_MODE_REQUEST_UNITS); break; default: - return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, Ydb::PersQueue::ErrorCode::BAD_REQUEST, + return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, static_cast<size_t>(NYds::EErrorCodes::INVALID_ARGUMENT), "streams can't be created with unknown metering mode", ctx); } } } else { if (GetProtoRequest()->has_stream_mode_details()) { - return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, Ydb::PersQueue::ErrorCode::BAD_REQUEST, + return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, static_cast<size_t>(NYds::EErrorCodes::INVALID_ARGUMENT), "streams can't be created with metering mode", ctx); } } @@ -160,11 +161,10 @@ namespace NKikimr::NDataStreams::V1 { pqDescr->SetPartitionPerTablet(1); TString error; - auto status = NKikimr::NGRpcProxy::V1::FillProposeRequestImpl(name, topicRequest, modifyScheme, ctx, error, + TYdbPqCodes codes = NKikimr::NGRpcProxy::V1::FillProposeRequestImpl(name, topicRequest, modifyScheme, ctx, error, workingDir, proposal.Record.GetDatabaseName()); - - if (status != Ydb::StatusIds::SUCCESS) { - return ReplyWithError(status, Ydb::PersQueue::ErrorCode::BAD_REQUEST, error, ctx); + if (codes.YdbCode != Ydb::StatusIds::SUCCESS) { + return ReplyWithError(codes.YdbCode, codes.PQCode, error, ctx); } } @@ -175,7 +175,7 @@ namespace NKikimr::NDataStreams::V1 { && msg->Record.GetSchemeShardStatus() == NKikimrScheme::EStatus::StatusAlreadyExists) { return ReplyWithError(Ydb::StatusIds::ALREADY_EXISTS, - Ydb::PersQueue::ErrorCode::ERROR, + static_cast<size_t>(NYds::EErrorCodes::IN_USE), TStringBuilder() << "Stream with name " << GetProtoRequest()->stream_name() << " is already exists", ctx); } @@ -245,7 +245,7 @@ namespace NKikimr::NDataStreams::V1 { const auto& readRules = pqGroupDescription.GetPQTabletConfig().GetReadRules(); if (readRules.size() > 0 && EnforceDeletion == false) { - return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, Ydb::PersQueue::ErrorCode::ERROR, + return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, static_cast<size_t>(NYds::EErrorCodes::IN_USE), TStringBuilder() << "Stream has registered consumers" << "and EnforceConsumerDeletion flag is false", ctx); } @@ -286,7 +286,7 @@ namespace NKikimr::NDataStreams::V1 { TString error; if (!ValidateShardsCount(*GetProtoRequest(), pqGroupDescription, error)) { - return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, Ydb::PersQueue::ErrorCode::BAD_REQUEST, error, ctx); + return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, static_cast<size_t>(NYds::EErrorCodes::INVALID_ARGUMENT), error, ctx); } groupConfig.SetTotalGroupCount(GetProtoRequest()->target_shard_count()); @@ -325,7 +325,7 @@ namespace NKikimr::NDataStreams::V1 { Y_UNUSED(selfInfo); Y_UNUSED(pqGroupDescription); if (!AppData(ctx)->PQConfig.GetBillingMeteringConfig().GetEnabled()) { - return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, Ydb::PersQueue::ErrorCode::BAD_REQUEST, + return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, static_cast<size_t>(NYds::EErrorCodes::INVALID_ARGUMENT), "streams can't be created with metering mode", ctx); } @@ -337,7 +337,7 @@ namespace NKikimr::NDataStreams::V1 { groupConfig.MutablePQTabletConfig()->SetMeteringMode(NKikimrPQ::TPQTabletConfig::METERING_MODE_REQUEST_UNITS); break; default: - return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, Ydb::PersQueue::ErrorCode::BAD_REQUEST, + return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, static_cast<size_t>(NYds::EErrorCodes::INVALID_ARGUMENT), "streams can't be created with unknown metering mode", ctx); } } @@ -379,7 +379,7 @@ namespace NKikimr::NDataStreams::V1 { TString error; if (!ValidateShardsCount(*GetProtoRequest(), pqGroupDescription, error)) { - return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, Ydb::PersQueue::ErrorCode::BAD_REQUEST, error, ctx); + return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, static_cast<size_t>(NYds::EErrorCodes::BAD_REQUEST), error, ctx); } groupConfig.SetTotalGroupCount(GetProtoRequest()->target_shard_count()); @@ -406,7 +406,7 @@ namespace NKikimr::NDataStreams::V1 { if (GetProtoRequest()->has_stream_mode_details()) { if (!AppData(ctx)->PQConfig.GetBillingMeteringConfig().GetEnabled()) { - return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, Ydb::PersQueue::ErrorCode::BAD_REQUEST, + return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, static_cast<size_t>(NYds::EErrorCodes::INVALID_ARGUMENT), "streams can't be created with metering mode", ctx); } @@ -418,15 +418,17 @@ namespace NKikimr::NDataStreams::V1 { groupConfig.MutablePQTabletConfig()->SetMeteringMode(NKikimrPQ::TPQTabletConfig::METERING_MODE_REQUEST_UNITS); break; default: - return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, Ydb::PersQueue::ErrorCode::BAD_REQUEST, + return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, static_cast<size_t>(NYds::EErrorCodes::INVALID_ARGUMENT), "streams can't be created with unknown metering mode", ctx); } } auto serviceTypes = GetSupportedClientServiceTypes(ctx); - auto status = CheckConfig(*pqConfig, serviceTypes, error, ctx); + auto status = CheckConfig(*pqConfig, serviceTypes, error, ctx, Ydb::StatusIds::ALREADY_EXISTS); if (status != Ydb::StatusIds::SUCCESS) { - return ReplyWithError(status, Ydb::PersQueue::ErrorCode::BAD_REQUEST, error, ctx); + return ReplyWithError(status, status == Ydb::StatusIds::ALREADY_EXISTS ? static_cast<size_t>(NYds::EErrorCodes::IN_USE) : + static_cast<size_t>(NYds::EErrorCodes::VALIDATION_ERROR), + error, ctx); } } @@ -472,9 +474,11 @@ namespace NKikimr::NDataStreams::V1 { pqConfig->MutablePartitionConfig()->SetBurstSize(GetProtoRequest()->write_quota_kb_per_sec() * 1_KB); auto serviceTypes = GetSupportedClientServiceTypes(ctx); - auto status = CheckConfig(*pqConfig, serviceTypes, error, ctx); + auto status = CheckConfig(*pqConfig, serviceTypes, error, ctx, Ydb::StatusIds::ALREADY_EXISTS); if (status != Ydb::StatusIds::SUCCESS) { - return ReplyWithError(status, Ydb::PersQueue::ErrorCode::BAD_REQUEST, error, ctx); + return ReplyWithError(status, status == Ydb::StatusIds::ALREADY_EXISTS? static_cast<size_t>(NYds::EErrorCodes::IN_USE) : + static_cast<size_t>(NYds::EErrorCodes::VALIDATION_ERROR), + error, ctx); } } @@ -532,11 +536,12 @@ namespace NKikimr::NDataStreams::V1 { pqConfig->MutablePartitionConfig()->SetLifetimeSeconds(newLifetime); auto serviceTypes = GetSupportedClientServiceTypes(ctx); - status = CheckConfig(*pqConfig, serviceTypes, error, ctx); + status = CheckConfig(*pqConfig, serviceTypes, error, ctx, Ydb::StatusIds::ALREADY_EXISTS); } if (status != Ydb::StatusIds::SUCCESS) { - return TBase::ReplyWithError(status, Ydb::PersQueue::ErrorCode::BAD_REQUEST, error, ctx); + return TBase::ReplyWithError(status, status == Ydb::StatusIds::ALREADY_EXISTS ? static_cast<size_t>(NYds::EErrorCodes::IN_USE) : + static_cast<size_t>(NYds::EErrorCodes::VALIDATION_ERROR), error, ctx); } } @@ -563,13 +568,13 @@ namespace NKikimr::NDataStreams::V1 { void Handle(TEvTabletPipe::TEvClientConnected::TPtr& ev, const TActorContext& ctx) { if (ev->Get()->Status != NKikimrProto::EReplyStatus::OK) { - ReplyWithError(Ydb::StatusIds::INTERNAL_ERROR, Ydb::PersQueue::ErrorCode::ERROR, + ReplyWithError(Ydb::StatusIds::INTERNAL_ERROR, static_cast<size_t>(NYds::EErrorCodes::ERROR), TStringBuilder() << "Cannot connect to tablet " << ev->Get()->TabletId, ctx); } } void Handle(TEvTabletPipe::TEvClientDestroyed::TPtr& ev, const TActorContext& ctx) { - ReplyWithError(Ydb::StatusIds::INTERNAL_ERROR, Ydb::PersQueue::ErrorCode::ERROR, + ReplyWithError(Ydb::StatusIds::INTERNAL_ERROR, static_cast<size_t>(NYds::EErrorCodes::ERROR), TStringBuilder() << "Cannot connect to tablet " << ev->Get()->TabletId, ctx); } @@ -667,6 +672,7 @@ namespace NKikimr::NDataStreams::V1 { ui32 writeSpeed = pqConfig.GetPartitionConfig().GetWriteSpeedInBytesPerSecond() / 1_KB; auto& description = *result.mutable_stream_description(); description.set_stream_name(GetProtoRequest()->stream_name()); + description.set_stream_arn(GetProtoRequest()->stream_name()); // Added by lpetrov02 for testing ui32 retentionPeriodHours = TInstant::Seconds(pqConfig.GetPartitionConfig().GetLifetimeSeconds()).Hours(); description.set_retention_period_hours(retentionPeriodHours); description.set_write_quota_kb_per_sec(writeSpeed); @@ -727,7 +733,6 @@ namespace NKikimr::NDataStreams::V1 { //----------------------------------------------------------------------------------- - class TListStreamsActor : public TRpcSchemeRequestActor<TListStreamsActor, NKikimr::NGRpcService::TEvDataStreamsListStreamsRequest> { using TBase = TRpcSchemeRequestActor<TListStreamsActor, TEvDataStreamsListStreamsRequest>; @@ -745,9 +750,10 @@ namespace NKikimr::NDataStreams::V1 { void SendPendingRequests(const TActorContext& ctx); void SendResponse(const TActorContext& ctx); - void ReplyWithError(Ydb::StatusIds::StatusCode status, Ydb::PersQueue::ErrorCode::ErrorCode pqStatus, + void ReplyWithError(Ydb::StatusIds::StatusCode status, NYds::EErrorCodes errorCode, const TString& messageText, const NActors::TActorContext& ctx) { - this->Request_->RaiseIssue(FillIssue(messageText, pqStatus)); + + this->Request_->RaiseIssue(FillIssue(messageText, static_cast<size_t>(errorCode))); this->Request_->ReplyWithYdbStatus(status); this->Die(ctx); } @@ -768,13 +774,13 @@ namespace NKikimr::NDataStreams::V1 { void TListStreamsActor::Bootstrap(const NActors::TActorContext& ctx) { TBase::Bootstrap(ctx); if (!Request_->GetDatabaseName()) { - return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, Ydb::PersQueue::ErrorCode::BAD_REQUEST, + return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, NYds::EErrorCodes::INVALID_ARGUMENT, "Request without dabase is forbiden", ctx); } if (this->Request_->GetInternalToken().empty()) { if (AppData(ctx)->PQConfig.GetRequireCredentialsInNewProtocol()) { - return ReplyWithError(Ydb::StatusIds::UNAUTHORIZED, Ydb::PersQueue::ErrorCode::ACCESS_DENIED, + return ReplyWithError(Ydb::StatusIds::UNAUTHORIZED, NYds::EErrorCodes::BAD_REQUEST, "Unauthenticated access is forbidden, please provide credentials", ctx); } } @@ -803,6 +809,7 @@ namespace NKikimr::NDataStreams::V1 { int limit = GetProtoRequest()->limit() == 0 ? 100 : GetProtoRequest()->limit(); if (limit > 10000) { + Request_->RaiseIssue(FillIssue("'Limit' shoud not be higher than 10000", static_cast<size_t>(NYds::EErrorCodes::VALIDATION_ERROR))); Request_->ReplyWithYdbStatus(Ydb::StatusIds::BAD_REQUEST); return Die(ctx); } @@ -935,18 +942,26 @@ namespace NKikimr::NDataStreams::V1 { void TListStreamConsumersActor::Bootstrap(const NActors::TActorContext& ctx) { TBase::Bootstrap(ctx); + if (!GetProtoRequest()->next_token().empty() && !GetProtoRequest()->stream_arn().empty()) { + return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, static_cast<size_t>(NYds::EErrorCodes::INVALID_PARAMETER_COMBINATION), + TStringBuilder() << "StreamArn and NextToken can not be provided together", ctx); + } + if (NextToken.IsExpired()) { + return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, static_cast<size_t>(NYds::EErrorCodes::EXPIRED_TOKEN), + TStringBuilder() << "Provided NextToken is expired", ctx); + } + if (!NextToken.IsValid()) { + return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, static_cast<size_t>(NYds::EErrorCodes::INVALID_ARGUMENT), + TStringBuilder() << "Provided NextToken is malformed", ctx); + } + auto maxResultsInRange = MIN_MAX_RESULTS <= MaxResults && MaxResults <= MAX_MAX_RESULTS; if (!maxResultsInRange) { - return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, Ydb::PersQueue::ErrorCode::ERROR, + return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, static_cast<size_t>(NYds::EErrorCodes::VALIDATION_ERROR), TStringBuilder() << "Requested max_result value '" << MaxResults << "' is out of range [" << MIN_MAX_RESULTS << ", " << MAX_MAX_RESULTS << "]", ctx); } - - if (!NextToken.IsValid()) { - return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, Ydb::PersQueue::ErrorCode::ERROR, - TStringBuilder() << "Provided NextToken has expired or malformed", ctx); - } SendDescribeProposeRequest(ctx); Become(&TListStreamConsumersActor::StateWork); } @@ -974,7 +989,7 @@ namespace NKikimr::NDataStreams::V1 { const auto alreadyRead = NextToken.GetAlreadyRead(); if (alreadyRead > (ui32)streamReadRulesNames.size()) { - return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, Ydb::PersQueue::ErrorCode::ERROR, + return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, static_cast<size_t>(NYds::EErrorCodes::INVALID_ARGUMENT), TStringBuilder() << "Provided next_token is malformed - " << "everything is already read", ctx); } @@ -1065,15 +1080,22 @@ namespace NKikimr::NDataStreams::V1 { readRule.set_version(selfInfo.GetVersion().GetPQVersion()); } auto serviceTypes = GetSupportedClientServiceTypes(ctx); - TString error = AddReadRuleToConfig(pqConfig, readRule, serviceTypes, ctx); - auto status = error.empty() ? CheckConfig(*pqConfig, serviceTypes, error, ctx, Ydb::StatusIds::ALREADY_EXISTS) - : Ydb::StatusIds::BAD_REQUEST; + auto messageAndCode = AddReadRuleToConfig(pqConfig, readRule, serviceTypes, ctx); + size_t issueCode = static_cast<size_t>(messageAndCode.PQCode); + + Ydb::StatusIds::StatusCode status; + if (messageAndCode.PQCode == Ydb::PersQueue::ErrorCode::OK) { + status = CheckConfig(*pqConfig, serviceTypes, messageAndCode.Message, ctx, Ydb::StatusIds::ALREADY_EXISTS); + if (status == Ydb::StatusIds::ALREADY_EXISTS) { + issueCode = static_cast<size_t>(NYds::EErrorCodes::IN_USE); + } + } else { + status = Ydb::StatusIds::BAD_REQUEST; + } + if (status != Ydb::StatusIds::SUCCESS) { - return ReplyWithError(status, - status == Ydb::StatusIds::ALREADY_EXISTS ? Ydb::PersQueue::ErrorCode::OK - : Ydb::PersQueue::ErrorCode::BAD_REQUEST, - error, ctx); + return ReplyWithError(status, issueCode, messageAndCode.Message, ctx); } } @@ -1136,7 +1158,7 @@ namespace NKikimr::NDataStreams::V1 { ctx ); if (!error.Empty()) { - return ReplyWithError(Ydb::StatusIds::NOT_FOUND, Ydb::PersQueue::ErrorCode::BAD_REQUEST, error, ctx); + return ReplyWithError(Ydb::StatusIds::NOT_FOUND, static_cast<size_t>(NYds::EErrorCodes::NOT_FOUND), error, ctx); } } @@ -1188,7 +1210,7 @@ namespace NKikimr::NDataStreams::V1 { case TIteratorType::AT_SEQUENCE_NUMBER: { auto sn = SequenceNumberToInt(GetProtoRequest()->starting_sequence_number()); if (!sn) { - return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, Ydb::PersQueue::ErrorCode::BAD_REQUEST, + return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, static_cast<size_t>(NYds::EErrorCodes::INVALID_ARGUMENT), TStringBuilder() << "Malformed sequence number", ctx); } SequenceNumber = sn.value() + (IteratorType == TIteratorType::AFTER_SEQUENCE_NUMBER ? 1u : 0u); @@ -1197,7 +1219,7 @@ namespace NKikimr::NDataStreams::V1 { case TIteratorType::AT_TIMESTAMP: if (GetProtoRequest()->timestamp() == 0 || GetProtoRequest()->timestamp() > static_cast<i64>(TInstant::Now().MilliSeconds())) { - return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, Ydb::PersQueue::ErrorCode::BAD_REQUEST, + return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, static_cast<size_t>(NYds::EErrorCodes::INVALID_ARGUMENT), TStringBuilder() << "Shard iterator type is AT_TIMESTAMP, " << "but timestamp is either missed or too old or in future", ctx); } @@ -1210,7 +1232,7 @@ namespace NKikimr::NDataStreams::V1 { ReadTimestampMs = TInstant::Now().MilliSeconds(); break; default: - return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, Ydb::PersQueue::ErrorCode::BAD_REQUEST, + return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, static_cast<size_t>(NYds::EErrorCodes::INVALID_ARGUMENT), TStringBuilder() << "Shard iterator type '" << (ui32)IteratorType << "' is not known", ctx); @@ -1240,7 +1262,7 @@ namespace NKikimr::NDataStreams::V1 { if (!topicInfo->SecurityObject->CheckAccess(NACLib::EAccessRights::SelectRow, token)) { return this->ReplyWithError(Ydb::StatusIds::UNAUTHORIZED, - Ydb::PersQueue::ErrorCode::ACCESS_DENIED, + static_cast<size_t>(NYds::EErrorCodes::ACCESS_DENIED), TStringBuilder() << "Access to stream " << this->GetProtoRequest()->stream_name() << " is denied for subject " @@ -1262,7 +1284,7 @@ namespace NKikimr::NDataStreams::V1 { } } - ReplyWithError(Ydb::StatusIds::BAD_REQUEST, Ydb::PersQueue::ErrorCode::ERROR, + ReplyWithError(Ydb::StatusIds::BAD_REQUEST, static_cast<size_t>(NYds::EErrorCodes::NOT_FOUND), TStringBuilder() << "No such shard: " << ShardId, ctx); } @@ -1335,14 +1357,18 @@ namespace NKikimr::NDataStreams::V1 { void TGetRecordsActor::Bootstrap(const NActors::TActorContext& ctx) { TBase::Bootstrap(ctx); + if (ShardIterator.IsExpired()) { + return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, static_cast<size_t>(NYds::EErrorCodes::EXPIRED_ITERATOR), + TStringBuilder() << "Provided shard iterator is expired", ctx); + } if (!ShardIterator.IsValid()) { - return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, Ydb::PersQueue::ErrorCode::ERROR, - TStringBuilder() << "Provided shard iterator is malformed or expired", ctx); + return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, static_cast<size_t>(NYds::EErrorCodes::INVALID_ARGUMENT), + TStringBuilder() << "Provided shard iterator is malformed", ctx); } Limit = Limit == 0 ? MAX_LIMIT : Limit; if (Limit < 1 || Limit > MAX_LIMIT) { - return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, Ydb::PersQueue::ErrorCode::ERROR, + return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, static_cast<size_t>(NYds::EErrorCodes::VALIDATION_ERROR), TStringBuilder() << "Limit '" << Limit << "' is out of bounds [1; " << MAX_LIMIT << "]", ctx); } @@ -1402,7 +1428,7 @@ namespace NKikimr::NDataStreams::V1 { if (!response.SecurityObject->CheckAccess(NACLib::EAccessRights::SelectRow, token)) { return ReplyWithError(Ydb::StatusIds::UNAUTHORIZED, - Ydb::PersQueue::ErrorCode::ACCESS_DENIED, + static_cast<size_t>(NYds::EErrorCodes::ACCESS_DENIED), TStringBuilder() << "Access to stream " << ShardIterator.GetStreamName() << " is denied for subject " @@ -1423,7 +1449,7 @@ namespace NKikimr::NDataStreams::V1 { } } - ReplyWithError(Ydb::StatusIds::BAD_REQUEST, Ydb::PersQueue::ErrorCode::ERROR, + ReplyWithError(Ydb::StatusIds::BAD_REQUEST, static_cast<size_t>(NYds::EErrorCodes::NOT_FOUND), TStringBuilder() << "No such shard: " << ShardIterator.GetShardId(), ctx); } @@ -1445,7 +1471,7 @@ namespace NKikimr::NDataStreams::V1 { return; default: return ReplyWithError(ConvertPersQueueInternalCodeToStatus(record.GetErrorCode()), - Ydb::PersQueue::ErrorCode::ERROR, + static_cast<size_t>(NYds::EErrorCodes::ERROR), record.GetErrorReason(), ctx); } break; @@ -1490,13 +1516,13 @@ namespace NKikimr::NDataStreams::V1 { void TGetRecordsActor::Handle(TEvTabletPipe::TEvClientConnected::TPtr& ev, const TActorContext& ctx) { if (ev->Get()->Status != NKikimrProto::EReplyStatus::OK) { - ReplyWithError(Ydb::StatusIds::INTERNAL_ERROR, Ydb::PersQueue::ErrorCode::ERROR, + ReplyWithError(Ydb::StatusIds::INTERNAL_ERROR, static_cast<size_t>(NYds::EErrorCodes::ERROR), TStringBuilder() << "Cannot connect to tablet " << ev->Get()->TabletId, ctx); } } void TGetRecordsActor::Handle(TEvTabletPipe::TEvClientDestroyed::TPtr& ev, const TActorContext& ctx) { - ReplyWithError(Ydb::StatusIds::INTERNAL_ERROR, Ydb::PersQueue::ErrorCode::ERROR, + ReplyWithError(Ydb::StatusIds::INTERNAL_ERROR, static_cast<size_t>(NYds::EErrorCodes::ERROR), TStringBuilder() << "Cannot connect to tablet " << ev->Get()->TabletId, ctx); } @@ -1594,22 +1620,35 @@ namespace NKikimr::NDataStreams::V1 { void TListShardsActor::Bootstrap(const NActors::TActorContext& ctx) { TBase::Bootstrap(ctx); + if (!GetProtoRequest()->next_token().empty() && !GetProtoRequest()->stream_name().empty()) { + return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, static_cast<size_t>(NYds::EErrorCodes::INVALID_PARAMETER_COMBINATION), + TStringBuilder() << "StreamName and NextToken can not be provided together", ctx); + } + if (NextToken.IsExpired()) { + return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, static_cast<size_t>(NYds::EErrorCodes::EXPIRED_TOKEN), + TStringBuilder() << "Provided next token is expired", ctx); + } + if (!NextToken.IsValid()) { + return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, static_cast<size_t>(NYds::EErrorCodes::INVALID_ARGUMENT), + TStringBuilder() << "Provided next token is malformed", ctx); + } + if (!TShardFilter::ShardFilterType_IsValid(ShardFilter.type())) { - return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, Ydb::PersQueue::ErrorCode::BAD_REQUEST, + return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, static_cast<size_t>(NYds::EErrorCodes::INVALID_ARGUMENT), TStringBuilder() << "Shard filter '" << (ui32)ShardFilter.type() << "' is not known", ctx); } MaxResults = MaxResults == 0 ? DEFAULT_MAX_RESULTS : MaxResults; if (MaxResults > MAX_MAX_RESULTS) { - return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, Ydb::PersQueue::ErrorCode::BAD_REQUEST, + return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, static_cast<size_t>(NYds::EErrorCodes::VALIDATION_ERROR), TStringBuilder() << "Max results '" << MaxResults << "' is out of bound [" << MIN_MAX_RESULTS << "; " << MAX_MAX_RESULTS << "]", ctx); } if (ShardFilter.type() == TShardFilter::AFTER_SHARD_ID && ShardFilter.shard_id() == "") { - return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, Ydb::PersQueue::ErrorCode::BAD_REQUEST, + return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, static_cast<size_t>(NYds::EErrorCodes::MISSING_PARAMETER), TStringBuilder() << "Shard filter type is AFTER_SHARD_ID," << " but no ShardId provided", ctx); } @@ -1640,7 +1679,7 @@ namespace NKikimr::NDataStreams::V1 { if (!topicInfo.SecurityObject->CheckAccess(NACLib::EAccessRights::SelectRow, token)) { return this->ReplyWithError(Ydb::StatusIds::UNAUTHORIZED, - Ydb::PersQueue::ErrorCode::ACCESS_DENIED, + static_cast<size_t>(NYds::EErrorCodes::ACCESS_DENIED), TStringBuilder() << "Access to stream " << this->GetProtoRequest()->stream_name() << " is denied for subject " @@ -1699,7 +1738,7 @@ namespace NKikimr::NDataStreams::V1 { const auto alreadyRead = NextToken.GetAlreadyRead(); if (alreadyRead > (ui32)partitions.size()) { - return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, Ydb::PersQueue::ErrorCode::ERROR, + return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, static_cast<size_t>(NYds::EErrorCodes::INVALID_ARGUMENT), TStringBuilder() << "Provided next_token is malformed - " "everything is already read", ctx); } @@ -1759,13 +1798,13 @@ namespace NKikimr::NDataStreams::V1 { void TListShardsActor::Handle(TEvTabletPipe::TEvClientConnected::TPtr& ev, const TActorContext& ctx) { if (ev->Get()->Status != NKikimrProto::EReplyStatus::OK) { - ReplyWithError(Ydb::StatusIds::INTERNAL_ERROR, Ydb::PersQueue::ErrorCode::ERROR, + ReplyWithError(Ydb::StatusIds::INTERNAL_ERROR, static_cast<size_t>(NYds::EErrorCodes::ERROR), TStringBuilder() << "Cannot connect to tablet " << ev->Get()->TabletId, ctx); } } void TListShardsActor::Handle(TEvTabletPipe::TEvClientDestroyed::TPtr& ev, const TActorContext& ctx) { - ReplyWithError(Ydb::StatusIds::INTERNAL_ERROR, Ydb::PersQueue::ErrorCode::ERROR, + ReplyWithError(Ydb::StatusIds::INTERNAL_ERROR, static_cast<size_t>(NYds::EErrorCodes::ERROR), TStringBuilder() << "Cannot connect to tablet " << ev->Get()->TabletId, ctx); } @@ -1906,7 +1945,7 @@ namespace NKikimr::NDataStreams::V1 { void Bootstrap(const NActors::TActorContext& ctx) { TBase::Bootstrap(ctx); - this->Request_->RaiseIssue(FillIssue("Method is not implemented yet", Ydb::PersQueue::ErrorCode::ErrorCode::ERROR)); + this->Request_->RaiseIssue(FillIssue("Method is not implemented yet", static_cast<size_t>(NYds::EErrorCodes::ERROR))); this->Request_->ReplyWithYdbStatus(Ydb::StatusIds::UNSUPPORTED); this->Die(ctx); } diff --git a/ydb/services/datastreams/datastreams_proxy.h b/ydb/services/datastreams/datastreams_proxy.h index 7a3791a4d0d..a6d186cab2b 100644 --- a/ydb/services/datastreams/datastreams_proxy.h +++ b/ydb/services/datastreams/datastreams_proxy.h @@ -11,6 +11,7 @@ #include <library/cpp/actors/core/actor_bootstrapped.h> #include <library/cpp/actors/core/actorsystem.h> + namespace NKikimr { namespace NGRpcService { diff --git a/ydb/services/datastreams/datastreams_ut.cpp b/ydb/services/datastreams/datastreams_ut.cpp index 05be3683946..c7899e0750e 100644 --- a/ydb/services/datastreams/datastreams_ut.cpp +++ b/ydb/services/datastreams/datastreams_ut.cpp @@ -253,6 +253,7 @@ Y_UNIT_TEST_SUITE(DataStreams) { UNIT_ASSERT_VALUES_EQUAL(result.GetResult().stream_description().stream_status(), YDS_V1::StreamDescription::ACTIVE); UNIT_ASSERT_VALUES_EQUAL(result.GetResult().stream_description().stream_name(), streamName); + UNIT_ASSERT_VALUES_EQUAL(result.GetResult().stream_description().stream_arn(), streamName); UNIT_ASSERT_VALUES_EQUAL(result.GetResult().stream_description().write_quota_kb_per_sec(), 1_KB); UNIT_ASSERT_VALUES_EQUAL(result.GetResult().stream_description().retention_period_hours(), 24); @@ -1782,7 +1783,7 @@ Y_UNIT_TEST_SUITE(DataStreams) { UNIT_ASSERT_VALUES_UNEQUAL(result.GetResult().next_token().size(), 0); auto nextToken = result.GetResult().next_token(); - result = testServer.DataStreamsClient->ListStreamConsumers(streamName, + result = testServer.DataStreamsClient->ListStreamConsumers("", NYDS_V1::TListStreamConsumersSettings().NextToken(nextToken)).ExtractValueSync(); UNIT_ASSERT_VALUES_EQUAL(result.IsTransportError(), false); UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); @@ -1813,26 +1814,26 @@ Y_UNIT_TEST_SUITE(DataStreams) { }; auto nextToken = makeNextToken(TInstant::Now().MilliSeconds() - 300001); - result = testServer.DataStreamsClient->ListStreamConsumers(streamName, + result = testServer.DataStreamsClient->ListStreamConsumers("", NYDS_V1::TListStreamConsumersSettings().NextToken(nextToken)).ExtractValueSync(); UNIT_ASSERT_VALUES_EQUAL(result.IsTransportError(), false); UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::BAD_REQUEST); nextToken = makeNextToken(TInstant::Now().MilliSeconds() + 1000); - result = testServer.DataStreamsClient->ListStreamConsumers(streamName, + result = testServer.DataStreamsClient->ListStreamConsumers("", NYDS_V1::TListStreamConsumersSettings().NextToken(nextToken)).ExtractValueSync(); UNIT_ASSERT_VALUES_EQUAL(result.IsTransportError(), false); UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::BAD_REQUEST); nextToken = makeNextToken(0); - result = testServer.DataStreamsClient->ListStreamConsumers(streamName, + result = testServer.DataStreamsClient->ListStreamConsumers("", NYDS_V1::TListStreamConsumersSettings().NextToken(nextToken)).ExtractValueSync(); UNIT_ASSERT_VALUES_EQUAL(result.IsTransportError(), false); UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::BAD_REQUEST); - result = testServer.DataStreamsClient->ListStreamConsumers(streamName, + result = testServer.DataStreamsClient->ListStreamConsumers("", NYDS_V1::TListStreamConsumersSettings().NextToken("some_garbage")).ExtractValueSync(); - result = testServer.DataStreamsClient->ListStreamConsumers(streamName, + result = testServer.DataStreamsClient->ListStreamConsumers("", NYDS_V1::TListStreamConsumersSettings().NextToken("some_garbage")).ExtractValueSync(); UNIT_ASSERT_VALUES_EQUAL(result.IsTransportError(), false); UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::BAD_REQUEST); @@ -2511,7 +2512,8 @@ Y_UNIT_TEST_SUITE(DataStreams) { UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::BAD_REQUEST); } -/* { //TODO: datastreams api uses only one retention parameter + /* + { //TODO: datastreams api uses only one retention parameter auto result = testServer.DataStreamsClient->CreateStream(streamName, NYDS_V1::TCreateStreamSettings().ShardCount(shardCount) .RetentionStorageMegabytes(55_KB).RetentionPeriodHours(8 * 24)).ExtractValueSync(); @@ -2612,4 +2614,18 @@ Y_UNIT_TEST_SUITE(DataStreams) { UNIT_ASSERT_VALUES_EQUAL(result.GetResult().records().size(), 0); } } + + Y_UNIT_TEST(ListStreamsValidation) { + TInsecureDatastreamsTestServer testServer; + + { + auto result = testServer.DataStreamsClient->ListStreams( + NYdb::NDataStreams::V1::TListStreamsSettings().Limit(1000000000).Recurse(false) + ).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL(result.IsTransportError(), false); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::BAD_REQUEST, result.GetIssues().ToString()); + + UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::BAD_REQUEST); + } + } } diff --git a/ydb/services/lib/actors/pq_schema_actor.cpp b/ydb/services/lib/actors/pq_schema_actor.cpp index f080b3f99a2..9de74a9c836 100644 --- a/ydb/services/lib/actors/pq_schema_actor.cpp +++ b/ydb/services/lib/actors/pq_schema_actor.cpp @@ -11,6 +11,7 @@ #include <library/cpp/digest/md5/md5.h> + namespace NKikimr::NGRpcProxy::V1 { constexpr TStringBuf GRPCS_ENDPOINT_PREFIX = "grpcs://"; @@ -65,7 +66,7 @@ namespace NKikimr::NGRpcProxy::V1 { return ""; } - TString AddReadRuleToConfig( + TMsgPqCodes AddReadRuleToConfig( NKikimrPQ::TPQTabletConfig* config, const Ydb::PersQueue::V1::TopicSettings::ReadRule& rr, const TClientServiceTypes& supportedClientServiceTypes, @@ -73,66 +74,96 @@ namespace NKikimr::NGRpcProxy::V1 { ) { auto consumerName = NPersQueue::ConvertNewConsumerName(rr.consumer_name(), ctx); + if (consumerName.empty()) { + return TMsgPqCodes(TStringBuilder() << "consumer with empty name is forbidden", Ydb::PersQueue::ErrorCode::VALIDATION_ERROR); + } if(consumerName.find("/") != TString::npos || consumerName.find("|") != TString::npos) { - return TStringBuilder() << "consumer '" << rr.consumer_name() << "' has illegal symbols"; + return TMsgPqCodes( + TStringBuilder() << "consumer '" << rr.consumer_name() << "' has illegal symbols", + Ydb::PersQueue::ErrorCode::INVALID_ARGUMENT + ); } { TString migrationError = ReadRuleServiceTypeMigration(config, ctx); if (migrationError) { - return migrationError; + return TMsgPqCodes(migrationError, Ydb::PersQueue::ErrorCode::INVALID_ARGUMENT); } } config->AddReadRules(consumerName); if (rr.starting_message_timestamp_ms() < 0) { - return TStringBuilder() << "starting_message_timestamp_ms in read_rule can't be negative, provided " << rr.starting_message_timestamp_ms(); + return TMsgPqCodes( + TStringBuilder() << "starting_message_timestamp_ms in read_rule can't be negative, provided " << rr.starting_message_timestamp_ms(), + Ydb::PersQueue::ErrorCode::VALIDATION_ERROR + ); } config->AddReadFromTimestampsMs(rr.starting_message_timestamp_ms()); if (!Ydb::PersQueue::V1::TopicSettings::Format_IsValid((int)rr.supported_format()) || rr.supported_format() == 0) { - return TStringBuilder() << "Unknown format version with value " << (int)rr.supported_format() << " for " << rr.consumer_name(); + return TMsgPqCodes( + TStringBuilder() << "Unknown format version with value " << (int)rr.supported_format() << " for " << rr.consumer_name(), + Ydb::PersQueue::ErrorCode::INVALID_ARGUMENT + ); } config->AddConsumerFormatVersions(rr.supported_format() - 1); if (rr.version() < 0) { - return TStringBuilder() << "version in read_rule can't be negative, provided " << rr.version(); + return TMsgPqCodes( + TStringBuilder() << "version in read_rule can't be negative, provided " << rr.version(), + Ydb::PersQueue::ErrorCode::VALIDATION_ERROR + ); } config->AddReadRuleVersions(rr.version()); auto ct = config->AddConsumerCodecs(); if (rr.supported_codecs().size() > MAX_SUPPORTED_CODECS_COUNT) { - return TStringBuilder() << "supported_codecs count cannot be more than " - << MAX_SUPPORTED_CODECS_COUNT << ", provided " << rr.supported_codecs().size(); + return TMsgPqCodes( + TStringBuilder() << "supported_codecs count cannot be more than " + << MAX_SUPPORTED_CODECS_COUNT << ", provided " << rr.supported_codecs().size(), + Ydb::PersQueue::ErrorCode::VALIDATION_ERROR + ); } for (const auto& codec : rr.supported_codecs()) { if (!Ydb::PersQueue::V1::Codec_IsValid(codec) || codec == 0) - return TStringBuilder() << "Unknown codec with value " << codec << " for " << rr.consumer_name(); + return TMsgPqCodes( + TStringBuilder() << "Unknown codec with value " << codec << " for " << rr.consumer_name(), + Ydb::PersQueue::ErrorCode::INVALID_ARGUMENT + ); ct->AddIds(codec - 1); ct->AddCodecs(to_lower(Ydb::PersQueue::V1::Codec_Name((Ydb::PersQueue::V1::Codec)codec)).substr(6)); } if (rr.important()) { if (AppData(ctx)->PQConfig.GetTopicsAreFirstClassCitizen()) { - return TStringBuilder() << "important flag is forbiden for consumer " << rr.consumer_name(); + return TMsgPqCodes( + TStringBuilder() << "important flag is forbiden for consumer " << rr.consumer_name(), + Ydb::PersQueue::ErrorCode::VALIDATION_ERROR + ); } config->MutablePartitionConfig()->AddImportantClientId(consumerName); } if (!rr.service_type().empty()) { if (!supportedClientServiceTypes.contains(rr.service_type())) { - return TStringBuilder() << "Unknown read rule service type '" << rr.service_type() - << "' for consumer '" << rr.consumer_name() << "'"; + return TMsgPqCodes( + TStringBuilder() << "Unknown read rule service type '" << rr.service_type() + << "' for consumer '" << rr.consumer_name() << "'", + Ydb::PersQueue::ErrorCode::INVALID_ARGUMENT + ); } config->AddReadRuleServiceTypes(rr.service_type()); } else { const auto& pqConfig = AppData(ctx)->PQConfig; if (pqConfig.GetDisallowDefaultClientServiceType()) { - return TStringBuilder() << "service type cannot be empty for consumer '" << rr.consumer_name() << "'"; + return TMsgPqCodes( + TStringBuilder() << "service type cannot be empty for consumer '" << rr.consumer_name() << "'", + Ydb::PersQueue::ErrorCode::VALIDATION_ERROR + ); } const auto& defaultCientServiceType = pqConfig.GetDefaultClientServiceType().GetName(); config->AddReadRuleServiceTypes(defaultCientServiceType); } - return ""; + return TMsgPqCodes("", Ydb::PersQueue::ErrorCode::OK); } @@ -151,7 +182,7 @@ namespace NKikimr::NGRpcProxy::V1 { } } - TString AddReadRuleToConfig( + TMsgPqCodes AddReadRuleToConfig( NKikimrPQ::TPQTabletConfig* config, const Ydb::Topic::Consumer& rr, const TClientServiceTypes& supportedClientServiceTypes, @@ -160,22 +191,25 @@ namespace NKikimr::NGRpcProxy::V1 { ) { auto consumerName = NPersQueue::ConvertNewConsumerName(rr.name(), ctx); if (consumerName.find("/") != TString::npos || consumerName.find("|") != TString::npos) { - return TStringBuilder() << "consumer '" << rr.name() << "' has illegal symbols"; + return TMsgPqCodes(TStringBuilder() << "consumer '" << rr.name() << "' has illegal symbols", Ydb::PersQueue::ErrorCode::INVALID_ARGUMENT); } if (consumerName.empty()) { - return TStringBuilder() << "consumer with empty name is forbidden"; + return TMsgPqCodes(TStringBuilder() << "consumer with empty name is forbidden", Ydb::PersQueue::ErrorCode::VALIDATION_ERROR); } { TString migrationError = ReadRuleServiceTypeMigration(config, ctx); if (migrationError) { - return migrationError; + return TMsgPqCodes(migrationError, migrationError.empty() ? Ydb::PersQueue::ErrorCode::OK : Ydb::PersQueue::ErrorCode::VALIDATION_ERROR); //find better issueCode } } config->AddReadRules(consumerName); if (rr.read_from().seconds() < 0) { - return TStringBuilder() << "starting_message_timestamp_ms in read_rule can't be negative, provided " << rr.read_from().seconds(); + return TMsgPqCodes( + TStringBuilder() << "starting_message_timestamp_ms in read_rule can't be negative, provided " << rr.read_from().seconds(), + Ydb::PersQueue::ErrorCode::VALIDATION_ERROR + ); } config->AddReadFromTimestampsMs(rr.read_from().seconds() * 1000); @@ -196,13 +230,16 @@ namespace NKikimr::NGRpcProxy::V1 { if (!pair.second.empty()) version = FromString<ui32>(pair.second); } catch(...) { - return TStringBuilder() << "Attribute for consumer '" << rr.name() << "' _version is " << pair.second << ", which is not ui32"; + return TMsgPqCodes( + TStringBuilder() << "Attribute for consumer '" << rr.name() << "' _version is " << pair.second << ", which is not ui32", + Ydb::PersQueue::ErrorCode::VALIDATION_ERROR + ); } } else if (pair.first == "_service_type") { if (!pair.second.empty()) { if (!supportedClientServiceTypes.contains(pair.second)) { - return TStringBuilder() << "Unknown _service_type '" << pair.second - << "' for consumer '" << rr.name() << "'"; + return TMsgPqCodes(TStringBuilder() << "Unknown _service_type '" << pair.second + << "' for consumer '" << rr.name() << "'", Ydb::PersQueue::ErrorCode::INVALID_ARGUMENT); } serviceType = pair.second; } @@ -213,7 +250,7 @@ namespace NKikimr::NGRpcProxy::V1 { } } if (serviceType.empty()) { - return TStringBuilder() << "service type cannot be empty for consumer '" << rr.name() << "'"; + return TMsgPqCodes(TStringBuilder() << "service type cannot be empty for consumer '" << rr.name() << "'", Ydb::PersQueue::ErrorCode::VALIDATION_ERROR); } Y_VERIFY(supportedClientServiceTypes.find(serviceType) != supportedClientServiceTypes.end()); @@ -229,10 +266,10 @@ namespace NKikimr::NGRpcProxy::V1 { } if (!found) { if (hasPassword) { - return "incorrect client service type password"; + return TMsgPqCodes("incorrect client service type password", Ydb::PersQueue::ErrorCode::INVALID_ARGUMENT); } if (AppData(ctx)->PQConfig.GetForceClientServiceTypePasswordCheck()) { // no password and check is required - return "no client service type password provided"; + return TMsgPqCodes("no client service type password provided", Ydb::PersQueue::ErrorCode::VALIDATION_ERROR); } } } @@ -244,7 +281,10 @@ namespace NKikimr::NGRpcProxy::V1 { for(const auto& codec : rr.supported_codecs().codecs()) { if ((!Ydb::Topic::Codec_IsValid(codec) && codec < Ydb::Topic::CODEC_CUSTOM) || codec == 0) { - return TStringBuilder() << "Unknown codec for consumer '" << rr.name() << "' with value " << codec; + return TMsgPqCodes( + TStringBuilder() << "Unknown codec for consumer '" << rr.name() << "' with value " << codec, + Ydb::PersQueue::ErrorCode::INVALID_ARGUMENT + ); } ct->AddIds(codec - 1); ct->AddCodecs(Ydb::Topic::Codec_IsValid(codec) ? LegacySubstr(to_lower(Ydb::Topic::Codec_Name((Ydb::Topic::Codec)codec)), 6) : "CUSTOM"); @@ -252,12 +292,12 @@ namespace NKikimr::NGRpcProxy::V1 { if (rr.important()) { if (AppData(ctx)->PQConfig.GetTopicsAreFirstClassCitizen()) { - return TStringBuilder() << "important flag is forbiden for consumer " << rr.name(); + return TMsgPqCodes(TStringBuilder() << "important flag is forbiden for consumer " << rr.name(), Ydb::PersQueue::ErrorCode::INVALID_ARGUMENT); } config->MutablePartitionConfig()->AddImportantClientId(consumerName); } - return ""; + return TMsgPqCodes("", Ydb::PersQueue::ErrorCode::OK); } @@ -433,10 +473,15 @@ namespace NKikimr::NGRpcProxy::V1 { return error.empty() ? Ydb::StatusIds::SUCCESS : (hasDuplicates ? dubsStatus : Ydb::StatusIds::BAD_REQUEST); } - NYql::TIssue FillIssue(const TString &errorReason, const Ydb::PersQueue::ErrorCode::ErrorCode errorCode) { + NYql::TIssue FillIssue(const TString& errorReason, const Ydb::PersQueue::ErrorCode::ErrorCode errorCode) { NYql::TIssue res(NYql::TPosition(), errorReason); res.SetCode(errorCode, NYql::ESeverity::TSeverityIds_ESeverityId_S_ERROR); + return res; + } + NYql::TIssue FillIssue(const TString& errorReason, const size_t errorCode) { + NYql::TIssue res(NYql::TPosition(), errorReason); + res.SetCode(errorCode, NYql::ESeverity::TSeverityIds_ESeverityId_S_ERROR); return res; } @@ -752,8 +797,9 @@ namespace NKikimr::NGRpcProxy::V1 { } const auto& supportedClientServiceTypes = GetSupportedClientServiceTypes(ctx); for (const auto& rr : settings.read_rules()) { - error = AddReadRuleToConfig(config, rr, supportedClientServiceTypes, ctx); - if (!error.Empty()) { + auto messageAndCode = AddReadRuleToConfig(config, rr, supportedClientServiceTypes, ctx); + if (messageAndCode.PQCode != Ydb::PersQueue::ErrorCode::OK) { + error = messageAndCode.Message; return Ydb::StatusIds::BAD_REQUEST; } } @@ -879,7 +925,7 @@ namespace NKikimr::NGRpcProxy::V1 { return true; } - Ydb::StatusIds::StatusCode FillProposeRequestImpl( + TYdbPqCodes FillProposeRequestImpl( const TString& name, const Ydb::Topic::CreateTopicRequest& request, NKikimrSchemeOp::TModifyScheme& modifyScheme, const TActorContext& ctx, TString& error, const TString& path, const TString& database, const TString& localDc @@ -894,7 +940,7 @@ namespace NKikimr::NGRpcProxy::V1 { if (request.has_partitioning_settings()) { if (request.partitioning_settings().min_active_partitions() < 0) { error = TStringBuilder() << "Partitions count must be positive, provided " << request.partitioning_settings().min_active_partitions(); - return Ydb::StatusIds::BAD_REQUEST; + return TYdbPqCodes(Ydb::StatusIds::BAD_REQUEST, Ydb::PersQueue::ErrorCode::VALIDATION_ERROR); } parts = request.partitioning_settings().min_active_partitions(); if (parts == 0) parts = 1; @@ -916,7 +962,7 @@ namespace NKikimr::NGRpcProxy::V1 { auto res = ProcessAttributes(request.attributes(), pqDescr, error, false); if (res != Ydb::StatusIds::SUCCESS) { - return res; + return TYdbPqCodes(res, Ydb::PersQueue::ErrorCode::VALIDATION_ERROR); } bool local = true; // TODO: check here cluster; @@ -930,7 +976,7 @@ namespace NKikimr::NGRpcProxy::V1 { if (!converter->IsValid()) { error = TStringBuilder() << "Bad topic: " << converter->GetReason(); - return Ydb::StatusIds::BAD_REQUEST; + return TYdbPqCodes(Ydb::StatusIds::BAD_REQUEST, Ydb::PersQueue::ErrorCode::INVALID_ARGUMENT); } config->SetLocalDC(local); config->SetDC(converter->GetCluster()); @@ -951,6 +997,11 @@ namespace NKikimr::NGRpcProxy::V1 { partConfig->MutableExplicitChannelProfiles()->CopyFrom(channelProfiles); } if (request.has_retention_period()) { + if (request.retention_period().seconds() <= 0) { + error = TStringBuilder() << "retention_period must be not negative, provided " << + request.retention_period().DebugString(); + return TYdbPqCodes(Ydb::StatusIds::BAD_REQUEST, Ydb::PersQueue::ErrorCode::VALIDATION_ERROR); + } partConfig->SetLifetimeSeconds(request.retention_period().seconds()); } else { partConfig->SetLifetimeSeconds(TDuration::Days(1).Seconds()); @@ -979,7 +1030,7 @@ namespace NKikimr::NGRpcProxy::V1 { for(const auto& codec : request.supported_codecs().codecs()) { if ((!Ydb::Topic::Codec_IsValid(codec) && codec < Ydb::Topic::CODEC_CUSTOM) || codec == 0) { error = TStringBuilder() << "Unknown codec with value " << codec; - return Ydb::StatusIds::BAD_REQUEST; + return TYdbPqCodes(Ydb::StatusIds::BAD_REQUEST, Ydb::PersQueue::ErrorCode::INVALID_ARGUMENT); } ct->AddIds(codec - 1); ct->AddCodecs(Ydb::Topic::Codec_IsValid(codec) ? LegacySubstr(to_lower(Ydb::Topic::Codec_Name((Ydb::Topic::Codec)codec)), 6) : "CUSTOM"); @@ -988,32 +1039,33 @@ namespace NKikimr::NGRpcProxy::V1 { if (request.consumers_size() > MAX_READ_RULES_COUNT) { error = TStringBuilder() << "consumers count cannot be more than " << MAX_READ_RULES_COUNT << ", provided " << request.consumers_size(); - return Ydb::StatusIds::BAD_REQUEST; + return TYdbPqCodes(Ydb::StatusIds::BAD_REQUEST, Ydb::PersQueue::ErrorCode::VALIDATION_ERROR); } { error = ReadRuleServiceTypeMigration(config, ctx); if (error) { - return Ydb::StatusIds::INTERNAL_ERROR; + return TYdbPqCodes(Ydb::StatusIds::INTERNAL_ERROR, Ydb::PersQueue::ErrorCode::INVALID_ARGUMENT); } } Ydb::StatusIds::StatusCode code; if (!FillMeteringMode(request.metering_mode(), *config, pqConfig.GetBillingMeteringConfig().GetEnabled(), false, code, error)) { - return code; + return TYdbPqCodes(code, Ydb::PersQueue::ErrorCode::INVALID_ARGUMENT); } const auto& supportedClientServiceTypes = GetSupportedClientServiceTypes(ctx); for (const auto& rr : request.consumers()) { - error = AddReadRuleToConfig(config, rr, supportedClientServiceTypes, true, ctx); - if (!error.Empty()) { - return Ydb::StatusIds::BAD_REQUEST; + auto messageAndCode = AddReadRuleToConfig(config, rr, supportedClientServiceTypes, true, ctx); + if (messageAndCode.PQCode != Ydb::PersQueue::ErrorCode::OK) { + error = messageAndCode.Message; + return TYdbPqCodes(Ydb::StatusIds::BAD_REQUEST, messageAndCode.PQCode); } } - return CheckConfig(*config, supportedClientServiceTypes, error, ctx, Ydb::StatusIds::BAD_REQUEST); + return TYdbPqCodes(CheckConfig(*config, supportedClientServiceTypes, error, ctx, Ydb::StatusIds::BAD_REQUEST), Ydb::PersQueue::ErrorCode::VALIDATION_ERROR); } Ydb::StatusIds::StatusCode FillProposeRequestImpl( @@ -1183,8 +1235,9 @@ namespace NKikimr::NGRpcProxy::V1 { config->ClearReadRuleVersions(); for (const auto& rr : consumers) { - error = AddReadRuleToConfig(config, rr.second, supportedClientServiceTypes, rr.first, ctx); - if (!error.Empty()) { + auto messageAndCode = AddReadRuleToConfig(config, rr.second, supportedClientServiceTypes, rr.first, ctx); + if (messageAndCode.PQCode != Ydb::PersQueue::ErrorCode::OK) { + error = messageAndCode.Message; return Ydb::StatusIds::BAD_REQUEST; } } diff --git a/ydb/services/lib/actors/pq_schema_actor.h b/ydb/services/lib/actors/pq_schema_actor.h index 08fe0a71dbf..d58f0da55b0 100644 --- a/ydb/services/lib/actors/pq_schema_actor.h +++ b/ydb/services/lib/actors/pq_schema_actor.h @@ -10,6 +10,25 @@ #include <library/cpp/actors/core/event_local.h> #include <library/cpp/actors/core/hfunc.h> + +struct TMsgPqCodes { + TString Message; + Ydb::PersQueue::ErrorCode::ErrorCode PQCode; + + TMsgPqCodes(TString const& message, Ydb::PersQueue::ErrorCode::ErrorCode pqCode) + : Message(message), PQCode(pqCode) {} +}; + +struct TYdbPqCodes { + Ydb::StatusIds::StatusCode YdbCode; + Ydb::PersQueue::ErrorCode::ErrorCode PQCode; + + TYdbPqCodes(Ydb::StatusIds::StatusCode YdbCode, Ydb::PersQueue::ErrorCode::ErrorCode PQCode) + : YdbCode(YdbCode), + PQCode(PQCode) {} +}; + + namespace NKikimr::NGRpcProxy::V1 { Ydb::StatusIds::StatusCode FillProposeRequestImpl( @@ -24,7 +43,7 @@ namespace NKikimr::NGRpcProxy::V1 { const TString& localDc = TString() ); - Ydb::StatusIds::StatusCode FillProposeRequestImpl( + TYdbPqCodes FillProposeRequestImpl( const TString& name, const Ydb::Topic::CreateTopicRequest& request, NKikimrSchemeOp::TModifyScheme& modifyScheme, @@ -33,7 +52,6 @@ namespace NKikimr::NGRpcProxy::V1 { const TString& path, const TString& database = TString(), const TString& localDc = TString() - ); Ydb::StatusIds::StatusCode FillProposeRequestImpl( @@ -58,7 +76,7 @@ namespace NKikimr::NGRpcProxy::V1 { TString& error, const TActorContext& ctx, const Ydb::StatusIds::StatusCode dubsStatus = Ydb::StatusIds::BAD_REQUEST); - TString AddReadRuleToConfig( + TMsgPqCodes AddReadRuleToConfig( NKikimrPQ::TPQTabletConfig *config, const Ydb::PersQueue::V1::TopicSettings::ReadRule& rr, const TClientServiceTypes& supportedReadRuleServiceTypes, @@ -71,7 +89,7 @@ namespace NKikimr::NGRpcProxy::V1 { const TActorContext& ctx ); NYql::TIssue FillIssue(const TString &errorReason, const Ydb::PersQueue::ErrorCode::ErrorCode errorCode); - + NYql::TIssue FillIssue(const TString &errorReason, const size_t errorCode); template <typename T> class THasCdcStreamCompatibility { @@ -185,8 +203,8 @@ namespace NKikimr::NGRpcProxy::V1 { NSchemeCache::TSchemeCacheNavigate::KindTopic) { this->Request_->RaiseIssue( FillIssue( - TStringBuilder() << "path '" << path << "' is not a topic", - Ydb::PersQueue::ErrorCode::ERROR + TStringBuilder() << "path '" << path << "' is not a stream", + Ydb::PersQueue::ErrorCode::VALIDATION_ERROR ) ); TBase::Reply(Ydb::StatusIds::SCHEME_ERROR, ctx); @@ -221,7 +239,7 @@ namespace NKikimr::NGRpcProxy::V1 { this->Request_->RaiseIssue( FillIssue( TStringBuilder() << "path '" << path << "' is not compatible scheme object", - Ydb::PersQueue::ErrorCode::ERROR + Ydb::PersQueue::ErrorCode::INVALID_ARGUMENT ) ); return TBase::Reply(Ydb::StatusIds::SCHEME_ERROR, ctx); @@ -229,7 +247,7 @@ namespace NKikimr::NGRpcProxy::V1 { this->Request_->RaiseIssue( FillIssue( TStringBuilder() << "path '" << path << "' creation is not completed", - Ydb::PersQueue::ErrorCode::ERROR + Ydb::PersQueue::ErrorCode::VALIDATION_ERROR ) ); return TBase::Reply(Ydb::StatusIds::SCHEME_ERROR, ctx); @@ -242,7 +260,7 @@ namespace NKikimr::NGRpcProxy::V1 { FillIssue( TStringBuilder() << "path '" << path << "' does not exist or you " << "do not have access rights", - Ydb::PersQueue::ErrorCode::ERROR + Ydb::PersQueue::ErrorCode::ACCESS_DENIED ) ); return TBase::Reply(Ydb::StatusIds::SCHEME_ERROR, ctx); @@ -251,7 +269,7 @@ namespace NKikimr::NGRpcProxy::V1 { this->Request_->RaiseIssue( FillIssue( TStringBuilder() << "table creation is not completed", - Ydb::PersQueue::ErrorCode::ERROR + Ydb::PersQueue::ErrorCode::VALIDATION_ERROR ) ); return TBase::Reply(Ydb::StatusIds::SCHEME_ERROR, ctx); @@ -261,7 +279,7 @@ namespace NKikimr::NGRpcProxy::V1 { this->Request_->RaiseIssue( FillIssue( TStringBuilder() << "path '" << path << "' is not a table", - Ydb::PersQueue::ErrorCode::ERROR + Ydb::PersQueue::ErrorCode::VALIDATION_ERROR ) ); return TBase::Reply(Ydb::StatusIds::SCHEME_ERROR, ctx); @@ -271,7 +289,7 @@ namespace NKikimr::NGRpcProxy::V1 { this->Request_->RaiseIssue( FillIssue( TStringBuilder() << "unknown database root", - Ydb::PersQueue::ErrorCode::ERROR + Ydb::PersQueue::ErrorCode::INVALID_ARGUMENT ) ); return TBase::Reply(Ydb::StatusIds::SCHEME_ERROR, ctx); @@ -292,6 +310,14 @@ namespace NKikimr::NGRpcProxy::V1 { IsDead = true; } + void ReplyWithError(Ydb::StatusIds::StatusCode status, size_t additionalStatus, + const TString& messageText, const NActors::TActorContext& ctx) { + this->Request_->RaiseIssue(FillIssue(messageText, additionalStatus)); + this->Request_->ReplyWithYdbStatus(status); + this->Die(ctx); + IsDead = true; + } + void ReplyWithResult(Ydb::StatusIds::StatusCode status, const NActors::TActorContext& ctx) { this->Request_->ReplyWithYdbStatus(status); this->Die(ctx); diff --git a/ydb/services/persqueue_v1/actors/schema_actors.cpp b/ydb/services/persqueue_v1/actors/schema_actors.cpp index 0eddfe0cf23..f9bdd72464f 100644 --- a/ydb/services/persqueue_v1/actors/schema_actors.cpp +++ b/ydb/services/persqueue_v1/actors/schema_actors.cpp @@ -237,14 +237,17 @@ void TAddReadRuleActor::ModifyPersqueueConfig( rule.set_version(selfInfo.GetVersion().GetPQVersion()); } auto serviceTypes = GetSupportedClientServiceTypes(ctx); - TString error = AddReadRuleToConfig(pqConfig, rule, serviceTypes, ctx); - auto status = error.empty() ? CheckConfig(*pqConfig, serviceTypes, error, ctx, Ydb::StatusIds::ALREADY_EXISTS) + + TString error; + auto messageAndCode = AddReadRuleToConfig(pqConfig, rule, serviceTypes, ctx); + auto status = messageAndCode.PQCode == Ydb::PersQueue::ErrorCode::OK ? + CheckConfig(*pqConfig, serviceTypes, messageAndCode.Message, ctx, Ydb::StatusIds::ALREADY_EXISTS) : Ydb::StatusIds::BAD_REQUEST; if (status != Ydb::StatusIds::SUCCESS) { return ReplyWithError(status, status == Ydb::StatusIds::ALREADY_EXISTS ? Ydb::PersQueue::ErrorCode::OK : Ydb::PersQueue::ErrorCode::BAD_REQUEST, - error, ctx); + messageAndCode.Message, ctx); } } @@ -333,7 +336,6 @@ void TPQCreateTopicActor::FillProposeRequest(TEvTxUserProxy::TEvProposeTransacti { TString error; - auto status = FillProposeRequestImpl(name, GetProtoRequest()->settings(), modifyScheme, ctx, false, error, workingDir, proposal.Record.GetDatabaseName(), LocalCluster); if (!error.empty()) { @@ -367,7 +369,7 @@ void TCreateTopicActor::FillProposeRequest(TEvTxUserProxy::TEvProposeTransaction TString error; auto status = FillProposeRequestImpl(name, *GetProtoRequest(), modifyScheme, ctx, error, - workingDir, proposal.Record.GetDatabaseName(), LocalCluster); + workingDir, proposal.Record.GetDatabaseName(), LocalCluster).YdbCode; if (!error.empty()) { Request_->RaiseIssue(FillIssue(error, Ydb::PersQueue::ErrorCode::BAD_REQUEST)); |