diff options
author | ssmike <ssmike@ydb.tech> | 2023-01-31 03:05:48 +0300 |
---|---|---|
committer | ssmike <ssmike@ydb.tech> | 2023-01-31 03:05:48 +0300 |
commit | 58f259d0f1f2997f29bab23cee45b6fdfb2792d8 (patch) | |
tree | 5b00f4fa96d738379b2540437b1a35d171c54610 | |
parent | e1e1321e25787797767e2819e5eb3f06f3502733 (diff) | |
download | ydb-58f259d0f1f2997f29bab23cee45b6fdfb2792d8.tar.gz |
prepare to enable readranges source in trunk
-rw-r--r-- | ydb/core/kqp/executer_actor/kqp_data_executer.cpp | 5 | ||||
-rw-r--r-- | ydb/core/kqp/executer_actor/kqp_executer_impl.h | 12 | ||||
-rw-r--r-- | ydb/core/kqp/runtime/kqp_read_actor.cpp | 135 | ||||
-rw-r--r-- | ydb/core/protos/tx_datashard.proto | 4 | ||||
-rw-r--r-- | ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h | 2 | ||||
-rw-r--r-- | ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h | 7 |
6 files changed, 113 insertions, 52 deletions
diff --git a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp index c85dc3882c..801de6ac37 100644 --- a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp +++ b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp @@ -1661,8 +1661,9 @@ private: if (i64 msc = (i64) RequestControls.MaxShardCount; msc > 0) { shardsLimit = std::min(shardsLimit, (ui32) msc); } - if (shardsLimit > 0 && datashardTasks.size() > shardsLimit) { - LOG_W("Too many affected shards: datashardTasks=" << datashardTasks.size() << ", limit: " << shardsLimit); + size_t shards = datashardTasks.size() + remoteComputeTasks.size(); + if (shardsLimit > 0 && shards > shardsLimit) { + LOG_W("Too many affected shards: datashardTasks=" << shards << ", limit: " << shardsLimit); Counters->TxProxyMon->TxResultError->Inc(); ReplyErrorAndDie(Ydb::StatusIds::PRECONDITION_FAILED, YqlIssue({}, TIssuesIds::KIKIMR_PRECONDITION_FAILED, TStringBuilder() diff --git a/ydb/core/kqp/executer_actor/kqp_executer_impl.h b/ydb/core/kqp/executer_actor/kqp_executer_impl.h index 34dea52606..6651813e1d 100644 --- a/ydb/core/kqp/executer_actor/kqp_executer_impl.h +++ b/ydb/core/kqp/executer_actor/kqp_executer_impl.h @@ -651,7 +651,7 @@ protected: } } - void BuildScanTasksFromSource(TStageInfo& stageInfo, IKqpGateway::TKqpSnapshot snapshot, const TMaybe<ui64> lockTxId = {}) { + size_t BuildScanTasksFromSource(TStageInfo& stageInfo, IKqpGateway::TKqpSnapshot snapshot, const TMaybe<ui64> lockTxId = {}) { THashMap<ui64, std::vector<ui64>> nodeTasks; THashMap<ui64, ui64> assignedShardsCount; @@ -695,6 +695,12 @@ protected: FillTableMeta(stageInfo, settings.MutableTable()); for (auto& keyColumn : keyTypes) { + auto columnType = NScheme::ProtoColumnTypeFromTypeInfo(keyColumn); + if (columnType.TypeInfo) { + *settings.AddKeyColumnTypeInfos() = *columnType.TypeInfo; + } else { + *settings.AddKeyColumnTypeInfos() = NKikimrProto::TTypeInfo(); + } settings.AddKeyColumnTypes(static_cast<ui32>(keyColumn.GetTypeId())); } @@ -725,6 +731,9 @@ protected: settings.SetSorted(source.GetSorted()); settings.SetShardIdHint(shardId); + if (Stats) { + Stats->AffectedShards.insert(shardId); + } ExtractItemsLimit(stageInfo, source.GetItemsLimit(), Request.TxAlloc->HolderFactory, Request.TxAlloc->TypeEnv, itemsLimit, itemsLimitParamName, itemsLimitBytes, itemsLimitType); @@ -744,6 +753,7 @@ protected: taskSourceSettings->PackFrom(settings); input.SourceType = NYql::KqpReadRangesSourceName; } + return partitions.size(); } protected: diff --git a/ydb/core/kqp/runtime/kqp_read_actor.cpp b/ydb/core/kqp/runtime/kqp_read_actor.cpp index 65a2a00801..09bd872c4d 100644 --- a/ydb/core/kqp/runtime/kqp_read_actor.cpp +++ b/ydb/core/kqp/runtime/kqp_read_actor.cpp @@ -31,6 +31,7 @@ bool IsDebugLogEnabled(const NActors::TActorSystem* actorSystem, NActors::NLog:: } + namespace NKikimr { namespace NKqp { @@ -43,6 +44,19 @@ using namespace NKikimr::NDataShard; class TKqpReadActor : public TActorBootstrapped<TKqpReadActor>, public NYql::NDq::IDqComputeActorAsyncInput { using TBase = TActorBootstrapped<TKqpReadActor>; public: + struct TResult { + ui64 ShardId; + THolder<TEventHandle<TEvDataShard::TEvReadResult>> ReadResult; + TMaybe<NKikimr::NMiniKQL::TUnboxedValueVector> Batch; + size_t ProcessedRows = 0; + + TResult(ui64 shardId, THolder<TEventHandle<TEvDataShard::TEvReadResult>> readResult) + : ShardId(shardId) + , ReadResult(std::move(readResult)) + { + } + }; + struct TShardState : public TIntrusiveListItem<TShardState> { TSmallVec<TSerializedTableRange> Ranges; TSmallVec<TSerializedCellVec> Points; @@ -261,8 +275,15 @@ public: ); KeyColumnTypes.reserve(Settings.GetKeyColumnTypes().size()); - for (auto typeId : Settings.GetKeyColumnTypes()) { - KeyColumnTypes.push_back(NScheme::TTypeInfo((NScheme::TTypeId)typeId)); + for (size_t i = 0; i < Settings.KeyColumnTypesSize(); ++i) { + auto typeId = Settings.GetKeyColumnTypes(i); + KeyColumnTypes.push_back( + NScheme::TTypeInfo( + (NScheme::TTypeId)typeId, + (typeId == NScheme::NTypeIds::Pg) ? + NPg::TypeDescFromPgTypeId( + Settings.GetKeyColumnTypeInfos(i).GetPgTypeId() + ) : nullptr)); } } @@ -355,7 +376,7 @@ public: TKeyDesc::TColumnOp op; op.Column = column.GetId(); op.Operation = TKeyDesc::EColumnOperation::Read; - op.ExpectedType = NScheme::TTypeInfo((NScheme::TTypeId)column.GetType()); + op.ExpectedType = MakeTypeInfo(column); columns.emplace_back(std::move(op)); } @@ -561,7 +582,6 @@ public: limit = EVREAD_MAX_ROWS; } if (limit == 0) { - delete state; return; } @@ -613,6 +633,7 @@ public: CA_LOG_D(TStringBuilder() << "Send EvRead to shardId: " << state->TabletId << ", tablePath: " << Settings.GetTable().GetTablePath() << ", ranges: " << DebugPrintRanges(KeyColumnTypes, ev->Ranges, *AppData()->TypeRegistry) + << ", limit: " << limit << ", readId = " << id << " snapshot = (txid=" << Settings.GetSnapshot().GetTxId() << ",step=" << Settings.GetSnapshot().GetStep() << ")" << " lockTxId = " << Settings.GetLockTxId()); @@ -649,6 +670,7 @@ public: Reads[id].RegisterMessage(*ev->Get()); + RecievedRowCount += ev->Get()->GetRowsCount(); Results.push({Reads[id].Shard->TabletId, THolder<TEventHandle<TEvDataShard::TEvReadResult>>(ev.Release())}); CA_LOG_D(TStringBuilder() << "new data for read #" << id << " pushed"); @@ -679,8 +701,7 @@ public: if (IsSystemColumn(Settings.GetColumns(resultColumnIndex).GetId())) { rowStats.AllocatedBytes += sizeof(NUdf::TUnboxedValue); } else { - rowStats.AddStatistics(NMiniKQL::GetUnboxedValueSize( - row[columnIndex], NScheme::TTypeInfo((NScheme::TTypeId)Settings.GetColumns(resultColumnIndex).GetType()))); + rowStats.AddStatistics(NMiniKQL::GetUnboxedValueSize(row[columnIndex], MakeTypeInfo(Settings.GetColumns(resultColumnIndex)))); columnIndex += 1; } } @@ -694,24 +715,21 @@ public: return TypeEnv.BindAllocator(); } - NMiniKQL::TBytesStatistics PackArrow( - THolder<TEventHandle<TEvDataShard::TEvReadResult>>& result, - ui64 shardId, - NKikimr::NMiniKQL::TUnboxedValueVector& batch) - { + NMiniKQL::TBytesStatistics PackArrow(TResult& handle) { + auto& [shardId, result, batch, _] = handle; NMiniKQL::TBytesStatistics stats; bool hasResultColumns = false; if (result->Get()->GetRowsCount() == 0) { return {}; } if (Settings.ColumnsSize() == 0) { - batch.resize(result->Get()->GetRowsCount(), HolderFactory.GetEmptyContainer()); + batch->resize(result->Get()->GetRowsCount(), HolderFactory.GetEmptyContainer()); } else { TVector<NUdf::TUnboxedValue*> editAccessors(result->Get()->GetRowsCount()); - batch.reserve(result->Get()->GetRowsCount()); + batch->reserve(result->Get()->GetRowsCount()); for (ui64 rowIndex = 0; rowIndex < result->Get()->GetRowsCount(); ++rowIndex) { - batch.emplace_back(HolderFactory.CreateDirectArrayHolder( + batch->emplace_back(HolderFactory.CreateDirectArrayHolder( Settings.columns_size(), editAccessors[rowIndex]) ); @@ -743,29 +761,29 @@ public: return stats; } - NMiniKQL::TBytesStatistics PackCells( - THolder<TEventHandle<TEvDataShard::TEvReadResult>>& result, - ui64 shardId, - NKikimr::NMiniKQL::TUnboxedValueVector& batch) - { + NMiniKQL::TBytesStatistics PackCells(TResult& handle) { + auto& [shardId, result, batch, _] = handle; NMiniKQL::TBytesStatistics stats; - batch.reserve(batch.size()); + batch->reserve(batch->size()); for (size_t rowIndex = 0; rowIndex < result->Get()->GetRowsCount(); ++rowIndex) { const auto& row = result->Get()->GetCells(rowIndex); NUdf::TUnboxedValue* rowItems = nullptr; - batch.emplace_back(HolderFactory.CreateDirectArrayHolder(Settings.ColumnsSize(), rowItems)); + batch->emplace_back(HolderFactory.CreateDirectArrayHolder(Settings.ColumnsSize(), rowItems)); + size_t rowSize = 0; size_t columnIndex = 0; for (size_t resultColumnIndex = 0; resultColumnIndex < Settings.ColumnsSize(); ++resultColumnIndex) { auto tag = Settings.GetColumns(resultColumnIndex).GetId(); - auto type = NScheme::TTypeInfo((NScheme::TTypeId)Settings.GetColumns(resultColumnIndex).GetType()); + auto type = MakeTypeInfo(Settings.GetColumns(resultColumnIndex)); if (IsSystemColumn(tag)) { NMiniKQL::FillSystemColumn(rowItems[resultColumnIndex], shardId, tag, type); } else { rowItems[resultColumnIndex] = NMiniKQL::GetCellValue(row[columnIndex], type); + rowSize += row[columnIndex].Size(); columnIndex += 1; } } - stats.AddStatistics(GetRowSize(rowItems)); + stats.DataBytes += std::max(rowSize, (size_t)8); + stats.AllocatedBytes += GetRowSize(rowItems).AllocatedBytes; } return stats; } @@ -778,39 +796,40 @@ public: { ui64 bytes = 0; while (!Results.empty()) { - auto& [shardId, result, batch, processedRows] = Results.front(); - auto& msg = *result->Get(); + auto& result = Results.front(); + auto& batch = result.Batch; + auto& msg = *result.ReadResult->Get(); if (!batch.Defined()) { batch.ConstructInPlace(); switch (msg.Record.GetResultFormat()) { case NKikimrTxDataShard::EScanDataFormat::ARROW: - PackArrow(result, shardId, *batch); + BytesStats.AddStatistics(PackArrow(result)); break; case NKikimrTxDataShard::EScanDataFormat::UNSPECIFIED: case NKikimrTxDataShard::EScanDataFormat::CELLVEC: - PackCells(result, shardId, *batch); + BytesStats.AddStatistics(PackCells(result)); } } - auto id = result->Get()->Record.GetReadId(); + auto id = result.ReadResult->Get()->Record.GetReadId(); if (!Reads[id]) { Results.pop(); continue; } auto* state = Reads[id].Shard; - for (; processedRows < batch->size(); ++processedRows) { - NMiniKQL::TBytesStatistics rowSize = GetRowSize((*batch)[processedRows].GetElements()); + for (; result.ProcessedRows < batch->size(); ++result.ProcessedRows) { + NMiniKQL::TBytesStatistics rowSize = GetRowSize((*batch)[result.ProcessedRows].GetElements()); if (static_cast<ui64>(freeSpace) < bytes + rowSize.AllocatedBytes) { break; } - resultVector.push_back(std::move((*batch)[processedRows])); + resultVector.push_back(std::move((*batch)[result.ProcessedRows])); ProcessedRowCount += 1; bytes += rowSize.AllocatedBytes; } CA_LOG_D(TStringBuilder() << "returned " << resultVector.size() << " rows"); - if (batch->size() == processedRows) { + if (batch->size() == result.ProcessedRows) { auto& record = msg.Record; if (Reads[id].IsLastMessage(msg)) { Reads[id].Reset(); @@ -835,7 +854,6 @@ public: auto cancel = MakeHolder<TEvDataShard::TEvReadCancel>(); cancel->Record.SetReadId(id); Send(MakePipePeNodeCacheID(false), new TEvPipeCache::TEvForward(cancel.Release(), state->TabletId), IEventHandle::FlagTrackDelivery); - delete state; Reads[id].Reset(); ResetReads++; } @@ -861,6 +879,29 @@ public: return bytes; } + void FillExtraStats(NDqProto::TDqTaskStats* stats, bool last) override { + if (last) { + NDqProto::TDqTableStats* tableStats = nullptr; + for (size_t i = 0; i < stats->TablesSize(); ++i) { + auto* table = stats->MutableTables(i); + if (table->GetTablePath() == Settings.GetTable().GetTablePath()) { + tableStats = table; + } + } + if (!tableStats) { + tableStats = stats->AddTables(); + tableStats->SetTablePath(Settings.GetTable().GetTablePath()); + + } + + //FIXME: use evread statistics after KIKIMR-16924 + tableStats->SetReadRows(tableStats->GetReadRows() + RecievedRowCount); + tableStats->SetReadBytes(tableStats->GetReadBytes() + BytesStats.DataBytes); + tableStats->SetAffectedPartitions(tableStats->GetAffectedPartitions() + InFlightShards.Size()); + } + } + + void SaveState(const NYql::NDqProto::TCheckpoint&, NYql::NDqProto::TSourceState&) override {} void CommitState(const NYql::NDqProto::TCheckpoint&) override {} void LoadState(const NYql::NDqProto::TSourceState&) override {} @@ -897,13 +938,25 @@ public: return result; } + + NScheme::TTypeInfo MakeTypeInfo(const NKikimrTxDataShard::TKqpTransaction_TColumnMeta& info) { + auto typeId = info.GetType(); + return NScheme::TTypeInfo( + (NScheme::TTypeId)typeId, + (typeId == NScheme::NTypeIds::Pg) ? + NPg::TypeDescFromPgTypeId( + info.GetTypeInfo().GetPgTypeId() + ) : nullptr); + } + private: NKikimrTxDataShard::TKqpReadRangesSourceSettings Settings; TVector<NScheme::TTypeInfo> KeyColumnTypes; - size_t RecievedRowCount = 0; - size_t ProcessedRowCount = 0; + NMiniKQL::TBytesStatistics BytesStats; + ui64 RecievedRowCount = 0; + ui64 ProcessedRowCount = 0; ui64 ResetReads = 0; ui64 ReadId = 0; TVector<TReadState> Reads; @@ -915,18 +968,6 @@ private: TShardQueue InFlightShards; TShardQueue PendingShards; - struct TResult { - ui64 ShardId; - THolder<TEventHandle<TEvDataShard::TEvReadResult>> ReadResult; - TMaybe<NKikimr::NMiniKQL::TUnboxedValueVector> Batch; - size_t ProcessedRows = 0; - - TResult(ui64 shardId, THolder<TEventHandle<TEvDataShard::TEvReadResult>> readResult) - : ShardId(shardId) - , ReadResult(std::move(readResult)) - { - } - }; TQueue<TResult> Results; TVector<NKikimrTxDataShard::TLock> Locks; diff --git a/ydb/core/protos/tx_datashard.proto b/ydb/core/protos/tx_datashard.proto index 6d0de59aab..5d886a2b1a 100644 --- a/ydb/core/protos/tx_datashard.proto +++ b/ydb/core/protos/tx_datashard.proto @@ -255,8 +255,10 @@ message TKqpReadRangesSourceSettings { repeated TKqpTransaction.TColumnMeta Columns = 4; optional uint64 ItemsLimit = 5; optional bool Reverse = 6; - repeated string SkipNullKeys = 7; + repeated uint32 KeyColumnTypes = 8; + repeated NKikimrProto.TTypeInfo KeyColumnTypeInfos = 15; + optional EScanDataFormat DataFormat = 9; optional NKikimrProto.TRowVersion Snapshot = 10; optional uint64 ShardIdHint = 11; diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h index d7245f1632..19c3c1d612 100644 --- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h +++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h @@ -89,6 +89,8 @@ struct IDqComputeActorAsyncInput { virtual TMaybe<google::protobuf::Any> ExtraData() { return {}; } + virtual void FillExtraStats(NDqProto::TDqTaskStats* /* stats */, bool /* finalized stats */) { } + // The same signature as IActor::PassAway(). // It is guaranted that this method will be called with bound MKQL allocator. // So, it is the right place to destroy all internal UnboxedValues. diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h index e51ae3ae14..64d3d11547 100644 --- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h +++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h @@ -1917,10 +1917,11 @@ public: THashMap<ui64, ui64> ingressBytesMap; for (auto& [inputIndex, sourceInfo] : SourcesMap) { - if (sourceInfo.AsyncInput) { + if (auto* source = sourceInfo.AsyncInput) { auto ingressBytes = sourceInfo.AsyncInput->GetIngressBytes(); ingressBytesMap.emplace(inputIndex, ingressBytes); Ingress[sourceInfo.Type] = Ingress.Value(sourceInfo.Type, 0) + ingressBytes; + source->FillExtraStats(protoTask, last); } } FillTaskRunnerStats(Task.GetId(), Task.GetStageId(), *taskStats, protoTask, (bool) GetProfileStats(), ingressBytesMap); @@ -1970,6 +1971,10 @@ public: protoTransform->SetIngressBytes(ingressBytes); protoTransform->SetMaxMemoryUsage(transformStats->MaxMemoryUsage); + + if (auto* transform = transformInfo.AsyncInput) { + transform->FillExtraStats(protoTask, last); + } } } |