aboutsummaryrefslogblamecommitdiffstats
path: root/util/stream/buffered.h
blob: 0847186141f53665f5daf1389b430d8426cf62f6 (plain) (tree)
1
2
3
4
5
6
7
8
9
10
            
 
                     
                            
 
                  
                             

                                      










                                                                               
                                             
       
                                                              


                                                         
                               
 



                                                                             
       
                                    
          
                                                  
                                                   
                                                         




                         









                                                                                
                                                   
       






                                                                            
                                              





                                                                      
                                                             
 
                                                                   
                                    
 


                                                                                 
       
                                                           


                                          



                                                                                  
                                                        




                                                                                   
                                                         


                
                                       
                                                        
                                   
                             



                         



                                                   
                                                   
                                                                


                                                                     
  




                                                                               
                                                           
                                                  


                                                                                     


                                                      
                                                                     






                                            
                                                                                                                       

      


                                                                          
                                                                                    

                 
                                                                       
                                       

                                                                                                       
                                                                                   


                                              
                                                 





                                             



                                                    
  






                                                                               
                                                                           
           
                                                                                         
                                            


                                              
                                                 

                                                    



                                                                         
  
         
#pragma once

#include "zerocopy.h"
#include "zerocopy_output.h"

#include <utility>
#include <util/generic/ptr.h>
#include <util/generic/typetraits.h>
#include <util/generic/store_policy.h>

/**
 * @addtogroup Streams_Buffered
 * @{
 */

/**
 * Input stream that wraps the given stream and adds a buffer on top of it,
 * thus making sure that data is read from the underlying stream in big chunks.
 *
 * Note that it does not claim ownership of the underlying stream, so it's up
 * to the user to free it.
 */
class TBufferedInput: public IZeroCopyInput {
public:
    TBufferedInput(IInputStream* slave, size_t buflen = 8192);

    TBufferedInput(TBufferedInput&&) noexcept;
    TBufferedInput& operator=(TBufferedInput&&) noexcept;

    ~TBufferedInput() override;

    /**
     * Switches the underlying stream to the one provided. Does not clear the
     * data that was already buffered.
     *
     * @param slave                     New underlying stream.
     */
    void Reset(IInputStream* slave);

protected:
    size_t DoRead(void* buf, size_t len) override;
    size_t DoReadTo(TString& st, char ch) override;
    size_t DoSkip(size_t len) override;
    size_t DoNext(const void** ptr, size_t len) override;

private:
    class TImpl;
    THolder<TImpl> Impl_;
};

/**
 * Output stream that wraps the given stream and adds a buffer on top of it,
 * thus making sure that data is written to the underlying stream in big chunks.
 *
 * Note that by default this stream does not propagate `Flush` and `Finish`
 * calls to the underlying stream, instead simply flushing out the buffer.
 * You can change this behavior by using propagation mode setters.
 *
 * Also note that this stream does not claim ownership of the underlying stream,
 * so it's up to the user to free it.
 */
class TBufferedOutputBase: public IZeroCopyOutput {
public:
    /**
     * Constructs a buffered stream that dynamically adjusts the size of the
     * buffer. This works best when the amount of data that will be passed
     * through this stream is not known and can range in size from several
     * kilobytes to several gigabytes.
     *
     * @param slave                     Underlying stream.
     */
    TBufferedOutputBase(IOutputStream* slave);

    /**
     * Constructs a buffered stream with the given size of the buffer.
     *
     * @param slave                     Underlying stream.
     * @param buflen                    Size of the buffer.
     */
    TBufferedOutputBase(IOutputStream* slave, size_t buflen);

    TBufferedOutputBase(TBufferedOutputBase&&) noexcept;
    TBufferedOutputBase& operator=(TBufferedOutputBase&&) noexcept;

    ~TBufferedOutputBase() override;

    /**
     * @param propagate                 Whether `Flush` and `Finish` calls should
     *                                  be propagated to the underlying stream.
     *                                  By default they are not.
     */
    inline void SetPropagateMode(bool propagate) noexcept {
        SetFlushPropagateMode(propagate);
        SetFinishPropagateMode(propagate);
    }

    /**
     * @param propagate                 Whether `Flush` calls should be propagated
     *                                  to the underlying stream. By default they
     *                                  are not.
     */
    void SetFlushPropagateMode(bool propagate) noexcept;

    /**
     * @param propagate                 Whether `Finish` calls should be propagated
     *                                  to the underlying stream. By default they
     *                                  are not.
     */
    void SetFinishPropagateMode(bool propagate) noexcept;

