aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authornonexistence <nonexistence@yandex-team.ru>2022-02-10 16:52:13 +0300
committerDaniil Cherednik <dcherednik@yandex-team.ru>2022-02-10 16:52:13 +0300
commit986f48c33fde808e393e27d5dd0057658efbb660 (patch)
treeab7fbbf3253d4c0e2793218f09378908beb025fb
parente94168c80216b9145837ea08560d3103dfaafdb8 (diff)
downloadydb-986f48c33fde808e393e27d5dd0057658efbb660.tar.gz
Restoring authorship annotation for <nonexistence@yandex-team.ru>. Commit 2 of 2.
-rw-r--r--ydb/core/kqp/runtime/kqp_output_stream.cpp2
-rw-r--r--ydb/core/tx/datashard/build_kqp_data_tx_out_rs_unit.cpp4
-rw-r--r--ydb/core/tx/datashard/datashard_kqp.cpp4
-rw-r--r--ydb/core/tx/datashard/datashard_kqp.h2
-rw-r--r--ydb/core/tx/datashard/execute_data_tx_unit.cpp88
-rw-r--r--ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp4
-rw-r--r--ydb/core/tx/datashard/execute_kqp_scan_tx_unit.cpp4
-rw-r--r--ydb/library/yql/dq/runtime/dq_tasks_runner.cpp6
-rw-r--r--ydb/library/yql/dq/runtime/dq_tasks_runner.h8
-rw-r--r--ydb/library/yql/dq/runtime/ya.make2
-rw-r--r--ydb/library/yql/providers/dq/api/protos/service.proto6
-rw-r--r--ydb/library/yql/providers/dq/service/grpc_service.cpp4
12 files changed, 67 insertions, 67 deletions
diff --git a/ydb/core/kqp/runtime/kqp_output_stream.cpp b/ydb/core/kqp/runtime/kqp_output_stream.cpp
index a669564bc61..640706d9b45 100644
--- a/ydb/core/kqp/runtime/kqp_output_stream.cpp
+++ b/ydb/core/kqp/runtime/kqp_output_stream.cpp
@@ -31,7 +31,7 @@ public:
MKQL_ENSURE_S(!Partitions.empty());
MKQL_ENSURE_S(KeyColumnTypes.size() == KeyColumnIndices.size());
- SortPartitions(Partitions, KeyColumnTypes, [](const auto& partition) { return partition.Range; });
+ SortPartitions(Partitions, KeyColumnTypes, [](const auto& partition) { return partition.Range; });
}
bool IsFull() const override {
diff --git a/ydb/core/tx/datashard/build_kqp_data_tx_out_rs_unit.cpp b/ydb/core/tx/datashard/build_kqp_data_tx_out_rs_unit.cpp
index 2bef612a6b9..50a3c1f4013 100644
--- a/ydb/core/tx/datashard/build_kqp_data_tx_out_rs_unit.cpp
+++ b/ydb/core/tx/datashard/build_kqp_data_tx_out_rs_unit.cpp
@@ -36,7 +36,7 @@ TBuildKqpDataTxOutRSUnit::TBuildKqpDataTxOutRSUnit(TDataShard& dataShard, TPipel
TBuildKqpDataTxOutRSUnit::~TBuildKqpDataTxOutRSUnit() {}
-bool TBuildKqpDataTxOutRSUnit::IsReadyToExecute(TOperation::TPtr) const {
+bool TBuildKqpDataTxOutRSUnit::IsReadyToExecute(TOperation::TPtr) const {
return true;
}
@@ -44,7 +44,7 @@ EExecutionStatus TBuildKqpDataTxOutRSUnit::Execute(TOperation::TPtr op, TTransac
const TActorContext& ctx)
{
TSetupSysLocks guardLocks(op, DataShard);
- TActiveTransaction* tx = dynamic_cast<TActiveTransaction*>(op.Get());
+ TActiveTransaction* tx = dynamic_cast<TActiveTransaction*>(op.Get());
Y_VERIFY_S(tx, "cannot cast operation of kind " << op->GetKind());
DataShard.ReleaseCache(*tx);
diff --git a/ydb/core/tx/datashard/datashard_kqp.cpp b/ydb/core/tx/datashard/datashard_kqp.cpp
index 5aade3ac478..a262c98de5a 100644
--- a/ydb/core/tx/datashard/datashard_kqp.cpp
+++ b/ydb/core/tx/datashard/datashard_kqp.cpp
@@ -404,7 +404,7 @@ void KqpSetTxLocksKeys(const NKikimrTxDataShard::TKqpLocks& locks, const TSysLoc
}
}
-NYql::NDq::ERunStatus KqpRunTransaction(const TActorContext& ctx, ui64 txId,
+NYql::NDq::ERunStatus KqpRunTransaction(const TActorContext& ctx, ui64 txId,
const google::protobuf::RepeatedPtrField<NYql::NDqProto::TDqTask>& tasks, NKqp::TKqpTasksRunner& tasksRunner)
{
return RunKqpTransactionInternal(ctx, txId, /* inReadSets */ nullptr, tasks, tasksRunner, /* applyEffects */ false);
@@ -421,7 +421,7 @@ THolder<TEvDataShard::TEvProposeTransactionResult> KqpCompleteTransaction(const
return nullptr;
}
- MKQL_ENSURE_S(runStatus == NYql::NDq::ERunStatus::Finished);
+ MKQL_ENSURE_S(runStatus == NYql::NDq::ERunStatus::Finished);
auto result = MakeHolder<TEvDataShard::TEvProposeTransactionResult>(NKikimrTxDataShard::TX_KIND_DATA,
origin, txId, NKikimrTxDataShard::TEvProposeTransactionResult::COMPLETE);
diff --git a/ydb/core/tx/datashard/datashard_kqp.h b/ydb/core/tx/datashard/datashard_kqp.h
index 25017d2a233..228f71fd37d 100644
--- a/ydb/core/tx/datashard/datashard_kqp.h
+++ b/ydb/core/tx/datashard/datashard_kqp.h
@@ -20,7 +20,7 @@ void KqpSetTxKeys(ui64 tabletId, ui64 taskId, const TUserTable* tableInfo,
void KqpSetTxLocksKeys(const NKikimrTxDataShard::TKqpLocks& locks, const TSysLocks& sysLocks, TEngineBay& engineBay);
-NYql::NDq::ERunStatus KqpRunTransaction(const TActorContext& ctx, ui64 txId,
+NYql::NDq::ERunStatus KqpRunTransaction(const TActorContext& ctx, ui64 txId,
const google::protobuf::RepeatedPtrField<NYql::NDqProto::TDqTask>& tasks, NKqp::TKqpTasksRunner& tasksRunner);
THolder<TEvDataShard::TEvProposeTransactionResult> KqpCompleteTransaction(const TActorContext& ctx,
diff --git a/ydb/core/tx/datashard/execute_data_tx_unit.cpp b/ydb/core/tx/datashard/execute_data_tx_unit.cpp
index c26b575a615..b32f635cc0f 100644
--- a/ydb/core/tx/datashard/execute_data_tx_unit.cpp
+++ b/ydb/core/tx/datashard/execute_data_tx_unit.cpp
@@ -11,29 +11,29 @@ using namespace NMiniKQL;
class TExecuteDataTxUnit : public TExecutionUnit {
public:
TExecuteDataTxUnit(TDataShard& dataShard,
- TPipeline& pipeline);
+ TPipeline& pipeline);
~TExecuteDataTxUnit() override;
bool IsReadyToExecute(TOperation::TPtr op) const override;
EExecutionStatus Execute(TOperation::TPtr op,
- TTransactionContext& txc,
- const TActorContext& ctx) override;
+ TTransactionContext& txc,
+ const TActorContext& ctx) override;
void Complete(TOperation::TPtr op,
- const TActorContext& ctx) override;
+ const TActorContext& ctx) override;
private:
void ExecuteDataTx(TOperation::TPtr op,
- TTransactionContext& txc,
- const TActorContext& ctx);
+ TTransactionContext& txc,
+ const TActorContext& ctx);
void AddLocksToResult(TOperation::TPtr op);
};
TExecuteDataTxUnit::TExecuteDataTxUnit(TDataShard& dataShard,
- TPipeline& pipeline)
- : TExecutionUnit(EExecutionUnitKind::ExecuteDataTx, true, dataShard, pipeline) {
+ TPipeline& pipeline)
+ : TExecutionUnit(EExecutionUnitKind::ExecuteDataTx, true, dataShard, pipeline) {
}
-TExecuteDataTxUnit::~TExecuteDataTxUnit() {
+TExecuteDataTxUnit::~TExecuteDataTxUnit() {
}
bool TExecuteDataTxUnit::IsReadyToExecute(TOperation::TPtr op) const {
@@ -50,7 +50,7 @@ bool TExecuteDataTxUnit::IsReadyToExecute(TOperation::TPtr op) const {
}
EExecutionStatus TExecuteDataTxUnit::Execute(TOperation::TPtr op,
- TTransactionContext& txc,
+ TTransactionContext& txc,
const TActorContext& ctx)
{
if (op->Result() || op->HasResultSentFlag() || op->IsImmediate() && CheckRejectDataTx(op, ctx)) {
@@ -67,7 +67,7 @@ EExecutionStatus TExecuteDataTxUnit::Execute(TOperation::TPtr op,
}
TSetupSysLocks guardLocks(op, DataShard);
- TActiveTransaction* tx = dynamic_cast<TActiveTransaction*>(op.Get());
+ TActiveTransaction* tx = dynamic_cast<TActiveTransaction*>(op.Get());
Y_VERIFY_S(tx, "cannot cast operation of kind " << op->GetKind());
if (tx->IsTxDataReleased()) {
@@ -94,7 +94,7 @@ EExecutionStatus TExecuteDataTxUnit::Execute(TOperation::TPtr op,
}
}
- IEngineFlat* engine = tx->GetDataTx()->GetEngine();
+ IEngineFlat* engine = tx->GetDataTx()->GetEngine();
Y_VERIFY_S(engine, "missing engine for " << *op << " at " << DataShard.TabletID());
if (op->IsImmediate() && !tx->ReValidateKeys()) {
@@ -132,7 +132,7 @@ EExecutionStatus TExecuteDataTxUnit::Execute(TOperation::TPtr op,
engine->PinPages(pageFaultCount);
throw;
}
- } catch (const TMemoryLimitExceededException&) {
+ } catch (const TMemoryLimitExceededException&) {
LOG_TRACE_S(ctx, NKikimrServices::TX_DATASHARD, "Operation " << *op << " at " << DataShard.TabletID()
<< " exceeded memory limit " << txc.GetMemoryLimit()
<< " and requests " << txc.GetMemoryLimit() * MEMORY_REQUEST_FACTOR
@@ -170,9 +170,9 @@ EExecutionStatus TExecuteDataTxUnit::Execute(TOperation::TPtr op,
void TExecuteDataTxUnit::ExecuteDataTx(TOperation::TPtr op,
TTransactionContext& txc,
- const TActorContext& ctx) {
- TActiveTransaction* tx = dynamic_cast<TActiveTransaction*>(op.Get());
- IEngineFlat* engine = tx->GetDataTx()->GetEngine();
+ const TActorContext& ctx) {
+ TActiveTransaction* tx = dynamic_cast<TActiveTransaction*>(op.Get());
+ IEngineFlat* engine = tx->GetDataTx()->GetEngine();
DataShard.ReleaseCache(*tx);
tx->GetDataTx()->ResetCounters();
@@ -187,8 +187,8 @@ void TExecuteDataTxUnit::ExecuteDataTx(TOperation::TPtr op,
engine->AfterOutgoingReadsetsExtracted();
}
- for (auto& rs : op->InReadSets()) {
- for (auto& rsdata : rs.second) {
+ for (auto& rs : op->InReadSets()) {
+ for (auto& rsdata : rs.second) {
engine->AddIncomingReadset(rsdata.Body);
}
}
@@ -199,35 +199,35 @@ void TExecuteDataTxUnit::ExecuteDataTx(TOperation::TPtr op,
IEngineFlat::EResult engineResult = engine->Execute();
if (engineResult != IEngineFlat::EResult::Ok) {
- TString errorMessage = TStringBuilder() << "Datashard execution error for " << *op << " at "
- << DataShard.TabletID() << ": " << engine->GetErrors();
+ TString errorMessage = TStringBuilder() << "Datashard execution error for " << *op << " at "
+ << DataShard.TabletID() << ": " << engine->GetErrors();
switch (engineResult) {
- case IEngineFlat::EResult::ResultTooBig:
- LOG_ERROR_S(ctx, NKikimrServices::TX_DATASHARD, errorMessage);
- break;
- case IEngineFlat::EResult::Cancelled:
- LOG_NOTICE_S(ctx, NKikimrServices::TX_DATASHARD, errorMessage);
+ case IEngineFlat::EResult::ResultTooBig:
+ LOG_ERROR_S(ctx, NKikimrServices::TX_DATASHARD, errorMessage);
+ break;
+ case IEngineFlat::EResult::Cancelled:
+ LOG_NOTICE_S(ctx, NKikimrServices::TX_DATASHARD, errorMessage);
Y_VERIFY(tx->GetDataTx()->CanCancel());
- break;
- default:
- if (op->IsReadOnly() || op->IsImmediate()) {
- LOG_CRIT_S(ctx, NKikimrServices::TX_DATASHARD, errorMessage);
- } else {
- // TODO: Kill only current datashard tablet.
- Y_FAIL_S("Unexpected execution error in read-write transaction: "
- << errorMessage);
- }
- break;
+ break;
+ default:
+ if (op->IsReadOnly() || op->IsImmediate()) {
+ LOG_CRIT_S(ctx, NKikimrServices::TX_DATASHARD, errorMessage);
+ } else {
+ // TODO: Kill only current datashard tablet.
+ Y_FAIL_S("Unexpected execution error in read-write transaction: "
+ << errorMessage);
+ }
+ break;
}
}
if (engineResult == IEngineFlat::EResult::Cancelled)
DataShard.IncCounter(op->IsImmediate()
- ? COUNTER_IMMEDIATE_TX_CANCELLED
- : COUNTER_PLANNED_TX_CANCELLED);
+ ? COUNTER_IMMEDIATE_TX_CANCELLED
+ : COUNTER_PLANNED_TX_CANCELLED);
- auto& result = BuildResult(op, NKikimrTxDataShard::TEvProposeTransactionResult::COMPLETE);
+ auto& result = BuildResult(op, NKikimrTxDataShard::TEvProposeTransactionResult::COMPLETE);
result->Record.SetOrderId(op->GetTxId());
if (!op->IsImmediate())
result->Record.SetStep(op->GetStep());
@@ -250,13 +250,13 @@ void TExecuteDataTxUnit::ExecuteDataTx(TOperation::TPtr op,
LOG_TRACE_S(ctx, NKikimrServices::TX_DATASHARD,
"Executed operation " << *op << " at tablet " << DataShard.TabletID()
- << " with status " << result->GetStatus());
+ << " with status " << result->GetStatus());
- auto& counters = tx->GetDataTx()->GetCounters();
+ auto& counters = tx->GetDataTx()->GetCounters();
LOG_TRACE_S(ctx, NKikimrServices::TX_DATASHARD,
"Datashard execution counters for " << *op << " at "
- << DataShard.TabletID() << ": " << counters.ToString());
+ << DataShard.TabletID() << ": " << counters.ToString());
KqpUpdateDataShardStatCounters(DataShard, counters);
if (tx->GetDataTx()->CollectStats()) {
@@ -272,20 +272,20 @@ void TExecuteDataTxUnit::ExecuteDataTx(TOperation::TPtr op,
Pipeline.AddCommittingOp(op);
}
-void TExecuteDataTxUnit::AddLocksToResult(TOperation::TPtr op) {
+void TExecuteDataTxUnit::AddLocksToResult(TOperation::TPtr op) {
auto locks = DataShard.SysLocksTable().ApplyLocks();
for (const auto& lock : locks) {
if (lock.IsError()) {
LOG_NOTICE_S(TActivationContext::AsActorContext(), NKikimrServices::TX_DATASHARD,
"Lock is not set for " << *op << " at " << DataShard.TabletID()
- << " lock " << lock);
+ << " lock " << lock);
}
op->Result()->AddTxLock(lock.LockId, lock.DataShard, lock.Generation, lock.Counter,
lock.SchemeShard, lock.PathId);
}
}
-void TExecuteDataTxUnit::Complete(TOperation::TPtr, const TActorContext&) {
+void TExecuteDataTxUnit::Complete(TOperation::TPtr, const TActorContext&) {
}
THolder<TExecutionUnit> CreateExecuteDataTxUnit(TDataShard& dataShard, TPipeline& pipeline) {
diff --git a/ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp b/ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp
index 972b027aa97..70e7be3662b 100644
--- a/ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp
+++ b/ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp
@@ -68,7 +68,7 @@ EExecutionStatus TExecuteKqpDataTxUnit::Execute(TOperation::TPtr op, TTransactio
}
TSetupSysLocks guardLocks(op, DataShard);
- TActiveTransaction* tx = dynamic_cast<TActiveTransaction*>(op.Get());
+ TActiveTransaction* tx = dynamic_cast<TActiveTransaction*>(op.Get());
Y_VERIFY_S(tx, "cannot cast operation of kind " << op->GetKind());
DataShard.ReleaseCache(*tx);
@@ -168,7 +168,7 @@ EExecutionStatus TExecuteKqpDataTxUnit::Execute(TOperation::TPtr op, TTransactio
KqpFillTxStats(DataShard, dataTx->GetCounters(), *op->Result());
KqpFillStats(DataShard, tasksRunner, computeCtx, statsMode, *op->Result());
}
- } catch (const TMemoryLimitExceededException&) {
+ } catch (const TMemoryLimitExceededException&) {
txc.NotEnoughMemory();
LOG_T("Operation " << *op << " at " << tabletId
diff --git a/ydb/core/tx/datashard/execute_kqp_scan_tx_unit.cpp b/ydb/core/tx/datashard/execute_kqp_scan_tx_unit.cpp
index 3a6ad08296e..1362a5149f4 100644
--- a/ydb/core/tx/datashard/execute_kqp_scan_tx_unit.cpp
+++ b/ydb/core/tx/datashard/execute_kqp_scan_tx_unit.cpp
@@ -12,10 +12,10 @@ class TExecuteKqpScanTxUnit : public TExecutionUnit {
public:
TExecuteKqpScanTxUnit(TDataShard& dataShard, TPipeline& pipeline)
: TExecutionUnit(EExecutionUnitKind::ExecuteKqpScanTx, false, dataShard, pipeline) {
- }
+ }
~TExecuteKqpScanTxUnit() override {
- }
+ }
bool IsReadyToExecute(TOperation::TPtr op) const override {
if (op->Result() || op->HasResultSentFlag() || op->IsImmediate() && WillRejectDataTx(op)) {
diff --git a/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp b/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp
index b809f689a8c..10900352c3b 100644
--- a/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp
+++ b/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp
@@ -203,8 +203,8 @@ IDqChannelStorage::TPtr TDqTaskRunnerExecutionContext::CreateChannelStorage(ui64
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
/// TDqTaskRunner
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
-class TDqTaskRunner : public IDqTaskRunner {
-public:
+class TDqTaskRunner : public IDqTaskRunner {
+public:
TDqTaskRunner(const TDqTaskRunnerContext& context, const TDqTaskRunnerSettings& settings, const TLogFunc& logFunc)
: Context(context)
, Settings(settings)
@@ -538,7 +538,7 @@ public:
}
if (Y_UNLIKELY(CollectProfileStats)) {
StopWaiting(Stats->FinishTs);
- }
+ }
return ERunStatus::Finished;
}
diff --git a/ydb/library/yql/dq/runtime/dq_tasks_runner.h b/ydb/library/yql/dq/runtime/dq_tasks_runner.h
index d8bc72696c4..a0cc816e91e 100644
--- a/ydb/library/yql/dq/runtime/dq_tasks_runner.h
+++ b/ydb/library/yql/dq/runtime/dq_tasks_runner.h
@@ -21,12 +21,12 @@
namespace NYql::NDq {
-enum class ERunStatus : ui32 {
- Finished,
+enum class ERunStatus : ui32 {
+ Finished,
PendingInput,
PendingOutput
-};
-
+};
+
class TRunStatusTimeMetrics {
public:
void UpdateStatusTime(TDuration computeCpuTime = TDuration::Zero()) {
diff --git a/ydb/library/yql/dq/runtime/ya.make b/ydb/library/yql/dq/runtime/ya.make
index f407943ffa7..4cc8585431d 100644
--- a/ydb/library/yql/dq/runtime/ya.make
+++ b/ydb/library/yql/dq/runtime/ya.make
@@ -33,7 +33,7 @@ SRCS(
)
GENERATE_ENUM_SERIALIZATION(dq_tasks_runner.h)
-
+
YQL_LAST_ABI_VERSION()
END()
diff --git a/ydb/library/yql/providers/dq/api/protos/service.proto b/ydb/library/yql/providers/dq/api/protos/service.proto
index aa75f459747..b91351e5d12 100644
--- a/ydb/library/yql/providers/dq/api/protos/service.proto
+++ b/ydb/library/yql/providers/dq/api/protos/service.proto
@@ -21,10 +21,10 @@ message ResponseMetric {
int64 Count = 6;
}
-message ExecuteQueryResult {
- Ydb.ResultSet result = 1;
+message ExecuteQueryResult {
+ Ydb.ResultSet result = 1;
bytes yson = 2;
-}
+}
message TFile {
enum EFileType {
diff --git a/ydb/library/yql/providers/dq/service/grpc_service.cpp b/ydb/library/yql/providers/dq/service/grpc_service.cpp
index f95581bb2b9..65a9e500d9d 100644
--- a/ydb/library/yql/providers/dq/service/grpc_service.cpp
+++ b/ydb/library/yql/providers/dq/service/grpc_service.cpp
@@ -23,14 +23,14 @@
#include <library/cpp/grpc/server/grpc_counters.h>
#include <ydb/public/api/protos/ydb_status_codes.pb.h>
-
+
#include <library/cpp/actors/interconnect/interconnect.h>
#include <library/cpp/actors/helpers/future_callback.h>
#include <library/cpp/build_info/build_info.h>
#include <library/cpp/svnversion/svnversion.h>
#include <util/string/split.h>
-#include <util/system/env.h>
+#include <util/system/env.h>
namespace NYql::NDqs {
using namespace NYql::NDqs;