summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authoraidarsamer <[email protected]>2023-10-03 11:28:23 +0300
committeraidarsamer <[email protected]>2023-10-03 12:17:24 +0300
commit23622625eaff1282a6747c8a0b5c65b64ae04b23 (patch)
tree45993192385522125346b9bae07b89d36ac2d721
parentd6e6ee8dea5c7a1854889fb47138c835a1693a2a (diff)
KIKIMR-19041: Add support to read OLAP tables using Generic Queries
-rw-r--r--ydb/core/kqp/executer_actor/kqp_data_executer.cpp95
-rw-r--r--ydb/core/kqp/executer_actor/kqp_executer_impl.cpp6
-rw-r--r--ydb/core/kqp/executer_actor/kqp_executer_impl.h293
-rw-r--r--ydb/core/kqp/executer_actor/kqp_scan_executer.cpp280
-rw-r--r--ydb/core/kqp/session_actor/kqp_query_state.h3
-rw-r--r--ydb/core/kqp/session_actor/kqp_tx.cpp13
-rw-r--r--ydb/core/kqp/session_actor/kqp_tx.h2
-rw-r--r--ydb/core/kqp/ut/olap/kqp_olap_ut.cpp106
8 files changed, 482 insertions, 316 deletions
diff --git a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp
index 916984da92d..35034a56d8f 100644
--- a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp
+++ b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp
@@ -128,8 +128,11 @@ public:
const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& executerRetriesConfig,
NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory,
const NKikimrConfig::TTableServiceConfig::EChannelTransportVersion chanTransportVersion,
+ const NKikimrConfig::TTableServiceConfig::TAggregationConfig& aggregation,
const TActorId& creator, TDuration maximalSecretsSnapshotWaitTime, const TIntrusivePtr<TUserRequestContext>& userRequestContext)
- : TBase(std::move(request), database, userToken, counters, executerRetriesConfig, chanTransportVersion, maximalSecretsSnapshotWaitTime, userRequestContext, TWilsonKqp::DataExecuter, "DataExecuter")
+ : TBase(std::move(request), database, userToken, counters, executerRetriesConfig, chanTransportVersion, aggregation,
+ maximalSecretsSnapshotWaitTime, userRequestContext, TWilsonKqp::DataExecuter, "DataExecuter"
+ )
, AsyncIoFactory(std::move(asyncIoFactory))
, StreamResult(streamResult)
{
@@ -184,6 +187,7 @@ public:
ReadOnlyTx &&
!ImmediateTx &&
!HasPersistentChannels &&
+ !HasOlapTable &&
(!Database.empty() || AppData()->EnableMvccSnapshotWithLegacyDomainRoot) &&
AppData()->FeatureFlags.GetEnableMvccSnapshotReads()
);
@@ -292,9 +296,9 @@ public:
switch (ev->GetTypeRewrite()) {
hFunc(TEvKqpExecuter::TEvTableResolveStatus, HandleResolve);
hFunc(TEvKqpExecuter::TEvShardsResolveStatus, HandleResolve);
+ hFunc(TEvPrivate::TEvResourcesSnapshot, HandleResolve);
hFunc(TEvKqp::TEvAbortExecution, HandleAbortExecution);
hFunc(NMetadata::NProvider::TEvRefreshSubscriberData, HandleRefreshSubscriberData);
- hFunc(TEvPrivate::TEvResourcesSnapshot, HandleResolve);
hFunc(NActors::TEvents::TEvWakeup, HandleSecretsWaitingTimeout);
default:
UnexpectedEvent("WaitResolveState", ev->GetTypeRewrite());
@@ -1338,18 +1342,6 @@ private:
return true;
}
- void FillGeneralReadInfo(TTaskMeta& taskMeta, ui64 itemsLimit, bool reverse) {
- if (taskMeta.Reads && !taskMeta.Reads.GetRef().empty()) {
- // Validate parameters
- YQL_ENSURE(taskMeta.ReadInfo.ItemsLimit == itemsLimit);
- YQL_ENSURE(taskMeta.ReadInfo.Reverse == reverse);
- return;
- }
-
- taskMeta.ReadInfo.ItemsLimit = itemsLimit;
- taskMeta.ReadInfo.Reverse = reverse;
- };
-
void BuildDatashardTasks(TStageInfo& stageInfo, const TMap<TString, TString>& secureParams) {
THashMap<ui64, ui64> shardTasks; // shardId -> taskId
auto& stage = stageInfo.Meta.GetStage(stageInfo.Id);
@@ -1387,16 +1379,8 @@ private:
YQL_ENSURE(!shardInfo.KeyWriteRanges);
auto& task = getShardTask(shardId);
- FillGeneralReadInfo(task.Meta, readSettings.ItemsLimit, readSettings.Reverse);
-
- TTaskMeta::TShardReadInfo readInfo;
- readInfo.Ranges = std::move(*shardInfo.KeyReadRanges);
- readInfo.Columns = columns;
-
- if (!task.Meta.Reads) {
- task.Meta.Reads.ConstructInPlace();
- }
- task.Meta.Reads->emplace_back(std::move(readInfo));
+ MergeReadInfoToTaskMeta(task.Meta, shardId, shardInfo.KeyReadRanges, readSettings,
+ columns, op, /*isPersistentScan*/ false);
}
break;
@@ -1660,7 +1644,7 @@ private:
void DoExecute() {
TVector<TString> secretNames;
for (const auto& transaction : Request.Transactions) {
- for (const auto& secretName : transaction.Body->GetSecretNames()) {
+ for (const auto& secretName : transaction.Body->GetSecretNames()) {
SecretSnapshotRequired = true;
secretNames.push_back(secretName);
}
@@ -1668,7 +1652,7 @@ private:
if (stage.SourcesSize() > 0 && stage.GetSources(0).GetTypeCase() == NKqpProto::TKqpSource::kExternalSource) {
ResourceSnapshotRequired = true;
HasExternalSources = true;
- }
+ }
}
}
@@ -1690,6 +1674,18 @@ private:
}
}
+ bool HassDmlOperationOnOlap(NKqpProto::TKqpPhyTx_EType queryType, const NKqpProto::TKqpPhyStage& stage) {
+ if (queryType == NKqpProto::TKqpPhyTx::TYPE_DATA) {
+ return true;
+ }
+ for (const auto &tableOp : stage.GetTableOps()) {
+ if (tableOp.GetTypeCase() != NKqpProto::TKqpPhyTableOperation::kReadOlapRange) {
+ return true;
+ }
+ }
+ return false;
+ }
+
void Execute() {
NWilson::TSpan prepareTasksSpan(TWilsonKqp::DataExecuterPrepareTasks, ExecuterStateSpan.GetTraceId(), "PrepareTasks", NWilson::EFlags::AUTO_END);
LWTRACK(KqpDataExecuterStartExecute, ResponseEv->Orbit, TxId);
@@ -1722,7 +1718,7 @@ private:
}
}
- if (stageInfo.Meta.IsOlap() && tx.Body->GetType() == NKqpProto::TKqpPhyTx::TYPE_DATA) {
+ if (stageInfo.Meta.IsOlap() && HassDmlOperationOnOlap(tx.Body->GetType(), stage)) {
auto error = TStringBuilder() << "Data manipulation queries do not support column shard tables.";
LOG_E(error);
ReplyErrorAndDie(Ydb::StatusIds::PRECONDITION_FAILED,
@@ -1747,6 +1743,8 @@ private:
default:
YQL_ENSURE(false, "unknown source type");
}
+ } else if (StreamResult && stageInfo.Meta.IsOlap()) {
+ BuildScanTasksFromShards(stageInfo);
} else if (stageInfo.Meta.ShardOperations.empty()) {
BuildComputeTasks(stageInfo, secureParams);
} else if (stageInfo.Meta.IsSysView()) {
@@ -1889,11 +1887,43 @@ private:
void HandleResolve(TEvKqpExecuter::TEvTableResolveStatus::TPtr& ev) {
if (!TBase::HandleResolve(ev)) return;
+
+ TSet<ui64> shardIds;
+ if (StreamResult) {
+ for (auto& [stageId, stageInfo] : TasksGraph.GetStagesInfo()) {
+ if (stageInfo.Meta.IsOlap()) {
+ HasOlapTable = true;
+ break;
+ }
+ }
+ if (HasOlapTable) {
+ for (auto& [stageId, stageInfo] : TasksGraph.GetStagesInfo()) {
+ if (stageInfo.Meta.ShardKey) {
+ for (auto& partition : stageInfo.Meta.ShardKey->GetPartitions()) {
+ shardIds.insert(partition.ShardId);
+ }
+ }
+ }
+ if (shardIds) {
+ LOG_D("Start resolving tablets nodes... (" << shardIds.size() << ")");
+ auto kqpShardsResolver = CreateKqpShardsResolver(
+ this->SelfId(), TxId, false, std::move(shardIds));
+ KqpShardsResolverId = this->RegisterWithSameMailbox(kqpShardsResolver);
+ } else {
+ GetResourcesSnapshot();
+ }
+ return;
+ }
+ }
DoExecute();
}
void HandleResolve(TEvKqpExecuter::TEvShardsResolveStatus::TPtr& ev) {
if (!TBase::HandleResolve(ev)) return;
+ if (HasOlapTable) {
+ GetResourcesSnapshot();
+ return;
+ }
OnShardsResolve();
}
@@ -1903,7 +1933,7 @@ private:
auto longTxService = NLongTxService::MakeLongTxServiceID(SelfId().NodeId());
Send(longTxService, new NLongTxService::TEvLongTxService::TEvAcquireReadSnapshot(Database));
- LOG_T("Create temporary mvcc snapshot, ebcome WaitSnapshotState");
+ LOG_T("Create temporary mvcc snapshot, become WaitSnapshotState");
Become(&TKqpDataExecuter::WaitSnapshotState);
if (ExecuterStateSpan) {
ExecuterStateSpan.End();
@@ -2416,12 +2446,13 @@ private:
} // namespace
IActor* CreateKqpDataExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TString& database, const TIntrusiveConstPtr<NACLib::TUserToken>& userToken,
- TKqpRequestCounters::TPtr counters, bool streamResult, const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& executerRetriesConfig,
- NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory, const NKikimrConfig::TTableServiceConfig::EChannelTransportVersion chanTransportVersion,
- const TActorId& creator, TDuration maximalSecretsSnapshotWaitTime, const TIntrusivePtr<TUserRequestContext>& userRequestContext)
+ TKqpRequestCounters::TPtr counters, bool streamResult, const NKikimrConfig::TTableServiceConfig::TAggregationConfig& aggregation,
+ const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& executerRetriesConfig, NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory,
+ const NKikimrConfig::TTableServiceConfig::EChannelTransportVersion chanTransportVersion, const TActorId& creator,
+ TDuration maximalSecretsSnapshotWaitTime, const TIntrusivePtr<TUserRequestContext>& userRequestContext)
{
return new TKqpDataExecuter(std::move(request), database, userToken, counters, streamResult, executerRetriesConfig,
- std::move(asyncIoFactory), chanTransportVersion, creator, maximalSecretsSnapshotWaitTime, userRequestContext);
+ std::move(asyncIoFactory), chanTransportVersion, aggregation, creator, maximalSecretsSnapshotWaitTime, userRequestContext);
}
} // namespace NKqp
diff --git a/ydb/core/kqp/executer_actor/kqp_executer_impl.cpp b/ydb/core/kqp/executer_actor/kqp_executer_impl.cpp
index 7f4d45f06c7..ae38be06287 100644
--- a/ydb/core/kqp/executer_actor/kqp_executer_impl.cpp
+++ b/ydb/core/kqp/executer_actor/kqp_executer_impl.cpp
@@ -87,7 +87,7 @@ IActor* CreateKqpExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TSt
if (request.Transactions.empty()) {
// commit-only or rollback-only data transaction
YQL_ENSURE(request.LocksOp == ELocksOp::Commit || request.LocksOp == ELocksOp::Rollback);
- return CreateKqpDataExecuter(std::move(request), database, userToken, counters, false, executerRetriesConfig, std::move(asyncIoFactory), chanTransportVersion, creator, maximalSecretsSnapshotWaitTime, userRequestContext);
+ return CreateKqpDataExecuter(std::move(request), database, userToken, counters, false, aggregation, executerRetriesConfig, std::move(asyncIoFactory), chanTransportVersion, creator, maximalSecretsSnapshotWaitTime, userRequestContext);
}
TMaybe<NKqpProto::TKqpPhyTx::EType> txsType;
@@ -103,13 +103,13 @@ IActor* CreateKqpExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TSt
switch (*txsType) {
case NKqpProto::TKqpPhyTx::TYPE_COMPUTE:
case NKqpProto::TKqpPhyTx::TYPE_DATA:
- return CreateKqpDataExecuter(std::move(request), database, userToken, counters, false, executerRetriesConfig, std::move(asyncIoFactory), chanTransportVersion, creator, maximalSecretsSnapshotWaitTime, userRequestContext);
+ return CreateKqpDataExecuter(std::move(request), database, userToken, counters, false, aggregation, executerRetriesConfig, std::move(asyncIoFactory), chanTransportVersion, creator, maximalSecretsSnapshotWaitTime, userRequestContext);
case NKqpProto::TKqpPhyTx::TYPE_SCAN:
return CreateKqpScanExecuter(std::move(request), database, userToken, counters, aggregation, executerRetriesConfig, preparedQuery, chanTransportVersion, maximalSecretsSnapshotWaitTime, userRequestContext);
case NKqpProto::TKqpPhyTx::TYPE_GENERIC:
- return CreateKqpDataExecuter(std::move(request), database, userToken, counters, true, executerRetriesConfig, std::move(asyncIoFactory), chanTransportVersion, creator, maximalSecretsSnapshotWaitTime, userRequestContext);
+ return CreateKqpDataExecuter(std::move(request), database, userToken, counters, true, aggregation, executerRetriesConfig, std::move(asyncIoFactory), chanTransportVersion, creator, maximalSecretsSnapshotWaitTime, userRequestContext);
default:
YQL_ENSURE(false, "Unsupported physical tx type: " << (ui32)*txsType);
diff --git a/ydb/core/kqp/executer_actor/kqp_executer_impl.h b/ydb/core/kqp/executer_actor/kqp_executer_impl.h
index 1a1aeddaf95..5c0473abcbb 100644
--- a/ydb/core/kqp/executer_actor/kqp_executer_impl.h
+++ b/ydb/core/kqp/executer_actor/kqp_executer_impl.h
@@ -117,6 +117,7 @@ public:
TKqpRequestCounters::TPtr counters,
const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& executerRetriesConfig,
const NKikimrConfig::TTableServiceConfig::EChannelTransportVersion chanTransportVersion,
+ const NKikimrConfig::TTableServiceConfig::TAggregationConfig& aggregation,
TDuration maximalSecretsSnapshotWaitTime, const TIntrusivePtr<TUserRequestContext>& userRequestContext,
ui64 spanVerbosity = 0, TString spanName = "no_name")
: Request(std::move(request))
@@ -128,6 +129,8 @@ public:
, ExecuterRetriesConfig(executerRetriesConfig)
, MaximalSecretsSnapshotWaitTime(maximalSecretsSnapshotWaitTime)
, UserRequestContext(userRequestContext)
+ , AggregationSettings(aggregation)
+ , HasOlapTable(false)
{
TasksGraph.GetMeta().Snapshot = IKqpGateway::TKqpSnapshot(Request.Snapshot.Step, Request.Snapshot.TxId);
TasksGraph.GetMeta().Arena = MakeIntrusive<NActors::TProtoArenaHolder>();
@@ -402,7 +405,7 @@ protected:
}
Secrets = ev->Get()->GetSnapshotPtrAs<NMetadata::NSecret::TSnapshot>();
-
+
TString secretValue;
for (const TString& secretName : SecretNames) {
auto secretId = NMetadata::NSecret::TSecretId(UserToken->GetUserSID(), secretName);
@@ -448,14 +451,6 @@ protected:
return secureParams;
}
- void GetResourcesSnapshot() {
- GetKqpResourceManager()->RequestClusterResourcesInfo(
- [as = TlsActivationContext->ActorSystem(), self = SelfId()](TVector<NKikimrKqp::TKqpNodeResources>&& resources) {
- TAutoPtr<IEventHandle> eh = new IEventHandle(self, self, new TEvPrivate::TEvResourcesSnapshot(std::move(resources)));
- as->Send(eh);
- });
- }
-
protected:
bool CheckExecutionComplete() {
if (Planner && Planner->GetPendingComputeActors().empty() && Planner->GetPendingComputeTasks().empty()) {
@@ -835,7 +830,7 @@ protected:
auto sourceName = externalSource.GetSourceName();
TString structuredToken;
if (sourceName) {
- structuredToken = NYql::CreateStructuredTokenParser(externalSource.GetAuthInfo()).ToBuilder().ReplaceReferences(secureParams).ToJson();
+ structuredToken = NYql::CreateStructuredTokenParser(externalSource.GetAuthInfo()).ToBuilder().ReplaceReferences(secureParams).ToJson();
}
TVector<ui64> tasksIds;
@@ -1012,6 +1007,279 @@ protected:
}
}
+ void FillReadInfo(TTaskMeta& taskMeta, ui64 itemsLimit, bool reverse, bool sorted) const
+ {
+ if (taskMeta.Reads && !taskMeta.Reads.GetRef().empty()) {
+ // Validate parameters
+ YQL_ENSURE(taskMeta.ReadInfo.ItemsLimit == itemsLimit);
+ YQL_ENSURE(taskMeta.ReadInfo.Reverse == reverse);
+ return;
+ }
+
+ taskMeta.ReadInfo.ItemsLimit = itemsLimit;
+ taskMeta.ReadInfo.Reverse = reverse;
+ taskMeta.ReadInfo.Sorted = sorted;
+ taskMeta.ReadInfo.ReadType = TTaskMeta::TReadInfo::EReadType::Rows;
+ }
+
+ TTaskMeta::TReadInfo::EReadType OlapReadTypeFromProto(const NKqpProto::TKqpPhyOpReadOlapRanges::EReadType& type) const {
+ switch (type) {
+ case NKqpProto::TKqpPhyOpReadOlapRanges::ROWS:
+ return TTaskMeta::TReadInfo::EReadType::Rows;
+ case NKqpProto::TKqpPhyOpReadOlapRanges::BLOCKS:
+ return TTaskMeta::TReadInfo::EReadType::Blocks;
+ default:
+ YQL_ENSURE(false, "Invalid read type from TKqpPhyOpReadOlapRanges protobuf.");
+ }
+ }
+
+ void FillOlapReadInfo(TTaskMeta& taskMeta, NKikimr::NMiniKQL::TType* resultType, const TMaybe<::NKqpProto::TKqpPhyOpReadOlapRanges>& readOlapRange) const {
+ if (taskMeta.Reads && !taskMeta.Reads.GetRef().empty()) {
+ // Validate parameters
+ if (!readOlapRange || readOlapRange->GetOlapProgram().empty()) {
+ YQL_ENSURE(taskMeta.ReadInfo.OlapProgram.Program.empty());
+ return;
+ }
+
+ YQL_ENSURE(taskMeta.ReadInfo.OlapProgram.Program == readOlapRange->GetOlapProgram());
+ return;
+ }
+
+ if (resultType) {
+ YQL_ENSURE(resultType->GetKind() == NKikimr::NMiniKQL::TType::EKind::Struct
+ || resultType->GetKind() == NKikimr::NMiniKQL::TType::EKind::Tuple);
+
+ auto* resultStructType = static_cast<NKikimr::NMiniKQL::TStructType*>(resultType);
+ ui32 resultColsCount = resultStructType->GetMembersCount();
+
+ taskMeta.ReadInfo.ResultColumnsTypes.reserve(resultColsCount);
+ for (ui32 i = 0; i < resultColsCount; ++i) {
+ auto memberType = resultStructType->GetMemberType(i);
+ if (memberType->GetKind() == NKikimr::NMiniKQL::TType::EKind::Optional) {
+ memberType = static_cast<NKikimr::NMiniKQL::TOptionalType*>(memberType)->GetItemType();
+ }
+ // TODO: support pg types
+ YQL_ENSURE(memberType->GetKind() == NKikimr::NMiniKQL::TType::EKind::Data,
+ "Expected simple data types to be read from column shard");
+ auto memberDataType = static_cast<NKikimr::NMiniKQL::TDataType*>(memberType);
+ taskMeta.ReadInfo.ResultColumnsTypes.push_back(NScheme::TTypeInfo(memberDataType->GetSchemeType()));
+ }
+ }
+
+ if (!readOlapRange || readOlapRange->GetOlapProgram().empty()) {
+ return;
+ }
+ taskMeta.ReadInfo.ReadType = OlapReadTypeFromProto(readOlapRange->GetReadType());
+ taskMeta.ReadInfo.OlapProgram.Program = readOlapRange->GetOlapProgram();
+ for (auto& name: readOlapRange->GetOlapProgramParameterNames()) {
+ taskMeta.ReadInfo.OlapProgram.ParameterNames.insert(name);
+ }
+ }
+
+ void MergeReadInfoToTaskMeta(TTaskMeta& meta, ui64 shardId, TMaybe<TShardKeyRanges>& keyReadRanges, const TPhysicalShardReadSettings& readSettings, const TVector<TTaskMeta::TColumn>& columns,
+ const NKqpProto::TKqpPhyTableOperation& op, bool isPersistentScan) const
+ {
+ TTaskMeta::TShardReadInfo readInfo = {
+ .Columns = columns,
+ };
+ if (keyReadRanges) {
+ readInfo.Ranges = std::move(*keyReadRanges); // sorted & non-intersecting
+ }
+
+ if (isPersistentScan) {
+ readInfo.ShardId = shardId;
+ }
+
+ FillReadInfo(meta, readSettings.ItemsLimit, readSettings.Reverse, readSettings.Sorted);
+ if (op.GetTypeCase() == NKqpProto::TKqpPhyTableOperation::kReadOlapRange) {
+ FillOlapReadInfo(meta, readSettings.ResultType, op.GetReadOlapRange());
+ }
+
+ if (!meta.Reads) {
+ meta.Reads.ConstructInPlace();
+ }
+
+ meta.Reads->emplace_back(std::move(readInfo));
+ }
+
+ ui32 GetScanTasksPerNode(TStageInfo& stageInfo, const bool isOlapScan, const ui64 /*nodeId*/) const {
+ ui32 result = 0;
+ if (isOlapScan) {
+ if (AggregationSettings.HasCSScanThreadsPerNode()) {
+ result = AggregationSettings.GetCSScanThreadsPerNode();
+ } else {
+ const TStagePredictor& predictor = stageInfo.Meta.Tx.Body->GetCalculationPredictor(stageInfo.Id.StageId);
+ result = predictor.CalcTasksOptimalCount(TStagePredictor::GetUsableThreads(), {});
+ }
+ } else {
+ const auto& stage = stageInfo.Meta.GetStage(stageInfo.Id);
+ result = AggregationSettings.GetDSScanMinimalThreads();
+ if (stage.GetProgram().GetSettings().GetHasSort()) {
+ result = std::max(result, AggregationSettings.GetDSBaseSortScanThreads());
+ }
+ if (stage.GetProgram().GetSettings().GetHasMapJoin()) {
+ result = std::max(result, AggregationSettings.GetDSBaseJoinScanThreads());
+ }
+ }
+ return Max<ui32>(1, result);
+ }
+
+ TTask& AssignScanTaskToShard(
+ TStageInfo& stageInfo, const ui64 shardId,
+ THashMap<ui64, std::vector<ui64>>& nodeTasks,
+ THashMap<ui64, ui64>& assignedShardsCount,
+ const bool sorted, const bool isOlapScan)
+ {
+ ui64 nodeId = ShardIdToNodeId.at(shardId);
+ if (stageInfo.Meta.IsOlap() && sorted) {
+ auto& task = TasksGraph.AddTask(stageInfo);
+ task.Meta.ExecuterId = SelfId();
+ task.Meta.NodeId = nodeId;
+ task.Meta.ScanTask = true;
+ task.Meta.Type = TTaskMeta::TTaskType::Scan;
+ return task;
+ }
+
+ auto& tasks = nodeTasks[nodeId];
+ auto& cnt = assignedShardsCount[nodeId];
+ const ui32 maxScansPerNode = GetScanTasksPerNode(stageInfo, isOlapScan, nodeId);
+ if (cnt < maxScansPerNode) {
+ auto& task = TasksGraph.AddTask(stageInfo);
+ task.Meta.NodeId = nodeId;
+ task.Meta.ScanTask = true;
+ task.Meta.Type = TTaskMeta::TTaskType::Scan;
+ tasks.push_back(task.Id);
+ ++cnt;
+ return task;
+ } else {
+ ui64 taskIdx = cnt % maxScansPerNode;
+ ++cnt;
+ return TasksGraph.GetTask(tasks[taskIdx]);
+ }
+ }
+
+ void BuildScanTasksFromShards(TStageInfo& stageInfo) {
+ THashMap<ui64, std::vector<ui64>> nodeTasks;
+ THashMap<ui64, std::vector<TShardInfoWithId>> nodeShards;
+ THashMap<ui64, ui64> assignedShardsCount;
+ auto& stage = stageInfo.Meta.GetStage(stageInfo.Id);
+
+ const auto& tableInfo = stageInfo.Meta.TableConstInfo;
+ const auto& keyTypes = tableInfo->KeyColumnTypes;
+ ui32 metaId = 0;
+ for (auto& op : stage.GetTableOps()) {
+ Y_VERIFY_DEBUG(stageInfo.Meta.TablePath == op.GetTable().GetPath());
+
+ auto columns = BuildKqpColumns(op, tableInfo);
+ auto partitions = PrunePartitions(op, stageInfo, HolderFactory(), TypeEnv());
+ const bool isOlapScan = (op.GetTypeCase() == NKqpProto::TKqpPhyTableOperation::kReadOlapRange);
+ auto readSettings = ExtractReadSettings(op, stageInfo, HolderFactory(), TypeEnv());
+
+ if (op.GetTypeCase() == NKqpProto::TKqpPhyTableOperation::kReadRange) {
+ stageInfo.Meta.SkipNullKeys.assign(op.GetReadRange().GetSkipNullKeys().begin(),
+ op.GetReadRange().GetSkipNullKeys().end());
+ // not supported for scan queries
+ YQL_ENSURE(!readSettings.Reverse);
+ }
+
+ for (auto&& i: partitions) {
+ const ui64 nodeId = ShardIdToNodeId.at(i.first);
+ nodeShards[nodeId].emplace_back(TShardInfoWithId(i.first, std::move(i.second)));
+ }
+
+ if (Stats && CollectProfileStats(Request.StatsMode)) {
+ for (auto&& i : nodeShards) {
+ Stats->AddNodeShardsCount(stageInfo.Id.StageId, i.first, i.second.size());
+ }
+ }
+
+ if (!AppData()->FeatureFlags.GetEnableSeparationComputeActorsFromRead() || (!isOlapScan && readSettings.Sorted)) {
+ for (auto&& pair : nodeShards) {
+ auto& shardsInfo = pair.second;
+ for (auto&& shardInfo : shardsInfo) {
+ auto& task = AssignScanTaskToShard(stageInfo, shardInfo.ShardId, nodeTasks, assignedShardsCount, readSettings.Sorted, isOlapScan);
+ MergeReadInfoToTaskMeta(task.Meta, shardInfo.ShardId, shardInfo.KeyReadRanges, readSettings,
+ columns, op, /*isPersistentScan*/ true);
+ }
+ }
+
+ for (const auto& pair : nodeTasks) {
+ for (const auto& taskIdx : pair.second) {
+ auto& task = TasksGraph.GetTask(taskIdx);
+ task.Meta.SetEnableShardsSequentialScan(readSettings.Sorted);
+ PrepareScanMetaForUsage(task.Meta, keyTypes);
+ }
+ }
+
+ } else {
+ for (auto&& pair : nodeShards) {
+ const auto nodeId = pair.first;
+ auto& shardsInfo = pair.second;
+ const ui32 metaGlueingId = ++metaId;
+ TTaskMeta meta;
+ {
+ for (auto&& shardInfo : shardsInfo) {
+ MergeReadInfoToTaskMeta(meta, shardInfo.ShardId, shardInfo.KeyReadRanges, readSettings,
+ columns, op, /*isPersistentScan*/ true);
+ }
+ PrepareScanMetaForUsage(meta, keyTypes);
+ LOG_D("Stage " << stageInfo.Id << " create scan task meta for node: " << nodeId
+ << ", meta: " << meta.ToString(keyTypes, *AppData()->TypeRegistry));
+ }
+ for (ui32 t = 0; t < GetScanTasksPerNode(stageInfo, isOlapScan, nodeId); ++t) {
+ auto& task = TasksGraph.AddTask(stageInfo);
+ task.Meta = meta;
+ task.Meta.SetEnableShardsSequentialScan(false);
+ task.Meta.ExecuterId = SelfId();
+ task.Meta.NodeId = nodeId;
+ task.Meta.ScanTask = true;
+ task.Meta.Type = TTaskMeta::TTaskType::Scan;
+ task.SetMetaId(metaGlueingId);
+ }
+ }
+ }
+ }
+
+ LOG_D("Stage " << stageInfo.Id << " will be executed on " << nodeTasks.size() << " nodes.");
+ }
+
+ void PrepareScanMetaForUsage(TTaskMeta& meta, const TVector<NScheme::TTypeInfo>& keyTypes) const {
+ YQL_ENSURE(meta.Reads.Defined());
+ auto& taskReads = meta.Reads.GetRef();
+
+ /*
+ * Sort read ranges so that sequential scan of that ranges produce sorted result.
+ *
+ * Partition pruner feed us with set of non-intersecting ranges with filled right boundary.
+ * So we may sort ranges based solely on the their rightmost point.
+ */
+ std::sort(taskReads.begin(), taskReads.end(), [&](const auto& lhs, const auto& rhs) {
+ if (lhs.ShardId == rhs.ShardId) {
+ return false;
+ }
+
+ const std::pair<const TSerializedCellVec*, bool> k1 = lhs.Ranges.GetRightBorder();
+ const std::pair<const TSerializedCellVec*, bool> k2 = rhs.Ranges.GetRightBorder();
+
+ const int cmp = CompareBorders<false, false>(
+ k1.first->GetCells(),
+ k2.first->GetCells(),
+ k1.second,
+ k2.second,
+ keyTypes);
+
+ return (cmp < 0);
+ });
+ }
+
+ void GetResourcesSnapshot() {
+ GetKqpResourceManager()->RequestClusterResourcesInfo(
+ [as = TlsActivationContext->ActorSystem(), self = SelfId()](TVector<NKikimrKqp::TKqpNodeResources>&& resources) {
+ TAutoPtr<IEventHandle> eh = new IEventHandle(self, self, new typename TEvPrivate::TEvResourcesSnapshot(std::move(resources)));
+ as->Send(eh);
+ });
+ }
+
protected:
void TerminateComputeActors(Ydb::StatusIds::StatusCode code, const NYql::TIssues& issues) {
for (const auto& task : this->TasksGraph.GetTasks()) {
@@ -1306,6 +1574,10 @@ protected:
TIntrusivePtr<TUserRequestContext> UserRequestContext;
+ const NKikimrConfig::TTableServiceConfig::TAggregationConfig AggregationSettings;
+ TVector<NKikimrKqp::TKqpNodeResources> ResourcesSnapshot;
+ bool HasOlapTable;
+
private:
static constexpr TDuration ResourceUsageUpdateInterval = TDuration::MilliSeconds(100);
};
@@ -1314,6 +1586,7 @@ private:
IActor* CreateKqpDataExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TString& database,
const TIntrusiveConstPtr<NACLib::TUserToken>& userToken, TKqpRequestCounters::TPtr counters, bool streamResult,
+ const NKikimrConfig::TTableServiceConfig::TAggregationConfig& aggregation,
const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& executerRetriesConfig, NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory,
const NKikimrConfig::TTableServiceConfig::EChannelTransportVersion chanTransportVersion, const TActorId& creator,
TDuration maximalSecretsSnapshotWaitTime, const TIntrusivePtr<TUserRequestContext>& userRequestContext);
diff --git a/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp b/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp
index 4286719a7e7..e19abf395ba 100644
--- a/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp
+++ b/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp
@@ -37,17 +37,6 @@ using namespace NYql::NDq;
namespace {
-TTaskMeta::TReadInfo::EReadType ReadTypeFromProto(const NKqpProto::TKqpPhyOpReadOlapRanges::EReadType& type) {
- switch (type) {
- case NKqpProto::TKqpPhyOpReadOlapRanges::ROWS:
- return TTaskMeta::TReadInfo::EReadType::Rows;
- case NKqpProto::TKqpPhyOpReadOlapRanges::BLOCKS:
- return TTaskMeta::TReadInfo::EReadType::Blocks;
- default:
- YQL_ENSURE(false, "Invalid read type from TKqpPhyOpReadOlapRanges protobuf.");
- }
-}
-
class TKqpScanExecuter : public TKqpExecuterBase<TKqpScanExecuter, EExecType::Scan> {
using TBase = TKqpExecuterBase<TKqpScanExecuter, EExecType::Scan>;
TPreparedQueryHolder::TConstPtr PreparedQuery;
@@ -63,9 +52,10 @@ public:
TPreparedQueryHolder::TConstPtr preparedQuery,
const NKikimrConfig::TTableServiceConfig::EChannelTransportVersion chanTransportVersion,
TDuration maximalSecretsSnapshotWaitTime, const TIntrusivePtr<TUserRequestContext>& userRequestContext)
- : TBase(std::move(request), database, userToken, counters, executerRetriesConfig, chanTransportVersion, maximalSecretsSnapshotWaitTime, userRequestContext, TWilsonKqp::ScanExecuter, "ScanExecuter")
+ : TBase(std::move(request), database, userToken, counters, executerRetriesConfig, chanTransportVersion, aggregation,
+ maximalSecretsSnapshotWaitTime, userRequestContext, TWilsonKqp::ScanExecuter, "ScanExecuter"
+ )
, PreparedQuery(preparedQuery)
- , AggregationSettings(aggregation)
{
YQL_ENSURE(Request.Transactions.size() == 1);
YQL_ENSURE(Request.DataShardLocks.empty());
@@ -159,81 +149,6 @@ private:
private:
- void FillReadInfo(TTaskMeta& taskMeta, ui64 itemsLimit, bool reverse, bool sorted,
- NKikimr::NMiniKQL::TType* resultType, const TMaybe<::NKqpProto::TKqpPhyOpReadOlapRanges>& readOlapRange) const
- {
- if (taskMeta.Reads && !taskMeta.Reads.GetRef().empty()) {
- // Validate parameters
- YQL_ENSURE(taskMeta.ReadInfo.ItemsLimit == itemsLimit);
- YQL_ENSURE(taskMeta.ReadInfo.Reverse == reverse);
-
- if (!readOlapRange || readOlapRange->GetOlapProgram().empty()) {
- YQL_ENSURE(taskMeta.ReadInfo.OlapProgram.Program.empty());
- return;
- }
-
- YQL_ENSURE(taskMeta.ReadInfo.OlapProgram.Program == readOlapRange->GetOlapProgram());
- return;
- }
-
- taskMeta.ReadInfo.ItemsLimit = itemsLimit;
- taskMeta.ReadInfo.Reverse = reverse;
- taskMeta.ReadInfo.Sorted = sorted;
- taskMeta.ReadInfo.ReadType = TTaskMeta::TReadInfo::EReadType::Rows;
-
- if (resultType) {
- YQL_ENSURE(resultType->GetKind() == NKikimr::NMiniKQL::TType::EKind::Struct
- || resultType->GetKind() == NKikimr::NMiniKQL::TType::EKind::Tuple);
-
- auto* resultStructType = static_cast<NKikimr::NMiniKQL::TStructType*>(resultType);
- ui32 resultColsCount = resultStructType->GetMembersCount();
-
- taskMeta.ReadInfo.ResultColumnsTypes.reserve(resultColsCount);
- for (ui32 i = 0; i < resultColsCount; ++i) {
- auto memberType = resultStructType->GetMemberType(i);
- if (memberType->GetKind() == NKikimr::NMiniKQL::TType::EKind::Optional) {
- memberType = static_cast<NKikimr::NMiniKQL::TOptionalType*>(memberType)->GetItemType();
- }
- // TODO: support pg types
- YQL_ENSURE(memberType->GetKind() == NKikimr::NMiniKQL::TType::EKind::Data,
- "Expected simple data types to be read from column shard");
- auto memberDataType = static_cast<NKikimr::NMiniKQL::TDataType*>(memberType);
- taskMeta.ReadInfo.ResultColumnsTypes.push_back(NScheme::TTypeInfo(memberDataType->GetSchemeType()));
- }
- }
-
- if (!readOlapRange || readOlapRange->GetOlapProgram().empty()) {
- return;
- }
- taskMeta.ReadInfo.ReadType = ReadTypeFromProto(readOlapRange->GetReadType());
- taskMeta.ReadInfo.OlapProgram.Program = readOlapRange->GetOlapProgram();
- for (auto& name: readOlapRange->GetOlapProgramParameterNames()) {
- taskMeta.ReadInfo.OlapProgram.ParameterNames.insert(name);
- }
- };
-
- ui32 GetTasksPerNode(TStageInfo& stageInfo, const bool isOlapScan, const ui64 /*nodeId*/) const {
- ui32 result = 0;
- if (isOlapScan) {
- if (AggregationSettings.HasCSScanThreadsPerNode()) {
- result = AggregationSettings.GetCSScanThreadsPerNode();
- } else {
- const TStagePredictor& predictor = stageInfo.Meta.Tx.Body->GetCalculationPredictor(stageInfo.Id.StageId);
- result = predictor.CalcTasksOptimalCount(TStagePredictor::GetUsableThreads(), {});
- }
- } else {
- const auto& stage = stageInfo.Meta.GetStage(stageInfo.Id);
- result = AggregationSettings.GetDSScanMinimalThreads();
- if (stage.GetProgram().GetSettings().GetHasSort()) {
- result = std::max(result, AggregationSettings.GetDSBaseSortScanThreads());
- }
- if (stage.GetProgram().GetSettings().GetHasMapJoin()) {
- result = std::max(result, AggregationSettings.GetDSBaseJoinScanThreads());
- }
- }
- return Max<ui32>(1, result);
- }
-
ui32 GetMaxTasksAggregation(TStageInfo& stageInfo, const ui32 previousTasksCount, const ui32 nodesCount) const {
if (AggregationSettings.HasAggregationComputeThreads()) {
return std::max<ui32>(1, AggregationSettings.GetAggregationComputeThreads());
@@ -245,178 +160,6 @@ private:
}
}
- TTask& AssignTaskToShard(
- TStageInfo& stageInfo, const ui64 shardId,
- THashMap<ui64, std::vector<ui64>>& nodeTasks,
- THashMap<ui64, ui64>& assignedShardsCount,
- const bool sorted, const bool isOlapScan)
- {
- ui64 nodeId = ShardIdToNodeId.at(shardId);
- if (stageInfo.Meta.IsOlap() && sorted) {
- auto& task = TasksGraph.AddTask(stageInfo);
- task.Meta.ExecuterId = SelfId();
- task.Meta.NodeId = nodeId;
- task.Meta.ScanTask = true;
- task.Meta.Type = TTaskMeta::TTaskType::Scan;
- return task;
- }
-
- auto& tasks = nodeTasks[nodeId];
- auto& cnt = assignedShardsCount[nodeId];
- const ui32 maxScansPerNode = GetTasksPerNode(stageInfo, isOlapScan, nodeId);
- if (cnt < maxScansPerNode) {
- auto& task = TasksGraph.AddTask(stageInfo);
- task.Meta.NodeId = nodeId;
- task.Meta.ScanTask = true;
- task.Meta.Type = TTaskMeta::TTaskType::Scan;
- tasks.push_back(task.Id);
- ++cnt;
- return task;
- } else {
- ui64 taskIdx = cnt % maxScansPerNode;
- ++cnt;
- return TasksGraph.GetTask(tasks[taskIdx]);
- }
- }
-
- void MergeToTaskMeta(TTaskMeta& meta, TShardInfoWithId& shardInfo, const TPhysicalShardReadSettings& readSettings, const TVector<TTaskMeta::TColumn>& columns,
- const NKqpProto::TKqpPhyTableOperation& op) const {
- YQL_ENSURE(!shardInfo.KeyWriteRanges);
-
- TTaskMeta::TShardReadInfo readInfo = {
- .Ranges = std::move(*shardInfo.KeyReadRanges), // sorted & non-intersecting
- .Columns = columns,
- .ShardId = shardInfo.ShardId,
- };
-
- if (op.GetTypeCase() == NKqpProto::TKqpPhyTableOperation::kReadOlapRange) {
- const auto& readRange = op.GetReadOlapRange();
- FillReadInfo(meta, readSettings.ItemsLimit, readSettings.Reverse, readSettings.Sorted, readSettings.ResultType, readRange);
- } else {
- FillReadInfo(meta, readSettings.ItemsLimit, readSettings.Reverse, readSettings.Sorted, nullptr, TMaybe<::NKqpProto::TKqpPhyOpReadOlapRanges>());
- }
-
- if (!meta.Reads) {
- meta.Reads.ConstructInPlace();
- }
-
- meta.Reads->emplace_back(std::move(readInfo));
- }
-
- void PrepareMetaForUsage(TTaskMeta& meta, const TVector<NScheme::TTypeInfo>& keyTypes) const {
- YQL_ENSURE(meta.Reads.Defined());
- auto& taskReads = meta.Reads.GetRef();
-
- /*
- * Sort read ranges so that sequential scan of that ranges produce sorted result.
- *
- * Partition pruner feed us with set of non-intersecting ranges with filled right boundary.
- * So we may sort ranges based solely on the their rightmost point.
- */
- std::sort(taskReads.begin(), taskReads.end(), [&](const auto& lhs, const auto& rhs) {
- if (lhs.ShardId == rhs.ShardId) {
- return false;
- }
-
- const std::pair<const TSerializedCellVec*, bool> k1 = lhs.Ranges.GetRightBorder();
- const std::pair<const TSerializedCellVec*, bool> k2 = rhs.Ranges.GetRightBorder();
-
- const int cmp = CompareBorders<false, false>(
- k1.first->GetCells(),
- k2.first->GetCells(),
- k1.second,
- k2.second,
- keyTypes);
-
- return (cmp < 0);
- });
- }
-
- void BuildScanTasks(TStageInfo& stageInfo) {
- THashMap<ui64, std::vector<ui64>> nodeTasks;
- THashMap<ui64, std::vector<TShardInfoWithId>> nodeShards;
- THashMap<ui64, ui64> assignedShardsCount;
- auto& stage = stageInfo.Meta.GetStage(stageInfo.Id);
-
- const auto& tableInfo = stageInfo.Meta.TableConstInfo;
- const auto& keyTypes = tableInfo->KeyColumnTypes;
- ui32 metaId = 0;
- for (auto& op : stage.GetTableOps()) {
- Y_VERIFY_DEBUG(stageInfo.Meta.TablePath == op.GetTable().GetPath());
-
- auto columns = BuildKqpColumns(op, tableInfo);
- auto partitions = PrunePartitions(op, stageInfo, HolderFactory(), TypeEnv());
- const bool isOlapScan = (op.GetTypeCase() == NKqpProto::TKqpPhyTableOperation::kReadOlapRange);
- auto readSettings = ExtractReadSettings(op, stageInfo, HolderFactory(), TypeEnv());
-
- if (op.GetTypeCase() == NKqpProto::TKqpPhyTableOperation::kReadRange) {
- stageInfo.Meta.SkipNullKeys.assign(op.GetReadRange().GetSkipNullKeys().begin(),
- op.GetReadRange().GetSkipNullKeys().end());
- // not supported for scan queries
- YQL_ENSURE(!readSettings.Reverse);
- }
-
- for (auto&& i: partitions) {
- const ui64 nodeId = ShardIdToNodeId.at(i.first);
- nodeShards[nodeId].emplace_back(TShardInfoWithId(i.first, std::move(i.second)));
- }
-
- if (Stats && CollectProfileStats(Request.StatsMode)) {
- for (auto&& i : nodeShards) {
- Stats->AddNodeShardsCount(stageInfo.Id.StageId, i.first, i.second.size());
- }
- }
-
- if (!AppData()->FeatureFlags.GetEnableSeparationComputeActorsFromRead() || (!isOlapScan && readSettings.Sorted)) {
- for (auto&& pair : nodeShards) {
- auto& shardsInfo = pair.second;
- for (auto&& shardInfo : shardsInfo) {
- auto& task = AssignTaskToShard(stageInfo, shardInfo.ShardId, nodeTasks, assignedShardsCount, readSettings.Sorted, isOlapScan);
- MergeToTaskMeta(task.Meta, shardInfo, readSettings, columns, op);
- }
- }
-
- for (const auto& pair : nodeTasks) {
- for (const auto& taskIdx : pair.second) {
- auto& task = TasksGraph.GetTask(taskIdx);
- task.Meta.SetEnableShardsSequentialScan(readSettings.Sorted);
- PrepareMetaForUsage(task.Meta, keyTypes);
- }
- }
-
- } else {
- for (auto&& pair : nodeShards) {
- const auto nodeId = pair.first;
- auto& shardsInfo = pair.second;
- const ui32 metaGlueingId = ++metaId;
- TTaskMeta meta;
- {
- for (auto&& shardInfo : shardsInfo) {
- MergeToTaskMeta(meta, shardInfo, readSettings, columns, op);
- }
- PrepareMetaForUsage(meta, keyTypes);
- LOG_D("Stage " << stageInfo.Id << " create scan task meta for node: " << nodeId
- << ", meta: " << meta.ToString(keyTypes, *AppData()->TypeRegistry));
- }
- for (ui32 t = 0; t < GetTasksPerNode(stageInfo, isOlapScan, nodeId); ++t) {
- auto& task = TasksGraph.AddTask(stageInfo);
- task.Meta = meta;
- task.Meta.SetEnableShardsSequentialScan(false);
- task.Meta.ExecuterId = SelfId();
- task.Meta.NodeId = nodeId;
- task.Meta.ScanTask = true;
- task.Meta.Type = TTaskMeta::TTaskType::Scan;
- task.SetMetaId(metaGlueingId);
- }
- }
- }
-
- }
-
- LOG_D("Stage " << stageInfo.Id << " will be executed on " << nodeTasks.size() << " nodes.");
-
- }
-
void BuildComputeTasks(TStageInfo& stageInfo) {
auto& stage = stageInfo.Meta.GetStage(stageInfo.Id);
@@ -506,10 +249,11 @@ private:
LOG_E("Can not find default state storage group for database " << Database);
}
- Execute(std::move(ev->Get()->Snapshot));
+ ResourcesSnapshot = std::move(ev->Get()->Snapshot);
+ Execute();
}
- void Execute(TVector<NKikimrKqp::TKqpNodeResources>&& snapshot) {
+ void Execute() {
LWTRACK(KqpScanExecuterStartExecute, ResponseEv->Orbit, TxId);
NWilson::TSpan prepareTasksSpan(TWilsonKqp::ScanExecuterPrepareTasks, ExecuterStateSpan.GetTraceId(), "PrepareTasks", NWilson::EFlags::AUTO_END);
@@ -535,7 +279,8 @@ private:
} else if (stageInfo.Meta.IsSysView()) {
BuildSysViewScanTasks(stageInfo, {});
} else if (stageInfo.Meta.IsOlap() || stageInfo.Meta.IsDatashard()) {
- BuildScanTasks(stageInfo);
+ HasOlapTable = true;
+ BuildScanTasksFromShards(stageInfo);
} else {
YQL_ENSURE(false, "Unexpected stage type " << (int) stageInfo.Meta.TableKind);
}
@@ -609,7 +354,7 @@ private:
LOG_D("TotalShardScans: " << nShardScans);
- ExecuteScanTx(std::move(snapshot));
+ ExecuteScanTx();
Become(&TKqpScanExecuter::ExecuteState);
if (ExecuterStateSpan) {
@@ -649,11 +394,12 @@ public:
}
private:
- void ExecuteScanTx(TVector<NKikimrKqp::TKqpNodeResources>&& snapshot) {
+ void ExecuteScanTx() {
Planner = CreateKqpPlanner(TasksGraph, TxId, SelfId(), GetSnapshot(),
Database, UserToken, Deadline.GetOrElse(TInstant::Zero()), Request.StatsMode, AppData()->EnableKqpSpilling,
- Request.RlPath, ExecuterSpan, std::move(snapshot), ExecuterRetriesConfig, false /* isDataQuery */, Request.MkqlMemoryLimit, nullptr, false, UserRequestContext);
+ Request.RlPath, ExecuterSpan, std::move(ResourcesSnapshot), ExecuterRetriesConfig, false /* isDataQuery */,
+ Request.MkqlMemoryLimit, nullptr, false, UserRequestContext);
LOG_D("Execute scan tx, PendingComputeTasks: " << TasksGraph.GetTasks().size());
auto err = Planner->PlanExecution();
@@ -692,8 +438,6 @@ private:
TBase::PassAway();
}
-private:
- const NKikimrConfig::TTableServiceConfig::TAggregationConfig AggregationSettings;
};
} // namespace
diff --git a/ydb/core/kqp/session_actor/kqp_query_state.h b/ydb/core/kqp/session_actor/kqp_query_state.h
index d6c42aa1b54..ec96d32d892 100644
--- a/ydb/core/kqp/session_actor/kqp_query_state.h
+++ b/ydb/core/kqp/session_actor/kqp_query_state.h
@@ -238,6 +238,9 @@ public:
bool NeedPersistentSnapshot() const {
auto type = GetType();
+ if (type == NKikimrKqp::QUERY_TYPE_SQL_GENERIC_CONCURRENT_QUERY) {
+ return ::NKikimr::NKqp::HasOlapTableInTx(PreparedQuery->GetPhysicalQuery());
+ }
return (
type == NKikimrKqp::QUERY_TYPE_SQL_SCAN ||
type == NKikimrKqp::QUERY_TYPE_AST_SCAN
diff --git a/ydb/core/kqp/session_actor/kqp_tx.cpp b/ydb/core/kqp/session_actor/kqp_tx.cpp
index 3c7695ae971..ec5accc1b0e 100644
--- a/ydb/core/kqp/session_actor/kqp_tx.cpp
+++ b/ydb/core/kqp/session_actor/kqp_tx.cpp
@@ -179,5 +179,18 @@ bool NeedSnapshot(const TKqpTransactionContext& txCtx, const NYql::TKikimrConfig
return readPhases > 1;
}
+bool HasOlapTableInTx(const NKqpProto::TKqpPhyQuery& physicalQuery) {
+ for (const auto &tx : physicalQuery.GetTransactions()) {
+ for (const auto &stage : tx.GetStages()) {
+ for (const auto &tableOp : stage.GetTableOps()) {
+ if (tableOp.GetTypeCase() == NKqpProto::TKqpPhyTableOperation::kReadOlapRange) {
+ return true;
+ }
+ }
+ }
+ }
+ return false;
+}
+
} // namespace NKqp
} // namespace NKikimr
diff --git a/ydb/core/kqp/session_actor/kqp_tx.h b/ydb/core/kqp/session_actor/kqp_tx.h
index 642f5c07b6d..75d3f8778d9 100644
--- a/ydb/core/kqp/session_actor/kqp_tx.h
+++ b/ydb/core/kqp/session_actor/kqp_tx.h
@@ -386,6 +386,8 @@ std::pair<bool, std::vector<NYql::TIssue>> MergeLocks(const NKikimrMiniKQL::TTyp
bool NeedSnapshot(const TKqpTransactionContext& txCtx, const NYql::TKikimrConfiguration& config, bool rollbackTx,
bool commitTx, const NKqpProto::TKqpPhyQuery& physicalQuery);
+bool HasOlapTableInTx(const NKqpProto::TKqpPhyQuery& physicalQuery);
+
} // namespace NKikimr::NKqp
template<>
diff --git a/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp b/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp
index f34c57c9677..714b88018f4 100644
--- a/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp
+++ b/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp
@@ -1,5 +1,6 @@
#include <ydb/core/kqp/ut/common/kqp_ut_common.h>
#include <ydb/public/sdk/cpp/client/draft/ydb_long_tx.h>
+#include <ydb/public/sdk/cpp/client/ydb_query/client.h>
#include <ydb/core/sys_view/service/query_history.h>
#include <ydb/core/tx/columnshard/columnshard_ut_common.h>
@@ -1437,7 +1438,7 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
auto csController = NYDBTest::TControllers::RegisterCSControllerGuard<NYDBTest::NColumnShard::TController>();
auto rows = ExecuteScanQuery(tableClient, selectQuery);
-
+
TInstant tsPrev = TInstant::MicroSeconds(2000000);
std::set<ui64> results = { 1000096, 1000097, 1000098, 1000099,
1000999, 1001000,
@@ -4934,7 +4935,7 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
TestTableWithNulls({ testCase });
}
- Y_UNIT_TEST(Olap_InsertFails) {
+ Y_UNIT_TEST(Olap_InsertFailsOnDataQuery) {
auto settings = TKikimrSettings()
.SetWithSampleTables(false)
.SetForceColumnTablesCompositeMarks(true);
@@ -4951,7 +4952,29 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
INSERT INTO `/Root/tableWithNulls`(id, resource_id, level) VALUES(1, "1", 1);
)", TTxControl::BeginTx().CommitTx()).GetValueSync();
- UNIT_ASSERT_C(!result.IsSuccess(), result.GetIssues().ToString());
+ UNIT_ASSERT(!result.IsSuccess());
+ std::string errorMsg = result.GetIssues().ToString();
+ UNIT_ASSERT_C(errorMsg.find("Data manipulation queries do not support column shard tables.") != std::string::npos, errorMsg);
+ }
+
+ Y_UNIT_TEST(Olap_InsertFailsOnGenericQuery) {
+ auto settings = TKikimrSettings()
+ .SetWithSampleTables(false)
+ .SetForceColumnTablesCompositeMarks(true);
+ TKikimrRunner kikimr(settings);
+
+ Tests::NCommon::TLoggerInit(kikimr).Initialize();
+ TTableWithNullsHelper(kikimr).CreateTableWithNulls();
+
+ auto db = kikimr.GetQueryClient();
+
+ auto result = db.ExecuteQuery(R"(
+ INSERT INTO `/Root/tableWithNulls`(id, resource_id, level) VALUES(1, "1", 1);
+ )", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync();
+
+ UNIT_ASSERT(!result.IsSuccess());
+ std::string errorMsg = result.GetIssues().ToString();
+ UNIT_ASSERT_C(errorMsg.find("Data manipulation queries do not support column shard tables.") != std::string::npos, errorMsg);
}
Y_UNIT_TEST(OlapRead_FailsOnDataQuery) {
@@ -5025,6 +5048,83 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
}
+
+ Y_UNIT_TEST(OlapRead_UsesGenericQueryOnJoinWithDataShardTable) {
+ auto settings = TKikimrSettings()
+ .SetWithSampleTables(false)
+ .SetForceColumnTablesCompositeMarks(true);
+ TKikimrRunner kikimr(settings);
+
+ Tests::NCommon::TLoggerInit(kikimr).Initialize();
+ TTableWithNullsHelper(kikimr).CreateTableWithNulls();
+ TLocalHelper(kikimr).CreateTestOlapTable();
+
+ {
+ WriteTestDataForTableWithNulls(kikimr, "/Root/tableWithNulls");
+ WriteTestData(kikimr, "/Root/olapStore/olapTable", 0, 1000000, 2);
+ }
+
+ auto db = kikimr.GetQueryClient();
+ auto result = db.ExecuteQuery(R"(
+ SELECT timestamp, resource_id, uid, level FROM `/Root/olapStore/olapTable` WHERE resource_id IN (SELECT CAST(id AS Utf8) FROM `/Root/tableWithNulls`);
+ )", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync();
+
+ UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
+ TString output = FormatResultSetYson(result.GetResultSet(0));
+ Cout << output << Endl;
+ CompareYson(output, R"([[1000001u;["1"];["uid_1000001"];[1]]])");
+ }
+
+ Y_UNIT_TEST(OlapRead_GenericQuery) {
+ auto settings = TKikimrSettings()
+ .SetWithSampleTables(false)
+ .SetForceColumnTablesCompositeMarks(true);
+ TKikimrRunner kikimr(settings);
+
+ Tests::NCommon::TLoggerInit(kikimr).Initialize();
+ TTableWithNullsHelper(kikimr).CreateTableWithNulls();
+ TLocalHelper(kikimr).CreateTestOlapTable();
+
+ {
+ WriteTestDataForTableWithNulls(kikimr, "/Root/tableWithNulls");
+ }
+
+ auto db = kikimr.GetQueryClient();
+
+ auto result = db.ExecuteQuery(R"(
+ SELECT COUNT(*) FROM `/Root/tableWithNulls`;
+ )", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync();
+
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+ TString output = FormatResultSetYson(result.GetResultSet(0));
+ Cout << output << Endl;
+ CompareYson(output, R"([[10u;]])");
+ }
+
+ Y_UNIT_TEST(OlapRead_ScanQuery) {
+ auto settings = TKikimrSettings()
+ .SetWithSampleTables(false)
+ .SetForceColumnTablesCompositeMarks(true);
+ TKikimrRunner kikimr(settings);
+
+ Tests::NCommon::TLoggerInit(kikimr).Initialize();
+ TTableWithNullsHelper(kikimr).CreateTableWithNulls();
+ TLocalHelper(kikimr).CreateTestOlapTable();
+
+ {
+ WriteTestDataForTableWithNulls(kikimr, "/Root/tableWithNulls");
+ }
+
+ NScripting::TScriptingClient client(kikimr.GetDriver());
+ auto result = client.ExecuteYqlScript(R"(
+ SELECT COUNT(*) FROM `/Root/tableWithNulls`;
+ )").GetValueSync();
+
+ UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
+ TString output = FormatResultSetYson(result.GetResultSet(0));
+ Cout << output << Endl;
+ CompareYson(output, R"([[10u;]])");
+ }
}
} // namespace NKqp