aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authoralexnick <alexnick@ydb.tech>2023-02-14 16:10:12 +0300
committeralexnick <alexnick@ydb.tech>2023-02-14 16:10:12 +0300
commit68fb62b3af9716c5055c7b2f44e109eaf71ec318 (patch)
treeb1d505f6def617849a40dcc0fed5f579fcb84690
parent32778d66b1d27930f41bf2657a78b8ea668f0f10 (diff)
downloadydb-68fb62b3af9716c5055c7b2f44e109eaf71ec318.tar.gz
fix for
fix for
-rw-r--r--ydb/services/persqueue_v1/actors/persqueue_utils.h2
-rw-r--r--ydb/services/persqueue_v1/actors/read_session_actor.ipp7
-rw-r--r--ydb/services/persqueue_v1/actors/write_session_actor.ipp5
-rw-r--r--ydb/services/persqueue_v1/ut/persqueue_common_tests.h10
-rw-r--r--ydb/services/persqueue_v1/ut/persqueue_test_fixture.h8
-rw-r--r--ydb/services/persqueue_v1/ut/test_utils.h4
6 files changed, 21 insertions, 15 deletions
diff --git a/ydb/services/persqueue_v1/actors/persqueue_utils.h b/ydb/services/persqueue_v1/actors/persqueue_utils.h
index ea90f8b427..56a03bff14 100644
--- a/ydb/services/persqueue_v1/actors/persqueue_utils.h
+++ b/ydb/services/persqueue_v1/actors/persqueue_utils.h
@@ -15,8 +15,6 @@ namespace NKikimr::NGRpcProxy::V1 {
#endif
#define PQ_LOG_PREFIX "session cookie " << Cookie << " consumer " << ClientPath << " session " << Session
-static const TDuration CHECK_ACL_DELAY = TDuration::Minutes(5);
-
// moved to ydb/core/client/server/msgbus_server_persqueue.h?
// const TString& TopicPrefix(const TActorContext& ctx);
diff --git a/ydb/services/persqueue_v1/actors/read_session_actor.ipp b/ydb/services/persqueue_v1/actors/read_session_actor.ipp
index 141114356c..8e99ad2e34 100644
--- a/ydb/services/persqueue_v1/actors/read_session_actor.ipp
+++ b/ydb/services/persqueue_v1/actors/read_session_actor.ipp
@@ -947,7 +947,7 @@ void TReadSessionActor<UseMigrationProtocol>::InitSession(const TActorContext& c
NumPartitionsFromTopic[holder.FullConverter->GetInternalName()] = 0;
}
- ctx.Schedule(CHECK_ACL_DELAY, new TEvents::TEvWakeup(EWakeupTag::RecheckAcl));
+ ctx.Schedule(TDuration::Seconds(AppData(ctx)->PQConfig.GetACLRetryTimeoutSec()), new TEvents::TEvWakeup(EWakeupTag::RecheckAcl));
}
template <bool UseMigrationProtocol>
@@ -1841,9 +1841,10 @@ void TReadSessionActor<UseMigrationProtocol>::Handle(TEvents::TEvWakeup::TPtr& e
template <bool UseMigrationProtocol>
void TReadSessionActor<UseMigrationProtocol>::RecheckACL(const TActorContext& ctx) {
- ctx.Schedule(CHECK_ACL_DELAY, new TEvents::TEvWakeup(EWakeupTag::RecheckAcl));
-
const auto timeout = TDuration::Seconds(AppData(ctx)->PQConfig.GetACLRetryTimeoutSec());
+
+ ctx.Schedule(timeout, new TEvents::TEvWakeup(EWakeupTag::RecheckAcl));
+
const bool authTimedOut = (ctx.Now() - LastACLCheckTimestamp) > timeout;
if (Token && !AuthInitActor && (ForceACLCheck || (authTimedOut && RequestNotChecked))) {
diff --git a/ydb/services/persqueue_v1/actors/write_session_actor.ipp b/ydb/services/persqueue_v1/actors/write_session_actor.ipp
index 0e090790a6..262cd162b6 100644
--- a/ydb/services/persqueue_v1/actors/write_session_actor.ipp
+++ b/ydb/services/persqueue_v1/actors/write_session_actor.ipp
@@ -1050,7 +1050,7 @@ void TWriteSessionActor<UseMigrationProtocol>::Handle(NPQ::TEvPartitionWriter::T
State = ES_INITED;
- ctx.Schedule(CHECK_ACL_DELAY, new TEvents::TEvWakeup(EWakeupTag::RecheckAcl));
+ ctx.Schedule(TDuration::Seconds(AppData(ctx)->PQConfig.GetACLRetryTimeoutSec()), new TEvents::TEvWakeup(EWakeupTag::RecheckAcl));
//init completed; wait for first data chunk
NextRequestInited = true;
@@ -1579,7 +1579,6 @@ template<bool UseMigrationProtocol>
void TWriteSessionActor<UseMigrationProtocol>::Handle(TEvents::TEvWakeup::TPtr& ev, const TActorContext& ctx) {
const auto tag = static_cast<EWakeupTag>(ev->Get()->Tag);
OnWakeup(tag);
-
switch (tag) {
case EWakeupTag::RlInit:
return CheckACL(ctx);
@@ -1616,7 +1615,7 @@ void TWriteSessionActor<UseMigrationProtocol>::Handle(TEvents::TEvWakeup::TPtr&
template<bool UseMigrationProtocol>
void TWriteSessionActor<UseMigrationProtocol>::RecheckACL(const TActorContext& ctx) {
Y_VERIFY(State == ES_INITED);
- ctx.Schedule(CHECK_ACL_DELAY, new TEvents::TEvWakeup(EWakeupTag::RecheckAcl));
+ ctx.Schedule(TDuration::Seconds(AppData(ctx)->PQConfig.GetACLRetryTimeoutSec()), new TEvents::TEvWakeup(EWakeupTag::RecheckAcl));
if (Token && !ACLCheckInProgress && RequestNotChecked && (ctx.Now() - LastACLCheckTimestamp > TDuration::Seconds(AppData(ctx)->PQConfig.GetACLRetryTimeoutSec()))) {
RequestNotChecked = false;
InitCheckSchema(ctx);
diff --git a/ydb/services/persqueue_v1/ut/persqueue_common_tests.h b/ydb/services/persqueue_v1/ut/persqueue_common_tests.h
index abafd55010..f184667e6f 100644
--- a/ydb/services/persqueue_v1/ut/persqueue_common_tests.h
+++ b/ydb/services/persqueue_v1/ut/persqueue_common_tests.h
@@ -49,7 +49,7 @@ public:
TCommonTests(bool tenantModeEnabled)
: TenantModeEnabled(tenantModeEnabled)
{}
-
+
TPersQueueV1TestServer CreateServer() {
return TPersQueueV1TestServer(false, TenantModeEnabled);
}
@@ -126,7 +126,6 @@ public:
void WriteSessionWithValidTokenAndACEAndThenRemoveACEAndSendWriteRequest() {
TPersQueueV1TestServer server = CreateServer();
- //setup.GetPQConfig().SetACLRetryTimeoutSec(0);
NACLib::TDiffACL acl;
const auto token = GenerateValidToken();
acl.AddAccess(NACLib::EAccessType::Allow, NACLib::UpdateRow, token);
@@ -144,10 +143,6 @@ public:
UNIT_ASSERT_C(serverMessage.server_message_case() == StreamingWriteServerMessage::kInitResponse,
serverMessage);
- acl.ClearAccess();
- Cerr << "===ModifyAcl\n";
- server.ModifyTopicACL(server.GetTopic(), acl);
-
clientMessage = StreamingWriteClientMessage();
auto *writeRequest = clientMessage.mutable_write_request();
TString message = "x";
@@ -168,6 +163,9 @@ public:
AssertSuccessfullStreamingOperation(stream->Read(&serverMessage), stream);
UNIT_ASSERT_C(serverMessage.server_message_case() == StreamingWriteServerMessage::kBatchWriteResponse,
serverMessage);
+ acl.ClearAccess();
+ Cerr << "===ModifyAcl\n";
+ server.ModifyTopicACL(server.GetTopic(), acl);
Cerr << "===Wait for session created with token with removed ACE to die";
AssertStreamingSessionDead(stream, Ydb::StatusIds::UNAUTHORIZED,
diff --git a/ydb/services/persqueue_v1/ut/persqueue_test_fixture.h b/ydb/services/persqueue_v1/ut/persqueue_test_fixture.h
index feb7e5d5cf..10c871ad5f 100644
--- a/ydb/services/persqueue_v1/ut/persqueue_test_fixture.h
+++ b/ydb/services/persqueue_v1/ut/persqueue_test_fixture.h
@@ -18,6 +18,8 @@ namespace NKikimr::NPersQueueTests {
+
+
#define SET_LOCALS \
auto& pqClient = server.Server->AnnoyingClient; \
Y_UNUSED(pqClient); \
@@ -41,14 +43,19 @@ namespace NKikimr::NPersQueueTests {
virtual void AlterSettings(NKikimr::Tests::TServerSettings& settings) {
Y_UNUSED(settings);
}
+
+
void InitializePQ() {
Y_VERIFY(Server == nullptr);
PortManager = new TPortManager();
Server = MakeHolder<NPersQueue::TTestServer>(false, PortManager);
Server->ServerSettings.PQConfig.SetTopicsAreFirstClassCitizen(TenantModeEnabled());
Server->ServerSettings.PQConfig.MutablePQDiscoveryConfig()->SetLBFrontEnabled(true);
+ Server->ServerSettings.PQConfig.SetACLRetryTimeoutSec(1);
+
AlterSettings(Server->ServerSettings);
Server->StartServer(false);
+
if (TenantModeEnabled()) {
Server->AnnoyingClient->SetNoConfigMode();
Server->ServerSettings.PQConfig.SetSourceIdTablePath("some unused path");
@@ -59,6 +66,7 @@ namespace NKikimr::NPersQueueTests {
EnablePQLogs({NKikimrServices::PERSQUEUE}, NLog::EPriority::PRI_INFO);
EnablePQLogs({NKikimrServices::KQP_PROXY}, NLog::EPriority::PRI_EMERG);
+
Server->AnnoyingClient->FullInit();
Server->AnnoyingClient->CreateConsumer("user");
if (TenantModeEnabled()) {
diff --git a/ydb/services/persqueue_v1/ut/test_utils.h b/ydb/services/persqueue_v1/ut/test_utils.h
index c02308b0d7..915e7906b3 100644
--- a/ydb/services/persqueue_v1/ut/test_utils.h
+++ b/ydb/services/persqueue_v1/ut/test_utils.h
@@ -70,7 +70,9 @@ void AssertStreamingSessionDead(std::unique_ptr<grpc::ClientReaderWriter<TClient
const Ydb::StatusIds::StatusCode expectedStatus, const Ydb::PersQueue::ErrorCode::ErrorCode expectedErrorCode)
{
TServerMessage serverMessage;
- AssertSuccessfullStreamingOperation(stream->Read(&serverMessage), stream);
+ auto res = stream->Read(&serverMessage);
+ Cerr << serverMessage.DebugString() << "\n";
+ AssertSuccessfullStreamingOperation(res, stream);
UNIT_ASSERT_VALUES_EQUAL(expectedStatus, serverMessage.status());
UNIT_ASSERT_LE(1, serverMessage.issues_size());
// TODO: Why namespace duplicates enum name "ErrorCode::ErrorCode"?