aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorchertus <azuikov@ydb.tech>2022-10-18 21:09:11 +0300
committerchertus <azuikov@ydb.tech>2022-10-18 21:09:11 +0300
commitcf532004dcdf731c0035e191c519fcb8704225e9 (patch)
tree82673dedd35dc2ecc7c30ac44a401f35af8dacef
parent9cfe49a7d9c3155bb6aec4b84db58c1a2b7a3136 (diff)
downloadydb-cf532004dcdf731c0035e191c519fcb8704225e9.tar.gz
split tx and events location in columnshard's code
-rw-r--r--ydb/core/kqp/kqp_compute.h2
-rw-r--r--ydb/core/tx/columnshard/CMakeLists.txt3
-rw-r--r--ydb/core/tx/columnshard/columnshard.cpp1
-rw-r--r--ydb/core/tx/columnshard/columnshard__init.cpp41
-rw-r--r--ydb/core/tx/columnshard/columnshard__plan_step.cpp2
-rw-r--r--ydb/core/tx/columnshard/columnshard__propose_transaction.cpp2
-rw-r--r--ydb/core/tx/columnshard/columnshard__read.cpp206
-rw-r--r--ydb/core/tx/columnshard/columnshard__read_base.cpp203
-rw-r--r--ydb/core/tx/columnshard/columnshard__read_base.h34
-rw-r--r--ydb/core/tx/columnshard/columnshard__read_blob_ranges.cpp2
-rw-r--r--ydb/core/tx/columnshard/columnshard__scan.cpp10
-rw-r--r--ydb/core/tx/columnshard/columnshard__write_index.cpp2
-rw-r--r--ydb/core/tx/columnshard/columnshard_costs.cpp (renamed from ydb/core/tx/columnshard/columnshard__costs.cpp)2
-rw-r--r--ydb/core/tx/columnshard/columnshard_costs.h (renamed from ydb/core/tx/columnshard/columnshard__costs.h)0
-rw-r--r--ydb/core/tx/columnshard/columnshard_impl.h9
-rw-r--r--ydb/core/tx/columnshard/columnshard_private_events.h (renamed from ydb/core/tx/columnshard/columnshard_txs.h)80
16 files changed, 303 insertions, 296 deletions
diff --git a/ydb/core/kqp/kqp_compute.h b/ydb/core/kqp/kqp_compute.h
index 998f3502c3f..078f3920ab3 100644
--- a/ydb/core/kqp/kqp_compute.h
+++ b/ydb/core/kqp/kqp_compute.h
@@ -7,7 +7,7 @@
#include <ydb/core/scheme/scheme_tabledefs.h>
#include <contrib/libs/apache/arrow/cpp/src/arrow/api.h>
-#include <ydb/core/tx/columnshard/columnshard__costs.h>
+#include <ydb/core/tx/columnshard/columnshard_costs.h>
namespace NKikimr::NKqp {
diff --git a/ydb/core/tx/columnshard/CMakeLists.txt b/ydb/core/tx/columnshard/CMakeLists.txt
index f966e2b2932..74b573329ec 100644
--- a/ydb/core/tx/columnshard/CMakeLists.txt
+++ b/ydb/core/tx/columnshard/CMakeLists.txt
@@ -40,7 +40,6 @@ target_sources(core-tx-columnshard PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/blob_manager.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/blob_manager_db.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/blob_manager_txs.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/columnshard__costs.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/columnshard__export.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/columnshard__forget.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/columnshard__init.cpp
@@ -50,6 +49,7 @@ target_sources(core-tx-columnshard PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/columnshard__propose_cancel.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/columnshard__propose_transaction.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/columnshard__read.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/columnshard__read_base.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/columnshard__read_blob_ranges.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/columnshard__scan.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/columnshard__write.cpp
@@ -57,6 +57,7 @@ target_sources(core-tx-columnshard PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/columnshard.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/columnshard_impl.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/columnshard_common.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/columnshard_costs.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/compaction_actor.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/eviction_actor.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/export_actor.cpp
diff --git a/ydb/core/tx/columnshard/columnshard.cpp b/ydb/core/tx/columnshard/columnshard.cpp
index f0f3d56e496..fe67c9360ac 100644
--- a/ydb/core/tx/columnshard/columnshard.cpp
+++ b/ydb/core/tx/columnshard/columnshard.cpp
@@ -1,5 +1,4 @@
#include "columnshard_impl.h"
-#include "columnshard_txs.h"
namespace NKikimr {
diff --git a/ydb/core/tx/columnshard/columnshard__init.cpp b/ydb/core/tx/columnshard/columnshard__init.cpp
index 89ac045f2b5..e06755ad671 100644
--- a/ydb/core/tx/columnshard/columnshard__init.cpp
+++ b/ydb/core/tx/columnshard/columnshard__init.cpp
@@ -1,6 +1,6 @@
#include "columnshard_impl.h"
#include "columnshard_ttl.h"
-#include "columnshard_txs.h"
+#include "columnshard_private_events.h"
#include "columnshard_schema.h"
#include "blob_manager_db.h"
@@ -12,6 +12,21 @@ using namespace NTabletFlatExecutor;
// TTxInit => SwitchToWork
+/// Load data from local database
+class TTxInit : public TTransactionBase<TColumnShard> {
+public:
+ TTxInit(TColumnShard* self)
+ : TBase(self)
+ {}
+
+ bool Execute(TTransactionContext& txc, const TActorContext& ctx) override;
+ void Complete(const TActorContext& ctx) override;
+ TTxType GetTxType() const override { return TXTYPE_INIT; }
+
+ void SetDefaults();
+ bool ReadEverything(TTransactionContext& txc, const TActorContext& ctx);
+};
+
void TTxInit::SetDefaults() {
Self->CurrentSchemeShardId = 0;
Self->LastSchemaSeqNo = { };
@@ -322,6 +337,18 @@ void TTxInit::Complete(const TActorContext& ctx) {
// TTxUpdateSchema => TTxInit
+/// Update local database on tablet start
+class TTxUpdateSchema : public TTransactionBase<TColumnShard> {
+public:
+ TTxUpdateSchema(TColumnShard* self)
+ : TBase(self)
+ {}
+
+ bool Execute(TTransactionContext& txc, const TActorContext& ctx) override;
+ void Complete(const TActorContext& ctx) override;
+ TTxType GetTxType() const override { return TXTYPE_UPDATE_SCHEMA; }
+};
+
bool TTxUpdateSchema::Execute(TTransactionContext& txc, const TActorContext&) {
Y_UNUSED(txc);
LOG_S_DEBUG("TTxUpdateSchema.Execute at tablet " << Self->TabletID());
@@ -335,6 +362,18 @@ void TTxUpdateSchema::Complete(const TActorContext& ctx) {
// TTxInitSchema => TTxUpdateSchema
+/// Create local database on tablet start if none
+class TTxInitSchema : public TTransactionBase<TColumnShard> {
+public:
+ TTxInitSchema(TColumnShard* self)
+ : TBase(self)
+ {}
+
+ bool Execute(TTransactionContext& txc, const TActorContext& ctx) override;
+ void Complete(const TActorContext& ctx) override;
+ TTxType GetTxType() const override { return TXTYPE_INIT_SCHEMA; }
+};
+
bool TTxInitSchema::Execute(TTransactionContext& txc, const TActorContext&) {
LOG_S_DEBUG("TxInitSchema.Execute at tablet " << Self->TabletID());
diff --git a/ydb/core/tx/columnshard/columnshard__plan_step.cpp b/ydb/core/tx/columnshard/columnshard__plan_step.cpp
index ed8f1ea1be7..d11687dc72e 100644
--- a/ydb/core/tx/columnshard/columnshard__plan_step.cpp
+++ b/ydb/core/tx/columnshard/columnshard__plan_step.cpp
@@ -1,5 +1,5 @@
#include "columnshard_impl.h"
-#include "columnshard_txs.h"
+#include "columnshard_private_events.h"
#include "columnshard_schema.h"
#include <util/string/vector.h>
diff --git a/ydb/core/tx/columnshard/columnshard__propose_transaction.cpp b/ydb/core/tx/columnshard/columnshard__propose_transaction.cpp
index 06813954659..7e9490dbad4 100644
--- a/ydb/core/tx/columnshard/columnshard__propose_transaction.cpp
+++ b/ydb/core/tx/columnshard/columnshard__propose_transaction.cpp
@@ -1,5 +1,5 @@
#include "columnshard_impl.h"
-#include "columnshard_txs.h"
+#include "columnshard_private_events.h"
#include "columnshard_schema.h"
namespace NKikimr::NColumnShard {
diff --git a/ydb/core/tx/columnshard/columnshard__read.cpp b/ydb/core/tx/columnshard/columnshard__read.cpp
index f6d08e7f32a..14700c5a6f2 100644
--- a/ydb/core/tx/columnshard/columnshard__read.cpp
+++ b/ydb/core/tx/columnshard/columnshard__read.cpp
@@ -1,10 +1,9 @@
-#include "columnshard_impl.h"
-#include "columnshard_txs.h"
-#include "columnshard_schema.h"
-#include "columnshard__index_scan.h"
-#include <ydb/core/tx/columnshard/engines/column_engine.h>
+#include <ydb/core/tx/columnshard/columnshard_impl.h>
+#include <ydb/core/tx/columnshard/columnshard_private_events.h>
+#include <ydb/core/tx/columnshard/columnshard_schema.h>
+#include <ydb/core/tx/columnshard/columnshard__read_base.h>
+#include <ydb/core/tx/columnshard/columnshard__index_scan.h>
#include <ydb/core/tx/columnshard/engines/indexed_read_data.h>
-#include <ydb/core/formats/ssa_program_optimizer.h>
namespace NKikimr::NColumnShard {
@@ -37,201 +36,6 @@ private:
};
-std::shared_ptr<NOlap::TReadMetadata>
-TTxReadBase::PrepareReadMetadata(const TActorContext& ctx, const TReadDescription& read,
- const std::unique_ptr<NOlap::TInsertTable>& insertTable,
- const std::unique_ptr<NOlap::IColumnEngine>& index,
- const TBatchCache& batchCache,
- TString& error) const {
- Y_UNUSED(ctx);
-
- if (!insertTable || !index) {
- return {};
- }
-
- if (read.PlanStep < Self->GetMinReadStep()) {
- error = Sprintf("Snapshot %" PRIu64 ":%" PRIu64 " too old", read.PlanStep, read.TxId);
- return {};
- }
-
- const NOlap::TIndexInfo& indexInfo = index->GetIndexInfo();
- auto spOut = std::make_shared<NOlap::TReadMetadata>(indexInfo);
- auto& out = *spOut;
-
- out.PlanStep = read.PlanStep;
- out.TxId = read.TxId;
-
- // schemas
-
- out.BlobSchema = indexInfo.ArrowSchema();
- if (read.ColumnIds.size()) {
- out.ResultSchema = indexInfo.ArrowSchema(read.ColumnIds);
- } else if (read.ColumnNames.size()) {
- out.ResultSchema = indexInfo.ArrowSchema(read.ColumnNames);
- } else {
- error = "Empty column list requested";
- return {};
- }
-
- if (!out.BlobSchema) {
- error = "Could not get BlobSchema.";
- return {};
- }
-
- if (!out.ResultSchema) {
- error = "Could not get ResultSchema.";
- return {};
- }
-
- // insert table
-
- out.CommittedBlobs = insertTable->Read(read.PathId, read.PlanStep, read.TxId);
- for (auto& cmt : out.CommittedBlobs) {
- if (auto batch = batchCache.Get(cmt.BlobId)) {
- out.CommittedBatches.emplace(cmt.BlobId, batch);
- }
- }
-
- // index
-
- /// @note We could have column name changes between schema versions:
- /// Add '1:foo', Drop '1:foo', Add '2:foo'. Drop should hide '1:foo' from reads.
- /// It's expected that we have only one version on 'foo' in blob and could split them by schema {planStep:txId}.
- /// So '1:foo' would be omitted in blob records for the column in new snapshots. And '2:foo' - in old ones.
- /// It's not possible for blobs with several columns. There should be a special logic for them.
- TVector<TString> columns = read.ColumnNames;
- if (!read.ColumnIds.empty()) {
- columns = indexInfo.GetColumnNames(read.ColumnIds);
- }
- Y_VERIFY(!columns.empty(), "Empty column list");
-
- { // Add more columns: snapshot, replace, predicate
- // Key columns (replace, sort)
- THashSet<TString> requiredColumns = indexInfo.GetRequiredColumns();
-
- // Snapshot columns
- requiredColumns.insert(NOlap::TIndexInfo::SPEC_COL_PLAN_STEP);
- requiredColumns.insert(NOlap::TIndexInfo::SPEC_COL_TX_ID);
-
- // Predicate columns
- if (read.LessPredicate) {
- for (auto& col : read.LessPredicate->ColumnNames()) {
- requiredColumns.insert(col);
- }
- }
- if (read.GreaterPredicate) {
- for (auto& col : read.GreaterPredicate->ColumnNames()) {
- requiredColumns.insert(col);
- }
- }
-
- for (auto& col : columns) {
- requiredColumns.erase(col);
- }
-
- for (auto& reqCol : requiredColumns) {
- columns.push_back(reqCol);
- }
- }
-
- out.LoadSchema = indexInfo.AddColumns(out.ResultSchema, columns);
- if (!out.LoadSchema) {
- return {};
- }
-
- if (read.LessPredicate) {
- if (!read.LessPredicate->Good() ||
- !read.LessPredicate->IsTo()) {
- return {};
- }
- out.LessPredicate = read.LessPredicate;
- }
- if (read.GreaterPredicate) {
- if (!read.GreaterPredicate->Good() ||
- !read.GreaterPredicate->IsFrom()) {
- return {};
- }
- out.GreaterPredicate = read.GreaterPredicate;
- }
-
- THashSet<ui32> columnIds;
- for (auto& field : out.LoadSchema->fields()) {
- TString column(field->name().data(), field->name().size());
- columnIds.insert(indexInfo.GetColumnId(column));
- }
-
- out.Program = std::move(read.Program);
- for (auto& [id, name] : read.ProgramSourceColumns) {
- columnIds.insert(id);
- }
-
- if (read.ReadNothing) {
- out.SelectInfo = std::make_shared<NOlap::TSelectInfo>();
- } else {
- out.SelectInfo = index->Select(read.PathId, {read.PlanStep, read.TxId}, columnIds,
- out.GreaterPredicate, out.LessPredicate);
- }
- return spOut;
-}
-
-bool TTxReadBase::ParseProgram(const TActorContext& ctx, NKikimrSchemeOp::EOlapProgramType programType,
- TString serializedProgram, TReadDescription& read, const IColumnResolver& columnResolver)
-{
- if (serializedProgram.empty()) {
- return true;
- }
-
- NKikimrSSA::TProgram program;
- NKikimrSSA::TOlapProgram olapProgram;
-
- switch (programType) {
- case NKikimrSchemeOp::EOlapProgramType::OLAP_PROGRAM_SSA_PROGRAM_WITH_PARAMETERS:
- if (!olapProgram.ParseFromString(serializedProgram)) {
- ErrorDescription = TStringBuilder() << "Can't parse TOlapProgram at " << Self->TabletID();
- return false;
- }
-
- if (!program.ParseFromString(olapProgram.GetProgram())) {
- ErrorDescription = TStringBuilder() << "Can't parse TProgram at " << Self->TabletID();
- return false;
- }
-
- break;
- default:
- ErrorDescription = TStringBuilder() << "Unsupported olap program version: " << (ui32)programType;
- return false;
- }
-
- if (ctx.LoggerSettings() &&
- ctx.LoggerSettings()->Satisfies(NActors::NLog::PRI_DEBUG, NKikimrServices::TX_COLUMNSHARD))
- {
- TString out;
- ::google::protobuf::TextFormat::PrintToString(program, &out);
- LOG_S_DEBUG("Process program: " << Endl << out);
- }
-
- if (olapProgram.HasParameters()) {
- Y_VERIFY(olapProgram.HasParametersSchema(), "Parameters are present, but there is no schema.");
-
- auto schema = NArrow::DeserializeSchema(olapProgram.GetParametersSchema());
- read.ProgramParameters = NArrow::DeserializeBatch(olapProgram.GetParameters(), schema);
- }
-
-
- auto ssaProgramSteps = read.AddProgram(columnResolver, program);
- if (!ssaProgramSteps) {
- ErrorDescription = TStringBuilder() << "Wrong olap program";
- return false;
- }
- if (!ssaProgramSteps->Program.empty() && Self->PrimaryIndex) {
- ssaProgramSteps->Program = NKikimr::NSsaOptimizer::OptimizeProgram(ssaProgramSteps->Program, Self->PrimaryIndex->GetIndexInfo());
- }
-
- read.Program = ssaProgramSteps->Program;
- read.ProgramSourceColumns = ssaProgramSteps->ProgramSourceColumns;
- return true;
-}
-
bool TTxRead::Execute(TTransactionContext& txc, const TActorContext& ctx) {
Y_VERIFY(Ev);
Y_VERIFY(Self->PrimaryIndex);
diff --git a/ydb/core/tx/columnshard/columnshard__read_base.cpp b/ydb/core/tx/columnshard/columnshard__read_base.cpp
new file mode 100644
index 00000000000..4ab8d8acb26
--- /dev/null
+++ b/ydb/core/tx/columnshard/columnshard__read_base.cpp
@@ -0,0 +1,203 @@
+#include <ydb/core/tx/columnshard/columnshard_impl.h>
+#include <ydb/core/tx/columnshard/columnshard__read_base.h>
+#include <ydb/core/tx/columnshard/columnshard__index_scan.h>
+#include <ydb/core/formats/ssa_program_optimizer.h>
+
+namespace NKikimr::NColumnShard {
+
+std::shared_ptr<NOlap::TReadMetadata>
+TTxReadBase::PrepareReadMetadata(const TActorContext& ctx, const TReadDescription& read,
+ const std::unique_ptr<NOlap::TInsertTable>& insertTable,
+ const std::unique_ptr<NOlap::IColumnEngine>& index,
+ const TBatchCache& batchCache,
+ TString& error) const {
+ Y_UNUSED(ctx);
+
+ if (!insertTable || !index) {
+ return {};
+ }
+
+ if (read.PlanStep < Self->GetMinReadStep()) {
+ error = Sprintf("Snapshot %" PRIu64 ":%" PRIu64 " too old", read.PlanStep, read.TxId);
+ return {};
+ }
+
+ const NOlap::TIndexInfo& indexInfo = index->GetIndexInfo();
+ auto spOut = std::make_shared<NOlap::TReadMetadata>(indexInfo);
+ auto& out = *spOut;
+
+ out.PlanStep = read.PlanStep;
+ out.TxId = read.TxId;
+
+ // schemas
+
+ out.BlobSchema = indexInfo.ArrowSchema();
+ if (read.ColumnIds.size()) {
+ out.ResultSchema = indexInfo.ArrowSchema(read.ColumnIds);
+ } else if (read.ColumnNames.size()) {
+ out.ResultSchema = indexInfo.ArrowSchema(read.ColumnNames);
+ } else {
+ error = "Empty column list requested";
+ return {};
+ }
+
+ if (!out.BlobSchema) {
+ error = "Could not get BlobSchema.";
+ return {};
+ }
+
+ if (!out.ResultSchema) {
+ error = "Could not get ResultSchema.";
+ return {};
+ }
+
+ // insert table
+
+ out.CommittedBlobs = insertTable->Read(read.PathId, read.PlanStep, read.TxId);
+ for (auto& cmt : out.CommittedBlobs) {
+ if (auto batch = batchCache.Get(cmt.BlobId)) {
+ out.CommittedBatches.emplace(cmt.BlobId, batch);
+ }
+ }
+
+ // index
+
+ /// @note We could have column name changes between schema versions:
+ /// Add '1:foo', Drop '1:foo', Add '2:foo'. Drop should hide '1:foo' from reads.
+ /// It's expected that we have only one version on 'foo' in blob and could split them by schema {planStep:txId}.
+ /// So '1:foo' would be omitted in blob records for the column in new snapshots. And '2:foo' - in old ones.
+ /// It's not possible for blobs with several columns. There should be a special logic for them.
+ TVector<TString> columns = read.ColumnNames;
+ if (!read.ColumnIds.empty()) {
+ columns = indexInfo.GetColumnNames(read.ColumnIds);
+ }
+ Y_VERIFY(!columns.empty(), "Empty column list");
+
+ { // Add more columns: snapshot, replace, predicate
+ // Key columns (replace, sort)
+ THashSet<TString> requiredColumns = indexInfo.GetRequiredColumns();
+
+ // Snapshot columns
+ requiredColumns.insert(NOlap::TIndexInfo::SPEC_COL_PLAN_STEP);
+ requiredColumns.insert(NOlap::TIndexInfo::SPEC_COL_TX_ID);
+
+ // Predicate columns
+ if (read.LessPredicate) {
+ for (auto& col : read.LessPredicate->ColumnNames()) {
+ requiredColumns.insert(col);
+ }
+ }
+ if (read.GreaterPredicate) {
+ for (auto& col : read.GreaterPredicate->ColumnNames()) {
+ requiredColumns.insert(col);
+ }
+ }
+
+ for (auto& col : columns) {
+ requiredColumns.erase(col);
+ }
+
+ for (auto& reqCol : requiredColumns) {
+ columns.push_back(reqCol);
+ }
+ }
+
+ out.LoadSchema = indexInfo.AddColumns(out.ResultSchema, columns);
+ if (!out.LoadSchema) {
+ return {};
+ }
+
+ if (read.LessPredicate) {
+ if (!read.LessPredicate->Good() ||
+ !read.LessPredicate->IsTo()) {
+ return {};
+ }
+ out.LessPredicate = read.LessPredicate;
+ }
+ if (read.GreaterPredicate) {
+ if (!read.GreaterPredicate->Good() ||
+ !read.GreaterPredicate->IsFrom()) {
+ return {};
+ }
+ out.GreaterPredicate = read.GreaterPredicate;
+ }
+
+ THashSet<ui32> columnIds;
+ for (auto& field : out.LoadSchema->fields()) {
+ TString column(field->name().data(), field->name().size());
+ columnIds.insert(indexInfo.GetColumnId(column));
+ }
+
+ out.Program = std::move(read.Program);
+ for (auto& [id, name] : read.ProgramSourceColumns) {
+ columnIds.insert(id);
+ }
+
+ if (read.ReadNothing) {
+ out.SelectInfo = std::make_shared<NOlap::TSelectInfo>();
+ } else {
+ out.SelectInfo = index->Select(read.PathId, {read.PlanStep, read.TxId}, columnIds,
+ out.GreaterPredicate, out.LessPredicate);
+ }
+ return spOut;
+}
+
+bool TTxReadBase::ParseProgram(const TActorContext& ctx, NKikimrSchemeOp::EOlapProgramType programType,
+ TString serializedProgram, TReadDescription& read, const IColumnResolver& columnResolver)
+{
+ if (serializedProgram.empty()) {
+ return true;
+ }
+
+ NKikimrSSA::TProgram program;
+ NKikimrSSA::TOlapProgram olapProgram;
+
+ switch (programType) {
+ case NKikimrSchemeOp::EOlapProgramType::OLAP_PROGRAM_SSA_PROGRAM_WITH_PARAMETERS:
+ if (!olapProgram.ParseFromString(serializedProgram)) {
+ ErrorDescription = TStringBuilder() << "Can't parse TOlapProgram at " << Self->TabletID();
+ return false;
+ }
+
+ if (!program.ParseFromString(olapProgram.GetProgram())) {
+ ErrorDescription = TStringBuilder() << "Can't parse TProgram at " << Self->TabletID();
+ return false;
+ }
+
+ break;
+ default:
+ ErrorDescription = TStringBuilder() << "Unsupported olap program version: " << (ui32)programType;
+ return false;
+ }
+
+ if (ctx.LoggerSettings() &&
+ ctx.LoggerSettings()->Satisfies(NActors::NLog::PRI_DEBUG, NKikimrServices::TX_COLUMNSHARD))
+ {
+ TString out;
+ ::google::protobuf::TextFormat::PrintToString(program, &out);
+ LOG_S_DEBUG("Process program: " << Endl << out);
+ }
+
+ if (olapProgram.HasParameters()) {
+ Y_VERIFY(olapProgram.HasParametersSchema(), "Parameters are present, but there is no schema.");
+
+ auto schema = NArrow::DeserializeSchema(olapProgram.GetParametersSchema());
+ read.ProgramParameters = NArrow::DeserializeBatch(olapProgram.GetParameters(), schema);
+ }
+
+
+ auto ssaProgramSteps = read.AddProgram(columnResolver, program);
+ if (!ssaProgramSteps) {
+ ErrorDescription = TStringBuilder() << "Wrong olap program";
+ return false;
+ }
+ if (!ssaProgramSteps->Program.empty() && Self->PrimaryIndex) {
+ ssaProgramSteps->Program = NKikimr::NSsaOptimizer::OptimizeProgram(ssaProgramSteps->Program, Self->PrimaryIndex->GetIndexInfo());
+ }
+
+ read.Program = ssaProgramSteps->Program;
+ read.ProgramSourceColumns = ssaProgramSteps->ProgramSourceColumns;
+ return true;
+}
+
+}
diff --git a/ydb/core/tx/columnshard/columnshard__read_base.h b/ydb/core/tx/columnshard/columnshard__read_base.h
new file mode 100644
index 00000000000..2962eba3597
--- /dev/null
+++ b/ydb/core/tx/columnshard/columnshard__read_base.h
@@ -0,0 +1,34 @@
+#pragma once
+#include <ydb/core/tx/columnshard/columnshard_impl.h>
+
+namespace NKikimr::NColumnShard {
+
+/// Read portion of data in OLAP transaction
+class TTxReadBase : public TTransactionBase<TColumnShard> {
+protected:
+ explicit TTxReadBase(TColumnShard* self)
+ : TBase(self)
+ {}
+
+ std::shared_ptr<NOlap::TReadMetadata> PrepareReadMetadata(
+ const TActorContext& ctx,
+ const TReadDescription& readDescription,
+ const std::unique_ptr<NOlap::TInsertTable>& insertTable,
+ const std::unique_ptr<NOlap::IColumnEngine>& index,
+ const TBatchCache& batchCache,
+ TString& error) const;
+
+protected:
+ bool ParseProgram(
+ const TActorContext& ctx,
+ NKikimrSchemeOp::EOlapProgramType programType,
+ TString serializedProgram,
+ TReadDescription& read,
+ const IColumnResolver& columnResolver
+ );
+
+protected:
+ TString ErrorDescription;
+};
+
+}
diff --git a/ydb/core/tx/columnshard/columnshard__read_blob_ranges.cpp b/ydb/core/tx/columnshard/columnshard__read_blob_ranges.cpp
index 1dd732b3d10..5f9e0615457 100644
--- a/ydb/core/tx/columnshard/columnshard__read_blob_ranges.cpp
+++ b/ydb/core/tx/columnshard/columnshard__read_blob_ranges.cpp
@@ -1,5 +1,5 @@
#include "columnshard_impl.h"
-#include "columnshard_txs.h"
+#include "columnshard_private_events.h"
#include "columnshard_schema.h"
#include "blob_manager_db.h"
diff --git a/ydb/core/tx/columnshard/columnshard__scan.cpp b/ydb/core/tx/columnshard/columnshard__scan.cpp
index 41245654f76..74081ab4717 100644
--- a/ydb/core/tx/columnshard/columnshard__scan.cpp
+++ b/ydb/core/tx/columnshard/columnshard__scan.cpp
@@ -1,10 +1,10 @@
-#include "columnshard__scan.h"
-#include "columnshard__index_scan.h"
-#include "columnshard__stats_scan.h"
-
+#include <ydb/core/tx/columnshard/columnshard__scan.h>
+#include <ydb/core/tx/columnshard/columnshard__index_scan.h>
+#include <ydb/core/tx/columnshard/columnshard__stats_scan.h>
+#include <ydb/core/tx/columnshard/columnshard__read_base.h>
#include <ydb/core/tx/columnshard/blob_cache.h>
#include <ydb/core/tx/columnshard/columnshard_impl.h>
-#include <ydb/core/tx/columnshard/columnshard_txs.h>
+#include <ydb/core/tx/columnshard/columnshard_private_events.h>
#include <ydb/core/tablet_flat/flat_row_celled.h>
#include <ydb/library/yql/dq/actors/compute/dq_compute_actor.h>
#include <ydb/core/kqp/kqp_compute.h>
diff --git a/ydb/core/tx/columnshard/columnshard__write_index.cpp b/ydb/core/tx/columnshard/columnshard__write_index.cpp
index 14b073c7898..110148fafc4 100644
--- a/ydb/core/tx/columnshard/columnshard__write_index.cpp
+++ b/ydb/core/tx/columnshard/columnshard__write_index.cpp
@@ -1,5 +1,5 @@
#include "columnshard_impl.h"
-#include "columnshard_txs.h"
+#include "columnshard_private_events.h"
#include "columnshard_schema.h"
#include "blob_manager_db.h"
#include "blob_cache.h"
diff --git a/ydb/core/tx/columnshard/columnshard__costs.cpp b/ydb/core/tx/columnshard/columnshard_costs.cpp
index f258d669905..0b2f1742ab5 100644
--- a/ydb/core/tx/columnshard/columnshard__costs.cpp
+++ b/ydb/core/tx/columnshard/columnshard_costs.cpp
@@ -1,4 +1,4 @@
-#include "columnshard__costs.h"
+#include <ydb/core/tx/columnshard/columnshard_costs.h>
#include <ydb/core/tx/columnshard/engines/index_info.h>
#include <ydb/core/tx/columnshard/engines/granules_table.h>
#include <ydb/core/formats/arrow_helpers.h>
diff --git a/ydb/core/tx/columnshard/columnshard__costs.h b/ydb/core/tx/columnshard/columnshard_costs.h
index b7cb20ebe5d..b7cb20ebe5d 100644
--- a/ydb/core/tx/columnshard/columnshard__costs.h
+++ b/ydb/core/tx/columnshard/columnshard_costs.h
diff --git a/ydb/core/tx/columnshard/columnshard_impl.h b/ydb/core/tx/columnshard/columnshard_impl.h
index 3f1f6c33aa7..1b3048f357e 100644
--- a/ydb/core/tx/columnshard/columnshard_impl.h
+++ b/ydb/core/tx/columnshard/columnshard_impl.h
@@ -3,7 +3,7 @@
#include "columnshard.h"
#include "columnshard_common.h"
#include "columnshard_ttl.h"
-#include "columnshard_txs.h"
+#include "columnshard_private_events.h"
#include "blob_manager.h"
#include "inflight_request_tracker.h"
@@ -12,6 +12,9 @@
#include <ydb/core/tablet/tablet_counters.h>
#include <ydb/core/tablet/tablet_pipe_client_cache.h>
#include <ydb/core/tablet_flat/flat_cxx_database.h>
+#include <ydb/core/tablet_flat/tablet_flat_executed.h>
+#include <ydb/core/tablet_flat/tablet_flat_executor.h>
+#include <ydb/core/tx/tx_processing.h>
namespace NKikimr::NColumnShard {
@@ -68,6 +71,10 @@ struct TSettings {
}
};
+using ITransaction = NTabletFlatExecutor::ITransaction;
+
+template <typename T>
+using TTransactionBase = NTabletFlatExecutor::TTransactionBase<T>;
class TColumnShard
: public TActor<TColumnShard>
diff --git a/ydb/core/tx/columnshard/columnshard_txs.h b/ydb/core/tx/columnshard/columnshard_private_events.h
index 1ffc693fbd2..bfbe1c429a7 100644
--- a/ydb/core/tx/columnshard/columnshard_txs.h
+++ b/ydb/core/tx/columnshard/columnshard_private_events.h
@@ -2,13 +2,7 @@
#include "blob_manager.h"
-#include <ydb/core/tx/columnshard/engines/column_engine.h>
-#include <ydb/core/tx/columnshard/engines/indexed_read_data.h>
-
#include <ydb/core/protos/counters_columnshard.pb.h>
-#include <ydb/core/tablet_flat/tablet_flat_executed.h>
-#include <ydb/core/tablet_flat/tablet_flat_executor.h>
-#include <ydb/core/tx/tx_processing.h>
namespace NKikimr::NColumnShard {
@@ -211,78 +205,4 @@ struct TEvPrivate {
};
};
-using ITransaction = NTabletFlatExecutor::ITransaction;
-
-template <typename T>
-using TTransactionBase = NTabletFlatExecutor::TTransactionBase<T>;
-
-class TColumnShard;
-
-/// Load data from local database
-class TTxInit : public TTransactionBase<TColumnShard> {
-public:
- TTxInit(TColumnShard* self)
- : TBase(self)
- {}
-
- bool Execute(TTransactionContext& txc, const TActorContext& ctx) override;
- void Complete(const TActorContext& ctx) override;
- TTxType GetTxType() const override { return TXTYPE_INIT; }
-
- void SetDefaults();
- bool ReadEverything(TTransactionContext& txc, const TActorContext& ctx);
-};
-
-/// Create local database on tablet start if none
-class TTxInitSchema : public TTransactionBase<TColumnShard> {
-public:
- TTxInitSchema(TColumnShard* self)
- : TBase(self)
- {}
-
- bool Execute(TTransactionContext& txc, const TActorContext& ctx) override;
- void Complete(const TActorContext& ctx) override;
- TTxType GetTxType() const override { return TXTYPE_INIT_SCHEMA; }
-};
-
-/// Update local database on tablet start
-class TTxUpdateSchema : public TTransactionBase<TColumnShard> {
-public:
- TTxUpdateSchema(TColumnShard* self)
- : TBase(self)
- {}
-
- bool Execute(TTransactionContext& txc, const TActorContext& ctx) override;
- void Complete(const TActorContext& ctx) override;
- TTxType GetTxType() const override { return TXTYPE_UPDATE_SCHEMA; }
-};
-
-/// Read portion of data in OLAP transaction
-class TTxReadBase : public TTransactionBase<TColumnShard> {
-protected:
- explicit TTxReadBase(TColumnShard* self)
- : TBase(self)
- {}
-
- std::shared_ptr<NOlap::TReadMetadata> PrepareReadMetadata(
- const TActorContext& ctx,
- const TReadDescription& readDescription,
- const std::unique_ptr<NOlap::TInsertTable>& insertTable,
- const std::unique_ptr<NOlap::IColumnEngine>& index,
- const TBatchCache& batchCache,
- TString& error) const;
-
-protected:
- bool ParseProgram(
- const TActorContext& ctx,
- NKikimrSchemeOp::EOlapProgramType programType,
- TString serializedProgram,
- TReadDescription& read,
- const IColumnResolver& columnResolver
- );
-
-protected:
- TString ErrorDescription;
-};
-
}