diff options
author | ildar-khisam <ikhis@ydb.tech> | 2022-07-13 16:42:15 +0300 |
---|---|---|
committer | ildar-khisam <ikhis@ydb.tech> | 2022-07-13 16:42:15 +0300 |
commit | caafd737598a64041420fe6cd27636b8b54e7afd (patch) | |
tree | 8f6dc306dc9f25ccbbb584152a32390dd37a778e | |
parent | d4b3cf864f1c876ff34cd2c30fc03c8572df686c (diff) | |
download | ydb-caafd737598a64041420fe6cd27636b8b54e7afd.tar.gz |
fix topic service
fix topic service
[] enable 'topic' by default
-rw-r--r-- | ydb/core/driver_lib/run/run.cpp | 2 | ||||
-rw-r--r-- | ydb/services/persqueue_v1/actors/events.h | 8 | ||||
-rw-r--r-- | ydb/services/persqueue_v1/actors/partition_actor.cpp | 21 | ||||
-rw-r--r-- | ydb/services/persqueue_v1/actors/read_session_actor.h | 1 | ||||
-rw-r--r-- | ydb/services/persqueue_v1/actors/read_session_actor.ipp | 83 |
5 files changed, 83 insertions, 32 deletions
diff --git a/ydb/core/driver_lib/run/run.cpp b/ydb/core/driver_lib/run/run.cpp index 0b7431c5271..bd5211cc7d2 100644 --- a/ydb/core/driver_lib/run/run.cpp +++ b/ydb/core/driver_lib/run/run.cpp @@ -523,7 +523,7 @@ void TKikimrRunner::InitializeGRpc(const TKikimrRunConfig& runConfig) { names["pq"] = &hasPQ; bool hasPQv1 = services.empty(); names["pqv1"] = &hasPQv1; - bool hasTopic = false; + bool hasTopic = services.empty(); names["topic"] = &hasTopic; bool hasPQCD = services.empty(); names["pqcd"] = &hasPQCD; diff --git a/ydb/services/persqueue_v1/actors/events.h b/ydb/services/persqueue_v1/actors/events.h index 29b45676bf2..1837044a763 100644 --- a/ydb/services/persqueue_v1/actors/events.h +++ b/ydb/services/persqueue_v1/actors/events.h @@ -177,6 +177,14 @@ struct TEvPQProxy { , ReadTimestampMs(readTimestampMs) { } + explicit TEvRead(ui64 maxSize) + : Guid(CreateGuidAsString()) + , MaxCount(0) + , MaxSize(maxSize) + , MaxTimeLagMs(0) + , ReadTimestampMs(0) { + } + const TString Guid; ui64 MaxCount; ui64 MaxSize; diff --git a/ydb/services/persqueue_v1/actors/partition_actor.cpp b/ydb/services/persqueue_v1/actors/partition_actor.cpp index c6304b6984c..f63c67a3ab9 100644 --- a/ydb/services/persqueue_v1/actors/partition_actor.cpp +++ b/ydb/services/persqueue_v1/actors/partition_actor.cpp @@ -303,6 +303,13 @@ void SetBatchExtraField(Topic::StreamReadMessage::ReadResponse::Batch* batch, TS (*batch->mutable_write_session_meta())[key] = std::move(value); } +i32 GetDataChunkCodec(const NKikimrPQClient::TDataChunk& proto) { + if (proto.HasCodec()) { + return proto.GetCodec() + 1; + } + return 0; +} + template<typename TReadResponse> bool FillBatchedData( TReadResponse* data, const NKikimrClient::TCmdReadResult& res, @@ -326,6 +333,8 @@ bool FillBatchedData( bool hasOffset = false; bool hasData = false; + i32 batchCodec = 0; // UNSPECIFIED + typename TReadResponse::Batch* currentBatch = nullptr; for (ui32 i = 0; i < res.ResultSize(); ++i) { const auto& r = res.GetResult(i); @@ -348,15 +357,19 @@ bool FillBatchedData( sourceId = NPQ::NSourceIdEncoding::Decode(r.GetSourceId()); } - if (!currentBatch - || GetBatchWriteTimestampMS(currentBatch) != static_cast<i64>(r.GetWriteTimestampMS()) - || GetBatchSourceId(currentBatch) != sourceId) { + if (!currentBatch || GetBatchWriteTimestampMS(currentBatch) != static_cast<i64>(r.GetWriteTimestampMS()) || + GetBatchSourceId(currentBatch) != sourceId || + (!UseMigrationProtocol && GetDataChunkCodec(proto) != batchCodec)) { // If write time and source id are the same, the rest fields will be the same too. currentBatch = partitionData->add_batches(); i64 write_ts = static_cast<i64>(r.GetWriteTimestampMS()); Y_VERIFY(write_ts >= 0); SetBatchWriteTimestampMS(currentBatch, write_ts); SetBatchSourceId(currentBatch, std::move(sourceId)); + batchCodec = GetDataChunkCodec(proto); + if constexpr (!UseMigrationProtocol) { + currentBatch->set_codec(batchCodec); + } if (proto.HasMeta()) { const auto& header = proto.GetMeta(); @@ -406,7 +419,7 @@ bool FillBatchedData( *message->mutable_created_at() = ::google::protobuf::util::TimeUtil::MillisecondsToTimestamp(r.GetCreateTimestampMS()); - message->set_message_group_id(r.GetPartitionKey()); + message->set_message_group_id(GetBatchSourceId(currentBatch)); } hasData = true; } diff --git a/ydb/services/persqueue_v1/actors/read_session_actor.h b/ydb/services/persqueue_v1/actors/read_session_actor.h index 6013112419b..de20aee7bdf 100644 --- a/ydb/services/persqueue_v1/actors/read_session_actor.h +++ b/ydb/services/persqueue_v1/actors/read_session_actor.h @@ -289,6 +289,7 @@ private: ui32 MaxReadSize; i64 MaxTimeLagMs; i64 ReadTimestampMs; + i64 ReadSizeBudget; TString Auth; diff --git a/ydb/services/persqueue_v1/actors/read_session_actor.ipp b/ydb/services/persqueue_v1/actors/read_session_actor.ipp index d2cd4625976..dd38d513d28 100644 --- a/ydb/services/persqueue_v1/actors/read_session_actor.ipp +++ b/ydb/services/persqueue_v1/actors/read_session_actor.ipp @@ -30,12 +30,12 @@ using namespace PersQueue::V1; //TODO: add here tracking of bytes in/out -template<bool UseMigrationProtocol> -TReadSessionActor<UseMigrationProtocol>::TReadSessionActor( - TEvStreamPQReadRequest* request, const ui64 cookie, const TActorId& schemeCache, const TActorId& newSchemeCache, - TIntrusivePtr<NMonitoring::TDynamicCounters> counters, const TMaybe<TString> clientDC, - const NPersQueue::TTopicsListController& topicsHandler -) +template <bool UseMigrationProtocol> +TReadSessionActor<UseMigrationProtocol>::TReadSessionActor(TEvStreamPQReadRequest* request, const ui64 cookie, + const TActorId& schemeCache, const TActorId& newSchemeCache, + TIntrusivePtr<NMonitoring::TDynamicCounters> counters, + const TMaybe<TString> clientDC, + const NPersQueue::TTopicsListController& topicsHandler) : Request(request) , ClientDC(clientDC ? *clientDC : "other") , StartTimestamp(TInstant::Now()) @@ -52,6 +52,7 @@ TReadSessionActor<UseMigrationProtocol>::TReadSessionActor( , MaxReadSize(0) , MaxTimeLagMs(0) , ReadTimestampMs(0) + , ReadSizeBudget(0) , ForceACLCheck(false) , RequestNotChecked(true) , LastACLCheckTimestamp(TInstant::Zero()) @@ -62,12 +63,10 @@ TReadSessionActor<UseMigrationProtocol>::TReadSessionActor( , BytesInflight_(0) , RequestedBytes(0) , ReadsInfly(0) - , TopicsHandler(topicsHandler) -{ + , TopicsHandler(topicsHandler) { Y_ASSERT(Request); } - template<bool UseMigrationProtocol> TReadSessionActor<UseMigrationProtocol>::~TReadSessionActor() = default; @@ -254,7 +253,7 @@ if (!partId.DiscoveryConverter->IsValid()) { \ break; } case TClientMessage::kReadRequest: { - ctx.Send(ctx.SelfID, new TEvPQProxy::TEvRead()); // Proto read message have no parameters + ctx.Send(ctx.SelfID, new TEvPQProxy::TEvRead(request.read_request().bytes_size())); break; } case TClientMessage::kPartitionSessionStatusRequest: { @@ -682,8 +681,6 @@ void TReadSessionActor<UseMigrationProtocol>::Handle(typename TEvReadInit::TPtr& MaxReadSize = NormalizeMaxReadSize(0); MaxTimeLagMs = 0; // max_lag per topic only ReadTimestampMs = 0; // read_from per topic only - // MaxTimeLagMs = ::google::protobuf::util::TimeUtil::DurationToMilliseconds(init.max_lag()); - // ReadTimestampMs = ::google::protobuf::util::TimeUtil::TimestampToMilliseconds(init.read_from()); ReadOnlyLocal = true; } if (MaxTimeLagMs < 0) { @@ -1531,7 +1528,11 @@ void TReadSessionActor<UseMigrationProtocol>::Handle(TEvPQProxy::TEvRead::TPtr& LOG_DEBUG_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " got read request with guid: " << event->Guid); - Reads.emplace_back(event.Release()); + if constexpr (UseMigrationProtocol) { + Reads.emplace_back(event.Release()); + } else { + ReadSizeBudget += event->MaxSize; + } ProcessReads(ctx); } @@ -1683,20 +1684,21 @@ void TReadSessionActor<UseMigrationProtocol>::ProcessAnswer(const TActorContext& RequestedBytes -= formedResponse->RequestedBytes; ReadsInfly--; + if constexpr (!UseMigrationProtocol) { + ReadSizeBudget += formedResponse->RequestedBytes; + ReadSizeBudget -= diff; + } // Bring back available partitions. // If some partition was removed from partitions container, it is not bad because it will be checked during read processing. AvailablePartitions.insert(formedResponse->PartitionsBecameAvailable.begin(), formedResponse->PartitionsBecameAvailable.end()); - if (!hasMessages) { - // process new read - TClientMessage req; - if constexpr(UseMigrationProtocol) { - req.mutable_read(); - } else { - req.mutable_read_request(); + if constexpr (UseMigrationProtocol) { + if (!hasMessages) { + // process new read + // Start new reading request with the same guid + Reads.emplace_back(new TEvPQProxy::TEvRead(formedResponse->Guid)); } - Reads.emplace_back(new TEvPQProxy::TEvRead(formedResponse->Guid)); // Start new reading request with the same guid } ProcessReads(ctx); // returns false if actor died @@ -1727,12 +1729,27 @@ ui32 TReadSessionActor<UseMigrationProtocol>::NormalizeMaxReadSize(ui32 sourceVa template<bool UseMigrationProtocol> void TReadSessionActor<UseMigrationProtocol>::ProcessReads(const TActorContext& ctx) { - while (!Reads.empty() && BytesInflight_ + RequestedBytes < MAX_INFLY_BYTES && ReadsInfly < MAX_INFLY_READS) { + auto ShouldContinueReads = [this]() { + if constexpr (UseMigrationProtocol) { + return !Reads.empty() && ReadsInfly < MAX_INFLY_READS; + } else { + return ReadSizeBudget > 0; + } + }; + + while (ShouldContinueReads() && BytesInflight_ + RequestedBytes < MAX_INFLY_BYTES) { ui32 count = MaxReadMessagesCount; ui64 size = MaxReadSize; ui32 partitionsAsked = 0; - typename TFormedReadResponse<TServerMessage>::TPtr formedResponse = new TFormedReadResponse<TServerMessage>(Reads.front()->Guid, ctx.Now()); + TString guid; + if constexpr (UseMigrationProtocol) { + guid = Reads.front()->Guid; + } else { + guid = CreateGuidAsString(); + } + typename TFormedReadResponse<TServerMessage>::TPtr formedResponse = + new TFormedReadResponse<TServerMessage>(guid, ctx.Now()); while (!AvailablePartitions.empty()) { auto part = *AvailablePartitions.begin(); AvailablePartitions.erase(AvailablePartitions.begin()); @@ -1746,7 +1763,10 @@ void TReadSessionActor<UseMigrationProtocol>::ProcessReads(const TActorContext& const ui32 ccount = Min<ui32>(part.MsgLag * LAG_GROW_MULTIPLIER, count); count -= ccount; - const ui64 csize = (ui64)Min<double>(part.SizeLag * LAG_GROW_MULTIPLIER, size); + ui64 csize = (ui64)Min<double>(part.SizeLag * LAG_GROW_MULTIPLIER, size); + if constexpr (!UseMigrationProtocol) { + csize = Min<i64>(csize, ReadSizeBudget); + } size -= csize; Y_VERIFY(csize < Max<i32>()); @@ -1765,9 +1785,9 @@ void TReadSessionActor<UseMigrationProtocol>::ProcessReads(const TActorContext& auto lags_it = MaxLagByTopic.find(it->second.Topic->GetInternalName()); Y_VERIFY(lags_it != MaxLagByTopic.end()); - ui32 maxLag = Max(MaxTimeLagMs, lags_it->second); + ui32 maxLag = lags_it->second; - TAutoPtr<TEvPQProxy::TEvRead> read = new TEvPQProxy::TEvRead(Reads.front()->Guid, ccount, csize, maxLag, readTimestampMs); + TAutoPtr<TEvPQProxy::TEvRead> read = new TEvPQProxy::TEvRead(guid, ccount, csize, maxLag, readTimestampMs); LOG_DEBUG_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " performing read request with guid " << read->Guid @@ -1786,14 +1806,21 @@ void TReadSessionActor<UseMigrationProtocol>::ProcessReads(const TActorContext& RequestedBytes += csize; formedResponse->RequestedBytes += csize; + ReadSizeBudget -= csize; ctx.Send(it->second.Actor, read.Release()); const auto insertResult = PartitionToReadResponse.insert(std::make_pair(it->second.Actor, formedResponse)); Y_VERIFY(insertResult.second); + // Only from single partition + if constexpr (!UseMigrationProtocol) { + break; + } + if (count == 0 || size == 0) break; } + if (partitionsAsked == 0) break; ReadsTotal.Inc(); @@ -1807,7 +1834,9 @@ void TReadSessionActor<UseMigrationProtocol>::ProcessReads(const TActorContext& if (BytesInflight) { (*BytesInflight) += diff; } - Reads.pop_front(); + if constexpr (UseMigrationProtocol) { + Reads.pop_front(); + } } } |