author    ivanmorozov <ivanmorozov@yandex-team.com> 2023-11-19 23:41:42 +0300
committer ivanmorozov <ivanmorozov@yandex-team.com> 2023-11-19 23:54:21 +0300
commit    0bffcf36ed900c47b3abe5b0aeac86dcfb00a16f (patch)
tree      c716c1b61ec379ffcebd7e85dfa7c2701f252ab1
parent    c0e09264adc48044d0fd225374245e9b06a4d78c (diff)
download  ydb-0bffcf36ed900c47b3abe5b0aeac86dcfb00a16f.tar.gz
KIKIMR-19880: move interval data construction into a separate thread; don't merge results into one pack
-rw-r--r--  ydb/core/formats/arrow/arrow_helpers.cpp                                  |   2
-rw-r--r--  ydb/core/formats/arrow/permutations.cpp                                   |  30
-rw-r--r--  ydb/core/formats/arrow/permutations.h                                     |  30
-rw-r--r--  ydb/core/formats/arrow/program.cpp                                        |  11
-rw-r--r--  ydb/core/tx/columnshard/columnshard__index_scan.cpp                       |   7
-rw-r--r--  ydb/core/tx/columnshard/columnshard__scan.cpp                             | 102
-rw-r--r--  ydb/core/tx/columnshard/columnshard__scan.h                               |  56
-rw-r--r--  ydb/core/tx/columnshard/columnshard__stats_scan.cpp                       |   6
-rw-r--r--  ydb/core/tx/columnshard/engines/reader/plain_reader/columns_set.cpp       |   6
-rw-r--r--  ydb/core/tx/columnshard/engines/reader/plain_reader/columns_set.h         |  39
-rw-r--r--  ydb/core/tx/columnshard/engines/reader/plain_reader/filter_assembler.cpp  |   2
-rw-r--r--  ydb/core/tx/columnshard/engines/reader/plain_reader/interval.cpp          |  50
-rw-r--r--  ydb/core/tx/columnshard/engines/reader/plain_reader/interval.h            |  22
-rw-r--r--  ydb/core/tx/columnshard/engines/reader/plain_reader/plain_read_data.cpp   |  12
-rw-r--r--  ydb/core/tx/columnshard/engines/reader/plain_reader/scanner.cpp           |  18
-rw-r--r--  ydb/core/tx/columnshard/engines/reader/plain_reader/scanner.h             |   2
-rw-r--r--  ydb/core/tx/columnshard/engines/reader/read_context.h                     |   4
17 files changed, 221 insertions, 178 deletions
diff --git a/ydb/core/formats/arrow/arrow_helpers.cpp b/ydb/core/formats/arrow/arrow_helpers.cpp
index 3e3b9a30fe..6dbcc045ee 100644
--- a/ydb/core/formats/arrow/arrow_helpers.cpp
+++ b/ydb/core/formats/arrow/arrow_helpers.cpp
@@ -188,7 +188,7 @@ std::shared_ptr<arrow::RecordBatch> ExtractColumnsValidate(const std::shared_ptr
auto srcSchema = srcBatch->schema();
for (auto& name : columnNames) {
const int pos = srcSchema->GetFieldIndex(name);
- AFL_VERIFY(pos >= 0)("field_name", name)("fields", srcBatch->schema()->ToString());
+ AFL_VERIFY(pos >= 0)("field_name", name)("names", JoinSeq(",", columnNames))("fields", JoinSeq(",", srcBatch->schema()->field_names()));
fields.push_back(srcSchema->field(pos));
columns.push_back(srcBatch->column(pos));
}
diff --git a/ydb/core/formats/arrow/permutations.cpp b/ydb/core/formats/arrow/permutations.cpp
index 270a6c505b..ce92ca26c7 100644
--- a/ydb/core/formats/arrow/permutations.cpp
+++ b/ydb/core/formats/arrow/permutations.cpp
@@ -4,6 +4,7 @@
#include "size_calcer.h"
#include <ydb/core/formats/arrow/common/validation.h>
#include <contrib/libs/apache/arrow/cpp/src/arrow/array/builder_primitive.h>
+#include <ydb/library/services/services.pb.h>
#include <library/cpp/actors/core/log.h>
namespace NKikimr::NArrow {
@@ -181,6 +182,7 @@ bool THashConstructor::BuildHashUI64(std::shared_ptr<arrow::RecordBatch>& batch,
if (fieldNames.size() == 1) {
auto column = batch->GetColumnByName(fieldNames.front());
if (!column) {
+ AFL_WARN(NKikimrServices::ARROW_HELPER)("event", "cannot_build_hash")("reason", "field_not_found")("field_name", fieldNames.front());
return false;
}
Y_ABORT_UNLESS(column);
@@ -202,6 +204,7 @@ bool THashConstructor::BuildHashUI64(std::shared_ptr<arrow::RecordBatch>& batch,
}
}
if (columns.empty()) {
+ AFL_WARN(NKikimrServices::ARROW_HELPER)("event", "cannot_build_hash")("reason", "fields_not_found")("field_names", JoinSeq(",", fieldNames));
return false;
}
for (i64 i = 0; i < batch->num_rows(); ++i) {
@@ -298,25 +301,37 @@ ui64 TShardedRecordBatch::GetMemorySize() const {
return NArrow::GetBatchMemorySize(RecordBatch);
}
+TShardedRecordBatch::TShardedRecordBatch(const std::shared_ptr<arrow::RecordBatch>& batch): RecordBatch(batch) {
+ AFL_VERIFY(RecordBatch);
+ SplittedByShards = {RecordBatch};
+}
+
+TShardedRecordBatch::TShardedRecordBatch(const std::shared_ptr<arrow::RecordBatch>& batch, std::vector<std::shared_ptr<arrow::RecordBatch>>&& splittedByShards)
+ : RecordBatch(batch)
+ , SplittedByShards(std::move(splittedByShards))
+{
+ AFL_VERIFY(RecordBatch);
+ AFL_VERIFY(SplittedByShards.size());
+}
+
std::vector<std::shared_ptr<arrow::RecordBatch>> TShardingSplitIndex::Apply(const std::shared_ptr<arrow::RecordBatch>& input) {
- Y_ABORT_UNLESS(input);
- Y_ABORT_UNLESS(input->num_rows() == RecordsCount);
+ AFL_VERIFY(input);
+ AFL_VERIFY(input->num_rows() == RecordsCount);
auto permutation = BuildPermutation();
auto resultBatch = NArrow::TStatusValidator::GetValid(arrow::compute::Take(input, *permutation)).record_batch();
- Y_ABORT_UNLESS(resultBatch->num_rows() == RecordsCount);
+ AFL_VERIFY(resultBatch->num_rows() == RecordsCount);
std::vector<std::shared_ptr<arrow::RecordBatch>> result;
ui64 startIndex = 0;
for (auto&& i : Remapping) {
result.emplace_back(resultBatch->Slice(startIndex, i.size()));
startIndex += i.size();
}
+ AFL_VERIFY(startIndex == RecordsCount);
return result;
}
NKikimr::NArrow::TShardedRecordBatch TShardingSplitIndex::Apply(const ui32 shardsCount, const std::shared_ptr<arrow::RecordBatch>& input, const std::string& hashColumnName) {
- if (!input) {
- return TShardedRecordBatch();
- }
+ AFL_VERIFY(input);
if (shardsCount == 1) {
return TShardedRecordBatch(input);
}
@@ -336,7 +351,8 @@ NKikimr::NArrow::TShardedRecordBatch TShardingSplitIndex::Apply(const ui32 shard
} else {
Y_ABORT_UNLESS(false);
}
- return TShardedRecordBatch(input, splitter->Apply(input));
+ auto resultBatch = NArrow::TStatusValidator::GetValid(input->RemoveColumn(input->schema()->GetFieldIndex(hashColumnName)));
+ return TShardedRecordBatch(resultBatch, splitter->Apply(resultBatch));
}
std::shared_ptr<arrow::UInt64Array> TShardingSplitIndex::BuildPermutation() const {
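Editorial sketch: the reworked TShardingSplitIndex::Apply above now requires a non-null input and strips the temporary hash column from the batch it returns. A minimal usage sketch, assuming (as the definitions in this file suggest) that THashConstructor and TShardingSplitIndex are declared in ydb/core/formats/arrow/permutations.h; the function name and parameters are illustrative only.

    #include <ydb/core/formats/arrow/permutations.h>

    // Build a synthetic ui64 hash column over the sharding columns and split the batch
    // into per-shard slices; the hash column itself is removed from the returned batch.
    NKikimr::NArrow::TShardedRecordBatch SplitForCompute(std::shared_ptr<arrow::RecordBatch> batch,
            const std::vector<std::string>& shardingColumns, const ui32 shardsCount) {
        using namespace NKikimr::NArrow;
        if (!THashConstructor::BuildHashUI64(batch, shardingColumns, "__compute_sharding_hash")) {
            return TShardedRecordBatch(batch);   // sharding columns missing: one unsharded pack
        }
        return TShardingSplitIndex::Apply(shardsCount, batch, "__compute_sharding_hash");
    }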
diff --git a/ydb/core/formats/arrow/permutations.h b/ydb/core/formats/arrow/permutations.h
index 6cba30c2bb..8acf5240fe 100644
--- a/ydb/core/formats/arrow/permutations.h
+++ b/ydb/core/formats/arrow/permutations.h
@@ -18,16 +18,30 @@ private:
YDB_READONLY_DEF(std::shared_ptr<arrow::RecordBatch>, RecordBatch);
YDB_READONLY_DEF(std::vector<std::shared_ptr<arrow::RecordBatch>>, SplittedByShards);
public:
- TShardedRecordBatch() = default;
- TShardedRecordBatch(const std::shared_ptr<arrow::RecordBatch>& batch)
- : RecordBatch(batch) {
- SplittedByShards = {RecordBatch};
+ TShardedRecordBatch(const std::shared_ptr<arrow::RecordBatch>& batch);
+
+ void Cut(const ui32 limit) {
+ RecordBatch = RecordBatch->Slice(0, limit);
+ for (auto&& i : SplittedByShards) {
+ if (i->num_rows() > limit) {
+ i = i->Slice(0, limit);
+ }
+ }
+ }
+
+ bool IsSharded() const {
+ return SplittedByShards.size() > 1;
}
- TShardedRecordBatch(const std::shared_ptr<arrow::RecordBatch>& batch, std::vector<std::shared_ptr<arrow::RecordBatch>>&& splittedByShards)
- : RecordBatch(batch)
- , SplittedByShards(std::move(splittedByShards))
- {
+ TShardedRecordBatch(const std::shared_ptr<arrow::RecordBatch>& batch, std::vector<std::shared_ptr<arrow::RecordBatch>>&& splittedByShards);
+
+ void StripColumns(const std::shared_ptr<arrow::Schema>& schema) {
+ if (RecordBatch) {
+ RecordBatch = NArrow::ExtractColumns(RecordBatch, schema);
+ }
+ for (auto&& i : SplittedByShards) {
+ i = NArrow::ExtractColumns(i, schema);
+ }
}
ui64 GetMemorySize() const;
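Editorial sketch: the new Cut() and StripColumns() helpers declared above operate on both the combined batch and every per-shard slice. A hedged example of how a caller might trim a result; note that Cut(limit) clips each shard slice to at most `limit` rows independently, so exact global limits are still accounted for by the caller (as TColumnShardScanIterator does below). The helper name is illustrative.

    // Illustrative only: keep the requested columns and enforce a row limit.
    void TrimResult(NKikimr::NArrow::TShardedRecordBatch& sharded,
            const std::shared_ptr<arrow::Schema>& resultSchema, const ui32 limit) {
        sharded.StripColumns(resultSchema);   // project to the requested columns
        if (sharded.GetRecordsCount() > limit) {
            sharded.Cut(limit);               // combined batch and each shard slice get clipped
        }
    }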
diff --git a/ydb/core/formats/arrow/program.cpp b/ydb/core/formats/arrow/program.cpp
index 1ca0ee34e0..66906851f8 100644
--- a/ydb/core/formats/arrow/program.cpp
+++ b/ydb/core/formats/arrow/program.cpp
@@ -895,12 +895,13 @@ std::shared_ptr<NArrow::TColumnFilter> TProgram::ApplyEarlyFilter(std::shared_pt
NArrow::TStatusValidator::Validate(step->ApplyAssignes(*datumBatch, NArrow::GetCustomExecContext()));
NArrow::TColumnFilter local = NArrow::TColumnFilter::BuildAllowFilter();
NArrow::TStatusValidator::Validate(step->MakeCombinedFilter(*datumBatch, local));
- *filter = filter->CombineSequentialAnd(local);
if (!useFilter) {
- break;
- }
- if (!local.Apply(batch)) {
- break;
+ *filter = filter->And(local);
+ } else {
+ *filter = filter->CombineSequentialAnd(local);
+ if (!local.Apply(batch)) {
+ break;
+ }
}
} catch (const std::exception& ex) {
AFL_VERIFY(false);
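Editorial sketch: the branch above now folds the per-step filter into the accumulated one in two different ways, which suggests the following rationale: when useFilter is false the batch is never shrunk, so each step filter is defined over the full row set and a plain And() is correct; when useFilter is true the filter is applied to the batch, so the next step's filter covers only the surviving rows and must be combined sequentially. A standalone, hedged illustration of the assumed semantics, with std::vector<bool> standing in for TColumnFilter.

    #include <cstddef>
    #include <vector>

    // Both filters are defined over the same, unfiltered row set: elementwise AND.
    std::vector<bool> And(const std::vector<bool>& a, const std::vector<bool>& b) {
        std::vector<bool> r(a.size());
        for (size_t i = 0; i < a.size(); ++i) {
            r[i] = a[i] && b[i];
        }
        return r;
    }

    // `next` was computed on the rows that survived `acc`: it has one entry per
    // accepted row and is spliced back into the accumulated filter positions.
    std::vector<bool> CombineSequentialAnd(const std::vector<bool>& acc, const std::vector<bool>& next) {
        std::vector<bool> r(acc.size(), false);
        size_t j = 0;
        for (size_t i = 0; i < acc.size(); ++i) {
            if (acc[i]) {
                r[i] = next[j++];
            }
        }
        return r;
    }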
diff --git a/ydb/core/tx/columnshard/columnshard__index_scan.cpp b/ydb/core/tx/columnshard/columnshard__index_scan.cpp
index dd52217086..79349592d1 100644
--- a/ydb/core/tx/columnshard/columnshard__index_scan.cpp
+++ b/ydb/core/tx/columnshard/columnshard__index_scan.cpp
@@ -34,14 +34,9 @@ void TColumnShardScanIterator::FillReadyResults() {
auto ready = IndexedData->ExtractReadyResults(MaxRowsInBatch);
i64 limitLeft = Context->GetReadMetadata()->Limit == 0 ? INT64_MAX : Context->GetReadMetadata()->Limit - ItemsRead;
for (size_t i = 0; i < ready.size() && limitLeft; ++i) {
- if (ready[i].GetResultBatch().num_rows() == 0 && !ready[i].GetLastReadKey()) {
- Y_ABORT_UNLESS(i + 1 == ready.size(), "Only last batch can be empty!");
- break;
- }
-
auto& batch = ReadyResults.emplace_back(std::move(ready[i]));
if (batch.GetResultBatch().num_rows() > limitLeft) {
- batch.Slice(0, limitLeft);
+ batch.Cut(limitLeft);
}
limitLeft -= batch.GetResultBatch().num_rows();
ItemsRead += batch.GetResultBatch().num_rows();
diff --git a/ydb/core/tx/columnshard/columnshard__scan.cpp b/ydb/core/tx/columnshard/columnshard__scan.cpp
index 22c9e4f06f..0a54eb28b4 100644
--- a/ydb/core/tx/columnshard/columnshard__scan.cpp
+++ b/ydb/core/tx/columnshard/columnshard__scan.cpp
@@ -97,6 +97,7 @@ public:
, Stats(NTracing::TTraceClient::GetLocalClient("SHARD", ::ToString(TabletId)/*, "SCAN_TXID:" + ::ToString(TxId)*/))
, ComputeShardingPolicy(computeShardingPolicy)
{
+ KeyYqlSchema = ReadMetadataRanges[ReadMetadataIndex]->GetKeyYqlSchema();
}
void Bootstrap(const TActorContext& ctx) {
@@ -219,40 +220,42 @@ private:
return false;
}
- auto batch = result.GetResultBatchPtrVerified();
- int numRows = batch->num_rows();
- int numColumns = batch->num_columns();
- if (!numRows) {
+ if (!result.GetRecordsCount()) {
ACFL_DEBUG("stage", "got empty batch")("iterator", ScanIterator->DebugString());
return true;
}
- ACFL_DEBUG("stage", "ready result")("iterator", ScanIterator->DebugString())("format", NKikimrDataEvents::EDataFormat_Name(DataFormat))
- ("columns", numColumns)("rows", numRows);
+ auto& shardedBatch = result.GetShardedBatch();
+ auto batch = shardedBatch.GetRecordBatch();
+ int numRows = batch->num_rows();
+ int numColumns = batch->num_columns();
+ ACFL_DEBUG("stage", "ready result")("iterator", ScanIterator->DebugString())("columns", numColumns)("rows", result.GetRecordsCount());
- std::optional<NArrow::TShardedRecordBatch> shardedBatch;
- if (NArrow::THashConstructor::BuildHashUI64(batch, ComputeShardingPolicy.GetColumnNames(), "__compute_sharding_hash")) {
- shardedBatch = NArrow::TShardingSplitIndex::Apply(ComputeShardingPolicy.GetShardsCount(), batch, "__compute_sharding_hash");
- }
AFL_VERIFY(DataFormat == NKikimrDataEvents::FORMAT_ARROW);
{
MakeResult(0);
- if (ComputeShardingPolicy.IsEnabled() && shardedBatch) {
- AFL_INFO(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "compute_sharding_success")("count", shardedBatch->GetSplittedByShards().size());
- Result->SplittedBatches = shardedBatch->GetSplittedByShards();
+ if (shardedBatch.IsSharded()) {
+ AFL_INFO(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "compute_sharding_success")("count", shardedBatch.GetSplittedByShards().size())("info", ComputeShardingPolicy.DebugString());
+ Result->SplittedBatches = shardedBatch.GetSplittedByShards();
+ Result->ArrowBatch = shardedBatch.GetRecordBatch();
} else {
- AFL_WARN(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "compute_sharding_problems")("sb", !!shardedBatch)("policy", ComputeShardingPolicy.IsEnabled());
- Result->ArrowBatch = batch;
+ if (ComputeShardingPolicy.IsEnabled()) {
+ AFL_WARN(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "compute_sharding_problems")("info", ComputeShardingPolicy.DebugString());
+ }
+ Result->ArrowBatch = shardedBatch.GetRecordBatch();
}
Rows += batch->num_rows();
Bytes += NArrow::GetBatchDataSize(batch);
ACFL_DEBUG("stage", "data_format")("batch_size", NArrow::GetBatchDataSize(batch))("num_rows", numRows)("batch_columns", JoinSeq(",", batch->schema()->field_names()));
}
- if (result.GetLastReadKey()) {
- Result->LastKey = ConvertLastKey(result.GetLastReadKey());
- } else {
- Y_ABORT_UNLESS(numRows == 0, "Got non-empty result batch without last key");
+ if (CurrentLastReadKey) {
+ NOlap::NIndexedReader::TSortableBatchPosition pNew(result.GetLastReadKey(), 0, result.GetLastReadKey()->schema()->field_names(), {}, false);
+ NOlap::NIndexedReader::TSortableBatchPosition pOld(CurrentLastReadKey, 0, CurrentLastReadKey->schema()->field_names(), {}, false);
+ AFL_VERIFY(pOld < pNew);
}
+ CurrentLastReadKey = result.GetLastReadKey();
+
+ Result->LastKey = ConvertLastKey(result.GetLastReadKey());
SendResult(false, false);
ACFL_DEBUG("stage", "finished")("iterator", ScanIterator->DebugString());
return true;
@@ -533,6 +536,7 @@ private:
TChunksLimiter ChunksLimiter;
THolder<TEvKqpCompute::TEvScanData> Result;
+ std::shared_ptr<arrow::RecordBatch> CurrentLastReadKey;
i64 InFlightReads = 0;
bool Finished = false;
@@ -874,50 +878,48 @@ namespace NKikimr::NOlap {
class TCurrentBatch {
private:
std::vector<TPartialReadResult> Results;
+ ui64 RecordsCount = 0;
public:
+ ui64 GetRecordsCount() const {
+ return RecordsCount;
+ }
+
void AddChunk(TPartialReadResult&& res) {
+ RecordsCount += res.GetRecordsCount();
Results.emplace_back(std::move(res));
}
- void FillResult(std::vector<TPartialReadResult>& result, const bool /*mergePartsToMax*/) const {
+ void FillResult(std::vector<TPartialReadResult>& result, const bool mergePartsToMax) const {
if (Results.empty()) {
return;
}
- std::vector<std::shared_ptr<arrow::RecordBatch>> batches;
- std::vector<std::shared_ptr<NResourceBroker::NSubscribe::TResourcesGuard>> guards;
- for (auto&& i : Results) {
- batches.emplace_back(i.GetResultBatchPtrVerified());
- guards.insert(guards.end(), i.GetResourcesGuards().begin(), i.GetResourcesGuards().end());
+ if (mergePartsToMax) {
+ std::vector<std::shared_ptr<arrow::RecordBatch>> batches;
+ std::vector<std::shared_ptr<NResourceBroker::NSubscribe::TResourcesGuard>> guards;
+ for (auto&& i : Results) {
+ batches.emplace_back(i.GetResultBatchPtrVerified());
+ guards.insert(guards.end(), i.GetResourcesGuards().begin(), i.GetResourcesGuards().end());
+ }
+ auto res = NArrow::CombineBatches(batches);
+ AFL_VERIFY(res);
+ result.emplace_back(TPartialReadResult(guards, NArrow::TShardedRecordBatch(res), Results.back().GetLastReadKey()));
+ } else {
+ for (auto&& i : Results) {
+ result.emplace_back(std::move(i));
+ }
}
- auto res = NArrow::CombineBatches(batches);
- AFL_VERIFY(res);
- result.emplace_back(TPartialReadResult(guards, res, Results.back().GetLastReadKey()));
}
};
-std::vector<NKikimr::NOlap::TPartialReadResult> TPartialReadResult::SplitResults(std::vector<TPartialReadResult>&& resultsExt, const ui32 /*maxRecordsInResult*/, const bool /*mergePartsToMax*/) {
+std::vector<NKikimr::NOlap::TPartialReadResult> TPartialReadResult::SplitResults(std::vector<TPartialReadResult>&& resultsExt, const ui32 maxRecordsInResult, const bool mergePartsToMax) {
+ std::vector<TCurrentBatch> resultBatches;
TCurrentBatch currentBatch;
for (auto&& i : resultsExt) {
+ AFL_VERIFY(i.GetRecordsCount());
currentBatch.AddChunk(std::move(i));
- }
- std::vector<TPartialReadResult> result;
- currentBatch.FillResult(result, true);
- /*
- std::vector<TCurrentBatch> resultBatches;
- for (auto&& i : resultsExt) {
- std::shared_ptr<arrow::RecordBatch> currentBatchSplitting = i.ResultBatch;
- while (currentBatchSplitting && currentBatchSplitting->num_rows()) {
- const ui32 currentRecordsCount = currentBatch.GetRecordsCount();
- if (currentRecordsCount + currentBatchSplitting->num_rows() < maxRecordsInResult) {
- currentBatch.AddChunk(currentBatchSplitting, i.GetResourcesGuards());
- currentBatchSplitting = nullptr;
- } else {
- auto currentSlice = currentBatchSplitting->Slice(0, maxRecordsInResult - currentRecordsCount);
- currentBatch.AddChunk(currentSlice, i.GetResourcesGuards());
- resultBatches.emplace_back(std::move(currentBatch));
- currentBatch = TCurrentBatch();
- currentBatchSplitting = currentBatchSplitting->Slice(maxRecordsInResult - currentRecordsCount);
- }
+ if (currentBatch.GetRecordsCount() >= maxRecordsInResult) {
+ resultBatches.emplace_back(std::move(currentBatch));
+ currentBatch = TCurrentBatch();
}
}
if (currentBatch.GetRecordsCount()) {
@@ -926,10 +928,8 @@ std::vector<NKikimr::NOlap::TPartialReadResult> TPartialReadResult::SplitResults
std::vector<TPartialReadResult> result;
for (auto&& i : resultBatches) {
- Y_UNUSED(mergePartsToMax);
- i.FillResult(result, true);
+ i.FillResult(result, mergePartsToMax);
}
- */
return result;
}
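Editorial sketch: SplitResults above now accumulates partial results into packs of roughly maxRecordsInResult records and, in line with the commit message, only merges a pack into a single batch when mergePartsToMax is set. A standalone sketch of the grouping step; TChunk is a stand-in for TPartialReadResult.

    #include <cstdint>
    #include <vector>

    struct TChunk {
        uint64_t RecordsCount = 0;
    };

    // Group chunks into packs: a pack is closed as soon as it reaches maxRecordsInResult,
    // and a final pack collects whatever remains.
    std::vector<std::vector<TChunk>> GroupIntoPacks(std::vector<TChunk>&& chunks, const uint32_t maxRecordsInResult) {
        std::vector<std::vector<TChunk>> packs;
        std::vector<TChunk> current;
        uint64_t currentRecords = 0;
        for (auto&& c : chunks) {
            currentRecords += c.RecordsCount;
            current.emplace_back(std::move(c));
            if (currentRecords >= maxRecordsInResult) {
                packs.emplace_back(std::move(current));
                current = {};
                currentRecords = 0;
            }
        }
        if (currentRecords) {
            packs.emplace_back(std::move(current));
        }
        return packs;
    }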
diff --git a/ydb/core/tx/columnshard/columnshard__scan.h b/ydb/core/tx/columnshard/columnshard__scan.h
index d8269927b1..7576cb3247 100644
--- a/ydb/core/tx/columnshard/columnshard__scan.h
+++ b/ydb/core/tx/columnshard/columnshard__scan.h
@@ -13,13 +13,25 @@ namespace NKikimr::NOlap {
class TPartialReadResult {
private:
YDB_READONLY_DEF(std::vector<std::shared_ptr<NResourceBroker::NSubscribe::TResourcesGuard>>, ResourcesGuards);
- std::shared_ptr<arrow::RecordBatch> ResultBatch;
+ NArrow::TShardedRecordBatch ResultBatch;
// This 1-row batch contains the last key that was read while producing the ResultBatch.
// NOTE: it might be different from the Key of last row in ResulBatch in case of filtering/aggregation/limit
std::shared_ptr<arrow::RecordBatch> LastReadKey;
public:
+ void Cut(const ui32 limit) {
+ ResultBatch.Cut(limit);
+ }
+
+ const arrow::RecordBatch& GetResultBatch() const {
+ return *ResultBatch.GetRecordBatch();
+ }
+
+ const std::shared_ptr<arrow::RecordBatch>& GetResultBatchPtrVerified() const {
+ return ResultBatch.GetRecordBatch();
+ }
+
const std::shared_ptr<NResourceBroker::NSubscribe::TResourcesGuard>& GetResourcesGuardOnly() const {
AFL_VERIFY(ResourcesGuards.size() == 1);
AFL_VERIFY(!!ResourcesGuards.front());
@@ -27,45 +39,19 @@ public:
}
ui64 GetMemorySize() const {
- return NArrow::GetBatchMemorySize(ResultBatch);
+ return ResultBatch.GetMemorySize();
}
ui64 GetRecordsCount() const {
- return ResultBatch ? ResultBatch->num_rows() : 0;
- }
-
- void StripColumns(const std::shared_ptr<arrow::Schema>& schema) {
- if (ResultBatch) {
- ResultBatch = NArrow::ExtractColumns(ResultBatch, schema);
- }
+ return ResultBatch.GetRecordsCount();
}
static std::vector<TPartialReadResult> SplitResults(std::vector<TPartialReadResult>&& resultsExt, const ui32 maxRecordsInResult, const bool mergePartsToMax);
- void Slice(const ui32 offset, const ui32 length) {
- ResultBatch = ResultBatch->Slice(offset, length);
- }
-
- void ApplyProgram(const NOlap::TProgramContainer& program) {
- if (!program.HasProgram()) {
- return;
- }
- auto status = program.ApplyProgram(ResultBatch);
- if (!status.ok()) {
- ErrorString = status.message();
- }
- }
-
- const std::shared_ptr<arrow::RecordBatch>& GetResultBatchPtrVerified() const {
- Y_ABORT_UNLESS(ResultBatch);
+ const NArrow::TShardedRecordBatch& GetShardedBatch() const {
return ResultBatch;
}
- const arrow::RecordBatch& GetResultBatch() const {
- Y_ABORT_UNLESS(ResultBatch);
- return *ResultBatch;
- }
-
const std::shared_ptr<arrow::RecordBatch>& GetLastReadKey() const {
return LastReadKey;
}
@@ -74,26 +60,26 @@ public:
explicit TPartialReadResult(
const std::vector<std::shared_ptr<NResourceBroker::NSubscribe::TResourcesGuard>>& resourcesGuards,
- std::shared_ptr<arrow::RecordBatch> batch, std::shared_ptr<arrow::RecordBatch> lastKey)
+ const NArrow::TShardedRecordBatch& batch, std::shared_ptr<arrow::RecordBatch> lastKey)
: ResourcesGuards(resourcesGuards)
, ResultBatch(batch)
, LastReadKey(lastKey) {
for (auto&& i : ResourcesGuards) {
AFL_VERIFY(i);
}
- Y_ABORT_UNLESS(ResultBatch);
- Y_ABORT_UNLESS(ResultBatch->num_rows());
+ Y_ABORT_UNLESS(ResultBatch.GetRecordsCount());
Y_ABORT_UNLESS(LastReadKey);
+ Y_ABORT_UNLESS(LastReadKey->num_rows() == 1);
}
explicit TPartialReadResult(
const std::shared_ptr<NResourceBroker::NSubscribe::TResourcesGuard>& resourcesGuards,
- std::shared_ptr<arrow::RecordBatch> batch, std::shared_ptr<arrow::RecordBatch> lastKey)
+ const NArrow::TShardedRecordBatch& batch, std::shared_ptr<arrow::RecordBatch> lastKey)
: TPartialReadResult(std::vector<std::shared_ptr<NResourceBroker::NSubscribe::TResourcesGuard>>({resourcesGuards}), batch, lastKey) {
AFL_VERIFY(resourcesGuards);
}
- explicit TPartialReadResult(std::shared_ptr<arrow::RecordBatch> batch, std::shared_ptr<arrow::RecordBatch> lastKey)
+ explicit TPartialReadResult(const NArrow::TShardedRecordBatch& batch, std::shared_ptr<arrow::RecordBatch> lastKey)
: TPartialReadResult(std::vector<std::shared_ptr<NResourceBroker::NSubscribe::TResourcesGuard>>(), batch, lastKey) {
}
};
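Editorial sketch: TPartialReadResult now carries its payload as an NArrow::TShardedRecordBatch and verifies a non-empty batch plus a one-row last-key batch at construction. A minimal hedged construction example; the helper name is illustrative.

    #include <ydb/core/tx/columnshard/columnshard__scan.h>

    // Wrap a plain record batch (treated as a single shard) into a partial result.
    // Preconditions checked by the constructor above: the batch is non-null and
    // non-empty, lastKey is a non-null batch with exactly one row.
    NKikimr::NOlap::TPartialReadResult MakePartialResult(
            const std::shared_ptr<arrow::RecordBatch>& batch,
            const std::shared_ptr<arrow::RecordBatch>& lastKey) {
        return NKikimr::NOlap::TPartialReadResult(NKikimr::NArrow::TShardedRecordBatch(batch), lastKey);
    }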
diff --git a/ydb/core/tx/columnshard/columnshard__stats_scan.cpp b/ydb/core/tx/columnshard/columnshard__stats_scan.cpp
index aaad1e70c8..05fe129905 100644
--- a/ydb/core/tx/columnshard/columnshard__stats_scan.cpp
+++ b/ydb/core/tx/columnshard/columnshard__stats_scan.cpp
@@ -16,10 +16,12 @@ std::optional<NOlap::TPartialReadResult> TStatsIterator::GetBatch() {
}
// Leave only requested columns
auto resultBatch = NArrow::ExtractColumns(batch, ResultSchema);
-
+ NArrow::TStatusValidator::Validate(ReadMetadata->GetProgram().ApplyProgram(resultBatch));
+ if (!resultBatch->num_rows()) {
+ return {};
+ }
NOlap::TPartialReadResult out(resultBatch, lastKey);
- out.ApplyProgram(ReadMetadata->GetProgram());
return std::move(out);
}
diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/columns_set.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/columns_set.cpp
index 5b65afa0d0..1a94bbb315 100644
--- a/ydb/core/tx/columnshard/engines/reader/plain_reader/columns_set.cpp
+++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/columns_set.cpp
@@ -15,9 +15,6 @@ NKikimr::NOlap::NPlainReader::TColumnsSet TColumnsSet::operator-(const TColumnsS
for (auto&& i : external.ColumnIds) {
result.ColumnIds.erase(i);
}
- for (auto&& i : external.ColumnNames) {
- result.ColumnNames.erase(i);
- }
arrow::FieldVector fields;
for (auto&& i : Schema->fields()) {
if (!external.Schema->GetFieldByName(i->name())) {
@@ -25,13 +22,13 @@ NKikimr::NOlap::NPlainReader::TColumnsSet TColumnsSet::operator-(const TColumnsS
}
}
result.Schema = std::make_shared<arrow::Schema>(fields);
+ result.Rebuild();
return result;
}
NKikimr::NOlap::NPlainReader::TColumnsSet TColumnsSet::operator+(const TColumnsSet& external) const {
TColumnsSet result = *this;
result.ColumnIds.insert(external.ColumnIds.begin(), external.ColumnIds.end());
- result.ColumnNames.insert(external.ColumnNames.begin(), external.ColumnNames.end());
auto fields = result.Schema->fields();
for (auto&& i : external.Schema->fields()) {
if (!result.Schema->GetFieldByName(i->name())) {
@@ -39,6 +36,7 @@ NKikimr::NOlap::NPlainReader::TColumnsSet TColumnsSet::operator+(const TColumnsS
}
}
result.Schema = std::make_shared<arrow::Schema>(fields);
+ result.Rebuild();
return result;
}
diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/columns_set.h b/ydb/core/tx/columnshard/engines/reader/plain_reader/columns_set.h
index 9da046b6e7..58e44c3d09 100644
--- a/ydb/core/tx/columnshard/engines/reader/plain_reader/columns_set.h
+++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/columns_set.h
@@ -8,20 +8,23 @@ class TColumnsSet {
private:
YDB_READONLY_DEF(std::set<ui32>, ColumnIds);
YDB_READONLY_DEF(std::set<TString>, ColumnNames);
- mutable std::optional<std::vector<TString>> ColumnNamesVector;
+ std::vector<TString> ColumnNamesVector;
YDB_READONLY_DEF(std::shared_ptr<arrow::Schema>, Schema);
+
+ void Rebuild() {
+ ColumnNamesVector.clear();
+ ColumnNames.clear();
+ for (auto&& i : Schema->field_names()) {
+ ColumnNamesVector.emplace_back(i);
+ ColumnNames.emplace(i);
+ }
+ }
+
public:
TColumnsSet() = default;
const std::vector<TString>& GetColumnNamesVector() const {
- if (!ColumnNamesVector) {
- std::vector<TString> result;
- for (auto&& i : Schema->field_names()) {
- result.emplace_back(i);
- }
- ColumnNamesVector = std::move(result);
- }
- return *ColumnNamesVector;
+ return ColumnNamesVector;
}
ui32 GetSize() const {
@@ -30,20 +33,18 @@ public:
bool ColumnsOnly(const std::vector<std::string>& fieldNames) const;
- TColumnsSet(const std::set<ui32>& columnIds, const TIndexInfo& indexInfo) {
- ColumnIds = columnIds;
+ TColumnsSet(const std::set<ui32>& columnIds, const TIndexInfo& indexInfo)
+ : ColumnIds(columnIds)
+ {
Schema = indexInfo.GetColumnsSchema(ColumnIds);
- for (auto&& i : ColumnIds) {
- ColumnNames.emplace(indexInfo.GetColumnName(i));
- }
+ Rebuild();
}
- TColumnsSet(const std::vector<ui32>& columnIds, const TIndexInfo& indexInfo) {
- for (auto&& i : columnIds) {
- Y_ABORT_UNLESS(ColumnIds.emplace(i).second);
- ColumnNames.emplace(indexInfo.GetColumnName(i));
- }
+ TColumnsSet(const std::vector<ui32>& columnIds, const TIndexInfo& indexInfo)
+ : ColumnIds(columnIds.begin(), columnIds.end())
+ {
Schema = indexInfo.GetColumnsSchema(ColumnIds);
+ Rebuild();
}
bool Contains(const std::shared_ptr<TColumnsSet>& columnsSet) const {
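Editorial sketch: Rebuild() above derives both ColumnNames and ColumnNamesVector from Schema, so the three members can no longer drift apart after operator+/operator- the way the hand-maintained name set could before. A hedged consistency check, assuming field names in the schema are unique; the function name is illustrative.

    #include <ydb/core/tx/columnshard/engines/reader/plain_reader/columns_set.h>

    // Illustrative only: a TColumnsSet built or combined via the code above keeps
    // its cached name containers in lockstep with its schema.
    void VerifyColumnsSetConsistency(const NKikimr::NOlap::NPlainReader::TColumnsSet& columns) {
        const auto schema = columns.GetSchema();
        Y_ABORT_UNLESS(columns.GetColumnNamesVector().size() == (size_t)schema->num_fields());
        Y_ABORT_UNLESS(columns.GetColumnNames().size() == columns.GetColumnNamesVector().size());
    }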
diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/filter_assembler.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/filter_assembler.cpp
index febfa79e71..f6e7c92632 100644
--- a/ydb/core/tx/columnshard/engines/reader/plain_reader/filter_assembler.cpp
+++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/filter_assembler.cpp
@@ -13,7 +13,7 @@ bool TAssembleFilter::DoExecute() {
TPortionInfo::TPreparedBatchData::TAssembleOptions options;
options.IncludedColumnIds = FilterColumnIds;
ui32 needSnapshotColumnsRestore = 0;
- const bool needSnapshotsFilter = ReadMetadata->GetSnapshot() < RecordsMaxSnapshot;
+ const bool needSnapshotsFilter = true;// ReadMetadata->GetSnapshot() <= RecordsMaxSnapshot;
if (!needSnapshotsFilter && UseFilter) {
for (auto&& i : TIndexInfo::GetSpecialColumnIds()) {
needSnapshotColumnsRestore += options.IncludedColumnIds->erase(i) ? 1 : 0;
diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/interval.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/interval.cpp
index 7f0056d49d..c115c8b1f1 100644
--- a/ydb/core/tx/columnshard/engines/reader/plain_reader/interval.cpp
+++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/interval.cpp
@@ -15,29 +15,54 @@ private:
std::map<ui32, std::shared_ptr<IDataSource>> Sources;
std::shared_ptr<TMergingContext> MergingContext;
const ui32 IntervalIdx;
+ std::optional<NArrow::TShardedRecordBatch> ShardedBatch;
+
+ void PrepareResultBatch() {
+ if (!ResultBatch || ResultBatch->num_rows() == 0) {
+ ResultBatch = nullptr;
+ LastPK = nullptr;
+ return;
+ }
+ {
+ ResultBatch = NArrow::ExtractColumns(ResultBatch, Context->GetReadMetadata()->GetResultSchema());
+ AFL_VERIFY(ResultBatch->num_columns() == Context->GetReadMetadata()->GetResultSchema()->num_fields());
+ NArrow::TStatusValidator::Validate(Context->GetReadMetadata()->GetProgram().ApplyProgram(ResultBatch));
+ }
+ if (ResultBatch->num_rows()) {
+ const auto& shardingPolicy = Context->GetCommonContext()->GetComputeShardingPolicy();
+ if (NArrow::THashConstructor::BuildHashUI64(ResultBatch, shardingPolicy.GetColumnNames(), "__compute_sharding_hash")) {
+ ShardedBatch = NArrow::TShardingSplitIndex::Apply(shardingPolicy.GetShardsCount(), ResultBatch, "__compute_sharding_hash");
+ } else {
+ ShardedBatch = NArrow::TShardedRecordBatch(ResultBatch);
+ }
+ AFL_VERIFY(!!LastPK == !!ShardedBatch->GetRecordsCount())("lpk", !!LastPK)("sb", ShardedBatch->GetRecordsCount());
+ } else {
+ ResultBatch = nullptr;
+ LastPK = nullptr;
+ }
+ }
protected:
virtual bool DoApply(NOlap::IDataReader& indexedDataRead) const override {
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "DoApply")("interval_idx", MergingContext->GetIntervalIdx());
auto& reader = static_cast<TPlainReadData&>(indexedDataRead);
- reader.MutableScanner().OnIntervalResult(ResultBatch, LastPK, IntervalIdx, reader);
+ reader.MutableScanner().OnIntervalResult(ShardedBatch, LastPK, IntervalIdx, reader);
return true;
}
virtual bool DoExecute() override {
- if (MergingContext->IsExclusiveInterval(Sources.size())) {
+ if (MergingContext->IsExclusiveInterval()) {
ResultBatch = Sources.begin()->second->GetBatch();
- LastPK = Sources.begin()->second->GetLastPK();
- Sources.clear();
if (ResultBatch && ResultBatch->num_rows()) {
+ LastPK = Sources.begin()->second->GetLastPK();
ResultBatch = NArrow::ExtractColumnsValidate(ResultBatch, Context->GetResultColumns()->GetColumnNamesVector());
AFL_VERIFY(ResultBatch)("info", Context->GetResultColumns()->GetSchema()->ToString());
Context->GetCommonContext()->GetCounters().OnNoScanInterval(ResultBatch->num_rows());
if (Context->GetCommonContext()->IsReverse()) {
ResultBatch = NArrow::ReverseRecords(ResultBatch);
}
- } else {
- ResultBatch = nullptr;
- LastPK = nullptr;
+ PrepareResultBatch();
}
+ Sources.clear();
+ AFL_VERIFY(!!LastPK == (!!ResultBatch && ResultBatch->num_rows()));
return true;
}
std::shared_ptr<NIndexedReader::TMergePartialStream> merger = Context->BuildMerger();
@@ -73,10 +98,11 @@ protected:
Context->GetCommonContext()->GetCounters().OnLinearScanInterval(rbBuilder->GetRecordsCount());
ResultBatch = rbBuilder->Finalize();
}
- AFL_VERIFY(!!lastResultPosition == (!!ResultBatch && ResultBatch->num_rows()));
if (lastResultPosition) {
LastPK = lastResultPosition->ExtractSortingPosition();
}
+ AFL_VERIFY(!!LastPK == (!!ResultBatch && ResultBatch->num_rows()));
+ PrepareResultBatch();
return true;
}
public:
@@ -129,9 +155,9 @@ void TFetchingInterval::OnSourceFilterStageReady(const ui32 /*sourceIdx*/) {
TFetchingInterval::TFetchingInterval(const NIndexedReader::TSortableBatchPosition& start, const NIndexedReader::TSortableBatchPosition& finish,
const ui32 intervalIdx, const std::map<ui32, std::shared_ptr<IDataSource>>& sources, const std::shared_ptr<TSpecialReadContext>& context,
- const bool includeFinish, const bool includeStart)
- : TTaskBase(0, context->GetMemoryForSources(sources, TMergingContext::IsExclusiveInterval(sources.size(), includeStart, includeFinish)), "", context->GetCommonContext()->GetResourcesTaskContext())
- , MergingContext(std::make_shared<TMergingContext>(start, finish, intervalIdx, includeFinish, includeStart))
+ const bool includeFinish, const bool includeStart, const bool isExclusiveInterval)
+ : TTaskBase(0, context->GetMemoryForSources(sources, isExclusiveInterval), "", context->GetCommonContext()->GetResourcesTaskContext())
+ , MergingContext(std::make_shared<TMergingContext>(start, finish, intervalIdx, includeFinish, includeStart, isExclusiveInterval))
, Context(context)
, TaskGuard(Context->GetCommonContext()->GetCounters().GetResourcesAllocationTasksGuard())
, Sources(sources)
@@ -148,7 +174,7 @@ void TFetchingInterval::DoOnAllocationSuccess(const std::shared_ptr<NResourceBro
("resources", guard->DebugString())("start", MergingContext->GetIncludeStart())("finish", MergingContext->GetIncludeFinish())("sources", Sources.size());
OnInitResourcesGuard(guard);
for (auto&& [_, i] : Sources) {
- i->InitFetchingPlan(Context->GetColumnsFetchingPlan(MergingContext->IsExclusiveInterval(Sources.size())), i);
+ i->InitFetchingPlan(Context->GetColumnsFetchingPlan(MergingContext->IsExclusiveInterval()), i);
}
}
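Editorial sketch: PrepareResultBatch() above is the heart of the commit: projecting to the result schema, running the program, and discarding empty results now happen inside DoExecute() on a worker thread, so DoApply() only hands a finished (possibly sharded) batch to the scanner. A simplified, hedged restatement of that preparation step (includes as in interval.cpp; the compute-sharding split shown earlier in permutations.cpp is applied afterwards). The function name is illustrative.

    // Illustrative only: returns nullptr when the interval produced no rows.
    std::shared_ptr<arrow::RecordBatch> PrepareOnWorkerThread(std::shared_ptr<arrow::RecordBatch> batch,
            const std::shared_ptr<arrow::Schema>& resultSchema,
            const NKikimr::NOlap::TProgramContainer& program) {
        if (!batch || batch->num_rows() == 0) {
            return nullptr;
        }
        batch = NKikimr::NArrow::ExtractColumns(batch, resultSchema);      // result columns only
        NKikimr::NArrow::TStatusValidator::Validate(program.ApplyProgram(batch));
        return batch->num_rows() ? batch : nullptr;                        // the program's filters may empty it
    }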
diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/interval.h b/ydb/core/tx/columnshard/engines/reader/plain_reader/interval.h
index b8be9a1243..4c72476158 100644
--- a/ydb/core/tx/columnshard/engines/reader/plain_reader/interval.h
+++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/interval.h
@@ -14,23 +14,22 @@ protected:
YDB_READONLY(bool, IncludeFinish, true);
YDB_READONLY(bool, IncludeStart, false);
YDB_READONLY(ui32, IntervalIdx, 0);
+ bool IsExclusiveIntervalFlag = false;
public:
- bool IsExclusiveInterval(const ui32 sourcesCount) const {
- return IsExclusiveInterval(sourcesCount, IncludeStart, IncludeFinish);
- }
-
- static bool IsExclusiveInterval(const ui32 sourcesCount, const bool includeStart, const bool includeFinish) {
- return includeFinish && includeStart && sourcesCount == 1;
- }
-
TMergingContext(const NIndexedReader::TSortableBatchPosition& start, const NIndexedReader::TSortableBatchPosition& finish,
- const ui32 intervalIdx, const bool includeFinish, const bool includeStart)
+ const ui32 intervalIdx, const bool includeFinish, const bool includeStart, const bool isExclusiveInterval)
: Start(start)
, Finish(finish)
, IncludeFinish(includeFinish)
, IncludeStart(includeStart)
- , IntervalIdx(intervalIdx) {
+ , IntervalIdx(intervalIdx)
+ , IsExclusiveIntervalFlag(isExclusiveInterval)
+ {
+
+ }
+ bool IsExclusiveInterval() const {
+ return IsExclusiveIntervalFlag;
}
NJson::TJsonValue DebugJson() const {
@@ -39,6 +38,7 @@ public:
result.InsertValue("idx", IntervalIdx);
result.InsertValue("finish", Finish.DebugJson());
result.InsertValue("include_finish", IncludeFinish);
+ result.InsertValue("exclusive", IsExclusiveIntervalFlag);
return result;
}
@@ -116,7 +116,7 @@ public:
TFetchingInterval(const NIndexedReader::TSortableBatchPosition& start, const NIndexedReader::TSortableBatchPosition& finish,
const ui32 intervalIdx, const std::map<ui32, std::shared_ptr<IDataSource>>& sources, const std::shared_ptr<TSpecialReadContext>& context,
- const bool includeFinish, const bool includeStart);
+ const bool includeFinish, const bool includeStart, const bool isExclusiveInterval);
};
}
diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/plain_read_data.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/plain_read_data.cpp
index 54b006f660..e818904dc4 100644
--- a/ydb/core/tx/columnshard/engines/reader/plain_reader/plain_read_data.cpp
+++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/plain_read_data.cpp
@@ -51,19 +51,19 @@ TPlainReadData::TPlainReadData(const std::shared_ptr<NOlap::TReadContext>& conte
}
std::vector<NKikimr::NOlap::TPartialReadResult> TPlainReadData::DoExtractReadyResults(const int64_t maxRowsInBatch) {
- if ((ReadyResultsCount < maxRowsInBatch || (GetContext().GetIsInternalRead() && ReadyResultsCount < maxRowsInBatch)) && !Scanner->IsFinished()) {
+ if ((GetContext().GetIsInternalRead() && ReadyResultsCount < maxRowsInBatch) && !Scanner->IsFinished()) {
return {};
}
- ReadyResultsCount = 0;
-
auto result = TPartialReadResult::SplitResults(std::move(PartialResults), maxRowsInBatch, GetContext().GetIsInternalRead());
- PartialResults.clear();
ui32 count = 0;
for (auto&& r: result) {
- r.StripColumns(GetReadMetadata()->GetResultSchema());
count += r.GetRecordsCount();
- r.ApplyProgram(GetReadMetadata()->GetProgram());
}
+ AFL_VERIFY(count == ReadyResultsCount);
+
+ ReadyResultsCount = 0;
+ PartialResults.clear();
+
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "DoExtractReadyResults")("result", result.size())("count", count)("finished", Scanner->IsFinished());
return result;
}
diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/scanner.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/scanner.cpp
index a468e978ff..0fd0c4d799 100644
--- a/ydb/core/tx/columnshard/engines/reader/plain_reader/scanner.cpp
+++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/scanner.cpp
@@ -5,18 +5,18 @@
namespace NKikimr::NOlap::NPlainReader {
-void TScanHead::OnIntervalResult(const std::shared_ptr<arrow::RecordBatch>& newBatch, const std::shared_ptr<arrow::RecordBatch>& lastPK, const ui32 intervalIdx, TPlainReadData& reader) {
+void TScanHead::OnIntervalResult(const std::optional<NArrow::TShardedRecordBatch>& newBatch, const std::shared_ptr<arrow::RecordBatch>& lastPK, const ui32 intervalIdx, TPlainReadData& reader) {
auto itInterval = FetchingIntervals.find(intervalIdx);
AFL_VERIFY(itInterval != FetchingIntervals.end());
if (!Context->GetCommonContext()->GetReadMetadata()->IsSorted()) {
- if (newBatch && newBatch->num_rows()) {
- reader.OnIntervalResult(std::make_shared<TPartialReadResult>(itInterval->second->GetResourcesGuard(), newBatch, lastPK));
+ if (newBatch && newBatch->GetRecordsCount()) {
+ reader.OnIntervalResult(std::make_shared<TPartialReadResult>(itInterval->second->GetResourcesGuard(), *newBatch, lastPK));
}
AFL_VERIFY(FetchingIntervals.erase(intervalIdx));
- AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "interval_result")("interval_idx", intervalIdx)("count", newBatch ? newBatch->num_rows() : 0);
+ AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "interval_result")("interval_idx", intervalIdx)("count", newBatch ? newBatch->GetRecordsCount() : 0);
} else {
- if (newBatch && newBatch->num_rows()) {
- AFL_VERIFY(ReadyIntervals.emplace(intervalIdx, std::make_shared<TPartialReadResult>(itInterval->second->GetResourcesGuard(), newBatch, lastPK)).second);
+ if (newBatch && newBatch->GetRecordsCount()) {
+ AFL_VERIFY(ReadyIntervals.emplace(intervalIdx, std::make_shared<TPartialReadResult>(itInterval->second->GetResourcesGuard(), *newBatch, lastPK)).second);
} else {
AFL_VERIFY(ReadyIntervals.emplace(intervalIdx, nullptr).second);
}
@@ -72,7 +72,6 @@ bool TScanHead::BuildNextInterval() {
while (BorderPoints.size()) {
auto firstBorderPointInfo = std::move(BorderPoints.begin()->second);
bool includeStart = firstBorderPointInfo.GetStartSources().size();
-
for (auto&& i : firstBorderPointInfo.GetStartSources()) {
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("add_source", i->GetSourceIdx());
AFL_VERIFY(CurrentSegments.emplace(i->GetSourceIdx(), i).second)("idx", i->GetSourceIdx());
@@ -83,7 +82,7 @@ bool TScanHead::BuildNextInterval() {
const ui32 intervalIdx = SegmentIdxCounter++;
auto it = FetchingIntervals.emplace(intervalIdx, std::make_shared<TFetchingInterval>(
BorderPoints.begin()->first, BorderPoints.begin()->first, intervalIdx, CurrentSegments,
- Context, true, true)).first;
+ Context, true, true, false)).first;
IntervalStats.emplace_back(CurrentSegments.size(), true);
NResourceBroker::NSubscribe::ITask::StartResourceSubscription(Context->GetCommonContext()->GetResourceSubscribeActorId(), it->second);
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "new_interval")("interval_idx", intervalIdx)("interval", it->second->DebugJson());
@@ -100,9 +99,10 @@ bool TScanHead::BuildNextInterval() {
Y_ABORT_UNLESS(BorderPoints.size());
const bool includeFinish = BorderPoints.begin()->second.GetStartSources().empty();
const ui32 intervalIdx = SegmentIdxCounter++;
+ const bool isExclusiveInterval = (CurrentSegments.size() == 1) && includeStart && includeFinish;
auto it = FetchingIntervals.emplace(intervalIdx, std::make_shared<TFetchingInterval>(
*CurrentStart, BorderPoints.begin()->first, intervalIdx, CurrentSegments,
- Context, includeFinish, includeStart)).first;
+ Context, includeFinish, includeStart, isExclusiveInterval)).first;
IntervalStats.emplace_back(CurrentSegments.size(), false);
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "new_interval")("interval_idx", intervalIdx)("interval", it->second->DebugJson());
NResourceBroker::NSubscribe::ITask::StartResourceSubscription(Context->GetCommonContext()->GetResourceSubscribeActorId(), it->second);
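Editorial sketch: BuildNextInterval() above now decides exclusivity itself and passes it into TFetchingInterval instead of having TMergingContext derive it from the source count. The rule, restated as a hedged sketch; note that the degenerate point interval built in the first branch is always passed as non-exclusive even though both of its borders are included.

    // An interval can skip merging only when a single source covers it
    // and both of its borders belong to it.
    bool IsExclusiveInterval(const size_t sourcesCount, const bool includeStart, const bool includeFinish) {
        return sourcesCount == 1 && includeStart && includeFinish;
    }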
diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/scanner.h b/ydb/core/tx/columnshard/engines/reader/plain_reader/scanner.h
index bc505f416c..bd646f68d0 100644
--- a/ydb/core/tx/columnshard/engines/reader/plain_reader/scanner.h
+++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/scanner.h
@@ -74,7 +74,7 @@ public:
return sb;
}
- void OnIntervalResult(const std::shared_ptr<arrow::RecordBatch>& batch, const std::shared_ptr<arrow::RecordBatch>& lastPK, const ui32 intervalIdx, TPlainReadData& reader);
+ void OnIntervalResult(const std::optional<NArrow::TShardedRecordBatch>& batch, const std::shared_ptr<arrow::RecordBatch>& lastPK, const ui32 intervalIdx, TPlainReadData& reader);
TScanHead(std::deque<std::shared_ptr<IDataSource>>&& sources, const std::shared_ptr<TSpecialReadContext>& context);
diff --git a/ydb/core/tx/columnshard/engines/reader/read_context.h b/ydb/core/tx/columnshard/engines/reader/read_context.h
index 2583693146..1e5b27f8a5 100644
--- a/ydb/core/tx/columnshard/engines/reader/read_context.h
+++ b/ydb/core/tx/columnshard/engines/reader/read_context.h
@@ -17,6 +17,10 @@ private:
YDB_READONLY(ui32, ShardsCount, 0);
YDB_READONLY_DEF(std::vector<std::string>, ColumnNames);
public:
+ TString DebugString() const {
+ return TStringBuilder() << "shards_count:" << ShardsCount << ";columns=" << JoinSeq(",", ColumnNames) << ";";
+ }
+
TComputeShardingPolicy() = default;
bool DeserializeFromProto(const NKikimrTxDataShard::TComputeShardingPolicy& policy) {
ShardsCount = policy.GetShardsCount();