diff options
author | ivanmorozov <ivanmorozov@yandex-team.com> | 2022-09-14 17:47:10 +0300 |
---|---|---|
committer | ivanmorozov <ivanmorozov@yandex-team.com> | 2022-09-14 17:47:10 +0300 |
commit | 43f2a855553ca09945ed0688b33c997ae6e905e4 (patch) | |
tree | 8941596fabf68e3c5200f2eaaecd4cbbfc00510f | |
parent | 1aa4ac536eea700d238b4e6142dbf58dc3bb5434 (diff) | |
download | ydb-43f2a855553ca09945ed0688b33c997ae6e905e4.tar.gz |
prepare costs request for column shard
-rw-r--r-- | ydb/core/formats/arrow_helpers.cpp | 12 | ||||
-rw-r--r-- | ydb/core/formats/arrow_helpers.h | 1 | ||||
-rw-r--r-- | ydb/core/kqp/CMakeLists.txt | 1 | ||||
-rw-r--r-- | ydb/core/kqp/common/kqp_common.h | 2 | ||||
-rw-r--r-- | ydb/core/kqp/kqp_compute.cpp | 83 | ||||
-rw-r--r-- | ydb/core/kqp/kqp_compute.h | 55 | ||||
-rw-r--r-- | ydb/core/protos/kqp.proto | 18 | ||||
-rw-r--r-- | ydb/core/protos/tx_datashard.proto | 1 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/CMakeLists.txt | 1 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/columnshard__costs.cpp | 222 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/columnshard__costs.h | 157 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/columnshard__scan.cpp | 36 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/columnshard_common.cpp | 6 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/engines/CMakeLists.txt | 1 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/engines/column_engine.cpp | 4 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/engines/column_engine.h | 2 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_ut_kqp_scan.cpp | 2 |
17 files changed, 597 insertions, 7 deletions
diff --git a/ydb/core/formats/arrow_helpers.cpp b/ydb/core/formats/arrow_helpers.cpp index 72ff6fd234..6a28052da2 100644 --- a/ydb/core/formats/arrow_helpers.cpp +++ b/ydb/core/formats/arrow_helpers.cpp @@ -738,6 +738,12 @@ std::shared_ptr<arrow::Scalar> GetScalar(const std::shared_ptr<arrow::Array>& ar return out; } +bool ScalarLess(const std::shared_ptr<arrow::Scalar>& x, const std::shared_ptr<arrow::Scalar>& y) { + Y_VERIFY(x); + Y_VERIFY(y); + return ScalarLess(*x, *y); +} + bool ScalarLess(const arrow::Scalar& x, const arrow::Scalar& y) { Y_VERIFY(x.type->Equals(y.type)); @@ -745,7 +751,11 @@ bool ScalarLess(const arrow::Scalar& x, const arrow::Scalar& y) { using TWrap = std::decay_t<decltype(type)>; using TScalar = typename arrow::TypeTraits<typename TWrap::T>::ScalarType; using TValue = std::decay_t<decltype(static_cast<const TScalar&>(x).value)>; - + if constexpr (arrow::has_string_view<TScalar>()) { + const auto& xval = static_cast<const TScalar&>(x).value; + const auto& yval = static_cast<const TScalar&>(y).value; + return TStringBuf((char*)xval->data(), xval->size()) < TStringBuf((char*)yval->data(), yval->size()); + } if constexpr (std::is_arithmetic_v<TValue>) { const auto& xval = static_cast<const TScalar&>(x).value; const auto& yval = static_cast<const TScalar&>(y).value; diff --git a/ydb/core/formats/arrow_helpers.h b/ydb/core/formats/arrow_helpers.h index 19d1505270..2f6406b68e 100644 --- a/ydb/core/formats/arrow_helpers.h +++ b/ydb/core/formats/arrow_helpers.h @@ -137,6 +137,7 @@ std::shared_ptr<TArr> GetTypedColumn(const std::shared_ptr<arrow::RecordBatch>& std::pair<int, int> FindMinMaxPosition(const std::shared_ptr<arrow::Array>& column); std::shared_ptr<arrow::Scalar> GetScalar(const std::shared_ptr<arrow::Array>& array, int position); +bool ScalarLess(const std::shared_ptr<arrow::Scalar>& x, const std::shared_ptr<arrow::Scalar>& y); bool ScalarLess(const arrow::Scalar& x, const arrow::Scalar& y); class IRowWriter; diff --git a/ydb/core/kqp/CMakeLists.txt b/ydb/core/kqp/CMakeLists.txt index 12335e5955..287c52c9bb 100644 --- a/ydb/core/kqp/CMakeLists.txt +++ b/ydb/core/kqp/CMakeLists.txt @@ -68,6 +68,7 @@ target_sources(ydb-core-kqp PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/kqp/kqp_compile_actor.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/kqp_compile_request.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/kqp_compile_service.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/kqp/kqp_compute.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/kqp_shutdown_controller.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/kqp_ic_gateway.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/kqp_metadata_loader.cpp diff --git a/ydb/core/kqp/common/kqp_common.h b/ydb/core/kqp/common/kqp_common.h index f9379ad129..76bf41bffe 100644 --- a/ydb/core/kqp/common/kqp_common.h +++ b/ydb/core/kqp/common/kqp_common.h @@ -74,6 +74,8 @@ struct TKqpComputeEvents { EvScanInitActor, EvRemoteScanData, EvRemoteScanDataAck, + EvRemoteCostData, + EvCostData, }; static_assert(Unused0 == EventSpaceBegin(TKikimrEvents::ES_KQP) + 200); diff --git a/ydb/core/kqp/kqp_compute.cpp b/ydb/core/kqp/kqp_compute.cpp new file mode 100644 index 0000000000..82bf27a407 --- /dev/null +++ b/ydb/core/kqp/kqp_compute.cpp @@ -0,0 +1,83 @@ +#include "kqp_compute.h" + +namespace NKikimr::NKqp { + +void TEvKqpCompute::TEvCostData::InitRemote() const { + if (!Remote) { + Remote.Reset(new TEvRemoteCostData); + *Remote->Record.MutableCostInfo() = TableRanges.SerializeToProto(); + Remote->Record.SetScanId(ScanId); + } +} + +NActors::IEventBase* TEvKqpCompute::TEvCostData::Load(TEventSerializedData* data) { + auto pbEv = THolder<TEvRemoteCostData>(static_cast<TEvRemoteCostData*>(TEvRemoteCostData::Load(data))); + NOlap::NCosts::TKeyRanges ranges; + Y_VERIFY(ranges.DeserializeFromProto(pbEv->Record.GetCostInfo())); + THolder<TEvCostData> result(new TEvCostData(std::move(ranges), pbEv->Record.GetScanId())); + return result.Release(); +} + +TVector<NKikimr::TSerializedTableRange> TEvKqpCompute::TEvCostData::GetSerializedTableRanges() const { + TVector<TSerializedTableRange> result; + TVector<TCell> borderPred; + borderPred.resize(TableRanges.ColumnsCount(), TCell()); + bool predSkipped = !TableRanges.IsLeftBorderOpened(); + bool predIncluded = false; + for (auto&& i : TableRanges.GetRangeMarks()) { + TVector<TCell> borderCurrent; + for (auto&& value : i.GetMark()) { + borderCurrent.emplace_back(SerializeScalarToCell(value)); + } + if (!predSkipped) { + for (ui32 additional = borderPred.size(); additional < TableRanges.ColumnsCount(); ++additional) { + borderPred.emplace_back(TCell()); + } + if (!i.GetMarkIncluded()) { + for (ui32 additional = borderCurrent.size(); additional < TableRanges.ColumnsCount(); ++additional) { + borderCurrent.emplace_back(TCell()); + } + } + TSerializedTableRange serializedRange(borderPred, predIncluded, borderCurrent, i.GetMarkIncluded()); + result.emplace_back(std::move(serializedRange)); + } + predSkipped = i.GetIntervalSkipped(); + predIncluded = i.GetMarkIncluded(); + std::swap(borderCurrent, borderPred); + } + if (predSkipped) { + Y_VERIFY(result.size()); + result.back().ToInclusive = predIncluded; + } else { + TSerializedTableRange serializedRange(borderPred, predIncluded, TVector<TCell>(), false); + result.emplace_back(std::move(serializedRange)); + } + return result; +} + +NKikimr::TCell TEvKqpCompute::TEvCostData::SerializeScalarToCell(const std::shared_ptr<arrow::Scalar>& x) const { + if (!x) { + return TCell(); + } + auto castStatus = std::dynamic_pointer_cast<arrow::BaseBinaryScalar>(x); + if (castStatus) { + TCell resultCell((char*)castStatus->value->data(), castStatus->value->size()); + return resultCell; + } else { + TCell resultCell; + Y_VERIFY(NArrow::SwitchType(x->type->id(), [&](const auto& type) { + using TWrap = std::decay_t<decltype(type)>; + using TScalar = typename arrow::TypeTraits<typename TWrap::T>::ScalarType; + using TValue = std::decay_t<decltype(static_cast<const TScalar&>(*x).value)>; + + if (!std::is_arithmetic_v<TValue>) { + return false; + } + resultCell = TCell::Make(static_cast<const TScalar&>(*x).value); + return true; + })); + return resultCell; + } +} + +} diff --git a/ydb/core/kqp/kqp_compute.h b/ydb/core/kqp/kqp_compute.h index 943f5adac3..52d374b3e5 100644 --- a/ydb/core/kqp/kqp_compute.h +++ b/ydb/core/kqp/kqp_compute.h @@ -4,8 +4,10 @@ #include <ydb/core/formats/arrow_helpers.h> #include <ydb/core/protos/tx_datashard.pb.h> +#include <ydb/core/scheme/scheme_tabledefs.h> #include <contrib/libs/apache/arrow/cpp/src/arrow/api.h> +#include <ydb/core/tx/columnshard/columnshard__costs.h> namespace NKikimr::NKqp { @@ -22,6 +24,8 @@ struct TEvKqpCompute { * node. To support scans in this case we provide serialization routines. For now such remote scan * is considered as rare event and not worth of some fast serialization, so we just use protobuf. * + * TEvCostData is just reply with costs for request physical level optimization + * * TEvScanDataAck follows the same pattern mostly for symmetry reasons. */ struct TEvScanData : public NActors::TEventLocal<TEvScanData, TKqpComputeEvents::EvScanData> { @@ -126,8 +130,55 @@ struct TEvKqpCompute { } }; - struct TEvRemoteScanDataAck : public NActors::TEventPB<TEvRemoteScanDataAck, NKikimrKqp::TEvRemoteScanDataAck, - TKqpComputeEvents::EvRemoteScanDataAck> {}; + struct TEvRemoteCostData: public NActors::TEventPB<TEvRemoteCostData, NKikimrKqp::TEvRemoteCostData, + TKqpComputeEvents::EvRemoteCostData> { + }; + + class TEvCostData: public NActors::TEventLocal<TEvCostData, TKqpComputeEvents::EvCostData> { + private: + NOlap::NCosts::TKeyRanges TableRanges; + ui32 ScanId = 0; + mutable THolder<TEvRemoteCostData> Remote; + + void InitRemote() const; + TCell SerializeScalarToCell(const std::shared_ptr<arrow::Scalar>& x) const; + TVector<NKikimr::TSerializedTableRange> BuildSerializedRanges(const NOlap::NCosts::TKeyRanges& costRanges) const; + + public: + TEvCostData(NOlap::NCosts::TKeyRanges&& ranges, const ui32 scanId) + : TableRanges(std::move(ranges)) + , ScanId(scanId) { + } + + ui32 GetScanId() const { + return ScanId; + } + + const NOlap::NCosts::TKeyRanges& GetTableRanges() const { + return TableRanges; + } + TVector<TSerializedTableRange> GetSerializedTableRanges() const; + + virtual bool IsSerializable() const override { + return true; + } + + virtual ui32 CalculateSerializedSize() const override { + InitRemote(); + return Remote->CalculateSerializedSizeCached(); + } + + virtual bool SerializeToArcadiaStream(NActors::TChunkSerializer* chunker) const override { + InitRemote(); + return Remote->SerializeToArcadiaStream(chunker); + } + + static NActors::IEventBase* Load(TEventSerializedData* data); + }; + + struct TEvRemoteScanDataAck: public NActors::TEventPB<TEvRemoteScanDataAck, NKikimrKqp::TEvRemoteScanDataAck, + TKqpComputeEvents::EvRemoteScanDataAck> { + }; struct TEvScanDataAck : public NActors::TEventLocal<TEvScanDataAck, TKqpComputeEvents::EvScanDataAck> { explicit TEvScanDataAck(ui64 freeSpace, ui32 generation = 0) diff --git a/ydb/core/protos/kqp.proto b/ydb/core/protos/kqp.proto index 3a4e101ff5..bb647ed258 100644 --- a/ydb/core/protos/kqp.proto +++ b/ydb/core/protos/kqp.proto @@ -576,6 +576,24 @@ message TEvRemoteScanDataAck { optional uint32 Generation = 2; } +message TEvRemoteCostData { + message TIntervalMeta { + optional bool IntervalSkipped = 1[default = false]; + optional bool MarkIncluded = 2[default = true]; + } + + message TCostInfo { + repeated TIntervalMeta IntervalMeta = 1; + optional bool LeftBorderOpened = 2; + // ColumnsData contains RecordBatch with schema that serialized in ColumnsSchema + optional bytes ColumnsData = 3; + optional bytes ColumnsSchema = 4; + } + + optional uint32 ScanId = 1; + optional TCostInfo CostInfo = 2; +} + message TEvKillScanTablet { } diff --git a/ydb/core/protos/tx_datashard.proto b/ydb/core/protos/tx_datashard.proto index 31420c8b67..d2dd9ae089 100644 --- a/ydb/core/protos/tx_datashard.proto +++ b/ydb/core/protos/tx_datashard.proto @@ -1426,6 +1426,7 @@ message TEvKqpScan { optional NYql.NDqProto.EDqStatsMode StatsMode = 18; optional bytes OlapProgram = 19; optional NKikimrSchemeOp.EOlapProgramType OlapProgramType = 20; + optional bool CostDataOnly = 21[default = false]; } message TEvCompactTable { diff --git a/ydb/core/tx/columnshard/CMakeLists.txt b/ydb/core/tx/columnshard/CMakeLists.txt index 52d50b153d..f966e2b293 100644 --- a/ydb/core/tx/columnshard/CMakeLists.txt +++ b/ydb/core/tx/columnshard/CMakeLists.txt @@ -40,6 +40,7 @@ target_sources(core-tx-columnshard PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/blob_manager.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/blob_manager_db.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/blob_manager_txs.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/columnshard__costs.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/columnshard__export.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/columnshard__forget.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/columnshard__init.cpp diff --git a/ydb/core/tx/columnshard/columnshard__costs.cpp b/ydb/core/tx/columnshard/columnshard__costs.cpp new file mode 100644 index 0000000000..a8c53cf9dd --- /dev/null +++ b/ydb/core/tx/columnshard/columnshard__costs.cpp @@ -0,0 +1,222 @@ +#include "columnshard__costs.h" +#include <ydb/core/tx/columnshard/engines/index_info.h> +#include <ydb/core/tx/columnshard/engines/granules_table.h> +#include <ydb/core/formats/arrow_helpers.h> +#include <ydb/core/scheme/scheme_tabledefs.h> +#include <ydb/core/protos/ssa.pb.h> +#include <ydb/core/tx/columnshard/engines/predicate.h> + +namespace NKikimr::NOlap::NCosts { + +TKeyMark TCostsOperator::BuildMarkFromGranule(const TGranuleRecord& record) const { + TKeyMark result; + result.AddValue(std::make_shared<arrow::UInt64Scalar>(ExtractKey(record.IndexKey))); + return result; +} + +TRangeMark TCostsOperator::BuildMarkFromPredicate(const std::shared_ptr<NOlap::TPredicate>& p) const { + if (!p) { + return TRangeMark(); + } + TRangeMark result; + for (i32 i = 0; i < p->Batch->num_columns(); ++i) { + auto column = p->Batch->column(i); + Y_VERIFY(!!column && column->length() == 1); + Y_VERIFY(!column->IsNull(0)); + auto status = column->GetScalar(0); + Y_VERIFY(status.ok()); + result.MutableMark().AddValue(status.ValueUnsafe()); + } + result.SetMarkIncluded(p->Inclusive); + return result; +} + +void TCostsOperator::FillRangeMarks(TKeyRanges& result, const std::shared_ptr<NOlap::TPredicate>& left, const std::shared_ptr<NOlap::TPredicate>& right) const { + LOG_S_DEBUG("TCostsOperator::BuildRangeMarks::Request from " << (left ? left->Batch->ToString() : "-Inf") << + " to " << (right ? right->Batch->ToString() : "+Inf")); + + result.InitColumns(NArrow::MakeArrowSchema(IndexInfo.GetPK())); + { + TRangeMark leftBorder = BuildMarkFromPredicate(left); + if (!leftBorder.GetMark().ColumnsCount()) { + Y_VERIFY(!result.IsLeftBorderOpened()); + result.SetLeftBorderOpened(true); + } else { + Y_VERIFY(result.AddRangeIfNotLess(std::move(leftBorder))); + } + } + for (auto&& i : GranuleRecords) { + auto granuleMark = BuildMarkFromGranule(i); + Y_VERIFY(result.AddRangeIfGrow(TRangeMark(std::move(granuleMark)))); + } + { + TRangeMark rightBorder = BuildMarkFromPredicate(right); + if (rightBorder.GetMark().ColumnsCount()) { + Y_VERIFY(result.AddRangeIfNotLess(std::move(rightBorder))); + result.MutableRangeMarks().back().SetIntervalSkipped(true); + } + } +} + +ui64 TCostsOperator::ExtractKey(const TString& key) { + Y_VERIFY(key.size() == 8); + return *reinterpret_cast<const ui64*>(key.data()); +} + +bool TRangeMark::operator<(const TRangeMark& item) const { + return Mark < item.Mark; +} + +TString TRangeMark::ToString() const { + TStringBuilder sb; + sb << "MARK=" << Mark.ToString() << ";SKIP=" << IntervalSkipped << ";"; + return sb; +} + +NKikimrKqp::TEvRemoteCostData::TCostInfo TKeyRanges::SerializeToProto() const { + NKikimrKqp::TEvRemoteCostData::TCostInfo result; + result.SetLeftBorderOpened(LeftBorderOpened); + for (auto&& i : RangeMarks) { + *result.AddIntervalMeta() = i.SerializeMetaToProto(); + } + + std::shared_ptr<arrow::Schema> schema = KeyColumns; + *result.MutableColumnsSchema() = NArrow::SerializeSchema(*schema); + if (RangeMarks.empty()) { + return result; + } + + TVector<std::unique_ptr<arrow::ArrayBuilder>> arrayBuilders; + for (auto&& f : KeyColumns->fields()) { + std::unique_ptr<arrow::ArrayBuilder> arrayBuilder; + Y_VERIFY(arrow::MakeBuilder(arrow::default_memory_pool(), f->type(), &arrayBuilder).ok()); + arrayBuilders.emplace_back(std::move(arrayBuilder)); + Y_VERIFY(arrayBuilders.back()->Reserve(RangeMarks.size()).ok()); + } + for (auto&& i : RangeMarks) { + Y_VERIFY(i.GetMark().ColumnsCount() <= ColumnsCount()); + auto itBuilder = arrayBuilders.begin(); + for (auto&& scalar : i.GetMark()) { + Y_VERIFY(!!scalar); + auto correctScalar = scalar->CastTo((*itBuilder)->type()); + Y_VERIFY(correctScalar.ok()); + Y_VERIFY((*itBuilder)->AppendScalar(**correctScalar).ok()); + ++itBuilder; + } + for (; itBuilder != arrayBuilders.end(); ++itBuilder) { + Y_VERIFY((*itBuilder)->AppendNull().ok()); + } + } + TVector<std::shared_ptr<arrow::Array>> arrays; + for (auto&& i : arrayBuilders) { + arrow::Result<std::shared_ptr<arrow::Array>> aData = i->Finish(); + Y_VERIFY(aData.ok()); + arrays.emplace_back(aData.ValueUnsafe()); + } + std::shared_ptr<arrow::RecordBatch> rb = arrow::RecordBatch::Make(schema, RangeMarks.size(), arrays); + *result.MutableColumnsData() = NArrow::SerializeBatchNoCompression(rb); + return result; +} + +bool TKeyRanges::DeserializeFromProto(const NKikimrKqp::TEvRemoteCostData::TCostInfo& proto) { + std::shared_ptr<arrow::Schema> schema = NArrow::DeserializeSchema(proto.GetColumnsSchema()); + Y_VERIFY(schema); + LeftBorderOpened = proto.GetLeftBorderOpened(); + KeyColumns = schema; + + TVector<TRangeMark> resultLocal; + if (!!proto.GetColumnsData()) { + std::shared_ptr<arrow::RecordBatch> batch = NArrow::DeserializeBatch(proto.GetColumnsData(), schema); + Y_VERIFY(batch->num_columns() == (int)ColumnsCount()); + resultLocal.reserve(batch->num_rows()); + for (ui32 rowIdx = 0; rowIdx < batch->num_rows(); ++rowIdx) { + TKeyMark mark; + for (ui32 cIdx = 0; cIdx < ColumnsCount(); ++cIdx) { + if (batch->column(cIdx)->IsNull(rowIdx)) { + break; + } + auto valueStatus = batch->column(cIdx)->GetScalar(rowIdx); + Y_VERIFY(valueStatus.ok()); + mark.AddValue(valueStatus.ValueUnsafe()); + } + resultLocal.emplace_back(TRangeMark(std::move(mark))); + } + } + Y_VERIFY(resultLocal.size() == (size_t)proto.GetIntervalMeta().size()); + ui32 idx = 0; + for (auto&& i : proto.GetIntervalMeta()) { + if (!resultLocal[idx++].DeserializeMetaFromProto(i)) { + return false; + } + } + std::swap(RangeMarks, resultLocal); + return true; +} + +bool TKeyRanges::AddRangeIfGrow(TRangeMark&& range) { + Y_VERIFY(range.GetMark().ColumnsCount() <= ColumnsCount()); + if (RangeMarks.empty() || RangeMarks.back() < range) { + RangeMarks.emplace_back(std::move(range)); + return true; + } + return false; +} + +bool TKeyRanges::AddRangeIfNotLess(TRangeMark&& range) { + Y_VERIFY(range.GetMark().ColumnsCount() <= ColumnsCount()); + if (RangeMarks.empty() || RangeMarks.back() < range || !(range < RangeMarks.back())) { + RangeMarks.emplace_back(std::move(range)); + return true; + } + return RangeMarks.back() < range; +} + +TString TKeyRanges::ToString() const { + TStringBuilder sb; + sb << "COLUMNS: " << KeyColumns->ToString() << Endl; + sb << Endl << "DATA: " << Endl; + for (auto&& i : RangeMarks) { + sb << i.ToString() << ";"; + } + return sb; +} + +TKeyRanges::TKeyRanges() { + KeyColumns = arrow::SchemaBuilder().Finish().ValueUnsafe(); +} + +TString TKeyMark::ToString() const { + TStringBuilder sb; + for (auto&& i : Values) { + sb << (i ? i->ToString() : "NO_VALUE") << ";"; + } + return sb; +} + +bool TKeyMark::operator<(const TKeyMark& item) const { + auto itSelf = Values.begin(); + auto itItem = item.Values.begin(); + for (; itSelf != Values.end() && itItem != item.Values.end(); ++itSelf, ++itItem) { + if (NArrow::ScalarLess(*itSelf, *itItem)) { + return true; + } else if (NArrow::ScalarLess(*itItem, *itSelf)) { + return false; + } + } + if (itSelf == Values.end()) { + if (itItem == item.Values.end()) { + return false; + } else { + return true; + } + } else { + if (itItem == item.Values.end()) { + return true; + } else { + Y_VERIFY(false, "impossible"); + } + } + return false; +} + +} diff --git a/ydb/core/tx/columnshard/columnshard__costs.h b/ydb/core/tx/columnshard/columnshard__costs.h new file mode 100644 index 0000000000..48c629f46f --- /dev/null +++ b/ydb/core/tx/columnshard/columnshard__costs.h @@ -0,0 +1,157 @@ +#pragma once +#include <ydb/core/formats/arrow_helpers.h> +#include <ydb/core/protos/kqp.pb.h> + +namespace NKikimr::NOlap { + struct TIndexInfo; + struct TGranuleRecord; + struct TPredicate; +} + +namespace NKikimr::NOlap::NCosts { + + +class TKeyMark { +private: + TVector<std::shared_ptr<arrow::Scalar>> Values; +public: + TKeyMark() = default; + + TString ToString() const; + + TVector<std::shared_ptr<arrow::Scalar>>::const_iterator begin() const { + return Values.begin(); + } + TVector<std::shared_ptr<arrow::Scalar>>::const_iterator end() const { + return Values.end(); + } + TKeyMark& AddValue(std::shared_ptr<arrow::Scalar> value) { + Values.emplace_back(value); + return *this; + } + ui32 ColumnsCount() const { + return Values.size(); + } + bool operator<(const TKeyMark& item) const; +}; + +class TRangeMark { +private: + TKeyMark Mark; + bool IntervalSkipped = false; + bool MarkIncluded = true; +public: + TRangeMark() = default; + TRangeMark(TKeyMark&& mark) + : Mark(std::move(mark)) + { + + } + + TString ToString() const; + + const TKeyMark& GetMark() const { + return Mark; + } + + TKeyMark& MutableMark() { + return Mark; + } + + bool operator<(const TRangeMark& item) const; + + TRangeMark& SetIntervalSkipped(const bool value) { + IntervalSkipped = value; + return *this; + } + + bool GetIntervalSkipped() const { + return IntervalSkipped; + } + + TRangeMark& SetMarkIncluded(const bool value) { + MarkIncluded = value; + return *this; + } + + bool GetMarkIncluded() const { + return MarkIncluded; + } + + NKikimrKqp::TEvRemoteCostData::TIntervalMeta SerializeMetaToProto() const { + NKikimrKqp::TEvRemoteCostData::TIntervalMeta result; + result.SetIntervalSkipped(IntervalSkipped); + result.SetMarkIncluded(MarkIncluded); + return result; + } + + bool DeserializeMetaFromProto(const NKikimrKqp::TEvRemoteCostData::TIntervalMeta& proto) { + SetIntervalSkipped(proto.GetIntervalSkipped()); + SetMarkIncluded(proto.GetMarkIncluded()); + return true; + } +}; + +class TKeyRanges { +private: + std::shared_ptr<arrow::Schema> KeyColumns; + std::vector<TRangeMark> RangeMarks; + bool LeftBorderOpened = false; +public: + TKeyRanges(); + + TKeyRanges& Reserve(const ui32 num) { + RangeMarks.reserve(num); + return *this; + } + + TString ToString() const; + + size_t ColumnsCount() const { + return KeyColumns ? KeyColumns->num_fields() : 0; + } + const arrow::Schema& GetKeyColumns() const { + return *KeyColumns; + } + const std::vector<TRangeMark>& GetRangeMarks() const { + return RangeMarks; + } + std::vector<TRangeMark>& MutableRangeMarks() { + return RangeMarks; + } + TKeyRanges& InitColumns(std::shared_ptr<arrow::Schema> schema) { + KeyColumns = schema; + return *this; + } + TKeyRanges& SetLeftBorderOpened(const bool value) { + LeftBorderOpened = value; + return *this; + } + bool IsLeftBorderOpened() const { + return LeftBorderOpened; + } + bool AddRangeIfGrow(TRangeMark&& range); + bool AddRangeIfNotLess(TRangeMark&& range); + + NKikimrKqp::TEvRemoteCostData::TCostInfo SerializeToProto() const; + bool DeserializeFromProto(const NKikimrKqp::TEvRemoteCostData::TCostInfo& proto); + +}; + +class TCostsOperator { +private: + const TVector<TGranuleRecord>& GranuleRecords; + const TIndexInfo& IndexInfo; + TRangeMark BuildMarkFromPredicate(const std::shared_ptr<NOlap::TPredicate>& p) const; + TKeyMark BuildMarkFromGranule(const TGranuleRecord& record) const; + static ui64 ExtractKey(const TString& key); +public: + TCostsOperator(const TVector<TGranuleRecord>& granuleRecords, const TIndexInfo& indexInfo) + : GranuleRecords(granuleRecords) + , IndexInfo(indexInfo) + { + + } + void FillRangeMarks(TKeyRanges& result, const std::shared_ptr<NOlap::TPredicate>& left, const std::shared_ptr<NOlap::TPredicate>& right) const; +}; +} diff --git a/ydb/core/tx/columnshard/columnshard__scan.cpp b/ydb/core/tx/columnshard/columnshard__scan.cpp index fb49312146..93fd867343 100644 --- a/ydb/core/tx/columnshard/columnshard__scan.cpp +++ b/ydb/core/tx/columnshard/columnshard__scan.cpp @@ -722,6 +722,20 @@ void TTxScan::Complete(const TActorContext& ctx) { detailedInfo << " read metadata: (" << TContainerPrinter(ReadMetadataRanges) << ")" << " req: " << request; } + TVector<NOlap::TReadMetadata::TConstPtr> rMetadataRanges; + if (request.GetCostDataOnly()) { + for (auto&& i : ReadMetadataRanges) { + NOlap::TReadMetadata::TConstPtr rMetadata = std::dynamic_pointer_cast<const NOlap::TReadMetadata>(i); + if (!rMetadata || !rMetadata->SelectInfo) { + NOlap::NCosts::TKeyRanges ranges; + ranges.SetLeftBorderOpened(true); + auto ev = MakeHolder<TEvKqpCompute::TEvCostData>(std::move(ranges), scanId); + ctx.Send(scanComputeActor, ev.Release()); + return; + } + rMetadataRanges.emplace_back(rMetadata); + } + } if (ReadMetadataRanges.empty()) { LOG_S_DEBUG("TTxScan failed " @@ -746,6 +760,28 @@ void TTxScan::Complete(const TActorContext& ctx) { return; } + if (request.GetCostDataOnly()) { + NOlap::NCosts::TKeyRanges ranges; + if (request.GetReverse()) { + std::reverse(rMetadataRanges.begin(), rMetadataRanges.end()); + } + { + ui32 recordsCount = 0; + for (auto&& i : rMetadataRanges) { + recordsCount += i->SelectInfo->Granules.size() + 2; + } + ranges.Reserve(recordsCount); + } + for (auto&& i : rMetadataRanges) { + NOlap::NCosts::TCostsOperator cOperator(i->SelectInfo->Granules, i->IndexInfo); + cOperator.FillRangeMarks(ranges, i->GreaterPredicate, i->LessPredicate); + } + LOG_S_DEBUG("TCostsOperator::BuildRangeMarks::Result " << ranges.ToString()); + auto ev = MakeHolder<TEvKqpCompute::TEvCostData>(std::move(ranges), scanId); + ctx.Send(scanComputeActor, ev.Release()); + return; + } + ui64 requestCookie = Self->InFlightReadsTracker.AddInFlightRequest(ReadMetadataRanges, *Self->BlobManager); auto statsDelta = Self->InFlightReadsTracker.GetSelectStatsDelta(); diff --git a/ydb/core/tx/columnshard/columnshard_common.cpp b/ydb/core/tx/columnshard/columnshard_common.cpp index aed13a5a27..25337a206f 100644 --- a/ydb/core/tx/columnshard/columnshard_common.cpp +++ b/ydb/core/tx/columnshard/columnshard_common.cpp @@ -344,7 +344,8 @@ std::pair<TPredicate, TPredicate> RangePredicates(const TSerializedTableRange& r bool leftTrailingNull = false; { TConstArrayRef<TCell> cells = range.From.GetCells(); - size_t size = cells.size(); + const size_t size = cells.size(); + Y_ASSERT(size <= columns.size()); leftCells.reserve(size); leftColumns.reserve(size); for (size_t i = 0; i < size; ++i) { @@ -363,7 +364,8 @@ std::pair<TPredicate, TPredicate> RangePredicates(const TSerializedTableRange& r bool rightTrailingNull = false; { TConstArrayRef<TCell> cells = range.To.GetCells(); - size_t size = cells.size(); + const size_t size = cells.size(); + Y_ASSERT(size <= columns.size()); rightCells.reserve(size); rightColumns.reserve(size); for (size_t i = 0; i < size; ++i) { diff --git a/ydb/core/tx/columnshard/engines/CMakeLists.txt b/ydb/core/tx/columnshard/engines/CMakeLists.txt index 95fe0abeb6..0aee1e51a3 100644 --- a/ydb/core/tx/columnshard/engines/CMakeLists.txt +++ b/ydb/core/tx/columnshard/engines/CMakeLists.txt @@ -26,6 +26,7 @@ target_link_libraries(tx-columnshard-engines PUBLIC ) target_sources(tx-columnshard-engines PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/column_engine_logs.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/column_engine.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/db_wrapper.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/insert_table.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/index_info.cpp diff --git a/ydb/core/tx/columnshard/engines/column_engine.cpp b/ydb/core/tx/columnshard/engines/column_engine.cpp new file mode 100644 index 0000000000..99542dd0a1 --- /dev/null +++ b/ydb/core/tx/columnshard/engines/column_engine.cpp @@ -0,0 +1,4 @@ +#include "column_engine.h"
+ +namespace NKikimr::NOlap { +} diff --git a/ydb/core/tx/columnshard/engines/column_engine.h b/ydb/core/tx/columnshard/engines/column_engine.h index 8d790cf014..ea85a05c88 100644 --- a/ydb/core/tx/columnshard/engines/column_engine.h +++ b/ydb/core/tx/columnshard/engines/column_engine.h @@ -253,7 +253,7 @@ struct TSelectInfo { } }; - TVector<TGranuleRecord> Granules; // oredered by key (asc) + TVector<TGranuleRecord> Granules; // ordered by key (ascending) TVector<TPortionInfo> Portions; TVector<ui64> GranulesOrder(bool rev = false) const { diff --git a/ydb/core/tx/datashard/datashard_ut_kqp_scan.cpp b/ydb/core/tx/datashard/datashard_ut_kqp_scan.cpp index 69a3b14caf..ed516f2941 100644 --- a/ydb/core/tx/datashard/datashard_ut_kqp_scan.cpp +++ b/ydb/core/tx/datashard/datashard_ut_kqp_scan.cpp @@ -223,7 +223,7 @@ Y_UNIT_TEST_SUITE(KqpScan) { } /* - * Check that remote scan actually happend. + * Check that remote scan actually happened. */ case NKqp::TKqpComputeEvents::EvScanData: { remoteScanDetected = remoteScanDetected || ev->Sender.NodeId() != ev->Recipient.NodeId(); |