aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/streams/factory/open_by_signature
diff options
context:
space:
mode:
Diffstat (limited to 'library/cpp/streams/factory/open_by_signature')
-rw-r--r--library/cpp/streams/factory/open_by_signature/factory.cpp99
-rw-r--r--library/cpp/streams/factory/open_by_signature/factory.h37
-rw-r--r--library/cpp/streams/factory/open_by_signature/factory_ut.cpp84
-rw-r--r--library/cpp/streams/factory/open_by_signature/ut/ya.make7
-rw-r--r--library/cpp/streams/factory/open_by_signature/ya.make17
5 files changed, 244 insertions, 0 deletions
diff --git a/library/cpp/streams/factory/open_by_signature/factory.cpp b/library/cpp/streams/factory/open_by_signature/factory.cpp
new file mode 100644
index 0000000000..2c96015f42
--- /dev/null
+++ b/library/cpp/streams/factory/open_by_signature/factory.cpp
@@ -0,0 +1,99 @@
+#include "factory.h"
+
+#include <library/cpp/streams/bzip2/bzip2.h>
+#include <library/cpp/streams/factory/open_common/factory.h>
+#include <util/stream/holder.h>
+#include <util/stream/file.h>
+#include <library/cpp/streams/lz/lz.h>
+#include <util/stream/str.h>
+#include <util/stream/zlib.h>
+#include <util/stream/multi.h>
+#include <util/generic/ptr.h>
+#include <util/generic/string.h>
+
+namespace {
+ template <class T>
+ struct TInputHolderX: public T {
+ inline decltype(T().Get()) Set(T t) noexcept {
+ t.Swap(*this);
+
+ return this->Get();
+ }
+ };
+
+ template <class T>
+ struct TInputHolderX<T*> {
+ static inline T* Set(T* t) noexcept {
+ return t;
+ }
+ };
+
+ template <class TInput>
+ struct TStringMultiInput: private TInputHolderX<TInput>, private TString, private THolder<IInputStream>, public TMultiInput {
+ TStringMultiInput(const TString& head, TInput tail)
+ : TString(head)
+ , THolder<IInputStream>(new TStringInput(*this))
+ , TMultiInput(THolder<IInputStream>::Get(), this->Set(tail))
+ {
+ }
+
+ ~TStringMultiInput() override {
+ }
+ };
+}
+
+template <class TInput>
+THolder<IInputStream> OpenMaybeCompressedInputX(TInput input) {
+ const size_t MAX_SIGNATURE_SIZE = 4;
+ char buffer[MAX_SIGNATURE_SIZE];
+ TString header(buffer, input->Load(buffer, MAX_SIGNATURE_SIZE));
+
+ if (header.size() == MAX_SIGNATURE_SIZE) {
+ // any lz
+ THolder<IInputStream> lz = TryOpenOwnedLzDecompressor(new TStringMultiInput<TInput>(header, input));
+
+ if (lz.Get()) {
+ return lz;
+ }
+ }
+
+ THolder<IInputStream> multi(new TStringMultiInput<TInput>(header, input));
+
+ // gzip
+ const TStringBuf GZIP = "\x1F\x8B";
+ const TStringBuf ZLIB = "\x78\x9C";
+
+ if (header.StartsWith(GZIP) || header.StartsWith(ZLIB)) {
+ return MakeHolder<THoldingStream<TBufferedZLibDecompress>>(std::move(multi));
+ }
+
+ // bzip2
+ constexpr TStringBuf BZIP2 = "BZ";
+ if (header.StartsWith(BZIP2)) {
+ return MakeHolder<THoldingStream<TBZipDecompress>>(std::move(multi));
+ }
+
+ return multi;
+}
+
+THolder<IInputStream> OpenMaybeCompressedInput(IInputStream* input) {
+ return OpenMaybeCompressedInputX(input);
+}
+
+THolder<IInputStream> OpenOwnedMaybeCompressedInput(THolder<IInputStream> input) {
+ return OpenMaybeCompressedInputX(TAtomicSharedPtr<IInputStream>(input));
+}
+
+THolder<IInputStream> OpenMaybeCompressedInput(const TString& path) {
+ if (!path || path == TStringBuf("-")) {
+ return OpenOwnedMaybeCompressedInput(OpenStdin());
+ }
+ return OpenOwnedMaybeCompressedInput(MakeHolder<TFileInput>(path));
+}
+
+THolder<IInputStream> OpenMaybeCompressedInput(const TString& path, ui32 bufSize) {
+ if (!path || path == TStringBuf("-")) {
+ return OpenOwnedMaybeCompressedInput(OpenStdin(bufSize));
+ }
+ return OpenOwnedMaybeCompressedInput(MakeHolder<TFileInput>(path, bufSize));
+}
diff --git a/library/cpp/streams/factory/open_by_signature/factory.h b/library/cpp/streams/factory/open_by_signature/factory.h
new file mode 100644
index 0000000000..7f5288bda2
--- /dev/null
+++ b/library/cpp/streams/factory/open_by_signature/factory.h
@@ -0,0 +1,37 @@
+#pragma once
+
+#include <util/generic/fwd.h>
+#include <util/generic/ptr.h>
+#include <util/stream/fwd.h>
+
+/**
+ * Peeks into the provided input stream to determine its compression format,
+ * if any, and returns a corresponding decompressing stream. If the stream is
+ * not compressed, then returns a simple pass-through proxy stream.
+ *
+ * Note that returned stream doesn't own the provided input stream, thus it's
+ * up to the user to free them both.
+ *
+ * @param input Input stream.
+ * @returns Newly constructed stream.
+ */
+THolder<IInputStream> OpenMaybeCompressedInput(IInputStream* input);
+
+/**
+ * Same as `OpenMaybeCompressedInput`, but returned stream owns the one passed
+ * into this function.
+ *
+ * @param input Input stream.
+ * @returns Newly constructed stream.
+ * @see OpenMaybeCompressedInput(IInputStream*)
+ */
+THolder<IInputStream> OpenOwnedMaybeCompressedInput(THolder<IInputStream> input);
+
+/**
+ * @param input Input stream.
+ * @returns Newly constructed stream.
+ * @see OpenMaybeCompressedInput(IInputStream*)
+ */
+THolder<IInputStream> OpenMaybeCompressedInput(const TString& path);
+
+THolder<IInputStream> OpenMaybeCompressedInput(const TString& path, ui32 bufSize);
diff --git a/library/cpp/streams/factory/open_by_signature/factory_ut.cpp b/library/cpp/streams/factory/open_by_signature/factory_ut.cpp
new file mode 100644
index 0000000000..7c0d1b436a
--- /dev/null
+++ b/library/cpp/streams/factory/open_by_signature/factory_ut.cpp
@@ -0,0 +1,84 @@
+#include "factory.h"
+
+#include <library/cpp/streams/lz/lz.h>
+
+#include <library/cpp/testing/unittest/registar.h>
+
+#include <util/generic/buffer.h>
+#include <util/generic/string.h>
+#include <util/generic/vector.h>
+#include <util/stream/buffer.h>
+#include <util/stream/file.h>
+#include <util/stream/mem.h>
+#include <util/stream/zlib.h>
+#include <util/system/env.h>
+
+static const TString plain = "aaaaaaaaaaabbbbbbbbbbbdddddd22222222000000aldkfa9s3jsfkjlkja909090909090q3lkjalkjf3aldjl";
+
+static const ui8 gz[] = {31, 139, 8, 8, 126, 193, 203, 80, 0, 3, 97, 46, 116, 120, 116, 0, 75, 76, 132, 131, 36, 4, 72, 1, 3, 35, 40, 48, 0, 131, 196, 156, 148, 236, 180, 68, 203, 98, 227, 172, 226, 180, 236, 172, 156, 236, 172, 68, 75, 3, 4, 44, 52, 6, 137, 0, 113, 154, 49, 80, 97, 86, 14, 0, 5, 203, 67, 131, 88, 0, 0, 0};
+static const auto gzLength = Y_ARRAY_SIZE(gz);
+
+static const ui8 bz2[] = {66, 90, 104, 57, 49, 65, 89, 38, 83, 89, 140, 92, 215, 106, 0, 0, 17, 73, 128, 20, 128, 88, 32, 53, 28, 40, 0, 32, 0, 84, 66, 52, 211, 0, 6, 72, 122, 140, 131, 36, 97, 60, 92, 230, 1, 71, 91, 170, 135, 33, 135, 149, 133, 75, 174, 153, 146, 217, 24, 174, 177, 76, 246, 69, 254, 225, 195, 236, 95, 180, 93, 201, 20, 225, 66, 66, 49, 115, 93, 168};
+static const auto bz2Length = Y_ARRAY_SIZE(bz2);
+
+Y_UNIT_TEST_SUITE(TRecognizeCompressorTest) {
+ static void TestRawData(const void* data, size_t len, const TString& orig) {
+ TMemoryInput mem(data, len);
+
+ THolder<IInputStream> input = OpenMaybeCompressedInput(&mem);
+ UNIT_ASSERT_VALUES_UNEQUAL(input.Get(), nullptr);
+ UNIT_ASSERT_VALUES_EQUAL(input->ReadAll(), orig);
+ }
+
+ static void TestRawDataOwned(const void* data, size_t len, const TString& orig) {
+ THolder<IInputStream> input = OpenOwnedMaybeCompressedInput(MakeHolder<TMemoryInput>(data, len));
+ UNIT_ASSERT_VALUES_UNEQUAL(input.Get(), nullptr);
+ UNIT_ASSERT_VALUES_EQUAL(input->ReadAll(), orig);
+ }
+
+ static inline void TestSame(const TString& text) {
+ TestRawData(text.data(), text.size(), text);
+ TestRawDataOwned(text.data(), text.size(), text);
+ }
+
+ Y_UNIT_TEST(TestPlain) {
+ TestSame(plain);
+ TestSame("");
+ TestSame("a");
+ TestSame("ab");
+ TestSame("abc");
+ TestSame("abcd");
+ }
+
+ Y_UNIT_TEST(TestGzip) {
+ TestRawData(gz, gzLength, plain);
+ TestRawDataOwned(gz, gzLength, plain);
+ }
+
+ Y_UNIT_TEST(TestBzip2) {
+ TestRawData(bz2, bz2Length, plain);
+ TestRawDataOwned(bz2, bz2Length, plain);
+ }
+
+ template <typename TCompress>
+ static void TestCompress() {
+ TBufferStream buf;
+ {
+ TCompress z(&buf);
+ z.Write(plain.data(), plain.size());
+ }
+ TestRawData(buf.Buffer().Data(), buf.Buffer().Size(), plain);
+ }
+
+ Y_UNIT_TEST(TestLz) {
+ TestCompress<TLz4Compress>();
+ TestCompress<TSnappyCompress>();
+ TestCompress<TLzoCompress>();
+ TestCompress<TLzqCompress>();
+ TestCompress<TLzfCompress>();
+ }
+
+ Y_UNIT_TEST(TestZlib) {
+ TestCompress<TZLibCompress>();
+ }
+}
diff --git a/library/cpp/streams/factory/open_by_signature/ut/ya.make b/library/cpp/streams/factory/open_by_signature/ut/ya.make
new file mode 100644
index 0000000000..53136880ca
--- /dev/null
+++ b/library/cpp/streams/factory/open_by_signature/ut/ya.make
@@ -0,0 +1,7 @@
+UNITTEST_FOR(library/cpp/streams/factory/open_by_signature)
+
+SRCS(
+ factory_ut.cpp
+)
+
+END()
diff --git a/library/cpp/streams/factory/open_by_signature/ya.make b/library/cpp/streams/factory/open_by_signature/ya.make
new file mode 100644
index 0000000000..ed72269488
--- /dev/null
+++ b/library/cpp/streams/factory/open_by_signature/ya.make
@@ -0,0 +1,17 @@
+LIBRARY()
+
+PEERDIR(
+ library/cpp/streams/bzip2
+ library/cpp/streams/factory/open_common
+ library/cpp/streams/lz
+)
+
+SRCS(
+ factory.cpp
+)
+
+END()
+
+RECURSE_FOR_TESTS(
+ ut
+)