aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorbaidarov <baidarov@yandex-team.ru>2022-02-10 16:52:23 +0300
committerDaniil Cherednik <dcherednik@yandex-team.ru>2022-02-10 16:52:23 +0300
commit7e0fa93950d364bdb59fbe19350a611dc65a6512 (patch)
treebe78cff15358bb53858ebdd80775302b2f79b6aa
parente31629dd6bd33e846b5ca7cafd3fb209a3a9f398 (diff)
downloadydb-7e0fa93950d364bdb59fbe19350a611dc65a6512.tar.gz
Restoring authorship annotation for <baidarov@yandex-team.ru>. Commit 1 of 2.
-rw-r--r--ydb/core/base/appdata.h2
-rw-r--r--ydb/core/base/events.h6
-rw-r--r--ydb/core/base/user_registry.h76
-rw-r--r--ydb/core/client/server/msgbus_server_persqueue.cpp10
-rw-r--r--ydb/core/client/server/msgbus_server_pq_metarequest_ut.cpp10
-rw-r--r--ydb/core/driver_lib/cli_utils/cli_cmds_server.cpp4
-rw-r--r--ydb/core/driver_lib/run/config_parser.cpp20
-rw-r--r--ydb/core/driver_lib/run/run.cpp8
-rw-r--r--ydb/core/persqueue/events/global.h38
-rw-r--r--ydb/core/persqueue/events/internal.h6
-rw-r--r--ydb/core/persqueue/key.h6
-rw-r--r--ydb/core/persqueue/partition.cpp60
-rw-r--r--ydb/core/persqueue/partition.h14
-rw-r--r--ydb/core/persqueue/pq_impl.cpp4
-rw-r--r--ydb/core/persqueue/pq_ut.cpp398
-rw-r--r--ydb/core/persqueue/read_balancer.cpp132
-rw-r--r--ydb/core/persqueue/read_balancer.h46
-rw-r--r--ydb/core/persqueue/user_info.h4
-rw-r--r--ydb/core/protos/auth.proto38
-rw-r--r--ydb/core/protos/config.proto2
-rw-r--r--ydb/core/protos/msgbus_pq.proto8
-rw-r--r--ydb/core/protos/pqconfig.proto86
-rw-r--r--ydb/core/protos/ya.make4
-rw-r--r--ydb/core/security/ticket_parser.cpp12
-rw-r--r--ydb/core/security/ticket_parser_ut.cpp16
-rw-r--r--ydb/core/security/ut/ya.make22
-rw-r--r--ydb/core/testlib/test_client.cpp4
-rw-r--r--ydb/core/testlib/test_client.h6
-rw-r--r--ydb/core/testlib/test_pq_client.h34
-rw-r--r--ydb/public/api/protos/draft/persqueue_error_codes.proto8
-rw-r--r--ydb/services/persqueue_v1/grpc_pq_actor.h18
-rw-r--r--ydb/services/persqueue_v1/grpc_pq_read_actor.cpp12
-rw-r--r--ydb/services/persqueue_v1/grpc_pq_write.cpp20
-rw-r--r--ydb/services/persqueue_v1/grpc_pq_write.h8
-rw-r--r--ydb/services/persqueue_v1/grpc_pq_write_actor.cpp94
35 files changed, 618 insertions, 618 deletions
diff --git a/ydb/core/base/appdata.h b/ydb/core/base/appdata.h
index c666f7468c0..fdff62b4033 100644
--- a/ydb/core/base/appdata.h
+++ b/ydb/core/base/appdata.h
@@ -133,7 +133,7 @@ struct TAppData {
THolder<NKikimrCms::TCmsConfig> DefaultCmsConfig;
NKikimrStream::TStreamingConfig StreamingConfig;
- NKikimrPQ::TPQConfig PQConfig;
+ NKikimrPQ::TPQConfig PQConfig;
NKikimrPQ::TPQClusterDiscoveryConfig PQClusterDiscoveryConfig;
NKikimrNetClassifier::TNetClassifierConfig NetClassifierConfig;
NKikimrNetClassifier::TNetClassifierDistributableConfig NetClassifierDistributableConfig;
diff --git a/ydb/core/base/events.h b/ydb/core/base/events.h
index f5fedfe19b2..a2cc9e4f602 100644
--- a/ydb/core/base/events.h
+++ b/ydb/core/base/events.h
@@ -86,9 +86,9 @@ struct TKikimrEvents : TEvents {
ES_BLOCKSTORE, //4162
ES_RTMR_ICBUS,
ES_TENANT_POOL,
- ES_USER_REGISTRY,
- ES_TVM_SETTINGS_UPDATER,
- ES_PQ_CLUSTERS_UPDATER,
+ ES_USER_REGISTRY,
+ ES_TVM_SETTINGS_UPDATER,
+ ES_PQ_CLUSTERS_UPDATER,
ES_TENANT_SLOT_BROKER,
ES_GRPC_CALLS,
ES_CONSOLE,
diff --git a/ydb/core/base/user_registry.h b/ydb/core/base/user_registry.h
index 74484c3d06c..72ddefd4f76 100644
--- a/ydb/core/base/user_registry.h
+++ b/ydb/core/base/user_registry.h
@@ -1,48 +1,48 @@
-#pragma once
-
+#pragma once
+
#include <library/cpp/actors/core/events.h>
#include <ydb/core/tx/defs.h>
-
-namespace NKikimr {
- struct TEvUserRegistry {
- enum EEv {
- EvGetUserById = EventSpaceBegin(TKikimrEvents::ES_USER_REGISTRY),
- EvGetUserByIdResult,
- EvEnd
- };
-
- static_assert(EvEnd < EventSpaceEnd(TKikimrEvents::ES_USER_REGISTRY),
- "expect EvEnd < EventSpaceEnd(TKikimrEvents::ES_USER_REGISTRY)");
-
- struct TEvGetUserById : TEventLocal<TEvGetUserById, EvGetUserById> {
- const ui64 UID;
+
+namespace NKikimr {
+ struct TEvUserRegistry {
+ enum EEv {
+ EvGetUserById = EventSpaceBegin(TKikimrEvents::ES_USER_REGISTRY),
+ EvGetUserByIdResult,
+ EvEnd
+ };
+
+ static_assert(EvEnd < EventSpaceEnd(TKikimrEvents::ES_USER_REGISTRY),
+ "expect EvEnd < EventSpaceEnd(TKikimrEvents::ES_USER_REGISTRY)");
+
+ struct TEvGetUserById : TEventLocal<TEvGetUserById, EvGetUserById> {
+ const ui64 UID;
const TString UserNameHint;
-
+
TEvGetUserById(const ui64 uid, const TString& userNameHint = "")
- : UID(uid)
+ : UID(uid)
, UserNameHint(userNameHint)
- {}
- };
-
- struct TEvGetUserByIdResult : TEventLocal<TEvGetUserByIdResult, EvGetUserByIdResult> {
- const ui64 UID;
+ {}
+ };
+
+ struct TEvGetUserByIdResult : TEventLocal<TEvGetUserByIdResult, EvGetUserByIdResult> {
+ const ui64 UID;
const TString UserNameHint;
- const TString User;
- const TString Error;
-
+ const TString User;
+ const TString Error;
+
TEvGetUserByIdResult(const ui64 uid, const TString& userNameHint, const TString& user = "", const TString& error = "")
- : UID(uid)
+ : UID(uid)
, UserNameHint(userNameHint)
- , User(user)
- , Error(error)
- {}
- };
- };
-
+ , User(user)
+ , Error(error)
+ {}
+ };
+ };
+
inline NActors::TActorId MakeUserRegistryID() {
- const char name[12] = "userregistr";
+ const char name[12] = "userregistr";
return NActors::TActorId(0, TStringBuf(name, 12));
- }
-
- IActor* CreateUserRegistry(const TString& query);
-}
+ }
+
+ IActor* CreateUserRegistry(const TString& query);
+}
diff --git a/ydb/core/client/server/msgbus_server_persqueue.cpp b/ydb/core/client/server/msgbus_server_persqueue.cpp
index 78b83ef3a3b..a7ce6a4c623 100644
--- a/ydb/core/client/server/msgbus_server_persqueue.cpp
+++ b/ydb/core/client/server/msgbus_server_persqueue.cpp
@@ -458,7 +458,7 @@ protected:
ui32 TopicsAnswered;
THashSet<ui64> TabletsDiscovered;
THashSet<ui64> TabletsAnswered;
- ui32 AclRequests;
+ ui32 AclRequests;
ui32 DescribeRequests;
ui32 PartTabletsRequested;
TString ErrorReason;
@@ -779,8 +779,8 @@ public:
bool AnswerIfCanForMeta(const TActorContext& ctx) {
Y_VERIFY(IsMetaRequest);
Y_VERIFY(RequestProto.HasMetaRequest());
- if (AclRequests)
- return false;
+ if (AclRequests)
+ return false;
if (DescribeRequests)
return false;
const auto& meta = RequestProto.GetMetaRequest();
@@ -1210,7 +1210,7 @@ public:
const auto& offset = req.GetOffset();
const auto& part = req.GetPartition();
const auto& maxBytes = req.GetMaxBytes();
- const auto& readTimestampMs = req.GetReadTimestampMs();
+ const auto& readTimestampMs = req.GetReadTimestampMs();
auto it = TopicInfo.find(topic);
Y_VERIFY(it != TopicInfo.end());
if (it->second.PartitionToTablet.find(part) == it->second.PartitionToTablet.end()) { //tablet's info is not filled for this topic yet
@@ -1242,7 +1242,7 @@ public:
read->SetCount(1000000);
read->SetTimeoutMs(0);
read->SetBytes(Min<ui32>(maxBytes, FetchRequestBytesLeft));
- read->SetReadTimestampMs(readTimestampMs);
+ read->SetReadTimestampMs(readTimestampMs);
NTabletPipe::SendData(ctx, jt->second.PipeClient, preq.Release());
}
diff --git a/ydb/core/client/server/msgbus_server_pq_metarequest_ut.cpp b/ydb/core/client/server/msgbus_server_pq_metarequest_ut.cpp
index 237bba147b9..b1fb7b8f587 100644
--- a/ydb/core/client/server/msgbus_server_pq_metarequest_ut.cpp
+++ b/ydb/core/client/server/msgbus_server_pq_metarequest_ut.cpp
@@ -727,7 +727,7 @@ public:
NKikimrClient::TPersQueueRequest MakeValidRequest(ui64 topicsCount = 2) override {
NKikimrClient::TPersQueueRequest persQueueRequest;
- persQueueRequest.SetTicket("client_id@" BUILTIN_ACL_DOMAIN);
+ persQueueRequest.SetTicket("client_id@" BUILTIN_ACL_DOMAIN);
auto& req = *persQueueRequest.MutableMetaRequest()->MutableCmdGetTopicMetadata();
req.AddTopic("topic1");
@@ -821,7 +821,7 @@ public:
NKikimrClient::TPersQueueRequest MakeValidRequest(ui64 topicsCount = 2) override {
NKikimrClient::TPersQueueRequest persQueueRequest;
- persQueueRequest.SetTicket("client_id@" BUILTIN_ACL_DOMAIN);
+ persQueueRequest.SetTicket("client_id@" BUILTIN_ACL_DOMAIN);
auto& req = *persQueueRequest.MutableMetaRequest()->MutableCmdGetPartitionLocations();
FillValidTopicRequest(*req.MutableTopicRequest(), topicsCount);
@@ -984,7 +984,7 @@ public:
NKikimrClient::TPersQueueRequest MakeValidRequest(ui64 topicsCount = 2) override {
NKikimrClient::TPersQueueRequest persQueueRequest;
- persQueueRequest.SetTicket("client_id@" BUILTIN_ACL_DOMAIN);
+ persQueueRequest.SetTicket("client_id@" BUILTIN_ACL_DOMAIN);
auto& req = *persQueueRequest.MutableMetaRequest()->MutableCmdGetPartitionOffsets();
FillValidTopicRequest(*req.MutableTopicRequest(), topicsCount);
@@ -1145,7 +1145,7 @@ public:
NKikimrClient::TPersQueueRequest MakeValidRequest(ui64 topicsCount = 2) override {
NKikimrClient::TPersQueueRequest persQueueRequest;
- persQueueRequest.SetTicket("client_id@" BUILTIN_ACL_DOMAIN);
+ persQueueRequest.SetTicket("client_id@" BUILTIN_ACL_DOMAIN);
auto& req = *persQueueRequest.MutableMetaRequest()->MutableCmdGetPartitionStatus();
FillValidTopicRequest(*req.MutableTopicRequest(), topicsCount);
@@ -1298,7 +1298,7 @@ public:
NKikimrClient::TPersQueueRequest MakeValidRequest(ui64 topicsCount = 2) override {
NKikimrClient::TPersQueueRequest persQueueRequest;
- persQueueRequest.SetTicket("client_id@" BUILTIN_ACL_DOMAIN);
+ persQueueRequest.SetTicket("client_id@" BUILTIN_ACL_DOMAIN);
auto& req = *persQueueRequest.MutableMetaRequest()->MutableCmdGetReadSessionsInfo();
req.SetClientId("client_id");
diff --git a/ydb/core/driver_lib/cli_utils/cli_cmds_server.cpp b/ydb/core/driver_lib/cli_utils/cli_cmds_server.cpp
index 6766dd31719..40dbb7f1777 100644
--- a/ydb/core/driver_lib/cli_utils/cli_cmds_server.cpp
+++ b/ydb/core/driver_lib/cli_utils/cli_cmds_server.cpp
@@ -208,10 +208,10 @@ protected:
config.Opts->AddLongOption("kqp-file", "Kikimr Query Processor config file").OptionalArgument("PATH");
config.Opts->AddLongOption("incrhuge-file", "incremental huge blob keeper config file").OptionalArgument("PATH");
config.Opts->AddLongOption("memorylog-file", "set buffer size for memory log").OptionalArgument("PATH");
- config.Opts->AddLongOption("pq-file", "PersQueue config file").OptionalArgument("PATH");
+ config.Opts->AddLongOption("pq-file", "PersQueue config file").OptionalArgument("PATH");
config.Opts->AddLongOption("pqcd-file", "PersQueue cluster discovery config file").OptionalArgument("PATH");
config.Opts->AddLongOption("netclassifier-file", "NetClassifier config file").OptionalArgument("PATH");
- config.Opts->AddLongOption("auth-file", "authorization configuration").OptionalArgument("PATH");
+ config.Opts->AddLongOption("auth-file", "authorization configuration").OptionalArgument("PATH");
config.Opts->AddLongOption("auth-token-file", "authorization token configuration").OptionalArgument("PATH");
config.Opts->AddLongOption("key-file", "tanant encryption key configuration").OptionalArgument("PATH");
config.Opts->AddLongOption("pdisk-key-file", "pdisk encryption key configuration").OptionalArgument("PATH");
diff --git a/ydb/core/driver_lib/run/config_parser.cpp b/ydb/core/driver_lib/run/config_parser.cpp
index bb686562912..e62a7d38295 100644
--- a/ydb/core/driver_lib/run/config_parser.cpp
+++ b/ydb/core/driver_lib/run/config_parser.cpp
@@ -65,10 +65,10 @@ void TRunCommandConfigParser::SetupLastGetOptForConfigFiles(NLastGetopt::TOpts&
opts.AddLongOption("grpc-public-host", "set public gRPC host for discovery").RequiredArgument("HOST");
opts.AddLongOption("grpc-public-port", "set public gRPC port for discovery").RequiredArgument("PORT");
opts.AddLongOption("grpcs-public-port", "set public gRPC SSL port for discovery").RequiredArgument("PORT");
- opts.AddLongOption("pq-file", "PQ config file").OptionalArgument("PATH");
+ opts.AddLongOption("pq-file", "PQ config file").OptionalArgument("PATH");
opts.AddLongOption("pqcd-file", "PQCD config file").OptionalArgument("PATH");
opts.AddLongOption("netclassifier-file", "NetClassifier config file").OptionalArgument("PATH");
- opts.AddLongOption("auth-file", "authorization config file").OptionalArgument("PATH");
+ opts.AddLongOption("auth-file", "authorization config file").OptionalArgument("PATH");
opts.AddLongOption("auth-token-file", "authorization token config file").OptionalArgument("PATH");
opts.AddLongOption("key-file", "encryption key config file").OptionalArgument("PATH");
opts.AddLongOption("sqs-file", "SQS config file").OptionalArgument("PATH");
@@ -146,7 +146,7 @@ void TRunCommandConfigParser::ParseConfigFiles(const NLastGetopt::TOptsParseResu
conf.SetStartGRpcProxy(true);
conf.SetPort(FromString<ui16>(res.Get("grpc-port")));
}
-
+
if (res.Has("grpcs-port")) {
auto& conf = *Config.AppConfig.MutableGRpcConfig();
conf.SetStartGRpcProxy(true);
@@ -168,10 +168,10 @@ void TRunCommandConfigParser::ParseConfigFiles(const NLastGetopt::TOptsParseResu
conf.SetPublicSslPort(FromString<ui16>(res.Get("grpcs-public-port")));
}
- if (res.Has("pq-file")) {
- Y_VERIFY(ParsePBFromFile(res.Get("pq-file"), Config.AppConfig.MutablePQConfig()));
- }
-
+ if (res.Has("pq-file")) {
+ Y_VERIFY(ParsePBFromFile(res.Get("pq-file"), Config.AppConfig.MutablePQConfig()));
+ }
+
if (res.Has("pqcd-file")) {
Y_VERIFY(ParsePBFromFile(res.Get("pqcd-file"), Config.AppConfig.MutablePQClusterDiscoveryConfig()));
}
@@ -180,9 +180,9 @@ void TRunCommandConfigParser::ParseConfigFiles(const NLastGetopt::TOptsParseResu
Y_VERIFY(ParsePBFromFile(res.Get("netclassifier-file"), Config.AppConfig.MutableNetClassifierConfig()));
}
- if (res.Has("auth-file")) {
- Y_VERIFY(ParsePBFromFile(res.Get("auth-file"), Config.AppConfig.MutableAuthConfig()));
- }
+ if (res.Has("auth-file")) {
+ Y_VERIFY(ParsePBFromFile(res.Get("auth-file"), Config.AppConfig.MutableAuthConfig()));
+ }
if (res.Has("auth-token-file")) {
Y_VERIFY(ParsePBFromFile(res.Get("auth-token-file"), Config.AppConfig.MutableAuthConfig()));
diff --git a/ydb/core/driver_lib/run/run.cpp b/ydb/core/driver_lib/run/run.cpp
index a4f74aa4e0f..076ca7b2c38 100644
--- a/ydb/core/driver_lib/run/run.cpp
+++ b/ydb/core/driver_lib/run/run.cpp
@@ -875,9 +875,9 @@ void TKikimrRunner::InitializeAppData(const TKikimrRunConfig& runConfig)
}
if (runConfig.AppConfig.HasPQConfig()) {
- AppData->PQConfig.CopyFrom(runConfig.AppConfig.GetPQConfig());
+ AppData->PQConfig.CopyFrom(runConfig.AppConfig.GetPQConfig());
}
-
+
if (runConfig.AppConfig.HasPQClusterDiscoveryConfig()) {
AppData->PQClusterDiscoveryConfig.CopyFrom(runConfig.AppConfig.GetPQClusterDiscoveryConfig());
}
@@ -891,9 +891,9 @@ void TKikimrRunner::InitializeAppData(const TKikimrRunConfig& runConfig)
}
if (runConfig.AppConfig.HasAuthConfig()) {
- AppData->AuthConfig.CopyFrom(runConfig.AppConfig.GetAuthConfig());
+ AppData->AuthConfig.CopyFrom(runConfig.AppConfig.GetAuthConfig());
}
-
+
if (runConfig.AppConfig.HasKeyConfig()) {
AppData->KeyConfig.CopyFrom(runConfig.AppConfig.GetKeyConfig());
}
diff --git a/ydb/core/persqueue/events/global.h b/ydb/core/persqueue/events/global.h
index e1957e7a16f..a82ad3abdb8 100644
--- a/ydb/core/persqueue/events/global.h
+++ b/ydb/core/persqueue/events/global.h
@@ -36,9 +36,9 @@ struct TEvPersQueue {
EvGetReadSessionsInfo,
EvReadSessionsInfoResponse,
EvWakeupClient,
- EvUpdateACL,
- EvCheckACL,
- EvCheckACLResponse,
+ EvUpdateACL,
+ EvCheckACL,
+ EvCheckACLResponse,
EvError,
EvGetPartitionIdForWrite,
EvGetPartitionIdForWriteResponse,
@@ -182,22 +182,22 @@ struct TEvPersQueue {
TEvDescribeResponse()
{}
};
-
- struct TEvUpdateACL : public TEventLocal<TEvUpdateACL, EvUpdateACL> {
- TEvUpdateACL()
- {}
- };
-
- struct TEvCheckACL : public TEventPB<TEvCheckACL, NKikimrPQ::TCheckACL, EvCheckACL> {
- TEvCheckACL()
- {}
- };
-
- struct TEvCheckACLResponse : public TEventPB<TEvCheckACLResponse, NKikimrPQ::TCheckACLResponse, EvCheckACLResponse> {
- TEvCheckACLResponse()
- {};
- };
-
+
+ struct TEvUpdateACL : public TEventLocal<TEvUpdateACL, EvUpdateACL> {
+ TEvUpdateACL()
+ {}
+ };
+
+ struct TEvCheckACL : public TEventPB<TEvCheckACL, NKikimrPQ::TCheckACL, EvCheckACL> {
+ TEvCheckACL()
+ {}
+ };
+
+ struct TEvCheckACLResponse : public TEventPB<TEvCheckACLResponse, NKikimrPQ::TCheckACLResponse, EvCheckACLResponse> {
+ TEvCheckACLResponse()
+ {};
+ };
+
struct TEvError : public TEventPB<TEvError,
NPersQueueCommon::TError, EvError> {
TEvError() {}
diff --git a/ydb/core/persqueue/events/internal.h b/ydb/core/persqueue/events/internal.h
index bc892988b47..b24395e5cba 100644
--- a/ydb/core/persqueue/events/internal.h
+++ b/ydb/core/persqueue/events/internal.h
@@ -125,7 +125,7 @@ struct TEvPQ {
ui32 TotalSize;
ui64 CreateTimestamp;
ui64 ReceiveTimestamp;
- bool DisableDeduplication;
+ bool DisableDeduplication;
ui64 WriteTimestamp;
TString Data;
ui32 UncompressedSize;
@@ -173,7 +173,7 @@ struct TEvPQ {
, ClientId(clientId)
, Timeout(timeout)
, Size(size)
- , MaxTimeLagMs(maxTimeLagMs)
+ , MaxTimeLagMs(maxTimeLagMs)
, ReadTimestampMs(readTimestampMs)
, ClientDC(clientDC)
, ExternalOperation(externalOperation)
@@ -187,7 +187,7 @@ struct TEvPQ {
TString ClientId;
ui32 Timeout;
ui32 Size;
- ui32 MaxTimeLagMs;
+ ui32 MaxTimeLagMs;
ui64 ReadTimestampMs;
TString ClientDC;
bool ExternalOperation;
diff --git a/ydb/core/persqueue/key.h b/ydb/core/persqueue/key.h
index 098c67cf7d9..1bf03466edb 100644
--- a/ydb/core/persqueue/key.h
+++ b/ydb/core/persqueue/key.h
@@ -20,10 +20,10 @@ public:
};
enum EMark : char {
- MarkUserDeprecated = 'u',
- MarkSourceId = 's',
+ MarkUserDeprecated = 'u',
+ MarkSourceId = 's',
MarkProtoSourceId = 'p',
- MarkUser = 'c'
+ MarkUser = 'c'
};
TKeyPrefix(EType type, const ui32 partition)
diff --git a/ydb/core/persqueue/partition.cpp b/ydb/core/persqueue/partition.cpp
index cc8e2419cf2..8469f4a9ef5 100644
--- a/ydb/core/persqueue/partition.cpp
+++ b/ydb/core/persqueue/partition.cpp
@@ -214,20 +214,20 @@ IOutputStream& operator <<(IOutputStream& out, const TKeyLevel& value) {
return out;
}
-
+
ui64 GetOffsetEstimate(const std::deque<TDataKey>& container, TInstant timestamp, ui64 offset) {
- if (container.empty()) {
+ if (container.empty()) {
return offset;
- }
- auto it = std::lower_bound(container.begin(), container.end(), timestamp,
+ }
+ auto it = std::lower_bound(container.begin(), container.end(), timestamp,
[](const TDataKey& p, const TInstant timestamp) { return timestamp > p.Timestamp; });
- if (it == container.end()) {
+ if (it == container.end()) {
return offset;
- } else {
- return it->Key.GetOffset();
- }
-}
-
+ } else {
+ return it->Key.GetOffset();
+ }
+}
+
struct TMirrorerInfo {
TMirrorerInfo(const TActorId& actor, const TTabletCountersBase& baseline)
: Actor(actor)
@@ -686,7 +686,7 @@ void TPartition::HandleMonitoring(TEvPQ::TEvMonRequest::TPtr& ev, const TActorCo
TABLEH() {out << "ReadOffset";}
TABLEH() {out << "ReadWriteTimestamp";}
TABLEH() {out << "ReadCreateTimestamp";}
- TABLEH() {out << "ReadOffsetRewindSum";}
+ TABLEH() {out << "ReadOffsetRewindSum";}
TABLEH() {out << "ActiveReads";}
TABLEH() {out << "Subscriptions";}
}
@@ -703,7 +703,7 @@ void TPartition::HandleMonitoring(TEvPQ::TEvMonRequest::TPtr& ev, const TActorCo
TABLED() {out << (d.second.GetReadOffset());}
TABLED() {out << ToStringLocalTimeUpToSeconds(d.second.GetReadWriteTimestamp());}
TABLED() {out << ToStringLocalTimeUpToSeconds(d.second.GetReadCreateTimestamp());}
- TABLED() {out << (d.second.ReadOffsetRewindSum);}
+ TABLED() {out << (d.second.ReadOffsetRewindSum);}
TABLED() {out << d.second.ActiveReads;}
TABLED() {out << d.second.Subscriptions;}
}
@@ -1400,7 +1400,7 @@ void TPartition::HandleInfoRangeRead(const NKikimrClient::TKeyValueResponse::TRe
SourceIdStorage.LoadSourceIdInfo(*key, pair.GetValue(), ctx.Now());
} else if ((*key)[TKeyPrefix::MarkPosition()] == TKeyPrefix::MarkUser) {
UsersInfoStorage.Parse(*key, pair.GetValue(), ctx);
- } else if ((*key)[TKeyPrefix::MarkPosition()] == TKeyPrefix::MarkUserDeprecated) {
+ } else if ((*key)[TKeyPrefix::MarkPosition()] == TKeyPrefix::MarkUserDeprecated) {
UsersInfoStorage.ParseDeprecated(*key, pair.GetValue(), ctx);
}
}
@@ -2633,29 +2633,29 @@ void TPartition::DoRead(TEvPQ::TEvRead::TPtr ev, TDuration waitQuotaTime, const
const TString& user = read->ClientId;
auto& userInfo = UsersInfoStorage.GetOrCreate(user, ctx);
- ui64 offset = read->Offset;
+ ui64 offset = read->Offset;
if (read->MaxTimeLagMs > 0 || read->ReadTimestampMs > 0 || userInfo.ReadFromTimestamp > TInstant::MilliSeconds(1)) {
TInstant timestamp = read->MaxTimeLagMs > 0 ? ctx.Now() - TDuration::MilliSeconds(read->MaxTimeLagMs) : TInstant::Zero();
timestamp = Max(timestamp, TInstant::MilliSeconds(read->ReadTimestampMs));
timestamp = Max(timestamp, userInfo.ReadFromTimestamp);
offset = Max(GetOffsetEstimate(DataKeysBody, timestamp, Min(Head.Offset, EndOffset - 1)), offset);
userInfo.ReadOffsetRewindSum += offset - read->Offset;
- }
+ }
TReadInfo info(user, read->ClientDC, offset, read->PartNo, read->Count, read->Size, read->Cookie, read->ReadTimestampMs, waitQuotaTime);
ui64 cookie = Cookie++;
LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "read cookie " << cookie << " Topic '" << TopicName << "' partition " << Partition << " user " << user
- << " offset " << read->Offset << " count " << read->Count << " size " << read->Size << " endOffset " << EndOffset
- << " max time lag " << read->MaxTimeLagMs << "ms effective offset " << offset);
+ << " offset " << read->Offset << " count " << read->Count << " size " << read->Size << " endOffset " << EndOffset
+ << " max time lag " << read->MaxTimeLagMs << "ms effective offset " << offset);
- if (offset == EndOffset) {
+ if (offset == EndOffset) {
if (read->Timeout > 30000) {
LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "too big read timeout " << " Topic '" << TopicName << "' partition " << Partition << " user " << read->ClientId
- << " offset " << read->Offset << " count " << read->Count << " size " << read->Size << " endOffset " << EndOffset
- << " max time lag " << read->MaxTimeLagMs << "ms effective offset " << offset);
+ << " offset " << read->Offset << " count " << read->Count << " size " << read->Size << " endOffset " << EndOffset
+ << " max time lag " << read->MaxTimeLagMs << "ms effective offset " << offset);
read->Timeout = 30000;
}
Subscriber.AddSubscription(std::move(info), read->Timeout, cookie, ctx);
@@ -2665,7 +2665,7 @@ void TPartition::DoRead(TEvPQ::TEvRead::TPtr ev, TDuration waitQuotaTime, const
return;
}
- Y_VERIFY(offset < EndOffset);
+ Y_VERIFY(offset < EndOffset);
ProcessRead(ctx, std::move(info), cookie, false);
}
@@ -2708,7 +2708,7 @@ void TPartition::AnswerCurrentWrites(const TActorContext& ctx)
if (it != SourceIdStorage.GetInMemorySourceIds().end()) {
maxSeqNo = it->second.SeqNo;
maxOffset = it->second.Offset;
- if (it->second.SeqNo >= seqNo && !writeResponse.Msg.DisableDeduplication) {
+ if (it->second.SeqNo >= seqNo && !writeResponse.Msg.DisableDeduplication) {
already = true;
}
}
@@ -3172,14 +3172,14 @@ void TPartition::ReportLabeledCounters(const TActorContext& ctx)
ui64 readOffsetRewindSum = userInfo.ReadOffsetRewindSum;
if (readOffsetRewindSum != userInfo.LabeledCounters.GetCounters()[METRIC_READ_OFFSET_REWIND_SUM].Get()) {
- haveChanges = true;
+ haveChanges = true;
userInfo.LabeledCounters.GetCounters()[METRIC_READ_OFFSET_REWIND_SUM].Set(readOffsetRewindSum);
- }
+ }
if (readOffsetRewindSum != userInfo.LabeledCounters.GetCounters()[METRIC_READ_OFFSET_REWIND_TOTAL].Get()) {
haveChanges = true;
userInfo.LabeledCounters.GetCounters()[METRIC_READ_OFFSET_REWIND_TOTAL].Set(readOffsetRewindSum);
}
-
+
ui32 id = METRIC_TOTAL_READ_SPEED_1;
for (ui32 i = 0; i < userInfo.AvgReadBytes.size(); ++i) {
ui64 avg = userInfo.AvgReadBytes[i].GetValue();
@@ -3893,7 +3893,7 @@ void TPartition::WriteClientInfo(const ui64 cookie, TUserInfo& userInfo, const T
continue;*/
}
- TBuffer idata;
+ TBuffer idata;
{
NKikimrPQ::TUserInfo userData;
userData.SetOffset(offset);
@@ -3904,11 +3904,11 @@ void TPartition::WriteClientInfo(const ui64 cookie, TUserInfo& userInfo, const T
userData.SetReadRuleGeneration(readRuleGeneration);
TString out;
Y_PROTOBUF_SUPPRESS_NODISCARD userData.SerializeToString(&out);
-
+
idata.Append(out.c_str(), out.size());
}
TBuffer idataDeprecated = NDeprecatedUserData::Serialize(offset, gen, step, session);
-
+
auto write = request->Record.AddCmdWrite();
write->SetKey(ikey.Data(), ikey.Size());
write->SetValue(idata.Data(), idata.Size());
@@ -3917,7 +3917,7 @@ void TPartition::WriteClientInfo(const ui64 cookie, TUserInfo& userInfo, const T
write2->SetKey(ikeyDeprecated.Data(), ikeyDeprecated.Size());
write2->SetValue(idataDeprecated.Data(), idataDeprecated.Size());
write2->SetStorageChannel(NKikimrClient::TKeyValueRequest::INLINE);
-
+
request->Record.SetCookie(cookie);
ctx.Send(Tablet, request.Release());
@@ -3945,7 +3945,7 @@ void TPartition::ClearOldHead(const ui64 offset, const ui16 partNo, TEvKeyValue:
void TPartition::CancelAllWritesOnWrite(const TActorContext& ctx, TEvKeyValue::TEvRequest* request, const TString& errorStr, const TWriteMsg& p, TSourceIdWriter& sourceIdWriter, NPersQueue::NErrorCode::EErrorCode errorCode = NPersQueue::NErrorCode::BAD_REQUEST) {
- ReplyError(ctx, p.Cookie, errorCode, errorStr);
+ ReplyError(ctx, p.Cookie, errorCode, errorStr);
Counters.Cumulative()[COUNTER_PQ_WRITE_ERROR].Increment(1);
Counters.Cumulative()[COUNTER_PQ_WRITE_BYTES_ERROR].Increment(p.Msg.Data.size() + p.Msg.SourceId.size());
FailBadClient(ctx);
diff --git a/ydb/core/persqueue/partition.h b/ydb/core/persqueue/partition.h
index 2eec26f2516..d5ddcaa6c10 100644
--- a/ydb/core/persqueue/partition.h
+++ b/ydb/core/persqueue/partition.h
@@ -32,15 +32,15 @@ static const ui32 MAX_BLOB_PART_SIZE = 500 << 10; //500Kb
typedef TProtobufTabletLabeledCounters<EPartitionLabeledCounters_descriptor> TPartitionLabeledCounters;
-struct TDataKey {
- TKey Key;
- ui32 Size;
+struct TDataKey {
+ TKey Key;
+ ui32 Size;
TInstant Timestamp;
- ui64 CumulativeSize;
-};
-
+ ui64 CumulativeSize;
+};
+
ui64 GetOffsetEstimate(const std::deque<TDataKey>& container, TInstant timestamp, ui64 headOffset);
-
+
struct TMirrorerInfo;
class TPartition : public TActorBootstrapped<TPartition> {
diff --git a/ydb/core/persqueue/pq_impl.cpp b/ydb/core/persqueue/pq_impl.cpp
index 9937dbe561c..cf592028b22 100644
--- a/ydb/core/persqueue/pq_impl.cpp
+++ b/ydb/core/persqueue/pq_impl.cpp
@@ -1499,7 +1499,7 @@ void TPersQueue::HandleWriteRequest(const ui64 responseCookie, const TActorId& p
ui32 mSize = MAX_BLOB_PART_SIZE - cmd.GetSourceId().size() - sizeof(ui32) - TClientBlob::OVERHEAD; //megaqc - remove this
Y_VERIFY(mSize > 204800);
ui64 receiveTimestampMs = TAppData::TimeProvider->Now().MilliSeconds();
- bool disableDeduplication = cmd.GetDisableDeduplication();
+ bool disableDeduplication = cmd.GetDisableDeduplication();
if (cmd.GetData().size() > mSize) {
if (cmd.HasPartNo()) {
ReplyError(ctx, responseCookie, NPersQueue::NErrorCode::BAD_REQUEST,
@@ -1650,7 +1650,7 @@ void TPersQueue::HandleReadRequest(const ui64 responseCookie, const TActorId& pa
} else if (cmd.HasPartNo() && cmd.GetPartNo() < 0) {
ReplyError(ctx, responseCookie, NPersQueue::NErrorCode::BAD_REQUEST,
TStringBuilder() << "invalid partNo in read request: " << ToString(req).data());
- } else if (cmd.HasMaxTimeLagMs() && cmd.GetMaxTimeLagMs() < 0) {
+ } else if (cmd.HasMaxTimeLagMs() && cmd.GetMaxTimeLagMs() < 0) {
ReplyError(ctx, responseCookie, NPersQueue::NErrorCode::BAD_REQUEST,
TStringBuilder() << "invalid maxTimeLagMs in read request: " << ToString(req).data());
} else {
diff --git a/ydb/core/persqueue/pq_ut.cpp b/ydb/core/persqueue/pq_ut.cpp
index 13a61229105..2f073a41f41 100644
--- a/ydb/core/persqueue/pq_ut.cpp
+++ b/ydb/core/persqueue/pq_ut.cpp
@@ -138,53 +138,53 @@ Y_UNIT_TEST(TestGroupsBalancer3) {
Y_UNIT_TEST(TestUserInfoCompatibility) {
- TTestContext tc;
- RunTestWithReboots(tc.TabletIds, [&]() {
- return tc.InitialEventsFilter.Prepare();
- }, [&](const TString& dispatchName, std::function<void(TTestActorRuntime&)> setup, bool& activeZone) {
- TFinalizer finalizer(tc);
- tc.Prepare(dispatchName, setup, activeZone);
- activeZone = false;
- TString client = "test";
+ TTestContext tc;
+ RunTestWithReboots(tc.TabletIds, [&]() {
+ return tc.InitialEventsFilter.Prepare();
+ }, [&](const TString& dispatchName, std::function<void(TTestActorRuntime&)> setup, bool& activeZone) {
+ TFinalizer finalizer(tc);
+ tc.Prepare(dispatchName, setup, activeZone);
+ activeZone = false;
+ TString client = "test";
tc.Runtime->SetLogPriority(NKikimrServices::PERSQUEUE, NLog::PRI_DEBUG);
-
+
PQTabletPrepare(20000000, 100 * 1024 * 1024, 0, {{client, false}}, tc, 4, 6*1024*1024, true, 0, 0, 1);
- TVector<std::pair<ui64, TString>> data;
- data.push_back({1, "s"});
- data.push_back({2, "q"});
- CmdWrite(0, "sourceid", data, tc);
- CmdWrite(1, "sourceid", data, tc);
- CmdWrite(2, "sourceid", data, tc);
+ TVector<std::pair<ui64, TString>> data;
+ data.push_back({1, "s"});
+ data.push_back({2, "q"});
+ CmdWrite(0, "sourceid", data, tc);
+ CmdWrite(1, "sourceid", data, tc);
+ CmdWrite(2, "sourceid", data, tc);
CmdWrite(3, "sourceid", data, tc);
+
-
- THolder<TEvKeyValue::TEvRequest> request(new TEvKeyValue::TEvRequest);
- FillUserInfo(request->Record.AddCmdWrite(), client, 0, 0);
- FillDeprecatedUserInfo(request->Record.AddCmdWrite(), client, 0, 0);
- FillUserInfo(request->Record.AddCmdWrite(), client, 1, 1);
- FillDeprecatedUserInfo(request->Record.AddCmdWrite(), client, 2, 1);
+ THolder<TEvKeyValue::TEvRequest> request(new TEvKeyValue::TEvRequest);
+ FillUserInfo(request->Record.AddCmdWrite(), client, 0, 0);
+ FillDeprecatedUserInfo(request->Record.AddCmdWrite(), client, 0, 0);
+ FillUserInfo(request->Record.AddCmdWrite(), client, 1, 1);
+ FillDeprecatedUserInfo(request->Record.AddCmdWrite(), client, 2, 1);
FillUserInfo(request->Record.AddCmdWrite(), client, 2, 1);
FillDeprecatedUserInfo(request->Record.AddCmdWrite(), client, 3, 0);
FillUserInfo(request->Record.AddCmdWrite(), client, 3, 1);
-
-
- tc.Runtime->SendToPipe(tc.TabletId, tc.Edge, request.Release(), 0, GetPipeConfigWithRetries());
- TAutoPtr<IEventHandle> handle;
- TEvKeyValue::TEvResponse* result = tc.Runtime->GrabEdgeEvent<TEvKeyValue::TEvResponse>(handle);
- Y_UNUSED(result);
-
- RestartTablet(tc);
+
+
+ tc.Runtime->SendToPipe(tc.TabletId, tc.Edge, request.Release(), 0, GetPipeConfigWithRetries());
+ TAutoPtr<IEventHandle> handle;
+ TEvKeyValue::TEvResponse* result = tc.Runtime->GrabEdgeEvent<TEvKeyValue::TEvResponse>(handle);
+ Y_UNUSED(result);
+
+ RestartTablet(tc);
Cerr << "AFTER RESTART\n";
- CmdGetOffset(0, client, 0, tc);
- CmdGetOffset(1, client, 1, tc);
- CmdGetOffset(2, client, 1, tc);
+ CmdGetOffset(0, client, 0, tc);
+ CmdGetOffset(1, client, 1, tc);
+ CmdGetOffset(2, client, 1, tc);
CmdGetOffset(3, client, 1, tc);
- });
-}
-
+ });
+}
+
Y_UNIT_TEST(TestReadRuleVersions) {
TTestContext tc;
RunTestWithReboots(tc.TabletIds, [&]() {
@@ -273,49 +273,49 @@ Y_UNIT_TEST(TestCreateBalancer) {
}
Y_UNIT_TEST(TestDescribeBalancer) {
- TTestContext tc;
- RunTestWithReboots(tc.TabletIds, [&]() {
- return tc.InitialEventsFilter.Prepare();
- }, [&](const TString& dispatchName, std::function<void(TTestActorRuntime&)> setup, bool& activeZone) {
- TFinalizer finalizer(tc);
- tc.Prepare(dispatchName, setup, activeZone);
- activeZone = false;
+ TTestContext tc;
+ RunTestWithReboots(tc.TabletIds, [&]() {
+ return tc.InitialEventsFilter.Prepare();
+ }, [&](const TString& dispatchName, std::function<void(TTestActorRuntime&)> setup, bool& activeZone) {
+ TFinalizer finalizer(tc);
+ tc.Prepare(dispatchName, setup, activeZone);
+ activeZone = false;
TFakeSchemeShardState::TPtr state{new TFakeSchemeShardState()};
ui64 ssId = 9876;
BootFakeSchemeShard(*tc.Runtime, ssId, state);
- tc.Runtime->SetScheduledLimit(50);
- tc.Runtime->SetDispatchTimeout(TDuration::MilliSeconds(100));
+ tc.Runtime->SetScheduledLimit(50);
+ tc.Runtime->SetDispatchTimeout(TDuration::MilliSeconds(100));
BalancerPrepare("topic", {{1,{1, 2}}}, ssId, tc);
- TAutoPtr<IEventHandle> handle;
- tc.Runtime->SendToPipe(tc.BalancerTabletId, tc.Edge, new TEvPersQueue::TEvDescribe(), 0, GetPipeConfigWithRetries());
- TEvPersQueue::TEvDescribeResponse* result = tc.Runtime->GrabEdgeEvent<TEvPersQueue::TEvDescribeResponse>(handle);
- UNIT_ASSERT(result);
- auto& rec = result->Record;
+ TAutoPtr<IEventHandle> handle;
+ tc.Runtime->SendToPipe(tc.BalancerTabletId, tc.Edge, new TEvPersQueue::TEvDescribe(), 0, GetPipeConfigWithRetries());
+ TEvPersQueue::TEvDescribeResponse* result = tc.Runtime->GrabEdgeEvent<TEvPersQueue::TEvDescribeResponse>(handle);
+ UNIT_ASSERT(result);
+ auto& rec = result->Record;
UNIT_ASSERT(rec.HasSchemeShardId() && rec.GetSchemeShardId() == ssId);
- RestartTablet(tc);
- tc.Runtime->SendToPipe(tc.BalancerTabletId, tc.Edge, new TEvPersQueue::TEvDescribe(), 0, GetPipeConfigWithRetries());
- result = tc.Runtime->GrabEdgeEvent<TEvPersQueue::TEvDescribeResponse>(handle);
- UNIT_ASSERT(result);
- auto& rec2 = result->Record;
+ RestartTablet(tc);
+ tc.Runtime->SendToPipe(tc.BalancerTabletId, tc.Edge, new TEvPersQueue::TEvDescribe(), 0, GetPipeConfigWithRetries());
+ result = tc.Runtime->GrabEdgeEvent<TEvPersQueue::TEvDescribeResponse>(handle);
+ UNIT_ASSERT(result);
+ auto& rec2 = result->Record;
UNIT_ASSERT(rec2.HasSchemeShardId() && rec2.GetSchemeShardId() == ssId);
- });
-}
+ });
+}
Y_UNIT_TEST(TestCheckACL) {
- TTestContext tc;
- RunTestWithReboots(tc.TabletIds, [&]() {
- return tc.InitialEventsFilter.Prepare();
- }, [&](const TString& dispatchName, std::function<void(TTestActorRuntime&)> setup, bool& activeZone) {
- TFinalizer finalizer(tc);
- tc.Prepare(dispatchName, setup, activeZone);
- activeZone = false;
- TFakeSchemeShardState::TPtr state{new TFakeSchemeShardState()};
- ui64 ssId = 9876;
- BootFakeSchemeShard(*tc.Runtime, ssId, state);
+ TTestContext tc;
+ RunTestWithReboots(tc.TabletIds, [&]() {
+ return tc.InitialEventsFilter.Prepare();
+ }, [&](const TString& dispatchName, std::function<void(TTestActorRuntime&)> setup, bool& activeZone) {
+ TFinalizer finalizer(tc);
+ tc.Prepare(dispatchName, setup, activeZone);
+ activeZone = false;
+ TFakeSchemeShardState::TPtr state{new TFakeSchemeShardState()};
+ ui64 ssId = 9876;
+ BootFakeSchemeShard(*tc.Runtime, ssId, state);
IActor* ticketParser = NKikimr::CreateTicketParser(tc.Runtime->GetAppData().AuthConfig);
TActorId ticketParserId = tc.Runtime->Register(ticketParser);
- tc.Runtime->RegisterService(NKikimr::MakeTicketParserID(), ticketParserId);
+ tc.Runtime->RegisterService(NKikimr::MakeTicketParserID(), ticketParserId);
TAutoPtr<IEventHandle> handle;
THolder<TEvPersQueue::TEvCheckACL> request(new TEvPersQueue::TEvCheckACL());
@@ -326,57 +326,57 @@ Y_UNIT_TEST(TestCheckACL) {
tc.Runtime->SendToPipe(tc.BalancerTabletId, tc.Edge, request.Release(), 0, GetPipeConfigWithRetries());
tc.Runtime->SetScheduledLimit(600);
- tc.Runtime->SetDispatchTimeout(TDuration::MilliSeconds(100));
+ tc.Runtime->SetDispatchTimeout(TDuration::MilliSeconds(100));
BalancerPrepare("topic", {{1,{1, 2}}}, ssId, tc);
-
- {
- TDispatchOptions options;
+
+ {
+ TDispatchOptions options;
options.FinalEvents.emplace_back(NSchemeShard::TEvSchemeShard::EvDescribeSchemeResult);
- tc.Runtime->DispatchEvents(options);
- }
-
- TEvPersQueue::TEvCheckACLResponse* result = tc.Runtime->GrabEdgeEvent<TEvPersQueue::TEvCheckACLResponse>(handle);
- auto& rec = result->Record;
- UNIT_ASSERT(rec.GetAccess() == NKikimrPQ::EAccess::DENIED);
- UNIT_ASSERT(rec.GetTopic() == "topic");
-
- state->ACL.AddAccess(NACLib::EAccessType::Allow, NACLib::SelectRow, "client@" BUILTIN_ACL_DOMAIN);
-
- {
- TDispatchOptions options;
+ tc.Runtime->DispatchEvents(options);
+ }
+
+ TEvPersQueue::TEvCheckACLResponse* result = tc.Runtime->GrabEdgeEvent<TEvPersQueue::TEvCheckACLResponse>(handle);
+ auto& rec = result->Record;
+ UNIT_ASSERT(rec.GetAccess() == NKikimrPQ::EAccess::DENIED);
+ UNIT_ASSERT(rec.GetTopic() == "topic");
+
+ state->ACL.AddAccess(NACLib::EAccessType::Allow, NACLib::SelectRow, "client@" BUILTIN_ACL_DOMAIN);
+
+ {
+ TDispatchOptions options;
options.FinalEvents.emplace_back(NSchemeShard::TEvSchemeShard::EvDescribeSchemeResult);
- tc.Runtime->DispatchEvents(options);
- }
-
- request.Reset(new TEvPersQueue::TEvCheckACL());
+ tc.Runtime->DispatchEvents(options);
+ }
+
+ request.Reset(new TEvPersQueue::TEvCheckACL());
request->Record.SetToken(NACLib::TUserToken("client@" BUILTIN_ACL_DOMAIN, {}).SerializeAsString());
request->Record.SetUser("client");
- request->Record.SetOperation(NKikimrPQ::EOperation::READ_OP);
-
- tc.Runtime->SendToPipe(tc.BalancerTabletId, tc.Edge, request.Release(), 0, GetPipeConfigWithRetries());
- result = tc.Runtime->GrabEdgeEvent<TEvPersQueue::TEvCheckACLResponse>(handle);
- auto& rec2 = result->Record;
+ request->Record.SetOperation(NKikimrPQ::EOperation::READ_OP);
+
+ tc.Runtime->SendToPipe(tc.BalancerTabletId, tc.Edge, request.Release(), 0, GetPipeConfigWithRetries());
+ result = tc.Runtime->GrabEdgeEvent<TEvPersQueue::TEvCheckACLResponse>(handle);
+ auto& rec2 = result->Record;
UNIT_ASSERT_C(rec2.GetAccess() == NKikimrPQ::EAccess::ALLOWED, rec2);
-
- state->ACL.AddAccess(NACLib::EAccessType::Allow, NACLib::UpdateRow, "client@" BUILTIN_ACL_DOMAIN);
-
- {
- TDispatchOptions options;
+
+ state->ACL.AddAccess(NACLib::EAccessType::Allow, NACLib::UpdateRow, "client@" BUILTIN_ACL_DOMAIN);
+
+ {
+ TDispatchOptions options;
options.FinalEvents.emplace_back(NSchemeShard::TEvSchemeShard::EvDescribeSchemeResult);
- tc.Runtime->DispatchEvents(options);
- }
-
- request.Reset(new TEvPersQueue::TEvCheckACL());
+ tc.Runtime->DispatchEvents(options);
+ }
+
+ request.Reset(new TEvPersQueue::TEvCheckACL());
request->Record.SetToken(NACLib::TUserToken("client@" BUILTIN_ACL_DOMAIN, {}).SerializeAsString());
request->Record.SetUser("client");
- request->Record.SetOperation(NKikimrPQ::EOperation::WRITE_OP);
-
- tc.Runtime->SendToPipe(tc.BalancerTabletId, tc.Edge, request.Release(), 0, GetPipeConfigWithRetries());
- result = tc.Runtime->GrabEdgeEvent<TEvPersQueue::TEvCheckACLResponse>(handle);
- auto& rec3 = result->Record;
- UNIT_ASSERT(rec3.GetAccess() == NKikimrPQ::EAccess::ALLOWED);
-
- request.Reset(new TEvPersQueue::TEvCheckACL());
+ request->Record.SetOperation(NKikimrPQ::EOperation::WRITE_OP);
+
+ tc.Runtime->SendToPipe(tc.BalancerTabletId, tc.Edge, request.Release(), 0, GetPipeConfigWithRetries());
+ result = tc.Runtime->GrabEdgeEvent<TEvPersQueue::TEvCheckACLResponse>(handle);
+ auto& rec3 = result->Record;
+ UNIT_ASSERT(rec3.GetAccess() == NKikimrPQ::EAccess::ALLOWED);
+
+ request.Reset(new TEvPersQueue::TEvCheckACL());
request->Record.SetToken(NACLib::TUserToken("client@" BUILTIN_ACL_DOMAIN, {}).SerializeAsString());
request->Record.SetUser("client2");
request->Record.SetOperation(NKikimrPQ::EOperation::WRITE_OP);
@@ -387,47 +387,47 @@ Y_UNIT_TEST(TestCheckACL) {
UNIT_ASSERT(rec9.GetAccess() == NKikimrPQ::EAccess::ALLOWED);
request.Reset(new TEvPersQueue::TEvCheckACL());
- // No auth provided and auth for topic not required
- request->Record.SetOperation(NKikimrPQ::EOperation::WRITE_OP);
-
- tc.Runtime->SendToPipe(tc.BalancerTabletId, tc.Edge, request.Release(), 0, GetPipeConfigWithRetries());
- result = tc.Runtime->GrabEdgeEvent<TEvPersQueue::TEvCheckACLResponse>(handle);
- auto& rec5 = result->Record;
- UNIT_ASSERT(rec5.GetAccess() == NKikimrPQ::EAccess::ALLOWED);
-
- request.Reset(new TEvPersQueue::TEvCheckACL());
- // No auth provided and auth for topic not required
- request->Record.SetOperation(NKikimrPQ::EOperation::READ_OP);
-
- tc.Runtime->SendToPipe(tc.BalancerTabletId, tc.Edge, request.Release(), 0, GetPipeConfigWithRetries());
- result = tc.Runtime->GrabEdgeEvent<TEvPersQueue::TEvCheckACLResponse>(handle);
- auto& rec6 = result->Record;
- UNIT_ASSERT(rec6.GetAccess() == NKikimrPQ::EAccess::ALLOWED);
-
- request.Reset(new TEvPersQueue::TEvCheckACL());
- // No auth provided and auth for topic is required
- request->Record.SetOperation(NKikimrPQ::EOperation::READ_OP);
+ // No auth provided and auth for topic not required
+ request->Record.SetOperation(NKikimrPQ::EOperation::WRITE_OP);
+
+ tc.Runtime->SendToPipe(tc.BalancerTabletId, tc.Edge, request.Release(), 0, GetPipeConfigWithRetries());
+ result = tc.Runtime->GrabEdgeEvent<TEvPersQueue::TEvCheckACLResponse>(handle);
+ auto& rec5 = result->Record;
+ UNIT_ASSERT(rec5.GetAccess() == NKikimrPQ::EAccess::ALLOWED);
+
+ request.Reset(new TEvPersQueue::TEvCheckACL());
+ // No auth provided and auth for topic not required
+ request->Record.SetOperation(NKikimrPQ::EOperation::READ_OP);
+
+ tc.Runtime->SendToPipe(tc.BalancerTabletId, tc.Edge, request.Release(), 0, GetPipeConfigWithRetries());
+ result = tc.Runtime->GrabEdgeEvent<TEvPersQueue::TEvCheckACLResponse>(handle);
+ auto& rec6 = result->Record;
+ UNIT_ASSERT(rec6.GetAccess() == NKikimrPQ::EAccess::ALLOWED);
+
+ request.Reset(new TEvPersQueue::TEvCheckACL());
+ // No auth provided and auth for topic is required
+ request->Record.SetOperation(NKikimrPQ::EOperation::READ_OP);
request->Record.SetToken("");
-
+
BalancerPrepare("topic", {{1,{1, 2}}}, ssId, tc, true);
- tc.Runtime->SendToPipe(tc.BalancerTabletId, tc.Edge, request.Release(), 0, GetPipeConfigWithRetries());
- result = tc.Runtime->GrabEdgeEvent<TEvPersQueue::TEvCheckACLResponse>(handle);
- auto& rec7 = result->Record;
- UNIT_ASSERT(rec7.GetAccess() == NKikimrPQ::EAccess::DENIED);
-
- request.Reset(new TEvPersQueue::TEvCheckACL());
- // No auth provided and auth for topic is required
- request->Record.SetOperation(NKikimrPQ::EOperation::READ_OP);
+ tc.Runtime->SendToPipe(tc.BalancerTabletId, tc.Edge, request.Release(), 0, GetPipeConfigWithRetries());
+ result = tc.Runtime->GrabEdgeEvent<TEvPersQueue::TEvCheckACLResponse>(handle);
+ auto& rec7 = result->Record;
+ UNIT_ASSERT(rec7.GetAccess() == NKikimrPQ::EAccess::DENIED);
+
+ request.Reset(new TEvPersQueue::TEvCheckACL());
+ // No auth provided and auth for topic is required
+ request->Record.SetOperation(NKikimrPQ::EOperation::READ_OP);
request->Record.SetToken("");
-
- tc.Runtime->SendToPipe(tc.BalancerTabletId, tc.Edge, request.Release(), 0, GetPipeConfigWithRetries());
- result = tc.Runtime->GrabEdgeEvent<TEvPersQueue::TEvCheckACLResponse>(handle);
- auto& rec8 = result->Record;
- UNIT_ASSERT(rec8.GetAccess() == NKikimrPQ::EAccess::DENIED);
- });
-}
-
-
+
+ tc.Runtime->SendToPipe(tc.BalancerTabletId, tc.Edge, request.Release(), 0, GetPipeConfigWithRetries());
+ result = tc.Runtime->GrabEdgeEvent<TEvPersQueue::TEvCheckACLResponse>(handle);
+ auto& rec8 = result->Record;
+ UNIT_ASSERT(rec8.GetAccess() == NKikimrPQ::EAccess::DENIED);
+ });
+}
+
+
void CheckLabeledCountersResponse(ui32 count, TTestContext& tc, TVector<TString> mustHave = {})
{
IActor* actor = CreateClusterLabeledCountersAggregatorActor(tc.Edge, TTabletTypes::PERSQUEUE);
@@ -980,28 +980,28 @@ Y_UNIT_TEST(TestAlreadyWritten) {
Y_UNIT_TEST(TestAlreadyWrittenWithoutDeduplication) {
- TTestContext tc;
- RunTestWithReboots(tc.TabletIds, [&]() {
- return tc.InitialEventsFilter.Prepare();
- }, [&](const TString& dispatchName, std::function<void(TTestActorRuntime&)> setup, bool& activeZone) {
- TFinalizer finalizer(tc);
- tc.Prepare(dispatchName, setup, activeZone);
- activeZone = false;
- tc.Runtime->SetScheduledLimit(200);
-
+ TTestContext tc;
+ RunTestWithReboots(tc.TabletIds, [&]() {
+ return tc.InitialEventsFilter.Prepare();
+ }, [&](const TString& dispatchName, std::function<void(TTestActorRuntime&)> setup, bool& activeZone) {
+ TFinalizer finalizer(tc);
+ tc.Prepare(dispatchName, setup, activeZone);
+ activeZone = false;
+ tc.Runtime->SetScheduledLimit(200);
+
PQTabletPrepare(20000000, 100 * 1024 * 1024, 0, {}, tc); //no important clients, lifetimeseconds=0 - delete all right now, except last datablob
- TVector<std::pair<ui64, TString>> data;
- activeZone = true;
-
- TString s{32, 'c'};
- ui32 pp = 4 + 8 + 1 + 9;
- data.push_back({2, s.substr(pp)});
- CmdWrite(0, "sourceid0", data, tc, false, {}, false, "", -1, 0, false, false, true);
- data[0].first = 1;
- CmdWrite(0, "sourceid0", data, tc, false, {}, false, "", -1, 1, false, false, true);
- CmdRead(0, 0, Max<i32>(), Max<i32>(), 2, false, tc, {0, 1});
- });
-}
+ TVector<std::pair<ui64, TString>> data;
+ activeZone = true;
+
+ TString s{32, 'c'};
+ ui32 pp = 4 + 8 + 1 + 9;
+ data.push_back({2, s.substr(pp)});
+ CmdWrite(0, "sourceid0", data, tc, false, {}, false, "", -1, 0, false, false, true);
+ data[0].first = 1;
+ CmdWrite(0, "sourceid0", data, tc, false, {}, false, "", -1, 1, false, false, true);
+ CmdRead(0, 0, Max<i32>(), Max<i32>(), 2, false, tc, {0, 1});
+ });
+}
Y_UNIT_TEST(TestWritePQCompact) {
@@ -1904,14 +1904,14 @@ Y_UNIT_TEST(TestPQCacheSizeManagement) {
}
Y_UNIT_TEST(TestOffsetEstimation) {
- std::deque<NPQ::TDataKey> container = {
+ std::deque<NPQ::TDataKey> container = {
{NPQ::TKey(NPQ::TKeyPrefix::EType::TypeNone, 0, 1, 0, 0, 0), 0, TInstant::Seconds(1), 10},
{NPQ::TKey(NPQ::TKeyPrefix::EType::TypeNone, 0, 2, 0, 0, 0), 0, TInstant::Seconds(1), 10},
{NPQ::TKey(NPQ::TKeyPrefix::EType::TypeNone, 0, 3, 0, 0, 0), 0, TInstant::Seconds(2), 10},
{NPQ::TKey(NPQ::TKeyPrefix::EType::TypeNone, 0, 4, 0, 0, 0), 0, TInstant::Seconds(2), 10},
{NPQ::TKey(NPQ::TKeyPrefix::EType::TypeNone, 0, 5, 0, 0, 0), 0, TInstant::Seconds(3), 10},
{NPQ::TKey(NPQ::TKeyPrefix::EType::TypeNone, 0, 6, 0, 0, 0), 0, TInstant::Seconds(3), 10},
- };
+ };
UNIT_ASSERT_EQUAL(NPQ::GetOffsetEstimate({}, TInstant::MilliSeconds(0), 9999), 9999);
UNIT_ASSERT_EQUAL(NPQ::GetOffsetEstimate(container, TInstant::MilliSeconds(0), 9999), 1);
UNIT_ASSERT_EQUAL(NPQ::GetOffsetEstimate(container, TInstant::MilliSeconds(500), 9999), 1);
@@ -1921,35 +1921,35 @@ Y_UNIT_TEST(TestOffsetEstimation) {
UNIT_ASSERT_EQUAL(NPQ::GetOffsetEstimate(container, TInstant::MilliSeconds(2500), 9999), 5);
UNIT_ASSERT_EQUAL(NPQ::GetOffsetEstimate(container, TInstant::MilliSeconds(3000), 9999), 5);
UNIT_ASSERT_EQUAL(NPQ::GetOffsetEstimate(container, TInstant::MilliSeconds(3500), 9999), 9999);
-}
-
+}
+
Y_UNIT_TEST(TestMaxTimeLagRewind) {
- TTestContext tc;
-
- RunTestWithReboots(tc.TabletIds, [&]() {
- return tc.InitialEventsFilter.Prepare();
- }, [&](const TString& dispatchName, std::function<void(TTestActorRuntime&)> setup, bool& activeZone) {
- TFinalizer finalizer(tc);
- tc.Prepare(dispatchName, setup, activeZone);
-
- tc.Runtime->SetScheduledLimit(200);
-
+ TTestContext tc;
+
+ RunTestWithReboots(tc.TabletIds, [&]() {
+ return tc.InitialEventsFilter.Prepare();
+ }, [&](const TString& dispatchName, std::function<void(TTestActorRuntime&)> setup, bool& activeZone) {
+ TFinalizer finalizer(tc);
+ tc.Prepare(dispatchName, setup, activeZone);
+
+ tc.Runtime->SetScheduledLimit(200);
+
PQTabletPrepare(20000000, 100 * 1024 * 1024, 0, {{"aaa", true}}, tc);
- activeZone = false;
-
-
- for (int i = 0; i < 5; i++) {
- TVector<std::pair<ui64, TString>> data;
- for (int j = 0; j < 7; j++) {
- data.push_back({7 * i + j + 1, TString(1 * 1024 * 1024, 'a')});
- }
- CmdWrite(0, "sourceid0", data, tc, false, {}, i == 0);
- tc.Runtime->UpdateCurrentTime(tc.Runtime->GetCurrentTime() + TDuration::Minutes(1));
- }
+ activeZone = false;
+
+
+ for (int i = 0; i < 5; i++) {
+ TVector<std::pair<ui64, TString>> data;
+ for (int j = 0; j < 7; j++) {
+ data.push_back({7 * i + j + 1, TString(1 * 1024 * 1024, 'a')});
+ }
+ CmdWrite(0, "sourceid0", data, tc, false, {}, i == 0);
+ tc.Runtime->UpdateCurrentTime(tc.Runtime->GetCurrentTime() + TDuration::Minutes(1));
+ }
auto ts = tc.Runtime->GetCurrentTime();
- CmdRead(0, 0, 1, Max<i32>(), 1, false, tc, {0});
- CmdRead(0, 0, 1, Max<i32>(), 1, false, tc, {21}, 3 * 60 * 1000);
- CmdRead(0, 22, 1, Max<i32>(), 1, false, tc, {22}, 3 * 60 * 1000);
+ CmdRead(0, 0, 1, Max<i32>(), 1, false, tc, {0});
+ CmdRead(0, 0, 1, Max<i32>(), 1, false, tc, {21}, 3 * 60 * 1000);
+ CmdRead(0, 22, 1, Max<i32>(), 1, false, tc, {22}, 3 * 60 * 1000);
CmdRead(0, 4, 1, Max<i32>(), 1, false, tc, {34}, 1000);
CmdRead(0, 0, 1, Max<i32>(), 1, false, tc, {21}, 0, ts.MilliSeconds() - 3 * 60 * 1000);
@@ -1959,9 +1959,9 @@ Y_UNIT_TEST(TestMaxTimeLagRewind) {
PQTabletPrepare(20000000, 100 * 1024 * 1024, 0, {{"aaa", true}}, tc, 2, 6*1024*1024, true, ts.MilliSeconds() - 1000);
CmdRead(0, 0, 1, Max<i32>(), 1, false, tc, {34});
- });
-}
-
+ });
+}
+
Y_UNIT_TEST(TestWriteTimeStampEstimate) {
TTestContext tc;
diff --git a/ydb/core/persqueue/read_balancer.cpp b/ydb/core/persqueue/read_balancer.cpp
index 4eb9c7206dd..629ad8a1755 100644
--- a/ydb/core/persqueue/read_balancer.cpp
+++ b/ydb/core/persqueue/read_balancer.cpp
@@ -12,7 +12,7 @@ using namespace NTabletFlatExecutor;
static constexpr TDuration ACL_SUCCESS_RETRY_TIMEOUT = TDuration::Seconds(30);
static constexpr TDuration ACL_ERROR_RETRY_TIMEOUT = TDuration::Seconds(5);
static constexpr TDuration ACL_EXPIRATION_TIMEOUT = TDuration::Minutes(5);
-
+
bool TPersQueueReadBalancer::TTxPreInit::Execute(TTransactionContext& txc, const TActorContext& ctx) {
Y_UNUSED(ctx);
NIceDb::TNiceDb(txc.DB).Materialize<Schema>();
@@ -44,7 +44,7 @@ bool TPersQueueReadBalancer::TTxInit::Execute(TTransactionContext& txc, const TA
Self->Path = dataRowset.GetValue<Schema::Data::Path>();
Self->Version = dataRowset.GetValue<Schema::Data::Version>();
Self->MaxPartsPerTablet = dataRowset.GetValueOrDefault<Schema::Data::MaxPartsPerTablet>(0);
- Self->SchemeShardId = dataRowset.GetValueOrDefault<Schema::Data::SchemeShardId>(0);
+ Self->SchemeShardId = dataRowset.GetValueOrDefault<Schema::Data::SchemeShardId>(0);
Self->NextPartitionId = dataRowset.GetValueOrDefault<Schema::Data::NextPartitionId>(0);
TString config = dataRowset.GetValueOrDefault<Schema::Data::Config>("");
@@ -132,7 +132,7 @@ bool TPersQueueReadBalancer::TTxWrite::Execute(TTransactionContext& txc, const T
NIceDb::TUpdate<Schema::Data::Path>(Self->Path),
NIceDb::TUpdate<Schema::Data::Version>(Self->Version),
NIceDb::TUpdate<Schema::Data::MaxPartsPerTablet>(Self->MaxPartsPerTablet),
- NIceDb::TUpdate<Schema::Data::SchemeShardId>(Self->SchemeShardId),
+ NIceDb::TUpdate<Schema::Data::SchemeShardId>(Self->SchemeShardId),
NIceDb::TUpdate<Schema::Data::NextPartitionId>(Self->NextPartitionId),
NIceDb::TUpdate<Schema::Data::Config>(config));
for (auto& p : DeletedPartitions) {
@@ -312,7 +312,7 @@ void TPersQueueReadBalancer::Handle(TEvPersQueue::TEvGetPartitionIdForWrite::TPt
}
-void TPersQueueReadBalancer::Handle(TEvPersQueue::TEvCheckACL::TPtr &ev, const TActorContext &ctx) {
+void TPersQueueReadBalancer::Handle(TEvPersQueue::TEvCheckACL::TPtr &ev, const TActorContext &ctx) {
if (!AppData(ctx)->PQConfig.GetCheckACL()) {
RespondWithACL(ev, NKikimrPQ::EAccess::ALLOWED, "", ctx);
@@ -324,7 +324,7 @@ void TPersQueueReadBalancer::Handle(TEvPersQueue::TEvCheckACL::TPtr &ev, const T
return;
}
- auto& record = ev->Get()->Record;
+ auto& record = ev->Get()->Record;
if (record.GetToken().empty()) {
if (record.GetOperation() == NKikimrPQ::EOperation::WRITE_OP && TabletConfig.GetRequireAuthWrite() ||
@@ -332,40 +332,40 @@ void TPersQueueReadBalancer::Handle(TEvPersQueue::TEvCheckACL::TPtr &ev, const T
RespondWithACL(ev, NKikimrPQ::EAccess::DENIED, TStringBuilder() << "topic " << Topic << " requires authentication", ctx);
} else {
RespondWithACL(ev, NKikimrPQ::EAccess::ALLOWED, "", ctx);
- }
+ }
return;
- }
+ }
NACLib::TUserToken token(record.GetToken());
CheckACL(ev, token, ctx);
-}
-
-
-void TPersQueueReadBalancer::RespondWithACL(
- const TEvPersQueue::TEvCheckACL::TPtr &request,
- const NKikimrPQ::EAccess &access,
- const TString &error,
- const TActorContext &ctx) {
- THolder<TEvPersQueue::TEvCheckACLResponse> res{new TEvPersQueue::TEvCheckACLResponse};
- res->Record.SetTopic(Topic);
+}
+
+
+void TPersQueueReadBalancer::RespondWithACL(
+ const TEvPersQueue::TEvCheckACL::TPtr &request,
+ const NKikimrPQ::EAccess &access,
+ const TString &error,
+ const TActorContext &ctx) {
+ THolder<TEvPersQueue::TEvCheckACLResponse> res{new TEvPersQueue::TEvCheckACLResponse};
+ res->Record.SetTopic(Topic);
res->Record.SetPath(Path);
- res->Record.SetAccess(access);
- res->Record.SetError(error);
- ctx.Send(request->Sender, res.Release());
-}
-
-void TPersQueueReadBalancer::CheckACL(const TEvPersQueue::TEvCheckACL::TPtr &request, const NACLib::TUserToken& token, const TActorContext &ctx) {
- NACLib::EAccessRights rights = NACLib::EAccessRights::UpdateRow;
+ res->Record.SetAccess(access);
+ res->Record.SetError(error);
+ ctx.Send(request->Sender, res.Release());
+}
+
+void TPersQueueReadBalancer::CheckACL(const TEvPersQueue::TEvCheckACL::TPtr &request, const NACLib::TUserToken& token, const TActorContext &ctx) {
+ NACLib::EAccessRights rights = NACLib::EAccessRights::UpdateRow;
const auto& record = request->Get()->Record;
switch(record.GetOperation()) {
- case NKikimrPQ::EOperation::READ_OP:
- rights = NACLib::EAccessRights::SelectRow;
- break;
- case NKikimrPQ::EOperation::WRITE_OP:
- rights = NACLib::EAccessRights::UpdateRow;
- break;
- };
-
+ case NKikimrPQ::EOperation::READ_OP:
+ rights = NACLib::EAccessRights::SelectRow;
+ break;
+ case NKikimrPQ::EOperation::WRITE_OP:
+ rights = NACLib::EAccessRights::UpdateRow;
+ break;
+ };
+
TString user = record.HasUser() ? record.GetUser() : "";
if (record.GetOperation() == NKikimrPQ::EOperation::READ_OP) {
@@ -377,11 +377,11 @@ void TPersQueueReadBalancer::CheckACL(const TEvPersQueue::TEvCheckACL::TPtr &req
}
if (ACL.CheckAccess(rights, token)) {
RespondWithACL(request, NKikimrPQ::EAccess::ALLOWED, "", ctx);
- } else {
+ } else {
RespondWithACL(request, NKikimrPQ::EAccess::DENIED, TStringBuilder() << "access denied for consumer '" << NPersQueue::ConvertOldConsumerName(user) << "' : no " << (rights == NACLib::EAccessRights::SelectRow ? "ReadTopic" : "WriteTopic") << " permission" , ctx);
- }
-}
-
+ }
+}
+
void TPersQueueReadBalancer::Handle(TEvPersQueue::TEvWakeupClient::TPtr &ev, const TActorContext& ctx) {
auto jt = ClientsInfo.find(ev->Get()->Client);
if (jt == ClientsInfo.end())
@@ -467,7 +467,7 @@ void TPersQueueReadBalancer::Handle(TEvPersQueue::TEvUpdateBalancerConfig::TPtr
Path = record.GetPath();
TxId = record.GetTxId();
TabletConfig = record.GetTabletConfig();
- SchemeShardId = record.GetSchemeShardId();
+ SchemeShardId = record.GetSchemeShardId();
TotalGroups = record.HasTotalGroupCount() ? record.GetTotalGroupCount() : 0;
ui32 prevNextPartitionId = NextPartitionId;
NextPartitionId = record.HasNextPartitionId() ? record.GetNextPartitionId() : 0;
@@ -588,7 +588,7 @@ void TPersQueueReadBalancer::Handle(TEvTabletPipe::TEvServerDisconnected::TPtr&
void TPersQueueReadBalancer::Handle(TEvTabletPipe::TEvClientDestroyed::TPtr& ev, const TActorContext& ctx)
{
RestartPipe(ev->Get()->TabletId, ctx);
- RequestTabletIfNeeded(ev->Get()->TabletId, ctx);
+ RequestTabletIfNeeded(ev->Get()->TabletId, ctx);
}
@@ -596,7 +596,7 @@ void TPersQueueReadBalancer::Handle(TEvTabletPipe::TEvClientConnected::TPtr& ev,
{
if (ev->Get()->Status != NKikimrProto::OK) {
RestartPipe(ev->Get()->TabletId, ctx);
- RequestTabletIfNeeded(ev->Get()->TabletId, ctx);
+ RequestTabletIfNeeded(ev->Get()->TabletId, ctx);
}
}
@@ -610,9 +610,9 @@ void TPersQueueReadBalancer::RestartPipe(const ui64 tabletId, const TActorContex
}
-void TPersQueueReadBalancer::RequestTabletIfNeeded(const ui64 tabletId, const TActorContext& ctx)
+void TPersQueueReadBalancer::RequestTabletIfNeeded(const ui64 tabletId, const TActorContext& ctx)
{
- if ((tabletId == SchemeShardId && !WaitingForACL) ||
+ if ((tabletId == SchemeShardId && !WaitingForACL) ||
(tabletId != SchemeShardId && !WaitingForStat.contains(tabletId)))
return;
@@ -625,11 +625,11 @@ void TPersQueueReadBalancer::RequestTabletIfNeeded(const ui64 tabletId, const TA
} else {
pipeClient = it->second;
}
- if (tabletId == SchemeShardId) {
+ if (tabletId == SchemeShardId) {
NTabletPipe::SendData(ctx, pipeClient, new NSchemeShard::TEvSchemeShard::TEvDescribeScheme(tabletId, PathId));
- } else {
- NTabletPipe::SendData(ctx, pipeClient, new TEvPersQueue::TEvStatus());
- }
+ } else {
+ NTabletPipe::SendData(ctx, pipeClient, new TEvPersQueue::TEvStatus());
+ }
}
@@ -671,24 +671,24 @@ void TPersQueueReadBalancer::AnswerWaitingRequests(const TActorContext& ctx) {
}
void TPersQueueReadBalancer::Handle(NSchemeShard::TEvSchemeShard::TEvDescribeSchemeResult::TPtr& ev, const TActorContext& ctx) {
- Y_UNUSED(ctx);
- if (!WaitingForACL) //ignore if already processed
- return;
- WaitingForACL = false;
+ Y_UNUSED(ctx);
+ if (!WaitingForACL) //ignore if already processed
+ return;
+ WaitingForACL = false;
const auto& record = ev->Get()->GetRecord();
if (record.GetStatus() == NKikimrScheme::EStatus::StatusSuccess) {
- ACL.Clear();
+ ACL.Clear();
Y_PROTOBUF_SUPPRESS_NODISCARD ACL.MutableACL()->ParseFromString(record.GetPathDescription().GetSelf().GetEffectiveACL());
LastACLUpdate = ctx.Now();
- ctx.Schedule(TDuration::Seconds(AppData(ctx)->PQConfig.GetBalancerMetadataRetryTimeoutSec()), new TEvPersQueue::TEvUpdateACL());
+ ctx.Schedule(TDuration::Seconds(AppData(ctx)->PQConfig.GetBalancerMetadataRetryTimeoutSec()), new TEvPersQueue::TEvUpdateACL());
AnswerWaitingRequests(ctx);
- } else {
+ } else {
LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE_READ_BALANCER, GetPrefix() << "couldn't receive ACL due to " << record.GetStatus());
ctx.Schedule(ACL_ERROR_RETRY_TIMEOUT, new TEvPersQueue::TEvUpdateACL());
- }
-}
-
+ }
+}
+
void TPersQueueReadBalancer::CheckStat(const TActorContext& ctx) {
Y_UNUSED(ctx);
//TODO: Deside about changing number of partitions and send request to SchemeShard
@@ -707,21 +707,21 @@ void TPersQueueReadBalancer::GetStat(const TActorContext& ctx) {
bool res = WaitingForStat.insert(tabletId).second;
if (!res) //already asked stat
continue;
- RequestTabletIfNeeded(tabletId, ctx);
+ RequestTabletIfNeeded(tabletId, ctx);
}
}
-void TPersQueueReadBalancer::GetACL(const TActorContext& ctx) {
- if (WaitingForACL) // if there is request infly
- return;
- if (SchemeShardId == 0) {
+void TPersQueueReadBalancer::GetACL(const TActorContext& ctx) {
+ if (WaitingForACL) // if there is request infly
+ return;
+ if (SchemeShardId == 0) {
ctx.Schedule(ACL_SUCCESS_RETRY_TIMEOUT, new TEvPersQueue::TEvUpdateACL());
- } else {
- WaitingForACL = true;
- RequestTabletIfNeeded(SchemeShardId, ctx);
- }
-}
-
+ } else {
+ WaitingForACL = true;
+ RequestTabletIfNeeded(SchemeShardId, ctx);
+ }
+}
+
void TPersQueueReadBalancer::Handle(TEvTabletPipe::TEvServerConnected::TPtr& ev, const TActorContext& ctx)
{
const TActorId& sender = ev->Get()->ClientId;
diff --git a/ydb/core/persqueue/read_balancer.h b/ydb/core/persqueue/read_balancer.h
index 6a07edcb784..2e4de2bac53 100644
--- a/ydb/core/persqueue/read_balancer.h
+++ b/ydb/core/persqueue/read_balancer.h
@@ -56,7 +56,7 @@ class TPersQueueReadBalancer : public TActor<TPersQueueReadBalancer>, public TTa
struct Version : Column<36, NScheme::NTypeIds::Uint32> {};
struct Config : Column<40, NScheme::NTypeIds::Utf8> {};
struct MaxPartsPerTablet : Column<41, NScheme::NTypeIds::Uint32> {};
- struct SchemeShardId : Column<42, NScheme::NTypeIds::Uint64> {};
+ struct SchemeShardId : Column<42, NScheme::NTypeIds::Uint64> {};
struct NextPartitionId : Column<43, NScheme::NTypeIds::Uint64> {};
using TKey = TableKey<Key>;
@@ -180,9 +180,9 @@ class TPersQueueReadBalancer : public TActor<TPersQueueReadBalancer>, public TTa
ctx.Schedule(TDuration::Seconds(30), new TEvents::TEvWakeup()); //TODO: remove it
}
- void HandleUpdateACL(TEvPersQueue::TEvUpdateACL::TPtr&, const TActorContext &ctx) {
- GetACL(ctx);
- }
+ void HandleUpdateACL(TEvPersQueue::TEvUpdateACL::TPtr&, const TActorContext &ctx) {
+ GetACL(ctx);
+ }
void Die(const TActorContext& ctx) override {
for (auto& pipe : TabletPipes) {
@@ -240,7 +240,7 @@ class TPersQueueReadBalancer : public TActor<TPersQueueReadBalancer>, public TTa
RegisterEvents.clear();
ctx.Schedule(TDuration::Seconds(30), new TEvents::TEvWakeup()); //TODO: remove it
- ctx.Send(ctx.SelfID, new TEvPersQueue::TEvUpdateACL());
+ ctx.Send(ctx.SelfID, new TEvPersQueue::TEvUpdateACL());
}
bool OnRenderAppHtmlPage(NMon::TEvRemoteHttpInfo::TPtr ev, const TActorContext& ctx) override;
@@ -256,7 +256,7 @@ class TPersQueueReadBalancer : public TActor<TPersQueueReadBalancer>, public TTa
void Handle(TEvPersQueue::TEvRegisterReadSession::TPtr &ev, const TActorContext& ctx);
void Handle(TEvPersQueue::TEvGetReadSessionsInfo::TPtr &ev, const TActorContext& ctx);
- void Handle(TEvPersQueue::TEvCheckACL::TPtr&, const TActorContext&);
+ void Handle(TEvPersQueue::TEvCheckACL::TPtr&, const TActorContext&);
void Handle(TEvPersQueue::TEvGetPartitionIdForWrite::TPtr&, const TActorContext&);
void Handle(TEvTabletPipe::TEvServerConnected::TPtr& ev, const TActorContext&);
@@ -267,17 +267,17 @@ class TPersQueueReadBalancer : public TActor<TPersQueueReadBalancer>, public TTa
TStringBuilder GetPrefix() const;
- void RequestTabletIfNeeded(const ui64 tabletId, const TActorContext&);
+ void RequestTabletIfNeeded(const ui64 tabletId, const TActorContext&);
void RestartPipe(const ui64 tabletId, const TActorContext&);
void CheckStat(const TActorContext&);
- void RespondWithACL(
- const TEvPersQueue::TEvCheckACL::TPtr &request,
- const NKikimrPQ::EAccess &access,
- const TString &error,
- const TActorContext &ctx);
- void CheckACL(const TEvPersQueue::TEvCheckACL::TPtr &request, const NACLib::TUserToken& token, const TActorContext &ctx);
+ void RespondWithACL(
+ const TEvPersQueue::TEvCheckACL::TPtr &request,
+ const NKikimrPQ::EAccess &access,
+ const TString &error,
+ const TActorContext &ctx);
+ void CheckACL(const TEvPersQueue::TEvCheckACL::TPtr &request, const NACLib::TUserToken& token, const TActorContext &ctx);
void GetStat(const TActorContext&);
- void GetACL(const TActorContext&);
+ void GetACL(const TActorContext&);
void AnswerWaitingRequests(const TActorContext& ctx);
void Handle(TEvPersQueue::TEvPartitionReleased::TPtr& ev, const TActorContext& ctx);
@@ -298,10 +298,10 @@ class TPersQueueReadBalancer : public TActor<TPersQueueReadBalancer>, public TTa
ui32 Generation;
int Version;
ui32 MaxPartsPerTablet;
- ui64 SchemeShardId;
+ ui64 SchemeShardId;
NKikimrPQ::TPQTabletConfig TabletConfig;
- NACLib::TSecurityObject ACL;
- TInstant LastACLUpdate;
+ NACLib::TSecurityObject ACL;
+ TInstant LastACLUpdate;
THashSet<TString> Consumers;
@@ -419,7 +419,7 @@ class TPersQueueReadBalancer : public TActor<TPersQueueReadBalancer>, public TTa
THashMap<ui64, TActorId> TabletPipes;
THashSet<ui64> WaitingForStat;
- bool WaitingForACL;
+ bool WaitingForACL;
ui64 TotalAvgSpeedSec;
ui64 MaxAvgSpeedSec;
ui64 TotalAvgSpeedMin;
@@ -444,8 +444,8 @@ public:
, Generation(0)
, Version(-1)
, MaxPartsPerTablet(0)
- , SchemeShardId(0)
- , LastACLUpdate(TInstant::Zero())
+ , SchemeShardId(0)
+ , LastACLUpdate(TInstant::Zero())
, TxId(0)
, NumActiveParts(0)
, MaxIdx(0)
@@ -455,7 +455,7 @@ public:
, TotalGroups(0)
, NoGroupsInBase(true)
, ResourceMetrics(nullptr)
- , WaitingForACL(false)
+ , WaitingForACL(false)
, TotalAvgSpeedSec(0)
, MaxAvgSpeedSec(0)
, TotalAvgSpeedMin(0)
@@ -492,8 +492,8 @@ public:
switch (ev->GetTypeRewrite()) {
HFunc(TEvents::TEvPoisonPill, Handle);
HFunc(TEvents::TEvWakeup, HandleWakeup);
- HFunc(TEvPersQueue::TEvUpdateACL, HandleUpdateACL);
- HFunc(TEvPersQueue::TEvCheckACL, Handle);
+ HFunc(TEvPersQueue::TEvUpdateACL, HandleUpdateACL);
+ HFunc(TEvPersQueue::TEvCheckACL, Handle);
HFunc(TEvPersQueue::TEvGetPartitionIdForWrite, Handle);
HFunc(TEvPersQueue::TEvUpdateBalancerConfig, Handle);
HFunc(TEvPersQueue::TEvWakeupClient, Handle);
diff --git a/ydb/core/persqueue/user_info.h b/ydb/core/persqueue/user_info.h
index 258e947c467..35a628a9e5e 100644
--- a/ydb/core/persqueue/user_info.h
+++ b/ydb/core/persqueue/user_info.h
@@ -163,7 +163,7 @@ struct TUserInfo {
i64 ReadOffset = -1;
TInstant ReadWriteTimestamp;
TInstant ReadCreateTimestamp;
- ui64 ReadOffsetRewindSum = 0;
+ ui64 ReadOffsetRewindSum = 0;
bool ReadScheduled = false;
@@ -304,7 +304,7 @@ struct TUserInfo {
, ReadOffset(-1)
, ReadWriteTimestamp(TAppData::TimeProvider->Now())
, ReadCreateTimestamp(TAppData::TimeProvider->Now())
- , ReadOffsetRewindSum(readOffsetRewindSum)
+ , ReadOffsetRewindSum(readOffsetRewindSum)
, ReadScheduled(false)
, Important(important)
, ReadFromTimestamp(readFromTimestamp)
diff --git a/ydb/core/protos/auth.proto b/ydb/core/protos/auth.proto
index 1f252b8f711..da88d9b692e 100644
--- a/ydb/core/protos/auth.proto
+++ b/ydb/core/protos/auth.proto
@@ -1,9 +1,9 @@
package NKikimrProto;
-option java_package = "ru.yandex.kikimr.proto";
-
-message TAuthConfig {
- optional TUserRegistryConfig UserRegistryConfig = 1;
- optional TTVMConfig TVMConfig = 2;
+option java_package = "ru.yandex.kikimr.proto";
+
+message TAuthConfig {
+ optional TUserRegistryConfig UserRegistryConfig = 1;
+ optional TTVMConfig TVMConfig = 2;
optional string StaffApiUserToken = 3;
optional string BlackBoxEndpoint = 4 [default = "blackbox.yandex-team.ru"];
optional string AccessServiceEndpoint = 5 [default = "as.private-api.cloud.yandex.net:4286"];
@@ -42,17 +42,17 @@ message TAuthConfig {
optional string PathToRootCA = 60 [default = "/etc/ssl/certs/YandexInternalRootCA.pem"]; // root CA certificate PEM/x509
optional uint32 AccessServiceGrpcKeepAliveTimeMs = 70 [default = 10000]; // CLOUD-27573
optional uint32 AccessServiceGrpcKeepAliveTimeoutMs = 71 [default = 1000]; // CLOUD-27573
-}
-
-message TUserRegistryConfig {
- optional string Query = 1; // Must take uid (ui64) as parameter and return user (utf8)
-}
-
-message TTVMConfig {
- optional bool Enabled = 1;
- optional uint32 ServiceTVMId = 2;
- optional string PublicKeys = 3;
- optional bool UpdatePublicKeys = 4 [default = true];
- optional uint64 UpdatePublicKeysSuccessTimeout = 5 [default = 82800];
- optional uint64 UpdatePublicKeysFailureTimeout = 6 [default = 10];
-}
+}
+
+message TUserRegistryConfig {
+ optional string Query = 1; // Must take uid (ui64) as parameter and return user (utf8)
+}
+
+message TTVMConfig {
+ optional bool Enabled = 1;
+ optional uint32 ServiceTVMId = 2;
+ optional string PublicKeys = 3;
+ optional bool UpdatePublicKeys = 4 [default = true];
+ optional uint64 UpdatePublicKeysSuccessTimeout = 5 [default = 82800];
+ optional uint64 UpdatePublicKeysFailureTimeout = 6 [default = 10];
+}
diff --git a/ydb/core/protos/config.proto b/ydb/core/protos/config.proto
index d64169d4fc0..177350bd296 100644
--- a/ydb/core/protos/config.proto
+++ b/ydb/core/protos/config.proto
@@ -1357,7 +1357,7 @@ message TAppConfig {
optional NKikimrCms.TCmsConfig CmsConfig = 25;
optional TFeatureFlags FeatureFlags = 26;
optional TSqsConfig SqsConfig = 27;
- optional NKikimrPQ.TPQConfig PQConfig = 28;
+ optional NKikimrPQ.TPQConfig PQConfig = 28;
optional NKikimrTenantPool.TTenantPoolConfig TenantPoolConfig = 29;
optional NKikimrProto.TAuthConfig AuthConfig = 30;
optional NKikimrTenantSlotBroker.TConfig TenantSlotBrokerConfig = 32;
diff --git a/ydb/core/protos/msgbus_pq.proto b/ydb/core/protos/msgbus_pq.proto
index 849b292383a..bfa2e43b944 100644
--- a/ydb/core/protos/msgbus_pq.proto
+++ b/ydb/core/protos/msgbus_pq.proto
@@ -15,7 +15,7 @@ message TPersQueuePartitionRequest {
optional int32 Count = 4; //optional
optional int32 Bytes = 5; //optional, max value = 25Mb
optional int32 TimeoutMs = 6; //ms, default = 1000
- optional int32 MaxTimeLagMs = 8; // optional, default = infinity, why we use int instead of uint?
+ optional int32 MaxTimeLagMs = 8; // optional, default = infinity, why we use int instead of uint?
optional uint64 ReadTimestampMs = 9; //optional, default = 0
optional bool MirrorerRequest = 10 [default = false];
@@ -60,7 +60,7 @@ message TPersQueuePartitionRequest {
optional int32 TotalParts = 6; //fill it for first part of multi-part message
optional int32 TotalSize = 7; // fill it for first part of multi-part message
optional int64 CreateTimeMS = 8; //mandatory
- optional bool DisableDeduplication = 9 [ default = false ];
+ optional bool DisableDeduplication = 9 [ default = false ];
optional int64 WriteTimeMS = 10; //for mirroring only
optional int32 UncompressedSize = 12; //fill it for all parts
@@ -203,7 +203,7 @@ message TPersQueueFetchRequest {
optional int32 Partition = 2; // must be set
optional int64 Offset = 3; // must be set
optional int32 MaxBytes = 4; // must be set
- optional uint64 ReadTimestampMs = 5; //optional, default = 0
+ optional uint64 ReadTimestampMs = 5; //optional, default = 0
}
repeated TPartitionInfo Partition = 1;
optional int32 TotalMaxBytes = 2; //must be set
@@ -221,7 +221,7 @@ message TPersQueueRequest {
//only one from data, meta or fetch request must be set.
optional string SecurityToken = 5;
- optional string Ticket = 6; //if set, check for acl
+ optional string Ticket = 6; //if set, check for acl
optional string RequestId = 100; //for logging
}
diff --git a/ydb/core/protos/pqconfig.proto b/ydb/core/protos/pqconfig.proto
index 7c859274496..47ddb259e52 100644
--- a/ydb/core/protos/pqconfig.proto
+++ b/ydb/core/protos/pqconfig.proto
@@ -19,13 +19,13 @@ message TPartitionMeta {
optional uint64 EndOffset = 2;
}
-message TPQConfig {
- optional uint32 ACLRetryTimeoutSec = 1 [default = 300];
+message TPQConfig {
+ optional uint32 ACLRetryTimeoutSec = 1 [default = 300];
optional uint32 BalancerMetadataRetryTimeoutSec = 2 [default = 240];
optional uint32 MaxBlobsPerLevel = 3 [default = 64]; // will produce 8mb blobs at last level
//32 => 1mb blobs at last level
optional uint32 MaxBlobSize = 4 [default = 8388608]; //8mb
- optional uint32 ClustersUpdateTimeoutSec = 5 [default = 30];
+ optional uint32 ClustersUpdateTimeoutSec = 5 [default = 30];
optional bool Enabled = 6 [default = false]; // Enable PQ proxies
@@ -36,7 +36,7 @@ message TPQConfig {
optional bool CheckACL = 9 [default = false];
optional uint32 SourceIdCleanupPeriodSec = 10 [default = 60]; // 24 hours // TODO: What is '24 hours'? Default is 60 seconds.
- optional uint32 SourceIdMaxLifetimeSec = 11 [default = 1382400]; // 16 days
+ optional uint32 SourceIdMaxLifetimeSec = 11 [default = 1382400]; // 16 days
optional uint32 SourceIdTotalShardsCount = 12 [default = 131072];
optional NKikimrClient.TKeyValueRequest.ETactic Tactic = 13 [default = MAX_THROUGHPUT];
@@ -142,8 +142,8 @@ message TPQConfig {
optional uint32 MetaCacheRefreshIntervalMilliSeconds = 41 [default = 10000];
optional bool MetaCacheSkipVersionCheck = 42 [default = false];
-}
-
+}
+
message TChannelProfile {
optional string PoolKind = 1;
optional uint64 Size = 2; // size in bytes
@@ -231,12 +231,12 @@ message TPQTabletConfig {
optional string TopicName = 4; // also filled by schemeshard
optional uint32 Version = 5; //also filled by schemeshard
optional bool LocalDC = 6 [default = false];
- optional bool RequireAuthWrite = 7 [default = false];
- optional bool RequireAuthRead = 8 [default = false];
- optional string Producer = 9;
- optional string Ident = 10;
- optional string Topic = 11;
- optional string DC = 12;
+ optional bool RequireAuthWrite = 7 [default = false];
+ optional bool RequireAuthRead = 8 [default = false];
+ optional string Producer = 9;
+ optional string Ident = 10;
+ optional string Topic = 11;
+ optional string DC = 12;
// ReadRules, ReadTopicTimestampMs, ReadRuleVersions, ConsumerFormatVersions and ConsumersCodecs form a consumer data array stored by columns
repeated string ReadRules = 13;
@@ -316,7 +316,7 @@ message TUpdateBalancerConfig { //for schemeshard use only
optional TPQTabletConfig TabletConfig = 7;
optional uint32 PartitionPerTablet = 8;
- optional uint64 SchemeShardId = 9;
+ optional uint64 SchemeShardId = 9;
message TPartition {
optional uint32 Partition = 1;
optional uint64 TabletId = 2;
@@ -342,7 +342,7 @@ message TDescribeResponse {
optional uint32 Version = 2;
optional TPQTabletConfig Config = 3;
optional uint32 PartitionPerTablet = 4;
- optional uint64 SchemeShardId = 6;
+ optional uint64 SchemeShardId = 6;
optional uint64 BalancerTabletId = 7;
optional bytes SecurityObject = 8; //NACLibProto.TSecurityObject
@@ -355,32 +355,32 @@ message TDescribeResponse {
repeated TPartition Partitions = 5;
}
-message TCheckACL {
- optional EOperation Operation = 2;
+message TCheckACL {
+ optional EOperation Operation = 2;
optional NPersQueueCommon.Credentials Auth = 4; //leaved for compatibility
optional string User = 5;
optional bytes Token = 6;
-}
-
-message TCheckACLResponse {
- optional EAccess Access = 1;
- optional string Topic = 2;
+}
+
+message TCheckACLResponse {
+ optional EAccess Access = 1;
+ optional string Topic = 2;
optional string Path = 3;
- optional string Error = 4;
-}
-
-enum EOperation {
- READ_OP = 1;
- WRITE_OP = 2;
-}
-
-enum EAccess {
- ALLOWED = 1;
- DENIED = 2;
- UNKNOWN = 3;
-}
-
+ optional string Error = 4;
+}
+
+enum EOperation {
+ READ_OP = 1;
+ WRITE_OP = 2;
+}
+
+enum EAccess {
+ ALLOWED = 1;
+ DENIED = 2;
+ UNKNOWN = 3;
+}
+
message TGetPartitionIdForWrite {
}
@@ -655,15 +655,15 @@ message TBatchHeader {
optional bool HasKinesis = 8;
}
-message TUserInfo {
- optional uint64 Offset = 1;
- optional uint32 Generation = 2;
- optional uint32 Step = 3;
- optional string Session = 4;
- optional uint64 OffsetRewindSum = 5;
+message TUserInfo {
+ optional uint64 Offset = 1;
+ optional uint32 Generation = 2;
+ optional uint32 Step = 3;
+ optional string Session = 4;
+ optional uint64 OffsetRewindSum = 5;
optional uint64 ReadRuleGeneration = 6;
-}
-
+}
+
message TPartitionClientInfo {
repeated int32 Partitions = 1;
}
diff --git a/ydb/core/protos/ya.make b/ydb/core/protos/ya.make
index 70bb65514cd..58387360879 100644
--- a/ydb/core/protos/ya.make
+++ b/ydb/core/protos/ya.make
@@ -1,5 +1,5 @@
PROTO_LIBRARY()
-
+
GRPC()
OWNER(
@@ -119,7 +119,7 @@ SRCS(
tx_sequenceshard.proto
pdiskfit.proto
pqconfig.proto
- auth.proto
+ auth.proto
key.proto
grpc.proto
grpc_pq_old.proto
diff --git a/ydb/core/security/ticket_parser.cpp b/ydb/core/security/ticket_parser.cpp
index c7d024d03dc..89e0ca3ddfe 100644
--- a/ydb/core/security/ticket_parser.cpp
+++ b/ydb/core/security/ticket_parser.cpp
@@ -183,17 +183,17 @@ class TTicketParser : public TActorBootstrapped<TTicketParser> {
.AuthType = record.GetAuthType()
}), ctx);
CounterTicketsBuiltin->Inc();
- return;
- }
+ return;
+ }
if(record.Ticket.EndsWith("@" BUILTIN_ERROR_DOMAIN)) {
record.TokenType = ETokenType::Builtin;
SetError(key, record, {"Builtin error simulation"}, ctx);
CounterTicketsBuiltin->Inc();
- return;
- }
- }
-
+ return;
+ }
+ }
+
if (UseLoginProvider && (record.TokenType == ETokenType::Unknown || record.TokenType == ETokenType::Login)) {
TString database = Config.GetDomainLoginOnly() ? DomainName : record.Database;
auto itLoginProvider = LoginProviders.find(database);
diff --git a/ydb/core/security/ticket_parser_ut.cpp b/ydb/core/security/ticket_parser_ut.cpp
index 2b624efefe3..5ad6523a4ed 100644
--- a/ydb/core/security/ticket_parser_ut.cpp
+++ b/ydb/core/security/ticket_parser_ut.cpp
@@ -4,13 +4,13 @@
#include <library/cpp/testing/unittest/registar.h>
#include <library/cpp/testing/unittest/tests_data.h>
#include <ydb/public/lib/deprecated/kicli/kicli.h>
-
-#include "ticket_parser.h"
-
-namespace NKikimr {
-
+
+#include "ticket_parser.h"
+
+namespace NKikimr {
+
Y_UNIT_TEST_SUITE(TTicketParserTest) {
-
+
Y_UNIT_TEST(LoginGood) {
using namespace Tests;
TPortManager tp;
@@ -133,5 +133,5 @@ Y_UNIT_TEST_SUITE(TTicketParserTest) {
UNIT_ASSERT(!result->Error.empty());
UNIT_ASSERT_EQUAL(result->Error.Message, "Token is not in correct format");
}
-}
-}
+}
+}
diff --git a/ydb/core/security/ut/ya.make b/ydb/core/security/ut/ya.make
index 24ae1eaca39..bcfd69c1a4b 100644
--- a/ydb/core/security/ut/ya.make
+++ b/ydb/core/security/ut/ya.make
@@ -1,24 +1,24 @@
UNITTEST_FOR(ydb/core/security)
-
+
OWNER(
g:kikimr
g:logbroker
)
-FORK_SUBTESTS()
-
+FORK_SUBTESTS()
+
TIMEOUT(600)
SIZE(MEDIUM)
-PEERDIR(
+PEERDIR(
ydb/core/testlib
-)
-
+)
+
YQL_LAST_ABI_VERSION()
-SRCS(
- ticket_parser_ut.cpp
-)
-
-END()
+SRCS(
+ ticket_parser_ut.cpp
+)
+
+END()
diff --git a/ydb/core/testlib/test_client.cpp b/ydb/core/testlib/test_client.cpp
index d4907c26f1e..ec0787b33e3 100644
--- a/ydb/core/testlib/test_client.cpp
+++ b/ydb/core/testlib/test_client.cpp
@@ -588,7 +588,7 @@ namespace Tests {
{
IActor* ticketParser = Settings->CreateTicketParser(Settings->AuthConfig);
TActorId ticketParserId = Runtime->Register(ticketParser, nodeIdx);
- Runtime->RegisterService(MakeTicketParserID(), ticketParserId, nodeIdx);
+ Runtime->RegisterService(MakeTicketParserID(), ticketParserId, nodeIdx);
}
{
@@ -610,7 +610,7 @@ namespace Tests {
nullptr);
TActorId kqpProxyServiceId = Runtime->Register(kqpProxyService, nodeIdx);
Runtime->RegisterService(NKqp::MakeKqpProxyID(Runtime->GetNodeId(nodeIdx)), kqpProxyServiceId, nodeIdx);
- }
+ }
{
IActor* txProxy = CreateTxProxy(Runtime->GetTxAllocatorTabletIds());
diff --git a/ydb/core/testlib/test_client.h b/ydb/core/testlib/test_client.h
index 2064752ab45..f0890cb60f9 100644
--- a/ydb/core/testlib/test_client.h
+++ b/ydb/core/testlib/test_client.h
@@ -90,7 +90,7 @@ namespace Tests {
ui16 Port;
ui16 GrpcPort = 0;
NKikimrProto::TAuthConfig AuthConfig;
- NKikimrPQ::TPQConfig PQConfig;
+ NKikimrPQ::TPQConfig PQConfig;
NKikimrPQ::TPQClusterDiscoveryConfig PQClusterDiscoveryConfig;
NKikimrNetClassifier::TNetClassifierConfig NetClassifierConfig;
ui32 Domain = TestDomain;
@@ -181,8 +181,8 @@ namespace Tests {
explicit TServerSettings(ui16 port, const NKikimrProto::TAuthConfig authConfig = {}, const NKikimrPQ::TPQConfig pqConfig = {})
: Port(port)
- , AuthConfig(authConfig)
- , PQConfig(pqConfig)
+ , AuthConfig(authConfig)
+ , PQConfig(pqConfig)
{
AddStoragePool("test", "/" + DomainName + ":test");
}
diff --git a/ydb/core/testlib/test_pq_client.h b/ydb/core/testlib/test_pq_client.h
index f0599232508..225f86d75a0 100644
--- a/ydb/core/testlib/test_pq_client.h
+++ b/ydb/core/testlib/test_pq_client.h
@@ -566,7 +566,7 @@ public:
TFsPath fsPath(path);
CreateTable(fsPath.Dirname(),
"Name: \"" + fsPath.Basename() + "\""
- "Columns { Name: \"Hash\" Type: \"Uint32\"}"
+ "Columns { Name: \"Hash\" Type: \"Uint32\"}"
"Columns { Name: \"SourceId\" Type: \"Utf8\"}"
"Columns { Name: \"Topic\" Type: \"Utf8\"}"
"Columns { Name: \"Partition\" Type: \"Uint32\"}"
@@ -577,13 +577,13 @@ public:
}
void InsertSourceId(ui32 hash, TString sourceId, ui64 accessTime, const TString& path = "/Root/PQ/SourceIdMeta2") {
- TString query =
- "DECLARE $Hash AS Uint32; "
- "DECLARE $SourceId AS Utf8; "
- "DECLARE $AccessTime AS Uint64; "
+ TString query =
+ "DECLARE $Hash AS Uint32; "
+ "DECLARE $SourceId AS Utf8; "
+ "DECLARE $AccessTime AS Uint64; "
"UPSERT INTO [" + path + "] (Hash, SourceId, Topic, Partition, CreateTime, AccessTime) "
"VALUES($Hash, $SourceId, \"1\", 0, 0, $AccessTime); ";
-
+
NYdb::TParamsBuilder builder;
auto params = builder
.AddParam("$Hash").Uint32(hash).Build()
@@ -592,25 +592,25 @@ public:
.Build();
RunYqlDataQueryWithParams(query, params);
- }
-
+ }
+
THashMap<TString, TInstant> ListSourceIds(const TString& path = "/Root/PQ/SourceIdMeta2") {
auto result = RunYqlDataQuery("SELECT SourceId, AccessTime FROM [" + path + "];");
NYdb::TResultSetParser parser(*result);
- THashMap<TString, TInstant> sourceIds;
+ THashMap<TString, TInstant> sourceIds;
while(parser.TryNextRow()) {
TString sourceId = *parser.ColumnParser("SourceId").GetOptionalUtf8();
TInstant accessTime = TInstant::MilliSeconds(*parser.ColumnParser("AccessTime").GetOptionalUint64());
- sourceIds[sourceId] = accessTime;
- }
- return sourceIds;
- }
-
+ sourceIds[sourceId] = accessTime;
+ }
+ return sourceIds;
+ }
+
void InitDCs(THashMap<TString, TPQTestClusterInfo> clusters = DEFAULT_CLUSTERS_LIST, const TString& localCluster = TString()) {
MkDir("/Root/PQ", "Config");
- MkDir("/Root/PQ/Config", "V2");
+ MkDir("/Root/PQ/Config", "V2");
RunYqlSchemeQuery(R"___(
- CREATE TABLE [/Root/PQ/Config/V2/Cluster] (
+ CREATE TABLE [/Root/PQ/Config/V2/Cluster] (
name Utf8,
balancer Utf8,
local Bool,
@@ -1263,7 +1263,7 @@ public:
UNIT_ASSERT(tt.GetErrorCode() == (ui32)NPersQueue::NErrorCode::OK);
}
}
- return response->Record;
+ return response->Record;
}
diff --git a/ydb/public/api/protos/draft/persqueue_error_codes.proto b/ydb/public/api/protos/draft/persqueue_error_codes.proto
index 8b2a0986314..5b1d8a2794d 100644
--- a/ydb/public/api/protos/draft/persqueue_error_codes.proto
+++ b/ydb/public/api/protos/draft/persqueue_error_codes.proto
@@ -13,7 +13,7 @@ enum EErrorCode {
WRITE_ERROR_PARTITION_IS_FULL = 5;
WRITE_ERROR_DISK_IS_FULL = 15;
- WRITE_ERROR_BAD_OFFSET = 19;
+ WRITE_ERROR_BAD_OFFSET = 19;
CREATE_SESSION_ALREADY_LOCKED = 6;
DELETE_SESSION_NO_SESSION = 7;
@@ -32,9 +32,9 @@ enum EErrorCode {
UNKNOWN_TOPIC = 17;
- ACCESS_DENIED = 18;
- CLUSTER_DISABLED = 20;
-
+ ACCESS_DENIED = 18;
+ CLUSTER_DISABLED = 20;
+
WRONG_PARTITION_NUMBER = 21;
CREATE_TIMEOUT = 22; // TODO: move to pqlib codes
diff --git a/ydb/services/persqueue_v1/grpc_pq_actor.h b/ydb/services/persqueue_v1/grpc_pq_actor.h
index 65c8aca1db2..07381627a6a 100644
--- a/ydb/services/persqueue_v1/grpc_pq_actor.h
+++ b/ydb/services/persqueue_v1/grpc_pq_actor.h
@@ -424,7 +424,7 @@ struct TEvPQProxy {
struct TEvDieCommand : public NActors::TEventLocal<TEvDieCommand, EvDieCommand> {
TEvDieCommand(const TString& reason, const PersQueue::ErrorCode::ErrorCode errorCode)
: Reason(reason)
- , ErrorCode(errorCode)
+ , ErrorCode(errorCode)
{ }
TString Reason;
@@ -648,20 +648,20 @@ private:
TInstant LastACLCheckTimestamp;
TInstant LogSessionDeadline;
- ui64 BalancerTabletId;
+ ui64 BalancerTabletId;
TActorId PipeToBalancer;
-
+
// PQ tablet configuration that we get at the time of session initialization
NKikimrPQ::TPQTabletConfig InitialPQTabletConfig;
NKikimrPQClient::TDataChunk InitMeta;
- TString LocalDC;
+ TString LocalDC;
TString ClientDC;
- TString SelectSourceIdQuery;
- TString UpdateSourceIdQuery;
- TInstant LastSourceIdUpdate;
- ui64 SourceIdCreateTime;
- bool SourceIdUpdateInfly;
+ TString SelectSourceIdQuery;
+ TString UpdateSourceIdQuery;
+ TInstant LastSourceIdUpdate;
+ ui64 SourceIdCreateTime;
+ bool SourceIdUpdateInfly;
TVector<NPQ::TLabelsInfo> Aggr;
NKikimr::NPQ::TMultiCounter SLITotal;
diff --git a/ydb/services/persqueue_v1/grpc_pq_read_actor.cpp b/ydb/services/persqueue_v1/grpc_pq_read_actor.cpp
index 654edfcfcd4..50e2355ebfc 100644
--- a/ydb/services/persqueue_v1/grpc_pq_read_actor.cpp
+++ b/ydb/services/persqueue_v1/grpc_pq_read_actor.cpp
@@ -783,8 +783,8 @@ void TReadSessionActor::Handle(TEvPQProxy::TEvReadInit::TPtr& ev, const TActorCo
SLITotal = NKikimr::NPQ::TMultiCounter(subGroup, Aggr, {}, {"RequestsTotal"}, true, "sensor", false);
SLITotal.Inc();
-}
-
+}
+
void TReadSessionActor::RegisterSession(const TActorId& pipe, const TString& topic, const TVector<ui32>& groups, const TActorContext& ctx)
{
@@ -808,8 +808,8 @@ void TReadSessionActor::RegisterSession(const TActorId& pipe, const TString& top
void TReadSessionActor::RegisterSessions(const TActorContext& ctx) {
InitDone = true;
- for (auto& t : Topics) {
- auto& topic = t.first;
+ for (auto& t : Topics) {
+ auto& topic = t.first;
RegisterSession(t.second.PipeClient, topic, t.second.Groups, ctx);
NumPartitionsFromTopic[t.second.TopicNameConverter->GetClientsideName()] = 0;
}
@@ -1647,7 +1647,7 @@ void TReadSessionActor::Handle(TEvPQProxy::TEvPartitionReady::TPtr& ev, const TA
void TReadSessionActor::HandlePoison(TEvPQProxy::TEvDieCommand::TPtr& ev, const TActorContext& ctx) {
- CloseSession(ev->Get()->Reason, ev->Get()->ErrorCode, ctx);
+ CloseSession(ev->Get()->Reason, ev->Get()->ErrorCode, ctx);
}
@@ -2455,7 +2455,7 @@ void TPartitionActor::Handle(TEvPQProxy::TEvRead::TPtr& ev, const TActorContext&
}
if (req->MaxTimeLagMs) {
read->SetMaxTimeLagMs(req->MaxTimeLagMs);
- }
+ }
if (req->ReadTimestampMs) {
read->SetReadTimestampMs(req->ReadTimestampMs);
}
diff --git a/ydb/services/persqueue_v1/grpc_pq_write.cpp b/ydb/services/persqueue_v1/grpc_pq_write.cpp
index 5f17a2f3aba..cb80a604c57 100644
--- a/ydb/services/persqueue_v1/grpc_pq_write.cpp
+++ b/ydb/services/persqueue_v1/grpc_pq_write.cpp
@@ -49,7 +49,7 @@ void TPQWriteService::Bootstrap(const TActorContext& ctx) {
Become(&TThis::StateFunc);
}
-
+
ui64 TPQWriteService::NextCookie() {
return ++LastCookie;
}
@@ -214,20 +214,20 @@ bool TPQWriteService::TooMuchSessions() {
TString TPQWriteService::AvailableLocalCluster(const TActorContext&) const {
return HaveClusters && Enabled ? *LocalCluster : "";
-}
-
-
-
-
+}
+
+
+
+
///////////////////////////////////////////////////////////////////////////////
+}
}
}
-}
-
-
+
+
void NKikimr::NGRpcService::TGRpcRequestProxy::Handle(NKikimr::NGRpcService::TEvStreamPQWriteRequest::TPtr& ev, const TActorContext& ctx) {
ctx.Send(NKikimr::NGRpcProxy::V1::GetPQWriteServiceActorID(), ev->Release().Release());
-}
+}
diff --git a/ydb/services/persqueue_v1/grpc_pq_write.h b/ydb/services/persqueue_v1/grpc_pq_write.h
index 73148388b29..75d1db6068e 100644
--- a/ydb/services/persqueue_v1/grpc_pq_write.h
+++ b/ydb/services/persqueue_v1/grpc_pq_write.h
@@ -75,10 +75,10 @@ private:
ui32 MaxSessions;
TMaybe<TString> LocalCluster;
- bool Enabled;
- TString SelectSourceIdQuery;
- TString UpdateSourceIdQuery;
- TString DeleteSourceIdQuery;
+ bool Enabled;
+ TString SelectSourceIdQuery;
+ TString UpdateSourceIdQuery;
+ TString DeleteSourceIdQuery;
NAddressClassifier::TLabeledAddressClassifier::TConstPtr DatacenterClassifier; // Detects client's datacenter by IP. May be null
bool HaveClusters;
diff --git a/ydb/services/persqueue_v1/grpc_pq_write_actor.cpp b/ydb/services/persqueue_v1/grpc_pq_write_actor.cpp
index 3038cc82a61..39074ad1146 100644
--- a/ydb/services/persqueue_v1/grpc_pq_write_actor.cpp
+++ b/ydb/services/persqueue_v1/grpc_pq_write_actor.cpp
@@ -11,7 +11,7 @@
#include <ydb/services/lib/sharding/sharding.h>
#include <library/cpp/actors/core/log.h>
#include <library/cpp/digest/md5/md5.h>
-#include <util/string/hex.h>
+#include <util/string/hex.h>
#include <util/string/vector.h>
#include <util/string/escape.h>
#include <util/string/printf.h>
@@ -81,8 +81,8 @@ using namespace Ydb::PersQueue::V1;
static const ui32 MAX_RESERVE_REQUESTS_INFLIGHT = 5;
static const ui32 MAX_BYTES_INFLIGHT = 1 << 20; //1mb
-static const ui32 MURMUR_ARRAY_SEED = 0x9747b28c;
-static const TDuration SOURCEID_UPDATE_PERIOD = TDuration::Hours(1);
+static const ui32 MURMUR_ARRAY_SEED = 0x9747b28c;
+static const TDuration SOURCEID_UPDATE_PERIOD = TDuration::Hours(1);
static const TString SELECT_SOURCEID_QUERY1 =
"--!syntax_v1\n"
@@ -138,11 +138,11 @@ TWriteSessionActor::TWriteSessionActor(
, RequestNotChecked(false)
, LastACLCheckTimestamp(TInstant::Zero())
, LogSessionDeadline(TInstant::Zero())
- , BalancerTabletId(0)
+ , BalancerTabletId(0)
, ClientDC(clientDC ? *clientDC : "other")
- , LastSourceIdUpdate(TInstant::Zero())
- , SourceIdCreateTime(0)
- , SourceIdUpdateInfly(false)
+ , LastSourceIdUpdate(TInstant::Zero())
+ , SourceIdCreateTime(0)
+ , SourceIdUpdateInfly(false)
{
Y_ASSERT(Request);
++(*GetServiceCounters(Counters, "pqproxy|writeSession")->GetCounter("SessionsCreatedTotal", true));
@@ -287,8 +287,8 @@ void TWriteSessionActor::CheckACL(const TActorContext& ctx) {
Token->GetUserSID().c_str());
CloseSession(errorReason, PersQueue::ErrorCode::ACCESS_DENIED, ctx);
}
-}
-
+}
+
void TWriteSessionActor::Handle(TEvPQProxy::TEvWriteInit::TPtr& ev, const TActorContext& ctx) {
THolder<TEvPQProxy::TEvWriteInit> event(ev->Release());
@@ -324,7 +324,7 @@ void TWriteSessionActor::Handle(TEvPQProxy::TEvWriteInit::TPtr& ev, const TActor
return;
}
EscapedSourceId = HexEncode(encodedSourceId);
-
+
TString s = TopicConverter->GetClientsideName() + encodedSourceId;
Hash = MurmurHash<ui32>(s.c_str(), s.size(), MURMUR_ARRAY_SEED);
@@ -534,11 +534,11 @@ void TWriteSessionActor::DiscoverPartition(const NActors::TActorContext& ctx) {
ev->Record.MutableRequest()->MutableTxControl()->mutable_begin_tx()->mutable_serializable_read_write();
// keep compiled query in cache.
ev->Record.MutableRequest()->MutableQueryCachePolicy()->set_keep_in_cache(true);
- NClient::TParameters parameters;
- parameters["$Hash"] = Hash;
+ NClient::TParameters parameters;
+ parameters["$Hash"] = Hash;
parameters["$Topic"] = TopicConverter->GetClientsideName();
- parameters["$SourceId"] = EscapedSourceId;
- ev->Record.MutableRequest()->MutableParameters()->Swap(&parameters);
+ parameters["$SourceId"] = EscapedSourceId;
+ ev->Record.MutableRequest()->MutableParameters()->Swap(&parameters);
ctx.Send(NKqp::MakeKqpProxyID(ctx.SelfID.NodeId()), ev.Release());
State = ES_WAIT_TABLE_REQUEST_1;
}
@@ -588,18 +588,18 @@ void TWriteSessionActor::Handle(NKqp::TEvKqp::TEvQueryResponse::TPtr &ev, const
if (record.GetYdbStatus() != Ydb::StatusIds::SUCCESS) {
TStringBuilder errorReason;
- errorReason << "internal error in kqp Marker# PQ50 : " << record;
- if (State == EState::ES_INITED) {
- LOG_WARN_S(ctx, NKikimrServices::PQ_WRITE_PROXY, errorReason);
- SourceIdUpdateInfly = false;
- } else {
+ errorReason << "internal error in kqp Marker# PQ50 : " << record;
+ if (State == EState::ES_INITED) {
+ LOG_WARN_S(ctx, NKikimrServices::PQ_WRITE_PROXY, errorReason);
+ SourceIdUpdateInfly = false;
+ } else {
CloseSession(errorReason, PersQueue::ErrorCode::ERROR, ctx);
- }
+ }
return;
}
- if (State == EState::ES_WAIT_TABLE_REQUEST_1) {
- SourceIdCreateTime = TInstant::Now().MilliSeconds();
+ if (State == EState::ES_WAIT_TABLE_REQUEST_1) {
+ SourceIdCreateTime = TInstant::Now().MilliSeconds();
bool partitionFound = false;
auto& t = record.GetResponse().GetResults(0).GetValue().GetStruct(0);
@@ -614,19 +614,19 @@ void TWriteSessionActor::Handle(NKqp::TEvKqp::TEvQueryResponse::TPtr &ev, const
return;
}
partitionFound = true;
- SourceIdCreateTime = t.GetList(0).GetStruct(1).GetOptional().GetUint64();
+ SourceIdCreateTime = t.GetList(0).GetStruct(1).GetOptional().GetUint64();
}
}
LOG_INFO_S(ctx, NKikimrServices::PQ_WRITE_PROXY, "session v1 cookie: " << Cookie << " sessionId: " << OwnerCookie << " messageGroupId "
<< SourceId << " escaped " << EscapedSourceId << " hash " << Hash << " partition " << Partition << " partitions "
- << PartitionToTablet.size() << "(" << Hash % PartitionToTablet.size() << ") create " << SourceIdCreateTime << " result " << t);
+ << PartitionToTablet.size() << "(" << Hash % PartitionToTablet.size() << ") create " << SourceIdCreateTime << " result " << t);
if (!partitionFound && (PreferedPartition < Max<ui32>() || !AppData(ctx)->PQConfig.GetRoundRobinPartitionMapping())) {
Partition = PreferedPartition < Max<ui32>() ? PreferedPartition : Hash % PartitionToTablet.size(); //choose partition default value
partitionFound = true;
}
-
+
if (partitionFound) {
UpdatePartition(ctx);
} else {
@@ -634,11 +634,11 @@ void TWriteSessionActor::Handle(NKqp::TEvKqp::TEvQueryResponse::TPtr &ev, const
}
return;
} else if (State == EState::ES_WAIT_TABLE_REQUEST_2) {
- LastSourceIdUpdate = ctx.Now();
+ LastSourceIdUpdate = ctx.Now();
ProceedPartition(Partition, ctx);
- } else if (State == EState::ES_INITED) {
- SourceIdUpdateInfly = false;
- LastSourceIdUpdate = ctx.Now();
+ } else if (State == EState::ES_INITED) {
+ SourceIdUpdateInfly = false;
+ LastSourceIdUpdate = ctx.Now();
} else {
Y_FAIL("Wrong state");
}
@@ -648,7 +648,7 @@ THolder<NKqp::TEvKqp::TEvQueryRequest> TWriteSessionActor::MakeUpdateSourceIdMet
const NActors::TActorContext& ctx
) {
- auto ev = MakeHolder<NKqp::TEvKqp::TEvQueryRequest>();
+ auto ev = MakeHolder<NKqp::TEvKqp::TEvQueryRequest>();
ev->Record.MutableRequest()->SetAction(NKikimrKqp::QUERY_ACTION_EXECUTE);
ev->Record.MutableRequest()->SetType(NKikimrKqp::QUERY_TYPE_SQL_DML);
@@ -661,19 +661,19 @@ THolder<NKqp::TEvKqp::TEvQueryRequest> TWriteSessionActor::MakeUpdateSourceIdMet
// keep compiled query in cache.
ev->Record.MutableRequest()->MutableQueryCachePolicy()->set_keep_in_cache(true);
- NClient::TParameters parameters;
- parameters["$Hash"] = Hash;
+ NClient::TParameters parameters;
+ parameters["$Hash"] = Hash;
parameters["$Topic"] = TopicConverter->GetClientsideName();
- parameters["$SourceId"] = EscapedSourceId;
- parameters["$CreateTime"] = SourceIdCreateTime;
- parameters["$AccessTime"] = TInstant::Now().MilliSeconds();
- parameters["$Partition"] = Partition;
- ev->Record.MutableRequest()->MutableParameters()->Swap(&parameters);
-
- return ev;
-}
-
-
+ parameters["$SourceId"] = EscapedSourceId;
+ parameters["$CreateTime"] = SourceIdCreateTime;
+ parameters["$AccessTime"] = TInstant::Now().MilliSeconds();
+ parameters["$Partition"] = Partition;
+ ev->Record.MutableRequest()->MutableParameters()->Swap(&parameters);
+
+ return ev;
+}
+
+
void TWriteSessionActor::Handle(NKqp::TEvKqp::TEvProcessResponse::TPtr &ev, const TActorContext &ctx) {
auto& record = ev->Get()->Record;
@@ -1112,8 +1112,8 @@ void TWriteSessionActor::Handle(TEvPQProxy::TEvWrite::TPtr& ev, const TActorCont
if (!dataCheck(writeRequest, messageIndex)) {
return;
}
- }
-
+ }
+
THolder<TEvPQProxy::TEvWrite> event(ev->Release());
Writes.push_back(std::move(event));
@@ -1141,7 +1141,7 @@ void TWriteSessionActor::Handle(TEvPQProxy::TEvWrite::TPtr& ev, const TActorCont
}
void TWriteSessionActor::HandlePoison(TEvPQProxy::TEvDieCommand::TPtr& ev, const TActorContext& ctx) {
- CloseSession(ev->Get()->Reason, ev->Get()->ErrorCode, ctx);
+ CloseSession(ev->Get()->Reason, ev->Get()->ErrorCode, ctx);
}
void TWriteSessionActor::LogSession(const TActorContext& ctx) {
@@ -1161,9 +1161,9 @@ void TWriteSessionActor::HandleWakeup(const TActorContext& ctx) {
// ToDo[migration] - separate flag for having config tables
if (!AppData(ctx)->PQConfig.GetTopicsAreFirstClassCitizen() && !SourceIdUpdateInfly && ctx.Now() - LastSourceIdUpdate > SOURCEID_UPDATE_PERIOD) {
auto ev = MakeUpdateSourceIdMetadataRequest(ctx);
- SourceIdUpdateInfly = true;
+ SourceIdUpdateInfly = true;
ctx.Send(NKqp::MakeKqpProxyID(ctx.SelfID.NodeId()), ev.Release());
- }
+ }
if (ctx.Now() >= LogSessionDeadline) {
LogSession(ctx);
}