diff options
author | nonexistence <nonexistence@yandex-team.ru> | 2022-02-10 16:52:13 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:52:13 +0300 |
commit | 986f48c33fde808e393e27d5dd0057658efbb660 (patch) | |
tree | ab7fbbf3253d4c0e2793218f09378908beb025fb | |
parent | e94168c80216b9145837ea08560d3103dfaafdb8 (diff) | |
download | ydb-986f48c33fde808e393e27d5dd0057658efbb660.tar.gz |
Restoring authorship annotation for <nonexistence@yandex-team.ru>. Commit 2 of 2.
-rw-r--r-- | ydb/core/kqp/runtime/kqp_output_stream.cpp | 2 | ||||
-rw-r--r-- | ydb/core/tx/datashard/build_kqp_data_tx_out_rs_unit.cpp | 4 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_kqp.cpp | 4 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_kqp.h | 2 | ||||
-rw-r--r-- | ydb/core/tx/datashard/execute_data_tx_unit.cpp | 88 | ||||
-rw-r--r-- | ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp | 4 | ||||
-rw-r--r-- | ydb/core/tx/datashard/execute_kqp_scan_tx_unit.cpp | 4 | ||||
-rw-r--r-- | ydb/library/yql/dq/runtime/dq_tasks_runner.cpp | 6 | ||||
-rw-r--r-- | ydb/library/yql/dq/runtime/dq_tasks_runner.h | 8 | ||||
-rw-r--r-- | ydb/library/yql/dq/runtime/ya.make | 2 | ||||
-rw-r--r-- | ydb/library/yql/providers/dq/api/protos/service.proto | 6 | ||||
-rw-r--r-- | ydb/library/yql/providers/dq/service/grpc_service.cpp | 4 |
12 files changed, 67 insertions, 67 deletions
diff --git a/ydb/core/kqp/runtime/kqp_output_stream.cpp b/ydb/core/kqp/runtime/kqp_output_stream.cpp index a669564bc61..640706d9b45 100644 --- a/ydb/core/kqp/runtime/kqp_output_stream.cpp +++ b/ydb/core/kqp/runtime/kqp_output_stream.cpp @@ -31,7 +31,7 @@ public: MKQL_ENSURE_S(!Partitions.empty()); MKQL_ENSURE_S(KeyColumnTypes.size() == KeyColumnIndices.size()); - SortPartitions(Partitions, KeyColumnTypes, [](const auto& partition) { return partition.Range; }); + SortPartitions(Partitions, KeyColumnTypes, [](const auto& partition) { return partition.Range; }); } bool IsFull() const override { diff --git a/ydb/core/tx/datashard/build_kqp_data_tx_out_rs_unit.cpp b/ydb/core/tx/datashard/build_kqp_data_tx_out_rs_unit.cpp index 2bef612a6b9..50a3c1f4013 100644 --- a/ydb/core/tx/datashard/build_kqp_data_tx_out_rs_unit.cpp +++ b/ydb/core/tx/datashard/build_kqp_data_tx_out_rs_unit.cpp @@ -36,7 +36,7 @@ TBuildKqpDataTxOutRSUnit::TBuildKqpDataTxOutRSUnit(TDataShard& dataShard, TPipel TBuildKqpDataTxOutRSUnit::~TBuildKqpDataTxOutRSUnit() {} -bool TBuildKqpDataTxOutRSUnit::IsReadyToExecute(TOperation::TPtr) const { +bool TBuildKqpDataTxOutRSUnit::IsReadyToExecute(TOperation::TPtr) const { return true; } @@ -44,7 +44,7 @@ EExecutionStatus TBuildKqpDataTxOutRSUnit::Execute(TOperation::TPtr op, TTransac const TActorContext& ctx) { TSetupSysLocks guardLocks(op, DataShard); - TActiveTransaction* tx = dynamic_cast<TActiveTransaction*>(op.Get()); + TActiveTransaction* tx = dynamic_cast<TActiveTransaction*>(op.Get()); Y_VERIFY_S(tx, "cannot cast operation of kind " << op->GetKind()); DataShard.ReleaseCache(*tx); diff --git a/ydb/core/tx/datashard/datashard_kqp.cpp b/ydb/core/tx/datashard/datashard_kqp.cpp index 5aade3ac478..a262c98de5a 100644 --- a/ydb/core/tx/datashard/datashard_kqp.cpp +++ b/ydb/core/tx/datashard/datashard_kqp.cpp @@ -404,7 +404,7 @@ void KqpSetTxLocksKeys(const NKikimrTxDataShard::TKqpLocks& locks, const TSysLoc } } -NYql::NDq::ERunStatus KqpRunTransaction(const TActorContext& ctx, ui64 txId, +NYql::NDq::ERunStatus KqpRunTransaction(const TActorContext& ctx, ui64 txId, const google::protobuf::RepeatedPtrField<NYql::NDqProto::TDqTask>& tasks, NKqp::TKqpTasksRunner& tasksRunner) { return RunKqpTransactionInternal(ctx, txId, /* inReadSets */ nullptr, tasks, tasksRunner, /* applyEffects */ false); @@ -421,7 +421,7 @@ THolder<TEvDataShard::TEvProposeTransactionResult> KqpCompleteTransaction(const return nullptr; } - MKQL_ENSURE_S(runStatus == NYql::NDq::ERunStatus::Finished); + MKQL_ENSURE_S(runStatus == NYql::NDq::ERunStatus::Finished); auto result = MakeHolder<TEvDataShard::TEvProposeTransactionResult>(NKikimrTxDataShard::TX_KIND_DATA, origin, txId, NKikimrTxDataShard::TEvProposeTransactionResult::COMPLETE); diff --git a/ydb/core/tx/datashard/datashard_kqp.h b/ydb/core/tx/datashard/datashard_kqp.h index 25017d2a233..228f71fd37d 100644 --- a/ydb/core/tx/datashard/datashard_kqp.h +++ b/ydb/core/tx/datashard/datashard_kqp.h @@ -20,7 +20,7 @@ void KqpSetTxKeys(ui64 tabletId, ui64 taskId, const TUserTable* tableInfo, void KqpSetTxLocksKeys(const NKikimrTxDataShard::TKqpLocks& locks, const TSysLocks& sysLocks, TEngineBay& engineBay); -NYql::NDq::ERunStatus KqpRunTransaction(const TActorContext& ctx, ui64 txId, +NYql::NDq::ERunStatus KqpRunTransaction(const TActorContext& ctx, ui64 txId, const google::protobuf::RepeatedPtrField<NYql::NDqProto::TDqTask>& tasks, NKqp::TKqpTasksRunner& tasksRunner); THolder<TEvDataShard::TEvProposeTransactionResult> KqpCompleteTransaction(const TActorContext& ctx, diff --git a/ydb/core/tx/datashard/execute_data_tx_unit.cpp b/ydb/core/tx/datashard/execute_data_tx_unit.cpp index c26b575a615..b32f635cc0f 100644 --- a/ydb/core/tx/datashard/execute_data_tx_unit.cpp +++ b/ydb/core/tx/datashard/execute_data_tx_unit.cpp @@ -11,29 +11,29 @@ using namespace NMiniKQL; class TExecuteDataTxUnit : public TExecutionUnit { public: TExecuteDataTxUnit(TDataShard& dataShard, - TPipeline& pipeline); + TPipeline& pipeline); ~TExecuteDataTxUnit() override; bool IsReadyToExecute(TOperation::TPtr op) const override; EExecutionStatus Execute(TOperation::TPtr op, - TTransactionContext& txc, - const TActorContext& ctx) override; + TTransactionContext& txc, + const TActorContext& ctx) override; void Complete(TOperation::TPtr op, - const TActorContext& ctx) override; + const TActorContext& ctx) override; private: void ExecuteDataTx(TOperation::TPtr op, - TTransactionContext& txc, - const TActorContext& ctx); + TTransactionContext& txc, + const TActorContext& ctx); void AddLocksToResult(TOperation::TPtr op); }; TExecuteDataTxUnit::TExecuteDataTxUnit(TDataShard& dataShard, - TPipeline& pipeline) - : TExecutionUnit(EExecutionUnitKind::ExecuteDataTx, true, dataShard, pipeline) { + TPipeline& pipeline) + : TExecutionUnit(EExecutionUnitKind::ExecuteDataTx, true, dataShard, pipeline) { } -TExecuteDataTxUnit::~TExecuteDataTxUnit() { +TExecuteDataTxUnit::~TExecuteDataTxUnit() { } bool TExecuteDataTxUnit::IsReadyToExecute(TOperation::TPtr op) const { @@ -50,7 +50,7 @@ bool TExecuteDataTxUnit::IsReadyToExecute(TOperation::TPtr op) const { } EExecutionStatus TExecuteDataTxUnit::Execute(TOperation::TPtr op, - TTransactionContext& txc, + TTransactionContext& txc, const TActorContext& ctx) { if (op->Result() || op->HasResultSentFlag() || op->IsImmediate() && CheckRejectDataTx(op, ctx)) { @@ -67,7 +67,7 @@ EExecutionStatus TExecuteDataTxUnit::Execute(TOperation::TPtr op, } TSetupSysLocks guardLocks(op, DataShard); - TActiveTransaction* tx = dynamic_cast<TActiveTransaction*>(op.Get()); + TActiveTransaction* tx = dynamic_cast<TActiveTransaction*>(op.Get()); Y_VERIFY_S(tx, "cannot cast operation of kind " << op->GetKind()); if (tx->IsTxDataReleased()) { @@ -94,7 +94,7 @@ EExecutionStatus TExecuteDataTxUnit::Execute(TOperation::TPtr op, } } - IEngineFlat* engine = tx->GetDataTx()->GetEngine(); + IEngineFlat* engine = tx->GetDataTx()->GetEngine(); Y_VERIFY_S(engine, "missing engine for " << *op << " at " << DataShard.TabletID()); if (op->IsImmediate() && !tx->ReValidateKeys()) { @@ -132,7 +132,7 @@ EExecutionStatus TExecuteDataTxUnit::Execute(TOperation::TPtr op, engine->PinPages(pageFaultCount); throw; } - } catch (const TMemoryLimitExceededException&) { + } catch (const TMemoryLimitExceededException&) { LOG_TRACE_S(ctx, NKikimrServices::TX_DATASHARD, "Operation " << *op << " at " << DataShard.TabletID() << " exceeded memory limit " << txc.GetMemoryLimit() << " and requests " << txc.GetMemoryLimit() * MEMORY_REQUEST_FACTOR @@ -170,9 +170,9 @@ EExecutionStatus TExecuteDataTxUnit::Execute(TOperation::TPtr op, void TExecuteDataTxUnit::ExecuteDataTx(TOperation::TPtr op, TTransactionContext& txc, - const TActorContext& ctx) { - TActiveTransaction* tx = dynamic_cast<TActiveTransaction*>(op.Get()); - IEngineFlat* engine = tx->GetDataTx()->GetEngine(); + const TActorContext& ctx) { + TActiveTransaction* tx = dynamic_cast<TActiveTransaction*>(op.Get()); + IEngineFlat* engine = tx->GetDataTx()->GetEngine(); DataShard.ReleaseCache(*tx); tx->GetDataTx()->ResetCounters(); @@ -187,8 +187,8 @@ void TExecuteDataTxUnit::ExecuteDataTx(TOperation::TPtr op, engine->AfterOutgoingReadsetsExtracted(); } - for (auto& rs : op->InReadSets()) { - for (auto& rsdata : rs.second) { + for (auto& rs : op->InReadSets()) { + for (auto& rsdata : rs.second) { engine->AddIncomingReadset(rsdata.Body); } } @@ -199,35 +199,35 @@ void TExecuteDataTxUnit::ExecuteDataTx(TOperation::TPtr op, IEngineFlat::EResult engineResult = engine->Execute(); if (engineResult != IEngineFlat::EResult::Ok) { - TString errorMessage = TStringBuilder() << "Datashard execution error for " << *op << " at " - << DataShard.TabletID() << ": " << engine->GetErrors(); + TString errorMessage = TStringBuilder() << "Datashard execution error for " << *op << " at " + << DataShard.TabletID() << ": " << engine->GetErrors(); switch (engineResult) { - case IEngineFlat::EResult::ResultTooBig: - LOG_ERROR_S(ctx, NKikimrServices::TX_DATASHARD, errorMessage); - break; - case IEngineFlat::EResult::Cancelled: - LOG_NOTICE_S(ctx, NKikimrServices::TX_DATASHARD, errorMessage); + case IEngineFlat::EResult::ResultTooBig: + LOG_ERROR_S(ctx, NKikimrServices::TX_DATASHARD, errorMessage); + break; + case IEngineFlat::EResult::Cancelled: + LOG_NOTICE_S(ctx, NKikimrServices::TX_DATASHARD, errorMessage); Y_VERIFY(tx->GetDataTx()->CanCancel()); - break; - default: - if (op->IsReadOnly() || op->IsImmediate()) { - LOG_CRIT_S(ctx, NKikimrServices::TX_DATASHARD, errorMessage); - } else { - // TODO: Kill only current datashard tablet. - Y_FAIL_S("Unexpected execution error in read-write transaction: " - << errorMessage); - } - break; + break; + default: + if (op->IsReadOnly() || op->IsImmediate()) { + LOG_CRIT_S(ctx, NKikimrServices::TX_DATASHARD, errorMessage); + } else { + // TODO: Kill only current datashard tablet. + Y_FAIL_S("Unexpected execution error in read-write transaction: " + << errorMessage); + } + break; } } if (engineResult == IEngineFlat::EResult::Cancelled) DataShard.IncCounter(op->IsImmediate() - ? COUNTER_IMMEDIATE_TX_CANCELLED - : COUNTER_PLANNED_TX_CANCELLED); + ? COUNTER_IMMEDIATE_TX_CANCELLED + : COUNTER_PLANNED_TX_CANCELLED); - auto& result = BuildResult(op, NKikimrTxDataShard::TEvProposeTransactionResult::COMPLETE); + auto& result = BuildResult(op, NKikimrTxDataShard::TEvProposeTransactionResult::COMPLETE); result->Record.SetOrderId(op->GetTxId()); if (!op->IsImmediate()) result->Record.SetStep(op->GetStep()); @@ -250,13 +250,13 @@ void TExecuteDataTxUnit::ExecuteDataTx(TOperation::TPtr op, LOG_TRACE_S(ctx, NKikimrServices::TX_DATASHARD, "Executed operation " << *op << " at tablet " << DataShard.TabletID() - << " with status " << result->GetStatus()); + << " with status " << result->GetStatus()); - auto& counters = tx->GetDataTx()->GetCounters(); + auto& counters = tx->GetDataTx()->GetCounters(); LOG_TRACE_S(ctx, NKikimrServices::TX_DATASHARD, "Datashard execution counters for " << *op << " at " - << DataShard.TabletID() << ": " << counters.ToString()); + << DataShard.TabletID() << ": " << counters.ToString()); KqpUpdateDataShardStatCounters(DataShard, counters); if (tx->GetDataTx()->CollectStats()) { @@ -272,20 +272,20 @@ void TExecuteDataTxUnit::ExecuteDataTx(TOperation::TPtr op, Pipeline.AddCommittingOp(op); } -void TExecuteDataTxUnit::AddLocksToResult(TOperation::TPtr op) { +void TExecuteDataTxUnit::AddLocksToResult(TOperation::TPtr op) { auto locks = DataShard.SysLocksTable().ApplyLocks(); for (const auto& lock : locks) { if (lock.IsError()) { LOG_NOTICE_S(TActivationContext::AsActorContext(), NKikimrServices::TX_DATASHARD, "Lock is not set for " << *op << " at " << DataShard.TabletID() - << " lock " << lock); + << " lock " << lock); } op->Result()->AddTxLock(lock.LockId, lock.DataShard, lock.Generation, lock.Counter, lock.SchemeShard, lock.PathId); } } -void TExecuteDataTxUnit::Complete(TOperation::TPtr, const TActorContext&) { +void TExecuteDataTxUnit::Complete(TOperation::TPtr, const TActorContext&) { } THolder<TExecutionUnit> CreateExecuteDataTxUnit(TDataShard& dataShard, TPipeline& pipeline) { diff --git a/ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp b/ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp index 972b027aa97..70e7be3662b 100644 --- a/ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp +++ b/ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp @@ -68,7 +68,7 @@ EExecutionStatus TExecuteKqpDataTxUnit::Execute(TOperation::TPtr op, TTransactio } TSetupSysLocks guardLocks(op, DataShard); - TActiveTransaction* tx = dynamic_cast<TActiveTransaction*>(op.Get()); + TActiveTransaction* tx = dynamic_cast<TActiveTransaction*>(op.Get()); Y_VERIFY_S(tx, "cannot cast operation of kind " << op->GetKind()); DataShard.ReleaseCache(*tx); @@ -168,7 +168,7 @@ EExecutionStatus TExecuteKqpDataTxUnit::Execute(TOperation::TPtr op, TTransactio KqpFillTxStats(DataShard, dataTx->GetCounters(), *op->Result()); KqpFillStats(DataShard, tasksRunner, computeCtx, statsMode, *op->Result()); } - } catch (const TMemoryLimitExceededException&) { + } catch (const TMemoryLimitExceededException&) { txc.NotEnoughMemory(); LOG_T("Operation " << *op << " at " << tabletId diff --git a/ydb/core/tx/datashard/execute_kqp_scan_tx_unit.cpp b/ydb/core/tx/datashard/execute_kqp_scan_tx_unit.cpp index 3a6ad08296e..1362a5149f4 100644 --- a/ydb/core/tx/datashard/execute_kqp_scan_tx_unit.cpp +++ b/ydb/core/tx/datashard/execute_kqp_scan_tx_unit.cpp @@ -12,10 +12,10 @@ class TExecuteKqpScanTxUnit : public TExecutionUnit { public: TExecuteKqpScanTxUnit(TDataShard& dataShard, TPipeline& pipeline) : TExecutionUnit(EExecutionUnitKind::ExecuteKqpScanTx, false, dataShard, pipeline) { - } + } ~TExecuteKqpScanTxUnit() override { - } + } bool IsReadyToExecute(TOperation::TPtr op) const override { if (op->Result() || op->HasResultSentFlag() || op->IsImmediate() && WillRejectDataTx(op)) { diff --git a/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp b/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp index b809f689a8c..10900352c3b 100644 --- a/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp +++ b/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp @@ -203,8 +203,8 @@ IDqChannelStorage::TPtr TDqTaskRunnerExecutionContext::CreateChannelStorage(ui64 //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// /// TDqTaskRunner //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// -class TDqTaskRunner : public IDqTaskRunner { -public: +class TDqTaskRunner : public IDqTaskRunner { +public: TDqTaskRunner(const TDqTaskRunnerContext& context, const TDqTaskRunnerSettings& settings, const TLogFunc& logFunc) : Context(context) , Settings(settings) @@ -538,7 +538,7 @@ public: } if (Y_UNLIKELY(CollectProfileStats)) { StopWaiting(Stats->FinishTs); - } + } return ERunStatus::Finished; } diff --git a/ydb/library/yql/dq/runtime/dq_tasks_runner.h b/ydb/library/yql/dq/runtime/dq_tasks_runner.h index d8bc72696c4..a0cc816e91e 100644 --- a/ydb/library/yql/dq/runtime/dq_tasks_runner.h +++ b/ydb/library/yql/dq/runtime/dq_tasks_runner.h @@ -21,12 +21,12 @@ namespace NYql::NDq { -enum class ERunStatus : ui32 { - Finished, +enum class ERunStatus : ui32 { + Finished, PendingInput, PendingOutput -}; - +}; + class TRunStatusTimeMetrics { public: void UpdateStatusTime(TDuration computeCpuTime = TDuration::Zero()) { diff --git a/ydb/library/yql/dq/runtime/ya.make b/ydb/library/yql/dq/runtime/ya.make index f407943ffa7..4cc8585431d 100644 --- a/ydb/library/yql/dq/runtime/ya.make +++ b/ydb/library/yql/dq/runtime/ya.make @@ -33,7 +33,7 @@ SRCS( ) GENERATE_ENUM_SERIALIZATION(dq_tasks_runner.h) - + YQL_LAST_ABI_VERSION() END() diff --git a/ydb/library/yql/providers/dq/api/protos/service.proto b/ydb/library/yql/providers/dq/api/protos/service.proto index aa75f459747..b91351e5d12 100644 --- a/ydb/library/yql/providers/dq/api/protos/service.proto +++ b/ydb/library/yql/providers/dq/api/protos/service.proto @@ -21,10 +21,10 @@ message ResponseMetric { int64 Count = 6; } -message ExecuteQueryResult { - Ydb.ResultSet result = 1; +message ExecuteQueryResult { + Ydb.ResultSet result = 1; bytes yson = 2; -} +} message TFile { enum EFileType { diff --git a/ydb/library/yql/providers/dq/service/grpc_service.cpp b/ydb/library/yql/providers/dq/service/grpc_service.cpp index f95581bb2b9..65a9e500d9d 100644 --- a/ydb/library/yql/providers/dq/service/grpc_service.cpp +++ b/ydb/library/yql/providers/dq/service/grpc_service.cpp @@ -23,14 +23,14 @@ #include <library/cpp/grpc/server/grpc_counters.h> #include <ydb/public/api/protos/ydb_status_codes.pb.h> - + #include <library/cpp/actors/interconnect/interconnect.h> #include <library/cpp/actors/helpers/future_callback.h> #include <library/cpp/build_info/build_info.h> #include <library/cpp/svnversion/svnversion.h> #include <util/string/split.h> -#include <util/system/env.h> +#include <util/system/env.h> namespace NYql::NDqs { using namespace NYql::NDqs; |