diff options
| author | qrort <[email protected]> | 2022-11-30 23:47:12 +0300 | 
|---|---|---|
| committer | qrort <[email protected]> | 2022-11-30 23:47:12 +0300 | 
| commit | 22f8ae0e3f5d68b92aecccdf96c1d841a0334311 (patch) | |
| tree | bffa27765faf54126ad44bcafa89fadecb7a73d7 /library/cpp/microbdb | |
| parent | 332b99e2173f0425444abb759eebcb2fafaa9209 (diff) | |
validate canons without yatest_common
Diffstat (limited to 'library/cpp/microbdb')
| -rw-r--r-- | library/cpp/microbdb/align.h | 17 | ||||
| -rw-r--r-- | library/cpp/microbdb/compressed.h | 520 | ||||
| -rw-r--r-- | library/cpp/microbdb/extinfo.h | 127 | ||||
| -rw-r--r-- | library/cpp/microbdb/file.cpp | 220 | ||||
| -rw-r--r-- | library/cpp/microbdb/file.h | 225 | ||||
| -rw-r--r-- | library/cpp/microbdb/hashes.h | 250 | ||||
| -rw-r--r-- | library/cpp/microbdb/header.cpp | 91 | ||||
| -rw-r--r-- | library/cpp/microbdb/header.h | 159 | ||||
| -rw-r--r-- | library/cpp/microbdb/heap.h | 143 | ||||
| -rw-r--r-- | library/cpp/microbdb/input.h | 1027 | ||||
| -rw-r--r-- | library/cpp/microbdb/microbdb.cpp | 1 | ||||
| -rw-r--r-- | library/cpp/microbdb/microbdb.h | 54 | ||||
| -rw-r--r-- | library/cpp/microbdb/noextinfo.proto | 4 | ||||
| -rw-r--r-- | library/cpp/microbdb/output.h | 1049 | ||||
| -rw-r--r-- | library/cpp/microbdb/powersorter.h | 667 | ||||
| -rw-r--r-- | library/cpp/microbdb/reader.h | 354 | ||||
| -rw-r--r-- | library/cpp/microbdb/safeopen.h | 792 | ||||
| -rw-r--r-- | library/cpp/microbdb/sorter.h | 677 | ||||
| -rw-r--r-- | library/cpp/microbdb/sorterdef.h | 19 | ||||
| -rw-r--r-- | library/cpp/microbdb/utility.h | 75 | ||||
| -rw-r--r-- | library/cpp/microbdb/wrappers.h | 637 | 
21 files changed, 7108 insertions, 0 deletions
diff --git a/library/cpp/microbdb/align.h b/library/cpp/microbdb/align.h new file mode 100644 index 00000000000..2f8567f1344 --- /dev/null +++ b/library/cpp/microbdb/align.h @@ -0,0 +1,17 @@ +#pragma once + +#include <util/system/defaults.h> + +using TDatAlign = int; + +static inline size_t DatFloor(size_t size) { +    return (size - 1) & ~(sizeof(TDatAlign) - 1); +} + +static inline size_t DatCeil(size_t size) { +    return DatFloor(size) + sizeof(TDatAlign); +} + +static inline void DatSet(void* ptr, size_t size) { +    *(TDatAlign*)((char*)ptr + DatFloor(size)) = 0; +} diff --git a/library/cpp/microbdb/compressed.h b/library/cpp/microbdb/compressed.h new file mode 100644 index 00000000000..f0c9edfa925 --- /dev/null +++ b/library/cpp/microbdb/compressed.h @@ -0,0 +1,520 @@ +#pragma once + +#include <util/stream/zlib.h> + +#include "microbdb.h" +#include "safeopen.h" + +class TCompressedInputFileManip: public TInputFileManip { +public: +    inline i64 GetLength() const { +        return -1; // Some microbdb logic relies on the size of compressed files being unknown +    } + +    inline i64 Seek(i64 offset, int whence) { +        i64 oldPos = DoGetPosition(); +        i64 newPos = offset; +        switch (whence) { +            case SEEK_CUR: +                newPos += oldPos; +                [[fallthrough]]; // Keeps the compiler happy. Please fix it! +            case SEEK_SET: +                break; +            default: +                return -1L; +        } +        if (oldPos > newPos) { +            VerifyRandomAccess(); +            DoSeek(0, SEEK_SET, IsStreamOpen()); +            oldPos = 0; +        } +        const size_t bufsize = 1 << 12; +        char buf[bufsize]; +        for (i64 i = oldPos; i < newPos; i += bufsize) +            InputStream->Read(buf, (i + (i64)bufsize < newPos) ? 
bufsize : (size_t)(newPos - i)); +        return newPos; +    } + +    i64 RealSeek(i64 offset, int whence) { +        InputStream.Destroy(); +        i64 ret = DoSeek(offset, whence, !!CompressedInput); +        if (ret != -1) +            DoStreamOpen(DoCreateStream(), true); +        return ret; +    } + +protected: +    IInputStream* CreateStream(const TFile& file) override { +        CompressedInput.Reset(new TUnbufferedFileInput(file)); +        return DoCreateStream(); +    } +    inline IInputStream* DoCreateStream() { +        return new TZLibDecompress(CompressedInput.Get(), ZLib::GZip); +        //return new TLzqDecompress(CompressedInput.Get()); +    } +    THolder<IInputStream> CompressedInput; +}; + +class TCompressedBufferedInputFileManip: public TCompressedInputFileManip { +protected: +    IInputStream* CreateStream(const TFile& file) override { +        CompressedInput.Reset(new TFileInput(file, 0x100000)); +        return DoCreateStream(); +    } +}; + +using TCompressedInputPageFile = TInputPageFileImpl<TCompressedInputFileManip>; +using TCompressedBufferedInputPageFile = TInputPageFileImpl<TCompressedBufferedInputFileManip>; + +template <class TVal> +struct TGzKey { +    ui64 Offset; +    TVal Key; + +    static const ui32 RecordSig = TVal::RecordSig + 0x50495a47; + +    TGzKey() { +    } + +    TGzKey(ui64 offset, const TVal& key) +        : Offset(offset) +        , Key(key) +    { +    } + +    size_t SizeOf() const { +        if (this) +            return sizeof(Offset) + ::SizeOf(&Key); +        else { +            size_t sizeOfKey = ::SizeOf((TVal*)NULL); +            return sizeOfKey ? 
(sizeof(Offset) + sizeOfKey) : 0; +        } +    } +}; + +template <class TVal> +class TInZIndexFile: protected  TInDatFileImpl<TGzKey<TVal>> { +    typedef TInDatFileImpl<TGzKey<TVal>> TDatFile; +    typedef TGzKey<TVal> TGzVal; +    typedef typename TDatFile::TRecIter TRecIter; +    typedef typename TRecIter::TPageIter TPageIter; + +public: +    TInZIndexFile() +        : Index0(nullptr) +    { +    } + +    int Open(const char* fname, size_t pages = 1, int pagesOrBytes = 1, ui32* gotRecordSig = nullptr) { +        int ret = TDatFile::Open(fname, pages, pagesOrBytes, gotRecordSig); +        if (ret) +            return ret; +        if (!(Index0 = (TDatPage*)malloc(TPageIter::GetPageSize()))) { +            TDatFile::Close(); +            return MBDB_NO_MEMORY; +        } +        if (SizeOf((TGzVal*)NULL)) +            RecsOnPage = (TPageIter::GetPageSize() - sizeof(TDatPage)) / DatCeil(SizeOf((TGzVal*)NULL)); +        TDatFile::Next(); +        memcpy(Index0, TPageIter::Current(), TPageIter::GetPageSize()); +        return 0; +    } + +    int Close() { +        free(Index0); +        Index0 = NULL; +        return TDatFile::Close(); +    } + +    inline int GetError() const { +        return TDatFile::GetError(); +    } + +    int FindKey(const TVal* akey, const typename TExtInfoType<TVal>::TResult* = NULL) { +        assert(IsOpen()); +        if (!SizeOf((TVal*)NULL)) +            return FindVszKey(akey); +        int pageno; +        i64 offset; +        FindKeyOnPage(pageno, offset, Index0, akey); +        TDatPage* page = TPageIter::GotoPage(pageno + 1); +        int num_add = (int)offset; +        FindKeyOnPage(pageno, offset, page, akey); +        return pageno + num_add; +    } + +    using TDatFile::IsOpen; + +    int FindVszKey(const TVal* akey, const typename TExtInfoType<TVal>::TResult* = NULL) { +        int pageno; +        i64 offset; +        FindVszKeyOnPage(pageno, offset, Index0, akey); +        TDatPage* page = TPageIter::GotoPage(pageno + 
1); +        int num_add = (int)offset; +        FindVszKeyOnPage(pageno, offset, page, akey); +        return pageno + num_add; +    } + +    i64 FindPage(int pageno) { +        if (!SizeOf((TVal*)NULL)) +            return FindVszPage(pageno); +        int recsize = DatCeil(SizeOf((TGzVal*)NULL)); +        TDatPage* page = TPageIter::GotoPage(1 + pageno / RecsOnPage); +        if (!page) // can happen if pageno is beyond EOF +            return -1; +        unsigned int localpageno = pageno % RecsOnPage; +        if (localpageno >= page->RecNum) // can happen if pageno is beyond EOF +            return -1; +        TGzVal* v = (TGzVal*)((char*)page + sizeof(TDatPage) + localpageno * recsize); +        return v->Offset; +    } + +    i64 FindVszPage(int pageno) { +        TGzVal* cur = (TGzVal*)((char*)Index0 + sizeof(TDatPage)); +        TGzVal* prev = cur; +        unsigned int n = 0; +        while (n < Index0->RecNum && cur->Offset <= (unsigned int)pageno) { +            prev = cur; +            cur = (TGzVal*)((char*)cur + DatCeil(SizeOf(cur))); +            n++; +        } +        TDatPage* page = TPageIter::GotoPage(n); +        unsigned int num_add = (unsigned int)(prev->Offset); +        n = 0; +        cur = (TGzVal*)((char*)page + sizeof(TDatPage)); +        while (n < page->RecNum && n + num_add < (unsigned int)pageno) { +            cur = (TGzVal*)((char*)cur + DatCeil(SizeOf(cur))); +            n++; +        } +        if (n == page->RecNum) // can happen if pageno is beyond EOF +            return -1; +        return cur->Offset; +    } + +protected: +    void FindKeyOnPage(int& pageno, i64& offset, TDatPage* page, const TVal* Key) { +        int left = 0; +        int right = page->RecNum - 1; +        int recsize = DatCeil(SizeOf((TGzVal*)NULL)); +        while (left < right) { +            int middle = (left + right) >> 1; +            if (((TGzVal*)((char*)page + sizeof(TDatPage) + middle * recsize))->Key < *Key) +                left = middle 
+ 1; +            else +                right = middle; +        } +        //borders check (left and right) +        pageno = (left == 0 || ((TGzVal*)((char*)page + sizeof(TDatPage) + left * recsize))->Key < *Key) ? left : left - 1; +        offset = ((TGzVal*)((char*)page + sizeof(TDatPage) + pageno * recsize))->Offset; +    } + +    void FindVszKeyOnPage(int& pageno, i64& offset, TDatPage* page, const TVal* key) { +        TGzVal* cur = (TGzVal*)((char*)page + sizeof(TDatPage)); +        ui32 RecordSig = page->RecNum; +        i64 tmpoffset = cur->Offset; +        for (; RecordSig > 0 && cur->Key < *key; --RecordSig) { +            tmpoffset = cur->Offset; +            cur = (TGzVal*)((char*)cur + DatCeil(SizeOf(cur))); +        } +        int idx = page->RecNum - RecordSig - 1; +        pageno = (idx >= 0) ? idx : 0; +        offset = tmpoffset; +    } + +    TDatPage* Index0; +    int RecsOnPage; +}; + +template <class TKey> +class TCompressedIndexedInputPageFile: public TCompressedInputPageFile { +public: +    int GotoPage(int pageno); + +protected: +    TInZIndexFile<TKey> KeyFile; +}; + +template <class TVal, class TKey> +class TDirectCompressedInDatFile: public TDirectInDatFile<TVal, TKey, +                                                           TInDatFileImpl<TVal, TInputRecordIterator<TVal, +                                                                                                     TInputPageIterator<TCompressedIndexedInputPageFile<TKey>>>>> { +}; + +class TCompressedOutputFileManip: public TOutputFileManip { +public: +    inline i64 GetLength() const { +        return -1; // Some microbdb logic relies on the size of compressed files being unknown +    } + +    inline i64 Seek(i64 offset, int whence) { +        i64 oldPos = DoGetPosition(); +        i64 newPos = offset; +        switch (whence) { +            case SEEK_CUR: +                newPos += oldPos; +                [[fallthrough]]; // Keeps the compiler happy. Please fix it! 
+            case SEEK_SET: +                break; +            default: +                return -1L; +        } +        if (oldPos > newPos) +            return -1L; + +        const size_t bufsize = 1 << 12; +        char buf[bufsize] = {0}; +        for (i64 i = oldPos; i < newPos; i += bufsize) +            OutputStream->Write(buf, (i + (i64)bufsize < newPos) ? bufsize : (size_t)(newPos - i)); +        return newPos; +    } + +    i64 RealSeek(i64 offset, int whence) { +        OutputStream.Destroy(); +        i64 ret = DoSeek(offset, whence, !!CompressedOutput); +        if (ret != -1) +            DoStreamOpen(DoCreateStream(), true); +        return ret; +    } + +protected: +    IOutputStream* CreateStream(const TFile& file) override { +        CompressedOutput.Reset(new TUnbufferedFileOutput(file)); +        return DoCreateStream(); +    } +    inline IOutputStream* DoCreateStream() { +        return new TZLibCompress(CompressedOutput.Get(), ZLib::GZip, 1); +    } +    THolder<IOutputStream> CompressedOutput; +}; + +class TCompressedBufferedOutputFileManip: public TCompressedOutputFileManip { +protected: +    IOutputStream* CreateStream(const TFile& file) override { +        CompressedOutput.Reset(new TUnbufferedFileOutput(file)); +        return DoCreateStream(); +    } +    inline IOutputStream* DoCreateStream() { +        return new TZLibCompress(CompressedOutput.Get(), ZLib::GZip, 1, 0x100000); +    } +}; + +using TCompressedOutputPageFile = TOutputPageFileImpl<TCompressedOutputFileManip>; +using TCompressedBufferedOutputPageFile = TOutputPageFileImpl<TCompressedBufferedOutputFileManip>; + +template <class TVal> +class TOutZIndexFile: public TOutDatFileImpl< +                           TGzKey<TVal>, +                           TOutputRecordIterator<TGzKey<TVal>, TOutputPageIterator<TOutputPageFile>, TCallbackIndexer>> { +    typedef TOutDatFileImpl< +        TGzKey<TVal>, +        TOutputRecordIterator<TGzKey<TVal>, 
TOutputPageIterator<TOutputPageFile>, TCallbackIndexer>> +        TDatFile; +    typedef TOutZIndexFile<TVal> TMyType; +    typedef TGzKey<TVal> TGzVal; +    typedef typename TDatFile::TRecIter TRecIter; +    typedef typename TRecIter::TPageIter TPageIter; +    typedef typename TRecIter::TIndexer TIndexer; + +public: +    TOutZIndexFile() { +        TotalRecNum = 0; +        TIndexer::SetCallback(this, DispatchCallback); +    } + +    int Open(const char* fname, size_t pagesize, size_t pages, int pagesOrBytes = 1) { +        int ret = TDatFile::Open(fname, pagesize, pages, pagesOrBytes); +        if (ret) +            return ret; +        if ((ret = TRecIter::GotoPage(1))) +            TDatFile::Close(); +        return ret; +    } + +    int Close() { +        TPageIter::Unfreeze(); +        if (TRecIter::RecNum) +            NextPage(TPageIter::Current()); +        int ret = 0; +        if (Index0.size() && !(ret = TRecIter::GotoPage(0))) { +            typename std::vector<TGzVal>::iterator it, end = Index0.end(); +            for (it = Index0.begin(); it != end; ++it) +                TRecIter::Push(&*it); +            ret = (TPageIter::GetPageNum() != 0) ? MBDB_PAGE_OVERFLOW : TPageIter::GetError(); +        } +        Index0.clear(); +        int ret1 = TDatFile::Close(); +        return ret ? 
ret : ret1; +    } + +protected: +    int TotalRecNum; // should be enough because we have GotoPage(int) +    std::vector<TGzVal> Index0; + +    void NextPage(const TDatPage* page) { +        TGzVal* rec = (TGzVal*)((char*)page + sizeof(TDatPage)); +        Index0.push_back(TGzVal(TotalRecNum, rec->Key)); +        TotalRecNum += TRecIter::RecNum; +    } + +    static void DispatchCallback(void* This, const TDatPage* page) { +        ((TMyType*)This)->NextPage(page); +    } +}; + +template <class TVal, class TKey, class TPageFile = TCompressedOutputPageFile> +class TOutDirectCompressedFileImpl: public TOutDatFileImpl< +                                         TVal, +                                         TOutputRecordIterator<TVal, TOutputPageIterator<TPageFile>, TCallbackIndexer>> { +    typedef TOutDatFileImpl< +        TVal, +        TOutputRecordIterator<TVal, TOutputPageIterator<TPageFile>, TCallbackIndexer>> +        TDatFile; +    typedef TOutDirectCompressedFileImpl<TVal, TKey, TPageFile> TMyType; +    typedef typename TDatFile::TRecIter TRecIter; +    typedef typename TRecIter::TPageIter TPageIter; +    typedef typename TRecIter::TIndexer TIndexer; +    typedef TGzKey<TKey> TMyKey; +    typedef TOutZIndexFile<TKey> TKeyFile; + +protected: +    using TDatFile::Tell; + +public: +    TOutDirectCompressedFileImpl() { +        TIndexer::SetCallback(this, DispatchCallback); +    } + +    int Open(const char* fname, size_t pagesize, size_t ipagesize = 0) { +        char iname[FILENAME_MAX]; +        int ret; +        if (ipagesize == 0) +            ipagesize = pagesize; + +        ret = TDatFile::Open(fname, pagesize, 1, 1); +        ret = ret ? ret : DatNameToIdx(iname, fname); +        ret = ret ? 
ret : KeyFile.Open(iname, ipagesize, 1, 1); +        if (ret) +            TDatFile::Close(); +        return ret; +    } + +    int Close() { +        if (TRecIter::RecNum) +            NextPage(TPageIter::Current()); +        int ret = KeyFile.Close(); +        int ret1 = TDatFile::Close(); +        return ret1 ? ret1 : ret; +    } + +    int GetError() const { +        return TDatFile::GetError() ? TDatFile::GetError() : KeyFile.GetError(); +    } + +protected: +    TKeyFile KeyFile; + +    void NextPage(const TDatPage* page) { +        size_t sz = SizeOf((TMyKey*)NULL); +        TMyKey* rec = KeyFile.Reserve(sz ? sz : MaxSizeOf<TMyKey>()); +        if (rec) { +            rec->Offset = Tell(); +            rec->Key = *(TVal*)((char*)page + sizeof(TDatPage)); +            KeyFile.ResetDat(); +        } +    } + +    static void DispatchCallback(void* This, const TDatPage* page) { +        ((TMyType*)This)->NextPage(page); +    } +}; + +template <class TKey> +int TCompressedIndexedInputPageFile<TKey>::GotoPage(int pageno) { +    if (Error) +        return Error; + +    Eof = 0; + +    i64 offset = KeyFile.FindPage(pageno); +    if (!offset) +        return Error = MBDB_BAD_FILE_SIZE; + +    if (offset != FileManip.RealSeek(offset, SEEK_SET)) +        Error = MBDB_BAD_FILE_SIZE; + +    return Error; +} + +template <typename TVal> +class TCompressedInDatFile: public TInDatFile<TVal, TCompressedInputPageFile> { +public: +    TCompressedInDatFile(const char* name, size_t pages, int pagesOrBytes = 1) +        : TInDatFile<TVal, TCompressedInputPageFile>(name, pages, pagesOrBytes) +    { +    } +}; + +template <typename TVal> +class TCompressedOutDatFile: public TOutDatFile<TVal, TFakeCompression, TCompressedOutputPageFile> { +public: +    TCompressedOutDatFile(const char* name, size_t pagesize, size_t pages, int pagesOrBytes = 1) +        : TOutDatFile<TVal, TFakeCompression, TCompressedOutputPageFile>(name, pagesize, pages, pagesOrBytes) +    { +    } +}; + +template 
<typename TVal, typename TKey, typename TPageFile = TCompressedOutputPageFile> +class TOutDirectCompressedFile: protected  TOutDirectCompressedFileImpl<TVal, TKey, TPageFile> { +    typedef TOutDirectCompressedFileImpl<TVal, TKey, TPageFile> TBase; + +public: +    TOutDirectCompressedFile(const char* name, size_t pagesize, size_t ipagesize = 0) +        : Name(strdup(name)) +        , PageSize(pagesize) +        , IdxPageSize(ipagesize) +    { +    } + +    ~TOutDirectCompressedFile() { +        Close(); +        free(Name); +        Name = NULL; +    } + +    void Open(const char* fname) { +        int ret = TBase::Open(fname, PageSize, IdxPageSize); +        if (ret) +            ythrow yexception() << ErrorMessage(ret, "Failed to open output file", fname); +        free(Name); +        Name = strdup(fname); +    } + +    void Close() { +        int ret; +        if ((ret = TBase::GetError())) +            if (!std::uncaught_exception()) +                ythrow yexception() << ErrorMessage(ret, "Error before closing output file", Name); +        if ((ret = TBase::Close())) +            if (!std::uncaught_exception()) +                ythrow yexception() << ErrorMessage(ret, "Error while closing output file", Name); +    } + +    const char* GetName() const { +        return Name; +    } + +    using TBase::Freeze; +    using TBase::Push; +    using TBase::Reserve; +    using TBase::Unfreeze; + +protected: +    char* Name; +    size_t PageSize, IdxPageSize; +}; + +class TCompressedInterFileTypes { +public: +    typedef TCompressedBufferedOutputPageFile TOutPageFile; +    typedef TCompressedBufferedInputPageFile TInPageFile; +}; diff --git a/library/cpp/microbdb/extinfo.h b/library/cpp/microbdb/extinfo.h new file mode 100644 index 00000000000..c8389e783cd --- /dev/null +++ b/library/cpp/microbdb/extinfo.h @@ -0,0 +1,127 @@ +#pragma once + +#include "header.h" + +#include <library/cpp/packedtypes/longs.h> + +#include <util/generic/typetraits.h> + +#include 
<library/cpp/microbdb/noextinfo.pb.h> + +inline bool operator<(const TNoExtInfo&, const TNoExtInfo&) { +    return false; +} + +namespace NMicroBDB { +    Y_HAS_MEMBER(TExtInfo); + +    template <class, bool> +    struct TSelectExtInfo; + +    template <class T> +    struct TSelectExtInfo<T, false> { +        typedef TNoExtInfo TExtInfo; +    }; + +    template <class T> +    struct TSelectExtInfo<T, true> { +        typedef typename T::TExtInfo TExtInfo; +    }; + +    template <class T> +    class TExtInfoType { +    public: +        static const bool Exists = THasTExtInfo<T>::value; +        typedef typename TSelectExtInfo<T, Exists>::TExtInfo TResult; +    }; + +    Y_HAS_MEMBER(MakeExtKey); + +    template <class, class, bool> +    struct TSelectMakeExtKey; + +    template <class TVal, class TKey> +    struct TSelectMakeExtKey<TVal, TKey, false> { +        static inline void Make(TKey* to, typename TExtInfoType<TKey>::TResult*, const TVal* from, const typename TExtInfoType<TVal>::TResult*) { +            *to = *from; +        } +    }; + +    template <class TVal, class TKey> +    struct TSelectMakeExtKey<TVal, TKey, true> { +        static inline void Make(TKey* to, typename TExtInfoType<TKey>::TResult* toExt, const TVal* from, const typename TExtInfoType<TVal>::TResult* fromExt) { +            TVal::MakeExtKey(to, toExt, from, fromExt); +        } +    }; + +    template <typename T> +    inline size_t SizeOfExt(const T* rec, size_t* /*out*/ extLenSize = nullptr, size_t* /*out*/ extSize = nullptr) { +        if (!TExtInfoType<T>::Exists) { +            if (extLenSize) +                *extLenSize = 0; +            if (extSize) +                *extSize = 0; +            return SizeOf(rec); +        } else { +            size_t sz = SizeOf(rec); +            i64 l; +            int els = in_long(l, (const char*)rec + sz); +            if (extLenSize) +                *extLenSize = static_cast<size_t>(els); +            if (extSize) +                *extSize = 
static_cast<size_t>(l); +            return sz; +        } +    } + +    template <class T> +    bool GetExtInfo(const T* rec, typename TExtInfoType<T>::TResult* extInfo) { +        Y_VERIFY(TExtInfoType<T>::Exists, "GetExtInfo should only be used with extended records"); +        if (!rec) +            return false; +        size_t els; +        size_t es; +        size_t s = SizeOfExt(rec, &els, &es); +        const ui8* raw = (const ui8*)rec + s + els; +        return extInfo->ParseFromArray(raw, es); +    } + +    template <class T> +    const ui8* GetExtInfoRaw(const T* rec, size_t* len) { +        Y_VERIFY(TExtInfoType<T>::Exists, "GetExtInfo should only be used with extended records"); +        if (!rec) { +            *len = 0; +            return nullptr; +        } +        size_t els; +        size_t es; +        size_t s = SizeOfExt(rec, &els, &es); +        *len = els + es; +        return (const ui8*)rec + s; +    } + +    // Compares serialized extInfo (e.g. for stable sort): a shorter payload orders first, equal-length payloads are compared bytewise. Returns <0, 0 or >0. +    template <class T> +    int CompareExtInfo(const T* a, const T* b) { +        Y_VERIFY(TExtInfoType<T>::Exists, "CompareExtInfo should only be used with extended records"); +        size_t elsA, esA; +        size_t elsB, esB; +        const size_t szA = SizeOfExt(a, &elsA, &esA); +        const size_t szB = SizeOfExt(b, &elsB, &esB); // measure b (a was measured twice before, so sizes always matched) +        if (esA != esB) +            return esA < esB ? -1 : 1; // explicit sign; avoids narrowing a size_t difference to int +        else +            return memcmp((const ui8*)a + szA + elsA, (const ui8*)b + szB + elsB, esA); // payload starts after the record body and the length prefix, same layout GetExtInfo reads +    } + +} + +using NMicroBDB::TExtInfoType; + +template <class TVal, class TKey> +struct TMakeExtKey { +    static const bool Exists = NMicroBDB::THasMakeExtKey<TVal>::value; +    static inline void Make(TKey* to, typename TExtInfoType<TKey>::TResult* toExt, const TVal* from, const typename TExtInfoType<TVal>::TResult* fromExt) { +        NMicroBDB::TSelectMakeExtKey<TVal, TKey, Exists>::Make(to, toExt, from, fromExt); +    } +}; diff --git a/library/cpp/microbdb/file.cpp b/library/cpp/microbdb/file.cpp new file mode 100644 index 
00000000000..599a7301a0f --- /dev/null +++ b/library/cpp/microbdb/file.cpp @@ -0,0 +1,220 @@ +#include "file.h" + +#include <fcntl.h> +#include <errno.h> +#include <sys/stat.h> + +#ifdef _win32_ +#define S_ISREG(x) !!(x & S_IFREG) +#endif + +TFileManipBase::TFileManipBase() +    : FileBased(true) +{ +} + +i64 TFileManipBase::DoSeek(i64 offset, int whence, bool isStreamOpen) { +    if (!isStreamOpen) +        return -1; +    VerifyRandomAccess(); +    return File.Seek(offset, (SeekDir)whence); +} + +int TFileManipBase::DoFileOpen(const TFile& file) { +    File = file; +    SetFileBased(IsFileBased()); +    return (File.IsOpen()) ? 0 : MBDB_OPEN_ERROR; +} + +int TFileManipBase::DoFileClose() { +    if (File.IsOpen()) { +        File.Close(); +        return MBDB_ALREADY_INITIALIZED; +    } +    return 0; +} + +int TFileManipBase::IsFileBased() const { +    bool fileBased = true; +#if defined(_win_) +#elif defined(_unix_) +    FHANDLE h = File.GetHandle(); +    struct stat sb; +    fileBased = false; +    if (h != INVALID_FHANDLE && !::fstat(h, &sb) && S_ISREG(sb.st_mode)) { +        fileBased = true; +    } +#else +#error +#endif +    return fileBased; +} + +TInputFileManip::TInputFileManip() +    : InputStream(nullptr) +{ +} + +int TInputFileManip::Open(const char* fname, bool direct) { +    int ret; +    return (ret = DoClose()) ? ret : DoStreamOpen(TFile(fname, RdOnly | (direct ? DirectAligned : EOpenMode()))); +} + +int TInputFileManip::Open(IInputStream& input) { +    int ret; +    return (ret = DoClose()) ? ret : DoStreamOpen(&input); +} + +int TInputFileManip::Open(TAutoPtr<IInputStream> input) { +    int ret; +    return (ret = DoClose()) ? 
ret : DoStreamOpen(input.Release()); +} + +int TInputFileManip::Init(const TFile& file) { +    int ret; +    if (ret = DoClose()) +        return ret; +    DoStreamOpen(file); +    return 0; +} + +int TInputFileManip::Close() { +    DoClose(); +    return 0; +} + +ssize_t TInputFileManip::Read(void* buf, unsigned len) { +    if (!IsStreamOpen()) +        return -1; +    return InputStream->Load(buf, len); +} + +IInputStream* TInputFileManip::CreateStream(const TFile& file) { +    return new TUnbufferedFileInput(file); +} + +TMappedInputPageFile::TMappedInputPageFile() +    : Pagesize(0) +    , Error(0) +    , Pagenum(0) +    , Recordsig(0) +    , Open(false) +{ +    Term(); +} + +TMappedInputPageFile::~TMappedInputPageFile() { +    Term(); +} + +int TMappedInputPageFile::Init(const char* fname, ui32 recsig, ui32* gotRecordSig, bool) { +    Mappedfile.init(fname); +    Open = true; + +    TDatMetaPage* meta = (TDatMetaPage*)Mappedfile.getData(); +    if (gotRecordSig) +        *gotRecordSig = meta->RecordSig; + +    if (meta->MetaSig != METASIG) +        Error = MBDB_BAD_METAPAGE; +    else if (meta->RecordSig != recsig) +        Error = MBDB_BAD_RECORDSIG; + +    if (Error) { +        Mappedfile.term(); +        return Error; +    } + +    size_t fsize = Mappedfile.getSize(); +    if (fsize < METASIZE) +        return Error = MBDB_BAD_FILE_SIZE; +    fsize -= METASIZE; +    if (fsize % meta->PageSize) +        return Error = MBDB_BAD_FILE_SIZE; +    Pagenum = (int)(fsize / meta->PageSize); +    Pagesize = meta->PageSize; +    Recordsig = meta->RecordSig; +    Error = 0; +    return Error; +} + +int TMappedInputPageFile::Term() { +    Mappedfile.term(); +    Open = false; +    return 0; +} + +TOutputFileManip::TOutputFileManip() +    : OutputStream(nullptr) +{ +} + +int TOutputFileManip::Open(const char* fname, EOpenMode mode) { +    if (IsStreamOpen()) { +        return MBDB_ALREADY_INITIALIZED; // should it be closed as TInputFileManip +    } + +    try { +        
if (unlink(fname) && errno != ENOENT) { +            if (strncmp(fname, "/dev/std", 8)) +                return MBDB_OPEN_ERROR; +        } +        TFile file(fname, mode); +        DoStreamOpen(file); +    } catch (const TFileError&) { +        return MBDB_OPEN_ERROR; +    } +    return 0; +} + +int TOutputFileManip::Open(IOutputStream& output) { +    if (IsStreamOpen()) +        return MBDB_ALREADY_INITIALIZED; +    DoStreamOpen(&output); +    return 0; +} + +int TOutputFileManip::Open(TAutoPtr<IOutputStream> output) { +    if (IsStreamOpen()) +        return MBDB_ALREADY_INITIALIZED; +    DoStreamOpen(output.Release()); +    return 0; +} + +int TOutputFileManip::Init(const TFile& file) { +    if (IsStreamOpen()) +        return MBDB_ALREADY_INITIALIZED; // should it be closed as TInputFileManip +    DoStreamOpen(file); +    return 0; +} + +int TOutputFileManip::Rotate(const char* newfname) { +    if (!IsStreamOpen()) { +        return MBDB_NOT_INITIALIZED; +    } + +    try { +        TFile file(newfname, WrOnly | OpenAlways | TruncExisting | ARW | AWOther); +        DoClose(); +        DoStreamOpen(file); +    } catch (const TFileError&) { +        return MBDB_OPEN_ERROR; +    } +    return 0; +} + +int TOutputFileManip::Close() { +    DoClose(); +    return 0; +} + +int TOutputFileManip::Write(const void* buf, unsigned len) { +    if (!IsStreamOpen()) +        return -1; +    OutputStream->Write(buf, len); +    return len; +} + +IOutputStream* TOutputFileManip::CreateStream(const TFile& file) { +    return new TUnbufferedFileOutput(file); +} diff --git a/library/cpp/microbdb/file.h b/library/cpp/microbdb/file.h new file mode 100644 index 00000000000..f7c78183759 --- /dev/null +++ b/library/cpp/microbdb/file.h @@ -0,0 +1,225 @@ +#pragma once + +#include "header.h" + +#include <library/cpp/deprecated/mapped_file/mapped_file.h> + +#include <util/generic/noncopyable.h> +#include <util/stream/file.h> +#include <util/system/filemap.h> + +#define FS_BLOCK_SIZE 512 + 
+class TFileManipBase { +protected: +    TFileManipBase(); + +    virtual ~TFileManipBase() { +    } + +    i64 DoSeek(i64 offset, int whence, bool isStreamOpen); + +    int DoFileOpen(const TFile& file); + +    int DoFileClose(); + +    int IsFileBased() const; + +    inline void SetFileBased(bool fileBased) { +        FileBased = fileBased; +    } + +    inline i64 DoGetPosition() const { +        Y_ASSERT(FileBased); +        return File.GetPosition(); +    } + +    inline i64 DoGetLength() const { +        return (FileBased) ? File.GetLength() : -1; +    } + +    inline void VerifyRandomAccess() const { +        Y_VERIFY(FileBased, "non-file stream can not be accessed randomly"); +    } + +    inline i64 GetPosition() const { +        return (i64)File.GetPosition(); +    } + +private: +    TFile File; +    bool FileBased; +}; + +class TInputFileManip: public TFileManipBase { +public: +    using TFileManipBase::GetPosition; + +    TInputFileManip(); + +    int Open(const char* fname, bool direct = false); + +    int Open(IInputStream& input); + +    int Open(TAutoPtr<IInputStream> input); + +    int Init(const TFile& file); + +    int Close(); + +    ssize_t Read(void* buf, unsigned len); + +    inline bool IsOpen() const { +        return IsStreamOpen(); +    } + +    inline i64 GetLength() const { +        return DoGetLength(); +    } + +    inline i64 Seek(i64 offset, int whence) { +        return DoSeek(offset, whence, IsStreamOpen()); +    } + +    inline i64 RealSeek(i64 offset, int whence) { +        return Seek(offset, whence); +    } + +protected: +    inline bool IsStreamOpen() const { +        return !!InputStream; +    } + +    inline int DoStreamOpen(IInputStream* input, bool fileBased = false) { +        InputStream.Reset(input); +        SetFileBased(fileBased); +        return 0; +    } + +    inline int DoStreamOpen(const TFile& file) { +        int ret; +        return (ret = DoFileOpen(file)) ? 
ret : DoStreamOpen(CreateStream(file), IsFileBased()); +    } + +    virtual IInputStream* CreateStream(const TFile& file); + +    inline bool DoClose() { +        if (IsStreamOpen()) { +            InputStream.Destroy(); +            return DoFileClose(); +        } +        return 0; +    } + +    THolder<IInputStream> InputStream; +}; + +class TMappedInputPageFile: private TNonCopyable { +public: +    TMappedInputPageFile(); + +    ~TMappedInputPageFile(); + +    inline int GetError() const { +        return Error; +    } + +    inline size_t GetPageSize() const { +        return Pagesize; +    } + +    inline int GetLastPage() const { +        return Pagenum; +    } + +    inline ui32 GetRecordSig() const { +        return Recordsig; +    } + +    inline bool IsOpen() const { +        return Open; +    } + +    inline char* GetData() const { +        return Open ? (char*)Mappedfile.getData() : nullptr; +    } + +    inline size_t GetSize() const { +        return Open ? Mappedfile.getSize() : 0; +    } + +protected: +    int Init(const char* fname, ui32 recsig, ui32* gotRecordSig = nullptr, bool direct = false); + +    int Term(); + +    TMappedFile Mappedfile; +    size_t Pagesize; +    int Error; +    int Pagenum; +    ui32 Recordsig; +    bool Open; +}; + +class TOutputFileManip: public TFileManipBase { +public: +    TOutputFileManip(); + +    int Open(const char* fname, EOpenMode mode = WrOnly | CreateAlways | ARW | AWOther); + +    int Open(IOutputStream& output); + +    int Open(TAutoPtr<IOutputStream> output); + +    int Init(const TFile& file); + +    int Rotate(const char* newfname); + +    int Write(const void* buf, unsigned len); + +    int Close(); + +    inline bool IsOpen() const { +        return IsStreamOpen(); +    } + +    inline i64 GetLength() const { +        return DoGetLength(); +    } + +    inline i64 Seek(i64 offset, int whence) { +        return DoSeek(offset, whence, IsStreamOpen()); +    } + +    inline i64 RealSeek(i64 offset, int 
whence) { +        return Seek(offset, whence); +    } + +protected: +    inline bool IsStreamOpen() const { +        return !!OutputStream; +    } + +    inline int DoStreamOpen(IOutputStream* output, bool fileBased = false) { +        OutputStream.Reset(output); +        SetFileBased(fileBased); +        return 0; +    } + +    inline int DoStreamOpen(const TFile& file) { +        int ret; +        return (ret = DoFileOpen(file)) ? ret : DoStreamOpen(CreateStream(file), true); +    } + +    virtual IOutputStream* CreateStream(const TFile& file); + +    inline bool DoClose() { +        if (IsStreamOpen()) { +            OutputStream.Destroy(); +            return DoFileClose(); +        } +        return 0; +    } + +    THolder<IOutputStream> OutputStream; +}; diff --git a/library/cpp/microbdb/hashes.h b/library/cpp/microbdb/hashes.h new file mode 100644 index 00000000000..bfd113c3ba4 --- /dev/null +++ b/library/cpp/microbdb/hashes.h @@ -0,0 +1,250 @@ +#pragma once + +#include <library/cpp/on_disk/st_hash/static_hash.h> +#include <util/system/sysstat.h> +#include <util/stream/mem.h> +#include <util/string/printf.h> +#include <library/cpp/deprecated/fgood/fgood.h> + +#include "safeopen.h" + +/** This file currently implements creation of mappable read-only hash file. +    Basic usage of these "static hashes" is defined in util/static_hash.h (see docs there). +    Additional useful wrappers are available in util/static_hash_map.h + +    There are two ways to create mappable hash file: + +    A) Fill an THashMap/set structure in RAM, then dump it to disk. +       This is usually done by save_hash_to_file* functions defined in static_hash.h +       (see description in static_hash.h). + +    B) Prepare all data using external sorter, then create hash file straight on disk. +       This approach is necessary when there isn't enough RAM to hold entire original THashMap. +       Implemented in this file as TStaticHashBuilder class. 
/// Merge policy for records with equal keys: the new record replaces the old one.
template <typename T>
struct TReplaceMerger {
    /// Returns a copy of newRecord; the old record is ignored.
    T operator()(const T& /*oldRecord*/, const T& newRecord) const {
        return newRecord;
    }
};
MergerType merger; +    TString HashFileName; +    TString OurTmpDir; +    size_t BucketCount; +    int FreeBits; + +    // memSz is the Sorter buffer size; +    // maxRecSize is the maximum size (as reported by size_for_st) of our record(s) +    TStaticHashBuilder(size_t memSz, size_t maxRecSize) +        : SrtIOPageSz((maxRecSize * 16 + 65535) & ~size_t(65535)) +        , WrBufSz(memSz / 16 >= SrtIOPageSz ? memSz / 16 : SrtIOPageSz) +        , Srt("unused", memSz, SrtIOPageSz, WrBufSz, 0) +        , IoRec(sizeof(TIoRec) + maxRecSize) +        , CurrentBlockRecs(sizeof(TIoRec) + maxRecSize) +        , BucketCount(0) +        , FreeBits(0) +    { +    } + +    ~TStaticHashBuilder() { +        Close(); +    } + +    // if tmpDir is supplied, it must exist; +    // bucketCount should be HashBucketCount() of the (estimated) element count +    void Open(const char* fname, size_t bucketCount, const char* tmpDir = nullptr) { +        if (!tmpDir) +            tmpDir = ~(OurTmpDir = Sprintf("%s.temp", fname)); +        Mkdir(tmpDir, MODE0775); +        Srt.Open(tmpDir); +        HashFileName = fname; +        BucketCount = bucketCount; +        int bitCount = 0; +        while (((size_t)1 << bitCount) <= BucketCount && bitCount < int(8 * sizeof(size_t))) +            ++bitCount; +        FreeBits = 8 * sizeof(size_t) - bitCount; +    } + +    void Push(const TValueType& rec) { +        TIoRec* ioRec = MakeIoRec(rec); +        Srt.Push(ioRec); +    } +    TIoRec* MakeIoRec(const TValueType& rec) { +        TIoRec* ioRec = (TIoRec*)IoRec.Data(); +        size_t mask = (1 << FreeBits) - 1; +        size_t hash = Hasher(rec.first); +        ioRec->HashVal = ((hash % BucketCount) << FreeBits) + ((hash / BucketCount) & mask); + +        TMemoryOutput output(ioRec->Buf, IoRec.Capacity() - offsetof(TIoRec, Buf)); +        KeySaver.SaveRecord(&output, rec); +        ioRec->RecSize = output.Buf() - ioRec->Buf; +        return ioRec; +    } + +    bool 
Merge(TVector<std::pair<TKeyType, TMappedType>>& records, size_t newRecordSize) { +        TSthashIterator<const TKeyType, const TMappedType, typename HashType::hasher, +                        typename HashType::key_equal> +            newPtr(CurrentBlockRecs.End() - newRecordSize); +        for (size_t i = 0; i < records.size(); ++i) { +            if (newPtr.KeyEquals(Equals, records[i].first)) { +                TMappedType oldValue = records[i].second; +                TMappedType newValue = newPtr.Value(); +                newValue = merger(oldValue, newValue); +                records[i].second = newValue; +                return true; +            } +        } +        records.push_back(std::make_pair(newPtr.Key(), newPtr.Value())); +        return false; +    } + +    void PutRecord(const char* buf, size_t rec_size, TFILEPtr& f, SizeType& cur_off) { +        f.fsput(buf, rec_size); +        cur_off += rec_size; +    } + +    void Finish() { +        Srt.Sort(); +        // We use variant 1. +        // Variant 1: read sorter once, write records, fseeks to write buckets +        //            (this doesn't allow fname to be stdout) +        // Variant 2: read sorter (probably temp. file) twice: write buckets, then write records +        //            (this allows fname to be stdout but seems to be longer) +        TFILEPtr f(HashFileName, "wb"); +        setvbuf(f, nullptr, _IOFBF, WrBufSz); +        TVector<SizeType> bucketsBuf(WrBufSz, 0); +        // prepare header (note: this code must be unified with save_stl.h) +        typedef sthashtable_nvm_sv<typename HashType::hasher, typename HashType::key_equal, SizeType> sv_type; +        sv_type sv = {Hasher, Equals, BucketCount, 0, 0}; +        // to do: m.b. use just the size of corresponding object? 
+        SizeType cur_off = sizeof(sv_type) + +                           (sv.num_buckets + 1) * sizeof(SizeType); +        SizeType bkt_wroff = sizeof(sv_type), bkt_bufpos = 0, prev_bkt = 0, prev_hash = (SizeType)-1; +        bucketsBuf[bkt_bufpos++] = cur_off; +        // if might me better to write many zeroes here +        f.seek(cur_off, SEEK_SET); +        TVector<std::pair<TKeyType, TMappedType>> currentBlock; +        bool emptyFile = true; +        size_t prevRecSize = 0; +        // seek forward +        while (true) { +            const TIoRec* rec = Srt.Next(); +            if (currentBlock.empty() && !emptyFile) { +                if (rec && prev_hash == rec->HashVal) { +                    Merge(currentBlock, prevRecSize); +                } else { +                    // if there is only one record with this hash, don't recode it, just write +                    PutRecord(CurrentBlockRecs.Data(), prevRecSize, f, cur_off); +                    sv.num_elements++; +                } +            } +            if (!rec || prev_hash != rec->HashVal) { +                // write buckets table +                for (size_t i = 0; i < currentBlock.size(); ++i) { +                    TIoRec* ioRec = MakeIoRec(TValueType(currentBlock[i])); +                    PutRecord(ioRec->Buf, ioRec->RecSize, f, cur_off); +                } +                sv.num_elements += currentBlock.size(); +                currentBlock.clear(); +                CurrentBlockRecs.Clear(); +                if (rec) { +                    prev_hash = rec->HashVal; +                } +            } +            // note: prev_bkt's semantics here is 'cur_bkt - 1', thus we are actually cycling +            //       until cur_bkt == rec->HashVal *inclusively* +            while (!rec || prev_bkt != (rec->HashVal >> FreeBits)) { +                bucketsBuf[bkt_bufpos++] = cur_off; +                if (bkt_bufpos == bucketsBuf.size()) { +                    f.seek(bkt_wroff, SEEK_SET); +      
              size_t sz = bkt_bufpos * sizeof(bucketsBuf[0]); +                    if (f.write(bucketsBuf.begin(), 1, sz) != sz) +                        throw yexception() << "could not write " << sz << " bytes to " << ~HashFileName; +                    bkt_wroff += sz; +                    bkt_bufpos = 0; +                    f.seek(cur_off, SEEK_SET); +                } +                prev_bkt++; +                if (!rec) { +                    break; +                } +                assert(prev_bkt < BucketCount); +            } +            if (!rec) { +                break; +            } +            emptyFile = false; +            CurrentBlockRecs.Append(rec->Buf, rec->RecSize); +            if (!currentBlock.empty()) { +                Merge(currentBlock, rec->RecSize); +            } else { +                prevRecSize = rec->RecSize; +            } +        } +        // finish buckets table +        f.seek(bkt_wroff, SEEK_SET); +        size_t sz = bkt_bufpos * sizeof(bucketsBuf[0]); +        if (sz && f.write(bucketsBuf.begin(), 1, sz) != sz) +            throw yexception() << "could not write " << sz << " bytes to " << ~HashFileName; +        bkt_wroff += sz; +        for (; prev_bkt < BucketCount; prev_bkt++) +            f.fput(cur_off); +        // finally write header +        sv.data_end_off = cur_off; +        f.seek(0, SEEK_SET); +        f.fput(sv); +        f.close(); +    } + +    void Close() { +        Srt.Close(); +        if (+OurTmpDir) +            rmdir(~OurTmpDir); +    } +}; diff --git a/library/cpp/microbdb/header.cpp b/library/cpp/microbdb/header.cpp new file mode 100644 index 00000000000..f4511d6fb62 --- /dev/null +++ b/library/cpp/microbdb/header.cpp @@ -0,0 +1,91 @@ +#include "header.h" + +#include <util/stream/output.h> +#include <util/stream/format.h> + +TString ToString(EMbdbErrors error) { +    TString ret; +    switch (error) { +        case MBDB_ALREADY_INITIALIZED: +            ret = "already initialized"; +      
      break; +        case MBDB_NOT_INITIALIZED: +            ret = "not initialized"; +            break; +        case MBDB_BAD_DESCRIPTOR: +            ret = "bad descriptor"; +            break; +        case MBDB_OPEN_ERROR: +            ret = "open error"; +            break; +        case MBDB_READ_ERROR: +            ret = "read error"; +            break; +        case MBDB_WRITE_ERROR: +            ret = "write error"; +            break; +        case MBDB_CLOSE_ERROR: +            ret = "close error"; +            break; +        case MBDB_EXPECTED_EOF: +            ret = "expected eof"; +            break; +        case MBDB_UNEXPECTED_EOF: +            ret = "unxepected eof"; +            break; +        case MBDB_BAD_FILENAME: +            ret = "bad filename"; +            break; +        case MBDB_BAD_METAPAGE: +            ret = "bad metapage"; +            break; +        case MBDB_BAD_RECORDSIG: +            ret = "bad recordsig"; +            break; +        case MBDB_BAD_FILE_SIZE: +            ret = "bad file size"; +            break; +        case MBDB_BAD_PAGESIG: +            ret = "bad pagesig"; +            break; +        case MBDB_BAD_PAGESIZE: +            ret = "bad pagesize"; +            break; +        case MBDB_BAD_PARM: +            ret = "bad parm"; +            break; +        case MBDB_BAD_SYNC: +            ret = "bad sync"; +            break; +        case MBDB_PAGE_OVERFLOW: +            ret = "page overflow"; +            break; +        case MBDB_NO_MEMORY: +            ret = "no memory"; +            break; +        case MBDB_MEMORY_LEAK: +            ret = "memory leak"; +            break; +        case MBDB_NOT_SUPPORTED: +            ret = "not supported"; +            break; +        default: +            ret = "unknown"; +            break; +    } +    return ret; +} + +TString ErrorMessage(int error, const TString& text, const TString& path, ui32 recordSig, ui32 gotRecordSig) { +    TStringStream str; +    str 
<< text; +    if (path.size()) +        str << " '" << path << "'"; +    str << ": " << ToString(static_cast<EMbdbErrors>(error)); +    if (recordSig && (!gotRecordSig || recordSig != gotRecordSig)) +        str << ". Expected RecordSig: " << Hex(recordSig, HF_ADDX); +    if (recordSig && gotRecordSig && recordSig != gotRecordSig) +        str << ", got: " << Hex(gotRecordSig, HF_ADDX); +    str << ". Last system error text: " << LastSystemErrorText(); +    return str.Str(); +} diff --git a/library/cpp/microbdb/header.h b/library/cpp/microbdb/header.h new file mode 100644 index 00000000000..0951d610ea7 --- /dev/null +++ b/library/cpp/microbdb/header.h @@ -0,0 +1,159 @@ +#pragma once + +#include <util/system/defaults.h> +#include <util/generic/typetraits.h> +#include <util/generic/string.h> +#include <util/str_stl.h> + +#include <stdio.h> + +#define METASIZE (1u << 12) +#define METASIG 0x12345678u +#define PAGESIG 0x87654321u + +enum EMbdbErrors { +    MBDB_ALREADY_INITIALIZED = 200, +    MBDB_NOT_INITIALIZED = 201, +    MBDB_BAD_DESCRIPTOR = 202, +    MBDB_OPEN_ERROR = 203, +    MBDB_READ_ERROR = 204, +    MBDB_WRITE_ERROR = 205, +    MBDB_CLOSE_ERROR = 206, +    MBDB_EXPECTED_EOF = 207, +    MBDB_UNEXPECTED_EOF = 208, +    MBDB_BAD_FILENAME = 209, +    MBDB_BAD_METAPAGE = 210, +    MBDB_BAD_RECORDSIG = 211, +    MBDB_BAD_FILE_SIZE = 212, +    MBDB_BAD_PAGESIG = 213, +    MBDB_BAD_PAGESIZE = 214, +    MBDB_BAD_PARM = 215, +    MBDB_BAD_SYNC = 216, +    MBDB_PAGE_OVERFLOW = 217, +    MBDB_NO_MEMORY = 218, +    MBDB_MEMORY_LEAK = 219, +    MBDB_NOT_SUPPORTED = 220 +}; + +TString ToString(EMbdbErrors error); +TString ErrorMessage(int error, const TString& text, const TString& path = TString(), ui32 recordSig = 0, ui32 gotRecordSig = 0); + +enum EPageFormat { +    MBDB_FORMAT_RAW = 0, +    MBDB_FORMAT_COMPRESSED = 1, +    MBDB_FORMAT_NULL = 255 +}; + +enum ECompressionAlgorithm { +    MBDB_COMPRESSION_ZLIB = 1, +    MBDB_COMPRESSION_FASTLZ = 2, +    
MBDB_COMPRESSION_SNAPPY = 3 +}; + +struct TDatMetaPage { +    ui32 MetaSig; +    ui32 RecordSig; +    ui32 PageSize; +}; + +struct TDatPage { +    ui32 RecNum; //!< number of records on this page +    ui32 PageSig; +    ui32 Format : 2; //!< one of EPageFormat +    ui32 Reserved : 30; +}; + +/// Additional page header with compression info +struct TCompressedPage { +    ui32 BlockCount; +    ui32 Algorithm : 4; +    ui32 Version : 4; +    ui32 Reserved : 24; +}; + +namespace NMicroBDB { +    /// Header of compressed block +    struct TCompressedHeader { +        ui32 Compressed; +        ui32 Original; /// original size of block +        ui32 Count;    /// number of records in block +        ui32 Reserved; +    }; + +    Y_HAS_MEMBER(AssertValid); + +    template <typename T, bool TVal> +    struct TAssertValid { +        void operator()(const T*) { +        } +    }; + +    template <typename T> +    struct TAssertValid<T, true> { +        void operator()(const T* rec) { +            return rec->AssertValid(); +        } +    }; + +    template <typename T> +    void AssertValid(const T* rec) { +        return NMicroBDB::TAssertValid<T, NMicroBDB::THasAssertValid<T>::value>()(rec); +    } + +    Y_HAS_MEMBER(SizeOf); + +    template <typename T, bool TVal> +    struct TGetSizeOf; + +    template <typename T> +    struct TGetSizeOf<T, true> { +        size_t operator()(const T* rec) { +            return rec->SizeOf(); +        } +    }; + +    template <typename T> +    struct TGetSizeOf<T, false> { +        size_t operator()(const T*) { +            return sizeof(T); +        } +    }; + +    inline char* GetFirstRecord(const TDatPage* page) { +        switch (page->Format) { +            case MBDB_FORMAT_RAW: +                return (char*)page + sizeof(TDatPage); +            case MBDB_FORMAT_COMPRESSED: +                // Первая запись на сжатой странице сохраняется несжатой +                // сразу же после всех заголовков. 
+                // Алгоритм сохранения смотреть в TOutputRecordIterator::FlushBuffer +                return (char*)page + sizeof(TDatPage) + sizeof(TCompressedPage) + sizeof(NMicroBDB::TCompressedHeader); +        } +        return (char*)nullptr; +    } +} + +template <typename T> +size_t SizeOf(const T* rec) { +    return NMicroBDB::TGetSizeOf<T, NMicroBDB::THasSizeOf<T>::value>()(rec); +} + +template <typename T> +size_t MaxSizeOf() { +    return sizeof(T); +} + +static inline int DatNameToIdx(char iname[/*FILENAME_MAX*/], const char* dname) { +    if (!dname || !*dname) +        return MBDB_BAD_FILENAME; +    const char* ptr; +    if (!(ptr = strrchr(dname, '/'))) +        ptr = dname; +    if (!(ptr = strrchr(ptr, '.'))) +        ptr = strchr(dname, 0); +    if (ptr - dname > FILENAME_MAX - 5) +        return MBDB_BAD_FILENAME; +    memcpy(iname, dname, ptr - dname); +    strcpy(iname + (ptr - dname), ".idx"); +    return 0; +} diff --git a/library/cpp/microbdb/heap.h b/library/cpp/microbdb/heap.h new file mode 100644 index 00000000000..ef5a53534cb --- /dev/null +++ b/library/cpp/microbdb/heap.h @@ -0,0 +1,143 @@ +#pragma once + +#include "header.h" +#include "extinfo.h" + +#include <util/generic/vector.h> + +#include <errno.h> + +/////////////////////////////////////////////////////////////////////////////// + +/// Default comparator +template <class TVal> +struct TCompareByLess { +    inline bool operator()(const TVal* a, const TVal* b) const { +        return TLess<TVal>()(*a, *b); +    } +}; + +/////////////////////////////////////////////////////////////////////////////// + +template <class TVal, class TIterator, class TCompare = TCompareByLess<TVal>> +class THeapIter { +public: +    int Init(TIterator** iters, int count) { +        Term(); +        if (!count) +            return 0; +        if (!(Heap = (TIterator**)malloc(count * sizeof(TIterator*)))) +            return ENOMEM; + +        Count = count; +        count = 0; +        while (count < 
Count) +            if (count && !(*iters)->Next()) { //here first TIterator is NOT initialized! +                Count--; +                iters++; +            } else { +                Heap[count++] = *iters++; +            } +        count = Count / 2; +        while (--count > 0)     //Heap[0] is not changed! +            Sift(count, Count); //do not try to replace this code by make_heap +        return 0; +    } + +    int Init(TIterator* iters, int count) { +        TVector<TIterator*> a(count); +        for (int i = 0; i < count; ++i) +            a[i] = &iters[i]; +        return Init(&a[0], count); +    } + +    THeapIter() +        : Heap(nullptr) +        , Count(0) +    { +    } + +    THeapIter(TIterator* a, TIterator* b) +        : Heap(nullptr) +        , Count(0) +    { +        TIterator* arr[] = {a, b}; +        if (Init(arr, 2)) +            ythrow yexception() << "can't Init THeapIter"; +    } + +    THeapIter(TVector<TIterator>& v) +        : Heap(nullptr) +        , Count(0) +    { +        if (Init(&v[0], v.size())) { +            ythrow yexception() << "can't Init THeapIter"; +        } +    } + +    ~THeapIter() { +        Term(); +    } + +    inline const TVal* Current() const { +        if (!Count) +            return nullptr; +        return (*Heap)->Current(); +    } + +    inline const TIterator* CurrentIter() const { +        return *Heap; +    } + +    //for ends of last file will use Heap[0] = Heap[0] ! 
/// Scatter read built on TFileManip::Read(): fills the iov buffers strictly in order.
///
/// @param fileManip  object with a Read(buf, len) method that returns the number of bytes
///                   stored, or a negative value on error
/// @param iov        array of destination buffers
/// @param iovcnt     number of entries in iov
/// @return total number of bytes stored, or -1 as soon as any Read() reports an error
///
/// Reading stops at the first short Read(), the same way readv(2) does. Continuing into
/// the next buffer would leave an unfilled gap inside the current one, and the returned
/// byte count would no longer describe contiguous data.
template <class TFileManip>
inline ssize_t Readv(TFileManip& fileManip, const struct iovec* iov, int iovcnt) {
    ssize_t read_count = 0;
    for (int n = 0; n < iovcnt; n++) {
        const ssize_t last_read = fileManip.Read(iov[n].iov_base, iov[n].iov_len);
        if (last_read < 0)
            return -1;
        read_count += last_read;
        if ((size_t)last_read < iov[n].iov_len)
            break; // short read: the remaining buffers must stay untouched
    }
    return read_count;
}
otherwise it gets into infinite loop +        while (num > RecNum) { +            num -= RecNum + 1; +            if (!TPageIter::Next() || !!SelectReader()) { +                RecNum = 0; +                return Rec = nullptr; +            } +            RecNum = TPageIter::Current()->RecNum - 1; +            Rec = Reader->Next(); +        } +        ++num; +        while (--num) +            Next(); +        return Rec; +    } + +    // begin reading from next page +    void Reset() { +        Rec = NULL; +        RecNum = 0; +        if (Reader.Get()) +            Reader->Reset(); +    } + +protected: +    int Init() { +        Rec = nullptr; +        RecNum = 0; +        Format = MBDB_FORMAT_NULL; +        return 0; +    } + +    int Term() { +        Reader.Reset(nullptr); +        Format = MBDB_FORMAT_NULL; +        Rec = nullptr; +        RecNum = 0; +        return 0; +    } + +    const TVal* GotoPage(int pageno) { +        if (!TPageIter::GotoPage(pageno) || !!SelectReader()) +            return Rec = nullptr; +        RecNum = TPageIter::Current()->RecNum - 1; +        return Rec = Reader->Next(); +    } + +    int SelectReader() { +        if (!TPageIter::Current()) +            return MBDB_UNEXPECTED_EOF; +        if (ui32(Format) != TPageIter::Current()->Format) { +            switch (TPageIter::Current()->Format) { +                case MBDB_FORMAT_RAW: +                    Reader.Reset(new NMicroBDB::TRawPageReader<TVal, TPageIter>(this)); +                    break; +                case MBDB_FORMAT_COMPRESSED: +                    Reader.Reset(new NMicroBDB::TCompressedReader<TVal, TPageIter>(this)); +                    break; +                default: +                    return MBDB_NOT_SUPPORTED; +            } +            Format = EPageFormat(TPageIter::Current()->Format); +        } else { +            Y_ASSERT(Reader.Get() != nullptr); +            Reader->Reset(); +        } +        return 0; +    } + +    const TVal* Rec; +    
TReaderHolder Reader; +    int RecNum; //!< number of records on the current page after the current record +    EPageFormat Format; +}; + +template <class TBaseReader> +class TInputPageIterator: public TBaseReader { +public: +    typedef TBaseReader TReader; + +    TInputPageIterator() +        : Buf(nullptr) +    { +        Term(); +    } + +    ~TInputPageIterator() { +        Term(); +    } + +    TDatPage* Current() { +        return CurPage; +    } + +    int Freeze() { +        return (Frozen = (PageNum == -1) ? 0 : PageNum); +    } + +    void Unfreeze() { +        Frozen = -1; +    } + +    inline int IsFrozen() const { +        return Frozen + 1; +    } + +    inline size_t GetPageSize() const { +        return TReader::GetPageSize(); +    } + +    inline int GetPageNum() const { +        return PageNum; +    } + +    inline int IsEof() const { +        return Eof; +    } + +    TDatPage* Next() { +        if (PageNum >= Maxpage && ReadBuf()) { +            Eof = Eof ? Eof : TReader::IsEof(); +            return CurPage = nullptr; +        } +        return CurPage = (TDatPage*)(Buf + ((++PageNum) % Bufpages) * GetPageSize()); +    } + +    TDatPage* GotoPage(int pageno) { +        if (pageno <= Maxpage && pageno >= (Maxpage - Pages + 1)) { +            PageNum = pageno; +            return CurPage = (TDatPage*)(Buf + (PageNum % Bufpages) * GetPageSize()); +        } +        if (IsFrozen() || TReader::GotoPage(pageno)) +            return nullptr; +        Maxpage = PageNum = pageno - 1; +        Eof = 0; +        return Next(); +    } + +protected: +    int Init(size_t pages, int pagesOrBytes) { +        Term(); +        if (pagesOrBytes == -1) +            Bufpages = TReader::GetLastPage(); +        else if (pagesOrBytes) +            Bufpages = pages; +        else +            Bufpages = pages / GetPageSize(); +        if (!TReader::GetLastPage()) { +            Bufpages = 0; +            assert(Eof == 1); +            return 0; +        } +        
int lastPage = TReader::GetLastPage(); +        if (lastPage >= 0) +            Bufpages = (int)Min(lastPage, Bufpages); +        Bufpages = Max(2, Bufpages); +        Eof = 0; +        ABuf.Alloc(Bufpages * GetPageSize()); +        return (Buf = ABuf.Begin()) ? 0 : ENOMEM; +        //        return (Buf = (char*)malloc(Bufpages * GetPageSize())) ? 0 : ENOMEM; +    } + +    int Term() { +        //        free(Buf); +        ABuf.Dealloc(); +        Buf = nullptr; +        Maxpage = PageNum = Frozen = -1; +        Bufpages = 0; +        Pages = 0; +        Eof = 1; +        CurPage = nullptr; +        return 0; +    } + +    int ReadBuf() { +        int nvec; +        iovec vec[2]; +        int maxpage = (Frozen == -1 ? Maxpage + 1 : Frozen) + Bufpages - 1; +        int minpage = Maxpage + 1; +        if (maxpage < minpage) +            return EAGAIN; +        minpage %= Bufpages; +        maxpage %= Bufpages; +        if (maxpage < minpage) { +            vec[0].iov_base = Buf + GetPageSize() * minpage; +            vec[0].iov_len = GetPageSize() * (Bufpages - minpage); +            vec[1].iov_base = Buf; +            vec[1].iov_len = GetPageSize() * (maxpage + 1); +            nvec = 2; +        } else { +            vec[0].iov_base = Buf + GetPageSize() * minpage; +            vec[0].iov_len = GetPageSize() * (maxpage - minpage + 1); +            nvec = 1; +        } +        TReader::ReadPages(vec, nvec, &Pages); +        Maxpage += Pages; +        return !Pages; +    } + +    int Maxpage, PageNum, Frozen, Bufpages, Eof, Pages; +    TDatPage* CurPage; +    //    TMappedArray<char> ABuf; +    TMappedAllocation ABuf; +    char* Buf; +}; + +template <class TBaseReader> +class TInputPageIteratorMT: public TBaseReader { +public: +    typedef TBaseReader TReader; + +    TInputPageIteratorMT() +        : CurBuf(0) +        , CurReadBuf(0) +        , Buf(nullptr) +    { +        Term(); +    } + +    ~TInputPageIteratorMT() { +        Term(); +    } + +    TDatPage* 
Current() { +        return CurPage; +    } + +    int Freeze() { +        return (Frozen = (PageNum == -1) ? 0 : PageNum); +    } + +    void Unfreeze() { +        Frozen = -1; +    } + +    inline int IsFrozen() const { +        return Frozen + 1; +    } + +    inline size_t GetPageSize() const { +        return TReader::GetPageSize(); +    } + +    inline int GetPageNum() const { +        return PageNum; +    } + +    inline int IsEof() const { +        return Eof; +    } + +    TDatPage* Next() { +        if (Eof) +            return CurPage = nullptr; +        if (PageNum >= Maxpage && ReadBuf()) { +            Eof = Eof ? Eof : TReader::IsEof(); +            return CurPage = nullptr; +        } +        return CurPage = (TDatPage*)(Buf + ((++PageNum) % Bufpages) * GetPageSize()); +    } + +    TDatPage* GotoPage(int pageno) { +        if (pageno <= Maxpage && pageno >= (Maxpage - Pages + 1)) { +            PageNum = pageno; +            return CurPage = (TDatPage*)(Buf + (PageNum % Bufpages) * GetPageSize()); +        } +        if (IsFrozen() || TReader::GotoPage(pageno)) +            return nullptr; +        Maxpage = PageNum = pageno - 1; +        Eof = 0; +        return Next(); +    } + +    void ReadPages() { +        //        fprintf(stderr, "ReadPages started\n"); +        bool eof = false; +        while (!eof) { +            QEvent[CurBuf].Wait(); +            if (Finish) +                return; +            int pages = ReadCurBuf(Bufs[CurBuf]); +            PagesM[CurBuf] = pages; +            eof = !pages; +            AEvent[CurBuf].Signal(); +            CurBuf ^= 1; +        } +    } + +protected: +    int Init(size_t pages, int pagesOrBytes) { +        Term(); +        if (pagesOrBytes == -1) +            Bufpages = TReader::GetLastPage(); +        else if (pagesOrBytes) +            Bufpages = pages; +        else +            Bufpages = pages / GetPageSize(); +        if (!TReader::GetLastPage()) { +            Bufpages = 0; +            
assert(Eof == 1); +            return 0; +        } +        int lastPage = TReader::GetLastPage(); +        if (lastPage >= 0) +            Bufpages = (int)Min(lastPage, Bufpages); +        Bufpages = Max(2, Bufpages); +        Eof = 0; +        ABuf.Alloc(Bufpages * GetPageSize() * 2); +        Bufs[0] = ABuf.Begin(); +        Bufs[1] = Bufs[0] + Bufpages * GetPageSize(); +        //        return (Buf = (char*)malloc(Bufpages * GetPageSize())) ? 0 : ENOMEM; +        Finish = false; +        ReadThread = std::thread([this]() { +            TThread::SetCurrentThreadName("DatReader"); +            ReadPages(); +        }); +        QEvent[0].Signal(); +        return Bufs[0] ? 0 : ENOMEM; +    } + +    void StopThread() { +        Finish = true; +        QEvent[0].Signal(); +        QEvent[1].Signal(); +        ReadThread.join(); +    } + +    int Term() { +        //        free(Buf); +        if (ReadThread.joinable()) +            StopThread(); +        ABuf.Dealloc(); +        Buf = nullptr; +        Bufs[0] = nullptr; +        Bufs[1] = nullptr; +        Maxpage = MaxpageR = PageNum = Frozen = -1; +        Bufpages = 0; +        Pages = 0; +        Eof = 1; +        CurPage = nullptr; +        return 0; +    } + +    int ReadCurBuf(char* buf) { +        int nvec; +        iovec vec[2]; +        int maxpage = (Frozen == -1 ? 
MaxpageR + 1 : Frozen) + Bufpages - 1; +        int minpage = MaxpageR + 1; +        if (maxpage < minpage) +            return EAGAIN; +        minpage %= Bufpages; +        maxpage %= Bufpages; +        if (maxpage < minpage) { +            vec[0].iov_base = buf + GetPageSize() * minpage; +            vec[0].iov_len = GetPageSize() * (Bufpages - minpage); +            vec[1].iov_base = buf; +            vec[1].iov_len = GetPageSize() * (maxpage + 1); +            nvec = 2; +        } else { +            vec[0].iov_base = buf + GetPageSize() * minpage; +            vec[0].iov_len = GetPageSize() * (maxpage - minpage + 1); +            nvec = 1; +        } +        int pages; +        TReader::ReadPages(vec, nvec, &pages); +        MaxpageR += pages; +        return pages; +    } + +    int ReadBuf() { +        QEvent[CurReadBuf ^ 1].Signal(); +        AEvent[CurReadBuf].Wait(); +        Buf = Bufs[CurReadBuf]; +        Maxpage += (Pages = PagesM[CurReadBuf]); +        CurReadBuf ^= 1; +        return !Pages; +    } + +    int Maxpage, MaxpageR, PageNum, Frozen, Bufpages, Eof, Pages; +    TDatPage* CurPage; +    //    TMappedArray<char> ABuf; +    ui32 CurBuf; +    ui32 CurReadBuf; +    TMappedAllocation ABuf; +    char* Buf; +    char* Bufs[2]; +    ui32 PagesM[2]; +    TAutoEvent QEvent[2]; +    TAutoEvent AEvent[2]; +    std::thread ReadThread; +    bool Finish; +}; + +template <typename TFileManip> +class TInputPageFileImpl: private TNonCopyable { +protected: +    TFileManip FileManip; + +public: +    TInputPageFileImpl() +        : Pagesize(0) +        , Fd(-1) +        , Eof(1) +        , Error(0) +        , Pagenum(0) +        , Recordsig(0) +    { +        Term(); +    } + +    ~TInputPageFileImpl() { +        Term(); +    } + +    inline int IsEof() const { +        return Eof; +    } + +    inline int GetError() const { +        return Error; +    } + +    inline size_t GetPageSize() const { +        return Pagesize; +    } + +    inline int GetLastPage() 
const { +        return Pagenum; +    } + +    inline ui32 GetRecordSig() const { +        return Recordsig; +    } + +    inline bool IsOpen() const { +        return FileManip.IsOpen(); +    } + +protected: +    int Init(const char* fname, ui32 recsig, ui32* gotrecsig = nullptr, bool direct = false) { +        Error = FileManip.Open(fname, direct); +        return Error ? Error : Init(TFile(), recsig, gotrecsig); +    } + +    int Init(const TFile& file, ui32 recsig, ui32* gotrecsig = nullptr) { +        if (!file.IsOpen() && !FileManip.IsOpen()) +            return MBDB_NOT_INITIALIZED; +        if (file.IsOpen() && FileManip.IsOpen()) +            return MBDB_ALREADY_INITIALIZED; +        if (file.IsOpen()) { +            Error = FileManip.Init(file); +            if (Error) +                return Error; +        } + +        //        TArrayHolder<ui8> buf(new ui8[METASIZE + FS_BLOCK_SIZE]); +        //        ui8* ptr = (buf.Get() + FS_BLOCK_SIZE - ((ui64)buf.Get() & (FS_BLOCK_SIZE - 1))); +        TMappedArray<ui8> buf; +        buf.Create(METASIZE); +        ui8* ptr = &buf[0]; +        TDatMetaPage* meta = (TDatMetaPage*)ptr; +        ssize_t size = METASIZE; +        ssize_t ret; +        while (size && (ret = FileManip.Read(ptr, (unsigned)size)) > 0) { +            Y_ASSERT(ret <= size); +            size -= ret; +            ptr += ret; +        } +        if (size) { +            FileManip.Close(); +            return Error = MBDB_BAD_METAPAGE; +        } +        if (gotrecsig) +            *gotrecsig = meta->RecordSig; +        return Init(TFile(), meta, recsig); +    } + +    int Init(TAutoPtr<IInputStream> input, ui32 recsig, ui32* gotrecsig = nullptr) { +        if (!input && !FileManip.IsOpen()) +            return MBDB_NOT_INITIALIZED; +        if (FileManip.IsOpen()) +            return MBDB_ALREADY_INITIALIZED; + +        Error = FileManip.Open(input); +        if (Error) +            return Error; + +        TArrayHolder<ui8> buf(new 
ui8[METASIZE]); +        ui8* ptr = buf.Get(); +        ssize_t size = METASIZE; +        ssize_t ret; +        while (size && (ret = FileManip.Read(ptr, (unsigned)size)) > 0) { +            Y_ASSERT(ret <= size); +            size -= ret; +            ptr += ret; +        } +        if (size) { +            FileManip.Close(); +            return Error = MBDB_BAD_METAPAGE; +        } +        TDatMetaPage* meta = (TDatMetaPage*)buf.Get(); +        if (gotrecsig) +            *gotrecsig = meta->RecordSig; +        return Init(TFile(), meta, recsig); +    } + +    int Init(const TFile& file, const TDatMetaPage* meta, ui32 recsig) { +        if (!file.IsOpen() && !FileManip.IsOpen()) +            return MBDB_NOT_INITIALIZED; +        if (file.IsOpen() && FileManip.IsOpen()) +            return MBDB_ALREADY_INITIALIZED; +        if (file.IsOpen()) { +            Error = FileManip.Init(file); +            if (Error) +                return Error; +        } + +        if (meta->MetaSig != METASIG) +            Error = MBDB_BAD_METAPAGE; +        else if (meta->RecordSig != recsig) +            Error = MBDB_BAD_RECORDSIG; + +        if (Error) { +            FileManip.Close(); +            return Error; +        } + +        i64 flength = FileManip.GetLength(); +        if (flength >= 0) { +            i64 fsize = flength; +            fsize -= METASIZE; +            if (fsize % meta->PageSize) +                return Error = MBDB_BAD_FILE_SIZE; +            Pagenum = (int)(fsize / meta->PageSize); +        } else { +            Pagenum = -1; +        } +        Pagesize = meta->PageSize; +        Recordsig = meta->RecordSig; +        Error = Eof = 0; +        return Error; +    } + +    int ReadPages(iovec* vec, int nvec, int* pages) { +        *pages = 0; + +        if (Eof || Error) +            return Error; + +        ssize_t size = 0, delta = 0, total = 0; +        iovec* pvec = vec; +        int vsize = nvec; + +        while (vsize && (size = Readv(FileManip, 
pvec, (int)Min(vsize, 16))) > 0) { +            total += size; +            if (delta) { +                size += delta; +                pvec->iov_len += delta; +                pvec->iov_base = (char*)pvec->iov_base - delta; +                delta = 0; +            } +            while (size) { +                if ((size_t)size >= pvec->iov_len) { +                    size -= pvec->iov_len; +                    ++pvec; +                    --vsize; +                } else { +                    delta = size; +                    pvec->iov_len -= size; +                    pvec->iov_base = (char*)pvec->iov_base + size; +                    size = 0; +                } +            } +        } +        if (delta) { +            pvec->iov_len += delta; +            pvec->iov_base = (char*)pvec->iov_base - delta; +        } +        if (size < 0) +            return Error = errno ? errno : MBDB_READ_ERROR; +        if (total % Pagesize) +            return Error = MBDB_BAD_FILE_SIZE; +        if (vsize) +            Eof = 1; +        *pages = total / Pagesize; // it would be better to assign it after the for-loops +        for (; total; ++vec, total -= size) +            for (size = 0; size < total && (size_t)size < vec->iov_len; size += Pagesize) +                if (((TDatPage*)((char*)vec->iov_base + size))->PageSig != PAGESIG) +                    return Error = MBDB_BAD_PAGESIG; +        return Error; +    } + +    int GotoPage(int page) { +        if (Error) +            return Error; +        Eof = 0; +        i64 offset = (i64)page * Pagesize + METASIZE; +        if (offset != FileManip.Seek(offset, SEEK_SET)) +            Error = MBDB_BAD_FILE_SIZE; +        return Error; +    } + +    int Term() { +        return FileManip.Close(); +    } + +    size_t Pagesize; +    int Fd; +    int Eof; +    int Error; +    int Pagenum; //!< number of pages in this file +    ui32 Recordsig; +}; + +template <class TBaseReader> +class TMappedInputPageIterator: public 
TBaseReader { +public: +    typedef TBaseReader TReader; + +    TMappedInputPageIterator() { +        Term(); +    } + +    ~TMappedInputPageIterator() { +        Term(); +    } + +    TDatPage* Current() { +        return CurPage; +    } + +    inline size_t GetPageSize() const { +        return TReader::GetPageSize(); +    } + +    inline int GetPageNum() const { +        return PageNum; +    } + +    inline int IsEof() const { +        return Eof; +    } + +    inline int IsFrozen() const { +        return 0; +    } + +    TDatPage* Next() { +        i64 pos = (i64)(++PageNum) * GetPageSize() + METASIZE; +        if (pos < 0 || pos >= (i64)TReader::GetSize()) { +            Eof = 1; +            return CurPage = nullptr; +        } +        return CurPage = (TDatPage*)((char*)TReader::GetData() + pos); +    } + +protected: +    int Init(size_t /*pages*/, int /*pagesOrBytes*/) { +        Term(); +        Eof = 0; +        return 0; +    } + +    int Term() { +        PageNum = -1; +        Eof = 1; +        CurPage = nullptr; +        return 0; +    } + +    TDatPage* GotoPage(int pageno) { +        PageNum = pageno - 1; +        Eof = 0; +        return Next(); +    } + +    int PageNum, Eof, Pages, Pagenum; +    TDatPage* CurPage; +}; + +using TInputPageFile = TInputPageFileImpl<TInputFileManip>; + +template <class TVal, +          typename TBaseRecIter = TInputRecordIterator<TVal, TInputPageIterator<TInputPageFile>>> +class TInDatFileImpl: public TBaseRecIter { +public: +    typedef TBaseRecIter TRecIter; +    typedef typename TRecIter::TPageIter TPageIter; +    typedef typename TRecIter::TPageIter::TReader TReader; +    using TRecIter::GotoPage; + +    int Open(const char* fname, size_t pages = 1, int pagesOrBytes = 1, ui32* gotRecordSig = nullptr, bool direct = false) { +        int ret = TReader::Init(fname, TVal::RecordSig, gotRecordSig, direct); +        return ret ? 
ret : Open2(pages, pagesOrBytes); +    } + +    int Open(const TFile& file, size_t pages = 1, int pagesOrBytes = 1, ui32* gotRecordSig = nullptr) { +        int ret = TReader::Init(file, TVal::RecordSig, gotRecordSig); +        return ret ? ret : Open2(pages, pagesOrBytes); +    } + +    int Open(TAutoPtr<IInputStream> input, size_t pages = 1, int pagesOrBytes = 1, ui32* gotRecordSig = nullptr) { +        int ret = TReader::Init(input, TVal::RecordSig, gotRecordSig); +        return ret ? ret : Open2(pages, pagesOrBytes); +    } + +    int Open(const TFile& file, const TDatMetaPage* meta, size_t pages = 1, int pagesOrBytes = 1) { +        int ret = TReader::Init(file, meta, TVal::RecordSig); +        return ret ? ret : Open2(pages, pagesOrBytes); +    } + +    int Close() { +        int ret1 = TRecIter::Term(); +        int ret2 = TPageIter::Term(); +        int ret3 = TReader::Term(); +        return ret1 ? ret1 : ret2 ? ret2 : ret3; +    } + +    const TVal* GotoLastPage() { +        return TReader::GetLastPage() <= 0 ? 
nullptr : TRecIter::GotoPage(TReader::GetLastPage() - 1); +    } + +private: +    int Open2(size_t pages, int pagesOrBytes) { +        int ret = TPageIter::Init(pages, pagesOrBytes); +        if (!ret) +            ret = TRecIter::Init(); +        if (ret) +            Close(); +        return ret; +    } +}; + +template <class TVal> +class TInIndexFile: protected  TInDatFileImpl<TVal> { +    typedef TInDatFileImpl<TVal> TDatFile; +    typedef typename TDatFile::TRecIter TRecIter; +    typedef typename TRecIter::TPageIter TPageIter; +    typedef typename TExtInfoType<TVal>::TResult TExtInfo; + +public: +    using TDatFile::IsOpen; + +    TInIndexFile() +        : Index0(nullptr) +    { +    } + +    int Open(const char* fname, size_t pages = 2, int pagesOrBytes = 1, ui32* gotRecordSig = nullptr) { +        int ret = TDatFile::Open(fname, pages, pagesOrBytes, gotRecordSig); +        if (ret) +            return ret; +        if (!(Index0 = (TDatPage*)malloc(TPageIter::GetPageSize()))) { +            TDatFile::Close(); +            return MBDB_NO_MEMORY; +        } +        if (!TExtInfoType<TVal>::Exists && SizeOf((TVal*)nullptr)) +            RecsOnPage = (TPageIter::GetPageSize() - sizeof(TDatPage)) / DatCeil(SizeOf((TVal*)nullptr)); +        TDatFile::Next(); +        memcpy(Index0, TPageIter::Current(), TPageIter::GetPageSize()); +        return 0; +    } + +    int Close() { +        free(Index0); +        Index0 = nullptr; +        return TDatFile::Close(); +    } + +    inline int GetError() const { +        return TDatFile::GetError(); +    } + +    int FindKey(const TVal* akey, const TExtInfo* extInfo = nullptr) { +        assert(IsOpen()); +        if (TExtInfoType<TVal>::Exists || !SizeOf((TVal*)nullptr)) +            return FindVszKey(akey, extInfo); +        int num = FindKeyOnPage(Index0, akey); +        TDatPage* page = TPageIter::GotoPage(num + 1); +        if (!page) +            return 0; +        num = FindKeyOnPage(page, akey); +        num += 
(TPageIter::GetPageNum() - 1) * RecsOnPage; +        return num; +    } + +    int FindVszKey(const TVal* akey, const TExtInfo* extInfo = NULL) { +        int num = FindVszKeyOnPage(Index0, akey, extInfo); +        int num_add = 0; +        for (int p = 0; p < num; p++) { +            TDatPage* page = TPageIter::GotoPage(p + 1); +            if (!page) +                return 0; +            num_add += page->RecNum; +        } +        TDatPage* page = TPageIter::GotoPage(num + 1); +        if (!page) +            return 0; +        num = FindVszKeyOnPage(page, akey, extInfo); +        num += num_add; +        return num; +    } + +protected: +    int FindKeyOnPage(TDatPage* page, const TVal* key) { +        int left = 0; +        int right = page->RecNum - 1; +        int recsize = DatCeil(SizeOf((TVal*)nullptr)); +        while (left < right) { +            int middle = (left + right) >> 1; +            if (*((TVal*)((char*)page + sizeof(TDatPage) + middle * recsize)) < *key) +                left = middle + 1; +            else +                right = middle; +        } +        //borders check (left and right) +        return (left == 0 || *((TVal*)((char*)page + sizeof(TDatPage) + left * recsize)) < *key) ? 
left : left - 1; +    } + +    // will deserialize rawExtInfoA to extInfoA only if necessary +    inline bool KeyLess_(const TVal* a, const TVal* b, +                         TExtInfo* extInfoA, const TExtInfo* extInfoB, +                         const ui8* rawExtInfoA, size_t rawLen) { +        if (*a < *b) { +            return true; +        } else if (!extInfoB || *b < *a) { +            return false; +        } else { +            // *a == *b && extInfoB +            Y_PROTOBUF_SUPPRESS_NODISCARD extInfoA->ParseFromArray(rawExtInfoA, rawLen); +            return (*extInfoA < *extInfoB); +        } +    } + +    int FindVszKeyOnPage(TDatPage* page, const TVal* key, const TExtInfo* extInfo) { +        TVal* cur = (TVal*)((char*)page + sizeof(TDatPage)); +        ui32 recnum = page->RecNum; +        if (!TExtInfoType<TVal>::Exists) { +            for (; recnum > 0 && *cur < *key; --recnum) +                cur = (TVal*)((char*)cur + DatCeil(SizeOf(cur))); +        } else { +            size_t ll; +            size_t l; +            size_t sz = NMicroBDB::SizeOfExt(cur, &ll, &l); +            TExtInfo ei; +            for (; recnum > 0 && KeyLess_(cur, key, &ei, extInfo, (ui8*)cur + sz + ll, l); --recnum) { +                cur = (TVal*)((ui8*)cur + DatCeil(sz + ll + l)); +                sz = NMicroBDB::SizeOfExt(cur, &ll, &l); +            } +        } + +        int idx = page->RecNum - recnum - 1; +        return (idx >= 0) ? 
idx : 0; +    } + +    TDatPage* Index0; +    int RecsOnPage; +}; + +template <class TVal, class TKey, class TPageIterator = TInputPageIterator<TInputPageFile>> +class TKeyFileMixin: public TInDatFileImpl<TVal, TInputRecordIterator<TVal, TPageIterator>> { +protected: +    TInIndexFile<TKey> KeyFile; +}; + +template <class TVal, class TKey, class TBase = TKeyFileMixin<TVal, TKey>> +class TDirectInDatFile: public TBase { +    typedef TBase TDatFile; +    typedef typename TDatFile::TRecIter TRecIter; +    typedef typename TDatFile::TPageIter TPageIter; + +public: +    void Open(const char* path, size_t pages = 1, size_t keypages = 1, int pagesOrBytes = 1) { +        int ret; +        ui32 gotRecordSig = 0; + +        ret = TDatFile::Open(path, pages, pagesOrBytes, &gotRecordSig); +        if (ret) { +            ythrow yexception() << ErrorMessage(ret, "Failed to open input file", path, TVal::RecordSig, gotRecordSig); +        } +        char KeyName[PATH_MAX + 1]; +        if (DatNameToIdx(KeyName, path)) { +            ythrow yexception() << ErrorMessage(MBDB_BAD_FILENAME, "Failed to open input file", path); +        } +        gotRecordSig = 0; +        ret = KeyFile.Open(KeyName, keypages, 1, &gotRecordSig); +        if (ret) { +            ythrow yexception() << ErrorMessage(ret, "Failed to open input keyfile", KeyName, TKey::RecordSig, gotRecordSig); +        } +    } + +    void Close() { +        int ret; + +        if (TDatFile::IsOpen() && (ret = TDatFile::GetError())) +            if (!std::uncaught_exception()) +                ythrow yexception() << ErrorMessage(ret, "Error before closing input file"); +        if ((ret = TDatFile::Close())) +            if (!std::uncaught_exception()) +                ythrow yexception() << ErrorMessage(ret, "Error while closing input file"); + +        if (KeyFile.IsOpen() && (ret = KeyFile.GetError())) +            if (!std::uncaught_exception()) +                ythrow yexception() << ErrorMessage(ret, "Error before 
closing input keyfile"); +        if ((ret = KeyFile.Close())) +            if (!std::uncaught_exception()) +                ythrow yexception() << ErrorMessage(ret, "Error while closing input keyfile"); +    } + +    const TVal* FindRecord(const TKey* key, const typename TExtInfoType<TKey>::TResult* extInfo = nullptr) { +        int page = KeyFile.FindKey(key, extInfo); +        const TVal* val = TRecIter::GotoPage(page); +        if (!TExtInfoType<TVal>::Exists || !extInfo) { +            TKey k; +            while (val) { +                TMakeExtKey<TVal, TKey>::Make(&k, nullptr, val, nullptr); +                if (!(k < *key)) +                    break; +                val = TRecIter::Next(); +            } +        } else { +            typename TExtInfoType<TVal>::TResult valExt; +            TKey k; +            typename TExtInfoType<TKey>::TResult kExt; +            while (val) { +                TRecIter::GetExtInfo(&valExt); +                TMakeExtKey<TVal, TKey>::Make(&k, &kExt, val, &valExt); +                if (*key < k || !(k < *key) && !(kExt < *extInfo)) //  k > *key || k == *key && kExt >= *extInfo +                    break; +                val = TRecIter::Next(); +            } +        } +        return val; +    }; + +    int FindPagesNo(const TKey* key, const typename TExtInfoType<TVal>::TResult* extInfo = NULL) { +        return KeyFile.FindKey(key, extInfo); +    } + +protected: +    using TBase::KeyFile; +}; diff --git a/library/cpp/microbdb/microbdb.cpp b/library/cpp/microbdb/microbdb.cpp new file mode 100644 index 00000000000..c10dbdf1268 --- /dev/null +++ b/library/cpp/microbdb/microbdb.cpp @@ -0,0 +1 @@ +#include "microbdb.h" diff --git a/library/cpp/microbdb/microbdb.h b/library/cpp/microbdb/microbdb.h new file mode 100644 index 00000000000..75218873374 --- /dev/null +++ b/library/cpp/microbdb/microbdb.h @@ -0,0 +1,54 @@ +#pragma once + +#include <util/folder/dirut.h> + +#if defined(_MSC_VER) +#pragma warning(push) +#pragma 
warning(disable : 4706) /*assignment within conditional expression*/ +#pragma warning(disable : 4267) /*conversion from 'size_t' to 'type', possible loss of data*/ +#endif + +#include "align.h" +#include "extinfo.h" +#include "header.h" +#include "reader.h" +#include "heap.h" +#include "file.h" +#include "sorter.h" +#include "input.h" +#include "output.h" +#include "sorterdef.h" + +inline int MakeSorterTempl(char path[/*FILENAME_MAX*/], const char* prefix) { +    int ret = MakeTempDir(path, prefix); +    if (!ret && strlcat(path, "%06d", FILENAME_MAX) > FILENAME_MAX - 100) +        ret = EINVAL; +    if (ret) +        path[0] = 0; +    return ret; +} + +inline int GetMeta(TFile& file, TDatMetaPage* meta) { +    ui8 buf[METASIZE], *ptr = buf; +    ssize_t size = sizeof(buf), ret; +    while (size && (ret = file.Read(ptr, size)) > 0) { +        size -= ret; +        ptr += ret; +    } +    if (size) +        return MBDB_BAD_FILE_SIZE; +    ptr = buf; // gcc 4.4 warning fix +    *meta = *(TDatMetaPage*)ptr; +    return (meta->MetaSig == METASIG) ? 
0 : MBDB_BAD_METAPAGE; +} + +template <class TRec> +inline bool IsDatFile(const char* fname) { +    TDatMetaPage meta; +    TFile f(fname, RdOnly); +    return !GetMeta(f, &meta) && meta.RecordSig == TRec::RecordSig; +} + +#if defined(_MSC_VER) +#pragma warning(pop) +#endif diff --git a/library/cpp/microbdb/noextinfo.proto b/library/cpp/microbdb/noextinfo.proto new file mode 100644 index 00000000000..6a78882e079 --- /dev/null +++ b/library/cpp/microbdb/noextinfo.proto @@ -0,0 +1,4 @@ + +message TNoExtInfo { +} + diff --git a/library/cpp/microbdb/output.h b/library/cpp/microbdb/output.h new file mode 100644 index 00000000000..d0ecab21085 --- /dev/null +++ b/library/cpp/microbdb/output.h @@ -0,0 +1,1049 @@ +#pragma once + +#include "header.h" +#include "file.h" + +#include <util/generic/buffer.h> +#include <util/memory/tempbuf.h> + +#include <sys/uio.h> + +template <class TFileManip> +inline ssize_t Writev(TFileManip& fileManip, const struct iovec* iov, int iovcnt) { +    ssize_t written_count = 0; +    for (int n = 0; n < iovcnt; n++) { +        ssize_t last_write = fileManip.Write(iov[n].iov_base, iov[n].iov_len); +        if (last_write < 0) +            return -1; +        written_count += last_write; +    } +    return written_count; +} + +//********************************************************************* +struct TFakeIndexer { +    inline void NextPage(TDatPage*) noexcept { +    } +}; + +struct TCallbackIndexer { +    typedef void (*TCallback)(void* This, const TDatPage* page); + +    TCallbackIndexer() { +        Callback = nullptr; +    } + +    void SetCallback(void* t, TCallback c) { +        This = t; +        Callback = c; +    } + +    void NextPage(TDatPage* dat) { +        Callback(This, dat); +    } + +    TCallback Callback; +    void* This; +}; + +template <class TVal, typename TBasePageIter, typename TBaseIndexer = TFakeIndexer, typename TCompressor = TFakeCompression> +class TOutputRecordIterator; + +template <class TVal, typename 
TBasePageIter, typename TBaseIndexer> +class TOutputRecordIterator<TVal, TBasePageIter, TBaseIndexer, TFakeCompression> +   : public TBasePageIter, public TBaseIndexer { +public: +    enum EOffset { +        WrongOffset = size_t(-1) +    }; + +    typedef TBasePageIter TPageIter; +    typedef TBaseIndexer TIndexer; + +    TOutputRecordIterator() { +        Clear(); +    } + +    ~TOutputRecordIterator() { +        Term(); +    } + +    inline const TVal* Current() const { +        return Rec; +    } + +    const TVal* Push(const TVal* v, const typename TExtInfoType<TVal>::TResult* extInfo = nullptr) { +        NMicroBDB::AssertValid(v); +        size_t len = SizeOf(v); +        if (!TExtInfoType<TVal>::Exists) +            return (Reserve(len)) ? (TVal*)memcpy(Rec, v, len) : nullptr; +        else if (extInfo) { +            size_t extSize = extInfo->ByteSize(); +            size_t extLenSize = len_long((i64)extSize); +            if (!Reserve(len + extLenSize + extSize)) +                return nullptr; +            memcpy(Rec, v, len); +            out_long((i64)extSize, (char*)Rec + len); +            extInfo->SerializeWithCachedSizesToArray((ui8*)Rec + len + extLenSize); +            return Rec; +        } else { +            size_t extLenSize = len_long((i64)0); +            if (!Reserve(len + extLenSize)) +                return nullptr; +            memcpy(Rec, v, len); +            out_long((i64)0, (char*)Rec + len); +            return Rec; +        } +    } + +    const TVal* Push(const TVal* v, const ui8* extInfoRaw, size_t extLen) { +        NMicroBDB::AssertValid(v); +        size_t sz = SizeOf(v); +        if (!Reserve(sz + extLen)) +            return nullptr; +        memcpy(Rec, v, sz); +        memcpy((ui8*)Rec + sz, extInfoRaw, extLen); +        return Rec; +    } + +    // use values stored in microbdb readers/writers internal buffer only. 
+    // method expects serialized extInfo after this record +    const TVal* PushWithExtInfo(const TVal* v) { +        NMicroBDB::AssertValid(v); +        size_t extSize; +        size_t extLenSize; +        size_t sz = NMicroBDB::SizeOfExt(v, &extLenSize, &extSize); +        sz += extLenSize + extSize; +        if (!Reserve(sz)) +            return nullptr; +        memcpy(Rec, v, sz); +        return Rec; +    } + +    TVal* Reserve(size_t len) { +        if (CurLen + DatCeil(len) > TPageIter::GetPageSize()) { +            if (sizeof(TDatPage) + DatCeil(len) > TPageIter::GetPageSize()) +                return Rec = nullptr; +            if (TPageIter::Current() && RecNum) { +                TPageIter::Current()->RecNum = RecNum; +                TPageIter::Current()->Format = MBDB_FORMAT_RAW; +                memset((char*)TPageIter::Current() + CurLen, 0, TPageIter::GetPageSize() - CurLen); +                TIndexer::NextPage(TPageIter::Current()); +                RecNum = 0; +            } +            if (!TPageIter::Next()) { +                CurLen = TPageIter::GetPageSize(); +                return Rec = nullptr; +            } +            CurLen = sizeof(TDatPage); +        } +        LenForOffset = CurLen; +        Rec = (TVal*)((char*)TPageIter::Current() + CurLen); +        DatSet(Rec, len); + +        CurLen += DatCeil(len); + +        ++RecNum; +        return Rec; +    } + +    void Flush() { +        TPageIter::Current()->RecNum = RecNum; +        TPageIter::Current()->Format = MBDB_FORMAT_RAW; +    } + +    size_t Offset() const { +        return Rec ? 
TPageIter::Offset() + LenForOffset : WrongOffset; +    } + +    void ResetDat() { +        CurLen = (char*)Rec - (char*)TPageIter::Current(); +        size_t len; +        if (!TExtInfoType<TVal>::Exists) { +            len = SizeOf(Rec); +        } else { +            size_t ll; +            size_t l; +            len = NMicroBDB::SizeOfExt(Rec, &ll, &l); +            len += ll + l; +        } +        CurLen += DatCeil(len); +    } + +protected: +    void Clear() { +        Rec = nullptr; +        RecNum = 0; +        CurLen = 0; +        LenForOffset = 0; +    } + +    int Init() { +        Clear(); +        CurLen = TPageIter::GetPageSize(); +        return 0; +    } + +    int Term() { +        if (TPageIter::Current()) { +            TPageIter::Current()->RecNum = RecNum; +            TPageIter::Current()->Format = MBDB_FORMAT_RAW; +            memset((char*)TPageIter::Current() + CurLen, 0, TPageIter::GetPageSize() - CurLen); +            RecNum = 0; +        } +        int ret = !TPageIter::Current() && RecNum; +        Clear(); +        return ret; +    } + +    int GotoPage(int pageno) { +        if (TPageIter::Current()) { +            TPageIter::Current()->RecNum = RecNum; +            TPageIter::Current()->Format = MBDB_FORMAT_RAW; +            memset((char*)TPageIter::Current() + CurLen, 0, TPageIter::GetPageSize() - CurLen); +        } +        int ret = TPageIter::GotoPage(pageno); +        if (!ret) { +            RecNum = 0; +            CurLen = sizeof(TDatPage); +        } +        return ret; +    } + +    TVal* Rec; +    int RecNum; +    size_t CurLen; +    size_t LenForOffset; +}; + +template <class TVal, typename TBasePageIter, typename TBaseIndexer, typename TAlgorithm> +class TOutputRecordIterator +   : public TBasePageIter, +      public TBaseIndexer, +      private TAlgorithm { +    class TPageBuffer { +    public: +        void Init(size_t page) { +            Pos = 0; +            RecNum = 0; +            Size = Min(page / 2, size_t(64 
<< 10)); +            Data.Reset(new ui8[Size]); +        } + +        void Clear() { +            Pos = 0; +            RecNum = 0; +        } + +        inline bool Empty() const { +            return RecNum == 0; +        } + +    public: +        size_t Size; +        size_t Pos; +        int RecNum; +        TArrayHolder<ui8> Data; +    }; + +public: +    typedef TBasePageIter TPageIter; +    typedef TBaseIndexer TIndexer; + +    TOutputRecordIterator() +        : Rec(nullptr) +        , RecNum(0) +    { +    } + +    ~TOutputRecordIterator() { +        Term(); +    } + +    const TVal* Current() const { +        return Rec; +    } + +    const TVal* Push(const TVal* v, const typename TExtInfoType<TVal>::TResult* extInfo = nullptr) { +        NMicroBDB::AssertValid(v); +        size_t len = SizeOf(v); +        if (!TExtInfoType<TVal>::Exists) +            return (Reserve(len)) ? (TVal*)memcpy((TVal*)Rec, v, len) : nullptr; +        else if (extInfo) { +            size_t extSize = extInfo->ByteSize(); +            size_t extLenSize = len_long((i64)extSize); +            if (!Reserve(len + extLenSize + extSize)) +                return nullptr; +            memcpy(Rec, v, len); +            out_long((i64)extSize, (char*)Rec + len); +            extInfo->SerializeWithCachedSizesToArray((ui8*)Rec + len + extLenSize); +            return Rec; +        } else { +            size_t extLenSize = len_long((i64)0); +            if (!Reserve(len + extLenSize)) +                return nullptr; +            memcpy(Rec, v, len); +            out_long((i64)0, (char*)Rec + len); +            return Rec; +        } +    } + +    const TVal* Push(const TVal* v, const ui8* extInfoRaw, size_t extLen) { +        NMicroBDB::AssertValid(v); +        size_t sz = SizeOf(v); +        if (!Reserve(sz + extLen)) +            return NULL; +        memcpy(Rec, v, sz); +        memcpy((ui8*)Rec + sz, extInfoRaw, extLen); +        return Rec; +    } + +    // use values stored in microbdb 
readers/writers internal buffer only. +    // method expects serialized extInfo after this record +    const TVal* PushWithExtInfo(const TVal* v) { +        NMicroBDB::AssertValid(v); +        size_t extSize; +        size_t extLenSize; +        size_t sz = NMicroBDB::SizeOfExt(v, &extLenSize, &extSize); +        sz += extLenSize + extSize; +        if (!Reserve(sz)) +            return nullptr; +        memcpy(Rec, v, sz); +        return Rec; +    } + +    TVal* Reserve(const size_t len) { +        const size_t aligned = DatCeil(len); + +        if (!TPageIter::Current()) { // Allocate fist page +            if (!TPageIter::Next()) { +                CurLen = TPageIter::GetPageSize(); +                return Rec = nullptr; +            } +            CurLen = sizeof(TDatPage) + sizeof(TCompressedPage); +        } + +        if (Buffer.Pos + aligned > Buffer.Size) { +            if (Buffer.Pos == 0) +                return Rec = nullptr; +            if (FlushBuffer()) +                return Rec = nullptr; +            if (Buffer.Pos + aligned + sizeof(TDatPage) + sizeof(TCompressedPage) > Buffer.Size) +                return Rec = nullptr; +        } + +        Rec = (TVal*)((char*)Buffer.Data.Get() + Buffer.Pos); +        DatSet(Rec, len); // len is correct because DatSet set align tail to zero + +        Buffer.RecNum++; +        Buffer.Pos += aligned; +        ++RecNum; +        return Rec; +    } + +    void Flush() { +        if (!Buffer.Empty()) { +            FlushBuffer(); +            TPageIter::Current()->RecNum = RecNum; +            TPageIter::Current()->Format = MBDB_FORMAT_COMPRESSED; +        } +    } + +    size_t Offset() const { +        // According to vadya@ there is no evil to return 0 all the time +        return 0; +    } + +    void ResetDat() { +        Buffer.Pos = (char*)Rec - (char*)Buffer.Data.Get(); +        size_t len = SizeOf(Rec); +        Buffer.Pos += DatCeil(len); +    } + +protected: +    void Clear() { +        RecNum = 0; + 
       Rec = nullptr; +        Count = 0; +        CurLen = sizeof(TDatPage) + sizeof(TCompressedPage); +        Buffer.Clear(); +    } + +    int Init() { +        Clear(); +        Buffer.Init(TPageIter::GetPageSize()); +        TAlgorithm::Init(); +        return 0; +    } + +    int Term() { +        if (TPageIter::Current()) +            Commit(); +        int ret = !TPageIter::Current() && RecNum; +        Clear(); +        TAlgorithm::Term(); +        return ret; +    } + +    int GotoPage(int pageno) { +        if (TPageIter::Current()) +            Commit(); +        int ret = TPageIter::GotoPage(pageno); +        if (!ret) +            Reset(); +        return ret; +    } + +private: +    void Commit() { +        Flush(); +        TPageIter::Current()->RecNum = RecNum; +        TPageIter::Current()->Format = MBDB_FORMAT_COMPRESSED; +        SetCompressedPageHeader(); + +        memset((char*)TPageIter::Current() + CurLen, 0, TPageIter::GetPageSize() - CurLen); +        RecNum = 0; +        Count = 0; +    } + +    inline void SetCompressedPageHeader() { +        TCompressedPage* const hdr = (TCompressedPage*)((ui8*)TPageIter::Current() + sizeof(TDatPage)); + +        hdr->BlockCount = Count; +        hdr->Algorithm = TAlgorithm::Code; +        hdr->Version = 0; +        hdr->Reserved = 0; +    } + +    inline void Reset() { +        RecNum = 0; +        CurLen = sizeof(TDatPage) + sizeof(TCompressedPage); +        Count = 0; +        Buffer.Clear(); +    } + +    int FlushBuffer() { +        TArrayHolder<ui8> data; +        const ui8* const buf = Buffer.Data.Get(); +        size_t first = 0; + +        if (!TExtInfoType<TVal>::Exists) +            first = DatCeil(SizeOf((TVal*)buf)); +        else { +            size_t ll; +            size_t l; +            first = NMicroBDB::SizeOfExt((const TVal*)buf, &ll, &l); +            first = DatCeil(first + ll + l); +        } + +        size_t total = sizeof(NMicroBDB::TCompressedHeader) + first + 
((Buffer.RecNum == 1) ? 0 : TAlgorithm::CompressBound(Buffer.Pos - first)); +        size_t real = total; + +        { +            ui8* p = nullptr; +            NMicroBDB::TCompressedHeader* hdr = nullptr; + +            // 1. Choose data destination (temporary buffer or dat-page) +            if (CurLen + total > TPageIter::GetPageSize()) { +                data.Reset(new ui8[total]); + +                hdr = (NMicroBDB::TCompressedHeader*)data.Get(); +                p = data.Get() + sizeof(NMicroBDB::TCompressedHeader); +            } else { +                p = (ui8*)TPageIter::Current() + CurLen; +                hdr = (NMicroBDB::TCompressedHeader*)p; +                p += sizeof(NMicroBDB::TCompressedHeader); +            } + +            // 2. Compress data + +            // Fill header and first record +            hdr->Original = Buffer.Pos; +            hdr->Compressed = 0; +            hdr->Count = Buffer.RecNum; +            hdr->Reserved = 0; +            memcpy(p, Buffer.Data.Get(), first); +            // Fill compressed part +            if (Buffer.RecNum > 1) { +                size_t size = TAlgorithm::CompressBound(Buffer.Pos - first); + +                p += first; +                TAlgorithm::Compress(p, size, buf + first, Buffer.Pos - first); + +                hdr->Compressed = size; + +                real = sizeof(NMicroBDB::TCompressedHeader) + first + size; +            } +        } + +        Y_ASSERT(sizeof(TDatPage) + sizeof(TCompressedPage) + real <= TPageIter::GetPageSize()); + +        // 3. 
Check page capacity + +        if (CurLen + real > TPageIter::GetPageSize()) { +            Y_ASSERT(data.Get() != nullptr); + +            if (TPageIter::Current() && RecNum) { +                RecNum = RecNum - Buffer.RecNum; +                TPageIter::Current()->RecNum = RecNum; +                TPageIter::Current()->Format = MBDB_FORMAT_COMPRESSED; +                SetCompressedPageHeader(); +                memset((char*)TPageIter::Current() + CurLen, 0, TPageIter::GetPageSize() - CurLen); +                TIndexer::NextPage(TPageIter::Current()); +                RecNum = Buffer.RecNum; +                Count = 0; +            } +            if (!TPageIter::Next()) { +                CurLen = TPageIter::GetPageSize(); +                return MBDB_NO_MEMORY; +            } +            CurLen = sizeof(TDatPage) + sizeof(TCompressedPage); +        } + +        // 4. Flush data and reset buffer state + +        if (data.Get()) +            memcpy((ui8*)TPageIter::Current() + CurLen, data.Get(), real); +        CurLen += real; +        ++Count; +        Buffer.Clear(); +        return 0; +    } + +private: +    size_t CurLen; +    TPageBuffer Buffer; +    TVal* Rec; +    ui32 Count; //! < count of compressed blocks on page +public: +    int RecNum; +}; + +template <typename TBaseWriter> +class TOutputPageIterator: public TBaseWriter { +public: +    typedef TBaseWriter TWriter; + +    TOutputPageIterator() +        : Buf(nullptr) +    { +        Clear(); +    } + +    ~TOutputPageIterator() { +        Term(); +    } + +    TDatPage* Current() { +        return CurPage; +    } + +    size_t Offset() const { +        //Cout << "PS = " << TWriter::GetPageSize() << "; PN = " << PageNum << "; MS = " << METASIZE << Endl; +        return TWriter::GetPageSize() * PageNum + METASIZE; +    } + +    int Freeze() { +        return (Frozen = (PageNum == -1) ? 
0 : (int)PageNum); +    } + +    void Unfreeze() { +        Frozen = -1; +    } + +    inline int IsFrozen() const { +        return Frozen + 1; +    } + +    inline size_t GetPageSize() const { +        return TWriter::GetPageSize(); +    } + +    inline int GetPageNum() const { +        return (int)PageNum; +    } + +    TDatPage* Next() { +        if (PageNum >= Maxpage && WriteBuf()) +            return CurPage = nullptr; +        CurPage = (TDatPage*)(Buf + ((++PageNum) % Bufpages) * GetPageSize()); +        memset(CurPage, 0, sizeof(TDatPage)); +        return CurPage; +    } + +protected: +    int Init(size_t pages, int pagesOrBytes) { +        Term(); +        if (pagesOrBytes) +            Bufpages = pages; +        else +            Bufpages = pages / GetPageSize(); +        Bufpages = Max<size_t>(1, Bufpages); +        Maxpage = Bufpages - 1; +        //        if (!(Buf = (char*)malloc(Bufpages * GetPageSize()))) +        //            return ENOMEM; +        ABuf.Alloc(Bufpages * GetPageSize()); +        Buf = ABuf.Begin(); +        if (TWriter::Memo) +            Freeze(); +        return 0; +    } + +    int Term() { +        Unfreeze(); +        int ret = (PageNum < 0) ? 
0 : WriteBuf(); +        Clear(); +        return ret; +    } + +    int GotoPage(int pageno) { +        int ret = EAGAIN; +        if (IsFrozen() || PageNum >= 0 && ((ret = WriteBuf())) || ((ret = TWriter::GotoPage(pageno)))) +            return ret; +        PageNum = pageno; +        Maxpage = Bufpages - 1 + pageno; +        CurPage = (TDatPage*)(Buf + (PageNum % Bufpages) * GetPageSize()); +        memset(CurPage, 0, sizeof(TDatPage)); +        return 0; +    } + +    void Clear() { +        ABuf.Dealloc(); +        Buf = nullptr; +        Maxpage = PageNum = Frozen = -1; +        Bufpages = 0; +        CurPage = nullptr; +    } + +    int WriteBuf() { +        int nvec; +        iovec vec[2]; +        ssize_t minpage = Maxpage - Bufpages + 1; +        ssize_t maxpage = Frozen == -1 ? PageNum : Frozen - 1; +        if (maxpage < minpage) +            return EAGAIN; +        minpage %= Bufpages; +        maxpage %= Bufpages; +        if (maxpage < minpage) { +            vec[0].iov_base = Buf + GetPageSize() * minpage; +            vec[0].iov_len = GetPageSize() * (Bufpages - minpage); +            vec[1].iov_base = Buf; +            vec[1].iov_len = GetPageSize() * (maxpage + 1); +            nvec = 2; +        } else { +            vec[0].iov_base = Buf + GetPageSize() * minpage; +            vec[0].iov_len = GetPageSize() * (maxpage - minpage + 1); +            nvec = 1; +        } +        if (TWriter::WritePages(vec, nvec)) +            return EIO; +        Maxpage += (maxpage < minpage) ? 
(Bufpages - minpage + maxpage + 1) : (maxpage - minpage + 1); +        return 0; +    } + +    ssize_t Maxpage; +    ssize_t Bufpages; +    ssize_t PageNum; +    int Frozen; +    TDatPage* CurPage; +    char* Buf; +    TMappedAllocation ABuf; +}; + +template <class TFileManip> +class TOutputPageFileImpl: private TNonCopyable { +public: +    TOutputPageFileImpl() +        : Pagesize(0) +        , Eof(1) +        , Error(0) +        , Memo(0) +        , Recordsig(0) +    { +    } + +    ~TOutputPageFileImpl() { +        Term(); +    } + +    inline int IsEof() const { +        return Eof; +    } + +    inline int GetError() const { +        return Error; +    } + +    inline bool IsOpen() const { +        return FileManip.IsOpen(); +    } + +    inline size_t GetPageSize() const { +        return Pagesize; +    } + +    inline ui32 GetRecordSig() const { +        return Recordsig; +    } + +    int Init(const char* fname, size_t pagesize, ui32 recsig, bool direct = false) { +        Memo = 0; +        if (FileManip.IsOpen()) +            return MBDB_ALREADY_INITIALIZED; + +        if (!fname) { +            Eof = Error = 0; +            Pagesize = pagesize; +            Recordsig = recsig; +            Memo = 1; +            return 0; +        } + +        Error = FileManip.Open(fname, WrOnly | CreateAlways | ARW | AWOther | (direct ? 
DirectAligned : EOpenMode())); +        if (Error) +            return Error; +        Error = Init(TFile(), pagesize, recsig); +        if (Error) { +            FileManip.Close(); +            unlink(fname); +        } +        return Error; +    } + +    int Init(TAutoPtr<IOutputStream> output, size_t pagesize, ui32 recsig) { +        Memo = 0; +        if (FileManip.IsOpen()) { +            return MBDB_ALREADY_INITIALIZED; +        } + +        if (!output) { +            Eof = Error = 0; +            Pagesize = pagesize; +            Recordsig = recsig; +            Memo = 1; +            return 0; +        } + +        Error = FileManip.Open(output); +        if (Error) +            return Error; +        Error = Init(TFile(), pagesize, recsig); +        if (Error) { +            FileManip.Close(); +        } +        return Error; +    } + +    int Init(const TFile& file, size_t pagesize, ui32 recsig) { +        Memo = 0; +        if (!file.IsOpen() && !FileManip.IsOpen()) +            return MBDB_NOT_INITIALIZED; +        if (file.IsOpen() && FileManip.IsOpen()) +            return MBDB_ALREADY_INITIALIZED; +        if (file.IsOpen()) { +            Error = FileManip.Init(file); +            if (Error) +                return Error; +        } + +        Eof = 1; +        TTempBuf buf(METASIZE + FS_BLOCK_SIZE); +        const char* ptr = (buf.Data() + FS_BLOCK_SIZE - ((ui64)buf.Data() & (FS_BLOCK_SIZE - 1))); +        TDatMetaPage* meta = (TDatMetaPage*)ptr; + +        memset(buf.Data(), 0, buf.Size()); +        meta->MetaSig = METASIG; +        meta->PageSize = Pagesize = pagesize; +        meta->RecordSig = Recordsig = recsig; + +        ssize_t size = METASIZE, ret = 0; +        while (size && (ret = FileManip.Write(ptr, (unsigned)size)) > 0) { +            size -= ret; +            ptr += ret; +        } +        if (size || ret <= 0) { +            Term(); +            return Error = errno ? 
errno : MBDB_WRITE_ERROR; +        } + +        Error = Eof = 0; +        return Error; +    } + +protected: +    int WritePages(iovec* vec, int nvec) { +        if (Error || Memo) +            return Error; + +        ssize_t size, delta; +        iovec* pvec; +        int vsize; + +        for (vsize = 0, pvec = vec; vsize < nvec; vsize++, pvec++) +            for (size = 0; (size_t)size < pvec->iov_len; size += Pagesize) +                ((TDatPage*)((char*)pvec->iov_base + size))->PageSig = PAGESIG; + +        delta = size = 0; +        pvec = vec; +        vsize = nvec; +        while (vsize && (size = Writev(FileManip, pvec, (int)Min(vsize, 16))) > 0) { +            if (delta) { +                size += delta; +                pvec->iov_len += delta; +                pvec->iov_base = (char*)pvec->iov_base - delta; +                delta = 0; +            } +            while (size) { +                if ((size_t)size >= pvec->iov_len) { +                    size -= pvec->iov_len; +                    ++pvec; +                    --vsize; +                } else { +                    delta = size; +                    pvec->iov_len -= size; +                    pvec->iov_base = (char*)pvec->iov_base + size; +                    size = 0; +                } +            } +        } +        if (delta) { +            pvec->iov_len += delta; +            pvec->iov_base = (char*)pvec->iov_base - delta; +        } +        return Error = (!size && !vsize) ? 0 : errno ? 
errno : MBDB_WRITE_ERROR; +    } + +    i64 Tell() { +        return FileManip.RealSeek(0, SEEK_CUR); +    } + +    int GotoPage(int pageno) { +        if (Error || Memo) +            return Error; +        Eof = 0; +        i64 offset = (i64)pageno * Pagesize + METASIZE; +        if (offset != FileManip.Seek(offset, SEEK_SET)) +            Error = MBDB_BAD_FILE_SIZE; +        return Error; +    } + +    int Term() { +        int ret = FileManip.Close(); +        Eof = 1; +        Memo = 0; +        if (!Error) +            Error = ret; +        return Error; +    } + +    size_t Pagesize; +    int Eof; +    int Error; +    int Memo; +    ui32 Recordsig; + +private: +    TFileManip FileManip; +}; + +using TOutputPageFile = TOutputPageFileImpl<TOutputFileManip>; + +template <class TVal, +          typename TBaseRecIter = TOutputRecordIterator<TVal, TOutputPageIterator<TOutputPageFile>>> +class TOutDatFileImpl: public TBaseRecIter { +public: +    typedef TBaseRecIter TRecIter; +    typedef typename TRecIter::TPageIter TPageIter; +    typedef typename TRecIter::TPageIter::TWriter TWriter; + +    int Open(const char* fname, size_t pagesize, size_t pages = 1, int pagesOrBytes = 1, bool direct = false) { +        int ret = TWriter::Init(fname, pagesize, TVal::RecordSig, direct); +        return ret ? ret : Open2(pages, pagesOrBytes); +    } + +    int Open(const TFile& file, size_t pagesize, size_t pages = 1, int pagesOrBytes = 1) { +        int ret = TWriter::Init(file, pagesize, TVal::RecordSig); +        return ret ? ret : Open2(pages, pagesOrBytes); +    } + +    int Open(TAutoPtr<IOutputStream> output, size_t pagesize, size_t pages = 1, int pagesOrBytes = 1) { +        int ret = TWriter::Init(output, pagesize, TVal::RecordSig); +        return ret ? ret : Open2(pages, pagesOrBytes); +    } + +    int Close() { +        int ret1 = TRecIter::Term(); +        int ret2 = TPageIter::Term(); +        int ret3 = TWriter::Term(); +        return ret1 ? ret1 : ret2 ? 
ret2 : ret3; +    } + +private: +    int Open2(size_t pages, int pagesOrBytes) { +        int ret = TPageIter::Init(pages, pagesOrBytes); +        if (!ret) +            ret = TRecIter::Init(); +        if (ret) +            Close(); +        return ret; +    } +}; + +template <class TVal> +class TOutIndexFile: public TOutDatFileImpl< +                          TVal, +                          TOutputRecordIterator<TVal, TOutputPageIterator<TOutputPageFile>, TCallbackIndexer, TFakeCompression>> { +    typedef TOutDatFileImpl< +        TVal, +        TOutputRecordIterator<TVal, TOutputPageIterator<TOutputPageFile>, TCallbackIndexer, TFakeCompression>> +        TDatFile; +    typedef TOutIndexFile<TVal> TMyType; +    typedef typename TDatFile::TRecIter TRecIter; +    typedef typename TRecIter::TPageIter TPageIter; +    typedef typename TRecIter::TIndexer TIndexer; + +public: +    TOutIndexFile() { +        TIndexer::SetCallback(this, DispatchCallback); +    } + +    int Open(const char* fname, size_t pagesize, size_t pages, int pagesOrBytes = 1) { +        int ret = TDatFile::Open(fname, pagesize, pages, pagesOrBytes); +        if (ret) +            return ret; +        if ((ret = TRecIter::GotoPage(1))) { +            TDatFile::Close(); +            return ret; +        } +        Index0.Clear(); +        return ret; +    } + +    int Close() { +        TPageIter::Unfreeze(); +        if (TRecIter::RecNum) { +            TRecIter::Flush(); +            NextPage(TPageIter::Current()); +        } +        int ret = 0; +        if (Index0.Size() && !(ret = TRecIter::GotoPage(0))) { +            const char* ptr = Index0.Begin(); +            size_t recSize; +            while (ptr < Index0.End()) { +                Y_ASSERT((size_t)(Index0.End() - ptr) >= sizeof(size_t)); +                memcpy(&recSize, ptr, sizeof(size_t)); +                ptr += sizeof(size_t); +                Y_ASSERT((size_t)(Index0.End() - ptr) >= recSize); +                ui8* buf = 
(ui8*)TRecIter::Reserve(recSize); +                if (!buf) { +                    ret = MBDB_PAGE_OVERFLOW; +                    break; +                } +                memcpy(buf, ptr, recSize); +                TRecIter::ResetDat(); +                ptr += recSize; +            } +            Index0.Clear(); +            ret = (TPageIter::GetPageNum() != 0) ? MBDB_PAGE_OVERFLOW : TPageIter::GetError(); +        } +        int ret1 = TDatFile::Close(); +        return ret ? ret : ret1; +    } + +protected: +    TBuffer Index0; + +    void NextPage(const TDatPage* page) { +        const TVal* first = (const TVal*)NMicroBDB::GetFirstRecord(page); +        size_t sz; +        if (!TExtInfoType<TVal>::Exists) { +            sz = SizeOf(first); +        } else { +            size_t ll; +            size_t l; +            sz = NMicroBDB::SizeOfExt(first, &ll, &l); +            sz += ll + l; +        } +        Index0.Append((const char*)&sz, sizeof(size_t)); +        Index0.Append((const char*)first, sz); +    } + +    static void DispatchCallback(void* This, const TDatPage* page) { +        ((TMyType*)This)->NextPage(page); +    } +}; + +template <class TVal, class TKey, typename TCompressor = TFakeCompression, class TPageFile = TOutputPageFile> +class TOutDirectFileImpl: public TOutDatFileImpl< +                               TVal, +                               TOutputRecordIterator<TVal, TOutputPageIterator<TPageFile>, TCallbackIndexer, TCompressor>> { +    typedef TOutDatFileImpl< +        TVal, +        TOutputRecordIterator<TVal, TOutputPageIterator<TPageFile>, TCallbackIndexer, TCompressor>> +        TDatFile; +    typedef TOutDirectFileImpl<TVal, TKey, TCompressor, TPageFile> TMyType; +    typedef typename TDatFile::TRecIter TRecIter; +    typedef typename TRecIter::TPageIter TPageIter; +    typedef typename TRecIter::TIndexer TIndexer; +    typedef TOutIndexFile<TKey> TKeyFile; + +public: +    TOutDirectFileImpl() { +        TIndexer::SetCallback(this, 
DispatchCallback); +    } + +    int Open(const char* fname, size_t pagesize, int pages = 1, size_t ipagesize = 0, size_t ipages = 1, int pagesOrBytes = 1) { +        char iname[FILENAME_MAX]; +        int ret; +        if (ipagesize == 0) +            ipagesize = pagesize; +        ret = TDatFile::Open(fname, pagesize, pages, pagesOrBytes); +        ret = ret ? ret : DatNameToIdx(iname, fname); +        ret = ret ? ret : KeyFile.Open(iname, ipagesize, ipages, pagesOrBytes); +        if (ret) +            TDatFile::Close(); +        return ret; +    } + +    int Close() { +        if (TRecIter::RecNum) { +            TRecIter::Flush(); +            NextPage(TPageIter::Current()); +        } +        int ret = KeyFile.Close(); +        int ret1 = TDatFile::Close(); +        return ret1 ? ret1 : ret; +    } + +    int GetError() const { +        return TDatFile::GetError() ? TDatFile::GetError() : KeyFile.GetError(); +    } + +protected: +    TKeyFile KeyFile; + +    void NextPage(const TDatPage* page) { +        typedef TMakeExtKey<TVal, TKey> TMakeExtKey; + +        TVal* val = (TVal*)NMicroBDB::GetFirstRecord(page); +        TKey key; +        if (!TMakeExtKey::Exists) { +            TMakeExtKey::Make(&key, nullptr, val, nullptr); +            KeyFile.Push(&key); +        } else { +            size_t ll; +            size_t l; +            size_t sz = NMicroBDB::SizeOfExt(val, &ll, &l); +            typename TExtInfoType<TVal>::TResult valExt; +            if (TExtInfoType<TVal>::Exists) +                Y_PROTOBUF_SUPPRESS_NODISCARD valExt.ParseFromArray((ui8*)val + sz + ll, l); +            typename TExtInfoType<TKey>::TResult keyExt; +            TMakeExtKey::Make(&key, &keyExt, val, &valExt); +            KeyFile.Push(&key, &keyExt); +        } +    } + +    static void DispatchCallback(void* This, const TDatPage* page) { +        ((TMyType*)This)->NextPage(page); +    } +}; diff --git a/library/cpp/microbdb/powersorter.h b/library/cpp/microbdb/powersorter.h 
new file mode 100644 index 00000000000..c40de9c23f0 --- /dev/null +++ b/library/cpp/microbdb/powersorter.h @@ -0,0 +1,667 @@ +#pragma once + +#include "safeopen.h" + +#include <util/generic/vector.h> +#include <util/generic/deque.h> +#include <util/system/mutex.h> +#include <util/system/condvar.h> +#include <util/thread/pool.h> + +template < +    class TRecord, +    template <typename T> class TCompare, +    class TSieve, +    class TMemoFile = TOutDatFile<TRecord>> +class TDatSorterBuf { +public: +    typedef TRecord TRec; +    typedef TVector<TRec*> TVectorType; +    typedef TMemoFile TMemo; +    typedef TCompare<TRecord> TComp; + +public: +    TDatSorterBuf(size_t memory, size_t pageSize) +        : Memo("memo", pageSize, memory, 0) +        , Cur() +    { +        Memo.Open(nullptr); +        Memo.Freeze(); +    } + +    ~TDatSorterBuf() { +        Vector.clear(); +        Memo.Close(); +    } + +    const TRec* Push(const TRec* v) { +        const TRec* u = Memo.Push(v); +        if (u) +            Vector.push_back((TRec*)u); +        return u; +    } + +    const TRec* Next() { +        if (Ptr == Vector.end()) { +            if (Cur) +                TSieve::Sieve(Cur, Cur); +            Cur = nullptr; +        } else { +            Cur = *Ptr++; +            if (!TIsSieveFake<TSieve>::Result) +                while (Ptr != Vector.end() && TSieve::Sieve(Cur, *Ptr)) +                    ++Ptr; +        } +        return Cur; +    } + +    const TRec* Current() { +        return Cur; +    } + +    size_t Size() { +        return Vector.size(); +    } + +    void Sort() { +        Ptr = Vector.begin(); +        Cur = nullptr; + +        MBDB_SORT_FUN(Vector.begin(), Vector.end(), TComp()); +    } + +    void Clear() { +        Vector.clear(); +        Memo.Freeze(); +        Ptr = Vector.begin(); +        Cur = nullptr; +    } + +private: +    TVectorType Vector; +    TMemo Memo; + +    typename TVectorType::iterator +        Ptr; +    TRec* Cur; +}; + 
+template < +    class TRecord, +    class TInput, +    template <typename T> class TCompare, +    class TSieve> +class TDatMerger { +public: +    typedef TRecord TRec; +    typedef TCompare<TRecord> TComp; +    typedef TSimpleSharedPtr<TInput> TInputPtr; +    typedef TVector<TInputPtr> TInputVector; + +public: +    ~TDatMerger() { +        Close(); +    } + +    void Init(const TInputVector& inputs) { +        Inputs = inputs; +        TVector<TInput*> v; +        for (int i = 0; i < Inputs.ysize(); ++i) +            v.push_back(Inputs[i].Get()); +        HeapIter.Init(&v[0], v.size()); +        if (!TIsSieveFake<TSieve>::Result) +            PNext = HeapIter.Next(); +    } + +    const TRec* Next() { +        if (TIsSieveFake<TSieve>::Result) { +            return HeapIter.Next(); +        } + +        if (!PNext) { +            if (PCur) { +                TSieve::Sieve(PCur, PCur); +                PCur = nullptr; +            } +            return nullptr; +        } + +        PCur = &Cur; +        memcpy(PCur, PNext, SizeOf((const TRec*)PNext)); + +        do { +            PNext = HeapIter.Next(); +        } while (PNext && TSieve::Sieve(PCur, PNext)); + +        return PCur; +    } + +    const TRec* Current() { +        return (TIsSieveFake<TSieve>::Result ? 
HeapIter.Current() : PCur); +    } + +    void Close() { +        Inputs.clear(); +        HeapIter.Term(); +    } + +private: +    TInputVector Inputs; +    THeapIter<TRec, TInput, TComp> HeapIter; +    TRec Cur; +    TRec* PCur = nullptr; +    const TRec* PNext = nullptr; +}; + +class TPortionManager { +public: +    void Open(const char* tempDir) { +        TGuard<TMutex> guard(Mutex); +        TempDir = tempDir; +    } + +    TString Next() { +        TGuard<TMutex> guard(Mutex); +        if (Portions == 0) +            DoOpen(); +        TString fname = GeneratePortionFilename(Portions++); +        return fname; +    } + +    void Close() { +        TGuard<TMutex> guard(Mutex); +        Portions = 0; +    } + +private: +    void DoOpen() { +        if (MakeSorterTempl(PortionFilenameTempl, TempDir.data())) { +            PortionFilenameTempl[0] = 0; +            ythrow yexception() << "portion-manager: bad tempdir \"" << TempDir.data() << "\": " << LastSystemErrorText(); +        } +    } + +    TString GeneratePortionFilename(int i) { +        char str[FILENAME_MAX]; +        snprintf(str, sizeof(str), PortionFilenameTempl, i); +        return TString(str); +    } + +private: +    TMutex Mutex; + +    TString TempDir; +    char PortionFilenameTempl[FILENAME_MAX] = {}; +    int Portions = 0; +}; + +// A merger powered by threads +template < +    class TRecord, +    template <typename T> class TCompare, +    class TSieve, +    class TInput = TInDatFile<TRecord>, +    class TOutput = TOutDatFile<TRecord>> +class TPowerMerger { +public: +    typedef TRecord TRec; +    typedef TDatMerger<TRecord, TInput, TCompare, TSieve> TMerger; +    typedef TSimpleSharedPtr<TMerger> TMergerPtr; +    typedef TPowerMerger<TRecord, TCompare, TSieve, TInput, TOutput> TFileMerger; + +    struct TMergePortionTask: public IObjectInQueue { +        TFileMerger* FileMerger; +        int Begin; +        int End; +        TString OutFname; + +        TMergePortionTask(TFileMerger* 
fileMerger, int begin, int end, const TString& outFname) +            : FileMerger(fileMerger) +            , Begin(begin) +            , End(end) +            , OutFname(outFname) +        { +        } + +        void Process(void*) override { +            THolder<TMergePortionTask> This(this); +            //fprintf(stderr, "MergePortion: (%i, %i, %s)\n", Begin, End, ~OutFname); +            FileMerger->MergePortion(Begin, End, OutFname); +        } +    }; + +public: +    TPowerMerger(const TSimpleSharedPtr<TThreadPool>& mtpQueue, const TSimpleSharedPtr<TPortionManager>& portMan, +                 int memory, int pageSize, bool autoUnlink) +        : MtpQueue(mtpQueue) +        , PortionManager(portMan) +        , Memory(memory) +        , PageSize(pageSize) +        , AutoUnlink(autoUnlink) +    { +    } + +    TPowerMerger(const TSimpleSharedPtr<TThreadPool>& mtpQueue, const char* tempDir, +                 int memory, int pageSize, bool autoUnlink) +        : MtpQueue(mtpQueue) +        , PortionManager(new TPortionManager) +        , Memory(memory) +        , PageSize(pageSize) +        , AutoUnlink(autoUnlink) +    { +        PortionManager->Open(tempDir); +    } + +    ~TPowerMerger() { +        Close(); +    } + +    void SetMtpQueue(const TSimpleSharedPtr<TThreadPool>& mtpQueue) { +        MtpQueue = mtpQueue; +    } + +    void MergePortion(int begin, int end, const TString& outFname) { +        TMerger merger; +        InitMerger(merger, begin, end); + +        TOutput out("mergeportion-tmpout", PageSize, BufSize, 0); +        out.Open(outFname.data()); +        const TRec* rec; +        while ((rec = merger.Next())) +            out.Push(rec); +        out.Close(); + +        merger.Close(); + +        { +            TGuard<TMutex> guard(Mutex); +            UnlinkFiles(begin, end); +            Files.push_back(outFname); +            --Tasks; +            TaskFinishedCond.Signal(); +        } +    } + +    void Add(const TString& fname) { +        
TGuard<TMutex> guard(Mutex); +        //        fprintf(stderr, "TPowerMerger::Add: %s\n", ~fname); +        Files.push_back(fname); +        if (InitialFilesEnd > 0) +            ythrow yexception() << "TPowerMerger::Add: no more files allowed"; +    } + +    void Merge(int maxPortions) { +        TGuard<TMutex> guard(Mutex); +        InitialFilesEnd = Files.ysize(); +        if (!InitialFilesEnd) +            ythrow yexception() << "TPowerMerger::Merge: no files added"; +        Optimize(maxPortions); +        MergeMT(); +        InitMerger(Merger, CPortions, Files.ysize()); +    } + +    void Close() { +        TGuard<TMutex> guard(Mutex); +        Merger.Close(); +        UnlinkFiles(CPortions, Files.ysize()); +        InitialFilesEnd = CPortions = 0; +        Files.clear(); +    } + +    const TRec* Next() { +        return Merger.Next(); +    } + +    const TRec* Current() { +        return Merger.Current(); +    } + +    int FileCount() const { +        TGuard<TMutex> guard(Mutex); +        return Files.ysize(); +    } + +private: +    void InitMerger(TMerger& merger, int begin, int end) { +        TGuard<TMutex> guard(Mutex); +        TVector<TSimpleSharedPtr<TInput>> inputs; +        for (int i = begin; i < end; ++i) { +            inputs.push_back(new TInput("mergeportion-tmpin", BufSize, 0)); +            inputs.back()->Open(Files[i]); +            //            fprintf(stderr, "InitMerger: %i, %s\n", i, ~Files[i]); +        } +        merger.Init(inputs); +    } + +    void UnlinkFiles(int begin, int end) { +        TGuard<TMutex> guard(Mutex); +        for (int i = begin; i < end; ++i) { +            if (i >= InitialFilesEnd || AutoUnlink) +                unlink(Files[i].c_str()); +        } +    } + +    void Optimize(int maxPortions, size_t maxBufSize = 4u << 20) { +        TGuard<TMutex> guard(Mutex); +        maxPortions = std::min(maxPortions, Memory / PageSize - 1); +        maxBufSize = std::max((size_t)PageSize, maxBufSize); + +        if 
(maxPortions <= 2) { +            FPortions = MPortions = 2; +            BufSize = PageSize; +            return; +        } + +        int Portions = Files.ysize(); +        if (maxPortions >= Portions) { +            FPortions = MPortions = Portions; +        } else if (((Portions + maxPortions - 1) / maxPortions) <= maxPortions) { +            while (((Portions + maxPortions - 1) / maxPortions) <= maxPortions) +                --maxPortions; +            MPortions = ++maxPortions; +            int total = ((Portions + MPortions - 1) / MPortions) + Portions; +            FPortions = (total % MPortions) ? (total % MPortions) : (int)MPortions; +        } else +            FPortions = MPortions = maxPortions; + +        BufSize = std::min((size_t)(Memory / (MPortions + 1)), maxBufSize); +        //        fprintf(stderr, "Optimize: Portions=%i; MPortions=%i; FPortions=%i; Memory=%i; BufSize=%i\n", +        //                (int)Portions, (int)MPortions, (int)FPortions, (int)Memory, (int)BufSize); +    } + +    void MergeMT() { +        TGuard<TMutex> guard(Mutex); +        do { +            int n; +            while ((n = Files.ysize() - CPortions) > MPortions) { +                int m = std::min((CPortions == 0 ? 
(int)FPortions : (int)MPortions), n); +                TString fname = PortionManager->Next(); +                if (!MtpQueue->Add(new TMergePortionTask(this, CPortions, CPortions + m, fname))) +                    ythrow yexception() << "TPowerMerger::MergeMT: failed to add task"; +                CPortions += m; +                ++Tasks; +            } +            if (Tasks > 0) +                TaskFinishedCond.Wait(Mutex); +        } while (Tasks > 0); +    } + +private: +    TMutex Mutex; +    TCondVar TaskFinishedCond; + +    TMerger Merger; +    TSimpleSharedPtr<TThreadPool> MtpQueue; +    TSimpleSharedPtr<TPortionManager> PortionManager; +    TVector<TString> Files; +    int Tasks = 0; +    int InitialFilesEnd = 0; +    int CPortions = 0; +    int MPortions = 0; +    int FPortions = 0; +    int Memory = 0; +    int PageSize = 0; +    int BufSize = 0; +    bool AutoUnlink = false; +}; + +// A sorter powered by threads +template < +    class TRecord, +    template <typename T> class TCompare, +    class TSieve = TFakeSieve<TRecord>, +    class TTmpInput = TInDatFile<TRecord>, +    class TTmpOutput = TOutDatFile<TRecord>> +class TPowerSorter { +public: +    typedef TPowerSorter<TRecord, TCompare, TSieve, TTmpInput, TTmpOutput> TSorter; +    typedef TRecord TRec; +    typedef TTmpOutput TTmpOut; +    typedef TTmpInput TTmpIn; +    typedef TDatSorterBuf<TRecord, TCompare, TSieve> TSorterBuf; +    typedef TCompare<TRecord> TComp; +    typedef TPowerMerger<TRecord, TCompare, TSieve, TTmpInput, TTmpOutput> TFileMerger; + +    struct TSortPortionTask: public IObjectInQueue { +        TSorter* Sorter; +        TSorterBuf* SorterBuf; +        int Portion; + +        TSortPortionTask(TSorter* sorter, TSorterBuf* sorterBuf, int portion) +            : Sorter(sorter) +            , SorterBuf(sorterBuf) +            , Portion(portion) +        { +        } + +        void Process(void*) override { +            TAutoPtr<TSortPortionTask> This(this); +            //        
    fprintf(stderr, "SortPortion: %i\n", Portion); +            Sorter->SortPortion(SorterBuf); +        } +    }; + +    class TSorterBufQueue { +    private: +        TMutex Mutex; +        TCondVar Cond; +        TVector<TSimpleSharedPtr<TSorterBuf>> V; +        TDeque<TSorterBuf*> Q; + +        int Memory, PageSize, MaxSorterBufs; + +    public: +        TSorterBufQueue(int memory, int pageSize, int maxSorterBufs) +            : Memory(memory) +            , PageSize(pageSize) +            , MaxSorterBufs(maxSorterBufs) +        { +        } + +        void Push(TSorterBuf* sb) { +            TGuard<TMutex> guard(Mutex); +            sb->Clear(); +            Q.push_back(sb); +            Cond.Signal(); +        } + +        TSorterBuf* Pop() { +            TGuard<TMutex> guard(Mutex); +            if (!Q.size() && V.ysize() < MaxSorterBufs) { +                V.push_back(new TSorterBuf(Memory / MaxSorterBufs, PageSize)); +                return V.back().Get(); +            } else { +                while (!Q.size()) +                    Cond.Wait(Mutex); +                TSorterBuf* t = Q.front(); +                Q.pop_front(); +                return t; +            } +        } + +        void Clear() { +            TGuard<TMutex> guard(Mutex); +            Q.clear(); +            V.clear(); +        } + +        void WaitAll() { +            TGuard<TMutex> guard(Mutex); +            while (Q.size() < V.size()) { +                Cond.Wait(Mutex); +            } +        } + +        int GetMaxSorterBufs() const { +            return MaxSorterBufs; +        } +    }; + +public: +    TPowerSorter(const TSimpleSharedPtr<TThreadPool>& mtpQueue, size_t maxSorterBufs, +                 const char* name, size_t memory, size_t pageSize, size_t bufSize) +        : MaxSorterBufs(maxSorterBufs) +        , Name(name) +        , Memory(memory) +        , PageSize(pageSize) +        , BufSize(bufSize) +        , MtpQueue(mtpQueue) +        , PortionManager(new 
TPortionManager) +        , SBQueue(Memory, PageSize, MaxSorterBufs) +        , FileMerger(MtpQueue, PortionManager, Memory, PageSize, true) +    { +    } + +    TPowerSorter(size_t maxSorterBufs, +                 const char* name, size_t memory, size_t pageSize, size_t bufSize) +        : MaxSorterBufs(maxSorterBufs) +        , Name(name) +        , Memory(memory) +        , PageSize(pageSize) +        , BufSize(bufSize) +        , PortionManager(new TPortionManager) +        , SBQueue(Memory, PageSize, maxSorterBufs) +        , FileMerger(MtpQueue, PortionManager, Memory, PageSize, true) +    { +    } + +    TPowerSorter(const char* name, size_t memory, size_t pageSize, size_t bufSize) +        : MaxSorterBufs(5) +        , Name(name) +        , Memory(memory) +        , PageSize(pageSize) +        , BufSize(bufSize) +        , PortionManager(new TPortionManager) +        , SBQueue(Memory, PageSize, MaxSorterBufs) +        , FileMerger(MtpQueue, PortionManager, Memory, PageSize, true) +    { +    } + +    ~TPowerSorter() { +        Close(); +    } + +    void Open(const char* tempDir) { +        Close(); +        CurSB = SBQueue.Pop(); +        PortionManager->Open(tempDir); +    } + +    void Reopen(const char* fname) { +        Open(fname); +    } + +    void Close() { +        CurSB = nullptr; +        SBQueue.Clear(); +        PortionCount = 0; +        FileMerger.Close(); +        PortionManager->Close(); +    } + +    const TRec* Push(const TRec* v) { +        CheckOpen("Push"); +        const TRec* u = CurSB->Push(v); +        if (!u) { +            NextPortion(); +            u = CurSB->Push(v); +        } +        return u; +    } + +    void Sort(int maxPortions = 1000) { +        CheckOpen("Sort"); +        if (!PortionCount) { +            CurSB->Sort(); +        } else { +            NextPortion(); +            SBQueue.Push(CurSB); +            CurSB = nullptr; +            SBQueue.WaitAll(); +            SBQueue.Clear(); +            
FileMerger.Merge(maxPortions); +        } +    } + +    const TRec* Next() { +        return PortionCount ? FileMerger.Next() : CurSB->Next(); +    } + +    const TRec* Current() { +        return PortionCount ? FileMerger.Current() : CurSB->Current(); +    } + +    int GetBufSize() const { +        return BufSize; +    } + +    int GetPageSize() const { +        return PageSize; +    } + +    const char* GetName() const { +        return Name.data(); +    } + +private: +    void CheckOpen(const char* m) { +        if (!CurSB) +            ythrow yexception() << "TPowerSorter::" << m << ": the sorter is not open"; +    } + +    void NextPortion() { +        if (!CurSB->Size()) +            return; +        ++PortionCount; +        if (MaxSorterBufs <= 1) { +            SortPortion(CurSB); +        } else { +            if (!MtpQueue.Get()) { +                MtpQueue.Reset(new TThreadPool); +                MtpQueue->Start(MaxSorterBufs - 1); +                FileMerger.SetMtpQueue(MtpQueue); +            } +            if (!MtpQueue->Add(new TSortPortionTask(this, CurSB, PortionCount))) +                ythrow yexception() << "TPowerSorter::NextPortion: failed to add task"; +        } +        CurSB = SBQueue.Pop(); +    } + +    void SortPortion(TSorterBuf* sorterBuf) { +        TString portionFilename = PortionManager->Next(); +        try { +            sorterBuf->Sort(); + +            //            fprintf(stderr, "TPowerSorter::SortPortion: -> %s\n", ~portionFilename); +            TTmpOut out("powersorter-portion", PageSize, BufSize, 0); +            out.Open(portionFilename.data()); + +            while (sorterBuf->Next()) +                out.Push(sorterBuf->Current()); + +            out.Close(); +            FileMerger.Add(portionFilename); +            SBQueue.Push(sorterBuf); +        } catch (const yexception& e) { +            unlink(portionFilename.data()); +            ythrow yexception() << "SortPortion: " << e.what(); +        } +    } + 
+private: +    int MaxSorterBufs = 0; +    TString Name; +    int Memory = 0; +    int PageSize = 0; +    int BufSize = 0; + +    TMutex Mutex; +    TSimpleSharedPtr<TThreadPool> MtpQueue; +    TSimpleSharedPtr<TPortionManager> PortionManager; + +    TSorterBufQueue SBQueue; +    TSorterBuf* CurSB = nullptr; +    int PortionCount = 0; + +    TFileMerger FileMerger; +}; diff --git a/library/cpp/microbdb/reader.h b/library/cpp/microbdb/reader.h new file mode 100644 index 00000000000..694a2f17662 --- /dev/null +++ b/library/cpp/microbdb/reader.h @@ -0,0 +1,354 @@ +#pragma once + +#include "align.h" +#include "header.h" +#include "extinfo.h" + +#include <contrib/libs/zlib/zlib.h> +#include <contrib/libs/fastlz/fastlz.h> +#include <contrib/libs/snappy/snappy.h> + +#include <util/generic/vector.h> +#include <util/memory/tempbuf.h> + +namespace NMicroBDB { +    static const size_t DEFAULT_BUFFER_SIZE = (64 << 10); + +    //! +    template <class TVal> +    class IBasePageReader { +    public: +        virtual size_t GetRecSize() const = 0; +        virtual size_t GetExtSize() const = 0; +        virtual bool GetExtInfo(typename TExtInfoType<TVal>::TResult* extInfo) const = 0; +        virtual const ui8* GetExtInfoRaw(size_t* len) const = 0; +        virtual const TVal* Next() = 0; +        virtual void Reset() = 0; +        //! set clearing flag, so temporary buffers will be cleared +        //! 
in next call of Next() +        virtual void SetClearFlag() { +        } + +        virtual ~IBasePageReader() { +        } +    }; + +    template <class TVal, typename TPageIter> +    class TRawPageReader: public IBasePageReader<TVal> { +    public: +        TRawPageReader(TPageIter* const iter) +            : PageIter(iter) +        { +            Reset(); +        } + +        bool GetExtInfo(typename TExtInfoType<TVal>::TResult* extInfo) const override { +            Y_VERIFY(TExtInfoType<TVal>::Exists, "GetExtInfo should only be used with extended records"); +            if (!Rec) +                return false; +            ui8* raw = (ui8*)Rec + RecSize + ExtLenSize; +            return extInfo->ParseFromArray(raw, ExtSize); +        } + +        size_t GetRecSize() const override { +            return RecSize + ExtLenSize; +        } + +        size_t GetExtSize() const override { +            return ExtSize; +        } + +        const ui8* GetExtInfoRaw(size_t* len) const override { +            Y_VERIFY(TExtInfoType<TVal>::Exists, "GetExtInfo should only be used with extended records"); +            if (!Rec) { +                *len = 0; +                return nullptr; +            } +            *len = ExtLenSize + ExtSize; +            return (ui8*)Rec + RecSize; +        } + +        const TVal* Next() override { +            if (!Rec) +                Rec = (TVal*)((char*)PageIter->Current() + sizeof(TDatPage)); +            else +                Rec = (TVal*)((char*)Rec + DatCeil(RecSize + ExtLenSize + ExtSize)); +            if (!TExtInfoType<TVal>::Exists) +                RecSize = SizeOf(Rec); +            else +                RecSize = SizeOfExt(Rec, &ExtLenSize, &ExtSize); +            return Rec; +        } + +        void Reset() override { +            Rec = nullptr; +            RecSize = 0; +            ExtLenSize = 0; +            ExtSize = 0; +        } + +    private: +        const TVal* Rec; +        size_t RecSize; +        size_t 
ExtLenSize; +        size_t ExtSize; +        TPageIter* const PageIter; +    }; + +    template <class TVal, typename TPageIter> +    class TCompressedReader: public IBasePageReader<TVal> { +        inline size_t GetFirstRecordSize(const TVal* const in) const { +            if (!TExtInfoType<TVal>::Exists) { +                return DatCeil(SizeOf(in)); +            } else { +                size_t ll; +                size_t l; +                size_t ret = SizeOfExt(in, &ll, &l); + +                return DatCeil(ret + ll + l); +            } +        } + +        void DecompressBlock() { +            if (PageIter->IsFrozen() && Buffer.Get()) +                Blocks.push_back(Buffer.Release()); + +            const TCompressedHeader* hdr = (const TCompressedHeader*)(Page); + +            Page += sizeof(TCompressedHeader); + +            const size_t first = GetFirstRecordSize((const TVal*)Page); + +            if (!Buffer.Get() || Buffer->Size() < hdr->Original) +                Buffer.Reset(new TTempBuf(Max<size_t>(hdr->Original, DEFAULT_BUFFER_SIZE))); + +            memcpy(Buffer->Data(), Page, first); +            Page += first; + +            if (hdr->Count > 1) { +                switch (Algo) { +                    case MBDB_COMPRESSION_ZLIB: { +                        uLongf dst = hdr->Original - first; + +                        int ret = uncompress((Bytef*)Buffer->Data() + first, &dst, Page, hdr->Compressed); + +                        if (ret != Z_OK) +                            ythrow yexception() << "error then uncompress " << ret; +                    } break; +                    case MBDB_COMPRESSION_FASTLZ: { +                        int dst = hdr->Original - first; +                        int ret = yfastlz_decompress(Page, hdr->Compressed, Buffer->Data() + first, dst); + +                        if (!ret) +                            ythrow yexception() << "error then uncompress"; +                    } break; +                    case 
MBDB_COMPRESSION_SNAPPY: { +                        if (!snappy::RawUncompress((const char*)Page, hdr->Compressed, Buffer->Data() + first)) +                            ythrow yexception() << "error then uncompress"; +                    } break; +                } +            } + +            Rec = nullptr; +            RecNum = hdr->Count; +            Page += hdr->Compressed; +        } + +        void ClearBuffer() { +            for (size_t i = 0; i < Blocks.size(); ++i) +                delete Blocks[i]; +            Blocks.clear(); +            ClearFlag = false; +        } + +    public: +        TCompressedReader(TPageIter* const iter) +            : Rec(nullptr) +            , RecSize(0) +            , ExtLenSize(0) +            , ExtSize(0) +            , Page(nullptr) +            , PageIter(iter) +            , RecNum(0) +            , BlockNum(0) +            , ClearFlag(false) +        { +        } + +        ~TCompressedReader() override { +            ClearBuffer(); +        } + +        size_t GetRecSize() const override { +            return RecSize + ExtLenSize; +        } + +        size_t GetExtSize() const override { +            return ExtSize; +        } + +        bool GetExtInfo(typename TExtInfoType<TVal>::TResult* extInfo) const override { +            Y_VERIFY(TExtInfoType<TVal>::Exists, "GetExtInfo should only be used with extended records"); +            if (!Rec) +                return false; +            ui8* raw = (ui8*)Rec + RecSize + ExtLenSize; +            return extInfo->ParseFromArray(raw, ExtSize); +        } + +        const ui8* GetExtInfoRaw(size_t* len) const override { +            Y_VERIFY(TExtInfoType<TVal>::Exists, "GetExtInfo should only be used with extended records"); +            if (!Rec) { +                *len = 0; +                return nullptr; +            } +            *len = ExtLenSize + ExtSize; +            return (ui8*)Rec + RecSize; +        } + +        const TVal* Next() override { +            
Y_ASSERT(RecNum >= 0); + +            if (ClearFlag) +                ClearBuffer(); + +            if (!Page) { +                if (!PageIter->Current()) +                    return nullptr; + +                Page = (ui8*)PageIter->Current() + sizeof(TDatPage); + +                BlockNum = ((TCompressedPage*)Page)->BlockCount - 1; +                Algo = (ECompressionAlgorithm)((TCompressedPage*)Page)->Algorithm; +                Page += sizeof(TCompressedPage); + +                DecompressBlock(); +            } + +            if (!RecNum) { +                if (BlockNum <= 0) +                    return nullptr; +                else { +                    --BlockNum; +                    DecompressBlock(); +                } +            } + +            --RecNum; +            if (!Rec) +                Rec = (const TVal*)Buffer->Data(); +            else +                Rec = (const TVal*)((char*)Rec + DatCeil(RecSize + ExtLenSize + ExtSize)); + +            if (!TExtInfoType<TVal>::Exists) +                RecSize = SizeOf(Rec); +            else +                RecSize = SizeOfExt(Rec, &ExtLenSize, &ExtSize); + +            return Rec; +        } + +        void Reset() override { +            Page = nullptr; +            BlockNum = 0; +            Rec = nullptr; +            RecSize = 0; +            ExtLenSize = 0; +            ExtSize = 0; +            RecNum = 0; +        } + +        void SetClearFlag() override { +            ClearFlag = true; +        } + +    public: +        THolder<TTempBuf> Buffer; +        TVector<TTempBuf*> Blocks; +        const TVal* Rec; +        size_t RecSize; +        size_t ExtLenSize; +        size_t ExtSize; +        const ui8* Page; +        TPageIter* const PageIter; +        int RecNum; //!< count of recs in current block +        int BlockNum; +        ECompressionAlgorithm Algo; +        bool ClearFlag; +    }; + +    class TZLibCompressionImpl { +    public: +        static const ECompressionAlgorithm Code = 
MBDB_COMPRESSION_ZLIB; + +        inline void Init() { +            // - +        } + +        inline void Term() { +            // - +        } + +        inline size_t CompressBound(size_t size) const noexcept { +            return ::compressBound(size); +        } + +        inline void Compress(void* out, size_t& outSize, const void* in, size_t inSize) { +            uLongf size = outSize; + +            if (compress((Bytef*)out, &size, (const Bytef*)in, inSize) != Z_OK) +                ythrow yexception() << "not compressed"; +            outSize = size; +        } +    }; + +    class TFastlzCompressionImpl { +    public: +        static const ECompressionAlgorithm Code = MBDB_COMPRESSION_FASTLZ; + +        inline void Init() { +            // - +        } + +        inline void Term() { +            // - +        } + +        inline size_t CompressBound(size_t size) const noexcept { +            size_t rval = size_t(size * 1.07); +            return rval < 66 ? 66 : rval; +        } + +        inline void Compress(void* out, size_t& outSize, const void* in, size_t inSize) { +            outSize = yfastlz_compress_level(2, in, inSize, out); +            if (!outSize) +                ythrow yexception() << "not compressed"; +        } +    }; + +    class TSnappyCompressionImpl { +    public: +        static const ECompressionAlgorithm Code = MBDB_COMPRESSION_SNAPPY; + +        inline void Init() { +            // - +        } + +        inline void Term() { +            // - +        } + +        inline size_t CompressBound(size_t size) const noexcept { +            return snappy::MaxCompressedLength(size); +        } + +        inline void Compress(void* out, size_t& outSize, const void* in, size_t inSize) { +            snappy::RawCompress((const char*)in, inSize, (char*)out, &outSize); +        } +    }; + +} + +using TFakeCompression = void; +using TZLibCompression = NMicroBDB::TZLibCompressionImpl; +using TFastlzCompression = 
NMicroBDB::TFastlzCompressionImpl; +using TSnappyCompression = NMicroBDB::TSnappyCompressionImpl; diff --git a/library/cpp/microbdb/safeopen.h b/library/cpp/microbdb/safeopen.h new file mode 100644 index 00000000000..c328ffd575a --- /dev/null +++ b/library/cpp/microbdb/safeopen.h @@ -0,0 +1,792 @@ +#pragma once + +// util +#include <util/generic/yexception.h> +#include <util/generic/vector.h> +#include <util/string/util.h> +#include <util/system/mutex.h> +#include <thread> + +#include "microbdb.h" + +#if defined(_MSC_VER) +#pragma warning(push) +#pragma warning(disable : 4706) /*assignment within conditional expression*/ +#pragma warning(disable : 4267) /*conversion from 'size_t' to 'type', possible loss of data*/ +#endif + +template <typename TVal, typename TPageFile = TInputPageFile, typename TIterator = TInputPageIterator<TPageFile>> +class TInDatFile: protected  TInDatFileImpl<TVal, TInputRecordIterator<TVal, TIterator>> { +public: +    typedef TVal TRec; +    typedef TInDatFileImpl<TVal, TInputRecordIterator<TVal, TIterator>> TBase; + +    TInDatFile(const TString& name, size_t pages, int pagesOrBytes = 1) +        : Name(name) +        , Pages(pages) +        , PagesOrBytes(pagesOrBytes) +    { +    } + +    ~TInDatFile() { +        Close(); +    } + +    void Open(const TString& fname, bool direct = false) { +        ui32 gotRecordSig = 0; +        int ret = TBase::Open(fname.data(), Pages, PagesOrBytes, &gotRecordSig, direct); +        if (ret) { +            // XXX: print record type name, not type sig +            ythrow yexception() << ErrorMessage(ret, "Failed to open input file", fname, TVal::RecordSig, gotRecordSig); +        } +        Name = fname; +    } + +    void OpenStream(TAutoPtr<IInputStream> input) { +        ui32 gotRecordSig = 0; +        int ret = TBase::Open(input, Pages, PagesOrBytes, &gotRecordSig); +        if (ret) { +            // XXX: print record type name, not type sig +            ythrow yexception() << ErrorMessage(ret, 
"Failed to open input file", Name, TVal::RecordSig, gotRecordSig); +        } +    } + +    void Close() { +        int ret; +        if (IsOpen() && (ret = TBase::GetError())) +            if (!std::uncaught_exception()) +                ythrow yexception() << ErrorMessage(ret, "Error before closing input file", Name); +        if ((ret = TBase::Close())) +            if (!std::uncaught_exception()) +                ythrow yexception() << ErrorMessage(ret, "Error while closing input file", Name); +    } + +    const char* GetName() const { +        return Name.data(); +    } + +    using TBase::Current; +    using TBase::Freeze; +    using TBase::GetError; +    using TBase::GetExtInfo; +    using TBase::GetExtInfoRaw; +    using TBase::GetExtSize; +    using TBase::GetLastPage; +    using TBase::GetPageNum; +    using TBase::GetPageSize; +    using TBase::GetRecSize; +    using TBase::GotoLastPage; +    using TBase::GotoPage; +    using TBase::IsEof; +    using TBase::IsOpen; +    using TBase::Next; +    using TBase::Skip; +    using TBase::Unfreeze; + +protected: +    TString Name; +    size_t Pages; +    int PagesOrBytes; +}; + +template <typename TVal> +class TMappedInDatFile: protected  TInDatFileImpl<TVal, TInputRecordIterator<TVal, TMappedInputPageIterator<TMappedInputPageFile>>> { +public: +    typedef TVal TRec; +    typedef TInDatFileImpl<TVal, TInputRecordIterator<TVal, TMappedInputPageIterator<TMappedInputPageFile>>> TBase; + +    TMappedInDatFile(const TString& name, size_t /* pages */, int /* pagesOrBytes */) +        : Name(name) +    { +    } + +    ~TMappedInDatFile() { +        Close(); +    } + +    void Open(const TString& fname) { +        int ret = TBase::Open(fname.data()); +        if (ret) +            ythrow yexception() << ErrorMessage(ret, "Failed to open mapped file", fname, TVal::RecordSig); +        Name = fname; +    } + +    void Close() { +        int ret; +        if (IsOpen() && (ret = TBase::GetError())) +            if 
(!std::uncaught_exception()) +                ythrow yexception() << ErrorMessage(ret, "Error before closing mapped file", Name); +        if ((ret = TBase::Close())) +            if (!std::uncaught_exception()) +                ythrow yexception() << ErrorMessage(ret, "Error while closing mapped file", Name); +    } + +    const char* GetName() const { +        return Name.data(); +    } + +    using TBase::Current; +    using TBase::GetError; +    using TBase::GetExtInfo; +    using TBase::GetExtInfoRaw; +    using TBase::GetLastPage; +    using TBase::GetPageNum; +    using TBase::GetPageSize; +    using TBase::GotoLastPage; +    using TBase::GotoPage; +    using TBase::IsEof; +    using TBase::IsOpen; +    using TBase::Next; +    using TBase::Skip; + +protected: +    TString Name; +}; + +template <typename TVal, typename TCompressor = TFakeCompression, typename TPageFile = TOutputPageFile> +class TOutDatFile: protected  TOutDatFileImpl<TVal, TOutputRecordIterator<TVal, TOutputPageIterator<TPageFile>, TFakeIndexer, TCompressor>> { +public: +    typedef TOutDatFileImpl<TVal, TOutputRecordIterator<TVal, TOutputPageIterator<TPageFile>, TFakeIndexer, TCompressor>> TBase; + +    TOutDatFile(const TString& name, size_t pagesize, size_t pages, int pagesOrBytes = 1) +        : Name(name) +        , PageSize(pagesize) +        , Pages(pages) +        , PagesOrBytes(pagesOrBytes) +    { +    } + +    ~TOutDatFile() { +        Close(); +    } + +    void Open(const char* fname, bool direct = false) { +        int ret = TBase::Open(fname, PageSize, Pages, PagesOrBytes, direct); +        if (ret) +            ythrow yexception() << ErrorMessage(ret, "Failed to open output file", fname); +        Name = fname; +    } + +    void Open(const TString& fname) { +        Open(fname.data()); +    } + +    void OpenStream(TAutoPtr<IOutputStream> output) { +        int ret = TBase::Open(output, PageSize, Pages, PagesOrBytes); +        if (ret) +            ythrow yexception() << 
ErrorMessage(ret, "Failed to open output stream", Name); +    } + +    void Close() { +        int ret; +        if ((ret = TBase::GetError())) +            if (!std::uncaught_exception()) +                ythrow yexception() << ErrorMessage(ret, "Error before closing output file", Name); +        if ((ret = TBase::Close())) +            if (!std::uncaught_exception()) +                ythrow yexception() << ErrorMessage(ret, "Error while closing output file", Name); +    } + +    const char* GetName() const { +        return Name.data(); +    } + +    using TBase::Freeze; +    using TBase::GetError; +    using TBase::GetPageSize; +    using TBase::IsEof; +    using TBase::IsOpen; +    using TBase::Offset; +    using TBase::Push; +    using TBase::PushWithExtInfo; +    using TBase::Reserve; +    using TBase::Unfreeze; + +protected: +    TString Name; +    size_t PageSize, Pages; +    int PagesOrBytes; +}; + +template <typename TVal, typename TCompressor, typename TPageFile> +class TOutDatFileArray; + +template <typename TVal, typename TCompressor = TFakeCompression, typename TPageFile = TOutputPageFile> +class TOutDatFileArray { +    typedef TOutDatFile<TVal, TCompressor, TPageFile> TFileType; + +public: +    TOutDatFileArray(const TString& name, size_t pagesize, size_t pages, int pagesOrBytes = 1) +        : Name(name) +        , PageSize(pagesize) +        , Pages(pages) +        , PagesOrBytes(pagesOrBytes) +        , NumFiles(0) +        , Files(nullptr) +    { +    } + +    ~TOutDatFileArray() { +        for (int i = 0; i < NumFiles; ++i) { +            Files[i].Close(); +            Files[i].~TFileType(); +        } +        free(Files); +        Files = nullptr; +        NumFiles = 0; +    } + +    TFileType& operator[](size_t pos) { +        return Files[pos]; +    } + +    void Open(int n, const TString& fname) { +        char temp[FILENAME_MAX]; + +        Name = fname; +        NumFiles = CreateDatObjects(n, fname); + +        int i; +        try { +     
       for (i = 0; i < NumFiles; ++i) { +                sprintf(temp, fname.data(), i); +                Files[i].Open(temp); +            } +        } catch (...) { +            while (--i >= 0) +                Files[i].Close(); +            throw; +        } +    } + +    template <typename TNameBuilder> +    void OpenWithCallback(int n, const TNameBuilder& builder) { +        NumFiles = CreateDatObjects(n, Name); + +        for (int i = 0; i < NumFiles; ++i) +            Files[i].Open(builder.GetName(i).data()); +    } + +    void Close() { +        for (int i = 0; i < NumFiles; ++i) +            Files[i].Close(); +    } + +    void CloseMT(ui32 threads) { +        int current = 0; +        TMutex mutex; +        TVector<std::thread> thrs; +        thrs.reserve(threads); +        for (ui32 i = 0; i < threads; i++) { +            thrs.emplace_back([this, &current, &mutex]() { +                while (true) { +                    mutex.Acquire(); +                    int cur = current++; +                    mutex.Release(); +                    if (cur >= NumFiles) +                        break; +                    Files[cur].Close(); +                } +            }); +        } +        for (auto& thread : thrs) { +            thread.join(); +        } +    } + +    const char* GetName() const { +        return Name.data(); +    } + +protected: +    int CreateDatObjects(int n, const TString& fname) { +        if (!(Files = (TFileType*)malloc(n * sizeof(TFileType)))) +            ythrow yexception() << "can't alloc \"" << fname << "\" file array: " << LastSystemErrorText(); +        int num = 0; +        char temp[FILENAME_MAX]; +        for (int i = 0; i < n; ++i, ++num) { +            sprintf(temp, "%s[%d]", fname.data(), i); +            new (Files + i) TFileType(temp, PageSize, Pages, PagesOrBytes); +        } +        return num; +    } + +    TString Name; +    size_t PageSize, Pages; +    int PagesOrBytes, NumFiles; +    TFileType* Files; +}; + +template 
