diff options
author | Devtools Arcadia <arcadia-devtools@yandex-team.ru> | 2022-02-07 18:08:42 +0300 |
---|---|---|
committer | Devtools Arcadia <arcadia-devtools@mous.vla.yp-c.yandex.net> | 2022-02-07 18:08:42 +0300 |
commit | 1110808a9d39d4b808aef724c861a2e1a38d2a69 (patch) | |
tree | e26c9fed0de5d9873cce7e00bc214573dc2195b7 /contrib/libs/aws-sdk-cpp/aws-cpp-sdk-core/source/utils/stream | |
download | ydb-1110808a9d39d4b808aef724c861a2e1a38d2a69.tar.gz |
intermediate changes
ref:cde9a383711a11544ce7e107a78147fb96cc4029
Diffstat (limited to 'contrib/libs/aws-sdk-cpp/aws-cpp-sdk-core/source/utils/stream')
4 files changed, 531 insertions, 0 deletions
diff --git a/contrib/libs/aws-sdk-cpp/aws-cpp-sdk-core/source/utils/stream/ConcurrentStreamBuf.cpp b/contrib/libs/aws-sdk-cpp/aws-cpp-sdk-core/source/utils/stream/ConcurrentStreamBuf.cpp new file mode 100644 index 0000000000..3f59dbe96d --- /dev/null +++ b/contrib/libs/aws-sdk-cpp/aws-cpp-sdk-core/source/utils/stream/ConcurrentStreamBuf.cpp @@ -0,0 +1,126 @@ +/** + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0. + */ +#include <aws/core/utils/stream/ConcurrentStreamBuf.h> +#include <aws/core/utils/logging/LogMacros.h> +#include <cstdint> +#include <cassert> + +namespace Aws +{ + namespace Utils + { + namespace Stream + { + const char TAG[] = "ConcurrentStreamBuf"; + ConcurrentStreamBuf::ConcurrentStreamBuf(size_t bufferLength) : + m_putArea(bufferLength), // we access [0] of the put area below so we must initialize it. + m_eof(false) + { + m_getArea.reserve(bufferLength); + m_backbuf.reserve(bufferLength); + + char* pbegin = reinterpret_cast<char*>(&m_putArea[0]); + setp(pbegin, pbegin + bufferLength); + } + + void ConcurrentStreamBuf::SetEof() + { + { + std::unique_lock<std::mutex> lock(m_lock); + m_eof = true; + } + m_signal.notify_all(); + } + + void ConcurrentStreamBuf::FlushPutArea() + { + const size_t bitslen = pptr() - pbase(); + if (bitslen) + { + // scope the lock + { + std::unique_lock<std::mutex> lock(m_lock); + m_signal.wait(lock, [this, bitslen]{ return m_eof || bitslen <= (m_backbuf.capacity() - m_backbuf.size()); }); + if (m_eof) + { + return; + } + std::copy(pbase(), pptr(), std::back_inserter(m_backbuf)); + } + m_signal.notify_one(); + char* pbegin = reinterpret_cast<char*>(&m_putArea[0]); + setp(pbegin, pbegin + m_putArea.size()); + } + } + + std::streampos ConcurrentStreamBuf::seekoff(std::streamoff, std::ios_base::seekdir, std::ios_base::openmode) + { + return std::streamoff(-1); // Seeking is not supported. + } + + std::streampos ConcurrentStreamBuf::seekpos(std::streampos, std::ios_base::openmode) + { + return std::streamoff(-1); // Seeking is not supported. + } + + int ConcurrentStreamBuf::underflow() + { + { + std::unique_lock<std::mutex> lock(m_lock); + m_signal.wait(lock, [this]{ return m_backbuf.empty() == false || m_eof; }); + + if (m_eof && m_backbuf.empty()) + { + return std::char_traits<char>::eof(); + } + + m_getArea.clear(); // keep the get-area from growing unbounded. + std::copy(m_backbuf.begin(), m_backbuf.end(), std::back_inserter(m_getArea)); + m_backbuf.clear(); + } + m_signal.notify_one(); + char* gbegin = reinterpret_cast<char*>(&m_getArea[0]); + setg(gbegin, gbegin, gbegin + m_getArea.size()); + return std::char_traits<char>::to_int_type(*gptr()); + } + + std::streamsize ConcurrentStreamBuf::showmanyc() + { + std::unique_lock<std::mutex> lock(m_lock); + AWS_LOGSTREAM_TRACE(TAG, "stream how many character? " << m_backbuf.size()); + return m_backbuf.size(); + } + + int ConcurrentStreamBuf::overflow(int ch) + { + const auto eof = std::char_traits<char>::eof(); + + if (ch == eof) + { + FlushPutArea(); + return eof; + } + + FlushPutArea(); + { + std::unique_lock<std::mutex> lock(m_lock); + if (m_eof) + { + return eof; + } + *pptr() = static_cast<char>(ch); + pbump(1); + return ch; + } + } + + int ConcurrentStreamBuf::sync() + { + FlushPutArea(); + return 0; + } + } + } +} diff --git a/contrib/libs/aws-sdk-cpp/aws-cpp-sdk-core/source/utils/stream/PreallocatedStreamBuf.cpp b/contrib/libs/aws-sdk-cpp/aws-cpp-sdk-core/source/utils/stream/PreallocatedStreamBuf.cpp new file mode 100644 index 0000000000..f656fc8613 --- /dev/null +++ b/contrib/libs/aws-sdk-cpp/aws-cpp-sdk-core/source/utils/stream/PreallocatedStreamBuf.cpp @@ -0,0 +1,75 @@ + +/** + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0. + */ + +#include <aws/core/utils/stream/PreallocatedStreamBuf.h> +#include <cassert> + +namespace Aws +{ + namespace Utils + { + namespace Stream + { + PreallocatedStreamBuf::PreallocatedStreamBuf(unsigned char* buffer, uint64_t lengthToRead) : + m_underlyingBuffer(buffer), m_lengthToRead(lengthToRead) + { + char* end = reinterpret_cast<char*>(m_underlyingBuffer + m_lengthToRead); + char* begin = reinterpret_cast<char*>(m_underlyingBuffer); + setp(begin, end); + setg(begin, begin, end); + } + + PreallocatedStreamBuf::pos_type PreallocatedStreamBuf::seekoff(off_type off, std::ios_base::seekdir dir, std::ios_base::openmode which) + { + if (dir == std::ios_base::beg) + { + return seekpos(off, which); + } + else if (dir == std::ios_base::end) + { + return seekpos(m_lengthToRead - off, which); + } + else if (dir == std::ios_base::cur) + { + if(which == std::ios_base::in) + { + return seekpos((gptr() - reinterpret_cast<char*>(m_underlyingBuffer)) + off, which); + } + else + { + return seekpos((pptr() - reinterpret_cast<char*>(m_underlyingBuffer)) + off, which); + } + } + + return off_type(-1); + } + + PreallocatedStreamBuf::pos_type PreallocatedStreamBuf::seekpos(pos_type pos, std::ios_base::openmode which) + { + assert(static_cast<size_t>(pos) <= m_lengthToRead); + if (static_cast<size_t>(pos) > m_lengthToRead) + { + return pos_type(off_type(-1)); + } + + char* end = reinterpret_cast<char*>(m_underlyingBuffer + m_lengthToRead); + char* begin = reinterpret_cast<char*>(m_underlyingBuffer); + + if (which == std::ios_base::in) + { + setg(begin, begin + static_cast<size_t>(pos), end); + } + + if (which == std::ios_base::out) + { + setp(begin + static_cast<size_t>(pos), end); + } + + return pos; + } + } + } +} diff --git a/contrib/libs/aws-sdk-cpp/aws-cpp-sdk-core/source/utils/stream/ResponseStream.cpp b/contrib/libs/aws-sdk-cpp/aws-cpp-sdk-core/source/utils/stream/ResponseStream.cpp new file mode 100644 index 0000000000..6d1f90ed12 --- /dev/null +++ b/contrib/libs/aws-sdk-cpp/aws-cpp-sdk-core/source/utils/stream/ResponseStream.cpp @@ -0,0 +1,91 @@ +/** + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0. + */ + +#include <aws/core/utils/stream/ResponseStream.h> +#include <aws/core/utils/memory/stl/AWSStringStream.h> + +#if defined(_GLIBCXX_FULLY_DYNAMIC_STRING) && _GLIBCXX_FULLY_DYNAMIC_STRING == 0 && defined(__ANDROID__) +#include <aws/core/utils/stream/SimpleStreamBuf.h> +using DefaultStreamBufType = Aws::Utils::Stream::SimpleStreamBuf; +#else +using DefaultStreamBufType = Aws::StringBuf; +#endif + +using namespace Aws::Utils::Stream; + +ResponseStream::ResponseStream(void) : + m_underlyingStream(nullptr) +{ +} + +ResponseStream::ResponseStream(Aws::IOStream* underlyingStreamToManage) : + m_underlyingStream(underlyingStreamToManage) +{ +} + +ResponseStream::ResponseStream(const Aws::IOStreamFactory& factory) : + m_underlyingStream(factory()) +{ +} + +ResponseStream::ResponseStream(ResponseStream&& toMove) : m_underlyingStream(toMove.m_underlyingStream) +{ + toMove.m_underlyingStream = nullptr; +} + +ResponseStream& ResponseStream::operator=(ResponseStream&& toMove) +{ + if(m_underlyingStream == toMove.m_underlyingStream) + { + return *this; + } + + ReleaseStream(); + m_underlyingStream = toMove.m_underlyingStream; + toMove.m_underlyingStream = nullptr; + + return *this; +} + +ResponseStream::~ResponseStream() +{ + ReleaseStream(); +} + +void ResponseStream::ReleaseStream() +{ + if (m_underlyingStream) + { + m_underlyingStream->flush(); + Aws::Delete(m_underlyingStream); + } + + m_underlyingStream = nullptr; +} + +static const char *DEFAULT_STREAM_TAG = "DefaultUnderlyingStream"; + +DefaultUnderlyingStream::DefaultUnderlyingStream() : + Base( Aws::New< DefaultStreamBufType >( DEFAULT_STREAM_TAG ) ) +{} + +DefaultUnderlyingStream::DefaultUnderlyingStream(Aws::UniquePtr<std::streambuf> buf) : + Base(buf.release()) +{} + +DefaultUnderlyingStream::~DefaultUnderlyingStream() +{ + if( rdbuf() ) + { + Aws::Delete( rdbuf() ); + } +} + +static const char* RESPONSE_STREAM_FACTORY_TAG = "ResponseStreamFactory"; + +Aws::IOStream* Aws::Utils::Stream::DefaultResponseStreamFactoryMethod() +{ + return Aws::New<Aws::Utils::Stream::DefaultUnderlyingStream>(RESPONSE_STREAM_FACTORY_TAG); +} diff --git a/contrib/libs/aws-sdk-cpp/aws-cpp-sdk-core/source/utils/stream/SimpleStreamBuf.cpp b/contrib/libs/aws-sdk-cpp/aws-cpp-sdk-core/source/utils/stream/SimpleStreamBuf.cpp new file mode 100644 index 0000000000..6e42994744 --- /dev/null +++ b/contrib/libs/aws-sdk-cpp/aws-cpp-sdk-core/source/utils/stream/SimpleStreamBuf.cpp @@ -0,0 +1,239 @@ + +/** + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0. + */ + +#include <aws/core/utils/stream/SimpleStreamBuf.h> + +#include <algorithm> +#include <cassert> +#include <cstring> + +namespace Aws +{ +namespace Utils +{ +namespace Stream +{ + +static const uint32_t DEFAULT_BUFFER_SIZE = 100; +static const char* SIMPLE_STREAMBUF_ALLOCATION_TAG = "SimpleStreamBufTag"; + +SimpleStreamBuf::SimpleStreamBuf() : + m_buffer(nullptr), + m_bufferSize(0) +{ + m_buffer = Aws::NewArray<char>(DEFAULT_BUFFER_SIZE, SIMPLE_STREAMBUF_ALLOCATION_TAG); + m_bufferSize = DEFAULT_BUFFER_SIZE; + + char* begin = m_buffer; + char* end = begin + m_bufferSize; + + setp(begin, end); + setg(begin, begin, begin); +} + +SimpleStreamBuf::SimpleStreamBuf(const Aws::String& value) : + m_buffer(nullptr), + m_bufferSize(0) +{ + size_t baseSize = (std::max)(value.size(), static_cast<std::size_t>(DEFAULT_BUFFER_SIZE)); + + m_buffer = Aws::NewArray<char>(baseSize, SIMPLE_STREAMBUF_ALLOCATION_TAG); + m_bufferSize = baseSize; + + std::memcpy(m_buffer, value.c_str(), value.size()); + + char* begin = m_buffer; + char* end = begin + m_bufferSize; + + setp(begin + value.size(), end); + setg(begin, begin, begin); +} + +SimpleStreamBuf::~SimpleStreamBuf() +{ + if(m_buffer) + { + Aws::DeleteArray<char>(m_buffer); + m_buffer = nullptr; + } + + m_bufferSize = 0; +} + +std::streampos SimpleStreamBuf::seekoff(std::streamoff off, std::ios_base::seekdir dir, std::ios_base::openmode which) +{ + if (dir == std::ios_base::beg) + { + return seekpos(off, which); + } + else if (dir == std::ios_base::end) + { + return seekpos((pptr() - m_buffer) - off, which); + } + else if (dir == std::ios_base::cur) + { + if(which == std::ios_base::in) + { + return seekpos((gptr() - m_buffer) + off, which); + } + else + { + return seekpos((pptr() - m_buffer) + off, which); + } + } + + return off_type(-1); +} + +std::streampos SimpleStreamBuf::seekpos(std::streampos pos, std::ios_base::openmode which) +{ + size_t maxSeek = pptr() - m_buffer; + assert(static_cast<size_t>(pos) <= maxSeek); + if (static_cast<size_t>(pos) > maxSeek) + { + return pos_type(off_type(-1)); + } + + if (which == std::ios_base::in) + { + setg(m_buffer, m_buffer + static_cast<size_t>(pos), pptr()); + } + + if (which == std::ios_base::out) + { + setp(m_buffer + static_cast<size_t>(pos), epptr()); + } + + return pos; +} + +bool SimpleStreamBuf::GrowBuffer() +{ + size_t currentSize = m_bufferSize; + size_t newSize = currentSize * 2; + + char* newBuffer = Aws::NewArray<char>(newSize, SIMPLE_STREAMBUF_ALLOCATION_TAG); + if(newBuffer == nullptr) + { + return false; + } + + if(currentSize > 0) + { + std::memcpy(newBuffer, m_buffer, currentSize); + } + + if(m_buffer) + { + Aws::DeleteArray<char>(m_buffer); + } + + m_buffer = newBuffer; + m_bufferSize = newSize; + + return true; +} + +int SimpleStreamBuf::overflow (int c) +{ + auto endOfFile = std::char_traits< char >::eof(); + if(c == endOfFile) + { + return endOfFile; + } + + char* old_begin = m_buffer; + + char *old_pptr = pptr(); + char *old_gptr = gptr(); + char *old_egptr = egptr(); + + size_t currentWritePosition = m_bufferSize; + + if(!GrowBuffer()) + { + return endOfFile; + } + + char* new_begin = m_buffer; + char* new_end = new_begin + m_bufferSize; + + setp(new_begin + (old_pptr - old_begin) + 1, new_end); + setg(new_begin, new_begin + (old_gptr - old_begin), new_begin + (old_egptr - old_begin)); + + auto val = std::char_traits< char >::to_char_type(c); + *(new_begin + currentWritePosition) = val; + + return c; +} + +std::streamsize SimpleStreamBuf::xsputn(const char* s, std::streamsize n) +{ + std::streamsize writeCount = 0; + while(writeCount < n) + { + char* current_pptr = pptr(); + char* current_epptr = epptr(); + + if (current_pptr < current_epptr) + { + std::size_t copySize = (std::min)(static_cast< std::size_t >(n - writeCount), + static_cast< std::size_t >(current_epptr - current_pptr)); + + std::memcpy(current_pptr, s + writeCount, copySize); + writeCount += copySize; + setp(current_pptr + copySize, current_epptr); + setg(m_buffer, gptr(), pptr()); + } + else if (overflow(std::char_traits< char >::to_int_type(*(s + writeCount))) != std::char_traits<char>::eof()) + { + writeCount++; + } + else + { + return writeCount; + } + } + + return writeCount; +} + +Aws::String SimpleStreamBuf::str() const +{ + return Aws::String(m_buffer, pptr()); +} + +int SimpleStreamBuf::underflow() +{ + if(egptr() != pptr()) + { + setg(m_buffer, gptr(), pptr()); + } + + if(gptr() != egptr()) + { + return std::char_traits< char >::to_int_type(*gptr()); + } + else + { + return std::char_traits< char >::eof(); + } +} + +void SimpleStreamBuf::str(const Aws::String& value) +{ + char* begin = m_buffer; + char* end = begin + m_bufferSize; + + setp(begin, end); + setg(begin, begin, begin); + + xsputn(value.c_str(), value.size()); +} + +} +} +} |