diff options
author | Alexander Rutkovsky <alexvru@mail.ru> | 2022-02-10 16:47:39 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:47:39 +0300 |
commit | f3646f91e0de459836a7800b9ce3e8dc57a2ab3a (patch) | |
tree | 25c1423200152570c1f8307e5b8304b9bc3840c5 /library/cpp/actors/interconnect/interconnect_channel.h | |
parent | fccc62e9bfdce9be2fe7e0f23479da3a5512211a (diff) | |
download | ydb-f3646f91e0de459836a7800b9ce3e8dc57a2ab3a.tar.gz |
Restoring authorship annotation for Alexander Rutkovsky <alexvru@mail.ru>. Commit 1 of 2.
Diffstat (limited to 'library/cpp/actors/interconnect/interconnect_channel.h')
-rw-r--r-- | library/cpp/actors/interconnect/interconnect_channel.h | 178 |
1 files changed, 89 insertions, 89 deletions
diff --git a/library/cpp/actors/interconnect/interconnect_channel.h b/library/cpp/actors/interconnect/interconnect_channel.h index e4a0ae3cda..56d6e31ba7 100644 --- a/library/cpp/actors/interconnect/interconnect_channel.h +++ b/library/cpp/actors/interconnect/interconnect_channel.h @@ -1,46 +1,46 @@ -#pragma once - +#pragma once + #include <library/cpp/monlib/dynamic_counters/counters.h> #include <library/cpp/actors/core/actorsystem.h> #include <library/cpp/actors/core/event_load.h> #include <library/cpp/actors/util/rope.h> -#include <util/generic/deque.h> -#include <util/generic/vector.h> -#include <util/generic/map.h> -#include <util/stream/walk.h> +#include <util/generic/deque.h> +#include <util/generic/vector.h> +#include <util/generic/map.h> +#include <util/stream/walk.h> #include <library/cpp/actors/wilson/wilson_event.h> #include <library/cpp/actors/helpers/mon_histogram_helper.h> - -#include "interconnect_common.h" -#include "interconnect_counters.h" -#include "packet.h" -#include "event_holder_pool.h" - -namespace NActors { -#pragma pack(push, 1) + +#include "interconnect_common.h" +#include "interconnect_counters.h" +#include "packet.h" +#include "event_holder_pool.h" + +namespace NActors { +#pragma pack(push, 1) struct TChannelPart { ui16 Channel; ui16 Size; - + static constexpr ui16 LastPartFlag = ui16(1) << 15; - - TString ToString() const { - return TStringBuilder() << "{Channel# " << (Channel & ~LastPartFlag) - << " LastPartFlag# " << ((Channel & LastPartFlag) ? "true" : "false") - << " Size# " << Size << "}"; - } - }; -#pragma pack(pop) - - struct TExSerializedEventTooLarge : std::exception { - const ui32 Type; - - TExSerializedEventTooLarge(ui32 type) - : Type(type) - {} + + TString ToString() const { + return TStringBuilder() << "{Channel# " << (Channel & ~LastPartFlag) + << " LastPartFlag# " << ((Channel & LastPartFlag) ? "true" : "false") + << " Size# " << Size << "}"; + } }; - - class TEventOutputChannel : public TInterconnectLoggingBase { +#pragma pack(pop) + + struct TExSerializedEventTooLarge : std::exception { + const ui32 Type; + + TExSerializedEventTooLarge(ui32 type) + : Type(type) + {} + }; + + class TEventOutputChannel : public TInterconnectLoggingBase { public: TEventOutputChannel(TEventHolderPool& pool, ui16 id, ui32 peerNodeId, ui32 maxSerializedEventSize, std::shared_ptr<IInterconnectMetrics> metrics, TSessionParams params) @@ -49,79 +49,79 @@ namespace NActors { , PeerNodeId(peerNodeId) , ChannelId(id) , Metrics(std::move(metrics)) - , Params(std::move(params)) - , MaxSerializedEventSize(maxSerializedEventSize) - {} - - ~TEventOutputChannel() { - } - + , Params(std::move(params)) + , MaxSerializedEventSize(maxSerializedEventSize) + {} + + ~TEventOutputChannel() { + } + std::pair<ui32, TEventHolder*> Push(IEventHandle& ev) { - TEventHolder& event = Pool.Allocate(Queue); - const ui32 bytes = event.Fill(ev) + sizeof(TEventDescr); - OutputQueueSize += bytes; + TEventHolder& event = Pool.Allocate(Queue); + const ui32 bytes = event.Fill(ev) + sizeof(TEventDescr); + OutputQueueSize += bytes; return std::make_pair(bytes, &event); } - - void DropConfirmed(ui64 confirm); - - bool FeedBuf(TTcpPacketOutTask& task, ui64 serial, ui64 *weightConsumed); - + + void DropConfirmed(ui64 confirm); + + bool FeedBuf(TTcpPacketOutTask& task, ui64 serial, ui64 *weightConsumed); + bool IsEmpty() const { - return Queue.empty(); - } - - bool IsWorking() const { - return !IsEmpty(); + return Queue.empty(); } - + + bool IsWorking() const { + return !IsEmpty(); + } + ui32 GetQueueSize() const { return (ui32)Queue.size(); } - - ui64 GetBufferedAmountOfData() const { + + ui64 GetBufferedAmountOfData() const { return OutputQueueSize; } - - void NotifyUndelivered(); - + + void NotifyUndelivered(); + TEventHolderPool& Pool; const ui32 PeerNodeId; const ui16 ChannelId; std::shared_ptr<IInterconnectMetrics> Metrics; - const TSessionParams Params; - const ui32 MaxSerializedEventSize; - ui64 UnaccountedTraffic = 0; - ui64 EqualizeCounterOnPause = 0; - ui64 WeightConsumedOnPause = 0; - - enum class EState { - INITIAL, - CHUNKER, - BUFFER, - DESCRIPTOR, - }; - EState State = EState::INITIAL; - - static constexpr ui16 MinimumFreeSpace = sizeof(TChannelPart) + sizeof(TEventDescr); - + const TSessionParams Params; + const ui32 MaxSerializedEventSize; + ui64 UnaccountedTraffic = 0; + ui64 EqualizeCounterOnPause = 0; + ui64 WeightConsumedOnPause = 0; + + enum class EState { + INITIAL, + CHUNKER, + BUFFER, + DESCRIPTOR, + }; + EState State = EState::INITIAL; + + static constexpr ui16 MinimumFreeSpace = sizeof(TChannelPart) + sizeof(TEventDescr); + protected: - ui64 OutputQueueSize = 0; - - std::list<TEventHolder> Queue; - std::list<TEventHolder> NotYetConfirmed; - TRope::TConstIterator Iter; - TCoroutineChunkSerializer Chunker; - bool ExtendedFormat = false; - - bool FeedDescriptor(TTcpPacketOutTask& task, TEventHolder& event, ui64 *weightConsumed); - - void AccountTraffic() { - if (const ui64 amount = std::exchange(UnaccountedTraffic, 0)) { + ui64 OutputQueueSize = 0; + + std::list<TEventHolder> Queue; + std::list<TEventHolder> NotYetConfirmed; + TRope::TConstIterator Iter; + TCoroutineChunkSerializer Chunker; + bool ExtendedFormat = false; + + bool FeedDescriptor(TTcpPacketOutTask& task, TEventHolder& event, ui64 *weightConsumed); + + void AccountTraffic() { + if (const ui64 amount = std::exchange(UnaccountedTraffic, 0)) { Metrics->UpdateOutputChannelTraffic(ChannelId, amount); - } + } } - + friend class TInterconnectSessionTCP; }; -} +} |