diff options
| author | aidarsamer <[email protected]> | 2023-10-03 11:28:23 +0300 |
|---|---|---|
| committer | aidarsamer <[email protected]> | 2023-10-03 12:17:24 +0300 |
| commit | 23622625eaff1282a6747c8a0b5c65b64ae04b23 (patch) | |
| tree | 45993192385522125346b9bae07b89d36ac2d721 | |
| parent | d6e6ee8dea5c7a1854889fb47138c835a1693a2a (diff) | |
KIKIMR-19041: Add support to read OLAP tables using Generic Queries
| -rw-r--r-- | ydb/core/kqp/executer_actor/kqp_data_executer.cpp | 95 | ||||
| -rw-r--r-- | ydb/core/kqp/executer_actor/kqp_executer_impl.cpp | 6 | ||||
| -rw-r--r-- | ydb/core/kqp/executer_actor/kqp_executer_impl.h | 293 | ||||
| -rw-r--r-- | ydb/core/kqp/executer_actor/kqp_scan_executer.cpp | 280 | ||||
| -rw-r--r-- | ydb/core/kqp/session_actor/kqp_query_state.h | 3 | ||||
| -rw-r--r-- | ydb/core/kqp/session_actor/kqp_tx.cpp | 13 | ||||
| -rw-r--r-- | ydb/core/kqp/session_actor/kqp_tx.h | 2 | ||||
| -rw-r--r-- | ydb/core/kqp/ut/olap/kqp_olap_ut.cpp | 106 |
8 files changed, 482 insertions, 316 deletions
diff --git a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp index 916984da92d..35034a56d8f 100644 --- a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp +++ b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp @@ -128,8 +128,11 @@ public: const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& executerRetriesConfig, NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory, const NKikimrConfig::TTableServiceConfig::EChannelTransportVersion chanTransportVersion, + const NKikimrConfig::TTableServiceConfig::TAggregationConfig& aggregation, const TActorId& creator, TDuration maximalSecretsSnapshotWaitTime, const TIntrusivePtr<TUserRequestContext>& userRequestContext) - : TBase(std::move(request), database, userToken, counters, executerRetriesConfig, chanTransportVersion, maximalSecretsSnapshotWaitTime, userRequestContext, TWilsonKqp::DataExecuter, "DataExecuter") + : TBase(std::move(request), database, userToken, counters, executerRetriesConfig, chanTransportVersion, aggregation, + maximalSecretsSnapshotWaitTime, userRequestContext, TWilsonKqp::DataExecuter, "DataExecuter" + ) , AsyncIoFactory(std::move(asyncIoFactory)) , StreamResult(streamResult) { @@ -184,6 +187,7 @@ public: ReadOnlyTx && !ImmediateTx && !HasPersistentChannels && + !HasOlapTable && (!Database.empty() || AppData()->EnableMvccSnapshotWithLegacyDomainRoot) && AppData()->FeatureFlags.GetEnableMvccSnapshotReads() ); @@ -292,9 +296,9 @@ public: switch (ev->GetTypeRewrite()) { hFunc(TEvKqpExecuter::TEvTableResolveStatus, HandleResolve); hFunc(TEvKqpExecuter::TEvShardsResolveStatus, HandleResolve); + hFunc(TEvPrivate::TEvResourcesSnapshot, HandleResolve); hFunc(TEvKqp::TEvAbortExecution, HandleAbortExecution); hFunc(NMetadata::NProvider::TEvRefreshSubscriberData, HandleRefreshSubscriberData); - hFunc(TEvPrivate::TEvResourcesSnapshot, HandleResolve); hFunc(NActors::TEvents::TEvWakeup, HandleSecretsWaitingTimeout); default: UnexpectedEvent("WaitResolveState", ev->GetTypeRewrite()); @@ -1338,18 +1342,6 @@ private: return true; } - void FillGeneralReadInfo(TTaskMeta& taskMeta, ui64 itemsLimit, bool reverse) { - if (taskMeta.Reads && !taskMeta.Reads.GetRef().empty()) { - // Validate parameters - YQL_ENSURE(taskMeta.ReadInfo.ItemsLimit == itemsLimit); - YQL_ENSURE(taskMeta.ReadInfo.Reverse == reverse); - return; - } - - taskMeta.ReadInfo.ItemsLimit = itemsLimit; - taskMeta.ReadInfo.Reverse = reverse; - }; - void BuildDatashardTasks(TStageInfo& stageInfo, const TMap<TString, TString>& secureParams) { THashMap<ui64, ui64> shardTasks; // shardId -> taskId auto& stage = stageInfo.Meta.GetStage(stageInfo.Id); @@ -1387,16 +1379,8 @@ private: YQL_ENSURE(!shardInfo.KeyWriteRanges); auto& task = getShardTask(shardId); - FillGeneralReadInfo(task.Meta, readSettings.ItemsLimit, readSettings.Reverse); - - TTaskMeta::TShardReadInfo readInfo; - readInfo.Ranges = std::move(*shardInfo.KeyReadRanges); - readInfo.Columns = columns; - - if (!task.Meta.Reads) { - task.Meta.Reads.ConstructInPlace(); - } - task.Meta.Reads->emplace_back(std::move(readInfo)); + MergeReadInfoToTaskMeta(task.Meta, shardId, shardInfo.KeyReadRanges, readSettings, + columns, op, /*isPersistentScan*/ false); } break; @@ -1660,7 +1644,7 @@ private: void DoExecute() { TVector<TString> secretNames; for (const auto& transaction : Request.Transactions) { - for (const auto& secretName : transaction.Body->GetSecretNames()) { + for (const auto& secretName : transaction.Body->GetSecretNames()) { SecretSnapshotRequired = true; secretNames.push_back(secretName); } @@ -1668,7 +1652,7 @@ private: if (stage.SourcesSize() > 0 && stage.GetSources(0).GetTypeCase() == NKqpProto::TKqpSource::kExternalSource) { ResourceSnapshotRequired = true; HasExternalSources = true; - } + } } } @@ -1690,6 +1674,18 @@ private: } } + bool HassDmlOperationOnOlap(NKqpProto::TKqpPhyTx_EType queryType, const NKqpProto::TKqpPhyStage& stage) { + if (queryType == NKqpProto::TKqpPhyTx::TYPE_DATA) { + return true; + } + for (const auto &tableOp : stage.GetTableOps()) { + if (tableOp.GetTypeCase() != NKqpProto::TKqpPhyTableOperation::kReadOlapRange) { + return true; + } + } + return false; + } + void Execute() { NWilson::TSpan prepareTasksSpan(TWilsonKqp::DataExecuterPrepareTasks, ExecuterStateSpan.GetTraceId(), "PrepareTasks", NWilson::EFlags::AUTO_END); LWTRACK(KqpDataExecuterStartExecute, ResponseEv->Orbit, TxId); @@ -1722,7 +1718,7 @@ private: } } - if (stageInfo.Meta.IsOlap() && tx.Body->GetType() == NKqpProto::TKqpPhyTx::TYPE_DATA) { + if (stageInfo.Meta.IsOlap() && HassDmlOperationOnOlap(tx.Body->GetType(), stage)) { auto error = TStringBuilder() << "Data manipulation queries do not support column shard tables."; LOG_E(error); ReplyErrorAndDie(Ydb::StatusIds::PRECONDITION_FAILED, @@ -1747,6 +1743,8 @@ private: default: YQL_ENSURE(false, "unknown source type"); } + } else if (StreamResult && stageInfo.Meta.IsOlap()) { + BuildScanTasksFromShards(stageInfo); } else if (stageInfo.Meta.ShardOperations.empty()) { BuildComputeTasks(stageInfo, secureParams); } else if (stageInfo.Meta.IsSysView()) { @@ -1889,11 +1887,43 @@ private: void HandleResolve(TEvKqpExecuter::TEvTableResolveStatus::TPtr& ev) { if (!TBase::HandleResolve(ev)) return; + + TSet<ui64> shardIds; + if (StreamResult) { + for (auto& [stageId, stageInfo] : TasksGraph.GetStagesInfo()) { + if (stageInfo.Meta.IsOlap()) { + HasOlapTable = true; + break; + } + } + if (HasOlapTable) { + for (auto& [stageId, stageInfo] : TasksGraph.GetStagesInfo()) { + if (stageInfo.Meta.ShardKey) { + for (auto& partition : stageInfo.Meta.ShardKey->GetPartitions()) { + shardIds.insert(partition.ShardId); + } + } + } + if (shardIds) { + LOG_D("Start resolving tablets nodes... (" << shardIds.size() << ")"); + auto kqpShardsResolver = CreateKqpShardsResolver( + this->SelfId(), TxId, false, std::move(shardIds)); + KqpShardsResolverId = this->RegisterWithSameMailbox(kqpShardsResolver); + } else { + GetResourcesSnapshot(); + } + return; + } + } DoExecute(); } void HandleResolve(TEvKqpExecuter::TEvShardsResolveStatus::TPtr& ev) { if (!TBase::HandleResolve(ev)) return; + if (HasOlapTable) { + GetResourcesSnapshot(); + return; + } OnShardsResolve(); } @@ -1903,7 +1933,7 @@ private: auto longTxService = NLongTxService::MakeLongTxServiceID(SelfId().NodeId()); Send(longTxService, new NLongTxService::TEvLongTxService::TEvAcquireReadSnapshot(Database)); - LOG_T("Create temporary mvcc snapshot, ebcome WaitSnapshotState"); + LOG_T("Create temporary mvcc snapshot, become WaitSnapshotState"); Become(&TKqpDataExecuter::WaitSnapshotState); if (ExecuterStateSpan) { ExecuterStateSpan.End(); @@ -2416,12 +2446,13 @@ private: } // namespace IActor* CreateKqpDataExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TString& database, const TIntrusiveConstPtr<NACLib::TUserToken>& userToken, - TKqpRequestCounters::TPtr counters, bool streamResult, const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& executerRetriesConfig, - NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory, const NKikimrConfig::TTableServiceConfig::EChannelTransportVersion chanTransportVersion, - const TActorId& creator, TDuration maximalSecretsSnapshotWaitTime, const TIntrusivePtr<TUserRequestContext>& userRequestContext) + TKqpRequestCounters::TPtr counters, bool streamResult, const NKikimrConfig::TTableServiceConfig::TAggregationConfig& aggregation, + const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& executerRetriesConfig, NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory, + const NKikimrConfig::TTableServiceConfig::EChannelTransportVersion chanTransportVersion, const TActorId& creator, + TDuration maximalSecretsSnapshotWaitTime, const TIntrusivePtr<TUserRequestContext>& userRequestContext) { return new TKqpDataExecuter(std::move(request), database, userToken, counters, streamResult, executerRetriesConfig, - std::move(asyncIoFactory), chanTransportVersion, creator, maximalSecretsSnapshotWaitTime, userRequestContext); + std::move(asyncIoFactory), chanTransportVersion, aggregation, creator, maximalSecretsSnapshotWaitTime, userRequestContext); } } // namespace NKqp diff --git a/ydb/core/kqp/executer_actor/kqp_executer_impl.cpp b/ydb/core/kqp/executer_actor/kqp_executer_impl.cpp index 7f4d45f06c7..ae38be06287 100644 --- a/ydb/core/kqp/executer_actor/kqp_executer_impl.cpp +++ b/ydb/core/kqp/executer_actor/kqp_executer_impl.cpp @@ -87,7 +87,7 @@ IActor* CreateKqpExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TSt if (request.Transactions.empty()) { // commit-only or rollback-only data transaction YQL_ENSURE(request.LocksOp == ELocksOp::Commit || request.LocksOp == ELocksOp::Rollback); - return CreateKqpDataExecuter(std::move(request), database, userToken, counters, false, executerRetriesConfig, std::move(asyncIoFactory), chanTransportVersion, creator, maximalSecretsSnapshotWaitTime, userRequestContext); + return CreateKqpDataExecuter(std::move(request), database, userToken, counters, false, aggregation, executerRetriesConfig, std::move(asyncIoFactory), chanTransportVersion, creator, maximalSecretsSnapshotWaitTime, userRequestContext); } TMaybe<NKqpProto::TKqpPhyTx::EType> txsType; @@ -103,13 +103,13 @@ IActor* CreateKqpExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TSt switch (*txsType) { case NKqpProto::TKqpPhyTx::TYPE_COMPUTE: case NKqpProto::TKqpPhyTx::TYPE_DATA: - return CreateKqpDataExecuter(std::move(request), database, userToken, counters, false, executerRetriesConfig, std::move(asyncIoFactory), chanTransportVersion, creator, maximalSecretsSnapshotWaitTime, userRequestContext); + return CreateKqpDataExecuter(std::move(request), database, userToken, counters, false, aggregation, executerRetriesConfig, std::move(asyncIoFactory), chanTransportVersion, creator, maximalSecretsSnapshotWaitTime, userRequestContext); case NKqpProto::TKqpPhyTx::TYPE_SCAN: return CreateKqpScanExecuter(std::move(request), database, userToken, counters, aggregation, executerRetriesConfig, preparedQuery, chanTransportVersion, maximalSecretsSnapshotWaitTime, userRequestContext); case NKqpProto::TKqpPhyTx::TYPE_GENERIC: - return CreateKqpDataExecuter(std::move(request), database, userToken, counters, true, executerRetriesConfig, std::move(asyncIoFactory), chanTransportVersion, creator, maximalSecretsSnapshotWaitTime, userRequestContext); + return CreateKqpDataExecuter(std::move(request), database, userToken, counters, true, aggregation, executerRetriesConfig, std::move(asyncIoFactory), chanTransportVersion, creator, maximalSecretsSnapshotWaitTime, userRequestContext); default: YQL_ENSURE(false, "Unsupported physical tx type: " << (ui32)*txsType); diff --git a/ydb/core/kqp/executer_actor/kqp_executer_impl.h b/ydb/core/kqp/executer_actor/kqp_executer_impl.h index 1a1aeddaf95..5c0473abcbb 100644 --- a/ydb/core/kqp/executer_actor/kqp_executer_impl.h +++ b/ydb/core/kqp/executer_actor/kqp_executer_impl.h @@ -117,6 +117,7 @@ public: TKqpRequestCounters::TPtr counters, const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& executerRetriesConfig, const NKikimrConfig::TTableServiceConfig::EChannelTransportVersion chanTransportVersion, + const NKikimrConfig::TTableServiceConfig::TAggregationConfig& aggregation, TDuration maximalSecretsSnapshotWaitTime, const TIntrusivePtr<TUserRequestContext>& userRequestContext, ui64 spanVerbosity = 0, TString spanName = "no_name") : Request(std::move(request)) @@ -128,6 +129,8 @@ public: , ExecuterRetriesConfig(executerRetriesConfig) , MaximalSecretsSnapshotWaitTime(maximalSecretsSnapshotWaitTime) , UserRequestContext(userRequestContext) + , AggregationSettings(aggregation) + , HasOlapTable(false) { TasksGraph.GetMeta().Snapshot = IKqpGateway::TKqpSnapshot(Request.Snapshot.Step, Request.Snapshot.TxId); TasksGraph.GetMeta().Arena = MakeIntrusive<NActors::TProtoArenaHolder>(); @@ -402,7 +405,7 @@ protected: } Secrets = ev->Get()->GetSnapshotPtrAs<NMetadata::NSecret::TSnapshot>(); - + TString secretValue; for (const TString& secretName : SecretNames) { auto secretId = NMetadata::NSecret::TSecretId(UserToken->GetUserSID(), secretName); @@ -448,14 +451,6 @@ protected: return secureParams; } - void GetResourcesSnapshot() { - GetKqpResourceManager()->RequestClusterResourcesInfo( - [as = TlsActivationContext->ActorSystem(), self = SelfId()](TVector<NKikimrKqp::TKqpNodeResources>&& resources) { - TAutoPtr<IEventHandle> eh = new IEventHandle(self, self, new TEvPrivate::TEvResourcesSnapshot(std::move(resources))); - as->Send(eh); - }); - } - protected: bool CheckExecutionComplete() { if (Planner && Planner->GetPendingComputeActors().empty() && Planner->GetPendingComputeTasks().empty()) { @@ -835,7 +830,7 @@ protected: auto sourceName = externalSource.GetSourceName(); TString structuredToken; if (sourceName) { - structuredToken = NYql::CreateStructuredTokenParser(externalSource.GetAuthInfo()).ToBuilder().ReplaceReferences(secureParams).ToJson(); + structuredToken = NYql::CreateStructuredTokenParser(externalSource.GetAuthInfo()).ToBuilder().ReplaceReferences(secureParams).ToJson(); } TVector<ui64> tasksIds; @@ -1012,6 +1007,279 @@ protected: } } + void FillReadInfo(TTaskMeta& taskMeta, ui64 itemsLimit, bool reverse, bool sorted) const + { + if (taskMeta.Reads && !taskMeta.Reads.GetRef().empty()) { + // Validate parameters + YQL_ENSURE(taskMeta.ReadInfo.ItemsLimit == itemsLimit); + YQL_ENSURE(taskMeta.ReadInfo.Reverse == reverse); + return; + } + + taskMeta.ReadInfo.ItemsLimit = itemsLimit; + taskMeta.ReadInfo.Reverse = reverse; + taskMeta.ReadInfo.Sorted = sorted; + taskMeta.ReadInfo.ReadType = TTaskMeta::TReadInfo::EReadType::Rows; + } + + TTaskMeta::TReadInfo::EReadType OlapReadTypeFromProto(const NKqpProto::TKqpPhyOpReadOlapRanges::EReadType& type) const { + switch (type) { + case NKqpProto::TKqpPhyOpReadOlapRanges::ROWS: + return TTaskMeta::TReadInfo::EReadType::Rows; + case NKqpProto::TKqpPhyOpReadOlapRanges::BLOCKS: + return TTaskMeta::TReadInfo::EReadType::Blocks; + default: + YQL_ENSURE(false, "Invalid read type from TKqpPhyOpReadOlapRanges protobuf."); + } + } + + void FillOlapReadInfo(TTaskMeta& taskMeta, NKikimr::NMiniKQL::TType* resultType, const TMaybe<::NKqpProto::TKqpPhyOpReadOlapRanges>& readOlapRange) const { + if (taskMeta.Reads && !taskMeta.Reads.GetRef().empty()) { + // Validate parameters + if (!readOlapRange || readOlapRange->GetOlapProgram().empty()) { + YQL_ENSURE(taskMeta.ReadInfo.OlapProgram.Program.empty()); + return; + } + + YQL_ENSURE(taskMeta.ReadInfo.OlapProgram.Program == readOlapRange->GetOlapProgram()); + return; + } + + if (resultType) { + YQL_ENSURE(resultType->GetKind() == NKikimr::NMiniKQL::TType::EKind::Struct + || resultType->GetKind() == NKikimr::NMiniKQL::TType::EKind::Tuple); + + auto* resultStructType = static_cast<NKikimr::NMiniKQL::TStructType*>(resultType); + ui32 resultColsCount = resultStructType->GetMembersCount(); + + taskMeta.ReadInfo.ResultColumnsTypes.reserve(resultColsCount); + for (ui32 i = 0; i < resultColsCount; ++i) { + auto memberType = resultStructType->GetMemberType(i); + if (memberType->GetKind() == NKikimr::NMiniKQL::TType::EKind::Optional) { + memberType = static_cast<NKikimr::NMiniKQL::TOptionalType*>(memberType)->GetItemType(); + } + // TODO: support pg types + YQL_ENSURE(memberType->GetKind() == NKikimr::NMiniKQL::TType::EKind::Data, + "Expected simple data types to be read from column shard"); + auto memberDataType = static_cast<NKikimr::NMiniKQL::TDataType*>(memberType); + taskMeta.ReadInfo.ResultColumnsTypes.push_back(NScheme::TTypeInfo(memberDataType->GetSchemeType())); + } + } + + if (!readOlapRange || readOlapRange->GetOlapProgram().empty()) { + return; + } + taskMeta.ReadInfo.ReadType = OlapReadTypeFromProto(readOlapRange->GetReadType()); + taskMeta.ReadInfo.OlapProgram.Program = readOlapRange->GetOlapProgram(); + for (auto& name: readOlapRange->GetOlapProgramParameterNames()) { + taskMeta.ReadInfo.OlapProgram.ParameterNames.insert(name); + } + } + + void MergeReadInfoToTaskMeta(TTaskMeta& meta, ui64 shardId, TMaybe<TShardKeyRanges>& keyReadRanges, const TPhysicalShardReadSettings& readSettings, const TVector<TTaskMeta::TColumn>& columns, + const NKqpProto::TKqpPhyTableOperation& op, bool isPersistentScan) const + { + TTaskMeta::TShardReadInfo readInfo = { + .Columns = columns, + }; + if (keyReadRanges) { + readInfo.Ranges = std::move(*keyReadRanges); // sorted & non-intersecting + } + + if (isPersistentScan) { + readInfo.ShardId = shardId; + } + + FillReadInfo(meta, readSettings.ItemsLimit, readSettings.Reverse, readSettings.Sorted); + if (op.GetTypeCase() == NKqpProto::TKqpPhyTableOperation::kReadOlapRange) { + FillOlapReadInfo(meta, readSettings.ResultType, op.GetReadOlapRange()); + } + + if (!meta.Reads) { + meta.Reads.ConstructInPlace(); + } + + meta.Reads->emplace_back(std::move(readInfo)); + } + + ui32 GetScanTasksPerNode(TStageInfo& stageInfo, const bool isOlapScan, const ui64 /*nodeId*/) const { + ui32 result = 0; + if (isOlapScan) { + if (AggregationSettings.HasCSScanThreadsPerNode()) { + result = AggregationSettings.GetCSScanThreadsPerNode(); + } else { + const TStagePredictor& predictor = stageInfo.Meta.Tx.Body->GetCalculationPredictor(stageInfo.Id.StageId); + result = predictor.CalcTasksOptimalCount(TStagePredictor::GetUsableThreads(), {}); + } + } else { + const auto& stage = stageInfo.Meta.GetStage(stageInfo.Id); + result = AggregationSettings.GetDSScanMinimalThreads(); + if (stage.GetProgram().GetSettings().GetHasSort()) { + result = std::max(result, AggregationSettings.GetDSBaseSortScanThreads()); + } + if (stage.GetProgram().GetSettings().GetHasMapJoin()) { + result = std::max(result, AggregationSettings.GetDSBaseJoinScanThreads()); + } + } + return Max<ui32>(1, result); + } + + TTask& AssignScanTaskToShard( + TStageInfo& stageInfo, const ui64 shardId, + THashMap<ui64, std::vector<ui64>>& nodeTasks, + THashMap<ui64, ui64>& assignedShardsCount, + const bool sorted, const bool isOlapScan) + { + ui64 nodeId = ShardIdToNodeId.at(shardId); + if (stageInfo.Meta.IsOlap() && sorted) { + auto& task = TasksGraph.AddTask(stageInfo); + task.Meta.ExecuterId = SelfId(); + task.Meta.NodeId = nodeId; + task.Meta.ScanTask = true; + task.Meta.Type = TTaskMeta::TTaskType::Scan; + return task; + } + + auto& tasks = nodeTasks[nodeId]; + auto& cnt = assignedShardsCount[nodeId]; + const ui32 maxScansPerNode = GetScanTasksPerNode(stageInfo, isOlapScan, nodeId); + if (cnt < maxScansPerNode) { + auto& task = TasksGraph.AddTask(stageInfo); + task.Meta.NodeId = nodeId; + task.Meta.ScanTask = true; + task.Meta.Type = TTaskMeta::TTaskType::Scan; + tasks.push_back(task.Id); + ++cnt; + return task; + } else { + ui64 taskIdx = cnt % maxScansPerNode; + ++cnt; + return TasksGraph.GetTask(tasks[taskIdx]); + } + } + + void BuildScanTasksFromShards(TStageInfo& stageInfo) { + THashMap<ui64, std::vector<ui64>> nodeTasks; + THashMap<ui64, std::vector<TShardInfoWithId>> nodeShards; + THashMap<ui64, ui64> assignedShardsCount; + auto& stage = stageInfo.Meta.GetStage(stageInfo.Id); + + const auto& tableInfo = stageInfo.Meta.TableConstInfo; + const auto& keyTypes = tableInfo->KeyColumnTypes; + ui32 metaId = 0; + for (auto& op : stage.GetTableOps()) { + Y_VERIFY_DEBUG(stageInfo.Meta.TablePath == op.GetTable().GetPath()); + + auto columns = BuildKqpColumns(op, tableInfo); + auto partitions = PrunePartitions(op, stageInfo, HolderFactory(), TypeEnv()); + const bool isOlapScan = (op.GetTypeCase() == NKqpProto::TKqpPhyTableOperation::kReadOlapRange); + auto readSettings = ExtractReadSettings(op, stageInfo, HolderFactory(), TypeEnv()); + + if (op.GetTypeCase() == NKqpProto::TKqpPhyTableOperation::kReadRange) { + stageInfo.Meta.SkipNullKeys.assign(op.GetReadRange().GetSkipNullKeys().begin(), + op.GetReadRange().GetSkipNullKeys().end()); + // not supported for scan queries + YQL_ENSURE(!readSettings.Reverse); + } + + for (auto&& i: partitions) { + const ui64 nodeId = ShardIdToNodeId.at(i.first); + nodeShards[nodeId].emplace_back(TShardInfoWithId(i.first, std::move(i.second))); + } + + if (Stats && CollectProfileStats(Request.StatsMode)) { + for (auto&& i : nodeShards) { + Stats->AddNodeShardsCount(stageInfo.Id.StageId, i.first, i.second.size()); + } + } + + if (!AppData()->FeatureFlags.GetEnableSeparationComputeActorsFromRead() || (!isOlapScan && readSettings.Sorted)) { + for (auto&& pair : nodeShards) { + auto& shardsInfo = pair.second; + for (auto&& shardInfo : shardsInfo) { + auto& task = AssignScanTaskToShard(stageInfo, shardInfo.ShardId, nodeTasks, assignedShardsCount, readSettings.Sorted, isOlapScan); + MergeReadInfoToTaskMeta(task.Meta, shardInfo.ShardId, shardInfo.KeyReadRanges, readSettings, + columns, op, /*isPersistentScan*/ true); + } + } + + for (const auto& pair : nodeTasks) { + for (const auto& taskIdx : pair.second) { + auto& task = TasksGraph.GetTask(taskIdx); + task.Meta.SetEnableShardsSequentialScan(readSettings.Sorted); + PrepareScanMetaForUsage(task.Meta, keyTypes); + } + } + + } else { + for (auto&& pair : nodeShards) { + const auto nodeId = pair.first; + auto& shardsInfo = pair.second; + const ui32 metaGlueingId = ++metaId; + TTaskMeta meta; + { + for (auto&& shardInfo : shardsInfo) { + MergeReadInfoToTaskMeta(meta, shardInfo.ShardId, shardInfo.KeyReadRanges, readSettings, + columns, op, /*isPersistentScan*/ true); + } + PrepareScanMetaForUsage(meta, keyTypes); + LOG_D("Stage " << stageInfo.Id << " create scan task meta for node: " << nodeId + << ", meta: " << meta.ToString(keyTypes, *AppData()->TypeRegistry)); + } + for (ui32 t = 0; t < GetScanTasksPerNode(stageInfo, isOlapScan, nodeId); ++t) { + auto& task = TasksGraph.AddTask(stageInfo); + task.Meta = meta; + task.Meta.SetEnableShardsSequentialScan(false); + task.Meta.ExecuterId = SelfId(); + task.Meta.NodeId = nodeId; + task.Meta.ScanTask = true; + task.Meta.Type = TTaskMeta::TTaskType::Scan; + task.SetMetaId(metaGlueingId); + } + } + } + } + + LOG_D("Stage " << stageInfo.Id << " will be executed on " << nodeTasks.size() << " nodes."); + } + + void PrepareScanMetaForUsage(TTaskMeta& meta, const TVector<NScheme::TTypeInfo>& keyTypes) const { + YQL_ENSURE(meta.Reads.Defined()); + auto& taskReads = meta.Reads.GetRef(); + + /* + * Sort read ranges so that sequential scan of that ranges produce sorted result. + * + * Partition pruner feed us with set of non-intersecting ranges with filled right boundary. + * So we may sort ranges based solely on the their rightmost point. + */ + std::sort(taskReads.begin(), taskReads.end(), [&](const auto& lhs, const auto& rhs) { + if (lhs.ShardId == rhs.ShardId) { + return false; + } + + const std::pair<const TSerializedCellVec*, bool> k1 = lhs.Ranges.GetRightBorder(); + const std::pair<const TSerializedCellVec*, bool> k2 = rhs.Ranges.GetRightBorder(); + + const int cmp = CompareBorders<false, false>( + k1.first->GetCells(), + k2.first->GetCells(), + k1.second, + k2.second, + keyTypes); + + return (cmp < 0); + }); + } + + void GetResourcesSnapshot() { + GetKqpResourceManager()->RequestClusterResourcesInfo( + [as = TlsActivationContext->ActorSystem(), self = SelfId()](TVector<NKikimrKqp::TKqpNodeResources>&& resources) { + TAutoPtr<IEventHandle> eh = new IEventHandle(self, self, new typename TEvPrivate::TEvResourcesSnapshot(std::move(resources))); + as->Send(eh); + }); + } + protected: void TerminateComputeActors(Ydb::StatusIds::StatusCode code, const NYql::TIssues& issues) { for (const auto& task : this->TasksGraph.GetTasks()) { @@ -1306,6 +1574,10 @@ protected: TIntrusivePtr<TUserRequestContext> UserRequestContext; + const NKikimrConfig::TTableServiceConfig::TAggregationConfig AggregationSettings; + TVector<NKikimrKqp::TKqpNodeResources> ResourcesSnapshot; + bool HasOlapTable; + private: static constexpr TDuration ResourceUsageUpdateInterval = TDuration::MilliSeconds(100); }; @@ -1314,6 +1586,7 @@ private: IActor* CreateKqpDataExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TString& database, const TIntrusiveConstPtr<NACLib::TUserToken>& userToken, TKqpRequestCounters::TPtr counters, bool streamResult, + const NKikimrConfig::TTableServiceConfig::TAggregationConfig& aggregation, const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& executerRetriesConfig, NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory, const NKikimrConfig::TTableServiceConfig::EChannelTransportVersion chanTransportVersion, const TActorId& creator, TDuration maximalSecretsSnapshotWaitTime, const TIntrusivePtr<TUserRequestContext>& userRequestContext); diff --git a/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp b/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp index 4286719a7e7..e19abf395ba 100644 --- a/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp +++ b/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp @@ -37,17 +37,6 @@ using namespace NYql::NDq; namespace { -TTaskMeta::TReadInfo::EReadType ReadTypeFromProto(const NKqpProto::TKqpPhyOpReadOlapRanges::EReadType& type) { - switch (type) { - case NKqpProto::TKqpPhyOpReadOlapRanges::ROWS: - return TTaskMeta::TReadInfo::EReadType::Rows; - case NKqpProto::TKqpPhyOpReadOlapRanges::BLOCKS: - return TTaskMeta::TReadInfo::EReadType::Blocks; - default: - YQL_ENSURE(false, "Invalid read type from TKqpPhyOpReadOlapRanges protobuf."); - } -} - class TKqpScanExecuter : public TKqpExecuterBase<TKqpScanExecuter, EExecType::Scan> { using TBase = TKqpExecuterBase<TKqpScanExecuter, EExecType::Scan>; TPreparedQueryHolder::TConstPtr PreparedQuery; @@ -63,9 +52,10 @@ public: TPreparedQueryHolder::TConstPtr preparedQuery, const NKikimrConfig::TTableServiceConfig::EChannelTransportVersion chanTransportVersion, TDuration maximalSecretsSnapshotWaitTime, const TIntrusivePtr<TUserRequestContext>& userRequestContext) - : TBase(std::move(request), database, userToken, counters, executerRetriesConfig, chanTransportVersion, maximalSecretsSnapshotWaitTime, userRequestContext, TWilsonKqp::ScanExecuter, "ScanExecuter") + : TBase(std::move(request), database, userToken, counters, executerRetriesConfig, chanTransportVersion, aggregation, + maximalSecretsSnapshotWaitTime, userRequestContext, TWilsonKqp::ScanExecuter, "ScanExecuter" + ) , PreparedQuery(preparedQuery) - , AggregationSettings(aggregation) { YQL_ENSURE(Request.Transactions.size() == 1); YQL_ENSURE(Request.DataShardLocks.empty()); @@ -159,81 +149,6 @@ private: private: - void FillReadInfo(TTaskMeta& taskMeta, ui64 itemsLimit, bool reverse, bool sorted, - NKikimr::NMiniKQL::TType* resultType, const TMaybe<::NKqpProto::TKqpPhyOpReadOlapRanges>& readOlapRange) const - { - if (taskMeta.Reads && !taskMeta.Reads.GetRef().empty()) { - // Validate parameters - YQL_ENSURE(taskMeta.ReadInfo.ItemsLimit == itemsLimit); - YQL_ENSURE(taskMeta.ReadInfo.Reverse == reverse); - - if (!readOlapRange || readOlapRange->GetOlapProgram().empty()) { - YQL_ENSURE(taskMeta.ReadInfo.OlapProgram.Program.empty()); - return; - } - - YQL_ENSURE(taskMeta.ReadInfo.OlapProgram.Program == readOlapRange->GetOlapProgram()); - return; - } - - taskMeta.ReadInfo.ItemsLimit = itemsLimit; - taskMeta.ReadInfo.Reverse = reverse; - taskMeta.ReadInfo.Sorted = sorted; - taskMeta.ReadInfo.ReadType = TTaskMeta::TReadInfo::EReadType::Rows; - - if (resultType) { - YQL_ENSURE(resultType->GetKind() == NKikimr::NMiniKQL::TType::EKind::Struct - || resultType->GetKind() == NKikimr::NMiniKQL::TType::EKind::Tuple); - - auto* resultStructType = static_cast<NKikimr::NMiniKQL::TStructType*>(resultType); - ui32 resultColsCount = resultStructType->GetMembersCount(); - - taskMeta.ReadInfo.ResultColumnsTypes.reserve(resultColsCount); - for (ui32 i = 0; i < resultColsCount; ++i) { - auto memberType = resultStructType->GetMemberType(i); - if (memberType->GetKind() == NKikimr::NMiniKQL::TType::EKind::Optional) { - memberType = static_cast<NKikimr::NMiniKQL::TOptionalType*>(memberType)->GetItemType(); - } - // TODO: support pg types - YQL_ENSURE(memberType->GetKind() == NKikimr::NMiniKQL::TType::EKind::Data, - "Expected simple data types to be read from column shard"); - auto memberDataType = static_cast<NKikimr::NMiniKQL::TDataType*>(memberType); - taskMeta.ReadInfo.ResultColumnsTypes.push_back(NScheme::TTypeInfo(memberDataType->GetSchemeType())); - } - } - - if (!readOlapRange || readOlapRange->GetOlapProgram().empty()) { - return; - } - taskMeta.ReadInfo.ReadType = ReadTypeFromProto(readOlapRange->GetReadType()); - taskMeta.ReadInfo.OlapProgram.Program = readOlapRange->GetOlapProgram(); - for (auto& name: readOlapRange->GetOlapProgramParameterNames()) { - taskMeta.ReadInfo.OlapProgram.ParameterNames.insert(name); - } - }; - - ui32 GetTasksPerNode(TStageInfo& stageInfo, const bool isOlapScan, const ui64 /*nodeId*/) const { - ui32 result = 0; - if (isOlapScan) { - if (AggregationSettings.HasCSScanThreadsPerNode()) { - result = AggregationSettings.GetCSScanThreadsPerNode(); - } else { - const TStagePredictor& predictor = stageInfo.Meta.Tx.Body->GetCalculationPredictor(stageInfo.Id.StageId); - result = predictor.CalcTasksOptimalCount(TStagePredictor::GetUsableThreads(), {}); - } - } else { - const auto& stage = stageInfo.Meta.GetStage(stageInfo.Id); - result = AggregationSettings.GetDSScanMinimalThreads(); - if (stage.GetProgram().GetSettings().GetHasSort()) { - result = std::max(result, AggregationSettings.GetDSBaseSortScanThreads()); - } - if (stage.GetProgram().GetSettings().GetHasMapJoin()) { - result = std::max(result, AggregationSettings.GetDSBaseJoinScanThreads()); - } - } - return Max<ui32>(1, result); - } - ui32 GetMaxTasksAggregation(TStageInfo& stageInfo, const ui32 previousTasksCount, const ui32 nodesCount) const { if (AggregationSettings.HasAggregationComputeThreads()) { return std::max<ui32>(1, AggregationSettings.GetAggregationComputeThreads()); @@ -245,178 +160,6 @@ private: } } - TTask& AssignTaskToShard( - TStageInfo& stageInfo, const ui64 shardId, - THashMap<ui64, std::vector<ui64>>& nodeTasks, - THashMap<ui64, ui64>& assignedShardsCount, - const bool sorted, const bool isOlapScan) - { - ui64 nodeId = ShardIdToNodeId.at(shardId); - if (stageInfo.Meta.IsOlap() && sorted) { - auto& task = TasksGraph.AddTask(stageInfo); - task.Meta.ExecuterId = SelfId(); - task.Meta.NodeId = nodeId; - task.Meta.ScanTask = true; - task.Meta.Type = TTaskMeta::TTaskType::Scan; - return task; - } - - auto& tasks = nodeTasks[nodeId]; - auto& cnt = assignedShardsCount[nodeId]; - const ui32 maxScansPerNode = GetTasksPerNode(stageInfo, isOlapScan, nodeId); - if (cnt < maxScansPerNode) { - auto& task = TasksGraph.AddTask(stageInfo); - task.Meta.NodeId = nodeId; - task.Meta.ScanTask = true; - task.Meta.Type = TTaskMeta::TTaskType::Scan; - tasks.push_back(task.Id); - ++cnt; - return task; - } else { - ui64 taskIdx = cnt % maxScansPerNode; - ++cnt; - return TasksGraph.GetTask(tasks[taskIdx]); - } - } - - void MergeToTaskMeta(TTaskMeta& meta, TShardInfoWithId& shardInfo, const TPhysicalShardReadSettings& readSettings, const TVector<TTaskMeta::TColumn>& columns, - const NKqpProto::TKqpPhyTableOperation& op) const { - YQL_ENSURE(!shardInfo.KeyWriteRanges); - - TTaskMeta::TShardReadInfo readInfo = { - .Ranges = std::move(*shardInfo.KeyReadRanges), // sorted & non-intersecting - .Columns = columns, - .ShardId = shardInfo.ShardId, - }; - - if (op.GetTypeCase() == NKqpProto::TKqpPhyTableOperation::kReadOlapRange) { - const auto& readRange = op.GetReadOlapRange(); - FillReadInfo(meta, readSettings.ItemsLimit, readSettings.Reverse, readSettings.Sorted, readSettings.ResultType, readRange); - } else { - FillReadInfo(meta, readSettings.ItemsLimit, readSettings.Reverse, readSettings.Sorted, nullptr, TMaybe<::NKqpProto::TKqpPhyOpReadOlapRanges>()); - } - - if (!meta.Reads) { - meta.Reads.ConstructInPlace(); - } - - meta.Reads->emplace_back(std::move(readInfo)); - } - - void PrepareMetaForUsage(TTaskMeta& meta, const TVector<NScheme::TTypeInfo>& keyTypes) const { - YQL_ENSURE(meta.Reads.Defined()); - auto& taskReads = meta.Reads.GetRef(); - - /* - * Sort read ranges so that sequential scan of that ranges produce sorted result. - * - * Partition pruner feed us with set of non-intersecting ranges with filled right boundary. - * So we may sort ranges based solely on the their rightmost point. - */ - std::sort(taskReads.begin(), taskReads.end(), [&](const auto& lhs, const auto& rhs) { - if (lhs.ShardId == rhs.ShardId) { - return false; - } - - const std::pair<const TSerializedCellVec*, bool> k1 = lhs.Ranges.GetRightBorder(); - const std::pair<const TSerializedCellVec*, bool> k2 = rhs.Ranges.GetRightBorder(); - - const int cmp = CompareBorders<false, false>( - k1.first->GetCells(), - k2.first->GetCells(), - k1.second, - k2.second, - keyTypes); - - return (cmp < 0); - }); - } - - void BuildScanTasks(TStageInfo& stageInfo) { - THashMap<ui64, std::vector<ui64>> nodeTasks; - THashMap<ui64, std::vector<TShardInfoWithId>> nodeShards; - THashMap<ui64, ui64> assignedShardsCount; - auto& stage = stageInfo.Meta.GetStage(stageInfo.Id); - - const auto& tableInfo = stageInfo.Meta.TableConstInfo; - const auto& keyTypes = tableInfo->KeyColumnTypes; - ui32 metaId = 0; - for (auto& op : stage.GetTableOps()) { - Y_VERIFY_DEBUG(stageInfo.Meta.TablePath == op.GetTable().GetPath()); - - auto columns = BuildKqpColumns(op, tableInfo); - auto partitions = PrunePartitions(op, stageInfo, HolderFactory(), TypeEnv()); - const bool isOlapScan = (op.GetTypeCase() == NKqpProto::TKqpPhyTableOperation::kReadOlapRange); - auto readSettings = ExtractReadSettings(op, stageInfo, HolderFactory(), TypeEnv()); - - if (op.GetTypeCase() == NKqpProto::TKqpPhyTableOperation::kReadRange) { - stageInfo.Meta.SkipNullKeys.assign(op.GetReadRange().GetSkipNullKeys().begin(), - op.GetReadRange().GetSkipNullKeys().end()); - // not supported for scan queries - YQL_ENSURE(!readSettings.Reverse); - } - - for (auto&& i: partitions) { - const ui64 nodeId = ShardIdToNodeId.at(i.first); - nodeShards[nodeId].emplace_back(TShardInfoWithId(i.first, std::move(i.second))); - } - - if (Stats && CollectProfileStats(Request.StatsMode)) { - for (auto&& i : nodeShards) { - Stats->AddNodeShardsCount(stageInfo.Id.StageId, i.first, i.second.size()); - } - } - - if (!AppData()->FeatureFlags.GetEnableSeparationComputeActorsFromRead() || (!isOlapScan && readSettings.Sorted)) { - for (auto&& pair : nodeShards) { - auto& shardsInfo = pair.second; - for (auto&& shardInfo : shardsInfo) { - auto& task = AssignTaskToShard(stageInfo, shardInfo.ShardId, nodeTasks, assignedShardsCount, readSettings.Sorted, isOlapScan); - MergeToTaskMeta(task.Meta, shardInfo, readSettings, columns, op); - } - } - - for (const auto& pair : nodeTasks) { - for (const auto& taskIdx : pair.second) { - auto& task = TasksGraph.GetTask(taskIdx); - task.Meta.SetEnableShardsSequentialScan(readSettings.Sorted); - PrepareMetaForUsage(task.Meta, keyTypes); - } - } - - } else { - for (auto&& pair : nodeShards) { - const auto nodeId = pair.first; - auto& shardsInfo = pair.second; - const ui32 metaGlueingId = ++metaId; - TTaskMeta meta; - { - for (auto&& shardInfo : shardsInfo) { - MergeToTaskMeta(meta, shardInfo, readSettings, columns, op); - } - PrepareMetaForUsage(meta, keyTypes); - LOG_D("Stage " << stageInfo.Id << " create scan task meta for node: " << nodeId - << ", meta: " << meta.ToString(keyTypes, *AppData()->TypeRegistry)); - } - for (ui32 t = 0; t < GetTasksPerNode(stageInfo, isOlapScan, nodeId); ++t) { - auto& task = TasksGraph.AddTask(stageInfo); - task.Meta = meta; - task.Meta.SetEnableShardsSequentialScan(false); - task.Meta.ExecuterId = SelfId(); - task.Meta.NodeId = nodeId; - task.Meta.ScanTask = true; - task.Meta.Type = TTaskMeta::TTaskType::Scan; - task.SetMetaId(metaGlueingId); - } - } - } - - } - - LOG_D("Stage " << stageInfo.Id << " will be executed on " << nodeTasks.size() << " nodes."); - - } - void BuildComputeTasks(TStageInfo& stageInfo) { auto& stage = stageInfo.Meta.GetStage(stageInfo.Id); @@ -506,10 +249,11 @@ private: LOG_E("Can not find default state storage group for database " << Database); } - Execute(std::move(ev->Get()->Snapshot)); + ResourcesSnapshot = std::move(ev->Get()->Snapshot); + Execute(); } - void Execute(TVector<NKikimrKqp::TKqpNodeResources>&& snapshot) { + void Execute() { LWTRACK(KqpScanExecuterStartExecute, ResponseEv->Orbit, TxId); NWilson::TSpan prepareTasksSpan(TWilsonKqp::ScanExecuterPrepareTasks, ExecuterStateSpan.GetTraceId(), "PrepareTasks", NWilson::EFlags::AUTO_END); @@ -535,7 +279,8 @@ private: } else if (stageInfo.Meta.IsSysView()) { BuildSysViewScanTasks(stageInfo, {}); } else if (stageInfo.Meta.IsOlap() || stageInfo.Meta.IsDatashard()) { - BuildScanTasks(stageInfo); + HasOlapTable = true; + BuildScanTasksFromShards(stageInfo); } else { YQL_ENSURE(false, "Unexpected stage type " << (int) stageInfo.Meta.TableKind); } @@ -609,7 +354,7 @@ private: LOG_D("TotalShardScans: " << nShardScans); - ExecuteScanTx(std::move(snapshot)); + ExecuteScanTx(); Become(&TKqpScanExecuter::ExecuteState); if (ExecuterStateSpan) { @@ -649,11 +394,12 @@ public: } private: - void ExecuteScanTx(TVector<NKikimrKqp::TKqpNodeResources>&& snapshot) { + void ExecuteScanTx() { Planner = CreateKqpPlanner(TasksGraph, TxId, SelfId(), GetSnapshot(), Database, UserToken, Deadline.GetOrElse(TInstant::Zero()), Request.StatsMode, AppData()->EnableKqpSpilling, - Request.RlPath, ExecuterSpan, std::move(snapshot), ExecuterRetriesConfig, false /* isDataQuery */, Request.MkqlMemoryLimit, nullptr, false, UserRequestContext); + Request.RlPath, ExecuterSpan, std::move(ResourcesSnapshot), ExecuterRetriesConfig, false /* isDataQuery */, + Request.MkqlMemoryLimit, nullptr, false, UserRequestContext); LOG_D("Execute scan tx, PendingComputeTasks: " << TasksGraph.GetTasks().size()); auto err = Planner->PlanExecution(); @@ -692,8 +438,6 @@ private: TBase::PassAway(); } -private: - const NKikimrConfig::TTableServiceConfig::TAggregationConfig AggregationSettings; }; } // namespace diff --git a/ydb/core/kqp/session_actor/kqp_query_state.h b/ydb/core/kqp/session_actor/kqp_query_state.h index d6c42aa1b54..ec96d32d892 100644 --- a/ydb/core/kqp/session_actor/kqp_query_state.h +++ b/ydb/core/kqp/session_actor/kqp_query_state.h @@ -238,6 +238,9 @@ public: bool NeedPersistentSnapshot() const { auto type = GetType(); + if (type == NKikimrKqp::QUERY_TYPE_SQL_GENERIC_CONCURRENT_QUERY) { + return ::NKikimr::NKqp::HasOlapTableInTx(PreparedQuery->GetPhysicalQuery()); + } return ( type == NKikimrKqp::QUERY_TYPE_SQL_SCAN || type == NKikimrKqp::QUERY_TYPE_AST_SCAN diff --git a/ydb/core/kqp/session_actor/kqp_tx.cpp b/ydb/core/kqp/session_actor/kqp_tx.cpp index 3c7695ae971..ec5accc1b0e 100644 --- a/ydb/core/kqp/session_actor/kqp_tx.cpp +++ b/ydb/core/kqp/session_actor/kqp_tx.cpp @@ -179,5 +179,18 @@ bool NeedSnapshot(const TKqpTransactionContext& txCtx, const NYql::TKikimrConfig return readPhases > 1; } +bool HasOlapTableInTx(const NKqpProto::TKqpPhyQuery& physicalQuery) { + for (const auto &tx : physicalQuery.GetTransactions()) { + for (const auto &stage : tx.GetStages()) { + for (const auto &tableOp : stage.GetTableOps()) { + if (tableOp.GetTypeCase() == NKqpProto::TKqpPhyTableOperation::kReadOlapRange) { + return true; + } + } + } + } + return false; +} + } // namespace NKqp } // namespace NKikimr diff --git a/ydb/core/kqp/session_actor/kqp_tx.h b/ydb/core/kqp/session_actor/kqp_tx.h index 642f5c07b6d..75d3f8778d9 100644 --- a/ydb/core/kqp/session_actor/kqp_tx.h +++ b/ydb/core/kqp/session_actor/kqp_tx.h @@ -386,6 +386,8 @@ std::pair<bool, std::vector<NYql::TIssue>> MergeLocks(const NKikimrMiniKQL::TTyp bool NeedSnapshot(const TKqpTransactionContext& txCtx, const NYql::TKikimrConfiguration& config, bool rollbackTx, bool commitTx, const NKqpProto::TKqpPhyQuery& physicalQuery); +bool HasOlapTableInTx(const NKqpProto::TKqpPhyQuery& physicalQuery); + } // namespace NKikimr::NKqp template<> diff --git a/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp b/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp index f34c57c9677..714b88018f4 100644 --- a/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp +++ b/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp @@ -1,5 +1,6 @@ #include <ydb/core/kqp/ut/common/kqp_ut_common.h> #include <ydb/public/sdk/cpp/client/draft/ydb_long_tx.h> +#include <ydb/public/sdk/cpp/client/ydb_query/client.h> #include <ydb/core/sys_view/service/query_history.h> #include <ydb/core/tx/columnshard/columnshard_ut_common.h> @@ -1437,7 +1438,7 @@ Y_UNIT_TEST_SUITE(KqpOlap) { auto csController = NYDBTest::TControllers::RegisterCSControllerGuard<NYDBTest::NColumnShard::TController>(); auto rows = ExecuteScanQuery(tableClient, selectQuery); - + TInstant tsPrev = TInstant::MicroSeconds(2000000); std::set<ui64> results = { 1000096, 1000097, 1000098, 1000099, 1000999, 1001000, @@ -4934,7 +4935,7 @@ Y_UNIT_TEST_SUITE(KqpOlap) { TestTableWithNulls({ testCase }); } - Y_UNIT_TEST(Olap_InsertFails) { + Y_UNIT_TEST(Olap_InsertFailsOnDataQuery) { auto settings = TKikimrSettings() .SetWithSampleTables(false) .SetForceColumnTablesCompositeMarks(true); @@ -4951,7 +4952,29 @@ Y_UNIT_TEST_SUITE(KqpOlap) { INSERT INTO `/Root/tableWithNulls`(id, resource_id, level) VALUES(1, "1", 1); )", TTxControl::BeginTx().CommitTx()).GetValueSync(); - UNIT_ASSERT_C(!result.IsSuccess(), result.GetIssues().ToString()); + UNIT_ASSERT(!result.IsSuccess()); + std::string errorMsg = result.GetIssues().ToString(); + UNIT_ASSERT_C(errorMsg.find("Data manipulation queries do not support column shard tables.") != std::string::npos, errorMsg); + } + + Y_UNIT_TEST(Olap_InsertFailsOnGenericQuery) { + auto settings = TKikimrSettings() + .SetWithSampleTables(false) + .SetForceColumnTablesCompositeMarks(true); + TKikimrRunner kikimr(settings); + + Tests::NCommon::TLoggerInit(kikimr).Initialize(); + TTableWithNullsHelper(kikimr).CreateTableWithNulls(); + + auto db = kikimr.GetQueryClient(); + + auto result = db.ExecuteQuery(R"( + INSERT INTO `/Root/tableWithNulls`(id, resource_id, level) VALUES(1, "1", 1); + )", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync(); + + UNIT_ASSERT(!result.IsSuccess()); + std::string errorMsg = result.GetIssues().ToString(); + UNIT_ASSERT_C(errorMsg.find("Data manipulation queries do not support column shard tables.") != std::string::npos, errorMsg); } Y_UNIT_TEST(OlapRead_FailsOnDataQuery) { @@ -5025,6 +5048,83 @@ Y_UNIT_TEST_SUITE(KqpOlap) { UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); } + + Y_UNIT_TEST(OlapRead_UsesGenericQueryOnJoinWithDataShardTable) { + auto settings = TKikimrSettings() + .SetWithSampleTables(false) + .SetForceColumnTablesCompositeMarks(true); + TKikimrRunner kikimr(settings); + + Tests::NCommon::TLoggerInit(kikimr).Initialize(); + TTableWithNullsHelper(kikimr).CreateTableWithNulls(); + TLocalHelper(kikimr).CreateTestOlapTable(); + + { + WriteTestDataForTableWithNulls(kikimr, "/Root/tableWithNulls"); + WriteTestData(kikimr, "/Root/olapStore/olapTable", 0, 1000000, 2); + } + + auto db = kikimr.GetQueryClient(); + auto result = db.ExecuteQuery(R"( + SELECT timestamp, resource_id, uid, level FROM `/Root/olapStore/olapTable` WHERE resource_id IN (SELECT CAST(id AS Utf8) FROM `/Root/tableWithNulls`); + )", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync(); + + UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); + TString output = FormatResultSetYson(result.GetResultSet(0)); + Cout << output << Endl; + CompareYson(output, R"([[1000001u;["1"];["uid_1000001"];[1]]])"); + } + + Y_UNIT_TEST(OlapRead_GenericQuery) { + auto settings = TKikimrSettings() + .SetWithSampleTables(false) + .SetForceColumnTablesCompositeMarks(true); + TKikimrRunner kikimr(settings); + + Tests::NCommon::TLoggerInit(kikimr).Initialize(); + TTableWithNullsHelper(kikimr).CreateTableWithNulls(); + TLocalHelper(kikimr).CreateTestOlapTable(); + + { + WriteTestDataForTableWithNulls(kikimr, "/Root/tableWithNulls"); + } + + auto db = kikimr.GetQueryClient(); + + auto result = db.ExecuteQuery(R"( + SELECT COUNT(*) FROM `/Root/tableWithNulls`; + )", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync(); + + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + TString output = FormatResultSetYson(result.GetResultSet(0)); + Cout << output << Endl; + CompareYson(output, R"([[10u;]])"); + } + + Y_UNIT_TEST(OlapRead_ScanQuery) { + auto settings = TKikimrSettings() + .SetWithSampleTables(false) + .SetForceColumnTablesCompositeMarks(true); + TKikimrRunner kikimr(settings); + + Tests::NCommon::TLoggerInit(kikimr).Initialize(); + TTableWithNullsHelper(kikimr).CreateTableWithNulls(); + TLocalHelper(kikimr).CreateTestOlapTable(); + + { + WriteTestDataForTableWithNulls(kikimr, "/Root/tableWithNulls"); + } + + NScripting::TScriptingClient client(kikimr.GetDriver()); + auto result = client.ExecuteYqlScript(R"( + SELECT COUNT(*) FROM `/Root/tableWithNulls`; + )").GetValueSync(); + + UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); + TString output = FormatResultSetYson(result.GetResultSet(0)); + Cout << output << Endl; + CompareYson(output, R"([[10u;]])"); + } } } // namespace NKqp |
