diff options
author | ivanmorozov <ivanmorozov@yandex-team.com> | 2022-09-16 17:44:12 +0300 |
---|---|---|
committer | ivanmorozov <ivanmorozov@yandex-team.com> | 2022-09-16 17:44:12 +0300 |
commit | afdecb0e4f540b92a669eca4ff54e6b2ca899830 (patch) | |
tree | e83b23a20d602bd3224b8c330410b93b126c95b1 | |
parent | 1f8614d61e3c3105e37ca88d36d15b675d3e010e (diff) | |
download | ydb-afdecb0e4f540b92a669eca4ff54e6b2ca899830.tar.gz |
batch using directly
-rw-r--r-- | ydb/core/formats/arrow_batch_builder.cpp | 117 | ||||
-rw-r--r-- | ydb/core/formats/arrow_batch_builder.h | 136 | ||||
-rw-r--r-- | ydb/core/kqp/kqp_compute.cpp | 42 | ||||
-rw-r--r-- | ydb/core/kqp/kqp_compute.h | 2 | ||||
-rw-r--r-- | ydb/core/kqp/ut/kqp_ne_flowcontrol_ut.cpp | 4 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/columnshard__costs.cpp | 214 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/columnshard__costs.h | 117 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/columnshard__scan.cpp | 17 |
8 files changed, 370 insertions, 279 deletions
diff --git a/ydb/core/formats/arrow_batch_builder.cpp b/ydb/core/formats/arrow_batch_builder.cpp index 8ecf97b9c78..ee74796711a 100644 --- a/ydb/core/formats/arrow_batch_builder.cpp +++ b/ydb/core/formats/arrow_batch_builder.cpp @@ -68,6 +68,123 @@ arrow::Status AppendCell(arrow::RecordBatchBuilder& builder, const TCell& cell, } +NKikimr::NArrow::TRecordBatchConstructor::TRecordConstructor& TRecordBatchConstructor::TRecordConstructor::AddRecordValue( + const std::shared_ptr<arrow::Scalar>& value) +{ + Y_VERIFY(CurrentBuilder != Owner.Builders.end()); + AddValueToBuilder(**CurrentBuilder, value, WithCast); + ++CurrentBuilder; + return *this; +} + +void TRecordBatchConstructor::AddValueToBuilder(arrow::ArrayBuilder& builder, + const std::shared_ptr<arrow::Scalar>& value, const bool withCast) { + if (!value) { + Y_VERIFY(builder.AppendNull().ok()); + } else if (!withCast) { + Y_VERIFY(builder.AppendScalar(*value).ok()); + } else { + auto castStatus = value->CastTo(builder.type()); + Y_VERIFY(castStatus.ok()); + Y_VERIFY(builder.AppendScalar(*castStatus.ValueUnsafe()).ok()); + } +} + +NKikimr::NArrow::TRecordBatchConstructor& TRecordBatchConstructor::AddRecordsBatchSlow( + const std::shared_ptr<arrow::RecordBatch>& value, const bool withCast /*= false*/, const bool withRemap /*= false*/) +{ + Y_VERIFY(!!value); + Y_VERIFY(!!Schema); + Y_VERIFY(!InConstruction); + std::vector<std::shared_ptr<arrow::Array>> batchColumns; + if (withRemap) { + for (auto&& f : Schema->fields()) { + batchColumns.emplace_back(value->GetColumnByName(f->name())); + } + } else { + Y_VERIFY(value->num_columns() <= Schema->num_fields()); + for (auto&& i : value->columns()) { + batchColumns.emplace_back(i); + } + for (i32 i = value->num_columns(); i < Schema->num_fields(); ++i) { + batchColumns.emplace_back(nullptr); + } + } + Y_VERIFY((int)batchColumns.size() == Schema->num_fields()); + Y_VERIFY((int)Builders.size() == Schema->num_fields()); + ui32 cIdx = 0; + std::vector<std::unique_ptr<arrow::ArrayBuilder>>::const_iterator currentBuilder = Builders.begin(); + for (auto&& c : batchColumns) { + if (!c) { + Y_VERIFY((*currentBuilder)->AppendNulls(value->num_rows()).ok()); + } else { + for (ui32 r = 0; r < value->num_rows(); ++r) { + std::shared_ptr<arrow::Scalar> value; + if (c->IsNull(r)) { + value = nullptr; + } else { + auto statusGet = c->GetScalar(r); + Y_VERIFY(statusGet.ok()); + value = statusGet.ValueUnsafe(); + } + AddValueToBuilder(**currentBuilder, value, withCast); + } + } + ++currentBuilder; + ++cIdx; + } + RecordsCount += value->num_rows(); + return *this; +} + +NKikimr::NArrow::TRecordBatchConstructor& TRecordBatchConstructor::InitColumns(const std::shared_ptr<arrow::Schema>& schema) { + Schema = schema; + Builders.clear(); + Builders.reserve(Schema->num_fields()); + for (auto&& f : Schema->fields()) { + std::unique_ptr<arrow::ArrayBuilder> arrayBuilder; + Y_VERIFY(arrow::MakeBuilder(arrow::default_memory_pool(), f->type(), &arrayBuilder).ok()); + Builders.emplace_back(std::move(arrayBuilder)); + } + return *this; +} + +NKikimr::NArrow::TRecordBatchReader TRecordBatchConstructor::Finish() { + Y_VERIFY(!InConstruction); + std::vector<std::shared_ptr<arrow::Array>> columns; + columns.reserve(Builders.size()); + for (auto&& i : Builders) { + arrow::Result<std::shared_ptr<arrow::Array>> aData = i->Finish(); + Y_VERIFY(aData.ok()); + columns.emplace_back(aData.ValueUnsafe()); + } + std::shared_ptr<arrow::RecordBatch> batch = arrow::RecordBatch::Make(Schema, RecordsCount, columns); +#if !defined(NDEBUG) + auto statusValidation = batch->ValidateFull(); + if (!statusValidation.ok()) { + Cerr << statusValidation.ToString() << "/" << statusValidation.message() << Endl; + Y_VERIFY(false); + } +#endif + + return TRecordBatchReader(batch); +} + +void TRecordBatchReader::SerializeToStrings(TString& schema, TString& data) const { + Y_VERIFY(!!Batch); + schema = NArrow::SerializeSchema(*Batch->schema()); + data = NArrow::SerializeBatchNoCompression(Batch); +} + +bool TRecordBatchReader::DeserializeFromStrings(const TString& schemaString, const TString& dataString) { + std::shared_ptr<arrow::Schema> schema = NArrow::DeserializeSchema(schemaString); + if (!schema) { + return false; + } + Batch = NArrow::DeserializeBatch(dataString, schema); + return true; +} + TArrowBatchBuilder::TArrowBatchBuilder(arrow::Compression::type codec) : WriteOptions(arrow::ipc::IpcWriteOptions::Defaults()) { diff --git a/ydb/core/formats/arrow_batch_builder.h b/ydb/core/formats/arrow_batch_builder.h index d52a94ed20c..ae15d175445 100644 --- a/ydb/core/formats/arrow_batch_builder.h +++ b/ydb/core/formats/arrow_batch_builder.h @@ -5,6 +5,142 @@ namespace NKikimr::NArrow { +class TRecordBatchReader { +private: + std::shared_ptr<arrow::RecordBatch> Batch; +public: + TRecordBatchReader() = default; + TRecordBatchReader(const std::shared_ptr<arrow::RecordBatch>& batch) + : Batch(batch) + { + } + class TRecordIterator { + private: + friend class TRecordBatchReader; + ui32 RecordIdx = 0; + const TRecordBatchReader& Reader; + TRecordIterator(const ui32 recordIdx, const TRecordBatchReader& reader) + : RecordIdx(recordIdx) + , Reader(reader) { + + } + public: + class TColumnIterator { + private: + friend class TRecordIterator; + ui32 ColumnIdx = 0; + const ui32 RecordIdx = 0; + const TRecordBatchReader& Reader; + TColumnIterator(const ui32 columnIdx, const ui32 recordIdx, const TRecordBatchReader& reader) + : ColumnIdx(columnIdx) + , RecordIdx(recordIdx) + , Reader(reader) { + + } + public: + std::shared_ptr<arrow::Scalar> operator*() const { + auto c = Reader.Batch->column(ColumnIdx); + if (c->IsNull(RecordIdx)) { + return nullptr; + } else { + auto status = c->GetScalar(RecordIdx); + Y_VERIFY(status.ok()); + return *status; + } + } + void operator++() { + ++ColumnIdx; + } + bool operator==(const TColumnIterator& value) const { + return RecordIdx == value.RecordIdx && ColumnIdx == value.ColumnIdx && Reader.Batch.get() == value.Reader.Batch.get(); + } + }; + TRecordIterator operator*() const { + return *this; + } + bool operator==(const TRecordIterator& value) const { + return RecordIdx == value.RecordIdx && Reader.Batch.get() == value.Reader.Batch.get(); + } + void operator++() { + ++RecordIdx; + } + ui32 GetIndex() const { + return RecordIdx; + } + TColumnIterator begin() const { + return TColumnIterator(0, RecordIdx, Reader); + } + TColumnIterator end() const { + return TColumnIterator(Reader.Batch->schema()->num_fields(), RecordIdx, Reader); + } + }; + ui32 GetColumnsCount() const { + return Batch ? Batch->num_columns() : 0; + } + ui32 GetRowsCount() const { + return Batch ? Batch->num_rows() : 0; + } + TRecordIterator begin() const { + return TRecordIterator(0, *this); + } + TRecordIterator end() const { + return TRecordIterator(GetRowsCount(), *this); + } + void SerializeToStrings(TString& schema, TString& data) const; + bool DeserializeFromStrings(const TString& schemaString, const TString& dataString); +}; + +class TRecordBatchConstructor { +private: + ui32 RecordsCount = 0; + bool InConstruction = false; + static void AddValueToBuilder(arrow::ArrayBuilder& builder, const std::shared_ptr<arrow::Scalar>& value, const bool withCast); +protected: + std::shared_ptr<arrow::Schema> Schema; + std::vector<std::unique_ptr<arrow::ArrayBuilder>> Builders; +public: + class TRecordConstructor { + private: + TRecordBatchConstructor& Owner; + const bool WithCast = false; + std::vector<std::unique_ptr<arrow::ArrayBuilder>>::const_iterator CurrentBuilder; + public: + TRecordConstructor(TRecordBatchConstructor& owner, const bool withCast) + : Owner(owner) + , WithCast(withCast) + { + Y_VERIFY(!Owner.InConstruction); + CurrentBuilder = Owner.Builders.begin(); + Owner.InConstruction = true; + } + ~TRecordConstructor() { + for (; CurrentBuilder != Owner.Builders.end(); ++CurrentBuilder) { + Y_VERIFY((*CurrentBuilder)->AppendNull().ok()); + } + Owner.InConstruction = false; + ++Owner.RecordsCount; + } + TRecordConstructor& AddRecordValue(const std::shared_ptr<arrow::Scalar>& value); + }; + + TRecordBatchConstructor& InitColumns(const std::shared_ptr<arrow::Schema>& schema); + + TRecordConstructor StartRecord(const bool withCast = false) { + Y_VERIFY(!InConstruction); + return TRecordConstructor(*this, withCast); + } + + TRecordBatchConstructor& AddRecordsBatchSlow(const std::shared_ptr<arrow::RecordBatch>& value, const bool withCast = false, const bool withRemap = false); + + void Reserve(const ui32 recordsCount) { + for (auto&& i : Builders) { + Y_VERIFY(i->Reserve(recordsCount).ok()); + } + } + + TRecordBatchReader Finish(); +}; + /// YDB rows to arrow::RecordBatch converter class TArrowBatchBuilder : public NKikimr::IBlockBuilder { public: diff --git a/ydb/core/kqp/kqp_compute.cpp b/ydb/core/kqp/kqp_compute.cpp index 82bf27a407c..88a2063e96e 100644 --- a/ydb/core/kqp/kqp_compute.cpp +++ b/ydb/core/kqp/kqp_compute.cpp @@ -18,31 +18,53 @@ NActors::IEventBase* TEvKqpCompute::TEvCostData::Load(TEventSerializedData* data return result.Release(); } -TVector<NKikimr::TSerializedTableRange> TEvKqpCompute::TEvCostData::GetSerializedTableRanges() const { +TVector<NKikimr::TSerializedTableRange> TEvKqpCompute::TEvCostData::GetSerializedTableRanges(const ui32 splitFactor) const { + TVector<NOlap::NCosts::TKeyRanges::TMark> usefulMarks; + { + ui32 currentCount = (TableRanges.IsLeftBorderOpened() ? 1 : 0); + const ui32 realCount = TableRanges.GetMarksCount() + currentCount + 1; + const double countPerSection = 1.0 * realCount / splitFactor; + bool predSkipped = !TableRanges.IsLeftBorderOpened(); + for (auto&& i : TableRanges.GetBatch()) { + auto& features = TableRanges.GetMarkFeatures(i.GetIndex()); + ++currentCount; + if (currentCount >= countPerSection || predSkipped || features.GetIntervalSkipped()) { + currentCount = 0; + usefulMarks.emplace_back(i); + } + predSkipped = features.GetIntervalSkipped(); + } + } + TVector<TSerializedTableRange> result; TVector<TCell> borderPred; - borderPred.resize(TableRanges.ColumnsCount(), TCell()); + borderPred.resize(TableRanges.GetColumnsCount(), TCell()); bool predSkipped = !TableRanges.IsLeftBorderOpened(); bool predIncluded = false; - for (auto&& i : TableRanges.GetRangeMarks()) { + + for (auto&& i : usefulMarks) { + auto& features = TableRanges.GetMarkFeatures(i.GetIndex()); TVector<TCell> borderCurrent; - for (auto&& value : i.GetMark()) { + for (auto&& value : i) { + if (!value) { + break; + } borderCurrent.emplace_back(SerializeScalarToCell(value)); } if (!predSkipped) { - for (ui32 additional = borderPred.size(); additional < TableRanges.ColumnsCount(); ++additional) { + for (ui32 additional = borderPred.size(); additional < TableRanges.GetColumnsCount(); ++additional) { borderPred.emplace_back(TCell()); } - if (!i.GetMarkIncluded()) { - for (ui32 additional = borderCurrent.size(); additional < TableRanges.ColumnsCount(); ++additional) { + if (!features.GetMarkIncluded()) { + for (ui32 additional = borderCurrent.size(); additional < TableRanges.GetColumnsCount(); ++additional) { borderCurrent.emplace_back(TCell()); } } - TSerializedTableRange serializedRange(borderPred, predIncluded, borderCurrent, i.GetMarkIncluded()); + TSerializedTableRange serializedRange(borderPred, predIncluded, borderCurrent, features.GetMarkIncluded()); result.emplace_back(std::move(serializedRange)); } - predSkipped = i.GetIntervalSkipped(); - predIncluded = i.GetMarkIncluded(); + predSkipped = features.GetIntervalSkipped(); + predIncluded = features.GetMarkIncluded(); std::swap(borderCurrent, borderPred); } if (predSkipped) { diff --git a/ydb/core/kqp/kqp_compute.h b/ydb/core/kqp/kqp_compute.h index 52d374b3e50..998f3502c3f 100644 --- a/ydb/core/kqp/kqp_compute.h +++ b/ydb/core/kqp/kqp_compute.h @@ -157,7 +157,7 @@ struct TEvKqpCompute { const NOlap::NCosts::TKeyRanges& GetTableRanges() const { return TableRanges; } - TVector<TSerializedTableRange> GetSerializedTableRanges() const; + TVector<TSerializedTableRange> GetSerializedTableRanges(const ui32 splitFactor = 1) const; virtual bool IsSerializable() const override { return true; diff --git a/ydb/core/kqp/ut/kqp_ne_flowcontrol_ut.cpp b/ydb/core/kqp/ut/kqp_ne_flowcontrol_ut.cpp index cec70f8353b..5c9d2a8707d 100644 --- a/ydb/core/kqp/ut/kqp_ne_flowcontrol_ut.cpp +++ b/ydb/core/kqp/ut/kqp_ne_flowcontrol_ut.cpp @@ -64,6 +64,10 @@ void DoFlowControlTest(ui64 limit, bool hasBlockedByCapacity, bool useSessionAct .SetKqpSettings({}) .SetEnableKqpScanQueryStreamLookup(false); TKikimrRunner kikimr{kikimrSettings}; +// kikimr.GetTestServer().GetRuntime()->SetLogPriority(NKikimrServices::KQP_EXECUTER, NActors::NLog::PRI_DEBUG); +// kikimr.GetTestServer().GetRuntime()->SetLogPriority(NKikimrServices::KQP_COMPUTE, NActors::NLog::PRI_DEBUG); +// kikimr.GetTestServer().GetRuntime()->SetLogPriority(NKikimrServices::KQP_GATEWAY, NActors::NLog::PRI_DEBUG); +// kikimr.GetTestServer().GetRuntime()->SetLogPriority(NKikimrServices::KQP_RESOURCE_MANAGER, NActors::NLog::PRI_DEBUG); CreateSampleTables(kikimr); auto db = kikimr.GetTableClient(); diff --git a/ydb/core/tx/columnshard/columnshard__costs.cpp b/ydb/core/tx/columnshard/columnshard__costs.cpp index d0300f2f758..4405a9695d2 100644 --- a/ydb/core/tx/columnshard/columnshard__costs.cpp +++ b/ydb/core/tx/columnshard/columnshard__costs.cpp @@ -8,216 +8,78 @@ namespace NKikimr::NOlap::NCosts { -TKeyMark TCostsOperator::BuildMarkFromGranule(const TGranuleRecord& record) const { - TKeyMark result; - result.AddValue(std::make_shared<arrow::UInt64Scalar>(ExtractKey(record.IndexKey))); - return result; +void TKeyRangesBuilder::AddMarkFromGranule(const TGranuleRecord& record) { + Constructor.StartRecord(true).AddRecordValue(std::make_shared<arrow::UInt64Scalar>(ExtractKey(record.IndexKey))); + Features.emplace_back(TMarkRangeFeatures()); } -TRangeMark TCostsOperator::BuildMarkFromPredicate(const std::shared_ptr<NOlap::TPredicate>& p) const { +bool TKeyRangesBuilder::AddMarkFromPredicate(const std::shared_ptr<NOlap::TPredicate>& p) { if (!p) { - return TRangeMark(); - } - TRangeMark result; - for (i32 i = 0; i < p->Batch->num_columns(); ++i) { - auto column = p->Batch->column(i); - Y_VERIFY(!!column && column->length() == 1); - Y_VERIFY(!column->IsNull(0)); - auto status = column->GetScalar(0); - Y_VERIFY(status.ok()); - result.MutableMark().AddValue(status.ValueUnsafe()); + return false; } - result.SetMarkIncluded(p->Inclusive); - return result; + Y_VERIFY(p->Batch->num_rows() == 1); + Constructor.AddRecordsBatchSlow(p->Batch); + Features.emplace_back(TMarkRangeFeatures().SetMarkIncluded(p->Inclusive)); + return true; } -void TCostsOperator::FillRangeMarks(TKeyRanges& result, const std::shared_ptr<NOlap::TPredicate>& left, const std::shared_ptr<NOlap::TPredicate>& right) const { +void TKeyRangesBuilder::FillRangeMarks(const std::shared_ptr<NOlap::TPredicate>& left, const TVector<TGranuleRecord>& granuleRecords, + const std::shared_ptr<NOlap::TPredicate>& right) { LOG_S_DEBUG("TCostsOperator::BuildRangeMarks::Request from " << (left ? left->Batch->ToString() : "-Inf") << " to " << (right ? right->Batch->ToString() : "+Inf")); - result.InitColumns(NArrow::MakeArrowSchema(IndexInfo.GetPK())); - { - TRangeMark leftBorder = BuildMarkFromPredicate(left); - if (!leftBorder.GetMark().ColumnsCount()) { - Y_VERIFY(!result.IsLeftBorderOpened()); - result.SetLeftBorderOpened(true); - } else { - Y_VERIFY(result.AddRangeIfNotLess(std::move(leftBorder))); - } + if (!AddMarkFromPredicate(left)) { + Y_VERIFY(!LeftBorderOpened); + LeftBorderOpened = true; } - for (auto&& i : GranuleRecords) { - auto granuleMark = BuildMarkFromGranule(i); - Y_VERIFY(result.AddRangeIfGrow(TRangeMark(std::move(granuleMark)))); + for (auto&& i : granuleRecords) { + AddMarkFromGranule(i); } - { - TRangeMark rightBorder = BuildMarkFromPredicate(right); - if (rightBorder.GetMark().ColumnsCount()) { - Y_VERIFY(result.AddRangeIfNotLess(std::move(rightBorder))); - result.MutableRangeMarks().back().SetIntervalSkipped(true); - } + if (AddMarkFromPredicate(right)) { + Features.back().SetIntervalSkipped(true); } } -ui64 TCostsOperator::ExtractKey(const TString& key) { +ui64 TKeyRangesBuilder::ExtractKey(const TString& key) { Y_VERIFY(key.size() == 8); return *reinterpret_cast<const ui64*>(key.data()); } -bool TRangeMark::operator<(const TRangeMark& item) const { - return Mark < item.Mark; -} - -TString TRangeMark::ToString() const { - TStringBuilder sb; - sb << "MARK=" << Mark.ToString() << ";SKIP=" << IntervalSkipped << ";"; - return sb; -} - NKikimrKqp::TEvRemoteCostData::TCostInfo TKeyRanges::SerializeToProto() const { NKikimrKqp::TEvRemoteCostData::TCostInfo result; result.SetLeftBorderOpened(LeftBorderOpened); - for (auto&& i : RangeMarks) { - *result.AddIntervalMeta() = i.SerializeMetaToProto(); + for (auto&& i : RangeMarkFeatures) { + *result.AddIntervalMeta() = i.SerializeToProto(); } - - std::shared_ptr<arrow::Schema> schema = KeyColumns; - *result.MutableColumnsSchema() = NArrow::SerializeSchema(*schema); - if (RangeMarks.empty()) { - return result; - } - - TVector<std::unique_ptr<arrow::ArrayBuilder>> arrayBuilders; - for (auto&& f : KeyColumns->fields()) { - std::unique_ptr<arrow::ArrayBuilder> arrayBuilder; - Y_VERIFY(arrow::MakeBuilder(arrow::default_memory_pool(), f->type(), &arrayBuilder).ok()); - arrayBuilders.emplace_back(std::move(arrayBuilder)); - Y_VERIFY(arrayBuilders.back()->Reserve(RangeMarks.size()).ok()); - } - for (auto&& i : RangeMarks) { - Y_VERIFY(i.GetMark().ColumnsCount() <= ColumnsCount()); - auto itBuilder = arrayBuilders.begin(); - for (auto&& scalar : i.GetMark()) { - Y_VERIFY(!!scalar); - auto correctScalar = scalar->CastTo((*itBuilder)->type()); - Y_VERIFY(correctScalar.ok()); - Y_VERIFY((*itBuilder)->AppendScalar(**correctScalar).ok()); - ++itBuilder; - } - for (; itBuilder != arrayBuilders.end(); ++itBuilder) { - Y_VERIFY((*itBuilder)->AppendNull().ok()); - } - } - TVector<std::shared_ptr<arrow::Array>> arrays; - for (auto&& i : arrayBuilders) { - arrow::Result<std::shared_ptr<arrow::Array>> aData = i->Finish(); - Y_VERIFY(aData.ok()); - arrays.emplace_back(aData.ValueUnsafe()); - } - std::shared_ptr<arrow::RecordBatch> rb = arrow::RecordBatch::Make(schema, RangeMarks.size(), arrays); - *result.MutableColumnsData() = NArrow::SerializeBatchNoCompression(rb); + BatchReader.SerializeToStrings(*result.MutableColumnsSchema(), *result.MutableColumnsData()); return result; } bool TKeyRanges::DeserializeFromProto(const NKikimrKqp::TEvRemoteCostData::TCostInfo& proto) { - std::shared_ptr<arrow::Schema> schema = NArrow::DeserializeSchema(proto.GetColumnsSchema()); - Y_VERIFY(schema); + Y_VERIFY(BatchReader.DeserializeFromStrings(proto.GetColumnsSchema(), proto.GetColumnsData())); LeftBorderOpened = proto.GetLeftBorderOpened(); - KeyColumns = schema; - - TVector<TRangeMark> resultLocal; - if (!!proto.GetColumnsData()) { - std::shared_ptr<arrow::RecordBatch> batch = NArrow::DeserializeBatch(proto.GetColumnsData(), schema); - Y_VERIFY(batch->num_columns() == (int)ColumnsCount()); - resultLocal.reserve(batch->num_rows()); - auto& batchColumns = batch->columns(); - for (ui32 rowIdx = 0; rowIdx < batch->num_rows(); ++rowIdx) { - TKeyMark mark; - for (auto&& c: batchColumns) { - if (c->IsNull(rowIdx)) { - break; - } - auto valueStatus = c->GetScalar(rowIdx); - Y_VERIFY(valueStatus.ok()); - mark.AddValue(valueStatus.ValueUnsafe()); - } - resultLocal.emplace_back(TRangeMark(std::move(mark))); - } - } - Y_VERIFY(resultLocal.size() == (size_t)proto.GetIntervalMeta().size()); - ui32 idx = 0; + Y_VERIFY(BatchReader.GetRowsCount() == (size_t)proto.GetIntervalMeta().size()); + RangeMarkFeatures.reserve(proto.GetIntervalMeta().size()); for (auto&& i : proto.GetIntervalMeta()) { - if (!resultLocal[idx++].DeserializeMetaFromProto(i)) { - return false; - } + TMarkRangeFeatures features; + Y_VERIFY(features.DeserializeFromProto(i)); + RangeMarkFeatures.emplace_back(std::move(features)); } - std::swap(RangeMarks, resultLocal); return true; } -bool TKeyRanges::AddRangeIfGrow(TRangeMark&& range) { - Y_VERIFY(range.GetMark().ColumnsCount() <= ColumnsCount()); - if (RangeMarks.empty() || RangeMarks.back() < range) { - RangeMarks.emplace_back(std::move(range)); - return true; - } - return false; -} - -bool TKeyRanges::AddRangeIfNotLess(TRangeMark&& range) { - Y_VERIFY(range.GetMark().ColumnsCount() <= ColumnsCount()); - if (RangeMarks.empty() || RangeMarks.back() < range || !(range < RangeMarks.back())) { - RangeMarks.emplace_back(std::move(range)); - return true; - } - return RangeMarks.back() < range; +TKeyRangesBuilder::TKeyRangesBuilder(const TIndexInfo& indexInfo) + : IndexInfo(indexInfo) +{ + Constructor.InitColumns(NArrow::MakeArrowSchema(IndexInfo.GetPK())); } -TString TKeyRanges::ToString() const { - TStringBuilder sb; - sb << "COLUMNS: " << KeyColumns->ToString() << Endl; - sb << Endl << "DATA: " << Endl; - for (auto&& i : RangeMarks) { - sb << i.ToString() << ";"; - } - return sb; -} - -TKeyRanges::TKeyRanges() { - KeyColumns = arrow::SchemaBuilder().Finish().ValueUnsafe(); -} - -TString TKeyMark::ToString() const { - TStringBuilder sb; - for (auto&& i : Values) { - sb << (i ? i->ToString() : "NO_VALUE") << ";"; - } - return sb; -} - -bool TKeyMark::operator<(const TKeyMark& item) const { - auto itSelf = Values.begin(); - auto itItem = item.Values.begin(); - for (; itSelf != Values.end() && itItem != item.Values.end(); ++itSelf, ++itItem) { - if (NArrow::ScalarLess(*itSelf, *itItem)) { - return true; - } else if (NArrow::ScalarLess(*itItem, *itSelf)) { - return false; - } - } - if (itSelf == Values.end()) { - if (itItem == item.Values.end()) { - return false; - } else { - return true; - } - } else { - if (itItem == item.Values.end()) { - return true; - } else { - Y_VERIFY(false, "impossible"); - } - } - return false; +NKikimr::NOlap::NCosts::TKeyRanges TKeyRangesBuilder::Build() { + TKeyRanges result; + result.LeftBorderOpened = LeftBorderOpened; + result.RangeMarkFeatures = std::move(Features); + result.BatchReader = Constructor.Finish(); + return result; } } diff --git a/ydb/core/tx/columnshard/columnshard__costs.h b/ydb/core/tx/columnshard/columnshard__costs.h index 48c629f46f5..6a11f3187c5 100644 --- a/ydb/core/tx/columnshard/columnshard__costs.h +++ b/ydb/core/tx/columnshard/columnshard__costs.h @@ -1,4 +1,5 @@ #pragma once +#include <ydb/core/formats/arrow_batch_builder.h> #include <ydb/core/formats/arrow_helpers.h> #include <ydb/core/protos/kqp.pb.h> @@ -11,56 +12,16 @@ namespace NKikimr::NOlap { namespace NKikimr::NOlap::NCosts { -class TKeyMark { +class TMarkRangeFeatures { private: - TVector<std::shared_ptr<arrow::Scalar>> Values; -public: - TKeyMark() = default; - - TString ToString() const; - - TVector<std::shared_ptr<arrow::Scalar>>::const_iterator begin() const { - return Values.begin(); - } - TVector<std::shared_ptr<arrow::Scalar>>::const_iterator end() const { - return Values.end(); - } - TKeyMark& AddValue(std::shared_ptr<arrow::Scalar> value) { - Values.emplace_back(value); - return *this; - } - ui32 ColumnsCount() const { - return Values.size(); - } - bool operator<(const TKeyMark& item) const; -}; - -class TRangeMark { -private: - TKeyMark Mark; bool IntervalSkipped = false; bool MarkIncluded = true; public: - TRangeMark() = default; - TRangeMark(TKeyMark&& mark) - : Mark(std::move(mark)) - { - - } + TMarkRangeFeatures() = default; TString ToString() const; - const TKeyMark& GetMark() const { - return Mark; - } - - TKeyMark& MutableMark() { - return Mark; - } - - bool operator<(const TRangeMark& item) const; - - TRangeMark& SetIntervalSkipped(const bool value) { + TMarkRangeFeatures& SetIntervalSkipped(const bool value) { IntervalSkipped = value; return *this; } @@ -69,7 +30,7 @@ public: return IntervalSkipped; } - TRangeMark& SetMarkIncluded(const bool value) { + TMarkRangeFeatures& SetMarkIncluded(const bool value) { MarkIncluded = value; return *this; } @@ -78,14 +39,14 @@ public: return MarkIncluded; } - NKikimrKqp::TEvRemoteCostData::TIntervalMeta SerializeMetaToProto() const { + NKikimrKqp::TEvRemoteCostData::TIntervalMeta SerializeToProto() const { NKikimrKqp::TEvRemoteCostData::TIntervalMeta result; result.SetIntervalSkipped(IntervalSkipped); result.SetMarkIncluded(MarkIncluded); return result; } - bool DeserializeMetaFromProto(const NKikimrKqp::TEvRemoteCostData::TIntervalMeta& proto) { + bool DeserializeFromProto(const NKikimrKqp::TEvRemoteCostData::TIntervalMeta& proto) { SetIntervalSkipped(proto.GetIntervalSkipped()); SetMarkIncluded(proto.GetMarkIncluded()); return true; @@ -94,64 +55,58 @@ public: class TKeyRanges { private: - std::shared_ptr<arrow::Schema> KeyColumns; - std::vector<TRangeMark> RangeMarks; + friend class TKeyRangesBuilder; + NArrow::TRecordBatchReader BatchReader; + std::vector<TMarkRangeFeatures> RangeMarkFeatures; bool LeftBorderOpened = false; public: - TKeyRanges(); + using TMark = NArrow::TRecordBatchReader::TRecordIterator; - TKeyRanges& Reserve(const ui32 num) { - RangeMarks.reserve(num); - return *this; + const TMarkRangeFeatures& GetMarkFeatures(const ui32 index) const { + Y_VERIFY(index < RangeMarkFeatures.size()); + return RangeMarkFeatures[index]; } TString ToString() const; - - size_t ColumnsCount() const { - return KeyColumns ? KeyColumns->num_fields() : 0; - } - const arrow::Schema& GetKeyColumns() const { - return *KeyColumns; - } - const std::vector<TRangeMark>& GetRangeMarks() const { - return RangeMarks; + ui32 GetMarksCount() const { + return BatchReader.GetRowsCount(); } - std::vector<TRangeMark>& MutableRangeMarks() { - return RangeMarks; + size_t GetColumnsCount() const { + return BatchReader.GetColumnsCount(); } - TKeyRanges& InitColumns(std::shared_ptr<arrow::Schema> schema) { - KeyColumns = schema; - return *this; + size_t ColumnsCount() const { + return BatchReader.GetColumnsCount(); } - TKeyRanges& SetLeftBorderOpened(const bool value) { - LeftBorderOpened = value; - return *this; + const NArrow::TRecordBatchReader& GetBatch() const { + return BatchReader; } bool IsLeftBorderOpened() const { return LeftBorderOpened; } - bool AddRangeIfGrow(TRangeMark&& range); - bool AddRangeIfNotLess(TRangeMark&& range); NKikimrKqp::TEvRemoteCostData::TCostInfo SerializeToProto() const; bool DeserializeFromProto(const NKikimrKqp::TEvRemoteCostData::TCostInfo& proto); }; -class TCostsOperator { +class TKeyRangesBuilder { private: - const TVector<TGranuleRecord>& GranuleRecords; + bool LeftBorderOpened = false; + NArrow::TRecordBatchConstructor Constructor; + TVector<TMarkRangeFeatures> Features; + const TIndexInfo& IndexInfo; - TRangeMark BuildMarkFromPredicate(const std::shared_ptr<NOlap::TPredicate>& p) const; - TKeyMark BuildMarkFromGranule(const TGranuleRecord& record) const; + bool AddMarkFromPredicate(const std::shared_ptr<NOlap::TPredicate>& p); + void AddMarkFromGranule(const TGranuleRecord& record); static ui64 ExtractKey(const TString& key); public: - TCostsOperator(const TVector<TGranuleRecord>& granuleRecords, const TIndexInfo& indexInfo) - : GranuleRecords(granuleRecords) - , IndexInfo(indexInfo) - { - + TKeyRangesBuilder(const TIndexInfo& indexInfo); + void Reserve(const ui32 num) { + Constructor.Reserve(num); } - void FillRangeMarks(TKeyRanges& result, const std::shared_ptr<NOlap::TPredicate>& left, const std::shared_ptr<NOlap::TPredicate>& right) const; + void FillRangeMarks(const std::shared_ptr<NOlap::TPredicate>& left, const TVector<TGranuleRecord>& granuleRecords, + const std::shared_ptr<NOlap::TPredicate>& right); + + TKeyRanges Build(); }; } diff --git a/ydb/core/tx/columnshard/columnshard__scan.cpp b/ydb/core/tx/columnshard/columnshard__scan.cpp index 7ab2a7d7ae6..01a26bc26f6 100644 --- a/ydb/core/tx/columnshard/columnshard__scan.cpp +++ b/ydb/core/tx/columnshard/columnshard__scan.cpp @@ -720,17 +720,14 @@ void TTxScan::Complete(const TActorContext& ctx) { TStringStream detailedInfo; if (IS_LOG_PRIORITY_ENABLED(ctx, NActors::NLog::PRI_TRACE, NKikimrServices::TX_COLUMNSHARD)) { - detailedInfo << " read metadata: (" << TContainerPrinter(ReadMetadataRanges) << ")" - << " req: " << request; + detailedInfo << " read metadata: (" << TContainerPrinter(ReadMetadataRanges) << ")" << " req: " << request; } TVector<NOlap::TReadMetadata::TConstPtr> rMetadataRanges; if (request.GetCostDataOnly()) { for (auto&& i : ReadMetadataRanges) { NOlap::TReadMetadata::TConstPtr rMetadata = std::dynamic_pointer_cast<const NOlap::TReadMetadata>(i); if (!rMetadata || !rMetadata->SelectInfo) { - NOlap::NCosts::TKeyRanges ranges; - ranges.SetLeftBorderOpened(true); - auto ev = MakeHolder<TEvKqpCompute::TEvCostData>(std::move(ranges), scanId); + auto ev = MakeHolder<TEvKqpCompute::TEvCostData>(NOlap::NCosts::TKeyRanges(), scanId); ctx.Send(scanComputeActor, ev.Release()); return; } @@ -762,23 +759,21 @@ void TTxScan::Complete(const TActorContext& ctx) { } if (request.GetCostDataOnly()) { - NOlap::NCosts::TKeyRanges ranges; if (request.GetReverse()) { std::reverse(rMetadataRanges.begin(), rMetadataRanges.end()); } + NOlap::NCosts::TKeyRangesBuilder krBuilder(Self->PrimaryIndex->GetIndexInfo()); { ui32 recordsCount = 0; for (auto&& i : rMetadataRanges) { recordsCount += i->SelectInfo->Granules.size() + 2; } - ranges.Reserve(recordsCount); + krBuilder.Reserve(recordsCount); } for (auto&& i : rMetadataRanges) { - NOlap::NCosts::TCostsOperator cOperator(i->SelectInfo->Granules, i->IndexInfo); - cOperator.FillRangeMarks(ranges, i->GreaterPredicate, i->LessPredicate); + krBuilder.FillRangeMarks(i->GreaterPredicate, i->SelectInfo->Granules, i->LessPredicate); } - LOG_S_DEBUG("TCostsOperator::BuildRangeMarks::Result " << ranges.ToString()); - auto ev = MakeHolder<TEvKqpCompute::TEvCostData>(std::move(ranges), scanId); + auto ev = MakeHolder<TEvKqpCompute::TEvCostData>(krBuilder.Build(), scanId); ctx.Send(scanComputeActor, ev.Release()); return; } |