aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorivanmorozov <ivanmorozov@yandex-team.com>2022-09-14 17:47:10 +0300
committerivanmorozov <ivanmorozov@yandex-team.com>2022-09-14 17:47:10 +0300
commit43f2a855553ca09945ed0688b33c997ae6e905e4 (patch)
tree8941596fabf68e3c5200f2eaaecd4cbbfc00510f
parent1aa4ac536eea700d238b4e6142dbf58dc3bb5434 (diff)
downloadydb-43f2a855553ca09945ed0688b33c997ae6e905e4.tar.gz
prepare costs request for column shard
-rw-r--r--ydb/core/formats/arrow_helpers.cpp12
-rw-r--r--ydb/core/formats/arrow_helpers.h1
-rw-r--r--ydb/core/kqp/CMakeLists.txt1
-rw-r--r--ydb/core/kqp/common/kqp_common.h2
-rw-r--r--ydb/core/kqp/kqp_compute.cpp83
-rw-r--r--ydb/core/kqp/kqp_compute.h55
-rw-r--r--ydb/core/protos/kqp.proto18
-rw-r--r--ydb/core/protos/tx_datashard.proto1
-rw-r--r--ydb/core/tx/columnshard/CMakeLists.txt1
-rw-r--r--ydb/core/tx/columnshard/columnshard__costs.cpp222
-rw-r--r--ydb/core/tx/columnshard/columnshard__costs.h157
-rw-r--r--ydb/core/tx/columnshard/columnshard__scan.cpp36
-rw-r--r--ydb/core/tx/columnshard/columnshard_common.cpp6
-rw-r--r--ydb/core/tx/columnshard/engines/CMakeLists.txt1
-rw-r--r--ydb/core/tx/columnshard/engines/column_engine.cpp4
-rw-r--r--ydb/core/tx/columnshard/engines/column_engine.h2
-rw-r--r--ydb/core/tx/datashard/datashard_ut_kqp_scan.cpp2
17 files changed, 597 insertions, 7 deletions
diff --git a/ydb/core/formats/arrow_helpers.cpp b/ydb/core/formats/arrow_helpers.cpp
index 72ff6fd234..6a28052da2 100644
--- a/ydb/core/formats/arrow_helpers.cpp
+++ b/ydb/core/formats/arrow_helpers.cpp
@@ -738,6 +738,12 @@ std::shared_ptr<arrow::Scalar> GetScalar(const std::shared_ptr<arrow::Array>& ar
return out;
}
+// Pointer overload: aborts when either operand is empty, then defers to the
+// by-reference comparison.
+bool ScalarLess(const std::shared_ptr<arrow::Scalar>& x, const std::shared_ptr<arrow::Scalar>& y) {
+    Y_VERIFY(x);
+    Y_VERIFY(y);
+    const arrow::Scalar& lhs = *x;
+    const arrow::Scalar& rhs = *y;
+    return ScalarLess(lhs, rhs);
+}
+
bool ScalarLess(const arrow::Scalar& x, const arrow::Scalar& y) {
Y_VERIFY(x.type->Equals(y.type));
@@ -745,7 +751,11 @@ bool ScalarLess(const arrow::Scalar& x, const arrow::Scalar& y) {
using TWrap = std::decay_t<decltype(type)>;
using TScalar = typename arrow::TypeTraits<typename TWrap::T>::ScalarType;
using TValue = std::decay_t<decltype(static_cast<const TScalar&>(x).value)>;
-
+        // arrow::has_string_view is a trait over arrow *data types*, so it has to be asked
+        // about TWrap::T. Queried with the scalar class it never matches, and binary/string
+        // scalars would skip this branch entirely.
+        if constexpr (arrow::has_string_view<typename TWrap::T>()) {
+            const auto& xval = static_cast<const TScalar&>(x).value;
+            const auto& yval = static_cast<const TScalar&>(y).value;
+            // Both buffers are dereferenced below; fail loudly instead of on a null pointer.
+            Y_VERIFY(xval && yval);
+            const TStringBuf xbuf(reinterpret_cast<const char*>(xval->data()), xval->size());
+            const TStringBuf ybuf(reinterpret_cast<const char*>(yval->data()), yval->size());
+            return xbuf < ybuf;
+        }
if constexpr (std::is_arithmetic_v<TValue>) {
const auto& xval = static_cast<const TScalar&>(x).value;
const auto& yval = static_cast<const TScalar&>(y).value;
diff --git a/ydb/core/formats/arrow_helpers.h b/ydb/core/formats/arrow_helpers.h
index 19d1505270..2f6406b68e 100644
--- a/ydb/core/formats/arrow_helpers.h
+++ b/ydb/core/formats/arrow_helpers.h
@@ -137,6 +137,7 @@ std::shared_ptr<TArr> GetTypedColumn(const std::shared_ptr<arrow::RecordBatch>&
std::pair<int, int> FindMinMaxPosition(const std::shared_ptr<arrow::Array>& column);
std::shared_ptr<arrow::Scalar> GetScalar(const std::shared_ptr<arrow::Array>& array, int position);
+bool ScalarLess(const std::shared_ptr<arrow::Scalar>& x, const std::shared_ptr<arrow::Scalar>& y);
bool ScalarLess(const arrow::Scalar& x, const arrow::Scalar& y);
class IRowWriter;
diff --git a/ydb/core/kqp/CMakeLists.txt b/ydb/core/kqp/CMakeLists.txt
index 12335e5955..287c52c9bb 100644
--- a/ydb/core/kqp/CMakeLists.txt
+++ b/ydb/core/kqp/CMakeLists.txt
@@ -68,6 +68,7 @@ target_sources(ydb-core-kqp PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/kqp/kqp_compile_actor.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/kqp_compile_request.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/kqp_compile_service.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/kqp/kqp_compute.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/kqp_shutdown_controller.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/kqp_ic_gateway.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/kqp_metadata_loader.cpp
diff --git a/ydb/core/kqp/common/kqp_common.h b/ydb/core/kqp/common/kqp_common.h
index f9379ad129..76bf41bffe 100644
--- a/ydb/core/kqp/common/kqp_common.h
+++ b/ydb/core/kqp/common/kqp_common.h
@@ -74,6 +74,8 @@ struct TKqpComputeEvents {
EvScanInitActor,
EvRemoteScanData,
EvRemoteScanDataAck,
+ EvRemoteCostData,
+ EvCostData,
};
static_assert(Unused0 == EventSpaceBegin(TKikimrEvents::ES_KQP) + 200);
diff --git a/ydb/core/kqp/kqp_compute.cpp b/ydb/core/kqp/kqp_compute.cpp
new file mode 100644
index 0000000000..82bf27a407
--- /dev/null
+++ b/ydb/core/kqp/kqp_compute.cpp
@@ -0,0 +1,83 @@
+#include "kqp_compute.h"
+
+namespace NKikimr::NKqp {
+
+// Lazily builds the protobuf counterpart of this event; does nothing when it already exists.
+void TEvKqpCompute::TEvCostData::InitRemote() const {
+    if (Remote) {
+        return;
+    }
+    Remote.Reset(new TEvRemoteCostData);
+    auto& record = Remote->Record;
+    *record.MutableCostInfo() = TableRanges.SerializeToProto();
+    record.SetScanId(ScanId);
+}
+
+// Rebuilds a local TEvCostData from serialized data: the payload is first parsed as a
+// TEvRemoteCostData, then its cost info is converted back into key ranges.
+// Aborts when the key ranges cannot be deserialized.
+NActors::IEventBase* TEvKqpCompute::TEvCostData::Load(TEventSerializedData* data) {
+    THolder<TEvRemoteCostData> pbEv(static_cast<TEvRemoteCostData*>(TEvRemoteCostData::Load(data)));
+    NOlap::NCosts::TKeyRanges ranges;
+    Y_VERIFY(ranges.DeserializeFromProto(pbEv->Record.GetCostInfo()));
+    const ui32 scanId = pbEv->Record.GetScanId();
+    return new TEvCostData(std::move(ranges), scanId);
+}
+
+TVector<NKikimr::TSerializedTableRange> TEvKqpCompute::TEvCostData::GetSerializedTableRanges() const { // converts the range marks of TableRanges into consecutive TSerializedTableRange intervals
+ TVector<TSerializedTableRange> result;
+ TVector<TCell> borderPred; // left border of the interval currently being built
+ borderPred.resize(TableRanges.ColumnsCount(), TCell()); // initial left border: one default cell per key column
+ bool predSkipped = !TableRanges.IsLeftBorderOpened(); // without an opened left border nothing is emitted before the first mark
+ bool predIncluded = false;
+ for (auto&& i : TableRanges.GetRangeMarks()) {
+ TVector<TCell> borderCurrent; // cells of the current mark, used as the right border
+ for (auto&& value : i.GetMark()) {
+ borderCurrent.emplace_back(SerializeScalarToCell(value));
+ }
+ if (!predSkipped) {
+ for (ui32 additional = borderPred.size(); additional < TableRanges.ColumnsCount(); ++additional) { // pad a short left border with default cells up to the key width
+ borderPred.emplace_back(TCell());
+ }
+ if (!i.GetMarkIncluded()) { // the right border is padded only when the mark itself is excluded
+ for (ui32 additional = borderCurrent.size(); additional < TableRanges.ColumnsCount(); ++additional) {
+ borderCurrent.emplace_back(TCell());
+ }
+ }
+ TSerializedTableRange serializedRange(borderPred, predIncluded, borderCurrent, i.GetMarkIncluded());
+ result.emplace_back(std::move(serializedRange));
+ }
+ predSkipped = i.GetIntervalSkipped(); // decides whether a range is emitted on the next iteration / after the loop
+ predIncluded = i.GetMarkIncluded();
+ std::swap(borderCurrent, borderPred); // the current mark becomes the left border of the next interval
+ }
+ if (predSkipped) {
+ Y_VERIFY(result.size()); // aborts when no range was produced at all (e.g. no marks and the left border is not opened)
+ result.back().ToInclusive = predIncluded;
+ } else {
+ TSerializedTableRange serializedRange(borderPred, predIncluded, TVector<TCell>(), false); // trailing range from the last border with an empty right border
+ result.emplace_back(std::move(serializedRange));
+ }
+ return result;
+}
+
+// Converts one arrow scalar into a TCell: an empty pointer yields a default cell, binary-like
+// scalars yield a cell built from the pointer and size of the scalar's buffer, and any other
+// scalar must hold an arithmetic value (otherwise the process aborts).
+NKikimr::TCell TEvKqpCompute::TEvCostData::SerializeScalarToCell(const std::shared_ptr<arrow::Scalar>& x) const {
+    if (!x) {
+        return TCell();
+    }
+    if (auto castStatus = std::dynamic_pointer_cast<arrow::BaseBinaryScalar>(x)) {
+        return TCell((char*)castStatus->value->data(), castStatus->value->size());
+    }
+    TCell resultCell;
+    Y_VERIFY(NArrow::SwitchType(x->type->id(), [&](const auto& type) {
+        using TWrap = std::decay_t<decltype(type)>;
+        using TScalar = typename arrow::TypeTraits<typename TWrap::T>::ScalarType;
+        using TValue = std::decay_t<decltype(static_cast<const TScalar&>(*x).value)>;
+
+        if (!std::is_arithmetic_v<TValue>) {
+            return false;
+        }
+        resultCell = TCell::Make(static_cast<const TScalar&>(*x).value);
+        return true;
+    }));
+    return resultCell;
+}
+
+}
diff --git a/ydb/core/kqp/kqp_compute.h b/ydb/core/kqp/kqp_compute.h
index 943f5adac3..52d374b3e5 100644
--- a/ydb/core/kqp/kqp_compute.h
+++ b/ydb/core/kqp/kqp_compute.h
@@ -4,8 +4,10 @@
#include <ydb/core/formats/arrow_helpers.h>
#include <ydb/core/protos/tx_datashard.pb.h>
+#include <ydb/core/scheme/scheme_tabledefs.h>
#include <contrib/libs/apache/arrow/cpp/src/arrow/api.h>
+#include <ydb/core/tx/columnshard/columnshard__costs.h>
namespace NKikimr::NKqp {
@@ -22,6 +24,8 @@ struct TEvKqpCompute {
* node. To support scans in this case we provide serialization routines. For now such remote scan
* is considered as rare event and not worth of some fast serialization, so we just use protobuf.
*
+ * TEvCostData is just a reply carrying cost data, used for physical-level optimization of the request.
+ *
* TEvScanDataAck follows the same pattern mostly for symmetry reasons.
*/
struct TEvScanData : public NActors::TEventLocal<TEvScanData, TKqpComputeEvents::EvScanData> {
@@ -126,8 +130,55 @@ struct TEvKqpCompute {
}
};
- struct TEvRemoteScanDataAck : public NActors::TEventPB<TEvRemoteScanDataAck, NKikimrKqp::TEvRemoteScanDataAck,
- TKqpComputeEvents::EvRemoteScanDataAck> {};
+ struct TEvRemoteCostData: public NActors::TEventPB<TEvRemoteCostData, NKikimrKqp::TEvRemoteCostData, // protobuf event wrapping NKikimrKqp::TEvRemoteCostData; TEvCostData serializes through it
+ TKqpComputeEvents::EvRemoteCostData> {
+ };
+
+ class TEvCostData: public NActors::TEventLocal<TEvCostData, TKqpComputeEvents::EvCostData> { // local event with key ranges for a scan; serializable by delegating to TEvRemoteCostData
+ private:
+ NOlap::NCosts::TKeyRanges TableRanges;
+ ui32 ScanId = 0;
+ mutable THolder<TEvRemoteCostData> Remote; // mutable: created on demand by InitRemote() from the const serialization methods
+
+ void InitRemote() const;
+ TCell SerializeScalarToCell(const std::shared_ptr<arrow::Scalar>& x) const;
+ TVector<NKikimr::TSerializedTableRange> BuildSerializedRanges(const NOlap::NCosts::TKeyRanges& costRanges) const; // NOTE(review): no definition is visible in kqp_compute.cpp of this patch - confirm it is needed
+
+ public:
+ TEvCostData(NOlap::NCosts::TKeyRanges&& ranges, const ui32 scanId) // takes ownership of the ranges by move
+ : TableRanges(std::move(ranges))
+ , ScanId(scanId) {
+ }
+
+ ui32 GetScanId() const {
+ return ScanId;
+ }
+
+ const NOlap::NCosts::TKeyRanges& GetTableRanges() const {
+ return TableRanges;
+ }
+ TVector<TSerializedTableRange> GetSerializedTableRanges() const;
+
+ virtual bool IsSerializable() const override { // always serializable, although this is a TEventLocal
+ return true;
+ }
+
+ virtual ui32 CalculateSerializedSize() const override {
+ InitRemote(); // make sure the protobuf form exists before measuring it
+ return Remote->CalculateSerializedSizeCached();
+ }
+
+ virtual bool SerializeToArcadiaStream(NActors::TChunkSerializer* chunker) const override {
+ InitRemote(); // make sure the protobuf form exists before writing it
+ return Remote->SerializeToArcadiaStream(chunker);
+ }
+
+ static NActors::IEventBase* Load(TEventSerializedData* data); // parses a TEvRemoteCostData payload and returns a new TEvCostData
+ };
+
+ struct TEvRemoteScanDataAck: public NActors::TEventPB<TEvRemoteScanDataAck, NKikimrKqp::TEvRemoteScanDataAck, // protobuf event wrapping NKikimrKqp::TEvRemoteScanDataAck
+ TKqpComputeEvents::EvRemoteScanDataAck> {
+ };
struct TEvScanDataAck : public NActors::TEventLocal<TEvScanDataAck, TKqpComputeEvents::EvScanDataAck> {
explicit TEvScanDataAck(ui64 freeSpace, ui32 generation = 0)
diff --git a/ydb/core/protos/kqp.proto b/ydb/core/protos/kqp.proto
index 3a4e101ff5..bb647ed258 100644
--- a/ydb/core/protos/kqp.proto
+++ b/ydb/core/protos/kqp.proto
@@ -576,6 +576,24 @@ message TEvRemoteScanDataAck {
optional uint32 Generation = 2;
}
+message TEvRemoteCostData {
+ message TIntervalMeta {
+ optional bool IntervalSkipped = 1[default = false];
+ optional bool MarkIncluded = 2[default = true];
+ }
+
+ message TCostInfo {
+ repeated TIntervalMeta IntervalMeta = 1;
+ optional bool LeftBorderOpened = 2;
+ // ColumnsData contains a RecordBatch whose schema is serialized in ColumnsSchema
+ optional bytes ColumnsData = 3;
+ optional bytes ColumnsSchema = 4;
+ }
+
+ optional uint32 ScanId = 1;
+ optional TCostInfo CostInfo = 2;
+}
+
message TEvKillScanTablet {
}
diff --git a/ydb/core/protos/tx_datashard.proto b/ydb/core/protos/tx_datashard.proto
index 31420c8b67..d2dd9ae089 100644
--- a/ydb/core/protos/tx_datashard.proto
+++ b/ydb/core/protos/tx_datashard.proto
@@ -1426,6 +1426,7 @@ message TEvKqpScan {
optional NYql.NDqProto.EDqStatsMode StatsMode = 18;
optional bytes OlapProgram = 19;
optional NKikimrSchemeOp.EOlapProgramType OlapProgramType = 20;
+ optional bool CostDataOnly = 21[default = false];
}
message TEvCompactTable {
diff --git a/ydb/core/tx/columnshard/CMakeLists.txt b/ydb/core/tx/columnshard/CMakeLists.txt
index 52d50b153d..f966e2b293 100644
--- a/ydb/core/tx/columnshard/CMakeLists.txt
+++ b/ydb/core/tx/columnshard/CMakeLists.txt
@@ -40,6 +40,7 @@ target_sources(core-tx-columnshard PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/blob_manager.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/blob_manager_db.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/blob_manager_txs.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/columnshard__costs.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/columnshard__export.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/columnshard__forget.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/columnshard__init.cpp
diff --git a/ydb/core/tx/columnshard/columnshard__costs.cpp b/ydb/core/tx/columnshard/columnshard__costs.cpp
new file mode 100644
index 0000000000..a8c53cf9dd
--- /dev/null
+++ b/ydb/core/tx/columnshard/columnshard__costs.cpp
@@ -0,0 +1,222 @@
+#include "columnshard__costs.h"
+#include <ydb/core/tx/columnshard/engines/index_info.h>
+#include <ydb/core/tx/columnshard/engines/granules_table.h>
+#include <ydb/core/formats/arrow_helpers.h>
+#include <ydb/core/scheme/scheme_tabledefs.h>
+#include <ydb/core/protos/ssa.pb.h>
+#include <ydb/core/tx/columnshard/engines/predicate.h>
+
+#include <cstring>
+
+namespace NKikimr::NOlap::NCosts {
+
+// Builds a single-value key mark from the granule's index key, stored as a UInt64 scalar.
+TKeyMark TCostsOperator::BuildMarkFromGranule(const TGranuleRecord& record) const {
+    const ui64 granuleKey = ExtractKey(record.IndexKey);
+    TKeyMark mark;
+    mark.AddValue(std::make_shared<arrow::UInt64Scalar>(granuleKey));
+    return mark;
+}
+
+// Turns a predicate into a range mark: one scalar per predicate column, with the mark's
+// inclusiveness taken from the predicate. An empty predicate yields a default mark.
+// Aborts if a column is missing, does not hold exactly one row, or holds a null.
+TRangeMark TCostsOperator::BuildMarkFromPredicate(const std::shared_ptr<NOlap::TPredicate>& p) const {
+    TRangeMark border;
+    if (p) {
+        for (i32 columnIdx = 0; columnIdx < p->Batch->num_columns(); ++columnIdx) {
+            auto column = p->Batch->column(columnIdx);
+            Y_VERIFY(!!column && column->length() == 1);
+            Y_VERIFY(!column->IsNull(0));
+            auto status = column->GetScalar(0);
+            Y_VERIFY(status.ok());
+            border.MutableMark().AddValue(status.ValueUnsafe());
+        }
+        border.SetMarkIncluded(p->Inclusive);
+    }
+    return border;
+}
+
+void TCostsOperator::FillRangeMarks(TKeyRanges& result, const std::shared_ptr<NOlap::TPredicate>& left, const std::shared_ptr<NOlap::TPredicate>& right) const { // appends to `result`: left border mark, one mark per granule, then right border mark
+ LOG_S_DEBUG("TCostsOperator::BuildRangeMarks::Request from " << (left ? left->Batch->ToString() : "-Inf") <<
+ " to " << (right ? right->Batch->ToString() : "+Inf"));
+
+ result.InitColumns(NArrow::MakeArrowSchema(IndexInfo.GetPK())); // key schema is built from the index primary key
+ {
+ TRangeMark leftBorder = BuildMarkFromPredicate(left);
+ if (!leftBorder.GetMark().ColumnsCount()) { // no left predicate values: mark the range set as opened on the left
+ Y_VERIFY(!result.IsLeftBorderOpened()); // aborts if the flag was already set on `result`
+ result.SetLeftBorderOpened(true);
+ } else {
+ Y_VERIFY(result.AddRangeIfNotLess(std::move(leftBorder))); // aborts if the border is rejected by the ordering check
+ }
+ }
+ for (auto&& i : GranuleRecords) {
+ auto granuleMark = BuildMarkFromGranule(i);
+ Y_VERIFY(result.AddRangeIfGrow(TRangeMark(std::move(granuleMark)))); // aborts unless each granule mark compares greater than the previous mark
+ }
+ {
+ TRangeMark rightBorder = BuildMarkFromPredicate(right);
+ if (rightBorder.GetMark().ColumnsCount()) { // without right predicate values no closing mark is added
+ Y_VERIFY(result.AddRangeIfNotLess(std::move(rightBorder)));
+ result.MutableRangeMarks().back().SetIntervalSkipped(true); // the interval after the right border is flagged as skipped
+ }
+ }
+}
+
+// Decodes a granule index key: the 8 raw bytes of `key` read as a ui64 in host byte order.
+// Aborts when the key is not exactly 8 bytes long.
+ui64 TCostsOperator::ExtractKey(const TString& key) {
+    Y_VERIFY(key.size() == 8);
+    // Copy the bytes out instead of dereferencing a casted pointer: the string storage is
+    // not guaranteed to be ui64-aligned, and the cast also violates strict aliasing.
+    ui64 value = 0;
+    std::memcpy(&value, key.data(), sizeof(value));
+    return value;
+}
+
+bool TRangeMark::operator<(const TRangeMark& item) const {
+ return Mark < item.Mark; // ordering looks only at the key mark; IntervalSkipped and MarkIncluded are ignored
+}
+
+// Debug representation: the key mark followed by the skip flag (MarkIncluded is not printed).
+TString TRangeMark::ToString() const {
+    return TStringBuilder() << "MARK=" << Mark.ToString() << ";SKIP=" << IntervalSkipped << ";";
+}
+
+NKikimrKqp::TEvRemoteCostData::TCostInfo TKeyRanges::SerializeToProto() const { // packs flags, per-mark meta, the key schema and (if any marks exist) a RecordBatch of mark values
+ NKikimrKqp::TEvRemoteCostData::TCostInfo result;
+ result.SetLeftBorderOpened(LeftBorderOpened);
+ for (auto&& i : RangeMarks) {
+ *result.AddIntervalMeta() = i.SerializeMetaToProto(); // one meta entry per mark, in mark order
+ }
+
+ std::shared_ptr<arrow::Schema> schema = KeyColumns;
+ *result.MutableColumnsSchema() = NArrow::SerializeSchema(*schema);
+ if (RangeMarks.empty()) { // no marks: ColumnsData stays unset
+ return result;
+ }
+
+ TVector<std::unique_ptr<arrow::ArrayBuilder>> arrayBuilders; // one builder per key column
+ for (auto&& f : KeyColumns->fields()) {
+ std::unique_ptr<arrow::ArrayBuilder> arrayBuilder;
+ Y_VERIFY(arrow::MakeBuilder(arrow::default_memory_pool(), f->type(), &arrayBuilder).ok());
+ arrayBuilders.emplace_back(std::move(arrayBuilder));
+ Y_VERIFY(arrayBuilders.back()->Reserve(RangeMarks.size()).ok());
+ }
+ for (auto&& i : RangeMarks) { // each mark becomes one row
+ Y_VERIFY(i.GetMark().ColumnsCount() <= ColumnsCount()); // a mark may be shorter than the key, never longer
+ auto itBuilder = arrayBuilders.begin();
+ for (auto&& scalar : i.GetMark()) {
+ Y_VERIFY(!!scalar);
+ auto correctScalar = scalar->CastTo((*itBuilder)->type()); // cast the value to the column type before appending
+ Y_VERIFY(correctScalar.ok());
+ Y_VERIFY((*itBuilder)->AppendScalar(**correctScalar).ok());
+ ++itBuilder;
+ }
+ for (; itBuilder != arrayBuilders.end(); ++itBuilder) { // columns missing from a short mark are written as nulls
+ Y_VERIFY((*itBuilder)->AppendNull().ok());
+ }
+ }
+ TVector<std::shared_ptr<arrow::Array>> arrays;
+ for (auto&& i : arrayBuilders) {
+ arrow::Result<std::shared_ptr<arrow::Array>> aData = i->Finish();
+ Y_VERIFY(aData.ok());
+ arrays.emplace_back(aData.ValueUnsafe());
+ }
+ std::shared_ptr<arrow::RecordBatch> rb = arrow::RecordBatch::Make(schema, RangeMarks.size(), arrays);
+ *result.MutableColumnsData() = NArrow::SerializeBatchNoCompression(rb);
+ return result;
+}
+
+// Restores the ranges from their protobuf form: the key schema, the left-border flag and one
+// mark per row of the serialized RecordBatch, where the first null in a row ends that mark.
+// Returns false when the batch or a mark's meta cannot be decoded; in that case RangeMarks is
+// left untouched, but KeyColumns and LeftBorderOpened have already been overwritten.
+// Aborts on a missing schema, a column-count mismatch or a mark/meta count mismatch.
+bool TKeyRanges::DeserializeFromProto(const NKikimrKqp::TEvRemoteCostData::TCostInfo& proto) {
+    std::shared_ptr<arrow::Schema> schema = NArrow::DeserializeSchema(proto.GetColumnsSchema());
+    Y_VERIFY(schema);
+    LeftBorderOpened = proto.GetLeftBorderOpened();
+    KeyColumns = schema;
+
+    TVector<TRangeMark> resultLocal;
+    if (!!proto.GetColumnsData()) {
+        std::shared_ptr<arrow::RecordBatch> batch = NArrow::DeserializeBatch(proto.GetColumnsData(), schema);
+        // DeserializeBatch is not shown here; guard against an empty result instead of
+        // dereferencing it (assumed to signal undecodable data - TODO confirm).
+        if (!batch) {
+            return false;
+        }
+        Y_VERIFY(batch->num_columns() == (int)ColumnsCount());
+        const i64 rowsCount = batch->num_rows();
+        resultLocal.reserve(rowsCount);
+        for (i64 rowIdx = 0; rowIdx < rowsCount; ++rowIdx) {
+            TKeyMark mark;
+            for (ui32 cIdx = 0; cIdx < ColumnsCount(); ++cIdx) {
+                // A null ends the mark: shorter marks are stored with trailing nulls.
+                if (batch->column(cIdx)->IsNull(rowIdx)) {
+                    break;
+                }
+                auto valueStatus = batch->column(cIdx)->GetScalar(rowIdx);
+                Y_VERIFY(valueStatus.ok());
+                mark.AddValue(valueStatus.ValueUnsafe());
+            }
+            resultLocal.emplace_back(TRangeMark(std::move(mark)));
+        }
+    }
+    // Every mark must have exactly one meta entry.
+    Y_VERIFY(resultLocal.size() == (size_t)proto.GetIntervalMeta().size());
+    ui32 idx = 0;
+    for (auto&& i : proto.GetIntervalMeta()) {
+        if (!resultLocal[idx++].DeserializeMetaFromProto(i)) {
+            return false;
+        }
+    }
+    std::swap(RangeMarks, resultLocal);
+    return true;
+}
+
+// Appends `range` only when the list is empty or the current last mark compares less than it.
+// Returns whether the mark was appended; aborts if the mark has more values than key columns.
+bool TKeyRanges::AddRangeIfGrow(TRangeMark&& range) {
+    Y_VERIFY(range.GetMark().ColumnsCount() <= ColumnsCount());
+    const bool grows = RangeMarks.empty() || RangeMarks.back() < range;
+    if (grows) {
+        RangeMarks.emplace_back(std::move(range));
+    }
+    return grows;
+}
+
+// Appends `range` unless it compares less than the current last mark while the last mark does
+// not compare less than it. Returns whether the mark was appended; aborts if the mark has more
+// values than key columns.
+bool TKeyRanges::AddRangeIfNotLess(TRangeMark&& range) {
+    Y_VERIFY(range.GetMark().ColumnsCount() <= ColumnsCount());
+    if (!RangeMarks.empty()) {
+        const TRangeMark& last = RangeMarks.back();
+        if (!(last < range) && range < last) {
+            return false;
+        }
+    }
+    RangeMarks.emplace_back(std::move(range));
+    return true;
+}
+
+// Debug dump: the key schema followed by every range mark.
+TString TKeyRanges::ToString() const {
+    TStringBuilder sb;
+    sb << "COLUMNS: " << KeyColumns->ToString() << Endl
+       << Endl << "DATA: " << Endl;
+    for (const TRangeMark& mark : RangeMarks) {
+        sb << mark.ToString() << ";";
+    }
+    return sb;
+}
+
+TKeyRanges::TKeyRanges() {
+ KeyColumns = arrow::SchemaBuilder().Finish().ValueUnsafe(); // start from the schema produced by an empty SchemaBuilder, replaced later by InitColumns()/DeserializeFromProto()
+}
+
+// Debug representation: every value followed by ';', with "NO_VALUE" for empty pointers.
+TString TKeyMark::ToString() const {
+    TStringBuilder sb;
+    for (const auto& value : Values) {
+        if (value) {
+            sb << value->ToString();
+        } else {
+            sb << "NO_VALUE";
+        }
+        sb << ";";
+    }
+    return sb;
+}
+
+bool TKeyMark::operator<(const TKeyMark& item) const { // column-by-column comparison of the common prefix, then a length-based rule (see below)
+ auto itSelf = Values.begin();
+ auto itItem = item.Values.begin();
+ for (; itSelf != Values.end() && itItem != item.Values.end(); ++itSelf, ++itItem) {
+ if (NArrow::ScalarLess(*itSelf, *itItem)) { // first differing column decides the result
+ return true;
+ } else if (NArrow::ScalarLess(*itItem, *itSelf)) {
+ return false;
+ }
+ }
+ if (itSelf == Values.end()) {
+ if (itItem == item.Values.end()) { // same length and all columns equal: not less
+ return false;
+ } else { // this mark is a proper prefix of `item`
+ return true;
+ }
+ } else {
+ if (itItem == item.Values.end()) { // `item` is a proper prefix of this mark: also reported as less, so prefix-related marks compare less in both directions
+ return true;
+ } else {
+ Y_VERIFY(false, "impossible"); // the loop only stops once at least one side is exhausted
+ }
+ }
+ return false;
+}
+
+}
diff --git a/ydb/core/tx/columnshard/columnshard__costs.h b/ydb/core/tx/columnshard/columnshard__costs.h
new file mode 100644
index 0000000000..48c629f46f
--- /dev/null
+++ b/ydb/core/tx/columnshard/columnshard__costs.h
@@ -0,0 +1,157 @@
+#pragma once
+#include <ydb/core/formats/arrow_helpers.h>
+#include <ydb/core/protos/kqp.pb.h>
+
+namespace NKikimr::NOlap {
+ struct TIndexInfo;
+ struct TGranuleRecord;
+ struct TPredicate;
+}
+
+namespace NKikimr::NOlap::NCosts {
+
+
+// Ordered list of scalar key values describing one point in key space. A mark may hold fewer
+// values than the key has columns.
+class TKeyMark {
+private:
+    TVector<std::shared_ptr<arrow::Scalar>> Values;
+public:
+    TKeyMark() = default;
+
+    // Debug representation of the stored values.
+    TString ToString() const;
+
+    // Read-only iteration over the stored values, in insertion order.
+    TVector<std::shared_ptr<arrow::Scalar>>::const_iterator begin() const {
+        return Values.begin();
+    }
+    TVector<std::shared_ptr<arrow::Scalar>>::const_iterator end() const {
+        return Values.end();
+    }
+    // Appends one value and returns *this for chaining.
+    TKeyMark& AddValue(std::shared_ptr<arrow::Scalar> value) {
+        // `value` is already a by-value copy; move it into place instead of copying it again.
+        Values.emplace_back(std::move(value));
+        return *this;
+    }
+    // Number of values currently stored in the mark.
+    ui32 ColumnsCount() const {
+        return Values.size();
+    }
+    bool operator<(const TKeyMark& item) const;
+};
+
+class TRangeMark { // a key mark plus two flags that are serialized as TIntervalMeta
+private:
+ TKeyMark Mark;
+ bool IntervalSkipped = false; // read by TEvCostData::GetSerializedTableRanges: when set, no range is emitted starting at this mark
+ bool MarkIncluded = true; // whether the mark itself belongs to the adjacent range (used as the inclusive flag of range borders)
+public:
+ TRangeMark() = default;
+ TRangeMark(TKeyMark&& mark) // takes over the key mark; flags keep their defaults
+ : Mark(std::move(mark))
+ {
+
+ }
+
+ TString ToString() const;
+
+ const TKeyMark& GetMark() const {
+ return Mark;
+ }
+
+ TKeyMark& MutableMark() {
+ return Mark;
+ }
+
+ bool operator<(const TRangeMark& item) const;
+
+ TRangeMark& SetIntervalSkipped(const bool value) { // setters return *this for chaining
+ IntervalSkipped = value;
+ return *this;
+ }
+
+ bool GetIntervalSkipped() const {
+ return IntervalSkipped;
+ }
+
+ TRangeMark& SetMarkIncluded(const bool value) {
+ MarkIncluded = value;
+ return *this;
+ }
+
+ bool GetMarkIncluded() const {
+ return MarkIncluded;
+ }
+
+ NKikimrKqp::TEvRemoteCostData::TIntervalMeta SerializeMetaToProto() const { // writes only the two flags; key values are serialized separately by TKeyRanges
+ NKikimrKqp::TEvRemoteCostData::TIntervalMeta result;
+ result.SetIntervalSkipped(IntervalSkipped);
+ result.SetMarkIncluded(MarkIncluded);
+ return result;
+ }
+
+ bool DeserializeMetaFromProto(const NKikimrKqp::TEvRemoteCostData::TIntervalMeta& proto) { // restores the two flags; currently always returns true
+ SetIntervalSkipped(proto.GetIntervalSkipped());
+ SetMarkIncluded(proto.GetMarkIncluded());
+ return true;
+ }
+};
+
+// Sequence of range marks over a key schema, plus a flag telling whether the sequence is
+// opened on the left. Convertible to and from NKikimrKqp::TEvRemoteCostData::TCostInfo.
+class TKeyRanges {
+private:
+    std::shared_ptr<arrow::Schema> KeyColumns;
+    std::vector<TRangeMark> RangeMarks;
+    bool LeftBorderOpened = false;
+public:
+    TKeyRanges();
+
+    // Pre-allocates room for `num` marks.
+    TKeyRanges& Reserve(const ui32 num) {
+        RangeMarks.reserve(num);
+        return *this;
+    }
+
+    // Debug dump of the schema and all marks.
+    TString ToString() const;
+
+    // Number of key columns; 0 when no schema is set.
+    size_t ColumnsCount() const {
+        return KeyColumns ? KeyColumns->num_fields() : 0;
+    }
+    // Dereferences the schema without a null check.
+    const arrow::Schema& GetKeyColumns() const {
+        return *KeyColumns;
+    }
+    const std::vector<TRangeMark>& GetRangeMarks() const {
+        return RangeMarks;
+    }
+    std::vector<TRangeMark>& MutableRangeMarks() {
+        return RangeMarks;
+    }
+    // Replaces the key schema.
+    TKeyRanges& InitColumns(std::shared_ptr<arrow::Schema> schema) {
+        // `schema` is already a by-value copy; move it into place instead of copying it again.
+        KeyColumns = std::move(schema);
+        return *this;
+    }
+    TKeyRanges& SetLeftBorderOpened(const bool value) {
+        LeftBorderOpened = value;
+        return *this;
+    }
+    bool IsLeftBorderOpened() const {
+        return LeftBorderOpened;
+    }
+    // Both return whether the mark was appended; they differ in the ordering check applied
+    // against the current last mark.
+    bool AddRangeIfGrow(TRangeMark&& range);
+    bool AddRangeIfNotLess(TRangeMark&& range);
+
+    NKikimrKqp::TEvRemoteCostData::TCostInfo SerializeToProto() const;
+    bool DeserializeFromProto(const NKikimrKqp::TEvRemoteCostData::TCostInfo& proto);
+
+};
+
+class TCostsOperator { // builds TKeyRanges from granule records and optional border predicates
+private:
+ const TVector<TGranuleRecord>& GranuleRecords; // stored by reference, not copied: the vector must outlive this object
+ const TIndexInfo& IndexInfo; // stored by reference as well
+ TRangeMark BuildMarkFromPredicate(const std::shared_ptr<NOlap::TPredicate>& p) const;
+ TKeyMark BuildMarkFromGranule(const TGranuleRecord& record) const;
+ static ui64 ExtractKey(const TString& key); // reads an 8-byte key as ui64
+public:
+ TCostsOperator(const TVector<TGranuleRecord>& granuleRecords, const TIndexInfo& indexInfo)
+ : GranuleRecords(granuleRecords)
+ , IndexInfo(indexInfo)
+ {
+
+ }
+ void FillRangeMarks(TKeyRanges& result, const std::shared_ptr<NOlap::TPredicate>& left, const std::shared_ptr<NOlap::TPredicate>& right) const; // appends marks to `result`; `left`/`right` may be empty pointers
+};
+}
diff --git a/ydb/core/tx/columnshard/columnshard__scan.cpp b/ydb/core/tx/columnshard/columnshard__scan.cpp
index fb49312146..93fd867343 100644
--- a/ydb/core/tx/columnshard/columnshard__scan.cpp
+++ b/ydb/core/tx/columnshard/columnshard__scan.cpp
@@ -722,6 +722,20 @@ void TTxScan::Complete(const TActorContext& ctx) {
detailedInfo << " read metadata: (" << TContainerPrinter(ReadMetadataRanges) << ")"
<< " req: " << request;
}
+ TVector<NOlap::TReadMetadata::TConstPtr> rMetadataRanges;
+ if (request.GetCostDataOnly()) {
+ for (auto&& i : ReadMetadataRanges) {
+ NOlap::TReadMetadata::TConstPtr rMetadata = std::dynamic_pointer_cast<const NOlap::TReadMetadata>(i);
+ if (!rMetadata || !rMetadata->SelectInfo) {
+ NOlap::NCosts::TKeyRanges ranges;
+ ranges.SetLeftBorderOpened(true);
+ auto ev = MakeHolder<TEvKqpCompute::TEvCostData>(std::move(ranges), scanId);
+ ctx.Send(scanComputeActor, ev.Release());
+ return;
+ }
+ rMetadataRanges.emplace_back(rMetadata);
+ }
+ }
if (ReadMetadataRanges.empty()) {
LOG_S_DEBUG("TTxScan failed "
@@ -746,6 +760,28 @@ void TTxScan::Complete(const TActorContext& ctx) {
return;
}
+ if (request.GetCostDataOnly()) {
+ NOlap::NCosts::TKeyRanges ranges;
+ if (request.GetReverse()) {
+ std::reverse(rMetadataRanges.begin(), rMetadataRanges.end());
+ }
+ {
+ ui32 recordsCount = 0;
+ for (auto&& i : rMetadataRanges) {
+ recordsCount += i->SelectInfo->Granules.size() + 2;
+ }
+ ranges.Reserve(recordsCount);
+ }
+ for (auto&& i : rMetadataRanges) {
+ NOlap::NCosts::TCostsOperator cOperator(i->SelectInfo->Granules, i->IndexInfo);
+ cOperator.FillRangeMarks(ranges, i->GreaterPredicate, i->LessPredicate);
+ }
+ LOG_S_DEBUG("TCostsOperator::BuildRangeMarks::Result " << ranges.ToString());
+ auto ev = MakeHolder<TEvKqpCompute::TEvCostData>(std::move(ranges), scanId);
+ ctx.Send(scanComputeActor, ev.Release());
+ return;
+ }
+
ui64 requestCookie = Self->InFlightReadsTracker.AddInFlightRequest(ReadMetadataRanges, *Self->BlobManager);
auto statsDelta = Self->InFlightReadsTracker.GetSelectStatsDelta();
diff --git a/ydb/core/tx/columnshard/columnshard_common.cpp b/ydb/core/tx/columnshard/columnshard_common.cpp
index aed13a5a27..25337a206f 100644
--- a/ydb/core/tx/columnshard/columnshard_common.cpp
+++ b/ydb/core/tx/columnshard/columnshard_common.cpp
@@ -344,7 +344,8 @@ std::pair<TPredicate, TPredicate> RangePredicates(const TSerializedTableRange& r
bool leftTrailingNull = false;
{
TConstArrayRef<TCell> cells = range.From.GetCells();
- size_t size = cells.size();
+ const size_t size = cells.size();
+ Y_ASSERT(size <= columns.size());
leftCells.reserve(size);
leftColumns.reserve(size);
for (size_t i = 0; i < size; ++i) {
@@ -363,7 +364,8 @@ std::pair<TPredicate, TPredicate> RangePredicates(const TSerializedTableRange& r
bool rightTrailingNull = false;
{
TConstArrayRef<TCell> cells = range.To.GetCells();
- size_t size = cells.size();
+ const size_t size = cells.size();
+ Y_ASSERT(size <= columns.size());
rightCells.reserve(size);
rightColumns.reserve(size);
for (size_t i = 0; i < size; ++i) {
diff --git a/ydb/core/tx/columnshard/engines/CMakeLists.txt b/ydb/core/tx/columnshard/engines/CMakeLists.txt
index 95fe0abeb6..0aee1e51a3 100644
--- a/ydb/core/tx/columnshard/engines/CMakeLists.txt
+++ b/ydb/core/tx/columnshard/engines/CMakeLists.txt
@@ -26,6 +26,7 @@ target_link_libraries(tx-columnshard-engines PUBLIC
)
target_sources(tx-columnshard-engines PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/column_engine_logs.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/column_engine.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/db_wrapper.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/insert_table.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/index_info.cpp
diff --git a/ydb/core/tx/columnshard/engines/column_engine.cpp b/ydb/core/tx/columnshard/engines/column_engine.cpp
new file mode 100644
index 0000000000..99542dd0a1
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/column_engine.cpp
@@ -0,0 +1,4 @@
+#include "column_engine.h"
+
+namespace NKikimr::NOlap {
+}
diff --git a/ydb/core/tx/columnshard/engines/column_engine.h b/ydb/core/tx/columnshard/engines/column_engine.h
index 8d790cf014..ea85a05c88 100644
--- a/ydb/core/tx/columnshard/engines/column_engine.h
+++ b/ydb/core/tx/columnshard/engines/column_engine.h
@@ -253,7 +253,7 @@ struct TSelectInfo {
}
};
- TVector<TGranuleRecord> Granules; // oredered by key (asc)
+ TVector<TGranuleRecord> Granules; // ordered by key (ascending)
TVector<TPortionInfo> Portions;
TVector<ui64> GranulesOrder(bool rev = false) const {
diff --git a/ydb/core/tx/datashard/datashard_ut_kqp_scan.cpp b/ydb/core/tx/datashard/datashard_ut_kqp_scan.cpp
index 69a3b14caf..ed516f2941 100644
--- a/ydb/core/tx/datashard/datashard_ut_kqp_scan.cpp
+++ b/ydb/core/tx/datashard/datashard_ut_kqp_scan.cpp
@@ -223,7 +223,7 @@ Y_UNIT_TEST_SUITE(KqpScan) {
}
/*
- * Check that remote scan actually happend.
+ * Check that remote scan actually happened.
*/
case NKqp::TKqpComputeEvents::EvScanData: {
remoteScanDetected = remoteScanDetected || ev->Sender.NodeId() != ev->Recipient.NodeId();