summaryrefslogtreecommitdiffstats
path: root/contrib/libs/aws-sdk-cpp/aws-cpp-sdk-core/source/utils/stream/ConcurrentStreamBuf.cpp
blob: 1ef4ee67580d02e37cd34280d9e9662ae507d1af (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
/**
 * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
 * SPDX-License-Identifier: Apache-2.0.
 */
#include <aws/core/utils/stream/ConcurrentStreamBuf.h>
#include <aws/core/utils/logging/LogMacros.h>
#include <cstdint>
#include <cassert>

namespace Aws
{
    namespace Utils
    {
        namespace Stream
        {
            // Tag handed to the logging macros (see showmanyc()) to identify messages from this stream buffer.
            const char TAG[] = "ConcurrentStreamBuf";
            /**
             * Constructs the stream buffer with a put area of bufferLength bytes and reserves the
             * same capacity for the get area and for the back buffer that carries data from
             * FlushPutArea() to underflow().
             *
             * A bufferLength of zero is treated as one. The put area must hold at least one byte
             * because overflow() stores a character through pptr() right after flushing, and
             * FlushPutArea() can only make progress while the back buffer has spare capacity.
             *
             * @param bufferLength requested size, in bytes, of each internal buffer.
             */
            ConcurrentStreamBuf::ConcurrentStreamBuf(size_t bufferLength) :
                m_putArea(bufferLength != 0 ? bufferLength : 1), // sized (not merely reserved): setp() below hands this storage to the stream.
                m_eof(false)
            {
                // Derive every size from the put area so all three buffers agree even after clamping.
                const size_t capacity = m_putArea.size();
                m_getArea.reserve(capacity);
                m_backbuf.reserve(capacity);

                char* pbegin = reinterpret_cast<char*>(m_putArea.data());
                setp(pbegin, pbegin + capacity);
            }

            void ConcurrentStreamBuf::SetEof()
            {
                {
                    std::unique_lock<std::mutex> lock(m_lock);
                    m_eof = true;
                }
                m_signal.notify_all();
            }

            /**
             * Moves the bytes currently held in the put area into the back buffer, where
             * underflow() picks them up, and then resets the put area so it can be refilled.
             *
             * Does nothing when the put area is empty. Otherwise blocks until the back buffer has
             * enough spare capacity for all pending bytes, or until m_eof has been set. If m_eof is
             * set, the pending bytes are not transferred and the put area is left untouched.
             */
            void ConcurrentStreamBuf::FlushPutArea()
            {
                const size_t pendingBytes = static_cast<size_t>(pptr() - pbase());
                if (pendingBytes == 0)
                {
                    return;
                }

                // scope the lock so the mutex is released before the other side is notified.
                {
                    std::unique_lock<std::mutex> lock(m_lock);
                    m_signal.wait(lock, [this, pendingBytes]{ return m_eof || pendingBytes <= (m_backbuf.capacity() - m_backbuf.size()); });
                    if (m_eof)
                    {
                        return;
                    }
                    // One range insert instead of a per-byte push_back; the wait predicate above
                    // guarantees the spare capacity, so the back buffer does not reallocate.
                    m_backbuf.insert(m_backbuf.end(), pbase(), pptr());
                }
                m_signal.notify_one();

                char* pbegin = reinterpret_cast<char*>(m_putArea.data());
                setp(pbegin, pbegin + m_putArea.size());
            }

            /**
             * Relative seeking is not supported by this buffer.
             *
             * @return always the invalid position, i.e. a stream position built from offset -1.
             */
            std::streampos ConcurrentStreamBuf::seekoff(std::streamoff, std::ios_base::seekdir, std::ios_base::openmode)
            {
                const std::streamoff invalidOffset(-1);
                return std::streampos(invalidOffset);
            }

            /**
             * Absolute seeking is not supported by this buffer.
             *
             * @return always the invalid position, i.e. a stream position built from offset -1.
             */
            std::streampos ConcurrentStreamBuf::seekpos(std::streampos, std::ios_base::openmode)
            {
                const std::streamoff invalidOffset(-1);
                return std::streampos(invalidOffset);
            }

            /**
             * Refills the get area from the back buffer.
             *
             * Blocks until the back buffer contains data or m_eof has been set. Data already in
             * the back buffer is still delivered after m_eof is set; end-of-file is only reported
             * once the back buffer is empty as well.
             *
             * @return the first character of the refilled get area (the read position is not
             *         advanced), or std::char_traits<char>::eof() when m_eof is set and no data
             *         is left.
             */
            int ConcurrentStreamBuf::underflow()
            {
                {
                    std::unique_lock<std::mutex> lock(m_lock);
                    m_signal.wait(lock, [this]{ return !m_backbuf.empty() || m_eof; });

                    if (m_eof && m_backbuf.empty())
                    {
                        return std::char_traits<char>::eof();
                    }

                    // assign() replaces the previous contents in one step, which also keeps the
                    // get area from growing unbounded.
                    m_getArea.assign(m_backbuf.begin(), m_backbuf.end());
                    m_backbuf.clear();
                }
                // The back buffer is empty again: wake a writer that may be waiting for space.
                m_signal.notify_one();

                char* gbegin = reinterpret_cast<char*>(m_getArea.data());
                setg(gbegin, gbegin, gbegin + m_getArea.size());
                return std::char_traits<char>::to_int_type(*gptr());
            }

            std::streamsize ConcurrentStreamBuf::showmanyc()
            {
                std::unique_lock<std::mutex> lock(m_lock);
                if (!m_backbuf.empty())
                {
                    AWS_LOGSTREAM_TRACE(TAG, "Stream characters in buffer: " << m_backbuf.size());
                }
                return m_backbuf.size();
            }

            int ConcurrentStreamBuf::overflow(int ch)
            {
                const auto eof = std::char_traits<char>::eof();

                if (ch == eof)
                {
                    FlushPutArea();
                    return eof;
                }

                FlushPutArea();
                {
                    std::unique_lock<std::mutex> lock(m_lock);
                    if (m_eof)
                    {
                        return eof;
                    }
                    *pptr() = static_cast<char>(ch);
                    pbump(1);
                    return ch;
                }
            }

            /**
             * Flushes the put area via FlushPutArea().
             *
             * @return always 0; the outcome of the flush is not reflected in the return value.
             */
            int ConcurrentStreamBuf::sync()
            {
                FlushPutArea();
                const int syncResult = 0;
                return syncResult;
            }
        }
    }
}