aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/microbdb
diff options
context:
space:
mode:
authorvvvv <vvvv@ydb.tech>2023-07-31 20:07:26 +0300
committervvvv <vvvv@ydb.tech>2023-07-31 20:07:26 +0300
commitf9e4743508b7930e884714cc99985ac45f84ed98 (patch)
treea1290261a4915a6f607e110e2cc27aee4c205f85 /library/cpp/microbdb
parent5cf9beeab3ea847da0b6c414fcb5faa9cb041317 (diff)
downloadydb-f9e4743508b7930e884714cc99985ac45f84ed98.tar.gz
Use UDFs from YDB
Diffstat (limited to 'library/cpp/microbdb')
-rw-r--r--library/cpp/microbdb/CMakeLists.darwin-x86_64.txt56
-rw-r--r--library/cpp/microbdb/CMakeLists.linux-aarch64.txt57
-rw-r--r--library/cpp/microbdb/CMakeLists.linux-x86_64.txt57
-rw-r--r--library/cpp/microbdb/CMakeLists.txt17
-rw-r--r--library/cpp/microbdb/CMakeLists.windows-x86_64.txt56
-rw-r--r--library/cpp/microbdb/align.h17
-rw-r--r--library/cpp/microbdb/compressed.h520
-rw-r--r--library/cpp/microbdb/extinfo.h127
-rw-r--r--library/cpp/microbdb/file.cpp220
-rw-r--r--library/cpp/microbdb/file.h225
-rw-r--r--library/cpp/microbdb/hashes.h250
-rw-r--r--library/cpp/microbdb/header.cpp91
-rw-r--r--library/cpp/microbdb/header.h159
-rw-r--r--library/cpp/microbdb/heap.h143
-rw-r--r--library/cpp/microbdb/input.h1027
-rw-r--r--library/cpp/microbdb/microbdb.cpp1
-rw-r--r--library/cpp/microbdb/microbdb.h54
-rw-r--r--library/cpp/microbdb/noextinfo.proto4
-rw-r--r--library/cpp/microbdb/output.h1049
-rw-r--r--library/cpp/microbdb/powersorter.h667
-rw-r--r--library/cpp/microbdb/reader.h354
-rw-r--r--library/cpp/microbdb/safeopen.h792
-rw-r--r--library/cpp/microbdb/sorter.h677
-rw-r--r--library/cpp/microbdb/sorterdef.h19
-rw-r--r--library/cpp/microbdb/utility.h75
-rw-r--r--library/cpp/microbdb/wrappers.h637
-rw-r--r--library/cpp/microbdb/ya.make36
27 files changed, 0 insertions, 7387 deletions
diff --git a/library/cpp/microbdb/CMakeLists.darwin-x86_64.txt b/library/cpp/microbdb/CMakeLists.darwin-x86_64.txt
deleted file mode 100644
index c4d2e9d3a41..00000000000
--- a/library/cpp/microbdb/CMakeLists.darwin-x86_64.txt
+++ /dev/null
@@ -1,56 +0,0 @@
-
-# This file was generated by the build system used internally in the Yandex monorepo.
-# Only simple modifications are allowed (adding source-files to targets, adding simple properties
-# like target_include_directories). These modifications will be ported to original
-# ya.make files by maintainers. Any complex modifications which can't be ported back to the
-# original buildsystem will not be accepted.
-
-
-find_package(ZLIB REQUIRED)
-get_built_tool_path(
- TOOL_protoc_bin
- TOOL_protoc_dependency
- contrib/tools/protoc/bin
- protoc
-)
-get_built_tool_path(
- TOOL_cpp_styleguide_bin
- TOOL_cpp_styleguide_dependency
- contrib/tools/protoc/plugins/cpp_styleguide
- cpp_styleguide
-)
-
-add_library(library-cpp-microbdb)
-target_link_libraries(library-cpp-microbdb PUBLIC
- contrib-libs-cxxsupp
- yutil
- contrib-libs-fastlz
- contrib-libs-libc_compat
- contrib-libs-protobuf
- contrib-libs-snappy
- ZLIB::ZLIB
- cpp-deprecated-fgood
- cpp-on_disk-st_hash
- library-cpp-packedtypes
-)
-target_proto_messages(library-cpp-microbdb PRIVATE
- ${CMAKE_SOURCE_DIR}/library/cpp/microbdb/noextinfo.proto
-)
-target_sources(library-cpp-microbdb PRIVATE
- ${CMAKE_SOURCE_DIR}/library/cpp/microbdb/file.cpp
- ${CMAKE_SOURCE_DIR}/library/cpp/microbdb/header.cpp
- ${CMAKE_SOURCE_DIR}/library/cpp/microbdb/microbdb.cpp
-)
-target_proto_addincls(library-cpp-microbdb
- ./
- ${CMAKE_SOURCE_DIR}/
- ${CMAKE_BINARY_DIR}
- ${CMAKE_SOURCE_DIR}
- ${CMAKE_SOURCE_DIR}/contrib/libs/protobuf/src
- ${CMAKE_BINARY_DIR}
- ${CMAKE_SOURCE_DIR}/contrib/libs/protobuf/src
-)
-target_proto_outs(library-cpp-microbdb
- --cpp_out=${CMAKE_BINARY_DIR}/
- --cpp_styleguide_out=${CMAKE_BINARY_DIR}/
-)
diff --git a/library/cpp/microbdb/CMakeLists.linux-aarch64.txt b/library/cpp/microbdb/CMakeLists.linux-aarch64.txt
deleted file mode 100644
index 302dbd03cdc..00000000000
--- a/library/cpp/microbdb/CMakeLists.linux-aarch64.txt
+++ /dev/null
@@ -1,57 +0,0 @@
-
-# This file was generated by the build system used internally in the Yandex monorepo.
-# Only simple modifications are allowed (adding source-files to targets, adding simple properties
-# like target_include_directories). These modifications will be ported to original
-# ya.make files by maintainers. Any complex modifications which can't be ported back to the
-# original buildsystem will not be accepted.
-
-
-find_package(ZLIB REQUIRED)
-get_built_tool_path(
- TOOL_protoc_bin
- TOOL_protoc_dependency
- contrib/tools/protoc/bin
- protoc
-)
-get_built_tool_path(
- TOOL_cpp_styleguide_bin
- TOOL_cpp_styleguide_dependency
- contrib/tools/protoc/plugins/cpp_styleguide
- cpp_styleguide
-)
-
-add_library(library-cpp-microbdb)
-target_link_libraries(library-cpp-microbdb PUBLIC
- contrib-libs-linux-headers
- contrib-libs-cxxsupp
- yutil
- contrib-libs-fastlz
- contrib-libs-libc_compat
- contrib-libs-protobuf
- contrib-libs-snappy
- ZLIB::ZLIB
- cpp-deprecated-fgood
- cpp-on_disk-st_hash
- library-cpp-packedtypes
-)
-target_proto_messages(library-cpp-microbdb PRIVATE
- ${CMAKE_SOURCE_DIR}/library/cpp/microbdb/noextinfo.proto
-)
-target_sources(library-cpp-microbdb PRIVATE
- ${CMAKE_SOURCE_DIR}/library/cpp/microbdb/file.cpp
- ${CMAKE_SOURCE_DIR}/library/cpp/microbdb/header.cpp
- ${CMAKE_SOURCE_DIR}/library/cpp/microbdb/microbdb.cpp
-)
-target_proto_addincls(library-cpp-microbdb
- ./
- ${CMAKE_SOURCE_DIR}/
- ${CMAKE_BINARY_DIR}
- ${CMAKE_SOURCE_DIR}
- ${CMAKE_SOURCE_DIR}/contrib/libs/protobuf/src
- ${CMAKE_BINARY_DIR}
- ${CMAKE_SOURCE_DIR}/contrib/libs/protobuf/src
-)
-target_proto_outs(library-cpp-microbdb
- --cpp_out=${CMAKE_BINARY_DIR}/
- --cpp_styleguide_out=${CMAKE_BINARY_DIR}/
-)
diff --git a/library/cpp/microbdb/CMakeLists.linux-x86_64.txt b/library/cpp/microbdb/CMakeLists.linux-x86_64.txt
deleted file mode 100644
index 302dbd03cdc..00000000000
--- a/library/cpp/microbdb/CMakeLists.linux-x86_64.txt
+++ /dev/null
@@ -1,57 +0,0 @@
-
-# This file was generated by the build system used internally in the Yandex monorepo.
-# Only simple modifications are allowed (adding source-files to targets, adding simple properties
-# like target_include_directories). These modifications will be ported to original
-# ya.make files by maintainers. Any complex modifications which can't be ported back to the
-# original buildsystem will not be accepted.
-
-
-find_package(ZLIB REQUIRED)
-get_built_tool_path(
- TOOL_protoc_bin
- TOOL_protoc_dependency
- contrib/tools/protoc/bin
- protoc
-)
-get_built_tool_path(
- TOOL_cpp_styleguide_bin
- TOOL_cpp_styleguide_dependency
- contrib/tools/protoc/plugins/cpp_styleguide
- cpp_styleguide
-)
-
-add_library(library-cpp-microbdb)
-target_link_libraries(library-cpp-microbdb PUBLIC
- contrib-libs-linux-headers
- contrib-libs-cxxsupp
- yutil
- contrib-libs-fastlz
- contrib-libs-libc_compat
- contrib-libs-protobuf
- contrib-libs-snappy
- ZLIB::ZLIB
- cpp-deprecated-fgood
- cpp-on_disk-st_hash
- library-cpp-packedtypes
-)
-target_proto_messages(library-cpp-microbdb PRIVATE
- ${CMAKE_SOURCE_DIR}/library/cpp/microbdb/noextinfo.proto
-)
-target_sources(library-cpp-microbdb PRIVATE
- ${CMAKE_SOURCE_DIR}/library/cpp/microbdb/file.cpp
- ${CMAKE_SOURCE_DIR}/library/cpp/microbdb/header.cpp
- ${CMAKE_SOURCE_DIR}/library/cpp/microbdb/microbdb.cpp
-)
-target_proto_addincls(library-cpp-microbdb
- ./
- ${CMAKE_SOURCE_DIR}/
- ${CMAKE_BINARY_DIR}
- ${CMAKE_SOURCE_DIR}
- ${CMAKE_SOURCE_DIR}/contrib/libs/protobuf/src
- ${CMAKE_BINARY_DIR}
- ${CMAKE_SOURCE_DIR}/contrib/libs/protobuf/src
-)
-target_proto_outs(library-cpp-microbdb
- --cpp_out=${CMAKE_BINARY_DIR}/
- --cpp_styleguide_out=${CMAKE_BINARY_DIR}/
-)
diff --git a/library/cpp/microbdb/CMakeLists.txt b/library/cpp/microbdb/CMakeLists.txt
deleted file mode 100644
index f8b31df0c11..00000000000
--- a/library/cpp/microbdb/CMakeLists.txt
+++ /dev/null
@@ -1,17 +0,0 @@
-
-# This file was generated by the build system used internally in the Yandex monorepo.
-# Only simple modifications are allowed (adding source-files to targets, adding simple properties
-# like target_include_directories). These modifications will be ported to original
-# ya.make files by maintainers. Any complex modifications which can't be ported back to the
-# original buildsystem will not be accepted.
-
-
-if (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND NOT HAVE_CUDA)
- include(CMakeLists.linux-aarch64.txt)
-elseif (CMAKE_SYSTEM_NAME STREQUAL "Darwin" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64")
- include(CMakeLists.darwin-x86_64.txt)
-elseif (WIN32 AND CMAKE_SYSTEM_PROCESSOR STREQUAL "AMD64" AND NOT HAVE_CUDA)
- include(CMakeLists.windows-x86_64.txt)
-elseif (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND NOT HAVE_CUDA)
- include(CMakeLists.linux-x86_64.txt)
-endif()
diff --git a/library/cpp/microbdb/CMakeLists.windows-x86_64.txt b/library/cpp/microbdb/CMakeLists.windows-x86_64.txt
deleted file mode 100644
index c4d2e9d3a41..00000000000
--- a/library/cpp/microbdb/CMakeLists.windows-x86_64.txt
+++ /dev/null
@@ -1,56 +0,0 @@
-
-# This file was generated by the build system used internally in the Yandex monorepo.
-# Only simple modifications are allowed (adding source-files to targets, adding simple properties
-# like target_include_directories). These modifications will be ported to original
-# ya.make files by maintainers. Any complex modifications which can't be ported back to the
-# original buildsystem will not be accepted.
-
-
-find_package(ZLIB REQUIRED)
-get_built_tool_path(
- TOOL_protoc_bin
- TOOL_protoc_dependency
- contrib/tools/protoc/bin
- protoc
-)
-get_built_tool_path(
- TOOL_cpp_styleguide_bin
- TOOL_cpp_styleguide_dependency
- contrib/tools/protoc/plugins/cpp_styleguide
- cpp_styleguide
-)
-
-add_library(library-cpp-microbdb)
-target_link_libraries(library-cpp-microbdb PUBLIC
- contrib-libs-cxxsupp
- yutil
- contrib-libs-fastlz
- contrib-libs-libc_compat
- contrib-libs-protobuf
- contrib-libs-snappy
- ZLIB::ZLIB
- cpp-deprecated-fgood
- cpp-on_disk-st_hash
- library-cpp-packedtypes
-)
-target_proto_messages(library-cpp-microbdb PRIVATE
- ${CMAKE_SOURCE_DIR}/library/cpp/microbdb/noextinfo.proto
-)
-target_sources(library-cpp-microbdb PRIVATE
- ${CMAKE_SOURCE_DIR}/library/cpp/microbdb/file.cpp
- ${CMAKE_SOURCE_DIR}/library/cpp/microbdb/header.cpp
- ${CMAKE_SOURCE_DIR}/library/cpp/microbdb/microbdb.cpp
-)
-target_proto_addincls(library-cpp-microbdb
- ./
- ${CMAKE_SOURCE_DIR}/
- ${CMAKE_BINARY_DIR}
- ${CMAKE_SOURCE_DIR}
- ${CMAKE_SOURCE_DIR}/contrib/libs/protobuf/src
- ${CMAKE_BINARY_DIR}
- ${CMAKE_SOURCE_DIR}/contrib/libs/protobuf/src
-)
-target_proto_outs(library-cpp-microbdb
- --cpp_out=${CMAKE_BINARY_DIR}/
- --cpp_styleguide_out=${CMAKE_BINARY_DIR}/
-)
diff --git a/library/cpp/microbdb/align.h b/library/cpp/microbdb/align.h
deleted file mode 100644
index 2f8567f1344..00000000000
--- a/library/cpp/microbdb/align.h
+++ /dev/null
@@ -1,17 +0,0 @@
-#pragma once
-
-#include <util/system/defaults.h>
-
-using TDatAlign = int;
-
-// Returns (size - 1) rounded down to a multiple of sizeof(TDatAlign), i.e. the offset of
-// the last TDatAlign-sized slot touched by a buffer of `size` bytes.
-static inline size_t DatFloor(size_t size) {
- const size_t alignMask = sizeof(TDatAlign) - 1;
- return (size - 1) & ~alignMask;
-}
-
-// Returns DatFloor(size) advanced by one alignment unit, which rounds `size` up to a
-// multiple of sizeof(TDatAlign).
-static inline size_t DatCeil(size_t size) {
- const size_t lastSlot = DatFloor(size);
- return lastSlot + sizeof(TDatAlign);
-}
-
-// Writes a zero TDatAlign at offset DatFloor(size) from `ptr`, clearing the last
-// alignment slot of a `size`-byte buffer.
-static inline void DatSet(void* ptr, size_t size) {
- char* const lastSlot = (char*)ptr + DatFloor(size);
- *(TDatAlign*)lastSlot = 0;
-}
diff --git a/library/cpp/microbdb/compressed.h b/library/cpp/microbdb/compressed.h
deleted file mode 100644
index f0c9edfa925..00000000000
--- a/library/cpp/microbdb/compressed.h
+++ /dev/null
@@ -1,520 +0,0 @@
-#pragma once
-
-#include <util/stream/zlib.h>
-
-#include "microbdb.h"
-#include "safeopen.h"
-
-// Input file manipulator that reads a gzip stream: the file is wrapped in a
-// TUnbufferedFileInput (CompressedInput) which feeds a TZLibDecompress.
-// Seek() works on the position reported by DoGetPosition() and moves forward by reading
-// and discarding data; RealSeek() repositions the underlying file and recreates the
-// decompressor.
-class TCompressedInputFileManip: public TInputFileManip {
-public:
- // Always -1: the length is reported as unknown.
- inline i64 GetLength() const {
- return -1; // Some microbdb logic relies on the unknown size of compressed files
- }
-
- // Moves to `offset` (SEEK_SET) or to the current position plus `offset` (SEEK_CUR).
- // Any other `whence` returns -1. Returns the requested new position.
- inline i64 Seek(i64 offset, int whence) {
- i64 oldPos = DoGetPosition();
- i64 newPos = offset;
- switch (whence) {
- case SEEK_CUR:
- newPos += oldPos;
- [[fallthrough]]; // SEEK_SET needs no adjustment; the attribute keeps the compiler quiet
- case SEEK_SET:
- break;
- default:
- return -1L;
- }
- if (oldPos > newPos) {
- // Going backwards: restart from offset 0 and skip forward from there.
- VerifyRandomAccess();
- DoSeek(0, SEEK_SET, IsStreamOpen());
- oldPos = 0;
- }
- const size_t bufsize = 1 << 12;
- char buf[bufsize];
- for (i64 i = oldPos; i < newPos; i += bufsize) {
- const size_t want = (i + (i64)bufsize < newPos) ? bufsize : (size_t)(newPos - i);
- // Load() keeps reading until `want` bytes arrive or the stream ends. Read() may
- // return fewer bytes, which would leave the stream short of newPos.
- if (InputStream->Load(buf, want) != want)
- break; // end of stream: nothing left to skip
- }
- return newPos;
- }
-
- // Drops the current decompressor, seeks the underlying file and, unless the seek
- // returned -1, creates a fresh decompressor at the new file position.
- i64 RealSeek(i64 offset, int whence) {
- InputStream.Destroy();
- i64 ret = DoSeek(offset, whence, !!CompressedInput);
- if (ret != -1)
- DoStreamOpen(DoCreateStream(), true);
- return ret;
- }
-
-protected:
- // Wraps `file` in an unbuffered input and returns a decompressor reading from it.
- IInputStream* CreateStream(const TFile& file) override {
- CompressedInput.Reset(new TUnbufferedFileInput(file));
- return DoCreateStream();
- }
- // Creates a gzip decompressor on top of the current CompressedInput.
- inline IInputStream* DoCreateStream() {
- return new TZLibDecompress(CompressedInput.Get(), ZLib::GZip);
- //return new TLzqDecompress(CompressedInput.Get());
- }
- THolder<IInputStream> CompressedInput;
-};
-
-// Same as TCompressedInputFileManip, except that the compressed file is read through a
-// TFileInput with a 0x100000-byte buffer instead of an unbuffered input.
-class TCompressedBufferedInputFileManip: public TCompressedInputFileManip {
-protected:
- IInputStream* CreateStream(const TFile& file) override {
- const size_t readBuffer = 0x100000;
- CompressedInput.Reset(new TFileInput(file, readBuffer));
- return DoCreateStream();
- }
-};
-
-using TCompressedInputPageFile = TInputPageFileImpl<TCompressedInputFileManip>;
-using TCompressedBufferedInputPageFile = TInputPageFileImpl<TCompressedBufferedInputFileManip>;
-
-// Index record that pairs a 64-bit Offset with a key of type TVal.
-template <class TVal>
-struct TGzKey {
- ui64 Offset;
- TVal Key;
-
- // Record signature: the key type's signature plus a fixed constant.
- static const ui32 RecordSig = TVal::RecordSig + 0x50495a47;
-
- // Does not assign Offset or Key.
- TGzKey() {
- }
-
- TGzKey(ui64 offset, const TVal& key)
- : Offset(offset)
- , Key(key)
- {
- }
-
- // Size of the record: sizeof(Offset) plus the size of Key. When reached with a null
- // `this`, it instead derives the size from ::SizeOf((TVal*)NULL) and reports 0 if that
- // value is 0.
- // NOTE(review): calling a member function through a null pointer is undefined behaviour
- // in standard C++, and optimizers may assume `this` is never null — confirm this branch
- // still works with the compilers in use.
- size_t SizeOf() const {
- if (this)
- return sizeof(Offset) + ::SizeOf(&Key);
- else {
- size_t sizeOfKey = ::SizeOf((TVal*)NULL);
- return sizeOfKey ? (sizeof(Offset) + sizeOfKey) : 0;
- }
- }
-};
-
-// Reader for an index file whose records are TGzKey<TVal>. One page is copied into Index0
-// at Open() and kept in memory; lookups consult Index0 first and then a second page
-// selected from it.
-template <class TVal>
-class TInZIndexFile: protected TInDatFileImpl<TGzKey<TVal>> {
- typedef TInDatFileImpl<TGzKey<TVal>> TDatFile;
- typedef TGzKey<TVal> TGzVal;
- typedef typename TDatFile::TRecIter TRecIter;
- typedef typename TRecIter::TPageIter TPageIter;
-
-public:
- TInZIndexFile()
- : Index0(nullptr)
- {
- }
-
- // Opens the index file, allocates a page-sized buffer and copies into it the page that
- // is current after the first TDatFile::Next(). Returns 0 on success, the error from
- // TDatFile::Open(), or MBDB_NO_MEMORY if the buffer cannot be allocated (the file is
- // closed again in that case). RecsOnPage is only computed when records have a fixed
- // size, i.e. SizeOf((TGzVal*)NULL) is non-zero.
- int Open(const char* fname, size_t pages = 1, int pagesOrBytes = 1, ui32* gotRecordSig = nullptr) {
- int ret = TDatFile::Open(fname, pages, pagesOrBytes, gotRecordSig);
- if (ret)
- return ret;
- if (!(Index0 = (TDatPage*)malloc(TPageIter::GetPageSize()))) {
- TDatFile::Close();
- return MBDB_NO_MEMORY;
- }
- if (SizeOf((TGzVal*)NULL))
- RecsOnPage = (TPageIter::GetPageSize() - sizeof(TDatPage)) / DatCeil(SizeOf((TGzVal*)NULL));
- TDatFile::Next();
- memcpy(Index0, TPageIter::Current(), TPageIter::GetPageSize());
- return 0;
- }
-
- // Releases the in-memory page copy and closes the file.
- int Close() {
- free(Index0);
- Index0 = NULL;
- return TDatFile::Close();
- }
-
- inline int GetError() const {
- return TDatFile::GetError();
- }
-
- // Two-level lookup of `akey`: finds an entry in Index0, goes to page (entry index + 1),
- // finds an entry there, and returns that entry's index plus the Offset stored in the
- // Index0 entry. Variable-size keys are delegated to FindVszKey().
- int FindKey(const TVal* akey, const typename TExtInfoType<TVal>::TResult* = NULL) {
- assert(IsOpen());
- if (!SizeOf((TVal*)NULL))
- return FindVszKey(akey);
- int pageno;
- i64 offset;
- FindKeyOnPage(pageno, offset, Index0, akey);
- TDatPage* page = TPageIter::GotoPage(pageno + 1);
- int num_add = (int)offset;
- FindKeyOnPage(pageno, offset, page, akey);
- return pageno + num_add;
- }
-
- using TDatFile::IsOpen;
-
- // Same two-level lookup as FindKey(), using the linear scan for variable-size records.
- int FindVszKey(const TVal* akey, const typename TExtInfoType<TVal>::TResult* = NULL) {
- int pageno;
- i64 offset;
- FindVszKeyOnPage(pageno, offset, Index0, akey);
- TDatPage* page = TPageIter::GotoPage(pageno + 1);
- int num_add = (int)offset;
- FindVszKeyOnPage(pageno, offset, page, akey);
- return pageno + num_add;
- }
-
- // Returns the Offset stored in index record number `pageno`, or -1 if that record does
- // not exist. For fixed-size records the record lives on page 1 + pageno / RecsOnPage at
- // slot pageno % RecsOnPage; variable-size records are handled by FindVszPage().
- i64 FindPage(int pageno) {
- if (!SizeOf((TVal*)NULL))
- return FindVszPage(pageno);
- int recsize = DatCeil(SizeOf((TGzVal*)NULL));
- TDatPage* page = TPageIter::GotoPage(1 + pageno / RecsOnPage);
- if (!page) // can happen if pageno is beyond EOF
- return -1;
- unsigned int localpageno = pageno % RecsOnPage;
- if (localpageno >= page->RecNum) // can happen if pageno is beyond EOF
- return -1;
- TGzVal* v = (TGzVal*)((char*)page + sizeof(TDatPage) + localpageno * recsize);
- return v->Offset;
- }
-
- // Variable-size variant of FindPage(): walks Index0 while the entries' Offset values do
- // not exceed `pageno`, goes to the page numbered by the count of entries passed, then
- // walks that page until (entry index + Offset of the last Index0 entry passed) reaches
- // `pageno`. Returns the Offset of the entry reached, or -1 if the page is exhausted.
- i64 FindVszPage(int pageno) {
- TGzVal* cur = (TGzVal*)((char*)Index0 + sizeof(TDatPage));
- TGzVal* prev = cur;
- unsigned int n = 0;
- while (n < Index0->RecNum && cur->Offset <= (unsigned int)pageno) {
- prev = cur;
- cur = (TGzVal*)((char*)cur + DatCeil(SizeOf(cur)));
- n++;
- }
- TDatPage* page = TPageIter::GotoPage(n);
- unsigned int num_add = (unsigned int)(prev->Offset);
- n = 0;
- cur = (TGzVal*)((char*)page + sizeof(TDatPage));
- while (n < page->RecNum && n + num_add < (unsigned int)pageno) {
- cur = (TGzVal*)((char*)cur + DatCeil(SizeOf(cur)));
- n++;
- }
- if (n == page->RecNum) // can happen if pageno is beyond EOF
- return -1;
- return cur->Offset;
- }
-
-protected:
- // Binary search over the fixed-size records of `page` for the first record whose key is
- // not less than *Key. `pageno` receives that index when it is 0 or when its key is still
- // less than *Key, otherwise the index before it; `offset` receives the Offset of the
- // record at `pageno`.
- void FindKeyOnPage(int& pageno, i64& offset, TDatPage* page, const TVal* Key) {
- int left = 0;
- int right = page->RecNum - 1;
- int recsize = DatCeil(SizeOf((TGzVal*)NULL));
- while (left < right) {
- int middle = (left + right) >> 1;
- if (((TGzVal*)((char*)page + sizeof(TDatPage) + middle * recsize))->Key < *Key)
- left = middle + 1;
- else
- right = middle;
- }
- //borders check (left and right)
- pageno = (left == 0 || ((TGzVal*)((char*)page + sizeof(TDatPage) + left * recsize))->Key < *Key) ? left : left - 1;
- offset = ((TGzVal*)((char*)page + sizeof(TDatPage) + pageno * recsize))->Offset;
- }
-
- // Linear scan over the variable-size records of `page` while their keys are less than
- // *key. Note that the local `RecordSig` is a countdown of records left to examine, not a
- // record signature. `pageno` receives the index of the last record passed (0 if none)
- // and `offset` that record's Offset (the first record's Offset if none was passed).
- void FindVszKeyOnPage(int& pageno, i64& offset, TDatPage* page, const TVal* key) {
- TGzVal* cur = (TGzVal*)((char*)page + sizeof(TDatPage));
- ui32 RecordSig = page->RecNum;
- i64 tmpoffset = cur->Offset;
- for (; RecordSig > 0 && cur->Key < *key; --RecordSig) {
- tmpoffset = cur->Offset;
- cur = (TGzVal*)((char*)cur + DatCeil(SizeOf(cur)));
- }
- int idx = page->RecNum - RecordSig - 1;
- pageno = (idx >= 0) ? idx : 0;
- offset = tmpoffset;
- }
-
- TDatPage* Index0; // malloc'ed copy of one index page; owned, released in Close()
- int RecsOnPage; // fixed-size records per page; set in Open() only for fixed-size records
-};
-
-// Compressed input page file that carries an index (KeyFile) used by GotoPage() to look
-// up where a page starts.
-template <class TKey>
-class TCompressedIndexedInputPageFile: public TCompressedInputPageFile {
-public:
- // Defined out of line further down in this header.
- int GotoPage(int pageno);
-
-protected:
- TInZIndexFile<TKey> KeyFile;
-};
-
-// TDirectInDatFile instantiated over a TCompressedIndexedInputPageFile<TKey>; adds no
-// members of its own.
-template <class TVal, class TKey>
-class TDirectCompressedInDatFile: public TDirectInDatFile<TVal, TKey,
- TInDatFileImpl<TVal, TInputRecordIterator<TVal,
- TInputPageIterator<TCompressedIndexedInputPageFile<TKey>>>>> {
-};
-
-// Output file manipulator that writes a gzip stream: the file is wrapped in a
-// TUnbufferedFileOutput (CompressedOutput) which receives the output of a TZLibCompress.
-class TCompressedOutputFileManip: public TOutputFileManip {
-public:
- // Always -1: the length is reported as unknown.
- inline i64 GetLength() const {
- return -1; // Some microbdb logic relies on the unknown size of compressed files
- }
-
- // Forward-only seek. Accepts SEEK_SET and SEEK_CUR, returns -1 for any other `whence`
- // or for a target before the current position, and otherwise writes zero bytes until
- // the target is reached. Returns the target position.
- inline i64 Seek(i64 offset, int whence) {
- const i64 curPos = DoGetPosition();
- i64 target;
- if (whence == SEEK_CUR)
- target = offset + curPos;
- else if (whence == SEEK_SET)
- target = offset;
- else
- return -1L;
- if (target < curPos)
- return -1L;
-
- const size_t chunk = 1 << 12;
- char zeros[chunk] = {0};
- i64 pos = curPos;
- while (pos < target) {
- const size_t len = (pos + (i64)chunk < target) ? chunk : (size_t)(target - pos);
- OutputStream->Write(zeros, len);
- pos += chunk;
- }
- return target;
- }
-
- // Drops the current compressor, seeks the underlying file and, unless the seek returned
- // -1, starts a fresh compressor at the new file position.
- i64 RealSeek(i64 offset, int whence) {
- OutputStream.Destroy();
- const i64 filePos = DoSeek(offset, whence, !!CompressedOutput);
- if (filePos != -1)
- DoStreamOpen(DoCreateStream(), true);
- return filePos;
- }
-
-protected:
- // Wraps `file` in an unbuffered output and returns a compressor writing into it.
- IOutputStream* CreateStream(const TFile& file) override {
- CompressedOutput.Reset(new TUnbufferedFileOutput(file));
- return DoCreateStream();
- }
- // Creates a gzip compressor (third constructor argument 1) on top of CompressedOutput.
- inline IOutputStream* DoCreateStream() {
- return new TZLibCompress(CompressedOutput.Get(), ZLib::GZip, 1);
- }
- THolder<IOutputStream> CompressedOutput;
-};
-
-// Variant of TCompressedOutputFileManip whose compressor is created with an extra
-// 0x100000 argument. DoCreateStream() is not virtual in the base class, so this version
-// is only picked up by the CreateStream() override below; the inherited RealSeek() still
-// calls the base-class DoCreateStream().
-class TCompressedBufferedOutputFileManip: public TCompressedOutputFileManip {
-protected:
- IOutputStream* CreateStream(const TFile& file) override {
- CompressedOutput.Reset(new TUnbufferedFileOutput(file));
- return DoCreateStream();
- }
- inline IOutputStream* DoCreateStream() {
- const size_t compressorBuffer = 0x100000;
- return new TZLibCompress(CompressedOutput.Get(), ZLib::GZip, 1, compressorBuffer);
- }
-};
-
-using TCompressedOutputPageFile = TOutputPageFileImpl<TCompressedOutputFileManip>;
-using TCompressedBufferedOutputPageFile = TOutputPageFileImpl<TCompressedBufferedOutputFileManip>;
-
-// Writer for an index file of TGzKey<TVal> records. Records are written starting at
-// page 1; for every completed page a summary entry is collected in Index0, and Close()
-// writes those entries to page 0.
-template <class TVal>
-class TOutZIndexFile: public TOutDatFileImpl<
- TGzKey<TVal>,
- TOutputRecordIterator<TGzKey<TVal>, TOutputPageIterator<TOutputPageFile>, TCallbackIndexer>> {
- typedef TOutDatFileImpl<
- TGzKey<TVal>,
- TOutputRecordIterator<TGzKey<TVal>, TOutputPageIterator<TOutputPageFile>, TCallbackIndexer>>
- TDatFile;
- typedef TOutZIndexFile<TVal> TMyType;
- typedef TGzKey<TVal> TGzVal;
- typedef typename TDatFile::TRecIter TRecIter;
- typedef typename TRecIter::TPageIter TPageIter;
- typedef typename TRecIter::TIndexer TIndexer;
-
-public:
- // Registers DispatchCallback with the indexer so that NextPage() is invoked through it.
- TOutZIndexFile() {
- TotalRecNum = 0;
- TIndexer::SetCallback(this, DispatchCallback);
- }
-
- // Opens the file and moves to page 1; if that move fails the file is closed again.
- // Returns 0 on success or the first error encountered.
- int Open(const char* fname, size_t pagesize, size_t pages, int pagesOrBytes = 1) {
- int ret = TDatFile::Open(fname, pagesize, pages, pagesOrBytes);
- if (ret)
- return ret;
- if ((ret = TRecIter::GotoPage(1)))
- TDatFile::Close();
- return ret;
- }
-
- // Records a summary entry for the current page if it holds records, then goes to page 0
- // and pushes every collected entry. If the page number is no longer 0 afterwards the
- // result is MBDB_PAGE_OVERFLOW. Finally closes the file; an error from the page-0 step
- // takes precedence over the one from TDatFile::Close().
- int Close() {
- TPageIter::Unfreeze();
- if (TRecIter::RecNum)
- NextPage(TPageIter::Current());
- int ret = 0;
- if (Index0.size() && !(ret = TRecIter::GotoPage(0))) {
- typename std::vector<TGzVal>::iterator it, end = Index0.end();
- for (it = Index0.begin(); it != end; ++it)
- TRecIter::Push(&*it);
- ret = (TPageIter::GetPageNum() != 0) ? MBDB_PAGE_OVERFLOW : TPageIter::GetError();
- }
- Index0.clear();
- int ret1 = TDatFile::Close();
- return ret ? ret : ret1;
- }
-
-protected:
- int TotalRecNum; // should be enough because we have GotoPage(int)
- std::vector<TGzVal> Index0;
-
- // Appends an entry made of the running record count and the key of the first record on
- // `page`, then adds TRecIter::RecNum to the running count.
- void NextPage(const TDatPage* page) {
- TGzVal* rec = (TGzVal*)((char*)page + sizeof(TDatPage));
- Index0.push_back(TGzVal(TotalRecNum, rec->Key));
- TotalRecNum += TRecIter::RecNum;
- }
-
- // Static trampoline handed to the indexer; forwards to NextPage() on `This`.
- static void DispatchCallback(void* This, const TDatPage* page) {
- ((TMyType*)This)->NextPage(page);
- }
-};
-
-// Writer for a compressed data file together with its index file (KeyFile). For every
-// page reported through the indexer callback, one index record is written holding the
-// value of Tell() and a key taken from the first record of the page.
-template <class TVal, class TKey, class TPageFile = TCompressedOutputPageFile>
-class TOutDirectCompressedFileImpl: public TOutDatFileImpl<
- TVal,
- TOutputRecordIterator<TVal, TOutputPageIterator<TPageFile>, TCallbackIndexer>> {
- typedef TOutDatFileImpl<
- TVal,
- TOutputRecordIterator<TVal, TOutputPageIterator<TPageFile>, TCallbackIndexer>>
- TDatFile;
- typedef TOutDirectCompressedFileImpl<TVal, TKey, TPageFile> TMyType;
- typedef typename TDatFile::TRecIter TRecIter;
- typedef typename TRecIter::TPageIter TPageIter;
- typedef typename TRecIter::TIndexer TIndexer;
- typedef TGzKey<TKey> TMyKey;
- typedef TOutZIndexFile<TKey> TKeyFile;
-
-protected:
- using TDatFile::Tell;
-
-public:
- // Registers DispatchCallback with the indexer so that NextPage() is invoked through it.
- TOutDirectCompressedFileImpl() {
- TIndexer::SetCallback(this, DispatchCallback);
- }
-
- // Opens the data file `fname`, derives the index file name with DatNameToIdx() and opens
- // the index file with page size `ipagesize` (`pagesize` when 0 is passed). Each step runs
- // only if the previous ones succeeded; on any failure the data file is closed. Returns 0
- // on success or the first error code.
- int Open(const char* fname, size_t pagesize, size_t ipagesize = 0) {
- char iname[FILENAME_MAX];
- int ret;
- if (ipagesize == 0)
- ipagesize = pagesize;
-
- ret = TDatFile::Open(fname, pagesize, 1, 1);
- ret = ret ? ret : DatNameToIdx(iname, fname);
- ret = ret ? ret : KeyFile.Open(iname, ipagesize, 1, 1);
- if (ret)
- TDatFile::Close();
- return ret;
- }
-
- // Writes an index record for the current page if it holds records, then closes the
- // index file and the data file. An error from the data file takes precedence.
- int Close() {
- if (TRecIter::RecNum)
- NextPage(TPageIter::Current());
- int ret = KeyFile.Close();
- int ret1 = TDatFile::Close();
- return ret1 ? ret1 : ret;
- }
-
- // Error of the data file if there is one, otherwise the error of the index file.
- int GetError() const {
- return TDatFile::GetError() ? TDatFile::GetError() : KeyFile.GetError();
- }
-
-protected:
- TKeyFile KeyFile;
-
- // Reserves one index record (its fixed size, or MaxSizeOf<TMyKey>() when the size is
- // reported as 0) and, if the reservation succeeded, fills it with Tell() and the first
- // record of `page` converted to the key type, then calls KeyFile.ResetDat().
- void NextPage(const TDatPage* page) {
- size_t sz = SizeOf((TMyKey*)NULL);
- TMyKey* rec = KeyFile.Reserve(sz ? sz : MaxSizeOf<TMyKey>());
- if (rec) {
- rec->Offset = Tell();
- rec->Key = *(TVal*)((char*)page + sizeof(TDatPage));
- KeyFile.ResetDat();
- }
- }
-
- // Static trampoline handed to the indexer; forwards to NextPage() on `This`.
- static void DispatchCallback(void* This, const TDatPage* page) {
- ((TMyType*)This)->NextPage(page);
- }
-};
-
-// Positions the compressed file at the offset that KeyFile records for page `pageno`.
-// Returns 0 on success; otherwise sets and returns Error, which stays set for later calls.
-template <class TKey>
-int TCompressedIndexedInputPageFile<TKey>::GotoPage(int pageno) {
- if (Error)
- return Error;
-
- Eof = 0;
-
- // FindPage() reports -1 for a page beyond the end of the index. Reject that together
- // with a zero offset instead of trying to seek to a negative position.
- const i64 offset = KeyFile.FindPage(pageno);
- if (offset <= 0)
- return Error = MBDB_BAD_FILE_SIZE;
-
- if (offset != FileManip.RealSeek(offset, SEEK_SET))
- Error = MBDB_BAD_FILE_SIZE;
-
- return Error;
-}
-
-// TInDatFile over a TCompressedInputPageFile; only forwards its constructor arguments.
-template <typename TVal>
-class TCompressedInDatFile: public TInDatFile<TVal, TCompressedInputPageFile> {
-public:
- TCompressedInDatFile(const char* name, size_t pages, int pagesOrBytes = 1)
- : TInDatFile<TVal, TCompressedInputPageFile>(name, pages, pagesOrBytes)
- {
- }
-};
-
-// TOutDatFile over a TCompressedOutputPageFile with TFakeCompression as the record-level
-// compression parameter; only forwards its constructor arguments.
-template <typename TVal>
-class TCompressedOutDatFile: public TOutDatFile<TVal, TFakeCompression, TCompressedOutputPageFile> {
-public:
- TCompressedOutDatFile(const char* name, size_t pagesize, size_t pages, int pagesOrBytes = 1)
- : TOutDatFile<TVal, TFakeCompression, TCompressedOutputPageFile>(name, pagesize, pages, pagesOrBytes)
- {
- }
-};
-
-// Exception-throwing front end for TOutDirectCompressedFileImpl. It remembers the file
-// name and page sizes, reports failures from Open()/Close() as yexception, and closes
-// the file in its destructor.
-template <typename TVal, typename TKey, typename TPageFile = TCompressedOutputPageFile>
-class TOutDirectCompressedFile: protected TOutDirectCompressedFileImpl<TVal, TKey, TPageFile> {
- typedef TOutDirectCompressedFileImpl<TVal, TKey, TPageFile> TBase;
-
-public:
- // Stores a private copy of `name` and the page sizes; does not open anything.
- TOutDirectCompressedFile(const char* name, size_t pagesize, size_t ipagesize = 0)
- : Name(strdup(name))
- , PageSize(pagesize)
- , IdxPageSize(ipagesize)
- {
- }
-
- ~TOutDirectCompressedFile() {
- Close();
- free(Name);
- Name = NULL;
- }
-
- // Opens `fname` with the stored page sizes and remembers it as the current name.
- // Throws yexception if the underlying Open() reports an error.
- void Open(const char* fname) {
- int ret = TBase::Open(fname, PageSize, IdxPageSize);
- if (ret)
- ythrow yexception() << ErrorMessage(ret, "Failed to open output file", fname);
- // Duplicate before freeing: `fname` may be the stored Name itself (for example
- // Open(GetName())), and freeing first would make strdup() read released memory.
- char* newName = strdup(fname);
- free(Name);
- Name = newName;
- }
-
- // Throws yexception for an error pending before the close or produced by the close,
- // unless an exception is already propagating.
- void Close() {
- int ret;
- if ((ret = TBase::GetError()))
- if (!std::uncaught_exception())
- ythrow yexception() << ErrorMessage(ret, "Error before closing output file", Name);
- if ((ret = TBase::Close()))
- if (!std::uncaught_exception())
- ythrow yexception() << ErrorMessage(ret, "Error while closing output file", Name);
- }
-
- const char* GetName() const {
- return Name;
- }
-
- using TBase::Freeze;
- using TBase::Push;
- using TBase::Reserve;
- using TBase::Unfreeze;
-
-protected:
- char* Name; // strdup'ed copy, owned by this object
- size_t PageSize, IdxPageSize;
-};
-
-// Type bundle naming the buffered compressed page-file types for output and input.
-class TCompressedInterFileTypes {
-public:
- typedef TCompressedBufferedOutputPageFile TOutPageFile;
- typedef TCompressedBufferedInputPageFile TInPageFile;
-};
diff --git a/library/cpp/microbdb/extinfo.h b/library/cpp/microbdb/extinfo.h
deleted file mode 100644
index c8389e783cd..00000000000
--- a/library/cpp/microbdb/extinfo.h
+++ /dev/null
@@ -1,127 +0,0 @@
-#pragma once
-
-#include "header.h"
-
-#include <library/cpp/packedtypes/longs.h>
-
-#include <util/generic/typetraits.h>
-
-#include <library/cpp/microbdb/noextinfo.pb.h>
-
-// Ordering for TNoExtInfo: no value is ever less than another.
-inline bool operator<(const TNoExtInfo&, const TNoExtInfo&) {
- return false;
-}
-
-// Helpers for records that may carry "extended info": a length written with the packed
-// `long` encoding, followed by that many payload bytes, stored right after the record.
-namespace NMicroBDB {
- Y_HAS_MEMBER(TExtInfo);
-
- // Selects T::TExtInfo when T declares one and TNoExtInfo otherwise.
- template <class, bool>
- struct TSelectExtInfo;
-
- template <class T>
- struct TSelectExtInfo<T, false> {
- typedef TNoExtInfo TExtInfo;
- };
-
- template <class T>
- struct TSelectExtInfo<T, true> {
- typedef typename T::TExtInfo TExtInfo;
- };
-
- // Exists tells whether T declares TExtInfo; TResult is the selected ext-info type.
- template <class T>
- class TExtInfoType {
- public:
- static const bool Exists = THasTExtInfo<T>::value;
- typedef typename TSelectExtInfo<T, Exists>::TExtInfo TResult;
- };
-
- Y_HAS_MEMBER(MakeExtKey);
-
- // Builds a key from a value: plain assignment when TVal has no MakeExtKey member,
- // TVal::MakeExtKey() otherwise.
- template <class, class, bool>
- struct TSelectMakeExtKey;
-
- template <class TVal, class TKey>
- struct TSelectMakeExtKey<TVal, TKey, false> {
- static inline void Make(TKey* to, typename TExtInfoType<TKey>::TResult*, const TVal* from, const typename TExtInfoType<TVal>::TResult*) {
- *to = *from;
- }
- };
-
- template <class TVal, class TKey>
- struct TSelectMakeExtKey<TVal, TKey, true> {
- static inline void Make(TKey* to, typename TExtInfoType<TKey>::TResult* toExt, const TVal* from, const typename TExtInfoType<TVal>::TResult* fromExt) {
- TVal::MakeExtKey(to, toExt, from, fromExt);
- }
- };
-
- // Returns SizeOf(rec). For types with ext info it also decodes the length field stored
- // right after the record: *extLenSize receives the size of that field and *extSize the
- // payload length. For types without ext info both outputs are set to 0.
- template <typename T>
- inline size_t SizeOfExt(const T* rec, size_t* /*out*/ extLenSize = nullptr, size_t* /*out*/ extSize = nullptr) {
- if (!TExtInfoType<T>::Exists) {
- if (extLenSize)
- *extLenSize = 0;
- if (extSize)
- *extSize = 0;
- return SizeOf(rec);
- } else {
- size_t sz = SizeOf(rec);
- i64 l;
- int els = in_long(l, (const char*)rec + sz);
- if (extLenSize)
- *extLenSize = static_cast<size_t>(els);
- if (extSize)
- *extSize = static_cast<size_t>(l);
- return sz;
- }
- }
-
- // Parses the ext-info payload of `rec` into *extInfo. Returns false for a null record
- // or when parsing fails.
- template <class T>
- bool GetExtInfo(const T* rec, typename TExtInfoType<T>::TResult* extInfo) {
- Y_VERIFY(TExtInfoType<T>::Exists, "GetExtInfo should only be used with extended records");
- if (!rec)
- return false;
- size_t els;
- size_t es;
- size_t s = SizeOfExt(rec, &els, &es);
- const ui8* raw = (const ui8*)rec + s + els;
- return extInfo->ParseFromArray(raw, es);
- }
-
- // Returns a pointer to the raw ext info of `rec` (length field followed by payload) and
- // stores its total size in *len; yields nullptr and 0 for a null record.
- template <class T>
- const ui8* GetExtInfoRaw(const T* rec, size_t* len) {
- Y_VERIFY(TExtInfoType<T>::Exists, "GetExtInfo should only be used with extended records");
- if (!rec) {
- *len = 0;
- return nullptr;
- }
- size_t els;
- size_t es;
- size_t s = SizeOfExt(rec, &els, &es);
- *len = els + es;
- return (const ui8*)rec + s;
- }
-
- // Compares serialized extInfo (e.g. for stable sort)
- // Payloads of different length are ordered by length; equal-length payloads are compared
- // bytewise. Each record is measured on its own, and the payload is located after the
- // record body and the length field, matching GetExtInfo().
- template <class T>
- int CompareExtInfo(const T* a, const T* b) {
- Y_VERIFY(TExtInfoType<T>::Exists, "CompareExtInfo should only be used with extended records");
- size_t elsA, esA;
- size_t elsB, esB;
- const size_t szA = SizeOfExt(a, &elsA, &esA);
- const size_t szB = SizeOfExt(b, &elsB, &esB);
- if (esA != esB)
- return esA - esB;
- else
- return memcmp((const ui8*)a + szA + elsA, (const ui8*)b + szB + elsB, esA);
- }
-
-}
-
-using NMicroBDB::TExtInfoType;
-
-// Front end for key construction: Exists tells whether TVal provides MakeExtKey, and
-// Make() dispatches to the matching NMicroBDB::TSelectMakeExtKey specialization.
-template <class TVal, class TKey>
-struct TMakeExtKey {
- static const bool Exists = NMicroBDB::THasMakeExtKey<TVal>::value;
- static inline void Make(TKey* to, typename TExtInfoType<TKey>::TResult* toExt, const TVal* from, const typename TExtInfoType<TVal>::TResult* fromExt) {
- NMicroBDB::TSelectMakeExtKey<TVal, TKey, Exists>::Make(to, toExt, from, fromExt);
- }
-};
diff --git a/library/cpp/microbdb/file.cpp b/library/cpp/microbdb/file.cpp
deleted file mode 100644
index 599a7301a0f..00000000000
--- a/library/cpp/microbdb/file.cpp
+++ /dev/null
@@ -1,220 +0,0 @@
-#include "file.h"
-
-#include <fcntl.h>
-#include <errno.h>
-#include <sys/stat.h>
-
-#ifdef _win32_
-#define S_ISREG(x) !!(x & S_IFREG)
-#endif
-
-TFileManipBase::TFileManipBase()
- : FileBased(true)
-{
-}
-
-i64 TFileManipBase::DoSeek(i64 offset, int whence, bool isStreamOpen) {
- if (!isStreamOpen)
- return -1;
- VerifyRandomAccess();
- return File.Seek(offset, (SeekDir)whence);
-}
-
-int TFileManipBase::DoFileOpen(const TFile& file) {
- File = file;
- SetFileBased(IsFileBased());
- return (File.IsOpen()) ? 0 : MBDB_OPEN_ERROR;
-}
-
-int TFileManipBase::DoFileClose() {
- if (File.IsOpen()) {
- File.Close();
- return MBDB_ALREADY_INITIALIZED;
- }
- return 0;
-}
-
-int TFileManipBase::IsFileBased() const {
- bool fileBased = true;
-#if defined(_win_)
-#elif defined(_unix_)
- FHANDLE h = File.GetHandle();
- struct stat sb;
- fileBased = false;
- if (h != INVALID_FHANDLE && !::fstat(h, &sb) && S_ISREG(sb.st_mode)) {
- fileBased = true;
- }
-#else
-#error
-#endif
- return fileBased;
-}
-
-TInputFileManip::TInputFileManip()
- : InputStream(nullptr)
-{
-}
-
-int TInputFileManip::Open(const char* fname, bool direct) {
- int ret;
- return (ret = DoClose()) ? ret : DoStreamOpen(TFile(fname, RdOnly | (direct ? DirectAligned : EOpenMode())));
-}
-
-int TInputFileManip::Open(IInputStream& input) {
- int ret;
- return (ret = DoClose()) ? ret : DoStreamOpen(&input);
-}
-
-int TInputFileManip::Open(TAutoPtr<IInputStream> input) {
- int ret;
- return (ret = DoClose()) ? ret : DoStreamOpen(input.Release());
-}
-
-int TInputFileManip::Init(const TFile& file) {
- int ret;
- if (ret = DoClose())
- return ret;
- DoStreamOpen(file);
- return 0;
-}
-
-int TInputFileManip::Close() {
- DoClose();
- return 0;
-}
-
-ssize_t TInputFileManip::Read(void* buf, unsigned len) {
- if (!IsStreamOpen())
- return -1;
- return InputStream->Load(buf, len);
-}
-
-IInputStream* TInputFileManip::CreateStream(const TFile& file) {
- return new TUnbufferedFileInput(file);
-}
-
-TMappedInputPageFile::TMappedInputPageFile()
- : Pagesize(0)
- , Error(0)
- , Pagenum(0)
- , Recordsig(0)
- , Open(false)
-{
- Term();
-}
-
-TMappedInputPageFile::~TMappedInputPageFile() {
- Term();
-}
-
-int TMappedInputPageFile::Init(const char* fname, ui32 recsig, ui32* gotRecordSig, bool) {
- Mappedfile.init(fname);
- Open = true;
-
- TDatMetaPage* meta = (TDatMetaPage*)Mappedfile.getData();
- if (gotRecordSig)
- *gotRecordSig = meta->RecordSig;
-
- if (meta->MetaSig != METASIG)
- Error = MBDB_BAD_METAPAGE;
- else if (meta->RecordSig != recsig)
- Error = MBDB_BAD_RECORDSIG;
-
- if (Error) {
- Mappedfile.term();
- return Error;
- }
-
- size_t fsize = Mappedfile.getSize();
- if (fsize < METASIZE)
- return Error = MBDB_BAD_FILE_SIZE;
- fsize -= METASIZE;
- if (fsize % meta->PageSize)
- return Error = MBDB_BAD_FILE_SIZE;
- Pagenum = (int)(fsize / meta->PageSize);
- Pagesize = meta->PageSize;
- Recordsig = meta->RecordSig;
- Error = 0;
- return Error;
-}
-
-int TMappedInputPageFile::Term() {
- Mappedfile.term();
- Open = false;
- return 0;
-}
-
-TOutputFileManip::TOutputFileManip()
- : OutputStream(nullptr)
-{
-}
-
-int TOutputFileManip::Open(const char* fname, EOpenMode mode) {
- if (IsStreamOpen()) {
- return MBDB_ALREADY_INITIALIZED; // should it be closed as TInputFileManip
- }
-
- try {
- if (unlink(fname) && errno != ENOENT) {
- if (strncmp(fname, "/dev/std", 8))
- return MBDB_OPEN_ERROR;
- }
- TFile file(fname, mode);
- DoStreamOpen(file);
- } catch (const TFileError&) {
- return MBDB_OPEN_ERROR;
- }
- return 0;
-}
-
-int TOutputFileManip::Open(IOutputStream& output) {
- if (IsStreamOpen())
- return MBDB_ALREADY_INITIALIZED;
- DoStreamOpen(&output);
- return 0;
-}
-
-int TOutputFileManip::Open(TAutoPtr<IOutputStream> output) {
- if (IsStreamOpen())
- return MBDB_ALREADY_INITIALIZED;
- DoStreamOpen(output.Release());
- return 0;
-}
-
-int TOutputFileManip::Init(const TFile& file) {
- if (IsStreamOpen())
- return MBDB_ALREADY_INITIALIZED; // should it be closed as TInputFileManip
- DoStreamOpen(file);
- return 0;
-}
-
-int TOutputFileManip::Rotate(const char* newfname) {
- if (!IsStreamOpen()) {
- return MBDB_NOT_INITIALIZED;
- }
-
- try {
- TFile file(newfname, WrOnly | OpenAlways | TruncExisting | ARW | AWOther);
- DoClose();
- DoStreamOpen(file);
- } catch (const TFileError&) {
- return MBDB_OPEN_ERROR;
- }
- return 0;
-}
-
-int TOutputFileManip::Close() {
- DoClose();
- return 0;
-}
-
-int TOutputFileManip::Write(const void* buf, unsigned len) {
- if (!IsStreamOpen())
- return -1;
- OutputStream->Write(buf, len);
- return len;
-}
-
-IOutputStream* TOutputFileManip::CreateStream(const TFile& file) {
- return new TUnbufferedFileOutput(file);
-}
diff --git a/library/cpp/microbdb/file.h b/library/cpp/microbdb/file.h
deleted file mode 100644
index f7c78183759..00000000000
--- a/library/cpp/microbdb/file.h
+++ /dev/null
@@ -1,225 +0,0 @@
-#pragma once
-
-#include "header.h"
-
-#include <library/cpp/deprecated/mapped_file/mapped_file.h>
-
-#include <util/generic/noncopyable.h>
-#include <util/stream/file.h>
-#include <util/system/filemap.h>
-
-#define FS_BLOCK_SIZE 512
-
-class TFileManipBase {
-protected:
- TFileManipBase();
-
- virtual ~TFileManipBase() {
- }
-
- i64 DoSeek(i64 offset, int whence, bool isStreamOpen);
-
- int DoFileOpen(const TFile& file);
-
- int DoFileClose();
-
- int IsFileBased() const;
-
- inline void SetFileBased(bool fileBased) {
- FileBased = fileBased;
- }
-
- inline i64 DoGetPosition() const {
- Y_ASSERT(FileBased);
- return File.GetPosition();
- }
-
- inline i64 DoGetLength() const {
- return (FileBased) ? File.GetLength() : -1;
- }
-
- inline void VerifyRandomAccess() const {
- Y_VERIFY(FileBased, "non-file stream can not be accessed randomly");
- }
-
- inline i64 GetPosition() const {
- return (i64)File.GetPosition();
- }
-
-private:
- TFile File;
- bool FileBased;
-};
-
-class TInputFileManip: public TFileManipBase {
-public:
- using TFileManipBase::GetPosition;
-
- TInputFileManip();
-
- int Open(const char* fname, bool direct = false);
-
- int Open(IInputStream& input);
-
- int Open(TAutoPtr<IInputStream> input);
-
- int Init(const TFile& file);
-
- int Close();
-
- ssize_t Read(void* buf, unsigned len);
-
- inline bool IsOpen() const {
- return IsStreamOpen();
- }
-
- inline i64 GetLength() const {
- return DoGetLength();
- }
-
- inline i64 Seek(i64 offset, int whence) {
- return DoSeek(offset, whence, IsStreamOpen());
- }
-
- inline i64 RealSeek(i64 offset, int whence) {
- return Seek(offset, whence);
- }
-
-protected:
- inline bool IsStreamOpen() const {
- return !!InputStream;
- }
-
- inline int DoStreamOpen(IInputStream* input, bool fileBased = false) {
- InputStream.Reset(input);
- SetFileBased(fileBased);
- return 0;
- }
-
- inline int DoStreamOpen(const TFile& file) {
- int ret;
- return (ret = DoFileOpen(file)) ? ret : DoStreamOpen(CreateStream(file), IsFileBased());
- }
-
- virtual IInputStream* CreateStream(const TFile& file);
-
- inline bool DoClose() {
- if (IsStreamOpen()) {
- InputStream.Destroy();
- return DoFileClose();
- }
- return 0;
- }
-
- THolder<IInputStream> InputStream;
-};
-
-class TMappedInputPageFile: private TNonCopyable {
-public:
- TMappedInputPageFile();
-
- ~TMappedInputPageFile();
-
- inline int GetError() const {
- return Error;
- }
-
- inline size_t GetPageSize() const {
- return Pagesize;
- }
-
- inline int GetLastPage() const {
- return Pagenum;
- }
-
- inline ui32 GetRecordSig() const {
- return Recordsig;
- }
-
- inline bool IsOpen() const {
- return Open;
- }
-
- inline char* GetData() const {
- return Open ? (char*)Mappedfile.getData() : nullptr;
- }
-
- inline size_t GetSize() const {
- return Open ? Mappedfile.getSize() : 0;
- }
-
-protected:
- int Init(const char* fname, ui32 recsig, ui32* gotRecordSig = nullptr, bool direct = false);
-
- int Term();
-
- TMappedFile Mappedfile;
- size_t Pagesize;
- int Error;
- int Pagenum;
- ui32 Recordsig;
- bool Open;
-};
-
-class TOutputFileManip: public TFileManipBase {
-public:
- TOutputFileManip();
-
- int Open(const char* fname, EOpenMode mode = WrOnly | CreateAlways | ARW | AWOther);
-
- int Open(IOutputStream& output);
-
- int Open(TAutoPtr<IOutputStream> output);
-
- int Init(const TFile& file);
-
- int Rotate(const char* newfname);
-
- int Write(const void* buf, unsigned len);
-
- int Close();
-
- inline bool IsOpen() const {
- return IsStreamOpen();
- }
-
- inline i64 GetLength() const {
- return DoGetLength();
- }
-
- inline i64 Seek(i64 offset, int whence) {
- return DoSeek(offset, whence, IsStreamOpen());
- }
-
- inline i64 RealSeek(i64 offset, int whence) {
- return Seek(offset, whence);
- }
-
-protected:
- inline bool IsStreamOpen() const {
- return !!OutputStream;
- }
-
- inline int DoStreamOpen(IOutputStream* output, bool fileBased = false) {
- OutputStream.Reset(output);
- SetFileBased(fileBased);
- return 0;
- }
-
- inline int DoStreamOpen(const TFile& file) {
- int ret;
- return (ret = DoFileOpen(file)) ? ret : DoStreamOpen(CreateStream(file), true);
- }
-
- virtual IOutputStream* CreateStream(const TFile& file);
-
- inline bool DoClose() {
- if (IsStreamOpen()) {
- OutputStream.Destroy();
- return DoFileClose();
- }
- return 0;
- }
-
- THolder<IOutputStream> OutputStream;
-};
diff --git a/library/cpp/microbdb/hashes.h b/library/cpp/microbdb/hashes.h
deleted file mode 100644
index bfd113c3ba4..00000000000
--- a/library/cpp/microbdb/hashes.h
+++ /dev/null
@@ -1,250 +0,0 @@
-#pragma once
-
-#include <library/cpp/on_disk/st_hash/static_hash.h>
-#include <util/system/sysstat.h>
-#include <util/stream/mem.h>
-#include <util/string/printf.h>
-#include <library/cpp/deprecated/fgood/fgood.h>
-
-#include "safeopen.h"
-
-/** This file currently implements creation of mappable read-only hash file.
- Basic usage of these "static hashes" is defined in util/static_hash.h (see docs there).
- Additional useful wrappers are available in util/static_hash_map.h
-
- There are two ways to create mappable hash file:
-
- A) Fill an THashMap/set structure in RAM, then dump it to disk.
- This is usually done by save_hash_to_file* functions defined in static_hash.h
- (see description in static_hash.h).
-
- B) Prepare all data using external sorter, then create hash file straight on disk.
- This approach is necessary when there isn't enough RAM to hold entire original THashMap.
- Implemented in this file as TStaticHashBuilder class.
-
- Current implementation's major drawback is that the size of the hash must be estimated
- before the hash is built (bucketCount), which is not always possible.
- Separate implementation with two sort passes is yet to be done.
-
- Another problem is that maximum stored size of the element (maxRecSize) must also be
- known in advance, because we use TDatSorterMemo, etc.
- */
-
-template <class SizeType>
-struct TSthashTmpRec {
- SizeType HashVal;
- SizeType RecSize;
- char Buf[1];
- size_t SizeOf() const {
- return &Buf[RecSize] - (char*)this;
- }
- bool operator<(const TSthashTmpRec& than) const {
- return HashVal < than.HashVal;
- }
- static const ui32 RecordSig = 20100124 + sizeof(SizeType) - 4;
-};
-
-template <typename T>
-struct TReplaceMerger {
- T operator()(const T& oldRecord, const T& newRecord) const {
- Y_UNUSED(oldRecord);
- return newRecord;
- }
-};
-
-/** TStaticHashBuilder template parameters:
- HashType - THashMap map/set type for which we construct corresponding mappable hash;
- SizeType - type used to store offsets and length in resulting hash;
- MergerType - type of object to process records with equal key (see TReplaceMerger for example);
- */
-
-template <class HashType, class SizeType, class MergerType = TReplaceMerger<typename HashType::mapped_type>>
-struct TStaticHashBuilder {
- const size_t SrtIOPageSz;
- const size_t WrBufSz;
- typedef TSthashTmpRec<SizeType> TIoRec;
- typedef TSthashWriter<typename HashType::key_type, typename HashType::mapped_type, SizeType> TKeySaver;
- typedef typename HashType::value_type TValueType;
- typedef typename HashType::mapped_type TMappedType;
- typedef typename HashType::key_type TKeyType;
-
- TDatSorterMemo<TIoRec, TCompareByLess> Srt;
- TBuffer IoRec, CurrentBlockRecs;
- TKeySaver KeySaver;
- typename HashType::hasher Hasher;
- typename HashType::key_equal Equals;
- MergerType merger;
- TString HashFileName;
- TString OurTmpDir;
- size_t BucketCount;
- int FreeBits;
-
- // memSz is the Sorter buffer size;
- // maxRecSize is the maximum size (as reported by size_for_st) of our record(s)
- TStaticHashBuilder(size_t memSz, size_t maxRecSize)
- : SrtIOPageSz((maxRecSize * 16 + 65535) & ~size_t(65535))
- , WrBufSz(memSz / 16 >= SrtIOPageSz ? memSz / 16 : SrtIOPageSz)
- , Srt("unused", memSz, SrtIOPageSz, WrBufSz, 0)
- , IoRec(sizeof(TIoRec) + maxRecSize)
- , CurrentBlockRecs(sizeof(TIoRec) + maxRecSize)
- , BucketCount(0)
- , FreeBits(0)
- {
- }
-
- ~TStaticHashBuilder() {
- Close();
- }
-
- // if tmpDir is supplied, it must exist;
- // bucketCount should be HashBucketCount() of the (estimated) element count
- void Open(const char* fname, size_t bucketCount, const char* tmpDir = nullptr) {
- if (!tmpDir)
- tmpDir = ~(OurTmpDir = Sprintf("%s.temp", fname));
- Mkdir(tmpDir, MODE0775);
- Srt.Open(tmpDir);
- HashFileName = fname;
- BucketCount = bucketCount;
- int bitCount = 0;
- while (((size_t)1 << bitCount) <= BucketCount && bitCount < int(8 * sizeof(size_t)))
- ++bitCount;
- FreeBits = 8 * sizeof(size_t) - bitCount;
- }
-
- void Push(const TValueType& rec) {
- TIoRec* ioRec = MakeIoRec(rec);
- Srt.Push(ioRec);
- }
- TIoRec* MakeIoRec(const TValueType& rec) {
- TIoRec* ioRec = (TIoRec*)IoRec.Data();
- size_t mask = (1 << FreeBits) - 1;
- size_t hash = Hasher(rec.first);
- ioRec->HashVal = ((hash % BucketCount) << FreeBits) + ((hash / BucketCount) & mask);
-
- TMemoryOutput output(ioRec->Buf, IoRec.Capacity() - offsetof(TIoRec, Buf));
- KeySaver.SaveRecord(&output, rec);
- ioRec->RecSize = output.Buf() - ioRec->Buf;
- return ioRec;
- }
-
- bool Merge(TVector<std::pair<TKeyType, TMappedType>>& records, size_t newRecordSize) {
- TSthashIterator<const TKeyType, const TMappedType, typename HashType::hasher,
- typename HashType::key_equal>
- newPtr(CurrentBlockRecs.End() - newRecordSize);
- for (size_t i = 0; i < records.size(); ++i) {
- if (newPtr.KeyEquals(Equals, records[i].first)) {
- TMappedType oldValue = records[i].second;
- TMappedType newValue = newPtr.Value();
- newValue = merger(oldValue, newValue);
- records[i].second = newValue;
- return true;
- }
- }
- records.push_back(std::make_pair(newPtr.Key(), newPtr.Value()));
- return false;
- }
-
- void PutRecord(const char* buf, size_t rec_size, TFILEPtr& f, SizeType& cur_off) {
- f.fsput(buf, rec_size);
- cur_off += rec_size;
- }
-
- void Finish() {
- Srt.Sort();
- // We use variant 1.
- // Variant 1: read sorter once, write records, fseeks to write buckets
- // (this doesn't allow fname to be stdout)
- // Variant 2: read sorter (probably temp. file) twice: write buckets, then write records
- // (this allows fname to be stdout but seems to be longer)
- TFILEPtr f(HashFileName, "wb");
- setvbuf(f, nullptr, _IOFBF, WrBufSz);
- TVector<SizeType> bucketsBuf(WrBufSz, 0);
- // prepare header (note: this code must be unified with save_stl.h)
- typedef sthashtable_nvm_sv<typename HashType::hasher, typename HashType::key_equal, SizeType> sv_type;
- sv_type sv = {Hasher, Equals, BucketCount, 0, 0};
- // to do: m.b. use just the size of corresponding object?
- SizeType cur_off = sizeof(sv_type) +
- (sv.num_buckets + 1) * sizeof(SizeType);
- SizeType bkt_wroff = sizeof(sv_type), bkt_bufpos = 0, prev_bkt = 0, prev_hash = (SizeType)-1;
- bucketsBuf[bkt_bufpos++] = cur_off;
- // if might me better to write many zeroes here
- f.seek(cur_off, SEEK_SET);
- TVector<std::pair<TKeyType, TMappedType>> currentBlock;
- bool emptyFile = true;
- size_t prevRecSize = 0;
- // seek forward
- while (true) {
- const TIoRec* rec = Srt.Next();
- if (currentBlock.empty() && !emptyFile) {
- if (rec && prev_hash == rec->HashVal) {
- Merge(currentBlock, prevRecSize);
- } else {
- // if there is only one record with this hash, don't recode it, just write
- PutRecord(CurrentBlockRecs.Data(), prevRecSize, f, cur_off);
- sv.num_elements++;
- }
- }
- if (!rec || prev_hash != rec->HashVal) {
- // write buckets table
- for (size_t i = 0; i < currentBlock.size(); ++i) {
- TIoRec* ioRec = MakeIoRec(TValueType(currentBlock[i]));
- PutRecord(ioRec->Buf, ioRec->RecSize, f, cur_off);
- }
- sv.num_elements += currentBlock.size();
- currentBlock.clear();
- CurrentBlockRecs.Clear();
- if (rec) {
- prev_hash = rec->HashVal;
- }
- }
- // note: prev_bkt's semantics here is 'cur_bkt - 1', thus we are actually cycling
- // until cur_bkt == rec->HashVal *inclusively*
- while (!rec || prev_bkt != (rec->HashVal >> FreeBits)) {
- bucketsBuf[bkt_bufpos++] = cur_off;
- if (bkt_bufpos == bucketsBuf.size()) {
- f.seek(bkt_wroff, SEEK_SET);
- size_t sz = bkt_bufpos * sizeof(bucketsBuf[0]);
- if (f.write(bucketsBuf.begin(), 1, sz) != sz)
- throw yexception() << "could not write " << sz << " bytes to " << ~HashFileName;
- bkt_wroff += sz;
- bkt_bufpos = 0;
- f.seek(cur_off, SEEK_SET);
- }
- prev_bkt++;
- if (!rec) {
- break;
- }
- assert(prev_bkt < BucketCount);
- }
- if (!rec) {
- break;
- }
- emptyFile = false;
- CurrentBlockRecs.Append(rec->Buf, rec->RecSize);
- if (!currentBlock.empty()) {
- Merge(currentBlock, rec->RecSize);
- } else {
- prevRecSize = rec->RecSize;
- }
- }
- // finish buckets table
- f.seek(bkt_wroff, SEEK_SET);
- size_t sz = bkt_bufpos * sizeof(bucketsBuf[0]);
- if (sz && f.write(bucketsBuf.begin(), 1, sz) != sz)
- throw yexception() << "could not write " << sz << " bytes to " << ~HashFileName;
- bkt_wroff += sz;
- for (; prev_bkt < BucketCount; prev_bkt++)
- f.fput(cur_off);
- // finally write header
- sv.data_end_off = cur_off;
- f.seek(0, SEEK_SET);
- f.fput(sv);
- f.close();
- }
-
- void Close() {
- Srt.Close();
- if (+OurTmpDir)
- rmdir(~OurTmpDir);
- }
-};
diff --git a/library/cpp/microbdb/header.cpp b/library/cpp/microbdb/header.cpp
deleted file mode 100644
index f4511d6fb62..00000000000
--- a/library/cpp/microbdb/header.cpp
+++ /dev/null
@@ -1,91 +0,0 @@
-#include "header.h"
-
-#include <util/stream/output.h>
-#include <util/stream/format.h>
-
-TString ToString(EMbdbErrors error) {
- TString ret;
- switch (error) {
- case MBDB_ALREADY_INITIALIZED:
- ret = "already initialized";
- break;
- case MBDB_NOT_INITIALIZED:
- ret = "not initialized";
- break;
- case MBDB_BAD_DESCRIPTOR:
- ret = "bad descriptor";
- break;
- case MBDB_OPEN_ERROR:
- ret = "open error";
- break;
- case MBDB_READ_ERROR:
- ret = "read error";
- break;
- case MBDB_WRITE_ERROR:
- ret = "write error";
- break;
- case MBDB_CLOSE_ERROR:
- ret = "close error";
- break;
- case MBDB_EXPECTED_EOF:
- ret = "expected eof";
- break;
- case MBDB_UNEXPECTED_EOF:
- ret = "unxepected eof";
- break;
- case MBDB_BAD_FILENAME:
- ret = "bad filename";
- break;
- case MBDB_BAD_METAPAGE:
- ret = "bad metapage";
- break;
- case MBDB_BAD_RECORDSIG:
- ret = "bad recordsig";
- break;
- case MBDB_BAD_FILE_SIZE:
- ret = "bad file size";
- break;
- case MBDB_BAD_PAGESIG:
- ret = "bad pagesig";
- break;
- case MBDB_BAD_PAGESIZE:
- ret = "bad pagesize";
- break;
- case MBDB_BAD_PARM:
- ret = "bad parm";
- break;
- case MBDB_BAD_SYNC:
- ret = "bad sync";
- break;
- case MBDB_PAGE_OVERFLOW:
- ret = "page overflow";
- break;
- case MBDB_NO_MEMORY:
- ret = "no memory";
- break;
- case MBDB_MEMORY_LEAK:
- ret = "memory leak";
- break;
- case MBDB_NOT_SUPPORTED:
- ret = "not supported";
- break;
- default:
- ret = "unknown";
- break;
- }
- return ret;
-}
-
-TString ErrorMessage(int error, const TString& text, const TString& path, ui32 recordSig, ui32 gotRecordSig) {
- TStringStream str;
- str << text;
- if (path.size())
- str << " '" << path << "'";
- str << ": " << ToString(static_cast<EMbdbErrors>(error));
- if (recordSig && (!gotRecordSig || recordSig != gotRecordSig))
- str << ". Expected RecordSig: " << Hex(recordSig, HF_ADDX);
- if (recordSig && gotRecordSig && recordSig != gotRecordSig)
- str << ", got: " << Hex(gotRecordSig, HF_ADDX);
- str << ". Last system error text: " << LastSystemErrorText();
- return str.Str();
-}
diff --git a/library/cpp/microbdb/header.h b/library/cpp/microbdb/header.h
deleted file mode 100644
index 0951d610ea7..00000000000
--- a/library/cpp/microbdb/header.h
+++ /dev/null
@@ -1,159 +0,0 @@
-#pragma once
-
-#include <util/system/defaults.h>
-#include <util/generic/typetraits.h>
-#include <util/generic/string.h>
-#include <util/str_stl.h>
-
-#include <stdio.h>
-
-#define METASIZE (1u << 12)
-#define METASIG 0x12345678u
-#define PAGESIG 0x87654321u
-
-enum EMbdbErrors {
- MBDB_ALREADY_INITIALIZED = 200,
- MBDB_NOT_INITIALIZED = 201,
- MBDB_BAD_DESCRIPTOR = 202,
- MBDB_OPEN_ERROR = 203,
- MBDB_READ_ERROR = 204,
- MBDB_WRITE_ERROR = 205,
- MBDB_CLOSE_ERROR = 206,
- MBDB_EXPECTED_EOF = 207,
- MBDB_UNEXPECTED_EOF = 208,
- MBDB_BAD_FILENAME = 209,
- MBDB_BAD_METAPAGE = 210,
- MBDB_BAD_RECORDSIG = 211,
- MBDB_BAD_FILE_SIZE = 212,
- MBDB_BAD_PAGESIG = 213,
- MBDB_BAD_PAGESIZE = 214,
- MBDB_BAD_PARM = 215,
- MBDB_BAD_SYNC = 216,
- MBDB_PAGE_OVERFLOW = 217,
- MBDB_NO_MEMORY = 218,
- MBDB_MEMORY_LEAK = 219,
- MBDB_NOT_SUPPORTED = 220
-};
-
-TString ToString(EMbdbErrors error);
-TString ErrorMessage(int error, const TString& text, const TString& path = TString(), ui32 recordSig = 0, ui32 gotRecordSig = 0);
-
-enum EPageFormat {
- MBDB_FORMAT_RAW = 0,
- MBDB_FORMAT_COMPRESSED = 1,
- MBDB_FORMAT_NULL = 255
-};
-
-enum ECompressionAlgorithm {
- MBDB_COMPRESSION_ZLIB = 1,
- MBDB_COMPRESSION_FASTLZ = 2,
- MBDB_COMPRESSION_SNAPPY = 3
-};
-
-struct TDatMetaPage {
- ui32 MetaSig;
- ui32 RecordSig;
- ui32 PageSize;
-};
-
-struct TDatPage {
- ui32 RecNum; //!< number of records on this page
- ui32 PageSig;
- ui32 Format : 2; //!< one of EPageFormat
- ui32 Reserved : 30;
-};
-
-/// Additional page header with compression info
-struct TCompressedPage {
- ui32 BlockCount;
- ui32 Algorithm : 4;
- ui32 Version : 4;
- ui32 Reserved : 24;
-};
-
-namespace NMicroBDB {
- /// Header of compressed block
- struct TCompressedHeader {
- ui32 Compressed;
- ui32 Original; /// original size of block
- ui32 Count; /// number of records in block
- ui32 Reserved;
- };
-
- Y_HAS_MEMBER(AssertValid);
-
- template <typename T, bool TVal>
- struct TAssertValid {
- void operator()(const T*) {
- }
- };
-
- template <typename T>
- struct TAssertValid<T, true> {
- void operator()(const T* rec) {
- return rec->AssertValid();
- }
- };
-
- template <typename T>
- void AssertValid(const T* rec) {
- return NMicroBDB::TAssertValid<T, NMicroBDB::THasAssertValid<T>::value>()(rec);
- }
-
- Y_HAS_MEMBER(SizeOf);
-
- template <typename T, bool TVal>
- struct TGetSizeOf;
-
- template <typename T>
- struct TGetSizeOf<T, true> {
- size_t operator()(const T* rec) {
- return rec->SizeOf();
- }
- };
-
- template <typename T>
- struct TGetSizeOf<T, false> {
- size_t operator()(const T*) {
- return sizeof(T);
- }
- };
-
- inline char* GetFirstRecord(const TDatPage* page) {
- switch (page->Format) {
- case MBDB_FORMAT_RAW:
- return (char*)page + sizeof(TDatPage);
- case MBDB_FORMAT_COMPRESSED:
- // Первая запись на сжатой странице сохраняется несжатой
- // сразу же после всех заголовков.
- // Алгоритм сохранения смотреть в TOutputRecordIterator::FlushBuffer
- return (char*)page + sizeof(TDatPage) + sizeof(TCompressedPage) + sizeof(NMicroBDB::TCompressedHeader);
- }
- return (char*)nullptr;
- }
-}
-
-template <typename T>
-size_t SizeOf(const T* rec) {
- return NMicroBDB::TGetSizeOf<T, NMicroBDB::THasSizeOf<T>::value>()(rec);
-}
-
-template <typename T>
-size_t MaxSizeOf() {
- return sizeof(T);
-}
-
-static inline int DatNameToIdx(char iname[/*FILENAME_MAX*/], const char* dname) {
- if (!dname || !*dname)
- return MBDB_BAD_FILENAME;
- const char* ptr;
- if (!(ptr = strrchr(dname, '/')))
- ptr = dname;
- if (!(ptr = strrchr(ptr, '.')))
- ptr = strchr(dname, 0);
- if (ptr - dname > FILENAME_MAX - 5)
- return MBDB_BAD_FILENAME;
- memcpy(iname, dname, ptr - dname);
- strcpy(iname + (ptr - dname), ".idx");
- return 0;
-}
diff --git a/library/cpp/microbdb/heap.h b/library/cpp/microbdb/heap.h
deleted file mode 100644
index ef5a53534cb..00000000000
--- a/library/cpp/microbdb/heap.h
+++ /dev/null
@@ -1,143 +0,0 @@
-#pragma once
-
-#include "header.h"
-#include "extinfo.h"
-
-#include <util/generic/vector.h>
-
-#include <errno.h>
-
-///////////////////////////////////////////////////////////////////////////////
-
-/// Default comparator
-template <class TVal>
-struct TCompareByLess {
- inline bool operator()(const TVal* a, const TVal* b) const {
- return TLess<TVal>()(*a, *b);
- }
-};
-
-///////////////////////////////////////////////////////////////////////////////
-
-template <class TVal, class TIterator, class TCompare = TCompareByLess<TVal>>
-class THeapIter {
-public:
- int Init(TIterator** iters, int count) {
- Term();
- if (!count)
- return 0;
- if (!(Heap = (TIterator**)malloc(count * sizeof(TIterator*))))
- return ENOMEM;
-
- Count = count;
- count = 0;
- while (count < Count)
- if (count && !(*iters)->Next()) { //here first TIterator is NOT initialized!
- Count--;
- iters++;
- } else {
- Heap[count++] = *iters++;
- }
- count = Count / 2;
- while (--count > 0) //Heap[0] is not changed!
- Sift(count, Count); //do not try to replace this code by make_heap
- return 0;
- }
-
- int Init(TIterator* iters, int count) {
- TVector<TIterator*> a(count);
- for (int i = 0; i < count; ++i)
- a[i] = &iters[i];
- return Init(&a[0], count);
- }
-
- THeapIter()
- : Heap(nullptr)
- , Count(0)
- {
- }
-
- THeapIter(TIterator* a, TIterator* b)
- : Heap(nullptr)
- , Count(0)
- {
- TIterator* arr[] = {a, b};
- if (Init(arr, 2))
- ythrow yexception() << "can't Init THeapIter";
- }
-
- THeapIter(TVector<TIterator>& v)
- : Heap(nullptr)
- , Count(0)
- {
- if (Init(&v[0], v.size())) {
- ythrow yexception() << "can't Init THeapIter";
- }
- }
-
- ~THeapIter() {
- Term();
- }
-
- inline const TVal* Current() const {
- if (!Count)
- return nullptr;
- return (*Heap)->Current();
- }
-
- inline const TIterator* CurrentIter() const {
- return *Heap;
- }
-
- //for ends of last file will use Heap[0] = Heap[0] ! and
- //returns Current of eof so Current of eof MUST return NULL
- //possible this is bug and need fixing
- const TVal* Next() {
- if (!Count)
- return nullptr;
- if (!(*Heap)->Next()) //on first call unitialized first TIterator
- *Heap = Heap[--Count]; //will be correctly initialized
-
- if (Count == 2) {
- if (TCompare()(Heap[1]->Current(), Heap[0]->Current()))
- DoSwap(Heap[1], Heap[0]);
- } else
- Sift(0, Count);
-
- return Current();
- }
-
- inline bool GetExtInfo(typename TExtInfoType<TVal>::TResult* extInfo) const {
- return (*Heap)->GetExtInfo(extInfo);
- }
-
- inline const ui8* GetExtInfoRaw(size_t* len) const {
- return (*Heap)->GetExtInfoRaw(len);
- }
-
- void Term() {
- ::free(Heap);
- Heap = nullptr;
- Count = 0;
- }
-
-protected:
- void Sift(int node, int end) {
- TIterator* x = Heap[node];
- int son;
- for (son = 2 * node + 1; son < end; node = son, son = 2 * node + 1) {
- if (son < (end - 1) && TCompare()(Heap[son + 1]->Current(), Heap[son]->Current()))
- son++;
- if (TCompare()(Heap[son]->Current(), x->Current()))
- Heap[node] = Heap[son];
- else
- break;
- }
- Heap[node] = x;
- }
-
- TIterator** Heap;
- int Count;
-};
-
-///////////////////////////////////////////////////////////////////////////////
diff --git a/library/cpp/microbdb/input.h b/library/cpp/microbdb/input.h
deleted file mode 100644
index a214ba6e8ae..00000000000
--- a/library/cpp/microbdb/input.h
+++ /dev/null
@@ -1,1027 +0,0 @@
-#pragma once
-
-#include "header.h"
-#include "file.h"
-#include "reader.h"
-
-#include <util/system/maxlen.h>
-#include <util/system/event.h>
-#include <util/system/thread.h>
-
-#include <thread>
-
-#include <sys/uio.h>
-
-#include <errno.h>
-
-template <class TFileManip>
-inline ssize_t Readv(TFileManip& fileManip, const struct iovec* iov, int iovcnt) {
- ssize_t read_count = 0;
- for (int n = 0; n < iovcnt; n++) {
- ssize_t last_read = fileManip.Read(iov[n].iov_base, iov[n].iov_len);
- if (last_read < 0)
- return -1;
- read_count += last_read;
- }
- return read_count;
-}
-
-template <class TVal, typename TBasePageIter>
-class TInputRecordIterator: public TBasePageIter {
- typedef THolder<NMicroBDB::IBasePageReader<TVal>> TReaderHolder;
-
-public:
- typedef TBasePageIter TPageIter;
-
- TInputRecordIterator() {
- Init();
- }
-
- ~TInputRecordIterator() {
- Term();
- }
-
- const TVal* Current() const {
- return Rec;
- }
-
- bool GetExtInfo(typename TExtInfoType<TVal>::TResult* extInfo) const {
- if (!Rec)
- return false;
- return Reader->GetExtInfo(extInfo);
- }
-
- const ui8* GetExtInfoRaw(size_t* len) const {
- if (!Rec)
- return nullptr;
- return Reader->GetExtInfoRaw(len);
- }
-
- size_t GetRecSize() const {
- return Reader->GetRecSize();
- }
-
- size_t GetExtSize() const {
- return Reader->GetExtSize();
- }
-
- const TVal* Next() {
- if (RecNum)
- --RecNum;
- else {
- TDatPage* page = TPageIter::Next();
- if (!page) {
- if (TPageIter::IsFrozen() && Reader.Get())
- Reader->SetClearFlag();
- return Rec = nullptr;
- } else if (!!SelectReader())
- return Rec = nullptr;
- RecNum = TPageIter::Current()->RecNum - 1;
- }
- return Rec = Reader->Next();
- }
-
- // Skip(0) == Current(); Skip(1) == Next()
- const TVal* Skip(int& num) {
- // Y_ASSERT(num >= 0); ? otherwise it gets into infinite loop
- while (num > RecNum) {
- num -= RecNum + 1;
- if (!TPageIter::Next() || !!SelectReader()) {
- RecNum = 0;
- return Rec = nullptr;
- }
- RecNum = TPageIter::Current()->RecNum - 1;
- Rec = Reader->Next();
- }
- ++num;
- while (--num)
- Next();
- return Rec;
- }
-
- // begin reading from next page
- void Reset() {
- Rec = NULL;
- RecNum = 0;
- if (Reader.Get())
- Reader->Reset();
- }
-
-protected:
- int Init() {
- Rec = nullptr;
- RecNum = 0;
- Format = MBDB_FORMAT_NULL;
- return 0;
- }
-
- int Term() {
- Reader.Reset(nullptr);
- Format = MBDB_FORMAT_NULL;
- Rec = nullptr;
- RecNum = 0;
- return 0;
- }
-
- const TVal* GotoPage(int pageno) {
- if (!TPageIter::GotoPage(pageno) || !!SelectReader())
- return Rec = nullptr;
- RecNum = TPageIter::Current()->RecNum - 1;
- return Rec = Reader->Next();
- }
-
- int SelectReader() {
- if (!TPageIter::Current())
- return MBDB_UNEXPECTED_EOF;
- if (ui32(Format) != TPageIter::Current()->Format) {
- switch (TPageIter::Current()->Format) {
- case MBDB_FORMAT_RAW:
- Reader.Reset(new NMicroBDB::TRawPageReader<TVal, TPageIter>(this));
- break;
- case MBDB_FORMAT_COMPRESSED:
- Reader.Reset(new NMicroBDB::TCompressedReader<TVal, TPageIter>(this));
- break;
- default:
- return MBDB_NOT_SUPPORTED;
- }
- Format = EPageFormat(TPageIter::Current()->Format);
- } else {
- Y_ASSERT(Reader.Get() != nullptr);
- Reader->Reset();
- }
- return 0;
- }
-
- const TVal* Rec;
- TReaderHolder Reader;
- int RecNum; //!< number of records on the current page after the current record
- EPageFormat Format;
-};
-
-template <class TBaseReader>
-class TInputPageIterator: public TBaseReader {
-public:
- typedef TBaseReader TReader;
-
- TInputPageIterator()
- : Buf(nullptr)
- {
- Term();
- }
-
- ~TInputPageIterator() {
- Term();
- }
-
- TDatPage* Current() {
- return CurPage;
- }
-
- int Freeze() {
- return (Frozen = (PageNum == -1) ? 0 : PageNum);
- }
-
- void Unfreeze() {
- Frozen = -1;
- }
-
- inline int IsFrozen() const {
- return Frozen + 1;
- }
-
- inline size_t GetPageSize() const {
- return TReader::GetPageSize();
- }
-
- inline int GetPageNum() const {
- return PageNum;
- }
-
- inline int IsEof() const {
- return Eof;
- }
-
- TDatPage* Next() {
- if (PageNum >= Maxpage && ReadBuf()) {
- Eof = Eof ? Eof : TReader::IsEof();
- return CurPage = nullptr;
- }
- return CurPage = (TDatPage*)(Buf + ((++PageNum) % Bufpages) * GetPageSize());
- }
-
- TDatPage* GotoPage(int pageno) {
- if (pageno <= Maxpage && pageno >= (Maxpage - Pages + 1)) {
- PageNum = pageno;
- return CurPage = (TDatPage*)(Buf + (PageNum % Bufpages) * GetPageSize());
- }
- if (IsFrozen() || TReader::GotoPage(pageno))
- return nullptr;
- Maxpage = PageNum = pageno - 1;
- Eof = 0;
- return Next();
- }
-
-protected:
- int Init(size_t pages, int pagesOrBytes) {
- Term();
- if (pagesOrBytes == -1)
- Bufpages = TReader::GetLastPage();
- else if (pagesOrBytes)
- Bufpages = pages;
- else
- Bufpages = pages / GetPageSize();
- if (!TReader::GetLastPage()) {
- Bufpages = 0;
- assert(Eof == 1);
- return 0;
- }
- int lastPage = TReader::GetLastPage();
- if (lastPage >= 0)
- Bufpages = (int)Min(lastPage, Bufpages);
- Bufpages = Max(2, Bufpages);
- Eof = 0;
- ABuf.Alloc(Bufpages * GetPageSize());
- return (Buf = ABuf.Begin()) ? 0 : ENOMEM;
- // return (Buf = (char*)malloc(Bufpages * GetPageSize())) ? 0 : ENOMEM;
- }
-
- int Term() {
- // free(Buf);
- ABuf.Dealloc();
- Buf = nullptr;
- Maxpage = PageNum = Frozen = -1;
- Bufpages = 0;
- Pages = 0;
- Eof = 1;
- CurPage = nullptr;
- return 0;
- }
-
- int ReadBuf() {
- int nvec;
- iovec vec[2];
- int maxpage = (Frozen == -1 ? Maxpage + 1 : Frozen) + Bufpages - 1;
- int minpage = Maxpage + 1;
- if (maxpage < minpage)
- return EAGAIN;
- minpage %= Bufpages;
- maxpage %= Bufpages;
- if (maxpage < minpage) {
- vec[0].iov_base = Buf + GetPageSize() * minpage;
- vec[0].iov_len = GetPageSize() * (Bufpages - minpage);
- vec[1].iov_base = Buf;
- vec[1].iov_len = GetPageSize() * (maxpage + 1);
- nvec = 2;
- } else {
- vec[0].iov_base = Buf + GetPageSize() * minpage;
- vec[0].iov_len = GetPageSize() * (maxpage - minpage + 1);
- nvec = 1;
- }
- TReader::ReadPages(vec, nvec, &Pages);
- Maxpage += Pages;
- return !Pages;
- }
-
- int Maxpage, PageNum, Frozen, Bufpages, Eof, Pages;
- TDatPage* CurPage;
- // TMappedArray<char> ABuf;
- TMappedAllocation ABuf;
- char* Buf;
-};
-
-template <class TBaseReader>
-class TInputPageIteratorMT: public TBaseReader {
-public:
- typedef TBaseReader TReader;
-
- TInputPageIteratorMT()
- : CurBuf(0)
- , CurReadBuf(0)
- , Buf(nullptr)
- {
- Term();
- }
-
- ~TInputPageIteratorMT() {
- Term();
- }
-
- TDatPage* Current() {
- return CurPage;
- }
-
- int Freeze() {
- return (Frozen = (PageNum == -1) ? 0 : PageNum);
- }
-
- void Unfreeze() {
- Frozen = -1;
- }
-
- inline int IsFrozen() const {
- return Frozen + 1;
- }
-
- inline size_t GetPageSize() const {
- return TReader::GetPageSize();
- }
-
- inline int GetPageNum() const {
- return PageNum;
- }
-
- inline int IsEof() const {
- return Eof;
- }
-
- TDatPage* Next() {
- if (Eof)
- return CurPage = nullptr;
- if (PageNum >= Maxpage && ReadBuf()) {
- Eof = Eof ? Eof : TReader::IsEof();
- return CurPage = nullptr;
- }
- return CurPage = (TDatPage*)(Buf + ((++PageNum) % Bufpages) * GetPageSize());
- }
-
- TDatPage* GotoPage(int pageno) {
- if (pageno <= Maxpage && pageno >= (Maxpage - Pages + 1)) {
- PageNum = pageno;
- return CurPage = (TDatPage*)(Buf + (PageNum % Bufpages) * GetPageSize());
- }
- if (IsFrozen() || TReader::GotoPage(pageno))
- return nullptr;
- Maxpage = PageNum = pageno - 1;
- Eof = 0;
- return Next();
- }
-
- void ReadPages() {
- // fprintf(stderr, "ReadPages started\n");
- bool eof = false;
- while (!eof) {
- QEvent[CurBuf].Wait();
- if (Finish)
- return;
- int pages = ReadCurBuf(Bufs[CurBuf]);
- PagesM[CurBuf] = pages;
- eof = !pages;
- AEvent[CurBuf].Signal();
- CurBuf ^= 1;
- }
- }
-
-protected:
- int Init(size_t pages, int pagesOrBytes) {
- Term();
- if (pagesOrBytes == -1)
- Bufpages = TReader::GetLastPage();
- else if (pagesOrBytes)
- Bufpages = pages;
- else
- Bufpages = pages / GetPageSize();
- if (!TReader::GetLastPage()) {
- Bufpages = 0;
- assert(Eof == 1);
- return 0;
- }
- int lastPage = TReader::GetLastPage();
- if (lastPage >= 0)
- Bufpages = (int)Min(lastPage, Bufpages);
- Bufpages = Max(2, Bufpages);
- Eof = 0;
- ABuf.Alloc(Bufpages * GetPageSize() * 2);
- Bufs[0] = ABuf.Begin();
- Bufs[1] = Bufs[0] + Bufpages * GetPageSize();
- // return (Buf = (char*)malloc(Bufpages * GetPageSize())) ? 0 : ENOMEM;
- Finish = false;
- ReadThread = std::thread([this]() {
- TThread::SetCurrentThreadName("DatReader");
- ReadPages();
- });
- QEvent[0].Signal();
- return Bufs[0] ? 0 : ENOMEM;
- }
-
- void StopThread() {
- Finish = true;
- QEvent[0].Signal();
- QEvent[1].Signal();
- ReadThread.join();
- }
-
- int Term() {
- // free(Buf);
- if (ReadThread.joinable())
- StopThread();
- ABuf.Dealloc();
- Buf = nullptr;
- Bufs[0] = nullptr;
- Bufs[1] = nullptr;
- Maxpage = MaxpageR = PageNum = Frozen = -1;
- Bufpages = 0;
- Pages = 0;
- Eof = 1;
- CurPage = nullptr;
- return 0;
- }
-
- int ReadCurBuf(char* buf) {
- int nvec;
- iovec vec[2];
- int maxpage = (Frozen == -1 ? MaxpageR + 1 : Frozen) + Bufpages - 1;
- int minpage = MaxpageR + 1;
- if (maxpage < minpage)
- return EAGAIN;
- minpage %= Bufpages;
- maxpage %= Bufpages;
- if (maxpage < minpage) {
- vec[0].iov_base = buf + GetPageSize() * minpage;
- vec[0].iov_len = GetPageSize() * (Bufpages - minpage);
- vec[1].iov_base = buf;
- vec[1].iov_len = GetPageSize() * (maxpage + 1);
- nvec = 2;
- } else {
- vec[0].iov_base = buf + GetPageSize() * minpage;
- vec[0].iov_len = GetPageSize() * (maxpage - minpage + 1);
- nvec = 1;
- }
- int pages;
- TReader::ReadPages(vec, nvec, &pages);
- MaxpageR += pages;
- return pages;
- }
-
- int ReadBuf() {
- QEvent[CurReadBuf ^ 1].Signal();
- AEvent[CurReadBuf].Wait();
- Buf = Bufs[CurReadBuf];
- Maxpage += (Pages = PagesM[CurReadBuf]);
- CurReadBuf ^= 1;
- return !Pages;
- }
-
- int Maxpage, MaxpageR, PageNum, Frozen, Bufpages, Eof, Pages;
- TDatPage* CurPage;
- // TMappedArray<char> ABuf;
- ui32 CurBuf;
- ui32 CurReadBuf;
- TMappedAllocation ABuf;
- char* Buf;
- char* Bufs[2];
- ui32 PagesM[2];
- TAutoEvent QEvent[2];
- TAutoEvent AEvent[2];
- std::thread ReadThread;
- bool Finish;
-};
-
-template <typename TFileManip>
-class TInputPageFileImpl: private TNonCopyable {
-protected:
- TFileManip FileManip;
-
-public:
- TInputPageFileImpl()
- : Pagesize(0)
- , Fd(-1)
- , Eof(1)
- , Error(0)
- , Pagenum(0)
- , Recordsig(0)
- {
- Term();
- }
-
- ~TInputPageFileImpl() {
- Term();
- }
-
- inline int IsEof() const {
- return Eof;
- }
-
- inline int GetError() const {
- return Error;
- }
-
- inline size_t GetPageSize() const {
- return Pagesize;
- }
-
- inline int GetLastPage() const {
- return Pagenum;
- }
-
- inline ui32 GetRecordSig() const {
- return Recordsig;
- }
-
- inline bool IsOpen() const {
- return FileManip.IsOpen();
- }
-
-protected:
- int Init(const char* fname, ui32 recsig, ui32* gotrecsig = nullptr, bool direct = false) {
- Error = FileManip.Open(fname, direct);
- return Error ? Error : Init(TFile(), recsig, gotrecsig);
- }
-
- int Init(const TFile& file, ui32 recsig, ui32* gotrecsig = nullptr) {
- if (!file.IsOpen() && !FileManip.IsOpen())
- return MBDB_NOT_INITIALIZED;
- if (file.IsOpen() && FileManip.IsOpen())
- return MBDB_ALREADY_INITIALIZED;
- if (file.IsOpen()) {
- Error = FileManip.Init(file);
- if (Error)
- return Error;
- }
-
- // TArrayHolder<ui8> buf(new ui8[METASIZE + FS_BLOCK_SIZE]);
- // ui8* ptr = (buf.Get() + FS_BLOCK_SIZE - ((ui64)buf.Get() & (FS_BLOCK_SIZE - 1)));
- TMappedArray<ui8> buf;
- buf.Create(METASIZE);
- ui8* ptr = &buf[0];
- TDatMetaPage* meta = (TDatMetaPage*)ptr;
- ssize_t size = METASIZE;
- ssize_t ret;
- while (size && (ret = FileManip.Read(ptr, (unsigned)size)) > 0) {
- Y_ASSERT(ret <= size);
- size -= ret;
- ptr += ret;
- }
- if (size) {
- FileManip.Close();
- return Error = MBDB_BAD_METAPAGE;
- }
- if (gotrecsig)
- *gotrecsig = meta->RecordSig;
- return Init(TFile(), meta, recsig);
- }
-
- int Init(TAutoPtr<IInputStream> input, ui32 recsig, ui32* gotrecsig = nullptr) {
- if (!input && !FileManip.IsOpen())
- return MBDB_NOT_INITIALIZED;
- if (FileManip.IsOpen())
- return MBDB_ALREADY_INITIALIZED;
-
- Error = FileManip.Open(input);
- if (Error)
- return Error;
-
- TArrayHolder<ui8> buf(new ui8[METASIZE]);
- ui8* ptr = buf.Get();
- ssize_t size = METASIZE;
- ssize_t ret;
- while (size && (ret = FileManip.Read(ptr, (unsigned)size)) > 0) {
- Y_ASSERT(ret <= size);
- size -= ret;
- ptr += ret;
- }
- if (size) {
- FileManip.Close();
- return Error = MBDB_BAD_METAPAGE;
- }
- TDatMetaPage* meta = (TDatMetaPage*)buf.Get();
- if (gotrecsig)
- *gotrecsig = meta->RecordSig;
- return Init(TFile(), meta, recsig);
- }
-
- int Init(const TFile& file, const TDatMetaPage* meta, ui32 recsig) {
- if (!file.IsOpen() && !FileManip.IsOpen())
- return MBDB_NOT_INITIALIZED;
- if (file.IsOpen() && FileManip.IsOpen())
- return MBDB_ALREADY_INITIALIZED;
- if (file.IsOpen()) {
- Error = FileManip.Init(file);
- if (Error)
- return Error;
- }
-
- if (meta->MetaSig != METASIG)
- Error = MBDB_BAD_METAPAGE;
- else if (meta->RecordSig != recsig)
- Error = MBDB_BAD_RECORDSIG;
-
- if (Error) {
- FileManip.Close();
- return Error;
- }
-
- i64 flength = FileManip.GetLength();
- if (flength >= 0) {
- i64 fsize = flength;
- fsize -= METASIZE;
- if (fsize % meta->PageSize)
- return Error = MBDB_BAD_FILE_SIZE;
- Pagenum = (int)(fsize / meta->PageSize);
- } else {
- Pagenum = -1;
- }
- Pagesize = meta->PageSize;
- Recordsig = meta->RecordSig;
- Error = Eof = 0;
- return Error;
- }
-
- int ReadPages(iovec* vec, int nvec, int* pages) {
- *pages = 0;
-
- if (Eof || Error)
- return Error;
-
- ssize_t size = 0, delta = 0, total = 0;
- iovec* pvec = vec;
- int vsize = nvec;
-
- while (vsize && (size = Readv(FileManip, pvec, (int)Min(vsize, 16))) > 0) {
- total += size;
- if (delta) {
- size += delta;
- pvec->iov_len += delta;
- pvec->iov_base = (char*)pvec->iov_base - delta;
- delta = 0;
- }
- while (size) {
- if ((size_t)size >= pvec->iov_len) {
- size -= pvec->iov_len;
- ++pvec;
- --vsize;
- } else {
- delta = size;
- pvec->iov_len -= size;
- pvec->iov_base = (char*)pvec->iov_base + size;
- size = 0;
- }
- }
- }
- if (delta) {
- pvec->iov_len += delta;
- pvec->iov_base = (char*)pvec->iov_base - delta;
- }
- if (size < 0)
- return Error = errno ? errno : MBDB_READ_ERROR;
- if (total % Pagesize)
- return Error = MBDB_BAD_FILE_SIZE;
- if (vsize)
- Eof = 1;
- *pages = total / Pagesize; // it would be better to assign it after the for-loops
- for (; total; ++vec, total -= size)
- for (size = 0; size < total && (size_t)size < vec->iov_len; size += Pagesize)
- if (((TDatPage*)((char*)vec->iov_base + size))->PageSig != PAGESIG)
- return Error = MBDB_BAD_PAGESIG;
- return Error;
- }
-
- int GotoPage(int page) {
- if (Error)
- return Error;
- Eof = 0;
- i64 offset = (i64)page * Pagesize + METASIZE;
- if (offset != FileManip.Seek(offset, SEEK_SET))
- Error = MBDB_BAD_FILE_SIZE;
- return Error;
- }
-
- int Term() {
- return FileManip.Close();
- }
-
- size_t Pagesize;
- int Fd;
- int Eof;
- int Error;
- int Pagenum; //!< number of pages in this file
- ui32 Recordsig;
-};
-
-template <class TBaseReader>
-class TMappedInputPageIterator: public TBaseReader {
-public:
- typedef TBaseReader TReader;
-
- TMappedInputPageIterator() {
- Term();
- }
-
- ~TMappedInputPageIterator() {
- Term();
- }
-
- TDatPage* Current() {
- return CurPage;
- }
-
- inline size_t GetPageSize() const {
- return TReader::GetPageSize();
- }
-
- inline int GetPageNum() const {
- return PageNum;
- }
-
- inline int IsEof() const {
- return Eof;
- }
-
- inline int IsFrozen() const {
- return 0;
- }
-
- TDatPage* Next() {
- i64 pos = (i64)(++PageNum) * GetPageSize() + METASIZE;
- if (pos < 0 || pos >= (i64)TReader::GetSize()) {
- Eof = 1;
- return CurPage = nullptr;
- }
- return CurPage = (TDatPage*)((char*)TReader::GetData() + pos);
- }
-
-protected:
- int Init(size_t /*pages*/, int /*pagesOrBytes*/) {
- Term();
- Eof = 0;
- return 0;
- }
-
- int Term() {
- PageNum = -1;
- Eof = 1;
- CurPage = nullptr;
- return 0;
- }
-
- TDatPage* GotoPage(int pageno) {
- PageNum = pageno - 1;
- Eof = 0;
- return Next();
- }
-
- int PageNum, Eof, Pages, Pagenum;
- TDatPage* CurPage;
-};
-
-using TInputPageFile = TInputPageFileImpl<TInputFileManip>;
-
-template <class TVal,
- typename TBaseRecIter = TInputRecordIterator<TVal, TInputPageIterator<TInputPageFile>>>
-class TInDatFileImpl: public TBaseRecIter {
-public:
- typedef TBaseRecIter TRecIter;
- typedef typename TRecIter::TPageIter TPageIter;
- typedef typename TRecIter::TPageIter::TReader TReader;
- using TRecIter::GotoPage;
-
- int Open(const char* fname, size_t pages = 1, int pagesOrBytes = 1, ui32* gotRecordSig = nullptr, bool direct = false) {
- int ret = TReader::Init(fname, TVal::RecordSig, gotRecordSig, direct);
- return ret ? ret : Open2(pages, pagesOrBytes);
- }
-
- int Open(const TFile& file, size_t pages = 1, int pagesOrBytes = 1, ui32* gotRecordSig = nullptr) {
- int ret = TReader::Init(file, TVal::RecordSig, gotRecordSig);
- return ret ? ret : Open2(pages, pagesOrBytes);
- }
-
- int Open(TAutoPtr<IInputStream> input, size_t pages = 1, int pagesOrBytes = 1, ui32* gotRecordSig = nullptr) {
- int ret = TReader::Init(input, TVal::RecordSig, gotRecordSig);
- return ret ? ret : Open2(pages, pagesOrBytes);
- }
-
- int Open(const TFile& file, const TDatMetaPage* meta, size_t pages = 1, int pagesOrBytes = 1) {
- int ret = TReader::Init(file, meta, TVal::RecordSig);
- return ret ? ret : Open2(pages, pagesOrBytes);
- }
-
- int Close() {
- int ret1 = TRecIter::Term();
- int ret2 = TPageIter::Term();
- int ret3 = TReader::Term();
- return ret1 ? ret1 : ret2 ? ret2 : ret3;
- }
-
- const TVal* GotoLastPage() {
- return TReader::GetLastPage() <= 0 ? nullptr : TRecIter::GotoPage(TReader::GetLastPage() - 1);
- }
-
-private:
- int Open2(size_t pages, int pagesOrBytes) {
- int ret = TPageIter::Init(pages, pagesOrBytes);
- if (!ret)
- ret = TRecIter::Init();
- if (ret)
- Close();
- return ret;
- }
-};
-
-template <class TVal>
-class TInIndexFile: protected TInDatFileImpl<TVal> {
- typedef TInDatFileImpl<TVal> TDatFile;
- typedef typename TDatFile::TRecIter TRecIter;
- typedef typename TRecIter::TPageIter TPageIter;
- typedef typename TExtInfoType<TVal>::TResult TExtInfo;
-
-public:
- using TDatFile::IsOpen;
-
- TInIndexFile()
- : Index0(nullptr)
- {
- }
-
- int Open(const char* fname, size_t pages = 2, int pagesOrBytes = 1, ui32* gotRecordSig = nullptr) {
- int ret = TDatFile::Open(fname, pages, pagesOrBytes, gotRecordSig);
- if (ret)
- return ret;
- if (!(Index0 = (TDatPage*)malloc(TPageIter::GetPageSize()))) {
- TDatFile::Close();
- return MBDB_NO_MEMORY;
- }
- if (!TExtInfoType<TVal>::Exists && SizeOf((TVal*)nullptr))
- RecsOnPage = (TPageIter::GetPageSize() - sizeof(TDatPage)) / DatCeil(SizeOf((TVal*)nullptr));
- TDatFile::Next();
- memcpy(Index0, TPageIter::Current(), TPageIter::GetPageSize());
- return 0;
- }
-
- int Close() {
- free(Index0);
- Index0 = nullptr;
- return TDatFile::Close();
- }
-
- inline int GetError() const {
- return TDatFile::GetError();
- }
-
- int FindKey(const TVal* akey, const TExtInfo* extInfo = nullptr) {
- assert(IsOpen());
- if (TExtInfoType<TVal>::Exists || !SizeOf((TVal*)nullptr))
- return FindVszKey(akey, extInfo);
- int num = FindKeyOnPage(Index0, akey);
- TDatPage* page = TPageIter::GotoPage(num + 1);
- if (!page)
- return 0;
- num = FindKeyOnPage(page, akey);
- num += (TPageIter::GetPageNum() - 1) * RecsOnPage;
- return num;
- }
-
- int FindVszKey(const TVal* akey, const TExtInfo* extInfo = NULL) {
- int num = FindVszKeyOnPage(Index0, akey, extInfo);
- int num_add = 0;
- for (int p = 0; p < num; p++) {
- TDatPage* page = TPageIter::GotoPage(p + 1);
- if (!page)
- return 0;
- num_add += page->RecNum;
- }
- TDatPage* page = TPageIter::GotoPage(num + 1);
- if (!page)
- return 0;
- num = FindVszKeyOnPage(page, akey, extInfo);
- num += num_add;
- return num;
- }
-
-protected:
- int FindKeyOnPage(TDatPage* page, const TVal* key) {
- int left = 0;
- int right = page->RecNum - 1;
- int recsize = DatCeil(SizeOf((TVal*)nullptr));
- while (left < right) {
- int middle = (left + right) >> 1;
- if (*((TVal*)((char*)page + sizeof(TDatPage) + middle * recsize)) < *key)
- left = middle + 1;
- else
- right = middle;
- }
- //borders check (left and right)
- return (left == 0 || *((TVal*)((char*)page + sizeof(TDatPage) + left * recsize)) < *key) ? left : left - 1;
- }
-
- // will deserialize rawExtinfoA to extInfoA only if necessery
- inline bool KeyLess_(const TVal* a, const TVal* b,
- TExtInfo* extInfoA, const TExtInfo* extInfoB,
- const ui8* rawExtInfoA, size_t rawLen) {
- if (*a < *b) {
- return true;
- } else if (!extInfoB || *b < *a) {
- return false;
- } else {
- // *a == *b && extInfoB
- Y_PROTOBUF_SUPPRESS_NODISCARD extInfoA->ParseFromArray(rawExtInfoA, rawLen);
- return (*extInfoA < *extInfoB);
- }
- }
-
- int FindVszKeyOnPage(TDatPage* page, const TVal* key, const TExtInfo* extInfo) {
- TVal* cur = (TVal*)((char*)page + sizeof(TDatPage));
- ui32 recnum = page->RecNum;
- if (!TExtInfoType<TVal>::Exists) {
- for (; recnum > 0 && *cur < *key; --recnum)
- cur = (TVal*)((char*)cur + DatCeil(SizeOf(cur)));
- } else {
- size_t ll;
- size_t l;
- size_t sz = NMicroBDB::SizeOfExt(cur, &ll, &l);
- TExtInfo ei;
- for (; recnum > 0 && KeyLess_(cur, key, &ei, extInfo, (ui8*)cur + sz + ll, l); --recnum) {
- cur = (TVal*)((ui8*)cur + DatCeil(sz + ll + l));
- sz = NMicroBDB::SizeOfExt(cur, &ll, &l);
- }
- }
-
- int idx = page->RecNum - recnum - 1;
- return (idx >= 0) ? idx : 0;
- }
-
- TDatPage* Index0;
- int RecsOnPage;
-};
-
-template <class TVal, class TKey, class TPageIterator = TInputPageIterator<TInputPageFile>>
-class TKeyFileMixin: public TInDatFileImpl<TVal, TInputRecordIterator<TVal, TPageIterator>> {
-protected:
- TInIndexFile<TKey> KeyFile;
-};
-
-template <class TVal, class TKey, class TBase = TKeyFileMixin<TVal, TKey>>
-class TDirectInDatFile: public TBase {
- typedef TBase TDatFile;
- typedef typename TDatFile::TRecIter TRecIter;
- typedef typename TDatFile::TPageIter TPageIter;
-
-public:
- void Open(const char* path, size_t pages = 1, size_t keypages = 1, int pagesOrBytes = 1) {
- int ret;
- ui32 gotRecordSig = 0;
-
- ret = TDatFile::Open(path, pages, pagesOrBytes, &gotRecordSig);
- if (ret) {
- ythrow yexception() << ErrorMessage(ret, "Failed to open input file", path, TVal::RecordSig, gotRecordSig);
- }
- char KeyName[PATH_MAX + 1];
- if (DatNameToIdx(KeyName, path)) {
- ythrow yexception() << ErrorMessage(MBDB_BAD_FILENAME, "Failed to open input file", path);
- }
- gotRecordSig = 0;
- ret = KeyFile.Open(KeyName, keypages, 1, &gotRecordSig);
- if (ret) {
- ythrow yexception() << ErrorMessage(ret, "Failed to open input keyfile", KeyName, TKey::RecordSig, gotRecordSig);
- }
- }
-
- void Close() {
- int ret;
-
- if (TDatFile::IsOpen() && (ret = TDatFile::GetError()))
- if (!std::uncaught_exception())
- ythrow yexception() << ErrorMessage(ret, "Error before closing input file");
- if ((ret = TDatFile::Close()))
- if (!std::uncaught_exception())
- ythrow yexception() << ErrorMessage(ret, "Error while closing input file");
-
- if (KeyFile.IsOpen() && (ret = KeyFile.GetError()))
- if (!std::uncaught_exception())
- ythrow yexception() << ErrorMessage(ret, "Error before closing input keyfile");
- if ((ret = KeyFile.Close()))
- if (!std::uncaught_exception())
- ythrow yexception() << ErrorMessage(ret, "Error while closing input keyfile");
- }
-
- const TVal* FindRecord(const TKey* key, const typename TExtInfoType<TKey>::TResult* extInfo = nullptr) {
- int page = KeyFile.FindKey(key, extInfo);
- const TVal* val = TRecIter::GotoPage(page);
- if (!TExtInfoType<TVal>::Exists || !extInfo) {
- TKey k;
- while (val) {
- TMakeExtKey<TVal, TKey>::Make(&k, nullptr, val, nullptr);
- if (!(k < *key))
- break;
- val = TRecIter::Next();
- }
- } else {
- typename TExtInfoType<TVal>::TResult valExt;
- TKey k;
- typename TExtInfoType<TKey>::TResult kExt;
- while (val) {
- TRecIter::GetExtInfo(&valExt);
- TMakeExtKey<TVal, TKey>::Make(&k, &kExt, val, &valExt);
- if (*key < k || !(k < *key) && !(kExt < *extInfo)) // k > *key || k == *key && kExt >= *extInfo
- break;
- val = TRecIter::Next();
- }
- }
- return val;
- }
-
- int FindPagesNo(const TKey* key, const typename TExtInfoType<TVal>::TResult* extInfo = NULL) {
- return KeyFile.FindKey(key, extInfo);
- }
-
-protected:
- using TBase::KeyFile;
-};
diff --git a/library/cpp/microbdb/microbdb.cpp b/library/cpp/microbdb/microbdb.cpp
deleted file mode 100644
index c10dbdf1268..00000000000
--- a/library/cpp/microbdb/microbdb.cpp
+++ /dev/null
@@ -1 +0,0 @@
-#include "microbdb.h"
diff --git a/library/cpp/microbdb/microbdb.h b/library/cpp/microbdb/microbdb.h
deleted file mode 100644
index 75218873374..00000000000
--- a/library/cpp/microbdb/microbdb.h
+++ /dev/null
@@ -1,54 +0,0 @@
-#pragma once
-
-#include <util/folder/dirut.h>
-
-#if defined(_MSC_VER)
-#pragma warning(push)
-#pragma warning(disable : 4706) /*assignment within conditional expression*/
-#pragma warning(disable : 4267) /*conversion from 'size_t' to 'type', possible loss of data*/
-#endif
-
-#include "align.h"
-#include "extinfo.h"
-#include "header.h"
-#include "reader.h"
-#include "heap.h"
-#include "file.h"
-#include "sorter.h"
-#include "input.h"
-#include "output.h"
-#include "sorterdef.h"
-
-inline int MakeSorterTempl(char path[/*FILENAME_MAX*/], const char* prefix) {
- int ret = MakeTempDir(path, prefix);
- if (!ret && strlcat(path, "%06d", FILENAME_MAX) > FILENAME_MAX - 100)
- ret = EINVAL;
- if (ret)
- path[0] = 0;
- return ret;
-}
-
-inline int GetMeta(TFile& file, TDatMetaPage* meta) {
- ui8 buf[METASIZE], *ptr = buf;
- ssize_t size = sizeof(buf), ret;
- while (size && (ret = file.Read(ptr, size)) > 0) {
- size -= ret;
- ptr += ret;
- }
- if (size)
- return MBDB_BAD_FILE_SIZE;
- ptr = buf; // gcc 4.4 warning fix
- *meta = *(TDatMetaPage*)ptr;
- return (meta->MetaSig == METASIG) ? 0 : MBDB_BAD_METAPAGE;
-}
-
-template <class TRec>
-inline bool IsDatFile(const char* fname) {
- TDatMetaPage meta;
- TFile f(fname, RdOnly);
- return !GetMeta(f, &meta) && meta.RecordSig == TRec::RecordSig;
-}
-
-#if defined(_MSC_VER)
-#pragma warning(pop)
-#endif
diff --git a/library/cpp/microbdb/noextinfo.proto b/library/cpp/microbdb/noextinfo.proto
deleted file mode 100644
index 6a78882e079..00000000000
--- a/library/cpp/microbdb/noextinfo.proto
+++ /dev/null
@@ -1,4 +0,0 @@
-
-message TNoExtInfo {
-}
-
diff --git a/library/cpp/microbdb/output.h b/library/cpp/microbdb/output.h
deleted file mode 100644
index d0ecab21085..00000000000
--- a/library/cpp/microbdb/output.h
+++ /dev/null
@@ -1,1049 +0,0 @@
-#pragma once
-
-#include "header.h"
-#include "file.h"
-
-#include <util/generic/buffer.h>
-#include <util/memory/tempbuf.h>
-
-#include <sys/uio.h>
-
-template <class TFileManip>
-inline ssize_t Writev(TFileManip& fileManip, const struct iovec* iov, int iovcnt) {
- ssize_t written_count = 0;
- for (int n = 0; n < iovcnt; n++) {
- ssize_t last_write = fileManip.Write(iov[n].iov_base, iov[n].iov_len);
- if (last_write < 0)
- return -1;
- written_count += last_write;
- }
- return written_count;
-}
-
-//*********************************************************************
-struct TFakeIndexer {
- inline void NextPage(TDatPage*) noexcept {
- }
-};
-
-struct TCallbackIndexer {
- typedef void (*TCallback)(void* This, const TDatPage* page);
-
- TCallbackIndexer() {
- Callback = nullptr;
- }
-
- void SetCallback(void* t, TCallback c) {
- This = t;
- Callback = c;
- }
-
- void NextPage(TDatPage* dat) {
- Callback(This, dat);
- }
-
- TCallback Callback;
- void* This;
-};
-
-template <class TVal, typename TBasePageIter, typename TBaseIndexer = TFakeIndexer, typename TCompressor = TFakeCompression>
-class TOutputRecordIterator;
-
-template <class TVal, typename TBasePageIter, typename TBaseIndexer>
-class TOutputRecordIterator<TVal, TBasePageIter, TBaseIndexer, TFakeCompression>
- : public TBasePageIter, public TBaseIndexer {
-public:
- enum EOffset {
- WrongOffset = size_t(-1)
- };
-
- typedef TBasePageIter TPageIter;
- typedef TBaseIndexer TIndexer;
-
- TOutputRecordIterator() {
- Clear();
- }
-
- ~TOutputRecordIterator() {
- Term();
- }
-
- inline const TVal* Current() const {
- return Rec;
- }
-
- const TVal* Push(const TVal* v, const typename TExtInfoType<TVal>::TResult* extInfo = nullptr) {
- NMicroBDB::AssertValid(v);
- size_t len = SizeOf(v);
- if (!TExtInfoType<TVal>::Exists)
- return (Reserve(len)) ? (TVal*)memcpy(Rec, v, len) : nullptr;
- else if (extInfo) {
- size_t extSize = extInfo->ByteSize();
- size_t extLenSize = len_long((i64)extSize);
- if (!Reserve(len + extLenSize + extSize))
- return nullptr;
- memcpy(Rec, v, len);
- out_long((i64)extSize, (char*)Rec + len);
- extInfo->SerializeWithCachedSizesToArray((ui8*)Rec + len + extLenSize);
- return Rec;
- } else {
- size_t extLenSize = len_long((i64)0);
- if (!Reserve(len + extLenSize))
- return nullptr;
- memcpy(Rec, v, len);
- out_long((i64)0, (char*)Rec + len);
- return Rec;
- }
- }
-
- const TVal* Push(const TVal* v, const ui8* extInfoRaw, size_t extLen) {
- NMicroBDB::AssertValid(v);
- size_t sz = SizeOf(v);
- if (!Reserve(sz + extLen))
- return nullptr;
- memcpy(Rec, v, sz);
- memcpy((ui8*)Rec + sz, extInfoRaw, extLen);
- return Rec;
- }
-
- // use values stored in microbdb readers/writers internal buffer only.
- // method expects serialized extInfo after this record
- const TVal* PushWithExtInfo(const TVal* v) {
- NMicroBDB::AssertValid(v);
- size_t extSize;
- size_t extLenSize;
- size_t sz = NMicroBDB::SizeOfExt(v, &extLenSize, &extSize);
- sz += extLenSize + extSize;
- if (!Reserve(sz))
- return nullptr;
- memcpy(Rec, v, sz);
- return Rec;
- }
-
- TVal* Reserve(size_t len) {
- if (CurLen + DatCeil(len) > TPageIter::GetPageSize()) {
- if (sizeof(TDatPage) + DatCeil(len) > TPageIter::GetPageSize())
- return Rec = nullptr;
- if (TPageIter::Current() && RecNum) {
- TPageIter::Current()->RecNum = RecNum;
- TPageIter::Current()->Format = MBDB_FORMAT_RAW;
- memset((char*)TPageIter::Current() + CurLen, 0, TPageIter::GetPageSize() - CurLen);
- TIndexer::NextPage(TPageIter::Current());
- RecNum = 0;
- }
- if (!TPageIter::Next()) {
- CurLen = TPageIter::GetPageSize();
- return Rec = nullptr;
- }
- CurLen = sizeof(TDatPage);
- }
- LenForOffset = CurLen;
- Rec = (TVal*)((char*)TPageIter::Current() + CurLen);
- DatSet(Rec, len);
-
- CurLen += DatCeil(len);
-
- ++RecNum;
- return Rec;
- }
-
- void Flush() {
- TPageIter::Current()->RecNum = RecNum;
- TPageIter::Current()->Format = MBDB_FORMAT_RAW;
- }
-
- size_t Offset() const {
- return Rec ? TPageIter::Offset() + LenForOffset : WrongOffset;
- }
-
- void ResetDat() {
- CurLen = (char*)Rec - (char*)TPageIter::Current();
- size_t len;
- if (!TExtInfoType<TVal>::Exists) {
- len = SizeOf(Rec);
- } else {
- size_t ll;
- size_t l;
- len = NMicroBDB::SizeOfExt(Rec, &ll, &l);
- len += ll + l;
- }
- CurLen += DatCeil(len);
- }
-
-protected:
- void Clear() {
- Rec = nullptr;
- RecNum = 0;
- CurLen = 0;
- LenForOffset = 0;
- }
-
- int Init() {
- Clear();
- CurLen = TPageIter::GetPageSize();
- return 0;
- }
-
- int Term() {
- if (TPageIter::Current()) {
- TPageIter::Current()->RecNum = RecNum;
- TPageIter::Current()->Format = MBDB_FORMAT_RAW;
- memset((char*)TPageIter::Current() + CurLen, 0, TPageIter::GetPageSize() - CurLen);
- RecNum = 0;
- }
- int ret = !TPageIter::Current() && RecNum;
- Clear();
- return ret;
- }
-
- int GotoPage(int pageno) {
- if (TPageIter::Current()) {
- TPageIter::Current()->RecNum = RecNum;
- TPageIter::Current()->Format = MBDB_FORMAT_RAW;
- memset((char*)TPageIter::Current() + CurLen, 0, TPageIter::GetPageSize() - CurLen);
- }
- int ret = TPageIter::GotoPage(pageno);
- if (!ret) {
- RecNum = 0;
- CurLen = sizeof(TDatPage);
- }
- return ret;
- }
-
- TVal* Rec;
- int RecNum;
- size_t CurLen;
- size_t LenForOffset;
-};
-
-template <class TVal, typename TBasePageIter, typename TBaseIndexer, typename TAlgorithm>
-class TOutputRecordIterator
- : public TBasePageIter,
- public TBaseIndexer,
- private TAlgorithm {
- class TPageBuffer {
- public:
- void Init(size_t page) {
- Pos = 0;
- RecNum = 0;
- Size = Min(page / 2, size_t(64 << 10));
- Data.Reset(new ui8[Size]);
- }
-
- void Clear() {
- Pos = 0;
- RecNum = 0;
- }
-
- inline bool Empty() const {
- return RecNum == 0;
- }
-
- public:
- size_t Size;
- size_t Pos;
- int RecNum;
- TArrayHolder<ui8> Data;
- };
-
-public:
- typedef TBasePageIter TPageIter;
- typedef TBaseIndexer TIndexer;
-
- TOutputRecordIterator()
- : Rec(nullptr)
- , RecNum(0)
- {
- }
-
- ~TOutputRecordIterator() {
- Term();
- }
-
- const TVal* Current() const {
- return Rec;
- }
-
- const TVal* Push(const TVal* v, const typename TExtInfoType<TVal>::TResult* extInfo = nullptr) {
- NMicroBDB::AssertValid(v);
- size_t len = SizeOf(v);
- if (!TExtInfoType<TVal>::Exists)
- return (Reserve(len)) ? (TVal*)memcpy((TVal*)Rec, v, len) : nullptr;
- else if (extInfo) {
- size_t extSize = extInfo->ByteSize();
- size_t extLenSize = len_long((i64)extSize);
- if (!Reserve(len + extLenSize + extSize))
- return nullptr;
- memcpy(Rec, v, len);
- out_long((i64)extSize, (char*)Rec + len);
- extInfo->SerializeWithCachedSizesToArray((ui8*)Rec + len + extLenSize);
- return Rec;
- } else {
- size_t extLenSize = len_long((i64)0);
- if (!Reserve(len + extLenSize))
- return nullptr;
- memcpy(Rec, v, len);
- out_long((i64)0, (char*)Rec + len);
- return Rec;
- }
- }
-
- const TVal* Push(const TVal* v, const ui8* extInfoRaw, size_t extLen) {
- NMicroBDB::AssertValid(v);
- size_t sz = SizeOf(v);
- if (!Reserve(sz + extLen))
- return NULL;
- memcpy(Rec, v, sz);
- memcpy((ui8*)Rec + sz, extInfoRaw, extLen);
- return Rec;
- }
-
- // use values stored in microbdb readers/writers internal buffer only.
- // method expects serialized extInfo after this record
- const TVal* PushWithExtInfo(const TVal* v) {
- NMicroBDB::AssertValid(v);
- size_t extSize;
- size_t extLenSize;
- size_t sz = NMicroBDB::SizeOfExt(v, &extLenSize, &extSize);
- sz += extLenSize + extSize;
- if (!Reserve(sz))
- return nullptr;
- memcpy(Rec, v, sz);
- return Rec;
- }
-
- TVal* Reserve(const size_t len) {
- const size_t aligned = DatCeil(len);
-
- if (!TPageIter::Current()) { // Allocate fist page
- if (!TPageIter::Next()) {
- CurLen = TPageIter::GetPageSize();
- return Rec = nullptr;
- }
- CurLen = sizeof(TDatPage) + sizeof(TCompressedPage);
- }
-
- if (Buffer.Pos + aligned > Buffer.Size) {
- if (Buffer.Pos == 0)
- return Rec = nullptr;
- if (FlushBuffer())
- return Rec = nullptr;
- if (Buffer.Pos + aligned + sizeof(TDatPage) + sizeof(TCompressedPage) > Buffer.Size)
- return Rec = nullptr;
- }
-
- Rec = (TVal*)((char*)Buffer.Data.Get() + Buffer.Pos);
- DatSet(Rec, len); // len is correct because DatSet set align tail to zero
-
- Buffer.RecNum++;
- Buffer.Pos += aligned;
- ++RecNum;
- return Rec;
- }
-
- void Flush() {
- if (!Buffer.Empty()) {
- FlushBuffer();
- TPageIter::Current()->RecNum = RecNum;
- TPageIter::Current()->Format = MBDB_FORMAT_COMPRESSED;
- }
- }
-
- size_t Offset() const {
- // According to vadya@ there is no evil to return 0 all the time
- return 0;
- }
-
- void ResetDat() {
- Buffer.Pos = (char*)Rec - (char*)Buffer.Data.Get();
- size_t len = SizeOf(Rec);
- Buffer.Pos += DatCeil(len);
- }
-
-protected:
- void Clear() {
- RecNum = 0;
- Rec = nullptr;
- Count = 0;
- CurLen = sizeof(TDatPage) + sizeof(TCompressedPage);
- Buffer.Clear();
- }
-
- int Init() {
- Clear();
- Buffer.Init(TPageIter::GetPageSize());
- TAlgorithm::Init();
- return 0;
- }
-
- int Term() {
- if (TPageIter::Current())
- Commit();
- int ret = !TPageIter::Current() && RecNum;
- Clear();
- TAlgorithm::Term();
- return ret;
- }
-
- int GotoPage(int pageno) {
- if (TPageIter::Current())
- Commit();
- int ret = TPageIter::GotoPage(pageno);
- if (!ret)
- Reset();
- return ret;
- }
-
-private:
- void Commit() {
- Flush();
- TPageIter::Current()->RecNum = RecNum;
- TPageIter::Current()->Format = MBDB_FORMAT_COMPRESSED;
- SetCompressedPageHeader();
-
- memset((char*)TPageIter::Current() + CurLen, 0, TPageIter::GetPageSize() - CurLen);
- RecNum = 0;
- Count = 0;
- }
-
- inline void SetCompressedPageHeader() {
- TCompressedPage* const hdr = (TCompressedPage*)((ui8*)TPageIter::Current() + sizeof(TDatPage));
-
- hdr->BlockCount = Count;
- hdr->Algorithm = TAlgorithm::Code;
- hdr->Version = 0;
- hdr->Reserved = 0;
- }
-
- inline void Reset() {
- RecNum = 0;
- CurLen = sizeof(TDatPage) + sizeof(TCompressedPage);
- Count = 0;
- Buffer.Clear();
- }
-
- int FlushBuffer() {
- TArrayHolder<ui8> data;
- const ui8* const buf = Buffer.Data.Get();
- size_t first = 0;
-
- if (!TExtInfoType<TVal>::Exists)
- first = DatCeil(SizeOf((TVal*)buf));
- else {
- size_t ll;
- size_t l;
- first = NMicroBDB::SizeOfExt((const TVal*)buf, &ll, &l);
- first = DatCeil(first + ll + l);
- }
-
- size_t total = sizeof(NMicroBDB::TCompressedHeader) + first + ((Buffer.RecNum == 1) ? 0 : TAlgorithm::CompressBound(Buffer.Pos - first));
- size_t real = total;
-
- {
- ui8* p = nullptr;
- NMicroBDB::TCompressedHeader* hdr = nullptr;
-
- // 1. Choose data destination (temporary buffer or dat-page)
- if (CurLen + total > TPageIter::GetPageSize()) {
- data.Reset(new ui8[total]);
-
- hdr = (NMicroBDB::TCompressedHeader*)data.Get();
- p = data.Get() + sizeof(NMicroBDB::TCompressedHeader);
- } else {
- p = (ui8*)TPageIter::Current() + CurLen;
- hdr = (NMicroBDB::TCompressedHeader*)p;
- p += sizeof(NMicroBDB::TCompressedHeader);
- }
-
- // 2. Compress data
-
- // Fill header and first record
- hdr->Original = Buffer.Pos;
- hdr->Compressed = 0;
- hdr->Count = Buffer.RecNum;
- hdr->Reserved = 0;
- memcpy(p, Buffer.Data.Get(), first);
- // Fill compressed part
- if (Buffer.RecNum > 1) {
- size_t size = TAlgorithm::CompressBound(Buffer.Pos - first);
-
- p += first;
- TAlgorithm::Compress(p, size, buf + first, Buffer.Pos - first);
-
- hdr->Compressed = size;
-
- real = sizeof(NMicroBDB::TCompressedHeader) + first + size;
- }
- }
-
- Y_ASSERT(sizeof(TDatPage) + sizeof(TCompressedPage) + real <= TPageIter::GetPageSize());
-
- // 3. Check page capacity
-
- if (CurLen + real > TPageIter::GetPageSize()) {
- Y_ASSERT(data.Get() != nullptr);
-
- if (TPageIter::Current() && RecNum) {
- RecNum = RecNum - Buffer.RecNum;
- TPageIter::Current()->RecNum = RecNum;
- TPageIter::Current()->Format = MBDB_FORMAT_COMPRESSED;
- SetCompressedPageHeader();
- memset((char*)TPageIter::Current() + CurLen, 0, TPageIter::GetPageSize() - CurLen);
- TIndexer::NextPage(TPageIter::Current());
- RecNum = Buffer.RecNum;
- Count = 0;
- }
- if (!TPageIter::Next()) {
- CurLen = TPageIter::GetPageSize();
- return MBDB_NO_MEMORY;
- }
- CurLen = sizeof(TDatPage) + sizeof(TCompressedPage);
- }
-
- // 4. Flush data and reset buffer state
-
- if (data.Get())
- memcpy((ui8*)TPageIter::Current() + CurLen, data.Get(), real);
- CurLen += real;
- ++Count;
- Buffer.Clear();
- return 0;
- }
-
-private:
- size_t CurLen;
- TPageBuffer Buffer;
- TVal* Rec;
- ui32 Count; //! < count of compressed blocks on page
-public:
- int RecNum;
-};
-
-template <typename TBaseWriter>
-class TOutputPageIterator: public TBaseWriter {
-public:
- typedef TBaseWriter TWriter;
-
- TOutputPageIterator()
- : Buf(nullptr)
- {
- Clear();
- }
-
- ~TOutputPageIterator() {
- Term();
- }
-
- TDatPage* Current() {
- return CurPage;
- }
-
- size_t Offset() const {
- //Cout << "PS = " << TWriter::GetPageSize() << "; PN = " << PageNum << "; MS = " << METASIZE << Endl;
- return TWriter::GetPageSize() * PageNum + METASIZE;
- }
-
- int Freeze() {
- return (Frozen = (PageNum == -1) ? 0 : (int)PageNum);
- }
-
- void Unfreeze() {
- Frozen = -1;
- }
-
- inline int IsFrozen() const {
- return Frozen + 1;
- }
-
- inline size_t GetPageSize() const {
- return TWriter::GetPageSize();
- }
-
- inline int GetPageNum() const {
- return (int)PageNum;
- }
-
- TDatPage* Next() {
- if (PageNum >= Maxpage && WriteBuf())
- return CurPage = nullptr;
- CurPage = (TDatPage*)(Buf + ((++PageNum) % Bufpages) * GetPageSize());
- memset(CurPage, 0, sizeof(TDatPage));
- return CurPage;
- }
-
-protected:
- int Init(size_t pages, int pagesOrBytes) {
- Term();
- if (pagesOrBytes)
- Bufpages = pages;
- else
- Bufpages = pages / GetPageSize();
- Bufpages = Max<size_t>(1, Bufpages);
- Maxpage = Bufpages - 1;
- // if (!(Buf = (char*)malloc(Bufpages * GetPageSize())))
- // return ENOMEM;
- ABuf.Alloc(Bufpages * GetPageSize());
- Buf = ABuf.Begin();
- if (TWriter::Memo)
- Freeze();
- return 0;
- }
-
- int Term() {
- Unfreeze();
- int ret = (PageNum < 0) ? 0 : WriteBuf();
- Clear();
- return ret;
- }
-
- int GotoPage(int pageno) {
- int ret = EAGAIN;
- if (IsFrozen() || PageNum >= 0 && ((ret = WriteBuf())) || ((ret = TWriter::GotoPage(pageno))))
- return ret;
- PageNum = pageno;
- Maxpage = Bufpages - 1 + pageno;
- CurPage = (TDatPage*)(Buf + (PageNum % Bufpages) * GetPageSize());
- memset(CurPage, 0, sizeof(TDatPage));
- return 0;
- }
-
- void Clear() {
- ABuf.Dealloc();
- Buf = nullptr;
- Maxpage = PageNum = Frozen = -1;
- Bufpages = 0;
- CurPage = nullptr;
- }
-
- int WriteBuf() {
- int nvec;
- iovec vec[2];
- ssize_t minpage = Maxpage - Bufpages + 1;
- ssize_t maxpage = Frozen == -1 ? PageNum : Frozen - 1;
- if (maxpage < minpage)
- return EAGAIN;
- minpage %= Bufpages;
- maxpage %= Bufpages;
- if (maxpage < minpage) {
- vec[0].iov_base = Buf + GetPageSize() * minpage;
- vec[0].iov_len = GetPageSize() * (Bufpages - minpage);
- vec[1].iov_base = Buf;
- vec[1].iov_len = GetPageSize() * (maxpage + 1);
- nvec = 2;
- } else {
- vec[0].iov_base = Buf + GetPageSize() * minpage;
- vec[0].iov_len = GetPageSize() * (maxpage - minpage + 1);
- nvec = 1;
- }
- if (TWriter::WritePages(vec, nvec))
- return EIO;
- Maxpage += (maxpage < minpage) ? (Bufpages - minpage + maxpage + 1) : (maxpage - minpage + 1);
- return 0;
- }
-
- ssize_t Maxpage;
- ssize_t Bufpages;
- ssize_t PageNum;
- int Frozen;
- TDatPage* CurPage;
- char* Buf;
- TMappedAllocation ABuf;
-};
-
-template <class TFileManip>
-class TOutputPageFileImpl: private TNonCopyable {
-public:
- TOutputPageFileImpl()
- : Pagesize(0)
- , Eof(1)
- , Error(0)
- , Memo(0)
- , Recordsig(0)
- {
- }
-
- ~TOutputPageFileImpl() {
- Term();
- }
-
- inline int IsEof() const {
- return Eof;
- }
-
- inline int GetError() const {
- return Error;
- }
-
- inline bool IsOpen() const {
- return FileManip.IsOpen();
- }
-
- inline size_t GetPageSize() const {
- return Pagesize;
- }
-
- inline ui32 GetRecordSig() const {
- return Recordsig;
- }
-
- int Init(const char* fname, size_t pagesize, ui32 recsig, bool direct = false) {
- Memo = 0;
- if (FileManip.IsOpen())
- return MBDB_ALREADY_INITIALIZED;
-
- if (!fname) {
- Eof = Error = 0;
- Pagesize = pagesize;
- Recordsig = recsig;
- Memo = 1;
- return 0;
- }
-
- Error = FileManip.Open(fname, WrOnly | CreateAlways | ARW | AWOther | (direct ? DirectAligned : EOpenMode()));
- if (Error)
- return Error;
- Error = Init(TFile(), pagesize, recsig);
- if (Error) {
- FileManip.Close();
- unlink(fname);
- }
- return Error;
- }
-
- int Init(TAutoPtr<IOutputStream> output, size_t pagesize, ui32 recsig) {
- Memo = 0;
- if (FileManip.IsOpen()) {
- return MBDB_ALREADY_INITIALIZED;
- }
-
- if (!output) {
- Eof = Error = 0;
- Pagesize = pagesize;
- Recordsig = recsig;
- Memo = 1;
- return 0;
- }
-
- Error = FileManip.Open(output);
- if (Error)
- return Error;
- Error = Init(TFile(), pagesize, recsig);
- if (Error) {
- FileManip.Close();
- }
- return Error;
- }
-
- int Init(const TFile& file, size_t pagesize, ui32 recsig) {
- Memo = 0;
- if (!file.IsOpen() && !FileManip.IsOpen())
- return MBDB_NOT_INITIALIZED;
- if (file.IsOpen() && FileManip.IsOpen())
- return MBDB_ALREADY_INITIALIZED;
- if (file.IsOpen()) {
- Error = FileManip.Init(file);
- if (Error)
- return Error;
- }
-
- Eof = 1;
- TTempBuf buf(METASIZE + FS_BLOCK_SIZE);
- const char* ptr = (buf.Data() + FS_BLOCK_SIZE - ((ui64)buf.Data() & (FS_BLOCK_SIZE - 1)));
- TDatMetaPage* meta = (TDatMetaPage*)ptr;
-
- memset(buf.Data(), 0, buf.Size());
- meta->MetaSig = METASIG;
- meta->PageSize = Pagesize = pagesize;
- meta->RecordSig = Recordsig = recsig;
-
- ssize_t size = METASIZE, ret = 0;
- while (size && (ret = FileManip.Write(ptr, (unsigned)size)) > 0) {
- size -= ret;
- ptr += ret;
- }
- if (size || ret <= 0) {
- Term();
- return Error = errno ? errno : MBDB_WRITE_ERROR;
- }
-
- Error = Eof = 0;
- return Error;
- }
-
-protected:
- int WritePages(iovec* vec, int nvec) {
- if (Error || Memo)
- return Error;
-
- ssize_t size, delta;
- iovec* pvec;
- int vsize;
-
- for (vsize = 0, pvec = vec; vsize < nvec; vsize++, pvec++)
- for (size = 0; (size_t)size < pvec->iov_len; size += Pagesize)
- ((TDatPage*)((char*)pvec->iov_base + size))->PageSig = PAGESIG;
-
- delta = size = 0;
- pvec = vec;
- vsize = nvec;
- while (vsize && (size = Writev(FileManip, pvec, (int)Min(vsize, 16))) > 0) {
- if (delta) {
- size += delta;
- pvec->iov_len += delta;
- pvec->iov_base = (char*)pvec->iov_base - delta;
- delta = 0;
- }
- while (size) {
- if ((size_t)size >= pvec->iov_len) {
- size -= pvec->iov_len;
- ++pvec;
- --vsize;
- } else {
- delta = size;
- pvec->iov_len -= size;
- pvec->iov_base = (char*)pvec->iov_base + size;
- size = 0;
- }
- }
- }
- if (delta) {
- pvec->iov_len += delta;
- pvec->iov_base = (char*)pvec->iov_base - delta;
- }
- return Error = (!size && !vsize) ? 0 : errno ? errno : MBDB_WRITE_ERROR;
- }
-
- i64 Tell() {
- return FileManip.RealSeek(0, SEEK_CUR);
- }
-
- int GotoPage(int pageno) {
- if (Error || Memo)
- return Error;
- Eof = 0;
- i64 offset = (i64)pageno * Pagesize + METASIZE;
- if (offset != FileManip.Seek(offset, SEEK_SET))
- Error = MBDB_BAD_FILE_SIZE;
- return Error;
- }
-
- int Term() {
- int ret = FileManip.Close();
- Eof = 1;
- Memo = 0;
- if (!Error)
- Error = ret;
- return Error;
- }
-
- size_t Pagesize;
- int Eof;
- int Error;
- int Memo;
- ui32 Recordsig;
-
-private:
- TFileManip FileManip;
-};
-
-using TOutputPageFile = TOutputPageFileImpl<TOutputFileManip>;
-
-template <class TVal,
- typename TBaseRecIter = TOutputRecordIterator<TVal, TOutputPageIterator<TOutputPageFile>>>
-class TOutDatFileImpl: public TBaseRecIter {
-public:
- typedef TBaseRecIter TRecIter;
- typedef typename TRecIter::TPageIter TPageIter;
- typedef typename TRecIter::TPageIter::TWriter TWriter;
-
- int Open(const char* fname, size_t pagesize, size_t pages = 1, int pagesOrBytes = 1, bool direct = false) {
- int ret = TWriter::Init(fname, pagesize, TVal::RecordSig, direct);
- return ret ? ret : Open2(pages, pagesOrBytes);
- }
-
- int Open(const TFile& file, size_t pagesize, size_t pages = 1, int pagesOrBytes = 1) {
- int ret = TWriter::Init(file, pagesize, TVal::RecordSig);
- return ret ? ret : Open2(pages, pagesOrBytes);
- }
-
- int Open(TAutoPtr<IOutputStream> output, size_t pagesize, size_t pages = 1, int pagesOrBytes = 1) {
- int ret = TWriter::Init(output, pagesize, TVal::RecordSig);
- return ret ? ret : Open2(pages, pagesOrBytes);
- }
-
- int Close() {
- int ret1 = TRecIter::Term();
- int ret2 = TPageIter::Term();
- int ret3 = TWriter::Term();
- return ret1 ? ret1 : ret2 ? ret2 : ret3;
- }
-
-private:
- int Open2(size_t pages, int pagesOrBytes) {
- int ret = TPageIter::Init(pages, pagesOrBytes);
- if (!ret)
- ret = TRecIter::Init();
- if (ret)
- Close();
- return ret;
- }
-};
-
-template <class TVal>
-class TOutIndexFile: public TOutDatFileImpl<
- TVal,
- TOutputRecordIterator<TVal, TOutputPageIterator<TOutputPageFile>, TCallbackIndexer, TFakeCompression>> {
- typedef TOutDatFileImpl<
- TVal,
- TOutputRecordIterator<TVal, TOutputPageIterator<TOutputPageFile>, TCallbackIndexer, TFakeCompression>>
- TDatFile;
- typedef TOutIndexFile<TVal> TMyType;
- typedef typename TDatFile::TRecIter TRecIter;
- typedef typename TRecIter::TPageIter TPageIter;
- typedef typename TRecIter::TIndexer TIndexer;
-
-public:
- TOutIndexFile() {
- TIndexer::SetCallback(this, DispatchCallback);
- }
-
- int Open(const char* fname, size_t pagesize, size_t pages, int pagesOrBytes = 1) {
- int ret = TDatFile::Open(fname, pagesize, pages, pagesOrBytes);
- if (ret)
- return ret;
- if ((ret = TRecIter::GotoPage(1))) {
- TDatFile::Close();
- return ret;
- }
- Index0.Clear();
- return ret;
- }
-
- int Close() {
- TPageIter::Unfreeze();
- if (TRecIter::RecNum) {
- TRecIter::Flush();
- NextPage(TPageIter::Current());
- }
- int ret = 0;
- if (Index0.Size() && !(ret = TRecIter::GotoPage(0))) {
- const char* ptr = Index0.Begin();
- size_t recSize;
- while (ptr < Index0.End()) {
- Y_ASSERT((size_t)(Index0.End() - ptr) >= sizeof(size_t));
- memcpy(&recSize, ptr, sizeof(size_t));
- ptr += sizeof(size_t);
- Y_ASSERT((size_t)(Index0.End() - ptr) >= recSize);
- ui8* buf = (ui8*)TRecIter::Reserve(recSize);
- if (!buf) {
- ret = MBDB_PAGE_OVERFLOW;
- break;
- }
- memcpy(buf, ptr, recSize);
- TRecIter::ResetDat();
- ptr += recSize;
- }
- Index0.Clear();
- ret = (TPageIter::GetPageNum() != 0) ? MBDB_PAGE_OVERFLOW : TPageIter::GetError();
- }
- int ret1 = TDatFile::Close();
- return ret ? ret : ret1;
- }
-
-protected:
- TBuffer Index0;
-
- void NextPage(const TDatPage* page) {
- const TVal* first = (const TVal*)NMicroBDB::GetFirstRecord(page);
- size_t sz;
- if (!TExtInfoType<TVal>::Exists) {
- sz = SizeOf(first);
- } else {
- size_t ll;
- size_t l;
- sz = NMicroBDB::SizeOfExt(first, &ll, &l);
- sz += ll + l;
- }
- Index0.Append((const char*)&sz, sizeof(size_t));
- Index0.Append((const char*)first, sz);
- }
-
- static void DispatchCallback(void* This, const TDatPage* page) {
- ((TMyType*)This)->NextPage(page);
- }
-};
-
-template <class TVal, class TKey, typename TCompressor = TFakeCompression, class TPageFile = TOutputPageFile>
-class TOutDirectFileImpl: public TOutDatFileImpl<
- TVal,
- TOutputRecordIterator<TVal, TOutputPageIterator<TPageFile>, TCallbackIndexer, TCompressor>> {
- typedef TOutDatFileImpl<
- TVal,
- TOutputRecordIterator<TVal, TOutputPageIterator<TPageFile>, TCallbackIndexer, TCompressor>>
- TDatFile;
- typedef TOutDirectFileImpl<TVal, TKey, TCompressor, TPageFile> TMyType;
- typedef typename TDatFile::TRecIter TRecIter;
- typedef typename TRecIter::TPageIter TPageIter;
- typedef typename TRecIter::TIndexer TIndexer;
- typedef TOutIndexFile<TKey> TKeyFile;
-
-public:
- TOutDirectFileImpl() {
- TIndexer::SetCallback(this, DispatchCallback);
- }
-
- int Open(const char* fname, size_t pagesize, int pages = 1, size_t ipagesize = 0, size_t ipages = 1, int pagesOrBytes = 1) {
- char iname[FILENAME_MAX];
- int ret;
- if (ipagesize == 0)
- ipagesize = pagesize;
- ret = TDatFile::Open(fname, pagesize, pages, pagesOrBytes);
- ret = ret ? ret : DatNameToIdx(iname, fname);
- ret = ret ? ret : KeyFile.Open(iname, ipagesize, ipages, pagesOrBytes);
- if (ret)
- TDatFile::Close();
- return ret;
- }
-
- int Close() {
- if (TRecIter::RecNum) {
- TRecIter::Flush();
- NextPage(TPageIter::Current());
- }
- int ret = KeyFile.Close();
- int ret1 = TDatFile::Close();
- return ret1 ? ret1 : ret;
- }
-
- int GetError() const {
- return TDatFile::GetError() ? TDatFile::GetError() : KeyFile.GetError();
- }
-
-protected:
- TKeyFile KeyFile;
-
- void NextPage(const TDatPage* page) {
- typedef TMakeExtKey<TVal, TKey> TMakeExtKey;
-
- TVal* val = (TVal*)NMicroBDB::GetFirstRecord(page);
- TKey key;
- if (!TMakeExtKey::Exists) {
- TMakeExtKey::Make(&key, nullptr, val, nullptr);
- KeyFile.Push(&key);
- } else {
- size_t ll;
- size_t l;
- size_t sz = NMicroBDB::SizeOfExt(val, &ll, &l);
- typename TExtInfoType<TVal>::TResult valExt;
- if (TExtInfoType<TVal>::Exists)
- Y_PROTOBUF_SUPPRESS_NODISCARD valExt.ParseFromArray((ui8*)val + sz + ll, l);
- typename TExtInfoType<TKey>::TResult keyExt;
- TMakeExtKey::Make(&key, &keyExt, val, &valExt);
- KeyFile.Push(&key, &keyExt);
- }
- }
-
- static void DispatchCallback(void* This, const TDatPage* page) {
- ((TMyType*)This)->NextPage(page);
- }
-};
diff --git a/library/cpp/microbdb/powersorter.h b/library/cpp/microbdb/powersorter.h
deleted file mode 100644
index c40de9c23f0..00000000000
--- a/library/cpp/microbdb/powersorter.h
+++ /dev/null
@@ -1,667 +0,0 @@
-#pragma once
-
-#include "safeopen.h"
-
-#include <util/generic/vector.h>
-#include <util/generic/deque.h>
-#include <util/system/mutex.h>
-#include <util/system/condvar.h>
-#include <util/thread/pool.h>
-
-template <
- class TRecord,
- template <typename T> class TCompare,
- class TSieve,
- class TMemoFile = TOutDatFile<TRecord>>
-class TDatSorterBuf {
-public:
- typedef TRecord TRec;
- typedef TVector<TRec*> TVectorType;
- typedef TMemoFile TMemo;
- typedef TCompare<TRecord> TComp;
-
-public:
- TDatSorterBuf(size_t memory, size_t pageSize)
- : Memo("memo", pageSize, memory, 0)
- , Cur()
- {
- Memo.Open(nullptr);
- Memo.Freeze();
- }
-
- ~TDatSorterBuf() {
- Vector.clear();
- Memo.Close();
- }
-
- const TRec* Push(const TRec* v) {
- const TRec* u = Memo.Push(v);
- if (u)
- Vector.push_back((TRec*)u);
- return u;
- }
-
- const TRec* Next() {
- if (Ptr == Vector.end()) {
- if (Cur)
- TSieve::Sieve(Cur, Cur);
- Cur = nullptr;
- } else {
- Cur = *Ptr++;
- if (!TIsSieveFake<TSieve>::Result)
- while (Ptr != Vector.end() && TSieve::Sieve(Cur, *Ptr))
- ++Ptr;
- }
- return Cur;
- }
-
- const TRec* Current() {
- return Cur;
- }
-
- size_t Size() {
- return Vector.size();
- }
-
- void Sort() {
- Ptr = Vector.begin();
- Cur = nullptr;
-
- MBDB_SORT_FUN(Vector.begin(), Vector.end(), TComp());
- }
-
- void Clear() {
- Vector.clear();
- Memo.Freeze();
- Ptr = Vector.begin();
- Cur = nullptr;
- }
-
-private:
- TVectorType Vector;
- TMemo Memo;
-
- typename TVectorType::iterator
- Ptr;
- TRec* Cur;
-};
-
-template <
- class TRecord,
- class TInput,
- template <typename T> class TCompare,
- class TSieve>
-class TDatMerger {
-public:
- typedef TRecord TRec;
- typedef TCompare<TRecord> TComp;
- typedef TSimpleSharedPtr<TInput> TInputPtr;
- typedef TVector<TInputPtr> TInputVector;
-
-public:
- ~TDatMerger() {
- Close();
- }
-
- void Init(const TInputVector& inputs) {
- Inputs = inputs;
- TVector<TInput*> v;
- for (int i = 0; i < Inputs.ysize(); ++i)
- v.push_back(Inputs[i].Get());
- HeapIter.Init(&v[0], v.size());
- if (!TIsSieveFake<TSieve>::Result)
- PNext = HeapIter.Next();
- }
-
- const TRec* Next() {
- if (TIsSieveFake<TSieve>::Result) {
- return HeapIter.Next();
- }
-
- if (!PNext) {
- if (PCur) {
- TSieve::Sieve(PCur, PCur);
- PCur = nullptr;
- }
- return nullptr;
- }
-
- PCur = &Cur;
- memcpy(PCur, PNext, SizeOf((const TRec*)PNext));
-
- do {
- PNext = HeapIter.Next();
- } while (PNext && TSieve::Sieve(PCur, PNext));
-
- return PCur;
- }
-
- const TRec* Current() {
- return (TIsSieveFake<TSieve>::Result ? HeapIter.Current() : PCur);
- }
-
- void Close() {
- Inputs.clear();
- HeapIter.Term();
- }
-
-private:
- TInputVector Inputs;
- THeapIter<TRec, TInput, TComp> HeapIter;
- TRec Cur;
- TRec* PCur = nullptr;
- const TRec* PNext = nullptr;
-};
-
-class TPortionManager {
-public:
- void Open(const char* tempDir) {
- TGuard<TMutex> guard(Mutex);
- TempDir = tempDir;
- }
-
- TString Next() {
- TGuard<TMutex> guard(Mutex);
- if (Portions == 0)
- DoOpen();
- TString fname = GeneratePortionFilename(Portions++);
- return fname;
- }
-
- void Close() {
- TGuard<TMutex> guard(Mutex);
- Portions = 0;
- }
-
-private:
- void DoOpen() {
- if (MakeSorterTempl(PortionFilenameTempl, TempDir.data())) {
- PortionFilenameTempl[0] = 0;
- ythrow yexception() << "portion-manager: bad tempdir \"" << TempDir.data() << "\": " << LastSystemErrorText();
- }
- }
-
- TString GeneratePortionFilename(int i) {
- char str[FILENAME_MAX];
- snprintf(str, sizeof(str), PortionFilenameTempl, i);
- return TString(str);
- }
-
-private:
- TMutex Mutex;
-
- TString TempDir;
- char PortionFilenameTempl[FILENAME_MAX] = {};
- int Portions = 0;
-};
-
-// A merger powered by threads
-template <
- class TRecord,
- template <typename T> class TCompare,
- class TSieve,
- class TInput = TInDatFile<TRecord>,
- class TOutput = TOutDatFile<TRecord>>
-class TPowerMerger {
-public:
- typedef TRecord TRec;
- typedef TDatMerger<TRecord, TInput, TCompare, TSieve> TMerger;
- typedef TSimpleSharedPtr<TMerger> TMergerPtr;
- typedef TPowerMerger<TRecord, TCompare, TSieve, TInput, TOutput> TFileMerger;
-
- struct TMergePortionTask: public IObjectInQueue {
- TFileMerger* FileMerger;
- int Begin;
- int End;
- TString OutFname;
-
- TMergePortionTask(TFileMerger* fileMerger, int begin, int end, const TString& outFname)
- : FileMerger(fileMerger)
- , Begin(begin)
- , End(end)
- , OutFname(outFname)
- {
- }
-
- void Process(void*) override {
- THolder<TMergePortionTask> This(this);
- //fprintf(stderr, "MergePortion: (%i, %i, %s)\n", Begin, End, ~OutFname);
- FileMerger->MergePortion(Begin, End, OutFname);
- }
- };
-
-public:
- TPowerMerger(const TSimpleSharedPtr<TThreadPool>& mtpQueue, const TSimpleSharedPtr<TPortionManager>& portMan,
- int memory, int pageSize, bool autoUnlink)
- : MtpQueue(mtpQueue)
- , PortionManager(portMan)
- , Memory(memory)
- , PageSize(pageSize)
- , AutoUnlink(autoUnlink)
- {
- }
-
- TPowerMerger(const TSimpleSharedPtr<TThreadPool>& mtpQueue, const char* tempDir,
- int memory, int pageSize, bool autoUnlink)
- : MtpQueue(mtpQueue)
- , PortionManager(new TPortionManager)
- , Memory(memory)
- , PageSize(pageSize)
- , AutoUnlink(autoUnlink)
- {
- PortionManager->Open(tempDir);
- }
-
- ~TPowerMerger() {
- Close();
- }
-
- void SetMtpQueue(const TSimpleSharedPtr<TThreadPool>& mtpQueue) {
- MtpQueue = mtpQueue;
- }
-
- void MergePortion(int begin, int end, const TString& outFname) {
- TMerger merger;
- InitMerger(merger, begin, end);
-
- TOutput out("mergeportion-tmpout", PageSize, BufSize, 0);
- out.Open(outFname.data());
- const TRec* rec;
- while ((rec = merger.Next()))
- out.Push(rec);
- out.Close();
-
- merger.Close();
-
- {
- TGuard<TMutex> guard(Mutex);
- UnlinkFiles(begin, end);
- Files.push_back(outFname);
- --Tasks;
- TaskFinishedCond.Signal();
- }
- }
-
- void Add(const TString& fname) {
- TGuard<TMutex> guard(Mutex);
- // fprintf(stderr, "TPowerMerger::Add: %s\n", ~fname);
- Files.push_back(fname);
- if (InitialFilesEnd > 0)
- ythrow yexception() << "TPowerMerger::Add: no more files allowed";
- }
-
- void Merge(int maxPortions) {
- TGuard<TMutex> guard(Mutex);
- InitialFilesEnd = Files.ysize();
- if (!InitialFilesEnd)
- ythrow yexception() << "TPowerMerger::Merge: no files added";
- Optimize(maxPortions);
- MergeMT();
- InitMerger(Merger, CPortions, Files.ysize());
- }
-
- void Close() {
- TGuard<TMutex> guard(Mutex);
- Merger.Close();
- UnlinkFiles(CPortions, Files.ysize());
- InitialFilesEnd = CPortions = 0;
- Files.clear();
- }
-
- const TRec* Next() {
- return Merger.Next();
- }
-
- const TRec* Current() {
- return Merger.Current();
- }
-
- int FileCount() const {
- TGuard<TMutex> guard(Mutex);
- return Files.ysize();
- }
-
-private:
- void InitMerger(TMerger& merger, int begin, int end) {
- TGuard<TMutex> guard(Mutex);
- TVector<TSimpleSharedPtr<TInput>> inputs;
- for (int i = begin; i < end; ++i) {
- inputs.push_back(new TInput("mergeportion-tmpin", BufSize, 0));
- inputs.back()->Open(Files[i]);
- // fprintf(stderr, "InitMerger: %i, %s\n", i, ~Files[i]);
- }
- merger.Init(inputs);
- }
-
- void UnlinkFiles(int begin, int end) {
- TGuard<TMutex> guard(Mutex);
- for (int i = begin; i < end; ++i) {
- if (i >= InitialFilesEnd || AutoUnlink)
- unlink(Files[i].c_str());
- }
- }
-
- void Optimize(int maxPortions, size_t maxBufSize = 4u << 20) {
- TGuard<TMutex> guard(Mutex);
- maxPortions = std::min(maxPortions, Memory / PageSize - 1);
- maxBufSize = std::max((size_t)PageSize, maxBufSize);
-
- if (maxPortions <= 2) {
- FPortions = MPortions = 2;
- BufSize = PageSize;
- return;
- }
-
- int Portions = Files.ysize();
- if (maxPortions >= Portions) {
- FPortions = MPortions = Portions;
- } else if (((Portions + maxPortions - 1) / maxPortions) <= maxPortions) {
- while (((Portions + maxPortions - 1) / maxPortions) <= maxPortions)
- --maxPortions;
- MPortions = ++maxPortions;
- int total = ((Portions + MPortions - 1) / MPortions) + Portions;
- FPortions = (total % MPortions) ? (total % MPortions) : (int)MPortions;
- } else
- FPortions = MPortions = maxPortions;
-
- BufSize = std::min((size_t)(Memory / (MPortions + 1)), maxBufSize);
- // fprintf(stderr, "Optimize: Portions=%i; MPortions=%i; FPortions=%i; Memory=%i; BufSize=%i\n",
- // (int)Portions, (int)MPortions, (int)FPortions, (int)Memory, (int)BufSize);
- }
-
- void MergeMT() {
- TGuard<TMutex> guard(Mutex);
- do {
- int n;
- while ((n = Files.ysize() - CPortions) > MPortions) {
- int m = std::min((CPortions == 0 ? (int)FPortions : (int)MPortions), n);
- TString fname = PortionManager->Next();
- if (!MtpQueue->Add(new TMergePortionTask(this, CPortions, CPortions + m, fname)))
- ythrow yexception() << "TPowerMerger::MergeMT: failed to add task";
- CPortions += m;
- ++Tasks;
- }
- if (Tasks > 0)
- TaskFinishedCond.Wait(Mutex);
- } while (Tasks > 0);
- }
-
-private:
- TMutex Mutex;
- TCondVar TaskFinishedCond;
-
- TMerger Merger;
- TSimpleSharedPtr<TThreadPool> MtpQueue;
- TSimpleSharedPtr<TPortionManager> PortionManager;
- TVector<TString> Files;
- int Tasks = 0;
- int InitialFilesEnd = 0;
- int CPortions = 0;
- int MPortions = 0;
- int FPortions = 0;
- int Memory = 0;
- int PageSize = 0;
- int BufSize = 0;
- bool AutoUnlink = false;
-};
-
-// A sorter powered by threads
-template <
- class TRecord,
- template <typename T> class TCompare,
- class TSieve = TFakeSieve<TRecord>,
- class TTmpInput = TInDatFile<TRecord>,
- class TTmpOutput = TOutDatFile<TRecord>>
-class TPowerSorter {
-public:
- typedef TPowerSorter<TRecord, TCompare, TSieve, TTmpInput, TTmpOutput> TSorter;
- typedef TRecord TRec;
- typedef TTmpOutput TTmpOut;
- typedef TTmpInput TTmpIn;
- typedef TDatSorterBuf<TRecord, TCompare, TSieve> TSorterBuf;
- typedef TCompare<TRecord> TComp;
- typedef TPowerMerger<TRecord, TCompare, TSieve, TTmpInput, TTmpOutput> TFileMerger;
-
- struct TSortPortionTask: public IObjectInQueue {
- TSorter* Sorter;
- TSorterBuf* SorterBuf;
- int Portion;
-
- TSortPortionTask(TSorter* sorter, TSorterBuf* sorterBuf, int portion)
- : Sorter(sorter)
- , SorterBuf(sorterBuf)
- , Portion(portion)
- {
- }
-
- void Process(void*) override {
- TAutoPtr<TSortPortionTask> This(this);
- // fprintf(stderr, "SortPortion: %i\n", Portion);
- Sorter->SortPortion(SorterBuf);
- }
- };
-
- class TSorterBufQueue {
- private:
- TMutex Mutex;
- TCondVar Cond;
- TVector<TSimpleSharedPtr<TSorterBuf>> V;
- TDeque<TSorterBuf*> Q;
-
- int Memory, PageSize, MaxSorterBufs;
-
- public:
- TSorterBufQueue(int memory, int pageSize, int maxSorterBufs)
- : Memory(memory)
- , PageSize(pageSize)
- , MaxSorterBufs(maxSorterBufs)
- {
- }
-
- void Push(TSorterBuf* sb) {
- TGuard<TMutex> guard(Mutex);
- sb->Clear();
- Q.push_back(sb);
- Cond.Signal();
- }
-
- TSorterBuf* Pop() {
- TGuard<TMutex> guard(Mutex);
- if (!Q.size() && V.ysize() < MaxSorterBufs) {
- V.push_back(new TSorterBuf(Memory / MaxSorterBufs, PageSize));
- return V.back().Get();
- } else {
- while (!Q.size())
- Cond.Wait(Mutex);
- TSorterBuf* t = Q.front();
- Q.pop_front();
- return t;
- }
- }
-
- void Clear() {
- TGuard<TMutex> guard(Mutex);
- Q.clear();
- V.clear();
- }
-
- void WaitAll() {
- TGuard<TMutex> guard(Mutex);
- while (Q.size() < V.size()) {
- Cond.Wait(Mutex);
- }
- }
-
- int GetMaxSorterBufs() const {
- return MaxSorterBufs;
- }
- };
-
-public:
- TPowerSorter(const TSimpleSharedPtr<TThreadPool>& mtpQueue, size_t maxSorterBufs,
- const char* name, size_t memory, size_t pageSize, size_t bufSize)
- : MaxSorterBufs(maxSorterBufs)
- , Name(name)
- , Memory(memory)
- , PageSize(pageSize)
- , BufSize(bufSize)
- , MtpQueue(mtpQueue)
- , PortionManager(new TPortionManager)
- , SBQueue(Memory, PageSize, MaxSorterBufs)
- , FileMerger(MtpQueue, PortionManager, Memory, PageSize, true)
- {
- }
-
- TPowerSorter(size_t maxSorterBufs,
- const char* name, size_t memory, size_t pageSize, size_t bufSize)
- : MaxSorterBufs(maxSorterBufs)
- , Name(name)
- , Memory(memory)
- , PageSize(pageSize)
- , BufSize(bufSize)
- , PortionManager(new TPortionManager)
- , SBQueue(Memory, PageSize, maxSorterBufs)
- , FileMerger(MtpQueue, PortionManager, Memory, PageSize, true)
- {
- }
-
- TPowerSorter(const char* name, size_t memory, size_t pageSize, size_t bufSize)
- : MaxSorterBufs(5)
- , Name(name)
- , Memory(memory)
- , PageSize(pageSize)
- , BufSize(bufSize)
- , PortionManager(new TPortionManager)
- , SBQueue(Memory, PageSize, MaxSorterBufs)
- , FileMerger(MtpQueue, PortionManager, Memory, PageSize, true)
- {
- }
-
- ~TPowerSorter() {
- Close();
- }
-
- void Open(const char* tempDir) {
- Close();
- CurSB = SBQueue.Pop();
- PortionManager->Open(tempDir);
- }
-
- void Reopen(const char* fname) {
- Open(fname);
- }
-
- void Close() {
- CurSB = nullptr;
- SBQueue.Clear();
- PortionCount = 0;
- FileMerger.Close();
- PortionManager->Close();
- }
-
- const TRec* Push(const TRec* v) {
- CheckOpen("Push");
- const TRec* u = CurSB->Push(v);
- if (!u) {
- NextPortion();
- u = CurSB->Push(v);
- }
- return u;
- }
-
- void Sort(int maxPortions = 1000) {
- CheckOpen("Sort");
- if (!PortionCount) {
- CurSB->Sort();
- } else {
- NextPortion();
- SBQueue.Push(CurSB);
- CurSB = nullptr;
- SBQueue.WaitAll();
- SBQueue.Clear();
- FileMerger.Merge(maxPortions);
- }
- }
-
- const TRec* Next() {
- return PortionCount ? FileMerger.Next() : CurSB->Next();
- }
-
- const TRec* Current() {
- return PortionCount ? FileMerger.Current() : CurSB->Current();
- }
-
- int GetBufSize() const {
- return BufSize;
- }
-
- int GetPageSize() const {
- return PageSize;
- }
-
- const char* GetName() const {
- return Name.data();
- }
-
-private:
- void CheckOpen(const char* m) {
- if (!CurSB)
- ythrow yexception() << "TPowerSorter::" << m << ": the sorter is not open";
- }
-
- void NextPortion() {
- if (!CurSB->Size())
- return;
- ++PortionCount;
- if (MaxSorterBufs <= 1) {
- SortPortion(CurSB);
- } else {
- if (!MtpQueue.Get()) {
- MtpQueue.Reset(new TThreadPool);
- MtpQueue->Start(MaxSorterBufs - 1);
- FileMerger.SetMtpQueue(MtpQueue);
- }
- if (!MtpQueue->Add(new TSortPortionTask(this, CurSB, PortionCount)))
- ythrow yexception() << "TPowerSorter::NextPortion: failed to add task";
- }
- CurSB = SBQueue.Pop();
- }
-
- void SortPortion(TSorterBuf* sorterBuf) {
- TString portionFilename = PortionManager->Next();
- try {
- sorterBuf->Sort();
-
- // fprintf(stderr, "TPowerSorter::SortPortion: -> %s\n", ~portionFilename);
- TTmpOut out("powersorter-portion", PageSize, BufSize, 0);
- out.Open(portionFilename.data());
-
- while (sorterBuf->Next())
- out.Push(sorterBuf->Current());
-
- out.Close();
- FileMerger.Add(portionFilename);
- SBQueue.Push(sorterBuf);
- } catch (const yexception& e) {
- unlink(portionFilename.data());
- ythrow yexception() << "SortPortion: " << e.what();
- }
- }
-
-private:
- int MaxSorterBufs = 0;
- TString Name;
- int Memory = 0;
- int PageSize = 0;
- int BufSize = 0;
-
- TMutex Mutex;
- TSimpleSharedPtr<TThreadPool> MtpQueue;
- TSimpleSharedPtr<TPortionManager> PortionManager;
-
- TSorterBufQueue SBQueue;
- TSorterBuf* CurSB = nullptr;
- int PortionCount = 0;
-
- TFileMerger FileMerger;
-};
diff --git a/library/cpp/microbdb/reader.h b/library/cpp/microbdb/reader.h
deleted file mode 100644
index 694a2f17662..00000000000
--- a/library/cpp/microbdb/reader.h
+++ /dev/null
@@ -1,354 +0,0 @@
-#pragma once
-
-#include "align.h"
-#include "header.h"
-#include "extinfo.h"
-
-#include <contrib/libs/zlib/zlib.h>
-#include <contrib/libs/fastlz/fastlz.h>
-#include <contrib/libs/snappy/snappy.h>
-
-#include <util/generic/vector.h>
-#include <util/memory/tempbuf.h>
-
-namespace NMicroBDB {
- static const size_t DEFAULT_BUFFER_SIZE = (64 << 10);
-
- //!
- template <class TVal>
- class IBasePageReader {
- public:
- virtual size_t GetRecSize() const = 0;
- virtual size_t GetExtSize() const = 0;
- virtual bool GetExtInfo(typename TExtInfoType<TVal>::TResult* extInfo) const = 0;
- virtual const ui8* GetExtInfoRaw(size_t* len) const = 0;
- virtual const TVal* Next() = 0;
- virtual void Reset() = 0;
- //! set clearing flag, so temporary buffers will be cleared
- //! in next call of Next()
- virtual void SetClearFlag() {
- }
-
- virtual ~IBasePageReader() {
- }
- };
-
- template <class TVal, typename TPageIter>
- class TRawPageReader: public IBasePageReader<TVal> {
- public:
- TRawPageReader(TPageIter* const iter)
- : PageIter(iter)
- {
- Reset();
- }
-
- bool GetExtInfo(typename TExtInfoType<TVal>::TResult* extInfo) const override {
- Y_VERIFY(TExtInfoType<TVal>::Exists, "GetExtInfo should only be used with extended records");
- if (!Rec)
- return false;
- ui8* raw = (ui8*)Rec + RecSize + ExtLenSize;
- return extInfo->ParseFromArray(raw, ExtSize);
- }
-
- size_t GetRecSize() const override {
- return RecSize + ExtLenSize;
- }
-
- size_t GetExtSize() const override {
- return ExtSize;
- }
-
- const ui8* GetExtInfoRaw(size_t* len) const override {
- Y_VERIFY(TExtInfoType<TVal>::Exists, "GetExtInfo should only be used with extended records");
- if (!Rec) {
- *len = 0;
- return nullptr;
- }
- *len = ExtLenSize + ExtSize;
- return (ui8*)Rec + RecSize;
- }
-
- const TVal* Next() override {
- if (!Rec)
- Rec = (TVal*)((char*)PageIter->Current() + sizeof(TDatPage));
- else
- Rec = (TVal*)((char*)Rec + DatCeil(RecSize + ExtLenSize + ExtSize));
- if (!TExtInfoType<TVal>::Exists)
- RecSize = SizeOf(Rec);
- else
- RecSize = SizeOfExt(Rec, &ExtLenSize, &ExtSize);
- return Rec;
- }
-
- void Reset() override {
- Rec = nullptr;
- RecSize = 0;
- ExtLenSize = 0;
- ExtSize = 0;
- }
-
- private:
- const TVal* Rec;
- size_t RecSize;
- size_t ExtLenSize;
- size_t ExtSize;
- TPageIter* const PageIter;
- };
-
- template <class TVal, typename TPageIter>
- class TCompressedReader: public IBasePageReader<TVal> {
- inline size_t GetFirstRecordSize(const TVal* const in) const {
- if (!TExtInfoType<TVal>::Exists) {
- return DatCeil(SizeOf(in));
- } else {
- size_t ll;
- size_t l;
- size_t ret = SizeOfExt(in, &ll, &l);
-
- return DatCeil(ret + ll + l);
- }
- }
-
- void DecompressBlock() {
- if (PageIter->IsFrozen() && Buffer.Get())
- Blocks.push_back(Buffer.Release());
-
- const TCompressedHeader* hdr = (const TCompressedHeader*)(Page);
-
- Page += sizeof(TCompressedHeader);
-
- const size_t first = GetFirstRecordSize((const TVal*)Page);
-
- if (!Buffer.Get() || Buffer->Size() < hdr->Original)
- Buffer.Reset(new TTempBuf(Max<size_t>(hdr->Original, DEFAULT_BUFFER_SIZE)));
-
- memcpy(Buffer->Data(), Page, first);
- Page += first;
-
- if (hdr->Count > 1) {
- switch (Algo) {
- case MBDB_COMPRESSION_ZLIB: {
- uLongf dst = hdr->Original - first;
-
- int ret = uncompress((Bytef*)Buffer->Data() + first, &dst, Page, hdr->Compressed);
-
- if (ret != Z_OK)
- ythrow yexception() << "error then uncompress " << ret;
- } break;
- case MBDB_COMPRESSION_FASTLZ: {
- int dst = hdr->Original - first;
- int ret = yfastlz_decompress(Page, hdr->Compressed, Buffer->Data() + first, dst);
-
- if (!ret)
- ythrow yexception() << "error then uncompress";
- } break;
- case MBDB_COMPRESSION_SNAPPY: {
- if (!snappy::RawUncompress((const char*)Page, hdr->Compressed, Buffer->Data() + first))
- ythrow yexception() << "error then uncompress";
- } break;
- }
- }
-
- Rec = nullptr;
- RecNum = hdr->Count;
- Page += hdr->Compressed;
- }
-
- void ClearBuffer() {
- for (size_t i = 0; i < Blocks.size(); ++i)
- delete Blocks[i];
- Blocks.clear();
- ClearFlag = false;
- }
-
- public:
- TCompressedReader(TPageIter* const iter)
- : Rec(nullptr)
- , RecSize(0)
- , ExtLenSize(0)
- , ExtSize(0)
- , Page(nullptr)
- , PageIter(iter)
- , RecNum(0)
- , BlockNum(0)
- , ClearFlag(false)
- {
- }
-
- ~TCompressedReader() override {
- ClearBuffer();
- }
-
- size_t GetRecSize() const override {
- return RecSize + ExtLenSize;
- }
-
- size_t GetExtSize() const override {
- return ExtSize;
- }
-
- bool GetExtInfo(typename TExtInfoType<TVal>::TResult* extInfo) const override {
- Y_VERIFY(TExtInfoType<TVal>::Exists, "GetExtInfo should only be used with extended records");
- if (!Rec)
- return false;
- ui8* raw = (ui8*)Rec + RecSize + ExtLenSize;
- return extInfo->ParseFromArray(raw, ExtSize);
- }
-
- const ui8* GetExtInfoRaw(size_t* len) const override {
- Y_VERIFY(TExtInfoType<TVal>::Exists, "GetExtInfo should only be used with extended records");
- if (!Rec) {
- *len = 0;
- return nullptr;
- }
- *len = ExtLenSize + ExtSize;
- return (ui8*)Rec + RecSize;
- }
-
- const TVal* Next() override {
- Y_ASSERT(RecNum >= 0);
-
- if (ClearFlag)
- ClearBuffer();
-
- if (!Page) {
- if (!PageIter->Current())
- return nullptr;
-
- Page = (ui8*)PageIter->Current() + sizeof(TDatPage);
-
- BlockNum = ((TCompressedPage*)Page)->BlockCount - 1;
- Algo = (ECompressionAlgorithm)((TCompressedPage*)Page)->Algorithm;
- Page += sizeof(TCompressedPage);
-
- DecompressBlock();
- }
-
- if (!RecNum) {
- if (BlockNum <= 0)
- return nullptr;
- else {
- --BlockNum;
- DecompressBlock();
- }
- }
-
- --RecNum;
- if (!Rec)
- Rec = (const TVal*)Buffer->Data();
- else
- Rec = (const TVal*)((char*)Rec + DatCeil(RecSize + ExtLenSize + ExtSize));
-
- if (!TExtInfoType<TVal>::Exists)
- RecSize = SizeOf(Rec);
- else
- RecSize = SizeOfExt(Rec, &ExtLenSize, &ExtSize);
-
- return Rec;
- }
-
- void Reset() override {
- Page = nullptr;
- BlockNum = 0;
- Rec = nullptr;
- RecSize = 0;
- ExtLenSize = 0;
- ExtSize = 0;
- RecNum = 0;
- }
-
- void SetClearFlag() override {
- ClearFlag = true;
- }
-
- public:
- THolder<TTempBuf> Buffer;
- TVector<TTempBuf*> Blocks;
- const TVal* Rec;
- size_t RecSize;
- size_t ExtLenSize;
- size_t ExtSize;
- const ui8* Page;
- TPageIter* const PageIter;
- int RecNum; //!< count of recs in current block
- int BlockNum;
- ECompressionAlgorithm Algo;
- bool ClearFlag;
- };
-
- class TZLibCompressionImpl {
- public:
- static const ECompressionAlgorithm Code = MBDB_COMPRESSION_ZLIB;
-
- inline void Init() {
- // -
- }
-
- inline void Term() {
- // -
- }
-
- inline size_t CompressBound(size_t size) const noexcept {
- return ::compressBound(size);
- }
-
- inline void Compress(void* out, size_t& outSize, const void* in, size_t inSize) {
- uLongf size = outSize;
-
- if (compress((Bytef*)out, &size, (const Bytef*)in, inSize) != Z_OK)
- ythrow yexception() << "not compressed";
- outSize = size;
- }
- };
-
- class TFastlzCompressionImpl {
- public:
- static const ECompressionAlgorithm Code = MBDB_COMPRESSION_FASTLZ;
-
- inline void Init() {
- // -
- }
-
- inline void Term() {
- // -
- }
-
- inline size_t CompressBound(size_t size) const noexcept {
- size_t rval = size_t(size * 1.07);
- return rval < 66 ? 66 : rval;
- }
-
- inline void Compress(void* out, size_t& outSize, const void* in, size_t inSize) {
- outSize = yfastlz_compress_level(2, in, inSize, out);
- if (!outSize)
- ythrow yexception() << "not compressed";
- }
- };
-
- class TSnappyCompressionImpl {
- public:
- static const ECompressionAlgorithm Code = MBDB_COMPRESSION_SNAPPY;
-
- inline void Init() {
- // -
- }
-
- inline void Term() {
- // -
- }
-
- inline size_t CompressBound(size_t size) const noexcept {
- return snappy::MaxCompressedLength(size);
- }
-
- inline void Compress(void* out, size_t& outSize, const void* in, size_t inSize) {
- snappy::RawCompress((const char*)in, inSize, (char*)out, &outSize);
- }
- };
-
-}
-
-using TFakeCompression = void;
-using TZLibCompression = NMicroBDB::TZLibCompressionImpl;
-using TFastlzCompression = NMicroBDB::TFastlzCompressionImpl;
-using TSnappyCompression = NMicroBDB::TSnappyCompressionImpl;
diff --git a/library/cpp/microbdb/safeopen.h b/library/cpp/microbdb/safeopen.h
deleted file mode 100644
index c328ffd575a..00000000000
--- a/library/cpp/microbdb/safeopen.h
+++ /dev/null
@@ -1,792 +0,0 @@
-#pragma once
-
-// util
-#include <util/generic/yexception.h>
-#include <util/generic/vector.h>
-#include <util/string/util.h>
-#include <util/system/mutex.h>
-#include <thread>
-
-#include "microbdb.h"
-
-#if defined(_MSC_VER)
-#pragma warning(push)
-#pragma warning(disable : 4706) /*assignment within conditional expression*/
-#pragma warning(disable : 4267) /*conversion from 'size_t' to 'type', possible loss of data*/
-#endif
-
-template <typename TVal, typename TPageFile = TInputPageFile, typename TIterator = TInputPageIterator<TPageFile>>
-class TInDatFile: protected TInDatFileImpl<TVal, TInputRecordIterator<TVal, TIterator>> {
-public:
- typedef TVal TRec;
- typedef TInDatFileImpl<TVal, TInputRecordIterator<TVal, TIterator>> TBase;
-
- TInDatFile(const TString& name, size_t pages, int pagesOrBytes = 1)
- : Name(name)
- , Pages(pages)
- , PagesOrBytes(pagesOrBytes)
- {
- }
-
- ~TInDatFile() {
- Close();
- }
-
- void Open(const TString& fname, bool direct = false) {
- ui32 gotRecordSig = 0;
- int ret = TBase::Open(fname.data(), Pages, PagesOrBytes, &gotRecordSig, direct);
- if (ret) {
- // XXX: print record type name, not type sig
- ythrow yexception() << ErrorMessage(ret, "Failed to open input file", fname, TVal::RecordSig, gotRecordSig);
- }
- Name = fname;
- }
-
- void OpenStream(TAutoPtr<IInputStream> input) {
- ui32 gotRecordSig = 0;
- int ret = TBase::Open(input, Pages, PagesOrBytes, &gotRecordSig);
- if (ret) {
- // XXX: print record type name, not type sig
- ythrow yexception() << ErrorMessage(ret, "Failed to open input file", Name, TVal::RecordSig, gotRecordSig);
- }
- }
-
- void Close() {
- int ret;
- if (IsOpen() && (ret = TBase::GetError()))
- if (!std::uncaught_exception())
- ythrow yexception() << ErrorMessage(ret, "Error before closing input file", Name);
- if ((ret = TBase::Close()))
- if (!std::uncaught_exception())
- ythrow yexception() << ErrorMessage(ret, "Error while closing input file", Name);
- }
-
- const char* GetName() const {
- return Name.data();
- }
-
- using TBase::Current;
- using TBase::Freeze;
- using TBase::GetError;
- using TBase::GetExtInfo;
- using TBase::GetExtInfoRaw;
- using TBase::GetExtSize;
- using TBase::GetLastPage;
- using TBase::GetPageNum;
- using TBase::GetPageSize;
- using TBase::GetRecSize;
- using TBase::GotoLastPage;
- using TBase::GotoPage;
- using TBase::IsEof;
- using TBase::IsOpen;
- using TBase::Next;
- using TBase::Skip;
- using TBase::Unfreeze;
-
-protected:
- TString Name;
- size_t Pages;
- int PagesOrBytes;
-};
-
-template <typename TVal>
-class TMappedInDatFile: protected TInDatFileImpl<TVal, TInputRecordIterator<TVal, TMappedInputPageIterator<TMappedInputPageFile>>> {
-public:
- typedef TVal TRec;
- typedef TInDatFileImpl<TVal, TInputRecordIterator<TVal, TMappedInputPageIterator<TMappedInputPageFile>>> TBase;
-
- TMappedInDatFile(const TString& name, size_t /* pages */, int /* pagesOrBytes */)
- : Name(name)
- {
- }
-
- ~TMappedInDatFile() {
- Close();
- }
-
- void Open(const TString& fname) {
- int ret = TBase::Open(fname.data());
- if (ret)
- ythrow yexception() << ErrorMessage(ret, "Failed to open mapped file", fname, TVal::RecordSig);
- Name = fname;
- }
-
- void Close() {
- int ret;
- if (IsOpen() && (ret = TBase::GetError()))
- if (!std::uncaught_exception())
- ythrow yexception() << ErrorMessage(ret, "Error before closing mapped file", Name);
- if ((ret = TBase::Close()))
- if (!std::uncaught_exception())
- ythrow yexception() << ErrorMessage(ret, "Error while closing mapped file", Name);
- }
-
- const char* GetName() const {
- return Name.data();
- }
-
- using TBase::Current;
- using TBase::GetError;
- using TBase::GetExtInfo;
- using TBase::GetExtInfoRaw;
- using TBase::GetLastPage;
- using TBase::GetPageNum;
- using TBase::GetPageSize;
- using TBase::GotoLastPage;
- using TBase::GotoPage;
- using TBase::IsEof;
- using TBase::IsOpen;
- using TBase::Next;
- using TBase::Skip;
-
-protected:
- TString Name;
-};
-
-template <typename TVal, typename TCompressor = TFakeCompression, typename TPageFile = TOutputPageFile>
-class TOutDatFile: protected TOutDatFileImpl<TVal, TOutputRecordIterator<TVal, TOutputPageIterator<TPageFile>, TFakeIndexer, TCompressor>> {
-public:
- typedef TOutDatFileImpl<TVal, TOutputRecordIterator<TVal, TOutputPageIterator<TPageFile>, TFakeIndexer, TCompressor>> TBase;
-
- TOutDatFile(const TString& name, size_t pagesize, size_t pages, int pagesOrBytes = 1)
- : Name(name)
- , PageSize(pagesize)
- , Pages(pages)
- , PagesOrBytes(pagesOrBytes)
- {
- }
-
- ~TOutDatFile() {
- Close();
- }
-
- void Open(const char* fname, bool direct = false) {
- int ret = TBase::Open(fname, PageSize, Pages, PagesOrBytes, direct);
- if (ret)
- ythrow yexception() << ErrorMessage(ret, "Failed to open output file", fname);
- Name = fname;
- }
-
- void Open(const TString& fname) {
- Open(fname.data());
- }
-
- void OpenStream(TAutoPtr<IOutputStream> output) {
- int ret = TBase::Open(output, PageSize, Pages, PagesOrBytes);
- if (ret)
- ythrow yexception() << ErrorMessage(ret, "Failed to open output stream", Name);
- }
-
- void Close() {
- int ret;
- if ((ret = TBase::GetError()))
- if (!std::uncaught_exception())
- ythrow yexception() << ErrorMessage(ret, "Error before closing output file", Name);
- if ((ret = TBase::Close()))
- if (!std::uncaught_exception())
- ythrow yexception() << ErrorMessage(ret, "Error while closing output file", Name);
- }
-
- const char* GetName() const {
- return Name.data();
- }
-
- using TBase::Freeze;
- using TBase::GetError;
- using TBase::GetPageSize;
- using TBase::IsEof;
- using TBase::IsOpen;
- using TBase::Offset;
- using TBase::Push;
- using TBase::PushWithExtInfo;
- using TBase::Reserve;
- using TBase::Unfreeze;
-
-protected:
- TString Name;
- size_t PageSize, Pages;
- int PagesOrBytes;
-};
-
-template <typename TVal, typename TCompressor, typename TPageFile>
-class TOutDatFileArray;
-
-template <typename TVal, typename TCompressor = TFakeCompression, typename TPageFile = TOutputPageFile>
-class TOutDatFileArray {
- typedef TOutDatFile<TVal, TCompressor, TPageFile> TFileType;
-
-public:
- TOutDatFileArray(const TString& name, size_t pagesize, size_t pages, int pagesOrBytes = 1)
- : Name(name)
- , PageSize(pagesize)
- , Pages(pages)
- , PagesOrBytes(pagesOrBytes)
- , NumFiles(0)
- , Files(nullptr)
- {
- }
-
- ~TOutDatFileArray() {
- for (int i = 0; i < NumFiles; ++i) {
- Files[i].Close();
- Files[i].~TFileType();
- }
- free(Files);
- Files = nullptr;
- NumFiles = 0;
- }
-
- TFileType& operator[](size_t pos) {
- return Files[pos];
- }
-
- void Open(int n, const TString& fname) {
- char temp[FILENAME_MAX];
-
- Name = fname;
- NumFiles = CreateDatObjects(n, fname);
-
- int i;
- try {
- for (i = 0; i < NumFiles; ++i) {
- sprintf(temp, fname.data(), i);
- Files[i].Open(temp);
- }
- } catch (...) {
- while (--i >= 0)
- Files[i].Close();
- throw;
- }
- }
-
- template <typename TNameBuilder>
- void OpenWithCallback(int n, const TNameBuilder& builder) {
- NumFiles = CreateDatObjects(n, Name);
-
- for (int i = 0; i < NumFiles; ++i)
- Files[i].Open(builder.GetName(i).data());
- }
-
- void Close() {
- for (int i = 0; i < NumFiles; ++i)
- Files[i].Close();
- }
-
- void CloseMT(ui32 threads) {
- int current = 0;
- TMutex mutex;
- TVector<std::thread> thrs;
- thrs.reserve(threads);
- for (ui32 i = 0; i < threads; i++) {
- thrs.emplace_back([this, &current, &mutex]() {
- while (true) {
- mutex.Acquire();
- int cur = current++;
- mutex.Release();
- if (cur >= NumFiles)
- break;
- Files[cur].Close();
- }
- });
- }
- for (auto& thread : thrs) {
- thread.join();
- }
- }
-
- const char* GetName() const {
- return Name.data();
- }
-
-protected:
- int CreateDatObjects(int n, const TString& fname) {
- if (!(Files = (TFileType*)malloc(n * sizeof(TFileType))))
- ythrow yexception() << "can't alloc \"" << fname << "\" file array: " << LastSystemErrorText();
- int num = 0;
- char temp[FILENAME_MAX];
- for (int i = 0; i < n; ++i, ++num) {
- sprintf(temp, "%s[%d]", fname.data(), i);
- new (Files + i) TFileType(temp, PageSize, Pages, PagesOrBytes);
- }
- return num;
- }
-
- TString Name;
- size_t PageSize, Pages;
- int PagesOrBytes, NumFiles;
- TFileType* Files;
-};
-
-template <typename TVal, typename TKey, typename TCompressor = TFakeCompression, typename TPageFile = TOutputPageFile>
-class TOutDirectFile: protected TOutDirectFileImpl<TVal, TKey, TCompressor, TPageFile> {
- typedef TOutDirectFileImpl<TVal, TKey, TCompressor, TPageFile> TBase;
-
-public:
- TOutDirectFile(const TString& name, size_t pagesize, size_t pages, size_t ipagesize, size_t ipages, int pagesOrBytes)
- : Name(name)
- , PageSize(pagesize)
- , Pages(pages)
- , IdxPageSize(ipagesize)
- , IdxPages(ipages)
- , PagesOrBytes(pagesOrBytes)
- {
- }
-
- ~TOutDirectFile() {
- Close();
- }
-
- void Open(const TString& fname) {
- int ret = TBase::Open(fname.data(), PageSize, Pages, IdxPageSize, IdxPages, PagesOrBytes);
- if (ret)
- ythrow yexception() << ErrorMessage(ret, "Failed to open output file", fname);
- Name = fname;
- }
-
- void Close() {
- int ret;
- if ((ret = TBase::GetError()))
- if (!std::uncaught_exception())
- ythrow yexception() << ErrorMessage(ret, "Error before closing output file", Name);
- if ((ret = TBase::Close()))
- if (!std::uncaught_exception())
- ythrow yexception() << ErrorMessage(ret, "Error while closing output file", Name);
- }
-
- const char* GetName() const {
- return Name.data();
- }
-
- using TBase::Freeze;
- using TBase::Push;
- using TBase::PushWithExtInfo;
- using TBase::Reserve;
- using TBase::Unfreeze;
-
-protected:
- TString Name;
- size_t PageSize, Pages, IdxPageSize, IdxPages;
- int PagesOrBytes;
-};
-
-template <
- typename TVal,
- template <typename T> class TComparer,
- typename TCompress = TFakeCompression,
- typename TSieve = TFakeSieve<TVal>,
- typename TPageFile = TOutputPageFile,
- typename TFileTypes = TDefInterFileTypes>
-class TDatSorter: protected TDatSorterImpl<TVal, TComparer<TVal>, TCompress, TSieve, TPageFile, TFileTypes> {
- typedef TDatSorterImpl<TVal, TComparer<TVal>, TCompress, TSieve, TPageFile, TFileTypes> TBase;
-
-public:
- typedef TVal TRec;
-
-public:
- TDatSorter(const TString& name, size_t memory, size_t pagesize, size_t pages, int pagesOrBytes = 1)
- : Name(name)
- , Memory(memory)
- , PageSize(pagesize)
- , Pages(pages)
- , PagesOrBytes(pagesOrBytes)
- {
- Templ[0] = 0;
- }
-
- ~TDatSorter() {
- Close();
- Templ[0] = 0;
- }
-
- void Open(const TString& dirName) {
- int ret;
- if (ret = MakeSorterTempl(Templ, dirName.data())) {
- Templ[0] = 0;
- ythrow yexception() << ErrorMessage(ret, Name + " sorter: bad tempdir", dirName);
- }
- if ((ret = TBase::Open(Templ, PageSize, Pages, PagesOrBytes)))
- ythrow yexception() << ErrorMessage(ret, Name + " sorter: open error, temp dir", Templ);
- }
-
- void Sort(bool direct = false) {
- int ret = TBase::Sort(Memory, 1000, direct);
- if (ret)
- ythrow yexception() << ErrorMessage(ret, Name + " sorter: sort error, temp dir", Templ, TVal::RecordSig);
- }
-
- void SortToFile(const TString& name) {
- int ret = TBase::SortToFile(name.data(), Memory);
- if (ret)
- ythrow yexception() << ErrorMessage(ret, Name + "sorter: error in SortToFile", name, TVal::RecordSig);
- }
-
- void SortToStream(TAutoPtr<IOutputStream> output) {
- int ret = TBase::SortToStream(output, Memory);
- if (ret)
- ythrow yexception() << ErrorMessage(ret, Name + "sorter: error in SortToStream", "", TVal::RecordSig);
- }
-
- void Close() {
- int ret1 = TBase::GetError();
- int ret2 = TBase::Close();
- if (Templ[0]) {
- *strrchr(Templ, GetDirectorySeparator()) = 0;
- RemoveDirWithContents(Templ);
- Templ[0] = 0;
- }
- if (ret1)
- if (!std::uncaught_exception())
- ythrow yexception() << ErrorMessage(ret1, Name + "sorter: error before closing");
- if (ret2)
- if (!std::uncaught_exception())
- ythrow yexception() << ErrorMessage(ret2, Name + "sorter: error while closing");
- }
-
- int Sort(size_t memory, int maxportions, bool direct = false) {
- return TBase::Sort(memory, maxportions, direct);
- }
-
- const char* GetName() const {
- return Name.data();
- }
-
- using TBase::GetPageSize;
- using TBase::GetPages;
- using TBase::Next;
- using TBase::NextPortion;
- using TBase::Push;
- using TBase::PushWithExtInfo;
- using TBase::UseSegmentSorter;
-
-protected:
- TString Name;
- size_t Memory, PageSize, Pages;
- int PagesOrBytes;
- char Templ[FILENAME_MAX];
-};
-
-template <typename TSorter>
-class TSorterArray {
-public:
- typedef TSorter TDatSorter;
-
-public:
- TSorterArray(const TString& name, size_t memory, size_t pagesize, size_t pages, int pagesOrBytes = 1)
- : Name(name)
- , Memory(memory)
- , PageSize(pagesize)
- , Pages(pages)
- , PagesOrBytes(pagesOrBytes)
- , NumSorters(0)
- , Sorters(nullptr)
- {
- }
-
- ~TSorterArray() {
- for (int i = 0; i < NumSorters; ++i) {
- Sorters[i].Close();
- Sorters[i].~TSorter();
- }
- free(Sorters);
- Sorters = nullptr;
- NumSorters = 0;
- }
-
- TSorter& operator[](size_t pos) {
- return Sorters[pos];
- }
-
- void Open(int n, const TString& fname, size_t memory = 0) {
- if (!(Sorters = (TSorter*)malloc(n * sizeof(TSorter))))
- ythrow yexception() << "can't alloc \"" << fname << "\" sorter array: " << LastSystemErrorText();
- NumSorters = n;
- char temp[FILENAME_MAX];
- if (memory)
- Memory = memory;
- for (int i = 0; i < NumSorters; ++i) {
- sprintf(temp, "%s[%d]", Name.data(), i);
- new (Sorters + i) TSorter(temp, Memory, PageSize, Pages, PagesOrBytes);
- }
- for (int i = 0; i < NumSorters; ++i)
- Sorters[i].Open(fname);
- }
-
- void Close() {
- for (int i = 0; i < NumSorters; ++i)
- Sorters[i].Close();
- }
-
- const char* GetName() const {
- return Name.data();
- }
-
-protected:
- TString Name;
- size_t Memory, PageSize, Pages;
- int PagesOrBytes, NumSorters;
- TSorter* Sorters;
-};
-
-template <typename TVal, template <typename T> class TCompare, typename TSieve = TFakeSieve<TVal>>
-class TDatSorterArray: public TSorterArray<TDatSorter<TVal, TCompare, TSieve>> {
-public:
- TDatSorterArray(const char* name, size_t memory, size_t pagesize, size_t pages, int pagesOrBytes = 1)
- : TSorterArray<TDatSorter<TVal, TCompare, TSieve>>(name, memory, pagesize, pages, pagesOrBytes)
- {
- }
-};
-
-template <typename TVal, template <typename T> class TCompare, typename TCompress = TFakeCompression,
- typename TSieve = TFakeSieve<TVal>, typename TPageFile = TOutputPageFile, typename TFileTypes = TDefInterFileTypes>
-class TDatSorterMemo: public TDatSorter<TVal, TCompare, TCompress, TSieve, TPageFile, TFileTypes> {
- typedef TDatSorter<TVal, TCompare, TCompress, TSieve, TPageFile, TFileTypes> TSorter;
-
-public:
- TOutDatFile<TVal> Memo;
- TString Home;
- bool OpenReq;
- bool Opened;
- bool UseDirectWrite;
-
-public:
- TDatSorterMemo(const char* name, size_t memory, size_t pagesize, size_t pages, int pagesOrBytes = 1)
- : TSorter(name, memory, pagesize, pages, pagesOrBytes)
- , Memo(name, pagesize, memory, 0)
- {
- OpenReq = false;
- Opened = false;
- UseDirectWrite = false;
- }
-
- void Open(const TString& home) {
- OpenReq = true;
- // TSorter::Open(home);
- Home = home;
- Memo.Open(nullptr);
- Memo.Freeze();
- }
-
- void Reopen(const char* home) {
- Close();
- Open(home);
- }
-
- void Open() {
- if (!OpenReq) {
- OpenReq = true;
- Memo.Open(nullptr);
- Memo.Freeze();
- }
- }
-
- void OpenIfNeeded() {
- if (OpenReq && !Opened) {
- if (!Home)
- ythrow yexception() << "Temp directory not specified, call Open(char*) first : " << TSorter::Name;
- TSorter::Open(Home);
- Opened = true;
- }
- }
-
- TVal* Reserve(size_t len) {
- if (TExtInfoType<TVal>::Exists)
- return ReserveWithExt(len, 0);
-
- TVal* u = Memo.Reserve(len);
- if (!u) {
- OpenIfNeeded();
- TSorter::NextPortion(UseDirectWrite);
- Memo.Freeze();
- u = Memo.Reserve(len);
- }
- TSorter::PushWithExtInfo(u);
- return u;
- }
-
- TVal* ReserveWithExt(size_t len, size_t extSize) {
- size_t fullLen = len + len_long((i64)extSize) + extSize;
- TVal* u = Memo.Reserve(fullLen);
- if (!u) {
- OpenIfNeeded();
- TSorter::NextPortion(UseDirectWrite);
- Memo.Freeze();
- u = Memo.Reserve(fullLen);
- if (!u) {
- if (fullLen > Memo.GetPageSize()) {
- ythrow yexception() << "Size of element and " << len << " size of extInfo " << extSize
- << " is larger than page size " << Memo.GetPageSize();
- }
- ythrow yexception() << "going to insert a null pointer. Bad.";
- }
- }
- out_long((i64)extSize, (char*)u + len);
- TSorter::PushWithExtInfo(u);
- return u;
- }
-
- char* GetReservedExt(TVal* rec, size_t len, size_t extSize) {
- return (char*)rec + len + len_long((i64)extSize);
- }
-
- const TVal* Push(const TVal* v, const typename TExtInfoType<TVal>::TResult* extInfo = nullptr) {
- const TVal* u = Memo.Push(v, extInfo);
- if (!u) {
- OpenIfNeeded();
- TSorter::NextPortion(UseDirectWrite);
- Memo.Freeze();
- u = Memo.Push(v, extInfo);
- if (!u) {
- if (SizeOf(v) > Memo.GetPageSize()) {
- ythrow yexception() << "Size of element " << SizeOf(v)
- << " is larger than page size " << Memo.GetPageSize();
- }
- ythrow yexception() << "going to insert a null pointer. Bad.";
- }
- }
- TSorter::PushWithExtInfo(u);
- return u;
- }
-
- const TVal* Push(const TVal* v, const ui8* extInfoRaw, size_t extLen) {
- const TVal* u = Memo.Push(v, extInfoRaw, extLen);
- if (!u) {
- OpenIfNeeded();
- TSorter::NextPortion(UseDirectWrite);
- Memo.Freeze();
- u = Memo.Push(v, extInfoRaw, extLen);
- if (!u) {
- if (SizeOf(v) > Memo.GetPageSize()) {
- ythrow yexception() << "Size of element " << SizeOf(v)
- << " is larger than page size " << Memo.GetPageSize();
- }
- ythrow yexception() << "going to insert a null pointer. Bad..";
- }
- }
- TSorter::PushWithExtInfo(u);
- return u;
- }
-
- const TVal* PushWithExtInfo(const TVal* v) {
- const TVal* u = Memo.PushWithExtInfo(v);
- if (!u) {
- OpenIfNeeded();
- TSorter::NextPortion(UseDirectWrite);
- Memo.Freeze();
- u = Memo.PushWithExtInfo(v);
- if (!u) {
- if (SizeOf(v) > Memo.GetPageSize()) {
- ythrow yexception() << "Size of element " << SizeOf(v)
- << " is larger than page size " << Memo.GetPageSize();
- }
- ythrow yexception() << "going to insert a null pointer. Bad...";
- }
- }
- TSorter::PushWithExtInfo(u);
- return u;
- }
-
- void Sort(bool direct = false) {
- if (Opened) {
- TSorter::NextPortion(UseDirectWrite);
- Memo.Close();
- OpenReq = false;
- TSorter::Sort(direct);
- } else {
- TSorter::SortPortion();
- }
- }
-
- const TVal* Next() {
- return Opened ? TSorter::Next() : TSorter::Nextp();
- }
-
- bool GetExtInfo(typename TExtInfoType<TVal>::TResult* extInfo) const {
- return NMicroBDB::GetExtInfo(Current(), extInfo);
- }
-
- const ui8* GetExtInfoRaw(size_t* len) const {
- return NMicroBDB::GetExtInfoRaw(Current(), len);
- }
-
- const TVal* Current() const {
- return Opened ? TSorter::Current() : TSorter::Currentp();
- }
-
- int NextPortion() {
- OpenIfNeeded();
- return TSorter::NextPortion(UseDirectWrite);
- }
-
- void SortToFile(const char* name) {
- OpenIfNeeded();
- TSorter::NextPortion(UseDirectWrite);
- Memo.Close();
- OpenReq = false;
- TSorter::SortToFile(name);
- }
-
- void SortToStream(TAutoPtr<IOutputStream> output) {
- OpenIfNeeded();
- TSorter::NextPortion(UseDirectWrite);
- Memo.Close();
- OpenReq = false;
- TSorter::SortToStream(output);
- }
-
- template <typename TKey, typename TOutCompress>
- void SortToDirectFile(const char* name, size_t ipagesize, size_t ipages) {
- Sort();
- TOutDirectFile<TVal, TKey, TOutCompress> out(TSorter::Name, TSorter::PageSize, TSorter::Pages, ipagesize, ipages, TSorter::PagesOrBytes);
- out.Open(name);
- while (const TVal* rec = Next())
- out.PushWithExtInfo(rec);
- out.Close();
- }
-
- template <typename TKey>
- void SortToDirectFile(const char* name, size_t ipagesize, size_t ipages) {
- SortToDirectFile<TKey, TCompress>(name, ipagesize, ipages);
- }
-
- void CloseSorter() {
- if (Opened)
- TSorter::Close();
- else
- TSorter::Closep();
- Memo.Freeze();
- Opened = false;
- }
-
- void Close() {
- if (Opened)
- TSorter::Close();
- else
- TSorter::Closep();
- Memo.Close();
- OpenReq = false;
- Opened = false;
- }
-
- int SavePortions(const char* mask) {
- return TSorter::SavePortions(mask, UseDirectWrite);
- }
-
-public:
- using TSorter::RestorePortions;
-};
-
-template <typename TVal, template <typename T> class TCompare, typename TCompress = TFakeCompression,
- typename TSieve = TFakeSieve<TVal>, class TPageFile = TOutputPageFile, class TFileTypes = TDefInterFileTypes>
-class TDatSorterMemoArray: public TSorterArray<TDatSorterMemo<TVal, TCompare, TCompress, TSieve, TPageFile, TFileTypes>> {
-public:
- typedef TSorterArray<TDatSorterMemo<TVal, TCompare, TCompress, TSieve, TPageFile, TFileTypes>> TBase;
-
- TDatSorterMemoArray(const char* name, size_t memory, size_t pagesize, size_t pages, int pagesOrBytes = 1)
- : TBase(name, memory, pagesize, pages, pagesOrBytes)
- {
- }
-};
-
-#if defined(_MSC_VER)
-#pragma warning(pop)
-#endif
diff --git a/library/cpp/microbdb/sorter.h b/library/cpp/microbdb/sorter.h
deleted file mode 100644
index b2e7390377d..00000000000
--- a/library/cpp/microbdb/sorter.h
+++ /dev/null
@@ -1,677 +0,0 @@
-#pragma once
-
-#include <util/ysaveload.h>
-#include <util/generic/algorithm.h>
-#include <contrib/libs/libc_compat/include/link/link.h>
-
-#include "header.h"
-#include "heap.h"
-#include "extinfo.h"
-#include "input.h"
-#include "output.h"
-
-#ifdef TEST_MERGE
-#define MBDB_SORT_FUN ::StableSort
-#else
-#define MBDB_SORT_FUN ::Sort
-#endif
-
-template <class TVal, class TCompare, typename TCompress, typename TSieve, typename TOutPageFile, typename TFileTypes>
-class TDatSorterImpl;
-
-template <class TVal>
-struct TFakeSieve {
- static inline int Sieve(TVal*, const TVal*) noexcept {
- return 0;
- }
-};
-
-template <class TSieve>
-struct TIsSieveFake {
- static const bool Result = false;
-};
-
-template <class T>
-struct TIsSieveFake<TFakeSieve<T>> {
- static const bool Result = true;
-};
-
-class TDefInterFileTypes {
-public:
- typedef TOutputPageFile TOutPageFile;
- typedef TInputPageFile TInPageFile;
-};
-
-//class TCompressedInterFileTypes;
-
-template <class TVal, class TCompare, typename TCompress, typename TSieve, typename TOutPageFile = TOutputPageFile, typename TFileTypes = TDefInterFileTypes>
-class TDatSorterImplBase: protected THeapIter<TVal, TInDatFileImpl<TVal, TInputRecordIterator<TVal, TInputPageIterator<typename TFileTypes::TInPageFile>>>, TCompare> {
- typedef TOutputRecordIterator<TVal, TOutputPageIterator<typename TFileTypes::TOutPageFile>, TFakeIndexer, TCompress> TTmpRecIter;
- typedef TInputRecordIterator<TVal, TInputPageIterator<typename TFileTypes::TInPageFile>> TInTmpRecIter;
-
-public:
- typedef TOutDatFileImpl<TVal, TTmpRecIter> TTmpOut;
- typedef TInDatFileImpl<TVal, TInTmpRecIter> TTmpIn;
-
- typedef TOutDatFileImpl<TVal, TOutputRecordIterator<TVal, TOutputPageIterator<TOutPageFile>, TFakeIndexer, TCompress>> TOut;
- typedef THeapIter<TVal, TTmpIn, TCompare> TMyHeap;
- typedef TVector<const TVal*> TMyVector;
- typedef typename TMyVector::iterator TMyIterator;
-
- class IPortionSorter {
- public:
- virtual ~IPortionSorter() {
- }
-
- virtual void Sort(TMyVector&, TTmpOut*) = 0;
- };
-
- class TDefaultSorter: public IPortionSorter {
- public:
- void Sort(TMyVector& vector, TTmpOut* out) override {
- MBDB_SORT_FUN(vector.begin(), vector.end(), TCompare());
-
- const typename TMyVector::const_iterator
- end = (TIsSieveFake<TSieve>::Result) ? vector.end() : TDatSorterImplBase::SieveRange(vector.begin(), vector.end());
-
- for (typename TMyVector::const_iterator it = vector.begin(); it != end; ++it) {
- out->PushWithExtInfo(*it);
- }
- }
- };
-
- class TSegmentedSorter: public IPortionSorter {
- class TAdaptor {
- typedef typename TMyVector::const_iterator TConstIterator;
-
- public:
- TAdaptor(TConstIterator b, TConstIterator e)
- : Curr_(b)
- , End_(e)
- {
- --Curr_;
- }
-
- inline const TVal* Current() const {
- return *Curr_;
- }
-
- inline const TVal* Next() {
- ++Curr_;
-
- if (Curr_ == End_) {
- return nullptr;
- }
-
- return *Curr_;
- }
-
- private:
- TConstIterator Curr_;
- TConstIterator End_;
- };
-
- typedef THeapIter<TVal, TAdaptor, TCompare> TPortionsHeap;
-
- public:
- void Sort(TMyVector& vector, TTmpOut* out) override {
- TVector<TAdaptor> bounds;
- typename TMyVector::iterator
- it = vector.begin();
- const size_t portions = Max<size_t>(1, (vector.size() * sizeof(TVal)) / (4 << 20));
- const size_t step = vector.size() / portions;
-
- // Sort segments
- while (it != vector.end()) {
- const typename TMyVector::iterator
- end = Min(it + step, vector.end());
-
- MBDB_SORT_FUN(it, end, TCompare());
-
- bounds.push_back(TAdaptor(it, end));
-
- it = end;
- }
-
- //
- // Merge result
- //
-
- TPortionsHeap heap(bounds);
-
- if (TIsSieveFake<TSieve>::Result) {
- while (const TVal* val = heap.Next()) {
- out->PushWithExtInfo(val);
- }
- } else {
- const TVal* val = heap.Next();
- const TVal* prev = out->PushWithExtInfo(val);
-
- for (val = heap.Next(); val && prev; val = heap.Next()) {
- if (TSieve::Sieve((TVal*)prev, val)) {
- continue;
- }
-
- prev = out->PushWithExtInfo(val);
- }
-
- if (prev) {
- TSieve::Sieve((TVal*)prev, prev);
- }
- }
- }
- };
-
-public:
- TDatSorterImplBase()
- : Sorter(new TDefaultSorter)
- {
- InFiles = nullptr;
- TempBuf = nullptr;
- Ptr = Vector.end();
- Cur = nullptr;
- Portions = CPortions = Error = 0;
- }
-
- ~TDatSorterImplBase() {
- Close();
- }
-
- int Open(const char* templ, size_t pagesize, size_t pages, int pagesOrBytes = 1) {
- Portions = CPortions = Error = 0;
- TempBuf = strdup(templ);
- Pagesize = pagesize;
- if (pagesOrBytes)
- Pages = pages;
- else
- Pages = pages / pagesize;
- Pages = Max(1, Pages);
- return 0;
- }
-
- void Push(const TVal* v) {
- // Serialized extInfo must follow a record being pushed, therefore, to avoid
- // unintentional misusage (as if when you are adding TExtInfo in your record
- // type: you may forget to check your sorting routines and get a segfault as
- // a result).
- // PushWithExtInfo(v) should be called on records with extInfo.
- static_assert(!TExtInfoType<TVal>::Exists, "expect !TExtInfoType<TVal>::Exists");
-
- Vector.push_back(v);
- }
-
- void PushWithExtInfo(const TVal* v) {
- Vector.push_back(v);
- }
-
- int SortPortion() {
- Ptr = Vector.end();
- Cur = nullptr;
- if (!Vector.size() || Error)
- return Error;
-
- MBDB_SORT_FUN(Vector.begin(), Vector.end(), TCompare());
-
- if (!TIsSieveFake<TSieve>::Result) {
- const typename TMyVector::iterator
- end = SieveRange(Vector.begin(), Vector.end());
-
- Vector.resize(end - Vector.begin());
- }
-
- Ptr = Vector.begin();
- Cur = nullptr;
- return 0;
- }
-
- const TVal* Nextp() {
- Cur = Ptr == Vector.end() ? nullptr : *Ptr++;
- return Cur;
- }
-
- const TVal* Currentp() const {
- return Cur;
- }
-
- void Closep() {
- Vector.clear();
- Ptr = Vector.end();
- Cur = nullptr;
- }
-
- int NextPortion(bool direct = false) {
- if (!Vector.size() || Error)
- return Error;
-
- TTmpOut out;
- int ret, ret1;
- char fname[FILENAME_MAX];
-
- snprintf(fname, sizeof(fname), TempBuf, Portions++);
- if ((ret = out.Open(fname, Pagesize, Pages, 1, direct)))
- return Error = ret;
-
- Sorter->Sort(Vector, &out);
-
- Vector.erase(Vector.begin(), Vector.end());
- ret = out.GetError();
- ret1 = out.Close();
- Error = Error ? Error : ret ? ret : ret1;
- if (Error)
- unlink(fname);
- return Error;
- }
-
- int SavePortions(const char* mask, bool direct = false) {
- char srcname[PATH_MAX], dstname[PATH_MAX];
- if (Vector.size())
- NextPortion(direct);
- for (int i = 0; i < Portions; i++) {
- char num[10];
- sprintf(num, "%i", i);
- snprintf(srcname, sizeof(srcname), TempBuf, i);
- snprintf(dstname, sizeof(dstname), mask, num);
- int res = rename(srcname, dstname);
- if (res)
- return res;
- }
- snprintf(dstname, sizeof(dstname), mask, "count");
- TOFStream fcount(dstname);
- Save(&fcount, Portions);
- fcount.Finish();
- return 0;
- }
-
- int RestorePortions(const char* mask) {
- char srcname[PATH_MAX], dstname[PATH_MAX];
- snprintf(srcname, sizeof(srcname), mask, "count");
- TIFStream fcount(srcname);
- Load(&fcount, Portions);
- for (int i = 0; i < Portions; i++) {
- char num[10];
- sprintf(num, "%i", i);
- snprintf(dstname, sizeof(dstname), TempBuf, i);
- snprintf(srcname, sizeof(srcname), mask, num);
- unlink(dstname);
- int res = link(srcname, dstname);
- if (res)
- return res;
- }
- return 0;
- }
-
- int RestorePortions(const char* mask, ui32 count) {
- char srcname[PATH_MAX], dstname[PATH_MAX];
- ui32 portions;
- TVector<ui32> counts;
- for (ui32 j = 0; j < count; j++) {
- snprintf(srcname, sizeof(srcname), mask, j, "count");
- TIFStream fcount(srcname);
- Load(&fcount, portions);
- counts.push_back(portions);
- Portions += portions;
- }
- ui32 p = 0;
- for (ui32 j = 0; j < count; j++) {
- int cnt = counts[j];
- for (int i = 0; i < cnt; i++, p++) {
- char num[10];
- sprintf(num, "%i", i);
- snprintf(dstname, sizeof(dstname), TempBuf, p);
- snprintf(srcname, sizeof(srcname), mask, j, num);
- unlink(dstname);
- int res = link(srcname, dstname);
- if (res) {
- fprintf(stderr, "Can not link %s to %s\n", srcname, dstname);
- return res;
- }
- }
- }
- return 0;
- }
-
- int Sort(size_t memory, int maxportions = 1000, bool direct = false) {
- int ret, end, beg, i;
- char fname[FILENAME_MAX];
-
- if (Vector.size())
- NextPortion();
-
- if (Error)
- return Error;
- if (!Portions) {
- TMyHeap::Init(&DummyFile, 1); // closed file
- HPages = 1;
- return 0;
- }
-
- Optimize(memory, maxportions);
- if (!(InFiles = new TTmpIn[MPortions]))
- return MBDB_NO_MEMORY;
-
- for (beg = 0; beg < Portions && !Error; beg = end) {
- end = (int)Min(beg + FPortions, Portions);
- for (i = beg; i < end && !Error; i++) {
- snprintf(fname, sizeof(fname), TempBuf, i);
- if ((ret = InFiles[i - beg].Open(fname, HPages, 1, nullptr, direct)))
- Error = Error ? Error : ret;
- }
- if (Error)
- return Error;
- TMyHeap::Init(InFiles, end - beg);
- if (end != Portions) {
- TTmpOut out;
- const TVal* v;
- snprintf(fname, sizeof(fname), TempBuf, Portions++);
- if ((ret = out.Open(fname, Pagesize, HPages)))
- return Error = Error ? Error : ret;
- while ((v = TMyHeap::Next()))
- out.PushWithExtInfo(v);
- ret = out.GetError();
- Error = Error ? Error : ret;
- ret = out.Close();
- Error = Error ? Error : ret;
- for (i = beg; i < end; i++) {
- ret = InFiles[i - beg].Close();
- Error = Error ? Error : ret;
- snprintf(fname, sizeof(fname), TempBuf, CPortions++);
- unlink(fname);
- }
- }
- FPortions = MPortions;
- }
- return Error;
- }
-
- int Close() {
- char fname[FILENAME_MAX];
- delete[] InFiles;
- InFiles = nullptr;
- Closep();
- for (int i = CPortions; i < Portions; i++) {
- snprintf(fname, sizeof(fname), TempBuf, i);
- unlink(fname);
- }
- CPortions = Portions = 0;
- free(TempBuf);
- TempBuf = nullptr;
- return Error;
- }
-
- void UseSegmentSorter() {
- Sorter.Reset(new TSegmentedSorter);
- }
-
- inline int GetError() const {
- return Error;
- }
-
- inline int GetPages() const {
- return Pages;
- }
-
- inline int GetPageSize() const {
- return Pagesize;
- }
-
-private:
- static TMyIterator SieveRange(const TMyIterator begin, const TMyIterator end) {
- TMyIterator it = begin;
- TMyIterator prev = begin;
-
- for (++it; it != end; ++it) {
- if (TSieve::Sieve((TVal*)*prev, *it)) {
- continue;
- }
-
- ++prev;
-
- if (it != prev) {
- *prev = *it;
- }
- }
-
- TSieve::Sieve((TVal*)*prev, *prev);
-
- return ++prev;
- }
-
-protected:
- void Optimize(size_t memory, int maxportions, size_t fbufmax = 256u << 20) {
- maxportions = (int)Min((size_t)maxportions, memory / Pagesize) - 1;
- size_t maxpages = Max((size_t)1u, fbufmax / Pagesize);
-
- if (maxportions <= 2) {
- FPortions = MPortions = 2;
- HPages = 1;
- return;
- }
- if (maxportions >= Portions) {
- FPortions = MPortions = Portions;
- HPages = (int)Min(memory / ((Portions + 1) * Pagesize), maxpages);
- return;
- }
- if (((Portions + maxportions - 1) / maxportions) <= maxportions) {
- while (((Portions + maxportions - 1) / maxportions) <= maxportions)
- --maxportions;
- MPortions = ++maxportions;
- int total = ((Portions + maxportions - 1) / maxportions) + Portions;
- FPortions = (total % maxportions) ? (total % maxportions) : MPortions;
- HPages = (int)Min(memory / ((MPortions + 1) * Pagesize), maxpages);
- return;
- }
- FPortions = MPortions = maxportions;
- HPages = (int)Min(memory / ((MPortions + 1) * Pagesize), maxpages);
- }
-
- TMyVector Vector;
- typename TMyVector::iterator Ptr;
- const TVal* Cur;
- TTmpIn *InFiles, DummyFile;
- char* TempBuf;
- int Portions, CPortions, Pagesize, Pages, Error;
- int FPortions, MPortions, HPages;
- THolder<IPortionSorter> Sorter;
-};
-
-template <class TVal, class TCompare, typename TCompress>
-class TDatSorterImpl<TVal, TCompare, TCompress, TFakeSieve<TVal>, TOutputPageFile, TDefInterFileTypes>
- : public TDatSorterImplBase<TVal, TCompare, TCompress, TFakeSieve<TVal>, TOutputPageFile, TDefInterFileTypes> {
- typedef TDatSorterImplBase<TVal, TCompare, TCompress, TFakeSieve<TVal>, TOutputPageFile, TDefInterFileTypes> TBase;
-
-public:
- int SortToFile(const char* name, size_t memory, int maxportions = 1000) {
- int ret = TBase::Sort(memory, maxportions);
- if (ret)
- return ret;
- typename TBase::TOut out;
- if ((ret = out.Open(name, TBase::Pagesize, TBase::HPages)))
- return ret;
- const TVal* rec;
- while ((rec = Next()))
- out.PushWithExtInfo(rec);
- if ((ret = out.GetError()))
- return ret;
- if ((ret = out.Close()))
- return ret;
- if ((ret = TBase::Close()))
- return ret;
- return 0;
- }
-
- int SortToStream(TAutoPtr<IOutputStream> output, size_t memory, int maxportions = 1000) {
- int ret = TBase::Sort(memory, maxportions);
- if (ret)
- return ret;
- typename TBase::TOut out;
- if ((ret = out.Open(output, TBase::Pagesize, TBase::HPages)))
- return ret;
- const TVal* rec;
- while ((rec = Next()))
- out.PushWithExtInfo(rec);
- if ((ret = out.GetError()))
- return ret;
- if ((ret = out.Close()))
- return ret;
- if ((ret = TBase::Close()))
- return ret;
- return 0;
- }
-
- const TVal* Next() {
- return TBase::TMyHeap::Next();
- }
-
- const TVal* Current() const {
- return TBase::TMyHeap::Current();
- }
-
- bool GetExtInfo(typename TExtInfoType<TVal>::TResult* extInfo) const {
- return TBase::TMyHeap::GetExtInfo(extInfo);
- }
-
- const ui8* GetExtInfoRaw(size_t* len) const {
- return TBase::TMyHeap::GetExtInfoRaw(len);
- }
-};
-
-template <class TVal, class TCompare, typename TCompress, typename TSieve,
- typename TOutPageFile = TOutputPageFile, typename TFileTypes = TDefInterFileTypes>
-class TDatSorterImpl: public TDatSorterImplBase<TVal, TCompare, TCompress, TSieve, TOutPageFile, TFileTypes> {
- typedef TDatSorterImplBase<TVal, TCompare, TCompress, TSieve, TOutPageFile, TFileTypes> TBase;
-
-public:
- TDatSorterImpl()
- : Cur(nullptr)
- , Prev(nullptr)
- {
- }
-
- int SortToFile(const char* name, size_t memory, int maxportions = 1000) {
- int ret = Sort(memory, maxportions);
- if (ret)
- return ret;
- typename TBase::TOut out;
- if ((ret = out.Open(name, TBase::Pagesize, TBase::HPages)))
- return ret;
- const TVal* rec;
- while ((rec = Next()))
- out.PushWithExtInfo(rec);
- if ((ret = out.GetError()))
- return ret;
- if ((ret = out.Close()))
- return ret;
- if ((ret = TBase::Close()))
- return ret;
- return 0;
- }
-
- int SortToStream(TAutoPtr<IOutputStream> output, size_t memory, int maxportions = 1000) {
- int ret = Sort(memory, maxportions);
- if (ret)
- return ret;
- typename TBase::TOut out;
- if ((ret = out.Open(output, TBase::Pagesize, TBase::HPages)))
- return ret;
- const TVal* rec;
- while ((rec = Next()))
- out.PushWithExtInfo(rec);
- if ((ret = out.GetError()))
- return ret;
- if ((ret = out.Close()))
- return ret;
- if ((ret = TBase::Close()))
- return ret;
- return 0;
- }
-
- int Open(const char* templ, size_t pagesize, size_t pages, int pagesOrBytes = 1) {
- int res = TBase::Open(templ, pagesize, pages, pagesOrBytes);
- Prev = nullptr;
- Cur = nullptr;
- return res;
- }
-
- int Sort(size_t memory, int maxportions = 1000, bool direct = false) {
- int res = TBase::Sort(memory, maxportions, direct);
- if (!res) {
- const TVal* rec = TBase::TMyHeap::Next();
- if (rec) {
- size_t els, es;
- size_t sz = NMicroBDB::SizeOfExt(rec, &els, &es);
- sz += els + es;
- if (!TExtInfoType<TVal>::Exists)
- Cur = (TVal*)malloc(sizeof(TVal));
- else
- Cur = (TVal*)malloc(TBase::Pagesize);
- memcpy(Cur, rec, sz);
- }
- }
- return res;
- }
-
- // Prev = last returned
- // Cur = current accumlating with TSieve
-
- const TVal* Next() {
- if (!Cur) {
- if (Prev) {
- free(Prev);
- Prev = nullptr;
- }
- return nullptr;
- }
- const TVal* rec;
-
- if (TIsSieveFake<TSieve>::Result)
- rec = TBase::TMyHeap::Next();
- else {
- do {
- rec = TBase::TMyHeap::Next();
- } while (rec && TSieve::Sieve((TVal*)Cur, rec));
- }
-
- if (!Prev) {
- if (!TExtInfoType<TVal>::Exists)
- Prev = (TVal*)malloc(sizeof(TVal));
- else
- Prev = (TVal*)malloc(TBase::Pagesize);
- }
- size_t els, es;
- size_t sz = NMicroBDB::SizeOfExt(Cur, &els, &es);
- sz += els + es;
- memcpy(Prev, Cur, sz);
-
- if (rec) {
- sz = NMicroBDB::SizeOfExt(rec, &els, &es);
- sz += els + es;
- memcpy(Cur, rec, sz);
- } else {
- TSieve::Sieve((TVal*)Cur, Cur);
- free(Cur);
- Cur = nullptr;
- }
- return Prev;
- }
-
- const TVal* Current() const {
- return Prev;
- }
-
- int Close() {
- int res = TBase::Close();
- if (Prev) {
- free(Prev);
- Prev = nullptr;
- }
- if (Cur) {
- free(Cur);
- Cur = nullptr;
- }
- return res;
- }
-
-protected:
- TVal* Cur;
- TVal* Prev;
-};
diff --git a/library/cpp/microbdb/sorterdef.h b/library/cpp/microbdb/sorterdef.h
deleted file mode 100644
index 8834b5fff80..00000000000
--- a/library/cpp/microbdb/sorterdef.h
+++ /dev/null
@@ -1,19 +0,0 @@
-#pragma once
-
-#define MAKESORTERTMPL(TRecord, MemberFunc) \
- template <typename T> \
- struct MemberFunc; \
- template <> \
- struct MemberFunc<TRecord> { \
- bool operator()(const TRecord* l, const TRecord* r) { \
- return TRecord ::MemberFunc(l, r) < 0; \
- } \
- int operator()(const TRecord* l, const TRecord* r, int) { \
- return TRecord ::MemberFunc(l, r); \
- } \
- }
-
-template <typename T>
-static inline int compare(const T& a, const T& b) {
- return (a < b) ? -1 : (a > b);
-}
diff --git a/library/cpp/microbdb/utility.h b/library/cpp/microbdb/utility.h
deleted file mode 100644
index 5c86061bca0..00000000000
--- a/library/cpp/microbdb/utility.h
+++ /dev/null
@@ -1,75 +0,0 @@
-#pragma once
-
-#include "microbdb.h"
-
-template <class TRecord, template <class T> class TCompare>
-int SortData(const TFile& ifile, const TFile& ofile, const TDatMetaPage* meta, size_t memory, const char* tmpDir = nullptr) {
- char templ[FILENAME_MAX];
- TInDatFileImpl<TRecord> datin;
- TOutDatFileImpl<TRecord> datout;
- TDatSorterImpl<TRecord, TCompare<TRecord>, TFakeCompression, TFakeSieve<TRecord>> sorter;
- const TRecord* u;
- int ret;
-
- const size_t minMemory = (2u << 20);
- memory = Max(memory, minMemory + minMemory / 2);
- if (datin.Open(ifile, meta, memory - minMemory, 0))
- err(1, "can't read input file");
-
- size_t outpages = Max((size_t)2u, minMemory / datin.GetPageSize());
- memory -= outpages * datin.GetPageSize();
-
- if (ret = MakeSorterTempl(templ, tmpDir))
- err(1, "can't create tempdir in \"%s\"; error: %d\n", templ, ret);
-
- if (sorter.Open(templ, datin.GetPageSize(), outpages)) {
- *strrchr(templ, LOCSLASH_C) = 0;
- RemoveDirWithContents(templ);
- err(1, "can't open sorter");
- }
-
- while (1) {
- datin.Freeze();
- while ((u = datin.Next()))
- sorter.PushWithExtInfo(u);
- sorter.NextPortion();
- if (datin.GetError() || datin.IsEof())
- break;
- }
-
- if (datin.GetError()) {
- *strrchr(templ, LOCSLASH_C) = 0;
- RemoveDirWithContents(templ);
- err(1, "in data file error %d", datin.GetError());
- }
- if (datin.Close()) {
- *strrchr(templ, LOCSLASH_C) = 0;
- RemoveDirWithContents(templ);
- err(1, "can't close in data file");
- }
-
- sorter.Sort(memory);
-
- if (datout.Open(ofile, datin.GetPageSize(), outpages)) {
- *strrchr(templ, LOCSLASH_C) = 0;
- RemoveDirWithContents(templ);
- err(1, "can't write out file");
- }
-
- while ((u = sorter.Next()))
- datout.PushWithExtInfo(u);
-
- if (sorter.GetError())
- err(1, "sorter error %d", sorter.GetError());
- if (sorter.Close())
- err(1, "can't close sorter");
-
- *strrchr(templ, LOCSLASH_C) = 0;
- RemoveDirWithContents(templ);
-
- if (datout.GetError())
- err(1, "out data file error %d", datout.GetError());
- if (datout.Close())
- err(1, "can't close out data file");
- return 0;
-}
diff --git a/library/cpp/microbdb/wrappers.h b/library/cpp/microbdb/wrappers.h
deleted file mode 100644
index 38eb8edebc0..00000000000
--- a/library/cpp/microbdb/wrappers.h
+++ /dev/null
@@ -1,637 +0,0 @@
-#pragma once
-
-#include "microbdb.h"
-
-#define MAKEFILTERTMPL(TRecord, MemberFunc, NS) \
- template <typename T> \
- struct MemberFunc; \
- template <> \
- struct MemberFunc<TRecord> { \
- bool operator()(const TRecord* r) { \
- return NS::MemberFunc(r); \
- } \
- }
-
-#define MAKEJOINTMPL(TRecordA, TRecordB, MemberFunc, NS, TMergeType) \
- template <typename A, typename B> \
- struct MemberFunc; \
- template <> \
- struct MemberFunc<TRecordA, TRecordB> { \
- int operator()(const TRecordA* l, const TRecordB* r) { \
- return NS::MemberFunc(l, r); \
- } \
- }; \
- typedef TMergeRec<TRecordA, TRecordB> TMergeType
-
-#define MAKEJOINTMPL2(TRecordA, TRecordB, MemberFunc, StructName, TMergeType) \
- template <typename A, typename B> \
- struct StructName; \
- template <> \
- struct StructName<TRecordA, TRecordB> { \
- int operator()(const TRecordA* l, const TRecordB* r) { \
- return MemberFunc(l, r); \
- } \
- }; \
- typedef TMergeRec<TRecordA, TRecordB> TMergeType
-
-#define MAKEJOINTMPLLEFT(TRecordA, TRecordB, MemberFunc, NS, TMergeType) \
- template <typename A, typename B> \
- struct MemberFunc; \
- template <> \
- struct MemberFunc<TRecordA, TRecordB> { \
- int operator()(const TRecordA* l, const TRecordB* r) { \
- return NS::MemberFunc(l->RecA, r); \
- } \
- }; \
- typedef TMergeRec<TRecordA, TRecordB> TMergeType
-
-template <class TRec>
-class IDatNextSource {
-public:
- virtual const TRec* Next() = 0;
- virtual void Work() {
- }
-};
-
-template <class TRec>
-class IDatNextReceiver {
-public:
- IDatNextReceiver(IDatNextSource<TRec>& source)
- : Source(source)
- {
- }
-
- virtual void Work() {
- Source.Work();
- }
-
-protected:
- IDatNextSource<TRec>& Source;
-};
-
-template <class TInRec, class TOutRec>
-class IDatNextChannel: public IDatNextReceiver<TInRec>, public IDatNextSource<TOutRec> {
-public:
- IDatNextChannel(IDatNextSource<TInRec>& source)
- : IDatNextReceiver<TInRec>(source)
- {
- }
-
- virtual void Work() {
- IDatNextReceiver<TInRec>::Work();
- }
-};
-
-class IDatWorker {
-public:
- virtual void Work() = 0;
-};
-
-template <class TRec>
-class IDatPushReceiver {
-public:
- virtual void Push(const TRec* rec) = 0;
- virtual void Work() = 0;
-};
-
-template <class TRec>
-class IDatPushSource {
-public:
- IDatPushSource(IDatPushReceiver<TRec>& receiver)
- : Receiver(receiver)
- {
- }
-
- virtual void Work() {
- Receiver.Work();
- }
-
-protected:
- IDatPushReceiver<TRec>& Receiver;
-};
-
-template <class TInRec, class TOutRec>
-class IDatPushChannel: public IDatPushReceiver<TInRec>, public IDatPushSource<TOutRec> {
-public:
- IDatPushChannel(IDatPushReceiver<TOutRec>& receiver)
- : IDatPushSource<TOutRec>(receiver)
- {
- }
-
- virtual void Work() {
- IDatPushSource<TOutRec>::Work();
- }
-};
-
-template <class TRec>
-class IDatNextToPush: public IDatNextReceiver<TRec>, public IDatPushSource<TRec> {
- typedef IDatNextReceiver<TRec> TNextReceiver;
- typedef IDatPushSource<TRec> TPushSource;
-
-public:
- IDatNextToPush(IDatNextSource<TRec>& source, IDatPushReceiver<TRec>& receiver)
- : TNextReceiver(source)
- , TPushSource(receiver)
- {
- }
-
- virtual void Work() {
- const TRec* rec;
- while (rec = TNextReceiver::Source.Next())
- TPushSource::Receiver.Push(rec);
- TPushSource::Work();
- TNextReceiver::Work();
- }
-};
-
-template <class TRec>
-class TDatNextPNSplitter: public IDatNextReceiver<TRec>, public IDatNextSource<TRec>, public IDatPushSource<TRec> {
-public:
- TDatNextPNSplitter(IDatNextSource<TRec>& source, IDatPushReceiver<TRec>& receiver)
- : IDatNextReceiver<TRec>(source)
- , IDatNextSource<TRec>()
- , IDatPushSource<TRec>(receiver)
- {
- }
-
- const TRec* Next() {
- const TRec* rec = IDatNextReceiver<TRec>::Source.Next();
- if (rec) {
- IDatPushSource<TRec>::Receiver.Push(rec);
- return rec;
- } else {
- return 0;
- }
- }
-
- virtual void Work() {
- IDatNextReceiver<TRec>::Work();
- IDatPushSource<TRec>::Work();
- }
-};
-
-template <class TRec, class TOutRecA = TRec, class TOutRecB = TRec>
-class TDatPushPPSplitter: public IDatPushReceiver<TRec>, public IDatPushSource<TOutRecA>, public IDatPushSource<TOutRecB> {
-public:
- TDatPushPPSplitter(IDatPushReceiver<TOutRecA>& receiverA, IDatPushReceiver<TOutRecB>& receiverB)
- : IDatPushSource<TOutRecA>(receiverA)
- , IDatPushSource<TOutRecB>(receiverB)
- {
- }
-
- void Push(const TRec* rec) {
- IDatPushSource<TOutRecA>::Receiver.Push(rec);
- IDatPushSource<TOutRecB>::Receiver.Push(rec);
- }
-
- void Work() {
- IDatPushSource<TOutRecA>::Work();
- IDatPushSource<TOutRecB>::Work();
- }
-};
-
-template <class TRec>
-class TFastInDatFile: public TInDatFile<TRec>, public IDatNextSource<TRec> {
-public:
- typedef TInDatFile<TRec> Base;
-
- TFastInDatFile(const char* name, bool open = true, size_t pages = dbcfg::fbufsize, int pagesOrBytes = 0)
- : TInDatFile<TRec>(name, pages, pagesOrBytes)
- , FileName(name)
- {
- if (open)
- Base::Open(name);
- }
-
- void Open() {
- Base::Open(FileName);
- }
-
- template <class TPassRec>
- bool PassToUid(const TRec* inrec, const TPassRec* torec) {
- inrec = Base::Current();
- while (inrec && CompareUids(inrec, torec) < 0)
- inrec = Base::Next();
- return (inrec && CompareUids(inrec, torec) == 0);
- }
-
- void Work() {
- Base::Close();
- }
-
- const TRec* Next() {
- return Base::Next();
- }
-
-private:
- TString FileName;
-};
-
-template <class TRec>
-class TPushOutDatFile: public TOutDatFile<TRec>, public IDatPushReceiver<TRec> {
-public:
- typedef TOutDatFile<TRec> Base;
-
- TPushOutDatFile(const char* name, bool open = true)
- : Base(name, dbcfg::pg_docuid, dbcfg::fbufsize, 0)
- , FileName(name)
- {
- if (open)
- Base::Open(name);
- }
-
- void Open() {
- Base::Open(~FileName);
- }
-
- void Push(const TRec* rec) {
- Base::Push(rec);
- }
-
- void Work() {
- Base::Close();
- }
-
-private:
- TString FileName;
-};
-
-template <class TRec>
-class TNextOutDatFile: public IDatNextToPush<TRec> {
-public:
- typedef IDatNextToPush<TRec> TBase;
-
- TNextOutDatFile(const char* name, IDatNextSource<TRec>& source, bool open = true)
- : TBase(source, File)
- , File(name, open)
- {
- }
-
- void Open() {
- File.Open();
- }
-
-private:
- TPushOutDatFile<TRec> File;
-};
-
-// Pull-style sorting stage: a TDatSorterMemo that fills itself from an upstream
-// IDatNextSource<TVal> and then hands the records back through Next().
-// The upstream is drained and sorted lazily, on the first Next() call, unless
-// Sort() has been called explicitly before that.
-template <class TVal, template <typename T> class TCompare>
-class TNextDatSorterMemo: public TDatSorterMemo<TVal, TCompare>, public IDatNextChannel<TVal, TVal> {
-    typedef TDatSorterMemo<TVal, TCompare> TImpl;
-    typedef IDatNextChannel<TVal, TVal> TChannel;
-
-public:
-    // source - upstream to read unsorted records from.
-    // dir    - passed to TImpl::Open().
-    // The remaining arguments are forwarded unchanged to the TDatSorterMemo constructor.
-    TNextDatSorterMemo(IDatNextSource<TVal>& source, const char* dir = dbcfg::fname_temp, const char* name = "yet another sorter", size_t memory = dbcfg::small_sorter_size, size_t pagesize = dbcfg::pg_docuid, size_t pages = dbcfg::fbufsize, int pagesOrBytes = 0)
-        : TImpl(name, memory, pagesize, pages, pagesOrBytes)
-        , TChannel(source)
-        , Sorted(false)
-    {
-        TImpl::Open(dir);
-    }
-
-    // Pushes every upstream record into the sorter, sorts, and marks the stage ready.
-    void Sort() {
-        for (const TVal* rec = TChannel::Source.Next(); rec != nullptr; rec = TChannel::Source.Next()) {
-            TImpl::Push(rec);
-        }
-        TImpl::Sort();
-        Sorted = true;
-    }
-
-    // Returns the result of TImpl::Next(), sorting first if that has not happened yet.
-    const TVal* Next() {
-        if (!Sorted) {
-            Sort();
-        }
-        return TImpl::Next();
-    }
-
-private:
-    bool Sorted;
-    // NOTE(review): never assigned or read anywhere in this class.
-    TString Dir;
-};
-
-// Pull-style stage that maps each upstream TInRec to a TOutRec.
-// The converted record lives in an internal buffer, so the pointer returned by
-// Next() is overwritten by the following Next() call.
-template <class TInRec, class TOutRec>
-class TDatConverter: public IDatNextChannel<TInRec, TOutRec> {
-    typedef IDatNextChannel<TInRec, TOutRec> TChannel;
-
-public:
-    TDatConverter(IDatNextSource<TInRec>& source)
-        : TChannel(source)
-    {
-    }
-
-    // Conversion hook; override to customise.
-    // NOTE(review): the default invokes outrec.operator()(inrec), i.e. TOutRec must
-    // be callable with a TInRec. This may have been meant as a construction or
-    // assignment from `inrec` -- confirm the intent before relying on the default.
-    virtual void Convert(const TInRec& inrec, TOutRec& outrec) {
-        outrec(inrec);
-    }
-
-    // Returns the converted form of the next upstream record, or null once the
-    // upstream returns null.
-    const TOutRec* Next() {
-        if (const TInRec* rec = TChannel::Source.Next()) {
-            Convert(*rec, CurrentRec);
-            return &CurrentRec;
-        }
-        return nullptr;
-    }
-
-private:
-    TOutRec CurrentRec;
-};
-
-// Result item of a two-way merge: one pointer per input stream.
-// TNextDatMerger sets a side to null when that input contributes no record to
-// the current result. The pointers are not owned by this object.
-template <class TRecA, class TRecB>
-class TMergeRec {
-public:
- // Record taken from the first input, or null.
- const TRecA* RecA;
- // Record taken from the second input, or null.
- const TRecB* RecB;
-};
-
-// Merge strategies understood by the merger classes in this header.
-enum NMergeTypes {
- // Emit a result only where both inputs have a record with an equal key.
- MT_JOIN = 0,
- // Emit records from either input; equal keys are emitted together as one pair.
- MT_ADD = 1,
- // NOTE(review): only referenced by the commented-out TPushDatMerger below;
- // TNextDatMerger::Next() has no branch for it.
- MT_OVERWRITE = 2,
- // Number of merge types (sentinel, not a strategy).
- MT_TYPENUM
-};
-
-// Pull-style two-way merge of two key-ordered inputs.
-//
-// TCompare<TRecA, TRecB>()(a, b) must return a value that is negative, zero or
-// positive when `a` orders before, equal to, or after `b`. Both inputs are
-// expected to be sorted consistently with that comparator.
-//
-// Supported merge types:
-//   MT_JOIN - yields a pair only where both inputs have an equal key.
-//   MT_ADD  - yields every record of both inputs in key order; equal keys are
-//             yielded together, otherwise the missing side of the pair is null.
-// Any other merge type (including MT_OVERWRITE) makes Next() return null.
-//
-// The returned TMergeRec is an internal buffer that is overwritten by the next
-// Next() call.
-template <class TRecA, class TRecB, template <typename TA, typename TB> class TCompare>
-class TNextDatMerger: public IDatNextReceiver<TRecA>, public IDatNextReceiver<TRecB>, public IDatNextSource<TMergeRec<TRecA, TRecB>> {
-public:
-    TNextDatMerger(IDatNextSource<TRecA>& sourceA, IDatNextSource<TRecB>& sourceB, ui8 mergeType)
-        : IDatNextReceiver<TRecA>(sourceA)
-        , IDatNextReceiver<TRecB>(sourceB)
-        , SourceARec(nullptr)
-        , SourceBRec(nullptr)
-        , MergeType(mergeType)
-        , MoveA(false)
-        , MoveB(false)
-        , NotInit(true)
-    {
-        MergeRec.RecA = nullptr;
-        MergeRec.RecB = nullptr;
-    }
-
-    // Returns the next merged pair, or null when the merge is finished.
-    const TMergeRec<TRecA, TRecB>* Next() {
-        // Refill whichever side was consumed by the previous result
-        // (both sides on the very first call).
-        if (MoveA || NotInit)
-            SourceARec = IDatNextReceiver<TRecA>::Source.Next();
-        if (MoveB || NotInit)
-            SourceBRec = IDatNextReceiver<TRecB>::Source.Next();
-        NotInit = false;
-
-        if (MergeType == MT_ADD) {
-            if (SourceARec && SourceBRec) {
-                const auto cmp = TCompare<TRecA, TRecB>()(SourceARec, SourceBRec);
-                if (cmp < 0)
-                    return Emit(SourceARec, nullptr);
-                // B orders first. The original tested "< 0" here as well, so this
-                // case matched no branch and the merge stopped prematurely.
-                if (cmp > 0)
-                    return Emit(nullptr, SourceBRec);
-                return Emit(SourceARec, SourceBRec);
-            }
-            // One input is exhausted: drain the other one.
-            if (SourceARec)
-                return Emit(SourceARec, nullptr);
-            if (SourceBRec)
-                return Emit(nullptr, SourceBRec);
-        } else if (MergeType == MT_JOIN) {
-            // Skip records on the lagging side until the keys match or one
-            // input runs out.
-            while (SourceARec && SourceBRec) {
-                const auto cmp = TCompare<TRecA, TRecB>()(SourceARec, SourceBRec);
-                if (cmp == 0)
-                    return Emit(SourceARec, SourceBRec);
-                if (cmp < 0)
-                    SourceARec = IDatNextReceiver<TRecA>::Source.Next();
-                else
-                    SourceBRec = IDatNextReceiver<TRecB>::Source.Next();
-            }
-        }
-
-        MergeRec.RecA = nullptr;
-        MergeRec.RecB = nullptr;
-        return nullptr;
-    }
-
-    // Forwards Work() to both upstream sources.
-    void Work() {
-        IDatNextReceiver<TRecA>::Source.Work();
-        IDatNextReceiver<TRecB>::Source.Work();
-    }
-
-private:
-    // Publishes (a, b) as the current result and schedules a refill of exactly
-    // the sides that contributed a record.
-    const TMergeRec<TRecA, TRecB>* Emit(const TRecA* a, const TRecB* b) {
-        MergeRec.RecA = a;
-        MergeRec.RecB = b;
-        MoveA = (a != nullptr);
-        MoveB = (b != nullptr);
-        return &MergeRec;
-    }
-
-    TMergeRec<TRecA, TRecB> MergeRec;
-    const TRecA* SourceARec;
-    const TRecB* SourceBRec;
-    ui8 MergeType;
-    bool MoveA;
-    bool MoveB;
-    bool NotInit;
-};
-
-/*template<class TRec, class TSource, template <typename T> class TCompare, class TReceiver = TPushOutDatFile<TRec> >
-class TPushDatMerger {
-public:
- TPushDatMerger(TSource& source, TReceiver& receiver, ui8 mergeType)
- : Source(source)
- , Receiver(receiver)
- , MergeType(mergeType)
- {
- }
-
- virtual void Init() {
- SourceRec = Source.Next();
- }
-
- virtual void Push(const TRec* rec) {
- while (SourceRec && TCompare<TRec>()(SourceRec, rec, 0) < 0) {
- if (MergeType == MT_OVERWRITE || MergeType == MT_ADD)
- Receiver.Push(SourceRec);
- SourceRec = Source.Next();
- }
-
- bool intersected = false;
- while (SourceRec && TCompare<TRec>()(SourceRec, rec, 0) == 0) {
- intersected = true;
- if (MergeType == MT_ADD)
- Receiver.Push(SourceRec);
- SourceRec = Source.Next();
- }
-
- if (intersected && MergeType == MT_JOIN)
- Receiver.Push(rec);
-
- if (MergeType == MT_OVERWRITE || MergeType == MT_ADD)
- Receiver.Push(rec);
- }
-
- virtual void Term() {
- if (MergeType == MT_OVERWRITE || MergeType == MT_ADD) {
- while (SourceRec) {
- Receiver.Push(SourceRec);
- SourceRec = Source.Next();
- }
- }
- }
-
-private:
- TSource& Source;
- const TRec* SourceRec;
- TReceiver& Receiver;
- ui8 MergeType;
-};*/
-
-/*template <class TRec, class TSourceA, class TSourceB, template <typename T> class TCompare, class TReceiver = TPushOutDatFile<TRec> >
-class TNextDatMerger: public TPushDatMerger<TRec, TSourceA, TCompare, TReceiver> {
- typedef TPushDatMerger<TRec, TSourceA, TCompare, TReceiver> TImpl;
-public:
- TNextDatMerger(TSourceA& sourceA, TSourceB& sourceB, TReceiver& receiver, ui8 mergeType)
- : TImpl(sourceA, receiver, mergeType)
- , SourceB(sourceB)
- {
- }
-
- virtual void Work() {
- TImpl::Init();
- while (SourceBRec = SourceB.Next()) {
- TImpl::Push(SourceBRec);
- }
- TImpl::Term();
- }
-private:
- TSourceB& SourceB;
- const TRec* SourceBRec;
-};*/
-
-/*template <class TRec, template <typename T> class TCompare, class TReceiver = TPushOutDatFile<TRec> >
-class TFilePushDatMerger: public TPushDatMerger<TRec, TFastInDatFile<TRec>, TCompare, TReceiver> {
- typedef TPushDatMerger<TRec, TFastInDatFile<TRec>, TCompare, TReceiver> TImpl;
-public:
- TFilePushDatMerger(const char* name, TReceiver& receiver, ui8 mergeType)
- : TImpl(SourceFile, receiver, mergeType)
- , SourceFile(name)
- {
- }
-
- virtual void Push(const TRec* rec) {
- TImpl::Push(rec);
- }
-
- virtual void Term() {
- TImpl::Term();
- }
-private:
- TFastInDatFile<TRec> SourceFile;
-};*/
-
-/*template <class TRec, template <typename T> class TCompare, class TReceiver = TPushOutDatFile<TRec> >
-class TFileNextDatMerger: public TNextDatMerger<TRec, TFastInDatFile<TRec>, TFastInDatFile<TRec>, TCompare, TReceiver> {
- typedef TNextDatMerger<TRec, TFastInDatFile<TRec>, TFastInDatFile<TRec>, TCompare, TReceiver> TImpl;
-public:
- TFileNextDatMerger(const char* sourceAname, const char* sourceBname, TReceiver& receiver, ui8 mergeType)
- : TImpl(FileA, FileB, receiver, mergeType)
- , FileA(sourceAname)
- , FileB(sourceBname)
- {
- }
-
- virtual void Work() {
- TImpl::Work();
- }
-private:
- TFastInDatFile<TRec> FileA;
- TFastInDatFile<TRec> FileB;
-};*/
-
-// Pull-style filter: passes through only the upstream records accepted by Check().
-// By default Check() applies a default-constructed TPredicate<TRec> to the record
-// pointer; subclasses may override it.
-template <class TRec, template <typename T> class TPredicate>
-class TDatNextFilter: public IDatNextChannel<TRec, TRec> {
-    typedef IDatNextChannel<TRec, TRec> TChannel;
-
-public:
-    TDatNextFilter(IDatNextSource<TRec>& source)
-        : TChannel(source)
-    {
-    }
-
-    // Returns the next accepted record, or null once the upstream returns null.
-    virtual const TRec* Next() {
-        for (;;) {
-            const TRec* candidate = TChannel::Source.Next();
-            if (candidate == 0 || Check(candidate))
-                return candidate;
-        }
-    }
-
-protected:
-    // True when `rec` should be passed downstream.
-    virtual bool Check(const TRec* rec) {
-        return TPredicate<TRec>()(rec);
-    }
-};
-
-// Push-style filter: forwards a record to the downstream receiver only when
-// Check() accepts it. By default Check() applies a default-constructed
-// TPredicate<TRec> to the record pointer.
-template <class TRec, template <typename T> class TPredicate>
-class TDatPushFilter: public IDatPushChannel<TRec, TRec> {
-    typedef IDatPushChannel<TRec, TRec> TChannel;
-
-public:
-    TDatPushFilter(IDatPushReceiver<TRec>& receiver)
-        : TChannel(receiver)
-    {
-    }
-
-    // Drops rejected records silently; accepted ones go to the receiver as is.
-    virtual void Push(const TRec* rec) {
-        if (!Check(rec))
-            return;
-        TChannel::Receiver.Push(rec);
-    }
-
-private:
-    // True when `rec` should be forwarded. Private but virtual, so derived
-    // classes can still override it.
-    virtual bool Check(const TRec* rec) {
-        return TPredicate<TRec>()(rec);
-    }
-};
-
-// Pull-style grouping stage. Consecutive upstream records for which
-// TCompare<TInRec>()(current, previous, 0) == 0 form one group. Subclasses
-// implement the hooks below and publish a result by filling OutRec and setting
-// HasOutput = true; Next() then returns &OutRec and clears the flag.
-//
-// Hook order per upstream record:
-//   first record overall       : OnStart(), OnRecord()
-//   record starting a new group: OnFinish(), OnStart(), OnRecord()
-//   any other record           : OnRecord()
-// After the upstream is exhausted OnFinish() is called exactly once.
-//
-// NOTE(review): OnFinish() is also called when the upstream was empty, i.e.
-// without a preceding OnStart(); subclasses have to tolerate that.
-// NOTE(review): LastRec keeps the pointer returned by the previous Source.Next()
-// call; this assumes the source keeps that record valid across one more
-// Next() -- confirm for the sources used.
-template <class TInRec, class TOutRec, template <typename T> class TCompare>
-class TDatGrouper: public IDatNextChannel<TInRec, TOutRec> {
-    typedef IDatNextChannel<TInRec, TOutRec> TChannel;
-
-public:
-    TDatGrouper(IDatNextSource<TInRec>& source)
-        : TChannel(source)
-        , CurrentRec(nullptr) // hooks never see indeterminate pointers
-        , LastRec(nullptr)
-        , Begin(true)
-        , Finish(false)
-        , HasOutput(false)
-    {
-    }
-
-    // Returns the next published result, or null when the upstream is exhausted
-    // and no result is pending.
-    const TOutRec* Next() {
-        while ((CurrentRec = TChannel::Source.Next()) != nullptr) {
-            if (Begin) {
-                Begin = false;
-                OnStart();
-            } else if (TCompare<TInRec>()(CurrentRec, LastRec, 0) != 0) {
-                // Key changed: close the previous group, open a new one.
-                OnFinish();
-                OnStart();
-            }
-            OnRecord();
-            LastRec = CurrentRec;
-            if (const TOutRec* ready = TakeOutput())
-                return ready;
-        }
-        if (!Finish) {
-            // Close the last group; guarded so repeated calls do not re-run it.
-            OnFinish();
-            Finish = true;
-        }
-        return TakeOutput();
-    }
-
-protected:
-    // Called when a new group begins; CurrentRec is its first record.
-    virtual void OnStart() = 0;
-    // Called for every record, including the first one of a group.
-    virtual void OnRecord() = 0;
-    // Called when a group ends.
-    virtual void OnFinish() = 0;
-
-    const TInRec* CurrentRec;
-    const TInRec* LastRec;
-    TOutRec OutRec;
-
-    bool Begin;
-    bool Finish;
-    bool HasOutput;
-
-private:
-    // Returns &OutRec and clears HasOutput when a result is pending, else null.
-    const TOutRec* TakeOutput() {
-        if (!HasOutput)
-            return nullptr;
-        HasOutput = false;
-        return &OutRec;
-    }
-};
diff --git a/library/cpp/microbdb/ya.make b/library/cpp/microbdb/ya.make
deleted file mode 100644
index 3e553f8535b..00000000000
--- a/library/cpp/microbdb/ya.make
+++ /dev/null
@@ -1,36 +0,0 @@
-LIBRARY()
-
-SRCS(
- align.h
- compressed.h
- extinfo.h
- file.cpp
- hashes.h
- header.h
- header.cpp
- heap.h
- input.h
- microbdb.cpp
- noextinfo.proto
- output.h
- powersorter.h
- reader.h
- safeopen.h
- sorter.h
- sorterdef.h
- utility.h
- wrappers.h
-)
-
-PEERDIR(
- contrib/libs/fastlz
- contrib/libs/libc_compat
- contrib/libs/protobuf
- contrib/libs/snappy
- contrib/libs/zlib
- library/cpp/deprecated/fgood
- library/cpp/on_disk/st_hash
- library/cpp/packedtypes
-)
-
-END()