diff options
| author | Dmitry Kardymon <[email protected]> | 2025-01-21 12:58:48 +0300 |
|---|---|---|
| committer | GitHub <[email protected]> | 2025-01-21 12:58:48 +0300 |
| commit | 7ff68eac6fb5d12e6f6ece19086f5e45f89754fa (patch) | |
| tree | 68d8faa628b1ce5c2327f8807b19cf16f73f353e | |
| parent | 623de68f0102c414de61c1ae6656ae43722b9e46 (diff) | |
YQ-3893 Use one session read_actor <-> RD (#12247)
| -rw-r--r-- | ydb/core/fq/libs/row_dispatcher/coordinator.cpp | 6 | ||||
| -rw-r--r-- | ydb/core/fq/libs/row_dispatcher/events/data_plane.h | 24 | ||||
| -rw-r--r-- | ydb/core/fq/libs/row_dispatcher/events/topic_session_stats.h | 5 | ||||
| -rw-r--r-- | ydb/core/fq/libs/row_dispatcher/leader_election.cpp | 6 | ||||
| -rw-r--r-- | ydb/core/fq/libs/row_dispatcher/probes.h | 20 | ||||
| -rw-r--r-- | ydb/core/fq/libs/row_dispatcher/protos/events.proto | 29 | ||||
| -rw-r--r-- | ydb/core/fq/libs/row_dispatcher/row_dispatcher.cpp | 398 | ||||
| -rw-r--r-- | ydb/core/fq/libs/row_dispatcher/topic_session.cpp | 110 | ||||
| -rw-r--r-- | ydb/core/fq/libs/row_dispatcher/ut/row_dispatcher_ut.cpp | 127 | ||||
| -rw-r--r-- | ydb/core/fq/libs/row_dispatcher/ut/topic_session_ut.cpp | 50 | ||||
| -rw-r--r-- | ydb/library/yql/providers/pq/async_io/dq_pq_rd_read_actor.cpp | 561 | ||||
| -rw-r--r-- | ydb/tests/fq/pq_async_io/ut/dq_pq_rd_read_actor_ut.cpp | 188 |
12 files changed, 857 insertions, 667 deletions
diff --git a/ydb/core/fq/libs/row_dispatcher/coordinator.cpp b/ydb/core/fq/libs/row_dispatcher/coordinator.cpp index e258dd94582..c891c3b83d1 100644 --- a/ydb/core/fq/libs/row_dispatcher/coordinator.cpp +++ b/ydb/core/fq/libs/row_dispatcher/coordinator.cpp @@ -409,7 +409,7 @@ void TActorCoordinator::Handle(NFq::TEvRowDispatcher::TEvCoordinatorRequest::TPt UpdateInterconnectSessions(ev->InterconnectSession); TStringStream str; - LOG_ROW_DISPATCHER_INFO("TEvCoordinatorRequest from " << ev->Sender.ToString() << ", " << source.GetTopicPath() << ", partIds: " << JoinSeq(", ", ev->Get()->Record.GetPartitionId())); + LOG_ROW_DISPATCHER_INFO("TEvCoordinatorRequest from " << ev->Sender.ToString() << ", " << source.GetTopicPath() << ", partIds: " << JoinSeq(", ", ev->Get()->Record.GetPartitionIds())); Metrics.IncomingRequests->Inc(); TCoordinatorRequest request = {.Cookie = ev->Cookie, .Record = ev->Get()->Record}; @@ -430,7 +430,7 @@ bool TActorCoordinator::ComputeCoordinatorRequest(TActorId readActorId, const TC bool hasPendingPartitions = false; TMap<NActors::TActorId, TSet<ui64>> tmpResult; - for (auto& partitionId : request.Record.GetPartitionId()) { + for (auto& partitionId : request.Record.GetPartitionIds()) { TTopicKey topicKey{source.GetEndpoint(), source.GetDatabase(), source.GetTopicPath()}; TPartitionKey key {topicKey, partitionId}; auto locationIt = PartitionLocations.find(key); @@ -457,7 +457,7 @@ bool TActorCoordinator::ComputeCoordinatorRequest(TActorId readActorId, const TC auto* partitionsProto = response->Record.AddPartitions(); ActorIdToProto(actorId, partitionsProto->MutableActorId()); for (auto partitionId : partitions) { - partitionsProto->AddPartitionId(partitionId); + partitionsProto->AddPartitionIds(partitionId); } } diff --git a/ydb/core/fq/libs/row_dispatcher/events/data_plane.h b/ydb/core/fq/libs/row_dispatcher/events/data_plane.h index 859bf8d6234..074a0a35a00 100644 --- a/ydb/core/fq/libs/row_dispatcher/events/data_plane.h +++ b/ydb/core/fq/libs/row_dispatcher/events/data_plane.h @@ -3,7 +3,6 @@ #include <ydb/library/actors/core/actorid.h> #include <ydb/library/actors/core/event_local.h> #include <ydb/core/fq/libs/events/event_subspace.h> - #include <ydb/core/fq/libs/row_dispatcher/protos/events.pb.h> #include <ydb/library/yql/providers/pq/proto/dq_io.pb.h> #include <ydb/core/fq/libs/row_dispatcher/events/topic_session_stats.h> @@ -11,6 +10,9 @@ #include <yql/essentials/public/issue/yql_issue.h> #include <yql/essentials/public/purecalc/common/fwd.h> +#include <util/generic/set.h> +#include <util/generic/map.h> + namespace NFq { NActors::TActorId RowDispatcherServiceActorId(); @@ -75,7 +77,7 @@ struct TEvRowDispatcher { const std::vector<ui64>& partitionIds) { *Record.MutableSource() = sourceParams; for (const auto& id : partitionIds) { - Record.AddPartitionId(id); + Record.AddPartitionIds(id); } } }; @@ -93,16 +95,20 @@ struct TEvRowDispatcher { TEvStartSession() = default; TEvStartSession( const NYql::NPq::NProto::TDqPqTopicSource& sourceParams, - ui64 partitionId, + const std::set<ui32>& partitionIds, const TString token, - TMaybe<ui64> readOffset, + const std::map<ui32, ui64>& readOffsets, ui64 startingMessageTimestampMs, const TString& queryId) { *Record.MutableSource() = sourceParams; - Record.SetPartitionId(partitionId); + for (auto partitionId : partitionIds) { + Record.AddPartitionIds(partitionId); + } Record.SetToken(token); - if (readOffset) { - Record.SetOffset(*readOffset); + for (const auto& [partitionId, offset] : readOffsets) { + auto* partitionOffset = Record.AddOffsets(); + partitionOffset->SetPartitionId(partitionId); + partitionOffset->SetOffset(offset); } Record.SetStartingMessageTimestampMs(startingMessageTimestampMs); Record.SetQueryId(queryId); @@ -143,7 +149,6 @@ struct TEvRowDispatcher { struct TEvStatistics : public NActors::TEventPB<TEvStatistics, NFq::NRowDispatcherProto::TEvStatistics, EEv::EvStatistics> { TEvStatistics() = default; - NActors::TActorId ReadActorId; }; struct TEvSessionError : public NActors::TEventPB<TEvSessionError, @@ -162,9 +167,6 @@ struct TEvRowDispatcher { struct TEvHeartbeat : public NActors::TEventPB<TEvHeartbeat, NFq::NRowDispatcherProto::TEvHeartbeat, EEv::EvHeartbeat> { TEvHeartbeat() = default; - TEvHeartbeat(ui32 partitionId) { - Record.SetPartitionId(partitionId); - } }; struct TEvNoSession : public NActors::TEventPB<TEvNoSession, NFq::NRowDispatcherProto::TEvNoSession, EEv::EvNoSession> { diff --git a/ydb/core/fq/libs/row_dispatcher/events/topic_session_stats.h b/ydb/core/fq/libs/row_dispatcher/events/topic_session_stats.h index 707c014618d..4c5e7f427f7 100644 --- a/ydb/core/fq/libs/row_dispatcher/events/topic_session_stats.h +++ b/ydb/core/fq/libs/row_dispatcher/events/topic_session_stats.h @@ -11,7 +11,8 @@ struct TTopicSessionClientStatistic { i64 UnreadRows = 0; // Current value i64 UnreadBytes = 0; // Current value ui64 Offset = 0; // Current value - ui64 ReadBytes = 0; // Increment / filtered + ui64 FilteredReadBytes = 0; // Increment / filtered + ui64 ReadBytes = 0; // Increment bool IsWaiting = false; // Current value i64 ReadLagMessages = 0; // Current value ui64 InitialOffset = 0; @@ -19,12 +20,14 @@ struct TTopicSessionClientStatistic { UnreadRows = stat.UnreadRows; UnreadBytes = stat.UnreadBytes; Offset = stat.Offset; + FilteredReadBytes += stat.FilteredReadBytes; ReadBytes += stat.ReadBytes; IsWaiting = stat.IsWaiting; ReadLagMessages = stat.ReadLagMessages; InitialOffset = stat.InitialOffset; } void Clear() { + FilteredReadBytes = 0; ReadBytes = 0; } }; diff --git a/ydb/core/fq/libs/row_dispatcher/leader_election.cpp b/ydb/core/fq/libs/row_dispatcher/leader_election.cpp index 9d267183c0f..2ba97f2cc45 100644 --- a/ydb/core/fq/libs/row_dispatcher/leader_election.cpp +++ b/ydb/core/fq/libs/row_dispatcher/leader_election.cpp @@ -77,12 +77,12 @@ struct TLeaderElectionMetrics { explicit TLeaderElectionMetrics(const ::NMonitoring::TDynamicCounterPtr& counters) : Counters(counters) { Errors = Counters->GetCounter("LeaderElectionErrors", true); - LeaderChangedCount = Counters->GetCounter("LeaderElectionChangedCount"); + LeaderChanged = Counters->GetCounter("LeaderChanged", true); } ::NMonitoring::TDynamicCounterPtr Counters; ::NMonitoring::TDynamicCounters::TCounterPtr Errors; - ::NMonitoring::TDynamicCounters::TCounterPtr LeaderChangedCount; + ::NMonitoring::TDynamicCounters::TCounterPtr LeaderChanged; }; class TLeaderElection: public TActorBootstrapped<TLeaderElection> { @@ -458,7 +458,7 @@ void TLeaderElection::Handle(TEvPrivate::TEvDescribeSemaphoreResult::TPtr& ev) { if (!LeaderActorId || (*LeaderActorId != id)) { LOG_ROW_DISPATCHER_INFO("Send TEvCoordinatorChanged to " << ParentId); TActivationContext::ActorSystem()->Send(ParentId, new NFq::TEvRowDispatcher::TEvCoordinatorChanged(id, generation)); - Metrics.LeaderChangedCount->Inc(); + Metrics.LeaderChanged->Inc(); } LeaderActorId = id; } diff --git a/ydb/core/fq/libs/row_dispatcher/probes.h b/ydb/core/fq/libs/row_dispatcher/probes.h index 2d00e21c089..0e385c4066f 100644 --- a/ydb/core/fq/libs/row_dispatcher/probes.h +++ b/ydb/core/fq/libs/row_dispatcher/probes.h @@ -41,8 +41,8 @@ NAMES("sender", "coordinatorGeneration", "coordinatorActor")) \ PROBE(StartSession, \ GROUPS(), \ - TYPES(TString, ui32, TString, ui64), \ - NAMES("sender", "partitionId", "queryId", "size")) \ + TYPES(TString, TString, ui64), \ + NAMES("sender", "queryId", "size")) \ PROBE(GetNextBatch, \ GROUPS(), \ TYPES(TString, ui32, TString, ui64), \ @@ -53,16 +53,16 @@ NAMES("sender", "partitionId", "queryId", "size")) \ PROBE(StopSession, \ GROUPS(), \ - TYPES(TString, ui32, TString, ui64), \ - NAMES("sender", "partitionId", "queryId", "size")) \ + TYPES(TString, TString, ui64), \ + NAMES("sender", "queryId", "size")) \ PROBE(TryConnect, \ GROUPS(), \ TYPES(TString, ui32), \ NAMES("sender", "nodeId")) \ PROBE(PrivateHeartbeat, \ GROUPS(), \ - TYPES(TString, ui32, TString, ui64), \ - NAMES("sender", "partitionId", "queryId", "generation")) \ + TYPES(TString, TString, ui64), \ + NAMES("sender", "queryId", "generation")) \ PROBE(NewDataArrived, \ GROUPS(), \ TYPES(TString, TString, TString, ui64, ui64), \ @@ -73,12 +73,12 @@ NAMES("sender", "readActor", "queryId", "generation", "size")) \ PROBE(SessionError, \ GROUPS(), \ - TYPES(TString, TString, ui32, TString, ui64, ui64), \ - NAMES("sender", "readActor", "partitionId","queryId", "generation", "size")) \ + TYPES(TString, TString, TString, ui64, ui64), \ + NAMES("sender", "readActor", "queryId", "generation", "size")) \ PROBE(Statistics, \ GROUPS(), \ - TYPES(TString, TString, ui32, TString, ui64, ui64), \ - NAMES("sender", "readActor", "partitionId","queryId", "generation", "size")) \ + TYPES(TString, TString, ui64, ui64), \ + NAMES("readActor", "queryId", "generation", "size")) \ PROBE(UpdateMetrics, \ GROUPS(), \ TYPES(), \ diff --git a/ydb/core/fq/libs/row_dispatcher/protos/events.proto b/ydb/core/fq/libs/row_dispatcher/protos/events.proto index 5d56a866811..f8dcbcf07e2 100644 --- a/ydb/core/fq/libs/row_dispatcher/protos/events.proto +++ b/ydb/core/fq/libs/row_dispatcher/protos/events.proto @@ -11,25 +11,34 @@ import "ydb/public/api/protos/ydb_issue_message.proto"; message TEvGetAddressRequest { NYql.NPq.NProto.TDqPqTopicSource Source = 1; - repeated uint32 PartitionId = 2; + repeated uint32 PartitionId = 2 [deprecated=true]; + repeated uint32 PartitionIds = 3; } message TEvPartitionAddress { - repeated uint32 PartitionId = 1; + repeated uint32 PartitionId = 1 [deprecated=true]; NActorsProto.TActorId ActorId = 2; + repeated uint32 PartitionIds = 3; } message TEvGetAddressResponse { repeated TEvPartitionAddress Partitions = 1; } +message TPartitionOffset { + uint32 PartitionId = 1; + uint64 Offset = 2; +} + message TEvStartSession { NYql.NPq.NProto.TDqPqTopicSource Source = 1; - uint32 PartitionId = 2; + uint32 PartitionId = 2 [deprecated=true]; string Token = 3; - optional uint64 Offset = 4; + optional uint64 Offset = 4 [deprecated=true]; uint64 StartingMessageTimestampMs = 5; string QueryId = 6; + repeated uint32 PartitionIds = 7; + repeated TPartitionOffset Offsets = 8; optional NYql.NDqProto.TMessageTransportMeta TransportMeta = 100; } @@ -50,7 +59,7 @@ message TEvNewDataArrived { message TEvStopSession { NYql.NPq.NProto.TDqPqTopicSource Source = 1; - uint32 PartitionId = 2; + uint32 PartitionId = 2 [deprecated=true]; optional NYql.NDqProto.TMessageTransportMeta TransportMeta = 100; } @@ -68,16 +77,22 @@ message TEvMessageBatch { optional NYql.NDqProto.TMessageTransportMeta TransportMeta = 100; } -message TEvStatistics { +message TPartitionStatistics { uint32 PartitionId = 1; uint64 NextMessageOffset = 2; +} + +message TEvStatistics { + uint32 PartitionId = 1; // deprecated + uint64 NextMessageOffset = 2; // deprecated uint64 ReadBytes = 3; + repeated TPartitionStatistics Partition = 4; optional NYql.NDqProto.TMessageTransportMeta TransportMeta = 100; } message TEvSessionError { reserved 1; - uint32 PartitionId = 2; + uint32 PartitionId = 2 [deprecated=true]; NYql.NDqProto.StatusIds.StatusCode StatusCode = 3; repeated Ydb.Issue.IssueMessage Issues = 4; optional NYql.NDqProto.TMessageTransportMeta TransportMeta = 100; diff --git a/ydb/core/fq/libs/row_dispatcher/row_dispatcher.cpp b/ydb/core/fq/libs/row_dispatcher/row_dispatcher.cpp index 935f3c0a04f..9d3daba2ead 100644 --- a/ydb/core/fq/libs/row_dispatcher/row_dispatcher.cpp +++ b/ydb/core/fq/libs/row_dispatcher/row_dispatcher.cpp @@ -63,6 +63,7 @@ struct TEvPrivate { EvUpdateMetrics, EvPrintStateToLog, EvTryConnect, + EvSendStatistic, EvEnd }; @@ -75,6 +76,7 @@ struct TEvPrivate { : NodeId(nodeId) {} ui32 NodeId = 0; }; + struct TEvSendStatistic : public NActors::TEventLocal<TEvSendStatistic, EvSendStatistic> {}; }; struct TQueryStatKey { @@ -98,14 +100,14 @@ struct TQueryStatKeyHash { }; struct TAggQueryStat { - NYql::TCounters::TEntry ReadBytes; + NYql::TCounters::TEntry FilteredReadBytes; NYql::TCounters::TEntry UnreadBytes; NYql::TCounters::TEntry UnreadRows; NYql::TCounters::TEntry ReadLagMessages; bool IsWaiting = false; void Add(const TTopicSessionClientStatistic& stat) { - ReadBytes.Add(NYql::TCounters::TEntry(stat.ReadBytes)); + FilteredReadBytes.Add(NYql::TCounters::TEntry(stat.FilteredReadBytes)); UnreadBytes.Add(NYql::TCounters::TEntry(stat.UnreadBytes)); UnreadRows.Add(NYql::TCounters::TEntry(stat.UnreadRows)); ReadLagMessages.Add(NYql::TCounters::TEntry(stat.ReadLagMessages)); @@ -120,30 +122,7 @@ ui64 MaxSessionBufferSizeBytes = 16000000; class TRowDispatcher : public TActorBootstrapped<TRowDispatcher> { - struct ConsumerSessionKey { - TActorId ReadActorId; - ui32 PartitionId; - - size_t Hash() const noexcept { - ui64 hash = std::hash<TActorId>()(ReadActorId); - hash = CombineHashes<ui64>(hash, std::hash<ui32>()(PartitionId)); - return hash; - } - bool operator==(const ConsumerSessionKey& other) const { - return ReadActorId == other.ReadActorId && PartitionId == other.PartitionId; - } - TString ToString() const { - return TString::Join(ReadActorId.ToString(), "/", ::ToString(PartitionId)); - } - }; - - struct ConsumerSessionKeyHash { - int operator()(const ConsumerSessionKey& k) const { - return k.Hash(); - } - }; - - struct TopicSessionKey { + struct TTopicSessionKey { TString ReadGroup; TString Endpoint; TString Database; @@ -158,14 +137,14 @@ class TRowDispatcher : public TActorBootstrapped<TRowDispatcher> { hash = CombineHashes<ui64>(hash, std::hash<ui64>()(PartitionId)); return hash; } - bool operator==(const TopicSessionKey& other) const { + bool operator==(const TTopicSessionKey& other) const { return ReadGroup == other.ReadGroup && Endpoint == other.Endpoint && Database == other.Database && TopicPath == other.TopicPath && PartitionId == other.PartitionId; } }; - struct TopicSessionKeyHash { - int operator()(const TopicSessionKey& k) const { + struct TTopicSessionKeyHash { + int operator()(const TTopicSessionKey& k) const { return k.Hash(); } }; @@ -295,27 +274,32 @@ class TRowDispatcher : public TActorBootstrapped<TRowDispatcher> { TNodesTracker NodesTracker; TAggregatedStats AggrStats; - struct ConsumerCounters { + struct TConsumerCounters { ui64 NewDataArrived = 0; ui64 GetNextBatch = 0; ui64 MessageBatch = 0; }; - struct ConsumerInfo { - ConsumerInfo( + struct TConsumerPartition { + bool PendingGetNextBatch = false; + bool PendingNewDataArrived = false; + TActorId TopicSessionId; + TTopicSessionClientStatistic Stat; + bool StatisticsUpdated = false; + }; + + struct TConsumerInfo { + TConsumerInfo( NActors::TActorId readActorId, NActors::TActorId selfId, ui64 eventQueueId, NFq::NRowDispatcherProto::TEvStartSession& proto, - TActorId topicSessionId, bool alreadyConnected, ui64 generation) : ReadActorId(readActorId) , SourceParams(proto.GetSource()) - , PartitionId(proto.GetPartitionId()) , EventQueueId(eventQueueId) , Proto(proto) - , TopicSessionId(topicSessionId) , QueryId(proto.GetQueryId()) , Generation(generation) { EventsQueue.Init("txId", selfId, selfId, eventQueueId, /* KeepAlive */ true, /* UseConnect */ false); @@ -324,39 +308,36 @@ class TRowDispatcher : public TActorBootstrapped<TRowDispatcher> { NActors::TActorId ReadActorId; NYql::NPq::NProto::TDqPqTopicSource SourceParams; - ui64 PartitionId; NYql::NDq::TRetryEventsQueue EventsQueue; ui64 EventQueueId; NFq::NRowDispatcherProto::TEvStartSession Proto; - TActorId TopicSessionId; + THashMap<ui32, TConsumerPartition> Partitions; const TString QueryId; - ConsumerCounters Counters; - bool PendingGetNextBatch = false; - bool PendingNewDataArrived = false; + TConsumerCounters Counters; TTopicSessionClientStatistic Stat; ui64 Generation; }; - struct SessionInfo { - TMap<TActorId, TAtomicSharedPtr<ConsumerInfo>> Consumers; // key - ReadActor actor id + struct TSessionInfo { + TMap<TActorId, TAtomicSharedPtr<TConsumerInfo>> Consumers; // key - ReadActor actor id TTopicSessionCommonStatistic Stat; // Increments NYql::TCounters::TEntry AggrReadBytes; }; - struct TopicSessionInfo { - TMap<TActorId, SessionInfo> Sessions; // key - TopicSession actor id + struct TTopicSessionInfo { + TMap<TActorId, TSessionInfo> Sessions; // key - TopicSession actor id }; - struct ReadActorInfo { + struct TReadActorInfo { TString InternalState; TInstant RequestTime; TInstant ResponseTime; }; - THashMap<ConsumerSessionKey, TAtomicSharedPtr<ConsumerInfo>, ConsumerSessionKeyHash> Consumers; - TMap<ui64, TAtomicSharedPtr<ConsumerInfo>> ConsumersByEventQueueId; - THashMap<TopicSessionKey, TopicSessionInfo, TopicSessionKeyHash> TopicSessions; - TMap<TActorId, ReadActorInfo> ReadActorsInternalState; + THashMap<NActors::TActorId, TAtomicSharedPtr<TConsumerInfo>> Consumers; // key - read actor id + TMap<ui64, TAtomicSharedPtr<TConsumerInfo>> ConsumersByEventQueueId; + THashMap<TTopicSessionKey, TTopicSessionInfo, TTopicSessionKeyHash> TopicSessions; + TMap<TActorId, TReadActorInfo> ReadActorsInternalState; public: explicit TRowDispatcher( @@ -389,7 +370,6 @@ public: void Handle(NFq::TEvRowDispatcher::TEvNewDataArrived::TPtr& ev); void Handle(NFq::TEvRowDispatcher::TEvMessageBatch::TPtr& ev); void Handle(NFq::TEvRowDispatcher::TEvSessionError::TPtr& ev); - void Handle(NFq::TEvRowDispatcher::TEvStatistics::TPtr& ev); void Handle(NFq::TEvRowDispatcher::TEvSessionStatistic::TPtr& ev); void Handle(NFq::TEvRowDispatcher::TEvGetInternalStateResponse::TPtr& ev); @@ -400,15 +380,16 @@ public: void Handle(const NYql::NDq::TEvRetryQueuePrivate::TEvEvHeartbeat::TPtr&); void Handle(NFq::TEvPrivate::TEvUpdateMetrics::TPtr&); void Handle(NFq::TEvPrivate::TEvPrintStateToLog::TPtr&); + void Handle(NFq::TEvPrivate::TEvSendStatistic::TPtr&); void Handle(const NMon::TEvHttpInfo::TPtr&); - void DeleteConsumer(const ConsumerSessionKey& key); + void DeleteConsumer(NActors::TActorId readActorId); void UpdateMetrics(); TString GetInternalState(); TString GetReadActorsInternalState(); void UpdateReadActorsInternalState(); template <class TEventPtr> - bool CheckSession(TAtomicSharedPtr<ConsumerInfo>& consumer, const TEventPtr& ev); + bool CheckSession(TAtomicSharedPtr<TConsumerInfo>& consumer, const TEventPtr& ev); void SetQueryMetrics(const TQueryStatKey& queryKey, ui64 unreadBytesMax, ui64 unreadBytesAvg, i64 readLagMessagesMax); void PrintStateToLog(); @@ -427,7 +408,6 @@ public: hFunc(NFq::TEvRowDispatcher::TEvStopSession, Handle); hFunc(NFq::TEvRowDispatcher::TEvNoSession, Handle); hFunc(NFq::TEvRowDispatcher::TEvSessionError, Handle); - hFunc(NFq::TEvRowDispatcher::TEvStatistics, Handle); hFunc(NFq::TEvRowDispatcher::TEvSessionStatistic, Handle); hFunc(NFq::TEvRowDispatcher::TEvGetInternalStateResponse, Handle); hFunc(TEvPrivate::TEvTryConnect, Handle); @@ -436,6 +416,7 @@ public: hFunc(NFq::TEvRowDispatcher::TEvNewDataArrived, Handle); hFunc(NFq::TEvPrivate::TEvUpdateMetrics, Handle); hFunc(NFq::TEvPrivate::TEvPrintStateToLog, Handle); + hFunc(NFq::TEvPrivate::TEvSendStatistic, Handle); hFunc(NMon::TEvHttpInfo, Handle); }) }; @@ -479,6 +460,7 @@ void TRowDispatcher::Bootstrap() { Schedule(TDuration::Seconds(CoordinatorPingPeriodSec), new TEvPrivate::TEvCoordinatorPing()); Schedule(TDuration::Seconds(UpdateMetricsPeriodSec), new NFq::TEvPrivate::TEvUpdateMetrics()); Schedule(TDuration::Seconds(PrintStateToLogPeriodSec), new NFq::TEvPrivate::TEvPrintStateToLog()); + Schedule(TDuration::Seconds(Config.GetSendStatusPeriodSec()), new NFq::TEvPrivate::TEvSendStatistic()); if (Monitoring) { NLwTraceMonPage::ProbeRegistry().AddProbesList(LWTRACE_GET_PROBES(FQ_ROW_DISPATCHER_PROVIDER)); @@ -529,7 +511,7 @@ void TRowDispatcher::HandleDisconnected(TEvInterconnect::TEvNodeDisconnected::TP void TRowDispatcher::Handle(NActors::TEvents::TEvUndelivered::TPtr& ev) { LWPROBE(UndeliveredStart, ev->Sender.ToString(), ev->Get()->Reason, ev->Cookie); - LOG_ROW_DISPATCHER_DEBUG("TEvUndelivered, from " << ev->Sender << ", reason " << ev->Get()->Reason); + LOG_ROW_DISPATCHER_TRACE("TEvUndelivered, from " << ev->Sender << ", reason " << ev->Get()->Reason); for (auto& [key, consumer] : Consumers) { if (ev->Cookie != consumer->Generation) { // Several partitions in one read_actor have different Generation. LWPROBE(UndeliveredSkipGeneration, ev->Sender.ToString(), ev->Get()->Reason, ev->Cookie, consumer->Generation); @@ -537,7 +519,7 @@ void TRowDispatcher::Handle(NActors::TEvents::TEvUndelivered::TPtr& ev) { } if (consumer->EventsQueue.HandleUndelivered(ev) == NYql::NDq::TRetryEventsQueue::ESessionState::SessionClosed) { LWPROBE(UndeliveredDeleteConsumer, ev->Sender.ToString(), ev->Get()->Reason, ev->Cookie, key.ToString()); - DeleteConsumer(key); + DeleteConsumer(ev->Sender); break; } } @@ -648,6 +630,7 @@ TString TRowDispatcher::GetInternalState() { auto printDataRate = [&](NYql::TCounters::TEntry entry) { str << " (sum " << toHumanDR(entry.Sum) << " max " << toHumanDR(entry.Max) << " min " << toHumanDR(entry.Min) << ")"; }; + str << "SelfId: " << SelfId().ToString() << "\n"; str << "Consumers count: " << Consumers.size() << "\n"; str << "TopicSessions count: " << TopicSessions.size() << "\n"; str << "Max session buffer size: " << toHuman(MaxSessionBufferSizeBytes) << "\n"; @@ -682,7 +665,7 @@ TString TRowDispatcher::GetInternalState() { auto used = sessionsBufferSumSize ? (stat.UnreadBytes.Sum * 100.0 / sessionsBufferSumSize) : 0.0; str << " " << queryId << " / " << readGroup << ": buffer used (all partitions) " << LeftPad(Prec(used, 4), 10) << "% (" << toHuman(stat.UnreadBytes.Sum) << ") unread max (one partition) " << toHuman(stat.UnreadBytes.Max) << " data rate"; if (aggStat) { - printDataRate(aggStat->ReadBytes); + printDataRate(aggStat->FilteredReadBytes); } str << " waiting " << stat.IsWaiting << " max read lag " << stat.ReadLagMessages.Max; str << "\n"; @@ -704,23 +687,33 @@ TString TRowDispatcher::GetInternalState() { } for (auto& [readActorId, consumer] : sessionInfo.Consumers) { + const auto& partition = consumer->Partitions[key.PartitionId]; str << " " << consumer->QueryId << " " << LeftPad(readActorId, 32) << " unread bytes " << toHuman(consumer->Stat.UnreadBytes) << " (" << leftPad(consumer->Stat.UnreadRows) << " rows) " << " offset " << leftPad(consumer->Stat.Offset) << " init offset " << leftPad(consumer->Stat.InitialOffset) << " get " << leftPad(consumer->Counters.GetNextBatch) << " arr " << leftPad(consumer->Counters.NewDataArrived) << " btc " << leftPad(consumer->Counters.MessageBatch) - << " pend get " << leftPad(consumer->PendingGetNextBatch) << " pend new " << leftPad(consumer->PendingNewDataArrived) + << " pend get " << leftPad(partition.PendingGetNextBatch) << " pend new " << leftPad(partition.PendingNewDataArrived) << " waiting " << consumer->Stat.IsWaiting << " read lag " << leftPad(consumer->Stat.ReadLagMessages) - << " conn id " << consumer->Generation << " "; - str << " retry queue: "; - consumer->EventsQueue.PrintInternalState(str); - + << " conn id " << consumer->Generation << "\n"; maxInitialOffset = std::max(maxInitialOffset, consumer->Stat.InitialOffset); minInitialOffset = std::min(minInitialOffset, consumer->Stat.InitialOffset); } str << " initial offset max " << leftPad(maxInitialOffset) << " min " << leftPad(minInitialOffset) << "\n";; } } + + str << "Consumers:\n"; + for (auto& [readActorId, consumer] : Consumers) { + str << " " << consumer->QueryId << " " << LeftPad(readActorId, 32) << " Generation " << consumer->Generation << "\n"; + str << " partitions: "; + for (const auto& [partitionId, info] : consumer->Partitions) { + str << partitionId << ","; + } + str << "\n retry queue: "; + consumer->EventsQueue.PrintInternalState(str); + } + return str.Str(); } @@ -734,8 +727,8 @@ TString TRowDispatcher::GetReadActorsInternalState() { void TRowDispatcher::UpdateReadActorsInternalState() { TSet<TActorId> ReadActors; - for (const auto& [key, _]: Consumers) { - ReadActors.insert(key.ReadActorId); + for (const auto& [readActorId, _]: Consumers) { + ReadActors.insert(readActorId); } for(auto it = ReadActorsInternalState.begin(); it != ReadActorsInternalState.end();) { @@ -759,21 +752,15 @@ void TRowDispatcher::UpdateReadActorsInternalState() { void TRowDispatcher::Handle(NFq::TEvRowDispatcher::TEvStartSession::TPtr& ev) { LOG_ROW_DISPATCHER_DEBUG("Received TEvStartSession from " << ev->Sender << ", read group " << ev->Get()->Record.GetSource().GetReadGroup() << ", topicPath " << ev->Get()->Record.GetSource().GetTopicPath() << - " part id " << ev->Get()->Record.GetPartitionId() << " query id " << ev->Get()->Record.GetQueryId() << " cookie " << ev->Cookie); + " part id " << JoinSeq(',', ev->Get()->Record.GetPartitionIds()) << " query id " << ev->Get()->Record.GetQueryId() << " cookie " << ev->Cookie); auto queryGroup = Metrics.Counters->GetSubgroup("query_id", ev->Get()->Record.GetQueryId()); auto topicGroup = queryGroup->GetSubgroup("read_group", SanitizeLabel(ev->Get()->Record.GetSource().GetReadGroup())); topicGroup->GetCounter("StartSession", true)->Inc(); - LWPROBE(StartSession, ev->Sender.ToString(), ev->Get()->Record.GetPartitionId(), ev->Get()->Record.GetQueryId(), ev->Get()->Record.ByteSizeLong()); + LWPROBE(StartSession, ev->Sender.ToString(), ev->Get()->Record.GetQueryId(), ev->Get()->Record.ByteSizeLong()); NodesTracker.AddNode(ev->Sender.NodeId()); - TMaybe<ui64> readOffset; - if (ev->Get()->Record.HasOffset()) { - readOffset = ev->Get()->Record.GetOffset(); - } - - ConsumerSessionKey key{ev->Sender, ev->Get()->Record.GetPartitionId()}; - auto it = Consumers.find(key); + auto it = Consumers.find(ev->Sender); if (it != Consumers.end()) { if (ev->Cookie <= it->second->Generation) { LOG_ROW_DISPATCHER_WARN("Consumer already exists, ignore StartSession"); @@ -781,80 +768,90 @@ void TRowDispatcher::Handle(NFq::TEvRowDispatcher::TEvStartSession::TPtr& ev) { } LOG_ROW_DISPATCHER_WARN("Consumer already exists, new consumer with new generation (" << ev->Cookie << ", current " << it->second->Generation << "), remove old consumer, sender " << ev->Sender << ", topicPath " - << ev->Get()->Record.GetSource().GetTopicPath() <<" partitionId " << ev->Get()->Record.GetPartitionId() << " cookie " << ev->Cookie); - DeleteConsumer(key); + << ev->Get()->Record.GetSource().GetTopicPath() << " cookie " << ev->Cookie); + DeleteConsumer(ev->Sender); } const auto& source = ev->Get()->Record.GetSource(); - TActorId sessionActorId; - TopicSessionKey topicKey{source.GetReadGroup(), source.GetEndpoint(), source.GetDatabase(), source.GetTopicPath(), ev->Get()->Record.GetPartitionId()}; - TopicSessionInfo& topicSessionInfo = TopicSessions[topicKey]; - Y_ENSURE(topicSessionInfo.Sessions.size() <= 1); + auto consumerInfo = MakeAtomicShared<TConsumerInfo>(ev->Sender, SelfId(), NextEventQueueId++, ev->Get()->Record, + NodesTracker.GetNodeConnected(ev->Sender.NodeId()), ev->Cookie); - auto consumerInfo = MakeAtomicShared<ConsumerInfo>(ev->Sender, SelfId(), NextEventQueueId++, ev->Get()->Record, TActorId(), NodesTracker.GetNodeConnected(ev->Sender.NodeId()), ev->Cookie); - Consumers[key] = consumerInfo; + Consumers[ev->Sender] = consumerInfo; ConsumersByEventQueueId[consumerInfo->EventQueueId] = consumerInfo; if (!CheckSession(consumerInfo, ev)) { return; } - if (topicSessionInfo.Sessions.empty()) { - LOG_ROW_DISPATCHER_DEBUG("Create new session: read group " << source.GetReadGroup() << " topic " << source.GetTopicPath() - << " part id " << ev->Get()->Record.GetPartitionId() << " offset " << readOffset); - sessionActorId = ActorFactory->RegisterTopicSession( - source.GetReadGroup(), - source.GetTopicPath(), - source.GetEndpoint(), - source.GetDatabase(), - Config, - SelfId(), - CompileServiceActorId, - ev->Get()->Record.GetPartitionId(), - YqSharedResources->UserSpaceYdbDriver, - CreateCredentialsProviderFactoryForStructuredToken( - CredentialsFactory, - ev->Get()->Record.GetToken(), - source.GetAddBearerToToken()), - Counters, - CountersRoot, - PqGateway, - MaxSessionBufferSizeBytes - ); - SessionInfo& sessionInfo = topicSessionInfo.Sessions[sessionActorId]; - sessionInfo.Consumers[ev->Sender] = consumerInfo; - } else { - auto sessionIt = topicSessionInfo.Sessions.begin(); - SessionInfo& sessionInfo = sessionIt->second; - sessionInfo.Consumers[ev->Sender] = consumerInfo; - sessionActorId = sessionIt->first; + for (auto partitionId : ev->Get()->Record.GetPartitionIds()) { + TActorId sessionActorId; + TTopicSessionKey topicKey{source.GetReadGroup(), source.GetEndpoint(), source.GetDatabase(), source.GetTopicPath(), partitionId}; + TTopicSessionInfo& topicSessionInfo = TopicSessions[topicKey]; + Y_ENSURE(topicSessionInfo.Sessions.size() <= 1); + + if (topicSessionInfo.Sessions.empty()) { + LOG_ROW_DISPATCHER_DEBUG("Create new session: read group " << source.GetReadGroup() << " topic " << source.GetTopicPath() + << " part id " << partitionId); + sessionActorId = ActorFactory->RegisterTopicSession( + source.GetReadGroup(), + source.GetTopicPath(), + source.GetEndpoint(), + source.GetDatabase(), + Config, + SelfId(), + CompileServiceActorId, + partitionId, + YqSharedResources->UserSpaceYdbDriver, + CreateCredentialsProviderFactoryForStructuredToken( + CredentialsFactory, + ev->Get()->Record.GetToken(), + source.GetAddBearerToToken()), + Counters, + CountersRoot, + PqGateway, + MaxSessionBufferSizeBytes + ); + TSessionInfo& sessionInfo = topicSessionInfo.Sessions[sessionActorId]; + sessionInfo.Consumers[ev->Sender] = consumerInfo; + } else { + auto sessionIt = topicSessionInfo.Sessions.begin(); + TSessionInfo& sessionInfo = sessionIt->second; + sessionInfo.Consumers[ev->Sender] = consumerInfo; + sessionActorId = sessionIt->first; + } + consumerInfo->Partitions[partitionId].TopicSessionId = sessionActorId; + + auto event = std::make_unique<NFq::TEvRowDispatcher::TEvStartSession>(); + event->Record.CopyFrom(ev->Get()->Record); + Send(new IEventHandle(sessionActorId, ev->Sender, event.release(), 0)); } - consumerInfo->TopicSessionId = sessionActorId; consumerInfo->EventsQueue.Send(new NFq::TEvRowDispatcher::TEvStartSessionAck(consumerInfo->Proto), consumerInfo->Generation); - - Forward(ev, sessionActorId); Metrics.ClientsCount->Set(Consumers.size()); } void TRowDispatcher::Handle(NFq::TEvRowDispatcher::TEvGetNextBatch::TPtr& ev) { - ConsumerSessionKey key{ev->Sender, ev->Get()->Record.GetPartitionId()}; - auto it = Consumers.find(key); + auto it = Consumers.find(ev->Sender); if (it == Consumers.end()) { LOG_ROW_DISPATCHER_WARN("Ignore (no consumer) TEvGetNextBatch from " << ev->Sender << " part id " << ev->Get()->Record.GetPartitionId()); return; } - LWPROBE(GetNextBatch, ev->Sender.ToString(), ev->Get()->Record.GetPartitionId(), it->second->QueryId, ev->Get()->Record.ByteSizeLong()); + auto& session = it->second; + LWPROBE(GetNextBatch, ev->Sender.ToString(), ev->Get()->Record.GetPartitionId(), session->QueryId, ev->Get()->Record.ByteSizeLong()); LOG_ROW_DISPATCHER_TRACE("Received TEvGetNextBatch from " << ev->Sender << " part id " << ev->Get()->Record.GetPartitionId() << " query id " << it->second->QueryId); - if (!CheckSession(it->second, ev)) { + if (!CheckSession(session, ev)) { + return; + } + auto partitionIt = session->Partitions.find(ev->Get()->Record.GetPartitionId()); + if (partitionIt == session->Partitions.end()) { + LOG_ROW_DISPATCHER_ERROR("Ignore TEvGetNextBatch from " << ev->Sender << ", wrong partition id " << ev->Get()->Record.GetPartitionId()); return; } - it->second->PendingNewDataArrived = false; - it->second->PendingGetNextBatch = true; - it->second->Counters.GetNextBatch++; - Forward(ev, it->second->TopicSessionId); + partitionIt->second.PendingNewDataArrived = false; + partitionIt->second.PendingGetNextBatch = true; + session->Counters.GetNextBatch++; + Forward(ev, partitionIt->second.TopicSessionId); } void TRowDispatcher::Handle(NFq::TEvRowDispatcher::TEvHeartbeat::TPtr& ev) { - ConsumerSessionKey key{ev->Sender, ev->Get()->Record.GetPartitionId()}; - auto it = Consumers.find(key); + auto it = Consumers.find(ev->Sender); if (it == Consumers.end()) { LOG_ROW_DISPATCHER_WARN("Wrong consumer, sender " << ev->Sender << ", part id " << ev->Get()->Record.GetPartitionId()); return; @@ -868,12 +865,11 @@ void TRowDispatcher::Handle(NFq::TEvRowDispatcher::TEvHeartbeat::TPtr& ev) { void TRowDispatcher::Handle(NFq::TEvRowDispatcher::TEvNoSession::TPtr& ev) { LOG_ROW_DISPATCHER_DEBUG("Received TEvNoSession from " << ev->Sender << ", cookie " << ev->Cookie); - ConsumerSessionKey key{ev->Sender, ev->Get()->Record.GetPartitionId()}; - DeleteConsumer(key); + DeleteConsumer(ev->Sender); } template <class TEventPtr> -bool TRowDispatcher::CheckSession(TAtomicSharedPtr<ConsumerInfo>& consumer, const TEventPtr& ev) { +bool TRowDispatcher::CheckSession(TAtomicSharedPtr<TConsumerInfo>& consumer, const TEventPtr& ev) { if (ev->Cookie != consumer->Generation) { LOG_ROW_DISPATCHER_WARN("Wrong message generation (" << typeid(TEventPtr).name() << "), sender " << ev->Sender << " cookie " << ev->Cookie << ", session generation " << consumer->Generation << ", query id " << consumer->QueryId); return false; @@ -887,56 +883,51 @@ bool TRowDispatcher::CheckSession(TAtomicSharedPtr<ConsumerInfo>& consumer, cons } void TRowDispatcher::Handle(NFq::TEvRowDispatcher::TEvStopSession::TPtr& ev) { - ConsumerSessionKey key{ev->Sender, ev->Get()->Record.GetPartitionId()}; - auto it = Consumers.find(key); + auto it = Consumers.find(ev->Sender); if (it == Consumers.end()) { - LOG_ROW_DISPATCHER_WARN("Ignore TEvStopSession from " << ev->Sender << " part id " << ev->Get()->Record.GetPartitionId()); - return; - } - LWPROBE(StopSession, ev->Sender.ToString(), ev->Get()->Record.GetPartitionId(), it->second->QueryId, ev->Get()->Record.ByteSizeLong()); - LOG_ROW_DISPATCHER_DEBUG("Received TEvStopSession, topicPath " << ev->Get()->Record.GetSource().GetTopicPath() << - " partitionId " << ev->Get()->Record.GetPartitionId() << " query id " << it->second->QueryId); - const auto& consumer = it->second; - if (ev->Cookie != consumer->Generation) { - LOG_ROW_DISPATCHER_WARN("Wrong message generation, ignore TEvStopSession, sender " << ev->Sender << " cookie " << ev->Cookie << ", session generation " << consumer->Generation << ", query id " << consumer->QueryId); + LOG_ROW_DISPATCHER_WARN("Ignore TEvStopSession from " << ev->Sender); return; } + + LWPROBE(StopSession, ev->Sender.ToString(), it->second->QueryId, ev->Get()->Record.ByteSizeLong()); + LOG_ROW_DISPATCHER_DEBUG("Received TEvStopSession, topicPath " << ev->Get()->Record.GetSource().GetTopicPath() << " query id " << it->second->QueryId); if (!CheckSession(it->second, ev)) { return; } - DeleteConsumer(key); + DeleteConsumer(ev->Sender); } -void TRowDispatcher::DeleteConsumer(const ConsumerSessionKey& key) { - auto consumerIt = Consumers.find(key); +void TRowDispatcher::DeleteConsumer(NActors::TActorId readActorId) { + auto consumerIt = Consumers.find(readActorId); if (consumerIt == Consumers.end()) { - LOG_ROW_DISPATCHER_ERROR("Ignore (no consumer) DeleteConsumer, " << " read actor id " << key.ReadActorId << " part id " << key.PartitionId); + LOG_ROW_DISPATCHER_ERROR("Ignore (no consumer) DeleteConsumer, " << " read actor id " << readActorId); return; } const auto& consumer = consumerIt->second; - LOG_ROW_DISPATCHER_DEBUG("DeleteConsumer, readActorId " << key.ReadActorId << " partitionId " << key.PartitionId << " query id " << consumer->QueryId); - auto event = std::make_unique<NFq::TEvRowDispatcher::TEvStopSession>(); - *event->Record.MutableSource() = consumer->SourceParams; - event->Record.SetPartitionId(consumer->PartitionId); - Send(new IEventHandle(consumerIt->second->TopicSessionId, consumer->ReadActorId, event.release(), 0)); + LOG_ROW_DISPATCHER_DEBUG("DeleteConsumer, readActorId " << readActorId << " query id " << consumer->QueryId); + for (auto& [partitionId, partition] : consumer->Partitions) { + auto event = std::make_unique<NFq::TEvRowDispatcher::TEvStopSession>(); + *event->Record.MutableSource() = consumer->SourceParams; + Send(new IEventHandle(partition.TopicSessionId, consumer->ReadActorId, event.release(), 0)); - TopicSessionKey topicKey{ - consumer->SourceParams.GetReadGroup(), - consumer->SourceParams.GetEndpoint(), - consumer->SourceParams.GetDatabase(), - consumer->SourceParams.GetTopicPath(), - consumer->PartitionId}; - TopicSessionInfo& topicSessionInfo = TopicSessions[topicKey]; - SessionInfo& sessionInfo = topicSessionInfo.Sessions[consumerIt->second->TopicSessionId]; - Y_ENSURE(sessionInfo.Consumers.count(consumer->ReadActorId)); - sessionInfo.Consumers.erase(consumer->ReadActorId); - if (sessionInfo.Consumers.empty()) { - LOG_ROW_DISPATCHER_DEBUG("Session is not used, sent TEvPoisonPill to " << consumerIt->second->TopicSessionId); - topicSessionInfo.Sessions.erase(consumerIt->second->TopicSessionId); - Send(consumerIt->second->TopicSessionId, new NActors::TEvents::TEvPoisonPill()); - if (topicSessionInfo.Sessions.empty()) { - TopicSessions.erase(topicKey); + TTopicSessionKey topicKey{ + consumer->SourceParams.GetReadGroup(), + consumer->SourceParams.GetEndpoint(), + consumer->SourceParams.GetDatabase(), + consumer->SourceParams.GetTopicPath(), + partitionId}; + TTopicSessionInfo& topicSessionInfo = TopicSessions[topicKey]; + TSessionInfo& sessionInfo = topicSessionInfo.Sessions[partition.TopicSessionId]; + Y_ENSURE(sessionInfo.Consumers.count(consumer->ReadActorId)); + sessionInfo.Consumers.erase(consumer->ReadActorId); + if (sessionInfo.Consumers.empty()) { + LOG_ROW_DISPATCHER_DEBUG("Session is not used, sent TEvPoisonPill to " << partition.TopicSessionId); + topicSessionInfo.Sessions.erase(partition.TopicSessionId); + Send(partition.TopicSessionId, new NActors::TEvents::TEvPoisonPill()); + if (topicSessionInfo.Sessions.empty()) { + TopicSessions.erase(topicKey); + } } } ConsumersByEventQueueId.erase(consumerIt->second->EventQueueId); @@ -957,70 +948,56 @@ void TRowDispatcher::Handle(const NYql::NDq::TEvRetryQueuePrivate::TEvEvHeartbea return; } auto& sessionInfo = it->second; - LWPROBE(PrivateHeartbeat, ev->Sender.ToString(), sessionInfo->PartitionId, sessionInfo->QueryId, sessionInfo->Generation); + LWPROBE(PrivateHeartbeat, ev->Sender.ToString(), sessionInfo->QueryId, sessionInfo->Generation); bool needSend = sessionInfo->EventsQueue.Heartbeat(); if (needSend) { LOG_ROW_DISPATCHER_TRACE("Send TEvHeartbeat to " << sessionInfo->ReadActorId << " query id " << sessionInfo->QueryId); - auto event = std::make_unique<NFq::TEvRowDispatcher::TEvHeartbeat>(sessionInfo->PartitionId); + auto event = std::make_unique<NFq::TEvRowDispatcher::TEvHeartbeat>(); Send(new IEventHandle(sessionInfo->ReadActorId, SelfId(), event.release(), 0, sessionInfo->Generation)); } } -void TRowDispatcher::Handle(NFq::TEvRowDispatcher::TEvNewDataArrived::TPtr& ev) { - ConsumerSessionKey key{ev->Get()->ReadActorId, ev->Get()->Record.GetPartitionId()}; - auto it = Consumers.find(key); +void TRowDispatcher::Handle(NFq::TEvRowDispatcher::TEvNewDataArrived::TPtr& ev) { + auto it = Consumers.find(ev->Get()->ReadActorId); if (it == Consumers.end()) { LOG_ROW_DISPATCHER_WARN("Ignore (no consumer) TEvNewDataArrived from " << ev->Sender << " part id " << ev->Get()->Record.GetPartitionId()); return; } LWPROBE(NewDataArrived, ev->Sender.ToString(), ev->Get()->ReadActorId.ToString(), it->second->QueryId, it->second->Generation, ev->Get()->Record.ByteSizeLong()); LOG_ROW_DISPATCHER_TRACE("Forward TEvNewDataArrived from " << ev->Sender << " to " << ev->Get()->ReadActorId << " query id " << it->second->QueryId); - it->second->PendingNewDataArrived = true; + auto& partition = it->second->Partitions[ev->Get()->Record.GetPartitionId()]; + partition.PendingNewDataArrived = true; it->second->Counters.NewDataArrived++; it->second->EventsQueue.Send(ev->Release().Release(), it->second->Generation); } void TRowDispatcher::Handle(NFq::TEvRowDispatcher::TEvMessageBatch::TPtr& ev) { - ConsumerSessionKey key{ev->Get()->ReadActorId, ev->Get()->Record.GetPartitionId()}; - auto it = Consumers.find(key); + auto it = Consumers.find(ev->Get()->ReadActorId); if (it == Consumers.end()) { - LOG_ROW_DISPATCHER_WARN("Ignore (no consumer) TEvMessageBatch from " << ev->Sender); + LOG_ROW_DISPATCHER_WARN("Ignore (no consumer) TEvMessageBatch from " << ev->Sender << " to " << ev->Get()->ReadActorId); return; } LWPROBE(MessageBatch, ev->Sender.ToString(), ev->Get()->ReadActorId.ToString(), it->second->QueryId, it->second->Generation, ev->Get()->Record.ByteSizeLong()); LOG_ROW_DISPATCHER_TRACE("Forward TEvMessageBatch from " << ev->Sender << " to " << ev->Get()->ReadActorId << " query id " << it->second->QueryId); Metrics.RowsSent->Add(ev->Get()->Record.MessagesSize()); - it->second->PendingGetNextBatch = false; + auto& partition = it->second->Partitions[ev->Get()->Record.GetPartitionId()]; + partition.PendingGetNextBatch = false; it->second->Counters.MessageBatch++; it->second->EventsQueue.Send(ev->Release().Release(), it->second->Generation); } void TRowDispatcher::Handle(NFq::TEvRowDispatcher::TEvSessionError::TPtr& ev) { - ConsumerSessionKey key{ev->Get()->ReadActorId, ev->Get()->Record.GetPartitionId()}; - auto it = Consumers.find(key); + auto it = Consumers.find(ev->Get()->ReadActorId); if (it == Consumers.end()) { - LOG_ROW_DISPATCHER_WARN("Ignore (no consumer) TEvSessionError from " << ev->Sender << " to " << ev->Get()->ReadActorId << " part id " << ev->Get()->Record.GetPartitionId()); + LOG_ROW_DISPATCHER_WARN("Ignore (no consumer) TEvSessionError from " << ev->Sender << " to " << ev->Get()->ReadActorId); return; } - LWPROBE(SessionError, ev->Sender.ToString(), ev->Get()->ReadActorId.ToString(), ev->Get()->Record.GetPartitionId(), it->second->QueryId, it->second->Generation, ev->Get()->Record.ByteSizeLong()); + LWPROBE(SessionError, ev->Sender.ToString(), ev->Get()->ReadActorId.ToString(), it->second->QueryId, it->second->Generation, ev->Get()->Record.ByteSizeLong()); ++*Metrics.ErrorsCount; - LOG_ROW_DISPATCHER_TRACE("Forward TEvSessionError from " << ev->Sender << " to " << ev->Get()->ReadActorId << " part id " << ev->Get()->Record.GetPartitionId() << " query id " << it->second->QueryId); - it->second->EventsQueue.Send(ev->Release().Release(), it->second->Generation); - DeleteConsumer(key); -} - -void TRowDispatcher::Handle(NFq::TEvRowDispatcher::TEvStatistics::TPtr& ev) { - LOG_ROW_DISPATCHER_TRACE("TEvStatistics from " << ev->Sender); - ConsumerSessionKey key{ev->Get()->ReadActorId, ev->Get()->Record.GetPartitionId()}; - auto it = Consumers.find(key); - if (it == Consumers.end()) { - LOG_ROW_DISPATCHER_WARN("Ignore (no consumer) TEvStatistics from " << ev->Sender << " to " << ev->Get()->ReadActorId << " part id " << ev->Get()->Record.GetPartitionId()); - return; - } - LWPROBE(Statistics, ev->Sender.ToString(), ev->Get()->ReadActorId.ToString(), ev->Get()->Record.GetPartitionId(), it->second->QueryId, it->second->Generation, ev->Get()->Record.ByteSizeLong()); - LOG_ROW_DISPATCHER_TRACE("Forward TEvStatus from " << ev->Sender << " to " << ev->Get()->ReadActorId << " part id " << ev->Get()->Record.GetPartitionId() << " query id " << it->second->QueryId); + LOG_ROW_DISPATCHER_TRACE("Forward TEvSessionError from " << ev->Sender << " to " << ev->Get()->ReadActorId << " query id " << it->second->QueryId); it->second->EventsQueue.Send(ev->Release().Release(), it->second->Generation); + DeleteConsumer(ev->Get()->ReadActorId); } void TRowDispatcher::Handle(NFq::TEvPrivate::TEvUpdateMetrics::TPtr&) { @@ -1038,12 +1015,35 @@ void TRowDispatcher::Handle(NFq::TEvPrivate::TEvPrintStateToLog::TPtr&) { void TRowDispatcher::PrintStateToLog() { auto str = GetInternalState(); auto buf = TStringBuf(str); - i64 size = buf.size(); - ui64 offset = 0; - while (size > 0) { + for (ui64 offset = 0; offset < buf.size(); offset += PrintStateToLogSplitSize) { LOG_ROW_DISPATCHER_DEBUG(buf.SubString(offset, PrintStateToLogSplitSize)); - offset += PrintStateToLogSplitSize; - size -= PrintStateToLogSplitSize; + } +} + +void TRowDispatcher::Handle(NFq::TEvPrivate::TEvSendStatistic::TPtr&) { + LOG_ROW_DISPATCHER_TRACE("TEvPrivate::TEvSendStatistic"); + + Schedule(TDuration::Seconds(Config.GetSendStatusPeriodSec()), new NFq::TEvPrivate::TEvSendStatistic()); + for (auto& [actorId, consumer] : Consumers) { + if (!NodesTracker.GetNodeConnected(actorId.NodeId())) { + continue; // Wait Connected to prevent retry_queue increases. + } + auto event = std::make_unique<TEvRowDispatcher::TEvStatistics>(); + ui64 readBytes = 0; + for (auto& [partitionId, partition] : consumer->Partitions) { + if (!partition.StatisticsUpdated) { + continue; + } + auto* partitionsProto = event->Record.AddPartition(); + partitionsProto->SetPartitionId(partitionId); + partitionsProto->SetNextMessageOffset(partition.Stat.Offset); + readBytes += partition.Stat.ReadBytes; + partition.Stat.Clear(); + partition.StatisticsUpdated = false; + } + event->Record.SetReadBytes(readBytes); + LWPROBE(Statistics, consumer->ReadActorId.ToString(), consumer->QueryId, consumer->Generation, event->Record.ByteSizeLong()); + consumer->EventsQueue.Send(event.release(), consumer->Generation); } } @@ -1080,7 +1080,7 @@ void TRowDispatcher::Handle(NFq::TEvRowDispatcher::TEvSessionStatistic::TPtr& ev stat.Common.RestartSessionByOffsets, stat.Common.ReadEvents, stat.Common.LastReadedOffset); - TopicSessionKey sessionKey{key.ReadGroup, key.Endpoint, key.Database, key.TopicPath, key.PartitionId}; + TTopicSessionKey sessionKey{key.ReadGroup, key.Endpoint, key.Database, key.TopicPath, key.PartitionId}; auto sessionsIt = TopicSessions.find(sessionKey); if (sessionsIt == TopicSessions.end()) { @@ -1101,6 +1101,12 @@ void TRowDispatcher::Handle(NFq::TEvRowDispatcher::TEvSessionStatistic::TPtr& ev } auto consumerInfoPtr = it->second; consumerInfoPtr->Stat.Add(clientStat); + auto partitionIt = consumerInfoPtr->Partitions.find(key.PartitionId); + if (partitionIt == consumerInfoPtr->Partitions.end()) { + continue; + } + partitionIt->second.Stat.Add(clientStat); + partitionIt->second.StatisticsUpdated = true; } } diff --git a/ydb/core/fq/libs/row_dispatcher/topic_session.cpp b/ydb/core/fq/libs/row_dispatcher/topic_session.cpp index 333deacc279..5aac6c57b13 100644 --- a/ydb/core/fq/libs/row_dispatcher/topic_session.cpp +++ b/ydb/core/fq/libs/row_dispatcher/topic_session.cpp @@ -57,8 +57,7 @@ struct TEvPrivate { EvBegin = EventSpaceBegin(TEvents::ES_PRIVATE), EvPqEventsReady = EvBegin + 10, EvCreateSession, - EvSendStatisticToReadActor, - EvSendStatisticToRowDispatcher, + EvSendStatistic, EvReconnectSession, EvEnd }; @@ -67,8 +66,7 @@ struct TEvPrivate { // Events struct TEvPqEventsReady : public TEventLocal<TEvPqEventsReady, EvPqEventsReady> {}; struct TEvCreateSession : public TEventLocal<TEvCreateSession, EvCreateSession> {}; - struct TEvSendStatisticToRowDispatcher : public TEventLocal<TEvSendStatisticToRowDispatcher, EvSendStatisticToRowDispatcher> {}; - struct TEvSendStatisticToReadActor : public TEventLocal<TEvSendStatisticToReadActor, EvSendStatisticToReadActor> {}; + struct TEvSendStatistic : public TEventLocal<TEvSendStatistic, EvSendStatistic> {}; struct TEvReconnectSession : public TEventLocal<TEvReconnectSession, EvReconnectSession> {}; }; @@ -96,7 +94,7 @@ private: struct TClientsInfo : public IClientDataConsumer { using TPtr = TIntrusivePtr<TClientsInfo>; - TClientsInfo(TTopicSession& self, const TString& logPrefix, const ITopicFormatHandler::TSettings& handlerSettings, const NFq::TEvRowDispatcher::TEvStartSession::TPtr& ev, const NMonitoring::TDynamicCounterPtr& counters, const TString& readGroup) + TClientsInfo(TTopicSession& self, const TString& logPrefix, const ITopicFormatHandler::TSettings& handlerSettings, const NFq::TEvRowDispatcher::TEvStartSession::TPtr& ev, const NMonitoring::TDynamicCounterPtr& counters, const TString& readGroup, TMaybe<ui64> offset) : Self(self) , LogPrefix(logPrefix) , HandlerSettings(handlerSettings) @@ -104,9 +102,9 @@ private: , ReadActorId(ev->Sender) , Counters(counters) { - if (Settings.HasOffset()) { - NextMessageOffset = Settings.GetOffset(); - InitialOffset = Settings.GetOffset(); + if (offset) { + NextMessageOffset = *offset; + InitialOffset = *offset; } Y_UNUSED(TDuration::TryParse(Settings.GetSource().GetReconnectPeriod(), ReconnectPeriod)); auto queryGroup = Counters->GetSubgroup("query_id", ev->Get()->Record.GetQueryId()); @@ -198,7 +196,7 @@ private: // Metrics ui64 InitialOffset = 0; - TStats Stat; // Send (filtered) to read_actor + TStats FilteredStat; const ::NMonitoring::TDynamicCounterPtr Counters; NMonitoring::TDynamicCounters::TCounterPtr FilteredDataRate; // filtered NMonitoring::TDynamicCounters::TCounterPtr RestartSessionByOffsetsByQuery; @@ -252,8 +250,7 @@ private: // Metrics TInstant WaitEventStartedAt; ui64 RestartSessionByOffsets = 0; - TStats SessionStats; - TStats ClientsStats; + TStats Statistics; TTopicSessionMetrics Metrics; const ::NMonitoring::TDynamicCounterPtr Counters; const ::NMonitoring::TDynamicCounterPtr CountersRoot; @@ -300,16 +297,16 @@ private: void Handle(NFq::TEvPrivate::TEvPqEventsReady::TPtr&); void Handle(NFq::TEvPrivate::TEvCreateSession::TPtr&); void Handle(NFq::TEvPrivate::TEvReconnectSession::TPtr&); - void Handle(NFq::TEvPrivate::TEvSendStatisticToReadActor::TPtr&); - void Handle(NFq::TEvPrivate::TEvSendStatisticToRowDispatcher::TPtr&); + void Handle(NFq::TEvPrivate::TEvSendStatistic::TPtr&); void Handle(TEvRowDispatcher::TEvGetNextBatch::TPtr&); void Handle(NFq::TEvRowDispatcher::TEvStopSession::TPtr& ev); void Handle(NFq::TEvRowDispatcher::TEvStartSession::TPtr& ev); void HandleException(const std::exception& err); - void SendStatisticToRowDispatcher(); - void SendSessionError(TActorId readActorId, TStatus status); + void SendStatistics(); bool CheckNewClient(NFq::TEvRowDispatcher::TEvStartSession::TPtr& ev); + TMaybe<ui64> GetOffset(const NFq::NRowDispatcherProto::TEvStartSession& settings); + void SendSessionError(TActorId readActorId, TStatus status); void RestartSessionIfOldestClient(const TClientsInfo& info); private: @@ -317,8 +314,7 @@ private: STRICT_STFUNC_EXC(StateFunc, hFunc(NFq::TEvPrivate::TEvPqEventsReady, Handle); hFunc(NFq::TEvPrivate::TEvCreateSession, Handle); - hFunc(NFq::TEvPrivate::TEvSendStatisticToReadActor, Handle); - hFunc(NFq::TEvPrivate::TEvSendStatisticToRowDispatcher, Handle); + hFunc(NFq::TEvPrivate::TEvSendStatistic, Handle); hFunc(NFq::TEvPrivate::TEvReconnectSession, Handle); hFunc(TEvRowDispatcher::TEvGetNextBatch, Handle); hFunc(NFq::TEvRowDispatcher::TEvStartSession, Handle); @@ -331,11 +327,10 @@ private: cFunc(TEvents::TEvPoisonPill::EventType, PassAway); IgnoreFunc(NFq::TEvPrivate::TEvPqEventsReady); IgnoreFunc(NFq::TEvPrivate::TEvCreateSession); - IgnoreFunc(NFq::TEvPrivate::TEvSendStatisticToReadActor); IgnoreFunc(TEvRowDispatcher::TEvGetNextBatch); IgnoreFunc(NFq::TEvRowDispatcher::TEvStartSession); IgnoreFunc(NFq::TEvRowDispatcher::TEvStopSession); - IgnoreFunc(NFq::TEvPrivate::TEvSendStatisticToRowDispatcher);, + IgnoreFunc(NFq::TEvPrivate::TEvSendStatistic);, ExceptionFunc(std::exception, HandleException) ) }; @@ -380,8 +375,7 @@ void TTopicSession::Bootstrap() { LOG_ROW_DISPATCHER_DEBUG("Bootstrap " << TopicPathPartition << ", Timeout " << Config.GetTimeoutBeforeStartSessionSec() << " sec, StatusPeriod " << Config.GetSendStatusPeriodSec() << " sec"); Y_ENSURE(Config.GetSendStatusPeriodSec() > 0); - Schedule(TDuration::Seconds(Config.GetSendStatusPeriodSec()), new NFq::TEvPrivate::TEvSendStatisticToReadActor()); - Schedule(TDuration::Seconds(SendStatisticPeriodSec), new NFq::TEvPrivate::TEvSendStatisticToRowDispatcher()); + Schedule(TDuration::Seconds(SendStatisticPeriodSec), new NFq::TEvPrivate::TEvSendStatistic()); } void TTopicSession::PassAway() { @@ -497,27 +491,6 @@ void TTopicSession::Handle(NFq::TEvPrivate::TEvCreateSession::TPtr&) { CreateTopicSession(); } -void TTopicSession::Handle(NFq::TEvPrivate::TEvSendStatisticToReadActor::TPtr&) { - LOG_ROW_DISPATCHER_TRACE("TEvSendStatisticToReadActor"); - Schedule(TDuration::Seconds(Config.GetSendStatusPeriodSec()), new NFq::TEvPrivate::TEvSendStatisticToReadActor()); - - auto readBytes = ClientsStats.Bytes; - for (auto& [actorId, infoPtr] : Clients) { - auto& info = *infoPtr; - if (!info.ProcessedNextMessageOffset) { - continue; - } - auto event = std::make_unique<TEvRowDispatcher::TEvStatistics>(); - event->Record.SetPartitionId(PartitionId); - event->Record.SetNextMessageOffset(*info.ProcessedNextMessageOffset); - event->Record.SetReadBytes(readBytes); - event->ReadActorId = info.ReadActorId; - LOG_ROW_DISPATCHER_TRACE("Send status to " << info.ReadActorId << ", offset " << info.ProcessedNextMessageOffset); - Send(RowDispatcherActorId, event.release()); - } - ClientsStats.Clear(); -} - void TTopicSession::Handle(NFq::TEvPrivate::TEvReconnectSession::TPtr&) { Metrics.ReconnectRate->Inc(); TInstant minTime = GetMinStartingMessageTimestamp(); @@ -581,8 +554,7 @@ void TTopicSession::TTopicEventProcessor::operator()(NYdb::NTopic::TReadSessionE Self.LastMessageOffset = message.GetOffset(); } - Self.SessionStats.Add(dataSize, event.GetMessages().size()); - Self.ClientsStats.Add(dataSize, event.GetMessages().size()); + Self.Statistics.Add(dataSize, event.GetMessages().size()); Self.Metrics.SessionDataRate->Add(dataSize); Self.Metrics.AllSessionsDataRate->Add(dataSize); DataReceivedEventSize += dataSize; @@ -693,15 +665,16 @@ void TTopicSession::SendData(TClientsInfo& info) { info.UnreadRows = 0; info.UnreadBytes = 0; - info.Stat.Add(dataSize, eventsSize); + info.FilteredStat.Add(dataSize, eventsSize); info.FilteredDataRate->Add(dataSize); info.ProcessedNextMessageOffset = *info.NextMessageOffset; } void TTopicSession::StartClientSession(TClientsInfo& info) { if (ReadSession) { - if (info.Settings.HasOffset() && info.Settings.GetOffset() <= LastMessageOffset) { - LOG_ROW_DISPATCHER_INFO("New client has less offset (" << info.Settings.GetOffset() << ") than the last message (" << LastMessageOffset << "), stop (restart) topic session"); + auto offset = GetOffset(info.Settings); + if (offset && offset <= LastMessageOffset) { + LOG_ROW_DISPATCHER_INFO("New client has less offset (" << offset << ") than the last message (" << LastMessageOffset << "), stop (restart) topic session"); Metrics.RestartSessionByOffsets->Inc(); ++RestartSessionByOffsets; info.RestartSessionByOffsetsByQuery->Inc(); @@ -715,8 +688,9 @@ void TTopicSession::StartClientSession(TClientsInfo& info) { } void TTopicSession::Handle(NFq::TEvRowDispatcher::TEvStartSession::TPtr& ev) { + auto offset = GetOffset(ev->Get()->Record); const auto& source = ev->Get()->Record.GetSource(); - LOG_ROW_DISPATCHER_INFO("New client: read actor id " << ev->Sender.ToString() << ", predicate: '" << source.GetPredicate() << "', offset: " << ev->Get()->Record.GetOffset()); + LOG_ROW_DISPATCHER_INFO("New client: read actor id " << ev->Sender.ToString() << ", predicate: " << source.GetPredicate() << ", offset: " << offset); if (!CheckNewClient(ev)) { return; @@ -725,7 +699,7 @@ void TTopicSession::Handle(NFq::TEvRowDispatcher::TEvStartSession::TPtr& ev) { const TString& format = source.GetFormat(); ITopicFormatHandler::TSettings handlerSettings = {.ParsingFormat = format ? format : "raw"}; - auto clientInfo = Clients.insert({ev->Sender, MakeIntrusive<TClientsInfo>(*this, LogPrefix, handlerSettings, ev, Counters, ReadGroup)}).first->second; + auto clientInfo = Clients.insert({ev->Sender, MakeIntrusive<TClientsInfo>(*this, LogPrefix, handlerSettings, ev, Counters, ReadGroup, offset)}).first->second; auto formatIt = FormatHandlers.find(handlerSettings); if (formatIt == FormatHandlers.end()) { formatIt = FormatHandlers.insert({handlerSettings, CreateTopicFormatHandler( @@ -742,12 +716,11 @@ void TTopicSession::Handle(NFq::TEvRowDispatcher::TEvStartSession::TPtr& ev) { } ConsumerName = source.GetConsumerName(); - SendStatisticToRowDispatcher(); + SendStatistics(); } void TTopicSession::Handle(NFq::TEvRowDispatcher::TEvStopSession::TPtr& ev) { - LOG_ROW_DISPATCHER_DEBUG("TEvStopSession from " << ev->Sender << " topicPath " << ev->Get()->Record.GetSource().GetTopicPath() << - " partitionId " << ev->Get()->Record.GetPartitionId() << " clients count " << Clients.size()); + LOG_ROW_DISPATCHER_DEBUG("TEvStopSession from " << ev->Sender << " topicPath " << ev->Get()->Record.GetSource().GetTopicPath() << " clients count " << Clients.size()); auto it = Clients.find(ev->Sender); if (it == Clients.end()) { @@ -824,7 +797,6 @@ void TTopicSession::SendSessionError(TActorId readActorId, TStatus status) { auto event = std::make_unique<TEvRowDispatcher::TEvSessionError>(); event->Record.SetStatusCode(status.GetStatus()); NYql::IssuesToMessage(status.GetErrorDescription(), event->Record.MutableIssues()); - event->Record.SetPartitionId(PartitionId); event->ReadActorId = readActorId; Send(RowDispatcherActorId, event.release()); } @@ -858,15 +830,15 @@ void TTopicSession::HandleException(const std::exception& e) { FatalError(TStatus::Fail(EStatusId::INTERNAL_ERROR, TStringBuilder() << "Session error, got unexpected exception: " << e.what())); } -void TTopicSession::SendStatisticToRowDispatcher() { +void TTopicSession::SendStatistics() { + LOG_ROW_DISPATCHER_TRACE("SendStatistics"); TTopicSessionStatistic sessionStatistic; auto& commonStatistic = sessionStatistic.Common; commonStatistic.UnreadBytes = UnreadBytes; commonStatistic.RestartSessionByOffsets = RestartSessionByOffsets; - commonStatistic.ReadBytes = SessionStats.Bytes; - commonStatistic.ReadEvents = SessionStats.Events; + commonStatistic.ReadBytes = Statistics.Bytes; + commonStatistic.ReadEvents = Statistics.Events; commonStatistic.LastReadedOffset = LastMessageOffset; - SessionStats.Clear(); sessionStatistic.SessionKey = TTopicSessionParams{ReadGroup, Endpoint, Database, TopicPath, PartitionId}; sessionStatistic.Clients.reserve(Clients.size()); @@ -877,12 +849,13 @@ void TTopicSession::SendStatisticToRowDispatcher() { clientStatistic.ReadActorId = readActorId; clientStatistic.UnreadRows = info.UnreadRows; clientStatistic.UnreadBytes = info.UnreadBytes; - clientStatistic.Offset = info.NextMessageOffset.GetOrElse(0); - clientStatistic.ReadBytes = info.Stat.Bytes; + clientStatistic.Offset = info.ProcessedNextMessageOffset.GetOrElse(0); + clientStatistic.FilteredReadBytes = info.FilteredStat.Bytes; + clientStatistic.ReadBytes = Statistics.Bytes; clientStatistic.IsWaiting = LastMessageOffset + 1 < info.NextMessageOffset.GetOrElse(0); clientStatistic.ReadLagMessages = info.NextMessageOffset.GetOrElse(0) - LastMessageOffset - 1; clientStatistic.InitialOffset = info.InitialOffset; - info.Stat.Clear(); + info.FilteredStat.Clear(); sessionStatistic.Clients.emplace_back(std::move(clientStatistic)); } @@ -893,11 +866,12 @@ void TTopicSession::SendStatisticToRowDispatcher() { auto event = std::make_unique<TEvRowDispatcher::TEvSessionStatistic>(sessionStatistic); Send(RowDispatcherActorId, event.release()); + Statistics.Clear(); } -void TTopicSession::Handle(NFq::TEvPrivate::TEvSendStatisticToRowDispatcher::TPtr&) { - Schedule(TDuration::Seconds(SendStatisticPeriodSec), new NFq::TEvPrivate::TEvSendStatisticToRowDispatcher()); - SendStatisticToRowDispatcher(); +void TTopicSession::Handle(NFq::TEvPrivate::TEvSendStatistic::TPtr&) { + SendStatistics(); + Schedule(TDuration::Seconds(SendStatisticPeriodSec), new NFq::TEvPrivate::TEvSendStatistic()); } bool TTopicSession::CheckNewClient(NFq::TEvRowDispatcher::TEvStartSession::TPtr& ev) { @@ -918,6 +892,16 @@ bool TTopicSession::CheckNewClient(NFq::TEvRowDispatcher::TEvStartSession::TPtr& return true; } +TMaybe<ui64> TTopicSession::GetOffset(const NFq::NRowDispatcherProto::TEvStartSession& settings) { + for (auto p : settings.GetOffsets()) { + if (p.GetPartitionId() != PartitionId) { + continue; + } + return p.GetOffset(); + } + return Nothing(); +} + } // anonymous namespace //////////////////////////////////////////////////////////////////////////////// diff --git a/ydb/core/fq/libs/row_dispatcher/ut/row_dispatcher_ut.cpp b/ydb/core/fq/libs/row_dispatcher/ut/row_dispatcher_ut.cpp index fcec8b0d673..340195f7328 100644 --- a/ydb/core/fq/libs/row_dispatcher/ut/row_dispatcher_ut.cpp +++ b/ydb/core/fq/libs/row_dispatcher/ut/row_dispatcher_ut.cpp @@ -137,29 +137,27 @@ public: return settings; } - void MockAddSession(const NYql::NPq::NProto::TDqPqTopicSource& source, ui64 partitionId, TActorId readActorId, ui64 generation = 1) { + void MockAddSession(const NYql::NPq::NProto::TDqPqTopicSource& source, const std::set<ui32>& partitionIds, TActorId readActorId, ui64 generation = 1) { auto event = new NFq::TEvRowDispatcher::TEvStartSession( source, - partitionId, // partitionId + partitionIds, "Token", - Nothing(), // readOffset, + {}, // readOffset, 0, // StartingMessageTimestamp; "QueryId"); event->Record.MutableTransportMeta()->SetSeqNo(1); Runtime.Send(new IEventHandle(RowDispatcher, readActorId, event, 0, generation)); } - void MockStopSession(const NYql::NPq::NProto::TDqPqTopicSource& source, ui64 partitionId, TActorId readActorId) { + void MockStopSession(const NYql::NPq::NProto::TDqPqTopicSource& source, TActorId readActorId) { auto event = std::make_unique<NFq::TEvRowDispatcher::TEvStopSession>(); event->Record.MutableSource()->CopyFrom(source); - event->Record.SetPartitionId(partitionId); event->Record.MutableTransportMeta()->SetSeqNo(1); Runtime.Send(new IEventHandle(RowDispatcher, readActorId, event.release(), 0, 1)); } - void MockNoSession(ui64 partitionId, TActorId readActorId) { + void MockNoSession(TActorId readActorId) { auto event = std::make_unique<NFq::TEvRowDispatcher::TEvNoSession>(); - event->Record.SetPartitionId(partitionId); Runtime.Send(new IEventHandle(RowDispatcher, readActorId, event.release(), 0, 1)); } @@ -177,9 +175,8 @@ public: Runtime.Send(new IEventHandle(RowDispatcher, topicSessionId, event.release(), 0, generation)); } - void MockSessionError(ui64 partitionId, TActorId topicSessionId, TActorId readActorId) { + void MockSessionError(TActorId topicSessionId, TActorId readActorId) { auto event = std::make_unique<NFq::TEvRowDispatcher::TEvSessionError>(); - event->Record.SetPartitionId(partitionId); event->ReadActorId = readActorId; Runtime.Send(new IEventHandle(RowDispatcher, topicSessionId, event.release())); } @@ -201,10 +198,9 @@ public: UNIT_ASSERT(eventHolder.Get() != nullptr); } - void ExpectStopSession(NActors::TActorId actorId, ui64 partitionId) { + void ExpectStopSession(NActors::TActorId actorId) { auto eventHolder = Runtime.GrabEdgeEvent<NFq::TEvRowDispatcher::TEvStopSession>(actorId); UNIT_ASSERT(eventHolder.Get() != nullptr); - UNIT_ASSERT(eventHolder->Get()->Record.GetPartitionId() == partitionId); } void ExpectGetNextBatch(NActors::TActorId topicSessionId, ui64 partitionId) { @@ -230,10 +226,9 @@ public: UNIT_ASSERT(eventHolder.Get() != nullptr); } - void ExpectSessionError(NActors::TActorId readActorId, ui64 partitionId) { + void ExpectSessionError(NActors::TActorId readActorId) { auto eventHolder = Runtime.GrabEdgeEvent<NFq::TEvRowDispatcher::TEvSessionError>(readActorId); UNIT_ASSERT(eventHolder.Get() != nullptr); - UNIT_ASSERT(eventHolder->Get()->Record.GetPartitionId() == partitionId); } NActors::TActorId ExpectRegisterTopicSession() { @@ -267,51 +262,51 @@ public: NYql::NPq::NProto::TDqPqTopicSource Source2 = BuildPqTopicSourceSettings("Endpoint2", "Database1", "topic", "connection_id1"); NYql::NPq::NProto::TDqPqTopicSource Source1Connection2 = BuildPqTopicSourceSettings("Endpoint1", "Database1", "topic", "connection_id2"); - ui64 PartitionId0 = 0; - ui64 PartitionId1 = 1; + ui32 PartitionId0 = 0; + ui32 PartitionId1 = 1; }; Y_UNIT_TEST_SUITE(RowDispatcherTests) { Y_UNIT_TEST_F(OneClientOneSession, TFixture) { - MockAddSession(Source1, PartitionId0, ReadActorId1); + MockAddSession(Source1, {PartitionId0}, ReadActorId1); auto topicSessionId = ExpectRegisterTopicSession(); ExpectStartSessionAck(ReadActorId1); ExpectStartSession(topicSessionId); ProcessData(ReadActorId1, PartitionId0, topicSessionId); - MockStopSession(Source1, PartitionId0, ReadActorId1); - ExpectStopSession(topicSessionId, PartitionId0); + MockStopSession(Source1, ReadActorId1); + ExpectStopSession(topicSessionId); } Y_UNIT_TEST_F(TwoClientOneSession, TFixture) { - MockAddSession(Source1, PartitionId0, ReadActorId1); + MockAddSession(Source1, {PartitionId0}, ReadActorId1); auto topicSessionId = ExpectRegisterTopicSession(); ExpectStartSessionAck(ReadActorId1); ExpectStartSession(topicSessionId); - MockAddSession(Source1, PartitionId0, ReadActorId2); + MockAddSession(Source1, {PartitionId0}, ReadActorId2); ExpectStartSessionAck(ReadActorId2); ExpectStartSession(topicSessionId); ProcessData(ReadActorId1, PartitionId0, topicSessionId); ProcessData(ReadActorId2, PartitionId0, topicSessionId); - MockSessionError(PartitionId0, topicSessionId, ReadActorId1); - ExpectSessionError(ReadActorId1, PartitionId0); + MockSessionError(topicSessionId, ReadActorId1); + ExpectSessionError(ReadActorId1); - MockSessionError(PartitionId0, topicSessionId, ReadActorId2); - ExpectSessionError(ReadActorId2, PartitionId0); + MockSessionError(topicSessionId, ReadActorId2); + ExpectSessionError(ReadActorId2); } Y_UNIT_TEST_F(SessionError, TFixture) { - MockAddSession(Source1, PartitionId0, ReadActorId1); + MockAddSession(Source1, {PartitionId0}, ReadActorId1); auto topicSessionId = ExpectRegisterTopicSession(); ExpectStartSessionAck(ReadActorId1); ExpectStartSession(topicSessionId); - MockSessionError(PartitionId0, topicSessionId, ReadActorId1); - ExpectSessionError(ReadActorId1, PartitionId0); + MockSessionError(topicSessionId, ReadActorId1); + ExpectSessionError(ReadActorId1); } Y_UNIT_TEST_F(CoordinatorSubscribe, TFixture) { @@ -342,24 +337,18 @@ Y_UNIT_TEST_SUITE(RowDispatcherTests) { Y_UNIT_TEST_F(TwoClients4Sessions, TFixture) { - MockAddSession(Source1, PartitionId0, ReadActorId1); + MockAddSession(Source1, {PartitionId0, PartitionId1}, ReadActorId1); auto topicSession1 = ExpectRegisterTopicSession(); - ExpectStartSessionAck(ReadActorId1); - ExpectStartSession(topicSession1); - - MockAddSession(Source1, PartitionId1, ReadActorId1); auto topicSession2 = ExpectRegisterTopicSession(); ExpectStartSessionAck(ReadActorId1); + ExpectStartSession(topicSession1); ExpectStartSession(topicSession2); - MockAddSession(Source2, PartitionId0, ReadActorId2); + MockAddSession(Source2, {PartitionId0, PartitionId1}, ReadActorId2); auto topicSession3 = ExpectRegisterTopicSession(); - ExpectStartSessionAck(ReadActorId2); - ExpectStartSession(topicSession3); - - MockAddSession(Source2, PartitionId1, ReadActorId2); auto topicSession4 = ExpectRegisterTopicSession(); ExpectStartSessionAck(ReadActorId2); + ExpectStartSession(topicSession3); ExpectStartSession(topicSession4); ProcessData(ReadActorId1, PartitionId0, topicSession1); @@ -367,69 +356,75 @@ Y_UNIT_TEST_SUITE(RowDispatcherTests) { ProcessData(ReadActorId2, PartitionId0, topicSession3); ProcessData(ReadActorId2, PartitionId1, topicSession4); - MockSessionError(PartitionId0, topicSession1, ReadActorId1); - ExpectSessionError(ReadActorId1, PartitionId0); + MockSessionError(topicSession1, ReadActorId1); + ExpectSessionError(ReadActorId1); ProcessData(ReadActorId1, PartitionId1, topicSession2); ProcessData(ReadActorId2, PartitionId0, topicSession3); ProcessData(ReadActorId2, PartitionId1, topicSession4); - MockStopSession(Source1, PartitionId1, ReadActorId1); - ExpectStopSession(topicSession2, PartitionId1); + MockStopSession(Source1, ReadActorId1); + ExpectStopSession(topicSession2); - MockStopSession(Source2, PartitionId0, ReadActorId2); - ExpectStopSession(topicSession3, PartitionId0); + MockStopSession(Source2, ReadActorId2); + ExpectStopSession(topicSession3); - MockStopSession(Source2, PartitionId1, ReadActorId2); - ExpectStopSession(topicSession4, PartitionId1); + MockStopSession(Source2, ReadActorId2); + ExpectStopSession(topicSession4); // Ignore data after StopSession MockMessageBatch(PartitionId1, topicSession4, ReadActorId2, 1); } Y_UNIT_TEST_F(ReinitConsumerIfNewGeneration, TFixture) { - MockAddSession(Source1, PartitionId0, ReadActorId1, 1); + MockAddSession(Source1, {PartitionId0}, ReadActorId1, 1); auto topicSessionId = ExpectRegisterTopicSession(); ExpectStartSessionAck(ReadActorId1); ExpectStartSession(topicSessionId); ProcessData(ReadActorId1, PartitionId0, topicSessionId); // ignore StartSession with same generation - MockAddSession(Source1, PartitionId0, ReadActorId1, 1); + MockAddSession(Source1, {PartitionId0}, ReadActorId1, 1); // reinit consumer - MockAddSession(Source1, PartitionId0, ReadActorId1, 2); + MockAddSession(Source1, {PartitionId0}, ReadActorId1, 2); ExpectStartSessionAck(ReadActorId1, 2); } Y_UNIT_TEST_F(HandleTEvUndelivered, TFixture) { - MockAddSession(Source1, PartitionId0, ReadActorId1, 1); + MockAddSession(Source1, {PartitionId0, PartitionId1}, ReadActorId1, 1); auto topicSession1 = ExpectRegisterTopicSession(); + auto topicSession2 = ExpectRegisterTopicSession(); ExpectStartSessionAck(ReadActorId1, 1); ExpectStartSession(topicSession1); + ExpectStartSession(topicSession2); - MockAddSession(Source1, PartitionId1, ReadActorId1, 2); - auto topicSession2 = ExpectRegisterTopicSession(); - ExpectStartSessionAck(ReadActorId1, 2); + MockAddSession(Source1, {PartitionId0, PartitionId1}, ReadActorId2, 1); + ExpectStartSessionAck(ReadActorId2, 1); + ExpectStartSession(topicSession1); ExpectStartSession(topicSession2); ProcessData(ReadActorId1, PartitionId0, topicSession1, 1); - ProcessData(ReadActorId1, PartitionId1, topicSession2, 2); - - MockUndelivered(ReadActorId1, 2); - ExpectStopSession(topicSession2, PartitionId1); + ProcessData(ReadActorId1, PartitionId1, topicSession2, 1); + ProcessData(ReadActorId2, PartitionId0, topicSession1, 1); + ProcessData(ReadActorId2, PartitionId1, topicSession2, 1); MockUndelivered(ReadActorId1, 1); - ExpectStopSession(topicSession1, PartitionId0); + ExpectStopSession(topicSession1); + ExpectStopSession(topicSession2); + + MockUndelivered(ReadActorId2, 1); + ExpectStopSession(topicSession1); + ExpectStopSession(topicSession2); } Y_UNIT_TEST_F(TwoClientTwoConnection, TFixture) { - MockAddSession(Source1, PartitionId0, ReadActorId1); + MockAddSession(Source1, {PartitionId0}, ReadActorId1); auto session1 = ExpectRegisterTopicSession(); ExpectStartSessionAck(ReadActorId1); ExpectStartSession(session1); - MockAddSession(Source1Connection2, PartitionId0, ReadActorId2); + MockAddSession(Source1Connection2, {PartitionId0}, ReadActorId2); auto session2 = ExpectRegisterTopicSession(); ExpectStartSessionAck(ReadActorId2); ExpectStartSession(session2); @@ -437,22 +432,22 @@ Y_UNIT_TEST_SUITE(RowDispatcherTests) { ProcessData(ReadActorId1, PartitionId0, session1); ProcessData(ReadActorId2, PartitionId0, session2); - MockStopSession(Source1, PartitionId0, ReadActorId1); - ExpectStopSession(session1, PartitionId0); + MockStopSession(Source1, ReadActorId1); + ExpectStopSession(session1); - MockStopSession(Source1Connection2, PartitionId0, ReadActorId2); - ExpectStopSession(session2, PartitionId0); + MockStopSession(Source1Connection2, ReadActorId2); + ExpectStopSession(session2); } Y_UNIT_TEST_F(ProcessNoSession, TFixture) { - MockAddSession(Source1, PartitionId0, ReadActorId3); + MockAddSession(Source1, {PartitionId0}, ReadActorId3); auto topicSessionId = ExpectRegisterTopicSession(); ExpectStartSessionAck(ReadActorId3); ExpectStartSession(topicSessionId); ProcessData(ReadActorId3, PartitionId0, topicSessionId); - MockNoSession(PartitionId0, ReadActorId3); - ExpectStopSession(topicSessionId, PartitionId0); + MockNoSession(ReadActorId3); + ExpectStopSession(topicSessionId); } } diff --git a/ydb/core/fq/libs/row_dispatcher/ut/topic_session_ut.cpp b/ydb/core/fq/libs/row_dispatcher/ut/topic_session_ut.cpp index 379fe1528fb..75af42af21b 100644 --- a/ydb/core/fq/libs/row_dispatcher/ut/topic_session_ut.cpp +++ b/ydb/core/fq/libs/row_dispatcher/ut/topic_session_ut.cpp @@ -88,11 +88,15 @@ public: } void StartSession(TActorId readActorId, const NYql::NPq::NProto::TDqPqTopicSource& source, TMaybe<ui64> readOffset = Nothing(), bool expectedError = false) { + std::map<ui32, ui64> readOffsets; + if (readOffset) { + readOffsets[PartitionId] = *readOffset; + } auto event = new NFq::TEvRowDispatcher::TEvStartSession( source, - PartitionId, + {PartitionId}, "Token", - readOffset, // readOffset, + readOffsets, 0, // StartingMessageTimestamp; "QueryId"); Runtime.Send(new IEventHandle(TopicSession, readActorId, event)); @@ -131,7 +135,6 @@ public: void StopSession(NActors::TActorId readActorId, const NYql::NPq::NProto::TDqPqTopicSource& source) { auto event = std::make_unique<NFq::TEvRowDispatcher::TEvStopSession>(); *event->Record.MutableSource() = source; - event->Record.SetPartitionId(PartitionId); Runtime.Send(new IEventHandle(TopicSession, readActorId, event.release())); } @@ -183,15 +186,30 @@ public: return numberMessages; } - void ExpectStatisticToReadActor(TSet<NActors::TActorId> readActorIds, ui64 expectedNextMessageOffset) { - size_t count = readActorIds.size(); - for (size_t i = 0; i < count; ++i) { - auto eventHolder = Runtime.GrabEdgeEvent<TEvRowDispatcher::TEvStatistics>(RowDispatcherActorId, TDuration::Seconds(GrabTimeoutSec)); + void ExpectStatistics(TMap<NActors::TActorId, ui64> clients) { + auto check = [&]() -> bool { + auto eventHolder = Runtime.GrabEdgeEvent<TEvRowDispatcher::TEvSessionStatistic>(RowDispatcherActorId, TDuration::Seconds(GrabTimeoutSec)); UNIT_ASSERT(eventHolder.Get() != nullptr); - UNIT_ASSERT(readActorIds.contains(eventHolder->Get()->ReadActorId)); - readActorIds.erase(eventHolder->Get()->ReadActorId); - UNIT_ASSERT_VALUES_EQUAL(eventHolder->Get()->Record.GetNextMessageOffset(), expectedNextMessageOffset); + if (clients.size() != eventHolder->Get()->Stat.Clients.size()) { + return false; + } + for (const auto& client : eventHolder->Get()->Stat.Clients) { + if (!clients.contains(client.ReadActorId)) { + return false; + } + if (clients[client.ReadActorId] != client.Offset) { + return false; + } + } + return true; + }; + auto start = TInstant::Now(); + while (TInstant::Now() - start < TDuration::Seconds(5)) { + if (check()) { + return; + } } + UNIT_FAIL("ExpectStatistics timeout"); } static TRow JsonMessage(ui64 index) { @@ -250,7 +268,7 @@ public: NActors::TActorId ReadActorId1; NActors::TActorId ReadActorId2; NActors::TActorId ReadActorId3; - ui64 PartitionId = 0; + ui32 PartitionId = 0; NConfig::TRowDispatcherConfig Config; TIntrusivePtr<IMockPqGateway> MockPqGateway; @@ -273,22 +291,26 @@ Y_UNIT_TEST_SUITE(TopicSessionTests) { Init(topicName); auto source = BuildSource(); StartSession(ReadActorId1, source); + ExpectStatistics({{ReadActorId1, 0}}); StartSession(ReadActorId2, source); + ExpectStatistics({{ReadActorId1, 0}, {ReadActorId2, 0}}); std::vector<TString> data = { Json1 }; PQWrite(data); ExpectNewDataArrived({ReadActorId1, ReadActorId2}); + ExpectMessageBatch(ReadActorId1, { JsonMessage(1) }); ExpectMessageBatch(ReadActorId2, { JsonMessage(1) }); - ExpectStatisticToReadActor({ReadActorId1, ReadActorId2}, 1); + ExpectStatistics({{ReadActorId1, 1}, {ReadActorId2, 1}}); data = { Json2 }; PQWrite(data); ExpectNewDataArrived({ReadActorId1, ReadActorId2}); - ExpectStatisticToReadActor({ReadActorId1, ReadActorId2}, 1); + + ExpectStatistics({{ReadActorId1, 1}, {ReadActorId2, 1}}); ExpectMessageBatch(ReadActorId1, { JsonMessage(2) }); ExpectMessageBatch(ReadActorId2, { JsonMessage(2) }); - ExpectStatisticToReadActor({ReadActorId1, ReadActorId2}, 2); + ExpectStatistics({{ReadActorId1, 2}, {ReadActorId2, 2}}); auto source2 = BuildSource(false, "OtherConsumer"); StartSession(ReadActorId3, source2, Nothing(), true); diff --git a/ydb/library/yql/providers/pq/async_io/dq_pq_rd_read_actor.cpp b/ydb/library/yql/providers/pq/async_io/dq_pq_rd_read_actor.cpp index 4b39715b2fe..6f0a1b9f628 100644 --- a/ydb/library/yql/providers/pq/async_io/dq_pq_rd_read_actor.cpp +++ b/ydb/library/yql/providers/pq/async_io/dq_pq_rd_read_actor.cpp @@ -83,6 +83,7 @@ struct TRowDispatcherReadActorMetrics { auto task = source->GetSubgroup("task_id", ToString(taskId)); InFlyGetNextBatch = task->GetCounter("InFlyGetNextBatch"); InFlyAsyncInputData = task->GetCounter("InFlyAsyncInputData"); + ReInit = task->GetCounter("ReInit", true); } ~TRowDispatcherReadActorMetrics() { @@ -94,6 +95,7 @@ struct TRowDispatcherReadActorMetrics { ::NMonitoring::TDynamicCounterPtr SubGroup; ::NMonitoring::TDynamicCounters::TCounterPtr InFlyGetNextBatch; ::NMonitoring::TDynamicCounters::TCounterPtr InFlyAsyncInputData; + ::NMonitoring::TDynamicCounters::TCounterPtr ReInit; }; struct TEvPrivate { @@ -111,9 +113,7 @@ struct TEvPrivate { class TDqPqRdReadActor : public NActors::TActor<TDqPqRdReadActor>, public NYql::NDq::NInternal::TDqPqReadActorBase { const ui64 PrintStatePeriodSec = 300; - const ui64 ProcessStatePeriodSec = 2; - - using TDebugOffsets = TMaybe<std::pair<ui64, ui64>>; + const ui64 SleepPeriodSec = 2; struct TReadyBatch { public: @@ -164,53 +164,68 @@ private: NActors::TActorId LocalRowDispatcherActorId; std::queue<TReadyBatch> ReadyBuffer; EState State = EState::INIT; + bool Inited = false; ui64 CoordinatorRequestCookie = 0; TRowDispatcherReadActorMetrics Metrics; - bool SchedulePrintStatePeriod = false; bool ProcessStateScheduled = false; bool InFlyAsyncInputData = false; TCounters Counters; - // Parsing info std::vector<std::optional<ui64>> ColumnIndexes; // Output column index in schema passed into RowDispatcher const TType* InputDataType = nullptr; // Multi type (comes from Row Dispatcher) std::unique_ptr<NKikimr::NMiniKQL::TValuePackerTransport<true>> DataUnpacker; - struct SessionInfo { + THashMap<ui32, TMaybe<ui64>> NextOffsetFromRD; + + struct TPartition { + bool HasPendingData = false; + bool IsWaitingMessageBatch = false; + }; + + struct TSession { enum class ESessionStatus { - NoSession, - Started, + INIT, + WAIT_START_SESSION_ACK, + STARTED, }; - SessionInfo( + + TSession( const TTxId& txId, const NActors::TActorId selfId, TActorId rowDispatcherActorId, - ui64 partitionId, ui64 eventQueueId, ui64 generation) - : RowDispatcherActorId(rowDispatcherActorId) - , PartitionId(partitionId) - , Generation(generation) { - EventsQueue.Init(txId, selfId, selfId, eventQueueId, /* KeepAlive */ true); + : TxId(txId) + , SelfId(selfId) + , EventQueueId(eventQueueId) + , RowDispatcherActorId(rowDispatcherActorId) + , Generation(generation) + { + EventsQueue.Init(TxId, SelfId, SelfId, EventQueueId, /* KeepAlive */ true); EventsQueue.OnNewRecipientId(rowDispatcherActorId); } - ESessionStatus Status = ESessionStatus::NoSession; - ui64 NextOffset = 0; - bool IsWaitingStartSessionAck = false; + ESessionStatus Status = ESessionStatus::INIT; + const TTxId TxId; + const NActors::TActorId SelfId; + const ui64 EventQueueId; NYql::NDq::TRetryEventsQueue EventsQueue; - bool HasPendingData = false; - bool IsWaitingMessageBatch = false; TActorId RowDispatcherActorId; - ui64 PartitionId; - ui64 Generation; + ui64 Generation = std::numeric_limits<ui64>::max(); + THashMap<ui32, TPartition> Partitions; + bool IsWaitingStartSessionAck = false; }; - - TMap<ui64, SessionInfo> Sessions; + + TMap<NActors::TActorId, TSession> Sessions; + THashMap<ui64, NActors::TActorId> ReadActorByEventQueueId; const THolderFactory& HolderFactory; const i64 MaxBufferSize; i64 ReadyBufferSizeBytes = 0; ui64 NextGeneration = 0; + ui64 NextEventQueueId = 0; + + TMap<NActors::TActorId, TSet<ui32>> LastUsedPartitionDistribution; + TMap<NActors::TActorId, TSet<ui32>> LastReceivedPartitionDistribution; public: TDqPqRdReadActor( @@ -276,16 +291,22 @@ public: std::vector<ui64> GetPartitionsToRead() const; void AddMessageBatch(TRope&& serializedBatch, NKikimr::NMiniKQL::TUnboxedValueBatch& buffer); void ProcessState(); + void StopSession(TSession& sessionInfo); void Stop(NDqProto::StatusIds::StatusCode status, TIssues issues); - void StopSessions(); void ReInit(const TString& reason); void PrintInternalState(); + void TrySendGetNextBatch(TSession& sessionInfo); TString GetInternalState(); - void TrySendGetNextBatch(SessionInfo& sessionInfo); template <class TEventPtr> - bool CheckSession(SessionInfo& session, const TEventPtr& ev, ui64 partitionId); - void SendNoSession(const NActors::TActorId& recipient, ui64 partitionId, ui64 cookie); + TSession* FindSession(const TEventPtr& ev); + void SendNoSession(const NActors::TActorId& recipient, ui64 cookie); void NotifyCA(); + void SendStartSession(TSession& sessionInfo); + void Init(); + void Sleep(); + void ProcessGlobalState(); + void ProcessSessionsState(); + void UpdateSessions(); }; TDqPqRdReadActor::TDqPqRdReadActor( @@ -332,36 +353,48 @@ TDqPqRdReadActor::TDqPqRdReadActor( DataUnpacker = std::make_unique<NKikimr::NMiniKQL::TValuePackerTransport<true>>(InputDataType); IngressStats.Level = statsLevel; - SRC_LOG_I("Start read actor, local row dispatcher " << LocalRowDispatcherActorId.ToString() << ", metadatafields: " << JoinSeq(',', SourceParams.GetMetadataFields())); + SRC_LOG_I("Start read actor, local row dispatcher " << LocalRowDispatcherActorId.ToString() << ", metadatafields: " << JoinSeq(',', SourceParams.GetMetadataFields()) + << ", partitions: " << JoinSeq(',', GetPartitionsToRead())); } -void TDqPqRdReadActor::ProcessState() { +void TDqPqRdReadActor::Init() { + if (Inited) { + return; + } + LogPrefix = (TStringBuilder() << "SelfId: " << SelfId() << ", TxId: " << TxId << ", task: " << TaskId << ". PQ source. "); + + auto partitionToRead = GetPartitionsToRead(); + for (auto partitionId : partitionToRead) { + TPartitionKey partitionKey{TString{}, partitionId}; + const auto offsetIt = PartitionToOffset.find(partitionKey); + auto& nextOffset = NextOffsetFromRD[partitionId]; + if (offsetIt != PartitionToOffset.end()) { + nextOffset = offsetIt->second; + } + } + SRC_LOG_I("Send TEvCoordinatorChangesSubscribe to local RD (" << LocalRowDispatcherActorId << ")"); + Send(LocalRowDispatcherActorId, new NFq::TEvRowDispatcher::TEvCoordinatorChangesSubscribe()); + + Schedule(TDuration::Seconds(PrintStatePeriodSec), new TEvPrivate::TEvPrintState()); + Inited = true; +} + +void TDqPqRdReadActor::ProcessGlobalState() { switch (State) { case EState::INIT: - LogPrefix = (TStringBuilder() << "SelfId: " << SelfId() << ", TxId: " << TxId << ", task: " << TaskId << ". PQ source. "); - if (!ReadyBuffer.empty()) { return; } - if (!ProcessStateScheduled) { - ProcessStateScheduled = true; - Schedule(TDuration::Seconds(ProcessStatePeriodSec), new TEvPrivate::TEvProcessState()); - } if (!CoordinatorActorId) { SRC_LOG_I("Send TEvCoordinatorChangesSubscribe to local row dispatcher, self id " << SelfId()); Send(LocalRowDispatcherActorId, new NFq::TEvRowDispatcher::TEvCoordinatorChangesSubscribe()); - if (!SchedulePrintStatePeriod) { - SchedulePrintStatePeriod = true; - Schedule(TDuration::Seconds(PrintStatePeriodSec), new TEvPrivate::TEvPrintState()); - } + State = EState::WAIT_COORDINATOR_ID; } - State = EState::WAIT_COORDINATOR_ID; [[fallthrough]]; case EState::WAIT_COORDINATOR_ID: { if (!CoordinatorActorId) { return; } - State = EState::WAIT_PARTITIONS_ADDRES; auto partitionToRead = GetPartitionsToRead(); auto cookie = ++CoordinatorRequestCookie; SRC_LOG_I("Send TEvCoordinatorRequest to coordinator " << CoordinatorActorId->ToString() << ", partIds: " @@ -371,70 +404,94 @@ void TDqPqRdReadActor::ProcessState() { new NFq::TEvRowDispatcher::TEvCoordinatorRequest(SourceParams, partitionToRead), IEventHandle::FlagTrackDelivery | IEventHandle::FlagSubscribeOnSession, cookie); - return; + LastReceivedPartitionDistribution.clear(); + State = EState::WAIT_PARTITIONS_ADDRES; + [[fallthrough]]; } case EState::WAIT_PARTITIONS_ADDRES: - if (Sessions.empty()) { - return; - } - - for (auto& [partitionId, sessionInfo] : Sessions) { - if (sessionInfo.Status == SessionInfo::ESessionStatus::NoSession) { - TMaybe<ui64> readOffset; - TPartitionKey partitionKey{TString{}, partitionId}; - const auto offsetIt = PartitionToOffset.find(partitionKey); - if (offsetIt != PartitionToOffset.end()) { - SRC_LOG_D("ReadOffset found" ); - readOffset = offsetIt->second; - } - - SRC_LOG_I("Send TEvStartSession to " << sessionInfo.RowDispatcherActorId - << ", offset " << readOffset - << ", partitionId " << partitionId - << ", connection id " << sessionInfo.Generation); - - auto event = new NFq::TEvRowDispatcher::TEvStartSession( - SourceParams, - partitionId, - Token, - readOffset, - StartingMessageTimestamp.MilliSeconds(), - std::visit([](auto arg) { return ToString(arg); }, TxId)); - sessionInfo.EventsQueue.Send(event, sessionInfo.Generation); - sessionInfo.IsWaitingStartSessionAck = true; - sessionInfo.Status = SessionInfo::ESessionStatus::Started; - } + if (LastReceivedPartitionDistribution.empty()) { + break; } + UpdateSessions(); State = EState::STARTED; - return; + [[fallthrough]]; case EState::STARTED: - return; + break; } } +void TDqPqRdReadActor::ProcessSessionsState() { + for (auto& [rowDispatcherActorId, sessionInfo] : Sessions) { + switch (sessionInfo.Status) { + case TSession::ESessionStatus::INIT: + SendStartSession(sessionInfo); + sessionInfo.IsWaitingStartSessionAck = true; + sessionInfo.Status = TSession::ESessionStatus::WAIT_START_SESSION_ACK; + [[fallthrough]]; + case TSession::ESessionStatus::WAIT_START_SESSION_ACK: + if (sessionInfo.IsWaitingStartSessionAck) { + break; + } + sessionInfo.Status = TSession::ESessionStatus::STARTED; + [[fallthrough]]; + case TSession::ESessionStatus::STARTED: + break; + } + } +} -void TDqPqRdReadActor::CommitState(const NDqProto::TCheckpoint& /*checkpoint*/) { +void TDqPqRdReadActor::ProcessState() { + ProcessGlobalState(); + ProcessSessionsState(); } -void TDqPqRdReadActor::StopSessions() { - SRC_LOG_I("Stop all session"); - for (auto& [partitionId, sessionInfo] : Sessions) { - if (sessionInfo.Status == SessionInfo::ESessionStatus::NoSession) { +void TDqPqRdReadActor::SendStartSession(TSession& sessionInfo) { + auto str = TStringBuilder() << "Send TEvStartSession to " << sessionInfo.RowDispatcherActorId + << ", connection id " << sessionInfo.Generation << " partitions offsets "; + + std::set<ui32> partitions; + std::map<ui32, ui64> partitionOffsets; + for (auto& [partitionId, partition] : sessionInfo.Partitions) { + partitions.insert(partitionId); + auto nextOffset = NextOffsetFromRD[partitionId]; + str << "(" << partitionId << " / "; + if (!nextOffset) { + str << "<empty>),"; continue; } - auto event = std::make_unique<NFq::TEvRowDispatcher::TEvStopSession>(); - *event->Record.MutableSource() = SourceParams; - event->Record.SetPartitionId(partitionId); - SRC_LOG_I("Send StopSession to " << sessionInfo.RowDispatcherActorId); - sessionInfo.EventsQueue.Send(event.release(), sessionInfo.Generation); + partitionOffsets[partitionId] = *nextOffset; + str << nextOffset << "),"; } + SRC_LOG_I(str); + + auto event = new NFq::TEvRowDispatcher::TEvStartSession( + SourceParams, + partitions, + Token, + partitionOffsets, + StartingMessageTimestamp.MilliSeconds(), + std::visit([](auto arg) { return ToString(arg); }, TxId)); + sessionInfo.EventsQueue.Send(event, sessionInfo.Generation); +} + +void TDqPqRdReadActor::CommitState(const NDqProto::TCheckpoint& /*checkpoint*/) { +} + +void TDqPqRdReadActor::StopSession(TSession& sessionInfo) { + SRC_LOG_I("Send StopSession to " << sessionInfo.RowDispatcherActorId + << " generation " << sessionInfo.Generation); + auto event = std::make_unique<NFq::TEvRowDispatcher::TEvStopSession>(); + *event->Record.MutableSource() = SourceParams; + sessionInfo.EventsQueue.Send(event.release(), sessionInfo.Generation); } // IActor & IDqComputeActorAsyncInput void TDqPqRdReadActor::PassAway() { // Is called from Compute Actor SRC_LOG_I("PassAway"); PrintInternalState(); - StopSessions(); + for (auto& [rowDispatcherActorId, sessionInfo] : Sessions) { + StopSession(sessionInfo); + } TActor<TDqPqRdReadActor>::PassAway(); // TODO: RetryQueue::Unsubscribe() @@ -442,10 +499,10 @@ void TDqPqRdReadActor::PassAway() { // Is called from Compute Actor i64 TDqPqRdReadActor::GetAsyncInputData(NKikimr::NMiniKQL::TUnboxedValueBatch& buffer, TMaybe<TInstant>& /*watermark*/, bool&, i64 freeSpace) { SRC_LOG_T("GetAsyncInputData freeSpace = " << freeSpace); + Init(); Metrics.InFlyAsyncInputData->Set(0); InFlyAsyncInputData = false; - ProcessState(); if (ReadyBuffer.empty() || !freeSpace) { return 0; } @@ -472,16 +529,14 @@ i64 TDqPqRdReadActor::GetAsyncInputData(NKikimr::NMiniKQL::TUnboxedValueBatch& b if (!ReadyBuffer.empty()) { NotifyCA(); } - for (auto& [partitionId, sessionInfo] : Sessions) { + for (auto& [rowDispatcherActorId, sessionInfo] : Sessions) { TrySendGetNextBatch(sessionInfo); } - ProcessState(); return usedSpace; } std::vector<ui64> TDqPqRdReadActor::GetPartitionsToRead() const { std::vector<ui64> res; - ui64 currentPartition = ReadParams.GetPartitioningParams().GetEachTopicPartitionGroupId(); do { res.emplace_back(currentPartition); // 0-based in topic API @@ -492,45 +547,25 @@ std::vector<ui64> TDqPqRdReadActor::GetPartitionsToRead() const { void TDqPqRdReadActor::Handle(NFq::TEvRowDispatcher::TEvStartSessionAck::TPtr& ev) { const NYql::NDqProto::TMessageTransportMeta& meta = ev->Get()->Record.GetTransportMeta(); - SRC_LOG_I("TEvStartSessionAck from " << ev->Sender << ", seqNo " << meta.GetSeqNo() << ", ConfirmedSeqNo " << meta.GetConfirmedSeqNo()); + SRC_LOG_I("Received TEvStartSessionAck from " << ev->Sender << ", seqNo " << meta.GetSeqNo() << ", ConfirmedSeqNo " << meta.GetConfirmedSeqNo() << ", generation " << ev->Cookie); Counters.StartSessionAck++; - - ui64 partitionId = ev->Get()->Record.GetConsumer().GetPartitionId(); - auto sessionIt = Sessions.find(partitionId); - if (sessionIt == Sessions.end()) { - SRC_LOG_W("Ignore TEvStartSessionAck from " << ev->Sender << ", seqNo " << meta.GetSeqNo() - << ", ConfirmedSeqNo " << meta.GetConfirmedSeqNo() << ", PartitionId " << partitionId << ", cookie " << ev->Cookie); - YQL_ENSURE(State != EState::STARTED); - SendNoSession(ev->Sender, partitionId, ev->Cookie); + auto* session = FindSession(ev); + if (!session) { return; } - auto& sessionInfo = sessionIt->second; - if (!CheckSession(sessionInfo, ev, partitionId)) { - return; - } - sessionInfo.IsWaitingStartSessionAck = false; + session->IsWaitingStartSessionAck = false; + ProcessState(); } void TDqPqRdReadActor::Handle(NFq::TEvRowDispatcher::TEvSessionError::TPtr& ev) { const NYql::NDqProto::TMessageTransportMeta& meta = ev->Get()->Record.GetTransportMeta(); - SRC_LOG_I("TEvSessionError from " << ev->Sender << ", seqNo " << meta.GetSeqNo() << ", ConfirmedSeqNo " << meta.GetConfirmedSeqNo()); + SRC_LOG_I("Received TEvSessionError from " << ev->Sender << ", seqNo " << meta.GetSeqNo() << ", ConfirmedSeqNo " << meta.GetConfirmedSeqNo()); Counters.SessionError++; - ui64 partitionId = ev->Get()->Record.GetPartitionId(); - auto sessionIt = Sessions.find(partitionId); - if (sessionIt == Sessions.end()) { - SRC_LOG_W("Ignore TEvSessionError from " << ev->Sender << ", seqNo " << meta.GetSeqNo() - << ", ConfirmedSeqNo " << meta.GetConfirmedSeqNo() << ", PartitionId " << partitionId << ", cookie " << ev->Cookie); - YQL_ENSURE(State != EState::STARTED); - SendNoSession(ev->Sender, partitionId, ev->Cookie); + auto* session = FindSession(ev); + if (!session) { return; } - - auto& sessionInfo = sessionIt->second; - if (!CheckSession(sessionInfo, ev, partitionId)) { - return; - } - NYql::TIssues issues; IssuesFromMessage(ev->Get()->Record.GetIssues(), issues); Stop(ev->Get()->Record.GetStatusCode(), issues); @@ -538,28 +573,27 @@ void TDqPqRdReadActor::Handle(NFq::TEvRowDispatcher::TEvSessionError::TPtr& ev) void TDqPqRdReadActor::Handle(NFq::TEvRowDispatcher::TEvStatistics::TPtr& ev) { const NYql::NDqProto::TMessageTransportMeta& meta = ev->Get()->Record.GetTransportMeta(); - SRC_LOG_T("TEvStatistics from " << ev->Sender << ", offset " << ev->Get()->Record.GetNextMessageOffset() << ", seqNo " << meta.GetSeqNo() << ", ConfirmedSeqNo " << meta.GetConfirmedSeqNo()); + SRC_LOG_T("Received TEvStatistics from " << ev->Sender << ", seqNo " << meta.GetSeqNo() << ", ConfirmedSeqNo " << meta.GetConfirmedSeqNo() << " generation " << ev->Cookie); Counters.Statistics++; - ui64 partitionId = ev->Get()->Record.GetPartitionId(); - auto sessionIt = Sessions.find(partitionId); - if (sessionIt == Sessions.end()) { - SRC_LOG_W("Ignore TEvStatistics from " << ev->Sender << ", seqNo " << meta.GetSeqNo() - << ", ConfirmedSeqNo " << meta.GetConfirmedSeqNo() << ", PartitionId " << partitionId); - YQL_ENSURE(State != EState::STARTED); - SendNoSession(ev->Sender, partitionId, ev->Cookie); + auto* session = FindSession(ev); + if (!session) { return; } - auto& sessionInfo = sessionIt->second; IngressStats.Bytes += ev->Get()->Record.GetReadBytes(); - - if (!CheckSession(sessionInfo, ev, partitionId)) { - return; - } - - if (ReadyBuffer.empty()) { - TPartitionKey partitionKey{TString{}, partitionId}; - PartitionToOffset[partitionKey] = ev->Get()->Record.GetNextMessageOffset(); + for (auto partition : ev->Get()->Record.GetPartition()) { + ui64 partitionId = partition.GetPartitionId(); + auto& nextOffset = NextOffsetFromRD[partitionId]; + if (!nextOffset) { + nextOffset = partition.GetNextMessageOffset(); + } else { + nextOffset = std::max(*nextOffset, partition.GetNextMessageOffset()); + } + SRC_LOG_T("NextOffsetFromRD [" << partitionId << "]= " << nextOffset); + if (ReadyBuffer.empty()) { + TPartitionKey partitionKey{TString{}, partitionId}; + PartitionToOffset[partitionKey] = *nextOffset; + } } } @@ -571,35 +605,36 @@ void TDqPqRdReadActor::Handle(NFq::TEvRowDispatcher::TEvGetInternalStateRequest: void TDqPqRdReadActor::Handle(NFq::TEvRowDispatcher::TEvNewDataArrived::TPtr& ev) { const NYql::NDqProto::TMessageTransportMeta& meta = ev->Get()->Record.GetTransportMeta(); - SRC_LOG_T("TEvNewDataArrived from " << ev->Sender << ", part id " << ev->Get()->Record.GetPartitionId() << ", seqNo " << meta.GetSeqNo() << ", ConfirmedSeqNo " << meta.GetConfirmedSeqNo()); + SRC_LOG_T("Received TEvNewDataArrived from " << ev->Sender << ", partition " << ev->Get()->Record.GetPartitionId() << ", seqNo " << meta.GetSeqNo() << ", ConfirmedSeqNo " << meta.GetConfirmedSeqNo() << " generation " << ev->Cookie); Counters.NewDataArrived++; - - ui64 partitionId = ev->Get()->Record.GetPartitionId(); - auto sessionIt = Sessions.find(partitionId); - if (sessionIt == Sessions.end()) { - SRC_LOG_W("Ignore TEvNewDataArrived from " << ev->Sender << ", seqNo " << meta.GetSeqNo() - << ", ConfirmedSeqNo " << meta.GetConfirmedSeqNo() << ", PartitionId " << partitionId); - YQL_ENSURE(State != EState::STARTED); - SendNoSession(ev->Sender, partitionId, ev->Cookie); + + auto* session = FindSession(ev); + if (!session) { return; } - - auto& sessionInfo = sessionIt->second; - if (!CheckSession(sessionInfo, ev, partitionId)) { + auto partitionIt = session->Partitions.find(ev->Get()->Record.GetPartitionId()); + if (partitionIt == session->Partitions.end()) { + SRC_LOG_E("TEvNewDataArrived: wrong partition id " << ev->Get()->Record.GetPartitionId()); + Stop(NDqProto::StatusIds::INTERNAL_ERROR, {TIssue(TStringBuilder() << LogPrefix << "No partition with id " << ev->Get()->Record.GetPartitionId())}); return; } - sessionInfo.HasPendingData = true; - TrySendGetNextBatch(sessionInfo); + partitionIt->second.HasPendingData = true; + TrySendGetNextBatch(*session); } void TDqPqRdReadActor::Handle(const NYql::NDq::TEvRetryQueuePrivate::TEvRetry::TPtr& ev) { - SRC_LOG_D("TEvRetry"); + SRC_LOG_T("TEvRetry, EventQueueId " << ev->Get()->EventQueueId); Counters.Retry++; - ui64 partitionId = ev->Get()->EventQueueId; - auto sessionIt = Sessions.find(partitionId); + auto readActorIt = ReadActorByEventQueueId.find(ev->Get()->EventQueueId); + if (readActorIt == ReadActorByEventQueueId.end()) { + SRC_LOG_D("Ignore TEvRetry, wrong EventQueueId " << ev->Get()->EventQueueId); + return; + } + + auto sessionIt = Sessions.find(readActorIt->second); if (sessionIt == Sessions.end()) { - SRC_LOG_W("Unknown partition id " << partitionId << ", skip TEvRetry"); + SRC_LOG_D("Ignore TEvRetry, wrong read actor id " << readActorIt->second); return; } sessionIt->second.EventsQueue.Retry(); @@ -608,35 +643,37 @@ void TDqPqRdReadActor::Handle(const NYql::NDq::TEvRetryQueuePrivate::TEvRetry::T void TDqPqRdReadActor::Handle(const NYql::NDq::TEvRetryQueuePrivate::TEvEvHeartbeat::TPtr& ev) { SRC_LOG_T("TEvRetryQueuePrivate::TEvEvHeartbeat"); Counters.PrivateHeartbeat++; - ui64 partitionId = ev->Get()->EventQueueId; + auto readActorIt = ReadActorByEventQueueId.find(ev->Get()->EventQueueId); + if (readActorIt == ReadActorByEventQueueId.end()) { + SRC_LOG_D("Ignore TEvRetry, wrong EventQueueId " << ev->Get()->EventQueueId); + return; + } - auto sessionIt = Sessions.find(partitionId); + auto sessionIt = Sessions.find(readActorIt->second); if (sessionIt == Sessions.end()) { - SRC_LOG_W("Unknown partition id " << partitionId << ", skip TEvPing"); + SRC_LOG_D("Ignore TEvRetry, wrong read actor id " << readActorIt->second); return; } auto& sessionInfo = sessionIt->second; bool needSend = sessionInfo.EventsQueue.Heartbeat(); if (needSend) { SRC_LOG_T("Send TEvEvHeartbeat"); - Send(sessionInfo.RowDispatcherActorId, new NFq::TEvRowDispatcher::TEvHeartbeat(sessionInfo.PartitionId), sessionInfo.Generation); + Send(sessionInfo.RowDispatcherActorId, new NFq::TEvRowDispatcher::TEvHeartbeat(), sessionInfo.Generation); } } void TDqPqRdReadActor::Handle(const NFq::TEvRowDispatcher::TEvHeartbeat::TPtr& ev) { SRC_LOG_T("Received TEvHeartbeat from " << ev->Sender << ", generation " << ev->Cookie); Counters.Heartbeat++; - - ui64 partitionId = ev->Get()->Record.GetPartitionId(); - auto sessionIt = Sessions.find(partitionId); + auto sessionIt = Sessions.find(ev->Sender); if (sessionIt == Sessions.end()) { - SRC_LOG_W("Ignore TEvHeartbeat from " << ev->Sender << ", PartitionId " << partitionId << ", generation " << ev->Cookie); - SendNoSession(ev->Sender, partitionId, ev->Cookie); + SRC_LOG_W("Ignore TEvHeartbeat from " << ev->Sender << ", generation " << ev->Cookie); + SendNoSession(ev->Sender, ev->Cookie); return; } if (ev->Cookie != sessionIt->second.Generation) { - SendNoSession(ev->Sender, partitionId, ev->Cookie); + SendNoSession(ev->Sender, ev->Cookie); } } @@ -657,18 +694,25 @@ void TDqPqRdReadActor::Handle(NFq::TEvRowDispatcher::TEvCoordinatorChanged::TPtr CoordinatorActorId = ev->Get()->CoordinatorActorId; ReInit("Coordinator is changed"); - ProcessState(); + Sleep(); +} + +void TDqPqRdReadActor::Sleep() { + if (ProcessStateScheduled) { + return; + } + ProcessStateScheduled = true; + Schedule(TDuration::Seconds(SleepPeriodSec), new TEvPrivate::TEvProcessState()); } void TDqPqRdReadActor::ReInit(const TString& reason) { SRC_LOG_I("ReInit state, reason " << reason); - StopSessions(); - Sessions.clear(); - State = EState::INIT; + Metrics.ReInit->Inc(); + + State = EState::WAIT_COORDINATOR_ID; if (!ReadyBuffer.empty()) { NotifyCA(); } - PrintInternalState(); } void TDqPqRdReadActor::Stop(NDqProto::StatusIds::StatusCode status, TIssues issues) { @@ -677,28 +721,18 @@ void TDqPqRdReadActor::Stop(NDqProto::StatusIds::StatusCode status, TIssues issu } void TDqPqRdReadActor::Handle(NFq::TEvRowDispatcher::TEvCoordinatorResult::TPtr& ev) { - SRC_LOG_I("TEvCoordinatorResult from " << ev->Sender.ToString() << ", cookie " << ev->Cookie); + SRC_LOG_I("Received TEvCoordinatorResult from " << ev->Sender.ToString() << ", cookie " << ev->Cookie); Counters.CoordinatorChanged++; if (ev->Cookie != CoordinatorRequestCookie) { SRC_LOG_W("Ignore TEvCoordinatorResult. wrong cookie"); return; } - if (State != EState::WAIT_PARTITIONS_ADDRES) { - SRC_LOG_W("Ignore TEvCoordinatorResult. wrong state " << static_cast<ui64>(EState::WAIT_PARTITIONS_ADDRES)); - return; - } + LastReceivedPartitionDistribution.clear(); + TMap<NActors::TActorId, TSet<ui32>> distribution; for (auto& p : ev->Get()->Record.GetPartitions()) { TActorId rowDispatcherActorId = ActorIdFromProto(p.GetActorId()); - for (auto partitionId : p.GetPartitionId()) { - if (Sessions.contains(partitionId)) { - Stop(NDqProto::StatusIds::INTERNAL_ERROR, {TIssue("Session already exists")}); - return; - } - SRC_LOG_I("Create session to RD (" << rowDispatcherActorId << "), partitionId " << partitionId); - Sessions.emplace( - std::piecewise_construct, - std::forward_as_tuple(partitionId), - std::forward_as_tuple(TxId, SelfId(), rowDispatcherActorId, partitionId, partitionId, ++NextGeneration)); + for (auto partitionId : p.GetPartitionIds()) { + LastReceivedPartitionDistribution[rowDispatcherActorId].insert(partitionId); } } ProcessState(); @@ -707,7 +741,7 @@ void TDqPqRdReadActor::Handle(NFq::TEvRowDispatcher::TEvCoordinatorResult::TPtr& void TDqPqRdReadActor::HandleConnected(TEvInterconnect::TEvNodeConnected::TPtr& ev) { SRC_LOG_D("EvNodeConnected " << ev->Get()->NodeId); Counters.NodeConnected++; - for (auto& [partitionId, sessionInfo] : Sessions) { + for (auto& [rowDispatcherActorId, sessionInfo] : Sessions) { sessionInfo.EventsQueue.HandleNodeConnected(ev->Get()->NodeId); } } @@ -715,7 +749,7 @@ void TDqPqRdReadActor::HandleConnected(TEvInterconnect::TEvNodeConnected::TPtr& void TDqPqRdReadActor::HandleDisconnected(TEvInterconnect::TEvNodeDisconnected::TPtr& ev) { SRC_LOG_D("TEvNodeDisconnected, node id " << ev->Get()->NodeId); Counters.NodeDisconnected++; - for (auto& [partitionId, sessionInfo] : Sessions) { + for (auto& [rowDispatcherActorId, sessionInfo] : Sessions) { sessionInfo.EventsQueue.HandleNodeDisconnected(ev->Get()->NodeId); } // In case of row dispatcher disconnection: wait connected or SessionClosed(). TODO: Stop actor after timeout. @@ -724,63 +758,70 @@ void TDqPqRdReadActor::HandleDisconnected(TEvInterconnect::TEvNodeDisconnected:: } void TDqPqRdReadActor::Handle(NActors::TEvents::TEvUndelivered::TPtr& ev) { - SRC_LOG_D("TEvUndelivered, " << ev->Get()->ToString() << " from " << ev->Sender.ToString() << " generation " << ev->Cookie); + SRC_LOG_D("Received TEvUndelivered, " << ev->Get()->ToString() << " from " << ev->Sender.ToString() << ", reason " << ev->Get()->Reason); Counters.Undelivered++; - for (auto& [partitionId, sessionInfo] : Sessions) { - if (ev->Cookie != sessionInfo.Generation) { - continue; - } + + auto sessionIt = Sessions.find(ev->Sender); + if (sessionIt != Sessions.end()) { + auto& sessionInfo = sessionIt->second; if (sessionInfo.EventsQueue.HandleUndelivered(ev) == NYql::NDq::TRetryEventsQueue::ESessionState::SessionClosed) { - ReInit(TStringBuilder() << "Session closed, partition id " << sessionInfo.PartitionId); - break; + if (sessionInfo.Generation == ev->Cookie) { + SRC_LOG_D("Erase session to " << ev->Sender.ToString()); + Sessions.erase(ev->Sender); + ReadActorByEventQueueId.erase(sessionInfo.EventQueueId); + ReInit("Reset session state"); + } } } if (CoordinatorActorId && *CoordinatorActorId == ev->Sender) { ReInit("TEvUndelivered to coordinator"); + Sleep(); + return; } + ProcessState(); } void TDqPqRdReadActor::Handle(NFq::TEvRowDispatcher::TEvMessageBatch::TPtr& ev) { const NYql::NDqProto::TMessageTransportMeta& meta = ev->Get()->Record.GetTransportMeta(); - SRC_LOG_T("TEvMessageBatch from " << ev->Sender << ", seqNo " << meta.GetSeqNo() << ", ConfirmedSeqNo " << meta.GetConfirmedSeqNo()); + SRC_LOG_T("Received TEvMessageBatch from " << ev->Sender << ", seqNo " << meta.GetSeqNo() << ", ConfirmedSeqNo " << meta.GetConfirmedSeqNo() << " generation " << ev->Cookie); Counters.MessageBatch++; - ui64 partitionId = ev->Get()->Record.GetPartitionId(); - auto sessionIt = Sessions.find(partitionId); - if (sessionIt == Sessions.end()) { - SRC_LOG_W("Ignore TEvMessageBatch from " << ev->Sender << ", seqNo " << meta.GetSeqNo() - << ", ConfirmedSeqNo " << meta.GetConfirmedSeqNo() << ", PartitionId " << partitionId << ", cookie " << ev->Cookie); - YQL_ENSURE(State != EState::STARTED); - SendNoSession(ev->Sender, partitionId, ev->Cookie); + auto* session = FindSession(ev); + if (!session) { return; } - - Metrics.InFlyGetNextBatch->Set(0); - auto& sessionInfo = sessionIt->second; - if (!CheckSession(sessionInfo, ev, partitionId)) { + auto partitionId = ev->Get()->Record.GetPartitionId(); + auto partitionIt = session->Partitions.find(partitionId); + if (partitionIt == session->Partitions.end()) { + SRC_LOG_E("TEvMessageBatch: wrong partition id " << partitionId); + Stop(NDqProto::StatusIds::INTERNAL_ERROR, {TIssue(TStringBuilder() << LogPrefix << "No partition with id " << partitionId)}); return; } + auto& partirtion = partitionIt->second; + Metrics.InFlyGetNextBatch->Set(0); ReadyBuffer.emplace(partitionId, ev->Get()->Record.MessagesSize()); TReadyBatch& activeBatch = ReadyBuffer.back(); + auto& nextOffset = NextOffsetFromRD[partitionId]; + ui64 bytes = 0; for (const auto& message : ev->Get()->Record.GetMessages()) { const auto& offsets = message.GetOffsets(); if (offsets.empty()) { - Stop(NDqProto::StatusIds::INTERNAL_ERROR, {TIssue("Got unexpected empty batch from row dispatcher")}); + Stop(NDqProto::StatusIds::INTERNAL_ERROR, {TIssue(TStringBuilder() << LogPrefix << "Got unexpected empty batch from row dispatcher")}); return; } activeBatch.Data.emplace_back(ev->Get()->GetPayload(message.GetPayloadId())); bytes += activeBatch.Data.back().GetSize(); - sessionInfo.NextOffset = *offsets.rbegin() + 1; - SRC_LOG_T("TEvMessageBatch NextOffset " << sessionInfo.NextOffset); + nextOffset = *offsets.rbegin() + 1; + SRC_LOG_T("TEvMessageBatch NextOffset " << nextOffset); } activeBatch.UsedSpace = bytes; ReadyBufferSizeBytes += bytes; activeBatch.NextOffset = ev->Get()->Record.GetNextMessageOffset(); - sessionInfo.IsWaitingMessageBatch = false; + partirtion.IsWaitingMessageBatch = false; NotifyCA(); } @@ -836,56 +877,80 @@ TString TDqPqRdReadActor::GetInternalState() { << " Heartbeat " << Counters.Heartbeat << " PrintState " << Counters.PrintState << " ProcessState " << Counters.ProcessState << " NotifyCA " << Counters.NotifyCA << "\n"; - for (auto& [partitionId, sessionInfo] : Sessions) { - str << " partId " << partitionId << " status " << static_cast<ui64>(sessionInfo.Status) - << " next offset " << sessionInfo.NextOffset - << " is waiting ack " << sessionInfo.IsWaitingStartSessionAck << " is waiting batch " << sessionInfo.IsWaitingMessageBatch - << " has pending data " << sessionInfo.HasPendingData << " connection id " << sessionInfo.Generation << " "; + for (auto& [rowDispatcherActorId, sessionInfo] : Sessions) { + str << " " << rowDispatcherActorId << " status " << static_cast<ui64>(sessionInfo.Status) + << " is waiting ack " << sessionInfo.IsWaitingStartSessionAck << " connection id " << sessionInfo.Generation << " "; sessionInfo.EventsQueue.PrintInternalState(str); + for (const auto& [partitionId, partition] : sessionInfo.Partitions) { + const auto offsetIt = NextOffsetFromRD.find(partitionId); + str << " partId " << partitionId + << " next offset " << ((offsetIt != NextOffsetFromRD.end()) ? ToString(offsetIt->second) : TString("<empty>")) + << " is waiting batch " << partition.IsWaitingMessageBatch + << " has pending data " << partition.HasPendingData << "\n"; + } + str << "\n"; } return str.Str(); } void TDqPqRdReadActor::Handle(TEvPrivate::TEvProcessState::TPtr&) { + ProcessStateScheduled = false; Counters.ProcessState++; - Schedule(TDuration::Seconds(ProcessStatePeriodSec), new TEvPrivate::TEvProcessState()); ProcessState(); } -void TDqPqRdReadActor::TrySendGetNextBatch(SessionInfo& sessionInfo) { - if (!sessionInfo.HasPendingData) { - return; - } +void TDqPqRdReadActor::TrySendGetNextBatch(TSession& sessionInfo) { if (ReadyBufferSizeBytes > MaxBufferSize) { return; } - Metrics.InFlyGetNextBatch->Inc(); - auto event = std::make_unique<NFq::TEvRowDispatcher::TEvGetNextBatch>(); - sessionInfo.HasPendingData = false; - sessionInfo.IsWaitingMessageBatch = true; - event->Record.SetPartitionId(sessionInfo.PartitionId); - sessionInfo.EventsQueue.Send(event.release(), sessionInfo.Generation); + for (auto& [partitionId, partition] : sessionInfo.Partitions) { + if (!partition.HasPendingData) { + continue; + } + Metrics.InFlyGetNextBatch->Inc(); + auto event = std::make_unique<NFq::TEvRowDispatcher::TEvGetNextBatch>(); + partition.HasPendingData = false; + partition.IsWaitingMessageBatch = true; + event->Record.SetPartitionId(partitionId); + sessionInfo.EventsQueue.Send(event.release(), sessionInfo.Generation); + } } template <class TEventPtr> -bool TDqPqRdReadActor::CheckSession(SessionInfo& session, const TEventPtr& ev, ui64 partitionId) { +TDqPqRdReadActor::TSession* TDqPqRdReadActor::FindSession(const TEventPtr& ev) { + auto sessionIt = Sessions.find(ev->Sender); + if (sessionIt == Sessions.end()) { + SRC_LOG_W("Ignore " << typeid(TEventPtr).name() << " from " << ev->Sender); + SendNoSession(ev->Sender, ev->Cookie); + return nullptr; + } + auto& session = sessionIt->second; + if (ev->Cookie != session.Generation) { SRC_LOG_W("Wrong message generation (" << typeid(TEventPtr).name() << "), sender " << ev->Sender << " cookie " << ev->Cookie << ", session generation " << session.Generation << ", send TEvStopSession"); - SendNoSession(ev->Sender, partitionId, ev->Cookie); - return false; + SendNoSession(ev->Sender, ev->Cookie); + return nullptr; } if (!session.EventsQueue.OnEventReceived(ev)) { const NYql::NDqProto::TMessageTransportMeta& meta = ev->Get()->Record.GetTransportMeta(); - SRC_LOG_W("Wrong seq num ignore message (" << typeid(TEventPtr).name() << ") seqNo " << meta.GetSeqNo() << " from " << ev->Sender.ToString()); - return false; + SRC_LOG_W("Ignore " << typeid(TEventPtr).name() << " from " << ev->Sender << ", wrong seq num, seqNo " << meta.GetSeqNo()); + return nullptr; + } + + auto expectedStatus = TSession::ESessionStatus::STARTED; + if constexpr (std::is_same_v<TEventPtr, NFq::TEvRowDispatcher::TEvStartSessionAck::TPtr>) { + expectedStatus = TSession::ESessionStatus::WAIT_START_SESSION_ACK; } - return true; + + if (session.Status != expectedStatus) { + SRC_LOG_E("Wrong " << typeid(TEventPtr).name() << " from " << ev->Sender << " session status " << static_cast<ui64>(session.Status) << " expected " << static_cast<ui64>(expectedStatus)); + return nullptr; + } + return &session; } -void TDqPqRdReadActor::SendNoSession(const NActors::TActorId& recipient, ui64 partitionId, ui64 cookie) { +void TDqPqRdReadActor::SendNoSession(const NActors::TActorId& recipient, ui64 cookie) { auto event = std::make_unique<NFq::TEvRowDispatcher::TEvNoSession>(); - //*event->Record.MutableSource() = SourceParams; - event->Record.SetPartitionId(partitionId); Send(recipient, event.release(), 0, cookie); } @@ -896,6 +961,38 @@ void TDqPqRdReadActor::NotifyCA() { Send(ComputeActorId, new TEvNewAsyncInputDataArrived(InputIndex)); } +void TDqPqRdReadActor::UpdateSessions() { + SRC_LOG_I("UpdateSessions, Sessions size " << Sessions.size()); + + if (LastUsedPartitionDistribution != LastReceivedPartitionDistribution) { + SRC_LOG_I("Distribution is changed, remove sessions"); + for (auto& [rowDispatcherActorId, sessionInfo] : Sessions) { + StopSession(sessionInfo); + ReadActorByEventQueueId.erase(sessionInfo.EventQueueId); + } + Sessions.clear(); + } + + for (const auto& [rowDispatcherActorId, partitions] : LastReceivedPartitionDistribution) { + if (Sessions.contains(rowDispatcherActorId)) { + continue; + } + + SRC_LOG_I("Create session to " << rowDispatcherActorId); + auto queueId = ++NextEventQueueId; + Sessions.emplace( + std::piecewise_construct, + std::forward_as_tuple(rowDispatcherActorId), + std::forward_as_tuple(TxId, SelfId(), rowDispatcherActorId, queueId, ++NextGeneration)); + auto& session = Sessions.at(rowDispatcherActorId); + for (auto partitionId : partitions) { + session.Partitions[partitionId]; + } + ReadActorByEventQueueId[queueId] = rowDispatcherActorId; + } + LastUsedPartitionDistribution = LastReceivedPartitionDistribution; +} + std::pair<IDqComputeActorAsyncInput*, NActors::IActor*> CreateDqPqRdReadActor( const TTypeEnvironment& typeEnv, NPq::NProto::TDqPqTopicSource&& settings, diff --git a/ydb/tests/fq/pq_async_io/ut/dq_pq_rd_read_actor_ut.cpp b/ydb/tests/fq/pq_async_io/ut/dq_pq_rd_read_actor_ut.cpp index 2784a4de697..5f50ea8e55b 100644 --- a/ydb/tests/fq/pq_async_io/ut/dq_pq_rd_read_actor_ut.cpp +++ b/ydb/tests/fq/pq_async_io/ut/dq_pq_rd_read_actor_ut.cpp @@ -16,7 +16,8 @@ namespace NYql::NDq { -const ui64 PartitionId = 666; +const ui64 PartitionId1 = 666; +const ui64 PartitionId2 = 667; struct TFixture : public TPqIoTestFixture { TFixture() { @@ -29,13 +30,14 @@ struct TFixture : public TPqIoTestFixture { void InitRdSource( const NYql::NPq::NProto::TDqPqTopicSource& settings, - i64 freeSpace = 1_MB) + i64 freeSpace = 1_MB, + ui64 partitionCount = 1) { CaSetup->Execute([&](TFakeActor& actor) { NPq::NProto::TDqReadTaskParams params; auto* partitioninigParams = params.MutablePartitioningParams(); - partitioninigParams->SetTopicPartitionsCount(1); - partitioninigParams->SetEachTopicPartitionGroupId(PartitionId); + partitioninigParams->SetTopicPartitionsCount(partitionCount); + partitioninigParams->SetEachTopicPartitionGroupId(PartitionId1); partitioninigParams->SetDqPartitionsCount(1); TString serializedParams; @@ -76,17 +78,21 @@ struct TFixture : public TPqIoTestFixture { return eventHolder; } - void ExpectStartSession(ui64 expectedOffset, NActors::TActorId rowDispatcherId, ui64 expectedGeneration = 1) { + void ExpectStartSession(const TMap<ui32, ui64>& expectedOffsets, NActors::TActorId rowDispatcherId, ui64 expectedGeneration = 1) { auto eventHolder = CaSetup->Runtime->GrabEdgeEvent<NFq::TEvRowDispatcher::TEvStartSession>(rowDispatcherId, TDuration::Seconds(5)); UNIT_ASSERT(eventHolder.Get() != nullptr); - UNIT_ASSERT(eventHolder->Get()->Record.GetOffset() == expectedOffset); - UNIT_ASSERT(eventHolder->Cookie == expectedGeneration); + TMap<ui32, ui64> offsets; + for (auto p : eventHolder->Get()->Record.GetOffsets()) { + offsets[p.GetPartitionId()] = p.GetOffset(); + } + UNIT_ASSERT_EQUAL(offsets, expectedOffsets); + UNIT_ASSERT_EQUAL(eventHolder->Cookie, expectedGeneration); } void ExpectStopSession(NActors::TActorId rowDispatcherId, ui64 expectedGeneration = 1) { auto eventHolder = CaSetup->Runtime->GrabEdgeEvent<NFq::TEvRowDispatcher::TEvStopSession>(rowDispatcherId, TDuration::Seconds(5)); UNIT_ASSERT(eventHolder.Get() != nullptr); - UNIT_ASSERT(eventHolder->Cookie == expectedGeneration); + UNIT_ASSERT_EQUAL(eventHolder->Cookie, expectedGeneration); } void ExpectNoSession(NActors::TActorId rowDispatcherId, ui64 expectedGeneration = 1) { @@ -95,10 +101,10 @@ struct TFixture : public TPqIoTestFixture { UNIT_ASSERT(eventHolder->Cookie == expectedGeneration); } - void ExpectGetNextBatch(NActors::TActorId rowDispatcherId) { + void ExpectGetNextBatch(NActors::TActorId rowDispatcherId, ui64 partitionId = PartitionId1) { auto eventHolder = CaSetup->Runtime->GrabEdgeEvent<NFq::TEvRowDispatcher::TEvGetNextBatch>(rowDispatcherId, TDuration::Seconds(5)); UNIT_ASSERT(eventHolder.Get() != nullptr); - UNIT_ASSERT(eventHolder->Get()->Record.GetPartitionId() == PartitionId); + UNIT_ASSERT_EQUAL(eventHolder->Get()->Record.GetPartitionId(), partitionId); } void MockCoordinatorChanged(NActors::TActorId coordinatorId) { @@ -108,20 +114,23 @@ struct TFixture : public TPqIoTestFixture { }); } - void MockCoordinatorResult(NActors::TActorId rowDispatcherId, ui64 cookie = 0) { + void MockCoordinatorResult(const TMap<NActors::TActorId, ui64> result, ui64 cookie = 0) { CaSetup->Execute([&](TFakeActor& actor) { auto event = new NFq::TEvRowDispatcher::TEvCoordinatorResult(); - auto* partitions = event->Record.AddPartitions(); - partitions->AddPartitionId(PartitionId); - ActorIdToProto(rowDispatcherId, partitions->MutableActorId()); + + for (const auto& [rowDispatcherId, partitionId] : result) { + auto* partitions = event->Record.AddPartitions(); + partitions->AddPartitionIds(partitionId); + ActorIdToProto(rowDispatcherId, partitions->MutableActorId()); + } CaSetup->Runtime->Send(new NActors::IEventHandle(*actor.DqAsyncInputActorId, Coordinator1Id, event, 0, cookie)); }); } - void MockAck(NActors::TActorId rowDispatcherId, ui64 generation = 1) { + void MockAck(NActors::TActorId rowDispatcherId, ui64 generation = 1, ui64 partitionId = PartitionId1) { CaSetup->Execute([&](TFakeActor& actor) { NFq::NRowDispatcherProto::TEvStartSession proto; - proto.SetPartitionId(PartitionId); + proto.AddPartitionIds(partitionId); auto event = new NFq::TEvRowDispatcher::TEvStartSessionAck(proto); CaSetup->Runtime->Send(new NActors::IEventHandle(*actor.DqAsyncInputActorId, rowDispatcherId, event, 0, generation)); }); @@ -129,15 +138,15 @@ struct TFixture : public TPqIoTestFixture { void MockHeartbeat(NActors::TActorId rowDispatcherId, ui64 generation = 1) { CaSetup->Execute([&](TFakeActor& actor) { - auto event = new NFq::TEvRowDispatcher::TEvHeartbeat(PartitionId); + auto event = new NFq::TEvRowDispatcher::TEvHeartbeat(); CaSetup->Runtime->Send(new NActors::IEventHandle(*actor.DqAsyncInputActorId, rowDispatcherId, event, 0, generation)); }); } - void MockNewDataArrived(NActors::TActorId rowDispatcherId, ui64 generation = 1) { + void MockNewDataArrived(NActors::TActorId rowDispatcherId, ui64 generation = 1, ui64 partitionId = PartitionId1) { CaSetup->Execute([&](TFakeActor& actor) { auto event = new NFq::TEvRowDispatcher::TEvNewDataArrived(); - event->Record.SetPartitionId(PartitionId); + event->Record.SetPartitionId(partitionId); CaSetup->Runtime->Send(new NActors::IEventHandle(*actor.DqAsyncInputActorId, rowDispatcherId, event, 0, generation)); }); } @@ -161,7 +170,7 @@ struct TFixture : public TPqIoTestFixture { } // Supported schema (Uint64, String) - void MockMessageBatch(ui64 offset, const std::vector<std::pair<ui64, TString>>& messages, NActors::TActorId rowDispatcherId, ui64 generation = 1) { + void MockMessageBatch(ui64 offset, const std::vector<std::pair<ui64, TString>>& messages, NActors::TActorId rowDispatcherId, ui64 generation = 1, ui64 partitionId = PartitionId1) { CaSetup->Execute([&](TFakeActor& actor) { auto event = new NFq::TEvRowDispatcher::TEvMessageBatch(); for (const auto& item : messages) { @@ -170,7 +179,7 @@ struct TFixture : public TPqIoTestFixture { message.AddOffsets(offset++); *event->Record.AddMessages() = message; } - event->Record.SetPartitionId(PartitionId); + event->Record.SetPartitionId(partitionId); event->Record.SetNextMessageOffset(offset); CaSetup->Runtime->Send(new NActors::IEventHandle(*actor.DqAsyncInputActorId, rowDispatcherId, event, 0, generation)); }); @@ -181,11 +190,20 @@ struct TFixture : public TPqIoTestFixture { auto event = new NFq::TEvRowDispatcher::TEvSessionError(); event->Record.SetStatusCode(::NYql::NDqProto::StatusIds::BAD_REQUEST); IssueToMessage(TIssue("A problem has been detected and session has been shut down to prevent damage your life"), event->Record.AddIssues()); - event->Record.SetPartitionId(PartitionId); CaSetup->Runtime->Send(new NActors::IEventHandle(*actor.DqAsyncInputActorId, RowDispatcher1, event, 0, 1)); }); } + void MockStatistics(NActors::TActorId rowDispatcherId, ui64 nextOffset, ui64 generation, ui64 partitionId) { + CaSetup->Execute([&](TFakeActor& actor) { + auto event = new NFq::TEvRowDispatcher::TEvStatistics(); + auto* partitionsProto = event->Record.AddPartition(); + partitionsProto->SetPartitionId(partitionId); + partitionsProto->SetNextMessageOffset(nextOffset); + CaSetup->Runtime->Send(new NActors::IEventHandle(*actor.DqAsyncInputActorId, rowDispatcherId, event, 0, generation)); + }); + } + template<typename T> void AssertDataWithWatermarks( const std::vector<std::variant<T, TInstant>>& actual, @@ -227,35 +245,37 @@ struct TFixture : public TPqIoTestFixture { }); } - void MockUndelivered(ui64 generation = 0, NActors::TEvents::TEvUndelivered::EReason reason = NActors::TEvents::TEvUndelivered::ReasonActorUnknown) { + void MockUndelivered(NActors::TActorId rowDispatcherId, ui64 generation = 0, NActors::TEvents::TEvUndelivered::EReason reason = NActors::TEvents::TEvUndelivered::ReasonActorUnknown) { CaSetup->Execute([&](TFakeActor& actor) { auto event = new NActors::TEvents::TEvUndelivered(0, reason); - CaSetup->Runtime->Send(new NActors::IEventHandle(*actor.DqAsyncInputActorId, RowDispatcher1, event, 0, generation)); + CaSetup->Runtime->Send(new NActors::IEventHandle(*actor.DqAsyncInputActorId, rowDispatcherId, event, 0, generation)); }); } - void StartSession(NYql::NPq::NProto::TDqPqTopicSource& settings, i64 freeSpace = 1_MB) { - InitRdSource(settings, freeSpace); + void StartSession(NYql::NPq::NProto::TDqPqTopicSource& settings, i64 freeSpace = 1_MB, ui64 partitionCount = 1) { + InitRdSource(settings, freeSpace, partitionCount); SourceRead<std::pair<ui64, TString>>(UVPairParser); ExpectCoordinatorChangesSubscribe(); MockCoordinatorChanged(Coordinator1Id); - auto req =ExpectCoordinatorRequest(Coordinator1Id); + auto req = ExpectCoordinatorRequest(Coordinator1Id); - MockCoordinatorResult(RowDispatcher1, req->Cookie); - ExpectStartSession(0, RowDispatcher1); + MockCoordinatorResult({{RowDispatcher1, PartitionId1}}, req->Cookie); + ExpectStartSession({}, RowDispatcher1); MockAck(RowDispatcher1); } void ProcessSomeMessages(ui64 offset, const std::vector<std::pair<ui64, TString>>& messages, NActors::TActorId rowDispatcherId, - std::function<std::vector<std::pair<ui64, TString>>(const NUdf::TUnboxedValue&)> uvParser = UVPairParser, ui64 generation = 1) { - MockNewDataArrived(rowDispatcherId, generation); - ExpectGetNextBatch(rowDispatcherId); - - MockMessageBatch(offset, messages, rowDispatcherId, generation); + std::function<std::vector<std::pair<ui64, TString>>(const NUdf::TUnboxedValue&)> uvParser = UVPairParser, ui64 generation = 1, + ui64 partitionId = PartitionId1, bool readedByCA = true) { + MockNewDataArrived(rowDispatcherId, generation, partitionId); + ExpectGetNextBatch(rowDispatcherId, partitionId); - auto result = SourceReadDataUntil<std::pair<ui64, TString>>(uvParser, messages.size()); - AssertDataWithWatermarks(result, messages, {}); + MockMessageBatch(offset, messages, rowDispatcherId, generation, partitionId); + if (readedByCA) { + auto result = SourceReadDataUntil<std::pair<ui64, TString>>(uvParser, messages.size()); + AssertDataWithWatermarks(result, messages, {}); + } } const std::pair<ui64, TString> Message1 = {100, "value1"}; @@ -273,7 +293,7 @@ struct TFixture : public TPqIoTestFixture { }; Y_UNIT_TEST_SUITE(TDqPqRdReadActorTests) { - Y_UNIT_TEST_F(TestReadFromTopic, TFixture) { + Y_UNIT_TEST_F(TestReadFromTopic2, TFixture) { StartSession(Source1); ProcessSomeMessages(0, {Message1, Message2}, RowDispatcher1); } @@ -281,7 +301,7 @@ Y_UNIT_TEST_SUITE(TDqPqRdReadActorTests) { Y_UNIT_TEST_F(IgnoreUndeliveredWithWrongGeneration, TFixture) { StartSession(Source1); ProcessSomeMessages(0, {Message1, Message2}, RowDispatcher1); - MockUndelivered(NActors::TEvents::TEvUndelivered::Disconnected); + MockUndelivered(RowDispatcher1, NActors::TEvents::TEvUndelivered::Disconnected); ProcessSomeMessages(2, {Message3}, RowDispatcher1); } @@ -338,7 +358,6 @@ Y_UNIT_TEST_SUITE(TDqPqRdReadActorTests) { { TFixture f; f.InitRdSource(f.Source1); - f.SourceRead<std::pair<ui64, TString>>(UVPairParser); f.LoadSource(state); f.SourceRead<std::pair<ui64, TString>>(UVPairParser); f.ExpectCoordinatorChangesSubscribe(); @@ -346,8 +365,8 @@ Y_UNIT_TEST_SUITE(TDqPqRdReadActorTests) { f.MockCoordinatorChanged(f.Coordinator1Id); auto req = f.ExpectCoordinatorRequest(f.Coordinator1Id); - f.MockCoordinatorResult(f.RowDispatcher1, req->Cookie); - f.ExpectStartSession(2, f.RowDispatcher1); + f.MockCoordinatorResult({{f.RowDispatcher1, PartitionId1}}, req->Cookie); + f.ExpectStartSession({{PartitionId1, 2}}, f.RowDispatcher1); f.MockAck(f.RowDispatcher1); f.ProcessSomeMessages(2, {f.Message3}, f.RowDispatcher1); // offsets: 2 @@ -358,7 +377,6 @@ Y_UNIT_TEST_SUITE(TDqPqRdReadActorTests) { { TFixture f; f.InitRdSource(f.Source1); - f.SourceRead<std::pair<ui64, TString>>(UVPairParser); f.LoadSource(state); f.SourceRead<std::pair<ui64, TString>>(UVPairParser); f.ExpectCoordinatorChangesSubscribe(); @@ -366,8 +384,8 @@ Y_UNIT_TEST_SUITE(TDqPqRdReadActorTests) { f.MockCoordinatorChanged(f.Coordinator1Id); auto req = f.ExpectCoordinatorRequest(f.Coordinator1Id); - f.MockCoordinatorResult(f.RowDispatcher1, req->Cookie); - f.ExpectStartSession(3, f.RowDispatcher1); + f.MockCoordinatorResult({{f.RowDispatcher1, PartitionId1}}, req->Cookie); + f.ExpectStartSession({{PartitionId1, 3}}, f.RowDispatcher1); f.MockAck(f.RowDispatcher1); f.ProcessSomeMessages(3, {f.Message4}, f.RowDispatcher1); // offsets: 3 @@ -381,21 +399,30 @@ Y_UNIT_TEST_SUITE(TDqPqRdReadActorTests) { // change active Coordinator MockCoordinatorChanged(Coordinator2Id); - ExpectStopSession(RowDispatcher1); - - auto result = SourceReadDataUntil<std::pair<ui64, TString>>(UVPairParser, 1); - AssertDataWithWatermarks(result, {Message3}, {}); + // continue use old sessions + MockMessageBatch(3, {Message4}, RowDispatcher1); auto req = ExpectCoordinatorRequest(Coordinator2Id); - MockCoordinatorResult(RowDispatcher2, req->Cookie); - ExpectStartSession(3, RowDispatcher2, 2); + auto result = SourceReadDataUntil<std::pair<ui64, TString>>(UVPairParser, 2); + AssertDataWithWatermarks(result, {Message3, Message4}, {}); + + MockCoordinatorResult({{RowDispatcher2, PartitionId1}}, req->Cookie); // change distribution + ExpectStopSession(RowDispatcher1); + + ExpectStartSession({{PartitionId1, 4}}, RowDispatcher2, 2); MockAck(RowDispatcher2, 2); - ProcessSomeMessages(3, {Message4}, RowDispatcher2, UVPairParser, 2); + ProcessSomeMessages(4, {Message4}, RowDispatcher2, UVPairParser, 2); MockHeartbeat(RowDispatcher1, 1); // old generation ExpectNoSession(RowDispatcher1, 1); + + // change active Coordinator + MockCoordinatorChanged(Coordinator1Id); + req = ExpectCoordinatorRequest(Coordinator1Id); + MockCoordinatorResult({{RowDispatcher2, PartitionId1}}, req->Cookie); // distribution is not changed + ProcessSomeMessages(5, {Message2}, RowDispatcher2, UVPairParser, 2); } Y_UNIT_TEST_F(Backpressure, TFixture) { @@ -406,7 +433,7 @@ Y_UNIT_TEST_SUITE(TDqPqRdReadActorTests) { MockNewDataArrived(RowDispatcher1); ExpectGetNextBatch(RowDispatcher1); - MockMessageBatch(0, {message, message, message}, RowDispatcher1); + MockMessageBatch(1, {message, message, message}, RowDispatcher1); MockNewDataArrived(RowDispatcher1); ASSERT_THROW( @@ -417,26 +444,64 @@ Y_UNIT_TEST_SUITE(TDqPqRdReadActorTests) { AssertDataWithWatermarks(result, {message, message, message}, {}); ExpectGetNextBatch(RowDispatcher1); - MockMessageBatch(3, {Message1}, RowDispatcher1); + MockMessageBatch(4, {Message1}, RowDispatcher1); result = SourceReadDataUntil<std::pair<ui64, TString>>(UVPairParser, 1); AssertDataWithWatermarks(result, {Message1}, {}); } - Y_UNIT_TEST_F(RowDispatcherIsRestarted, TFixture) { + Y_UNIT_TEST_F(RowDispatcherIsRestarted2, TFixture) { StartSession(Source1); ProcessSomeMessages(0, {Message1, Message2}, RowDispatcher1); MockDisconnected(); MockConnected(); - MockUndelivered(1); + MockUndelivered(RowDispatcher1, 1); auto req = ExpectCoordinatorRequest(Coordinator1Id); - MockCoordinatorResult(RowDispatcher1, req->Cookie); - ExpectStartSession(2, RowDispatcher1, 2); + MockCoordinatorResult({{RowDispatcher1, PartitionId1}}, req->Cookie); + ExpectStartSession({{PartitionId1, 2}}, RowDispatcher1, 2); MockAck(RowDispatcher1, 2); ProcessSomeMessages(2, {Message3}, RowDispatcher1, UVPairParser, 2); } + Y_UNIT_TEST_F(TwoPartitionsRowDispatcherIsRestarted, TFixture) { + InitRdSource(Source1, 1_MB, PartitionId2 + 1); + SourceRead<TString>(UVParser); + ExpectCoordinatorChangesSubscribe(); + MockCoordinatorChanged(Coordinator1Id); + auto req = ExpectCoordinatorRequest(Coordinator1Id); + MockCoordinatorResult({{RowDispatcher1, PartitionId1}, {RowDispatcher2, PartitionId2}}, req->Cookie); + ExpectStartSession({}, RowDispatcher1, 1); + ExpectStartSession({}, RowDispatcher2, 2); + MockAck(RowDispatcher1, 1, PartitionId1); + MockAck(RowDispatcher2, 2, PartitionId2); + + ProcessSomeMessages(0, {Message1, Message2}, RowDispatcher1, UVPairParser, 1, PartitionId1); + ProcessSomeMessages(0, {Message3}, RowDispatcher2, UVPairParser, 2, PartitionId2, false); // not read by CA + MockStatistics(RowDispatcher2, 10, 2, PartitionId2); + + // Restart node 2 (RowDispatcher2) + MockDisconnected(); + MockConnected(); + MockUndelivered(RowDispatcher2, 2); + + // session1 is still working + ProcessSomeMessages(2, {Message4}, RowDispatcher1, UVPairParser, 1, PartitionId1, false); + + // Reinit session to RowDispatcher2 + auto req2 = ExpectCoordinatorRequest(Coordinator1Id); + MockCoordinatorResult({{RowDispatcher1, PartitionId1}, {RowDispatcher2, PartitionId2}}, req2->Cookie); + ExpectStartSession({{PartitionId2, 10}}, RowDispatcher2, 3); + MockAck(RowDispatcher2, 3, PartitionId2); + + ProcessSomeMessages(3, {Message4}, RowDispatcher1, UVPairParser, 1, PartitionId1, false); + ProcessSomeMessages(10, {Message4}, RowDispatcher2, UVPairParser, 3, PartitionId2, false); + + std::vector<std::pair<ui64, TString>> expected{Message3, Message4, Message4, Message4}; + auto result = SourceReadDataUntil<std::pair<ui64, TString>>(UVPairParser, expected.size()); + AssertDataWithWatermarks(result, expected, {}); + } + Y_UNIT_TEST_F(IgnoreMessageIfNoSessions, TFixture) { StartSession(Source1); MockCoordinatorChanged(Coordinator2Id); @@ -463,12 +528,13 @@ Y_UNIT_TEST_SUITE(TDqPqRdReadActorTests) { MockCoordinatorChanged(Coordinator2Id); auto req = ExpectCoordinatorRequest(Coordinator2Id); + MockUndelivered(RowDispatcher1, 1); + auto req2 = ExpectCoordinatorRequest(Coordinator2Id); - MockUndelivered(1); + MockCoordinatorResult({{RowDispatcher1, PartitionId1}}, req->Cookie); + MockCoordinatorResult({{RowDispatcher1, PartitionId1}}, req2->Cookie); + ExpectStartSession({{PartitionId1, 2}}, RowDispatcher1, 2); - MockCoordinatorResult(RowDispatcher1, req->Cookie); - MockCoordinatorResult(RowDispatcher1, req->Cookie); - ExpectStartSession(2, RowDispatcher1, 2); MockAck(RowDispatcher1); } } |
