1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
|
/**
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0.
*/
#include <aws/core/utils/stream/ConcurrentStreamBuf.h>
#include <aws/core/utils/logging/LogMacros.h>
#include <cstdint>
#include <cassert>
namespace Aws
{
namespace Utils
{
namespace Stream
{
// Tag handed to the logging macros so messages from this file can be identified.
static const char TAG[] = "ConcurrentStreamBuf";
/**
 * Builds a stream buffer whose put area holds bufferLength characters.
 *
 * The get area and the back buffer start out empty but have bufferLength
 * elements of storage reserved up front. The stream starts in the non-EOF state.
 *
 * @param bufferLength number of characters in the put area, and the amount of
 *                     storage reserved for the get area and the back buffer.
 */
ConcurrentStreamBuf::ConcurrentStreamBuf(size_t bufferLength) :
    // The put area is sized (not merely reserved) because setp() below hands
    // out raw pointers into it that are later written through.
    m_putArea(bufferLength),
    m_eof(false)
{
    m_backbuf.reserve(bufferLength);
    m_getArea.reserve(bufferLength);

    // Expose the whole put area to the streambuf as writable space.
    char* const putBegin = reinterpret_cast<char*>(m_putArea.data());
    char* const putEnd = putBegin + bufferLength;
    setp(putBegin, putEnd);
}
void ConcurrentStreamBuf::SetEof()
{
{
std::unique_lock<std::mutex> lock(m_lock);
m_eof = true;
}
m_signal.notify_all();
}
void ConcurrentStreamBuf::FlushPutArea()
{
const size_t bitslen = pptr() - pbase();
if (bitslen)
{
// scope the lock
{
std::unique_lock<std::mutex> lock(m_lock);
m_signal.wait(lock, [this, bitslen]{ return m_eof || bitslen <= (m_backbuf.capacity() - m_backbuf.size()); });
if (m_eof)
{
return;
}
std::copy(pbase(), pptr(), std::back_inserter(m_backbuf));
}
m_signal.notify_one();
char* pbegin = reinterpret_cast<char*>(&m_putArea[0]);
setp(pbegin, pbegin + m_putArea.size());
}
}
/**
 * Relative seeking is not supported by this buffer.
 *
 * @return always a stream position built from the offset -1; all arguments are ignored.
 */
std::streampos ConcurrentStreamBuf::seekoff(std::streamoff, std::ios_base::seekdir, std::ios_base::openmode)
{
    return std::streampos(std::streamoff(-1)); // Seeking is not supported.
}
/**
 * Absolute seeking is not supported by this buffer.
 *
 * @return always a stream position built from the offset -1; all arguments are ignored.
 */
std::streampos ConcurrentStreamBuf::seekpos(std::streampos, std::ios_base::openmode)
{
    return std::streampos(std::streamoff(-1)); // Seeking is not supported.
}
int ConcurrentStreamBuf::underflow()
{
{
std::unique_lock<std::mutex> lock(m_lock);
m_signal.wait(lock, [this]{ return m_backbuf.empty() == false || m_eof; });
if (m_eof && m_backbuf.empty())
{
return std::char_traits<char>::eof();
}
m_getArea.clear(); // keep the get-area from growing unbounded.
std::copy(m_backbuf.begin(), m_backbuf.end(), std::back_inserter(m_getArea));
m_backbuf.clear();
}
m_signal.notify_one();
char* gbegin = reinterpret_cast<char*>(&m_getArea[0]);
setg(gbegin, gbegin, gbegin + m_getArea.size());
return std::char_traits<char>::to_int_type(*gptr());
}
std::streamsize ConcurrentStreamBuf::showmanyc()
{
std::unique_lock<std::mutex> lock(m_lock);
if (!m_backbuf.empty())
{
AWS_LOGSTREAM_TRACE(TAG, "Stream characters in buffer: " << m_backbuf.size());
}
return m_backbuf.size();
}
int ConcurrentStreamBuf::overflow(int ch)
{
const auto eof = std::char_traits<char>::eof();
if (ch == eof)
{
FlushPutArea();
return eof;
}
FlushPutArea();
{
std::unique_lock<std::mutex> lock(m_lock);
if (m_eof)
{
return eof;
}
*pptr() = static_cast<char>(ch);
pbump(1);
return ch;
}
}
/**
 * Flushes the put area via FlushPutArea().
 *
 * @return always 0; the outcome of the flush is not reflected in the result.
 */
int ConcurrentStreamBuf::sync()
{
    const int success = 0;
    FlushPutArea();
    return success;
}
}
}
}
|