summaryrefslogtreecommitdiffstats
path: root/library/cpp/unified_agent_client/ut/grpc_io_ut.cpp
blob: 4a97395b3df01f9f5b78cf19f7ed72717637c9d7 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
#include <library/cpp/unified_agent_client/grpc_io.h>

#include <library/cpp/testing/gtest/gtest.h>

#include <contrib/libs/grpc/include/grpcpp/grpcpp.h>
#include <contrib/libs/grpc/include/grpc/grpc.h>

using namespace NUnifiedAgent;

namespace {
    // Minimal IIOCallback used by the tests below: it only records what was
    // done to it so the test body can inspect the outcome afterwards.
    struct TDummyCallback : IIOCallback {
        // Counts the reference and hands back this same object.
        IIOCallback* Ref() override {
            Refs.fetch_add(1);
            return this;
        }

        // Remembers that a completion was delivered; the status itself is ignored.
        void OnIOCompleted(EIOStatus /*status*/) override {
            Completed.store(true);
        }

        // Set to true once OnIOCompleted() has been called at least once.
        std::atomic<bool> Completed{false};
        // Number of Ref() calls observed (tests may also assign it directly).
        std::atomic<int> Refs{0};
    };
}

class TGrpcTimerTest : public ::testing::Test {
protected:
    void SetUp() override {
        EnsureGrpcConfigured();
        grpc_init();
    }

    void TearDown() override {
        grpc_shutdown();
    }
};

// Drives a TGrpcTimer through Set() followed by Cancel() while the test body
// itself acts as the completion-queue poller and dispatches every event by hand.
//
// Sequence of completion-queue events the test expects (each one is awaited
// for at most one second):
//   1. an event after Set()     - dispatching it must NOT complete the user callback;
//   2. an event after Cancel()  - dispatched with EIOStatus::Ok;
//   3. the alarm completion     - dispatched with Ok/Error according to `ok`;
//                                 afterwards the user callback must be completed.
// Finally the joiner is waited on.
TEST_F(TGrpcTimerTest, TestSetCancelFromExternalThread) {
    grpc::CompletionQueue cq;
    TAsyncJoiner joiner;
    auto cb = MakeHolder<TDummyCallback>();
    auto* cbPtr = cb.Get(); // Non-owning view; ownership of cb moves into the timer below.
    cbPtr->Refs = 1; // Base ref

    TGrpcTimer timer(cq, std::move(cb), joiner);

    // Single definition of "wait up to one second for the next CQ event",
    // shared by all three polling steps below.
    const auto pollCq = [&cq](void** eventTag, bool* eventOk) {
        const gpr_timespec deadline =
            gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), gpr_time_from_seconds(1, GPR_TIMESPAN));
        return cq.AsyncNext(eventTag, eventOk, deadline);
    };

    timer.Set(TInstant::Now() + TDuration::Seconds(10));

    // Out-parameters of AsyncNext; initialized so they never hold indeterminate values.
    void* tag = nullptr;
    bool ok = false;

    // Step 1: pull the event that follows Set().
    // FAIL() is a fatal failure and returns from the test body, so the
    // TIMEOUT/SHUTDOWN cases below never fall through.
    switch (pollCq(&tag, &ok)) {
        case grpc::CompletionQueue::GOT_EVENT: {
            EXPECT_TRUE(ok);
            auto* iocb = static_cast<IIOCallback*>(tag);
            iocb->OnIOCompleted(EIOStatus::Ok);
            break;
        }
        case grpc::CompletionQueue::TIMEOUT:
            FAIL() << "Timeout waiting for CQ event";
        case grpc::CompletionQueue::SHUTDOWN:
            FAIL() << "CQ is shutdown";
    }

    // The timer deadline is 10 seconds away, so the user callback must not have fired yet.
    EXPECT_FALSE(cbPtr->Completed.load());

    // Original author's note: at this point the alarm is set, the posted lambda is
    // destroyed, and the joiner should have had its ref decremented.
    // NOTE(review): those internals live in TGrpcTimer and are not visible here.

    // Cancel the alarm from outside of any CQ callback.
    timer.Cancel();

    // Step 2: pull the message posted by Cancel().
    switch (pollCq(&tag, &ok)) {
        case grpc::CompletionQueue::GOT_EVENT: {
            EXPECT_TRUE(ok);
            auto* iocb = static_cast<IIOCallback*>(tag);
            // Per the original note this executes ApplyCancel + UnRef inside the timer.
            iocb->OnIOCompleted(EIOStatus::Ok);
            break;
        }
        default:
            FAIL() << "Expected event in CQ";
    }

    // Step 3: pull the actual grpc::Alarm completion produced by the cancellation.
    switch (pollCq(&tag, &ok)) {
        case grpc::CompletionQueue::GOT_EVENT: {
            // ok might be false because the alarm was canceled
            auto* iocb = static_cast<IIOCallback*>(tag);
            iocb->OnIOCompleted(ok ? EIOStatus::Ok : EIOStatus::Error);
            break;
        }
        default:
            FAIL() << "Expected alarm completion event in CQ";
    }

    // The alarm completion must have been forwarded to the user callback.
    EXPECT_TRUE(cbPtr->Completed.load());

    joiner.Join().Wait();
}