diff options
author | qrort <qrort@yandex-team.com> | 2022-11-30 23:47:12 +0300 |
---|---|---|
committer | qrort <qrort@yandex-team.com> | 2022-11-30 23:47:12 +0300 |
commit | 22f8ae0e3f5d68b92aecccdf96c1d841a0334311 (patch) | |
tree | bffa27765faf54126ad44bcafa89fadecb7a73d7 /library/cpp/microbdb | |
parent | 332b99e2173f0425444abb759eebcb2fafaa9209 (diff) | |
download | ydb-22f8ae0e3f5d68b92aecccdf96c1d841a0334311.tar.gz |
validate canons without yatest_common
Diffstat (limited to 'library/cpp/microbdb')
-rw-r--r-- | library/cpp/microbdb/align.h | 17 | ||||
-rw-r--r-- | library/cpp/microbdb/compressed.h | 520 | ||||
-rw-r--r-- | library/cpp/microbdb/extinfo.h | 127 | ||||
-rw-r--r-- | library/cpp/microbdb/file.cpp | 220 | ||||
-rw-r--r-- | library/cpp/microbdb/file.h | 225 | ||||
-rw-r--r-- | library/cpp/microbdb/hashes.h | 250 | ||||
-rw-r--r-- | library/cpp/microbdb/header.cpp | 91 | ||||
-rw-r--r-- | library/cpp/microbdb/header.h | 159 | ||||
-rw-r--r-- | library/cpp/microbdb/heap.h | 143 | ||||
-rw-r--r-- | library/cpp/microbdb/input.h | 1027 | ||||
-rw-r--r-- | library/cpp/microbdb/microbdb.cpp | 1 | ||||
-rw-r--r-- | library/cpp/microbdb/microbdb.h | 54 | ||||
-rw-r--r-- | library/cpp/microbdb/noextinfo.proto | 4 | ||||
-rw-r--r-- | library/cpp/microbdb/output.h | 1049 | ||||
-rw-r--r-- | library/cpp/microbdb/powersorter.h | 667 | ||||
-rw-r--r-- | library/cpp/microbdb/reader.h | 354 | ||||
-rw-r--r-- | library/cpp/microbdb/safeopen.h | 792 | ||||
-rw-r--r-- | library/cpp/microbdb/sorter.h | 677 | ||||
-rw-r--r-- | library/cpp/microbdb/sorterdef.h | 19 | ||||
-rw-r--r-- | library/cpp/microbdb/utility.h | 75 | ||||
-rw-r--r-- | library/cpp/microbdb/wrappers.h | 637 |
21 files changed, 7108 insertions, 0 deletions
diff --git a/library/cpp/microbdb/align.h b/library/cpp/microbdb/align.h new file mode 100644 index 0000000000..2f8567f134 --- /dev/null +++ b/library/cpp/microbdb/align.h @@ -0,0 +1,17 @@ +#pragma once + +#include <util/system/defaults.h> + +using TDatAlign = int; + +static inline size_t DatFloor(size_t size) { + return (size - 1) & ~(sizeof(TDatAlign) - 1); +} + +static inline size_t DatCeil(size_t size) { + return DatFloor(size) + sizeof(TDatAlign); +} + +static inline void DatSet(void* ptr, size_t size) { + *(TDatAlign*)((char*)ptr + DatFloor(size)) = 0; +} diff --git a/library/cpp/microbdb/compressed.h b/library/cpp/microbdb/compressed.h new file mode 100644 index 0000000000..f0c9edfa92 --- /dev/null +++ b/library/cpp/microbdb/compressed.h @@ -0,0 +1,520 @@ +#pragma once + +#include <util/stream/zlib.h> + +#include "microbdb.h" +#include "safeopen.h" + +class TCompressedInputFileManip: public TInputFileManip { +public: + inline i64 GetLength() const { + return -1; // Some microbdb logic rely on unknown size of compressed files + } + + inline i64 Seek(i64 offset, int whence) { + i64 oldPos = DoGetPosition(); + i64 newPos = offset; + switch (whence) { + case SEEK_CUR: + newPos += oldPos; + [[fallthrough]]; // Complier happy. Please fix it! + case SEEK_SET: + break; + default: + return -1L; + } + if (oldPos > newPos) { + VerifyRandomAccess(); + DoSeek(0, SEEK_SET, IsStreamOpen()); + oldPos = 0; + } + const size_t bufsize = 1 << 12; + char buf[bufsize]; + for (i64 i = oldPos; i < newPos; i += bufsize) + InputStream->Read(buf, (i + (i64)bufsize < newPos) ? 
bufsize : (size_t)(newPos - i)); + return newPos; + } + + i64 RealSeek(i64 offset, int whence) { + InputStream.Destroy(); + i64 ret = DoSeek(offset, whence, !!CompressedInput); + if (ret != -1) + DoStreamOpen(DoCreateStream(), true); + return ret; + } + +protected: + IInputStream* CreateStream(const TFile& file) override { + CompressedInput.Reset(new TUnbufferedFileInput(file)); + return DoCreateStream(); + } + inline IInputStream* DoCreateStream() { + return new TZLibDecompress(CompressedInput.Get(), ZLib::GZip); + //return new TLzqDecompress(CompressedInput.Get()); + } + THolder<IInputStream> CompressedInput; +}; + +class TCompressedBufferedInputFileManip: public TCompressedInputFileManip { +protected: + IInputStream* CreateStream(const TFile& file) override { + CompressedInput.Reset(new TFileInput(file, 0x100000)); + return DoCreateStream(); + } +}; + +using TCompressedInputPageFile = TInputPageFileImpl<TCompressedInputFileManip>; +using TCompressedBufferedInputPageFile = TInputPageFileImpl<TCompressedBufferedInputFileManip>; + +template <class TVal> +struct TGzKey { + ui64 Offset; + TVal Key; + + static const ui32 RecordSig = TVal::RecordSig + 0x50495a47; + + TGzKey() { + } + + TGzKey(ui64 offset, const TVal& key) + : Offset(offset) + , Key(key) + { + } + + size_t SizeOf() const { + if (this) + return sizeof(Offset) + ::SizeOf(&Key); + else { + size_t sizeOfKey = ::SizeOf((TVal*)NULL); + return sizeOfKey ? 
(sizeof(Offset) + sizeOfKey) : 0; + } + } +}; + +template <class TVal> +class TInZIndexFile: protected TInDatFileImpl<TGzKey<TVal>> { + typedef TInDatFileImpl<TGzKey<TVal>> TDatFile; + typedef TGzKey<TVal> TGzVal; + typedef typename TDatFile::TRecIter TRecIter; + typedef typename TRecIter::TPageIter TPageIter; + +public: + TInZIndexFile() + : Index0(nullptr) + { + } + + int Open(const char* fname, size_t pages = 1, int pagesOrBytes = 1, ui32* gotRecordSig = nullptr) { + int ret = TDatFile::Open(fname, pages, pagesOrBytes, gotRecordSig); + if (ret) + return ret; + if (!(Index0 = (TDatPage*)malloc(TPageIter::GetPageSize()))) { + TDatFile::Close(); + return MBDB_NO_MEMORY; + } + if (SizeOf((TGzVal*)NULL)) + RecsOnPage = (TPageIter::GetPageSize() - sizeof(TDatPage)) / DatCeil(SizeOf((TGzVal*)NULL)); + TDatFile::Next(); + memcpy(Index0, TPageIter::Current(), TPageIter::GetPageSize()); + return 0; + } + + int Close() { + free(Index0); + Index0 = NULL; + return TDatFile::Close(); + } + + inline int GetError() const { + return TDatFile::GetError(); + } + + int FindKey(const TVal* akey, const typename TExtInfoType<TVal>::TResult* = NULL) { + assert(IsOpen()); + if (!SizeOf((TVal*)NULL)) + return FindVszKey(akey); + int pageno; + i64 offset; + FindKeyOnPage(pageno, offset, Index0, akey); + TDatPage* page = TPageIter::GotoPage(pageno + 1); + int num_add = (int)offset; + FindKeyOnPage(pageno, offset, page, akey); + return pageno + num_add; + } + + using TDatFile::IsOpen; + + int FindVszKey(const TVal* akey, const typename TExtInfoType<TVal>::TResult* = NULL) { + int pageno; + i64 offset; + FindVszKeyOnPage(pageno, offset, Index0, akey); + TDatPage* page = TPageIter::GotoPage(pageno + 1); + int num_add = (int)offset; + FindVszKeyOnPage(pageno, offset, page, akey); + return pageno + num_add; + } + + i64 FindPage(int pageno) { + if (!SizeOf((TVal*)NULL)) + return FindVszPage(pageno); + int recsize = DatCeil(SizeOf((TGzVal*)NULL)); + TDatPage* page = TPageIter::GotoPage(1 + pageno 
/ RecsOnPage); + if (!page) // can happen if pageno is beyond EOF + return -1; + unsigned int localpageno = pageno % RecsOnPage; + if (localpageno >= page->RecNum) // can happen if pageno is beyond EOF + return -1; + TGzVal* v = (TGzVal*)((char*)page + sizeof(TDatPage) + localpageno * recsize); + return v->Offset; + } + + // Variable-size-key counterpart of FindPage(): scans Index0 while the record Offset is <= pageno to choose index page n, + // then steps through that page until n + num_add reaches pageno. Returns the Offset of the record found, or -1 when pageno is beyond EOF. + i64 FindVszPage(int pageno) { + TGzVal* cur = (TGzVal*)((char*)Index0 + sizeof(TDatPage)); + TGzVal* prev = cur; + unsigned int n = 0; + while (n < Index0->RecNum && cur->Offset <= (unsigned int)pageno) { + prev = cur; + cur = (TGzVal*)((char*)cur + DatCeil(SizeOf(cur))); + n++; + } + TDatPage* page = TPageIter::GotoPage(n); + if (!page) // can happen if pageno is beyond EOF; page is dereferenced below + return -1; + unsigned int num_add = (unsigned int)(prev->Offset); + n = 0; + cur = (TGzVal*)((char*)page + sizeof(TDatPage)); + while (n < page->RecNum && n + num_add < (unsigned int)pageno) { + cur = (TGzVal*)((char*)cur + DatCeil(SizeOf(cur))); + n++; + } + if (n == page->RecNum) // can happen if pageno is beyond EOF + return -1; + return cur->Offset; + } + +protected: + void FindKeyOnPage(int& pageno, i64& offset, TDatPage* page, const TVal* Key) { + int left = 0; + int right = page->RecNum - 1; + int recsize = DatCeil(SizeOf((TGzVal*)NULL)); + while (left < right) { + int middle = (left + right) >> 1; + if (((TGzVal*)((char*)page + sizeof(TDatPage) + middle * recsize))->Key < *Key) + left = middle + 1; + else + right = middle; + } + //borders check (left and right) + pageno = (left == 0 || ((TGzVal*)((char*)page + sizeof(TDatPage) + left * recsize))->Key < *Key) ? 
left : left - 1; + offset = ((TGzVal*)((char*)page + sizeof(TDatPage) + pageno * recsize))->Offset; + } + + void FindVszKeyOnPage(int& pageno, i64& offset, TDatPage* page, const TVal* key) { + TGzVal* cur = (TGzVal*)((char*)page + sizeof(TDatPage)); + ui32 RecordSig = page->RecNum; + i64 tmpoffset = cur->Offset; + for (; RecordSig > 0 && cur->Key < *key; --RecordSig) { + tmpoffset = cur->Offset; + cur = (TGzVal*)((char*)cur + DatCeil(SizeOf(cur))); + } + int idx = page->RecNum - RecordSig - 1; + pageno = (idx >= 0) ? idx : 0; + offset = tmpoffset; + } + + TDatPage* Index0; + int RecsOnPage; +}; + +template <class TKey> +class TCompressedIndexedInputPageFile: public TCompressedInputPageFile { +public: + int GotoPage(int pageno); + +protected: + TInZIndexFile<TKey> KeyFile; +}; + +template <class TVal, class TKey> +class TDirectCompressedInDatFile: public TDirectInDatFile<TVal, TKey, + TInDatFileImpl<TVal, TInputRecordIterator<TVal, + TInputPageIterator<TCompressedIndexedInputPageFile<TKey>>>>> { +}; + +class TCompressedOutputFileManip: public TOutputFileManip { +public: + inline i64 GetLength() const { + return -1; // Some microbdb logic rely on unknown size of compressed files + } + + inline i64 Seek(i64 offset, int whence) { + i64 oldPos = DoGetPosition(); + i64 newPos = offset; + switch (whence) { + case SEEK_CUR: + newPos += oldPos; + [[fallthrough]]; // Compler happy. Please fix it! + case SEEK_SET: + break; + default: + return -1L; + } + if (oldPos > newPos) + return -1L; + + const size_t bufsize = 1 << 12; + char buf[bufsize] = {0}; + for (i64 i = oldPos; i < newPos; i += bufsize) + OutputStream->Write(buf, (i + (i64)bufsize < newPos) ? 
bufsize : (size_t)(newPos - i)); + return newPos; + } + + i64 RealSeek(i64 offset, int whence) { + OutputStream.Destroy(); + i64 ret = DoSeek(offset, whence, !!CompressedOutput); + if (ret != -1) + DoStreamOpen(DoCreateStream(), true); + return ret; + } + +protected: + IOutputStream* CreateStream(const TFile& file) override { + CompressedOutput.Reset(new TUnbufferedFileOutput(file)); + return DoCreateStream(); + } + inline IOutputStream* DoCreateStream() { + return new TZLibCompress(CompressedOutput.Get(), ZLib::GZip, 1); + } + THolder<IOutputStream> CompressedOutput; +}; + +class TCompressedBufferedOutputFileManip: public TCompressedOutputFileManip { +protected: + IOutputStream* CreateStream(const TFile& file) override { + CompressedOutput.Reset(new TUnbufferedFileOutput(file)); + return DoCreateStream(); + } + inline IOutputStream* DoCreateStream() { + return new TZLibCompress(CompressedOutput.Get(), ZLib::GZip, 1, 0x100000); + } +}; + +using TCompressedOutputPageFile = TOutputPageFileImpl<TCompressedOutputFileManip>; +using TCompressedBufferedOutputPageFile = TOutputPageFileImpl<TCompressedBufferedOutputFileManip>; + +template <class TVal> +class TOutZIndexFile: public TOutDatFileImpl< + TGzKey<TVal>, + TOutputRecordIterator<TGzKey<TVal>, TOutputPageIterator<TOutputPageFile>, TCallbackIndexer>> { + typedef TOutDatFileImpl< + TGzKey<TVal>, + TOutputRecordIterator<TGzKey<TVal>, TOutputPageIterator<TOutputPageFile>, TCallbackIndexer>> + TDatFile; + typedef TOutZIndexFile<TVal> TMyType; + typedef TGzKey<TVal> TGzVal; + typedef typename TDatFile::TRecIter TRecIter; + typedef typename TRecIter::TPageIter TPageIter; + typedef typename TRecIter::TIndexer TIndexer; + +public: + TOutZIndexFile() { + TotalRecNum = 0; + TIndexer::SetCallback(this, DispatchCallback); + } + + int Open(const char* fname, size_t pagesize, size_t pages, int pagesOrBytes = 1) { + int ret = TDatFile::Open(fname, pagesize, pages, pagesOrBytes); + if (ret) + return ret; + if ((ret = 
TRecIter::GotoPage(1))) + TDatFile::Close(); + return ret; + } + + int Close() { + TPageIter::Unfreeze(); + if (TRecIter::RecNum) + NextPage(TPageIter::Current()); + int ret = 0; + if (Index0.size() && !(ret = TRecIter::GotoPage(0))) { + typename std::vector<TGzVal>::iterator it, end = Index0.end(); + for (it = Index0.begin(); it != end; ++it) + TRecIter::Push(&*it); + ret = (TPageIter::GetPageNum() != 0) ? MBDB_PAGE_OVERFLOW : TPageIter::GetError(); + } + Index0.clear(); + int ret1 = TDatFile::Close(); + return ret ? ret : ret1; + } + +protected: + int TotalRecNum; // should be enough because we have GotoPage(int) + std::vector<TGzVal> Index0; + + void NextPage(const TDatPage* page) { + TGzVal* rec = (TGzVal*)((char*)page + sizeof(TDatPage)); + Index0.push_back(TGzVal(TotalRecNum, rec->Key)); + TotalRecNum += TRecIter::RecNum; + } + + static void DispatchCallback(void* This, const TDatPage* page) { + ((TMyType*)This)->NextPage(page); + } +}; + +template <class TVal, class TKey, class TPageFile = TCompressedOutputPageFile> +class TOutDirectCompressedFileImpl: public TOutDatFileImpl< + TVal, + TOutputRecordIterator<TVal, TOutputPageIterator<TPageFile>, TCallbackIndexer>> { + typedef TOutDatFileImpl< + TVal, + TOutputRecordIterator<TVal, TOutputPageIterator<TPageFile>, TCallbackIndexer>> + TDatFile; + typedef TOutDirectCompressedFileImpl<TVal, TKey, TPageFile> TMyType; + typedef typename TDatFile::TRecIter TRecIter; + typedef typename TRecIter::TPageIter TPageIter; + typedef typename TRecIter::TIndexer TIndexer; + typedef TGzKey<TKey> TMyKey; + typedef TOutZIndexFile<TKey> TKeyFile; + +protected: + using TDatFile::Tell; + +public: + TOutDirectCompressedFileImpl() { + TIndexer::SetCallback(this, DispatchCallback); + } + + int Open(const char* fname, size_t pagesize, size_t ipagesize = 0) { + char iname[FILENAME_MAX]; + int ret; + if (ipagesize == 0) + ipagesize = pagesize; + + ret = TDatFile::Open(fname, pagesize, 1, 1); + ret = ret ? 
ret : DatNameToIdx(iname, fname); + ret = ret ? ret : KeyFile.Open(iname, ipagesize, 1, 1); + if (ret) + TDatFile::Close(); + return ret; + } + + int Close() { + if (TRecIter::RecNum) + NextPage(TPageIter::Current()); + int ret = KeyFile.Close(); + int ret1 = TDatFile::Close(); + return ret1 ? ret1 : ret; + } + + int GetError() const { + return TDatFile::GetError() ? TDatFile::GetError() : KeyFile.GetError(); + } + +protected: + TKeyFile KeyFile; + + void NextPage(const TDatPage* page) { + size_t sz = SizeOf((TMyKey*)NULL); + TMyKey* rec = KeyFile.Reserve(sz ? sz : MaxSizeOf<TMyKey>()); + if (rec) { + rec->Offset = Tell(); + rec->Key = *(TVal*)((char*)page + sizeof(TDatPage)); + KeyFile.ResetDat(); + } + } + + static void DispatchCallback(void* This, const TDatPage* page) { + ((TMyType*)This)->NextPage(page); + } +}; + +template <class TKey> +int TCompressedIndexedInputPageFile<TKey>::GotoPage(int pageno) { + if (Error) + return Error; + + Eof = 0; + + i64 offset = KeyFile.FindPage(pageno); + if (!offset) + return Error = MBDB_BAD_FILE_SIZE; + + if (offset != FileManip.RealSeek(offset, SEEK_SET)) + Error = MBDB_BAD_FILE_SIZE; + + return Error; +} + +template <typename TVal> +class TCompressedInDatFile: public TInDatFile<TVal, TCompressedInputPageFile> { +public: + TCompressedInDatFile(const char* name, size_t pages, int pagesOrBytes = 1) + : TInDatFile<TVal, TCompressedInputPageFile>(name, pages, pagesOrBytes) + { + } +}; + +template <typename TVal> +class TCompressedOutDatFile: public TOutDatFile<TVal, TFakeCompression, TCompressedOutputPageFile> { +public: + TCompressedOutDatFile(const char* name, size_t pagesize, size_t pages, int pagesOrBytes = 1) + : TOutDatFile<TVal, TFakeCompression, TCompressedOutputPageFile>(name, pagesize, pages, pagesOrBytes) + { + } +}; + +template <typename TVal, typename TKey, typename TPageFile = TCompressedOutputPageFile> +class TOutDirectCompressedFile: protected TOutDirectCompressedFileImpl<TVal, TKey, TPageFile> { + typedef 
TOutDirectCompressedFileImpl<TVal, TKey, TPageFile> TBase; + +public: + TOutDirectCompressedFile(const char* name, size_t pagesize, size_t ipagesize = 0) + : Name(strdup(name)) + , PageSize(pagesize) + , IdxPageSize(ipagesize) + { + } + + ~TOutDirectCompressedFile() { + Close(); + free(Name); + Name = NULL; + } + + void Open(const char* fname) { + int ret = TBase::Open(fname, PageSize, IdxPageSize); + if (ret) + ythrow yexception() << ErrorMessage(ret, "Failed to open output file", fname); + free(Name); + Name = strdup(fname); + } + + void Close() { + int ret; + if ((ret = TBase::GetError())) + if (!std::uncaught_exception()) + ythrow yexception() << ErrorMessage(ret, "Error before closing output file", Name); + if ((ret = TBase::Close())) + if (!std::uncaught_exception()) + ythrow yexception() << ErrorMessage(ret, "Error while closing output file", Name); + } + + const char* GetName() const { + return Name; + } + + using TBase::Freeze; + using TBase::Push; + using TBase::Reserve; + using TBase::Unfreeze; + +protected: + char* Name; + size_t PageSize, IdxPageSize; +}; + +class TCompressedInterFileTypes { +public: + typedef TCompressedBufferedOutputPageFile TOutPageFile; + typedef TCompressedBufferedInputPageFile TInPageFile; +}; diff --git a/library/cpp/microbdb/extinfo.h b/library/cpp/microbdb/extinfo.h new file mode 100644 index 0000000000..c8389e783c --- /dev/null +++ b/library/cpp/microbdb/extinfo.h @@ -0,0 +1,127 @@ +#pragma once + +#include "header.h" + +#include <library/cpp/packedtypes/longs.h> + +#include <util/generic/typetraits.h> + +#include <library/cpp/microbdb/noextinfo.pb.h> + +inline bool operator<(const TNoExtInfo&, const TNoExtInfo&) { + return false; +} + +namespace NMicroBDB { + Y_HAS_MEMBER(TExtInfo); + + template <class, bool> + struct TSelectExtInfo; + + template <class T> + struct TSelectExtInfo<T, false> { + typedef TNoExtInfo TExtInfo; + }; + + template <class T> + struct TSelectExtInfo<T, true> { + typedef typename T::TExtInfo TExtInfo; 
+ }; + + template <class T> + class TExtInfoType { + public: + static const bool Exists = THasTExtInfo<T>::value; + typedef typename TSelectExtInfo<T, Exists>::TExtInfo TResult; + }; + + Y_HAS_MEMBER(MakeExtKey); + + template <class, class, bool> + struct TSelectMakeExtKey; + + template <class TVal, class TKey> + struct TSelectMakeExtKey<TVal, TKey, false> { + static inline void Make(TKey* to, typename TExtInfoType<TKey>::TResult*, const TVal* from, const typename TExtInfoType<TVal>::TResult*) { + *to = *from; + } + }; + + template <class TVal, class TKey> + struct TSelectMakeExtKey<TVal, TKey, true> { + static inline void Make(TKey* to, typename TExtInfoType<TKey>::TResult* toExt, const TVal* from, const typename TExtInfoType<TVal>::TResult* fromExt) { + TVal::MakeExtKey(to, toExt, from, fromExt); + } + }; + + template <typename T> + inline size_t SizeOfExt(const T* rec, size_t* /*out*/ extLenSize = nullptr, size_t* /*out*/ extSize = nullptr) { + if (!TExtInfoType<T>::Exists) { + if (extLenSize) + *extLenSize = 0; + if (extSize) + *extSize = 0; + return SizeOf(rec); + } else { + size_t sz = SizeOf(rec); + i64 l; + int els = in_long(l, (const char*)rec + sz); + if (extLenSize) + *extLenSize = static_cast<size_t>(els); + if (extSize) + *extSize = static_cast<size_t>(l); + return sz; + } + } + + template <class T> + bool GetExtInfo(const T* rec, typename TExtInfoType<T>::TResult* extInfo) { + Y_VERIFY(TExtInfoType<T>::Exists, "GetExtInfo should only be used with extended records"); + if (!rec) + return false; + size_t els; + size_t es; + size_t s = SizeOfExt(rec, &els, &es); + const ui8* raw = (const ui8*)rec + s + els; + return extInfo->ParseFromArray(raw, es); + } + + template <class T> + const ui8* GetExtInfoRaw(const T* rec, size_t* len) { + Y_VERIFY(TExtInfoType<T>::Exists, "GetExtInfo should only be used with extended records"); + if (!rec) { + *len = 0; + return nullptr; + } + size_t els; + size_t es; + size_t s = SizeOfExt(rec, &els, &es); + *len = els + es; 
+ return (const ui8*)rec + s; + } + + // Compares serialized extInfo (e.g. for stable sort). + // Records with a shorter extInfo order first; equally long ones are ordered by memcmp of the extInfo bytes, + // which start after the record body and the length prefix (the same layout GetExtInfo() reads). + // Returns a negative, zero or positive value. + template <class T> + int CompareExtInfo(const T* a, const T* b) { + Y_VERIFY(TExtInfoType<T>::Exists, "CompareExtInfo should only be used with extended records"); + size_t elsA, esA; + size_t elsB, esB; + const size_t sA = SizeOfExt(a, &elsA, &esA); + const size_t sB = SizeOfExt(b, &elsB, &esB); // measure b here; the original measured a twice + if (esA != esB) + return esA < esB ? -1 : 1; // avoids narrowing a size_t difference to int + else + return memcmp((const ui8*)a + sA + elsA, (const ui8*)b + sB + elsB, esA); + } + +} + +using NMicroBDB::TExtInfoType; + +template <class TVal, class TKey> +struct TMakeExtKey { + static const bool Exists = NMicroBDB::THasMakeExtKey<TVal>::value; + static inline void Make(TKey* to, typename TExtInfoType<TKey>::TResult* toExt, const TVal* from, const typename TExtInfoType<TVal>::TResult* fromExt) { + NMicroBDB::TSelectMakeExtKey<TVal, TKey, Exists>::Make(to, toExt, from, fromExt); + } +}; diff --git a/library/cpp/microbdb/file.cpp b/library/cpp/microbdb/file.cpp new file mode 100644 index 0000000000..599a7301a0 --- /dev/null +++ b/library/cpp/microbdb/file.cpp @@ -0,0 +1,220 @@ +#include "file.h" + +#include <fcntl.h> +#include <errno.h> +#include <sys/stat.h> + +#ifdef _win32_ +#define S_ISREG(x) !!(x & S_IFREG) +#endif + +TFileManipBase::TFileManipBase() + : FileBased(true) +{ +} + +i64 TFileManipBase::DoSeek(i64 offset, int whence, bool isStreamOpen) { + if (!isStreamOpen) + return -1; + VerifyRandomAccess(); + return File.Seek(offset, (SeekDir)whence); +} + +int TFileManipBase::DoFileOpen(const TFile& file) { + File = file; + SetFileBased(IsFileBased()); + return (File.IsOpen()) ? 
0 : MBDB_OPEN_ERROR; +} + +int TFileManipBase::DoFileClose() { + if (File.IsOpen()) { + File.Close(); + return MBDB_ALREADY_INITIALIZED; + } + return 0; +} + +int TFileManipBase::IsFileBased() const { + bool fileBased = true; +#if defined(_win_) +#elif defined(_unix_) + FHANDLE h = File.GetHandle(); + struct stat sb; + fileBased = false; + if (h != INVALID_FHANDLE && !::fstat(h, &sb) && S_ISREG(sb.st_mode)) { + fileBased = true; + } +#else +#error +#endif + return fileBased; +} + +TInputFileManip::TInputFileManip() + : InputStream(nullptr) +{ +} + +int TInputFileManip::Open(const char* fname, bool direct) { + int ret; + return (ret = DoClose()) ? ret : DoStreamOpen(TFile(fname, RdOnly | (direct ? DirectAligned : EOpenMode()))); +} + +int TInputFileManip::Open(IInputStream& input) { + int ret; + return (ret = DoClose()) ? ret : DoStreamOpen(&input); +} + +int TInputFileManip::Open(TAutoPtr<IInputStream> input) { + int ret; + return (ret = DoClose()) ? ret : DoStreamOpen(input.Release()); +} + +int TInputFileManip::Init(const TFile& file) { + int ret; + if (ret = DoClose()) + return ret; + DoStreamOpen(file); + return 0; +} + +int TInputFileManip::Close() { + DoClose(); + return 0; +} + +ssize_t TInputFileManip::Read(void* buf, unsigned len) { + if (!IsStreamOpen()) + return -1; + return InputStream->Load(buf, len); +} + +IInputStream* TInputFileManip::CreateStream(const TFile& file) { + return new TUnbufferedFileInput(file); +} + +TMappedInputPageFile::TMappedInputPageFile() + : Pagesize(0) + , Error(0) + , Pagenum(0) + , Recordsig(0) + , Open(false) +{ + Term(); +} + +TMappedInputPageFile::~TMappedInputPageFile() { + Term(); +} + +int TMappedInputPageFile::Init(const char* fname, ui32 recsig, ui32* gotRecordSig, bool) { + Mappedfile.init(fname); + Open = true; + + TDatMetaPage* meta = (TDatMetaPage*)Mappedfile.getData(); + if (gotRecordSig) + *gotRecordSig = meta->RecordSig; + + if (meta->MetaSig != METASIG) + Error = MBDB_BAD_METAPAGE; + else if (meta->RecordSig 
!= recsig) + Error = MBDB_BAD_RECORDSIG; + + if (Error) { + Mappedfile.term(); + return Error; + } + + size_t fsize = Mappedfile.getSize(); + if (fsize < METASIZE) + return Error = MBDB_BAD_FILE_SIZE; + fsize -= METASIZE; + if (fsize % meta->PageSize) + return Error = MBDB_BAD_FILE_SIZE; + Pagenum = (int)(fsize / meta->PageSize); + Pagesize = meta->PageSize; + Recordsig = meta->RecordSig; + Error = 0; + return Error; +} + +int TMappedInputPageFile::Term() { + Mappedfile.term(); + Open = false; + return 0; +} + +TOutputFileManip::TOutputFileManip() + : OutputStream(nullptr) +{ +} + +int TOutputFileManip::Open(const char* fname, EOpenMode mode) { + if (IsStreamOpen()) { + return MBDB_ALREADY_INITIALIZED; // should it be closed as TInputFileManip + } + + try { + if (unlink(fname) && errno != ENOENT) { + if (strncmp(fname, "/dev/std", 8)) + return MBDB_OPEN_ERROR; + } + TFile file(fname, mode); + DoStreamOpen(file); + } catch (const TFileError&) { + return MBDB_OPEN_ERROR; + } + return 0; +} + +int TOutputFileManip::Open(IOutputStream& output) { + if (IsStreamOpen()) + return MBDB_ALREADY_INITIALIZED; + DoStreamOpen(&output); + return 0; +} + +int TOutputFileManip::Open(TAutoPtr<IOutputStream> output) { + if (IsStreamOpen()) + return MBDB_ALREADY_INITIALIZED; + DoStreamOpen(output.Release()); + return 0; +} + +int TOutputFileManip::Init(const TFile& file) { + if (IsStreamOpen()) + return MBDB_ALREADY_INITIALIZED; // should it be closed as TInputFileManip + DoStreamOpen(file); + return 0; +} + +int TOutputFileManip::Rotate(const char* newfname) { + if (!IsStreamOpen()) { + return MBDB_NOT_INITIALIZED; + } + + try { + TFile file(newfname, WrOnly | OpenAlways | TruncExisting | ARW | AWOther); + DoClose(); + DoStreamOpen(file); + } catch (const TFileError&) { + return MBDB_OPEN_ERROR; + } + return 0; +} + +int TOutputFileManip::Close() { + DoClose(); + return 0; +} + +int TOutputFileManip::Write(const void* buf, unsigned len) { + if (!IsStreamOpen()) + return -1; + 
OutputStream->Write(buf, len); + return len; +} + +IOutputStream* TOutputFileManip::CreateStream(const TFile& file) { + return new TUnbufferedFileOutput(file); +} diff --git a/library/cpp/microbdb/file.h b/library/cpp/microbdb/file.h new file mode 100644 index 0000000000..f7c7818375 --- /dev/null +++ b/library/cpp/microbdb/file.h @@ -0,0 +1,225 @@ +#pragma once + +#include "header.h" + +#include <library/cpp/deprecated/mapped_file/mapped_file.h> + +#include <util/generic/noncopyable.h> +#include <util/stream/file.h> +#include <util/system/filemap.h> + +#define FS_BLOCK_SIZE 512 + +class TFileManipBase { +protected: + TFileManipBase(); + + virtual ~TFileManipBase() { + } + + i64 DoSeek(i64 offset, int whence, bool isStreamOpen); + + int DoFileOpen(const TFile& file); + + int DoFileClose(); + + int IsFileBased() const; + + inline void SetFileBased(bool fileBased) { + FileBased = fileBased; + } + + inline i64 DoGetPosition() const { + Y_ASSERT(FileBased); + return File.GetPosition(); + } + + inline i64 DoGetLength() const { + return (FileBased) ? 
File.GetLength() : -1; + } + + inline void VerifyRandomAccess() const { + Y_VERIFY(FileBased, "non-file stream can not be accessed randomly"); + } + + inline i64 GetPosition() const { + return (i64)File.GetPosition(); + } + +private: + TFile File; + bool FileBased; +}; + +class TInputFileManip: public TFileManipBase { +public: + using TFileManipBase::GetPosition; + + TInputFileManip(); + + int Open(const char* fname, bool direct = false); + + int Open(IInputStream& input); + + int Open(TAutoPtr<IInputStream> input); + + int Init(const TFile& file); + + int Close(); + + ssize_t Read(void* buf, unsigned len); + + inline bool IsOpen() const { + return IsStreamOpen(); + } + + inline i64 GetLength() const { + return DoGetLength(); + } + + inline i64 Seek(i64 offset, int whence) { + return DoSeek(offset, whence, IsStreamOpen()); + } + + inline i64 RealSeek(i64 offset, int whence) { + return Seek(offset, whence); + } + +protected: + inline bool IsStreamOpen() const { + return !!InputStream; + } + + inline int DoStreamOpen(IInputStream* input, bool fileBased = false) { + InputStream.Reset(input); + SetFileBased(fileBased); + return 0; + } + + inline int DoStreamOpen(const TFile& file) { + int ret; + return (ret = DoFileOpen(file)) ? ret : DoStreamOpen(CreateStream(file), IsFileBased()); + } + + virtual IInputStream* CreateStream(const TFile& file); + + inline bool DoClose() { + if (IsStreamOpen()) { + InputStream.Destroy(); + return DoFileClose(); + } + return 0; + } + + THolder<IInputStream> InputStream; +}; + +class TMappedInputPageFile: private TNonCopyable { +public: + TMappedInputPageFile(); + + ~TMappedInputPageFile(); + + inline int GetError() const { + return Error; + } + + inline size_t GetPageSize() const { + return Pagesize; + } + + inline int GetLastPage() const { + return Pagenum; + } + + inline ui32 GetRecordSig() const { + return Recordsig; + } + + inline bool IsOpen() const { + return Open; + } + + inline char* GetData() const { + return Open ? 
(char*)Mappedfile.getData() : nullptr; + } + + inline size_t GetSize() const { + return Open ? Mappedfile.getSize() : 0; + } + +protected: + int Init(const char* fname, ui32 recsig, ui32* gotRecordSig = nullptr, bool direct = false); + + int Term(); + + TMappedFile Mappedfile; + size_t Pagesize; + int Error; + int Pagenum; + ui32 Recordsig; + bool Open; +}; + +class TOutputFileManip: public TFileManipBase { +public: + TOutputFileManip(); + + int Open(const char* fname, EOpenMode mode = WrOnly | CreateAlways | ARW | AWOther); + + int Open(IOutputStream& output); + + int Open(TAutoPtr<IOutputStream> output); + + int Init(const TFile& file); + + int Rotate(const char* newfname); + + int Write(const void* buf, unsigned len); + + int Close(); + + inline bool IsOpen() const { + return IsStreamOpen(); + } + + inline i64 GetLength() const { + return DoGetLength(); + } + + inline i64 Seek(i64 offset, int whence) { + return DoSeek(offset, whence, IsStreamOpen()); + } + + inline i64 RealSeek(i64 offset, int whence) { + return Seek(offset, whence); + } + +protected: + inline bool IsStreamOpen() const { + return !!OutputStream; + } + + inline int DoStreamOpen(IOutputStream* output, bool fileBased = false) { + OutputStream.Reset(output); + SetFileBased(fileBased); + return 0; + } + + inline int DoStreamOpen(const TFile& file) { + int ret; + return (ret = DoFileOpen(file)) ? 
ret : DoStreamOpen(CreateStream(file), true); + } + + virtual IOutputStream* CreateStream(const TFile& file); + + inline bool DoClose() { + if (IsStreamOpen()) { + OutputStream.Destroy(); + return DoFileClose(); + } + return 0; + } + + THolder<IOutputStream> OutputStream; +}; diff --git a/library/cpp/microbdb/hashes.h b/library/cpp/microbdb/hashes.h new file mode 100644 index 0000000000..bfd113c3ba --- /dev/null +++ b/library/cpp/microbdb/hashes.h @@ -0,0 +1,250 @@ +#pragma once + +#include <library/cpp/on_disk/st_hash/static_hash.h> +#include <util/system/sysstat.h> +#include <util/stream/mem.h> +#include <util/string/printf.h> +#include <library/cpp/deprecated/fgood/fgood.h> + +#include "safeopen.h" + +/** This file currently implements creation of mappable read-only hash file. + Basic usage of these "static hashes" is defined in util/static_hash.h (see docs there). + Additional useful wrappers are available in util/static_hash_map.h + + There are two ways to create mappable hash file: + + A) Fill an THashMap/set structure in RAM, then dump it to disk. + This is usually done by save_hash_to_file* functions defined in static_hash.h + (see description in static_hash.h). + + B) Prepare all data using external sorter, then create hash file straight on disk. + This approach is necessary when there isn't enough RAM to hold entire original THashMap. + Implemented in this file as TStaticHashBuilder class. + + Current implementation's major drawback is that the size of the hash must be estimated + before the hash is built (bucketCount), which is not always possible. + Separate implementation with two sort passes is yet to be done. + + Another problem is that maximum stored size of the element (maxRecSize) must also be + known in advance, because we use TDatSorterMemo, etc. 
+ */ + +template <class SizeType> +struct TSthashTmpRec { + SizeType HashVal; + SizeType RecSize; + char Buf[1]; + size_t SizeOf() const { + return &Buf[RecSize] - (char*)this; + } + bool operator<(const TSthashTmpRec& than) const { + return HashVal < than.HashVal; + } + static const ui32 RecordSig = 20100124 + sizeof(SizeType) - 4; +}; + +template <typename T> +struct TReplaceMerger { + T operator()(const T& oldRecord, const T& newRecord) const { + Y_UNUSED(oldRecord); + return newRecord; + } +}; + +/** TStaticHashBuilder template parameters: + HashType - THashMap map/set type for which we construct corresponding mappable hash; + SizeType - type used to store offsets and length in resulting hash; + MergerType - type of object to process records with equal key (see TReplaceMerger for example); + */ + +template <class HashType, class SizeType, class MergerType = TReplaceMerger<typename HashType::mapped_type>> +struct TStaticHashBuilder { + const size_t SrtIOPageSz; + const size_t WrBufSz; + typedef TSthashTmpRec<SizeType> TIoRec; + typedef TSthashWriter<typename HashType::key_type, typename HashType::mapped_type, SizeType> TKeySaver; + typedef typename HashType::value_type TValueType; + typedef typename HashType::mapped_type TMappedType; + typedef typename HashType::key_type TKeyType; + + TDatSorterMemo<TIoRec, TCompareByLess> Srt; + TBuffer IoRec, CurrentBlockRecs; + TKeySaver KeySaver; + typename HashType::hasher Hasher; + typename HashType::key_equal Equals; + MergerType merger; + TString HashFileName; + TString OurTmpDir; + size_t BucketCount; + int FreeBits; + + // memSz is the Sorter buffer size; + // maxRecSize is the maximum size (as reported by size_for_st) of our record(s) + TStaticHashBuilder(size_t memSz, size_t maxRecSize) + : SrtIOPageSz((maxRecSize * 16 + 65535) & ~size_t(65535)) + , WrBufSz(memSz / 16 >= SrtIOPageSz ? 
memSz / 16 : SrtIOPageSz) + , Srt("unused", memSz, SrtIOPageSz, WrBufSz, 0) + , IoRec(sizeof(TIoRec) + maxRecSize) + , CurrentBlockRecs(sizeof(TIoRec) + maxRecSize) + , BucketCount(0) + , FreeBits(0) + { + } + + ~TStaticHashBuilder() { + Close(); + } + + // if tmpDir is supplied, it must exist; + // bucketCount should be HashBucketCount() of the (estimated) element count + void Open(const char* fname, size_t bucketCount, const char* tmpDir = nullptr) { + if (!tmpDir) + tmpDir = ~(OurTmpDir = Sprintf("%s.temp", fname)); + Mkdir(tmpDir, MODE0775); + Srt.Open(tmpDir); + HashFileName = fname; + BucketCount = bucketCount; + int bitCount = 0; + while (((size_t)1 << bitCount) <= BucketCount && bitCount < int(8 * sizeof(size_t))) + ++bitCount; + FreeBits = 8 * sizeof(size_t) - bitCount; + } + + void Push(const TValueType& rec) { + TIoRec* ioRec = MakeIoRec(rec); + Srt.Push(ioRec); + } + TIoRec* MakeIoRec(const TValueType& rec) { + TIoRec* ioRec = (TIoRec*)IoRec.Data(); + size_t mask = (1 << FreeBits) - 1; + size_t hash = Hasher(rec.first); + ioRec->HashVal = ((hash % BucketCount) << FreeBits) + ((hash / BucketCount) & mask); + + TMemoryOutput output(ioRec->Buf, IoRec.Capacity() - offsetof(TIoRec, Buf)); + KeySaver.SaveRecord(&output, rec); + ioRec->RecSize = output.Buf() - ioRec->Buf; + return ioRec; + } + + bool Merge(TVector<std::pair<TKeyType, TMappedType>>& records, size_t newRecordSize) { + TSthashIterator<const TKeyType, const TMappedType, typename HashType::hasher, + typename HashType::key_equal> + newPtr(CurrentBlockRecs.End() - newRecordSize); + for (size_t i = 0; i < records.size(); ++i) { + if (newPtr.KeyEquals(Equals, records[i].first)) { + TMappedType oldValue = records[i].second; + TMappedType newValue = newPtr.Value(); + newValue = merger(oldValue, newValue); + records[i].second = newValue; + return true; + } + } + records.push_back(std::make_pair(newPtr.Key(), newPtr.Value())); + return false; + } + + void PutRecord(const char* buf, size_t rec_size, 
TFILEPtr& f, SizeType& cur_off) { + f.fsput(buf, rec_size); + cur_off += rec_size; + } + + void Finish() { + Srt.Sort(); + // We use variant 1. + // Variant 1: read sorter once, write records, fseeks to write buckets + // (this doesn't allow fname to be stdout) + // Variant 2: read sorter (probably temp. file) twice: write buckets, then write records + // (this allows fname to be stdout but seems to be longer) + TFILEPtr f(HashFileName, "wb"); + setvbuf(f, nullptr, _IOFBF, WrBufSz); + TVector<SizeType> bucketsBuf(WrBufSz, 0); + // prepare header (note: this code must be unified with save_stl.h) + typedef sthashtable_nvm_sv<typename HashType::hasher, typename HashType::key_equal, SizeType> sv_type; + sv_type sv = {Hasher, Equals, BucketCount, 0, 0}; + // to do: m.b. use just the size of corresponding object? + SizeType cur_off = sizeof(sv_type) + + (sv.num_buckets + 1) * sizeof(SizeType); + SizeType bkt_wroff = sizeof(sv_type), bkt_bufpos = 0, prev_bkt = 0, prev_hash = (SizeType)-1; + bucketsBuf[bkt_bufpos++] = cur_off; + // if might me better to write many zeroes here + f.seek(cur_off, SEEK_SET); + TVector<std::pair<TKeyType, TMappedType>> currentBlock; + bool emptyFile = true; + size_t prevRecSize = 0; + // seek forward + while (true) { + const TIoRec* rec = Srt.Next(); + if (currentBlock.empty() && !emptyFile) { + if (rec && prev_hash == rec->HashVal) { + Merge(currentBlock, prevRecSize); + } else { + // if there is only one record with this hash, don't recode it, just write + PutRecord(CurrentBlockRecs.Data(), prevRecSize, f, cur_off); + sv.num_elements++; + } + } + if (!rec || prev_hash != rec->HashVal) { + // write buckets table + for (size_t i = 0; i < currentBlock.size(); ++i) { + TIoRec* ioRec = MakeIoRec(TValueType(currentBlock[i])); + PutRecord(ioRec->Buf, ioRec->RecSize, f, cur_off); + } + sv.num_elements += currentBlock.size(); + currentBlock.clear(); + CurrentBlockRecs.Clear(); + if (rec) { + prev_hash = rec->HashVal; + } + } + // note: prev_bkt's 
semantics here is 'cur_bkt - 1', thus we are actually cycling + // until cur_bkt == rec->HashVal *inclusively* + while (!rec || prev_bkt != (rec->HashVal >> FreeBits)) { + bucketsBuf[bkt_bufpos++] = cur_off; + if (bkt_bufpos == bucketsBuf.size()) { + f.seek(bkt_wroff, SEEK_SET); + size_t sz = bkt_bufpos * sizeof(bucketsBuf[0]); + if (f.write(bucketsBuf.begin(), 1, sz) != sz) + throw yexception() << "could not write " << sz << " bytes to " << ~HashFileName; + bkt_wroff += sz; + bkt_bufpos = 0; + f.seek(cur_off, SEEK_SET); + } + prev_bkt++; + if (!rec) { + break; + } + assert(prev_bkt < BucketCount); + } + if (!rec) { + break; + } + emptyFile = false; + CurrentBlockRecs.Append(rec->Buf, rec->RecSize); + if (!currentBlock.empty()) { + Merge(currentBlock, rec->RecSize); + } else { + prevRecSize = rec->RecSize; + } + } + // finish buckets table + f.seek(bkt_wroff, SEEK_SET); + size_t sz = bkt_bufpos * sizeof(bucketsBuf[0]); + if (sz && f.write(bucketsBuf.begin(), 1, sz) != sz) + throw yexception() << "could not write " << sz << " bytes to " << ~HashFileName; + bkt_wroff += sz; + for (; prev_bkt < BucketCount; prev_bkt++) + f.fput(cur_off); + // finally write header + sv.data_end_off = cur_off; + f.seek(0, SEEK_SET); + f.fput(sv); + f.close(); + } + + void Close() { + Srt.Close(); + if (+OurTmpDir) + rmdir(~OurTmpDir); + } +}; diff --git a/library/cpp/microbdb/header.cpp b/library/cpp/microbdb/header.cpp new file mode 100644 index 0000000000..f4511d6fb6 --- /dev/null +++ b/library/cpp/microbdb/header.cpp @@ -0,0 +1,91 @@ +#include "header.h" + +#include <util/stream/output.h> +#include <util/stream/format.h> + +TString ToString(EMbdbErrors error) { + TString ret; + switch (error) { + case MBDB_ALREADY_INITIALIZED: + ret = "already initialized"; + break; + case MBDB_NOT_INITIALIZED: + ret = "not initialized"; + break; + case MBDB_BAD_DESCRIPTOR: + ret = "bad descriptor"; + break; + case MBDB_OPEN_ERROR: + ret = "open error"; + break; + case MBDB_READ_ERROR: + ret = "read 
error"; + break; + case MBDB_WRITE_ERROR: + ret = "write error"; + break; + case MBDB_CLOSE_ERROR: + ret = "close error"; + break; + case MBDB_EXPECTED_EOF: + ret = "expected eof"; + break; + case MBDB_UNEXPECTED_EOF: + ret = "unxepected eof"; + break; + case MBDB_BAD_FILENAME: + ret = "bad filename"; + break; + case MBDB_BAD_METAPAGE: + ret = "bad metapage"; + break; + case MBDB_BAD_RECORDSIG: + ret = "bad recordsig"; + break; + case MBDB_BAD_FILE_SIZE: + ret = "bad file size"; + break; + case MBDB_BAD_PAGESIG: + ret = "bad pagesig"; + break; + case MBDB_BAD_PAGESIZE: + ret = "bad pagesize"; + break; + case MBDB_BAD_PARM: + ret = "bad parm"; + break; + case MBDB_BAD_SYNC: + ret = "bad sync"; + break; + case MBDB_PAGE_OVERFLOW: + ret = "page overflow"; + break; + case MBDB_NO_MEMORY: + ret = "no memory"; + break; + case MBDB_MEMORY_LEAK: + ret = "memory leak"; + break; + case MBDB_NOT_SUPPORTED: + ret = "not supported"; + break; + default: + ret = "unknown"; + break; + } + return ret; +} + +TString ErrorMessage(int error, const TString& text, const TString& path, ui32 recordSig, ui32 gotRecordSig) { + TStringStream str; + str << text; + if (path.size()) + str << " '" << path << "'"; + str << ": " << ToString(static_cast<EMbdbErrors>(error)); + if (recordSig && (!gotRecordSig || recordSig != gotRecordSig)) + str << ". Expected RecordSig: " << Hex(recordSig, HF_ADDX); + if (recordSig && gotRecordSig && recordSig != gotRecordSig) + str << ", got: " << Hex(gotRecordSig, HF_ADDX); + str << ". 
Last system error text: " << LastSystemErrorText(); + return str.Str(); +} diff --git a/library/cpp/microbdb/header.h b/library/cpp/microbdb/header.h new file mode 100644 index 0000000000..0951d610ea --- /dev/null +++ b/library/cpp/microbdb/header.h @@ -0,0 +1,159 @@ +#pragma once + +#include <util/system/defaults.h> +#include <util/generic/typetraits.h> +#include <util/generic/string.h> +#include <util/str_stl.h> + +#include <stdio.h> + +#define METASIZE (1u << 12) +#define METASIG 0x12345678u +#define PAGESIG 0x87654321u + +enum EMbdbErrors { + MBDB_ALREADY_INITIALIZED = 200, + MBDB_NOT_INITIALIZED = 201, + MBDB_BAD_DESCRIPTOR = 202, + MBDB_OPEN_ERROR = 203, + MBDB_READ_ERROR = 204, + MBDB_WRITE_ERROR = 205, + MBDB_CLOSE_ERROR = 206, + MBDB_EXPECTED_EOF = 207, + MBDB_UNEXPECTED_EOF = 208, + MBDB_BAD_FILENAME = 209, + MBDB_BAD_METAPAGE = 210, + MBDB_BAD_RECORDSIG = 211, + MBDB_BAD_FILE_SIZE = 212, + MBDB_BAD_PAGESIG = 213, + MBDB_BAD_PAGESIZE = 214, + MBDB_BAD_PARM = 215, + MBDB_BAD_SYNC = 216, + MBDB_PAGE_OVERFLOW = 217, + MBDB_NO_MEMORY = 218, + MBDB_MEMORY_LEAK = 219, + MBDB_NOT_SUPPORTED = 220 +}; + +TString ToString(EMbdbErrors error); +TString ErrorMessage(int error, const TString& text, const TString& path = TString(), ui32 recordSig = 0, ui32 gotRecordSig = 0); + +enum EPageFormat { + MBDB_FORMAT_RAW = 0, + MBDB_FORMAT_COMPRESSED = 1, + MBDB_FORMAT_NULL = 255 +}; + +enum ECompressionAlgorithm { + MBDB_COMPRESSION_ZLIB = 1, + MBDB_COMPRESSION_FASTLZ = 2, + MBDB_COMPRESSION_SNAPPY = 3 +}; + +struct TDatMetaPage { + ui32 MetaSig; + ui32 RecordSig; + ui32 PageSize; +}; + +struct TDatPage { + ui32 RecNum; //!< number of records on this page + ui32 PageSig; + ui32 Format : 2; //!< one of EPageFormat + ui32 Reserved : 30; +}; + +/// Additional page header with compression info +struct TCompressedPage { + ui32 BlockCount; + ui32 Algorithm : 4; + ui32 Version : 4; + ui32 Reserved : 24; +}; + +namespace NMicroBDB { + /// Header of compressed block + struct 
TCompressedHeader { + ui32 Compressed; + ui32 Original; /// original size of block + ui32 Count; /// number of records in block + ui32 Reserved; + }; + + Y_HAS_MEMBER(AssertValid); + + template <typename T, bool TVal> + struct TAssertValid { + void operator()(const T*) { + } + }; + + template <typename T> + struct TAssertValid<T, true> { + void operator()(const T* rec) { + return rec->AssertValid(); + } + }; + + template <typename T> + void AssertValid(const T* rec) { + return NMicroBDB::TAssertValid<T, NMicroBDB::THasAssertValid<T>::value>()(rec); + } + + Y_HAS_MEMBER(SizeOf); + + template <typename T, bool TVal> + struct TGetSizeOf; + + template <typename T> + struct TGetSizeOf<T, true> { + size_t operator()(const T* rec) { + return rec->SizeOf(); + } + }; + + template <typename T> + struct TGetSizeOf<T, false> { + size_t operator()(const T*) { + return sizeof(T); + } + }; + + inline char* GetFirstRecord(const TDatPage* page) { + switch (page->Format) { + case MBDB_FORMAT_RAW: + return (char*)page + sizeof(TDatPage); + case MBDB_FORMAT_COMPRESSED: + // Первая запись на сжатой странице сохраняется несжатой + // сразу же после всех заголовков. 
+ // Алгоритм сохранения смотреть в TOutputRecordIterator::FlushBuffer + return (char*)page + sizeof(TDatPage) + sizeof(TCompressedPage) + sizeof(NMicroBDB::TCompressedHeader); + } + return (char*)nullptr; + } +} + +template <typename T> +size_t SizeOf(const T* rec) { + return NMicroBDB::TGetSizeOf<T, NMicroBDB::THasSizeOf<T>::value>()(rec); +} + +template <typename T> +size_t MaxSizeOf() { + return sizeof(T); +} + +static inline int DatNameToIdx(char iname[/*FILENAME_MAX*/], const char* dname) { + if (!dname || !*dname) + return MBDB_BAD_FILENAME; + const char* ptr; + if (!(ptr = strrchr(dname, '/'))) + ptr = dname; + if (!(ptr = strrchr(ptr, '.'))) + ptr = strchr(dname, 0); + if (ptr - dname > FILENAME_MAX - 5) + return MBDB_BAD_FILENAME; + memcpy(iname, dname, ptr - dname); + strcpy(iname + (ptr - dname), ".idx"); + return 0; +} diff --git a/library/cpp/microbdb/heap.h b/library/cpp/microbdb/heap.h new file mode 100644 index 0000000000..ef5a53534c --- /dev/null +++ b/library/cpp/microbdb/heap.h @@ -0,0 +1,143 @@ +#pragma once + +#include "header.h" +#include "extinfo.h" + +#include <util/generic/vector.h> + +#include <errno.h> + +/////////////////////////////////////////////////////////////////////////////// + +/// Default comparator +template <class TVal> +struct TCompareByLess { + inline bool operator()(const TVal* a, const TVal* b) const { + return TLess<TVal>()(*a, *b); + } +}; + +/////////////////////////////////////////////////////////////////////////////// + +template <class TVal, class TIterator, class TCompare = TCompareByLess<TVal>> +class THeapIter { +public: + int Init(TIterator** iters, int count) { + Term(); + if (!count) + return 0; + if (!(Heap = (TIterator**)malloc(count * sizeof(TIterator*)))) + return ENOMEM; + + Count = count; + count = 0; + while (count < Count) + if (count && !(*iters)->Next()) { //here first TIterator is NOT initialized! 
+ Count--; + iters++; + } else { + Heap[count++] = *iters++; + } + count = Count / 2; + while (--count > 0) //Heap[0] is not changed! + Sift(count, Count); //do not try to replace this code by make_heap + return 0; + } + + // Convenience overload: builds the array of iterator pointers from a contiguous + // array of iterators and forwards to Init(TIterator**, int). + int Init(TIterator* iters, int count) { + TVector<TIterator*> a(count); + for (int i = 0; i < count; ++i) + a[i] = &iters[i]; + // data() is well defined for an empty vector, unlike &a[0] + return Init(a.data(), count); + } + + THeapIter() + : Heap(nullptr) + , Count(0) + { + } + + THeapIter(TIterator* a, TIterator* b) + : Heap(nullptr) + , Count(0) + { + TIterator* arr[] = {a, b}; + if (Init(arr, 2)) + ythrow yexception() << "can't Init THeapIter"; + } + + THeapIter(TVector<TIterator>& v) + : Heap(nullptr) + , Count(0) + { + // data() instead of &v[0] so that an empty vector does not index out of range + if (Init(v.data(), v.size())) { + ythrow yexception() << "can't Init THeapIter"; + } + } + + ~THeapIter() { + Term(); + } + + inline const TVal* Current() const { + if (!Count) + return nullptr; + return (*Heap)->Current(); + } + + inline const TIterator* CurrentIter() const { + return *Heap; + } + + //for ends of last file will use Heap[0] = Heap[0] ! 
and + //returns Current of eof so Current of eof MUST return NULL + //possible this is bug and need fixing + const TVal* Next() { + if (!Count) + return nullptr; + if (!(*Heap)->Next()) //on first call unitialized first TIterator + *Heap = Heap[--Count]; //will be correctly initialized + + if (Count == 2) { + if (TCompare()(Heap[1]->Current(), Heap[0]->Current())) + DoSwap(Heap[1], Heap[0]); + } else + Sift(0, Count); + + return Current(); + } + + inline bool GetExtInfo(typename TExtInfoType<TVal>::TResult* extInfo) const { + return (*Heap)->GetExtInfo(extInfo); + } + + inline const ui8* GetExtInfoRaw(size_t* len) const { + return (*Heap)->GetExtInfoRaw(len); + } + + void Term() { + ::free(Heap); + Heap = nullptr; + Count = 0; + } + +protected: + void Sift(int node, int end) { + TIterator* x = Heap[node]; + int son; + for (son = 2 * node + 1; son < end; node = son, son = 2 * node + 1) { + if (son < (end - 1) && TCompare()(Heap[son + 1]->Current(), Heap[son]->Current())) + son++; + if (TCompare()(Heap[son]->Current(), x->Current())) + Heap[node] = Heap[son]; + else + break; + } + Heap[node] = x; + } + + TIterator** Heap; + int Count; +}; + +/////////////////////////////////////////////////////////////////////////////// diff --git a/library/cpp/microbdb/input.h b/library/cpp/microbdb/input.h new file mode 100644 index 0000000000..13b12f28b7 --- /dev/null +++ b/library/cpp/microbdb/input.h @@ -0,0 +1,1027 @@ +#pragma once + +#include "header.h" +#include "file.h" +#include "reader.h" + +#include <util/system/maxlen.h> +#include <util/system/event.h> +#include <util/system/thread.h> + +#include <thread> + +#include <sys/uio.h> + +#include <errno.h> + +template <class TFileManip> +inline ssize_t Readv(TFileManip& fileManip, const struct iovec* iov, int iovcnt) { + ssize_t read_count = 0; + for (int n = 0; n < iovcnt; n++) { + ssize_t last_read = fileManip.Read(iov[n].iov_base, iov[n].iov_len); + if (last_read < 0) + return -1; + read_count += last_read; + } + return 
read_count; +} + +template <class TVal, typename TBasePageIter> +class TInputRecordIterator: public TBasePageIter { + typedef THolder<NMicroBDB::IBasePageReader<TVal>> TReaderHolder; + +public: + typedef TBasePageIter TPageIter; + + TInputRecordIterator() { + Init(); + } + + ~TInputRecordIterator() { + Term(); + } + + const TVal* Current() const { + return Rec; + } + + bool GetExtInfo(typename TExtInfoType<TVal>::TResult* extInfo) const { + if (!Rec) + return false; + return Reader->GetExtInfo(extInfo); + } + + const ui8* GetExtInfoRaw(size_t* len) const { + if (!Rec) + return nullptr; + return Reader->GetExtInfoRaw(len); + } + + size_t GetRecSize() const { + return Reader->GetRecSize(); + } + + size_t GetExtSize() const { + return Reader->GetExtSize(); + } + + const TVal* Next() { + if (RecNum) + --RecNum; + else { + TDatPage* page = TPageIter::Next(); + if (!page) { + if (TPageIter::IsFrozen() && Reader.Get()) + Reader->SetClearFlag(); + return Rec = nullptr; + } else if (!!SelectReader()) + return Rec = nullptr; + RecNum = TPageIter::Current()->RecNum - 1; + } + return Rec = Reader->Next(); + } + + // Skip(0) == Current(); Skip(1) == Next() + const TVal* Skip(int& num) { + // Y_ASSERT(num >= 0); ? 
otherwise it gets into infinite loop + while (num > RecNum) { + num -= RecNum + 1; + if (!TPageIter::Next() || !!SelectReader()) { + RecNum = 0; + return Rec = nullptr; + } + RecNum = TPageIter::Current()->RecNum - 1; + Rec = Reader->Next(); + } + ++num; + while (--num) + Next(); + return Rec; + } + + // begin reading from next page + void Reset() { + Rec = NULL; + RecNum = 0; + if (Reader.Get()) + Reader->Reset(); + } + +protected: + int Init() { + Rec = nullptr; + RecNum = 0; + Format = MBDB_FORMAT_NULL; + return 0; + } + + int Term() { + Reader.Reset(nullptr); + Format = MBDB_FORMAT_NULL; + Rec = nullptr; + RecNum = 0; + return 0; + } + + const TVal* GotoPage(int pageno) { + if (!TPageIter::GotoPage(pageno) || !!SelectReader()) + return Rec = nullptr; + RecNum = TPageIter::Current()->RecNum - 1; + return Rec = Reader->Next(); + } + + int SelectReader() { + if (!TPageIter::Current()) + return MBDB_UNEXPECTED_EOF; + if (ui32(Format) != TPageIter::Current()->Format) { + switch (TPageIter::Current()->Format) { + case MBDB_FORMAT_RAW: + Reader.Reset(new NMicroBDB::TRawPageReader<TVal, TPageIter>(this)); + break; + case MBDB_FORMAT_COMPRESSED: + Reader.Reset(new NMicroBDB::TCompressedReader<TVal, TPageIter>(this)); + break; + default: + return MBDB_NOT_SUPPORTED; + } + Format = EPageFormat(TPageIter::Current()->Format); + } else { + Y_ASSERT(Reader.Get() != nullptr); + Reader->Reset(); + } + return 0; + } + + const TVal* Rec; + TReaderHolder Reader; + int RecNum; //!< number of records on the current page after the current record + EPageFormat Format; +}; + +template <class TBaseReader> +class TInputPageIterator: public TBaseReader { +public: + typedef TBaseReader TReader; + + TInputPageIterator() + : Buf(nullptr) + { + Term(); + } + + ~TInputPageIterator() { + Term(); + } + + TDatPage* Current() { + return CurPage; + } + + int Freeze() { + return (Frozen = (PageNum == -1) ? 
0 : PageNum); + } + + void Unfreeze() { + Frozen = -1; + } + + inline int IsFrozen() const { + return Frozen + 1; + } + + inline size_t GetPageSize() const { + return TReader::GetPageSize(); + } + + inline int GetPageNum() const { + return PageNum; + } + + inline int IsEof() const { + return Eof; + } + + TDatPage* Next() { + if (PageNum >= Maxpage && ReadBuf()) { + Eof = Eof ? Eof : TReader::IsEof(); + return CurPage = nullptr; + } + return CurPage = (TDatPage*)(Buf + ((++PageNum) % Bufpages) * GetPageSize()); + } + + TDatPage* GotoPage(int pageno) { + if (pageno <= Maxpage && pageno >= (Maxpage - Pages + 1)) { + PageNum = pageno; + return CurPage = (TDatPage*)(Buf + (PageNum % Bufpages) * GetPageSize()); + } + if (IsFrozen() || TReader::GotoPage(pageno)) + return nullptr; + Maxpage = PageNum = pageno - 1; + Eof = 0; + return Next(); + } + +protected: + int Init(size_t pages, int pagesOrBytes) { + Term(); + if (pagesOrBytes == -1) + Bufpages = TReader::GetLastPage(); + else if (pagesOrBytes) + Bufpages = pages; + else + Bufpages = pages / GetPageSize(); + if (!TReader::GetLastPage()) { + Bufpages = 0; + assert(Eof == 1); + return 0; + } + int lastPage = TReader::GetLastPage(); + if (lastPage >= 0) + Bufpages = (int)Min(lastPage, Bufpages); + Bufpages = Max(2, Bufpages); + Eof = 0; + ABuf.Alloc(Bufpages * GetPageSize()); + return (Buf = ABuf.Begin()) ? 0 : ENOMEM; + // return (Buf = (char*)malloc(Bufpages * GetPageSize())) ? 0 : ENOMEM; + } + + int Term() { + // free(Buf); + ABuf.Dealloc(); + Buf = nullptr; + Maxpage = PageNum = Frozen = -1; + Bufpages = 0; + Pages = 0; + Eof = 1; + CurPage = nullptr; + return 0; + } + + int ReadBuf() { + int nvec; + iovec vec[2]; + int maxpage = (Frozen == -1 ? 
Maxpage + 1 : Frozen) + Bufpages - 1; + int minpage = Maxpage + 1; + if (maxpage < minpage) + return EAGAIN; + minpage %= Bufpages; + maxpage %= Bufpages; + if (maxpage < minpage) { + vec[0].iov_base = Buf + GetPageSize() * minpage; + vec[0].iov_len = GetPageSize() * (Bufpages - minpage); + vec[1].iov_base = Buf; + vec[1].iov_len = GetPageSize() * (maxpage + 1); + nvec = 2; + } else { + vec[0].iov_base = Buf + GetPageSize() * minpage; + vec[0].iov_len = GetPageSize() * (maxpage - minpage + 1); + nvec = 1; + } + TReader::ReadPages(vec, nvec, &Pages); + Maxpage += Pages; + return !Pages; + } + + int Maxpage, PageNum, Frozen, Bufpages, Eof, Pages; + TDatPage* CurPage; + // TMappedArray<char> ABuf; + TMappedAllocation ABuf; + char* Buf; +}; + +template <class TBaseReader> +class TInputPageIteratorMT: public TBaseReader { +public: + typedef TBaseReader TReader; + + TInputPageIteratorMT() + : CurBuf(0) + , CurReadBuf(0) + , Buf(nullptr) + { + Term(); + } + + ~TInputPageIteratorMT() { + Term(); + } + + TDatPage* Current() { + return CurPage; + } + + int Freeze() { + return (Frozen = (PageNum == -1) ? 0 : PageNum); + } + + void Unfreeze() { + Frozen = -1; + } + + inline int IsFrozen() const { + return Frozen + 1; + } + + inline size_t GetPageSize() const { + return TReader::GetPageSize(); + } + + inline int GetPageNum() const { + return PageNum; + } + + inline int IsEof() const { + return Eof; + } + + TDatPage* Next() { + if (Eof) + return CurPage = nullptr; + if (PageNum >= Maxpage && ReadBuf()) { + Eof = Eof ? 
Eof : TReader::IsEof(); + return CurPage = nullptr; + } + return CurPage = (TDatPage*)(Buf + ((++PageNum) % Bufpages) * GetPageSize()); + } + + TDatPage* GotoPage(int pageno) { + if (pageno <= Maxpage && pageno >= (Maxpage - Pages + 1)) { + PageNum = pageno; + return CurPage = (TDatPage*)(Buf + (PageNum % Bufpages) * GetPageSize()); + } + if (IsFrozen() || TReader::GotoPage(pageno)) + return nullptr; + Maxpage = PageNum = pageno - 1; + Eof = 0; + return Next(); + } + + void ReadPages() { + // fprintf(stderr, "ReadPages started\n"); + bool eof = false; + while (!eof) { + QEvent[CurBuf].Wait(); + if (Finish) + return; + int pages = ReadCurBuf(Bufs[CurBuf]); + PagesM[CurBuf] = pages; + eof = !pages; + AEvent[CurBuf].Signal(); + CurBuf ^= 1; + } + } + +protected: + int Init(size_t pages, int pagesOrBytes) { + Term(); + if (pagesOrBytes == -1) + Bufpages = TReader::GetLastPage(); + else if (pagesOrBytes) + Bufpages = pages; + else + Bufpages = pages / GetPageSize(); + if (!TReader::GetLastPage()) { + Bufpages = 0; + assert(Eof == 1); + return 0; + } + int lastPage = TReader::GetLastPage(); + if (lastPage >= 0) + Bufpages = (int)Min(lastPage, Bufpages); + Bufpages = Max(2, Bufpages); + Eof = 0; + ABuf.Alloc(Bufpages * GetPageSize() * 2); + Bufs[0] = ABuf.Begin(); + Bufs[1] = Bufs[0] + Bufpages * GetPageSize(); + // return (Buf = (char*)malloc(Bufpages * GetPageSize())) ? 0 : ENOMEM; + Finish = false; + ReadThread = std::thread([this]() { + TThread::SetCurrentThreadName("DatReader"); + ReadPages(); + }); + QEvent[0].Signal(); + return Bufs[0] ? 
0 : ENOMEM; + } + + void StopThread() { + Finish = true; + QEvent[0].Signal(); + QEvent[1].Signal(); + ReadThread.join(); + } + + int Term() { + // free(Buf); + if (ReadThread.joinable()) + StopThread(); + ABuf.Dealloc(); + Buf = nullptr; + Bufs[0] = nullptr; + Bufs[1] = nullptr; + Maxpage = MaxpageR = PageNum = Frozen = -1; + Bufpages = 0; + Pages = 0; + Eof = 1; + CurPage = nullptr; + return 0; + } + + int ReadCurBuf(char* buf) { + int nvec; + iovec vec[2]; + int maxpage = (Frozen == -1 ? MaxpageR + 1 : Frozen) + Bufpages - 1; + int minpage = MaxpageR + 1; + if (maxpage < minpage) + return EAGAIN; + minpage %= Bufpages; + maxpage %= Bufpages; + if (maxpage < minpage) { + vec[0].iov_base = buf + GetPageSize() * minpage; + vec[0].iov_len = GetPageSize() * (Bufpages - minpage); + vec[1].iov_base = buf; + vec[1].iov_len = GetPageSize() * (maxpage + 1); + nvec = 2; + } else { + vec[0].iov_base = buf + GetPageSize() * minpage; + vec[0].iov_len = GetPageSize() * (maxpage - minpage + 1); + nvec = 1; + } + int pages; + TReader::ReadPages(vec, nvec, &pages); + MaxpageR += pages; + return pages; + } + + int ReadBuf() { + QEvent[CurReadBuf ^ 1].Signal(); + AEvent[CurReadBuf].Wait(); + Buf = Bufs[CurReadBuf]; + Maxpage += (Pages = PagesM[CurReadBuf]); + CurReadBuf ^= 1; + return !Pages; + } + + int Maxpage, MaxpageR, PageNum, Frozen, Bufpages, Eof, Pages; + TDatPage* CurPage; + // TMappedArray<char> ABuf; + ui32 CurBuf; + ui32 CurReadBuf; + TMappedAllocation ABuf; + char* Buf; + char* Bufs[2]; + ui32 PagesM[2]; + TAutoEvent QEvent[2]; + TAutoEvent AEvent[2]; + std::thread ReadThread; + bool Finish; +}; + +template <typename TFileManip> +class TInputPageFileImpl: private TNonCopyable { +protected: + TFileManip FileManip; + +public: + TInputPageFileImpl() + : Pagesize(0) + , Fd(-1) + , Eof(1) + , Error(0) + , Pagenum(0) + , Recordsig(0) + { + Term(); + } + + ~TInputPageFileImpl() { + Term(); + } + + inline int IsEof() const { + return Eof; + } + + inline int GetError() const { 
+ return Error; + } + + inline size_t GetPageSize() const { + return Pagesize; + } + + inline int GetLastPage() const { + return Pagenum; + } + + inline ui32 GetRecordSig() const { + return Recordsig; + } + + inline bool IsOpen() const { + return FileManip.IsOpen(); + } + +protected: + int Init(const char* fname, ui32 recsig, ui32* gotrecsig = nullptr, bool direct = false) { + Error = FileManip.Open(fname, direct); + return Error ? Error : Init(TFile(), recsig, gotrecsig); + } + + int Init(const TFile& file, ui32 recsig, ui32* gotrecsig = nullptr) { + if (!file.IsOpen() && !FileManip.IsOpen()) + return MBDB_NOT_INITIALIZED; + if (file.IsOpen() && FileManip.IsOpen()) + return MBDB_ALREADY_INITIALIZED; + if (file.IsOpen()) { + Error = FileManip.Init(file); + if (Error) + return Error; + } + + // TArrayHolder<ui8> buf(new ui8[METASIZE + FS_BLOCK_SIZE]); + // ui8* ptr = (buf.Get() + FS_BLOCK_SIZE - ((ui64)buf.Get() & (FS_BLOCK_SIZE - 1))); + TMappedArray<ui8> buf; + buf.Create(METASIZE); + ui8* ptr = &buf[0]; + TDatMetaPage* meta = (TDatMetaPage*)ptr; + ssize_t size = METASIZE; + ssize_t ret; + while (size && (ret = FileManip.Read(ptr, (unsigned)size)) > 0) { + Y_ASSERT(ret <= size); + size -= ret; + ptr += ret; + } + if (size) { + FileManip.Close(); + return Error = MBDB_BAD_METAPAGE; + } + if (gotrecsig) + *gotrecsig = meta->RecordSig; + return Init(TFile(), meta, recsig); + } + + int Init(TAutoPtr<IInputStream> input, ui32 recsig, ui32* gotrecsig = nullptr) { + if (!input && !FileManip.IsOpen()) + return MBDB_NOT_INITIALIZED; + if (FileManip.IsOpen()) + return MBDB_ALREADY_INITIALIZED; + + Error = FileManip.Open(input); + if (Error) + return Error; + + TArrayHolder<ui8> buf(new ui8[METASIZE]); + ui8* ptr = buf.Get(); + ssize_t size = METASIZE; + ssize_t ret; + while (size && (ret = FileManip.Read(ptr, (unsigned)size)) > 0) { + Y_ASSERT(ret <= size); + size -= ret; + ptr += ret; + } + if (size) { + FileManip.Close(); + return Error = MBDB_BAD_METAPAGE; + } + 
TDatMetaPage* meta = (TDatMetaPage*)buf.Get(); + if (gotrecsig) + *gotrecsig = meta->RecordSig; + return Init(TFile(), meta, recsig); + } + + int Init(const TFile& file, const TDatMetaPage* meta, ui32 recsig) { + if (!file.IsOpen() && !FileManip.IsOpen()) + return MBDB_NOT_INITIALIZED; + if (file.IsOpen() && FileManip.IsOpen()) + return MBDB_ALREADY_INITIALIZED; + if (file.IsOpen()) { + Error = FileManip.Init(file); + if (Error) + return Error; + } + + if (meta->MetaSig != METASIG) + Error = MBDB_BAD_METAPAGE; + else if (meta->RecordSig != recsig) + Error = MBDB_BAD_RECORDSIG; + + if (Error) { + FileManip.Close(); + return Error; + } + + i64 flength = FileManip.GetLength(); + if (flength >= 0) { + i64 fsize = flength; + fsize -= METASIZE; + if (fsize % meta->PageSize) + return Error = MBDB_BAD_FILE_SIZE; + Pagenum = (int)(fsize / meta->PageSize); + } else { + Pagenum = -1; + } + Pagesize = meta->PageSize; + Recordsig = meta->RecordSig; + Error = Eof = 0; + return Error; + } + + int ReadPages(iovec* vec, int nvec, int* pages) { + *pages = 0; + + if (Eof || Error) + return Error; + + ssize_t size = 0, delta = 0, total = 0; + iovec* pvec = vec; + int vsize = nvec; + + while (vsize && (size = Readv(FileManip, pvec, (int)Min(vsize, 16))) > 0) { + total += size; + if (delta) { + size += delta; + pvec->iov_len += delta; + pvec->iov_base = (char*)pvec->iov_base - delta; + delta = 0; + } + while (size) { + if ((size_t)size >= pvec->iov_len) { + size -= pvec->iov_len; + ++pvec; + --vsize; + } else { + delta = size; + pvec->iov_len -= size; + pvec->iov_base = (char*)pvec->iov_base + size; + size = 0; + } + } + } + if (delta) { + pvec->iov_len += delta; + pvec->iov_base = (char*)pvec->iov_base - delta; + } + if (size < 0) + return Error = errno ? 
errno : MBDB_READ_ERROR; + if (total % Pagesize) + return Error = MBDB_BAD_FILE_SIZE; + if (vsize) + Eof = 1; + *pages = total / Pagesize; // it would be better to assign it after the for-loops + for (; total; ++vec, total -= size) + for (size = 0; size < total && (size_t)size < vec->iov_len; size += Pagesize) + if (((TDatPage*)((char*)vec->iov_base + size))->PageSig != PAGESIG) + return Error = MBDB_BAD_PAGESIG; + return Error; + } + + int GotoPage(int page) { + if (Error) + return Error; + Eof = 0; + i64 offset = (i64)page * Pagesize + METASIZE; + if (offset != FileManip.Seek(offset, SEEK_SET)) + Error = MBDB_BAD_FILE_SIZE; + return Error; + } + + int Term() { + return FileManip.Close(); + } + + size_t Pagesize; + int Fd; + int Eof; + int Error; + int Pagenum; //!< number of pages in this file + ui32 Recordsig; +}; + +template <class TBaseReader> +class TMappedInputPageIterator: public TBaseReader { +public: + typedef TBaseReader TReader; + + TMappedInputPageIterator() { + Term(); + } + + ~TMappedInputPageIterator() { + Term(); + } + + TDatPage* Current() { + return CurPage; + } + + inline size_t GetPageSize() const { + return TReader::GetPageSize(); + } + + inline int GetPageNum() const { + return PageNum; + } + + inline int IsEof() const { + return Eof; + } + + inline int IsFrozen() const { + return 0; + } + + TDatPage* Next() { + i64 pos = (i64)(++PageNum) * GetPageSize() + METASIZE; + if (pos < 0 || pos >= (i64)TReader::GetSize()) { + Eof = 1; + return CurPage = nullptr; + } + return CurPage = (TDatPage*)((char*)TReader::GetData() + pos); + } + +protected: + int Init(size_t /*pages*/, int /*pagesOrBytes*/) { + Term(); + Eof = 0; + return 0; + } + + int Term() { + PageNum = -1; + Eof = 1; + CurPage = nullptr; + return 0; + } + + TDatPage* GotoPage(int pageno) { + PageNum = pageno - 1; + Eof = 0; + return Next(); + } + + int PageNum, Eof, Pages, Pagenum; + TDatPage* CurPage; +}; + +using TInputPageFile = TInputPageFileImpl<TInputFileManip>; + +template <class 
TVal, + typename TBaseRecIter = TInputRecordIterator<TVal, TInputPageIterator<TInputPageFile>>> +class TInDatFileImpl: public TBaseRecIter { +public: + typedef TBaseRecIter TRecIter; + typedef typename TRecIter::TPageIter TPageIter; + typedef typename TRecIter::TPageIter::TReader TReader; + using TRecIter::GotoPage; + + int Open(const char* fname, size_t pages = 1, int pagesOrBytes = 1, ui32* gotRecordSig = nullptr, bool direct = false) { + int ret = TReader::Init(fname, TVal::RecordSig, gotRecordSig, direct); + return ret ? ret : Open2(pages, pagesOrBytes); + } + + int Open(const TFile& file, size_t pages = 1, int pagesOrBytes = 1, ui32* gotRecordSig = nullptr) { + int ret = TReader::Init(file, TVal::RecordSig, gotRecordSig); + return ret ? ret : Open2(pages, pagesOrBytes); + } + + int Open(TAutoPtr<IInputStream> input, size_t pages = 1, int pagesOrBytes = 1, ui32* gotRecordSig = nullptr) { + int ret = TReader::Init(input, TVal::RecordSig, gotRecordSig); + return ret ? ret : Open2(pages, pagesOrBytes); + } + + int Open(const TFile& file, const TDatMetaPage* meta, size_t pages = 1, int pagesOrBytes = 1) { + int ret = TReader::Init(file, meta, TVal::RecordSig); + return ret ? ret : Open2(pages, pagesOrBytes); + } + + int Close() { + int ret1 = TRecIter::Term(); + int ret2 = TPageIter::Term(); + int ret3 = TReader::Term(); + return ret1 ? ret1 : ret2 ? ret2 : ret3; + } + + const TVal* GotoLastPage() { + return TReader::GetLastPage() <= 0 ? 
nullptr : TRecIter::GotoPage(TReader::GetLastPage() - 1); + } + +private: + int Open2(size_t pages, int pagesOrBytes) { + int ret = TPageIter::Init(pages, pagesOrBytes); + if (!ret) + ret = TRecIter::Init(); + if (ret) + Close(); + return ret; + } +}; + +template <class TVal> +class TInIndexFile: protected TInDatFileImpl<TVal> { + typedef TInDatFileImpl<TVal> TDatFile; + typedef typename TDatFile::TRecIter TRecIter; + typedef typename TRecIter::TPageIter TPageIter; + typedef typename TExtInfoType<TVal>::TResult TExtInfo; + +public: + using TDatFile::IsOpen; + + TInIndexFile() + : Index0(nullptr) + { + } + + int Open(const char* fname, size_t pages = 2, int pagesOrBytes = 1, ui32* gotRecordSig = nullptr) { + int ret = TDatFile::Open(fname, pages, pagesOrBytes, gotRecordSig); + if (ret) + return ret; + if (!(Index0 = (TDatPage*)malloc(TPageIter::GetPageSize()))) { + TDatFile::Close(); + return MBDB_NO_MEMORY; + } + if (!TExtInfoType<TVal>::Exists && SizeOf((TVal*)nullptr)) + RecsOnPage = (TPageIter::GetPageSize() - sizeof(TDatPage)) / DatCeil(SizeOf((TVal*)nullptr)); + TDatFile::Next(); + memcpy(Index0, TPageIter::Current(), TPageIter::GetPageSize()); + return 0; + } + + int Close() { + free(Index0); + Index0 = nullptr; + return TDatFile::Close(); + } + + inline int GetError() const { + return TDatFile::GetError(); + } + + int FindKey(const TVal* akey, const TExtInfo* extInfo = nullptr) { + assert(IsOpen()); + if (TExtInfoType<TVal>::Exists || !SizeOf((TVal*)nullptr)) + return FindVszKey(akey, extInfo); + int num = FindKeyOnPage(Index0, akey); + TDatPage* page = TPageIter::GotoPage(num + 1); + if (!page) + return 0; + num = FindKeyOnPage(page, akey); + num += (TPageIter::GetPageNum() - 1) * RecsOnPage; + return num; + } + + int FindVszKey(const TVal* akey, const TExtInfo* extInfo = NULL) { + int num = FindVszKeyOnPage(Index0, akey, extInfo); + int num_add = 0; + for (int p = 0; p < num; p++) { + TDatPage* page = TPageIter::GotoPage(p + 1); + if (!page) + return 0; + 
num_add += page->RecNum; + } + TDatPage* page = TPageIter::GotoPage(num + 1); + if (!page) + return 0; + num = FindVszKeyOnPage(page, akey, extInfo); + num += num_add; + return num; + } + +protected: + int FindKeyOnPage(TDatPage* page, const TVal* key) { + int left = 0; + int right = page->RecNum - 1; + int recsize = DatCeil(SizeOf((TVal*)nullptr)); + while (left < right) { + int middle = (left + right) >> 1; + if (*((TVal*)((char*)page + sizeof(TDatPage) + middle * recsize)) < *key) + left = middle + 1; + else + right = middle; + } + //borders check (left and right) + return (left == 0 || *((TVal*)((char*)page + sizeof(TDatPage) + left * recsize)) < *key) ? left : left - 1; + } + + // will deserialize rawExtinfoA to extInfoA only if necessery + inline bool KeyLess_(const TVal* a, const TVal* b, + TExtInfo* extInfoA, const TExtInfo* extInfoB, + const ui8* rawExtInfoA, size_t rawLen) { + if (*a < *b) { + return true; + } else if (!extInfoB || *b < *a) { + return false; + } else { + // *a == *b && extInfoB + Y_PROTOBUF_SUPPRESS_NODISCARD extInfoA->ParseFromArray(rawExtInfoA, rawLen); + return (*extInfoA < *extInfoB); + } + } + + int FindVszKeyOnPage(TDatPage* page, const TVal* key, const TExtInfo* extInfo) { + TVal* cur = (TVal*)((char*)page + sizeof(TDatPage)); + ui32 recnum = page->RecNum; + if (!TExtInfoType<TVal>::Exists) { + for (; recnum > 0 && *cur < *key; --recnum) + cur = (TVal*)((char*)cur + DatCeil(SizeOf(cur))); + } else { + size_t ll; + size_t l; + size_t sz = NMicroBDB::SizeOfExt(cur, &ll, &l); + TExtInfo ei; + for (; recnum > 0 && KeyLess_(cur, key, &ei, extInfo, (ui8*)cur + sz + ll, l); --recnum) { + cur = (TVal*)((ui8*)cur + DatCeil(sz + ll + l)); + sz = NMicroBDB::SizeOfExt(cur, &ll, &l); + } + } + + int idx = page->RecNum - recnum - 1; + return (idx >= 0) ? 
idx : 0; + } + + TDatPage* Index0; + int RecsOnPage; +}; + +template <class TVal, class TKey, class TPageIterator = TInputPageIterator<TInputPageFile>> +class TKeyFileMixin: public TInDatFileImpl<TVal, TInputRecordIterator<TVal, TPageIterator>> { +protected: + TInIndexFile<TKey> KeyFile; +}; + +template <class TVal, class TKey, class TBase = TKeyFileMixin<TVal, TKey>> +class TDirectInDatFile: public TBase { + typedef TBase TDatFile; + typedef typename TDatFile::TRecIter TRecIter; + typedef typename TDatFile::TPageIter TPageIter; + +public: + void Open(const char* path, size_t pages = 1, size_t keypages = 1, int pagesOrBytes = 1) { + int ret; + ui32 gotRecordSig = 0; + + ret = TDatFile::Open(path, pages, pagesOrBytes, &gotRecordSig); + if (ret) { + ythrow yexception() << ErrorMessage(ret, "Failed to open input file", path, TVal::RecordSig, gotRecordSig); + } + char KeyName[PATH_MAX + 1]; + if (DatNameToIdx(KeyName, path)) { + ythrow yexception() << ErrorMessage(MBDB_BAD_FILENAME, "Failed to open input file", path); + } + gotRecordSig = 0; + ret = KeyFile.Open(KeyName, keypages, 1, &gotRecordSig); + if (ret) { + ythrow yexception() << ErrorMessage(ret, "Failed to open input keyfile", KeyName, TKey::RecordSig, gotRecordSig); + } + } + + void Close() { + int ret; + + if (TDatFile::IsOpen() && (ret = TDatFile::GetError())) + if (!std::uncaught_exception()) + ythrow yexception() << ErrorMessage(ret, "Error before closing input file"); + if ((ret = TDatFile::Close())) + if (!std::uncaught_exception()) + ythrow yexception() << ErrorMessage(ret, "Error while closing input file"); + + if (KeyFile.IsOpen() && (ret = KeyFile.GetError())) + if (!std::uncaught_exception()) + ythrow yexception() << ErrorMessage(ret, "Error before closing input keyfile"); + if ((ret = KeyFile.Close())) + if (!std::uncaught_exception()) + ythrow yexception() << ErrorMessage(ret, "Error while closing input keyfile"); + } + + const TVal* FindRecord(const TKey* key, const typename 
TExtInfoType<TKey>::TResult* extInfo = nullptr) { + int page = KeyFile.FindKey(key, extInfo); + const TVal* val = TRecIter::GotoPage(page); + if (!TExtInfoType<TVal>::Exists || !extInfo) { + TKey k; + while (val) { + TMakeExtKey<TVal, TKey>::Make(&k, nullptr, val, nullptr); + if (!(k < *key)) + break; + val = TRecIter::Next(); + } + } else { + typename TExtInfoType<TVal>::TResult valExt; + TKey k; + typename TExtInfoType<TKey>::TResult kExt; + while (val) { + TRecIter::GetExtInfo(&valExt); + TMakeExtKey<TVal, TKey>::Make(&k, &kExt, val, &valExt); + if (*key < k || !(k < *key) && !(kExt < *extInfo)) // k > *key || k == *key && kExt >= *extInfo + break; + val = TRecIter::Next(); + } + } + return val; + }; + + int FindPagesNo(const TKey* key, const typename TExtInfoType<TVal>::TResult* extInfo = NULL) { + return KeyFile.FindKey(key, extInfo); + } + +protected: + using TBase::KeyFile; +}; diff --git a/library/cpp/microbdb/microbdb.cpp b/library/cpp/microbdb/microbdb.cpp new file mode 100644 index 0000000000..c10dbdf126 --- /dev/null +++ b/library/cpp/microbdb/microbdb.cpp @@ -0,0 +1 @@ +#include "microbdb.h" diff --git a/library/cpp/microbdb/microbdb.h b/library/cpp/microbdb/microbdb.h new file mode 100644 index 0000000000..7521887337 --- /dev/null +++ b/library/cpp/microbdb/microbdb.h @@ -0,0 +1,54 @@ +#pragma once + +#include <util/folder/dirut.h> + +#if defined(_MSC_VER) +#pragma warning(push) +#pragma warning(disable : 4706) /*assignment within conditional expression*/ +#pragma warning(disable : 4267) /*conversion from 'size_t' to 'type', possible loss of data*/ +#endif + +#include "align.h" +#include "extinfo.h" +#include "header.h" +#include "reader.h" +#include "heap.h" +#include "file.h" +#include "sorter.h" +#include "input.h" +#include "output.h" +#include "sorterdef.h" + +inline int MakeSorterTempl(char path[/*FILENAME_MAX*/], const char* prefix) { + int ret = MakeTempDir(path, prefix); + if (!ret && strlcat(path, "%06d", FILENAME_MAX) > FILENAME_MAX - 100) 
+ ret = EINVAL; + if (ret) + path[0] = 0; + return ret; +} + +inline int GetMeta(TFile& file, TDatMetaPage* meta) { + ui8 buf[METASIZE], *ptr = buf; + ssize_t size = sizeof(buf), ret; + while (size && (ret = file.Read(ptr, size)) > 0) { + size -= ret; + ptr += ret; + } + if (size) + return MBDB_BAD_FILE_SIZE; + ptr = buf; // gcc 4.4 warning fix + *meta = *(TDatMetaPage*)ptr; + return (meta->MetaSig == METASIG) ? 0 : MBDB_BAD_METAPAGE; +} + +template <class TRec> +inline bool IsDatFile(const char* fname) { + TDatMetaPage meta; + TFile f(fname, RdOnly); + return !GetMeta(f, &meta) && meta.RecordSig == TRec::RecordSig; +} + +#if defined(_MSC_VER) +#pragma warning(pop) +#endif diff --git a/library/cpp/microbdb/noextinfo.proto b/library/cpp/microbdb/noextinfo.proto new file mode 100644 index 0000000000..6a78882e07 --- /dev/null +++ b/library/cpp/microbdb/noextinfo.proto @@ -0,0 +1,4 @@ + +message TNoExtInfo { +} + diff --git a/library/cpp/microbdb/output.h b/library/cpp/microbdb/output.h new file mode 100644 index 0000000000..d0ecab2108 --- /dev/null +++ b/library/cpp/microbdb/output.h @@ -0,0 +1,1049 @@ +#pragma once + +#include "header.h" +#include "file.h" + +#include <util/generic/buffer.h> +#include <util/memory/tempbuf.h> + +#include <sys/uio.h> + +template <class TFileManip> +inline ssize_t Writev(TFileManip& fileManip, const struct iovec* iov, int iovcnt) { + ssize_t written_count = 0; + for (int n = 0; n < iovcnt; n++) { + ssize_t last_write = fileManip.Write(iov[n].iov_base, iov[n].iov_len); + if (last_write < 0) + return -1; + written_count += last_write; + } + return written_count; +} + +//********************************************************************* +struct TFakeIndexer { + inline void NextPage(TDatPage*) noexcept { + } +}; + +struct TCallbackIndexer { + typedef void (*TCallback)(void* This, const TDatPage* page); + + TCallbackIndexer() { + Callback = nullptr; + } + + void SetCallback(void* t, TCallback c) { + This = t; + Callback = c; + } + + void 
NextPage(TDatPage* dat) { + Callback(This, dat); + } + + TCallback Callback; + void* This; +}; + +template <class TVal, typename TBasePageIter, typename TBaseIndexer = TFakeIndexer, typename TCompressor = TFakeCompression> +class TOutputRecordIterator; + +template <class TVal, typename TBasePageIter, typename TBaseIndexer> +class TOutputRecordIterator<TVal, TBasePageIter, TBaseIndexer, TFakeCompression> + : public TBasePageIter, public TBaseIndexer { +public: + enum EOffset { + WrongOffset = size_t(-1) + }; + + typedef TBasePageIter TPageIter; + typedef TBaseIndexer TIndexer; + + TOutputRecordIterator() { + Clear(); + } + + ~TOutputRecordIterator() { + Term(); + } + + inline const TVal* Current() const { + return Rec; + } + + const TVal* Push(const TVal* v, const typename TExtInfoType<TVal>::TResult* extInfo = nullptr) { + NMicroBDB::AssertValid(v); + size_t len = SizeOf(v); + if (!TExtInfoType<TVal>::Exists) + return (Reserve(len)) ? (TVal*)memcpy(Rec, v, len) : nullptr; + else if (extInfo) { + size_t extSize = extInfo->ByteSize(); + size_t extLenSize = len_long((i64)extSize); + if (!Reserve(len + extLenSize + extSize)) + return nullptr; + memcpy(Rec, v, len); + out_long((i64)extSize, (char*)Rec + len); + extInfo->SerializeWithCachedSizesToArray((ui8*)Rec + len + extLenSize); + return Rec; + } else { + size_t extLenSize = len_long((i64)0); + if (!Reserve(len + extLenSize)) + return nullptr; + memcpy(Rec, v, len); + out_long((i64)0, (char*)Rec + len); + return Rec; + } + } + + const TVal* Push(const TVal* v, const ui8* extInfoRaw, size_t extLen) { + NMicroBDB::AssertValid(v); + size_t sz = SizeOf(v); + if (!Reserve(sz + extLen)) + return nullptr; + memcpy(Rec, v, sz); + memcpy((ui8*)Rec + sz, extInfoRaw, extLen); + return Rec; + } + + // use values stored in microbdb readers/writers internal buffer only. 
+ // method expects serialized extInfo after this record + const TVal* PushWithExtInfo(const TVal* v) { + NMicroBDB::AssertValid(v); + size_t extSize; + size_t extLenSize; + size_t sz = NMicroBDB::SizeOfExt(v, &extLenSize, &extSize); + sz += extLenSize + extSize; + if (!Reserve(sz)) + return nullptr; + memcpy(Rec, v, sz); + return Rec; + } + + TVal* Reserve(size_t len) { + if (CurLen + DatCeil(len) > TPageIter::GetPageSize()) { + if (sizeof(TDatPage) + DatCeil(len) > TPageIter::GetPageSize()) + return Rec = nullptr; + if (TPageIter::Current() && RecNum) { + TPageIter::Current()->RecNum = RecNum; + TPageIter::Current()->Format = MBDB_FORMAT_RAW; + memset((char*)TPageIter::Current() + CurLen, 0, TPageIter::GetPageSize() - CurLen); + TIndexer::NextPage(TPageIter::Current()); + RecNum = 0; + } + if (!TPageIter::Next()) { + CurLen = TPageIter::GetPageSize(); + return Rec = nullptr; + } + CurLen = sizeof(TDatPage); + } + LenForOffset = CurLen; + Rec = (TVal*)((char*)TPageIter::Current() + CurLen); + DatSet(Rec, len); + + CurLen += DatCeil(len); + + ++RecNum; + return Rec; + } + + void Flush() { + TPageIter::Current()->RecNum = RecNum; + TPageIter::Current()->Format = MBDB_FORMAT_RAW; + } + + size_t Offset() const { + return Rec ? 
TPageIter::Offset() + LenForOffset : WrongOffset; + } + + void ResetDat() { + CurLen = (char*)Rec - (char*)TPageIter::Current(); + size_t len; + if (!TExtInfoType<TVal>::Exists) { + len = SizeOf(Rec); + } else { + size_t ll; + size_t l; + len = NMicroBDB::SizeOfExt(Rec, &ll, &l); + len += ll + l; + } + CurLen += DatCeil(len); + } + +protected: + void Clear() { + Rec = nullptr; + RecNum = 0; + CurLen = 0; + LenForOffset = 0; + } + + int Init() { + Clear(); + CurLen = TPageIter::GetPageSize(); + return 0; + } + + int Term() { + if (TPageIter::Current()) { + TPageIter::Current()->RecNum = RecNum; + TPageIter::Current()->Format = MBDB_FORMAT_RAW; + memset((char*)TPageIter::Current() + CurLen, 0, TPageIter::GetPageSize() - CurLen); + RecNum = 0; + } + int ret = !TPageIter::Current() && RecNum; + Clear(); + return ret; + } + + int GotoPage(int pageno) { + if (TPageIter::Current()) { + TPageIter::Current()->RecNum = RecNum; + TPageIter::Current()->Format = MBDB_FORMAT_RAW; + memset((char*)TPageIter::Current() + CurLen, 0, TPageIter::GetPageSize() - CurLen); + } + int ret = TPageIter::GotoPage(pageno); + if (!ret) { + RecNum = 0; + CurLen = sizeof(TDatPage); + } + return ret; + } + + TVal* Rec; + int RecNum; + size_t CurLen; + size_t LenForOffset; +}; + +template <class TVal, typename TBasePageIter, typename TBaseIndexer, typename TAlgorithm> +class TOutputRecordIterator + : public TBasePageIter, + public TBaseIndexer, + private TAlgorithm { + class TPageBuffer { + public: + void Init(size_t page) { + Pos = 0; + RecNum = 0; + Size = Min(page / 2, size_t(64 << 10)); + Data.Reset(new ui8[Size]); + } + + void Clear() { + Pos = 0; + RecNum = 0; + } + + inline bool Empty() const { + return RecNum == 0; + } + + public: + size_t Size; + size_t Pos; + int RecNum; + TArrayHolder<ui8> Data; + }; + +public: + typedef TBasePageIter TPageIter; + typedef TBaseIndexer TIndexer; + + TOutputRecordIterator() + : Rec(nullptr) + , RecNum(0) + { + } + + ~TOutputRecordIterator() { + Term(); + } 
+ + const TVal* Current() const { + return Rec; + } + + const TVal* Push(const TVal* v, const typename TExtInfoType<TVal>::TResult* extInfo = nullptr) { + NMicroBDB::AssertValid(v); + size_t len = SizeOf(v); + if (!TExtInfoType<TVal>::Exists) + return (Reserve(len)) ? (TVal*)memcpy((TVal*)Rec, v, len) : nullptr; + else if (extInfo) { + size_t extSize = extInfo->ByteSize(); + size_t extLenSize = len_long((i64)extSize); + if (!Reserve(len + extLenSize + extSize)) + return nullptr; + memcpy(Rec, v, len); + out_long((i64)extSize, (char*)Rec + len); + extInfo->SerializeWithCachedSizesToArray((ui8*)Rec + len + extLenSize); + return Rec; + } else { + size_t extLenSize = len_long((i64)0); + if (!Reserve(len + extLenSize)) + return nullptr; + memcpy(Rec, v, len); + out_long((i64)0, (char*)Rec + len); + return Rec; + } + } + + const TVal* Push(const TVal* v, const ui8* extInfoRaw, size_t extLen) { + NMicroBDB::AssertValid(v); + size_t sz = SizeOf(v); + if (!Reserve(sz + extLen)) + return NULL; + memcpy(Rec, v, sz); + memcpy((ui8*)Rec + sz, extInfoRaw, extLen); + return Rec; + } + + // use values stored in microbdb readers/writers internal buffer only. 
+ // method expects serialized extInfo after this record + const TVal* PushWithExtInfo(const TVal* v) { + NMicroBDB::AssertValid(v); + size_t extSize; + size_t extLenSize; + size_t sz = NMicroBDB::SizeOfExt(v, &extLenSize, &extSize); + sz += extLenSize + extSize; + if (!Reserve(sz)) + return nullptr; + memcpy(Rec, v, sz); + return Rec; + } + + TVal* Reserve(const size_t len) { + const size_t aligned = DatCeil(len); + + if (!TPageIter::Current()) { // Allocate fist page + if (!TPageIter::Next()) { + CurLen = TPageIter::GetPageSize(); + return Rec = nullptr; + } + CurLen = sizeof(TDatPage) + sizeof(TCompressedPage); + } + + if (Buffer.Pos + aligned > Buffer.Size) { + if (Buffer.Pos == 0) + return Rec = nullptr; + if (FlushBuffer()) + return Rec = nullptr; + if (Buffer.Pos + aligned + sizeof(TDatPage) + sizeof(TCompressedPage) > Buffer.Size) + return Rec = nullptr; + } + + Rec = (TVal*)((char*)Buffer.Data.Get() + Buffer.Pos); + DatSet(Rec, len); // len is correct because DatSet set align tail to zero + + Buffer.RecNum++; + Buffer.Pos += aligned; + ++RecNum; + return Rec; + } + + void Flush() { + if (!Buffer.Empty()) { + FlushBuffer(); + TPageIter::Current()->RecNum = RecNum; + TPageIter::Current()->Format = MBDB_FORMAT_COMPRESSED; + } + } + + size_t Offset() const { + // According to vadya@ there is no evil to return 0 all the time + return 0; + } + + void ResetDat() { + Buffer.Pos = (char*)Rec - (char*)Buffer.Data.Get(); + size_t len = SizeOf(Rec); + Buffer.Pos += DatCeil(len); + } + +protected: + void Clear() { + RecNum = 0; + Rec = nullptr; + Count = 0; + CurLen = sizeof(TDatPage) + sizeof(TCompressedPage); + Buffer.Clear(); + } + + int Init() { + Clear(); + Buffer.Init(TPageIter::GetPageSize()); + TAlgorithm::Init(); + return 0; + } + + int Term() { + if (TPageIter::Current()) + Commit(); + int ret = !TPageIter::Current() && RecNum; + Clear(); + TAlgorithm::Term(); + return ret; + } + + int GotoPage(int pageno) { + if (TPageIter::Current()) + Commit(); + int ret = 
TPageIter::GotoPage(pageno); + if (!ret) + Reset(); + return ret; + } + +private: + void Commit() { + Flush(); + TPageIter::Current()->RecNum = RecNum; + TPageIter::Current()->Format = MBDB_FORMAT_COMPRESSED; + SetCompressedPageHeader(); + + memset((char*)TPageIter::Current() + CurLen, 0, TPageIter::GetPageSize() - CurLen); + RecNum = 0; + Count = 0; + } + + inline void SetCompressedPageHeader() { + TCompressedPage* const hdr = (TCompressedPage*)((ui8*)TPageIter::Current() + sizeof(TDatPage)); + + hdr->BlockCount = Count; + hdr->Algorithm = TAlgorithm::Code; + hdr->Version = 0; + hdr->Reserved = 0; + } + + inline void Reset() { + RecNum = 0; + CurLen = sizeof(TDatPage) + sizeof(TCompressedPage); + Count = 0; + Buffer.Clear(); + } + + int FlushBuffer() { + TArrayHolder<ui8> data; + const ui8* const buf = Buffer.Data.Get(); + size_t first = 0; + + if (!TExtInfoType<TVal>::Exists) + first = DatCeil(SizeOf((TVal*)buf)); + else { + size_t ll; + size_t l; + first = NMicroBDB::SizeOfExt((const TVal*)buf, &ll, &l); + first = DatCeil(first + ll + l); + } + + size_t total = sizeof(NMicroBDB::TCompressedHeader) + first + ((Buffer.RecNum == 1) ? 0 : TAlgorithm::CompressBound(Buffer.Pos - first)); + size_t real = total; + + { + ui8* p = nullptr; + NMicroBDB::TCompressedHeader* hdr = nullptr; + + // 1. Choose data destination (temporary buffer or dat-page) + if (CurLen + total > TPageIter::GetPageSize()) { + data.Reset(new ui8[total]); + + hdr = (NMicroBDB::TCompressedHeader*)data.Get(); + p = data.Get() + sizeof(NMicroBDB::TCompressedHeader); + } else { + p = (ui8*)TPageIter::Current() + CurLen; + hdr = (NMicroBDB::TCompressedHeader*)p; + p += sizeof(NMicroBDB::TCompressedHeader); + } + + // 2. 
Compress data + + // Fill header and first record + hdr->Original = Buffer.Pos; + hdr->Compressed = 0; + hdr->Count = Buffer.RecNum; + hdr->Reserved = 0; + memcpy(p, Buffer.Data.Get(), first); + // Fill compressed part + if (Buffer.RecNum > 1) { + size_t size = TAlgorithm::CompressBound(Buffer.Pos - first); + + p += first; + TAlgorithm::Compress(p, size, buf + first, Buffer.Pos - first); + + hdr->Compressed = size; + + real = sizeof(NMicroBDB::TCompressedHeader) + first + size; + } + } + + Y_ASSERT(sizeof(TDatPage) + sizeof(TCompressedPage) + real <= TPageIter::GetPageSize()); + + // 3. Check page capacity + + if (CurLen + real > TPageIter::GetPageSize()) { + Y_ASSERT(data.Get() != nullptr); + + if (TPageIter::Current() && RecNum) { + RecNum = RecNum - Buffer.RecNum; + TPageIter::Current()->RecNum = RecNum; + TPageIter::Current()->Format = MBDB_FORMAT_COMPRESSED; + SetCompressedPageHeader(); + memset((char*)TPageIter::Current() + CurLen, 0, TPageIter::GetPageSize() - CurLen); + TIndexer::NextPage(TPageIter::Current()); + RecNum = Buffer.RecNum; + Count = 0; + } + if (!TPageIter::Next()) { + CurLen = TPageIter::GetPageSize(); + return MBDB_NO_MEMORY; + } + CurLen = sizeof(TDatPage) + sizeof(TCompressedPage); + } + + // 4. Flush data and reset buffer state + + if (data.Get()) + memcpy((ui8*)TPageIter::Current() + CurLen, data.Get(), real); + CurLen += real; + ++Count; + Buffer.Clear(); + return 0; + } + +private: + size_t CurLen; + TPageBuffer Buffer; + TVal* Rec; + ui32 Count; //! 
< count of compressed blocks on page +public: + int RecNum; +}; + +template <typename TBaseWriter> +class TOutputPageIterator: public TBaseWriter { +public: + typedef TBaseWriter TWriter; + + TOutputPageIterator() + : Buf(nullptr) + { + Clear(); + } + + ~TOutputPageIterator() { + Term(); + } + + TDatPage* Current() { + return CurPage; + } + + size_t Offset() const { + //Cout << "PS = " << TWriter::GetPageSize() << "; PN = " << PageNum << "; MS = " << METASIZE << Endl; + return TWriter::GetPageSize() * PageNum + METASIZE; + } + + int Freeze() { + return (Frozen = (PageNum == -1) ? 0 : (int)PageNum); + } + + void Unfreeze() { + Frozen = -1; + } + + inline int IsFrozen() const { + return Frozen + 1; + } + + inline size_t GetPageSize() const { + return TWriter::GetPageSize(); + } + + inline int GetPageNum() const { + return (int)PageNum; + } + + TDatPage* Next() { + if (PageNum >= Maxpage && WriteBuf()) + return CurPage = nullptr; + CurPage = (TDatPage*)(Buf + ((++PageNum) % Bufpages) * GetPageSize()); + memset(CurPage, 0, sizeof(TDatPage)); + return CurPage; + } + +protected: + int Init(size_t pages, int pagesOrBytes) { + Term(); + if (pagesOrBytes) + Bufpages = pages; + else + Bufpages = pages / GetPageSize(); + Bufpages = Max<size_t>(1, Bufpages); + Maxpage = Bufpages - 1; + // if (!(Buf = (char*)malloc(Bufpages * GetPageSize()))) + // return ENOMEM; + ABuf.Alloc(Bufpages * GetPageSize()); + Buf = ABuf.Begin(); + if (TWriter::Memo) + Freeze(); + return 0; + } + + int Term() { + Unfreeze(); + int ret = (PageNum < 0) ? 
0 : WriteBuf(); + Clear(); + return ret; + } + + int GotoPage(int pageno) { + int ret = EAGAIN; + if (IsFrozen() || PageNum >= 0 && ((ret = WriteBuf())) || ((ret = TWriter::GotoPage(pageno)))) + return ret; + PageNum = pageno; + Maxpage = Bufpages - 1 + pageno; + CurPage = (TDatPage*)(Buf + (PageNum % Bufpages) * GetPageSize()); + memset(CurPage, 0, sizeof(TDatPage)); + return 0; + } + + void Clear() { + ABuf.Dealloc(); + Buf = nullptr; + Maxpage = PageNum = Frozen = -1; + Bufpages = 0; + CurPage = nullptr; + } + + int WriteBuf() { + int nvec; + iovec vec[2]; + ssize_t minpage = Maxpage - Bufpages + 1; + ssize_t maxpage = Frozen == -1 ? PageNum : Frozen - 1; + if (maxpage < minpage) + return EAGAIN; + minpage %= Bufpages; + maxpage %= Bufpages; + if (maxpage < minpage) { + vec[0].iov_base = Buf + GetPageSize() * minpage; + vec[0].iov_len = GetPageSize() * (Bufpages - minpage); + vec[1].iov_base = Buf; + vec[1].iov_len = GetPageSize() * (maxpage + 1); + nvec = 2; + } else { + vec[0].iov_base = Buf + GetPageSize() * minpage; + vec[0].iov_len = GetPageSize() * (maxpage - minpage + 1); + nvec = 1; + } + if (TWriter::WritePages(vec, nvec)) + return EIO; + Maxpage += (maxpage < minpage) ? 
(Bufpages - minpage + maxpage + 1) : (maxpage - minpage + 1); + return 0; + } + + ssize_t Maxpage; + ssize_t Bufpages; + ssize_t PageNum; + int Frozen; + TDatPage* CurPage; + char* Buf; + TMappedAllocation ABuf; +}; + +template <class TFileManip> +class TOutputPageFileImpl: private TNonCopyable { +public: + TOutputPageFileImpl() + : Pagesize(0) + , Eof(1) + , Error(0) + , Memo(0) + , Recordsig(0) + { + } + + ~TOutputPageFileImpl() { + Term(); + } + + inline int IsEof() const { + return Eof; + } + + inline int GetError() const { + return Error; + } + + inline bool IsOpen() const { + return FileManip.IsOpen(); + } + + inline size_t GetPageSize() const { + return Pagesize; + } + + inline ui32 GetRecordSig() const { + return Recordsig; + } + + int Init(const char* fname, size_t pagesize, ui32 recsig, bool direct = false) { + Memo = 0; + if (FileManip.IsOpen()) + return MBDB_ALREADY_INITIALIZED; + + if (!fname) { + Eof = Error = 0; + Pagesize = pagesize; + Recordsig = recsig; + Memo = 1; + return 0; + } + + Error = FileManip.Open(fname, WrOnly | CreateAlways | ARW | AWOther | (direct ? 
DirectAligned : EOpenMode())); + if (Error) + return Error; + Error = Init(TFile(), pagesize, recsig); + if (Error) { + FileManip.Close(); + unlink(fname); + } + return Error; + } + + int Init(TAutoPtr<IOutputStream> output, size_t pagesize, ui32 recsig) { + Memo = 0; + if (FileManip.IsOpen()) { + return MBDB_ALREADY_INITIALIZED; + } + + if (!output) { + Eof = Error = 0; + Pagesize = pagesize; + Recordsig = recsig; + Memo = 1; + return 0; + } + + Error = FileManip.Open(output); + if (Error) + return Error; + Error = Init(TFile(), pagesize, recsig); + if (Error) { + FileManip.Close(); + } + return Error; + } + + int Init(const TFile& file, size_t pagesize, ui32 recsig) { + Memo = 0; + if (!file.IsOpen() && !FileManip.IsOpen()) + return MBDB_NOT_INITIALIZED; + if (file.IsOpen() && FileManip.IsOpen()) + return MBDB_ALREADY_INITIALIZED; + if (file.IsOpen()) { + Error = FileManip.Init(file); + if (Error) + return Error; + } + + Eof = 1; + TTempBuf buf(METASIZE + FS_BLOCK_SIZE); + const char* ptr = (buf.Data() + FS_BLOCK_SIZE - ((ui64)buf.Data() & (FS_BLOCK_SIZE - 1))); + TDatMetaPage* meta = (TDatMetaPage*)ptr; + + memset(buf.Data(), 0, buf.Size()); + meta->MetaSig = METASIG; + meta->PageSize = Pagesize = pagesize; + meta->RecordSig = Recordsig = recsig; + + ssize_t size = METASIZE, ret = 0; + while (size && (ret = FileManip.Write(ptr, (unsigned)size)) > 0) { + size -= ret; + ptr += ret; + } + if (size || ret <= 0) { + Term(); + return Error = errno ? 
errno : MBDB_WRITE_ERROR; + } + + Error = Eof = 0; + return Error; + } + +protected: + int WritePages(iovec* vec, int nvec) { + if (Error || Memo) + return Error; + + ssize_t size, delta; + iovec* pvec; + int vsize; + + for (vsize = 0, pvec = vec; vsize < nvec; vsize++, pvec++) + for (size = 0; (size_t)size < pvec->iov_len; size += Pagesize) + ((TDatPage*)((char*)pvec->iov_base + size))->PageSig = PAGESIG; + + delta = size = 0; + pvec = vec; + vsize = nvec; + while (vsize && (size = Writev(FileManip, pvec, (int)Min(vsize, 16))) > 0) { + if (delta) { + size += delta; + pvec->iov_len += delta; + pvec->iov_base = (char*)pvec->iov_base - delta; + delta = 0; + } + while (size) { + if ((size_t)size >= pvec->iov_len) { + size -= pvec->iov_len; + ++pvec; + --vsize; + } else { + delta = size; + pvec->iov_len -= size; + pvec->iov_base = (char*)pvec->iov_base + size; + size = 0; + } + } + } + if (delta) { + pvec->iov_len += delta; + pvec->iov_base = (char*)pvec->iov_base - delta; + } + return Error = (!size && !vsize) ? 0 : errno ? 
errno : MBDB_WRITE_ERROR; + } + + i64 Tell() { + return FileManip.RealSeek(0, SEEK_CUR); + } + + int GotoPage(int pageno) { + if (Error || Memo) + return Error; + Eof = 0; + i64 offset = (i64)pageno * Pagesize + METASIZE; + if (offset != FileManip.Seek(offset, SEEK_SET)) + Error = MBDB_BAD_FILE_SIZE; + return Error; + } + + int Term() { + int ret = FileManip.Close(); + Eof = 1; + Memo = 0; + if (!Error) + Error = ret; + return Error; + } + + size_t Pagesize; + int Eof; + int Error; + int Memo; + ui32 Recordsig; + +private: + TFileManip FileManip; +}; + +using TOutputPageFile = TOutputPageFileImpl<TOutputFileManip>; + +template <class TVal, + typename TBaseRecIter = TOutputRecordIterator<TVal, TOutputPageIterator<TOutputPageFile>>> +class TOutDatFileImpl: public TBaseRecIter { +public: + typedef TBaseRecIter TRecIter; + typedef typename TRecIter::TPageIter TPageIter; + typedef typename TRecIter::TPageIter::TWriter TWriter; + + int Open(const char* fname, size_t pagesize, size_t pages = 1, int pagesOrBytes = 1, bool direct = false) { + int ret = TWriter::Init(fname, pagesize, TVal::RecordSig, direct); + return ret ? ret : Open2(pages, pagesOrBytes); + } + + int Open(const TFile& file, size_t pagesize, size_t pages = 1, int pagesOrBytes = 1) { + int ret = TWriter::Init(file, pagesize, TVal::RecordSig); + return ret ? ret : Open2(pages, pagesOrBytes); + } + + int Open(TAutoPtr<IOutputStream> output, size_t pagesize, size_t pages = 1, int pagesOrBytes = 1) { + int ret = TWriter::Init(output, pagesize, TVal::RecordSig); + return ret ? ret : Open2(pages, pagesOrBytes); + } + + int Close() { + int ret1 = TRecIter::Term(); + int ret2 = TPageIter::Term(); + int ret3 = TWriter::Term(); + return ret1 ? ret1 : ret2 ? 
ret2 : ret3; + } + +private: + int Open2(size_t pages, int pagesOrBytes) { + int ret = TPageIter::Init(pages, pagesOrBytes); + if (!ret) + ret = TRecIter::Init(); + if (ret) + Close(); + return ret; + } +}; + +template <class TVal> +class TOutIndexFile: public TOutDatFileImpl< + TVal, + TOutputRecordIterator<TVal, TOutputPageIterator<TOutputPageFile>, TCallbackIndexer, TFakeCompression>> { + typedef TOutDatFileImpl< + TVal, + TOutputRecordIterator<TVal, TOutputPageIterator<TOutputPageFile>, TCallbackIndexer, TFakeCompression>> + TDatFile; + typedef TOutIndexFile<TVal> TMyType; + typedef typename TDatFile::TRecIter TRecIter; + typedef typename TRecIter::TPageIter TPageIter; + typedef typename TRecIter::TIndexer TIndexer; + +public: + TOutIndexFile() { + TIndexer::SetCallback(this, DispatchCallback); + } + + int Open(const char* fname, size_t pagesize, size_t pages, int pagesOrBytes = 1) { + int ret = TDatFile::Open(fname, pagesize, pages, pagesOrBytes); + if (ret) + return ret; + if ((ret = TRecIter::GotoPage(1))) { + TDatFile::Close(); + return ret; + } + Index0.Clear(); + return ret; + } + + int Close() { + TPageIter::Unfreeze(); + if (TRecIter::RecNum) { + TRecIter::Flush(); + NextPage(TPageIter::Current()); + } + int ret = 0; + if (Index0.Size() && !(ret = TRecIter::GotoPage(0))) { + const char* ptr = Index0.Begin(); + size_t recSize; + while (ptr < Index0.End()) { + Y_ASSERT((size_t)(Index0.End() - ptr) >= sizeof(size_t)); + memcpy(&recSize, ptr, sizeof(size_t)); + ptr += sizeof(size_t); + Y_ASSERT((size_t)(Index0.End() - ptr) >= recSize); + ui8* buf = (ui8*)TRecIter::Reserve(recSize); + if (!buf) { + ret = MBDB_PAGE_OVERFLOW; + break; + } + memcpy(buf, ptr, recSize); + TRecIter::ResetDat(); + ptr += recSize; + } + Index0.Clear(); + ret = (TPageIter::GetPageNum() != 0) ? MBDB_PAGE_OVERFLOW : TPageIter::GetError(); + } + int ret1 = TDatFile::Close(); + return ret ? 
ret : ret1; + } + +protected: + TBuffer Index0; + + void NextPage(const TDatPage* page) { + const TVal* first = (const TVal*)NMicroBDB::GetFirstRecord(page); + size_t sz; + if (!TExtInfoType<TVal>::Exists) { + sz = SizeOf(first); + } else { + size_t ll; + size_t l; + sz = NMicroBDB::SizeOfExt(first, &ll, &l); + sz += ll + l; + } + Index0.Append((const char*)&sz, sizeof(size_t)); + Index0.Append((const char*)first, sz); + } + + static void DispatchCallback(void* This, const TDatPage* page) { + ((TMyType*)This)->NextPage(page); + } +}; + +template <class TVal, class TKey, typename TCompressor = TFakeCompression, class TPageFile = TOutputPageFile> +class TOutDirectFileImpl: public TOutDatFileImpl< + TVal, + TOutputRecordIterator<TVal, TOutputPageIterator<TPageFile>, TCallbackIndexer, TCompressor>> { + typedef TOutDatFileImpl< + TVal, + TOutputRecordIterator<TVal, TOutputPageIterator<TPageFile>, TCallbackIndexer, TCompressor>> + TDatFile; + typedef TOutDirectFileImpl<TVal, TKey, TCompressor, TPageFile> TMyType; + typedef typename TDatFile::TRecIter TRecIter; + typedef typename TRecIter::TPageIter TPageIter; + typedef typename TRecIter::TIndexer TIndexer; + typedef TOutIndexFile<TKey> TKeyFile; + +public: + TOutDirectFileImpl() { + TIndexer::SetCallback(this, DispatchCallback); + } + + int Open(const char* fname, size_t pagesize, int pages = 1, size_t ipagesize = 0, size_t ipages = 1, int pagesOrBytes = 1) { + char iname[FILENAME_MAX]; + int ret; + if (ipagesize == 0) + ipagesize = pagesize; + ret = TDatFile::Open(fname, pagesize, pages, pagesOrBytes); + ret = ret ? ret : DatNameToIdx(iname, fname); + ret = ret ? ret : KeyFile.Open(iname, ipagesize, ipages, pagesOrBytes); + if (ret) + TDatFile::Close(); + return ret; + } + + int Close() { + if (TRecIter::RecNum) { + TRecIter::Flush(); + NextPage(TPageIter::Current()); + } + int ret = KeyFile.Close(); + int ret1 = TDatFile::Close(); + return ret1 ? 
ret1 : ret; + } + + int GetError() const { + return TDatFile::GetError() ? TDatFile::GetError() : KeyFile.GetError(); + } + +protected: + TKeyFile KeyFile; + + void NextPage(const TDatPage* page) { + typedef TMakeExtKey<TVal, TKey> TMakeExtKey; + + TVal* val = (TVal*)NMicroBDB::GetFirstRecord(page); + TKey key; + if (!TMakeExtKey::Exists) { + TMakeExtKey::Make(&key, nullptr, val, nullptr); + KeyFile.Push(&key); + } else { + size_t ll; + size_t l; + size_t sz = NMicroBDB::SizeOfExt(val, &ll, &l); + typename TExtInfoType<TVal>::TResult valExt; + if (TExtInfoType<TVal>::Exists) + Y_PROTOBUF_SUPPRESS_NODISCARD valExt.ParseFromArray((ui8*)val + sz + ll, l); + typename TExtInfoType<TKey>::TResult keyExt; + TMakeExtKey::Make(&key, &keyExt, val, &valExt); + KeyFile.Push(&key, &keyExt); + } + } + + static void DispatchCallback(void* This, const TDatPage* page) { + ((TMyType*)This)->NextPage(page); + } +}; diff --git a/library/cpp/microbdb/powersorter.h b/library/cpp/microbdb/powersorter.h new file mode 100644 index 0000000000..c40de9c23f --- /dev/null +++ b/library/cpp/microbdb/powersorter.h @@ -0,0 +1,667 @@ +#pragma once + +#include "safeopen.h" + +#include <util/generic/vector.h> +#include <util/generic/deque.h> +#include <util/system/mutex.h> +#include <util/system/condvar.h> +#include <util/thread/pool.h> + +template < + class TRecord, + template <typename T> class TCompare, + class TSieve, + class TMemoFile = TOutDatFile<TRecord>> +class TDatSorterBuf { +public: + typedef TRecord TRec; + typedef TVector<TRec*> TVectorType; + typedef TMemoFile TMemo; + typedef TCompare<TRecord> TComp; + +public: + TDatSorterBuf(size_t memory, size_t pageSize) + : Memo("memo", pageSize, memory, 0) + , Cur() + { + Memo.Open(nullptr); + Memo.Freeze(); + } + + ~TDatSorterBuf() { + Vector.clear(); + Memo.Close(); + } + + const TRec* Push(const TRec* v) { + const TRec* u = Memo.Push(v); + if (u) + Vector.push_back((TRec*)u); + return u; + } + + const TRec* Next() { + if (Ptr == Vector.end()) 
{ + if (Cur) + TSieve::Sieve(Cur, Cur); + Cur = nullptr; + } else { + Cur = *Ptr++; + if (!TIsSieveFake<TSieve>::Result) + while (Ptr != Vector.end() && TSieve::Sieve(Cur, *Ptr)) + ++Ptr; + } + return Cur; + } + + const TRec* Current() { + return Cur; + } + + size_t Size() { + return Vector.size(); + } + + void Sort() { + Ptr = Vector.begin(); + Cur = nullptr; + + MBDB_SORT_FUN(Vector.begin(), Vector.end(), TComp()); + } + + void Clear() { + Vector.clear(); + Memo.Freeze(); + Ptr = Vector.begin(); + Cur = nullptr; + } + +private: + TVectorType Vector; + TMemo Memo; + + typename TVectorType::iterator + Ptr; + TRec* Cur; +}; + +template < + class TRecord, + class TInput, + template <typename T> class TCompare, + class TSieve> +class TDatMerger { +public: + typedef TRecord TRec; + typedef TCompare<TRecord> TComp; + typedef TSimpleSharedPtr<TInput> TInputPtr; + typedef TVector<TInputPtr> TInputVector; + +public: + ~TDatMerger() { + Close(); + } + + void Init(const TInputVector& inputs) { + Inputs = inputs; + TVector<TInput*> v; + for (int i = 0; i < Inputs.ysize(); ++i) + v.push_back(Inputs[i].Get()); + HeapIter.Init(&v[0], v.size()); + if (!TIsSieveFake<TSieve>::Result) + PNext = HeapIter.Next(); + } + + const TRec* Next() { + if (TIsSieveFake<TSieve>::Result) { + return HeapIter.Next(); + } + + if (!PNext) { + if (PCur) { + TSieve::Sieve(PCur, PCur); + PCur = nullptr; + } + return nullptr; + } + + PCur = &Cur; + memcpy(PCur, PNext, SizeOf((const TRec*)PNext)); + + do { + PNext = HeapIter.Next(); + } while (PNext && TSieve::Sieve(PCur, PNext)); + + return PCur; + } + + const TRec* Current() { + return (TIsSieveFake<TSieve>::Result ? 
HeapIter.Current() : PCur); + } + + void Close() { + Inputs.clear(); + HeapIter.Term(); + } + +private: + TInputVector Inputs; + THeapIter<TRec, TInput, TComp> HeapIter; + TRec Cur; + TRec* PCur = nullptr; + const TRec* PNext = nullptr; +}; + +class TPortionManager { +public: + void Open(const char* tempDir) { + TGuard<TMutex> guard(Mutex); + TempDir = tempDir; + } + + TString Next() { + TGuard<TMutex> guard(Mutex); + if (Portions == 0) + DoOpen(); + TString fname = GeneratePortionFilename(Portions++); + return fname; + } + + void Close() { + TGuard<TMutex> guard(Mutex); + Portions = 0; + } + +private: + void DoOpen() { + if (MakeSorterTempl(PortionFilenameTempl, TempDir.data())) { + PortionFilenameTempl[0] = 0; + ythrow yexception() << "portion-manager: bad tempdir \"" << TempDir.data() << "\": " << LastSystemErrorText(); + } + } + + TString GeneratePortionFilename(int i) { + char str[FILENAME_MAX]; + snprintf(str, sizeof(str), PortionFilenameTempl, i); + return TString(str); + } + +private: + TMutex Mutex; + + TString TempDir; + char PortionFilenameTempl[FILENAME_MAX] = {}; + int Portions = 0; +}; + +// A merger powered by threads +template < + class TRecord, + template <typename T> class TCompare, + class TSieve, + class TInput = TInDatFile<TRecord>, + class TOutput = TOutDatFile<TRecord>> +class TPowerMerger { +public: + typedef TRecord TRec; + typedef TDatMerger<TRecord, TInput, TCompare, TSieve> TMerger; + typedef TSimpleSharedPtr<TMerger> TMergerPtr; + typedef TPowerMerger<TRecord, TCompare, TSieve, TInput, TOutput> TFileMerger; + + struct TMergePortionTask: public IObjectInQueue { + TFileMerger* FileMerger; + int Begin; + int End; + TString OutFname; + + TMergePortionTask(TFileMerger* fileMerger, int begin, int end, const TString& outFname) + : FileMerger(fileMerger) + , Begin(begin) + , End(end) + , OutFname(outFname) + { + } + + void Process(void*) override { + THolder<TMergePortionTask> This(this); + //fprintf(stderr, "MergePortion: (%i, %i, %s)\n", 
Begin, End, ~OutFname); + FileMerger->MergePortion(Begin, End, OutFname); + } + }; + +public: + TPowerMerger(const TSimpleSharedPtr<TThreadPool>& mtpQueue, const TSimpleSharedPtr<TPortionManager>& portMan, + int memory, int pageSize, bool autoUnlink) + : MtpQueue(mtpQueue) + , PortionManager(portMan) + , Memory(memory) + , PageSize(pageSize) + , AutoUnlink(autoUnlink) + { + } + + TPowerMerger(const TSimpleSharedPtr<TThreadPool>& mtpQueue, const char* tempDir, + int memory, int pageSize, bool autoUnlink) + : MtpQueue(mtpQueue) + , PortionManager(new TPortionManager) + , Memory(memory) + , PageSize(pageSize) + , AutoUnlink(autoUnlink) + { + PortionManager->Open(tempDir); + } + + ~TPowerMerger() { + Close(); + } + + void SetMtpQueue(const TSimpleSharedPtr<TThreadPool>& mtpQueue) { + MtpQueue = mtpQueue; + } + + void MergePortion(int begin, int end, const TString& outFname) { + TMerger merger; + InitMerger(merger, begin, end); + + TOutput out("mergeportion-tmpout", PageSize, BufSize, 0); + out.Open(outFname.data()); + const TRec* rec; + while ((rec = merger.Next())) + out.Push(rec); + out.Close(); + + merger.Close(); + + { + TGuard<TMutex> guard(Mutex); + UnlinkFiles(begin, end); + Files.push_back(outFname); + --Tasks; + TaskFinishedCond.Signal(); + } + } + + void Add(const TString& fname) { + TGuard<TMutex> guard(Mutex); + // fprintf(stderr, "TPowerMerger::Add: %s\n", ~fname); + Files.push_back(fname); + if (InitialFilesEnd > 0) + ythrow yexception() << "TPowerMerger::Add: no more files allowed"; + } + + void Merge(int maxPortions) { + TGuard<TMutex> guard(Mutex); + InitialFilesEnd = Files.ysize(); + if (!InitialFilesEnd) + ythrow yexception() << "TPowerMerger::Merge: no files added"; + Optimize(maxPortions); + MergeMT(); + InitMerger(Merger, CPortions, Files.ysize()); + } + + void Close() { + TGuard<TMutex> guard(Mutex); + Merger.Close(); + UnlinkFiles(CPortions, Files.ysize()); + InitialFilesEnd = CPortions = 0; + Files.clear(); + } + + const TRec* Next() { + return 
Merger.Next(); + } + + const TRec* Current() { + return Merger.Current(); + } + + int FileCount() const { + TGuard<TMutex> guard(Mutex); + return Files.ysize(); + } + +private: + void InitMerger(TMerger& merger, int begin, int end) { + TGuard<TMutex> guard(Mutex); + TVector<TSimpleSharedPtr<TInput>> inputs; + for (int i = begin; i < end; ++i) { + inputs.push_back(new TInput("mergeportion-tmpin", BufSize, 0)); + inputs.back()->Open(Files[i]); + // fprintf(stderr, "InitMerger: %i, %s\n", i, ~Files[i]); + } + merger.Init(inputs); + } + + void UnlinkFiles(int begin, int end) { + TGuard<TMutex> guard(Mutex); + for (int i = begin; i < end; ++i) { + if (i >= InitialFilesEnd || AutoUnlink) + unlink(Files[i].c_str()); + } + } + + void Optimize(int maxPortions, size_t maxBufSize = 4u << 20) { + TGuard<TMutex> guard(Mutex); + maxPortions = std::min(maxPortions, Memory / PageSize - 1); + maxBufSize = std::max((size_t)PageSize, maxBufSize); + + if (maxPortions <= 2) { + FPortions = MPortions = 2; + BufSize = PageSize; + return; + } + + int Portions = Files.ysize(); + if (maxPortions >= Portions) { + FPortions = MPortions = Portions; + } else if (((Portions + maxPortions - 1) / maxPortions) <= maxPortions) { + while (((Portions + maxPortions - 1) / maxPortions) <= maxPortions) + --maxPortions; + MPortions = ++maxPortions; + int total = ((Portions + MPortions - 1) / MPortions) + Portions; + FPortions = (total % MPortions) ? (total % MPortions) : (int)MPortions; + } else + FPortions = MPortions = maxPortions; + + BufSize = std::min((size_t)(Memory / (MPortions + 1)), maxBufSize); + // fprintf(stderr, "Optimize: Portions=%i; MPortions=%i; FPortions=%i; Memory=%i; BufSize=%i\n", + // (int)Portions, (int)MPortions, (int)FPortions, (int)Memory, (int)BufSize); + } + + void MergeMT() { + TGuard<TMutex> guard(Mutex); + do { + int n; + while ((n = Files.ysize() - CPortions) > MPortions) { + int m = std::min((CPortions == 0 ? 
(int)FPortions : (int)MPortions), n); + TString fname = PortionManager->Next(); + if (!MtpQueue->Add(new TMergePortionTask(this, CPortions, CPortions + m, fname))) + ythrow yexception() << "TPowerMerger::MergeMT: failed to add task"; + CPortions += m; + ++Tasks; + } + if (Tasks > 0) + TaskFinishedCond.Wait(Mutex); + } while (Tasks > 0); + } + +private: + TMutex Mutex; + TCondVar TaskFinishedCond; + + TMerger Merger; + TSimpleSharedPtr<TThreadPool> MtpQueue; + TSimpleSharedPtr<TPortionManager> PortionManager; + TVector<TString> Files; + int Tasks = 0; + int InitialFilesEnd = 0; + int CPortions = 0; + int MPortions = 0; + int FPortions = 0; + int Memory = 0; + int PageSize = 0; + int BufSize = 0; + bool AutoUnlink = false; +}; + +// A sorter powered by threads +template < + class TRecord, + template <typename T> class TCompare, + class TSieve = TFakeSieve<TRecord>, + class TTmpInput = TInDatFile<TRecord>, + class TTmpOutput = TOutDatFile<TRecord>> +class TPowerSorter { +public: + typedef TPowerSorter<TRecord, TCompare, TSieve, TTmpInput, TTmpOutput> TSorter; + typedef TRecord TRec; + typedef TTmpOutput TTmpOut; + typedef TTmpInput TTmpIn; + typedef TDatSorterBuf<TRecord, TCompare, TSieve> TSorterBuf; + typedef TCompare<TRecord> TComp; + typedef TPowerMerger<TRecord, TCompare, TSieve, TTmpInput, TTmpOutput> TFileMerger; + + struct TSortPortionTask: public IObjectInQueue { + TSorter* Sorter; + TSorterBuf* SorterBuf; + int Portion; + + TSortPortionTask(TSorter* sorter, TSorterBuf* sorterBuf, int portion) + : Sorter(sorter) + , SorterBuf(sorterBuf) + , Portion(portion) + { + } + + void Process(void*) override { + TAutoPtr<TSortPortionTask> This(this); + // fprintf(stderr, "SortPortion: %i\n", Portion); + Sorter->SortPortion(SorterBuf); + } + }; + + class TSorterBufQueue { + private: + TMutex Mutex; + TCondVar Cond; + TVector<TSimpleSharedPtr<TSorterBuf>> V; + TDeque<TSorterBuf*> Q; + + int Memory, PageSize, MaxSorterBufs; + + public: + TSorterBufQueue(int memory, int 
pageSize, int maxSorterBufs) + : Memory(memory) + , PageSize(pageSize) + , MaxSorterBufs(maxSorterBufs) + { + } + + void Push(TSorterBuf* sb) { + TGuard<TMutex> guard(Mutex); + sb->Clear(); + Q.push_back(sb); + Cond.Signal(); + } + + TSorterBuf* Pop() { + TGuard<TMutex> guard(Mutex); + if (!Q.size() && V.ysize() < MaxSorterBufs) { + V.push_back(new TSorterBuf(Memory / MaxSorterBufs, PageSize)); + return V.back().Get(); + } else { + while (!Q.size()) + Cond.Wait(Mutex); + TSorterBuf* t = Q.front(); + Q.pop_front(); + return t; + } + } + + void Clear() { + TGuard<TMutex> guard(Mutex); + Q.clear(); + V.clear(); + } + + void WaitAll() { + TGuard<TMutex> guard(Mutex); + while (Q.size() < V.size()) { + Cond.Wait(Mutex); + } + } + + int GetMaxSorterBufs() const { + return MaxSorterBufs; + } + }; + +public: + TPowerSorter(const TSimpleSharedPtr<TThreadPool>& mtpQueue, size_t maxSorterBufs, + const char* name, size_t memory, size_t pageSize, size_t bufSize) + : MaxSorterBufs(maxSorterBufs) + , Name(name) + , Memory(memory) + , PageSize(pageSize) + , BufSize(bufSize) + , MtpQueue(mtpQueue) + , PortionManager(new TPortionManager) + , SBQueue(Memory, PageSize, MaxSorterBufs) + , FileMerger(MtpQueue, PortionManager, Memory, PageSize, true) + { + } + + TPowerSorter(size_t maxSorterBufs, + const char* name, size_t memory, size_t pageSize, size_t bufSize) + : MaxSorterBufs(maxSorterBufs) + , Name(name) + , Memory(memory) + , PageSize(pageSize) + , BufSize(bufSize) + , PortionManager(new TPortionManager) + , SBQueue(Memory, PageSize, maxSorterBufs) + , FileMerger(MtpQueue, PortionManager, Memory, PageSize, true) + { + } + + TPowerSorter(const char* name, size_t memory, size_t pageSize, size_t bufSize) + : MaxSorterBufs(5) + , Name(name) + , Memory(memory) + , PageSize(pageSize) + , BufSize(bufSize) + , PortionManager(new TPortionManager) + , SBQueue(Memory, PageSize, MaxSorterBufs) + , FileMerger(MtpQueue, PortionManager, Memory, PageSize, true) + { + } + + ~TPowerSorter() { + 
Close(); + } + + void Open(const char* tempDir) { + Close(); + CurSB = SBQueue.Pop(); + PortionManager->Open(tempDir); + } + + void Reopen(const char* fname) { + Open(fname); + } + + void Close() { + CurSB = nullptr; + SBQueue.Clear(); + PortionCount = 0; + FileMerger.Close(); + PortionManager->Close(); + } + + const TRec* Push(const TRec* v) { + CheckOpen("Push"); + const TRec* u = CurSB->Push(v); + if (!u) { + NextPortion(); + u = CurSB->Push(v); + } + return u; + } + + void Sort(int maxPortions = 1000) { + CheckOpen("Sort"); + if (!PortionCount) { + CurSB->Sort(); + } else { + NextPortion(); + SBQueue.Push(CurSB); + CurSB = nullptr; + SBQueue.WaitAll(); + SBQueue.Clear(); + FileMerger.Merge(maxPortions); + } + } + + const TRec* Next() { + return PortionCount ? FileMerger.Next() : CurSB->Next(); + } + + const TRec* Current() { + return PortionCount ? FileMerger.Current() : CurSB->Current(); + } + + int GetBufSize() const { + return BufSize; + } + + int GetPageSize() const { + return PageSize; + } + + const char* GetName() const { + return Name.data(); + } + +private: + void CheckOpen(const char* m) { + if (!CurSB) + ythrow yexception() << "TPowerSorter::" << m << ": the sorter is not open"; + } + + void NextPortion() { + if (!CurSB->Size()) + return; + ++PortionCount; + if (MaxSorterBufs <= 1) { + SortPortion(CurSB); + } else { + if (!MtpQueue.Get()) { + MtpQueue.Reset(new TThreadPool); + MtpQueue->Start(MaxSorterBufs - 1); + FileMerger.SetMtpQueue(MtpQueue); + } + if (!MtpQueue->Add(new TSortPortionTask(this, CurSB, PortionCount))) + ythrow yexception() << "TPowerSorter::NextPortion: failed to add task"; + } + CurSB = SBQueue.Pop(); + } + + void SortPortion(TSorterBuf* sorterBuf) { + TString portionFilename = PortionManager->Next(); + try { + sorterBuf->Sort(); + + // fprintf(stderr, "TPowerSorter::SortPortion: -> %s\n", ~portionFilename); + TTmpOut out("powersorter-portion", PageSize, BufSize, 0); + out.Open(portionFilename.data()); + + while 
(sorterBuf->Next()) + out.Push(sorterBuf->Current()); + + out.Close(); + FileMerger.Add(portionFilename); + SBQueue.Push(sorterBuf); + } catch (const yexception& e) { + unlink(portionFilename.data()); + ythrow yexception() << "SortPortion: " << e.what(); + } + } + +private: + int MaxSorterBufs = 0; + TString Name; + int Memory = 0; + int PageSize = 0; + int BufSize = 0; + + TMutex Mutex; + TSimpleSharedPtr<TThreadPool> MtpQueue; + TSimpleSharedPtr<TPortionManager> PortionManager; + + TSorterBufQueue SBQueue; + TSorterBuf* CurSB = nullptr; + int PortionCount = 0; + + TFileMerger FileMerger; +}; diff --git a/library/cpp/microbdb/reader.h b/library/cpp/microbdb/reader.h new file mode 100644 index 0000000000..694a2f1766 --- /dev/null +++ b/library/cpp/microbdb/reader.h @@ -0,0 +1,354 @@ +#pragma once + +#include "align.h" +#include "header.h" +#include "extinfo.h" + +#include <contrib/libs/zlib/zlib.h> +#include <contrib/libs/fastlz/fastlz.h> +#include <contrib/libs/snappy/snappy.h> + +#include <util/generic/vector.h> +#include <util/memory/tempbuf.h> + +namespace NMicroBDB { + static const size_t DEFAULT_BUFFER_SIZE = (64 << 10); + + //! + template <class TVal> + class IBasePageReader { + public: + virtual size_t GetRecSize() const = 0; + virtual size_t GetExtSize() const = 0; + virtual bool GetExtInfo(typename TExtInfoType<TVal>::TResult* extInfo) const = 0; + virtual const ui8* GetExtInfoRaw(size_t* len) const = 0; + virtual const TVal* Next() = 0; + virtual void Reset() = 0; + //! set clearing flag, so temporary buffers will be cleared + //! 
in next call of Next() + virtual void SetClearFlag() { + } + + virtual ~IBasePageReader() { + } + }; + + template <class TVal, typename TPageIter> + class TRawPageReader: public IBasePageReader<TVal> { + public: + TRawPageReader(TPageIter* const iter) + : PageIter(iter) + { + Reset(); + } + + bool GetExtInfo(typename TExtInfoType<TVal>::TResult* extInfo) const override { + Y_VERIFY(TExtInfoType<TVal>::Exists, "GetExtInfo should only be used with extended records"); + if (!Rec) + return false; + ui8* raw = (ui8*)Rec + RecSize + ExtLenSize; + return extInfo->ParseFromArray(raw, ExtSize); + } + + size_t GetRecSize() const override { + return RecSize + ExtLenSize; + } + + size_t GetExtSize() const override { + return ExtSize; + } + + const ui8* GetExtInfoRaw(size_t* len) const override { + Y_VERIFY(TExtInfoType<TVal>::Exists, "GetExtInfo should only be used with extended records"); + if (!Rec) { + *len = 0; + return nullptr; + } + *len = ExtLenSize + ExtSize; + return (ui8*)Rec + RecSize; + } + + const TVal* Next() override { + if (!Rec) + Rec = (TVal*)((char*)PageIter->Current() + sizeof(TDatPage)); + else + Rec = (TVal*)((char*)Rec + DatCeil(RecSize + ExtLenSize + ExtSize)); + if (!TExtInfoType<TVal>::Exists) + RecSize = SizeOf(Rec); + else + RecSize = SizeOfExt(Rec, &ExtLenSize, &ExtSize); + return Rec; + } + + void Reset() override { + Rec = nullptr; + RecSize = 0; + ExtLenSize = 0; + ExtSize = 0; + } + + private: + const TVal* Rec; + size_t RecSize; + size_t ExtLenSize; + size_t ExtSize; + TPageIter* const PageIter; + }; + + template <class TVal, typename TPageIter> + class TCompressedReader: public IBasePageReader<TVal> { + inline size_t GetFirstRecordSize(const TVal* const in) const { + if (!TExtInfoType<TVal>::Exists) { + return DatCeil(SizeOf(in)); + } else { + size_t ll; + size_t l; + size_t ret = SizeOfExt(in, &ll, &l); + + return DatCeil(ret + ll + l); + } + } + + void DecompressBlock() { + if (PageIter->IsFrozen() && Buffer.Get()) + 
Blocks.push_back(Buffer.Release()); + + const TCompressedHeader* hdr = (const TCompressedHeader*)(Page); + + Page += sizeof(TCompressedHeader); + + const size_t first = GetFirstRecordSize((const TVal*)Page); + + if (!Buffer.Get() || Buffer->Size() < hdr->Original) + Buffer.Reset(new TTempBuf(Max<size_t>(hdr->Original, DEFAULT_BUFFER_SIZE))); + + memcpy(Buffer->Data(), Page, first); + Page += first; + + if (hdr->Count > 1) { + switch (Algo) { + case MBDB_COMPRESSION_ZLIB: { + uLongf dst = hdr->Original - first; + + int ret = uncompress((Bytef*)Buffer->Data() + first, &dst, Page, hdr->Compressed); + + if (ret != Z_OK) + ythrow yexception() << "error then uncompress " << ret; + } break; + case MBDB_COMPRESSION_FASTLZ: { + int dst = hdr->Original - first; + int ret = yfastlz_decompress(Page, hdr->Compressed, Buffer->Data() + first, dst); + + if (!ret) + ythrow yexception() << "error then uncompress"; + } break; + case MBDB_COMPRESSION_SNAPPY: { + if (!snappy::RawUncompress((const char*)Page, hdr->Compressed, Buffer->Data() + first)) + ythrow yexception() << "error then uncompress"; + } break; + } + } + + Rec = nullptr; + RecNum = hdr->Count; + Page += hdr->Compressed; + } + + void ClearBuffer() { + for (size_t i = 0; i < Blocks.size(); ++i) + delete Blocks[i]; + Blocks.clear(); + ClearFlag = false; + } + + public: + TCompressedReader(TPageIter* const iter) + : Rec(nullptr) + , RecSize(0) + , ExtLenSize(0) + , ExtSize(0) + , Page(nullptr) + , PageIter(iter) + , RecNum(0) + , BlockNum(0) + , ClearFlag(false) + { + } + + ~TCompressedReader() override { + ClearBuffer(); + } + + size_t GetRecSize() const override { + return RecSize + ExtLenSize; + } + + size_t GetExtSize() const override { + return ExtSize; + } + + bool GetExtInfo(typename TExtInfoType<TVal>::TResult* extInfo) const override { + Y_VERIFY(TExtInfoType<TVal>::Exists, "GetExtInfo should only be used with extended records"); + if (!Rec) + return false; + ui8* raw = (ui8*)Rec + RecSize + ExtLenSize; + return 
extInfo->ParseFromArray(raw, ExtSize); + } + + const ui8* GetExtInfoRaw(size_t* len) const override { + Y_VERIFY(TExtInfoType<TVal>::Exists, "GetExtInfo should only be used with extended records"); + if (!Rec) { + *len = 0; + return nullptr; + } + *len = ExtLenSize + ExtSize; + return (ui8*)Rec + RecSize; + } + + const TVal* Next() override { + Y_ASSERT(RecNum >= 0); + + if (ClearFlag) + ClearBuffer(); + + if (!Page) { + if (!PageIter->Current()) + return nullptr; + + Page = (ui8*)PageIter->Current() + sizeof(TDatPage); + + BlockNum = ((TCompressedPage*)Page)->BlockCount - 1; + Algo = (ECompressionAlgorithm)((TCompressedPage*)Page)->Algorithm; + Page += sizeof(TCompressedPage); + + DecompressBlock(); + } + + if (!RecNum) { + if (BlockNum <= 0) + return nullptr; + else { + --BlockNum; + DecompressBlock(); + } + } + + --RecNum; + if (!Rec) + Rec = (const TVal*)Buffer->Data(); + else + Rec = (const TVal*)((char*)Rec + DatCeil(RecSize + ExtLenSize + ExtSize)); + + if (!TExtInfoType<TVal>::Exists) + RecSize = SizeOf(Rec); + else + RecSize = SizeOfExt(Rec, &ExtLenSize, &ExtSize); + + return Rec; + } + + void Reset() override { + Page = nullptr; + BlockNum = 0; + Rec = nullptr; + RecSize = 0; + ExtLenSize = 0; + ExtSize = 0; + RecNum = 0; + } + + void SetClearFlag() override { + ClearFlag = true; + } + + public: + THolder<TTempBuf> Buffer; + TVector<TTempBuf*> Blocks; + const TVal* Rec; + size_t RecSize; + size_t ExtLenSize; + size_t ExtSize; + const ui8* Page; + TPageIter* const PageIter; + int RecNum; //!< count of recs in current block + int BlockNum; + ECompressionAlgorithm Algo; + bool ClearFlag; + }; + + class TZLibCompressionImpl { + public: + static const ECompressionAlgorithm Code = MBDB_COMPRESSION_ZLIB; + + inline void Init() { + // - + } + + inline void Term() { + // - + } + + inline size_t CompressBound(size_t size) const noexcept { + return ::compressBound(size); + } + + inline void Compress(void* out, size_t& outSize, const void* in, size_t inSize) { + 
uLongf size = outSize; + + if (compress((Bytef*)out, &size, (const Bytef*)in, inSize) != Z_OK) + ythrow yexception() << "not compressed"; + outSize = size; + } + }; + + class TFastlzCompressionImpl { + public: + static const ECompressionAlgorithm Code = MBDB_COMPRESSION_FASTLZ; + + inline void Init() { + // - + } + + inline void Term() { + // - + } + + inline size_t CompressBound(size_t size) const noexcept { + size_t rval = size_t(size * 1.07); + return rval < 66 ? 66 : rval; + } + + inline void Compress(void* out, size_t& outSize, const void* in, size_t inSize) { + outSize = yfastlz_compress_level(2, in, inSize, out); + if (!outSize) + ythrow yexception() << "not compressed"; + } + }; + + class TSnappyCompressionImpl { + public: + static const ECompressionAlgorithm Code = MBDB_COMPRESSION_SNAPPY; + + inline void Init() { + // - + } + + inline void Term() { + // - + } + + inline size_t CompressBound(size_t size) const noexcept { + return snappy::MaxCompressedLength(size); + } + + inline void Compress(void* out, size_t& outSize, const void* in, size_t inSize) { + snappy::RawCompress((const char*)in, inSize, (char*)out, &outSize); + } + }; + +} + +using TFakeCompression = void; +using TZLibCompression = NMicroBDB::TZLibCompressionImpl; +using TFastlzCompression = NMicroBDB::TFastlzCompressionImpl; +using TSnappyCompression = NMicroBDB::TSnappyCompressionImpl; diff --git a/library/cpp/microbdb/safeopen.h b/library/cpp/microbdb/safeopen.h new file mode 100644 index 0000000000..c328ffd575 --- /dev/null +++ b/library/cpp/microbdb/safeopen.h @@ -0,0 +1,792 @@ +#pragma once + +// util +#include <util/generic/yexception.h> +#include <util/generic/vector.h> +#include <util/string/util.h> +#include <util/system/mutex.h> +#include <thread> + +#include "microbdb.h" + +#if defined(_MSC_VER) +#pragma warning(push) +#pragma warning(disable : 4706) /*assignment within conditional expression*/ +#pragma warning(disable : 4267) /*conversion from 'size_t' to 'type', possible loss of 
data*/ +#endif + +template <typename TVal, typename TPageFile = TInputPageFile, typename TIterator = TInputPageIterator<TPageFile>> +class TInDatFile: protected TInDatFileImpl<TVal, TInputRecordIterator<TVal, TIterator>> { +public: + typedef TVal TRec; + typedef TInDatFileImpl<TVal, TInputRecordIterator<TVal, TIterator>> TBase; + + TInDatFile(const TString& name, size_t pages, int pagesOrBytes = 1) + : Name(name) + , Pages(pages) + , PagesOrBytes(pagesOrBytes) + { + } + + ~TInDatFile() { + Close(); + } + + void Open(const TString& fname, bool direct = false) { + ui32 gotRecordSig = 0; + int ret = TBase::Open(fname.data(), Pages, PagesOrBytes, &gotRecordSig, direct); + if (ret) { + // XXX: print record type name, not type sig + ythrow yexception() << ErrorMessage(ret, "Failed to open input file", fname, TVal::RecordSig, gotRecordSig); + } + Name = fname; + } + + void OpenStream(TAutoPtr<IInputStream> input) { + ui32 gotRecordSig = 0; + int ret = TBase::Open(input, Pages, PagesOrBytes, &gotRecordSig); + if (ret) { + // XXX: print record type name, not type sig + ythrow yexception() << ErrorMessage(ret, "Failed to open input file", Name, TVal::RecordSig, gotRecordSig); + } + } + + void Close() { + int ret; + if (IsOpen() && (ret = TBase::GetError())) + if (!std::uncaught_exception()) + ythrow yexception() << ErrorMessage(ret, "Error before closing input file", Name); + if ((ret = TBase::Close())) + if (!std::uncaught_exception()) + ythrow yexception() << ErrorMessage(ret, "Error while closing input file", Name); + } + + const char* GetName() const { + return Name.data(); + } + + using TBase::Current; + using TBase::Freeze; + using TBase::GetError; + using TBase::GetExtInfo; + using TBase::GetExtInfoRaw; + using TBase::GetExtSize; + using TBase::GetLastPage; + using TBase::GetPageNum; + using TBase::GetPageSize; + using TBase::GetRecSize; + using TBase::GotoLastPage; + using TBase::GotoPage; + using TBase::IsEof; + using TBase::IsOpen; + using TBase::Next; + using 
TBase::Skip; + using TBase::Unfreeze; + +protected: + TString Name; + size_t Pages; + int PagesOrBytes; +}; + +template <typename TVal> +class TMappedInDatFile: protected TInDatFileImpl<TVal, TInputRecordIterator<TVal, TMappedInputPageIterator<TMappedInputPageFile>>> { +public: + typedef TVal TRec; + typedef TInDatFileImpl<TVal, TInputRecordIterator<TVal, TMappedInputPageIterator<TMappedInputPageFile>>> TBase; + + TMappedInDatFile(const TString& name, size_t /* pages */, int /* pagesOrBytes */) + : Name(name) + { + } + + ~TMappedInDatFile() { + Close(); + } + + void Open(const TString& fname) { + int ret = TBase::Open(fname.data()); + if (ret) + ythrow yexception() << ErrorMessage(ret, "Failed to open mapped file", fname, TVal::RecordSig); + Name = fname; + } + + void Close() { + int ret; + if (IsOpen() && (ret = TBase::GetError())) + if (!std::uncaught_exception()) + ythrow yexception() << ErrorMessage(ret, "Error before closing mapped file", Name); + if ((ret = TBase::Close())) + if (!std::uncaught_exception()) + ythrow yexception() << ErrorMessage(ret, "Error while closing mapped file", Name); + } + + const char* GetName() const { + return Name.data(); + } + + using TBase::Current; + using TBase::GetError; + using TBase::GetExtInfo; + using TBase::GetExtInfoRaw; + using TBase::GetLastPage; + using TBase::GetPageNum; + using TBase::GetPageSize; + using TBase::GotoLastPage; + using TBase::GotoPage; + using TBase::IsEof; + using TBase::IsOpen; + using TBase::Next; + using TBase::Skip; + +protected: + TString Name; +}; + +template <typename TVal, typename TCompressor = TFakeCompression, typename TPageFile = TOutputPageFile> +class TOutDatFile: protected TOutDatFileImpl<TVal, TOutputRecordIterator<TVal, TOutputPageIterator<TPageFile>, TFakeIndexer, TCompressor>> { +public: + typedef TOutDatFileImpl<TVal, TOutputRecordIterator<TVal, TOutputPageIterator<TPageFile>, TFakeIndexer, TCompressor>> TBase; + + TOutDatFile(const TString& name, size_t pagesize, size_t pages, 
int pagesOrBytes = 1) + : Name(name) + , PageSize(pagesize) + , Pages(pages) + , PagesOrBytes(pagesOrBytes) + { + } + + ~TOutDatFile() { + Close(); + } + + void Open(const char* fname, bool direct = false) { + int ret = TBase::Open(fname, PageSize, Pages, PagesOrBytes, direct); + if (ret) + ythrow yexception() << ErrorMessage(ret, "Failed to open output file", fname); + Name = fname; + } + + void Open(const TString& fname) { + Open(fname.data()); + } + + void OpenStream(TAutoPtr<IOutputStream> output) { + int ret = TBase::Open(output, PageSize, Pages, PagesOrBytes); + if (ret) + ythrow yexception() << ErrorMessage(ret, "Failed to open output stream", Name); + } + + void Close() { + int ret; + if ((ret = TBase::GetError())) + if (!std::uncaught_exception()) + ythrow yexception() << ErrorMessage(ret, "Error before closing output file", Name); + if ((ret = TBase::Close())) + if (!std::uncaught_exception()) + ythrow yexception() << ErrorMessage(ret, "Error while closing output file", Name); + } + + const char* GetName() const { + return Name.data(); + } + + using TBase::Freeze; + using TBase::GetError; + using TBase::GetPageSize; + using TBase::IsEof; + using TBase::IsOpen; + using TBase::Offset; + using TBase::Push; + using TBase::PushWithExtInfo; + using TBase::Reserve; + using TBase::Unfreeze; + +protected: + TString Name; + size_t PageSize, Pages; + int PagesOrBytes; +}; + +template <typename TVal, typename TCompressor, typename TPageFile> +class TOutDatFileArray; + +template <typename TVal, typename TCompressor = TFakeCompression, typename TPageFile = TOutputPageFile> +class TOutDatFileArray { + typedef TOutDatFile<TVal, TCompressor, TPageFile> TFileType; + +public: + TOutDatFileArray(const TString& name, size_t pagesize, size_t pages, int pagesOrBytes = 1) + : Name(name) + , PageSize(pagesize) + , Pages(pages) + , PagesOrBytes(pagesOrBytes) + , NumFiles(0) + , Files(nullptr) + { + } + + ~TOutDatFileArray() { + for (int i = 0; i < NumFiles; ++i) { + 
Files[i].Close(); + Files[i].~TFileType(); + } + free(Files); + Files = nullptr; + NumFiles = 0; + } + + TFileType& operator[](size_t pos) { + return Files[pos]; + } + + void Open(int n, const TString& fname) { + char temp[FILENAME_MAX]; + + Name = fname; + NumFiles = CreateDatObjects(n, fname); + + int i; + try { + for (i = 0; i < NumFiles; ++i) { + sprintf(temp, fname.data(), i); + Files[i].Open(temp); + } + } catch (...) { + while (--i >= 0) + Files[i].Close(); + throw; + } + } + + template <typename TNameBuilder> + void OpenWithCallback(int n, const TNameBuilder& builder) { + NumFiles = CreateDatObjects(n, Name); + + for (int i = 0; i < NumFiles; ++i) + Files[i].Open(builder.GetName(i).data()); + } + + void Close() { + for (int i = 0; i < NumFiles; ++i) + Files[i].Close(); + } + + void CloseMT(ui32 threads) { + int current = 0; + TMutex mutex; + TVector<std::thread> thrs; + thrs.reserve(threads); + for (ui32 i = 0; i < threads; i++) { + thrs.emplace_back([this, &current, &mutex]() { + while (true) { + mutex.Acquire(); + int cur = current++; + mutex.Release(); + if (cur >= NumFiles) + break; + Files[cur].Close(); + } + }); + } + for (auto& thread : thrs) { + thread.join(); + } + } + + const char* GetName() const { + return Name.data(); + } + +protected: + int CreateDatObjects(int n, const TString& fname) { + if (!(Files = (TFileType*)malloc(n * sizeof(TFileType)))) + ythrow yexception() << "can't alloc \"" << fname << "\" file array: " << LastSystemErrorText(); + int num = 0; + char temp[FILENAME_MAX]; + for (int i = 0; i < n; ++i, ++num) { + sprintf(temp, "%s[%d]", fname.data(), i); + new (Files + i) TFileType(temp, PageSize, Pages, PagesOrBytes); + } + return num; + } + + TString Name; + size_t PageSize, Pages; + int PagesOrBytes, NumFiles; + TFileType* Files; +}; + +template <typename TVal, typename TKey, typename TCompressor = TFakeCompression, typename TPageFile = TOutputPageFile> +class TOutDirectFile: protected TOutDirectFileImpl<TVal, TKey, TCompressor, 
TPageFile> { + typedef TOutDirectFileImpl<TVal, TKey, TCompressor, TPageFile> TBase; + +public: + TOutDirectFile(const TString& name, size_t pagesize, size_t pages, size_t ipagesize, size_t ipages, int pagesOrBytes) + : Name(name) + , PageSize(pagesize) + , Pages(pages) + , IdxPageSize(ipagesize) + , IdxPages(ipages) + , PagesOrBytes(pagesOrBytes) + { + } + + ~TOutDirectFile() { + Close(); + } + + void Open(const TString& fname) { + int ret = TBase::Open(fname.data(), PageSize, Pages, IdxPageSize, IdxPages, PagesOrBytes); + if (ret) + ythrow yexception() << ErrorMessage(ret, "Failed to open output file", fname); + Name = fname; + } + + void Close() { + int ret; + if ((ret = TBase::GetError())) + if (!std::uncaught_exception()) + ythrow yexception() << ErrorMessage(ret, "Error before closing output file", Name); + if ((ret = TBase::Close())) + if (!std::uncaught_exception()) + ythrow yexception() << ErrorMessage(ret, "Error while closing output file", Name); + } + + const char* GetName() const { + return Name.data(); + } + + using TBase::Freeze; + using TBase::Push; + using TBase::PushWithExtInfo; + using TBase::Reserve; + using TBase::Unfreeze; + +protected: + TString Name; + size_t PageSize, Pages, IdxPageSize, IdxPages; + int PagesOrBytes; +}; + +template < + typename TVal, + template <typename T> class TComparer, + typename TCompress = TFakeCompression, + typename TSieve = TFakeSieve<TVal>, + typename TPageFile = TOutputPageFile, + typename TFileTypes = TDefInterFileTypes> +class TDatSorter: protected TDatSorterImpl<TVal, TComparer<TVal>, TCompress, TSieve, TPageFile, TFileTypes> { + typedef TDatSorterImpl<TVal, TComparer<TVal>, TCompress, TSieve, TPageFile, TFileTypes> TBase; + +public: + typedef TVal TRec; + +public: + TDatSorter(const TString& name, size_t memory, size_t pagesize, size_t pages, int pagesOrBytes = 1) + : Name(name) + , Memory(memory) + , PageSize(pagesize) + , Pages(pages) + , PagesOrBytes(pagesOrBytes) + { + Templ[0] = 0; + } + + 
~TDatSorter() { + Close(); + Templ[0] = 0; + } + + void Open(const TString& dirName) { + int ret; + if (ret = MakeSorterTempl(Templ, dirName.data())) { + Templ[0] = 0; + ythrow yexception() << ErrorMessage(ret, Name + " sorter: bad tempdir", dirName); + } + if ((ret = TBase::Open(Templ, PageSize, Pages, PagesOrBytes))) + ythrow yexception() << ErrorMessage(ret, Name + " sorter: open error, temp dir", Templ); + } + + void Sort(bool direct = false) { + int ret = TBase::Sort(Memory, 1000, direct); + if (ret) + ythrow yexception() << ErrorMessage(ret, Name + " sorter: sort error, temp dir", Templ, TVal::RecordSig); + } + + void SortToFile(const TString& name) { + int ret = TBase::SortToFile(name.data(), Memory); + if (ret) + ythrow yexception() << ErrorMessage(ret, Name + "sorter: error in SortToFile", name, TVal::RecordSig); + } + + void SortToStream(TAutoPtr<IOutputStream> output) { + int ret = TBase::SortToStream(output, Memory); + if (ret) + ythrow yexception() << ErrorMessage(ret, Name + "sorter: error in SortToStream", "", TVal::RecordSig); + } + + void Close() { + int ret1 = TBase::GetError(); + int ret2 = TBase::Close(); + if (Templ[0]) { + *strrchr(Templ, GetDirectorySeparator()) = 0; + RemoveDirWithContents(Templ); + Templ[0] = 0; + } + if (ret1) + if (!std::uncaught_exception()) + ythrow yexception() << ErrorMessage(ret1, Name + "sorter: error before closing"); + if (ret2) + if (!std::uncaught_exception()) + ythrow yexception() << ErrorMessage(ret2, Name + "sorter: error while closing"); + } + + int Sort(size_t memory, int maxportions, bool direct = false) { + return TBase::Sort(memory, maxportions, direct); + } + + const char* GetName() const { + return Name.data(); + } + + using TBase::GetPageSize; + using TBase::GetPages; + using TBase::Next; + using TBase::NextPortion; + using TBase::Push; + using TBase::PushWithExtInfo; + using TBase::UseSegmentSorter; + +protected: + TString Name; + size_t Memory, PageSize, Pages; + int PagesOrBytes; + char 
Templ[FILENAME_MAX]; +}; + +template <typename TSorter> +class TSorterArray { +public: + typedef TSorter TDatSorter; + +public: + TSorterArray(const TString& name, size_t memory, size_t pagesize, size_t pages, int pagesOrBytes = 1) + : Name(name) + , Memory(memory) + , PageSize(pagesize) + , Pages(pages) + , PagesOrBytes(pagesOrBytes) + , NumSorters(0) + , Sorters(nullptr) + { + } + + ~TSorterArray() { + for (int i = 0; i < NumSorters; ++i) { + Sorters[i].Close(); + Sorters[i].~TSorter(); + } + free(Sorters); + Sorters = nullptr; + NumSorters = 0; + } + + TSorter& operator[](size_t pos) { + return Sorters[pos]; + } + + void Open(int n, const TString& fname, size_t memory = 0) { + if (!(Sorters = (TSorter*)malloc(n * sizeof(TSorter)))) + ythrow yexception() << "can't alloc \"" << fname << "\" sorter array: " << LastSystemErrorText(); + NumSorters = n; + char temp[FILENAME_MAX]; + if (memory) + Memory = memory; + for (int i = 0; i < NumSorters; ++i) { + sprintf(temp, "%s[%d]", Name.data(), i); + new (Sorters + i) TSorter(temp, Memory, PageSize, Pages, PagesOrBytes); + } + for (int i = 0; i < NumSorters; ++i) + Sorters[i].Open(fname); + } + + void Close() { + for (int i = 0; i < NumSorters; ++i) + Sorters[i].Close(); + } + + const char* GetName() const { + return Name.data(); + } + +protected: + TString Name; + size_t Memory, PageSize, Pages; + int PagesOrBytes, NumSorters; + TSorter* Sorters; +}; + +template <typename TVal, template <typename T> class TCompare, typename TSieve = TFakeSieve<TVal>> +class TDatSorterArray: public TSorterArray<TDatSorter<TVal, TCompare, TSieve>> { +public: + TDatSorterArray(const char* name, size_t memory, size_t pagesize, size_t pages, int pagesOrBytes = 1) + : TSorterArray<TDatSorter<TVal, TCompare, TSieve>>(name, memory, pagesize, pages, pagesOrBytes) + { + } +}; + +template <typename TVal, template <typename T> class TCompare, typename TCompress = TFakeCompression, + typename TSieve = TFakeSieve<TVal>, typename TPageFile = 
TOutputPageFile, typename TFileTypes = TDefInterFileTypes> +class TDatSorterMemo: public TDatSorter<TVal, TCompare, TCompress, TSieve, TPageFile, TFileTypes> { + typedef TDatSorter<TVal, TCompare, TCompress, TSieve, TPageFile, TFileTypes> TSorter; + +public: + TOutDatFile<TVal> Memo; + TString Home; + bool OpenReq; + bool Opened; + bool UseDirectWrite; + +public: + TDatSorterMemo(const char* name, size_t memory, size_t pagesize, size_t pages, int pagesOrBytes = 1) + : TSorter(name, memory, pagesize, pages, pagesOrBytes) + , Memo(name, pagesize, memory, 0) + { + OpenReq = false; + Opened = false; + UseDirectWrite = false; + } + + void Open(const TString& home) { + OpenReq = true; + // TSorter::Open(home); + Home = home; + Memo.Open(nullptr); + Memo.Freeze(); + } + + void Reopen(const char* home) { + Close(); + Open(home); + } + + void Open() { + if (!OpenReq) { + OpenReq = true; + Memo.Open(nullptr); + Memo.Freeze(); + } + } + + void OpenIfNeeded() { + if (OpenReq && !Opened) { + if (!Home) + ythrow yexception() << "Temp directory not specified, call Open(char*) first : " << TSorter::Name; + TSorter::Open(Home); + Opened = true; + } + } + + TVal* Reserve(size_t len) { + if (TExtInfoType<TVal>::Exists) + return ReserveWithExt(len, 0); + + TVal* u = Memo.Reserve(len); + if (!u) { + OpenIfNeeded(); + TSorter::NextPortion(UseDirectWrite); + Memo.Freeze(); + u = Memo.Reserve(len); + } + TSorter::PushWithExtInfo(u); + return u; + } + + TVal* ReserveWithExt(size_t len, size_t extSize) { + size_t fullLen = len + len_long((i64)extSize) + extSize; + TVal* u = Memo.Reserve(fullLen); + if (!u) { + OpenIfNeeded(); + TSorter::NextPortion(UseDirectWrite); + Memo.Freeze(); + u = Memo.Reserve(fullLen); + if (!u) { + if (fullLen > Memo.GetPageSize()) { + ythrow yexception() << "Size of element and " << len << " size of extInfo " << extSize + << " is larger than page size " << Memo.GetPageSize(); + } + ythrow yexception() << "going to insert a null pointer. 
Bad."; + } + } + out_long((i64)extSize, (char*)u + len); + TSorter::PushWithExtInfo(u); + return u; + } + + char* GetReservedExt(TVal* rec, size_t len, size_t extSize) { + return (char*)rec + len + len_long((i64)extSize); + } + + const TVal* Push(const TVal* v, const typename TExtInfoType<TVal>::TResult* extInfo = nullptr) { + const TVal* u = Memo.Push(v, extInfo); + if (!u) { + OpenIfNeeded(); + TSorter::NextPortion(UseDirectWrite); + Memo.Freeze(); + u = Memo.Push(v, extInfo); + if (!u) { + if (SizeOf(v) > Memo.GetPageSize()) { + ythrow yexception() << "Size of element " << SizeOf(v) + << " is larger than page size " << Memo.GetPageSize(); + } + ythrow yexception() << "going to insert a null pointer. Bad."; + } + } + TSorter::PushWithExtInfo(u); + return u; + } + + const TVal* Push(const TVal* v, const ui8* extInfoRaw, size_t extLen) { + const TVal* u = Memo.Push(v, extInfoRaw, extLen); + if (!u) { + OpenIfNeeded(); + TSorter::NextPortion(UseDirectWrite); + Memo.Freeze(); + u = Memo.Push(v, extInfoRaw, extLen); + if (!u) { + if (SizeOf(v) > Memo.GetPageSize()) { + ythrow yexception() << "Size of element " << SizeOf(v) + << " is larger than page size " << Memo.GetPageSize(); + } + ythrow yexception() << "going to insert a null pointer. Bad.."; + } + } + TSorter::PushWithExtInfo(u); + return u; + } + + const TVal* PushWithExtInfo(const TVal* v) { + const TVal* u = Memo.PushWithExtInfo(v); + if (!u) { + OpenIfNeeded(); + TSorter::NextPortion(UseDirectWrite); + Memo.Freeze(); + u = Memo.PushWithExtInfo(v); + if (!u) { + if (SizeOf(v) > Memo.GetPageSize()) { + ythrow yexception() << "Size of element " << SizeOf(v) + << " is larger than page size " << Memo.GetPageSize(); + } + ythrow yexception() << "going to insert a null pointer. 
Bad..."; + } + } + TSorter::PushWithExtInfo(u); + return u; + } + + void Sort(bool direct = false) { + if (Opened) { + TSorter::NextPortion(UseDirectWrite); + Memo.Close(); + OpenReq = false; + TSorter::Sort(direct); + } else { + TSorter::SortPortion(); + } + } + + const TVal* Next() { + return Opened ? TSorter::Next() : TSorter::Nextp(); + } + + bool GetExtInfo(typename TExtInfoType<TVal>::TResult* extInfo) const { + return NMicroBDB::GetExtInfo(Current(), extInfo); + } + + const ui8* GetExtInfoRaw(size_t* len) const { + return NMicroBDB::GetExtInfoRaw(Current(), len); + } + + const TVal* Current() const { + return Opened ? TSorter::Current() : TSorter::Currentp(); + } + + int NextPortion() { + OpenIfNeeded(); + return TSorter::NextPortion(UseDirectWrite); + } + + void SortToFile(const char* name) { + OpenIfNeeded(); + TSorter::NextPortion(UseDirectWrite); + Memo.Close(); + OpenReq = false; + TSorter::SortToFile(name); + } + + void SortToStream(TAutoPtr<IOutputStream> output) { + OpenIfNeeded(); + TSorter::NextPortion(UseDirectWrite); + Memo.Close(); + OpenReq = false; + TSorter::SortToStream(output); + } + + template <typename TKey, typename TOutCompress> + void SortToDirectFile(const char* name, size_t ipagesize, size_t ipages) { + Sort(); + TOutDirectFile<TVal, TKey, TOutCompress> out(TSorter::Name, TSorter::PageSize, TSorter::Pages, ipagesize, ipages, TSorter::PagesOrBytes); + out.Open(name); + while (const TVal* rec = Next()) + out.PushWithExtInfo(rec); + out.Close(); + } + + template <typename TKey> + void SortToDirectFile(const char* name, size_t ipagesize, size_t ipages) { + SortToDirectFile<TKey, TCompress>(name, ipagesize, ipages); + } + + void CloseSorter() { + if (Opened) + TSorter::Close(); + else + TSorter::Closep(); + Memo.Freeze(); + Opened = false; + } + + void Close() { + if (Opened) + TSorter::Close(); + else + TSorter::Closep(); + Memo.Close(); + OpenReq = false; + Opened = false; + } + + int SavePortions(const char* mask) { + return 
TSorter::SavePortions(mask, UseDirectWrite); + } + +public: + using TSorter::RestorePortions; +}; + +template <typename TVal, template <typename T> class TCompare, typename TCompress = TFakeCompression, + typename TSieve = TFakeSieve<TVal>, class TPageFile = TOutputPageFile, class TFileTypes = TDefInterFileTypes> +class TDatSorterMemoArray: public TSorterArray<TDatSorterMemo<TVal, TCompare, TCompress, TSieve, TPageFile, TFileTypes>> { +public: + typedef TSorterArray<TDatSorterMemo<TVal, TCompare, TCompress, TSieve, TPageFile, TFileTypes>> TBase; + + TDatSorterMemoArray(const char* name, size_t memory, size_t pagesize, size_t pages, int pagesOrBytes = 1) + : TBase(name, memory, pagesize, pages, pagesOrBytes) + { + } +}; + +#if defined(_MSC_VER) +#pragma warning(pop) +#endif diff --git a/library/cpp/microbdb/sorter.h b/library/cpp/microbdb/sorter.h new file mode 100644 index 0000000000..b2e7390377 --- /dev/null +++ b/library/cpp/microbdb/sorter.h @@ -0,0 +1,677 @@ +#pragma once + +#include <util/ysaveload.h> +#include <util/generic/algorithm.h> +#include <contrib/libs/libc_compat/include/link/link.h> + +#include "header.h" +#include "heap.h" +#include "extinfo.h" +#include "input.h" +#include "output.h" + +#ifdef TEST_MERGE +#define MBDB_SORT_FUN ::StableSort +#else +#define MBDB_SORT_FUN ::Sort +#endif + +template <class TVal, class TCompare, typename TCompress, typename TSieve, typename TOutPageFile, typename TFileTypes> +class TDatSorterImpl; + +template <class TVal> +struct TFakeSieve { + static inline int Sieve(TVal*, const TVal*) noexcept { + return 0; + } +}; + +template <class TSieve> +struct TIsSieveFake { + static const bool Result = false; +}; + +template <class T> +struct TIsSieveFake<TFakeSieve<T>> { + static const bool Result = true; +}; + +class TDefInterFileTypes { +public: + typedef TOutputPageFile TOutPageFile; + typedef TInputPageFile TInPageFile; +}; + +//class TCompressedInterFileTypes; + +template <class TVal, class TCompare, typename TCompress, 
typename TSieve, typename TOutPageFile = TOutputPageFile, typename TFileTypes = TDefInterFileTypes> +class TDatSorterImplBase: protected THeapIter<TVal, TInDatFileImpl<TVal, TInputRecordIterator<TVal, TInputPageIterator<typename TFileTypes::TInPageFile>>>, TCompare> { + typedef TOutputRecordIterator<TVal, TOutputPageIterator<typename TFileTypes::TOutPageFile>, TFakeIndexer, TCompress> TTmpRecIter; + typedef TInputRecordIterator<TVal, TInputPageIterator<typename TFileTypes::TInPageFile>> TInTmpRecIter; + +public: + typedef TOutDatFileImpl<TVal, TTmpRecIter> TTmpOut; + typedef TInDatFileImpl<TVal, TInTmpRecIter> TTmpIn; + + typedef TOutDatFileImpl<TVal, TOutputRecordIterator<TVal, TOutputPageIterator<TOutPageFile>, TFakeIndexer, TCompress>> TOut; + typedef THeapIter<TVal, TTmpIn, TCompare> TMyHeap; + typedef TVector<const TVal*> TMyVector; + typedef typename TMyVector::iterator TMyIterator; + + class IPortionSorter { + public: + virtual ~IPortionSorter() { + } + + virtual void Sort(TMyVector&, TTmpOut*) = 0; + }; + + class TDefaultSorter: public IPortionSorter { + public: + void Sort(TMyVector& vector, TTmpOut* out) override { + MBDB_SORT_FUN(vector.begin(), vector.end(), TCompare()); + + const typename TMyVector::const_iterator + end = (TIsSieveFake<TSieve>::Result) ? 
vector.end() : TDatSorterImplBase::SieveRange(vector.begin(), vector.end()); + + for (typename TMyVector::const_iterator it = vector.begin(); it != end; ++it) { + out->PushWithExtInfo(*it); + } + } + }; + + class TSegmentedSorter: public IPortionSorter { + class TAdaptor { + typedef typename TMyVector::const_iterator TConstIterator; + + public: + TAdaptor(TConstIterator b, TConstIterator e) + : Curr_(b) + , End_(e) + { + --Curr_; + } + + inline const TVal* Current() const { + return *Curr_; + } + + inline const TVal* Next() { + ++Curr_; + + if (Curr_ == End_) { + return nullptr; + } + + return *Curr_; + } + + private: + TConstIterator Curr_; + TConstIterator End_; + }; + + typedef THeapIter<TVal, TAdaptor, TCompare> TPortionsHeap; + + public: + void Sort(TMyVector& vector, TTmpOut* out) override { + TVector<TAdaptor> bounds; + typename TMyVector::iterator + it = vector.begin(); + const size_t portions = Max<size_t>(1, (vector.size() * sizeof(TVal)) / (4 << 20)); + const size_t step = vector.size() / portions; + + // Sort segments + while (it != vector.end()) { + const typename TMyVector::iterator + end = Min(it + step, vector.end()); + + MBDB_SORT_FUN(it, end, TCompare()); + + bounds.push_back(TAdaptor(it, end)); + + it = end; + } + + // + // Merge result + // + + TPortionsHeap heap(bounds); + + if (TIsSieveFake<TSieve>::Result) { + while (const TVal* val = heap.Next()) { + out->PushWithExtInfo(val); + } + } else { + const TVal* val = heap.Next(); + const TVal* prev = out->PushWithExtInfo(val); + + for (val = heap.Next(); val && prev; val = heap.Next()) { + if (TSieve::Sieve((TVal*)prev, val)) { + continue; + } + + prev = out->PushWithExtInfo(val); + } + + if (prev) { + TSieve::Sieve((TVal*)prev, prev); + } + } + } + }; + +public: + TDatSorterImplBase() + : Sorter(new TDefaultSorter) + { + InFiles = nullptr; + TempBuf = nullptr; + Ptr = Vector.end(); + Cur = nullptr; + Portions = CPortions = Error = 0; + } + + ~TDatSorterImplBase() { + Close(); + } + + int 
Open(const char* templ, size_t pagesize, size_t pages, int pagesOrBytes = 1) { + Portions = CPortions = Error = 0; + TempBuf = strdup(templ); + Pagesize = pagesize; + if (pagesOrBytes) + Pages = pages; + else + Pages = pages / pagesize; + Pages = Max(1, Pages); + return 0; + } + + void Push(const TVal* v) { + // Serialized extInfo must follow a record being pushed, therefore, to avoid + // unintentional misusage (as if when you are adding TExtInfo in your record + // type: you may forget to check your sorting routines and get a segfault as + // a result). + // PushWithExtInfo(v) should be called on records with extInfo. + static_assert(!TExtInfoType<TVal>::Exists, "expect !TExtInfoType<TVal>::Exists"); + + Vector.push_back(v); + } + + void PushWithExtInfo(const TVal* v) { + Vector.push_back(v); + } + + int SortPortion() { + Ptr = Vector.end(); + Cur = nullptr; + if (!Vector.size() || Error) + return Error; + + MBDB_SORT_FUN(Vector.begin(), Vector.end(), TCompare()); + + if (!TIsSieveFake<TSieve>::Result) { + const typename TMyVector::iterator + end = SieveRange(Vector.begin(), Vector.end()); + + Vector.resize(end - Vector.begin()); + } + + Ptr = Vector.begin(); + Cur = nullptr; + return 0; + } + + const TVal* Nextp() { + Cur = Ptr == Vector.end() ? nullptr : *Ptr++; + return Cur; + } + + const TVal* Currentp() const { + return Cur; + } + + void Closep() { + Vector.clear(); + Ptr = Vector.end(); + Cur = nullptr; + } + + int NextPortion(bool direct = false) { + if (!Vector.size() || Error) + return Error; + + TTmpOut out; + int ret, ret1; + char fname[FILENAME_MAX]; + + snprintf(fname, sizeof(fname), TempBuf, Portions++); + if ((ret = out.Open(fname, Pagesize, Pages, 1, direct))) + return Error = ret; + + Sorter->Sort(Vector, &out); + + Vector.erase(Vector.begin(), Vector.end()); + ret = out.GetError(); + ret1 = out.Close(); + Error = Error ? Error : ret ? 
ret : ret1; + if (Error) + unlink(fname); + return Error; + } + + int SavePortions(const char* mask, bool direct = false) { + char srcname[PATH_MAX], dstname[PATH_MAX]; + if (Vector.size()) + NextPortion(direct); + for (int i = 0; i < Portions; i++) { + char num[10]; + sprintf(num, "%i", i); + snprintf(srcname, sizeof(srcname), TempBuf, i); + snprintf(dstname, sizeof(dstname), mask, num); + int res = rename(srcname, dstname); + if (res) + return res; + } + snprintf(dstname, sizeof(dstname), mask, "count"); + TOFStream fcount(dstname); + Save(&fcount, Portions); + fcount.Finish(); + return 0; + } + + int RestorePortions(const char* mask) { + char srcname[PATH_MAX], dstname[PATH_MAX]; + snprintf(srcname, sizeof(srcname), mask, "count"); + TIFStream fcount(srcname); + Load(&fcount, Portions); + for (int i = 0; i < Portions; i++) { + char num[10]; + sprintf(num, "%i", i); + snprintf(dstname, sizeof(dstname), TempBuf, i); + snprintf(srcname, sizeof(srcname), mask, num); + unlink(dstname); + int res = link(srcname, dstname); + if (res) + return res; + } + return 0; + } + + int RestorePortions(const char* mask, ui32 count) { + char srcname[PATH_MAX], dstname[PATH_MAX]; + ui32 portions; + TVector<ui32> counts; + for (ui32 j = 0; j < count; j++) { + snprintf(srcname, sizeof(srcname), mask, j, "count"); + TIFStream fcount(srcname); + Load(&fcount, portions); + counts.push_back(portions); + Portions += portions; + } + ui32 p = 0; + for (ui32 j = 0; j < count; j++) { + int cnt = counts[j]; + for (int i = 0; i < cnt; i++, p++) { + char num[10]; + sprintf(num, "%i", i); + snprintf(dstname, sizeof(dstname), TempBuf, p); + snprintf(srcname, sizeof(srcname), mask, j, num); + unlink(dstname); + int res = link(srcname, dstname); + if (res) { + fprintf(stderr, "Can not link %s to %s\n", srcname, dstname); + return res; + } + } + } + return 0; + } + + int Sort(size_t memory, int maxportions = 1000, bool direct = false) { + int ret, end, beg, i; + char fname[FILENAME_MAX]; + + if 
(Vector.size()) + NextPortion(); + + if (Error) + return Error; + if (!Portions) { + TMyHeap::Init(&DummyFile, 1); // closed file + HPages = 1; + return 0; + } + + Optimize(memory, maxportions); + if (!(InFiles = new TTmpIn[MPortions])) + return MBDB_NO_MEMORY; + + for (beg = 0; beg < Portions && !Error; beg = end) { + end = (int)Min(beg + FPortions, Portions); + for (i = beg; i < end && !Error; i++) { + snprintf(fname, sizeof(fname), TempBuf, i); + if ((ret = InFiles[i - beg].Open(fname, HPages, 1, nullptr, direct))) + Error = Error ? Error : ret; + } + if (Error) + return Error; + TMyHeap::Init(InFiles, end - beg); + if (end != Portions) { + TTmpOut out; + const TVal* v; + snprintf(fname, sizeof(fname), TempBuf, Portions++); + if ((ret = out.Open(fname, Pagesize, HPages))) + return Error = Error ? Error : ret; + while ((v = TMyHeap::Next())) + out.PushWithExtInfo(v); + ret = out.GetError(); + Error = Error ? Error : ret; + ret = out.Close(); + Error = Error ? Error : ret; + for (i = beg; i < end; i++) { + ret = InFiles[i - beg].Close(); + Error = Error ? 
Error : ret; + snprintf(fname, sizeof(fname), TempBuf, CPortions++); + unlink(fname); + } + } + FPortions = MPortions; + } + return Error; + } + + int Close() { + char fname[FILENAME_MAX]; + delete[] InFiles; + InFiles = nullptr; + Closep(); + for (int i = CPortions; i < Portions; i++) { + snprintf(fname, sizeof(fname), TempBuf, i); + unlink(fname); + } + CPortions = Portions = 0; + free(TempBuf); + TempBuf = nullptr; + return Error; + } + + void UseSegmentSorter() { + Sorter.Reset(new TSegmentedSorter); + } + + inline int GetError() const { + return Error; + } + + inline int GetPages() const { + return Pages; + } + + inline int GetPageSize() const { + return Pagesize; + } + +private: + static TMyIterator SieveRange(const TMyIterator begin, const TMyIterator end) { + TMyIterator it = begin; + TMyIterator prev = begin; + + for (++it; it != end; ++it) { + if (TSieve::Sieve((TVal*)*prev, *it)) { + continue; + } + + ++prev; + + if (it != prev) { + *prev = *it; + } + } + + TSieve::Sieve((TVal*)*prev, *prev); + + return ++prev; + } + +protected: + void Optimize(size_t memory, int maxportions, size_t fbufmax = 256u << 20) { + maxportions = (int)Min((size_t)maxportions, memory / Pagesize) - 1; + size_t maxpages = Max((size_t)1u, fbufmax / Pagesize); + + if (maxportions <= 2) { + FPortions = MPortions = 2; + HPages = 1; + return; + } + if (maxportions >= Portions) { + FPortions = MPortions = Portions; + HPages = (int)Min(memory / ((Portions + 1) * Pagesize), maxpages); + return; + } + if (((Portions + maxportions - 1) / maxportions) <= maxportions) { + while (((Portions + maxportions - 1) / maxportions) <= maxportions) + --maxportions; + MPortions = ++maxportions; + int total = ((Portions + maxportions - 1) / maxportions) + Portions; + FPortions = (total % maxportions) ? 
(total % maxportions) : MPortions; + HPages = (int)Min(memory / ((MPortions + 1) * Pagesize), maxpages); + return; + } + FPortions = MPortions = maxportions; + HPages = (int)Min(memory / ((MPortions + 1) * Pagesize), maxpages); + } + + TMyVector Vector; + typename TMyVector::iterator Ptr; + const TVal* Cur; + TTmpIn *InFiles, DummyFile; + char* TempBuf; + int Portions, CPortions, Pagesize, Pages, Error; + int FPortions, MPortions, HPages; + THolder<IPortionSorter> Sorter; +}; + +template <class TVal, class TCompare, typename TCompress> +class TDatSorterImpl<TVal, TCompare, TCompress, TFakeSieve<TVal>, TOutputPageFile, TDefInterFileTypes> + : public TDatSorterImplBase<TVal, TCompare, TCompress, TFakeSieve<TVal>, TOutputPageFile, TDefInterFileTypes> { + typedef TDatSorterImplBase<TVal, TCompare, TCompress, TFakeSieve<TVal>, TOutputPageFile, TDefInterFileTypes> TBase; + +public: + int SortToFile(const char* name, size_t memory, int maxportions = 1000) { + int ret = TBase::Sort(memory, maxportions); + if (ret) + return ret; + typename TBase::TOut out; + if ((ret = out.Open(name, TBase::Pagesize, TBase::HPages))) + return ret; + const TVal* rec; + while ((rec = Next())) + out.PushWithExtInfo(rec); + if ((ret = out.GetError())) + return ret; + if ((ret = out.Close())) + return ret; + if ((ret = TBase::Close())) + return ret; + return 0; + } + + int SortToStream(TAutoPtr<IOutputStream> output, size_t memory, int maxportions = 1000) { + int ret = TBase::Sort(memory, maxportions); + if (ret) + return ret; + typename TBase::TOut out; + if ((ret = out.Open(output, TBase::Pagesize, TBase::HPages))) + return ret; + const TVal* rec; + while ((rec = Next())) + out.PushWithExtInfo(rec); + if ((ret = out.GetError())) + return ret; + if ((ret = out.Close())) + return ret; + if ((ret = TBase::Close())) + return ret; + return 0; + } + + const TVal* Next() { + return TBase::TMyHeap::Next(); + } + + const TVal* Current() const { + return TBase::TMyHeap::Current(); + } + + bool 
GetExtInfo(typename TExtInfoType<TVal>::TResult* extInfo) const { + return TBase::TMyHeap::GetExtInfo(extInfo); + } + + const ui8* GetExtInfoRaw(size_t* len) const { + return TBase::TMyHeap::GetExtInfoRaw(len); + } +}; + +template <class TVal, class TCompare, typename TCompress, typename TSieve, + typename TOutPageFile = TOutputPageFile, typename TFileTypes = TDefInterFileTypes> +class TDatSorterImpl: public TDatSorterImplBase<TVal, TCompare, TCompress, TSieve, TOutPageFile, TFileTypes> { + typedef TDatSorterImplBase<TVal, TCompare, TCompress, TSieve, TOutPageFile, TFileTypes> TBase; + +public: + TDatSorterImpl() + : Cur(nullptr) + , Prev(nullptr) + { + } + + int SortToFile(const char* name, size_t memory, int maxportions = 1000) { + int ret = Sort(memory, maxportions); + if (ret) + return ret; + typename TBase::TOut out; + if ((ret = out.Open(name, TBase::Pagesize, TBase::HPages))) + return ret; + const TVal* rec; + while ((rec = Next())) + out.PushWithExtInfo(rec); + if ((ret = out.GetError())) + return ret; + if ((ret = out.Close())) + return ret; + if ((ret = TBase::Close())) + return ret; + return 0; + } + + int SortToStream(TAutoPtr<IOutputStream> output, size_t memory, int maxportions = 1000) { + int ret = Sort(memory, maxportions); + if (ret) + return ret; + typename TBase::TOut out; + if ((ret = out.Open(output, TBase::Pagesize, TBase::HPages))) + return ret; + const TVal* rec; + while ((rec = Next())) + out.PushWithExtInfo(rec); + if ((ret = out.GetError())) + return ret; + if ((ret = out.Close())) + return ret; + if ((ret = TBase::Close())) + return ret; + return 0; + } + + int Open(const char* templ, size_t pagesize, size_t pages, int pagesOrBytes = 1) { + int res = TBase::Open(templ, pagesize, pages, pagesOrBytes); + Prev = nullptr; + Cur = nullptr; + return res; + } + + int Sort(size_t memory, int maxportions = 1000, bool direct = false) { + int res = TBase::Sort(memory, maxportions, direct); + if (!res) { + const TVal* rec = TBase::TMyHeap::Next(); + 
if (rec) { + size_t els, es; + size_t sz = NMicroBDB::SizeOfExt(rec, &els, &es); + sz += els + es; + if (!TExtInfoType<TVal>::Exists) + Cur = (TVal*)malloc(sizeof(TVal)); + else + Cur = (TVal*)malloc(TBase::Pagesize); + memcpy(Cur, rec, sz); + } + } + return res; + } + + // Prev = last returned + // Cur = current accumulating with TSieve + + const TVal* Next() { + if (!Cur) { + if (Prev) { + free(Prev); + Prev = nullptr; + } + return nullptr; + } + const TVal* rec; + + if (TIsSieveFake<TSieve>::Result) + rec = TBase::TMyHeap::Next(); + else { + do { + rec = TBase::TMyHeap::Next(); + } while (rec && TSieve::Sieve((TVal*)Cur, rec)); + } + + if (!Prev) { + if (!TExtInfoType<TVal>::Exists) + Prev = (TVal*)malloc(sizeof(TVal)); + else + Prev = (TVal*)malloc(TBase::Pagesize); + } + size_t els, es; + size_t sz = NMicroBDB::SizeOfExt(Cur, &els, &es); + sz += els + es; + memcpy(Prev, Cur, sz); + + if (rec) { + sz = NMicroBDB::SizeOfExt(rec, &els, &es); + sz += els + es; + memcpy(Cur, rec, sz); + } else { + TSieve::Sieve((TVal*)Cur, Cur); + free(Cur); + Cur = nullptr; + } + return Prev; + } + + const TVal* Current() const { + return Prev; + } + + int Close() { + int res = TBase::Close(); + if (Prev) { + free(Prev); + Prev = nullptr; + } + if (Cur) { + free(Cur); + Cur = nullptr; + } + return res; + } + +protected: + TVal* Cur; + TVal* Prev; +}; diff --git a/library/cpp/microbdb/sorterdef.h b/library/cpp/microbdb/sorterdef.h new file mode 100644 index 0000000000..8834b5fff8 --- /dev/null +++ b/library/cpp/microbdb/sorterdef.h @@ -0,0 +1,19 @@ +#pragma once + +#define MAKESORTERTMPL(TRecord, MemberFunc) \ + template <typename T> \ + struct MemberFunc; \ + template <> \ + struct MemberFunc<TRecord> { \ + bool operator()(const TRecord* l, const TRecord* r) { \ + return TRecord ::MemberFunc(l, r) < 0; \ + } \ + int operator()(const TRecord* l, const TRecord* r, int) { \ + return TRecord ::MemberFunc(l, r); \ + } \ + } + +template <typename T> +static inline int compare(const T& a, 
const T& b) { + return (a < b) ? -1 : (a > b); +} diff --git a/library/cpp/microbdb/utility.h b/library/cpp/microbdb/utility.h new file mode 100644 index 0000000000..5c86061bca --- /dev/null +++ b/library/cpp/microbdb/utility.h @@ -0,0 +1,75 @@ +#pragma once + +#include "microbdb.h" + +template <class TRecord, template <class T> class TCompare> +int SortData(const TFile& ifile, const TFile& ofile, const TDatMetaPage* meta, size_t memory, const char* tmpDir = nullptr) { + char templ[FILENAME_MAX]; + TInDatFileImpl<TRecord> datin; + TOutDatFileImpl<TRecord> datout; + TDatSorterImpl<TRecord, TCompare<TRecord>, TFakeCompression, TFakeSieve<TRecord>> sorter; + const TRecord* u; + int ret; + + const size_t minMemory = (2u << 20); + memory = Max(memory, minMemory + minMemory / 2); + if (datin.Open(ifile, meta, memory - minMemory, 0)) + err(1, "can't read input file"); + + size_t outpages = Max((size_t)2u, minMemory / datin.GetPageSize()); + memory -= outpages * datin.GetPageSize(); + + if (ret = MakeSorterTempl(templ, tmpDir)) + err(1, "can't create tempdir in \"%s\"; error: %d\n", templ, ret); + + if (sorter.Open(templ, datin.GetPageSize(), outpages)) { + *strrchr(templ, LOCSLASH_C) = 0; + RemoveDirWithContents(templ); + err(1, "can't open sorter"); + } + + while (1) { + datin.Freeze(); + while ((u = datin.Next())) + sorter.PushWithExtInfo(u); + sorter.NextPortion(); + if (datin.GetError() || datin.IsEof()) + break; + } + + if (datin.GetError()) { + *strrchr(templ, LOCSLASH_C) = 0; + RemoveDirWithContents(templ); + err(1, "in data file error %d", datin.GetError()); + } + if (datin.Close()) { + *strrchr(templ, LOCSLASH_C) = 0; + RemoveDirWithContents(templ); + err(1, "can't close in data file"); + } + + sorter.Sort(memory); + + if (datout.Open(ofile, datin.GetPageSize(), outpages)) { + *strrchr(templ, LOCSLASH_C) = 0; + RemoveDirWithContents(templ); + err(1, "can't write out file"); + } + + while ((u = sorter.Next())) + datout.PushWithExtInfo(u); + + if 
(sorter.GetError()) + err(1, "sorter error %d", sorter.GetError()); + if (sorter.Close()) + err(1, "can't close sorter"); + + *strrchr(templ, LOCSLASH_C) = 0; + RemoveDirWithContents(templ); + + if (datout.GetError()) + err(1, "out data file error %d", datout.GetError()); + if (datout.Close()) + err(1, "can't close out data file"); + return 0; +} diff --git a/library/cpp/microbdb/wrappers.h b/library/cpp/microbdb/wrappers.h new file mode 100644 index 0000000000..38eb8edebc --- /dev/null +++ b/library/cpp/microbdb/wrappers.h @@ -0,0 +1,637 @@ +#pragma once + +#include "microbdb.h" + +#define MAKEFILTERTMPL(TRecord, MemberFunc, NS) \ + template <typename T> \ + struct MemberFunc; \ + template <> \ + struct MemberFunc<TRecord> { \ + bool operator()(const TRecord* r) { \ + return NS::MemberFunc(r); \ + } \ + } + +#define MAKEJOINTMPL(TRecordA, TRecordB, MemberFunc, NS, TMergeType) \ + template <typename A, typename B> \ + struct MemberFunc; \ + template <> \ + struct MemberFunc<TRecordA, TRecordB> { \ + int operator()(const TRecordA* l, const TRecordB* r) { \ + return NS::MemberFunc(l, r); \ + } \ + }; \ + typedef TMergeRec<TRecordA, TRecordB> TMergeType + +#define MAKEJOINTMPL2(TRecordA, TRecordB, MemberFunc, StructName, TMergeType) \ + template <typename A, typename B> \ + struct StructName; \ + template <> \ + struct StructName<TRecordA, TRecordB> { \ + int operator()(const TRecordA* l, const TRecordB* r) { \ + return MemberFunc(l, r); \ + } \ + }; \ + typedef TMergeRec<TRecordA, TRecordB> TMergeType + +#define MAKEJOINTMPLLEFT(TRecordA, TRecordB, MemberFunc, NS, TMergeType) \ + template <typename A, typename B> \ + struct MemberFunc; \ + template <> \ + struct MemberFunc<TRecordA, TRecordB> { \ + int operator()(const TRecordA* l, const TRecordB* r) { \ + return NS::MemberFunc(l->RecA, r); \ + } \ + }; \ + typedef TMergeRec<TRecordA, TRecordB> TMergeType + +template <class TRec> +class IDatNextSource { +public: + virtual const TRec* Next() = 0; + virtual void Work() 
{ + } + + // Interface type: keep destruction through a base pointer well-defined. + virtual ~IDatNextSource() = default; +}; + +// Consumer end of a pull chain: keeps a reference to its source and forwards Work() to it. +template <class TRec> +class IDatNextReceiver { +public: + IDatNextReceiver(IDatNextSource<TRec>& source) + : Source(source) + { + } + + virtual ~IDatNextReceiver() = default; + + virtual void Work() { + Source.Work(); + } + +protected: + IDatNextSource<TRec>& Source; +}; + +// Pull-chain stage: consumes TInRec from its source and is itself a source of TOutRec. +template <class TInRec, class TOutRec> +class IDatNextChannel: public IDatNextReceiver<TInRec>, public IDatNextSource<TOutRec> { +public: + IDatNextChannel(IDatNextSource<TInRec>& source) + : IDatNextReceiver<TInRec>(source) + { + } + + virtual void Work() { + IDatNextReceiver<TInRec>::Work(); + } +}; + +class IDatWorker { +public: + virtual ~IDatWorker() = default; + + virtual void Work() = 0; +}; + +// Consumer end of a push chain. +template <class TRec> +class IDatPushReceiver { +public: + virtual ~IDatPushReceiver() = default; + + virtual void Push(const TRec* rec) = 0; + virtual void Work() = 0; +}; + +// Producer end of a push chain: keeps a reference to its receiver and forwards Work() to it. +template <class TRec> +class IDatPushSource { +public: + IDatPushSource(IDatPushReceiver<TRec>& receiver) + : Receiver(receiver) + { + } + + virtual ~IDatPushSource() = default; + + virtual void Work() { + Receiver.Work(); + } + +protected: + IDatPushReceiver<TRec>& Receiver; +}; + +// Push-chain stage: receives TInRec and pushes TOutRec to its receiver. +template <class TInRec, class TOutRec> +class IDatPushChannel: public IDatPushReceiver<TInRec>, public IDatPushSource<TOutRec> { +public: + IDatPushChannel(IDatPushReceiver<TOutRec>& receiver) + : IDatPushSource<TOutRec>(receiver) + { + } + + virtual void Work() { + IDatPushSource<TOutRec>::Work(); + } +}; + +// Adapter that drains a pull source into a push receiver when Work() is called. +template <class TRec> +class IDatNextToPush: public IDatNextReceiver<TRec>, public IDatPushSource<TRec> { + typedef IDatNextReceiver<TRec> TNextReceiver; + typedef IDatPushSource<TRec> TPushSource; + +public: + IDatNextToPush(IDatNextSource<TRec>& source, IDatPushReceiver<TRec>& receiver) + : TNextReceiver(source) + , TPushSource(receiver) + { + } + + // Pushes every record of the source to the receiver, then runs Work() on the receiver side + // and finally on the source side. + virtual void Work() { + const TRec* rec; + while ((rec = TNextReceiver::Source.Next())) + TPushSource::Receiver.Push(rec); + TPushSource::Work(); + TNextReceiver::Work(); + } +}; + +template <class TRec> +class TDatNextPNSplitter: public IDatNextReceiver<TRec>, public IDatNextSource<TRec>, public IDatPushSource<TRec> { +public: + 
TDatNextPNSplitter(IDatNextSource<TRec>& source, IDatPushReceiver<TRec>& receiver) + : IDatNextReceiver<TRec>(source) + , IDatNextSource<TRec>() + , IDatPushSource<TRec>(receiver) + { + } + /* Pulls the next record from the source, pushes it to the receiver and also returns it; returns 0 once the source is exhausted. */ + const TRec* Next() { + const TRec* rec = IDatNextReceiver<TRec>::Source.Next(); + if (rec) { + IDatPushSource<TRec>::Receiver.Push(rec); + return rec; + } else { + return 0; + } + } + + virtual void Work() { + IDatNextReceiver<TRec>::Work(); + IDatPushSource<TRec>::Work(); + } +}; + /* Push receiver that forwards every record to two downstream receivers (A first, then B). NOTE(review): with the default arguments TOutRecA == TOutRecB the same IDatPushSource specialization is named twice as a direct base -- confirm this template is only instantiated with distinct output types. */ +template <class TRec, class TOutRecA = TRec, class TOutRecB = TRec> +class TDatPushPPSplitter: public IDatPushReceiver<TRec>, public IDatPushSource<TOutRecA>, public IDatPushSource<TOutRecB> { +public: + TDatPushPPSplitter(IDatPushReceiver<TOutRecA>& receiverA, IDatPushReceiver<TOutRecB>& receiverB) + : IDatPushSource<TOutRecA>(receiverA) + , IDatPushSource<TOutRecB>(receiverB) + { + } + + void Push(const TRec* rec) { + IDatPushSource<TOutRecA>::Receiver.Push(rec); + IDatPushSource<TOutRecB>::Receiver.Push(rec); + } + + void Work() { + IDatPushSource<TOutRecA>::Work(); + IDatPushSource<TOutRecB>::Work(); + } +}; + /* TInDatFile exposed as an IDatNextSource. The constructor opens the file unless open == false; Work() closes it. */ +template <class TRec> +class TFastInDatFile: public TInDatFile<TRec>, public IDatNextSource<TRec> { +public: + typedef TInDatFile<TRec> Base; + + TFastInDatFile(const char* name, bool open = true, size_t pages = dbcfg::fbufsize, int pagesOrBytes = 0) + : TInDatFile<TRec>(name, pages, pagesOrBytes) + , FileName(name) + { + if (open) + Base::Open(name); + } + + void Open() { + Base::Open(FileName); + } + /* Starting from Base::Current(), reads forward while CompareUids(inrec, torec) is negative; returns true when it stops on a record that compares equal to torec. The incoming value of inrec is overwritten immediately and never read. */ + template <class TPassRec> + bool PassToUid(const TRec* inrec, const TPassRec* torec) { + inrec = Base::Current(); + while (inrec && CompareUids(inrec, torec) < 0) + inrec = Base::Next(); + return (inrec && CompareUids(inrec, torec) == 0); + } + + void Work() { + Base::Close(); + } + + const TRec* Next() { + return Base::Next(); + } + +private: + TString FileName; +}; + +template <class TRec> +class TPushOutDatFile: public TOutDatFile<TRec>, public IDatPushReceiver<TRec> { +public: + 
typedef TOutDatFile<TRec> Base; + + TPushOutDatFile(const char* name, bool open = true) + : Base(name, dbcfg::pg_docuid, dbcfg::fbufsize, 0) + , FileName(name) + { + if (open) + Base::Open(name); + } + + // Opens the file under the name given at construction. + // c_str() replaces the legacy TString operator~, yielding the same pointer. + void Open() { + Base::Open(FileName.c_str()); + } + + void Push(const TRec* rec) { + Base::Push(rec); + } + + // End of the push chain: closes the output file. + void Work() { + Base::Close(); + } + +private: + TString FileName; +}; + +// Drains a pull source into an owned TPushOutDatFile when Work() (inherited from IDatNextToPush) runs. +template <class TRec> +class TNextOutDatFile: public IDatNextToPush<TRec> { +public: + typedef IDatNextToPush<TRec> TBase; + + // File is handed to the base by reference before it is constructed; the base only stores the reference. + TNextOutDatFile(const char* name, IDatNextSource<TRec>& source, bool open = true) + : TBase(source, File) + , File(name, open) + { + } + + void Open() { + File.Open(); + } + +private: + TPushOutDatFile<TRec> File; +}; + +// Pull-chain stage that sorts its whole input with TDatSorterMemo on the first Next() call. +template <class TVal, template <typename T> class TCompare> +class TNextDatSorterMemo: public TDatSorterMemo<TVal, TCompare>, public IDatNextChannel<TVal, TVal> { + typedef TDatSorterMemo<TVal, TCompare> TImpl; + +public: + TNextDatSorterMemo(IDatNextSource<TVal>& source, const char* dir = dbcfg::fname_temp, const char* name = "yet another sorter", size_t memory = dbcfg::small_sorter_size, size_t pagesize = dbcfg::pg_docuid, size_t pages = dbcfg::fbufsize, int pagesOrBytes = 0) + : TImpl(name, memory, pagesize, pages, pagesOrBytes) + , IDatNextChannel<TVal, TVal>(source) + , Sorted(false) + { + TImpl::Open(dir); + } + + // Pushes every source record into the sorter, then sorts. + void Sort() { + const TVal* rec; + while ((rec = IDatNextChannel<TVal, TVal>::Source.Next())) { + TImpl::Push(rec); + } + TImpl::Sort(); + Sorted = true; + } + + // Returns the next sorted record, running Sort() first if it has not run yet. + const TVal* Next() { + if (!Sorted) + Sort(); + return TImpl::Next(); + } + +private: + bool Sorted; + TString Dir; // NOTE(review): never assigned or read in this class. +}; + +template <class TInRec, class TOutRec> +class TDatConverter: public IDatNextChannel<TInRec, TOutRec> { +public: + TDatConverter(IDatNextSource<TInRec>& source) + : IDatNextChannel<TInRec, TOutRec>(source) + { + } + + virtual void Convert(const TInRec& inrec, TOutRec& outrec) { + outrec(inrec); + } + + const TOutRec* Next() { + const TInRec* rec = 
IDatNextChannel<TInRec, TOutRec>::Source.Next(); + if (!rec) + return 0; + Convert(*rec, CurrentRec); + return &CurrentRec; + } + +private: + TOutRec CurrentRec; +}; + +// One output item of TNextDatMerger; a side that did not take part in the item is null. +template <class TRecA, class TRecB> +class TMergeRec { +public: + const TRecA* RecA; + const TRecB* RecB; +}; + +enum NMergeTypes { + MT_JOIN = 0, + MT_ADD = 1, + MT_OVERWRITE = 2, + MT_TYPENUM +}; + +// Merges two pull sources, presumably both ordered consistently with TCompare (not checked here). +// MT_ADD - yields every record of either side, pairing records that compare equal. +// MT_JOIN - yields only pairs that compare equal. +// Any other merge type matches no branch, so Next() returns 0 immediately. +template <class TRecA, class TRecB, template <typename TA, typename TB> class TCompare> +class TNextDatMerger: public IDatNextReceiver<TRecA>, public IDatNextReceiver<TRecB>, public IDatNextSource<TMergeRec<TRecA, TRecB>> { +public: + TNextDatMerger(IDatNextSource<TRecA>& sourceA, IDatNextSource<TRecB>& sourceB, ui8 mergeType) + : IDatNextReceiver<TRecA>(sourceA) + , IDatNextReceiver<TRecB>(sourceB) + , MergeType(mergeType) + , MoveA(false) + , MoveB(false) + , NotInit(true) + { + } + + // Returns the next merged item, or 0 when the merge is finished. The returned object is + // reused by the following call. + const TMergeRec<TRecA, TRecB>* Next() { + // Advance only the sides that were emitted by the previous call (both on the first call). + if (MoveA || NotInit) + SourceARec = IDatNextReceiver<TRecA>::Source.Next(); + if (MoveB || NotInit) + SourceBRec = IDatNextReceiver<TRecB>::Source.Next(); + NotInit = false; + + // Cout << "Next " << SourceARec->HostId << "\t" << SourceBRec->HostId << "\t" << TCompare<TRecA, TRecB>()(SourceARec, SourceBRec) << "\t" << ::compare(SourceARec->HostId, SourceBRec->HostId) << "\t" << ::compare(1, 2) << "\t" << ::compare(2,1) << Endl; + // MT_ADD: A is behind B, or B is exhausted -> emit A alone. + if (MergeType == MT_ADD && SourceARec && (!SourceBRec || TCompare<TRecA, TRecB>()(SourceARec, SourceBRec) < 0)) { + MergeRec.RecA = SourceARec; + MergeRec.RecB = 0; + MoveA = true; + MoveB = false; + return &MergeRec; + } + + // MT_ADD: B is behind A, or A is exhausted -> emit B alone. + // The comparison has to be > 0: with < 0 the "A greater than B" case matched no branch + // and Next() returned 0, cutting the merge short. + if (MergeType == MT_ADD && SourceBRec && (!SourceARec || TCompare<TRecA, TRecB>()(SourceARec, SourceBRec) > 0)) { + MergeRec.RecA = 0; + MergeRec.RecB = SourceBRec; + MoveA = false; + MoveB = true; + return &MergeRec; + } + + // MT_ADD: equal records -> emit both and advance both. + if (MergeType == MT_ADD && SourceARec && SourceBRec && TCompare<TRecA, TRecB>()(SourceARec, SourceBRec) == 0) { + MergeRec.RecA = SourceARec; + MergeRec.RecB = SourceBRec; + MoveA = true; + MoveB = true; + 
return &MergeRec; + } + + // MT_JOIN: skip ahead on whichever side is behind until the records match or a side runs out. + while (MergeType == MT_JOIN && SourceARec && SourceBRec && TCompare<TRecA, TRecB>()(SourceARec, SourceBRec) != 0) { + while (SourceARec && TCompare<TRecA, TRecB>()(SourceARec, SourceBRec) < 0) { + SourceARec = IDatNextReceiver<TRecA>::Source.Next(); + } + while (SourceARec && SourceBRec && TCompare<TRecA, TRecB>()(SourceARec, SourceBRec) > 0) { + SourceBRec = IDatNextReceiver<TRecB>::Source.Next(); + } + } + + if (MergeType == MT_JOIN && SourceARec && SourceBRec) { + MergeRec.RecA = SourceARec; + MergeRec.RecB = SourceBRec; + MoveA = true; + MoveB = true; + return &MergeRec; + } + + MergeRec.RecA = 0; + MergeRec.RecB = 0; + return 0; + } + + // Forwards Work() to both sources. + void Work() { + IDatNextReceiver<TRecA>::Source.Work(); + IDatNextReceiver<TRecB>::Source.Work(); + } + +private: + TMergeRec<TRecA, TRecB> MergeRec; + const TRecA* SourceARec; + const TRecB* SourceBRec; + ui8 MergeType; + bool MoveA; + bool MoveB; + bool NotInit; +}; + +/*template<class TRec, class TSource, template <typename T> class TCompare, class TReceiver = TPushOutDatFile<TRec> > +class TPushDatMerger { +public: + TPushDatMerger(TSource& source, TReceiver& receiver, ui8 mergeType) + : Source(source) + , Receiver(receiver) + , MergeType(mergeType) + { + } + + virtual void Init() { + SourceRec = Source.Next(); + } + + virtual void Push(const TRec* rec) { + while (SourceRec && TCompare<TRec>()(SourceRec, rec, 0) < 0) { + if (MergeType == MT_OVERWRITE || MergeType == MT_ADD) + Receiver.Push(SourceRec); + SourceRec = Source.Next(); + } + + bool intersected = false; + while (SourceRec && TCompare<TRec>()(SourceRec, rec, 0) == 0) { + intersected = true; + if (MergeType == MT_ADD) + Receiver.Push(SourceRec); + SourceRec = Source.Next(); + } + + if (intersected && MergeType == MT_JOIN) + Receiver.Push(rec); + + if (MergeType == MT_OVERWRITE || MergeType == MT_ADD) + Receiver.Push(rec); + } + + virtual void Term() { + if (MergeType == MT_OVERWRITE || MergeType == MT_ADD) { + while (SourceRec) { + 
Receiver.Push(SourceRec); + SourceRec = Source.Next(); + } + } + } + +private: + TSource& Source; + const TRec* SourceRec; + TReceiver& Receiver; + ui8 MergeType; +};*/ + +/*template <class TRec, class TSourceA, class TSourceB, template <typename T> class TCompare, class TReceiver = TPushOutDatFile<TRec> > +class TNextDatMerger: public TPushDatMerger<TRec, TSourceA, TCompare, TReceiver> { + typedef TPushDatMerger<TRec, TSourceA, TCompare, TReceiver> TImpl; +public: + TNextDatMerger(TSourceA& sourceA, TSourceB& sourceB, TReceiver& receiver, ui8 mergeType) + : TImpl(sourceA, receiver, mergeType) + , SourceB(sourceB) + { + } + + virtual void Work() { + TImpl::Init(); + while (SourceBRec = SourceB.Next()) { + TImpl::Push(SourceBRec); + } + TImpl::Term(); + } +private: + TSourceB& SourceB; + const TRec* SourceBRec; +};*/ + +/*template <class TRec, template <typename T> class TCompare, class TReceiver = TPushOutDatFile<TRec> > +class TFilePushDatMerger: public TPushDatMerger<TRec, TFastInDatFile<TRec>, TCompare, TReceiver> { + typedef TPushDatMerger<TRec, TFastInDatFile<TRec>, TCompare, TReceiver> TImpl; +public: + TFilePushDatMerger(const char* name, TReceiver& receiver, ui8 mergeType) + : TImpl(SourceFile, receiver, mergeType) + , SourceFile(name) + { + } + + virtual void Push(const TRec* rec) { + TImpl::Push(rec); + } + + virtual void Term() { + TImpl::Term(); + } +private: + TFastInDatFile<TRec> SourceFile; +};*/ + +/*template <class TRec, template <typename T> class TCompare, class TReceiver = TPushOutDatFile<TRec> > +class TFileNextDatMerger: public TNextDatMerger<TRec, TFastInDatFile<TRec>, TFastInDatFile<TRec>, TCompare, TReceiver> { + typedef TNextDatMerger<TRec, TFastInDatFile<TRec>, TFastInDatFile<TRec>, TCompare, TReceiver> TImpl; +public: + TFileNextDatMerger(const char* sourceAname, const char* sourceBname, TReceiver& receiver, ui8 mergeType) + : TImpl(FileA, FileB, receiver, mergeType) + , FileA(sourceAname) + , FileB(sourceBname) + { + } + + virtual void 
Work() { + TImpl::Work(); + } +private: + TFastInDatFile<TRec> FileA; + TFastInDatFile<TRec> FileB; +};*/ + /* Pull-side filter: Next() skips source records until Check() accepts one and returns it, or returns 0 when the source is exhausted. Check() applies TPredicate<TRec>()(rec) unless overridden. */ +template <class TRec, template <typename T> class TPredicate> +class TDatNextFilter: public IDatNextChannel<TRec, TRec> { +public: + TDatNextFilter(IDatNextSource<TRec>& source) + : IDatNextChannel<TRec, TRec>(source) + { + } + + virtual const TRec* Next() { + const TRec* rec; + while ((rec = IDatNextChannel<TRec, TRec>::Source.Next()) != 0 && !Check(rec)) { + } + if (!rec) + return 0; + return rec; + } + +protected: + virtual bool Check(const TRec* rec) { + return TPredicate<TRec>()(rec); + } +}; + /* Push-side filter: forwards a record to the receiver only when Check() accepts it. */ +template <class TRec, template <typename T> class TPredicate> +class TDatPushFilter: public IDatPushChannel<TRec, TRec> { +public: + TDatPushFilter(IDatPushReceiver<TRec>& receiver) + : IDatPushChannel<TRec, TRec>(receiver) + { + } + + virtual void Push(const TRec* rec) { + if (Check(rec)) + IDatPushChannel<TRec, TRec>::Receiver.Push(rec); + } + +private: + virtual bool Check(const TRec* rec) { + return TPredicate<TRec>()(rec); + } +}; + /* Groups consecutive input records through the OnStart / OnRecord / OnFinish hooks. For each source record Next() calls OnStart() on the very first record, or OnFinish() followed by OnStart() when TCompare<TInRec>()(CurrentRec, LastRec, 0) is non-zero, and then OnRecord(); it returns &OutRec as soon as HasOutput is set (presumably by a hook -- nothing in this class sets it to true). When the source is exhausted OnFinish() is called once and a pending OutRec is returned; after that Next() returns 0. NOTE(review): OnFinish() also runs when the source produced no records at all, i.e. without a preceding OnStart() -- confirm subclasses tolerate that. NOTE(review): LastRec keeps the pointer returned by the previous Source.Next() call; this assumes the source keeps that record valid across the next call -- verify against the sources used. */ +template <class TInRec, class TOutRec, template <typename T> class TCompare> +class TDatGrouper: public IDatNextChannel<TInRec, TOutRec> { +public: + TDatGrouper(IDatNextSource<TInRec>& source) + : IDatNextChannel<TInRec, TOutRec>(source) + , Begin(true) + , Finish(false) + , HasOutput(false) + { + } + + const TOutRec* Next() { + while (CurrentRec = IDatNextChannel<TInRec, TOutRec>::Source.Next()) { + int cmp = 0; + if (Begin) { + Begin = false; + OnStart(); + } else if ((cmp = TCompare<TInRec>()(CurrentRec, LastRec, 0)) != 0) { + OnFinish(); + OnStart(); + } + OnRecord(); + LastRec = CurrentRec; + if (HasOutput) { + HasOutput = false; + return &OutRec; + } + } + if (!Finish) + OnFinish(); + Finish = true; + if (HasOutput) { + HasOutput = false; + return &OutRec; + } + return 0; + } + +protected: + virtual void OnStart() = 0; + virtual void OnRecord() = 0; + virtual void OnFinish() = 0; + + const TInRec* CurrentRec; + const 
TInRec* LastRec; + TOutRec OutRec; + + bool Begin; + bool Finish; + bool HasOutput; +}; |