about | summary | refs | log | tree | commit | diff | stats
diff options
context:
space:
mode:
author: fedorov-o <fedorov-o@yandex-team.com> 2024-01-23 19:26:03 +0300
committer: fedorov-o <fedorov-o@yandex-team.com> 2024-01-23 19:48:02 +0300
commit: bd7d89b121ae7b9f4427766292c950fcc91c2975 (patch)
tree: c8645540686ace9153d8f29e4be3fb0e12345d9a
parent: a76c7d48687ea8d50769feff3163ae61dd800a3b (diff)
download: ydb-bd7d89b121ae7b9f4427766292c950fcc91c2975.tar.gz
add x/image/draw to arcadia
-rw-r--r-- yt/yt/library/formats/arrow_writer.cpp 65
1 files changed, 54 insertions, 11 deletions
diff --git a/yt/yt/library/formats/arrow_writer.cpp b/yt/yt/library/formats/arrow_writer.cpp
index 79a01e97b9..6d74d54e1b 100644
--- a/yt/yt/library/formats/arrow_writer.cpp
+++ b/yt/yt/library/formats/arrow_writer.cpp
@@ -158,6 +158,41 @@ std::tuple<org::apache::arrow::flatbuf::Type, flatbuffers::Offset<void>> Seriali
}
}
+int ExtractTableIndexFromColumn(const TBatchColumn* column) // Decodes the single value at column->StartIndex of an RLE-encoded column and returns it as int.
+{
+ YT_VERIFY(column->Values); // The column must carry a values payload.
+
+ // Expecting rle but not dictionary column: Rle must be set and its value column must have no Dictionary.
+ YT_VERIFY(column->Rle);
+ YT_VERIFY(!column->Rle->ValueColumn->Dictionary); // NOTE(review): ValueColumn is dereferenced here without its own null check.
+
+ const auto* valueColumn = column->Rle->ValueColumn;
+ auto values = valueColumn->GetTypedValues<ui64>(); // Typed view over the RLE value column.
+
+ // Expecting only one element, i.e. the RLE value column holds exactly one value; anything else fails the check.
+ YT_VERIFY(values.size() == 1);
+
+ auto rleIndexes = column->GetTypedValues<ui64>(); // Typed view over the outer column; passed to the decoder as the RLE index argument.
+
+ auto startIndex = column->StartIndex;
+
+ int tableIndex = 0; // Stays 0 if the decoder never invokes the consumer lambda below.
+ DecodeIntegerVector(
+ startIndex, // Decoded range is [startIndex, startIndex + 1), i.e. one position.
+ startIndex + 1,
+ valueColumn->Values->BaseValue, // NOTE(review): valueColumn->Values is not verified non-null in this function (only column->Values is).
+ valueColumn->Values->ZigZagEncoded,
+ TRange<ui32>(), // Empty range; presumably the dictionary-index argument, unused since Dictionary is verified absent -- TODO confirm against DecodeIntegerVector.
+ rleIndexes,
+ [&] (auto index) { // Value getter: reads the stored value at the given index.
+ return values[index];
+ },
+ [&] (auto value) { // Value consumer: captures the decoded value.
+ tableIndex = value; // Implicit conversion of the decoded value to int.
+ });
+ return tableIndex;
+}
+
int GetIntegralLikeTypeByteSize(ESimpleLogicalValueType type)
{
switch (type) {
@@ -971,10 +1006,10 @@ private:
Reset();
ssize_t sameTableRangeBeginRowIndex = 0;
- i32 tableIndex = 0;
+ int tableIndex = 0;
for (ssize_t rowIndex = 0; rowIndex < std::ssize(rows); rowIndex++) {
- i32 currentTableIndex = -1;
+ int currentTableIndex = -1;
if (TableCount_ > 1) {
const auto& elems = rows[rowIndex].Elements();
for (ssize_t columnIndex = std::ssize(elems) - 1; columnIndex >= 0; --columnIndex) {
@@ -1002,25 +1037,33 @@ private:
void DoWriteBatch(NTableClient::IUnversionedRowBatchPtr rowBatch) override // Writes one row batch; with this change a columnar batch is encoded directly even when TableCount_ > 1.
{
- if (TableCount_ > 1) {
- DoWrite(rowBatch->MaterializeRows());
- return;
- }
-
auto columnarBatch = rowBatch->TryAsColumnar();
if (!columnarBatch) { // Not a columnar batch: fall back to the row-based DoWrite path.
DoWrite(rowBatch->MaterializeRows());
return;
}
+ int tableIndex = 0; // Used as-is when TableCount_ <= 1.
+ auto batchColumns = columnarBatch->MaterializeColumns(); // Materialized once; reused for the table-index lookup and for PrepareColumns.
+
+ if (TableCount_ > 1) {
+ tableIndex = -1; // Sentinel: remains -1 if no column with the table index id is found.
+ for (const auto* column : batchColumns) {
+ if (column->Id == GetTableIndexColumnId()) {
+ tableIndex = ExtractTableIndexFromColumn(column); // That helper verifies the column is RLE with exactly one value.
+ break;
+ }
+ }
+ YT_VERIFY(tableIndex < TableCount_ && tableIndex >= 0); // Fails if the table index column is missing (-1) or the index is out of [0, TableCount_).
+ }
Reset();
RowCount_ = rowBatch->GetRowCount();
- PrepareColumns(columnarBatch->MaterializeColumns(), 0);
- Encode(0);
+ PrepareColumns(batchColumns, tableIndex); // Previously always called with table index 0.
+ Encode(tableIndex);
++EncodedColumnarBatchCount_;
}
- void Encode(i32 tableIndex)
+ void Encode(int tableIndex)
{
auto output = GetOutputStream();
if (tableIndex != PrevTableIndex_ || IsSchemaMessageNeeded()) {
@@ -1179,7 +1222,7 @@ private:
std::move(bodyWriter)});
}
- void PrepareSchema(i32 tableIndex)
+ void PrepareSchema(int tableIndex)
{
flatbuffers::FlatBufferBuilder flatbufBuilder;