diff options
Diffstat (limited to 'library/cpp/streams/factory/open_by_signature')
5 files changed, 244 insertions, 0 deletions
diff --git a/library/cpp/streams/factory/open_by_signature/factory.cpp b/library/cpp/streams/factory/open_by_signature/factory.cpp new file mode 100644 index 0000000000..2c96015f42 --- /dev/null +++ b/library/cpp/streams/factory/open_by_signature/factory.cpp @@ -0,0 +1,99 @@ +#include "factory.h" + +#include <library/cpp/streams/bzip2/bzip2.h> +#include <library/cpp/streams/factory/open_common/factory.h> +#include <util/stream/holder.h> +#include <util/stream/file.h> +#include <library/cpp/streams/lz/lz.h> +#include <util/stream/str.h> +#include <util/stream/zlib.h> +#include <util/stream/multi.h> +#include <util/generic/ptr.h> +#include <util/generic/string.h> + +namespace { + template <class T> + struct TInputHolderX: public T { + inline decltype(T().Get()) Set(T t) noexcept { + t.Swap(*this); + + return this->Get(); + } + }; + + template <class T> + struct TInputHolderX<T*> { + static inline T* Set(T* t) noexcept { + return t; + } + }; + + template <class TInput> + struct TStringMultiInput: private TInputHolderX<TInput>, private TString, private THolder<IInputStream>, public TMultiInput { + TStringMultiInput(const TString& head, TInput tail) + : TString(head) + , THolder<IInputStream>(new TStringInput(*this)) + , TMultiInput(THolder<IInputStream>::Get(), this->Set(tail)) + { + } + + ~TStringMultiInput() override { + } + }; +} + +template <class TInput> +THolder<IInputStream> OpenMaybeCompressedInputX(TInput input) { + const size_t MAX_SIGNATURE_SIZE = 4; + char buffer[MAX_SIGNATURE_SIZE]; + TString header(buffer, input->Load(buffer, MAX_SIGNATURE_SIZE)); + + if (header.size() == MAX_SIGNATURE_SIZE) { + // any lz + THolder<IInputStream> lz = TryOpenOwnedLzDecompressor(new TStringMultiInput<TInput>(header, input)); + + if (lz.Get()) { + return lz; + } + } + + THolder<IInputStream> multi(new TStringMultiInput<TInput>(header, input)); + + // gzip + const TStringBuf GZIP = "\x1F\x8B"; + const TStringBuf ZLIB = "\x78\x9C"; + + if (header.StartsWith(GZIP) || header.StartsWith(ZLIB)) { + return MakeHolder<THoldingStream<TBufferedZLibDecompress>>(std::move(multi)); + } + + // bzip2 + constexpr TStringBuf BZIP2 = "BZ"; + if (header.StartsWith(BZIP2)) { + return MakeHolder<THoldingStream<TBZipDecompress>>(std::move(multi)); + } + + return multi; +} + +THolder<IInputStream> OpenMaybeCompressedInput(IInputStream* input) { + return OpenMaybeCompressedInputX(input); +} + +THolder<IInputStream> OpenOwnedMaybeCompressedInput(THolder<IInputStream> input) { + return OpenMaybeCompressedInputX(TAtomicSharedPtr<IInputStream>(input)); +} + +THolder<IInputStream> OpenMaybeCompressedInput(const TString& path) { + if (!path || path == TStringBuf("-")) { + return OpenOwnedMaybeCompressedInput(OpenStdin()); + } + return OpenOwnedMaybeCompressedInput(MakeHolder<TFileInput>(path)); +} + +THolder<IInputStream> OpenMaybeCompressedInput(const TString& path, ui32 bufSize) { + if (!path || path == TStringBuf("-")) { + return OpenOwnedMaybeCompressedInput(OpenStdin(bufSize)); + } + return OpenOwnedMaybeCompressedInput(MakeHolder<TFileInput>(path, bufSize)); +} diff --git a/library/cpp/streams/factory/open_by_signature/factory.h b/library/cpp/streams/factory/open_by_signature/factory.h new file mode 100644 index 0000000000..7f5288bda2 --- /dev/null +++ b/library/cpp/streams/factory/open_by_signature/factory.h @@ -0,0 +1,37 @@ +#pragma once + +#include <util/generic/fwd.h> +#include <util/generic/ptr.h> +#include <util/stream/fwd.h> + +/** + * Peeks into the provided input stream to determine its compression format, + * if any, and returns a corresponding decompressing stream. If the stream is + * not compressed, then returns a simple pass-through proxy stream. + * + * Note that returned stream doesn't own the provided input stream, thus it's + * up to the user to free them both. + * + * @param input Input stream. + * @returns Newly constructed stream. + */ +THolder<IInputStream> OpenMaybeCompressedInput(IInputStream* input); + +/** + * Same as `OpenMaybeCompressedInput`, but returned stream owns the one passed + * into this function. + * + * @param input Input stream. + * @returns Newly constructed stream. + * @see OpenMaybeCompressedInput(IInputStream*) + */ +THolder<IInputStream> OpenOwnedMaybeCompressedInput(THolder<IInputStream> input); + +/** + * @param input Input stream. + * @returns Newly constructed stream. + * @see OpenMaybeCompressedInput(IInputStream*) + */ +THolder<IInputStream> OpenMaybeCompressedInput(const TString& path); + +THolder<IInputStream> OpenMaybeCompressedInput(const TString& path, ui32 bufSize); diff --git a/library/cpp/streams/factory/open_by_signature/factory_ut.cpp b/library/cpp/streams/factory/open_by_signature/factory_ut.cpp new file mode 100644 index 0000000000..7c0d1b436a --- /dev/null +++ b/library/cpp/streams/factory/open_by_signature/factory_ut.cpp @@ -0,0 +1,84 @@ +#include "factory.h" + +#include <library/cpp/streams/lz/lz.h> + +#include <library/cpp/testing/unittest/registar.h> + +#include <util/generic/buffer.h> +#include <util/generic/string.h> +#include <util/generic/vector.h> +#include <util/stream/buffer.h> +#include <util/stream/file.h> +#include <util/stream/mem.h> +#include <util/stream/zlib.h> +#include <util/system/env.h> + +static const TString plain = "aaaaaaaaaaabbbbbbbbbbbdddddd22222222000000aldkfa9s3jsfkjlkja909090909090q3lkjalkjf3aldjl"; + +static const ui8 gz[] = {31, 139, 8, 8, 126, 193, 203, 80, 0, 3, 97, 46, 116, 120, 116, 0, 75, 76, 132, 131, 36, 4, 72, 1, 3, 35, 40, 48, 0, 131, 196, 156, 148, 236, 180, 68, 203, 98, 227, 172, 226, 180, 236, 172, 156, 236, 172, 68, 75, 3, 4, 44, 52, 6, 137, 0, 113, 154, 49, 80, 97, 86, 14, 0, 5, 203, 67, 131, 88, 0, 0, 0}; +static const auto gzLength = Y_ARRAY_SIZE(gz); + +static const ui8 bz2[] = {66, 90, 104, 57, 49, 65, 89, 38, 83, 89, 140, 92, 215, 106, 0, 0, 17, 73, 128, 20, 128, 88, 32, 53, 28, 40, 0, 32, 0, 84, 66, 52, 211, 0, 6, 72, 122, 140, 131, 36, 97, 60, 92, 230, 1, 71, 91, 170, 135, 33, 135, 149, 133, 75, 174, 153, 146, 217, 24, 174, 177, 76, 246, 69, 254, 225, 195, 236, 95, 180, 93, 201, 20, 225, 66, 66, 49, 115, 93, 168}; +static const auto bz2Length = Y_ARRAY_SIZE(bz2); + +Y_UNIT_TEST_SUITE(TRecognizeCompressorTest) { + static void TestRawData(const void* data, size_t len, const TString& orig) { + TMemoryInput mem(data, len); + + THolder<IInputStream> input = OpenMaybeCompressedInput(&mem); + UNIT_ASSERT_VALUES_UNEQUAL(input.Get(), nullptr); + UNIT_ASSERT_VALUES_EQUAL(input->ReadAll(), orig); + } + + static void TestRawDataOwned(const void* data, size_t len, const TString& orig) { + THolder<IInputStream> input = OpenOwnedMaybeCompressedInput(MakeHolder<TMemoryInput>(data, len)); + UNIT_ASSERT_VALUES_UNEQUAL(input.Get(), nullptr); + UNIT_ASSERT_VALUES_EQUAL(input->ReadAll(), orig); + } + + static inline void TestSame(const TString& text) { + TestRawData(text.data(), text.size(), text); + TestRawDataOwned(text.data(), text.size(), text); + } + + Y_UNIT_TEST(TestPlain) { + TestSame(plain); + TestSame(""); + TestSame("a"); + TestSame("ab"); + TestSame("abc"); + TestSame("abcd"); + } + + Y_UNIT_TEST(TestGzip) { + TestRawData(gz, gzLength, plain); + TestRawDataOwned(gz, gzLength, plain); + } + + Y_UNIT_TEST(TestBzip2) { + TestRawData(bz2, bz2Length, plain); + TestRawDataOwned(bz2, bz2Length, plain); + } + + template <typename TCompress> + static void TestCompress() { + TBufferStream buf; + { + TCompress z(&buf); + z.Write(plain.data(), plain.size()); + } + TestRawData(buf.Buffer().Data(), buf.Buffer().Size(), plain); + } + + Y_UNIT_TEST(TestLz) { + TestCompress<TLz4Compress>(); + TestCompress<TSnappyCompress>(); + TestCompress<TLzoCompress>(); + TestCompress<TLzqCompress>(); + TestCompress<TLzfCompress>(); + } + + Y_UNIT_TEST(TestZlib) { + TestCompress<TZLibCompress>(); + } +} diff --git a/library/cpp/streams/factory/open_by_signature/ut/ya.make b/library/cpp/streams/factory/open_by_signature/ut/ya.make new file mode 100644 index 0000000000..53136880ca --- /dev/null +++ b/library/cpp/streams/factory/open_by_signature/ut/ya.make @@ -0,0 +1,7 @@ +UNITTEST_FOR(library/cpp/streams/factory/open_by_signature) + +SRCS( + factory_ut.cpp +) + +END() diff --git a/library/cpp/streams/factory/open_by_signature/ya.make b/library/cpp/streams/factory/open_by_signature/ya.make new file mode 100644 index 0000000000..ed72269488 --- /dev/null +++ b/library/cpp/streams/factory/open_by_signature/ya.make @@ -0,0 +1,17 @@ +LIBRARY() + +PEERDIR( + library/cpp/streams/bzip2 + library/cpp/streams/factory/open_common + library/cpp/streams/lz +) + +SRCS( + factory.cpp +) + +END() + +RECURSE_FOR_TESTS( + ut +) |