    class TImpl;

protected:
    size_t DoNext(void** ptr) override;
    void DoUndo(size_t len) override;
    void DoWrite(const void* data, size_t len) override;
    void DoWriteC(char c) override;
    void DoFlush() override;
    void DoFinish() override;

private:
    THolder<TImpl> Impl_;
};

/**
 * Buffered output stream with a fixed-size buffer.
 *
 * @see TBufferedOutputBase
 */
class TBufferedOutput: public TBufferedOutputBase {
public:
    TBufferedOutput(IOutputStream* slave, size_t buflen = 8192);
    ~TBufferedOutput() override;

    TBufferedOutput(TBufferedOutput&&) noexcept = default;
    TBufferedOutput& operator=(TBufferedOutput&&) noexcept = default;
};

/**
 * Buffered output stream that dynamically adjusts the size of the buffer based
 * on the amount of data that's passed through it.
 *
 * @see TBufferedOutputBase
 */
class TAdaptiveBufferedOutput: public TBufferedOutputBase {
public:
    TAdaptiveBufferedOutput(IOutputStream* slave);
    ~TAdaptiveBufferedOutput() override;

    TAdaptiveBufferedOutput(TAdaptiveBufferedOutput&&) noexcept = default;
    TAdaptiveBufferedOutput& operator=(TAdaptiveBufferedOutput&&) noexcept = default;
};

namespace NPrivate {
    struct TMyBufferedOutput: public TBufferedOutput {
        inline TMyBufferedOutput(IOutputStream* slave, size_t buflen)
            : TBufferedOutput(slave, buflen)
        {
            SetFinishPropagateMode(true);
        }
    };

    template <class T>
    struct TBufferedStreamFor {
        using TResult = std::conditional_t<std::is_base_of<IInputStream, T>::value, TBufferedInput, TMyBufferedOutput>;
    };
}

/**
 * A mixin class that turns unbuffered stream into a buffered one.
 *
 * Note that using this mixin with a stream that is already buffered won't
 * result in double buffering, e.g. `TBuffered<TBuffered<TUnbufferedFileInput>>` and
 * `TBuffered<TUnbufferedFileInput>` are basically the same types.
 *
 * Example usage:
 * @code
 * TBuffered<TUnbufferedFileInput> file_input(1024, "/path/to/file");
 * TBuffered<TUnbufferedFileOutput> file_output(1024, "/path/to/file");
 * @endcode
 * Here 1024 is the size of the buffer.
 */
template <class TSlave>
class TBuffered: private TEmbedPolicy<TSlave>, public ::NPrivate::TBufferedStreamFor<TSlave>::TResult {
    using TSlaveBase = TEmbedPolicy<TSlave>;
    using TBufferedBase = typename ::NPrivate::TBufferedStreamFor<TSlave>::TResult;

public:
    template <typename... Args>
    inline TBuffered(size_t b, Args&&... args)
        : TSlaveBase(std::forward<Args>(args)...)
        , TBufferedBase(TSlaveBase::Ptr(), b)
    {
    }

    inline TSlave& Slave() noexcept {
        return *this->Ptr();
    }

    TBuffered(const TBuffered&) = delete;
    TBuffered& operator=(const TBuffered&) = delete;
    TBuffered(TBuffered&&) = delete;
    TBuffered& operator=(TBuffered&&) = delete;
};

/**
 * A mixin class that turns unbuffered stream into an adaptively buffered one.
 * Created stream differs from the one created via `TBuffered` template in that
 * it dynamically adjusts the size of the buffer based on the amount of data
 * that's passed through it.
 *
 * Example usage:
 * @code
 * TAdaptivelyBuffered<TUnbufferedFileOutput> file_output("/path/to/file");
 * @endcode
 */
template <class TSlave>
class TAdaptivelyBuffered: private TEmbedPolicy<TSlave>, public TAdaptiveBufferedOutput {
    using TSlaveBase = TEmbedPolicy<TSlave>;

public:
    template <typename... Args>
    inline TAdaptivelyBuffered(Args&&... args)
        : TSlaveBase(std::forward<Args>(args)...)
        , TAdaptiveBufferedOutput(TSlaveBase::Ptr())
    {
    }

    TAdaptivelyBuffered(const TAdaptivelyBuffered&) = delete;
    TAdaptivelyBuffered& operator=(const TAdaptivelyBuffered&) = delete;
    TAdaptivelyBuffered(TAdaptivelyBuffered&& other) = delete;
    TAdaptivelyBuffered& operator=(TAdaptivelyBuffered&& other) = delete;
};

/** @} */