diff options
author | alexnick <alexnick@ydb.tech> | 2022-07-15 15:04:41 +0300 |
---|---|---|
committer | alexnick <alexnick@ydb.tech> | 2022-07-15 15:04:41 +0300 |
commit | c32db3322b4705080f14507646b8734017859d1e (patch) | |
tree | 55b0af88d698072ab88bfc391ca335da85ec1b23 | |
parent | c9ecab541133902ec7d20cfff194651e1f1b3189 (diff) | |
download | ydb-c32db3322b4705080f14507646b8734017859d1e.tar.gz |
fix for topic service
-rw-r--r-- | ydb/core/driver_lib/run/run.cpp | 2 | ||||
-rw-r--r-- | ydb/services/persqueue_v1/actors/partition_actor.cpp | 7 | ||||
-rw-r--r-- | ydb/services/persqueue_v1/actors/read_session_actor.ipp | 6 |
3 files changed, 11 insertions, 4 deletions
diff --git a/ydb/core/driver_lib/run/run.cpp b/ydb/core/driver_lib/run/run.cpp index bd5211cc7d2..f9cf13dc1ef 100644 --- a/ydb/core/driver_lib/run/run.cpp +++ b/ydb/core/driver_lib/run/run.cpp @@ -595,7 +595,7 @@ void TKikimrRunner::InitializeGRpc(const TKikimrRunConfig& runConfig) { hasYqlInternal = true; } - if (hasTableService || hasYqlInternal || hasPQ || hasKesus || hasPQv1 || hasExport || hasImport) { + if (hasTableService || hasYqlInternal || hasPQ || hasKesus || hasPQv1 || hasExport || hasImport || hasTopic) { hasSchemeService = true; hasOperationService = true; // backward compatability diff --git a/ydb/services/persqueue_v1/actors/partition_actor.cpp b/ydb/services/persqueue_v1/actors/partition_actor.cpp index f63c67a3ab9..7267c418397 100644 --- a/ydb/services/persqueue_v1/actors/partition_actor.cpp +++ b/ydb/services/persqueue_v1/actors/partition_actor.cpp @@ -393,9 +393,12 @@ bool FillBatchedData( SetBatchExtraField(currentBatch, kv.GetKey(), kv.GetValue()); } } - if constexpr (UseMigrationProtocol) { - if (proto.HasIp() && IsUtf(proto.GetIp())) { + + if (proto.HasIp() && IsUtf(proto.GetIp())) { + if constexpr (UseMigrationProtocol) { currentBatch->set_ip(proto.GetIp()); + } else { + SetBatchExtraField(currentBatch, "_ip", proto.GetIp()); } } } diff --git a/ydb/services/persqueue_v1/actors/read_session_actor.ipp b/ydb/services/persqueue_v1/actors/read_session_actor.ipp index dd38d513d28..3229faa2e6d 100644 --- a/ydb/services/persqueue_v1/actors/read_session_actor.ipp +++ b/ydb/services/persqueue_v1/actors/read_session_actor.ipp @@ -1645,6 +1645,10 @@ void TReadSessionActor<UseMigrationProtocol>::ProcessAnswer(const TActorContext& } else { hasMessages = RemoveEmptyMessages(*formedResponse->Response.mutable_read_response()); } + ui64 sizeEstimation = hasMessages ? formedResponse->Response.ByteSize() : 0; + if constexpr (!UseMigrationProtocol) { + formedResponse->Response.mutable_read_response()->set_bytes_size(sizeEstimation); + } if (hasMessages) { LOG_DEBUG_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " response to read " << formedResponse->Guid); @@ -1686,7 +1690,7 @@ void TReadSessionActor<UseMigrationProtocol>::ProcessAnswer(const TActorContext& ReadsInfly--; if constexpr (!UseMigrationProtocol) { ReadSizeBudget += formedResponse->RequestedBytes; - ReadSizeBudget -= diff; + ReadSizeBudget -= sizeEstimation; } // Bring back available partitions. |