diff options
author | trivias <trivias@yandex-team.com> | 2024-01-19 15:31:12 +0300 |
---|---|---|
committer | trivias <trivias@yandex-team.com> | 2024-01-19 15:51:19 +0300 |
commit | b82391321222f253ff2653e731cd98099b5e2ce7 (patch) | |
tree | 3de245f24c82e1483ca3e02b9ce7e58c84e95db0 | |
parent | 5ad9230c8cc1de0dbd03cffca57a245b4fb28b54 (diff) | |
download | ydb-b82391321222f253ff2653e731cd98099b5e2ce7.tar.gz |
Remove navi_serializable feature
-rw-r--r-- | yt/yt/client/chunk_client/data_statistics.cpp | 4 | ||||
-rw-r--r-- | yt/yt/client/table_client/adapters.cpp | 7 | ||||
-rw-r--r-- | yt/yt/client/table_client/adapters.h | 4 | ||||
-rw-r--r-- | yt/yt/library/formats/arrow_writer.cpp | 34 | ||||
-rw-r--r-- | yt/yt/library/formats/format.h | 10 | ||||
-rw-r--r-- | yt/yt_proto/yt/client/chunk_client/proto/data_statistics.proto | 3 |
6 files changed, 52 insertions, 10 deletions
diff --git a/yt/yt/client/chunk_client/data_statistics.cpp b/yt/yt/client/chunk_client/data_statistics.cpp index 46736f5570..5fdcf5dfa9 100644 --- a/yt/yt/client/chunk_client/data_statistics.cpp +++ b/yt/yt/client/chunk_client/data_statistics.cpp @@ -44,6 +44,8 @@ TDataStatistics& operator += (TDataStatistics& lhs, const TDataStatistics& rhs) lhs.set_row_count(lhs.row_count() + rhs.row_count()); lhs.set_regular_disk_space(lhs.regular_disk_space() + rhs.regular_disk_space()); lhs.set_erasure_disk_space(lhs.erasure_disk_space() + rhs.erasure_disk_space()); + lhs.set_encoded_row_batch_count(lhs.encoded_row_batch_count() + rhs.encoded_row_batch_count()); + lhs.set_encoded_columnar_batch_count(lhs.encoded_columnar_batch_count() + rhs.encoded_columnar_batch_count()); if (HasInvalidDataWeight(lhs) || HasInvalidDataWeight(rhs)) { lhs.set_data_weight(-1); @@ -114,6 +116,8 @@ void Serialize(const TDataStatistics& statistics, NYson::IYsonConsumer* consumer .Item("erasure_disk_space").Value(statistics.erasure_disk_space()) .Item("unmerged_row_count").Value(statistics.unmerged_row_count()) .Item("unmerged_data_weight").Value(statistics.unmerged_data_weight()) + .Item("encoded_row_batch_count").Value(statistics.encoded_row_batch_count()) + .Item("encoded_columnar_batch_count").Value(statistics.encoded_columnar_batch_count()) .EndMap(); } diff --git a/yt/yt/client/table_client/adapters.cpp b/yt/yt/client/table_client/adapters.cpp index 23028fc71c..5001c2b36f 100644 --- a/yt/yt/client/table_client/adapters.cpp +++ b/yt/yt/client/table_client/adapters.cpp @@ -160,7 +160,8 @@ void PipeReaderToWriter( void PipeReaderToWriterByBatches( const ITableReaderPtr& reader, const NFormats::ISchemalessFormatWriterPtr& writer, - const TRowBatchReadOptions& options) + const TRowBatchReadOptions& options, + TDuration pipeDelay) { TPeriodicYielder yielder(TDuration::Seconds(1)); @@ -173,6 +174,10 @@ void PipeReaderToWriterByBatches( continue; } + if (!batch->IsEmpty() && pipeDelay != TDuration::Zero()) { + TDelayedExecutor::WaitForDuration(pipeDelay); + } + if (!writer->WriteBatch(batch)) { WaitFor(writer->GetReadyEvent()) .ThrowOnError(); diff --git a/yt/yt/client/table_client/adapters.h b/yt/yt/client/table_client/adapters.h index 9cafefb49e..4625398b41 100644 --- a/yt/yt/client/table_client/adapters.h +++ b/yt/yt/client/table_client/adapters.h @@ -36,10 +36,12 @@ void PipeReaderToWriter( const IUnversionedRowsetWriterPtr& writer, const TPipeReaderToWriterOptions& options); +//! Parameter #pipeDelay is used only for testing. void PipeReaderToWriterByBatches( const NApi::ITableReaderPtr& reader, const NFormats::ISchemalessFormatWriterPtr& writer, - const TRowBatchReadOptions& options); + const TRowBatchReadOptions& options, + TDuration pipeDelay = TDuration::Zero()); void PipeInputToOutput( IInputStream* input, diff --git a/yt/yt/library/formats/arrow_writer.cpp b/yt/yt/library/formats/arrow_writer.cpp index 72a0f12ac7..79a01e97b9 100644 --- a/yt/yt/library/formats/arrow_writer.cpp +++ b/yt/yt/library/formats/arrow_writer.cpp @@ -997,22 +997,27 @@ private: auto currentRows = rows.Slice(sameTableRangeBeginRowIndex, rows.size()); WriteRowsForSingleTable(currentRows, tableIndex); + ++EncodedRowBatchCount_; } void DoWriteBatch(NTableClient::IUnversionedRowBatchPtr rowBatch) override { + if (TableCount_ > 1) { + DoWrite(rowBatch->MaterializeRows()); + return; + } + auto columnarBatch = rowBatch->TryAsColumnar(); if (!columnarBatch) { - YT_LOG_DEBUG("Encoding non-columnar batch; running write rows (RowCount: %v)", rowBatch->GetRowCount()); DoWrite(rowBatch->MaterializeRows()); - } else { - YT_LOG_DEBUG("Encoding columnar batch (RowCount: %v)", rowBatch->GetRowCount()); - YT_VERIFY(TableCount_ == 1); - Reset(); - RowCount_ = rowBatch->GetRowCount(); - PrepareColumns(columnarBatch->MaterializeColumns(), 0); - Encode(0); + return; } + + Reset(); + RowCount_ = rowBatch->GetRowCount(); + PrepareColumns(columnarBatch->MaterializeColumns(), 0); + Encode(0); + ++EncodedColumnarBatchCount_; } void Encode(i32 tableIndex) @@ -1034,6 +1039,16 @@ private: TryFlushBuffer(true); } + i64 GetEncodedRowBatchCount() const override + { + return EncodedRowBatchCount_; + } + + i64 GetEncodedColumnarBatchCount() const override + { + return EncodedColumnarBatchCount_; + } + private: int TableCount_ = 0; bool IsFirstBatch_ = true; @@ -1046,6 +1061,9 @@ private: std::vector<THashMap<int, int>> TableIdToIndex_; std::vector<bool> IsFirstBatchForSpecificTable_; + i64 EncodedRowBatchCount_ = 0; + i64 EncodedColumnarBatchCount_ = 0; + struct TMessage { std::optional<flatbuffers::FlatBufferBuilder> FlatbufBuilder; diff --git a/yt/yt/library/formats/format.h b/yt/yt/library/formats/format.h index 3a85d7f1a4..d848ec4b0f 100644 --- a/yt/yt/library/formats/format.h +++ b/yt/yt/library/formats/format.h @@ -25,6 +25,16 @@ struct ISchemalessFormatWriter virtual i64 GetWrittenSize() const = 0; + virtual i64 GetEncodedRowBatchCount() const + { + return 0; + } + + virtual i64 GetEncodedColumnarBatchCount() const + { + return 0; + } + [[nodiscard]] virtual TFuture<void> Flush() = 0; virtual bool WriteBatch(NTableClient::IUnversionedRowBatchPtr rowBatch) = 0; diff --git a/yt/yt_proto/yt/client/chunk_client/proto/data_statistics.proto b/yt/yt_proto/yt/client/chunk_client/proto/data_statistics.proto index 9696458aac..d8b939940c 100644 --- a/yt/yt_proto/yt/client/chunk_client/proto/data_statistics.proto +++ b/yt/yt_proto/yt/client/chunk_client/proto/data_statistics.proto @@ -18,6 +18,9 @@ message TDataStatistics optional int64 unmerged_row_count = 9 [default = 0]; optional int64 unmerged_data_weight = 10 [default = 0]; + + optional int64 encoded_columnar_batch_count = 11 [default = 0]; + optional int64 encoded_row_batch_count = 12 [default = 0]; } //////////////////////////////////////////////////////////////////////////////// |