diff options
author | babenko <babenko@yandex-team.com> | 2024-06-06 20:05:13 +0300 |
---|---|---|
committer | babenko <babenko@yandex-team.com> | 2024-06-06 20:15:37 +0300 |
commit | 537a7b1307461881b8abc1381ba9f79e6104f439 (patch) | |
tree | 425b2739a6a653a916a05be3818935d1e8bd8126 | |
parent | 7e1624f2fd672349e2f89baffeca12ef8272ec8d (diff) | |
download | ydb-537a7b1307461881b8abc1381ba9f79e6104f439.tar.gz |
Refactor and improve yt/yt/core/net compression facilities
1) Never throw or call WaitFor in functions returning futures; this is against the usual contract
2) Set proper buffer sizes for all codecs (this fixes 8KB reads with, e.g., gzip)
3) Add roundtrip tests
4) Hide some details in NDetail
972ddeadd5053970f5af5bdfdf7f424f4b3611e8
-rw-r--r-- | yt/yt/core/concurrency/async_stream.cpp | 17 | ||||
-rw-r--r-- | yt/yt/core/concurrency/async_stream.h | 7 | ||||
-rw-r--r-- | yt/yt/core/http/compression.cpp | 243 | ||||
-rw-r--r-- | yt/yt/core/http/compression.h | 29 | ||||
-rw-r--r-- | yt/yt/core/http/compression_detail.h | 25 | ||||
-rw-r--r-- | yt/yt/core/http/compression_opensource.cpp | 26 | ||||
-rw-r--r-- | yt/yt/core/http/connection_reuse_helpers.cpp | 8 | ||||
-rw-r--r-- | yt/yt/core/http/connection_reuse_helpers.h | 8 | ||||
-rw-r--r-- | yt/yt/core/http/public.h | 4 | ||||
-rw-r--r-- | yt/yt/core/http/unittests/http_ut.cpp | 143 |
10 files changed, 304 insertions, 206 deletions
diff --git a/yt/yt/core/concurrency/async_stream.cpp b/yt/yt/core/concurrency/async_stream.cpp index 6c43031fe0..e89cda52c9 100644 --- a/yt/yt/core/concurrency/async_stream.cpp +++ b/yt/yt/core/concurrency/async_stream.cpp @@ -393,7 +393,7 @@ IAsyncZeroCopyInputStreamPtr CreateZeroCopyAdapter( size_t blockSize) { YT_VERIFY(underlyingStream); - return New<TZeroCopyInputStreamAdapter>(underlyingStream, blockSize); + return New<TZeroCopyInputStreamAdapter>(std::move(underlyingStream), blockSize); } //////////////////////////////////////////////////////////////////////////////// @@ -450,7 +450,7 @@ private: IAsyncInputStreamPtr CreateCopyingAdapter(IAsyncZeroCopyInputStreamPtr underlyingStream) { - return New<TCopyingInputStreamAdapter>(underlyingStream); + return New<TCopyingInputStreamAdapter>(std::move(underlyingStream)); } //////////////////////////////////////////////////////////////////////////////// @@ -570,7 +570,7 @@ private: IAsyncZeroCopyOutputStreamPtr CreateZeroCopyAdapter(IAsyncOutputStreamPtr underlyingStream) { - return New<TZeroCopyOutputStreamAdapter>(underlyingStream); + return New<TZeroCopyOutputStreamAdapter>(std::move(underlyingStream)); } //////////////////////////////////////////////////////////////////////////////// @@ -711,7 +711,7 @@ IAsyncZeroCopyInputStreamPtr CreatePrefetchingAdapter( IAsyncZeroCopyInputStreamPtr underlyingStream, size_t windowSize) { - return New<TPrefetchingInputStreamAdapter>(underlyingStream, windowSize); + return New<TPrefetchingInputStreamAdapter>(std::move(underlyingStream), windowSize); } //////////////////////////////////////////////////////////////////////////////// @@ -839,7 +839,7 @@ IAsyncZeroCopyInputStreamPtr CreateBufferingAdapter( IAsyncInputStreamPtr underlyingStream, size_t windowSize) { - return New<TBufferingInputStreamAdapter>(underlyingStream, windowSize); + return New<TBufferingInputStreamAdapter>(std::move(underlyingStream), windowSize); } 
//////////////////////////////////////////////////////////////////////////////// @@ -1017,15 +1017,14 @@ private: IAsyncZeroCopyInputStreamPtr CreateConcurrentAdapter( IAsyncZeroCopyInputStreamPtr underlyingStream) { - return New<TConcurrentInputStreamAdapter>(underlyingStream); + return New<TConcurrentInputStreamAdapter>(std::move(underlyingStream)); } //////////////////////////////////////////////////////////////////////////////// -// NB(levysotsky): Doesn't close the output stream. void PipeInputToOutput( - IAsyncZeroCopyInputStreamPtr input, - IAsyncOutputStreamPtr output) + const IAsyncZeroCopyInputStreamPtr& input, + const IAsyncOutputStreamPtr& output) { while (true) { auto asyncBlock = input->Read(); diff --git a/yt/yt/core/concurrency/async_stream.h b/yt/yt/core/concurrency/async_stream.h index ea07583c18..ae5e0880ba 100644 --- a/yt/yt/core/concurrency/async_stream.h +++ b/yt/yt/core/concurrency/async_stream.h @@ -141,7 +141,7 @@ DEFINE_REFCOUNTED_TYPE(IAsyncZeroCopyInputStream) //! Creates a zero-copy adapter from a given asynchronous stream. IAsyncZeroCopyInputStreamPtr CreateZeroCopyAdapter( IAsyncInputStreamPtr underlyingStream, - size_t blockSize = 64 * 1024); + size_t blockSize = 64_KB); //! Creates a copying adapter from a given asynchronous zero-copy stream. IAsyncInputStreamPtr CreateCopyingAdapter(IAsyncZeroCopyInputStreamPtr underlyingStream); @@ -215,9 +215,10 @@ IAsyncZeroCopyInputStreamPtr CreateConcurrentAdapter( //////////////////////////////////////////////////////////////////////////////// +// NB(levysotsky): Doesn't close the output stream. 
void PipeInputToOutput( - NConcurrency::IAsyncZeroCopyInputStreamPtr input, - NConcurrency::IAsyncOutputStreamPtr output); + const IAsyncZeroCopyInputStreamPtr& input, + const IAsyncOutputStreamPtr& output); //////////////////////////////////////////////////////////////////////////////// diff --git a/yt/yt/core/http/compression.cpp b/yt/yt/core/http/compression.cpp index 5771f99c74..e719f54262 100644 --- a/yt/yt/core/http/compression.cpp +++ b/yt/yt/core/http/compression.cpp @@ -1,7 +1,11 @@ #include "compression.h" +#include "compression_detail.h" + #include <yt/yt/core/ytree/serialize.h> +#include <yt/yt/core/compression/dictionary_codec.h> + #include <library/cpp/streams/brotli/brotli.h> #include <library/cpp/blockcodecs/codecs.h> @@ -11,6 +15,7 @@ namespace NYT::NHttp { +using namespace NHttp::NDetail; using namespace NConcurrency; //////////////////////////////////////////////////////////////////////////////// @@ -33,7 +38,7 @@ DEFINE_REFCOUNTED_TYPE(TStreamHolder) TFuture<void> TSharedRefOutputStream::Write(const TSharedRef& buffer) { - Refs_.push_back(TSharedRef::MakeCopy<TDefaultSharedBlobTag>(buffer)); + Parts_.push_back(TSharedRef::MakeCopy<TDefaultSharedBlobTag>(buffer)); return VoidFuture; } @@ -47,9 +52,9 @@ TFuture<void> TSharedRefOutputStream::Close() return VoidFuture; } -const std::vector<TSharedRef>& TSharedRefOutputStream::GetRefs() const +std::vector<TSharedRef> TSharedRefOutputStream::Finish() { - return Refs_; + return std::move(Parts_); } //////////////////////////////////////////////////////////////////////////////// @@ -61,9 +66,11 @@ class TCompressingOutputStream public: TCompressingOutputStream( IAsyncOutputStreamPtr underlying, - TContentEncoding contentEncoding) - : Underlying_(underlying) - , ContentEncoding_(contentEncoding) + TContentEncoding contentEncoding, + IInvokerPtr compressionInvoker) + : Underlying_(std::move(underlying)) + , ContentEncoding_(std::move(contentEncoding)) + , CompressionInvoker_(std::move(compressionInvoker)) 
{ } ~TCompressingOutputStream() @@ -74,48 +81,39 @@ public: TFuture<void> Write(const TSharedRef& buffer) override { - Holder_ = buffer; - - CreateCompressor(); - - if (Finished_) { - THROW_ERROR_EXCEPTION("Attempting write to closed compression stream"); - } - - Compressor_->Write(buffer.Begin(), buffer.Size()); - return VoidFuture; + return BIND(&TCompressingOutputStream::DoWriteCompressor, MakeStrong(this), buffer) + .AsyncVia(CompressionInvoker_) + .Run(); } TFuture<void> Flush() override { - CreateCompressor(); - Compressor_->Flush(); - return VoidFuture; + return BIND(&TCompressingOutputStream::DoFlushCompressor, MakeStrong(this)) + .AsyncVia(CompressionInvoker_) + .Run(); } TFuture<void> Close() override { - if (!Finished_) { - Finished_ = true; - CreateCompressor(); - Compressor_->Finish(); - } - return VoidFuture; + return BIND(&TCompressingOutputStream::DoFinishCompressor, MakeStrong(this)) + .AsyncVia(CompressionInvoker_) + .Run(); } private: const IAsyncOutputStreamPtr Underlying_; const TContentEncoding ContentEncoding_; + const IInvokerPtr CompressionInvoker_; // NB: Arcadia streams got some "interesting" ideas about // exception handling and the role of destructors in the C++ // programming language. 
- TSharedRef Holder_; bool Destroying_ = false; bool Finished_ = false; std::unique_ptr<IOutputStream> Compressor_; - void CreateCompressor() + + void EnsureCompressorCreated() { if (Compressor_) { return; @@ -125,17 +123,17 @@ private: Compressor_.reset(new NBlockCodecs::TCodedOutput( this, NBlockCodecs::Codec(ContentEncoding_.substr(2)), - DefaultStreamBufferSize)); + DefaultCompressionBufferSize)); return; } if (ContentEncoding_ == "gzip") { - Compressor_.reset(new TZLibCompress(this, ZLib::GZip, 4, DefaultStreamBufferSize)); + Compressor_.reset(new TZLibCompress(this, ZLib::GZip, 4, DefaultCompressionBufferSize)); return; } if (ContentEncoding_ == "deflate") { - Compressor_.reset(new TZLibCompress(this, ZLib::ZLib, 4, DefaultStreamBufferSize)); + Compressor_.reset(new TZLibCompress(this, ZLib::ZLib, 4, DefaultCompressionBufferSize)); return; } @@ -152,13 +150,39 @@ private: << TErrorAttribute("content_encoding", ToString(ContentEncoding_)); } + void DoWriteCompressor(const TSharedRef& buffer) + { + if (Finished_) { + THROW_ERROR_EXCEPTION("Attempting write to closed compression stream"); + } + + EnsureCompressorCreated(); + Compressor_->Write(buffer.Begin(), buffer.Size()); + } + + void DoFlushCompressor() + { + EnsureCompressorCreated(); + Compressor_->Flush(); + } + + void DoFinishCompressor() + { + if (Finished_) { + return; + } + Finished_ = true; + EnsureCompressorCreated(); + Compressor_->Finish(); + } + void DoWrite(const void* buf, size_t len) override { if (Destroying_) { return; } - WaitFor(Underlying_->Write(TSharedRef(buf, len, New<TStreamHolder>(this)))) + WaitForFast(Underlying_->Write(TSharedRef(buf, len, New<TStreamHolder>(this)))) .ThrowOnError(); } @@ -171,7 +195,7 @@ private: return; } - WaitFor(Underlying_->Close()) + WaitForFast(Underlying_->Close()) .ThrowOnError(); } }; @@ -187,49 +211,33 @@ class TDecompressingInputStream public: TDecompressingInputStream( IAsyncZeroCopyInputStreamPtr underlying, - TContentEncoding contentEncoding) - : 
Underlying_(underlying) - , ContentEncoding_(contentEncoding) + TContentEncoding contentEncoding, + IInvokerPtr compressionInvoker) + : Underlying_(std::move(underlying)) + , ContentEncoding_(std::move(contentEncoding)) + , CompressionInvoker_(std::move(compressionInvoker)) { } TFuture<size_t> Read(const TSharedMutableRef& buffer) override { - CreateDecompressorIfNeeded(); - return MakeFuture<size_t>(Decompressor_->Read(buffer.Begin(), buffer.Size())); + return BIND(&TDecompressingInputStream::DoReadDecompressor, MakeStrong(this), buffer) + .AsyncVia(CompressionInvoker_) + .Run(); } private: const IAsyncZeroCopyInputStreamPtr Underlying_; const TContentEncoding ContentEncoding_; + const IInvokerPtr CompressionInvoker_; std::unique_ptr<IInputStream> Decompressor_; - TSharedRef LastRead_; - bool IsEnd_ = false; + bool CompressedEos_ = false; + bool DecompressedEos_ = false; + TSharedRef CompressedBlock_; + size_t CompressedBlockOffset_ = 0; - size_t DoRead(void* buf, size_t len) override - { - if (IsEnd_) { - return 0; - } - - if (LastRead_.Empty()) { - LastRead_ = WaitFor(Underlying_->Read()) - .ValueOrThrow(); - IsEnd_ = LastRead_.Empty(); - } - - size_t readSize = std::min(len, LastRead_.Size()); - std::copy(LastRead_.Begin(), LastRead_.Begin() + readSize, reinterpret_cast<char*>(buf)); - if (readSize != LastRead_.Size()) { - LastRead_ = LastRead_.Slice(readSize, LastRead_.Size()); - } else { - LastRead_ = {}; - } - return readSize; - } - - void CreateDecompressorIfNeeded() + void EnsureDecompressorCreated() { if (Decompressor_) { return; @@ -243,12 +251,12 @@ private: } if (ContentEncoding_ == "gzip" || ContentEncoding_ == "deflate") { - Decompressor_.reset(new TZLibDecompress(this)); + Decompressor_.reset(new TZLibDecompress(this, ZLib::Auto, DefaultCompressionBufferSize)); return; } if (ContentEncoding_ == "br") { - Decompressor_.reset(new TBrotliDecompress(this)); + Decompressor_.reset(new TBrotliDecompress(this, DefaultCompressionBufferSize)); return; } @@ 
-259,15 +267,63 @@ private: THROW_ERROR_EXCEPTION("Unsupported content encoding") << TErrorAttribute("content_encoding", ContentEncoding_); } + + size_t DoReadDecompressor(const TSharedMutableRef& uncompressedBuffer) + { + if (DecompressedEos_) { + return 0; + } + + EnsureDecompressorCreated(); + + size_t offset = 0; + while (offset < uncompressedBuffer.size()) { + auto bytesRead = Decompressor_->Read(uncompressedBuffer.begin() + offset, uncompressedBuffer.size() - offset); + if (bytesRead == 0) { + DecompressedEos_ = true; + break; + } + offset += bytesRead; + } + return offset; + } + + size_t DoRead(void* buf, size_t len) override + { + if (CompressedEos_) { + return 0; + } + + size_t offset = 0; + while (offset < len) { + if (!CompressedBlock_) { + CompressedBlockOffset_ = 0; + CompressedBlock_ = WaitForFast(Underlying_->Read()) + .ValueOrThrow(); + if (!CompressedBlock_) { + CompressedEos_ = true; + break; + } + } + + auto bytesRead = std::min(len - offset, CompressedBlock_.size() - CompressedBlockOffset_); + memcpy(static_cast<char*>(buf) + offset, CompressedBlock_.begin() + CompressedBlockOffset_, bytesRead); + offset += bytesRead; + CompressedBlockOffset_ += bytesRead; + + if (CompressedBlockOffset_ == CompressedBlock_.size()) { + CompressedBlock_ = {}; + } + } + return offset; + } }; DEFINE_REFCOUNTED_TYPE(TDecompressingInputStream) //////////////////////////////////////////////////////////////////////////////// -const TContentEncoding IdentityContentEncoding = "identity"; - -bool IsCompressionSupported(const TContentEncoding& contentEncoding) +bool IsContentEncodingSupported(const TContentEncoding& contentEncoding) { if (contentEncoding.StartsWith("z-")) { try { @@ -278,29 +334,30 @@ bool IsCompressionSupported(const TContentEncoding& contentEncoding) } } - for (const auto& supported : SupportedCompressions) { - if (supported == contentEncoding) { - return true; - } + if (Find(GetInternallySupportedContentEncodings(), contentEncoding) != 
GetInternallySupportedContentEncodings().end()) { + return true; } return false; } -std::vector<TContentEncoding> GetSupportedCompressions() +const std::vector<TContentEncoding>& GetSupportedContentEncodings() { - auto result = SupportedCompressions; - for (auto blockCodec : NBlockCodecs::ListAllCodecs()) { - result.push_back(TString("z-") + blockCodec); - } + static const auto result = [] { + auto result = GetInternallySupportedContentEncodings(); + for (auto blockCodec : NBlockCodecs::ListAllCodecs()) { + result.push_back(TString("z-") + blockCodec); + } + return result; + }(); return result; } -// NOTE: Does not implement the spec, but a reasonable approximation. -TErrorOr<TContentEncoding> GetBestAcceptedEncoding(const TString& clientAcceptEncodingHeader) +// NB: Does not implement the spec, but a reasonable approximation. +TErrorOr<TContentEncoding> GetBestAcceptedContentEncoding(const TString& clientAcceptEncodingHeader) { auto bestPosition = TString::npos; - TContentEncoding bestEncoding; + std::optional<TContentEncoding> bestEncoding; auto checkCandidate = [&] (const TString& candidate, size_t position) { if (position != TString::npos && (bestPosition == TString::npos || position < bestPosition)) { @@ -309,7 +366,7 @@ TErrorOr<TContentEncoding> GetBestAcceptedEncoding(const TString& clientAcceptEn } }; - for (const auto& candidate : SupportedCompressions) { + for (const auto& candidate : GetInternallySupportedContentEncodings()) { if (candidate == "x-lzop") { continue; } @@ -319,32 +376,40 @@ TErrorOr<TContentEncoding> GetBestAcceptedEncoding(const TString& clientAcceptEn } for (const auto& blockcodec : NBlockCodecs::ListAllCodecs()) { - auto candidate = TString{"z-"} + blockcodec; + auto candidate = TString("z-") + blockcodec; auto position = clientAcceptEncodingHeader.find(candidate); checkCandidate(candidate, position); } - if (!bestEncoding.empty()) { - return bestEncoding; + if (!bestEncoding) { + return TError("Could not determine feasible content 
encoding given accept encoding constraints") + << TErrorAttribute("client_accept_encoding", clientAcceptEncodingHeader); } - return TError("Could not determine feasible Content-Encoding given Accept-Encoding constraints") - << TErrorAttribute("client_accept_encoding", clientAcceptEncodingHeader); + return *bestEncoding; } IFlushableAsyncOutputStreamPtr CreateCompressingAdapter( IAsyncOutputStreamPtr underlying, - TContentEncoding contentEncoding) + TContentEncoding contentEncoding, + IInvokerPtr compressionInvoker) { - return New<TCompressingOutputStream>(underlying, contentEncoding); + return New<TCompressingOutputStream>( + std::move(underlying), + std::move(contentEncoding), + std::move(compressionInvoker)); } IAsyncInputStreamPtr CreateDecompressingAdapter( IAsyncZeroCopyInputStreamPtr underlying, - TContentEncoding contentEncoding) + TContentEncoding contentEncoding, + IInvokerPtr compressionInvoker) { - return New<TDecompressingInputStream>(underlying, contentEncoding); + return New<TDecompressingInputStream>( + std::move(underlying), + std::move(contentEncoding), + std::move(compressionInvoker)); } //////////////////////////////////////////////////////////////////////////////// diff --git a/yt/yt/core/http/compression.h b/yt/yt/core/http/compression.h index 7b0da27f7f..5837c9328d 100644 --- a/yt/yt/core/http/compression.h +++ b/yt/yt/core/http/compression.h @@ -16,40 +16,31 @@ public: TFuture<void> Flush() override; TFuture<void> Close() override; - const std::vector<TSharedRef>& GetRefs() const; + std::vector<TSharedRef> Finish(); private: - std::vector<TSharedRef> Refs_; + std::vector<TSharedRef> Parts_; }; DEFINE_REFCOUNTED_TYPE(TSharedRefOutputStream) //////////////////////////////////////////////////////////////////////////////// -bool IsCompressionSupported(const TContentEncoding& contentEncoding); +inline const TContentEncoding IdentityContentEncoding = "identity"; +const std::vector<TContentEncoding>& GetSupportedContentEncodings(); +bool 
IsContentEncodingSupported(const TContentEncoding& contentEncoding); +TErrorOr<TContentEncoding> GetBestAcceptedContentEncoding(const TString& clientAcceptEncodingHeader); -std::vector<TContentEncoding> GetSupportedCompressions(); - -extern const TContentEncoding IdentityContentEncoding; -extern const std::vector<TContentEncoding> SupportedCompressions; - -TErrorOr<TContentEncoding> GetBestAcceptedEncoding(const TString& clientAcceptEncodingHeader); +//////////////////////////////////////////////////////////////////////////////// NConcurrency::IFlushableAsyncOutputStreamPtr CreateCompressingAdapter( NConcurrency::IAsyncOutputStreamPtr underlying, - TContentEncoding contentEncoding); - + TContentEncoding contentEncoding, + IInvokerPtr compressionInvoker); NConcurrency::IAsyncInputStreamPtr CreateDecompressingAdapter( NConcurrency::IAsyncZeroCopyInputStreamPtr underlying, - TContentEncoding contentEncoding); - -std::unique_ptr<IOutputStream> TryDetectOptionalCompressors( - TContentEncoding contentEncoding, - IOutputStream* inner); - -std::unique_ptr<IInputStream> TryDetectOptionalDecompressors( TContentEncoding contentEncoding, - IInputStream* inner); + IInvokerPtr compressionInvoker); //////////////////////////////////////////////////////////////////////////////// diff --git a/yt/yt/core/http/compression_detail.h b/yt/yt/core/http/compression_detail.h new file mode 100644 index 0000000000..1110f71c4b --- /dev/null +++ b/yt/yt/core/http/compression_detail.h @@ -0,0 +1,25 @@ +#pragma once + +#include "public.h" + +#include <util/generic/size_literals.h> + +namespace NYT::NHttp::NDetail { + +//////////////////////////////////////////////////////////////////////////////// + +// NB: some codecs (e.g. lzop) accept ui16 as buffer size. 
+constexpr size_t DefaultCompressionBufferSize = 32_KB; + +std::unique_ptr<IOutputStream> TryDetectOptionalCompressors( + const TContentEncoding& contentEncoding, + IOutputStream* inner); +std::unique_ptr<IInputStream> TryDetectOptionalDecompressors( + const TContentEncoding& contentEncoding, + IInputStream* inner); + +const std::vector<TContentEncoding>& GetInternallySupportedContentEncodings(); + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT::NHttp::NDetail diff --git a/yt/yt/core/http/compression_opensource.cpp b/yt/yt/core/http/compression_opensource.cpp index 8c650027fb..8c2d649e1e 100644 --- a/yt/yt/core/http/compression_opensource.cpp +++ b/yt/yt/core/http/compression_opensource.cpp @@ -1,27 +1,33 @@ +#include "compression_detail.h" + #include "compression.h" -namespace NYT::NHttp { +namespace NYT::NHttp::NDetail { //////////////////////////////////////////////////////////////////////////////// -const std::vector<TContentEncoding> SupportedCompressions = { - "gzip", - IdentityContentEncoding, - "br", - "deflate", -}; +const std::vector<TContentEncoding>& GetInternallySupportedContentEncodings() +{ + static const std::vector<TContentEncoding> result = { + "gzip", + IdentityContentEncoding, + "br", + "deflate", + }; + return result; +} //////////////////////////////////////////////////////////////////////////////// std::unique_ptr<IOutputStream> TryDetectOptionalCompressors( - TContentEncoding /*contentEncoding*/, + const TContentEncoding& /*contentEncoding*/, IOutputStream* /*inner*/) { return nullptr; } std::unique_ptr<IInputStream> TryDetectOptionalDecompressors( - TContentEncoding /*contentEncoding*/, + const TContentEncoding& /*contentEncoding*/, IInputStream* /*inner*/) { return nullptr; @@ -29,4 +35,4 @@ std::unique_ptr<IInputStream> TryDetectOptionalDecompressors( //////////////////////////////////////////////////////////////////////////////// -} // namespace NYT::NHttp +} // namespace 
NYT::NHttp::NDetail diff --git a/yt/yt/core/http/connection_reuse_helpers.cpp b/yt/yt/core/http/connection_reuse_helpers.cpp index 2f500046c4..55ba238a5e 100644 --- a/yt/yt/core/http/connection_reuse_helpers.cpp +++ b/yt/yt/core/http/connection_reuse_helpers.cpp @@ -4,10 +4,10 @@ #include <yt/yt/core/net/connection.h> -//////////////////////////////////////////////////////////////////////////////// - namespace NYT::NHttp::NDetail { +//////////////////////////////////////////////////////////////////////////////// + TReusableConnectionState::TReusableConnectionState( NNet::IConnectionPtr connection, TConnectionPoolPtr owningPool) @@ -22,6 +22,6 @@ TReusableConnectionState::~TReusableConnectionState() } } -} // namespace NDetail - //////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT::NHttp::NDetail diff --git a/yt/yt/core/http/connection_reuse_helpers.h b/yt/yt/core/http/connection_reuse_helpers.h index 5ac736166c..bb3762bb6d 100644 --- a/yt/yt/core/http/connection_reuse_helpers.h +++ b/yt/yt/core/http/connection_reuse_helpers.h @@ -4,10 +4,10 @@ #include <yt/yt/core/net/public.h> -//////////////////////////////////////////////////////////////////////////////// - namespace NYT::NHttp::NDetail { +//////////////////////////////////////////////////////////////////////////////// + //! Responsible for returning the connection to the owning pool //! 
if it could be reused struct TReusableConnectionState final @@ -39,10 +39,10 @@ private: TReusableConnectionStatePtr ReusableState_; }; -} // namespace NDetail - //////////////////////////////////////////////////////////////////////////////// +} // namespace NYT::NHttp::NDetail + #define CONNECTION_REUSE_HELPERS_INL_H #include "connection_reuse_helpers-inl.h" #undef CONNECTION_REUSE_HELPERS_INL_H diff --git a/yt/yt/core/http/public.h b/yt/yt/core/http/public.h index 2d49ea41ed..74f0e8d29d 100644 --- a/yt/yt/core/http/public.h +++ b/yt/yt/core/http/public.h @@ -2,8 +2,6 @@ #include <yt/yt/core/misc/common.h> -#include <util/generic/size_literals.h> - namespace NYT::NHttp { //////////////////////////////////////////////////////////////////////////////// @@ -36,8 +34,6 @@ DECLARE_REFCOUNTED_CLASS(TSharedRefOutputStream) using TContentEncoding = TString; -static constexpr size_t DefaultStreamBufferSize = 32_KB; - //////////////////////////////////////////////////////////////////////////////// } // namespace NYT::NHttp diff --git a/yt/yt/core/http/unittests/http_ut.cpp b/yt/yt/core/http/unittests/http_ut.cpp index d8351afcec..8016b370ff 100644 --- a/yt/yt/core/http/unittests/http_ut.cpp +++ b/yt/yt/core/http/unittests/http_ut.cpp @@ -3,6 +3,7 @@ #include <yt/yt/core/http/client.h> #include <yt/yt/core/http/compression.h> +#include <yt/yt/core/http/compression_detail.h> #include <yt/yt/core/http/config.h> #include <yt/yt/core/http/connection_pool.h> #include <yt/yt/core/http/helpers.h> @@ -38,14 +39,15 @@ namespace NYT::NHttp { namespace { -using namespace NYT::NConcurrency; -using namespace NYT::NNet; -using namespace NYT::NCrypto; -using namespace NYT::NLogging; +using namespace NHttp::NDetail; +using namespace NConcurrency; +using namespace NNet; +using namespace NCrypto; +using namespace NLogging; //////////////////////////////////////////////////////////////////////////////// -TEST(THttpUrlParse, Simple) +TEST(TParseUrlTest, Simple) { TString example = 
"https://user@google.com:12345/a/b/c?foo=bar&zog=%20"; auto url = ParseUrl(example); @@ -62,7 +64,7 @@ TEST(THttpUrlParse, Simple) ASSERT_THROW(ParseUrl(TStringBuf("\0", 1)), TErrorException); } -TEST(THttpUrlParse, IPv4) +TEST(TParseUrlTest, IPv4) { TString example = "https://1.2.3.4:12345/"; auto url = ParseUrl(example); @@ -71,7 +73,7 @@ TEST(THttpUrlParse, IPv4) ASSERT_EQ(*url.Port, 12345); } -TEST(THttpUrlParse, IPv6) +TEST(TParseUrlTest, IPv6) { TString example = "https://[::1]:12345/"; auto url = ParseUrl(example); @@ -82,7 +84,7 @@ TEST(THttpUrlParse, IPv6) //////////////////////////////////////////////////////////////////////////////// -TEST(THttpCookie, ParseCookie) +TEST(TParseCookiesTest, ParseCookie) { TString cookieString = "yandexuid=706216621492423338; yandex_login=prime; _ym_d=1529669659; Cookie_check=1; _ym_isad=1;some_cookie_name= some_cookie_value ; abracadabra="; auto cookie = ParseCookies(cookieString); @@ -102,7 +104,7 @@ std::vector<TString> ToVector(const TCompactVector<TString, 1>& v) return std::vector<TString>(v.begin(), v.end()); } -TEST(THttpHeaders, Simple) +TEST(THeadersTest, Simple) { auto headers = New<THeaders>(); @@ -124,7 +126,7 @@ TEST(THttpHeaders, Simple) ASSERT_EQ(std::vector<TString>{{"J"}}, ToVector(headers->GetAll("X-Test"))); } -TEST(THttpHeaders, HeaderCaseIsIrrelevant) +TEST(THeadersTest, HeaderCaseIsIrrelevant) { auto headers = New<THeaders>(); @@ -141,7 +143,7 @@ TEST(THttpHeaders, HeaderCaseIsIrrelevant) } -TEST(THttpHeaders, MessedUpHeaderValuesAreNotAllowed) +TEST(THeadersTest, MessedUpHeaderValuesAreNotAllowed) { auto headers = New<THeaders>(); @@ -1394,65 +1396,21 @@ TEST(TRangeHeadersTest, Test) //////////////////////////////////////////////////////////////////////////////// -struct TTestOutputStream - : public NConcurrency::IAsyncOutputStream -{ - std::exception_ptr Exception; - TError Error; - - TFuture<void> Return() - { - if (Exception) { - std::rethrow_exception(Exception); - } - - return 
MakeFuture(Error); - } - - TFuture<void> Write(const TSharedRef& /*buffer*/) override - { - return Return(); - } - - TFuture<void> Close() override - { - return Return(); - } -}; +class TCompressionTest + : public ::testing::Test +{ }; -DEFINE_REFCOUNTED_TYPE(TTestOutputStream) - -TEST(TCompression, Segfault) -{ - auto out = New<TTestOutputStream>(); - auto compression = CreateCompressingAdapter(out, "br"); - - out->Error = TError("Write failed"); - try { - Y_UNUSED(compression->Write(TSharedRef::FromString("hello"))); - Y_UNUSED(compression->Close()); - } catch (const std::exception& ) { - } - - for (;;) { - try { - Y_UNUSED(compression->Write(TSharedRef::FromString("hello"))); - } catch (const std::exception& ) { - break; - } - } -} - -TEST(TCompression, StreamFlush) +TEST_W(TCompressionTest, Flush) { constexpr int IterationCount = 10; - for (const auto& compression : GetSupportedCompressions()) { - if (compression == IdentityContentEncoding) { + for (const auto& encoding : GetSupportedContentEncodings()) { + if (encoding == IdentityContentEncoding) { continue; } + TStringStream stringStream; auto asyncStream = CreateAsyncAdapter(static_cast<IOutputStream*>(&stringStream)); - auto compressionStream = CreateCompressingAdapter(asyncStream, compression); + auto compressionStream = CreateCompressingAdapter(asyncStream, encoding, GetCurrentInvoker()); auto previousLength = stringStream.Size(); for (int i = 0; i < IterationCount; ++i) { WaitFor(compressionStream->Write(TSharedRef("x", 1, nullptr))) @@ -1460,7 +1418,7 @@ TEST(TCompression, StreamFlush) WaitFor(compressionStream->Flush()) .ThrowOnError(); EXPECT_GT(stringStream.Size(), previousLength) - << "Output for stream " << compression << " has not grown on iteration " << i; + << "Output for stream " << encoding << " has not grown on iteration " << i; previousLength = stringStream.Size(); } WaitFor(compressionStream->Close()) @@ -1470,6 +1428,63 @@ TEST(TCompression, StreamFlush) } } +TEST_W(TCompressionTest, 
Roundtrip) +{ + constexpr size_t Size = 1000; + for (const auto& encoding : GetSupportedContentEncodings()) { + if (encoding == IdentityContentEncoding) { + continue; + } + + TString payload; + for (size_t i = 0; i < Size; i++) { + payload.push_back('a' + RandomNumber<size_t>(26)); + } + + auto compressedPayload = [&] { + TStringStream compressedStream; + auto asyncCompressedStream = CreateAsyncAdapter(static_cast<IOutputStream*>(&compressedStream)); + auto compressingStream = CreateCompressingAdapter(asyncCompressedStream, encoding, GetCurrentInvoker()); + size_t offset = 0; + while (offset < payload.size()) { + size_t len = std::min((size_t)10, payload.size() - offset);//RandomNumber<size_t>(std::min(payload.size() - offset, static_cast<size_t>(100))) + 1; + WaitFor(compressingStream->Write(TSharedRef(payload.data() + offset, len, nullptr))) + .ThrowOnError(); + offset += len; + } + + WaitFor(compressingStream->Close()) + .ThrowOnError(); + WaitFor(asyncCompressedStream->Close()) + .ThrowOnError(); + + return compressedStream.Str(); + }(); + + auto decompressedPayload = [&] { + TString decompressedPayload; + TStringInput compressedStream(compressedPayload); + auto asyncCompressedStream = CreateAsyncAdapter(static_cast<IInputStream*>(&compressedStream), GetCurrentInvoker()); + auto asyncZeroCopyCompressedStream = CreateZeroCopyAdapter(asyncCompressedStream, 1_KB); + auto decompressingStream = CreateDecompressingAdapter(asyncZeroCopyCompressedStream, encoding, GetCurrentInvoker()); + while (true) { + size_t len = RandomNumber<size_t>(100) + 1; + auto buffer = TSharedMutableRef::Allocate(len); + auto bytes = WaitFor(decompressingStream->Read(buffer)) + .ValueOrThrow(); + if (bytes == 0) { + break; + } + decompressedPayload += TStringBuf(buffer.data(), buffer.data() + bytes); + } + + return decompressedPayload; + }(); + + EXPECT_EQ(payload, decompressedPayload); + } +} + //////////////////////////////////////////////////////////////////////////////// } // namespace |