diff options
author | chertus <azuikov@ydb.tech> | 2022-10-18 21:09:11 +0300 |
---|---|---|
committer | chertus <azuikov@ydb.tech> | 2022-10-18 21:09:11 +0300 |
commit | cf532004dcdf731c0035e191c519fcb8704225e9 (patch) | |
tree | 82673dedd35dc2ecc7c30ac44a401f35af8dacef | |
parent | 9cfe49a7d9c3155bb6aec4b84db58c1a2b7a3136 (diff) | |
download | ydb-cf532004dcdf731c0035e191c519fcb8704225e9.tar.gz |
split tx and events location in columnshard's code
16 files changed, 303 insertions, 296 deletions
diff --git a/ydb/core/kqp/kqp_compute.h b/ydb/core/kqp/kqp_compute.h index 998f3502c3f..078f3920ab3 100644 --- a/ydb/core/kqp/kqp_compute.h +++ b/ydb/core/kqp/kqp_compute.h @@ -7,7 +7,7 @@ #include <ydb/core/scheme/scheme_tabledefs.h> #include <contrib/libs/apache/arrow/cpp/src/arrow/api.h> -#include <ydb/core/tx/columnshard/columnshard__costs.h> +#include <ydb/core/tx/columnshard/columnshard_costs.h> namespace NKikimr::NKqp { diff --git a/ydb/core/tx/columnshard/CMakeLists.txt b/ydb/core/tx/columnshard/CMakeLists.txt index f966e2b2932..74b573329ec 100644 --- a/ydb/core/tx/columnshard/CMakeLists.txt +++ b/ydb/core/tx/columnshard/CMakeLists.txt @@ -40,7 +40,6 @@ target_sources(core-tx-columnshard PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/blob_manager.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/blob_manager_db.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/blob_manager_txs.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/columnshard__costs.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/columnshard__export.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/columnshard__forget.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/columnshard__init.cpp @@ -50,6 +49,7 @@ target_sources(core-tx-columnshard PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/columnshard__propose_cancel.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/columnshard__propose_transaction.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/columnshard__read.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/columnshard__read_base.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/columnshard__read_blob_ranges.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/columnshard__scan.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/columnshard__write.cpp @@ -57,6 +57,7 @@ target_sources(core-tx-columnshard PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/columnshard.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/columnshard_impl.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/columnshard_common.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/columnshard_costs.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/compaction_actor.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/eviction_actor.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/export_actor.cpp diff --git a/ydb/core/tx/columnshard/columnshard.cpp b/ydb/core/tx/columnshard/columnshard.cpp index f0f3d56e496..fe67c9360ac 100644 --- a/ydb/core/tx/columnshard/columnshard.cpp +++ b/ydb/core/tx/columnshard/columnshard.cpp @@ -1,5 +1,4 @@ #include "columnshard_impl.h" -#include "columnshard_txs.h" namespace NKikimr { diff --git a/ydb/core/tx/columnshard/columnshard__init.cpp b/ydb/core/tx/columnshard/columnshard__init.cpp index 89ac045f2b5..e06755ad671 100644 --- a/ydb/core/tx/columnshard/columnshard__init.cpp +++ b/ydb/core/tx/columnshard/columnshard__init.cpp @@ -1,6 +1,6 @@ #include "columnshard_impl.h" #include "columnshard_ttl.h" -#include "columnshard_txs.h" +#include "columnshard_private_events.h" #include "columnshard_schema.h" #include "blob_manager_db.h" @@ -12,6 +12,21 @@ using namespace NTabletFlatExecutor; // TTxInit => SwitchToWork +/// Load data from local database +class TTxInit : public TTransactionBase<TColumnShard> { +public: + TTxInit(TColumnShard* self) + : TBase(self) + {} + + bool Execute(TTransactionContext& txc, const TActorContext& ctx) override; + void Complete(const TActorContext& ctx) override; + TTxType GetTxType() const override { return TXTYPE_INIT; } + + void SetDefaults(); + bool ReadEverything(TTransactionContext& txc, const TActorContext& ctx); +}; + void TTxInit::SetDefaults() { Self->CurrentSchemeShardId = 0; Self->LastSchemaSeqNo = { }; @@ -322,6 +337,18 @@ void TTxInit::Complete(const TActorContext& ctx) { // TTxUpdateSchema => TTxInit +/// Update local database on tablet start +class TTxUpdateSchema : public TTransactionBase<TColumnShard> { +public: + TTxUpdateSchema(TColumnShard* self) + : TBase(self) + {} + + bool Execute(TTransactionContext& txc, const TActorContext& ctx) override; + void Complete(const TActorContext& ctx) override; + TTxType GetTxType() const override { return TXTYPE_UPDATE_SCHEMA; } +}; + bool TTxUpdateSchema::Execute(TTransactionContext& txc, const TActorContext&) { Y_UNUSED(txc); LOG_S_DEBUG("TTxUpdateSchema.Execute at tablet " << Self->TabletID()); @@ -335,6 +362,18 @@ void TTxUpdateSchema::Complete(const TActorContext& ctx) { // TTxInitSchema => TTxUpdateSchema +/// Create local database on tablet start if none +class TTxInitSchema : public TTransactionBase<TColumnShard> { +public: + TTxInitSchema(TColumnShard* self) + : TBase(self) + {} + + bool Execute(TTransactionContext& txc, const TActorContext& ctx) override; + void Complete(const TActorContext& ctx) override; + TTxType GetTxType() const override { return TXTYPE_INIT_SCHEMA; } +}; + bool TTxInitSchema::Execute(TTransactionContext& txc, const TActorContext&) { LOG_S_DEBUG("TxInitSchema.Execute at tablet " << Self->TabletID()); diff --git a/ydb/core/tx/columnshard/columnshard__plan_step.cpp b/ydb/core/tx/columnshard/columnshard__plan_step.cpp index ed8f1ea1be7..d11687dc72e 100644 --- a/ydb/core/tx/columnshard/columnshard__plan_step.cpp +++ b/ydb/core/tx/columnshard/columnshard__plan_step.cpp @@ -1,5 +1,5 @@ #include "columnshard_impl.h" -#include "columnshard_txs.h" +#include "columnshard_private_events.h" #include "columnshard_schema.h" #include <util/string/vector.h> diff --git a/ydb/core/tx/columnshard/columnshard__propose_transaction.cpp b/ydb/core/tx/columnshard/columnshard__propose_transaction.cpp index 06813954659..7e9490dbad4 100644 --- a/ydb/core/tx/columnshard/columnshard__propose_transaction.cpp +++ b/ydb/core/tx/columnshard/columnshard__propose_transaction.cpp @@ -1,5 +1,5 @@ #include "columnshard_impl.h" -#include "columnshard_txs.h" +#include "columnshard_private_events.h" #include "columnshard_schema.h" namespace NKikimr::NColumnShard { diff --git a/ydb/core/tx/columnshard/columnshard__read.cpp b/ydb/core/tx/columnshard/columnshard__read.cpp index f6d08e7f32a..14700c5a6f2 100644 --- a/ydb/core/tx/columnshard/columnshard__read.cpp +++ b/ydb/core/tx/columnshard/columnshard__read.cpp @@ -1,10 +1,9 @@ -#include "columnshard_impl.h" -#include "columnshard_txs.h" -#include "columnshard_schema.h" -#include "columnshard__index_scan.h" -#include <ydb/core/tx/columnshard/engines/column_engine.h> +#include <ydb/core/tx/columnshard/columnshard_impl.h> +#include <ydb/core/tx/columnshard/columnshard_private_events.h> +#include <ydb/core/tx/columnshard/columnshard_schema.h> +#include <ydb/core/tx/columnshard/columnshard__read_base.h> +#include <ydb/core/tx/columnshard/columnshard__index_scan.h> #include <ydb/core/tx/columnshard/engines/indexed_read_data.h> -#include <ydb/core/formats/ssa_program_optimizer.h> namespace NKikimr::NColumnShard { @@ -37,201 +36,6 @@ private: }; -std::shared_ptr<NOlap::TReadMetadata> -TTxReadBase::PrepareReadMetadata(const TActorContext& ctx, const TReadDescription& read, - const std::unique_ptr<NOlap::TInsertTable>& insertTable, - const std::unique_ptr<NOlap::IColumnEngine>& index, - const TBatchCache& batchCache, - TString& error) const { - Y_UNUSED(ctx); - - if (!insertTable || !index) { - return {}; - } - - if (read.PlanStep < Self->GetMinReadStep()) { - error = Sprintf("Snapshot %" PRIu64 ":%" PRIu64 " too old", read.PlanStep, read.TxId); - return {}; - } - - const NOlap::TIndexInfo& indexInfo = index->GetIndexInfo(); - auto spOut = std::make_shared<NOlap::TReadMetadata>(indexInfo); - auto& out = *spOut; - - out.PlanStep = read.PlanStep; - out.TxId = read.TxId; - - // schemas - - out.BlobSchema = indexInfo.ArrowSchema(); - if (read.ColumnIds.size()) { - out.ResultSchema = indexInfo.ArrowSchema(read.ColumnIds); - } else if (read.ColumnNames.size()) { - out.ResultSchema = indexInfo.ArrowSchema(read.ColumnNames); - } else { - error = "Empty column list requested"; - return {}; - } - - if (!out.BlobSchema) { - error = "Could not get BlobSchema."; - return {}; - } - - if (!out.ResultSchema) { - error = "Could not get ResultSchema."; - return {}; - } - - // insert table - - out.CommittedBlobs = insertTable->Read(read.PathId, read.PlanStep, read.TxId); - for (auto& cmt : out.CommittedBlobs) { - if (auto batch = batchCache.Get(cmt.BlobId)) { - out.CommittedBatches.emplace(cmt.BlobId, batch); - } - } - - // index - - /// @note We could have column name changes between schema versions: - /// Add '1:foo', Drop '1:foo', Add '2:foo'. Drop should hide '1:foo' from reads. - /// It's expected that we have only one version on 'foo' in blob and could split them by schema {planStep:txId}. - /// So '1:foo' would be omitted in blob records for the column in new snapshots. And '2:foo' - in old ones. - /// It's not possible for blobs with several columns. There should be a special logic for them. - TVector<TString> columns = read.ColumnNames; - if (!read.ColumnIds.empty()) { - columns = indexInfo.GetColumnNames(read.ColumnIds); - } - Y_VERIFY(!columns.empty(), "Empty column list"); - - { // Add more columns: snapshot, replace, predicate - // Key columns (replace, sort) - THashSet<TString> requiredColumns = indexInfo.GetRequiredColumns(); - - // Snapshot columns - requiredColumns.insert(NOlap::TIndexInfo::SPEC_COL_PLAN_STEP); - requiredColumns.insert(NOlap::TIndexInfo::SPEC_COL_TX_ID); - - // Predicate columns - if (read.LessPredicate) { - for (auto& col : read.LessPredicate->ColumnNames()) { - requiredColumns.insert(col); - } - } - if (read.GreaterPredicate) { - for (auto& col : read.GreaterPredicate->ColumnNames()) { - requiredColumns.insert(col); - } - } - - for (auto& col : columns) { - requiredColumns.erase(col); - } - - for (auto& reqCol : requiredColumns) { - columns.push_back(reqCol); - } - } - - out.LoadSchema = indexInfo.AddColumns(out.ResultSchema, columns); - if (!out.LoadSchema) { - return {}; - } - - if (read.LessPredicate) { - if (!read.LessPredicate->Good() || - !read.LessPredicate->IsTo()) { - return {}; - } - out.LessPredicate = read.LessPredicate; - } - if (read.GreaterPredicate) { - if (!read.GreaterPredicate->Good() || - !read.GreaterPredicate->IsFrom()) { - return {}; - } - out.GreaterPredicate = read.GreaterPredicate; - } - - THashSet<ui32> columnIds; - for (auto& field : out.LoadSchema->fields()) { - TString column(field->name().data(), field->name().size()); - columnIds.insert(indexInfo.GetColumnId(column)); - } - - out.Program = std::move(read.Program); - for (auto& [id, name] : read.ProgramSourceColumns) { - columnIds.insert(id); - } - - if (read.ReadNothing) { - out.SelectInfo = std::make_shared<NOlap::TSelectInfo>(); - } else { - out.SelectInfo = index->Select(read.PathId, {read.PlanStep, read.TxId}, columnIds, - out.GreaterPredicate, out.LessPredicate); - } - return spOut; -} - -bool TTxReadBase::ParseProgram(const TActorContext& ctx, NKikimrSchemeOp::EOlapProgramType programType, - TString serializedProgram, TReadDescription& read, const IColumnResolver& columnResolver) -{ - if (serializedProgram.empty()) { - return true; - } - - NKikimrSSA::TProgram program; - NKikimrSSA::TOlapProgram olapProgram; - - switch (programType) { - case NKikimrSchemeOp::EOlapProgramType::OLAP_PROGRAM_SSA_PROGRAM_WITH_PARAMETERS: - if (!olapProgram.ParseFromString(serializedProgram)) { - ErrorDescription = TStringBuilder() << "Can't parse TOlapProgram at " << Self->TabletID(); - return false; - } - - if (!program.ParseFromString(olapProgram.GetProgram())) { - ErrorDescription = TStringBuilder() << "Can't parse TProgram at " << Self->TabletID(); - return false; - } - - break; - default: - ErrorDescription = TStringBuilder() << "Unsupported olap program version: " << (ui32)programType; - return false; - } - - if (ctx.LoggerSettings() && - ctx.LoggerSettings()->Satisfies(NActors::NLog::PRI_DEBUG, NKikimrServices::TX_COLUMNSHARD)) - { - TString out; - ::google::protobuf::TextFormat::PrintToString(program, &out); - LOG_S_DEBUG("Process program: " << Endl << out); - } - - if (olapProgram.HasParameters()) { - Y_VERIFY(olapProgram.HasParametersSchema(), "Parameters are present, but there is no schema."); - - auto schema = NArrow::DeserializeSchema(olapProgram.GetParametersSchema()); - read.ProgramParameters = NArrow::DeserializeBatch(olapProgram.GetParameters(), schema); - } - - - auto ssaProgramSteps = read.AddProgram(columnResolver, program); - if (!ssaProgramSteps) { - ErrorDescription = TStringBuilder() << "Wrong olap program"; - return false; - } - if (!ssaProgramSteps->Program.empty() && Self->PrimaryIndex) { - ssaProgramSteps->Program = NKikimr::NSsaOptimizer::OptimizeProgram(ssaProgramSteps->Program, Self->PrimaryIndex->GetIndexInfo()); - } - - read.Program = ssaProgramSteps->Program; - read.ProgramSourceColumns = ssaProgramSteps->ProgramSourceColumns; - return true; -} - bool TTxRead::Execute(TTransactionContext& txc, const TActorContext& ctx) { Y_VERIFY(Ev); Y_VERIFY(Self->PrimaryIndex); diff --git a/ydb/core/tx/columnshard/columnshard__read_base.cpp b/ydb/core/tx/columnshard/columnshard__read_base.cpp new file mode 100644 index 00000000000..4ab8d8acb26 --- /dev/null +++ b/ydb/core/tx/columnshard/columnshard__read_base.cpp @@ -0,0 +1,203 @@ +#include <ydb/core/tx/columnshard/columnshard_impl.h> +#include <ydb/core/tx/columnshard/columnshard__read_base.h> +#include <ydb/core/tx/columnshard/columnshard__index_scan.h> +#include <ydb/core/formats/ssa_program_optimizer.h> + +namespace NKikimr::NColumnShard { + +std::shared_ptr<NOlap::TReadMetadata> +TTxReadBase::PrepareReadMetadata(const TActorContext& ctx, const TReadDescription& read, + const std::unique_ptr<NOlap::TInsertTable>& insertTable, + const std::unique_ptr<NOlap::IColumnEngine>& index, + const TBatchCache& batchCache, + TString& error) const { + Y_UNUSED(ctx); + + if (!insertTable || !index) { + return {}; + } + + if (read.PlanStep < Self->GetMinReadStep()) { + error = Sprintf("Snapshot %" PRIu64 ":%" PRIu64 " too old", read.PlanStep, read.TxId); + return {}; + } + + const NOlap::TIndexInfo& indexInfo = index->GetIndexInfo(); + auto spOut = std::make_shared<NOlap::TReadMetadata>(indexInfo); + auto& out = *spOut; + + out.PlanStep = read.PlanStep; + out.TxId = read.TxId; + + // schemas + + out.BlobSchema = indexInfo.ArrowSchema(); + if (read.ColumnIds.size()) { + out.ResultSchema = indexInfo.ArrowSchema(read.ColumnIds); + } else if (read.ColumnNames.size()) { + out.ResultSchema = indexInfo.ArrowSchema(read.ColumnNames); + } else { + error = "Empty column list requested"; + return {}; + } + + if (!out.BlobSchema) { + error = "Could not get BlobSchema."; + return {}; + } + + if (!out.ResultSchema) { + error = "Could not get ResultSchema."; + return {}; + } + + // insert table + + out.CommittedBlobs = insertTable->Read(read.PathId, read.PlanStep, read.TxId); + for (auto& cmt : out.CommittedBlobs) { + if (auto batch = batchCache.Get(cmt.BlobId)) { + out.CommittedBatches.emplace(cmt.BlobId, batch); + } + } + + // index + + /// @note We could have column name changes between schema versions: + /// Add '1:foo', Drop '1:foo', Add '2:foo'. Drop should hide '1:foo' from reads. + /// It's expected that we have only one version on 'foo' in blob and could split them by schema {planStep:txId}. + /// So '1:foo' would be omitted in blob records for the column in new snapshots. And '2:foo' - in old ones. + /// It's not possible for blobs with several columns. There should be a special logic for them. + TVector<TString> columns = read.ColumnNames; + if (!read.ColumnIds.empty()) { + columns = indexInfo.GetColumnNames(read.ColumnIds); + } + Y_VERIFY(!columns.empty(), "Empty column list"); + + { // Add more columns: snapshot, replace, predicate + // Key columns (replace, sort) + THashSet<TString> requiredColumns = indexInfo.GetRequiredColumns(); + + // Snapshot columns + requiredColumns.insert(NOlap::TIndexInfo::SPEC_COL_PLAN_STEP); + requiredColumns.insert(NOlap::TIndexInfo::SPEC_COL_TX_ID); + + // Predicate columns + if (read.LessPredicate) { + for (auto& col : read.LessPredicate->ColumnNames()) { + requiredColumns.insert(col); + } + } + if (read.GreaterPredicate) { + for (auto& col : read.GreaterPredicate->ColumnNames()) { + requiredColumns.insert(col); + } + } + + for (auto& col : columns) { + requiredColumns.erase(col); + } + + for (auto& reqCol : requiredColumns) { + columns.push_back(reqCol); + } + } + + out.LoadSchema = indexInfo.AddColumns(out.ResultSchema, columns); + if (!out.LoadSchema) { + return {}; + } + + if (read.LessPredicate) { + if (!read.LessPredicate->Good() || + !read.LessPredicate->IsTo()) { + return {}; + } + out.LessPredicate = read.LessPredicate; + } + if (read.GreaterPredicate) { + if (!read.GreaterPredicate->Good() || + !read.GreaterPredicate->IsFrom()) { + return {}; + } + out.GreaterPredicate = read.GreaterPredicate; + } + + THashSet<ui32> columnIds; + for (auto& field : out.LoadSchema->fields()) { + TString column(field->name().data(), field->name().size()); + columnIds.insert(indexInfo.GetColumnId(column)); + } + + out.Program = std::move(read.Program); + for (auto& [id, name] : read.ProgramSourceColumns) { + columnIds.insert(id); + } + + if (read.ReadNothing) { + out.SelectInfo = std::make_shared<NOlap::TSelectInfo>(); + } else { + out.SelectInfo = index->Select(read.PathId, {read.PlanStep, read.TxId}, columnIds, + out.GreaterPredicate, out.LessPredicate); + } + return spOut; +} + +bool TTxReadBase::ParseProgram(const TActorContext& ctx, NKikimrSchemeOp::EOlapProgramType programType, + TString serializedProgram, TReadDescription& read, const IColumnResolver& columnResolver) +{ + if (serializedProgram.empty()) { + return true; + } + + NKikimrSSA::TProgram program; + NKikimrSSA::TOlapProgram olapProgram; + + switch (programType) { + case NKikimrSchemeOp::EOlapProgramType::OLAP_PROGRAM_SSA_PROGRAM_WITH_PARAMETERS: + if (!olapProgram.ParseFromString(serializedProgram)) { + ErrorDescription = TStringBuilder() << "Can't parse TOlapProgram at " << Self->TabletID(); + return false; + } + + if (!program.ParseFromString(olapProgram.GetProgram())) { + ErrorDescription = TStringBuilder() << "Can't parse TProgram at " << Self->TabletID(); + return false; + } + + break; + default: + ErrorDescription = TStringBuilder() << "Unsupported olap program version: " << (ui32)programType; + return false; + } + + if (ctx.LoggerSettings() && + ctx.LoggerSettings()->Satisfies(NActors::NLog::PRI_DEBUG, NKikimrServices::TX_COLUMNSHARD)) + { + TString out; + ::google::protobuf::TextFormat::PrintToString(program, &out); + LOG_S_DEBUG("Process program: " << Endl << out); + } + + if (olapProgram.HasParameters()) { + Y_VERIFY(olapProgram.HasParametersSchema(), "Parameters are present, but there is no schema."); + + auto schema = NArrow::DeserializeSchema(olapProgram.GetParametersSchema()); + read.ProgramParameters = NArrow::DeserializeBatch(olapProgram.GetParameters(), schema); + } + + + auto ssaProgramSteps = read.AddProgram(columnResolver, program); + if (!ssaProgramSteps) { + ErrorDescription = TStringBuilder() << "Wrong olap program"; + return false; + } + if (!ssaProgramSteps->Program.empty() && Self->PrimaryIndex) { + ssaProgramSteps->Program = NKikimr::NSsaOptimizer::OptimizeProgram(ssaProgramSteps->Program, Self->PrimaryIndex->GetIndexInfo()); + } + + read.Program = ssaProgramSteps->Program; + read.ProgramSourceColumns = ssaProgramSteps->ProgramSourceColumns; + return true; +} + +} diff --git a/ydb/core/tx/columnshard/columnshard__read_base.h b/ydb/core/tx/columnshard/columnshard__read_base.h new file mode 100644 index 00000000000..2962eba3597 --- /dev/null +++ b/ydb/core/tx/columnshard/columnshard__read_base.h @@ -0,0 +1,34 @@ +#pragma once +#include <ydb/core/tx/columnshard/columnshard_impl.h> + +namespace NKikimr::NColumnShard { + +/// Read portion of data in OLAP transaction +class TTxReadBase : public TTransactionBase<TColumnShard> { +protected: + explicit TTxReadBase(TColumnShard* self) + : TBase(self) + {} + + std::shared_ptr<NOlap::TReadMetadata> PrepareReadMetadata( + const TActorContext& ctx, + const TReadDescription& readDescription, + const std::unique_ptr<NOlap::TInsertTable>& insertTable, + const std::unique_ptr<NOlap::IColumnEngine>& index, + const TBatchCache& batchCache, + TString& error) const; + +protected: + bool ParseProgram( + const TActorContext& ctx, + NKikimrSchemeOp::EOlapProgramType programType, + TString serializedProgram, + TReadDescription& read, + const IColumnResolver& columnResolver + ); + +protected: + TString ErrorDescription; +}; + +} diff --git a/ydb/core/tx/columnshard/columnshard__read_blob_ranges.cpp b/ydb/core/tx/columnshard/columnshard__read_blob_ranges.cpp index 1dd732b3d10..5f9e0615457 100644 --- a/ydb/core/tx/columnshard/columnshard__read_blob_ranges.cpp +++ b/ydb/core/tx/columnshard/columnshard__read_blob_ranges.cpp @@ -1,5 +1,5 @@ #include "columnshard_impl.h" -#include "columnshard_txs.h" +#include "columnshard_private_events.h" #include "columnshard_schema.h" #include "blob_manager_db.h" diff --git a/ydb/core/tx/columnshard/columnshard__scan.cpp b/ydb/core/tx/columnshard/columnshard__scan.cpp index 41245654f76..74081ab4717 100644 --- a/ydb/core/tx/columnshard/columnshard__scan.cpp +++ b/ydb/core/tx/columnshard/columnshard__scan.cpp @@ -1,10 +1,10 @@ -#include "columnshard__scan.h" -#include "columnshard__index_scan.h" -#include "columnshard__stats_scan.h" - +#include <ydb/core/tx/columnshard/columnshard__scan.h> +#include <ydb/core/tx/columnshard/columnshard__index_scan.h> +#include <ydb/core/tx/columnshard/columnshard__stats_scan.h> +#include <ydb/core/tx/columnshard/columnshard__read_base.h> #include <ydb/core/tx/columnshard/blob_cache.h> #include <ydb/core/tx/columnshard/columnshard_impl.h> -#include <ydb/core/tx/columnshard/columnshard_txs.h> +#include <ydb/core/tx/columnshard/columnshard_private_events.h> #include <ydb/core/tablet_flat/flat_row_celled.h> #include <ydb/library/yql/dq/actors/compute/dq_compute_actor.h> #include <ydb/core/kqp/kqp_compute.h> diff --git a/ydb/core/tx/columnshard/columnshard__write_index.cpp b/ydb/core/tx/columnshard/columnshard__write_index.cpp index 14b073c7898..110148fafc4 100644 --- a/ydb/core/tx/columnshard/columnshard__write_index.cpp +++ b/ydb/core/tx/columnshard/columnshard__write_index.cpp @@ -1,5 +1,5 @@ #include "columnshard_impl.h" -#include "columnshard_txs.h" +#include "columnshard_private_events.h" #include "columnshard_schema.h" #include "blob_manager_db.h" #include "blob_cache.h" diff --git a/ydb/core/tx/columnshard/columnshard__costs.cpp b/ydb/core/tx/columnshard/columnshard_costs.cpp index f258d669905..0b2f1742ab5 100644 --- a/ydb/core/tx/columnshard/columnshard__costs.cpp +++ b/ydb/core/tx/columnshard/columnshard_costs.cpp @@ -1,4 +1,4 @@ -#include "columnshard__costs.h" +#include <ydb/core/tx/columnshard/columnshard_costs.h> #include <ydb/core/tx/columnshard/engines/index_info.h> #include <ydb/core/tx/columnshard/engines/granules_table.h> #include <ydb/core/formats/arrow_helpers.h> diff --git a/ydb/core/tx/columnshard/columnshard__costs.h b/ydb/core/tx/columnshard/columnshard_costs.h index b7cb20ebe5d..b7cb20ebe5d 100644 --- a/ydb/core/tx/columnshard/columnshard__costs.h +++ b/ydb/core/tx/columnshard/columnshard_costs.h diff --git a/ydb/core/tx/columnshard/columnshard_impl.h b/ydb/core/tx/columnshard/columnshard_impl.h index 3f1f6c33aa7..1b3048f357e 100644 --- a/ydb/core/tx/columnshard/columnshard_impl.h +++ b/ydb/core/tx/columnshard/columnshard_impl.h @@ -3,7 +3,7 @@ #include "columnshard.h" #include "columnshard_common.h" #include "columnshard_ttl.h" -#include "columnshard_txs.h" +#include "columnshard_private_events.h" #include "blob_manager.h" #include "inflight_request_tracker.h" @@ -12,6 +12,9 @@ #include <ydb/core/tablet/tablet_counters.h> #include <ydb/core/tablet/tablet_pipe_client_cache.h> #include <ydb/core/tablet_flat/flat_cxx_database.h> +#include <ydb/core/tablet_flat/tablet_flat_executed.h> +#include <ydb/core/tablet_flat/tablet_flat_executor.h> +#include <ydb/core/tx/tx_processing.h> namespace NKikimr::NColumnShard { @@ -68,6 +71,10 @@ struct TSettings { } }; +using ITransaction = NTabletFlatExecutor::ITransaction; + +template <typename T> +using TTransactionBase = NTabletFlatExecutor::TTransactionBase<T>; class TColumnShard : public TActor<TColumnShard> diff --git a/ydb/core/tx/columnshard/columnshard_txs.h b/ydb/core/tx/columnshard/columnshard_private_events.h index 1ffc693fbd2..bfbe1c429a7 100644 --- a/ydb/core/tx/columnshard/columnshard_txs.h +++ b/ydb/core/tx/columnshard/columnshard_private_events.h @@ -2,13 +2,7 @@ #include "blob_manager.h" -#include <ydb/core/tx/columnshard/engines/column_engine.h> -#include <ydb/core/tx/columnshard/engines/indexed_read_data.h> - #include <ydb/core/protos/counters_columnshard.pb.h> -#include <ydb/core/tablet_flat/tablet_flat_executed.h> -#include <ydb/core/tablet_flat/tablet_flat_executor.h> -#include <ydb/core/tx/tx_processing.h> namespace NKikimr::NColumnShard { @@ -211,78 +205,4 @@ struct TEvPrivate { }; }; -using ITransaction = NTabletFlatExecutor::ITransaction; - -template <typename T> -using TTransactionBase = NTabletFlatExecutor::TTransactionBase<T>; - -class TColumnShard; - -/// Load data from local database -class TTxInit : public TTransactionBase<TColumnShard> { -public: - TTxInit(TColumnShard* self) - : TBase(self) - {} - - bool Execute(TTransactionContext& txc, const TActorContext& ctx) override; - void Complete(const TActorContext& ctx) override; - TTxType GetTxType() const override { return TXTYPE_INIT; } - - void SetDefaults(); - bool ReadEverything(TTransactionContext& txc, const TActorContext& ctx); -}; - -/// Create local database on tablet start if none -class TTxInitSchema : public TTransactionBase<TColumnShard> { -public: - TTxInitSchema(TColumnShard* self) - : TBase(self) - {} - - bool Execute(TTransactionContext& txc, const TActorContext& ctx) override; - void Complete(const TActorContext& ctx) override; - TTxType GetTxType() const override { return TXTYPE_INIT_SCHEMA; } -}; - -/// Update local database on tablet start -class TTxUpdateSchema : public TTransactionBase<TColumnShard> { -public: - TTxUpdateSchema(TColumnShard* self) - : TBase(self) - {} - - bool Execute(TTransactionContext& txc, const TActorContext& ctx) override; - void Complete(const TActorContext& ctx) override; - TTxType GetTxType() const override { return TXTYPE_UPDATE_SCHEMA; } -}; - -/// Read portion of data in OLAP transaction -class TTxReadBase : public TTransactionBase<TColumnShard> { -protected: - explicit TTxReadBase(TColumnShard* self) - : TBase(self) - {} - - std::shared_ptr<NOlap::TReadMetadata> PrepareReadMetadata( - const TActorContext& ctx, - const TReadDescription& readDescription, - const std::unique_ptr<NOlap::TInsertTable>& insertTable, - const std::unique_ptr<NOlap::IColumnEngine>& index, - const TBatchCache& batchCache, - TString& error) const; - -protected: - bool ParseProgram( - const TActorContext& ctx, - NKikimrSchemeOp::EOlapProgramType programType, - TString serializedProgram, - TReadDescription& read, - const IColumnResolver& columnResolver - ); - -protected: - TString ErrorDescription; -}; - } |