summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorSergei Puchin <[email protected]>2022-05-20 23:10:18 +0300
committerSergei Puchin <[email protected]>2022-05-20 23:10:18 +0300
commit12266215500c273b247f996c161719cf89bf1e95 (patch)
treec1e32bc44a1ae2b6fd8e04a8c6f7b1567675e3a1
parentf9aad7b0fae8618c015312c913b11aefba000f72 (diff)
Fix row values lifetime for TKqpScanWideReadTableWrapperBase generated code. (KIKIMR-14913)
ref:a54132eea49d2d676785aa7914e696f90376f8b6
-rw-r--r--ydb/core/kqp/runtime/kqp_read_table.cpp59
-rw-r--r--ydb/core/kqp/ut/kqp_scan_ut.cpp42
2 files changed, 67 insertions, 34 deletions
diff --git a/ydb/core/kqp/runtime/kqp_read_table.cpp b/ydb/core/kqp/runtime/kqp_read_table.cpp
index d382795590b..70b33e12fa2 100644
--- a/ydb/core/kqp/runtime/kqp_read_table.cpp
+++ b/ydb/core/kqp/runtime/kqp_read_table.cpp
@@ -246,8 +246,7 @@ public:
TKqpScanWideReadTableWrapperBase(TKqpScanComputeContext& computeCtx, std::vector<EValueRepresentation>&& representations)
: TBase(this)
, ComputeCtx(computeCtx)
- , Representations(std::move(representations))
- {}
+ , Representations(std::move(representations)) {}
EFetchResult DoCalculate(TComputationContext& ctx, NUdf::TUnboxedValue* const* output) const {
Y_UNUSED(ctx);
@@ -263,45 +262,43 @@ public:
ICodegeneratorInlineWideNode::TGenerateResult DoGenGetValues(const TCodegenContext& ctx, BasicBlock*& block) const {
auto& context = ctx.Codegen->GetContext();
const auto size = GetAllColumnsSize();
- ICodegeneratorInlineWideNode::TGettersList getters(size);
+
+ Row.resize(size);
const auto valueType = Type::getInt128Ty(context);
- const auto valuesType = ArrayType::get(valueType, size);
- const auto values = new AllocaInst(valuesType, 0U, "values", &ctx.Func->getEntryBlock().back());
+ const auto valuePtrType = PointerType::getUnqual(valueType);
+ const auto valuesPtr = CastInst::Create(Instruction::IntToPtr,
+ ConstantInt::get(Type::getInt64Ty(context), uintptr_t(Row.data())),
+ valuePtrType, "values", &ctx.Func->getEntryBlock().back());
+
+ ICodegeneratorInlineWideNode::TGettersList getters(size);
+ const auto indexType = Type::getInt32Ty(context);
+ for (auto i = 0U; i < size; ++i) {
+ getters[i] = [i, valueType, valuesPtr, indexType] (const TCodegenContext&, BasicBlock*& block) {
+ const auto loadPtr = GetElementPtrInst::Create(valueType, valuesPtr,
+ {ConstantInt::get(indexType, i)},
+ (TString("loadPtr_") += ToString(i)).c_str(),
+ block);
+ return new LoadInst(loadPtr, "load", block);
+ };
+ }
- const auto fieldsType = ArrayType::get(PointerType::getUnqual(valueType), size);
+ const auto fieldsType = ArrayType::get(valuePtrType, size);
const auto fields = new AllocaInst(fieldsType, 0U, "fields", &ctx.Func->getEntryBlock().back());
- const auto indexType = Type::getInt32Ty(context);
Value* init = UndefValue::get(fieldsType);
for (auto i = 0U; i < size; ++i) {
- const auto pointer = GetElementPtrInst::CreateInBounds(values,
- {ConstantInt::get(indexType, 0), ConstantInt::get(indexType, i)},
+ const auto pointer = GetElementPtrInst::Create(valueType, valuesPtr,
+ {ConstantInt::get(indexType, i)},
(TString("ptr_") += ToString(i)).c_str(),
&ctx.Func->getEntryBlock().back());
- init = InsertValueInst::Create(init, pointer, {i}, (TString("insert_") += ToString(i)).c_str(), &ctx.Func->getEntryBlock().back());
-
- new StoreInst(ConstantInt::get(valueType, 0), pointer, &ctx.Func->getEntryBlock().back());
- getters[i] = [i, indexType, values] (const TCodegenContext&, BasicBlock*& block) {
- const auto loadPtr = GetElementPtrInst::CreateInBounds(values,
- {ConstantInt::get(indexType, 0), ConstantInt::get(indexType, i)},
- (TString("loadPtr_") += ToString(i)).c_str(),
- block);
- return new LoadInst(loadPtr, "load", block);
- };
+ init = InsertValueInst::Create(init, pointer, {i}, (TString("insert_") += ToString(i)).c_str(),
+ &ctx.Func->getEntryBlock().back());
}
new StoreInst(init, fields, &ctx.Func->getEntryBlock().back());
- for (ui32 i = 0U; i < size; ++i) {
- const auto pointer = GetElementPtrInst::CreateInBounds(values,
- {ConstantInt::get(indexType, 0), ConstantInt::get(indexType, i)},
- (TString("ptr_") += ToString(i)).c_str(), block);
- ValueCleanup(Representations[i], pointer, ctx, block);
- new StoreInst(ConstantInt::get(valueType, 0), pointer, block);
- }
-
const auto ptrType = PointerType::getUnqual(StructType::get(context));
const auto func = ConstantInt::get(Type::getInt64Ty(context), GetMethodPtr(&TKqpScanWideReadTableWrapperBase::DoCalculate));
const auto self = CastInst::Create(Instruction::IntToPtr, ConstantInt::get(Type::getInt64Ty(context), uintptr_t(this)), ptrType, "self", block);
@@ -309,13 +306,6 @@ public:
const auto funcPtr = CastInst::Create(Instruction::IntToPtr, func, PointerType::getUnqual(funcType), "fetch_func", block);
const auto result = CallInst::Create(funcPtr, { self, ctx.Ctx, fields }, "fetch", block);
- for (ui32 i = 0U; i < size; ++i) {
- const auto pointer = GetElementPtrInst::CreateInBounds(values,
- {ConstantInt::get(indexType, 0), ConstantInt::get(indexType, i)},
- (TString("ptr_") += ToString(i)).c_str(), block);
- ValueRelease(Representations[i], pointer, ctx, block);
- }
-
return {result, std::move(getters)};
}
#endif
@@ -327,6 +317,7 @@ private:
TKqpScanComputeContext& ComputeCtx;
mutable TIntrusivePtr<IKqpTableReader> TableReader;
const std::vector<EValueRepresentation> Representations;
+ mutable std::vector<NUdf::TUnboxedValue> Row;
};
class TKqpScanWideReadTableWrapper : public TKqpScanWideReadTableWrapperBase {
diff --git a/ydb/core/kqp/ut/kqp_scan_ut.cpp b/ydb/core/kqp/ut/kqp_scan_ut.cpp
index f96545316d7..cf45985c599 100644
--- a/ydb/core/kqp/ut/kqp_scan_ut.cpp
+++ b/ydb/core/kqp/ut/kqp_scan_ut.cpp
@@ -624,6 +624,48 @@ Y_UNIT_TEST_SUITE(KqpScan) {
test(false);
}
+ Y_UNIT_TEST_TWIN(Join4, UseSessionActor) {
+ auto kikimr = KikimrRunnerEnableSessionActor(UseSessionActor, {}, AppCfg());
+ auto db = kikimr.GetTableClient();
+ auto session = db.CreateSession().GetValueSync().GetSession();
+ CreateSampleTables(kikimr);
+
+ AssertSuccessResult(session.ExecuteSchemeQuery(R"(
+ --!syntax_v1
+ CREATE TABLE Tmp (
+ Id Int32,
+ Value Uint64,
+ PRIMARY KEY(Id)
+ );
+ )").GetValueSync());
+
+ AssertSuccessResult(session.ExecuteDataQuery(R"(
+ --!syntax_v1
+ UPSERT INTO Tmp (Id, Value) VALUES
+ (100, 300),
+ (200, 300),
+ (300, 100),
+ (400, 300);
+ )", TTxControl::BeginTx().CommitTx()).GetValueSync());
+
+ auto it = db.StreamExecuteScanQuery(R"(
+ SELECT t1.Name, t1.Amount, t2.Id
+ FROM Test AS t1
+ LEFT JOIN Tmp AS t2
+ ON t1.Amount = t2.Value;
+ )").GetValueSync();
+
+ UNIT_ASSERT_C(it.IsSuccess(), it.GetIssues().ToString());
+
+ CompareYson(R"([
+ [["Anna"];[3500u];#];
+ [["Paul"];[300u];[100]];
+ [["Paul"];[300u];[200]];
+ [["Paul"];[300u];[400]];
+ [["Tony"];[7200u];#]
+ ])", StreamResultToYson(it));
+ }
+
Y_UNIT_TEST_TWIN(LeftSemiJoinSimple, UseSessionActor) {
auto kikimr = KikimrRunnerEnableSessionActor(UseSessionActor, {}, AppCfg());
auto db = kikimr.GetTableClient();