aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authormolotkov-and <molotkov-and@ydb.tech>2023-07-21 08:39:32 +0300
committermolotkov-and <molotkov-and@ydb.tech>2023-07-21 08:39:32 +0300
commit91b32fadb3237b325674c27da4c48d741d7dcda3 (patch)
tree7acd0d533e8a66dbfdb61c542c9b52ad7dffbf39
parent89f40072e93575b769621fe9baa8b0fe88c9f226 (diff)
downloadydb-91b32fadb3237b325674c27da4c48d741d7dcda3.tar.gz
KIKIMR-18712: Retry first retryable error from access service immediately
-rw-r--r--ydb/core/security/ticket_parser_impl.h38
-rw-r--r--ydb/core/security/ticket_parser_ut.cpp118
-rw-r--r--ydb/library/testlib/service_mocks/access_service_mock.h9
3 files changed, 146 insertions, 19 deletions
diff --git a/ydb/core/security/ticket_parser_impl.h b/ydb/core/security/ticket_parser_impl.h
index 0814e47001..d711470046 100644
--- a/ydb/core/security/ticket_parser_impl.h
+++ b/ydb/core/security/ticket_parser_impl.h
@@ -388,6 +388,8 @@ class TTicketParserImpl : public TActorBootstrapped<TDerived> {
}
auto& record = it->second;
+ record.CurrentDelay = MinErrorRefreshTime;
+ record.RefreshRetryableErrorImmediately = true;
record.PeerName = std::move(ev->Get()->PeerName);
record.Database = std::move(ev->Get()->Database);
record.Signature = ev->Get()->Signature;
@@ -878,11 +880,11 @@ protected:
TInstant RefreshTime;
TInstant ExpireTime;
TInstant AccessTime;
- TDuration CurrentMaxRefreshTime = TDuration::Seconds(1);
- TDuration CurrentMinRefreshTime = TDuration::Seconds(1);
+ TDuration CurrentDelay = TDuration::Seconds(1);
TString PeerName;
TString Database;
TStackVec<TString> AdditionalSIDs;
+ bool RefreshRetryableErrorImmediately = false;
TTokenRecordBase(const TStringBuf ticket)
: Ticket(ticket)
@@ -916,32 +918,24 @@ protected:
template <typename T>
void SetErrorRefreshTime(TTicketParserImpl<T>* ticketParser, TInstant now) {
if (Error.Retryable) {
- if (CurrentMaxRefreshTime < ticketParser->MaxErrorRefreshTime) {
- CurrentMaxRefreshTime += ticketParser->MinErrorRefreshTime;
+ SetRefreshTime(now, CurrentDelay);
+ if (CurrentDelay < ticketParser->MaxErrorRefreshTime - ticketParser->MinErrorRefreshTime) {
+ static const double scaleFactor = 2.0;
+ CurrentDelay = Min(CurrentDelay * scaleFactor, ticketParser->MaxErrorRefreshTime - ticketParser->MinErrorRefreshTime);
}
- CurrentMinRefreshTime = ticketParser->MinErrorRefreshTime;
} else {
- CurrentMaxRefreshTime = ticketParser->RefreshTime;
- CurrentMinRefreshTime = CurrentMaxRefreshTime / 2;
+ SetRefreshTime(now, ticketParser->RefreshTime - ticketParser->RefreshTime / 2);
}
- SetRefreshTime(now);
}
template <typename T>
void SetOkRefreshTime(TTicketParserImpl<T>* ticketParser, TInstant now) {
- CurrentMaxRefreshTime = ticketParser->RefreshTime;
- CurrentMinRefreshTime = CurrentMaxRefreshTime / 2;
- SetRefreshTime(now);
+ SetRefreshTime(now, ticketParser->RefreshTime - ticketParser->RefreshTime / 2);
}
- void SetRefreshTime(TInstant now) {
- if (CurrentMinRefreshTime < CurrentMaxRefreshTime) {
- TDuration currentDuration = CurrentMaxRefreshTime - CurrentMinRefreshTime;
- TDuration refreshDuration = CurrentMinRefreshTime + TDuration::MilliSeconds(RandomNumber<double>() * currentDuration.MilliSeconds());
- RefreshTime = now + refreshDuration;
- } else {
- RefreshTime = now + CurrentMinRefreshTime;
- }
+ void SetRefreshTime(TInstant now, TDuration delay) {
+ const TDuration::TValue half = delay.GetValue() / 2;
+ RefreshTime = now + TDuration::FromValue(half + RandomNumber<TDuration::TValue>(half));
}
TString GetSubject() const {
@@ -1079,6 +1073,7 @@ protected:
} else {
record.RefreshTime = record.ExpireTime;
}
+ record.RefreshRetryableErrorImmediately = true;
CounterTicketsSuccess->Inc();
CounterTicketsBuildTime->Collect((now - record.InitTime).MilliSeconds());
BLOG_D("Ticket " << MaskTicket(record.Ticket) << " ("
@@ -1096,6 +1091,11 @@ protected:
CounterTicketsErrorsRetryable->Inc();
BLOG_D("Ticket " << MaskTicket(record.Ticket) << " ("
<< record.PeerName << ") has now retryable error message '" << error.Message << "'");
+ if (record.RefreshRetryableErrorImmediately) {
+ record.RefreshRetryableErrorImmediately = false;
+ GetDerived()->CanRefreshTicket(key, record);
+ Respond(record);
+ }
} else {
record.UnsetToken();
record.SetOkRefreshTime(this, now);
diff --git a/ydb/core/security/ticket_parser_ut.cpp b/ydb/core/security/ticket_parser_ut.cpp
index 8eb89e0701..cddba2f544 100644
--- a/ydb/core/security/ticket_parser_ut.cpp
+++ b/ydb/core/security/ticket_parser_ut.cpp
@@ -489,6 +489,7 @@ Y_UNIT_TEST_SUITE(TTicketParserTest) {
authConfig.SetUseAccessServiceTLS(false);
authConfig.SetAccessServiceEndpoint(accessServiceEndpoint);
authConfig.SetUseStaff(false);
+ authConfig.SetMinErrorRefreshTime("300ms");
auto settings = TServerSettings(port, authConfig);
settings.SetDomainName("Root");
settings.CreateTicketParser = NKikimr::CreateTicketParser;
@@ -519,6 +520,7 @@ Y_UNIT_TEST_SUITE(TTicketParserTest) {
UNIT_ASSERT(result->Error.Retryable);
UNIT_ASSERT_VALUES_EQUAL(result->Error.Message, "Service Unavailable");
+ Sleep(TDuration::Seconds(2));
accessServiceMock.ShouldGenerateRetryableError = false;
Sleep(TDuration::Seconds(10));
@@ -529,6 +531,60 @@ Y_UNIT_TEST_SUITE(TTicketParserTest) {
UNIT_ASSERT_VALUES_EQUAL(result->Token->GetUserSID(), "user1@as");
}
+ Y_UNIT_TEST(AuthenticationRetryErrorImmediately) {
+ using namespace Tests;
+
+ TPortManager tp;
+ ui16 port = tp.GetPort(2134);
+ ui16 grpcPort = tp.GetPort(2135);
+ ui16 servicePort = tp.GetPort(4284);
+ TString accessServiceEndpoint = "localhost:" + ToString(servicePort);
+ NKikimrProto::TAuthConfig authConfig;
+ authConfig.SetUseBlackBox(false);
+ authConfig.SetUseAccessService(true);
+ authConfig.SetUseAccessServiceTLS(false);
+ authConfig.SetAccessServiceEndpoint(accessServiceEndpoint);
+ authConfig.SetUseStaff(false);
+ authConfig.SetRefreshPeriod("5s");
+ auto settings = TServerSettings(port, authConfig);
+ settings.SetDomainName("Root");
+ settings.CreateTicketParser = NKikimr::CreateTicketParser;
+ TServer server(settings);
+ server.EnableGRpc(grpcPort);
+ server.GetRuntime()->SetLogPriority(NKikimrServices::TICKET_PARSER, NLog::PRI_TRACE);
+ server.GetRuntime()->SetLogPriority(NKikimrServices::GRPC_CLIENT, NLog::PRI_TRACE);
+ TClient client(settings);
+ NClient::TKikimr kikimr(client.GetClientConfig());
+ client.InitRootScheme();
+
+ // Access Server Mock
+ NKikimr::TAccessServiceMock accessServiceMock;
+ grpc::ServerBuilder builder;
+ builder.AddListeningPort(accessServiceEndpoint, grpc::InsecureServerCredentials()).RegisterService(&accessServiceMock);
+ std::unique_ptr<grpc::Server> accessServer(builder.BuildAndStart());
+
+ TTestActorRuntime* runtime = server.GetRuntime();
+ TActorId sender = runtime->AllocateEdgeActor();
+ TAutoPtr<IEventHandle> handle;
+
+ accessServiceMock.ShouldGenerateOneRetryableError = true;
+ TEvTicketParser::TEvAuthorizeTicket::TAccessKeySignature signature {.AccessKeyId = "keyId"};
+ TEvTicketParser::TEvAuthorizeTicket::TAccessKeySignature retrySignature = signature;
+ runtime->Send(new IEventHandle(MakeTicketParserID(), sender, new TEvTicketParser::TEvAuthorizeTicket(std::move(signature), "", {})), 0);
+ TEvTicketParser::TEvAuthorizeTicketResult* result = runtime->GrabEdgeEvent<TEvTicketParser::TEvAuthorizeTicketResult>(handle);
+ UNIT_ASSERT(!result->Error.empty());
+ UNIT_ASSERT(result->Error.Retryable);
+ UNIT_ASSERT_VALUES_EQUAL(result->Error.Message, "Service Unavailable");
+
+ Sleep(TDuration::Seconds(2));
+
+ runtime->Send(new IEventHandle(MakeTicketParserID(), sender, new TEvTicketParser::TEvAuthorizeTicket(std::move(retrySignature), "", {})), 0);
+ result = runtime->GrabEdgeEvent<TEvTicketParser::TEvAuthorizeTicketResult>(handle);
+ UNIT_ASSERT(result->Error.empty());
+ UNIT_ASSERT(result->Token != nullptr);
+ UNIT_ASSERT_VALUES_EQUAL(result->Token->GetUserSID(), "user1@as");
+ }
+
Y_UNIT_TEST(AuthorizationRetryError) {
using namespace Tests;
@@ -543,6 +599,7 @@ Y_UNIT_TEST_SUITE(TTicketParserTest) {
authConfig.SetUseAccessServiceTLS(false);
authConfig.SetAccessServiceEndpoint(accessServiceEndpoint);
authConfig.SetUseStaff(false);
+ authConfig.SetMinErrorRefreshTime("300ms");
auto settings = TServerSettings(port, authConfig);
settings.SetDomainName("Root");
settings.CreateTicketParser = NKikimr::CreateTicketParser;
@@ -577,6 +634,7 @@ Y_UNIT_TEST_SUITE(TTicketParserTest) {
UNIT_ASSERT(result->Error.Retryable);
UNIT_ASSERT_VALUES_EQUAL(result->Error.Message, "Service Unavailable");
+ Sleep(TDuration::Seconds(2));
accessServiceMock.ShouldGenerateRetryableError = false;
Sleep(TDuration::Seconds(10));
@@ -589,6 +647,66 @@ Y_UNIT_TEST_SUITE(TTicketParserTest) {
UNIT_ASSERT(!result->Token->IsExist("something.write-bbbb4554@as"));
}
+ Y_UNIT_TEST(AuthorizationRetryErrorImmediately) {
+ using namespace Tests;
+
+ TPortManager tp;
+ ui16 port = tp.GetPort(2134);
+ ui16 grpcPort = tp.GetPort(2135);
+ ui16 servicePort = tp.GetPort(4284);
+ TString accessServiceEndpoint = "localhost:" + ToString(servicePort);
+ NKikimrProto::TAuthConfig authConfig;
+ authConfig.SetUseBlackBox(false);
+ authConfig.SetUseAccessService(true);
+ authConfig.SetUseAccessServiceTLS(false);
+ authConfig.SetAccessServiceEndpoint(accessServiceEndpoint);
+ authConfig.SetUseStaff(false);
+ authConfig.SetRefreshPeriod("5s");
+ auto settings = TServerSettings(port, authConfig);
+ settings.SetDomainName("Root");
+ settings.CreateTicketParser = NKikimr::CreateTicketParser;
+ TServer server(settings);
+ server.EnableGRpc(grpcPort);
+ server.GetRuntime()->SetLogPriority(NKikimrServices::TICKET_PARSER, NLog::PRI_TRACE);
+ server.GetRuntime()->SetLogPriority(NKikimrServices::GRPC_CLIENT, NLog::PRI_TRACE);
+ TClient client(settings);
+ NClient::TKikimr kikimr(client.GetClientConfig());
+ client.InitRootScheme();
+
+ // Access Server Mock
+ NKikimr::TAccessServiceMock accessServiceMock;
+ grpc::ServerBuilder builder;
+ builder.AddListeningPort(accessServiceEndpoint, grpc::InsecureServerCredentials()).RegisterService(&accessServiceMock);
+ std::unique_ptr<grpc::Server> accessServer(builder.BuildAndStart());
+
+ TTestActorRuntime* runtime = server.GetRuntime();
+ TActorId sender = runtime->AllocateEdgeActor();
+ TAutoPtr<IEventHandle> handle;
+
+ accessServiceMock.ShouldGenerateOneRetryableError = true;
+ TEvTicketParser::TEvAuthorizeTicket::TAccessKeySignature signature {.AccessKeyId = "keyId"};
+ TEvTicketParser::TEvAuthorizeTicket::TAccessKeySignature retrySignature = signature;
+ const TVector<TEvTicketParser::TEvAuthorizeTicket::TEntry> entries {{
+ TEvTicketParser::TEvAuthorizeTicket::ToPermissions({"something.read"}),
+ {{"folder_id", "aaaa1234"}, {"database_id", "bbbb4554"}}
+ }};
+ runtime->Send(new IEventHandle(MakeTicketParserID(), sender, new TEvTicketParser::TEvAuthorizeTicket(std::move(signature), "", entries)), 0);
+ TEvTicketParser::TEvAuthorizeTicketResult* result = runtime->GrabEdgeEvent<TEvTicketParser::TEvAuthorizeTicketResult>(handle);
+ UNIT_ASSERT(!result->Error.empty());
+ UNIT_ASSERT(result->Error.Retryable);
+ UNIT_ASSERT_VALUES_EQUAL(result->Error.Message, "Service Unavailable");
+
+ Sleep(TDuration::Seconds(2));
+
+ runtime->Send(new IEventHandle(MakeTicketParserID(), sender, new TEvTicketParser::TEvAuthorizeTicket(std::move(retrySignature), "", entries)), 0);
+ result = runtime->GrabEdgeEvent<TEvTicketParser::TEvAuthorizeTicketResult>(handle);
+ UNIT_ASSERT(result->Error.empty());
+ UNIT_ASSERT(result->Token != nullptr);
+ UNIT_ASSERT_VALUES_EQUAL(result->Token->GetUserSID(), "user1@as");
+ UNIT_ASSERT(result->Token->IsExist("something.read-bbbb4554@as"));
+ UNIT_ASSERT(!result->Token->IsExist("something.write-bbbb4554@as"));
+ }
+
Y_UNIT_TEST(AuthenticationUnsupported) {
using namespace Tests;
diff --git a/ydb/library/testlib/service_mocks/access_service_mock.h b/ydb/library/testlib/service_mocks/access_service_mock.h
index 8de34b8de2..4ea652225a 100644
--- a/ydb/library/testlib/service_mocks/access_service_mock.h
+++ b/ydb/library/testlib/service_mocks/access_service_mock.h
@@ -77,6 +77,7 @@ public:
THashMap<TString, TString> AllowedServiceTokens = {{"service1", "root1/folder1"}};
bool ShouldGenerateRetryableError = false;
+ bool ShouldGenerateOneRetryableError = false;
grpc::Status Authenticate(
grpc::ServerContext*,
@@ -88,6 +89,10 @@ public:
if (ShouldGenerateRetryableError) {
return grpc::Status(grpc::StatusCode::UNAVAILABLE, "Service Unavailable");
}
+ if (ShouldGenerateOneRetryableError) {
+ ShouldGenerateOneRetryableError = false;
+ return grpc::Status(grpc::StatusCode::UNAVAILABLE, "Service Unavailable");
+ }
response->mutable_subject()->mutable_user_account()->set_id("user1");
return grpc::Status::OK;
} else {
@@ -125,6 +130,10 @@ public:
if (ShouldGenerateRetryableError) {
return grpc::Status(grpc::StatusCode::UNAVAILABLE, "Service Unavailable");
}
+ if (ShouldGenerateOneRetryableError) {
+ ShouldGenerateOneRetryableError = false;
+ return grpc::Status(grpc::StatusCode::UNAVAILABLE, "Service Unavailable");
+ }
response->mutable_subject()->mutable_user_account()->set_id("user1");
return grpc::Status::OK;
} else {