diff options
author | Nikita Vasilev <ns-vasilev@ydb.tech> | 2024-01-30 13:07:20 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-01-30 13:07:20 +0300 |
commit | 97103370d3942b7301dfd3d042df52dcf0bc15b5 (patch) | |
tree | 79b56959ee1c14838ebf7aed631c85c05f7a6e19 | |
parent | 1975739200613f4a943b6249c31e3ac9b777f20f (diff) | |
download | ydb-97103370d3942b7301dfd3d042df52dcf0bc15b5.tar.gz |
Fixes for replace select (#1333)
-rw-r--r-- | ydb/core/base/appdata_fwd.h | 1 | ||||
-rw-r--r-- | ydb/core/formats/arrow/arrow_batch_builder.cpp | 10 | ||||
-rw-r--r-- | ydb/core/formats/arrow/arrow_batch_builder.h | 1 | ||||
-rw-r--r-- | ydb/core/kqp/compile_service/kqp_compile_service.cpp | 2 | ||||
-rw-r--r-- | ydb/core/kqp/executer_actor/kqp_data_executer.cpp | 28 | ||||
-rw-r--r-- | ydb/core/kqp/executer_actor/kqp_executer.h | 3 | ||||
-rw-r--r-- | ydb/core/kqp/executer_actor/kqp_executer_impl.cpp | 9 | ||||
-rw-r--r-- | ydb/core/kqp/executer_actor/kqp_executer_impl.h | 3 | ||||
-rw-r--r-- | ydb/core/kqp/runtime/kqp_write_actor.cpp | 162 | ||||
-rw-r--r-- | ydb/core/kqp/session_actor/kqp_session_actor.cpp | 3 | ||||
-rw-r--r-- | ydb/core/kqp/ut/olap/kqp_olap_ut.cpp | 182 | ||||
-rw-r--r-- | ydb/core/protos/table_service_config.proto | 1 |
12 files changed, 308 insertions, 97 deletions
diff --git a/ydb/core/base/appdata_fwd.h b/ydb/core/base/appdata_fwd.h index 329183c5af..33422b430b 100644 --- a/ydb/core/base/appdata_fwd.h +++ b/ydb/core/base/appdata_fwd.h @@ -233,6 +233,7 @@ struct TAppData { bool EnableMvccSnapshotWithLegacyDomainRoot = false; bool UsePartitionStatsCollectorForTests = false; bool DisableCdcAutoSwitchingToReadyStateForTests = false; + bool EnableOlapSink = false; TVector<TString> AdministrationAllowedSIDs; // users/groups which allowed to perform administrative tasks TVector<TString> DefaultUserSIDs; TString AllAuthenticatedUsers = "all-users@well-known"; diff --git a/ydb/core/formats/arrow/arrow_batch_builder.cpp b/ydb/core/formats/arrow/arrow_batch_builder.cpp index 3e3fd04865..76c1942155 100644 --- a/ydb/core/formats/arrow/arrow_batch_builder.cpp +++ b/ydb/core/formats/arrow/arrow_batch_builder.cpp @@ -241,6 +241,16 @@ void TArrowBatchBuilder::AddRow(const TConstArrayRef<TCell>& key, const TConstAr } } +void TArrowBatchBuilder::AddRow(const TConstArrayRef<TCell>& row) { + ++NumRows; + + size_t offset = 0; + for (size_t i = 0; i < row.size(); ++i, ++offset) { + auto& cell = row[i]; + AppendCell(cell, offset); + } +} + void TArrowBatchBuilder::ReserveData(ui32 columnNo, size_t size) { if (!BatchBuilder || columnNo >= (ui32)BatchBuilder->num_fields()) { return; diff --git a/ydb/core/formats/arrow/arrow_batch_builder.h b/ydb/core/formats/arrow/arrow_batch_builder.h index 38c30d1cc0..2d4e1f3ed6 100644 --- a/ydb/core/formats/arrow/arrow_batch_builder.h +++ b/ydb/core/formats/arrow/arrow_batch_builder.h @@ -161,6 +161,7 @@ public: void AddRow(const NKikimr::TDbTupleRef& key, const NKikimr::TDbTupleRef& value) override; void AddRow(const TConstArrayRef<TCell>& key, const TConstArrayRef<TCell>& value); + void AddRow(const TConstArrayRef<TCell>& row); // You have to call it before Start() void Reserve(size_t numRows) { diff --git a/ydb/core/kqp/compile_service/kqp_compile_service.cpp b/ydb/core/kqp/compile_service/kqp_compile_service.cpp index 34b3074d9b..b0da00f2e8 100644 --- a/ydb/core/kqp/compile_service/kqp_compile_service.cpp +++ b/ydb/core/kqp/compile_service/kqp_compile_service.cpp @@ -477,6 +477,7 @@ private: bool enableSequences = TableServiceConfig.GetEnableSequences(); bool enableColumnsWithDefault = TableServiceConfig.GetEnableColumnsWithDefault(); + bool enableOlapSink = TableServiceConfig.GetEnableOlapSink(); auto mkqlHeavyLimit = TableServiceConfig.GetResourceManager().GetMkqlHeavyProgramMemoryLimit(); @@ -501,6 +502,7 @@ private: TableServiceConfig.GetIndexAutoChooseMode() != indexAutoChooser || TableServiceConfig.GetEnableSequences() != enableSequences || TableServiceConfig.GetEnableColumnsWithDefault() != enableColumnsWithDefault || + TableServiceConfig.GetEnableOlapSink() != enableOlapSink || TableServiceConfig.GetExtractPredicateRangesLimit() != rangesLimit || TableServiceConfig.GetResourceManager().GetMkqlHeavyProgramMemoryLimit() != mkqlHeavyLimit) { diff --git a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp index ea44de20d6..32ca975492 100644 --- a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp +++ b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp @@ -127,12 +127,14 @@ public: NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory, const NKikimrConfig::TTableServiceConfig::EChannelTransportVersion chanTransportVersion, const NKikimrConfig::TTableServiceConfig::TAggregationConfig& aggregation, - const TActorId& creator, TDuration maximalSecretsSnapshotWaitTime, const TIntrusivePtr<TUserRequestContext>& userRequestContext) + const TActorId& creator, TDuration maximalSecretsSnapshotWaitTime, const TIntrusivePtr<TUserRequestContext>& userRequestContext, + const bool enableOlapSink) : TBase(std::move(request), database, userToken, counters, executerRetriesConfig, chanTransportVersion, aggregation, maximalSecretsSnapshotWaitTime, userRequestContext, TWilsonKqp::DataExecuter, "DataExecuter" ) , AsyncIoFactory(std::move(asyncIoFactory)) , StreamResult(streamResult) + , EnableOlapSink(enableOlapSink) { Target = creator; @@ -1661,11 +1663,21 @@ private: } for (const auto &tableOp : stage.GetTableOps()) { - if (tableOp.GetTypeCase() != NKqpProto::TKqpPhyTableOperation::kReadOlapRange - && tableOp.GetTypeCase() != NKqpProto::TKqpPhyTableOperation::kUpsertRows) { + if (tableOp.GetTypeCase() != NKqpProto::TKqpPhyTableOperation::kReadOlapRange) { return true; } } + + return false; + } + + bool HasOlapSink(const NKqpProto::TKqpPhyStage& stage) { + for (const auto& sink : stage.GetSinks()) { + if (sink.GetTypeCase() == NKqpProto::TKqpSink::kInternalSink) { + return true; + } + } + return false; } @@ -1698,7 +1710,8 @@ private: } } - if (stageInfo.Meta.IsOlap() && HasDmlOperationOnOlap(tx.Body->GetType(), stage)) { + if ((stageInfo.Meta.IsOlap() && HasDmlOperationOnOlap(tx.Body->GetType(), stage)) + || (EnableOlapSink && HasOlapSink(stage))) { auto error = TStringBuilder() << "Data manipulation queries do not support column shard tables."; LOG_E(error); ReplyErrorAndDie(Ydb::StatusIds::PRECONDITION_FAILED, @@ -2201,7 +2214,7 @@ private: } } - const bool singlePartitionOptAllowed = !UnknownAffectedShardCount && !HasExternalSources && (DatashardTxs.size() == 0); + const bool singlePartitionOptAllowed = !HasOlapTable && !UnknownAffectedShardCount && !HasExternalSources && (DatashardTxs.size() == 0); const bool useDataQueryPool = !(HasExternalSources && DatashardTxs.size() == 0); const bool localComputeTasks = !((HasExternalSources || HasOlapTable || HasDatashardSourceScan) && DatashardTxs.size() == 0); @@ -2405,6 +2418,7 @@ private: private: NYql::NDq::IDqAsyncIoFactory::TPtr AsyncIoFactory; bool StreamResult = false; + bool EnableOlapSink = false; bool HasExternalSources = false; bool SecretSnapshotRequired = false; @@ -2446,10 +2460,10 @@ IActor* CreateKqpDataExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TKqpRequestCounters::TPtr counters, bool streamResult, const NKikimrConfig::TTableServiceConfig::TAggregationConfig& aggregation, const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& executerRetriesConfig, NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory, const NKikimrConfig::TTableServiceConfig::EChannelTransportVersion chanTransportVersion, const TActorId& creator, - TDuration maximalSecretsSnapshotWaitTime, const TIntrusivePtr<TUserRequestContext>& userRequestContext) + TDuration maximalSecretsSnapshotWaitTime, const TIntrusivePtr<TUserRequestContext>& userRequestContext, const bool enableOlapSink) { return new TKqpDataExecuter(std::move(request), database, userToken, counters, streamResult, executerRetriesConfig, - std::move(asyncIoFactory), chanTransportVersion, aggregation, creator, maximalSecretsSnapshotWaitTime, userRequestContext); + std::move(asyncIoFactory), chanTransportVersion, aggregation, creator, maximalSecretsSnapshotWaitTime, userRequestContext, enableOlapSink); } } // namespace NKqp diff --git a/ydb/core/kqp/executer_actor/kqp_executer.h b/ydb/core/kqp/executer_actor/kqp_executer.h index 6d4cef595b..739f16d040 100644 --- a/ydb/core/kqp/executer_actor/kqp_executer.h +++ b/ydb/core/kqp/executer_actor/kqp_executer.h @@ -91,7 +91,8 @@ IActor* CreateKqpExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TSt const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& executerRetriesConfig, NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory, TPreparedQueryHolder::TConstPtr preparedQuery, const NKikimrConfig::TTableServiceConfig::EChannelTransportVersion chanTransportVersion, const TActorId& creator, - TDuration maximalSecretsSnapshotWaitTime, const TIntrusivePtr<TUserRequestContext>& userRequestContext); + TDuration maximalSecretsSnapshotWaitTime, const TIntrusivePtr<TUserRequestContext>& userRequestContext, + const bool enableOlapSink); IActor* CreateKqpSchemeExecuter(TKqpPhyTxHolder::TConstPtr phyTx, NKikimrKqp::EQueryType queryType, const TActorId& target, const TMaybe<TString>& requestType, const TString& database, diff --git a/ydb/core/kqp/executer_actor/kqp_executer_impl.cpp b/ydb/core/kqp/executer_actor/kqp_executer_impl.cpp index d6ad7a20a1..f43e2ad845 100644 --- a/ydb/core/kqp/executer_actor/kqp_executer_impl.cpp +++ b/ydb/core/kqp/executer_actor/kqp_executer_impl.cpp @@ -83,12 +83,13 @@ IActor* CreateKqpExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TSt const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& executerRetriesConfig, NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory, TPreparedQueryHolder::TConstPtr preparedQuery, const NKikimrConfig::TTableServiceConfig::EChannelTransportVersion chanTransportVersion, const TActorId& creator, - TDuration maximalSecretsSnapshotWaitTime, const TIntrusivePtr<TUserRequestContext>& userRequestContext) + TDuration maximalSecretsSnapshotWaitTime, const TIntrusivePtr<TUserRequestContext>& userRequestContext, + const bool enableOlapSink) { if (request.Transactions.empty()) { // commit-only or rollback-only data transaction YQL_ENSURE(request.LocksOp == ELocksOp::Commit || request.LocksOp == ELocksOp::Rollback); - return CreateKqpDataExecuter(std::move(request), database, userToken, counters, false, aggregation, executerRetriesConfig, std::move(asyncIoFactory), chanTransportVersion, creator, maximalSecretsSnapshotWaitTime, userRequestContext); + return CreateKqpDataExecuter(std::move(request), database, userToken, counters, false, aggregation, executerRetriesConfig, std::move(asyncIoFactory), chanTransportVersion, creator, maximalSecretsSnapshotWaitTime, userRequestContext, enableOlapSink); } TMaybe<NKqpProto::TKqpPhyTx::EType> txsType; @@ -104,13 +105,13 @@ IActor* CreateKqpExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TSt switch (*txsType) { case NKqpProto::TKqpPhyTx::TYPE_COMPUTE: case NKqpProto::TKqpPhyTx::TYPE_DATA: - return CreateKqpDataExecuter(std::move(request), database, userToken, counters, false, aggregation, executerRetriesConfig, std::move(asyncIoFactory), chanTransportVersion, creator, maximalSecretsSnapshotWaitTime, userRequestContext); + return CreateKqpDataExecuter(std::move(request), database, userToken, counters, false, aggregation, executerRetriesConfig, std::move(asyncIoFactory), chanTransportVersion, creator, maximalSecretsSnapshotWaitTime, userRequestContext, enableOlapSink); case NKqpProto::TKqpPhyTx::TYPE_SCAN: return CreateKqpScanExecuter(std::move(request), database, userToken, counters, aggregation, executerRetriesConfig, preparedQuery, chanTransportVersion, maximalSecretsSnapshotWaitTime, userRequestContext); case NKqpProto::TKqpPhyTx::TYPE_GENERIC: - return CreateKqpDataExecuter(std::move(request), database, userToken, counters, true, aggregation, executerRetriesConfig, std::move(asyncIoFactory), chanTransportVersion, creator, maximalSecretsSnapshotWaitTime, userRequestContext); + return CreateKqpDataExecuter(std::move(request), database, userToken, counters, true, aggregation, executerRetriesConfig, std::move(asyncIoFactory), chanTransportVersion, creator, maximalSecretsSnapshotWaitTime, userRequestContext, enableOlapSink); default: YQL_ENSURE(false, "Unsupported physical tx type: " << (ui32)*txsType); diff --git a/ydb/core/kqp/executer_actor/kqp_executer_impl.h b/ydb/core/kqp/executer_actor/kqp_executer_impl.h index 227120f6a4..cd767f1ce4 100644 --- a/ydb/core/kqp/executer_actor/kqp_executer_impl.h +++ b/ydb/core/kqp/executer_actor/kqp_executer_impl.h @@ -1769,7 +1769,8 @@ IActor* CreateKqpDataExecuter(IKqpGateway::TExecPhysicalRequest&& request, const const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& executerRetriesConfig, NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory, const NKikimrConfig::TTableServiceConfig::EChannelTransportVersion chanTransportVersion, const TActorId& creator, - TDuration maximalSecretsSnapshotWaitTime, const TIntrusivePtr<TUserRequestContext>& userRequestContext); + TDuration maximalSecretsSnapshotWaitTime, const TIntrusivePtr<TUserRequestContext>& userRequestContext, + const bool enableOlapSink); IActor* CreateKqpScanExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TString& database, const TIntrusiveConstPtr<NACLib::TUserToken>& userToken, TKqpRequestCounters::TPtr counters, diff --git a/ydb/core/kqp/runtime/kqp_write_actor.cpp b/ydb/core/kqp/runtime/kqp_write_actor.cpp index bb5260bbf9..c6217022ae 100644 --- a/ydb/core/kqp/runtime/kqp_write_actor.cpp +++ b/ydb/core/kqp/runtime/kqp_write_actor.cpp @@ -6,6 +6,7 @@ #include <ydb/core/engine/minikql/minikql_engine_host.h> #include <ydb/core/formats/arrow/arrow_batch_builder.h> #include <ydb/core/kqp/common/kqp_yql.h> +#include <ydb/core/scheme/scheme_types_proto.h> #include <ydb/core/tx/data_events/events.h> #include <ydb/core/tx/data_events/payload_helper.h> #include <ydb/core/tx/data_events/shards_splitter.h> @@ -19,7 +20,7 @@ namespace { struct TWriteActorBackoffSettings { - TDuration StartRetryDelay = TDuration::MilliSeconds(100); + TDuration StartRetryDelay = TDuration::MilliSeconds(150); TDuration MaxRetryDelay = TDuration::Seconds(5); double UnsertaintyRatio = 0.5; double Multiplier = 2.0; @@ -81,13 +82,13 @@ public: , Counters(counters) , TypeEnv(args.TypeEnv) , TxId(args.TxId) - , TableId(Settings.GetTable().GetOwnerId(), Settings.GetTable().GetTableId()) + , TableId( + Settings.GetTable().GetOwnerId(), + Settings.GetTable().GetTableId(), + Settings.GetTable().GetVersion()) { YQL_ENSURE(std::holds_alternative<ui64>(TxId)); EgressStats.Level = args.StatsLevel; - - BuildColumns(); - PrepareBatchBuilder(); } void Bootstrap() { @@ -114,7 +115,9 @@ private: } i64 GetFreeSpace() const final { - return MemoryLimit - (MemoryInFlight + BatchBuilder->Bytes()); + return SchemeEntry + ? MemoryLimit - (MemoryInFlight + BatchBuilder->Bytes()) + : std::numeric_limits<i64>::min(); // Can't use zero here because compute can use overcommit! } void SendData(NMiniKQL::TUnboxedValueBatch&& data, i64, const TMaybe<NYql::NDqProto::TCheckpoint>&, bool finished) final { @@ -128,18 +131,19 @@ private: } void AddToInputQueue(NMiniKQL::TUnboxedValueBatch&& data) { + YQL_ENSURE(SchemeEntry); + YQL_ENSURE(BatchBuilder); + TVector<TCell> cells(Columns.size()); data.ForEachRow([&](const auto& row) { for (size_t index = 0; index < Columns.size(); ++index) { - cells[index] = MakeCell( + cells[SendIndexToWriteIndexMapping[index]] = MakeCell( Columns[index].PType, row.GetElement(index), TypeEnv, /* copy */ false); } - BatchBuilder->AddRow( - TConstArrayRef<TCell>{cells.begin(), cells.begin() + KeyColumns.size()}, - TConstArrayRef<TCell>{cells.begin() + KeyColumns.size(), cells.end()}); + BatchBuilder->AddRow(TConstArrayRef<TCell>{cells.begin(), cells.end()}); }); } @@ -176,16 +180,21 @@ private: RuntimeError(TStringBuilder() << "Failed to get table: " << TableId << "'", NYql::NDqProto::StatusIds::SCHEME_ERROR); } - auto& resultSet = ev->Get()->Request->ResultSet; YQL_ENSURE(resultSet.size() == 1); SchemeEntry = resultSet[0]; YQL_ENSURE(SchemeEntry->Kind == NSchemeCache::TSchemeCacheNavigate::KindColumnTable); - CA_LOG_D("Resolved TableId=" << TableId); + CA_LOG_D("Resolved TableId=" << TableId << " (" << SchemeEntry->TableId.PathId.ToString() << " " << SchemeEntry->TableId.SchemaVersion << ")"); - ProcessRows(); + if (SchemeEntry->TableId.SchemaVersion != TableId.SchemaVersion) { + RuntimeError(TStringBuilder() << "Schema was updated.", NYql::NDqProto::StatusIds::SCHEME_ERROR); + } + + Prepare(); + + Callbacks->ResumeExecution(); } void Handle(NKikimr::NEvents::TDataEvents::TEvWriteResult::TPtr& ev) { @@ -224,9 +233,13 @@ private: } else if (ev->Get()->GetStatus() == NKikimrDataEvents::TEvWriteResult::STATUS_COMPLETED) { CA_LOG_D("Got completed result TxId=" << ev->Get()->Record.GetTxId() << ", TabletId=" << ev->Get()->Record.GetOrigin()); auto& batchesQueue = InFlightBatches.at(ev->Get()->Record.GetOrigin()); - YQL_ENSURE(!batchesQueue.empty()); - if (ev->Get()->Record.GetTxId() == batchesQueue.front().TxId) { + if (!batchesQueue.empty() + && (ev->Get()->Record.GetTxId() == batchesQueue.front().TxId + || std::find( + std::begin(batchesQueue.front().OldTxIds), + std::end(batchesQueue.front().OldTxIds), + ev->Get()->Record.GetTxId()) != std::end(batchesQueue.front().OldTxIds))) { const bool needToResume = (GetFreeSpace() <= 0); EgressStats.Bytes += batchesQueue.front().Data.size(); @@ -247,7 +260,7 @@ private: void ProcessRows() { SplitBatchByShards(); - SendNewBatchesToShards(); + SendBatchesToShards(); if (Finished && SchemeEntry && IsInFlightBatchesEmpty()) { Callbacks->OnAsyncOutputFinished(GetOutputIndex()); @@ -290,9 +303,9 @@ private: } } - void SendNewBatchesToShards() { + void SendBatchesToShards() { for (auto& [shardId, batches] : InFlightBatches) { - if (!batches.empty() && batches.front().SendAttempts == 0) { + if (!batches.empty() && batches.front().TxId == 0) { if (const auto txId = AllocateTxId(); txId) { batches.front().TxId = *txId; SendRequestShard(shardId); @@ -331,7 +344,7 @@ private: Settings.GetTable().GetTableId(), Settings.GetTable().GetVersion() + 1 // TODO: SchemeShard returns wrong version. }, - ColumnIds, + WriteColumnIds, payloadIndex, NKikimrDataEvents::FORMAT_ARROW); @@ -348,6 +361,19 @@ private: ++inFlightBatch.SendAttempts; } + void RetryShard(const ui64 shardId) { + if (!InFlightBatches.contains(shardId) || InFlightBatches.at(shardId).empty()) { + return; + } + CA_LOG_D("Retry ShardID=" << shardId); + auto& inFlightBatch = InFlightBatches.at(shardId).front(); + if (inFlightBatch.TxId != 0) { + inFlightBatch.OldTxIds.push_back(inFlightBatch.TxId); + inFlightBatch.TxId = 0; + } + RequestNewTxId(); + } + void Handle(TEvPrivate::TEvShardRequestTimeout::TPtr& ev) { if (!InFlightBatches.contains(ev->Get()->ShardId)) { return; @@ -359,7 +385,7 @@ private: return; } - SendRequestShard(ev->Get()->ShardId); + RetryShard(ev->Get()->ShardId); } void Handle(TEvPipeCache::TEvDeliveryProblem::TPtr& ev) { @@ -367,7 +393,7 @@ private: if (!InFlightBatches.contains(ev->Get()->TabletId)) { return; } - SendRequestShard(ev->Get()->TabletId); + RetryShard(ev->Get()->TabletId); } void Handle(TEvTxUserProxy::TEvAllocateTxIdResult::TPtr& ev) { @@ -383,8 +409,8 @@ private: if (FreeTxIds.empty()) { return std::nullopt; } - const ui64 result = FreeTxIds.back(); - FreeTxIds.pop_back(); + const ui64 result = FreeTxIds.front(); + FreeTxIds.pop_front(); return result; } @@ -401,52 +427,55 @@ private: } void PassAway() override { + Send(PipeCacheId, new TEvPipeCache::TEvUnlink(0)); TActorBootstrapped<TKqpWriteActor>::PassAway(); } - void BuildColumns() { - KeyColumns.reserve(Settings.KeyColumnsSize()); - i32 number = 0; - for (const auto& column : Settings.GetKeyColumns()) { - KeyColumns.emplace_back( - column.GetName(), - column.GetId(), - NScheme::TTypeInfo { - static_cast<NScheme::TTypeId>(column.GetTypeId()), - column.GetTypeId() == NScheme::NTypeIds::Pg - ? NPg::TypeDescFromPgTypeId(column.GetTypeInfo().GetPgTypeId()) - : nullptr - }, - column.GetTypeInfo().GetPgTypeMod(), - number++ - ); + void Prepare() { + YQL_ENSURE(SchemeEntry); + std::vector<std::pair<TString, NScheme::TTypeInfo>> batchBuilderColumns; + THashMap<ui32, ui32> writeColumnIdToIndex; + { + batchBuilderColumns.reserve(Settings.ColumnsSize()); + WriteColumnIds.reserve(Settings.ColumnsSize()); + if (!SchemeEntry->ColumnTableInfo) { + RuntimeError("Expected column table.", NYql::NDqProto::StatusIds::SCHEME_ERROR); + } + if (!SchemeEntry->ColumnTableInfo->Description.HasSchema()) { + RuntimeError("Unknown schema for column table.", NYql::NDqProto::StatusIds::SCHEME_ERROR); + } + i32 number = 0; + for (const auto& column : SchemeEntry->ColumnTableInfo->Description.GetSchema().GetColumns()) { + Y_ABORT_UNLESS(column.HasTypeId()); + auto typeInfoMod = NScheme::TypeInfoModFromProtoColumnType(column.GetTypeId(), + column.HasTypeInfo() ? &column.GetTypeInfo() : nullptr); + batchBuilderColumns.emplace_back(column.GetName(), typeInfoMod.TypeInfo); + WriteColumnIds.push_back(column.GetId()); + writeColumnIdToIndex[column.GetId()] = number++; + } } - ColumnIds.reserve(Settings.ColumnsSize()); - Columns.reserve(Settings.ColumnsSize()); - number = 0; - for (const auto& column : Settings.GetColumns()) { - ColumnIds.push_back(column.GetId()); - Columns.emplace_back( - column.GetName(), - column.GetId(), - NScheme::TTypeInfo { - static_cast<NScheme::TTypeId>(column.GetTypeId()), - column.GetTypeId() == NScheme::NTypeIds::Pg - ? NPg::TypeDescFromPgTypeId(column.GetTypeInfo().GetPgTypeId()) - : nullptr - }, - column.GetTypeInfo().GetPgTypeMod(), - number++ - ); + { + SendIndexToWriteIndexMapping.resize(Settings.ColumnsSize()); + Columns.reserve(Settings.ColumnsSize()); + i32 number = 0; + for (const auto& column : Settings.GetColumns()) { + SendIndexToWriteIndexMapping[number] = writeColumnIdToIndex.at(column.GetId()); + Columns.emplace_back( + column.GetName(), + column.GetId(), + NScheme::TTypeInfo { + static_cast<NScheme::TTypeId>(column.GetTypeId()), + column.GetTypeId() == NScheme::NTypeIds::Pg + ? NPg::TypeDescFromPgTypeId(column.GetTypeInfo().GetPgTypeId()) + : nullptr + }, + column.GetTypeInfo().GetPgTypeMod(), + number++ + ); + } } - } - void PrepareBatchBuilder() { - std::vector<std::pair<TString, NScheme::TTypeInfo>> columns; - for (const auto& column : Columns) { - columns.emplace_back(column.Name, column.PType); - } std::set<std::string> notNullColumns; for (const auto& column : Settings.GetColumns()) { if (column.GetNotNull()) { @@ -457,7 +486,7 @@ private: BatchBuilder = std::make_unique<NArrow::TArrowBatchBuilder>(arrow::Compression::UNCOMPRESSED, notNullColumns); TString err; - if (!BatchBuilder->Start(columns, 0, 0, err)) { + if (!BatchBuilder->Start(batchBuilderColumns, 0, 0, err)) { RuntimeError("Failed to start batch builder: " + err, NYql::NDqProto::StatusIds::PRECONDITION_FAILED); } } @@ -498,9 +527,9 @@ private: const NYql::NDq::TTxId TxId; const TTableId TableId; - TVector<TSysTables::TTableColumnInfo> KeyColumns; TVector<TSysTables::TTableColumnInfo> Columns; - std::vector<ui32> ColumnIds; + TVector<ui32> SendIndexToWriteIndexMapping; + std::vector<ui32> WriteColumnIds; std::optional<NSchemeCache::TSchemeCacheNavigate::TEntry> SchemeEntry; @@ -510,6 +539,7 @@ private: TString Data; ui32 SendAttempts = 0; ui64 TxId = 0; + TVector<ui64> OldTxIds; }; THashMap<ui64, std::deque<TInFlightBatch>> InFlightBatches; bool Finished = false; @@ -517,7 +547,7 @@ private: const i64 MemoryLimit = kInFlightMemoryLimitPerActor; i64 MemoryInFlight = 0; - std::vector<ui64> FreeTxIds; + std::deque<ui64> FreeTxIds; }; void RegisterKqpWriteActor(NYql::NDq::TDqAsyncIoFactory& factory, TIntrusivePtr<TKqpCounters> counters) { diff --git a/ydb/core/kqp/session_actor/kqp_session_actor.cpp b/ydb/core/kqp/session_actor/kqp_session_actor.cpp index 1ff33679fd..fe2a661c3c 100644 --- a/ydb/core/kqp/session_actor/kqp_session_actor.cpp +++ b/ydb/core/kqp/session_actor/kqp_session_actor.cpp @@ -1084,7 +1084,8 @@ public: QueryState ? QueryState->UserToken : TIntrusiveConstPtr<NACLib::TUserToken>(), RequestCounters, Settings.TableService.GetAggregationConfig(), Settings.TableService.GetExecuterRetriesConfig(), AsyncIoFactory, QueryState ? QueryState->PreparedQuery : nullptr, Settings.TableService.GetChannelTransportVersion(), SelfId(), 2 * TDuration::Seconds(MetadataProviderConfig.GetRefreshPeriodSeconds()), - QueryState ? QueryState->UserRequestContext : MakeIntrusive<TUserRequestContext>("", Settings.Database, SessionId)); + QueryState ? QueryState->UserRequestContext : MakeIntrusive<TUserRequestContext>("", Settings.Database, SessionId), + Settings.TableService.GetEnableOlapSink()); auto exId = RegisterWithSameMailbox(executerActor); LOG_D("Created new KQP executer: " << exId << " isRollback: " << isRollback); diff --git a/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp b/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp index c5de31b63b..9f9a4701b6 100644 --- a/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp +++ b/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp @@ -5507,13 +5507,14 @@ Y_UNIT_TEST_SUITE(KqpOlap) { auto settings = TKikimrSettings() .SetWithSampleTables(false); TKikimrRunner kikimr(settings); + kikimr.GetTestServer().GetRuntime()->GetAppData(0).EnableOlapSink = true; Tests::NCommon::TLoggerInit(kikimr).Initialize(); TTableWithNullsHelper(kikimr).CreateTableWithNulls(); auto session = kikimr.GetTableClient().CreateSession().GetValueSync().GetSession(); const TString query = R"( - CREATE TABLE `/Root/DataShard` ( + CREATE TABLE `/Root/DataShard1` ( Col1 Uint64 NOT NULL, Col2 String, Col3 Int32 NOT NULL, @@ -5521,7 +5522,7 @@ Y_UNIT_TEST_SUITE(KqpOlap) { ) WITH (UNIFORM_PARTITIONS = 2, AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = 2); - CREATE TABLE `/Root/ColumnShard` ( + CREATE TABLE `/Root/ColumnShard1` ( Col1 Uint64 NOT NULL, Col2 String, Col3 Int32 NOT NULL, @@ -5553,7 +5554,7 @@ Y_UNIT_TEST_SUITE(KqpOlap) { auto client = kikimr.GetQueryClient(); auto prepareResult = client.ExecuteQuery(R"( - REPLACE INTO `/Root/DataShard` (Col1, Col2, Col3) VALUES + REPLACE INTO `/Root/DataShard1` (Col1, Col2, Col3) VALUES (1u, "test1", 10), (2u, "test2", 11), (3u, "test3", 12), (4u, NULL, 13); )", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync(); UNIT_ASSERT_C(prepareResult.IsSuccess(), prepareResult.GetIssues().ToString()); @@ -5561,14 +5562,14 @@ Y_UNIT_TEST_SUITE(KqpOlap) { { // row -> column const TString sql = R"( - REPLACE INTO `/Root/ColumnShard` - SELECT * FROM `/Root/DataShard` + REPLACE INTO `/Root/ColumnShard1` + SELECT * FROM `/Root/DataShard1` )"; auto insertResult = client.ExecuteQuery(sql, NYdb::NQuery::TTxControl::BeginTx().CommitTx()).GetValueSync(); UNIT_ASSERT_C(insertResult.IsSuccess(), insertResult.GetIssues().ToString()); auto it = client.StreamExecuteQuery(R"( - SELECT * FROM `/Root/ColumnShard` ORDER BY Col1, Col2, Col3; + SELECT * FROM `/Root/ColumnShard1` ORDER BY Col1, Col2, Col3; )", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync(); UNIT_ASSERT_VALUES_EQUAL_C(it.GetStatus(), EStatus::SUCCESS, it.GetIssues().ToString()); TString output = StreamResultToYson(it); @@ -5578,14 +5579,14 @@ Y_UNIT_TEST_SUITE(KqpOlap) { { // Missing Nullable column const TString sql = R"( - REPLACE INTO `/Root/ColumnShard` - SELECT 10u + Col1 AS Col1, 100 + Col3 AS Col3 FROM `/Root/DataShard` + REPLACE INTO `/Root/ColumnShard1` + SELECT 10u + Col1 AS Col1, 100 + Col3 AS Col3 FROM `/Root/DataShard1` )"; auto insertResult = client.ExecuteQuery(sql, NYdb::NQuery::TTxControl::BeginTx().CommitTx()).GetValueSync(); UNIT_ASSERT_C(insertResult.IsSuccess(), insertResult.GetIssues().ToString()); auto it = client.StreamExecuteQuery(R"( - SELECT * FROM `/Root/ColumnShard` ORDER BY Col1, Col2, Col3; + SELECT * FROM `/Root/ColumnShard1` ORDER BY Col1, Col2, Col3; )", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync(); UNIT_ASSERT_VALUES_EQUAL_C(it.GetStatus(), EStatus::SUCCESS, it.GetIssues().ToString()); TString output = StreamResultToYson(it); @@ -5598,7 +5599,7 @@ Y_UNIT_TEST_SUITE(KqpOlap) { // column -> column const TString sql = R"( REPLACE INTO `/Root/ColumnShard2` - SELECT * FROM `/Root/ColumnShard` + SELECT * FROM `/Root/ColumnShard1` )"; auto insertResult = client.ExecuteQuery(sql, NYdb::NQuery::TTxControl::BeginTx().CommitTx()).GetValueSync(); UNIT_ASSERT_C(insertResult.IsSuccess(), insertResult.GetIssues().ToString()); @@ -5633,10 +5634,64 @@ Y_UNIT_TEST_SUITE(KqpOlap) { } } + Y_UNIT_TEST(OlapReplace_FromSelectLarge) { + auto settings = TKikimrSettings() + .SetWithSampleTables(false); + + TTestHelper testHelper(settings); + + TKikimrRunner& kikimr = testHelper.GetKikimr(); + testHelper.GetRuntime().GetAppData(0).EnableOlapSink = true; + Tests::NCommon::TLoggerInit(kikimr).Initialize(); + + TVector<TTestHelper::TColumnSchema> schema = { + TTestHelper::TColumnSchema().SetName("Col1").SetType(NScheme::NTypeIds::Int64).SetNullable(false), + TTestHelper::TColumnSchema().SetName("Col2").SetType(NScheme::NTypeIds::Int32).SetNullable(false), + }; + + TTestHelper::TColumnTable testTable1; + testTable1.SetName("/Root/ColumnShard1").SetPrimaryKey({ "Col1" }).SetSharding({ "Col1" }).SetSchema(schema); + testHelper.CreateTable(testTable1); + + TTestHelper::TColumnTable testTable2; + testTable2.SetName("/Root/ColumnShard2").SetPrimaryKey({ "Col1" }).SetSharding({ "Col1" }).SetSchema(schema); + testHelper.CreateTable(testTable2); + + { + TTestHelper::TUpdatesBuilder tableInserter(testTable1.GetArrowSchema(schema)); + for (size_t index = 0; index < 10000; ++index) { + tableInserter.AddRow().Add(index).Add(index * 10); + } + testHelper.BulkUpsert(testTable1, tableInserter); + } + + auto session = kikimr.GetTableClient().CreateSession().GetValueSync().GetSession(); + auto client = kikimr.GetQueryClient(); + + { + const TString sql = R"( + REPLACE INTO `/Root/ColumnShard2` + SELECT * FROM `/Root/ColumnShard1` + )"; + auto insertResult = client.ExecuteQuery(sql, NYdb::NQuery::TTxControl::BeginTx().CommitTx()).GetValueSync(); + UNIT_ASSERT_C(insertResult.IsSuccess(), insertResult.GetIssues().ToString()); + + auto it = client.StreamExecuteQuery(R"( + SELECT COUNT(*) FROM `/Root/ColumnShard2`; + )", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(it.GetStatus(), EStatus::SUCCESS, it.GetIssues().ToString()); + TString output = StreamResultToYson(it); + CompareYson( + output, + R"([[10000u]])"); + } + } + Y_UNIT_TEST(OlapReplace_Simple) { auto settings = TKikimrSettings() .SetWithSampleTables(false); TKikimrRunner kikimr(settings); + kikimr.GetTestServer().GetRuntime()->GetAppData(0).EnableOlapSink = true; Tests::NCommon::TLoggerInit(kikimr).Initialize(); TTableWithNullsHelper(kikimr).CreateTableWithNulls(); @@ -5645,13 +5700,13 @@ Y_UNIT_TEST_SUITE(KqpOlap) { const TString query = R"( CREATE TABLE `/Root/ColumnShard` ( Col1 Uint64 NOT NULL, - Col2 String NOT NULL, - Col3 Int32, + Col2 Int32, Col4 String, - PRIMARY KEY (Col1, Col2) + Col3 String NOT NULL, + PRIMARY KEY (Col1, Col3) ) PARTITION BY HASH(Col1) - WITH (STORE = COLUMN, AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = 16); + WITH (STORE = COLUMN, AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = 10); )"; auto result = session.ExecuteSchemeQuery(query).GetValueSync(); @@ -5661,7 +5716,7 @@ Y_UNIT_TEST_SUITE(KqpOlap) { auto client = kikimr.GetQueryClient(); { auto prepareResult = client.ExecuteQuery(R"( - REPLACE INTO `/Root/ColumnShard` (Col2, Col4, Col3, Col1) VALUES + REPLACE INTO `/Root/ColumnShard` (Col3, Col4, Col2, Col1) VALUES ("test100", "100", 1000, 100u); )", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync(); UNIT_ASSERT_C(prepareResult.IsSuccess(), prepareResult.GetIssues().ToString()); @@ -5669,14 +5724,14 @@ Y_UNIT_TEST_SUITE(KqpOlap) { { auto prepareResult = client.ExecuteQuery(R"( - REPLACE INTO `/Root/ColumnShard` (Col1, Col2, Col3, Col4) VALUES + REPLACE INTO `/Root/ColumnShard` (Col1, Col3, Col2, Col4) VALUES (1u, "test1", 10, "1"), (2u, "test2", NULL, "2"), (3u, "test3", 12, NULL), (4u, "test4", NULL, NULL); )", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync(); UNIT_ASSERT_C(prepareResult.IsSuccess(), prepareResult.GetIssues().ToString()); } auto it = client.StreamExecuteQuery(R"( - SELECT * FROM `/Root/ColumnShard` ORDER BY Col1, Col2, Col3, Col4; + SELECT Col1, Col3, Col2, Col4 FROM `/Root/ColumnShard` ORDER BY Col1, Col3, Col2, Col4; )", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync(); UNIT_ASSERT_VALUES_EQUAL_C(it.GetStatus(), EStatus::SUCCESS, it.GetIssues().ToString()); TString output = StreamResultToYson(it); @@ -5687,6 +5742,7 @@ Y_UNIT_TEST_SUITE(KqpOlap) { auto settings = TKikimrSettings() .SetWithSampleTables(false); TKikimrRunner kikimr(settings); + kikimr.GetTestServer().GetRuntime()->GetAppData(0).EnableOlapSink = true; Tests::NCommon::TLoggerInit(kikimr).Initialize(); TTableWithNullsHelper(kikimr).CreateTableWithNulls(); @@ -5735,6 +5791,98 @@ Y_UNIT_TEST_SUITE(KqpOlap) { CompareYson(output, R"([])"); } } + + Y_UNIT_TEST(OlapReplace_Duplicates) { + auto settings = TKikimrSettings() + .SetWithSampleTables(false); + TKikimrRunner kikimr(settings); + kikimr.GetTestServer().GetRuntime()->GetAppData(0).EnableOlapSink = true; + Tests::NCommon::TLoggerInit(kikimr).Initialize(); + TTableWithNullsHelper(kikimr).CreateTableWithNulls(); + + auto session = kikimr.GetTableClient().CreateSession().GetValueSync().GetSession(); + + const TString query = R"( + CREATE TABLE `/Root/ColumnShard` ( + Col1 Uint64 NOT NULL, + Col2 Int32, + PRIMARY KEY (Col1) + ) + PARTITION BY HASH(Col1) + WITH (STORE = COLUMN, AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = 10); + )"; + + auto result = session.ExecuteSchemeQuery(query).GetValueSync(); + UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString()); + + auto client = kikimr.GetQueryClient(); + { + auto prepareResult = client.ExecuteQuery(R"( + REPLACE INTO `/Root/ColumnShard` (Col1, Col2) VALUES + (100u, 1000), (100u, 1000); + )", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync(); + UNIT_ASSERT_C(prepareResult.IsSuccess(), prepareResult.GetIssues().ToString()); + } + + { + auto prepareResult = client.ExecuteQuery(R"( + REPLACE INTO `/Root/ColumnShard` (Col1, Col2) VALUES + (100u, 1000), (100u, 1000); + )", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync(); + UNIT_ASSERT_C(prepareResult.IsSuccess(), prepareResult.GetIssues().ToString()); + } + + auto it = client.StreamExecuteQuery(R"( + SELECT * FROM `/Root/ColumnShard` ORDER BY Col1, Col2; + )", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(it.GetStatus(), EStatus::SUCCESS, it.GetIssues().ToString()); + TString output = StreamResultToYson(it); + CompareYson(output, R"([[100u;[1000]]])"); + } + + Y_UNIT_TEST(OlapReplace_DisableOlapSink) { + auto settings = TKikimrSettings() + .SetWithSampleTables(false); + TKikimrRunner kikimr(settings); + kikimr.GetTestServer().GetRuntime()->GetAppData(0).EnableOlapSink = false; + Tests::NCommon::TLoggerInit(kikimr).Initialize(); + TTableWithNullsHelper(kikimr).CreateTableWithNulls(); + + auto session = kikimr.GetTableClient().CreateSession().GetValueSync().GetSession(); + + const TString query = R"( + CREATE TABLE `/Root/ColumnShard` ( + Col1 Uint64 NOT NULL, + Col2 Int32, + PRIMARY KEY (Col1) + ) + PARTITION BY HASH(Col1) + WITH (STORE = COLUMN, AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = 16); + )"; + + auto result = session.ExecuteSchemeQuery(query).GetValueSync(); + UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString()); + + auto client = kikimr.GetQueryClient(); + { + auto prepareResult = client.ExecuteQuery(R"( + REPLACE INTO `/Root/ColumnShard` (Col1, Col2) VALUES (1u, 1) + )", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync(); + UNIT_ASSERT(!prepareResult.IsSuccess()); + UNIT_ASSERT_C( + prepareResult.GetIssues().ToString().Contains("Data manipulation queries do not support column shard tables."), + prepareResult.GetIssues().ToString()); + } + + { + auto it = client.StreamExecuteQuery(R"( + SELECT * FROM `/Root/ColumnShard`; + )", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(it.GetStatus(), EStatus::SUCCESS, it.GetIssues().ToString()); + TString output = StreamResultToYson(it); + CompareYson(output, R"([])"); + } + } } } // namespace NKqp diff --git a/ydb/core/protos/table_service_config.proto b/ydb/core/protos/table_service_config.proto index 5ca397d677..54aa9c0d83 100644 --- a/ydb/core/protos/table_service_config.proto +++ b/ydb/core/protos/table_service_config.proto @@ -272,4 +272,5 @@ message TTableServiceConfig { optional bool EnablePgConstsToParams = 53 [default = false]; optional uint64 ExtractPredicateRangesLimit = 54 [default = 10000]; + optional bool EnableOlapSink = 55 [default = false]; }; |