diff options
author | baidarov <baidarov@yandex-team.ru> | 2022-02-10 16:52:23 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:52:23 +0300 |
commit | 7e0fa93950d364bdb59fbe19350a611dc65a6512 (patch) | |
tree | be78cff15358bb53858ebdd80775302b2f79b6aa | |
parent | e31629dd6bd33e846b5ca7cafd3fb209a3a9f398 (diff) | |
download | ydb-7e0fa93950d364bdb59fbe19350a611dc65a6512.tar.gz |
Restoring authorship annotation for <baidarov@yandex-team.ru>. Commit 1 of 2.
35 files changed, 618 insertions, 618 deletions
diff --git a/ydb/core/base/appdata.h b/ydb/core/base/appdata.h index c666f7468c0..fdff62b4033 100644 --- a/ydb/core/base/appdata.h +++ b/ydb/core/base/appdata.h @@ -133,7 +133,7 @@ struct TAppData { THolder<NKikimrCms::TCmsConfig> DefaultCmsConfig; NKikimrStream::TStreamingConfig StreamingConfig; - NKikimrPQ::TPQConfig PQConfig; + NKikimrPQ::TPQConfig PQConfig; NKikimrPQ::TPQClusterDiscoveryConfig PQClusterDiscoveryConfig; NKikimrNetClassifier::TNetClassifierConfig NetClassifierConfig; NKikimrNetClassifier::TNetClassifierDistributableConfig NetClassifierDistributableConfig; diff --git a/ydb/core/base/events.h b/ydb/core/base/events.h index f5fedfe19b2..a2cc9e4f602 100644 --- a/ydb/core/base/events.h +++ b/ydb/core/base/events.h @@ -86,9 +86,9 @@ struct TKikimrEvents : TEvents { ES_BLOCKSTORE, //4162 ES_RTMR_ICBUS, ES_TENANT_POOL, - ES_USER_REGISTRY, - ES_TVM_SETTINGS_UPDATER, - ES_PQ_CLUSTERS_UPDATER, + ES_USER_REGISTRY, + ES_TVM_SETTINGS_UPDATER, + ES_PQ_CLUSTERS_UPDATER, ES_TENANT_SLOT_BROKER, ES_GRPC_CALLS, ES_CONSOLE, diff --git a/ydb/core/base/user_registry.h b/ydb/core/base/user_registry.h index 74484c3d06c..72ddefd4f76 100644 --- a/ydb/core/base/user_registry.h +++ b/ydb/core/base/user_registry.h @@ -1,48 +1,48 @@ -#pragma once - +#pragma once + #include <library/cpp/actors/core/events.h> #include <ydb/core/tx/defs.h> - -namespace NKikimr { - struct TEvUserRegistry { - enum EEv { - EvGetUserById = EventSpaceBegin(TKikimrEvents::ES_USER_REGISTRY), - EvGetUserByIdResult, - EvEnd - }; - - static_assert(EvEnd < EventSpaceEnd(TKikimrEvents::ES_USER_REGISTRY), - "expect EvEnd < EventSpaceEnd(TKikimrEvents::ES_USER_REGISTRY)"); - - struct TEvGetUserById : TEventLocal<TEvGetUserById, EvGetUserById> { - const ui64 UID; + +namespace NKikimr { + struct TEvUserRegistry { + enum EEv { + EvGetUserById = EventSpaceBegin(TKikimrEvents::ES_USER_REGISTRY), + EvGetUserByIdResult, + EvEnd + }; + + static_assert(EvEnd < EventSpaceEnd(TKikimrEvents::ES_USER_REGISTRY), + "expect EvEnd < EventSpaceEnd(TKikimrEvents::ES_USER_REGISTRY)"); + + struct TEvGetUserById : TEventLocal<TEvGetUserById, EvGetUserById> { + const ui64 UID; const TString UserNameHint; - + TEvGetUserById(const ui64 uid, const TString& userNameHint = "") - : UID(uid) + : UID(uid) , UserNameHint(userNameHint) - {} - }; - - struct TEvGetUserByIdResult : TEventLocal<TEvGetUserByIdResult, EvGetUserByIdResult> { - const ui64 UID; + {} + }; + + struct TEvGetUserByIdResult : TEventLocal<TEvGetUserByIdResult, EvGetUserByIdResult> { + const ui64 UID; const TString UserNameHint; - const TString User; - const TString Error; - + const TString User; + const TString Error; + TEvGetUserByIdResult(const ui64 uid, const TString& userNameHint, const TString& user = "", const TString& error = "") - : UID(uid) + : UID(uid) , UserNameHint(userNameHint) - , User(user) - , Error(error) - {} - }; - }; - + , User(user) + , Error(error) + {} + }; + }; + inline NActors::TActorId MakeUserRegistryID() { - const char name[12] = "userregistr"; + const char name[12] = "userregistr"; return NActors::TActorId(0, TStringBuf(name, 12)); - } - - IActor* CreateUserRegistry(const TString& query); -} + } + + IActor* CreateUserRegistry(const TString& query); +} diff --git a/ydb/core/client/server/msgbus_server_persqueue.cpp b/ydb/core/client/server/msgbus_server_persqueue.cpp index 78b83ef3a3b..a7ce6a4c623 100644 --- a/ydb/core/client/server/msgbus_server_persqueue.cpp +++ b/ydb/core/client/server/msgbus_server_persqueue.cpp @@ -458,7 +458,7 @@ protected: ui32 TopicsAnswered; THashSet<ui64> TabletsDiscovered; THashSet<ui64> TabletsAnswered; - ui32 AclRequests; + ui32 AclRequests; ui32 DescribeRequests; ui32 PartTabletsRequested; TString ErrorReason; @@ -779,8 +779,8 @@ public: bool AnswerIfCanForMeta(const TActorContext& ctx) { Y_VERIFY(IsMetaRequest); Y_VERIFY(RequestProto.HasMetaRequest()); - if (AclRequests) - return false; + if (AclRequests) + return false; if (DescribeRequests) return false; const auto& meta = RequestProto.GetMetaRequest(); @@ -1210,7 +1210,7 @@ public: const auto& offset = req.GetOffset(); const auto& part = req.GetPartition(); const auto& maxBytes = req.GetMaxBytes(); - const auto& readTimestampMs = req.GetReadTimestampMs(); + const auto& readTimestampMs = req.GetReadTimestampMs(); auto it = TopicInfo.find(topic); Y_VERIFY(it != TopicInfo.end()); if (it->second.PartitionToTablet.find(part) == it->second.PartitionToTablet.end()) { //tablet's info is not filled for this topic yet @@ -1242,7 +1242,7 @@ public: read->SetCount(1000000); read->SetTimeoutMs(0); read->SetBytes(Min<ui32>(maxBytes, FetchRequestBytesLeft)); - read->SetReadTimestampMs(readTimestampMs); + read->SetReadTimestampMs(readTimestampMs); NTabletPipe::SendData(ctx, jt->second.PipeClient, preq.Release()); } diff --git a/ydb/core/client/server/msgbus_server_pq_metarequest_ut.cpp b/ydb/core/client/server/msgbus_server_pq_metarequest_ut.cpp index 237bba147b9..b1fb7b8f587 100644 --- a/ydb/core/client/server/msgbus_server_pq_metarequest_ut.cpp +++ b/ydb/core/client/server/msgbus_server_pq_metarequest_ut.cpp @@ -727,7 +727,7 @@ public: NKikimrClient::TPersQueueRequest MakeValidRequest(ui64 topicsCount = 2) override { NKikimrClient::TPersQueueRequest persQueueRequest; - persQueueRequest.SetTicket("client_id@" BUILTIN_ACL_DOMAIN); + persQueueRequest.SetTicket("client_id@" BUILTIN_ACL_DOMAIN); auto& req = *persQueueRequest.MutableMetaRequest()->MutableCmdGetTopicMetadata(); req.AddTopic("topic1"); @@ -821,7 +821,7 @@ public: NKikimrClient::TPersQueueRequest MakeValidRequest(ui64 topicsCount = 2) override { NKikimrClient::TPersQueueRequest persQueueRequest; - persQueueRequest.SetTicket("client_id@" BUILTIN_ACL_DOMAIN); + persQueueRequest.SetTicket("client_id@" BUILTIN_ACL_DOMAIN); auto& req = *persQueueRequest.MutableMetaRequest()->MutableCmdGetPartitionLocations(); FillValidTopicRequest(*req.MutableTopicRequest(), topicsCount); @@ -984,7 +984,7 @@ public: NKikimrClient::TPersQueueRequest MakeValidRequest(ui64 topicsCount = 2) override { NKikimrClient::TPersQueueRequest persQueueRequest; - persQueueRequest.SetTicket("client_id@" BUILTIN_ACL_DOMAIN); + persQueueRequest.SetTicket("client_id@" BUILTIN_ACL_DOMAIN); auto& req = *persQueueRequest.MutableMetaRequest()->MutableCmdGetPartitionOffsets(); FillValidTopicRequest(*req.MutableTopicRequest(), topicsCount); @@ -1145,7 +1145,7 @@ public: NKikimrClient::TPersQueueRequest MakeValidRequest(ui64 topicsCount = 2) override { NKikimrClient::TPersQueueRequest persQueueRequest; - persQueueRequest.SetTicket("client_id@" BUILTIN_ACL_DOMAIN); + persQueueRequest.SetTicket("client_id@" BUILTIN_ACL_DOMAIN); auto& req = *persQueueRequest.MutableMetaRequest()->MutableCmdGetPartitionStatus(); FillValidTopicRequest(*req.MutableTopicRequest(), topicsCount); @@ -1298,7 +1298,7 @@ public: NKikimrClient::TPersQueueRequest MakeValidRequest(ui64 topicsCount = 2) override { NKikimrClient::TPersQueueRequest persQueueRequest; - persQueueRequest.SetTicket("client_id@" BUILTIN_ACL_DOMAIN); + persQueueRequest.SetTicket("client_id@" BUILTIN_ACL_DOMAIN); auto& req = *persQueueRequest.MutableMetaRequest()->MutableCmdGetReadSessionsInfo(); req.SetClientId("client_id"); diff --git a/ydb/core/driver_lib/cli_utils/cli_cmds_server.cpp b/ydb/core/driver_lib/cli_utils/cli_cmds_server.cpp index 6766dd31719..40dbb7f1777 100644 --- a/ydb/core/driver_lib/cli_utils/cli_cmds_server.cpp +++ b/ydb/core/driver_lib/cli_utils/cli_cmds_server.cpp @@ -208,10 +208,10 @@ protected: config.Opts->AddLongOption("kqp-file", "Kikimr Query Processor config file").OptionalArgument("PATH"); config.Opts->AddLongOption("incrhuge-file", "incremental huge blob keeper config file").OptionalArgument("PATH"); config.Opts->AddLongOption("memorylog-file", "set buffer size for memory log").OptionalArgument("PATH"); - config.Opts->AddLongOption("pq-file", "PersQueue config file").OptionalArgument("PATH"); + config.Opts->AddLongOption("pq-file", "PersQueue config file").OptionalArgument("PATH"); config.Opts->AddLongOption("pqcd-file", "PersQueue cluster discovery config file").OptionalArgument("PATH"); config.Opts->AddLongOption("netclassifier-file", "NetClassifier config file").OptionalArgument("PATH"); - config.Opts->AddLongOption("auth-file", "authorization configuration").OptionalArgument("PATH"); + config.Opts->AddLongOption("auth-file", "authorization configuration").OptionalArgument("PATH"); config.Opts->AddLongOption("auth-token-file", "authorization token configuration").OptionalArgument("PATH"); config.Opts->AddLongOption("key-file", "tanant encryption key configuration").OptionalArgument("PATH"); config.Opts->AddLongOption("pdisk-key-file", "pdisk encryption key configuration").OptionalArgument("PATH"); diff --git a/ydb/core/driver_lib/run/config_parser.cpp b/ydb/core/driver_lib/run/config_parser.cpp index bb686562912..e62a7d38295 100644 --- a/ydb/core/driver_lib/run/config_parser.cpp +++ b/ydb/core/driver_lib/run/config_parser.cpp @@ -65,10 +65,10 @@ void TRunCommandConfigParser::SetupLastGetOptForConfigFiles(NLastGetopt::TOpts& opts.AddLongOption("grpc-public-host", "set public gRPC host for discovery").RequiredArgument("HOST"); opts.AddLongOption("grpc-public-port", "set public gRPC port for discovery").RequiredArgument("PORT"); opts.AddLongOption("grpcs-public-port", "set public gRPC SSL port for discovery").RequiredArgument("PORT"); - opts.AddLongOption("pq-file", "PQ config file").OptionalArgument("PATH"); + opts.AddLongOption("pq-file", "PQ config file").OptionalArgument("PATH"); opts.AddLongOption("pqcd-file", "PQCD config file").OptionalArgument("PATH"); opts.AddLongOption("netclassifier-file", "NetClassifier config file").OptionalArgument("PATH"); - opts.AddLongOption("auth-file", "authorization config file").OptionalArgument("PATH"); + opts.AddLongOption("auth-file", "authorization config file").OptionalArgument("PATH"); opts.AddLongOption("auth-token-file", "authorization token config file").OptionalArgument("PATH"); opts.AddLongOption("key-file", "encryption key config file").OptionalArgument("PATH"); opts.AddLongOption("sqs-file", "SQS config file").OptionalArgument("PATH"); @@ -146,7 +146,7 @@ void TRunCommandConfigParser::ParseConfigFiles(const NLastGetopt::TOptsParseResu conf.SetStartGRpcProxy(true); conf.SetPort(FromString<ui16>(res.Get("grpc-port"))); } - + if (res.Has("grpcs-port")) { auto& conf = *Config.AppConfig.MutableGRpcConfig(); conf.SetStartGRpcProxy(true); @@ -168,10 +168,10 @@ void TRunCommandConfigParser::ParseConfigFiles(const NLastGetopt::TOptsParseResu conf.SetPublicSslPort(FromString<ui16>(res.Get("grpcs-public-port"))); } - if (res.Has("pq-file")) { - Y_VERIFY(ParsePBFromFile(res.Get("pq-file"), Config.AppConfig.MutablePQConfig())); - } - + if (res.Has("pq-file")) { + Y_VERIFY(ParsePBFromFile(res.Get("pq-file"), Config.AppConfig.MutablePQConfig())); + } + if (res.Has("pqcd-file")) { Y_VERIFY(ParsePBFromFile(res.Get("pqcd-file"), Config.AppConfig.MutablePQClusterDiscoveryConfig())); } @@ -180,9 +180,9 @@ void TRunCommandConfigParser::ParseConfigFiles(const NLastGetopt::TOptsParseResu Y_VERIFY(ParsePBFromFile(res.Get("netclassifier-file"), Config.AppConfig.MutableNetClassifierConfig())); } - if (res.Has("auth-file")) { - Y_VERIFY(ParsePBFromFile(res.Get("auth-file"), Config.AppConfig.MutableAuthConfig())); - } + if (res.Has("auth-file")) { + Y_VERIFY(ParsePBFromFile(res.Get("auth-file"), Config.AppConfig.MutableAuthConfig())); + } if (res.Has("auth-token-file")) { Y_VERIFY(ParsePBFromFile(res.Get("auth-token-file"), Config.AppConfig.MutableAuthConfig())); diff --git a/ydb/core/driver_lib/run/run.cpp b/ydb/core/driver_lib/run/run.cpp index a4f74aa4e0f..076ca7b2c38 100644 --- a/ydb/core/driver_lib/run/run.cpp +++ b/ydb/core/driver_lib/run/run.cpp @@ -875,9 +875,9 @@ void TKikimrRunner::InitializeAppData(const TKikimrRunConfig& runConfig) } if (runConfig.AppConfig.HasPQConfig()) { - AppData->PQConfig.CopyFrom(runConfig.AppConfig.GetPQConfig()); + AppData->PQConfig.CopyFrom(runConfig.AppConfig.GetPQConfig()); } - + if (runConfig.AppConfig.HasPQClusterDiscoveryConfig()) { AppData->PQClusterDiscoveryConfig.CopyFrom(runConfig.AppConfig.GetPQClusterDiscoveryConfig()); } @@ -891,9 +891,9 @@ void TKikimrRunner::InitializeAppData(const TKikimrRunConfig& runConfig) } if (runConfig.AppConfig.HasAuthConfig()) { - AppData->AuthConfig.CopyFrom(runConfig.AppConfig.GetAuthConfig()); + AppData->AuthConfig.CopyFrom(runConfig.AppConfig.GetAuthConfig()); } - + if (runConfig.AppConfig.HasKeyConfig()) { AppData->KeyConfig.CopyFrom(runConfig.AppConfig.GetKeyConfig()); } diff --git a/ydb/core/persqueue/events/global.h b/ydb/core/persqueue/events/global.h index e1957e7a16f..a82ad3abdb8 100644 --- a/ydb/core/persqueue/events/global.h +++ b/ydb/core/persqueue/events/global.h @@ -36,9 +36,9 @@ struct TEvPersQueue { EvGetReadSessionsInfo, EvReadSessionsInfoResponse, EvWakeupClient, - EvUpdateACL, - EvCheckACL, - EvCheckACLResponse, + EvUpdateACL, + EvCheckACL, + EvCheckACLResponse, EvError, EvGetPartitionIdForWrite, EvGetPartitionIdForWriteResponse, @@ -182,22 +182,22 @@ struct TEvPersQueue { TEvDescribeResponse() {} }; - - struct TEvUpdateACL : public TEventLocal<TEvUpdateACL, EvUpdateACL> { - TEvUpdateACL() - {} - }; - - struct TEvCheckACL : public TEventPB<TEvCheckACL, NKikimrPQ::TCheckACL, EvCheckACL> { - TEvCheckACL() - {} - }; - - struct TEvCheckACLResponse : public TEventPB<TEvCheckACLResponse, NKikimrPQ::TCheckACLResponse, EvCheckACLResponse> { - TEvCheckACLResponse() - {}; - }; - + + struct TEvUpdateACL : public TEventLocal<TEvUpdateACL, EvUpdateACL> { + TEvUpdateACL() + {} + }; + + struct TEvCheckACL : public TEventPB<TEvCheckACL, NKikimrPQ::TCheckACL, EvCheckACL> { + TEvCheckACL() + {} + }; + + struct TEvCheckACLResponse : public TEventPB<TEvCheckACLResponse, NKikimrPQ::TCheckACLResponse, EvCheckACLResponse> { + TEvCheckACLResponse() + {}; + }; + struct TEvError : public TEventPB<TEvError, NPersQueueCommon::TError, EvError> { TEvError() {} diff --git a/ydb/core/persqueue/events/internal.h b/ydb/core/persqueue/events/internal.h index bc892988b47..b24395e5cba 100644 --- a/ydb/core/persqueue/events/internal.h +++ b/ydb/core/persqueue/events/internal.h @@ -125,7 +125,7 @@ struct TEvPQ { ui32 TotalSize; ui64 CreateTimestamp; ui64 ReceiveTimestamp; - bool DisableDeduplication; + bool DisableDeduplication; ui64 WriteTimestamp; TString Data; ui32 UncompressedSize; @@ -173,7 +173,7 @@ struct TEvPQ { , ClientId(clientId) , Timeout(timeout) , Size(size) - , MaxTimeLagMs(maxTimeLagMs) + , MaxTimeLagMs(maxTimeLagMs) , ReadTimestampMs(readTimestampMs) , ClientDC(clientDC) , ExternalOperation(externalOperation) @@ -187,7 +187,7 @@ struct TEvPQ { TString ClientId; ui32 Timeout; ui32 Size; - ui32 MaxTimeLagMs; + ui32 MaxTimeLagMs; ui64 ReadTimestampMs; TString ClientDC; bool ExternalOperation; diff --git a/ydb/core/persqueue/key.h b/ydb/core/persqueue/key.h index 098c67cf7d9..1bf03466edb 100644 --- a/ydb/core/persqueue/key.h +++ b/ydb/core/persqueue/key.h @@ -20,10 +20,10 @@ public: }; enum EMark : char { - MarkUserDeprecated = 'u', - MarkSourceId = 's', + MarkUserDeprecated = 'u', + MarkSourceId = 's', MarkProtoSourceId = 'p', - MarkUser = 'c' + MarkUser = 'c' }; TKeyPrefix(EType type, const ui32 partition) diff --git a/ydb/core/persqueue/partition.cpp b/ydb/core/persqueue/partition.cpp index cc8e2419cf2..8469f4a9ef5 100644 --- a/ydb/core/persqueue/partition.cpp +++ b/ydb/core/persqueue/partition.cpp @@ -214,20 +214,20 @@ IOutputStream& operator <<(IOutputStream& out, const TKeyLevel& value) { return out; } - + ui64 GetOffsetEstimate(const std::deque<TDataKey>& container, TInstant timestamp, ui64 offset) { - if (container.empty()) { + if (container.empty()) { return offset; - } - auto it = std::lower_bound(container.begin(), container.end(), timestamp, + } + auto it = std::lower_bound(container.begin(), container.end(), timestamp, [](const TDataKey& p, const TInstant timestamp) { return timestamp > p.Timestamp; }); - if (it == container.end()) { + if (it == container.end()) { return offset; - } else { - return it->Key.GetOffset(); - } -} - + } else { + return it->Key.GetOffset(); + } +} + struct TMirrorerInfo { TMirrorerInfo(const TActorId& actor, const TTabletCountersBase& baseline) : Actor(actor) @@ -686,7 +686,7 @@ void TPartition::HandleMonitoring(TEvPQ::TEvMonRequest::TPtr& ev, const TActorCo TABLEH() {out << "ReadOffset";} TABLEH() {out << "ReadWriteTimestamp";} TABLEH() {out << "ReadCreateTimestamp";} - TABLEH() {out << "ReadOffsetRewindSum";} + TABLEH() {out << "ReadOffsetRewindSum";} TABLEH() {out << "ActiveReads";} TABLEH() {out << "Subscriptions";} } @@ -703,7 +703,7 @@ void TPartition::HandleMonitoring(TEvPQ::TEvMonRequest::TPtr& ev, const TActorCo TABLED() {out << (d.second.GetReadOffset());} TABLED() {out << ToStringLocalTimeUpToSeconds(d.second.GetReadWriteTimestamp());} TABLED() {out << ToStringLocalTimeUpToSeconds(d.second.GetReadCreateTimestamp());} - TABLED() {out << (d.second.ReadOffsetRewindSum);} + TABLED() {out << (d.second.ReadOffsetRewindSum);} TABLED() {out << d.second.ActiveReads;} TABLED() {out << d.second.Subscriptions;} } @@ -1400,7 +1400,7 @@ void TPartition::HandleInfoRangeRead(const NKikimrClient::TKeyValueResponse::TRe SourceIdStorage.LoadSourceIdInfo(*key, pair.GetValue(), ctx.Now()); } else if ((*key)[TKeyPrefix::MarkPosition()] == TKeyPrefix::MarkUser) { UsersInfoStorage.Parse(*key, pair.GetValue(), ctx); - } else if ((*key)[TKeyPrefix::MarkPosition()] == TKeyPrefix::MarkUserDeprecated) { + } else if ((*key)[TKeyPrefix::MarkPosition()] == TKeyPrefix::MarkUserDeprecated) { UsersInfoStorage.ParseDeprecated(*key, pair.GetValue(), ctx); } } @@ -2633,29 +2633,29 @@ void TPartition::DoRead(TEvPQ::TEvRead::TPtr ev, TDuration waitQuotaTime, const const TString& user = read->ClientId; auto& userInfo = UsersInfoStorage.GetOrCreate(user, ctx); - ui64 offset = read->Offset; + ui64 offset = read->Offset; if (read->MaxTimeLagMs > 0 || read->ReadTimestampMs > 0 || userInfo.ReadFromTimestamp > TInstant::MilliSeconds(1)) { TInstant timestamp = read->MaxTimeLagMs > 0 ? ctx.Now() - TDuration::MilliSeconds(read->MaxTimeLagMs) : TInstant::Zero(); timestamp = Max(timestamp, TInstant::MilliSeconds(read->ReadTimestampMs)); timestamp = Max(timestamp, userInfo.ReadFromTimestamp); offset = Max(GetOffsetEstimate(DataKeysBody, timestamp, Min(Head.Offset, EndOffset - 1)), offset); userInfo.ReadOffsetRewindSum += offset - read->Offset; - } + } TReadInfo info(user, read->ClientDC, offset, read->PartNo, read->Count, read->Size, read->Cookie, read->ReadTimestampMs, waitQuotaTime); ui64 cookie = Cookie++; LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "read cookie " << cookie << " Topic '" << TopicName << "' partition " << Partition << " user " << user - << " offset " << read->Offset << " count " << read->Count << " size " << read->Size << " endOffset " << EndOffset - << " max time lag " << read->MaxTimeLagMs << "ms effective offset " << offset); + << " offset " << read->Offset << " count " << read->Count << " size " << read->Size << " endOffset " << EndOffset + << " max time lag " << read->MaxTimeLagMs << "ms effective offset " << offset); - if (offset == EndOffset) { + if (offset == EndOffset) { if (read->Timeout > 30000) { LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "too big read timeout " << " Topic '" << TopicName << "' partition " << Partition << " user " << read->ClientId - << " offset " << read->Offset << " count " << read->Count << " size " << read->Size << " endOffset " << EndOffset - << " max time lag " << read->MaxTimeLagMs << "ms effective offset " << offset); + << " offset " << read->Offset << " count " << read->Count << " size " << read->Size << " endOffset " << EndOffset + << " max time lag " << read->MaxTimeLagMs << "ms effective offset " << offset); read->Timeout = 30000; } Subscriber.AddSubscription(std::move(info), read->Timeout, cookie, ctx); @@ -2665,7 +2665,7 @@ void TPartition::DoRead(TEvPQ::TEvRead::TPtr ev, TDuration waitQuotaTime, const return; } - Y_VERIFY(offset < EndOffset); + Y_VERIFY(offset < EndOffset); ProcessRead(ctx, std::move(info), cookie, false); } @@ -2708,7 +2708,7 @@ void TPartition::AnswerCurrentWrites(const TActorContext& ctx) if (it != SourceIdStorage.GetInMemorySourceIds().end()) { maxSeqNo = it->second.SeqNo; maxOffset = it->second.Offset; - if (it->second.SeqNo >= seqNo && !writeResponse.Msg.DisableDeduplication) { + if (it->second.SeqNo >= seqNo && !writeResponse.Msg.DisableDeduplication) { already = true; } } @@ -3172,14 +3172,14 @@ void TPartition::ReportLabeledCounters(const TActorContext& ctx) ui64 readOffsetRewindSum = userInfo.ReadOffsetRewindSum; if (readOffsetRewindSum != userInfo.LabeledCounters.GetCounters()[METRIC_READ_OFFSET_REWIND_SUM].Get()) { - haveChanges = true; + haveChanges = true; userInfo.LabeledCounters.GetCounters()[METRIC_READ_OFFSET_REWIND_SUM].Set(readOffsetRewindSum); - } + } if (readOffsetRewindSum != userInfo.LabeledCounters.GetCounters()[METRIC_READ_OFFSET_REWIND_TOTAL].Get()) { haveChanges = true; userInfo.LabeledCounters.GetCounters()[METRIC_READ_OFFSET_REWIND_TOTAL].Set(readOffsetRewindSum); } - + ui32 id = METRIC_TOTAL_READ_SPEED_1; for (ui32 i = 0; i < userInfo.AvgReadBytes.size(); ++i) { ui64 avg = userInfo.AvgReadBytes[i].GetValue(); @@ -3893,7 +3893,7 @@ void TPartition::WriteClientInfo(const ui64 cookie, TUserInfo& userInfo, const T continue;*/ } - TBuffer idata; + TBuffer idata; { NKikimrPQ::TUserInfo userData; userData.SetOffset(offset); @@ -3904,11 +3904,11 @@ void TPartition::WriteClientInfo(const ui64 cookie, TUserInfo& userInfo, const T userData.SetReadRuleGeneration(readRuleGeneration); TString out; Y_PROTOBUF_SUPPRESS_NODISCARD userData.SerializeToString(&out); - + idata.Append(out.c_str(), out.size()); } TBuffer idataDeprecated = NDeprecatedUserData::Serialize(offset, gen, step, session); - + auto write = request->Record.AddCmdWrite(); write->SetKey(ikey.Data(), ikey.Size()); write->SetValue(idata.Data(), idata.Size()); @@ -3917,7 +3917,7 @@ void TPartition::WriteClientInfo(const ui64 cookie, TUserInfo& userInfo, const T write2->SetKey(ikeyDeprecated.Data(), ikeyDeprecated.Size()); write2->SetValue(idataDeprecated.Data(), idataDeprecated.Size()); write2->SetStorageChannel(NKikimrClient::TKeyValueRequest::INLINE); - + request->Record.SetCookie(cookie); ctx.Send(Tablet, request.Release()); @@ -3945,7 +3945,7 @@ void TPartition::ClearOldHead(const ui64 offset, const ui16 partNo, TEvKeyValue: void TPartition::CancelAllWritesOnWrite(const TActorContext& ctx, TEvKeyValue::TEvRequest* request, const TString& errorStr, const TWriteMsg& p, TSourceIdWriter& sourceIdWriter, NPersQueue::NErrorCode::EErrorCode errorCode = NPersQueue::NErrorCode::BAD_REQUEST) { - ReplyError(ctx, p.Cookie, errorCode, errorStr); + ReplyError(ctx, p.Cookie, errorCode, errorStr); Counters.Cumulative()[COUNTER_PQ_WRITE_ERROR].Increment(1); Counters.Cumulative()[COUNTER_PQ_WRITE_BYTES_ERROR].Increment(p.Msg.Data.size() + p.Msg.SourceId.size()); FailBadClient(ctx); diff --git a/ydb/core/persqueue/partition.h b/ydb/core/persqueue/partition.h index 2eec26f2516..d5ddcaa6c10 100644 --- a/ydb/core/persqueue/partition.h +++ b/ydb/core/persqueue/partition.h @@ -32,15 +32,15 @@ static const ui32 MAX_BLOB_PART_SIZE = 500 << 10; //500Kb typedef TProtobufTabletLabeledCounters<EPartitionLabeledCounters_descriptor> TPartitionLabeledCounters; -struct TDataKey { - TKey Key; - ui32 Size; +struct TDataKey { + TKey Key; + ui32 Size; TInstant Timestamp; - ui64 CumulativeSize; -}; - + ui64 CumulativeSize; +}; + ui64 GetOffsetEstimate(const std::deque<TDataKey>& container, TInstant timestamp, ui64 headOffset); - + struct TMirrorerInfo; class TPartition : public TActorBootstrapped<TPartition> { diff --git a/ydb/core/persqueue/pq_impl.cpp b/ydb/core/persqueue/pq_impl.cpp index 9937dbe561c..cf592028b22 100644 --- a/ydb/core/persqueue/pq_impl.cpp +++ b/ydb/core/persqueue/pq_impl.cpp @@ -1499,7 +1499,7 @@ void TPersQueue::HandleWriteRequest(const ui64 responseCookie, const TActorId& p ui32 mSize = MAX_BLOB_PART_SIZE - cmd.GetSourceId().size() - sizeof(ui32) - TClientBlob::OVERHEAD; //megaqc - remove this Y_VERIFY(mSize > 204800); ui64 receiveTimestampMs = TAppData::TimeProvider->Now().MilliSeconds(); - bool disableDeduplication = cmd.GetDisableDeduplication(); + bool disableDeduplication = cmd.GetDisableDeduplication(); if (cmd.GetData().size() > mSize) { if (cmd.HasPartNo()) { ReplyError(ctx, responseCookie, NPersQueue::NErrorCode::BAD_REQUEST, @@ -1650,7 +1650,7 @@ void TPersQueue::HandleReadRequest(const ui64 responseCookie, const TActorId& pa } else if (cmd.HasPartNo() && cmd.GetPartNo() < 0) { ReplyError(ctx, responseCookie, NPersQueue::NErrorCode::BAD_REQUEST, TStringBuilder() << "invalid partNo in read request: " << ToString(req).data()); - } else if (cmd.HasMaxTimeLagMs() && cmd.GetMaxTimeLagMs() < 0) { + } else if (cmd.HasMaxTimeLagMs() && cmd.GetMaxTimeLagMs() < 0) { ReplyError(ctx, responseCookie, NPersQueue::NErrorCode::BAD_REQUEST, TStringBuilder() << "invalid maxTimeLagMs in read request: " << ToString(req).data()); } else { diff --git a/ydb/core/persqueue/pq_ut.cpp b/ydb/core/persqueue/pq_ut.cpp index 13a61229105..2f073a41f41 100644 --- a/ydb/core/persqueue/pq_ut.cpp +++ b/ydb/core/persqueue/pq_ut.cpp @@ -138,53 +138,53 @@ Y_UNIT_TEST(TestGroupsBalancer3) { Y_UNIT_TEST(TestUserInfoCompatibility) { - TTestContext tc; - RunTestWithReboots(tc.TabletIds, [&]() { - return tc.InitialEventsFilter.Prepare(); - }, [&](const TString& dispatchName, std::function<void(TTestActorRuntime&)> setup, bool& activeZone) { - TFinalizer finalizer(tc); - tc.Prepare(dispatchName, setup, activeZone); - activeZone = false; - TString client = "test"; + TTestContext tc; + RunTestWithReboots(tc.TabletIds, [&]() { + return tc.InitialEventsFilter.Prepare(); + }, [&](const TString& dispatchName, std::function<void(TTestActorRuntime&)> setup, bool& activeZone) { + TFinalizer finalizer(tc); + tc.Prepare(dispatchName, setup, activeZone); + activeZone = false; + TString client = "test"; tc.Runtime->SetLogPriority(NKikimrServices::PERSQUEUE, NLog::PRI_DEBUG); - + PQTabletPrepare(20000000, 100 * 1024 * 1024, 0, {{client, false}}, tc, 4, 6*1024*1024, true, 0, 0, 1); - TVector<std::pair<ui64, TString>> data; - data.push_back({1, "s"}); - data.push_back({2, "q"}); - CmdWrite(0, "sourceid", data, tc); - CmdWrite(1, "sourceid", data, tc); - CmdWrite(2, "sourceid", data, tc); + TVector<std::pair<ui64, TString>> data; + data.push_back({1, "s"}); + data.push_back({2, "q"}); + CmdWrite(0, "sourceid", data, tc); + CmdWrite(1, "sourceid", data, tc); + CmdWrite(2, "sourceid", data, tc); CmdWrite(3, "sourceid", data, tc); + - - THolder<TEvKeyValue::TEvRequest> request(new TEvKeyValue::TEvRequest); - FillUserInfo(request->Record.AddCmdWrite(), client, 0, 0); - FillDeprecatedUserInfo(request->Record.AddCmdWrite(), client, 0, 0); - FillUserInfo(request->Record.AddCmdWrite(), client, 1, 1); - FillDeprecatedUserInfo(request->Record.AddCmdWrite(), client, 2, 1); + THolder<TEvKeyValue::TEvRequest> request(new TEvKeyValue::TEvRequest); + FillUserInfo(request->Record.AddCmdWrite(), client, 0, 0); + FillDeprecatedUserInfo(request->Record.AddCmdWrite(), client, 0, 0); + FillUserInfo(request->Record.AddCmdWrite(), client, 1, 1); + FillDeprecatedUserInfo(request->Record.AddCmdWrite(), client, 2, 1); FillUserInfo(request->Record.AddCmdWrite(), client, 2, 1); FillDeprecatedUserInfo(request->Record.AddCmdWrite(), client, 3, 0); FillUserInfo(request->Record.AddCmdWrite(), client, 3, 1); - - - tc.Runtime->SendToPipe(tc.TabletId, tc.Edge, request.Release(), 0, GetPipeConfigWithRetries()); - TAutoPtr<IEventHandle> handle; - TEvKeyValue::TEvResponse* result = tc.Runtime->GrabEdgeEvent<TEvKeyValue::TEvResponse>(handle); - Y_UNUSED(result); - - RestartTablet(tc); + + + tc.Runtime->SendToPipe(tc.TabletId, tc.Edge, request.Release(), 0, GetPipeConfigWithRetries()); + TAutoPtr<IEventHandle> handle; + TEvKeyValue::TEvResponse* result = tc.Runtime->GrabEdgeEvent<TEvKeyValue::TEvResponse>(handle); + Y_UNUSED(result); + + RestartTablet(tc); Cerr << "AFTER RESTART\n"; - CmdGetOffset(0, client, 0, tc); - CmdGetOffset(1, client, 1, tc); - CmdGetOffset(2, client, 1, tc); + CmdGetOffset(0, client, 0, tc); + CmdGetOffset(1, client, 1, tc); + CmdGetOffset(2, client, 1, tc); CmdGetOffset(3, client, 1, tc); - }); -} - + }); +} + Y_UNIT_TEST(TestReadRuleVersions) { TTestContext tc; RunTestWithReboots(tc.TabletIds, [&]() { @@ -273,49 +273,49 @@ Y_UNIT_TEST(TestCreateBalancer) { } Y_UNIT_TEST(TestDescribeBalancer) { - TTestContext tc; - RunTestWithReboots(tc.TabletIds, [&]() { - return tc.InitialEventsFilter.Prepare(); - }, [&](const TString& dispatchName, std::function<void(TTestActorRuntime&)> setup, bool& activeZone) { - TFinalizer finalizer(tc); - tc.Prepare(dispatchName, setup, activeZone); - activeZone = false; + TTestContext tc; + RunTestWithReboots(tc.TabletIds, [&]() { + return tc.InitialEventsFilter.Prepare(); + }, [&](const TString& dispatchName, std::function<void(TTestActorRuntime&)> setup, bool& activeZone) { + TFinalizer finalizer(tc); + tc.Prepare(dispatchName, setup, activeZone); + activeZone = false; TFakeSchemeShardState::TPtr state{new TFakeSchemeShardState()}; ui64 ssId = 9876; BootFakeSchemeShard(*tc.Runtime, ssId, state); - tc.Runtime->SetScheduledLimit(50); - tc.Runtime->SetDispatchTimeout(TDuration::MilliSeconds(100)); + tc.Runtime->SetScheduledLimit(50); + tc.Runtime->SetDispatchTimeout(TDuration::MilliSeconds(100)); BalancerPrepare("topic", {{1,{1, 2}}}, ssId, tc); - TAutoPtr<IEventHandle> handle; - tc.Runtime->SendToPipe(tc.BalancerTabletId, tc.Edge, new TEvPersQueue::TEvDescribe(), 0, GetPipeConfigWithRetries()); - TEvPersQueue::TEvDescribeResponse* result = tc.Runtime->GrabEdgeEvent<TEvPersQueue::TEvDescribeResponse>(handle); - UNIT_ASSERT(result); - auto& rec = result->Record; + TAutoPtr<IEventHandle> handle; + tc.Runtime->SendToPipe(tc.BalancerTabletId, tc.Edge, new TEvPersQueue::TEvDescribe(), 0, GetPipeConfigWithRetries()); + TEvPersQueue::TEvDescribeResponse* result = tc.Runtime->GrabEdgeEvent<TEvPersQueue::TEvDescribeResponse>(handle); + UNIT_ASSERT(result); + auto& rec = result->Record; UNIT_ASSERT(rec.HasSchemeShardId() && rec.GetSchemeShardId() == ssId); - RestartTablet(tc); - tc.Runtime->SendToPipe(tc.BalancerTabletId, tc.Edge, new TEvPersQueue::TEvDescribe(), 0, GetPipeConfigWithRetries()); - result = tc.Runtime->GrabEdgeEvent<TEvPersQueue::TEvDescribeResponse>(handle); - UNIT_ASSERT(result); - auto& rec2 = result->Record; + RestartTablet(tc); + tc.Runtime->SendToPipe(tc.BalancerTabletId, tc.Edge, new TEvPersQueue::TEvDescribe(), 0, GetPipeConfigWithRetries()); + result = tc.Runtime->GrabEdgeEvent<TEvPersQueue::TEvDescribeResponse>(handle); + UNIT_ASSERT(result); + auto& rec2 = result->Record; UNIT_ASSERT(rec2.HasSchemeShardId() && rec2.GetSchemeShardId() == ssId); - }); -} + }); +} Y_UNIT_TEST(TestCheckACL) { - TTestContext tc; - RunTestWithReboots(tc.TabletIds, [&]() { - return tc.InitialEventsFilter.Prepare(); - }, [&](const TString& dispatchName, std::function<void(TTestActorRuntime&)> setup, bool& activeZone) { - TFinalizer finalizer(tc); - tc.Prepare(dispatchName, setup, activeZone); - activeZone = false; - TFakeSchemeShardState::TPtr state{new TFakeSchemeShardState()}; - ui64 ssId = 9876; - BootFakeSchemeShard(*tc.Runtime, ssId, state); + TTestContext tc; + RunTestWithReboots(tc.TabletIds, [&]() { + return tc.InitialEventsFilter.Prepare(); + }, [&](const TString& dispatchName, std::function<void(TTestActorRuntime&)> setup, bool& activeZone) { + TFinalizer finalizer(tc); + tc.Prepare(dispatchName, setup, activeZone); + activeZone = false; + TFakeSchemeShardState::TPtr state{new TFakeSchemeShardState()}; + ui64 ssId = 9876; + BootFakeSchemeShard(*tc.Runtime, ssId, state); IActor* ticketParser = NKikimr::CreateTicketParser(tc.Runtime->GetAppData().AuthConfig); TActorId ticketParserId = tc.Runtime->Register(ticketParser); - tc.Runtime->RegisterService(NKikimr::MakeTicketParserID(), ticketParserId); + tc.Runtime->RegisterService(NKikimr::MakeTicketParserID(), ticketParserId); TAutoPtr<IEventHandle> handle; THolder<TEvPersQueue::TEvCheckACL> request(new TEvPersQueue::TEvCheckACL()); @@ -326,57 +326,57 @@ Y_UNIT_TEST(TestCheckACL) { tc.Runtime->SendToPipe(tc.BalancerTabletId, tc.Edge, request.Release(), 0, GetPipeConfigWithRetries()); tc.Runtime->SetScheduledLimit(600); - tc.Runtime->SetDispatchTimeout(TDuration::MilliSeconds(100)); + tc.Runtime->SetDispatchTimeout(TDuration::MilliSeconds(100)); BalancerPrepare("topic", {{1,{1, 2}}}, ssId, tc); - - { - TDispatchOptions options; + + { + TDispatchOptions options; options.FinalEvents.emplace_back(NSchemeShard::TEvSchemeShard::EvDescribeSchemeResult); - tc.Runtime->DispatchEvents(options); - } - - TEvPersQueue::TEvCheckACLResponse* result = tc.Runtime->GrabEdgeEvent<TEvPersQueue::TEvCheckACLResponse>(handle); - auto& rec = result->Record; - UNIT_ASSERT(rec.GetAccess() == NKikimrPQ::EAccess::DENIED); - UNIT_ASSERT(rec.GetTopic() == "topic"); - - state->ACL.AddAccess(NACLib::EAccessType::Allow, NACLib::SelectRow, "client@" BUILTIN_ACL_DOMAIN); - - { - TDispatchOptions options; + tc.Runtime->DispatchEvents(options); + } + + TEvPersQueue::TEvCheckACLResponse* result = tc.Runtime->GrabEdgeEvent<TEvPersQueue::TEvCheckACLResponse>(handle); + auto& rec = result->Record; + UNIT_ASSERT(rec.GetAccess() == NKikimrPQ::EAccess::DENIED); + UNIT_ASSERT(rec.GetTopic() == "topic"); + + state->ACL.AddAccess(NACLib::EAccessType::Allow, NACLib::SelectRow, "client@" BUILTIN_ACL_DOMAIN); + + { + TDispatchOptions options; options.FinalEvents.emplace_back(NSchemeShard::TEvSchemeShard::EvDescribeSchemeResult); - tc.Runtime->DispatchEvents(options); - } - - request.Reset(new TEvPersQueue::TEvCheckACL()); + tc.Runtime->DispatchEvents(options); + } + + request.Reset(new TEvPersQueue::TEvCheckACL()); request->Record.SetToken(NACLib::TUserToken("client@" BUILTIN_ACL_DOMAIN, {}).SerializeAsString()); request->Record.SetUser("client"); - request->Record.SetOperation(NKikimrPQ::EOperation::READ_OP); - - tc.Runtime->SendToPipe(tc.BalancerTabletId, tc.Edge, request.Release(), 0, GetPipeConfigWithRetries()); - result = tc.Runtime->GrabEdgeEvent<TEvPersQueue::TEvCheckACLResponse>(handle); - auto& rec2 = result->Record; + request->Record.SetOperation(NKikimrPQ::EOperation::READ_OP); + + tc.Runtime->SendToPipe(tc.BalancerTabletId, tc.Edge, request.Release(), 0, GetPipeConfigWithRetries()); + result = tc.Runtime->GrabEdgeEvent<TEvPersQueue::TEvCheckACLResponse>(handle); + auto& rec2 = result->Record; UNIT_ASSERT_C(rec2.GetAccess() == NKikimrPQ::EAccess::ALLOWED, rec2); - - state->ACL.AddAccess(NACLib::EAccessType::Allow, NACLib::UpdateRow, "client@" BUILTIN_ACL_DOMAIN); - - { - TDispatchOptions options; + + state->ACL.AddAccess(NACLib::EAccessType::Allow, NACLib::UpdateRow, "client@" BUILTIN_ACL_DOMAIN); + + { + TDispatchOptions options; options.FinalEvents.emplace_back(NSchemeShard::TEvSchemeShard::EvDescribeSchemeResult); - tc.Runtime->DispatchEvents(options); - } - - request.Reset(new TEvPersQueue::TEvCheckACL()); + tc.Runtime->DispatchEvents(options); + } + + request.Reset(new TEvPersQueue::TEvCheckACL()); request->Record.SetToken(NACLib::TUserToken("client@" BUILTIN_ACL_DOMAIN, {}).SerializeAsString()); request->Record.SetUser("client"); - request->Record.SetOperation(NKikimrPQ::EOperation::WRITE_OP); - - tc.Runtime->SendToPipe(tc.BalancerTabletId, tc.Edge, request.Release(), 0, GetPipeConfigWithRetries()); - result = tc.Runtime->GrabEdgeEvent<TEvPersQueue::TEvCheckACLResponse>(handle); - auto& rec3 = result->Record; - UNIT_ASSERT(rec3.GetAccess() == NKikimrPQ::EAccess::ALLOWED); - - request.Reset(new TEvPersQueue::TEvCheckACL()); + request->Record.SetOperation(NKikimrPQ::EOperation::WRITE_OP); + + tc.Runtime->SendToPipe(tc.BalancerTabletId, tc.Edge, request.Release(), 0, GetPipeConfigWithRetries()); + result = tc.Runtime->GrabEdgeEvent<TEvPersQueue::TEvCheckACLResponse>(handle); + auto& rec3 = result->Record; + UNIT_ASSERT(rec3.GetAccess() == NKikimrPQ::EAccess::ALLOWED); + + request.Reset(new TEvPersQueue::TEvCheckACL()); request->Record.SetToken(NACLib::TUserToken("client@" BUILTIN_ACL_DOMAIN, {}).SerializeAsString()); request->Record.SetUser("client2"); request->Record.SetOperation(NKikimrPQ::EOperation::WRITE_OP); @@ -387,47 +387,47 @@ Y_UNIT_TEST(TestCheckACL) { UNIT_ASSERT(rec9.GetAccess() == NKikimrPQ::EAccess::ALLOWED); request.Reset(new TEvPersQueue::TEvCheckACL()); - // No auth provided and auth for topic not required - request->Record.SetOperation(NKikimrPQ::EOperation::WRITE_OP); - - tc.Runtime->SendToPipe(tc.BalancerTabletId, tc.Edge, request.Release(), 0, GetPipeConfigWithRetries()); - result = tc.Runtime->GrabEdgeEvent<TEvPersQueue::TEvCheckACLResponse>(handle); - auto& rec5 = result->Record; - UNIT_ASSERT(rec5.GetAccess() == NKikimrPQ::EAccess::ALLOWED); - - request.Reset(new TEvPersQueue::TEvCheckACL()); - // No auth provided and auth for topic not required - request->Record.SetOperation(NKikimrPQ::EOperation::READ_OP); - - tc.Runtime->SendToPipe(tc.BalancerTabletId, tc.Edge, request.Release(), 0, GetPipeConfigWithRetries()); - result = tc.Runtime->GrabEdgeEvent<TEvPersQueue::TEvCheckACLResponse>(handle); - auto& rec6 = result->Record; - UNIT_ASSERT(rec6.GetAccess() == NKikimrPQ::EAccess::ALLOWED); - - request.Reset(new TEvPersQueue::TEvCheckACL()); - // No auth provided and auth for topic is required - request->Record.SetOperation(NKikimrPQ::EOperation::READ_OP); + // No auth provided and auth for topic not required + request->Record.SetOperation(NKikimrPQ::EOperation::WRITE_OP); + + tc.Runtime->SendToPipe(tc.BalancerTabletId, tc.Edge, request.Release(), 0, GetPipeConfigWithRetries()); + result = tc.Runtime->GrabEdgeEvent<TEvPersQueue::TEvCheckACLResponse>(handle); + auto& rec5 = result->Record; + UNIT_ASSERT(rec5.GetAccess() == NKikimrPQ::EAccess::ALLOWED); + + request.Reset(new TEvPersQueue::TEvCheckACL()); + // No auth provided and auth for topic not required + request->Record.SetOperation(NKikimrPQ::EOperation::READ_OP); + + tc.Runtime->SendToPipe(tc.BalancerTabletId, tc.Edge, request.Release(), 0, GetPipeConfigWithRetries()); + result = tc.Runtime->GrabEdgeEvent<TEvPersQueue::TEvCheckACLResponse>(handle); + auto& rec6 = result->Record; + UNIT_ASSERT(rec6.GetAccess() == NKikimrPQ::EAccess::ALLOWED); + + request.Reset(new TEvPersQueue::TEvCheckACL()); + // No auth provided and auth for topic is required + request->Record.SetOperation(NKikimrPQ::EOperation::READ_OP); request->Record.SetToken(""); - + BalancerPrepare("topic", {{1,{1, 2}}}, ssId, tc, true); - tc.Runtime->SendToPipe(tc.BalancerTabletId, tc.Edge, request.Release(), 0, GetPipeConfigWithRetries()); - result = tc.Runtime->GrabEdgeEvent<TEvPersQueue::TEvCheckACLResponse>(handle); - auto& rec7 = result->Record; - UNIT_ASSERT(rec7.GetAccess() == NKikimrPQ::EAccess::DENIED); - - request.Reset(new TEvPersQueue::TEvCheckACL()); - // No auth provided and auth for topic is required - request->Record.SetOperation(NKikimrPQ::EOperation::READ_OP); + tc.Runtime->SendToPipe(tc.BalancerTabletId, tc.Edge, request.Release(), 0, GetPipeConfigWithRetries()); + result = tc.Runtime->GrabEdgeEvent<TEvPersQueue::TEvCheckACLResponse>(handle); + auto& rec7 = result->Record; + UNIT_ASSERT(rec7.GetAccess() == NKikimrPQ::EAccess::DENIED); + + request.Reset(new TEvPersQueue::TEvCheckACL()); + // No auth provided and auth for topic is required + request->Record.SetOperation(NKikimrPQ::EOperation::READ_OP); request->Record.SetToken(""); - - tc.Runtime->SendToPipe(tc.BalancerTabletId, tc.Edge, request.Release(), 0, GetPipeConfigWithRetries()); - result = tc.Runtime->GrabEdgeEvent<TEvPersQueue::TEvCheckACLResponse>(handle); - auto& rec8 = result->Record; - UNIT_ASSERT(rec8.GetAccess() == NKikimrPQ::EAccess::DENIED); - }); -} - - + + tc.Runtime->SendToPipe(tc.BalancerTabletId, tc.Edge, request.Release(), 0, GetPipeConfigWithRetries()); + result = tc.Runtime->GrabEdgeEvent<TEvPersQueue::TEvCheckACLResponse>(handle); + auto& rec8 = result->Record; + UNIT_ASSERT(rec8.GetAccess() == NKikimrPQ::EAccess::DENIED); + }); +} + + void CheckLabeledCountersResponse(ui32 count, TTestContext& tc, TVector<TString> mustHave = {}) { IActor* actor = CreateClusterLabeledCountersAggregatorActor(tc.Edge, TTabletTypes::PERSQUEUE); @@ -980,28 +980,28 @@ Y_UNIT_TEST(TestAlreadyWritten) { Y_UNIT_TEST(TestAlreadyWrittenWithoutDeduplication) { - TTestContext tc; - RunTestWithReboots(tc.TabletIds, [&]() { - return tc.InitialEventsFilter.Prepare(); - }, [&](const TString& dispatchName, std::function<void(TTestActorRuntime&)> setup, bool& activeZone) { - TFinalizer finalizer(tc); - tc.Prepare(dispatchName, setup, activeZone); - activeZone = false; - tc.Runtime->SetScheduledLimit(200); - + TTestContext tc; + RunTestWithReboots(tc.TabletIds, [&]() { + return tc.InitialEventsFilter.Prepare(); + }, [&](const TString& dispatchName, std::function<void(TTestActorRuntime&)> setup, bool& activeZone) { + TFinalizer finalizer(tc); + tc.Prepare(dispatchName, setup, activeZone); + activeZone = false; + tc.Runtime->SetScheduledLimit(200); + PQTabletPrepare(20000000, 100 * 1024 * 1024, 0, {}, tc); //no important clients, lifetimeseconds=0 - delete all right now, except last datablob - TVector<std::pair<ui64, TString>> data; - activeZone = true; - - TString s{32, 'c'}; - ui32 pp = 4 + 8 + 1 + 9; - data.push_back({2, s.substr(pp)}); - CmdWrite(0, "sourceid0", data, tc, false, {}, false, "", -1, 0, false, false, true); - data[0].first = 1; - CmdWrite(0, "sourceid0", data, tc, false, {}, false, "", -1, 1, false, false, true); - CmdRead(0, 0, Max<i32>(), Max<i32>(), 2, false, tc, {0, 1}); - }); -} + TVector<std::pair<ui64, TString>> data; + activeZone = true; + + TString s{32, 'c'}; + ui32 pp = 4 + 8 + 1 + 9; + data.push_back({2, s.substr(pp)}); + CmdWrite(0, "sourceid0", data, tc, false, {}, false, "", -1, 0, false, false, true); + data[0].first = 1; + CmdWrite(0, "sourceid0", data, tc, false, {}, false, "", -1, 1, false, false, true); + CmdRead(0, 0, Max<i32>(), Max<i32>(), 2, false, tc, {0, 1}); + }); +} Y_UNIT_TEST(TestWritePQCompact) { @@ -1904,14 +1904,14 @@ Y_UNIT_TEST(TestPQCacheSizeManagement) { } Y_UNIT_TEST(TestOffsetEstimation) { - std::deque<NPQ::TDataKey> container = { + std::deque<NPQ::TDataKey> container = { {NPQ::TKey(NPQ::TKeyPrefix::EType::TypeNone, 0, 1, 0, 0, 0), 0, TInstant::Seconds(1), 10}, {NPQ::TKey(NPQ::TKeyPrefix::EType::TypeNone, 0, 2, 0, 0, 0), 0, TInstant::Seconds(1), 10}, {NPQ::TKey(NPQ::TKeyPrefix::EType::TypeNone, 0, 3, 0, 0, 0), 0, TInstant::Seconds(2), 10}, {NPQ::TKey(NPQ::TKeyPrefix::EType::TypeNone, 0, 4, 0, 0, 0), 0, TInstant::Seconds(2), 10}, {NPQ::TKey(NPQ::TKeyPrefix::EType::TypeNone, 0, 5, 0, 0, 0), 0, TInstant::Seconds(3), 10}, {NPQ::TKey(NPQ::TKeyPrefix::EType::TypeNone, 0, 6, 0, 0, 0), 0, TInstant::Seconds(3), 10}, - }; + }; UNIT_ASSERT_EQUAL(NPQ::GetOffsetEstimate({}, TInstant::MilliSeconds(0), 9999), 9999); UNIT_ASSERT_EQUAL(NPQ::GetOffsetEstimate(container, TInstant::MilliSeconds(0), 9999), 1); UNIT_ASSERT_EQUAL(NPQ::GetOffsetEstimate(container, TInstant::MilliSeconds(500), 9999), 1); @@ -1921,35 +1921,35 @@ Y_UNIT_TEST(TestOffsetEstimation) { UNIT_ASSERT_EQUAL(NPQ::GetOffsetEstimate(container, TInstant::MilliSeconds(2500), 9999), 5); UNIT_ASSERT_EQUAL(NPQ::GetOffsetEstimate(container, TInstant::MilliSeconds(3000), 9999), 5); UNIT_ASSERT_EQUAL(NPQ::GetOffsetEstimate(container, TInstant::MilliSeconds(3500), 9999), 9999); -} - +} + Y_UNIT_TEST(TestMaxTimeLagRewind) { - TTestContext tc; - - RunTestWithReboots(tc.TabletIds, [&]() { - return tc.InitialEventsFilter.Prepare(); - }, [&](const TString& dispatchName, std::function<void(TTestActorRuntime&)> setup, bool& activeZone) { - TFinalizer finalizer(tc); - tc.Prepare(dispatchName, setup, activeZone); - - tc.Runtime->SetScheduledLimit(200); - + TTestContext tc; + + RunTestWithReboots(tc.TabletIds, [&]() { + return tc.InitialEventsFilter.Prepare(); + }, [&](const TString& dispatchName, std::function<void(TTestActorRuntime&)> setup, bool& activeZone) { + TFinalizer finalizer(tc); + tc.Prepare(dispatchName, setup, activeZone); + + tc.Runtime->SetScheduledLimit(200); + PQTabletPrepare(20000000, 100 * 1024 * 1024, 0, {{"aaa", true}}, tc); - activeZone = false; - - - for (int i = 0; i < 5; i++) { - TVector<std::pair<ui64, TString>> data; - for (int j = 0; j < 7; j++) { - data.push_back({7 * i + j + 1, TString(1 * 1024 * 1024, 'a')}); - } - CmdWrite(0, "sourceid0", data, tc, false, {}, i == 0); - tc.Runtime->UpdateCurrentTime(tc.Runtime->GetCurrentTime() + TDuration::Minutes(1)); - } + activeZone = false; + + + for (int i = 0; i < 5; i++) { + TVector<std::pair<ui64, TString>> data; + for (int j = 0; j < 7; j++) { + data.push_back({7 * i + j + 1, TString(1 * 1024 * 1024, 'a')}); + } + CmdWrite(0, "sourceid0", data, tc, false, {}, i == 0); + tc.Runtime->UpdateCurrentTime(tc.Runtime->GetCurrentTime() + TDuration::Minutes(1)); + } auto ts = tc.Runtime->GetCurrentTime(); - CmdRead(0, 0, 1, Max<i32>(), 1, false, tc, {0}); - CmdRead(0, 0, 1, Max<i32>(), 1, false, tc, {21}, 3 * 60 * 1000); - CmdRead(0, 22, 1, Max<i32>(), 1, false, tc, {22}, 3 * 60 * 1000); + CmdRead(0, 0, 1, Max<i32>(), 1, false, tc, {0}); + CmdRead(0, 0, 1, Max<i32>(), 1, false, tc, {21}, 3 * 60 * 1000); + CmdRead(0, 22, 1, Max<i32>(), 1, false, tc, {22}, 3 * 60 * 1000); CmdRead(0, 4, 1, Max<i32>(), 1, false, tc, {34}, 1000); CmdRead(0, 0, 1, Max<i32>(), 1, false, tc, {21}, 0, ts.MilliSeconds() - 3 * 60 * 1000); @@ -1959,9 +1959,9 @@ Y_UNIT_TEST(TestMaxTimeLagRewind) { PQTabletPrepare(20000000, 100 * 1024 * 1024, 0, {{"aaa", true}}, tc, 2, 6*1024*1024, true, ts.MilliSeconds() - 1000); CmdRead(0, 0, 1, Max<i32>(), 1, false, tc, {34}); - }); -} - + }); +} + Y_UNIT_TEST(TestWriteTimeStampEstimate) { TTestContext tc; diff --git a/ydb/core/persqueue/read_balancer.cpp b/ydb/core/persqueue/read_balancer.cpp index 4eb9c7206dd..629ad8a1755 100644 --- a/ydb/core/persqueue/read_balancer.cpp +++ b/ydb/core/persqueue/read_balancer.cpp @@ -12,7 +12,7 @@ using namespace NTabletFlatExecutor; static constexpr TDuration ACL_SUCCESS_RETRY_TIMEOUT = TDuration::Seconds(30); static constexpr TDuration ACL_ERROR_RETRY_TIMEOUT = TDuration::Seconds(5); static constexpr TDuration ACL_EXPIRATION_TIMEOUT = TDuration::Minutes(5); - + bool TPersQueueReadBalancer::TTxPreInit::Execute(TTransactionContext& txc, const TActorContext& ctx) { Y_UNUSED(ctx); NIceDb::TNiceDb(txc.DB).Materialize<Schema>(); @@ -44,7 +44,7 @@ bool TPersQueueReadBalancer::TTxInit::Execute(TTransactionContext& txc, const TA Self->Path = dataRowset.GetValue<Schema::Data::Path>(); Self->Version = dataRowset.GetValue<Schema::Data::Version>(); Self->MaxPartsPerTablet = dataRowset.GetValueOrDefault<Schema::Data::MaxPartsPerTablet>(0); - Self->SchemeShardId = dataRowset.GetValueOrDefault<Schema::Data::SchemeShardId>(0); + Self->SchemeShardId = dataRowset.GetValueOrDefault<Schema::Data::SchemeShardId>(0); Self->NextPartitionId = dataRowset.GetValueOrDefault<Schema::Data::NextPartitionId>(0); TString config = dataRowset.GetValueOrDefault<Schema::Data::Config>(""); @@ -132,7 +132,7 @@ bool TPersQueueReadBalancer::TTxWrite::Execute(TTransactionContext& txc, const T NIceDb::TUpdate<Schema::Data::Path>(Self->Path), NIceDb::TUpdate<Schema::Data::Version>(Self->Version), NIceDb::TUpdate<Schema::Data::MaxPartsPerTablet>(Self->MaxPartsPerTablet), - NIceDb::TUpdate<Schema::Data::SchemeShardId>(Self->SchemeShardId), + NIceDb::TUpdate<Schema::Data::SchemeShardId>(Self->SchemeShardId), NIceDb::TUpdate<Schema::Data::NextPartitionId>(Self->NextPartitionId), NIceDb::TUpdate<Schema::Data::Config>(config)); for (auto& p : DeletedPartitions) { @@ -312,7 +312,7 @@ void TPersQueueReadBalancer::Handle(TEvPersQueue::TEvGetPartitionIdForWrite::TPt } -void TPersQueueReadBalancer::Handle(TEvPersQueue::TEvCheckACL::TPtr &ev, const TActorContext &ctx) { +void TPersQueueReadBalancer::Handle(TEvPersQueue::TEvCheckACL::TPtr &ev, const TActorContext &ctx) { if (!AppData(ctx)->PQConfig.GetCheckACL()) { RespondWithACL(ev, NKikimrPQ::EAccess::ALLOWED, "", ctx); @@ -324,7 +324,7 @@ void TPersQueueReadBalancer::Handle(TEvPersQueue::TEvCheckACL::TPtr &ev, const T return; } - auto& record = ev->Get()->Record; + auto& record = ev->Get()->Record; if (record.GetToken().empty()) { if (record.GetOperation() == NKikimrPQ::EOperation::WRITE_OP && TabletConfig.GetRequireAuthWrite() || @@ -332,40 +332,40 @@ void TPersQueueReadBalancer::Handle(TEvPersQueue::TEvCheckACL::TPtr &ev, const T RespondWithACL(ev, NKikimrPQ::EAccess::DENIED, TStringBuilder() << "topic " << Topic << " requires authentication", ctx); } else { RespondWithACL(ev, NKikimrPQ::EAccess::ALLOWED, "", ctx); - } + } return; - } + } NACLib::TUserToken token(record.GetToken()); CheckACL(ev, token, ctx); -} - - -void TPersQueueReadBalancer::RespondWithACL( - const TEvPersQueue::TEvCheckACL::TPtr &request, - const NKikimrPQ::EAccess &access, - const TString &error, - const TActorContext &ctx) { - THolder<TEvPersQueue::TEvCheckACLResponse> res{new TEvPersQueue::TEvCheckACLResponse}; - res->Record.SetTopic(Topic); +} + + +void TPersQueueReadBalancer::RespondWithACL( + const TEvPersQueue::TEvCheckACL::TPtr &request, + const NKikimrPQ::EAccess &access, + const TString &error, + const TActorContext &ctx) { + THolder<TEvPersQueue::TEvCheckACLResponse> res{new TEvPersQueue::TEvCheckACLResponse}; + res->Record.SetTopic(Topic); res->Record.SetPath(Path); - res->Record.SetAccess(access); - res->Record.SetError(error); - ctx.Send(request->Sender, res.Release()); -} - -void TPersQueueReadBalancer::CheckACL(const TEvPersQueue::TEvCheckACL::TPtr &request, const NACLib::TUserToken& token, const TActorContext &ctx) { - NACLib::EAccessRights rights = NACLib::EAccessRights::UpdateRow; + res->Record.SetAccess(access); + res->Record.SetError(error); + ctx.Send(request->Sender, res.Release()); +} + +void TPersQueueReadBalancer::CheckACL(const TEvPersQueue::TEvCheckACL::TPtr &request, const NACLib::TUserToken& token, const TActorContext &ctx) { + NACLib::EAccessRights rights = NACLib::EAccessRights::UpdateRow; const auto& record = request->Get()->Record; switch(record.GetOperation()) { - case NKikimrPQ::EOperation::READ_OP: - rights = NACLib::EAccessRights::SelectRow; - break; - case NKikimrPQ::EOperation::WRITE_OP: - rights = NACLib::EAccessRights::UpdateRow; - break; - }; - + case NKikimrPQ::EOperation::READ_OP: + rights = NACLib::EAccessRights::SelectRow; + break; + case NKikimrPQ::EOperation::WRITE_OP: + rights = NACLib::EAccessRights::UpdateRow; + break; + }; + TString user = record.HasUser() ? record.GetUser() : ""; if (record.GetOperation() == NKikimrPQ::EOperation::READ_OP) { @@ -377,11 +377,11 @@ void TPersQueueReadBalancer::CheckACL(const TEvPersQueue::TEvCheckACL::TPtr &req } if (ACL.CheckAccess(rights, token)) { RespondWithACL(request, NKikimrPQ::EAccess::ALLOWED, "", ctx); - } else { + } else { RespondWithACL(request, NKikimrPQ::EAccess::DENIED, TStringBuilder() << "access denied for consumer '" << NPersQueue::ConvertOldConsumerName(user) << "' : no " << (rights == NACLib::EAccessRights::SelectRow ? "ReadTopic" : "WriteTopic") << " permission" , ctx); - } -} - + } +} + void TPersQueueReadBalancer::Handle(TEvPersQueue::TEvWakeupClient::TPtr &ev, const TActorContext& ctx) { auto jt = ClientsInfo.find(ev->Get()->Client); if (jt == ClientsInfo.end()) @@ -467,7 +467,7 @@ void TPersQueueReadBalancer::Handle(TEvPersQueue::TEvUpdateBalancerConfig::TPtr Path = record.GetPath(); TxId = record.GetTxId(); TabletConfig = record.GetTabletConfig(); - SchemeShardId = record.GetSchemeShardId(); + SchemeShardId = record.GetSchemeShardId(); TotalGroups = record.HasTotalGroupCount() ? record.GetTotalGroupCount() : 0; ui32 prevNextPartitionId = NextPartitionId; NextPartitionId = record.HasNextPartitionId() ? record.GetNextPartitionId() : 0; @@ -588,7 +588,7 @@ void TPersQueueReadBalancer::Handle(TEvTabletPipe::TEvServerDisconnected::TPtr& void TPersQueueReadBalancer::Handle(TEvTabletPipe::TEvClientDestroyed::TPtr& ev, const TActorContext& ctx) { RestartPipe(ev->Get()->TabletId, ctx); - RequestTabletIfNeeded(ev->Get()->TabletId, ctx); + RequestTabletIfNeeded(ev->Get()->TabletId, ctx); } @@ -596,7 +596,7 @@ void TPersQueueReadBalancer::Handle(TEvTabletPipe::TEvClientConnected::TPtr& ev, { if (ev->Get()->Status != NKikimrProto::OK) { RestartPipe(ev->Get()->TabletId, ctx); - RequestTabletIfNeeded(ev->Get()->TabletId, ctx); + RequestTabletIfNeeded(ev->Get()->TabletId, ctx); } } @@ -610,9 +610,9 @@ void TPersQueueReadBalancer::RestartPipe(const ui64 tabletId, const TActorContex } -void TPersQueueReadBalancer::RequestTabletIfNeeded(const ui64 tabletId, const TActorContext& ctx) +void TPersQueueReadBalancer::RequestTabletIfNeeded(const ui64 tabletId, const TActorContext& ctx) { - if ((tabletId == SchemeShardId && !WaitingForACL) || + if ((tabletId == SchemeShardId && !WaitingForACL) || (tabletId != SchemeShardId && !WaitingForStat.contains(tabletId))) return; @@ -625,11 +625,11 @@ void TPersQueueReadBalancer::RequestTabletIfNeeded(const ui64 tabletId, const TA } else { pipeClient = it->second; } - if (tabletId == SchemeShardId) { + if (tabletId == SchemeShardId) { NTabletPipe::SendData(ctx, pipeClient, new NSchemeShard::TEvSchemeShard::TEvDescribeScheme(tabletId, PathId)); - } else { - NTabletPipe::SendData(ctx, pipeClient, new TEvPersQueue::TEvStatus()); - } + } else { + NTabletPipe::SendData(ctx, pipeClient, new TEvPersQueue::TEvStatus()); + } } @@ -671,24 +671,24 @@ void TPersQueueReadBalancer::AnswerWaitingRequests(const TActorContext& ctx) { } void TPersQueueReadBalancer::Handle(NSchemeShard::TEvSchemeShard::TEvDescribeSchemeResult::TPtr& ev, const TActorContext& ctx) { - Y_UNUSED(ctx); - if (!WaitingForACL) //ignore if already processed - return; - WaitingForACL = false; + Y_UNUSED(ctx); + if (!WaitingForACL) //ignore if already processed + return; + WaitingForACL = false; const auto& record = ev->Get()->GetRecord(); if (record.GetStatus() == NKikimrScheme::EStatus::StatusSuccess) { - ACL.Clear(); + ACL.Clear(); Y_PROTOBUF_SUPPRESS_NODISCARD ACL.MutableACL()->ParseFromString(record.GetPathDescription().GetSelf().GetEffectiveACL()); LastACLUpdate = ctx.Now(); - ctx.Schedule(TDuration::Seconds(AppData(ctx)->PQConfig.GetBalancerMetadataRetryTimeoutSec()), new TEvPersQueue::TEvUpdateACL()); + ctx.Schedule(TDuration::Seconds(AppData(ctx)->PQConfig.GetBalancerMetadataRetryTimeoutSec()), new TEvPersQueue::TEvUpdateACL()); AnswerWaitingRequests(ctx); - } else { + } else { LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE_READ_BALANCER, GetPrefix() << "couldn't receive ACL due to " << record.GetStatus()); ctx.Schedule(ACL_ERROR_RETRY_TIMEOUT, new TEvPersQueue::TEvUpdateACL()); - } -} - + } +} + void TPersQueueReadBalancer::CheckStat(const TActorContext& ctx) { Y_UNUSED(ctx); //TODO: Deside about changing number of partitions and send request to SchemeShard @@ -707,21 +707,21 @@ void TPersQueueReadBalancer::GetStat(const TActorContext& ctx) { bool res = WaitingForStat.insert(tabletId).second; if (!res) //already asked stat continue; - RequestTabletIfNeeded(tabletId, ctx); + RequestTabletIfNeeded(tabletId, ctx); } } -void TPersQueueReadBalancer::GetACL(const TActorContext& ctx) { - if (WaitingForACL) // if there is request infly - return; - if (SchemeShardId == 0) { +void TPersQueueReadBalancer::GetACL(const TActorContext& ctx) { + if (WaitingForACL) // if there is request infly + return; + if (SchemeShardId == 0) { ctx.Schedule(ACL_SUCCESS_RETRY_TIMEOUT, new TEvPersQueue::TEvUpdateACL()); - } else { - WaitingForACL = true; - RequestTabletIfNeeded(SchemeShardId, ctx); - } -} - + } else { + WaitingForACL = true; + RequestTabletIfNeeded(SchemeShardId, ctx); + } +} + void TPersQueueReadBalancer::Handle(TEvTabletPipe::TEvServerConnected::TPtr& ev, const TActorContext& ctx) { const TActorId& sender = ev->Get()->ClientId; diff --git a/ydb/core/persqueue/read_balancer.h b/ydb/core/persqueue/read_balancer.h index 6a07edcb784..2e4de2bac53 100644 --- a/ydb/core/persqueue/read_balancer.h +++ b/ydb/core/persqueue/read_balancer.h @@ -56,7 +56,7 @@ class TPersQueueReadBalancer : public TActor<TPersQueueReadBalancer>, public TTa struct Version : Column<36, NScheme::NTypeIds::Uint32> {}; struct Config : Column<40, NScheme::NTypeIds::Utf8> {}; struct MaxPartsPerTablet : Column<41, NScheme::NTypeIds::Uint32> {}; - struct SchemeShardId : Column<42, NScheme::NTypeIds::Uint64> {}; + struct SchemeShardId : Column<42, NScheme::NTypeIds::Uint64> {}; struct NextPartitionId : Column<43, NScheme::NTypeIds::Uint64> {}; using TKey = TableKey<Key>; @@ -180,9 +180,9 @@ class TPersQueueReadBalancer : public TActor<TPersQueueReadBalancer>, public TTa ctx.Schedule(TDuration::Seconds(30), new TEvents::TEvWakeup()); //TODO: remove it } - void HandleUpdateACL(TEvPersQueue::TEvUpdateACL::TPtr&, const TActorContext &ctx) { - GetACL(ctx); - } + void HandleUpdateACL(TEvPersQueue::TEvUpdateACL::TPtr&, const TActorContext &ctx) { + GetACL(ctx); + } void Die(const TActorContext& ctx) override { for (auto& pipe : TabletPipes) { @@ -240,7 +240,7 @@ class TPersQueueReadBalancer : public TActor<TPersQueueReadBalancer>, public TTa RegisterEvents.clear(); ctx.Schedule(TDuration::Seconds(30), new TEvents::TEvWakeup()); //TODO: remove it - ctx.Send(ctx.SelfID, new TEvPersQueue::TEvUpdateACL()); + ctx.Send(ctx.SelfID, new TEvPersQueue::TEvUpdateACL()); } bool OnRenderAppHtmlPage(NMon::TEvRemoteHttpInfo::TPtr ev, const TActorContext& ctx) override; @@ -256,7 +256,7 @@ class TPersQueueReadBalancer : public TActor<TPersQueueReadBalancer>, public TTa void Handle(TEvPersQueue::TEvRegisterReadSession::TPtr &ev, const TActorContext& ctx); void Handle(TEvPersQueue::TEvGetReadSessionsInfo::TPtr &ev, const TActorContext& ctx); - void Handle(TEvPersQueue::TEvCheckACL::TPtr&, const TActorContext&); + void Handle(TEvPersQueue::TEvCheckACL::TPtr&, const TActorContext&); void Handle(TEvPersQueue::TEvGetPartitionIdForWrite::TPtr&, const TActorContext&); void Handle(TEvTabletPipe::TEvServerConnected::TPtr& ev, const TActorContext&); @@ -267,17 +267,17 @@ class TPersQueueReadBalancer : public TActor<TPersQueueReadBalancer>, public TTa TStringBuilder GetPrefix() const; - void RequestTabletIfNeeded(const ui64 tabletId, const TActorContext&); + void RequestTabletIfNeeded(const ui64 tabletId, const TActorContext&); void RestartPipe(const ui64 tabletId, const TActorContext&); void CheckStat(const TActorContext&); - void RespondWithACL( - const TEvPersQueue::TEvCheckACL::TPtr &request, - const NKikimrPQ::EAccess &access, - const TString &error, - const TActorContext &ctx); - void CheckACL(const TEvPersQueue::TEvCheckACL::TPtr &request, const NACLib::TUserToken& token, const TActorContext &ctx); + void RespondWithACL( + const TEvPersQueue::TEvCheckACL::TPtr &request, + const NKikimrPQ::EAccess &access, + const TString &error, + const TActorContext &ctx); + void CheckACL(const TEvPersQueue::TEvCheckACL::TPtr &request, const NACLib::TUserToken& token, const TActorContext &ctx); void GetStat(const TActorContext&); - void GetACL(const TActorContext&); + void GetACL(const TActorContext&); void AnswerWaitingRequests(const TActorContext& ctx); void Handle(TEvPersQueue::TEvPartitionReleased::TPtr& ev, const TActorContext& ctx); @@ -298,10 +298,10 @@ class TPersQueueReadBalancer : public TActor<TPersQueueReadBalancer>, public TTa ui32 Generation; int Version; ui32 MaxPartsPerTablet; - ui64 SchemeShardId; + ui64 SchemeShardId; NKikimrPQ::TPQTabletConfig TabletConfig; - NACLib::TSecurityObject ACL; - TInstant LastACLUpdate; + NACLib::TSecurityObject ACL; + TInstant LastACLUpdate; THashSet<TString> Consumers; @@ -419,7 +419,7 @@ class TPersQueueReadBalancer : public TActor<TPersQueueReadBalancer>, public TTa THashMap<ui64, TActorId> TabletPipes; THashSet<ui64> WaitingForStat; - bool WaitingForACL; + bool WaitingForACL; ui64 TotalAvgSpeedSec; ui64 MaxAvgSpeedSec; ui64 TotalAvgSpeedMin; @@ -444,8 +444,8 @@ public: , Generation(0) , Version(-1) , MaxPartsPerTablet(0) - , SchemeShardId(0) - , LastACLUpdate(TInstant::Zero()) + , SchemeShardId(0) + , LastACLUpdate(TInstant::Zero()) , TxId(0) , NumActiveParts(0) , MaxIdx(0) @@ -455,7 +455,7 @@ public: , TotalGroups(0) , NoGroupsInBase(true) , ResourceMetrics(nullptr) - , WaitingForACL(false) + , WaitingForACL(false) , TotalAvgSpeedSec(0) , MaxAvgSpeedSec(0) , TotalAvgSpeedMin(0) @@ -492,8 +492,8 @@ public: switch (ev->GetTypeRewrite()) { HFunc(TEvents::TEvPoisonPill, Handle); HFunc(TEvents::TEvWakeup, HandleWakeup); - HFunc(TEvPersQueue::TEvUpdateACL, HandleUpdateACL); - HFunc(TEvPersQueue::TEvCheckACL, Handle); + HFunc(TEvPersQueue::TEvUpdateACL, HandleUpdateACL); + HFunc(TEvPersQueue::TEvCheckACL, Handle); HFunc(TEvPersQueue::TEvGetPartitionIdForWrite, Handle); HFunc(TEvPersQueue::TEvUpdateBalancerConfig, Handle); HFunc(TEvPersQueue::TEvWakeupClient, Handle); diff --git a/ydb/core/persqueue/user_info.h b/ydb/core/persqueue/user_info.h index 258e947c467..35a628a9e5e 100644 --- a/ydb/core/persqueue/user_info.h +++ b/ydb/core/persqueue/user_info.h @@ -163,7 +163,7 @@ struct TUserInfo { i64 ReadOffset = -1; TInstant ReadWriteTimestamp; TInstant ReadCreateTimestamp; - ui64 ReadOffsetRewindSum = 0; + ui64 ReadOffsetRewindSum = 0; bool ReadScheduled = false; @@ -304,7 +304,7 @@ struct TUserInfo { , ReadOffset(-1) , ReadWriteTimestamp(TAppData::TimeProvider->Now()) , ReadCreateTimestamp(TAppData::TimeProvider->Now()) - , ReadOffsetRewindSum(readOffsetRewindSum) + , ReadOffsetRewindSum(readOffsetRewindSum) , ReadScheduled(false) , Important(important) , ReadFromTimestamp(readFromTimestamp) diff --git a/ydb/core/protos/auth.proto b/ydb/core/protos/auth.proto index 1f252b8f711..da88d9b692e 100644 --- a/ydb/core/protos/auth.proto +++ b/ydb/core/protos/auth.proto @@ -1,9 +1,9 @@ package NKikimrProto; -option java_package = "ru.yandex.kikimr.proto"; - -message TAuthConfig { - optional TUserRegistryConfig UserRegistryConfig = 1; - optional TTVMConfig TVMConfig = 2; +option java_package = "ru.yandex.kikimr.proto"; + +message TAuthConfig { + optional TUserRegistryConfig UserRegistryConfig = 1; + optional TTVMConfig TVMConfig = 2; optional string StaffApiUserToken = 3; optional string BlackBoxEndpoint = 4 [default = "blackbox.yandex-team.ru"]; optional string AccessServiceEndpoint = 5 [default = "as.private-api.cloud.yandex.net:4286"]; @@ -42,17 +42,17 @@ message TAuthConfig { optional string PathToRootCA = 60 [default = "/etc/ssl/certs/YandexInternalRootCA.pem"]; // root CA certificate PEM/x509 optional uint32 AccessServiceGrpcKeepAliveTimeMs = 70 [default = 10000]; // CLOUD-27573 optional uint32 AccessServiceGrpcKeepAliveTimeoutMs = 71 [default = 1000]; // CLOUD-27573 -} - -message TUserRegistryConfig { - optional string Query = 1; // Must take uid (ui64) as parameter and return user (utf8) -} - -message TTVMConfig { - optional bool Enabled = 1; - optional uint32 ServiceTVMId = 2; - optional string PublicKeys = 3; - optional bool UpdatePublicKeys = 4 [default = true]; - optional uint64 UpdatePublicKeysSuccessTimeout = 5 [default = 82800]; - optional uint64 UpdatePublicKeysFailureTimeout = 6 [default = 10]; -} +} + +message TUserRegistryConfig { + optional string Query = 1; // Must take uid (ui64) as parameter and return user (utf8) +} + +message TTVMConfig { + optional bool Enabled = 1; + optional uint32 ServiceTVMId = 2; + optional string PublicKeys = 3; + optional bool UpdatePublicKeys = 4 [default = true]; + optional uint64 UpdatePublicKeysSuccessTimeout = 5 [default = 82800]; + optional uint64 UpdatePublicKeysFailureTimeout = 6 [default = 10]; +} diff --git a/ydb/core/protos/config.proto b/ydb/core/protos/config.proto index d64169d4fc0..177350bd296 100644 --- a/ydb/core/protos/config.proto +++ b/ydb/core/protos/config.proto @@ -1357,7 +1357,7 @@ message TAppConfig { optional NKikimrCms.TCmsConfig CmsConfig = 25; optional TFeatureFlags FeatureFlags = 26; optional TSqsConfig SqsConfig = 27; - optional NKikimrPQ.TPQConfig PQConfig = 28; + optional NKikimrPQ.TPQConfig PQConfig = 28; optional NKikimrTenantPool.TTenantPoolConfig TenantPoolConfig = 29; optional NKikimrProto.TAuthConfig AuthConfig = 30; optional NKikimrTenantSlotBroker.TConfig TenantSlotBrokerConfig = 32; diff --git a/ydb/core/protos/msgbus_pq.proto b/ydb/core/protos/msgbus_pq.proto index 849b292383a..bfa2e43b944 100644 --- a/ydb/core/protos/msgbus_pq.proto +++ b/ydb/core/protos/msgbus_pq.proto @@ -15,7 +15,7 @@ message TPersQueuePartitionRequest { optional int32 Count = 4; //optional optional int32 Bytes = 5; //optional, max value = 25Mb optional int32 TimeoutMs = 6; //ms, default = 1000 - optional int32 MaxTimeLagMs = 8; // optional, default = infinity, why we use int instead of uint? + optional int32 MaxTimeLagMs = 8; // optional, default = infinity, why we use int instead of uint? optional uint64 ReadTimestampMs = 9; //optional, default = 0 optional bool MirrorerRequest = 10 [default = false]; @@ -60,7 +60,7 @@ message TPersQueuePartitionRequest { optional int32 TotalParts = 6; //fill it for first part of multi-part message optional int32 TotalSize = 7; // fill it for first part of multi-part message optional int64 CreateTimeMS = 8; //mandatory - optional bool DisableDeduplication = 9 [ default = false ]; + optional bool DisableDeduplication = 9 [ default = false ]; optional int64 WriteTimeMS = 10; //for mirroring only optional int32 UncompressedSize = 12; //fill it for all parts @@ -203,7 +203,7 @@ message TPersQueueFetchRequest { optional int32 Partition = 2; // must be set optional int64 Offset = 3; // must be set optional int32 MaxBytes = 4; // must be set - optional uint64 ReadTimestampMs = 5; //optional, default = 0 + optional uint64 ReadTimestampMs = 5; //optional, default = 0 } repeated TPartitionInfo Partition = 1; optional int32 TotalMaxBytes = 2; //must be set @@ -221,7 +221,7 @@ message TPersQueueRequest { //only one from data, meta or fetch request must be set. optional string SecurityToken = 5; - optional string Ticket = 6; //if set, check for acl + optional string Ticket = 6; //if set, check for acl optional string RequestId = 100; //for logging } diff --git a/ydb/core/protos/pqconfig.proto b/ydb/core/protos/pqconfig.proto index 7c859274496..47ddb259e52 100644 --- a/ydb/core/protos/pqconfig.proto +++ b/ydb/core/protos/pqconfig.proto @@ -19,13 +19,13 @@ message TPartitionMeta { optional uint64 EndOffset = 2; } -message TPQConfig { - optional uint32 ACLRetryTimeoutSec = 1 [default = 300]; +message TPQConfig { + optional uint32 ACLRetryTimeoutSec = 1 [default = 300]; optional uint32 BalancerMetadataRetryTimeoutSec = 2 [default = 240]; optional uint32 MaxBlobsPerLevel = 3 [default = 64]; // will produce 8mb blobs at last level //32 => 1mb blobs at last level optional uint32 MaxBlobSize = 4 [default = 8388608]; //8mb - optional uint32 ClustersUpdateTimeoutSec = 5 [default = 30]; + optional uint32 ClustersUpdateTimeoutSec = 5 [default = 30]; optional bool Enabled = 6 [default = false]; // Enable PQ proxies @@ -36,7 +36,7 @@ message TPQConfig { optional bool CheckACL = 9 [default = false]; optional uint32 SourceIdCleanupPeriodSec = 10 [default = 60]; // 24 hours // TODO: What is '24 hours'? Default is 60 seconds. - optional uint32 SourceIdMaxLifetimeSec = 11 [default = 1382400]; // 16 days + optional uint32 SourceIdMaxLifetimeSec = 11 [default = 1382400]; // 16 days optional uint32 SourceIdTotalShardsCount = 12 [default = 131072]; optional NKikimrClient.TKeyValueRequest.ETactic Tactic = 13 [default = MAX_THROUGHPUT]; @@ -142,8 +142,8 @@ message TPQConfig { optional uint32 MetaCacheRefreshIntervalMilliSeconds = 41 [default = 10000]; optional bool MetaCacheSkipVersionCheck = 42 [default = false]; -} - +} + message TChannelProfile { optional string PoolKind = 1; optional uint64 Size = 2; // size in bytes @@ -231,12 +231,12 @@ message TPQTabletConfig { optional string TopicName = 4; // also filled by schemeshard optional uint32 Version = 5; //also filled by schemeshard optional bool LocalDC = 6 [default = false]; - optional bool RequireAuthWrite = 7 [default = false]; - optional bool RequireAuthRead = 8 [default = false]; - optional string Producer = 9; - optional string Ident = 10; - optional string Topic = 11; - optional string DC = 12; + optional bool RequireAuthWrite = 7 [default = false]; + optional bool RequireAuthRead = 8 [default = false]; + optional string Producer = 9; + optional string Ident = 10; + optional string Topic = 11; + optional string DC = 12; // ReadRules, ReadTopicTimestampMs, ReadRuleVersions, ConsumerFormatVersions and ConsumersCodecs form a consumer data array stored by columns repeated string ReadRules = 13; @@ -316,7 +316,7 @@ message TUpdateBalancerConfig { //for schemeshard use only optional TPQTabletConfig TabletConfig = 7; optional uint32 PartitionPerTablet = 8; - optional uint64 SchemeShardId = 9; + optional uint64 SchemeShardId = 9; message TPartition { optional uint32 Partition = 1; optional uint64 TabletId = 2; @@ -342,7 +342,7 @@ message TDescribeResponse { optional uint32 Version = 2; optional TPQTabletConfig Config = 3; optional uint32 PartitionPerTablet = 4; - optional uint64 SchemeShardId = 6; + optional uint64 SchemeShardId = 6; optional uint64 BalancerTabletId = 7; optional bytes SecurityObject = 8; //NACLibProto.TSecurityObject @@ -355,32 +355,32 @@ message TDescribeResponse { repeated TPartition Partitions = 5; } -message TCheckACL { - optional EOperation Operation = 2; +message TCheckACL { + optional EOperation Operation = 2; optional NPersQueueCommon.Credentials Auth = 4; //leaved for compatibility optional string User = 5; optional bytes Token = 6; -} - -message TCheckACLResponse { - optional EAccess Access = 1; - optional string Topic = 2; +} + +message TCheckACLResponse { + optional EAccess Access = 1; + optional string Topic = 2; optional string Path = 3; - optional string Error = 4; -} - -enum EOperation { - READ_OP = 1; - WRITE_OP = 2; -} - -enum EAccess { - ALLOWED = 1; - DENIED = 2; - UNKNOWN = 3; -} - + optional string Error = 4; +} + +enum EOperation { + READ_OP = 1; + WRITE_OP = 2; +} + +enum EAccess { + ALLOWED = 1; + DENIED = 2; + UNKNOWN = 3; +} + message TGetPartitionIdForWrite { } @@ -655,15 +655,15 @@ message TBatchHeader { optional bool HasKinesis = 8; } -message TUserInfo { - optional uint64 Offset = 1; - optional uint32 Generation = 2; - optional uint32 Step = 3; - optional string Session = 4; - optional uint64 OffsetRewindSum = 5; +message TUserInfo { + optional uint64 Offset = 1; + optional uint32 Generation = 2; + optional uint32 Step = 3; + optional string Session = 4; + optional uint64 OffsetRewindSum = 5; optional uint64 ReadRuleGeneration = 6; -} - +} + message TPartitionClientInfo { repeated int32 Partitions = 1; } diff --git a/ydb/core/protos/ya.make b/ydb/core/protos/ya.make index 70bb65514cd..58387360879 100644 --- a/ydb/core/protos/ya.make +++ b/ydb/core/protos/ya.make @@ -1,5 +1,5 @@ PROTO_LIBRARY() - + GRPC() OWNER( @@ -119,7 +119,7 @@ SRCS( tx_sequenceshard.proto pdiskfit.proto pqconfig.proto - auth.proto + auth.proto key.proto grpc.proto grpc_pq_old.proto diff --git a/ydb/core/security/ticket_parser.cpp b/ydb/core/security/ticket_parser.cpp index c7d024d03dc..89e0ca3ddfe 100644 --- a/ydb/core/security/ticket_parser.cpp +++ b/ydb/core/security/ticket_parser.cpp @@ -183,17 +183,17 @@ class TTicketParser : public TActorBootstrapped<TTicketParser> { .AuthType = record.GetAuthType() }), ctx); CounterTicketsBuiltin->Inc(); - return; - } + return; + } if(record.Ticket.EndsWith("@" BUILTIN_ERROR_DOMAIN)) { record.TokenType = ETokenType::Builtin; SetError(key, record, {"Builtin error simulation"}, ctx); CounterTicketsBuiltin->Inc(); - return; - } - } - + return; + } + } + if (UseLoginProvider && (record.TokenType == ETokenType::Unknown || record.TokenType == ETokenType::Login)) { TString database = Config.GetDomainLoginOnly() ? DomainName : record.Database; auto itLoginProvider = LoginProviders.find(database); diff --git a/ydb/core/security/ticket_parser_ut.cpp b/ydb/core/security/ticket_parser_ut.cpp index 2b624efefe3..5ad6523a4ed 100644 --- a/ydb/core/security/ticket_parser_ut.cpp +++ b/ydb/core/security/ticket_parser_ut.cpp @@ -4,13 +4,13 @@ #include <library/cpp/testing/unittest/registar.h> #include <library/cpp/testing/unittest/tests_data.h> #include <ydb/public/lib/deprecated/kicli/kicli.h> - -#include "ticket_parser.h" - -namespace NKikimr { - + +#include "ticket_parser.h" + +namespace NKikimr { + Y_UNIT_TEST_SUITE(TTicketParserTest) { - + Y_UNIT_TEST(LoginGood) { using namespace Tests; TPortManager tp; @@ -133,5 +133,5 @@ Y_UNIT_TEST_SUITE(TTicketParserTest) { UNIT_ASSERT(!result->Error.empty()); UNIT_ASSERT_EQUAL(result->Error.Message, "Token is not in correct format"); } -} -} +} +} diff --git a/ydb/core/security/ut/ya.make b/ydb/core/security/ut/ya.make index 24ae1eaca39..bcfd69c1a4b 100644 --- a/ydb/core/security/ut/ya.make +++ b/ydb/core/security/ut/ya.make @@ -1,24 +1,24 @@ UNITTEST_FOR(ydb/core/security) - + OWNER( g:kikimr g:logbroker ) -FORK_SUBTESTS() - +FORK_SUBTESTS() + TIMEOUT(600) SIZE(MEDIUM) -PEERDIR( +PEERDIR( ydb/core/testlib -) - +) + YQL_LAST_ABI_VERSION() -SRCS( - ticket_parser_ut.cpp -) - -END() +SRCS( + ticket_parser_ut.cpp +) + +END() diff --git a/ydb/core/testlib/test_client.cpp b/ydb/core/testlib/test_client.cpp index d4907c26f1e..ec0787b33e3 100644 --- a/ydb/core/testlib/test_client.cpp +++ b/ydb/core/testlib/test_client.cpp @@ -588,7 +588,7 @@ namespace Tests { { IActor* ticketParser = Settings->CreateTicketParser(Settings->AuthConfig); TActorId ticketParserId = Runtime->Register(ticketParser, nodeIdx); - Runtime->RegisterService(MakeTicketParserID(), ticketParserId, nodeIdx); + Runtime->RegisterService(MakeTicketParserID(), ticketParserId, nodeIdx); } { @@ -610,7 +610,7 @@ namespace Tests { nullptr); TActorId kqpProxyServiceId = Runtime->Register(kqpProxyService, nodeIdx); Runtime->RegisterService(NKqp::MakeKqpProxyID(Runtime->GetNodeId(nodeIdx)), kqpProxyServiceId, nodeIdx); - } + } { IActor* txProxy = CreateTxProxy(Runtime->GetTxAllocatorTabletIds()); diff --git a/ydb/core/testlib/test_client.h b/ydb/core/testlib/test_client.h index 2064752ab45..f0890cb60f9 100644 --- a/ydb/core/testlib/test_client.h +++ b/ydb/core/testlib/test_client.h @@ -90,7 +90,7 @@ namespace Tests { ui16 Port; ui16 GrpcPort = 0; NKikimrProto::TAuthConfig AuthConfig; - NKikimrPQ::TPQConfig PQConfig; + NKikimrPQ::TPQConfig PQConfig; NKikimrPQ::TPQClusterDiscoveryConfig PQClusterDiscoveryConfig; NKikimrNetClassifier::TNetClassifierConfig NetClassifierConfig; ui32 Domain = TestDomain; @@ -181,8 +181,8 @@ namespace Tests { explicit TServerSettings(ui16 port, const NKikimrProto::TAuthConfig authConfig = {}, const NKikimrPQ::TPQConfig pqConfig = {}) : Port(port) - , AuthConfig(authConfig) - , PQConfig(pqConfig) + , AuthConfig(authConfig) + , PQConfig(pqConfig) { AddStoragePool("test", "/" + DomainName + ":test"); } diff --git a/ydb/core/testlib/test_pq_client.h b/ydb/core/testlib/test_pq_client.h index f0599232508..225f86d75a0 100644 --- a/ydb/core/testlib/test_pq_client.h +++ b/ydb/core/testlib/test_pq_client.h @@ -566,7 +566,7 @@ public: TFsPath fsPath(path); CreateTable(fsPath.Dirname(), "Name: \"" + fsPath.Basename() + "\"" - "Columns { Name: \"Hash\" Type: \"Uint32\"}" + "Columns { Name: \"Hash\" Type: \"Uint32\"}" "Columns { Name: \"SourceId\" Type: \"Utf8\"}" "Columns { Name: \"Topic\" Type: \"Utf8\"}" "Columns { Name: \"Partition\" Type: \"Uint32\"}" @@ -577,13 +577,13 @@ public: } void InsertSourceId(ui32 hash, TString sourceId, ui64 accessTime, const TString& path = "/Root/PQ/SourceIdMeta2") { - TString query = - "DECLARE $Hash AS Uint32; " - "DECLARE $SourceId AS Utf8; " - "DECLARE $AccessTime AS Uint64; " + TString query = + "DECLARE $Hash AS Uint32; " + "DECLARE $SourceId AS Utf8; " + "DECLARE $AccessTime AS Uint64; " "UPSERT INTO [" + path + "] (Hash, SourceId, Topic, Partition, CreateTime, AccessTime) " "VALUES($Hash, $SourceId, \"1\", 0, 0, $AccessTime); "; - + NYdb::TParamsBuilder builder; auto params = builder .AddParam("$Hash").Uint32(hash).Build() @@ -592,25 +592,25 @@ public: .Build(); RunYqlDataQueryWithParams(query, params); - } - + } + THashMap<TString, TInstant> ListSourceIds(const TString& path = "/Root/PQ/SourceIdMeta2") { auto result = RunYqlDataQuery("SELECT SourceId, AccessTime FROM [" + path + "];"); NYdb::TResultSetParser parser(*result); - THashMap<TString, TInstant> sourceIds; + THashMap<TString, TInstant> sourceIds; while(parser.TryNextRow()) { TString sourceId = *parser.ColumnParser("SourceId").GetOptionalUtf8(); TInstant accessTime = TInstant::MilliSeconds(*parser.ColumnParser("AccessTime").GetOptionalUint64()); - sourceIds[sourceId] = accessTime; - } - return sourceIds; - } - + sourceIds[sourceId] = accessTime; + } + return sourceIds; + } + void InitDCs(THashMap<TString, TPQTestClusterInfo> clusters = DEFAULT_CLUSTERS_LIST, const TString& localCluster = TString()) { MkDir("/Root/PQ", "Config"); - MkDir("/Root/PQ/Config", "V2"); + MkDir("/Root/PQ/Config", "V2"); RunYqlSchemeQuery(R"___( - CREATE TABLE [/Root/PQ/Config/V2/Cluster] ( + CREATE TABLE [/Root/PQ/Config/V2/Cluster] ( name Utf8, balancer Utf8, local Bool, @@ -1263,7 +1263,7 @@ public: UNIT_ASSERT(tt.GetErrorCode() == (ui32)NPersQueue::NErrorCode::OK); } } - return response->Record; + return response->Record; } diff --git a/ydb/public/api/protos/draft/persqueue_error_codes.proto b/ydb/public/api/protos/draft/persqueue_error_codes.proto index 8b2a0986314..5b1d8a2794d 100644 --- a/ydb/public/api/protos/draft/persqueue_error_codes.proto +++ b/ydb/public/api/protos/draft/persqueue_error_codes.proto @@ -13,7 +13,7 @@ enum EErrorCode { WRITE_ERROR_PARTITION_IS_FULL = 5; WRITE_ERROR_DISK_IS_FULL = 15; - WRITE_ERROR_BAD_OFFSET = 19; + WRITE_ERROR_BAD_OFFSET = 19; CREATE_SESSION_ALREADY_LOCKED = 6; DELETE_SESSION_NO_SESSION = 7; @@ -32,9 +32,9 @@ enum EErrorCode { UNKNOWN_TOPIC = 17; - ACCESS_DENIED = 18; - CLUSTER_DISABLED = 20; - + ACCESS_DENIED = 18; + CLUSTER_DISABLED = 20; + WRONG_PARTITION_NUMBER = 21; CREATE_TIMEOUT = 22; // TODO: move to pqlib codes diff --git a/ydb/services/persqueue_v1/grpc_pq_actor.h b/ydb/services/persqueue_v1/grpc_pq_actor.h index 65c8aca1db2..07381627a6a 100644 --- a/ydb/services/persqueue_v1/grpc_pq_actor.h +++ b/ydb/services/persqueue_v1/grpc_pq_actor.h @@ -424,7 +424,7 @@ struct TEvPQProxy { struct TEvDieCommand : public NActors::TEventLocal<TEvDieCommand, EvDieCommand> { TEvDieCommand(const TString& reason, const PersQueue::ErrorCode::ErrorCode errorCode) : Reason(reason) - , ErrorCode(errorCode) + , ErrorCode(errorCode) { } TString Reason; @@ -648,20 +648,20 @@ private: TInstant LastACLCheckTimestamp; TInstant LogSessionDeadline; - ui64 BalancerTabletId; + ui64 BalancerTabletId; TActorId PipeToBalancer; - + // PQ tablet configuration that we get at the time of session initialization NKikimrPQ::TPQTabletConfig InitialPQTabletConfig; NKikimrPQClient::TDataChunk InitMeta; - TString LocalDC; + TString LocalDC; TString ClientDC; - TString SelectSourceIdQuery; - TString UpdateSourceIdQuery; - TInstant LastSourceIdUpdate; - ui64 SourceIdCreateTime; - bool SourceIdUpdateInfly; + TString SelectSourceIdQuery; + TString UpdateSourceIdQuery; + TInstant LastSourceIdUpdate; + ui64 SourceIdCreateTime; + bool SourceIdUpdateInfly; TVector<NPQ::TLabelsInfo> Aggr; NKikimr::NPQ::TMultiCounter SLITotal; diff --git a/ydb/services/persqueue_v1/grpc_pq_read_actor.cpp b/ydb/services/persqueue_v1/grpc_pq_read_actor.cpp index 654edfcfcd4..50e2355ebfc 100644 --- a/ydb/services/persqueue_v1/grpc_pq_read_actor.cpp +++ b/ydb/services/persqueue_v1/grpc_pq_read_actor.cpp @@ -783,8 +783,8 @@ void TReadSessionActor::Handle(TEvPQProxy::TEvReadInit::TPtr& ev, const TActorCo SLITotal = NKikimr::NPQ::TMultiCounter(subGroup, Aggr, {}, {"RequestsTotal"}, true, "sensor", false); SLITotal.Inc(); -} - +} + void TReadSessionActor::RegisterSession(const TActorId& pipe, const TString& topic, const TVector<ui32>& groups, const TActorContext& ctx) { @@ -808,8 +808,8 @@ void TReadSessionActor::RegisterSession(const TActorId& pipe, const TString& top void TReadSessionActor::RegisterSessions(const TActorContext& ctx) { InitDone = true; - for (auto& t : Topics) { - auto& topic = t.first; + for (auto& t : Topics) { + auto& topic = t.first; RegisterSession(t.second.PipeClient, topic, t.second.Groups, ctx); NumPartitionsFromTopic[t.second.TopicNameConverter->GetClientsideName()] = 0; } @@ -1647,7 +1647,7 @@ void TReadSessionActor::Handle(TEvPQProxy::TEvPartitionReady::TPtr& ev, const TA void TReadSessionActor::HandlePoison(TEvPQProxy::TEvDieCommand::TPtr& ev, const TActorContext& ctx) { - CloseSession(ev->Get()->Reason, ev->Get()->ErrorCode, ctx); + CloseSession(ev->Get()->Reason, ev->Get()->ErrorCode, ctx); } @@ -2455,7 +2455,7 @@ void TPartitionActor::Handle(TEvPQProxy::TEvRead::TPtr& ev, const TActorContext& } if (req->MaxTimeLagMs) { read->SetMaxTimeLagMs(req->MaxTimeLagMs); - } + } if (req->ReadTimestampMs) { read->SetReadTimestampMs(req->ReadTimestampMs); } diff --git a/ydb/services/persqueue_v1/grpc_pq_write.cpp b/ydb/services/persqueue_v1/grpc_pq_write.cpp index 5f17a2f3aba..cb80a604c57 100644 --- a/ydb/services/persqueue_v1/grpc_pq_write.cpp +++ b/ydb/services/persqueue_v1/grpc_pq_write.cpp @@ -49,7 +49,7 @@ void TPQWriteService::Bootstrap(const TActorContext& ctx) { Become(&TThis::StateFunc); } - + ui64 TPQWriteService::NextCookie() { return ++LastCookie; } @@ -214,20 +214,20 @@ bool TPQWriteService::TooMuchSessions() { TString TPQWriteService::AvailableLocalCluster(const TActorContext&) const { return HaveClusters && Enabled ? *LocalCluster : ""; -} - - - - +} + + + + /////////////////////////////////////////////////////////////////////////////// +} } } -} - - + + void NKikimr::NGRpcService::TGRpcRequestProxy::Handle(NKikimr::NGRpcService::TEvStreamPQWriteRequest::TPtr& ev, const TActorContext& ctx) { ctx.Send(NKikimr::NGRpcProxy::V1::GetPQWriteServiceActorID(), ev->Release().Release()); -} +} diff --git a/ydb/services/persqueue_v1/grpc_pq_write.h b/ydb/services/persqueue_v1/grpc_pq_write.h index 73148388b29..75d1db6068e 100644 --- a/ydb/services/persqueue_v1/grpc_pq_write.h +++ b/ydb/services/persqueue_v1/grpc_pq_write.h @@ -75,10 +75,10 @@ private: ui32 MaxSessions; TMaybe<TString> LocalCluster; - bool Enabled; - TString SelectSourceIdQuery; - TString UpdateSourceIdQuery; - TString DeleteSourceIdQuery; + bool Enabled; + TString SelectSourceIdQuery; + TString UpdateSourceIdQuery; + TString DeleteSourceIdQuery; NAddressClassifier::TLabeledAddressClassifier::TConstPtr DatacenterClassifier; // Detects client's datacenter by IP. May be null bool HaveClusters; diff --git a/ydb/services/persqueue_v1/grpc_pq_write_actor.cpp b/ydb/services/persqueue_v1/grpc_pq_write_actor.cpp index 3038cc82a61..39074ad1146 100644 --- a/ydb/services/persqueue_v1/grpc_pq_write_actor.cpp +++ b/ydb/services/persqueue_v1/grpc_pq_write_actor.cpp @@ -11,7 +11,7 @@ #include <ydb/services/lib/sharding/sharding.h> #include <library/cpp/actors/core/log.h> #include <library/cpp/digest/md5/md5.h> -#include <util/string/hex.h> +#include <util/string/hex.h> #include <util/string/vector.h> #include <util/string/escape.h> #include <util/string/printf.h> @@ -81,8 +81,8 @@ using namespace Ydb::PersQueue::V1; static const ui32 MAX_RESERVE_REQUESTS_INFLIGHT = 5; static const ui32 MAX_BYTES_INFLIGHT = 1 << 20; //1mb -static const ui32 MURMUR_ARRAY_SEED = 0x9747b28c; -static const TDuration SOURCEID_UPDATE_PERIOD = TDuration::Hours(1); +static const ui32 MURMUR_ARRAY_SEED = 0x9747b28c; +static const TDuration SOURCEID_UPDATE_PERIOD = TDuration::Hours(1); static const TString SELECT_SOURCEID_QUERY1 = "--!syntax_v1\n" @@ -138,11 +138,11 @@ TWriteSessionActor::TWriteSessionActor( , RequestNotChecked(false) , LastACLCheckTimestamp(TInstant::Zero()) , LogSessionDeadline(TInstant::Zero()) - , BalancerTabletId(0) + , BalancerTabletId(0) , ClientDC(clientDC ? *clientDC : "other") - , LastSourceIdUpdate(TInstant::Zero()) - , SourceIdCreateTime(0) - , SourceIdUpdateInfly(false) + , LastSourceIdUpdate(TInstant::Zero()) + , SourceIdCreateTime(0) + , SourceIdUpdateInfly(false) { Y_ASSERT(Request); ++(*GetServiceCounters(Counters, "pqproxy|writeSession")->GetCounter("SessionsCreatedTotal", true)); @@ -287,8 +287,8 @@ void TWriteSessionActor::CheckACL(const TActorContext& ctx) { Token->GetUserSID().c_str()); CloseSession(errorReason, PersQueue::ErrorCode::ACCESS_DENIED, ctx); } -} - +} + void TWriteSessionActor::Handle(TEvPQProxy::TEvWriteInit::TPtr& ev, const TActorContext& ctx) { THolder<TEvPQProxy::TEvWriteInit> event(ev->Release()); @@ -324,7 +324,7 @@ void TWriteSessionActor::Handle(TEvPQProxy::TEvWriteInit::TPtr& ev, const TActor return; } EscapedSourceId = HexEncode(encodedSourceId); - + TString s = TopicConverter->GetClientsideName() + encodedSourceId; Hash = MurmurHash<ui32>(s.c_str(), s.size(), MURMUR_ARRAY_SEED); @@ -534,11 +534,11 @@ void TWriteSessionActor::DiscoverPartition(const NActors::TActorContext& ctx) { ev->Record.MutableRequest()->MutableTxControl()->mutable_begin_tx()->mutable_serializable_read_write(); // keep compiled query in cache. ev->Record.MutableRequest()->MutableQueryCachePolicy()->set_keep_in_cache(true); - NClient::TParameters parameters; - parameters["$Hash"] = Hash; + NClient::TParameters parameters; + parameters["$Hash"] = Hash; parameters["$Topic"] = TopicConverter->GetClientsideName(); - parameters["$SourceId"] = EscapedSourceId; - ev->Record.MutableRequest()->MutableParameters()->Swap(¶meters); + parameters["$SourceId"] = EscapedSourceId; + ev->Record.MutableRequest()->MutableParameters()->Swap(¶meters); ctx.Send(NKqp::MakeKqpProxyID(ctx.SelfID.NodeId()), ev.Release()); State = ES_WAIT_TABLE_REQUEST_1; } @@ -588,18 +588,18 @@ void TWriteSessionActor::Handle(NKqp::TEvKqp::TEvQueryResponse::TPtr &ev, const if (record.GetYdbStatus() != Ydb::StatusIds::SUCCESS) { TStringBuilder errorReason; - errorReason << "internal error in kqp Marker# PQ50 : " << record; - if (State == EState::ES_INITED) { - LOG_WARN_S(ctx, NKikimrServices::PQ_WRITE_PROXY, errorReason); - SourceIdUpdateInfly = false; - } else { + errorReason << "internal error in kqp Marker# PQ50 : " << record; + if (State == EState::ES_INITED) { + LOG_WARN_S(ctx, NKikimrServices::PQ_WRITE_PROXY, errorReason); + SourceIdUpdateInfly = false; + } else { CloseSession(errorReason, PersQueue::ErrorCode::ERROR, ctx); - } + } return; } - if (State == EState::ES_WAIT_TABLE_REQUEST_1) { - SourceIdCreateTime = TInstant::Now().MilliSeconds(); + if (State == EState::ES_WAIT_TABLE_REQUEST_1) { + SourceIdCreateTime = TInstant::Now().MilliSeconds(); bool partitionFound = false; auto& t = record.GetResponse().GetResults(0).GetValue().GetStruct(0); @@ -614,19 +614,19 @@ void TWriteSessionActor::Handle(NKqp::TEvKqp::TEvQueryResponse::TPtr &ev, const return; } partitionFound = true; - SourceIdCreateTime = t.GetList(0).GetStruct(1).GetOptional().GetUint64(); + SourceIdCreateTime = t.GetList(0).GetStruct(1).GetOptional().GetUint64(); } } LOG_INFO_S(ctx, NKikimrServices::PQ_WRITE_PROXY, "session v1 cookie: " << Cookie << " sessionId: " << OwnerCookie << " messageGroupId " << SourceId << " escaped " << EscapedSourceId << " hash " << Hash << " partition " << Partition << " partitions " - << PartitionToTablet.size() << "(" << Hash % PartitionToTablet.size() << ") create " << SourceIdCreateTime << " result " << t); + << PartitionToTablet.size() << "(" << Hash % PartitionToTablet.size() << ") create " << SourceIdCreateTime << " result " << t); if (!partitionFound && (PreferedPartition < Max<ui32>() || !AppData(ctx)->PQConfig.GetRoundRobinPartitionMapping())) { Partition = PreferedPartition < Max<ui32>() ? PreferedPartition : Hash % PartitionToTablet.size(); //choose partition default value partitionFound = true; } - + if (partitionFound) { UpdatePartition(ctx); } else { @@ -634,11 +634,11 @@ void TWriteSessionActor::Handle(NKqp::TEvKqp::TEvQueryResponse::TPtr &ev, const } return; } else if (State == EState::ES_WAIT_TABLE_REQUEST_2) { - LastSourceIdUpdate = ctx.Now(); + LastSourceIdUpdate = ctx.Now(); ProceedPartition(Partition, ctx); - } else if (State == EState::ES_INITED) { - SourceIdUpdateInfly = false; - LastSourceIdUpdate = ctx.Now(); + } else if (State == EState::ES_INITED) { + SourceIdUpdateInfly = false; + LastSourceIdUpdate = ctx.Now(); } else { Y_FAIL("Wrong state"); } @@ -648,7 +648,7 @@ THolder<NKqp::TEvKqp::TEvQueryRequest> TWriteSessionActor::MakeUpdateSourceIdMet const NActors::TActorContext& ctx ) { - auto ev = MakeHolder<NKqp::TEvKqp::TEvQueryRequest>(); + auto ev = MakeHolder<NKqp::TEvKqp::TEvQueryRequest>(); ev->Record.MutableRequest()->SetAction(NKikimrKqp::QUERY_ACTION_EXECUTE); ev->Record.MutableRequest()->SetType(NKikimrKqp::QUERY_TYPE_SQL_DML); @@ -661,19 +661,19 @@ THolder<NKqp::TEvKqp::TEvQueryRequest> TWriteSessionActor::MakeUpdateSourceIdMet // keep compiled query in cache. ev->Record.MutableRequest()->MutableQueryCachePolicy()->set_keep_in_cache(true); - NClient::TParameters parameters; - parameters["$Hash"] = Hash; + NClient::TParameters parameters; + parameters["$Hash"] = Hash; parameters["$Topic"] = TopicConverter->GetClientsideName(); - parameters["$SourceId"] = EscapedSourceId; - parameters["$CreateTime"] = SourceIdCreateTime; - parameters["$AccessTime"] = TInstant::Now().MilliSeconds(); - parameters["$Partition"] = Partition; - ev->Record.MutableRequest()->MutableParameters()->Swap(¶meters); - - return ev; -} - - + parameters["$SourceId"] = EscapedSourceId; + parameters["$CreateTime"] = SourceIdCreateTime; + parameters["$AccessTime"] = TInstant::Now().MilliSeconds(); + parameters["$Partition"] = Partition; + ev->Record.MutableRequest()->MutableParameters()->Swap(¶meters); + + return ev; +} + + void TWriteSessionActor::Handle(NKqp::TEvKqp::TEvProcessResponse::TPtr &ev, const TActorContext &ctx) { auto& record = ev->Get()->Record; @@ -1112,8 +1112,8 @@ void TWriteSessionActor::Handle(TEvPQProxy::TEvWrite::TPtr& ev, const TActorCont if (!dataCheck(writeRequest, messageIndex)) { return; } - } - + } + THolder<TEvPQProxy::TEvWrite> event(ev->Release()); Writes.push_back(std::move(event)); @@ -1141,7 +1141,7 @@ void TWriteSessionActor::Handle(TEvPQProxy::TEvWrite::TPtr& ev, const TActorCont } void TWriteSessionActor::HandlePoison(TEvPQProxy::TEvDieCommand::TPtr& ev, const TActorContext& ctx) { - CloseSession(ev->Get()->Reason, ev->Get()->ErrorCode, ctx); + CloseSession(ev->Get()->Reason, ev->Get()->ErrorCode, ctx); } void TWriteSessionActor::LogSession(const TActorContext& ctx) { @@ -1161,9 +1161,9 @@ void TWriteSessionActor::HandleWakeup(const TActorContext& ctx) { // ToDo[migration] - separate flag for having config tables if (!AppData(ctx)->PQConfig.GetTopicsAreFirstClassCitizen() && !SourceIdUpdateInfly && ctx.Now() - LastSourceIdUpdate > SOURCEID_UPDATE_PERIOD) { auto ev = MakeUpdateSourceIdMetadataRequest(ctx); - SourceIdUpdateInfly = true; + SourceIdUpdateInfly = true; ctx.Send(NKqp::MakeKqpProxyID(ctx.SelfID.NodeId()), ev.Release()); - } + } if (ctx.Now() >= LogSessionDeadline) { LogSession(ctx); } |