aboutsummaryrefslogtreecommitdiffstats
path: root/yt/cpp/mapreduce/interface/io.h
diff options
context:
space:
mode:
authormax42 <max42@yandex-team.com>2023-07-29 00:02:16 +0300
committermax42 <max42@yandex-team.com>2023-07-29 00:02:16 +0300
commit73b89de71748a21e102d27b9f3ed1bf658766cb5 (patch)
tree188bbd2d622fa91cdcbb1b6d6d77fbc84a0646f5 /yt/cpp/mapreduce/interface/io.h
parent528e321bcc2a2b67b53aeba58c3bd88305a141ee (diff)
downloadydb-73b89de71748a21e102d27b9f3ed1bf658766cb5.tar.gz
YT-19210: expose YQL shared library for YT.
After this, a new target libyqlplugin.so appears. in open-source cmake build. Diff in open-source YDB repo looks like the following: https://paste.yandex-team.ru/f302bdb4-7ef2-4362-91c7-6ca45f329264
Diffstat (limited to 'yt/cpp/mapreduce/interface/io.h')
-rw-r--r--yt/cpp/mapreduce/interface/io.h586
1 files changed, 586 insertions, 0 deletions
diff --git a/yt/cpp/mapreduce/interface/io.h b/yt/cpp/mapreduce/interface/io.h
new file mode 100644
index 0000000000..e2b20a1802
--- /dev/null
+++ b/yt/cpp/mapreduce/interface/io.h
@@ -0,0 +1,586 @@
+#pragma once
+
+///
+/// @file yt/cpp/mapreduce/interface/io.h
+///
+/// Header containing client interface for reading and writing tables and files.
+
+
+#include "fwd.h"
+
+#include "client_method_options.h"
+#include "common.h"
+#include "format.h"
+#include "node.h"
+#include "mpl.h"
+#include "skiff_row.h"
+
+#include <google/protobuf/message.h>
+
+#include <util/stream/input.h>
+#include <util/stream/output.h>
+#include <util/generic/yexception.h>
+#include <util/generic/maybe.h>
+
+namespace NYT {
+
+////////////////////////////////////////////////////////////////////////////////
+
+///
+/// @brief "Marker" type to use for several protobuf types in @ref NYT::TTableReader.
+///
+/// @tparam Ts Possible types of rows to be read.
+template<class... TProtoRowTypes>
+class TProtoOneOf
+{
+public:
+ static_assert(
+ (TIsBaseOf<::google::protobuf::Message, TProtoRowTypes>::Value && ...),
+ "Template parameters can only be protobuf types");
+
+ TProtoOneOf() = delete;
+};
+
+///
+/// @brief "Marker" type to use for several skiff row types in @ref NYT::TTableReader.
+///
+/// @tparam Ts Possible types of rows to be read.
+template<class... TSkiffRowTypes>
+class TSkiffRowOneOf
+{
+public:
+ static_assert(
+ (TIsSkiffRow<TSkiffRowTypes>::value && ...),
+ "Template parameters can only be SkiffRow types");
+
+ TSkiffRowOneOf() = delete;
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
+/// @cond Doxygen_Suppress
+namespace NDetail {
+
+////////////////////////////////////////////////////////////////////////////////
+
+template <class TTuple>
+struct TProtoOneOfFromTuple;
+
+template <class... Ts>
+struct TProtoOneOfFromTuple<std::tuple<Ts...>>
+{
+ using TType = TProtoOneOf<Ts...>;
+};
+
+template <class... Ts>
+struct TProtoOneOfUnique
+{
+ using TTuple = typename TUniqueTypes<std::tuple<>, std::tuple<Ts...>>::TType;
+ using TType = typename TProtoOneOfFromTuple<TTuple>::TType;
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NDetail
+/// @endcond
+
+////////////////////////////////////////////////////////////////////////////////
+
+struct INodeReaderImpl;
+struct IYaMRReaderImpl;
+struct IProtoReaderImpl;
+struct ISkiffRowReaderImpl;
+struct INodeWriterImpl;
+struct IYaMRWriterImpl;
+struct IProtoWriterImpl;
+
+////////////////////////////////////////////////////////////////////////////////
+
+/// Class of exceptions connected to reading or writing tables or files.
+class TIOException
+ : public yexception
+{ };
+
+///////////////////////////////////////////////////////////////////////////////
+
+/// Interface representing YT file reader.
+class IFileReader
+ : public TThrRefBase
+ , public IInputStream
+{ };
+
+/// Interface representing YT file writer.
+class IFileWriter
+ : public TThrRefBase
+ , public IOutputStream
+{ };
+
+////////////////////////////////////////////////////////////////////////////////
+
+/// Low-level interface to read YT table with retries.
+class TRawTableReader
+ : public TThrRefBase
+ , public IInputStream
+{
+public:
+ /// @brief Retry table read starting from the specified `rangeIndex` and `rowIndex`.
+ ///
+ /// @param rangeIndex Index of first range to read
+ /// @param rowIndex Index of first row to read; if `rowIndex == Nothing` entire request will be retried.
+ ///
+ /// @return `true` on successful request retry, `false` if no retry attempts are left (then `Retry()` shouldn't be called any more).
+ ///
+ /// `rowIndex` must be inside the range with index `rangeIndex` if the latter is specified.
+ ///
+ /// After successful retry the user should reset `rangeIndex` / `rowIndex` values and read new ones
+ /// from the stream.
+ virtual bool Retry(
+ const TMaybe<ui32>& rangeIndex,
+ const TMaybe<ui64>& rowIndex) = 0;
+
+ /// Resets retry attempt count to the initial value (then `Retry()` can be called again).
+ virtual void ResetRetries() = 0;
+
+ /// @brief May the input stream contain table ranges?
+ ///
+ /// In the case when it is `true` the `TRawTableReader` user is responsible
+ /// to track active range index in order to pass it to Retry().
+ virtual bool HasRangeIndices() const = 0;
+};
+
+/// @brief Low-level interface to write YT table.
+///
+/// Retries must be handled by implementation.
+class TRawTableWriter
+ : public TThrRefBase
+ , public IOutputStream
+{
+public:
+ /// @brief Call this method after complete row representation is written to the stream.
+ ///
+ /// When this method is called `TRowTableWriter` can check its buffer
+ /// and if it is full send data to YT.
+ /// @note `TRawTableWriter` never sends partial records to YT (due to retries).
+ virtual void NotifyRowEnd() = 0;
+
+ /// @brief Try to abort writing process as soon as possible (makes sense for multi-threaded writers).
+ ///
+ /// By default it does nothing, but implementations are welcome to override this method.
+ virtual void Abort()
+ { }
+};
+
+/// @brief Interface to deal with multiple raw output streams.
+class IProxyOutput
+{
+public:
+ virtual ~IProxyOutput()
+ { }
+
+ /// Get amount of managed streams.
+ virtual size_t GetStreamCount() const = 0;
+
+ /// Get stream corresponding to the specified table index.
+ virtual IOutputStream* GetStream(size_t tableIndex) const = 0;
+
+ /// This handler must be called right after the next row has been written.
+ virtual void OnRowFinished(size_t tableIndex) = 0;
+
+ /// @brief Try to abort writing process as soon as possible (makes sense for multi-threaded writers).
+ ///
+ /// By default it does nothing, but implementations are welcome to override this method.
+ virtual void Abort()
+ { }
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
+/// @brief Class template to read typed rows from YT tables.
+///
+/// @tparam T Row type.
+///
+/// Correct usage of this class usually looks like
+/// ```
+/// for (const auto& cursor : *reader) {
+/// const auto& row = cursor.GetRow();
+/// ...
+/// }
+/// ```
+/// or, more verbosely,
+/// ```
+/// for (; reader->IsValid(); reader->Next()) {
+/// const auto& row = reader->GetRow();
+/// ...
+/// }
+/// ```
+///
+/// @note Actual (partial) specializations of this template may look a bit different,
+/// e.g. @ref NYT::TTableReader::GetRow, @ref NYT::TTableReader::MoveRow may be method templates.
+template <class T, class>
+class TTableReader
+ : public TThrRefBase
+{
+public:
+ /// Get current row.
+ const T& GetRow() const;
+
+ /// Extract current row; further calls to `GetRow` and `MoveRow` will fail.
+ T MoveRow();
+
+ /// Extract current row to `result`; further calls to `GetRow` and `MoveRow` will fail.
+ void MoveRow(T* result);
+
+ /// Check whether all the rows were read.
+ bool IsValid() const;
+
+ /// Move the cursor to the next row.
+ void Next();
+
+ /// Get table index of the current row.
+ ui32 GetTableIndex() const;
+
+ /// Get range index of the current row (zero if it is unknown or read request contains no ranges)
+ ui32 GetRangeIndex() const;
+
+ /// Get current row index (zero if it unknown).
+ ui64 GetRowIndex() const;
+
+ /// Get current tablet index (for ordered dynamic tables).
+ i64 GetTabletIndex() const;
+
+ /// Returns `true` if job consumed all the input and `false` otherwise.
+ bool IsEndOfStream() const;
+
+ /// Returns `true` if job raw input stream was closed and `false` otherwise.
+ bool IsRawReaderExhausted() const;
+};
+
+/// @brief Iterator for use in range-based-for.
+///
+/// @note Idiomatic usage:
+/// ```
+/// for (const auto& cursor : *reader) {
+/// const auto& row = cursor.GetRow();
+/// ...
+/// }
+/// ```
+template <class T>
+class TTableReaderIterator
+{
+public:
+ /// Construct iterator from table reader (can be `nullptr`).
+ explicit TTableReaderIterator<T>(TTableReader<T>* reader)
+ {
+ if (reader && reader->IsValid()) {
+ Reader_ = reader;
+ } else {
+ Reader_ = nullptr;
+ }
+ }
+
+ /// Equality operator.
+ bool operator==(const TTableReaderIterator& it) const
+ {
+ return Reader_ == it.Reader_;
+ }
+
+ /// Inequality operator.
+ bool operator!=(const TTableReaderIterator& it) const
+ {
+ return Reader_ != it.Reader_;
+ }
+
+ /// Dereference operator.
+ TTableReader<T>& operator*()
+ {
+ return *Reader_;
+ }
+
+ /// Const dereference operator.
+ const TTableReader<T>& operator*() const
+ {
+ return *Reader_;
+ }
+
+ /// Preincrement operator.
+ TTableReaderIterator& operator++()
+ {
+ Reader_->Next();
+ if (!Reader_->IsValid()) {
+ Reader_ = nullptr;
+ }
+ return *this;
+ }
+
+private:
+ TTableReader<T>* Reader_;
+};
+
+/// @brief Function to facilitate range-based-for for @ref NYT::TTableReader.
+///
+/// @see @ref NYT::TTableReaderIterator
+template <class T>
+TTableReaderIterator<T> begin(TTableReader<T>& reader)
+{
+ return TTableReaderIterator<T>(&reader);
+}
+
+/// @brief Function to facilitate range-based-for for @ref NYT::TTableReader.
+///
+/// @see @ref NYT::TTableReaderIterator
+template <class T>
+TTableReaderIterator<T> end(TTableReader<T>&)
+{
+ return TTableReaderIterator<T>(nullptr);
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+/// @brief Class to facilitate reading table rows sorted by key.
+///
+/// Each reader returned from @ref NYT::TTableRangesReader::GetRange represents
+/// a range of rows with the same key.
+///
+/// @note Idiomatic usage:
+/// ```
+/// for (; reader->IsValid(); reader->Next()) {
+/// auto& rangeReader = reader->GetRange();
+/// ...
+/// }
+/// ```
+template <class T, class>
+class TTableRangesReader
+ : public TThrRefBase
+{
+public:
+ /// Get reader for rows with the same key.
+ TTableReader<T>& GetRange();
+
+ /// Check whether all rows are read.
+ bool IsValid() const;
+
+ /// Move cursor to the next range.
+ void Next();
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
+/// Class template to write typed rows to YT tables.
+template <class T, class>
+class TTableWriter
+ : public TThrRefBase
+{
+public:
+ /// @brief Submit a row for writing.
+ ///
+ /// The row may (and very probably will) *not* be written immediately.
+ void AddRow(const T& row);
+
+ /// Stop writing data as soon as possible (without flushing data, e.g. before aborting parent transaction).
+ void Finish();
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
+/// @brief Type representing YaMR table row.
+///
+/// @deprecated
+struct TYaMRRow
+{
+ /// Key column.
+ TStringBuf Key;
+
+ /// Subkey column.
+ TStringBuf SubKey;
+
+ /// Value column.
+ TStringBuf Value;
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
+/// Interface for creating table and file readers and writer.
+class IIOClient
+{
+public:
+ virtual ~IIOClient() = default;
+
+ /// Create a reader for file at `path`.
+ virtual IFileReaderPtr CreateFileReader(
+ const TRichYPath& path,
+ const TFileReaderOptions& options = TFileReaderOptions()) = 0;
+
+ /// Create a writer for file at `path`.
+ virtual IFileWriterPtr CreateFileWriter(
+ const TRichYPath& path,
+ const TFileWriterOptions& options = TFileWriterOptions()) = 0;
+
+ /// Create a typed reader for table at `path`.
+ template <class T>
+ TTableReaderPtr<T> CreateTableReader(
+ const TRichYPath& path,
+ const TTableReaderOptions& options = TTableReaderOptions());
+
+ /// Create a typed writer for table at `path`.
+ template <class T>
+ TTableWriterPtr<T> CreateTableWriter(
+ const TRichYPath& path,
+ const TTableWriterOptions& options = TTableWriterOptions());
+
+ /// Create a writer to write protobuf messages with specified descriptor.
+ virtual TTableWriterPtr<::google::protobuf::Message> CreateTableWriter(
+ const TRichYPath& path,
+ const ::google::protobuf::Descriptor& descriptor,
+ const TTableWriterOptions& options = TTableWriterOptions()) = 0;
+
+ /// Create a reader to read a table using specified format.
+ virtual TRawTableReaderPtr CreateRawReader(
+ const TRichYPath& path,
+ const TFormat& format,
+ const TTableReaderOptions& options = TTableReaderOptions()) = 0;
+
+ /// Create a reader to write a table using specified format.
+ virtual TRawTableWriterPtr CreateRawWriter(
+ const TRichYPath& path,
+ const TFormat& format,
+ const TTableWriterOptions& options = TTableWriterOptions()) = 0;
+
+ ///
+ /// @brief Create a reader for [blob table](https://docs.yandex-team.ru/docs/yt/description/storage/blobtables) at `path`.
+ ///
+ /// @param path Blob table path.
+ /// @param blobId Key identifying the blob.
+ /// @param options Optional parameters
+ ///
+ /// Blob table is a table that stores a number of blobs.
+ /// Blobs are sliced into parts of the same size (maybe except of last part).
+ /// Those parts are stored in the separate rows.
+ ///
+ /// Blob table have constraints on its schema.
+ /// - There must be columns that identify blob (blob id columns). That columns might be of any type.
+ /// - There must be a column of `int64` type that identify part inside the blob (this column is called `part index`).
+ /// - There must be a column of `string` type that stores actual data (this column is called `data column`).
+ virtual IFileReaderPtr CreateBlobTableReader(
+ const TYPath& path,
+ const TKey& blobId,
+ const TBlobTableReaderOptions& options = TBlobTableReaderOptions()) = 0;
+
+private:
+ virtual ::TIntrusivePtr<INodeReaderImpl> CreateNodeReader(
+ const TRichYPath& path, const TTableReaderOptions& options) = 0;
+
+ virtual ::TIntrusivePtr<IYaMRReaderImpl> CreateYaMRReader(
+ const TRichYPath& path, const TTableReaderOptions& options) = 0;
+
+ virtual ::TIntrusivePtr<IProtoReaderImpl> CreateProtoReader(
+ const TRichYPath& path,
+ const TTableReaderOptions& options,
+ const ::google::protobuf::Message* prototype) = 0;
+
+ virtual ::TIntrusivePtr<ISkiffRowReaderImpl> CreateSkiffRowReader(
+ const TRichYPath& path,
+ const TTableReaderOptions& options,
+ const ISkiffRowSkipperPtr& skipper,
+ const NSkiff::TSkiffSchemaPtr& schema) = 0;
+
+ virtual ::TIntrusivePtr<INodeWriterImpl> CreateNodeWriter(
+ const TRichYPath& path, const TTableWriterOptions& options) = 0;
+
+ virtual ::TIntrusivePtr<IYaMRWriterImpl> CreateYaMRWriter(
+ const TRichYPath& path, const TTableWriterOptions& options) = 0;
+
+ virtual ::TIntrusivePtr<IProtoWriterImpl> CreateProtoWriter(
+ const TRichYPath& path,
+ const TTableWriterOptions& options,
+ const ::google::protobuf::Message* prototype) = 0;
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
+///
+/// @brief Create a protobuf table reader from a stream.
+///
+/// @tparam T Protobuf message type to read (must be inherited from `Message`).
+///
+/// @param stream Input stream in YT protobuf format.
+template <typename T>
+TTableReaderPtr<T> CreateTableReader(
+ IInputStream* stream,
+ const TTableReaderOptions& options = {});
+
+///
+/// @brief Create a protobuf multi table reader from a stream.
+///
+/// @tparam Ts Protobuf message types to read (must be inherited from `Message`).
+///
+/// @param stream Input stream in YT protobuf format.
+template <class... Ts>
+TTableReaderPtr<typename NDetail::TProtoOneOfUnique<Ts...>::TType> CreateProtoMultiTableReader(
+ IInputStream* stream,
+ const TTableReaderOptions& options = {});
+
+///
+/// @brief Create a homogenous protobuf multi table reader from a stream.
+///
+/// @tparam T Protobuf message type to read (must be inherited from `Message`).
+///
+/// @param stream Input stream in YT protobuf format.
+/// @param tableCount Number of tables in input stream.
+template <class T>
+TTableReaderPtr<T> CreateProtoMultiTableReader(
+ IInputStream* stream,
+ int tableCount,
+ const TTableReaderOptions& options = {});
+
+/// Create a @ref NYT::TNode table reader from a stream.
+template <>
+TTableReaderPtr<TNode> CreateTableReader<TNode>(
+ IInputStream* stream, const TTableReaderOptions& options);
+
+/// Create a @ref NYT::TYaMRRow table reader from a stream.
+template <>
+TTableReaderPtr<TYaMRRow> CreateTableReader<TYaMRRow>(
+ IInputStream* stream, const TTableReaderOptions& options);
+
+namespace NDetail {
+
+/// Create a protobuf table reader from a stream.
+::TIntrusivePtr<IProtoReaderImpl> CreateProtoReader(
+ IInputStream* stream,
+ const TTableReaderOptions& options,
+ const ::google::protobuf::Descriptor* descriptor);
+
+
+/// Create a protobuf table reader from a stream that can contain table switches.
+::TIntrusivePtr<IProtoReaderImpl> CreateProtoReader(
+ IInputStream* stream,
+ const TTableReaderOptions& options,
+ TVector<const ::google::protobuf::Descriptor*> descriptors);
+
+} // namespace NDetail
+
+////////////////////////////////////////////////////////////////////////////////
+
+/// Convert generic protobuf table reader to a concrete one (for certain type `T`).
+template <typename T>
+TTableReaderPtr<T> CreateConcreteProtobufReader(TTableReader<Message>* reader);
+
+/// Convert generic protobuf table reader to a concrete one (for certain type `T`).
+template <typename T>
+TTableReaderPtr<T> CreateConcreteProtobufReader(const TTableReaderPtr<Message>& reader);
+
+/// Convert a concrete (for certain type `T`) protobuf table reader to a generic one.
+template <typename T>
+TTableReaderPtr<Message> CreateGenericProtobufReader(TTableReader<T>* reader);
+
+/// Convert a concrete (for certain type `T`) protobuf table reader to a generic one.
+template <typename T>
+TTableReaderPtr<Message> CreateGenericProtobufReader(const TTableReaderPtr<T>& reader);
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT
+
+#define IO_INL_H_
+#include "io-inl.h"
+#undef IO_INL_H_