diff options
author | max42 <max42@yandex-team.com> | 2023-06-30 03:37:03 +0300 |
---|---|---|
committer | max42 <max42@yandex-team.com> | 2023-06-30 03:37:03 +0300 |
commit | fac2bd72b4b31ec3238292caf8fb2a8aaa6d6c4a (patch) | |
tree | b8cbc1deb00309c7f1a7ab6df520a76cf0b5c6d7 /yt/cpp/mapreduce/io/yamr_table_reader.cpp | |
parent | 7bf166b1a7ed0af927f230022b245af618e998c1 (diff) | |
download | ydb-fac2bd72b4b31ec3238292caf8fb2a8aaa6d6c4a.tar.gz |
YT-19324: move YT provider to ydb/library/yql
This commit is formed by the following script: https://paste.yandex-team.ru/6f92e4b8-efc5-4d34-948b-15ee2accd7e7/text.
This commit has zero effect on all projects that depend on YQL.
The summary of changes:
- `yql/providers/yt -> ydb/library/yql/providers/yt `- the whole implementation of YT provider is moved into YDB code base for further export as a part of YT YQL plugin shared library;
- `yql/providers/stat/{expr_nodes,uploader} -> ydb/library/yql/providers/stat/{expr_nodes,uploader}` - a small interface without implementation and the description of stat expr nodes;
- `yql/core/extract_predicate/ut -> ydb/library/yql/core/extract_predicate/ut`;
- `yql/core/{ut,ut_common} -> ydb/library/yql/core/{ut,ut_common}`;
- `yql/core` is gone;
- `yql/library/url_preprocessing -> ydb/library/yql/core/url_preprocessing`.
**NB**: all new targets inside `ydb/` are under `IF (NOT CMAKE_EXPORT)` clause which disables them from open-source cmake generation and ya make build. They will be enabled in the subsequent commits.
Diffstat (limited to 'yt/cpp/mapreduce/io/yamr_table_reader.cpp')
-rw-r--r-- | yt/cpp/mapreduce/io/yamr_table_reader.cpp | 145 |
1 files changed, 145 insertions, 0 deletions
diff --git a/yt/cpp/mapreduce/io/yamr_table_reader.cpp b/yt/cpp/mapreduce/io/yamr_table_reader.cpp new file mode 100644 index 0000000000..6204738e10 --- /dev/null +++ b/yt/cpp/mapreduce/io/yamr_table_reader.cpp @@ -0,0 +1,145 @@ +#include "yamr_table_reader.h" + +#include <yt/cpp/mapreduce/common/helpers.h> +#include <yt/cpp/mapreduce/common/retry_lib.h> +#include <yt/cpp/mapreduce/raw_client/raw_requests.h> + +//////////////////////////////////////////////////////////////////// + +static void CheckedSkip(IInputStream* input, size_t byteCount) +{ + size_t skipped = input->Skip(byteCount); + Y_ENSURE(skipped == byteCount, "Premature end of YaMR stream"); +} + +//////////////////////////////////////////////////////////////////// + +namespace NYT { + +using namespace NYT::NDetail::NRawClient; + +//////////////////////////////////////////////////////////////////////////////// + +TYaMRTableReader::TYaMRTableReader(::TIntrusivePtr<TRawTableReader> input) + : TLenvalTableReader(std::move(input)) +{ } + +TYaMRTableReader::~TYaMRTableReader() +{ } + +const TYaMRRow& TYaMRTableReader::GetRow() const +{ + CheckValidity(); + if (!RowTaken_) { + const_cast<TYaMRTableReader*>(this)->ReadRow(); + } + return Row_; +} + +bool TYaMRTableReader::IsValid() const +{ + return Valid_; +} + +void TYaMRTableReader::Next() +{ + TLenvalTableReader::Next(); +} + +void TYaMRTableReader::NextKey() +{ + TLenvalTableReader::NextKey(); +} + +ui32 TYaMRTableReader::GetTableIndex() const +{ + return TLenvalTableReader::GetTableIndex(); +} + +ui32 TYaMRTableReader::GetRangeIndex() const +{ + return TLenvalTableReader::GetRangeIndex(); +} + +ui64 TYaMRTableReader::GetRowIndex() const +{ + return TLenvalTableReader::GetRowIndex(); +} + +TMaybe<size_t> TYaMRTableReader::GetReadByteCount() const +{ + return TLenvalTableReader::GetReadByteCount(); +} + +bool TYaMRTableReader::IsEndOfStream() const +{ + return TLenvalTableReader::IsEndOfStream(); +} + +bool TYaMRTableReader::IsRawReaderExhausted() const +{ + return TLenvalTableReader::IsRawReaderExhausted(); +} + +void TYaMRTableReader::ReadField(TString* result, i32 length) +{ + result->resize(length); + size_t count = Input_.Load(result->begin(), length); + Y_ENSURE(count == static_cast<size_t>(length), "Premature end of YaMR stream"); +} + +void TYaMRTableReader::ReadRow() +{ + while (true) { + try { + i32 value = static_cast<i32>(Length_); + ReadField(&Key_, value); + Row_.Key = Key_; + + ReadInteger(&value); + ReadField(&SubKey_, value); + Row_.SubKey = SubKey_; + + ReadInteger(&value); + ReadField(&Value_, value); + Row_.Value = Value_; + + RowTaken_ = true; + + // We successfully parsed one more row from the stream, + // so reset retry count to their initial value. + Input_.ResetRetries(); + + break; + } catch (const std::exception& ) { + if (!TLenvalTableReader::Retry()) { + throw; + } + } + } +} + +void TYaMRTableReader::SkipRow() +{ + while (true) { + try { + i32 value = static_cast<i32>(Length_); + CheckedSkip(&Input_, value); + + ReadInteger(&value); + CheckedSkip(&Input_, value); + + ReadInteger(&value); + CheckedSkip(&Input_, value); + break; + } catch (const std::exception& ) { + if (!TLenvalTableReader::Retry()) { + throw; + } + } + } +} + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT |