aboutsummaryrefslogblamecommitdiffstats
path: root/library/cpp/netliba/v6/block_chain.h
blob: e214d5f60320378be49352d510ae4b66407fd4aa (plain) (tree)












































































































































                                                                                                                            
                                                         












































































































































































                                                                                     
#pragma once

#include <util/generic/algorithm.h>
#include <util/generic/list.h>
#include <util/system/shmat.h>
#include <util/generic/noncopyable.h>

namespace NNetliba {
    class TBlockChain {
    public:
        struct TBlock {
            const char* Data;
            int Offset, Size; // Offset in whole chain

            TBlock()
                : Data(nullptr)
                , Offset(0)
                , Size(0)
            {
            }
            TBlock(const char* data, int offset, int sz)
                : Data(data)
                , Offset(offset)
                , Size(sz)
            {
            }
        };

    private:
        typedef TVector<TBlock> TBlockVector;
        TBlockVector Blocks;
        int Size;
        struct TBlockLess {
            bool operator()(const TBlock& b, int offset) const {
                return b.Offset < offset;
            }
        };

    public:
        TBlockChain()
            : Size(0)
        {
        }
        void AddBlock(const void* data, int sz) {
            Blocks.push_back(TBlock((const char*)data, Size, sz));
            Size += sz;
        }
        int GetSize() const {
            return Size;
        }
        const TBlock& GetBlock(int i) const {
            return Blocks[i];
        }
        int GetBlockCount() const {
            return Blocks.ysize();
        }
        int GetBlockIdByOffset(int offset) const {
            TBlockVector::const_iterator i = LowerBound(Blocks.begin(), Blocks.end(), offset, TBlockLess());
            if (i == Blocks.end())
                return Blocks.ysize() - 1;
            if (i->Offset == offset)
                return (int)(i - Blocks.begin());
            return (int)(i - Blocks.begin() - 1);
        }
    };

    //////////////////////////////////////////////////////////////////////////
    class TBlockChainIterator {
        const TBlockChain& Chain;
        int Pos, BlockPos, BlockId;
        bool Failed;

    public:
        TBlockChainIterator(const TBlockChain& chain)
            : Chain(chain)
            , Pos(0)
            , BlockPos(0)
            , BlockId(0)
            , Failed(false)
        {
        }
        void Read(void* dst, int sz) {
            char* dstBuf = (char*)dst;
            while (sz > 0) {
                if (BlockId >= Chain.GetBlockCount()) {
                    // JACKPOT!
                    fprintf(stderr, "reading beyond chain end: BlockId %d, Chain.GetBlockCount() %d, Pos %d, BlockPos %d\n",
                            BlockId, Chain.GetBlockCount(), Pos, BlockPos);
                    Y_ASSERT(0 && "reading beyond chain end");
                    memset(dstBuf, 0, sz);
                    Failed = true;
                    return;
                }
                const TBlockChain::TBlock& blk = Chain.GetBlock(BlockId);
                int copySize = Min(blk.Size - BlockPos, sz);
                memcpy(dstBuf, blk.Data + BlockPos, copySize);
                dstBuf += copySize;
                Pos += copySize;
                BlockPos += copySize;
                sz -= copySize;
                if (BlockPos == blk.Size) {
                    BlockPos = 0;
                    ++BlockId;
                }
            }
        }
        void Seek(int pos) {
            if (pos < 0 || pos > Chain.GetSize()) {
                Y_ASSERT(0);
                Pos = 0;
                BlockPos = 0;
                BlockId = 0;
                return;
            }
            BlockId = Chain.GetBlockIdByOffset(pos);
            const TBlockChain::TBlock& blk = Chain.GetBlock(BlockId);
            Pos = pos;
            BlockPos = Pos - blk.Offset;
        }
        int GetPos() const {
            return Pos;
        }
        int GetSize() const {
            return Chain.GetSize();
        }
        bool HasFailed() const {
            return Failed;
        }
        void Fail() {
            Failed = true;
        }
    };

    //////////////////////////////////////////////////////////////////////////
    class TRopeDataPacket: public TNonCopyable {
        TBlockChain Chain;
        TVector<char*> Buf;
        char *Block, *BlockEnd;
        TList<TVector<char>> DataVectors;
        TIntrusivePtr<TSharedMemory> SharedData;
        TVector<TIntrusivePtr<TThrRefBase>> AttachedStorage;
        char DefaultBuf[128]; // prevent allocs in most cases

        static constexpr int N_DEFAULT_BLOCK_SIZE = 1024;

        char* Alloc(int sz) {
            char* res = nullptr;
            if (BlockEnd - Block < sz) {
                int bufSize = Max((int)N_DEFAULT_BLOCK_SIZE, sz);
                char* newBlock = AllocBuf(bufSize);
                Block = newBlock;
                BlockEnd = Block + bufSize;
                Buf.push_back(newBlock);
            }
            res = Block;
            Block += sz;
            Y_ASSERT(Block <= BlockEnd);
            return res;
        }

