diff options
author | molotkov-and <molotkov-and@ydb.tech> | 2023-07-21 08:39:32 +0300 |
---|---|---|
committer | molotkov-and <molotkov-and@ydb.tech> | 2023-07-21 08:39:32 +0300 |
commit | 91b32fadb3237b325674c27da4c48d741d7dcda3 (patch) | |
tree | 7acd0d533e8a66dbfdb61c542c9b52ad7dffbf39 | |
parent | 89f40072e93575b769621fe9baa8b0fe88c9f226 (diff) | |
download | ydb-91b32fadb3237b325674c27da4c48d741d7dcda3.tar.gz |
KIKIMR-18712: Retry first retryable error from access service immediately
-rw-r--r-- | ydb/core/security/ticket_parser_impl.h | 38 | ||||
-rw-r--r-- | ydb/core/security/ticket_parser_ut.cpp | 118 | ||||
-rw-r--r-- | ydb/library/testlib/service_mocks/access_service_mock.h | 9 |
3 files changed, 146 insertions, 19 deletions
diff --git a/ydb/core/security/ticket_parser_impl.h b/ydb/core/security/ticket_parser_impl.h index 0814e47001..d711470046 100644 --- a/ydb/core/security/ticket_parser_impl.h +++ b/ydb/core/security/ticket_parser_impl.h @@ -388,6 +388,8 @@ class TTicketParserImpl : public TActorBootstrapped<TDerived> { } auto& record = it->second; + record.CurrentDelay = MinErrorRefreshTime; + record.RefreshRetryableErrorImmediately = true; record.PeerName = std::move(ev->Get()->PeerName); record.Database = std::move(ev->Get()->Database); record.Signature = ev->Get()->Signature; @@ -878,11 +880,11 @@ protected: TInstant RefreshTime; TInstant ExpireTime; TInstant AccessTime; - TDuration CurrentMaxRefreshTime = TDuration::Seconds(1); - TDuration CurrentMinRefreshTime = TDuration::Seconds(1); + TDuration CurrentDelay = TDuration::Seconds(1); TString PeerName; TString Database; TStackVec<TString> AdditionalSIDs; + bool RefreshRetryableErrorImmediately = false; TTokenRecordBase(const TStringBuf ticket) : Ticket(ticket) @@ -916,32 +918,24 @@ protected: template <typename T> void SetErrorRefreshTime(TTicketParserImpl<T>* ticketParser, TInstant now) { if (Error.Retryable) { - if (CurrentMaxRefreshTime < ticketParser->MaxErrorRefreshTime) { - CurrentMaxRefreshTime += ticketParser->MinErrorRefreshTime; + SetRefreshTime(now, CurrentDelay); + if (CurrentDelay < ticketParser->MaxErrorRefreshTime - ticketParser->MinErrorRefreshTime) { + static const double scaleFactor = 2.0; + CurrentDelay = Min(CurrentDelay * scaleFactor, ticketParser->MaxErrorRefreshTime - ticketParser->MinErrorRefreshTime); } - CurrentMinRefreshTime = ticketParser->MinErrorRefreshTime; } else { - CurrentMaxRefreshTime = ticketParser->RefreshTime; - CurrentMinRefreshTime = CurrentMaxRefreshTime / 2; + SetRefreshTime(now, ticketParser->RefreshTime - ticketParser->RefreshTime / 2); } - SetRefreshTime(now); } template <typename T> void SetOkRefreshTime(TTicketParserImpl<T>* ticketParser, TInstant now) { - CurrentMaxRefreshTime = ticketParser->RefreshTime; - CurrentMinRefreshTime = CurrentMaxRefreshTime / 2; - SetRefreshTime(now); + SetRefreshTime(now, ticketParser->RefreshTime - ticketParser->RefreshTime / 2); } - void SetRefreshTime(TInstant now) { - if (CurrentMinRefreshTime < CurrentMaxRefreshTime) { - TDuration currentDuration = CurrentMaxRefreshTime - CurrentMinRefreshTime; - TDuration refreshDuration = CurrentMinRefreshTime + TDuration::MilliSeconds(RandomNumber<double>() * currentDuration.MilliSeconds()); - RefreshTime = now + refreshDuration; - } else { - RefreshTime = now + CurrentMinRefreshTime; - } + void SetRefreshTime(TInstant now, TDuration delay) { + const TDuration::TValue half = delay.GetValue() / 2; + RefreshTime = now + TDuration::FromValue(half + RandomNumber<TDuration::TValue>(half)); } TString GetSubject() const { @@ -1079,6 +1073,7 @@ protected: } else { record.RefreshTime = record.ExpireTime; } + record.RefreshRetryableErrorImmediately = true; CounterTicketsSuccess->Inc(); CounterTicketsBuildTime->Collect((now - record.InitTime).MilliSeconds()); BLOG_D("Ticket " << MaskTicket(record.Ticket) << " (" @@ -1096,6 +1091,11 @@ protected: CounterTicketsErrorsRetryable->Inc(); BLOG_D("Ticket " << MaskTicket(record.Ticket) << " (" << record.PeerName << ") has now retryable error message '" << error.Message << "'"); + if (record.RefreshRetryableErrorImmediately) { + record.RefreshRetryableErrorImmediately = false; + GetDerived()->CanRefreshTicket(key, record); + Respond(record); + } } else { record.UnsetToken(); record.SetOkRefreshTime(this, now); diff --git a/ydb/core/security/ticket_parser_ut.cpp b/ydb/core/security/ticket_parser_ut.cpp index 8eb89e0701..cddba2f544 100644 --- a/ydb/core/security/ticket_parser_ut.cpp +++ b/ydb/core/security/ticket_parser_ut.cpp @@ -489,6 +489,7 @@ Y_UNIT_TEST_SUITE(TTicketParserTest) { authConfig.SetUseAccessServiceTLS(false); authConfig.SetAccessServiceEndpoint(accessServiceEndpoint); authConfig.SetUseStaff(false); + authConfig.SetMinErrorRefreshTime("300ms"); auto settings = TServerSettings(port, authConfig); settings.SetDomainName("Root"); settings.CreateTicketParser = NKikimr::CreateTicketParser; @@ -519,6 +520,7 @@ Y_UNIT_TEST_SUITE(TTicketParserTest) { UNIT_ASSERT(result->Error.Retryable); UNIT_ASSERT_VALUES_EQUAL(result->Error.Message, "Service Unavailable"); + Sleep(TDuration::Seconds(2)); accessServiceMock.ShouldGenerateRetryableError = false; Sleep(TDuration::Seconds(10)); @@ -529,6 +531,60 @@ Y_UNIT_TEST_SUITE(TTicketParserTest) { UNIT_ASSERT_VALUES_EQUAL(result->Token->GetUserSID(), "user1@as"); } + Y_UNIT_TEST(AuthenticationRetryErrorImmediately) { + using namespace Tests; + + TPortManager tp; + ui16 port = tp.GetPort(2134); + ui16 grpcPort = tp.GetPort(2135); + ui16 servicePort = tp.GetPort(4284); + TString accessServiceEndpoint = "localhost:" + ToString(servicePort); + NKikimrProto::TAuthConfig authConfig; + authConfig.SetUseBlackBox(false); + authConfig.SetUseAccessService(true); + authConfig.SetUseAccessServiceTLS(false); + authConfig.SetAccessServiceEndpoint(accessServiceEndpoint); + authConfig.SetUseStaff(false); + authConfig.SetRefreshPeriod("5s"); + auto settings = TServerSettings(port, authConfig); + settings.SetDomainName("Root"); + settings.CreateTicketParser = NKikimr::CreateTicketParser; + TServer server(settings); + server.EnableGRpc(grpcPort); + server.GetRuntime()->SetLogPriority(NKikimrServices::TICKET_PARSER, NLog::PRI_TRACE); + server.GetRuntime()->SetLogPriority(NKikimrServices::GRPC_CLIENT, NLog::PRI_TRACE); + TClient client(settings); + NClient::TKikimr kikimr(client.GetClientConfig()); + client.InitRootScheme(); + + // Access Server Mock + NKikimr::TAccessServiceMock accessServiceMock; + grpc::ServerBuilder builder; + builder.AddListeningPort(accessServiceEndpoint, grpc::InsecureServerCredentials()).RegisterService(&accessServiceMock); + std::unique_ptr<grpc::Server> accessServer(builder.BuildAndStart()); + + TTestActorRuntime* runtime = server.GetRuntime(); + TActorId sender = runtime->AllocateEdgeActor(); + TAutoPtr<IEventHandle> handle; + + accessServiceMock.ShouldGenerateOneRetryableError = true; + TEvTicketParser::TEvAuthorizeTicket::TAccessKeySignature signature {.AccessKeyId = "keyId"}; + TEvTicketParser::TEvAuthorizeTicket::TAccessKeySignature retrySignature = signature; + runtime->Send(new IEventHandle(MakeTicketParserID(), sender, new TEvTicketParser::TEvAuthorizeTicket(std::move(signature), "", {})), 0); + TEvTicketParser::TEvAuthorizeTicketResult* result = runtime->GrabEdgeEvent<TEvTicketParser::TEvAuthorizeTicketResult>(handle); + UNIT_ASSERT(!result->Error.empty()); + UNIT_ASSERT(result->Error.Retryable); + UNIT_ASSERT_VALUES_EQUAL(result->Error.Message, "Service Unavailable"); + + Sleep(TDuration::Seconds(2)); + + runtime->Send(new IEventHandle(MakeTicketParserID(), sender, new TEvTicketParser::TEvAuthorizeTicket(std::move(retrySignature), "", {})), 0); + result = runtime->GrabEdgeEvent<TEvTicketParser::TEvAuthorizeTicketResult>(handle); + UNIT_ASSERT(result->Error.empty()); + UNIT_ASSERT(result->Token != nullptr); + UNIT_ASSERT_VALUES_EQUAL(result->Token->GetUserSID(), "user1@as"); + } + Y_UNIT_TEST(AuthorizationRetryError) { using namespace Tests; @@ -543,6 +599,7 @@ Y_UNIT_TEST_SUITE(TTicketParserTest) { authConfig.SetUseAccessServiceTLS(false); authConfig.SetAccessServiceEndpoint(accessServiceEndpoint); authConfig.SetUseStaff(false); + authConfig.SetMinErrorRefreshTime("300ms"); auto settings = TServerSettings(port, authConfig); settings.SetDomainName("Root"); settings.CreateTicketParser = NKikimr::CreateTicketParser; @@ -577,6 +634,7 @@ Y_UNIT_TEST_SUITE(TTicketParserTest) { UNIT_ASSERT(result->Error.Retryable); UNIT_ASSERT_VALUES_EQUAL(result->Error.Message, "Service Unavailable"); + Sleep(TDuration::Seconds(2)); accessServiceMock.ShouldGenerateRetryableError = false; Sleep(TDuration::Seconds(10)); @@ -589,6 +647,66 @@ Y_UNIT_TEST_SUITE(TTicketParserTest) { UNIT_ASSERT(!result->Token->IsExist("something.write-bbbb4554@as")); } + Y_UNIT_TEST(AuthorizationRetryErrorImmediately) { + using namespace Tests; + + TPortManager tp; + ui16 port = tp.GetPort(2134); + ui16 grpcPort = tp.GetPort(2135); + ui16 servicePort = tp.GetPort(4284); + TString accessServiceEndpoint = "localhost:" + ToString(servicePort); + NKikimrProto::TAuthConfig authConfig; + authConfig.SetUseBlackBox(false); + authConfig.SetUseAccessService(true); + authConfig.SetUseAccessServiceTLS(false); + authConfig.SetAccessServiceEndpoint(accessServiceEndpoint); + authConfig.SetUseStaff(false); + authConfig.SetRefreshPeriod("5s"); + auto settings = TServerSettings(port, authConfig); + settings.SetDomainName("Root"); + settings.CreateTicketParser = NKikimr::CreateTicketParser; + TServer server(settings); + server.EnableGRpc(grpcPort); + server.GetRuntime()->SetLogPriority(NKikimrServices::TICKET_PARSER, NLog::PRI_TRACE); + server.GetRuntime()->SetLogPriority(NKikimrServices::GRPC_CLIENT, NLog::PRI_TRACE); + TClient client(settings); + NClient::TKikimr kikimr(client.GetClientConfig()); + client.InitRootScheme(); + + // Access Server Mock + NKikimr::TAccessServiceMock accessServiceMock; + grpc::ServerBuilder builder; + builder.AddListeningPort(accessServiceEndpoint, grpc::InsecureServerCredentials()).RegisterService(&accessServiceMock); + std::unique_ptr<grpc::Server> accessServer(builder.BuildAndStart()); + + TTestActorRuntime* runtime = server.GetRuntime(); + TActorId sender = runtime->AllocateEdgeActor(); + TAutoPtr<IEventHandle> handle; + + accessServiceMock.ShouldGenerateOneRetryableError = true; + TEvTicketParser::TEvAuthorizeTicket::TAccessKeySignature signature {.AccessKeyId = "keyId"}; + TEvTicketParser::TEvAuthorizeTicket::TAccessKeySignature retrySignature = signature; + const TVector<TEvTicketParser::TEvAuthorizeTicket::TEntry> entries {{ + TEvTicketParser::TEvAuthorizeTicket::ToPermissions({"something.read"}), + {{"folder_id", "aaaa1234"}, {"database_id", "bbbb4554"}} + }}; + runtime->Send(new IEventHandle(MakeTicketParserID(), sender, new TEvTicketParser::TEvAuthorizeTicket(std::move(signature), "", entries)), 0); + TEvTicketParser::TEvAuthorizeTicketResult* result = runtime->GrabEdgeEvent<TEvTicketParser::TEvAuthorizeTicketResult>(handle); + UNIT_ASSERT(!result->Error.empty()); + UNIT_ASSERT(result->Error.Retryable); + UNIT_ASSERT_VALUES_EQUAL(result->Error.Message, "Service Unavailable"); + + Sleep(TDuration::Seconds(2)); + + runtime->Send(new IEventHandle(MakeTicketParserID(), sender, new TEvTicketParser::TEvAuthorizeTicket(std::move(retrySignature), "", entries)), 0); + result = runtime->GrabEdgeEvent<TEvTicketParser::TEvAuthorizeTicketResult>(handle); + UNIT_ASSERT(result->Error.empty()); + UNIT_ASSERT(result->Token != nullptr); + UNIT_ASSERT_VALUES_EQUAL(result->Token->GetUserSID(), "user1@as"); + UNIT_ASSERT(result->Token->IsExist("something.read-bbbb4554@as")); + UNIT_ASSERT(!result->Token->IsExist("something.write-bbbb4554@as")); + } + Y_UNIT_TEST(AuthenticationUnsupported) { using namespace Tests; diff --git a/ydb/library/testlib/service_mocks/access_service_mock.h b/ydb/library/testlib/service_mocks/access_service_mock.h index 8de34b8de2..4ea652225a 100644 --- a/ydb/library/testlib/service_mocks/access_service_mock.h +++ b/ydb/library/testlib/service_mocks/access_service_mock.h @@ -77,6 +77,7 @@ public: THashMap<TString, TString> AllowedServiceTokens = {{"service1", "root1/folder1"}}; bool ShouldGenerateRetryableError = false; + bool ShouldGenerateOneRetryableError = false; grpc::Status Authenticate( grpc::ServerContext*, @@ -88,6 +89,10 @@ public: if (ShouldGenerateRetryableError) { return grpc::Status(grpc::StatusCode::UNAVAILABLE, "Service Unavailable"); } + if (ShouldGenerateOneRetryableError) { + ShouldGenerateOneRetryableError = false; + return grpc::Status(grpc::StatusCode::UNAVAILABLE, "Service Unavailable"); + } response->mutable_subject()->mutable_user_account()->set_id("user1"); return grpc::Status::OK; } else { @@ -125,6 +130,10 @@ public: if (ShouldGenerateRetryableError) { return grpc::Status(grpc::StatusCode::UNAVAILABLE, "Service Unavailable"); } + if (ShouldGenerateOneRetryableError) { + ShouldGenerateOneRetryableError = false; + return grpc::Status(grpc::StatusCode::UNAVAILABLE, "Service Unavailable"); + } response->mutable_subject()->mutable_user_account()->set_id("user1"); return grpc::Status::OK; } else { |