diff options
author | Sergei Puchin <[email protected]> | 2022-05-20 23:10:18 +0300 |
---|---|---|
committer | Sergei Puchin <[email protected]> | 2022-05-20 23:10:18 +0300 |
commit | 12266215500c273b247f996c161719cf89bf1e95 (patch) | |
tree | c1e32bc44a1ae2b6fd8e04a8c6f7b1567675e3a1 | |
parent | f9aad7b0fae8618c015312c913b11aefba000f72 (diff) |
Fix row values lifetime for TKqpScanWideReadTableWrapperBase generated code. (KIKIMR-14913)
ref:a54132eea49d2d676785aa7914e696f90376f8b6
-rw-r--r-- | ydb/core/kqp/runtime/kqp_read_table.cpp | 59 | ||||
-rw-r--r-- | ydb/core/kqp/ut/kqp_scan_ut.cpp | 42 |
2 files changed, 67 insertions, 34 deletions
diff --git a/ydb/core/kqp/runtime/kqp_read_table.cpp b/ydb/core/kqp/runtime/kqp_read_table.cpp index d382795590b..70b33e12fa2 100644 --- a/ydb/core/kqp/runtime/kqp_read_table.cpp +++ b/ydb/core/kqp/runtime/kqp_read_table.cpp @@ -246,8 +246,7 @@ public: TKqpScanWideReadTableWrapperBase(TKqpScanComputeContext& computeCtx, std::vector<EValueRepresentation>&& representations) : TBase(this) , ComputeCtx(computeCtx) - , Representations(std::move(representations)) - {} + , Representations(std::move(representations)) {} EFetchResult DoCalculate(TComputationContext& ctx, NUdf::TUnboxedValue* const* output) const { Y_UNUSED(ctx); @@ -263,45 +262,43 @@ public: ICodegeneratorInlineWideNode::TGenerateResult DoGenGetValues(const TCodegenContext& ctx, BasicBlock*& block) const { auto& context = ctx.Codegen->GetContext(); const auto size = GetAllColumnsSize(); - ICodegeneratorInlineWideNode::TGettersList getters(size); + + Row.resize(size); const auto valueType = Type::getInt128Ty(context); - const auto valuesType = ArrayType::get(valueType, size); - const auto values = new AllocaInst(valuesType, 0U, "values", &ctx.Func->getEntryBlock().back()); + const auto valuePtrType = PointerType::getUnqual(valueType); + const auto valuesPtr = CastInst::Create(Instruction::IntToPtr, + ConstantInt::get(Type::getInt64Ty(context), uintptr_t(Row.data())), + valuePtrType, "values", &ctx.Func->getEntryBlock().back()); + + ICodegeneratorInlineWideNode::TGettersList getters(size); + const auto indexType = Type::getInt32Ty(context); + for (auto i = 0U; i < size; ++i) { + getters[i] = [i, valueType, valuesPtr, indexType] (const TCodegenContext&, BasicBlock*& block) { + const auto loadPtr = GetElementPtrInst::Create(valueType, valuesPtr, + {ConstantInt::get(indexType, i)}, + (TString("loadPtr_") += ToString(i)).c_str(), + block); + return new LoadInst(loadPtr, "load", block); + }; + } - const auto fieldsType = ArrayType::get(PointerType::getUnqual(valueType), size); + const auto fieldsType = ArrayType::get(valuePtrType, size); const auto fields = new AllocaInst(fieldsType, 0U, "fields", &ctx.Func->getEntryBlock().back()); - const auto indexType = Type::getInt32Ty(context); Value* init = UndefValue::get(fieldsType); for (auto i = 0U; i < size; ++i) { - const auto pointer = GetElementPtrInst::CreateInBounds(values, - {ConstantInt::get(indexType, 0), ConstantInt::get(indexType, i)}, + const auto pointer = GetElementPtrInst::Create(valueType, valuesPtr, + {ConstantInt::get(indexType, i)}, (TString("ptr_") += ToString(i)).c_str(), &ctx.Func->getEntryBlock().back()); - init = InsertValueInst::Create(init, pointer, {i}, (TString("insert_") += ToString(i)).c_str(), &ctx.Func->getEntryBlock().back()); - - new StoreInst(ConstantInt::get(valueType, 0), pointer, &ctx.Func->getEntryBlock().back()); - getters[i] = [i, indexType, values] (const TCodegenContext&, BasicBlock*& block) { - const auto loadPtr = GetElementPtrInst::CreateInBounds(values, - {ConstantInt::get(indexType, 0), ConstantInt::get(indexType, i)}, - (TString("loadPtr_") += ToString(i)).c_str(), - block); - return new LoadInst(loadPtr, "load", block); - }; + init = InsertValueInst::Create(init, pointer, {i}, (TString("insert_") += ToString(i)).c_str(), + &ctx.Func->getEntryBlock().back()); } new StoreInst(init, fields, &ctx.Func->getEntryBlock().back()); - for (ui32 i = 0U; i < size; ++i) { - const auto pointer = GetElementPtrInst::CreateInBounds(values, - {ConstantInt::get(indexType, 0), ConstantInt::get(indexType, i)}, - (TString("ptr_") += ToString(i)).c_str(), block); - ValueCleanup(Representations[i], pointer, ctx, block); - new StoreInst(ConstantInt::get(valueType, 0), pointer, block); - } - const auto ptrType = PointerType::getUnqual(StructType::get(context)); const auto func = ConstantInt::get(Type::getInt64Ty(context), GetMethodPtr(&TKqpScanWideReadTableWrapperBase::DoCalculate)); const auto self = CastInst::Create(Instruction::IntToPtr, ConstantInt::get(Type::getInt64Ty(context), uintptr_t(this)), ptrType, "self", block); @@ -309,13 +306,6 @@ public: const auto funcPtr = CastInst::Create(Instruction::IntToPtr, func, PointerType::getUnqual(funcType), "fetch_func", block); const auto result = CallInst::Create(funcPtr, { self, ctx.Ctx, fields }, "fetch", block); - for (ui32 i = 0U; i < size; ++i) { - const auto pointer = GetElementPtrInst::CreateInBounds(values, - {ConstantInt::get(indexType, 0), ConstantInt::get(indexType, i)}, - (TString("ptr_") += ToString(i)).c_str(), block); - ValueRelease(Representations[i], pointer, ctx, block); - } - return {result, std::move(getters)}; } #endif @@ -327,6 +317,7 @@ private: TKqpScanComputeContext& ComputeCtx; mutable TIntrusivePtr<IKqpTableReader> TableReader; const std::vector<EValueRepresentation> Representations; + mutable std::vector<NUdf::TUnboxedValue> Row; }; class TKqpScanWideReadTableWrapper : public TKqpScanWideReadTableWrapperBase { diff --git a/ydb/core/kqp/ut/kqp_scan_ut.cpp b/ydb/core/kqp/ut/kqp_scan_ut.cpp index f96545316d7..cf45985c599 100644 --- a/ydb/core/kqp/ut/kqp_scan_ut.cpp +++ b/ydb/core/kqp/ut/kqp_scan_ut.cpp @@ -624,6 +624,48 @@ Y_UNIT_TEST_SUITE(KqpScan) { test(false); } + Y_UNIT_TEST_TWIN(Join4, UseSessionActor) { + auto kikimr = KikimrRunnerEnableSessionActor(UseSessionActor, {}, AppCfg()); + auto db = kikimr.GetTableClient(); + auto session = db.CreateSession().GetValueSync().GetSession(); + CreateSampleTables(kikimr); + + AssertSuccessResult(session.ExecuteSchemeQuery(R"( + --!syntax_v1 + CREATE TABLE Tmp ( + Id Int32, + Value Uint64, + PRIMARY KEY(Id) + ); + )").GetValueSync()); + + AssertSuccessResult(session.ExecuteDataQuery(R"( + --!syntax_v1 + UPSERT INTO Tmp (Id, Value) VALUES + (100, 300), + (200, 300), + (300, 100), + (400, 300); + )", TTxControl::BeginTx().CommitTx()).GetValueSync()); + + auto it = db.StreamExecuteScanQuery(R"( + SELECT t1.Name, t1.Amount, t2.Id + FROM Test AS t1 + LEFT JOIN Tmp AS t2 + ON t1.Amount = t2.Value; + )").GetValueSync(); + + UNIT_ASSERT_C(it.IsSuccess(), it.GetIssues().ToString()); + + CompareYson(R"([ + [["Anna"];[3500u];#]; + [["Paul"];[300u];[100]]; + [["Paul"];[300u];[200]]; + [["Paul"];[300u];[400]]; + [["Tony"];[7200u];#] + ])", StreamResultToYson(it)); + } + Y_UNIT_TEST_TWIN(LeftSemiJoinSimple, UseSessionActor) { auto kikimr = KikimrRunnerEnableSessionActor(UseSessionActor, {}, AppCfg()); auto db = kikimr.GetTableClient(); |