aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/actors/interconnect/packet.h
diff options
context:
space:
mode:
authorAlexander Rutkovsky <alexvru@mail.ru>2022-02-10 16:47:39 +0300
committerDaniil Cherednik <dcherednik@yandex-team.ru>2022-02-10 16:47:39 +0300
commitf3646f91e0de459836a7800b9ce3e8dc57a2ab3a (patch)
tree25c1423200152570c1f8307e5b8304b9bc3840c5 /library/cpp/actors/interconnect/packet.h
parentfccc62e9bfdce9be2fe7e0f23479da3a5512211a (diff)
downloadydb-f3646f91e0de459836a7800b9ce3e8dc57a2ab3a.tar.gz
Restoring authorship annotation for Alexander Rutkovsky <alexvru@mail.ru>. Commit 1 of 2.
Diffstat (limited to 'library/cpp/actors/interconnect/packet.h')
-rw-r--r--library/cpp/actors/interconnect/packet.h596
1 files changed, 298 insertions, 298 deletions
diff --git a/library/cpp/actors/interconnect/packet.h b/library/cpp/actors/interconnect/packet.h
index 4ba50a2b5f..52267bcc2e 100644
--- a/library/cpp/actors/interconnect/packet.h
+++ b/library/cpp/actors/interconnect/packet.h
@@ -1,324 +1,324 @@
-#pragma once
-
+#pragma once
+
#include <library/cpp/actors/core/event_pb.h>
#include <library/cpp/actors/core/event_load.h>
-#include <library/cpp/actors/core/events.h>
-#include <library/cpp/actors/core/actor.h>
+#include <library/cpp/actors/core/events.h>
+#include <library/cpp/actors/core/actor.h>
#include <library/cpp/containers/stack_vector/stack_vec.h>
#include <library/cpp/actors/util/rope.h>
#include <library/cpp/actors/prof/tag.h>
#include <library/cpp/digest/crc32c/crc32c.h>
#include <library/cpp/lwtrace/shuttle.h>
-#include <util/generic/string.h>
-#include <util/generic/list.h>
-
-#ifndef FORCE_EVENT_CHECKSUM
-#define FORCE_EVENT_CHECKSUM 0
-#endif
-
+#include <util/generic/string.h>
+#include <util/generic/list.h>
+
+#ifndef FORCE_EVENT_CHECKSUM
+#define FORCE_EVENT_CHECKSUM 0
+#endif
+
using NActors::IEventBase;
using NActors::IEventHandle;
using NActors::TActorId;
using NActors::TConstIoVec;
-using NActors::TEventSerializedData;
-
-Y_FORCE_INLINE ui32 Crc32cExtendMSanCompatible(ui32 checksum, const void *data, size_t len) {
- if constexpr (NSan::MSanIsOn()) {
- const char *begin = static_cast<const char*>(data);
- const char *end = begin + len;
- begin -= reinterpret_cast<uintptr_t>(begin) & 15;
- end += -reinterpret_cast<uintptr_t>(end) & 15;
- NSan::Unpoison(begin, end - begin);
- }
- return Crc32cExtend(checksum, data, len);
-}
-
-struct TSessionParams {
- bool Encryption = {};
- bool UseModernFrame = {};
- bool AuthOnly = {};
- TString AuthCN;
- NActors::TScopeId PeerScopeId;
-};
-
-struct TTcpPacketHeader_v1 {
- ui32 HeaderCRC32;
- ui32 PayloadCRC32;
- ui64 Confirm;
- ui64 Serial;
- ui64 DataSize;
-
- inline bool Check() const {
- ui32 actual = Crc32cExtendMSanCompatible(0, &PayloadCRC32, sizeof(TTcpPacketHeader_v1) - sizeof(HeaderCRC32));
- return actual == HeaderCRC32;
- }
-
- inline void Sign() {
- HeaderCRC32 = Crc32cExtendMSanCompatible(0, &PayloadCRC32, sizeof(TTcpPacketHeader_v1) - sizeof(HeaderCRC32));
- }
-
- TString ToString() const {
- return Sprintf("{Confirm# %" PRIu64 " Serial# %" PRIu64 " DataSize# %" PRIu64 "}", Confirm, Serial, DataSize);
- }
-};
-
-#pragma pack(push, 1)
-struct TTcpPacketHeader_v2 {
- ui64 Confirm;
- ui64 Serial;
- ui32 Checksum; // for the whole frame
- ui16 PayloadLength;
-};
-#pragma pack(pop)
-
-union TTcpPacketBuf {
- static constexpr ui64 PingRequestMask = 0x8000000000000000ULL;
- static constexpr ui64 PingResponseMask = 0x4000000000000000ULL;
- static constexpr ui64 ClockMask = 0x2000000000000000ULL;
-
- static constexpr size_t PacketDataLen = 4096 * 2 - 96 - Max(sizeof(TTcpPacketHeader_v1), sizeof(TTcpPacketHeader_v2));
- struct {
- TTcpPacketHeader_v1 Header;
- char Data[PacketDataLen];
- } v1;
- struct {
- TTcpPacketHeader_v2 Header;
- char Data[PacketDataLen];
- } v2;
-};
-
-#pragma pack(push, 1)
-struct TEventDescr {
- ui32 Type;
- ui32 Flags;
+using NActors::TEventSerializedData;
+
+Y_FORCE_INLINE ui32 Crc32cExtendMSanCompatible(ui32 checksum, const void *data, size_t len) {
+ if constexpr (NSan::MSanIsOn()) {
+ const char *begin = static_cast<const char*>(data);
+ const char *end = begin + len;
+ begin -= reinterpret_cast<uintptr_t>(begin) & 15;
+ end += -reinterpret_cast<uintptr_t>(end) & 15;
+ NSan::Unpoison(begin, end - begin);
+ }
+ return Crc32cExtend(checksum, data, len);
+}
+
+struct TSessionParams {
+ bool Encryption = {};
+ bool UseModernFrame = {};
+ bool AuthOnly = {};
+ TString AuthCN;
+ NActors::TScopeId PeerScopeId;
+};
+
+struct TTcpPacketHeader_v1 {
+ ui32 HeaderCRC32;
+ ui32 PayloadCRC32;
+ ui64 Confirm;
+ ui64 Serial;
+ ui64 DataSize;
+
+ inline bool Check() const {
+ ui32 actual = Crc32cExtendMSanCompatible(0, &PayloadCRC32, sizeof(TTcpPacketHeader_v1) - sizeof(HeaderCRC32));
+ return actual == HeaderCRC32;
+ }
+
+ inline void Sign() {
+ HeaderCRC32 = Crc32cExtendMSanCompatible(0, &PayloadCRC32, sizeof(TTcpPacketHeader_v1) - sizeof(HeaderCRC32));
+ }
+
+ TString ToString() const {
+ return Sprintf("{Confirm# %" PRIu64 " Serial# %" PRIu64 " DataSize# %" PRIu64 "}", Confirm, Serial, DataSize);
+ }
+};
+
+#pragma pack(push, 1)
+struct TTcpPacketHeader_v2 {
+ ui64 Confirm;
+ ui64 Serial;
+ ui32 Checksum; // for the whole frame
+ ui16 PayloadLength;
+};
+#pragma pack(pop)
+
+union TTcpPacketBuf {
+ static constexpr ui64 PingRequestMask = 0x8000000000000000ULL;
+ static constexpr ui64 PingResponseMask = 0x4000000000000000ULL;
+ static constexpr ui64 ClockMask = 0x2000000000000000ULL;
+
+ static constexpr size_t PacketDataLen = 4096 * 2 - 96 - Max(sizeof(TTcpPacketHeader_v1), sizeof(TTcpPacketHeader_v2));
+ struct {
+ TTcpPacketHeader_v1 Header;
+ char Data[PacketDataLen];
+ } v1;
+ struct {
+ TTcpPacketHeader_v2 Header;
+ char Data[PacketDataLen];
+ } v2;
+};
+
+#pragma pack(push, 1)
+struct TEventDescr {
+ ui32 Type;
+ ui32 Flags;
TActorId Recipient;
TActorId Sender;
- ui64 Cookie;
- // wilson trace id is stored as a serialized entity to avoid using complex object with prohibited copy ctor
- NWilson::TTraceId::TSerializedTraceId TraceId;
- ui32 Checksum;
-};
-#pragma pack(pop)
-
-struct TEventHolder : TNonCopyable {
- TEventDescr Descr;
+ ui64 Cookie;
+ // wilson trace id is stored as a serialized entity to avoid using complex object with prohibited copy ctor
+ NWilson::TTraceId::TSerializedTraceId TraceId;
+ ui32 Checksum;
+};
+#pragma pack(pop)
+
+struct TEventHolder : TNonCopyable {
+ TEventDescr Descr;
TActorId ForwardRecipient;
- THolder<IEventBase> Event;
- TIntrusivePtr<TEventSerializedData> Buffer;
- ui64 Serial;
- ui32 EventSerializedSize;
- ui32 EventActuallySerialized;
+ THolder<IEventBase> Event;
+ TIntrusivePtr<TEventSerializedData> Buffer;
+ ui64 Serial;
+ ui32 EventSerializedSize;
+ ui32 EventActuallySerialized;
mutable NLWTrace::TOrbit Orbit;
-
- ui32 Fill(IEventHandle& ev);
-
- void InitChecksum() {
- Descr.Checksum = 0;
- }
-
- void UpdateChecksum(const TSessionParams& params, const void *buffer, size_t len) {
- if (FORCE_EVENT_CHECKSUM || !params.UseModernFrame) {
- Descr.Checksum = Crc32cExtendMSanCompatible(Descr.Checksum, buffer, len);
- }
- }
-
- void ForwardOnNondelivery(bool unsure) {
- TEventDescr& d = Descr;
- const TActorId& r = d.Recipient;
- const TActorId& s = d.Sender;
- const TActorId *f = ForwardRecipient ? &ForwardRecipient : nullptr;
- auto ev = Event
- ? std::make_unique<IEventHandle>(r, s, Event.Release(), d.Flags, d.Cookie, f, NWilson::TTraceId(d.TraceId))
- : std::make_unique<IEventHandle>(d.Type, d.Flags, r, s, std::move(Buffer), d.Cookie, f, NWilson::TTraceId(d.TraceId));
- NActors::TActivationContext::Send(ev->ForwardOnNondelivery(NActors::TEvents::TEvUndelivered::Disconnected, unsure));
- }
-
- void Clear() {
- Event.Reset();
- Buffer.Reset();
+
+ ui32 Fill(IEventHandle& ev);
+
+ void InitChecksum() {
+ Descr.Checksum = 0;
+ }
+
+ void UpdateChecksum(const TSessionParams& params, const void *buffer, size_t len) {
+ if (FORCE_EVENT_CHECKSUM || !params.UseModernFrame) {
+ Descr.Checksum = Crc32cExtendMSanCompatible(Descr.Checksum, buffer, len);
+ }
+ }
+
+ void ForwardOnNondelivery(bool unsure) {
+ TEventDescr& d = Descr;
+ const TActorId& r = d.Recipient;
+ const TActorId& s = d.Sender;
+ const TActorId *f = ForwardRecipient ? &ForwardRecipient : nullptr;
+ auto ev = Event
+ ? std::make_unique<IEventHandle>(r, s, Event.Release(), d.Flags, d.Cookie, f, NWilson::TTraceId(d.TraceId))
+ : std::make_unique<IEventHandle>(d.Type, d.Flags, r, s, std::move(Buffer), d.Cookie, f, NWilson::TTraceId(d.TraceId));
+ NActors::TActivationContext::Send(ev->ForwardOnNondelivery(NActors::TEvents::TEvUndelivered::Disconnected, unsure));
+ }
+
+ void Clear() {
+ Event.Reset();
+ Buffer.Reset();
Orbit.Reset();
- }
-};
-
-namespace NActors {
- class TEventOutputChannel;
+ }
+};
+
+namespace NActors {
+ class TEventOutputChannel;
}
-
-struct TTcpPacketOutTask : TNonCopyable {
- const TSessionParams& Params;
- TTcpPacketBuf Packet;
- size_t DataSize;
- TStackVec<TConstIoVec, 32> Bufs;
- size_t BufferIndex;
- size_t FirstBufferOffset;
- bool TriedWriting;
- char *FreeArea;
- char *End;
+
+struct TTcpPacketOutTask : TNonCopyable {
+ const TSessionParams& Params;
+ TTcpPacketBuf Packet;
+ size_t DataSize;
+ TStackVec<TConstIoVec, 32> Bufs;
+ size_t BufferIndex;
+ size_t FirstBufferOffset;
+ bool TriedWriting;
+ char *FreeArea;
+ char *End;
mutable NLWTrace::TOrbit Orbit;
-
-public:
- TTcpPacketOutTask(const TSessionParams& params)
- : Params(params)
- {
- Reuse();
- }
-
- template<typename T>
- auto ApplyToHeader(T&& callback) {
- return Params.UseModernFrame ? callback(Packet.v2.Header) : callback(Packet.v1.Header);
- }
-
- template<typename T>
- auto ApplyToHeader(T&& callback) const {
- return Params.UseModernFrame ? callback(Packet.v2.Header) : callback(Packet.v1.Header);
- }
-
- bool IsAtBegin() const {
- return !BufferIndex && !FirstBufferOffset && !TriedWriting;
- }
-
- void MarkTriedWriting() {
- TriedWriting = true;
- }
-
- void Reuse() {
- DataSize = 0;
- ApplyToHeader([this](auto& header) { Bufs.assign(1, {&header, sizeof(header)}); });
- BufferIndex = 0;
- FirstBufferOffset = 0;
- TriedWriting = false;
- FreeArea = Params.UseModernFrame ? Packet.v2.Data : Packet.v1.Data;
- End = FreeArea + TTcpPacketBuf::PacketDataLen;
+
+public:
+ TTcpPacketOutTask(const TSessionParams& params)
+ : Params(params)
+ {
+ Reuse();
+ }
+
+ template<typename T>
+ auto ApplyToHeader(T&& callback) {
+ return Params.UseModernFrame ? callback(Packet.v2.Header) : callback(Packet.v1.Header);
+ }
+
+ template<typename T>
+ auto ApplyToHeader(T&& callback) const {
+ return Params.UseModernFrame ? callback(Packet.v2.Header) : callback(Packet.v1.Header);
+ }
+
+ bool IsAtBegin() const {
+ return !BufferIndex && !FirstBufferOffset && !TriedWriting;
+ }
+
+ void MarkTriedWriting() {
+ TriedWriting = true;
+ }
+
+ void Reuse() {
+ DataSize = 0;
+ ApplyToHeader([this](auto& header) { Bufs.assign(1, {&header, sizeof(header)}); });
+ BufferIndex = 0;
+ FirstBufferOffset = 0;
+ TriedWriting = false;
+ FreeArea = Params.UseModernFrame ? Packet.v2.Data : Packet.v1.Data;
+ End = FreeArea + TTcpPacketBuf::PacketDataLen;
Orbit.Reset();
- }
-
+ }
+
bool IsEmpty() const {
- return !DataSize;
- }
-
- void SetMetadata(ui64 serial, ui64 confirm) {
- ApplyToHeader([&](auto& header) {
- header.Serial = serial;
- header.Confirm = confirm;
- });
- }
-
- void UpdateConfirmIfPossible(ui64 confirm) {
- // we don't want to recalculate whole packet checksum for single confirmation update on v2
- if (!Params.UseModernFrame && IsAtBegin() && confirm != Packet.v1.Header.Confirm) {
- Packet.v1.Header.Confirm = confirm;
- Packet.v1.Header.Sign();
- }
- }
-
- size_t GetDataSize() const { return DataSize; }
-
- ui64 GetSerial() const {
- return ApplyToHeader([](auto& header) { return header.Serial; });
- }
-
- bool Confirmed(ui64 confirm) const {
- return ApplyToHeader([&](auto& header) { return IsEmpty() || header.Serial <= confirm; });
- }
-
- void *GetFreeArea() {
- return FreeArea;
- }
-
- size_t GetVirtualFreeAmount() const {
- return TTcpPacketBuf::PacketDataLen - DataSize;
+ return !DataSize;
}
-
- void AppendBuf(const void *buf, size_t size) {
- DataSize += size;
- Y_VERIFY_DEBUG(DataSize <= TTcpPacketBuf::PacketDataLen, "DataSize# %zu AppendBuf buf# %p size# %zu"
- " FreeArea# %p End# %p", DataSize, buf, size, FreeArea, End);
-
- if (Bufs && static_cast<const char*>(Bufs.back().Data) + Bufs.back().Size == buf) {
- Bufs.back().Size += size;
- } else {
- Bufs.push_back({buf, size});
- }
-
- if (buf >= FreeArea && buf < End) {
- Y_VERIFY_DEBUG(buf == FreeArea);
- FreeArea = const_cast<char*>(static_cast<const char*>(buf)) + size;
- Y_VERIFY_DEBUG(FreeArea <= End);
- }
- }
-
- void Undo(size_t size) {
- Y_VERIFY(Bufs);
- auto& buf = Bufs.back();
- Y_VERIFY(buf.Data == FreeArea - buf.Size);
- buf.Size -= size;
- if (!buf.Size) {
- Bufs.pop_back();
- }
- FreeArea -= size;
- DataSize -= size;
- }
-
- bool DropBufs(size_t& amount) {
+
+ void SetMetadata(ui64 serial, ui64 confirm) {
+ ApplyToHeader([&](auto& header) {
+ header.Serial = serial;
+ header.Confirm = confirm;
+ });
+ }
+
+ void UpdateConfirmIfPossible(ui64 confirm) {
+ // we don't want to recalculate whole packet checksum for single confirmation update on v2
+ if (!Params.UseModernFrame && IsAtBegin() && confirm != Packet.v1.Header.Confirm) {
+ Packet.v1.Header.Confirm = confirm;
+ Packet.v1.Header.Sign();
+ }
+ }
+
+ size_t GetDataSize() const { return DataSize; }
+
+ ui64 GetSerial() const {
+ return ApplyToHeader([](auto& header) { return header.Serial; });
+ }
+
+ bool Confirmed(ui64 confirm) const {
+ return ApplyToHeader([&](auto& header) { return IsEmpty() || header.Serial <= confirm; });
+ }
+
+ void *GetFreeArea() {
+ return FreeArea;
+ }
+
+ size_t GetVirtualFreeAmount() const {
+ return TTcpPacketBuf::PacketDataLen - DataSize;
+ }
+
+ void AppendBuf(const void *buf, size_t size) {
+ DataSize += size;
+ Y_VERIFY_DEBUG(DataSize <= TTcpPacketBuf::PacketDataLen, "DataSize# %zu AppendBuf buf# %p size# %zu"
+ " FreeArea# %p End# %p", DataSize, buf, size, FreeArea, End);
+
+ if (Bufs && static_cast<const char*>(Bufs.back().Data) + Bufs.back().Size == buf) {
+ Bufs.back().Size += size;
+ } else {
+ Bufs.push_back({buf, size});
+ }
+
+ if (buf >= FreeArea && buf < End) {
+ Y_VERIFY_DEBUG(buf == FreeArea);
+ FreeArea = const_cast<char*>(static_cast<const char*>(buf)) + size;
+ Y_VERIFY_DEBUG(FreeArea <= End);
+ }
+ }
+
+ void Undo(size_t size) {
+ Y_VERIFY(Bufs);
+ auto& buf = Bufs.back();
+ Y_VERIFY(buf.Data == FreeArea - buf.Size);
+ buf.Size -= size;
+ if (!buf.Size) {
+ Bufs.pop_back();
+ }
+ FreeArea -= size;
+ DataSize -= size;
+ }
+
+ bool DropBufs(size_t& amount) {
while (BufferIndex != Bufs.size()) {
TConstIoVec& item = Bufs[BufferIndex];
- // calculate number of bytes to the end in current buffer
- const size_t remain = item.Size - FirstBufferOffset;
- if (amount >= remain) {
- // vector item completely fits into the received amount, drop it out and switch to next buffer
- amount -= remain;
- ++BufferIndex;
- FirstBufferOffset = 0;
- } else {
- // adjust first buffer by "amount" bytes forward and reset amount to zero
- FirstBufferOffset += amount;
- amount = 0;
- // return false meaning that we have some more data to send
- return false;
- }
- }
- return true;
- }
-
- void ResetBufs() {
- BufferIndex = FirstBufferOffset = 0;
- TriedWriting = false;
- }
-
+ // calculate number of bytes to the end in current buffer
+ const size_t remain = item.Size - FirstBufferOffset;
+ if (amount >= remain) {
+ // vector item completely fits into the received amount, drop it out and switch to next buffer
+ amount -= remain;
+ ++BufferIndex;
+ FirstBufferOffset = 0;
+ } else {
+ // adjust first buffer by "amount" bytes forward and reset amount to zero
+ FirstBufferOffset += amount;
+ amount = 0;
+ // return false meaning that we have some more data to send
+ return false;
+ }
+ }
+ return true;
+ }
+
+ void ResetBufs() {
+ BufferIndex = FirstBufferOffset = 0;
+ TriedWriting = false;
+ }
+
template <typename TVectorType>
void AppendToIoVector(TVectorType& vector, size_t max) {
- for (size_t k = BufferIndex, offset = FirstBufferOffset; k != Bufs.size() && vector.size() < max; ++k, offset = 0) {
- TConstIoVec v = Bufs[k];
+ for (size_t k = BufferIndex, offset = FirstBufferOffset; k != Bufs.size() && vector.size() < max; ++k, offset = 0) {
+ TConstIoVec v = Bufs[k];
v.Data = static_cast<const char*>(v.Data) + offset;
- v.Size -= offset;
- vector.push_back(v);
- }
- }
-
- void Sign() {
- if (Params.UseModernFrame) {
- Packet.v2.Header.Checksum = 0;
- Packet.v2.Header.PayloadLength = DataSize;
- if (!Params.Encryption) {
- ui32 sum = 0;
- for (const auto& item : Bufs) {
- sum = Crc32cExtendMSanCompatible(sum, item.Data, item.Size);
- }
- Packet.v2.Header.Checksum = sum;
- }
- } else {
- Y_VERIFY(!Bufs.empty());
- auto it = Bufs.begin();
- static constexpr size_t headerLen = sizeof(TTcpPacketHeader_v1);
- Y_VERIFY(it->Data == &Packet.v1.Header && it->Size >= headerLen);
- ui32 sum = Crc32cExtendMSanCompatible(0, Packet.v1.Data, it->Size - headerLen);
- while (++it != Bufs.end()) {
- sum = Crc32cExtendMSanCompatible(sum, it->Data, it->Size);
- }
-
- Packet.v1.Header.PayloadCRC32 = sum;
- Packet.v1.Header.DataSize = DataSize;
- Packet.v1.Header.Sign();
- }
- }
-};
+ v.Size -= offset;
+ vector.push_back(v);
+ }
+ }
+
+ void Sign() {
+ if (Params.UseModernFrame) {
+ Packet.v2.Header.Checksum = 0;
+ Packet.v2.Header.PayloadLength = DataSize;
+ if (!Params.Encryption) {
+ ui32 sum = 0;
+ for (const auto& item : Bufs) {
+ sum = Crc32cExtendMSanCompatible(sum, item.Data, item.Size);
+ }
+ Packet.v2.Header.Checksum = sum;
+ }
+ } else {
+ Y_VERIFY(!Bufs.empty());
+ auto it = Bufs.begin();
+ static constexpr size_t headerLen = sizeof(TTcpPacketHeader_v1);
+ Y_VERIFY(it->Data == &Packet.v1.Header && it->Size >= headerLen);
+ ui32 sum = Crc32cExtendMSanCompatible(0, Packet.v1.Data, it->Size - headerLen);
+ while (++it != Bufs.end()) {
+ sum = Crc32cExtendMSanCompatible(sum, it->Data, it->Size);
+ }
+
+ Packet.v1.Header.PayloadCRC32 = sum;
+ Packet.v1.Header.DataSize = DataSize;
+ Packet.v1.Header.Sign();
+ }
+ }
+};