about | summary | refs | log | tree | commit | diff | stats
diff options
context:
space:
mode:
author: trivias <trivias@yandex-team.com> 2024-01-19 15:31:12 +0300
committer: trivias <trivias@yandex-team.com> 2024-01-19 15:51:19 +0300
commit: b82391321222f253ff2653e731cd98099b5e2ce7 (patch)
tree: 3de245f24c82e1483ca3e02b9ce7e58c84e95db0
parent: 5ad9230c8cc1de0dbd03cffca57a245b4fb28b54 (diff)
download: ydb-b82391321222f253ff2653e731cd98099b5e2ce7.tar.gz
Remove navi_serializable feature
-rw-r--r-- yt/yt/client/chunk_client/data_statistics.cpp 4
-rw-r--r-- yt/yt/client/table_client/adapters.cpp 7
-rw-r--r-- yt/yt/client/table_client/adapters.h 4
-rw-r--r-- yt/yt/library/formats/arrow_writer.cpp 34
-rw-r--r-- yt/yt/library/formats/format.h 10
-rw-r--r-- yt/yt_proto/yt/client/chunk_client/proto/data_statistics.proto 3
6 files changed, 52 insertions, 10 deletions
diff --git a/yt/yt/client/chunk_client/data_statistics.cpp b/yt/yt/client/chunk_client/data_statistics.cpp
index 46736f5570..5fdcf5dfa9 100644
--- a/yt/yt/client/chunk_client/data_statistics.cpp
+++ b/yt/yt/client/chunk_client/data_statistics.cpp
@@ -44,6 +44,8 @@ TDataStatistics& operator += (TDataStatistics& lhs, const TDataStatistics& rhs)
lhs.set_row_count(lhs.row_count() + rhs.row_count());
lhs.set_regular_disk_space(lhs.regular_disk_space() + rhs.regular_disk_space());
lhs.set_erasure_disk_space(lhs.erasure_disk_space() + rhs.erasure_disk_space());
+ lhs.set_encoded_row_batch_count(lhs.encoded_row_batch_count() + rhs.encoded_row_batch_count());
+ lhs.set_encoded_columnar_batch_count(lhs.encoded_columnar_batch_count() + rhs.encoded_columnar_batch_count());
if (HasInvalidDataWeight(lhs) || HasInvalidDataWeight(rhs)) {
lhs.set_data_weight(-1);
@@ -114,6 +116,8 @@ void Serialize(const TDataStatistics& statistics, NYson::IYsonConsumer* consumer
.Item("erasure_disk_space").Value(statistics.erasure_disk_space())
.Item("unmerged_row_count").Value(statistics.unmerged_row_count())
.Item("unmerged_data_weight").Value(statistics.unmerged_data_weight())
+ .Item("encoded_row_batch_count").Value(statistics.encoded_row_batch_count())
+ .Item("encoded_columnar_batch_count").Value(statistics.encoded_columnar_batch_count())
.EndMap();
}
diff --git a/yt/yt/client/table_client/adapters.cpp b/yt/yt/client/table_client/adapters.cpp
index 23028fc71c..5001c2b36f 100644
--- a/yt/yt/client/table_client/adapters.cpp
+++ b/yt/yt/client/table_client/adapters.cpp
@@ -160,7 +160,8 @@ void PipeReaderToWriter(
void PipeReaderToWriterByBatches(
const ITableReaderPtr& reader,
const NFormats::ISchemalessFormatWriterPtr& writer,
- const TRowBatchReadOptions& options)
+ const TRowBatchReadOptions& options,
+ TDuration pipeDelay)
{
TPeriodicYielder yielder(TDuration::Seconds(1));
@@ -173,6 +174,10 @@ void PipeReaderToWriterByBatches(
continue;
}
+ if (!batch->IsEmpty() && pipeDelay != TDuration::Zero()) {
+ TDelayedExecutor::WaitForDuration(pipeDelay);
+ }
+
if (!writer->WriteBatch(batch)) {
WaitFor(writer->GetReadyEvent())
.ThrowOnError();
diff --git a/yt/yt/client/table_client/adapters.h b/yt/yt/client/table_client/adapters.h
index 9cafefb49e..4625398b41 100644
--- a/yt/yt/client/table_client/adapters.h
+++ b/yt/yt/client/table_client/adapters.h
@@ -36,10 +36,12 @@ void PipeReaderToWriter(
const IUnversionedRowsetWriterPtr& writer,
const TPipeReaderToWriterOptions& options);
+//! Parameter #pipeDelay is used only for testing.
void PipeReaderToWriterByBatches(
const NApi::ITableReaderPtr& reader,
const NFormats::ISchemalessFormatWriterPtr& writer,
- const TRowBatchReadOptions& options);
+ const TRowBatchReadOptions& options,
+ TDuration pipeDelay = TDuration::Zero());
void PipeInputToOutput(
IInputStream* input,
diff --git a/yt/yt/library/formats/arrow_writer.cpp b/yt/yt/library/formats/arrow_writer.cpp
index 72a0f12ac7..79a01e97b9 100644
--- a/yt/yt/library/formats/arrow_writer.cpp
+++ b/yt/yt/library/formats/arrow_writer.cpp
@@ -997,22 +997,27 @@ private:
auto currentRows = rows.Slice(sameTableRangeBeginRowIndex, rows.size());
WriteRowsForSingleTable(currentRows, tableIndex);
+ ++EncodedRowBatchCount_;
}
void DoWriteBatch(NTableClient::IUnversionedRowBatchPtr rowBatch) override
{
+ if (TableCount_ > 1) {
+ DoWrite(rowBatch->MaterializeRows());
+ return;
+ }
+
auto columnarBatch = rowBatch->TryAsColumnar();
if (!columnarBatch) {
- YT_LOG_DEBUG("Encoding non-columnar batch; running write rows (RowCount: %v)", rowBatch->GetRowCount());
DoWrite(rowBatch->MaterializeRows());
- } else {
- YT_LOG_DEBUG("Encoding columnar batch (RowCount: %v)", rowBatch->GetRowCount());
- YT_VERIFY(TableCount_ == 1);
- Reset();
- RowCount_ = rowBatch->GetRowCount();
- PrepareColumns(columnarBatch->MaterializeColumns(), 0);
- Encode(0);
+ return;
}
+
+ Reset();
+ RowCount_ = rowBatch->GetRowCount();
+ PrepareColumns(columnarBatch->MaterializeColumns(), 0);
+ Encode(0);
+ ++EncodedColumnarBatchCount_;
}
void Encode(i32 tableIndex)
@@ -1034,6 +1039,16 @@ private:
TryFlushBuffer(true);
}
+ i64 GetEncodedRowBatchCount() const override
+ {
+ return EncodedRowBatchCount_;
+ }
+
+ i64 GetEncodedColumnarBatchCount() const override
+ {
+ return EncodedColumnarBatchCount_;
+ }
+
private:
int TableCount_ = 0;
bool IsFirstBatch_ = true;
@@ -1046,6 +1061,9 @@ private:
std::vector<THashMap<int, int>> TableIdToIndex_;
std::vector<bool> IsFirstBatchForSpecificTable_;
+ i64 EncodedRowBatchCount_ = 0;
+ i64 EncodedColumnarBatchCount_ = 0;
+
struct TMessage
{
std::optional<flatbuffers::FlatBufferBuilder> FlatbufBuilder;
diff --git a/yt/yt/library/formats/format.h b/yt/yt/library/formats/format.h
index 3a85d7f1a4..d848ec4b0f 100644
--- a/yt/yt/library/formats/format.h
+++ b/yt/yt/library/formats/format.h
@@ -25,6 +25,16 @@ struct ISchemalessFormatWriter
virtual i64 GetWrittenSize() const = 0;
+ virtual i64 GetEncodedRowBatchCount() const
+ {
+ return 0;
+ }
+
+ virtual i64 GetEncodedColumnarBatchCount() const
+ {
+ return 0;
+ }
+
[[nodiscard]] virtual TFuture<void> Flush() = 0;
virtual bool WriteBatch(NTableClient::IUnversionedRowBatchPtr rowBatch) = 0;
diff --git a/yt/yt_proto/yt/client/chunk_client/proto/data_statistics.proto b/yt/yt_proto/yt/client/chunk_client/proto/data_statistics.proto
index 9696458aac..d8b939940c 100644
--- a/yt/yt_proto/yt/client/chunk_client/proto/data_statistics.proto
+++ b/yt/yt_proto/yt/client/chunk_client/proto/data_statistics.proto
@@ -18,6 +18,9 @@ message TDataStatistics
optional int64 unmerged_row_count = 9 [default = 0];
optional int64 unmerged_data_weight = 10 [default = 0];
+
+ optional int64 encoded_columnar_batch_count = 11 [default = 0];
+ optional int64 encoded_row_batch_count = 12 [default = 0];
}
////////////////////////////////////////////////////////////////////////////////