diff options
author | vvvv <vvvv@ydb.tech> | 2023-07-31 20:07:26 +0300 |
---|---|---|
committer | vvvv <vvvv@ydb.tech> | 2023-07-31 20:07:26 +0300 |
commit | f9e4743508b7930e884714cc99985ac45f84ed98 (patch) | |
tree | a1290261a4915a6f607e110e2cc27aee4c205f85 /library/cpp/microbdb | |
parent | 5cf9beeab3ea847da0b6c414fcb5faa9cb041317 (diff) | |
download | ydb-f9e4743508b7930e884714cc99985ac45f84ed98.tar.gz |
Use UDFs from YDB
Diffstat (limited to 'library/cpp/microbdb')
27 files changed, 0 insertions, 7387 deletions
diff --git a/library/cpp/microbdb/CMakeLists.darwin-x86_64.txt b/library/cpp/microbdb/CMakeLists.darwin-x86_64.txt deleted file mode 100644 index c4d2e9d3a41..00000000000 --- a/library/cpp/microbdb/CMakeLists.darwin-x86_64.txt +++ /dev/null @@ -1,56 +0,0 @@ - -# This file was generated by the build system used internally in the Yandex monorepo. -# Only simple modifications are allowed (adding source-files to targets, adding simple properties -# like target_include_directories). These modifications will be ported to original -# ya.make files by maintainers. Any complex modifications which can't be ported back to the -# original buildsystem will not be accepted. - - -find_package(ZLIB REQUIRED) -get_built_tool_path( - TOOL_protoc_bin - TOOL_protoc_dependency - contrib/tools/protoc/bin - protoc -) -get_built_tool_path( - TOOL_cpp_styleguide_bin - TOOL_cpp_styleguide_dependency - contrib/tools/protoc/plugins/cpp_styleguide - cpp_styleguide -) - -add_library(library-cpp-microbdb) -target_link_libraries(library-cpp-microbdb PUBLIC - contrib-libs-cxxsupp - yutil - contrib-libs-fastlz - contrib-libs-libc_compat - contrib-libs-protobuf - contrib-libs-snappy - ZLIB::ZLIB - cpp-deprecated-fgood - cpp-on_disk-st_hash - library-cpp-packedtypes -) -target_proto_messages(library-cpp-microbdb PRIVATE - ${CMAKE_SOURCE_DIR}/library/cpp/microbdb/noextinfo.proto -) -target_sources(library-cpp-microbdb PRIVATE - ${CMAKE_SOURCE_DIR}/library/cpp/microbdb/file.cpp - ${CMAKE_SOURCE_DIR}/library/cpp/microbdb/header.cpp - ${CMAKE_SOURCE_DIR}/library/cpp/microbdb/microbdb.cpp -) -target_proto_addincls(library-cpp-microbdb - ./ - ${CMAKE_SOURCE_DIR}/ - ${CMAKE_BINARY_DIR} - ${CMAKE_SOURCE_DIR} - ${CMAKE_SOURCE_DIR}/contrib/libs/protobuf/src - ${CMAKE_BINARY_DIR} - ${CMAKE_SOURCE_DIR}/contrib/libs/protobuf/src -) -target_proto_outs(library-cpp-microbdb - --cpp_out=${CMAKE_BINARY_DIR}/ - --cpp_styleguide_out=${CMAKE_BINARY_DIR}/ -) diff --git a/library/cpp/microbdb/CMakeLists.linux-aarch64.txt b/library/cpp/microbdb/CMakeLists.linux-aarch64.txt deleted file mode 100644 index 302dbd03cdc..00000000000 --- a/library/cpp/microbdb/CMakeLists.linux-aarch64.txt +++ /dev/null @@ -1,57 +0,0 @@ - -# This file was generated by the build system used internally in the Yandex monorepo. -# Only simple modifications are allowed (adding source-files to targets, adding simple properties -# like target_include_directories). These modifications will be ported to original -# ya.make files by maintainers. Any complex modifications which can't be ported back to the -# original buildsystem will not be accepted. - - -find_package(ZLIB REQUIRED) -get_built_tool_path( - TOOL_protoc_bin - TOOL_protoc_dependency - contrib/tools/protoc/bin - protoc -) -get_built_tool_path( - TOOL_cpp_styleguide_bin - TOOL_cpp_styleguide_dependency - contrib/tools/protoc/plugins/cpp_styleguide - cpp_styleguide -) - -add_library(library-cpp-microbdb) -target_link_libraries(library-cpp-microbdb PUBLIC - contrib-libs-linux-headers - contrib-libs-cxxsupp - yutil - contrib-libs-fastlz - contrib-libs-libc_compat - contrib-libs-protobuf - contrib-libs-snappy - ZLIB::ZLIB - cpp-deprecated-fgood - cpp-on_disk-st_hash - library-cpp-packedtypes -) -target_proto_messages(library-cpp-microbdb PRIVATE - ${CMAKE_SOURCE_DIR}/library/cpp/microbdb/noextinfo.proto -) -target_sources(library-cpp-microbdb PRIVATE - ${CMAKE_SOURCE_DIR}/library/cpp/microbdb/file.cpp - ${CMAKE_SOURCE_DIR}/library/cpp/microbdb/header.cpp - ${CMAKE_SOURCE_DIR}/library/cpp/microbdb/microbdb.cpp -) -target_proto_addincls(library-cpp-microbdb - ./ - ${CMAKE_SOURCE_DIR}/ - ${CMAKE_BINARY_DIR} - ${CMAKE_SOURCE_DIR} - ${CMAKE_SOURCE_DIR}/contrib/libs/protobuf/src - ${CMAKE_BINARY_DIR} - ${CMAKE_SOURCE_DIR}/contrib/libs/protobuf/src -) -target_proto_outs(library-cpp-microbdb - --cpp_out=${CMAKE_BINARY_DIR}/ - --cpp_styleguide_out=${CMAKE_BINARY_DIR}/ -) diff --git a/library/cpp/microbdb/CMakeLists.linux-x86_64.txt b/library/cpp/microbdb/CMakeLists.linux-x86_64.txt deleted file mode 100644 index 302dbd03cdc..00000000000 --- a/library/cpp/microbdb/CMakeLists.linux-x86_64.txt +++ /dev/null @@ -1,57 +0,0 @@ - -# This file was generated by the build system used internally in the Yandex monorepo. -# Only simple modifications are allowed (adding source-files to targets, adding simple properties -# like target_include_directories). These modifications will be ported to original -# ya.make files by maintainers. Any complex modifications which can't be ported back to the -# original buildsystem will not be accepted. - - -find_package(ZLIB REQUIRED) -get_built_tool_path( - TOOL_protoc_bin - TOOL_protoc_dependency - contrib/tools/protoc/bin - protoc -) -get_built_tool_path( - TOOL_cpp_styleguide_bin - TOOL_cpp_styleguide_dependency - contrib/tools/protoc/plugins/cpp_styleguide - cpp_styleguide -) - -add_library(library-cpp-microbdb) -target_link_libraries(library-cpp-microbdb PUBLIC - contrib-libs-linux-headers - contrib-libs-cxxsupp - yutil - contrib-libs-fastlz - contrib-libs-libc_compat - contrib-libs-protobuf - contrib-libs-snappy - ZLIB::ZLIB - cpp-deprecated-fgood - cpp-on_disk-st_hash - library-cpp-packedtypes -) -target_proto_messages(library-cpp-microbdb PRIVATE - ${CMAKE_SOURCE_DIR}/library/cpp/microbdb/noextinfo.proto -) -target_sources(library-cpp-microbdb PRIVATE - ${CMAKE_SOURCE_DIR}/library/cpp/microbdb/file.cpp - ${CMAKE_SOURCE_DIR}/library/cpp/microbdb/header.cpp - ${CMAKE_SOURCE_DIR}/library/cpp/microbdb/microbdb.cpp -) -target_proto_addincls(library-cpp-microbdb - ./ - ${CMAKE_SOURCE_DIR}/ - ${CMAKE_BINARY_DIR} - ${CMAKE_SOURCE_DIR} - ${CMAKE_SOURCE_DIR}/contrib/libs/protobuf/src - ${CMAKE_BINARY_DIR} - ${CMAKE_SOURCE_DIR}/contrib/libs/protobuf/src -) -target_proto_outs(library-cpp-microbdb - --cpp_out=${CMAKE_BINARY_DIR}/ - --cpp_styleguide_out=${CMAKE_BINARY_DIR}/ -) diff --git a/library/cpp/microbdb/CMakeLists.txt b/library/cpp/microbdb/CMakeLists.txt deleted file mode 100644 index f8b31df0c11..00000000000 --- a/library/cpp/microbdb/CMakeLists.txt +++ /dev/null @@ -1,17 +0,0 @@ - -# This file was generated by the build system used internally in the Yandex monorepo. -# Only simple modifications are allowed (adding source-files to targets, adding simple properties -# like target_include_directories). These modifications will be ported to original -# ya.make files by maintainers. Any complex modifications which can't be ported back to the -# original buildsystem will not be accepted. - - -if (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND NOT HAVE_CUDA) - include(CMakeLists.linux-aarch64.txt) -elseif (CMAKE_SYSTEM_NAME STREQUAL "Darwin" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64") - include(CMakeLists.darwin-x86_64.txt) -elseif (WIN32 AND CMAKE_SYSTEM_PROCESSOR STREQUAL "AMD64" AND NOT HAVE_CUDA) - include(CMakeLists.windows-x86_64.txt) -elseif (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND NOT HAVE_CUDA) - include(CMakeLists.linux-x86_64.txt) -endif() diff --git a/library/cpp/microbdb/CMakeLists.windows-x86_64.txt b/library/cpp/microbdb/CMakeLists.windows-x86_64.txt deleted file mode 100644 index c4d2e9d3a41..00000000000 --- a/library/cpp/microbdb/CMakeLists.windows-x86_64.txt +++ /dev/null @@ -1,56 +0,0 @@ - -# This file was generated by the build system used internally in the Yandex monorepo. -# Only simple modifications are allowed (adding source-files to targets, adding simple properties -# like target_include_directories). These modifications will be ported to original -# ya.make files by maintainers. Any complex modifications which can't be ported back to the -# original buildsystem will not be accepted. - - -find_package(ZLIB REQUIRED) -get_built_tool_path( - TOOL_protoc_bin - TOOL_protoc_dependency - contrib/tools/protoc/bin - protoc -) -get_built_tool_path( - TOOL_cpp_styleguide_bin - TOOL_cpp_styleguide_dependency - contrib/tools/protoc/plugins/cpp_styleguide - cpp_styleguide -) - -add_library(library-cpp-microbdb) -target_link_libraries(library-cpp-microbdb PUBLIC - contrib-libs-cxxsupp - yutil - contrib-libs-fastlz - contrib-libs-libc_compat - contrib-libs-protobuf - contrib-libs-snappy - ZLIB::ZLIB - cpp-deprecated-fgood - cpp-on_disk-st_hash - library-cpp-packedtypes -) -target_proto_messages(library-cpp-microbdb PRIVATE - ${CMAKE_SOURCE_DIR}/library/cpp/microbdb/noextinfo.proto -) -target_sources(library-cpp-microbdb PRIVATE - ${CMAKE_SOURCE_DIR}/library/cpp/microbdb/file.cpp - ${CMAKE_SOURCE_DIR}/library/cpp/microbdb/header.cpp - ${CMAKE_SOURCE_DIR}/library/cpp/microbdb/microbdb.cpp -) -target_proto_addincls(library-cpp-microbdb - ./ - ${CMAKE_SOURCE_DIR}/ - ${CMAKE_BINARY_DIR} - ${CMAKE_SOURCE_DIR} - ${CMAKE_SOURCE_DIR}/contrib/libs/protobuf/src - ${CMAKE_BINARY_DIR} - ${CMAKE_SOURCE_DIR}/contrib/libs/protobuf/src -) -target_proto_outs(library-cpp-microbdb - --cpp_out=${CMAKE_BINARY_DIR}/ - --cpp_styleguide_out=${CMAKE_BINARY_DIR}/ -) diff --git a/library/cpp/microbdb/align.h b/library/cpp/microbdb/align.h deleted file mode 100644 index 2f8567f1344..00000000000 --- a/library/cpp/microbdb/align.h +++ /dev/null @@ -1,17 +0,0 @@ -#pragma once - -#include <util/system/defaults.h> - -using TDatAlign = int; - -static inline size_t DatFloor(size_t size) { - return (size - 1) & ~(sizeof(TDatAlign) - 1); -} - -static inline size_t DatCeil(size_t size) { - return DatFloor(size) + sizeof(TDatAlign); -} - -static inline void DatSet(void* ptr, size_t size) { - *(TDatAlign*)((char*)ptr + DatFloor(size)) = 0; -} diff --git a/library/cpp/microbdb/compressed.h b/library/cpp/microbdb/compressed.h deleted file mode 100644 index f0c9edfa925..00000000000 --- a/library/cpp/microbdb/compressed.h +++ /dev/null @@ -1,520 +0,0 @@ -#pragma once - -#include <util/stream/zlib.h> - -#include "microbdb.h" -#include "safeopen.h" - -class TCompressedInputFileManip: public TInputFileManip { -public: - inline i64 GetLength() const { - return -1; // Some microbdb logic rely on unknown size of compressed files - } - - inline i64 Seek(i64 offset, int whence) { - i64 oldPos = DoGetPosition(); - i64 newPos = offset; - switch (whence) { - case SEEK_CUR: - newPos += oldPos; - [[fallthrough]]; // Complier happy. Please fix it! - case SEEK_SET: - break; - default: - return -1L; - } - if (oldPos > newPos) { - VerifyRandomAccess(); - DoSeek(0, SEEK_SET, IsStreamOpen()); - oldPos = 0; - } - const size_t bufsize = 1 << 12; - char buf[bufsize]; - for (i64 i = oldPos; i < newPos; i += bufsize) - InputStream->Read(buf, (i + (i64)bufsize < newPos) ? bufsize : (size_t)(newPos - i)); - return newPos; - } - - i64 RealSeek(i64 offset, int whence) { - InputStream.Destroy(); - i64 ret = DoSeek(offset, whence, !!CompressedInput); - if (ret != -1) - DoStreamOpen(DoCreateStream(), true); - return ret; - } - -protected: - IInputStream* CreateStream(const TFile& file) override { - CompressedInput.Reset(new TUnbufferedFileInput(file)); - return DoCreateStream(); - } - inline IInputStream* DoCreateStream() { - return new TZLibDecompress(CompressedInput.Get(), ZLib::GZip); - //return new TLzqDecompress(CompressedInput.Get()); - } - THolder<IInputStream> CompressedInput; -}; - -class TCompressedBufferedInputFileManip: public TCompressedInputFileManip { -protected: - IInputStream* CreateStream(const TFile& file) override { - CompressedInput.Reset(new TFileInput(file, 0x100000)); - return DoCreateStream(); - } -}; - -using TCompressedInputPageFile = TInputPageFileImpl<TCompressedInputFileManip>; -using TCompressedBufferedInputPageFile = TInputPageFileImpl<TCompressedBufferedInputFileManip>; - -template <class TVal> -struct TGzKey { - ui64 Offset; - TVal Key; - - static const ui32 RecordSig = TVal::RecordSig + 0x50495a47; - - TGzKey() { - } - - TGzKey(ui64 offset, const TVal& key) - : Offset(offset) - , Key(key) - { - } - - size_t SizeOf() const { - if (this) - return sizeof(Offset) + ::SizeOf(&Key); - else { - size_t sizeOfKey = ::SizeOf((TVal*)NULL); - return sizeOfKey ? (sizeof(Offset) + sizeOfKey) : 0; - } - } -}; - -template <class TVal> -class TInZIndexFile: protected TInDatFileImpl<TGzKey<TVal>> { - typedef TInDatFileImpl<TGzKey<TVal>> TDatFile; - typedef TGzKey<TVal> TGzVal; - typedef typename TDatFile::TRecIter TRecIter; - typedef typename TRecIter::TPageIter TPageIter; - -public: - TInZIndexFile() - : Index0(nullptr) - { - } - - int Open(const char* fname, size_t pages = 1, int pagesOrBytes = 1, ui32* gotRecordSig = nullptr) { - int ret = TDatFile::Open(fname, pages, pagesOrBytes, gotRecordSig); - if (ret) - return ret; - if (!(Index0 = (TDatPage*)malloc(TPageIter::GetPageSize()))) { - TDatFile::Close(); - return MBDB_NO_MEMORY; - } - if (SizeOf((TGzVal*)NULL)) - RecsOnPage = (TPageIter::GetPageSize() - sizeof(TDatPage)) / DatCeil(SizeOf((TGzVal*)NULL)); - TDatFile::Next(); - memcpy(Index0, TPageIter::Current(), TPageIter::GetPageSize()); - return 0; - } - - int Close() { - free(Index0); - Index0 = NULL; - return TDatFile::Close(); - } - - inline int GetError() const { - return TDatFile::GetError(); - } - - int FindKey(const TVal* akey, const typename TExtInfoType<TVal>::TResult* = NULL) { - assert(IsOpen()); - if (!SizeOf((TVal*)NULL)) - return FindVszKey(akey); - int pageno; - i64 offset; - FindKeyOnPage(pageno, offset, Index0, akey); - TDatPage* page = TPageIter::GotoPage(pageno + 1); - int num_add = (int)offset; - FindKeyOnPage(pageno, offset, page, akey); - return pageno + num_add; - } - - using TDatFile::IsOpen; - - int FindVszKey(const TVal* akey, const typename TExtInfoType<TVal>::TResult* = NULL) { - int pageno; - i64 offset; - FindVszKeyOnPage(pageno, offset, Index0, akey); - TDatPage* page = TPageIter::GotoPage(pageno + 1); - int num_add = (int)offset; - FindVszKeyOnPage(pageno, offset, page, akey); - return pageno + num_add; - } - - i64 FindPage(int pageno) { - if (!SizeOf((TVal*)NULL)) - return FindVszPage(pageno); - int recsize = DatCeil(SizeOf((TGzVal*)NULL)); - TDatPage* page = TPageIter::GotoPage(1 + pageno / RecsOnPage); - if (!page) // can happen if pageno is beyond EOF - return -1; - unsigned int localpageno = pageno % RecsOnPage; - if (localpageno >= page->RecNum) // can happen if pageno is beyond EOF - return -1; - TGzVal* v = (TGzVal*)((char*)page + sizeof(TDatPage) + localpageno * recsize); - return v->Offset; - } - - i64 FindVszPage(int pageno) { - TGzVal* cur = (TGzVal*)((char*)Index0 + sizeof(TDatPage)); - TGzVal* prev = cur; - unsigned int n = 0; - while (n < Index0->RecNum && cur->Offset <= (unsigned int)pageno) { - prev = cur; - cur = (TGzVal*)((char*)cur + DatCeil(SizeOf(cur))); - n++; - } - TDatPage* page = TPageIter::GotoPage(n); - unsigned int num_add = (unsigned int)(prev->Offset); - n = 0; - cur = (TGzVal*)((char*)page + sizeof(TDatPage)); - while (n < page->RecNum && n + num_add < (unsigned int)pageno) { - cur = (TGzVal*)((char*)cur + DatCeil(SizeOf(cur))); - n++; - } - if (n == page->RecNum) // can happen if pageno is beyond EOF - return -1; - return cur->Offset; - } - -protected: - void FindKeyOnPage(int& pageno, i64& offset, TDatPage* page, const TVal* Key) { - int left = 0; - int right = page->RecNum - 1; - int recsize = DatCeil(SizeOf((TGzVal*)NULL)); - while (left < right) { - int middle = (left + right) >> 1; - if (((TGzVal*)((char*)page + sizeof(TDatPage) + middle * recsize))->Key < *Key) - left = middle + 1; - else - right = middle; - } - //borders check (left and right) - pageno = (left == 0 || ((TGzVal*)((char*)page + sizeof(TDatPage) + left * recsize))->Key < *Key) ? left : left - 1; - offset = ((TGzVal*)((char*)page + sizeof(TDatPage) + pageno * recsize))->Offset; - } - - void FindVszKeyOnPage(int& pageno, i64& offset, TDatPage* page, const TVal* key) { - TGzVal* cur = (TGzVal*)((char*)page + sizeof(TDatPage)); - ui32 RecordSig = page->RecNum; - i64 tmpoffset = cur->Offset; - for (; RecordSig > 0 && cur->Key < *key; --RecordSig) { - tmpoffset = cur->Offset; - cur = (TGzVal*)((char*)cur + DatCeil(SizeOf(cur))); - } - int idx = page->RecNum - RecordSig - 1; - pageno = (idx >= 0) ? idx : 0; - offset = tmpoffset; - } - - TDatPage* Index0; - int RecsOnPage; -}; - -template <class TKey> -class TCompressedIndexedInputPageFile: public TCompressedInputPageFile { -public: - int GotoPage(int pageno); - -protected: - TInZIndexFile<TKey> KeyFile; -}; - -template <class TVal, class TKey> -class TDirectCompressedInDatFile: public TDirectInDatFile<TVal, TKey, - TInDatFileImpl<TVal, TInputRecordIterator<TVal, - TInputPageIterator<TCompressedIndexedInputPageFile<TKey>>>>> { -}; - -class TCompressedOutputFileManip: public TOutputFileManip { -public: - inline i64 GetLength() const { - return -1; // Some microbdb logic rely on unknown size of compressed files - } - - inline i64 Seek(i64 offset, int whence) { - i64 oldPos = DoGetPosition(); - i64 newPos = offset; - switch (whence) { - case SEEK_CUR: - newPos += oldPos; - [[fallthrough]]; // Compler happy. Please fix it! - case SEEK_SET: - break; - default: - return -1L; - } - if (oldPos > newPos) - return -1L; - - const size_t bufsize = 1 << 12; - char buf[bufsize] = {0}; - for (i64 i = oldPos; i < newPos; i += bufsize) - OutputStream->Write(buf, (i + (i64)bufsize < newPos) ? bufsize : (size_t)(newPos - i)); - return newPos; - } - - i64 RealSeek(i64 offset, int whence) { - OutputStream.Destroy(); - i64 ret = DoSeek(offset, whence, !!CompressedOutput); - if (ret != -1) - DoStreamOpen(DoCreateStream(), true); - return ret; - } - -protected: - IOutputStream* CreateStream(const TFile& file) override { - CompressedOutput.Reset(new TUnbufferedFileOutput(file)); - return DoCreateStream(); - } - inline IOutputStream* DoCreateStream() { - return new TZLibCompress(CompressedOutput.Get(), ZLib::GZip, 1); - } - THolder<IOutputStream> CompressedOutput; -}; - -class TCompressedBufferedOutputFileManip: public TCompressedOutputFileManip { -protected: - IOutputStream* CreateStream(const TFile& file) override { - CompressedOutput.Reset(new TUnbufferedFileOutput(file)); - return DoCreateStream(); - } - inline IOutputStream* DoCreateStream() { - return new TZLibCompress(CompressedOutput.Get(), ZLib::GZip, 1, 0x100000); - } -}; - -using TCompressedOutputPageFile = TOutputPageFileImpl<TCompressedOutputFileManip>; -using TCompressedBufferedOutputPageFile = TOutputPageFileImpl<TCompressedBufferedOutputFileManip>; - -template <class TVal> -class TOutZIndexFile: public TOutDatFileImpl< - TGzKey<TVal>, - TOutputRecordIterator<TGzKey<TVal>, TOutputPageIterator<TOutputPageFile>, TCallbackIndexer>> { - typedef TOutDatFileImpl< - TGzKey<TVal>, - TOutputRecordIterator<TGzKey<TVal>, TOutputPageIterator<TOutputPageFile>, TCallbackIndexer>> - TDatFile; - typedef TOutZIndexFile<TVal> TMyType; - typedef TGzKey<TVal> TGzVal; - typedef typename TDatFile::TRecIter TRecIter; - typedef typename TRecIter::TPageIter TPageIter; - typedef typename TRecIter::TIndexer TIndexer; - -public: - TOutZIndexFile() { - TotalRecNum = 0; - TIndexer::SetCallback(this, DispatchCallback); - } - - int Open(const char* fname, size_t pagesize, size_t pages, int pagesOrBytes = 1) { - int ret = TDatFile::Open(fname, pagesize, pages, pagesOrBytes); - if (ret) - return ret; - if ((ret = TRecIter::GotoPage(1))) - TDatFile::Close(); - return ret; - } - - int Close() { - TPageIter::Unfreeze(); - if (TRecIter::RecNum) - NextPage(TPageIter::Current()); - int ret = 0; - if (Index0.size() && !(ret = TRecIter::GotoPage(0))) { - typename std::vector<TGzVal>::iterator it, end = Index0.end(); - for (it = Index0.begin(); it != end; ++it) - TRecIter::Push(&*it); - ret = (TPageIter::GetPageNum() != 0) ? MBDB_PAGE_OVERFLOW : TPageIter::GetError(); - } - Index0.clear(); - int ret1 = TDatFile::Close(); - return ret ? ret : ret1; - } - -protected: - int TotalRecNum; // should be enough because we have GotoPage(int) - std::vector<TGzVal> Index0; - - void NextPage(const TDatPage* page) { - TGzVal* rec = (TGzVal*)((char*)page + sizeof(TDatPage)); - Index0.push_back(TGzVal(TotalRecNum, rec->Key)); - TotalRecNum += TRecIter::RecNum; - } - - static void DispatchCallback(void* This, const TDatPage* page) { - ((TMyType*)This)->NextPage(page); - } -}; - -template <class TVal, class TKey, class TPageFile = TCompressedOutputPageFile> -class TOutDirectCompressedFileImpl: public TOutDatFileImpl< - TVal, - TOutputRecordIterator<TVal, TOutputPageIterator<TPageFile>, TCallbackIndexer>> { - typedef TOutDatFileImpl< - TVal, - TOutputRecordIterator<TVal, TOutputPageIterator<TPageFile>, TCallbackIndexer>> - TDatFile; - typedef TOutDirectCompressedFileImpl<TVal, TKey, TPageFile> TMyType; - typedef typename TDatFile::TRecIter TRecIter; - typedef typename TRecIter::TPageIter TPageIter; - typedef typename TRecIter::TIndexer TIndexer; - typedef TGzKey<TKey> TMyKey; - typedef TOutZIndexFile<TKey> TKeyFile; - -protected: - using TDatFile::Tell; - -public: - TOutDirectCompressedFileImpl() { - TIndexer::SetCallback(this, DispatchCallback); - } - - int Open(const char* fname, size_t pagesize, size_t ipagesize = 0) { - char iname[FILENAME_MAX]; - int ret; - if (ipagesize == 0) - ipagesize = pagesize; - - ret = TDatFile::Open(fname, pagesize, 1, 1); - ret = ret ? ret : DatNameToIdx(iname, fname); - ret = ret ? ret : KeyFile.Open(iname, ipagesize, 1, 1); - if (ret) - TDatFile::Close(); - return ret; - } - - int Close() { - if (TRecIter::RecNum) - NextPage(TPageIter::Current()); - int ret = KeyFile.Close(); - int ret1 = TDatFile::Close(); - return ret1 ? ret1 : ret; - } - - int GetError() const { - return TDatFile::GetError() ? TDatFile::GetError() : KeyFile.GetError(); - } - -protected: - TKeyFile KeyFile; - - void NextPage(const TDatPage* page) { - size_t sz = SizeOf((TMyKey*)NULL); - TMyKey* rec = KeyFile.Reserve(sz ? sz : MaxSizeOf<TMyKey>()); - if (rec) { - rec->Offset = Tell(); - rec->Key = *(TVal*)((char*)page + sizeof(TDatPage)); - KeyFile.ResetDat(); - } - } - - static void DispatchCallback(void* This, const TDatPage* page) { - ((TMyType*)This)->NextPage(page); - } -}; - -template <class TKey> -int TCompressedIndexedInputPageFile<TKey>::GotoPage(int pageno) { - if (Error) - return Error; - - Eof = 0; - - i64 offset = KeyFile.FindPage(pageno); - if (!offset) - return Error = MBDB_BAD_FILE_SIZE; - - if (offset != FileManip.RealSeek(offset, SEEK_SET)) - Error = MBDB_BAD_FILE_SIZE; - - return Error; -} - -template <typename TVal> -class TCompressedInDatFile: public TInDatFile<TVal, TCompressedInputPageFile> { -public: - TCompressedInDatFile(const char* name, size_t pages, int pagesOrBytes = 1) - : TInDatFile<TVal, TCompressedInputPageFile>(name, pages, pagesOrBytes) - { - } -}; - -template <typename TVal> -class TCompressedOutDatFile: public TOutDatFile<TVal, TFakeCompression, TCompressedOutputPageFile> { -public: - TCompressedOutDatFile(const char* name, size_t pagesize, size_t pages, int pagesOrBytes = 1) - : TOutDatFile<TVal, TFakeCompression, TCompressedOutputPageFile>(name, pagesize, pages, pagesOrBytes) - { - } -}; - -template <typename TVal, typename TKey, typename TPageFile = TCompressedOutputPageFile> -class TOutDirectCompressedFile: protected TOutDirectCompressedFileImpl<TVal, TKey, TPageFile> { - typedef TOutDirectCompressedFileImpl<TVal, TKey, TPageFile> TBase; - -public: - TOutDirectCompressedFile(const char* name, size_t pagesize, size_t ipagesize = 0) - : Name(strdup(name)) - , PageSize(pagesize) - , IdxPageSize(ipagesize) - { - } - - ~TOutDirectCompressedFile() { - Close(); - free(Name); - Name = NULL; - } - - void Open(const char* fname) { - int ret = TBase::Open(fname, PageSize, IdxPageSize); - if (ret) - ythrow yexception() << ErrorMessage(ret, "Failed to open output file", fname); - free(Name); - Name = strdup(fname); - } - - void Close() { - int ret; - if ((ret = TBase::GetError())) - if (!std::uncaught_exception()) - ythrow yexception() << ErrorMessage(ret, "Error before closing output file", Name); - if ((ret = TBase::Close())) - if (!std::uncaught_exception()) - ythrow yexception() << ErrorMessage(ret, "Error while closing output file", Name); - } - - const char* GetName() const { - return Name; - } - - using TBase::Freeze; - using TBase::Push; - using TBase::Reserve; - using TBase::Unfreeze; - -protected: - char* Name; - size_t PageSize, IdxPageSize; -}; - -class TCompressedInterFileTypes { -public: - typedef TCompressedBufferedOutputPageFile TOutPageFile; - typedef TCompressedBufferedInputPageFile TInPageFile; -}; diff --git a/library/cpp/microbdb/extinfo.h b/library/cpp/microbdb/extinfo.h deleted file mode 100644 index c8389e783cd..00000000000 --- a/library/cpp/microbdb/extinfo.h +++ /dev/null @@ -1,127 +0,0 @@ -#pragma once - -#include "header.h" - -#include <library/cpp/packedtypes/longs.h> - -#include <util/generic/typetraits.h> - -#include <library/cpp/microbdb/noextinfo.pb.h> - -inline bool operator<(const TNoExtInfo&, const TNoExtInfo&) { - return false; -} - -namespace NMicroBDB { - Y_HAS_MEMBER(TExtInfo); - - template <class, bool> - struct TSelectExtInfo; - - template <class T> - struct TSelectExtInfo<T, false> { - typedef TNoExtInfo TExtInfo; - }; - - template <class T> - struct TSelectExtInfo<T, true> { - typedef typename T::TExtInfo TExtInfo; - }; - - template <class T> - class TExtInfoType { - public: - static const bool Exists = THasTExtInfo<T>::value; - typedef typename TSelectExtInfo<T, Exists>::TExtInfo TResult; - }; - - Y_HAS_MEMBER(MakeExtKey); - - template <class, class, bool> - struct TSelectMakeExtKey; - - template <class TVal, class TKey> - struct TSelectMakeExtKey<TVal, TKey, false> { - static inline void Make(TKey* to, typename TExtInfoType<TKey>::TResult*, const TVal* from, const typename TExtInfoType<TVal>::TResult*) { - *to = *from; - } - }; - - template <class TVal, class TKey> - struct TSelectMakeExtKey<TVal, TKey, true> { - static inline void Make(TKey* to, typename TExtInfoType<TKey>::TResult* toExt, const TVal* from, const typename TExtInfoType<TVal>::TResult* fromExt) { - TVal::MakeExtKey(to, toExt, from, fromExt); - } - }; - - template <typename T> - inline size_t SizeOfExt(const T* rec, size_t* /*out*/ extLenSize = nullptr, size_t* /*out*/ extSize = nullptr) { - if (!TExtInfoType<T>::Exists) { - if (extLenSize) - *extLenSize = 0; - if (extSize) - *extSize = 0; - return SizeOf(rec); - } else { - size_t sz = SizeOf(rec); - i64 l; - int els = in_long(l, (const char*)rec + sz); - if (extLenSize) - *extLenSize = static_cast<size_t>(els); - if (extSize) - *extSize = static_cast<size_t>(l); - return sz; - } - } - - template <class T> - bool GetExtInfo(const T* rec, typename TExtInfoType<T>::TResult* extInfo) { - Y_VERIFY(TExtInfoType<T>::Exists, "GetExtInfo should only be used with extended records"); - if (!rec) - return false; - size_t els; - size_t es; - size_t s = SizeOfExt(rec, &els, &es); - const ui8* raw = (const ui8*)rec + s + els; - return extInfo->ParseFromArray(raw, es); - } - - template <class T> - const ui8* GetExtInfoRaw(const T* rec, size_t* len) { - Y_VERIFY(TExtInfoType<T>::Exists, "GetExtInfo should only be used with extended records"); - if (!rec) { - *len = 0; - return nullptr; - } - size_t els; - size_t es; - size_t s = SizeOfExt(rec, &els, &es); - *len = els + es; - return (const ui8*)rec + s; - } - - // Compares serialized extInfo (e.g. for stable sort) - template <class T> - int CompareExtInfo(const T* a, const T* b) { - Y_VERIFY(TExtInfoType<T>::Exists, "CompareExtInfo should only be used with extended records"); - size_t elsA, esA; - size_t elsB, esB; - SizeOfExt(a, &elsA, &esA); - SizeOfExt(a, &elsB, &esB); - if (esA != esB) - return esA - esB; - else - return memcmp((const ui8*)a + elsA, (const ui8*)b + elsB, esA); - } - -} - -using NMicroBDB::TExtInfoType; - -template <class TVal, class TKey> -struct TMakeExtKey { - static const bool Exists = NMicroBDB::THasMakeExtKey<TVal>::value; - static inline void Make(TKey* to, typename TExtInfoType<TKey>::TResult* toExt, const TVal* from, const typename TExtInfoType<TVal>::TResult* fromExt) { - NMicroBDB::TSelectMakeExtKey<TVal, TKey, Exists>::Make(to, toExt, from, fromExt); - } -}; diff --git a/library/cpp/microbdb/file.cpp b/library/cpp/microbdb/file.cpp deleted file mode 100644 index 599a7301a0f..00000000000 --- a/library/cpp/microbdb/file.cpp +++ /dev/null @@ -1,220 +0,0 @@ -#include "file.h" - -#include <fcntl.h> -#include <errno.h> -#include <sys/stat.h> - -#ifdef _win32_ -#define S_ISREG(x) !!(x & S_IFREG) -#endif - -TFileManipBase::TFileManipBase() - : FileBased(true) -{ -} - -i64 TFileManipBase::DoSeek(i64 offset, int whence, bool isStreamOpen) { - if (!isStreamOpen) - return -1; - VerifyRandomAccess(); - return File.Seek(offset, (SeekDir)whence); -} - -int TFileManipBase::DoFileOpen(const TFile& file) { - File = file; - SetFileBased(IsFileBased()); - return (File.IsOpen()) ? 0 : MBDB_OPEN_ERROR; -} - -int TFileManipBase::DoFileClose() { - if (File.IsOpen()) { - File.Close(); - return MBDB_ALREADY_INITIALIZED; - } - return 0; -} - -int TFileManipBase::IsFileBased() const { - bool fileBased = true; -#if defined(_win_) -#elif defined(_unix_) - FHANDLE h = File.GetHandle(); - struct stat sb; - fileBased = false; - if (h != INVALID_FHANDLE && !::fstat(h, &sb) && S_ISREG(sb.st_mode)) { - fileBased = true; - } -#else -#error -#endif - return fileBased; -} - -TInputFileManip::TInputFileManip() - : InputStream(nullptr) -{ -} - -int TInputFileManip::Open(const char* fname, bool direct) { - int ret; - return (ret = DoClose()) ? ret : DoStreamOpen(TFile(fname, RdOnly | (direct ? DirectAligned : EOpenMode()))); -} - -int TInputFileManip::Open(IInputStream& input) { - int ret; - return (ret = DoClose()) ? ret : DoStreamOpen(&input); -} - -int TInputFileManip::Open(TAutoPtr<IInputStream> input) { - int ret; - return (ret = DoClose()) ? ret : DoStreamOpen(input.Release()); -} - -int TInputFileManip::Init(const TFile& file) { - int ret; - if (ret = DoClose()) - return ret; - DoStreamOpen(file); - return 0; -} - -int TInputFileManip::Close() { - DoClose(); - return 0; -} - -ssize_t TInputFileManip::Read(void* buf, unsigned len) { - if (!IsStreamOpen()) - return -1; - return InputStream->Load(buf, len); -} - -IInputStream* TInputFileManip::CreateStream(const TFile& file) { - return new TUnbufferedFileInput(file); -} - -TMappedInputPageFile::TMappedInputPageFile() - : Pagesize(0) - , Error(0) - , Pagenum(0) - , Recordsig(0) - , Open(false) -{ - Term(); -} - -TMappedInputPageFile::~TMappedInputPageFile() { - Term(); -} - -int TMappedInputPageFile::Init(const char* fname, ui32 recsig, ui32* gotRecordSig, bool) { - Mappedfile.init(fname); - Open = true; - - TDatMetaPage* meta = (TDatMetaPage*)Mappedfile.getData(); - if (gotRecordSig) - *gotRecordSig = meta->RecordSig; - - if (meta->MetaSig != METASIG) - Error = MBDB_BAD_METAPAGE; - else if (meta->RecordSig != recsig) - Error = MBDB_BAD_RECORDSIG; - - if (Error) { - Mappedfile.term(); - return Error; - } - - size_t fsize = Mappedfile.getSize(); - if (fsize < METASIZE) - return Error = MBDB_BAD_FILE_SIZE; - fsize -= METASIZE; - if (fsize % meta->PageSize) - return Error = MBDB_BAD_FILE_SIZE; - Pagenum = (int)(fsize / meta->PageSize); - Pagesize = meta->PageSize; - Recordsig = meta->RecordSig; - Error = 0; - return Error; -} - -int TMappedInputPageFile::Term() { - Mappedfile.term(); - Open = false; - return 0; -} - -TOutputFileManip::TOutputFileManip() - : OutputStream(nullptr) -{ -} - -int TOutputFileManip::Open(const char* fname, EOpenMode mode) { - if (IsStreamOpen()) { - return MBDB_ALREADY_INITIALIZED; // should it be closed as TInputFileManip - } - - try { - if (unlink(fname) && errno != ENOENT) { - if (strncmp(fname, "/dev/std", 8)) - return MBDB_OPEN_ERROR; - } - TFile file(fname, mode); - DoStreamOpen(file); - } catch (const TFileError&) { - return MBDB_OPEN_ERROR; - } - return 0; -} - -int TOutputFileManip::Open(IOutputStream& output) { - if (IsStreamOpen()) - return MBDB_ALREADY_INITIALIZED; - DoStreamOpen(&output); - return 0; -} - -int TOutputFileManip::Open(TAutoPtr<IOutputStream> output) { - if (IsStreamOpen()) - return MBDB_ALREADY_INITIALIZED; - DoStreamOpen(output.Release()); - return 0; -} - -int TOutputFileManip::Init(const TFile& file) { - if (IsStreamOpen()) - return MBDB_ALREADY_INITIALIZED; // should it be closed as TInputFileManip - DoStreamOpen(file); - return 0; -} - -int TOutputFileManip::Rotate(const char* newfname) { - if (!IsStreamOpen()) { - return MBDB_NOT_INITIALIZED; - } - - try { - TFile file(newfname, WrOnly | OpenAlways | TruncExisting | ARW | AWOther); - DoClose(); - DoStreamOpen(file); - } catch (const TFileError&) { - return MBDB_OPEN_ERROR; - } - return 0; -} - -int TOutputFileManip::Close() { - DoClose(); - return 0; -} - -int TOutputFileManip::Write(const void* buf, unsigned len) { - if (!IsStreamOpen()) - return -1; - OutputStream->Write(buf, len); - return len; -} - -IOutputStream* TOutputFileManip::CreateStream(const TFile& file) { - return new TUnbufferedFileOutput(file); -} diff --git a/library/cpp/microbdb/file.h b/library/cpp/microbdb/file.h deleted file mode 100644 index f7c78183759..00000000000 --- a/library/cpp/microbdb/file.h +++ /dev/null @@ -1,225 +0,0 @@ -#pragma once - -#include "header.h" - -#include <library/cpp/deprecated/mapped_file/mapped_file.h> - -#include <util/generic/noncopyable.h> -#include <util/stream/file.h> -#include <util/system/filemap.h> - -#define FS_BLOCK_SIZE 512 - -class TFileManipBase { -protected: - TFileManipBase(); - - virtual ~TFileManipBase() { - } - - i64 DoSeek(i64 offset, int whence, bool isStreamOpen); - - int DoFileOpen(const TFile& file); - - int DoFileClose(); - - int IsFileBased() const; - - inline void SetFileBased(bool fileBased) { - FileBased = fileBased; - } - - inline i64 DoGetPosition() const { - Y_ASSERT(FileBased); - return File.GetPosition(); - } - - inline i64 DoGetLength() const { - return (FileBased) ? File.GetLength() : -1; - } - - inline void VerifyRandomAccess() const { - Y_VERIFY(FileBased, "non-file stream can not be accessed randomly"); - } - - inline i64 GetPosition() const { - return (i64)File.GetPosition(); - } - -private: - TFile File; - bool FileBased; -}; - -class TInputFileManip: public TFileManipBase { -public: - using TFileManipBase::GetPosition; - - TInputFileManip(); - - int Open(const char* fname, bool direct = false); - - int Open(IInputStream& input); - - int Open(TAutoPtr<IInputStream> input); - - int Init(const TFile& file); - - int Close(); - - ssize_t Read(void* buf, unsigned len); - - inline bool IsOpen() const { - return IsStreamOpen(); - } - - inline i64 GetLength() const { - return DoGetLength(); - } - - inline i64 Seek(i64 offset, int whence) { - return DoSeek(offset, whence, IsStreamOpen()); - } - - inline i64 RealSeek(i64 offset, int whence) { - return Seek(offset, whence); - } - -protected: - inline bool IsStreamOpen() const { - return !!InputStream; - } - - inline int DoStreamOpen(IInputStream* input, bool fileBased = false) { - InputStream.Reset(input); - SetFileBased(fileBased); - return 0; - } - - inline int DoStreamOpen(const TFile& file) { - int ret; - return (ret = DoFileOpen(file)) ? ret : DoStreamOpen(CreateStream(file), IsFileBased()); - } - - virtual IInputStream* CreateStream(const TFile& file); - - inline bool DoClose() { - if (IsStreamOpen()) { - InputStream.Destroy(); - return DoFileClose(); - } - return 0; - } - - THolder<IInputStream> InputStream; -}; - -class TMappedInputPageFile: private TNonCopyable { -public: - TMappedInputPageFile(); - - ~TMappedInputPageFile(); - - inline int GetError() const { - return Error; - } - - inline size_t GetPageSize() const { - return Pagesize; - } - - inline int GetLastPage() const { - return Pagenum; - } - - inline ui32 GetRecordSig() const { - return Recordsig; - } - - inline bool IsOpen() const { - return Open; - } - - inline char* GetData() const { - return Open ? (char*)Mappedfile.getData() : nullptr; - } - - inline size_t GetSize() const { - return Open ? Mappedfile.getSize() : 0; - } - -protected: - int Init(const char* fname, ui32 recsig, ui32* gotRecordSig = nullptr, bool direct = false); - - int Term(); - - TMappedFile Mappedfile; - size_t Pagesize; - int Error; - int Pagenum; - ui32 Recordsig; - bool Open; -}; - -class TOutputFileManip: public TFileManipBase { -public: - TOutputFileManip(); - - int Open(const char* fname, EOpenMode mode = WrOnly | CreateAlways | ARW | AWOther); - - int Open(IOutputStream& output); - - int Open(TAutoPtr<IOutputStream> output); - - int Init(const TFile& file); - - int Rotate(const char* newfname); - - int Write(const void* buf, unsigned len); - - int Close(); - - inline bool IsOpen() const { - return IsStreamOpen(); - } - - inline i64 GetLength() const { - return DoGetLength(); - } - - inline i64 Seek(i64 offset, int whence) { - return DoSeek(offset, whence, IsStreamOpen()); - } - - inline i64 RealSeek(i64 offset, int whence) { - return Seek(offset, whence); - } - -protected: - inline bool IsStreamOpen() const { - return !!OutputStream; - } - - inline int DoStreamOpen(IOutputStream* output, bool fileBased = false) { - OutputStream.Reset(output); - SetFileBased(fileBased); - return 0; - } - - inline int DoStreamOpen(const TFile& file) { - int ret; - return (ret = DoFileOpen(file)) ? ret : DoStreamOpen(CreateStream(file), true); - } - - virtual IOutputStream* CreateStream(const TFile& file); - - inline bool DoClose() { - if (IsStreamOpen()) { - OutputStream.Destroy(); - return DoFileClose(); - } - return 0; - } - - THolder<IOutputStream> OutputStream; -}; diff --git a/library/cpp/microbdb/hashes.h b/library/cpp/microbdb/hashes.h deleted file mode 100644 index bfd113c3ba4..00000000000 --- a/library/cpp/microbdb/hashes.h +++ /dev/null @@ -1,250 +0,0 @@ -#pragma once - -#include <library/cpp/on_disk/st_hash/static_hash.h> -#include <util/system/sysstat.h> -#include <util/stream/mem.h> -#include <util/string/printf.h> -#include <library/cpp/deprecated/fgood/fgood.h> - -#include "safeopen.h" - -/** This file currently implements creation of mappable read-only hash file. - Basic usage of these "static hashes" is defined in util/static_hash.h (see docs there). - Additional useful wrappers are available in util/static_hash_map.h - - There are two ways to create mappable hash file: - - A) Fill an THashMap/set structure in RAM, then dump it to disk. - This is usually done by save_hash_to_file* functions defined in static_hash.h - (see description in static_hash.h). - - B) Prepare all data using external sorter, then create hash file straight on disk. - This approach is necessary when there isn't enough RAM to hold entire original THashMap. - Implemented in this file as TStaticHashBuilder class. - - Current implementation's major drawback is that the size of the hash must be estimated - before the hash is built (bucketCount), which is not always possible. - Separate implementation with two sort passes is yet to be done. - - Another problem is that maximum stored size of the element (maxRecSize) must also be - known in advance, because we use TDatSorterMemo, etc. - */ - -template <class SizeType> -struct TSthashTmpRec { - SizeType HashVal; - SizeType RecSize; - char Buf[1]; - size_t SizeOf() const { - return &Buf[RecSize] - (char*)this; - } - bool operator<(const TSthashTmpRec& than) const { - return HashVal < than.HashVal; - } - static const ui32 RecordSig = 20100124 + sizeof(SizeType) - 4; -}; - -template <typename T> -struct TReplaceMerger { - T operator()(const T& oldRecord, const T& newRecord) const { - Y_UNUSED(oldRecord); - return newRecord; - } -}; - -/** TStaticHashBuilder template parameters: - HashType - THashMap map/set type for which we construct corresponding mappable hash; - SizeType - type used to store offsets and length in resulting hash; - MergerType - type of object to process records with equal key (see TReplaceMerger for example); - */ - -template <class HashType, class SizeType, class MergerType = TReplaceMerger<typename HashType::mapped_type>> -struct TStaticHashBuilder { - const size_t SrtIOPageSz; - const size_t WrBufSz; - typedef TSthashTmpRec<SizeType> TIoRec; - typedef TSthashWriter<typename HashType::key_type, typename HashType::mapped_type, SizeType> TKeySaver; - typedef typename HashType::value_type TValueType; - typedef typename HashType::mapped_type TMappedType; - typedef typename HashType::key_type TKeyType; - - TDatSorterMemo<TIoRec, TCompareByLess> Srt; - TBuffer IoRec, CurrentBlockRecs; - TKeySaver KeySaver; - typename HashType::hasher Hasher; - typename HashType::key_equal Equals; - MergerType merger; - TString HashFileName; - TString OurTmpDir; - size_t BucketCount; - int FreeBits; - - // memSz is the Sorter buffer size; - // maxRecSize is the maximum size (as reported by size_for_st) of our record(s) - TStaticHashBuilder(size_t memSz, size_t maxRecSize) - : SrtIOPageSz((maxRecSize * 16 + 65535) & ~size_t(65535)) - , WrBufSz(memSz / 16 >= SrtIOPageSz ? memSz / 16 : SrtIOPageSz) - , Srt("unused", memSz, SrtIOPageSz, WrBufSz, 0) - , IoRec(sizeof(TIoRec) + maxRecSize) - , CurrentBlockRecs(sizeof(TIoRec) + maxRecSize) - , BucketCount(0) - , FreeBits(0) - { - } - - ~TStaticHashBuilder() { - Close(); - } - - // if tmpDir is supplied, it must exist; - // bucketCount should be HashBucketCount() of the (estimated) element count - void Open(const char* fname, size_t bucketCount, const char* tmpDir = nullptr) { - if (!tmpDir) - tmpDir = ~(OurTmpDir = Sprintf("%s.temp", fname)); - Mkdir(tmpDir, MODE0775); - Srt.Open(tmpDir); - HashFileName = fname; - BucketCount = bucketCount; - int bitCount = 0; - while (((size_t)1 << bitCount) <= BucketCount && bitCount < int(8 * sizeof(size_t))) - ++bitCount; - FreeBits = 8 * sizeof(size_t) - bitCount; - } - - void Push(const TValueType& rec) { - TIoRec* ioRec = MakeIoRec(rec); - Srt.Push(ioRec); - } - TIoRec* MakeIoRec(const TValueType& rec) { - TIoRec* ioRec = (TIoRec*)IoRec.Data(); - size_t mask = (1 << FreeBits) - 1; - size_t hash = Hasher(rec.first); - ioRec->HashVal = ((hash % BucketCount) << FreeBits) + ((hash / BucketCount) & mask); - - TMemoryOutput output(ioRec->Buf, IoRec.Capacity() - offsetof(TIoRec, Buf)); - KeySaver.SaveRecord(&output, rec); - ioRec->RecSize = output.Buf() - ioRec->Buf; - return ioRec; - } - - bool Merge(TVector<std::pair<TKeyType, TMappedType>>& records, size_t newRecordSize) { - TSthashIterator<const TKeyType, const TMappedType, typename HashType::hasher, - typename HashType::key_equal> - newPtr(CurrentBlockRecs.End() - newRecordSize); - for (size_t i = 0; i < records.size(); ++i) { - if (newPtr.KeyEquals(Equals, records[i].first)) { - TMappedType oldValue = records[i].second; - TMappedType newValue = newPtr.Value(); - newValue = merger(oldValue, newValue); - records[i].second = newValue; - return true; - } - } - records.push_back(std::make_pair(newPtr.Key(), newPtr.Value())); - return false; - } - - void PutRecord(const char* buf, size_t rec_size, TFILEPtr& f, SizeType& cur_off) { - f.fsput(buf, rec_size); - cur_off += rec_size; - } - - void Finish() { - Srt.Sort(); - // We use variant 1. - // Variant 1: read sorter once, write records, fseeks to write buckets - // (this doesn't allow fname to be stdout) - // Variant 2: read sorter (probably temp. file) twice: write buckets, then write records - // (this allows fname to be stdout but seems to be longer) - TFILEPtr f(HashFileName, "wb"); - setvbuf(f, nullptr, _IOFBF, WrBufSz); - TVector<SizeType> bucketsBuf(WrBufSz, 0); - // prepare header (note: this code must be unified with save_stl.h) - typedef sthashtable_nvm_sv<typename HashType::hasher, typename HashType::key_equal, SizeType> sv_type; - sv_type sv = {Hasher, Equals, BucketCount, 0, 0}; - // to do: m.b. use just the size of corresponding object? - SizeType cur_off = sizeof(sv_type) + - (sv.num_buckets + 1) * sizeof(SizeType); - SizeType bkt_wroff = sizeof(sv_type), bkt_bufpos = 0, prev_bkt = 0, prev_hash = (SizeType)-1; - bucketsBuf[bkt_bufpos++] = cur_off; - // if might me better to write many zeroes here - f.seek(cur_off, SEEK_SET); - TVector<std::pair<TKeyType, TMappedType>> currentBlock; - bool emptyFile = true; - size_t prevRecSize = 0; - // seek forward - while (true) { - const TIoRec* rec = Srt.Next(); - if (currentBlock.empty() && !emptyFile) { - if (rec && prev_hash == rec->HashVal) { - Merge(currentBlock, prevRecSize); - } else { - // if there is only one record with this hash, don't recode it, just write - PutRecord(CurrentBlockRecs.Data(), prevRecSize, f, cur_off); - sv.num_elements++; - } - } - if (!rec || prev_hash != rec->HashVal) { - // write buckets table - for (size_t i = 0; i < currentBlock.size(); ++i) { - TIoRec* ioRec = MakeIoRec(TValueType(currentBlock[i])); - PutRecord(ioRec->Buf, ioRec->RecSize, f, cur_off); - } - sv.num_elements += currentBlock.size(); - currentBlock.clear(); - CurrentBlockRecs.Clear(); - if (rec) { - prev_hash = rec->HashVal; - } - } - // note: prev_bkt's semantics here is 'cur_bkt - 1', thus we are actually cycling - // until cur_bkt == rec->HashVal *inclusively* - while (!rec || prev_bkt != (rec->HashVal >> FreeBits)) { - bucketsBuf[bkt_bufpos++] = cur_off; - if (bkt_bufpos == bucketsBuf.size()) { - f.seek(bkt_wroff, SEEK_SET); - size_t sz = bkt_bufpos * sizeof(bucketsBuf[0]); - if (f.write(bucketsBuf.begin(), 1, sz) != sz) - throw yexception() << "could not write " << sz << " bytes to " << ~HashFileName; - bkt_wroff += sz; - bkt_bufpos = 0; - f.seek(cur_off, SEEK_SET); - } - prev_bkt++; - if (!rec) { - break; - } - assert(prev_bkt < BucketCount); - } - if (!rec) { - break; - } - emptyFile = false; - CurrentBlockRecs.Append(rec->Buf, rec->RecSize); - if (!currentBlock.empty()) { - Merge(currentBlock, rec->RecSize); - } else { - prevRecSize = rec->RecSize; - } - } - // finish buckets table - f.seek(bkt_wroff, SEEK_SET); - size_t sz = bkt_bufpos * sizeof(bucketsBuf[0]); - if (sz && f.write(bucketsBuf.begin(), 1, sz) != sz) - throw yexception() << "could not write " << sz << " bytes to " << ~HashFileName; - bkt_wroff += sz; - for (; prev_bkt < BucketCount; prev_bkt++) - f.fput(cur_off); - // finally write header - sv.data_end_off = cur_off; - f.seek(0, SEEK_SET); - f.fput(sv); - f.close(); - } - - void Close() { - Srt.Close(); - if (+OurTmpDir) - rmdir(~OurTmpDir); - } -}; diff --git a/library/cpp/microbdb/header.cpp b/library/cpp/microbdb/header.cpp deleted file mode 100644 index f4511d6fb62..00000000000 --- a/library/cpp/microbdb/header.cpp +++ /dev/null @@ -1,91 +0,0 @@ -#include "header.h" - -#include <util/stream/output.h> -#include <util/stream/format.h> - -TString ToString(EMbdbErrors error) { - TString ret; - switch (error) { - case MBDB_ALREADY_INITIALIZED: - ret = "already initialized"; - break; - case MBDB_NOT_INITIALIZED: - ret = "not initialized"; - break; - case MBDB_BAD_DESCRIPTOR: - ret = "bad descriptor"; - break; - case MBDB_OPEN_ERROR: - ret = "open error"; - break; - case MBDB_READ_ERROR: - ret = "read error"; - break; - case MBDB_WRITE_ERROR: - ret = "write error"; - break; - case MBDB_CLOSE_ERROR: - ret = "close error"; - break; - case MBDB_EXPECTED_EOF: - ret = "expected eof"; - break; - case MBDB_UNEXPECTED_EOF: - ret = "unxepected eof"; - break; - case MBDB_BAD_FILENAME: - ret = "bad filename"; - break; - case MBDB_BAD_METAPAGE: - ret = "bad metapage"; - break; - case MBDB_BAD_RECORDSIG: - ret = "bad recordsig"; - break; - case MBDB_BAD_FILE_SIZE: - ret = "bad file size"; - break; - case MBDB_BAD_PAGESIG: - ret = "bad pagesig"; - break; - case MBDB_BAD_PAGESIZE: - ret = "bad pagesize"; - break; - case MBDB_BAD_PARM: - ret = "bad parm"; - break; - case MBDB_BAD_SYNC: - ret = "bad sync"; - break; - case MBDB_PAGE_OVERFLOW: - ret = "page overflow"; - break; - case MBDB_NO_MEMORY: - ret = "no memory"; - break; - case MBDB_MEMORY_LEAK: - ret = "memory leak"; - break; - case MBDB_NOT_SUPPORTED: - ret = "not supported"; - break; - default: - ret = "unknown"; - break; - } - return ret; -} - -TString ErrorMessage(int error, const TString& text, const TString& path, ui32 recordSig, ui32 gotRecordSig) { - TStringStream str; - str << text; - if (path.size()) - str << " '" << path << "'"; - str << ": " << ToString(static_cast<EMbdbErrors>(error)); - if (recordSig && (!gotRecordSig || recordSig != gotRecordSig)) - str << ". Expected RecordSig: " << Hex(recordSig, HF_ADDX); - if (recordSig && gotRecordSig && recordSig != gotRecordSig) - str << ", got: " << Hex(gotRecordSig, HF_ADDX); - str << ". Last system error text: " << LastSystemErrorText(); - return str.Str(); -} diff --git a/library/cpp/microbdb/header.h b/library/cpp/microbdb/header.h deleted file mode 100644 index 0951d610ea7..00000000000 --- a/library/cpp/microbdb/header.h +++ /dev/null @@ -1,159 +0,0 @@ -#pragma once - -#include <util/system/defaults.h> -#include <util/generic/typetraits.h> -#include <util/generic/string.h> -#include <util/str_stl.h> - -#include <stdio.h> - -#define METASIZE (1u << 12) -#define METASIG 0x12345678u -#define PAGESIG 0x87654321u - -enum EMbdbErrors { - MBDB_ALREADY_INITIALIZED = 200, - MBDB_NOT_INITIALIZED = 201, - MBDB_BAD_DESCRIPTOR = 202, - MBDB_OPEN_ERROR = 203, - MBDB_READ_ERROR = 204, - MBDB_WRITE_ERROR = 205, - MBDB_CLOSE_ERROR = 206, - MBDB_EXPECTED_EOF = 207, - MBDB_UNEXPECTED_EOF = 208, - MBDB_BAD_FILENAME = 209, - MBDB_BAD_METAPAGE = 210, - MBDB_BAD_RECORDSIG = 211, - MBDB_BAD_FILE_SIZE = 212, - MBDB_BAD_PAGESIG = 213, - MBDB_BAD_PAGESIZE = 214, - MBDB_BAD_PARM = 215, - MBDB_BAD_SYNC = 216, - MBDB_PAGE_OVERFLOW = 217, - MBDB_NO_MEMORY = 218, - MBDB_MEMORY_LEAK = 219, - MBDB_NOT_SUPPORTED = 220 -}; - -TString ToString(EMbdbErrors error); -TString ErrorMessage(int error, const TString& text, const TString& path = TString(), ui32 recordSig = 0, ui32 gotRecordSig = 0); - -enum EPageFormat { - MBDB_FORMAT_RAW = 0, - MBDB_FORMAT_COMPRESSED = 1, - MBDB_FORMAT_NULL = 255 -}; - -enum ECompressionAlgorithm { - MBDB_COMPRESSION_ZLIB = 1, - MBDB_COMPRESSION_FASTLZ = 2, - MBDB_COMPRESSION_SNAPPY = 3 -}; - -struct TDatMetaPage { - ui32 MetaSig; - ui32 RecordSig; - ui32 PageSize; -}; - -struct TDatPage { - ui32 RecNum; //!< number of records on this page - ui32 PageSig; - ui32 Format : 2; //!< one of EPageFormat - ui32 Reserved : 30; -}; - -/// Additional page header with compression info -struct TCompressedPage { - ui32 BlockCount; - ui32 Algorithm : 4; - ui32 Version : 4; - ui32 Reserved : 24; -}; - -namespace NMicroBDB { - /// Header of compressed block - struct TCompressedHeader { - ui32 Compressed; - ui32 Original; /// original size of block - ui32 Count; /// number of records in block - ui32 Reserved; - }; - - Y_HAS_MEMBER(AssertValid); - - template <typename T, bool TVal> - struct TAssertValid { - void operator()(const T*) { - } - }; - - template <typename T> - struct TAssertValid<T, true> { - void operator()(const T* rec) { - return rec->AssertValid(); - } - }; - - template <typename T> - void AssertValid(const T* rec) { - return NMicroBDB::TAssertValid<T, NMicroBDB::THasAssertValid<T>::value>()(rec); - } - - Y_HAS_MEMBER(SizeOf); - - template <typename T, bool TVal> - struct TGetSizeOf; - - template <typename T> - struct TGetSizeOf<T, true> { - size_t operator()(const T* rec) { - return rec->SizeOf(); - } - }; - - template <typename T> - struct TGetSizeOf<T, false> { - size_t operator()(const T*) { - return sizeof(T); - } - }; - - inline char* GetFirstRecord(const TDatPage* page) { - switch (page->Format) { - case MBDB_FORMAT_RAW: - return (char*)page + sizeof(TDatPage); - case MBDB_FORMAT_COMPRESSED: - // Первая запись на сжатой странице сохраняется несжатой - // сразу же после всех заголовков. - // Алгоритм сохранения смотреть в TOutputRecordIterator::FlushBuffer - return (char*)page + sizeof(TDatPage) + sizeof(TCompressedPage) + sizeof(NMicroBDB::TCompressedHeader); - } - return (char*)nullptr; - } -} - -template <typename T> -size_t SizeOf(const T* rec) { - return NMicroBDB::TGetSizeOf<T, NMicroBDB::THasSizeOf<T>::value>()(rec); -} - -template <typename T> -size_t MaxSizeOf() { - return sizeof(T); -} - -static inline int DatNameToIdx(char iname[/*FILENAME_MAX*/], const char* dname) { - if (!dname || !*dname) - return MBDB_BAD_FILENAME; - const char* ptr; - if (!(ptr = strrchr(dname, '/'))) - ptr = dname; - if (!(ptr = strrchr(ptr, '.'))) - ptr = strchr(dname, 0); - if (ptr - dname > FILENAME_MAX - 5) - return MBDB_BAD_FILENAME; - memcpy(iname, dname, ptr - dname); - strcpy(iname + (ptr - dname), ".idx"); - return 0; -} diff --git a/library/cpp/microbdb/heap.h b/library/cpp/microbdb/heap.h deleted file mode 100644 index ef5a53534cb..00000000000 --- a/library/cpp/microbdb/heap.h +++ /dev/null @@ -1,143 +0,0 @@ -#pragma once - -#include "header.h" -#include "extinfo.h" - -#include <util/generic/vector.h> - -#include <errno.h> - -/////////////////////////////////////////////////////////////////////////////// - -/// Default comparator -template <class TVal> -struct TCompareByLess { - inline bool operator()(const TVal* a, const TVal* b) const { - return TLess<TVal>()(*a, *b); - } -}; - -/////////////////////////////////////////////////////////////////////////////// - -template <class TVal, class TIterator, class TCompare = TCompareByLess<TVal>> -class THeapIter { -public: - int Init(TIterator** iters, int count) { - Term(); - if (!count) - return 0; - if (!(Heap = (TIterator**)malloc(count * sizeof(TIterator*)))) - return ENOMEM; - - Count = count; - count = 0; - while (count < Count) - if (count && !(*iters)->Next()) { //here first TIterator is NOT initialized! - Count--; - iters++; - } else { - Heap[count++] = *iters++; - } - count = Count / 2; - while (--count > 0) //Heap[0] is not changed! - Sift(count, Count); //do not try to replace this code by make_heap - return 0; - } - - int Init(TIterator* iters, int count) { - TVector<TIterator*> a(count); - for (int i = 0; i < count; ++i) - a[i] = &iters[i]; - return Init(&a[0], count); - } - - THeapIter() - : Heap(nullptr) - , Count(0) - { - } - - THeapIter(TIterator* a, TIterator* b) - : Heap(nullptr) - , Count(0) - { - TIterator* arr[] = {a, b}; - if (Init(arr, 2)) - ythrow yexception() << "can't Init THeapIter"; - } - - THeapIter(TVector<TIterator>& v) - : Heap(nullptr) - , Count(0) - { - if (Init(&v[0], v.size())) { - ythrow yexception() << "can't Init THeapIter"; - } - } - - ~THeapIter() { - Term(); - } - - inline const TVal* Current() const { - if (!Count) - return nullptr; - return (*Heap)->Current(); - } - - inline const TIterator* CurrentIter() const { - return *Heap; - } - - //for ends of last file will use Heap[0] = Heap[0] ! and - //returns Current of eof so Current of eof MUST return NULL - //possible this is bug and need fixing - const TVal* Next() { - if (!Count) - return nullptr; - if (!(*Heap)->Next()) //on first call unitialized first TIterator - *Heap = Heap[--Count]; //will be correctly initialized - - if (Count == 2) { - if (TCompare()(Heap[1]->Current(), Heap[0]->Current())) - DoSwap(Heap[1], Heap[0]); - } else - Sift(0, Count); - - return Current(); - } - - inline bool GetExtInfo(typename TExtInfoType<TVal>::TResult* extInfo) const { - return (*Heap)->GetExtInfo(extInfo); - } - - inline const ui8* GetExtInfoRaw(size_t* len) const { - return (*Heap)->GetExtInfoRaw(len); - } - - void Term() { - ::free(Heap); - Heap = nullptr; - Count = 0; - } - -protected: - void Sift(int node, int end) { - TIterator* x = Heap[node]; - int son; - for (son = 2 * node + 1; son < end; node = son, son = 2 * node + 1) { - if (son < (end - 1) && TCompare()(Heap[son + 1]->Current(), Heap[son]->Current())) - son++; - if (TCompare()(Heap[son]->Current(), x->Current())) - Heap[node] = Heap[son]; - else - break; - } - Heap[node] = x; - } - - TIterator** Heap; - int Count; -}; - -/////////////////////////////////////////////////////////////////////////////// diff --git a/library/cpp/microbdb/input.h b/library/cpp/microbdb/input.h deleted file mode 100644 index a214ba6e8ae..00000000000 --- a/library/cpp/microbdb/input.h +++ /dev/null @@ -1,1027 +0,0 @@ -#pragma once - -#include "header.h" -#include "file.h" -#include "reader.h" - -#include <util/system/maxlen.h> -#include <util/system/event.h> -#include <util/system/thread.h> - -#include <thread> - -#include <sys/uio.h> - -#include <errno.h> - -template <class TFileManip> -inline ssize_t Readv(TFileManip& fileManip, const struct iovec* iov, int iovcnt) { - ssize_t read_count = 0; - for (int n = 0; n < iovcnt; n++) { - ssize_t last_read = fileManip.Read(iov[n].iov_base, iov[n].iov_len); - if (last_read < 0) - return -1; - read_count += last_read; - } - return read_count; -} - -template <class TVal, typename TBasePageIter> -class TInputRecordIterator: public TBasePageIter { - typedef THolder<NMicroBDB::IBasePageReader<TVal>> TReaderHolder; - -public: - typedef TBasePageIter TPageIter; - - TInputRecordIterator() { - Init(); - } - - ~TInputRecordIterator() { - Term(); - } - - const TVal* Current() const { - return Rec; - } - - bool GetExtInfo(typename TExtInfoType<TVal>::TResult* extInfo) const { - if (!Rec) - return false; - return Reader->GetExtInfo(extInfo); - } - - const ui8* GetExtInfoRaw(size_t* len) const { - if (!Rec) - return nullptr; - return Reader->GetExtInfoRaw(len); - } - - size_t GetRecSize() const { - return Reader->GetRecSize(); - } - - size_t GetExtSize() const { - return Reader->GetExtSize(); - } - - const TVal* Next() { - if (RecNum) - --RecNum; - else { - TDatPage* page = TPageIter::Next(); - if (!page) { - if (TPageIter::IsFrozen() && Reader.Get()) - Reader->SetClearFlag(); - return Rec = nullptr; - } else if (!!SelectReader()) - return Rec = nullptr; - RecNum = TPageIter::Current()->RecNum - 1; - } - return Rec = Reader->Next(); - } - - // Skip(0) == Current(); Skip(1) == Next() - const TVal* Skip(int& num) { - // Y_ASSERT(num >= 0); ? otherwise it gets into infinite loop - while (num > RecNum) { - num -= RecNum + 1; - if (!TPageIter::Next() || !!SelectReader()) { - RecNum = 0; - return Rec = nullptr; - } - RecNum = TPageIter::Current()->RecNum - 1; - Rec = Reader->Next(); - } - ++num; - while (--num) - Next(); - return Rec; - } - - // begin reading from next page - void Reset() { - Rec = NULL; - RecNum = 0; - if (Reader.Get()) - Reader->Reset(); - } - -protected: - int Init() { - Rec = nullptr; - RecNum = 0; - Format = MBDB_FORMAT_NULL; - return 0; - } - - int Term() { - Reader.Reset(nullptr); - Format = MBDB_FORMAT_NULL; - Rec = nullptr; - RecNum = 0; - return 0; - } - - const TVal* GotoPage(int pageno) { - if (!TPageIter::GotoPage(pageno) || !!SelectReader()) - return Rec = nullptr; - RecNum = TPageIter::Current()->RecNum - 1; - return Rec = Reader->Next(); - } - - int SelectReader() { - if (!TPageIter::Current()) - return MBDB_UNEXPECTED_EOF; - if (ui32(Format) != TPageIter::Current()->Format) { - switch (TPageIter::Current()->Format) { - case MBDB_FORMAT_RAW: - Reader.Reset(new NMicroBDB::TRawPageReader<TVal, TPageIter>(this)); - break; - case MBDB_FORMAT_COMPRESSED: - Reader.Reset(new NMicroBDB::TCompressedReader<TVal, TPageIter>(this)); - break; - default: - return MBDB_NOT_SUPPORTED; - } - Format = EPageFormat(TPageIter::Current()->Format); - } else { - Y_ASSERT(Reader.Get() != nullptr); - Reader->Reset(); - } - return 0; - } - - const TVal* Rec; - TReaderHolder Reader; - int RecNum; //!< number of records on the current page after the current record - EPageFormat Format; -}; - -template <class TBaseReader> -class TInputPageIterator: public TBaseReader { -public: - typedef TBaseReader TReader; - - TInputPageIterator() - : Buf(nullptr) - { - Term(); - } - - ~TInputPageIterator() { - Term(); - } - - TDatPage* Current() { - return CurPage; - } - - int Freeze() { - return (Frozen = (PageNum == -1) ? 0 : PageNum); - } - - void Unfreeze() { - Frozen = -1; - } - - inline int IsFrozen() const { - return Frozen + 1; - } - - inline size_t GetPageSize() const { - return TReader::GetPageSize(); - } - - inline int GetPageNum() const { - return PageNum; - } - - inline int IsEof() const { - return Eof; - } - - TDatPage* Next() { - if (PageNum >= Maxpage && ReadBuf()) { - Eof = Eof ? Eof : TReader::IsEof(); - return CurPage = nullptr; - } - return CurPage = (TDatPage*)(Buf + ((++PageNum) % Bufpages) * GetPageSize()); - } - - TDatPage* GotoPage(int pageno) { - if (pageno <= Maxpage && pageno >= (Maxpage - Pages + 1)) { - PageNum = pageno; - return CurPage = (TDatPage*)(Buf + (PageNum % Bufpages) * GetPageSize()); - } - if (IsFrozen() || TReader::GotoPage(pageno)) - return nullptr; - Maxpage = PageNum = pageno - 1; - Eof = 0; - return Next(); - } - -protected: - int Init(size_t pages, int pagesOrBytes) { - Term(); - if (pagesOrBytes == -1) - Bufpages = TReader::GetLastPage(); - else if (pagesOrBytes) - Bufpages = pages; - else - Bufpages = pages / GetPageSize(); - if (!TReader::GetLastPage()) { - Bufpages = 0; - assert(Eof == 1); - return 0; - } - int lastPage = TReader::GetLastPage(); - if (lastPage >= 0) - Bufpages = (int)Min(lastPage, Bufpages); - Bufpages = Max(2, Bufpages); - Eof = 0; - ABuf.Alloc(Bufpages * GetPageSize()); - return (Buf = ABuf.Begin()) ? 0 : ENOMEM; - // return (Buf = (char*)malloc(Bufpages * GetPageSize())) ? 0 : ENOMEM; - } - - int Term() { - // free(Buf); - ABuf.Dealloc(); - Buf = nullptr; - Maxpage = PageNum = Frozen = -1; - Bufpages = 0; - Pages = 0; - Eof = 1; - CurPage = nullptr; - return 0; - } - - int ReadBuf() { - int nvec; - iovec vec[2]; - int maxpage = (Frozen == -1 ? Maxpage + 1 : Frozen) + Bufpages - 1; - int minpage = Maxpage + 1; - if (maxpage < minpage) - return EAGAIN; - minpage %= Bufpages; - maxpage %= Bufpages; - if (maxpage < minpage) { - vec[0].iov_base = Buf + GetPageSize() * minpage; - vec[0].iov_len = GetPageSize() * (Bufpages - minpage); - vec[1].iov_base = Buf; - vec[1].iov_len = GetPageSize() * (maxpage + 1); - nvec = 2; - } else { - vec[0].iov_base = Buf + GetPageSize() * minpage; - vec[0].iov_len = GetPageSize() * (maxpage - minpage + 1); - nvec = 1; - } - TReader::ReadPages(vec, nvec, &Pages); - Maxpage += Pages; - return !Pages; - } - - int Maxpage, PageNum, Frozen, Bufpages, Eof, Pages; - TDatPage* CurPage; - // TMappedArray<char> ABuf; - TMappedAllocation ABuf; - char* Buf; -}; - -template <class TBaseReader> -class TInputPageIteratorMT: public TBaseReader { -public: - typedef TBaseReader TReader; - - TInputPageIteratorMT() - : CurBuf(0) - , CurReadBuf(0) - , Buf(nullptr) - { - Term(); - } - - ~TInputPageIteratorMT() { - Term(); - } - - TDatPage* Current() { - return CurPage; - } - - int Freeze() { - return (Frozen = (PageNum == -1) ? 0 : PageNum); - } - - void Unfreeze() { - Frozen = -1; - } - - inline int IsFrozen() const { - return Frozen + 1; - } - - inline size_t GetPageSize() const { - return TReader::GetPageSize(); - } - - inline int GetPageNum() const { - return PageNum; - } - - inline int IsEof() const { - return Eof; - } - - TDatPage* Next() { - if (Eof) - return CurPage = nullptr; - if (PageNum >= Maxpage && ReadBuf()) { - Eof = Eof ? Eof : TReader::IsEof(); - return CurPage = nullptr; - } - return CurPage = (TDatPage*)(Buf + ((++PageNum) % Bufpages) * GetPageSize()); - } - - TDatPage* GotoPage(int pageno) { - if (pageno <= Maxpage && pageno >= (Maxpage - Pages + 1)) { - PageNum = pageno; - return CurPage = (TDatPage*)(Buf + (PageNum % Bufpages) * GetPageSize()); - } - if (IsFrozen() || TReader::GotoPage(pageno)) - return nullptr; - Maxpage = PageNum = pageno - 1; - Eof = 0; - return Next(); - } - - void ReadPages() { - // fprintf(stderr, "ReadPages started\n"); - bool eof = false; - while (!eof) { - QEvent[CurBuf].Wait(); - if (Finish) - return; - int pages = ReadCurBuf(Bufs[CurBuf]); - PagesM[CurBuf] = pages; - eof = !pages; - AEvent[CurBuf].Signal(); - CurBuf ^= 1; - } - } - -protected: - int Init(size_t pages, int pagesOrBytes) { - Term(); - if (pagesOrBytes == -1) - Bufpages = TReader::GetLastPage(); - else if (pagesOrBytes) - Bufpages = pages; - else - Bufpages = pages / GetPageSize(); - if (!TReader::GetLastPage()) { - Bufpages = 0; - assert(Eof == 1); - return 0; - } - int lastPage = TReader::GetLastPage(); - if (lastPage >= 0) - Bufpages = (int)Min(lastPage, Bufpages); - Bufpages = Max(2, Bufpages); - Eof = 0; - ABuf.Alloc(Bufpages * GetPageSize() * 2); - Bufs[0] = ABuf.Begin(); - Bufs[1] = Bufs[0] + Bufpages * GetPageSize(); - // return (Buf = (char*)malloc(Bufpages * GetPageSize())) ? 0 : ENOMEM; - Finish = false; - ReadThread = std::thread([this]() { - TThread::SetCurrentThreadName("DatReader"); - ReadPages(); - }); - QEvent[0].Signal(); - return Bufs[0] ? 0 : ENOMEM; - } - - void StopThread() { - Finish = true; - QEvent[0].Signal(); - QEvent[1].Signal(); - ReadThread.join(); - } - - int Term() { - // free(Buf); - if (ReadThread.joinable()) - StopThread(); - ABuf.Dealloc(); - Buf = nullptr; - Bufs[0] = nullptr; - Bufs[1] = nullptr; - Maxpage = MaxpageR = PageNum = Frozen = -1; - Bufpages = 0; - Pages = 0; - Eof = 1; - CurPage = nullptr; - return 0; - } - - int ReadCurBuf(char* buf) { - int nvec; - iovec vec[2]; - int maxpage = (Frozen == -1 ? MaxpageR + 1 : Frozen) + Bufpages - 1; - int minpage = MaxpageR + 1; - if (maxpage < minpage) - return EAGAIN; - minpage %= Bufpages; - maxpage %= Bufpages; - if (maxpage < minpage) { - vec[0].iov_base = buf + GetPageSize() * minpage; - vec[0].iov_len = GetPageSize() * (Bufpages - minpage); - vec[1].iov_base = buf; - vec[1].iov_len = GetPageSize() * (maxpage + 1); - nvec = 2; - } else { - vec[0].iov_base = buf + GetPageSize() * minpage; - vec[0].iov_len = GetPageSize() * (maxpage - minpage + 1); - nvec = 1; - } - int pages; - TReader::ReadPages(vec, nvec, &pages); - MaxpageR += pages; - return pages; - } - - int ReadBuf() { - QEvent[CurReadBuf ^ 1].Signal(); - AEvent[CurReadBuf].Wait(); - Buf = Bufs[CurReadBuf]; - Maxpage += (Pages = PagesM[CurReadBuf]); - CurReadBuf ^= 1; - return !Pages; - } - - int Maxpage, MaxpageR, PageNum, Frozen, Bufpages, Eof, Pages; - TDatPage* CurPage; - // TMappedArray<char> ABuf; - ui32 CurBuf; - ui32 CurReadBuf; - TMappedAllocation ABuf; - char* Buf; - char* Bufs[2]; - ui32 PagesM[2]; - TAutoEvent QEvent[2]; - TAutoEvent AEvent[2]; - std::thread ReadThread; - bool Finish; -}; - -template <typename TFileManip> -class TInputPageFileImpl: private TNonCopyable { -protected: - TFileManip FileManip; - -public: - TInputPageFileImpl() - : Pagesize(0) - , Fd(-1) - , Eof(1) - , Error(0) - , Pagenum(0) - , Recordsig(0) - { - Term(); - } - - ~TInputPageFileImpl() { - Term(); - } - - inline int IsEof() const { - return Eof; - } - - inline int GetError() const { - return Error; - } - - inline size_t GetPageSize() const { - return Pagesize; - } - - inline int GetLastPage() const { - return Pagenum; - } - - inline ui32 GetRecordSig() const { - return Recordsig; - } - - inline bool IsOpen() const { - return FileManip.IsOpen(); - } - -protected: - int Init(const char* fname, ui32 recsig, ui32* gotrecsig = nullptr, bool direct = false) { - Error = FileManip.Open(fname, direct); - return Error ? Error : Init(TFile(), recsig, gotrecsig); - } - - int Init(const TFile& file, ui32 recsig, ui32* gotrecsig = nullptr) { - if (!file.IsOpen() && !FileManip.IsOpen()) - return MBDB_NOT_INITIALIZED; - if (file.IsOpen() && FileManip.IsOpen()) - return MBDB_ALREADY_INITIALIZED; - if (file.IsOpen()) { - Error = FileManip.Init(file); - if (Error) - return Error; - } - - // TArrayHolder<ui8> buf(new ui8[METASIZE + FS_BLOCK_SIZE]); - // ui8* ptr = (buf.Get() + FS_BLOCK_SIZE - ((ui64)buf.Get() & (FS_BLOCK_SIZE - 1))); - TMappedArray<ui8> buf; - buf.Create(METASIZE); - ui8* ptr = &buf[0]; - TDatMetaPage* meta = (TDatMetaPage*)ptr; - ssize_t size = METASIZE; - ssize_t ret; - while (size && (ret = FileManip.Read(ptr, (unsigned)size)) > 0) { - Y_ASSERT(ret <= size); - size -= ret; - ptr += ret; - } - if (size) { - FileManip.Close(); - return Error = MBDB_BAD_METAPAGE; - } - if (gotrecsig) - *gotrecsig = meta->RecordSig; - return Init(TFile(), meta, recsig); - } - - int Init(TAutoPtr<IInputStream> input, ui32 recsig, ui32* gotrecsig = nullptr) { - if (!input && !FileManip.IsOpen()) - return MBDB_NOT_INITIALIZED; - if (FileManip.IsOpen()) - return MBDB_ALREADY_INITIALIZED; - - Error = FileManip.Open(input); - if (Error) - return Error; - - TArrayHolder<ui8> buf(new ui8[METASIZE]); - ui8* ptr = buf.Get(); - ssize_t size = METASIZE; - ssize_t ret; - while (size && (ret = FileManip.Read(ptr, (unsigned)size)) > 0) { - Y_ASSERT(ret <= size); - size -= ret; - ptr += ret; - } - if (size) { - FileManip.Close(); - return Error = MBDB_BAD_METAPAGE; - } - TDatMetaPage* meta = (TDatMetaPage*)buf.Get(); - if (gotrecsig) - *gotrecsig = meta->RecordSig; - return Init(TFile(), meta, recsig); - } - - int Init(const TFile& file, const TDatMetaPage* meta, ui32 recsig) { - if (!file.IsOpen() && !FileManip.IsOpen()) - return MBDB_NOT_INITIALIZED; - if (file.IsOpen() && FileManip.IsOpen()) - return MBDB_ALREADY_INITIALIZED; - if (file.IsOpen()) { - Error = FileManip.Init(file); - if (Error) - return Error; - } - - if (meta->MetaSig != METASIG) - Error = MBDB_BAD_METAPAGE; - else if (meta->RecordSig != recsig) - Error = MBDB_BAD_RECORDSIG; - - if (Error) { - FileManip.Close(); - return Error; - } - - i64 flength = FileManip.GetLength(); - if (flength >= 0) { - i64 fsize = flength; - fsize -= METASIZE; - if (fsize % meta->PageSize) - return Error = MBDB_BAD_FILE_SIZE; - Pagenum = (int)(fsize / meta->PageSize); - } else { - Pagenum = -1; - } - Pagesize = meta->PageSize; - Recordsig = meta->RecordSig; - Error = Eof = 0; - return Error; - } - - int ReadPages(iovec* vec, int nvec, int* pages) { - *pages = 0; - - if (Eof || Error) - return Error; - - ssize_t size = 0, delta = 0, total = 0; - iovec* pvec = vec; - int vsize = nvec; - - while (vsize && (size = Readv(FileManip, pvec, (int)Min(vsize, 16))) > 0) { - total += size; - if (delta) { - size += delta; - pvec->iov_len += delta; - pvec->iov_base = (char*)pvec->iov_base - delta; - delta = 0; - } - while (size) { - if ((size_t)size >= pvec->iov_len) { - size -= pvec->iov_len; - ++pvec; - --vsize; - } else { - delta = size; - pvec->iov_len -= size; - pvec->iov_base = (char*)pvec->iov_base + size; - size = 0; - } - } - } - if (delta) { - pvec->iov_len += delta; - pvec->iov_base = (char*)pvec->iov_base - delta; - } - if (size < 0) - return Error = errno ? errno : MBDB_READ_ERROR; - if (total % Pagesize) - return Error = MBDB_BAD_FILE_SIZE; - if (vsize) - Eof = 1; - *pages = total / Pagesize; // it would be better to assign it after the for-loops - for (; total; ++vec, total -= size) - for (size = 0; size < total && (size_t)size < vec->iov_len; size += Pagesize) - if (((TDatPage*)((char*)vec->iov_base + size))->PageSig != PAGESIG) - return Error = MBDB_BAD_PAGESIG; - return Error; - } - - int GotoPage(int page) { - if (Error) - return Error; - Eof = 0; - i64 offset = (i64)page * Pagesize + METASIZE; - if (offset != FileManip.Seek(offset, SEEK_SET)) - Error = MBDB_BAD_FILE_SIZE; - return Error; - } - - int Term() { - return FileManip.Close(); - } - - size_t Pagesize; - int Fd; - int Eof; - int Error; - int Pagenum; //!< number of pages in this file - ui32 Recordsig; -}; - -template <class TBaseReader> -class TMappedInputPageIterator: public TBaseReader { -public: - typedef TBaseReader TReader; - - TMappedInputPageIterator() { - Term(); - } - - ~TMappedInputPageIterator() { - Term(); - } - - TDatPage* Current() { - return CurPage; - } - - inline size_t GetPageSize() const { - return TReader::GetPageSize(); - } - - inline int GetPageNum() const { - return PageNum; - } - - inline int IsEof() const { - return Eof; - } - - inline int IsFrozen() const { - return 0; - } - - TDatPage* Next() { - i64 pos = (i64)(++PageNum) * GetPageSize() + METASIZE; - if (pos < 0 || pos >= (i64)TReader::GetSize()) { - Eof = 1; - return CurPage = nullptr; - } - return CurPage = (TDatPage*)((char*)TReader::GetData() + pos); - } - -protected: - int Init(size_t /*pages*/, int /*pagesOrBytes*/) { - Term(); - Eof = 0; - return 0; - } - - int Term() { - PageNum = -1; - Eof = 1; - CurPage = nullptr; - return 0; - } - - TDatPage* GotoPage(int pageno) { - PageNum = pageno - 1; - Eof = 0; - return Next(); - } - - int PageNum, Eof, Pages, Pagenum; - TDatPage* CurPage; -}; - -using TInputPageFile = TInputPageFileImpl<TInputFileManip>; - -template <class TVal, - typename TBaseRecIter = TInputRecordIterator<TVal, TInputPageIterator<TInputPageFile>>> -class TInDatFileImpl: public TBaseRecIter { -public: - typedef TBaseRecIter TRecIter; - typedef typename TRecIter::TPageIter TPageIter; - typedef typename TRecIter::TPageIter::TReader TReader; - using TRecIter::GotoPage; - - int Open(const char* fname, size_t pages = 1, int pagesOrBytes = 1, ui32* gotRecordSig = nullptr, bool direct = false) { - int ret = TReader::Init(fname, TVal::RecordSig, gotRecordSig, direct); - return ret ? ret : Open2(pages, pagesOrBytes); - } - - int Open(const TFile& file, size_t pages = 1, int pagesOrBytes = 1, ui32* gotRecordSig = nullptr) { - int ret = TReader::Init(file, TVal::RecordSig, gotRecordSig); - return ret ? ret : Open2(pages, pagesOrBytes); - } - - int Open(TAutoPtr<IInputStream> input, size_t pages = 1, int pagesOrBytes = 1, ui32* gotRecordSig = nullptr) { - int ret = TReader::Init(input, TVal::RecordSig, gotRecordSig); - return ret ? ret : Open2(pages, pagesOrBytes); - } - - int Open(const TFile& file, const TDatMetaPage* meta, size_t pages = 1, int pagesOrBytes = 1) { - int ret = TReader::Init(file, meta, TVal::RecordSig); - return ret ? ret : Open2(pages, pagesOrBytes); - } - - int Close() { - int ret1 = TRecIter::Term(); - int ret2 = TPageIter::Term(); - int ret3 = TReader::Term(); - return ret1 ? ret1 : ret2 ? ret2 : ret3; - } - - const TVal* GotoLastPage() { - return TReader::GetLastPage() <= 0 ? nullptr : TRecIter::GotoPage(TReader::GetLastPage() - 1); - } - -private: - int Open2(size_t pages, int pagesOrBytes) { - int ret = TPageIter::Init(pages, pagesOrBytes); - if (!ret) - ret = TRecIter::Init(); - if (ret) - Close(); - return ret; - } -}; - -template <class TVal> -class TInIndexFile: protected TInDatFileImpl<TVal> { - typedef TInDatFileImpl<TVal> TDatFile; - typedef typename TDatFile::TRecIter TRecIter; - typedef typename TRecIter::TPageIter TPageIter; - typedef typename TExtInfoType<TVal>::TResult TExtInfo; - -public: - using TDatFile::IsOpen; - - TInIndexFile() - : Index0(nullptr) - { - } - - int Open(const char* fname, size_t pages = 2, int pagesOrBytes = 1, ui32* gotRecordSig = nullptr) { - int ret = TDatFile::Open(fname, pages, pagesOrBytes, gotRecordSig); - if (ret) - return ret; - if (!(Index0 = (TDatPage*)malloc(TPageIter::GetPageSize()))) { - TDatFile::Close(); - return MBDB_NO_MEMORY; - } - if (!TExtInfoType<TVal>::Exists && SizeOf((TVal*)nullptr)) - RecsOnPage = (TPageIter::GetPageSize() - sizeof(TDatPage)) / DatCeil(SizeOf((TVal*)nullptr)); - TDatFile::Next(); - memcpy(Index0, TPageIter::Current(), TPageIter::GetPageSize()); - return 0; - } - - int Close() { - free(Index0); - Index0 = nullptr; - return TDatFile::Close(); - } - - inline int GetError() const { - return TDatFile::GetError(); - } - - int FindKey(const TVal* akey, const TExtInfo* extInfo = nullptr) { - assert(IsOpen()); - if (TExtInfoType<TVal>::Exists || !SizeOf((TVal*)nullptr)) - return FindVszKey(akey, extInfo); - int num = FindKeyOnPage(Index0, akey); - TDatPage* page = TPageIter::GotoPage(num + 1); - if (!page) - return 0; - num = FindKeyOnPage(page, akey); - num += (TPageIter::GetPageNum() - 1) * RecsOnPage; - return num; - } - - int FindVszKey(const TVal* akey, const TExtInfo* extInfo = NULL) { - int num = FindVszKeyOnPage(Index0, akey, extInfo); - int num_add = 0; - for (int p = 0; p < num; p++) { - TDatPage* page = TPageIter::GotoPage(p + 1); - if (!page) - return 0; - num_add += page->RecNum; - } - TDatPage* page = TPageIter::GotoPage(num + 1); - if (!page) - return 0; - num = FindVszKeyOnPage(page, akey, extInfo); - num += num_add; - return num; - } - -protected: - int FindKeyOnPage(TDatPage* page, const TVal* key) { - int left = 0; - int right = page->RecNum - 1; - int recsize = DatCeil(SizeOf((TVal*)nullptr)); - while (left < right) { - int middle = (left + right) >> 1; - if (*((TVal*)((char*)page + sizeof(TDatPage) + middle * recsize)) < *key) - left = middle + 1; - else - right = middle; - } - //borders check (left and right) - return (left == 0 || *((TVal*)((char*)page + sizeof(TDatPage) + left * recsize)) < *key) ? left : left - 1; - } - - // will deserialize rawExtinfoA to extInfoA only if necessery - inline bool KeyLess_(const TVal* a, const TVal* b, - TExtInfo* extInfoA, const TExtInfo* extInfoB, - const ui8* rawExtInfoA, size_t rawLen) { - if (*a < *b) { - return true; - } else if (!extInfoB || *b < *a) { - return false; - } else { - // *a == *b && extInfoB - Y_PROTOBUF_SUPPRESS_NODISCARD extInfoA->ParseFromArray(rawExtInfoA, rawLen); - return (*extInfoA < *extInfoB); - } - } - - int FindVszKeyOnPage(TDatPage* page, const TVal* key, const TExtInfo* extInfo) { - TVal* cur = (TVal*)((char*)page + sizeof(TDatPage)); - ui32 recnum = page->RecNum; - if (!TExtInfoType<TVal>::Exists) { - for (; recnum > 0 && *cur < *key; --recnum) - cur = (TVal*)((char*)cur + DatCeil(SizeOf(cur))); - } else { - size_t ll; - size_t l; - size_t sz = NMicroBDB::SizeOfExt(cur, &ll, &l); - TExtInfo ei; - for (; recnum > 0 && KeyLess_(cur, key, &ei, extInfo, (ui8*)cur + sz + ll, l); --recnum) { - cur = (TVal*)((ui8*)cur + DatCeil(sz + ll + l)); - sz = NMicroBDB::SizeOfExt(cur, &ll, &l); - } - } - - int idx = page->RecNum - recnum - 1; - return (idx >= 0) ? idx : 0; - } - - TDatPage* Index0; - int RecsOnPage; -}; - -template <class TVal, class TKey, class TPageIterator = TInputPageIterator<TInputPageFile>> -class TKeyFileMixin: public TInDatFileImpl<TVal, TInputRecordIterator<TVal, TPageIterator>> { -protected: - TInIndexFile<TKey> KeyFile; -}; - -template <class TVal, class TKey, class TBase = TKeyFileMixin<TVal, TKey>> -class TDirectInDatFile: public TBase { - typedef TBase TDatFile; - typedef typename TDatFile::TRecIter TRecIter; - typedef typename TDatFile::TPageIter TPageIter; - -public: - void Open(const char* path, size_t pages = 1, size_t keypages = 1, int pagesOrBytes = 1) { - int ret; - ui32 gotRecordSig = 0; - - ret = TDatFile::Open(path, pages, pagesOrBytes, &gotRecordSig); - if (ret) { - ythrow yexception() << ErrorMessage(ret, "Failed to open input file", path, TVal::RecordSig, gotRecordSig); - } - char KeyName[PATH_MAX + 1]; - if (DatNameToIdx(KeyName, path)) { - ythrow yexception() << ErrorMessage(MBDB_BAD_FILENAME, "Failed to open input file", path); - } - gotRecordSig = 0; - ret = KeyFile.Open(KeyName, keypages, 1, &gotRecordSig); - if (ret) { - ythrow yexception() << ErrorMessage(ret, "Failed to open input keyfile", KeyName, TKey::RecordSig, gotRecordSig); - } - } - - void Close() { - int ret; - - if (TDatFile::IsOpen() && (ret = TDatFile::GetError())) - if (!std::uncaught_exception()) - ythrow yexception() << ErrorMessage(ret, "Error before closing input file"); - if ((ret = TDatFile::Close())) - if (!std::uncaught_exception()) - ythrow yexception() << ErrorMessage(ret, "Error while closing input file"); - - if (KeyFile.IsOpen() && (ret = KeyFile.GetError())) - if (!std::uncaught_exception()) - ythrow yexception() << ErrorMessage(ret, "Error before closing input keyfile"); - if ((ret = KeyFile.Close())) - if (!std::uncaught_exception()) - ythrow yexception() << ErrorMessage(ret, "Error while closing input keyfile"); - } - - const TVal* FindRecord(const TKey* key, const typename TExtInfoType<TKey>::TResult* extInfo = nullptr) { - int page = KeyFile.FindKey(key, extInfo); - const TVal* val = TRecIter::GotoPage(page); - if (!TExtInfoType<TVal>::Exists || !extInfo) { - TKey k; - while (val) { - TMakeExtKey<TVal, TKey>::Make(&k, nullptr, val, nullptr); - if (!(k < *key)) - break; - val = TRecIter::Next(); - } - } else { - typename TExtInfoType<TVal>::TResult valExt; - TKey k; - typename TExtInfoType<TKey>::TResult kExt; - while (val) { - TRecIter::GetExtInfo(&valExt); - TMakeExtKey<TVal, TKey>::Make(&k, &kExt, val, &valExt); - if (*key < k || !(k < *key) && !(kExt < *extInfo)) // k > *key || k == *key && kExt >= *extInfo - break; - val = TRecIter::Next(); - } - } - return val; - } - - int FindPagesNo(const TKey* key, const typename TExtInfoType<TVal>::TResult* extInfo = NULL) { - return KeyFile.FindKey(key, extInfo); - } - -protected: - using TBase::KeyFile; -}; diff --git a/library/cpp/microbdb/microbdb.cpp b/library/cpp/microbdb/microbdb.cpp deleted file mode 100644 index c10dbdf1268..00000000000 --- a/library/cpp/microbdb/microbdb.cpp +++ /dev/null @@ -1 +0,0 @@ -#include "microbdb.h" diff --git a/library/cpp/microbdb/microbdb.h b/library/cpp/microbdb/microbdb.h deleted file mode 100644 index 75218873374..00000000000 --- a/library/cpp/microbdb/microbdb.h +++ /dev/null @@ -1,54 +0,0 @@ -#pragma once - -#include <util/folder/dirut.h> - -#if defined(_MSC_VER) -#pragma warning(push) -#pragma warning(disable : 4706) /*assignment within conditional expression*/ -#pragma warning(disable : 4267) /*conversion from 'size_t' to 'type', possible loss of data*/ -#endif - -#include "align.h" -#include "extinfo.h" -#include "header.h" -#include "reader.h" -#include "heap.h" -#include "file.h" -#include "sorter.h" -#include "input.h" -#include "output.h" -#include "sorterdef.h" - -inline int MakeSorterTempl(char path[/*FILENAME_MAX*/], const char* prefix) { - int ret = MakeTempDir(path, prefix); - if (!ret && strlcat(path, "%06d", FILENAME_MAX) > FILENAME_MAX - 100) - ret = EINVAL; - if (ret) - path[0] = 0; - return ret; -} - -inline int GetMeta(TFile& file, TDatMetaPage* meta) { - ui8 buf[METASIZE], *ptr = buf; - ssize_t size = sizeof(buf), ret; - while (size && (ret = file.Read(ptr, size)) > 0) { - size -= ret; - ptr += ret; - } - if (size) - return MBDB_BAD_FILE_SIZE; - ptr = buf; // gcc 4.4 warning fix - *meta = *(TDatMetaPage*)ptr; - return (meta->MetaSig == METASIG) ? 0 : MBDB_BAD_METAPAGE; -} - -template <class TRec> -inline bool IsDatFile(const char* fname) { - TDatMetaPage meta; - TFile f(fname, RdOnly); - return !GetMeta(f, &meta) && meta.RecordSig == TRec::RecordSig; -} - -#if defined(_MSC_VER) -#pragma warning(pop) -#endif diff --git a/library/cpp/microbdb/noextinfo.proto b/library/cpp/microbdb/noextinfo.proto deleted file mode 100644 index 6a78882e079..00000000000 --- a/library/cpp/microbdb/noextinfo.proto +++ /dev/null @@ -1,4 +0,0 @@ - -message TNoExtInfo { -} - diff --git a/library/cpp/microbdb/output.h b/library/cpp/microbdb/output.h deleted file mode 100644 index d0ecab21085..00000000000 --- a/library/cpp/microbdb/output.h +++ /dev/null @@ -1,1049 +0,0 @@ -#pragma once - -#include "header.h" -#include "file.h" - -#include <util/generic/buffer.h> -#include <util/memory/tempbuf.h> - -#include <sys/uio.h> - -template <class TFileManip> -inline ssize_t Writev(TFileManip& fileManip, const struct iovec* iov, int iovcnt) { - ssize_t written_count = 0; - for (int n = 0; n < iovcnt; n++) { - ssize_t last_write = fileManip.Write(iov[n].iov_base, iov[n].iov_len); - if (last_write < 0) - return -1; - written_count += last_write; - } - return written_count; -} - -//********************************************************************* -struct TFakeIndexer { - inline void NextPage(TDatPage*) noexcept { - } -}; - -struct TCallbackIndexer { - typedef void (*TCallback)(void* This, const TDatPage* page); - - TCallbackIndexer() { - Callback = nullptr; - } - - void SetCallback(void* t, TCallback c) { - This = t; - Callback = c; - } - - void NextPage(TDatPage* dat) { - Callback(This, dat); - } - - TCallback Callback; - void* This; -}; - -template <class TVal, typename TBasePageIter, typename TBaseIndexer = TFakeIndexer, typename TCompressor = TFakeCompression> -class TOutputRecordIterator; - -template <class TVal, typename TBasePageIter, typename TBaseIndexer> -class TOutputRecordIterator<TVal, TBasePageIter, TBaseIndexer, TFakeCompression> - : public TBasePageIter, public TBaseIndexer { -public: - enum EOffset { - WrongOffset = size_t(-1) - }; - - typedef TBasePageIter TPageIter; - typedef TBaseIndexer TIndexer; - - TOutputRecordIterator() { - Clear(); - } - - ~TOutputRecordIterator() { - Term(); - } - - inline const TVal* Current() const { - return Rec; - } - - const TVal* Push(const TVal* v, const typename TExtInfoType<TVal>::TResult* extInfo = nullptr) { - NMicroBDB::AssertValid(v); - size_t len = SizeOf(v); - if (!TExtInfoType<TVal>::Exists) - return (Reserve(len)) ? (TVal*)memcpy(Rec, v, len) : nullptr; - else if (extInfo) { - size_t extSize = extInfo->ByteSize(); - size_t extLenSize = len_long((i64)extSize); - if (!Reserve(len + extLenSize + extSize)) - return nullptr; - memcpy(Rec, v, len); - out_long((i64)extSize, (char*)Rec + len); - extInfo->SerializeWithCachedSizesToArray((ui8*)Rec + len + extLenSize); - return Rec; - } else { - size_t extLenSize = len_long((i64)0); - if (!Reserve(len + extLenSize)) - return nullptr; - memcpy(Rec, v, len); - out_long((i64)0, (char*)Rec + len); - return Rec; - } - } - - const TVal* Push(const TVal* v, const ui8* extInfoRaw, size_t extLen) { - NMicroBDB::AssertValid(v); - size_t sz = SizeOf(v); - if (!Reserve(sz + extLen)) - return nullptr; - memcpy(Rec, v, sz); - memcpy((ui8*)Rec + sz, extInfoRaw, extLen); - return Rec; - } - - // use values stored in microbdb readers/writers internal buffer only. - // method expects serialized extInfo after this record - const TVal* PushWithExtInfo(const TVal* v) { - NMicroBDB::AssertValid(v); - size_t extSize; - size_t extLenSize; - size_t sz = NMicroBDB::SizeOfExt(v, &extLenSize, &extSize); - sz += extLenSize + extSize; - if (!Reserve(sz)) - return nullptr; - memcpy(Rec, v, sz); - return Rec; - } - - TVal* Reserve(size_t len) { - if (CurLen + DatCeil(len) > TPageIter::GetPageSize()) { - if (sizeof(TDatPage) + DatCeil(len) > TPageIter::GetPageSize()) - return Rec = nullptr; - if (TPageIter::Current() && RecNum) { - TPageIter::Current()->RecNum = RecNum; - TPageIter::Current()->Format = MBDB_FORMAT_RAW; - memset((char*)TPageIter::Current() + CurLen, 0, TPageIter::GetPageSize() - CurLen); - TIndexer::NextPage(TPageIter::Current()); - RecNum = 0; - } - if (!TPageIter::Next()) { - CurLen = TPageIter::GetPageSize(); - return Rec = nullptr; - } - CurLen = sizeof(TDatPage); - } - LenForOffset = CurLen; - Rec = (TVal*)((char*)TPageIter::Current() + CurLen); - DatSet(Rec, len); - - CurLen += DatCeil(len); - - ++RecNum; - return Rec; - } - - void Flush() { - TPageIter::Current()->RecNum = RecNum; - TPageIter::Current()->Format = MBDB_FORMAT_RAW; - } - - size_t Offset() const { - return Rec ? TPageIter::Offset() + LenForOffset : WrongOffset; - } - - void ResetDat() { - CurLen = (char*)Rec - (char*)TPageIter::Current(); - size_t len; - if (!TExtInfoType<TVal>::Exists) { - len = SizeOf(Rec); - } else { - size_t ll; - size_t l; - len = NMicroBDB::SizeOfExt(Rec, &ll, &l); - len += ll + l; - } - CurLen += DatCeil(len); - } - -protected: - void Clear() { - Rec = nullptr; - RecNum = 0; - CurLen = 0; - LenForOffset = 0; - } - - int Init() { - Clear(); - CurLen = TPageIter::GetPageSize(); - return 0; - } - - int Term() { - if (TPageIter::Current()) { - TPageIter::Current()->RecNum = RecNum; - TPageIter::Current()->Format = MBDB_FORMAT_RAW; - memset((char*)TPageIter::Current() + CurLen, 0, TPageIter::GetPageSize() - CurLen); - RecNum = 0; - } - int ret = !TPageIter::Current() && RecNum; - Clear(); - return ret; - } - - int GotoPage(int pageno) { - if (TPageIter::Current()) { - TPageIter::Current()->RecNum = RecNum; - TPageIter::Current()->Format = MBDB_FORMAT_RAW; - memset((char*)TPageIter::Current() + CurLen, 0, TPageIter::GetPageSize() - CurLen); - } - int ret = TPageIter::GotoPage(pageno); - if (!ret) { - RecNum = 0; - CurLen = sizeof(TDatPage); - } - return ret; - } - - TVal* Rec; - int RecNum; - size_t CurLen; - size_t LenForOffset; -}; - -template <class TVal, typename TBasePageIter, typename TBaseIndexer, typename TAlgorithm> -class TOutputRecordIterator - : public TBasePageIter, - public TBaseIndexer, - private TAlgorithm { - class TPageBuffer { - public: - void Init(size_t page) { - Pos = 0; - RecNum = 0; - Size = Min(page / 2, size_t(64 << 10)); - Data.Reset(new ui8[Size]); - } - - void Clear() { - Pos = 0; - RecNum = 0; - } - - inline bool Empty() const { - return RecNum == 0; - } - - public: - size_t Size; - size_t Pos; - int RecNum; - TArrayHolder<ui8> Data; - }; - -public: - typedef TBasePageIter TPageIter; - typedef TBaseIndexer TIndexer; - - TOutputRecordIterator() - : Rec(nullptr) - , RecNum(0) - { - } - - ~TOutputRecordIterator() { - Term(); - } - - const TVal* Current() const { - return Rec; - } - - const TVal* Push(const TVal* v, const typename TExtInfoType<TVal>::TResult* extInfo = nullptr) { - NMicroBDB::AssertValid(v); - size_t len = SizeOf(v); - if (!TExtInfoType<TVal>::Exists) - return (Reserve(len)) ? (TVal*)memcpy((TVal*)Rec, v, len) : nullptr; - else if (extInfo) { - size_t extSize = extInfo->ByteSize(); - size_t extLenSize = len_long((i64)extSize); - if (!Reserve(len + extLenSize + extSize)) - return nullptr; - memcpy(Rec, v, len); - out_long((i64)extSize, (char*)Rec + len); - extInfo->SerializeWithCachedSizesToArray((ui8*)Rec + len + extLenSize); - return Rec; - } else { - size_t extLenSize = len_long((i64)0); - if (!Reserve(len + extLenSize)) - return nullptr; - memcpy(Rec, v, len); - out_long((i64)0, (char*)Rec + len); - return Rec; - } - } - - const TVal* Push(const TVal* v, const ui8* extInfoRaw, size_t extLen) { - NMicroBDB::AssertValid(v); - size_t sz = SizeOf(v); - if (!Reserve(sz + extLen)) - return NULL; - memcpy(Rec, v, sz); - memcpy((ui8*)Rec + sz, extInfoRaw, extLen); - return Rec; - } - - // use values stored in microbdb readers/writers internal buffer only. - // method expects serialized extInfo after this record - const TVal* PushWithExtInfo(const TVal* v) { - NMicroBDB::AssertValid(v); - size_t extSize; - size_t extLenSize; - size_t sz = NMicroBDB::SizeOfExt(v, &extLenSize, &extSize); - sz += extLenSize + extSize; - if (!Reserve(sz)) - return nullptr; - memcpy(Rec, v, sz); - return Rec; - } - - TVal* Reserve(const size_t len) { - const size_t aligned = DatCeil(len); - - if (!TPageIter::Current()) { // Allocate fist page - if (!TPageIter::Next()) { - CurLen = TPageIter::GetPageSize(); - return Rec = nullptr; - } - CurLen = sizeof(TDatPage) + sizeof(TCompressedPage); - } - - if (Buffer.Pos + aligned > Buffer.Size) { - if (Buffer.Pos == 0) - return Rec = nullptr; - if (FlushBuffer()) - return Rec = nullptr; - if (Buffer.Pos + aligned + sizeof(TDatPage) + sizeof(TCompressedPage) > Buffer.Size) - return Rec = nullptr; - } - - Rec = (TVal*)((char*)Buffer.Data.Get() + Buffer.Pos); - DatSet(Rec, len); // len is correct because DatSet set align tail to zero - - Buffer.RecNum++; - Buffer.Pos += aligned; - ++RecNum; - return Rec; - } - - void Flush() { - if (!Buffer.Empty()) { - FlushBuffer(); - TPageIter::Current()->RecNum = RecNum; - TPageIter::Current()->Format = MBDB_FORMAT_COMPRESSED; - } - } - - size_t Offset() const { - // According to vadya@ there is no evil to return 0 all the time - return 0; - } - - void ResetDat() { - Buffer.Pos = (char*)Rec - (char*)Buffer.Data.Get(); - size_t len = SizeOf(Rec); - Buffer.Pos += DatCeil(len); - } - -protected: - void Clear() { - RecNum = 0; - Rec = nullptr; - Count = 0; - CurLen = sizeof(TDatPage) + sizeof(TCompressedPage); - Buffer.Clear(); - } - - int Init() { - Clear(); - Buffer.Init(TPageIter::GetPageSize()); - TAlgorithm::Init(); - return 0; - } - - int Term() { - if (TPageIter::Current()) - Commit(); - int ret = !TPageIter::Current() && RecNum; - Clear(); - TAlgorithm::Term(); - return ret; - } - - int GotoPage(int pageno) { - if (TPageIter::Current()) - Commit(); - int ret = TPageIter::GotoPage(pageno); - if (!ret) - Reset(); - return ret; - } - -private: - void Commit() { - Flush(); - TPageIter::Current()->RecNum = RecNum; - TPageIter::Current()->Format = MBDB_FORMAT_COMPRESSED; - SetCompressedPageHeader(); - - memset((char*)TPageIter::Current() + CurLen, 0, TPageIter::GetPageSize() - CurLen); - RecNum = 0; - Count = 0; - } - - inline void SetCompressedPageHeader() { - TCompressedPage* const hdr = (TCompressedPage*)((ui8*)TPageIter::Current() + sizeof(TDatPage)); - - hdr->BlockCount = Count; - hdr->Algorithm = TAlgorithm::Code; - hdr->Version = 0; - hdr->Reserved = 0; - } - - inline void Reset() { - RecNum = 0; - CurLen = sizeof(TDatPage) + sizeof(TCompressedPage); - Count = 0; - Buffer.Clear(); - } - - int FlushBuffer() { - TArrayHolder<ui8> data; - const ui8* const buf = Buffer.Data.Get(); - size_t first = 0; - - if (!TExtInfoType<TVal>::Exists) - first = DatCeil(SizeOf((TVal*)buf)); - else { - size_t ll; - size_t l; - first = NMicroBDB::SizeOfExt((const TVal*)buf, &ll, &l); - first = DatCeil(first + ll + l); - } - - size_t total = sizeof(NMicroBDB::TCompressedHeader) + first + ((Buffer.RecNum == 1) ? 0 : TAlgorithm::CompressBound(Buffer.Pos - first)); - size_t real = total; - - { - ui8* p = nullptr; - NMicroBDB::TCompressedHeader* hdr = nullptr; - - // 1. Choose data destination (temporary buffer or dat-page) - if (CurLen + total > TPageIter::GetPageSize()) { - data.Reset(new ui8[total]); - - hdr = (NMicroBDB::TCompressedHeader*)data.Get(); - p = data.Get() + sizeof(NMicroBDB::TCompressedHeader); - } else { - p = (ui8*)TPageIter::Current() + CurLen; - hdr = (NMicroBDB::TCompressedHeader*)p; - p += sizeof(NMicroBDB::TCompressedHeader); - } - - // 2. Compress data - - // Fill header and first record - hdr->Original = Buffer.Pos; - hdr->Compressed = 0; - hdr->Count = Buffer.RecNum; - hdr->Reserved = 0; - memcpy(p, Buffer.Data.Get(), first); - // Fill compressed part - if (Buffer.RecNum > 1) { - size_t size = TAlgorithm::CompressBound(Buffer.Pos - first); - - p += first; - TAlgorithm::Compress(p, size, buf + first, Buffer.Pos - first); - - hdr->Compressed = size; - - real = sizeof(NMicroBDB::TCompressedHeader) + first + size; - } - } - - Y_ASSERT(sizeof(TDatPage) + sizeof(TCompressedPage) + real <= TPageIter::GetPageSize()); - - // 3. Check page capacity - - if (CurLen + real > TPageIter::GetPageSize()) { - Y_ASSERT(data.Get() != nullptr); - - if (TPageIter::Current() && RecNum) { - RecNum = RecNum - Buffer.RecNum; - TPageIter::Current()->RecNum = RecNum; - TPageIter::Current()->Format = MBDB_FORMAT_COMPRESSED; - SetCompressedPageHeader(); - memset((char*)TPageIter::Current() + CurLen, 0, TPageIter::GetPageSize() - CurLen); - TIndexer::NextPage(TPageIter::Current()); - RecNum = Buffer.RecNum; - Count = 0; - } - if (!TPageIter::Next()) { - CurLen = TPageIter::GetPageSize(); - return MBDB_NO_MEMORY; - } - CurLen = sizeof(TDatPage) + sizeof(TCompressedPage); - } - - // 4. Flush data and reset buffer state - - if (data.Get()) - memcpy((ui8*)TPageIter::Current() + CurLen, data.Get(), real); - CurLen += real; - ++Count; - Buffer.Clear(); - return 0; - } - -private: - size_t CurLen; - TPageBuffer Buffer; - TVal* Rec; - ui32 Count; //! < count of compressed blocks on page -public: - int RecNum; -}; - -template <typename TBaseWriter> -class TOutputPageIterator: public TBaseWriter { -public: - typedef TBaseWriter TWriter; - - TOutputPageIterator() - : Buf(nullptr) - { - Clear(); - } - - ~TOutputPageIterator() { - Term(); - } - - TDatPage* Current() { - return CurPage; - } - - size_t Offset() const { - //Cout << "PS = " << TWriter::GetPageSize() << "; PN = " << PageNum << "; MS = " << METASIZE << Endl; - return TWriter::GetPageSize() * PageNum + METASIZE; - } - - int Freeze() { - return (Frozen = (PageNum == -1) ? 0 : (int)PageNum); - } - - void Unfreeze() { - Frozen = -1; - } - - inline int IsFrozen() const { - return Frozen + 1; - } - - inline size_t GetPageSize() const { - return TWriter::GetPageSize(); - } - - inline int GetPageNum() const { - return (int)PageNum; - } - - TDatPage* Next() { - if (PageNum >= Maxpage && WriteBuf()) - return CurPage = nullptr; - CurPage = (TDatPage*)(Buf + ((++PageNum) % Bufpages) * GetPageSize()); - memset(CurPage, 0, sizeof(TDatPage)); - return CurPage; - } - -protected: - int Init(size_t pages, int pagesOrBytes) { - Term(); - if (pagesOrBytes) - Bufpages = pages; - else - Bufpages = pages / GetPageSize(); - Bufpages = Max<size_t>(1, Bufpages); - Maxpage = Bufpages - 1; - // if (!(Buf = (char*)malloc(Bufpages * GetPageSize()))) - // return ENOMEM; - ABuf.Alloc(Bufpages * GetPageSize()); - Buf = ABuf.Begin(); - if (TWriter::Memo) - Freeze(); - return 0; - } - - int Term() { - Unfreeze(); - int ret = (PageNum < 0) ? 0 : WriteBuf(); - Clear(); - return ret; - } - - int GotoPage(int pageno) { - int ret = EAGAIN; - if (IsFrozen() || PageNum >= 0 && ((ret = WriteBuf())) || ((ret = TWriter::GotoPage(pageno)))) - return ret; - PageNum = pageno; - Maxpage = Bufpages - 1 + pageno; - CurPage = (TDatPage*)(Buf + (PageNum % Bufpages) * GetPageSize()); - memset(CurPage, 0, sizeof(TDatPage)); - return 0; - } - - void Clear() { - ABuf.Dealloc(); - Buf = nullptr; - Maxpage = PageNum = Frozen = -1; - Bufpages = 0; - CurPage = nullptr; - } - - int WriteBuf() { - int nvec; - iovec vec[2]; - ssize_t minpage = Maxpage - Bufpages + 1; - ssize_t maxpage = Frozen == -1 ? PageNum : Frozen - 1; - if (maxpage < minpage) - return EAGAIN; - minpage %= Bufpages; - maxpage %= Bufpages; - if (maxpage < minpage) { - vec[0].iov_base = Buf + GetPageSize() * minpage; - vec[0].iov_len = GetPageSize() * (Bufpages - minpage); - vec[1].iov_base = Buf; - vec[1].iov_len = GetPageSize() * (maxpage + 1); - nvec = 2; - } else { - vec[0].iov_base = Buf + GetPageSize() * minpage; - vec[0].iov_len = GetPageSize() * (maxpage - minpage + 1); - nvec = 1; - } - if (TWriter::WritePages(vec, nvec)) - return EIO; - Maxpage += (maxpage < minpage) ? (Bufpages - minpage + maxpage + 1) : (maxpage - minpage + 1); - return 0; - } - - ssize_t Maxpage; - ssize_t Bufpages; - ssize_t PageNum; - int Frozen; - TDatPage* CurPage; - char* Buf; - TMappedAllocation ABuf; -}; - -template <class TFileManip> -class TOutputPageFileImpl: private TNonCopyable { -public: - TOutputPageFileImpl() - : Pagesize(0) - , Eof(1) - , Error(0) - , Memo(0) - , Recordsig(0) - { - } - - ~TOutputPageFileImpl() { - Term(); - } - - inline int IsEof() const { - return Eof; - } - - inline int GetError() const { - return Error; - } - - inline bool IsOpen() const { - return FileManip.IsOpen(); - } - - inline size_t GetPageSize() const { - return Pagesize; - } - - inline ui32 GetRecordSig() const { - return Recordsig; - } - - int Init(const char* fname, size_t pagesize, ui32 recsig, bool direct = false) { - Memo = 0; - if (FileManip.IsOpen()) - return MBDB_ALREADY_INITIALIZED; - - if (!fname) { - Eof = Error = 0; - Pagesize = pagesize; - Recordsig = recsig; - Memo = 1; - return 0; - } - - Error = FileManip.Open(fname, WrOnly | CreateAlways | ARW | AWOther | (direct ? DirectAligned : EOpenMode())); - if (Error) - return Error; - Error = Init(TFile(), pagesize, recsig); - if (Error) { - FileManip.Close(); - unlink(fname); - } - return Error; - } - - int Init(TAutoPtr<IOutputStream> output, size_t pagesize, ui32 recsig) { - Memo = 0; - if (FileManip.IsOpen()) { - return MBDB_ALREADY_INITIALIZED; - } - - if (!output) { - Eof = Error = 0; - Pagesize = pagesize; - Recordsig = recsig; - Memo = 1; - return 0; - } - - Error = FileManip.Open(output); - if (Error) - return Error; - Error = Init(TFile(), pagesize, recsig); - if (Error) { - FileManip.Close(); - } - return Error; - } - - int Init(const TFile& file, size_t pagesize, ui32 recsig) { - Memo = 0; - if (!file.IsOpen() && !FileManip.IsOpen()) - return MBDB_NOT_INITIALIZED; - if (file.IsOpen() && FileManip.IsOpen()) - return MBDB_ALREADY_INITIALIZED; - if (file.IsOpen()) { - Error = FileManip.Init(file); - if (Error) - return Error; - } - - Eof = 1; - TTempBuf buf(METASIZE + FS_BLOCK_SIZE); - const char* ptr = (buf.Data() + FS_BLOCK_SIZE - ((ui64)buf.Data() & (FS_BLOCK_SIZE - 1))); - TDatMetaPage* meta = (TDatMetaPage*)ptr; - - memset(buf.Data(), 0, buf.Size()); - meta->MetaSig = METASIG; - meta->PageSize = Pagesize = pagesize; - meta->RecordSig = Recordsig = recsig; - - ssize_t size = METASIZE, ret = 0; - while (size && (ret = FileManip.Write(ptr, (unsigned)size)) > 0) { - size -= ret; - ptr += ret; - } - if (size || ret <= 0) { - Term(); - return Error = errno ? errno : MBDB_WRITE_ERROR; - } - - Error = Eof = 0; - return Error; - } - -protected: - int WritePages(iovec* vec, int nvec) { - if (Error || Memo) - return Error; - - ssize_t size, delta; - iovec* pvec; - int vsize; - - for (vsize = 0, pvec = vec; vsize < nvec; vsize++, pvec++) - for (size = 0; (size_t)size < pvec->iov_len; size += Pagesize) - ((TDatPage*)((char*)pvec->iov_base + size))->PageSig = PAGESIG; - - delta = size = 0; - pvec = vec; - vsize = nvec; - while (vsize && (size = Writev(FileManip, pvec, (int)Min(vsize, 16))) > 0) { - if (delta) { - size += delta; - pvec->iov_len += delta; - pvec->iov_base = (char*)pvec->iov_base - delta; - delta = 0; - } - while (size) { - if ((size_t)size >= pvec->iov_len) { - size -= pvec->iov_len; - ++pvec; - --vsize; - } else { - delta = size; - pvec->iov_len -= size; - pvec->iov_base = (char*)pvec->iov_base + size; - size = 0; - } - } - } - if (delta) { - pvec->iov_len += delta; - pvec->iov_base = (char*)pvec->iov_base - delta; - } - return Error = (!size && !vsize) ? 0 : errno ? errno : MBDB_WRITE_ERROR; - } - - i64 Tell() { - return FileManip.RealSeek(0, SEEK_CUR); - } - - int GotoPage(int pageno) { - if (Error || Memo) - return Error; - Eof = 0; - i64 offset = (i64)pageno * Pagesize + METASIZE; - if (offset != FileManip.Seek(offset, SEEK_SET)) - Error = MBDB_BAD_FILE_SIZE; - return Error; - } - - int Term() { - int ret = FileManip.Close(); - Eof = 1; - Memo = 0; - if (!Error) - Error = ret; - return Error; - } - - size_t Pagesize; - int Eof; - int Error; - int Memo; - ui32 Recordsig; - -private: - TFileManip FileManip; -}; - -using TOutputPageFile = TOutputPageFileImpl<TOutputFileManip>; - -template <class TVal, - typename TBaseRecIter = TOutputRecordIterator<TVal, TOutputPageIterator<TOutputPageFile>>> -class TOutDatFileImpl: public TBaseRecIter { -public: - typedef TBaseRecIter TRecIter; - typedef typename TRecIter::TPageIter TPageIter; - typedef typename TRecIter::TPageIter::TWriter TWriter; - - int Open(const char* fname, size_t pagesize, size_t pages = 1, int pagesOrBytes = 1, bool direct = false) { - int ret = TWriter::Init(fname, pagesize, TVal::RecordSig, direct); - return ret ? ret : Open2(pages, pagesOrBytes); - } - - int Open(const TFile& file, size_t pagesize, size_t pages = 1, int pagesOrBytes = 1) { - int ret = TWriter::Init(file, pagesize, TVal::RecordSig); - return ret ? ret : Open2(pages, pagesOrBytes); - } - - int Open(TAutoPtr<IOutputStream> output, size_t pagesize, size_t pages = 1, int pagesOrBytes = 1) { - int ret = TWriter::Init(output, pagesize, TVal::RecordSig); - return ret ? ret : Open2(pages, pagesOrBytes); - } - - int Close() { - int ret1 = TRecIter::Term(); - int ret2 = TPageIter::Term(); - int ret3 = TWriter::Term(); - return ret1 ? ret1 : ret2 ? ret2 : ret3; - } - -private: - int Open2(size_t pages, int pagesOrBytes) { - int ret = TPageIter::Init(pages, pagesOrBytes); - if (!ret) - ret = TRecIter::Init(); - if (ret) - Close(); - return ret; - } -}; - -template <class TVal> -class TOutIndexFile: public TOutDatFileImpl< - TVal, - TOutputRecordIterator<TVal, TOutputPageIterator<TOutputPageFile>, TCallbackIndexer, TFakeCompression>> { - typedef TOutDatFileImpl< - TVal, - TOutputRecordIterator<TVal, TOutputPageIterator<TOutputPageFile>, TCallbackIndexer, TFakeCompression>> - TDatFile; - typedef TOutIndexFile<TVal> TMyType; - typedef typename TDatFile::TRecIter TRecIter; - typedef typename TRecIter::TPageIter TPageIter; - typedef typename TRecIter::TIndexer TIndexer; - -public: - TOutIndexFile() { - TIndexer::SetCallback(this, DispatchCallback); - } - - int Open(const char* fname, size_t pagesize, size_t pages, int pagesOrBytes = 1) { - int ret = TDatFile::Open(fname, pagesize, pages, pagesOrBytes); - if (ret) - return ret; - if ((ret = TRecIter::GotoPage(1))) { - TDatFile::Close(); - return ret; - } - Index0.Clear(); - return ret; - } - - int Close() { - TPageIter::Unfreeze(); - if (TRecIter::RecNum) { - TRecIter::Flush(); - NextPage(TPageIter::Current()); - } - int ret = 0; - if (Index0.Size() && !(ret = TRecIter::GotoPage(0))) { - const char* ptr = Index0.Begin(); - size_t recSize; - while (ptr < Index0.End()) { - Y_ASSERT((size_t)(Index0.End() - ptr) >= sizeof(size_t)); - memcpy(&recSize, ptr, sizeof(size_t)); - ptr += sizeof(size_t); - Y_ASSERT((size_t)(Index0.End() - ptr) >= recSize); - ui8* buf = (ui8*)TRecIter::Reserve(recSize); - if (!buf) { - ret = MBDB_PAGE_OVERFLOW; - break; - } - memcpy(buf, ptr, recSize); - TRecIter::ResetDat(); - ptr += recSize; - } - Index0.Clear(); - ret = (TPageIter::GetPageNum() != 0) ? MBDB_PAGE_OVERFLOW : TPageIter::GetError(); - } - int ret1 = TDatFile::Close(); - return ret ? ret : ret1; - } - -protected: - TBuffer Index0; - - void NextPage(const TDatPage* page) { - const TVal* first = (const TVal*)NMicroBDB::GetFirstRecord(page); - size_t sz; - if (!TExtInfoType<TVal>::Exists) { - sz = SizeOf(first); - } else { - size_t ll; - size_t l; - sz = NMicroBDB::SizeOfExt(first, &ll, &l); - sz += ll + l; - } - Index0.Append((const char*)&sz, sizeof(size_t)); - Index0.Append((const char*)first, sz); - } - - static void DispatchCallback(void* This, const TDatPage* page) { - ((TMyType*)This)->NextPage(page); - } -}; - -template <class TVal, class TKey, typename TCompressor = TFakeCompression, class TPageFile = TOutputPageFile> -class TOutDirectFileImpl: public TOutDatFileImpl< - TVal, - TOutputRecordIterator<TVal, TOutputPageIterator<TPageFile>, TCallbackIndexer, TCompressor>> { - typedef TOutDatFileImpl< - TVal, - TOutputRecordIterator<TVal, TOutputPageIterator<TPageFile>, TCallbackIndexer, TCompressor>> - TDatFile; - typedef TOutDirectFileImpl<TVal, TKey, TCompressor, TPageFile> TMyType; - typedef typename TDatFile::TRecIter TRecIter; - typedef typename TRecIter::TPageIter TPageIter; - typedef typename TRecIter::TIndexer TIndexer; - typedef TOutIndexFile<TKey> TKeyFile; - -public: - TOutDirectFileImpl() { - TIndexer::SetCallback(this, DispatchCallback); - } - - int Open(const char* fname, size_t pagesize, int pages = 1, size_t ipagesize = 0, size_t ipages = 1, int pagesOrBytes = 1) { - char iname[FILENAME_MAX]; - int ret; - if (ipagesize == 0) - ipagesize = pagesize; - ret = TDatFile::Open(fname, pagesize, pages, pagesOrBytes); - ret = ret ? ret : DatNameToIdx(iname, fname); - ret = ret ? ret : KeyFile.Open(iname, ipagesize, ipages, pagesOrBytes); - if (ret) - TDatFile::Close(); - return ret; - } - - int Close() { - if (TRecIter::RecNum) { - TRecIter::Flush(); - NextPage(TPageIter::Current()); - } - int ret = KeyFile.Close(); - int ret1 = TDatFile::Close(); - return ret1 ? ret1 : ret; - } - - int GetError() const { - return TDatFile::GetError() ? TDatFile::GetError() : KeyFile.GetError(); - } - -protected: - TKeyFile KeyFile; - - void NextPage(const TDatPage* page) { - typedef TMakeExtKey<TVal, TKey> TMakeExtKey; - - TVal* val = (TVal*)NMicroBDB::GetFirstRecord(page); - TKey key; - if (!TMakeExtKey::Exists) { - TMakeExtKey::Make(&key, nullptr, val, nullptr); - KeyFile.Push(&key); - } else { - size_t ll; - size_t l; - size_t sz = NMicroBDB::SizeOfExt(val, &ll, &l); - typename TExtInfoType<TVal>::TResult valExt; - if (TExtInfoType<TVal>::Exists) - Y_PROTOBUF_SUPPRESS_NODISCARD valExt.ParseFromArray((ui8*)val + sz + ll, l); - typename TExtInfoType<TKey>::TResult keyExt; - TMakeExtKey::Make(&key, &keyExt, val, &valExt); - KeyFile.Push(&key, &keyExt); - } - } - - static void DispatchCallback(void* This, const TDatPage* page) { - ((TMyType*)This)->NextPage(page); - } -}; diff --git a/library/cpp/microbdb/powersorter.h b/library/cpp/microbdb/powersorter.h deleted file mode 100644 index c40de9c23f0..00000000000 --- a/library/cpp/microbdb/powersorter.h +++ /dev/null @@ -1,667 +0,0 @@ -#pragma once - -#include "safeopen.h" - -#include <util/generic/vector.h> -#include <util/generic/deque.h> -#include <util/system/mutex.h> -#include <util/system/condvar.h> -#include <util/thread/pool.h> - -template < - class TRecord, - template <typename T> class TCompare, - class TSieve, - class TMemoFile = TOutDatFile<TRecord>> -class TDatSorterBuf { -public: - typedef TRecord TRec; - typedef TVector<TRec*> TVectorType; - typedef TMemoFile TMemo; - typedef TCompare<TRecord> TComp; - -public: - TDatSorterBuf(size_t memory, size_t pageSize) - : Memo("memo", pageSize, memory, 0) - , Cur() - { - Memo.Open(nullptr); - Memo.Freeze(); - } - - ~TDatSorterBuf() { - Vector.clear(); - Memo.Close(); - } - - const TRec* Push(const TRec* v) { - const TRec* u = Memo.Push(v); - if (u) - Vector.push_back((TRec*)u); - return u; - } - - const TRec* Next() { - if (Ptr == Vector.end()) { - if (Cur) - TSieve::Sieve(Cur, Cur); - Cur = nullptr; - } else { - Cur = *Ptr++; - if (!TIsSieveFake<TSieve>::Result) - while (Ptr != Vector.end() && TSieve::Sieve(Cur, *Ptr)) - ++Ptr; - } - return Cur; - } - - const TRec* Current() { - return Cur; - } - - size_t Size() { - return Vector.size(); - } - - void Sort() { - Ptr = Vector.begin(); - Cur = nullptr; - - MBDB_SORT_FUN(Vector.begin(), Vector.end(), TComp()); - } - - void Clear() { - Vector.clear(); - Memo.Freeze(); - Ptr = Vector.begin(); - Cur = nullptr; - } - -private: - TVectorType Vector; - TMemo Memo; - - typename TVectorType::iterator - Ptr; - TRec* Cur; -}; - -template < - class TRecord, - class TInput, - template <typename T> class TCompare, - class TSieve> -class TDatMerger { -public: - typedef TRecord TRec; - typedef TCompare<TRecord> TComp; - typedef TSimpleSharedPtr<TInput> TInputPtr; - typedef TVector<TInputPtr> TInputVector; - -public: - ~TDatMerger() { - Close(); - } - - void Init(const TInputVector& inputs) { - Inputs = inputs; - TVector<TInput*> v; - for (int i = 0; i < Inputs.ysize(); ++i) - v.push_back(Inputs[i].Get()); - HeapIter.Init(&v[0], v.size()); - if (!TIsSieveFake<TSieve>::Result) - PNext = HeapIter.Next(); - } - - const TRec* Next() { - if (TIsSieveFake<TSieve>::Result) { - return HeapIter.Next(); - } - - if (!PNext) { - if (PCur) { - TSieve::Sieve(PCur, PCur); - PCur = nullptr; - } - return nullptr; - } - - PCur = &Cur; - memcpy(PCur, PNext, SizeOf((const TRec*)PNext)); - - do { - PNext = HeapIter.Next(); - } while (PNext && TSieve::Sieve(PCur, PNext)); - - return PCur; - } - - const TRec* Current() { - return (TIsSieveFake<TSieve>::Result ? HeapIter.Current() : PCur); - } - - void Close() { - Inputs.clear(); - HeapIter.Term(); - } - -private: - TInputVector Inputs; - THeapIter<TRec, TInput, TComp> HeapIter; - TRec Cur; - TRec* PCur = nullptr; - const TRec* PNext = nullptr; -}; - -class TPortionManager { -public: - void Open(const char* tempDir) { - TGuard<TMutex> guard(Mutex); - TempDir = tempDir; - } - - TString Next() { - TGuard<TMutex> guard(Mutex); - if (Portions == 0) - DoOpen(); - TString fname = GeneratePortionFilename(Portions++); - return fname; - } - - void Close() { - TGuard<TMutex> guard(Mutex); - Portions = 0; - } - -private: - void DoOpen() { - if (MakeSorterTempl(PortionFilenameTempl, TempDir.data())) { - PortionFilenameTempl[0] = 0; - ythrow yexception() << "portion-manager: bad tempdir \"" << TempDir.data() << "\": " << LastSystemErrorText(); - } - } - - TString GeneratePortionFilename(int i) { - char str[FILENAME_MAX]; - snprintf(str, sizeof(str), PortionFilenameTempl, i); - return TString(str); - } - -private: - TMutex Mutex; - - TString TempDir; - char PortionFilenameTempl[FILENAME_MAX] = {}; - int Portions = 0; -}; - -// A merger powered by threads -template < - class TRecord, - template <typename T> class TCompare, - class TSieve, - class TInput = TInDatFile<TRecord>, - class TOutput = TOutDatFile<TRecord>> -class TPowerMerger { -public: - typedef TRecord TRec; - typedef TDatMerger<TRecord, TInput, TCompare, TSieve> TMerger; - typedef TSimpleSharedPtr<TMerger> TMergerPtr; - typedef TPowerMerger<TRecord, TCompare, TSieve, TInput, TOutput> TFileMerger; - - struct TMergePortionTask: public IObjectInQueue { - TFileMerger* FileMerger; - int Begin; - int End; - TString OutFname; - - TMergePortionTask(TFileMerger* fileMerger, int begin, int end, const TString& outFname) - : FileMerger(fileMerger) - , Begin(begin) - , End(end) - , OutFname(outFname) - { - } - - void Process(void*) override { - THolder<TMergePortionTask> This(this); - //fprintf(stderr, "MergePortion: (%i, %i, %s)\n", Begin, End, ~OutFname); - FileMerger->MergePortion(Begin, End, OutFname); - } - }; - -public: - TPowerMerger(const TSimpleSharedPtr<TThreadPool>& mtpQueue, const TSimpleSharedPtr<TPortionManager>& portMan, - int memory, int pageSize, bool autoUnlink) - : MtpQueue(mtpQueue) - , PortionManager(portMan) - , Memory(memory) - , PageSize(pageSize) - , AutoUnlink(autoUnlink) - { - } - - TPowerMerger(const TSimpleSharedPtr<TThreadPool>& mtpQueue, const char* tempDir, - int memory, int pageSize, bool autoUnlink) - : MtpQueue(mtpQueue) - , PortionManager(new TPortionManager) - , Memory(memory) - , PageSize(pageSize) - , AutoUnlink(autoUnlink) - { - PortionManager->Open(tempDir); - } - - ~TPowerMerger() { - Close(); - } - - void SetMtpQueue(const TSimpleSharedPtr<TThreadPool>& mtpQueue) { - MtpQueue = mtpQueue; - } - - void MergePortion(int begin, int end, const TString& outFname) { - TMerger merger; - InitMerger(merger, begin, end); - - TOutput out("mergeportion-tmpout", PageSize, BufSize, 0); - out.Open(outFname.data()); - const TRec* rec; - while ((rec = merger.Next())) - out.Push(rec); - out.Close(); - - merger.Close(); - - { - TGuard<TMutex> guard(Mutex); - UnlinkFiles(begin, end); - Files.push_back(outFname); - --Tasks; - TaskFinishedCond.Signal(); - } - } - - void Add(const TString& fname) { - TGuard<TMutex> guard(Mutex); - // fprintf(stderr, "TPowerMerger::Add: %s\n", ~fname); - Files.push_back(fname); - if (InitialFilesEnd > 0) - ythrow yexception() << "TPowerMerger::Add: no more files allowed"; - } - - void Merge(int maxPortions) { - TGuard<TMutex> guard(Mutex); - InitialFilesEnd = Files.ysize(); - if (!InitialFilesEnd) - ythrow yexception() << "TPowerMerger::Merge: no files added"; - Optimize(maxPortions); - MergeMT(); - InitMerger(Merger, CPortions, Files.ysize()); - } - - void Close() { - TGuard<TMutex> guard(Mutex); - Merger.Close(); - UnlinkFiles(CPortions, Files.ysize()); - InitialFilesEnd = CPortions = 0; - Files.clear(); - } - - const TRec* Next() { - return Merger.Next(); - } - - const TRec* Current() { - return Merger.Current(); - } - - int FileCount() const { - TGuard<TMutex> guard(Mutex); - return Files.ysize(); - } - -private: - void InitMerger(TMerger& merger, int begin, int end) { - TGuard<TMutex> guard(Mutex); - TVector<TSimpleSharedPtr<TInput>> inputs; - for (int i = begin; i < end; ++i) { - inputs.push_back(new TInput("mergeportion-tmpin", BufSize, 0)); - inputs.back()->Open(Files[i]); - // fprintf(stderr, "InitMerger: %i, %s\n", i, ~Files[i]); - } - merger.Init(inputs); - } - - void UnlinkFiles(int begin, int end) { - TGuard<TMutex> guard(Mutex); - for (int i = begin; i < end; ++i) { - if (i >= InitialFilesEnd || AutoUnlink) - unlink(Files[i].c_str()); - } - } - - void Optimize(int maxPortions, size_t maxBufSize = 4u << 20) { - TGuard<TMutex> guard(Mutex); - maxPortions = std::min(maxPortions, Memory / PageSize - 1); - maxBufSize = std::max((size_t)PageSize, maxBufSize); - - if (maxPortions <= 2) { - FPortions = MPortions = 2; - BufSize = PageSize; - return; - } - - int Portions = Files.ysize(); - if (maxPortions >= Portions) { - FPortions = MPortions = Portions; - } else if (((Portions + maxPortions - 1) / maxPortions) <= maxPortions) { - while (((Portions + maxPortions - 1) / maxPortions) <= maxPortions) - --maxPortions; - MPortions = ++maxPortions; - int total = ((Portions + MPortions - 1) / MPortions) + Portions; - FPortions = (total % MPortions) ? (total % MPortions) : (int)MPortions; - } else - FPortions = MPortions = maxPortions; - - BufSize = std::min((size_t)(Memory / (MPortions + 1)), maxBufSize); - // fprintf(stderr, "Optimize: Portions=%i; MPortions=%i; FPortions=%i; Memory=%i; BufSize=%i\n", - // (int)Portions, (int)MPortions, (int)FPortions, (int)Memory, (int)BufSize); - } - - void MergeMT() { - TGuard<TMutex> guard(Mutex); - do { - int n; - while ((n = Files.ysize() - CPortions) > MPortions) { - int m = std::min((CPortions == 0 ? (int)FPortions : (int)MPortions), n); - TString fname = PortionManager->Next(); - if (!MtpQueue->Add(new TMergePortionTask(this, CPortions, CPortions + m, fname))) - ythrow yexception() << "TPowerMerger::MergeMT: failed to add task"; - CPortions += m; - ++Tasks; - } - if (Tasks > 0) - TaskFinishedCond.Wait(Mutex); - } while (Tasks > 0); - } - -private: - TMutex Mutex; - TCondVar TaskFinishedCond; - - TMerger Merger; - TSimpleSharedPtr<TThreadPool> MtpQueue; - TSimpleSharedPtr<TPortionManager> PortionManager; - TVector<TString> Files; - int Tasks = 0; - int InitialFilesEnd = 0; - int CPortions = 0; - int MPortions = 0; - int FPortions = 0; - int Memory = 0; - int PageSize = 0; - int BufSize = 0; - bool AutoUnlink = false; -}; - -// A sorter powered by threads -template < - class TRecord, - template <typename T> class TCompare, - class TSieve = TFakeSieve<TRecord>, - class TTmpInput = TInDatFile<TRecord>, - class TTmpOutput = TOutDatFile<TRecord>> -class TPowerSorter { -public: - typedef TPowerSorter<TRecord, TCompare, TSieve, TTmpInput, TTmpOutput> TSorter; - typedef TRecord TRec; - typedef TTmpOutput TTmpOut; - typedef TTmpInput TTmpIn; - typedef TDatSorterBuf<TRecord, TCompare, TSieve> TSorterBuf; - typedef TCompare<TRecord> TComp; - typedef TPowerMerger<TRecord, TCompare, TSieve, TTmpInput, TTmpOutput> TFileMerger; - - struct TSortPortionTask: public IObjectInQueue { - TSorter* Sorter; - TSorterBuf* SorterBuf; - int Portion; - - TSortPortionTask(TSorter* sorter, TSorterBuf* sorterBuf, int portion) - : Sorter(sorter) - , SorterBuf(sorterBuf) - , Portion(portion) - { - } - - void Process(void*) override { - TAutoPtr<TSortPortionTask> This(this); - // fprintf(stderr, "SortPortion: %i\n", Portion); - Sorter->SortPortion(SorterBuf); - } - }; - - class TSorterBufQueue { - private: - TMutex Mutex; - TCondVar Cond; - TVector<TSimpleSharedPtr<TSorterBuf>> V; - TDeque<TSorterBuf*> Q; - - int Memory, PageSize, MaxSorterBufs; - - public: - TSorterBufQueue(int memory, int pageSize, int maxSorterBufs) - : Memory(memory) - , PageSize(pageSize) - , MaxSorterBufs(maxSorterBufs) - { - } - - void Push(TSorterBuf* sb) { - TGuard<TMutex> guard(Mutex); - sb->Clear(); - Q.push_back(sb); - Cond.Signal(); - } - - TSorterBuf* Pop() { - TGuard<TMutex> guard(Mutex); - if (!Q.size() && V.ysize() < MaxSorterBufs) { - V.push_back(new TSorterBuf(Memory / MaxSorterBufs, PageSize)); - return V.back().Get(); - } else { - while (!Q.size()) - Cond.Wait(Mutex); - TSorterBuf* t = Q.front(); - Q.pop_front(); - return t; - } - } - - void Clear() { - TGuard<TMutex> guard(Mutex); - Q.clear(); - V.clear(); - } - - void WaitAll() { - TGuard<TMutex> guard(Mutex); - while (Q.size() < V.size()) { - Cond.Wait(Mutex); - } - } - - int GetMaxSorterBufs() const { - return MaxSorterBufs; - } - }; - -public: - TPowerSorter(const TSimpleSharedPtr<TThreadPool>& mtpQueue, size_t maxSorterBufs, - const char* name, size_t memory, size_t pageSize, size_t bufSize) - : MaxSorterBufs(maxSorterBufs) - , Name(name) - , Memory(memory) - , PageSize(pageSize) - , BufSize(bufSize) - , MtpQueue(mtpQueue) - , PortionManager(new TPortionManager) - , SBQueue(Memory, PageSize, MaxSorterBufs) - , FileMerger(MtpQueue, PortionManager, Memory, PageSize, true) - { - } - - TPowerSorter(size_t maxSorterBufs, - const char* name, size_t memory, size_t pageSize, size_t bufSize) - : MaxSorterBufs(maxSorterBufs) - , Name(name) - , Memory(memory) - , PageSize(pageSize) - , BufSize(bufSize) - , PortionManager(new TPortionManager) - , SBQueue(Memory, PageSize, maxSorterBufs) - , FileMerger(MtpQueue, PortionManager, Memory, PageSize, true) - { - } - - TPowerSorter(const char* name, size_t memory, size_t pageSize, size_t bufSize) - : MaxSorterBufs(5) - , Name(name) - , Memory(memory) - , PageSize(pageSize) - , BufSize(bufSize) - , PortionManager(new TPortionManager) - , SBQueue(Memory, PageSize, MaxSorterBufs) - , FileMerger(MtpQueue, PortionManager, Memory, PageSize, true) - { - } - - ~TPowerSorter() { - Close(); - } - - void Open(const char* tempDir) { - Close(); - CurSB = SBQueue.Pop(); - PortionManager->Open(tempDir); - } - - void Reopen(const char* fname) { - Open(fname); - } - - void Close() { - CurSB = nullptr; - SBQueue.Clear(); - PortionCount = 0; - FileMerger.Close(); - PortionManager->Close(); - } - - const TRec* Push(const TRec* v) { - CheckOpen("Push"); - const TRec* u = CurSB->Push(v); - if (!u) { - NextPortion(); - u = CurSB->Push(v); - } - return u; - } - - void Sort(int maxPortions = 1000) { - CheckOpen("Sort"); - if (!PortionCount) { - CurSB->Sort(); - } else { - NextPortion(); - SBQueue.Push(CurSB); - CurSB = nullptr; - SBQueue.WaitAll(); - SBQueue.Clear(); - FileMerger.Merge(maxPortions); - } - } - - const TRec* Next() { - return PortionCount ? FileMerger.Next() : CurSB->Next(); - } - - const TRec* Current() { - return PortionCount ? FileMerger.Current() : CurSB->Current(); - } - - int GetBufSize() const { - return BufSize; - } - - int GetPageSize() const { - return PageSize; - } - - const char* GetName() const { - return Name.data(); - } - -private: - void CheckOpen(const char* m) { - if (!CurSB) - ythrow yexception() << "TPowerSorter::" << m << ": the sorter is not open"; - } - - void NextPortion() { - if (!CurSB->Size()) - return; - ++PortionCount; - if (MaxSorterBufs <= 1) { - SortPortion(CurSB); - } else { - if (!MtpQueue.Get()) { - MtpQueue.Reset(new TThreadPool); - MtpQueue->Start(MaxSorterBufs - 1); - FileMerger.SetMtpQueue(MtpQueue); - } - if (!MtpQueue->Add(new TSortPortionTask(this, CurSB, PortionCount))) - ythrow yexception() << "TPowerSorter::NextPortion: failed to add task"; - } - CurSB = SBQueue.Pop(); - } - - void SortPortion(TSorterBuf* sorterBuf) { - TString portionFilename = PortionManager->Next(); - try { - sorterBuf->Sort(); - - // fprintf(stderr, "TPowerSorter::SortPortion: -> %s\n", ~portionFilename); - TTmpOut out("powersorter-portion", PageSize, BufSize, 0); - out.Open(portionFilename.data()); - - while (sorterBuf->Next()) - out.Push(sorterBuf->Current()); - - out.Close(); - FileMerger.Add(portionFilename); - SBQueue.Push(sorterBuf); - } catch (const yexception& e) { - unlink(portionFilename.data()); - ythrow yexception() << "SortPortion: " << e.what(); - } - } - -private: - int MaxSorterBufs = 0; - TString Name; - int Memory = 0; - int PageSize = 0; - int BufSize = 0; - - TMutex Mutex; - TSimpleSharedPtr<TThreadPool> MtpQueue; - TSimpleSharedPtr<TPortionManager> PortionManager; - - TSorterBufQueue SBQueue; - TSorterBuf* CurSB = nullptr; - int PortionCount = 0; - - TFileMerger FileMerger; -}; diff --git a/library/cpp/microbdb/reader.h b/library/cpp/microbdb/reader.h deleted file mode 100644 index 694a2f17662..00000000000 --- a/library/cpp/microbdb/reader.h +++ /dev/null @@ -1,354 +0,0 @@ -#pragma once - -#include "align.h" -#include "header.h" -#include "extinfo.h" - -#include <contrib/libs/zlib/zlib.h> -#include <contrib/libs/fastlz/fastlz.h> -#include <contrib/libs/snappy/snappy.h> - -#include <util/generic/vector.h> -#include <util/memory/tempbuf.h> - -namespace NMicroBDB { - static const size_t DEFAULT_BUFFER_SIZE = (64 << 10); - - //! - template <class TVal> - class IBasePageReader { - public: - virtual size_t GetRecSize() const = 0; - virtual size_t GetExtSize() const = 0; - virtual bool GetExtInfo(typename TExtInfoType<TVal>::TResult* extInfo) const = 0; - virtual const ui8* GetExtInfoRaw(size_t* len) const = 0; - virtual const TVal* Next() = 0; - virtual void Reset() = 0; - //! set clearing flag, so temporary buffers will be cleared - //! in next call of Next() - virtual void SetClearFlag() { - } - - virtual ~IBasePageReader() { - } - }; - - template <class TVal, typename TPageIter> - class TRawPageReader: public IBasePageReader<TVal> { - public: - TRawPageReader(TPageIter* const iter) - : PageIter(iter) - { - Reset(); - } - - bool GetExtInfo(typename TExtInfoType<TVal>::TResult* extInfo) const override { - Y_VERIFY(TExtInfoType<TVal>::Exists, "GetExtInfo should only be used with extended records"); - if (!Rec) - return false; - ui8* raw = (ui8*)Rec + RecSize + ExtLenSize; - return extInfo->ParseFromArray(raw, ExtSize); - } - - size_t GetRecSize() const override { - return RecSize + ExtLenSize; - } - - size_t GetExtSize() const override { - return ExtSize; - } - - const ui8* GetExtInfoRaw(size_t* len) const override { - Y_VERIFY(TExtInfoType<TVal>::Exists, "GetExtInfo should only be used with extended records"); - if (!Rec) { - *len = 0; - return nullptr; - } - *len = ExtLenSize + ExtSize; - return (ui8*)Rec + RecSize; - } - - const TVal* Next() override { - if (!Rec) - Rec = (TVal*)((char*)PageIter->Current() + sizeof(TDatPage)); - else - Rec = (TVal*)((char*)Rec + DatCeil(RecSize + ExtLenSize + ExtSize)); - if (!TExtInfoType<TVal>::Exists) - RecSize = SizeOf(Rec); - else - RecSize = SizeOfExt(Rec, &ExtLenSize, &ExtSize); - return Rec; - } - - void Reset() override { - Rec = nullptr; - RecSize = 0; - ExtLenSize = 0; - ExtSize = 0; - } - - private: - const TVal* Rec; - size_t RecSize; - size_t ExtLenSize; - size_t ExtSize; - TPageIter* const PageIter; - }; - - template <class TVal, typename TPageIter> - class TCompressedReader: public IBasePageReader<TVal> { - inline size_t GetFirstRecordSize(const TVal* const in) const { - if (!TExtInfoType<TVal>::Exists) { - return DatCeil(SizeOf(in)); - } else { - size_t ll; - size_t l; - size_t ret = SizeOfExt(in, &ll, &l); - - return DatCeil(ret + ll + l); - } - } - - void DecompressBlock() { - if (PageIter->IsFrozen() && Buffer.Get()) - Blocks.push_back(Buffer.Release()); - - const TCompressedHeader* hdr = (const TCompressedHeader*)(Page); - - Page += sizeof(TCompressedHeader); - - const size_t first = GetFirstRecordSize((const TVal*)Page); - - if (!Buffer.Get() || Buffer->Size() < hdr->Original) - Buffer.Reset(new TTempBuf(Max<size_t>(hdr->Original, DEFAULT_BUFFER_SIZE))); - - memcpy(Buffer->Data(), Page, first); - Page += first; - - if (hdr->Count > 1) { - switch (Algo) { - case MBDB_COMPRESSION_ZLIB: { - uLongf dst = hdr->Original - first; - - int ret = uncompress((Bytef*)Buffer->Data() + first, &dst, Page, hdr->Compressed); - - if (ret != Z_OK) - ythrow yexception() << "error then uncompress " << ret; - } break; - case MBDB_COMPRESSION_FASTLZ: { - int dst = hdr->Original - first; - int ret = yfastlz_decompress(Page, hdr->Compressed, Buffer->Data() + first, dst); - - if (!ret) - ythrow yexception() << "error then uncompress"; - } break; - case MBDB_COMPRESSION_SNAPPY: { - if (!snappy::RawUncompress((const char*)Page, hdr->Compressed, Buffer->Data() + first)) - ythrow yexception() << "error then uncompress"; - } break; - } - } - - Rec = nullptr; - RecNum = hdr->Count; - Page += hdr->Compressed; - } - - void ClearBuffer() { - for (size_t i = 0; i < Blocks.size(); ++i) - delete Blocks[i]; - Blocks.clear(); - ClearFlag = false; - } - - public: - TCompressedReader(TPageIter* const iter) - : Rec(nullptr) - , RecSize(0) - , ExtLenSize(0) - , ExtSize(0) - , Page(nullptr) - , PageIter(iter) - , RecNum(0) - , BlockNum(0) - , ClearFlag(false) - { - } - - ~TCompressedReader() override { - ClearBuffer(); - } - - size_t GetRecSize() const override { - return RecSize + ExtLenSize; - } - - size_t GetExtSize() const override { - return ExtSize; - } - - bool GetExtInfo(typename TExtInfoType<TVal>::TResult* extInfo) const override { - Y_VERIFY(TExtInfoType<TVal>::Exists, "GetExtInfo should only be used with extended records"); - if (!Rec) - return false; - ui8* raw = (ui8*)Rec + RecSize + ExtLenSize; - return extInfo->ParseFromArray(raw, ExtSize); - } - - const ui8* GetExtInfoRaw(size_t* len) const override { - Y_VERIFY(TExtInfoType<TVal>::Exists, "GetExtInfo should only be used with extended records"); - if (!Rec) { - *len = 0; - return nullptr; - } - *len = ExtLenSize + ExtSize; - return (ui8*)Rec + RecSize; - } - - const TVal* Next() override { - Y_ASSERT(RecNum >= 0); - - if (ClearFlag) - ClearBuffer(); - - if (!Page) { - if (!PageIter->Current()) - return nullptr; - - Page = (ui8*)PageIter->Current() + sizeof(TDatPage); - - BlockNum = ((TCompressedPage*)Page)->BlockCount - 1; - Algo = (ECompressionAlgorithm)((TCompressedPage*)Page)->Algorithm; - Page += sizeof(TCompressedPage); - - DecompressBlock(); - } - - if (!RecNum) { - if (BlockNum <= 0) - return nullptr; - else { - --BlockNum; - DecompressBlock(); - } - } - - --RecNum; - if (!Rec) - Rec = (const TVal*)Buffer->Data(); - else - Rec = (const TVal*)((char*)Rec + DatCeil(RecSize + ExtLenSize + ExtSize)); - - if (!TExtInfoType<TVal>::Exists) - RecSize = SizeOf(Rec); - else - RecSize = SizeOfExt(Rec, &ExtLenSize, &ExtSize); - - return Rec; - } - - void Reset() override { - Page = nullptr; - BlockNum = 0; - Rec = nullptr; - RecSize = 0; - ExtLenSize = 0; - ExtSize = 0; - RecNum = 0; - } - - void SetClearFlag() override { - ClearFlag = true; - } - - public: - THolder<TTempBuf> Buffer; - TVector<TTempBuf*> Blocks; - const TVal* Rec; - size_t RecSize; - size_t ExtLenSize; - size_t ExtSize; - const ui8* Page; - TPageIter* const PageIter; - int RecNum; //!< count of recs in current block - int BlockNum; - ECompressionAlgorithm Algo; - bool ClearFlag; - }; - - class TZLibCompressionImpl { - public: - static const ECompressionAlgorithm Code = MBDB_COMPRESSION_ZLIB; - - inline void Init() { - // - - } - - inline void Term() { - // - - } - - inline size_t CompressBound(size_t size) const noexcept { - return ::compressBound(size); - } - - inline void Compress(void* out, size_t& outSize, const void* in, size_t inSize) { - uLongf size = outSize; - - if (compress((Bytef*)out, &size, (const Bytef*)in, inSize) != Z_OK) - ythrow yexception() << "not compressed"; - outSize = size; - } - }; - - class TFastlzCompressionImpl { - public: - static const ECompressionAlgorithm Code = MBDB_COMPRESSION_FASTLZ; - - inline void Init() { - // - - } - - inline void Term() { - // - - } - - inline size_t CompressBound(size_t size) const noexcept { - size_t rval = size_t(size * 1.07); - return rval < 66 ? 66 : rval; - } - - inline void Compress(void* out, size_t& outSize, const void* in, size_t inSize) { - outSize = yfastlz_compress_level(2, in, inSize, out); - if (!outSize) - ythrow yexception() << "not compressed"; - } - }; - - class TSnappyCompressionImpl { - public: - static const ECompressionAlgorithm Code = MBDB_COMPRESSION_SNAPPY; - - inline void Init() { - // - - } - - inline void Term() { - // - - } - - inline size_t CompressBound(size_t size) const noexcept { - return snappy::MaxCompressedLength(size); - } - - inline void Compress(void* out, size_t& outSize, const void* in, size_t inSize) { - snappy::RawCompress((const char*)in, inSize, (char*)out, &outSize); - } - }; - -} - -using TFakeCompression = void; -using TZLibCompression = NMicroBDB::TZLibCompressionImpl; -using TFastlzCompression = NMicroBDB::TFastlzCompressionImpl; -using TSnappyCompression = NMicroBDB::TSnappyCompressionImpl; diff --git a/library/cpp/microbdb/safeopen.h b/library/cpp/microbdb/safeopen.h deleted file mode 100644 index c328ffd575a..00000000000 --- a/library/cpp/microbdb/safeopen.h +++ /dev/null @@ -1,792 +0,0 @@ -#pragma once - -// util -#include <util/generic/yexception.h> -#include <util/generic/vector.h> -#include <util/string/util.h> -#include <util/system/mutex.h> -#include <thread> - -#include "microbdb.h" - -#if defined(_MSC_VER) -#pragma warning(push) -#pragma warning(disable : 4706) /*assignment within conditional expression*/ -#pragma warning(disable : 4267) /*conversion from 'size_t' to 'type', possible loss of data*/ -#endif - -template <typename TVal, typename TPageFile = TInputPageFile, typename TIterator = TInputPageIterator<TPageFile>> -class TInDatFile: protected TInDatFileImpl<TVal, TInputRecordIterator<TVal, TIterator>> { -public: - typedef TVal TRec; - typedef TInDatFileImpl<TVal, TInputRecordIterator<TVal, TIterator>> TBase; - - TInDatFile(const TString& name, size_t pages, int pagesOrBytes = 1) - : Name(name) - , Pages(pages) - , PagesOrBytes(pagesOrBytes) - { - } - - ~TInDatFile() { - Close(); - } - - void Open(const TString& fname, bool direct = false) { - ui32 gotRecordSig = 0; - int ret = TBase::Open(fname.data(), Pages, PagesOrBytes, &gotRecordSig, direct); - if (ret) { - // XXX: print record type name, not type sig - ythrow yexception() << ErrorMessage(ret, "Failed to open input file", fname, TVal::RecordSig, gotRecordSig); - } - Name = fname; - } - - void OpenStream(TAutoPtr<IInputStream> input) { - ui32 gotRecordSig = 0; - int ret = TBase::Open(input, Pages, PagesOrBytes, &gotRecordSig); - if (ret) { - // XXX: print record type name, not type sig - ythrow yexception() << ErrorMessage(ret, "Failed to open input file", Name, TVal::RecordSig, gotRecordSig); - } - } - - void Close() { - int ret; - if (IsOpen() && (ret = TBase::GetError())) - if (!std::uncaught_exception()) - ythrow yexception() << ErrorMessage(ret, "Error before closing input file", Name); - if ((ret = TBase::Close())) - if (!std::uncaught_exception()) - ythrow yexception() << ErrorMessage(ret, "Error while closing input file", Name); - } - - const char* GetName() const { - return Name.data(); - } - - using TBase::Current; - using TBase::Freeze; - using TBase::GetError; - using TBase::GetExtInfo; - using TBase::GetExtInfoRaw; - using TBase::GetExtSize; - using TBase::GetLastPage; - using TBase::GetPageNum; - using TBase::GetPageSize; - using TBase::GetRecSize; - using TBase::GotoLastPage; - using TBase::GotoPage; - using TBase::IsEof; - using TBase::IsOpen; - using TBase::Next; - using TBase::Skip; - using TBase::Unfreeze; - -protected: - TString Name; - size_t Pages; - int PagesOrBytes; -}; - -template <typename TVal> -class TMappedInDatFile: protected TInDatFileImpl<TVal, TInputRecordIterator<TVal, TMappedInputPageIterator<TMappedInputPageFile>>> { -public: - typedef TVal TRec; - typedef TInDatFileImpl<TVal, TInputRecordIterator<TVal, TMappedInputPageIterator<TMappedInputPageFile>>> TBase; - - TMappedInDatFile(const TString& name, size_t /* pages */, int /* pagesOrBytes */) - : Name(name) - { - } - - ~TMappedInDatFile() { - Close(); - } - - void Open(const TString& fname) { - int ret = TBase::Open(fname.data()); - if (ret) - ythrow yexception() << ErrorMessage(ret, "Failed to open mapped file", fname, TVal::RecordSig); - Name = fname; - } - - void Close() { - int ret; - if (IsOpen() && (ret = TBase::GetError())) - if (!std::uncaught_exception()) - ythrow yexception() << ErrorMessage(ret, "Error before closing mapped file", Name); - if ((ret = TBase::Close())) - if (!std::uncaught_exception()) - ythrow yexception() << ErrorMessage(ret, "Error while closing mapped file", Name); - } - - const char* GetName() const { - return Name.data(); - } - - using TBase::Current; - using TBase::GetError; - using TBase::GetExtInfo; - using TBase::GetExtInfoRaw; - using TBase::GetLastPage; - using TBase::GetPageNum; - using TBase::GetPageSize; - using TBase::GotoLastPage; - using TBase::GotoPage; - using TBase::IsEof; - using TBase::IsOpen; - using TBase::Next; - using TBase::Skip; - -protected: - TString Name; -}; - -template <typename TVal, typename TCompressor = TFakeCompression, typename TPageFile = TOutputPageFile> -class TOutDatFile: protected TOutDatFileImpl<TVal, TOutputRecordIterator<TVal, TOutputPageIterator<TPageFile>, TFakeIndexer, TCompressor>> { -public: - typedef TOutDatFileImpl<TVal, TOutputRecordIterator<TVal, TOutputPageIterator<TPageFile>, TFakeIndexer, TCompressor>> TBase; - - TOutDatFile(const TString& name, size_t pagesize, size_t pages, int pagesOrBytes = 1) - : Name(name) - , PageSize(pagesize) - , Pages(pages) - , PagesOrBytes(pagesOrBytes) - { - } - - ~TOutDatFile() { - Close(); - } - - void Open(const char* fname, bool direct = false) { - int ret = TBase::Open(fname, PageSize, Pages, PagesOrBytes, direct); - if (ret) - ythrow yexception() << ErrorMessage(ret, "Failed to open output file", fname); - Name = fname; - } - - void Open(const TString& fname) { - Open(fname.data()); - } - - void OpenStream(TAutoPtr<IOutputStream> output) { - int ret = TBase::Open(output, PageSize, Pages, PagesOrBytes); - if (ret) - ythrow yexception() << ErrorMessage(ret, "Failed to open output stream", Name); - } - - void Close() { - int ret; - if ((ret = TBase::GetError())) - if (!std::uncaught_exception()) - ythrow yexception() << ErrorMessage(ret, "Error before closing output file", Name); - if ((ret = TBase::Close())) - if (!std::uncaught_exception()) - ythrow yexception() << ErrorMessage(ret, "Error while closing output file", Name); - } - - const char* GetName() const { - return Name.data(); - } - - using TBase::Freeze; - using TBase::GetError; - using TBase::GetPageSize; - using TBase::IsEof; - using TBase::IsOpen; - using TBase::Offset; - using TBase::Push; - using TBase::PushWithExtInfo; - using TBase::Reserve; - using TBase::Unfreeze; - -protected: - TString Name; - size_t PageSize, Pages; - int PagesOrBytes; -}; - -template <typename TVal, typename TCompressor, typename TPageFile> -class TOutDatFileArray; - -template <typename TVal, typename TCompressor = TFakeCompression, typename TPageFile = TOutputPageFile> -class TOutDatFileArray { - typedef TOutDatFile<TVal, TCompressor, TPageFile> TFileType; - -public: - TOutDatFileArray(const TString& name, size_t pagesize, size_t pages, int pagesOrBytes = 1) - : Name(name) - , PageSize(pagesize) - , Pages(pages) - , PagesOrBytes(pagesOrBytes) - , NumFiles(0) - , Files(nullptr) - { - } - - ~TOutDatFileArray() { - for (int i = 0; i < NumFiles; ++i) { - Files[i].Close(); - Files[i].~TFileType(); - } - free(Files); - Files = nullptr; - NumFiles = 0; - } - - TFileType& operator[](size_t pos) { - return Files[pos]; - } - - void Open(int n, const TString& fname) { - char temp[FILENAME_MAX]; - - Name = fname; - NumFiles = CreateDatObjects(n, fname); - - int i; - try { - for (i = 0; i < NumFiles; ++i) { - sprintf(temp, fname.data(), i); - Files[i].Open(temp); - } - } catch (...) { - while (--i >= 0) - Files[i].Close(); - throw; - } - } - - template <typename TNameBuilder> - void OpenWithCallback(int n, const TNameBuilder& builder) { - NumFiles = CreateDatObjects(n, Name); - - for (int i = 0; i < NumFiles; ++i) - Files[i].Open(builder.GetName(i).data()); - } - - void Close() { - for (int i = 0; i < NumFiles; ++i) - Files[i].Close(); - } - - void CloseMT(ui32 threads) { - int current = 0; - TMutex mutex; - TVector<std::thread> thrs; - thrs.reserve(threads); - for (ui32 i = 0; i < threads; i++) { - thrs.emplace_back([this, ¤t, &mutex]() { - while (true) { - mutex.Acquire(); - int cur = current++; - mutex.Release(); - if (cur >= NumFiles) - break; - Files[cur].Close(); - } - }); - } - for (auto& thread : thrs) { - thread.join(); - } - } - - const char* GetName() const { - return Name.data(); - } - -protected: - int CreateDatObjects(int n, const TString& fname) { - if (!(Files = (TFileType*)malloc(n * sizeof(TFileType)))) - ythrow yexception() << "can't alloc \"" << fname << "\" file array: " << LastSystemErrorText(); - int num = 0; - char temp[FILENAME_MAX]; - for (int i = 0; i < n; ++i, ++num) { - sprintf(temp, "%s[%d]", fname.data(), i); - new (Files + i) TFileType(temp, PageSize, Pages, PagesOrBytes); - } - return num; - } - - TString Name; - size_t PageSize, Pages; - int PagesOrBytes, NumFiles; - TFileType* Files; -}; - -template <typename TVal, typename TKey, typename TCompressor = TFakeCompression, typename TPageFile = TOutputPageFile> -class TOutDirectFile: protected TOutDirectFileImpl<TVal, TKey, TCompressor, TPageFile> { - typedef TOutDirectFileImpl<TVal, TKey, TCompressor, TPageFile> TBase; - -public: - TOutDirectFile(const TString& name, size_t pagesize, size_t pages, size_t ipagesize, size_t ipages, int pagesOrBytes) - : Name(name) - , PageSize(pagesize) - , Pages(pages) - , IdxPageSize(ipagesize) - , IdxPages(ipages) - , PagesOrBytes(pagesOrBytes) - { - } - - ~TOutDirectFile() { - Close(); - } - - void Open(const TString& fname) { - int ret = TBase::Open(fname.data(), PageSize, Pages, IdxPageSize, IdxPages, PagesOrBytes); - if (ret) - ythrow yexception() << ErrorMessage(ret, "Failed to open output file", fname); - Name = fname; - } - - void Close() { - int ret; - if ((ret = TBase::GetError())) - if (!std::uncaught_exception()) - ythrow yexception() << ErrorMessage(ret, "Error before closing output file", Name); - if ((ret = TBase::Close())) - if (!std::uncaught_exception()) - ythrow yexception() << ErrorMessage(ret, "Error while closing output file", Name); - } - - const char* GetName() const { - return Name.data(); - } - - using TBase::Freeze; - using TBase::Push; - using TBase::PushWithExtInfo; - using TBase::Reserve; - using TBase::Unfreeze; - -protected: - TString Name; - size_t PageSize, Pages, IdxPageSize, IdxPages; - int PagesOrBytes; -}; - -template < - typename TVal, - template <typename T> class TComparer, - typename TCompress = TFakeCompression, - typename TSieve = TFakeSieve<TVal>, - typename TPageFile = TOutputPageFile, - typename TFileTypes = TDefInterFileTypes> -class TDatSorter: protected TDatSorterImpl<TVal, TComparer<TVal>, TCompress, TSieve, TPageFile, TFileTypes> { - typedef TDatSorterImpl<TVal, TComparer<TVal>, TCompress, TSieve, TPageFile, TFileTypes> TBase; - -public: - typedef TVal TRec; - -public: - TDatSorter(const TString& name, size_t memory, size_t pagesize, size_t pages, int pagesOrBytes = 1) - : Name(name) - , Memory(memory) - , PageSize(pagesize) - , Pages(pages) - , PagesOrBytes(pagesOrBytes) - { - Templ[0] = 0; - } - - ~TDatSorter() { - Close(); - Templ[0] = 0; - } - - void Open(const TString& dirName) { - int ret; - if (ret = MakeSorterTempl(Templ, dirName.data())) { - Templ[0] = 0; - ythrow yexception() << ErrorMessage(ret, Name + " sorter: bad tempdir", dirName); - } - if ((ret = TBase::Open(Templ, PageSize, Pages, PagesOrBytes))) - ythrow yexception() << ErrorMessage(ret, Name + " sorter: open error, temp dir", Templ); - } - - void Sort(bool direct = false) { - int ret = TBase::Sort(Memory, 1000, direct); - if (ret) - ythrow yexception() << ErrorMessage(ret, Name + " sorter: sort error, temp dir", Templ, TVal::RecordSig); - } - - void SortToFile(const TString& name) { - int ret = TBase::SortToFile(name.data(), Memory); - if (ret) - ythrow yexception() << ErrorMessage(ret, Name + "sorter: error in SortToFile", name, TVal::RecordSig); - } - - void SortToStream(TAutoPtr<IOutputStream> output) { - int ret = TBase::SortToStream(output, Memory); - if (ret) - ythrow yexception() << ErrorMessage(ret, Name + "sorter: error in SortToStream", "", TVal::RecordSig); - } - - void Close() { - int ret1 = TBase::GetError(); - int ret2 = TBase::Close(); - if (Templ[0]) { - *strrchr(Templ, GetDirectorySeparator()) = 0; - RemoveDirWithContents(Templ); - Templ[0] = 0; - } - if (ret1) - if (!std::uncaught_exception()) - ythrow yexception() << ErrorMessage(ret1, Name + "sorter: error before closing"); - if (ret2) - if (!std::uncaught_exception()) - ythrow yexception() << ErrorMessage(ret2, Name + "sorter: error while closing"); - } - - int Sort(size_t memory, int maxportions, bool direct = false) { - return TBase::Sort(memory, maxportions, direct); - } - - const char* GetName() const { - return Name.data(); - } - - using TBase::GetPageSize; - using TBase::GetPages; - using TBase::Next; - using TBase::NextPortion; - using TBase::Push; - using TBase::PushWithExtInfo; - using TBase::UseSegmentSorter; - -protected: - TString Name; - size_t Memory, PageSize, Pages; - int PagesOrBytes; - char Templ[FILENAME_MAX]; -}; - -template <typename TSorter> -class TSorterArray { -public: - typedef TSorter TDatSorter; - -public: - TSorterArray(const TString& name, size_t memory, size_t pagesize, size_t pages, int pagesOrBytes = 1) - : Name(name) - , Memory(memory) - , PageSize(pagesize) - , Pages(pages) - , PagesOrBytes(pagesOrBytes) - , NumSorters(0) - , Sorters(nullptr) - { - } - - ~TSorterArray() { - for (int i = 0; i < NumSorters; ++i) { - Sorters[i].Close(); - Sorters[i].~TSorter(); - } - free(Sorters); - Sorters = nullptr; - NumSorters = 0; - } - - TSorter& operator[](size_t pos) { - return Sorters[pos]; - } - - void Open(int n, const TString& fname, size_t memory = 0) { - if (!(Sorters = (TSorter*)malloc(n * sizeof(TSorter)))) - ythrow yexception() << "can't alloc \"" << fname << "\" sorter array: " << LastSystemErrorText(); - NumSorters = n; - char temp[FILENAME_MAX]; - if (memory) - Memory = memory; - for (int i = 0; i < NumSorters; ++i) { - sprintf(temp, "%s[%d]", Name.data(), i); - new (Sorters + i) TSorter(temp, Memory, PageSize, Pages, PagesOrBytes); - } - for (int i = 0; i < NumSorters; ++i) - Sorters[i].Open(fname); - } - - void Close() { - for (int i = 0; i < NumSorters; ++i) - Sorters[i].Close(); - } - - const char* GetName() const { - return Name.data(); - } - -protected: - TString Name; - size_t Memory, PageSize, Pages; - int PagesOrBytes, NumSorters; - TSorter* Sorters; -}; - -template <typename TVal, template <typename T> class TCompare, typename TSieve = TFakeSieve<TVal>> -class TDatSorterArray: public TSorterArray<TDatSorter<TVal, TCompare, TSieve>> { -public: - TDatSorterArray(const char* name, size_t memory, size_t pagesize, size_t pages, int pagesOrBytes = 1) - : TSorterArray<TDatSorter<TVal, TCompare, TSieve>>(name, memory, pagesize, pages, pagesOrBytes) - { - } -}; - -template <typename TVal, template <typename T> class TCompare, typename TCompress = TFakeCompression, - typename TSieve = TFakeSieve<TVal>, typename TPageFile = TOutputPageFile, typename TFileTypes = TDefInterFileTypes> -class TDatSorterMemo: public TDatSorter<TVal, TCompare, TCompress, TSieve, TPageFile, TFileTypes> { - typedef TDatSorter<TVal, TCompare, TCompress, TSieve, TPageFile, TFileTypes> TSorter; - -public: - TOutDatFile<TVal> Memo; - TString Home; - bool OpenReq; - bool Opened; - bool UseDirectWrite; - -public: - TDatSorterMemo(const char* name, size_t memory, size_t pagesize, size_t pages, int pagesOrBytes = 1) - : TSorter(name, memory, pagesize, pages, pagesOrBytes) - , Memo(name, pagesize, memory, 0) - { - OpenReq = false; - Opened = false; - UseDirectWrite = false; - } - - void Open(const TString& home) { - OpenReq = true; - // TSorter::Open(home); - Home = home; - Memo.Open(nullptr); - Memo.Freeze(); - } - - void Reopen(const char* home) { - Close(); - Open(home); - } - - void Open() { - if (!OpenReq) { - OpenReq = true; - Memo.Open(nullptr); - Memo.Freeze(); - } - } - - void OpenIfNeeded() { - if (OpenReq && !Opened) { - if (!Home) - ythrow yexception() << "Temp directory not specified, call Open(char*) first : " << TSorter::Name; - TSorter::Open(Home); - Opened = true; - } - } - - TVal* Reserve(size_t len) { - if (TExtInfoType<TVal>::Exists) - return ReserveWithExt(len, 0); - - TVal* u = Memo.Reserve(len); - if (!u) { - OpenIfNeeded(); - TSorter::NextPortion(UseDirectWrite); - Memo.Freeze(); - u = Memo.Reserve(len); - } - TSorter::PushWithExtInfo(u); - return u; - } - - TVal* ReserveWithExt(size_t len, size_t extSize) { - size_t fullLen = len + len_long((i64)extSize) + extSize; - TVal* u = Memo.Reserve(fullLen); - if (!u) { - OpenIfNeeded(); - TSorter::NextPortion(UseDirectWrite); - Memo.Freeze(); - u = Memo.Reserve(fullLen); - if (!u) { - if (fullLen > Memo.GetPageSize()) { - ythrow yexception() << "Size of element and " << len << " size of extInfo " << extSize - << " is larger than page size " << Memo.GetPageSize(); - } - ythrow yexception() << "going to insert a null pointer. Bad."; - } - } - out_long((i64)extSize, (char*)u + len); - TSorter::PushWithExtInfo(u); - return u; - } - - char* GetReservedExt(TVal* rec, size_t len, size_t extSize) { - return (char*)rec + len + len_long((i64)extSize); - } - - const TVal* Push(const TVal* v, const typename TExtInfoType<TVal>::TResult* extInfo = nullptr) { - const TVal* u = Memo.Push(v, extInfo); - if (!u) { - OpenIfNeeded(); - TSorter::NextPortion(UseDirectWrite); - Memo.Freeze(); - u = Memo.Push(v, extInfo); - if (!u) { - if (SizeOf(v) > Memo.GetPageSize()) { - ythrow yexception() << "Size of element " << SizeOf(v) - << " is larger than page size " << Memo.GetPageSize(); - } - ythrow yexception() << "going to insert a null pointer. Bad."; - } - } - TSorter::PushWithExtInfo(u); - return u; - } - - const TVal* Push(const TVal* v, const ui8* extInfoRaw, size_t extLen) { - const TVal* u = Memo.Push(v, extInfoRaw, extLen); - if (!u) { - OpenIfNeeded(); - TSorter::NextPortion(UseDirectWrite); - Memo.Freeze(); - u = Memo.Push(v, extInfoRaw, extLen); - if (!u) { - if (SizeOf(v) > Memo.GetPageSize()) { - ythrow yexception() << "Size of element " << SizeOf(v) - << " is larger than page size " << Memo.GetPageSize(); - } - ythrow yexception() << "going to insert a null pointer. Bad.."; - } - } - TSorter::PushWithExtInfo(u); - return u; - } - - const TVal* PushWithExtInfo(const TVal* v) { - const TVal* u = Memo.PushWithExtInfo(v); - if (!u) { - OpenIfNeeded(); - TSorter::NextPortion(UseDirectWrite); - Memo.Freeze(); - u = Memo.PushWithExtInfo(v); - if (!u) { - if (SizeOf(v) > Memo.GetPageSize()) { - ythrow yexception() << "Size of element " << SizeOf(v) - << " is larger than page size " << Memo.GetPageSize(); - } - ythrow yexception() << "going to insert a null pointer. Bad..."; - } - } - TSorter::PushWithExtInfo(u); - return u; - } - - void Sort(bool direct = false) { - if (Opened) { - TSorter::NextPortion(UseDirectWrite); - Memo.Close(); - OpenReq = false; - TSorter::Sort(direct); - } else { - TSorter::SortPortion(); - } - } - - const TVal* Next() { - return Opened ? TSorter::Next() : TSorter::Nextp(); - } - - bool GetExtInfo(typename TExtInfoType<TVal>::TResult* extInfo) const { - return NMicroBDB::GetExtInfo(Current(), extInfo); - } - - const ui8* GetExtInfoRaw(size_t* len) const { - return NMicroBDB::GetExtInfoRaw(Current(), len); - } - - const TVal* Current() const { - return Opened ? TSorter::Current() : TSorter::Currentp(); - } - - int NextPortion() { - OpenIfNeeded(); - return TSorter::NextPortion(UseDirectWrite); - } - - void SortToFile(const char* name) { - OpenIfNeeded(); - TSorter::NextPortion(UseDirectWrite); - Memo.Close(); - OpenReq = false; - TSorter::SortToFile(name); - } - - void SortToStream(TAutoPtr<IOutputStream> output) { - OpenIfNeeded(); - TSorter::NextPortion(UseDirectWrite); - Memo.Close(); - OpenReq = false; - TSorter::SortToStream(output); - } - - template <typename TKey, typename TOutCompress> - void SortToDirectFile(const char* name, size_t ipagesize, size_t ipages) { - Sort(); - TOutDirectFile<TVal, TKey, TOutCompress> out(TSorter::Name, TSorter::PageSize, TSorter::Pages, ipagesize, ipages, TSorter::PagesOrBytes); - out.Open(name); - while (const TVal* rec = Next()) - out.PushWithExtInfo(rec); - out.Close(); - } - - template <typename TKey> - void SortToDirectFile(const char* name, size_t ipagesize, size_t ipages) { - SortToDirectFile<TKey, TCompress>(name, ipagesize, ipages); - } - - void CloseSorter() { - if (Opened) - TSorter::Close(); - else - TSorter::Closep(); - Memo.Freeze(); - Opened = false; - } - - void Close() { - if (Opened) - TSorter::Close(); - else - TSorter::Closep(); - Memo.Close(); - OpenReq = false; - Opened = false; - } - - int SavePortions(const char* mask) { - return TSorter::SavePortions(mask, UseDirectWrite); - } - -public: - using TSorter::RestorePortions; -}; - -template <typename TVal, template <typename T> class TCompare, typename TCompress = TFakeCompression, - typename TSieve = TFakeSieve<TVal>, class TPageFile = TOutputPageFile, class TFileTypes = TDefInterFileTypes> -class TDatSorterMemoArray: public TSorterArray<TDatSorterMemo<TVal, TCompare, TCompress, TSieve, TPageFile, TFileTypes>> { -public: - typedef TSorterArray<TDatSorterMemo<TVal, TCompare, TCompress, TSieve, TPageFile, TFileTypes>> TBase; - - TDatSorterMemoArray(const char* name, size_t memory, size_t pagesize, size_t pages, int pagesOrBytes = 1) - : TBase(name, memory, pagesize, pages, pagesOrBytes) - { - } -}; - -#if defined(_MSC_VER) -#pragma warning(pop) -#endif diff --git a/library/cpp/microbdb/sorter.h b/library/cpp/microbdb/sorter.h deleted file mode 100644 index b2e7390377d..00000000000 --- a/library/cpp/microbdb/sorter.h +++ /dev/null @@ -1,677 +0,0 @@ -#pragma once - -#include <util/ysaveload.h> -#include <util/generic/algorithm.h> -#include <contrib/libs/libc_compat/include/link/link.h> - -#include "header.h" -#include "heap.h" -#include "extinfo.h" -#include "input.h" -#include "output.h" - -#ifdef TEST_MERGE -#define MBDB_SORT_FUN ::StableSort -#else -#define MBDB_SORT_FUN ::Sort -#endif - -template <class TVal, class TCompare, typename TCompress, typename TSieve, typename TOutPageFile, typename TFileTypes> -class TDatSorterImpl; - -template <class TVal> -struct TFakeSieve { - static inline int Sieve(TVal*, const TVal*) noexcept { - return 0; - } -}; - -template <class TSieve> -struct TIsSieveFake { - static const bool Result = false; -}; - -template <class T> -struct TIsSieveFake<TFakeSieve<T>> { - static const bool Result = true; -}; - -class TDefInterFileTypes { -public: - typedef TOutputPageFile TOutPageFile; - typedef TInputPageFile TInPageFile; -}; - -//class TCompressedInterFileTypes; - -template <class TVal, class TCompare, typename TCompress, typename TSieve, typename TOutPageFile = TOutputPageFile, typename TFileTypes = TDefInterFileTypes> -class TDatSorterImplBase: protected THeapIter<TVal, TInDatFileImpl<TVal, TInputRecordIterator<TVal, TInputPageIterator<typename TFileTypes::TInPageFile>>>, TCompare> { - typedef TOutputRecordIterator<TVal, TOutputPageIterator<typename TFileTypes::TOutPageFile>, TFakeIndexer, TCompress> TTmpRecIter; - typedef TInputRecordIterator<TVal, TInputPageIterator<typename TFileTypes::TInPageFile>> TInTmpRecIter; - -public: - typedef TOutDatFileImpl<TVal, TTmpRecIter> TTmpOut; - typedef TInDatFileImpl<TVal, TInTmpRecIter> TTmpIn; - - typedef TOutDatFileImpl<TVal, TOutputRecordIterator<TVal, TOutputPageIterator<TOutPageFile>, TFakeIndexer, TCompress>> TOut; - typedef THeapIter<TVal, TTmpIn, TCompare> TMyHeap; - typedef TVector<const TVal*> TMyVector; - typedef typename TMyVector::iterator TMyIterator; - - class IPortionSorter { - public: - virtual ~IPortionSorter() { - } - - virtual void Sort(TMyVector&, TTmpOut*) = 0; - }; - - class TDefaultSorter: public IPortionSorter { - public: - void Sort(TMyVector& vector, TTmpOut* out) override { - MBDB_SORT_FUN(vector.begin(), vector.end(), TCompare()); - - const typename TMyVector::const_iterator - end = (TIsSieveFake<TSieve>::Result) ? vector.end() : TDatSorterImplBase::SieveRange(vector.begin(), vector.end()); - - for (typename TMyVector::const_iterator it = vector.begin(); it != end; ++it) { - out->PushWithExtInfo(*it); - } - } - }; - - class TSegmentedSorter: public IPortionSorter { - class TAdaptor { - typedef typename TMyVector::const_iterator TConstIterator; - - public: - TAdaptor(TConstIterator b, TConstIterator e) - : Curr_(b) - , End_(e) - { - --Curr_; - } - - inline const TVal* Current() const { - return *Curr_; - } - - inline const TVal* Next() { - ++Curr_; - - if (Curr_ == End_) { - return nullptr; - } - - return *Curr_; - } - - private: - TConstIterator Curr_; - TConstIterator End_; - }; - - typedef THeapIter<TVal, TAdaptor, TCompare> TPortionsHeap; - - public: - void Sort(TMyVector& vector, TTmpOut* out) override { - TVector<TAdaptor> bounds; - typename TMyVector::iterator - it = vector.begin(); - const size_t portions = Max<size_t>(1, (vector.size() * sizeof(TVal)) / (4 << 20)); - const size_t step = vector.size() / portions; - - // Sort segments - while (it != vector.end()) { - const typename TMyVector::iterator - end = Min(it + step, vector.end()); - - MBDB_SORT_FUN(it, end, TCompare()); - - bounds.push_back(TAdaptor(it, end)); - - it = end; - } - - // - // Merge result - // - - TPortionsHeap heap(bounds); - - if (TIsSieveFake<TSieve>::Result) { - while (const TVal* val = heap.Next()) { - out->PushWithExtInfo(val); - } - } else { - const TVal* val = heap.Next(); - const TVal* prev = out->PushWithExtInfo(val); - - for (val = heap.Next(); val && prev; val = heap.Next()) { - if (TSieve::Sieve((TVal*)prev, val)) { - continue; - } - - prev = out->PushWithExtInfo(val); - } - - if (prev) { - TSieve::Sieve((TVal*)prev, prev); - } - } - } - }; - -public: - TDatSorterImplBase() - : Sorter(new TDefaultSorter) - { - InFiles = nullptr; - TempBuf = nullptr; - Ptr = Vector.end(); - Cur = nullptr; - Portions = CPortions = Error = 0; - } - - ~TDatSorterImplBase() { - Close(); - } - - int Open(const char* templ, size_t pagesize, size_t pages, int pagesOrBytes = 1) { - Portions = CPortions = Error = 0; - TempBuf = strdup(templ); - Pagesize = pagesize; - if (pagesOrBytes) - Pages = pages; - else - Pages = pages / pagesize; - Pages = Max(1, Pages); - return 0; - } - - void Push(const TVal* v) { - // Serialized extInfo must follow a record being pushed, therefore, to avoid - // unintentional misusage (as if when you are adding TExtInfo in your record - // type: you may forget to check your sorting routines and get a segfault as - // a result). - // PushWithExtInfo(v) should be called on records with extInfo. - static_assert(!TExtInfoType<TVal>::Exists, "expect !TExtInfoType<TVal>::Exists"); - - Vector.push_back(v); - } - - void PushWithExtInfo(const TVal* v) { - Vector.push_back(v); - } - - int SortPortion() { - Ptr = Vector.end(); - Cur = nullptr; - if (!Vector.size() || Error) - return Error; - - MBDB_SORT_FUN(Vector.begin(), Vector.end(), TCompare()); - - if (!TIsSieveFake<TSieve>::Result) { - const typename TMyVector::iterator - end = SieveRange(Vector.begin(), Vector.end()); - - Vector.resize(end - Vector.begin()); - } - - Ptr = Vector.begin(); - Cur = nullptr; - return 0; - } - - const TVal* Nextp() { - Cur = Ptr == Vector.end() ? nullptr : *Ptr++; - return Cur; - } - - const TVal* Currentp() const { - return Cur; - } - - void Closep() { - Vector.clear(); - Ptr = Vector.end(); - Cur = nullptr; - } - - int NextPortion(bool direct = false) { - if (!Vector.size() || Error) - return Error; - - TTmpOut out; - int ret, ret1; - char fname[FILENAME_MAX]; - - snprintf(fname, sizeof(fname), TempBuf, Portions++); - if ((ret = out.Open(fname, Pagesize, Pages, 1, direct))) - return Error = ret; - - Sorter->Sort(Vector, &out); - - Vector.erase(Vector.begin(), Vector.end()); - ret = out.GetError(); - ret1 = out.Close(); - Error = Error ? Error : ret ? ret : ret1; - if (Error) - unlink(fname); - return Error; - } - - int SavePortions(const char* mask, bool direct = false) { - char srcname[PATH_MAX], dstname[PATH_MAX]; - if (Vector.size()) - NextPortion(direct); - for (int i = 0; i < Portions; i++) { - char num[10]; - sprintf(num, "%i", i); - snprintf(srcname, sizeof(srcname), TempBuf, i); - snprintf(dstname, sizeof(dstname), mask, num); - int res = rename(srcname, dstname); - if (res) - return res; - } - snprintf(dstname, sizeof(dstname), mask, "count"); - TOFStream fcount(dstname); - Save(&fcount, Portions); - fcount.Finish(); - return 0; - } - - int RestorePortions(const char* mask) { - char srcname[PATH_MAX], dstname[PATH_MAX]; - snprintf(srcname, sizeof(srcname), mask, "count"); - TIFStream fcount(srcname); - Load(&fcount, Portions); - for (int i = 0; i < Portions; i++) { - char num[10]; - sprintf(num, "%i", i); - snprintf(dstname, sizeof(dstname), TempBuf, i); - snprintf(srcname, sizeof(srcname), mask, num); - unlink(dstname); - int res = link(srcname, dstname); - if (res) - return res; - } - return 0; - } - - int RestorePortions(const char* mask, ui32 count) { - char srcname[PATH_MAX], dstname[PATH_MAX]; - ui32 portions; - TVector<ui32> counts; - for (ui32 j = 0; j < count; j++) { - snprintf(srcname, sizeof(srcname), mask, j, "count"); - TIFStream fcount(srcname); - Load(&fcount, portions); - counts.push_back(portions); - Portions += portions; - } - ui32 p = 0; - for (ui32 j = 0; j < count; j++) { - int cnt = counts[j]; - for (int i = 0; i < cnt; i++, p++) { - char num[10]; - sprintf(num, "%i", i); - snprintf(dstname, sizeof(dstname), TempBuf, p); - snprintf(srcname, sizeof(srcname), mask, j, num); - unlink(dstname); - int res = link(srcname, dstname); - if (res) { - fprintf(stderr, "Can not link %s to %s\n", srcname, dstname); - return res; - } - } - } - return 0; - } - - int Sort(size_t memory, int maxportions = 1000, bool direct = false) { - int ret, end, beg, i; - char fname[FILENAME_MAX]; - - if (Vector.size()) - NextPortion(); - - if (Error) - return Error; - if (!Portions) { - TMyHeap::Init(&DummyFile, 1); // closed file - HPages = 1; - return 0; - } - - Optimize(memory, maxportions); - if (!(InFiles = new TTmpIn[MPortions])) - return MBDB_NO_MEMORY; - - for (beg = 0; beg < Portions && !Error; beg = end) { - end = (int)Min(beg + FPortions, Portions); - for (i = beg; i < end && !Error; i++) { - snprintf(fname, sizeof(fname), TempBuf, i); - if ((ret = InFiles[i - beg].Open(fname, HPages, 1, nullptr, direct))) - Error = Error ? Error : ret; - } - if (Error) - return Error; - TMyHeap::Init(InFiles, end - beg); - if (end != Portions) { - TTmpOut out; - const TVal* v; - snprintf(fname, sizeof(fname), TempBuf, Portions++); - if ((ret = out.Open(fname, Pagesize, HPages))) - return Error = Error ? Error : ret; - while ((v = TMyHeap::Next())) - out.PushWithExtInfo(v); - ret = out.GetError(); - Error = Error ? Error : ret; - ret = out.Close(); - Error = Error ? Error : ret; - for (i = beg; i < end; i++) { - ret = InFiles[i - beg].Close(); - Error = Error ? Error : ret; - snprintf(fname, sizeof(fname), TempBuf, CPortions++); - unlink(fname); - } - } - FPortions = MPortions; - } - return Error; - } - - int Close() { - char fname[FILENAME_MAX]; - delete[] InFiles; - InFiles = nullptr; - Closep(); - for (int i = CPortions; i < Portions; i++) { - snprintf(fname, sizeof(fname), TempBuf, i); - unlink(fname); - } - CPortions = Portions = 0; - free(TempBuf); - TempBuf = nullptr; - return Error; - } - - void UseSegmentSorter() { - Sorter.Reset(new TSegmentedSorter); - } - - inline int GetError() const { - return Error; - } - - inline int GetPages() const { - return Pages; - } - - inline int GetPageSize() const { - return Pagesize; - } - -private: - static TMyIterator SieveRange(const TMyIterator begin, const TMyIterator end) { - TMyIterator it = begin; - TMyIterator prev = begin; - - for (++it; it != end; ++it) { - if (TSieve::Sieve((TVal*)*prev, *it)) { - continue; - } - - ++prev; - - if (it != prev) { - *prev = *it; - } - } - - TSieve::Sieve((TVal*)*prev, *prev); - - return ++prev; - } - -protected: - void Optimize(size_t memory, int maxportions, size_t fbufmax = 256u << 20) { - maxportions = (int)Min((size_t)maxportions, memory / Pagesize) - 1; - size_t maxpages = Max((size_t)1u, fbufmax / Pagesize); - - if (maxportions <= 2) { - FPortions = MPortions = 2; - HPages = 1; - return; - } - if (maxportions >= Portions) { - FPortions = MPortions = Portions; - HPages = (int)Min(memory / ((Portions + 1) * Pagesize), maxpages); - return; - } - if (((Portions + maxportions - 1) / maxportions) <= maxportions) { - while (((Portions + maxportions - 1) / maxportions) <= maxportions) - --maxportions; - MPortions = ++maxportions; - int total = ((Portions + maxportions - 1) / maxportions) + Portions; - FPortions = (total % maxportions) ? (total % maxportions) : MPortions; - HPages = (int)Min(memory / ((MPortions + 1) * Pagesize), maxpages); - return; - } - FPortions = MPortions = maxportions; - HPages = (int)Min(memory / ((MPortions + 1) * Pagesize), maxpages); - } - - TMyVector Vector; - typename TMyVector::iterator Ptr; - const TVal* Cur; - TTmpIn *InFiles, DummyFile; - char* TempBuf; - int Portions, CPortions, Pagesize, Pages, Error; - int FPortions, MPortions, HPages; - THolder<IPortionSorter> Sorter; -}; - -template <class TVal, class TCompare, typename TCompress> -class TDatSorterImpl<TVal, TCompare, TCompress, TFakeSieve<TVal>, TOutputPageFile, TDefInterFileTypes> - : public TDatSorterImplBase<TVal, TCompare, TCompress, TFakeSieve<TVal>, TOutputPageFile, TDefInterFileTypes> { - typedef TDatSorterImplBase<TVal, TCompare, TCompress, TFakeSieve<TVal>, TOutputPageFile, TDefInterFileTypes> TBase; - -public: - int SortToFile(const char* name, size_t memory, int maxportions = 1000) { - int ret = TBase::Sort(memory, maxportions); - if (ret) - return ret; - typename TBase::TOut out; - if ((ret = out.Open(name, TBase::Pagesize, TBase::HPages))) - return ret; - const TVal* rec; - while ((rec = Next())) - out.PushWithExtInfo(rec); - if ((ret = out.GetError())) - return ret; - if ((ret = out.Close())) - return ret; - if ((ret = TBase::Close())) - return ret; - return 0; - } - - int SortToStream(TAutoPtr<IOutputStream> output, size_t memory, int maxportions = 1000) { - int ret = TBase::Sort(memory, maxportions); - if (ret) - return ret; - typename TBase::TOut out; - if ((ret = out.Open(output, TBase::Pagesize, TBase::HPages))) - return ret; - const TVal* rec; - while ((rec = Next())) - out.PushWithExtInfo(rec); - if ((ret = out.GetError())) - return ret; - if ((ret = out.Close())) - return ret; - if ((ret = TBase::Close())) - return ret; - return 0; - } - - const TVal* Next() { - return TBase::TMyHeap::Next(); - } - - const TVal* Current() const { - return TBase::TMyHeap::Current(); - } - - bool GetExtInfo(typename TExtInfoType<TVal>::TResult* extInfo) const { - return TBase::TMyHeap::GetExtInfo(extInfo); - } - - const ui8* GetExtInfoRaw(size_t* len) const { - return TBase::TMyHeap::GetExtInfoRaw(len); - } -}; - -template <class TVal, class TCompare, typename TCompress, typename TSieve, - typename TOutPageFile = TOutputPageFile, typename TFileTypes = TDefInterFileTypes> -class TDatSorterImpl: public TDatSorterImplBase<TVal, TCompare, TCompress, TSieve, TOutPageFile, TFileTypes> { - typedef TDatSorterImplBase<TVal, TCompare, TCompress, TSieve, TOutPageFile, TFileTypes> TBase; - -public: - TDatSorterImpl() - : Cur(nullptr) - , Prev(nullptr) - { - } - - int SortToFile(const char* name, size_t memory, int maxportions = 1000) { - int ret = Sort(memory, maxportions); - if (ret) - return ret; - typename TBase::TOut out; - if ((ret = out.Open(name, TBase::Pagesize, TBase::HPages))) - return ret; - const TVal* rec; - while ((rec = Next())) - out.PushWithExtInfo(rec); - if ((ret = out.GetError())) - return ret; - if ((ret = out.Close())) - return ret; - if ((ret = TBase::Close())) - return ret; - return 0; - } - - int SortToStream(TAutoPtr<IOutputStream> output, size_t memory, int maxportions = 1000) { - int ret = Sort(memory, maxportions); - if (ret) - return ret; - typename TBase::TOut out; - if ((ret = out.Open(output, TBase::Pagesize, TBase::HPages))) - return ret; - const TVal* rec; - while ((rec = Next())) - out.PushWithExtInfo(rec); - if ((ret = out.GetError())) - return ret; - if ((ret = out.Close())) - return ret; - if ((ret = TBase::Close())) - return ret; - return 0; - } - - int Open(const char* templ, size_t pagesize, size_t pages, int pagesOrBytes = 1) { - int res = TBase::Open(templ, pagesize, pages, pagesOrBytes); - Prev = nullptr; - Cur = nullptr; - return res; - } - - int Sort(size_t memory, int maxportions = 1000, bool direct = false) { - int res = TBase::Sort(memory, maxportions, direct); - if (!res) { - const TVal* rec = TBase::TMyHeap::Next(); - if (rec) { - size_t els, es; - size_t sz = NMicroBDB::SizeOfExt(rec, &els, &es); - sz += els + es; - if (!TExtInfoType<TVal>::Exists) - Cur = (TVal*)malloc(sizeof(TVal)); - else - Cur = (TVal*)malloc(TBase::Pagesize); - memcpy(Cur, rec, sz); - } - } - return res; - } - - // Prev = last returned - // Cur = current accumlating with TSieve - - const TVal* Next() { - if (!Cur) { - if (Prev) { - free(Prev); - Prev = nullptr; - } - return nullptr; - } - const TVal* rec; - - if (TIsSieveFake<TSieve>::Result) - rec = TBase::TMyHeap::Next(); - else { - do { - rec = TBase::TMyHeap::Next(); - } while (rec && TSieve::Sieve((TVal*)Cur, rec)); - } - - if (!Prev) { - if (!TExtInfoType<TVal>::Exists) - Prev = (TVal*)malloc(sizeof(TVal)); - else - Prev = (TVal*)malloc(TBase::Pagesize); - } - size_t els, es; - size_t sz = NMicroBDB::SizeOfExt(Cur, &els, &es); - sz += els + es; - memcpy(Prev, Cur, sz); - - if (rec) { - sz = NMicroBDB::SizeOfExt(rec, &els, &es); - sz += els + es; - memcpy(Cur, rec, sz); - } else { - TSieve::Sieve((TVal*)Cur, Cur); - free(Cur); - Cur = nullptr; - } - return Prev; - } - - const TVal* Current() const { - return Prev; - } - - int Close() { - int res = TBase::Close(); - if (Prev) { - free(Prev); - Prev = nullptr; - } - if (Cur) { - free(Cur); - Cur = nullptr; - } - return res; - } - -protected: - TVal* Cur; - TVal* Prev; -}; diff --git a/library/cpp/microbdb/sorterdef.h b/library/cpp/microbdb/sorterdef.h deleted file mode 100644 index 8834b5fff80..00000000000 --- a/library/cpp/microbdb/sorterdef.h +++ /dev/null @@ -1,19 +0,0 @@ -#pragma once - -#define MAKESORTERTMPL(TRecord, MemberFunc) \ - template <typename T> \ - struct MemberFunc; \ - template <> \ - struct MemberFunc<TRecord> { \ - bool operator()(const TRecord* l, const TRecord* r) { \ - return TRecord ::MemberFunc(l, r) < 0; \ - } \ - int operator()(const TRecord* l, const TRecord* r, int) { \ - return TRecord ::MemberFunc(l, r); \ - } \ - } - -template <typename T> -static inline int compare(const T& a, const T& b) { - return (a < b) ? -1 : (a > b); -} diff --git a/library/cpp/microbdb/utility.h b/library/cpp/microbdb/utility.h deleted file mode 100644 index 5c86061bca0..00000000000 --- a/library/cpp/microbdb/utility.h +++ /dev/null @@ -1,75 +0,0 @@ -#pragma once - -#include "microbdb.h" - -template <class TRecord, template <class T> class TCompare> -int SortData(const TFile& ifile, const TFile& ofile, const TDatMetaPage* meta, size_t memory, const char* tmpDir = nullptr) { - char templ[FILENAME_MAX]; - TInDatFileImpl<TRecord> datin; - TOutDatFileImpl<TRecord> datout; - TDatSorterImpl<TRecord, TCompare<TRecord>, TFakeCompression, TFakeSieve<TRecord>> sorter; - const TRecord* u; - int ret; - - const size_t minMemory = (2u << 20); - memory = Max(memory, minMemory + minMemory / 2); - if (datin.Open(ifile, meta, memory - minMemory, 0)) - err(1, "can't read input file"); - - size_t outpages = Max((size_t)2u, minMemory / datin.GetPageSize()); - memory -= outpages * datin.GetPageSize(); - - if (ret = MakeSorterTempl(templ, tmpDir)) - err(1, "can't create tempdir in \"%s\"; error: %d\n", templ, ret); - - if (sorter.Open(templ, datin.GetPageSize(), outpages)) { - *strrchr(templ, LOCSLASH_C) = 0; - RemoveDirWithContents(templ); - err(1, "can't open sorter"); - } - - while (1) { - datin.Freeze(); - while ((u = datin.Next())) - sorter.PushWithExtInfo(u); - sorter.NextPortion(); - if (datin.GetError() || datin.IsEof()) - break; - } - - if (datin.GetError()) { - *strrchr(templ, LOCSLASH_C) = 0; - RemoveDirWithContents(templ); - err(1, "in data file error %d", datin.GetError()); - } - if (datin.Close()) { - *strrchr(templ, LOCSLASH_C) = 0; - RemoveDirWithContents(templ); - err(1, "can't close in data file"); - } - - sorter.Sort(memory); - - if (datout.Open(ofile, datin.GetPageSize(), outpages)) { - *strrchr(templ, LOCSLASH_C) = 0; - RemoveDirWithContents(templ); - err(1, "can't write out file"); - } - - while ((u = sorter.Next())) - datout.PushWithExtInfo(u); - - if (sorter.GetError()) - err(1, "sorter error %d", sorter.GetError()); - if (sorter.Close()) - err(1, "can't close sorter"); - - *strrchr(templ, LOCSLASH_C) = 0; - RemoveDirWithContents(templ); - - if (datout.GetError()) - err(1, "out data file error %d", datout.GetError()); - if (datout.Close()) - err(1, "can't close out data file"); - return 0; -} diff --git a/library/cpp/microbdb/wrappers.h b/library/cpp/microbdb/wrappers.h deleted file mode 100644 index 38eb8edebc0..00000000000 --- a/library/cpp/microbdb/wrappers.h +++ /dev/null @@ -1,637 +0,0 @@ -#pragma once - -#include "microbdb.h" - -#define MAKEFILTERTMPL(TRecord, MemberFunc, NS) \ - template <typename T> \ - struct MemberFunc; \ - template <> \ - struct MemberFunc<TRecord> { \ - bool operator()(const TRecord* r) { \ - return NS::MemberFunc(r); \ - } \ - } - -#define MAKEJOINTMPL(TRecordA, TRecordB, MemberFunc, NS, TMergeType) \ - template <typename A, typename B> \ - struct MemberFunc; \ - template <> \ - struct MemberFunc<TRecordA, TRecordB> { \ - int operator()(const TRecordA* l, const TRecordB* r) { \ - return NS::MemberFunc(l, r); \ - } \ - }; \ - typedef TMergeRec<TRecordA, TRecordB> TMergeType - -#define MAKEJOINTMPL2(TRecordA, TRecordB, MemberFunc, StructName, TMergeType) \ - template <typename A, typename B> \ - struct StructName; \ - template <> \ - struct StructName<TRecordA, TRecordB> { \ - int operator()(const TRecordA* l, const TRecordB* r) { \ - return MemberFunc(l, r); \ - } \ - }; \ - typedef TMergeRec<TRecordA, TRecordB> TMergeType - -#define MAKEJOINTMPLLEFT(TRecordA, TRecordB, MemberFunc, NS, TMergeType) \ - template <typename A, typename B> \ - struct MemberFunc; \ - template <> \ - struct MemberFunc<TRecordA, TRecordB> { \ - int operator()(const TRecordA* l, const TRecordB* r) { \ - return NS::MemberFunc(l->RecA, r); \ - } \ - }; \ - typedef TMergeRec<TRecordA, TRecordB> TMergeType - -template <class TRec> -class IDatNextSource { -public: - virtual const TRec* Next() = 0; - virtual void Work() { - } -}; - -template <class TRec> -class IDatNextReceiver { -public: - IDatNextReceiver(IDatNextSource<TRec>& source) - : Source(source) - { - } - - virtual void Work() { - Source.Work(); - } - -protected: - IDatNextSource<TRec>& Source; -}; - -template <class TInRec, class TOutRec> -class IDatNextChannel: public IDatNextReceiver<TInRec>, public IDatNextSource<TOutRec> { -public: - IDatNextChannel(IDatNextSource<TInRec>& source) - : IDatNextReceiver<TInRec>(source) - { - } - - virtual void Work() { - IDatNextReceiver<TInRec>::Work(); - } -}; - -class IDatWorker { -public: - virtual void Work() = 0; -}; - -template <class TRec> -class IDatPushReceiver { -public: - virtual void Push(const TRec* rec) = 0; - virtual void Work() = 0; -}; - -template <class TRec> -class IDatPushSource { -public: - IDatPushSource(IDatPushReceiver<TRec>& receiver) - : Receiver(receiver) - { - } - - virtual void Work() { - Receiver.Work(); - } - -protected: - IDatPushReceiver<TRec>& Receiver; -}; - -template <class TInRec, class TOutRec> -class IDatPushChannel: public IDatPushReceiver<TInRec>, public IDatPushSource<TOutRec> { -public: - IDatPushChannel(IDatPushReceiver<TOutRec>& receiver) - : IDatPushSource<TOutRec>(receiver) - { - } - - virtual void Work() { - IDatPushSource<TOutRec>::Work(); - } -}; - -template <class TRec> -class IDatNextToPush: public IDatNextReceiver<TRec>, public IDatPushSource<TRec> { - typedef IDatNextReceiver<TRec> TNextReceiver; - typedef IDatPushSource<TRec> TPushSource; - -public: - IDatNextToPush(IDatNextSource<TRec>& source, IDatPushReceiver<TRec>& receiver) - : TNextReceiver(source) - , TPushSource(receiver) - { - } - - virtual void Work() { - const TRec* rec; - while (rec = TNextReceiver::Source.Next()) - TPushSource::Receiver.Push(rec); - TPushSource::Work(); - TNextReceiver::Work(); - } -}; - -template <class TRec> -class TDatNextPNSplitter: public IDatNextReceiver<TRec>, public IDatNextSource<TRec>, public IDatPushSource<TRec> { -public: - TDatNextPNSplitter(IDatNextSource<TRec>& source, IDatPushReceiver<TRec>& receiver) - : IDatNextReceiver<TRec>(source) - , IDatNextSource<TRec>() - , IDatPushSource<TRec>(receiver) - { - } - - const TRec* Next() { - const TRec* rec = IDatNextReceiver<TRec>::Source.Next(); - if (rec) { - IDatPushSource<TRec>::Receiver.Push(rec); - return rec; - } else { - return 0; - } - } - - virtual void Work() { - IDatNextReceiver<TRec>::Work(); - IDatPushSource<TRec>::Work(); - } -}; - -template <class TRec, class TOutRecA = TRec, class TOutRecB = TRec> -class TDatPushPPSplitter: public IDatPushReceiver<TRec>, public IDatPushSource<TOutRecA>, public IDatPushSource<TOutRecB> { -public: - TDatPushPPSplitter(IDatPushReceiver<TOutRecA>& receiverA, IDatPushReceiver<TOutRecB>& receiverB) - : IDatPushSource<TOutRecA>(receiverA) - , IDatPushSource<TOutRecB>(receiverB) - { - } - - void Push(const TRec* rec) { - IDatPushSource<TOutRecA>::Receiver.Push(rec); - IDatPushSource<TOutRecB>::Receiver.Push(rec); - } - - void Work() { - IDatPushSource<TOutRecA>::Work(); - IDatPushSource<TOutRecB>::Work(); - } -}; - -template <class TRec> -class TFastInDatFile: public TInDatFile<TRec>, public IDatNextSource<TRec> { -public: - typedef TInDatFile<TRec> Base; - - TFastInDatFile(const char* name, bool open = true, size_t pages = dbcfg::fbufsize, int pagesOrBytes = 0) - : TInDatFile<TRec>(name, pages, pagesOrBytes) - , FileName(name) - { - if (open) - Base::Open(name); - } - - void Open() { - Base::Open(FileName); - } - - template <class TPassRec> - bool PassToUid(const TRec* inrec, const TPassRec* torec) { - inrec = Base::Current(); - while (inrec && CompareUids(inrec, torec) < 0) - inrec = Base::Next(); - return (inrec && CompareUids(inrec, torec) == 0); - } - - void Work() { - Base::Close(); - } - - const TRec* Next() { - return Base::Next(); - } - -private: - TString FileName; -}; - -template <class TRec> -class TPushOutDatFile: public TOutDatFile<TRec>, public IDatPushReceiver<TRec> { -public: - typedef TOutDatFile<TRec> Base; - - TPushOutDatFile(const char* name, bool open = true) - : Base(name, dbcfg::pg_docuid, dbcfg::fbufsize, 0) - , FileName(name) - { - if (open) - Base::Open(name); - } - - void Open() { - Base::Open(~FileName); - } - - void Push(const TRec* rec) { - Base::Push(rec); - } - - void Work() { - Base::Close(); - } - -private: - TString FileName; -}; - -template <class TRec> -class TNextOutDatFile: public IDatNextToPush<TRec> { -public: - typedef IDatNextToPush<TRec> TBase; - - TNextOutDatFile(const char* name, IDatNextSource<TRec>& source, bool open = true) - : TBase(source, File) - , File(name, open) - { - } - - void Open() { - File.Open(); - } - -private: - TPushOutDatFile<TRec> File; -}; - -template <class TVal, template <typename T> class TCompare> -class TNextDatSorterMemo: public TDatSorterMemo<TVal, TCompare>, public IDatNextChannel<TVal, TVal> { - typedef TDatSorterMemo<TVal, TCompare> TImpl; - -public: - TNextDatSorterMemo(IDatNextSource<TVal>& source, const char* dir = dbcfg::fname_temp, const char* name = "yet another sorter", size_t memory = dbcfg::small_sorter_size, size_t pagesize = dbcfg::pg_docuid, size_t pages = dbcfg::fbufsize, int pagesOrBytes = 0) - : TImpl(name, memory, pagesize, pages, pagesOrBytes) - , IDatNextChannel<TVal, TVal>(source) - , Sorted(false) - { - TImpl::Open(dir); - } - - void Sort() { - const TVal* rec; - while (rec = IDatNextChannel<TVal, TVal>::Source.Next()) { - TImpl::Push(rec); - } - TImpl::Sort(); - Sorted = true; - } - - const TVal* Next() { - if (!Sorted) - Sort(); - return TImpl::Next(); - } - -private: - bool Sorted; - TString Dir; -}; - -template <class TInRec, class TOutRec> -class TDatConverter: public IDatNextChannel<TInRec, TOutRec> { -public: - TDatConverter(IDatNextSource<TInRec>& source) - : IDatNextChannel<TInRec, TOutRec>(source) - { - } - - virtual void Convert(const TInRec& inrec, TOutRec& outrec) { - outrec(inrec); - } - - const TOutRec* Next() { - const TInRec* rec = IDatNextChannel<TInRec, TOutRec>::Source.Next(); - if (!rec) - return 0; - Convert(*rec, CurrentRec); - return &CurrentRec; - } - -private: - TOutRec CurrentRec; -}; - -template <class TRecA, class TRecB> -class TMergeRec { -public: - const TRecA* RecA; - const TRecB* RecB; -}; - -enum NMergeTypes { - MT_JOIN = 0, - MT_ADD = 1, - MT_OVERWRITE = 2, - MT_TYPENUM -}; - -template <class TRecA, class TRecB, template <typename TA, typename TB> class TCompare> -class TNextDatMerger: public IDatNextReceiver<TRecA>, public IDatNextReceiver<TRecB>, public IDatNextSource<TMergeRec<TRecA, TRecB>> { -public: - TNextDatMerger(IDatNextSource<TRecA>& sourceA, IDatNextSource<TRecB>& sourceB, ui8 mergeType) - : IDatNextReceiver<TRecA>(sourceA) - , IDatNextReceiver<TRecB>(sourceB) - , MergeType(mergeType) - , MoveA(false) - , MoveB(false) - , NotInit(true) - { - } - - const TMergeRec<TRecA, TRecB>* Next() { - if (MoveA || NotInit) - SourceARec = IDatNextReceiver<TRecA>::Source.Next(); - if (MoveB || NotInit) - SourceBRec = IDatNextReceiver<TRecB>::Source.Next(); - NotInit = false; - - // Cout << "Next " << SourceARec->HostId << "\t" << SourceBRec->HostId << "\t" << TCompare<TRecA, TRecB>()(SourceARec, SourceBRec) << "\t" << ::compare(SourceARec->HostId, SourceBRec->HostId) << "\t" << ::compare(1, 2) << "\t" << ::compare(2,1) << Endl; - if (MergeType == MT_ADD && SourceARec && (!SourceBRec || TCompare<TRecA, TRecB>()(SourceARec, SourceBRec) < 0)) { - MergeRec.RecA = SourceARec; - MergeRec.RecB = 0; - MoveA = true; - MoveB = false; - return &MergeRec; - } - - if (MergeType == MT_ADD && SourceBRec && (!SourceARec || TCompare<TRecA, TRecB>()(SourceARec, SourceBRec) < 0)) { - MergeRec.RecA = 0; - MergeRec.RecB = SourceBRec; - MoveA = false; - MoveB = true; - return &MergeRec; - } - - if (MergeType == MT_ADD && SourceARec && SourceBRec && TCompare<TRecA, TRecB>()(SourceARec, SourceBRec) == 0) { - MergeRec.RecA = SourceARec; - MergeRec.RecB = SourceBRec; - MoveA = true; - MoveB = true; - return &MergeRec; - } - - while (MergeType == MT_JOIN && SourceARec && SourceBRec && TCompare<TRecA, TRecB>()(SourceARec, SourceBRec) != 0) { - while (SourceARec && TCompare<TRecA, TRecB>()(SourceARec, SourceBRec) < 0) { - SourceARec = IDatNextReceiver<TRecA>::Source.Next(); - } - while (SourceARec && SourceBRec && TCompare<TRecA, TRecB>()(SourceARec, SourceBRec) > 0) { - SourceBRec = IDatNextReceiver<TRecB>::Source.Next(); - } - } - - if (MergeType == MT_JOIN && SourceARec && SourceBRec) { - MergeRec.RecA = SourceARec; - MergeRec.RecB = SourceBRec; - MoveA = true; - MoveB = true; - return &MergeRec; - } - - MergeRec.RecA = 0; - MergeRec.RecB = 0; - return 0; - } - - void Work() { - IDatNextReceiver<TRecA>::Source.Work(); - IDatNextReceiver<TRecB>::Source.Work(); - } - -private: - TMergeRec<TRecA, TRecB> MergeRec; - const TRecA* SourceARec; - const TRecB* SourceBRec; - ui8 MergeType; - bool MoveA; - bool MoveB; - bool NotInit; -}; - -/*template<class TRec, class TSource, template <typename T> class TCompare, class TReceiver = TPushOutDatFile<TRec> > -class TPushDatMerger { -public: - TPushDatMerger(TSource& source, TReceiver& receiver, ui8 mergeType) - : Source(source) - , Receiver(receiver) - , MergeType(mergeType) - { - } - - virtual void Init() { - SourceRec = Source.Next(); - } - - virtual void Push(const TRec* rec) { - while (SourceRec && TCompare<TRec>()(SourceRec, rec, 0) < 0) { - if (MergeType == MT_OVERWRITE || MergeType == MT_ADD) - Receiver.Push(SourceRec); - SourceRec = Source.Next(); - } - - bool intersected = false; - while (SourceRec && TCompare<TRec>()(SourceRec, rec, 0) == 0) { - intersected = true; - if (MergeType == MT_ADD) - Receiver.Push(SourceRec); - SourceRec = Source.Next(); - } - - if (intersected && MergeType == MT_JOIN) - Receiver.Push(rec); - - if (MergeType == MT_OVERWRITE || MergeType == MT_ADD) - Receiver.Push(rec); - } - - virtual void Term() { - if (MergeType == MT_OVERWRITE || MergeType == MT_ADD) { - while (SourceRec) { - Receiver.Push(SourceRec); - SourceRec = Source.Next(); - } - } - } - -private: - TSource& Source; - const TRec* SourceRec; - TReceiver& Receiver; - ui8 MergeType; -};*/ - -/*template <class TRec, class TSourceA, class TSourceB, template <typename T> class TCompare, class TReceiver = TPushOutDatFile<TRec> > -class TNextDatMerger: public TPushDatMerger<TRec, TSourceA, TCompare, TReceiver> { - typedef TPushDatMerger<TRec, TSourceA, TCompare, TReceiver> TImpl; -public: - TNextDatMerger(TSourceA& sourceA, TSourceB& sourceB, TReceiver& receiver, ui8 mergeType) - : TImpl(sourceA, receiver, mergeType) - , SourceB(sourceB) - { - } - - virtual void Work() { - TImpl::Init(); - while (SourceBRec = SourceB.Next()) { - TImpl::Push(SourceBRec); - } - TImpl::Term(); - } -private: - TSourceB& SourceB; - const TRec* SourceBRec; -};*/ - -/*template <class TRec, template <typename T> class TCompare, class TReceiver = TPushOutDatFile<TRec> > -class TFilePushDatMerger: public TPushDatMerger<TRec, TFastInDatFile<TRec>, TCompare, TReceiver> { - typedef TPushDatMerger<TRec, TFastInDatFile<TRec>, TCompare, TReceiver> TImpl; -public: - TFilePushDatMerger(const char* name, TReceiver& receiver, ui8 mergeType) - : TImpl(SourceFile, receiver, mergeType) - , SourceFile(name) - { - } - - virtual void Push(const TRec* rec) { - TImpl::Push(rec); - } - - virtual void Term() { - TImpl::Term(); - } -private: - TFastInDatFile<TRec> SourceFile; -};*/ - -/*template <class TRec, template <typename T> class TCompare, class TReceiver = TPushOutDatFile<TRec> > -class TFileNextDatMerger: public TNextDatMerger<TRec, TFastInDatFile<TRec>, TFastInDatFile<TRec>, TCompare, TReceiver> { - typedef TNextDatMerger<TRec, TFastInDatFile<TRec>, TFastInDatFile<TRec>, TCompare, TReceiver> TImpl; -public: - TFileNextDatMerger(const char* sourceAname, const char* sourceBname, TReceiver& receiver, ui8 mergeType) - : TImpl(FileA, FileB, receiver, mergeType) - , FileA(sourceAname) - , FileB(sourceBname) - { - } - - virtual void Work() { - TImpl::Work(); - } -private: - TFastInDatFile<TRec> FileA; - TFastInDatFile<TRec> FileB; -};*/ - -template <class TRec, template <typename T> class TPredicate> -class TDatNextFilter: public IDatNextChannel<TRec, TRec> { -public: - TDatNextFilter(IDatNextSource<TRec>& source) - : IDatNextChannel<TRec, TRec>(source) - { - } - - virtual const TRec* Next() { - const TRec* rec; - while ((rec = IDatNextChannel<TRec, TRec>::Source.Next()) != 0 && !Check(rec)) { - } - if (!rec) - return 0; - return rec; - } - -protected: - virtual bool Check(const TRec* rec) { - return TPredicate<TRec>()(rec); - } -}; - -template <class TRec, template <typename T> class TPredicate> -class TDatPushFilter: public IDatPushChannel<TRec, TRec> { -public: - TDatPushFilter(IDatPushReceiver<TRec>& receiver) - : IDatPushChannel<TRec, TRec>(receiver) - { - } - - virtual void Push(const TRec* rec) { - if (Check(rec)) - IDatPushChannel<TRec, TRec>::Receiver.Push(rec); - } - -private: - virtual bool Check(const TRec* rec) { - return TPredicate<TRec>()(rec); - } -}; - -template <class TInRec, class TOutRec, template <typename T> class TCompare> -class TDatGrouper: public IDatNextChannel<TInRec, TOutRec> { -public: - TDatGrouper(IDatNextSource<TInRec>& source) - : IDatNextChannel<TInRec, TOutRec>(source) - , Begin(true) - , Finish(false) - , HasOutput(false) - { - } - - const TOutRec* Next() { - while (CurrentRec = IDatNextChannel<TInRec, TOutRec>::Source.Next()) { - int cmp = 0; - if (Begin) { - Begin = false; - OnStart(); - } else if ((cmp = TCompare<TInRec>()(CurrentRec, LastRec, 0)) != 0) { - OnFinish(); - OnStart(); - } - OnRecord(); - LastRec = CurrentRec; - if (HasOutput) { - HasOutput = false; - return &OutRec; - } - } - if (!Finish) - OnFinish(); - Finish = true; - if (HasOutput) { - HasOutput = false; - return &OutRec; - } - return 0; - } - -protected: - virtual void OnStart() = 0; - virtual void OnRecord() = 0; - virtual void OnFinish() = 0; - - const TInRec* CurrentRec; - const TInRec* LastRec; - TOutRec OutRec; - - bool Begin; - bool Finish; - bool HasOutput; -}; diff --git a/library/cpp/microbdb/ya.make b/library/cpp/microbdb/ya.make deleted file mode 100644 index 3e553f8535b..00000000000 --- a/library/cpp/microbdb/ya.make +++ /dev/null @@ -1,36 +0,0 @@ -LIBRARY() - -SRCS( - align.h - compressed.h - extinfo.h - file.cpp - hashes.h - header.h - header.cpp - heap.h - input.h - microbdb.cpp - noextinfo.proto - output.h - powersorter.h - reader.h - safeopen.h - sorter.h - sorterdef.h - utility.h - wrappers.h -) - -PEERDIR( - contrib/libs/fastlz - contrib/libs/libc_compat - contrib/libs/protobuf - contrib/libs/snappy - contrib/libs/zlib - library/cpp/deprecated/fgood - library/cpp/on_disk/st_hash - library/cpp/packedtypes -) - -END() |