    public:
        TRopeDataPacket()
            : Block(DefaultBuf)
            , BlockEnd(DefaultBuf + Y_ARRAY_SIZE(DefaultBuf))
        {
        }
        ~TRopeDataPacket() {
            for (size_t i = 0; i < Buf.size(); ++i)
                FreeBuf(Buf[i]);
        }
        static char* AllocBuf(int sz) {
            return new char[sz];
        }
        static void FreeBuf(char* buf) {
            delete[] buf;
        }

        // buf - pointer to buffer which will be freed with FreeBuf()
        // data - pointer to data start within buf
        // sz - size of useful data
        void AddBlock(char* buf, const char* data, int sz) {
            Buf.push_back(buf);
            Chain.AddBlock(data, sz);
        }
        void AddBlock(TThrRefBase* buf, const char* data, int sz) {
            AttachedStorage.push_back(buf);
            Chain.AddBlock(data, sz);
        }
        //
        void Write(const void* data, int sz) {
            char* buf = Alloc(sz);
            memcpy(buf, data, sz);
            Chain.AddBlock(buf, sz);
        }
        template <class T>
        void Write(const T& data) {
            Write(&data, sizeof(T));
        }
        //// caller guarantees that data will persist all *this lifetime
        //// int this case so we don`t have to copy data to locally held buffer
        //template<class T>
        //void WriteNoCopy(const T *data)
        //{
        //    Chain.AddBlock(data, sizeof(T));
        //}
        // write some array like TVector<>
        //template<class T>
        //void WriteArr(const T &sz)
        //{
        //    int n = (int)sz.size();
        //    Write(n);
        //    if (n > 0)
        //        Write(&sz[0], n * sizeof(sz[0]));
        //}
        void WriteStroka(const TString& sz) {
            int n = (int)sz.size();
            Write(n);
            if (n > 0)
                Write(sz.c_str(), n * sizeof(sz[0]));
        }
        // will take *data ownership, saves copy
        void WriteDestructive(TVector<char>* data) {
            int n = data ? data->ysize() : 0;
            Write(n);
            if (n > 0) {
                TVector<char>& local = DataVectors.emplace_back(std::move(*data));
                Chain.AddBlock(&local[0], local.ysize());
            }
        }
        void AttachSharedData(TIntrusivePtr<TSharedMemory> shm) {
            SharedData = shm;
        }
        TSharedMemory* GetSharedData() const {
            return SharedData.Get();
        }
        const TBlockChain& GetChain() {
            return Chain;
        }
        int GetSize() {
            return Chain.GetSize();
        }
    };

    template <class T>
    inline void ReadArr(TBlockChainIterator* res, T* dst) {
        int n;
        res->Read(&n, sizeof(n));
        if (n >= 0) {
            dst->resize(n);
            if (n > 0)
                res->Read(&(*dst)[0], n * sizeof((*dst)[0]));
        } else {
            dst->resize(0);
            res->Fail();
        }
    }

    template <>
    inline void ReadArr<TString>(TBlockChainIterator* res, TString* dst) {
        int n;
        res->Read(&n, sizeof(n));
        if (n >= 0) {
            dst->resize(n);
            if (n > 0)
                res->Read(dst->begin(), n * sizeof(TString::value_type));
        } else {
            dst->resize(0);
            res->Fail();
        }
    }

    // saves on zeroing *dst with yresize()
    template <class T>
    static void ReadYArr(TBlockChainIterator* res, TVector<T>* dst) {
        int n;
        res->Read(&n, sizeof(n));
        if (n >= 0) {
            dst->yresize(n);
            if (n > 0)
                res->Read(&(*dst)[0], n * sizeof((*dst)[0]));
        } else {
            dst->yresize(0);
            res->Fail();
        }
    }

    template <class T>
    static void Read(TBlockChainIterator* res, T* dst) {
        res->Read(dst, sizeof(T));
    }

    ui32 CalcChecksum(const void* p, int size);
    ui32 CalcChecksum(const TBlockChain& chain);

    class TIncrementalChecksumCalcer {
        i64 TotalSum;
        int Offset;

    public:
        TIncrementalChecksumCalcer()
            : TotalSum(0)
            , Offset(0)
        {
        }
        void AddBlock(const void* p, int size);
        void AddBlockSum(ui32 sum, int size);
        ui32 CalcChecksum();

        static ui32 CalcBlockSum(const void* p, int size);
    };

    inline void AddChain(TIncrementalChecksumCalcer* ics, const TBlockChain& chain) {
        for (int k = 0; k < chain.GetBlockCount(); ++k) {
            const TBlockChain::TBlock& blk = chain.GetBlock(k);
            ics->AddBlock(blk.Data, blk.Size);
        }
    }
}