summary | refs | log | tree | commit | diff | stats
diff options
context:
space:
mode:
author: Dmitry Kardymon <[email protected]> 2025-01-21 12:58:48 +0300
committer: GitHub <[email protected]> 2025-01-21 12:58:48 +0300
commit: 7ff68eac6fb5d12e6f6ece19086f5e45f89754fa (patch)
tree: 68d8faa628b1ce5c2327f8807b19cf16f73f353e
parent: 623de68f0102c414de61c1ae6656ae43722b9e46 (diff)
YQ-3893 Use one session read_actor <-> RD (#12247)
-rw-r--r-- ydb/core/fq/libs/row_dispatcher/coordinator.cpp | 6
-rw-r--r-- ydb/core/fq/libs/row_dispatcher/events/data_plane.h | 24
-rw-r--r-- ydb/core/fq/libs/row_dispatcher/events/topic_session_stats.h | 5
-rw-r--r-- ydb/core/fq/libs/row_dispatcher/leader_election.cpp | 6
-rw-r--r-- ydb/core/fq/libs/row_dispatcher/probes.h | 20
-rw-r--r-- ydb/core/fq/libs/row_dispatcher/protos/events.proto | 29
-rw-r--r-- ydb/core/fq/libs/row_dispatcher/row_dispatcher.cpp | 398
-rw-r--r-- ydb/core/fq/libs/row_dispatcher/topic_session.cpp | 110
-rw-r--r-- ydb/core/fq/libs/row_dispatcher/ut/row_dispatcher_ut.cpp | 127
-rw-r--r-- ydb/core/fq/libs/row_dispatcher/ut/topic_session_ut.cpp | 50
-rw-r--r-- ydb/library/yql/providers/pq/async_io/dq_pq_rd_read_actor.cpp | 561
-rw-r--r-- ydb/tests/fq/pq_async_io/ut/dq_pq_rd_read_actor_ut.cpp | 188
12 files changed, 857 insertions, 667 deletions
diff --git a/ydb/core/fq/libs/row_dispatcher/coordinator.cpp b/ydb/core/fq/libs/row_dispatcher/coordinator.cpp
index e258dd94582..c891c3b83d1 100644
--- a/ydb/core/fq/libs/row_dispatcher/coordinator.cpp
+++ b/ydb/core/fq/libs/row_dispatcher/coordinator.cpp
@@ -409,7 +409,7 @@ void TActorCoordinator::Handle(NFq::TEvRowDispatcher::TEvCoordinatorRequest::TPt
UpdateInterconnectSessions(ev->InterconnectSession);
TStringStream str;
- LOG_ROW_DISPATCHER_INFO("TEvCoordinatorRequest from " << ev->Sender.ToString() << ", " << source.GetTopicPath() << ", partIds: " << JoinSeq(", ", ev->Get()->Record.GetPartitionId()));
+ LOG_ROW_DISPATCHER_INFO("TEvCoordinatorRequest from " << ev->Sender.ToString() << ", " << source.GetTopicPath() << ", partIds: " << JoinSeq(", ", ev->Get()->Record.GetPartitionIds()));
Metrics.IncomingRequests->Inc();
TCoordinatorRequest request = {.Cookie = ev->Cookie, .Record = ev->Get()->Record};
@@ -430,7 +430,7 @@ bool TActorCoordinator::ComputeCoordinatorRequest(TActorId readActorId, const TC
bool hasPendingPartitions = false;
TMap<NActors::TActorId, TSet<ui64>> tmpResult;
- for (auto& partitionId : request.Record.GetPartitionId()) {
+ for (auto& partitionId : request.Record.GetPartitionIds()) {
TTopicKey topicKey{source.GetEndpoint(), source.GetDatabase(), source.GetTopicPath()};
TPartitionKey key {topicKey, partitionId};
auto locationIt = PartitionLocations.find(key);
@@ -457,7 +457,7 @@ bool TActorCoordinator::ComputeCoordinatorRequest(TActorId readActorId, const TC
auto* partitionsProto = response->Record.AddPartitions();
ActorIdToProto(actorId, partitionsProto->MutableActorId());
for (auto partitionId : partitions) {
- partitionsProto->AddPartitionId(partitionId);
+ partitionsProto->AddPartitionIds(partitionId);
}
}
diff --git a/ydb/core/fq/libs/row_dispatcher/events/data_plane.h b/ydb/core/fq/libs/row_dispatcher/events/data_plane.h
index 859bf8d6234..074a0a35a00 100644
--- a/ydb/core/fq/libs/row_dispatcher/events/data_plane.h
+++ b/ydb/core/fq/libs/row_dispatcher/events/data_plane.h
@@ -3,7 +3,6 @@
#include <ydb/library/actors/core/actorid.h>
#include <ydb/library/actors/core/event_local.h>
#include <ydb/core/fq/libs/events/event_subspace.h>
-
#include <ydb/core/fq/libs/row_dispatcher/protos/events.pb.h>
#include <ydb/library/yql/providers/pq/proto/dq_io.pb.h>
#include <ydb/core/fq/libs/row_dispatcher/events/topic_session_stats.h>
@@ -11,6 +10,9 @@
#include <yql/essentials/public/issue/yql_issue.h>
#include <yql/essentials/public/purecalc/common/fwd.h>
+#include <util/generic/set.h>
+#include <util/generic/map.h>
+
namespace NFq {
NActors::TActorId RowDispatcherServiceActorId();
@@ -75,7 +77,7 @@ struct TEvRowDispatcher {
const std::vector<ui64>& partitionIds) {
*Record.MutableSource() = sourceParams;
for (const auto& id : partitionIds) {
- Record.AddPartitionId(id);
+ Record.AddPartitionIds(id);
}
}
};
@@ -93,16 +95,20 @@ struct TEvRowDispatcher {
TEvStartSession() = default;
TEvStartSession(
const NYql::NPq::NProto::TDqPqTopicSource& sourceParams,
- ui64 partitionId,
+ const std::set<ui32>& partitionIds,
const TString token,
- TMaybe<ui64> readOffset,
+ const std::map<ui32, ui64>& readOffsets,
ui64 startingMessageTimestampMs,
const TString& queryId) {
*Record.MutableSource() = sourceParams;
- Record.SetPartitionId(partitionId);
+ for (auto partitionId : partitionIds) {
+ Record.AddPartitionIds(partitionId);
+ }
Record.SetToken(token);
- if (readOffset) {
- Record.SetOffset(*readOffset);
+ for (const auto& [partitionId, offset] : readOffsets) {
+ auto* partitionOffset = Record.AddOffsets();
+ partitionOffset->SetPartitionId(partitionId);
+ partitionOffset->SetOffset(offset);
}
Record.SetStartingMessageTimestampMs(startingMessageTimestampMs);
Record.SetQueryId(queryId);
@@ -143,7 +149,6 @@ struct TEvRowDispatcher {
struct TEvStatistics : public NActors::TEventPB<TEvStatistics,
NFq::NRowDispatcherProto::TEvStatistics, EEv::EvStatistics> {
TEvStatistics() = default;
- NActors::TActorId ReadActorId;
};
struct TEvSessionError : public NActors::TEventPB<TEvSessionError,
@@ -162,9 +167,6 @@ struct TEvRowDispatcher {
struct TEvHeartbeat : public NActors::TEventPB<TEvHeartbeat, NFq::NRowDispatcherProto::TEvHeartbeat, EEv::EvHeartbeat> {
TEvHeartbeat() = default;
- TEvHeartbeat(ui32 partitionId) {
- Record.SetPartitionId(partitionId);
- }
};
struct TEvNoSession : public NActors::TEventPB<TEvNoSession, NFq::NRowDispatcherProto::TEvNoSession, EEv::EvNoSession> {
diff --git a/ydb/core/fq/libs/row_dispatcher/events/topic_session_stats.h b/ydb/core/fq/libs/row_dispatcher/events/topic_session_stats.h
index 707c014618d..4c5e7f427f7 100644
--- a/ydb/core/fq/libs/row_dispatcher/events/topic_session_stats.h
+++ b/ydb/core/fq/libs/row_dispatcher/events/topic_session_stats.h
@@ -11,7 +11,8 @@ struct TTopicSessionClientStatistic {
i64 UnreadRows = 0; // Current value
i64 UnreadBytes = 0; // Current value
ui64 Offset = 0; // Current value
- ui64 ReadBytes = 0; // Increment / filtered
+ ui64 FilteredReadBytes = 0; // Increment / filtered
+ ui64 ReadBytes = 0; // Increment
bool IsWaiting = false; // Current value
i64 ReadLagMessages = 0; // Current value
ui64 InitialOffset = 0;
@@ -19,12 +20,14 @@ struct TTopicSessionClientStatistic {
UnreadRows = stat.UnreadRows;
UnreadBytes = stat.UnreadBytes;
Offset = stat.Offset;
+ FilteredReadBytes += stat.FilteredReadBytes;
ReadBytes += stat.ReadBytes;
IsWaiting = stat.IsWaiting;
ReadLagMessages = stat.ReadLagMessages;
InitialOffset = stat.InitialOffset;
}
void Clear() {
+ FilteredReadBytes = 0;
ReadBytes = 0;
}
};
diff --git a/ydb/core/fq/libs/row_dispatcher/leader_election.cpp b/ydb/core/fq/libs/row_dispatcher/leader_election.cpp
index 9d267183c0f..2ba97f2cc45 100644
--- a/ydb/core/fq/libs/row_dispatcher/leader_election.cpp
+++ b/ydb/core/fq/libs/row_dispatcher/leader_election.cpp
@@ -77,12 +77,12 @@ struct TLeaderElectionMetrics {
explicit TLeaderElectionMetrics(const ::NMonitoring::TDynamicCounterPtr& counters)
: Counters(counters) {
Errors = Counters->GetCounter("LeaderElectionErrors", true);
- LeaderChangedCount = Counters->GetCounter("LeaderElectionChangedCount");
+ LeaderChanged = Counters->GetCounter("LeaderChanged", true);
}
::NMonitoring::TDynamicCounterPtr Counters;
::NMonitoring::TDynamicCounters::TCounterPtr Errors;
- ::NMonitoring::TDynamicCounters::TCounterPtr LeaderChangedCount;
+ ::NMonitoring::TDynamicCounters::TCounterPtr LeaderChanged;
};
class TLeaderElection: public TActorBootstrapped<TLeaderElection> {
@@ -458,7 +458,7 @@ void TLeaderElection::Handle(TEvPrivate::TEvDescribeSemaphoreResult::TPtr& ev) {
if (!LeaderActorId || (*LeaderActorId != id)) {
LOG_ROW_DISPATCHER_INFO("Send TEvCoordinatorChanged to " << ParentId);
TActivationContext::ActorSystem()->Send(ParentId, new NFq::TEvRowDispatcher::TEvCoordinatorChanged(id, generation));
- Metrics.LeaderChangedCount->Inc();
+ Metrics.LeaderChanged->Inc();
}
LeaderActorId = id;
}
diff --git a/ydb/core/fq/libs/row_dispatcher/probes.h b/ydb/core/fq/libs/row_dispatcher/probes.h
index 2d00e21c089..0e385c4066f 100644
--- a/ydb/core/fq/libs/row_dispatcher/probes.h
+++ b/ydb/core/fq/libs/row_dispatcher/probes.h
@@ -41,8 +41,8 @@
NAMES("sender", "coordinatorGeneration", "coordinatorActor")) \
PROBE(StartSession, \
GROUPS(), \
- TYPES(TString, ui32, TString, ui64), \
- NAMES("sender", "partitionId", "queryId", "size")) \
+ TYPES(TString, TString, ui64), \
+ NAMES("sender", "queryId", "size")) \
PROBE(GetNextBatch, \
GROUPS(), \
TYPES(TString, ui32, TString, ui64), \
@@ -53,16 +53,16 @@
NAMES("sender", "partitionId", "queryId", "size")) \
PROBE(StopSession, \
GROUPS(), \
- TYPES(TString, ui32, TString, ui64), \
- NAMES("sender", "partitionId", "queryId", "size")) \
+ TYPES(TString, TString, ui64), \
+ NAMES("sender", "queryId", "size")) \
PROBE(TryConnect, \
GROUPS(), \
TYPES(TString, ui32), \
NAMES("sender", "nodeId")) \
PROBE(PrivateHeartbeat, \
GROUPS(), \
- TYPES(TString, ui32, TString, ui64), \
- NAMES("sender", "partitionId", "queryId", "generation")) \
+ TYPES(TString, TString, ui64), \
+ NAMES("sender", "queryId", "generation")) \
PROBE(NewDataArrived, \
GROUPS(), \
TYPES(TString, TString, TString, ui64, ui64), \
@@ -73,12 +73,12 @@
NAMES("sender", "readActor", "queryId", "generation", "size")) \
PROBE(SessionError, \
GROUPS(), \
- TYPES(TString, TString, ui32, TString, ui64, ui64), \
- NAMES("sender", "readActor", "partitionId","queryId", "generation", "size")) \
+ TYPES(TString, TString, TString, ui64, ui64), \
+ NAMES("sender", "readActor", "queryId", "generation", "size")) \
PROBE(Statistics, \
GROUPS(), \
- TYPES(TString, TString, ui32, TString, ui64, ui64), \
- NAMES("sender", "readActor", "partitionId","queryId", "generation", "size")) \
+ TYPES(TString, TString, ui64, ui64), \
+ NAMES("readActor", "queryId", "generation", "size")) \
PROBE(UpdateMetrics, \
GROUPS(), \
TYPES(), \
diff --git a/ydb/core/fq/libs/row_dispatcher/protos/events.proto b/ydb/core/fq/libs/row_dispatcher/protos/events.proto
index 5d56a866811..f8dcbcf07e2 100644
--- a/ydb/core/fq/libs/row_dispatcher/protos/events.proto
+++ b/ydb/core/fq/libs/row_dispatcher/protos/events.proto
@@ -11,25 +11,34 @@ import "ydb/public/api/protos/ydb_issue_message.proto";
message TEvGetAddressRequest {
NYql.NPq.NProto.TDqPqTopicSource Source = 1;
- repeated uint32 PartitionId = 2;
+ repeated uint32 PartitionId = 2 [deprecated=true];
+ repeated uint32 PartitionIds = 3;
}
message TEvPartitionAddress {
- repeated uint32 PartitionId = 1;
+ repeated uint32 PartitionId = 1 [deprecated=true];
NActorsProto.TActorId ActorId = 2;
+ repeated uint32 PartitionIds = 3;
}
message TEvGetAddressResponse {
repeated TEvPartitionAddress Partitions = 1;
}
+message TPartitionOffset {
+ uint32 PartitionId = 1;
+ uint64 Offset = 2;
+}
+
message TEvStartSession {
NYql.NPq.NProto.TDqPqTopicSource Source = 1;
- uint32 PartitionId = 2;
+ uint32 PartitionId = 2 [deprecated=true];
string Token = 3;
- optional uint64 Offset = 4;
+ optional uint64 Offset = 4 [deprecated=true];
uint64 StartingMessageTimestampMs = 5;
string QueryId = 6;
+ repeated uint32 PartitionIds = 7;
+ repeated TPartitionOffset Offsets = 8;
optional NYql.NDqProto.TMessageTransportMeta TransportMeta = 100;
}
@@ -50,7 +59,7 @@ message TEvNewDataArrived {
message TEvStopSession {
NYql.NPq.NProto.TDqPqTopicSource Source = 1;
- uint32 PartitionId = 2;
+ uint32 PartitionId = 2 [deprecated=true];
optional NYql.NDqProto.TMessageTransportMeta TransportMeta = 100;
}
@@ -68,16 +77,22 @@ message TEvMessageBatch {
optional NYql.NDqProto.TMessageTransportMeta TransportMeta = 100;
}
-message TEvStatistics {
+message TPartitionStatistics {
uint32 PartitionId = 1;
uint64 NextMessageOffset = 2;
+}
+
+message TEvStatistics {
+ uint32 PartitionId = 1; // deprecated
+ uint64 NextMessageOffset = 2; // deprecated
uint64 ReadBytes = 3;
+ repeated TPartitionStatistics Partition = 4;
optional NYql.NDqProto.TMessageTransportMeta TransportMeta = 100;
}
message TEvSessionError {
reserved 1;
- uint32 PartitionId = 2;
+ uint32 PartitionId = 2 [deprecated=true];
NYql.NDqProto.StatusIds.StatusCode StatusCode = 3;
repeated Ydb.Issue.IssueMessage Issues = 4;
optional NYql.NDqProto.TMessageTransportMeta TransportMeta = 100;
diff --git a/ydb/core/fq/libs/row_dispatcher/row_dispatcher.cpp b/ydb/core/fq/libs/row_dispatcher/row_dispatcher.cpp
index 935f3c0a04f..9d3daba2ead 100644
--- a/ydb/core/fq/libs/row_dispatcher/row_dispatcher.cpp
+++ b/ydb/core/fq/libs/row_dispatcher/row_dispatcher.cpp
@@ -63,6 +63,7 @@ struct TEvPrivate {
EvUpdateMetrics,
EvPrintStateToLog,
EvTryConnect,
+ EvSendStatistic,
EvEnd
};
@@ -75,6 +76,7 @@ struct TEvPrivate {
: NodeId(nodeId) {}
ui32 NodeId = 0;
};
+ struct TEvSendStatistic : public NActors::TEventLocal<TEvSendStatistic, EvSendStatistic> {};
};
struct TQueryStatKey {
@@ -98,14 +100,14 @@ struct TQueryStatKeyHash {
};
struct TAggQueryStat {
- NYql::TCounters::TEntry ReadBytes;
+ NYql::TCounters::TEntry FilteredReadBytes;
NYql::TCounters::TEntry UnreadBytes;
NYql::TCounters::TEntry UnreadRows;
NYql::TCounters::TEntry ReadLagMessages;
bool IsWaiting = false;
void Add(const TTopicSessionClientStatistic& stat) {
- ReadBytes.Add(NYql::TCounters::TEntry(stat.ReadBytes));
+ FilteredReadBytes.Add(NYql::TCounters::TEntry(stat.FilteredReadBytes));
UnreadBytes.Add(NYql::TCounters::TEntry(stat.UnreadBytes));
UnreadRows.Add(NYql::TCounters::TEntry(stat.UnreadRows));
ReadLagMessages.Add(NYql::TCounters::TEntry(stat.ReadLagMessages));
@@ -120,30 +122,7 @@ ui64 MaxSessionBufferSizeBytes = 16000000;
class TRowDispatcher : public TActorBootstrapped<TRowDispatcher> {
- struct ConsumerSessionKey {
- TActorId ReadActorId;
- ui32 PartitionId;
-
- size_t Hash() const noexcept {
- ui64 hash = std::hash<TActorId>()(ReadActorId);
- hash = CombineHashes<ui64>(hash, std::hash<ui32>()(PartitionId));
- return hash;
- }
- bool operator==(const ConsumerSessionKey& other) const {
- return ReadActorId == other.ReadActorId && PartitionId == other.PartitionId;
- }
- TString ToString() const {
- return TString::Join(ReadActorId.ToString(), "/", ::ToString(PartitionId));
- }
- };
-
- struct ConsumerSessionKeyHash {
- int operator()(const ConsumerSessionKey& k) const {
- return k.Hash();
- }
- };
-
- struct TopicSessionKey {
+ struct TTopicSessionKey {
TString ReadGroup;
TString Endpoint;
TString Database;
@@ -158,14 +137,14 @@ class TRowDispatcher : public TActorBootstrapped<TRowDispatcher> {
hash = CombineHashes<ui64>(hash, std::hash<ui64>()(PartitionId));
return hash;
}
- bool operator==(const TopicSessionKey& other) const {
+ bool operator==(const TTopicSessionKey& other) const {
return ReadGroup == other.ReadGroup && Endpoint == other.Endpoint && Database == other.Database
&& TopicPath == other.TopicPath && PartitionId == other.PartitionId;
}
};
- struct TopicSessionKeyHash {
- int operator()(const TopicSessionKey& k) const {
+ struct TTopicSessionKeyHash {
+ int operator()(const TTopicSessionKey& k) const {
return k.Hash();
}
};
@@ -295,27 +274,32 @@ class TRowDispatcher : public TActorBootstrapped<TRowDispatcher> {
TNodesTracker NodesTracker;
TAggregatedStats AggrStats;
- struct ConsumerCounters {
+ struct TConsumerCounters {
ui64 NewDataArrived = 0;
ui64 GetNextBatch = 0;
ui64 MessageBatch = 0;
};
- struct ConsumerInfo {
- ConsumerInfo(
+ struct TConsumerPartition {
+ bool PendingGetNextBatch = false;
+ bool PendingNewDataArrived = false;
+ TActorId TopicSessionId;
+ TTopicSessionClientStatistic Stat;
+ bool StatisticsUpdated = false;
+ };
+
+ struct TConsumerInfo {
+ TConsumerInfo(
NActors::TActorId readActorId,
NActors::TActorId selfId,
ui64 eventQueueId,
NFq::NRowDispatcherProto::TEvStartSession& proto,
- TActorId topicSessionId,
bool alreadyConnected,
ui64 generation)
: ReadActorId(readActorId)
, SourceParams(proto.GetSource())
- , PartitionId(proto.GetPartitionId())
, EventQueueId(eventQueueId)
, Proto(proto)
- , TopicSessionId(topicSessionId)
, QueryId(proto.GetQueryId())
, Generation(generation) {
EventsQueue.Init("txId", selfId, selfId, eventQueueId, /* KeepAlive */ true, /* UseConnect */ false);
@@ -324,39 +308,36 @@ class TRowDispatcher : public TActorBootstrapped<TRowDispatcher> {
NActors::TActorId ReadActorId;
NYql::NPq::NProto::TDqPqTopicSource SourceParams;
- ui64 PartitionId;
NYql::NDq::TRetryEventsQueue EventsQueue;
ui64 EventQueueId;
NFq::NRowDispatcherProto::TEvStartSession Proto;
- TActorId TopicSessionId;
+ THashMap<ui32, TConsumerPartition> Partitions;
const TString QueryId;
- ConsumerCounters Counters;
- bool PendingGetNextBatch = false;
- bool PendingNewDataArrived = false;
+ TConsumerCounters Counters;
TTopicSessionClientStatistic Stat;
ui64 Generation;
};
- struct SessionInfo {
- TMap<TActorId, TAtomicSharedPtr<ConsumerInfo>> Consumers; // key - ReadActor actor id
+ struct TSessionInfo {
+ TMap<TActorId, TAtomicSharedPtr<TConsumerInfo>> Consumers; // key - ReadActor actor id
TTopicSessionCommonStatistic Stat; // Increments
NYql::TCounters::TEntry AggrReadBytes;
};
- struct TopicSessionInfo {
- TMap<TActorId, SessionInfo> Sessions; // key - TopicSession actor id
+ struct TTopicSessionInfo {
+ TMap<TActorId, TSessionInfo> Sessions; // key - TopicSession actor id
};
- struct ReadActorInfo {
+ struct TReadActorInfo {
TString InternalState;
TInstant RequestTime;
TInstant ResponseTime;
};
- THashMap<ConsumerSessionKey, TAtomicSharedPtr<ConsumerInfo>, ConsumerSessionKeyHash> Consumers;
- TMap<ui64, TAtomicSharedPtr<ConsumerInfo>> ConsumersByEventQueueId;
- THashMap<TopicSessionKey, TopicSessionInfo, TopicSessionKeyHash> TopicSessions;
- TMap<TActorId, ReadActorInfo> ReadActorsInternalState;
+ THashMap<NActors::TActorId, TAtomicSharedPtr<TConsumerInfo>> Consumers; // key - read actor id
+ TMap<ui64, TAtomicSharedPtr<TConsumerInfo>> ConsumersByEventQueueId;
+ THashMap<TTopicSessionKey, TTopicSessionInfo, TTopicSessionKeyHash> TopicSessions;
+ TMap<TActorId, TReadActorInfo> ReadActorsInternalState;
public:
explicit TRowDispatcher(
@@ -389,7 +370,6 @@ public:
void Handle(NFq::TEvRowDispatcher::TEvNewDataArrived::TPtr& ev);
void Handle(NFq::TEvRowDispatcher::TEvMessageBatch::TPtr& ev);
void Handle(NFq::TEvRowDispatcher::TEvSessionError::TPtr& ev);
- void Handle(NFq::TEvRowDispatcher::TEvStatistics::TPtr& ev);
void Handle(NFq::TEvRowDispatcher::TEvSessionStatistic::TPtr& ev);
void Handle(NFq::TEvRowDispatcher::TEvGetInternalStateResponse::TPtr& ev);
@@ -400,15 +380,16 @@ public:
void Handle(const NYql::NDq::TEvRetryQueuePrivate::TEvEvHeartbeat::TPtr&);
void Handle(NFq::TEvPrivate::TEvUpdateMetrics::TPtr&);
void Handle(NFq::TEvPrivate::TEvPrintStateToLog::TPtr&);
+ void Handle(NFq::TEvPrivate::TEvSendStatistic::TPtr&);
void Handle(const NMon::TEvHttpInfo::TPtr&);
- void DeleteConsumer(const ConsumerSessionKey& key);
+ void DeleteConsumer(NActors::TActorId readActorId);
void UpdateMetrics();
TString GetInternalState();
TString GetReadActorsInternalState();
void UpdateReadActorsInternalState();
template <class TEventPtr>
- bool CheckSession(TAtomicSharedPtr<ConsumerInfo>& consumer, const TEventPtr& ev);
+ bool CheckSession(TAtomicSharedPtr<TConsumerInfo>& consumer, const TEventPtr& ev);
void SetQueryMetrics(const TQueryStatKey& queryKey, ui64 unreadBytesMax, ui64 unreadBytesAvg, i64 readLagMessagesMax);
void PrintStateToLog();
@@ -427,7 +408,6 @@ public:
hFunc(NFq::TEvRowDispatcher::TEvStopSession, Handle);
hFunc(NFq::TEvRowDispatcher::TEvNoSession, Handle);
hFunc(NFq::TEvRowDispatcher::TEvSessionError, Handle);
- hFunc(NFq::TEvRowDispatcher::TEvStatistics, Handle);
hFunc(NFq::TEvRowDispatcher::TEvSessionStatistic, Handle);
hFunc(NFq::TEvRowDispatcher::TEvGetInternalStateResponse, Handle);
hFunc(TEvPrivate::TEvTryConnect, Handle);
@@ -436,6 +416,7 @@ public:
hFunc(NFq::TEvRowDispatcher::TEvNewDataArrived, Handle);
hFunc(NFq::TEvPrivate::TEvUpdateMetrics, Handle);
hFunc(NFq::TEvPrivate::TEvPrintStateToLog, Handle);
+ hFunc(NFq::TEvPrivate::TEvSendStatistic, Handle);
hFunc(NMon::TEvHttpInfo, Handle);
})
};
@@ -479,6 +460,7 @@ void TRowDispatcher::Bootstrap() {
Schedule(TDuration::Seconds(CoordinatorPingPeriodSec), new TEvPrivate::TEvCoordinatorPing());
Schedule(TDuration::Seconds(UpdateMetricsPeriodSec), new NFq::TEvPrivate::TEvUpdateMetrics());
Schedule(TDuration::Seconds(PrintStateToLogPeriodSec), new NFq::TEvPrivate::TEvPrintStateToLog());
+ Schedule(TDuration::Seconds(Config.GetSendStatusPeriodSec()), new NFq::TEvPrivate::TEvSendStatistic());
if (Monitoring) {
NLwTraceMonPage::ProbeRegistry().AddProbesList(LWTRACE_GET_PROBES(FQ_ROW_DISPATCHER_PROVIDER));
@@ -529,7 +511,7 @@ void TRowDispatcher::HandleDisconnected(TEvInterconnect::TEvNodeDisconnected::TP
void TRowDispatcher::Handle(NActors::TEvents::TEvUndelivered::TPtr& ev) {
LWPROBE(UndeliveredStart, ev->Sender.ToString(), ev->Get()->Reason, ev->Cookie);
- LOG_ROW_DISPATCHER_DEBUG("TEvUndelivered, from " << ev->Sender << ", reason " << ev->Get()->Reason);
+ LOG_ROW_DISPATCHER_TRACE("TEvUndelivered, from " << ev->Sender << ", reason " << ev->Get()->Reason);
for (auto& [key, consumer] : Consumers) {
if (ev->Cookie != consumer->Generation) { // Several partitions in one read_actor have different Generation.
LWPROBE(UndeliveredSkipGeneration, ev->Sender.ToString(), ev->Get()->Reason, ev->Cookie, consumer->Generation);
@@ -537,7 +519,7 @@ void TRowDispatcher::Handle(NActors::TEvents::TEvUndelivered::TPtr& ev) {
}
if (consumer->EventsQueue.HandleUndelivered(ev) == NYql::NDq::TRetryEventsQueue::ESessionState::SessionClosed) {
LWPROBE(UndeliveredDeleteConsumer, ev->Sender.ToString(), ev->Get()->Reason, ev->Cookie, key.ToString());
- DeleteConsumer(key);
+ DeleteConsumer(ev->Sender);
break;
}
}
@@ -648,6 +630,7 @@ TString TRowDispatcher::GetInternalState() {
auto printDataRate = [&](NYql::TCounters::TEntry entry) {
str << " (sum " << toHumanDR(entry.Sum) << " max " << toHumanDR(entry.Max) << " min " << toHumanDR(entry.Min) << ")";
};
+ str << "SelfId: " << SelfId().ToString() << "\n";
str << "Consumers count: " << Consumers.size() << "\n";
str << "TopicSessions count: " << TopicSessions.size() << "\n";
str << "Max session buffer size: " << toHuman(MaxSessionBufferSizeBytes) << "\n";
@@ -682,7 +665,7 @@ TString TRowDispatcher::GetInternalState() {
auto used = sessionsBufferSumSize ? (stat.UnreadBytes.Sum * 100.0 / sessionsBufferSumSize) : 0.0;
str << " " << queryId << " / " << readGroup << ": buffer used (all partitions) " << LeftPad(Prec(used, 4), 10) << "% (" << toHuman(stat.UnreadBytes.Sum) << ") unread max (one partition) " << toHuman(stat.UnreadBytes.Max) << " data rate";
if (aggStat) {
- printDataRate(aggStat->ReadBytes);
+ printDataRate(aggStat->FilteredReadBytes);
}
str << " waiting " << stat.IsWaiting << " max read lag " << stat.ReadLagMessages.Max;
str << "\n";
@@ -704,23 +687,33 @@ TString TRowDispatcher::GetInternalState() {
}
for (auto& [readActorId, consumer] : sessionInfo.Consumers) {
+ const auto& partition = consumer->Partitions[key.PartitionId];
str << " " << consumer->QueryId << " " << LeftPad(readActorId, 32) << " unread bytes "
<< toHuman(consumer->Stat.UnreadBytes) << " (" << leftPad(consumer->Stat.UnreadRows) << " rows) "
<< " offset " << leftPad(consumer->Stat.Offset) << " init offset " << leftPad(consumer->Stat.InitialOffset)
<< " get " << leftPad(consumer->Counters.GetNextBatch)
<< " arr " << leftPad(consumer->Counters.NewDataArrived) << " btc " << leftPad(consumer->Counters.MessageBatch)
- << " pend get " << leftPad(consumer->PendingGetNextBatch) << " pend new " << leftPad(consumer->PendingNewDataArrived)
+ << " pend get " << leftPad(partition.PendingGetNextBatch) << " pend new " << leftPad(partition.PendingNewDataArrived)
<< " waiting " << consumer->Stat.IsWaiting << " read lag " << leftPad(consumer->Stat.ReadLagMessages)
- << " conn id " << consumer->Generation << " ";
- str << " retry queue: ";
- consumer->EventsQueue.PrintInternalState(str);
-
+ << " conn id " << consumer->Generation << "\n";
maxInitialOffset = std::max(maxInitialOffset, consumer->Stat.InitialOffset);
minInitialOffset = std::min(minInitialOffset, consumer->Stat.InitialOffset);
}
str << " initial offset max " << leftPad(maxInitialOffset) << " min " << leftPad(minInitialOffset) << "\n";;
}
}
+
+ str << "Consumers:\n";
+ for (auto& [readActorId, consumer] : Consumers) {
+ str << " " << consumer->QueryId << " " << LeftPad(readActorId, 32) << " Generation " << consumer->Generation << "\n";
+ str << " partitions: ";
+ for (const auto& [partitionId, info] : consumer->Partitions) {
+ str << partitionId << ",";
+ }
+ str << "\n retry queue: ";
+ consumer->EventsQueue.PrintInternalState(str);
+ }
+
return str.Str();
}
@@ -734,8 +727,8 @@ TString TRowDispatcher::GetReadActorsInternalState() {
void TRowDispatcher::UpdateReadActorsInternalState() {
TSet<TActorId> ReadActors;
- for (const auto& [key, _]: Consumers) {
- ReadActors.insert(key.ReadActorId);
+ for (const auto& [readActorId, _]: Consumers) {
+ ReadActors.insert(readActorId);
}
for(auto it = ReadActorsInternalState.begin(); it != ReadActorsInternalState.end();) {
@@ -759,21 +752,15 @@ void TRowDispatcher::UpdateReadActorsInternalState() {
void TRowDispatcher::Handle(NFq::TEvRowDispatcher::TEvStartSession::TPtr& ev) {
LOG_ROW_DISPATCHER_DEBUG("Received TEvStartSession from " << ev->Sender << ", read group " << ev->Get()->Record.GetSource().GetReadGroup() << ", topicPath " << ev->Get()->Record.GetSource().GetTopicPath() <<
- " part id " << ev->Get()->Record.GetPartitionId() << " query id " << ev->Get()->Record.GetQueryId() << " cookie " << ev->Cookie);
+ " part id " << JoinSeq(',', ev->Get()->Record.GetPartitionIds()) << " query id " << ev->Get()->Record.GetQueryId() << " cookie " << ev->Cookie);
auto queryGroup = Metrics.Counters->GetSubgroup("query_id", ev->Get()->Record.GetQueryId());
auto topicGroup = queryGroup->GetSubgroup("read_group", SanitizeLabel(ev->Get()->Record.GetSource().GetReadGroup()));
topicGroup->GetCounter("StartSession", true)->Inc();
- LWPROBE(StartSession, ev->Sender.ToString(), ev->Get()->Record.GetPartitionId(), ev->Get()->Record.GetQueryId(), ev->Get()->Record.ByteSizeLong());
+ LWPROBE(StartSession, ev->Sender.ToString(), ev->Get()->Record.GetQueryId(), ev->Get()->Record.ByteSizeLong());
NodesTracker.AddNode(ev->Sender.NodeId());
- TMaybe<ui64> readOffset;
- if (ev->Get()->Record.HasOffset()) {
- readOffset = ev->Get()->Record.GetOffset();
- }
-
- ConsumerSessionKey key{ev->Sender, ev->Get()->Record.GetPartitionId()};
- auto it = Consumers.find(key);
+ auto it = Consumers.find(ev->Sender);
if (it != Consumers.end()) {
if (ev->Cookie <= it->second->Generation) {
LOG_ROW_DISPATCHER_WARN("Consumer already exists, ignore StartSession");
@@ -781,80 +768,90 @@ void TRowDispatcher::Handle(NFq::TEvRowDispatcher::TEvStartSession::TPtr& ev) {
}
LOG_ROW_DISPATCHER_WARN("Consumer already exists, new consumer with new generation (" << ev->Cookie << ", current "
<< it->second->Generation << "), remove old consumer, sender " << ev->Sender << ", topicPath "
- << ev->Get()->Record.GetSource().GetTopicPath() <<" partitionId " << ev->Get()->Record.GetPartitionId() << " cookie " << ev->Cookie);
- DeleteConsumer(key);
+ << ev->Get()->Record.GetSource().GetTopicPath() << " cookie " << ev->Cookie);
+ DeleteConsumer(ev->Sender);
}
const auto& source = ev->Get()->Record.GetSource();
- TActorId sessionActorId;
- TopicSessionKey topicKey{source.GetReadGroup(), source.GetEndpoint(), source.GetDatabase(), source.GetTopicPath(), ev->Get()->Record.GetPartitionId()};
- TopicSessionInfo& topicSessionInfo = TopicSessions[topicKey];
- Y_ENSURE(topicSessionInfo.Sessions.size() <= 1);
+ auto consumerInfo = MakeAtomicShared<TConsumerInfo>(ev->Sender, SelfId(), NextEventQueueId++, ev->Get()->Record,
+ NodesTracker.GetNodeConnected(ev->Sender.NodeId()), ev->Cookie);
- auto consumerInfo = MakeAtomicShared<ConsumerInfo>(ev->Sender, SelfId(), NextEventQueueId++, ev->Get()->Record, TActorId(), NodesTracker.GetNodeConnected(ev->Sender.NodeId()), ev->Cookie);
- Consumers[key] = consumerInfo;
+ Consumers[ev->Sender] = consumerInfo;
ConsumersByEventQueueId[consumerInfo->EventQueueId] = consumerInfo;
if (!CheckSession(consumerInfo, ev)) {
return;
}
- if (topicSessionInfo.Sessions.empty()) {
- LOG_ROW_DISPATCHER_DEBUG("Create new session: read group " << source.GetReadGroup() << " topic " << source.GetTopicPath()
- << " part id " << ev->Get()->Record.GetPartitionId() << " offset " << readOffset);
- sessionActorId = ActorFactory->RegisterTopicSession(
- source.GetReadGroup(),
- source.GetTopicPath(),
- source.GetEndpoint(),
- source.GetDatabase(),
- Config,
- SelfId(),
- CompileServiceActorId,
- ev->Get()->Record.GetPartitionId(),
- YqSharedResources->UserSpaceYdbDriver,
- CreateCredentialsProviderFactoryForStructuredToken(
- CredentialsFactory,
- ev->Get()->Record.GetToken(),
- source.GetAddBearerToToken()),
- Counters,
- CountersRoot,
- PqGateway,
- MaxSessionBufferSizeBytes
- );
- SessionInfo& sessionInfo = topicSessionInfo.Sessions[sessionActorId];
- sessionInfo.Consumers[ev->Sender] = consumerInfo;
- } else {
- auto sessionIt = topicSessionInfo.Sessions.begin();
- SessionInfo& sessionInfo = sessionIt->second;
- sessionInfo.Consumers[ev->Sender] = consumerInfo;
- sessionActorId = sessionIt->first;
+ for (auto partitionId : ev->Get()->Record.GetPartitionIds()) {
+ TActorId sessionActorId;
+ TTopicSessionKey topicKey{source.GetReadGroup(), source.GetEndpoint(), source.GetDatabase(), source.GetTopicPath(), partitionId};
+ TTopicSessionInfo& topicSessionInfo = TopicSessions[topicKey];
+ Y_ENSURE(topicSessionInfo.Sessions.size() <= 1);
+
+ if (topicSessionInfo.Sessions.empty()) {
+ LOG_ROW_DISPATCHER_DEBUG("Create new session: read group " << source.GetReadGroup() << " topic " << source.GetTopicPath()
+ << " part id " << partitionId);
+ sessionActorId = ActorFactory->RegisterTopicSession(
+ source.GetReadGroup(),
+ source.GetTopicPath(),
+ source.GetEndpoint(),
+ source.GetDatabase(),
+ Config,
+ SelfId(),
+ CompileServiceActorId,
+ partitionId,
+ YqSharedResources->UserSpaceYdbDriver,
+ CreateCredentialsProviderFactoryForStructuredToken(
+ CredentialsFactory,
+ ev->Get()->Record.GetToken(),
+ source.GetAddBearerToToken()),
+ Counters,
+ CountersRoot,
+ PqGateway,
+ MaxSessionBufferSizeBytes
+ );
+ TSessionInfo& sessionInfo = topicSessionInfo.Sessions[sessionActorId];
+ sessionInfo.Consumers[ev->Sender] = consumerInfo;
+ } else {
+ auto sessionIt = topicSessionInfo.Sessions.begin();
+ TSessionInfo& sessionInfo = sessionIt->second;
+ sessionInfo.Consumers[ev->Sender] = consumerInfo;
+ sessionActorId = sessionIt->first;
+ }
+ consumerInfo->Partitions[partitionId].TopicSessionId = sessionActorId;
+
+ auto event = std::make_unique<NFq::TEvRowDispatcher::TEvStartSession>();
+ event->Record.CopyFrom(ev->Get()->Record);
+ Send(new IEventHandle(sessionActorId, ev->Sender, event.release(), 0));
}
- consumerInfo->TopicSessionId = sessionActorId;
consumerInfo->EventsQueue.Send(new NFq::TEvRowDispatcher::TEvStartSessionAck(consumerInfo->Proto), consumerInfo->Generation);
-
- Forward(ev, sessionActorId);
Metrics.ClientsCount->Set(Consumers.size());
}
void TRowDispatcher::Handle(NFq::TEvRowDispatcher::TEvGetNextBatch::TPtr& ev) {
- ConsumerSessionKey key{ev->Sender, ev->Get()->Record.GetPartitionId()};
- auto it = Consumers.find(key);
+ auto it = Consumers.find(ev->Sender);
if (it == Consumers.end()) {
LOG_ROW_DISPATCHER_WARN("Ignore (no consumer) TEvGetNextBatch from " << ev->Sender << " part id " << ev->Get()->Record.GetPartitionId());
return;
}
- LWPROBE(GetNextBatch, ev->Sender.ToString(), ev->Get()->Record.GetPartitionId(), it->second->QueryId, ev->Get()->Record.ByteSizeLong());
+ auto& session = it->second;
+ LWPROBE(GetNextBatch, ev->Sender.ToString(), ev->Get()->Record.GetPartitionId(), session->QueryId, ev->Get()->Record.ByteSizeLong());
LOG_ROW_DISPATCHER_TRACE("Received TEvGetNextBatch from " << ev->Sender << " part id " << ev->Get()->Record.GetPartitionId() << " query id " << it->second->QueryId);
- if (!CheckSession(it->second, ev)) {
+ if (!CheckSession(session, ev)) {
+ return;
+ }
+ auto partitionIt = session->Partitions.find(ev->Get()->Record.GetPartitionId());
+ if (partitionIt == session->Partitions.end()) {
+ LOG_ROW_DISPATCHER_ERROR("Ignore TEvGetNextBatch from " << ev->Sender << ", wrong partition id " << ev->Get()->Record.GetPartitionId());
return;
}
- it->second->PendingNewDataArrived = false;
- it->second->PendingGetNextBatch = true;
- it->second->Counters.GetNextBatch++;
- Forward(ev, it->second->TopicSessionId);
+ partitionIt->second.PendingNewDataArrived = false;
+ partitionIt->second.PendingGetNextBatch = true;
+ session->Counters.GetNextBatch++;
+ Forward(ev, partitionIt->second.TopicSessionId);
}
void TRowDispatcher::Handle(NFq::TEvRowDispatcher::TEvHeartbeat::TPtr& ev) {
- ConsumerSessionKey key{ev->Sender, ev->Get()->Record.GetPartitionId()};
- auto it = Consumers.find(key);
+ auto it = Consumers.find(ev->Sender);
if (it == Consumers.end()) {
LOG_ROW_DISPATCHER_WARN("Wrong consumer, sender " << ev->Sender << ", part id " << ev->Get()->Record.GetPartitionId());
return;
@@ -868,12 +865,11 @@ void TRowDispatcher::Handle(NFq::TEvRowDispatcher::TEvHeartbeat::TPtr& ev) {
void TRowDispatcher::Handle(NFq::TEvRowDispatcher::TEvNoSession::TPtr& ev) {
LOG_ROW_DISPATCHER_DEBUG("Received TEvNoSession from " << ev->Sender << ", cookie " << ev->Cookie);
- ConsumerSessionKey key{ev->Sender, ev->Get()->Record.GetPartitionId()};
- DeleteConsumer(key);
+ DeleteConsumer(ev->Sender);
}
template <class TEventPtr>
-bool TRowDispatcher::CheckSession(TAtomicSharedPtr<ConsumerInfo>& consumer, const TEventPtr& ev) {
+bool TRowDispatcher::CheckSession(TAtomicSharedPtr<TConsumerInfo>& consumer, const TEventPtr& ev) {
if (ev->Cookie != consumer->Generation) {
LOG_ROW_DISPATCHER_WARN("Wrong message generation (" << typeid(TEventPtr).name() << "), sender " << ev->Sender << " cookie " << ev->Cookie << ", session generation " << consumer->Generation << ", query id " << consumer->QueryId);
return false;
@@ -887,56 +883,51 @@ bool TRowDispatcher::CheckSession(TAtomicSharedPtr<ConsumerInfo>& consumer, cons
}
void TRowDispatcher::Handle(NFq::TEvRowDispatcher::TEvStopSession::TPtr& ev) {
- ConsumerSessionKey key{ev->Sender, ev->Get()->Record.GetPartitionId()};
- auto it = Consumers.find(key);
+ auto it = Consumers.find(ev->Sender);
if (it == Consumers.end()) {
- LOG_ROW_DISPATCHER_WARN("Ignore TEvStopSession from " << ev->Sender << " part id " << ev->Get()->Record.GetPartitionId());
- return;
- }
- LWPROBE(StopSession, ev->Sender.ToString(), ev->Get()->Record.GetPartitionId(), it->second->QueryId, ev->Get()->Record.ByteSizeLong());
- LOG_ROW_DISPATCHER_DEBUG("Received TEvStopSession, topicPath " << ev->Get()->Record.GetSource().GetTopicPath() <<
- " partitionId " << ev->Get()->Record.GetPartitionId() << " query id " << it->second->QueryId);
- const auto& consumer = it->second;
- if (ev->Cookie != consumer->Generation) {
- LOG_ROW_DISPATCHER_WARN("Wrong message generation, ignore TEvStopSession, sender " << ev->Sender << " cookie " << ev->Cookie << ", session generation " << consumer->Generation << ", query id " << consumer->QueryId);
+ LOG_ROW_DISPATCHER_WARN("Ignore TEvStopSession from " << ev->Sender);
return;
}
+
+ LWPROBE(StopSession, ev->Sender.ToString(), it->second->QueryId, ev->Get()->Record.ByteSizeLong());
+ LOG_ROW_DISPATCHER_DEBUG("Received TEvStopSession, topicPath " << ev->Get()->Record.GetSource().GetTopicPath() << " query id " << it->second->QueryId);
if (!CheckSession(it->second, ev)) {
return;
}
- DeleteConsumer(key);
+ DeleteConsumer(ev->Sender);
}
-void TRowDispatcher::DeleteConsumer(const ConsumerSessionKey& key) {
- auto consumerIt = Consumers.find(key);
+void TRowDispatcher::DeleteConsumer(NActors::TActorId readActorId) {
+ auto consumerIt = Consumers.find(readActorId);
if (consumerIt == Consumers.end()) {
- LOG_ROW_DISPATCHER_ERROR("Ignore (no consumer) DeleteConsumer, " << " read actor id " << key.ReadActorId << " part id " << key.PartitionId);
+ LOG_ROW_DISPATCHER_ERROR("Ignore (no consumer) DeleteConsumer, " << " read actor id " << readActorId);
return;
}
const auto& consumer = consumerIt->second;
- LOG_ROW_DISPATCHER_DEBUG("DeleteConsumer, readActorId " << key.ReadActorId << " partitionId " << key.PartitionId << " query id " << consumer->QueryId);
- auto event = std::make_unique<NFq::TEvRowDispatcher::TEvStopSession>();
- *event->Record.MutableSource() = consumer->SourceParams;
- event->Record.SetPartitionId(consumer->PartitionId);
- Send(new IEventHandle(consumerIt->second->TopicSessionId, consumer->ReadActorId, event.release(), 0));
+ LOG_ROW_DISPATCHER_DEBUG("DeleteConsumer, readActorId " << readActorId << " query id " << consumer->QueryId);
+ for (auto& [partitionId, partition] : consumer->Partitions) {
+ auto event = std::make_unique<NFq::TEvRowDispatcher::TEvStopSession>();
+ *event->Record.MutableSource() = consumer->SourceParams;
+ Send(new IEventHandle(partition.TopicSessionId, consumer->ReadActorId, event.release(), 0));
- TopicSessionKey topicKey{
- consumer->SourceParams.GetReadGroup(),
- consumer->SourceParams.GetEndpoint(),
- consumer->SourceParams.GetDatabase(),
- consumer->SourceParams.GetTopicPath(),
- consumer->PartitionId};
- TopicSessionInfo& topicSessionInfo = TopicSessions[topicKey];
- SessionInfo& sessionInfo = topicSessionInfo.Sessions[consumerIt->second->TopicSessionId];
- Y_ENSURE(sessionInfo.Consumers.count(consumer->ReadActorId));
- sessionInfo.Consumers.erase(consumer->ReadActorId);
- if (sessionInfo.Consumers.empty()) {
- LOG_ROW_DISPATCHER_DEBUG("Session is not used, sent TEvPoisonPill to " << consumerIt->second->TopicSessionId);
- topicSessionInfo.Sessions.erase(consumerIt->second->TopicSessionId);
- Send(consumerIt->second->TopicSessionId, new NActors::TEvents::TEvPoisonPill());
- if (topicSessionInfo.Sessions.empty()) {
- TopicSessions.erase(topicKey);
+ TTopicSessionKey topicKey{
+ consumer->SourceParams.GetReadGroup(),
+ consumer->SourceParams.GetEndpoint(),
+ consumer->SourceParams.GetDatabase(),
+ consumer->SourceParams.GetTopicPath(),
+ partitionId};
+ TTopicSessionInfo& topicSessionInfo = TopicSessions[topicKey];
+ TSessionInfo& sessionInfo = topicSessionInfo.Sessions[partition.TopicSessionId];
+ Y_ENSURE(sessionInfo.Consumers.count(consumer->ReadActorId));
+ sessionInfo.Consumers.erase(consumer->ReadActorId);
+ if (sessionInfo.Consumers.empty()) {
+ LOG_ROW_DISPATCHER_DEBUG("Session is not used, sent TEvPoisonPill to " << partition.TopicSessionId);
+ topicSessionInfo.Sessions.erase(partition.TopicSessionId);
+ Send(partition.TopicSessionId, new NActors::TEvents::TEvPoisonPill());
+ if (topicSessionInfo.Sessions.empty()) {
+ TopicSessions.erase(topicKey);
+ }
}
}
ConsumersByEventQueueId.erase(consumerIt->second->EventQueueId);
@@ -957,70 +948,56 @@ void TRowDispatcher::Handle(const NYql::NDq::TEvRetryQueuePrivate::TEvEvHeartbea
return;
}
auto& sessionInfo = it->second;
- LWPROBE(PrivateHeartbeat, ev->Sender.ToString(), sessionInfo->PartitionId, sessionInfo->QueryId, sessionInfo->Generation);
+ LWPROBE(PrivateHeartbeat, ev->Sender.ToString(), sessionInfo->QueryId, sessionInfo->Generation);
bool needSend = sessionInfo->EventsQueue.Heartbeat();
if (needSend) {
LOG_ROW_DISPATCHER_TRACE("Send TEvHeartbeat to " << sessionInfo->ReadActorId << " query id " << sessionInfo->QueryId);
- auto event = std::make_unique<NFq::TEvRowDispatcher::TEvHeartbeat>(sessionInfo->PartitionId);
+ auto event = std::make_unique<NFq::TEvRowDispatcher::TEvHeartbeat>();
Send(new IEventHandle(sessionInfo->ReadActorId, SelfId(), event.release(), 0, sessionInfo->Generation));
}
}
-void TRowDispatcher::Handle(NFq::TEvRowDispatcher::TEvNewDataArrived::TPtr& ev) {
- ConsumerSessionKey key{ev->Get()->ReadActorId, ev->Get()->Record.GetPartitionId()};
- auto it = Consumers.find(key);
+void TRowDispatcher::Handle(NFq::TEvRowDispatcher::TEvNewDataArrived::TPtr& ev) {
+ auto it = Consumers.find(ev->Get()->ReadActorId);
if (it == Consumers.end()) {
LOG_ROW_DISPATCHER_WARN("Ignore (no consumer) TEvNewDataArrived from " << ev->Sender << " part id " << ev->Get()->Record.GetPartitionId());
return;
}
LWPROBE(NewDataArrived, ev->Sender.ToString(), ev->Get()->ReadActorId.ToString(), it->second->QueryId, it->second->Generation, ev->Get()->Record.ByteSizeLong());
LOG_ROW_DISPATCHER_TRACE("Forward TEvNewDataArrived from " << ev->Sender << " to " << ev->Get()->ReadActorId << " query id " << it->second->QueryId);
- it->second->PendingNewDataArrived = true;
+ auto partitionIt = it->second->Partitions.find(ev->Get()->Record.GetPartitionId()); // find(), not operator[]: an unknown partition id must not insert an entry with an empty TopicSessionId
+ if (partitionIt != it->second->Partitions.end()) { partitionIt->second.PendingNewDataArrived = true; } // the event is still forwarded below either way
it->second->Counters.NewDataArrived++;
it->second->EventsQueue.Send(ev->Release().Release(), it->second->Generation);
}
void TRowDispatcher::Handle(NFq::TEvRowDispatcher::TEvMessageBatch::TPtr& ev) {
- ConsumerSessionKey key{ev->Get()->ReadActorId, ev->Get()->Record.GetPartitionId()};
- auto it = Consumers.find(key);
+ auto it = Consumers.find(ev->Get()->ReadActorId);
if (it == Consumers.end()) {
- LOG_ROW_DISPATCHER_WARN("Ignore (no consumer) TEvMessageBatch from " << ev->Sender);
+ LOG_ROW_DISPATCHER_WARN("Ignore (no consumer) TEvMessageBatch from " << ev->Sender << " to " << ev->Get()->ReadActorId);
return;
}
LWPROBE(MessageBatch, ev->Sender.ToString(), ev->Get()->ReadActorId.ToString(), it->second->QueryId, it->second->Generation, ev->Get()->Record.ByteSizeLong());
LOG_ROW_DISPATCHER_TRACE("Forward TEvMessageBatch from " << ev->Sender << " to " << ev->Get()->ReadActorId << " query id " << it->second->QueryId);
Metrics.RowsSent->Add(ev->Get()->Record.MessagesSize());
- it->second->PendingGetNextBatch = false;
+ auto partitionIt = it->second->Partitions.find(ev->Get()->Record.GetPartitionId()); // find(), not operator[]: an unknown partition id must not insert an entry with an empty TopicSessionId
+ if (partitionIt != it->second->Partitions.end()) { partitionIt->second.PendingGetNextBatch = false; } // the batch is still forwarded below either way
it->second->Counters.MessageBatch++;
it->second->EventsQueue.Send(ev->Release().Release(), it->second->Generation);
}
void TRowDispatcher::Handle(NFq::TEvRowDispatcher::TEvSessionError::TPtr& ev) {
- ConsumerSessionKey key{ev->Get()->ReadActorId, ev->Get()->Record.GetPartitionId()};
- auto it = Consumers.find(key);
+ auto it = Consumers.find(ev->Get()->ReadActorId);
if (it == Consumers.end()) {
- LOG_ROW_DISPATCHER_WARN("Ignore (no consumer) TEvSessionError from " << ev->Sender << " to " << ev->Get()->ReadActorId << " part id " << ev->Get()->Record.GetPartitionId());
+ LOG_ROW_DISPATCHER_WARN("Ignore (no consumer) TEvSessionError from " << ev->Sender << " to " << ev->Get()->ReadActorId);
return;
}
- LWPROBE(SessionError, ev->Sender.ToString(), ev->Get()->ReadActorId.ToString(), ev->Get()->Record.GetPartitionId(), it->second->QueryId, it->second->Generation, ev->Get()->Record.ByteSizeLong());
+ LWPROBE(SessionError, ev->Sender.ToString(), ev->Get()->ReadActorId.ToString(), it->second->QueryId, it->second->Generation, ev->Get()->Record.ByteSizeLong());
++*Metrics.ErrorsCount;
- LOG_ROW_DISPATCHER_TRACE("Forward TEvSessionError from " << ev->Sender << " to " << ev->Get()->ReadActorId << " part id " << ev->Get()->Record.GetPartitionId() << " query id " << it->second->QueryId);
- it->second->EventsQueue.Send(ev->Release().Release(), it->second->Generation);
- DeleteConsumer(key);
-}
-
-void TRowDispatcher::Handle(NFq::TEvRowDispatcher::TEvStatistics::TPtr& ev) {
- LOG_ROW_DISPATCHER_TRACE("TEvStatistics from " << ev->Sender);
- ConsumerSessionKey key{ev->Get()->ReadActorId, ev->Get()->Record.GetPartitionId()};
- auto it = Consumers.find(key);
- if (it == Consumers.end()) {
- LOG_ROW_DISPATCHER_WARN("Ignore (no consumer) TEvStatistics from " << ev->Sender << " to " << ev->Get()->ReadActorId << " part id " << ev->Get()->Record.GetPartitionId());
- return;
- }
- LWPROBE(Statistics, ev->Sender.ToString(), ev->Get()->ReadActorId.ToString(), ev->Get()->Record.GetPartitionId(), it->second->QueryId, it->second->Generation, ev->Get()->Record.ByteSizeLong());
- LOG_ROW_DISPATCHER_TRACE("Forward TEvStatus from " << ev->Sender << " to " << ev->Get()->ReadActorId << " part id " << ev->Get()->Record.GetPartitionId() << " query id " << it->second->QueryId);
+ LOG_ROW_DISPATCHER_TRACE("Forward TEvSessionError from " << ev->Sender << " to " << ev->Get()->ReadActorId << " query id " << it->second->QueryId);
it->second->EventsQueue.Send(ev->Release().Release(), it->second->Generation);
+ DeleteConsumer(it->first); // the event was released from ev on the line above, so ev->Get()->ReadActorId must not be read here; it->first is the same read actor id and is passed by value
}
void TRowDispatcher::Handle(NFq::TEvPrivate::TEvUpdateMetrics::TPtr&) {
@@ -1038,12 +1015,35 @@ void TRowDispatcher::Handle(NFq::TEvPrivate::TEvPrintStateToLog::TPtr&) {
void TRowDispatcher::PrintStateToLog() {
auto str = GetInternalState();
auto buf = TStringBuf(str);
- i64 size = buf.size();
- ui64 offset = 0;
- while (size > 0) {
+ for (ui64 offset = 0; offset < buf.size(); offset += PrintStateToLogSplitSize) {
LOG_ROW_DISPATCHER_DEBUG(buf.SubString(offset, PrintStateToLogSplitSize));
- offset += PrintStateToLogSplitSize;
- size -= PrintStateToLogSplitSize;
+ }
+}
+
+void TRowDispatcher::Handle(NFq::TEvPrivate::TEvSendStatistic::TPtr&) {
+ LOG_ROW_DISPATCHER_TRACE("TEvPrivate::TEvSendStatistic");
+
+ Schedule(TDuration::Seconds(Config.GetSendStatusPeriodSec()), new NFq::TEvPrivate::TEvSendStatistic());
+ for (auto& [actorId, consumer] : Consumers) {
+ if (!NodesTracker.GetNodeConnected(actorId.NodeId())) {
+ continue; // Wait Connected to prevent retry_queue increases.
+ }
+ auto event = std::make_unique<TEvRowDispatcher::TEvStatistics>();
+ ui64 readBytes = 0;
+ for (auto& [partitionId, partition] : consumer->Partitions) {
+ if (!partition.StatisticsUpdated) {
+ continue;
+ }
+ auto* partitionsProto = event->Record.AddPartition();
+ partitionsProto->SetPartitionId(partitionId);
+ partitionsProto->SetNextMessageOffset(partition.Stat.Offset);
+ readBytes += partition.Stat.ReadBytes;
+ partition.Stat.Clear();
+ partition.StatisticsUpdated = false;
+ }
+ event->Record.SetReadBytes(readBytes);
+ LWPROBE(Statistics, consumer->ReadActorId.ToString(), consumer->QueryId, consumer->Generation, event->Record.ByteSizeLong());
+ consumer->EventsQueue.Send(event.release(), consumer->Generation);
}
}
@@ -1080,7 +1080,7 @@ void TRowDispatcher::Handle(NFq::TEvRowDispatcher::TEvSessionStatistic::TPtr& ev
stat.Common.RestartSessionByOffsets,
stat.Common.ReadEvents,
stat.Common.LastReadedOffset);
- TopicSessionKey sessionKey{key.ReadGroup, key.Endpoint, key.Database, key.TopicPath, key.PartitionId};
+ TTopicSessionKey sessionKey{key.ReadGroup, key.Endpoint, key.Database, key.TopicPath, key.PartitionId};
auto sessionsIt = TopicSessions.find(sessionKey);
if (sessionsIt == TopicSessions.end()) {
@@ -1101,6 +1101,12 @@ void TRowDispatcher::Handle(NFq::TEvRowDispatcher::TEvSessionStatistic::TPtr& ev
}
auto consumerInfoPtr = it->second;
consumerInfoPtr->Stat.Add(clientStat);
+ auto partitionIt = consumerInfoPtr->Partitions.find(key.PartitionId);
+ if (partitionIt == consumerInfoPtr->Partitions.end()) {
+ continue;
+ }
+ partitionIt->second.Stat.Add(clientStat);
+ partitionIt->second.StatisticsUpdated = true;
}
}
diff --git a/ydb/core/fq/libs/row_dispatcher/topic_session.cpp b/ydb/core/fq/libs/row_dispatcher/topic_session.cpp
index 333deacc279..5aac6c57b13 100644
--- a/ydb/core/fq/libs/row_dispatcher/topic_session.cpp
+++ b/ydb/core/fq/libs/row_dispatcher/topic_session.cpp
@@ -57,8 +57,7 @@ struct TEvPrivate {
EvBegin = EventSpaceBegin(TEvents::ES_PRIVATE),
EvPqEventsReady = EvBegin + 10,
EvCreateSession,
- EvSendStatisticToReadActor,
- EvSendStatisticToRowDispatcher,
+ EvSendStatistic,
EvReconnectSession,
EvEnd
};
@@ -67,8 +66,7 @@ struct TEvPrivate {
// Events
struct TEvPqEventsReady : public TEventLocal<TEvPqEventsReady, EvPqEventsReady> {};
struct TEvCreateSession : public TEventLocal<TEvCreateSession, EvCreateSession> {};
- struct TEvSendStatisticToRowDispatcher : public TEventLocal<TEvSendStatisticToRowDispatcher, EvSendStatisticToRowDispatcher> {};
- struct TEvSendStatisticToReadActor : public TEventLocal<TEvSendStatisticToReadActor, EvSendStatisticToReadActor> {};
+ struct TEvSendStatistic : public TEventLocal<TEvSendStatistic, EvSendStatistic> {};
struct TEvReconnectSession : public TEventLocal<TEvReconnectSession, EvReconnectSession> {};
};
@@ -96,7 +94,7 @@ private:
struct TClientsInfo : public IClientDataConsumer {
using TPtr = TIntrusivePtr<TClientsInfo>;
- TClientsInfo(TTopicSession& self, const TString& logPrefix, const ITopicFormatHandler::TSettings& handlerSettings, const NFq::TEvRowDispatcher::TEvStartSession::TPtr& ev, const NMonitoring::TDynamicCounterPtr& counters, const TString& readGroup)
+ TClientsInfo(TTopicSession& self, const TString& logPrefix, const ITopicFormatHandler::TSettings& handlerSettings, const NFq::TEvRowDispatcher::TEvStartSession::TPtr& ev, const NMonitoring::TDynamicCounterPtr& counters, const TString& readGroup, TMaybe<ui64> offset)
: Self(self)
, LogPrefix(logPrefix)
, HandlerSettings(handlerSettings)
@@ -104,9 +102,9 @@ private:
, ReadActorId(ev->Sender)
, Counters(counters)
{
- if (Settings.HasOffset()) {
- NextMessageOffset = Settings.GetOffset();
- InitialOffset = Settings.GetOffset();
+ if (offset) {
+ NextMessageOffset = *offset;
+ InitialOffset = *offset;
}
Y_UNUSED(TDuration::TryParse(Settings.GetSource().GetReconnectPeriod(), ReconnectPeriod));
auto queryGroup = Counters->GetSubgroup("query_id", ev->Get()->Record.GetQueryId());
@@ -198,7 +196,7 @@ private:
// Metrics
ui64 InitialOffset = 0;
- TStats Stat; // Send (filtered) to read_actor
+ TStats FilteredStat;
const ::NMonitoring::TDynamicCounterPtr Counters;
NMonitoring::TDynamicCounters::TCounterPtr FilteredDataRate; // filtered
NMonitoring::TDynamicCounters::TCounterPtr RestartSessionByOffsetsByQuery;
@@ -252,8 +250,7 @@ private:
// Metrics
TInstant WaitEventStartedAt;
ui64 RestartSessionByOffsets = 0;
- TStats SessionStats;
- TStats ClientsStats;
+ TStats Statistics;
TTopicSessionMetrics Metrics;
const ::NMonitoring::TDynamicCounterPtr Counters;
const ::NMonitoring::TDynamicCounterPtr CountersRoot;
@@ -300,16 +297,16 @@ private:
void Handle(NFq::TEvPrivate::TEvPqEventsReady::TPtr&);
void Handle(NFq::TEvPrivate::TEvCreateSession::TPtr&);
void Handle(NFq::TEvPrivate::TEvReconnectSession::TPtr&);
- void Handle(NFq::TEvPrivate::TEvSendStatisticToReadActor::TPtr&);
- void Handle(NFq::TEvPrivate::TEvSendStatisticToRowDispatcher::TPtr&);
+ void Handle(NFq::TEvPrivate::TEvSendStatistic::TPtr&);
void Handle(TEvRowDispatcher::TEvGetNextBatch::TPtr&);
void Handle(NFq::TEvRowDispatcher::TEvStopSession::TPtr& ev);
void Handle(NFq::TEvRowDispatcher::TEvStartSession::TPtr& ev);
void HandleException(const std::exception& err);
- void SendStatisticToRowDispatcher();
- void SendSessionError(TActorId readActorId, TStatus status);
+ void SendStatistics();
bool CheckNewClient(NFq::TEvRowDispatcher::TEvStartSession::TPtr& ev);
+ TMaybe<ui64> GetOffset(const NFq::NRowDispatcherProto::TEvStartSession& settings);
+ void SendSessionError(TActorId readActorId, TStatus status);
void RestartSessionIfOldestClient(const TClientsInfo& info);
private:
@@ -317,8 +314,7 @@ private:
STRICT_STFUNC_EXC(StateFunc,
hFunc(NFq::TEvPrivate::TEvPqEventsReady, Handle);
hFunc(NFq::TEvPrivate::TEvCreateSession, Handle);
- hFunc(NFq::TEvPrivate::TEvSendStatisticToReadActor, Handle);
- hFunc(NFq::TEvPrivate::TEvSendStatisticToRowDispatcher, Handle);
+ hFunc(NFq::TEvPrivate::TEvSendStatistic, Handle);
hFunc(NFq::TEvPrivate::TEvReconnectSession, Handle);
hFunc(TEvRowDispatcher::TEvGetNextBatch, Handle);
hFunc(NFq::TEvRowDispatcher::TEvStartSession, Handle);
@@ -331,11 +327,10 @@ private:
cFunc(TEvents::TEvPoisonPill::EventType, PassAway);
IgnoreFunc(NFq::TEvPrivate::TEvPqEventsReady);
IgnoreFunc(NFq::TEvPrivate::TEvCreateSession);
- IgnoreFunc(NFq::TEvPrivate::TEvSendStatisticToReadActor);
IgnoreFunc(TEvRowDispatcher::TEvGetNextBatch);
IgnoreFunc(NFq::TEvRowDispatcher::TEvStartSession);
IgnoreFunc(NFq::TEvRowDispatcher::TEvStopSession);
- IgnoreFunc(NFq::TEvPrivate::TEvSendStatisticToRowDispatcher);,
+ IgnoreFunc(NFq::TEvPrivate::TEvSendStatistic);,
ExceptionFunc(std::exception, HandleException)
)
};
@@ -380,8 +375,7 @@ void TTopicSession::Bootstrap() {
LOG_ROW_DISPATCHER_DEBUG("Bootstrap " << TopicPathPartition
<< ", Timeout " << Config.GetTimeoutBeforeStartSessionSec() << " sec, StatusPeriod " << Config.GetSendStatusPeriodSec() << " sec");
Y_ENSURE(Config.GetSendStatusPeriodSec() > 0);
- Schedule(TDuration::Seconds(Config.GetSendStatusPeriodSec()), new NFq::TEvPrivate::TEvSendStatisticToReadActor());
- Schedule(TDuration::Seconds(SendStatisticPeriodSec), new NFq::TEvPrivate::TEvSendStatisticToRowDispatcher());
+ Schedule(TDuration::Seconds(SendStatisticPeriodSec), new NFq::TEvPrivate::TEvSendStatistic());
}
void TTopicSession::PassAway() {
@@ -497,27 +491,6 @@ void TTopicSession::Handle(NFq::TEvPrivate::TEvCreateSession::TPtr&) {
CreateTopicSession();
}
-void TTopicSession::Handle(NFq::TEvPrivate::TEvSendStatisticToReadActor::TPtr&) {
- LOG_ROW_DISPATCHER_TRACE("TEvSendStatisticToReadActor");
- Schedule(TDuration::Seconds(Config.GetSendStatusPeriodSec()), new NFq::TEvPrivate::TEvSendStatisticToReadActor());
-
- auto readBytes = ClientsStats.Bytes;
- for (auto& [actorId, infoPtr] : Clients) {
- auto& info = *infoPtr;
- if (!info.ProcessedNextMessageOffset) {
- continue;
- }
- auto event = std::make_unique<TEvRowDispatcher::TEvStatistics>();
- event->Record.SetPartitionId(PartitionId);
- event->Record.SetNextMessageOffset(*info.ProcessedNextMessageOffset);
- event->Record.SetReadBytes(readBytes);
- event->ReadActorId = info.ReadActorId;
- LOG_ROW_DISPATCHER_TRACE("Send status to " << info.ReadActorId << ", offset " << info.ProcessedNextMessageOffset);
- Send(RowDispatcherActorId, event.release());
- }
- ClientsStats.Clear();
-}
-
void TTopicSession::Handle(NFq::TEvPrivate::TEvReconnectSession::TPtr&) {
Metrics.ReconnectRate->Inc();
TInstant minTime = GetMinStartingMessageTimestamp();
@@ -581,8 +554,7 @@ void TTopicSession::TTopicEventProcessor::operator()(NYdb::NTopic::TReadSessionE
Self.LastMessageOffset = message.GetOffset();
}
- Self.SessionStats.Add(dataSize, event.GetMessages().size());
- Self.ClientsStats.Add(dataSize, event.GetMessages().size());
+ Self.Statistics.Add(dataSize, event.GetMessages().size());
Self.Metrics.SessionDataRate->Add(dataSize);
Self.Metrics.AllSessionsDataRate->Add(dataSize);
DataReceivedEventSize += dataSize;
@@ -693,15 +665,16 @@ void TTopicSession::SendData(TClientsInfo& info) {
info.UnreadRows = 0;
info.UnreadBytes = 0;
- info.Stat.Add(dataSize, eventsSize);
+ info.FilteredStat.Add(dataSize, eventsSize);
info.FilteredDataRate->Add(dataSize);
info.ProcessedNextMessageOffset = *info.NextMessageOffset;
}
void TTopicSession::StartClientSession(TClientsInfo& info) {
if (ReadSession) {
- if (info.Settings.HasOffset() && info.Settings.GetOffset() <= LastMessageOffset) {
- LOG_ROW_DISPATCHER_INFO("New client has less offset (" << info.Settings.GetOffset() << ") than the last message (" << LastMessageOffset << "), stop (restart) topic session");
+ auto offset = GetOffset(info.Settings);
+ if (offset && offset <= LastMessageOffset) {
+ LOG_ROW_DISPATCHER_INFO("New client has less offset (" << offset << ") than the last message (" << LastMessageOffset << "), stop (restart) topic session");
Metrics.RestartSessionByOffsets->Inc();
++RestartSessionByOffsets;
info.RestartSessionByOffsetsByQuery->Inc();
@@ -715,8 +688,9 @@ void TTopicSession::StartClientSession(TClientsInfo& info) {
}
void TTopicSession::Handle(NFq::TEvRowDispatcher::TEvStartSession::TPtr& ev) {
+ auto offset = GetOffset(ev->Get()->Record);
const auto& source = ev->Get()->Record.GetSource();
- LOG_ROW_DISPATCHER_INFO("New client: read actor id " << ev->Sender.ToString() << ", predicate: '" << source.GetPredicate() << "', offset: " << ev->Get()->Record.GetOffset());
+ LOG_ROW_DISPATCHER_INFO("New client: read actor id " << ev->Sender.ToString() << ", predicate: " << source.GetPredicate() << ", offset: " << offset);
if (!CheckNewClient(ev)) {
return;
@@ -725,7 +699,7 @@ void TTopicSession::Handle(NFq::TEvRowDispatcher::TEvStartSession::TPtr& ev) {
const TString& format = source.GetFormat();
ITopicFormatHandler::TSettings handlerSettings = {.ParsingFormat = format ? format : "raw"};
- auto clientInfo = Clients.insert({ev->Sender, MakeIntrusive<TClientsInfo>(*this, LogPrefix, handlerSettings, ev, Counters, ReadGroup)}).first->second;
+ auto clientInfo = Clients.insert({ev->Sender, MakeIntrusive<TClientsInfo>(*this, LogPrefix, handlerSettings, ev, Counters, ReadGroup, offset)}).first->second;
auto formatIt = FormatHandlers.find(handlerSettings);
if (formatIt == FormatHandlers.end()) {
formatIt = FormatHandlers.insert({handlerSettings, CreateTopicFormatHandler(
@@ -742,12 +716,11 @@ void TTopicSession::Handle(NFq::TEvRowDispatcher::TEvStartSession::TPtr& ev) {
}
ConsumerName = source.GetConsumerName();
- SendStatisticToRowDispatcher();
+ SendStatistics();
}
void TTopicSession::Handle(NFq::TEvRowDispatcher::TEvStopSession::TPtr& ev) {
- LOG_ROW_DISPATCHER_DEBUG("TEvStopSession from " << ev->Sender << " topicPath " << ev->Get()->Record.GetSource().GetTopicPath() <<
- " partitionId " << ev->Get()->Record.GetPartitionId() << " clients count " << Clients.size());
+ LOG_ROW_DISPATCHER_DEBUG("TEvStopSession from " << ev->Sender << " topicPath " << ev->Get()->Record.GetSource().GetTopicPath() << " clients count " << Clients.size());
auto it = Clients.find(ev->Sender);
if (it == Clients.end()) {
@@ -824,7 +797,6 @@ void TTopicSession::SendSessionError(TActorId readActorId, TStatus status) {
auto event = std::make_unique<TEvRowDispatcher::TEvSessionError>();
event->Record.SetStatusCode(status.GetStatus());
NYql::IssuesToMessage(status.GetErrorDescription(), event->Record.MutableIssues());
- event->Record.SetPartitionId(PartitionId);
event->ReadActorId = readActorId;
Send(RowDispatcherActorId, event.release());
}
@@ -858,15 +830,15 @@ void TTopicSession::HandleException(const std::exception& e) {
FatalError(TStatus::Fail(EStatusId::INTERNAL_ERROR, TStringBuilder() << "Session error, got unexpected exception: " << e.what()));
}
-void TTopicSession::SendStatisticToRowDispatcher() {
+void TTopicSession::SendStatistics() {
+ LOG_ROW_DISPATCHER_TRACE("SendStatistics");
TTopicSessionStatistic sessionStatistic;
auto& commonStatistic = sessionStatistic.Common;
commonStatistic.UnreadBytes = UnreadBytes;
commonStatistic.RestartSessionByOffsets = RestartSessionByOffsets;
- commonStatistic.ReadBytes = SessionStats.Bytes;
- commonStatistic.ReadEvents = SessionStats.Events;
+ commonStatistic.ReadBytes = Statistics.Bytes;
+ commonStatistic.ReadEvents = Statistics.Events;
commonStatistic.LastReadedOffset = LastMessageOffset;
- SessionStats.Clear();
sessionStatistic.SessionKey = TTopicSessionParams{ReadGroup, Endpoint, Database, TopicPath, PartitionId};
sessionStatistic.Clients.reserve(Clients.size());
@@ -877,12 +849,13 @@ void TTopicSession::SendStatisticToRowDispatcher() {
clientStatistic.ReadActorId = readActorId;
clientStatistic.UnreadRows = info.UnreadRows;
clientStatistic.UnreadBytes = info.UnreadBytes;
- clientStatistic.Offset = info.NextMessageOffset.GetOrElse(0);
- clientStatistic.ReadBytes = info.Stat.Bytes;
+ clientStatistic.Offset = info.ProcessedNextMessageOffset.GetOrElse(0);
+ clientStatistic.FilteredReadBytes = info.FilteredStat.Bytes;
+ clientStatistic.ReadBytes = Statistics.Bytes;
clientStatistic.IsWaiting = LastMessageOffset + 1 < info.NextMessageOffset.GetOrElse(0);
clientStatistic.ReadLagMessages = info.NextMessageOffset.GetOrElse(0) - LastMessageOffset - 1;
clientStatistic.InitialOffset = info.InitialOffset;
- info.Stat.Clear();
+ info.FilteredStat.Clear();
sessionStatistic.Clients.emplace_back(std::move(clientStatistic));
}
@@ -893,11 +866,12 @@ void TTopicSession::SendStatisticToRowDispatcher() {
auto event = std::make_unique<TEvRowDispatcher::TEvSessionStatistic>(sessionStatistic);
Send(RowDispatcherActorId, event.release());
+ Statistics.Clear();
}
-void TTopicSession::Handle(NFq::TEvPrivate::TEvSendStatisticToRowDispatcher::TPtr&) {
- Schedule(TDuration::Seconds(SendStatisticPeriodSec), new NFq::TEvPrivate::TEvSendStatisticToRowDispatcher());
- SendStatisticToRowDispatcher();
+void TTopicSession::Handle(NFq::TEvPrivate::TEvSendStatistic::TPtr&) {
+ SendStatistics();
+ Schedule(TDuration::Seconds(SendStatisticPeriodSec), new NFq::TEvPrivate::TEvSendStatistic());
}
bool TTopicSession::CheckNewClient(NFq::TEvRowDispatcher::TEvStartSession::TPtr& ev) {
@@ -918,6 +892,16 @@ bool TTopicSession::CheckNewClient(NFq::TEvRowDispatcher::TEvStartSession::TPtr&
return true;
}
+TMaybe<ui64> TTopicSession::GetOffset(const NFq::NRowDispatcherProto::TEvStartSession& settings) { // returns the offset listed for this session's PartitionId, or Nothing() when none is listed
+ for (const auto& p : settings.GetOffsets()) { // bind by reference so each offsets entry is not copied per iteration
+ if (p.GetPartitionId() != PartitionId) {
+ continue;
+ }
+ return p.GetOffset();
+ }
+ return Nothing();
+}
+
} // anonymous namespace
////////////////////////////////////////////////////////////////////////////////
diff --git a/ydb/core/fq/libs/row_dispatcher/ut/row_dispatcher_ut.cpp b/ydb/core/fq/libs/row_dispatcher/ut/row_dispatcher_ut.cpp
index fcec8b0d673..340195f7328 100644
--- a/ydb/core/fq/libs/row_dispatcher/ut/row_dispatcher_ut.cpp
+++ b/ydb/core/fq/libs/row_dispatcher/ut/row_dispatcher_ut.cpp
@@ -137,29 +137,27 @@ public:
return settings;
}
- void MockAddSession(const NYql::NPq::NProto::TDqPqTopicSource& source, ui64 partitionId, TActorId readActorId, ui64 generation = 1) {
+ void MockAddSession(const NYql::NPq::NProto::TDqPqTopicSource& source, const std::set<ui32>& partitionIds, TActorId readActorId, ui64 generation = 1) {
auto event = new NFq::TEvRowDispatcher::TEvStartSession(
source,
- partitionId, // partitionId
+ partitionIds,
"Token",
- Nothing(), // readOffset,
+ {}, // readOffset,
0, // StartingMessageTimestamp;
"QueryId");
event->Record.MutableTransportMeta()->SetSeqNo(1);
Runtime.Send(new IEventHandle(RowDispatcher, readActorId, event, 0, generation));
}
- void MockStopSession(const NYql::NPq::NProto::TDqPqTopicSource& source, ui64 partitionId, TActorId readActorId) {
+ void MockStopSession(const NYql::NPq::NProto::TDqPqTopicSource& source, TActorId readActorId) {
auto event = std::make_unique<NFq::TEvRowDispatcher::TEvStopSession>();
event->Record.MutableSource()->CopyFrom(source);
- event->Record.SetPartitionId(partitionId);
event->Record.MutableTransportMeta()->SetSeqNo(1);
Runtime.Send(new IEventHandle(RowDispatcher, readActorId, event.release(), 0, 1));
}
- void MockNoSession(ui64 partitionId, TActorId readActorId) {
+ void MockNoSession(TActorId readActorId) {
auto event = std::make_unique<NFq::TEvRowDispatcher::TEvNoSession>();
- event->Record.SetPartitionId(partitionId);
Runtime.Send(new IEventHandle(RowDispatcher, readActorId, event.release(), 0, 1));
}
@@ -177,9 +175,8 @@ public:
Runtime.Send(new IEventHandle(RowDispatcher, topicSessionId, event.release(), 0, generation));
}
- void MockSessionError(ui64 partitionId, TActorId topicSessionId, TActorId readActorId) {
+ void MockSessionError(TActorId topicSessionId, TActorId readActorId) {
auto event = std::make_unique<NFq::TEvRowDispatcher::TEvSessionError>();
- event->Record.SetPartitionId(partitionId);
event->ReadActorId = readActorId;
Runtime.Send(new IEventHandle(RowDispatcher, topicSessionId, event.release()));
}
@@ -201,10 +198,9 @@ public:
UNIT_ASSERT(eventHolder.Get() != nullptr);
}
- void ExpectStopSession(NActors::TActorId actorId, ui64 partitionId) {
+ void ExpectStopSession(NActors::TActorId actorId) {
auto eventHolder = Runtime.GrabEdgeEvent<NFq::TEvRowDispatcher::TEvStopSession>(actorId);
UNIT_ASSERT(eventHolder.Get() != nullptr);
- UNIT_ASSERT(eventHolder->Get()->Record.GetPartitionId() == partitionId);
}
void ExpectGetNextBatch(NActors::TActorId topicSessionId, ui64 partitionId) {
@@ -230,10 +226,9 @@ public:
UNIT_ASSERT(eventHolder.Get() != nullptr);
}
- void ExpectSessionError(NActors::TActorId readActorId, ui64 partitionId) {
+ void ExpectSessionError(NActors::TActorId readActorId) {
auto eventHolder = Runtime.GrabEdgeEvent<NFq::TEvRowDispatcher::TEvSessionError>(readActorId);
UNIT_ASSERT(eventHolder.Get() != nullptr);
- UNIT_ASSERT(eventHolder->Get()->Record.GetPartitionId() == partitionId);
}
NActors::TActorId ExpectRegisterTopicSession() {
@@ -267,51 +262,51 @@ public:
NYql::NPq::NProto::TDqPqTopicSource Source2 = BuildPqTopicSourceSettings("Endpoint2", "Database1", "topic", "connection_id1");
NYql::NPq::NProto::TDqPqTopicSource Source1Connection2 = BuildPqTopicSourceSettings("Endpoint1", "Database1", "topic", "connection_id2");
- ui64 PartitionId0 = 0;
- ui64 PartitionId1 = 1;
+ ui32 PartitionId0 = 0;
+ ui32 PartitionId1 = 1;
};
Y_UNIT_TEST_SUITE(RowDispatcherTests) {
Y_UNIT_TEST_F(OneClientOneSession, TFixture) {
- MockAddSession(Source1, PartitionId0, ReadActorId1);
+ MockAddSession(Source1, {PartitionId0}, ReadActorId1);
auto topicSessionId = ExpectRegisterTopicSession();
ExpectStartSessionAck(ReadActorId1);
ExpectStartSession(topicSessionId);
ProcessData(ReadActorId1, PartitionId0, topicSessionId);
- MockStopSession(Source1, PartitionId0, ReadActorId1);
- ExpectStopSession(topicSessionId, PartitionId0);
+ MockStopSession(Source1, ReadActorId1);
+ ExpectStopSession(topicSessionId);
}
Y_UNIT_TEST_F(TwoClientOneSession, TFixture) {
- MockAddSession(Source1, PartitionId0, ReadActorId1);
+ MockAddSession(Source1, {PartitionId0}, ReadActorId1);
auto topicSessionId = ExpectRegisterTopicSession();
ExpectStartSessionAck(ReadActorId1);
ExpectStartSession(topicSessionId);
- MockAddSession(Source1, PartitionId0, ReadActorId2);
+ MockAddSession(Source1, {PartitionId0}, ReadActorId2);
ExpectStartSessionAck(ReadActorId2);
ExpectStartSession(topicSessionId);
ProcessData(ReadActorId1, PartitionId0, topicSessionId);
ProcessData(ReadActorId2, PartitionId0, topicSessionId);
- MockSessionError(PartitionId0, topicSessionId, ReadActorId1);
- ExpectSessionError(ReadActorId1, PartitionId0);
+ MockSessionError(topicSessionId, ReadActorId1);
+ ExpectSessionError(ReadActorId1);
- MockSessionError(PartitionId0, topicSessionId, ReadActorId2);
- ExpectSessionError(ReadActorId2, PartitionId0);
+ MockSessionError(topicSessionId, ReadActorId2);
+ ExpectSessionError(ReadActorId2);
}
Y_UNIT_TEST_F(SessionError, TFixture) {
- MockAddSession(Source1, PartitionId0, ReadActorId1);
+ MockAddSession(Source1, {PartitionId0}, ReadActorId1);
auto topicSessionId = ExpectRegisterTopicSession();
ExpectStartSessionAck(ReadActorId1);
ExpectStartSession(topicSessionId);
- MockSessionError(PartitionId0, topicSessionId, ReadActorId1);
- ExpectSessionError(ReadActorId1, PartitionId0);
+ MockSessionError(topicSessionId, ReadActorId1);
+ ExpectSessionError(ReadActorId1);
}
Y_UNIT_TEST_F(CoordinatorSubscribe, TFixture) {
@@ -342,24 +337,18 @@ Y_UNIT_TEST_SUITE(RowDispatcherTests) {
Y_UNIT_TEST_F(TwoClients4Sessions, TFixture) {
- MockAddSession(Source1, PartitionId0, ReadActorId1);
+ MockAddSession(Source1, {PartitionId0, PartitionId1}, ReadActorId1);
auto topicSession1 = ExpectRegisterTopicSession();
- ExpectStartSessionAck(ReadActorId1);
- ExpectStartSession(topicSession1);
-
- MockAddSession(Source1, PartitionId1, ReadActorId1);
auto topicSession2 = ExpectRegisterTopicSession();
ExpectStartSessionAck(ReadActorId1);
+ ExpectStartSession(topicSession1);
ExpectStartSession(topicSession2);
- MockAddSession(Source2, PartitionId0, ReadActorId2);
+ MockAddSession(Source2, {PartitionId0, PartitionId1}, ReadActorId2);
auto topicSession3 = ExpectRegisterTopicSession();
- ExpectStartSessionAck(ReadActorId2);
- ExpectStartSession(topicSession3);
-
- MockAddSession(Source2, PartitionId1, ReadActorId2);
auto topicSession4 = ExpectRegisterTopicSession();
ExpectStartSessionAck(ReadActorId2);
+ ExpectStartSession(topicSession3);
ExpectStartSession(topicSession4);
ProcessData(ReadActorId1, PartitionId0, topicSession1);
@@ -367,69 +356,75 @@ Y_UNIT_TEST_SUITE(RowDispatcherTests) {
ProcessData(ReadActorId2, PartitionId0, topicSession3);
ProcessData(ReadActorId2, PartitionId1, topicSession4);
- MockSessionError(PartitionId0, topicSession1, ReadActorId1);
- ExpectSessionError(ReadActorId1, PartitionId0);
+ MockSessionError(topicSession1, ReadActorId1);
+ ExpectSessionError(ReadActorId1);
ProcessData(ReadActorId1, PartitionId1, topicSession2);
ProcessData(ReadActorId2, PartitionId0, topicSession3);
ProcessData(ReadActorId2, PartitionId1, topicSession4);
- MockStopSession(Source1, PartitionId1, ReadActorId1);
- ExpectStopSession(topicSession2, PartitionId1);
+ MockStopSession(Source1, ReadActorId1);
+ ExpectStopSession(topicSession2);
- MockStopSession(Source2, PartitionId0, ReadActorId2);
- ExpectStopSession(topicSession3, PartitionId0);
+ MockStopSession(Source2, ReadActorId2);
+ ExpectStopSession(topicSession3);
- MockStopSession(Source2, PartitionId1, ReadActorId2);
- ExpectStopSession(topicSession4, PartitionId1);
+ MockStopSession(Source2, ReadActorId2);
+ ExpectStopSession(topicSession4);
// Ignore data after StopSession
MockMessageBatch(PartitionId1, topicSession4, ReadActorId2, 1);
}
Y_UNIT_TEST_F(ReinitConsumerIfNewGeneration, TFixture) {
- MockAddSession(Source1, PartitionId0, ReadActorId1, 1);
+ MockAddSession(Source1, {PartitionId0}, ReadActorId1, 1);
auto topicSessionId = ExpectRegisterTopicSession();
ExpectStartSessionAck(ReadActorId1);
ExpectStartSession(topicSessionId);
ProcessData(ReadActorId1, PartitionId0, topicSessionId);
// ignore StartSession with same generation
- MockAddSession(Source1, PartitionId0, ReadActorId1, 1);
+ MockAddSession(Source1, {PartitionId0}, ReadActorId1, 1);
// reinit consumer
- MockAddSession(Source1, PartitionId0, ReadActorId1, 2);
+ MockAddSession(Source1, {PartitionId0}, ReadActorId1, 2);
ExpectStartSessionAck(ReadActorId1, 2);
}
Y_UNIT_TEST_F(HandleTEvUndelivered, TFixture) {
- MockAddSession(Source1, PartitionId0, ReadActorId1, 1);
+ MockAddSession(Source1, {PartitionId0, PartitionId1}, ReadActorId1, 1);
auto topicSession1 = ExpectRegisterTopicSession();
+ auto topicSession2 = ExpectRegisterTopicSession();
ExpectStartSessionAck(ReadActorId1, 1);
ExpectStartSession(topicSession1);
+ ExpectStartSession(topicSession2);
- MockAddSession(Source1, PartitionId1, ReadActorId1, 2);
- auto topicSession2 = ExpectRegisterTopicSession();
- ExpectStartSessionAck(ReadActorId1, 2);
+ MockAddSession(Source1, {PartitionId0, PartitionId1}, ReadActorId2, 1);
+ ExpectStartSessionAck(ReadActorId2, 1);
+ ExpectStartSession(topicSession1);
ExpectStartSession(topicSession2);
ProcessData(ReadActorId1, PartitionId0, topicSession1, 1);
- ProcessData(ReadActorId1, PartitionId1, topicSession2, 2);
-
- MockUndelivered(ReadActorId1, 2);
- ExpectStopSession(topicSession2, PartitionId1);
+ ProcessData(ReadActorId1, PartitionId1, topicSession2, 1);
+ ProcessData(ReadActorId2, PartitionId0, topicSession1, 1);
+ ProcessData(ReadActorId2, PartitionId1, topicSession2, 1);
MockUndelivered(ReadActorId1, 1);
- ExpectStopSession(topicSession1, PartitionId0);
+ ExpectStopSession(topicSession1);
+ ExpectStopSession(topicSession2);
+
+ MockUndelivered(ReadActorId2, 1);
+ ExpectStopSession(topicSession1);
+ ExpectStopSession(topicSession2);
}
Y_UNIT_TEST_F(TwoClientTwoConnection, TFixture) {
- MockAddSession(Source1, PartitionId0, ReadActorId1);
+ MockAddSession(Source1, {PartitionId0}, ReadActorId1);
auto session1 = ExpectRegisterTopicSession();
ExpectStartSessionAck(ReadActorId1);
ExpectStartSession(session1);
- MockAddSession(Source1Connection2, PartitionId0, ReadActorId2);
+ MockAddSession(Source1Connection2, {PartitionId0}, ReadActorId2);
auto session2 = ExpectRegisterTopicSession();
ExpectStartSessionAck(ReadActorId2);
ExpectStartSession(session2);
@@ -437,22 +432,22 @@ Y_UNIT_TEST_SUITE(RowDispatcherTests) {
ProcessData(ReadActorId1, PartitionId0, session1);
ProcessData(ReadActorId2, PartitionId0, session2);
- MockStopSession(Source1, PartitionId0, ReadActorId1);
- ExpectStopSession(session1, PartitionId0);
+ MockStopSession(Source1, ReadActorId1);
+ ExpectStopSession(session1);
- MockStopSession(Source1Connection2, PartitionId0, ReadActorId2);
- ExpectStopSession(session2, PartitionId0);
+ MockStopSession(Source1Connection2, ReadActorId2);
+ ExpectStopSession(session2);
}
Y_UNIT_TEST_F(ProcessNoSession, TFixture) {
- MockAddSession(Source1, PartitionId0, ReadActorId3);
+ MockAddSession(Source1, {PartitionId0}, ReadActorId3);
auto topicSessionId = ExpectRegisterTopicSession();
ExpectStartSessionAck(ReadActorId3);
ExpectStartSession(topicSessionId);
ProcessData(ReadActorId3, PartitionId0, topicSessionId);
- MockNoSession(PartitionId0, ReadActorId3);
- ExpectStopSession(topicSessionId, PartitionId0);
+ MockNoSession(ReadActorId3);
+ ExpectStopSession(topicSessionId);
}
}
diff --git a/ydb/core/fq/libs/row_dispatcher/ut/topic_session_ut.cpp b/ydb/core/fq/libs/row_dispatcher/ut/topic_session_ut.cpp
index 379fe1528fb..75af42af21b 100644
--- a/ydb/core/fq/libs/row_dispatcher/ut/topic_session_ut.cpp
+++ b/ydb/core/fq/libs/row_dispatcher/ut/topic_session_ut.cpp
@@ -88,11 +88,15 @@ public:
}
void StartSession(TActorId readActorId, const NYql::NPq::NProto::TDqPqTopicSource& source, TMaybe<ui64> readOffset = Nothing(), bool expectedError = false) {
+ std::map<ui32, ui64> readOffsets;
+ if (readOffset) {
+ readOffsets[PartitionId] = *readOffset;
+ }
auto event = new NFq::TEvRowDispatcher::TEvStartSession(
source,
- PartitionId,
+ {PartitionId},
"Token",
- readOffset, // readOffset,
+ readOffsets,
0, // StartingMessageTimestamp;
"QueryId");
Runtime.Send(new IEventHandle(TopicSession, readActorId, event));
@@ -131,7 +135,6 @@ public:
void StopSession(NActors::TActorId readActorId, const NYql::NPq::NProto::TDqPqTopicSource& source) {
auto event = std::make_unique<NFq::TEvRowDispatcher::TEvStopSession>();
*event->Record.MutableSource() = source;
- event->Record.SetPartitionId(PartitionId);
Runtime.Send(new IEventHandle(TopicSession, readActorId, event.release()));
}
@@ -183,15 +186,30 @@ public:
return numberMessages;
}
- void ExpectStatisticToReadActor(TSet<NActors::TActorId> readActorIds, ui64 expectedNextMessageOffset) {
- size_t count = readActorIds.size();
- for (size_t i = 0; i < count; ++i) {
- auto eventHolder = Runtime.GrabEdgeEvent<TEvRowDispatcher::TEvStatistics>(RowDispatcherActorId, TDuration::Seconds(GrabTimeoutSec));
+ void ExpectStatistics(TMap<NActors::TActorId, ui64> clients) {
+ auto check = [&]() -> bool {
+ auto eventHolder = Runtime.GrabEdgeEvent<TEvRowDispatcher::TEvSessionStatistic>(RowDispatcherActorId, TDuration::Seconds(GrabTimeoutSec));
UNIT_ASSERT(eventHolder.Get() != nullptr);
- UNIT_ASSERT(readActorIds.contains(eventHolder->Get()->ReadActorId));
- readActorIds.erase(eventHolder->Get()->ReadActorId);
- UNIT_ASSERT_VALUES_EQUAL(eventHolder->Get()->Record.GetNextMessageOffset(), expectedNextMessageOffset);
+ if (clients.size() != eventHolder->Get()->Stat.Clients.size()) {
+ return false;
+ }
+ for (const auto& client : eventHolder->Get()->Stat.Clients) {
+ if (!clients.contains(client.ReadActorId)) {
+ return false;
+ }
+ if (clients[client.ReadActorId] != client.Offset) {
+ return false;
+ }
+ }
+ return true;
+ };
+ auto start = TInstant::Now();
+ while (TInstant::Now() - start < TDuration::Seconds(5)) {
+ if (check()) {
+ return;
+ }
}
+ UNIT_FAIL("ExpectStatistics timeout");
}
static TRow JsonMessage(ui64 index) {
@@ -250,7 +268,7 @@ public:
NActors::TActorId ReadActorId1;
NActors::TActorId ReadActorId2;
NActors::TActorId ReadActorId3;
- ui64 PartitionId = 0;
+ ui32 PartitionId = 0;
NConfig::TRowDispatcherConfig Config;
TIntrusivePtr<IMockPqGateway> MockPqGateway;
@@ -273,22 +291,26 @@ Y_UNIT_TEST_SUITE(TopicSessionTests) {
Init(topicName);
auto source = BuildSource();
StartSession(ReadActorId1, source);
+ ExpectStatistics({{ReadActorId1, 0}});
StartSession(ReadActorId2, source);
+ ExpectStatistics({{ReadActorId1, 0}, {ReadActorId2, 0}});
std::vector<TString> data = { Json1 };
PQWrite(data);
ExpectNewDataArrived({ReadActorId1, ReadActorId2});
+
ExpectMessageBatch(ReadActorId1, { JsonMessage(1) });
ExpectMessageBatch(ReadActorId2, { JsonMessage(1) });
- ExpectStatisticToReadActor({ReadActorId1, ReadActorId2}, 1);
+ ExpectStatistics({{ReadActorId1, 1}, {ReadActorId2, 1}});
data = { Json2 };
PQWrite(data);
ExpectNewDataArrived({ReadActorId1, ReadActorId2});
- ExpectStatisticToReadActor({ReadActorId1, ReadActorId2}, 1);
+
+ ExpectStatistics({{ReadActorId1, 1}, {ReadActorId2, 1}});
ExpectMessageBatch(ReadActorId1, { JsonMessage(2) });
ExpectMessageBatch(ReadActorId2, { JsonMessage(2) });
- ExpectStatisticToReadActor({ReadActorId1, ReadActorId2}, 2);
+ ExpectStatistics({{ReadActorId1, 2}, {ReadActorId2, 2}});
auto source2 = BuildSource(false, "OtherConsumer");
StartSession(ReadActorId3, source2, Nothing(), true);
diff --git a/ydb/library/yql/providers/pq/async_io/dq_pq_rd_read_actor.cpp b/ydb/library/yql/providers/pq/async_io/dq_pq_rd_read_actor.cpp
index 4b39715b2fe..6f0a1b9f628 100644
--- a/ydb/library/yql/providers/pq/async_io/dq_pq_rd_read_actor.cpp
+++ b/ydb/library/yql/providers/pq/async_io/dq_pq_rd_read_actor.cpp
@@ -83,6 +83,7 @@ struct TRowDispatcherReadActorMetrics {
auto task = source->GetSubgroup("task_id", ToString(taskId));
InFlyGetNextBatch = task->GetCounter("InFlyGetNextBatch");
InFlyAsyncInputData = task->GetCounter("InFlyAsyncInputData");
+ ReInit = task->GetCounter("ReInit", true);
}
~TRowDispatcherReadActorMetrics() {
@@ -94,6 +95,7 @@ struct TRowDispatcherReadActorMetrics {
::NMonitoring::TDynamicCounterPtr SubGroup;
::NMonitoring::TDynamicCounters::TCounterPtr InFlyGetNextBatch;
::NMonitoring::TDynamicCounters::TCounterPtr InFlyAsyncInputData;
+ ::NMonitoring::TDynamicCounters::TCounterPtr ReInit;
};
struct TEvPrivate {
@@ -111,9 +113,7 @@ struct TEvPrivate {
class TDqPqRdReadActor : public NActors::TActor<TDqPqRdReadActor>, public NYql::NDq::NInternal::TDqPqReadActorBase {
const ui64 PrintStatePeriodSec = 300;
- const ui64 ProcessStatePeriodSec = 2;
-
- using TDebugOffsets = TMaybe<std::pair<ui64, ui64>>;
+ const ui64 SleepPeriodSec = 2;
struct TReadyBatch {
public:
@@ -164,53 +164,68 @@ private:
NActors::TActorId LocalRowDispatcherActorId;
std::queue<TReadyBatch> ReadyBuffer;
EState State = EState::INIT;
+ bool Inited = false;
ui64 CoordinatorRequestCookie = 0;
TRowDispatcherReadActorMetrics Metrics;
- bool SchedulePrintStatePeriod = false;
bool ProcessStateScheduled = false;
bool InFlyAsyncInputData = false;
TCounters Counters;
-
// Parsing info
std::vector<std::optional<ui64>> ColumnIndexes; // Output column index in schema passed into RowDispatcher
const TType* InputDataType = nullptr; // Multi type (comes from Row Dispatcher)
std::unique_ptr<NKikimr::NMiniKQL::TValuePackerTransport<true>> DataUnpacker;
- struct SessionInfo {
+ THashMap<ui32, TMaybe<ui64>> NextOffsetFromRD;
+
+ struct TPartition {
+ bool HasPendingData = false;
+ bool IsWaitingMessageBatch = false;
+ };
+
+ struct TSession {
enum class ESessionStatus {
- NoSession,
- Started,
+ INIT,
+ WAIT_START_SESSION_ACK,
+ STARTED,
};
- SessionInfo(
+
+ TSession(
const TTxId& txId,
const NActors::TActorId selfId,
TActorId rowDispatcherActorId,
- ui64 partitionId,
ui64 eventQueueId,
ui64 generation)
- : RowDispatcherActorId(rowDispatcherActorId)
- , PartitionId(partitionId)
- , Generation(generation) {
- EventsQueue.Init(txId, selfId, selfId, eventQueueId, /* KeepAlive */ true);
+ : TxId(txId)
+ , SelfId(selfId)
+ , EventQueueId(eventQueueId)
+ , RowDispatcherActorId(rowDispatcherActorId)
+ , Generation(generation)
+ {
+ EventsQueue.Init(TxId, SelfId, SelfId, EventQueueId, /* KeepAlive */ true);
EventsQueue.OnNewRecipientId(rowDispatcherActorId);
}
- ESessionStatus Status = ESessionStatus::NoSession;
- ui64 NextOffset = 0;
- bool IsWaitingStartSessionAck = false;
+ ESessionStatus Status = ESessionStatus::INIT;
+ const TTxId TxId;
+ const NActors::TActorId SelfId;
+ const ui64 EventQueueId;
NYql::NDq::TRetryEventsQueue EventsQueue;
- bool HasPendingData = false;
- bool IsWaitingMessageBatch = false;
TActorId RowDispatcherActorId;
- ui64 PartitionId;
- ui64 Generation;
+ ui64 Generation = std::numeric_limits<ui64>::max();
+ THashMap<ui32, TPartition> Partitions;
+ bool IsWaitingStartSessionAck = false;
};
-
- TMap<ui64, SessionInfo> Sessions;
+
+ TMap<NActors::TActorId, TSession> Sessions;
+ THashMap<ui64, NActors::TActorId> ReadActorByEventQueueId;
const THolderFactory& HolderFactory;
const i64 MaxBufferSize;
i64 ReadyBufferSizeBytes = 0;
ui64 NextGeneration = 0;
+ ui64 NextEventQueueId = 0;
+
+ TMap<NActors::TActorId, TSet<ui32>> LastUsedPartitionDistribution;
+ TMap<NActors::TActorId, TSet<ui32>> LastReceivedPartitionDistribution;
public:
TDqPqRdReadActor(
@@ -276,16 +291,22 @@ public:
std::vector<ui64> GetPartitionsToRead() const;
void AddMessageBatch(TRope&& serializedBatch, NKikimr::NMiniKQL::TUnboxedValueBatch& buffer);
void ProcessState();
+ void StopSession(TSession& sessionInfo);
void Stop(NDqProto::StatusIds::StatusCode status, TIssues issues);
- void StopSessions();
void ReInit(const TString& reason);
void PrintInternalState();
+ void TrySendGetNextBatch(TSession& sessionInfo);
TString GetInternalState();
- void TrySendGetNextBatch(SessionInfo& sessionInfo);
template <class TEventPtr>
- bool CheckSession(SessionInfo& session, const TEventPtr& ev, ui64 partitionId);
- void SendNoSession(const NActors::TActorId& recipient, ui64 partitionId, ui64 cookie);
+ TSession* FindSession(const TEventPtr& ev);
+ void SendNoSession(const NActors::TActorId& recipient, ui64 cookie);
void NotifyCA();
+ void SendStartSession(TSession& sessionInfo);
+ void Init();
+ void Sleep();
+ void ProcessGlobalState();
+ void ProcessSessionsState();
+ void UpdateSessions();
};
TDqPqRdReadActor::TDqPqRdReadActor(
@@ -332,36 +353,48 @@ TDqPqRdReadActor::TDqPqRdReadActor(
DataUnpacker = std::make_unique<NKikimr::NMiniKQL::TValuePackerTransport<true>>(InputDataType);
IngressStats.Level = statsLevel;
- SRC_LOG_I("Start read actor, local row dispatcher " << LocalRowDispatcherActorId.ToString() << ", metadatafields: " << JoinSeq(',', SourceParams.GetMetadataFields()));
+ SRC_LOG_I("Start read actor, local row dispatcher " << LocalRowDispatcherActorId.ToString() << ", metadatafields: " << JoinSeq(',', SourceParams.GetMetadataFields())
+ << ", partitions: " << JoinSeq(',', GetPartitionsToRead()));
}
-void TDqPqRdReadActor::ProcessState() {
+void TDqPqRdReadActor::Init() {
+ if (Inited) {
+ return;
+ }
+ LogPrefix = (TStringBuilder() << "SelfId: " << SelfId() << ", TxId: " << TxId << ", task: " << TaskId << ". PQ source. ");
+
+ auto partitionToRead = GetPartitionsToRead();
+ for (auto partitionId : partitionToRead) {
+ TPartitionKey partitionKey{TString{}, partitionId};
+ const auto offsetIt = PartitionToOffset.find(partitionKey);
+ auto& nextOffset = NextOffsetFromRD[partitionId];
+ if (offsetIt != PartitionToOffset.end()) {
+ nextOffset = offsetIt->second;
+ }
+ }
+ SRC_LOG_I("Send TEvCoordinatorChangesSubscribe to local RD (" << LocalRowDispatcherActorId << ")");
+ Send(LocalRowDispatcherActorId, new NFq::TEvRowDispatcher::TEvCoordinatorChangesSubscribe());
+
+ Schedule(TDuration::Seconds(PrintStatePeriodSec), new TEvPrivate::TEvPrintState());
+ Inited = true;
+}
+
+void TDqPqRdReadActor::ProcessGlobalState() {
switch (State) {
case EState::INIT:
- LogPrefix = (TStringBuilder() << "SelfId: " << SelfId() << ", TxId: " << TxId << ", task: " << TaskId << ". PQ source. ");
-
if (!ReadyBuffer.empty()) {
return;
}
- if (!ProcessStateScheduled) {
- ProcessStateScheduled = true;
- Schedule(TDuration::Seconds(ProcessStatePeriodSec), new TEvPrivate::TEvProcessState());
- }
if (!CoordinatorActorId) {
SRC_LOG_I("Send TEvCoordinatorChangesSubscribe to local row dispatcher, self id " << SelfId());
Send(LocalRowDispatcherActorId, new NFq::TEvRowDispatcher::TEvCoordinatorChangesSubscribe());
- if (!SchedulePrintStatePeriod) {
- SchedulePrintStatePeriod = true;
- Schedule(TDuration::Seconds(PrintStatePeriodSec), new TEvPrivate::TEvPrintState());
- }
+ State = EState::WAIT_COORDINATOR_ID;
}
- State = EState::WAIT_COORDINATOR_ID;
[[fallthrough]];
case EState::WAIT_COORDINATOR_ID: {
if (!CoordinatorActorId) {
return;
}
- State = EState::WAIT_PARTITIONS_ADDRES;
auto partitionToRead = GetPartitionsToRead();
auto cookie = ++CoordinatorRequestCookie;
SRC_LOG_I("Send TEvCoordinatorRequest to coordinator " << CoordinatorActorId->ToString() << ", partIds: "
@@ -371,70 +404,94 @@ void TDqPqRdReadActor::ProcessState() {
new NFq::TEvRowDispatcher::TEvCoordinatorRequest(SourceParams, partitionToRead),
IEventHandle::FlagTrackDelivery | IEventHandle::FlagSubscribeOnSession,
cookie);
- return;
+ LastReceivedPartitionDistribution.clear();
+ State = EState::WAIT_PARTITIONS_ADDRES;
+ [[fallthrough]];
}
case EState::WAIT_PARTITIONS_ADDRES:
- if (Sessions.empty()) {
- return;
- }
-
- for (auto& [partitionId, sessionInfo] : Sessions) {
- if (sessionInfo.Status == SessionInfo::ESessionStatus::NoSession) {
- TMaybe<ui64> readOffset;
- TPartitionKey partitionKey{TString{}, partitionId};
- const auto offsetIt = PartitionToOffset.find(partitionKey);
- if (offsetIt != PartitionToOffset.end()) {
- SRC_LOG_D("ReadOffset found" );
- readOffset = offsetIt->second;
- }
-
- SRC_LOG_I("Send TEvStartSession to " << sessionInfo.RowDispatcherActorId
- << ", offset " << readOffset
- << ", partitionId " << partitionId
- << ", connection id " << sessionInfo.Generation);
-
- auto event = new NFq::TEvRowDispatcher::TEvStartSession(
- SourceParams,
- partitionId,
- Token,
- readOffset,
- StartingMessageTimestamp.MilliSeconds(),
- std::visit([](auto arg) { return ToString(arg); }, TxId));
- sessionInfo.EventsQueue.Send(event, sessionInfo.Generation);
- sessionInfo.IsWaitingStartSessionAck = true;
- sessionInfo.Status = SessionInfo::ESessionStatus::Started;
- }
+ if (LastReceivedPartitionDistribution.empty()) {
+ break;
}
+ UpdateSessions();
State = EState::STARTED;
- return;
+ [[fallthrough]];
case EState::STARTED:
- return;
+ break;
}
}
+void TDqPqRdReadActor::ProcessSessionsState() {
+ for (auto& [rowDispatcherActorId, sessionInfo] : Sessions) {
+ switch (sessionInfo.Status) {
+ case TSession::ESessionStatus::INIT:
+ SendStartSession(sessionInfo);
+ sessionInfo.IsWaitingStartSessionAck = true;
+ sessionInfo.Status = TSession::ESessionStatus::WAIT_START_SESSION_ACK;
+ [[fallthrough]];
+ case TSession::ESessionStatus::WAIT_START_SESSION_ACK:
+ if (sessionInfo.IsWaitingStartSessionAck) {
+ break;
+ }
+ sessionInfo.Status = TSession::ESessionStatus::STARTED;
+ [[fallthrough]];
+ case TSession::ESessionStatus::STARTED:
+ break;
+ }
+ }
+}
-void TDqPqRdReadActor::CommitState(const NDqProto::TCheckpoint& /*checkpoint*/) {
+void TDqPqRdReadActor::ProcessState() {
+ ProcessGlobalState();
+ ProcessSessionsState();
}
-void TDqPqRdReadActor::StopSessions() {
- SRC_LOG_I("Stop all session");
- for (auto& [partitionId, sessionInfo] : Sessions) {
- if (sessionInfo.Status == SessionInfo::ESessionStatus::NoSession) {
+void TDqPqRdReadActor::SendStartSession(TSession& sessionInfo) {
+ auto str = TStringBuilder() << "Send TEvStartSession to " << sessionInfo.RowDispatcherActorId
+ << ", connection id " << sessionInfo.Generation << " partitions offsets ";
+
+ std::set<ui32> partitions;
+ std::map<ui32, ui64> partitionOffsets;
+ for (auto& [partitionId, partition] : sessionInfo.Partitions) {
+ partitions.insert(partitionId);
+ auto nextOffset = NextOffsetFromRD[partitionId];
+ str << "(" << partitionId << " / ";
+ if (!nextOffset) {
+ str << "<empty>),";
continue;
}
- auto event = std::make_unique<NFq::TEvRowDispatcher::TEvStopSession>();
- *event->Record.MutableSource() = SourceParams;
- event->Record.SetPartitionId(partitionId);
- SRC_LOG_I("Send StopSession to " << sessionInfo.RowDispatcherActorId);
- sessionInfo.EventsQueue.Send(event.release(), sessionInfo.Generation);
+ partitionOffsets[partitionId] = *nextOffset;
+ str << nextOffset << "),";
}
+ SRC_LOG_I(str);
+
+ auto event = new NFq::TEvRowDispatcher::TEvStartSession(
+ SourceParams,
+ partitions,
+ Token,
+ partitionOffsets,
+ StartingMessageTimestamp.MilliSeconds(),
+ std::visit([](auto arg) { return ToString(arg); }, TxId));
+ sessionInfo.EventsQueue.Send(event, sessionInfo.Generation);
+}
+
+void TDqPqRdReadActor::CommitState(const NDqProto::TCheckpoint& /*checkpoint*/) {
+}
+
+void TDqPqRdReadActor::StopSession(TSession& sessionInfo) {
+ SRC_LOG_I("Send StopSession to " << sessionInfo.RowDispatcherActorId
+ << " generation " << sessionInfo.Generation);
+ auto event = std::make_unique<NFq::TEvRowDispatcher::TEvStopSession>();
+ *event->Record.MutableSource() = SourceParams;
+ sessionInfo.EventsQueue.Send(event.release(), sessionInfo.Generation);
}
// IActor & IDqComputeActorAsyncInput
void TDqPqRdReadActor::PassAway() { // Is called from Compute Actor
SRC_LOG_I("PassAway");
PrintInternalState();
- StopSessions();
+ for (auto& [rowDispatcherActorId, sessionInfo] : Sessions) {
+ StopSession(sessionInfo);
+ }
TActor<TDqPqRdReadActor>::PassAway();
// TODO: RetryQueue::Unsubscribe()
@@ -442,10 +499,10 @@ void TDqPqRdReadActor::PassAway() { // Is called from Compute Actor
i64 TDqPqRdReadActor::GetAsyncInputData(NKikimr::NMiniKQL::TUnboxedValueBatch& buffer, TMaybe<TInstant>& /*watermark*/, bool&, i64 freeSpace) {
SRC_LOG_T("GetAsyncInputData freeSpace = " << freeSpace);
+ Init();
Metrics.InFlyAsyncInputData->Set(0);
InFlyAsyncInputData = false;
- ProcessState();
if (ReadyBuffer.empty() || !freeSpace) {
return 0;
}
@@ -472,16 +529,14 @@ i64 TDqPqRdReadActor::GetAsyncInputData(NKikimr::NMiniKQL::TUnboxedValueBatch& b
if (!ReadyBuffer.empty()) {
NotifyCA();
}
- for (auto& [partitionId, sessionInfo] : Sessions) {
+ for (auto& [rowDispatcherActorId, sessionInfo] : Sessions) {
TrySendGetNextBatch(sessionInfo);
}
- ProcessState();
return usedSpace;
}
std::vector<ui64> TDqPqRdReadActor::GetPartitionsToRead() const {
std::vector<ui64> res;
-
ui64 currentPartition = ReadParams.GetPartitioningParams().GetEachTopicPartitionGroupId();
do {
res.emplace_back(currentPartition); // 0-based in topic API
@@ -492,45 +547,25 @@ std::vector<ui64> TDqPqRdReadActor::GetPartitionsToRead() const {
void TDqPqRdReadActor::Handle(NFq::TEvRowDispatcher::TEvStartSessionAck::TPtr& ev) {
const NYql::NDqProto::TMessageTransportMeta& meta = ev->Get()->Record.GetTransportMeta();
- SRC_LOG_I("TEvStartSessionAck from " << ev->Sender << ", seqNo " << meta.GetSeqNo() << ", ConfirmedSeqNo " << meta.GetConfirmedSeqNo());
+ SRC_LOG_I("Received TEvStartSessionAck from " << ev->Sender << ", seqNo " << meta.GetSeqNo() << ", ConfirmedSeqNo " << meta.GetConfirmedSeqNo() << ", generation " << ev->Cookie);
Counters.StartSessionAck++;
-
- ui64 partitionId = ev->Get()->Record.GetConsumer().GetPartitionId();
- auto sessionIt = Sessions.find(partitionId);
- if (sessionIt == Sessions.end()) {
- SRC_LOG_W("Ignore TEvStartSessionAck from " << ev->Sender << ", seqNo " << meta.GetSeqNo()
- << ", ConfirmedSeqNo " << meta.GetConfirmedSeqNo() << ", PartitionId " << partitionId << ", cookie " << ev->Cookie);
- YQL_ENSURE(State != EState::STARTED);
- SendNoSession(ev->Sender, partitionId, ev->Cookie);
+ auto* session = FindSession(ev);
+ if (!session) {
return;
}
- auto& sessionInfo = sessionIt->second;
- if (!CheckSession(sessionInfo, ev, partitionId)) {
- return;
- }
- sessionInfo.IsWaitingStartSessionAck = false;
+ session->IsWaitingStartSessionAck = false;
+ ProcessState();
}
void TDqPqRdReadActor::Handle(NFq::TEvRowDispatcher::TEvSessionError::TPtr& ev) {
const NYql::NDqProto::TMessageTransportMeta& meta = ev->Get()->Record.GetTransportMeta();
- SRC_LOG_I("TEvSessionError from " << ev->Sender << ", seqNo " << meta.GetSeqNo() << ", ConfirmedSeqNo " << meta.GetConfirmedSeqNo());
+ SRC_LOG_I("Received TEvSessionError from " << ev->Sender << ", seqNo " << meta.GetSeqNo() << ", ConfirmedSeqNo " << meta.GetConfirmedSeqNo());
Counters.SessionError++;
- ui64 partitionId = ev->Get()->Record.GetPartitionId();
- auto sessionIt = Sessions.find(partitionId);
- if (sessionIt == Sessions.end()) {
- SRC_LOG_W("Ignore TEvSessionError from " << ev->Sender << ", seqNo " << meta.GetSeqNo()
- << ", ConfirmedSeqNo " << meta.GetConfirmedSeqNo() << ", PartitionId " << partitionId << ", cookie " << ev->Cookie);
- YQL_ENSURE(State != EState::STARTED);
- SendNoSession(ev->Sender, partitionId, ev->Cookie);
+ auto* session = FindSession(ev);
+ if (!session) {
return;
}
-
- auto& sessionInfo = sessionIt->second;
- if (!CheckSession(sessionInfo, ev, partitionId)) {
- return;
- }
-
NYql::TIssues issues;
IssuesFromMessage(ev->Get()->Record.GetIssues(), issues);
Stop(ev->Get()->Record.GetStatusCode(), issues);
@@ -538,28 +573,27 @@ void TDqPqRdReadActor::Handle(NFq::TEvRowDispatcher::TEvSessionError::TPtr& ev)
void TDqPqRdReadActor::Handle(NFq::TEvRowDispatcher::TEvStatistics::TPtr& ev) {
const NYql::NDqProto::TMessageTransportMeta& meta = ev->Get()->Record.GetTransportMeta();
- SRC_LOG_T("TEvStatistics from " << ev->Sender << ", offset " << ev->Get()->Record.GetNextMessageOffset() << ", seqNo " << meta.GetSeqNo() << ", ConfirmedSeqNo " << meta.GetConfirmedSeqNo());
+ SRC_LOG_T("Received TEvStatistics from " << ev->Sender << ", seqNo " << meta.GetSeqNo() << ", ConfirmedSeqNo " << meta.GetConfirmedSeqNo() << " generation " << ev->Cookie);
Counters.Statistics++;
- ui64 partitionId = ev->Get()->Record.GetPartitionId();
- auto sessionIt = Sessions.find(partitionId);
- if (sessionIt == Sessions.end()) {
- SRC_LOG_W("Ignore TEvStatistics from " << ev->Sender << ", seqNo " << meta.GetSeqNo()
- << ", ConfirmedSeqNo " << meta.GetConfirmedSeqNo() << ", PartitionId " << partitionId);
- YQL_ENSURE(State != EState::STARTED);
- SendNoSession(ev->Sender, partitionId, ev->Cookie);
+ auto* session = FindSession(ev);
+ if (!session) {
return;
}
- auto& sessionInfo = sessionIt->second;
IngressStats.Bytes += ev->Get()->Record.GetReadBytes();
-
- if (!CheckSession(sessionInfo, ev, partitionId)) {
- return;
- }
-
- if (ReadyBuffer.empty()) {
- TPartitionKey partitionKey{TString{}, partitionId};
- PartitionToOffset[partitionKey] = ev->Get()->Record.GetNextMessageOffset();
+ for (auto partition : ev->Get()->Record.GetPartition()) {
+ ui64 partitionId = partition.GetPartitionId();
+ auto& nextOffset = NextOffsetFromRD[partitionId];
+ if (!nextOffset) {
+ nextOffset = partition.GetNextMessageOffset();
+ } else {
+ nextOffset = std::max(*nextOffset, partition.GetNextMessageOffset());
+ }
+ SRC_LOG_T("NextOffsetFromRD [" << partitionId << "]= " << nextOffset);
+ if (ReadyBuffer.empty()) {
+ TPartitionKey partitionKey{TString{}, partitionId};
+ PartitionToOffset[partitionKey] = *nextOffset;
+ }
}
}
@@ -571,35 +605,36 @@ void TDqPqRdReadActor::Handle(NFq::TEvRowDispatcher::TEvGetInternalStateRequest:
void TDqPqRdReadActor::Handle(NFq::TEvRowDispatcher::TEvNewDataArrived::TPtr& ev) {
const NYql::NDqProto::TMessageTransportMeta& meta = ev->Get()->Record.GetTransportMeta();
- SRC_LOG_T("TEvNewDataArrived from " << ev->Sender << ", part id " << ev->Get()->Record.GetPartitionId() << ", seqNo " << meta.GetSeqNo() << ", ConfirmedSeqNo " << meta.GetConfirmedSeqNo());
+ SRC_LOG_T("Received TEvNewDataArrived from " << ev->Sender << ", partition " << ev->Get()->Record.GetPartitionId() << ", seqNo " << meta.GetSeqNo() << ", ConfirmedSeqNo " << meta.GetConfirmedSeqNo() << " generation " << ev->Cookie);
Counters.NewDataArrived++;
-
- ui64 partitionId = ev->Get()->Record.GetPartitionId();
- auto sessionIt = Sessions.find(partitionId);
- if (sessionIt == Sessions.end()) {
- SRC_LOG_W("Ignore TEvNewDataArrived from " << ev->Sender << ", seqNo " << meta.GetSeqNo()
- << ", ConfirmedSeqNo " << meta.GetConfirmedSeqNo() << ", PartitionId " << partitionId);
- YQL_ENSURE(State != EState::STARTED);
- SendNoSession(ev->Sender, partitionId, ev->Cookie);
+
+ auto* session = FindSession(ev);
+ if (!session) {
return;
}
-
- auto& sessionInfo = sessionIt->second;
- if (!CheckSession(sessionInfo, ev, partitionId)) {
+ auto partitionIt = session->Partitions.find(ev->Get()->Record.GetPartitionId());
+ if (partitionIt == session->Partitions.end()) {
+ SRC_LOG_E("TEvNewDataArrived: wrong partition id " << ev->Get()->Record.GetPartitionId());
+ Stop(NDqProto::StatusIds::INTERNAL_ERROR, {TIssue(TStringBuilder() << LogPrefix << "No partition with id " << ev->Get()->Record.GetPartitionId())});
return;
}
- sessionInfo.HasPendingData = true;
- TrySendGetNextBatch(sessionInfo);
+ partitionIt->second.HasPendingData = true;
+ TrySendGetNextBatch(*session);
}
void TDqPqRdReadActor::Handle(const NYql::NDq::TEvRetryQueuePrivate::TEvRetry::TPtr& ev) {
- SRC_LOG_D("TEvRetry");
+ SRC_LOG_T("TEvRetry, EventQueueId " << ev->Get()->EventQueueId);
Counters.Retry++;
- ui64 partitionId = ev->Get()->EventQueueId;
- auto sessionIt = Sessions.find(partitionId);
+ auto readActorIt = ReadActorByEventQueueId.find(ev->Get()->EventQueueId);
+ if (readActorIt == ReadActorByEventQueueId.end()) {
+ SRC_LOG_D("Ignore TEvRetry, wrong EventQueueId " << ev->Get()->EventQueueId);
+ return;
+ }
+
+ auto sessionIt = Sessions.find(readActorIt->second);
if (sessionIt == Sessions.end()) {
- SRC_LOG_W("Unknown partition id " << partitionId << ", skip TEvRetry");
+ SRC_LOG_D("Ignore TEvRetry, wrong read actor id " << readActorIt->second);
return;
}
sessionIt->second.EventsQueue.Retry();
@@ -608,35 +643,37 @@ void TDqPqRdReadActor::Handle(const NYql::NDq::TEvRetryQueuePrivate::TEvRetry::T
void TDqPqRdReadActor::Handle(const NYql::NDq::TEvRetryQueuePrivate::TEvEvHeartbeat::TPtr& ev) {
SRC_LOG_T("TEvRetryQueuePrivate::TEvEvHeartbeat");
Counters.PrivateHeartbeat++;
- ui64 partitionId = ev->Get()->EventQueueId;
+ auto readActorIt = ReadActorByEventQueueId.find(ev->Get()->EventQueueId);
+ if (readActorIt == ReadActorByEventQueueId.end()) {
+ SRC_LOG_D("Ignore TEvRetry, wrong EventQueueId " << ev->Get()->EventQueueId);
+ return;
+ }
- auto sessionIt = Sessions.find(partitionId);
+ auto sessionIt = Sessions.find(readActorIt->second);
if (sessionIt == Sessions.end()) {
- SRC_LOG_W("Unknown partition id " << partitionId << ", skip TEvPing");
+ SRC_LOG_D("Ignore TEvRetry, wrong read actor id " << readActorIt->second);
return;
}
auto& sessionInfo = sessionIt->second;
bool needSend = sessionInfo.EventsQueue.Heartbeat();
if (needSend) {
SRC_LOG_T("Send TEvEvHeartbeat");
- Send(sessionInfo.RowDispatcherActorId, new NFq::TEvRowDispatcher::TEvHeartbeat(sessionInfo.PartitionId), sessionInfo.Generation);
+ Send(sessionInfo.RowDispatcherActorId, new NFq::TEvRowDispatcher::TEvHeartbeat(), sessionInfo.Generation);
}
}
void TDqPqRdReadActor::Handle(const NFq::TEvRowDispatcher::TEvHeartbeat::TPtr& ev) {
SRC_LOG_T("Received TEvHeartbeat from " << ev->Sender << ", generation " << ev->Cookie);
Counters.Heartbeat++;
-
- ui64 partitionId = ev->Get()->Record.GetPartitionId();
- auto sessionIt = Sessions.find(partitionId);
+ auto sessionIt = Sessions.find(ev->Sender);
if (sessionIt == Sessions.end()) {
- SRC_LOG_W("Ignore TEvHeartbeat from " << ev->Sender << ", PartitionId " << partitionId << ", generation " << ev->Cookie);
- SendNoSession(ev->Sender, partitionId, ev->Cookie);
+ SRC_LOG_W("Ignore TEvHeartbeat from " << ev->Sender << ", generation " << ev->Cookie);
+ SendNoSession(ev->Sender, ev->Cookie);
return;
}
if (ev->Cookie != sessionIt->second.Generation) {
- SendNoSession(ev->Sender, partitionId, ev->Cookie);
+ SendNoSession(ev->Sender, ev->Cookie);
}
}
@@ -657,18 +694,25 @@ void TDqPqRdReadActor::Handle(NFq::TEvRowDispatcher::TEvCoordinatorChanged::TPtr
CoordinatorActorId = ev->Get()->CoordinatorActorId;
ReInit("Coordinator is changed");
- ProcessState();
+ Sleep();
+}
+
+void TDqPqRdReadActor::Sleep() {
+ if (ProcessStateScheduled) {
+ return;
+ }
+ ProcessStateScheduled = true;
+ Schedule(TDuration::Seconds(SleepPeriodSec), new TEvPrivate::TEvProcessState());
}
void TDqPqRdReadActor::ReInit(const TString& reason) {
SRC_LOG_I("ReInit state, reason " << reason);
- StopSessions();
- Sessions.clear();
- State = EState::INIT;
+ Metrics.ReInit->Inc();
+
+ State = EState::WAIT_COORDINATOR_ID;
if (!ReadyBuffer.empty()) {
NotifyCA();
}
- PrintInternalState();
}
void TDqPqRdReadActor::Stop(NDqProto::StatusIds::StatusCode status, TIssues issues) {
@@ -677,28 +721,18 @@ void TDqPqRdReadActor::Stop(NDqProto::StatusIds::StatusCode status, TIssues issu
}
void TDqPqRdReadActor::Handle(NFq::TEvRowDispatcher::TEvCoordinatorResult::TPtr& ev) {
- SRC_LOG_I("TEvCoordinatorResult from " << ev->Sender.ToString() << ", cookie " << ev->Cookie);
+ SRC_LOG_I("Received TEvCoordinatorResult from " << ev->Sender.ToString() << ", cookie " << ev->Cookie);
Counters.CoordinatorChanged++;
if (ev->Cookie != CoordinatorRequestCookie) {
SRC_LOG_W("Ignore TEvCoordinatorResult. wrong cookie");
return;
}
- if (State != EState::WAIT_PARTITIONS_ADDRES) {
- SRC_LOG_W("Ignore TEvCoordinatorResult. wrong state " << static_cast<ui64>(EState::WAIT_PARTITIONS_ADDRES));
- return;
- }
+ LastReceivedPartitionDistribution.clear();
+ TMap<NActors::TActorId, TSet<ui32>> distribution;
for (auto& p : ev->Get()->Record.GetPartitions()) {
TActorId rowDispatcherActorId = ActorIdFromProto(p.GetActorId());
- for (auto partitionId : p.GetPartitionId()) {
- if (Sessions.contains(partitionId)) {
- Stop(NDqProto::StatusIds::INTERNAL_ERROR, {TIssue("Session already exists")});
- return;
- }
- SRC_LOG_I("Create session to RD (" << rowDispatcherActorId << "), partitionId " << partitionId);
- Sessions.emplace(
- std::piecewise_construct,
- std::forward_as_tuple(partitionId),
- std::forward_as_tuple(TxId, SelfId(), rowDispatcherActorId, partitionId, partitionId, ++NextGeneration));
+ for (auto partitionId : p.GetPartitionIds()) {
+ LastReceivedPartitionDistribution[rowDispatcherActorId].insert(partitionId);
}
}
ProcessState();
@@ -707,7 +741,7 @@ void TDqPqRdReadActor::Handle(NFq::TEvRowDispatcher::TEvCoordinatorResult::TPtr&
void TDqPqRdReadActor::HandleConnected(TEvInterconnect::TEvNodeConnected::TPtr& ev) {
SRC_LOG_D("EvNodeConnected " << ev->Get()->NodeId);
Counters.NodeConnected++;
- for (auto& [partitionId, sessionInfo] : Sessions) {
+ for (auto& [rowDispatcherActorId, sessionInfo] : Sessions) {
sessionInfo.EventsQueue.HandleNodeConnected(ev->Get()->NodeId);
}
}
@@ -715,7 +749,7 @@ void TDqPqRdReadActor::HandleConnected(TEvInterconnect::TEvNodeConnected::TPtr&
void TDqPqRdReadActor::HandleDisconnected(TEvInterconnect::TEvNodeDisconnected::TPtr& ev) {
SRC_LOG_D("TEvNodeDisconnected, node id " << ev->Get()->NodeId);
Counters.NodeDisconnected++;
- for (auto& [partitionId, sessionInfo] : Sessions) {
+ for (auto& [rowDispatcherActorId, sessionInfo] : Sessions) {
sessionInfo.EventsQueue.HandleNodeDisconnected(ev->Get()->NodeId);
}
// In case of row dispatcher disconnection: wait connected or SessionClosed(). TODO: Stop actor after timeout.
@@ -724,63 +758,70 @@ void TDqPqRdReadActor::HandleDisconnected(TEvInterconnect::TEvNodeDisconnected::
}
void TDqPqRdReadActor::Handle(NActors::TEvents::TEvUndelivered::TPtr& ev) {
- SRC_LOG_D("TEvUndelivered, " << ev->Get()->ToString() << " from " << ev->Sender.ToString() << " generation " << ev->Cookie);
+ SRC_LOG_D("Received TEvUndelivered, " << ev->Get()->ToString() << " from " << ev->Sender.ToString() << ", reason " << ev->Get()->Reason);
Counters.Undelivered++;
- for (auto& [partitionId, sessionInfo] : Sessions) {
- if (ev->Cookie != sessionInfo.Generation) {
- continue;
- }
+
+ auto sessionIt = Sessions.find(ev->Sender);
+ if (sessionIt != Sessions.end()) {
+ auto& sessionInfo = sessionIt->second;
if (sessionInfo.EventsQueue.HandleUndelivered(ev) == NYql::NDq::TRetryEventsQueue::ESessionState::SessionClosed) {
- ReInit(TStringBuilder() << "Session closed, partition id " << sessionInfo.PartitionId);
- break;
+ if (sessionInfo.Generation == ev->Cookie) {
+ SRC_LOG_D("Erase session to " << ev->Sender.ToString());
+ Sessions.erase(ev->Sender);
+ ReadActorByEventQueueId.erase(sessionInfo.EventQueueId);
+ ReInit("Reset session state");
+ }
}
}
if (CoordinatorActorId && *CoordinatorActorId == ev->Sender) {
ReInit("TEvUndelivered to coordinator");
+ Sleep();
+ return;
}
+ ProcessState();
}
void TDqPqRdReadActor::Handle(NFq::TEvRowDispatcher::TEvMessageBatch::TPtr& ev) {
const NYql::NDqProto::TMessageTransportMeta& meta = ev->Get()->Record.GetTransportMeta();
- SRC_LOG_T("TEvMessageBatch from " << ev->Sender << ", seqNo " << meta.GetSeqNo() << ", ConfirmedSeqNo " << meta.GetConfirmedSeqNo());
+ SRC_LOG_T("Received TEvMessageBatch from " << ev->Sender << ", seqNo " << meta.GetSeqNo() << ", ConfirmedSeqNo " << meta.GetConfirmedSeqNo() << " generation " << ev->Cookie);
Counters.MessageBatch++;
- ui64 partitionId = ev->Get()->Record.GetPartitionId();
- auto sessionIt = Sessions.find(partitionId);
- if (sessionIt == Sessions.end()) {
- SRC_LOG_W("Ignore TEvMessageBatch from " << ev->Sender << ", seqNo " << meta.GetSeqNo()
- << ", ConfirmedSeqNo " << meta.GetConfirmedSeqNo() << ", PartitionId " << partitionId << ", cookie " << ev->Cookie);
- YQL_ENSURE(State != EState::STARTED);
- SendNoSession(ev->Sender, partitionId, ev->Cookie);
+ auto* session = FindSession(ev);
+ if (!session) {
return;
}
-
- Metrics.InFlyGetNextBatch->Set(0);
- auto& sessionInfo = sessionIt->second;
- if (!CheckSession(sessionInfo, ev, partitionId)) {
+ auto partitionId = ev->Get()->Record.GetPartitionId();
+ auto partitionIt = session->Partitions.find(partitionId);
+ if (partitionIt == session->Partitions.end()) {
+ SRC_LOG_E("TEvMessageBatch: wrong partition id " << partitionId);
+ Stop(NDqProto::StatusIds::INTERNAL_ERROR, {TIssue(TStringBuilder() << LogPrefix << "No partition with id " << partitionId)});
return;
}
+ auto& partirtion = partitionIt->second;
+ Metrics.InFlyGetNextBatch->Set(0);
ReadyBuffer.emplace(partitionId, ev->Get()->Record.MessagesSize());
TReadyBatch& activeBatch = ReadyBuffer.back();
+ auto& nextOffset = NextOffsetFromRD[partitionId];
+
ui64 bytes = 0;
for (const auto& message : ev->Get()->Record.GetMessages()) {
const auto& offsets = message.GetOffsets();
if (offsets.empty()) {
- Stop(NDqProto::StatusIds::INTERNAL_ERROR, {TIssue("Got unexpected empty batch from row dispatcher")});
+ Stop(NDqProto::StatusIds::INTERNAL_ERROR, {TIssue(TStringBuilder() << LogPrefix << "Got unexpected empty batch from row dispatcher")});
return;
}
activeBatch.Data.emplace_back(ev->Get()->GetPayload(message.GetPayloadId()));
bytes += activeBatch.Data.back().GetSize();
- sessionInfo.NextOffset = *offsets.rbegin() + 1;
- SRC_LOG_T("TEvMessageBatch NextOffset " << sessionInfo.NextOffset);
+ nextOffset = *offsets.rbegin() + 1;
+ SRC_LOG_T("TEvMessageBatch NextOffset " << nextOffset);
}
activeBatch.UsedSpace = bytes;
ReadyBufferSizeBytes += bytes;
activeBatch.NextOffset = ev->Get()->Record.GetNextMessageOffset();
- sessionInfo.IsWaitingMessageBatch = false;
+ partirtion.IsWaitingMessageBatch = false;
NotifyCA();
}
@@ -836,56 +877,80 @@ TString TDqPqRdReadActor::GetInternalState() {
<< " Heartbeat " << Counters.Heartbeat << " PrintState " << Counters.PrintState << " ProcessState " << Counters.ProcessState
<< " NotifyCA " << Counters.NotifyCA << "\n";
- for (auto& [partitionId, sessionInfo] : Sessions) {
- str << " partId " << partitionId << " status " << static_cast<ui64>(sessionInfo.Status)
- << " next offset " << sessionInfo.NextOffset
- << " is waiting ack " << sessionInfo.IsWaitingStartSessionAck << " is waiting batch " << sessionInfo.IsWaitingMessageBatch
- << " has pending data " << sessionInfo.HasPendingData << " connection id " << sessionInfo.Generation << " ";
+ for (auto& [rowDispatcherActorId, sessionInfo] : Sessions) {
+ str << " " << rowDispatcherActorId << " status " << static_cast<ui64>(sessionInfo.Status)
+ << " is waiting ack " << sessionInfo.IsWaitingStartSessionAck << " connection id " << sessionInfo.Generation << " ";
sessionInfo.EventsQueue.PrintInternalState(str);
+ for (const auto& [partitionId, partition] : sessionInfo.Partitions) {
+ const auto offsetIt = NextOffsetFromRD.find(partitionId);
+ str << " partId " << partitionId
+ << " next offset " << ((offsetIt != NextOffsetFromRD.end()) ? ToString(offsetIt->second) : TString("<empty>"))
+ << " is waiting batch " << partition.IsWaitingMessageBatch
+ << " has pending data " << partition.HasPendingData << "\n";
+ }
+ str << "\n";
}
return str.Str();
}
void TDqPqRdReadActor::Handle(TEvPrivate::TEvProcessState::TPtr&) {
+ ProcessStateScheduled = false;
Counters.ProcessState++;
- Schedule(TDuration::Seconds(ProcessStatePeriodSec), new TEvPrivate::TEvProcessState());
ProcessState();
}
-void TDqPqRdReadActor::TrySendGetNextBatch(SessionInfo& sessionInfo) {
- if (!sessionInfo.HasPendingData) {
- return;
- }
+void TDqPqRdReadActor::TrySendGetNextBatch(TSession& sessionInfo) {
if (ReadyBufferSizeBytes > MaxBufferSize) {
return;
}
- Metrics.InFlyGetNextBatch->Inc();
- auto event = std::make_unique<NFq::TEvRowDispatcher::TEvGetNextBatch>();
- sessionInfo.HasPendingData = false;
- sessionInfo.IsWaitingMessageBatch = true;
- event->Record.SetPartitionId(sessionInfo.PartitionId);
- sessionInfo.EventsQueue.Send(event.release(), sessionInfo.Generation);
+ for (auto& [partitionId, partition] : sessionInfo.Partitions) {
+ if (!partition.HasPendingData) {
+ continue;
+ }
+ Metrics.InFlyGetNextBatch->Inc();
+ auto event = std::make_unique<NFq::TEvRowDispatcher::TEvGetNextBatch>();
+ partition.HasPendingData = false;
+ partition.IsWaitingMessageBatch = true;
+ event->Record.SetPartitionId(partitionId);
+ sessionInfo.EventsQueue.Send(event.release(), sessionInfo.Generation);
+ }
}
template <class TEventPtr>
-bool TDqPqRdReadActor::CheckSession(SessionInfo& session, const TEventPtr& ev, ui64 partitionId) {
+TDqPqRdReadActor::TSession* TDqPqRdReadActor::FindSession(const TEventPtr& ev) {
+ auto sessionIt = Sessions.find(ev->Sender);
+ if (sessionIt == Sessions.end()) {
+ SRC_LOG_W("Ignore " << typeid(TEventPtr).name() << " from " << ev->Sender);
+ SendNoSession(ev->Sender, ev->Cookie);
+ return nullptr;
+ }
+ auto& session = sessionIt->second;
+
if (ev->Cookie != session.Generation) {
SRC_LOG_W("Wrong message generation (" << typeid(TEventPtr).name() << "), sender " << ev->Sender << " cookie " << ev->Cookie << ", session generation " << session.Generation << ", send TEvStopSession");
- SendNoSession(ev->Sender, partitionId, ev->Cookie);
- return false;
+ SendNoSession(ev->Sender, ev->Cookie);
+ return nullptr;
}
if (!session.EventsQueue.OnEventReceived(ev)) {
const NYql::NDqProto::TMessageTransportMeta& meta = ev->Get()->Record.GetTransportMeta();
- SRC_LOG_W("Wrong seq num ignore message (" << typeid(TEventPtr).name() << ") seqNo " << meta.GetSeqNo() << " from " << ev->Sender.ToString());
- return false;
+ SRC_LOG_W("Ignore " << typeid(TEventPtr).name() << " from " << ev->Sender << ", wrong seq num, seqNo " << meta.GetSeqNo());
+ return nullptr;
+ }
+
+ auto expectedStatus = TSession::ESessionStatus::STARTED;
+ if constexpr (std::is_same_v<TEventPtr, NFq::TEvRowDispatcher::TEvStartSessionAck::TPtr>) {
+ expectedStatus = TSession::ESessionStatus::WAIT_START_SESSION_ACK;
}
- return true;
+
+ if (session.Status != expectedStatus) {
+ SRC_LOG_E("Wrong " << typeid(TEventPtr).name() << " from " << ev->Sender << " session status " << static_cast<ui64>(session.Status) << " expected " << static_cast<ui64>(expectedStatus));
+ return nullptr;
+ }
+ return &session;
}
-void TDqPqRdReadActor::SendNoSession(const NActors::TActorId& recipient, ui64 partitionId, ui64 cookie) {
+void TDqPqRdReadActor::SendNoSession(const NActors::TActorId& recipient, ui64 cookie) {
auto event = std::make_unique<NFq::TEvRowDispatcher::TEvNoSession>();
- //*event->Record.MutableSource() = SourceParams;
- event->Record.SetPartitionId(partitionId);
Send(recipient, event.release(), 0, cookie);
}
@@ -896,6 +961,38 @@ void TDqPqRdReadActor::NotifyCA() {
Send(ComputeActorId, new TEvNewAsyncInputDataArrived(InputIndex));
}
+void TDqPqRdReadActor::UpdateSessions() {
+ SRC_LOG_I("UpdateSessions, Sessions size " << Sessions.size());
+
+ if (LastUsedPartitionDistribution != LastReceivedPartitionDistribution) {
+ SRC_LOG_I("Distribution is changed, remove sessions");
+ for (auto& [rowDispatcherActorId, sessionInfo] : Sessions) {
+ StopSession(sessionInfo);
+ ReadActorByEventQueueId.erase(sessionInfo.EventQueueId);
+ }
+ Sessions.clear();
+ }
+
+ for (const auto& [rowDispatcherActorId, partitions] : LastReceivedPartitionDistribution) {
+ if (Sessions.contains(rowDispatcherActorId)) {
+ continue;
+ }
+
+ SRC_LOG_I("Create session to " << rowDispatcherActorId);
+ auto queueId = ++NextEventQueueId;
+ Sessions.emplace(
+ std::piecewise_construct,
+ std::forward_as_tuple(rowDispatcherActorId),
+ std::forward_as_tuple(TxId, SelfId(), rowDispatcherActorId, queueId, ++NextGeneration));
+ auto& session = Sessions.at(rowDispatcherActorId);
+ for (auto partitionId : partitions) {
+ session.Partitions[partitionId];
+ }
+ ReadActorByEventQueueId[queueId] = rowDispatcherActorId;
+ }
+ LastUsedPartitionDistribution = LastReceivedPartitionDistribution;
+}
+
std::pair<IDqComputeActorAsyncInput*, NActors::IActor*> CreateDqPqRdReadActor(
const TTypeEnvironment& typeEnv,
NPq::NProto::TDqPqTopicSource&& settings,
diff --git a/ydb/tests/fq/pq_async_io/ut/dq_pq_rd_read_actor_ut.cpp b/ydb/tests/fq/pq_async_io/ut/dq_pq_rd_read_actor_ut.cpp
index 2784a4de697..5f50ea8e55b 100644
--- a/ydb/tests/fq/pq_async_io/ut/dq_pq_rd_read_actor_ut.cpp
+++ b/ydb/tests/fq/pq_async_io/ut/dq_pq_rd_read_actor_ut.cpp
@@ -16,7 +16,8 @@
namespace NYql::NDq {
-const ui64 PartitionId = 666;
+const ui64 PartitionId1 = 666;
+const ui64 PartitionId2 = 667;
struct TFixture : public TPqIoTestFixture {
TFixture() {
@@ -29,13 +30,14 @@ struct TFixture : public TPqIoTestFixture {
void InitRdSource(
const NYql::NPq::NProto::TDqPqTopicSource& settings,
- i64 freeSpace = 1_MB)
+ i64 freeSpace = 1_MB,
+ ui64 partitionCount = 1)
{
CaSetup->Execute([&](TFakeActor& actor) {
NPq::NProto::TDqReadTaskParams params;
auto* partitioninigParams = params.MutablePartitioningParams();
- partitioninigParams->SetTopicPartitionsCount(1);
- partitioninigParams->SetEachTopicPartitionGroupId(PartitionId);
+ partitioninigParams->SetTopicPartitionsCount(partitionCount);
+ partitioninigParams->SetEachTopicPartitionGroupId(PartitionId1);
partitioninigParams->SetDqPartitionsCount(1);
TString serializedParams;
@@ -76,17 +78,21 @@ struct TFixture : public TPqIoTestFixture {
return eventHolder;
}
- void ExpectStartSession(ui64 expectedOffset, NActors::TActorId rowDispatcherId, ui64 expectedGeneration = 1) {
+ void ExpectStartSession(const TMap<ui32, ui64>& expectedOffsets, NActors::TActorId rowDispatcherId, ui64 expectedGeneration = 1) {
auto eventHolder = CaSetup->Runtime->GrabEdgeEvent<NFq::TEvRowDispatcher::TEvStartSession>(rowDispatcherId, TDuration::Seconds(5));
UNIT_ASSERT(eventHolder.Get() != nullptr);
- UNIT_ASSERT(eventHolder->Get()->Record.GetOffset() == expectedOffset);
- UNIT_ASSERT(eventHolder->Cookie == expectedGeneration);
+ TMap<ui32, ui64> offsets;
+ for (auto p : eventHolder->Get()->Record.GetOffsets()) {
+ offsets[p.GetPartitionId()] = p.GetOffset();
+ }
+ UNIT_ASSERT_EQUAL(offsets, expectedOffsets);
+ UNIT_ASSERT_EQUAL(eventHolder->Cookie, expectedGeneration);
}
void ExpectStopSession(NActors::TActorId rowDispatcherId, ui64 expectedGeneration = 1) {
auto eventHolder = CaSetup->Runtime->GrabEdgeEvent<NFq::TEvRowDispatcher::TEvStopSession>(rowDispatcherId, TDuration::Seconds(5));
UNIT_ASSERT(eventHolder.Get() != nullptr);
- UNIT_ASSERT(eventHolder->Cookie == expectedGeneration);
+ UNIT_ASSERT_EQUAL(eventHolder->Cookie, expectedGeneration);
}
void ExpectNoSession(NActors::TActorId rowDispatcherId, ui64 expectedGeneration = 1) {
@@ -95,10 +101,10 @@ struct TFixture : public TPqIoTestFixture {
UNIT_ASSERT(eventHolder->Cookie == expectedGeneration);
}
- void ExpectGetNextBatch(NActors::TActorId rowDispatcherId) {
+ void ExpectGetNextBatch(NActors::TActorId rowDispatcherId, ui64 partitionId = PartitionId1) {
auto eventHolder = CaSetup->Runtime->GrabEdgeEvent<NFq::TEvRowDispatcher::TEvGetNextBatch>(rowDispatcherId, TDuration::Seconds(5));
UNIT_ASSERT(eventHolder.Get() != nullptr);
- UNIT_ASSERT(eventHolder->Get()->Record.GetPartitionId() == PartitionId);
+ UNIT_ASSERT_EQUAL(eventHolder->Get()->Record.GetPartitionId(), partitionId);
}
void MockCoordinatorChanged(NActors::TActorId coordinatorId) {
@@ -108,20 +114,23 @@ struct TFixture : public TPqIoTestFixture {
});
}
- void MockCoordinatorResult(NActors::TActorId rowDispatcherId, ui64 cookie = 0) {
+ void MockCoordinatorResult(const TMap<NActors::TActorId, ui64> result, ui64 cookie = 0) {
CaSetup->Execute([&](TFakeActor& actor) {
auto event = new NFq::TEvRowDispatcher::TEvCoordinatorResult();
- auto* partitions = event->Record.AddPartitions();
- partitions->AddPartitionId(PartitionId);
- ActorIdToProto(rowDispatcherId, partitions->MutableActorId());
+
+ for (const auto& [rowDispatcherId, partitionId] : result) {
+ auto* partitions = event->Record.AddPartitions();
+ partitions->AddPartitionIds(partitionId);
+ ActorIdToProto(rowDispatcherId, partitions->MutableActorId());
+ }
CaSetup->Runtime->Send(new NActors::IEventHandle(*actor.DqAsyncInputActorId, Coordinator1Id, event, 0, cookie));
});
}
- void MockAck(NActors::TActorId rowDispatcherId, ui64 generation = 1) {
+ void MockAck(NActors::TActorId rowDispatcherId, ui64 generation = 1, ui64 partitionId = PartitionId1) {
CaSetup->Execute([&](TFakeActor& actor) {
NFq::NRowDispatcherProto::TEvStartSession proto;
- proto.SetPartitionId(PartitionId);
+ proto.AddPartitionIds(partitionId);
auto event = new NFq::TEvRowDispatcher::TEvStartSessionAck(proto);
CaSetup->Runtime->Send(new NActors::IEventHandle(*actor.DqAsyncInputActorId, rowDispatcherId, event, 0, generation));
});
@@ -129,15 +138,15 @@ struct TFixture : public TPqIoTestFixture {
void MockHeartbeat(NActors::TActorId rowDispatcherId, ui64 generation = 1) {
CaSetup->Execute([&](TFakeActor& actor) {
- auto event = new NFq::TEvRowDispatcher::TEvHeartbeat(PartitionId);
+ auto event = new NFq::TEvRowDispatcher::TEvHeartbeat();
CaSetup->Runtime->Send(new NActors::IEventHandle(*actor.DqAsyncInputActorId, rowDispatcherId, event, 0, generation));
});
}
- void MockNewDataArrived(NActors::TActorId rowDispatcherId, ui64 generation = 1) {
+ void MockNewDataArrived(NActors::TActorId rowDispatcherId, ui64 generation = 1, ui64 partitionId = PartitionId1) {
CaSetup->Execute([&](TFakeActor& actor) {
auto event = new NFq::TEvRowDispatcher::TEvNewDataArrived();
- event->Record.SetPartitionId(PartitionId);
+ event->Record.SetPartitionId(partitionId);
CaSetup->Runtime->Send(new NActors::IEventHandle(*actor.DqAsyncInputActorId, rowDispatcherId, event, 0, generation));
});
}
@@ -161,7 +170,7 @@ struct TFixture : public TPqIoTestFixture {
}
// Supported schema (Uint64, String)
- void MockMessageBatch(ui64 offset, const std::vector<std::pair<ui64, TString>>& messages, NActors::TActorId rowDispatcherId, ui64 generation = 1) {
+ void MockMessageBatch(ui64 offset, const std::vector<std::pair<ui64, TString>>& messages, NActors::TActorId rowDispatcherId, ui64 generation = 1, ui64 partitionId = PartitionId1) {
CaSetup->Execute([&](TFakeActor& actor) {
auto event = new NFq::TEvRowDispatcher::TEvMessageBatch();
for (const auto& item : messages) {
@@ -170,7 +179,7 @@ struct TFixture : public TPqIoTestFixture {
message.AddOffsets(offset++);
*event->Record.AddMessages() = message;
}
- event->Record.SetPartitionId(PartitionId);
+ event->Record.SetPartitionId(partitionId);
event->Record.SetNextMessageOffset(offset);
CaSetup->Runtime->Send(new NActors::IEventHandle(*actor.DqAsyncInputActorId, rowDispatcherId, event, 0, generation));
});
@@ -181,11 +190,20 @@ struct TFixture : public TPqIoTestFixture {
auto event = new NFq::TEvRowDispatcher::TEvSessionError();
event->Record.SetStatusCode(::NYql::NDqProto::StatusIds::BAD_REQUEST);
IssueToMessage(TIssue("A problem has been detected and session has been shut down to prevent damage your life"), event->Record.AddIssues());
- event->Record.SetPartitionId(PartitionId);
CaSetup->Runtime->Send(new NActors::IEventHandle(*actor.DqAsyncInputActorId, RowDispatcher1, event, 0, 1));
});
}
+ void MockStatistics(NActors::TActorId rowDispatcherId, ui64 nextOffset, ui64 generation, ui64 partitionId) {
+ CaSetup->Execute([&](TFakeActor& actor) {
+ auto event = new NFq::TEvRowDispatcher::TEvStatistics();
+ auto* partitionsProto = event->Record.AddPartition();
+ partitionsProto->SetPartitionId(partitionId);
+ partitionsProto->SetNextMessageOffset(nextOffset);
+ CaSetup->Runtime->Send(new NActors::IEventHandle(*actor.DqAsyncInputActorId, rowDispatcherId, event, 0, generation));
+ });
+ }
+
template<typename T>
void AssertDataWithWatermarks(
const std::vector<std::variant<T, TInstant>>& actual,
@@ -227,35 +245,37 @@ struct TFixture : public TPqIoTestFixture {
});
}
- void MockUndelivered(ui64 generation = 0, NActors::TEvents::TEvUndelivered::EReason reason = NActors::TEvents::TEvUndelivered::ReasonActorUnknown) {
+ void MockUndelivered(NActors::TActorId rowDispatcherId, ui64 generation = 0, NActors::TEvents::TEvUndelivered::EReason reason = NActors::TEvents::TEvUndelivered::ReasonActorUnknown) {
CaSetup->Execute([&](TFakeActor& actor) {
auto event = new NActors::TEvents::TEvUndelivered(0, reason);
- CaSetup->Runtime->Send(new NActors::IEventHandle(*actor.DqAsyncInputActorId, RowDispatcher1, event, 0, generation));
+ CaSetup->Runtime->Send(new NActors::IEventHandle(*actor.DqAsyncInputActorId, rowDispatcherId, event, 0, generation));
});
}
- void StartSession(NYql::NPq::NProto::TDqPqTopicSource& settings, i64 freeSpace = 1_MB) {
- InitRdSource(settings, freeSpace);
+ void StartSession(NYql::NPq::NProto::TDqPqTopicSource& settings, i64 freeSpace = 1_MB, ui64 partitionCount = 1) {
+ InitRdSource(settings, freeSpace, partitionCount);
SourceRead<std::pair<ui64, TString>>(UVPairParser);
ExpectCoordinatorChangesSubscribe();
MockCoordinatorChanged(Coordinator1Id);
- auto req =ExpectCoordinatorRequest(Coordinator1Id);
+ auto req = ExpectCoordinatorRequest(Coordinator1Id);
- MockCoordinatorResult(RowDispatcher1, req->Cookie);
- ExpectStartSession(0, RowDispatcher1);
+ MockCoordinatorResult({{RowDispatcher1, PartitionId1}}, req->Cookie);
+ ExpectStartSession({}, RowDispatcher1);
MockAck(RowDispatcher1);
}
void ProcessSomeMessages(ui64 offset, const std::vector<std::pair<ui64, TString>>& messages, NActors::TActorId rowDispatcherId,
- std::function<std::vector<std::pair<ui64, TString>>(const NUdf::TUnboxedValue&)> uvParser = UVPairParser, ui64 generation = 1) {
- MockNewDataArrived(rowDispatcherId, generation);
- ExpectGetNextBatch(rowDispatcherId);
-
- MockMessageBatch(offset, messages, rowDispatcherId, generation);
+ std::function<std::vector<std::pair<ui64, TString>>(const NUdf::TUnboxedValue&)> uvParser = UVPairParser, ui64 generation = 1,
+ ui64 partitionId = PartitionId1, bool readedByCA = true) {
+ MockNewDataArrived(rowDispatcherId, generation, partitionId);
+ ExpectGetNextBatch(rowDispatcherId, partitionId);
- auto result = SourceReadDataUntil<std::pair<ui64, TString>>(uvParser, messages.size());
- AssertDataWithWatermarks(result, messages, {});
+ MockMessageBatch(offset, messages, rowDispatcherId, generation, partitionId);
+ if (readedByCA) {
+ auto result = SourceReadDataUntil<std::pair<ui64, TString>>(uvParser, messages.size());
+ AssertDataWithWatermarks(result, messages, {});
+ }
}
const std::pair<ui64, TString> Message1 = {100, "value1"};
@@ -273,7 +293,7 @@ struct TFixture : public TPqIoTestFixture {
};
Y_UNIT_TEST_SUITE(TDqPqRdReadActorTests) {
- Y_UNIT_TEST_F(TestReadFromTopic, TFixture) {
+ Y_UNIT_TEST_F(TestReadFromTopic2, TFixture) {
StartSession(Source1);
ProcessSomeMessages(0, {Message1, Message2}, RowDispatcher1);
}
@@ -281,7 +301,7 @@ Y_UNIT_TEST_SUITE(TDqPqRdReadActorTests) {
Y_UNIT_TEST_F(IgnoreUndeliveredWithWrongGeneration, TFixture) {
StartSession(Source1);
ProcessSomeMessages(0, {Message1, Message2}, RowDispatcher1);
- MockUndelivered(NActors::TEvents::TEvUndelivered::Disconnected);
+ MockUndelivered(RowDispatcher1, NActors::TEvents::TEvUndelivered::Disconnected);
ProcessSomeMessages(2, {Message3}, RowDispatcher1);
}
@@ -338,7 +358,6 @@ Y_UNIT_TEST_SUITE(TDqPqRdReadActorTests) {
{
TFixture f;
f.InitRdSource(f.Source1);
- f.SourceRead<std::pair<ui64, TString>>(UVPairParser);
f.LoadSource(state);
f.SourceRead<std::pair<ui64, TString>>(UVPairParser);
f.ExpectCoordinatorChangesSubscribe();
@@ -346,8 +365,8 @@ Y_UNIT_TEST_SUITE(TDqPqRdReadActorTests) {
f.MockCoordinatorChanged(f.Coordinator1Id);
auto req = f.ExpectCoordinatorRequest(f.Coordinator1Id);
- f.MockCoordinatorResult(f.RowDispatcher1, req->Cookie);
- f.ExpectStartSession(2, f.RowDispatcher1);
+ f.MockCoordinatorResult({{f.RowDispatcher1, PartitionId1}}, req->Cookie);
+ f.ExpectStartSession({{PartitionId1, 2}}, f.RowDispatcher1);
f.MockAck(f.RowDispatcher1);
f.ProcessSomeMessages(2, {f.Message3}, f.RowDispatcher1); // offsets: 2
@@ -358,7 +377,6 @@ Y_UNIT_TEST_SUITE(TDqPqRdReadActorTests) {
{
TFixture f;
f.InitRdSource(f.Source1);
- f.SourceRead<std::pair<ui64, TString>>(UVPairParser);
f.LoadSource(state);
f.SourceRead<std::pair<ui64, TString>>(UVPairParser);
f.ExpectCoordinatorChangesSubscribe();
@@ -366,8 +384,8 @@ Y_UNIT_TEST_SUITE(TDqPqRdReadActorTests) {
f.MockCoordinatorChanged(f.Coordinator1Id);
auto req = f.ExpectCoordinatorRequest(f.Coordinator1Id);
- f.MockCoordinatorResult(f.RowDispatcher1, req->Cookie);
- f.ExpectStartSession(3, f.RowDispatcher1);
+ f.MockCoordinatorResult({{f.RowDispatcher1, PartitionId1}}, req->Cookie);
+ f.ExpectStartSession({{PartitionId1, 3}}, f.RowDispatcher1);
f.MockAck(f.RowDispatcher1);
f.ProcessSomeMessages(3, {f.Message4}, f.RowDispatcher1); // offsets: 3
@@ -381,21 +399,30 @@ Y_UNIT_TEST_SUITE(TDqPqRdReadActorTests) {
// change active Coordinator
MockCoordinatorChanged(Coordinator2Id);
- ExpectStopSession(RowDispatcher1);
-
- auto result = SourceReadDataUntil<std::pair<ui64, TString>>(UVPairParser, 1);
- AssertDataWithWatermarks(result, {Message3}, {});
+ // continue use old sessions
+ MockMessageBatch(3, {Message4}, RowDispatcher1);
auto req = ExpectCoordinatorRequest(Coordinator2Id);
- MockCoordinatorResult(RowDispatcher2, req->Cookie);
- ExpectStartSession(3, RowDispatcher2, 2);
+ auto result = SourceReadDataUntil<std::pair<ui64, TString>>(UVPairParser, 2);
+ AssertDataWithWatermarks(result, {Message3, Message4}, {});
+
+ MockCoordinatorResult({{RowDispatcher2, PartitionId1}}, req->Cookie); // change distribution
+ ExpectStopSession(RowDispatcher1);
+
+ ExpectStartSession({{PartitionId1, 4}}, RowDispatcher2, 2);
MockAck(RowDispatcher2, 2);
- ProcessSomeMessages(3, {Message4}, RowDispatcher2, UVPairParser, 2);
+ ProcessSomeMessages(4, {Message4}, RowDispatcher2, UVPairParser, 2);
MockHeartbeat(RowDispatcher1, 1); // old generation
ExpectNoSession(RowDispatcher1, 1);
+
+ // change active Coordinator
+ MockCoordinatorChanged(Coordinator1Id);
+ req = ExpectCoordinatorRequest(Coordinator1Id);
+ MockCoordinatorResult({{RowDispatcher2, PartitionId1}}, req->Cookie); // distribution is not changed
+ ProcessSomeMessages(5, {Message2}, RowDispatcher2, UVPairParser, 2);
}
Y_UNIT_TEST_F(Backpressure, TFixture) {
@@ -406,7 +433,7 @@ Y_UNIT_TEST_SUITE(TDqPqRdReadActorTests) {
MockNewDataArrived(RowDispatcher1);
ExpectGetNextBatch(RowDispatcher1);
- MockMessageBatch(0, {message, message, message}, RowDispatcher1);
+ MockMessageBatch(1, {message, message, message}, RowDispatcher1);
MockNewDataArrived(RowDispatcher1);
ASSERT_THROW(
@@ -417,26 +444,64 @@ Y_UNIT_TEST_SUITE(TDqPqRdReadActorTests) {
AssertDataWithWatermarks(result, {message, message, message}, {});
ExpectGetNextBatch(RowDispatcher1);
- MockMessageBatch(3, {Message1}, RowDispatcher1);
+ MockMessageBatch(4, {Message1}, RowDispatcher1);
result = SourceReadDataUntil<std::pair<ui64, TString>>(UVPairParser, 1);
AssertDataWithWatermarks(result, {Message1}, {});
}
- Y_UNIT_TEST_F(RowDispatcherIsRestarted, TFixture) {
+ Y_UNIT_TEST_F(RowDispatcherIsRestarted2, TFixture) {
StartSession(Source1);
ProcessSomeMessages(0, {Message1, Message2}, RowDispatcher1);
MockDisconnected();
MockConnected();
- MockUndelivered(1);
+ MockUndelivered(RowDispatcher1, 1);
auto req = ExpectCoordinatorRequest(Coordinator1Id);
- MockCoordinatorResult(RowDispatcher1, req->Cookie);
- ExpectStartSession(2, RowDispatcher1, 2);
+ MockCoordinatorResult({{RowDispatcher1, PartitionId1}}, req->Cookie);
+ ExpectStartSession({{PartitionId1, 2}}, RowDispatcher1, 2);
MockAck(RowDispatcher1, 2);
ProcessSomeMessages(2, {Message3}, RowDispatcher1, UVPairParser, 2);
}
+ Y_UNIT_TEST_F(TwoPartitionsRowDispatcherIsRestarted, TFixture) {
+ InitRdSource(Source1, 1_MB, PartitionId2 + 1);
+ SourceRead<TString>(UVParser);
+ ExpectCoordinatorChangesSubscribe();
+ MockCoordinatorChanged(Coordinator1Id);
+ auto req = ExpectCoordinatorRequest(Coordinator1Id);
+ MockCoordinatorResult({{RowDispatcher1, PartitionId1}, {RowDispatcher2, PartitionId2}}, req->Cookie);
+ ExpectStartSession({}, RowDispatcher1, 1);
+ ExpectStartSession({}, RowDispatcher2, 2);
+ MockAck(RowDispatcher1, 1, PartitionId1);
+ MockAck(RowDispatcher2, 2, PartitionId2);
+
+ ProcessSomeMessages(0, {Message1, Message2}, RowDispatcher1, UVPairParser, 1, PartitionId1);
+ ProcessSomeMessages(0, {Message3}, RowDispatcher2, UVPairParser, 2, PartitionId2, false); // not read by CA
+ MockStatistics(RowDispatcher2, 10, 2, PartitionId2);
+
+ // Restart node 2 (RowDispatcher2)
+ MockDisconnected();
+ MockConnected();
+ MockUndelivered(RowDispatcher2, 2);
+
+ // session1 is still working
+ ProcessSomeMessages(2, {Message4}, RowDispatcher1, UVPairParser, 1, PartitionId1, false);
+
+ // Reinit session to RowDispatcher2
+ auto req2 = ExpectCoordinatorRequest(Coordinator1Id);
+ MockCoordinatorResult({{RowDispatcher1, PartitionId1}, {RowDispatcher2, PartitionId2}}, req2->Cookie);
+ ExpectStartSession({{PartitionId2, 10}}, RowDispatcher2, 3);
+ MockAck(RowDispatcher2, 3, PartitionId2);
+
+ ProcessSomeMessages(3, {Message4}, RowDispatcher1, UVPairParser, 1, PartitionId1, false);
+ ProcessSomeMessages(10, {Message4}, RowDispatcher2, UVPairParser, 3, PartitionId2, false);
+
+ std::vector<std::pair<ui64, TString>> expected{Message3, Message4, Message4, Message4};
+ auto result = SourceReadDataUntil<std::pair<ui64, TString>>(UVPairParser, expected.size());
+ AssertDataWithWatermarks(result, expected, {});
+ }
+
Y_UNIT_TEST_F(IgnoreMessageIfNoSessions, TFixture) {
StartSession(Source1);
MockCoordinatorChanged(Coordinator2Id);
@@ -463,12 +528,13 @@ Y_UNIT_TEST_SUITE(TDqPqRdReadActorTests) {
MockCoordinatorChanged(Coordinator2Id);
auto req = ExpectCoordinatorRequest(Coordinator2Id);
+ MockUndelivered(RowDispatcher1, 1);
+ auto req2 = ExpectCoordinatorRequest(Coordinator2Id);
- MockUndelivered(1);
+ MockCoordinatorResult({{RowDispatcher1, PartitionId1}}, req->Cookie);
+ MockCoordinatorResult({{RowDispatcher1, PartitionId1}}, req2->Cookie);
+ ExpectStartSession({{PartitionId1, 2}}, RowDispatcher1, 2);
- MockCoordinatorResult(RowDispatcher1, req->Cookie);
- MockCoordinatorResult(RowDispatcher1, req->Cookie);
- ExpectStartSession(2, RowDispatcher1, 2);
MockAck(RowDispatcher1);
}
}