aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorleonidvasilev <leonidvasilev@yandex-team.ru>2022-02-10 16:50:41 +0300
committerDaniil Cherednik <dcherednik@yandex-team.ru>2022-02-10 16:50:41 +0300
commit0f1eecfae2af6221f0abe9fdcc2c7b641c429cfb (patch)
tree5d5cb817648f650d76cf1076100726fd9b8448e8
parent4d9cecd757702b2b5e853002de94e2cdbae1ca83 (diff)
downloadydb-0f1eecfae2af6221f0abe9fdcc2c7b641c429cfb.tar.gz
Restoring authorship annotation for <leonidvasilev@yandex-team.ru>. Commit 2 of 2.
-rw-r--r--library/cpp/actors/core/process_stats.cpp30
-rw-r--r--library/cpp/actors/http/http.cpp4
-rw-r--r--library/cpp/actors/http/http.h2
-rw-r--r--library/cpp/actors/http/http_ut.cpp22
-rw-r--r--ydb/core/grpc_services/rpc_calls.h2
-rwxr-xr-xydb/core/protos/grpc_pq_old.proto2
-rw-r--r--ydb/core/protos/pqconfig.proto8
-rw-r--r--ydb/public/api/grpc/draft/ydb_persqueue_v1.proto2
-rw-r--r--ydb/public/api/protos/persqueue_error_codes_v1.proto2
-rw-r--r--ydb/public/api/protos/ydb_persqueue_v1.proto312
-rw-r--r--ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/sdk_test_setup.h80
-rw-r--r--ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/test_server.h2
-rw-r--r--ydb/services/persqueue_v1/grpc_pq_actor.h82
-rw-r--r--ydb/services/persqueue_v1/grpc_pq_codecs.cpp52
-rw-r--r--ydb/services/persqueue_v1/grpc_pq_codecs.h14
-rw-r--r--ydb/services/persqueue_v1/grpc_pq_read.cpp2
-rw-r--r--ydb/services/persqueue_v1/grpc_pq_read_actor.cpp6
-rw-r--r--ydb/services/persqueue_v1/grpc_pq_write.cpp156
-rw-r--r--ydb/services/persqueue_v1/grpc_pq_write.h18
-rw-r--r--ydb/services/persqueue_v1/grpc_pq_write_actor.cpp308
-rw-r--r--ydb/services/persqueue_v1/persqueue.cpp8
-rw-r--r--ydb/services/persqueue_v1/persqueue_ut.cpp462
-rw-r--r--ydb/services/persqueue_v1/ut/api_test_setup.h206
-rw-r--r--ydb/services/persqueue_v1/ut/pq_data_writer.h38
-rw-r--r--ydb/services/persqueue_v1/ut/rate_limiter_test_setup.cpp98
-rw-r--r--ydb/services/persqueue_v1/ut/rate_limiter_test_setup.h60
-rw-r--r--ydb/services/persqueue_v1/ut/test_utils.h124
27 files changed, 1051 insertions, 1051 deletions
diff --git a/library/cpp/actors/core/process_stats.cpp b/library/cpp/actors/core/process_stats.cpp
index 4eb7d7c2fc..0e1dbd0031 100644
--- a/library/cpp/actors/core/process_stats.cpp
+++ b/library/cpp/actors/core/process_stats.cpp
@@ -6,7 +6,7 @@
#include <library/cpp/monlib/dynamic_counters/counters.h>
#include <library/cpp/monlib/metrics/metric_registry.h>
-#include <util/datetime/uptime.h>
+#include <util/datetime/uptime.h>
#include <util/system/defaults.h>
#include <util/stream/file.h>
#include <util/string/vector.h>
@@ -74,8 +74,8 @@ namespace NActors {
Stime /= tickPerMillisec;
CUtime /= tickPerMillisec;
CStime /= tickPerMillisec;
- SystemUptime = ::Uptime();
- Uptime = SystemUptime - TDuration::MilliSeconds(StartTime / TicksPerMillisec());
+ SystemUptime = ::Uptime();
+ Uptime = SystemUptime - TDuration::MilliSeconds(StartTime / TicksPerMillisec());
}
TFileInput statm("/proc/" + strPid + "/statm");
@@ -196,9 +196,9 @@ namespace {
SysTime = ProcStatGroup->GetCounter("Process/SystemTime", true);
MinorPageFaults = ProcStatGroup->GetCounter("Process/MinorPageFaults", true);
MajorPageFaults = ProcStatGroup->GetCounter("Process/MajorPageFaults", true);
- UptimeSeconds = ProcStatGroup->GetCounter("Process/UptimeSeconds", false);
+ UptimeSeconds = ProcStatGroup->GetCounter("Process/UptimeSeconds", false);
NumThreads = ProcStatGroup->GetCounter("Process/NumThreads", false);
- SystemUptimeSeconds = ProcStatGroup->GetCounter("System/UptimeSeconds", false);
+ SystemUptimeSeconds = ProcStatGroup->GetCounter("System/UptimeSeconds", false);
}
void UpdateCounters(const TProcStat& procStat) {
@@ -212,9 +212,9 @@ namespace {
*SysTime = procStat.Stime;
*MinorPageFaults = procStat.MinFlt;
*MajorPageFaults = procStat.MajFlt;
- *UptimeSeconds = procStat.Uptime.Seconds();
+ *UptimeSeconds = procStat.Uptime.Seconds();
*NumThreads = procStat.NumThreads;
- *SystemUptimeSeconds = procStat.Uptime.Seconds();
+ *SystemUptimeSeconds = procStat.Uptime.Seconds();
}
private:
@@ -227,9 +227,9 @@ namespace {
NMonitoring::TDynamicCounters::TCounterPtr SysTime;
NMonitoring::TDynamicCounters::TCounterPtr MinorPageFaults;
NMonitoring::TDynamicCounters::TCounterPtr MajorPageFaults;
- NMonitoring::TDynamicCounters::TCounterPtr UptimeSeconds;
+ NMonitoring::TDynamicCounters::TCounterPtr UptimeSeconds;
NMonitoring::TDynamicCounters::TCounterPtr NumThreads;
- NMonitoring::TDynamicCounters::TCounterPtr SystemUptimeSeconds;
+ NMonitoring::TDynamicCounters::TCounterPtr SystemUptimeSeconds;
};
@@ -243,9 +243,9 @@ namespace {
AnonRssSize = registry.IntGauge({{"sensor", "process.AnonRssSize"}});
FileRssSize = registry.IntGauge({{"sensor", "process.FileRssSize"}});
CGroupMemLimit = registry.IntGauge({{"sensor", "process.CGroupMemLimit"}});
- UptimeSeconds = registry.IntGauge({{"sensor", "process.UptimeSeconds"}});
+ UptimeSeconds = registry.IntGauge({{"sensor", "process.UptimeSeconds"}});
NumThreads = registry.IntGauge({{"sensor", "process.NumThreads"}});
- SystemUptimeSeconds = registry.IntGauge({{"sensor", "system.UptimeSeconds"}});
+ SystemUptimeSeconds = registry.IntGauge({{"sensor", "system.UptimeSeconds"}});
UserTime = registry.Rate({{"sensor", "process.UserTime"}});
SysTime = registry.Rate({{"sensor", "process.SystemTime"}});
@@ -258,9 +258,9 @@ namespace {
AnonRssSize->Set(procStat.AnonRss);
FileRssSize->Set(procStat.FileRss);
CGroupMemLimit->Set(procStat.CGroupMemLim);
- UptimeSeconds->Set(procStat.Uptime.Seconds());
+ UptimeSeconds->Set(procStat.Uptime.Seconds());
NumThreads->Set(procStat.NumThreads);
- SystemUptimeSeconds->Set(procStat.SystemUptime.Seconds());
+ SystemUptimeSeconds->Set(procStat.SystemUptime.Seconds());
// it is ok here to reset and add metric value, because mutation
// is performed in siglethreaded context
@@ -287,9 +287,9 @@ namespace {
NMonitoring::TRate* SysTime;
NMonitoring::TRate* MinorPageFaults;
NMonitoring::TRate* MajorPageFaults;
- NMonitoring::TIntGauge* UptimeSeconds;
+ NMonitoring::TIntGauge* UptimeSeconds;
NMonitoring::TIntGauge* NumThreads;
- NMonitoring::TIntGauge* SystemUptimeSeconds;
+ NMonitoring::TIntGauge* SystemUptimeSeconds;
};
} // namespace
diff --git a/library/cpp/actors/http/http.cpp b/library/cpp/actors/http/http.cpp
index ae0860e1e3..7125f9d8b0 100644
--- a/library/cpp/actors/http/http.cpp
+++ b/library/cpp/actors/http/http.cpp
@@ -45,7 +45,7 @@ template <> TStringBuf THttpResponse::GetName<&THttpResponse::ContentType>() { r
template <> TStringBuf THttpResponse::GetName<&THttpResponse::ContentLength>() { return "Content-Length"; }
template <> TStringBuf THttpResponse::GetName<&THttpResponse::TransferEncoding>() { return "Transfer-Encoding"; }
template <> TStringBuf THttpResponse::GetName<&THttpResponse::LastModified>() { return "Last-Modified"; }
-template <> TStringBuf THttpResponse::GetName<&THttpResponse::ContentEncoding>() { return "Content-Encoding"; }
+template <> TStringBuf THttpResponse::GetName<&THttpResponse::ContentEncoding>() { return "Content-Encoding"; }
const TMap<TStringBuf, TStringBuf THttpResponse::*, TLessNoCase> THttpResponse::HeadersLocation = {
{ THttpResponse::GetName<&THttpResponse::Connection>(), &THttpResponse::Connection },
@@ -53,7 +53,7 @@ const TMap<TStringBuf, TStringBuf THttpResponse::*, TLessNoCase> THttpResponse::
{ THttpResponse::GetName<&THttpResponse::ContentLength>(), &THttpResponse::ContentLength },
{ THttpResponse::GetName<&THttpResponse::TransferEncoding>(), &THttpResponse::TransferEncoding },
{ THttpResponse::GetName<&THttpResponse::LastModified>(), &THttpResponse::LastModified },
- { THttpResponse::GetName<&THttpResponse::ContentEncoding>(), &THttpResponse::ContentEncoding }
+ { THttpResponse::GetName<&THttpResponse::ContentEncoding>(), &THttpResponse::ContentEncoding }
};
void THttpRequest::Clear() {
diff --git a/library/cpp/actors/http/http.h b/library/cpp/actors/http/http.h
index 6e212500d7..96c5c1ec48 100644
--- a/library/cpp/actors/http/http.h
+++ b/library/cpp/actors/http/http.h
@@ -141,7 +141,7 @@ public:
TStringBuf ContentLength;
TStringBuf TransferEncoding;
TStringBuf LastModified;
- TStringBuf ContentEncoding;
+ TStringBuf ContentEncoding;
TStringBuf Body;
diff --git a/library/cpp/actors/http/http_ut.cpp b/library/cpp/actors/http/http_ut.cpp
index 870199ce0f..4c922f8d0f 100644
--- a/library/cpp/actors/http/http_ut.cpp
+++ b/library/cpp/actors/http/http_ut.cpp
@@ -83,17 +83,17 @@ Y_UNIT_TEST_SUITE(HttpProxy) {
UNIT_ASSERT_EQUAL(response->Body, "this\r\n is test.");
}
- Y_UNIT_TEST(CreateRepsonseWithCompressedBody) {
- NHttp::THttpIncomingRequestPtr request = nullptr;
- NHttp::THttpOutgoingResponsePtr response = new NHttp::THttpOutgoingResponse(request, "HTTP", "1.1", "200", "OK");
- response->Set<&NHttp::THttpResponse::ContentEncoding>("gzip");
- TString compressedBody = "compressed body";
- response->SetBody(compressedBody);
- UNIT_ASSERT_VALUES_EQUAL("gzip", response->ContentEncoding);
- UNIT_ASSERT_VALUES_EQUAL(ToString(compressedBody.size()), response->ContentLength);
- UNIT_ASSERT_VALUES_EQUAL(compressedBody, response->Body);
- }
-
+ Y_UNIT_TEST(CreateRepsonseWithCompressedBody) {
+ NHttp::THttpIncomingRequestPtr request = nullptr;
+ NHttp::THttpOutgoingResponsePtr response = new NHttp::THttpOutgoingResponse(request, "HTTP", "1.1", "200", "OK");
+ response->Set<&NHttp::THttpResponse::ContentEncoding>("gzip");
+ TString compressedBody = "compressed body";
+ response->SetBody(compressedBody);
+ UNIT_ASSERT_VALUES_EQUAL("gzip", response->ContentEncoding);
+ UNIT_ASSERT_VALUES_EQUAL(ToString(compressedBody.size()), response->ContentLength);
+ UNIT_ASSERT_VALUES_EQUAL(compressedBody, response->Body);
+ }
+
Y_UNIT_TEST(BasicPartialParsing) {
NHttp::THttpIncomingRequestPtr request = new NHttp::THttpIncomingRequest();
EatPartialString(request, "GET /test HTTP/1.1\r\nHost: test\r\nSome-Header: 32344\r\n\r\n");
diff --git a/ydb/core/grpc_services/rpc_calls.h b/ydb/core/grpc_services/rpc_calls.h
index e288b64a0f..43136c2f4a 100644
--- a/ydb/core/grpc_services/rpc_calls.h
+++ b/ydb/core/grpc_services/rpc_calls.h
@@ -85,7 +85,7 @@ using TEvKikhouseDescribeTableRequest = TGRpcRequestWrapper<TRpcServices::EvKikh
using TEvS3ListingRequest = TGRpcRequestWrapper<TRpcServices::EvS3Listing, Ydb::S3Internal::S3ListingRequest, Ydb::S3Internal::S3ListingResponse, true>;
using TEvBiStreamPingRequest = TGRpcRequestBiStreamWrapper<TRpcServices::EvBiStreamPing, Draft::Dummy::PingRequest, Draft::Dummy::PingResponse>;
using TEvExperimentalStreamQueryRequest = TGRpcRequestWrapper<TRpcServices::EvExperimentalStreamQuery, Ydb::Experimental::ExecuteStreamQueryRequest, Ydb::Experimental::ExecuteStreamQueryResponse, false>;
-using TEvStreamPQWriteRequest = TGRpcRequestBiStreamWrapper<TRpcServices::EvStreamPQWrite, Ydb::PersQueue::V1::StreamingWriteClientMessage, Ydb::PersQueue::V1::StreamingWriteServerMessage>;
+using TEvStreamPQWriteRequest = TGRpcRequestBiStreamWrapper<TRpcServices::EvStreamPQWrite, Ydb::PersQueue::V1::StreamingWriteClientMessage, Ydb::PersQueue::V1::StreamingWriteServerMessage>;
using TEvStreamPQReadRequest = TGRpcRequestBiStreamWrapper<TRpcServices::EvStreamPQRead, Ydb::PersQueue::V1::MigrationStreamingReadClientMessage, Ydb::PersQueue::V1::MigrationStreamingReadServerMessage>;
using TEvPQReadInfoRequest = TGRpcRequestWrapper<TRpcServices::EvPQReadInfo, Ydb::PersQueue::V1::ReadInfoRequest, Ydb::PersQueue::V1::ReadInfoResponse, true>;
using TEvPQDropTopicRequest = TGRpcRequestValidationWrapper<TRpcServices::EvPQDropTopic, Ydb::PersQueue::V1::DropTopicRequest, Ydb::PersQueue::V1::DropTopicResponse, true>;
diff --git a/ydb/core/protos/grpc_pq_old.proto b/ydb/core/protos/grpc_pq_old.proto
index 4399c0f15b..ed273e1404 100755
--- a/ydb/core/protos/grpc_pq_old.proto
+++ b/ydb/core/protos/grpc_pq_old.proto
@@ -32,7 +32,7 @@ message TDataChunk {
optional EChunkType ChunkType = 9 [default = REGULAR];
- optional int64 Codec = 10;
+ optional int64 Codec = 10;
optional TMapType ExtraFields = 126;
diff --git a/ydb/core/protos/pqconfig.proto b/ydb/core/protos/pqconfig.proto
index 350f54abf0..7c85927449 100644
--- a/ydb/core/protos/pqconfig.proto
+++ b/ydb/core/protos/pqconfig.proto
@@ -35,7 +35,7 @@ message TPQConfig {
optional bool CheckACL = 9 [default = false];
- optional uint32 SourceIdCleanupPeriodSec = 10 [default = 60]; // 24 hours // TODO: What is '24 hours'? Default is 60 seconds.
+ optional uint32 SourceIdCleanupPeriodSec = 10 [default = 60]; // 24 hours // TODO: What is '24 hours'? Default is 60 seconds.
optional uint32 SourceIdMaxLifetimeSec = 11 [default = 1382400]; // 16 days
optional uint32 SourceIdTotalShardsCount = 12 [default = 131072];
@@ -77,9 +77,9 @@ message TPQConfig {
}
optional TQuotingConfig QuotingConfig = 18;
- // Time duration that we wait before we consider remote cluster enabled for load balancing purposes
- optional uint32 RemoteClusterEnabledDelaySec = 24 [default = 300]; // 5 minutes
- optional uint32 CloseClientSessionWithEnabledRemotePreferredClusterDelaySec = 25 [default = 300]; // 5 minutes
+ // Time duration that we wait before we consider remote cluster enabled for load balancing purposes
+ optional uint32 RemoteClusterEnabledDelaySec = 24 [default = 300]; // 5 minutes
+ optional uint32 CloseClientSessionWithEnabledRemotePreferredClusterDelaySec = 25 [default = 300]; // 5 minutes
optional bool RoundRobinPartitionMapping = 26 [default = true];
diff --git a/ydb/public/api/grpc/draft/ydb_persqueue_v1.proto b/ydb/public/api/grpc/draft/ydb_persqueue_v1.proto
index 0cee327235..873ee38b3c 100644
--- a/ydb/public/api/grpc/draft/ydb_persqueue_v1.proto
+++ b/ydb/public/api/grpc/draft/ydb_persqueue_v1.proto
@@ -32,7 +32,7 @@ service PersQueueService {
* <----------------
*/
- rpc StreamingWrite(stream StreamingWriteClientMessage) returns (stream StreamingWriteServerMessage);
+ rpc StreamingWrite(stream StreamingWriteClientMessage) returns (stream StreamingWriteServerMessage);
/**
* Creates Read Session
diff --git a/ydb/public/api/protos/persqueue_error_codes_v1.proto b/ydb/public/api/protos/persqueue_error_codes_v1.proto
index 85fcd4cecf..c6658305c2 100644
--- a/ydb/public/api/protos/persqueue_error_codes_v1.proto
+++ b/ydb/public/api/protos/persqueue_error_codes_v1.proto
@@ -38,6 +38,6 @@ enum ErrorCode {
CLUSTER_DISABLED = 500020;
WRONG_PARTITION_NUMBER = 500021;
- PREFERRED_CLUSTER_MISMATCHED = 500022;
+ PREFERRED_CLUSTER_MISMATCHED = 500022;
ERROR = 500100;
}
diff --git a/ydb/public/api/protos/ydb_persqueue_v1.proto b/ydb/public/api/protos/ydb_persqueue_v1.proto
index 7bb35d73bf..93a7fb6c79 100644
--- a/ydb/public/api/protos/ydb_persqueue_v1.proto
+++ b/ydb/public/api/protos/ydb_persqueue_v1.proto
@@ -5,17 +5,17 @@ import "ydb/public/api/protos/ydb_status_codes.proto";
import "ydb/public/api/protos/ydb_issue_message.proto";
import "ydb/public/api/protos/annotations/validation.proto";
-package Ydb.PersQueue.V1;
+package Ydb.PersQueue.V1;
option java_package = "com.yandex.ydb.persqueue";
-
+
option cc_enable_arenas = true;
-// NOTE:
-// * We use 'ms' suffix instead of google.protobuf.Timestamp and google.protobuf.Duration in order to utilize
-// packed encoding ('message' types can't be packed encoded). In non-repeated fields we use 'ms' for consistency.
-// * Any message with non-empty 'issues' property leads to streaming RPC termination.
-
+// NOTE:
+// * We use 'ms' suffix instead of google.protobuf.Timestamp and google.protobuf.Duration in order to utilize
+// packed encoding ('message' types can't be packed encoded). In non-repeated fields we use 'ms' for consistency.
+// * Any message with non-empty 'issues' property leads to streaming RPC termination.
+
enum Codec {
CODEC_UNSPECIFIED = 0;
CODEC_RAW = 1;
@@ -24,189 +24,189 @@ enum Codec {
CODEC_ZSTD = 4;
}
-message SessionMetaValue {
- map<string, string> value = 1;
+message SessionMetaValue {
+ map<string, string> value = 1;
}
-/**
- * Represents range [start_offset, end_offset).
- */
-message OffsetsRange {
- int64 start_offset = 1;
- int64 end_offset = 2;
+/**
+ * Represents range [start_offset, end_offset).
+ */
+message OffsetsRange {
+ int64 start_offset = 1;
+ int64 end_offset = 2;
}
/**
- * Request for write session. Contains one of:
- * InitRequest - handshake request.
- * WriteRequest - portion of data to be written.
- * UpdateTokenRequest - user credentials if update is needed.
+ * Request for write session. Contains one of:
+ * InitRequest - handshake request.
+ * WriteRequest - portion of data to be written.
+ * UpdateTokenRequest - user credentials if update is needed.
*/
-message StreamingWriteClientMessage {
- oneof client_message {
- InitRequest init_request = 1;
- WriteRequest write_request = 2;
- UpdateTokenRequest update_token_request = 3;
- }
-
-
+message StreamingWriteClientMessage {
+ oneof client_message {
+ InitRequest init_request = 1;
+ WriteRequest write_request = 2;
+ UpdateTokenRequest update_token_request = 3;
+ }
+
+
// Handshake request that must be sent to server first.
- message InitRequest {
+ message InitRequest {
// Path of topic to write to.
- string topic = 1;
- // message group identifier of client data stream a.k.a. sourceId.
- string message_group_id = 2;
- // Some user metadata attached to this write session.
- map<string, string> session_meta = 3;
+ string topic = 1;
+ // message group identifier of client data stream a.k.a. sourceId.
+ string message_group_id = 2;
+ // Some user metadata attached to this write session.
+ map<string, string> session_meta = 3;
// Partition group to write to.
- // Zero means any group.
- int64 partition_group_id = 4;
-
- int64 max_supported_block_format_version = 5;
-
- string session_id = 100;
- // 0 for first init message and incremental value for connect retries. Used for server logging.
- int64 connection_attempt = 101;
- // Opaque blob. Take last one from previous connect.
- bytes connection_meta = 102;
-
- // Optinal preferred cluster name. Sever will close session If preferred cluster is not server cluster and preferred cluster is enabled after delay TPQConfig::CloseClientSessionWithEnabledRemotePreferredClusterDelaySec
- string preferred_cluster = 103;
-
- // Sanity check option. When no writing activity is done in idle_timeout_sec seconds, then session will be destroyed. Zero means infinity.
- int64 idle_timeout_ms = 200;
+ // Zero means any group.
+ int64 partition_group_id = 4;
+
+ int64 max_supported_block_format_version = 5;
+
+ string session_id = 100;
+ // 0 for first init message and incremental value for connect retries. Used for server logging.
+ int64 connection_attempt = 101;
+ // Opaque blob. Take last one from previous connect.
+ bytes connection_meta = 102;
+
+ // Optinal preferred cluster name. Sever will close session If preferred cluster is not server cluster and preferred cluster is enabled after delay TPQConfig::CloseClientSessionWithEnabledRemotePreferredClusterDelaySec
+ string preferred_cluster = 103;
+
+ // Sanity check option. When no writing activity is done in idle_timeout_sec seconds, then session will be destroyed. Zero means infinity.
+ int64 idle_timeout_ms = 200;
}
- // Represents portion of client messages.
- message WriteRequest {
- // Sequence numbers of messages in order that client will provide to server.
- repeated int64 sequence_numbers = 2;
- // Message creation timestamps for client messages.
- // Same size as sequence_numbers.
- repeated int64 created_at_ms = 3;
- // Message creation timestamps for client messages.
- // Same size as sequence_numbers.
- repeated int64 sent_at_ms = 4;
- // Client message sizes.
- // Same size as sequence_numbers.
- repeated int64 message_sizes = 5;
-
- // Block must contain whole client message when it's size is not bigger that max_block_size.
- // If message is bigger than max_block_size - it will be transferred as SIZE/max_block_size blocks. All of
- // this blocks will be with block_count = 0 but not the last one - last one's block_count will be greater than 0;
- // Blocks can be reordered upto max_flush_window_size of uncompressed data.
+ // Represents portion of client messages.
+ message WriteRequest {
+ // Sequence numbers of messages in order that client will provide to server.
+ repeated int64 sequence_numbers = 2;
+ // Message creation timestamps for client messages.
+ // Same size as sequence_numbers.
+ repeated int64 created_at_ms = 3;
+ // Message creation timestamps for client messages.
+ // Same size as sequence_numbers.
+ repeated int64 sent_at_ms = 4;
+ // Client message sizes.
+ // Same size as sequence_numbers.
+ repeated int64 message_sizes = 5;
+
+ // Block must contain whole client message when it's size is not bigger that max_block_size.
+ // If message is bigger than max_block_size - it will be transferred as SIZE/max_block_size blocks. All of
+ // this blocks will be with block_count = 0 but not the last one - last one's block_count will be greater than 0;
+ // Blocks can be reordered upto max_flush_window_size of uncompressed data.
// Each block contains concatenated client messages, compressed by chosen codec.
- // If there is not full client message inside block, then all block contains only this part of message.
- // blocks: A A A B B B BCDE
- // offset: 1 1 1 2 2 2 2
- // part_number: 0 1 2 0 1 2 3
- // count: 0 0 1 0 0 1 4
-
- repeated int64 blocks_offsets = 6;
- repeated int64 blocks_part_numbers = 7;
- // How many complete messages and imcomplete messages end parts (one at most) this block contains
- repeated int64 blocks_message_counts = 8;
- repeated int64 blocks_uncompressed_sizes = 9;
- // In block format version 0 each byte contains only block codec identifier
- repeated bytes blocks_headers = 10;
- repeated bytes blocks_data = 11;
+ // If there is not full client message inside block, then all block contains only this part of message.
+ // blocks: A A A B B B BCDE
+ // offset: 1 1 1 2 2 2 2
+ // part_number: 0 1 2 0 1 2 3
+ // count: 0 0 1 0 0 1 4
+
+ repeated int64 blocks_offsets = 6;
+ repeated int64 blocks_part_numbers = 7;
+ // How many complete messages and imcomplete messages end parts (one at most) this block contains
+ repeated int64 blocks_message_counts = 8;
+ repeated int64 blocks_uncompressed_sizes = 9;
+ // In block format version 0 each byte contains only block codec identifier
+ repeated bytes blocks_headers = 10;
+ repeated bytes blocks_data = 11;
}
- // In-session reauthentication and reauthorization, lets user increase session lifetime. You should wait for 'update_token_response' before sending next 'update_token_request'.
- message UpdateTokenRequest {
- string token = 1;
- }
+ // In-session reauthentication and reauthorization, lets user increase session lifetime. You should wait for 'update_token_response' before sending next 'update_token_request'.
+ message UpdateTokenRequest {
+ string token = 1;
+ }
}
/**
- * Response for write session. Contains one of:
- * InitResponse - correct handshake response.
- * BatchWriteResponse - acknowledgment of storing client messages.
- * UpdateTokenResponse - acknowledgment of reauthentication and reauthorization.
+ * Response for write session. Contains one of:
+ * InitResponse - correct handshake response.
+ * BatchWriteResponse - acknowledgment of storing client messages.
+ * UpdateTokenResponse - acknowledgment of reauthentication and reauthorization.
*/
-message StreamingWriteServerMessage {
- oneof server_message {
- InitResponse init_response = 3;
- BatchWriteResponse batch_write_response = 4;
- UpdateTokenResponse update_token_response = 5;
- }
-
- // Server status of response.
- Ydb.StatusIds.StatusCode status = 1;
-
- // Issues if any.
- repeated Ydb.Issue.IssueMessage issues = 2;
-
+message StreamingWriteServerMessage {
+ oneof server_message {
+ InitResponse init_response = 3;
+ BatchWriteResponse batch_write_response = 4;
+ UpdateTokenResponse update_token_response = 5;
+ }
+
+ // Server status of response.
+ Ydb.StatusIds.StatusCode status = 1;
+
+ // Issues if any.
+ repeated Ydb.Issue.IssueMessage issues = 2;
+
// Response for handshake.
- message InitResponse {
- // Last persisted message's sequence number for this message group.
- int64 last_sequence_number = 1;
+ message InitResponse {
+ // Last persisted message's sequence number for this message group.
+ int64 last_sequence_number = 1;
// Unique identifier of write session. Used for debug purposes.
string session_id = 2;
// Path of topic that matched for this write session. Used for debug purposes, will be the same as in Init request from client.
- string topic = 3;
+ string topic = 3;
// Write session is established to this cluster. Client data will be in instance of topic in this cluster.
string cluster = 4;
- // Identifier of partition that is matched for this write session.
- int64 partition_id = 5;
+ // Identifier of partition that is matched for this write session.
+ int64 partition_id = 5;
- // Block (see StreamingWriteClientMessage.WriteRequest.blocks_data) format version supported by server or configured for a topic. Client must write data only with them.
- int64 block_format_version = 6;
+ // Block (see StreamingWriteClientMessage.WriteRequest.blocks_data) format version supported by server or configured for a topic. Client must write data only with them.
+ int64 block_format_version = 6;
// Client can only use compression codecs from this set to write messages to topic, session will be closed with BAD_REQUEST otherwise.
repeated Codec supported_codecs = 10;
-
- // Maximal flush window size choosed by server. Size of uncompressed data not sended to server must not be bigger than flush window size.
- // In other words, this is maximal size of gap inside uncompressed data, which is not sended to server yet.
- int64 max_flush_window_size = 7; // will be 2048kb
- // How big blocks per stream could be(in uncompressed size). When block contains more than max_block_size of uncompressed data - block must be truncated.
- int64 max_block_size = 8; // will be 512kb
-
- // Opaque blob, used for fast reconnects.
- bytes connection_meta = 9;
+
+ // Maximal flush window size choosed by server. Size of uncompressed data not sended to server must not be bigger than flush window size.
+ // In other words, this is maximal size of gap inside uncompressed data, which is not sended to server yet.
+ int64 max_flush_window_size = 7; // will be 2048kb
+ // How big blocks per stream could be(in uncompressed size). When block contains more than max_block_size of uncompressed data - block must be truncated.
+ int64 max_block_size = 8; // will be 512kb
+
+ // Opaque blob, used for fast reconnects.
+ bytes connection_meta = 9;
+ }
+
+ // Message that represents acknowledgment for sequence of client messages. This sequence is persisted together so write statistics is for messages batch.
+ message BatchWriteResponse {
+ // Sequence numbers of persisted client messages.
+ repeated int64 sequence_numbers = 1;
+ // Assigned partition offsets.
+ // Zero for skipped messages.
+ repeated int64 offsets = 2;
+ // Per message flag. False if message is written for the first time and True otherwise.
+ repeated bool already_written = 3;
+
+ // Assigned partition for all client messages inside this batch.
+ int64 partition_id = 4;
+
+ // Write statistics for this sequence of client messages.
+ WriteStatistics write_statistics = 5;
+ }
+
+ message UpdateTokenResponse {
}
- // Message that represents acknowledgment for sequence of client messages. This sequence is persisted together so write statistics is for messages batch.
- message BatchWriteResponse {
- // Sequence numbers of persisted client messages.
- repeated int64 sequence_numbers = 1;
- // Assigned partition offsets.
- // Zero for skipped messages.
- repeated int64 offsets = 2;
- // Per message flag. False if message is written for the first time and True otherwise.
- repeated bool already_written = 3;
-
- // Assigned partition for all client messages inside this batch.
- int64 partition_id = 4;
-
- // Write statistics for this sequence of client messages.
- WriteStatistics write_statistics = 5;
- }
-
- message UpdateTokenResponse {
- }
-
// Message with write statistics.
- message WriteStatistics {
+ message WriteStatistics {
// Time spent in persisting of data.
- int64 persist_duration_ms = 1;
+ int64 persist_duration_ms = 1;
// Time spent in queue before persisting.
- int64 queued_in_partition_duration_ms = 2;
+ int64 queued_in_partition_duration_ms = 2;
// Time spent awaiting for partition write quota.
- int64 throttled_on_partition_duration_ms = 3;
- // Time spent awaiting for topic write quota.
- int64 throttled_on_topic_duration_ms = 4;
+ int64 throttled_on_partition_duration_ms = 3;
+ // Time spent awaiting for topic write quota.
+ int64 throttled_on_topic_duration_ms = 4;
}
-}
+}
-message Path {
- // Path of object (topic/consumer).
- string path = 1;
-}
+message Path {
+ // Path of object (topic/consumer).
+ string path = 1;
+}
-message KeyValue {
- string key = 1;
- string value = 2;
+message KeyValue {
+ string key = 1;
+ string value = 2;
}
/**
@@ -575,9 +575,9 @@ message StreamingReadServerMessageNew {
// How many complete messages and imcomplete messages end parts (one at most) this block contains
repeated int64 blocks_message_counts = 15;
repeated int64 blocks_uncompressed_sizes = 16;
- // In block format version 0 each byte contains only block codec identifier
- repeated bytes blocks_headers = 17;
- repeated bytes blocks_data = 18;
+ // In block format version 0 each byte contains only block codec identifier
+ repeated bytes blocks_headers = 17;
+ repeated bytes blocks_data = 18;
// Zero if this is not first portion of data after resume or provided by client cookie otherwise.
int64 resume_cookie = 50;
diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/sdk_test_setup.h b/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/sdk_test_setup.h
index a2ab6f477c..677eb8c03d 100644
--- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/sdk_test_setup.h
+++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/sdk_test_setup.h
@@ -1,14 +1,14 @@
-#pragma once
-#include "test_server.h"
+#pragma once
+#include "test_server.h"
#include <ydb/library/persqueue/topic_parser_public/topic_parser.h>
#include <library/cpp/logger/log.h>
#include <util/system/tempfile.h>
-
-#define TEST_CASE_NAME (this->Name_)
-
-namespace NPersQueue {
-class SDKTestSetup {
+#define TEST_CASE_NAME (this->Name_)
+
+namespace NPersQueue {
+
+class SDKTestSetup {
protected:
TString TestCaseName;
@@ -19,10 +19,10 @@ protected:
TLog Log = TLog("cerr");
-public:
+public:
SDKTestSetup(const TString& testCaseName, bool start = true)
: TestCaseName(testCaseName)
- {
+ {
InitOptions();
if (start) {
Start();
@@ -31,14 +31,14 @@ public:
void InitOptions() {
Log.SetFormatter([testCaseName = TestCaseName](ELogPriority priority, TStringBuf message) {
- return TStringBuilder() << TInstant::Now() << " :" << testCaseName << " " << priority << ": " << message << Endl;
- });
- Server.GrpcServerOptions.SetGRpcShutdownDeadline(TDuration::Max());
- // Default TTestServer value for 'MaxReadCookies' is 10. With this value the tests are flapping with two errors:
- // 1. 'got more than 10 unordered cookies to commit 12'
- // 2. 'got more than 10 uncommitted reads'
- Server.ServerSettings.PQConfig.Clear();
- Server.ServerSettings.PQConfig.SetEnabled(true);
+ return TStringBuilder() << TInstant::Now() << " :" << testCaseName << " " << priority << ": " << message << Endl;
+ });
+ Server.GrpcServerOptions.SetGRpcShutdownDeadline(TDuration::Max());
+ // Default TTestServer value for 'MaxReadCookies' is 10. With this value the tests are flapping with two errors:
+ // 1. 'got more than 10 unordered cookies to commit 12'
+ // 2. 'got more than 10 uncommitted reads'
+ Server.ServerSettings.PQConfig.Clear();
+ Server.ServerSettings.PQConfig.SetEnabled(true);
Server.ServerSettings.PQConfig.SetRemoteClusterEnabledDelaySec(1);
Server.ServerSettings.PQConfig.SetCloseClientSessionWithEnabledRemotePreferredClusterDelaySec(1);
Server.ServerSettings.PQClusterDiscoveryConfig.SetEnabled(true);
@@ -53,7 +53,7 @@ public:
void Start(bool waitInit = true, bool addBrokenDatacenter = false) {
Server.StartServer(false);
//Server.EnableLogs({NKikimrServices::PQ_WRITE_PROXY, NKikimrServices::PQ_READ_PROXY});
- Server.AnnoyingClient->InitRoot();
+ Server.AnnoyingClient->InitRoot();
if (DataCenters.empty()) {
THashMap<TString, NKikimr::NPersQueueTests::TPQTestClusterInfo> dataCenters;
dataCenters.emplace("dc1", NKikimr::NPersQueueTests::TPQTestClusterInfo{TStringBuilder() << "localhost:" << Server.GrpcPort, true});
@@ -64,33 +64,33 @@ public:
} else {
Server.AnnoyingClient->InitDCs(DataCenters, LocalDC);
}
- Server.AnnoyingClient->InitSourceIds();
+ Server.AnnoyingClient->InitSourceIds();
CreateTopic(GetTestTopic(), GetLocalCluster());
if (waitInit) {
Server.WaitInit(GetTestTopic());
}
- }
-
+ }
+
TString GetTestTopic() const {
- return "topic1";
- }
-
+ return "topic1";
+ }
+
TString GetTestClient() const {
- return "test-reader";
- }
-
+ return "test-reader";
+ }
+
TString GetTestMessageGroupId() const {
- return "test-message-group-id";
- }
-
+ return "test-message-group-id";
+ }
+
TString GetLocalCluster() const {
return LocalDC;
- }
-
+ }
+
ui16 GetGrpcPort() const {
return Server.GrpcPort;
}
-
+
NGrpc::TServerOptions& GetGrpcServerOptions() {
return Server.GrpcServerOptions;
}
@@ -102,11 +102,11 @@ public:
Server.ServerSettings.NetClassifierConfig.SetNetDataFilePath(NetDataFile->Name());
}
-
- TLog& GetLog() {
- return Log;
- }
-
+
+ TLog& GetLog() {
+ return Log;
+ }
+
template <class TConsumerOrProducer>
void Start(const THolder<TConsumerOrProducer>& obj) {
auto startFuture = obj->Start();
@@ -198,5 +198,5 @@ public:
UNIT_ASSERT_C(describeResult->Record.GetPathDescription().HasPersQueueGroup(), describeResult->Record);
Server.AnnoyingClient->KillTablet(*Server.CleverServer, describeResult->Record.GetPathDescription().GetPersQueueGroup().GetBalancerTabletID());
}
-};
-}
+};
+}
diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/test_server.h b/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/test_server.h
index 9e43a7c42e..250ce03620 100644
--- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/test_server.h
+++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/test_server.h
@@ -69,7 +69,7 @@ public:
void EnableLogs(const TVector<NKikimrServices::EServiceKikimr> services,
NActors::NLog::EPriority prio = NActors::NLog::PRI_DEBUG) {
- Y_VERIFY(CleverServer != nullptr, "Start server before enabling logs");
+ Y_VERIFY(CleverServer != nullptr, "Start server before enabling logs");
for (auto s : services) {
CleverServer->GetRuntime()->SetLogPriority(s, prio);
}
diff --git a/ydb/services/persqueue_v1/grpc_pq_actor.h b/ydb/services/persqueue_v1/grpc_pq_actor.h
index 1fc46216bb..65c8aca1db 100644
--- a/ydb/services/persqueue_v1/grpc_pq_actor.h
+++ b/ydb/services/persqueue_v1/grpc_pq_actor.h
@@ -52,14 +52,14 @@ const TString& TopicPrefix(const TActorContext& ctx);
static const TDuration CHECK_ACL_DELAY = TDuration::Minutes(5);
-// Codec ID size in bytes
-constexpr ui32 CODEC_ID_SIZE = 1;
-
+// Codec ID size in bytes
+constexpr ui32 CODEC_ID_SIZE = 1;
-template<typename TItem0, typename... TItems>
-bool AllEqual(const TItem0& item0, const TItems&... items) {
- return ((items == item0) && ... && true);
-}
+
+template<typename TItem0, typename... TItems>
+bool AllEqual(const TItem0& item0, const TItems&... items) {
+ return ((items == item0) && ... && true);
+}
static inline bool InternalErrorCode(PersQueue::ErrorCode::ErrorCode errorCode) {
switch(errorCode) {
@@ -98,7 +98,7 @@ struct TCommitCookie {
struct TEvPQProxy {
enum EEv {
- EvWriteInit = EventSpaceBegin(TKikimrEvents::ES_PQ_PROXY_NEW), // TODO: Replace 'NEW' with version or something
+ EvWriteInit = EventSpaceBegin(TKikimrEvents::ES_PQ_PROXY_NEW), // TODO: Replace 'NEW' with version or something
EvWrite,
EvDone,
EvReadInit,
@@ -123,7 +123,7 @@ struct TEvPQProxy {
EvUpdateClusters,
EvQueryCompiled,
EvSessionDead,
- EvSessionSetPreferredCluster,
+ EvSessionSetPreferredCluster,
EvScheduleUpdateClusters,
EvDeadlineExceeded,
EvGetStatus,
@@ -148,14 +148,14 @@ struct TEvPQProxy {
TTopicTabletsPairs TopicAndTablets;
};
- struct TEvSessionSetPreferredCluster : public NActors::TEventLocal<TEvSessionSetPreferredCluster, EvSessionSetPreferredCluster> {
- TEvSessionSetPreferredCluster(const ui64 cookie, const TString& preferredCluster)
- : Cookie(cookie)
- , PreferredCluster(preferredCluster)
- {}
- const ui64 Cookie;
- const TString PreferredCluster;
- };
+ struct TEvSessionSetPreferredCluster : public NActors::TEventLocal<TEvSessionSetPreferredCluster, EvSessionSetPreferredCluster> {
+ TEvSessionSetPreferredCluster(const ui64 cookie, const TString& preferredCluster)
+ : Cookie(cookie)
+ , PreferredCluster(preferredCluster)
+ {}
+ const ui64 Cookie;
+ const TString PreferredCluster;
+ };
struct TEvSessionDead : public NActors::TEventLocal<TEvSessionDead, EvSessionDead> {
TEvSessionDead(const ui64 cookie)
@@ -196,21 +196,21 @@ struct TEvPQProxy {
struct TEvWriteInit : public NActors::TEventLocal<TEvWriteInit, EvWriteInit> {
- TEvWriteInit(PersQueue::V1::StreamingWriteClientMessage&& req, const TString& peerName)
+ TEvWriteInit(PersQueue::V1::StreamingWriteClientMessage&& req, const TString& peerName)
: Request(std::move(req))
, PeerName(peerName)
{ }
- PersQueue::V1::StreamingWriteClientMessage Request;
+ PersQueue::V1::StreamingWriteClientMessage Request;
TString PeerName;
};
struct TEvWrite : public NActors::TEventLocal<TEvWrite, EvWrite> {
- explicit TEvWrite(PersQueue::V1::StreamingWriteClientMessage&& req)
+ explicit TEvWrite(PersQueue::V1::StreamingWriteClientMessage&& req)
: Request(std::move(req))
{ }
- PersQueue::V1::StreamingWriteClientMessage Request;
+ PersQueue::V1::StreamingWriteClientMessage Request;
};
struct TEvDone : public NActors::TEventLocal<TEvDone, EvDone> {
@@ -244,14 +244,14 @@ struct TEvPQProxy {
ui64 ReadTimestampMs;
};
- struct TEvUpdateToken : public NActors::TEventLocal<TEvUpdateToken, EvUpdateToken> {
- explicit TEvUpdateToken(PersQueue::V1::StreamingWriteClientMessage&& req)
- : Request(std::move(req))
- { }
-
- PersQueue::V1::StreamingWriteClientMessage Request;
- };
-
+ struct TEvUpdateToken : public NActors::TEventLocal<TEvUpdateToken, EvUpdateToken> {
+ explicit TEvUpdateToken(PersQueue::V1::StreamingWriteClientMessage&& req)
+ : Request(std::move(req))
+ { }
+
+ PersQueue::V1::StreamingWriteClientMessage Request;
+ };
+
struct TEvCloseSession : public NActors::TEventLocal<TEvCloseSession, EvCloseSession> {
TEvCloseSession(const TString& reason, const PersQueue::ErrorCode::ErrorCode errorCode)
: Reason(reason)
@@ -453,7 +453,7 @@ struct TEvPQProxy {
/// WRITE ACTOR
class TWriteSessionActor : public NActors::TActorBootstrapped<TWriteSessionActor> {
-using IContext = NGRpcServer::IGRpcStreamingContext<PersQueue::V1::StreamingWriteClientMessage, PersQueue::V1::StreamingWriteServerMessage>;
+using IContext = NGRpcServer::IGRpcStreamingContext<PersQueue::V1::StreamingWriteClientMessage, PersQueue::V1::StreamingWriteServerMessage>;
using TEvDescribeTopicsResponse = NMsgBusProxy::NPqMetaCacheV2::TEvPqNewMetaCache::TEvDescribeTopicsResponse;
using TEvDescribeTopicsRequest = NMsgBusProxy::NPqMetaCacheV2::TEvPqNewMetaCache::TEvDescribeTopicsRequest;
@@ -515,11 +515,11 @@ private:
void Handle(NKqp::TEvKqp::TEvProcessResponse::TPtr &ev, const TActorContext &ctx);
void CheckACL(const TActorContext& ctx);
- // Requests fresh ACL from 'SchemeCache'
+ // Requests fresh ACL from 'SchemeCache'
void InitCheckSchema(const TActorContext& ctx, bool needWaitSchema = false);
void Handle(TEvPQProxy::TEvWriteInit::TPtr& ev, const NActors::TActorContext& ctx);
void Handle(TEvPQProxy::TEvWrite::TPtr& ev, const NActors::TActorContext& ctx);
- void Handle(TEvPQProxy::TEvUpdateToken::TPtr& ev, const NActors::TActorContext& ctx);
+ void Handle(TEvPQProxy::TEvUpdateToken::TPtr& ev, const NActors::TActorContext& ctx);
void Handle(TEvPQProxy::TEvDone::TPtr& ev, const NActors::TActorContext& ctx);
void Handle(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev, const TActorContext& ctx);
void Handle(TEvDescribeTopicsResponse::TPtr& ev, const TActorContext& ctx);
@@ -581,8 +581,8 @@ private:
NPersQueue::TConverterPtr TopicConverter;
ui32 Partition;
ui32 PreferedPartition;
- // 'SourceId' is called 'MessageGroupId' since gRPC data plane API v1
- TString SourceId; // TODO: Replace with 'MessageGroupId' everywhere
+ // 'SourceId' is called 'MessageGroupId' since gRPC data plane API v1
+ TString SourceId; // TODO: Replace with 'MessageGroupId' everywhere
TString EscapedSourceId;
ui32 Hash = 0;
@@ -592,7 +592,7 @@ private:
ui32 NumReserveBytesRequests;
THolder<TAclWrapper> ACL;
-
+
struct TWriteRequestBatchInfo: public TSimpleRefCount<TWriteRequestBatchInfo> {
using TPtr = TIntrusivePtr<TWriteRequestBatchInfo>;
@@ -615,7 +615,7 @@ private:
// Requests that is already sent to partition actor
std::deque<TWriteRequestBatchInfo::TPtr> SentMessages;
-
+
bool WritesDone;
THashMap<ui32, ui64> PartitionToTablet;
@@ -639,9 +639,9 @@ private:
TIntrusivePtr<NACLib::TUserToken> Token;
TString Auth;
- // Got 'update_token_request', authentication or authorization in progress or 'update_token_response' is not sent yet. Only single 'update_token_request' is allowed inflight
- bool UpdateTokenInProgress;
- bool UpdateTokenAuthenticated;
+ // Got 'update_token_request', authentication or authorization in progress or 'update_token_response' is not sent yet. Only single 'update_token_request' is allowed inflight
+ bool UpdateTokenInProgress;
+ bool UpdateTokenAuthenticated;
bool ACLCheckInProgress;
bool FirstACLCheck;
bool RequestNotChecked;
@@ -651,8 +651,8 @@ private:
ui64 BalancerTabletId;
TActorId PipeToBalancer;
- // PQ tablet configuration that we get at the time of session initialization
- NKikimrPQ::TPQTabletConfig InitialPQTabletConfig;
+ // PQ tablet configuration that we get at the time of session initialization
+ NKikimrPQ::TPQTabletConfig InitialPQTabletConfig;
NKikimrPQClient::TDataChunk InitMeta;
TString LocalDC;
diff --git a/ydb/services/persqueue_v1/grpc_pq_codecs.cpp b/ydb/services/persqueue_v1/grpc_pq_codecs.cpp
index 5415fc1576..d9cbfeb411 100644
--- a/ydb/services/persqueue_v1/grpc_pq_codecs.cpp
+++ b/ydb/services/persqueue_v1/grpc_pq_codecs.cpp
@@ -1,30 +1,30 @@
-#include "grpc_pq_codecs.h"
+#include "grpc_pq_codecs.h"
#include <ydb/core/protos/grpc_pq_old.pb.h>
#include <ydb/library/persqueue/topic_parser/topic_parser.h>
#include <util/generic/algorithm.h>
-#include <util/generic/fwd.h>
-#include <util/string/builder.h>
-
-namespace NKikimr::NGRpcProxy {
- bool ValidateWriteWithCodec(const NKikimrPQ::TPQTabletConfig& pqTabletConfig, const ui32 codecID, TString& error) {
- error.clear();
-
- if (pqTabletConfig.has_codecs() /* empty codecs that any codec is allowed for migration purposes */) {
- const auto& ids = pqTabletConfig.codecs().ids();
- if (!ids.empty() && Find(ids, codecID) == ids.end()) {
- const auto& names = pqTabletConfig.codecs().codecs();
- Y_VERIFY(ids.size() == names.size(), "PQ tabled supported codecs configuration is invalid");
- TStringBuilder errorBuilder;
- errorBuilder << "given codec (id " << static_cast<i32>(codecID) << ") is not configured for the topic. Configured codecs are " << names[0] << " (id " << ids[0] << ")";
- for (i32 i = 1; i != ids.size(); ++i) {
- errorBuilder << ", " << names[i] << " (id " << ids[i] << ")";
- }
- error = errorBuilder;
- return false;
- }
- }
-
- return true;
- }
-}
+#include <util/generic/fwd.h>
+#include <util/string/builder.h>
+
+namespace NKikimr::NGRpcProxy {
+ bool ValidateWriteWithCodec(const NKikimrPQ::TPQTabletConfig& pqTabletConfig, const ui32 codecID, TString& error) {
+ error.clear();
+
+ if (pqTabletConfig.has_codecs() /* empty codecs that any codec is allowed for migration purposes */) {
+ const auto& ids = pqTabletConfig.codecs().ids();
+ if (!ids.empty() && Find(ids, codecID) == ids.end()) {
+ const auto& names = pqTabletConfig.codecs().codecs();
+ Y_VERIFY(ids.size() == names.size(), "PQ tabled supported codecs configuration is invalid");
+ TStringBuilder errorBuilder;
+ errorBuilder << "given codec (id " << static_cast<i32>(codecID) << ") is not configured for the topic. Configured codecs are " << names[0] << " (id " << ids[0] << ")";
+ for (i32 i = 1; i != ids.size(); ++i) {
+ errorBuilder << ", " << names[i] << " (id " << ids[i] << ")";
+ }
+ error = errorBuilder;
+ return false;
+ }
+ }
+
+ return true;
+ }
+}
diff --git a/ydb/services/persqueue_v1/grpc_pq_codecs.h b/ydb/services/persqueue_v1/grpc_pq_codecs.h
index c3d2cb3734..005aeaa2d8 100644
--- a/ydb/services/persqueue_v1/grpc_pq_codecs.h
+++ b/ydb/services/persqueue_v1/grpc_pq_codecs.h
@@ -1,8 +1,8 @@
-#pragma once
+#pragma once
#include <ydb/core/protos/flat_scheme_op.pb.h>
-#include <util/generic/fwd.h>
-
-namespace NKikimr::NGRpcProxy {
- // Validates that client can safely write to the topic data compressed with specific codec
- bool ValidateWriteWithCodec(const NKikimrPQ::TPQTabletConfig& pqTabletConfig, const ui32 codecID, TString& error);
-}
+#include <util/generic/fwd.h>
+
+namespace NKikimr::NGRpcProxy {
+ // Validates that client can safely write to the topic data compressed with specific codec
+ bool ValidateWriteWithCodec(const NKikimrPQ::TPQTabletConfig& pqTabletConfig, const ui32 codecID, TString& error);
+}
diff --git a/ydb/services/persqueue_v1/grpc_pq_read.cpp b/ydb/services/persqueue_v1/grpc_pq_read.cpp
index bb6137d99c..6b7e72fdd0 100644
--- a/ydb/services/persqueue_v1/grpc_pq_read.cpp
+++ b/ydb/services/persqueue_v1/grpc_pq_read.cpp
@@ -17,7 +17,7 @@ namespace V1 {
///////////////////////////////////////////////////////////////////////////////
-using namespace PersQueue::V1;
+using namespace PersQueue::V1;
diff --git a/ydb/services/persqueue_v1/grpc_pq_read_actor.cpp b/ydb/services/persqueue_v1/grpc_pq_read_actor.cpp
index 384d631809..654edfcfcd 100644
--- a/ydb/services/persqueue_v1/grpc_pq_read_actor.cpp
+++ b/ydb/services/persqueue_v1/grpc_pq_read_actor.cpp
@@ -32,7 +32,7 @@ using namespace NMsgBusProxy;
namespace NGRpcProxy {
namespace V1 {
-using namespace PersQueue::V1;
+using namespace PersQueue::V1;
#define PQ_LOG_PREFIX "session cookie " << Cookie << " consumer " << ClientPath << " session " << Session
@@ -2982,8 +2982,8 @@ Ydb::StatusIds::StatusCode ConvertPersQueueInternalCodeToStatus(const PersQueue:
case INITIALIZING:
case CLUSTER_DISABLED:
return Ydb::StatusIds::UNAVAILABLE;
- case PREFERRED_CLUSTER_MISMATCHED:
- return Ydb::StatusIds::ABORTED;
+ case PREFERRED_CLUSTER_MISMATCHED:
+ return Ydb::StatusIds::ABORTED;
case OVERLOAD:
return Ydb::StatusIds::OVERLOADED;
case BAD_REQUEST:
diff --git a/ydb/services/persqueue_v1/grpc_pq_write.cpp b/ydb/services/persqueue_v1/grpc_pq_write.cpp
index 816cc63579..5f17a2f3ab 100644
--- a/ydb/services/persqueue_v1/grpc_pq_write.cpp
+++ b/ydb/services/persqueue_v1/grpc_pq_write.cpp
@@ -14,7 +14,7 @@ namespace NKikimr {
namespace NGRpcProxy {
namespace V1 {
-using namespace PersQueue::V1;
+using namespace PersQueue::V1;
///////////////////////////////////////////////////////////////////////////////
@@ -65,7 +65,7 @@ void TPQWriteService::Handle(NNetClassifier::TEvNetClassifier::TEvClassifierUpda
}
-void TPQWriteService::Handle(NPQ::NClusterTracker::TEvClusterTracker::TEvClustersUpdate::TPtr& ev, const TActorContext& ctx) {
+void TPQWriteService::Handle(NPQ::NClusterTracker::TEvClusterTracker::TEvClustersUpdate::TPtr& ev, const TActorContext& ctx) {
Y_VERIFY(ev->Get()->ClustersList);
Y_VERIFY(ev->Get()->ClustersList->Clusters.size());
@@ -74,92 +74,92 @@ void TPQWriteService::Handle(NPQ::NClusterTracker::TEvClusterTracker::TEvCluster
LocalCluster = "";
Enabled = false;
- // Rebalance load on installation clusters: if preferred cluster is enabled and session is alive long enough close it so client can recreate it in preferred cluster
- auto remoteClusterEnabledDelay = TDuration::Seconds(AppData(ctx)->PQConfig.GetRemoteClusterEnabledDelaySec());
- auto closeClientSessionWithEnabledRemotePreferredClusterDelay = TDuration::Seconds(AppData(ctx)->PQConfig.GetCloseClientSessionWithEnabledRemotePreferredClusterDelaySec());
- const auto clustersListUpdatedAt = ev->Get()->ClustersListUpdateTimestamp ? *ev->Get()->ClustersListUpdateTimestamp : TInstant::Now();
- THashSet<TString> remoteClusters;
- THashSet<TString> rebalanceClusters;
- for (const auto& cluster : clusters) {
- if (cluster.IsLocal) {
- LocalCluster = cluster.Name;
- Enabled = cluster.IsEnabled;
- continue;
- }
-
- remoteClusters.emplace(cluster.Name);
-
- if (!cluster.IsEnabled) {
- ClustersEnabledAt.erase(cluster.Name);
- continue;
- }
-
- if (!ClustersEnabledAt.contains(cluster.Name)) {
- ClustersEnabledAt[cluster.Name] = clustersListUpdatedAt;
- }
-
- const bool readyToCreateSessions = ClustersEnabledAt[cluster.Name] <= (TInstant::Now() - remoteClusterEnabledDelay);
- if (readyToCreateSessions) {
- rebalanceClusters.emplace(cluster.Name);
- }
- }
-
- if (!Enabled) {
- for (auto it = Sessions.begin(); it != Sessions.end(); ++it) {
- Send(it->second, new TEvPQProxy::TEvDieCommand("cluster disabled", PersQueue::ErrorCode::CLUSTER_DISABLED));
- }
- return;
- }
-
- for (const auto& sessionsByPreferredCluster : SessionsByRemotePreferredCluster) {
- const auto& cluster = sessionsByPreferredCluster.first;
- if (rebalanceClusters.contains(cluster) || !remoteClusters.contains(cluster)) {
- const TString closeReason = TStringBuilder() << "Session preferred cluster " << cluster.Quote()
- << (remoteClusters.contains(cluster) ? " is enabled for at least " + ToString(closeClientSessionWithEnabledRemotePreferredClusterDelay) : " is unknown")
- << " and session is older than " << closeClientSessionWithEnabledRemotePreferredClusterDelay;
-
- const auto closeUpToCreatedAt = TInstant::Now() - closeClientSessionWithEnabledRemotePreferredClusterDelay;
-
- for (const auto& session : sessionsByPreferredCluster.second) {
- const auto& createdAt = session.second;
- if (createdAt <= closeUpToCreatedAt) {
- const auto& workerID = Sessions[session.first];
- Send(workerID, new TEvPQProxy::TEvDieCommand(closeReason, PersQueue::ErrorCode::PREFERRED_CLUSTER_MISMATCHED));
- }
+ // Rebalance load on installation clusters: if preferred cluster is enabled and session is alive long enough close it so client can recreate it in preferred cluster
+ auto remoteClusterEnabledDelay = TDuration::Seconds(AppData(ctx)->PQConfig.GetRemoteClusterEnabledDelaySec());
+ auto closeClientSessionWithEnabledRemotePreferredClusterDelay = TDuration::Seconds(AppData(ctx)->PQConfig.GetCloseClientSessionWithEnabledRemotePreferredClusterDelaySec());
+ const auto clustersListUpdatedAt = ev->Get()->ClustersListUpdateTimestamp ? *ev->Get()->ClustersListUpdateTimestamp : TInstant::Now();
+ THashSet<TString> remoteClusters;
+ THashSet<TString> rebalanceClusters;
+ for (const auto& cluster : clusters) {
+ if (cluster.IsLocal) {
+ LocalCluster = cluster.Name;
+ Enabled = cluster.IsEnabled;
+ continue;
+ }
+
+ remoteClusters.emplace(cluster.Name);
+
+ if (!cluster.IsEnabled) {
+ ClustersEnabledAt.erase(cluster.Name);
+ continue;
+ }
+
+ if (!ClustersEnabledAt.contains(cluster.Name)) {
+ ClustersEnabledAt[cluster.Name] = clustersListUpdatedAt;
+ }
+
+ const bool readyToCreateSessions = ClustersEnabledAt[cluster.Name] <= (TInstant::Now() - remoteClusterEnabledDelay);
+ if (readyToCreateSessions) {
+ rebalanceClusters.emplace(cluster.Name);
+ }
+ }
+
+ if (!Enabled) {
+ for (auto it = Sessions.begin(); it != Sessions.end(); ++it) {
+ Send(it->second, new TEvPQProxy::TEvDieCommand("cluster disabled", PersQueue::ErrorCode::CLUSTER_DISABLED));
+ }
+ return;
+ }
+
+ for (const auto& sessionsByPreferredCluster : SessionsByRemotePreferredCluster) {
+ const auto& cluster = sessionsByPreferredCluster.first;
+ if (rebalanceClusters.contains(cluster) || !remoteClusters.contains(cluster)) {
+ const TString closeReason = TStringBuilder() << "Session preferred cluster " << cluster.Quote()
+ << (remoteClusters.contains(cluster) ? " is enabled for at least " + ToString(closeClientSessionWithEnabledRemotePreferredClusterDelay) : " is unknown")
+ << " and session is older than " << closeClientSessionWithEnabledRemotePreferredClusterDelay;
+
+ const auto closeUpToCreatedAt = TInstant::Now() - closeClientSessionWithEnabledRemotePreferredClusterDelay;
+
+ for (const auto& session : sessionsByPreferredCluster.second) {
+ const auto& createdAt = session.second;
+ if (createdAt <= closeUpToCreatedAt) {
+ const auto& workerID = Sessions[session.first];
+ Send(workerID, new TEvPQProxy::TEvDieCommand(closeReason, PersQueue::ErrorCode::PREFERRED_CLUSTER_MISMATCHED));
+ }
}
}
}
}
-void TPQWriteService::Handle(TEvPQProxy::TEvSessionSetPreferredCluster::TPtr& ev, const TActorContext& ctx) {
- const auto& cookie = ev->Get()->Cookie;
- const auto& preferredCluster = ev->Get()->PreferredCluster;
- if (!Sessions.contains(cookie)) {
- LOG_ERROR_S(ctx, NKikimrServices::PQ_WRITE_PROXY, TStringBuilder() << "Got TEvSessionSetPreferredCluster message from session with cookie " << cookie << " that is not in session collection");
- return;
- }
- if (!preferredCluster.empty() && *LocalCluster != preferredCluster) {
- SessionsByRemotePreferredCluster[preferredCluster][cookie] = TInstant::Now();
- RemotePreferredClusterBySessionCookie[cookie] = std::move(preferredCluster);
- }
-}
+void TPQWriteService::Handle(TEvPQProxy::TEvSessionSetPreferredCluster::TPtr& ev, const TActorContext& ctx) {
+ const auto& cookie = ev->Get()->Cookie;
+ const auto& preferredCluster = ev->Get()->PreferredCluster;
+ if (!Sessions.contains(cookie)) {
+ LOG_ERROR_S(ctx, NKikimrServices::PQ_WRITE_PROXY, TStringBuilder() << "Got TEvSessionSetPreferredCluster message from session with cookie " << cookie << " that is not in session collection");
+ return;
+ }
+ if (!preferredCluster.empty() && *LocalCluster != preferredCluster) {
+ SessionsByRemotePreferredCluster[preferredCluster][cookie] = TInstant::Now();
+ RemotePreferredClusterBySessionCookie[cookie] = std::move(preferredCluster);
+ }
+}
void TPQWriteService::Handle(TEvPQProxy::TEvSessionDead::TPtr& ev, const TActorContext&) {
- const auto& cookie = ev->Get()->Cookie;
- Sessions.erase(cookie);
- if (RemotePreferredClusterBySessionCookie.contains(cookie)) {
- const auto& preferredCluster = RemotePreferredClusterBySessionCookie[cookie];
- SessionsByRemotePreferredCluster[preferredCluster].erase(cookie);
- if (SessionsByRemotePreferredCluster[preferredCluster].empty()) {
- SessionsByRemotePreferredCluster.erase(preferredCluster);
- }
- RemotePreferredClusterBySessionCookie.erase(cookie);
- }
+ const auto& cookie = ev->Get()->Cookie;
+ Sessions.erase(cookie);
+ if (RemotePreferredClusterBySessionCookie.contains(cookie)) {
+ const auto& preferredCluster = RemotePreferredClusterBySessionCookie[cookie];
+ SessionsByRemotePreferredCluster[preferredCluster].erase(cookie);
+ if (SessionsByRemotePreferredCluster[preferredCluster].empty()) {
+ SessionsByRemotePreferredCluster.erase(preferredCluster);
+ }
+ RemotePreferredClusterBySessionCookie.erase(cookie);
+ }
}
-StreamingWriteServerMessage FillWriteResponse(const TString& errorReason, const PersQueue::ErrorCode::ErrorCode code) {
- StreamingWriteServerMessage res;
+StreamingWriteServerMessage FillWriteResponse(const TString& errorReason, const PersQueue::ErrorCode::ErrorCode code) {
+ StreamingWriteServerMessage res;
FillIssue(res.add_issues(), code, errorReason);
res.set_status(ConvertPersQueueInternalCodeToStatus(code));
return res;
diff --git a/ydb/services/persqueue_v1/grpc_pq_write.h b/ydb/services/persqueue_v1/grpc_pq_write.h
index d18355dc60..73148388b2 100644
--- a/ydb/services/persqueue_v1/grpc_pq_write.h
+++ b/ydb/services/persqueue_v1/grpc_pq_write.h
@@ -43,9 +43,9 @@ private:
STFUNC(StateFunc) {
switch (ev->GetTypeRewrite()) {
HFunc(NKikimr::NGRpcService::TEvStreamPQWriteRequest, Handle);
- HFunc(NPQ::NClusterTracker::TEvClusterTracker::TEvClustersUpdate, Handle);
+ HFunc(NPQ::NClusterTracker::TEvClusterTracker::TEvClustersUpdate, Handle);
HFunc(TEvPQProxy::TEvSessionDead, Handle);
- HFunc(TEvPQProxy::TEvSessionSetPreferredCluster, Handle);
+ HFunc(TEvPQProxy::TEvSessionSetPreferredCluster, Handle);
HFunc(NNetClassifier::TEvNetClassifier::TEvClassifierUpdate, Handle);
}
@@ -53,10 +53,10 @@ private:
private:
void Handle(NKikimr::NGRpcService::TEvStreamPQWriteRequest::TPtr& ev, const TActorContext& ctx);
- void Handle(NPQ::NClusterTracker::TEvClusterTracker::TEvClustersUpdate::TPtr& ev, const TActorContext& ctx);
+ void Handle(NPQ::NClusterTracker::TEvClusterTracker::TEvClustersUpdate::TPtr& ev, const TActorContext& ctx);
void Handle(NNetClassifier::TEvNetClassifier::TEvClassifierUpdate::TPtr& ev, const TActorContext& ctx);
- void Handle(TEvPQProxy::TEvSessionSetPreferredCluster::TPtr& ev, const TActorContext& ctx);
+ void Handle(TEvPQProxy::TEvSessionSetPreferredCluster::TPtr& ev, const TActorContext& ctx);
void Handle(TEvPQProxy::TEvSessionDead::TPtr& ev, const TActorContext& ctx);
NActors::TActorId SchemeCache;
@@ -65,11 +65,11 @@ private:
TAtomic LastCookie = 0;
THashMap<ui64, TActorId> Sessions;
- // Created at by session cookie map by remote preferred cluster name
- THashMap<TString, THashMap<ui64, TInstant>> SessionsByRemotePreferredCluster;
- THashMap<ui64, TString> RemotePreferredClusterBySessionCookie;
- // Cluster enabled at time if cluster is currently enabled
- THashMap<TString, TInstant> ClustersEnabledAt;
+ // Created at by session cookie map by remote preferred cluster name
+ THashMap<TString, THashMap<ui64, TInstant>> SessionsByRemotePreferredCluster;
+ THashMap<ui64, TString> RemotePreferredClusterBySessionCookie;
+ // Cluster enabled at time if cluster is currently enabled
+ THashMap<TString, TInstant> ClustersEnabledAt;
TIntrusivePtr<NMonitoring::TDynamicCounters> Counters;
diff --git a/ydb/services/persqueue_v1/grpc_pq_write_actor.cpp b/ydb/services/persqueue_v1/grpc_pq_write_actor.cpp
index a706a1cdd5..3038cc82a6 100644
--- a/ydb/services/persqueue_v1/grpc_pq_write_actor.cpp
+++ b/ydb/services/persqueue_v1/grpc_pq_write_actor.cpp
@@ -14,7 +14,7 @@
#include <util/string/hex.h>
#include <util/string/vector.h>
#include <util/string/escape.h>
-#include <util/string/printf.h>
+#include <util/string/printf.h>
using namespace NActors;
using namespace NKikimrClient;
@@ -76,7 +76,7 @@ void FillChunkDataFromReq(
namespace NGRpcProxy {
namespace V1 {
-using namespace Ydb::PersQueue::V1;
+using namespace Ydb::PersQueue::V1;
static const ui32 MAX_RESERVE_REQUESTS_INFLIGHT = 5;
@@ -131,11 +131,11 @@ TWriteSessionActor::TWriteSessionActor(
, NextRequestInited(false)
, NextRequestCookie(0)
, Token(nullptr)
- , UpdateTokenInProgress(false)
- , UpdateTokenAuthenticated(false)
+ , UpdateTokenInProgress(false)
+ , UpdateTokenAuthenticated(false)
, ACLCheckInProgress(false)
, FirstACLCheck(true)
- , RequestNotChecked(false)
+ , RequestNotChecked(false)
, LastACLCheckTimestamp(TInstant::Zero())
, LogSessionDeadline(TInstant::Zero())
, BalancerTabletId(0)
@@ -191,27 +191,27 @@ TString WriteRequestToLog(const Ydb::PersQueue::V1::StreamingWriteClientMessage&
void TWriteSessionActor::Handle(IContext::TEvReadFinished::TPtr& ev, const TActorContext& ctx) {
LOG_DEBUG_S(ctx, NKikimrServices::PQ_WRITE_PROXY, "session v1 cookie: " << Cookie << " sessionId: " << OwnerCookie << " grpc read done: success: " << ev->Get()->Success << " data: " << WriteRequestToLog(ev->Get()->Record));
- if (!ev->Get()->Success) {
- LOG_INFO_S(ctx, NKikimrServices::PQ_WRITE_PROXY, "session v1 cookie: " << Cookie << " sessionId: " << OwnerCookie << " grpc read failed");
- ctx.Send(ctx.SelfID, new TEvPQProxy::TEvDone());
- return;
- }
-
- switch(ev->Get()->Record.client_message_case()) {
- case StreamingWriteClientMessage::kInitRequest:
+ if (!ev->Get()->Success) {
+ LOG_INFO_S(ctx, NKikimrServices::PQ_WRITE_PROXY, "session v1 cookie: " << Cookie << " sessionId: " << OwnerCookie << " grpc read failed");
+ ctx.Send(ctx.SelfID, new TEvPQProxy::TEvDone());
+ return;
+ }
+
+ switch(ev->Get()->Record.client_message_case()) {
+ case StreamingWriteClientMessage::kInitRequest:
ctx.Send(ctx.SelfID, new TEvPQProxy::TEvWriteInit(std::move(ev->Get()->Record), Request->GetStreamCtx()->GetPeerName()));
break;
- case StreamingWriteClientMessage::kWriteRequest:
+ case StreamingWriteClientMessage::kWriteRequest:
ctx.Send(ctx.SelfID, new TEvPQProxy::TEvWrite(std::move(ev->Get()->Record)));
break;
- case StreamingWriteClientMessage::kUpdateTokenRequest: {
- ctx.Send(ctx.SelfID, new TEvPQProxy::TEvUpdateToken(std::move(ev->Get()->Record)));
- break;
- }
- case StreamingWriteClientMessage::CLIENT_MESSAGE_NOT_SET: {
- CloseSession("'client_message' is not set", PersQueue::ErrorCode::BAD_REQUEST, ctx);
- return;
- }
+ case StreamingWriteClientMessage::kUpdateTokenRequest: {
+ ctx.Send(ctx.SelfID, new TEvPQProxy::TEvUpdateToken(std::move(ev->Get()->Record)));
+ break;
+ }
+ case StreamingWriteClientMessage::CLIENT_MESSAGE_NOT_SET: {
+ CloseSession("'client_message' is not set", PersQueue::ErrorCode::BAD_REQUEST, ctx);
+ return;
+ }
}
}
@@ -261,7 +261,7 @@ void TWriteSessionActor::Handle(TEvPQProxy::TEvDone::TPtr&, const TActorContext&
void TWriteSessionActor::CheckACL(const TActorContext& ctx) {
//Y_VERIFY(ACLCheckInProgress);
-
+
NACLib::EAccessRights rights = NACLib::EAccessRights::UpdateRow;
Y_VERIFY(ACL);
@@ -271,16 +271,16 @@ void TWriteSessionActor::CheckACL(const TActorContext& ctx) {
FirstACLCheck = false;
DiscoverPartition(ctx);
}
- if (UpdateTokenInProgress && UpdateTokenAuthenticated) {
- UpdateTokenInProgress = false;
- StreamingWriteServerMessage serverMessage;
- serverMessage.set_status(Ydb::StatusIds::SUCCESS);
- serverMessage.mutable_update_token_response();
- if (!Request->GetStreamCtx()->Write(std::move(serverMessage))) {
- LOG_INFO_S(ctx, NKikimrServices::PQ_WRITE_PROXY, "session v1 cookie: " << Cookie << " sessionId: " << OwnerCookie << " grpc write failed");
- Die(ctx);
- }
- }
+ if (UpdateTokenInProgress && UpdateTokenAuthenticated) {
+ UpdateTokenInProgress = false;
+ StreamingWriteServerMessage serverMessage;
+ serverMessage.set_status(Ydb::StatusIds::SUCCESS);
+ serverMessage.mutable_update_token_response();
+ if (!Request->GetStreamCtx()->Write(std::move(serverMessage))) {
+ LOG_INFO_S(ctx, NKikimrServices::PQ_WRITE_PROXY, "session v1 cookie: " << Cookie << " sessionId: " << OwnerCookie << " grpc write failed");
+ Die(ctx);
+ }
+ }
} else {
TString errorReason = Sprintf("access to topic '%s' denied for '%s' due to 'no WriteTopic rights', Marker# PQ1125",
TopicConverter->GetClientsideName().c_str(),
@@ -297,10 +297,10 @@ void TWriteSessionActor::Handle(TEvPQProxy::TEvWriteInit::TPtr& ev, const TActor
CloseSession("got second init request", PersQueue::ErrorCode::BAD_REQUEST, ctx);
return;
}
- const auto& init = event->Request.init_request();
+ const auto& init = event->Request.init_request();
- if (init.topic().empty() || init.message_group_id().empty()) {
- CloseSession("no topic or message_group_id in init request", PersQueue::ErrorCode::BAD_REQUEST, ctx);
+ if (init.topic().empty() || init.message_group_id().empty()) {
+ CloseSession("no topic or message_group_id in init request", PersQueue::ErrorCode::BAD_REQUEST, ctx);
return;
}
@@ -315,7 +315,7 @@ void TWriteSessionActor::Handle(TEvPQProxy::TEvWriteInit::TPtr& ev, const TActor
PeerName = event->PeerName;
- SourceId = init.message_group_id();
+ SourceId = init.message_group_id();
TString encodedSourceId;
try {
encodedSourceId = NPQ::NSourceIdEncoding::Encode(SourceId);
@@ -343,7 +343,7 @@ void TWriteSessionActor::Handle(TEvPQProxy::TEvWriteInit::TPtr& ev, const TActor
InitCheckSchema(ctx, true);
- PreferedPartition = init.partition_group_id() > 0 ? init.partition_group_id() - 1 : Max<ui32>();
+ PreferedPartition = init.partition_group_id() > 0 ? init.partition_group_id() - 1 : Max<ui32>();
InitMeta = GetInitialDataChunk(init, TopicConverter->GetFullLegacyName(), PeerName); // ToDo[migration] - check?
@@ -353,11 +353,11 @@ void TWriteSessionActor::Handle(TEvPQProxy::TEvWriteInit::TPtr& ev, const TActor
SLITotal = NKikimr::NPQ::TMultiCounter(subGroup, Aggr, {}, {"RequestsTotal"}, true, "sensor", false);
SLIErrors = NKikimr::NPQ::TMultiCounter(subGroup, Aggr, {}, {"RequestsError"}, true, "sensor", false);
SLITotal.Inc();
-
- const auto& preferredCluster = init.preferred_cluster();
- if (!preferredCluster.empty()) {
- Send(GetPQWriteServiceActorID(), new TEvPQProxy::TEvSessionSetPreferredCluster(Cookie, preferredCluster));
- }
+
+ const auto& preferredCluster = init.preferred_cluster();
+ if (!preferredCluster.empty()) {
+ Send(GetPQWriteServiceActorID(), new TEvPQProxy::TEvSessionSetPreferredCluster(Cookie, preferredCluster));
+ }
}
void TWriteSessionActor::SetupCounters()
@@ -442,8 +442,8 @@ void TWriteSessionActor::Handle(TEvDescribeTopicsResponse::TPtr& ev, const TActo
if (Request->GetInternalToken().empty()) { // session without auth
if (AppData(ctx)->PQConfig.GetRequireCredentialsInNewProtocol()) {
- Request->ReplyUnauthenticated("Unauthenticated access is forbidden, please provide credentials");
- Die(ctx);
+ Request->ReplyUnauthenticated("Unauthenticated access is forbidden, please provide credentials");
+ Die(ctx);
return;
}
Y_VERIFY(FirstACLCheck);
@@ -580,7 +580,7 @@ void TWriteSessionActor::Handle(NKqp::TEvKqp::TEvQueryResponse::TPtr &ev, const
auto& record = ev->Get()->Record.GetRef();
if (record.GetYdbStatus() == Ydb::StatusIds::ABORTED) {
- LOG_INFO_S(ctx, NKikimrServices::PQ_WRITE_PROXY, "session v1 cookie: " << Cookie << " sessionId: " << OwnerCookie << " messageGroupId "
+ LOG_INFO_S(ctx, NKikimrServices::PQ_WRITE_PROXY, "session v1 cookie: " << Cookie << " sessionId: " << OwnerCookie << " messageGroupId "
<< SourceId << " escaped " << EscapedSourceId << " discover partition race, retrying");
DiscoverPartition(ctx);
return;
@@ -609,7 +609,7 @@ void TWriteSessionActor::Handle(NKqp::TEvKqp::TEvQueryResponse::TPtr &ev, const
if (tt.HasOptional() && tt.GetOptional().HasUint32()) { //already got partition
Partition = tt.GetOptional().GetUint32();
if (PreferedPartition < Max<ui32>() && Partition != PreferedPartition) {
- CloseSession(TStringBuilder() << "MessageGroupId " << SourceId << " is already bound to PartitionGroupId " << (Partition + 1) << ", but client provided " << (PreferedPartition + 1) << ". MessageGroupId->PartitionGroupId binding cannot be changed, either use another MessageGroupId, specify PartitionGroupId " << (Partition + 1) << ", or do not specify PartitionGroupId at all.",
+ CloseSession(TStringBuilder() << "MessageGroupId " << SourceId << " is already bound to PartitionGroupId " << (Partition + 1) << ", but client provided " << (PreferedPartition + 1) << ". MessageGroupId->PartitionGroupId binding cannot be changed, either use another MessageGroupId, specify PartitionGroupId " << (Partition + 1) << ", or do not specify PartitionGroupId at all.",
PersQueue::ErrorCode::BAD_REQUEST, ctx);
return;
}
@@ -618,7 +618,7 @@ void TWriteSessionActor::Handle(NKqp::TEvKqp::TEvQueryResponse::TPtr &ev, const
}
}
- LOG_INFO_S(ctx, NKikimrServices::PQ_WRITE_PROXY, "session v1 cookie: " << Cookie << " sessionId: " << OwnerCookie << " messageGroupId "
+ LOG_INFO_S(ctx, NKikimrServices::PQ_WRITE_PROXY, "session v1 cookie: " << Cookie << " sessionId: " << OwnerCookie << " messageGroupId "
<< SourceId << " escaped " << EscapedSourceId << " hash " << Hash << " partition " << Partition << " partitions "
<< PartitionToTablet.size() << "(" << Hash % PartitionToTablet.size() << ") create " << SourceIdCreateTime << " result " << t);
@@ -728,7 +728,7 @@ void TWriteSessionActor::CloseSession(const TString& errorReason, const PersQueu
++(*GetServiceCounters(Counters, "pqproxy|writeSession")->GetCounter("Errors", true));
}
- StreamingWriteServerMessage result;
+ StreamingWriteServerMessage result;
result.set_status(ConvertPersQueueInternalCodeToStatus(errorCode));
FillIssue(result.add_issues(), errorCode, errorReason);
@@ -879,7 +879,7 @@ void TWriteSessionActor::Handle(NPQ::TEvPartitionWriter::TEvWriteResponse::TPtr&
stat->set_persist_duration_ms(
Max((i64)res.GetWriteTimeMs(), stat->persist_duration_ms()));
};
-
+
ui32 partitionCmdWriteResultIndex = 0;
// TODO: Send single batch write response for all user write requests up to some max size/count
for (const auto& userWriteRequest : writeRequest->UserWriteRequests) {
@@ -887,7 +887,7 @@ void TWriteSessionActor::Handle(NPQ::TEvPartitionWriter::TEvWriteResponse::TPtr&
result.set_status(Ydb::StatusIds::SUCCESS);
auto batchWriteResponse = result.mutable_batch_write_response();
batchWriteResponse->set_partition_id(Partition);
-
+
for (size_t messageIndex = 0, endIndex = userWriteRequest->Request.write_request().sequence_numbers_size(); messageIndex != endIndex; ++messageIndex) {
if (partitionCmdWriteResultIndex == resp.CmdWriteResultSize()) {
CloseSession("too less responses from server", PersQueue::ErrorCode::ERROR, ctx);
@@ -948,21 +948,21 @@ void TWriteSessionActor::GenerateNextWriteRequest(const TActorContext& ctx) {
Writes.clear();
i64 diff = 0;
- auto addData = [&](const StreamingWriteClientMessage::WriteRequest& writeRequest, const i32 messageIndex) {
+ auto addData = [&](const StreamingWriteClientMessage::WriteRequest& writeRequest, const i32 messageIndex) {
auto w = request.MutablePartitionRequest()->AddCmdWrite();
w->SetData(GetSerializedData(InitMeta, writeRequest, messageIndex));
- w->SetSeqNo(writeRequest.sequence_numbers(messageIndex));
+ w->SetSeqNo(writeRequest.sequence_numbers(messageIndex));
w->SetSourceId(NPQ::NSourceIdEncoding::EncodeSimple(SourceId));
- w->SetCreateTimeMS(writeRequest.created_at_ms(messageIndex));
+ w->SetCreateTimeMS(writeRequest.created_at_ms(messageIndex));
w->SetUncompressedSize(writeRequest.blocks_uncompressed_sizes(messageIndex));
w->SetClientDC(ClientDC);
};
for (const auto& write : writeRequest->UserWriteRequests) {
diff -= write->Request.ByteSize();
- const auto& writeRequest = write->Request.write_request();
- for (i32 messageIndex = 0; messageIndex != writeRequest.sequence_numbers_size(); ++messageIndex) {
- addData(writeRequest, messageIndex);
+ const auto& writeRequest = write->Request.write_request();
+ for (i32 messageIndex = 0; messageIndex != writeRequest.sequence_numbers_size(); ++messageIndex) {
+ addData(writeRequest, messageIndex);
}
}
@@ -982,48 +982,48 @@ void TWriteSessionActor::GenerateNextWriteRequest(const TActorContext& ctx) {
++NumReserveBytesRequests;
}
-void TWriteSessionActor::Handle(TEvPQProxy::TEvUpdateToken::TPtr& ev, const TActorContext& ctx) {
- if (State != ES_INITED) {
- CloseSession("got 'update_token_request' but write session is not initialized", PersQueue::ErrorCode::BAD_REQUEST, ctx);
- return;
- }
- if (UpdateTokenInProgress) {
- CloseSession("got another 'update_token_request' while previous still in progress, only single token update is allowed at a time", PersQueue::ErrorCode::OVERLOAD, ctx);
- return;
- }
-
- const auto& token = ev->Get()->Request.update_token_request().token();
- if (token == Auth || (token.empty() && !AppData(ctx)->PQConfig.GetRequireCredentialsInNewProtocol())) {
- // Got same token or empty token with no non-empty token requirement, do not trigger any checks
- StreamingWriteServerMessage serverMessage;
- serverMessage.set_status(Ydb::StatusIds::SUCCESS);
- serverMessage.mutable_update_token_response();
- if (!Request->GetStreamCtx()->Write(std::move(serverMessage))) {
- LOG_INFO_S(ctx, NKikimrServices::PQ_WRITE_PROXY, "session v1 cookie: " << Cookie << " sessionId: " << OwnerCookie << " grpc write failed");
- Die(ctx);
- return;
- }
- }
- else if (token.empty()) {
- Request->ReplyUnauthenticated("'token' in 'update_token_request' is empty");
- Die(ctx);
- return;
- }
- else {
- UpdateTokenInProgress = true;
- UpdateTokenAuthenticated = false;
- Auth = token;
- Request->RefreshToken(Auth, ctx, ctx.SelfID);
- }
-
- NextRequestInited = true;
- if (!Request->GetStreamCtx()->Read()) {
- LOG_INFO_S(ctx, NKikimrServices::PQ_WRITE_PROXY, "session v1 cookie: " << Cookie << " sessionId: " << OwnerCookie << " grpc read failed");
- Die(ctx);
- return;
- }
-}
-
+void TWriteSessionActor::Handle(TEvPQProxy::TEvUpdateToken::TPtr& ev, const TActorContext& ctx) {
+ if (State != ES_INITED) {
+ CloseSession("got 'update_token_request' but write session is not initialized", PersQueue::ErrorCode::BAD_REQUEST, ctx);
+ return;
+ }
+ if (UpdateTokenInProgress) {
+ CloseSession("got another 'update_token_request' while previous still in progress, only single token update is allowed at a time", PersQueue::ErrorCode::OVERLOAD, ctx);
+ return;
+ }
+
+ const auto& token = ev->Get()->Request.update_token_request().token();
+ if (token == Auth || (token.empty() && !AppData(ctx)->PQConfig.GetRequireCredentialsInNewProtocol())) {
+ // Got same token or empty token with no non-empty token requirement, do not trigger any checks
+ StreamingWriteServerMessage serverMessage;
+ serverMessage.set_status(Ydb::StatusIds::SUCCESS);
+ serverMessage.mutable_update_token_response();
+ if (!Request->GetStreamCtx()->Write(std::move(serverMessage))) {
+ LOG_INFO_S(ctx, NKikimrServices::PQ_WRITE_PROXY, "session v1 cookie: " << Cookie << " sessionId: " << OwnerCookie << " grpc write failed");
+ Die(ctx);
+ return;
+ }
+ }
+ else if (token.empty()) {
+ Request->ReplyUnauthenticated("'token' in 'update_token_request' is empty");
+ Die(ctx);
+ return;
+ }
+ else {
+ UpdateTokenInProgress = true;
+ UpdateTokenAuthenticated = false;
+ Auth = token;
+ Request->RefreshToken(Auth, ctx, ctx.SelfID);
+ }
+
+ NextRequestInited = true;
+ if (!Request->GetStreamCtx()->Read()) {
+ LOG_INFO_S(ctx, NKikimrServices::PQ_WRITE_PROXY, "session v1 cookie: " << Cookie << " sessionId: " << OwnerCookie << " grpc read failed");
+ Die(ctx);
+ return;
+ }
+}
+
void TWriteSessionActor::Handle(NGRpcService::TGRpcRequestProxy::TEvRefreshTokenResponse::TPtr &ev , const TActorContext& ctx) {
Y_UNUSED(ctx);
LOG_INFO_S(ctx, NKikimrServices::PQ_WRITE_PROXY, "updating token");
@@ -1031,10 +1031,10 @@ void TWriteSessionActor::Handle(NGRpcService::TGRpcRequestProxy::TEvRefreshToken
if (ev->Get()->Authenticated && !ev->Get()->InternalToken.empty()) {
Token = new NACLib::TUserToken(ev->Get()->InternalToken);
Request->SetInternalToken(ev->Get()->InternalToken);
- UpdateTokenAuthenticated = true;
- if (!ACLCheckInProgress) {
+ UpdateTokenAuthenticated = true;
+ if (!ACLCheckInProgress) {
InitCheckSchema(ctx);
- }
+ }
} else {
Request->ReplyUnauthenticated("refreshed token is invalid");
Die(ctx);
@@ -1051,65 +1051,65 @@ void TWriteSessionActor::Handle(TEvPQProxy::TEvWrite::TPtr& ev, const TActorCont
return;
}
- const auto& writeRequest = ev->Get()->Request.write_request();
- if (!AllEqual(writeRequest.sequence_numbers_size(), writeRequest.created_at_ms_size(), writeRequest.sent_at_ms_size(), writeRequest.message_sizes_size())) {
- CloseSession(TStringBuilder() << "messages meta repeated fields do not have same size, 'sequence_numbers' size is " << writeRequest.sequence_numbers_size()
- << ", 'message_sizes' size is " << writeRequest.message_sizes_size() << ", 'created_at_ms' size is " << writeRequest.created_at_ms_size()
- << " and 'sent_at_ms' size is " << writeRequest.sent_at_ms_size(), PersQueue::ErrorCode::BAD_REQUEST, ctx);
- return;
- }
- if (!AllEqual(writeRequest.blocks_offsets_size(), writeRequest.blocks_part_numbers_size(), writeRequest.blocks_message_counts_size(), writeRequest.blocks_uncompressed_sizes_size(), writeRequest.blocks_headers_size(), writeRequest.blocks_data_size())) {
- CloseSession(TStringBuilder() << "blocks repeated fields do no have same size, 'blocks_offsets' size is " << writeRequest.blocks_offsets_size()
- << ", 'blocks_part_numbers' size is " << writeRequest.blocks_part_numbers_size() << ", 'blocks_message_counts' size is " << writeRequest.blocks_message_counts_size()
- << ", 'blocks_uncompressed_sizes' size is " << writeRequest.blocks_uncompressed_sizes_size() << ", 'blocks_headers' size is " << writeRequest.blocks_headers_size()
- << " and 'blocks_data' size is " << writeRequest.blocks_data_size(), PersQueue::ErrorCode::BAD_REQUEST, ctx);
- return;
- }
-
- const i32 messageCount = writeRequest.sequence_numbers_size();
- const i32 blockCount = writeRequest.blocks_offsets_size();
- if (messageCount == 0) {
- CloseSession(TStringBuilder() << "messages meta repeated fields are empty, write request contains no messages", PersQueue::ErrorCode::BAD_REQUEST, ctx);
- return;
- }
- if (messageCount != blockCount) {
- CloseSession(TStringBuilder() << "messages meta repeated fields and blocks repeated fields do not have same size, messages meta fields size is " << messageCount
- << " and blocks fields size is " << blockCount << ", only one message per block is supported in blocks format version 0", PersQueue::ErrorCode::BAD_REQUEST, ctx);
- return;
- }
- auto dataCheck = [&](const StreamingWriteClientMessage::WriteRequest& data, const i32 messageIndex) -> bool {
- if (data.sequence_numbers(messageIndex) <= 0) {
- CloseSession(TStringBuilder() << "bad write request - 'sequence_numbers' items must be greater than 0. Value at position " << messageIndex << " is " << data.sequence_numbers(messageIndex), PersQueue::ErrorCode::BAD_REQUEST, ctx);
+ const auto& writeRequest = ev->Get()->Request.write_request();
+ if (!AllEqual(writeRequest.sequence_numbers_size(), writeRequest.created_at_ms_size(), writeRequest.sent_at_ms_size(), writeRequest.message_sizes_size())) {
+ CloseSession(TStringBuilder() << "messages meta repeated fields do not have same size, 'sequence_numbers' size is " << writeRequest.sequence_numbers_size()
+ << ", 'message_sizes' size is " << writeRequest.message_sizes_size() << ", 'created_at_ms' size is " << writeRequest.created_at_ms_size()
+ << " and 'sent_at_ms' size is " << writeRequest.sent_at_ms_size(), PersQueue::ErrorCode::BAD_REQUEST, ctx);
+ return;
+ }
+ if (!AllEqual(writeRequest.blocks_offsets_size(), writeRequest.blocks_part_numbers_size(), writeRequest.blocks_message_counts_size(), writeRequest.blocks_uncompressed_sizes_size(), writeRequest.blocks_headers_size(), writeRequest.blocks_data_size())) {
+ CloseSession(TStringBuilder() << "blocks repeated fields do no have same size, 'blocks_offsets' size is " << writeRequest.blocks_offsets_size()
+ << ", 'blocks_part_numbers' size is " << writeRequest.blocks_part_numbers_size() << ", 'blocks_message_counts' size is " << writeRequest.blocks_message_counts_size()
+ << ", 'blocks_uncompressed_sizes' size is " << writeRequest.blocks_uncompressed_sizes_size() << ", 'blocks_headers' size is " << writeRequest.blocks_headers_size()
+ << " and 'blocks_data' size is " << writeRequest.blocks_data_size(), PersQueue::ErrorCode::BAD_REQUEST, ctx);
+ return;
+ }
+
+ const i32 messageCount = writeRequest.sequence_numbers_size();
+ const i32 blockCount = writeRequest.blocks_offsets_size();
+ if (messageCount == 0) {
+ CloseSession(TStringBuilder() << "messages meta repeated fields are empty, write request contains no messages", PersQueue::ErrorCode::BAD_REQUEST, ctx);
+ return;
+ }
+ if (messageCount != blockCount) {
+ CloseSession(TStringBuilder() << "messages meta repeated fields and blocks repeated fields do not have same size, messages meta fields size is " << messageCount
+ << " and blocks fields size is " << blockCount << ", only one message per block is supported in blocks format version 0", PersQueue::ErrorCode::BAD_REQUEST, ctx);
+ return;
+ }
+ auto dataCheck = [&](const StreamingWriteClientMessage::WriteRequest& data, const i32 messageIndex) -> bool {
+ if (data.sequence_numbers(messageIndex) <= 0) {
+ CloseSession(TStringBuilder() << "bad write request - 'sequence_numbers' items must be greater than 0. Value at position " << messageIndex << " is " << data.sequence_numbers(messageIndex), PersQueue::ErrorCode::BAD_REQUEST, ctx);
return false;
}
- if (messageIndex > 0 && data.sequence_numbers(messageIndex) <= data.sequence_numbers(messageIndex - 1)) {
- CloseSession(TStringBuilder() << "bad write request - 'sequence_numbers' are unsorted. Value " << data.sequence_numbers(messageIndex) << " at position " << messageIndex
- << " is less than or equal to value " << data.sequence_numbers(messageIndex - 1) << " at position " << (messageIndex - 1), PersQueue::ErrorCode::BAD_REQUEST, ctx);
+ if (messageIndex > 0 && data.sequence_numbers(messageIndex) <= data.sequence_numbers(messageIndex - 1)) {
+ CloseSession(TStringBuilder() << "bad write request - 'sequence_numbers' are unsorted. Value " << data.sequence_numbers(messageIndex) << " at position " << messageIndex
+ << " is less than or equal to value " << data.sequence_numbers(messageIndex - 1) << " at position " << (messageIndex - 1), PersQueue::ErrorCode::BAD_REQUEST, ctx);
return false;
}
-
- if (data.blocks_headers(messageIndex).size() != CODEC_ID_SIZE) {
- CloseSession(TStringBuilder() << "bad write request - 'blocks_headers' at position " << messageIndex << " has incorrect size " << data.blocks_headers(messageIndex).size() << " [B]. Only headers of size " << CODEC_ID_SIZE << " [B] (with codec identifier) are supported in block format version 0", PersQueue::ErrorCode::BAD_REQUEST, ctx);
- return false;
- }
-
- const char& codecID = data.blocks_headers(messageIndex).front();
- TString error;
- if (!ValidateWriteWithCodec(InitialPQTabletConfig, codecID, error)) {
- CloseSession(TStringBuilder() << "bad write request - 'blocks_headers' at position " << messageIndex << " is invalid: " << error, PersQueue::ErrorCode::BAD_REQUEST, ctx);
- return false;
+
+ if (data.blocks_headers(messageIndex).size() != CODEC_ID_SIZE) {
+ CloseSession(TStringBuilder() << "bad write request - 'blocks_headers' at position " << messageIndex << " has incorrect size " << data.blocks_headers(messageIndex).size() << " [B]. Only headers of size " << CODEC_ID_SIZE << " [B] (with codec identifier) are supported in block format version 0", PersQueue::ErrorCode::BAD_REQUEST, ctx);
+ return false;
+ }
+
+ const char& codecID = data.blocks_headers(messageIndex).front();
+ TString error;
+ if (!ValidateWriteWithCodec(InitialPQTabletConfig, codecID, error)) {
+ CloseSession(TStringBuilder() << "bad write request - 'blocks_headers' at position " << messageIndex << " is invalid: " << error, PersQueue::ErrorCode::BAD_REQUEST, ctx);
+ return false;
+ }
+
+ if (data.blocks_message_counts(messageIndex) != 1) {
+ CloseSession(TStringBuilder() << "bad write request - 'blocks_message_counts' at position " << messageIndex << " is " << data.blocks_message_counts(messageIndex)
+ << ", only single message per block is supported by block format version 0", PersQueue::ErrorCode::BAD_REQUEST, ctx);
+ return false;
}
-
- if (data.blocks_message_counts(messageIndex) != 1) {
- CloseSession(TStringBuilder() << "bad write request - 'blocks_message_counts' at position " << messageIndex << " is " << data.blocks_message_counts(messageIndex)
- << ", only single message per block is supported by block format version 0", PersQueue::ErrorCode::BAD_REQUEST, ctx);
- return false;
- }
return true;
};
- for (i32 messageIndex = 0; messageIndex != messageCount; ++messageIndex) {
- if (!dataCheck(writeRequest, messageIndex)) {
+ for (i32 messageIndex = 0; messageIndex != messageCount; ++messageIndex) {
+ if (!dataCheck(writeRequest, messageIndex)) {
return;
}
}
@@ -1154,7 +1154,7 @@ void TWriteSessionActor::LogSession(const TActorContext& ctx) {
void TWriteSessionActor::HandleWakeup(const TActorContext& ctx) {
Y_VERIFY(State == ES_INITED);
ctx.Schedule(CHECK_ACL_DELAY, new TEvents::TEvWakeup());
- if (Token && !ACLCheckInProgress && RequestNotChecked && (ctx.Now() - LastACLCheckTimestamp > TDuration::Seconds(AppData(ctx)->PQConfig.GetACLRetryTimeoutSec()))) {
+ if (Token && !ACLCheckInProgress && RequestNotChecked && (ctx.Now() - LastACLCheckTimestamp > TDuration::Seconds(AppData(ctx)->PQConfig.GetACLRetryTimeoutSec()))) {
RequestNotChecked = false;
InitCheckSchema(ctx);
}
diff --git a/ydb/services/persqueue_v1/persqueue.cpp b/ydb/services/persqueue_v1/persqueue.cpp
index 8572a6c02f..ccacc4d43a 100644
--- a/ydb/services/persqueue_v1/persqueue.cpp
+++ b/ydb/services/persqueue_v1/persqueue.cpp
@@ -71,9 +71,9 @@ void TGRpcPersQueueService::SetupIncomingRequests(NGrpc::TLoggerPtr logger) {
auto getCounterBlock = NKikimr::NGRpcService::CreateCounterCb(Counters, ActorSystem);
{
- using TBiRequest = Ydb::PersQueue::V1::StreamingWriteClientMessage;
+ using TBiRequest = Ydb::PersQueue::V1::StreamingWriteClientMessage;
- using TBiResponse = Ydb::PersQueue::V1::StreamingWriteServerMessage;
+ using TBiResponse = Ydb::PersQueue::V1::StreamingWriteServerMessage;
using TStreamGRpcRequest = NGRpcServer::TGRpcStreamingRequest<
TBiRequest,
@@ -82,7 +82,7 @@ void TGRpcPersQueueService::SetupIncomingRequests(NGrpc::TLoggerPtr logger) {
NKikimrServices::GRPC_SERVER>;
- TStreamGRpcRequest::Start(this, this->GetService(), CQ, &Ydb::PersQueue::V1::PersQueueService::AsyncService::RequestStreamingWrite,
+ TStreamGRpcRequest::Start(this, this->GetService(), CQ, &Ydb::PersQueue::V1::PersQueueService::AsyncService::RequestStreamingWrite,
[this](TIntrusivePtr<TStreamGRpcRequest::IContext> context) {
ActorSystem->Send(GRpcRequestProxy, new NKikimr::NGRpcService::TEvStreamPQWriteRequest(context));
},
@@ -114,7 +114,7 @@ void TGRpcPersQueueService::SetupIncomingRequests(NGrpc::TLoggerPtr logger) {
#error ADD_REQUEST macro already defined
#endif
#define ADD_REQUEST(NAME, SVC, IN, OUT, ACTION) \
- MakeIntrusive<TGRpcRequest<Ydb::PersQueue::V1::IN, Ydb::PersQueue::V1::OUT, NGRpcService::V1::TGRpcPersQueueService>>(this, this->GetService(), CQ, \
+ MakeIntrusive<TGRpcRequest<Ydb::PersQueue::V1::IN, Ydb::PersQueue::V1::OUT, NGRpcService::V1::TGRpcPersQueueService>>(this, this->GetService(), CQ, \
[this](NGrpc::IRequestContextBase *ctx) { \
NGRpcService::ReportGrpcReqToMon(*ActorSystem, ctx->GetPeer()); \
ACTION; \
diff --git a/ydb/services/persqueue_v1/persqueue_ut.cpp b/ydb/services/persqueue_v1/persqueue_ut.cpp
index 11f7ade7cc..3bc9b21728 100644
--- a/ydb/services/persqueue_v1/persqueue_ut.cpp
+++ b/ydb/services/persqueue_v1/persqueue_ut.cpp
@@ -1,10 +1,10 @@
-#include "grpc_pq_actor.h"
+#include "grpc_pq_actor.h"
#include <ydb/services/persqueue_v1/ut/pq_data_writer.h>
#include <ydb/services/persqueue_v1/ut/api_test_setup.h>
#include <ydb/services/persqueue_v1/ut/rate_limiter_test_setup.h>
#include <ydb/services/persqueue_v1/ut/test_utils.h>
#include <ydb/services/persqueue_v1/ut/persqueue_test_fixture.h>
-
+
#include <ydb/core/base/appdata.h>
#include <ydb/core/testlib/test_pq_client.h>
#include <ydb/core/protos/grpc_pq_old.pb.h>
@@ -43,7 +43,7 @@ namespace NKikimr::NPersQueueTests {
using namespace Tests;
using namespace NKikimrClient;
using namespace Ydb::PersQueue;
-using namespace Ydb::PersQueue::V1;
+using namespace Ydb::PersQueue::V1;
using namespace NThreading;
using namespace NNetClassifier;
@@ -99,22 +99,22 @@ namespace {
Y_UNIT_TEST_SUITE(TPersQueueTest) {
- Y_UNIT_TEST(AllEqual) {
- using NGRpcProxy::V1::AllEqual;
-
- UNIT_ASSERT(AllEqual(0));
- UNIT_ASSERT(AllEqual(0, 0));
- UNIT_ASSERT(AllEqual(0, 0, 0));
- UNIT_ASSERT(AllEqual(1, 1, 1));
- UNIT_ASSERT(AllEqual(1, 1, 1, 1, 1, 1));
- UNIT_ASSERT(!AllEqual(1, 0));
- UNIT_ASSERT(!AllEqual(0, 1));
- UNIT_ASSERT(!AllEqual(0, 1, 0));
- UNIT_ASSERT(!AllEqual(1, 1, 0));
- UNIT_ASSERT(!AllEqual(0, 1, 1));
- UNIT_ASSERT(!AllEqual(1, 1, 1, 1, 1, 0));
- }
-
+ Y_UNIT_TEST(AllEqual) {
+ using NGRpcProxy::V1::AllEqual;
+
+ UNIT_ASSERT(AllEqual(0));
+ UNIT_ASSERT(AllEqual(0, 0));
+ UNIT_ASSERT(AllEqual(0, 0, 0));
+ UNIT_ASSERT(AllEqual(1, 1, 1));
+ UNIT_ASSERT(AllEqual(1, 1, 1, 1, 1, 1));
+ UNIT_ASSERT(!AllEqual(1, 0));
+ UNIT_ASSERT(!AllEqual(0, 1));
+ UNIT_ASSERT(!AllEqual(0, 1, 0));
+ UNIT_ASSERT(!AllEqual(1, 1, 0));
+ UNIT_ASSERT(!AllEqual(0, 1, 1));
+ UNIT_ASSERT(!AllEqual(1, 1, 1, 1, 1, 0));
+ }
+
Y_UNIT_TEST(SetupLockSession2) {
TPersQueueV1TestServer server;
SET_LOCALS;
@@ -428,27 +428,27 @@ namespace {
UNIT_ASSERT(!writer2->Close());
}
- Y_UNIT_TEST(EachMessageGetsExactlyOneAcknowledgementInCorrectOrder) {
+ Y_UNIT_TEST(EachMessageGetsExactlyOneAcknowledgementInCorrectOrder) {
NPersQueue::TTestServer server;
server.AnnoyingClient->CreateTopic("rt3.dc1--topic", 1);
-
+
auto driver = server.AnnoyingClient->GetDriver();
auto writer = CreateSimpleWriter(*driver, "topic", "test source ID");
bool res = true;
- ui32 messageCount = 1000;
-
- for (ui32 sequenceNumber = 1; sequenceNumber <= messageCount; ++sequenceNumber) {
+ ui32 messageCount = 1000;
+
+ for (ui32 sequenceNumber = 1; sequenceNumber <= messageCount; ++sequenceNumber) {
res = writer->Write("x", sequenceNumber);
UNIT_ASSERT(res);
- }
+ }
UNIT_ASSERT(writer->IsAlive());
res = writer->Close(TDuration::Seconds(10));
UNIT_ASSERT(res);
- }
-
+ }
+
Y_UNIT_TEST(SetupWriteSessionOnDisabledCluster) {
TPersQueueV1TestServer server;
SET_LOCALS;
@@ -1079,7 +1079,7 @@ namespace {
NACLib::TDiffACL acl;
acl.AddAccess(NACLib::EAccessType::Allow, NACLib::UpdateRow, "topic1@" BUILTIN_ACL_DOMAIN);
server.AnnoyingClient->ModifyACL("/Root/PQ", DEFAULT_TOPIC_NAME, acl.SerializeAsString());
- WaitACLModification();
+ WaitACLModification();
writer2.Write(SHORT_TOPIC_NAME, {"valuevaluevalue1"}, false, "topic1@" BUILTIN_ACL_DOMAIN);
writer2.Write(SHORT_TOPIC_NAME, {"valuevaluevalue1"}, true, "invalid_ticket");
@@ -1092,7 +1092,7 @@ namespace {
dynamic_cast<TTestCredentialsProviderFactory*>(creds.get())->SetToken(NYdb::TStringType("topic1@" BUILTIN_ACL_DOMAIN));
auto writer = CreateWriter(*driver, SHORT_TOPIC_NAME, "123", {}, {}, {}, creds);
-
+
auto msg = writer->GetEvent(true);
UNIT_ASSERT(msg); // ReadyToAcceptEvent
@@ -1166,7 +1166,7 @@ namespace {
acl.AddAccess(NACLib::EAccessType::Allow, NACLib::SelectRow, "user1@" BUILTIN_ACL_DOMAIN);
acl.AddAccess(NACLib::EAccessType::Allow, NACLib::SelectRow, "user2@" BUILTIN_ACL_DOMAIN);
server.AnnoyingClient->ModifyACL("/Root/PQ", topic2, acl.SerializeAsString());
- WaitACLModification();
+ WaitACLModification();
auto ticket1 = "1@" BUILTIN_ACL_DOMAIN;
auto ticket2 = "2@" BUILTIN_ACL_DOMAIN;
@@ -1186,7 +1186,7 @@ namespace {
acl.Clear();
acl.AddAccess(NACLib::EAccessType::Allow, NACLib::SelectRow, "user3@" BUILTIN_ACL_DOMAIN);
server.AnnoyingClient->ModifyACL("/Root/PQ", topic2, acl.SerializeAsString());
- WaitACLModification();
+ WaitACLModification();
Cerr << "==== Writer - read\n";
writer.Read(shortTopic2Name, "user1", "user3@" BUILTIN_ACL_DOMAIN, false, true, true);
@@ -1359,7 +1359,7 @@ namespace {
if (i == 5) {
UNIT_ASSERT(TInstant::Now() - st > TDuration::Seconds(3));
- // TODO: Describe this assert in comment
+ // TODO: Describe this assert in comment
UNIT_ASSERT(!ack->Acks.empty());
UNIT_ASSERT(ack->Acks.back().Stat);
UNIT_ASSERT(ack->Acks.back().Stat->TotalTimeInPartitionQueue >= ack->Acks.back().Stat->PartitionQuotedTime);
@@ -1747,7 +1747,7 @@ namespace {
for (ui32 i = 1; i <= 11; ++i) {
auto f = producer->Write(i, TString(10, 'a'));
f.Wait();
- UNIT_ASSERT_C(f.GetValue().Response.server_message_case() == StreamingWriteServerMessage::kBatchWriteResponse, f.GetValue().Response);
+ UNIT_ASSERT_C(f.GetValue().Response.server_message_case() == StreamingWriteServerMessage::kBatchWriteResponse, f.GetValue().Response);
}
}
@@ -2129,227 +2129,227 @@ namespace {
UNIT_ASSERT_EQUAL(ccResult.Response.Getissues(0).issue_code(), Ydb::PersQueue::ErrorCode::UNKNOWN_TOPIC);
}
*/
- Y_UNIT_TEST(Codecs_InitWriteSession_DefaultTopicSupportedCodecsInInitResponse) {
- APITestSetup setup{TEST_CASE_NAME};
- grpc::ClientContext context;
- auto session = setup.GetPersQueueService()->StreamingWrite(&context);
-
-
- auto serverMessage = setup.InitSession(session);
-
-
+ Y_UNIT_TEST(Codecs_InitWriteSession_DefaultTopicSupportedCodecsInInitResponse) {
+ APITestSetup setup{TEST_CASE_NAME};
+ grpc::ClientContext context;
+ auto session = setup.GetPersQueueService()->StreamingWrite(&context);
+
+
+ auto serverMessage = setup.InitSession(session);
+
+
auto defaultSupportedCodecs = TVector<Ydb::PersQueue::V1::Codec>{ Ydb::PersQueue::V1::CODEC_RAW, Ydb::PersQueue::V1::CODEC_GZIP, Ydb::PersQueue::V1::CODEC_LZOP };
- auto topicSupportedCodecs = serverMessage.init_response().supported_codecs();
+ auto topicSupportedCodecs = serverMessage.init_response().supported_codecs();
UNIT_ASSERT_VALUES_EQUAL_C(defaultSupportedCodecs.size(), topicSupportedCodecs.size(), serverMessage.init_response());
UNIT_ASSERT_C(Equal(defaultSupportedCodecs.begin(), defaultSupportedCodecs.end(), topicSupportedCodecs.begin()), serverMessage.init_response());
- }
-
- Y_UNIT_TEST(Codecs_WriteMessageWithDefaultCodecs_MessagesAreAcknowledged) {
- APITestSetup setup{TEST_CASE_NAME};
- auto log = setup.GetLog();
- StreamingWriteClientMessage clientMessage;
- auto* writeRequest = clientMessage.mutable_write_request();
- auto sequenceNumber = 1;
- auto messageCount = 0;
- const auto message = NUnitTest::RandomString(250 * 1024, std::rand());
- auto compress = [](TString data, i32 codecID) {
- Y_UNUSED(codecID);
- return TString(data);
- };
- TVector<char> defaultCodecs{0, 1, 2};
- for (const auto& codecID : defaultCodecs) {
- auto compressedMessage = compress(message, codecID);
-
- writeRequest->add_sequence_numbers(sequenceNumber++);
- writeRequest->add_message_sizes(message.size());
- writeRequest->add_created_at_ms(TInstant::Now().MilliSeconds());
- writeRequest->add_sent_at_ms(TInstant::Now().MilliSeconds());
- writeRequest->add_blocks_offsets(0);
- writeRequest->add_blocks_part_numbers(0);
- writeRequest->add_blocks_message_counts(1);
- writeRequest->add_blocks_uncompressed_sizes(message.size());
- writeRequest->add_blocks_headers(TString(1, codecID));
- writeRequest->add_blocks_data(compressedMessage);
- ++messageCount;
- }
- auto session = setup.InitWriteSession();
-
-
- AssertSuccessfullStreamingOperation(session.first->Write(clientMessage), session.first, &clientMessage);
-
-
- StreamingWriteServerMessage serverMessage;
- log << TLOG_INFO << "Wait for write acknowledgement";
- AssertSuccessfullStreamingOperation(session.first->Read(&serverMessage), session.first);
- UNIT_ASSERT_C(serverMessage.server_message_case() == StreamingWriteServerMessage::kBatchWriteResponse, serverMessage);
- UNIT_ASSERT_VALUES_EQUAL_C(defaultCodecs.size(), serverMessage.batch_write_response().offsets_size(), serverMessage);
- }
-
- Y_UNIT_TEST(Codecs_WriteMessageWithNonDefaultCodecThatHasToBeConfiguredAdditionally_SessionClosedWithBadRequestError) {
- APITestSetup setup{TEST_CASE_NAME};
- auto log = setup.GetLog();
- StreamingWriteClientMessage clientMessage;
- auto* writeRequest = clientMessage.mutable_write_request();
- const auto message = NUnitTest::RandomString(250 * 1024, std::rand());
- const auto codecID = 3;
- writeRequest->add_sequence_numbers(1);
- writeRequest->add_message_sizes(message.size());
- writeRequest->add_created_at_ms(TInstant::Now().MilliSeconds());
- writeRequest->add_sent_at_ms(TInstant::Now().MilliSeconds());
- writeRequest->add_blocks_offsets(0);
- writeRequest->add_blocks_part_numbers(0);
- writeRequest->add_blocks_message_counts(1);
- writeRequest->add_blocks_uncompressed_sizes(message.size());
- writeRequest->add_blocks_headers(TString(1, codecID));
- writeRequest->add_blocks_data(message);
- auto session = setup.InitWriteSession();
-
-
- AssertSuccessfullStreamingOperation(session.first->Write(clientMessage), session.first, &clientMessage);
-
- log << TLOG_INFO << "Wait for session to die";
- AssertStreamingSessionDead(session.first, Ydb::StatusIds::BAD_REQUEST, Ydb::PersQueue::ErrorCode::BAD_REQUEST);
- }
-
- StreamingWriteClientMessage::InitRequest GenerateSessionSetupWithPreferredCluster(const TString preferredCluster) {
- StreamingWriteClientMessage::InitRequest sessionSetup;
- sessionSetup.set_preferred_cluster(preferredCluster);
- sessionSetup.set_message_group_id("test-message-group-id-" + preferredCluster);
- return sessionSetup;
- };
-
- Y_UNIT_TEST(PreferredCluster_TwoEnabledClustersAndWriteSessionsWithDifferentPreferredCluster_SessionWithMismatchedClusterDiesAndOthersAlive) {
- APITestSetup setup{TEST_CASE_NAME};
- auto log = setup.GetLog();
-
- setup.GetPQConfig().SetClustersUpdateTimeoutSec(0);
- setup.GetPQConfig().SetRemoteClusterEnabledDelaySec(0);
+ }
+
+ Y_UNIT_TEST(Codecs_WriteMessageWithDefaultCodecs_MessagesAreAcknowledged) {
+ APITestSetup setup{TEST_CASE_NAME};
+ auto log = setup.GetLog();
+ StreamingWriteClientMessage clientMessage;
+ auto* writeRequest = clientMessage.mutable_write_request();
+ auto sequenceNumber = 1;
+ auto messageCount = 0;
+ const auto message = NUnitTest::RandomString(250 * 1024, std::rand());
+ auto compress = [](TString data, i32 codecID) {
+ Y_UNUSED(codecID);
+ return TString(data);
+ };
+ TVector<char> defaultCodecs{0, 1, 2};
+ for (const auto& codecID : defaultCodecs) {
+ auto compressedMessage = compress(message, codecID);
+
+ writeRequest->add_sequence_numbers(sequenceNumber++);
+ writeRequest->add_message_sizes(message.size());
+ writeRequest->add_created_at_ms(TInstant::Now().MilliSeconds());
+ writeRequest->add_sent_at_ms(TInstant::Now().MilliSeconds());
+ writeRequest->add_blocks_offsets(0);
+ writeRequest->add_blocks_part_numbers(0);
+ writeRequest->add_blocks_message_counts(1);
+ writeRequest->add_blocks_uncompressed_sizes(message.size());
+ writeRequest->add_blocks_headers(TString(1, codecID));
+ writeRequest->add_blocks_data(compressedMessage);
+ ++messageCount;
+ }
+ auto session = setup.InitWriteSession();
+
+
+ AssertSuccessfullStreamingOperation(session.first->Write(clientMessage), session.first, &clientMessage);
+
+
+ StreamingWriteServerMessage serverMessage;
+ log << TLOG_INFO << "Wait for write acknowledgement";
+ AssertSuccessfullStreamingOperation(session.first->Read(&serverMessage), session.first);
+ UNIT_ASSERT_C(serverMessage.server_message_case() == StreamingWriteServerMessage::kBatchWriteResponse, serverMessage);
+ UNIT_ASSERT_VALUES_EQUAL_C(defaultCodecs.size(), serverMessage.batch_write_response().offsets_size(), serverMessage);
+ }
+
+ Y_UNIT_TEST(Codecs_WriteMessageWithNonDefaultCodecThatHasToBeConfiguredAdditionally_SessionClosedWithBadRequestError) {
+ APITestSetup setup{TEST_CASE_NAME};
+ auto log = setup.GetLog();
+ StreamingWriteClientMessage clientMessage;
+ auto* writeRequest = clientMessage.mutable_write_request();
+ const auto message = NUnitTest::RandomString(250 * 1024, std::rand());
+ const auto codecID = 3;
+ writeRequest->add_sequence_numbers(1);
+ writeRequest->add_message_sizes(message.size());
+ writeRequest->add_created_at_ms(TInstant::Now().MilliSeconds());
+ writeRequest->add_sent_at_ms(TInstant::Now().MilliSeconds());
+ writeRequest->add_blocks_offsets(0);
+ writeRequest->add_blocks_part_numbers(0);
+ writeRequest->add_blocks_message_counts(1);
+ writeRequest->add_blocks_uncompressed_sizes(message.size());
+ writeRequest->add_blocks_headers(TString(1, codecID));
+ writeRequest->add_blocks_data(message);
+ auto session = setup.InitWriteSession();
+
+
+ AssertSuccessfullStreamingOperation(session.first->Write(clientMessage), session.first, &clientMessage);
+
+ log << TLOG_INFO << "Wait for session to die";
+ AssertStreamingSessionDead(session.first, Ydb::StatusIds::BAD_REQUEST, Ydb::PersQueue::ErrorCode::BAD_REQUEST);
+ }
+
+ StreamingWriteClientMessage::InitRequest GenerateSessionSetupWithPreferredCluster(const TString preferredCluster) {
+ StreamingWriteClientMessage::InitRequest sessionSetup;
+ sessionSetup.set_preferred_cluster(preferredCluster);
+ sessionSetup.set_message_group_id("test-message-group-id-" + preferredCluster);
+ return sessionSetup;
+ };
+
+ Y_UNIT_TEST(PreferredCluster_TwoEnabledClustersAndWriteSessionsWithDifferentPreferredCluster_SessionWithMismatchedClusterDiesAndOthersAlive) {
+ APITestSetup setup{TEST_CASE_NAME};
+ auto log = setup.GetLog();
+
+ setup.GetPQConfig().SetClustersUpdateTimeoutSec(0);
+ setup.GetPQConfig().SetRemoteClusterEnabledDelaySec(0);
setup.GetPQConfig().SetCloseClientSessionWithEnabledRemotePreferredClusterDelaySec(0);
- auto sessionWithNoPreferredCluster = setup.InitWriteSession(GenerateSessionSetupWithPreferredCluster(TString()));
- auto sessionWithLocalPreffedCluster = setup.InitWriteSession(GenerateSessionSetupWithPreferredCluster(setup.GetLocalCluster()));
- auto sessionWithRemotePrefferedCluster = setup.InitWriteSession(GenerateSessionSetupWithPreferredCluster(setup.GetRemoteCluster()));
- grpc::ClientContext context;
- auto sessionWithNoInitialization = setup.GetPersQueueService()->StreamingWrite(&context);
-
- log << TLOG_INFO << "Wait for session with remote preferred cluster to die";
- AssertStreamingSessionDead(sessionWithRemotePrefferedCluster.first, Ydb::StatusIds::ABORTED, Ydb::PersQueue::ErrorCode::PREFERRED_CLUSTER_MISMATCHED);
- AssertStreamingSessionAlive(sessionWithNoPreferredCluster.first);
- AssertStreamingSessionAlive(sessionWithLocalPreffedCluster.first);
-
- setup.InitSession(sessionWithNoInitialization);
- AssertStreamingSessionAlive(sessionWithNoInitialization);
- }
-
- Y_UNIT_TEST(PreferredCluster_DisabledRemoteClusterAndWriteSessionsWithDifferentPreferredClusterAndLaterRemoteClusterEnabled_SessionWithMismatchedClusterDiesAfterPreferredClusterEnabledAndOtherSessionsAlive) {
- APITestSetup setup{TEST_CASE_NAME};
- auto log = setup.GetLog();
- log << TLOG_INFO << "Disable remote cluster " << setup.GetRemoteCluster().Quote();
- setup.GetFlatMsgBusPQClient().UpdateDC(setup.GetRemoteCluster(), false, false);
- setup.GetPQConfig().SetClustersUpdateTimeoutSec(0);
- setup.GetPQConfig().SetRemoteClusterEnabledDelaySec(0);
- setup.GetPQConfig().SetCloseClientSessionWithEnabledRemotePreferredClusterDelaySec(0);
- auto sessionWithNoPreferredCluster = setup.InitWriteSession(GenerateSessionSetupWithPreferredCluster(TString()));
- auto sessionWithLocalPreffedCluster = setup.InitWriteSession(GenerateSessionSetupWithPreferredCluster(setup.GetLocalCluster()));
- auto sessionWithRemotePrefferedCluster = setup.InitWriteSession(GenerateSessionSetupWithPreferredCluster(setup.GetRemoteCluster()));
- AssertStreamingSessionAlive(sessionWithNoPreferredCluster.first);
- AssertStreamingSessionAlive(sessionWithLocalPreffedCluster.first);
- AssertStreamingSessionAlive(sessionWithRemotePrefferedCluster.first);
-
- log << TLOG_INFO << "Enable remote cluster " << setup.GetRemoteCluster().Quote();
- setup.GetFlatMsgBusPQClient().UpdateDC(setup.GetRemoteCluster(), false, true);
-
- log << TLOG_INFO << "Wait for session with remote preferred cluster to die";
- AssertStreamingSessionDead(sessionWithRemotePrefferedCluster.first, Ydb::StatusIds::ABORTED, Ydb::PersQueue::ErrorCode::PREFERRED_CLUSTER_MISMATCHED);
- AssertStreamingSessionAlive(sessionWithNoPreferredCluster.first);
- AssertStreamingSessionAlive(sessionWithLocalPreffedCluster.first);
- }
-
- Y_UNIT_TEST(PreferredCluster_EnabledRemotePreferredClusterAndCloseClientSessionWithEnabledRemotePreferredClusterDelaySec_SessionDiesOnlyAfterDelay) {
- APITestSetup setup{TEST_CASE_NAME};
- auto log = setup.GetLog();
- setup.GetPQConfig().SetClustersUpdateTimeoutSec(0);
- setup.GetPQConfig().SetRemoteClusterEnabledDelaySec(0);
+ auto sessionWithNoPreferredCluster = setup.InitWriteSession(GenerateSessionSetupWithPreferredCluster(TString()));
+ auto sessionWithLocalPreffedCluster = setup.InitWriteSession(GenerateSessionSetupWithPreferredCluster(setup.GetLocalCluster()));
+ auto sessionWithRemotePrefferedCluster = setup.InitWriteSession(GenerateSessionSetupWithPreferredCluster(setup.GetRemoteCluster()));
+ grpc::ClientContext context;
+ auto sessionWithNoInitialization = setup.GetPersQueueService()->StreamingWrite(&context);
+
+ log << TLOG_INFO << "Wait for session with remote preferred cluster to die";
+ AssertStreamingSessionDead(sessionWithRemotePrefferedCluster.first, Ydb::StatusIds::ABORTED, Ydb::PersQueue::ErrorCode::PREFERRED_CLUSTER_MISMATCHED);
+ AssertStreamingSessionAlive(sessionWithNoPreferredCluster.first);
+ AssertStreamingSessionAlive(sessionWithLocalPreffedCluster.first);
+
+ setup.InitSession(sessionWithNoInitialization);
+ AssertStreamingSessionAlive(sessionWithNoInitialization);
+ }
+
+ Y_UNIT_TEST(PreferredCluster_DisabledRemoteClusterAndWriteSessionsWithDifferentPreferredClusterAndLaterRemoteClusterEnabled_SessionWithMismatchedClusterDiesAfterPreferredClusterEnabledAndOtherSessionsAlive) {
+ APITestSetup setup{TEST_CASE_NAME};
+ auto log = setup.GetLog();
+ log << TLOG_INFO << "Disable remote cluster " << setup.GetRemoteCluster().Quote();
+ setup.GetFlatMsgBusPQClient().UpdateDC(setup.GetRemoteCluster(), false, false);
+ setup.GetPQConfig().SetClustersUpdateTimeoutSec(0);
+ setup.GetPQConfig().SetRemoteClusterEnabledDelaySec(0);
+ setup.GetPQConfig().SetCloseClientSessionWithEnabledRemotePreferredClusterDelaySec(0);
+ auto sessionWithNoPreferredCluster = setup.InitWriteSession(GenerateSessionSetupWithPreferredCluster(TString()));
+ auto sessionWithLocalPreffedCluster = setup.InitWriteSession(GenerateSessionSetupWithPreferredCluster(setup.GetLocalCluster()));
+ auto sessionWithRemotePrefferedCluster = setup.InitWriteSession(GenerateSessionSetupWithPreferredCluster(setup.GetRemoteCluster()));
+ AssertStreamingSessionAlive(sessionWithNoPreferredCluster.first);
+ AssertStreamingSessionAlive(sessionWithLocalPreffedCluster.first);
+ AssertStreamingSessionAlive(sessionWithRemotePrefferedCluster.first);
+
+ log << TLOG_INFO << "Enable remote cluster " << setup.GetRemoteCluster().Quote();
+ setup.GetFlatMsgBusPQClient().UpdateDC(setup.GetRemoteCluster(), false, true);
+
+ log << TLOG_INFO << "Wait for session with remote preferred cluster to die";
+ AssertStreamingSessionDead(sessionWithRemotePrefferedCluster.first, Ydb::StatusIds::ABORTED, Ydb::PersQueue::ErrorCode::PREFERRED_CLUSTER_MISMATCHED);
+ AssertStreamingSessionAlive(sessionWithNoPreferredCluster.first);
+ AssertStreamingSessionAlive(sessionWithLocalPreffedCluster.first);
+ }
+
+ Y_UNIT_TEST(PreferredCluster_EnabledRemotePreferredClusterAndCloseClientSessionWithEnabledRemotePreferredClusterDelaySec_SessionDiesOnlyAfterDelay) {
+ APITestSetup setup{TEST_CASE_NAME};
+ auto log = setup.GetLog();
+ setup.GetPQConfig().SetClustersUpdateTimeoutSec(0);
+ setup.GetPQConfig().SetRemoteClusterEnabledDelaySec(0);
setup.GetPQConfig().SetCloseClientSessionWithEnabledRemotePreferredClusterDelaySec(0);
- const auto edgeActorID = setup.GetServer().GetRuntime()->AllocateEdgeActor();
-
+ const auto edgeActorID = setup.GetServer().GetRuntime()->AllocateEdgeActor();
+
setup.GetServer().GetRuntime()->Send(new IEventHandle(NPQ::NClusterTracker::MakeClusterTrackerID(), edgeActorID, new NPQ::NClusterTracker::TEvClusterTracker::TEvSubscribe));
log << TLOG_INFO << "Wait for cluster tracker event";
auto clustersUpdate = setup.GetServer().GetRuntime()->GrabEdgeEvent<NPQ::NClusterTracker::TEvClusterTracker::TEvClustersUpdate>();
-
-
- auto session = setup.InitWriteSession(GenerateSessionSetupWithPreferredCluster(setup.GetRemoteCluster()));
-
- AssertStreamingSessionAlive(session.first);
-
- AssertStreamingSessionDead(session.first, Ydb::StatusIds::ABORTED, Ydb::PersQueue::ErrorCode::PREFERRED_CLUSTER_MISMATCHED);
- }
-
- Y_UNIT_TEST(PreferredCluster_NonExistentPreferredCluster_SessionDiesOnlyAfterDelay) {
- APITestSetup setup{TEST_CASE_NAME};
- auto log = setup.GetLog();
- setup.GetPQConfig().SetClustersUpdateTimeoutSec(0);
- setup.GetPQConfig().SetRemoteClusterEnabledDelaySec(0);
+
+
+ auto session = setup.InitWriteSession(GenerateSessionSetupWithPreferredCluster(setup.GetRemoteCluster()));
+
+ AssertStreamingSessionAlive(session.first);
+
+ AssertStreamingSessionDead(session.first, Ydb::StatusIds::ABORTED, Ydb::PersQueue::ErrorCode::PREFERRED_CLUSTER_MISMATCHED);
+ }
+
+ Y_UNIT_TEST(PreferredCluster_NonExistentPreferredCluster_SessionDiesOnlyAfterDelay) {
+ APITestSetup setup{TEST_CASE_NAME};
+ auto log = setup.GetLog();
+ setup.GetPQConfig().SetClustersUpdateTimeoutSec(0);
+ setup.GetPQConfig().SetRemoteClusterEnabledDelaySec(0);
setup.GetPQConfig().SetCloseClientSessionWithEnabledRemotePreferredClusterDelaySec(2);
-
+
TInstant now(TInstant::Now());
- auto session = setup.InitWriteSession(GenerateSessionSetupWithPreferredCluster("non-existent-cluster"));
- AssertStreamingSessionAlive(session.first);
-
- AssertStreamingSessionDead(session.first, Ydb::StatusIds::ABORTED, Ydb::PersQueue::ErrorCode::PREFERRED_CLUSTER_MISMATCHED);
+ auto session = setup.InitWriteSession(GenerateSessionSetupWithPreferredCluster("non-existent-cluster"));
+ AssertStreamingSessionAlive(session.first);
+
+ AssertStreamingSessionDead(session.first, Ydb::StatusIds::ABORTED, Ydb::PersQueue::ErrorCode::PREFERRED_CLUSTER_MISMATCHED);
UNIT_ASSERT(TInstant::Now() - now > TDuration::MilliSeconds(1999));
- }
-
- Y_UNIT_TEST(PreferredCluster_EnabledRemotePreferredClusterAndRemoteClusterEnabledDelaySec_SessionDiesOnlyAfterDelay) {
- APITestSetup setup{TEST_CASE_NAME};
- auto log = setup.GetLog();
- setup.GetPQConfig().SetClustersUpdateTimeoutSec(0);
+ }
+
+ Y_UNIT_TEST(PreferredCluster_EnabledRemotePreferredClusterAndRemoteClusterEnabledDelaySec_SessionDiesOnlyAfterDelay) {
+ APITestSetup setup{TEST_CASE_NAME};
+ auto log = setup.GetLog();
+ setup.GetPQConfig().SetClustersUpdateTimeoutSec(0);
setup.GetPQConfig().SetCloseClientSessionWithEnabledRemotePreferredClusterDelaySec(2);
- const auto edgeActorID = setup.GetServer().GetRuntime()->AllocateEdgeActor();
-
+ const auto edgeActorID = setup.GetServer().GetRuntime()->AllocateEdgeActor();
+
setup.GetPQConfig().SetRemoteClusterEnabledDelaySec(0);
-
+
TInstant now(TInstant::Now());
- auto session = setup.InitWriteSession(GenerateSessionSetupWithPreferredCluster(setup.GetRemoteCluster()));
-
- setup.GetServer().GetRuntime()->Send(new IEventHandle(NPQ::NClusterTracker::MakeClusterTrackerID(), edgeActorID, new NPQ::NClusterTracker::TEvClusterTracker::TEvSubscribe));
- log << TLOG_INFO << "Wait for cluster tracker event";
- auto clustersUpdate = setup.GetServer().GetRuntime()->GrabEdgeEvent<NPQ::NClusterTracker::TEvClusterTracker::TEvClustersUpdate>();
- AssertStreamingSessionAlive(session.first);
-
+ auto session = setup.InitWriteSession(GenerateSessionSetupWithPreferredCluster(setup.GetRemoteCluster()));
+
+ setup.GetServer().GetRuntime()->Send(new IEventHandle(NPQ::NClusterTracker::MakeClusterTrackerID(), edgeActorID, new NPQ::NClusterTracker::TEvClusterTracker::TEvSubscribe));
+ log << TLOG_INFO << "Wait for cluster tracker event";
+ auto clustersUpdate = setup.GetServer().GetRuntime()->GrabEdgeEvent<NPQ::NClusterTracker::TEvClusterTracker::TEvClustersUpdate>();
+ AssertStreamingSessionAlive(session.first);
+
AssertStreamingSessionDead(session.first, Ydb::StatusIds::ABORTED, Ydb::PersQueue::ErrorCode::PREFERRED_CLUSTER_MISMATCHED);
-
+
UNIT_ASSERT(TInstant::Now() - now > TDuration::MilliSeconds(1999));
- }
-
- Y_UNIT_TEST(PreferredCluster_RemotePreferredClusterEnabledWhileSessionInitializing_SessionDiesOnlyAfterInitializationAndDelay) {
- APITestSetup setup{TEST_CASE_NAME};
- auto log = setup.GetLog();
- setup.GetPQConfig().SetClustersUpdateTimeoutSec(0);
- setup.GetPQConfig().SetRemoteClusterEnabledDelaySec(0);
+ }
+
+ Y_UNIT_TEST(PreferredCluster_RemotePreferredClusterEnabledWhileSessionInitializing_SessionDiesOnlyAfterInitializationAndDelay) {
+ APITestSetup setup{TEST_CASE_NAME};
+ auto log = setup.GetLog();
+ setup.GetPQConfig().SetClustersUpdateTimeoutSec(0);
+ setup.GetPQConfig().SetRemoteClusterEnabledDelaySec(0);
setup.GetPQConfig().SetCloseClientSessionWithEnabledRemotePreferredClusterDelaySec(2);
- const auto edgeActorID = setup.GetServer().GetRuntime()->AllocateEdgeActor();
- setup.GetFlatMsgBusPQClient().UpdateDC(setup.GetRemoteCluster(), false, false);
-
- grpc::ClientContext context;
- auto session = setup.GetPersQueueService()->StreamingWrite(&context);
-
- setup.GetFlatMsgBusPQClient().UpdateDC(setup.GetRemoteCluster(), false, true);
-
- setup.GetServer().GetRuntime()->Send(new IEventHandle(NPQ::NClusterTracker::MakeClusterTrackerID(), edgeActorID, new NPQ::NClusterTracker::TEvClusterTracker::TEvSubscribe));
- log << TLOG_INFO << "Wait for cluster tracker event";
- auto clustersUpdate = setup.GetServer().GetRuntime()->GrabEdgeEvent<NPQ::NClusterTracker::TEvClusterTracker::TEvClustersUpdate>();
+ const auto edgeActorID = setup.GetServer().GetRuntime()->AllocateEdgeActor();
+ setup.GetFlatMsgBusPQClient().UpdateDC(setup.GetRemoteCluster(), false, false);
+
+ grpc::ClientContext context;
+ auto session = setup.GetPersQueueService()->StreamingWrite(&context);
+
+ setup.GetFlatMsgBusPQClient().UpdateDC(setup.GetRemoteCluster(), false, true);
+
+ setup.GetServer().GetRuntime()->Send(new IEventHandle(NPQ::NClusterTracker::MakeClusterTrackerID(), edgeActorID, new NPQ::NClusterTracker::TEvClusterTracker::TEvSubscribe));
+ log << TLOG_INFO << "Wait for cluster tracker event";
+ auto clustersUpdate = setup.GetServer().GetRuntime()->GrabEdgeEvent<NPQ::NClusterTracker::TEvClusterTracker::TEvClustersUpdate>();
TInstant now(TInstant::Now());
- setup.InitSession(session, GenerateSessionSetupWithPreferredCluster(setup.GetRemoteCluster()));
+ setup.InitSession(session, GenerateSessionSetupWithPreferredCluster(setup.GetRemoteCluster()));
- AssertStreamingSessionAlive(session);
-
- log << TLOG_INFO << "Set small delay and wait for initialized session with remote preferred cluster to die";
- AssertStreamingSessionDead(session, Ydb::StatusIds::ABORTED, Ydb::PersQueue::ErrorCode::PREFERRED_CLUSTER_MISMATCHED);
+ AssertStreamingSessionAlive(session);
+
+ log << TLOG_INFO << "Set small delay and wait for initialized session with remote preferred cluster to die";
+ AssertStreamingSessionDead(session, Ydb::StatusIds::ABORTED, Ydb::PersQueue::ErrorCode::PREFERRED_CLUSTER_MISMATCHED);
UNIT_ASSERT(TInstant::Now() - now > TDuration::MilliSeconds(1999));
- }
+ }
Y_UNIT_TEST(SchemeOperationsTest) {
NPersQueue::TTestServer server;
diff --git a/ydb/services/persqueue_v1/ut/api_test_setup.h b/ydb/services/persqueue_v1/ut/api_test_setup.h
index 4b8963bc3e..8d5d82b33d 100644
--- a/ydb/services/persqueue_v1/ut/api_test_setup.h
+++ b/ydb/services/persqueue_v1/ut/api_test_setup.h
@@ -1,120 +1,120 @@
-#pragma once
-#include "pq_data_writer.h"
-#include "test_utils.h"
-
+#pragma once
+#include "pq_data_writer.h"
+#include "test_utils.h"
+
#include <ydb/public/api/grpc/draft/ydb_persqueue_v1.grpc.pb.h>
#include <ydb/core/testlib/test_pq_client.h>
-
+
#include <google/protobuf/message.h>
#include <library/cpp/logger/log.h>
#include <library/cpp/testing/unittest/registar.h>
#include <library/cpp/testing/unittest/tests_data.h>
-#include <util/string/builder.h>
-
-class APITestSetup {
-private:
- using TStreamingWriteClientMessage = Ydb::PersQueue::V1::StreamingWriteClientMessage;
- using TStreamingWriteServerMessage = Ydb::PersQueue::V1::StreamingWriteServerMessage;
- TLog Log;
+#include <util/string/builder.h>
+
+class APITestSetup {
+private:
+ using TStreamingWriteClientMessage = Ydb::PersQueue::V1::StreamingWriteClientMessage;
+ using TStreamingWriteServerMessage = Ydb::PersQueue::V1::StreamingWriteServerMessage;
+ TLog Log;
NPersQueue::TTestServer server;
- std::shared_ptr<::grpc::Channel> channel;
- std::unique_ptr<Ydb::PersQueue::V1::PersQueueService::Stub> service;
-public:
- APITestSetup(const TString& testCaseName)
- : Log("cerr")
+ std::shared_ptr<::grpc::Channel> channel;
+ std::unique_ptr<Ydb::PersQueue::V1::PersQueueService::Stub> service;
+public:
+ APITestSetup(const TString& testCaseName)
+ : Log("cerr")
, server(NPersQueue::TTestServer())
, channel(grpc::CreateChannel("localhost:" + ToString(server.GrpcPort), grpc::InsecureChannelCredentials()))
- , service(Ydb::PersQueue::V1::PersQueueService::NewStub(channel))
- {
- Log.SetFormatter([testCaseName](ELogPriority priority, TStringBuf message) {
- return TStringBuilder() << TInstant::Now() << " :" << testCaseName << " " << priority << ": " << message << Endl;
- });
+ , service(Ydb::PersQueue::V1::PersQueueService::NewStub(channel))
+ {
+ Log.SetFormatter([testCaseName](ELogPriority priority, TStringBuf message) {
+ return TStringBuilder() << TInstant::Now() << " :" << testCaseName << " " << priority << ": " << message << Endl;
+ });
server.CleverServer->GetRuntime()->SetLogPriority(NKikimrServices::PQ_WRITE_PROXY, NActors::NLog::PRI_DEBUG);
server.CleverServer->GetRuntime()->SetLogPriority(NKikimrServices::PQ_READ_PROXY, NActors::NLog::PRI_DEBUG);
server.AnnoyingClient->CreateTopic("rt3.dc1--" + TString(GetTestTopic()), 1);
NKikimr::NPersQueueTests::TPQDataWriter writer(GetTestMessageGroupId(), server);
-
- auto seed = TInstant::Now().MicroSeconds();
- // This makes failing randomized tests (for example with NUnitTest::RandomString(size, std::rand()) calls) reproducable
- Log << TLOG_INFO << "Random seed for debugging is " << seed;
- std::srand(seed);
- }
-
- TLog& GetLog() {
- return Log;
- }
-
- TString GetTestTopic() {
- return "topic1";
- }
-
- TString GetTestMessageGroupId() {
- return "test-message-group-id";
- }
-
- TString GetLocalCluster() {
- return "dc1";
- }
-
- TString GetRemoteCluster() {
- return "dc2";
- }
-
- TDuration GetLongDelay() {
- return TDuration::Max() - TDuration::Seconds(1) /* So delay does not get mixed up with infinity */;
- }
-
- NKikimr::Tests::TServer& GetServer() {
+
+ auto seed = TInstant::Now().MicroSeconds();
+ // This makes failing randomized tests (for example with NUnitTest::RandomString(size, std::rand()) calls) reproducable
+ Log << TLOG_INFO << "Random seed for debugging is " << seed;
+ std::srand(seed);
+ }
+
+ TLog& GetLog() {
+ return Log;
+ }
+
+ TString GetTestTopic() {
+ return "topic1";
+ }
+
+ TString GetTestMessageGroupId() {
+ return "test-message-group-id";
+ }
+
+ TString GetLocalCluster() {
+ return "dc1";
+ }
+
+ TString GetRemoteCluster() {
+ return "dc2";
+ }
+
+ TDuration GetLongDelay() {
+ return TDuration::Max() - TDuration::Seconds(1) /* So delay does not get mixed up with infinity */;
+ }
+
+ NKikimr::Tests::TServer& GetServer() {
return *server.CleverServer;
- }
-
- TString GetPQRoot() {
- return "/Root/PQ";
- }
-
- NKikimrPQ::TPQConfig& GetPQConfig() {
+ }
+
+ TString GetPQRoot() {
+ return "/Root/PQ";
+ }
+
+ NKikimrPQ::TPQConfig& GetPQConfig() {
return server.CleverServer->GetRuntime()->GetAppData().PQConfig;
- }
-
- NKikimr::NPersQueueTests::TFlatMsgBusPQClient& GetFlatMsgBusPQClient() {
+ }
+
+ NKikimr::NPersQueueTests::TFlatMsgBusPQClient& GetFlatMsgBusPQClient() {
return *server.AnnoyingClient;
- }
-
- std::unique_ptr<Ydb::PersQueue::V1::PersQueueService::Stub>& GetPersQueueService() {
- return service;
- }
-
- std::pair<std::unique_ptr<grpc::ClientReaderWriter<TStreamingWriteClientMessage, TStreamingWriteServerMessage>>, std::unique_ptr<grpc::ClientContext>> InitWriteSession(
- const typename TStreamingWriteClientMessage::InitRequest& setup = TStreamingWriteClientMessage::InitRequest())
- {
- auto context = std::make_unique<grpc::ClientContext>();
- auto stream = service->StreamingWrite(context.get());
- InitSession(stream, setup);
- return std::make_pair(std::move(stream), std::move(context));
- }
-
- // Initializes session with default (possible overwriten by setup parameter) values and returns initialization response
- template<typename TClientMessage, typename TServerMessage>
- TServerMessage InitSession(std::unique_ptr<grpc::ClientReaderWriter<TClientMessage, TServerMessage>>& stream,
- const typename TClientMessage::InitRequest& setup = typename TClientMessage::InitRequest())
- {
- TClientMessage clientMessage;
- // TODO: Replace with MergeFrom?
- clientMessage.mutable_init_request()->CopyFrom(setup);
- if (setup.topic().empty()) {
- clientMessage.mutable_init_request()->set_topic(GetTestTopic());
- }
- if (setup.message_group_id().empty()) {
- clientMessage.mutable_init_request()->set_message_group_id(GetTestMessageGroupId());
- }
- AssertSuccessfullStreamingOperation(stream->Write(clientMessage), stream, &clientMessage);
-
- TServerMessage serverMessage;
- Log << TLOG_INFO << "Wait for \"init_response\"";
- AssertSuccessfullStreamingOperation(stream->Read(&serverMessage), stream);
+ }
+
+ std::unique_ptr<Ydb::PersQueue::V1::PersQueueService::Stub>& GetPersQueueService() {
+ return service;
+ }
+
+ std::pair<std::unique_ptr<grpc::ClientReaderWriter<TStreamingWriteClientMessage, TStreamingWriteServerMessage>>, std::unique_ptr<grpc::ClientContext>> InitWriteSession(
+ const typename TStreamingWriteClientMessage::InitRequest& setup = TStreamingWriteClientMessage::InitRequest())
+ {
+ auto context = std::make_unique<grpc::ClientContext>();
+ auto stream = service->StreamingWrite(context.get());
+ InitSession(stream, setup);
+ return std::make_pair(std::move(stream), std::move(context));
+ }
+
+ // Initializes session with default (possible overwriten by setup parameter) values and returns initialization response
+ template<typename TClientMessage, typename TServerMessage>
+ TServerMessage InitSession(std::unique_ptr<grpc::ClientReaderWriter<TClientMessage, TServerMessage>>& stream,
+ const typename TClientMessage::InitRequest& setup = typename TClientMessage::InitRequest())
+ {
+ TClientMessage clientMessage;
+ // TODO: Replace with MergeFrom?
+ clientMessage.mutable_init_request()->CopyFrom(setup);
+ if (setup.topic().empty()) {
+ clientMessage.mutable_init_request()->set_topic(GetTestTopic());
+ }
+ if (setup.message_group_id().empty()) {
+ clientMessage.mutable_init_request()->set_message_group_id(GetTestMessageGroupId());
+ }
+ AssertSuccessfullStreamingOperation(stream->Write(clientMessage), stream, &clientMessage);
+
+ TServerMessage serverMessage;
+ Log << TLOG_INFO << "Wait for \"init_response\"";
+ AssertSuccessfullStreamingOperation(stream->Read(&serverMessage), stream);
Cerr << "Init response: " << serverMessage.ShortDebugString() << Endl;
- UNIT_ASSERT_C(serverMessage.server_message_case() == TServerMessage::kInitResponse, serverMessage);
- Log << TLOG_INFO << "Session ID is " << serverMessage.init_response().session_id().Quote();
- return serverMessage;
- }
-};
+ UNIT_ASSERT_C(serverMessage.server_message_case() == TServerMessage::kInitResponse, serverMessage);
+ Log << TLOG_INFO << "Session ID is " << serverMessage.init_response().session_id().Quote();
+ return serverMessage;
+ }
+};
diff --git a/ydb/services/persqueue_v1/ut/pq_data_writer.h b/ydb/services/persqueue_v1/ut/pq_data_writer.h
index 71f271fb14..caef894060 100644
--- a/ydb/services/persqueue_v1/ut/pq_data_writer.h
+++ b/ydb/services/persqueue_v1/ut/pq_data_writer.h
@@ -1,5 +1,5 @@
#pragma once
-#include "test_utils.h"
+#include "test_utils.h"
#include <ydb/core/testlib/test_pq_client.h>
#include <ydb/public/api/grpc/draft/ydb_persqueue_v1.grpc.pb.h>
#include <ydb/public/api/protos/persqueue_error_codes_v1.pb.h>
@@ -178,7 +178,7 @@ public:
continue;
}
- AssertSuccessfullStreamingOperation(stream->Read(&resp), stream);
+ AssertSuccessfullStreamingOperation(stream->Read(&resp), stream);
if (resp.status() == Ydb::StatusIds::UNAVAILABLE) {
continue;
}
@@ -223,16 +223,16 @@ public:
return 0;
}
- ui32 Write(const TString& topic, const TVector<TString>& data, bool error = false, const TMaybe<TString>& ticket = {}) {
+ ui32 Write(const TString& topic, const TVector<TString>& data, bool error = false, const TMaybe<TString>& ticket = {}) {
return WriteImpl(topic, {data}, error, ticket);
}
private:
- ui32 WriteImpl(const TString& topic, const TVector<TString>& data, bool error, const TMaybe<TString>& ticket) {
+ ui32 WriteImpl(const TString& topic, const TVector<TString>& data, bool error, const TMaybe<TString>& ticket) {
grpc::ClientContext context;
- if (ticket)
- context.AddMetadata("x-ydb-auth-ticket", *ticket);
+ if (ticket)
+ context.AddMetadata("x-ydb-auth-ticket", *ticket);
auto stream = StubP_->StreamingWrite(&context);
@@ -300,20 +300,20 @@ private:
}
template <typename S>
- void Flush(const TVector<TString>& data, S& stream, const TMaybe<TString>& ticket) {
+ void Flush(const TVector<TString>& data, S& stream, const TMaybe<TString>& ticket) {
Ydb::PersQueue::V1::StreamingWriteClientMessage request;
Ydb::PersQueue::V1::StreamingWriteServerMessage response;
- if (ticket) {
- request.mutable_update_token_request()->set_token(*ticket);
- Cerr << "update user token request: " << request << Endl;
- if (!stream->Write(request)) {
- ythrow yexception() << "write fail";
- }
- UNIT_ASSERT(stream->Read(&response));
- UNIT_ASSERT_C(response.server_message_case() == Ydb::PersQueue::V1::StreamingWriteServerMessage::kUpdateTokenResponse, response);
- }
-
+ if (ticket) {
+ request.mutable_update_token_request()->set_token(*ticket);
+ Cerr << "update user token request: " << request << Endl;
+ if (!stream->Write(request)) {
+ ythrow yexception() << "write fail";
+ }
+ UNIT_ASSERT(stream->Read(&response));
+ UNIT_ASSERT_C(response.server_message_case() == Ydb::PersQueue::V1::StreamingWriteServerMessage::kUpdateTokenResponse, response);
+ }
+
TVector<ui64> allSeqNo;
auto* mutableData = request.mutable_write_request();
ui32 offset = 0;
@@ -329,8 +329,8 @@ private:
mutableData->add_blocks_part_numbers(0);
mutableData->add_blocks_message_counts(1);
mutableData->add_blocks_uncompressed_sizes(d.size());
- mutableData->add_blocks_headers(TString(1, '\0') /* RAW codec ID */);
- mutableData->add_blocks_data(d);
+ mutableData->add_blocks_headers(TString(1, '\0') /* RAW codec ID */);
+ mutableData->add_blocks_data(d);
}
Cerr << "request: " << request << Endl;
diff --git a/ydb/services/persqueue_v1/ut/rate_limiter_test_setup.cpp b/ydb/services/persqueue_v1/ut/rate_limiter_test_setup.cpp
index f3f78567ef..779d60e778 100644
--- a/ydb/services/persqueue_v1/ut/rate_limiter_test_setup.cpp
+++ b/ydb/services/persqueue_v1/ut/rate_limiter_test_setup.cpp
@@ -1,14 +1,14 @@
-#include "rate_limiter_test_setup.h"
+#include "rate_limiter_test_setup.h"
#include <ydb/library/persqueue/topic_parser/topic_parser.h>
#include <library/cpp/logger/priority.h>
-
-using namespace NPersQueue;
-using namespace Ydb::PersQueue;
-using namespace NKikimr::Tests;
-
-
-namespace NKikimr::NPersQueueTests {
-
+
+using namespace NPersQueue;
+using namespace Ydb::PersQueue;
+using namespace NKikimr::Tests;
+
+
+namespace NKikimr::NPersQueueTests {
+
TRateLimiterTestSetup::TRateLimiterTestSetup(
NKikimrPQ::TPQConfig::TQuotingConfig::ELimitedEntity limitedEntity,
double writeAccountQuota,
@@ -19,28 +19,28 @@ TRateLimiterTestSetup::TRateLimiterTestSetup(
, LimitedEntity(limitedEntity)
, WriteAccountQuota(writeAccountQuota)
, ReadAccountQuota(readAccountQuota)
-{
+{
Start(enableReadQuoting);
-}
-
+}
+
void TRateLimiterTestSetup::CreateTopic(const TString& path) {
- const TString name = BuildFullTopicName(path, "dc1");
- const TString account = GetAccount(name);
-
- Cerr << "Creating topic \"" << name << "\"" << Endl;
+ const TString name = BuildFullTopicName(path, "dc1");
+ const TString account = GetAccount(name);
+
+ Cerr << "Creating topic \"" << name << "\"" << Endl;
Server->AnnoyingClient->CreateTopic(name, 1);
-
+
CreateKesus(account);
CreateQuotaResources(path, "write-quota", false);
CreateQuotaResources(path, "read-quota", true);
}
-
+
void TRateLimiterTestSetup::CreateConsumer(const TString& path) {
const TString account = GetAccount(path);
-
+
Cerr << "Creating consumer \"" << path << "\"" << Endl;
Server->AnnoyingClient->CreateConsumer(path);
-
+
CreateKesus(account);
CreateQuotaResources(path, "write-quota", true);
CreateQuotaResources(path, "read-quota", false);
@@ -68,10 +68,10 @@ void TRateLimiterTestSetup::CreateKesus(const TString& account) {
}
void TRateLimiterTestSetup::CreateQuotaResources(const TString& path, const TString& quotaPrefix, bool excludeLastComponent) {
- TVector<TString> pathComponents = SplitPath(path);
+ TVector<TString> pathComponents = SplitPath(path);
if (pathComponents.size() <= 1) {
return;
- }
+ }
TStringBuilder prefixPath;
prefixPath << quotaPrefix;
@@ -91,59 +91,59 @@ void TRateLimiterTestSetup::CreateQuotaResources(const TString& path, const TStr
"Status: " << Ydb::StatusIds::StatusCode_Name(statusCode)
);
}
-}
-
+}
+
/*
THolder<Ydb::PersQueue::IProducer> TRateLimiterTestSetup::StartProducer(const TString& topicPath, bool compress) {
Ydb::PersQueue::TProducerSettings producerSettings;
producerSettings.Server = Ydb::PersQueue::TServerSetting("localhost", Server->GrpcPort);
- producerSettings.Topic = topicPath;
+ producerSettings.Topic = topicPath;
producerSettings.SourceId = "TRateLimiterTestSetupSourceId";
producerSettings.Codec = compress ? "gzip" : "raw";
THolder<Ydb::PersQueue::IProducer> producer = PQLib->CreateProducer(producerSettings);
- auto startResult = producer->Start();
- UNIT_ASSERT_EQUAL_C(Ydb::StatusIds::SUCCESS, startResult.GetValueSync().Response.status(), "Response: " << startResult.GetValueSync().Response);
- return producer;
-}
+ auto startResult = producer->Start();
+ UNIT_ASSERT_EQUAL_C(Ydb::StatusIds::SUCCESS, startResult.GetValueSync().Response.status(), "Response: " << startResult.GetValueSync().Response);
+ return producer;
+}
*/
-
+
void TRateLimiterTestSetup::Start(bool enableReadQuoting) {
InitServer(enableReadQuoting);
- InitQuoting();
+ InitQuoting();
WaitWritePQServiceInitialization();
-}
-
+}
+
void TRateLimiterTestSetup::InitServer(bool enableReadQuoting) {
auto& settings = Server->ServerSettings;
-
+
settings.PQConfig.MutableQuotingConfig()->SetEnableQuoting(true);
settings.PQConfig.MutableQuotingConfig()->SetEnableReadQuoting(enableReadQuoting);
settings.PQConfig.MutableQuotingConfig()->SetTopicWriteQuotaEntityToLimit(LimitedEntity);
-
+
Server->GrpcServerOptions.SetMaxMessageSize(130 * 1024 * 1024);
Server->StartServer();
Server->EnableLogs(
- {
- NKikimrServices::PQ_READ_PROXY,
- NKikimrServices::PQ_WRITE_PROXY,
- NKikimrServices::PERSQUEUE,
- NKikimrServices::QUOTER_SERVICE,
- NKikimrServices::QUOTER_PROXY,
- NKikimrServices::KESUS_TABLET,
+ {
+ NKikimrServices::PQ_READ_PROXY,
+ NKikimrServices::PQ_WRITE_PROXY,
+ NKikimrServices::PERSQUEUE,
+ NKikimrServices::QUOTER_SERVICE,
+ NKikimrServices::QUOTER_PROXY,
+ NKikimrServices::KESUS_TABLET,
NKikimrServices::PQ_READ_SPEED_LIMITER
},
NActors::NLog::PRI_TRACE
- );
-}
-
-void TRateLimiterTestSetup::InitQuoting() {
+ );
+}
+
+void TRateLimiterTestSetup::InitQuoting() {
Server->AnnoyingClient->MkDir("/Root", "PersQueue");
Server->AnnoyingClient->MkDir("/Root/PersQueue", "System");
Server->AnnoyingClient->MkDir("/Root/PersQueue/System", "Quoters");
-}
-
+}
+
void TRateLimiterTestSetup::WaitWritePQServiceInitialization() {
PQDataWriter = MakeHolder<TPQDataWriter>("writer_source_id", *Server);
-}
+}
}
diff --git a/ydb/services/persqueue_v1/ut/rate_limiter_test_setup.h b/ydb/services/persqueue_v1/ut/rate_limiter_test_setup.h
index 1196574139..01529818e1 100644
--- a/ydb/services/persqueue_v1/ut/rate_limiter_test_setup.h
+++ b/ydb/services/persqueue_v1/ut/rate_limiter_test_setup.h
@@ -1,59 +1,59 @@
-#pragma once
+#pragma once
#include "pq_data_writer.h"
#include <ydb/core/testlib/test_client.h>
#include <ydb/core/testlib/test_pq_client.h>
-
-namespace NKikimr::NPersQueueTests {
-
-class TRateLimiterTestSetup {
-public:
+
+namespace NKikimr::NPersQueueTests {
+
+class TRateLimiterTestSetup {
+public:
explicit TRateLimiterTestSetup(
NKikimrPQ::TPQConfig::TQuotingConfig::ELimitedEntity limitedEntity,
double writeAccountQuota = 1000.0,
double readAccountQuota = 1000.0,
bool enableReadQuoting = false
);
-
+
void CreateTopic(const TString& path);
void CreateConsumer(const TString& path);
-
- // namespace NPersQueue = Ydb::PersQueue;
+
+ // namespace NPersQueue = Ydb::PersQueue;
// THolder<Ydb::PersQueue::IProducer> StartProducer(const TString& topicPath, bool compress = false);
-
- // Getters
- ui16 GetGrpcPort() const {
+
+ // Getters
+ ui16 GetGrpcPort() const {
return Server->GrpcPort;
- }
-
- ui16 GetMsgBusPort() const {
+ }
+
+ ui16 GetMsgBusPort() const {
return Server->Port;
- }
-
- NKikimr::Tests::TServer& GetServer() {
+ }
+
+ NKikimr::Tests::TServer& GetServer() {
return *Server->CleverServer;
- }
-
- NPersQueueTests::TFlatMsgBusPQClient& GetClient() {
+ }
+
+ NPersQueueTests::TFlatMsgBusPQClient& GetClient() {
return *Server->AnnoyingClient;
- }
-
-private:
+ }
+
+private:
void CreateKesus(const TString& account);
void CreateQuotaResources(const TString& path, const TString& quotaPrefix, bool excludeLastComponent);
-
+
void Start(bool enableReadQuoting);
void InitServer(bool enableReadQuoting);
- void InitQuoting();
+ void InitQuoting();
void WaitWritePQServiceInitialization();
-
-private:
+
+private:
THolder<NPersQueue::TTestServer> Server;
THolder<TPQDataWriter> PQDataWriter; // For waiting for grpc writer service initialization.
const NKikimrPQ::TPQConfig::TQuotingConfig::ELimitedEntity LimitedEntity;
double WriteAccountQuota;
double ReadAccountQuota;
const TString QuotersRootPath = "/Root/PersQueue/System/Quoters";
-};
-}
+};
+}
diff --git a/ydb/services/persqueue_v1/ut/test_utils.h b/ydb/services/persqueue_v1/ut/test_utils.h
index 50ee04ff94..c02308b0d7 100644
--- a/ydb/services/persqueue_v1/ut/test_utils.h
+++ b/ydb/services/persqueue_v1/ut/test_utils.h
@@ -1,16 +1,16 @@
-#pragma once
+#pragma once
#include <ydb/public/api/protos/ydb_status_codes.pb.h>
#include <ydb/public/api/protos/persqueue_error_codes_v1.pb.h>
#include <ydb/core/testlib/test_pq_client.h>
-
+
#include <library/cpp/testing/unittest/registar.h>
#include <google/protobuf/message.h>
-#include <contrib/libs/grpc/include/grpcpp/support/sync_stream.h>
-#include <util/string/builder.h>
+#include <contrib/libs/grpc/include/grpcpp/support/sync_stream.h>
+#include <util/string/builder.h>
#include <util/system/type_name.h>
-
+
#define TEST_CASE_NAME (TypeName(*this).rfind("TTestCase") != TString::npos ? TypeName(*this).substr(TypeName(*this).rfind("TTestCase") + 9) : TypeName(*this))
-
+
static constexpr int DEBUG_LOG_LEVEL = 7;
using namespace NKikimr::NPersQueueTests;
@@ -20,62 +20,62 @@ inline void WaitACLModification() {
Sleep(TDuration::Seconds(5));
}
-// TODO: Remove and replace all usage with ApiTestSetup
-#define SETUP_API_TEST_PREREQUISITES()\
- const TString topic = "topic1";\
- const TString cluster = "dc1";\
- const TString pqRoot = "/Root/PQ";\
- const TString messageGroupId = "test-message-group-id";\
- TPortManager pm;\
- const ui16 port = pm.GetPort(2134);\
- const ui16 grpc = pm.GetPort(2135);\
- TServerSettings settings = PQSettings(port);\
- TServer server = TServer(settings);\
+// TODO: Remove and replace all usage with ApiTestSetup
+#define SETUP_API_TEST_PREREQUISITES()\
+ const TString topic = "topic1";\
+ const TString cluster = "dc1";\
+ const TString pqRoot = "/Root/PQ";\
+ const TString messageGroupId = "test-message-group-id";\
+ TPortManager pm;\
+ const ui16 port = pm.GetPort(2134);\
+ const ui16 grpc = pm.GetPort(2135);\
+ TServerSettings settings = PQSettings(port);\
+ TServer server = TServer(settings);\
server.EnableGRpc(NGrpc::TServerOptions().SetHost("localhost").SetPort(grpc));\
- TFlatMsgBusPQClient client(settings, grpc);\
+ TFlatMsgBusPQClient client(settings, grpc);\
client.FullInit();\
- client.CreateTopic("rt3.dc1--" + TString(topic), 1);\
- EnableLogs(server, { NKikimrServices::PQ_WRITE_PROXY });\
- TPQDataWriter writer(messageGroupId, grpc, client, server.GetRuntime());\
- writer.WaitWritePQServiceInitialization();\
- auto channel = grpc::CreateChannel("localhost:" + ToString(grpc), grpc::InsecureChannelCredentials());\
- auto service = Ydb::PersQueue::V1::PersQueueService::NewStub(channel);\
-
-template<typename TStream, typename TMessage = google::protobuf::Message>
-void AssertSuccessfullStreamingOperation(bool ok, std::unique_ptr<TStream>& stream, TMessage* message = nullptr) {
- if (!ok) {
- auto status = stream->Finish();
- TStringBuilder errorMessage;
- errorMessage << "gRPC stream operation failed with error code " << static_cast<int>(status.error_code()) << " and error message '" << status.error_message() << "'.";
- if (message != nullptr) {
- errorMessage << " Last user message is " << message->DebugString();
- }
- UNIT_FAIL(errorMessage);
- }
-}
-
-template<typename TClientMessage, typename TServerMessage>
-void AssertStreamingSessionAlive(std::unique_ptr<grpc::ClientReaderWriter<TClientMessage, TServerMessage>>& stream) {
- TClientMessage clientMessage;
- // TODO: Add 'ping_request' and 'ping_response' to write and read protocol for debugging?
- clientMessage.mutable_update_token_request();
- AssertSuccessfullStreamingOperation(stream->Write(clientMessage), stream, &clientMessage);
- TServerMessage serverMessage;
- AssertSuccessfullStreamingOperation(stream->Read(&serverMessage), stream);
- UNIT_ASSERT_EQUAL_C(TServerMessage::kUpdateTokenResponse, serverMessage.server_message_case(), serverMessage);
-}
-
-template<typename TClientMessage, typename TServerMessage>
-void AssertStreamingSessionDead(std::unique_ptr<grpc::ClientReaderWriter<TClientMessage, TServerMessage>>& stream,
- const Ydb::StatusIds::StatusCode expectedStatus, const Ydb::PersQueue::ErrorCode::ErrorCode expectedErrorCode)
-{
- TServerMessage serverMessage;
- AssertSuccessfullStreamingOperation(stream->Read(&serverMessage), stream);
- UNIT_ASSERT_VALUES_EQUAL(expectedStatus, serverMessage.status());
- UNIT_ASSERT_LE(1, serverMessage.issues_size());
- // TODO: Why namespace duplicates enum name "ErrorCode::ErrorCode"?
- // TODO: Why "Ydb::PersQueue::ErrorCode::ErrorCode" doesn't work with streaming output like "Ydb::StatusIds::StatusCode" does?
- auto actualErrorCode = static_cast<Ydb::PersQueue::ErrorCode::ErrorCode>(serverMessage.issues(0).issue_code());
- UNIT_ASSERT_C(expectedErrorCode == actualErrorCode, serverMessage);
-}
+ client.CreateTopic("rt3.dc1--" + TString(topic), 1);\
+ EnableLogs(server, { NKikimrServices::PQ_WRITE_PROXY });\
+ TPQDataWriter writer(messageGroupId, grpc, client, server.GetRuntime());\
+ writer.WaitWritePQServiceInitialization();\
+ auto channel = grpc::CreateChannel("localhost:" + ToString(grpc), grpc::InsecureChannelCredentials());\
+ auto service = Ydb::PersQueue::V1::PersQueueService::NewStub(channel);\
+
+template<typename TStream, typename TMessage = google::protobuf::Message>
+void AssertSuccessfullStreamingOperation(bool ok, std::unique_ptr<TStream>& stream, TMessage* message = nullptr) {
+ if (!ok) {
+ auto status = stream->Finish();
+ TStringBuilder errorMessage;
+ errorMessage << "gRPC stream operation failed with error code " << static_cast<int>(status.error_code()) << " and error message '" << status.error_message() << "'.";
+ if (message != nullptr) {
+ errorMessage << " Last user message is " << message->DebugString();
+ }
+ UNIT_FAIL(errorMessage);
+ }
+}
+
+template<typename TClientMessage, typename TServerMessage>
+void AssertStreamingSessionAlive(std::unique_ptr<grpc::ClientReaderWriter<TClientMessage, TServerMessage>>& stream) {
+ TClientMessage clientMessage;
+ // TODO: Add 'ping_request' and 'ping_response' to write and read protocol for debugging?
+ clientMessage.mutable_update_token_request();
+ AssertSuccessfullStreamingOperation(stream->Write(clientMessage), stream, &clientMessage);
+ TServerMessage serverMessage;
+ AssertSuccessfullStreamingOperation(stream->Read(&serverMessage), stream);
+ UNIT_ASSERT_EQUAL_C(TServerMessage::kUpdateTokenResponse, serverMessage.server_message_case(), serverMessage);
+}
+
+template<typename TClientMessage, typename TServerMessage>
+void AssertStreamingSessionDead(std::unique_ptr<grpc::ClientReaderWriter<TClientMessage, TServerMessage>>& stream,
+ const Ydb::StatusIds::StatusCode expectedStatus, const Ydb::PersQueue::ErrorCode::ErrorCode expectedErrorCode)
+{
+ TServerMessage serverMessage;
+ AssertSuccessfullStreamingOperation(stream->Read(&serverMessage), stream);
+ UNIT_ASSERT_VALUES_EQUAL(expectedStatus, serverMessage.status());
+ UNIT_ASSERT_LE(1, serverMessage.issues_size());
+ // TODO: Why namespace duplicates enum name "ErrorCode::ErrorCode"?
+ // TODO: Why "Ydb::PersQueue::ErrorCode::ErrorCode" doesn't work with streaming output like "Ydb::StatusIds::StatusCode" does?
+ auto actualErrorCode = static_cast<Ydb::PersQueue::ErrorCode::ErrorCode>(serverMessage.issues(0).issue_code());
+ UNIT_ASSERT_C(expectedErrorCode == actualErrorCode, serverMessage);
+}