aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/libs/aws-sdk-cpp/aws-cpp-sdk-core/source/utils/stream
diff options
context:
space:
mode:
authorDevtools Arcadia <arcadia-devtools@yandex-team.ru>2022-02-07 18:08:42 +0300
committerDevtools Arcadia <arcadia-devtools@mous.vla.yp-c.yandex.net>2022-02-07 18:08:42 +0300
commit1110808a9d39d4b808aef724c861a2e1a38d2a69 (patch)
treee26c9fed0de5d9873cce7e00bc214573dc2195b7 /contrib/libs/aws-sdk-cpp/aws-cpp-sdk-core/source/utils/stream
downloadydb-1110808a9d39d4b808aef724c861a2e1a38d2a69.tar.gz
intermediate changes
ref:cde9a383711a11544ce7e107a78147fb96cc4029
Diffstat (limited to 'contrib/libs/aws-sdk-cpp/aws-cpp-sdk-core/source/utils/stream')
-rw-r--r--contrib/libs/aws-sdk-cpp/aws-cpp-sdk-core/source/utils/stream/ConcurrentStreamBuf.cpp126
-rw-r--r--contrib/libs/aws-sdk-cpp/aws-cpp-sdk-core/source/utils/stream/PreallocatedStreamBuf.cpp75
-rw-r--r--contrib/libs/aws-sdk-cpp/aws-cpp-sdk-core/source/utils/stream/ResponseStream.cpp91
-rw-r--r--contrib/libs/aws-sdk-cpp/aws-cpp-sdk-core/source/utils/stream/SimpleStreamBuf.cpp239
4 files changed, 531 insertions, 0 deletions
diff --git a/contrib/libs/aws-sdk-cpp/aws-cpp-sdk-core/source/utils/stream/ConcurrentStreamBuf.cpp b/contrib/libs/aws-sdk-cpp/aws-cpp-sdk-core/source/utils/stream/ConcurrentStreamBuf.cpp
new file mode 100644
index 0000000000..3f59dbe96d
--- /dev/null
+++ b/contrib/libs/aws-sdk-cpp/aws-cpp-sdk-core/source/utils/stream/ConcurrentStreamBuf.cpp
@@ -0,0 +1,126 @@
+/**
+ * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
+ * SPDX-License-Identifier: Apache-2.0.
+ */
+#include <aws/core/utils/stream/ConcurrentStreamBuf.h>
+#include <aws/core/utils/logging/LogMacros.h>
+#include <cstdint>
+#include <cassert>
+
+namespace Aws
+{
+ namespace Utils
+ {
+ namespace Stream
+ {
+ const char TAG[] = "ConcurrentStreamBuf";
+ ConcurrentStreamBuf::ConcurrentStreamBuf(size_t bufferLength) :
+ m_putArea(bufferLength), // we access [0] of the put area below so we must initialize it.
+ m_eof(false)
+ {
+ m_getArea.reserve(bufferLength);
+ m_backbuf.reserve(bufferLength);
+
+ char* pbegin = reinterpret_cast<char*>(&m_putArea[0]);
+ setp(pbegin, pbegin + bufferLength);
+ }
+
+ void ConcurrentStreamBuf::SetEof()
+ {
+ {
+ std::unique_lock<std::mutex> lock(m_lock);
+ m_eof = true;
+ }
+ m_signal.notify_all();
+ }
+
+ void ConcurrentStreamBuf::FlushPutArea()
+ {
+ const size_t bitslen = pptr() - pbase();
+ if (bitslen)
+ {
+ // scope the lock
+ {
+ std::unique_lock<std::mutex> lock(m_lock);
+ m_signal.wait(lock, [this, bitslen]{ return m_eof || bitslen <= (m_backbuf.capacity() - m_backbuf.size()); });
+ if (m_eof)
+ {
+ return;
+ }
+ std::copy(pbase(), pptr(), std::back_inserter(m_backbuf));
+ }
+ m_signal.notify_one();
+ char* pbegin = reinterpret_cast<char*>(&m_putArea[0]);
+ setp(pbegin, pbegin + m_putArea.size());
+ }
+ }
+
+ std::streampos ConcurrentStreamBuf::seekoff(std::streamoff, std::ios_base::seekdir, std::ios_base::openmode)
+ {
+ return std::streamoff(-1); // Seeking is not supported.
+ }
+
+ std::streampos ConcurrentStreamBuf::seekpos(std::streampos, std::ios_base::openmode)
+ {
+ return std::streamoff(-1); // Seeking is not supported.
+ }
+
+ int ConcurrentStreamBuf::underflow()
+ {
+ {
+ std::unique_lock<std::mutex> lock(m_lock);
+ m_signal.wait(lock, [this]{ return m_backbuf.empty() == false || m_eof; });
+
+ if (m_eof && m_backbuf.empty())
+ {
+ return std::char_traits<char>::eof();
+ }
+
+ m_getArea.clear(); // keep the get-area from growing unbounded.
+ std::copy(m_backbuf.begin(), m_backbuf.end(), std::back_inserter(m_getArea));
+ m_backbuf.clear();
+ }
+ m_signal.notify_one();
+ char* gbegin = reinterpret_cast<char*>(&m_getArea[0]);
+ setg(gbegin, gbegin, gbegin + m_getArea.size());
+ return std::char_traits<char>::to_int_type(*gptr());
+ }
+
+ std::streamsize ConcurrentStreamBuf::showmanyc()
+ {
+ std::unique_lock<std::mutex> lock(m_lock);
+ AWS_LOGSTREAM_TRACE(TAG, "stream how many character? " << m_backbuf.size());
+ return m_backbuf.size();
+ }
+
+ int ConcurrentStreamBuf::overflow(int ch)
+ {
+ const auto eof = std::char_traits<char>::eof();
+
+ if (ch == eof)
+ {
+ FlushPutArea();
+ return eof;
+ }
+
+ FlushPutArea();
+ {
+ std::unique_lock<std::mutex> lock(m_lock);
+ if (m_eof)
+ {
+ return eof;
+ }
+ *pptr() = static_cast<char>(ch);
+ pbump(1);
+ return ch;
+ }
+ }
+
+ int ConcurrentStreamBuf::sync()
+ {
+ FlushPutArea();
+ return 0;
+ }
+ }
+ }
+}
diff --git a/contrib/libs/aws-sdk-cpp/aws-cpp-sdk-core/source/utils/stream/PreallocatedStreamBuf.cpp b/contrib/libs/aws-sdk-cpp/aws-cpp-sdk-core/source/utils/stream/PreallocatedStreamBuf.cpp
new file mode 100644
index 0000000000..f656fc8613
--- /dev/null
+++ b/contrib/libs/aws-sdk-cpp/aws-cpp-sdk-core/source/utils/stream/PreallocatedStreamBuf.cpp
@@ -0,0 +1,75 @@
+
+/**
+ * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
+ * SPDX-License-Identifier: Apache-2.0.
+ */
+
+#include <aws/core/utils/stream/PreallocatedStreamBuf.h>
+#include <cassert>
+
+namespace Aws
+{
+ namespace Utils
+ {
+ namespace Stream
+ {
+ PreallocatedStreamBuf::PreallocatedStreamBuf(unsigned char* buffer, uint64_t lengthToRead) :
+ m_underlyingBuffer(buffer), m_lengthToRead(lengthToRead)
+ {
+ char* end = reinterpret_cast<char*>(m_underlyingBuffer + m_lengthToRead);
+ char* begin = reinterpret_cast<char*>(m_underlyingBuffer);
+ setp(begin, end);
+ setg(begin, begin, end);
+ }
+
+ PreallocatedStreamBuf::pos_type PreallocatedStreamBuf::seekoff(off_type off, std::ios_base::seekdir dir, std::ios_base::openmode which)
+ {
+ if (dir == std::ios_base::beg)
+ {
+ return seekpos(off, which);
+ }
+ else if (dir == std::ios_base::end)
+ {
+ return seekpos(m_lengthToRead - off, which);
+ }
+ else if (dir == std::ios_base::cur)
+ {
+ if(which == std::ios_base::in)
+ {
+ return seekpos((gptr() - reinterpret_cast<char*>(m_underlyingBuffer)) + off, which);
+ }
+ else
+ {
+ return seekpos((pptr() - reinterpret_cast<char*>(m_underlyingBuffer)) + off, which);
+ }
+ }
+
+ return off_type(-1);
+ }
+
+ PreallocatedStreamBuf::pos_type PreallocatedStreamBuf::seekpos(pos_type pos, std::ios_base::openmode which)
+ {
+ assert(static_cast<size_t>(pos) <= m_lengthToRead);
+ if (static_cast<size_t>(pos) > m_lengthToRead)
+ {
+ return pos_type(off_type(-1));
+ }
+
+ char* end = reinterpret_cast<char*>(m_underlyingBuffer + m_lengthToRead);
+ char* begin = reinterpret_cast<char*>(m_underlyingBuffer);
+
+ if (which == std::ios_base::in)
+ {
+ setg(begin, begin + static_cast<size_t>(pos), end);
+ }
+
+ if (which == std::ios_base::out)
+ {
+ setp(begin + static_cast<size_t>(pos), end);
+ }
+
+ return pos;
+ }
+ }
+ }
+}
diff --git a/contrib/libs/aws-sdk-cpp/aws-cpp-sdk-core/source/utils/stream/ResponseStream.cpp b/contrib/libs/aws-sdk-cpp/aws-cpp-sdk-core/source/utils/stream/ResponseStream.cpp
new file mode 100644
index 0000000000..6d1f90ed12
--- /dev/null
+++ b/contrib/libs/aws-sdk-cpp/aws-cpp-sdk-core/source/utils/stream/ResponseStream.cpp
@@ -0,0 +1,91 @@
+/**
+ * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
+ * SPDX-License-Identifier: Apache-2.0.
+ */
+
+#include <aws/core/utils/stream/ResponseStream.h>
+#include <aws/core/utils/memory/stl/AWSStringStream.h>
+
+#if defined(_GLIBCXX_FULLY_DYNAMIC_STRING) && _GLIBCXX_FULLY_DYNAMIC_STRING == 0 && defined(__ANDROID__)
+#include <aws/core/utils/stream/SimpleStreamBuf.h>
+using DefaultStreamBufType = Aws::Utils::Stream::SimpleStreamBuf;
+#else
+using DefaultStreamBufType = Aws::StringBuf;
+#endif
+
+using namespace Aws::Utils::Stream;
+
+ResponseStream::ResponseStream(void) :
+ m_underlyingStream(nullptr)
+{
+}
+
+ResponseStream::ResponseStream(Aws::IOStream* underlyingStreamToManage) :
+ m_underlyingStream(underlyingStreamToManage)
+{
+}
+
+ResponseStream::ResponseStream(const Aws::IOStreamFactory& factory) :
+ m_underlyingStream(factory())
+{
+}
+
+ResponseStream::ResponseStream(ResponseStream&& toMove) : m_underlyingStream(toMove.m_underlyingStream)
+{
+ toMove.m_underlyingStream = nullptr;
+}
+
+ResponseStream& ResponseStream::operator=(ResponseStream&& toMove)
+{
+ if(m_underlyingStream == toMove.m_underlyingStream)
+ {
+ return *this;
+ }
+
+ ReleaseStream();
+ m_underlyingStream = toMove.m_underlyingStream;
+ toMove.m_underlyingStream = nullptr;
+
+ return *this;
+}
+
+ResponseStream::~ResponseStream()
+{
+ ReleaseStream();
+}
+
+void ResponseStream::ReleaseStream()
+{
+ if (m_underlyingStream)
+ {
+ m_underlyingStream->flush();
+ Aws::Delete(m_underlyingStream);
+ }
+
+ m_underlyingStream = nullptr;
+}
+
+static const char *DEFAULT_STREAM_TAG = "DefaultUnderlyingStream";
+
+DefaultUnderlyingStream::DefaultUnderlyingStream() :
+ Base( Aws::New< DefaultStreamBufType >( DEFAULT_STREAM_TAG ) )
+{}
+
+DefaultUnderlyingStream::DefaultUnderlyingStream(Aws::UniquePtr<std::streambuf> buf) :
+ Base(buf.release())
+{}
+
+DefaultUnderlyingStream::~DefaultUnderlyingStream()
+{
+ if( rdbuf() )
+ {
+ Aws::Delete( rdbuf() );
+ }
+}
+
+static const char* RESPONSE_STREAM_FACTORY_TAG = "ResponseStreamFactory";
+
+Aws::IOStream* Aws::Utils::Stream::DefaultResponseStreamFactoryMethod()
+{
+ return Aws::New<Aws::Utils::Stream::DefaultUnderlyingStream>(RESPONSE_STREAM_FACTORY_TAG);
+}
diff --git a/contrib/libs/aws-sdk-cpp/aws-cpp-sdk-core/source/utils/stream/SimpleStreamBuf.cpp b/contrib/libs/aws-sdk-cpp/aws-cpp-sdk-core/source/utils/stream/SimpleStreamBuf.cpp
new file mode 100644
index 0000000000..6e42994744
--- /dev/null
+++ b/contrib/libs/aws-sdk-cpp/aws-cpp-sdk-core/source/utils/stream/SimpleStreamBuf.cpp
@@ -0,0 +1,239 @@
+
+/**
+ * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
+ * SPDX-License-Identifier: Apache-2.0.
+ */
+
+#include <aws/core/utils/stream/SimpleStreamBuf.h>
+
+#include <algorithm>
+#include <cassert>
+#include <cstring>
+
+namespace Aws
+{
+namespace Utils
+{
+namespace Stream
+{
+
+static const uint32_t DEFAULT_BUFFER_SIZE = 100;
+static const char* SIMPLE_STREAMBUF_ALLOCATION_TAG = "SimpleStreamBufTag";
+
+SimpleStreamBuf::SimpleStreamBuf() :
+ m_buffer(nullptr),
+ m_bufferSize(0)
+{
+ m_buffer = Aws::NewArray<char>(DEFAULT_BUFFER_SIZE, SIMPLE_STREAMBUF_ALLOCATION_TAG);
+ m_bufferSize = DEFAULT_BUFFER_SIZE;
+
+ char* begin = m_buffer;
+ char* end = begin + m_bufferSize;
+
+ setp(begin, end);
+ setg(begin, begin, begin);
+}
+
+SimpleStreamBuf::SimpleStreamBuf(const Aws::String& value) :
+ m_buffer(nullptr),
+ m_bufferSize(0)
+{
+ size_t baseSize = (std::max)(value.size(), static_cast<std::size_t>(DEFAULT_BUFFER_SIZE));
+
+ m_buffer = Aws::NewArray<char>(baseSize, SIMPLE_STREAMBUF_ALLOCATION_TAG);
+ m_bufferSize = baseSize;
+
+ std::memcpy(m_buffer, value.c_str(), value.size());
+
+ char* begin = m_buffer;
+ char* end = begin + m_bufferSize;
+
+ setp(begin + value.size(), end);
+ setg(begin, begin, begin);
+}
+
+SimpleStreamBuf::~SimpleStreamBuf()
+{
+ if(m_buffer)
+ {
+ Aws::DeleteArray<char>(m_buffer);
+ m_buffer = nullptr;
+ }
+
+ m_bufferSize = 0;
+}
+
+std::streampos SimpleStreamBuf::seekoff(std::streamoff off, std::ios_base::seekdir dir, std::ios_base::openmode which)
+{
+ if (dir == std::ios_base::beg)
+ {
+ return seekpos(off, which);
+ }
+ else if (dir == std::ios_base::end)
+ {
+ return seekpos((pptr() - m_buffer) - off, which);
+ }
+ else if (dir == std::ios_base::cur)
+ {
+ if(which == std::ios_base::in)
+ {
+ return seekpos((gptr() - m_buffer) + off, which);
+ }
+ else
+ {
+ return seekpos((pptr() - m_buffer) + off, which);
+ }
+ }
+
+ return off_type(-1);
+}
+
+std::streampos SimpleStreamBuf::seekpos(std::streampos pos, std::ios_base::openmode which)
+{
+ size_t maxSeek = pptr() - m_buffer;
+ assert(static_cast<size_t>(pos) <= maxSeek);
+ if (static_cast<size_t>(pos) > maxSeek)
+ {
+ return pos_type(off_type(-1));
+ }
+
+ if (which == std::ios_base::in)
+ {
+ setg(m_buffer, m_buffer + static_cast<size_t>(pos), pptr());
+ }
+
+ if (which == std::ios_base::out)
+ {
+ setp(m_buffer + static_cast<size_t>(pos), epptr());
+ }
+
+ return pos;
+}
+
+bool SimpleStreamBuf::GrowBuffer()
+{
+ size_t currentSize = m_bufferSize;
+ size_t newSize = currentSize * 2;
+
+ char* newBuffer = Aws::NewArray<char>(newSize, SIMPLE_STREAMBUF_ALLOCATION_TAG);
+ if(newBuffer == nullptr)
+ {
+ return false;
+ }
+
+ if(currentSize > 0)
+ {
+ std::memcpy(newBuffer, m_buffer, currentSize);
+ }
+
+ if(m_buffer)
+ {
+ Aws::DeleteArray<char>(m_buffer);
+ }
+
+ m_buffer = newBuffer;
+ m_bufferSize = newSize;
+
+ return true;
+}
+
+int SimpleStreamBuf::overflow (int c)
+{
+ auto endOfFile = std::char_traits< char >::eof();
+ if(c == endOfFile)
+ {
+ return endOfFile;
+ }
+
+ char* old_begin = m_buffer;
+
+ char *old_pptr = pptr();
+ char *old_gptr = gptr();
+ char *old_egptr = egptr();
+
+ size_t currentWritePosition = m_bufferSize;
+
+ if(!GrowBuffer())
+ {
+ return endOfFile;
+ }
+
+ char* new_begin = m_buffer;
+ char* new_end = new_begin + m_bufferSize;
+
+ setp(new_begin + (old_pptr - old_begin) + 1, new_end);
+ setg(new_begin, new_begin + (old_gptr - old_begin), new_begin + (old_egptr - old_begin));
+
+ auto val = std::char_traits< char >::to_char_type(c);
+ *(new_begin + currentWritePosition) = val;
+
+ return c;
+}
+
+std::streamsize SimpleStreamBuf::xsputn(const char* s, std::streamsize n)
+{
+ std::streamsize writeCount = 0;
+ while(writeCount < n)
+ {
+ char* current_pptr = pptr();
+ char* current_epptr = epptr();
+
+ if (current_pptr < current_epptr)
+ {
+ std::size_t copySize = (std::min)(static_cast< std::size_t >(n - writeCount),
+ static_cast< std::size_t >(current_epptr - current_pptr));
+
+ std::memcpy(current_pptr, s + writeCount, copySize);
+ writeCount += copySize;
+ setp(current_pptr + copySize, current_epptr);
+ setg(m_buffer, gptr(), pptr());
+ }
+ else if (overflow(std::char_traits< char >::to_int_type(*(s + writeCount))) != std::char_traits<char>::eof())
+ {
+ writeCount++;
+ }
+ else
+ {
+ return writeCount;
+ }
+ }
+
+ return writeCount;
+}
+
+Aws::String SimpleStreamBuf::str() const
+{
+ return Aws::String(m_buffer, pptr());
+}
+
+int SimpleStreamBuf::underflow()
+{
+ if(egptr() != pptr())
+ {
+ setg(m_buffer, gptr(), pptr());
+ }
+
+ if(gptr() != egptr())
+ {
+ return std::char_traits< char >::to_int_type(*gptr());
+ }
+ else
+ {
+ return std::char_traits< char >::eof();
+ }
+}
+
+void SimpleStreamBuf::str(const Aws::String& value)
+{
+ char* begin = m_buffer;
+ char* end = begin + m_bufferSize;
+
+ setp(begin, end);
+ setg(begin, begin, begin);
+
+ xsputn(value.c_str(), value.size());
+}
+
+}
+}
+}