#pragma once #include "zerocopy.h" #include "zerocopy_output.h" #include <utility> #include <util/generic/ptr.h> #include <util/generic/typetraits.h> #include <util/generic/store_policy.h> /** * @addtogroup Streams_Buffered * @{ */ /** * Input stream that wraps the given stream and adds a buffer on top of it, * thus making sure that data is read from the underlying stream in big chunks. * * Note that it does not claim ownership of the underlying stream, so it's up * to the user to free it. */ class TBufferedInput: public IZeroCopyInput { public: TBufferedInput(IInputStream* slave, size_t buflen = 8192); TBufferedInput(TBufferedInput&&) noexcept; TBufferedInput& operator=(TBufferedInput&&) noexcept; ~TBufferedInput() override; /** * Switches the underlying stream to the one provided. Does not clear the * data that was already buffered. * * @param slave New underlying stream. */ void Reset(IInputStream* slave); protected: size_t DoRead(void* buf, size_t len) override; size_t DoReadTo(TString& st, char ch) override; size_t DoSkip(size_t len) override; size_t DoNext(const void** ptr, size_t len) override; private: class TImpl; THolder<TImpl> Impl_; }; /** * Output stream that wraps the given stream and adds a buffer on top of it, * thus making sure that data is written to the underlying stream in big chunks. * * Note that by default this stream does not propagate `Flush` and `Finish` * calls to the underlying stream, instead simply flushing out the buffer. * You can change this behavior by using propagation mode setters. * * Also note that this stream does not claim ownership of the underlying stream, * so it's up to the user to free it. */ class TBufferedOutputBase: public IZeroCopyOutput { public: /** * Constructs a buffered stream that dynamically adjusts the size of the * buffer. This works best when the amount of data that will be passed * through this stream is not known and can range in size from several * kilobytes to several gigabytes. * * @param slave Underlying stream. */ TBufferedOutputBase(IOutputStream* slave); /** * Constructs a buffered stream with the given size of the buffer. * * @param slave Underlying stream. * @param buflen Size of the buffer. */ TBufferedOutputBase(IOutputStream* slave, size_t buflen); TBufferedOutputBase(TBufferedOutputBase&&) noexcept; TBufferedOutputBase& operator=(TBufferedOutputBase&&) noexcept; ~TBufferedOutputBase() override; /** * @param propagate Whether `Flush` and `Finish` calls should * be propagated to the underlying stream. * By default they are not. */ inline void SetPropagateMode(bool propagate) noexcept { SetFlushPropagateMode(propagate); SetFinishPropagateMode(propagate); } /** * @param propagate Whether `Flush` calls should be propagated * to the underlying stream. By default they * are not. */ void SetFlushPropagateMode(bool propagate) noexcept; /** * @param propagate Whether `Finish` calls should be propagated * to the underlying stream. By default they * are not. */ void SetFinishPropagateMode(bool propagate) noexcept; class TImpl; protected: size_t DoNext(void** ptr) override; void DoUndo(size_t len) override; void DoWrite(const void* data, size_t len) override; void DoWriteC(char c) override; void DoFlush() override; void DoFinish() override; private: THolder<TImpl> Impl_; }; /** * Buffered output stream with a fixed-size buffer. * * @see TBufferedOutputBase */ class TBufferedOutput: public TBufferedOutputBase { public: TBufferedOutput(IOutputStream* slave, size_t buflen = 8192); ~TBufferedOutput() override; TBufferedOutput(TBufferedOutput&&) noexcept = default; TBufferedOutput& operator=(TBufferedOutput&&) noexcept = default; }; /** * Buffered output stream that dynamically adjusts the size of the buffer based * on the amount of data that's passed through it. * * @see TBufferedOutputBase */ class TAdaptiveBufferedOutput: public TBufferedOutputBase { public: TAdaptiveBufferedOutput(IOutputStream* slave); ~TAdaptiveBufferedOutput() override; TAdaptiveBufferedOutput(TAdaptiveBufferedOutput&&) noexcept = default; TAdaptiveBufferedOutput& operator=(TAdaptiveBufferedOutput&&) noexcept = default; }; namespace NPrivate { struct TMyBufferedOutput: public TBufferedOutput { inline TMyBufferedOutput(IOutputStream* slave, size_t buflen) : TBufferedOutput(slave, buflen) { SetFinishPropagateMode(true); } }; template <class T> struct TBufferedStreamFor { using TResult = std::conditional_t<std::is_base_of<IInputStream, T>::value, TBufferedInput, TMyBufferedOutput>; }; } /** * A mixin class that turns unbuffered stream into a buffered one. * * Note that using this mixin with a stream that is already buffered won't * result in double buffering, e.g. `TBuffered<TBuffered<TUnbufferedFileInput>>` and * `TBuffered<TUnbufferedFileInput>` are basically the same types. * * Example usage: * @code * TBuffered<TUnbufferedFileInput> file_input(1024, "/path/to/file"); * TBuffered<TUnbufferedFileOutput> file_output(1024, "/path/to/file"); * @endcode * Here 1024 is the size of the buffer. */ template <class TSlave> class TBuffered: private TEmbedPolicy<TSlave>, public ::NPrivate::TBufferedStreamFor<TSlave>::TResult { using TSlaveBase = TEmbedPolicy<TSlave>; using TBufferedBase = typename ::NPrivate::TBufferedStreamFor<TSlave>::TResult; public: template <typename... Args> inline TBuffered(size_t b, Args&&... args) : TSlaveBase(std::forward<Args>(args)...) , TBufferedBase(TSlaveBase::Ptr(), b) { } inline TSlave& Slave() noexcept { return *this->Ptr(); } TBuffered(const TBuffered&) = delete; TBuffered& operator=(const TBuffered&) = delete; TBuffered(TBuffered&&) = delete; TBuffered& operator=(TBuffered&&) = delete; }; /** * A mixin class that turns unbuffered stream into an adaptively buffered one. * Created stream differs from the one created via `TBuffered` template in that * it dynamically adjusts the size of the buffer based on the amount of data * that's passed through it. * * Example usage: * @code * TAdaptivelyBuffered<TUnbufferedFileOutput> file_output("/path/to/file"); * @endcode */ template <class TSlave> class TAdaptivelyBuffered: private TEmbedPolicy<TSlave>, public TAdaptiveBufferedOutput { using TSlaveBase = TEmbedPolicy<TSlave>; public: template <typename... Args> inline TAdaptivelyBuffered(Args&&... args) : TSlaveBase(std::forward<Args>(args)...) , TAdaptiveBufferedOutput(TSlaveBase::Ptr()) { } TAdaptivelyBuffered(const TAdaptivelyBuffered&) = delete; TAdaptivelyBuffered& operator=(const TAdaptivelyBuffered&) = delete; TAdaptivelyBuffered(TAdaptivelyBuffered&& other) = delete; TAdaptivelyBuffered& operator=(TAdaptivelyBuffered&& other) = delete; }; /** @} */