diff options
author | alexnick <alexnick@ydb.tech> | 2023-02-14 16:10:12 +0300 |
---|---|---|
committer | alexnick <alexnick@ydb.tech> | 2023-02-14 16:10:12 +0300 |
commit | 68fb62b3af9716c5055c7b2f44e109eaf71ec318 (patch) | |
tree | b1d505f6def617849a40dcc0fed5f579fcb84690 | |
parent | 32778d66b1d27930f41bf2657a78b8ea668f0f10 (diff) | |
download | ydb-68fb62b3af9716c5055c7b2f44e109eaf71ec318.tar.gz |
fix for
fix for
6 files changed, 21 insertions, 15 deletions
diff --git a/ydb/services/persqueue_v1/actors/persqueue_utils.h b/ydb/services/persqueue_v1/actors/persqueue_utils.h index ea90f8b427..56a03bff14 100644 --- a/ydb/services/persqueue_v1/actors/persqueue_utils.h +++ b/ydb/services/persqueue_v1/actors/persqueue_utils.h @@ -15,8 +15,6 @@ namespace NKikimr::NGRpcProxy::V1 { #endif #define PQ_LOG_PREFIX "session cookie " << Cookie << " consumer " << ClientPath << " session " << Session -static const TDuration CHECK_ACL_DELAY = TDuration::Minutes(5); - // moved to ydb/core/client/server/msgbus_server_persqueue.h? // const TString& TopicPrefix(const TActorContext& ctx); diff --git a/ydb/services/persqueue_v1/actors/read_session_actor.ipp b/ydb/services/persqueue_v1/actors/read_session_actor.ipp index 141114356c..8e99ad2e34 100644 --- a/ydb/services/persqueue_v1/actors/read_session_actor.ipp +++ b/ydb/services/persqueue_v1/actors/read_session_actor.ipp @@ -947,7 +947,7 @@ void TReadSessionActor<UseMigrationProtocol>::InitSession(const TActorContext& c NumPartitionsFromTopic[holder.FullConverter->GetInternalName()] = 0; } - ctx.Schedule(CHECK_ACL_DELAY, new TEvents::TEvWakeup(EWakeupTag::RecheckAcl)); + ctx.Schedule(TDuration::Seconds(AppData(ctx)->PQConfig.GetACLRetryTimeoutSec()), new TEvents::TEvWakeup(EWakeupTag::RecheckAcl)); } template <bool UseMigrationProtocol> @@ -1841,9 +1841,10 @@ void TReadSessionActor<UseMigrationProtocol>::Handle(TEvents::TEvWakeup::TPtr& e template <bool UseMigrationProtocol> void TReadSessionActor<UseMigrationProtocol>::RecheckACL(const TActorContext& ctx) { - ctx.Schedule(CHECK_ACL_DELAY, new TEvents::TEvWakeup(EWakeupTag::RecheckAcl)); - const auto timeout = TDuration::Seconds(AppData(ctx)->PQConfig.GetACLRetryTimeoutSec()); + + ctx.Schedule(timeout, new TEvents::TEvWakeup(EWakeupTag::RecheckAcl)); + const bool authTimedOut = (ctx.Now() - LastACLCheckTimestamp) > timeout; if (Token && !AuthInitActor && (ForceACLCheck || (authTimedOut && RequestNotChecked))) { diff --git a/ydb/services/persqueue_v1/actors/write_session_actor.ipp b/ydb/services/persqueue_v1/actors/write_session_actor.ipp index 0e090790a6..262cd162b6 100644 --- a/ydb/services/persqueue_v1/actors/write_session_actor.ipp +++ b/ydb/services/persqueue_v1/actors/write_session_actor.ipp @@ -1050,7 +1050,7 @@ void TWriteSessionActor<UseMigrationProtocol>::Handle(NPQ::TEvPartitionWriter::T State = ES_INITED; - ctx.Schedule(CHECK_ACL_DELAY, new TEvents::TEvWakeup(EWakeupTag::RecheckAcl)); + ctx.Schedule(TDuration::Seconds(AppData(ctx)->PQConfig.GetACLRetryTimeoutSec()), new TEvents::TEvWakeup(EWakeupTag::RecheckAcl)); //init completed; wait for first data chunk NextRequestInited = true; @@ -1579,7 +1579,6 @@ template<bool UseMigrationProtocol> void TWriteSessionActor<UseMigrationProtocol>::Handle(TEvents::TEvWakeup::TPtr& ev, const TActorContext& ctx) { const auto tag = static_cast<EWakeupTag>(ev->Get()->Tag); OnWakeup(tag); - switch (tag) { case EWakeupTag::RlInit: return CheckACL(ctx); @@ -1616,7 +1615,7 @@ void TWriteSessionActor<UseMigrationProtocol>::Handle(TEvents::TEvWakeup::TPtr& template<bool UseMigrationProtocol> void TWriteSessionActor<UseMigrationProtocol>::RecheckACL(const TActorContext& ctx) { Y_VERIFY(State == ES_INITED); - ctx.Schedule(CHECK_ACL_DELAY, new TEvents::TEvWakeup(EWakeupTag::RecheckAcl)); + ctx.Schedule(TDuration::Seconds(AppData(ctx)->PQConfig.GetACLRetryTimeoutSec()), new TEvents::TEvWakeup(EWakeupTag::RecheckAcl)); if (Token && !ACLCheckInProgress && RequestNotChecked && (ctx.Now() - LastACLCheckTimestamp > TDuration::Seconds(AppData(ctx)->PQConfig.GetACLRetryTimeoutSec()))) { RequestNotChecked = false; InitCheckSchema(ctx); diff --git a/ydb/services/persqueue_v1/ut/persqueue_common_tests.h b/ydb/services/persqueue_v1/ut/persqueue_common_tests.h index abafd55010..f184667e6f 100644 --- a/ydb/services/persqueue_v1/ut/persqueue_common_tests.h +++ b/ydb/services/persqueue_v1/ut/persqueue_common_tests.h @@ -49,7 +49,7 @@ public: TCommonTests(bool tenantModeEnabled) : TenantModeEnabled(tenantModeEnabled) {} - + TPersQueueV1TestServer CreateServer() { return TPersQueueV1TestServer(false, TenantModeEnabled); } @@ -126,7 +126,6 @@ public: void WriteSessionWithValidTokenAndACEAndThenRemoveACEAndSendWriteRequest() { TPersQueueV1TestServer server = CreateServer(); - //setup.GetPQConfig().SetACLRetryTimeoutSec(0); NACLib::TDiffACL acl; const auto token = GenerateValidToken(); acl.AddAccess(NACLib::EAccessType::Allow, NACLib::UpdateRow, token); @@ -144,10 +143,6 @@ public: UNIT_ASSERT_C(serverMessage.server_message_case() == StreamingWriteServerMessage::kInitResponse, serverMessage); - acl.ClearAccess(); - Cerr << "===ModifyAcl\n"; - server.ModifyTopicACL(server.GetTopic(), acl); - clientMessage = StreamingWriteClientMessage(); auto *writeRequest = clientMessage.mutable_write_request(); TString message = "x"; @@ -168,6 +163,9 @@ public: AssertSuccessfullStreamingOperation(stream->Read(&serverMessage), stream); UNIT_ASSERT_C(serverMessage.server_message_case() == StreamingWriteServerMessage::kBatchWriteResponse, serverMessage); + acl.ClearAccess(); + Cerr << "===ModifyAcl\n"; + server.ModifyTopicACL(server.GetTopic(), acl); Cerr << "===Wait for session created with token with removed ACE to die"; AssertStreamingSessionDead(stream, Ydb::StatusIds::UNAUTHORIZED, diff --git a/ydb/services/persqueue_v1/ut/persqueue_test_fixture.h b/ydb/services/persqueue_v1/ut/persqueue_test_fixture.h index feb7e5d5cf..10c871ad5f 100644 --- a/ydb/services/persqueue_v1/ut/persqueue_test_fixture.h +++ b/ydb/services/persqueue_v1/ut/persqueue_test_fixture.h @@ -18,6 +18,8 @@ namespace NKikimr::NPersQueueTests { + + #define SET_LOCALS \ auto& pqClient = server.Server->AnnoyingClient; \ Y_UNUSED(pqClient); \ @@ -41,14 +43,19 @@ namespace NKikimr::NPersQueueTests { virtual void AlterSettings(NKikimr::Tests::TServerSettings& settings) { Y_UNUSED(settings); } + + void InitializePQ() { Y_VERIFY(Server == nullptr); PortManager = new TPortManager(); Server = MakeHolder<NPersQueue::TTestServer>(false, PortManager); Server->ServerSettings.PQConfig.SetTopicsAreFirstClassCitizen(TenantModeEnabled()); Server->ServerSettings.PQConfig.MutablePQDiscoveryConfig()->SetLBFrontEnabled(true); + Server->ServerSettings.PQConfig.SetACLRetryTimeoutSec(1); + AlterSettings(Server->ServerSettings); Server->StartServer(false); + if (TenantModeEnabled()) { Server->AnnoyingClient->SetNoConfigMode(); Server->ServerSettings.PQConfig.SetSourceIdTablePath("some unused path"); @@ -59,6 +66,7 @@ namespace NKikimr::NPersQueueTests { EnablePQLogs({NKikimrServices::PERSQUEUE}, NLog::EPriority::PRI_INFO); EnablePQLogs({NKikimrServices::KQP_PROXY}, NLog::EPriority::PRI_EMERG); + Server->AnnoyingClient->FullInit(); Server->AnnoyingClient->CreateConsumer("user"); if (TenantModeEnabled()) { diff --git a/ydb/services/persqueue_v1/ut/test_utils.h b/ydb/services/persqueue_v1/ut/test_utils.h index c02308b0d7..915e7906b3 100644 --- a/ydb/services/persqueue_v1/ut/test_utils.h +++ b/ydb/services/persqueue_v1/ut/test_utils.h @@ -70,7 +70,9 @@ void AssertStreamingSessionDead(std::unique_ptr<grpc::ClientReaderWriter<TClient const Ydb::StatusIds::StatusCode expectedStatus, const Ydb::PersQueue::ErrorCode::ErrorCode expectedErrorCode) { TServerMessage serverMessage; - AssertSuccessfullStreamingOperation(stream->Read(&serverMessage), stream); + auto res = stream->Read(&serverMessage); + Cerr << serverMessage.DebugString() << "\n"; + AssertSuccessfullStreamingOperation(res, stream); UNIT_ASSERT_VALUES_EQUAL(expectedStatus, serverMessage.status()); UNIT_ASSERT_LE(1, serverMessage.issues_size()); // TODO: Why namespace duplicates enum name "ErrorCode::ErrorCode"? |