diff options
author | fedorov-o <fedorov-o@yandex-team.com> | 2024-01-23 19:26:03 +0300 |
---|---|---|
committer | fedorov-o <fedorov-o@yandex-team.com> | 2024-01-23 19:48:02 +0300 |
commit | bd7d89b121ae7b9f4427766292c950fcc91c2975 (patch) | |
tree | c8645540686ace9153d8f29e4be3fb0e12345d9a | |
parent | a76c7d48687ea8d50769feff3163ae61dd800a3b (diff) | |
download | ydb-bd7d89b121ae7b9f4427766292c950fcc91c2975.tar.gz |
add x/image/draw to arcadia
-rw-r--r-- | yt/yt/library/formats/arrow_writer.cpp | 65 |
1 file changed, 54 insertions, 11 deletions
diff --git a/yt/yt/library/formats/arrow_writer.cpp b/yt/yt/library/formats/arrow_writer.cpp index 79a01e97b9..6d74d54e1b 100644 --- a/yt/yt/library/formats/arrow_writer.cpp +++ b/yt/yt/library/formats/arrow_writer.cpp @@ -158,6 +158,41 @@ std::tuple<org::apache::arrow::flatbuf::Type, flatbuffers::Offset<void>> Seriali } } +int ExtractTableIndexFromColumn(const TBatchColumn* column) +{ + YT_VERIFY(column->Values); + + // Expecting rle but not dictionary column. + YT_VERIFY(column->Rle); + YT_VERIFY(!column->Rle->ValueColumn->Dictionary); + + const auto* valueColumn = column->Rle->ValueColumn; + auto values = valueColumn->GetTypedValues<ui64>(); + + // Expecting only one element. + YT_VERIFY(values.size() == 1); + + auto rleIndexes = column->GetTypedValues<ui64>(); + + auto startIndex = column->StartIndex; + + int tableIndex = 0; + DecodeIntegerVector( + startIndex, + startIndex + 1, + valueColumn->Values->BaseValue, + valueColumn->Values->ZigZagEncoded, + TRange<ui32>(), + rleIndexes, + [&] (auto index) { + return values[index]; + }, + [&] (auto value) { + tableIndex = value; + }); + return tableIndex; +} + int GetIntegralLikeTypeByteSize(ESimpleLogicalValueType type) { switch (type) { @@ -971,10 +1006,10 @@ private: Reset(); ssize_t sameTableRangeBeginRowIndex = 0; - i32 tableIndex = 0; + int tableIndex = 0; for (ssize_t rowIndex = 0; rowIndex < std::ssize(rows); rowIndex++) { - i32 currentTableIndex = -1; + int currentTableIndex = -1; if (TableCount_ > 1) { const auto& elems = rows[rowIndex].Elements(); for (ssize_t columnIndex = std::ssize(elems) - 1; columnIndex >= 0; --columnIndex) { @@ -1002,25 +1037,33 @@ private: void DoWriteBatch(NTableClient::IUnversionedRowBatchPtr rowBatch) override { - if (TableCount_ > 1) { - DoWrite(rowBatch->MaterializeRows()); - return; - } - auto columnarBatch = rowBatch->TryAsColumnar(); if (!columnarBatch) { DoWrite(rowBatch->MaterializeRows()); return; } + int tableIndex = 0; + auto batchColumns = 
columnarBatch->MaterializeColumns(); + + if (TableCount_ > 1) { + tableIndex = -1; + for (const auto* column : batchColumns) { + if (column->Id == GetTableIndexColumnId()) { + tableIndex = ExtractTableIndexFromColumn(column); + break; + } + } + YT_VERIFY(tableIndex < TableCount_ && tableIndex >= 0); + } Reset(); RowCount_ = rowBatch->GetRowCount(); - PrepareColumns(columnarBatch->MaterializeColumns(), 0); - Encode(0); + PrepareColumns(batchColumns, tableIndex); + Encode(tableIndex); ++EncodedColumnarBatchCount_; } - void Encode(i32 tableIndex) + void Encode(int tableIndex) { auto output = GetOutputStream(); if (tableIndex != PrevTableIndex_ || IsSchemaMessageNeeded()) { @@ -1179,7 +1222,7 @@ private: std::move(bodyWriter)}); } - void PrepareSchema(i32 tableIndex) + void PrepareSchema(int tableIndex) { flatbuffers::FlatBufferBuilder flatbufBuilder; |