aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/microbdb
diff options
context:
space:
mode:
authorqrort <qrort@yandex-team.com>2022-11-30 23:47:12 +0300
committerqrort <qrort@yandex-team.com>2022-11-30 23:47:12 +0300
commit22f8ae0e3f5d68b92aecccdf96c1d841a0334311 (patch)
treebffa27765faf54126ad44bcafa89fadecb7a73d7 /library/cpp/microbdb
parent332b99e2173f0425444abb759eebcb2fafaa9209 (diff)
downloadydb-22f8ae0e3f5d68b92aecccdf96c1d841a0334311.tar.gz
validate canons without yatest_common
Diffstat (limited to 'library/cpp/microbdb')
-rw-r--r--library/cpp/microbdb/align.h17
-rw-r--r--library/cpp/microbdb/compressed.h520
-rw-r--r--library/cpp/microbdb/extinfo.h127
-rw-r--r--library/cpp/microbdb/file.cpp220
-rw-r--r--library/cpp/microbdb/file.h225
-rw-r--r--library/cpp/microbdb/hashes.h250
-rw-r--r--library/cpp/microbdb/header.cpp91
-rw-r--r--library/cpp/microbdb/header.h159
-rw-r--r--library/cpp/microbdb/heap.h143
-rw-r--r--library/cpp/microbdb/input.h1027
-rw-r--r--library/cpp/microbdb/microbdb.cpp1
-rw-r--r--library/cpp/microbdb/microbdb.h54
-rw-r--r--library/cpp/microbdb/noextinfo.proto4
-rw-r--r--library/cpp/microbdb/output.h1049
-rw-r--r--library/cpp/microbdb/powersorter.h667
-rw-r--r--library/cpp/microbdb/reader.h354
-rw-r--r--library/cpp/microbdb/safeopen.h792
-rw-r--r--library/cpp/microbdb/sorter.h677
-rw-r--r--library/cpp/microbdb/sorterdef.h19
-rw-r--r--library/cpp/microbdb/utility.h75
-rw-r--r--library/cpp/microbdb/wrappers.h637
21 files changed, 7108 insertions, 0 deletions
diff --git a/library/cpp/microbdb/align.h b/library/cpp/microbdb/align.h
new file mode 100644
index 0000000000..2f8567f134
--- /dev/null
+++ b/library/cpp/microbdb/align.h
@@ -0,0 +1,17 @@
+#pragma once
+
+#include <util/system/defaults.h>
+
+using TDatAlign = int;
+
+static inline size_t DatFloor(size_t size) {
+ return (size - 1) & ~(sizeof(TDatAlign) - 1);
+}
+
+static inline size_t DatCeil(size_t size) {
+ return DatFloor(size) + sizeof(TDatAlign);
+}
+
+static inline void DatSet(void* ptr, size_t size) {
+ *(TDatAlign*)((char*)ptr + DatFloor(size)) = 0;
+}
diff --git a/library/cpp/microbdb/compressed.h b/library/cpp/microbdb/compressed.h
new file mode 100644
index 0000000000..f0c9edfa92
--- /dev/null
+++ b/library/cpp/microbdb/compressed.h
@@ -0,0 +1,520 @@
+#pragma once
+
+#include <util/stream/zlib.h>
+
+#include "microbdb.h"
+#include "safeopen.h"
+
+class TCompressedInputFileManip: public TInputFileManip {
+public:
+ inline i64 GetLength() const {
+ return -1; // Some microbdb logic rely on unknown size of compressed files
+ }
+
+ inline i64 Seek(i64 offset, int whence) {
+ i64 oldPos = DoGetPosition();
+ i64 newPos = offset;
+ switch (whence) {
+ case SEEK_CUR:
+ newPos += oldPos;
+ [[fallthrough]]; // Complier happy. Please fix it!
+ case SEEK_SET:
+ break;
+ default:
+ return -1L;
+ }
+ if (oldPos > newPos) {
+ VerifyRandomAccess();
+ DoSeek(0, SEEK_SET, IsStreamOpen());
+ oldPos = 0;
+ }
+ const size_t bufsize = 1 << 12;
+ char buf[bufsize];
+ for (i64 i = oldPos; i < newPos; i += bufsize)
+ InputStream->Read(buf, (i + (i64)bufsize < newPos) ? bufsize : (size_t)(newPos - i));
+ return newPos;
+ }
+
+ i64 RealSeek(i64 offset, int whence) {
+ InputStream.Destroy();
+ i64 ret = DoSeek(offset, whence, !!CompressedInput);
+ if (ret != -1)
+ DoStreamOpen(DoCreateStream(), true);
+ return ret;
+ }
+
+protected:
+ IInputStream* CreateStream(const TFile& file) override {
+ CompressedInput.Reset(new TUnbufferedFileInput(file));
+ return DoCreateStream();
+ }
+ inline IInputStream* DoCreateStream() {
+ return new TZLibDecompress(CompressedInput.Get(), ZLib::GZip);
+ //return new TLzqDecompress(CompressedInput.Get());
+ }
+ THolder<IInputStream> CompressedInput;
+};
+
+class TCompressedBufferedInputFileManip: public TCompressedInputFileManip {
+protected:
+ IInputStream* CreateStream(const TFile& file) override {
+ CompressedInput.Reset(new TFileInput(file, 0x100000));
+ return DoCreateStream();
+ }
+};
+
+using TCompressedInputPageFile = TInputPageFileImpl<TCompressedInputFileManip>;
+using TCompressedBufferedInputPageFile = TInputPageFileImpl<TCompressedBufferedInputFileManip>;
+
+template <class TVal>
+struct TGzKey {
+ ui64 Offset;
+ TVal Key;
+
+ static const ui32 RecordSig = TVal::RecordSig + 0x50495a47;
+
+ TGzKey() {
+ }
+
+ TGzKey(ui64 offset, const TVal& key)
+ : Offset(offset)
+ , Key(key)
+ {
+ }
+
+ size_t SizeOf() const {
+ if (this)
+ return sizeof(Offset) + ::SizeOf(&Key);
+ else {
+ size_t sizeOfKey = ::SizeOf((TVal*)NULL);
+ return sizeOfKey ? (sizeof(Offset) + sizeOfKey) : 0;
+ }
+ }
+};
+
+template <class TVal>
+class TInZIndexFile: protected TInDatFileImpl<TGzKey<TVal>> {
+ typedef TInDatFileImpl<TGzKey<TVal>> TDatFile;
+ typedef TGzKey<TVal> TGzVal;
+ typedef typename TDatFile::TRecIter TRecIter;
+ typedef typename TRecIter::TPageIter TPageIter;
+
+public:
+ TInZIndexFile()
+ : Index0(nullptr)
+ {
+ }
+
+ int Open(const char* fname, size_t pages = 1, int pagesOrBytes = 1, ui32* gotRecordSig = nullptr) {
+ int ret = TDatFile::Open(fname, pages, pagesOrBytes, gotRecordSig);
+ if (ret)
+ return ret;
+ if (!(Index0 = (TDatPage*)malloc(TPageIter::GetPageSize()))) {
+ TDatFile::Close();
+ return MBDB_NO_MEMORY;
+ }
+ if (SizeOf((TGzVal*)NULL))
+ RecsOnPage = (TPageIter::GetPageSize() - sizeof(TDatPage)) / DatCeil(SizeOf((TGzVal*)NULL));
+ TDatFile::Next();
+ memcpy(Index0, TPageIter::Current(), TPageIter::GetPageSize());
+ return 0;
+ }
+
+ int Close() {
+ free(Index0);
+ Index0 = NULL;
+ return TDatFile::Close();
+ }
+
+ inline int GetError() const {
+ return TDatFile::GetError();
+ }
+
+ int FindKey(const TVal* akey, const typename TExtInfoType<TVal>::TResult* = NULL) {
+ assert(IsOpen());
+ if (!SizeOf((TVal*)NULL))
+ return FindVszKey(akey);
+ int pageno;
+ i64 offset;
+ FindKeyOnPage(pageno, offset, Index0, akey);
+ TDatPage* page = TPageIter::GotoPage(pageno + 1);
+ int num_add = (int)offset;
+ FindKeyOnPage(pageno, offset, page, akey);
+ return pageno + num_add;
+ }
+
+ using TDatFile::IsOpen;
+
+ int FindVszKey(const TVal* akey, const typename TExtInfoType<TVal>::TResult* = NULL) {
+ int pageno;
+ i64 offset;
+ FindVszKeyOnPage(pageno, offset, Index0, akey);
+ TDatPage* page = TPageIter::GotoPage(pageno + 1);
+ int num_add = (int)offset;
+ FindVszKeyOnPage(pageno, offset, page, akey);
+ return pageno + num_add;
+ }
+
+ i64 FindPage(int pageno) {
+ if (!SizeOf((TVal*)NULL))
+ return FindVszPage(pageno);
+ int recsize = DatCeil(SizeOf((TGzVal*)NULL));
+ TDatPage* page = TPageIter::GotoPage(1 + pageno / RecsOnPage);
+ if (!page) // can happen if pageno is beyond EOF
+ return -1;
+ unsigned int localpageno = pageno % RecsOnPage;
+ if (localpageno >= page->RecNum) // can happen if pageno is beyond EOF
+ return -1;
+ TGzVal* v = (TGzVal*)((char*)page + sizeof(TDatPage) + localpageno * recsize);
+ return v->Offset;
+ }
+
+ i64 FindVszPage(int pageno) {
+ TGzVal* cur = (TGzVal*)((char*)Index0 + sizeof(TDatPage));
+ TGzVal* prev = cur;
+ unsigned int n = 0;
+ while (n < Index0->RecNum && cur->Offset <= (unsigned int)pageno) {
+ prev = cur;
+ cur = (TGzVal*)((char*)cur + DatCeil(SizeOf(cur)));
+ n++;
+ }
+ TDatPage* page = TPageIter::GotoPage(n);
+ unsigned int num_add = (unsigned int)(prev->Offset);
+ n = 0;
+ cur = (TGzVal*)((char*)page + sizeof(TDatPage));
+ while (n < page->RecNum && n + num_add < (unsigned int)pageno) {
+ cur = (TGzVal*)((char*)cur + DatCeil(SizeOf(cur)));
+ n++;
+ }
+ if (n == page->RecNum) // can happen if pageno is beyond EOF
+ return -1;
+ return cur->Offset;
+ }
+
+protected:
+ void FindKeyOnPage(int& pageno, i64& offset, TDatPage* page, const TVal* Key) {
+ int left = 0;
+ int right = page->RecNum - 1;
+ int recsize = DatCeil(SizeOf((TGzVal*)NULL));
+ while (left < right) {
+ int middle = (left + right) >> 1;
+ if (((TGzVal*)((char*)page + sizeof(TDatPage) + middle * recsize))->Key < *Key)
+ left = middle + 1;
+ else
+ right = middle;
+ }
+ //borders check (left and right)
+ pageno = (left == 0 || ((TGzVal*)((char*)page + sizeof(TDatPage) + left * recsize))->Key < *Key) ? left : left - 1;
+ offset = ((TGzVal*)((char*)page + sizeof(TDatPage) + pageno * recsize))->Offset;
+ }
+
+ void FindVszKeyOnPage(int& pageno, i64& offset, TDatPage* page, const TVal* key) {
+ TGzVal* cur = (TGzVal*)((char*)page + sizeof(TDatPage));
+ ui32 RecordSig = page->RecNum;
+ i64 tmpoffset = cur->Offset;
+ for (; RecordSig > 0 && cur->Key < *key; --RecordSig) {
+ tmpoffset = cur->Offset;
+ cur = (TGzVal*)((char*)cur + DatCeil(SizeOf(cur)));
+ }
+ int idx = page->RecNum - RecordSig - 1;
+ pageno = (idx >= 0) ? idx : 0;
+ offset = tmpoffset;
+ }
+
+ TDatPage* Index0;
+ int RecsOnPage;
+};
+
+template <class TKey>
+class TCompressedIndexedInputPageFile: public TCompressedInputPageFile {
+public:
+ int GotoPage(int pageno);
+
+protected:
+ TInZIndexFile<TKey> KeyFile;
+};
+
+template <class TVal, class TKey>
+class TDirectCompressedInDatFile: public TDirectInDatFile<TVal, TKey,
+ TInDatFileImpl<TVal, TInputRecordIterator<TVal,
+ TInputPageIterator<TCompressedIndexedInputPageFile<TKey>>>>> {
+};
+
+class TCompressedOutputFileManip: public TOutputFileManip {
+public:
+ inline i64 GetLength() const {
+ return -1; // Some microbdb logic rely on unknown size of compressed files
+ }
+
+ inline i64 Seek(i64 offset, int whence) {
+ i64 oldPos = DoGetPosition();
+ i64 newPos = offset;
+ switch (whence) {
+ case SEEK_CUR:
+ newPos += oldPos;
+ [[fallthrough]]; // Compler happy. Please fix it!
+ case SEEK_SET:
+ break;
+ default:
+ return -1L;
+ }
+ if (oldPos > newPos)
+ return -1L;
+
+ const size_t bufsize = 1 << 12;
+ char buf[bufsize] = {0};
+ for (i64 i = oldPos; i < newPos; i += bufsize)
+ OutputStream->Write(buf, (i + (i64)bufsize < newPos) ? bufsize : (size_t)(newPos - i));
+ return newPos;
+ }
+
+ i64 RealSeek(i64 offset, int whence) {
+ OutputStream.Destroy();
+ i64 ret = DoSeek(offset, whence, !!CompressedOutput);
+ if (ret != -1)
+ DoStreamOpen(DoCreateStream(), true);
+ return ret;
+ }
+
+protected:
+ IOutputStream* CreateStream(const TFile& file) override {
+ CompressedOutput.Reset(new TUnbufferedFileOutput(file));
+ return DoCreateStream();
+ }
+ inline IOutputStream* DoCreateStream() {
+ return new TZLibCompress(CompressedOutput.Get(), ZLib::GZip, 1);
+ }
+ THolder<IOutputStream> CompressedOutput;
+};
+
+class TCompressedBufferedOutputFileManip: public TCompressedOutputFileManip {
+protected:
+ IOutputStream* CreateStream(const TFile& file) override {
+ CompressedOutput.Reset(new TUnbufferedFileOutput(file));
+ return DoCreateStream();
+ }
+ inline IOutputStream* DoCreateStream() {
+ return new TZLibCompress(CompressedOutput.Get(), ZLib::GZip, 1, 0x100000);
+ }
+};
+
+using TCompressedOutputPageFile = TOutputPageFileImpl<TCompressedOutputFileManip>;
+using TCompressedBufferedOutputPageFile = TOutputPageFileImpl<TCompressedBufferedOutputFileManip>;
+
+template <class TVal>
+class TOutZIndexFile: public TOutDatFileImpl<
+ TGzKey<TVal>,
+ TOutputRecordIterator<TGzKey<TVal>, TOutputPageIterator<TOutputPageFile>, TCallbackIndexer>> {
+ typedef TOutDatFileImpl<
+ TGzKey<TVal>,
+ TOutputRecordIterator<TGzKey<TVal>, TOutputPageIterator<TOutputPageFile>, TCallbackIndexer>>
+ TDatFile;
+ typedef TOutZIndexFile<TVal> TMyType;
+ typedef TGzKey<TVal> TGzVal;
+ typedef typename TDatFile::TRecIter TRecIter;
+ typedef typename TRecIter::TPageIter TPageIter;
+ typedef typename TRecIter::TIndexer TIndexer;
+
+public:
+ TOutZIndexFile() {
+ TotalRecNum = 0;
+ TIndexer::SetCallback(this, DispatchCallback);
+ }
+
+ int Open(const char* fname, size_t pagesize, size_t pages, int pagesOrBytes = 1) {
+ int ret = TDatFile::Open(fname, pagesize, pages, pagesOrBytes);
+ if (ret)
+ return ret;
+ if ((ret = TRecIter::GotoPage(1)))
+ TDatFile::Close();
+ return ret;
+ }
+
+ int Close() {
+ TPageIter::Unfreeze();
+ if (TRecIter::RecNum)
+ NextPage(TPageIter::Current());
+ int ret = 0;
+ if (Index0.size() && !(ret = TRecIter::GotoPage(0))) {
+ typename std::vector<TGzVal>::iterator it, end = Index0.end();
+ for (it = Index0.begin(); it != end; ++it)
+ TRecIter::Push(&*it);
+ ret = (TPageIter::GetPageNum() != 0) ? MBDB_PAGE_OVERFLOW : TPageIter::GetError();
+ }
+ Index0.clear();
+ int ret1 = TDatFile::Close();
+ return ret ? ret : ret1;
+ }
+
+protected:
+ int TotalRecNum; // should be enough because we have GotoPage(int)
+ std::vector<TGzVal> Index0;
+
+ void NextPage(const TDatPage* page) {
+ TGzVal* rec = (TGzVal*)((char*)page + sizeof(TDatPage));
+ Index0.push_back(TGzVal(TotalRecNum, rec->Key));
+ TotalRecNum += TRecIter::RecNum;
+ }
+
+ static void DispatchCallback(void* This, const TDatPage* page) {
+ ((TMyType*)This)->NextPage(page);
+ }
+};
+
+template <class TVal, class TKey, class TPageFile = TCompressedOutputPageFile>
+class TOutDirectCompressedFileImpl: public TOutDatFileImpl<
+ TVal,
+ TOutputRecordIterator<TVal, TOutputPageIterator<TPageFile>, TCallbackIndexer>> {
+ typedef TOutDatFileImpl<
+ TVal,
+ TOutputRecordIterator<TVal, TOutputPageIterator<TPageFile>, TCallbackIndexer>>
+ TDatFile;
+ typedef TOutDirectCompressedFileImpl<TVal, TKey, TPageFile> TMyType;
+ typedef typename TDatFile::TRecIter TRecIter;
+ typedef typename TRecIter::TPageIter TPageIter;
+ typedef typename TRecIter::TIndexer TIndexer;
+ typedef TGzKey<TKey> TMyKey;
+ typedef TOutZIndexFile<TKey> TKeyFile;
+
+protected:
+ using TDatFile::Tell;
+
+public:
+ TOutDirectCompressedFileImpl() {
+ TIndexer::SetCallback(this, DispatchCallback);
+ }
+
+ int Open(const char* fname, size_t pagesize, size_t ipagesize = 0) {
+ char iname[FILENAME_MAX];
+ int ret;
+ if (ipagesize == 0)
+ ipagesize = pagesize;
+
+ ret = TDatFile::Open(fname, pagesize, 1, 1);
+ ret = ret ? ret : DatNameToIdx(iname, fname);
+ ret = ret ? ret : KeyFile.Open(iname, ipagesize, 1, 1);
+ if (ret)
+ TDatFile::Close();
+ return ret;
+ }
+
+ int Close() {
+ if (TRecIter::RecNum)
+ NextPage(TPageIter::Current());
+ int ret = KeyFile.Close();
+ int ret1 = TDatFile::Close();
+ return ret1 ? ret1 : ret;
+ }
+
+ int GetError() const {
+ return TDatFile::GetError() ? TDatFile::GetError() : KeyFile.GetError();
+ }
+
+protected:
+ TKeyFile KeyFile;
+
+ void NextPage(const TDatPage* page) {
+ size_t sz = SizeOf((TMyKey*)NULL);
+ TMyKey* rec = KeyFile.Reserve(sz ? sz : MaxSizeOf<TMyKey>());
+ if (rec) {
+ rec->Offset = Tell();
+ rec->Key = *(TVal*)((char*)page + sizeof(TDatPage));
+ KeyFile.ResetDat();
+ }
+ }
+
+ static void DispatchCallback(void* This, const TDatPage* page) {
+ ((TMyType*)This)->NextPage(page);
+ }
+};
+
+template <class TKey>
+int TCompressedIndexedInputPageFile<TKey>::GotoPage(int pageno) {
+ if (Error)
+ return Error;
+
+ Eof = 0;
+
+ i64 offset = KeyFile.FindPage(pageno);
+ if (!offset)
+ return Error = MBDB_BAD_FILE_SIZE;
+
+ if (offset != FileManip.RealSeek(offset, SEEK_SET))
+ Error = MBDB_BAD_FILE_SIZE;
+
+ return Error;
+}
+
+template <typename TVal>
+class TCompressedInDatFile: public TInDatFile<TVal, TCompressedInputPageFile> {
+public:
+ TCompressedInDatFile(const char* name, size_t pages, int pagesOrBytes = 1)
+ : TInDatFile<TVal, TCompressedInputPageFile>(name, pages, pagesOrBytes)
+ {
+ }
+};
+
+template <typename TVal>
+class TCompressedOutDatFile: public TOutDatFile<TVal, TFakeCompression, TCompressedOutputPageFile> {
+public:
+ TCompressedOutDatFile(const char* name, size_t pagesize, size_t pages, int pagesOrBytes = 1)
+ : TOutDatFile<TVal, TFakeCompression, TCompressedOutputPageFile>(name, pagesize, pages, pagesOrBytes)
+ {
+ }
+};
+
+template <typename TVal, typename TKey, typename TPageFile = TCompressedOutputPageFile>
+class TOutDirectCompressedFile: protected TOutDirectCompressedFileImpl<TVal, TKey, TPageFile> {
+ typedef TOutDirectCompressedFileImpl<TVal, TKey, TPageFile> TBase;
+
+public:
+ TOutDirectCompressedFile(const char* name, size_t pagesize, size_t ipagesize = 0)
+ : Name(strdup(name))
+ , PageSize(pagesize)
+ , IdxPageSize(ipagesize)
+ {
+ }
+
+ ~TOutDirectCompressedFile() {
+ Close();
+ free(Name);
+ Name = NULL;
+ }
+
+ void Open(const char* fname) {
+ int ret = TBase::Open(fname, PageSize, IdxPageSize);
+ if (ret)
+ ythrow yexception() << ErrorMessage(ret, "Failed to open output file", fname);
+ free(Name);
+ Name = strdup(fname);
+ }
+
+ void Close() {
+ int ret;
+ if ((ret = TBase::GetError()))
+ if (!std::uncaught_exception())
+ ythrow yexception() << ErrorMessage(ret, "Error before closing output file", Name);
+ if ((ret = TBase::Close()))
+ if (!std::uncaught_exception())
+ ythrow yexception() << ErrorMessage(ret, "Error while closing output file", Name);
+ }
+
+ const char* GetName() const {
+ return Name;
+ }
+
+ using TBase::Freeze;
+ using TBase::Push;
+ using TBase::Reserve;
+ using TBase::Unfreeze;
+
+protected:
+ char* Name;
+ size_t PageSize, IdxPageSize;
+};
+
+class TCompressedInterFileTypes {
+public:
+ typedef TCompressedBufferedOutputPageFile TOutPageFile;
+ typedef TCompressedBufferedInputPageFile TInPageFile;
+};
diff --git a/library/cpp/microbdb/extinfo.h b/library/cpp/microbdb/extinfo.h
new file mode 100644
index 0000000000..c8389e783c
--- /dev/null
+++ b/library/cpp/microbdb/extinfo.h
@@ -0,0 +1,127 @@
+#pragma once
+
+#include "header.h"
+
+#include <library/cpp/packedtypes/longs.h>
+
+#include <util/generic/typetraits.h>
+
+#include <library/cpp/microbdb/noextinfo.pb.h>
+
+inline bool operator<(const TNoExtInfo&, const TNoExtInfo&) {
+ return false;
+}
+
+namespace NMicroBDB {
+ Y_HAS_MEMBER(TExtInfo);
+
+ template <class, bool>
+ struct TSelectExtInfo;
+
+ template <class T>
+ struct TSelectExtInfo<T, false> {
+ typedef TNoExtInfo TExtInfo;
+ };
+
+ template <class T>
+ struct TSelectExtInfo<T, true> {
+ typedef typename T::TExtInfo TExtInfo;
+ };
+
+ template <class T>
+ class TExtInfoType {
+ public:
+ static const bool Exists = THasTExtInfo<T>::value;
+ typedef typename TSelectExtInfo<T, Exists>::TExtInfo TResult;
+ };
+
+ Y_HAS_MEMBER(MakeExtKey);
+
+ template <class, class, bool>
+ struct TSelectMakeExtKey;
+
+ template <class TVal, class TKey>
+ struct TSelectMakeExtKey<TVal, TKey, false> {
+ static inline void Make(TKey* to, typename TExtInfoType<TKey>::TResult*, const TVal* from, const typename TExtInfoType<TVal>::TResult*) {
+ *to = *from;
+ }
+ };
+
+ template <class TVal, class TKey>
+ struct TSelectMakeExtKey<TVal, TKey, true> {
+ static inline void Make(TKey* to, typename TExtInfoType<TKey>::TResult* toExt, const TVal* from, const typename TExtInfoType<TVal>::TResult* fromExt) {
+ TVal::MakeExtKey(to, toExt, from, fromExt);
+ }
+ };
+
+ template <typename T>
+ inline size_t SizeOfExt(const T* rec, size_t* /*out*/ extLenSize = nullptr, size_t* /*out*/ extSize = nullptr) {
+ if (!TExtInfoType<T>::Exists) {
+ if (extLenSize)
+ *extLenSize = 0;
+ if (extSize)
+ *extSize = 0;
+ return SizeOf(rec);
+ } else {
+ size_t sz = SizeOf(rec);
+ i64 l;
+ int els = in_long(l, (const char*)rec + sz);
+ if (extLenSize)
+ *extLenSize = static_cast<size_t>(els);
+ if (extSize)
+ *extSize = static_cast<size_t>(l);
+ return sz;
+ }
+ }
+
+ template <class T>
+ bool GetExtInfo(const T* rec, typename TExtInfoType<T>::TResult* extInfo) {
+ Y_VERIFY(TExtInfoType<T>::Exists, "GetExtInfo should only be used with extended records");
+ if (!rec)
+ return false;
+ size_t els;
+ size_t es;
+ size_t s = SizeOfExt(rec, &els, &es);
+ const ui8* raw = (const ui8*)rec + s + els;
+ return extInfo->ParseFromArray(raw, es);
+ }
+
+ template <class T>
+ const ui8* GetExtInfoRaw(const T* rec, size_t* len) {
+ Y_VERIFY(TExtInfoType<T>::Exists, "GetExtInfo should only be used with extended records");
+ if (!rec) {
+ *len = 0;
+ return nullptr;
+ }
+ size_t els;
+ size_t es;
+ size_t s = SizeOfExt(rec, &els, &es);
+ *len = els + es;
+ return (const ui8*)rec + s;
+ }
+
+ // Compares serialized extInfo (e.g. for stable sort)
+ template <class T>
+ int CompareExtInfo(const T* a, const T* b) {
+ Y_VERIFY(TExtInfoType<T>::Exists, "CompareExtInfo should only be used with extended records");
+ size_t elsA, esA;
+ size_t elsB, esB;
+ SizeOfExt(a, &elsA, &esA);
+ SizeOfExt(a, &elsB, &esB);
+ if (esA != esB)
+ return esA - esB;
+ else
+ return memcmp((const ui8*)a + elsA, (const ui8*)b + elsB, esA);
+ }
+
+}
+
+using NMicroBDB::TExtInfoType;
+
+template <class TVal, class TKey>
+struct TMakeExtKey {
+ static const bool Exists = NMicroBDB::THasMakeExtKey<TVal>::value;
+ static inline void Make(TKey* to, typename TExtInfoType<TKey>::TResult* toExt, const TVal* from, const typename TExtInfoType<TVal>::TResult* fromExt) {
+ NMicroBDB::TSelectMakeExtKey<TVal, TKey, Exists>::Make(to, toExt, from, fromExt);
+ }
+};
diff --git a/library/cpp/microbdb/file.cpp b/library/cpp/microbdb/file.cpp
new file mode 100644
index 0000000000..599a7301a0
--- /dev/null
+++ b/library/cpp/microbdb/file.cpp
@@ -0,0 +1,220 @@
+#include "file.h"
+
+#include <fcntl.h>
+#include <errno.h>
+#include <sys/stat.h>
+
+#ifdef _win32_
+#define S_ISREG(x) !!(x & S_IFREG)
+#endif
+
+TFileManipBase::TFileManipBase()
+ : FileBased(true)
+{
+}
+
+i64 TFileManipBase::DoSeek(i64 offset, int whence, bool isStreamOpen) {
+ if (!isStreamOpen)
+ return -1;
+ VerifyRandomAccess();
+ return File.Seek(offset, (SeekDir)whence);
+}
+
+int TFileManipBase::DoFileOpen(const TFile& file) {
+ File = file;
+ SetFileBased(IsFileBased());
+ return (File.IsOpen()) ? 0 : MBDB_OPEN_ERROR;
+}
+
+int TFileManipBase::DoFileClose() {
+ if (File.IsOpen()) {
+ File.Close();
+ return MBDB_ALREADY_INITIALIZED;
+ }
+ return 0;
+}
+
+int TFileManipBase::IsFileBased() const {
+ bool fileBased = true;
+#if defined(_win_)
+#elif defined(_unix_)
+ FHANDLE h = File.GetHandle();
+ struct stat sb;
+ fileBased = false;
+ if (h != INVALID_FHANDLE && !::fstat(h, &sb) && S_ISREG(sb.st_mode)) {
+ fileBased = true;
+ }
+#else
+#error
+#endif
+ return fileBased;
+}
+
+TInputFileManip::TInputFileManip()
+ : InputStream(nullptr)
+{
+}
+
+int TInputFileManip::Open(const char* fname, bool direct) {
+ int ret;
+ return (ret = DoClose()) ? ret : DoStreamOpen(TFile(fname, RdOnly | (direct ? DirectAligned : EOpenMode())));
+}
+
+int TInputFileManip::Open(IInputStream& input) {
+ int ret;
+ return (ret = DoClose()) ? ret : DoStreamOpen(&input);
+}
+
+int TInputFileManip::Open(TAutoPtr<IInputStream> input) {
+ int ret;
+ return (ret = DoClose()) ? ret : DoStreamOpen(input.Release());
+}
+
+int TInputFileManip::Init(const TFile& file) {
+ int ret;
+ if (ret = DoClose())
+ return ret;
+ DoStreamOpen(file);
+ return 0;
+}
+
+int TInputFileManip::Close() {
+ DoClose();
+ return 0;
+}
+
+ssize_t TInputFileManip::Read(void* buf, unsigned len) {
+ if (!IsStreamOpen())
+ return -1;
+ return InputStream->Load(buf, len);
+}
+
+IInputStream* TInputFileManip::CreateStream(const TFile& file) {
+ return new TUnbufferedFileInput(file);
+}
+
+TMappedInputPageFile::TMappedInputPageFile()
+ : Pagesize(0)
+ , Error(0)
+ , Pagenum(0)
+ , Recordsig(0)
+ , Open(false)
+{
+ Term();
+}
+
+TMappedInputPageFile::~TMappedInputPageFile() {
+ Term();
+}
+
+int TMappedInputPageFile::Init(const char* fname, ui32 recsig, ui32* gotRecordSig, bool) {
+ Mappedfile.init(fname);
+ Open = true;
+
+ TDatMetaPage* meta = (TDatMetaPage*)Mappedfile.getData();
+ if (gotRecordSig)
+ *gotRecordSig = meta->RecordSig;
+
+ if (meta->MetaSig != METASIG)
+ Error = MBDB_BAD_METAPAGE;
+ else if (meta->RecordSig != recsig)
+ Error = MBDB_BAD_RECORDSIG;
+
+ if (Error) {
+ Mappedfile.term();
+ return Error;
+ }
+
+ size_t fsize = Mappedfile.getSize();
+ if (fsize < METASIZE)
+ return Error = MBDB_BAD_FILE_SIZE;
+ fsize -= METASIZE;
+ if (fsize % meta->PageSize)
+ return Error = MBDB_BAD_FILE_SIZE;
+ Pagenum = (int)(fsize / meta->PageSize);
+ Pagesize = meta->PageSize;
+ Recordsig = meta->RecordSig;
+ Error = 0;
+ return Error;
+}
+
+int TMappedInputPageFile::Term() {
+ Mappedfile.term();
+ Open = false;
+ return 0;
+}
+
+TOutputFileManip::TOutputFileManip()
+ : OutputStream(nullptr)
+{
+}
+
+int TOutputFileManip::Open(const char* fname, EOpenMode mode) {
+ if (IsStreamOpen()) {
+ return MBDB_ALREADY_INITIALIZED; // should it be closed as TInputFileManip
+ }
+
+ try {
+ if (unlink(fname) && errno != ENOENT) {
+ if (strncmp(fname, "/dev/std", 8))
+ return MBDB_OPEN_ERROR;
+ }
+ TFile file(fname, mode);
+ DoStreamOpen(file);
+ } catch (const TFileError&) {
+ return MBDB_OPEN_ERROR;
+ }
+ return 0;
+}
+
+int TOutputFileManip::Open(IOutputStream& output) {
+ if (IsStreamOpen())
+ return MBDB_ALREADY_INITIALIZED;
+ DoStreamOpen(&output);
+ return 0;
+}
+
+int TOutputFileManip::Open(TAutoPtr<IOutputStream> output) {
+ if (IsStreamOpen())
+ return MBDB_ALREADY_INITIALIZED;
+ DoStreamOpen(output.Release());
+ return 0;
+}
+
+int TOutputFileManip::Init(const TFile& file) {
+ if (IsStreamOpen())
+ return MBDB_ALREADY_INITIALIZED; // should it be closed as TInputFileManip
+ DoStreamOpen(file);
+ return 0;
+}
+
+int TOutputFileManip::Rotate(const char* newfname) {
+ if (!IsStreamOpen()) {
+ return MBDB_NOT_INITIALIZED;
+ }
+
+ try {
+ TFile file(newfname, WrOnly | OpenAlways | TruncExisting | ARW | AWOther);
+ DoClose();
+ DoStreamOpen(file);
+ } catch (const TFileError&) {
+ return MBDB_OPEN_ERROR;
+ }
+ return 0;
+}
+
+int TOutputFileManip::Close() {
+ DoClose();
+ return 0;
+}
+
+int TOutputFileManip::Write(const void* buf, unsigned len) {
+ if (!IsStreamOpen())
+ return -1;
+ OutputStream->Write(buf, len);
+ return len;
+}
+
+IOutputStream* TOutputFileManip::CreateStream(const TFile& file) {
+ return new TUnbufferedFileOutput(file);
+}
diff --git a/library/cpp/microbdb/file.h b/library/cpp/microbdb/file.h
new file mode 100644
index 0000000000..f7c7818375
--- /dev/null
+++ b/library/cpp/microbdb/file.h
@@ -0,0 +1,225 @@
+#pragma once
+
+#include "header.h"
+
+#include <library/cpp/deprecated/mapped_file/mapped_file.h>
+
+#include <util/generic/noncopyable.h>
+#include <util/stream/file.h>
+#include <util/system/filemap.h>
+
+#define FS_BLOCK_SIZE 512
+
+class TFileManipBase {
+protected:
+ TFileManipBase();
+
+ virtual ~TFileManipBase() {
+ }
+
+ i64 DoSeek(i64 offset, int whence, bool isStreamOpen);
+
+ int DoFileOpen(const TFile& file);
+
+ int DoFileClose();
+
+ int IsFileBased() const;
+
+ inline void SetFileBased(bool fileBased) {
+ FileBased = fileBased;
+ }
+
+ inline i64 DoGetPosition() const {
+ Y_ASSERT(FileBased);
+ return File.GetPosition();
+ }
+
+ inline i64 DoGetLength() const {
+ return (FileBased) ? File.GetLength() : -1;
+ }
+
+ inline void VerifyRandomAccess() const {
+ Y_VERIFY(FileBased, "non-file stream can not be accessed randomly");
+ }
+
+ inline i64 GetPosition() const {
+ return (i64)File.GetPosition();
+ }
+
+private:
+ TFile File;
+ bool FileBased;
+};
+
+class TInputFileManip: public TFileManipBase {
+public:
+ using TFileManipBase::GetPosition;
+
+ TInputFileManip();
+
+ int Open(const char* fname, bool direct = false);
+
+ int Open(IInputStream& input);
+
+ int Open(TAutoPtr<IInputStream> input);
+
+ int Init(const TFile& file);
+
+ int Close();
+
+ ssize_t Read(void* buf, unsigned len);
+
+ inline bool IsOpen() const {
+ return IsStreamOpen();
+ }
+
+ inline i64 GetLength() const {
+ return DoGetLength();
+ }
+
+ inline i64 Seek(i64 offset, int whence) {
+ return DoSeek(offset, whence, IsStreamOpen());
+ }
+
+ inline i64 RealSeek(i64 offset, int whence) {
+ return Seek(offset, whence);
+ }
+
+protected:
+ inline bool IsStreamOpen() const {
+ return !!InputStream;
+ }
+
+ inline int DoStreamOpen(IInputStream* input, bool fileBased = false) {
+ InputStream.Reset(input);
+ SetFileBased(fileBased);
+ return 0;
+ }
+
+ inline int DoStreamOpen(const TFile& file) {
+ int ret;
+ return (ret = DoFileOpen(file)) ? ret : DoStreamOpen(CreateStream(file), IsFileBased());
+ }
+
+ virtual IInputStream* CreateStream(const TFile& file);
+
+ inline bool DoClose() {
+ if (IsStreamOpen()) {
+ InputStream.Destroy();
+ return DoFileClose();
+ }
+ return 0;
+ }
+
+ THolder<IInputStream> InputStream;
+};
+
+class TMappedInputPageFile: private TNonCopyable {
+public:
+ TMappedInputPageFile();
+
+ ~TMappedInputPageFile();
+
+ inline int GetError() const {
+ return Error;
+ }
+
+ inline size_t GetPageSize() const {
+ return Pagesize;
+ }
+
+ inline int GetLastPage() const {
+ return Pagenum;
+ }
+
+ inline ui32 GetRecordSig() const {
+ return Recordsig;
+ }
+
+ inline bool IsOpen() const {
+ return Open;
+ }
+
+ inline char* GetData() const {
+ return Open ? (char*)Mappedfile.getData() : nullptr;
+ }
+
+ inline size_t GetSize() const {
+ return Open ? Mappedfile.getSize() : 0;
+ }
+
+protected:
+ int Init(const char* fname, ui32 recsig, ui32* gotRecordSig = nullptr, bool direct = false);
+
+ int Term();
+
+ TMappedFile Mappedfile;
+ size_t Pagesize;
+ int Error;
+ int Pagenum;
+ ui32 Recordsig;
+ bool Open;
+};
+
+class TOutputFileManip: public TFileManipBase {
+public:
+ TOutputFileManip();
+
+ int Open(const char* fname, EOpenMode mode = WrOnly | CreateAlways | ARW | AWOther);
+
+ int Open(IOutputStream& output);
+
+ int Open(TAutoPtr<IOutputStream> output);
+
+ int Init(const TFile& file);
+
+ int Rotate(const char* newfname);
+
+ int Write(const void* buf, unsigned len);
+
+ int Close();
+
+ inline bool IsOpen() const {
+ return IsStreamOpen();
+ }
+
+ inline i64 GetLength() const {
+ return DoGetLength();
+ }
+
+ inline i64 Seek(i64 offset, int whence) {
+ return DoSeek(offset, whence, IsStreamOpen());
+ }
+
+ inline i64 RealSeek(i64 offset, int whence) {
+ return Seek(offset, whence);
+ }
+
+protected:
+ inline bool IsStreamOpen() const {
+ return !!OutputStream;
+ }
+
+ inline int DoStreamOpen(IOutputStream* output, bool fileBased = false) {
+ OutputStream.Reset(output);
+ SetFileBased(fileBased);
+ return 0;
+ }
+
+ inline int DoStreamOpen(const TFile& file) {
+ int ret;
+ return (ret = DoFileOpen(file)) ? ret : DoStreamOpen(CreateStream(file), true);
+ }
+
+ virtual IOutputStream* CreateStream(const TFile& file);
+
+ inline bool DoClose() {
+ if (IsStreamOpen()) {
+ OutputStream.Destroy();
+ return DoFileClose();
+ }
+ return 0;
+ }
+
+ THolder<IOutputStream> OutputStream;
+};
diff --git a/library/cpp/microbdb/hashes.h b/library/cpp/microbdb/hashes.h
new file mode 100644
index 0000000000..bfd113c3ba
--- /dev/null
+++ b/library/cpp/microbdb/hashes.h
@@ -0,0 +1,250 @@
+#pragma once
+
+#include <library/cpp/on_disk/st_hash/static_hash.h>
+#include <util/system/sysstat.h>
+#include <util/stream/mem.h>
+#include <util/string/printf.h>
+#include <library/cpp/deprecated/fgood/fgood.h>
+
+#include "safeopen.h"
+
+/** This file currently implements creation of mappable read-only hash file.
+ Basic usage of these "static hashes" is defined in util/static_hash.h (see docs there).
+ Additional useful wrappers are available in util/static_hash_map.h
+
+ There are two ways to create mappable hash file:
+
+ A) Fill an THashMap/set structure in RAM, then dump it to disk.
+ This is usually done by save_hash_to_file* functions defined in static_hash.h
+ (see description in static_hash.h).
+
+ B) Prepare all data using external sorter, then create hash file straight on disk.
+ This approach is necessary when there isn't enough RAM to hold entire original THashMap.
+ Implemented in this file as TStaticHashBuilder class.
+
+ Current implementation's major drawback is that the size of the hash must be estimated
+ before the hash is built (bucketCount), which is not always possible.
+ Separate implementation with two sort passes is yet to be done.
+
+ Another problem is that maximum stored size of the element (maxRecSize) must also be
+ known in advance, because we use TDatSorterMemo, etc.
+ */
+
+template <class SizeType>
+struct TSthashTmpRec {
+ SizeType HashVal;
+ SizeType RecSize;
+ char Buf[1];
+ size_t SizeOf() const {
+ return &Buf[RecSize] - (char*)this;
+ }
+ bool operator<(const TSthashTmpRec& than) const {
+ return HashVal < than.HashVal;
+ }
+ static const ui32 RecordSig = 20100124 + sizeof(SizeType) - 4;
+};
+
+template <typename T>
+struct TReplaceMerger {
+ T operator()(const T& oldRecord, const T& newRecord) const {
+ Y_UNUSED(oldRecord);
+ return newRecord;
+ }
+};
+
+/** TStaticHashBuilder template parameters:
+ HashType - THashMap map/set type for which we construct corresponding mappable hash;
+ SizeType - type used to store offsets and length in resulting hash;
+ MergerType - type of object to process records with equal key (see TReplaceMerger for example);
+ */
+
+template <class HashType, class SizeType, class MergerType = TReplaceMerger<typename HashType::mapped_type>>
+struct TStaticHashBuilder {
+ const size_t SrtIOPageSz;
+ const size_t WrBufSz;
+ typedef TSthashTmpRec<SizeType> TIoRec;
+ typedef TSthashWriter<typename HashType::key_type, typename HashType::mapped_type, SizeType> TKeySaver;
+ typedef typename HashType::value_type TValueType;
+ typedef typename HashType::mapped_type TMappedType;
+ typedef typename HashType::key_type TKeyType;
+
+ TDatSorterMemo<TIoRec, TCompareByLess> Srt;
+ TBuffer IoRec, CurrentBlockRecs;
+ TKeySaver KeySaver;
+ typename HashType::hasher Hasher;
+ typename HashType::key_equal Equals;
+ MergerType merger;
+ TString HashFileName;
+ TString OurTmpDir;
+ size_t BucketCount;
+ int FreeBits;
+
+ // memSz is the Sorter buffer size;
+ // maxRecSize is the maximum size (as reported by size_for_st) of our record(s)
+ TStaticHashBuilder(size_t memSz, size_t maxRecSize)
+ : SrtIOPageSz((maxRecSize * 16 + 65535) & ~size_t(65535))
+ , WrBufSz(memSz / 16 >= SrtIOPageSz ? memSz / 16 : SrtIOPageSz)
+ , Srt("unused", memSz, SrtIOPageSz, WrBufSz, 0)
+ , IoRec(sizeof(TIoRec) + maxRecSize)
+ , CurrentBlockRecs(sizeof(TIoRec) + maxRecSize)
+ , BucketCount(0)
+ , FreeBits(0)
+ {
+ }
+
+ ~TStaticHashBuilder() {
+ Close();
+ }
+
+ // if tmpDir is supplied, it must exist;
+ // bucketCount should be HashBucketCount() of the (estimated) element count
+ void Open(const char* fname, size_t bucketCount, const char* tmpDir = nullptr) {
+ if (!tmpDir)
+ tmpDir = ~(OurTmpDir = Sprintf("%s.temp", fname));
+ Mkdir(tmpDir, MODE0775);
+ Srt.Open(tmpDir);
+ HashFileName = fname;
+ BucketCount = bucketCount;
+ int bitCount = 0;
+ while (((size_t)1 << bitCount) <= BucketCount && bitCount < int(8 * sizeof(size_t)))
+ ++bitCount;
+ FreeBits = 8 * sizeof(size_t) - bitCount;
+ }
+
+ void Push(const TValueType& rec) {
+ TIoRec* ioRec = MakeIoRec(rec);
+ Srt.Push(ioRec);
+ }
+ TIoRec* MakeIoRec(const TValueType& rec) {
+ TIoRec* ioRec = (TIoRec*)IoRec.Data();
+ size_t mask = (1 << FreeBits) - 1;
+ size_t hash = Hasher(rec.first);
+ ioRec->HashVal = ((hash % BucketCount) << FreeBits) + ((hash / BucketCount) & mask);
+
+ TMemoryOutput output(ioRec->Buf, IoRec.Capacity() - offsetof(TIoRec, Buf));
+ KeySaver.SaveRecord(&output, rec);
+ ioRec->RecSize = output.Buf() - ioRec->Buf;
+ return ioRec;
+ }
+
+ bool Merge(TVector<std::pair<TKeyType, TMappedType>>& records, size_t newRecordSize) {
+ TSthashIterator<const TKeyType, const TMappedType, typename HashType::hasher,
+ typename HashType::key_equal>
+ newPtr(CurrentBlockRecs.End() - newRecordSize);
+ for (size_t i = 0; i < records.size(); ++i) {
+ if (newPtr.KeyEquals(Equals, records[i].first)) {
+ TMappedType oldValue = records[i].second;
+ TMappedType newValue = newPtr.Value();
+ newValue = merger(oldValue, newValue);
+ records[i].second = newValue;
+ return true;
+ }
+ }
+ records.push_back(std::make_pair(newPtr.Key(), newPtr.Value()));
+ return false;
+ }
+
+ void PutRecord(const char* buf, size_t rec_size, TFILEPtr& f, SizeType& cur_off) {
+ f.fsput(buf, rec_size);
+ cur_off += rec_size;
+ }
+
+ void Finish() {
+ Srt.Sort();
+ // We use variant 1.
+ // Variant 1: read sorter once, write records, fseeks to write buckets
+ // (this doesn't allow fname to be stdout)
+ // Variant 2: read sorter (probably temp. file) twice: write buckets, then write records
+ // (this allows fname to be stdout but seems to be longer)
+ TFILEPtr f(HashFileName, "wb");
+ setvbuf(f, nullptr, _IOFBF, WrBufSz);
+ TVector<SizeType> bucketsBuf(WrBufSz, 0);
+ // prepare header (note: this code must be unified with save_stl.h)
+ typedef sthashtable_nvm_sv<typename HashType::hasher, typename HashType::key_equal, SizeType> sv_type;
+ sv_type sv = {Hasher, Equals, BucketCount, 0, 0};
+ // to do: m.b. use just the size of corresponding object?
+ SizeType cur_off = sizeof(sv_type) +
+ (sv.num_buckets + 1) * sizeof(SizeType);
+ SizeType bkt_wroff = sizeof(sv_type), bkt_bufpos = 0, prev_bkt = 0, prev_hash = (SizeType)-1;
+ bucketsBuf[bkt_bufpos++] = cur_off;
+ // if might me better to write many zeroes here
+ f.seek(cur_off, SEEK_SET);
+ TVector<std::pair<TKeyType, TMappedType>> currentBlock;
+ bool emptyFile = true;
+ size_t prevRecSize = 0;
+ // seek forward
+ while (true) {
+ const TIoRec* rec = Srt.Next();
+ if (currentBlock.empty() && !emptyFile) {
+ if (rec && prev_hash == rec->HashVal) {
+ Merge(currentBlock, prevRecSize);
+ } else {
+ // if there is only one record with this hash, don't recode it, just write
+ PutRecord(CurrentBlockRecs.Data(), prevRecSize, f, cur_off);
+ sv.num_elements++;
+ }
+ }
+ if (!rec || prev_hash != rec->HashVal) {
+ // write buckets table
+ for (size_t i = 0; i < currentBlock.size(); ++i) {
+ TIoRec* ioRec = MakeIoRec(TValueType(currentBlock[i]));
+ PutRecord(ioRec->Buf, ioRec->RecSize, f, cur_off);
+ }
+ sv.num_elements += currentBlock.size();
+ currentBlock.clear();
+ CurrentBlockRecs.Clear();
+ if (rec) {
+ prev_hash = rec->HashVal;
+ }
+ }
+ // note: prev_bkt's semantics here is 'cur_bkt - 1', thus we are actually cycling
+ // until cur_bkt == rec->HashVal *inclusively*
+ while (!rec || prev_bkt != (rec->HashVal >> FreeBits)) {
+ bucketsBuf[bkt_bufpos++] = cur_off;
+ if (bkt_bufpos == bucketsBuf.size()) {
+ f.seek(bkt_wroff, SEEK_SET);
+ size_t sz = bkt_bufpos * sizeof(bucketsBuf[0]);
+ if (f.write(bucketsBuf.begin(), 1, sz) != sz)
+ throw yexception() << "could not write " << sz << " bytes to " << ~HashFileName;
+ bkt_wroff += sz;
+ bkt_bufpos = 0;
+ f.seek(cur_off, SEEK_SET);
+ }
+ prev_bkt++;
+ if (!rec) {
+ break;
+ }
+ assert(prev_bkt < BucketCount);
+ }
+ if (!rec) {
+ break;
+ }
+ emptyFile = false;
+ CurrentBlockRecs.Append(rec->Buf, rec->RecSize);
+ if (!currentBlock.empty()) {
+ Merge(currentBlock, rec->RecSize);
+ } else {
+ prevRecSize = rec->RecSize;
+ }
+ }
+ // finish buckets table
+ f.seek(bkt_wroff, SEEK_SET);
+ size_t sz = bkt_bufpos * sizeof(bucketsBuf[0]);
+ if (sz && f.write(bucketsBuf.begin(), 1, sz) != sz)
+ throw yexception() << "could not write " << sz << " bytes to " << ~HashFileName;
+ bkt_wroff += sz;
+ for (; prev_bkt < BucketCount; prev_bkt++)
+ f.fput(cur_off);
+ // finally write header
+ sv.data_end_off = cur_off;
+ f.seek(0, SEEK_SET);
+ f.fput(sv);
+ f.close();
+ }
+
+ void Close() {
+ Srt.Close();
+ if (+OurTmpDir)
+ rmdir(~OurTmpDir);
+ }
+};
diff --git a/library/cpp/microbdb/header.cpp b/library/cpp/microbdb/header.cpp
new file mode 100644
index 0000000000..f4511d6fb6
--- /dev/null
+++ b/library/cpp/microbdb/header.cpp
@@ -0,0 +1,91 @@
+#include "header.h"
+
+#include <util/stream/output.h>
+#include <util/stream/format.h>
+
+TString ToString(EMbdbErrors error) {
+ TString ret;
+ switch (error) {
+ case MBDB_ALREADY_INITIALIZED:
+ ret = "already initialized";
+ break;
+ case MBDB_NOT_INITIALIZED:
+ ret = "not initialized";
+ break;
+ case MBDB_BAD_DESCRIPTOR:
+ ret = "bad descriptor";
+ break;
+ case MBDB_OPEN_ERROR:
+ ret = "open error";
+ break;
+ case MBDB_READ_ERROR:
+ ret = "read error";
+ break;
+ case MBDB_WRITE_ERROR:
+ ret = "write error";
+ break;
+ case MBDB_CLOSE_ERROR:
+ ret = "close error";
+ break;
+ case MBDB_EXPECTED_EOF:
+ ret = "expected eof";
+ break;
+ case MBDB_UNEXPECTED_EOF:
+ ret = "unxepected eof";
+ break;
+ case MBDB_BAD_FILENAME:
+ ret = "bad filename";
+ break;
+ case MBDB_BAD_METAPAGE:
+ ret = "bad metapage";
+ break;
+ case MBDB_BAD_RECORDSIG:
+ ret = "bad recordsig";
+ break;
+ case MBDB_BAD_FILE_SIZE:
+ ret = "bad file size";
+ break;
+ case MBDB_BAD_PAGESIG:
+ ret = "bad pagesig";
+ break;
+ case MBDB_BAD_PAGESIZE:
+ ret = "bad pagesize";
+ break;
+ case MBDB_BAD_PARM:
+ ret = "bad parm";
+ break;
+ case MBDB_BAD_SYNC:
+ ret = "bad sync";
+ break;
+ case MBDB_PAGE_OVERFLOW:
+ ret = "page overflow";
+ break;
+ case MBDB_NO_MEMORY:
+ ret = "no memory";
+ break;
+ case MBDB_MEMORY_LEAK:
+ ret = "memory leak";
+ break;
+ case MBDB_NOT_SUPPORTED:
+ ret = "not supported";
+ break;
+ default:
+ ret = "unknown";
+ break;
+ }
+ return ret;
+}
+
+TString ErrorMessage(int error, const TString& text, const TString& path, ui32 recordSig, ui32 gotRecordSig) {
+ TStringStream str;
+ str << text;
+ if (path.size())
+ str << " '" << path << "'";
+ str << ": " << ToString(static_cast<EMbdbErrors>(error));
+ if (recordSig && (!gotRecordSig || recordSig != gotRecordSig))
+ str << ". Expected RecordSig: " << Hex(recordSig, HF_ADDX);
+ if (recordSig && gotRecordSig && recordSig != gotRecordSig)
+ str << ", got: " << Hex(gotRecordSig, HF_ADDX);
+ str << ". Last system error text: " << LastSystemErrorText();
+ return str.Str();
+}
diff --git a/library/cpp/microbdb/header.h b/library/cpp/microbdb/header.h
new file mode 100644
index 0000000000..0951d610ea
--- /dev/null
+++ b/library/cpp/microbdb/header.h
@@ -0,0 +1,159 @@
+#pragma once
+
+#include <util/system/defaults.h>
+#include <util/generic/typetraits.h>
+#include <util/generic/string.h>
+#include <util/str_stl.h>
+
+#include <stdio.h>
+
+#define METASIZE (1u << 12)
+#define METASIG 0x12345678u
+#define PAGESIG 0x87654321u
+
+enum EMbdbErrors {
+ MBDB_ALREADY_INITIALIZED = 200,
+ MBDB_NOT_INITIALIZED = 201,
+ MBDB_BAD_DESCRIPTOR = 202,
+ MBDB_OPEN_ERROR = 203,
+ MBDB_READ_ERROR = 204,
+ MBDB_WRITE_ERROR = 205,
+ MBDB_CLOSE_ERROR = 206,
+ MBDB_EXPECTED_EOF = 207,
+ MBDB_UNEXPECTED_EOF = 208,
+ MBDB_BAD_FILENAME = 209,
+ MBDB_BAD_METAPAGE = 210,
+ MBDB_BAD_RECORDSIG = 211,
+ MBDB_BAD_FILE_SIZE = 212,
+ MBDB_BAD_PAGESIG = 213,
+ MBDB_BAD_PAGESIZE = 214,
+ MBDB_BAD_PARM = 215,
+ MBDB_BAD_SYNC = 216,
+ MBDB_PAGE_OVERFLOW = 217,
+ MBDB_NO_MEMORY = 218,
+ MBDB_MEMORY_LEAK = 219,
+ MBDB_NOT_SUPPORTED = 220
+};
+
+TString ToString(EMbdbErrors error);
+TString ErrorMessage(int error, const TString& text, const TString& path = TString(), ui32 recordSig = 0, ui32 gotRecordSig = 0);
+
+enum EPageFormat {
+ MBDB_FORMAT_RAW = 0,
+ MBDB_FORMAT_COMPRESSED = 1,
+ MBDB_FORMAT_NULL = 255
+};
+
+enum ECompressionAlgorithm {
+ MBDB_COMPRESSION_ZLIB = 1,
+ MBDB_COMPRESSION_FASTLZ = 2,
+ MBDB_COMPRESSION_SNAPPY = 3
+};
+
+struct TDatMetaPage {
+ ui32 MetaSig;
+ ui32 RecordSig;
+ ui32 PageSize;
+};
+
+struct TDatPage {
+ ui32 RecNum; //!< number of records on this page
+ ui32 PageSig;
+ ui32 Format : 2; //!< one of EPageFormat
+ ui32 Reserved : 30;
+};
+
+/// Additional page header with compression info
+struct TCompressedPage {
+ ui32 BlockCount;
+ ui32 Algorithm : 4;
+ ui32 Version : 4;
+ ui32 Reserved : 24;
+};
+
+namespace NMicroBDB {
+ /// Header of compressed block
+ struct TCompressedHeader {
+ ui32 Compressed;
+ ui32 Original; /// original size of block
+ ui32 Count; /// number of records in block
+ ui32 Reserved;
+ };
+
+ Y_HAS_MEMBER(AssertValid);
+
+ template <typename T, bool TVal>
+ struct TAssertValid {
+ void operator()(const T*) {
+ }
+ };
+
+ template <typename T>
+ struct TAssertValid<T, true> {
+ void operator()(const T* rec) {
+ return rec->AssertValid();
+ }
+ };
+
+ template <typename T>
+ void AssertValid(const T* rec) {
+ return NMicroBDB::TAssertValid<T, NMicroBDB::THasAssertValid<T>::value>()(rec);
+ }
+
+ Y_HAS_MEMBER(SizeOf);
+
+ template <typename T, bool TVal>
+ struct TGetSizeOf;
+
+ template <typename T>
+ struct TGetSizeOf<T, true> {
+ size_t operator()(const T* rec) {
+ return rec->SizeOf();
+ }
+ };
+
+ template <typename T>
+ struct TGetSizeOf<T, false> {
+ size_t operator()(const T*) {
+ return sizeof(T);
+ }
+ };
+
+ inline char* GetFirstRecord(const TDatPage* page) {
+ switch (page->Format) {
+ case MBDB_FORMAT_RAW:
+ return (char*)page + sizeof(TDatPage);
+ case MBDB_FORMAT_COMPRESSED:
+ // Первая запись на сжатой странице сохраняется несжатой
+ // сразу же после всех заголовков.
+ // Алгоритм сохранения смотреть в TOutputRecordIterator::FlushBuffer
+ return (char*)page + sizeof(TDatPage) + sizeof(TCompressedPage) + sizeof(NMicroBDB::TCompressedHeader);
+ }
+ return (char*)nullptr;
+ }
+}
+
+template <typename T>
+size_t SizeOf(const T* rec) {
+ return NMicroBDB::TGetSizeOf<T, NMicroBDB::THasSizeOf<T>::value>()(rec);
+}
+
+template <typename T>
+size_t MaxSizeOf() {
+ return sizeof(T);
+}
+
+static inline int DatNameToIdx(char iname[/*FILENAME_MAX*/], const char* dname) {
+ if (!dname || !*dname)
+ return MBDB_BAD_FILENAME;
+ const char* ptr;
+ if (!(ptr = strrchr(dname, '/')))
+ ptr = dname;
+ if (!(ptr = strrchr(ptr, '.')))
+ ptr = strchr(dname, 0);
+ if (ptr - dname > FILENAME_MAX - 5)
+ return MBDB_BAD_FILENAME;
+ memcpy(iname, dname, ptr - dname);
+ strcpy(iname + (ptr - dname), ".idx");
+ return 0;
+}
diff --git a/library/cpp/microbdb/heap.h b/library/cpp/microbdb/heap.h
new file mode 100644
index 0000000000..ef5a53534c
--- /dev/null
+++ b/library/cpp/microbdb/heap.h
@@ -0,0 +1,143 @@
+#pragma once
+
+#include "header.h"
+#include "extinfo.h"
+
+#include <util/generic/vector.h>
+
+#include <errno.h>
+
+///////////////////////////////////////////////////////////////////////////////
+
+/// Default comparator
+template <class TVal>
+struct TCompareByLess {
+ inline bool operator()(const TVal* a, const TVal* b) const {
+ return TLess<TVal>()(*a, *b);
+ }
+};
+
+///////////////////////////////////////////////////////////////////////////////
+
+template <class TVal, class TIterator, class TCompare = TCompareByLess<TVal>>
+class THeapIter {
+public:
+ int Init(TIterator** iters, int count) {
+ Term();
+ if (!count)
+ return 0;
+ if (!(Heap = (TIterator**)malloc(count * sizeof(TIterator*))))
+ return ENOMEM;
+
+ Count = count;
+ count = 0;
+ while (count < Count)
+ if (count && !(*iters)->Next()) { //here first TIterator is NOT initialized!
+ Count--;
+ iters++;
+ } else {
+ Heap[count++] = *iters++;
+ }
+ count = Count / 2;
+ while (--count > 0) //Heap[0] is not changed!
+ Sift(count, Count); //do not try to replace this code by make_heap
+ return 0;
+ }
+
+ int Init(TIterator* iters, int count) {
+ TVector<TIterator*> a(count);
+ for (int i = 0; i < count; ++i)
+ a[i] = &iters[i];
+ return Init(&a[0], count);
+ }
+
+ THeapIter()
+ : Heap(nullptr)
+ , Count(0)
+ {
+ }
+
+ THeapIter(TIterator* a, TIterator* b)
+ : Heap(nullptr)
+ , Count(0)
+ {
+ TIterator* arr[] = {a, b};
+ if (Init(arr, 2))
+ ythrow yexception() << "can't Init THeapIter";
+ }
+
+ THeapIter(TVector<TIterator>& v)
+ : Heap(nullptr)
+ , Count(0)
+ {
+ if (Init(&v[0], v.size())) {
+ ythrow yexception() << "can't Init THeapIter";
+ }
+ }
+
+ ~THeapIter() {
+ Term();
+ }
+
+ inline const TVal* Current() const {
+ if (!Count)
+ return nullptr;
+ return (*Heap)->Current();
+ }
+
+ inline const TIterator* CurrentIter() const {
+ return *Heap;
+ }
+
+ //for ends of last file will use Heap[0] = Heap[0] ! and
+ //returns Current of eof so Current of eof MUST return NULL
+ //possible this is bug and need fixing
+ const TVal* Next() {
+ if (!Count)
+ return nullptr;
+ if (!(*Heap)->Next()) //on first call unitialized first TIterator
+ *Heap = Heap[--Count]; //will be correctly initialized
+
+ if (Count == 2) {
+ if (TCompare()(Heap[1]->Current(), Heap[0]->Current()))
+ DoSwap(Heap[1], Heap[0]);
+ } else
+ Sift(0, Count);
+
+ return Current();
+ }
+
+ inline bool GetExtInfo(typename TExtInfoType<TVal>::TResult* extInfo) const {
+ return (*Heap)->GetExtInfo(extInfo);
+ }
+
+ inline const ui8* GetExtInfoRaw(size_t* len) const {
+ return (*Heap)->GetExtInfoRaw(len);
+ }
+
+ void Term() {
+ ::free(Heap);
+ Heap = nullptr;
+ Count = 0;
+ }
+
+protected:
+ void Sift(int node, int end) {
+ TIterator* x = Heap[node];
+ int son;
+ for (son = 2 * node + 1; son < end; node = son, son = 2 * node + 1) {
+ if (son < (end - 1) && TCompare()(Heap[son + 1]->Current(), Heap[son]->Current()))
+ son++;
+ if (TCompare()(Heap[son]->Current(), x->Current()))
+ Heap[node] = Heap[son];
+ else
+ break;
+ }
+ Heap[node] = x;
+ }
+
+ TIterator** Heap;
+ int Count;
+};
+
+///////////////////////////////////////////////////////////////////////////////
diff --git a/library/cpp/microbdb/input.h b/library/cpp/microbdb/input.h
new file mode 100644
index 0000000000..13b12f28b7
--- /dev/null
+++ b/library/cpp/microbdb/input.h
@@ -0,0 +1,1027 @@
+#pragma once
+
+#include "header.h"
+#include "file.h"
+#include "reader.h"
+
+#include <util/system/maxlen.h>
+#include <util/system/event.h>
+#include <util/system/thread.h>
+
+#include <thread>
+
+#include <sys/uio.h>
+
+#include <errno.h>
+
+template <class TFileManip>
+inline ssize_t Readv(TFileManip& fileManip, const struct iovec* iov, int iovcnt) {
+ ssize_t read_count = 0;
+ for (int n = 0; n < iovcnt; n++) {
+ ssize_t last_read = fileManip.Read(iov[n].iov_base, iov[n].iov_len);
+ if (last_read < 0)
+ return -1;
+ read_count += last_read;
+ }
+ return read_count;
+}
+
+template <class TVal, typename TBasePageIter>
+class TInputRecordIterator: public TBasePageIter {
+ typedef THolder<NMicroBDB::IBasePageReader<TVal>> TReaderHolder;
+
+public:
+ typedef TBasePageIter TPageIter;
+
+ TInputRecordIterator() {
+ Init();
+ }
+
+ ~TInputRecordIterator() {
+ Term();
+ }
+
+ const TVal* Current() const {
+ return Rec;
+ }
+
+ bool GetExtInfo(typename TExtInfoType<TVal>::TResult* extInfo) const {
+ if (!Rec)
+ return false;
+ return Reader->GetExtInfo(extInfo);
+ }
+
+ const ui8* GetExtInfoRaw(size_t* len) const {
+ if (!Rec)
+ return nullptr;
+ return Reader->GetExtInfoRaw(len);
+ }
+
+ size_t GetRecSize() const {
+ return Reader->GetRecSize();
+ }
+
+ size_t GetExtSize() const {
+ return Reader->GetExtSize();
+ }
+
+ const TVal* Next() {
+ if (RecNum)
+ --RecNum;
+ else {
+ TDatPage* page = TPageIter::Next();
+ if (!page) {
+ if (TPageIter::IsFrozen() && Reader.Get())
+ Reader->SetClearFlag();
+ return Rec = nullptr;
+ } else if (!!SelectReader())
+ return Rec = nullptr;
+ RecNum = TPageIter::Current()->RecNum - 1;
+ }
+ return Rec = Reader->Next();
+ }
+
+ // Skip(0) == Current(); Skip(1) == Next()
+ const TVal* Skip(int& num) {
+ // Y_ASSERT(num >= 0); ? otherwise it gets into infinite loop
+ while (num > RecNum) {
+ num -= RecNum + 1;
+ if (!TPageIter::Next() || !!SelectReader()) {
+ RecNum = 0;
+ return Rec = nullptr;
+ }
+ RecNum = TPageIter::Current()->RecNum - 1;
+ Rec = Reader->Next();
+ }
+ ++num;
+ while (--num)
+ Next();
+ return Rec;
+ }
+
+ // begin reading from next page
+ void Reset() {
+ Rec = NULL;
+ RecNum = 0;
+ if (Reader.Get())
+ Reader->Reset();
+ }
+
+protected:
+ int Init() {
+ Rec = nullptr;
+ RecNum = 0;
+ Format = MBDB_FORMAT_NULL;
+ return 0;
+ }
+
+ int Term() {
+ Reader.Reset(nullptr);
+ Format = MBDB_FORMAT_NULL;
+ Rec = nullptr;
+ RecNum = 0;
+ return 0;
+ }
+
+ const TVal* GotoPage(int pageno) {
+ if (!TPageIter::GotoPage(pageno) || !!SelectReader())
+ return Rec = nullptr;
+ RecNum = TPageIter::Current()->RecNum - 1;
+ return Rec = Reader->Next();
+ }
+
+ int SelectReader() {
+ if (!TPageIter::Current())
+ return MBDB_UNEXPECTED_EOF;
+ if (ui32(Format) != TPageIter::Current()->Format) {
+ switch (TPageIter::Current()->Format) {
+ case MBDB_FORMAT_RAW:
+ Reader.Reset(new NMicroBDB::TRawPageReader<TVal, TPageIter>(this));
+ break;
+ case MBDB_FORMAT_COMPRESSED:
+ Reader.Reset(new NMicroBDB::TCompressedReader<TVal, TPageIter>(this));
+ break;
+ default:
+ return MBDB_NOT_SUPPORTED;
+ }
+ Format = EPageFormat(TPageIter::Current()->Format);
+ } else {
+ Y_ASSERT(Reader.Get() != nullptr);
+ Reader->Reset();
+ }
+ return 0;
+ }
+
+ const TVal* Rec;
+ TReaderHolder Reader;
+ int RecNum; //!< number of records on the current page after the current record
+ EPageFormat Format;
+};
+
+template <class TBaseReader>
+class TInputPageIterator: public TBaseReader {
+public:
+ typedef TBaseReader TReader;
+
+ TInputPageIterator()
+ : Buf(nullptr)
+ {
+ Term();
+ }
+
+ ~TInputPageIterator() {
+ Term();
+ }
+
+ TDatPage* Current() {
+ return CurPage;
+ }
+
+ int Freeze() {
+ return (Frozen = (PageNum == -1) ? 0 : PageNum);
+ }
+
+ void Unfreeze() {
+ Frozen = -1;
+ }
+
+ inline int IsFrozen() const {
+ return Frozen + 1;
+ }
+
+ inline size_t GetPageSize() const {
+ return TReader::GetPageSize();
+ }
+
+ inline int GetPageNum() const {
+ return PageNum;
+ }
+
+ inline int IsEof() const {
+ return Eof;
+ }
+
+ TDatPage* Next() {
+ if (PageNum >= Maxpage && ReadBuf()) {
+ Eof = Eof ? Eof : TReader::IsEof();
+ return CurPage = nullptr;
+ }
+ return CurPage = (TDatPage*)(Buf + ((++PageNum) % Bufpages) * GetPageSize());
+ }
+
+ TDatPage* GotoPage(int pageno) {
+ if (pageno <= Maxpage && pageno >= (Maxpage - Pages + 1)) {
+ PageNum = pageno;
+ return CurPage = (TDatPage*)(Buf + (PageNum % Bufpages) * GetPageSize());
+ }
+ if (IsFrozen() || TReader::GotoPage(pageno))
+ return nullptr;
+ Maxpage = PageNum = pageno - 1;
+ Eof = 0;
+ return Next();
+ }
+
+protected:
+ int Init(size_t pages, int pagesOrBytes) {
+ Term();
+ if (pagesOrBytes == -1)
+ Bufpages = TReader::GetLastPage();
+ else if (pagesOrBytes)
+ Bufpages = pages;
+ else
+ Bufpages = pages / GetPageSize();
+ if (!TReader::GetLastPage()) {
+ Bufpages = 0;
+ assert(Eof == 1);
+ return 0;
+ }
+ int lastPage = TReader::GetLastPage();
+ if (lastPage >= 0)
+ Bufpages = (int)Min(lastPage, Bufpages);
+ Bufpages = Max(2, Bufpages);
+ Eof = 0;
+ ABuf.Alloc(Bufpages * GetPageSize());
+ return (Buf = ABuf.Begin()) ? 0 : ENOMEM;
+ // return (Buf = (char*)malloc(Bufpages * GetPageSize())) ? 0 : ENOMEM;
+ }
+
+ int Term() {
+ // free(Buf);
+ ABuf.Dealloc();
+ Buf = nullptr;
+ Maxpage = PageNum = Frozen = -1;
+ Bufpages = 0;
+ Pages = 0;
+ Eof = 1;
+ CurPage = nullptr;
+ return 0;
+ }
+
+ int ReadBuf() {
+ int nvec;
+ iovec vec[2];
+ int maxpage = (Frozen == -1 ? Maxpage + 1 : Frozen) + Bufpages - 1;
+ int minpage = Maxpage + 1;
+ if (maxpage < minpage)
+ return EAGAIN;
+ minpage %= Bufpages;
+ maxpage %= Bufpages;
+ if (maxpage < minpage) {
+ vec[0].iov_base = Buf + GetPageSize() * minpage;
+ vec[0].iov_len = GetPageSize() * (Bufpages - minpage);
+ vec[1].iov_base = Buf;
+ vec[1].iov_len = GetPageSize() * (maxpage + 1);
+ nvec = 2;
+ } else {
+ vec[0].iov_base = Buf + GetPageSize() * minpage;
+ vec[0].iov_len = GetPageSize() * (maxpage - minpage + 1);
+ nvec = 1;
+ }
+ TReader::ReadPages(vec, nvec, &Pages);
+ Maxpage += Pages;
+ return !Pages;
+ }
+
+ int Maxpage, PageNum, Frozen, Bufpages, Eof, Pages;
+ TDatPage* CurPage;
+ // TMappedArray<char> ABuf;
+ TMappedAllocation ABuf;
+ char* Buf;
+};
+
+template <class TBaseReader>
+class TInputPageIteratorMT: public TBaseReader {
+public:
+ typedef TBaseReader TReader;
+
+ TInputPageIteratorMT()
+ : CurBuf(0)
+ , CurReadBuf(0)
+ , Buf(nullptr)
+ {
+ Term();
+ }
+
+ ~TInputPageIteratorMT() {
+ Term();
+ }
+
+ TDatPage* Current() {
+ return CurPage;
+ }
+
+ int Freeze() {
+ return (Frozen = (PageNum == -1) ? 0 : PageNum);
+ }
+
+ void Unfreeze() {
+ Frozen = -1;
+ }
+
+ inline int IsFrozen() const {
+ return Frozen + 1;
+ }
+
+ inline size_t GetPageSize() const {
+ return TReader::GetPageSize();
+ }
+
+ inline int GetPageNum() const {
+ return PageNum;
+ }
+
+ inline int IsEof() const {
+ return Eof;
+ }
+
+ TDatPage* Next() {
+ if (Eof)
+ return CurPage = nullptr;
+ if (PageNum >= Maxpage && ReadBuf()) {
+ Eof = Eof ? Eof : TReader::IsEof();
+ return CurPage = nullptr;
+ }
+ return CurPage = (TDatPage*)(Buf + ((++PageNum) % Bufpages) * GetPageSize());
+ }
+
+ TDatPage* GotoPage(int pageno) {
+ if (pageno <= Maxpage && pageno >= (Maxpage - Pages + 1)) {
+ PageNum = pageno;
+ return CurPage = (TDatPage*)(Buf + (PageNum % Bufpages) * GetPageSize());
+ }
+ if (IsFrozen() || TReader::GotoPage(pageno))
+ return nullptr;
+ Maxpage = PageNum = pageno - 1;
+ Eof = 0;
+ return Next();
+ }
+
+ void ReadPages() {
+ // fprintf(stderr, "ReadPages started\n");
+ bool eof = false;
+ while (!eof) {
+ QEvent[CurBuf].Wait();
+ if (Finish)
+ return;
+ int pages = ReadCurBuf(Bufs[CurBuf]);
+ PagesM[CurBuf] = pages;
+ eof = !pages;
+ AEvent[CurBuf].Signal();
+ CurBuf ^= 1;
+ }
+ }
+
+protected:
+ int Init(size_t pages, int pagesOrBytes) {
+ Term();
+ if (pagesOrBytes == -1)
+ Bufpages = TReader::GetLastPage();
+ else if (pagesOrBytes)
+ Bufpages = pages;
+ else
+ Bufpages = pages / GetPageSize();
+ if (!TReader::GetLastPage()) {
+ Bufpages = 0;
+ assert(Eof == 1);
+ return 0;
+ }
+ int lastPage = TReader::GetLastPage();
+ if (lastPage >= 0)
+ Bufpages = (int)Min(lastPage, Bufpages);
+ Bufpages = Max(2, Bufpages);
+ Eof = 0;
+ ABuf.Alloc(Bufpages * GetPageSize() * 2);
+ Bufs[0] = ABuf.Begin();
+ Bufs[1] = Bufs[0] + Bufpages * GetPageSize();
+ // return (Buf = (char*)malloc(Bufpages * GetPageSize())) ? 0 : ENOMEM;
+ Finish = false;
+ ReadThread = std::thread([this]() {
+ TThread::SetCurrentThreadName("DatReader");
+ ReadPages();
+ });
+ QEvent[0].Signal();
+ return Bufs[0] ? 0 : ENOMEM;
+ }
+
+ void StopThread() {
+ Finish = true;
+ QEvent[0].Signal();
+ QEvent[1].Signal();
+ ReadThread.join();
+ }
+
+ int Term() {
+ // free(Buf);
+ if (ReadThread.joinable())
+ StopThread();
+ ABuf.Dealloc();
+ Buf = nullptr;
+ Bufs[0] = nullptr;
+ Bufs[1] = nullptr;
+ Maxpage = MaxpageR = PageNum = Frozen = -1;
+ Bufpages = 0;
+ Pages = 0;
+ Eof = 1;
+ CurPage = nullptr;
+ return 0;
+ }
+
+ int ReadCurBuf(char* buf) {
+ int nvec;
+ iovec vec[2];
+ int maxpage = (Frozen == -1 ? MaxpageR + 1 : Frozen) + Bufpages - 1;
+ int minpage = MaxpageR + 1;
+ if (maxpage < minpage)
+ return EAGAIN;
+ minpage %= Bufpages;
+ maxpage %= Bufpages;
+ if (maxpage < minpage) {
+ vec[0].iov_base = buf + GetPageSize() * minpage;
+ vec[0].iov_len = GetPageSize() * (Bufpages - minpage);
+ vec[1].iov_base = buf;
+ vec[1].iov_len = GetPageSize() * (maxpage + 1);
+ nvec = 2;
+ } else {
+ vec[0].iov_base = buf + GetPageSize() * minpage;
+ vec[0].iov_len = GetPageSize() * (maxpage - minpage + 1);
+ nvec = 1;
+ }
+ int pages;
+ TReader::ReadPages(vec, nvec, &pages);
+ MaxpageR += pages;
+ return pages;
+ }
+
+ int ReadBuf() {
+ QEvent[CurReadBuf ^ 1].Signal();
+ AEvent[CurReadBuf].Wait();
+ Buf = Bufs[CurReadBuf];
+ Maxpage += (Pages = PagesM[CurReadBuf]);
+ CurReadBuf ^= 1;
+ return !Pages;
+ }
+
+ int Maxpage, MaxpageR, PageNum, Frozen, Bufpages, Eof, Pages;
+ TDatPage* CurPage;
+ // TMappedArray<char> ABuf;
+ ui32 CurBuf;
+ ui32 CurReadBuf;
+ TMappedAllocation ABuf;
+ char* Buf;
+ char* Bufs[2];
+ ui32 PagesM[2];
+ TAutoEvent QEvent[2];
+ TAutoEvent AEvent[2];
+ std::thread ReadThread;
+ bool Finish;
+};
+
+template <typename TFileManip>
+class TInputPageFileImpl: private TNonCopyable {
+protected:
+ TFileManip FileManip;
+
+public:
+ TInputPageFileImpl()
+ : Pagesize(0)
+ , Fd(-1)
+ , Eof(1)
+ , Error(0)
+ , Pagenum(0)
+ , Recordsig(0)
+ {
+ Term();
+ }
+
+ ~TInputPageFileImpl() {
+ Term();
+ }
+
+ inline int IsEof() const {
+ return Eof;
+ }
+
+ inline int GetError() const {
+ return Error;
+ }
+
+ inline size_t GetPageSize() const {
+ return Pagesize;
+ }
+
+ inline int GetLastPage() const {
+ return Pagenum;
+ }
+
+ inline ui32 GetRecordSig() const {
+ return Recordsig;
+ }
+
+ inline bool IsOpen() const {
+ return FileManip.IsOpen();
+ }
+
+protected:
+ int Init(const char* fname, ui32 recsig, ui32* gotrecsig = nullptr, bool direct = false) {
+ Error = FileManip.Open(fname, direct);
+ return Error ? Error : Init(TFile(), recsig, gotrecsig);
+ }
+
+ int Init(const TFile& file, ui32 recsig, ui32* gotrecsig = nullptr) {
+ if (!file.IsOpen() && !FileManip.IsOpen())
+ return MBDB_NOT_INITIALIZED;
+ if (file.IsOpen() && FileManip.IsOpen())
+ return MBDB_ALREADY_INITIALIZED;
+ if (file.IsOpen()) {
+ Error = FileManip.Init(file);
+ if (Error)
+ return Error;
+ }
+
+ // TArrayHolder<ui8> buf(new ui8[METASIZE + FS_BLOCK_SIZE]);
+ // ui8* ptr = (buf.Get() + FS_BLOCK_SIZE - ((ui64)buf.Get() & (FS_BLOCK_SIZE - 1)));
+ TMappedArray<ui8> buf;
+ buf.Create(METASIZE);
+ ui8* ptr = &buf[0];
+ TDatMetaPage* meta = (TDatMetaPage*)ptr;
+ ssize_t size = METASIZE;
+ ssize_t ret;
+ while (size && (ret = FileManip.Read(ptr, (unsigned)size)) > 0) {
+ Y_ASSERT(ret <= size);
+ size -= ret;
+ ptr += ret;
+ }
+ if (size) {
+ FileManip.Close();
+ return Error = MBDB_BAD_METAPAGE;
+ }
+ if (gotrecsig)
+ *gotrecsig = meta->RecordSig;
+ return Init(TFile(), meta, recsig);
+ }
+
+ int Init(TAutoPtr<IInputStream> input, ui32 recsig, ui32* gotrecsig = nullptr) {
+ if (!input && !FileManip.IsOpen())
+ return MBDB_NOT_INITIALIZED;
+ if (FileManip.IsOpen())
+ return MBDB_ALREADY_INITIALIZED;
+
+ Error = FileManip.Open(input);
+ if (Error)
+ return Error;
+
+ TArrayHolder<ui8> buf(new ui8[METASIZE]);
+ ui8* ptr = buf.Get();
+ ssize_t size = METASIZE;
+ ssize_t ret;
+ while (size && (ret = FileManip.Read(ptr, (unsigned)size)) > 0) {
+ Y_ASSERT(ret <= size);
+ size -= ret;
+ ptr += ret;
+ }
+ if (size) {
+ FileManip.Close();
+ return Error = MBDB_BAD_METAPAGE;
+ }
+ TDatMetaPage* meta = (TDatMetaPage*)buf.Get();
+ if (gotrecsig)
+ *gotrecsig = meta->RecordSig;
+ return Init(TFile(), meta, recsig);
+ }
+
+ int Init(const TFile& file, const TDatMetaPage* meta, ui32 recsig) {
+ if (!file.IsOpen() && !FileManip.IsOpen())
+ return MBDB_NOT_INITIALIZED;
+ if (file.IsOpen() && FileManip.IsOpen())
+ return MBDB_ALREADY_INITIALIZED;
+ if (file.IsOpen()) {
+ Error = FileManip.Init(file);
+ if (Error)
+ return Error;
+ }
+
+ if (meta->MetaSig != METASIG)
+ Error = MBDB_BAD_METAPAGE;
+ else if (meta->RecordSig != recsig)
+ Error = MBDB_BAD_RECORDSIG;
+
+ if (Error) {
+ FileManip.Close();
+ return Error;
+ }
+
+ i64 flength = FileManip.GetLength();
+ if (flength >= 0) {
+ i64 fsize = flength;
+ fsize -= METASIZE;
+ if (fsize % meta->PageSize)
+ return Error = MBDB_BAD_FILE_SIZE;
+ Pagenum = (int)(fsize / meta->PageSize);
+ } else {
+ Pagenum = -1;
+ }
+ Pagesize = meta->PageSize;
+ Recordsig = meta->RecordSig;
+ Error = Eof = 0;
+ return Error;
+ }
+
+ int ReadPages(iovec* vec, int nvec, int* pages) {
+ *pages = 0;
+
+ if (Eof || Error)
+ return Error;
+
+ ssize_t size = 0, delta = 0, total = 0;
+ iovec* pvec = vec;
+ int vsize = nvec;
+
+ while (vsize && (size = Readv(FileManip, pvec, (int)Min(vsize, 16))) > 0) {
+ total += size;
+ if (delta) {
+ size += delta;
+ pvec->iov_len += delta;
+ pvec->iov_base = (char*)pvec->iov_base - delta;
+ delta = 0;
+ }
+ while (size) {
+ if ((size_t)size >= pvec->iov_len) {
+ size -= pvec->iov_len;
+ ++pvec;
+ --vsize;
+ } else {
+ delta = size;
+ pvec->iov_len -= size;
+ pvec->iov_base = (char*)pvec->iov_base + size;
+ size = 0;
+ }
+ }
+ }
+ if (delta) {
+ pvec->iov_len += delta;
+ pvec->iov_base = (char*)pvec->iov_base - delta;
+ }
+ if (size < 0)
+ return Error = errno ? errno : MBDB_READ_ERROR;
+ if (total % Pagesize)
+ return Error = MBDB_BAD_FILE_SIZE;
+ if (vsize)
+ Eof = 1;
+ *pages = total / Pagesize; // it would be better to assign it after the for-loops
+ for (; total; ++vec, total -= size)
+ for (size = 0; size < total && (size_t)size < vec->iov_len; size += Pagesize)
+ if (((TDatPage*)((char*)vec->iov_base + size))->PageSig != PAGESIG)
+ return Error = MBDB_BAD_PAGESIG;
+ return Error;
+ }
+
+ int GotoPage(int page) {
+ if (Error)
+ return Error;
+ Eof = 0;
+ i64 offset = (i64)page * Pagesize + METASIZE;
+ if (offset != FileManip.Seek(offset, SEEK_SET))
+ Error = MBDB_BAD_FILE_SIZE;
+ return Error;
+ }
+
+ int Term() {
+ return FileManip.Close();
+ }
+
+ size_t Pagesize;
+ int Fd;
+ int Eof;
+ int Error;
+ int Pagenum; //!< number of pages in this file
+ ui32 Recordsig;
+};
+
+template <class TBaseReader>
+class TMappedInputPageIterator: public TBaseReader {
+public:
+ typedef TBaseReader TReader;
+
+ TMappedInputPageIterator() {
+ Term();
+ }
+
+ ~TMappedInputPageIterator() {
+ Term();
+ }
+
+ TDatPage* Current() {
+ return CurPage;
+ }
+
+ inline size_t GetPageSize() const {
+ return TReader::GetPageSize();
+ }
+
+ inline int GetPageNum() const {
+ return PageNum;
+ }
+
+ inline int IsEof() const {
+ return Eof;
+ }
+
+ inline int IsFrozen() const {
+ return 0;
+ }
+
+ TDatPage* Next() {
+ i64 pos = (i64)(++PageNum) * GetPageSize() + METASIZE;
+ if (pos < 0 || pos >= (i64)TReader::GetSize()) {
+ Eof = 1;
+ return CurPage = nullptr;
+ }
+ return CurPage = (TDatPage*)((char*)TReader::GetData() + pos);
+ }
+
+protected:
+ int Init(size_t /*pages*/, int /*pagesOrBytes*/) {
+ Term();
+ Eof = 0;
+ return 0;
+ }
+
+ int Term() {
+ PageNum = -1;
+ Eof = 1;
+ CurPage = nullptr;
+ return 0;
+ }
+
+ TDatPage* GotoPage(int pageno) {
+ PageNum = pageno - 1;
+ Eof = 0;
+ return Next();
+ }
+
+ int PageNum, Eof, Pages, Pagenum;
+ TDatPage* CurPage;
+};
+
+using TInputPageFile = TInputPageFileImpl<TInputFileManip>;
+
+template <class TVal,
+ typename TBaseRecIter = TInputRecordIterator<TVal, TInputPageIterator<TInputPageFile>>>
+class TInDatFileImpl: public TBaseRecIter {
+public:
+ typedef TBaseRecIter TRecIter;
+ typedef typename TRecIter::TPageIter TPageIter;
+ typedef typename TRecIter::TPageIter::TReader TReader;
+ using TRecIter::GotoPage;
+
+ int Open(const char* fname, size_t pages = 1, int pagesOrBytes = 1, ui32* gotRecordSig = nullptr, bool direct = false) {
+ int ret = TReader::Init(fname, TVal::RecordSig, gotRecordSig, direct);
+ return ret ? ret : Open2(pages, pagesOrBytes);
+ }
+
+ int Open(const TFile& file, size_t pages = 1, int pagesOrBytes = 1, ui32* gotRecordSig = nullptr) {
+ int ret = TReader::Init(file, TVal::RecordSig, gotRecordSig);
+ return ret ? ret : Open2(pages, pagesOrBytes);
+ }
+
+ int Open(TAutoPtr<IInputStream> input, size_t pages = 1, int pagesOrBytes = 1, ui32* gotRecordSig = nullptr) {
+ int ret = TReader::Init(input, TVal::RecordSig, gotRecordSig);
+ return ret ? ret : Open2(pages, pagesOrBytes);
+ }
+
+ int Open(const TFile& file, const TDatMetaPage* meta, size_t pages = 1, int pagesOrBytes = 1) {
+ int ret = TReader::Init(file, meta, TVal::RecordSig);
+ return ret ? ret : Open2(pages, pagesOrBytes);
+ }
+
+ int Close() {
+ int ret1 = TRecIter::Term();
+ int ret2 = TPageIter::Term();
+ int ret3 = TReader::Term();
+ return ret1 ? ret1 : ret2 ? ret2 : ret3;
+ }
+
+ const TVal* GotoLastPage() {
+ return TReader::GetLastPage() <= 0 ? nullptr : TRecIter::GotoPage(TReader::GetLastPage() - 1);
+ }
+
+private:
+ int Open2(size_t pages, int pagesOrBytes) {
+ int ret = TPageIter::Init(pages, pagesOrBytes);
+ if (!ret)
+ ret = TRecIter::Init();
+ if (ret)
+ Close();
+ return ret;
+ }
+};
+
+template <class TVal>
+class TInIndexFile: protected TInDatFileImpl<TVal> {
+ typedef TInDatFileImpl<TVal> TDatFile;
+ typedef typename TDatFile::TRecIter TRecIter;
+ typedef typename TRecIter::TPageIter TPageIter;
+ typedef typename TExtInfoType<TVal>::TResult TExtInfo;
+
+public:
+ using TDatFile::IsOpen;
+
+ TInIndexFile()
+ : Index0(nullptr)
+ {
+ }
+
+ int Open(const char* fname, size_t pages = 2, int pagesOrBytes = 1, ui32* gotRecordSig = nullptr) {
+ int ret = TDatFile::Open(fname, pages, pagesOrBytes, gotRecordSig);
+ if (ret)
+ return ret;
+ if (!(Index0 = (TDatPage*)malloc(TPageIter::GetPageSize()))) {
+ TDatFile::Close();
+ return MBDB_NO_MEMORY;
+ }
+ if (!TExtInfoType<TVal>::Exists && SizeOf((TVal*)nullptr))
+ RecsOnPage = (TPageIter::GetPageSize() - sizeof(TDatPage)) / DatCeil(SizeOf((TVal*)nullptr));
+ TDatFile::Next();
+ memcpy(Index0, TPageIter::Current(), TPageIter::GetPageSize());
+ return 0;
+ }
+
+ int Close() {
+ free(Index0);
+ Index0 = nullptr;
+ return TDatFile::Close();
+ }
+
+ inline int GetError() const {
+ return TDatFile::GetError();
+ }
+
+ int FindKey(const TVal* akey, const TExtInfo* extInfo = nullptr) {
+ assert(IsOpen());
+ if (TExtInfoType<TVal>::Exists || !SizeOf((TVal*)nullptr))
+ return FindVszKey(akey, extInfo);
+ int num = FindKeyOnPage(Index0, akey);
+ TDatPage* page = TPageIter::GotoPage(num + 1);
+ if (!page)
+ return 0;
+ num = FindKeyOnPage(page, akey);
+ num += (TPageIter::GetPageNum() - 1) * RecsOnPage;
+ return num;
+ }
+
+ int FindVszKey(const TVal* akey, const TExtInfo* extInfo = NULL) {
+ int num = FindVszKeyOnPage(Index0, akey, extInfo);
+ int num_add = 0;
+ for (int p = 0; p < num; p++) {
+ TDatPage* page = TPageIter::GotoPage(p + 1);
+ if (!page)
+ return 0;
+ num_add += page->RecNum;
+ }
+ TDatPage* page = TPageIter::GotoPage(num + 1);
+ if (!page)
+ return 0;
+ num = FindVszKeyOnPage(page, akey, extInfo);
+ num += num_add;
+ return num;
+ }
+
+protected:
+ int FindKeyOnPage(TDatPage* page, const TVal* key) {
+ int left = 0;
+ int right = page->RecNum - 1;
+ int recsize = DatCeil(SizeOf((TVal*)nullptr));
+ while (left < right) {
+ int middle = (left + right) >> 1;
+ if (*((TVal*)((char*)page + sizeof(TDatPage) + middle * recsize)) < *key)
+ left = middle + 1;
+ else
+ right = middle;
+ }
+ //borders check (left and right)
+ return (left == 0 || *((TVal*)((char*)page + sizeof(TDatPage) + left * recsize)) < *key) ? left : left - 1;
+ }
+
+ // will deserialize rawExtinfoA to extInfoA only if necessery
+ inline bool KeyLess_(const TVal* a, const TVal* b,
+ TExtInfo* extInfoA, const TExtInfo* extInfoB,
+ const ui8* rawExtInfoA, size_t rawLen) {
+ if (*a < *b) {
+ return true;
+ } else if (!extInfoB || *b < *a) {
+ return false;
+ } else {
+ // *a == *b && extInfoB
+ Y_PROTOBUF_SUPPRESS_NODISCARD extInfoA->ParseFromArray(rawExtInfoA, rawLen);
+ return (*extInfoA < *extInfoB);
+ }
+ }
+
+ int FindVszKeyOnPage(TDatPage* page, const TVal* key, const TExtInfo* extInfo) {
+ TVal* cur = (TVal*)((char*)page + sizeof(TDatPage));
+ ui32 recnum = page->RecNum;
+ if (!TExtInfoType<TVal>::Exists) {
+ for (; recnum > 0 && *cur < *key; --recnum)
+ cur = (TVal*)((char*)cur + DatCeil(SizeOf(cur)));
+ } else {
+ size_t ll;
+ size_t l;
+ size_t sz = NMicroBDB::SizeOfExt(cur, &ll, &l);
+ TExtInfo ei;
+ for (; recnum > 0 && KeyLess_(cur, key, &ei, extInfo, (ui8*)cur + sz + ll, l); --recnum) {
+ cur = (TVal*)((ui8*)cur + DatCeil(sz + ll + l));
+ sz = NMicroBDB::SizeOfExt(cur, &ll, &l);
+ }
+ }
+
+ int idx = page->RecNum - recnum - 1;
+ return (idx >= 0) ? idx : 0;
+ }
+
+ TDatPage* Index0;
+ int RecsOnPage;
+};
+
+template <class TVal, class TKey, class TPageIterator = TInputPageIterator<TInputPageFile>>
+class TKeyFileMixin: public TInDatFileImpl<TVal, TInputRecordIterator<TVal, TPageIterator>> {
+protected:
+ TInIndexFile<TKey> KeyFile;
+};
+
+template <class TVal, class TKey, class TBase = TKeyFileMixin<TVal, TKey>>
+class TDirectInDatFile: public TBase {
+ typedef TBase TDatFile;
+ typedef typename TDatFile::TRecIter TRecIter;
+ typedef typename TDatFile::TPageIter TPageIter;
+
+public:
+ void Open(const char* path, size_t pages = 1, size_t keypages = 1, int pagesOrBytes = 1) {
+ int ret;
+ ui32 gotRecordSig = 0;
+
+ ret = TDatFile::Open(path, pages, pagesOrBytes, &gotRecordSig);
+ if (ret) {
+ ythrow yexception() << ErrorMessage(ret, "Failed to open input file", path, TVal::RecordSig, gotRecordSig);
+ }
+ char KeyName[PATH_MAX + 1];
+ if (DatNameToIdx(KeyName, path)) {
+ ythrow yexception() << ErrorMessage(MBDB_BAD_FILENAME, "Failed to open input file", path);
+ }
+ gotRecordSig = 0;
+ ret = KeyFile.Open(KeyName, keypages, 1, &gotRecordSig);
+ if (ret) {
+ ythrow yexception() << ErrorMessage(ret, "Failed to open input keyfile", KeyName, TKey::RecordSig, gotRecordSig);
+ }
+ }
+
+ void Close() {
+ int ret;
+
+ if (TDatFile::IsOpen() && (ret = TDatFile::GetError()))
+ if (!std::uncaught_exception())
+ ythrow yexception() << ErrorMessage(ret, "Error before closing input file");
+ if ((ret = TDatFile::Close()))
+ if (!std::uncaught_exception())
+ ythrow yexception() << ErrorMessage(ret, "Error while closing input file");
+
+ if (KeyFile.IsOpen() && (ret = KeyFile.GetError()))
+ if (!std::uncaught_exception())
+ ythrow yexception() << ErrorMessage(ret, "Error before closing input keyfile");
+ if ((ret = KeyFile.Close()))
+ if (!std::uncaught_exception())
+ ythrow yexception() << ErrorMessage(ret, "Error while closing input keyfile");
+ }
+
+ const TVal* FindRecord(const TKey* key, const typename TExtInfoType<TKey>::TResult* extInfo = nullptr) {
+ int page = KeyFile.FindKey(key, extInfo);
+ const TVal* val = TRecIter::GotoPage(page);
+ if (!TExtInfoType<TVal>::Exists || !extInfo) {
+ TKey k;
+ while (val) {
+ TMakeExtKey<TVal, TKey>::Make(&k, nullptr, val, nullptr);
+ if (!(k < *key))
+ break;
+ val = TRecIter::Next();
+ }
+ } else {
+ typename TExtInfoType<TVal>::TResult valExt;
+ TKey k;
+ typename TExtInfoType<TKey>::TResult kExt;
+ while (val) {
+ TRecIter::GetExtInfo(&valExt);
+ TMakeExtKey<TVal, TKey>::Make(&k, &kExt, val, &valExt);
+ if (*key < k || !(k < *key) && !(kExt < *extInfo)) // k > *key || k == *key && kExt >= *extInfo
+ break;
+ val = TRecIter::Next();
+ }
+ }
+ return val;
+ };
+
+ int FindPagesNo(const TKey* key, const typename TExtInfoType<TVal>::TResult* extInfo = NULL) {
+ return KeyFile.FindKey(key, extInfo);
+ }
+
+protected:
+ using TBase::KeyFile;
+};
diff --git a/library/cpp/microbdb/microbdb.cpp b/library/cpp/microbdb/microbdb.cpp
new file mode 100644
index 0000000000..c10dbdf126
--- /dev/null
+++ b/library/cpp/microbdb/microbdb.cpp
@@ -0,0 +1 @@
+#include "microbdb.h"
diff --git a/library/cpp/microbdb/microbdb.h b/library/cpp/microbdb/microbdb.h
new file mode 100644
index 0000000000..7521887337
--- /dev/null
+++ b/library/cpp/microbdb/microbdb.h
@@ -0,0 +1,54 @@
+#pragma once
+
+#include <util/folder/dirut.h>
+
+#if defined(_MSC_VER)
+#pragma warning(push)
+#pragma warning(disable : 4706) /*assignment within conditional expression*/
+#pragma warning(disable : 4267) /*conversion from 'size_t' to 'type', possible loss of data*/
+#endif
+
+#include "align.h"
+#include "extinfo.h"
+#include "header.h"
+#include "reader.h"
+#include "heap.h"
+#include "file.h"
+#include "sorter.h"
+#include "input.h"
+#include "output.h"
+#include "sorterdef.h"
+
+inline int MakeSorterTempl(char path[/*FILENAME_MAX*/], const char* prefix) {
+ int ret = MakeTempDir(path, prefix);
+ if (!ret && strlcat(path, "%06d", FILENAME_MAX) > FILENAME_MAX - 100)
+ ret = EINVAL;
+ if (ret)
+ path[0] = 0;
+ return ret;
+}
+
+inline int GetMeta(TFile& file, TDatMetaPage* meta) {
+ ui8 buf[METASIZE], *ptr = buf;
+ ssize_t size = sizeof(buf), ret;
+ while (size && (ret = file.Read(ptr, size)) > 0) {
+ size -= ret;
+ ptr += ret;
+ }
+ if (size)
+ return MBDB_BAD_FILE_SIZE;
+ ptr = buf; // gcc 4.4 warning fix
+ *meta = *(TDatMetaPage*)ptr;
+ return (meta->MetaSig == METASIG) ? 0 : MBDB_BAD_METAPAGE;
+}
+
+template <class TRec>
+inline bool IsDatFile(const char* fname) {
+ TDatMetaPage meta;
+ TFile f(fname, RdOnly);
+ return !GetMeta(f, &meta) && meta.RecordSig == TRec::RecordSig;
+}
+
+#if defined(_MSC_VER)
+#pragma warning(pop)
+#endif
diff --git a/library/cpp/microbdb/noextinfo.proto b/library/cpp/microbdb/noextinfo.proto
new file mode 100644
index 0000000000..6a78882e07
--- /dev/null
+++ b/library/cpp/microbdb/noextinfo.proto
@@ -0,0 +1,4 @@
+
+message TNoExtInfo {
+}
+
diff --git a/library/cpp/microbdb/output.h b/library/cpp/microbdb/output.h
new file mode 100644
index 0000000000..d0ecab2108
--- /dev/null
+++ b/library/cpp/microbdb/output.h
@@ -0,0 +1,1049 @@
+#pragma once
+
+#include "header.h"
+#include "file.h"
+
+#include <util/generic/buffer.h>
+#include <util/memory/tempbuf.h>
+
+#include <sys/uio.h>
+
+template <class TFileManip>
+inline ssize_t Writev(TFileManip& fileManip, const struct iovec* iov, int iovcnt) {
+ ssize_t written_count = 0;
+ for (int n = 0; n < iovcnt; n++) {
+ ssize_t last_write = fileManip.Write(iov[n].iov_base, iov[n].iov_len);
+ if (last_write < 0)
+ return -1;
+ written_count += last_write;
+ }
+ return written_count;
+}
+
+//*********************************************************************
+struct TFakeIndexer {
+ inline void NextPage(TDatPage*) noexcept {
+ }
+};
+
+struct TCallbackIndexer {
+ typedef void (*TCallback)(void* This, const TDatPage* page);
+
+ TCallbackIndexer() {
+ Callback = nullptr;
+ }
+
+ void SetCallback(void* t, TCallback c) {
+ This = t;
+ Callback = c;
+ }
+
+ void NextPage(TDatPage* dat) {
+ Callback(This, dat);
+ }
+
+ TCallback Callback;
+ void* This;
+};
+
+template <class TVal, typename TBasePageIter, typename TBaseIndexer = TFakeIndexer, typename TCompressor = TFakeCompression>
+class TOutputRecordIterator;
+
+template <class TVal, typename TBasePageIter, typename TBaseIndexer>
+class TOutputRecordIterator<TVal, TBasePageIter, TBaseIndexer, TFakeCompression>
+ : public TBasePageIter, public TBaseIndexer {
+public:
+ enum EOffset {
+ WrongOffset = size_t(-1)
+ };
+
+ typedef TBasePageIter TPageIter;
+ typedef TBaseIndexer TIndexer;
+
+ TOutputRecordIterator() {
+ Clear();
+ }
+
+ ~TOutputRecordIterator() {
+ Term();
+ }
+
+ inline const TVal* Current() const {
+ return Rec;
+ }
+
+ const TVal* Push(const TVal* v, const typename TExtInfoType<TVal>::TResult* extInfo = nullptr) {
+ NMicroBDB::AssertValid(v);
+ size_t len = SizeOf(v);
+ if (!TExtInfoType<TVal>::Exists)
+ return (Reserve(len)) ? (TVal*)memcpy(Rec, v, len) : nullptr;
+ else if (extInfo) {
+ size_t extSize = extInfo->ByteSize();
+ size_t extLenSize = len_long((i64)extSize);
+ if (!Reserve(len + extLenSize + extSize))
+ return nullptr;
+ memcpy(Rec, v, len);
+ out_long((i64)extSize, (char*)Rec + len);
+ extInfo->SerializeWithCachedSizesToArray((ui8*)Rec + len + extLenSize);
+ return Rec;
+ } else {
+ size_t extLenSize = len_long((i64)0);
+ if (!Reserve(len + extLenSize))
+ return nullptr;
+ memcpy(Rec, v, len);
+ out_long((i64)0, (char*)Rec + len);
+ return Rec;
+ }
+ }
+
+ const TVal* Push(const TVal* v, const ui8* extInfoRaw, size_t extLen) {
+ NMicroBDB::AssertValid(v);
+ size_t sz = SizeOf(v);
+ if (!Reserve(sz + extLen))
+ return nullptr;
+ memcpy(Rec, v, sz);
+ memcpy((ui8*)Rec + sz, extInfoRaw, extLen);
+ return Rec;
+ }
+
+ // use values stored in microbdb readers/writers internal buffer only.
+ // method expects serialized extInfo after this record
+ const TVal* PushWithExtInfo(const TVal* v) {
+ NMicroBDB::AssertValid(v);
+ size_t extSize;
+ size_t extLenSize;
+ size_t sz = NMicroBDB::SizeOfExt(v, &extLenSize, &extSize);
+ sz += extLenSize + extSize;
+ if (!Reserve(sz))
+ return nullptr;
+ memcpy(Rec, v, sz);
+ return Rec;
+ }
+
+ TVal* Reserve(size_t len) {
+ if (CurLen + DatCeil(len) > TPageIter::GetPageSize()) {
+ if (sizeof(TDatPage) + DatCeil(len) > TPageIter::GetPageSize())
+ return Rec = nullptr;
+ if (TPageIter::Current() && RecNum) {
+ TPageIter::Current()->RecNum = RecNum;
+ TPageIter::Current()->Format = MBDB_FORMAT_RAW;
+ memset((char*)TPageIter::Current() + CurLen, 0, TPageIter::GetPageSize() - CurLen);
+ TIndexer::NextPage(TPageIter::Current());
+ RecNum = 0;
+ }
+ if (!TPageIter::Next()) {
+ CurLen = TPageIter::GetPageSize();
+ return Rec = nullptr;
+ }
+ CurLen = sizeof(TDatPage);
+ }
+ LenForOffset = CurLen;
+ Rec = (TVal*)((char*)TPageIter::Current() + CurLen);
+ DatSet(Rec, len);
+
+ CurLen += DatCeil(len);
+
+ ++RecNum;
+ return Rec;
+ }
+
+ void Flush() {
+ TPageIter::Current()->RecNum = RecNum;
+ TPageIter::Current()->Format = MBDB_FORMAT_RAW;
+ }
+
+ size_t Offset() const {
+ return Rec ? TPageIter::Offset() + LenForOffset : WrongOffset;
+ }
+
+ void ResetDat() {
+ CurLen = (char*)Rec - (char*)TPageIter::Current();
+ size_t len;
+ if (!TExtInfoType<TVal>::Exists) {
+ len = SizeOf(Rec);
+ } else {
+ size_t ll;
+ size_t l;
+ len = NMicroBDB::SizeOfExt(Rec, &ll, &l);
+ len += ll + l;
+ }
+ CurLen += DatCeil(len);
+ }
+
+protected:
+ void Clear() {
+ Rec = nullptr;
+ RecNum = 0;
+ CurLen = 0;
+ LenForOffset = 0;
+ }
+
+ int Init() {
+ Clear();
+ CurLen = TPageIter::GetPageSize();
+ return 0;
+ }
+
+ int Term() {
+ if (TPageIter::Current()) {
+ TPageIter::Current()->RecNum = RecNum;
+ TPageIter::Current()->Format = MBDB_FORMAT_RAW;
+ memset((char*)TPageIter::Current() + CurLen, 0, TPageIter::GetPageSize() - CurLen);
+ RecNum = 0;
+ }
+ int ret = !TPageIter::Current() && RecNum;
+ Clear();
+ return ret;
+ }
+
+ int GotoPage(int pageno) {
+ if (TPageIter::Current()) {
+ TPageIter::Current()->RecNum = RecNum;
+ TPageIter::Current()->Format = MBDB_FORMAT_RAW;
+ memset((char*)TPageIter::Current() + CurLen, 0, TPageIter::GetPageSize() - CurLen);
+ }
+ int ret = TPageIter::GotoPage(pageno);
+ if (!ret) {
+ RecNum = 0;
+ CurLen = sizeof(TDatPage);
+ }
+ return ret;
+ }
+
+ TVal* Rec;
+ int RecNum;
+ size_t CurLen;
+ size_t LenForOffset;
+};
+
+template <class TVal, typename TBasePageIter, typename TBaseIndexer, typename TAlgorithm>
+class TOutputRecordIterator
+ : public TBasePageIter,
+ public TBaseIndexer,
+ private TAlgorithm {
+ class TPageBuffer {
+ public:
+ void Init(size_t page) {
+ Pos = 0;
+ RecNum = 0;
+ Size = Min(page / 2, size_t(64 << 10));
+ Data.Reset(new ui8[Size]);
+ }
+
+ void Clear() {
+ Pos = 0;
+ RecNum = 0;
+ }
+
+ inline bool Empty() const {
+ return RecNum == 0;
+ }
+
+ public:
+ size_t Size;
+ size_t Pos;
+ int RecNum;
+ TArrayHolder<ui8> Data;
+ };
+
+public:
+ typedef TBasePageIter TPageIter;
+ typedef TBaseIndexer TIndexer;
+
+ TOutputRecordIterator()
+ : Rec(nullptr)
+ , RecNum(0)
+ {
+ }
+
+ ~TOutputRecordIterator() {
+ Term();
+ }
+
+ const TVal* Current() const {
+ return Rec;
+ }
+
+ const TVal* Push(const TVal* v, const typename TExtInfoType<TVal>::TResult* extInfo = nullptr) {
+ NMicroBDB::AssertValid(v);
+ size_t len = SizeOf(v);
+ if (!TExtInfoType<TVal>::Exists)
+ return (Reserve(len)) ? (TVal*)memcpy((TVal*)Rec, v, len) : nullptr;
+ else if (extInfo) {
+ size_t extSize = extInfo->ByteSize();
+ size_t extLenSize = len_long((i64)extSize);
+ if (!Reserve(len + extLenSize + extSize))
+ return nullptr;
+ memcpy(Rec, v, len);
+ out_long((i64)extSize, (char*)Rec + len);
+ extInfo->SerializeWithCachedSizesToArray((ui8*)Rec + len + extLenSize);
+ return Rec;
+ } else {
+ size_t extLenSize = len_long((i64)0);
+ if (!Reserve(len + extLenSize))
+ return nullptr;
+ memcpy(Rec, v, len);
+ out_long((i64)0, (char*)Rec + len);
+ return Rec;
+ }
+ }
+
+ const TVal* Push(const TVal* v, const ui8* extInfoRaw, size_t extLen) {
+ NMicroBDB::AssertValid(v);
+ size_t sz = SizeOf(v);
+ if (!Reserve(sz + extLen))
+ return NULL;
+ memcpy(Rec, v, sz);
+ memcpy((ui8*)Rec + sz, extInfoRaw, extLen);
+ return Rec;
+ }
+
+ // use values stored in microbdb readers/writers internal buffer only.
+ // method expects serialized extInfo after this record
+ const TVal* PushWithExtInfo(const TVal* v) {
+ NMicroBDB::AssertValid(v);
+ size_t extSize;
+ size_t extLenSize;
+ size_t sz = NMicroBDB::SizeOfExt(v, &extLenSize, &extSize);
+ sz += extLenSize + extSize;
+ if (!Reserve(sz))
+ return nullptr;
+ memcpy(Rec, v, sz);
+ return Rec;
+ }
+
+ TVal* Reserve(const size_t len) {
+ const size_t aligned = DatCeil(len);
+
+ if (!TPageIter::Current()) { // Allocate fist page
+ if (!TPageIter::Next()) {
+ CurLen = TPageIter::GetPageSize();
+ return Rec = nullptr;
+ }
+ CurLen = sizeof(TDatPage) + sizeof(TCompressedPage);
+ }
+
+ if (Buffer.Pos + aligned > Buffer.Size) {
+ if (Buffer.Pos == 0)
+ return Rec = nullptr;
+ if (FlushBuffer())
+ return Rec = nullptr;
+ if (Buffer.Pos + aligned + sizeof(TDatPage) + sizeof(TCompressedPage) > Buffer.Size)
+ return Rec = nullptr;
+ }
+
+ Rec = (TVal*)((char*)Buffer.Data.Get() + Buffer.Pos);
+ DatSet(Rec, len); // len is correct because DatSet set align tail to zero
+
+ Buffer.RecNum++;
+ Buffer.Pos += aligned;
+ ++RecNum;
+ return Rec;
+ }
+
+ void Flush() {
+ if (!Buffer.Empty()) {
+ FlushBuffer();
+ TPageIter::Current()->RecNum = RecNum;
+ TPageIter::Current()->Format = MBDB_FORMAT_COMPRESSED;
+ }
+ }
+
+ size_t Offset() const {
+ // According to vadya@ there is no evil to return 0 all the time
+ return 0;
+ }
+
+ void ResetDat() {
+ Buffer.Pos = (char*)Rec - (char*)Buffer.Data.Get();
+ size_t len = SizeOf(Rec);
+ Buffer.Pos += DatCeil(len);
+ }
+
+protected:
+ void Clear() {
+ RecNum = 0;
+ Rec = nullptr;
+ Count = 0;
+ CurLen = sizeof(TDatPage) + sizeof(TCompressedPage);
+ Buffer.Clear();
+ }
+
+ int Init() {
+ Clear();
+ Buffer.Init(TPageIter::GetPageSize());
+ TAlgorithm::Init();
+ return 0;
+ }
+
+ int Term() {
+ if (TPageIter::Current())
+ Commit();
+ int ret = !TPageIter::Current() && RecNum;
+ Clear();
+ TAlgorithm::Term();
+ return ret;
+ }
+
+ int GotoPage(int pageno) {
+ if (TPageIter::Current())
+ Commit();
+ int ret = TPageIter::GotoPage(pageno);
+ if (!ret)
+ Reset();
+ return ret;
+ }
+
+private:
+ void Commit() {
+ Flush();
+ TPageIter::Current()->RecNum = RecNum;
+ TPageIter::Current()->Format = MBDB_FORMAT_COMPRESSED;
+ SetCompressedPageHeader();
+
+ memset((char*)TPageIter::Current() + CurLen, 0, TPageIter::GetPageSize() - CurLen);
+ RecNum = 0;
+ Count = 0;
+ }
+
+ inline void SetCompressedPageHeader() {
+ TCompressedPage* const hdr = (TCompressedPage*)((ui8*)TPageIter::Current() + sizeof(TDatPage));
+
+ hdr->BlockCount = Count;
+ hdr->Algorithm = TAlgorithm::Code;
+ hdr->Version = 0;
+ hdr->Reserved = 0;
+ }
+
+ inline void Reset() {
+ RecNum = 0;
+ CurLen = sizeof(TDatPage) + sizeof(TCompressedPage);
+ Count = 0;
+ Buffer.Clear();
+ }
+
+ int FlushBuffer() {
+ TArrayHolder<ui8> data;
+ const ui8* const buf = Buffer.Data.Get();
+ size_t first = 0;
+
+ if (!TExtInfoType<TVal>::Exists)
+ first = DatCeil(SizeOf((TVal*)buf));
+ else {
+ size_t ll;
+ size_t l;
+ first = NMicroBDB::SizeOfExt((const TVal*)buf, &ll, &l);
+ first = DatCeil(first + ll + l);
+ }
+
+ size_t total = sizeof(NMicroBDB::TCompressedHeader) + first + ((Buffer.RecNum == 1) ? 0 : TAlgorithm::CompressBound(Buffer.Pos - first));
+ size_t real = total;
+
+ {
+ ui8* p = nullptr;
+ NMicroBDB::TCompressedHeader* hdr = nullptr;
+
+ // 1. Choose data destination (temporary buffer or dat-page)
+ if (CurLen + total > TPageIter::GetPageSize()) {
+ data.Reset(new ui8[total]);
+
+ hdr = (NMicroBDB::TCompressedHeader*)data.Get();
+ p = data.Get() + sizeof(NMicroBDB::TCompressedHeader);
+ } else {
+ p = (ui8*)TPageIter::Current() + CurLen;
+ hdr = (NMicroBDB::TCompressedHeader*)p;
+ p += sizeof(NMicroBDB::TCompressedHeader);
+ }
+
+ // 2. Compress data
+
+ // Fill header and first record
+ hdr->Original = Buffer.Pos;
+ hdr->Compressed = 0;
+ hdr->Count = Buffer.RecNum;
+ hdr->Reserved = 0;
+ memcpy(p, Buffer.Data.Get(), first);
+ // Fill compressed part
+ if (Buffer.RecNum > 1) {
+ size_t size = TAlgorithm::CompressBound(Buffer.Pos - first);
+
+ p += first;
+ TAlgorithm::Compress(p, size, buf + first, Buffer.Pos - first);
+
+ hdr->Compressed = size;
+
+ real = sizeof(NMicroBDB::TCompressedHeader) + first + size;
+ }
+ }
+
+ Y_ASSERT(sizeof(TDatPage) + sizeof(TCompressedPage) + real <= TPageIter::GetPageSize());
+
+ // 3. Check page capacity
+
+ if (CurLen + real > TPageIter::GetPageSize()) {
+ Y_ASSERT(data.Get() != nullptr);
+
+ if (TPageIter::Current() && RecNum) {
+ RecNum = RecNum - Buffer.RecNum;
+ TPageIter::Current()->RecNum = RecNum;
+ TPageIter::Current()->Format = MBDB_FORMAT_COMPRESSED;
+ SetCompressedPageHeader();
+ memset((char*)TPageIter::Current() + CurLen, 0, TPageIter::GetPageSize() - CurLen);
+ TIndexer::NextPage(TPageIter::Current());
+ RecNum = Buffer.RecNum;
+ Count = 0;
+ }
+ if (!TPageIter::Next()) {
+ CurLen = TPageIter::GetPageSize();
+ return MBDB_NO_MEMORY;
+ }
+ CurLen = sizeof(TDatPage) + sizeof(TCompressedPage);
+ }
+
+ // 4. Flush data and reset buffer state
+
+ if (data.Get())
+ memcpy((ui8*)TPageIter::Current() + CurLen, data.Get(), real);
+ CurLen += real;
+ ++Count;
+ Buffer.Clear();
+ return 0;
+ }
+
+private:
+ size_t CurLen;
+ TPageBuffer Buffer;
+ TVal* Rec;
+ ui32 Count; //! < count of compressed blocks on page
+public:
+ int RecNum;
+};
+
+template <typename TBaseWriter>
+class TOutputPageIterator: public TBaseWriter {
+public:
+ typedef TBaseWriter TWriter;
+
+ TOutputPageIterator()
+ : Buf(nullptr)
+ {
+ Clear();
+ }
+
+ ~TOutputPageIterator() {
+ Term();
+ }
+
+ TDatPage* Current() {
+ return CurPage;
+ }
+
+ size_t Offset() const {
+ //Cout << "PS = " << TWriter::GetPageSize() << "; PN = " << PageNum << "; MS = " << METASIZE << Endl;
+ return TWriter::GetPageSize() * PageNum + METASIZE;
+ }
+
+ int Freeze() {
+ return (Frozen = (PageNum == -1) ? 0 : (int)PageNum);
+ }
+
+ void Unfreeze() {
+ Frozen = -1;
+ }
+
+ inline int IsFrozen() const {
+ return Frozen + 1;
+ }
+
+ inline size_t GetPageSize() const {
+ return TWriter::GetPageSize();
+ }
+
+ inline int GetPageNum() const {
+ return (int)PageNum;
+ }
+
+ TDatPage* Next() {
+ if (PageNum >= Maxpage && WriteBuf())
+ return CurPage = nullptr;
+ CurPage = (TDatPage*)(Buf + ((++PageNum) % Bufpages) * GetPageSize());
+ memset(CurPage, 0, sizeof(TDatPage));
+ return CurPage;
+ }
+
+protected:
+ int Init(size_t pages, int pagesOrBytes) {
+ Term();
+ if (pagesOrBytes)
+ Bufpages = pages;
+ else
+ Bufpages = pages / GetPageSize();
+ Bufpages = Max<size_t>(1, Bufpages);
+ Maxpage = Bufpages - 1;
+ // if (!(Buf = (char*)malloc(Bufpages * GetPageSize())))
+ // return ENOMEM;
+ ABuf.Alloc(Bufpages * GetPageSize());
+ Buf = ABuf.Begin();
+ if (TWriter::Memo)
+ Freeze();
+ return 0;
+ }
+
+ int Term() {
+ Unfreeze();
+ int ret = (PageNum < 0) ? 0 : WriteBuf();
+ Clear();
+ return ret;
+ }
+
+ int GotoPage(int pageno) {
+ int ret = EAGAIN;
+ if (IsFrozen() || PageNum >= 0 && ((ret = WriteBuf())) || ((ret = TWriter::GotoPage(pageno))))
+ return ret;
+ PageNum = pageno;
+ Maxpage = Bufpages - 1 + pageno;
+ CurPage = (TDatPage*)(Buf + (PageNum % Bufpages) * GetPageSize());
+ memset(CurPage, 0, sizeof(TDatPage));
+ return 0;
+ }
+
+ void Clear() {
+ ABuf.Dealloc();
+ Buf = nullptr;
+ Maxpage = PageNum = Frozen = -1;
+ Bufpages = 0;
+ CurPage = nullptr;
+ }
+
+ int WriteBuf() {
+ int nvec;
+ iovec vec[2];
+ ssize_t minpage = Maxpage - Bufpages + 1;
+ ssize_t maxpage = Frozen == -1 ? PageNum : Frozen - 1;
+ if (maxpage < minpage)
+ return EAGAIN;
+ minpage %= Bufpages;
+ maxpage %= Bufpages;
+ if (maxpage < minpage) {
+ vec[0].iov_base = Buf + GetPageSize() * minpage;
+ vec[0].iov_len = GetPageSize() * (Bufpages - minpage);
+ vec[1].iov_base = Buf;
+ vec[1].iov_len = GetPageSize() * (maxpage + 1);
+ nvec = 2;
+ } else {
+ vec[0].iov_base = Buf + GetPageSize() * minpage;
+ vec[0].iov_len = GetPageSize() * (maxpage - minpage + 1);
+ nvec = 1;
+ }
+ if (TWriter::WritePages(vec, nvec))
+ return EIO;
+ Maxpage += (maxpage < minpage) ? (Bufpages - minpage + maxpage + 1) : (maxpage - minpage + 1);
+ return 0;
+ }
+
+ ssize_t Maxpage;
+ ssize_t Bufpages;
+ ssize_t PageNum;
+ int Frozen;
+ TDatPage* CurPage;
+ char* Buf;
+ TMappedAllocation ABuf;
+};
+
+template <class TFileManip>
+class TOutputPageFileImpl: private TNonCopyable {
+public:
+ TOutputPageFileImpl()
+ : Pagesize(0)
+ , Eof(1)
+ , Error(0)
+ , Memo(0)
+ , Recordsig(0)
+ {
+ }
+
+ ~TOutputPageFileImpl() {
+ Term();
+ }
+
+ inline int IsEof() const {
+ return Eof;
+ }
+
+ inline int GetError() const {
+ return Error;
+ }
+
+ inline bool IsOpen() const {
+ return FileManip.IsOpen();
+ }
+
+ inline size_t GetPageSize() const {
+ return Pagesize;
+ }
+
+ inline ui32 GetRecordSig() const {
+ return Recordsig;
+ }
+
+ int Init(const char* fname, size_t pagesize, ui32 recsig, bool direct = false) {
+ Memo = 0;
+ if (FileManip.IsOpen())
+ return MBDB_ALREADY_INITIALIZED;
+
+ if (!fname) {
+ Eof = Error = 0;
+ Pagesize = pagesize;
+ Recordsig = recsig;
+ Memo = 1;
+ return 0;
+ }
+
+ Error = FileManip.Open(fname, WrOnly | CreateAlways | ARW | AWOther | (direct ? DirectAligned : EOpenMode()));
+ if (Error)
+ return Error;
+ Error = Init(TFile(), pagesize, recsig);
+ if (Error) {
+ FileManip.Close();
+ unlink(fname);
+ }
+ return Error;
+ }
+
+ int Init(TAutoPtr<IOutputStream> output, size_t pagesize, ui32 recsig) {
+ Memo = 0;
+ if (FileManip.IsOpen()) {
+ return MBDB_ALREADY_INITIALIZED;
+ }
+
+ if (!output) {
+ Eof = Error = 0;
+ Pagesize = pagesize;
+ Recordsig = recsig;
+ Memo = 1;
+ return 0;
+ }
+
+ Error = FileManip.Open(output);
+ if (Error)
+ return Error;
+ Error = Init(TFile(), pagesize, recsig);
+ if (Error) {
+ FileManip.Close();
+ }
+ return Error;
+ }
+
+ int Init(const TFile& file, size_t pagesize, ui32 recsig) {
+ Memo = 0;
+ if (!file.IsOpen() && !FileManip.IsOpen())
+ return MBDB_NOT_INITIALIZED;
+ if (file.IsOpen() && FileManip.IsOpen())
+ return MBDB_ALREADY_INITIALIZED;
+ if (file.IsOpen()) {
+ Error = FileManip.Init(file);
+ if (Error)
+ return Error;
+ }
+
+ Eof = 1;
+ TTempBuf buf(METASIZE + FS_BLOCK_SIZE);
+ const char* ptr = (buf.Data() + FS_BLOCK_SIZE - ((ui64)buf.Data() & (FS_BLOCK_SIZE - 1)));
+ TDatMetaPage* meta = (TDatMetaPage*)ptr;
+
+ memset(buf.Data(), 0, buf.Size());
+ meta->MetaSig = METASIG;
+ meta->PageSize = Pagesize = pagesize;
+ meta->RecordSig = Recordsig = recsig;
+
+ ssize_t size = METASIZE, ret = 0;
+ while (size && (ret = FileManip.Write(ptr, (unsigned)size)) > 0) {
+ size -= ret;
+ ptr += ret;
+ }
+ if (size || ret <= 0) {
+ Term();
+ return Error = errno ? errno : MBDB_WRITE_ERROR;
+ }
+
+ Error = Eof = 0;
+ return Error;
+ }
+
+protected:
+ int WritePages(iovec* vec, int nvec) {
+ if (Error || Memo)
+ return Error;
+
+ ssize_t size, delta;
+ iovec* pvec;
+ int vsize;
+
+ for (vsize = 0, pvec = vec; vsize < nvec; vsize++, pvec++)
+ for (size = 0; (size_t)size < pvec->iov_len; size += Pagesize)
+ ((TDatPage*)((char*)pvec->iov_base + size))->PageSig = PAGESIG;
+
+ delta = size = 0;
+ pvec = vec;
+ vsize = nvec;
+ while (vsize && (size = Writev(FileManip, pvec, (int)Min(vsize, 16))) > 0) {
+ if (delta) {
+ size += delta;
+ pvec->iov_len += delta;
+ pvec->iov_base = (char*)pvec->iov_base - delta;
+ delta = 0;
+ }
+ while (size) {
+ if ((size_t)size >= pvec->iov_len) {
+ size -= pvec->iov_len;
+ ++pvec;
+ --vsize;
+ } else {
+ delta = size;
+ pvec->iov_len -= size;
+ pvec->iov_base = (char*)pvec->iov_base + size;
+ size = 0;
+ }
+ }
+ }
+ if (delta) {
+ pvec->iov_len += delta;
+ pvec->iov_base = (char*)pvec->iov_base - delta;
+ }
+ return Error = (!size && !vsize) ? 0 : errno ? errno : MBDB_WRITE_ERROR;
+ }
+
+ i64 Tell() {
+ return FileManip.RealSeek(0, SEEK_CUR);
+ }
+
+ int GotoPage(int pageno) {
+ if (Error || Memo)
+ return Error;
+ Eof = 0;
+ i64 offset = (i64)pageno * Pagesize + METASIZE;
+ if (offset != FileManip.Seek(offset, SEEK_SET))
+ Error = MBDB_BAD_FILE_SIZE;
+ return Error;
+ }
+
+ int Term() {
+ int ret = FileManip.Close();
+ Eof = 1;
+ Memo = 0;
+ if (!Error)
+ Error = ret;
+ return Error;
+ }
+
+ size_t Pagesize;
+ int Eof;
+ int Error;
+ int Memo;
+ ui32 Recordsig;
+
+private:
+ TFileManip FileManip;
+};
+
+using TOutputPageFile = TOutputPageFileImpl<TOutputFileManip>;
+
+template <class TVal,
+ typename TBaseRecIter = TOutputRecordIterator<TVal, TOutputPageIterator<TOutputPageFile>>>
+class TOutDatFileImpl: public TBaseRecIter {
+public:
+ typedef TBaseRecIter TRecIter;
+ typedef typename TRecIter::TPageIter TPageIter;
+ typedef typename TRecIter::TPageIter::TWriter TWriter;
+
+ int Open(const char* fname, size_t pagesize, size_t pages = 1, int pagesOrBytes = 1, bool direct = false) {
+ int ret = TWriter::Init(fname, pagesize, TVal::RecordSig, direct);
+ return ret ? ret : Open2(pages, pagesOrBytes);
+ }
+
+ int Open(const TFile& file, size_t pagesize, size_t pages = 1, int pagesOrBytes = 1) {
+ int ret = TWriter::Init(file, pagesize, TVal::RecordSig);
+ return ret ? ret : Open2(pages, pagesOrBytes);
+ }
+
+ int Open(TAutoPtr<IOutputStream> output, size_t pagesize, size_t pages = 1, int pagesOrBytes = 1) {
+ int ret = TWriter::Init(output, pagesize, TVal::RecordSig);
+ return ret ? ret : Open2(pages, pagesOrBytes);
+ }
+
+ int Close() {
+ int ret1 = TRecIter::Term();
+ int ret2 = TPageIter::Term();
+ int ret3 = TWriter::Term();
+ return ret1 ? ret1 : ret2 ? ret2 : ret3;
+ }
+
+private:
+ int Open2(size_t pages, int pagesOrBytes) {
+ int ret = TPageIter::Init(pages, pagesOrBytes);
+ if (!ret)
+ ret = TRecIter::Init();
+ if (ret)
+ Close();
+ return ret;
+ }
+};
+
+template <class TVal>
+class TOutIndexFile: public TOutDatFileImpl<
+ TVal,
+ TOutputRecordIterator<TVal, TOutputPageIterator<TOutputPageFile>, TCallbackIndexer, TFakeCompression>> {
+ typedef TOutDatFileImpl<
+ TVal,
+ TOutputRecordIterator<TVal, TOutputPageIterator<TOutputPageFile>, TCallbackIndexer, TFakeCompression>>
+ TDatFile;
+ typedef TOutIndexFile<TVal> TMyType;
+ typedef typename TDatFile::TRecIter TRecIter;
+ typedef typename TRecIter::TPageIter TPageIter;
+ typedef typename TRecIter::TIndexer TIndexer;
+
+public:
+ TOutIndexFile() {
+ TIndexer::SetCallback(this, DispatchCallback);
+ }
+
+ int Open(const char* fname, size_t pagesize, size_t pages, int pagesOrBytes = 1) {
+ int ret = TDatFile::Open(fname, pagesize, pages, pagesOrBytes);
+ if (ret)
+ return ret;
+ if ((ret = TRecIter::GotoPage(1))) {
+ TDatFile::Close();
+ return ret;
+ }
+ Index0.Clear();
+ return ret;
+ }
+
+ int Close() {
+ TPageIter::Unfreeze();
+ if (TRecIter::RecNum) {
+ TRecIter::Flush();
+ NextPage(TPageIter::Current());
+ }
+ int ret = 0;
+ if (Index0.Size() && !(ret = TRecIter::GotoPage(0))) {
+ const char* ptr = Index0.Begin();
+ size_t recSize;
+ while (ptr < Index0.End()) {
+ Y_ASSERT((size_t)(Index0.End() - ptr) >= sizeof(size_t));
+ memcpy(&recSize, ptr, sizeof(size_t));
+ ptr += sizeof(size_t);
+ Y_ASSERT((size_t)(Index0.End() - ptr) >= recSize);
+ ui8* buf = (ui8*)TRecIter::Reserve(recSize);
+ if (!buf) {
+ ret = MBDB_PAGE_OVERFLOW;
+ break;
+ }
+ memcpy(buf, ptr, recSize);
+ TRecIter::ResetDat();
+ ptr += recSize;
+ }
+ Index0.Clear();
+ ret = (TPageIter::GetPageNum() != 0) ? MBDB_PAGE_OVERFLOW : TPageIter::GetError();
+ }
+ int ret1 = TDatFile::Close();
+ return ret ? ret : ret1;
+ }
+
+protected:
+ TBuffer Index0;
+
+ void NextPage(const TDatPage* page) {
+ const TVal* first = (const TVal*)NMicroBDB::GetFirstRecord(page);
+ size_t sz;
+ if (!TExtInfoType<TVal>::Exists) {
+ sz = SizeOf(first);
+ } else {
+ size_t ll;
+ size_t l;
+ sz = NMicroBDB::SizeOfExt(first, &ll, &l);
+ sz += ll + l;
+ }
+ Index0.Append((const char*)&sz, sizeof(size_t));
+ Index0.Append((const char*)first, sz);
+ }
+
+ static void DispatchCallback(void* This, const TDatPage* page) {
+ ((TMyType*)This)->NextPage(page);
+ }
+};
+
+template <class TVal, class TKey, typename TCompressor = TFakeCompression, class TPageFile = TOutputPageFile>
+class TOutDirectFileImpl: public TOutDatFileImpl<
+ TVal,
+ TOutputRecordIterator<TVal, TOutputPageIterator<TPageFile>, TCallbackIndexer, TCompressor>> {
+ typedef TOutDatFileImpl<
+ TVal,
+ TOutputRecordIterator<TVal, TOutputPageIterator<TPageFile>, TCallbackIndexer, TCompressor>>
+ TDatFile;
+ typedef TOutDirectFileImpl<TVal, TKey, TCompressor, TPageFile> TMyType;
+ typedef typename TDatFile::TRecIter TRecIter;
+ typedef typename TRecIter::TPageIter TPageIter;
+ typedef typename TRecIter::TIndexer TIndexer;
+ typedef TOutIndexFile<TKey> TKeyFile;
+
+public:
+ TOutDirectFileImpl() {
+ TIndexer::SetCallback(this, DispatchCallback);
+ }
+
+ int Open(const char* fname, size_t pagesize, int pages = 1, size_t ipagesize = 0, size_t ipages = 1, int pagesOrBytes = 1) {
+ char iname[FILENAME_MAX];
+ int ret;
+ if (ipagesize == 0)
+ ipagesize = pagesize;
+ ret = TDatFile::Open(fname, pagesize, pages, pagesOrBytes);
+ ret = ret ? ret : DatNameToIdx(iname, fname);
+ ret = ret ? ret : KeyFile.Open(iname, ipagesize, ipages, pagesOrBytes);
+ if (ret)
+ TDatFile::Close();
+ return ret;
+ }
+
+ int Close() {
+ if (TRecIter::RecNum) {
+ TRecIter::Flush();
+ NextPage(TPageIter::Current());
+ }
+ int ret = KeyFile.Close();
+ int ret1 = TDatFile::Close();
+ return ret1 ? ret1 : ret;
+ }
+
+ int GetError() const {
+ return TDatFile::GetError() ? TDatFile::GetError() : KeyFile.GetError();
+ }
+
+protected:
+ TKeyFile KeyFile;
+
+ void NextPage(const TDatPage* page) {
+ typedef TMakeExtKey<TVal, TKey> TMakeExtKey;
+
+ TVal* val = (TVal*)NMicroBDB::GetFirstRecord(page);
+ TKey key;
+ if (!TMakeExtKey::Exists) {
+ TMakeExtKey::Make(&key, nullptr, val, nullptr);
+ KeyFile.Push(&key);
+ } else {
+ size_t ll;
+ size_t l;
+ size_t sz = NMicroBDB::SizeOfExt(val, &ll, &l);
+ typename TExtInfoType<TVal>::TResult valExt;
+ if (TExtInfoType<TVal>::Exists)
+ Y_PROTOBUF_SUPPRESS_NODISCARD valExt.ParseFromArray((ui8*)val + sz + ll, l);
+ typename TExtInfoType<TKey>::TResult keyExt;
+ TMakeExtKey::Make(&key, &keyExt, val, &valExt);
+ KeyFile.Push(&key, &keyExt);
+ }
+ }
+
+ static void DispatchCallback(void* This, const TDatPage* page) {
+ ((TMyType*)This)->NextPage(page);
+ }
+};
diff --git a/library/cpp/microbdb/powersorter.h b/library/cpp/microbdb/powersorter.h
new file mode 100644
index 0000000000..c40de9c23f
--- /dev/null
+++ b/library/cpp/microbdb/powersorter.h
@@ -0,0 +1,667 @@
+#pragma once
+
+#include "safeopen.h"
+
+#include <util/generic/vector.h>
+#include <util/generic/deque.h>
+#include <util/system/mutex.h>
+#include <util/system/condvar.h>
+#include <util/thread/pool.h>
+
+template <
+ class TRecord,
+ template <typename T> class TCompare,
+ class TSieve,
+ class TMemoFile = TOutDatFile<TRecord>>
+class TDatSorterBuf {
+public:
+ typedef TRecord TRec;
+ typedef TVector<TRec*> TVectorType;
+ typedef TMemoFile TMemo;
+ typedef TCompare<TRecord> TComp;
+
+public:
+ TDatSorterBuf(size_t memory, size_t pageSize)
+ : Memo("memo", pageSize, memory, 0)
+ , Cur()
+ {
+ Memo.Open(nullptr);
+ Memo.Freeze();
+ }
+
+ ~TDatSorterBuf() {
+ Vector.clear();
+ Memo.Close();
+ }
+
+ const TRec* Push(const TRec* v) {
+ const TRec* u = Memo.Push(v);
+ if (u)
+ Vector.push_back((TRec*)u);
+ return u;
+ }
+
+ const TRec* Next() {
+ if (Ptr == Vector.end()) {
+ if (Cur)
+ TSieve::Sieve(Cur, Cur);
+ Cur = nullptr;
+ } else {
+ Cur = *Ptr++;
+ if (!TIsSieveFake<TSieve>::Result)
+ while (Ptr != Vector.end() && TSieve::Sieve(Cur, *Ptr))
+ ++Ptr;
+ }
+ return Cur;
+ }
+
+ const TRec* Current() {
+ return Cur;
+ }
+
+ size_t Size() {
+ return Vector.size();
+ }
+
+ void Sort() {
+ Ptr = Vector.begin();
+ Cur = nullptr;
+
+ MBDB_SORT_FUN(Vector.begin(), Vector.end(), TComp());
+ }
+
+ void Clear() {
+ Vector.clear();
+ Memo.Freeze();
+ Ptr = Vector.begin();
+ Cur = nullptr;
+ }
+
+private:
+ TVectorType Vector;
+ TMemo Memo;
+
+ typename TVectorType::iterator
+ Ptr;
+ TRec* Cur;
+};
+
+template <
+ class TRecord,
+ class TInput,
+ template <typename T> class TCompare,
+ class TSieve>
+class TDatMerger {
+public:
+ typedef TRecord TRec;
+ typedef TCompare<TRecord> TComp;
+ typedef TSimpleSharedPtr<TInput> TInputPtr;
+ typedef TVector<TInputPtr> TInputVector;
+
+public:
+ ~TDatMerger() {
+ Close();
+ }
+
+ void Init(const TInputVector& inputs) {
+ Inputs = inputs;
+ TVector<TInput*> v;
+ for (int i = 0; i < Inputs.ysize(); ++i)
+ v.push_back(Inputs[i].Get());
+ HeapIter.Init(&v[0], v.size());
+ if (!TIsSieveFake<TSieve>::Result)
+ PNext = HeapIter.Next();
+ }
+
+ const TRec* Next() {
+ if (TIsSieveFake<TSieve>::Result) {
+ return HeapIter.Next();
+ }
+
+ if (!PNext) {
+ if (PCur) {
+ TSieve::Sieve(PCur, PCur);
+ PCur = nullptr;
+ }
+ return nullptr;
+ }
+
+ PCur = &Cur;
+ memcpy(PCur, PNext, SizeOf((const TRec*)PNext));
+
+ do {
+ PNext = HeapIter.Next();
+ } while (PNext && TSieve::Sieve(PCur, PNext));
+
+ return PCur;
+ }
+
+ const TRec* Current() {
+ return (TIsSieveFake<TSieve>::Result ? HeapIter.Current() : PCur);
+ }
+
+ void Close() {
+ Inputs.clear();
+ HeapIter.Term();
+ }
+
+private:
+ TInputVector Inputs;
+ THeapIter<TRec, TInput, TComp> HeapIter;
+ TRec Cur;
+ TRec* PCur = nullptr;
+ const TRec* PNext = nullptr;
+};
+
+class TPortionManager {
+public:
+ void Open(const char* tempDir) {
+ TGuard<TMutex> guard(Mutex);
+ TempDir = tempDir;
+ }
+
+ TString Next() {
+ TGuard<TMutex> guard(Mutex);
+ if (Portions == 0)
+ DoOpen();
+ TString fname = GeneratePortionFilename(Portions++);
+ return fname;
+ }
+
+ void Close() {
+ TGuard<TMutex> guard(Mutex);
+ Portions = 0;
+ }
+
+private:
+ void DoOpen() {
+ if (MakeSorterTempl(PortionFilenameTempl, TempDir.data())) {
+ PortionFilenameTempl[0] = 0;
+ ythrow yexception() << "portion-manager: bad tempdir \"" << TempDir.data() << "\": " << LastSystemErrorText();
+ }
+ }
+
+ TString GeneratePortionFilename(int i) {
+ char str[FILENAME_MAX];
+ snprintf(str, sizeof(str), PortionFilenameTempl, i);
+ return TString(str);
+ }
+
+private:
+ TMutex Mutex;
+
+ TString TempDir;
+ char PortionFilenameTempl[FILENAME_MAX] = {};
+ int Portions = 0;
+};
+
+// A merger powered by threads
+template <
+ class TRecord,
+ template <typename T> class TCompare,
+ class TSieve,
+ class TInput = TInDatFile<TRecord>,
+ class TOutput = TOutDatFile<TRecord>>
+class TPowerMerger {
+public:
+ typedef TRecord TRec;
+ typedef TDatMerger<TRecord, TInput, TCompare, TSieve> TMerger;
+ typedef TSimpleSharedPtr<TMerger> TMergerPtr;
+ typedef TPowerMerger<TRecord, TCompare, TSieve, TInput, TOutput> TFileMerger;
+
+ struct TMergePortionTask: public IObjectInQueue {
+ TFileMerger* FileMerger;
+ int Begin;
+ int End;
+ TString OutFname;
+
+ TMergePortionTask(TFileMerger* fileMerger, int begin, int end, const TString& outFname)
+ : FileMerger(fileMerger)
+ , Begin(begin)
+ , End(end)
+ , OutFname(outFname)
+ {
+ }
+
+ void Process(void*) override {
+ THolder<TMergePortionTask> This(this);
+ //fprintf(stderr, "MergePortion: (%i, %i, %s)\n", Begin, End, ~OutFname);
+ FileMerger->MergePortion(Begin, End, OutFname);
+ }
+ };
+
+public:
+ TPowerMerger(const TSimpleSharedPtr<TThreadPool>& mtpQueue, const TSimpleSharedPtr<TPortionManager>& portMan,
+ int memory, int pageSize, bool autoUnlink)
+ : MtpQueue(mtpQueue)
+ , PortionManager(portMan)
+ , Memory(memory)
+ , PageSize(pageSize)
+ , AutoUnlink(autoUnlink)
+ {
+ }
+
+ TPowerMerger(const TSimpleSharedPtr<TThreadPool>& mtpQueue, const char* tempDir,
+ int memory, int pageSize, bool autoUnlink)
+ : MtpQueue(mtpQueue)
+ , PortionManager(new TPortionManager)
+ , Memory(memory)
+ , PageSize(pageSize)
+ , AutoUnlink(autoUnlink)
+ {
+ PortionManager->Open(tempDir);
+ }
+
+ ~TPowerMerger() {
+ Close();
+ }
+
+ void SetMtpQueue(const TSimpleSharedPtr<TThreadPool>& mtpQueue) {
+ MtpQueue = mtpQueue;
+ }
+
+ void MergePortion(int begin, int end, const TString& outFname) {
+ TMerger merger;
+ InitMerger(merger, begin, end);
+
+ TOutput out("mergeportion-tmpout", PageSize, BufSize, 0);
+ out.Open(outFname.data());
+ const TRec* rec;
+ while ((rec = merger.Next()))
+ out.Push(rec);
+ out.Close();
+
+ merger.Close();
+
+ {
+ TGuard<TMutex> guard(Mutex);
+ UnlinkFiles(begin, end);
+ Files.push_back(outFname);
+ --Tasks;
+ TaskFinishedCond.Signal();
+ }
+ }
+
+ void Add(const TString& fname) {
+ TGuard<TMutex> guard(Mutex);
+ // fprintf(stderr, "TPowerMerger::Add: %s\n", ~fname);
+ Files.push_back(fname);
+ if (InitialFilesEnd > 0)
+ ythrow yexception() << "TPowerMerger::Add: no more files allowed";
+ }
+
+ void Merge(int maxPortions) {
+ TGuard<TMutex> guard(Mutex);
+ InitialFilesEnd = Files.ysize();
+ if (!InitialFilesEnd)
+ ythrow yexception() << "TPowerMerger::Merge: no files added";
+ Optimize(maxPortions);
+ MergeMT();
+ InitMerger(Merger, CPortions, Files.ysize());
+ }
+
+ void Close() {
+ TGuard<TMutex> guard(Mutex);
+ Merger.Close();
+ UnlinkFiles(CPortions, Files.ysize());
+ InitialFilesEnd = CPortions = 0;
+ Files.clear();
+ }
+
+ const TRec* Next() {
+ return Merger.Next();
+ }
+
+ const TRec* Current() {
+ return Merger.Current();
+ }
+
+ int FileCount() const {
+ TGuard<TMutex> guard(Mutex);
+ return Files.ysize();
+ }
+
+private:
+ void InitMerger(TMerger& merger, int begin, int end) {
+ TGuard<TMutex> guard(Mutex);
+ TVector<TSimpleSharedPtr<TInput>> inputs;
+ for (int i = begin; i < end; ++i) {
+ inputs.push_back(new TInput("mergeportion-tmpin", BufSize, 0));
+ inputs.back()->Open(Files[i]);
+ // fprintf(stderr, "InitMerger: %i, %s\n", i, ~Files[i]);
+ }
+ merger.Init(inputs);
+ }
+
+ void UnlinkFiles(int begin, int end) {
+ TGuard<TMutex> guard(Mutex);
+ for (int i = begin; i < end; ++i) {
+ if (i >= InitialFilesEnd || AutoUnlink)
+ unlink(Files[i].c_str());
+ }
+ }
+
+ void Optimize(int maxPortions, size_t maxBufSize = 4u << 20) {
+ TGuard<TMutex> guard(Mutex);
+ maxPortions = std::min(maxPortions, Memory / PageSize - 1);
+ maxBufSize = std::max((size_t)PageSize, maxBufSize);
+
+ if (maxPortions <= 2) {
+ FPortions = MPortions = 2;
+ BufSize = PageSize;
+ return;
+ }
+
+ int Portions = Files.ysize();
+ if (maxPortions >= Portions) {
+ FPortions = MPortions = Portions;
+ } else if (((Portions + maxPortions - 1) / maxPortions) <= maxPortions) {
+ while (((Portions + maxPortions - 1) / maxPortions) <= maxPortions)
+ --maxPortions;
+ MPortions = ++maxPortions;
+ int total = ((Portions + MPortions - 1) / MPortions) + Portions;
+ FPortions = (total % MPortions) ? (total % MPortions) : (int)MPortions;
+ } else
+ FPortions = MPortions = maxPortions;
+
+ BufSize = std::min((size_t)(Memory / (MPortions + 1)), maxBufSize);
+ // fprintf(stderr, "Optimize: Portions=%i; MPortions=%i; FPortions=%i; Memory=%i; BufSize=%i\n",
+ // (int)Portions, (int)MPortions, (int)FPortions, (int)Memory, (int)BufSize);
+ }
+
+ void MergeMT() {
+ TGuard<TMutex> guard(Mutex);
+ do {
+ int n;
+ while ((n = Files.ysize() - CPortions) > MPortions) {
+ int m = std::min((CPortions == 0 ? (int)FPortions : (int)MPortions), n);
+ TString fname = PortionManager->Next();
+ if (!MtpQueue->Add(new TMergePortionTask(this, CPortions, CPortions + m, fname)))
+ ythrow yexception() << "TPowerMerger::MergeMT: failed to add task";
+ CPortions += m;
+ ++Tasks;
+ }
+ if (Tasks > 0)
+ TaskFinishedCond.Wait(Mutex);
+ } while (Tasks > 0);
+ }
+
+private:
+ TMutex Mutex;
+ TCondVar TaskFinishedCond;
+
+ TMerger Merger;
+ TSimpleSharedPtr<TThreadPool> MtpQueue;
+ TSimpleSharedPtr<TPortionManager> PortionManager;
+ TVector<TString> Files;
+ int Tasks = 0;
+ int InitialFilesEnd = 0;
+ int CPortions = 0;
+ int MPortions = 0;
+ int FPortions = 0;
+ int Memory = 0;
+ int PageSize = 0;
+ int BufSize = 0;
+ bool AutoUnlink = false;
+};
+
+// A sorter powered by threads
+template <
+ class TRecord,
+ template <typename T> class TCompare,
+ class TSieve = TFakeSieve<TRecord>,
+ class TTmpInput = TInDatFile<TRecord>,
+ class TTmpOutput = TOutDatFile<TRecord>>
+class TPowerSorter {
+public:
+ typedef TPowerSorter<TRecord, TCompare, TSieve, TTmpInput, TTmpOutput> TSorter;
+ typedef TRecord TRec;
+ typedef TTmpOutput TTmpOut;
+ typedef TTmpInput TTmpIn;
+ typedef TDatSorterBuf<TRecord, TCompare, TSieve> TSorterBuf;
+ typedef TCompare<TRecord> TComp;
+ typedef TPowerMerger<TRecord, TCompare, TSieve, TTmpInput, TTmpOutput> TFileMerger;
+
+ struct TSortPortionTask: public IObjectInQueue {
+ TSorter* Sorter;
+ TSorterBuf* SorterBuf;
+ int Portion;
+
+ TSortPortionTask(TSorter* sorter, TSorterBuf* sorterBuf, int portion)
+ : Sorter(sorter)
+ , SorterBuf(sorterBuf)
+ , Portion(portion)
+ {
+ }
+
+ void Process(void*) override {
+ TAutoPtr<TSortPortionTask> This(this);
+ // fprintf(stderr, "SortPortion: %i\n", Portion);
+ Sorter->SortPortion(SorterBuf);
+ }
+ };
+
+ class TSorterBufQueue {
+ private:
+ TMutex Mutex;
+ TCondVar Cond;
+ TVector<TSimpleSharedPtr<TSorterBuf>> V;
+ TDeque<TSorterBuf*> Q;
+
+ int Memory, PageSize, MaxSorterBufs;
+
+ public:
+ TSorterBufQueue(int memory, int pageSize, int maxSorterBufs)
+ : Memory(memory)
+ , PageSize(pageSize)
+ , MaxSorterBufs(maxSorterBufs)
+ {
+ }
+
+ void Push(TSorterBuf* sb) {
+ TGuard<TMutex> guard(Mutex);
+ sb->Clear();
+ Q.push_back(sb);
+ Cond.Signal();
+ }
+
+ TSorterBuf* Pop() {
+ TGuard<TMutex> guard(Mutex);
+ if (!Q.size() && V.ysize() < MaxSorterBufs) {
+ V.push_back(new TSorterBuf(Memory / MaxSorterBufs, PageSize));
+ return V.back().Get();
+ } else {
+ while (!Q.size())
+ Cond.Wait(Mutex);
+ TSorterBuf* t = Q.front();
+ Q.pop_front();
+ return t;
+ }
+ }
+
+ void Clear() {
+ TGuard<TMutex> guard(Mutex);
+ Q.clear();
+ V.clear();
+ }
+
+ void WaitAll() {
+ TGuard<TMutex> guard(Mutex);
+ while (Q.size() < V.size()) {
+ Cond.Wait(Mutex);
+ }
+ }
+
+ int GetMaxSorterBufs() const {
+ return MaxSorterBufs;
+ }
+ };
+
+public:
+ TPowerSorter(const TSimpleSharedPtr<TThreadPool>& mtpQueue, size_t maxSorterBufs,
+ const char* name, size_t memory, size_t pageSize, size_t bufSize)
+ : MaxSorterBufs(maxSorterBufs)
+ , Name(name)
+ , Memory(memory)
+ , PageSize(pageSize)
+ , BufSize(bufSize)
+ , MtpQueue(mtpQueue)
+ , PortionManager(new TPortionManager)
+ , SBQueue(Memory, PageSize, MaxSorterBufs)
+ , FileMerger(MtpQueue, PortionManager, Memory, PageSize, true)
+ {
+ }
+
+ TPowerSorter(size_t maxSorterBufs,
+ const char* name, size_t memory, size_t pageSize, size_t bufSize)
+ : MaxSorterBufs(maxSorterBufs)
+ , Name(name)
+ , Memory(memory)
+ , PageSize(pageSize)
+ , BufSize(bufSize)
+ , PortionManager(new TPortionManager)
+ , SBQueue(Memory, PageSize, maxSorterBufs)
+ , FileMerger(MtpQueue, PortionManager, Memory, PageSize, true)
+ {
+ }
+
+ TPowerSorter(const char* name, size_t memory, size_t pageSize, size_t bufSize)
+ : MaxSorterBufs(5)
+ , Name(name)
+ , Memory(memory)
+ , PageSize(pageSize)
+ , BufSize(bufSize)
+ , PortionManager(new TPortionManager)
+ , SBQueue(Memory, PageSize, MaxSorterBufs)
+ , FileMerger(MtpQueue, PortionManager, Memory, PageSize, true)
+ {
+ }
+
+ ~TPowerSorter() {
+ Close();
+ }
+
+ void Open(const char* tempDir) {
+ Close();
+ CurSB = SBQueue.Pop();
+ PortionManager->Open(tempDir);
+ }
+
+ void Reopen(const char* fname) {
+ Open(fname);
+ }
+
+ void Close() {
+ CurSB = nullptr;
+ SBQueue.Clear();
+ PortionCount = 0;
+ FileMerger.Close();
+ PortionManager->Close();
+ }
+
+ const TRec* Push(const TRec* v) {
+ CheckOpen("Push");
+ const TRec* u = CurSB->Push(v);
+ if (!u) {
+ NextPortion();
+ u = CurSB->Push(v);
+ }
+ return u;
+ }
+
+ void Sort(int maxPortions = 1000) {
+ CheckOpen("Sort");
+ if (!PortionCount) {
+ CurSB->Sort();
+ } else {
+ NextPortion();
+ SBQueue.Push(CurSB);
+ CurSB = nullptr;
+ SBQueue.WaitAll();
+ SBQueue.Clear();
+ FileMerger.Merge(maxPortions);
+ }
+ }
+
+ const TRec* Next() {
+ return PortionCount ? FileMerger.Next() : CurSB->Next();
+ }
+
+ const TRec* Current() {
+ return PortionCount ? FileMerger.Current() : CurSB->Current();
+ }
+
+ int GetBufSize() const {
+ return BufSize;
+ }
+
+ int GetPageSize() const {
+ return PageSize;
+ }
+
+ const char* GetName() const {
+ return Name.data();
+ }
+
+private:
+ void CheckOpen(const char* m) {
+ if (!CurSB)
+ ythrow yexception() << "TPowerSorter::" << m << ": the sorter is not open";
+ }
+
+ void NextPortion() {
+ if (!CurSB->Size())
+ return;
+ ++PortionCount;
+ if (MaxSorterBufs <= 1) {
+ SortPortion(CurSB);
+ } else {
+ if (!MtpQueue.Get()) {
+ MtpQueue.Reset(new TThreadPool);
+ MtpQueue->Start(MaxSorterBufs - 1);
+ FileMerger.SetMtpQueue(MtpQueue);
+ }
+ if (!MtpQueue->Add(new TSortPortionTask(this, CurSB, PortionCount)))
+ ythrow yexception() << "TPowerSorter::NextPortion: failed to add task";
+ }
+ CurSB = SBQueue.Pop();
+ }
+
+ void SortPortion(TSorterBuf* sorterBuf) {
+ TString portionFilename = PortionManager->Next();
+ try {
+ sorterBuf->Sort();
+
+ // fprintf(stderr, "TPowerSorter::SortPortion: -> %s\n", ~portionFilename);
+ TTmpOut out("powersorter-portion", PageSize, BufSize, 0);
+ out.Open(portionFilename.data());
+
+ while (sorterBuf->Next())
+ out.Push(sorterBuf->Current());
+
+ out.Close();
+ FileMerger.Add(portionFilename);
+ SBQueue.Push(sorterBuf);
+ } catch (const yexception& e) {
+ unlink(portionFilename.data());
+ ythrow yexception() << "SortPortion: " << e.what();
+ }
+ }
+
+private:
+ int MaxSorterBufs = 0;
+ TString Name;
+ int Memory = 0;
+ int PageSize = 0;
+ int BufSize = 0;
+
+ TMutex Mutex;
+ TSimpleSharedPtr<TThreadPool> MtpQueue;
+ TSimpleSharedPtr<TPortionManager> PortionManager;
+
+ TSorterBufQueue SBQueue;
+ TSorterBuf* CurSB = nullptr;
+ int PortionCount = 0;
+
+ TFileMerger FileMerger;
+};
diff --git a/library/cpp/microbdb/reader.h b/library/cpp/microbdb/reader.h
new file mode 100644
index 0000000000..694a2f1766
--- /dev/null
+++ b/library/cpp/microbdb/reader.h
@@ -0,0 +1,354 @@
+#pragma once
+
+#include "align.h"
+#include "header.h"
+#include "extinfo.h"
+
+#include <contrib/libs/zlib/zlib.h>
+#include <contrib/libs/fastlz/fastlz.h>
+#include <contrib/libs/snappy/snappy.h>
+
+#include <util/generic/vector.h>
+#include <util/memory/tempbuf.h>
+
+namespace NMicroBDB {
+ static const size_t DEFAULT_BUFFER_SIZE = (64 << 10);
+
+ //!
+ template <class TVal>
+ class IBasePageReader {
+ public:
+ virtual size_t GetRecSize() const = 0;
+ virtual size_t GetExtSize() const = 0;
+ virtual bool GetExtInfo(typename TExtInfoType<TVal>::TResult* extInfo) const = 0;
+ virtual const ui8* GetExtInfoRaw(size_t* len) const = 0;
+ virtual const TVal* Next() = 0;
+ virtual void Reset() = 0;
+ //! set clearing flag, so temporary buffers will be cleared
+ //! in next call of Next()
+ virtual void SetClearFlag() {
+ }
+
+ virtual ~IBasePageReader() {
+ }
+ };
+
+ template <class TVal, typename TPageIter>
+ class TRawPageReader: public IBasePageReader<TVal> {
+ public:
+ TRawPageReader(TPageIter* const iter)
+ : PageIter(iter)
+ {
+ Reset();
+ }
+
+ bool GetExtInfo(typename TExtInfoType<TVal>::TResult* extInfo) const override {
+ Y_VERIFY(TExtInfoType<TVal>::Exists, "GetExtInfo should only be used with extended records");
+ if (!Rec)
+ return false;
+ ui8* raw = (ui8*)Rec + RecSize + ExtLenSize;
+ return extInfo->ParseFromArray(raw, ExtSize);
+ }
+
+ size_t GetRecSize() const override {
+ return RecSize + ExtLenSize;
+ }
+
+ size_t GetExtSize() const override {
+ return ExtSize;
+ }
+
+ const ui8* GetExtInfoRaw(size_t* len) const override {
+ Y_VERIFY(TExtInfoType<TVal>::Exists, "GetExtInfo should only be used with extended records");
+ if (!Rec) {
+ *len = 0;
+ return nullptr;
+ }
+ *len = ExtLenSize + ExtSize;
+ return (ui8*)Rec + RecSize;
+ }
+
+ const TVal* Next() override {
+ if (!Rec)
+ Rec = (TVal*)((char*)PageIter->Current() + sizeof(TDatPage));
+ else
+ Rec = (TVal*)((char*)Rec + DatCeil(RecSize + ExtLenSize + ExtSize));
+ if (!TExtInfoType<TVal>::Exists)
+ RecSize = SizeOf(Rec);
+ else
+ RecSize = SizeOfExt(Rec, &ExtLenSize, &ExtSize);
+ return Rec;
+ }
+
+ void Reset() override {
+ Rec = nullptr;
+ RecSize = 0;
+ ExtLenSize = 0;
+ ExtSize = 0;
+ }
+
+ private:
+ const TVal* Rec;
+ size_t RecSize;
+ size_t ExtLenSize;
+ size_t ExtSize;
+ TPageIter* const PageIter;
+ };
+
+ template <class TVal, typename TPageIter>
+ class TCompressedReader: public IBasePageReader<TVal> {
+ inline size_t GetFirstRecordSize(const TVal* const in) const {
+ if (!TExtInfoType<TVal>::Exists) {
+ return DatCeil(SizeOf(in));
+ } else {
+ size_t ll;
+ size_t l;
+ size_t ret = SizeOfExt(in, &ll, &l);
+
+ return DatCeil(ret + ll + l);
+ }
+ }
+
+ void DecompressBlock() {
+ if (PageIter->IsFrozen() && Buffer.Get())
+ Blocks.push_back(Buffer.Release());
+
+ const TCompressedHeader* hdr = (const TCompressedHeader*)(Page);
+
+ Page += sizeof(TCompressedHeader);
+
+ const size_t first = GetFirstRecordSize((const TVal*)Page);
+
+ if (!Buffer.Get() || Buffer->Size() < hdr->Original)
+ Buffer.Reset(new TTempBuf(Max<size_t>(hdr->Original, DEFAULT_BUFFER_SIZE)));
+
+ memcpy(Buffer->Data(), Page, first);
+ Page += first;
+
+ if (hdr->Count > 1) {
+ switch (Algo) {
+ case MBDB_COMPRESSION_ZLIB: {
+ uLongf dst = hdr->Original - first;
+
+ int ret = uncompress((Bytef*)Buffer->Data() + first, &dst, Page, hdr->Compressed);
+
+ if (ret != Z_OK)
+ ythrow yexception() << "error then uncompress " << ret;
+ } break;
+ case MBDB_COMPRESSION_FASTLZ: {
+ int dst = hdr->Original - first;
+ int ret = yfastlz_decompress(Page, hdr->Compressed, Buffer->Data() + first, dst);
+
+ if (!ret)
+ ythrow yexception() << "error then uncompress";
+ } break;
+ case MBDB_COMPRESSION_SNAPPY: {
+ if (!snappy::RawUncompress((const char*)Page, hdr->Compressed, Buffer->Data() + first))
+ ythrow yexception() << "error then uncompress";
+ } break;
+ }
+ }
+
+ Rec = nullptr;
+ RecNum = hdr->Count;
+ Page += hdr->Compressed;
+ }
+
+ void ClearBuffer() {
+ for (size_t i = 0; i < Blocks.size(); ++i)
+ delete Blocks[i];
+ Blocks.clear();
+ ClearFlag = false;
+ }
+
+ public:
+ TCompressedReader(TPageIter* const iter)
+ : Rec(nullptr)
+ , RecSize(0)
+ , ExtLenSize(0)
+ , ExtSize(0)
+ , Page(nullptr)
+ , PageIter(iter)
+ , RecNum(0)
+ , BlockNum(0)
+ , ClearFlag(false)
+ {
+ }
+
+ ~TCompressedReader() override {
+ ClearBuffer();
+ }
+
+ size_t GetRecSize() const override {
+ return RecSize + ExtLenSize;
+ }
+
+ size_t GetExtSize() const override {
+ return ExtSize;
+ }
+
+ bool GetExtInfo(typename TExtInfoType<TVal>::TResult* extInfo) const override {
+ Y_VERIFY(TExtInfoType<TVal>::Exists, "GetExtInfo should only be used with extended records");
+ if (!Rec)
+ return false;
+ ui8* raw = (ui8*)Rec + RecSize + ExtLenSize;
+ return extInfo->ParseFromArray(raw, ExtSize);
+ }
+
+ const ui8* GetExtInfoRaw(size_t* len) const override {
+ Y_VERIFY(TExtInfoType<TVal>::Exists, "GetExtInfo should only be used with extended records");
+ if (!Rec) {
+ *len = 0;
+ return nullptr;
+ }
+ *len = ExtLenSize + ExtSize;
+ return (ui8*)Rec + RecSize;
+ }
+
+ const TVal* Next() override {
+ Y_ASSERT(RecNum >= 0);
+
+ if (ClearFlag)
+ ClearBuffer();
+
+ if (!Page) {
+ if (!PageIter->Current())
+ return nullptr;
+
+ Page = (ui8*)PageIter->Current() + sizeof(TDatPage);
+
+ BlockNum = ((TCompressedPage*)Page)->BlockCount - 1;
+ Algo = (ECompressionAlgorithm)((TCompressedPage*)Page)->Algorithm;
+ Page += sizeof(TCompressedPage);
+
+ DecompressBlock();
+ }
+
+ if (!RecNum) {
+ if (BlockNum <= 0)
+ return nullptr;
+ else {
+ --BlockNum;
+ DecompressBlock();
+ }
+ }
+
+ --RecNum;
+ if (!Rec)
+ Rec = (const TVal*)Buffer->Data();
+ else
+ Rec = (const TVal*)((char*)Rec + DatCeil(RecSize + ExtLenSize + ExtSize));
+
+ if (!TExtInfoType<TVal>::Exists)
+ RecSize = SizeOf(Rec);
+ else
+ RecSize = SizeOfExt(Rec, &ExtLenSize, &ExtSize);
+
+ return Rec;
+ }
+
+ void Reset() override {
+ Page = nullptr;
+ BlockNum = 0;
+ Rec = nullptr;
+ RecSize = 0;
+ ExtLenSize = 0;
+ ExtSize = 0;
+ RecNum = 0;
+ }
+
+ void SetClearFlag() override {
+ ClearFlag = true;
+ }
+
+ public:
+ THolder<TTempBuf> Buffer;
+ TVector<TTempBuf*> Blocks;
+ const TVal* Rec;
+ size_t RecSize;
+ size_t ExtLenSize;
+ size_t ExtSize;
+ const ui8* Page;
+ TPageIter* const PageIter;
+ int RecNum; //!< count of recs in current block
+ int BlockNum;
+ ECompressionAlgorithm Algo;
+ bool ClearFlag;
+ };
+
+ class TZLibCompressionImpl {
+ public:
+ static const ECompressionAlgorithm Code = MBDB_COMPRESSION_ZLIB;
+
+ inline void Init() {
+ // -
+ }
+
+ inline void Term() {
+ // -
+ }
+
+ inline size_t CompressBound(size_t size) const noexcept {
+ return ::compressBound(size);
+ }
+
+ inline void Compress(void* out, size_t& outSize, const void* in, size_t inSize) {
+ uLongf size = outSize;
+
+ if (compress((Bytef*)out, &size, (const Bytef*)in, inSize) != Z_OK)
+ ythrow yexception() << "not compressed";
+ outSize = size;
+ }
+ };
+
+ class TFastlzCompressionImpl {
+ public:
+ static const ECompressionAlgorithm Code = MBDB_COMPRESSION_FASTLZ;
+
+ inline void Init() {
+ // -
+ }
+
+ inline void Term() {
+ // -
+ }
+
+ inline size_t CompressBound(size_t size) const noexcept {
+ size_t rval = size_t(size * 1.07);
+ return rval < 66 ? 66 : rval;
+ }
+
+ inline void Compress(void* out, size_t& outSize, const void* in, size_t inSize) {
+ outSize = yfastlz_compress_level(2, in, inSize, out);
+ if (!outSize)
+ ythrow yexception() << "not compressed";
+ }
+ };
+
+ class TSnappyCompressionImpl {
+ public:
+ static const ECompressionAlgorithm Code = MBDB_COMPRESSION_SNAPPY;
+
+ inline void Init() {
+ // -
+ }
+
+ inline void Term() {
+ // -
+ }
+
+ inline size_t CompressBound(size_t size) const noexcept {
+ return snappy::MaxCompressedLength(size);
+ }
+
+ inline void Compress(void* out, size_t& outSize, const void* in, size_t inSize) {
+ snappy::RawCompress((const char*)in, inSize, (char*)out, &outSize);
+ }
+ };
+
+}
+
+using TFakeCompression = void;
+using TZLibCompression = NMicroBDB::TZLibCompressionImpl;
+using TFastlzCompression = NMicroBDB::TFastlzCompressionImpl;
+using TSnappyCompression = NMicroBDB::TSnappyCompressionImpl;
diff --git a/library/cpp/microbdb/safeopen.h b/library/cpp/microbdb/safeopen.h
new file mode 100644
index 0000000000..c328ffd575
--- /dev/null
+++ b/library/cpp/microbdb/safeopen.h
@@ -0,0 +1,792 @@
+#pragma once
+
+// util
+#include <util/generic/yexception.h>
+#include <util/generic/vector.h>
+#include <util/string/util.h>
+#include <util/system/mutex.h>
+#include <thread>
+
+#include "microbdb.h"
+
+#if defined(_MSC_VER)
+#pragma warning(push)
+#pragma warning(disable : 4706) /*assignment within conditional expression*/
+#pragma warning(disable : 4267) /*conversion from 'size_t' to 'type', possible loss of data*/
+#endif
+
+template <typename TVal, typename TPageFile = TInputPageFile, typename TIterator = TInputPageIterator<TPageFile>>
+class TInDatFile: protected TInDatFileImpl<TVal, TInputRecordIterator<TVal, TIterator>> {
+public:
+ typedef TVal TRec;
+ typedef TInDatFileImpl<TVal, TInputRecordIterator<TVal, TIterator>> TBase;
+
+ TInDatFile(const TString& name, size_t pages, int pagesOrBytes = 1)
+ : Name(name)
+ , Pages(pages)
+ , PagesOrBytes(pagesOrBytes)
+ {
+ }
+
+ ~TInDatFile() {
+ Close();
+ }
+
+ void Open(const TString& fname, bool direct = false) {
+ ui32 gotRecordSig = 0;
+ int ret = TBase::Open(fname.data(), Pages, PagesOrBytes, &gotRecordSig, direct);
+ if (ret) {
+ // XXX: print record type name, not type sig
+ ythrow yexception() << ErrorMessage(ret, "Failed to open input file", fname, TVal::RecordSig, gotRecordSig);
+ }
+ Name = fname;
+ }
+
+ void OpenStream(TAutoPtr<IInputStream> input) {
+ ui32 gotRecordSig = 0;
+ int ret = TBase::Open(input, Pages, PagesOrBytes, &gotRecordSig);
+ if (ret) {
+ // XXX: print record type name, not type sig
+ ythrow yexception() << ErrorMessage(ret, "Failed to open input file", Name, TVal::RecordSig, gotRecordSig);
+ }
+ }
+
+ void Close() {
+ int ret;
+ if (IsOpen() && (ret = TBase::GetError()))
+ if (!std::uncaught_exception())
+ ythrow yexception() << ErrorMessage(ret, "Error before closing input file", Name);
+ if ((ret = TBase::Close()))
+ if (!std::uncaught_exception())
+ ythrow yexception() << ErrorMessage(ret, "Error while closing input file", Name);
+ }
+
+ const char* GetName() const {
+ return Name.data();
+ }
+
+ using TBase::Current;
+ using TBase::Freeze;
+ using TBase::GetError;
+ using TBase::GetExtInfo;
+ using TBase::GetExtInfoRaw;
+ using TBase::GetExtSize;
+ using TBase::GetLastPage;
+ using TBase::GetPageNum;
+ using TBase::GetPageSize;
+ using TBase::GetRecSize;
+ using TBase::GotoLastPage;
+ using TBase::GotoPage;
+ using TBase::IsEof;
+ using TBase::IsOpen;
+ using TBase::Next;
+ using TBase::Skip;
+ using TBase::Unfreeze;
+
+protected:
+ TString Name;
+ size_t Pages;
+ int PagesOrBytes;
+};
+
+template <typename TVal>
+class TMappedInDatFile: protected TInDatFileImpl<TVal, TInputRecordIterator<TVal, TMappedInputPageIterator<TMappedInputPageFile>>> {
+public:
+ typedef TVal TRec;
+ typedef TInDatFileImpl<TVal, TInputRecordIterator<TVal, TMappedInputPageIterator<TMappedInputPageFile>>> TBase;
+
+ TMappedInDatFile(const TString& name, size_t /* pages */, int /* pagesOrBytes */)
+ : Name(name)
+ {
+ }
+
+ ~TMappedInDatFile() {
+ Close();
+ }
+
+ void Open(const TString& fname) {
+ int ret = TBase::Open(fname.data());
+ if (ret)
+ ythrow yexception() << ErrorMessage(ret, "Failed to open mapped file", fname, TVal::RecordSig);
+ Name = fname;
+ }
+
+ void Close() {
+ int ret;
+ if (IsOpen() && (ret = TBase::GetError()))
+ if (!std::uncaught_exception())
+ ythrow yexception() << ErrorMessage(ret, "Error before closing mapped file", Name);
+ if ((ret = TBase::Close()))
+ if (!std::uncaught_exception())
+ ythrow yexception() << ErrorMessage(ret, "Error while closing mapped file", Name);
+ }
+
+ const char* GetName() const {
+ return Name.data();
+ }
+
+ using TBase::Current;
+ using TBase::GetError;
+ using TBase::GetExtInfo;
+ using TBase::GetExtInfoRaw;
+ using TBase::GetLastPage;
+ using TBase::GetPageNum;
+ using TBase::GetPageSize;
+ using TBase::GotoLastPage;
+ using TBase::GotoPage;
+ using TBase::IsEof;
+ using TBase::IsOpen;
+ using TBase::Next;
+ using TBase::Skip;
+
+protected:
+ TString Name;
+};
+
+template <typename TVal, typename TCompressor = TFakeCompression, typename TPageFile = TOutputPageFile>
+class TOutDatFile: protected TOutDatFileImpl<TVal, TOutputRecordIterator<TVal, TOutputPageIterator<TPageFile>, TFakeIndexer, TCompressor>> {
+public:
+ typedef TOutDatFileImpl<TVal, TOutputRecordIterator<TVal, TOutputPageIterator<TPageFile>, TFakeIndexer, TCompressor>> TBase;
+
+ TOutDatFile(const TString& name, size_t pagesize, size_t pages, int pagesOrBytes = 1)
+ : Name(name)
+ , PageSize(pagesize)
+ , Pages(pages)
+ , PagesOrBytes(pagesOrBytes)
+ {
+ }
+
+ ~TOutDatFile() {
+ Close();
+ }
+
+ void Open(const char* fname, bool direct = false) {
+ int ret = TBase::Open(fname, PageSize, Pages, PagesOrBytes, direct);
+ if (ret)
+ ythrow yexception() << ErrorMessage(ret, "Failed to open output file", fname);
+ Name = fname;
+ }
+
+ void Open(const TString& fname) {
+ Open(fname.data());
+ }
+
+ void OpenStream(TAutoPtr<IOutputStream> output) {
+ int ret = TBase::Open(output, PageSize, Pages, PagesOrBytes);
+ if (ret)
+ ythrow yexception() << ErrorMessage(ret, "Failed to open output stream", Name);
+ }
+
+ void Close() {
+ int ret;
+ if ((ret = TBase::GetError()))
+ if (!std::uncaught_exception())
+ ythrow yexception() << ErrorMessage(ret, "Error before closing output file", Name);
+ if ((ret = TBase::Close()))
+ if (!std::uncaught_exception())
+ ythrow yexception() << ErrorMessage(ret, "Error while closing output file", Name);
+ }
+
+ const char* GetName() const {
+ return Name.data();
+ }
+
+ using TBase::Freeze;
+ using TBase::GetError;
+ using TBase::GetPageSize;
+ using TBase::IsEof;
+ using TBase::IsOpen;
+ using TBase::Offset;
+ using TBase::Push;
+ using TBase::PushWithExtInfo;
+ using TBase::Reserve;
+ using TBase::Unfreeze;
+
+protected:
+ TString Name;
+ size_t PageSize, Pages;
+ int PagesOrBytes;
+};
+
+template <typename TVal, typename TCompressor, typename TPageFile>
+class TOutDatFileArray;
+
+template <typename TVal, typename TCompressor = TFakeCompression, typename TPageFile = TOutputPageFile>
+class TOutDatFileArray {
+ typedef TOutDatFile<TVal, TCompressor, TPageFile> TFileType;
+
+public:
+ TOutDatFileArray(const TString& name, size_t pagesize, size_t pages, int pagesOrBytes = 1)
+ : Name(name)
+ , PageSize(pagesize)
+ , Pages(pages)
+ , PagesOrBytes(pagesOrBytes)
+ , NumFiles(0)
+ , Files(nullptr)
+ {
+ }
+
+ ~TOutDatFileArray() {
+ for (int i = 0; i < NumFiles; ++i) {
+ Files[i].Close();
+ Files[i].~TFileType();
+ }
+ free(Files);
+ Files = nullptr;
+ NumFiles = 0;
+ }
+
+ TFileType& operator[](size_t pos) {
+ return Files[pos];
+ }
+
+ void Open(int n, const TString& fname) {
+ char temp[FILENAME_MAX];
+
+ Name = fname;
+ NumFiles = CreateDatObjects(n, fname);
+
+ int i;
+ try {
+ for (i = 0; i < NumFiles; ++i) {
+ sprintf(temp, fname.data(), i);
+ Files[i].Open(temp);
+ }
+ } catch (...) {
+ while (--i >= 0)
+ Files[i].Close();
+ throw;
+ }
+ }
+
+ template <typename TNameBuilder>
+ void OpenWithCallback(int n, const TNameBuilder& builder) {
+ NumFiles = CreateDatObjects(n, Name);
+
+ for (int i = 0; i < NumFiles; ++i)
+ Files[i].Open(builder.GetName(i).data());
+ }
+
+ void Close() {
+ for (int i = 0; i < NumFiles; ++i)
+ Files[i].Close();
+ }
+
+ void CloseMT(ui32 threads) {
+ int current = 0;
+ TMutex mutex;
+ TVector<std::thread> thrs;
+ thrs.reserve(threads);
+ for (ui32 i = 0; i < threads; i++) {
+ thrs.emplace_back([this, &current, &mutex]() {
+ while (true) {
+ mutex.Acquire();
+ int cur = current++;
+ mutex.Release();
+ if (cur >= NumFiles)
+ break;
+ Files[cur].Close();
+ }
+ });
+ }
+ for (auto& thread : thrs) {
+ thread.join();
+ }
+ }
+
+ const char* GetName() const {
+ return Name.data();
+ }
+
+protected:
+ int CreateDatObjects(int n, const TString& fname) {
+ if (!(Files = (TFileType*)malloc(n * sizeof(TFileType))))
+ ythrow yexception() << "can't alloc \"" << fname << "\" file array: " << LastSystemErrorText();
+ int num = 0;
+ char temp[FILENAME_MAX];
+ for (int i = 0; i < n; ++i, ++num) {
+ sprintf(temp, "%s[%d]", fname.data(), i);
+ new (Files + i) TFileType(temp, PageSize, Pages, PagesOrBytes);
+ }
+ return num;
+ }
+
+ TString Name;
+ size_t PageSize, Pages;
+ int PagesOrBytes, NumFiles;
+ TFileType* Files;
+};
+
+template <typename TVal, typename TKey, typename TCompressor = TFakeCompression, typename TPageFile = TOutputPageFile>
+class TOutDirectFile: protected TOutDirectFileImpl<TVal, TKey, TCompressor, TPageFile> {
+ typedef TOutDirectFileImpl<TVal, TKey, TCompressor, TPageFile> TBase;
+
+public:
+ TOutDirectFile(const TString& name, size_t pagesize, size_t pages, size_t ipagesize, size_t ipages, int pagesOrBytes)
+ : Name(name)
+ , PageSize(pagesize)
+ , Pages(pages)
+ , IdxPageSize(ipagesize)
+ , IdxPages(ipages)
+ , PagesOrBytes(pagesOrBytes)
+ {
+ }
+
+ ~TOutDirectFile() {
+ Close();
+ }
+
+ void Open(const TString& fname) {
+ int ret = TBase::Open(fname.data(), PageSize, Pages, IdxPageSize, IdxPages, PagesOrBytes);
+ if (ret)
+ ythrow yexception() << ErrorMessage(ret, "Failed to open output file", fname);
+ Name = fname;
+ }
+
+ void Close() {
+ int ret;
+ if ((ret = TBase::GetError()))
+ if (!std::uncaught_exception())
+ ythrow yexception() << ErrorMessage(ret, "Error before closing output file", Name);
+ if ((ret = TBase::Close()))
+ if (!std::uncaught_exception())
+ ythrow yexception() << ErrorMessage(ret, "Error while closing output file", Name);
+ }
+
+ const char* GetName() const {
+ return Name.data();
+ }
+
+ using TBase::Freeze;
+ using TBase::Push;
+ using TBase::PushWithExtInfo;
+ using TBase::Reserve;
+ using TBase::Unfreeze;
+
+protected:
+ TString Name;
+ size_t PageSize, Pages, IdxPageSize, IdxPages;
+ int PagesOrBytes;
+};
+
+template <
+ typename TVal,
+ template <typename T> class TComparer,
+ typename TCompress = TFakeCompression,
+ typename TSieve = TFakeSieve<TVal>,
+ typename TPageFile = TOutputPageFile,
+ typename TFileTypes = TDefInterFileTypes>
+class TDatSorter: protected TDatSorterImpl<TVal, TComparer<TVal>, TCompress, TSieve, TPageFile, TFileTypes> {
+ typedef TDatSorterImpl<TVal, TComparer<TVal>, TCompress, TSieve, TPageFile, TFileTypes> TBase;
+
+public:
+ typedef TVal TRec;
+
+public:
+ TDatSorter(const TString& name, size_t memory, size_t pagesize, size_t pages, int pagesOrBytes = 1)
+ : Name(name)
+ , Memory(memory)
+ , PageSize(pagesize)
+ , Pages(pages)
+ , PagesOrBytes(pagesOrBytes)
+ {
+ Templ[0] = 0;
+ }
+
+ ~TDatSorter() {
+ Close();
+ Templ[0] = 0;
+ }
+
+ void Open(const TString& dirName) {
+ int ret;
+ if (ret = MakeSorterTempl(Templ, dirName.data())) {
+ Templ[0] = 0;
+ ythrow yexception() << ErrorMessage(ret, Name + " sorter: bad tempdir", dirName);
+ }
+ if ((ret = TBase::Open(Templ, PageSize, Pages, PagesOrBytes)))
+ ythrow yexception() << ErrorMessage(ret, Name + " sorter: open error, temp dir", Templ);
+ }
+
+ void Sort(bool direct = false) {
+ int ret = TBase::Sort(Memory, 1000, direct);
+ if (ret)
+ ythrow yexception() << ErrorMessage(ret, Name + " sorter: sort error, temp dir", Templ, TVal::RecordSig);
+ }
+
+ void SortToFile(const TString& name) {
+ int ret = TBase::SortToFile(name.data(), Memory);
+ if (ret)
+ ythrow yexception() << ErrorMessage(ret, Name + "sorter: error in SortToFile", name, TVal::RecordSig);
+ }
+
+ void SortToStream(TAutoPtr<IOutputStream> output) {
+ int ret = TBase::SortToStream(output, Memory);
+ if (ret)
+ ythrow yexception() << ErrorMessage(ret, Name + "sorter: error in SortToStream", "", TVal::RecordSig);
+ }
+
+ void Close() {
+ int ret1 = TBase::GetError();
+ int ret2 = TBase::Close();
+ if (Templ[0]) {
+ *strrchr(Templ, GetDirectorySeparator()) = 0;
+ RemoveDirWithContents(Templ);
+ Templ[0] = 0;
+ }
+ if (ret1)
+ if (!std::uncaught_exception())
+ ythrow yexception() << ErrorMessage(ret1, Name + "sorter: error before closing");
+ if (ret2)
+ if (!std::uncaught_exception())
+ ythrow yexception() << ErrorMessage(ret2, Name + "sorter: error while closing");
+ }
+
+ int Sort(size_t memory, int maxportions, bool direct = false) {
+ return TBase::Sort(memory, maxportions, direct);
+ }
+
+ const char* GetName() const {
+ return Name.data();
+ }
+
+ using TBase::GetPageSize;
+ using TBase::GetPages;
+ using TBase::Next;
+ using TBase::NextPortion;
+ using TBase::Push;
+ using TBase::PushWithExtInfo;
+ using TBase::UseSegmentSorter;
+
+protected:
+ TString Name;
+ size_t Memory, PageSize, Pages;
+ int PagesOrBytes;
+ char Templ[FILENAME_MAX];
+};
+
+template <typename TSorter>
+class TSorterArray {
+public:
+ typedef TSorter TDatSorter;
+
+public:
+ TSorterArray(const TString& name, size_t memory, size_t pagesize, size_t pages, int pagesOrBytes = 1)
+ : Name(name)
+ , Memory(memory)
+ , PageSize(pagesize)
+ , Pages(pages)
+ , PagesOrBytes(pagesOrBytes)
+ , NumSorters(0)
+ , Sorters(nullptr)
+ {
+ }
+
+ ~TSorterArray() {
+ for (int i = 0; i < NumSorters; ++i) {
+ Sorters[i].Close();
+ Sorters[i].~TSorter();
+ }
+ free(Sorters);
+ Sorters = nullptr;
+ NumSorters = 0;
+ }
+
+ TSorter& operator[](size_t pos) {
+ return Sorters[pos];
+ }
+
+ void Open(int n, const TString& fname, size_t memory = 0) {
+ if (!(Sorters = (TSorter*)malloc(n * sizeof(TSorter))))
+ ythrow yexception() << "can't alloc \"" << fname << "\" sorter array: " << LastSystemErrorText();
+ NumSorters = n;
+ char temp[FILENAME_MAX];
+ if (memory)
+ Memory = memory;
+ for (int i = 0; i < NumSorters; ++i) {
+ sprintf(temp, "%s[%d]", Name.data(), i);
+ new (Sorters + i) TSorter(temp, Memory, PageSize, Pages, PagesOrBytes);
+ }
+ for (int i = 0; i < NumSorters; ++i)
+ Sorters[i].Open(fname);
+ }
+
+ void Close() {
+ for (int i = 0; i < NumSorters; ++i)
+ Sorters[i].Close();
+ }
+
+ const char* GetName() const {
+ return Name.data();
+ }
+
+protected:
+ TString Name;
+ size_t Memory, PageSize, Pages;
+ int PagesOrBytes, NumSorters;
+ TSorter* Sorters;
+};
+
+template <typename TVal, template <typename T> class TCompare, typename TSieve = TFakeSieve<TVal>>
+class TDatSorterArray: public TSorterArray<TDatSorter<TVal, TCompare, TSieve>> {
+public:
+ TDatSorterArray(const char* name, size_t memory, size_t pagesize, size_t pages, int pagesOrBytes = 1)
+ : TSorterArray<TDatSorter<TVal, TCompare, TSieve>>(name, memory, pagesize, pages, pagesOrBytes)
+ {
+ }
+};
+
+template <typename TVal, template <typename T> class TCompare, typename TCompress = TFakeCompression,
+ typename TSieve = TFakeSieve<TVal>, typename TPageFile = TOutputPageFile, typename TFileTypes = TDefInterFileTypes>
+class TDatSorterMemo: public TDatSorter<TVal, TCompare, TCompress, TSieve, TPageFile, TFileTypes> {
+ typedef TDatSorter<TVal, TCompare, TCompress, TSieve, TPageFile, TFileTypes> TSorter;
+
+public:
+ TOutDatFile<TVal> Memo;
+ TString Home;
+ bool OpenReq;
+ bool Opened;
+ bool UseDirectWrite;
+
+public:
+ TDatSorterMemo(const char* name, size_t memory, size_t pagesize, size_t pages, int pagesOrBytes = 1)
+ : TSorter(name, memory, pagesize, pages, pagesOrBytes)
+ , Memo(name, pagesize, memory, 0)
+ {
+ OpenReq = false;
+ Opened = false;
+ UseDirectWrite = false;
+ }
+
+ void Open(const TString& home) {
+ OpenReq = true;
+ // TSorter::Open(home);
+ Home = home;
+ Memo.Open(nullptr);
+ Memo.Freeze();
+ }
+
+ void Reopen(const char* home) {
+ Close();
+ Open(home);
+ }
+
+ void Open() {
+ if (!OpenReq) {
+ OpenReq = true;
+ Memo.Open(nullptr);
+ Memo.Freeze();
+ }
+ }
+
+ void OpenIfNeeded() {
+ if (OpenReq && !Opened) {
+ if (!Home)
+ ythrow yexception() << "Temp directory not specified, call Open(char*) first : " << TSorter::Name;
+ TSorter::Open(Home);
+ Opened = true;
+ }
+ }
+
+ TVal* Reserve(size_t len) {
+ if (TExtInfoType<TVal>::Exists)
+ return ReserveWithExt(len, 0);
+
+ TVal* u = Memo.Reserve(len);
+ if (!u) {
+ OpenIfNeeded();
+ TSorter::NextPortion(UseDirectWrite);
+ Memo.Freeze();
+ u = Memo.Reserve(len);
+ }
+ TSorter::PushWithExtInfo(u);
+ return u;
+ }
+
+ TVal* ReserveWithExt(size_t len, size_t extSize) {
+ size_t fullLen = len + len_long((i64)extSize) + extSize;
+ TVal* u = Memo.Reserve(fullLen);
+ if (!u) {
+ OpenIfNeeded();
+ TSorter::NextPortion(UseDirectWrite);
+ Memo.Freeze();
+ u = Memo.Reserve(fullLen);
+ if (!u) {
+ if (fullLen > Memo.GetPageSize()) {
+ ythrow yexception() << "Size of element and " << len << " size of extInfo " << extSize
+ << " is larger than page size " << Memo.GetPageSize();
+ }
+ ythrow yexception() << "going to insert a null pointer. Bad.";
+ }
+ }
+ out_long((i64)extSize, (char*)u + len);
+ TSorter::PushWithExtInfo(u);
+ return u;
+ }
+
+ char* GetReservedExt(TVal* rec, size_t len, size_t extSize) {
+ return (char*)rec + len + len_long((i64)extSize);
+ }
+
+ const TVal* Push(const TVal* v, const typename TExtInfoType<TVal>::TResult* extInfo = nullptr) {
+ const TVal* u = Memo.Push(v, extInfo);
+ if (!u) {
+ OpenIfNeeded();
+ TSorter::NextPortion(UseDirectWrite);
+ Memo.Freeze();
+ u = Memo.Push(v, extInfo);
+ if (!u) {
+ if (SizeOf(v) > Memo.GetPageSize()) {
+ ythrow yexception() << "Size of element " << SizeOf(v)
+ << " is larger than page size " << Memo.GetPageSize();
+ }
+ ythrow yexception() << "going to insert a null pointer. Bad.";
+ }
+ }
+ TSorter::PushWithExtInfo(u);
+ return u;
+ }
+
+ const TVal* Push(const TVal* v, const ui8* extInfoRaw, size_t extLen) {
+ const TVal* u = Memo.Push(v, extInfoRaw, extLen);
+ if (!u) {
+ OpenIfNeeded();
+ TSorter::NextPortion(UseDirectWrite);
+ Memo.Freeze();
+ u = Memo.Push(v, extInfoRaw, extLen);
+ if (!u) {
+ if (SizeOf(v) > Memo.GetPageSize()) {
+ ythrow yexception() << "Size of element " << SizeOf(v)
+ << " is larger than page size " << Memo.GetPageSize();
+ }
+ ythrow yexception() << "going to insert a null pointer. Bad..";
+ }
+ }
+ TSorter::PushWithExtInfo(u);
+ return u;
+ }
+
+ const TVal* PushWithExtInfo(const TVal* v) {
+ const TVal* u = Memo.PushWithExtInfo(v);
+ if (!u) {
+ OpenIfNeeded();
+ TSorter::NextPortion(UseDirectWrite);
+ Memo.Freeze();
+ u = Memo.PushWithExtInfo(v);
+ if (!u) {
+ if (SizeOf(v) > Memo.GetPageSize()) {
+ ythrow yexception() << "Size of element " << SizeOf(v)
+ << " is larger than page size " << Memo.GetPageSize();
+ }
+ ythrow yexception() << "going to insert a null pointer. Bad...";
+ }
+ }
+ TSorter::PushWithExtInfo(u);
+ return u;
+ }
+
+ void Sort(bool direct = false) {
+ if (Opened) {
+ TSorter::NextPortion(UseDirectWrite);
+ Memo.Close();
+ OpenReq = false;
+ TSorter::Sort(direct);
+ } else {
+ TSorter::SortPortion();
+ }
+ }
+
+ const TVal* Next() {
+ return Opened ? TSorter::Next() : TSorter::Nextp();
+ }
+
+ bool GetExtInfo(typename TExtInfoType<TVal>::TResult* extInfo) const {
+ return NMicroBDB::GetExtInfo(Current(), extInfo);
+ }
+
+ const ui8* GetExtInfoRaw(size_t* len) const {
+ return NMicroBDB::GetExtInfoRaw(Current(), len);
+ }
+
+ const TVal* Current() const {
+ return Opened ? TSorter::Current() : TSorter::Currentp();
+ }
+
+ int NextPortion() {
+ OpenIfNeeded();
+ return TSorter::NextPortion(UseDirectWrite);
+ }
+
+ void SortToFile(const char* name) {
+ OpenIfNeeded();
+ TSorter::NextPortion(UseDirectWrite);
+ Memo.Close();
+ OpenReq = false;
+ TSorter::SortToFile(name);
+ }
+
+ void SortToStream(TAutoPtr<IOutputStream> output) {
+ OpenIfNeeded();
+ TSorter::NextPortion(UseDirectWrite);
+ Memo.Close();
+ OpenReq = false;
+ TSorter::SortToStream(output);
+ }
+
+ template <typename TKey, typename TOutCompress>
+ void SortToDirectFile(const char* name, size_t ipagesize, size_t ipages) {
+ Sort();
+ TOutDirectFile<TVal, TKey, TOutCompress> out(TSorter::Name, TSorter::PageSize, TSorter::Pages, ipagesize, ipages, TSorter::PagesOrBytes);
+ out.Open(name);
+ while (const TVal* rec = Next())
+ out.PushWithExtInfo(rec);
+ out.Close();
+ }
+
+ template <typename TKey>
+ void SortToDirectFile(const char* name, size_t ipagesize, size_t ipages) {
+ SortToDirectFile<TKey, TCompress>(name, ipagesize, ipages);
+ }
+
+ void CloseSorter() {
+ if (Opened)
+ TSorter::Close();
+ else
+ TSorter::Closep();
+ Memo.Freeze();
+ Opened = false;
+ }
+
+ void Close() {
+ if (Opened)
+ TSorter::Close();
+ else
+ TSorter::Closep();
+ Memo.Close();
+ OpenReq = false;
+ Opened = false;
+ }
+
+ int SavePortions(const char* mask) {
+ return TSorter::SavePortions(mask, UseDirectWrite);
+ }
+
+public:
+ using TSorter::RestorePortions;
+};
+
+template <typename TVal, template <typename T> class TCompare, typename TCompress = TFakeCompression,
+ typename TSieve = TFakeSieve<TVal>, class TPageFile = TOutputPageFile, class TFileTypes = TDefInterFileTypes>
+class TDatSorterMemoArray: public TSorterArray<TDatSorterMemo<TVal, TCompare, TCompress, TSieve, TPageFile, TFileTypes>> {
+public:
+ typedef TSorterArray<TDatSorterMemo<TVal, TCompare, TCompress, TSieve, TPageFile, TFileTypes>> TBase;
+
+ TDatSorterMemoArray(const char* name, size_t memory, size_t pagesize, size_t pages, int pagesOrBytes = 1)
+ : TBase(name, memory, pagesize, pages, pagesOrBytes)
+ {
+ }
+};
+
+#if defined(_MSC_VER)
+#pragma warning(pop)
+#endif
diff --git a/library/cpp/microbdb/sorter.h b/library/cpp/microbdb/sorter.h
new file mode 100644
index 0000000000..b2e7390377
--- /dev/null
+++ b/library/cpp/microbdb/sorter.h
@@ -0,0 +1,677 @@
+#pragma once
+
+#include <util/ysaveload.h>
+#include <util/generic/algorithm.h>
+#include <contrib/libs/libc_compat/include/link/link.h>
+
+#include "header.h"
+#include "heap.h"
+#include "extinfo.h"
+#include "input.h"
+#include "output.h"
+
+#ifdef TEST_MERGE
+#define MBDB_SORT_FUN ::StableSort
+#else
+#define MBDB_SORT_FUN ::Sort
+#endif
+
+template <class TVal, class TCompare, typename TCompress, typename TSieve, typename TOutPageFile, typename TFileTypes>
+class TDatSorterImpl;
+
+template <class TVal>
+struct TFakeSieve {
+ static inline int Sieve(TVal*, const TVal*) noexcept {
+ return 0;
+ }
+};
+
+template <class TSieve>
+struct TIsSieveFake {
+ static const bool Result = false;
+};
+
+template <class T>
+struct TIsSieveFake<TFakeSieve<T>> {
+ static const bool Result = true;
+};
+
+class TDefInterFileTypes {
+public:
+ typedef TOutputPageFile TOutPageFile;
+ typedef TInputPageFile TInPageFile;
+};
+
+//class TCompressedInterFileTypes;
+
+template <class TVal, class TCompare, typename TCompress, typename TSieve, typename TOutPageFile = TOutputPageFile, typename TFileTypes = TDefInterFileTypes>
+class TDatSorterImplBase: protected THeapIter<TVal, TInDatFileImpl<TVal, TInputRecordIterator<TVal, TInputPageIterator<typename TFileTypes::TInPageFile>>>, TCompare> {
+ typedef TOutputRecordIterator<TVal, TOutputPageIterator<typename TFileTypes::TOutPageFile>, TFakeIndexer, TCompress> TTmpRecIter;
+ typedef TInputRecordIterator<TVal, TInputPageIterator<typename TFileTypes::TInPageFile>> TInTmpRecIter;
+
+public:
+ typedef TOutDatFileImpl<TVal, TTmpRecIter> TTmpOut;
+ typedef TInDatFileImpl<TVal, TInTmpRecIter> TTmpIn;
+
+ typedef TOutDatFileImpl<TVal, TOutputRecordIterator<TVal, TOutputPageIterator<TOutPageFile>, TFakeIndexer, TCompress>> TOut;
+ typedef THeapIter<TVal, TTmpIn, TCompare> TMyHeap;
+ typedef TVector<const TVal*> TMyVector;
+ typedef typename TMyVector::iterator TMyIterator;
+
+ class IPortionSorter {
+ public:
+ virtual ~IPortionSorter() {
+ }
+
+ virtual void Sort(TMyVector&, TTmpOut*) = 0;
+ };
+
+ class TDefaultSorter: public IPortionSorter {
+ public:
+ void Sort(TMyVector& vector, TTmpOut* out) override {
+ MBDB_SORT_FUN(vector.begin(), vector.end(), TCompare());
+
+ const typename TMyVector::const_iterator
+ end = (TIsSieveFake<TSieve>::Result) ? vector.end() : TDatSorterImplBase::SieveRange(vector.begin(), vector.end());
+
+ for (typename TMyVector::const_iterator it = vector.begin(); it != end; ++it) {
+ out->PushWithExtInfo(*it);
+ }
+ }
+ };
+
+ class TSegmentedSorter: public IPortionSorter {
+ class TAdaptor {
+ typedef typename TMyVector::const_iterator TConstIterator;
+
+ public:
+ TAdaptor(TConstIterator b, TConstIterator e)
+ : Curr_(b)
+ , End_(e)
+ {
+ --Curr_;
+ }
+
+ inline const TVal* Current() const {
+ return *Curr_;
+ }
+
+ inline const TVal* Next() {
+ ++Curr_;
+
+ if (Curr_ == End_) {
+ return nullptr;
+ }
+
+ return *Curr_;
+ }
+
+ private:
+ TConstIterator Curr_;
+ TConstIterator End_;
+ };
+
+ typedef THeapIter<TVal, TAdaptor, TCompare> TPortionsHeap;
+
+ public:
+ void Sort(TMyVector& vector, TTmpOut* out) override {
+ TVector<TAdaptor> bounds;
+ typename TMyVector::iterator
+ it = vector.begin();
+ const size_t portions = Max<size_t>(1, (vector.size() * sizeof(TVal)) / (4 << 20));
+ const size_t step = vector.size() / portions;
+
+ // Sort segments
+ while (it != vector.end()) {
+ const typename TMyVector::iterator
+ end = Min(it + step, vector.end());
+
+ MBDB_SORT_FUN(it, end, TCompare());
+
+ bounds.push_back(TAdaptor(it, end));
+
+ it = end;
+ }
+
+ //
+ // Merge result
+ //
+
+ TPortionsHeap heap(bounds);
+
+ if (TIsSieveFake<TSieve>::Result) {
+ while (const TVal* val = heap.Next()) {
+ out->PushWithExtInfo(val);
+ }
+ } else {
+ const TVal* val = heap.Next();
+ const TVal* prev = out->PushWithExtInfo(val);
+
+ for (val = heap.Next(); val && prev; val = heap.Next()) {
+ if (TSieve::Sieve((TVal*)prev, val)) {
+ continue;
+ }
+
+ prev = out->PushWithExtInfo(val);
+ }
+
+ if (prev) {
+ TSieve::Sieve((TVal*)prev, prev);
+ }
+ }
+ }
+ };
+
+public:
+ TDatSorterImplBase()
+ : Sorter(new TDefaultSorter)
+ {
+ InFiles = nullptr;
+ TempBuf = nullptr;
+ Ptr = Vector.end();
+ Cur = nullptr;
+ Portions = CPortions = Error = 0;
+ }
+
+ ~TDatSorterImplBase() {
+ Close();
+ }
+
+ int Open(const char* templ, size_t pagesize, size_t pages, int pagesOrBytes = 1) {
+ Portions = CPortions = Error = 0;
+ TempBuf = strdup(templ);
+ Pagesize = pagesize;
+ if (pagesOrBytes)
+ Pages = pages;
+ else
+ Pages = pages / pagesize;
+ Pages = Max(1, Pages);
+ return 0;
+ }
+
+ void Push(const TVal* v) {
+ // Serialized extInfo must follow a record being pushed, therefore, to avoid
+ // unintentional misusage (as if when you are adding TExtInfo in your record
+ // type: you may forget to check your sorting routines and get a segfault as
+ // a result).
+ // PushWithExtInfo(v) should be called on records with extInfo.
+ static_assert(!TExtInfoType<TVal>::Exists, "expect !TExtInfoType<TVal>::Exists");
+
+ Vector.push_back(v);
+ }
+
+ void PushWithExtInfo(const TVal* v) {
+ Vector.push_back(v);
+ }
+
+ int SortPortion() {
+ Ptr = Vector.end();
+ Cur = nullptr;
+ if (!Vector.size() || Error)
+ return Error;
+
+ MBDB_SORT_FUN(Vector.begin(), Vector.end(), TCompare());
+
+ if (!TIsSieveFake<TSieve>::Result) {
+ const typename TMyVector::iterator
+ end = SieveRange(Vector.begin(), Vector.end());
+
+ Vector.resize(end - Vector.begin());
+ }
+
+ Ptr = Vector.begin();
+ Cur = nullptr;
+ return 0;
+ }
+
+ const TVal* Nextp() {
+ Cur = Ptr == Vector.end() ? nullptr : *Ptr++;
+ return Cur;
+ }
+
+ const TVal* Currentp() const {
+ return Cur;
+ }
+
+ void Closep() {
+ Vector.clear();
+ Ptr = Vector.end();
+ Cur = nullptr;
+ }
+
+ int NextPortion(bool direct = false) {
+ if (!Vector.size() || Error)
+ return Error;
+
+ TTmpOut out;
+ int ret, ret1;
+ char fname[FILENAME_MAX];
+
+ snprintf(fname, sizeof(fname), TempBuf, Portions++);
+ if ((ret = out.Open(fname, Pagesize, Pages, 1, direct)))
+ return Error = ret;
+
+ Sorter->Sort(Vector, &out);
+
+ Vector.erase(Vector.begin(), Vector.end());
+ ret = out.GetError();
+ ret1 = out.Close();
+ Error = Error ? Error : ret ? ret : ret1;
+ if (Error)
+ unlink(fname);
+ return Error;
+ }
+
+ int SavePortions(const char* mask, bool direct = false) {
+ char srcname[PATH_MAX], dstname[PATH_MAX];
+ if (Vector.size())
+ NextPortion(direct);
+ for (int i = 0; i < Portions; i++) {
+ char num[10];
+ sprintf(num, "%i", i);
+ snprintf(srcname, sizeof(srcname), TempBuf, i);
+ snprintf(dstname, sizeof(dstname), mask, num);
+ int res = rename(srcname, dstname);
+ if (res)
+ return res;
+ }
+ snprintf(dstname, sizeof(dstname), mask, "count");
+ TOFStream fcount(dstname);
+ Save(&fcount, Portions);
+ fcount.Finish();
+ return 0;
+ }
+
+ int RestorePortions(const char* mask) {
+ char srcname[PATH_MAX], dstname[PATH_MAX];
+ snprintf(srcname, sizeof(srcname), mask, "count");
+ TIFStream fcount(srcname);
+ Load(&fcount, Portions);
+ for (int i = 0; i < Portions; i++) {
+ char num[10];
+ sprintf(num, "%i", i);
+ snprintf(dstname, sizeof(dstname), TempBuf, i);
+ snprintf(srcname, sizeof(srcname), mask, num);
+ unlink(dstname);
+ int res = link(srcname, dstname);
+ if (res)
+ return res;
+ }
+ return 0;
+ }
+
+ int RestorePortions(const char* mask, ui32 count) {
+ char srcname[PATH_MAX], dstname[PATH_MAX];
+ ui32 portions;
+ TVector<ui32> counts;
+ for (ui32 j = 0; j < count; j++) {
+ snprintf(srcname, sizeof(srcname), mask, j, "count");
+ TIFStream fcount(srcname);
+ Load(&fcount, portions);
+ counts.push_back(portions);
+ Portions += portions;
+ }
+ ui32 p = 0;
+ for (ui32 j = 0; j < count; j++) {
+ int cnt = counts[j];
+ for (int i = 0; i < cnt; i++, p++) {
+ char num[10];
+ sprintf(num, "%i", i);
+ snprintf(dstname, sizeof(dstname), TempBuf, p);
+ snprintf(srcname, sizeof(srcname), mask, j, num);
+ unlink(dstname);
+ int res = link(srcname, dstname);
+ if (res) {
+ fprintf(stderr, "Can not link %s to %s\n", srcname, dstname);
+ return res;
+ }
+ }
+ }
+ return 0;
+ }
+
+ int Sort(size_t memory, int maxportions = 1000, bool direct = false) {
+ int ret, end, beg, i;
+ char fname[FILENAME_MAX];
+
+ if (Vector.size())
+ NextPortion();
+
+ if (Error)
+ return Error;
+ if (!Portions) {
+ TMyHeap::Init(&DummyFile, 1); // closed file
+ HPages = 1;
+ return 0;
+ }
+
+ Optimize(memory, maxportions);
+ if (!(InFiles = new TTmpIn[MPortions]))
+ return MBDB_NO_MEMORY;
+
+ for (beg = 0; beg < Portions && !Error; beg = end) {
+ end = (int)Min(beg + FPortions, Portions);
+ for (i = beg; i < end && !Error; i++) {
+ snprintf(fname, sizeof(fname), TempBuf, i);
+ if ((ret = InFiles[i - beg].Open(fname, HPages, 1, nullptr, direct)))
+ Error = Error ? Error : ret;
+ }
+ if (Error)
+ return Error;
+ TMyHeap::Init(InFiles, end - beg);
+ if (end != Portions) {
+ TTmpOut out;
+ const TVal* v;
+ snprintf(fname, sizeof(fname), TempBuf, Portions++);
+ if ((ret = out.Open(fname, Pagesize, HPages)))
+ return Error = Error ? Error : ret;
+ while ((v = TMyHeap::Next()))
+ out.PushWithExtInfo(v);
+ ret = out.GetError();
+ Error = Error ? Error : ret;
+ ret = out.Close();
+ Error = Error ? Error : ret;
+ for (i = beg; i < end; i++) {
+ ret = InFiles[i - beg].Close();
+ Error = Error ? Error : ret;
+ snprintf(fname, sizeof(fname), TempBuf, CPortions++);
+ unlink(fname);
+ }
+ }
+ FPortions = MPortions;
+ }
+ return Error;
+ }
+
+ int Close() {
+ char fname[FILENAME_MAX];
+ delete[] InFiles;
+ InFiles = nullptr;
+ Closep();
+ for (int i = CPortions; i < Portions; i++) {
+ snprintf(fname, sizeof(fname), TempBuf, i);
+ unlink(fname);
+ }
+ CPortions = Portions = 0;
+ free(TempBuf);
+ TempBuf = nullptr;
+ return Error;
+ }
+
+ void UseSegmentSorter() {
+ Sorter.Reset(new TSegmentedSorter);
+ }
+
+ inline int GetError() const {
+ return Error;
+ }
+
+ inline int GetPages() const {
+ return Pages;
+ }
+
+ inline int GetPageSize() const {
+ return Pagesize;
+ }
+
+private:
+ static TMyIterator SieveRange(const TMyIterator begin, const TMyIterator end) {
+ TMyIterator it = begin;
+ TMyIterator prev = begin;
+
+ for (++it; it != end; ++it) {
+ if (TSieve::Sieve((TVal*)*prev, *it)) {
+ continue;
+ }
+
+ ++prev;
+
+ if (it != prev) {
+ *prev = *it;
+ }
+ }
+
+ TSieve::Sieve((TVal*)*prev, *prev);
+
+ return ++prev;
+ }
+
+protected:
+ void Optimize(size_t memory, int maxportions, size_t fbufmax = 256u << 20) {
+ maxportions = (int)Min((size_t)maxportions, memory / Pagesize) - 1;
+ size_t maxpages = Max((size_t)1u, fbufmax / Pagesize);
+
+ if (maxportions <= 2) {
+ FPortions = MPortions = 2;
+ HPages = 1;
+ return;
+ }
+ if (maxportions >= Portions) {
+ FPortions = MPortions = Portions;
+ HPages = (int)Min(memory / ((Portions + 1) * Pagesize), maxpages);
+ return;
+ }
+ if (((Portions + maxportions - 1) / maxportions) <= maxportions) {
+ while (((Portions + maxportions - 1) / maxportions) <= maxportions)
+ --maxportions;
+ MPortions = ++maxportions;
+ int total = ((Portions + maxportions - 1) / maxportions) + Portions;
+ FPortions = (total % maxportions) ? (total % maxportions) : MPortions;
+ HPages = (int)Min(memory / ((MPortions + 1) * Pagesize), maxpages);
+ return;
+ }
+ FPortions = MPortions = maxportions;
+ HPages = (int)Min(memory / ((MPortions + 1) * Pagesize), maxpages);
+ }
+
+ TMyVector Vector;
+ typename TMyVector::iterator Ptr;
+ const TVal* Cur;
+ TTmpIn *InFiles, DummyFile;
+ char* TempBuf;
+ int Portions, CPortions, Pagesize, Pages, Error;
+ int FPortions, MPortions, HPages;
+ THolder<IPortionSorter> Sorter;
+};
+
+template <class TVal, class TCompare, typename TCompress>
+class TDatSorterImpl<TVal, TCompare, TCompress, TFakeSieve<TVal>, TOutputPageFile, TDefInterFileTypes>
+ : public TDatSorterImplBase<TVal, TCompare, TCompress, TFakeSieve<TVal>, TOutputPageFile, TDefInterFileTypes> {
+ typedef TDatSorterImplBase<TVal, TCompare, TCompress, TFakeSieve<TVal>, TOutputPageFile, TDefInterFileTypes> TBase;
+
+public:
+ int SortToFile(const char* name, size_t memory, int maxportions = 1000) {
+ int ret = TBase::Sort(memory, maxportions);
+ if (ret)
+ return ret;
+ typename TBase::TOut out;
+ if ((ret = out.Open(name, TBase::Pagesize, TBase::HPages)))
+ return ret;
+ const TVal* rec;
+ while ((rec = Next()))
+ out.PushWithExtInfo(rec);
+ if ((ret = out.GetError()))
+ return ret;
+ if ((ret = out.Close()))
+ return ret;
+ if ((ret = TBase::Close()))
+ return ret;
+ return 0;
+ }
+
+ int SortToStream(TAutoPtr<IOutputStream> output, size_t memory, int maxportions = 1000) {
+ int ret = TBase::Sort(memory, maxportions);
+ if (ret)
+ return ret;
+ typename TBase::TOut out;
+ if ((ret = out.Open(output, TBase::Pagesize, TBase::HPages)))
+ return ret;
+ const TVal* rec;
+ while ((rec = Next()))
+ out.PushWithExtInfo(rec);
+ if ((ret = out.GetError()))
+ return ret;
+ if ((ret = out.Close()))
+ return ret;
+ if ((ret = TBase::Close()))
+ return ret;
+ return 0;
+ }
+
+ const TVal* Next() {
+ return TBase::TMyHeap::Next();
+ }
+
+ const TVal* Current() const {
+ return TBase::TMyHeap::Current();
+ }
+
+ bool GetExtInfo(typename TExtInfoType<TVal>::TResult* extInfo) const {
+ return TBase::TMyHeap::GetExtInfo(extInfo);
+ }
+
+ const ui8* GetExtInfoRaw(size_t* len) const {
+ return TBase::TMyHeap::GetExtInfoRaw(len);
+ }
+};
+
+template <class TVal, class TCompare, typename TCompress, typename TSieve,
+ typename TOutPageFile = TOutputPageFile, typename TFileTypes = TDefInterFileTypes>
+class TDatSorterImpl: public TDatSorterImplBase<TVal, TCompare, TCompress, TSieve, TOutPageFile, TFileTypes> {
+ typedef TDatSorterImplBase<TVal, TCompare, TCompress, TSieve, TOutPageFile, TFileTypes> TBase;
+
+public:
+ TDatSorterImpl()
+ : Cur(nullptr)
+ , Prev(nullptr)
+ {
+ }
+
+ int SortToFile(const char* name, size_t memory, int maxportions = 1000) {
+ int ret = Sort(memory, maxportions);
+ if (ret)
+ return ret;
+ typename TBase::TOut out;
+ if ((ret = out.Open(name, TBase::Pagesize, TBase::HPages)))
+ return ret;
+ const TVal* rec;
+ while ((rec = Next()))
+ out.PushWithExtInfo(rec);
+ if ((ret = out.GetError()))
+ return ret;
+ if ((ret = out.Close()))
+ return ret;
+ if ((ret = TBase::Close()))
+ return ret;
+ return 0;
+ }
+
+ int SortToStream(TAutoPtr<IOutputStream> output, size_t memory, int maxportions = 1000) {
+ int ret = Sort(memory, maxportions);
+ if (ret)
+ return ret;
+ typename TBase::TOut out;
+ if ((ret = out.Open(output, TBase::Pagesize, TBase::HPages)))
+ return ret;
+ const TVal* rec;
+ while ((rec = Next()))
+ out.PushWithExtInfo(rec);
+ if ((ret = out.GetError()))
+ return ret;
+ if ((ret = out.Close()))
+ return ret;
+ if ((ret = TBase::Close()))
+ return ret;
+ return 0;
+ }
+
+ int Open(const char* templ, size_t pagesize, size_t pages, int pagesOrBytes = 1) {
+ int res = TBase::Open(templ, pagesize, pages, pagesOrBytes);
+ Prev = nullptr;
+ Cur = nullptr;
+ return res;
+ }
+
+ int Sort(size_t memory, int maxportions = 1000, bool direct = false) {
+ int res = TBase::Sort(memory, maxportions, direct);
+ if (!res) {
+ const TVal* rec = TBase::TMyHeap::Next();
+ if (rec) {
+ size_t els, es;
+ size_t sz = NMicroBDB::SizeOfExt(rec, &els, &es);
+ sz += els + es;
+ if (!TExtInfoType<TVal>::Exists)
+ Cur = (TVal*)malloc(sizeof(TVal));
+ else
+ Cur = (TVal*)malloc(TBase::Pagesize);
+ memcpy(Cur, rec, sz);
+ }
+ }
+ return res;
+ }
+
+ // Prev = last returned
+ // Cur = current accumlating with TSieve
+
+ const TVal* Next() {
+ if (!Cur) {
+ if (Prev) {
+ free(Prev);
+ Prev = nullptr;
+ }
+ return nullptr;
+ }
+ const TVal* rec;
+
+ if (TIsSieveFake<TSieve>::Result)
+ rec = TBase::TMyHeap::Next();
+ else {
+ do {
+ rec = TBase::TMyHeap::Next();
+ } while (rec && TSieve::Sieve((TVal*)Cur, rec));
+ }
+
+ if (!Prev) {
+ if (!TExtInfoType<TVal>::Exists)
+ Prev = (TVal*)malloc(sizeof(TVal));
+ else
+ Prev = (TVal*)malloc(TBase::Pagesize);
+ }
+ size_t els, es;
+ size_t sz = NMicroBDB::SizeOfExt(Cur, &els, &es);
+ sz += els + es;
+ memcpy(Prev, Cur, sz);
+
+ if (rec) {
+ sz = NMicroBDB::SizeOfExt(rec, &els, &es);
+ sz += els + es;
+ memcpy(Cur, rec, sz);
+ } else {
+ TSieve::Sieve((TVal*)Cur, Cur);
+ free(Cur);
+ Cur = nullptr;
+ }
+ return Prev;
+ }
+
+ const TVal* Current() const {
+ return Prev;
+ }
+
+ int Close() {
+ int res = TBase::Close();
+ if (Prev) {
+ free(Prev);
+ Prev = nullptr;
+ }
+ if (Cur) {
+ free(Cur);
+ Cur = nullptr;
+ }
+ return res;
+ }
+
+protected:
+ TVal* Cur;
+ TVal* Prev;
+};
diff --git a/library/cpp/microbdb/sorterdef.h b/library/cpp/microbdb/sorterdef.h
new file mode 100644
index 0000000000..8834b5fff8
--- /dev/null
+++ b/library/cpp/microbdb/sorterdef.h
@@ -0,0 +1,19 @@
+#pragma once
+
+#define MAKESORTERTMPL(TRecord, MemberFunc) \
+ template <typename T> \
+ struct MemberFunc; \
+ template <> \
+ struct MemberFunc<TRecord> { \
+ bool operator()(const TRecord* l, const TRecord* r) { \
+ return TRecord ::MemberFunc(l, r) < 0; \
+ } \
+ int operator()(const TRecord* l, const TRecord* r, int) { \
+ return TRecord ::MemberFunc(l, r); \
+ } \
+ }
+
+template <typename T>
+static inline int compare(const T& a, const T& b) {
+ return (a < b) ? -1 : (a > b);
+}
diff --git a/library/cpp/microbdb/utility.h b/library/cpp/microbdb/utility.h
new file mode 100644
index 0000000000..5c86061bca
--- /dev/null
+++ b/library/cpp/microbdb/utility.h
@@ -0,0 +1,75 @@
+#pragma once
+
+#include "microbdb.h"
+
+template <class TRecord, template <class T> class TCompare>
+int SortData(const TFile& ifile, const TFile& ofile, const TDatMetaPage* meta, size_t memory, const char* tmpDir = nullptr) {
+ char templ[FILENAME_MAX];
+ TInDatFileImpl<TRecord> datin;
+ TOutDatFileImpl<TRecord> datout;
+ TDatSorterImpl<TRecord, TCompare<TRecord>, TFakeCompression, TFakeSieve<TRecord>> sorter;
+ const TRecord* u;
+ int ret;
+
+ const size_t minMemory = (2u << 20);
+ memory = Max(memory, minMemory + minMemory / 2);
+ if (datin.Open(ifile, meta, memory - minMemory, 0))
+ err(1, "can't read input file");
+
+ size_t outpages = Max((size_t)2u, minMemory / datin.GetPageSize());
+ memory -= outpages * datin.GetPageSize();
+
+ if (ret = MakeSorterTempl(templ, tmpDir))
+ err(1, "can't create tempdir in \"%s\"; error: %d\n", templ, ret);
+
+ if (sorter.Open(templ, datin.GetPageSize(), outpages)) {
+ *strrchr(templ, LOCSLASH_C) = 0;
+ RemoveDirWithContents(templ);
+ err(1, "can't open sorter");
+ }
+
+ while (1) {
+ datin.Freeze();
+ while ((u = datin.Next()))
+ sorter.PushWithExtInfo(u);
+ sorter.NextPortion();
+ if (datin.GetError() || datin.IsEof())
+ break;
+ }
+
+ if (datin.GetError()) {
+ *strrchr(templ, LOCSLASH_C) = 0;
+ RemoveDirWithContents(templ);
+ err(1, "in data file error %d", datin.GetError());
+ }
+ if (datin.Close()) {
+ *strrchr(templ, LOCSLASH_C) = 0;
+ RemoveDirWithContents(templ);
+ err(1, "can't close in data file");
+ }
+
+ sorter.Sort(memory);
+
+ if (datout.Open(ofile, datin.GetPageSize(), outpages)) {
+ *strrchr(templ, LOCSLASH_C) = 0;
+ RemoveDirWithContents(templ);
+ err(1, "can't write out file");
+ }
+
+ while ((u = sorter.Next()))
+ datout.PushWithExtInfo(u);
+
+ if (sorter.GetError())
+ err(1, "sorter error %d", sorter.GetError());
+ if (sorter.Close())
+ err(1, "can't close sorter");
+
+ *strrchr(templ, LOCSLASH_C) = 0;
+ RemoveDirWithContents(templ);
+
+ if (datout.GetError())
+ err(1, "out data file error %d", datout.GetError());
+ if (datout.Close())
+ err(1, "can't close out data file");
+ return 0;
+}
diff --git a/library/cpp/microbdb/wrappers.h b/library/cpp/microbdb/wrappers.h
new file mode 100644
index 0000000000..38eb8edebc
--- /dev/null
+++ b/library/cpp/microbdb/wrappers.h
@@ -0,0 +1,637 @@
+#pragma once
+
+#include "microbdb.h"
+
+#define MAKEFILTERTMPL(TRecord, MemberFunc, NS) \
+ template <typename T> \
+ struct MemberFunc; \
+ template <> \
+ struct MemberFunc<TRecord> { \
+ bool operator()(const TRecord* r) { \
+ return NS::MemberFunc(r); \
+ } \
+ }
+
+#define MAKEJOINTMPL(TRecordA, TRecordB, MemberFunc, NS, TMergeType) \
+ template <typename A, typename B> \
+ struct MemberFunc; \
+ template <> \
+ struct MemberFunc<TRecordA, TRecordB> { \
+ int operator()(const TRecordA* l, const TRecordB* r) { \
+ return NS::MemberFunc(l, r); \
+ } \
+ }; \
+ typedef TMergeRec<TRecordA, TRecordB> TMergeType
+
+#define MAKEJOINTMPL2(TRecordA, TRecordB, MemberFunc, StructName, TMergeType) \
+ template <typename A, typename B> \
+ struct StructName; \
+ template <> \
+ struct StructName<TRecordA, TRecordB> { \
+ int operator()(const TRecordA* l, const TRecordB* r) { \
+ return MemberFunc(l, r); \
+ } \
+ }; \
+ typedef TMergeRec<TRecordA, TRecordB> TMergeType
+
+#define MAKEJOINTMPLLEFT(TRecordA, TRecordB, MemberFunc, NS, TMergeType) \
+ template <typename A, typename B> \
+ struct MemberFunc; \
+ template <> \
+ struct MemberFunc<TRecordA, TRecordB> { \
+ int operator()(const TRecordA* l, const TRecordB* r) { \
+ return NS::MemberFunc(l->RecA, r); \
+ } \
+ }; \
+ typedef TMergeRec<TRecordA, TRecordB> TMergeType
+
+template <class TRec>
+class IDatNextSource {
+public:
+ virtual const TRec* Next() = 0;
+ virtual void Work() {
+ }
+};
+
+template <class TRec>
+class IDatNextReceiver {
+public:
+ IDatNextReceiver(IDatNextSource<TRec>& source)
+ : Source(source)
+ {
+ }
+
+ virtual void Work() {
+ Source.Work();
+ }
+
+protected:
+ IDatNextSource<TRec>& Source;
+};
+
+template <class TInRec, class TOutRec>
+class IDatNextChannel: public IDatNextReceiver<TInRec>, public IDatNextSource<TOutRec> {
+public:
+ IDatNextChannel(IDatNextSource<TInRec>& source)
+ : IDatNextReceiver<TInRec>(source)
+ {
+ }
+
+ virtual void Work() {
+ IDatNextReceiver<TInRec>::Work();
+ }
+};
+
+class IDatWorker {
+public:
+ virtual void Work() = 0;
+};
+
+template <class TRec>
+class IDatPushReceiver {
+public:
+ virtual void Push(const TRec* rec) = 0;
+ virtual void Work() = 0;
+};
+
+template <class TRec>
+class IDatPushSource {
+public:
+ IDatPushSource(IDatPushReceiver<TRec>& receiver)
+ : Receiver(receiver)
+ {
+ }
+
+ virtual void Work() {
+ Receiver.Work();
+ }
+
+protected:
+ IDatPushReceiver<TRec>& Receiver;
+};
+
+template <class TInRec, class TOutRec>
+class IDatPushChannel: public IDatPushReceiver<TInRec>, public IDatPushSource<TOutRec> {
+public:
+ IDatPushChannel(IDatPushReceiver<TOutRec>& receiver)
+ : IDatPushSource<TOutRec>(receiver)
+ {
+ }
+
+ virtual void Work() {
+ IDatPushSource<TOutRec>::Work();
+ }
+};
+
+template <class TRec>
+class IDatNextToPush: public IDatNextReceiver<TRec>, public IDatPushSource<TRec> {
+ typedef IDatNextReceiver<TRec> TNextReceiver;
+ typedef IDatPushSource<TRec> TPushSource;
+
+public:
+ IDatNextToPush(IDatNextSource<TRec>& source, IDatPushReceiver<TRec>& receiver)
+ : TNextReceiver(source)
+ , TPushSource(receiver)
+ {
+ }
+
+ virtual void Work() {
+ const TRec* rec;
+ while (rec = TNextReceiver::Source.Next())
+ TPushSource::Receiver.Push(rec);
+ TPushSource::Work();
+ TNextReceiver::Work();
+ }
+};
+
+template <class TRec>
+class TDatNextPNSplitter: public IDatNextReceiver<TRec>, public IDatNextSource<TRec>, public IDatPushSource<TRec> {
+public:
+ TDatNextPNSplitter(IDatNextSource<TRec>& source, IDatPushReceiver<TRec>& receiver)
+ : IDatNextReceiver<TRec>(source)
+ , IDatNextSource<TRec>()
+ , IDatPushSource<TRec>(receiver)
+ {
+ }
+
+ const TRec* Next() {
+ const TRec* rec = IDatNextReceiver<TRec>::Source.Next();
+ if (rec) {
+ IDatPushSource<TRec>::Receiver.Push(rec);
+ return rec;
+ } else {
+ return 0;
+ }
+ }
+
+ virtual void Work() {
+ IDatNextReceiver<TRec>::Work();
+ IDatPushSource<TRec>::Work();
+ }
+};
+
+template <class TRec, class TOutRecA = TRec, class TOutRecB = TRec>
+class TDatPushPPSplitter: public IDatPushReceiver<TRec>, public IDatPushSource<TOutRecA>, public IDatPushSource<TOutRecB> {
+public:
+ TDatPushPPSplitter(IDatPushReceiver<TOutRecA>& receiverA, IDatPushReceiver<TOutRecB>& receiverB)
+ : IDatPushSource<TOutRecA>(receiverA)
+ , IDatPushSource<TOutRecB>(receiverB)
+ {
+ }
+
+ void Push(const TRec* rec) {
+ IDatPushSource<TOutRecA>::Receiver.Push(rec);
+ IDatPushSource<TOutRecB>::Receiver.Push(rec);
+ }
+
+ void Work() {
+ IDatPushSource<TOutRecA>::Work();
+ IDatPushSource<TOutRecB>::Work();
+ }
+};
+
+template <class TRec>
+class TFastInDatFile: public TInDatFile<TRec>, public IDatNextSource<TRec> {
+public:
+ typedef TInDatFile<TRec> Base;
+
+ TFastInDatFile(const char* name, bool open = true, size_t pages = dbcfg::fbufsize, int pagesOrBytes = 0)
+ : TInDatFile<TRec>(name, pages, pagesOrBytes)
+ , FileName(name)
+ {
+ if (open)
+ Base::Open(name);
+ }
+
+ void Open() {
+ Base::Open(FileName);
+ }
+
+ template <class TPassRec>
+ bool PassToUid(const TRec* inrec, const TPassRec* torec) {
+ inrec = Base::Current();
+ while (inrec && CompareUids(inrec, torec) < 0)
+ inrec = Base::Next();
+ return (inrec && CompareUids(inrec, torec) == 0);
+ }
+
+ void Work() {
+ Base::Close();
+ }
+
+ const TRec* Next() {
+ return Base::Next();
+ }
+
+private:
+ TString FileName;
+};
+
+template <class TRec>
+class TPushOutDatFile: public TOutDatFile<TRec>, public IDatPushReceiver<TRec> {
+public:
+ typedef TOutDatFile<TRec> Base;
+
+ TPushOutDatFile(const char* name, bool open = true)
+ : Base(name, dbcfg::pg_docuid, dbcfg::fbufsize, 0)
+ , FileName(name)
+ {
+ if (open)
+ Base::Open(name);
+ }
+
+ void Open() {
+ Base::Open(~FileName);
+ }
+
+ void Push(const TRec* rec) {
+ Base::Push(rec);
+ }
+
+ void Work() {
+ Base::Close();
+ }
+
+private:
+ TString FileName;
+};
+
+template <class TRec>
+class TNextOutDatFile: public IDatNextToPush<TRec> {
+public:
+ typedef IDatNextToPush<TRec> TBase;
+
+ TNextOutDatFile(const char* name, IDatNextSource<TRec>& source, bool open = true)
+ : TBase(source, File)
+ , File(name, open)
+ {
+ }
+
+ void Open() {
+ File.Open();
+ }
+
+private:
+ TPushOutDatFile<TRec> File;
+};
+
+template <class TVal, template <typename T> class TCompare>
+class TNextDatSorterMemo: public TDatSorterMemo<TVal, TCompare>, public IDatNextChannel<TVal, TVal> {
+ typedef TDatSorterMemo<TVal, TCompare> TImpl;
+
+public:
+ TNextDatSorterMemo(IDatNextSource<TVal>& source, const char* dir = dbcfg::fname_temp, const char* name = "yet another sorter", size_t memory = dbcfg::small_sorter_size, size_t pagesize = dbcfg::pg_docuid, size_t pages = dbcfg::fbufsize, int pagesOrBytes = 0)
+ : TImpl(name, memory, pagesize, pages, pagesOrBytes)
+ , IDatNextChannel<TVal, TVal>(source)
+ , Sorted(false)
+ {
+ TImpl::Open(dir);
+ }
+
+ void Sort() {
+ const TVal* rec;
+ while (rec = IDatNextChannel<TVal, TVal>::Source.Next()) {
+ TImpl::Push(rec);
+ }
+ TImpl::Sort();
+ Sorted = true;
+ }
+
+ const TVal* Next() {
+ if (!Sorted)
+ Sort();
+ return TImpl::Next();
+ }
+
+private:
+ bool Sorted;
+ TString Dir;
+};
+
+template <class TInRec, class TOutRec>
+class TDatConverter: public IDatNextChannel<TInRec, TOutRec> {
+public:
+ TDatConverter(IDatNextSource<TInRec>& source)
+ : IDatNextChannel<TInRec, TOutRec>(source)
+ {
+ }
+
+ virtual void Convert(const TInRec& inrec, TOutRec& outrec) {
+ outrec(inrec);
+ }
+
+ const TOutRec* Next() {
+ const TInRec* rec = IDatNextChannel<TInRec, TOutRec>::Source.Next();
+ if (!rec)
+ return 0;
+ Convert(*rec, CurrentRec);
+ return &CurrentRec;
+ }
+
+private:
+ TOutRec CurrentRec;
+};
+
+template <class TRecA, class TRecB>
+class TMergeRec {
+public:
+ const TRecA* RecA;
+ const TRecB* RecB;
+};
+
+enum NMergeTypes {
+ MT_JOIN = 0,
+ MT_ADD = 1,
+ MT_OVERWRITE = 2,
+ MT_TYPENUM
+};
+
+template <class TRecA, class TRecB, template <typename TA, typename TB> class TCompare>
+class TNextDatMerger: public IDatNextReceiver<TRecA>, public IDatNextReceiver<TRecB>, public IDatNextSource<TMergeRec<TRecA, TRecB>> {
+public:
+ TNextDatMerger(IDatNextSource<TRecA>& sourceA, IDatNextSource<TRecB>& sourceB, ui8 mergeType)
+ : IDatNextReceiver<TRecA>(sourceA)
+ , IDatNextReceiver<TRecB>(sourceB)
+ , MergeType(mergeType)
+ , MoveA(false)
+ , MoveB(false)
+ , NotInit(true)
+ {
+ }
+
+ const TMergeRec<TRecA, TRecB>* Next() {
+ if (MoveA || NotInit)
+ SourceARec = IDatNextReceiver<TRecA>::Source.Next();
+ if (MoveB || NotInit)
+ SourceBRec = IDatNextReceiver<TRecB>::Source.Next();
+ NotInit = false;
+
+ // Cout << "Next " << SourceARec->HostId << "\t" << SourceBRec->HostId << "\t" << TCompare<TRecA, TRecB>()(SourceARec, SourceBRec) << "\t" << ::compare(SourceARec->HostId, SourceBRec->HostId) << "\t" << ::compare(1, 2) << "\t" << ::compare(2,1) << Endl;
+ if (MergeType == MT_ADD && SourceARec && (!SourceBRec || TCompare<TRecA, TRecB>()(SourceARec, SourceBRec) < 0)) {
+ MergeRec.RecA = SourceARec;
+ MergeRec.RecB = 0;
+ MoveA = true;
+ MoveB = false;
+ return &MergeRec;
+ }
+
+ if (MergeType == MT_ADD && SourceBRec && (!SourceARec || TCompare<TRecA, TRecB>()(SourceARec, SourceBRec) < 0)) {
+ MergeRec.RecA = 0;
+ MergeRec.RecB = SourceBRec;
+ MoveA = false;
+ MoveB = true;
+ return &MergeRec;
+ }
+
+ if (MergeType == MT_ADD && SourceARec && SourceBRec && TCompare<TRecA, TRecB>()(SourceARec, SourceBRec) == 0) {
+ MergeRec.RecA = SourceARec;
+ MergeRec.RecB = SourceBRec;
+ MoveA = true;
+ MoveB = true;
+ return &MergeRec;
+ }
+
+ while (MergeType == MT_JOIN && SourceARec && SourceBRec && TCompare<TRecA, TRecB>()(SourceARec, SourceBRec) != 0) {
+ while (SourceARec && TCompare<TRecA, TRecB>()(SourceARec, SourceBRec) < 0) {
+ SourceARec = IDatNextReceiver<TRecA>::Source.Next();
+ }
+ while (SourceARec && SourceBRec && TCompare<TRecA, TRecB>()(SourceARec, SourceBRec) > 0) {
+ SourceBRec = IDatNextReceiver<TRecB>::Source.Next();
+ }
+ }
+
+ if (MergeType == MT_JOIN && SourceARec && SourceBRec) {
+ MergeRec.RecA = SourceARec;
+ MergeRec.RecB = SourceBRec;
+ MoveA = true;
+ MoveB = true;
+ return &MergeRec;
+ }
+
+ MergeRec.RecA = 0;
+ MergeRec.RecB = 0;
+ return 0;
+ }
+
+ void Work() {
+ IDatNextReceiver<TRecA>::Source.Work();
+ IDatNextReceiver<TRecB>::Source.Work();
+ }
+
+private:
+ TMergeRec<TRecA, TRecB> MergeRec;
+ const TRecA* SourceARec;
+ const TRecB* SourceBRec;
+ ui8 MergeType;
+ bool MoveA;
+ bool MoveB;
+ bool NotInit;
+};
+
+/*template<class TRec, class TSource, template <typename T> class TCompare, class TReceiver = TPushOutDatFile<TRec> >
+class TPushDatMerger {
+public:
+ TPushDatMerger(TSource& source, TReceiver& receiver, ui8 mergeType)
+ : Source(source)
+ , Receiver(receiver)
+ , MergeType(mergeType)
+ {
+ }
+
+ virtual void Init() {
+ SourceRec = Source.Next();
+ }
+
+ virtual void Push(const TRec* rec) {
+ while (SourceRec && TCompare<TRec>()(SourceRec, rec, 0) < 0) {
+ if (MergeType == MT_OVERWRITE || MergeType == MT_ADD)
+ Receiver.Push(SourceRec);
+ SourceRec = Source.Next();
+ }
+
+ bool intersected = false;
+ while (SourceRec && TCompare<TRec>()(SourceRec, rec, 0) == 0) {
+ intersected = true;
+ if (MergeType == MT_ADD)
+ Receiver.Push(SourceRec);
+ SourceRec = Source.Next();
+ }
+
+ if (intersected && MergeType == MT_JOIN)
+ Receiver.Push(rec);
+
+ if (MergeType == MT_OVERWRITE || MergeType == MT_ADD)
+ Receiver.Push(rec);
+ }
+
+ virtual void Term() {
+ if (MergeType == MT_OVERWRITE || MergeType == MT_ADD) {
+ while (SourceRec) {
+ Receiver.Push(SourceRec);
+ SourceRec = Source.Next();
+ }
+ }
+ }
+
+private:
+ TSource& Source;
+ const TRec* SourceRec;
+ TReceiver& Receiver;
+ ui8 MergeType;
+};*/
+
+/*template <class TRec, class TSourceA, class TSourceB, template <typename T> class TCompare, class TReceiver = TPushOutDatFile<TRec> >
+class TNextDatMerger: public TPushDatMerger<TRec, TSourceA, TCompare, TReceiver> {
+ typedef TPushDatMerger<TRec, TSourceA, TCompare, TReceiver> TImpl;
+public:
+ TNextDatMerger(TSourceA& sourceA, TSourceB& sourceB, TReceiver& receiver, ui8 mergeType)
+ : TImpl(sourceA, receiver, mergeType)
+ , SourceB(sourceB)
+ {
+ }
+
+ virtual void Work() {
+ TImpl::Init();
+ while (SourceBRec = SourceB.Next()) {
+ TImpl::Push(SourceBRec);
+ }
+ TImpl::Term();
+ }
+private:
+ TSourceB& SourceB;
+ const TRec* SourceBRec;
+};*/
+
+/*template <class TRec, template <typename T> class TCompare, class TReceiver = TPushOutDatFile<TRec> >
+class TFilePushDatMerger: public TPushDatMerger<TRec, TFastInDatFile<TRec>, TCompare, TReceiver> {
+ typedef TPushDatMerger<TRec, TFastInDatFile<TRec>, TCompare, TReceiver> TImpl;
+public:
+ TFilePushDatMerger(const char* name, TReceiver& receiver, ui8 mergeType)
+ : TImpl(SourceFile, receiver, mergeType)
+ , SourceFile(name)
+ {
+ }
+
+ virtual void Push(const TRec* rec) {
+ TImpl::Push(rec);
+ }
+
+ virtual void Term() {
+ TImpl::Term();
+ }
+private:
+ TFastInDatFile<TRec> SourceFile;
+};*/
+
+/*template <class TRec, template <typename T> class TCompare, class TReceiver = TPushOutDatFile<TRec> >
+class TFileNextDatMerger: public TNextDatMerger<TRec, TFastInDatFile<TRec>, TFastInDatFile<TRec>, TCompare, TReceiver> {
+ typedef TNextDatMerger<TRec, TFastInDatFile<TRec>, TFastInDatFile<TRec>, TCompare, TReceiver> TImpl;
+public:
+ TFileNextDatMerger(const char* sourceAname, const char* sourceBname, TReceiver& receiver, ui8 mergeType)
+ : TImpl(FileA, FileB, receiver, mergeType)
+ , FileA(sourceAname)
+ , FileB(sourceBname)
+ {
+ }
+
+ virtual void Work() {
+ TImpl::Work();
+ }
+private:
+ TFastInDatFile<TRec> FileA;
+ TFastInDatFile<TRec> FileB;
+};*/
+
+template <class TRec, template <typename T> class TPredicate>
+class TDatNextFilter: public IDatNextChannel<TRec, TRec> {
+public:
+ TDatNextFilter(IDatNextSource<TRec>& source)
+ : IDatNextChannel<TRec, TRec>(source)
+ {
+ }
+
+ virtual const TRec* Next() {
+ const TRec* rec;
+ while ((rec = IDatNextChannel<TRec, TRec>::Source.Next()) != 0 && !Check(rec)) {
+ }
+ if (!rec)
+ return 0;
+ return rec;
+ }
+
+protected:
+ virtual bool Check(const TRec* rec) {
+ return TPredicate<TRec>()(rec);
+ }
+};
+
+template <class TRec, template <typename T> class TPredicate>
+class TDatPushFilter: public IDatPushChannel<TRec, TRec> {
+public:
+ TDatPushFilter(IDatPushReceiver<TRec>& receiver)
+ : IDatPushChannel<TRec, TRec>(receiver)
+ {
+ }
+
+ virtual void Push(const TRec* rec) {
+ if (Check(rec))
+ IDatPushChannel<TRec, TRec>::Receiver.Push(rec);
+ }
+
+private:
+ virtual bool Check(const TRec* rec) {
+ return TPredicate<TRec>()(rec);
+ }
+};
+
+template <class TInRec, class TOutRec, template <typename T> class TCompare>
+class TDatGrouper: public IDatNextChannel<TInRec, TOutRec> {
+public:
+ TDatGrouper(IDatNextSource<TInRec>& source)
+ : IDatNextChannel<TInRec, TOutRec>(source)
+ , Begin(true)
+ , Finish(false)
+ , HasOutput(false)
+ {
+ }
+
+ const TOutRec* Next() {
+ while (CurrentRec = IDatNextChannel<TInRec, TOutRec>::Source.Next()) {
+ int cmp = 0;
+ if (Begin) {
+ Begin = false;
+ OnStart();
+ } else if ((cmp = TCompare<TInRec>()(CurrentRec, LastRec, 0)) != 0) {
+ OnFinish();
+ OnStart();
+ }
+ OnRecord();
+ LastRec = CurrentRec;
+ if (HasOutput) {
+ HasOutput = false;
+ return &OutRec;
+ }
+ }
+ if (!Finish)
+ OnFinish();
+ Finish = true;
+ if (HasOutput) {
+ HasOutput = false;
+ return &OutRec;
+ }
+ return 0;
+ }
+
+protected:
+ virtual void OnStart() = 0;
+ virtual void OnRecord() = 0;
+ virtual void OnFinish() = 0;
+
+ const TInRec* CurrentRec;
+ const TInRec* LastRec;
+ TOutRec OutRec;
+
+ bool Begin;
+ bool Finish;
+ bool HasOutput;
+};