diff options
author | alexvru <alexvru@ydb.tech> | 2023-04-19 13:31:50 +0300 |
---|---|---|
committer | alexvru <alexvru@ydb.tech> | 2023-04-19 13:31:50 +0300 |
commit | 7653175c3b4bd262510d7dbfdf5dc30ceb777d75 (patch) | |
tree | d33ef2f5490f7f8dbab2e0e9ac8c3b13f26e8111 /library/cpp/actors/interconnect/outgoing_stream.h | |
parent | 9143f30e39d2d0d9d81048632bce5f25de587b3f (diff) | |
download | ydb-7653175c3b4bd262510d7dbfdf5dc30ceb777d75.tar.gz |
Rework interconnect sending stream machinery
Diffstat (limited to 'library/cpp/actors/interconnect/outgoing_stream.h')
-rw-r--r-- | library/cpp/actors/interconnect/outgoing_stream.h | 224 |
1 files changed, 224 insertions, 0 deletions
diff --git a/library/cpp/actors/interconnect/outgoing_stream.h b/library/cpp/actors/interconnect/outgoing_stream.h new file mode 100644 index 0000000000..0306295b52 --- /dev/null +++ b/library/cpp/actors/interconnect/outgoing_stream.h @@ -0,0 +1,224 @@ +#pragma once + +#include <library/cpp/actors/core/event_load.h> +#include <library/cpp/actors/util/rc_buf.h> +#include <library/cpp/containers/stack_vector/stack_vec.h> +#include <deque> + +namespace NInterconnect { + + template<size_t TotalSize> + class TOutgoingStreamT { + static constexpr size_t BufferSize = TotalSize - sizeof(ui32) * 2; + + struct TBuffer { + char Data[BufferSize]; + ui32 RefCount; + ui32 Index; + + struct TDeleter { + void operator ()(TBuffer *buffer) const { + free(buffer); + } + }; + }; + + static_assert(sizeof(TBuffer) == TotalSize); + + struct TSendChunk { + TContiguousSpan Span; + TBuffer *Buffer; + }; + + std::vector<std::unique_ptr<TBuffer, typename TBuffer::TDeleter>> Buffers; + TBuffer *AppendBuffer = nullptr; + size_t AppendOffset = BufferSize; // into the last buffer + std::deque<TSendChunk> SendQueue; + size_t SendQueuePos = 0; + size_t SendOffset = 0; + + public: + size_t CalculateOutgoingSize() const { + size_t res = 0; + for (const TSendChunk& chunk : SendQueue) { + res += chunk.Span.size(); + } + return res; + } + + size_t CalculateUnsentSize() const { + size_t res = 0; + for (auto it = SendQueue.begin() + SendQueuePos; it != SendQueue.end(); ++it) { + res += it->Span.size(); + } + return res - SendOffset; + } + + TMutableContiguousSpan AcquireSpanForWriting(size_t maxLen) { + if (AppendOffset == BufferSize) { // we have no free buffer, allocate one + Buffers.emplace_back(static_cast<TBuffer*>(malloc(sizeof(TBuffer)))); + AppendBuffer = Buffers.back().get(); + Y_VERIFY(AppendBuffer); + AppendBuffer->RefCount = 1; // through AppendBuffer pointer + AppendBuffer->Index = Buffers.size() - 1; + AppendOffset = 0; + } + return {AppendBuffer->Data + AppendOffset, Min(maxLen, BufferSize - AppendOffset)}; + } + + void Append(TContiguousSpan span) { + TBuffer *buffer = nullptr; + if (AppendBuffer && span.data() == AppendBuffer->Data + AppendOffset) { // the only valid case to use previously acquired span + buffer = AppendBuffer; + AppendOffset += span.size(); + Y_VERIFY_DEBUG(AppendOffset <= BufferSize); + if (AppendOffset != BufferSize) { + ++buffer->RefCount; + } + } else { +#ifndef NDEBUG + // ensure this span does not point into any existing buffer part + const char *begin = span.data(); + const char *end = span.data() + span.size(); + for (const auto& buffer : Buffers) { + const char *bufferBegin = buffer->Data; + const char *bufferEnd = bufferBegin + BufferSize; + if (bufferBegin < end && begin < bufferEnd) { + Y_FAIL(); + } + } +#endif + } + + if (!SendQueue.empty()) { + auto& back = SendQueue.back(); + if (back.Span.data() + back.Span.size() == span.data()) { // check if it is possible just to extend the last span + if (SendQueuePos == SendQueue.size()) { + --SendQueuePos; + SendOffset = back.Span.size(); + } + back.Span = {back.Span.data(), back.Span.size() + span.size()}; + DropBufferReference(buffer); + return; + } + } + + if (buffer) { + ++buffer->RefCount; + } + SendQueue.push_back(TSendChunk{span, buffer}); + DropBufferReference(buffer); + } + + void Write(TContiguousSpan in) { + while (in.size()) { + auto outChunk = AcquireSpanForWriting(in.size()); + Append(outChunk); + memcpy(outChunk.data(), in.data(), outChunk.size()); + in = in.SubSpan(outChunk.size(), Max<size_t>()); + } + } + + using TBookmark = TStackVec<TMutableContiguousSpan, 2>; + + TBookmark Bookmark(size_t len) { + TBookmark bookmark; + + while (len) { + const auto span = AcquireSpanForWriting(len); + Append(span); + bookmark.push_back(span); + len -= span.size(); + } + + return bookmark; + } + + void WriteBookmark(TBookmark&& bookmark, TContiguousSpan in) { + for (auto& outChunk : bookmark) { + Y_VERIFY(outChunk.size() <= in.size()); + memcpy(outChunk.data(), in.data(), outChunk.size()); + in = in.SubSpan(outChunk.size(), Max<size_t>()); + } + } + + void Rewind() { + SendQueuePos = 0; + SendOffset = 0; + } + + template<typename T> + void ProduceIoVec(T& container, size_t maxItems) { + size_t offset = SendOffset; + for (auto it = SendQueue.begin() + SendQueuePos; it != SendQueue.end() && std::size(container) < maxItems; ++it) { + const TContiguousSpan span = it->Span.SubSpan(offset, Max<size_t>()); + container.push_back(NActors::TConstIoVec{span.data(), span.size()}); + offset = 0; + } + } + + void Advance(size_t numBytes) { // called when numBytes portion of data has been sent + Y_VERIFY_DEBUG(numBytes == 0 || SendQueuePos != SendQueue.size()); + SendOffset += numBytes; + for (auto it = SendQueue.begin() + SendQueuePos; SendOffset && it->Span.size() <= SendOffset; ++SendQueuePos, ++it) { + SendOffset -= it->Span.size(); + Y_VERIFY_DEBUG(SendOffset == 0 || SendQueuePos != SendQueue.size() - 1); + } + } + + void DropFront(size_t numBytes) { // drops first numBytes from the queue, freeing buffers when necessary + while (numBytes) { + Y_VERIFY_DEBUG(!SendQueue.empty()); + auto& front = SendQueue.front(); + if (numBytes < front.Span.size()) { + front.Span = front.Span.SubSpan(numBytes, Max<size_t>()); + if (SendQueuePos == 0) { + Y_VERIFY_DEBUG(numBytes <= SendOffset); + SendOffset -= numBytes; + } + break; + } else { + numBytes -= front.Span.size(); + } + Y_VERIFY_DEBUG(!front.Buffer || (front.Span.data() >= front.Buffer->Data && + front.Span.data() + front.Span.size() <= front.Buffer->Data + BufferSize)); + DropBufferReference(front.Buffer); + SendQueue.pop_front(); + if (SendQueuePos) { + --SendQueuePos; + } else { + SendOffset = 0; + } + } + } + + template<typename T> + void ScanLastBytes(size_t numBytes, T&& callback) const { + auto it = SendQueue.end(); + ssize_t offset = -numBytes; + while (offset < 0) { + Y_VERIFY_DEBUG(it != SendQueue.begin()); + const TSendChunk& chunk = *--it; + offset += chunk.Span.size(); + } + for (; it != SendQueue.end(); ++it, offset = 0) { + callback(it->Span.SubSpan(offset, Max<size_t>())); + } + } + + private: + void DropBufferReference(TBuffer *buffer) { + if (buffer && !--buffer->RefCount) { + const size_t index = buffer->Index; + auto& cell = Buffers[index]; + Y_VERIFY_DEBUG(cell.get() == buffer); + std::swap(cell, Buffers.back()); + cell->Index = index; + Buffers.pop_back(); + } + } + }; + + using TOutgoingStream = TOutgoingStreamT<262144>; + +} // NInterconnect |