aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorlpetrov02 <lpetrov02@yandex-team.com>2023-02-13 18:09:09 +0300
committerlpetrov02 <lpetrov02@yandex-team.com>2023-02-13 18:09:09 +0300
commitcbe7a80bc6a10c43817fb9d3c5e8dca885ca253e (patch)
tree6fe555b338e2b7ca28780cee0d89ab62785b96da
parentb5b3d520c01646610f376c55455d9cad735e3e0d (diff)
downloadydb-cbe7a80bc6a10c43817fb9d3c5e8dca885ca253e.tar.gz
Introduce tests on error codes
Saves minor changes to pull from trunk Saves changes Adds alternative (to think) variants of http codes for ydb statuses
-rw-r--r--ydb/core/http_proxy/CMakeLists.darwin.txt2
-rw-r--r--ydb/core/http_proxy/CMakeLists.linux-aarch64.txt2
-rw-r--r--ydb/core/http_proxy/CMakeLists.linux.txt2
-rw-r--r--ydb/core/http_proxy/events.h13
-rw-r--r--ydb/core/http_proxy/exceptions_mapping.cpp175
-rw-r--r--ydb/core/http_proxy/exceptions_mapping.h24
-rw-r--r--ydb/core/http_proxy/http_req.cpp192
-rw-r--r--ydb/core/http_proxy/http_req.h15
-rw-r--r--ydb/core/http_proxy/http_service.cpp4
-rw-r--r--ydb/core/http_proxy/json_proto_conversion.h18
-rw-r--r--ydb/core/public_http/CMakeLists.darwin.txt2
-rw-r--r--ydb/core/public_http/CMakeLists.linux-aarch64.txt2
-rw-r--r--ydb/core/public_http/CMakeLists.linux.txt2
-rw-r--r--ydb/core/public_http/http_req.cpp88
-rw-r--r--ydb/public/api/protos/persqueue_error_codes_v1.proto4
-rw-r--r--ydb/services/datastreams/datastreams_codes.h35
-rw-r--r--ydb/services/datastreams/datastreams_proxy.cpp167
-rw-r--r--ydb/services/datastreams/datastreams_proxy.h1
-rw-r--r--ydb/services/datastreams/datastreams_ut.cpp30
-rw-r--r--ydb/services/lib/actors/pq_schema_actor.cpp143
-rw-r--r--ydb/services/lib/actors/pq_schema_actor.h50
-rw-r--r--ydb/services/persqueue_v1/actors/schema_actors.cpp12
22 files changed, 743 insertions, 240 deletions
diff --git a/ydb/core/http_proxy/CMakeLists.darwin.txt b/ydb/core/http_proxy/CMakeLists.darwin.txt
index d76dd4343b6..8a70b7e5ed3 100644
--- a/ydb/core/http_proxy/CMakeLists.darwin.txt
+++ b/ydb/core/http_proxy/CMakeLists.darwin.txt
@@ -33,10 +33,12 @@ target_link_libraries(ydb-core-http_proxy PUBLIC
cpp-client-iam_private
ydb-services-datastreams
services-persqueue_v1-actors
+ api-protos
)
target_sources(ydb-core-http_proxy PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/http_proxy/auth_factory.cpp
${CMAKE_SOURCE_DIR}/ydb/core/http_proxy/discovery_actor.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/http_proxy/exceptions_mapping.cpp
${CMAKE_SOURCE_DIR}/ydb/core/http_proxy/grpc_service.cpp
${CMAKE_SOURCE_DIR}/ydb/core/http_proxy/http_req.cpp
${CMAKE_SOURCE_DIR}/ydb/core/http_proxy/http_service.cpp
diff --git a/ydb/core/http_proxy/CMakeLists.linux-aarch64.txt b/ydb/core/http_proxy/CMakeLists.linux-aarch64.txt
index 35e6158b498..d70963e423e 100644
--- a/ydb/core/http_proxy/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/http_proxy/CMakeLists.linux-aarch64.txt
@@ -34,10 +34,12 @@ target_link_libraries(ydb-core-http_proxy PUBLIC
cpp-client-iam_private
ydb-services-datastreams
services-persqueue_v1-actors
+ api-protos
)
target_sources(ydb-core-http_proxy PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/http_proxy/auth_factory.cpp
${CMAKE_SOURCE_DIR}/ydb/core/http_proxy/discovery_actor.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/http_proxy/exceptions_mapping.cpp
${CMAKE_SOURCE_DIR}/ydb/core/http_proxy/grpc_service.cpp
${CMAKE_SOURCE_DIR}/ydb/core/http_proxy/http_req.cpp
${CMAKE_SOURCE_DIR}/ydb/core/http_proxy/http_service.cpp
diff --git a/ydb/core/http_proxy/CMakeLists.linux.txt b/ydb/core/http_proxy/CMakeLists.linux.txt
index 35e6158b498..d70963e423e 100644
--- a/ydb/core/http_proxy/CMakeLists.linux.txt
+++ b/ydb/core/http_proxy/CMakeLists.linux.txt
@@ -34,10 +34,12 @@ target_link_libraries(ydb-core-http_proxy PUBLIC
cpp-client-iam_private
ydb-services-datastreams
services-persqueue_v1-actors
+ api-protos
)
target_sources(ydb-core-http_proxy PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/http_proxy/auth_factory.cpp
${CMAKE_SOURCE_DIR}/ydb/core/http_proxy/discovery_actor.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/http_proxy/exceptions_mapping.cpp
${CMAKE_SOURCE_DIR}/ydb/core/http_proxy/grpc_service.cpp
${CMAKE_SOURCE_DIR}/ydb/core/http_proxy/http_req.cpp
${CMAKE_SOURCE_DIR}/ydb/core/http_proxy/http_service.cpp
diff --git a/ydb/core/http_proxy/events.h b/ydb/core/http_proxy/events.h
index c859597ebe2..4acb6376ed6 100644
--- a/ydb/core/http_proxy/events.h
+++ b/ydb/core/http_proxy/events.h
@@ -42,6 +42,7 @@ namespace NKikimr::NHttpProxy {
EvListEndpointsRequest,
EvListEndpointsResponse,
EvError,
+ EvErrorWithIssue,
EvCounter,
EvHistCounter,
EvToken,
@@ -139,6 +140,18 @@ namespace NKikimr::NHttpProxy {
, Response(response)
{}
};
+
+ struct TEvErrorWithIssue : public TEventLocal<TEvErrorWithIssue, EvErrorWithIssue> {
+ NYdb::EStatus Status;
+ size_t IssueCode;
+ TString Response;
+
+ TEvErrorWithIssue(const NYdb::EStatus status, const TString& response, size_t issueCode=0)
+ : Status(status)
+ , IssueCode(issueCode)
+ , Response(response)
+ {}
+ };
};
diff --git a/ydb/core/http_proxy/exceptions_mapping.cpp b/ydb/core/http_proxy/exceptions_mapping.cpp
new file mode 100644
index 00000000000..5fcbc1d8d60
--- /dev/null
+++ b/ydb/core/http_proxy/exceptions_mapping.cpp
@@ -0,0 +1,175 @@
+#include "exceptions_mapping.h"
+
+#include <unordered_map>
+
+
+namespace NKikimr::NHttpProxy {
+
+enum class EMethodId : ui8 {
+ CREATE_STREAM = 100,
+ DELETE_STREAM = 101,
+ UPDATE_SHARD_COUNT = 102,
+ UPDATE_STREAM_MODE = 103,
+ UPDATE_STREAM = 104,
+ SET_WRITE_QUOTA = 105,
+ SET_STREAM_RETENTION_PERIOD = 106,
+ DESCRIBE_STREAM = 107,
+ LIST_STREAMS = 108,
+ LIST_STREAM_CONSUMERS = 109,
+ REGISTER_STREAM_CONSUMER = 110,
+ DEREGISTER_STREAM_CONSUMER = 111,
+ GET_SHARD_ITERATOR = 112,
+ GET_RECORDS = 113,
+ LIST_SHARDS = 114,
+ DESCRIBE_STREAM_SUMMARY = 115
+};
+
+static std::unordered_map<TString, EMethodId> getEMethodId = {
+ {"CreateStream", EMethodId::CREATE_STREAM},
+ {"DeleteStream", EMethodId::DELETE_STREAM},
+ {"UpdateShardCount", EMethodId::UPDATE_SHARD_COUNT},
+ {"UpdateStreamMode", EMethodId::UPDATE_STREAM_MODE},
+ {"UpdateStream", EMethodId::UPDATE_STREAM},
+ {"SetWriteQuota", EMethodId::SET_WRITE_QUOTA},
+ {"SetStreamRetentionPeriod", EMethodId::SET_STREAM_RETENTION_PERIOD},
+ {"DescribeStream", EMethodId::DESCRIBE_STREAM},
+ {"ListStreams", EMethodId::LIST_STREAMS},
+ {"ListStreamConsumers", EMethodId::LIST_STREAM_CONSUMERS},
+ {"RegisterStreamConsumer", EMethodId::REGISTER_STREAM_CONSUMER},
+ {"DeregisterStreamConsumer", EMethodId::DEREGISTER_STREAM_CONSUMER},
+ {"GetShardIterator", EMethodId::GET_SHARD_ITERATOR},
+ {"GetRecords", EMethodId::GET_RECORDS},
+ {"ListShards", EMethodId::LIST_SHARDS},
+ {"DescribeStreamSummary", EMethodId::DESCRIBE_STREAM_SUMMARY}
+};
+
+
+TException AlreadyExistsExceptions(const TString& method, NYds::EErrorCodes issueCode) {
+ Y_UNUSED(method);
+ Y_UNUSED(issueCode);
+ return TException("ResourceInUseException", HTTP_BAD_REQUEST);
+}
+
+TException BadRequestExceptions(const TString& method, NYds::EErrorCodes issueCode) {
+ EMethodId Method = getEMethodId[method];
+
+ switch (issueCode) {
+ case NYds::EErrorCodes::ACCESS_DENIED:
+ return TException("AccessDeniedException", HTTP_BAD_REQUEST);
+ case NYds::EErrorCodes::INVALID_ARGUMENT:
+ return TException("InvalidArgumentException", HTTP_BAD_REQUEST);
+ case NYds::EErrorCodes::VALIDATION_ERROR:
+ return TException("ValidationException", HTTP_BAD_REQUEST);
+ case NYds::EErrorCodes::MISSING_PARAMETER:
+ return TException("MissingParameter", HTTP_BAD_REQUEST);
+ case NYds::EErrorCodes::IN_USE:
+ return TException("ResourceInUseException", HTTP_BAD_REQUEST);
+ case NYds::EErrorCodes::GENERIC_ERROR:
+ switch (Method) {
+ case EMethodId::LIST_STREAMS:
+ return TException("ValidationException", HTTP_BAD_REQUEST);
+ case EMethodId::DESCRIBE_STREAM:
+ return TException("ResourceNotFoundException", HTTP_BAD_REQUEST);
+ case EMethodId::CREATE_STREAM:
+ return TException("LimitExceededException", HTTP_BAD_REQUEST);
+ default:
+ return TException("ValidationException", HTTP_BAD_REQUEST);
+ }
+ case NYds::EErrorCodes::EXPIRED_TOKEN:
+ return TException("ExpiredNextTokenException", HTTP_BAD_REQUEST);
+ case NYds::EErrorCodes::EXPIRED_ITERATOR:
+ return TException("ExpiredIteratorException", HTTP_BAD_REQUEST);
+ case NYds::EErrorCodes::NOT_FOUND:
+ return TException("ResourceNotFoundException", HTTP_BAD_REQUEST);
+ case NYds::EErrorCodes::BAD_REQUEST:
+ return TException("ValidationException", HTTP_BAD_REQUEST);
+ case NYds::EErrorCodes::INCOMPLETE_SIGNATURE:
+ return TException("IncompleteSignature", HTTP_BAD_REQUEST);
+ case NYds::EErrorCodes::INVALID_PARAMETER_COMBINATION:
+ return TException("InvalidParameterCombination", HTTP_BAD_REQUEST);
+ default:
+ return TException("UnknownError", HTTP_BAD_REQUEST);
+ }
+}
+
+TException GenericErrorExceptions(const TString& method, NYds::EErrorCodes issueCode) {
+ Y_UNUSED(issueCode);
+ EMethodId Method = getEMethodId[method];
+ switch (Method) {
+ case EMethodId::CREATE_STREAM:
+ return TException("LimitExceededException", HTTP_BAD_REQUEST);
+ default:
+ return TException("GenericError", HTTP_BAD_REQUEST);
+ }
+}
+
+TException InternalErrorExceptions(const TString& method, NYds::EErrorCodes issueCode) {
+ Y_UNUSED(method);
+ Y_UNUSED(issueCode);
+ return TException("InternalError", HTTP_INTERNAL_SERVER_ERROR);
+}
+
+TException NotFoundExceptions(const TString& method, NYds::EErrorCodes issueCode) {
+ Y_UNUSED(method);
+ switch (issueCode) {
+ case NYds::EErrorCodes::INVALID_ARGUMENT:
+ return TException("InvalidArgumentException", HTTP_BAD_REQUEST);
+ default:
+ return TException("ResourceNotFoundException", HTTP_BAD_REQUEST);
+ }
+}
+
+TException OverloadedExceptions(const TString& method, NYds::EErrorCodes issueCode) {
+ Y_UNUSED(method);
+ switch (issueCode) {
+ case NYds::EErrorCodes::ERROR:
+ return TException("ResourceInUseException", HTTP_BAD_REQUEST);
+ default:
+ return TException("ThrottlingException", HTTP_BAD_REQUEST);
+ }
+}
+
+TException PreconditionFailedExceptions(const TString& method, NYds::EErrorCodes issueCode) {
+ Y_UNUSED(method);
+ Y_UNUSED(issueCode);
+ return TException("ValidationException", HTTP_BAD_REQUEST);
+}
+
+TException SchemeErrorExceptions(const TString& method, NYds::EErrorCodes issueCode) {
+ Y_UNUSED(method);
+ switch (issueCode) {
+ case NYds::EErrorCodes::INVALID_ARGUMENT:
+ return TException("InvalidArgumentException", HTTP_BAD_REQUEST);
+ case NYds::EErrorCodes::VALIDATION_ERROR:
+ return TException("ValidationException", HTTP_BAD_REQUEST);
+ case NYds::EErrorCodes::NOT_FOUND:
+ case NYds::EErrorCodes::ACCESS_DENIED:
+ return TException("ResourceNotFoundException", HTTP_BAD_REQUEST);
+ default:
+ return TException("InvalidArgumentException", HTTP_BAD_REQUEST);
+ }
+}
+
+TException UnauthorizedExceptions(const TString& method, NYds::EErrorCodes issueCode) {
+ Y_UNUSED(method);
+ switch (issueCode) {
+ case NYds::EErrorCodes::INCOMPLETE_SIGNATURE:
+ return TException("IncompleteSignature", HTTP_BAD_REQUEST);
+ case NYds::EErrorCodes::MISSING_AUTHENTICATION_TOKEN:
+ return TException("MissingAuthenticationToken", HTTP_BAD_REQUEST);
+ default:
+ return TException("AccessDeniedException", HTTP_BAD_REQUEST);
+ }
+}
+
+TException UnsupportedExceptions(const TString& method, NYds::EErrorCodes issueCode) {
+ Y_UNUSED(method);
+ switch (issueCode) {
+ case NYds::EErrorCodes::MISSING_ACTION:
+ return TException("MissingAction", HTTP_BAD_REQUEST);
+ default:
+ return TException("InvalidAction", HTTP_BAD_REQUEST);
+ }
+}
+
+}
diff --git a/ydb/core/http_proxy/exceptions_mapping.h b/ydb/core/http_proxy/exceptions_mapping.h
new file mode 100644
index 00000000000..93c3f389530
--- /dev/null
+++ b/ydb/core/http_proxy/exceptions_mapping.h
@@ -0,0 +1,24 @@
+#pragma once
+
+#include <ydb/services/datastreams/datastreams_codes.h>
+
+#include <library/cpp/http/misc/httpcodes.h>
+
+
+using TException = std::pair<TString, HttpCodes>;
+
+
+namespace NKikimr::NHttpProxy {
+
+ TException AlreadyExistsExceptions(const TString& method, NYds::EErrorCodes issueCode);
+ TException BadRequestExceptions(const TString& method, NYds::EErrorCodes issueCode);
+ TException GenericErrorExceptions(const TString& method, NYds::EErrorCodes issueCode);
+ TException InternalErrorExceptions(const TString& method, NYds::EErrorCodes issueCode);
+ TException NotFoundExceptions(const TString& method, NYds::EErrorCodes issueCode);
+ TException OverloadedExceptions(const TString& method, NYds::EErrorCodes issueCode);
+ TException PreconditionFailedExceptions(const TString& method, NYds::EErrorCodes issueCode);
+ TException SchemeErrorExceptions(const TString& method, NYds::EErrorCodes issueCode);
+ TException UnauthorizedExceptions(const TString& method, NYds::EErrorCodes issueCode);
+ TException UnsupportedExceptions(const TString& method, NYds::EErrorCodes issueCode);
+
+} \ No newline at end of file
diff --git a/ydb/core/http_proxy/http_req.cpp b/ydb/core/http_proxy/http_req.cpp
index 339cc354604..68e03b198b3 100644
--- a/ydb/core/http_proxy/http_req.cpp
+++ b/ydb/core/http_proxy/http_req.cpp
@@ -3,6 +3,7 @@
#include "http_req.h"
#include "json_proto_conversion.h"
#include "custom_metrics.h"
+#include "exceptions_mapping.h"
#include <library/cpp/actors/http/http_proxy.h>
#include <library/cpp/cgiparam/cgiparam.h>
@@ -60,85 +61,50 @@ namespace NKikimr::NHttpProxy {
using namespace Ydb::DataStreams::V1;
using namespace NYdb::NDataStreams::V1;
- TString StatusToErrorType(NYdb::EStatus status) {
+ TException MapToException(NYdb::EStatus status, const TString& method, size_t issueCode = ISSUE_CODE_ERROR) {
+ auto IssueCode = static_cast<NYds::EErrorCodes>(issueCode);
+
switch(status) {
case NYdb::EStatus::SUCCESS:
- return "OK";
+ return TException("", HTTP_OK);
case NYdb::EStatus::BAD_REQUEST:
- return "InvalidParameterValueException"; //TODO: bring here issues and parse from them
- case NYdb::EStatus::CLIENT_UNAUTHENTICATED:
+ return BadRequestExceptions(method, IssueCode);
case NYdb::EStatus::UNAUTHORIZED:
- return "AccessDeniedException";
+ return UnauthorizedExceptions(method, IssueCode);
case NYdb::EStatus::INTERNAL_ERROR:
- return "InternalFailureException";
- case NYdb::EStatus::ABORTED:
- return "RequestExpiredException"; //TODO: find better code
- case NYdb::EStatus::UNAVAILABLE:
- return "ServiceUnavailableException";
+ return InternalErrorExceptions(method, IssueCode);
case NYdb::EStatus::OVERLOADED:
- return "ThrottlingException";
- case NYdb::EStatus::SCHEME_ERROR:
- return "ResourceNotFoundException";
+ return OverloadedExceptions(method, IssueCode);
case NYdb::EStatus::GENERIC_ERROR:
- return "InternalFailureException"; //TODO: find better code
- case NYdb::EStatus::TIMEOUT:
- return "RequestTimeoutException";
- case NYdb::EStatus::BAD_SESSION:
- return "AccessDeniedException";
+ return GenericErrorExceptions(method, IssueCode);
case NYdb::EStatus::PRECONDITION_FAILED:
+ return PreconditionFailedExceptions(method, IssueCode);
case NYdb::EStatus::ALREADY_EXISTS:
- return "ValidationErrorException"; //TODO: find better code
+ return AlreadyExistsExceptions(method, IssueCode);
+ case NYdb::EStatus::SCHEME_ERROR:
+ return SchemeErrorExceptions(method, IssueCode);
case NYdb::EStatus::NOT_FOUND:
- return "ResourceNotFoundException";
- case NYdb::EStatus::SESSION_EXPIRED:
- return "AccessDeniedException";
+ return NotFoundExceptions(method, IssueCode);
case NYdb::EStatus::UNSUPPORTED:
- return "InvalidActionException";
- default:
- return "InternalFailureException";
- }
-
- }
-
- HttpCodes StatusToHttpCode(NYdb::EStatus status) {
- switch(status) {
- case NYdb::EStatus::SUCCESS:
- return HTTP_OK;
- case NYdb::EStatus::UNSUPPORTED:
- case NYdb::EStatus::BAD_REQUEST:
- return HTTP_BAD_REQUEST;
+ return UnsupportedExceptions(method, IssueCode);
case NYdb::EStatus::CLIENT_UNAUTHENTICATED:
- case NYdb::EStatus::UNAUTHORIZED:
- return HTTP_FORBIDDEN;
- case NYdb::EStatus::INTERNAL_ERROR:
- return HTTP_INTERNAL_SERVER_ERROR;
+ return TException("Unauthenticated", HTTP_BAD_REQUEST);
case NYdb::EStatus::ABORTED:
- return HTTP_CONFLICT;
+ return TException("Aborted", HTTP_BAD_REQUEST);
case NYdb::EStatus::UNAVAILABLE:
- return HTTP_SERVICE_UNAVAILABLE;
- case NYdb::EStatus::OVERLOADED:
- return HTTP_BAD_REQUEST;
- case NYdb::EStatus::SCHEME_ERROR:
- return HTTP_NOT_FOUND;
- case NYdb::EStatus::GENERIC_ERROR:
- return HTTP_BAD_REQUEST;
+ return TException("Unavailable", HTTP_BAD_REQUEST);
case NYdb::EStatus::TIMEOUT:
- return HTTP_GATEWAY_TIME_OUT;
+ return TException("RequestExpired", HTTP_BAD_REQUEST);
case NYdb::EStatus::BAD_SESSION:
- return HTTP_UNAUTHORIZED;
- case NYdb::EStatus::PRECONDITION_FAILED:
- return HTTP_PRECONDITION_FAILED;
- case NYdb::EStatus::ALREADY_EXISTS:
- return HTTP_CONFLICT;
- case NYdb::EStatus::NOT_FOUND:
- return HTTP_NOT_FOUND;
+ return TException("BadSession", HTTP_BAD_REQUEST);
case NYdb::EStatus::SESSION_EXPIRED:
- return HTTP_UNAUTHORIZED;
+ return TException("SessionExpired", HTTP_BAD_REQUEST);
default:
- return HTTP_INTERNAL_SERVER_ERROR;
+ return TException("InternalException", HTTP_INTERNAL_SERVER_ERROR);
}
}
+
template<class TProto>
TString ExtractStreamNameWithoutProtoField(const TProto& req)
{
@@ -266,6 +232,7 @@ namespace NKikimr::NHttpProxy {
HFunc(TEvServerlessProxy::TEvClientReady, HandleClientReady);
HFunc(TEvServerlessProxy::TEvDiscoverDatabaseEndpointResult, Handle);
HFunc(TEvServerlessProxy::TEvError, HandleError);
+ HFunc(TEvServerlessProxy::TEvErrorWithIssue, HandleErrorWithIssue);
HFunc(TEvServerlessProxy::TEvGrpcRequestResult, HandleGrpcResponse);
HFunc(TEvServerlessProxy::TEvToken, HandleToken);
default:
@@ -367,6 +334,7 @@ namespace NKikimr::NHttpProxy {
auto result = MakeHolder<TEvServerlessProxy::TEvGrpcRequestResult>();
if (response.IsSuccess()) {
result->Message = MakeHolder<TProtoResult>(response.GetResult());
+
}
result->Status = MakeHolder<NYdb::TStatus>(response);
actorSystem->Send(actorId, result.Release());
@@ -393,7 +361,11 @@ namespace NKikimr::NHttpProxy {
ReplyWithError(ctx, ev->Get()->Status, ev->Get()->Response);
}
- void ReplyWithError(const TActorContext& ctx, NYdb::EStatus status, const TString& errorText) {
+ void HandleErrorWithIssue(TEvServerlessProxy::TEvErrorWithIssue::TPtr& ev, const TActorContext& ctx) {
+ ReplyWithError(ctx, ev->Get()->Status, ev->Get()->Response, ev->Get()->IssueCode);
+ }
+
+ void ReplyWithError(const TActorContext& ctx, NYdb::EStatus status, const TString& errorText, size_t issueCode = ISSUE_CODE_GENERIC) {
/* deprecated metric: */ ctx.Send(MakeMetricsServiceID(),
new TEvServerlessProxy::TEvCounter{
1, true, true,
@@ -402,26 +374,12 @@ namespace NKikimr::NHttpProxy {
{"folder", HttpContext.FolderId},
{"database", HttpContext.DatabaseId},
{"stream", HttpContext.StreamName},
- {"code", TStringBuilder() << (int)StatusToHttpCode(status)},
+ {"code", TStringBuilder() << (int)MapToException(status, Method, issueCode).second},
{"name", "api.http.errors_per_second"}}
});
-
- ctx.Send(MakeMetricsServiceID(),
- new TEvServerlessProxy::TEvCounter{
- 1, true, true,
- {{"database", HttpContext.DatabaseName},
- {"method", Method},
- {"cloud_id", HttpContext.CloudId},
- {"folder_id", HttpContext.FolderId},
- {"database_id", HttpContext.DatabaseId},
- {"topic", HttpContext.StreamName},
- {"code", TStringBuilder() << (int)StatusToHttpCode(status)},
- {"name", "api.http.data_streams.response.count"}}
- });
- //TODO: add api.http.response.count
HttpContext.ResponseData.Status = status;
HttpContext.ResponseData.ErrorText = errorText;
- HttpContext.DoReply(ctx);
+ HttpContext.DoReply(ctx, issueCode);
ctx.Send(AuthActor, new TEvents::TEvPoisonPill());
@@ -459,7 +417,7 @@ namespace NKikimr::NHttpProxy {
return;
}
- return ReplyWithError(ctx, ev->Get()->Status, ev->Get()->Message);
+ ReplyWithError(ctx, ev->Get()->Status, ev->Get()->Message);
}
void ReportLatencyCounters(const TActorContext& ctx) {
@@ -491,7 +449,7 @@ namespace NKikimr::NHttpProxy {
ctx.Send(MakeMetricsServiceID(),
new TEvServerlessProxy::TEvCounter{
1, true, true,
- {{"database", HttpContext.DatabaseName},
+ {{"database", HttpContext.DatabaseName},
{"method", Method},
{"cloud_id", HttpContext.CloudId},
{"folder_id", HttpContext.FolderId},
@@ -517,7 +475,11 @@ namespace NKikimr::NHttpProxy {
TStringOutput stringOutput(errorText);
ev->Get()->Status->GetIssues().PrintTo(stringOutput);
RetryCounter.Void();
- return ReplyWithError(ctx, ev->Get()->Status->GetStatus(), errorText);
+ auto issues = ev->Get()->Status->GetIssues();
+ size_t issueCode = (
+ issues && issues.begin()->IssueCode != ISSUE_CODE_OK
+ ) ? issues.begin()->IssueCode : ISSUE_CODE_GENERIC;
+ return ReplyWithError(ctx, ev->Get()->Status->GetStatus(), errorText, issueCode);
}
}
}
@@ -534,12 +496,18 @@ namespace NKikimr::NHttpProxy {
StartTime = ctx.Now();
try {
HttpContext.RequestBodyToProto(&Request);
- } catch (std::exception& e) {
+ } catch (const NKikimr::NSQS::TSQSException& e) {
+ NYds::EErrorCodes issueCode = NYds::EErrorCodes::OK;
+ if (e.ErrorClass.ErrorCode == "MissingParameter")
+ issueCode = NYds::EErrorCodes::MISSING_PARAMETER;
+ else if (e.ErrorClass.ErrorCode == "InvalidQueryParameter" || e.ErrorClass.ErrorCode == "MalformedQueryString")
+ issueCode = NYds::EErrorCodes::INVALID_ARGUMENT;
+ return ReplyWithError(ctx, NYdb::EStatus::BAD_REQUEST, e.what(), static_cast<size_t>(issueCode));
+ } catch (const std::exception& e) {
LOG_SP_WARN_S(ctx, NKikimrServices::HTTP_PROXY,
"got new request with incorrect json from [" << HttpContext.SourceAddress << "] " <<
"database '" << HttpContext.DatabaseName << "'");
-
- return ReplyWithError(ctx, NYdb::EStatus::BAD_REQUEST, e.what());
+ return ReplyWithError(ctx, NYdb::EStatus::BAD_REQUEST, e.what(), static_cast<size_t>(NYds::EErrorCodes::INVALID_ARGUMENT));
}
if (HttpContext.DatabaseName.empty()) {
@@ -645,9 +613,16 @@ namespace NKikimr::NHttpProxy {
proc->second->Execute(std::move(context), std::move(signature), ctx);
return true;
}
- context.ResponseData.Status = NYdb::EStatus::BAD_REQUEST;
- context.ResponseData.ErrorText = TStringBuilder() << "Unknown method name " << name;
- context.DoReply(ctx);
+ else if (name.empty()) {
+ context.ResponseData.Status = NYdb::EStatus::UNSUPPORTED;
+ context.ResponseData.ErrorText = TStringBuilder() << "Unknown method name " << name;
+ context.DoReply(ctx, static_cast<size_t>(NYds::EErrorCodes::MISSING_ACTION));
+ }
+ else {
+ context.ResponseData.Status = NYdb::EStatus::UNSUPPORTED;
+ context.ResponseData.ErrorText = TStringBuilder() << "Missing method name " << name;
+ context.DoReply(ctx);
+ }
return false;
}
@@ -700,7 +675,7 @@ namespace NKikimr::NHttpProxy {
return signature;
}
- void THttpRequestContext::DoReply(const TActorContext& ctx) {
+ void THttpRequestContext::DoReply(const TActorContext& ctx, size_t issueCode) {
auto createResponse = [this](const auto& request,
TStringBuf status,
TStringBuf message,
@@ -751,13 +726,14 @@ namespace NKikimr::NHttpProxy {
ResponseData.Body.SetType(NJson::JSON_MAP);
ResponseData.Body["message"] = ResponseData.ErrorText;
- ResponseData.Body["__type"] = StatusToErrorType(ResponseData.Status);
+ ResponseData.Body["__type"] = MapToException(ResponseData.Status, MethodName, issueCode).first;
}
+ auto [errorName, httpCode] = MapToException(ResponseData.Status, MethodName, issueCode);
auto response = createResponse(
Request,
- TStringBuilder() << (ui32)StatusToHttpCode(ResponseData.Status),
- StatusToErrorType(ResponseData.Status),
+ TStringBuilder() << (ui32)httpCode,
+ errorName,
strByMimeAws(ContentType),
ResponseData.DumpBody(ContentType)
);
@@ -782,8 +758,8 @@ namespace NKikimr::NHttpProxy {
} else if (AsciiEqualsIgnoreCase(header.first, REQUEST_TARGET_HEADER)) {
TString requestTarget = TString(header.second);
TVector<TString> parts = SplitString(requestTarget, ".");
- ApiVersion = parts[0];
- MethodName = parts[1];
+ ApiVersion = parts.size() > 0 ? parts[0] : "";
+ MethodName = parts.size() > 1 ? parts[1] : "";
} else if (AsciiEqualsIgnoreCase(header.first, REQUEST_CONTENT_TYPE_HEADER)) {
ContentType = mimeByStr(header.second);
} else if (AsciiEqualsIgnoreCase(header.first, REQUEST_DATE_HEADER)) {
@@ -930,7 +906,10 @@ namespace NKikimr::NHttpProxy {
void HandleCacheNavigateResponse(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev, const TActorContext& ctx) {
const NSchemeCache::TSchemeCacheNavigate* navigate = ev->Get()->Request.Get();
if (navigate->ErrorCount) {
- return ReplyWithError(ctx, NYdb::EStatus::SCHEME_ERROR, TStringBuilder() << "Database with path '" << Database << "' doesn't exists");
+ return ReplyWithError(
+ ctx, NYdb::EStatus::SCHEME_ERROR, TStringBuilder() << "Database with path '" << Database << "' doesn't exists",
+ NYds::EErrorCodes::NOT_FOUND
+ );
}
Y_VERIFY(navigate->ResultSet.size() == 1);
if (navigate->ResultSet.front().PQGroupInfo) {
@@ -967,7 +946,9 @@ namespace NKikimr::NHttpProxy {
TInstant signedAt;
if (!Signature.Get() && IamToken.empty()) {
return ReplyWithError(ctx, NYdb::EStatus::UNAUTHORIZED,
- "Neither Credentials nor IAM token was provided");
+ "Neither Credentials nor IAM token was provided",
+ NYds::EErrorCodes::INCOMPLETE_SIGNATURE
+ );
}
if (Signature) {
bool found = false;
@@ -980,12 +961,23 @@ namespace NKikimr::NHttpProxy {
if (!found) {
return ReplyWithError(ctx, NYdb::EStatus::UNAUTHORIZED,
TStringBuilder() << "Wrong service region: got " << Signature->GetRegion() <<
- " expected " << ServiceConfig.GetHttpConfig().GetYandexCloudServiceRegion(0));
+ " expected " << ServiceConfig.GetHttpConfig().GetYandexCloudServiceRegion(0),
+ NYds::EErrorCodes::INCOMPLETE_SIGNATURE
+ );
}
if (!TInstant::TryParseIso8601(Signature->GetSigningTimestamp(), signedAt)) {
return ReplyWithError(ctx, NYdb::EStatus::BAD_REQUEST,
- "Failed to parse Signature timestamp");
+ "Failed to parse Signature timestamp",
+ NYds::EErrorCodes::INCOMPLETE_SIGNATURE
+ );
+ }
+
+ if (Signature->GetAccessKeyId().empty()) {
+ return ReplyWithError(ctx, NYdb::EStatus::UNAUTHORIZED,
+ "Access key id should be provided",
+ NYds::EErrorCodes::MISSING_AUTHENTICATION_TOKEN
+ );
}
}
@@ -1073,8 +1065,9 @@ namespace NKikimr::NHttpProxy {
ctx.Send(MakeIamTokenServiceID(), std::move(request));
}
- void ReplyWithError(const TActorContext& ctx, NYdb::EStatus status, const TString& errorText) {
- ctx.Send(Sender, new TEvServerlessProxy::TEvError(status, errorText));
+ void ReplyWithError(const TActorContext& ctx, NYdb::EStatus status, const TString& errorText,
+ NYds::EErrorCodes issueCode = NYds::EErrorCodes::GENERIC_ERROR) {
+ ctx.Send(Sender, new TEvServerlessProxy::TEvErrorWithIssue(status, errorText, static_cast<size_t>(issueCode)));
TBase::Die(ctx);
}
@@ -1169,3 +1162,10 @@ namespace NKikimr::NHttpProxy {
} // namespace NKikimr::NHttpProxy
+
+template <>
+void Out<NKikimr::NHttpProxy::THttpResponseData>(IOutputStream& o, const NKikimr::NHttpProxy::THttpResponseData& p) {
+ TString s = TStringBuilder() << "NYdb status: " << std::to_string(static_cast<size_t>(p.Status)) <<
+ ". Body: " << NJson::WriteJson(p.Body) << ". Error text: " << p.ErrorText;
+ o.Write(s.data(), s.length());
+}
diff --git a/ydb/core/http_proxy/http_req.h b/ydb/core/http_proxy/http_req.h
index 8973107348a..60437ff3b2b 100644
--- a/ydb/core/http_proxy/http_req.h
+++ b/ydb/core/http_proxy/http_req.h
@@ -2,6 +2,8 @@
#include "events.h"
+#include <ydb/services/datastreams/datastreams_codes.h>
+
#include <ydb/core/protos/serverless_proxy_config.pb.h>
#include <ydb/core/protos/serverless_proxy_config.pb.h>
@@ -15,14 +17,16 @@
#include <library/cpp/json/json_value.h>
#include <library/cpp/json/json_reader.h>
+#include <util/stream/output.h>
#include <util/string/builder.h>
+#define ISSUE_CODE_OK 0
+#define ISSUE_CODE_GENERIC 500030
+#define ISSUE_CODE_ERROR 500100
-namespace NKikimr::NHttpProxy {
-HttpCodes StatusToHttpCode(NYdb::EStatus status);
-TString StatusToErrorType(NYdb::EStatus status);
+namespace NKikimr::NHttpProxy {
class TRetryCounter {
public:
@@ -88,7 +92,7 @@ struct THttpRequestContext {
}
THolder<NKikimr::NSQS::TAwsRequestSignV4> GetSignature();
- void DoReply(const TActorContext& ctx);
+ void DoReply(const TActorContext& ctx, size_t issueCode = ISSUE_CODE_GENERIC);
void ParseHeaders(TStringBuf headers);
void RequestBodyToProto(NProtoBuf::Message* request);
};
@@ -125,3 +129,6 @@ NActors::IActor* CreateIamAuthActor(const TActorId sender, THttpRequestContext&
} // namespace NKinesis::NHttpProxy
+
+template <>
+void Out<NKikimr::NHttpProxy::THttpResponseData>(IOutputStream& o, const NKikimr::NHttpProxy::THttpResponseData& p);
diff --git a/ydb/core/http_proxy/http_service.cpp b/ydb/core/http_proxy/http_service.cpp
index 026c9c73247..a08d764bc55 100644
--- a/ydb/core/http_proxy/http_service.cpp
+++ b/ydb/core/http_proxy/http_service.cpp
@@ -100,10 +100,10 @@ namespace NKikimr::NHttpProxy {
try {
auto signature = context.GetSignature();
Processors->Execute(context.MethodName, std::move(context), std::move(signature), ctx);
- } catch (NKikimr::NSQS::TSQSException& e) {
+ } catch (const NKikimr::NSQS::TSQSException& e) {
context.ResponseData.Status = NYdb::EStatus::BAD_REQUEST;
context.ResponseData.ErrorText = e.what();
- context.DoReply(ctx);
+ context.DoReply(ctx, static_cast<size_t>(NYds::EErrorCodes::ACCESS_DENIED));
return;
}
}
diff --git a/ydb/core/http_proxy/json_proto_conversion.h b/ydb/core/http_proxy/json_proto_conversion.h
index f2601459ec5..e6c0d6d2119 100644
--- a/ydb/core/http_proxy/json_proto_conversion.h
+++ b/ydb/core/http_proxy/json_proto_conversion.h
@@ -8,6 +8,7 @@
#include <library/cpp/string_utils/base64/base64.h>
#include <ydb/library/naming_conventions/naming_conventions.h>
#include <ydb/public/sdk/cpp/client/ydb_datastreams/datastreams.h>
+#include <ydb/library/http_proxy/error/error.h>
#include <nlohmann/json.hpp>
@@ -139,11 +140,20 @@ inline void ProtoToJson(const NProtoBuf::Message& resp, NJson::TJsonValue& value
inline void JsonToProto(const NJson::TJsonValue& jsonValue, NProtoBuf::Message* message, ui32 depth = 0) {
Y_ENSURE(depth < 101, "Json depth is > 100");
- Y_ENSURE(jsonValue.IsMap(), "Top level of json value is not a map");
+ Y_ENSURE_EX(
+ !jsonValue.IsNull(),
+ NKikimr::NSQS::TSQSException(NKikimr::NSQS::NErrors::MISSING_PARAMETER) <<
+ "Top level of json value is not a map"
+ );
auto* desc = message->GetDescriptor();
auto* reflection = message->GetReflection();
for (const auto& [key, value] : jsonValue.GetMap()) {
auto* fieldDescriptor = desc->FindFieldByName(NNaming::CamelToSnakeCase(key));
+ Y_ENSURE_EX(
+ fieldDescriptor,
+ NKikimr::NSQS::TSQSException(NKikimr::NSQS::NErrors::INVALID_QUERY_PARAMETER) <<
+ "Unexpected json key: " << key
+ );
Y_ENSURE(fieldDescriptor, "Unexpected json key: " + key);
auto transformer = Ydb::DataStreams::V1::TRANSFORM_NONE;
if (fieldDescriptor->options().HasExtension(Ydb::DataStreams::V1::FieldTransformer)) {
@@ -293,7 +303,11 @@ inline void JsonToProto(const NJson::TJsonValue& jsonValue, NProtoBuf::Message*
inline void NlohmannJsonToProto(const nlohmann::json& jsonValue, NProtoBuf::Message* message, ui32 depth = 0) {
Y_ENSURE(depth < 101, "Json depth is > 100");
- Y_ENSURE(jsonValue.is_object(), "Top level of json value is not a map");
+ Y_ENSURE_EX(
+ !jsonValue.is_null(),
+ NKikimr::NSQS::TSQSException(NKikimr::NSQS::NErrors::MISSING_PARAMETER) <<
+ "Top level of json value is not a map"
+ );
auto* desc = message->GetDescriptor();
auto* reflection = message->GetReflection();
for (const auto& [key, value] : jsonValue.get<std::unordered_map<std::string, nlohmann::json>>()) {
diff --git a/ydb/core/public_http/CMakeLists.darwin.txt b/ydb/core/public_http/CMakeLists.darwin.txt
index b674ecc8872..adb69457848 100644
--- a/ydb/core/public_http/CMakeLists.darwin.txt
+++ b/ydb/core/public_http/CMakeLists.darwin.txt
@@ -28,6 +28,7 @@ target_link_libraries(ydb-core-public_http PUBLIC
core-viewer-json
yq-libs-result_formatter
yql-public-issue
+ cpp-client-ydb_types
)
target_sources(ydb-core-public_http PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/public_http/http_req.cpp
@@ -55,6 +56,7 @@ target_link_libraries(ydb-core-public_http.global PUBLIC
core-viewer-json
yq-libs-result_formatter
yql-public-issue
+ cpp-client-ydb_types
)
target_sources(ydb-core-public_http.global PRIVATE
${CMAKE_BINARY_DIR}/ydb/core/public_http/61d421c52ac59775eb29fb236d3d6f5a.cpp
diff --git a/ydb/core/public_http/CMakeLists.linux-aarch64.txt b/ydb/core/public_http/CMakeLists.linux-aarch64.txt
index 790494f5f93..50759814942 100644
--- a/ydb/core/public_http/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/public_http/CMakeLists.linux-aarch64.txt
@@ -29,6 +29,7 @@ target_link_libraries(ydb-core-public_http PUBLIC
core-viewer-json
yq-libs-result_formatter
yql-public-issue
+ cpp-client-ydb_types
)
target_sources(ydb-core-public_http PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/public_http/http_req.cpp
@@ -57,6 +58,7 @@ target_link_libraries(ydb-core-public_http.global PUBLIC
core-viewer-json
yq-libs-result_formatter
yql-public-issue
+ cpp-client-ydb_types
)
target_sources(ydb-core-public_http.global PRIVATE
${CMAKE_BINARY_DIR}/ydb/core/public_http/61d421c52ac59775eb29fb236d3d6f5a.cpp
diff --git a/ydb/core/public_http/CMakeLists.linux.txt b/ydb/core/public_http/CMakeLists.linux.txt
index 790494f5f93..50759814942 100644
--- a/ydb/core/public_http/CMakeLists.linux.txt
+++ b/ydb/core/public_http/CMakeLists.linux.txt
@@ -29,6 +29,7 @@ target_link_libraries(ydb-core-public_http PUBLIC
core-viewer-json
yq-libs-result_formatter
yql-public-issue
+ cpp-client-ydb_types
)
target_sources(ydb-core-public_http PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/public_http/http_req.cpp
@@ -57,6 +58,7 @@ target_link_libraries(ydb-core-public_http.global PUBLIC
core-viewer-json
yq-libs-result_formatter
yql-public-issue
+ cpp-client-ydb_types
)
target_sources(ydb-core-public_http.global PRIVATE
${CMAKE_BINARY_DIR}/ydb/core/public_http/61d421c52ac59775eb29fb236d3d6f5a.cpp
diff --git a/ydb/core/public_http/http_req.cpp b/ydb/core/public_http/http_req.cpp
index f98eef0da4c..0444ade1957 100644
--- a/ydb/core/public_http/http_req.cpp
+++ b/ydb/core/public_http/http_req.cpp
@@ -1,6 +1,7 @@
#include "http_req.h"
#include <library/cpp/actors/http/http_proxy.h>
+#include <ydb/public/sdk/cpp/client/ydb_types/status_codes.h>
#include <ydb/core/http_proxy/http_req.h>
#include <util/generic/guid.h>
@@ -12,10 +13,91 @@ namespace NKikimr::NPublicHttp {
constexpr TStringBuf REQUEST_CONTENT_TYPE_HEADER = "content-type";
constexpr TStringBuf REQUEST_FORWARDED_FOR = "x-forwarded-for";
constexpr TStringBuf IDEMPOTENCY_KEY_HEADER = "idempotency-key";
-
+
constexpr TStringBuf APPLICATION_JSON = "application/json";
constexpr TStringBuf TEXT_PLAIN_UTF8 = "text/plain;charset=UTF-8";
+
+ TString StatusToErrorType(NYdb::EStatus status) {
+ switch(status) {
+ case NYdb::EStatus::SUCCESS:
+ return "OK";
+ case NYdb::EStatus::BAD_REQUEST:
+ return "InvalidParameterValueException"; //TODO: bring here issues and parse from them
+ case NYdb::EStatus::CLIENT_UNAUTHENTICATED:
+ case NYdb::EStatus::UNAUTHORIZED:
+ return "AccessDeniedException";
+ case NYdb::EStatus::INTERNAL_ERROR:
+ return "InternalFailureException";
+ case NYdb::EStatus::ABORTED:
+ return "RequestExpiredException"; //TODO: find better code
+ case NYdb::EStatus::UNAVAILABLE:
+ return "ServiceUnavailableException";
+ case NYdb::EStatus::OVERLOADED:
+ return "ThrottlingException";
+ case NYdb::EStatus::SCHEME_ERROR:
+ return "ResourceNotFoundException";
+ case NYdb::EStatus::GENERIC_ERROR:
+ return "InternalFailureException"; //TODO: find better code
+ case NYdb::EStatus::TIMEOUT:
+ return "RequestTimeoutException";
+ case NYdb::EStatus::BAD_SESSION:
+ return "AccessDeniedException";
+ case NYdb::EStatus::PRECONDITION_FAILED:
+ case NYdb::EStatus::ALREADY_EXISTS:
+ return "ValidationErrorException"; //TODO: find better code
+ case NYdb::EStatus::NOT_FOUND:
+ return "ResourceNotFoundException";
+ case NYdb::EStatus::SESSION_EXPIRED:
+ return "AccessDeniedException";
+ case NYdb::EStatus::UNSUPPORTED:
+ return "InvalidActionException";
+ default:
+ return "InternalFailureException";
+ }
+
+ }
+
+ HttpCodes StatusToHttpCode(NYdb::EStatus status) {
+ switch(status) {
+ case NYdb::EStatus::SUCCESS:
+ return HTTP_OK;
+ case NYdb::EStatus::UNSUPPORTED:
+ case NYdb::EStatus::BAD_REQUEST:
+ return HTTP_BAD_REQUEST;
+ case NYdb::EStatus::CLIENT_UNAUTHENTICATED:
+ case NYdb::EStatus::UNAUTHORIZED:
+ return HTTP_FORBIDDEN;
+ case NYdb::EStatus::INTERNAL_ERROR:
+ return HTTP_INTERNAL_SERVER_ERROR;
+ case NYdb::EStatus::ABORTED:
+ return HTTP_CONFLICT;
+ case NYdb::EStatus::UNAVAILABLE:
+ return HTTP_SERVICE_UNAVAILABLE;
+ case NYdb::EStatus::OVERLOADED:
+ return HTTP_BAD_REQUEST;
+ case NYdb::EStatus::SCHEME_ERROR:
+ return HTTP_NOT_FOUND;
+ case NYdb::EStatus::GENERIC_ERROR:
+ return HTTP_BAD_REQUEST;
+ case NYdb::EStatus::TIMEOUT:
+ return HTTP_GATEWAY_TIME_OUT;
+ case NYdb::EStatus::BAD_SESSION:
+ return HTTP_UNAUTHORIZED;
+ case NYdb::EStatus::PRECONDITION_FAILED:
+ return HTTP_PRECONDITION_FAILED;
+ case NYdb::EStatus::ALREADY_EXISTS:
+ return HTTP_CONFLICT;
+ case NYdb::EStatus::NOT_FOUND:
+ return HTTP_NOT_FOUND;
+ case NYdb::EStatus::SESSION_EXPIRED:
+ return HTTP_UNAUTHORIZED;
+ default:
+ return HTTP_INTERNAL_SERVER_ERROR;
+ }
+ }
+
+
TString GenerateRequestId(const TString& sourceReqId) {
if (sourceReqId.empty()) {
return CreateGuidAsString();
@@ -98,8 +180,8 @@ namespace NKikimr::NPublicHttp {
void THttpRequestContext::DoResponseBadRequest(Ydb::StatusIds::StatusCode status, const TString& errorText, TStringBuf contentType) const {
const NYdb::EStatus ydbStatus = static_cast<NYdb::EStatus>(status);
- const TString httpCodeStr = ToString((int)NKikimr::NHttpProxy::StatusToHttpCode(ydbStatus));
- DoResponse(httpCodeStr, NKikimr::NHttpProxy::StatusToErrorType(ydbStatus), errorText, contentType);
+ const TString httpCodeStr = ToString((int)StatusToHttpCode(ydbStatus));
+ DoResponse(httpCodeStr, StatusToErrorType(ydbStatus), errorText, contentType);
}
void THttpRequestContext::ResponseOK() const {
diff --git a/ydb/public/api/protos/persqueue_error_codes_v1.proto b/ydb/public/api/protos/persqueue_error_codes_v1.proto
index 0db64f69fed..3620c4799bd 100644
--- a/ydb/public/api/protos/persqueue_error_codes_v1.proto
+++ b/ydb/public/api/protos/persqueue_error_codes_v1.proto
@@ -44,4 +44,8 @@ enum ErrorCode {
SET_OFFSET_ERROR_COMMIT_TO_PAST = 500025;
ERROR = 500100;
+
+ INVALID_ARGUMENT = 500040;
+ VALIDATION_ERROR = 500080;
+
}
diff --git a/ydb/services/datastreams/datastreams_codes.h b/ydb/services/datastreams/datastreams_codes.h
new file mode 100644
index 00000000000..f1435a90a83
--- /dev/null
+++ b/ydb/services/datastreams/datastreams_codes.h
@@ -0,0 +1,35 @@
+#pragma once
+
+#include <util/stream/output.h>
+#include <util/system/types.h>
+
+
+namespace NYds {
+
+enum class EErrorCodes : size_t {
+ // Server statuses
+ OK = 0, // compatible with PersQueue::ErrorCode
+
+ BAD_REQUEST = 500003, // compatible with PersQueue::ErrorCode
+ ERROR = 500100, // compatible with PersQueue::ErrorCode
+ ACCESS_DENIED = 500018, // compatible with PersQueue::ErrorCode
+
+ GENERIC_ERROR = 500030,
+ INVALID_ARGUMENT = 500040,
+ MISSING_PARAMETER = 500050,
+ NOT_FOUND = 500060,
+ IN_USE = 500070,
+
+ VALIDATION_ERROR = 500080,
+ MISSING_ACTION = 500090,
+
+ INVALID_PARAMETER_COMBINATION = 500110,
+
+ EXPIRED_ITERATOR = 500120,
+ EXPIRED_TOKEN = 500130,
+
+ INCOMPLETE_SIGNATURE = 500140,
+ MISSING_AUTHENTICATION_TOKEN = 500150,
+};
+
+}
diff --git a/ydb/services/datastreams/datastreams_proxy.cpp b/ydb/services/datastreams/datastreams_proxy.cpp
index 6f56ad5bb0a..a12a8e1288b 100644
--- a/ydb/services/datastreams/datastreams_proxy.cpp
+++ b/ydb/services/datastreams/datastreams_proxy.cpp
@@ -1,4 +1,5 @@
#include "datastreams_proxy.h"
+#include "datastreams_codes.h"
#include "put_records_actor.h"
#include "shard_iterator.h"
#include "next_token.h"
@@ -34,7 +35,6 @@ namespace NKikimr::NDataStreams::V1 {
using namespace NGRpcService;
using namespace NGRpcProxy::V1;
-
namespace {
template <class TRequest>
@@ -64,6 +64,7 @@ namespace NKikimr::NDataStreams::V1 {
}
}
+
class TCreateStreamActor : public TPQGrpcSchemaBase<TCreateStreamActor, NKikimr::NGRpcService::TEvDataStreamsCreateStreamRequest> {
using TBase = TPQGrpcSchemaBase<TCreateStreamActor, TEvDataStreamsCreateStreamRequest>;
using TProtoRequest = typename TBase::TProtoRequest;
@@ -138,13 +139,13 @@ namespace NKikimr::NDataStreams::V1 {
topicRequest.set_metering_mode(Ydb::Topic::METERING_MODE_REQUEST_UNITS);
break;
default:
- return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, Ydb::PersQueue::ErrorCode::BAD_REQUEST,
+ return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, static_cast<size_t>(NYds::EErrorCodes::INVALID_ARGUMENT),
"streams can't be created with unknown metering mode", ctx);
}
}
} else {
if (GetProtoRequest()->has_stream_mode_details()) {
- return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, Ydb::PersQueue::ErrorCode::BAD_REQUEST,
+ return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, static_cast<size_t>(NYds::EErrorCodes::INVALID_ARGUMENT),
"streams can't be created with metering mode", ctx);
}
}
@@ -160,11 +161,10 @@ namespace NKikimr::NDataStreams::V1 {
pqDescr->SetPartitionPerTablet(1);
TString error;
- auto status = NKikimr::NGRpcProxy::V1::FillProposeRequestImpl(name, topicRequest, modifyScheme, ctx, error,
+ TYdbPqCodes codes = NKikimr::NGRpcProxy::V1::FillProposeRequestImpl(name, topicRequest, modifyScheme, ctx, error,
workingDir, proposal.Record.GetDatabaseName());
-
- if (status != Ydb::StatusIds::SUCCESS) {
- return ReplyWithError(status, Ydb::PersQueue::ErrorCode::BAD_REQUEST, error, ctx);
+ if (codes.YdbCode != Ydb::StatusIds::SUCCESS) {
+ return ReplyWithError(codes.YdbCode, codes.PQCode, error, ctx);
}
}
@@ -175,7 +175,7 @@ namespace NKikimr::NDataStreams::V1 {
&& msg->Record.GetSchemeShardStatus() == NKikimrScheme::EStatus::StatusAlreadyExists)
{
return ReplyWithError(Ydb::StatusIds::ALREADY_EXISTS,
- Ydb::PersQueue::ErrorCode::ERROR,
+ static_cast<size_t>(NYds::EErrorCodes::IN_USE),
TStringBuilder() << "Stream with name " << GetProtoRequest()->stream_name() << " is already exists",
ctx);
}
@@ -245,7 +245,7 @@ namespace NKikimr::NDataStreams::V1 {
const auto& readRules = pqGroupDescription.GetPQTabletConfig().GetReadRules();
if (readRules.size() > 0 && EnforceDeletion == false) {
- return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, Ydb::PersQueue::ErrorCode::ERROR,
+ return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, static_cast<size_t>(NYds::EErrorCodes::IN_USE),
TStringBuilder() << "Stream has registered consumers" <<
"and EnforceConsumerDeletion flag is false", ctx);
}
@@ -286,7 +286,7 @@ namespace NKikimr::NDataStreams::V1 {
TString error;
if (!ValidateShardsCount(*GetProtoRequest(), pqGroupDescription, error)) {
- return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, Ydb::PersQueue::ErrorCode::BAD_REQUEST, error, ctx);
+ return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, static_cast<size_t>(NYds::EErrorCodes::INVALID_ARGUMENT), error, ctx);
}
groupConfig.SetTotalGroupCount(GetProtoRequest()->target_shard_count());
@@ -325,7 +325,7 @@ namespace NKikimr::NDataStreams::V1 {
Y_UNUSED(selfInfo);
Y_UNUSED(pqGroupDescription);
if (!AppData(ctx)->PQConfig.GetBillingMeteringConfig().GetEnabled()) {
- return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, Ydb::PersQueue::ErrorCode::BAD_REQUEST,
+ return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, static_cast<size_t>(NYds::EErrorCodes::INVALID_ARGUMENT),
"streams can't be created with metering mode", ctx);
}
@@ -337,7 +337,7 @@ namespace NKikimr::NDataStreams::V1 {
groupConfig.MutablePQTabletConfig()->SetMeteringMode(NKikimrPQ::TPQTabletConfig::METERING_MODE_REQUEST_UNITS);
break;
default:
- return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, Ydb::PersQueue::ErrorCode::BAD_REQUEST,
+ return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, static_cast<size_t>(NYds::EErrorCodes::INVALID_ARGUMENT),
"streams can't be created with unknown metering mode", ctx);
}
}
@@ -379,7 +379,7 @@ namespace NKikimr::NDataStreams::V1 {
TString error;
if (!ValidateShardsCount(*GetProtoRequest(), pqGroupDescription, error))
{
- return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, Ydb::PersQueue::ErrorCode::BAD_REQUEST, error, ctx);
+ return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, static_cast<size_t>(NYds::EErrorCodes::BAD_REQUEST), error, ctx);
}
groupConfig.SetTotalGroupCount(GetProtoRequest()->target_shard_count());
@@ -406,7 +406,7 @@ namespace NKikimr::NDataStreams::V1 {
if (GetProtoRequest()->has_stream_mode_details()) {
if (!AppData(ctx)->PQConfig.GetBillingMeteringConfig().GetEnabled()) {
- return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, Ydb::PersQueue::ErrorCode::BAD_REQUEST,
+ return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, static_cast<size_t>(NYds::EErrorCodes::INVALID_ARGUMENT),
"streams can't be created with metering mode", ctx);
}
@@ -418,15 +418,17 @@ namespace NKikimr::NDataStreams::V1 {
groupConfig.MutablePQTabletConfig()->SetMeteringMode(NKikimrPQ::TPQTabletConfig::METERING_MODE_REQUEST_UNITS);
break;
default:
- return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, Ydb::PersQueue::ErrorCode::BAD_REQUEST,
+ return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, static_cast<size_t>(NYds::EErrorCodes::INVALID_ARGUMENT),
"streams can't be created with unknown metering mode", ctx);
}
}
auto serviceTypes = GetSupportedClientServiceTypes(ctx);
- auto status = CheckConfig(*pqConfig, serviceTypes, error, ctx);
+ auto status = CheckConfig(*pqConfig, serviceTypes, error, ctx, Ydb::StatusIds::ALREADY_EXISTS);
if (status != Ydb::StatusIds::SUCCESS) {
- return ReplyWithError(status, Ydb::PersQueue::ErrorCode::BAD_REQUEST, error, ctx);
+ return ReplyWithError(status, status == Ydb::StatusIds::ALREADY_EXISTS ? static_cast<size_t>(NYds::EErrorCodes::IN_USE) :
+ static_cast<size_t>(NYds::EErrorCodes::VALIDATION_ERROR),
+ error, ctx);
}
}
@@ -472,9 +474,11 @@ namespace NKikimr::NDataStreams::V1 {
pqConfig->MutablePartitionConfig()->SetBurstSize(GetProtoRequest()->write_quota_kb_per_sec() * 1_KB);
auto serviceTypes = GetSupportedClientServiceTypes(ctx);
- auto status = CheckConfig(*pqConfig, serviceTypes, error, ctx);
+ auto status = CheckConfig(*pqConfig, serviceTypes, error, ctx, Ydb::StatusIds::ALREADY_EXISTS);
if (status != Ydb::StatusIds::SUCCESS) {
- return ReplyWithError(status, Ydb::PersQueue::ErrorCode::BAD_REQUEST, error, ctx);
+ return ReplyWithError(status, status == Ydb::StatusIds::ALREADY_EXISTS? static_cast<size_t>(NYds::EErrorCodes::IN_USE) :
+ static_cast<size_t>(NYds::EErrorCodes::VALIDATION_ERROR),
+ error, ctx);
}
}
@@ -532,11 +536,12 @@ namespace NKikimr::NDataStreams::V1 {
pqConfig->MutablePartitionConfig()->SetLifetimeSeconds(newLifetime);
auto serviceTypes = GetSupportedClientServiceTypes(ctx);
- status = CheckConfig(*pqConfig, serviceTypes, error, ctx);
+ status = CheckConfig(*pqConfig, serviceTypes, error, ctx, Ydb::StatusIds::ALREADY_EXISTS);
}
if (status != Ydb::StatusIds::SUCCESS) {
- return TBase::ReplyWithError(status, Ydb::PersQueue::ErrorCode::BAD_REQUEST, error, ctx);
+ return TBase::ReplyWithError(status, status == Ydb::StatusIds::ALREADY_EXISTS ? static_cast<size_t>(NYds::EErrorCodes::IN_USE) :
+ static_cast<size_t>(NYds::EErrorCodes::VALIDATION_ERROR), error, ctx);
}
}
@@ -563,13 +568,13 @@ namespace NKikimr::NDataStreams::V1 {
void Handle(TEvTabletPipe::TEvClientConnected::TPtr& ev, const TActorContext& ctx) {
if (ev->Get()->Status != NKikimrProto::EReplyStatus::OK) {
- ReplyWithError(Ydb::StatusIds::INTERNAL_ERROR, Ydb::PersQueue::ErrorCode::ERROR,
+ ReplyWithError(Ydb::StatusIds::INTERNAL_ERROR, static_cast<size_t>(NYds::EErrorCodes::ERROR),
TStringBuilder() << "Cannot connect to tablet " << ev->Get()->TabletId, ctx);
}
}
void Handle(TEvTabletPipe::TEvClientDestroyed::TPtr& ev, const TActorContext& ctx) {
- ReplyWithError(Ydb::StatusIds::INTERNAL_ERROR, Ydb::PersQueue::ErrorCode::ERROR,
+ ReplyWithError(Ydb::StatusIds::INTERNAL_ERROR, static_cast<size_t>(NYds::EErrorCodes::ERROR),
TStringBuilder() << "Cannot connect to tablet " << ev->Get()->TabletId, ctx);
}
@@ -667,6 +672,7 @@ namespace NKikimr::NDataStreams::V1 {
ui32 writeSpeed = pqConfig.GetPartitionConfig().GetWriteSpeedInBytesPerSecond() / 1_KB;
auto& description = *result.mutable_stream_description();
description.set_stream_name(GetProtoRequest()->stream_name());
+ description.set_stream_arn(GetProtoRequest()->stream_name()); // Added by lpetrov02 for testing
ui32 retentionPeriodHours = TInstant::Seconds(pqConfig.GetPartitionConfig().GetLifetimeSeconds()).Hours();
description.set_retention_period_hours(retentionPeriodHours);
description.set_write_quota_kb_per_sec(writeSpeed);
@@ -727,7 +733,6 @@ namespace NKikimr::NDataStreams::V1 {
//-----------------------------------------------------------------------------------
-
class TListStreamsActor : public TRpcSchemeRequestActor<TListStreamsActor, NKikimr::NGRpcService::TEvDataStreamsListStreamsRequest> {
using TBase = TRpcSchemeRequestActor<TListStreamsActor, TEvDataStreamsListStreamsRequest>;
@@ -745,9 +750,10 @@ namespace NKikimr::NDataStreams::V1 {
void SendPendingRequests(const TActorContext& ctx);
void SendResponse(const TActorContext& ctx);
- void ReplyWithError(Ydb::StatusIds::StatusCode status, Ydb::PersQueue::ErrorCode::ErrorCode pqStatus,
+ void ReplyWithError(Ydb::StatusIds::StatusCode status, NYds::EErrorCodes errorCode,
const TString& messageText, const NActors::TActorContext& ctx) {
- this->Request_->RaiseIssue(FillIssue(messageText, pqStatus));
+
+ this->Request_->RaiseIssue(FillIssue(messageText, static_cast<size_t>(errorCode)));
this->Request_->ReplyWithYdbStatus(status);
this->Die(ctx);
}
@@ -768,13 +774,13 @@ namespace NKikimr::NDataStreams::V1 {
void TListStreamsActor::Bootstrap(const NActors::TActorContext& ctx) {
TBase::Bootstrap(ctx);
if (!Request_->GetDatabaseName()) {
- return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, Ydb::PersQueue::ErrorCode::BAD_REQUEST,
+ return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, NYds::EErrorCodes::INVALID_ARGUMENT,
"Request without dabase is forbiden", ctx);
}
if (this->Request_->GetInternalToken().empty()) {
if (AppData(ctx)->PQConfig.GetRequireCredentialsInNewProtocol()) {
- return ReplyWithError(Ydb::StatusIds::UNAUTHORIZED, Ydb::PersQueue::ErrorCode::ACCESS_DENIED,
+ return ReplyWithError(Ydb::StatusIds::UNAUTHORIZED, NYds::EErrorCodes::BAD_REQUEST,
"Unauthenticated access is forbidden, please provide credentials", ctx);
}
}
@@ -803,6 +809,7 @@ namespace NKikimr::NDataStreams::V1 {
int limit = GetProtoRequest()->limit() == 0 ? 100 : GetProtoRequest()->limit();
if (limit > 10000) {
+ Request_->RaiseIssue(FillIssue("'Limit' shoud not be higher than 10000", static_cast<size_t>(NYds::EErrorCodes::VALIDATION_ERROR)));
Request_->ReplyWithYdbStatus(Ydb::StatusIds::BAD_REQUEST);
return Die(ctx);
}
@@ -935,18 +942,26 @@ namespace NKikimr::NDataStreams::V1 {
void TListStreamConsumersActor::Bootstrap(const NActors::TActorContext& ctx) {
TBase::Bootstrap(ctx);
+ if (!GetProtoRequest()->next_token().empty() && !GetProtoRequest()->stream_arn().empty()) {
+ return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, static_cast<size_t>(NYds::EErrorCodes::INVALID_PARAMETER_COMBINATION),
+ TStringBuilder() << "StreamArn and NextToken can not be provided together", ctx);
+ }
+ if (NextToken.IsExpired()) {
+ return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, static_cast<size_t>(NYds::EErrorCodes::EXPIRED_TOKEN),
+ TStringBuilder() << "Provided NextToken is expired", ctx);
+ }
+ if (!NextToken.IsValid()) {
+ return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, static_cast<size_t>(NYds::EErrorCodes::INVALID_ARGUMENT),
+ TStringBuilder() << "Provided NextToken is malformed", ctx);
+ }
+
auto maxResultsInRange = MIN_MAX_RESULTS <= MaxResults && MaxResults <= MAX_MAX_RESULTS;
if (!maxResultsInRange) {
- return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, Ydb::PersQueue::ErrorCode::ERROR,
+ return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, static_cast<size_t>(NYds::EErrorCodes::VALIDATION_ERROR),
TStringBuilder() << "Requested max_result value '" << MaxResults <<
"' is out of range [" << MIN_MAX_RESULTS << ", " << MAX_MAX_RESULTS <<
"]", ctx);
}
-
- if (!NextToken.IsValid()) {
- return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, Ydb::PersQueue::ErrorCode::ERROR,
- TStringBuilder() << "Provided NextToken has expired or malformed", ctx);
- }
SendDescribeProposeRequest(ctx);
Become(&TListStreamConsumersActor::StateWork);
}
@@ -974,7 +989,7 @@ namespace NKikimr::NDataStreams::V1 {
const auto alreadyRead = NextToken.GetAlreadyRead();
if (alreadyRead > (ui32)streamReadRulesNames.size()) {
- return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, Ydb::PersQueue::ErrorCode::ERROR,
+ return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, static_cast<size_t>(NYds::EErrorCodes::INVALID_ARGUMENT),
TStringBuilder() << "Provided next_token is malformed - " <<
"everything is already read", ctx);
}
@@ -1065,15 +1080,22 @@ namespace NKikimr::NDataStreams::V1 {
readRule.set_version(selfInfo.GetVersion().GetPQVersion());
}
auto serviceTypes = GetSupportedClientServiceTypes(ctx);
- TString error = AddReadRuleToConfig(pqConfig, readRule, serviceTypes, ctx);
- auto status = error.empty() ? CheckConfig(*pqConfig, serviceTypes, error, ctx, Ydb::StatusIds::ALREADY_EXISTS)
- : Ydb::StatusIds::BAD_REQUEST;
+ auto messageAndCode = AddReadRuleToConfig(pqConfig, readRule, serviceTypes, ctx);
+ size_t issueCode = static_cast<size_t>(messageAndCode.PQCode);
+
+ Ydb::StatusIds::StatusCode status;
+ if (messageAndCode.PQCode == Ydb::PersQueue::ErrorCode::OK) {
+ status = CheckConfig(*pqConfig, serviceTypes, messageAndCode.Message, ctx, Ydb::StatusIds::ALREADY_EXISTS);
+ if (status == Ydb::StatusIds::ALREADY_EXISTS) {
+ issueCode = static_cast<size_t>(NYds::EErrorCodes::IN_USE);
+ }
+ } else {
+ status = Ydb::StatusIds::BAD_REQUEST;
+ }
+
if (status != Ydb::StatusIds::SUCCESS) {
- return ReplyWithError(status,
- status == Ydb::StatusIds::ALREADY_EXISTS ? Ydb::PersQueue::ErrorCode::OK
- : Ydb::PersQueue::ErrorCode::BAD_REQUEST,
- error, ctx);
+ return ReplyWithError(status, issueCode, messageAndCode.Message, ctx);
}
}
@@ -1136,7 +1158,7 @@ namespace NKikimr::NDataStreams::V1 {
ctx
);
if (!error.Empty()) {
- return ReplyWithError(Ydb::StatusIds::NOT_FOUND, Ydb::PersQueue::ErrorCode::BAD_REQUEST, error, ctx);
+ return ReplyWithError(Ydb::StatusIds::NOT_FOUND, static_cast<size_t>(NYds::EErrorCodes::NOT_FOUND), error, ctx);
}
}
@@ -1188,7 +1210,7 @@ namespace NKikimr::NDataStreams::V1 {
case TIteratorType::AT_SEQUENCE_NUMBER: {
auto sn = SequenceNumberToInt(GetProtoRequest()->starting_sequence_number());
if (!sn) {
- return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, Ydb::PersQueue::ErrorCode::BAD_REQUEST,
+ return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, static_cast<size_t>(NYds::EErrorCodes::INVALID_ARGUMENT),
TStringBuilder() << "Malformed sequence number", ctx);
}
SequenceNumber = sn.value() + (IteratorType == TIteratorType::AFTER_SEQUENCE_NUMBER ? 1u : 0u);
@@ -1197,7 +1219,7 @@ namespace NKikimr::NDataStreams::V1 {
case TIteratorType::AT_TIMESTAMP:
if (GetProtoRequest()->timestamp() == 0 ||
GetProtoRequest()->timestamp() > static_cast<i64>(TInstant::Now().MilliSeconds())) {
- return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, Ydb::PersQueue::ErrorCode::BAD_REQUEST,
+ return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, static_cast<size_t>(NYds::EErrorCodes::INVALID_ARGUMENT),
TStringBuilder() << "Shard iterator type is AT_TIMESTAMP, " <<
"but timestamp is either missed or too old or in future", ctx);
}
@@ -1210,7 +1232,7 @@ namespace NKikimr::NDataStreams::V1 {
ReadTimestampMs = TInstant::Now().MilliSeconds();
break;
default:
- return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, Ydb::PersQueue::ErrorCode::BAD_REQUEST,
+ return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, static_cast<size_t>(NYds::EErrorCodes::INVALID_ARGUMENT),
TStringBuilder() << "Shard iterator type '" <<
(ui32)IteratorType << "' is not known", ctx);
@@ -1240,7 +1262,7 @@ namespace NKikimr::NDataStreams::V1 {
if (!topicInfo->SecurityObject->CheckAccess(NACLib::EAccessRights::SelectRow,
token)) {
return this->ReplyWithError(Ydb::StatusIds::UNAUTHORIZED,
- Ydb::PersQueue::ErrorCode::ACCESS_DENIED,
+ static_cast<size_t>(NYds::EErrorCodes::ACCESS_DENIED),
TStringBuilder() << "Access to stream "
<< this->GetProtoRequest()->stream_name()
<< " is denied for subject "
@@ -1262,7 +1284,7 @@ namespace NKikimr::NDataStreams::V1 {
}
}
- ReplyWithError(Ydb::StatusIds::BAD_REQUEST, Ydb::PersQueue::ErrorCode::ERROR,
+ ReplyWithError(Ydb::StatusIds::BAD_REQUEST, static_cast<size_t>(NYds::EErrorCodes::NOT_FOUND),
TStringBuilder() << "No such shard: " << ShardId, ctx);
}
@@ -1335,14 +1357,18 @@ namespace NKikimr::NDataStreams::V1 {
void TGetRecordsActor::Bootstrap(const NActors::TActorContext& ctx) {
TBase::Bootstrap(ctx);
+ if (ShardIterator.IsExpired()) {
+ return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, static_cast<size_t>(NYds::EErrorCodes::EXPIRED_ITERATOR),
+ TStringBuilder() << "Provided shard iterator is expired", ctx);
+ }
if (!ShardIterator.IsValid()) {
- return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, Ydb::PersQueue::ErrorCode::ERROR,
- TStringBuilder() << "Provided shard iterator is malformed or expired", ctx);
+ return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, static_cast<size_t>(NYds::EErrorCodes::INVALID_ARGUMENT),
+ TStringBuilder() << "Provided shard iterator is malformed", ctx);
}
Limit = Limit == 0 ? MAX_LIMIT : Limit;
if (Limit < 1 || Limit > MAX_LIMIT) {
- return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, Ydb::PersQueue::ErrorCode::ERROR,
+ return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, static_cast<size_t>(NYds::EErrorCodes::VALIDATION_ERROR),
TStringBuilder() << "Limit '" << Limit << "' is out of bounds [1; " << MAX_LIMIT << "]", ctx);
}
@@ -1402,7 +1428,7 @@ namespace NKikimr::NDataStreams::V1 {
if (!response.SecurityObject->CheckAccess(NACLib::EAccessRights::SelectRow,
token)) {
return ReplyWithError(Ydb::StatusIds::UNAUTHORIZED,
- Ydb::PersQueue::ErrorCode::ACCESS_DENIED,
+ static_cast<size_t>(NYds::EErrorCodes::ACCESS_DENIED),
TStringBuilder() << "Access to stream "
<< ShardIterator.GetStreamName()
<< " is denied for subject "
@@ -1423,7 +1449,7 @@ namespace NKikimr::NDataStreams::V1 {
}
}
- ReplyWithError(Ydb::StatusIds::BAD_REQUEST, Ydb::PersQueue::ErrorCode::ERROR,
+ ReplyWithError(Ydb::StatusIds::BAD_REQUEST, static_cast<size_t>(NYds::EErrorCodes::NOT_FOUND),
TStringBuilder() << "No such shard: " << ShardIterator.GetShardId(), ctx);
}
@@ -1445,7 +1471,7 @@ namespace NKikimr::NDataStreams::V1 {
return;
default:
return ReplyWithError(ConvertPersQueueInternalCodeToStatus(record.GetErrorCode()),
- Ydb::PersQueue::ErrorCode::ERROR,
+ static_cast<size_t>(NYds::EErrorCodes::ERROR),
record.GetErrorReason(), ctx);
}
break;
@@ -1490,13 +1516,13 @@ namespace NKikimr::NDataStreams::V1 {
void TGetRecordsActor::Handle(TEvTabletPipe::TEvClientConnected::TPtr& ev, const TActorContext& ctx) {
if (ev->Get()->Status != NKikimrProto::EReplyStatus::OK) {
- ReplyWithError(Ydb::StatusIds::INTERNAL_ERROR, Ydb::PersQueue::ErrorCode::ERROR,
+ ReplyWithError(Ydb::StatusIds::INTERNAL_ERROR, static_cast<size_t>(NYds::EErrorCodes::ERROR),
TStringBuilder() << "Cannot connect to tablet " << ev->Get()->TabletId, ctx);
}
}
void TGetRecordsActor::Handle(TEvTabletPipe::TEvClientDestroyed::TPtr& ev, const TActorContext& ctx) {
- ReplyWithError(Ydb::StatusIds::INTERNAL_ERROR, Ydb::PersQueue::ErrorCode::ERROR,
+ ReplyWithError(Ydb::StatusIds::INTERNAL_ERROR, static_cast<size_t>(NYds::EErrorCodes::ERROR),
TStringBuilder() << "Cannot connect to tablet " << ev->Get()->TabletId, ctx);
}
@@ -1594,22 +1620,35 @@ namespace NKikimr::NDataStreams::V1 {
void TListShardsActor::Bootstrap(const NActors::TActorContext& ctx) {
TBase::Bootstrap(ctx);
+ if (!GetProtoRequest()->next_token().empty() && !GetProtoRequest()->stream_name().empty()) {
+ return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, static_cast<size_t>(NYds::EErrorCodes::INVALID_PARAMETER_COMBINATION),
+ TStringBuilder() << "StreamName and NextToken can not be provided together", ctx);
+ }
+ if (NextToken.IsExpired()) {
+ return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, static_cast<size_t>(NYds::EErrorCodes::EXPIRED_TOKEN),
+ TStringBuilder() << "Provided next token is expired", ctx);
+ }
+ if (!NextToken.IsValid()) {
+ return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, static_cast<size_t>(NYds::EErrorCodes::INVALID_ARGUMENT),
+ TStringBuilder() << "Provided next token is malformed", ctx);
+ }
+
if (!TShardFilter::ShardFilterType_IsValid(ShardFilter.type())) {
- return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, Ydb::PersQueue::ErrorCode::BAD_REQUEST,
+ return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, static_cast<size_t>(NYds::EErrorCodes::INVALID_ARGUMENT),
TStringBuilder() << "Shard filter '" <<
(ui32)ShardFilter.type() << "' is not known", ctx);
}
MaxResults = MaxResults == 0 ? DEFAULT_MAX_RESULTS : MaxResults;
if (MaxResults > MAX_MAX_RESULTS) {
- return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, Ydb::PersQueue::ErrorCode::BAD_REQUEST,
+ return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, static_cast<size_t>(NYds::EErrorCodes::VALIDATION_ERROR),
TStringBuilder() << "Max results '" << MaxResults <<
"' is out of bound [" << MIN_MAX_RESULTS << "; " <<
MAX_MAX_RESULTS << "]", ctx);
}
if (ShardFilter.type() == TShardFilter::AFTER_SHARD_ID && ShardFilter.shard_id() == "") {
- return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, Ydb::PersQueue::ErrorCode::BAD_REQUEST,
+ return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, static_cast<size_t>(NYds::EErrorCodes::MISSING_PARAMETER),
TStringBuilder() << "Shard filter type is AFTER_SHARD_ID," <<
" but no ShardId provided", ctx);
}
@@ -1640,7 +1679,7 @@ namespace NKikimr::NDataStreams::V1 {
if (!topicInfo.SecurityObject->CheckAccess(NACLib::EAccessRights::SelectRow,
token)) {
return this->ReplyWithError(Ydb::StatusIds::UNAUTHORIZED,
- Ydb::PersQueue::ErrorCode::ACCESS_DENIED,
+ static_cast<size_t>(NYds::EErrorCodes::ACCESS_DENIED),
TStringBuilder() << "Access to stream "
<< this->GetProtoRequest()->stream_name()
<< " is denied for subject "
@@ -1699,7 +1738,7 @@ namespace NKikimr::NDataStreams::V1 {
const auto alreadyRead = NextToken.GetAlreadyRead();
if (alreadyRead > (ui32)partitions.size()) {
- return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, Ydb::PersQueue::ErrorCode::ERROR,
+ return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, static_cast<size_t>(NYds::EErrorCodes::INVALID_ARGUMENT),
TStringBuilder() << "Provided next_token is malformed - "
"everything is already read", ctx);
}
@@ -1759,13 +1798,13 @@ namespace NKikimr::NDataStreams::V1 {
void TListShardsActor::Handle(TEvTabletPipe::TEvClientConnected::TPtr& ev, const TActorContext& ctx) {
if (ev->Get()->Status != NKikimrProto::EReplyStatus::OK) {
- ReplyWithError(Ydb::StatusIds::INTERNAL_ERROR, Ydb::PersQueue::ErrorCode::ERROR,
+ ReplyWithError(Ydb::StatusIds::INTERNAL_ERROR, static_cast<size_t>(NYds::EErrorCodes::ERROR),
TStringBuilder() << "Cannot connect to tablet " << ev->Get()->TabletId, ctx);
}
}
void TListShardsActor::Handle(TEvTabletPipe::TEvClientDestroyed::TPtr& ev, const TActorContext& ctx) {
- ReplyWithError(Ydb::StatusIds::INTERNAL_ERROR, Ydb::PersQueue::ErrorCode::ERROR,
+ ReplyWithError(Ydb::StatusIds::INTERNAL_ERROR, static_cast<size_t>(NYds::EErrorCodes::ERROR),
TStringBuilder() << "Cannot connect to tablet " << ev->Get()->TabletId, ctx);
}
@@ -1906,7 +1945,7 @@ namespace NKikimr::NDataStreams::V1 {
void Bootstrap(const NActors::TActorContext& ctx) {
TBase::Bootstrap(ctx);
- this->Request_->RaiseIssue(FillIssue("Method is not implemented yet", Ydb::PersQueue::ErrorCode::ErrorCode::ERROR));
+ this->Request_->RaiseIssue(FillIssue("Method is not implemented yet", static_cast<size_t>(NYds::EErrorCodes::ERROR)));
this->Request_->ReplyWithYdbStatus(Ydb::StatusIds::UNSUPPORTED);
this->Die(ctx);
}
diff --git a/ydb/services/datastreams/datastreams_proxy.h b/ydb/services/datastreams/datastreams_proxy.h
index 7a3791a4d0d..a6d186cab2b 100644
--- a/ydb/services/datastreams/datastreams_proxy.h
+++ b/ydb/services/datastreams/datastreams_proxy.h
@@ -11,6 +11,7 @@
#include <library/cpp/actors/core/actor_bootstrapped.h>
#include <library/cpp/actors/core/actorsystem.h>
+
namespace NKikimr {
namespace NGRpcService {
diff --git a/ydb/services/datastreams/datastreams_ut.cpp b/ydb/services/datastreams/datastreams_ut.cpp
index 05be3683946..c7899e0750e 100644
--- a/ydb/services/datastreams/datastreams_ut.cpp
+++ b/ydb/services/datastreams/datastreams_ut.cpp
@@ -253,6 +253,7 @@ Y_UNIT_TEST_SUITE(DataStreams) {
UNIT_ASSERT_VALUES_EQUAL(result.GetResult().stream_description().stream_status(),
YDS_V1::StreamDescription::ACTIVE);
UNIT_ASSERT_VALUES_EQUAL(result.GetResult().stream_description().stream_name(), streamName);
+ UNIT_ASSERT_VALUES_EQUAL(result.GetResult().stream_description().stream_arn(), streamName);
UNIT_ASSERT_VALUES_EQUAL(result.GetResult().stream_description().write_quota_kb_per_sec(), 1_KB);
UNIT_ASSERT_VALUES_EQUAL(result.GetResult().stream_description().retention_period_hours(), 24);
@@ -1782,7 +1783,7 @@ Y_UNIT_TEST_SUITE(DataStreams) {
UNIT_ASSERT_VALUES_UNEQUAL(result.GetResult().next_token().size(), 0);
auto nextToken = result.GetResult().next_token();
- result = testServer.DataStreamsClient->ListStreamConsumers(streamName,
+ result = testServer.DataStreamsClient->ListStreamConsumers("",
NYDS_V1::TListStreamConsumersSettings().NextToken(nextToken)).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL(result.IsTransportError(), false);
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
@@ -1813,26 +1814,26 @@ Y_UNIT_TEST_SUITE(DataStreams) {
};
auto nextToken = makeNextToken(TInstant::Now().MilliSeconds() - 300001);
- result = testServer.DataStreamsClient->ListStreamConsumers(streamName,
+ result = testServer.DataStreamsClient->ListStreamConsumers("",
NYDS_V1::TListStreamConsumersSettings().NextToken(nextToken)).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL(result.IsTransportError(), false);
UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::BAD_REQUEST);
nextToken = makeNextToken(TInstant::Now().MilliSeconds() + 1000);
- result = testServer.DataStreamsClient->ListStreamConsumers(streamName,
+ result = testServer.DataStreamsClient->ListStreamConsumers("",
NYDS_V1::TListStreamConsumersSettings().NextToken(nextToken)).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL(result.IsTransportError(), false);
UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::BAD_REQUEST);
nextToken = makeNextToken(0);
- result = testServer.DataStreamsClient->ListStreamConsumers(streamName,
+ result = testServer.DataStreamsClient->ListStreamConsumers("",
NYDS_V1::TListStreamConsumersSettings().NextToken(nextToken)).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL(result.IsTransportError(), false);
UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::BAD_REQUEST);
- result = testServer.DataStreamsClient->ListStreamConsumers(streamName,
+ result = testServer.DataStreamsClient->ListStreamConsumers("",
NYDS_V1::TListStreamConsumersSettings().NextToken("some_garbage")).ExtractValueSync();
- result = testServer.DataStreamsClient->ListStreamConsumers(streamName,
+ result = testServer.DataStreamsClient->ListStreamConsumers("",
NYDS_V1::TListStreamConsumersSettings().NextToken("some_garbage")).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL(result.IsTransportError(), false);
UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::BAD_REQUEST);
@@ -2511,7 +2512,8 @@ Y_UNIT_TEST_SUITE(DataStreams) {
UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::BAD_REQUEST);
}
-/* { //TODO: datastreams api uses only one retention parameter
+ /*
+ { //TODO: datastreams api uses only one retention parameter
auto result = testServer.DataStreamsClient->CreateStream(streamName,
NYDS_V1::TCreateStreamSettings().ShardCount(shardCount)
.RetentionStorageMegabytes(55_KB).RetentionPeriodHours(8 * 24)).ExtractValueSync();
@@ -2612,4 +2614,18 @@ Y_UNIT_TEST_SUITE(DataStreams) {
UNIT_ASSERT_VALUES_EQUAL(result.GetResult().records().size(), 0);
}
}
+
+ Y_UNIT_TEST(ListStreamsValidation) {
+ TInsecureDatastreamsTestServer testServer;
+
+ {
+ auto result = testServer.DataStreamsClient->ListStreams(
+ NYdb::NDataStreams::V1::TListStreamsSettings().Limit(1000000000).Recurse(false)
+ ).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL(result.IsTransportError(), false);
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::BAD_REQUEST, result.GetIssues().ToString());
+
+ UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::BAD_REQUEST);
+ }
+ }
}
diff --git a/ydb/services/lib/actors/pq_schema_actor.cpp b/ydb/services/lib/actors/pq_schema_actor.cpp
index f080b3f99a2..9de74a9c836 100644
--- a/ydb/services/lib/actors/pq_schema_actor.cpp
+++ b/ydb/services/lib/actors/pq_schema_actor.cpp
@@ -11,6 +11,7 @@
#include <library/cpp/digest/md5/md5.h>
+
namespace NKikimr::NGRpcProxy::V1 {
constexpr TStringBuf GRPCS_ENDPOINT_PREFIX = "grpcs://";
@@ -65,7 +66,7 @@ namespace NKikimr::NGRpcProxy::V1 {
return "";
}
- TString AddReadRuleToConfig(
+ TMsgPqCodes AddReadRuleToConfig(
NKikimrPQ::TPQTabletConfig* config,
const Ydb::PersQueue::V1::TopicSettings::ReadRule& rr,
const TClientServiceTypes& supportedClientServiceTypes,
@@ -73,66 +74,96 @@ namespace NKikimr::NGRpcProxy::V1 {
) {
auto consumerName = NPersQueue::ConvertNewConsumerName(rr.consumer_name(), ctx);
+ if (consumerName.empty()) {
+ return TMsgPqCodes(TStringBuilder() << "consumer with empty name is forbidden", Ydb::PersQueue::ErrorCode::VALIDATION_ERROR);
+ }
if(consumerName.find("/") != TString::npos || consumerName.find("|") != TString::npos) {
- return TStringBuilder() << "consumer '" << rr.consumer_name() << "' has illegal symbols";
+ return TMsgPqCodes(
+ TStringBuilder() << "consumer '" << rr.consumer_name() << "' has illegal symbols",
+ Ydb::PersQueue::ErrorCode::INVALID_ARGUMENT
+ );
}
{
TString migrationError = ReadRuleServiceTypeMigration(config, ctx);
if (migrationError) {
- return migrationError;
+ return TMsgPqCodes(migrationError, Ydb::PersQueue::ErrorCode::INVALID_ARGUMENT);
}
}
config->AddReadRules(consumerName);
if (rr.starting_message_timestamp_ms() < 0) {
- return TStringBuilder() << "starting_message_timestamp_ms in read_rule can't be negative, provided " << rr.starting_message_timestamp_ms();
+ return TMsgPqCodes(
+ TStringBuilder() << "starting_message_timestamp_ms in read_rule can't be negative, provided " << rr.starting_message_timestamp_ms(),
+ Ydb::PersQueue::ErrorCode::VALIDATION_ERROR
+ );
}
config->AddReadFromTimestampsMs(rr.starting_message_timestamp_ms());
if (!Ydb::PersQueue::V1::TopicSettings::Format_IsValid((int)rr.supported_format()) || rr.supported_format() == 0) {
- return TStringBuilder() << "Unknown format version with value " << (int)rr.supported_format() << " for " << rr.consumer_name();
+ return TMsgPqCodes(
+ TStringBuilder() << "Unknown format version with value " << (int)rr.supported_format() << " for " << rr.consumer_name(),
+ Ydb::PersQueue::ErrorCode::INVALID_ARGUMENT
+ );
}
config->AddConsumerFormatVersions(rr.supported_format() - 1);
if (rr.version() < 0) {
- return TStringBuilder() << "version in read_rule can't be negative, provided " << rr.version();
+ return TMsgPqCodes(
+ TStringBuilder() << "version in read_rule can't be negative, provided " << rr.version(),
+ Ydb::PersQueue::ErrorCode::VALIDATION_ERROR
+ );
}
config->AddReadRuleVersions(rr.version());
auto ct = config->AddConsumerCodecs();
if (rr.supported_codecs().size() > MAX_SUPPORTED_CODECS_COUNT) {
- return TStringBuilder() << "supported_codecs count cannot be more than "
- << MAX_SUPPORTED_CODECS_COUNT << ", provided " << rr.supported_codecs().size();
+ return TMsgPqCodes(
+ TStringBuilder() << "supported_codecs count cannot be more than "
+ << MAX_SUPPORTED_CODECS_COUNT << ", provided " << rr.supported_codecs().size(),
+ Ydb::PersQueue::ErrorCode::VALIDATION_ERROR
+ );
}
for (const auto& codec : rr.supported_codecs()) {
if (!Ydb::PersQueue::V1::Codec_IsValid(codec) || codec == 0)
- return TStringBuilder() << "Unknown codec with value " << codec << " for " << rr.consumer_name();
+ return TMsgPqCodes(
+ TStringBuilder() << "Unknown codec with value " << codec << " for " << rr.consumer_name(),
+ Ydb::PersQueue::ErrorCode::INVALID_ARGUMENT
+ );
ct->AddIds(codec - 1);
ct->AddCodecs(to_lower(Ydb::PersQueue::V1::Codec_Name((Ydb::PersQueue::V1::Codec)codec)).substr(6));
}
if (rr.important()) {
if (AppData(ctx)->PQConfig.GetTopicsAreFirstClassCitizen()) {
- return TStringBuilder() << "important flag is forbiden for consumer " << rr.consumer_name();
+ return TMsgPqCodes(
+ TStringBuilder() << "important flag is forbiden for consumer " << rr.consumer_name(),
+ Ydb::PersQueue::ErrorCode::VALIDATION_ERROR
+ );
}
config->MutablePartitionConfig()->AddImportantClientId(consumerName);
}
if (!rr.service_type().empty()) {
if (!supportedClientServiceTypes.contains(rr.service_type())) {
- return TStringBuilder() << "Unknown read rule service type '" << rr.service_type()
- << "' for consumer '" << rr.consumer_name() << "'";
+ return TMsgPqCodes(
+ TStringBuilder() << "Unknown read rule service type '" << rr.service_type()
+ << "' for consumer '" << rr.consumer_name() << "'",
+ Ydb::PersQueue::ErrorCode::INVALID_ARGUMENT
+ );
}
config->AddReadRuleServiceTypes(rr.service_type());
} else {
const auto& pqConfig = AppData(ctx)->PQConfig;
if (pqConfig.GetDisallowDefaultClientServiceType()) {
- return TStringBuilder() << "service type cannot be empty for consumer '" << rr.consumer_name() << "'";
+ return TMsgPqCodes(
+ TStringBuilder() << "service type cannot be empty for consumer '" << rr.consumer_name() << "'",
+ Ydb::PersQueue::ErrorCode::VALIDATION_ERROR
+ );
}
const auto& defaultCientServiceType = pqConfig.GetDefaultClientServiceType().GetName();
config->AddReadRuleServiceTypes(defaultCientServiceType);
}
- return "";
+ return TMsgPqCodes("", Ydb::PersQueue::ErrorCode::OK);
}
@@ -151,7 +182,7 @@ namespace NKikimr::NGRpcProxy::V1 {
}
}
- TString AddReadRuleToConfig(
+ TMsgPqCodes AddReadRuleToConfig(
NKikimrPQ::TPQTabletConfig* config,
const Ydb::Topic::Consumer& rr,
const TClientServiceTypes& supportedClientServiceTypes,
@@ -160,22 +191,25 @@ namespace NKikimr::NGRpcProxy::V1 {
) {
auto consumerName = NPersQueue::ConvertNewConsumerName(rr.name(), ctx);
if (consumerName.find("/") != TString::npos || consumerName.find("|") != TString::npos) {
- return TStringBuilder() << "consumer '" << rr.name() << "' has illegal symbols";
+ return TMsgPqCodes(TStringBuilder() << "consumer '" << rr.name() << "' has illegal symbols", Ydb::PersQueue::ErrorCode::INVALID_ARGUMENT);
}
if (consumerName.empty()) {
- return TStringBuilder() << "consumer with empty name is forbidden";
+ return TMsgPqCodes(TStringBuilder() << "consumer with empty name is forbidden", Ydb::PersQueue::ErrorCode::VALIDATION_ERROR);
}
{
TString migrationError = ReadRuleServiceTypeMigration(config, ctx);
if (migrationError) {
- return migrationError;
+ return TMsgPqCodes(migrationError, migrationError.empty() ? Ydb::PersQueue::ErrorCode::OK : Ydb::PersQueue::ErrorCode::VALIDATION_ERROR); //find better issueCode
}
}
config->AddReadRules(consumerName);
if (rr.read_from().seconds() < 0) {
- return TStringBuilder() << "starting_message_timestamp_ms in read_rule can't be negative, provided " << rr.read_from().seconds();
+ return TMsgPqCodes(
+ TStringBuilder() << "starting_message_timestamp_ms in read_rule can't be negative, provided " << rr.read_from().seconds(),
+ Ydb::PersQueue::ErrorCode::VALIDATION_ERROR
+ );
}
config->AddReadFromTimestampsMs(rr.read_from().seconds() * 1000);
@@ -196,13 +230,16 @@ namespace NKikimr::NGRpcProxy::V1 {
if (!pair.second.empty())
version = FromString<ui32>(pair.second);
} catch(...) {
- return TStringBuilder() << "Attribute for consumer '" << rr.name() << "' _version is " << pair.second << ", which is not ui32";
+ return TMsgPqCodes(
+ TStringBuilder() << "Attribute for consumer '" << rr.name() << "' _version is " << pair.second << ", which is not ui32",
+ Ydb::PersQueue::ErrorCode::VALIDATION_ERROR
+ );
}
} else if (pair.first == "_service_type") {
if (!pair.second.empty()) {
if (!supportedClientServiceTypes.contains(pair.second)) {
- return TStringBuilder() << "Unknown _service_type '" << pair.second
- << "' for consumer '" << rr.name() << "'";
+ return TMsgPqCodes(TStringBuilder() << "Unknown _service_type '" << pair.second
+ << "' for consumer '" << rr.name() << "'", Ydb::PersQueue::ErrorCode::INVALID_ARGUMENT);
}
serviceType = pair.second;
}
@@ -213,7 +250,7 @@ namespace NKikimr::NGRpcProxy::V1 {
}
}
if (serviceType.empty()) {
- return TStringBuilder() << "service type cannot be empty for consumer '" << rr.name() << "'";
+ return TMsgPqCodes(TStringBuilder() << "service type cannot be empty for consumer '" << rr.name() << "'", Ydb::PersQueue::ErrorCode::VALIDATION_ERROR);
}
Y_VERIFY(supportedClientServiceTypes.find(serviceType) != supportedClientServiceTypes.end());
@@ -229,10 +266,10 @@ namespace NKikimr::NGRpcProxy::V1 {
}
if (!found) {
if (hasPassword) {
- return "incorrect client service type password";
+ return TMsgPqCodes("incorrect client service type password", Ydb::PersQueue::ErrorCode::INVALID_ARGUMENT);
}
if (AppData(ctx)->PQConfig.GetForceClientServiceTypePasswordCheck()) { // no password and check is required
- return "no client service type password provided";
+ return TMsgPqCodes("no client service type password provided", Ydb::PersQueue::ErrorCode::VALIDATION_ERROR);
}
}
}
@@ -244,7 +281,10 @@ namespace NKikimr::NGRpcProxy::V1 {
for(const auto& codec : rr.supported_codecs().codecs()) {
if ((!Ydb::Topic::Codec_IsValid(codec) && codec < Ydb::Topic::CODEC_CUSTOM) || codec == 0) {
- return TStringBuilder() << "Unknown codec for consumer '" << rr.name() << "' with value " << codec;
+ return TMsgPqCodes(
+ TStringBuilder() << "Unknown codec for consumer '" << rr.name() << "' with value " << codec,
+ Ydb::PersQueue::ErrorCode::INVALID_ARGUMENT
+ );
}
ct->AddIds(codec - 1);
ct->AddCodecs(Ydb::Topic::Codec_IsValid(codec) ? LegacySubstr(to_lower(Ydb::Topic::Codec_Name((Ydb::Topic::Codec)codec)), 6) : "CUSTOM");
@@ -252,12 +292,12 @@ namespace NKikimr::NGRpcProxy::V1 {
if (rr.important()) {
if (AppData(ctx)->PQConfig.GetTopicsAreFirstClassCitizen()) {
- return TStringBuilder() << "important flag is forbiden for consumer " << rr.name();
+ return TMsgPqCodes(TStringBuilder() << "important flag is forbiden for consumer " << rr.name(), Ydb::PersQueue::ErrorCode::INVALID_ARGUMENT);
}
config->MutablePartitionConfig()->AddImportantClientId(consumerName);
}
- return "";
+ return TMsgPqCodes("", Ydb::PersQueue::ErrorCode::OK);
}
@@ -433,10 +473,15 @@ namespace NKikimr::NGRpcProxy::V1 {
return error.empty() ? Ydb::StatusIds::SUCCESS : (hasDuplicates ? dubsStatus : Ydb::StatusIds::BAD_REQUEST);
}
- NYql::TIssue FillIssue(const TString &errorReason, const Ydb::PersQueue::ErrorCode::ErrorCode errorCode) {
+ NYql::TIssue FillIssue(const TString& errorReason, const Ydb::PersQueue::ErrorCode::ErrorCode errorCode) {
NYql::TIssue res(NYql::TPosition(), errorReason);
res.SetCode(errorCode, NYql::ESeverity::TSeverityIds_ESeverityId_S_ERROR);
+ return res;
+ }
+ NYql::TIssue FillIssue(const TString& errorReason, const size_t errorCode) {
+ NYql::TIssue res(NYql::TPosition(), errorReason);
+ res.SetCode(errorCode, NYql::ESeverity::TSeverityIds_ESeverityId_S_ERROR);
return res;
}
@@ -752,8 +797,9 @@ namespace NKikimr::NGRpcProxy::V1 {
}
const auto& supportedClientServiceTypes = GetSupportedClientServiceTypes(ctx);
for (const auto& rr : settings.read_rules()) {
- error = AddReadRuleToConfig(config, rr, supportedClientServiceTypes, ctx);
- if (!error.Empty()) {
+ auto messageAndCode = AddReadRuleToConfig(config, rr, supportedClientServiceTypes, ctx);
+ if (messageAndCode.PQCode != Ydb::PersQueue::ErrorCode::OK) {
+ error = messageAndCode.Message;
return Ydb::StatusIds::BAD_REQUEST;
}
}
@@ -879,7 +925,7 @@ namespace NKikimr::NGRpcProxy::V1 {
return true;
}
- Ydb::StatusIds::StatusCode FillProposeRequestImpl(
+ TYdbPqCodes FillProposeRequestImpl(
const TString& name, const Ydb::Topic::CreateTopicRequest& request,
NKikimrSchemeOp::TModifyScheme& modifyScheme, const TActorContext& ctx,
TString& error, const TString& path, const TString& database, const TString& localDc
@@ -894,7 +940,7 @@ namespace NKikimr::NGRpcProxy::V1 {
if (request.has_partitioning_settings()) {
if (request.partitioning_settings().min_active_partitions() < 0) {
error = TStringBuilder() << "Partitions count must be positive, provided " << request.partitioning_settings().min_active_partitions();
- return Ydb::StatusIds::BAD_REQUEST;
+ return TYdbPqCodes(Ydb::StatusIds::BAD_REQUEST, Ydb::PersQueue::ErrorCode::VALIDATION_ERROR);
}
parts = request.partitioning_settings().min_active_partitions();
if (parts == 0) parts = 1;
@@ -916,7 +962,7 @@ namespace NKikimr::NGRpcProxy::V1 {
auto res = ProcessAttributes(request.attributes(), pqDescr, error, false);
if (res != Ydb::StatusIds::SUCCESS) {
- return res;
+ return TYdbPqCodes(res, Ydb::PersQueue::ErrorCode::VALIDATION_ERROR);
}
bool local = true; // TODO: check here cluster;
@@ -930,7 +976,7 @@ namespace NKikimr::NGRpcProxy::V1 {
if (!converter->IsValid()) {
error = TStringBuilder() << "Bad topic: " << converter->GetReason();
- return Ydb::StatusIds::BAD_REQUEST;
+ return TYdbPqCodes(Ydb::StatusIds::BAD_REQUEST, Ydb::PersQueue::ErrorCode::INVALID_ARGUMENT);
}
config->SetLocalDC(local);
config->SetDC(converter->GetCluster());
@@ -951,6 +997,11 @@ namespace NKikimr::NGRpcProxy::V1 {
partConfig->MutableExplicitChannelProfiles()->CopyFrom(channelProfiles);
}
if (request.has_retention_period()) {
+ if (request.retention_period().seconds() <= 0) {
+ error = TStringBuilder() << "retention_period must be not negative, provided " <<
+ request.retention_period().DebugString();
+ return TYdbPqCodes(Ydb::StatusIds::BAD_REQUEST, Ydb::PersQueue::ErrorCode::VALIDATION_ERROR);
+ }
partConfig->SetLifetimeSeconds(request.retention_period().seconds());
} else {
partConfig->SetLifetimeSeconds(TDuration::Days(1).Seconds());
@@ -979,7 +1030,7 @@ namespace NKikimr::NGRpcProxy::V1 {
for(const auto& codec : request.supported_codecs().codecs()) {
if ((!Ydb::Topic::Codec_IsValid(codec) && codec < Ydb::Topic::CODEC_CUSTOM) || codec == 0) {
error = TStringBuilder() << "Unknown codec with value " << codec;
- return Ydb::StatusIds::BAD_REQUEST;
+ return TYdbPqCodes(Ydb::StatusIds::BAD_REQUEST, Ydb::PersQueue::ErrorCode::INVALID_ARGUMENT);
}
ct->AddIds(codec - 1);
ct->AddCodecs(Ydb::Topic::Codec_IsValid(codec) ? LegacySubstr(to_lower(Ydb::Topic::Codec_Name((Ydb::Topic::Codec)codec)), 6) : "CUSTOM");
@@ -988,32 +1039,33 @@ namespace NKikimr::NGRpcProxy::V1 {
if (request.consumers_size() > MAX_READ_RULES_COUNT) {
error = TStringBuilder() << "consumers count cannot be more than "
<< MAX_READ_RULES_COUNT << ", provided " << request.consumers_size();
- return Ydb::StatusIds::BAD_REQUEST;
+ return TYdbPqCodes(Ydb::StatusIds::BAD_REQUEST, Ydb::PersQueue::ErrorCode::VALIDATION_ERROR);
}
{
error = ReadRuleServiceTypeMigration(config, ctx);
if (error) {
- return Ydb::StatusIds::INTERNAL_ERROR;
+ return TYdbPqCodes(Ydb::StatusIds::INTERNAL_ERROR, Ydb::PersQueue::ErrorCode::INVALID_ARGUMENT);
}
}
Ydb::StatusIds::StatusCode code;
if (!FillMeteringMode(request.metering_mode(), *config, pqConfig.GetBillingMeteringConfig().GetEnabled(), false, code, error)) {
- return code;
+ return TYdbPqCodes(code, Ydb::PersQueue::ErrorCode::INVALID_ARGUMENT);
}
const auto& supportedClientServiceTypes = GetSupportedClientServiceTypes(ctx);
for (const auto& rr : request.consumers()) {
- error = AddReadRuleToConfig(config, rr, supportedClientServiceTypes, true, ctx);
- if (!error.Empty()) {
- return Ydb::StatusIds::BAD_REQUEST;
+ auto messageAndCode = AddReadRuleToConfig(config, rr, supportedClientServiceTypes, true, ctx);
+ if (messageAndCode.PQCode != Ydb::PersQueue::ErrorCode::OK) {
+ error = messageAndCode.Message;
+ return TYdbPqCodes(Ydb::StatusIds::BAD_REQUEST, messageAndCode.PQCode);
}
}
- return CheckConfig(*config, supportedClientServiceTypes, error, ctx, Ydb::StatusIds::BAD_REQUEST);
+ return TYdbPqCodes(CheckConfig(*config, supportedClientServiceTypes, error, ctx, Ydb::StatusIds::BAD_REQUEST), Ydb::PersQueue::ErrorCode::VALIDATION_ERROR);
}
Ydb::StatusIds::StatusCode FillProposeRequestImpl(
@@ -1183,8 +1235,9 @@ namespace NKikimr::NGRpcProxy::V1 {
config->ClearReadRuleVersions();
for (const auto& rr : consumers) {
- error = AddReadRuleToConfig(config, rr.second, supportedClientServiceTypes, rr.first, ctx);
- if (!error.Empty()) {
+ auto messageAndCode = AddReadRuleToConfig(config, rr.second, supportedClientServiceTypes, rr.first, ctx);
+ if (messageAndCode.PQCode != Ydb::PersQueue::ErrorCode::OK) {
+ error = messageAndCode.Message;
return Ydb::StatusIds::BAD_REQUEST;
}
}
diff --git a/ydb/services/lib/actors/pq_schema_actor.h b/ydb/services/lib/actors/pq_schema_actor.h
index 08fe0a71dbf..d58f0da55b0 100644
--- a/ydb/services/lib/actors/pq_schema_actor.h
+++ b/ydb/services/lib/actors/pq_schema_actor.h
@@ -10,6 +10,25 @@
#include <library/cpp/actors/core/event_local.h>
#include <library/cpp/actors/core/hfunc.h>
+
+struct TMsgPqCodes {
+ TString Message;
+ Ydb::PersQueue::ErrorCode::ErrorCode PQCode;
+
+ TMsgPqCodes(TString const& message, Ydb::PersQueue::ErrorCode::ErrorCode pqCode)
+ : Message(message), PQCode(pqCode) {}
+};
+
+struct TYdbPqCodes {
+ Ydb::StatusIds::StatusCode YdbCode;
+ Ydb::PersQueue::ErrorCode::ErrorCode PQCode;
+
+ TYdbPqCodes(Ydb::StatusIds::StatusCode YdbCode, Ydb::PersQueue::ErrorCode::ErrorCode PQCode)
+ : YdbCode(YdbCode),
+ PQCode(PQCode) {}
+};
+
+
namespace NKikimr::NGRpcProxy::V1 {
Ydb::StatusIds::StatusCode FillProposeRequestImpl(
@@ -24,7 +43,7 @@ namespace NKikimr::NGRpcProxy::V1 {
const TString& localDc = TString()
);
- Ydb::StatusIds::StatusCode FillProposeRequestImpl(
+ TYdbPqCodes FillProposeRequestImpl(
const TString& name,
const Ydb::Topic::CreateTopicRequest& request,
NKikimrSchemeOp::TModifyScheme& modifyScheme,
@@ -33,7 +52,6 @@ namespace NKikimr::NGRpcProxy::V1 {
const TString& path,
const TString& database = TString(),
const TString& localDc = TString()
-
);
Ydb::StatusIds::StatusCode FillProposeRequestImpl(
@@ -58,7 +76,7 @@ namespace NKikimr::NGRpcProxy::V1 {
TString& error, const TActorContext& ctx,
const Ydb::StatusIds::StatusCode dubsStatus = Ydb::StatusIds::BAD_REQUEST);
- TString AddReadRuleToConfig(
+ TMsgPqCodes AddReadRuleToConfig(
NKikimrPQ::TPQTabletConfig *config,
const Ydb::PersQueue::V1::TopicSettings::ReadRule& rr,
const TClientServiceTypes& supportedReadRuleServiceTypes,
@@ -71,7 +89,7 @@ namespace NKikimr::NGRpcProxy::V1 {
const TActorContext& ctx
);
NYql::TIssue FillIssue(const TString &errorReason, const Ydb::PersQueue::ErrorCode::ErrorCode errorCode);
-
+ NYql::TIssue FillIssue(const TString &errorReason, const size_t errorCode);
template <typename T>
class THasCdcStreamCompatibility {
@@ -185,8 +203,8 @@ namespace NKikimr::NGRpcProxy::V1 {
NSchemeCache::TSchemeCacheNavigate::KindTopic) {
this->Request_->RaiseIssue(
FillIssue(
- TStringBuilder() << "path '" << path << "' is not a topic",
- Ydb::PersQueue::ErrorCode::ERROR
+ TStringBuilder() << "path '" << path << "' is not a stream",
+ Ydb::PersQueue::ErrorCode::VALIDATION_ERROR
)
);
TBase::Reply(Ydb::StatusIds::SCHEME_ERROR, ctx);
@@ -221,7 +239,7 @@ namespace NKikimr::NGRpcProxy::V1 {
this->Request_->RaiseIssue(
FillIssue(
TStringBuilder() << "path '" << path << "' is not compatible scheme object",
- Ydb::PersQueue::ErrorCode::ERROR
+ Ydb::PersQueue::ErrorCode::INVALID_ARGUMENT
)
);
return TBase::Reply(Ydb::StatusIds::SCHEME_ERROR, ctx);
@@ -229,7 +247,7 @@ namespace NKikimr::NGRpcProxy::V1 {
this->Request_->RaiseIssue(
FillIssue(
TStringBuilder() << "path '" << path << "' creation is not completed",
- Ydb::PersQueue::ErrorCode::ERROR
+ Ydb::PersQueue::ErrorCode::VALIDATION_ERROR
)
);
return TBase::Reply(Ydb::StatusIds::SCHEME_ERROR, ctx);
@@ -242,7 +260,7 @@ namespace NKikimr::NGRpcProxy::V1 {
FillIssue(
TStringBuilder() << "path '" << path << "' does not exist or you " <<
"do not have access rights",
- Ydb::PersQueue::ErrorCode::ERROR
+ Ydb::PersQueue::ErrorCode::ACCESS_DENIED
)
);
return TBase::Reply(Ydb::StatusIds::SCHEME_ERROR, ctx);
@@ -251,7 +269,7 @@ namespace NKikimr::NGRpcProxy::V1 {
this->Request_->RaiseIssue(
FillIssue(
TStringBuilder() << "table creation is not completed",
- Ydb::PersQueue::ErrorCode::ERROR
+ Ydb::PersQueue::ErrorCode::VALIDATION_ERROR
)
);
return TBase::Reply(Ydb::StatusIds::SCHEME_ERROR, ctx);
@@ -261,7 +279,7 @@ namespace NKikimr::NGRpcProxy::V1 {
this->Request_->RaiseIssue(
FillIssue(
TStringBuilder() << "path '" << path << "' is not a table",
- Ydb::PersQueue::ErrorCode::ERROR
+ Ydb::PersQueue::ErrorCode::VALIDATION_ERROR
)
);
return TBase::Reply(Ydb::StatusIds::SCHEME_ERROR, ctx);
@@ -271,7 +289,7 @@ namespace NKikimr::NGRpcProxy::V1 {
this->Request_->RaiseIssue(
FillIssue(
TStringBuilder() << "unknown database root",
- Ydb::PersQueue::ErrorCode::ERROR
+ Ydb::PersQueue::ErrorCode::INVALID_ARGUMENT
)
);
return TBase::Reply(Ydb::StatusIds::SCHEME_ERROR, ctx);
@@ -292,6 +310,14 @@ namespace NKikimr::NGRpcProxy::V1 {
IsDead = true;
}
+ void ReplyWithError(Ydb::StatusIds::StatusCode status, size_t additionalStatus,
+ const TString& messageText, const NActors::TActorContext& ctx) {
+ this->Request_->RaiseIssue(FillIssue(messageText, additionalStatus));
+ this->Request_->ReplyWithYdbStatus(status);
+ this->Die(ctx);
+ IsDead = true;
+ }
+
void ReplyWithResult(Ydb::StatusIds::StatusCode status, const NActors::TActorContext& ctx) {
this->Request_->ReplyWithYdbStatus(status);
this->Die(ctx);
diff --git a/ydb/services/persqueue_v1/actors/schema_actors.cpp b/ydb/services/persqueue_v1/actors/schema_actors.cpp
index 0eddfe0cf23..f9bdd72464f 100644
--- a/ydb/services/persqueue_v1/actors/schema_actors.cpp
+++ b/ydb/services/persqueue_v1/actors/schema_actors.cpp
@@ -237,14 +237,17 @@ void TAddReadRuleActor::ModifyPersqueueConfig(
rule.set_version(selfInfo.GetVersion().GetPQVersion());
}
auto serviceTypes = GetSupportedClientServiceTypes(ctx);
- TString error = AddReadRuleToConfig(pqConfig, rule, serviceTypes, ctx);
- auto status = error.empty() ? CheckConfig(*pqConfig, serviceTypes, error, ctx, Ydb::StatusIds::ALREADY_EXISTS)
+
+ TString error;
+ auto messageAndCode = AddReadRuleToConfig(pqConfig, rule, serviceTypes, ctx);
+ auto status = messageAndCode.PQCode == Ydb::PersQueue::ErrorCode::OK ?
+ CheckConfig(*pqConfig, serviceTypes, messageAndCode.Message, ctx, Ydb::StatusIds::ALREADY_EXISTS)
: Ydb::StatusIds::BAD_REQUEST;
if (status != Ydb::StatusIds::SUCCESS) {
return ReplyWithError(status,
status == Ydb::StatusIds::ALREADY_EXISTS ? Ydb::PersQueue::ErrorCode::OK
: Ydb::PersQueue::ErrorCode::BAD_REQUEST,
- error, ctx);
+ messageAndCode.Message, ctx);
}
}
@@ -333,7 +336,6 @@ void TPQCreateTopicActor::FillProposeRequest(TEvTxUserProxy::TEvProposeTransacti
{
TString error;
-
auto status = FillProposeRequestImpl(name, GetProtoRequest()->settings(), modifyScheme, ctx, false, error,
workingDir, proposal.Record.GetDatabaseName(), LocalCluster);
if (!error.empty()) {
@@ -367,7 +369,7 @@ void TCreateTopicActor::FillProposeRequest(TEvTxUserProxy::TEvProposeTransaction
TString error;
auto status = FillProposeRequestImpl(name, *GetProtoRequest(), modifyScheme, ctx, error,
- workingDir, proposal.Record.GetDatabaseName(), LocalCluster);
+ workingDir, proposal.Record.GetDatabaseName(), LocalCluster).YdbCode;
if (!error.empty()) {
Request_->RaiseIssue(FillIssue(error, Ydb::PersQueue::ErrorCode::BAD_REQUEST));