aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authoralexnick <alexnick@ydb.tech>2022-07-15 15:04:41 +0300
committeralexnick <alexnick@ydb.tech>2022-07-15 15:04:41 +0300
commitc32db3322b4705080f14507646b8734017859d1e (patch)
tree55b0af88d698072ab88bfc391ca335da85ec1b23
parentc9ecab541133902ec7d20cfff194651e1f1b3189 (diff)
downloadydb-c32db3322b4705080f14507646b8734017859d1e.tar.gz
fix for topic service
-rw-r--r--ydb/core/driver_lib/run/run.cpp2
-rw-r--r--ydb/services/persqueue_v1/actors/partition_actor.cpp7
-rw-r--r--ydb/services/persqueue_v1/actors/read_session_actor.ipp6
3 files changed, 11 insertions, 4 deletions
diff --git a/ydb/core/driver_lib/run/run.cpp b/ydb/core/driver_lib/run/run.cpp
index bd5211cc7d2..f9cf13dc1ef 100644
--- a/ydb/core/driver_lib/run/run.cpp
+++ b/ydb/core/driver_lib/run/run.cpp
@@ -595,7 +595,7 @@ void TKikimrRunner::InitializeGRpc(const TKikimrRunConfig& runConfig) {
hasYqlInternal = true;
}
- if (hasTableService || hasYqlInternal || hasPQ || hasKesus || hasPQv1 || hasExport || hasImport) {
+ if (hasTableService || hasYqlInternal || hasPQ || hasKesus || hasPQv1 || hasExport || hasImport || hasTopic) {
hasSchemeService = true;
hasOperationService = true;
// backward compatability
diff --git a/ydb/services/persqueue_v1/actors/partition_actor.cpp b/ydb/services/persqueue_v1/actors/partition_actor.cpp
index f63c67a3ab9..7267c418397 100644
--- a/ydb/services/persqueue_v1/actors/partition_actor.cpp
+++ b/ydb/services/persqueue_v1/actors/partition_actor.cpp
@@ -393,9 +393,12 @@ bool FillBatchedData(
SetBatchExtraField(currentBatch, kv.GetKey(), kv.GetValue());
}
}
- if constexpr (UseMigrationProtocol) {
- if (proto.HasIp() && IsUtf(proto.GetIp())) {
+
+ if (proto.HasIp() && IsUtf(proto.GetIp())) {
+ if constexpr (UseMigrationProtocol) {
currentBatch->set_ip(proto.GetIp());
+ } else {
+ SetBatchExtraField(currentBatch, "_ip", proto.GetIp());
}
}
}
diff --git a/ydb/services/persqueue_v1/actors/read_session_actor.ipp b/ydb/services/persqueue_v1/actors/read_session_actor.ipp
index dd38d513d28..3229faa2e6d 100644
--- a/ydb/services/persqueue_v1/actors/read_session_actor.ipp
+++ b/ydb/services/persqueue_v1/actors/read_session_actor.ipp
@@ -1645,6 +1645,10 @@ void TReadSessionActor<UseMigrationProtocol>::ProcessAnswer(const TActorContext&
} else {
hasMessages = RemoveEmptyMessages(*formedResponse->Response.mutable_read_response());
}
+ ui64 sizeEstimation = hasMessages ? formedResponse->Response.ByteSize() : 0;
+ if constexpr (!UseMigrationProtocol) {
+ formedResponse->Response.mutable_read_response()->set_bytes_size(sizeEstimation);
+ }
if (hasMessages) {
LOG_DEBUG_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " response to read " << formedResponse->Guid);
@@ -1686,7 +1690,7 @@ void TReadSessionActor<UseMigrationProtocol>::ProcessAnswer(const TActorContext&
ReadsInfly--;
if constexpr (!UseMigrationProtocol) {
ReadSizeBudget += formedResponse->RequestedBytes;
- ReadSizeBudget -= diff;
+ ReadSizeBudget -= sizeEstimation;
}
// Bring back available partitions.