diff options
author | leonidvasilev <leonidvasilev@yandex-team.ru> | 2022-02-10 16:50:41 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:50:41 +0300 |
commit | 4d9cecd757702b2b5e853002de94e2cdbae1ca83 (patch) | |
tree | dc8dc091b00fab2f43a09c9a989cc2e1a095b4ce | |
parent | 8e42621c4c7f215397ef2f1e53a09366ff2634f1 (diff) | |
download | ydb-4d9cecd757702b2b5e853002de94e2cdbae1ca83.tar.gz |
Restoring authorship annotation for <leonidvasilev@yandex-team.ru>. Commit 1 of 2.
27 files changed, 1051 insertions, 1051 deletions
diff --git a/library/cpp/actors/core/process_stats.cpp b/library/cpp/actors/core/process_stats.cpp index 0e1dbd0031..4eb7d7c2fc 100644 --- a/library/cpp/actors/core/process_stats.cpp +++ b/library/cpp/actors/core/process_stats.cpp @@ -6,7 +6,7 @@ #include <library/cpp/monlib/dynamic_counters/counters.h> #include <library/cpp/monlib/metrics/metric_registry.h> -#include <util/datetime/uptime.h> +#include <util/datetime/uptime.h> #include <util/system/defaults.h> #include <util/stream/file.h> #include <util/string/vector.h> @@ -74,8 +74,8 @@ namespace NActors { Stime /= tickPerMillisec; CUtime /= tickPerMillisec; CStime /= tickPerMillisec; - SystemUptime = ::Uptime(); - Uptime = SystemUptime - TDuration::MilliSeconds(StartTime / TicksPerMillisec()); + SystemUptime = ::Uptime(); + Uptime = SystemUptime - TDuration::MilliSeconds(StartTime / TicksPerMillisec()); } TFileInput statm("/proc/" + strPid + "/statm"); @@ -196,9 +196,9 @@ namespace { SysTime = ProcStatGroup->GetCounter("Process/SystemTime", true); MinorPageFaults = ProcStatGroup->GetCounter("Process/MinorPageFaults", true); MajorPageFaults = ProcStatGroup->GetCounter("Process/MajorPageFaults", true); - UptimeSeconds = ProcStatGroup->GetCounter("Process/UptimeSeconds", false); + UptimeSeconds = ProcStatGroup->GetCounter("Process/UptimeSeconds", false); NumThreads = ProcStatGroup->GetCounter("Process/NumThreads", false); - SystemUptimeSeconds = ProcStatGroup->GetCounter("System/UptimeSeconds", false); + SystemUptimeSeconds = ProcStatGroup->GetCounter("System/UptimeSeconds", false); } void UpdateCounters(const TProcStat& procStat) { @@ -212,9 +212,9 @@ namespace { *SysTime = procStat.Stime; *MinorPageFaults = procStat.MinFlt; *MajorPageFaults = procStat.MajFlt; - *UptimeSeconds = procStat.Uptime.Seconds(); + *UptimeSeconds = procStat.Uptime.Seconds(); *NumThreads = procStat.NumThreads; - *SystemUptimeSeconds = procStat.Uptime.Seconds(); + *SystemUptimeSeconds = procStat.Uptime.Seconds(); } private: @@ -227,9 +227,9 @@ namespace { NMonitoring::TDynamicCounters::TCounterPtr SysTime; NMonitoring::TDynamicCounters::TCounterPtr MinorPageFaults; NMonitoring::TDynamicCounters::TCounterPtr MajorPageFaults; - NMonitoring::TDynamicCounters::TCounterPtr UptimeSeconds; + NMonitoring::TDynamicCounters::TCounterPtr UptimeSeconds; NMonitoring::TDynamicCounters::TCounterPtr NumThreads; - NMonitoring::TDynamicCounters::TCounterPtr SystemUptimeSeconds; + NMonitoring::TDynamicCounters::TCounterPtr SystemUptimeSeconds; }; @@ -243,9 +243,9 @@ namespace { AnonRssSize = registry.IntGauge({{"sensor", "process.AnonRssSize"}}); FileRssSize = registry.IntGauge({{"sensor", "process.FileRssSize"}}); CGroupMemLimit = registry.IntGauge({{"sensor", "process.CGroupMemLimit"}}); - UptimeSeconds = registry.IntGauge({{"sensor", "process.UptimeSeconds"}}); + UptimeSeconds = registry.IntGauge({{"sensor", "process.UptimeSeconds"}}); NumThreads = registry.IntGauge({{"sensor", "process.NumThreads"}}); - SystemUptimeSeconds = registry.IntGauge({{"sensor", "system.UptimeSeconds"}}); + SystemUptimeSeconds = registry.IntGauge({{"sensor", "system.UptimeSeconds"}}); UserTime = registry.Rate({{"sensor", "process.UserTime"}}); SysTime = registry.Rate({{"sensor", "process.SystemTime"}}); @@ -258,9 +258,9 @@ namespace { AnonRssSize->Set(procStat.AnonRss); FileRssSize->Set(procStat.FileRss); CGroupMemLimit->Set(procStat.CGroupMemLim); - UptimeSeconds->Set(procStat.Uptime.Seconds()); + UptimeSeconds->Set(procStat.Uptime.Seconds()); NumThreads->Set(procStat.NumThreads); - SystemUptimeSeconds->Set(procStat.SystemUptime.Seconds()); + SystemUptimeSeconds->Set(procStat.SystemUptime.Seconds()); // it is ok here to reset and add metric value, because mutation // is performed in siglethreaded context @@ -287,9 +287,9 @@ namespace { NMonitoring::TRate* SysTime; NMonitoring::TRate* MinorPageFaults; NMonitoring::TRate* MajorPageFaults; - NMonitoring::TIntGauge* UptimeSeconds; + NMonitoring::TIntGauge* UptimeSeconds; NMonitoring::TIntGauge* NumThreads; - NMonitoring::TIntGauge* SystemUptimeSeconds; + NMonitoring::TIntGauge* SystemUptimeSeconds; }; } // namespace diff --git a/library/cpp/actors/http/http.cpp b/library/cpp/actors/http/http.cpp index 7125f9d8b0..ae0860e1e3 100644 --- a/library/cpp/actors/http/http.cpp +++ b/library/cpp/actors/http/http.cpp @@ -45,7 +45,7 @@ template <> TStringBuf THttpResponse::GetName<&THttpResponse::ContentType>() { r template <> TStringBuf THttpResponse::GetName<&THttpResponse::ContentLength>() { return "Content-Length"; } template <> TStringBuf THttpResponse::GetName<&THttpResponse::TransferEncoding>() { return "Transfer-Encoding"; } template <> TStringBuf THttpResponse::GetName<&THttpResponse::LastModified>() { return "Last-Modified"; } -template <> TStringBuf THttpResponse::GetName<&THttpResponse::ContentEncoding>() { return "Content-Encoding"; } +template <> TStringBuf THttpResponse::GetName<&THttpResponse::ContentEncoding>() { return "Content-Encoding"; } const TMap<TStringBuf, TStringBuf THttpResponse::*, TLessNoCase> THttpResponse::HeadersLocation = { { THttpResponse::GetName<&THttpResponse::Connection>(), &THttpResponse::Connection }, @@ -53,7 +53,7 @@ const TMap<TStringBuf, TStringBuf THttpResponse::*, TLessNoCase> THttpResponse:: { THttpResponse::GetName<&THttpResponse::ContentLength>(), &THttpResponse::ContentLength }, { THttpResponse::GetName<&THttpResponse::TransferEncoding>(), &THttpResponse::TransferEncoding }, { THttpResponse::GetName<&THttpResponse::LastModified>(), &THttpResponse::LastModified }, - { THttpResponse::GetName<&THttpResponse::ContentEncoding>(), &THttpResponse::ContentEncoding } + { THttpResponse::GetName<&THttpResponse::ContentEncoding>(), &THttpResponse::ContentEncoding } }; void THttpRequest::Clear() { diff --git a/library/cpp/actors/http/http.h b/library/cpp/actors/http/http.h index 96c5c1ec48..6e212500d7 100644 --- a/library/cpp/actors/http/http.h +++ b/library/cpp/actors/http/http.h @@ -141,7 +141,7 @@ public: TStringBuf ContentLength; TStringBuf TransferEncoding; TStringBuf LastModified; - TStringBuf ContentEncoding; + TStringBuf ContentEncoding; TStringBuf Body; diff --git a/library/cpp/actors/http/http_ut.cpp b/library/cpp/actors/http/http_ut.cpp index 4c922f8d0f..870199ce0f 100644 --- a/library/cpp/actors/http/http_ut.cpp +++ b/library/cpp/actors/http/http_ut.cpp @@ -83,17 +83,17 @@ Y_UNIT_TEST_SUITE(HttpProxy) { UNIT_ASSERT_EQUAL(response->Body, "this\r\n is test."); } - Y_UNIT_TEST(CreateRepsonseWithCompressedBody) { - NHttp::THttpIncomingRequestPtr request = nullptr; - NHttp::THttpOutgoingResponsePtr response = new NHttp::THttpOutgoingResponse(request, "HTTP", "1.1", "200", "OK"); - response->Set<&NHttp::THttpResponse::ContentEncoding>("gzip"); - TString compressedBody = "compressed body"; - response->SetBody(compressedBody); - UNIT_ASSERT_VALUES_EQUAL("gzip", response->ContentEncoding); - UNIT_ASSERT_VALUES_EQUAL(ToString(compressedBody.size()), response->ContentLength); - UNIT_ASSERT_VALUES_EQUAL(compressedBody, response->Body); - } - + Y_UNIT_TEST(CreateRepsonseWithCompressedBody) { + NHttp::THttpIncomingRequestPtr request = nullptr; + NHttp::THttpOutgoingResponsePtr response = new NHttp::THttpOutgoingResponse(request, "HTTP", "1.1", "200", "OK"); + response->Set<&NHttp::THttpResponse::ContentEncoding>("gzip"); + TString compressedBody = "compressed body"; + response->SetBody(compressedBody); + UNIT_ASSERT_VALUES_EQUAL("gzip", response->ContentEncoding); + UNIT_ASSERT_VALUES_EQUAL(ToString(compressedBody.size()), response->ContentLength); + UNIT_ASSERT_VALUES_EQUAL(compressedBody, response->Body); + } + Y_UNIT_TEST(BasicPartialParsing) { NHttp::THttpIncomingRequestPtr request = new NHttp::THttpIncomingRequest(); EatPartialString(request, "GET /test HTTP/1.1\r\nHost: test\r\nSome-Header: 32344\r\n\r\n"); diff --git a/ydb/core/grpc_services/rpc_calls.h b/ydb/core/grpc_services/rpc_calls.h index 43136c2f4a..e288b64a0f 100644 --- a/ydb/core/grpc_services/rpc_calls.h +++ b/ydb/core/grpc_services/rpc_calls.h @@ -85,7 +85,7 @@ using TEvKikhouseDescribeTableRequest = TGRpcRequestWrapper<TRpcServices::EvKikh using TEvS3ListingRequest = TGRpcRequestWrapper<TRpcServices::EvS3Listing, Ydb::S3Internal::S3ListingRequest, Ydb::S3Internal::S3ListingResponse, true>; using TEvBiStreamPingRequest = TGRpcRequestBiStreamWrapper<TRpcServices::EvBiStreamPing, Draft::Dummy::PingRequest, Draft::Dummy::PingResponse>; using TEvExperimentalStreamQueryRequest = TGRpcRequestWrapper<TRpcServices::EvExperimentalStreamQuery, Ydb::Experimental::ExecuteStreamQueryRequest, Ydb::Experimental::ExecuteStreamQueryResponse, false>; -using TEvStreamPQWriteRequest = TGRpcRequestBiStreamWrapper<TRpcServices::EvStreamPQWrite, Ydb::PersQueue::V1::StreamingWriteClientMessage, Ydb::PersQueue::V1::StreamingWriteServerMessage>; +using TEvStreamPQWriteRequest = TGRpcRequestBiStreamWrapper<TRpcServices::EvStreamPQWrite, Ydb::PersQueue::V1::StreamingWriteClientMessage, Ydb::PersQueue::V1::StreamingWriteServerMessage>; using TEvStreamPQReadRequest = TGRpcRequestBiStreamWrapper<TRpcServices::EvStreamPQRead, Ydb::PersQueue::V1::MigrationStreamingReadClientMessage, Ydb::PersQueue::V1::MigrationStreamingReadServerMessage>; using TEvPQReadInfoRequest = TGRpcRequestWrapper<TRpcServices::EvPQReadInfo, Ydb::PersQueue::V1::ReadInfoRequest, Ydb::PersQueue::V1::ReadInfoResponse, true>; using TEvPQDropTopicRequest = TGRpcRequestValidationWrapper<TRpcServices::EvPQDropTopic, Ydb::PersQueue::V1::DropTopicRequest, Ydb::PersQueue::V1::DropTopicResponse, true>; diff --git a/ydb/core/protos/grpc_pq_old.proto b/ydb/core/protos/grpc_pq_old.proto index ed273e1404..4399c0f15b 100755 --- a/ydb/core/protos/grpc_pq_old.proto +++ b/ydb/core/protos/grpc_pq_old.proto @@ -32,7 +32,7 @@ message TDataChunk { optional EChunkType ChunkType = 9 [default = REGULAR]; - optional int64 Codec = 10; + optional int64 Codec = 10; optional TMapType ExtraFields = 126; diff --git a/ydb/core/protos/pqconfig.proto b/ydb/core/protos/pqconfig.proto index 7c85927449..350f54abf0 100644 --- a/ydb/core/protos/pqconfig.proto +++ b/ydb/core/protos/pqconfig.proto @@ -35,7 +35,7 @@ message TPQConfig { optional bool CheckACL = 9 [default = false]; - optional uint32 SourceIdCleanupPeriodSec = 10 [default = 60]; // 24 hours // TODO: What is '24 hours'? Default is 60 seconds. + optional uint32 SourceIdCleanupPeriodSec = 10 [default = 60]; // 24 hours // TODO: What is '24 hours'? Default is 60 seconds. optional uint32 SourceIdMaxLifetimeSec = 11 [default = 1382400]; // 16 days optional uint32 SourceIdTotalShardsCount = 12 [default = 131072]; @@ -77,9 +77,9 @@ message TPQConfig { } optional TQuotingConfig QuotingConfig = 18; - // Time duration that we wait before we consider remote cluster enabled for load balancing purposes - optional uint32 RemoteClusterEnabledDelaySec = 24 [default = 300]; // 5 minutes - optional uint32 CloseClientSessionWithEnabledRemotePreferredClusterDelaySec = 25 [default = 300]; // 5 minutes + // Time duration that we wait before we consider remote cluster enabled for load balancing purposes + optional uint32 RemoteClusterEnabledDelaySec = 24 [default = 300]; // 5 minutes + optional uint32 CloseClientSessionWithEnabledRemotePreferredClusterDelaySec = 25 [default = 300]; // 5 minutes optional bool RoundRobinPartitionMapping = 26 [default = true]; diff --git a/ydb/public/api/grpc/draft/ydb_persqueue_v1.proto b/ydb/public/api/grpc/draft/ydb_persqueue_v1.proto index 873ee38b3c..0cee327235 100644 --- a/ydb/public/api/grpc/draft/ydb_persqueue_v1.proto +++ b/ydb/public/api/grpc/draft/ydb_persqueue_v1.proto @@ -32,7 +32,7 @@ service PersQueueService { * <---------------- */ - rpc StreamingWrite(stream StreamingWriteClientMessage) returns (stream StreamingWriteServerMessage); + rpc StreamingWrite(stream StreamingWriteClientMessage) returns (stream StreamingWriteServerMessage); /** * Creates Read Session diff --git a/ydb/public/api/protos/persqueue_error_codes_v1.proto b/ydb/public/api/protos/persqueue_error_codes_v1.proto index c6658305c2..85fcd4cecf 100644 --- a/ydb/public/api/protos/persqueue_error_codes_v1.proto +++ b/ydb/public/api/protos/persqueue_error_codes_v1.proto @@ -38,6 +38,6 @@ enum ErrorCode { CLUSTER_DISABLED = 500020; WRONG_PARTITION_NUMBER = 500021; - PREFERRED_CLUSTER_MISMATCHED = 500022; + PREFERRED_CLUSTER_MISMATCHED = 500022; ERROR = 500100; } diff --git a/ydb/public/api/protos/ydb_persqueue_v1.proto b/ydb/public/api/protos/ydb_persqueue_v1.proto index 93a7fb6c79..7bb35d73bf 100644 --- a/ydb/public/api/protos/ydb_persqueue_v1.proto +++ b/ydb/public/api/protos/ydb_persqueue_v1.proto @@ -5,17 +5,17 @@ import "ydb/public/api/protos/ydb_status_codes.proto"; import "ydb/public/api/protos/ydb_issue_message.proto"; import "ydb/public/api/protos/annotations/validation.proto"; -package Ydb.PersQueue.V1; +package Ydb.PersQueue.V1; option java_package = "com.yandex.ydb.persqueue"; - + option cc_enable_arenas = true; -// NOTE: -// * We use 'ms' suffix instead of google.protobuf.Timestamp and google.protobuf.Duration in order to utilize -// packed encoding ('message' types can't be packed encoded). In non-repeated fields we use 'ms' for consistency. -// * Any message with non-empty 'issues' property leads to streaming RPC termination. - +// NOTE: +// * We use 'ms' suffix instead of google.protobuf.Timestamp and google.protobuf.Duration in order to utilize +// packed encoding ('message' types can't be packed encoded). In non-repeated fields we use 'ms' for consistency. +// * Any message with non-empty 'issues' property leads to streaming RPC termination. + enum Codec { CODEC_UNSPECIFIED = 0; CODEC_RAW = 1; @@ -24,189 +24,189 @@ enum Codec { CODEC_ZSTD = 4; } -message SessionMetaValue { - map<string, string> value = 1; +message SessionMetaValue { + map<string, string> value = 1; } -/** - * Represents range [start_offset, end_offset). - */ -message OffsetsRange { - int64 start_offset = 1; - int64 end_offset = 2; +/** + * Represents range [start_offset, end_offset). + */ +message OffsetsRange { + int64 start_offset = 1; + int64 end_offset = 2; } /** - * Request for write session. Contains one of: - * InitRequest - handshake request. - * WriteRequest - portion of data to be written. - * UpdateTokenRequest - user credentials if update is needed. + * Request for write session. Contains one of: + * InitRequest - handshake request. + * WriteRequest - portion of data to be written. + * UpdateTokenRequest - user credentials if update is needed. */ -message StreamingWriteClientMessage { - oneof client_message { - InitRequest init_request = 1; - WriteRequest write_request = 2; - UpdateTokenRequest update_token_request = 3; - } - - +message StreamingWriteClientMessage { + oneof client_message { + InitRequest init_request = 1; + WriteRequest write_request = 2; + UpdateTokenRequest update_token_request = 3; + } + + // Handshake request that must be sent to server first. - message InitRequest { + message InitRequest { // Path of topic to write to. - string topic = 1; - // message group identifier of client data stream a.k.a. sourceId. - string message_group_id = 2; - // Some user metadata attached to this write session. - map<string, string> session_meta = 3; + string topic = 1; + // message group identifier of client data stream a.k.a. sourceId. + string message_group_id = 2; + // Some user metadata attached to this write session. + map<string, string> session_meta = 3; // Partition group to write to. - // Zero means any group. - int64 partition_group_id = 4; - - int64 max_supported_block_format_version = 5; - - string session_id = 100; - // 0 for first init message and incremental value for connect retries. Used for server logging. - int64 connection_attempt = 101; - // Opaque blob. Take last one from previous connect. - bytes connection_meta = 102; - - // Optinal preferred cluster name. Sever will close session If preferred cluster is not server cluster and preferred cluster is enabled after delay TPQConfig::CloseClientSessionWithEnabledRemotePreferredClusterDelaySec - string preferred_cluster = 103; - - // Sanity check option. When no writing activity is done in idle_timeout_sec seconds, then session will be destroyed. Zero means infinity. - int64 idle_timeout_ms = 200; + // Zero means any group. + int64 partition_group_id = 4; + + int64 max_supported_block_format_version = 5; + + string session_id = 100; + // 0 for first init message and incremental value for connect retries. Used for server logging. + int64 connection_attempt = 101; + // Opaque blob. Take last one from previous connect. + bytes connection_meta = 102; + + // Optinal preferred cluster name. Sever will close session If preferred cluster is not server cluster and preferred cluster is enabled after delay TPQConfig::CloseClientSessionWithEnabledRemotePreferredClusterDelaySec + string preferred_cluster = 103; + + // Sanity check option. When no writing activity is done in idle_timeout_sec seconds, then session will be destroyed. Zero means infinity. + int64 idle_timeout_ms = 200; } - // Represents portion of client messages. - message WriteRequest { - // Sequence numbers of messages in order that client will provide to server. - repeated int64 sequence_numbers = 2; - // Message creation timestamps for client messages. - // Same size as sequence_numbers. - repeated int64 created_at_ms = 3; - // Message creation timestamps for client messages. - // Same size as sequence_numbers. - repeated int64 sent_at_ms = 4; - // Client message sizes. - // Same size as sequence_numbers. - repeated int64 message_sizes = 5; - - // Block must contain whole client message when it's size is not bigger that max_block_size. - // If message is bigger than max_block_size - it will be transferred as SIZE/max_block_size blocks. All of - // this blocks will be with block_count = 0 but not the last one - last one's block_count will be greater than 0; - // Blocks can be reordered upto max_flush_window_size of uncompressed data. + // Represents portion of client messages. + message WriteRequest { + // Sequence numbers of messages in order that client will provide to server. + repeated int64 sequence_numbers = 2; + // Message creation timestamps for client messages. + // Same size as sequence_numbers. + repeated int64 created_at_ms = 3; + // Message creation timestamps for client messages. + // Same size as sequence_numbers. + repeated int64 sent_at_ms = 4; + // Client message sizes. + // Same size as sequence_numbers. + repeated int64 message_sizes = 5; + + // Block must contain whole client message when it's size is not bigger that max_block_size. + // If message is bigger than max_block_size - it will be transferred as SIZE/max_block_size blocks. All of + // this blocks will be with block_count = 0 but not the last one - last one's block_count will be greater than 0; + // Blocks can be reordered upto max_flush_window_size of uncompressed data. // Each block contains concatenated client messages, compressed by chosen codec. - // If there is not full client message inside block, then all block contains only this part of message. - // blocks: A A A B B B BCDE - // offset: 1 1 1 2 2 2 2 - // part_number: 0 1 2 0 1 2 3 - // count: 0 0 1 0 0 1 4 - - repeated int64 blocks_offsets = 6; - repeated int64 blocks_part_numbers = 7; - // How many complete messages and imcomplete messages end parts (one at most) this block contains - repeated int64 blocks_message_counts = 8; - repeated int64 blocks_uncompressed_sizes = 9; - // In block format version 0 each byte contains only block codec identifier - repeated bytes blocks_headers = 10; - repeated bytes blocks_data = 11; + // If there is not full client message inside block, then all block contains only this part of message. + // blocks: A A A B B B BCDE + // offset: 1 1 1 2 2 2 2 + // part_number: 0 1 2 0 1 2 3 + // count: 0 0 1 0 0 1 4 + + repeated int64 blocks_offsets = 6; + repeated int64 blocks_part_numbers = 7; + // How many complete messages and imcomplete messages end parts (one at most) this block contains + repeated int64 blocks_message_counts = 8; + repeated int64 blocks_uncompressed_sizes = 9; + // In block format version 0 each byte contains only block codec identifier + repeated bytes blocks_headers = 10; + repeated bytes blocks_data = 11; } - // In-session reauthentication and reauthorization, lets user increase session lifetime. You should wait for 'update_token_response' before sending next 'update_token_request'. - message UpdateTokenRequest { - string token = 1; - } + // In-session reauthentication and reauthorization, lets user increase session lifetime. You should wait for 'update_token_response' before sending next 'update_token_request'. + message UpdateTokenRequest { + string token = 1; + } } /** - * Response for write session. Contains one of: - * InitResponse - correct handshake response. - * BatchWriteResponse - acknowledgment of storing client messages. - * UpdateTokenResponse - acknowledgment of reauthentication and reauthorization. + * Response for write session. Contains one of: + * InitResponse - correct handshake response. + * BatchWriteResponse - acknowledgment of storing client messages. + * UpdateTokenResponse - acknowledgment of reauthentication and reauthorization. */ -message StreamingWriteServerMessage { - oneof server_message { - InitResponse init_response = 3; - BatchWriteResponse batch_write_response = 4; - UpdateTokenResponse update_token_response = 5; - } - - // Server status of response. - Ydb.StatusIds.StatusCode status = 1; - - // Issues if any. - repeated Ydb.Issue.IssueMessage issues = 2; - +message StreamingWriteServerMessage { + oneof server_message { + InitResponse init_response = 3; + BatchWriteResponse batch_write_response = 4; + UpdateTokenResponse update_token_response = 5; + } + + // Server status of response. + Ydb.StatusIds.StatusCode status = 1; + + // Issues if any. + repeated Ydb.Issue.IssueMessage issues = 2; + // Response for handshake. - message InitResponse { - // Last persisted message's sequence number for this message group. - int64 last_sequence_number = 1; + message InitResponse { + // Last persisted message's sequence number for this message group. + int64 last_sequence_number = 1; // Unique identifier of write session. Used for debug purposes. string session_id = 2; // Path of topic that matched for this write session. Used for debug purposes, will be the same as in Init request from client. - string topic = 3; + string topic = 3; // Write session is established to this cluster. Client data will be in instance of topic in this cluster. string cluster = 4; - // Identifier of partition that is matched for this write session. - int64 partition_id = 5; + // Identifier of partition that is matched for this write session. + int64 partition_id = 5; - // Block (see StreamingWriteClientMessage.WriteRequest.blocks_data) format version supported by server or configured for a topic. Client must write data only with them. - int64 block_format_version = 6; + // Block (see StreamingWriteClientMessage.WriteRequest.blocks_data) format version supported by server or configured for a topic. Client must write data only with them. + int64 block_format_version = 6; // Client can only use compression codecs from this set to write messages to topic, session will be closed with BAD_REQUEST otherwise. repeated Codec supported_codecs = 10; - - // Maximal flush window size choosed by server. Size of uncompressed data not sended to server must not be bigger than flush window size. - // In other words, this is maximal size of gap inside uncompressed data, which is not sended to server yet. - int64 max_flush_window_size = 7; // will be 2048kb - // How big blocks per stream could be(in uncompressed size). When block contains more than max_block_size of uncompressed data - block must be truncated. - int64 max_block_size = 8; // will be 512kb - - // Opaque blob, used for fast reconnects. - bytes connection_meta = 9; - } - - // Message that represents acknowledgment for sequence of client messages. This sequence is persisted together so write statistics is for messages batch. - message BatchWriteResponse { - // Sequence numbers of persisted client messages. - repeated int64 sequence_numbers = 1; - // Assigned partition offsets. - // Zero for skipped messages. - repeated int64 offsets = 2; - // Per message flag. False if message is written for the first time and True otherwise. - repeated bool already_written = 3; - - // Assigned partition for all client messages inside this batch. - int64 partition_id = 4; - - // Write statistics for this sequence of client messages. - WriteStatistics write_statistics = 5; - } - - message UpdateTokenResponse { + + // Maximal flush window size choosed by server. Size of uncompressed data not sended to server must not be bigger than flush window size. + // In other words, this is maximal size of gap inside uncompressed data, which is not sended to server yet. + int64 max_flush_window_size = 7; // will be 2048kb + // How big blocks per stream could be(in uncompressed size). When block contains more than max_block_size of uncompressed data - block must be truncated. + int64 max_block_size = 8; // will be 512kb + + // Opaque blob, used for fast reconnects. + bytes connection_meta = 9; } + // Message that represents acknowledgment for sequence of client messages. This sequence is persisted together so write statistics is for messages batch. + message BatchWriteResponse { + // Sequence numbers of persisted client messages. + repeated int64 sequence_numbers = 1; + // Assigned partition offsets. + // Zero for skipped messages. + repeated int64 offsets = 2; + // Per message flag. False if message is written for the first time and True otherwise. + repeated bool already_written = 3; + + // Assigned partition for all client messages inside this batch. + int64 partition_id = 4; + + // Write statistics for this sequence of client messages. + WriteStatistics write_statistics = 5; + } + + message UpdateTokenResponse { + } + // Message with write statistics. - message WriteStatistics { + message WriteStatistics { // Time spent in persisting of data. - int64 persist_duration_ms = 1; + int64 persist_duration_ms = 1; // Time spent in queue before persisting. - int64 queued_in_partition_duration_ms = 2; + int64 queued_in_partition_duration_ms = 2; // Time spent awaiting for partition write quota. - int64 throttled_on_partition_duration_ms = 3; - // Time spent awaiting for topic write quota. - int64 throttled_on_topic_duration_ms = 4; + int64 throttled_on_partition_duration_ms = 3; + // Time spent awaiting for topic write quota. + int64 throttled_on_topic_duration_ms = 4; } -} +} -message Path { - // Path of object (topic/consumer). - string path = 1; -} +message Path { + // Path of object (topic/consumer). + string path = 1; +} -message KeyValue { - string key = 1; - string value = 2; +message KeyValue { + string key = 1; + string value = 2; } /** @@ -575,9 +575,9 @@ message StreamingReadServerMessageNew { // How many complete messages and imcomplete messages end parts (one at most) this block contains repeated int64 blocks_message_counts = 15; repeated int64 blocks_uncompressed_sizes = 16; - // In block format version 0 each byte contains only block codec identifier - repeated bytes blocks_headers = 17; - repeated bytes blocks_data = 18; + // In block format version 0 each byte contains only block codec identifier + repeated bytes blocks_headers = 17; + repeated bytes blocks_data = 18; // Zero if this is not first portion of data after resume or provided by client cookie otherwise. int64 resume_cookie = 50; diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/sdk_test_setup.h b/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/sdk_test_setup.h index 677eb8c03d..a2ab6f477c 100644 --- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/sdk_test_setup.h +++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/sdk_test_setup.h @@ -1,14 +1,14 @@ -#pragma once -#include "test_server.h" +#pragma once +#include "test_server.h" #include <ydb/library/persqueue/topic_parser_public/topic_parser.h> #include <library/cpp/logger/log.h> #include <util/system/tempfile.h> + +#define TEST_CASE_NAME (this->Name_) + +namespace NPersQueue { -#define TEST_CASE_NAME (this->Name_) - -namespace NPersQueue { - -class SDKTestSetup { +class SDKTestSetup { protected: TString TestCaseName; @@ -19,10 +19,10 @@ protected: TLog Log = TLog("cerr"); -public: +public: SDKTestSetup(const TString& testCaseName, bool start = true) : TestCaseName(testCaseName) - { + { InitOptions(); if (start) { Start(); @@ -31,14 +31,14 @@ public: void InitOptions() { Log.SetFormatter([testCaseName = TestCaseName](ELogPriority priority, TStringBuf message) { - return TStringBuilder() << TInstant::Now() << " :" << testCaseName << " " << priority << ": " << message << Endl; - }); - Server.GrpcServerOptions.SetGRpcShutdownDeadline(TDuration::Max()); - // Default TTestServer value for 'MaxReadCookies' is 10. With this value the tests are flapping with two errors: - // 1. 'got more than 10 unordered cookies to commit 12' - // 2. 'got more than 10 uncommitted reads' - Server.ServerSettings.PQConfig.Clear(); - Server.ServerSettings.PQConfig.SetEnabled(true); + return TStringBuilder() << TInstant::Now() << " :" << testCaseName << " " << priority << ": " << message << Endl; + }); + Server.GrpcServerOptions.SetGRpcShutdownDeadline(TDuration::Max()); + // Default TTestServer value for 'MaxReadCookies' is 10. With this value the tests are flapping with two errors: + // 1. 'got more than 10 unordered cookies to commit 12' + // 2. 'got more than 10 uncommitted reads' + Server.ServerSettings.PQConfig.Clear(); + Server.ServerSettings.PQConfig.SetEnabled(true); Server.ServerSettings.PQConfig.SetRemoteClusterEnabledDelaySec(1); Server.ServerSettings.PQConfig.SetCloseClientSessionWithEnabledRemotePreferredClusterDelaySec(1); Server.ServerSettings.PQClusterDiscoveryConfig.SetEnabled(true); @@ -53,7 +53,7 @@ public: void Start(bool waitInit = true, bool addBrokenDatacenter = false) { Server.StartServer(false); //Server.EnableLogs({NKikimrServices::PQ_WRITE_PROXY, NKikimrServices::PQ_READ_PROXY}); - Server.AnnoyingClient->InitRoot(); + Server.AnnoyingClient->InitRoot(); if (DataCenters.empty()) { THashMap<TString, NKikimr::NPersQueueTests::TPQTestClusterInfo> dataCenters; dataCenters.emplace("dc1", NKikimr::NPersQueueTests::TPQTestClusterInfo{TStringBuilder() << "localhost:" << Server.GrpcPort, true}); @@ -64,33 +64,33 @@ public: } else { Server.AnnoyingClient->InitDCs(DataCenters, LocalDC); } - Server.AnnoyingClient->InitSourceIds(); + Server.AnnoyingClient->InitSourceIds(); CreateTopic(GetTestTopic(), GetLocalCluster()); if (waitInit) { Server.WaitInit(GetTestTopic()); } - } - + } + TString GetTestTopic() const { - return "topic1"; - } - + return "topic1"; + } + TString GetTestClient() const { - return "test-reader"; - } - + return "test-reader"; + } + TString GetTestMessageGroupId() const { - return "test-message-group-id"; - } - + return "test-message-group-id"; + } + TString GetLocalCluster() const { return LocalDC; - } - + } + ui16 GetGrpcPort() const { return Server.GrpcPort; } - + NGrpc::TServerOptions& GetGrpcServerOptions() { return Server.GrpcServerOptions; } @@ -102,11 +102,11 @@ public: Server.ServerSettings.NetClassifierConfig.SetNetDataFilePath(NetDataFile->Name()); } - - TLog& GetLog() { - return Log; - } - + + TLog& GetLog() { + return Log; + } + template <class TConsumerOrProducer> void Start(const THolder<TConsumerOrProducer>& obj) { auto startFuture = obj->Start(); @@ -198,5 +198,5 @@ public: UNIT_ASSERT_C(describeResult->Record.GetPathDescription().HasPersQueueGroup(), describeResult->Record); Server.AnnoyingClient->KillTablet(*Server.CleverServer, describeResult->Record.GetPathDescription().GetPersQueueGroup().GetBalancerTabletID()); } -}; -} +}; +} diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/test_server.h b/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/test_server.h index 250ce03620..9e43a7c42e 100644 --- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/test_server.h +++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/test_server.h @@ -69,7 +69,7 @@ public: void EnableLogs(const TVector<NKikimrServices::EServiceKikimr> services, NActors::NLog::EPriority prio = NActors::NLog::PRI_DEBUG) { - Y_VERIFY(CleverServer != nullptr, "Start server before enabling logs"); + Y_VERIFY(CleverServer != nullptr, "Start server before enabling logs"); for (auto s : services) { CleverServer->GetRuntime()->SetLogPriority(s, prio); } diff --git a/ydb/services/persqueue_v1/grpc_pq_actor.h b/ydb/services/persqueue_v1/grpc_pq_actor.h index 65c8aca1db..1fc46216bb 100644 --- a/ydb/services/persqueue_v1/grpc_pq_actor.h +++ b/ydb/services/persqueue_v1/grpc_pq_actor.h @@ -52,14 +52,14 @@ const TString& TopicPrefix(const TActorContext& ctx); static const TDuration CHECK_ACL_DELAY = TDuration::Minutes(5); -// Codec ID size in bytes -constexpr ui32 CODEC_ID_SIZE = 1; +// Codec ID size in bytes +constexpr ui32 CODEC_ID_SIZE = 1; + - -template<typename TItem0, typename... TItems> -bool AllEqual(const TItem0& item0, const TItems&... items) { - return ((items == item0) && ... && true); -} +template<typename TItem0, typename... TItems> +bool AllEqual(const TItem0& item0, const TItems&... items) { + return ((items == item0) && ... && true); +} static inline bool InternalErrorCode(PersQueue::ErrorCode::ErrorCode errorCode) { switch(errorCode) { @@ -98,7 +98,7 @@ struct TCommitCookie { struct TEvPQProxy { enum EEv { - EvWriteInit = EventSpaceBegin(TKikimrEvents::ES_PQ_PROXY_NEW), // TODO: Replace 'NEW' with version or something + EvWriteInit = EventSpaceBegin(TKikimrEvents::ES_PQ_PROXY_NEW), // TODO: Replace 'NEW' with version or something EvWrite, EvDone, EvReadInit, @@ -123,7 +123,7 @@ struct TEvPQProxy { EvUpdateClusters, EvQueryCompiled, EvSessionDead, - EvSessionSetPreferredCluster, + EvSessionSetPreferredCluster, EvScheduleUpdateClusters, EvDeadlineExceeded, EvGetStatus, @@ -148,14 +148,14 @@ struct TEvPQProxy { TTopicTabletsPairs TopicAndTablets; }; - struct TEvSessionSetPreferredCluster : public NActors::TEventLocal<TEvSessionSetPreferredCluster, EvSessionSetPreferredCluster> { - TEvSessionSetPreferredCluster(const ui64 cookie, const TString& preferredCluster) - : Cookie(cookie) - , PreferredCluster(preferredCluster) - {} - const ui64 Cookie; - const TString PreferredCluster; - }; + struct TEvSessionSetPreferredCluster : public NActors::TEventLocal<TEvSessionSetPreferredCluster, EvSessionSetPreferredCluster> { + TEvSessionSetPreferredCluster(const ui64 cookie, const TString& preferredCluster) + : Cookie(cookie) + , PreferredCluster(preferredCluster) + {} + const ui64 Cookie; + const TString PreferredCluster; + }; struct TEvSessionDead : public NActors::TEventLocal<TEvSessionDead, EvSessionDead> { TEvSessionDead(const ui64 cookie) @@ -196,21 +196,21 @@ struct TEvPQProxy { struct TEvWriteInit : public NActors::TEventLocal<TEvWriteInit, EvWriteInit> { - TEvWriteInit(PersQueue::V1::StreamingWriteClientMessage&& req, const TString& peerName) + TEvWriteInit(PersQueue::V1::StreamingWriteClientMessage&& req, const TString& peerName) : Request(std::move(req)) , PeerName(peerName) { } - PersQueue::V1::StreamingWriteClientMessage Request; + PersQueue::V1::StreamingWriteClientMessage Request; TString PeerName; }; struct TEvWrite : public NActors::TEventLocal<TEvWrite, EvWrite> { - explicit TEvWrite(PersQueue::V1::StreamingWriteClientMessage&& req) + explicit TEvWrite(PersQueue::V1::StreamingWriteClientMessage&& req) : Request(std::move(req)) { } - PersQueue::V1::StreamingWriteClientMessage Request; + PersQueue::V1::StreamingWriteClientMessage Request; }; struct TEvDone : public NActors::TEventLocal<TEvDone, EvDone> { @@ -244,14 +244,14 @@ struct TEvPQProxy { ui64 ReadTimestampMs; }; - struct TEvUpdateToken : public NActors::TEventLocal<TEvUpdateToken, EvUpdateToken> { - explicit TEvUpdateToken(PersQueue::V1::StreamingWriteClientMessage&& req) - : Request(std::move(req)) - { } - - PersQueue::V1::StreamingWriteClientMessage Request; - }; - + struct TEvUpdateToken : public NActors::TEventLocal<TEvUpdateToken, EvUpdateToken> { + explicit TEvUpdateToken(PersQueue::V1::StreamingWriteClientMessage&& req) + : Request(std::move(req)) + { } + + PersQueue::V1::StreamingWriteClientMessage Request; + }; + struct TEvCloseSession : public NActors::TEventLocal<TEvCloseSession, EvCloseSession> { TEvCloseSession(const TString& reason, const PersQueue::ErrorCode::ErrorCode errorCode) : Reason(reason) @@ -453,7 +453,7 @@ struct TEvPQProxy { /// WRITE ACTOR class TWriteSessionActor : public NActors::TActorBootstrapped<TWriteSessionActor> { -using IContext = NGRpcServer::IGRpcStreamingContext<PersQueue::V1::StreamingWriteClientMessage, PersQueue::V1::StreamingWriteServerMessage>; +using IContext = NGRpcServer::IGRpcStreamingContext<PersQueue::V1::StreamingWriteClientMessage, PersQueue::V1::StreamingWriteServerMessage>; using TEvDescribeTopicsResponse = NMsgBusProxy::NPqMetaCacheV2::TEvPqNewMetaCache::TEvDescribeTopicsResponse; using TEvDescribeTopicsRequest = NMsgBusProxy::NPqMetaCacheV2::TEvPqNewMetaCache::TEvDescribeTopicsRequest; @@ -515,11 +515,11 @@ private: void Handle(NKqp::TEvKqp::TEvProcessResponse::TPtr &ev, const TActorContext &ctx); void CheckACL(const TActorContext& ctx); - // Requests fresh ACL from 'SchemeCache' + // Requests fresh ACL from 'SchemeCache' void InitCheckSchema(const TActorContext& ctx, bool needWaitSchema = false); void Handle(TEvPQProxy::TEvWriteInit::TPtr& ev, const NActors::TActorContext& ctx); void Handle(TEvPQProxy::TEvWrite::TPtr& ev, const NActors::TActorContext& ctx); - void Handle(TEvPQProxy::TEvUpdateToken::TPtr& ev, const NActors::TActorContext& ctx); + void Handle(TEvPQProxy::TEvUpdateToken::TPtr& ev, const NActors::TActorContext& ctx); void Handle(TEvPQProxy::TEvDone::TPtr& ev, const NActors::TActorContext& ctx); void Handle(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev, const TActorContext& ctx); void Handle(TEvDescribeTopicsResponse::TPtr& ev, const TActorContext& ctx); @@ -581,8 +581,8 @@ private: NPersQueue::TConverterPtr TopicConverter; ui32 Partition; ui32 PreferedPartition; - // 'SourceId' is called 'MessageGroupId' since gRPC data plane API v1 - TString SourceId; // TODO: Replace with 'MessageGroupId' everywhere + // 'SourceId' is called 'MessageGroupId' since gRPC data plane API v1 + TString SourceId; // TODO: Replace with 'MessageGroupId' everywhere TString EscapedSourceId; ui32 Hash = 0; @@ -592,7 +592,7 @@ private: ui32 NumReserveBytesRequests; THolder<TAclWrapper> ACL; - + struct TWriteRequestBatchInfo: public TSimpleRefCount<TWriteRequestBatchInfo> { using TPtr = TIntrusivePtr<TWriteRequestBatchInfo>; @@ -615,7 +615,7 @@ private: // Requests that is already sent to partition actor std::deque<TWriteRequestBatchInfo::TPtr> SentMessages; - + bool WritesDone; THashMap<ui32, ui64> PartitionToTablet; @@ -639,9 +639,9 @@ private: TIntrusivePtr<NACLib::TUserToken> Token; TString Auth; - // Got 'update_token_request', authentication or authorization in progress or 'update_token_response' is not sent yet. Only single 'update_token_request' is allowed inflight - bool UpdateTokenInProgress; - bool UpdateTokenAuthenticated; + // Got 'update_token_request', authentication or authorization in progress or 'update_token_response' is not sent yet. Only single 'update_token_request' is allowed inflight + bool UpdateTokenInProgress; + bool UpdateTokenAuthenticated; bool ACLCheckInProgress; bool FirstACLCheck; bool RequestNotChecked; @@ -651,8 +651,8 @@ private: ui64 BalancerTabletId; TActorId PipeToBalancer; - // PQ tablet configuration that we get at the time of session initialization - NKikimrPQ::TPQTabletConfig InitialPQTabletConfig; + // PQ tablet configuration that we get at the time of session initialization + NKikimrPQ::TPQTabletConfig InitialPQTabletConfig; NKikimrPQClient::TDataChunk InitMeta; TString LocalDC; diff --git a/ydb/services/persqueue_v1/grpc_pq_codecs.cpp b/ydb/services/persqueue_v1/grpc_pq_codecs.cpp index d9cbfeb411..5415fc1576 100644 --- a/ydb/services/persqueue_v1/grpc_pq_codecs.cpp +++ b/ydb/services/persqueue_v1/grpc_pq_codecs.cpp @@ -1,30 +1,30 @@ -#include "grpc_pq_codecs.h" +#include "grpc_pq_codecs.h" #include <ydb/core/protos/grpc_pq_old.pb.h> #include <ydb/library/persqueue/topic_parser/topic_parser.h> #include <util/generic/algorithm.h> -#include <util/generic/fwd.h> -#include <util/string/builder.h> - -namespace NKikimr::NGRpcProxy { - bool ValidateWriteWithCodec(const NKikimrPQ::TPQTabletConfig& pqTabletConfig, const ui32 codecID, TString& error) { - error.clear(); - - if (pqTabletConfig.has_codecs() /* empty codecs that any codec is allowed for migration purposes */) { - const auto& ids = pqTabletConfig.codecs().ids(); - if (!ids.empty() && Find(ids, codecID) == ids.end()) { - const auto& names = pqTabletConfig.codecs().codecs(); - Y_VERIFY(ids.size() == names.size(), "PQ tabled supported codecs configuration is invalid"); - TStringBuilder errorBuilder; - errorBuilder << "given codec (id " << static_cast<i32>(codecID) << ") is not configured for the topic. Configured codecs are " << names[0] << " (id " << ids[0] << ")"; - for (i32 i = 1; i != ids.size(); ++i) { - errorBuilder << ", " << names[i] << " (id " << ids[i] << ")"; - } - error = errorBuilder; - return false; - } - } - - return true; - } -} +#include <util/generic/fwd.h> +#include <util/string/builder.h> + +namespace NKikimr::NGRpcProxy { + bool ValidateWriteWithCodec(const NKikimrPQ::TPQTabletConfig& pqTabletConfig, const ui32 codecID, TString& error) { + error.clear(); + + if (pqTabletConfig.has_codecs() /* empty codecs that any codec is allowed for migration purposes */) { + const auto& ids = pqTabletConfig.codecs().ids(); + if (!ids.empty() && Find(ids, codecID) == ids.end()) { + const auto& names = pqTabletConfig.codecs().codecs(); + Y_VERIFY(ids.size() == names.size(), "PQ tabled supported codecs configuration is invalid"); + TStringBuilder errorBuilder; + errorBuilder << "given codec (id " << static_cast<i32>(codecID) << ") is not configured for the topic. Configured codecs are " << names[0] << " (id " << ids[0] << ")"; + for (i32 i = 1; i != ids.size(); ++i) { + errorBuilder << ", " << names[i] << " (id " << ids[i] << ")"; + } + error = errorBuilder; + return false; + } + } + + return true; + } +} diff --git a/ydb/services/persqueue_v1/grpc_pq_codecs.h b/ydb/services/persqueue_v1/grpc_pq_codecs.h index 005aeaa2d8..c3d2cb3734 100644 --- a/ydb/services/persqueue_v1/grpc_pq_codecs.h +++ b/ydb/services/persqueue_v1/grpc_pq_codecs.h @@ -1,8 +1,8 @@ -#pragma once +#pragma once #include <ydb/core/protos/flat_scheme_op.pb.h> -#include <util/generic/fwd.h> - -namespace NKikimr::NGRpcProxy { - // Validates that client can safely write to the topic data compressed with specific codec - bool ValidateWriteWithCodec(const NKikimrPQ::TPQTabletConfig& pqTabletConfig, const ui32 codecID, TString& error); -} +#include <util/generic/fwd.h> + +namespace NKikimr::NGRpcProxy { + // Validates that client can safely write to the topic data compressed with specific codec + bool ValidateWriteWithCodec(const NKikimrPQ::TPQTabletConfig& pqTabletConfig, const ui32 codecID, TString& error); +} diff --git a/ydb/services/persqueue_v1/grpc_pq_read.cpp b/ydb/services/persqueue_v1/grpc_pq_read.cpp index 6b7e72fdd0..bb6137d99c 100644 --- a/ydb/services/persqueue_v1/grpc_pq_read.cpp +++ b/ydb/services/persqueue_v1/grpc_pq_read.cpp @@ -17,7 +17,7 @@ namespace V1 { /////////////////////////////////////////////////////////////////////////////// -using namespace PersQueue::V1; +using namespace PersQueue::V1; diff --git a/ydb/services/persqueue_v1/grpc_pq_read_actor.cpp b/ydb/services/persqueue_v1/grpc_pq_read_actor.cpp index 654edfcfcd..384d631809 100644 --- a/ydb/services/persqueue_v1/grpc_pq_read_actor.cpp +++ b/ydb/services/persqueue_v1/grpc_pq_read_actor.cpp @@ -32,7 +32,7 @@ using namespace NMsgBusProxy; namespace NGRpcProxy { namespace V1 { -using namespace PersQueue::V1; +using namespace PersQueue::V1; #define PQ_LOG_PREFIX "session cookie " << Cookie << " consumer " << ClientPath << " session " << Session @@ -2982,8 +2982,8 @@ Ydb::StatusIds::StatusCode ConvertPersQueueInternalCodeToStatus(const PersQueue: case INITIALIZING: case CLUSTER_DISABLED: return Ydb::StatusIds::UNAVAILABLE; - case PREFERRED_CLUSTER_MISMATCHED: - return Ydb::StatusIds::ABORTED; + case PREFERRED_CLUSTER_MISMATCHED: + return Ydb::StatusIds::ABORTED; case OVERLOAD: return Ydb::StatusIds::OVERLOADED; case BAD_REQUEST: diff --git a/ydb/services/persqueue_v1/grpc_pq_write.cpp b/ydb/services/persqueue_v1/grpc_pq_write.cpp index 5f17a2f3ab..816cc63579 100644 --- a/ydb/services/persqueue_v1/grpc_pq_write.cpp +++ b/ydb/services/persqueue_v1/grpc_pq_write.cpp @@ -14,7 +14,7 @@ namespace NKikimr { namespace NGRpcProxy { namespace V1 { -using namespace PersQueue::V1; +using namespace PersQueue::V1; /////////////////////////////////////////////////////////////////////////////// @@ -65,7 +65,7 @@ void TPQWriteService::Handle(NNetClassifier::TEvNetClassifier::TEvClassifierUpda } -void TPQWriteService::Handle(NPQ::NClusterTracker::TEvClusterTracker::TEvClustersUpdate::TPtr& ev, const TActorContext& ctx) { +void TPQWriteService::Handle(NPQ::NClusterTracker::TEvClusterTracker::TEvClustersUpdate::TPtr& ev, const TActorContext& ctx) { Y_VERIFY(ev->Get()->ClustersList); Y_VERIFY(ev->Get()->ClustersList->Clusters.size()); @@ -74,92 +74,92 @@ void TPQWriteService::Handle(NPQ::NClusterTracker::TEvClusterTracker::TEvCluster LocalCluster = ""; Enabled = false; - // Rebalance load on installation clusters: if preferred cluster is enabled and session is alive long enough close it so client can recreate it in preferred cluster - auto remoteClusterEnabledDelay = TDuration::Seconds(AppData(ctx)->PQConfig.GetRemoteClusterEnabledDelaySec()); - auto closeClientSessionWithEnabledRemotePreferredClusterDelay = TDuration::Seconds(AppData(ctx)->PQConfig.GetCloseClientSessionWithEnabledRemotePreferredClusterDelaySec()); - const auto clustersListUpdatedAt = ev->Get()->ClustersListUpdateTimestamp ? *ev->Get()->ClustersListUpdateTimestamp : TInstant::Now(); - THashSet<TString> remoteClusters; - THashSet<TString> rebalanceClusters; - for (const auto& cluster : clusters) { - if (cluster.IsLocal) { - LocalCluster = cluster.Name; - Enabled = cluster.IsEnabled; - continue; - } - - remoteClusters.emplace(cluster.Name); - - if (!cluster.IsEnabled) { - ClustersEnabledAt.erase(cluster.Name); - continue; - } - - if (!ClustersEnabledAt.contains(cluster.Name)) { - ClustersEnabledAt[cluster.Name] = clustersListUpdatedAt; - } - - const bool readyToCreateSessions = ClustersEnabledAt[cluster.Name] <= (TInstant::Now() - remoteClusterEnabledDelay); - if (readyToCreateSessions) { - rebalanceClusters.emplace(cluster.Name); - } - } - - if (!Enabled) { - for (auto it = Sessions.begin(); it != Sessions.end(); ++it) { - Send(it->second, new TEvPQProxy::TEvDieCommand("cluster disabled", PersQueue::ErrorCode::CLUSTER_DISABLED)); - } - return; - } - - for (const auto& sessionsByPreferredCluster : SessionsByRemotePreferredCluster) { - const auto& cluster = sessionsByPreferredCluster.first; - if (rebalanceClusters.contains(cluster) || !remoteClusters.contains(cluster)) { - const TString closeReason = TStringBuilder() << "Session preferred cluster " << cluster.Quote() - << (remoteClusters.contains(cluster) ? " is enabled for at least " + ToString(closeClientSessionWithEnabledRemotePreferredClusterDelay) : " is unknown") - << " and session is older than " << closeClientSessionWithEnabledRemotePreferredClusterDelay; - - const auto closeUpToCreatedAt = TInstant::Now() - closeClientSessionWithEnabledRemotePreferredClusterDelay; - - for (const auto& session : sessionsByPreferredCluster.second) { - const auto& createdAt = session.second; - if (createdAt <= closeUpToCreatedAt) { - const auto& workerID = Sessions[session.first]; - Send(workerID, new TEvPQProxy::TEvDieCommand(closeReason, PersQueue::ErrorCode::PREFERRED_CLUSTER_MISMATCHED)); - } + // Rebalance load on installation clusters: if preferred cluster is enabled and session is alive long enough close it so client can recreate it in preferred cluster + auto remoteClusterEnabledDelay = TDuration::Seconds(AppData(ctx)->PQConfig.GetRemoteClusterEnabledDelaySec()); + auto closeClientSessionWithEnabledRemotePreferredClusterDelay = TDuration::Seconds(AppData(ctx)->PQConfig.GetCloseClientSessionWithEnabledRemotePreferredClusterDelaySec()); + const auto clustersListUpdatedAt = ev->Get()->ClustersListUpdateTimestamp ? *ev->Get()->ClustersListUpdateTimestamp : TInstant::Now(); + THashSet<TString> remoteClusters; + THashSet<TString> rebalanceClusters; + for (const auto& cluster : clusters) { + if (cluster.IsLocal) { + LocalCluster = cluster.Name; + Enabled = cluster.IsEnabled; + continue; + } + + remoteClusters.emplace(cluster.Name); + + if (!cluster.IsEnabled) { + ClustersEnabledAt.erase(cluster.Name); + continue; + } + + if (!ClustersEnabledAt.contains(cluster.Name)) { + ClustersEnabledAt[cluster.Name] = clustersListUpdatedAt; + } + + const bool readyToCreateSessions = ClustersEnabledAt[cluster.Name] <= (TInstant::Now() - remoteClusterEnabledDelay); + if (readyToCreateSessions) { + rebalanceClusters.emplace(cluster.Name); + } + } + + if (!Enabled) { + for (auto it = Sessions.begin(); it != Sessions.end(); ++it) { + Send(it->second, new TEvPQProxy::TEvDieCommand("cluster disabled", PersQueue::ErrorCode::CLUSTER_DISABLED)); + } + return; + } + + for (const auto& sessionsByPreferredCluster : SessionsByRemotePreferredCluster) { + const auto& cluster = sessionsByPreferredCluster.first; + if (rebalanceClusters.contains(cluster) || !remoteClusters.contains(cluster)) { + const TString closeReason = TStringBuilder() << "Session preferred cluster " << cluster.Quote() + << (remoteClusters.contains(cluster) ? " is enabled for at least " + ToString(closeClientSessionWithEnabledRemotePreferredClusterDelay) : " is unknown") + << " and session is older than " << closeClientSessionWithEnabledRemotePreferredClusterDelay; + + const auto closeUpToCreatedAt = TInstant::Now() - closeClientSessionWithEnabledRemotePreferredClusterDelay; + + for (const auto& session : sessionsByPreferredCluster.second) { + const auto& createdAt = session.second; + if (createdAt <= closeUpToCreatedAt) { + const auto& workerID = Sessions[session.first]; + Send(workerID, new TEvPQProxy::TEvDieCommand(closeReason, PersQueue::ErrorCode::PREFERRED_CLUSTER_MISMATCHED)); + } } } } } -void TPQWriteService::Handle(TEvPQProxy::TEvSessionSetPreferredCluster::TPtr& ev, const TActorContext& ctx) { - const auto& cookie = ev->Get()->Cookie; - const auto& preferredCluster = ev->Get()->PreferredCluster; - if (!Sessions.contains(cookie)) { - LOG_ERROR_S(ctx, NKikimrServices::PQ_WRITE_PROXY, TStringBuilder() << "Got TEvSessionSetPreferredCluster message from session with cookie " << cookie << " that is not in session collection"); - return; - } - if (!preferredCluster.empty() && *LocalCluster != preferredCluster) { - SessionsByRemotePreferredCluster[preferredCluster][cookie] = TInstant::Now(); - RemotePreferredClusterBySessionCookie[cookie] = std::move(preferredCluster); - } -} +void TPQWriteService::Handle(TEvPQProxy::TEvSessionSetPreferredCluster::TPtr& ev, const TActorContext& ctx) { + const auto& cookie = ev->Get()->Cookie; + const auto& preferredCluster = ev->Get()->PreferredCluster; + if (!Sessions.contains(cookie)) { + LOG_ERROR_S(ctx, NKikimrServices::PQ_WRITE_PROXY, TStringBuilder() << "Got TEvSessionSetPreferredCluster message from session with cookie " << cookie << " that is not in session collection"); + return; + } + if (!preferredCluster.empty() && *LocalCluster != preferredCluster) { + SessionsByRemotePreferredCluster[preferredCluster][cookie] = TInstant::Now(); + RemotePreferredClusterBySessionCookie[cookie] = std::move(preferredCluster); + } +} void TPQWriteService::Handle(TEvPQProxy::TEvSessionDead::TPtr& ev, const TActorContext&) { - const auto& cookie = ev->Get()->Cookie; - Sessions.erase(cookie); - if (RemotePreferredClusterBySessionCookie.contains(cookie)) { - const auto& preferredCluster = RemotePreferredClusterBySessionCookie[cookie]; - SessionsByRemotePreferredCluster[preferredCluster].erase(cookie); - if (SessionsByRemotePreferredCluster[preferredCluster].empty()) { - SessionsByRemotePreferredCluster.erase(preferredCluster); - } - RemotePreferredClusterBySessionCookie.erase(cookie); - } + const auto& cookie = ev->Get()->Cookie; + Sessions.erase(cookie); + if (RemotePreferredClusterBySessionCookie.contains(cookie)) { + const auto& preferredCluster = RemotePreferredClusterBySessionCookie[cookie]; + SessionsByRemotePreferredCluster[preferredCluster].erase(cookie); + if (SessionsByRemotePreferredCluster[preferredCluster].empty()) { + SessionsByRemotePreferredCluster.erase(preferredCluster); + } + RemotePreferredClusterBySessionCookie.erase(cookie); + } } -StreamingWriteServerMessage FillWriteResponse(const TString& errorReason, const PersQueue::ErrorCode::ErrorCode code) { - StreamingWriteServerMessage res; +StreamingWriteServerMessage FillWriteResponse(const TString& errorReason, const PersQueue::ErrorCode::ErrorCode code) { + StreamingWriteServerMessage res; FillIssue(res.add_issues(), code, errorReason); res.set_status(ConvertPersQueueInternalCodeToStatus(code)); return res; diff --git a/ydb/services/persqueue_v1/grpc_pq_write.h b/ydb/services/persqueue_v1/grpc_pq_write.h index 73148388b2..d18355dc60 100644 --- a/ydb/services/persqueue_v1/grpc_pq_write.h +++ b/ydb/services/persqueue_v1/grpc_pq_write.h @@ -43,9 +43,9 @@ private: STFUNC(StateFunc) { switch (ev->GetTypeRewrite()) { HFunc(NKikimr::NGRpcService::TEvStreamPQWriteRequest, Handle); - HFunc(NPQ::NClusterTracker::TEvClusterTracker::TEvClustersUpdate, Handle); + HFunc(NPQ::NClusterTracker::TEvClusterTracker::TEvClustersUpdate, Handle); HFunc(TEvPQProxy::TEvSessionDead, Handle); - HFunc(TEvPQProxy::TEvSessionSetPreferredCluster, Handle); + HFunc(TEvPQProxy::TEvSessionSetPreferredCluster, Handle); HFunc(NNetClassifier::TEvNetClassifier::TEvClassifierUpdate, Handle); } @@ -53,10 +53,10 @@ private: private: void Handle(NKikimr::NGRpcService::TEvStreamPQWriteRequest::TPtr& ev, const TActorContext& ctx); - void Handle(NPQ::NClusterTracker::TEvClusterTracker::TEvClustersUpdate::TPtr& ev, const TActorContext& ctx); + void Handle(NPQ::NClusterTracker::TEvClusterTracker::TEvClustersUpdate::TPtr& ev, const TActorContext& ctx); void Handle(NNetClassifier::TEvNetClassifier::TEvClassifierUpdate::TPtr& ev, const TActorContext& ctx); - void Handle(TEvPQProxy::TEvSessionSetPreferredCluster::TPtr& ev, const TActorContext& ctx); + void Handle(TEvPQProxy::TEvSessionSetPreferredCluster::TPtr& ev, const TActorContext& ctx); void Handle(TEvPQProxy::TEvSessionDead::TPtr& ev, const TActorContext& ctx); NActors::TActorId SchemeCache; @@ -65,11 +65,11 @@ private: TAtomic LastCookie = 0; THashMap<ui64, TActorId> Sessions; - // Created at by session cookie map by remote preferred cluster name - THashMap<TString, THashMap<ui64, TInstant>> SessionsByRemotePreferredCluster; - THashMap<ui64, TString> RemotePreferredClusterBySessionCookie; - // Cluster enabled at time if cluster is currently enabled - THashMap<TString, TInstant> ClustersEnabledAt; + // Created at by session cookie map by remote preferred cluster name + THashMap<TString, THashMap<ui64, TInstant>> SessionsByRemotePreferredCluster; + THashMap<ui64, TString> RemotePreferredClusterBySessionCookie; + // Cluster enabled at time if cluster is currently enabled + THashMap<TString, TInstant> ClustersEnabledAt; TIntrusivePtr<NMonitoring::TDynamicCounters> Counters; diff --git a/ydb/services/persqueue_v1/grpc_pq_write_actor.cpp b/ydb/services/persqueue_v1/grpc_pq_write_actor.cpp index 3038cc82a6..a706a1cdd5 100644 --- a/ydb/services/persqueue_v1/grpc_pq_write_actor.cpp +++ b/ydb/services/persqueue_v1/grpc_pq_write_actor.cpp @@ -14,7 +14,7 @@ #include <util/string/hex.h> #include <util/string/vector.h> #include <util/string/escape.h> -#include <util/string/printf.h> +#include <util/string/printf.h> using namespace NActors; using namespace NKikimrClient; @@ -76,7 +76,7 @@ void FillChunkDataFromReq( namespace NGRpcProxy { namespace V1 { -using namespace Ydb::PersQueue::V1; +using namespace Ydb::PersQueue::V1; static const ui32 MAX_RESERVE_REQUESTS_INFLIGHT = 5; @@ -131,11 +131,11 @@ TWriteSessionActor::TWriteSessionActor( , NextRequestInited(false) , NextRequestCookie(0) , Token(nullptr) - , UpdateTokenInProgress(false) - , UpdateTokenAuthenticated(false) + , UpdateTokenInProgress(false) + , UpdateTokenAuthenticated(false) , ACLCheckInProgress(false) , FirstACLCheck(true) - , RequestNotChecked(false) + , RequestNotChecked(false) , LastACLCheckTimestamp(TInstant::Zero()) , LogSessionDeadline(TInstant::Zero()) , BalancerTabletId(0) @@ -191,27 +191,27 @@ TString WriteRequestToLog(const Ydb::PersQueue::V1::StreamingWriteClientMessage& void TWriteSessionActor::Handle(IContext::TEvReadFinished::TPtr& ev, const TActorContext& ctx) { LOG_DEBUG_S(ctx, NKikimrServices::PQ_WRITE_PROXY, "session v1 cookie: " << Cookie << " sessionId: " << OwnerCookie << " grpc read done: success: " << ev->Get()->Success << " data: " << WriteRequestToLog(ev->Get()->Record)); - if (!ev->Get()->Success) { - LOG_INFO_S(ctx, NKikimrServices::PQ_WRITE_PROXY, "session v1 cookie: " << Cookie << " sessionId: " << OwnerCookie << " grpc read failed"); - ctx.Send(ctx.SelfID, new TEvPQProxy::TEvDone()); - return; - } - - switch(ev->Get()->Record.client_message_case()) { - case StreamingWriteClientMessage::kInitRequest: + if (!ev->Get()->Success) { + LOG_INFO_S(ctx, NKikimrServices::PQ_WRITE_PROXY, "session v1 cookie: " << Cookie << " sessionId: " << OwnerCookie << " grpc read failed"); + ctx.Send(ctx.SelfID, new TEvPQProxy::TEvDone()); + return; + } + + switch(ev->Get()->Record.client_message_case()) { + case StreamingWriteClientMessage::kInitRequest: ctx.Send(ctx.SelfID, new TEvPQProxy::TEvWriteInit(std::move(ev->Get()->Record), Request->GetStreamCtx()->GetPeerName())); break; - case StreamingWriteClientMessage::kWriteRequest: + case StreamingWriteClientMessage::kWriteRequest: ctx.Send(ctx.SelfID, new TEvPQProxy::TEvWrite(std::move(ev->Get()->Record))); break; - case StreamingWriteClientMessage::kUpdateTokenRequest: { - ctx.Send(ctx.SelfID, new TEvPQProxy::TEvUpdateToken(std::move(ev->Get()->Record))); - break; - } - case StreamingWriteClientMessage::CLIENT_MESSAGE_NOT_SET: { - CloseSession("'client_message' is not set", PersQueue::ErrorCode::BAD_REQUEST, ctx); - return; - } + case StreamingWriteClientMessage::kUpdateTokenRequest: { + ctx.Send(ctx.SelfID, new TEvPQProxy::TEvUpdateToken(std::move(ev->Get()->Record))); + break; + } + case StreamingWriteClientMessage::CLIENT_MESSAGE_NOT_SET: { + CloseSession("'client_message' is not set", PersQueue::ErrorCode::BAD_REQUEST, ctx); + return; + } } } @@ -261,7 +261,7 @@ void TWriteSessionActor::Handle(TEvPQProxy::TEvDone::TPtr&, const TActorContext& void TWriteSessionActor::CheckACL(const TActorContext& ctx) { //Y_VERIFY(ACLCheckInProgress); - + NACLib::EAccessRights rights = NACLib::EAccessRights::UpdateRow; Y_VERIFY(ACL); @@ -271,16 +271,16 @@ void TWriteSessionActor::CheckACL(const TActorContext& ctx) { FirstACLCheck = false; DiscoverPartition(ctx); } - if (UpdateTokenInProgress && UpdateTokenAuthenticated) { - UpdateTokenInProgress = false; - StreamingWriteServerMessage serverMessage; - serverMessage.set_status(Ydb::StatusIds::SUCCESS); - serverMessage.mutable_update_token_response(); - if (!Request->GetStreamCtx()->Write(std::move(serverMessage))) { - LOG_INFO_S(ctx, NKikimrServices::PQ_WRITE_PROXY, "session v1 cookie: " << Cookie << " sessionId: " << OwnerCookie << " grpc write failed"); - Die(ctx); - } - } + if (UpdateTokenInProgress && UpdateTokenAuthenticated) { + UpdateTokenInProgress = false; + StreamingWriteServerMessage serverMessage; + serverMessage.set_status(Ydb::StatusIds::SUCCESS); + serverMessage.mutable_update_token_response(); + if (!Request->GetStreamCtx()->Write(std::move(serverMessage))) { + LOG_INFO_S(ctx, NKikimrServices::PQ_WRITE_PROXY, "session v1 cookie: " << Cookie << " sessionId: " << OwnerCookie << " grpc write failed"); + Die(ctx); + } + } } else { TString errorReason = Sprintf("access to topic '%s' denied for '%s' due to 'no WriteTopic rights', Marker# PQ1125", TopicConverter->GetClientsideName().c_str(), @@ -297,10 +297,10 @@ void TWriteSessionActor::Handle(TEvPQProxy::TEvWriteInit::TPtr& ev, const TActor CloseSession("got second init request", PersQueue::ErrorCode::BAD_REQUEST, ctx); return; } - const auto& init = event->Request.init_request(); + const auto& init = event->Request.init_request(); - if (init.topic().empty() || init.message_group_id().empty()) { - CloseSession("no topic or message_group_id in init request", PersQueue::ErrorCode::BAD_REQUEST, ctx); + if (init.topic().empty() || init.message_group_id().empty()) { + CloseSession("no topic or message_group_id in init request", PersQueue::ErrorCode::BAD_REQUEST, ctx); return; } @@ -315,7 +315,7 @@ void TWriteSessionActor::Handle(TEvPQProxy::TEvWriteInit::TPtr& ev, const TActor PeerName = event->PeerName; - SourceId = init.message_group_id(); + SourceId = init.message_group_id(); TString encodedSourceId; try { encodedSourceId = NPQ::NSourceIdEncoding::Encode(SourceId); @@ -343,7 +343,7 @@ void TWriteSessionActor::Handle(TEvPQProxy::TEvWriteInit::TPtr& ev, const TActor InitCheckSchema(ctx, true); - PreferedPartition = init.partition_group_id() > 0 ? init.partition_group_id() - 1 : Max<ui32>(); + PreferedPartition = init.partition_group_id() > 0 ? init.partition_group_id() - 1 : Max<ui32>(); InitMeta = GetInitialDataChunk(init, TopicConverter->GetFullLegacyName(), PeerName); // ToDo[migration] - check? @@ -353,11 +353,11 @@ void TWriteSessionActor::Handle(TEvPQProxy::TEvWriteInit::TPtr& ev, const TActor SLITotal = NKikimr::NPQ::TMultiCounter(subGroup, Aggr, {}, {"RequestsTotal"}, true, "sensor", false); SLIErrors = NKikimr::NPQ::TMultiCounter(subGroup, Aggr, {}, {"RequestsError"}, true, "sensor", false); SLITotal.Inc(); - - const auto& preferredCluster = init.preferred_cluster(); - if (!preferredCluster.empty()) { - Send(GetPQWriteServiceActorID(), new TEvPQProxy::TEvSessionSetPreferredCluster(Cookie, preferredCluster)); - } + + const auto& preferredCluster = init.preferred_cluster(); + if (!preferredCluster.empty()) { + Send(GetPQWriteServiceActorID(), new TEvPQProxy::TEvSessionSetPreferredCluster(Cookie, preferredCluster)); + } } void TWriteSessionActor::SetupCounters() @@ -442,8 +442,8 @@ void TWriteSessionActor::Handle(TEvDescribeTopicsResponse::TPtr& ev, const TActo if (Request->GetInternalToken().empty()) { // session without auth if (AppData(ctx)->PQConfig.GetRequireCredentialsInNewProtocol()) { - Request->ReplyUnauthenticated("Unauthenticated access is forbidden, please provide credentials"); - Die(ctx); + Request->ReplyUnauthenticated("Unauthenticated access is forbidden, please provide credentials"); + Die(ctx); return; } Y_VERIFY(FirstACLCheck); @@ -580,7 +580,7 @@ void TWriteSessionActor::Handle(NKqp::TEvKqp::TEvQueryResponse::TPtr &ev, const auto& record = ev->Get()->Record.GetRef(); if (record.GetYdbStatus() == Ydb::StatusIds::ABORTED) { - LOG_INFO_S(ctx, NKikimrServices::PQ_WRITE_PROXY, "session v1 cookie: " << Cookie << " sessionId: " << OwnerCookie << " messageGroupId " + LOG_INFO_S(ctx, NKikimrServices::PQ_WRITE_PROXY, "session v1 cookie: " << Cookie << " sessionId: " << OwnerCookie << " messageGroupId " << SourceId << " escaped " << EscapedSourceId << " discover partition race, retrying"); DiscoverPartition(ctx); return; @@ -609,7 +609,7 @@ void TWriteSessionActor::Handle(NKqp::TEvKqp::TEvQueryResponse::TPtr &ev, const if (tt.HasOptional() && tt.GetOptional().HasUint32()) { //already got partition Partition = tt.GetOptional().GetUint32(); if (PreferedPartition < Max<ui32>() && Partition != PreferedPartition) { - CloseSession(TStringBuilder() << "MessageGroupId " << SourceId << " is already bound to PartitionGroupId " << (Partition + 1) << ", but client provided " << (PreferedPartition + 1) << ". MessageGroupId->PartitionGroupId binding cannot be changed, either use another MessageGroupId, specify PartitionGroupId " << (Partition + 1) << ", or do not specify PartitionGroupId at all.", + CloseSession(TStringBuilder() << "MessageGroupId " << SourceId << " is already bound to PartitionGroupId " << (Partition + 1) << ", but client provided " << (PreferedPartition + 1) << ". MessageGroupId->PartitionGroupId binding cannot be changed, either use another MessageGroupId, specify PartitionGroupId " << (Partition + 1) << ", or do not specify PartitionGroupId at all.", PersQueue::ErrorCode::BAD_REQUEST, ctx); return; } @@ -618,7 +618,7 @@ void TWriteSessionActor::Handle(NKqp::TEvKqp::TEvQueryResponse::TPtr &ev, const } } - LOG_INFO_S(ctx, NKikimrServices::PQ_WRITE_PROXY, "session v1 cookie: " << Cookie << " sessionId: " << OwnerCookie << " messageGroupId " + LOG_INFO_S(ctx, NKikimrServices::PQ_WRITE_PROXY, "session v1 cookie: " << Cookie << " sessionId: " << OwnerCookie << " messageGroupId " << SourceId << " escaped " << EscapedSourceId << " hash " << Hash << " partition " << Partition << " partitions " << PartitionToTablet.size() << "(" << Hash % PartitionToTablet.size() << ") create " << SourceIdCreateTime << " result " << t); @@ -728,7 +728,7 @@ void TWriteSessionActor::CloseSession(const TString& errorReason, const PersQueu ++(*GetServiceCounters(Counters, "pqproxy|writeSession")->GetCounter("Errors", true)); } - StreamingWriteServerMessage result; + StreamingWriteServerMessage result; result.set_status(ConvertPersQueueInternalCodeToStatus(errorCode)); FillIssue(result.add_issues(), errorCode, errorReason); @@ -879,7 +879,7 @@ void TWriteSessionActor::Handle(NPQ::TEvPartitionWriter::TEvWriteResponse::TPtr& stat->set_persist_duration_ms( Max((i64)res.GetWriteTimeMs(), stat->persist_duration_ms())); }; - + ui32 partitionCmdWriteResultIndex = 0; // TODO: Send single batch write response for all user write requests up to some max size/count for (const auto& userWriteRequest : writeRequest->UserWriteRequests) { @@ -887,7 +887,7 @@ void TWriteSessionActor::Handle(NPQ::TEvPartitionWriter::TEvWriteResponse::TPtr& result.set_status(Ydb::StatusIds::SUCCESS); auto batchWriteResponse = result.mutable_batch_write_response(); batchWriteResponse->set_partition_id(Partition); - + for (size_t messageIndex = 0, endIndex = userWriteRequest->Request.write_request().sequence_numbers_size(); messageIndex != endIndex; ++messageIndex) { if (partitionCmdWriteResultIndex == resp.CmdWriteResultSize()) { CloseSession("too less responses from server", PersQueue::ErrorCode::ERROR, ctx); @@ -948,21 +948,21 @@ void TWriteSessionActor::GenerateNextWriteRequest(const TActorContext& ctx) { Writes.clear(); i64 diff = 0; - auto addData = [&](const StreamingWriteClientMessage::WriteRequest& writeRequest, const i32 messageIndex) { + auto addData = [&](const StreamingWriteClientMessage::WriteRequest& writeRequest, const i32 messageIndex) { auto w = request.MutablePartitionRequest()->AddCmdWrite(); w->SetData(GetSerializedData(InitMeta, writeRequest, messageIndex)); - w->SetSeqNo(writeRequest.sequence_numbers(messageIndex)); + w->SetSeqNo(writeRequest.sequence_numbers(messageIndex)); w->SetSourceId(NPQ::NSourceIdEncoding::EncodeSimple(SourceId)); - w->SetCreateTimeMS(writeRequest.created_at_ms(messageIndex)); + w->SetCreateTimeMS(writeRequest.created_at_ms(messageIndex)); w->SetUncompressedSize(writeRequest.blocks_uncompressed_sizes(messageIndex)); w->SetClientDC(ClientDC); }; for (const auto& write : writeRequest->UserWriteRequests) { diff -= write->Request.ByteSize(); - const auto& writeRequest = write->Request.write_request(); - for (i32 messageIndex = 0; messageIndex != writeRequest.sequence_numbers_size(); ++messageIndex) { - addData(writeRequest, messageIndex); + const auto& writeRequest = write->Request.write_request(); + for (i32 messageIndex = 0; messageIndex != writeRequest.sequence_numbers_size(); ++messageIndex) { + addData(writeRequest, messageIndex); } } @@ -982,48 +982,48 @@ void TWriteSessionActor::GenerateNextWriteRequest(const TActorContext& ctx) { ++NumReserveBytesRequests; } -void TWriteSessionActor::Handle(TEvPQProxy::TEvUpdateToken::TPtr& ev, const TActorContext& ctx) { - if (State != ES_INITED) { - CloseSession("got 'update_token_request' but write session is not initialized", PersQueue::ErrorCode::BAD_REQUEST, ctx); - return; - } - if (UpdateTokenInProgress) { - CloseSession("got another 'update_token_request' while previous still in progress, only single token update is allowed at a time", PersQueue::ErrorCode::OVERLOAD, ctx); - return; - } - - const auto& token = ev->Get()->Request.update_token_request().token(); - if (token == Auth || (token.empty() && !AppData(ctx)->PQConfig.GetRequireCredentialsInNewProtocol())) { - // Got same token or empty token with no non-empty token requirement, do not trigger any checks - StreamingWriteServerMessage serverMessage; - serverMessage.set_status(Ydb::StatusIds::SUCCESS); - serverMessage.mutable_update_token_response(); - if (!Request->GetStreamCtx()->Write(std::move(serverMessage))) { - LOG_INFO_S(ctx, NKikimrServices::PQ_WRITE_PROXY, "session v1 cookie: " << Cookie << " sessionId: " << OwnerCookie << " grpc write failed"); - Die(ctx); - return; - } - } - else if (token.empty()) { - Request->ReplyUnauthenticated("'token' in 'update_token_request' is empty"); - Die(ctx); - return; - } - else { - UpdateTokenInProgress = true; - UpdateTokenAuthenticated = false; - Auth = token; - Request->RefreshToken(Auth, ctx, ctx.SelfID); - } - - NextRequestInited = true; - if (!Request->GetStreamCtx()->Read()) { - LOG_INFO_S(ctx, NKikimrServices::PQ_WRITE_PROXY, "session v1 cookie: " << Cookie << " sessionId: " << OwnerCookie << " grpc read failed"); - Die(ctx); - return; - } -} - +void TWriteSessionActor::Handle(TEvPQProxy::TEvUpdateToken::TPtr& ev, const TActorContext& ctx) { + if (State != ES_INITED) { + CloseSession("got 'update_token_request' but write session is not initialized", PersQueue::ErrorCode::BAD_REQUEST, ctx); + return; + } + if (UpdateTokenInProgress) { + CloseSession("got another 'update_token_request' while previous still in progress, only single token update is allowed at a time", PersQueue::ErrorCode::OVERLOAD, ctx); + return; + } + + const auto& token = ev->Get()->Request.update_token_request().token(); + if (token == Auth || (token.empty() && !AppData(ctx)->PQConfig.GetRequireCredentialsInNewProtocol())) { + // Got same token or empty token with no non-empty token requirement, do not trigger any checks + StreamingWriteServerMessage serverMessage; + serverMessage.set_status(Ydb::StatusIds::SUCCESS); + serverMessage.mutable_update_token_response(); + if (!Request->GetStreamCtx()->Write(std::move(serverMessage))) { + LOG_INFO_S(ctx, NKikimrServices::PQ_WRITE_PROXY, "session v1 cookie: " << Cookie << " sessionId: " << OwnerCookie << " grpc write failed"); + Die(ctx); + return; + } + } + else if (token.empty()) { + Request->ReplyUnauthenticated("'token' in 'update_token_request' is empty"); + Die(ctx); + return; + } + else { + UpdateTokenInProgress = true; + UpdateTokenAuthenticated = false; + Auth = token; + Request->RefreshToken(Auth, ctx, ctx.SelfID); + } + + NextRequestInited = true; + if (!Request->GetStreamCtx()->Read()) { + LOG_INFO_S(ctx, NKikimrServices::PQ_WRITE_PROXY, "session v1 cookie: " << Cookie << " sessionId: " << OwnerCookie << " grpc read failed"); + Die(ctx); + return; + } +} + void TWriteSessionActor::Handle(NGRpcService::TGRpcRequestProxy::TEvRefreshTokenResponse::TPtr &ev , const TActorContext& ctx) { Y_UNUSED(ctx); LOG_INFO_S(ctx, NKikimrServices::PQ_WRITE_PROXY, "updating token"); @@ -1031,10 +1031,10 @@ void TWriteSessionActor::Handle(NGRpcService::TGRpcRequestProxy::TEvRefreshToken if (ev->Get()->Authenticated && !ev->Get()->InternalToken.empty()) { Token = new NACLib::TUserToken(ev->Get()->InternalToken); Request->SetInternalToken(ev->Get()->InternalToken); - UpdateTokenAuthenticated = true; - if (!ACLCheckInProgress) { + UpdateTokenAuthenticated = true; + if (!ACLCheckInProgress) { InitCheckSchema(ctx); - } + } } else { Request->ReplyUnauthenticated("refreshed token is invalid"); Die(ctx); @@ -1051,65 +1051,65 @@ void TWriteSessionActor::Handle(TEvPQProxy::TEvWrite::TPtr& ev, const TActorCont return; } - const auto& writeRequest = ev->Get()->Request.write_request(); - if (!AllEqual(writeRequest.sequence_numbers_size(), writeRequest.created_at_ms_size(), writeRequest.sent_at_ms_size(), writeRequest.message_sizes_size())) { - CloseSession(TStringBuilder() << "messages meta repeated fields do not have same size, 'sequence_numbers' size is " << writeRequest.sequence_numbers_size() - << ", 'message_sizes' size is " << writeRequest.message_sizes_size() << ", 'created_at_ms' size is " << writeRequest.created_at_ms_size() - << " and 'sent_at_ms' size is " << writeRequest.sent_at_ms_size(), PersQueue::ErrorCode::BAD_REQUEST, ctx); - return; - } - if (!AllEqual(writeRequest.blocks_offsets_size(), writeRequest.blocks_part_numbers_size(), writeRequest.blocks_message_counts_size(), writeRequest.blocks_uncompressed_sizes_size(), writeRequest.blocks_headers_size(), writeRequest.blocks_data_size())) { - CloseSession(TStringBuilder() << "blocks repeated fields do no have same size, 'blocks_offsets' size is " << writeRequest.blocks_offsets_size() - << ", 'blocks_part_numbers' size is " << writeRequest.blocks_part_numbers_size() << ", 'blocks_message_counts' size is " << writeRequest.blocks_message_counts_size() - << ", 'blocks_uncompressed_sizes' size is " << writeRequest.blocks_uncompressed_sizes_size() << ", 'blocks_headers' size is " << writeRequest.blocks_headers_size() - << " and 'blocks_data' size is " << writeRequest.blocks_data_size(), PersQueue::ErrorCode::BAD_REQUEST, ctx); - return; - } - - const i32 messageCount = writeRequest.sequence_numbers_size(); - const i32 blockCount = writeRequest.blocks_offsets_size(); - if (messageCount == 0) { - CloseSession(TStringBuilder() << "messages meta repeated fields are empty, write request contains no messages", PersQueue::ErrorCode::BAD_REQUEST, ctx); - return; - } - if (messageCount != blockCount) { - CloseSession(TStringBuilder() << "messages meta repeated fields and blocks repeated fields do not have same size, messages meta fields size is " << messageCount - << " and blocks fields size is " << blockCount << ", only one message per block is supported in blocks format version 0", PersQueue::ErrorCode::BAD_REQUEST, ctx); - return; - } - auto dataCheck = [&](const StreamingWriteClientMessage::WriteRequest& data, const i32 messageIndex) -> bool { - if (data.sequence_numbers(messageIndex) <= 0) { - CloseSession(TStringBuilder() << "bad write request - 'sequence_numbers' items must be greater than 0. Value at position " << messageIndex << " is " << data.sequence_numbers(messageIndex), PersQueue::ErrorCode::BAD_REQUEST, ctx); + const auto& writeRequest = ev->Get()->Request.write_request(); + if (!AllEqual(writeRequest.sequence_numbers_size(), writeRequest.created_at_ms_size(), writeRequest.sent_at_ms_size(), writeRequest.message_sizes_size())) { + CloseSession(TStringBuilder() << "messages meta repeated fields do not have same size, 'sequence_numbers' size is " << writeRequest.sequence_numbers_size() + << ", 'message_sizes' size is " << writeRequest.message_sizes_size() << ", 'created_at_ms' size is " << writeRequest.created_at_ms_size() + << " and 'sent_at_ms' size is " << writeRequest.sent_at_ms_size(), PersQueue::ErrorCode::BAD_REQUEST, ctx); + return; + } + if (!AllEqual(writeRequest.blocks_offsets_size(), writeRequest.blocks_part_numbers_size(), writeRequest.blocks_message_counts_size(), writeRequest.blocks_uncompressed_sizes_size(), writeRequest.blocks_headers_size(), writeRequest.blocks_data_size())) { + CloseSession(TStringBuilder() << "blocks repeated fields do no have same size, 'blocks_offsets' size is " << writeRequest.blocks_offsets_size() + << ", 'blocks_part_numbers' size is " << writeRequest.blocks_part_numbers_size() << ", 'blocks_message_counts' size is " << writeRequest.blocks_message_counts_size() + << ", 'blocks_uncompressed_sizes' size is " << writeRequest.blocks_uncompressed_sizes_size() << ", 'blocks_headers' size is " << writeRequest.blocks_headers_size() + << " and 'blocks_data' size is " << writeRequest.blocks_data_size(), PersQueue::ErrorCode::BAD_REQUEST, ctx); + return; + } + + const i32 messageCount = writeRequest.sequence_numbers_size(); + const i32 blockCount = writeRequest.blocks_offsets_size(); + if (messageCount == 0) { + CloseSession(TStringBuilder() << "messages meta repeated fields are empty, write request contains no messages", PersQueue::ErrorCode::BAD_REQUEST, ctx); + return; + } + if (messageCount != blockCount) { + CloseSession(TStringBuilder() << "messages meta repeated fields and blocks repeated fields do not have same size, messages meta fields size is " << messageCount + << " and blocks fields size is " << blockCount << ", only one message per block is supported in blocks format version 0", PersQueue::ErrorCode::BAD_REQUEST, ctx); + return; + } + auto dataCheck = [&](const StreamingWriteClientMessage::WriteRequest& data, const i32 messageIndex) -> bool { + if (data.sequence_numbers(messageIndex) <= 0) { + CloseSession(TStringBuilder() << "bad write request - 'sequence_numbers' items must be greater than 0. Value at position " << messageIndex << " is " << data.sequence_numbers(messageIndex), PersQueue::ErrorCode::BAD_REQUEST, ctx); return false; } - if (messageIndex > 0 && data.sequence_numbers(messageIndex) <= data.sequence_numbers(messageIndex - 1)) { - CloseSession(TStringBuilder() << "bad write request - 'sequence_numbers' are unsorted. Value " << data.sequence_numbers(messageIndex) << " at position " << messageIndex - << " is less than or equal to value " << data.sequence_numbers(messageIndex - 1) << " at position " << (messageIndex - 1), PersQueue::ErrorCode::BAD_REQUEST, ctx); + if (messageIndex > 0 && data.sequence_numbers(messageIndex) <= data.sequence_numbers(messageIndex - 1)) { + CloseSession(TStringBuilder() << "bad write request - 'sequence_numbers' are unsorted. Value " << data.sequence_numbers(messageIndex) << " at position " << messageIndex + << " is less than or equal to value " << data.sequence_numbers(messageIndex - 1) << " at position " << (messageIndex - 1), PersQueue::ErrorCode::BAD_REQUEST, ctx); return false; } - - if (data.blocks_headers(messageIndex).size() != CODEC_ID_SIZE) { - CloseSession(TStringBuilder() << "bad write request - 'blocks_headers' at position " << messageIndex << " has incorrect size " << data.blocks_headers(messageIndex).size() << " [B]. Only headers of size " << CODEC_ID_SIZE << " [B] (with codec identifier) are supported in block format version 0", PersQueue::ErrorCode::BAD_REQUEST, ctx); - return false; - } - - const char& codecID = data.blocks_headers(messageIndex).front(); - TString error; - if (!ValidateWriteWithCodec(InitialPQTabletConfig, codecID, error)) { - CloseSession(TStringBuilder() << "bad write request - 'blocks_headers' at position " << messageIndex << " is invalid: " << error, PersQueue::ErrorCode::BAD_REQUEST, ctx); - return false; - } - - if (data.blocks_message_counts(messageIndex) != 1) { - CloseSession(TStringBuilder() << "bad write request - 'blocks_message_counts' at position " << messageIndex << " is " << data.blocks_message_counts(messageIndex) - << ", only single message per block is supported by block format version 0", PersQueue::ErrorCode::BAD_REQUEST, ctx); - return false; + + if (data.blocks_headers(messageIndex).size() != CODEC_ID_SIZE) { + CloseSession(TStringBuilder() << "bad write request - 'blocks_headers' at position " << messageIndex << " has incorrect size " << data.blocks_headers(messageIndex).size() << " [B]. Only headers of size " << CODEC_ID_SIZE << " [B] (with codec identifier) are supported in block format version 0", PersQueue::ErrorCode::BAD_REQUEST, ctx); + return false; + } + + const char& codecID = data.blocks_headers(messageIndex).front(); + TString error; + if (!ValidateWriteWithCodec(InitialPQTabletConfig, codecID, error)) { + CloseSession(TStringBuilder() << "bad write request - 'blocks_headers' at position " << messageIndex << " is invalid: " << error, PersQueue::ErrorCode::BAD_REQUEST, ctx); + return false; } + + if (data.blocks_message_counts(messageIndex) != 1) { + CloseSession(TStringBuilder() << "bad write request - 'blocks_message_counts' at position " << messageIndex << " is " << data.blocks_message_counts(messageIndex) + << ", only single message per block is supported by block format version 0", PersQueue::ErrorCode::BAD_REQUEST, ctx); + return false; + } return true; }; - for (i32 messageIndex = 0; messageIndex != messageCount; ++messageIndex) { - if (!dataCheck(writeRequest, messageIndex)) { + for (i32 messageIndex = 0; messageIndex != messageCount; ++messageIndex) { + if (!dataCheck(writeRequest, messageIndex)) { return; } } @@ -1154,7 +1154,7 @@ void TWriteSessionActor::LogSession(const TActorContext& ctx) { void TWriteSessionActor::HandleWakeup(const TActorContext& ctx) { Y_VERIFY(State == ES_INITED); ctx.Schedule(CHECK_ACL_DELAY, new TEvents::TEvWakeup()); - if (Token && !ACLCheckInProgress && RequestNotChecked && (ctx.Now() - LastACLCheckTimestamp > TDuration::Seconds(AppData(ctx)->PQConfig.GetACLRetryTimeoutSec()))) { + if (Token && !ACLCheckInProgress && RequestNotChecked && (ctx.Now() - LastACLCheckTimestamp > TDuration::Seconds(AppData(ctx)->PQConfig.GetACLRetryTimeoutSec()))) { RequestNotChecked = false; InitCheckSchema(ctx); } diff --git a/ydb/services/persqueue_v1/persqueue.cpp b/ydb/services/persqueue_v1/persqueue.cpp index ccacc4d43a..8572a6c02f 100644 --- a/ydb/services/persqueue_v1/persqueue.cpp +++ b/ydb/services/persqueue_v1/persqueue.cpp @@ -71,9 +71,9 @@ void TGRpcPersQueueService::SetupIncomingRequests(NGrpc::TLoggerPtr logger) { auto getCounterBlock = NKikimr::NGRpcService::CreateCounterCb(Counters, ActorSystem); { - using TBiRequest = Ydb::PersQueue::V1::StreamingWriteClientMessage; + using TBiRequest = Ydb::PersQueue::V1::StreamingWriteClientMessage; - using TBiResponse = Ydb::PersQueue::V1::StreamingWriteServerMessage; + using TBiResponse = Ydb::PersQueue::V1::StreamingWriteServerMessage; using TStreamGRpcRequest = NGRpcServer::TGRpcStreamingRequest< TBiRequest, @@ -82,7 +82,7 @@ void TGRpcPersQueueService::SetupIncomingRequests(NGrpc::TLoggerPtr logger) { NKikimrServices::GRPC_SERVER>; - TStreamGRpcRequest::Start(this, this->GetService(), CQ, &Ydb::PersQueue::V1::PersQueueService::AsyncService::RequestStreamingWrite, + TStreamGRpcRequest::Start(this, this->GetService(), CQ, &Ydb::PersQueue::V1::PersQueueService::AsyncService::RequestStreamingWrite, [this](TIntrusivePtr<TStreamGRpcRequest::IContext> context) { ActorSystem->Send(GRpcRequestProxy, new NKikimr::NGRpcService::TEvStreamPQWriteRequest(context)); }, @@ -114,7 +114,7 @@ void TGRpcPersQueueService::SetupIncomingRequests(NGrpc::TLoggerPtr logger) { #error ADD_REQUEST macro already defined #endif #define ADD_REQUEST(NAME, SVC, IN, OUT, ACTION) \ - MakeIntrusive<TGRpcRequest<Ydb::PersQueue::V1::IN, Ydb::PersQueue::V1::OUT, NGRpcService::V1::TGRpcPersQueueService>>(this, this->GetService(), CQ, \ + MakeIntrusive<TGRpcRequest<Ydb::PersQueue::V1::IN, Ydb::PersQueue::V1::OUT, NGRpcService::V1::TGRpcPersQueueService>>(this, this->GetService(), CQ, \ [this](NGrpc::IRequestContextBase *ctx) { \ NGRpcService::ReportGrpcReqToMon(*ActorSystem, ctx->GetPeer()); \ ACTION; \ diff --git a/ydb/services/persqueue_v1/persqueue_ut.cpp b/ydb/services/persqueue_v1/persqueue_ut.cpp index 3bc9b21728..11f7ade7cc 100644 --- a/ydb/services/persqueue_v1/persqueue_ut.cpp +++ b/ydb/services/persqueue_v1/persqueue_ut.cpp @@ -1,10 +1,10 @@ -#include "grpc_pq_actor.h" +#include "grpc_pq_actor.h" #include <ydb/services/persqueue_v1/ut/pq_data_writer.h> #include <ydb/services/persqueue_v1/ut/api_test_setup.h> #include <ydb/services/persqueue_v1/ut/rate_limiter_test_setup.h> #include <ydb/services/persqueue_v1/ut/test_utils.h> #include <ydb/services/persqueue_v1/ut/persqueue_test_fixture.h> - + #include <ydb/core/base/appdata.h> #include <ydb/core/testlib/test_pq_client.h> #include <ydb/core/protos/grpc_pq_old.pb.h> @@ -43,7 +43,7 @@ namespace NKikimr::NPersQueueTests { using namespace Tests; using namespace NKikimrClient; using namespace Ydb::PersQueue; -using namespace Ydb::PersQueue::V1; +using namespace Ydb::PersQueue::V1; using namespace NThreading; using namespace NNetClassifier; @@ -99,22 +99,22 @@ namespace { Y_UNIT_TEST_SUITE(TPersQueueTest) { - Y_UNIT_TEST(AllEqual) { - using NGRpcProxy::V1::AllEqual; - - UNIT_ASSERT(AllEqual(0)); - UNIT_ASSERT(AllEqual(0, 0)); - UNIT_ASSERT(AllEqual(0, 0, 0)); - UNIT_ASSERT(AllEqual(1, 1, 1)); - UNIT_ASSERT(AllEqual(1, 1, 1, 1, 1, 1)); - UNIT_ASSERT(!AllEqual(1, 0)); - UNIT_ASSERT(!AllEqual(0, 1)); - UNIT_ASSERT(!AllEqual(0, 1, 0)); - UNIT_ASSERT(!AllEqual(1, 1, 0)); - UNIT_ASSERT(!AllEqual(0, 1, 1)); - UNIT_ASSERT(!AllEqual(1, 1, 1, 1, 1, 0)); - } - + Y_UNIT_TEST(AllEqual) { + using NGRpcProxy::V1::AllEqual; + + UNIT_ASSERT(AllEqual(0)); + UNIT_ASSERT(AllEqual(0, 0)); + UNIT_ASSERT(AllEqual(0, 0, 0)); + UNIT_ASSERT(AllEqual(1, 1, 1)); + UNIT_ASSERT(AllEqual(1, 1, 1, 1, 1, 1)); + UNIT_ASSERT(!AllEqual(1, 0)); + UNIT_ASSERT(!AllEqual(0, 1)); + UNIT_ASSERT(!AllEqual(0, 1, 0)); + UNIT_ASSERT(!AllEqual(1, 1, 0)); + UNIT_ASSERT(!AllEqual(0, 1, 1)); + UNIT_ASSERT(!AllEqual(1, 1, 1, 1, 1, 0)); + } + Y_UNIT_TEST(SetupLockSession2) { TPersQueueV1TestServer server; SET_LOCALS; @@ -428,27 +428,27 @@ namespace { UNIT_ASSERT(!writer2->Close()); } - Y_UNIT_TEST(EachMessageGetsExactlyOneAcknowledgementInCorrectOrder) { + Y_UNIT_TEST(EachMessageGetsExactlyOneAcknowledgementInCorrectOrder) { NPersQueue::TTestServer server; server.AnnoyingClient->CreateTopic("rt3.dc1--topic", 1); - + auto driver = server.AnnoyingClient->GetDriver(); auto writer = CreateSimpleWriter(*driver, "topic", "test source ID"); bool res = true; - ui32 messageCount = 1000; - - for (ui32 sequenceNumber = 1; sequenceNumber <= messageCount; ++sequenceNumber) { + ui32 messageCount = 1000; + + for (ui32 sequenceNumber = 1; sequenceNumber <= messageCount; ++sequenceNumber) { res = writer->Write("x", sequenceNumber); UNIT_ASSERT(res); - } + } UNIT_ASSERT(writer->IsAlive()); res = writer->Close(TDuration::Seconds(10)); UNIT_ASSERT(res); - } - + } + Y_UNIT_TEST(SetupWriteSessionOnDisabledCluster) { TPersQueueV1TestServer server; SET_LOCALS; @@ -1079,7 +1079,7 @@ namespace { NACLib::TDiffACL acl; acl.AddAccess(NACLib::EAccessType::Allow, NACLib::UpdateRow, "topic1@" BUILTIN_ACL_DOMAIN); server.AnnoyingClient->ModifyACL("/Root/PQ", DEFAULT_TOPIC_NAME, acl.SerializeAsString()); - WaitACLModification(); + WaitACLModification(); writer2.Write(SHORT_TOPIC_NAME, {"valuevaluevalue1"}, false, "topic1@" BUILTIN_ACL_DOMAIN); writer2.Write(SHORT_TOPIC_NAME, {"valuevaluevalue1"}, true, "invalid_ticket"); @@ -1092,7 +1092,7 @@ namespace { dynamic_cast<TTestCredentialsProviderFactory*>(creds.get())->SetToken(NYdb::TStringType("topic1@" BUILTIN_ACL_DOMAIN)); auto writer = CreateWriter(*driver, SHORT_TOPIC_NAME, "123", {}, {}, {}, creds); - + auto msg = writer->GetEvent(true); UNIT_ASSERT(msg); // ReadyToAcceptEvent @@ -1166,7 +1166,7 @@ namespace { acl.AddAccess(NACLib::EAccessType::Allow, NACLib::SelectRow, "user1@" BUILTIN_ACL_DOMAIN); acl.AddAccess(NACLib::EAccessType::Allow, NACLib::SelectRow, "user2@" BUILTIN_ACL_DOMAIN); server.AnnoyingClient->ModifyACL("/Root/PQ", topic2, acl.SerializeAsString()); - WaitACLModification(); + WaitACLModification(); auto ticket1 = "1@" BUILTIN_ACL_DOMAIN; auto ticket2 = "2@" BUILTIN_ACL_DOMAIN; @@ -1186,7 +1186,7 @@ namespace { acl.Clear(); acl.AddAccess(NACLib::EAccessType::Allow, NACLib::SelectRow, "user3@" BUILTIN_ACL_DOMAIN); server.AnnoyingClient->ModifyACL("/Root/PQ", topic2, acl.SerializeAsString()); - WaitACLModification(); + WaitACLModification(); Cerr << "==== Writer - read\n"; writer.Read(shortTopic2Name, "user1", "user3@" BUILTIN_ACL_DOMAIN, false, true, true); @@ -1359,7 +1359,7 @@ namespace { if (i == 5) { UNIT_ASSERT(TInstant::Now() - st > TDuration::Seconds(3)); - // TODO: Describe this assert in comment + // TODO: Describe this assert in comment UNIT_ASSERT(!ack->Acks.empty()); UNIT_ASSERT(ack->Acks.back().Stat); UNIT_ASSERT(ack->Acks.back().Stat->TotalTimeInPartitionQueue >= ack->Acks.back().Stat->PartitionQuotedTime); @@ -1747,7 +1747,7 @@ namespace { for (ui32 i = 1; i <= 11; ++i) { auto f = producer->Write(i, TString(10, 'a')); f.Wait(); - UNIT_ASSERT_C(f.GetValue().Response.server_message_case() == StreamingWriteServerMessage::kBatchWriteResponse, f.GetValue().Response); + UNIT_ASSERT_C(f.GetValue().Response.server_message_case() == StreamingWriteServerMessage::kBatchWriteResponse, f.GetValue().Response); } } @@ -2129,227 +2129,227 @@ namespace { UNIT_ASSERT_EQUAL(ccResult.Response.Getissues(0).issue_code(), Ydb::PersQueue::ErrorCode::UNKNOWN_TOPIC); } */ - Y_UNIT_TEST(Codecs_InitWriteSession_DefaultTopicSupportedCodecsInInitResponse) { - APITestSetup setup{TEST_CASE_NAME}; - grpc::ClientContext context; - auto session = setup.GetPersQueueService()->StreamingWrite(&context); - - - auto serverMessage = setup.InitSession(session); - - + Y_UNIT_TEST(Codecs_InitWriteSession_DefaultTopicSupportedCodecsInInitResponse) { + APITestSetup setup{TEST_CASE_NAME}; + grpc::ClientContext context; + auto session = setup.GetPersQueueService()->StreamingWrite(&context); + + + auto serverMessage = setup.InitSession(session); + + auto defaultSupportedCodecs = TVector<Ydb::PersQueue::V1::Codec>{ Ydb::PersQueue::V1::CODEC_RAW, Ydb::PersQueue::V1::CODEC_GZIP, Ydb::PersQueue::V1::CODEC_LZOP }; - auto topicSupportedCodecs = serverMessage.init_response().supported_codecs(); + auto topicSupportedCodecs = serverMessage.init_response().supported_codecs(); UNIT_ASSERT_VALUES_EQUAL_C(defaultSupportedCodecs.size(), topicSupportedCodecs.size(), serverMessage.init_response()); UNIT_ASSERT_C(Equal(defaultSupportedCodecs.begin(), defaultSupportedCodecs.end(), topicSupportedCodecs.begin()), serverMessage.init_response()); - } - - Y_UNIT_TEST(Codecs_WriteMessageWithDefaultCodecs_MessagesAreAcknowledged) { - APITestSetup setup{TEST_CASE_NAME}; - auto log = setup.GetLog(); - StreamingWriteClientMessage clientMessage; - auto* writeRequest = clientMessage.mutable_write_request(); - auto sequenceNumber = 1; - auto messageCount = 0; - const auto message = NUnitTest::RandomString(250 * 1024, std::rand()); - auto compress = [](TString data, i32 codecID) { - Y_UNUSED(codecID); - return TString(data); - }; - TVector<char> defaultCodecs{0, 1, 2}; - for (const auto& codecID : defaultCodecs) { - auto compressedMessage = compress(message, codecID); - - writeRequest->add_sequence_numbers(sequenceNumber++); - writeRequest->add_message_sizes(message.size()); - writeRequest->add_created_at_ms(TInstant::Now().MilliSeconds()); - writeRequest->add_sent_at_ms(TInstant::Now().MilliSeconds()); - writeRequest->add_blocks_offsets(0); - writeRequest->add_blocks_part_numbers(0); - writeRequest->add_blocks_message_counts(1); - writeRequest->add_blocks_uncompressed_sizes(message.size()); - writeRequest->add_blocks_headers(TString(1, codecID)); - writeRequest->add_blocks_data(compressedMessage); - ++messageCount; - } - auto session = setup.InitWriteSession(); - - - AssertSuccessfullStreamingOperation(session.first->Write(clientMessage), session.first, &clientMessage); - - - StreamingWriteServerMessage serverMessage; - log << TLOG_INFO << "Wait for write acknowledgement"; - AssertSuccessfullStreamingOperation(session.first->Read(&serverMessage), session.first); - UNIT_ASSERT_C(serverMessage.server_message_case() == StreamingWriteServerMessage::kBatchWriteResponse, serverMessage); - UNIT_ASSERT_VALUES_EQUAL_C(defaultCodecs.size(), serverMessage.batch_write_response().offsets_size(), serverMessage); - } - - Y_UNIT_TEST(Codecs_WriteMessageWithNonDefaultCodecThatHasToBeConfiguredAdditionally_SessionClosedWithBadRequestError) { - APITestSetup setup{TEST_CASE_NAME}; - auto log = setup.GetLog(); - StreamingWriteClientMessage clientMessage; - auto* writeRequest = clientMessage.mutable_write_request(); - const auto message = NUnitTest::RandomString(250 * 1024, std::rand()); - const auto codecID = 3; - writeRequest->add_sequence_numbers(1); - writeRequest->add_message_sizes(message.size()); - writeRequest->add_created_at_ms(TInstant::Now().MilliSeconds()); - writeRequest->add_sent_at_ms(TInstant::Now().MilliSeconds()); - writeRequest->add_blocks_offsets(0); - writeRequest->add_blocks_part_numbers(0); - writeRequest->add_blocks_message_counts(1); - writeRequest->add_blocks_uncompressed_sizes(message.size()); - writeRequest->add_blocks_headers(TString(1, codecID)); - writeRequest->add_blocks_data(message); - auto session = setup.InitWriteSession(); - - - AssertSuccessfullStreamingOperation(session.first->Write(clientMessage), session.first, &clientMessage); - - log << TLOG_INFO << "Wait for session to die"; - AssertStreamingSessionDead(session.first, Ydb::StatusIds::BAD_REQUEST, Ydb::PersQueue::ErrorCode::BAD_REQUEST); - } - - StreamingWriteClientMessage::InitRequest GenerateSessionSetupWithPreferredCluster(const TString preferredCluster) { - StreamingWriteClientMessage::InitRequest sessionSetup; - sessionSetup.set_preferred_cluster(preferredCluster); - sessionSetup.set_message_group_id("test-message-group-id-" + preferredCluster); - return sessionSetup; - }; - - Y_UNIT_TEST(PreferredCluster_TwoEnabledClustersAndWriteSessionsWithDifferentPreferredCluster_SessionWithMismatchedClusterDiesAndOthersAlive) { - APITestSetup setup{TEST_CASE_NAME}; - auto log = setup.GetLog(); - - setup.GetPQConfig().SetClustersUpdateTimeoutSec(0); - setup.GetPQConfig().SetRemoteClusterEnabledDelaySec(0); + } + + Y_UNIT_TEST(Codecs_WriteMessageWithDefaultCodecs_MessagesAreAcknowledged) { + APITestSetup setup{TEST_CASE_NAME}; + auto log = setup.GetLog(); + StreamingWriteClientMessage clientMessage; + auto* writeRequest = clientMessage.mutable_write_request(); + auto sequenceNumber = 1; + auto messageCount = 0; + const auto message = NUnitTest::RandomString(250 * 1024, std::rand()); + auto compress = [](TString data, i32 codecID) { + Y_UNUSED(codecID); + return TString(data); + }; + TVector<char> defaultCodecs{0, 1, 2}; + for (const auto& codecID : defaultCodecs) { + auto compressedMessage = compress(message, codecID); + + writeRequest->add_sequence_numbers(sequenceNumber++); + writeRequest->add_message_sizes(message.size()); + writeRequest->add_created_at_ms(TInstant::Now().MilliSeconds()); + writeRequest->add_sent_at_ms(TInstant::Now().MilliSeconds()); + writeRequest->add_blocks_offsets(0); + writeRequest->add_blocks_part_numbers(0); + writeRequest->add_blocks_message_counts(1); + writeRequest->add_blocks_uncompressed_sizes(message.size()); + writeRequest->add_blocks_headers(TString(1, codecID)); + writeRequest->add_blocks_data(compressedMessage); + ++messageCount; + } + auto session = setup.InitWriteSession(); + + + AssertSuccessfullStreamingOperation(session.first->Write(clientMessage), session.first, &clientMessage); + + + StreamingWriteServerMessage serverMessage; + log << TLOG_INFO << "Wait for write acknowledgement"; + AssertSuccessfullStreamingOperation(session.first->Read(&serverMessage), session.first); + UNIT_ASSERT_C(serverMessage.server_message_case() == StreamingWriteServerMessage::kBatchWriteResponse, serverMessage); + UNIT_ASSERT_VALUES_EQUAL_C(defaultCodecs.size(), serverMessage.batch_write_response().offsets_size(), serverMessage); + } + + Y_UNIT_TEST(Codecs_WriteMessageWithNonDefaultCodecThatHasToBeConfiguredAdditionally_SessionClosedWithBadRequestError) { + APITestSetup setup{TEST_CASE_NAME}; + auto log = setup.GetLog(); + StreamingWriteClientMessage clientMessage; + auto* writeRequest = clientMessage.mutable_write_request(); + const auto message = NUnitTest::RandomString(250 * 1024, std::rand()); + const auto codecID = 3; + writeRequest->add_sequence_numbers(1); + writeRequest->add_message_sizes(message.size()); + writeRequest->add_created_at_ms(TInstant::Now().MilliSeconds()); + writeRequest->add_sent_at_ms(TInstant::Now().MilliSeconds()); + writeRequest->add_blocks_offsets(0); + writeRequest->add_blocks_part_numbers(0); + writeRequest->add_blocks_message_counts(1); + writeRequest->add_blocks_uncompressed_sizes(message.size()); + writeRequest->add_blocks_headers(TString(1, codecID)); + writeRequest->add_blocks_data(message); + auto session = setup.InitWriteSession(); + + + AssertSuccessfullStreamingOperation(session.first->Write(clientMessage), session.first, &clientMessage); + + log << TLOG_INFO << "Wait for session to die"; + AssertStreamingSessionDead(session.first, Ydb::StatusIds::BAD_REQUEST, Ydb::PersQueue::ErrorCode::BAD_REQUEST); + } + + StreamingWriteClientMessage::InitRequest GenerateSessionSetupWithPreferredCluster(const TString preferredCluster) { + StreamingWriteClientMessage::InitRequest sessionSetup; + sessionSetup.set_preferred_cluster(preferredCluster); + sessionSetup.set_message_group_id("test-message-group-id-" + preferredCluster); + return sessionSetup; + }; + + Y_UNIT_TEST(PreferredCluster_TwoEnabledClustersAndWriteSessionsWithDifferentPreferredCluster_SessionWithMismatchedClusterDiesAndOthersAlive) { + APITestSetup setup{TEST_CASE_NAME}; + auto log = setup.GetLog(); + + setup.GetPQConfig().SetClustersUpdateTimeoutSec(0); + setup.GetPQConfig().SetRemoteClusterEnabledDelaySec(0); setup.GetPQConfig().SetCloseClientSessionWithEnabledRemotePreferredClusterDelaySec(0); - auto sessionWithNoPreferredCluster = setup.InitWriteSession(GenerateSessionSetupWithPreferredCluster(TString())); - auto sessionWithLocalPreffedCluster = setup.InitWriteSession(GenerateSessionSetupWithPreferredCluster(setup.GetLocalCluster())); - auto sessionWithRemotePrefferedCluster = setup.InitWriteSession(GenerateSessionSetupWithPreferredCluster(setup.GetRemoteCluster())); - grpc::ClientContext context; - auto sessionWithNoInitialization = setup.GetPersQueueService()->StreamingWrite(&context); - - log << TLOG_INFO << "Wait for session with remote preferred cluster to die"; - AssertStreamingSessionDead(sessionWithRemotePrefferedCluster.first, Ydb::StatusIds::ABORTED, Ydb::PersQueue::ErrorCode::PREFERRED_CLUSTER_MISMATCHED); - AssertStreamingSessionAlive(sessionWithNoPreferredCluster.first); - AssertStreamingSessionAlive(sessionWithLocalPreffedCluster.first); - - setup.InitSession(sessionWithNoInitialization); - AssertStreamingSessionAlive(sessionWithNoInitialization); - } - - Y_UNIT_TEST(PreferredCluster_DisabledRemoteClusterAndWriteSessionsWithDifferentPreferredClusterAndLaterRemoteClusterEnabled_SessionWithMismatchedClusterDiesAfterPreferredClusterEnabledAndOtherSessionsAlive) { - APITestSetup setup{TEST_CASE_NAME}; - auto log = setup.GetLog(); - log << TLOG_INFO << "Disable remote cluster " << setup.GetRemoteCluster().Quote(); - setup.GetFlatMsgBusPQClient().UpdateDC(setup.GetRemoteCluster(), false, false); - setup.GetPQConfig().SetClustersUpdateTimeoutSec(0); - setup.GetPQConfig().SetRemoteClusterEnabledDelaySec(0); - setup.GetPQConfig().SetCloseClientSessionWithEnabledRemotePreferredClusterDelaySec(0); - auto sessionWithNoPreferredCluster = setup.InitWriteSession(GenerateSessionSetupWithPreferredCluster(TString())); - auto sessionWithLocalPreffedCluster = setup.InitWriteSession(GenerateSessionSetupWithPreferredCluster(setup.GetLocalCluster())); - auto sessionWithRemotePrefferedCluster = setup.InitWriteSession(GenerateSessionSetupWithPreferredCluster(setup.GetRemoteCluster())); - AssertStreamingSessionAlive(sessionWithNoPreferredCluster.first); - AssertStreamingSessionAlive(sessionWithLocalPreffedCluster.first); - AssertStreamingSessionAlive(sessionWithRemotePrefferedCluster.first); - - log << TLOG_INFO << "Enable remote cluster " << setup.GetRemoteCluster().Quote(); - setup.GetFlatMsgBusPQClient().UpdateDC(setup.GetRemoteCluster(), false, true); - - log << TLOG_INFO << "Wait for session with remote preferred cluster to die"; - AssertStreamingSessionDead(sessionWithRemotePrefferedCluster.first, Ydb::StatusIds::ABORTED, Ydb::PersQueue::ErrorCode::PREFERRED_CLUSTER_MISMATCHED); - AssertStreamingSessionAlive(sessionWithNoPreferredCluster.first); - AssertStreamingSessionAlive(sessionWithLocalPreffedCluster.first); - } - - Y_UNIT_TEST(PreferredCluster_EnabledRemotePreferredClusterAndCloseClientSessionWithEnabledRemotePreferredClusterDelaySec_SessionDiesOnlyAfterDelay) { - APITestSetup setup{TEST_CASE_NAME}; - auto log = setup.GetLog(); - setup.GetPQConfig().SetClustersUpdateTimeoutSec(0); - setup.GetPQConfig().SetRemoteClusterEnabledDelaySec(0); + auto sessionWithNoPreferredCluster = setup.InitWriteSession(GenerateSessionSetupWithPreferredCluster(TString())); + auto sessionWithLocalPreffedCluster = setup.InitWriteSession(GenerateSessionSetupWithPreferredCluster(setup.GetLocalCluster())); + auto sessionWithRemotePrefferedCluster = setup.InitWriteSession(GenerateSessionSetupWithPreferredCluster(setup.GetRemoteCluster())); + grpc::ClientContext context; + auto sessionWithNoInitialization = setup.GetPersQueueService()->StreamingWrite(&context); + + log << TLOG_INFO << "Wait for session with remote preferred cluster to die"; + AssertStreamingSessionDead(sessionWithRemotePrefferedCluster.first, Ydb::StatusIds::ABORTED, Ydb::PersQueue::ErrorCode::PREFERRED_CLUSTER_MISMATCHED); + AssertStreamingSessionAlive(sessionWithNoPreferredCluster.first); + AssertStreamingSessionAlive(sessionWithLocalPreffedCluster.first); + + setup.InitSession(sessionWithNoInitialization); + AssertStreamingSessionAlive(sessionWithNoInitialization); + } + + Y_UNIT_TEST(PreferredCluster_DisabledRemoteClusterAndWriteSessionsWithDifferentPreferredClusterAndLaterRemoteClusterEnabled_SessionWithMismatchedClusterDiesAfterPreferredClusterEnabledAndOtherSessionsAlive) { + APITestSetup setup{TEST_CASE_NAME}; + auto log = setup.GetLog(); + log << TLOG_INFO << "Disable remote cluster " << setup.GetRemoteCluster().Quote(); + setup.GetFlatMsgBusPQClient().UpdateDC(setup.GetRemoteCluster(), false, false); + setup.GetPQConfig().SetClustersUpdateTimeoutSec(0); + setup.GetPQConfig().SetRemoteClusterEnabledDelaySec(0); + setup.GetPQConfig().SetCloseClientSessionWithEnabledRemotePreferredClusterDelaySec(0); + auto sessionWithNoPreferredCluster = setup.InitWriteSession(GenerateSessionSetupWithPreferredCluster(TString())); + auto sessionWithLocalPreffedCluster = setup.InitWriteSession(GenerateSessionSetupWithPreferredCluster(setup.GetLocalCluster())); + auto sessionWithRemotePrefferedCluster = setup.InitWriteSession(GenerateSessionSetupWithPreferredCluster(setup.GetRemoteCluster())); + AssertStreamingSessionAlive(sessionWithNoPreferredCluster.first); + AssertStreamingSessionAlive(sessionWithLocalPreffedCluster.first); + AssertStreamingSessionAlive(sessionWithRemotePrefferedCluster.first); + + log << TLOG_INFO << "Enable remote cluster " << setup.GetRemoteCluster().Quote(); + setup.GetFlatMsgBusPQClient().UpdateDC(setup.GetRemoteCluster(), false, true); + + log << TLOG_INFO << "Wait for session with remote preferred cluster to die"; + AssertStreamingSessionDead(sessionWithRemotePrefferedCluster.first, Ydb::StatusIds::ABORTED, Ydb::PersQueue::ErrorCode::PREFERRED_CLUSTER_MISMATCHED); + AssertStreamingSessionAlive(sessionWithNoPreferredCluster.first); + AssertStreamingSessionAlive(sessionWithLocalPreffedCluster.first); + } + + Y_UNIT_TEST(PreferredCluster_EnabledRemotePreferredClusterAndCloseClientSessionWithEnabledRemotePreferredClusterDelaySec_SessionDiesOnlyAfterDelay) { + APITestSetup setup{TEST_CASE_NAME}; + auto log = setup.GetLog(); + setup.GetPQConfig().SetClustersUpdateTimeoutSec(0); + setup.GetPQConfig().SetRemoteClusterEnabledDelaySec(0); setup.GetPQConfig().SetCloseClientSessionWithEnabledRemotePreferredClusterDelaySec(0); - const auto edgeActorID = setup.GetServer().GetRuntime()->AllocateEdgeActor(); - + const auto edgeActorID = setup.GetServer().GetRuntime()->AllocateEdgeActor(); + setup.GetServer().GetRuntime()->Send(new IEventHandle(NPQ::NClusterTracker::MakeClusterTrackerID(), edgeActorID, new NPQ::NClusterTracker::TEvClusterTracker::TEvSubscribe)); log << TLOG_INFO << "Wait for cluster tracker event"; auto clustersUpdate = setup.GetServer().GetRuntime()->GrabEdgeEvent<NPQ::NClusterTracker::TEvClusterTracker::TEvClustersUpdate>(); - - - auto session = setup.InitWriteSession(GenerateSessionSetupWithPreferredCluster(setup.GetRemoteCluster())); - - AssertStreamingSessionAlive(session.first); - - AssertStreamingSessionDead(session.first, Ydb::StatusIds::ABORTED, Ydb::PersQueue::ErrorCode::PREFERRED_CLUSTER_MISMATCHED); - } - - Y_UNIT_TEST(PreferredCluster_NonExistentPreferredCluster_SessionDiesOnlyAfterDelay) { - APITestSetup setup{TEST_CASE_NAME}; - auto log = setup.GetLog(); - setup.GetPQConfig().SetClustersUpdateTimeoutSec(0); - setup.GetPQConfig().SetRemoteClusterEnabledDelaySec(0); + + + auto session = setup.InitWriteSession(GenerateSessionSetupWithPreferredCluster(setup.GetRemoteCluster())); + + AssertStreamingSessionAlive(session.first); + + AssertStreamingSessionDead(session.first, Ydb::StatusIds::ABORTED, Ydb::PersQueue::ErrorCode::PREFERRED_CLUSTER_MISMATCHED); + } + + Y_UNIT_TEST(PreferredCluster_NonExistentPreferredCluster_SessionDiesOnlyAfterDelay) { + APITestSetup setup{TEST_CASE_NAME}; + auto log = setup.GetLog(); + setup.GetPQConfig().SetClustersUpdateTimeoutSec(0); + setup.GetPQConfig().SetRemoteClusterEnabledDelaySec(0); setup.GetPQConfig().SetCloseClientSessionWithEnabledRemotePreferredClusterDelaySec(2); - + TInstant now(TInstant::Now()); - auto session = setup.InitWriteSession(GenerateSessionSetupWithPreferredCluster("non-existent-cluster")); - AssertStreamingSessionAlive(session.first); - - AssertStreamingSessionDead(session.first, Ydb::StatusIds::ABORTED, Ydb::PersQueue::ErrorCode::PREFERRED_CLUSTER_MISMATCHED); + auto session = setup.InitWriteSession(GenerateSessionSetupWithPreferredCluster("non-existent-cluster")); + AssertStreamingSessionAlive(session.first); + + AssertStreamingSessionDead(session.first, Ydb::StatusIds::ABORTED, Ydb::PersQueue::ErrorCode::PREFERRED_CLUSTER_MISMATCHED); UNIT_ASSERT(TInstant::Now() - now > TDuration::MilliSeconds(1999)); - } - - Y_UNIT_TEST(PreferredCluster_EnabledRemotePreferredClusterAndRemoteClusterEnabledDelaySec_SessionDiesOnlyAfterDelay) { - APITestSetup setup{TEST_CASE_NAME}; - auto log = setup.GetLog(); - setup.GetPQConfig().SetClustersUpdateTimeoutSec(0); + } + + Y_UNIT_TEST(PreferredCluster_EnabledRemotePreferredClusterAndRemoteClusterEnabledDelaySec_SessionDiesOnlyAfterDelay) { + APITestSetup setup{TEST_CASE_NAME}; + auto log = setup.GetLog(); + setup.GetPQConfig().SetClustersUpdateTimeoutSec(0); setup.GetPQConfig().SetCloseClientSessionWithEnabledRemotePreferredClusterDelaySec(2); - const auto edgeActorID = setup.GetServer().GetRuntime()->AllocateEdgeActor(); - + const auto edgeActorID = setup.GetServer().GetRuntime()->AllocateEdgeActor(); + setup.GetPQConfig().SetRemoteClusterEnabledDelaySec(0); - + TInstant now(TInstant::Now()); - auto session = setup.InitWriteSession(GenerateSessionSetupWithPreferredCluster(setup.GetRemoteCluster())); - - setup.GetServer().GetRuntime()->Send(new IEventHandle(NPQ::NClusterTracker::MakeClusterTrackerID(), edgeActorID, new NPQ::NClusterTracker::TEvClusterTracker::TEvSubscribe)); - log << TLOG_INFO << "Wait for cluster tracker event"; - auto clustersUpdate = setup.GetServer().GetRuntime()->GrabEdgeEvent<NPQ::NClusterTracker::TEvClusterTracker::TEvClustersUpdate>(); - AssertStreamingSessionAlive(session.first); - + auto session = setup.InitWriteSession(GenerateSessionSetupWithPreferredCluster(setup.GetRemoteCluster())); + + setup.GetServer().GetRuntime()->Send(new IEventHandle(NPQ::NClusterTracker::MakeClusterTrackerID(), edgeActorID, new NPQ::NClusterTracker::TEvClusterTracker::TEvSubscribe)); + log << TLOG_INFO << "Wait for cluster tracker event"; + auto clustersUpdate = setup.GetServer().GetRuntime()->GrabEdgeEvent<NPQ::NClusterTracker::TEvClusterTracker::TEvClustersUpdate>(); + AssertStreamingSessionAlive(session.first); + AssertStreamingSessionDead(session.first, Ydb::StatusIds::ABORTED, Ydb::PersQueue::ErrorCode::PREFERRED_CLUSTER_MISMATCHED); - + UNIT_ASSERT(TInstant::Now() - now > TDuration::MilliSeconds(1999)); - } - - Y_UNIT_TEST(PreferredCluster_RemotePreferredClusterEnabledWhileSessionInitializing_SessionDiesOnlyAfterInitializationAndDelay) { - APITestSetup setup{TEST_CASE_NAME}; - auto log = setup.GetLog(); - setup.GetPQConfig().SetClustersUpdateTimeoutSec(0); - setup.GetPQConfig().SetRemoteClusterEnabledDelaySec(0); + } + + Y_UNIT_TEST(PreferredCluster_RemotePreferredClusterEnabledWhileSessionInitializing_SessionDiesOnlyAfterInitializationAndDelay) { + APITestSetup setup{TEST_CASE_NAME}; + auto log = setup.GetLog(); + setup.GetPQConfig().SetClustersUpdateTimeoutSec(0); + setup.GetPQConfig().SetRemoteClusterEnabledDelaySec(0); setup.GetPQConfig().SetCloseClientSessionWithEnabledRemotePreferredClusterDelaySec(2); - const auto edgeActorID = setup.GetServer().GetRuntime()->AllocateEdgeActor(); - setup.GetFlatMsgBusPQClient().UpdateDC(setup.GetRemoteCluster(), false, false); - - grpc::ClientContext context; - auto session = setup.GetPersQueueService()->StreamingWrite(&context); - - setup.GetFlatMsgBusPQClient().UpdateDC(setup.GetRemoteCluster(), false, true); - - setup.GetServer().GetRuntime()->Send(new IEventHandle(NPQ::NClusterTracker::MakeClusterTrackerID(), edgeActorID, new NPQ::NClusterTracker::TEvClusterTracker::TEvSubscribe)); - log << TLOG_INFO << "Wait for cluster tracker event"; - auto clustersUpdate = setup.GetServer().GetRuntime()->GrabEdgeEvent<NPQ::NClusterTracker::TEvClusterTracker::TEvClustersUpdate>(); + const auto edgeActorID = setup.GetServer().GetRuntime()->AllocateEdgeActor(); + setup.GetFlatMsgBusPQClient().UpdateDC(setup.GetRemoteCluster(), false, false); + + grpc::ClientContext context; + auto session = setup.GetPersQueueService()->StreamingWrite(&context); + + setup.GetFlatMsgBusPQClient().UpdateDC(setup.GetRemoteCluster(), false, true); + + setup.GetServer().GetRuntime()->Send(new IEventHandle(NPQ::NClusterTracker::MakeClusterTrackerID(), edgeActorID, new NPQ::NClusterTracker::TEvClusterTracker::TEvSubscribe)); + log << TLOG_INFO << "Wait for cluster tracker event"; + auto clustersUpdate = setup.GetServer().GetRuntime()->GrabEdgeEvent<NPQ::NClusterTracker::TEvClusterTracker::TEvClustersUpdate>(); TInstant now(TInstant::Now()); - setup.InitSession(session, GenerateSessionSetupWithPreferredCluster(setup.GetRemoteCluster())); + setup.InitSession(session, GenerateSessionSetupWithPreferredCluster(setup.GetRemoteCluster())); - AssertStreamingSessionAlive(session); - - log << TLOG_INFO << "Set small delay and wait for initialized session with remote preferred cluster to die"; - AssertStreamingSessionDead(session, Ydb::StatusIds::ABORTED, Ydb::PersQueue::ErrorCode::PREFERRED_CLUSTER_MISMATCHED); + AssertStreamingSessionAlive(session); + + log << TLOG_INFO << "Set small delay and wait for initialized session with remote preferred cluster to die"; + AssertStreamingSessionDead(session, Ydb::StatusIds::ABORTED, Ydb::PersQueue::ErrorCode::PREFERRED_CLUSTER_MISMATCHED); UNIT_ASSERT(TInstant::Now() - now > TDuration::MilliSeconds(1999)); - } + } Y_UNIT_TEST(SchemeOperationsTest) { NPersQueue::TTestServer server; diff --git a/ydb/services/persqueue_v1/ut/api_test_setup.h b/ydb/services/persqueue_v1/ut/api_test_setup.h index 8d5d82b33d..4b8963bc3e 100644 --- a/ydb/services/persqueue_v1/ut/api_test_setup.h +++ b/ydb/services/persqueue_v1/ut/api_test_setup.h @@ -1,120 +1,120 @@ -#pragma once -#include "pq_data_writer.h" -#include "test_utils.h" - +#pragma once +#include "pq_data_writer.h" +#include "test_utils.h" + #include <ydb/public/api/grpc/draft/ydb_persqueue_v1.grpc.pb.h> #include <ydb/core/testlib/test_pq_client.h> - + #include <google/protobuf/message.h> #include <library/cpp/logger/log.h> #include <library/cpp/testing/unittest/registar.h> #include <library/cpp/testing/unittest/tests_data.h> -#include <util/string/builder.h> - -class APITestSetup { -private: - using TStreamingWriteClientMessage = Ydb::PersQueue::V1::StreamingWriteClientMessage; - using TStreamingWriteServerMessage = Ydb::PersQueue::V1::StreamingWriteServerMessage; - TLog Log; +#include <util/string/builder.h> + +class APITestSetup { +private: + using TStreamingWriteClientMessage = Ydb::PersQueue::V1::StreamingWriteClientMessage; + using TStreamingWriteServerMessage = Ydb::PersQueue::V1::StreamingWriteServerMessage; + TLog Log; NPersQueue::TTestServer server; - std::shared_ptr<::grpc::Channel> channel; - std::unique_ptr<Ydb::PersQueue::V1::PersQueueService::Stub> service; -public: - APITestSetup(const TString& testCaseName) - : Log("cerr") + std::shared_ptr<::grpc::Channel> channel; + std::unique_ptr<Ydb::PersQueue::V1::PersQueueService::Stub> service; +public: + APITestSetup(const TString& testCaseName) + : Log("cerr") , server(NPersQueue::TTestServer()) , channel(grpc::CreateChannel("localhost:" + ToString(server.GrpcPort), grpc::InsecureChannelCredentials())) - , service(Ydb::PersQueue::V1::PersQueueService::NewStub(channel)) - { - Log.SetFormatter([testCaseName](ELogPriority priority, TStringBuf message) { - return TStringBuilder() << TInstant::Now() << " :" << testCaseName << " " << priority << ": " << message << Endl; - }); + , service(Ydb::PersQueue::V1::PersQueueService::NewStub(channel)) + { + Log.SetFormatter([testCaseName](ELogPriority priority, TStringBuf message) { + return TStringBuilder() << TInstant::Now() << " :" << testCaseName << " " << priority << ": " << message << Endl; + }); server.CleverServer->GetRuntime()->SetLogPriority(NKikimrServices::PQ_WRITE_PROXY, NActors::NLog::PRI_DEBUG); server.CleverServer->GetRuntime()->SetLogPriority(NKikimrServices::PQ_READ_PROXY, NActors::NLog::PRI_DEBUG); server.AnnoyingClient->CreateTopic("rt3.dc1--" + TString(GetTestTopic()), 1); NKikimr::NPersQueueTests::TPQDataWriter writer(GetTestMessageGroupId(), server); - - auto seed = TInstant::Now().MicroSeconds(); - // This makes failing randomized tests (for example with NUnitTest::RandomString(size, std::rand()) calls) reproducable - Log << TLOG_INFO << "Random seed for debugging is " << seed; - std::srand(seed); - } - - TLog& GetLog() { - return Log; - } - - TString GetTestTopic() { - return "topic1"; - } - - TString GetTestMessageGroupId() { - return "test-message-group-id"; - } - - TString GetLocalCluster() { - return "dc1"; - } - - TString GetRemoteCluster() { - return "dc2"; - } - - TDuration GetLongDelay() { - return TDuration::Max() - TDuration::Seconds(1) /* So delay does not get mixed up with infinity */; - } - - NKikimr::Tests::TServer& GetServer() { + + auto seed = TInstant::Now().MicroSeconds(); + // This makes failing randomized tests (for example with NUnitTest::RandomString(size, std::rand()) calls) reproducable + Log << TLOG_INFO << "Random seed for debugging is " << seed; + std::srand(seed); + } + + TLog& GetLog() { + return Log; + } + + TString GetTestTopic() { + return "topic1"; + } + + TString GetTestMessageGroupId() { + return "test-message-group-id"; + } + + TString GetLocalCluster() { + return "dc1"; + } + + TString GetRemoteCluster() { + return "dc2"; + } + + TDuration GetLongDelay() { + return TDuration::Max() - TDuration::Seconds(1) /* So delay does not get mixed up with infinity */; + } + + NKikimr::Tests::TServer& GetServer() { return *server.CleverServer; - } - - TString GetPQRoot() { - return "/Root/PQ"; - } - - NKikimrPQ::TPQConfig& GetPQConfig() { + } + + TString GetPQRoot() { + return "/Root/PQ"; + } + + NKikimrPQ::TPQConfig& GetPQConfig() { return server.CleverServer->GetRuntime()->GetAppData().PQConfig; - } - - NKikimr::NPersQueueTests::TFlatMsgBusPQClient& GetFlatMsgBusPQClient() { + } + + NKikimr::NPersQueueTests::TFlatMsgBusPQClient& GetFlatMsgBusPQClient() { return *server.AnnoyingClient; - } - - std::unique_ptr<Ydb::PersQueue::V1::PersQueueService::Stub>& GetPersQueueService() { - return service; - } - - std::pair<std::unique_ptr<grpc::ClientReaderWriter<TStreamingWriteClientMessage, TStreamingWriteServerMessage>>, std::unique_ptr<grpc::ClientContext>> InitWriteSession( - const typename TStreamingWriteClientMessage::InitRequest& setup = TStreamingWriteClientMessage::InitRequest()) - { - auto context = std::make_unique<grpc::ClientContext>(); - auto stream = service->StreamingWrite(context.get()); - InitSession(stream, setup); - return std::make_pair(std::move(stream), std::move(context)); - } - - // Initializes session with default (possible overwriten by setup parameter) values and returns initialization response - template<typename TClientMessage, typename TServerMessage> - TServerMessage InitSession(std::unique_ptr<grpc::ClientReaderWriter<TClientMessage, TServerMessage>>& stream, - const typename TClientMessage::InitRequest& setup = typename TClientMessage::InitRequest()) - { - TClientMessage clientMessage; - // TODO: Replace with MergeFrom? - clientMessage.mutable_init_request()->CopyFrom(setup); - if (setup.topic().empty()) { - clientMessage.mutable_init_request()->set_topic(GetTestTopic()); - } - if (setup.message_group_id().empty()) { - clientMessage.mutable_init_request()->set_message_group_id(GetTestMessageGroupId()); - } - AssertSuccessfullStreamingOperation(stream->Write(clientMessage), stream, &clientMessage); - - TServerMessage serverMessage; - Log << TLOG_INFO << "Wait for \"init_response\""; - AssertSuccessfullStreamingOperation(stream->Read(&serverMessage), stream); + } + + std::unique_ptr<Ydb::PersQueue::V1::PersQueueService::Stub>& GetPersQueueService() { + return service; + } + + std::pair<std::unique_ptr<grpc::ClientReaderWriter<TStreamingWriteClientMessage, TStreamingWriteServerMessage>>, std::unique_ptr<grpc::ClientContext>> InitWriteSession( + const typename TStreamingWriteClientMessage::InitRequest& setup = TStreamingWriteClientMessage::InitRequest()) + { + auto context = std::make_unique<grpc::ClientContext>(); + auto stream = service->StreamingWrite(context.get()); + InitSession(stream, setup); + return std::make_pair(std::move(stream), std::move(context)); + } + + // Initializes session with default (possible overwriten by setup parameter) values and returns initialization response + template<typename TClientMessage, typename TServerMessage> + TServerMessage InitSession(std::unique_ptr<grpc::ClientReaderWriter<TClientMessage, TServerMessage>>& stream, + const typename TClientMessage::InitRequest& setup = typename TClientMessage::InitRequest()) + { + TClientMessage clientMessage; + // TODO: Replace with MergeFrom? + clientMessage.mutable_init_request()->CopyFrom(setup); + if (setup.topic().empty()) { + clientMessage.mutable_init_request()->set_topic(GetTestTopic()); + } + if (setup.message_group_id().empty()) { + clientMessage.mutable_init_request()->set_message_group_id(GetTestMessageGroupId()); + } + AssertSuccessfullStreamingOperation(stream->Write(clientMessage), stream, &clientMessage); + + TServerMessage serverMessage; + Log << TLOG_INFO << "Wait for \"init_response\""; + AssertSuccessfullStreamingOperation(stream->Read(&serverMessage), stream); Cerr << "Init response: " << serverMessage.ShortDebugString() << Endl; - UNIT_ASSERT_C(serverMessage.server_message_case() == TServerMessage::kInitResponse, serverMessage); - Log << TLOG_INFO << "Session ID is " << serverMessage.init_response().session_id().Quote(); - return serverMessage; - } -}; + UNIT_ASSERT_C(serverMessage.server_message_case() == TServerMessage::kInitResponse, serverMessage); + Log << TLOG_INFO << "Session ID is " << serverMessage.init_response().session_id().Quote(); + return serverMessage; + } +}; diff --git a/ydb/services/persqueue_v1/ut/pq_data_writer.h b/ydb/services/persqueue_v1/ut/pq_data_writer.h index caef894060..71f271fb14 100644 --- a/ydb/services/persqueue_v1/ut/pq_data_writer.h +++ b/ydb/services/persqueue_v1/ut/pq_data_writer.h @@ -1,5 +1,5 @@ #pragma once -#include "test_utils.h" +#include "test_utils.h" #include <ydb/core/testlib/test_pq_client.h> #include <ydb/public/api/grpc/draft/ydb_persqueue_v1.grpc.pb.h> #include <ydb/public/api/protos/persqueue_error_codes_v1.pb.h> @@ -178,7 +178,7 @@ public: continue; } - AssertSuccessfullStreamingOperation(stream->Read(&resp), stream); + AssertSuccessfullStreamingOperation(stream->Read(&resp), stream); if (resp.status() == Ydb::StatusIds::UNAVAILABLE) { continue; } @@ -223,16 +223,16 @@ public: return 0; } - ui32 Write(const TString& topic, const TVector<TString>& data, bool error = false, const TMaybe<TString>& ticket = {}) { + ui32 Write(const TString& topic, const TVector<TString>& data, bool error = false, const TMaybe<TString>& ticket = {}) { return WriteImpl(topic, {data}, error, ticket); } private: - ui32 WriteImpl(const TString& topic, const TVector<TString>& data, bool error, const TMaybe<TString>& ticket) { + ui32 WriteImpl(const TString& topic, const TVector<TString>& data, bool error, const TMaybe<TString>& ticket) { grpc::ClientContext context; - if (ticket) - context.AddMetadata("x-ydb-auth-ticket", *ticket); + if (ticket) + context.AddMetadata("x-ydb-auth-ticket", *ticket); auto stream = StubP_->StreamingWrite(&context); @@ -300,20 +300,20 @@ private: } template <typename S> - void Flush(const TVector<TString>& data, S& stream, const TMaybe<TString>& ticket) { + void Flush(const TVector<TString>& data, S& stream, const TMaybe<TString>& ticket) { Ydb::PersQueue::V1::StreamingWriteClientMessage request; Ydb::PersQueue::V1::StreamingWriteServerMessage response; - if (ticket) { - request.mutable_update_token_request()->set_token(*ticket); - Cerr << "update user token request: " << request << Endl; - if (!stream->Write(request)) { - ythrow yexception() << "write fail"; - } - UNIT_ASSERT(stream->Read(&response)); - UNIT_ASSERT_C(response.server_message_case() == Ydb::PersQueue::V1::StreamingWriteServerMessage::kUpdateTokenResponse, response); - } - + if (ticket) { + request.mutable_update_token_request()->set_token(*ticket); + Cerr << "update user token request: " << request << Endl; + if (!stream->Write(request)) { + ythrow yexception() << "write fail"; + } + UNIT_ASSERT(stream->Read(&response)); + UNIT_ASSERT_C(response.server_message_case() == Ydb::PersQueue::V1::StreamingWriteServerMessage::kUpdateTokenResponse, response); + } + TVector<ui64> allSeqNo; auto* mutableData = request.mutable_write_request(); ui32 offset = 0; @@ -329,8 +329,8 @@ private: mutableData->add_blocks_part_numbers(0); mutableData->add_blocks_message_counts(1); mutableData->add_blocks_uncompressed_sizes(d.size()); - mutableData->add_blocks_headers(TString(1, '\0') /* RAW codec ID */); - mutableData->add_blocks_data(d); + mutableData->add_blocks_headers(TString(1, '\0') /* RAW codec ID */); + mutableData->add_blocks_data(d); } Cerr << "request: " << request << Endl; diff --git a/ydb/services/persqueue_v1/ut/rate_limiter_test_setup.cpp b/ydb/services/persqueue_v1/ut/rate_limiter_test_setup.cpp index 779d60e778..f3f78567ef 100644 --- a/ydb/services/persqueue_v1/ut/rate_limiter_test_setup.cpp +++ b/ydb/services/persqueue_v1/ut/rate_limiter_test_setup.cpp @@ -1,14 +1,14 @@ -#include "rate_limiter_test_setup.h" +#include "rate_limiter_test_setup.h" #include <ydb/library/persqueue/topic_parser/topic_parser.h> #include <library/cpp/logger/priority.h> - -using namespace NPersQueue; -using namespace Ydb::PersQueue; -using namespace NKikimr::Tests; - - -namespace NKikimr::NPersQueueTests { - + +using namespace NPersQueue; +using namespace Ydb::PersQueue; +using namespace NKikimr::Tests; + + +namespace NKikimr::NPersQueueTests { + TRateLimiterTestSetup::TRateLimiterTestSetup( NKikimrPQ::TPQConfig::TQuotingConfig::ELimitedEntity limitedEntity, double writeAccountQuota, @@ -19,28 +19,28 @@ TRateLimiterTestSetup::TRateLimiterTestSetup( , LimitedEntity(limitedEntity) , WriteAccountQuota(writeAccountQuota) , ReadAccountQuota(readAccountQuota) -{ +{ Start(enableReadQuoting); -} - +} + void TRateLimiterTestSetup::CreateTopic(const TString& path) { - const TString name = BuildFullTopicName(path, "dc1"); - const TString account = GetAccount(name); - - Cerr << "Creating topic \"" << name << "\"" << Endl; + const TString name = BuildFullTopicName(path, "dc1"); + const TString account = GetAccount(name); + + Cerr << "Creating topic \"" << name << "\"" << Endl; Server->AnnoyingClient->CreateTopic(name, 1); - + CreateKesus(account); CreateQuotaResources(path, "write-quota", false); CreateQuotaResources(path, "read-quota", true); } - + void TRateLimiterTestSetup::CreateConsumer(const TString& path) { const TString account = GetAccount(path); - + Cerr << "Creating consumer \"" << path << "\"" << Endl; Server->AnnoyingClient->CreateConsumer(path); - + CreateKesus(account); CreateQuotaResources(path, "write-quota", true); CreateQuotaResources(path, "read-quota", false); @@ -68,10 +68,10 @@ void TRateLimiterTestSetup::CreateKesus(const TString& account) { } void TRateLimiterTestSetup::CreateQuotaResources(const TString& path, const TString& quotaPrefix, bool excludeLastComponent) { - TVector<TString> pathComponents = SplitPath(path); + TVector<TString> pathComponents = SplitPath(path); if (pathComponents.size() <= 1) { return; - } + } TStringBuilder prefixPath; prefixPath << quotaPrefix; @@ -91,59 +91,59 @@ void TRateLimiterTestSetup::CreateQuotaResources(const TString& path, const TStr "Status: " << Ydb::StatusIds::StatusCode_Name(statusCode) ); } -} - +} + /* THolder<Ydb::PersQueue::IProducer> TRateLimiterTestSetup::StartProducer(const TString& topicPath, bool compress) { Ydb::PersQueue::TProducerSettings producerSettings; producerSettings.Server = Ydb::PersQueue::TServerSetting("localhost", Server->GrpcPort); - producerSettings.Topic = topicPath; + producerSettings.Topic = topicPath; producerSettings.SourceId = "TRateLimiterTestSetupSourceId"; producerSettings.Codec = compress ? "gzip" : "raw"; THolder<Ydb::PersQueue::IProducer> producer = PQLib->CreateProducer(producerSettings); - auto startResult = producer->Start(); - UNIT_ASSERT_EQUAL_C(Ydb::StatusIds::SUCCESS, startResult.GetValueSync().Response.status(), "Response: " << startResult.GetValueSync().Response); - return producer; -} + auto startResult = producer->Start(); + UNIT_ASSERT_EQUAL_C(Ydb::StatusIds::SUCCESS, startResult.GetValueSync().Response.status(), "Response: " << startResult.GetValueSync().Response); + return producer; +} */ - + void TRateLimiterTestSetup::Start(bool enableReadQuoting) { InitServer(enableReadQuoting); - InitQuoting(); + InitQuoting(); WaitWritePQServiceInitialization(); -} - +} + void TRateLimiterTestSetup::InitServer(bool enableReadQuoting) { auto& settings = Server->ServerSettings; - + settings.PQConfig.MutableQuotingConfig()->SetEnableQuoting(true); settings.PQConfig.MutableQuotingConfig()->SetEnableReadQuoting(enableReadQuoting); settings.PQConfig.MutableQuotingConfig()->SetTopicWriteQuotaEntityToLimit(LimitedEntity); - + Server->GrpcServerOptions.SetMaxMessageSize(130 * 1024 * 1024); Server->StartServer(); Server->EnableLogs( - { - NKikimrServices::PQ_READ_PROXY, - NKikimrServices::PQ_WRITE_PROXY, - NKikimrServices::PERSQUEUE, - NKikimrServices::QUOTER_SERVICE, - NKikimrServices::QUOTER_PROXY, - NKikimrServices::KESUS_TABLET, + { + NKikimrServices::PQ_READ_PROXY, + NKikimrServices::PQ_WRITE_PROXY, + NKikimrServices::PERSQUEUE, + NKikimrServices::QUOTER_SERVICE, + NKikimrServices::QUOTER_PROXY, + NKikimrServices::KESUS_TABLET, NKikimrServices::PQ_READ_SPEED_LIMITER }, NActors::NLog::PRI_TRACE - ); -} - -void TRateLimiterTestSetup::InitQuoting() { + ); +} + +void TRateLimiterTestSetup::InitQuoting() { Server->AnnoyingClient->MkDir("/Root", "PersQueue"); Server->AnnoyingClient->MkDir("/Root/PersQueue", "System"); Server->AnnoyingClient->MkDir("/Root/PersQueue/System", "Quoters"); -} - +} + void TRateLimiterTestSetup::WaitWritePQServiceInitialization() { PQDataWriter = MakeHolder<TPQDataWriter>("writer_source_id", *Server); -} +} } diff --git a/ydb/services/persqueue_v1/ut/rate_limiter_test_setup.h b/ydb/services/persqueue_v1/ut/rate_limiter_test_setup.h index 01529818e1..1196574139 100644 --- a/ydb/services/persqueue_v1/ut/rate_limiter_test_setup.h +++ b/ydb/services/persqueue_v1/ut/rate_limiter_test_setup.h @@ -1,59 +1,59 @@ -#pragma once +#pragma once #include "pq_data_writer.h" #include <ydb/core/testlib/test_client.h> #include <ydb/core/testlib/test_pq_client.h> - -namespace NKikimr::NPersQueueTests { - -class TRateLimiterTestSetup { -public: + +namespace NKikimr::NPersQueueTests { + +class TRateLimiterTestSetup { +public: explicit TRateLimiterTestSetup( NKikimrPQ::TPQConfig::TQuotingConfig::ELimitedEntity limitedEntity, double writeAccountQuota = 1000.0, double readAccountQuota = 1000.0, bool enableReadQuoting = false ); - + void CreateTopic(const TString& path); void CreateConsumer(const TString& path); - - // namespace NPersQueue = Ydb::PersQueue; + + // namespace NPersQueue = Ydb::PersQueue; // THolder<Ydb::PersQueue::IProducer> StartProducer(const TString& topicPath, bool compress = false); - - // Getters - ui16 GetGrpcPort() const { + + // Getters + ui16 GetGrpcPort() const { return Server->GrpcPort; - } - - ui16 GetMsgBusPort() const { + } + + ui16 GetMsgBusPort() const { return Server->Port; - } - - NKikimr::Tests::TServer& GetServer() { + } + + NKikimr::Tests::TServer& GetServer() { return *Server->CleverServer; - } - - NPersQueueTests::TFlatMsgBusPQClient& GetClient() { + } + + NPersQueueTests::TFlatMsgBusPQClient& GetClient() { return *Server->AnnoyingClient; - } - -private: + } + +private: void CreateKesus(const TString& account); void CreateQuotaResources(const TString& path, const TString& quotaPrefix, bool excludeLastComponent); - + void Start(bool enableReadQuoting); void InitServer(bool enableReadQuoting); - void InitQuoting(); + void InitQuoting(); void WaitWritePQServiceInitialization(); - -private: + +private: THolder<NPersQueue::TTestServer> Server; THolder<TPQDataWriter> PQDataWriter; // For waiting for grpc writer service initialization. const NKikimrPQ::TPQConfig::TQuotingConfig::ELimitedEntity LimitedEntity; double WriteAccountQuota; double ReadAccountQuota; const TString QuotersRootPath = "/Root/PersQueue/System/Quoters"; -}; -} +}; +} diff --git a/ydb/services/persqueue_v1/ut/test_utils.h b/ydb/services/persqueue_v1/ut/test_utils.h index c02308b0d7..50ee04ff94 100644 --- a/ydb/services/persqueue_v1/ut/test_utils.h +++ b/ydb/services/persqueue_v1/ut/test_utils.h @@ -1,16 +1,16 @@ -#pragma once +#pragma once #include <ydb/public/api/protos/ydb_status_codes.pb.h> #include <ydb/public/api/protos/persqueue_error_codes_v1.pb.h> #include <ydb/core/testlib/test_pq_client.h> - + #include <library/cpp/testing/unittest/registar.h> #include <google/protobuf/message.h> -#include <contrib/libs/grpc/include/grpcpp/support/sync_stream.h> -#include <util/string/builder.h> +#include <contrib/libs/grpc/include/grpcpp/support/sync_stream.h> +#include <util/string/builder.h> #include <util/system/type_name.h> - + #define TEST_CASE_NAME (TypeName(*this).rfind("TTestCase") != TString::npos ? TypeName(*this).substr(TypeName(*this).rfind("TTestCase") + 9) : TypeName(*this)) - + static constexpr int DEBUG_LOG_LEVEL = 7; using namespace NKikimr::NPersQueueTests; @@ -20,62 +20,62 @@ inline void WaitACLModification() { Sleep(TDuration::Seconds(5)); } -// TODO: Remove and replace all usage with ApiTestSetup -#define SETUP_API_TEST_PREREQUISITES()\ - const TString topic = "topic1";\ - const TString cluster = "dc1";\ - const TString pqRoot = "/Root/PQ";\ - const TString messageGroupId = "test-message-group-id";\ - TPortManager pm;\ - const ui16 port = pm.GetPort(2134);\ - const ui16 grpc = pm.GetPort(2135);\ - TServerSettings settings = PQSettings(port);\ - TServer server = TServer(settings);\ +// TODO: Remove and replace all usage with ApiTestSetup +#define SETUP_API_TEST_PREREQUISITES()\ + const TString topic = "topic1";\ + const TString cluster = "dc1";\ + const TString pqRoot = "/Root/PQ";\ + const TString messageGroupId = "test-message-group-id";\ + TPortManager pm;\ + const ui16 port = pm.GetPort(2134);\ + const ui16 grpc = pm.GetPort(2135);\ + TServerSettings settings = PQSettings(port);\ + TServer server = TServer(settings);\ server.EnableGRpc(NGrpc::TServerOptions().SetHost("localhost").SetPort(grpc));\ - TFlatMsgBusPQClient client(settings, grpc);\ + TFlatMsgBusPQClient client(settings, grpc);\ client.FullInit();\ - client.CreateTopic("rt3.dc1--" + TString(topic), 1);\ - EnableLogs(server, { NKikimrServices::PQ_WRITE_PROXY });\ - TPQDataWriter writer(messageGroupId, grpc, client, server.GetRuntime());\ - writer.WaitWritePQServiceInitialization();\ - auto channel = grpc::CreateChannel("localhost:" + ToString(grpc), grpc::InsecureChannelCredentials());\ - auto service = Ydb::PersQueue::V1::PersQueueService::NewStub(channel);\ - -template<typename TStream, typename TMessage = google::protobuf::Message> -void AssertSuccessfullStreamingOperation(bool ok, std::unique_ptr<TStream>& stream, TMessage* message = nullptr) { - if (!ok) { - auto status = stream->Finish(); - TStringBuilder errorMessage; - errorMessage << "gRPC stream operation failed with error code " << static_cast<int>(status.error_code()) << " and error message '" << status.error_message() << "'."; - if (message != nullptr) { - errorMessage << " Last user message is " << message->DebugString(); - } - UNIT_FAIL(errorMessage); - } -} - -template<typename TClientMessage, typename TServerMessage> -void AssertStreamingSessionAlive(std::unique_ptr<grpc::ClientReaderWriter<TClientMessage, TServerMessage>>& stream) { - TClientMessage clientMessage; - // TODO: Add 'ping_request' and 'ping_response' to write and read protocol for debugging? - clientMessage.mutable_update_token_request(); - AssertSuccessfullStreamingOperation(stream->Write(clientMessage), stream, &clientMessage); - TServerMessage serverMessage; - AssertSuccessfullStreamingOperation(stream->Read(&serverMessage), stream); - UNIT_ASSERT_EQUAL_C(TServerMessage::kUpdateTokenResponse, serverMessage.server_message_case(), serverMessage); -} - -template<typename TClientMessage, typename TServerMessage> -void AssertStreamingSessionDead(std::unique_ptr<grpc::ClientReaderWriter<TClientMessage, TServerMessage>>& stream, - const Ydb::StatusIds::StatusCode expectedStatus, const Ydb::PersQueue::ErrorCode::ErrorCode expectedErrorCode) -{ - TServerMessage serverMessage; - AssertSuccessfullStreamingOperation(stream->Read(&serverMessage), stream); - UNIT_ASSERT_VALUES_EQUAL(expectedStatus, serverMessage.status()); - UNIT_ASSERT_LE(1, serverMessage.issues_size()); - // TODO: Why namespace duplicates enum name "ErrorCode::ErrorCode"? - // TODO: Why "Ydb::PersQueue::ErrorCode::ErrorCode" doesn't work with streaming output like "Ydb::StatusIds::StatusCode" does? - auto actualErrorCode = static_cast<Ydb::PersQueue::ErrorCode::ErrorCode>(serverMessage.issues(0).issue_code()); - UNIT_ASSERT_C(expectedErrorCode == actualErrorCode, serverMessage); -} + client.CreateTopic("rt3.dc1--" + TString(topic), 1);\ + EnableLogs(server, { NKikimrServices::PQ_WRITE_PROXY });\ + TPQDataWriter writer(messageGroupId, grpc, client, server.GetRuntime());\ + writer.WaitWritePQServiceInitialization();\ + auto channel = grpc::CreateChannel("localhost:" + ToString(grpc), grpc::InsecureChannelCredentials());\ + auto service = Ydb::PersQueue::V1::PersQueueService::NewStub(channel);\ + +template<typename TStream, typename TMessage = google::protobuf::Message> +void AssertSuccessfullStreamingOperation(bool ok, std::unique_ptr<TStream>& stream, TMessage* message = nullptr) { + if (!ok) { + auto status = stream->Finish(); + TStringBuilder errorMessage; + errorMessage << "gRPC stream operation failed with error code " << static_cast<int>(status.error_code()) << " and error message '" << status.error_message() << "'."; + if (message != nullptr) { + errorMessage << " Last user message is " << message->DebugString(); + } + UNIT_FAIL(errorMessage); + } +} + +template<typename TClientMessage, typename TServerMessage> +void AssertStreamingSessionAlive(std::unique_ptr<grpc::ClientReaderWriter<TClientMessage, TServerMessage>>& stream) { + TClientMessage clientMessage; + // TODO: Add 'ping_request' and 'ping_response' to write and read protocol for debugging? + clientMessage.mutable_update_token_request(); + AssertSuccessfullStreamingOperation(stream->Write(clientMessage), stream, &clientMessage); + TServerMessage serverMessage; + AssertSuccessfullStreamingOperation(stream->Read(&serverMessage), stream); + UNIT_ASSERT_EQUAL_C(TServerMessage::kUpdateTokenResponse, serverMessage.server_message_case(), serverMessage); +} + +template<typename TClientMessage, typename TServerMessage> +void AssertStreamingSessionDead(std::unique_ptr<grpc::ClientReaderWriter<TClientMessage, TServerMessage>>& stream, + const Ydb::StatusIds::StatusCode expectedStatus, const Ydb::PersQueue::ErrorCode::ErrorCode expectedErrorCode) +{ + TServerMessage serverMessage; + AssertSuccessfullStreamingOperation(stream->Read(&serverMessage), stream); + UNIT_ASSERT_VALUES_EQUAL(expectedStatus, serverMessage.status()); + UNIT_ASSERT_LE(1, serverMessage.issues_size()); + // TODO: Why namespace duplicates enum name "ErrorCode::ErrorCode"? + // TODO: Why "Ydb::PersQueue::ErrorCode::ErrorCode" doesn't work with streaming output like "Ydb::StatusIds::StatusCode" does? + auto actualErrorCode = static_cast<Ydb::PersQueue::ErrorCode::ErrorCode>(serverMessage.issues(0).issue_code()); + UNIT_ASSERT_C(expectedErrorCode == actualErrorCode, serverMessage); +} |