aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/actors/interconnect/event_holder_pool.h
blob: 0afa1d7a7ce990cb77800bc06d8547570ee4324c (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
#pragma once

#include <library/cpp/containers/stack_vector/stack_vec.h>

#include "packet.h"

namespace NActors {
    struct TEvFreeItems : TEventLocal<TEvFreeItems, EventSpaceBegin(TEvents::ES_PRIVATE)> {
        static constexpr size_t MaxEvents = 256;

        std::list<TEventHolder> FreeQueue;
        TStackVec<THolder<IEventBase>, MaxEvents> Events;
        TStackVec<THolder<TEventSerializedData>, MaxEvents> Buffers;
        std::shared_ptr<std::atomic<TAtomicBase>> Counter;
        ui64 NumBytes = 0;

        ~TEvFreeItems() {
            if (Counter) {
                TAtomicBase res = Counter->fetch_sub(NumBytes) - NumBytes;
                Y_ABORT_UNLESS(res >= 0);
            }
        }

        bool GetInLineForDestruction(const TIntrusivePtr<TInterconnectProxyCommon>& common) {
            Y_ABORT_UNLESS(!Counter);
            const auto& counter = common->DestructorQueueSize;
            const auto& max = common->MaxDestructorQueueSize;
            if (counter && (TAtomicBase)(counter->fetch_add(NumBytes) + NumBytes) > max) {
                counter->fetch_sub(NumBytes);
                return false;
            }
            Counter = counter;
            return true;
        }
    };

    class TEventHolderPool {
        using TDestroyCallback = std::function<void(THolder<IEventBase>)>;

        static constexpr size_t MaxFreeQueueItems = 32;
        static constexpr size_t FreeQueueTrimThreshold = MaxFreeQueueItems * 2;
        static constexpr ui64 MaxBytesPerMessage = 10 * 1024 * 1024;

        TIntrusivePtr<TInterconnectProxyCommon> Common;
        std::list<TEventHolder> Cache;
        THolder<TEvFreeItems> PendingFreeEvent;
        TDestroyCallback DestroyCallback;

    public:
        TEventHolderPool(TIntrusivePtr<TInterconnectProxyCommon> common,
                TDestroyCallback destroyCallback)
            : Common(std::move(common))
            , DestroyCallback(std::move(destroyCallback))
        {}

        TEventHolder& Allocate(std::list<TEventHolder>& queue) {
            if (Cache.empty()) {
                queue.emplace_back();
            } else {
                queue.splice(queue.end(), Cache, Cache.begin());
            }
            return queue.back();
        }

        void Release(std::list<TEventHolder>& queue) {
            for (auto it = queue.begin(); it != queue.end(); ) {
                Release(queue, it++);
            }
        }

        void Release(std::list<TEventHolder>& queue, std::list<TEventHolder>::iterator event) {
            bool trim = false;

            // release held event, if any
            if (THolder<IEventBase> ev = std::move(event->Event)) {
                auto p = GetPendingEvent();
                p->NumBytes += event->EventSerializedSize;
                auto& events = p->Events;
                events.push_back(std::move(ev));
                trim = trim || events.size() >= TEvFreeItems::MaxEvents || p->NumBytes >= MaxBytesPerMessage;
            }

            // release buffer, if any
            if (event->Buffer && event->Buffer.RefCount() == 1) {
                auto p = GetPendingEvent();
                p->NumBytes += event->EventSerializedSize;
                auto& buffers = p->Buffers;
                buffers.emplace_back(event->Buffer.Release());
                trim = trim || buffers.size() >= TEvFreeItems::MaxEvents || p->NumBytes >= MaxBytesPerMessage;
            }

            // free event and trim the cache if its size is exceeded
            event->Clear();
            Cache.splice(Cache.end(), queue, event);
            if (Cache.size() >= FreeQueueTrimThreshold) {
                auto& freeQueue = GetPendingEvent()->FreeQueue;
                auto it = Cache.begin();
                std::advance(it, Cache.size() - MaxFreeQueueItems);
                freeQueue.splice(freeQueue.end(), Cache, Cache.begin(), it);
                trim = true;
            }

            // release items if we have hit the limit
            if (trim) {
                Trim();
            }
        }

        void Trim() {
            if (auto ev = std::move(PendingFreeEvent); ev && ev->GetInLineForDestruction(Common)) {
                DestroyCallback(std::move(ev));
            }

            // ensure it is dropped
            PendingFreeEvent.Reset();
        }

    private:
        TEvFreeItems* GetPendingEvent() {
            if (!PendingFreeEvent) {
                PendingFreeEvent.Reset(new TEvFreeItems);
            }
            return PendingFreeEvent.Get();
        }
    };

}