author     ivanmorozov <ivanmorozov@yandex-team.com>    2023-07-12 19:25:43 +0300
committer  ivanmorozov <ivanmorozov@yandex-team.com>    2023-07-12 19:25:43 +0300
commit     b904ee45712baf856ee4699f539d7a842b4e05f1 (patch)
tree       c5c7f66c707327c6c1b6de55e3b34fbb43891989
parent     e69209b1872f1e4517c87540108812fbdbc06de8 (diff)
KIKIMR-18453: a zero batch size is a correct case, but we have to use the chunks count limit
-rw-r--r--  ydb/core/formats/arrow/size_calcer.cpp                       | 12
-rw-r--r--  ydb/core/formats/arrow/ut/ut_size_calcer.cpp                 | 20
-rw-r--r--  ydb/core/tx/columnshard/columnshard__scan.cpp                | 10
-rw-r--r--  ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp  |  2
-rw-r--r--  ydb/library/chunks_limiter/chunks_limiter.cpp                |  3
5 files changed, 31 insertions(+), 16 deletions(-)
diff --git a/ydb/core/formats/arrow/size_calcer.cpp b/ydb/core/formats/arrow/size_calcer.cpp
index 296dd5ab94..26e39be2ba 100644
--- a/ydb/core/formats/arrow/size_calcer.cpp
+++ b/ydb/core/formats/arrow/size_calcer.cpp
@@ -159,25 +159,25 @@ ui64 GetArrayDataSizeImpl<arrow::NullType>(const std::shared_ptr<arrow::Array>&
template <>
ui64 GetArrayDataSizeImpl<arrow::StringType>(const std::shared_ptr<arrow::Array>& column) {
auto typedColumn = std::static_pointer_cast<arrow::StringArray>(column);
- return typedColumn->total_values_length();
+ return typedColumn->total_values_length() + sizeof(arrow::StringArray::offset_type) * column->length();
}
template <>
ui64 GetArrayDataSizeImpl<arrow::LargeStringType>(const std::shared_ptr<arrow::Array>& column) {
- auto typedColumn = std::static_pointer_cast<arrow::StringArray>(column);
- return typedColumn->total_values_length();
+ auto typedColumn = std::static_pointer_cast<arrow::LargeStringArray>(column);
+ return typedColumn->total_values_length() + sizeof(arrow::LargeStringArray::offset_type) * column->length();
}
template <>
ui64 GetArrayDataSizeImpl<arrow::BinaryType>(const std::shared_ptr<arrow::Array>& column) {
auto typedColumn = std::static_pointer_cast<arrow::BinaryArray>(column);
- return typedColumn->total_values_length();
+ return typedColumn->total_values_length() + sizeof(arrow::BinaryArray::offset_type) * column->length();
}
template <>
ui64 GetArrayDataSizeImpl<arrow::LargeBinaryType>(const std::shared_ptr<arrow::Array>& column) {
- auto typedColumn = std::static_pointer_cast<arrow::BinaryArray>(column);
- return typedColumn->total_values_length();
+ auto typedColumn = std::static_pointer_cast<arrow::LargeBinaryArray>(column);
+ return typedColumn->total_values_length() + sizeof(arrow::LargeBinaryArray::offset_type) * column->length();
}
template <>
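
The size_calcer.cpp hunks above make two fixes at once: the Large* specializations previously cast to the non-large array types (a copy-paste bug), and all four variants counted only total_values_length(), i.e. the concatenated value bytes, so a batch of empty strings was reported as zero bytes. The new formula also charges the offsets buffer, one offset entry per row. A minimal standalone sketch of that formula, assuming Arrow's public C++ API (uint64_t stands in for YDB's ui64; GetArrayDataSizeImpl itself is YDB-specific):

#include <cstdint>
#include <memory>
#include <arrow/api.h>

// Value bytes plus the offsets buffer: one int32_t offset per row for
// StringArray (int64_t for LargeStringArray). This is why 2048 empty
// strings are no longer accounted as zero bytes.
uint64_t StringArrayDataSize(const std::shared_ptr<arrow::StringArray>& arr) {
    return static_cast<uint64_t>(arr->total_values_length())
         + sizeof(arrow::StringArray::offset_type) * static_cast<uint64_t>(arr->length());
}
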
diff --git a/ydb/core/formats/arrow/ut/ut_size_calcer.cpp b/ydb/core/formats/arrow/ut/ut_size_calcer.cpp
index b7c959e325..879827e124 100644
--- a/ydb/core/formats/arrow/ut/ut_size_calcer.cpp
+++ b/ydb/core/formats/arrow/ut/ut_size_calcer.cpp
@@ -16,7 +16,7 @@ Y_UNIT_TEST_SUITE(SizeCalcer) {
"field", NConstruction::TStringPoolFiller(8, 512));
std::shared_ptr<arrow::RecordBatch> batch = NConstruction::TRecordBatchConstructor({ column }).BuildBatch(2048);
Cerr << GetBatchDataSize(batch) << Endl;
- UNIT_ASSERT(GetBatchDataSize(batch) == 2048 * 512);
+ UNIT_ASSERT(GetBatchDataSize(batch) == 2048 * 512 + 2048 * 4);
}
Y_UNIT_TEST(DictionaryStrings) {
@@ -24,7 +24,23 @@ Y_UNIT_TEST_SUITE(SizeCalcer) {
"field", NConstruction::TStringPoolFiller(8, 512));
std::shared_ptr<arrow::RecordBatch> batch = NConstruction::TRecordBatchConstructor({ column }).BuildBatch(2048);
Cerr << GetBatchDataSize(batch) << Endl;
- UNIT_ASSERT(GetBatchDataSize(batch) == 8 * 512 + 2048);
+ UNIT_ASSERT(GetBatchDataSize(batch) == 8 * 512 + 2048 + 4 * 8);
+ }
+
+ Y_UNIT_TEST(ZeroSimpleStrings) {
+ NConstruction::IArrayBuilder::TPtr column = std::make_shared<NConstruction::TSimpleArrayConstructor<NConstruction::TStringPoolFiller>>(
+ "field", NConstruction::TStringPoolFiller(1, 0));
+ std::shared_ptr<arrow::RecordBatch> batch = NConstruction::TRecordBatchConstructor({ column }).BuildBatch(2048);
+ Cerr << GetBatchDataSize(batch) << Endl;
+ UNIT_ASSERT(GetBatchDataSize(batch) == 2048 * 4);
+ }
+
+ Y_UNIT_TEST(ZeroDictionaryStrings) {
+ NConstruction::IArrayBuilder::TPtr column = std::make_shared<NConstruction::TDictionaryArrayConstructor<NConstruction::TStringPoolFiller>>(
+ "field", NConstruction::TStringPoolFiller(1, 0));
+ std::shared_ptr<arrow::RecordBatch> batch = NConstruction::TRecordBatchConstructor({ column }).BuildBatch(2048);
+ Cerr << GetBatchDataSize(batch) << Endl;
+ UNIT_ASSERT(GetBatchDataSize(batch) == 8 + 2048);
}
Y_UNIT_TEST(SimpleInt64) {
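
The updated test expectations follow directly from the new formula. As a worked check (a sketch, assuming one int32 offset per row and one-byte dictionary indices, as the assertions imply):

// SimpleStrings:         2048 rows * 512 value bytes + 2048 * 4 offset bytes
// DictionaryStrings:     dictionary of 8 distinct values (8 * 512 value bytes
//                        + 8 * 4 offset bytes) + 2048 one-byte indices
// ZeroSimpleStrings:     0 value bytes + 2048 * 4 offset bytes = 8192
// ZeroDictionaryStrings: 2048 one-byte indices + an 8-byte dictionary term
//                        (its exact breakdown depends on TStringPoolFiller
//                        internals, which are not shown in this patch)
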
diff --git a/ydb/core/tx/columnshard/columnshard__scan.cpp b/ydb/core/tx/columnshard/columnshard__scan.cpp
index 94c184a6c3..e63df783da 100644
--- a/ydb/core/tx/columnshard/columnshard__scan.cpp
+++ b/ydb/core/tx/columnshard/columnshard__scan.cpp
@@ -200,6 +200,7 @@ private:
void HandleScan(TEvKqpCompute::TEvScanDataAck::TPtr& ev) {
auto g = Stats.MakeGuard("ack");
+ Y_VERIFY(!AckReceivedInstant);
AckReceivedInstant = TMonotonic::Now();
if (!ComputeActorId) {
ComputeActorId = ev->Sender;
@@ -208,6 +209,7 @@ private:
Y_VERIFY(ev->Get()->Generation == ScanGen);
ChunksLimiter = TChunksLimiter(ev->Get()->FreeSpace, ev->Get()->MaxChunksCount);
+ Y_VERIFY(ev->Get()->MaxChunksCount == 1);
ACFL_DEBUG("event", "TEvScanDataAck")("info", ChunksLimiter.DebugString());
if (ScanIterator && !!ScanIterator->GetAvailableResultsCount() && !*ScanIterator->GetAvailableResultsCount()) {
ScanCountersPool.OnEmptyAck();
@@ -315,6 +317,7 @@ private:
Result->ArrowBatch = batch;
Rows += batch->num_rows();
Bytes += NArrow::GetBatchDataSize(batch);
+ ACFL_DEBUG("stage", "data_format")("batch_size", NArrow::GetBatchDataSize(batch))("num_rows", numRows)("batch_columns", JoinSeq(",", batch->schema()->field_names()));
break;
}
} // switch DataFormat
@@ -500,9 +503,6 @@ private:
<< " finished: " << Result->Finished << " pageFault: " << Result->PageFault
<< " arrow schema:\n" << (Result->ArrowBatch ? Result->ArrowBatch->schema()->ToString() : ""));
- Y_VERIFY(ChunksLimiter.Take(Bytes));
- Result->RequestedBytesLimitReached = !ChunksLimiter.HasMore();
-
Finished = Result->Finished;
if (Finished) {
Stats.Finish();
@@ -513,10 +513,12 @@ private:
<< " finished: " << Result->Finished << " pageFault: " << Result->PageFault
<< " stats:" << Stats.DebugString();
} else {
+ Y_VERIFY(ChunksLimiter.Take(Bytes));
+ Result->RequestedBytesLimitReached = !ChunksLimiter.HasMore();
Y_VERIFY(AckReceivedInstant);
ScanCountersPool.AckWaitingInfo(TMonotonic::Now() - *AckReceivedInstant);
- AckReceivedInstant.reset();
}
+ AckReceivedInstant.reset();
Send(ComputeActorId, Result.Release(), IEventHandle::FlagTrackDelivery); // TODO: FlagSubscribeOnSession ?
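
Two behavioral changes are folded into the columnshard__scan.cpp hunks: Take() and RequestedBytesLimitReached now run only on the non-finished path, so the final result no longer consumes budget, and AckReceivedInstant is reset on every send (its emptiness is asserted when the next ack arrives). A condensed sketch of the resulting handshake, with names taken from the diff (an illustration, not the actual actor code):

void OnScanDataAck(ui64 freeSpace, ui32 maxChunksCount) {
    Y_VERIFY(!AckReceivedInstant);            // at most one outstanding ack
    AckReceivedInstant = TMonotonic::Now();
    ChunksLimiter = TChunksLimiter(freeSpace, maxChunksCount);
}

void SendResult() {
    if (!Finished) {
        Y_VERIFY(ChunksLimiter.Take(Bytes));  // charge bytes and one chunk
        Result->RequestedBytesLimitReached = !ChunksLimiter.HasMore();
    }
    AckReceivedInstant.reset();               // reset on every send path
    Send(ComputeActorId, Result.Release(), IEventHandle::FlagTrackDelivery);
}
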
diff --git a/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp b/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp
index 7f71a92752..bd763d6646 100644
--- a/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp
+++ b/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp
@@ -2452,7 +2452,7 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) {
auto scanActorId = ActorIdFromProto(msg.GetScanActorId());
ui32 resultLimit = 1024 * 1024;
- runtime.Send(new IEventHandle(scanActorId, sender, new NKqp::TEvKqpCompute::TEvScanDataAck(resultLimit)));
+ runtime.Send(new IEventHandle(scanActorId, sender, new NKqp::TEvKqpCompute::TEvScanDataAck(resultLimit, 0, 1)));
auto scan = runtime.GrabEdgeEvent<NKqp::TEvKqpCompute::TEvScanData>(handle);
auto batchStats = scan->ArrowBatch;
UNIT_ASSERT(batchStats);
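
The test now passes a generation and an explicit MaxChunksCount of 1, matching the new Y_VERIFY in the scan actor. The assumed shape of the constructor, reconstructed from this call site alone (the real declaration lives in the KQP compute events header and may differ):

// Hypothetical reconstruction: byte budget, scan generation, and the number
// of chunks the scan may send before waiting for the next ack.
TEvScanDataAck(ui64 freeSpace, ui32 generation = 0, ui32 maxChunksCount = Max<ui32>());
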
diff --git a/ydb/library/chunks_limiter/chunks_limiter.cpp b/ydb/library/chunks_limiter/chunks_limiter.cpp
index 9efd3e0347..3e39f243d7 100644
--- a/ydb/library/chunks_limiter/chunks_limiter.cpp
+++ b/ydb/library/chunks_limiter/chunks_limiter.cpp
@@ -12,9 +12,6 @@ TString TChunksLimiter::DebugString() const {
}
bool TChunksLimiter::Take(const ui64 bytes) {
- if (!bytes) {
- return true;
- }
if (!HasMore()) {
return false;
}