diff options
author | ivanmorozov <ivanmorozov@yandex-team.com> | 2023-07-12 19:25:43 +0300 |
---|---|---|
committer | ivanmorozov <ivanmorozov@yandex-team.com> | 2023-07-12 19:25:43 +0300 |
commit | b904ee45712baf856ee4699f539d7a842b4e05f1 (patch) | |
tree | c5c7f66c707327c6c1b6de55e3b34fbb43891989 | |
parent | e69209b1872f1e4517c87540108812fbdbc06de8 (diff) | |
download | ydb-b904ee45712baf856ee4699f539d7a842b4e05f1.tar.gz |
KIKIMR-18453: it's a correct case for zero batch size, but we have to use the chunks count limit
-rw-r--r-- | ydb/core/formats/arrow/size_calcer.cpp | 12 | ||||
-rw-r--r-- | ydb/core/formats/arrow/ut/ut_size_calcer.cpp | 20 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/columnshard__scan.cpp | 10 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp | 2 | ||||
-rw-r--r-- | ydb/library/chunks_limiter/chunks_limiter.cpp | 3 |
5 files changed, 31 insertions, 16 deletions
diff --git a/ydb/core/formats/arrow/size_calcer.cpp b/ydb/core/formats/arrow/size_calcer.cpp index 296dd5ab94..26e39be2ba 100644 --- a/ydb/core/formats/arrow/size_calcer.cpp +++ b/ydb/core/formats/arrow/size_calcer.cpp @@ -159,25 +159,25 @@ ui64 GetArrayDataSizeImpl<arrow::NullType>(const std::shared_ptr<arrow::Array>& template <> ui64 GetArrayDataSizeImpl<arrow::StringType>(const std::shared_ptr<arrow::Array>& column) { auto typedColumn = std::static_pointer_cast<arrow::StringArray>(column); - return typedColumn->total_values_length(); + return typedColumn->total_values_length() + sizeof(arrow::StringArray::offset_type) * column->length(); } template <> ui64 GetArrayDataSizeImpl<arrow::LargeStringType>(const std::shared_ptr<arrow::Array>& column) { - auto typedColumn = std::static_pointer_cast<arrow::StringArray>(column); - return typedColumn->total_values_length(); + auto typedColumn = std::static_pointer_cast<arrow::LargeStringArray>(column); + return typedColumn->total_values_length() + sizeof(arrow::LargeStringArray::offset_type) * column->length(); } template <> ui64 GetArrayDataSizeImpl<arrow::BinaryType>(const std::shared_ptr<arrow::Array>& column) { auto typedColumn = std::static_pointer_cast<arrow::BinaryArray>(column); - return typedColumn->total_values_length(); + return typedColumn->total_values_length() + sizeof(arrow::BinaryArray::offset_type) * column->length(); } template <> ui64 GetArrayDataSizeImpl<arrow::LargeBinaryType>(const std::shared_ptr<arrow::Array>& column) { - auto typedColumn = std::static_pointer_cast<arrow::BinaryArray>(column); - return typedColumn->total_values_length(); + auto typedColumn = std::static_pointer_cast<arrow::LargeBinaryArray>(column); + return typedColumn->total_values_length() + sizeof(arrow::LargeBinaryArray::offset_type) * column->length(); } template <> diff --git a/ydb/core/formats/arrow/ut/ut_size_calcer.cpp b/ydb/core/formats/arrow/ut/ut_size_calcer.cpp index b7c959e325..879827e124 100644 --- 
a/ydb/core/formats/arrow/ut/ut_size_calcer.cpp +++ b/ydb/core/formats/arrow/ut/ut_size_calcer.cpp @@ -16,7 +16,7 @@ Y_UNIT_TEST_SUITE(SizeCalcer) { "field", NConstruction::TStringPoolFiller(8, 512)); std::shared_ptr<arrow::RecordBatch> batch = NConstruction::TRecordBatchConstructor({ column }).BuildBatch(2048); Cerr << GetBatchDataSize(batch) << Endl; - UNIT_ASSERT(GetBatchDataSize(batch) == 2048 * 512); + UNIT_ASSERT(GetBatchDataSize(batch) == 2048 * 512 + 2048 * 4); } Y_UNIT_TEST(DictionaryStrings) { @@ -24,7 +24,23 @@ Y_UNIT_TEST_SUITE(SizeCalcer) { "field", NConstruction::TStringPoolFiller(8, 512)); std::shared_ptr<arrow::RecordBatch> batch = NConstruction::TRecordBatchConstructor({ column }).BuildBatch(2048); Cerr << GetBatchDataSize(batch) << Endl; - UNIT_ASSERT(GetBatchDataSize(batch) == 8 * 512 + 2048); + UNIT_ASSERT(GetBatchDataSize(batch) == 8 * 512 + 2048 + 4 * 8); + } + + Y_UNIT_TEST(ZeroSimpleStrings) { + NConstruction::IArrayBuilder::TPtr column = std::make_shared<NConstruction::TSimpleArrayConstructor<NConstruction::TStringPoolFiller>>( + "field", NConstruction::TStringPoolFiller(1, 0)); + std::shared_ptr<arrow::RecordBatch> batch = NConstruction::TRecordBatchConstructor({ column }).BuildBatch(2048); + Cerr << GetBatchDataSize(batch) << Endl; + UNIT_ASSERT(GetBatchDataSize(batch) == 2048 * 4); + } + + Y_UNIT_TEST(ZeroDictionaryStrings) { + NConstruction::IArrayBuilder::TPtr column = std::make_shared<NConstruction::TDictionaryArrayConstructor<NConstruction::TStringPoolFiller>>( + "field", NConstruction::TStringPoolFiller(1, 0)); + std::shared_ptr<arrow::RecordBatch> batch = NConstruction::TRecordBatchConstructor({ column }).BuildBatch(2048); + Cerr << GetBatchDataSize(batch) << Endl; + UNIT_ASSERT(GetBatchDataSize(batch) == 8 + 2048); } Y_UNIT_TEST(SimpleInt64) { diff --git a/ydb/core/tx/columnshard/columnshard__scan.cpp b/ydb/core/tx/columnshard/columnshard__scan.cpp index 94c184a6c3..e63df783da 100644 --- 
a/ydb/core/tx/columnshard/columnshard__scan.cpp +++ b/ydb/core/tx/columnshard/columnshard__scan.cpp @@ -200,6 +200,7 @@ private: void HandleScan(TEvKqpCompute::TEvScanDataAck::TPtr& ev) { auto g = Stats.MakeGuard("ack"); + Y_VERIFY(!AckReceivedInstant); AckReceivedInstant = TMonotonic::Now(); if (!ComputeActorId) { ComputeActorId = ev->Sender; @@ -208,6 +209,7 @@ private: Y_VERIFY(ev->Get()->Generation == ScanGen); ChunksLimiter = TChunksLimiter(ev->Get()->FreeSpace, ev->Get()->MaxChunksCount); + Y_VERIFY(ev->Get()->MaxChunksCount == 1); ACFL_DEBUG("event", "TEvScanDataAck")("info", ChunksLimiter.DebugString()); if (ScanIterator && !!ScanIterator->GetAvailableResultsCount() && !*ScanIterator->GetAvailableResultsCount()) { ScanCountersPool.OnEmptyAck(); @@ -315,6 +317,7 @@ private: Result->ArrowBatch = batch; Rows += batch->num_rows(); Bytes += NArrow::GetBatchDataSize(batch); + ACFL_DEBUG("stage", "data_format")("batch_size", NArrow::GetBatchDataSize(batch))("num_rows", numRows)("batch_columns", JoinSeq(",", batch->schema()->field_names())); break; } } // switch DataFormat @@ -500,9 +503,6 @@ private: << " finished: " << Result->Finished << " pageFault: " << Result->PageFault << " arrow schema:\n" << (Result->ArrowBatch ? Result->ArrowBatch->schema()->ToString() : "")); - Y_VERIFY(ChunksLimiter.Take(Bytes)); - Result->RequestedBytesLimitReached = !ChunksLimiter.HasMore(); - Finished = Result->Finished; if (Finished) { Stats.Finish(); @@ -513,10 +513,12 @@ private: << " finished: " << Result->Finished << " pageFault: " << Result->PageFault << " stats:" << Stats.DebugString(); } else { + Y_VERIFY(ChunksLimiter.Take(Bytes)); + Result->RequestedBytesLimitReached = !ChunksLimiter.HasMore(); Y_VERIFY(AckReceivedInstant); ScanCountersPool.AckWaitingInfo(TMonotonic::Now() - *AckReceivedInstant); - AckReceivedInstant.reset(); } + AckReceivedInstant.reset(); Send(ComputeActorId, Result.Release(), IEventHandle::FlagTrackDelivery); // TODO: FlagSubscribeOnSession ? 
diff --git a/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp b/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp index 7f71a92752..bd763d6646 100644 --- a/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp +++ b/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp @@ -2452,7 +2452,7 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) { auto scanActorId = ActorIdFromProto(msg.GetScanActorId()); ui32 resultLimit = 1024 * 1024; - runtime.Send(new IEventHandle(scanActorId, sender, new NKqp::TEvKqpCompute::TEvScanDataAck(resultLimit))); + runtime.Send(new IEventHandle(scanActorId, sender, new NKqp::TEvKqpCompute::TEvScanDataAck(resultLimit, 0, 1))); auto scan = runtime.GrabEdgeEvent<NKqp::TEvKqpCompute::TEvScanData>(handle); auto batchStats = scan->ArrowBatch; UNIT_ASSERT(batchStats); diff --git a/ydb/library/chunks_limiter/chunks_limiter.cpp b/ydb/library/chunks_limiter/chunks_limiter.cpp index 9efd3e0347..3e39f243d7 100644 --- a/ydb/library/chunks_limiter/chunks_limiter.cpp +++ b/ydb/library/chunks_limiter/chunks_limiter.cpp @@ -12,9 +12,6 @@ TString TChunksLimiter::DebugString() const { } bool TChunksLimiter::Take(const ui64 bytes) { - if (!bytes) { - return true; - } if (!HasMore()) { return false; } |