aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/actors/interconnect/interconnect_channel.h
diff options
context:
space:
mode:
authorAlexander Rutkovsky <alexvru@mail.ru>2022-02-10 16:47:39 +0300
committerDaniil Cherednik <dcherednik@yandex-team.ru>2022-02-10 16:47:39 +0300
commitf3646f91e0de459836a7800b9ce3e8dc57a2ab3a (patch)
tree25c1423200152570c1f8307e5b8304b9bc3840c5 /library/cpp/actors/interconnect/interconnect_channel.h
parentfccc62e9bfdce9be2fe7e0f23479da3a5512211a (diff)
downloadydb-f3646f91e0de459836a7800b9ce3e8dc57a2ab3a.tar.gz
Restoring authorship annotation for Alexander Rutkovsky <alexvru@mail.ru>. Commit 1 of 2.
Diffstat (limited to 'library/cpp/actors/interconnect/interconnect_channel.h')
-rw-r--r--library/cpp/actors/interconnect/interconnect_channel.h178
1 files changed, 89 insertions, 89 deletions
diff --git a/library/cpp/actors/interconnect/interconnect_channel.h b/library/cpp/actors/interconnect/interconnect_channel.h
index e4a0ae3cda..56d6e31ba7 100644
--- a/library/cpp/actors/interconnect/interconnect_channel.h
+++ b/library/cpp/actors/interconnect/interconnect_channel.h
@@ -1,46 +1,46 @@
-#pragma once
-
+#pragma once
+
#include <library/cpp/monlib/dynamic_counters/counters.h>
#include <library/cpp/actors/core/actorsystem.h>
#include <library/cpp/actors/core/event_load.h>
#include <library/cpp/actors/util/rope.h>
-#include <util/generic/deque.h>
-#include <util/generic/vector.h>
-#include <util/generic/map.h>
-#include <util/stream/walk.h>
+#include <util/generic/deque.h>
+#include <util/generic/vector.h>
+#include <util/generic/map.h>
+#include <util/stream/walk.h>
#include <library/cpp/actors/wilson/wilson_event.h>
#include <library/cpp/actors/helpers/mon_histogram_helper.h>
-
-#include "interconnect_common.h"
-#include "interconnect_counters.h"
-#include "packet.h"
-#include "event_holder_pool.h"
-
-namespace NActors {
-#pragma pack(push, 1)
+
+#include "interconnect_common.h"
+#include "interconnect_counters.h"
+#include "packet.h"
+#include "event_holder_pool.h"
+
+namespace NActors {
+#pragma pack(push, 1)
struct TChannelPart {
ui16 Channel;
ui16 Size;
-
+
static constexpr ui16 LastPartFlag = ui16(1) << 15;
-
- TString ToString() const {
- return TStringBuilder() << "{Channel# " << (Channel & ~LastPartFlag)
- << " LastPartFlag# " << ((Channel & LastPartFlag) ? "true" : "false")
- << " Size# " << Size << "}";
- }
- };
-#pragma pack(pop)
-
- struct TExSerializedEventTooLarge : std::exception {
- const ui32 Type;
-
- TExSerializedEventTooLarge(ui32 type)
- : Type(type)
- {}
+
+ TString ToString() const {
+ return TStringBuilder() << "{Channel# " << (Channel & ~LastPartFlag)
+ << " LastPartFlag# " << ((Channel & LastPartFlag) ? "true" : "false")
+ << " Size# " << Size << "}";
+ }
};
-
- class TEventOutputChannel : public TInterconnectLoggingBase {
+#pragma pack(pop)
+
+ struct TExSerializedEventTooLarge : std::exception {
+ const ui32 Type;
+
+ TExSerializedEventTooLarge(ui32 type)
+ : Type(type)
+ {}
+ };
+
+ class TEventOutputChannel : public TInterconnectLoggingBase {
public:
TEventOutputChannel(TEventHolderPool& pool, ui16 id, ui32 peerNodeId, ui32 maxSerializedEventSize,
std::shared_ptr<IInterconnectMetrics> metrics, TSessionParams params)
@@ -49,79 +49,79 @@ namespace NActors {
, PeerNodeId(peerNodeId)
, ChannelId(id)
, Metrics(std::move(metrics))
- , Params(std::move(params))
- , MaxSerializedEventSize(maxSerializedEventSize)
- {}
-
- ~TEventOutputChannel() {
- }
-
+ , Params(std::move(params))
+ , MaxSerializedEventSize(maxSerializedEventSize)
+ {}
+
+ ~TEventOutputChannel() {
+ }
+
std::pair<ui32, TEventHolder*> Push(IEventHandle& ev) {
- TEventHolder& event = Pool.Allocate(Queue);
- const ui32 bytes = event.Fill(ev) + sizeof(TEventDescr);
- OutputQueueSize += bytes;
+ TEventHolder& event = Pool.Allocate(Queue);
+ const ui32 bytes = event.Fill(ev) + sizeof(TEventDescr);
+ OutputQueueSize += bytes;
return std::make_pair(bytes, &event);
}
-
- void DropConfirmed(ui64 confirm);
-
- bool FeedBuf(TTcpPacketOutTask& task, ui64 serial, ui64 *weightConsumed);
-
+
+ void DropConfirmed(ui64 confirm);
+
+ bool FeedBuf(TTcpPacketOutTask& task, ui64 serial, ui64 *weightConsumed);
+
bool IsEmpty() const {
- return Queue.empty();
- }
-
- bool IsWorking() const {
- return !IsEmpty();
+ return Queue.empty();
}
-
+
+ bool IsWorking() const {
+ return !IsEmpty();
+ }
+
ui32 GetQueueSize() const {
return (ui32)Queue.size();
}
-
- ui64 GetBufferedAmountOfData() const {
+
+ ui64 GetBufferedAmountOfData() const {
return OutputQueueSize;
}
-
- void NotifyUndelivered();
-
+
+ void NotifyUndelivered();
+
TEventHolderPool& Pool;
const ui32 PeerNodeId;
const ui16 ChannelId;
std::shared_ptr<IInterconnectMetrics> Metrics;
- const TSessionParams Params;
- const ui32 MaxSerializedEventSize;
- ui64 UnaccountedTraffic = 0;
- ui64 EqualizeCounterOnPause = 0;
- ui64 WeightConsumedOnPause = 0;
-
- enum class EState {
- INITIAL,
- CHUNKER,
- BUFFER,
- DESCRIPTOR,
- };
- EState State = EState::INITIAL;
-
- static constexpr ui16 MinimumFreeSpace = sizeof(TChannelPart) + sizeof(TEventDescr);
-
+ const TSessionParams Params;
+ const ui32 MaxSerializedEventSize;
+ ui64 UnaccountedTraffic = 0;
+ ui64 EqualizeCounterOnPause = 0;
+ ui64 WeightConsumedOnPause = 0;
+
+ enum class EState {
+ INITIAL,
+ CHUNKER,
+ BUFFER,
+ DESCRIPTOR,
+ };
+ EState State = EState::INITIAL;
+
+ static constexpr ui16 MinimumFreeSpace = sizeof(TChannelPart) + sizeof(TEventDescr);
+
protected:
- ui64 OutputQueueSize = 0;
-
- std::list<TEventHolder> Queue;
- std::list<TEventHolder> NotYetConfirmed;
- TRope::TConstIterator Iter;
- TCoroutineChunkSerializer Chunker;
- bool ExtendedFormat = false;
-
- bool FeedDescriptor(TTcpPacketOutTask& task, TEventHolder& event, ui64 *weightConsumed);
-
- void AccountTraffic() {
- if (const ui64 amount = std::exchange(UnaccountedTraffic, 0)) {
+ ui64 OutputQueueSize = 0;
+
+ std::list<TEventHolder> Queue;
+ std::list<TEventHolder> NotYetConfirmed;
+ TRope::TConstIterator Iter;
+ TCoroutineChunkSerializer Chunker;
+ bool ExtendedFormat = false;
+
+ bool FeedDescriptor(TTcpPacketOutTask& task, TEventHolder& event, ui64 *weightConsumed);
+
+ void AccountTraffic() {
+ if (const ui64 amount = std::exchange(UnaccountedTraffic, 0)) {
Metrics->UpdateOutputChannelTraffic(ChannelId, amount);
- }
+ }
}
-
+
friend class TInterconnectSessionTCP;
};
-}
+}