aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorivanmorozov <ivanmorozov@yandex-team.com>2022-09-16 17:44:12 +0300
committerivanmorozov <ivanmorozov@yandex-team.com>2022-09-16 17:44:12 +0300
commitafdecb0e4f540b92a669eca4ff54e6b2ca899830 (patch)
treee83b23a20d602bd3224b8c330410b93b126c95b1
parent1f8614d61e3c3105e37ca88d36d15b675d3e010e (diff)
downloadydb-afdecb0e4f540b92a669eca4ff54e6b2ca899830.tar.gz
batch using directly
-rw-r--r--ydb/core/formats/arrow_batch_builder.cpp117
-rw-r--r--ydb/core/formats/arrow_batch_builder.h136
-rw-r--r--ydb/core/kqp/kqp_compute.cpp42
-rw-r--r--ydb/core/kqp/kqp_compute.h2
-rw-r--r--ydb/core/kqp/ut/kqp_ne_flowcontrol_ut.cpp4
-rw-r--r--ydb/core/tx/columnshard/columnshard__costs.cpp214
-rw-r--r--ydb/core/tx/columnshard/columnshard__costs.h117
-rw-r--r--ydb/core/tx/columnshard/columnshard__scan.cpp17
8 files changed, 370 insertions, 279 deletions
diff --git a/ydb/core/formats/arrow_batch_builder.cpp b/ydb/core/formats/arrow_batch_builder.cpp
index 8ecf97b9c78..ee74796711a 100644
--- a/ydb/core/formats/arrow_batch_builder.cpp
+++ b/ydb/core/formats/arrow_batch_builder.cpp
@@ -68,6 +68,123 @@ arrow::Status AppendCell(arrow::RecordBatchBuilder& builder, const TCell& cell,
}
+NKikimr::NArrow::TRecordBatchConstructor::TRecordConstructor& TRecordBatchConstructor::TRecordConstructor::AddRecordValue(
+ const std::shared_ptr<arrow::Scalar>& value)
+{
+ Y_VERIFY(CurrentBuilder != Owner.Builders.end());
+ AddValueToBuilder(**CurrentBuilder, value, WithCast);
+ ++CurrentBuilder;
+ return *this;
+}
+
+void TRecordBatchConstructor::AddValueToBuilder(arrow::ArrayBuilder& builder,
+ const std::shared_ptr<arrow::Scalar>& value, const bool withCast) {
+ if (!value) {
+ Y_VERIFY(builder.AppendNull().ok());
+ } else if (!withCast) {
+ Y_VERIFY(builder.AppendScalar(*value).ok());
+ } else {
+ auto castStatus = value->CastTo(builder.type());
+ Y_VERIFY(castStatus.ok());
+ Y_VERIFY(builder.AppendScalar(*castStatus.ValueUnsafe()).ok());
+ }
+}
+
+NKikimr::NArrow::TRecordBatchConstructor& TRecordBatchConstructor::AddRecordsBatchSlow(
+ const std::shared_ptr<arrow::RecordBatch>& value, const bool withCast /*= false*/, const bool withRemap /*= false*/)
+{
+ Y_VERIFY(!!value);
+ Y_VERIFY(!!Schema);
+ Y_VERIFY(!InConstruction);
+ std::vector<std::shared_ptr<arrow::Array>> batchColumns;
+ if (withRemap) {
+ for (auto&& f : Schema->fields()) {
+ batchColumns.emplace_back(value->GetColumnByName(f->name()));
+ }
+ } else {
+ Y_VERIFY(value->num_columns() <= Schema->num_fields());
+ for (auto&& i : value->columns()) {
+ batchColumns.emplace_back(i);
+ }
+ for (i32 i = value->num_columns(); i < Schema->num_fields(); ++i) {
+ batchColumns.emplace_back(nullptr);
+ }
+ }
+ Y_VERIFY((int)batchColumns.size() == Schema->num_fields());
+ Y_VERIFY((int)Builders.size() == Schema->num_fields());
+ ui32 cIdx = 0;
+ std::vector<std::unique_ptr<arrow::ArrayBuilder>>::const_iterator currentBuilder = Builders.begin();
+ for (auto&& c : batchColumns) {
+ if (!c) {
+ Y_VERIFY((*currentBuilder)->AppendNulls(value->num_rows()).ok());
+ } else {
+ for (ui32 r = 0; r < value->num_rows(); ++r) {
+ std::shared_ptr<arrow::Scalar> value;
+ if (c->IsNull(r)) {
+ value = nullptr;
+ } else {
+ auto statusGet = c->GetScalar(r);
+ Y_VERIFY(statusGet.ok());
+ value = statusGet.ValueUnsafe();
+ }
+ AddValueToBuilder(**currentBuilder, value, withCast);
+ }
+ }
+ ++currentBuilder;
+ ++cIdx;
+ }
+ RecordsCount += value->num_rows();
+ return *this;
+}
+
+NKikimr::NArrow::TRecordBatchConstructor& TRecordBatchConstructor::InitColumns(const std::shared_ptr<arrow::Schema>& schema) {
+ Schema = schema;
+ Builders.clear();
+ Builders.reserve(Schema->num_fields());
+ for (auto&& f : Schema->fields()) {
+ std::unique_ptr<arrow::ArrayBuilder> arrayBuilder;
+ Y_VERIFY(arrow::MakeBuilder(arrow::default_memory_pool(), f->type(), &arrayBuilder).ok());
+ Builders.emplace_back(std::move(arrayBuilder));
+ }
+ return *this;
+}
+
+NKikimr::NArrow::TRecordBatchReader TRecordBatchConstructor::Finish() {
+ Y_VERIFY(!InConstruction);
+ std::vector<std::shared_ptr<arrow::Array>> columns;
+ columns.reserve(Builders.size());
+ for (auto&& i : Builders) {
+ arrow::Result<std::shared_ptr<arrow::Array>> aData = i->Finish();
+ Y_VERIFY(aData.ok());
+ columns.emplace_back(aData.ValueUnsafe());
+ }
+ std::shared_ptr<arrow::RecordBatch> batch = arrow::RecordBatch::Make(Schema, RecordsCount, columns);
+#if !defined(NDEBUG)
+ auto statusValidation = batch->ValidateFull();
+ if (!statusValidation.ok()) {
+ Cerr << statusValidation.ToString() << "/" << statusValidation.message() << Endl;
+ Y_VERIFY(false);
+ }
+#endif
+
+ return TRecordBatchReader(batch);
+}
+
+void TRecordBatchReader::SerializeToStrings(TString& schema, TString& data) const {
+ Y_VERIFY(!!Batch);
+ schema = NArrow::SerializeSchema(*Batch->schema());
+ data = NArrow::SerializeBatchNoCompression(Batch);
+}
+
+bool TRecordBatchReader::DeserializeFromStrings(const TString& schemaString, const TString& dataString) {
+ std::shared_ptr<arrow::Schema> schema = NArrow::DeserializeSchema(schemaString);
+ if (!schema) {
+ return false;
+ }
+ Batch = NArrow::DeserializeBatch(dataString, schema);
+ return true;
+}
+
TArrowBatchBuilder::TArrowBatchBuilder(arrow::Compression::type codec)
: WriteOptions(arrow::ipc::IpcWriteOptions::Defaults())
{
diff --git a/ydb/core/formats/arrow_batch_builder.h b/ydb/core/formats/arrow_batch_builder.h
index d52a94ed20c..ae15d175445 100644
--- a/ydb/core/formats/arrow_batch_builder.h
+++ b/ydb/core/formats/arrow_batch_builder.h
@@ -5,6 +5,142 @@
namespace NKikimr::NArrow {
+class TRecordBatchReader {
+private:
+ std::shared_ptr<arrow::RecordBatch> Batch;
+public:
+ TRecordBatchReader() = default;
+ TRecordBatchReader(const std::shared_ptr<arrow::RecordBatch>& batch)
+ : Batch(batch)
+ {
+ }
+ class TRecordIterator {
+ private:
+ friend class TRecordBatchReader;
+ ui32 RecordIdx = 0;
+ const TRecordBatchReader& Reader;
+ TRecordIterator(const ui32 recordIdx, const TRecordBatchReader& reader)
+ : RecordIdx(recordIdx)
+ , Reader(reader) {
+
+ }
+ public:
+ class TColumnIterator {
+ private:
+ friend class TRecordIterator;
+ ui32 ColumnIdx = 0;
+ const ui32 RecordIdx = 0;
+ const TRecordBatchReader& Reader;
+ TColumnIterator(const ui32 columnIdx, const ui32 recordIdx, const TRecordBatchReader& reader)
+ : ColumnIdx(columnIdx)
+ , RecordIdx(recordIdx)
+ , Reader(reader) {
+
+ }
+ public:
+ std::shared_ptr<arrow::Scalar> operator*() const {
+ auto c = Reader.Batch->column(ColumnIdx);
+ if (c->IsNull(RecordIdx)) {
+ return nullptr;
+ } else {
+ auto status = c->GetScalar(RecordIdx);
+ Y_VERIFY(status.ok());
+ return *status;
+ }
+ }
+ void operator++() {
+ ++ColumnIdx;
+ }
+ bool operator==(const TColumnIterator& value) const {
+ return RecordIdx == value.RecordIdx && ColumnIdx == value.ColumnIdx && Reader.Batch.get() == value.Reader.Batch.get();
+ }
+ };
+ TRecordIterator operator*() const {
+ return *this;
+ }
+ bool operator==(const TRecordIterator& value) const {
+ return RecordIdx == value.RecordIdx && Reader.Batch.get() == value.Reader.Batch.get();
+ }
+ void operator++() {
+ ++RecordIdx;
+ }
+ ui32 GetIndex() const {
+ return RecordIdx;
+ }
+ TColumnIterator begin() const {
+ return TColumnIterator(0, RecordIdx, Reader);
+ }
+ TColumnIterator end() const {
+ return TColumnIterator(Reader.Batch->schema()->num_fields(), RecordIdx, Reader);
+ }
+ };
+ ui32 GetColumnsCount() const {
+ return Batch ? Batch->num_columns() : 0;
+ }
+ ui32 GetRowsCount() const {
+ return Batch ? Batch->num_rows() : 0;
+ }
+ TRecordIterator begin() const {
+ return TRecordIterator(0, *this);
+ }
+ TRecordIterator end() const {
+ return TRecordIterator(GetRowsCount(), *this);
+ }
+ void SerializeToStrings(TString& schema, TString& data) const;
+ bool DeserializeFromStrings(const TString& schemaString, const TString& dataString);
+};
+
+class TRecordBatchConstructor {
+private:
+ ui32 RecordsCount = 0;
+ bool InConstruction = false;
+ static void AddValueToBuilder(arrow::ArrayBuilder& builder, const std::shared_ptr<arrow::Scalar>& value, const bool withCast);
+protected:
+ std::shared_ptr<arrow::Schema> Schema;
+ std::vector<std::unique_ptr<arrow::ArrayBuilder>> Builders;
+public:
+ class TRecordConstructor {
+ private:
+ TRecordBatchConstructor& Owner;
+ const bool WithCast = false;
+ std::vector<std::unique_ptr<arrow::ArrayBuilder>>::const_iterator CurrentBuilder;
+ public:
+ TRecordConstructor(TRecordBatchConstructor& owner, const bool withCast)
+ : Owner(owner)
+ , WithCast(withCast)
+ {
+ Y_VERIFY(!Owner.InConstruction);
+ CurrentBuilder = Owner.Builders.begin();
+ Owner.InConstruction = true;
+ }
+ ~TRecordConstructor() {
+ for (; CurrentBuilder != Owner.Builders.end(); ++CurrentBuilder) {
+ Y_VERIFY((*CurrentBuilder)->AppendNull().ok());
+ }
+ Owner.InConstruction = false;
+ ++Owner.RecordsCount;
+ }
+ TRecordConstructor& AddRecordValue(const std::shared_ptr<arrow::Scalar>& value);
+ };
+
+ TRecordBatchConstructor& InitColumns(const std::shared_ptr<arrow::Schema>& schema);
+
+ TRecordConstructor StartRecord(const bool withCast = false) {
+ Y_VERIFY(!InConstruction);
+ return TRecordConstructor(*this, withCast);
+ }
+
+ TRecordBatchConstructor& AddRecordsBatchSlow(const std::shared_ptr<arrow::RecordBatch>& value, const bool withCast = false, const bool withRemap = false);
+
+ void Reserve(const ui32 recordsCount) {
+ for (auto&& i : Builders) {
+ Y_VERIFY(i->Reserve(recordsCount).ok());
+ }
+ }
+
+ TRecordBatchReader Finish();
+};
+
/// YDB rows to arrow::RecordBatch converter
class TArrowBatchBuilder : public NKikimr::IBlockBuilder {
public:
diff --git a/ydb/core/kqp/kqp_compute.cpp b/ydb/core/kqp/kqp_compute.cpp
index 82bf27a407c..88a2063e96e 100644
--- a/ydb/core/kqp/kqp_compute.cpp
+++ b/ydb/core/kqp/kqp_compute.cpp
@@ -18,31 +18,53 @@ NActors::IEventBase* TEvKqpCompute::TEvCostData::Load(TEventSerializedData* data
return result.Release();
}
-TVector<NKikimr::TSerializedTableRange> TEvKqpCompute::TEvCostData::GetSerializedTableRanges() const {
+TVector<NKikimr::TSerializedTableRange> TEvKqpCompute::TEvCostData::GetSerializedTableRanges(const ui32 splitFactor) const {
+ TVector<NOlap::NCosts::TKeyRanges::TMark> usefulMarks;
+ {
+ ui32 currentCount = (TableRanges.IsLeftBorderOpened() ? 1 : 0);
+ const ui32 realCount = TableRanges.GetMarksCount() + currentCount + 1;
+ const double countPerSection = 1.0 * realCount / splitFactor;
+ bool predSkipped = !TableRanges.IsLeftBorderOpened();
+ for (auto&& i : TableRanges.GetBatch()) {
+ auto& features = TableRanges.GetMarkFeatures(i.GetIndex());
+ ++currentCount;
+ if (currentCount >= countPerSection || predSkipped || features.GetIntervalSkipped()) {
+ currentCount = 0;
+ usefulMarks.emplace_back(i);
+ }
+ predSkipped = features.GetIntervalSkipped();
+ }
+ }
+
TVector<TSerializedTableRange> result;
TVector<TCell> borderPred;
- borderPred.resize(TableRanges.ColumnsCount(), TCell());
+ borderPred.resize(TableRanges.GetColumnsCount(), TCell());
bool predSkipped = !TableRanges.IsLeftBorderOpened();
bool predIncluded = false;
- for (auto&& i : TableRanges.GetRangeMarks()) {
+
+ for (auto&& i : usefulMarks) {
+ auto& features = TableRanges.GetMarkFeatures(i.GetIndex());
TVector<TCell> borderCurrent;
- for (auto&& value : i.GetMark()) {
+ for (auto&& value : i) {
+ if (!value) {
+ break;
+ }
borderCurrent.emplace_back(SerializeScalarToCell(value));
}
if (!predSkipped) {
- for (ui32 additional = borderPred.size(); additional < TableRanges.ColumnsCount(); ++additional) {
+ for (ui32 additional = borderPred.size(); additional < TableRanges.GetColumnsCount(); ++additional) {
borderPred.emplace_back(TCell());
}
- if (!i.GetMarkIncluded()) {
- for (ui32 additional = borderCurrent.size(); additional < TableRanges.ColumnsCount(); ++additional) {
+ if (!features.GetMarkIncluded()) {
+ for (ui32 additional = borderCurrent.size(); additional < TableRanges.GetColumnsCount(); ++additional) {
borderCurrent.emplace_back(TCell());
}
}
- TSerializedTableRange serializedRange(borderPred, predIncluded, borderCurrent, i.GetMarkIncluded());
+ TSerializedTableRange serializedRange(borderPred, predIncluded, borderCurrent, features.GetMarkIncluded());
result.emplace_back(std::move(serializedRange));
}
- predSkipped = i.GetIntervalSkipped();
- predIncluded = i.GetMarkIncluded();
+ predSkipped = features.GetIntervalSkipped();
+ predIncluded = features.GetMarkIncluded();
std::swap(borderCurrent, borderPred);
}
if (predSkipped) {
diff --git a/ydb/core/kqp/kqp_compute.h b/ydb/core/kqp/kqp_compute.h
index 52d374b3e50..998f3502c3f 100644
--- a/ydb/core/kqp/kqp_compute.h
+++ b/ydb/core/kqp/kqp_compute.h
@@ -157,7 +157,7 @@ struct TEvKqpCompute {
const NOlap::NCosts::TKeyRanges& GetTableRanges() const {
return TableRanges;
}
- TVector<TSerializedTableRange> GetSerializedTableRanges() const;
+ TVector<TSerializedTableRange> GetSerializedTableRanges(const ui32 splitFactor = 1) const;
virtual bool IsSerializable() const override {
return true;
diff --git a/ydb/core/kqp/ut/kqp_ne_flowcontrol_ut.cpp b/ydb/core/kqp/ut/kqp_ne_flowcontrol_ut.cpp
index cec70f8353b..5c9d2a8707d 100644
--- a/ydb/core/kqp/ut/kqp_ne_flowcontrol_ut.cpp
+++ b/ydb/core/kqp/ut/kqp_ne_flowcontrol_ut.cpp
@@ -64,6 +64,10 @@ void DoFlowControlTest(ui64 limit, bool hasBlockedByCapacity, bool useSessionAct
.SetKqpSettings({})
.SetEnableKqpScanQueryStreamLookup(false);
TKikimrRunner kikimr{kikimrSettings};
+// kikimr.GetTestServer().GetRuntime()->SetLogPriority(NKikimrServices::KQP_EXECUTER, NActors::NLog::PRI_DEBUG);
+// kikimr.GetTestServer().GetRuntime()->SetLogPriority(NKikimrServices::KQP_COMPUTE, NActors::NLog::PRI_DEBUG);
+// kikimr.GetTestServer().GetRuntime()->SetLogPriority(NKikimrServices::KQP_GATEWAY, NActors::NLog::PRI_DEBUG);
+// kikimr.GetTestServer().GetRuntime()->SetLogPriority(NKikimrServices::KQP_RESOURCE_MANAGER, NActors::NLog::PRI_DEBUG);
CreateSampleTables(kikimr);
auto db = kikimr.GetTableClient();
diff --git a/ydb/core/tx/columnshard/columnshard__costs.cpp b/ydb/core/tx/columnshard/columnshard__costs.cpp
index d0300f2f758..4405a9695d2 100644
--- a/ydb/core/tx/columnshard/columnshard__costs.cpp
+++ b/ydb/core/tx/columnshard/columnshard__costs.cpp
@@ -8,216 +8,78 @@
namespace NKikimr::NOlap::NCosts {
-TKeyMark TCostsOperator::BuildMarkFromGranule(const TGranuleRecord& record) const {
- TKeyMark result;
- result.AddValue(std::make_shared<arrow::UInt64Scalar>(ExtractKey(record.IndexKey)));
- return result;
+void TKeyRangesBuilder::AddMarkFromGranule(const TGranuleRecord& record) {
+ Constructor.StartRecord(true).AddRecordValue(std::make_shared<arrow::UInt64Scalar>(ExtractKey(record.IndexKey)));
+ Features.emplace_back(TMarkRangeFeatures());
}
-TRangeMark TCostsOperator::BuildMarkFromPredicate(const std::shared_ptr<NOlap::TPredicate>& p) const {
+bool TKeyRangesBuilder::AddMarkFromPredicate(const std::shared_ptr<NOlap::TPredicate>& p) {
if (!p) {
- return TRangeMark();
- }
- TRangeMark result;
- for (i32 i = 0; i < p->Batch->num_columns(); ++i) {
- auto column = p->Batch->column(i);
- Y_VERIFY(!!column && column->length() == 1);
- Y_VERIFY(!column->IsNull(0));
- auto status = column->GetScalar(0);
- Y_VERIFY(status.ok());
- result.MutableMark().AddValue(status.ValueUnsafe());
+ return false;
}
- result.SetMarkIncluded(p->Inclusive);
- return result;
+ Y_VERIFY(p->Batch->num_rows() == 1);
+ Constructor.AddRecordsBatchSlow(p->Batch);
+ Features.emplace_back(TMarkRangeFeatures().SetMarkIncluded(p->Inclusive));
+ return true;
}
-void TCostsOperator::FillRangeMarks(TKeyRanges& result, const std::shared_ptr<NOlap::TPredicate>& left, const std::shared_ptr<NOlap::TPredicate>& right) const {
+void TKeyRangesBuilder::FillRangeMarks(const std::shared_ptr<NOlap::TPredicate>& left, const TVector<TGranuleRecord>& granuleRecords,
+ const std::shared_ptr<NOlap::TPredicate>& right) {
LOG_S_DEBUG("TCostsOperator::BuildRangeMarks::Request from " << (left ? left->Batch->ToString() : "-Inf") <<
" to " << (right ? right->Batch->ToString() : "+Inf"));
- result.InitColumns(NArrow::MakeArrowSchema(IndexInfo.GetPK()));
- {
- TRangeMark leftBorder = BuildMarkFromPredicate(left);
- if (!leftBorder.GetMark().ColumnsCount()) {
- Y_VERIFY(!result.IsLeftBorderOpened());
- result.SetLeftBorderOpened(true);
- } else {
- Y_VERIFY(result.AddRangeIfNotLess(std::move(leftBorder)));
- }
+ if (!AddMarkFromPredicate(left)) {
+ Y_VERIFY(!LeftBorderOpened);
+ LeftBorderOpened = true;
}
- for (auto&& i : GranuleRecords) {
- auto granuleMark = BuildMarkFromGranule(i);
- Y_VERIFY(result.AddRangeIfGrow(TRangeMark(std::move(granuleMark))));
+ for (auto&& i : granuleRecords) {
+ AddMarkFromGranule(i);
}
- {
- TRangeMark rightBorder = BuildMarkFromPredicate(right);
- if (rightBorder.GetMark().ColumnsCount()) {
- Y_VERIFY(result.AddRangeIfNotLess(std::move(rightBorder)));
- result.MutableRangeMarks().back().SetIntervalSkipped(true);
- }
+ if (AddMarkFromPredicate(right)) {
+ Features.back().SetIntervalSkipped(true);
}
}
-ui64 TCostsOperator::ExtractKey(const TString& key) {
+ui64 TKeyRangesBuilder::ExtractKey(const TString& key) {
Y_VERIFY(key.size() == 8);
return *reinterpret_cast<const ui64*>(key.data());
}
-bool TRangeMark::operator<(const TRangeMark& item) const {
- return Mark < item.Mark;
-}
-
-TString TRangeMark::ToString() const {
- TStringBuilder sb;
- sb << "MARK=" << Mark.ToString() << ";SKIP=" << IntervalSkipped << ";";
- return sb;
-}
-
NKikimrKqp::TEvRemoteCostData::TCostInfo TKeyRanges::SerializeToProto() const {
NKikimrKqp::TEvRemoteCostData::TCostInfo result;
result.SetLeftBorderOpened(LeftBorderOpened);
- for (auto&& i : RangeMarks) {
- *result.AddIntervalMeta() = i.SerializeMetaToProto();
+ for (auto&& i : RangeMarkFeatures) {
+ *result.AddIntervalMeta() = i.SerializeToProto();
}
-
- std::shared_ptr<arrow::Schema> schema = KeyColumns;
- *result.MutableColumnsSchema() = NArrow::SerializeSchema(*schema);
- if (RangeMarks.empty()) {
- return result;
- }
-
- TVector<std::unique_ptr<arrow::ArrayBuilder>> arrayBuilders;
- for (auto&& f : KeyColumns->fields()) {
- std::unique_ptr<arrow::ArrayBuilder> arrayBuilder;
- Y_VERIFY(arrow::MakeBuilder(arrow::default_memory_pool(), f->type(), &arrayBuilder).ok());
- arrayBuilders.emplace_back(std::move(arrayBuilder));
- Y_VERIFY(arrayBuilders.back()->Reserve(RangeMarks.size()).ok());
- }
- for (auto&& i : RangeMarks) {
- Y_VERIFY(i.GetMark().ColumnsCount() <= ColumnsCount());
- auto itBuilder = arrayBuilders.begin();
- for (auto&& scalar : i.GetMark()) {
- Y_VERIFY(!!scalar);
- auto correctScalar = scalar->CastTo((*itBuilder)->type());
- Y_VERIFY(correctScalar.ok());
- Y_VERIFY((*itBuilder)->AppendScalar(**correctScalar).ok());
- ++itBuilder;
- }
- for (; itBuilder != arrayBuilders.end(); ++itBuilder) {
- Y_VERIFY((*itBuilder)->AppendNull().ok());
- }
- }
- TVector<std::shared_ptr<arrow::Array>> arrays;
- for (auto&& i : arrayBuilders) {
- arrow::Result<std::shared_ptr<arrow::Array>> aData = i->Finish();
- Y_VERIFY(aData.ok());
- arrays.emplace_back(aData.ValueUnsafe());
- }
- std::shared_ptr<arrow::RecordBatch> rb = arrow::RecordBatch::Make(schema, RangeMarks.size(), arrays);
- *result.MutableColumnsData() = NArrow::SerializeBatchNoCompression(rb);
+ BatchReader.SerializeToStrings(*result.MutableColumnsSchema(), *result.MutableColumnsData());
return result;
}
bool TKeyRanges::DeserializeFromProto(const NKikimrKqp::TEvRemoteCostData::TCostInfo& proto) {
- std::shared_ptr<arrow::Schema> schema = NArrow::DeserializeSchema(proto.GetColumnsSchema());
- Y_VERIFY(schema);
+ Y_VERIFY(BatchReader.DeserializeFromStrings(proto.GetColumnsSchema(), proto.GetColumnsData()));
LeftBorderOpened = proto.GetLeftBorderOpened();
- KeyColumns = schema;
-
- TVector<TRangeMark> resultLocal;
- if (!!proto.GetColumnsData()) {
- std::shared_ptr<arrow::RecordBatch> batch = NArrow::DeserializeBatch(proto.GetColumnsData(), schema);
- Y_VERIFY(batch->num_columns() == (int)ColumnsCount());
- resultLocal.reserve(batch->num_rows());
- auto& batchColumns = batch->columns();
- for (ui32 rowIdx = 0; rowIdx < batch->num_rows(); ++rowIdx) {
- TKeyMark mark;
- for (auto&& c: batchColumns) {
- if (c->IsNull(rowIdx)) {
- break;
- }
- auto valueStatus = c->GetScalar(rowIdx);
- Y_VERIFY(valueStatus.ok());
- mark.AddValue(valueStatus.ValueUnsafe());
- }
- resultLocal.emplace_back(TRangeMark(std::move(mark)));
- }
- }
- Y_VERIFY(resultLocal.size() == (size_t)proto.GetIntervalMeta().size());
- ui32 idx = 0;
+ Y_VERIFY(BatchReader.GetRowsCount() == (size_t)proto.GetIntervalMeta().size());
+ RangeMarkFeatures.reserve(proto.GetIntervalMeta().size());
for (auto&& i : proto.GetIntervalMeta()) {
- if (!resultLocal[idx++].DeserializeMetaFromProto(i)) {
- return false;
- }
+ TMarkRangeFeatures features;
+ Y_VERIFY(features.DeserializeFromProto(i));
+ RangeMarkFeatures.emplace_back(std::move(features));
}
- std::swap(RangeMarks, resultLocal);
return true;
}
-bool TKeyRanges::AddRangeIfGrow(TRangeMark&& range) {
- Y_VERIFY(range.GetMark().ColumnsCount() <= ColumnsCount());
- if (RangeMarks.empty() || RangeMarks.back() < range) {
- RangeMarks.emplace_back(std::move(range));
- return true;
- }
- return false;
-}
-
-bool TKeyRanges::AddRangeIfNotLess(TRangeMark&& range) {
- Y_VERIFY(range.GetMark().ColumnsCount() <= ColumnsCount());
- if (RangeMarks.empty() || RangeMarks.back() < range || !(range < RangeMarks.back())) {
- RangeMarks.emplace_back(std::move(range));
- return true;
- }
- return RangeMarks.back() < range;
+TKeyRangesBuilder::TKeyRangesBuilder(const TIndexInfo& indexInfo)
+ : IndexInfo(indexInfo)
+{
+ Constructor.InitColumns(NArrow::MakeArrowSchema(IndexInfo.GetPK()));
}
-TString TKeyRanges::ToString() const {
- TStringBuilder sb;
- sb << "COLUMNS: " << KeyColumns->ToString() << Endl;
- sb << Endl << "DATA: " << Endl;
- for (auto&& i : RangeMarks) {
- sb << i.ToString() << ";";
- }
- return sb;
-}
-
-TKeyRanges::TKeyRanges() {
- KeyColumns = arrow::SchemaBuilder().Finish().ValueUnsafe();
-}
-
-TString TKeyMark::ToString() const {
- TStringBuilder sb;
- for (auto&& i : Values) {
- sb << (i ? i->ToString() : "NO_VALUE") << ";";
- }
- return sb;
-}
-
-bool TKeyMark::operator<(const TKeyMark& item) const {
- auto itSelf = Values.begin();
- auto itItem = item.Values.begin();
- for (; itSelf != Values.end() && itItem != item.Values.end(); ++itSelf, ++itItem) {
- if (NArrow::ScalarLess(*itSelf, *itItem)) {
- return true;
- } else if (NArrow::ScalarLess(*itItem, *itSelf)) {
- return false;
- }
- }
- if (itSelf == Values.end()) {
- if (itItem == item.Values.end()) {
- return false;
- } else {
- return true;
- }
- } else {
- if (itItem == item.Values.end()) {
- return true;
- } else {
- Y_VERIFY(false, "impossible");
- }
- }
- return false;
+NKikimr::NOlap::NCosts::TKeyRanges TKeyRangesBuilder::Build() {
+ TKeyRanges result;
+ result.LeftBorderOpened = LeftBorderOpened;
+ result.RangeMarkFeatures = std::move(Features);
+ result.BatchReader = Constructor.Finish();
+ return result;
}
}
diff --git a/ydb/core/tx/columnshard/columnshard__costs.h b/ydb/core/tx/columnshard/columnshard__costs.h
index 48c629f46f5..6a11f3187c5 100644
--- a/ydb/core/tx/columnshard/columnshard__costs.h
+++ b/ydb/core/tx/columnshard/columnshard__costs.h
@@ -1,4 +1,5 @@
#pragma once
+#include <ydb/core/formats/arrow_batch_builder.h>
#include <ydb/core/formats/arrow_helpers.h>
#include <ydb/core/protos/kqp.pb.h>
@@ -11,56 +12,16 @@ namespace NKikimr::NOlap {
namespace NKikimr::NOlap::NCosts {
-class TKeyMark {
+class TMarkRangeFeatures {
private:
- TVector<std::shared_ptr<arrow::Scalar>> Values;
-public:
- TKeyMark() = default;
-
- TString ToString() const;
-
- TVector<std::shared_ptr<arrow::Scalar>>::const_iterator begin() const {
- return Values.begin();
- }
- TVector<std::shared_ptr<arrow::Scalar>>::const_iterator end() const {
- return Values.end();
- }
- TKeyMark& AddValue(std::shared_ptr<arrow::Scalar> value) {
- Values.emplace_back(value);
- return *this;
- }
- ui32 ColumnsCount() const {
- return Values.size();
- }
- bool operator<(const TKeyMark& item) const;
-};
-
-class TRangeMark {
-private:
- TKeyMark Mark;
bool IntervalSkipped = false;
bool MarkIncluded = true;
public:
- TRangeMark() = default;
- TRangeMark(TKeyMark&& mark)
- : Mark(std::move(mark))
- {
-
- }
+ TMarkRangeFeatures() = default;
TString ToString() const;
- const TKeyMark& GetMark() const {
- return Mark;
- }
-
- TKeyMark& MutableMark() {
- return Mark;
- }
-
- bool operator<(const TRangeMark& item) const;
-
- TRangeMark& SetIntervalSkipped(const bool value) {
+ TMarkRangeFeatures& SetIntervalSkipped(const bool value) {
IntervalSkipped = value;
return *this;
}
@@ -69,7 +30,7 @@ public:
return IntervalSkipped;
}
- TRangeMark& SetMarkIncluded(const bool value) {
+ TMarkRangeFeatures& SetMarkIncluded(const bool value) {
MarkIncluded = value;
return *this;
}
@@ -78,14 +39,14 @@ public:
return MarkIncluded;
}
- NKikimrKqp::TEvRemoteCostData::TIntervalMeta SerializeMetaToProto() const {
+ NKikimrKqp::TEvRemoteCostData::TIntervalMeta SerializeToProto() const {
NKikimrKqp::TEvRemoteCostData::TIntervalMeta result;
result.SetIntervalSkipped(IntervalSkipped);
result.SetMarkIncluded(MarkIncluded);
return result;
}
- bool DeserializeMetaFromProto(const NKikimrKqp::TEvRemoteCostData::TIntervalMeta& proto) {
+ bool DeserializeFromProto(const NKikimrKqp::TEvRemoteCostData::TIntervalMeta& proto) {
SetIntervalSkipped(proto.GetIntervalSkipped());
SetMarkIncluded(proto.GetMarkIncluded());
return true;
@@ -94,64 +55,58 @@ public:
class TKeyRanges {
private:
- std::shared_ptr<arrow::Schema> KeyColumns;
- std::vector<TRangeMark> RangeMarks;
+ friend class TKeyRangesBuilder;
+ NArrow::TRecordBatchReader BatchReader;
+ std::vector<TMarkRangeFeatures> RangeMarkFeatures;
bool LeftBorderOpened = false;
public:
- TKeyRanges();
+ using TMark = NArrow::TRecordBatchReader::TRecordIterator;
- TKeyRanges& Reserve(const ui32 num) {
- RangeMarks.reserve(num);
- return *this;
+ const TMarkRangeFeatures& GetMarkFeatures(const ui32 index) const {
+ Y_VERIFY(index < RangeMarkFeatures.size());
+ return RangeMarkFeatures[index];
}
TString ToString() const;
-
- size_t ColumnsCount() const {
- return KeyColumns ? KeyColumns->num_fields() : 0;
- }
- const arrow::Schema& GetKeyColumns() const {
- return *KeyColumns;
- }
- const std::vector<TRangeMark>& GetRangeMarks() const {
- return RangeMarks;
+ ui32 GetMarksCount() const {
+ return BatchReader.GetRowsCount();
}
- std::vector<TRangeMark>& MutableRangeMarks() {
- return RangeMarks;
+ size_t GetColumnsCount() const {
+ return BatchReader.GetColumnsCount();
}
- TKeyRanges& InitColumns(std::shared_ptr<arrow::Schema> schema) {
- KeyColumns = schema;
- return *this;
+ size_t ColumnsCount() const {
+ return BatchReader.GetColumnsCount();
}
- TKeyRanges& SetLeftBorderOpened(const bool value) {
- LeftBorderOpened = value;
- return *this;
+ const NArrow::TRecordBatchReader& GetBatch() const {
+ return BatchReader;
}
bool IsLeftBorderOpened() const {
return LeftBorderOpened;
}
- bool AddRangeIfGrow(TRangeMark&& range);
- bool AddRangeIfNotLess(TRangeMark&& range);
NKikimrKqp::TEvRemoteCostData::TCostInfo SerializeToProto() const;
bool DeserializeFromProto(const NKikimrKqp::TEvRemoteCostData::TCostInfo& proto);
};
-class TCostsOperator {
+class TKeyRangesBuilder {
private:
- const TVector<TGranuleRecord>& GranuleRecords;
+ bool LeftBorderOpened = false;
+ NArrow::TRecordBatchConstructor Constructor;
+ TVector<TMarkRangeFeatures> Features;
+
const TIndexInfo& IndexInfo;
- TRangeMark BuildMarkFromPredicate(const std::shared_ptr<NOlap::TPredicate>& p) const;
- TKeyMark BuildMarkFromGranule(const TGranuleRecord& record) const;
+ bool AddMarkFromPredicate(const std::shared_ptr<NOlap::TPredicate>& p);
+ void AddMarkFromGranule(const TGranuleRecord& record);
static ui64 ExtractKey(const TString& key);
public:
- TCostsOperator(const TVector<TGranuleRecord>& granuleRecords, const TIndexInfo& indexInfo)
- : GranuleRecords(granuleRecords)
- , IndexInfo(indexInfo)
- {
-
+ TKeyRangesBuilder(const TIndexInfo& indexInfo);
+ void Reserve(const ui32 num) {
+ Constructor.Reserve(num);
}
- void FillRangeMarks(TKeyRanges& result, const std::shared_ptr<NOlap::TPredicate>& left, const std::shared_ptr<NOlap::TPredicate>& right) const;
+ void FillRangeMarks(const std::shared_ptr<NOlap::TPredicate>& left, const TVector<TGranuleRecord>& granuleRecords,
+ const std::shared_ptr<NOlap::TPredicate>& right);
+
+ TKeyRanges Build();
};
}
diff --git a/ydb/core/tx/columnshard/columnshard__scan.cpp b/ydb/core/tx/columnshard/columnshard__scan.cpp
index 7ab2a7d7ae6..01a26bc26f6 100644
--- a/ydb/core/tx/columnshard/columnshard__scan.cpp
+++ b/ydb/core/tx/columnshard/columnshard__scan.cpp
@@ -720,17 +720,14 @@ void TTxScan::Complete(const TActorContext& ctx) {
TStringStream detailedInfo;
if (IS_LOG_PRIORITY_ENABLED(ctx, NActors::NLog::PRI_TRACE, NKikimrServices::TX_COLUMNSHARD)) {
- detailedInfo << " read metadata: (" << TContainerPrinter(ReadMetadataRanges) << ")"
- << " req: " << request;
+ detailedInfo << " read metadata: (" << TContainerPrinter(ReadMetadataRanges) << ")" << " req: " << request;
}
TVector<NOlap::TReadMetadata::TConstPtr> rMetadataRanges;
if (request.GetCostDataOnly()) {
for (auto&& i : ReadMetadataRanges) {
NOlap::TReadMetadata::TConstPtr rMetadata = std::dynamic_pointer_cast<const NOlap::TReadMetadata>(i);
if (!rMetadata || !rMetadata->SelectInfo) {
- NOlap::NCosts::TKeyRanges ranges;
- ranges.SetLeftBorderOpened(true);
- auto ev = MakeHolder<TEvKqpCompute::TEvCostData>(std::move(ranges), scanId);
+ auto ev = MakeHolder<TEvKqpCompute::TEvCostData>(NOlap::NCosts::TKeyRanges(), scanId);
ctx.Send(scanComputeActor, ev.Release());
return;
}
@@ -762,23 +759,21 @@ void TTxScan::Complete(const TActorContext& ctx) {
}
if (request.GetCostDataOnly()) {
- NOlap::NCosts::TKeyRanges ranges;
if (request.GetReverse()) {
std::reverse(rMetadataRanges.begin(), rMetadataRanges.end());
}
+ NOlap::NCosts::TKeyRangesBuilder krBuilder(Self->PrimaryIndex->GetIndexInfo());
{
ui32 recordsCount = 0;
for (auto&& i : rMetadataRanges) {
recordsCount += i->SelectInfo->Granules.size() + 2;
}
- ranges.Reserve(recordsCount);
+ krBuilder.Reserve(recordsCount);
}
for (auto&& i : rMetadataRanges) {
- NOlap::NCosts::TCostsOperator cOperator(i->SelectInfo->Granules, i->IndexInfo);
- cOperator.FillRangeMarks(ranges, i->GreaterPredicate, i->LessPredicate);
+ krBuilder.FillRangeMarks(i->GreaterPredicate, i->SelectInfo->Granules, i->LessPredicate);
}
- LOG_S_DEBUG("TCostsOperator::BuildRangeMarks::Result " << ranges.ToString());
- auto ev = MakeHolder<TEvKqpCompute::TEvCostData>(std::move(ranges), scanId);
+ auto ev = MakeHolder<TEvKqpCompute::TEvCostData>(krBuilder.Build(), scanId);
ctx.Send(scanComputeActor, ev.Release());
return;
}