<typename TVal, typename TKey, typename TCompressor = TFakeCompression, typename TPageFile = TOutputPageFile> +class TOutDirectFile: protected  TOutDirectFileImpl<TVal, TKey, TCompressor, TPageFile> { +    typedef TOutDirectFileImpl<TVal, TKey, TCompressor, TPageFile> TBase; + +public: +    TOutDirectFile(const TString& name, size_t pagesize, size_t pages, size_t ipagesize, size_t ipages, int pagesOrBytes) +        : Name(name) +        , PageSize(pagesize) +        , Pages(pages) +        , IdxPageSize(ipagesize) +        , IdxPages(ipages) +        , PagesOrBytes(pagesOrBytes) +    { +    } + +    ~TOutDirectFile() { +        Close(); +    } + +    void Open(const TString& fname) { +        int ret = TBase::Open(fname.data(), PageSize, Pages, IdxPageSize, IdxPages, PagesOrBytes); +        if (ret) +            ythrow yexception() << ErrorMessage(ret, "Failed to open output file", fname); +        Name = fname; +    } + +    void Close() { +        int ret; +        if ((ret = TBase::GetError())) +            if (!std::uncaught_exception()) +                ythrow yexception() << ErrorMessage(ret, "Error before closing output file", Name); +        if ((ret = TBase::Close())) +            if (!std::uncaught_exception()) +                ythrow yexception() << ErrorMessage(ret, "Error while closing output file", Name); +    } + +    const char* GetName() const { +        return Name.data(); +    } + +    using TBase::Freeze; +    using TBase::Push; +    using TBase::PushWithExtInfo; +    using TBase::Reserve; +    using TBase::Unfreeze; + +protected: +    TString Name; +    size_t PageSize, Pages, IdxPageSize, IdxPages; +    int PagesOrBytes; +}; + +template < +    typename TVal, +    template <typename T> class TComparer, +    typename TCompress = TFakeCompression, +    typename TSieve = TFakeSieve<TVal>, +    typename TPageFile = TOutputPageFile, +    typename TFileTypes = TDefInterFileTypes> +class TDatSorter: protected  TDatSorterImpl<TVal, TComparer<TVal>, 
TCompress, TSieve, TPageFile, TFileTypes> { +    typedef TDatSorterImpl<TVal, TComparer<TVal>, TCompress, TSieve, TPageFile, TFileTypes> TBase; + +public: +    typedef TVal TRec; + +public: +    TDatSorter(const TString& name, size_t memory, size_t pagesize, size_t pages, int pagesOrBytes = 1) +        : Name(name) +        , Memory(memory) +        , PageSize(pagesize) +        , Pages(pages) +        , PagesOrBytes(pagesOrBytes) +    { +        Templ[0] = 0; +    } + +    ~TDatSorter() { +        Close(); +        Templ[0] = 0; +    } + +    void Open(const TString& dirName) { +        int ret; +        if (ret = MakeSorterTempl(Templ, dirName.data())) { +            Templ[0] = 0; +            ythrow yexception() << ErrorMessage(ret, Name + " sorter: bad tempdir", dirName); +        } +        if ((ret = TBase::Open(Templ, PageSize, Pages, PagesOrBytes))) +            ythrow yexception() << ErrorMessage(ret, Name + " sorter: open error, temp dir", Templ); +    } + +    void Sort(bool direct = false) { +        int ret = TBase::Sort(Memory, 1000, direct); +        if (ret) +            ythrow yexception() << ErrorMessage(ret, Name + " sorter: sort error, temp dir", Templ, TVal::RecordSig); +    } + +    void SortToFile(const TString& name) { +        int ret = TBase::SortToFile(name.data(), Memory); +        if (ret) +            ythrow yexception() << ErrorMessage(ret, Name + "sorter: error in SortToFile", name, TVal::RecordSig); +    } + +    void SortToStream(TAutoPtr<IOutputStream> output) { +        int ret = TBase::SortToStream(output, Memory); +        if (ret) +            ythrow yexception() << ErrorMessage(ret, Name + "sorter: error in SortToStream", "", TVal::RecordSig); +    } + +    void Close() { +        int ret1 = TBase::GetError(); +        int ret2 = TBase::Close(); +        if (Templ[0]) { +            *strrchr(Templ, GetDirectorySeparator()) = 0; +            RemoveDirWithContents(Templ); +            Templ[0] = 0; +        } +        if (ret1) 
+            if (!std::uncaught_exception()) +                ythrow yexception() << ErrorMessage(ret1, Name + "sorter: error before closing"); +        if (ret2) +            if (!std::uncaught_exception()) +                ythrow yexception() << ErrorMessage(ret2, Name + "sorter: error while closing"); +    } + +    int Sort(size_t memory, int maxportions, bool direct = false) { +        return TBase::Sort(memory, maxportions, direct); +    } + +    const char* GetName() const { +        return Name.data(); +    } + +    using TBase::GetPageSize; +    using TBase::GetPages; +    using TBase::Next; +    using TBase::NextPortion; +    using TBase::Push; +    using TBase::PushWithExtInfo; +    using TBase::UseSegmentSorter; + +protected: +    TString Name; +    size_t Memory, PageSize, Pages; +    int PagesOrBytes; +    char Templ[FILENAME_MAX]; +}; + +template <typename TSorter> +class TSorterArray { +public: +    typedef TSorter TDatSorter; + +public: +    TSorterArray(const TString& name, size_t memory, size_t pagesize, size_t pages, int pagesOrBytes = 1) +        : Name(name) +        , Memory(memory) +        , PageSize(pagesize) +        , Pages(pages) +        , PagesOrBytes(pagesOrBytes) +        , NumSorters(0) +        , Sorters(nullptr) +    { +    } + +    ~TSorterArray() { +        for (int i = 0; i < NumSorters; ++i) { +            Sorters[i].Close(); +            Sorters[i].~TSorter(); +        } +        free(Sorters); +        Sorters = nullptr; +        NumSorters = 0; +    } + +    TSorter& operator[](size_t pos) { +        return Sorters[pos]; +    } + +    void Open(int n, const TString& fname, size_t memory = 0) { +        if (!(Sorters = (TSorter*)malloc(n * sizeof(TSorter)))) +            ythrow yexception() << "can't alloc \"" << fname << "\" sorter array: " << LastSystemErrorText(); +        NumSorters = n; +        char temp[FILENAME_MAX]; +        if (memory) +            Memory = memory; +        for (int i = 0; i < NumSorters; ++i) { +  
          sprintf(temp, "%s[%d]", Name.data(), i); +            new (Sorters + i) TSorter(temp, Memory, PageSize, Pages, PagesOrBytes); +        } +        for (int i = 0; i < NumSorters; ++i) +            Sorters[i].Open(fname); +    } + +    void Close() { +        for (int i = 0; i < NumSorters; ++i) +            Sorters[i].Close(); +    } + +    const char* GetName() const { +        return Name.data(); +    } + +protected: +    TString Name; +    size_t Memory, PageSize, Pages; +    int PagesOrBytes, NumSorters; +    TSorter* Sorters; +}; + +template <typename TVal, template <typename T> class TCompare, typename TSieve = TFakeSieve<TVal>> +class TDatSorterArray: public TSorterArray<TDatSorter<TVal, TCompare, TSieve>> { +public: +    TDatSorterArray(const char* name, size_t memory, size_t pagesize, size_t pages, int pagesOrBytes = 1) +        : TSorterArray<TDatSorter<TVal, TCompare, TSieve>>(name, memory, pagesize, pages, pagesOrBytes) +    { +    } +}; + +template <typename TVal, template <typename T> class TCompare, typename TCompress = TFakeCompression, +          typename TSieve = TFakeSieve<TVal>, typename TPageFile = TOutputPageFile, typename TFileTypes = TDefInterFileTypes> +class TDatSorterMemo: public TDatSorter<TVal, TCompare, TCompress, TSieve, TPageFile, TFileTypes> { +    typedef TDatSorter<TVal, TCompare, TCompress, TSieve, TPageFile, TFileTypes> TSorter; + +public: +    TOutDatFile<TVal> Memo; +    TString Home; +    bool OpenReq; +    bool Opened; +    bool UseDirectWrite; + +public: +    TDatSorterMemo(const char* name, size_t memory, size_t pagesize, size_t pages, int pagesOrBytes = 1) +        : TSorter(name, memory, pagesize, pages, pagesOrBytes) +        , Memo(name, pagesize, memory, 0) +    { +        OpenReq = false; +        Opened = false; +        UseDirectWrite = false; +    } + +    void Open(const TString& home) { +        OpenReq = true; +        //        TSorter::Open(home); +        Home = home; +        Memo.Open(nullptr); +  
      Memo.Freeze(); +    } + +    void Reopen(const char* home) { +        Close(); +        Open(home); +    } + +    void Open() { +        if (!OpenReq) { +            OpenReq = true; +            Memo.Open(nullptr); +            Memo.Freeze(); +        } +    } + +    void OpenIfNeeded() { +        if (OpenReq && !Opened) { +            if (!Home) +                ythrow yexception() << "Temp directory not specified, call Open(char*) first : " << TSorter::Name; +            TSorter::Open(Home); +            Opened = true; +        } +    } + +    TVal* Reserve(size_t len) { +        if (TExtInfoType<TVal>::Exists) +            return ReserveWithExt(len, 0); + +        TVal* u = Memo.Reserve(len); +        if (!u) { +            OpenIfNeeded(); +            TSorter::NextPortion(UseDirectWrite); +            Memo.Freeze(); +            u = Memo.Reserve(len); +        } +        TSorter::PushWithExtInfo(u); +        return u; +    } + +    TVal* ReserveWithExt(size_t len, size_t extSize) { +        size_t fullLen = len + len_long((i64)extSize) + extSize; +        TVal* u = Memo.Reserve(fullLen); +        if (!u) { +            OpenIfNeeded(); +            TSorter::NextPortion(UseDirectWrite); +            Memo.Freeze(); +            u = Memo.Reserve(fullLen); +            if (!u) { +                if (fullLen > Memo.GetPageSize()) { +                    ythrow yexception() << "Size of element and " << len << " size of extInfo " << extSize +                                        << " is larger than page size " << Memo.GetPageSize(); +                } +                ythrow yexception() << "going to insert a null pointer. 
Bad."; +            } +        } +        out_long((i64)extSize, (char*)u + len); +        TSorter::PushWithExtInfo(u); +        return u; +    } + +    char* GetReservedExt(TVal* rec, size_t len, size_t extSize) { +        return (char*)rec + len + len_long((i64)extSize); +    } + +    const TVal* Push(const TVal* v, const typename TExtInfoType<TVal>::TResult* extInfo = nullptr) { +        const TVal* u = Memo.Push(v, extInfo); +        if (!u) { +            OpenIfNeeded(); +            TSorter::NextPortion(UseDirectWrite); +            Memo.Freeze(); +            u = Memo.Push(v, extInfo); +            if (!u) { +                if (SizeOf(v) > Memo.GetPageSize()) { +                    ythrow yexception() << "Size of element " << SizeOf(v) +                                        << " is larger than page size " << Memo.GetPageSize(); +                } +                ythrow yexception() << "going to insert a null pointer. Bad."; +            } +        } +        TSorter::PushWithExtInfo(u); +        return u; +    } + +    const TVal* Push(const TVal* v, const ui8* extInfoRaw, size_t extLen) { +        const TVal* u = Memo.Push(v, extInfoRaw, extLen); +        if (!u) { +            OpenIfNeeded(); +            TSorter::NextPortion(UseDirectWrite); +            Memo.Freeze(); +            u = Memo.Push(v, extInfoRaw, extLen); +            if (!u) { +                if (SizeOf(v) > Memo.GetPageSize()) { +                    ythrow yexception() << "Size of element " << SizeOf(v) +                                        << " is larger than page size " << Memo.GetPageSize(); +                } +                ythrow yexception() << "going to insert a null pointer. 
Bad.."; +            } +        } +        TSorter::PushWithExtInfo(u); +        return u; +    } + +    const TVal* PushWithExtInfo(const TVal* v) { +        const TVal* u = Memo.PushWithExtInfo(v); +        if (!u) { +            OpenIfNeeded(); +            TSorter::NextPortion(UseDirectWrite); +            Memo.Freeze(); +            u = Memo.PushWithExtInfo(v); +            if (!u) { +                if (SizeOf(v) > Memo.GetPageSize()) { +                    ythrow yexception() << "Size of element " << SizeOf(v) +                                        << " is larger than page size " << Memo.GetPageSize(); +                } +                ythrow yexception() << "going to insert a null pointer. Bad..."; +            } +        } +        TSorter::PushWithExtInfo(u); +        return u; +    } + +    void Sort(bool direct = false) { +        if (Opened) { +            TSorter::NextPortion(UseDirectWrite); +            Memo.Close(); +            OpenReq = false; +            TSorter::Sort(direct); +        } else { +            TSorter::SortPortion(); +        } +    } + +    const TVal* Next() { +        return Opened ? TSorter::Next() : TSorter::Nextp(); +    } + +    bool GetExtInfo(typename TExtInfoType<TVal>::TResult* extInfo) const { +        return NMicroBDB::GetExtInfo(Current(), extInfo); +    } + +    const ui8* GetExtInfoRaw(size_t* len) const { +        return NMicroBDB::GetExtInfoRaw(Current(), len); +    } + +    const TVal* Current() const { +        return Opened ? 
TSorter::Current() : TSorter::Currentp(); +    } + +    int NextPortion() { +        OpenIfNeeded(); +        return TSorter::NextPortion(UseDirectWrite); +    } + +    void SortToFile(const char* name) { +        OpenIfNeeded(); +        TSorter::NextPortion(UseDirectWrite); +        Memo.Close(); +        OpenReq = false; +        TSorter::SortToFile(name); +    } + +    void SortToStream(TAutoPtr<IOutputStream> output) { +        OpenIfNeeded(); +        TSorter::NextPortion(UseDirectWrite); +        Memo.Close(); +        OpenReq = false; +        TSorter::SortToStream(output); +    } + +    template <typename TKey, typename TOutCompress> +    void SortToDirectFile(const char* name, size_t ipagesize, size_t ipages) { +        Sort(); +        TOutDirectFile<TVal, TKey, TOutCompress> out(TSorter::Name, TSorter::PageSize, TSorter::Pages, ipagesize, ipages, TSorter::PagesOrBytes); +        out.Open(name); +        while (const TVal* rec = Next()) +            out.PushWithExtInfo(rec); +        out.Close(); +    } + +    template <typename TKey> +    void SortToDirectFile(const char* name, size_t ipagesize, size_t ipages) { +        SortToDirectFile<TKey, TCompress>(name, ipagesize, ipages); +    } + +    void CloseSorter() { +        if (Opened) +            TSorter::Close(); +        else +            TSorter::Closep(); +        Memo.Freeze(); +        Opened = false; +    } + +    void Close() { +        if (Opened) +            TSorter::Close(); +        else +            TSorter::Closep(); +        Memo.Close(); +        OpenReq = false; +        Opened = false; +    } + +    int SavePortions(const char* mask) { +        return TSorter::SavePortions(mask, UseDirectWrite); +    } + +public: +    using TSorter::RestorePortions; +}; + +template <typename TVal, template <typename T> class TCompare, typename TCompress = TFakeCompression, +          typename TSieve = TFakeSieve<TVal>, class TPageFile = TOutputPageFile, class TFileTypes = TDefInterFileTypes> +class 
TDatSorterMemoArray: public TSorterArray<TDatSorterMemo<TVal, TCompare, TCompress, TSieve, TPageFile, TFileTypes>> { +public: +    typedef TSorterArray<TDatSorterMemo<TVal, TCompare, TCompress, TSieve, TPageFile, TFileTypes>> TBase; + +    TDatSorterMemoArray(const char* name, size_t memory, size_t pagesize, size_t pages, int pagesOrBytes = 1) +        : TBase(name, memory, pagesize, pages, pagesOrBytes) +    { +    } +}; + +#if defined(_MSC_VER) +#pragma warning(pop) +#endif diff --git a/library/cpp/microbdb/sorter.h b/library/cpp/microbdb/sorter.h new file mode 100644 index 00000000000..b2e7390377d --- /dev/null +++ b/library/cpp/microbdb/sorter.h @@ -0,0 +1,677 @@ +#pragma once + +#include <util/ysaveload.h> +#include <util/generic/algorithm.h> +#include <contrib/libs/libc_compat/include/link/link.h> + +#include "header.h" +#include "heap.h" +#include "extinfo.h" +#include "input.h" +#include "output.h" + +#ifdef TEST_MERGE +#define MBDB_SORT_FUN ::StableSort +#else +#define MBDB_SORT_FUN ::Sort +#endif + +template <class TVal, class TCompare, typename TCompress, typename TSieve, typename TOutPageFile, typename TFileTypes> +class TDatSorterImpl; + +template <class TVal> +struct TFakeSieve { +    static inline int Sieve(TVal*, const TVal*) noexcept { +        return 0; +    } +}; + +template <class TSieve> +struct TIsSieveFake { +    static const bool Result = false; +}; + +template <class T> +struct TIsSieveFake<TFakeSieve<T>> { +    static const bool Result = true; +}; + +class TDefInterFileTypes { +public: +    typedef TOutputPageFile TOutPageFile; +    typedef TInputPageFile TInPageFile; +}; + +//class TCompressedInterFileTypes; + +template <class TVal, class TCompare, typename TCompress, typename TSieve, typename TOutPageFile = TOutputPageFile, typename TFileTypes = TDefInterFileTypes> +class TDatSorterImplBase: protected  THeapIter<TVal, TInDatFileImpl<TVal, TInputRecordIterator<TVal, TInputPageIterator<typename TFileTypes::TInPageFile>>>, TCompare> { +    
typedef TOutputRecordIterator<TVal, TOutputPageIterator<typename TFileTypes::TOutPageFile>, TFakeIndexer, TCompress> TTmpRecIter; +    typedef TInputRecordIterator<TVal, TInputPageIterator<typename TFileTypes::TInPageFile>> TInTmpRecIter; + +public: +    typedef TOutDatFileImpl<TVal, TTmpRecIter> TTmpOut; +    typedef TInDatFileImpl<TVal, TInTmpRecIter> TTmpIn; + +    typedef TOutDatFileImpl<TVal, TOutputRecordIterator<TVal, TOutputPageIterator<TOutPageFile>, TFakeIndexer, TCompress>> TOut; +    typedef THeapIter<TVal, TTmpIn, TCompare> TMyHeap; +    typedef TVector<const TVal*> TMyVector; +    typedef typename TMyVector::iterator TMyIterator; + +    class IPortionSorter { +    public: +        virtual ~IPortionSorter() { +        } + +        virtual void Sort(TMyVector&, TTmpOut*) = 0; +    }; + +    class TDefaultSorter: public IPortionSorter { +    public: +        void Sort(TMyVector& vector, TTmpOut* out) override { +            MBDB_SORT_FUN(vector.begin(), vector.end(), TCompare()); + +            const typename TMyVector::const_iterator +                end = (TIsSieveFake<TSieve>::Result) ? 
vector.end() : TDatSorterImplBase::SieveRange(vector.begin(), vector.end()); + +            for (typename TMyVector::const_iterator it = vector.begin(); it != end; ++it) { +                out->PushWithExtInfo(*it); +            } +        } +    }; + +    class TSegmentedSorter: public IPortionSorter { +        class TAdaptor { +            typedef typename TMyVector::const_iterator TConstIterator; + +        public: +            TAdaptor(TConstIterator b, TConstIterator e) +                : Curr_(b) +                , End_(e) +            { +                --Curr_; +            } + +            inline const TVal* Current() const { +                return *Curr_; +            } + +            inline const TVal* Next() { +                ++Curr_; + +                if (Curr_ == End_) { +                    return nullptr; +                } + +                return *Curr_; +            } + +        private: +            TConstIterator Curr_; +            TConstIterator End_; +        }; + +        typedef THeapIter<TVal, TAdaptor, TCompare> TPortionsHeap; + +    public: +        void Sort(TMyVector& vector, TTmpOut* out) override { +            TVector<TAdaptor> bounds; +            typename TMyVector::iterator +                it = vector.begin(); +            const size_t portions = Max<size_t>(1, (vector.size() * sizeof(TVal)) / (4 << 20)); +            const size_t step = vector.size() / portions; + +            // Sort segments +            while (it != vector.end()) { +                const typename TMyVector::iterator +                    end = Min(it + step, vector.end()); + +                MBDB_SORT_FUN(it, end, TCompare()); + +                bounds.push_back(TAdaptor(it, end)); + +                it = end; +            } + +            // +            // Merge result +            // + +            TPortionsHeap heap(bounds); + +            if (TIsSieveFake<TSieve>::Result) { +                while (const TVal* val = heap.Next()) { +                   
 out->PushWithExtInfo(val); +                } +            } else { +                const TVal* val = heap.Next(); +                const TVal* prev = out->PushWithExtInfo(val); + +                for (val = heap.Next(); val && prev; val = heap.Next()) { +                    if (TSieve::Sieve((TVal*)prev, val)) { +                        continue; +                    } + +                    prev = out->PushWithExtInfo(val); +                } + +                if (prev) { +                    TSieve::Sieve((TVal*)prev, prev); +                } +            } +        } +    }; + +public: +    TDatSorterImplBase() +        : Sorter(new TDefaultSorter) +    { +        InFiles = nullptr; +        TempBuf = nullptr; +        Ptr = Vector.end(); +        Cur = nullptr; +        Portions = CPortions = Error = 0; +    } + +    ~TDatSorterImplBase() { +        Close(); +    } + +    int Open(const char* templ, size_t pagesize, size_t pages, int pagesOrBytes = 1) { +        Portions = CPortions = Error = 0; +        TempBuf = strdup(templ); +        Pagesize = pagesize; +        if (pagesOrBytes) +            Pages = pages; +        else +            Pages = pages / pagesize; +        Pages = Max(1, Pages); +        return 0; +    } + +    void Push(const TVal* v) { +        // Serialized extInfo must follow a record being pushed, therefore, to avoid +        // unintentional misusage (as if when you are adding TExtInfo in your record +        // type: you may forget to check your sorting routines and get a segfault as +        // a result). +        // PushWithExtInfo(v) should be called on records with extInfo. 
+        static_assert(!TExtInfoType<TVal>::Exists, "expect !TExtInfoType<TVal>::Exists"); + +        Vector.push_back(v); +    } + +    void PushWithExtInfo(const TVal* v) { +        Vector.push_back(v); +    } + +    int SortPortion() { +        Ptr = Vector.end(); +        Cur = nullptr; +        if (!Vector.size() || Error) +            return Error; + +        MBDB_SORT_FUN(Vector.begin(), Vector.end(), TCompare()); + +        if (!TIsSieveFake<TSieve>::Result) { +            const typename TMyVector::iterator +                end = SieveRange(Vector.begin(), Vector.end()); + +            Vector.resize(end - Vector.begin()); +        } + +        Ptr = Vector.begin(); +        Cur = nullptr; +        return 0; +    } + +    const TVal* Nextp() { +        Cur = Ptr == Vector.end() ? nullptr : *Ptr++; +        return Cur; +    } + +    const TVal* Currentp() const { +        return Cur; +    } + +    void Closep() { +        Vector.clear(); +        Ptr = Vector.end(); +        Cur = nullptr; +    } + +    int NextPortion(bool direct = false) { +        if (!Vector.size() || Error) +            return Error; + +        TTmpOut out; +        int ret, ret1; +        char fname[FILENAME_MAX]; + +        snprintf(fname, sizeof(fname), TempBuf, Portions++); +        if ((ret = out.Open(fname, Pagesize, Pages, 1, direct))) +            return Error = ret; + +        Sorter->Sort(Vector, &out); + +        Vector.erase(Vector.begin(), Vector.end()); +        ret = out.GetError(); +        ret1 = out.Close(); +        Error = Error ? Error : ret ? 
ret : ret1; +        if (Error) +            unlink(fname); +        return Error; +    } + +    int SavePortions(const char* mask, bool direct = false) { +        char srcname[PATH_MAX], dstname[PATH_MAX]; +        if (Vector.size()) +            NextPortion(direct); +        for (int i = 0; i < Portions; i++) { +            char num[10]; +            sprintf(num, "%i", i); +            snprintf(srcname, sizeof(srcname), TempBuf, i); +            snprintf(dstname, sizeof(dstname), mask, num); +            int res = rename(srcname, dstname); +            if (res) +                return res; +        } +        snprintf(dstname, sizeof(dstname), mask, "count"); +        TOFStream fcount(dstname); +        Save(&fcount, Portions); +        fcount.Finish(); +        return 0; +    } + +    int RestorePortions(const char* mask) { +        char srcname[PATH_MAX], dstname[PATH_MAX]; +        snprintf(srcname, sizeof(srcname), mask, "count"); +        TIFStream fcount(srcname); +        Load(&fcount, Portions); +        for (int i = 0; i < Portions; i++) { +            char num[10]; +            sprintf(num, "%i", i); +            snprintf(dstname, sizeof(dstname), TempBuf, i); +            snprintf(srcname, sizeof(srcname), mask, num); +            unlink(dstname); +            int res = link(srcname, dstname); +            if (res) +                return res; +        } +        return 0; +    } + +    int RestorePortions(const char* mask, ui32 count) { +        char srcname[PATH_MAX], dstname[PATH_MAX]; +        ui32 portions; +        TVector<ui32> counts; +        for (ui32 j = 0; j < count; j++) { +            snprintf(srcname, sizeof(srcname), mask, j, "count"); +            TIFStream fcount(srcname); +            Load(&fcount, portions); +            counts.push_back(portions); +            Portions += portions; +        } +        ui32 p = 0; +        for (ui32 j = 0; j < count; j++) { +            int cnt = counts[j]; +            for (int i = 0; i < cnt; i++, 
p++) { +                char num[10]; +                sprintf(num, "%i", i); +                snprintf(dstname, sizeof(dstname), TempBuf, p); +                snprintf(srcname, sizeof(srcname), mask, j, num); +                unlink(dstname); +                int res = link(srcname, dstname); +                if (res) { +                    fprintf(stderr, "Can not link %s to %s\n", srcname, dstname); +                    return res; +                } +            } +        } +        return 0; +    } + +    int Sort(size_t memory, int maxportions = 1000, bool direct = false) { +        int ret, end, beg, i; +        char fname[FILENAME_MAX]; + +        if (Vector.size()) +            NextPortion(); + +        if (Error) +            return Error; +        if (!Portions) { +            TMyHeap::Init(&DummyFile, 1); // closed file +            HPages = 1; +            return 0; +        } + +        Optimize(memory, maxportions); +        if (!(InFiles = new TTmpIn[MPortions])) +            return MBDB_NO_MEMORY; + +        for (beg = 0; beg < Portions && !Error; beg = end) { +            end = (int)Min(beg + FPortions, Portions); +            for (i = beg; i < end && !Error; i++) { +                snprintf(fname, sizeof(fname), TempBuf, i); +                if ((ret = InFiles[i - beg].Open(fname, HPages, 1, nullptr, direct))) +                    Error = Error ? Error : ret; +            } +            if (Error) +                return Error; +            TMyHeap::Init(InFiles, end - beg); +            if (end != Portions) { +                TTmpOut out; +                const TVal* v; +                snprintf(fname, sizeof(fname), TempBuf, Portions++); +                if ((ret = out.Open(fname, Pagesize, HPages))) +                    return Error = Error ? Error : ret; +                while ((v = TMyHeap::Next())) +                    out.PushWithExtInfo(v); +                ret = out.GetError(); +                Error = Error ? 
Error : ret; +                ret = out.Close(); +                Error = Error ? Error : ret; +                for (i = beg; i < end; i++) { +                    ret = InFiles[i - beg].Close(); +                    Error = Error ? Error : ret; +                    snprintf(fname, sizeof(fname), TempBuf, CPortions++); +                    unlink(fname); +                } +            } +            FPortions = MPortions; +        } +        return Error; +    } + +    int Close() { +        char fname[FILENAME_MAX]; +        delete[] InFiles; +        InFiles = nullptr; +        Closep(); +        for (int i = CPortions; i < Portions; i++) { +            snprintf(fname, sizeof(fname), TempBuf, i); +            unlink(fname); +        } +        CPortions = Portions = 0; +        free(TempBuf); +        TempBuf = nullptr; +        return Error; +    } + +    void UseSegmentSorter() { +        Sorter.Reset(new TSegmentedSorter); +    } + +    inline int GetError() const { +        return Error; +    } + +    inline int GetPages() const { +        return Pages; +    } + +    inline int GetPageSize() const { +        return Pagesize; +    } + +private: +    static TMyIterator SieveRange(const TMyIterator begin, const TMyIterator end) { +        TMyIterator it = begin; +        TMyIterator prev = begin; + +        for (++it; it != end; ++it) { +            if (TSieve::Sieve((TVal*)*prev, *it)) { +                continue; +            } + +            ++prev; + +            if (it != prev) { +                *prev = *it; +            } +        } + +        TSieve::Sieve((TVal*)*prev, *prev); + +        return ++prev; +    } + +protected: +    void Optimize(size_t memory, int maxportions, size_t fbufmax = 256u << 20) { +        maxportions = (int)Min((size_t)maxportions, memory / Pagesize) - 1; +        size_t maxpages = Max((size_t)1u, fbufmax / Pagesize); + +        if (maxportions <= 2) { +            FPortions = MPortions = 2; +            HPages = 1; +            
return; +        } +        if (maxportions >= Portions) { +            FPortions = MPortions = Portions; +            HPages = (int)Min(memory / ((Portions + 1) * Pagesize), maxpages); +            return; +        } +        if (((Portions + maxportions - 1) / maxportions) <= maxportions) { +            while (((Portions + maxportions - 1) / maxportions) <= maxportions) +                --maxportions; +            MPortions = ++maxportions; +            int total = ((Portions + maxportions - 1) / maxportions) + Portions; +            FPortions = (total % maxportions) ? (total % maxportions) : MPortions; +            HPages = (int)Min(memory / ((MPortions + 1) * Pagesize), maxpages); +            return; +        } +        FPortions = MPortions = maxportions; +        HPages = (int)Min(memory / ((MPortions + 1) * Pagesize), maxpages); +    } + +    TMyVector Vector; +    typename TMyVector::iterator Ptr; +    const TVal* Cur; +    TTmpIn *InFiles, DummyFile; +    char* TempBuf; +    int Portions, CPortions, Pagesize, Pages, Error; +    int FPortions, MPortions, HPages; +    THolder<IPortionSorter> Sorter; +}; + +template <class TVal, class TCompare, typename TCompress> +class TDatSorterImpl<TVal, TCompare, TCompress, TFakeSieve<TVal>, TOutputPageFile, TDefInterFileTypes> +   : public TDatSorterImplBase<TVal, TCompare, TCompress, TFakeSieve<TVal>, TOutputPageFile, TDefInterFileTypes> { +    typedef TDatSorterImplBase<TVal, TCompare, TCompress, TFakeSieve<TVal>, TOutputPageFile, TDefInterFileTypes> TBase; + +public: +    int SortToFile(const char* name, size_t memory, int maxportions = 1000) { +        int ret = TBase::Sort(memory, maxportions); +        if (ret) +            return ret; +        typename TBase::TOut out; +        if ((ret = out.Open(name, TBase::Pagesize, TBase::HPages))) +            return ret; +        const TVal* rec; +        while ((rec = Next())) +            out.PushWithExtInfo(rec); +        if ((ret = out.GetError())) +            return 
ret; +        if ((ret = out.Close())) +            return ret; +        if ((ret = TBase::Close())) +            return ret; +        return 0; +    } + +    int SortToStream(TAutoPtr<IOutputStream> output, size_t memory, int maxportions = 1000) { +        int ret = TBase::Sort(memory, maxportions); +        if (ret) +            return ret; +        typename TBase::TOut out; +        if ((ret = out.Open(output, TBase::Pagesize, TBase::HPages))) +            return ret; +        const TVal* rec; +        while ((rec = Next())) +            out.PushWithExtInfo(rec); +        if ((ret = out.GetError())) +            return ret; +        if ((ret = out.Close())) +            return ret; +        if ((ret = TBase::Close())) +            return ret; +        return 0; +    } + +    const TVal* Next() { +        return TBase::TMyHeap::Next(); +    } + +    const TVal* Current() const { +        return TBase::TMyHeap::Current(); +    } + +    bool GetExtInfo(typename TExtInfoType<TVal>::TResult* extInfo) const { +        return TBase::TMyHeap::GetExtInfo(extInfo); +    } + +    const ui8* GetExtInfoRaw(size_t* len) const { +        return TBase::TMyHeap::GetExtInfoRaw(len); +    } +}; + +template <class TVal, class TCompare, typename TCompress, typename TSieve, +          typename TOutPageFile = TOutputPageFile, typename TFileTypes = TDefInterFileTypes> +class TDatSorterImpl: public TDatSorterImplBase<TVal, TCompare, TCompress, TSieve, TOutPageFile, TFileTypes> { +    typedef TDatSorterImplBase<TVal, TCompare, TCompress, TSieve, TOutPageFile, TFileTypes> TBase; + +public: +    TDatSorterImpl() +        : Cur(nullptr) +        , Prev(nullptr) +    { +    } + +    int SortToFile(const char* name, size_t memory, int maxportions = 1000) { +        int ret = Sort(memory, maxportions); +        if (ret) +            return ret; +        typename TBase::TOut out; +        if ((ret = out.Open(name, TBase::Pagesize, TBase::HPages))) +            return ret; +        const TVal* 
rec; +        while ((rec = Next())) +            out.PushWithExtInfo(rec); +        if ((ret = out.GetError())) +            return ret; +        if ((ret = out.Close())) +            return ret; +        if ((ret = TBase::Close())) +            return ret; +        return 0; +    } + +    int SortToStream(TAutoPtr<IOutputStream> output, size_t memory, int maxportions = 1000) { +        int ret = Sort(memory, maxportions); +        if (ret) +            return ret; +        typename TBase::TOut out; +        if ((ret = out.Open(output, TBase::Pagesize, TBase::HPages))) +            return ret; +        const TVal* rec; +        while ((rec = Next())) +            out.PushWithExtInfo(rec); +        if ((ret = out.GetError())) +            return ret; +        if ((ret = out.Close())) +            return ret; +        if ((ret = TBase::Close())) +            return ret; +        return 0; +    } + +    int Open(const char* templ, size_t pagesize, size_t pages, int pagesOrBytes = 1) { +        int res = TBase::Open(templ, pagesize, pages, pagesOrBytes); +        Prev = nullptr; +        Cur = nullptr; +        return res; +    } + +    int Sort(size_t memory, int maxportions = 1000, bool direct = false) { +        int res = TBase::Sort(memory, maxportions, direct); +        if (!res) { +            const TVal* rec = TBase::TMyHeap::Next(); +            if (rec) { +                size_t els, es; +                size_t sz = NMicroBDB::SizeOfExt(rec, &els, &es); +                sz += els + es; +                if (!TExtInfoType<TVal>::Exists) +                    Cur = (TVal*)malloc(sizeof(TVal)); +                else +                    Cur = (TVal*)malloc(TBase::Pagesize); +                memcpy(Cur, rec, sz); +            } +        } +        return res; +    } + +    // Prev = last returned +    // Cur = current accumlating with TSieve + +    const TVal* Next() { +        if (!Cur) { +            if (Prev) { +                free(Prev); +                Prev 
= nullptr; +            } +            return nullptr; +        } +        const TVal* rec; + +        if (TIsSieveFake<TSieve>::Result) +            rec = TBase::TMyHeap::Next(); +        else { +            do { +                rec = TBase::TMyHeap::Next(); +            } while (rec && TSieve::Sieve((TVal*)Cur, rec)); +        } + +        if (!Prev) { +            if (!TExtInfoType<TVal>::Exists) +                Prev = (TVal*)malloc(sizeof(TVal)); +            else +                Prev = (TVal*)malloc(TBase::Pagesize); +        } +        size_t els, es; +        size_t sz = NMicroBDB::SizeOfExt(Cur, &els, &es); +        sz += els + es; +        memcpy(Prev, Cur, sz); + +        if (rec) { +            sz = NMicroBDB::SizeOfExt(rec, &els, &es); +            sz += els + es; +            memcpy(Cur, rec, sz); +        } else { +            TSieve::Sieve((TVal*)Cur, Cur); +            free(Cur); +            Cur = nullptr; +        } +        return Prev; +    } + +    const TVal* Current() const { +        return Prev; +    } + +    int Close() { +        int res = TBase::Close(); +        if (Prev) { +            free(Prev); +            Prev = nullptr; +        } +        if (Cur) { +            free(Cur); +            Cur = nullptr; +        } +        return res; +    } + +protected: +    TVal* Cur; +    TVal* Prev; +}; diff --git a/library/cpp/microbdb/sorterdef.h b/library/cpp/microbdb/sorterdef.h new file mode 100644 index 00000000000..8834b5fff80 --- /dev/null +++ b/library/cpp/microbdb/sorterdef.h @@ -0,0 +1,19 @@ +#pragma once + +#define MAKESORTERTMPL(TRecord, MemberFunc)                       \ +    template <typename T>                                         \ +    struct MemberFunc;                                            \ +    template <>                                                   \ +    struct MemberFunc<TRecord> {                                  \ +        bool operator()(const TRecord* l, const TRecord* r) {     \ +            
return TRecord ::MemberFunc(l, r) < 0;                \ +        }                                                         \ +        int operator()(const TRecord* l, const TRecord* r, int) { \ +            return TRecord ::MemberFunc(l, r);                    \ +        }                                                         \ +    } + +template <typename T> +static inline int compare(const T& a, const T& b) { +    return (a < b) ? -1 : (a > b); +} diff --git a/library/cpp/microbdb/utility.h b/library/cpp/microbdb/utility.h new file mode 100644 index 00000000000..5c86061bca0 --- /dev/null +++ b/library/cpp/microbdb/utility.h @@ -0,0 +1,75 @@ +#pragma once + +#include "microbdb.h" + +template <class TRecord, template <class T> class TCompare> +int SortData(const TFile& ifile, const TFile& ofile, const TDatMetaPage* meta, size_t memory, const char* tmpDir = nullptr) { +    char templ[FILENAME_MAX]; +    TInDatFileImpl<TRecord> datin; +    TOutDatFileImpl<TRecord> datout; +    TDatSorterImpl<TRecord, TCompare<TRecord>, TFakeCompression, TFakeSieve<TRecord>> sorter; +    const TRecord* u; +    int ret; + +    const size_t minMemory = (2u << 20); +    memory = Max(memory, minMemory + minMemory / 2); +    if (datin.Open(ifile, meta, memory - minMemory, 0)) +        err(1, "can't read input file"); + +    size_t outpages = Max((size_t)2u, minMemory / datin.GetPageSize()); +    memory -= outpages * datin.GetPageSize(); + +    if (ret = MakeSorterTempl(templ, tmpDir)) +        err(1, "can't create tempdir in \"%s\"; error: %d\n", templ, ret); + +    if (sorter.Open(templ, datin.GetPageSize(), outpages)) { +        *strrchr(templ, LOCSLASH_C) = 0; +        RemoveDirWithContents(templ); +        err(1, "can't open sorter"); +    } + +    while (1) { +        datin.Freeze(); +        while ((u = datin.Next())) +            sorter.PushWithExtInfo(u); +        sorter.NextPortion(); +        if (datin.GetError() || datin.IsEof()) +            break; +    } + +    if 
(datin.GetError()) { +        *strrchr(templ, LOCSLASH_C) = 0; +        RemoveDirWithContents(templ); +        err(1, "in data file error %d", datin.GetError()); +    } +    if (datin.Close()) { +        *strrchr(templ, LOCSLASH_C) = 0; +        RemoveDirWithContents(templ); +        err(1, "can't close in data file"); +    } + +    sorter.Sort(memory); + +    if (datout.Open(ofile, datin.GetPageSize(), outpages)) { +        *strrchr(templ, LOCSLASH_C) = 0; +        RemoveDirWithContents(templ); +        err(1, "can't write out file"); +    } + +    while ((u = sorter.Next())) +        datout.PushWithExtInfo(u); + +    if (sorter.GetError()) +        err(1, "sorter error %d", sorter.GetError()); +    if (sorter.Close()) +        err(1, "can't close sorter"); + +    *strrchr(templ, LOCSLASH_C) = 0; +    RemoveDirWithContents(templ); + +    if (datout.GetError()) +        err(1, "out data file error %d", datout.GetError()); +    if (datout.Close()) +        err(1, "can't close out data file"); +    return 0; +} diff --git a/library/cpp/microbdb/wrappers.h b/library/cpp/microbdb/wrappers.h new file mode 100644 index 00000000000..38eb8edebc0 --- /dev/null +++ b/library/cpp/microbdb/wrappers.h @@ -0,0 +1,637 @@ +#pragma once + +#include "microbdb.h" + +#define MAKEFILTERTMPL(TRecord, MemberFunc, NS) \ +    template <typename T>                       \ +    struct MemberFunc;                          \ +    template <>                                 \ +    struct MemberFunc<TRecord> {                \ +        bool operator()(const TRecord* r) {     \ +            return NS::MemberFunc(r);           \ +        }                                       \ +    } + +#define MAKEJOINTMPL(TRecordA, TRecordB, MemberFunc, NS, TMergeType) \ +    template <typename A, typename B>                                \ +    struct MemberFunc;                                               \ +    template <>                                                      \ +    struct 
MemberFunc<TRecordA, TRecordB> {                          \ +        int operator()(const TRecordA* l, const TRecordB* r) {       \ +            return NS::MemberFunc(l, r);                             \ +        }                                                            \ +    };                                                               \ +    typedef TMergeRec<TRecordA, TRecordB> TMergeType + +#define MAKEJOINTMPL2(TRecordA, TRecordB, MemberFunc, StructName, TMergeType) \ +    template <typename A, typename B>                                         \ +    struct StructName;                                                        \ +    template <>                                                               \ +    struct StructName<TRecordA, TRecordB> {                                   \ +        int operator()(const TRecordA* l, const TRecordB* r) {                \ +            return MemberFunc(l, r);                                          \ +        }                                                                     \ +    };                                                                        \ +    typedef TMergeRec<TRecordA, TRecordB> TMergeType + +#define MAKEJOINTMPLLEFT(TRecordA, TRecordB, MemberFunc, NS, TMergeType) \ +    template <typename A, typename B>                                    \ +    struct MemberFunc;                                                   \ +    template <>                                                          \ +    struct MemberFunc<TRecordA, TRecordB> {                              \ +        int operator()(const TRecordA* l, const TRecordB* r) {           \ +            return NS::MemberFunc(l->RecA, r);                           \ +        }                                                                \ +    };                                                                   \ +    typedef TMergeRec<TRecordA, TRecordB> TMergeType + +template <class TRec> +class IDatNextSource { +public: +    virtual 
const TRec* Next() = 0; +    virtual void Work() { +    } +}; + +template <class TRec> +class IDatNextReceiver { +public: +    IDatNextReceiver(IDatNextSource<TRec>& source) +        : Source(source) +    { +    } + +    virtual void Work() { +        Source.Work(); +    } + +protected: +    IDatNextSource<TRec>& Source; +}; + +template <class TInRec, class TOutRec> +class IDatNextChannel: public IDatNextReceiver<TInRec>, public IDatNextSource<TOutRec> { +public: +    IDatNextChannel(IDatNextSource<TInRec>& source) +        : IDatNextReceiver<TInRec>(source) +    { +    } + +    virtual void Work() { +        IDatNextReceiver<TInRec>::Work(); +    } +}; + +class IDatWorker { +public: +    virtual void Work() = 0; +}; + +template <class TRec> +class IDatPushReceiver { +public: +    virtual void Push(const TRec* rec) = 0; +    virtual void Work() = 0; +}; + +template <class TRec> +class IDatPushSource { +public: +    IDatPushSource(IDatPushReceiver<TRec>& receiver) +        : Receiver(receiver) +    { +    } + +    virtual void Work() { +        Receiver.Work(); +    } + +protected: +    IDatPushReceiver<TRec>& Receiver; +}; + +template <class TInRec, class TOutRec> +class IDatPushChannel: public IDatPushReceiver<TInRec>, public IDatPushSource<TOutRec> { +public: +    IDatPushChannel(IDatPushReceiver<TOutRec>& receiver) +        : IDatPushSource<TOutRec>(receiver) +    { +    } + +    virtual void Work() { +        IDatPushSource<TOutRec>::Work(); +    } +}; + +template <class TRec> +class IDatNextToPush: public IDatNextReceiver<TRec>, public IDatPushSource<TRec> { +    typedef IDatNextReceiver<TRec> TNextReceiver; +    typedef IDatPushSource<TRec> TPushSource; + +public: +    IDatNextToPush(IDatNextSource<TRec>& source, IDatPushReceiver<TRec>& receiver) +        : TNextReceiver(source) +        , TPushSource(receiver) +    { +    } + +    virtual void Work() { +        const TRec* rec; +        while (rec = TNextReceiver::Source.Next()) +            
TPushSource::Receiver.Push(rec); +        TPushSource::Work(); +        TNextReceiver::Work(); +    } +}; + +template <class TRec> +class TDatNextPNSplitter: public IDatNextReceiver<TRec>, public IDatNextSource<TRec>, public IDatPushSource<TRec> { +public: +    TDatNextPNSplitter(IDatNextSource<TRec>& source, IDatPushReceiver<TRec>& receiver) +        : IDatNextReceiver<TRec>(source) +        , IDatNextSource<TRec>() +        , IDatPushSource<TRec>(receiver) +    { +    } + +    const TRec* Next() { +        const TRec* rec = IDatNextReceiver<TRec>::Source.Next(); +        if (rec) { +            IDatPushSource<TRec>::Receiver.Push(rec); +            return rec; +        } else { +            return 0; +        } +    } + +    virtual void Work() { +        IDatNextReceiver<TRec>::Work(); +        IDatPushSource<TRec>::Work(); +    } +}; + +template <class TRec, class TOutRecA = TRec, class TOutRecB = TRec> +class TDatPushPPSplitter: public IDatPushReceiver<TRec>, public IDatPushSource<TOutRecA>, public IDatPushSource<TOutRecB> { +public: +    TDatPushPPSplitter(IDatPushReceiver<TOutRecA>& receiverA, IDatPushReceiver<TOutRecB>& receiverB) +        : IDatPushSource<TOutRecA>(receiverA) +        , IDatPushSource<TOutRecB>(receiverB) +    { +    } + +    void Push(const TRec* rec) { +        IDatPushSource<TOutRecA>::Receiver.Push(rec); +        IDatPushSource<TOutRecB>::Receiver.Push(rec); +    } + +    void Work() { +        IDatPushSource<TOutRecA>::Work(); +        IDatPushSource<TOutRecB>::Work(); +    } +}; + +template <class TRec> +class TFastInDatFile: public TInDatFile<TRec>, public IDatNextSource<TRec> { +public: +    typedef TInDatFile<TRec> Base; + +    TFastInDatFile(const char* name, bool open = true, size_t pages = dbcfg::fbufsize, int pagesOrBytes = 0) +        : TInDatFile<TRec>(name, pages, pagesOrBytes) +        , FileName(name) +    { +        if (open) +            Base::Open(name); +    } + +    void Open() { +        Base::Open(FileName); +    } 
+ +    template <class TPassRec> +    bool PassToUid(const TRec* inrec, const TPassRec* torec) { +        inrec = Base::Current(); +        while (inrec && CompareUids(inrec, torec) < 0) +            inrec = Base::Next(); +        return (inrec && CompareUids(inrec, torec) == 0); +    } + +    void Work() { +        Base::Close(); +    } + +    const TRec* Next() { +        return Base::Next(); +    } + +private: +    TString FileName; +}; + +template <class TRec> +class TPushOutDatFile: public TOutDatFile<TRec>, public IDatPushReceiver<TRec> { +public: +    typedef TOutDatFile<TRec> Base; + +    TPushOutDatFile(const char* name, bool open = true) +        : Base(name, dbcfg::pg_docuid, dbcfg::fbufsize, 0) +        , FileName(name) +    { +        if (open) +            Base::Open(name); +    } + +    void Open() { +        Base::Open(~FileName); +    } + +    void Push(const TRec* rec) { +        Base::Push(rec); +    } + +    void Work() { +        Base::Close(); +    } + +private: +    TString FileName; +}; + +template <class TRec> +class TNextOutDatFile: public IDatNextToPush<TRec> { +public: +    typedef IDatNextToPush<TRec> TBase; + +    TNextOutDatFile(const char* name, IDatNextSource<TRec>& source, bool open = true) +        : TBase(source, File) +        , File(name, open) +    { +    } + +    void Open() { +        File.Open(); +    } + +private: +    TPushOutDatFile<TRec> File; +}; + +template <class TVal, template <typename T> class TCompare> +class TNextDatSorterMemo: public TDatSorterMemo<TVal, TCompare>, public IDatNextChannel<TVal, TVal> { +    typedef TDatSorterMemo<TVal, TCompare> TImpl; + +public: +    TNextDatSorterMemo(IDatNextSource<TVal>& source, const char* dir = dbcfg::fname_temp, const char* name = "yet another sorter", size_t memory = dbcfg::small_sorter_size, size_t pagesize = dbcfg::pg_docuid, size_t pages = dbcfg::fbufsize, int pagesOrBytes = 0) +        : TImpl(name, memory, pagesize, pages, pagesOrBytes) +        , 
IDatNextChannel<TVal, TVal>(source) +        , Sorted(false) +    { +        TImpl::Open(dir); +    } + +    void Sort() { +        const TVal* rec; +        while (rec = IDatNextChannel<TVal, TVal>::Source.Next()) { +            TImpl::Push(rec); +        } +        TImpl::Sort(); +        Sorted = true; +    } + +    const TVal* Next() { +        if (!Sorted) +            Sort(); +        return TImpl::Next(); +    } + +private: +    bool Sorted; +    TString Dir; +}; + +template <class TInRec, class TOutRec> +class TDatConverter: public IDatNextChannel<TInRec, TOutRec> { +public: +    TDatConverter(IDatNextSource<TInRec>& source) +        : IDatNextChannel<TInRec, TOutRec>(source) +    { +    } + +    virtual void Convert(const TInRec& inrec, TOutRec& outrec) { +        outrec(inrec); +    } + +    const TOutRec* Next() { +        const TInRec* rec = IDatNextChannel<TInRec, TOutRec>::Source.Next(); +        if (!rec) +            return 0; +        Convert(*rec, CurrentRec); +        return &CurrentRec; +    } + +private: +    TOutRec CurrentRec; +}; + +template <class TRecA, class TRecB> +class TMergeRec { +public: +    const TRecA* RecA; +    const TRecB* RecB; +}; + +enum NMergeTypes { +    MT_JOIN = 0, +    MT_ADD = 1, +    MT_OVERWRITE = 2, +    MT_TYPENUM +}; + +template <class TRecA, class TRecB, template <typename TA, typename TB> class TCompare> +class TNextDatMerger: public IDatNextReceiver<TRecA>, public IDatNextReceiver<TRecB>, public IDatNextSource<TMergeRec<TRecA, TRecB>> { +public: +    TNextDatMerger(IDatNextSource<TRecA>& sourceA, IDatNextSource<TRecB>& sourceB, ui8 mergeType) +        : IDatNextReceiver<TRecA>(sourceA) +        , IDatNextReceiver<TRecB>(sourceB) +        , MergeType(mergeType) +        , MoveA(false) +        , MoveB(false) +        , NotInit(true) +    { +    } + +    const TMergeRec<TRecA, TRecB>* Next() { +        if (MoveA || NotInit) +            SourceARec = IDatNextReceiver<TRecA>::Source.Next(); +        if (MoveB || 
NotInit) +            SourceBRec = IDatNextReceiver<TRecB>::Source.Next(); +        NotInit = false; + +        //        Cout << "Next " << SourceARec->HostId << "\t" << SourceBRec->HostId << "\t" << TCompare<TRecA, TRecB>()(SourceARec, SourceBRec) << "\t" << ::compare(SourceARec->HostId, SourceBRec->HostId) << "\t" << ::compare(1, 2) << "\t" << ::compare(2,1) << Endl; +        if (MergeType == MT_ADD && SourceARec && (!SourceBRec || TCompare<TRecA, TRecB>()(SourceARec, SourceBRec) < 0)) { +            MergeRec.RecA = SourceARec; +            MergeRec.RecB = 0; +            MoveA = true; +            MoveB = false; +            return &MergeRec; +        } + +        if (MergeType == MT_ADD && SourceBRec && (!SourceARec || TCompare<TRecA, TRecB>()(SourceARec, SourceBRec) < 0)) { +            MergeRec.RecA = 0; +            MergeRec.RecB = SourceBRec; +            MoveA = false; +            MoveB = true; +            return &MergeRec; +        } + +        if (MergeType == MT_ADD && SourceARec && SourceBRec && TCompare<TRecA, TRecB>()(SourceARec, SourceBRec) == 0) { +            MergeRec.RecA = SourceARec; +            MergeRec.RecB = SourceBRec; +            MoveA = true; +            MoveB = true; +            return &MergeRec; +        } + +        while (MergeType == MT_JOIN && SourceARec && SourceBRec && TCompare<TRecA, TRecB>()(SourceARec, SourceBRec) != 0) { +            while (SourceARec && TCompare<TRecA, TRecB>()(SourceARec, SourceBRec) < 0) { +                SourceARec = IDatNextReceiver<TRecA>::Source.Next(); +            } +            while (SourceARec && SourceBRec && TCompare<TRecA, TRecB>()(SourceARec, SourceBRec) > 0) { +                SourceBRec = IDatNextReceiver<TRecB>::Source.Next(); +            } +        } + +        if (MergeType == MT_JOIN && SourceARec && SourceBRec) { +            MergeRec.RecA = SourceARec; +            MergeRec.RecB = SourceBRec; +            MoveA = true; +            MoveB = true; +            return &MergeRec; + 
       } + +        MergeRec.RecA = 0; +        MergeRec.RecB = 0; +        return 0; +    } + +    void Work() { +        IDatNextReceiver<TRecA>::Source.Work(); +        IDatNextReceiver<TRecB>::Source.Work(); +    } + +private: +    TMergeRec<TRecA, TRecB> MergeRec; +    const TRecA* SourceARec; +    const TRecB* SourceBRec; +    ui8 MergeType; +    bool MoveA; +    bool MoveB; +    bool NotInit; +}; + +/*template<class TRec, class TSource, template <typename T> class TCompare, class TReceiver = TPushOutDatFile<TRec> > +class TPushDatMerger { +public: +    TPushDatMerger(TSource& source, TReceiver& receiver, ui8 mergeType) +        : Source(source) +        , Receiver(receiver) +        , MergeType(mergeType) +    { +    } + +    virtual void Init() { +        SourceRec = Source.Next(); +    } + +    virtual void Push(const TRec* rec) { +        while (SourceRec && TCompare<TRec>()(SourceRec, rec, 0) < 0) { +            if (MergeType == MT_OVERWRITE || MergeType == MT_ADD) +                Receiver.Push(SourceRec); +            SourceRec = Source.Next(); +        } + +        bool intersected = false; +        while (SourceRec && TCompare<TRec>()(SourceRec, rec, 0) == 0) { +            intersected = true; +            if (MergeType == MT_ADD) +                Receiver.Push(SourceRec); +            SourceRec = Source.Next(); +        } + +        if (intersected && MergeType == MT_JOIN) +            Receiver.Push(rec); + +        if (MergeType == MT_OVERWRITE || MergeType == MT_ADD) +            Receiver.Push(rec); +    } + +    virtual void Term() { +        if (MergeType == MT_OVERWRITE || MergeType == MT_ADD) { +            while (SourceRec) { +                Receiver.Push(SourceRec); +                SourceRec = Source.Next(); +            } +        } +    } + +private: +    TSource&                Source; +    const TRec*             SourceRec; +    TReceiver&              Receiver; +    ui8                     MergeType; +};*/ + +/*template <class TRec, 
class TSourceA, class TSourceB, template <typename T> class TCompare, class TReceiver = TPushOutDatFile<TRec> > +class TNextDatMerger: public TPushDatMerger<TRec, TSourceA, TCompare, TReceiver> { +    typedef TPushDatMerger<TRec, TSourceA, TCompare, TReceiver> TImpl; +public: +    TNextDatMerger(TSourceA& sourceA, TSourceB& sourceB, TReceiver& receiver, ui8 mergeType) +        : TImpl(sourceA, receiver, mergeType) +        , SourceB(sourceB) +    { +    } + +    virtual void Work() { +        TImpl::Init(); +        while (SourceBRec = SourceB.Next()) { +            TImpl::Push(SourceBRec); +        } +        TImpl::Term(); +    } +private: +    TSourceB&               SourceB; +    const TRec*             SourceBRec; +};*/ + +/*template <class TRec, template <typename T> class TCompare, class TReceiver = TPushOutDatFile<TRec> > +class TFilePushDatMerger: public TPushDatMerger<TRec, TFastInDatFile<TRec>, TCompare, TReceiver> { +    typedef TPushDatMerger<TRec, TFastInDatFile<TRec>, TCompare, TReceiver> TImpl; +public: +    TFilePushDatMerger(const char* name, TReceiver& receiver, ui8 mergeType) +        : TImpl(SourceFile, receiver, mergeType) +        , SourceFile(name) +    { +    } + +    virtual void Push(const TRec* rec) { +        TImpl::Push(rec); +    } + +    virtual void Term() { +        TImpl::Term(); +    } +private: +    TFastInDatFile<TRec>        SourceFile; +};*/ + +/*template <class TRec, template <typename T> class TCompare, class TReceiver = TPushOutDatFile<TRec> > +class TFileNextDatMerger: public TNextDatMerger<TRec, TFastInDatFile<TRec>, TFastInDatFile<TRec>, TCompare, TReceiver> { +    typedef TNextDatMerger<TRec, TFastInDatFile<TRec>, TFastInDatFile<TRec>, TCompare, TReceiver> TImpl; +public: +    TFileNextDatMerger(const char* sourceAname, const char* sourceBname, TReceiver& receiver, ui8 mergeType) +        : TImpl(FileA, FileB, receiver, mergeType) +        , FileA(sourceAname) +        , FileB(sourceBname) +    { +    } + +    virtual 
void Work() { +        TImpl::Work(); +    } +private: +    TFastInDatFile<TRec>      FileA; +    TFastInDatFile<TRec>      FileB; +};*/ + +template <class TRec, template <typename T> class TPredicate> +class TDatNextFilter: public IDatNextChannel<TRec, TRec> { +public: +    TDatNextFilter(IDatNextSource<TRec>& source) +        : IDatNextChannel<TRec, TRec>(source) +    { +    } + +    virtual const TRec* Next() { +        const TRec* rec; +        while ((rec = IDatNextChannel<TRec, TRec>::Source.Next()) != 0 && !Check(rec)) { +        } +        if (!rec) +            return 0; +        return rec; +    } + +protected: +    virtual bool Check(const TRec* rec) { +        return TPredicate<TRec>()(rec); +    } +}; + +template <class TRec, template <typename T> class TPredicate> +class TDatPushFilter: public IDatPushChannel<TRec, TRec> { +public: +    TDatPushFilter(IDatPushReceiver<TRec>& receiver) +        : IDatPushChannel<TRec, TRec>(receiver) +    { +    } + +    virtual void Push(const TRec* rec) { +        if (Check(rec)) +            IDatPushChannel<TRec, TRec>::Receiver.Push(rec); +    } + +private: +    virtual bool Check(const TRec* rec) { +        return TPredicate<TRec>()(rec); +    } +}; + +template <class TInRec, class TOutRec, template <typename T> class TCompare> +class TDatGrouper: public IDatNextChannel<TInRec, TOutRec> { +public: +    TDatGrouper(IDatNextSource<TInRec>& source) +        : IDatNextChannel<TInRec, TOutRec>(source) +        , Begin(true) +        , Finish(false) +        , HasOutput(false) +    { +    } + +    const TOutRec* Next() { +        while (CurrentRec = IDatNextChannel<TInRec, TOutRec>::Source.Next()) { +            int cmp = 0; +            if (Begin) { +                Begin = false; +                OnStart(); +            } else if ((cmp = TCompare<TInRec>()(CurrentRec, LastRec, 0)) != 0) { +                OnFinish(); +                OnStart(); +            } +            OnRecord(); +            LastRec = CurrentRec; 
+            if (HasOutput) { +                HasOutput = false; +                return &OutRec; +            } +        } +        if (!Finish) +            OnFinish(); +        Finish = true; +        if (HasOutput) { +            HasOutput = false; +            return &OutRec; +        } +        return 0; +    } + +protected: +    virtual void OnStart() = 0; +    virtual void OnRecord() = 0; +    virtual void OnFinish() = 0; + +    const TInRec* CurrentRec; +    const TInRec* LastRec; +    TOutRec OutRec; + +    bool Begin; +    bool Finish; +    bool HasOutput; +};  | 
