diff options
author | vvvv <vvvv@ydb.tech> | 2023-07-31 20:07:26 +0300 |
---|---|---|
committer | vvvv <vvvv@ydb.tech> | 2023-07-31 20:07:26 +0300 |
commit | f9e4743508b7930e884714cc99985ac45f84ed98 (patch) | |
tree | a1290261a4915a6f607e110e2cc27aee4c205f85 /library/cpp/microbdb/compressed.h | |
parent | 5cf9beeab3ea847da0b6c414fcb5faa9cb041317 (diff) | |
download | ydb-f9e4743508b7930e884714cc99985ac45f84ed98.tar.gz |
Use UDFs from YDB
Diffstat (limited to 'library/cpp/microbdb/compressed.h')
-rw-r--r-- | library/cpp/microbdb/compressed.h | 520 |
1 files changed, 0 insertions, 520 deletions
diff --git a/library/cpp/microbdb/compressed.h b/library/cpp/microbdb/compressed.h deleted file mode 100644 index f0c9edfa925..00000000000 --- a/library/cpp/microbdb/compressed.h +++ /dev/null @@ -1,520 +0,0 @@ -#pragma once - -#include <util/stream/zlib.h> - -#include "microbdb.h" -#include "safeopen.h" - -class TCompressedInputFileManip: public TInputFileManip { -public: - inline i64 GetLength() const { - return -1; // Some microbdb logic rely on unknown size of compressed files - } - - inline i64 Seek(i64 offset, int whence) { - i64 oldPos = DoGetPosition(); - i64 newPos = offset; - switch (whence) { - case SEEK_CUR: - newPos += oldPos; - [[fallthrough]]; // Complier happy. Please fix it! - case SEEK_SET: - break; - default: - return -1L; - } - if (oldPos > newPos) { - VerifyRandomAccess(); - DoSeek(0, SEEK_SET, IsStreamOpen()); - oldPos = 0; - } - const size_t bufsize = 1 << 12; - char buf[bufsize]; - for (i64 i = oldPos; i < newPos; i += bufsize) - InputStream->Read(buf, (i + (i64)bufsize < newPos) ? bufsize : (size_t)(newPos - i)); - return newPos; - } - - i64 RealSeek(i64 offset, int whence) { - InputStream.Destroy(); - i64 ret = DoSeek(offset, whence, !!CompressedInput); - if (ret != -1) - DoStreamOpen(DoCreateStream(), true); - return ret; - } - -protected: - IInputStream* CreateStream(const TFile& file) override { - CompressedInput.Reset(new TUnbufferedFileInput(file)); - return DoCreateStream(); - } - inline IInputStream* DoCreateStream() { - return new TZLibDecompress(CompressedInput.Get(), ZLib::GZip); - //return new TLzqDecompress(CompressedInput.Get()); - } - THolder<IInputStream> CompressedInput; -}; - -class TCompressedBufferedInputFileManip: public TCompressedInputFileManip { -protected: - IInputStream* CreateStream(const TFile& file) override { - CompressedInput.Reset(new TFileInput(file, 0x100000)); - return DoCreateStream(); - } -}; - -using TCompressedInputPageFile = TInputPageFileImpl<TCompressedInputFileManip>; -using TCompressedBufferedInputPageFile = TInputPageFileImpl<TCompressedBufferedInputFileManip>; - -template <class TVal> -struct TGzKey { - ui64 Offset; - TVal Key; - - static const ui32 RecordSig = TVal::RecordSig + 0x50495a47; - - TGzKey() { - } - - TGzKey(ui64 offset, const TVal& key) - : Offset(offset) - , Key(key) - { - } - - size_t SizeOf() const { - if (this) - return sizeof(Offset) + ::SizeOf(&Key); - else { - size_t sizeOfKey = ::SizeOf((TVal*)NULL); - return sizeOfKey ? (sizeof(Offset) + sizeOfKey) : 0; - } - } -}; - -template <class TVal> -class TInZIndexFile: protected TInDatFileImpl<TGzKey<TVal>> { - typedef TInDatFileImpl<TGzKey<TVal>> TDatFile; - typedef TGzKey<TVal> TGzVal; - typedef typename TDatFile::TRecIter TRecIter; - typedef typename TRecIter::TPageIter TPageIter; - -public: - TInZIndexFile() - : Index0(nullptr) - { - } - - int Open(const char* fname, size_t pages = 1, int pagesOrBytes = 1, ui32* gotRecordSig = nullptr) { - int ret = TDatFile::Open(fname, pages, pagesOrBytes, gotRecordSig); - if (ret) - return ret; - if (!(Index0 = (TDatPage*)malloc(TPageIter::GetPageSize()))) { - TDatFile::Close(); - return MBDB_NO_MEMORY; - } - if (SizeOf((TGzVal*)NULL)) - RecsOnPage = (TPageIter::GetPageSize() - sizeof(TDatPage)) / DatCeil(SizeOf((TGzVal*)NULL)); - TDatFile::Next(); - memcpy(Index0, TPageIter::Current(), TPageIter::GetPageSize()); - return 0; - } - - int Close() { - free(Index0); - Index0 = NULL; - return TDatFile::Close(); - } - - inline int GetError() const { - return TDatFile::GetError(); - } - - int FindKey(const TVal* akey, const typename TExtInfoType<TVal>::TResult* = NULL) { - assert(IsOpen()); - if (!SizeOf((TVal*)NULL)) - return FindVszKey(akey); - int pageno; - i64 offset; - FindKeyOnPage(pageno, offset, Index0, akey); - TDatPage* page = TPageIter::GotoPage(pageno + 1); - int num_add = (int)offset; - FindKeyOnPage(pageno, offset, page, akey); - return pageno + num_add; - } - - using TDatFile::IsOpen; - - int FindVszKey(const TVal* akey, const typename TExtInfoType<TVal>::TResult* = NULL) { - int pageno; - i64 offset; - FindVszKeyOnPage(pageno, offset, Index0, akey); - TDatPage* page = TPageIter::GotoPage(pageno + 1); - int num_add = (int)offset; - FindVszKeyOnPage(pageno, offset, page, akey); - return pageno + num_add; - } - - i64 FindPage(int pageno) { - if (!SizeOf((TVal*)NULL)) - return FindVszPage(pageno); - int recsize = DatCeil(SizeOf((TGzVal*)NULL)); - TDatPage* page = TPageIter::GotoPage(1 + pageno / RecsOnPage); - if (!page) // can happen if pageno is beyond EOF - return -1; - unsigned int localpageno = pageno % RecsOnPage; - if (localpageno >= page->RecNum) // can happen if pageno is beyond EOF - return -1; - TGzVal* v = (TGzVal*)((char*)page + sizeof(TDatPage) + localpageno * recsize); - return v->Offset; - } - - i64 FindVszPage(int pageno) { - TGzVal* cur = (TGzVal*)((char*)Index0 + sizeof(TDatPage)); - TGzVal* prev = cur; - unsigned int n = 0; - while (n < Index0->RecNum && cur->Offset <= (unsigned int)pageno) { - prev = cur; - cur = (TGzVal*)((char*)cur + DatCeil(SizeOf(cur))); - n++; - } - TDatPage* page = TPageIter::GotoPage(n); - unsigned int num_add = (unsigned int)(prev->Offset); - n = 0; - cur = (TGzVal*)((char*)page + sizeof(TDatPage)); - while (n < page->RecNum && n + num_add < (unsigned int)pageno) { - cur = (TGzVal*)((char*)cur + DatCeil(SizeOf(cur))); - n++; - } - if (n == page->RecNum) // can happen if pageno is beyond EOF - return -1; - return cur->Offset; - } - -protected: - void FindKeyOnPage(int& pageno, i64& offset, TDatPage* page, const TVal* Key) { - int left = 0; - int right = page->RecNum - 1; - int recsize = DatCeil(SizeOf((TGzVal*)NULL)); - while (left < right) { - int middle = (left + right) >> 1; - if (((TGzVal*)((char*)page + sizeof(TDatPage) + middle * recsize))->Key < *Key) - left = middle + 1; - else - right = middle; - } - //borders check (left and right) - pageno = (left == 0 || ((TGzVal*)((char*)page + sizeof(TDatPage) + left * recsize))->Key < *Key) ? left : left - 1; - offset = ((TGzVal*)((char*)page + sizeof(TDatPage) + pageno * recsize))->Offset; - } - - void FindVszKeyOnPage(int& pageno, i64& offset, TDatPage* page, const TVal* key) { - TGzVal* cur = (TGzVal*)((char*)page + sizeof(TDatPage)); - ui32 RecordSig = page->RecNum; - i64 tmpoffset = cur->Offset; - for (; RecordSig > 0 && cur->Key < *key; --RecordSig) { - tmpoffset = cur->Offset; - cur = (TGzVal*)((char*)cur + DatCeil(SizeOf(cur))); - } - int idx = page->RecNum - RecordSig - 1; - pageno = (idx >= 0) ? idx : 0; - offset = tmpoffset; - } - - TDatPage* Index0; - int RecsOnPage; -}; - -template <class TKey> -class TCompressedIndexedInputPageFile: public TCompressedInputPageFile { -public: - int GotoPage(int pageno); - -protected: - TInZIndexFile<TKey> KeyFile; -}; - -template <class TVal, class TKey> -class TDirectCompressedInDatFile: public TDirectInDatFile<TVal, TKey, - TInDatFileImpl<TVal, TInputRecordIterator<TVal, - TInputPageIterator<TCompressedIndexedInputPageFile<TKey>>>>> { -}; - -class TCompressedOutputFileManip: public TOutputFileManip { -public: - inline i64 GetLength() const { - return -1; // Some microbdb logic rely on unknown size of compressed files - } - - inline i64 Seek(i64 offset, int whence) { - i64 oldPos = DoGetPosition(); - i64 newPos = offset; - switch (whence) { - case SEEK_CUR: - newPos += oldPos; - [[fallthrough]]; // Compler happy. Please fix it! - case SEEK_SET: - break; - default: - return -1L; - } - if (oldPos > newPos) - return -1L; - - const size_t bufsize = 1 << 12; - char buf[bufsize] = {0}; - for (i64 i = oldPos; i < newPos; i += bufsize) - OutputStream->Write(buf, (i + (i64)bufsize < newPos) ? bufsize : (size_t)(newPos - i)); - return newPos; - } - - i64 RealSeek(i64 offset, int whence) { - OutputStream.Destroy(); - i64 ret = DoSeek(offset, whence, !!CompressedOutput); - if (ret != -1) - DoStreamOpen(DoCreateStream(), true); - return ret; - } - -protected: - IOutputStream* CreateStream(const TFile& file) override { - CompressedOutput.Reset(new TUnbufferedFileOutput(file)); - return DoCreateStream(); - } - inline IOutputStream* DoCreateStream() { - return new TZLibCompress(CompressedOutput.Get(), ZLib::GZip, 1); - } - THolder<IOutputStream> CompressedOutput; -}; - -class TCompressedBufferedOutputFileManip: public TCompressedOutputFileManip { -protected: - IOutputStream* CreateStream(const TFile& file) override { - CompressedOutput.Reset(new TUnbufferedFileOutput(file)); - return DoCreateStream(); - } - inline IOutputStream* DoCreateStream() { - return new TZLibCompress(CompressedOutput.Get(), ZLib::GZip, 1, 0x100000); - } -}; - -using TCompressedOutputPageFile = TOutputPageFileImpl<TCompressedOutputFileManip>; -using TCompressedBufferedOutputPageFile = TOutputPageFileImpl<TCompressedBufferedOutputFileManip>; - -template <class TVal> -class TOutZIndexFile: public TOutDatFileImpl< - TGzKey<TVal>, - TOutputRecordIterator<TGzKey<TVal>, TOutputPageIterator<TOutputPageFile>, TCallbackIndexer>> { - typedef TOutDatFileImpl< - TGzKey<TVal>, - TOutputRecordIterator<TGzKey<TVal>, TOutputPageIterator<TOutputPageFile>, TCallbackIndexer>> - TDatFile; - typedef TOutZIndexFile<TVal> TMyType; - typedef TGzKey<TVal> TGzVal; - typedef typename TDatFile::TRecIter TRecIter; - typedef typename TRecIter::TPageIter TPageIter; - typedef typename TRecIter::TIndexer TIndexer; - -public: - TOutZIndexFile() { - TotalRecNum = 0; - TIndexer::SetCallback(this, DispatchCallback); - } - - int Open(const char* fname, size_t pagesize, size_t pages, int pagesOrBytes = 1) { - int ret = TDatFile::Open(fname, pagesize, pages, pagesOrBytes); - if (ret) - return ret; - if ((ret = TRecIter::GotoPage(1))) - TDatFile::Close(); - return ret; - } - - int Close() { - TPageIter::Unfreeze(); - if (TRecIter::RecNum) - NextPage(TPageIter::Current()); - int ret = 0; - if (Index0.size() && !(ret = TRecIter::GotoPage(0))) { - typename std::vector<TGzVal>::iterator it, end = Index0.end(); - for (it = Index0.begin(); it != end; ++it) - TRecIter::Push(&*it); - ret = (TPageIter::GetPageNum() != 0) ? MBDB_PAGE_OVERFLOW : TPageIter::GetError(); - } - Index0.clear(); - int ret1 = TDatFile::Close(); - return ret ? ret : ret1; - } - -protected: - int TotalRecNum; // should be enough because we have GotoPage(int) - std::vector<TGzVal> Index0; - - void NextPage(const TDatPage* page) { - TGzVal* rec = (TGzVal*)((char*)page + sizeof(TDatPage)); - Index0.push_back(TGzVal(TotalRecNum, rec->Key)); - TotalRecNum += TRecIter::RecNum; - } - - static void DispatchCallback(void* This, const TDatPage* page) { - ((TMyType*)This)->NextPage(page); - } -}; - -template <class TVal, class TKey, class TPageFile = TCompressedOutputPageFile> -class TOutDirectCompressedFileImpl: public TOutDatFileImpl< - TVal, - TOutputRecordIterator<TVal, TOutputPageIterator<TPageFile>, TCallbackIndexer>> { - typedef TOutDatFileImpl< - TVal, - TOutputRecordIterator<TVal, TOutputPageIterator<TPageFile>, TCallbackIndexer>> - TDatFile; - typedef TOutDirectCompressedFileImpl<TVal, TKey, TPageFile> TMyType; - typedef typename TDatFile::TRecIter TRecIter; - typedef typename TRecIter::TPageIter TPageIter; - typedef typename TRecIter::TIndexer TIndexer; - typedef TGzKey<TKey> TMyKey; - typedef TOutZIndexFile<TKey> TKeyFile; - -protected: - using TDatFile::Tell; - -public: - TOutDirectCompressedFileImpl() { - TIndexer::SetCallback(this, DispatchCallback); - } - - int Open(const char* fname, size_t pagesize, size_t ipagesize = 0) { - char iname[FILENAME_MAX]; - int ret; - if (ipagesize == 0) - ipagesize = pagesize; - - ret = TDatFile::Open(fname, pagesize, 1, 1); - ret = ret ? ret : DatNameToIdx(iname, fname); - ret = ret ? ret : KeyFile.Open(iname, ipagesize, 1, 1); - if (ret) - TDatFile::Close(); - return ret; - } - - int Close() { - if (TRecIter::RecNum) - NextPage(TPageIter::Current()); - int ret = KeyFile.Close(); - int ret1 = TDatFile::Close(); - return ret1 ? ret1 : ret; - } - - int GetError() const { - return TDatFile::GetError() ? TDatFile::GetError() : KeyFile.GetError(); - } - -protected: - TKeyFile KeyFile; - - void NextPage(const TDatPage* page) { - size_t sz = SizeOf((TMyKey*)NULL); - TMyKey* rec = KeyFile.Reserve(sz ? sz : MaxSizeOf<TMyKey>()); - if (rec) { - rec->Offset = Tell(); - rec->Key = *(TVal*)((char*)page + sizeof(TDatPage)); - KeyFile.ResetDat(); - } - } - - static void DispatchCallback(void* This, const TDatPage* page) { - ((TMyType*)This)->NextPage(page); - } -}; - -template <class TKey> -int TCompressedIndexedInputPageFile<TKey>::GotoPage(int pageno) { - if (Error) - return Error; - - Eof = 0; - - i64 offset = KeyFile.FindPage(pageno); - if (!offset) - return Error = MBDB_BAD_FILE_SIZE; - - if (offset != FileManip.RealSeek(offset, SEEK_SET)) - Error = MBDB_BAD_FILE_SIZE; - - return Error; -} - -template <typename TVal> -class TCompressedInDatFile: public TInDatFile<TVal, TCompressedInputPageFile> { -public: - TCompressedInDatFile(const char* name, size_t pages, int pagesOrBytes = 1) - : TInDatFile<TVal, TCompressedInputPageFile>(name, pages, pagesOrBytes) - { - } -}; - -template <typename TVal> -class TCompressedOutDatFile: public TOutDatFile<TVal, TFakeCompression, TCompressedOutputPageFile> { -public: - TCompressedOutDatFile(const char* name, size_t pagesize, size_t pages, int pagesOrBytes = 1) - : TOutDatFile<TVal, TFakeCompression, TCompressedOutputPageFile>(name, pagesize, pages, pagesOrBytes) - { - } -}; - -template <typename TVal, typename TKey, typename TPageFile = TCompressedOutputPageFile> -class TOutDirectCompressedFile: protected TOutDirectCompressedFileImpl<TVal, TKey, TPageFile> { - typedef TOutDirectCompressedFileImpl<TVal, TKey, TPageFile> TBase; - -public: - TOutDirectCompressedFile(const char* name, size_t pagesize, size_t ipagesize = 0) - : Name(strdup(name)) - , PageSize(pagesize) - , IdxPageSize(ipagesize) - { - } - - ~TOutDirectCompressedFile() { - Close(); - free(Name); - Name = NULL; - } - - void Open(const char* fname) { - int ret = TBase::Open(fname, PageSize, IdxPageSize); - if (ret) - ythrow yexception() << ErrorMessage(ret, "Failed to open output file", fname); - free(Name); - Name = strdup(fname); - } - - void Close() { - int ret; - if ((ret = TBase::GetError())) - if (!std::uncaught_exception()) - ythrow yexception() << ErrorMessage(ret, "Error before closing output file", Name); - if ((ret = TBase::Close())) - if (!std::uncaught_exception()) - ythrow yexception() << ErrorMessage(ret, "Error while closing output file", Name); - } - - const char* GetName() const { - return Name; - } - - using TBase::Freeze; - using TBase::Push; - using TBase::Reserve; - using TBase::Unfreeze; - -protected: - char* Name; - size_t PageSize, IdxPageSize; -}; - -class TCompressedInterFileTypes { -public: - typedef TCompressedBufferedOutputPageFile TOutPageFile; - typedef TCompressedBufferedInputPageFile TInPageFile; -}; |