diff options
| author | max42 <[email protected]> | 2023-06-30 03:37:03 +0300 |
|---|---|---|
| committer | max42 <[email protected]> | 2023-06-30 03:37:03 +0300 |
| commit | fac2bd72b4b31ec3238292caf8fb2a8aaa6d6c4a (patch) | |
| tree | b8cbc1deb00309c7f1a7ab6df520a76cf0b5c6d7 /yt/cpp/mapreduce/io/lenval_table_reader.cpp | |
| parent | 7bf166b1a7ed0af927f230022b245af618e998c1 (diff) | |
YT-19324: move YT provider to ydb/library/yql
This commit is formed by the following script: https://paste.yandex-team.ru/6f92e4b8-efc5-4d34-948b-15ee2accd7e7/text.
This commit has zero effect on all projects that depend on YQL.
The summary of changes:
- `yql/providers/yt -> ydb/library/yql/providers/yt `- the whole implementation of YT provider is moved into YDB code base for further export as a part of YT YQL plugin shared library;
- `yql/providers/stat/{expr_nodes,uploader} -> ydb/library/yql/providers/stat/{expr_nodes,uploader}` - a small interface without implementation and the description of stat expr nodes;
- `yql/core/extract_predicate/ut -> ydb/library/yql/core/extract_predicate/ut`;
- `yql/core/{ut,ut_common} -> ydb/library/yql/core/{ut,ut_common}`;
- `yql/core` is gone;
- `yql/library/url_preprocessing -> ydb/library/yql/core/url_preprocessing`.
**NB**: all new targets inside `ydb/` are under `IF (NOT CMAKE_EXPORT)` clause which disables them from open-source cmake generation and ya make build. They will be enabled in the subsequent commits.
Diffstat (limited to 'yt/cpp/mapreduce/io/lenval_table_reader.cpp')
| -rw-r--r-- | yt/cpp/mapreduce/io/lenval_table_reader.cpp | 198 |
1 files changed, 198 insertions, 0 deletions
diff --git a/yt/cpp/mapreduce/io/lenval_table_reader.cpp b/yt/cpp/mapreduce/io/lenval_table_reader.cpp new file mode 100644 index 00000000000..98274c79960 --- /dev/null +++ b/yt/cpp/mapreduce/io/lenval_table_reader.cpp @@ -0,0 +1,198 @@ +#include "lenval_table_reader.h" + +#include <yt/cpp/mapreduce/common/helpers.h> + +#include <yt/cpp/mapreduce/interface/logging/yt_log.h> + +#include <util/string/printf.h> + +namespace NYT { + +//////////////////////////////////////////////////////////////////////////////// + +const i32 CONTROL_ATTR_TABLE_INDEX = -1; +const i32 CONTROL_ATTR_KEY_SWITCH = -2; +const i32 CONTROL_ATTR_RANGE_INDEX = -3; +const i32 CONTROL_ATTR_ROW_INDEX = -4; +const i32 CONTROL_ATTR_END_OF_STREAM = -5; +const i32 CONTROL_ATTR_TABLET_INDEX = -6; + +//////////////////////////////////////////////////////////////////////////////// + +TLenvalTableReader::TLenvalTableReader(::TIntrusivePtr<TRawTableReader> input) + : Input_(std::move(input)) +{ + TLenvalTableReader::Next(); +} + +TLenvalTableReader::~TLenvalTableReader() +{ } + +void TLenvalTableReader::CheckValidity() const +{ + if (!IsValid()) { + ythrow yexception() << "Iterator is not valid"; + } +} + +bool TLenvalTableReader::IsValid() const +{ + return Valid_; +} + +void TLenvalTableReader::Next() +{ + if (!RowTaken_) { + SkipRow(); + } + + CheckValidity(); + + if (RowIndex_) { + ++*RowIndex_; + } + + while (true) { + try { + i32 value = 0; + if (!ReadInteger(&value, true)) { + return; + } + + while (value < 0 && !IsEndOfStream_) { + switch (value) { + case CONTROL_ATTR_KEY_SWITCH: + if (!AtStart_) { + Valid_ = false; + return; + } else { + ReadInteger(&value); + } + break; + + case CONTROL_ATTR_TABLE_INDEX: { + ui32 tmp = 0; + ReadInteger(&tmp); + TableIndex_ = tmp; + ReadInteger(&value); + break; + } + case CONTROL_ATTR_ROW_INDEX: { + ui64 tmp = 0; + ReadInteger(&tmp); + RowIndex_ = tmp; + ReadInteger(&value); + break; + } + case CONTROL_ATTR_RANGE_INDEX: { + ui32 tmp = 0; + ReadInteger(&tmp); + RangeIndex_ = tmp; + ReadInteger(&value); + break; + } + case CONTROL_ATTR_TABLET_INDEX: { + ui64 tmp = 0; + ReadInteger(&tmp); + TabletIndex_ = tmp; + ReadInteger(&value); + break; + } + case CONTROL_ATTR_END_OF_STREAM: { + IsEndOfStream_ = true; + break; + } + default: + ythrow yexception() << + Sprintf("Invalid control integer %d in lenval stream", value); + } + } + + Length_ = static_cast<ui32>(value); + RowTaken_ = false; + AtStart_ = false; + } catch (const std::exception& e) { + if (!PrepareRetry()) { + throw; + } + continue; + } + break; + } +} + +bool TLenvalTableReader::Retry() +{ + if (PrepareRetry()) { + RowTaken_ = true; + Next(); + return true; + } + return false; +} + +void TLenvalTableReader::NextKey() +{ + while (Valid_) { + Next(); + } + + if (Finished_) { + return; + } + + Valid_ = true; + + if (RowIndex_) { + --*RowIndex_; + } + + RowTaken_ = true; +} + +ui32 TLenvalTableReader::GetTableIndex() const +{ + CheckValidity(); + return TableIndex_; +} + +ui32 TLenvalTableReader::GetRangeIndex() const +{ + CheckValidity(); + return RangeIndex_.GetOrElse(0); +} + +ui64 TLenvalTableReader::GetRowIndex() const +{ + CheckValidity(); + return RowIndex_.GetOrElse(0UL); +} + +TMaybe<size_t> TLenvalTableReader::GetReadByteCount() const +{ + return Input_.GetReadByteCount(); +} + +bool TLenvalTableReader::IsEndOfStream() const +{ + return IsEndOfStream_; +} + +bool TLenvalTableReader::IsRawReaderExhausted() const +{ + return Finished_; +} + +bool TLenvalTableReader::PrepareRetry() +{ + if (Input_.Retry(RangeIndex_, RowIndex_)) { + RowIndex_.Clear(); + RangeIndex_.Clear(); + return true; + } + return false; +} + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT |
