diff options
| author | nsofya <[email protected]> | 2023-06-23 12:34:10 +0300 |
|---|---|---|
| committer | nsofya <[email protected]> | 2023-06-23 12:34:10 +0300 |
| commit | ad927b4ea86d4c53ad2393d6714e78b2115344be (patch) | |
| tree | 9db332c9e3dcf42df8ffefd3d2bb02d898687987 | |
| parent | cf5721e6720a858911ba779b066590df89d6b35c (diff) | |
Implement json_value
| -rw-r--r-- | ydb/core/formats/arrow/program.cpp | 6 | ||||
| -rw-r--r-- | ydb/core/tx/columnshard/engines/ut_program.cpp | 163 | ||||
| -rw-r--r-- | ydb/library/yql/core/arrow_kernels/registry/ut/registry_ut.cpp | 43 | ||||
| -rw-r--r-- | ydb/library/yql/core/arrow_kernels/request/request.cpp | 66 | ||||
| -rw-r--r-- | ydb/library/yql/core/arrow_kernels/request/request.h | 1 |
5 files changed, 275 insertions, 4 deletions
diff --git a/ydb/core/formats/arrow/program.cpp b/ydb/core/formats/arrow/program.cpp index 528766a30a3..333430594f1 100644 --- a/ydb/core/formats/arrow/program.cpp +++ b/ydb/core/formats/arrow/program.cpp @@ -143,7 +143,11 @@ public: if (!arguments) { return arrow::Status::Invalid("Error parsing args."); } - return Function->Execute(*arguments, assign.GetOptions(), TBase::Ctx); + try { + return Function->Execute(*arguments, assign.GetOptions(), TBase::Ctx); + } catch (const std::exception& ex) { + return arrow::Status::ExecutionError(ex.what()); + } } }; diff --git a/ydb/core/tx/columnshard/engines/ut_program.cpp b/ydb/core/tx/columnshard/engines/ut_program.cpp index f6be9dcdc6a..f2a41f57a28 100644 --- a/ydb/core/tx/columnshard/engines/ut_program.cpp +++ b/ydb/core/tx/columnshard/engines/ut_program.cpp @@ -88,6 +88,19 @@ Y_UNIT_TEST_SUITE(TestProgram) { return ReqBuilder->JsonExists(blockOptJsonType, scalarStringType, blockBoolType); } + ui32 AddJsonValue(bool isBinaryType = true, NYql::EDataSlot resultType = NYql::EDataSlot::Utf8) { + NYql::TExprContext ctx; + auto blockOptJsonType = ctx.template MakeType<NYql::TBlockExprType>( + ctx.template MakeType<NYql::TOptionalExprType>( + ctx.template MakeType<NYql::TDataExprType>(isBinaryType ? NYql::EDataSlot::JsonDocument : NYql::EDataSlot::Json))); + auto scalarStringType = ctx.template MakeType<NYql::TScalarExprType>(ctx.template MakeType<NYql::TDataExprType>(NYql::EDataSlot::Utf8)); + auto blockResultType = ctx.template MakeType<NYql::TBlockExprType>( + ctx.template MakeType<NYql::TOptionalExprType>( + ctx.template MakeType<NYql::TDataExprType>(resultType))); + + return ReqBuilder->JsonValue(blockOptJsonType, scalarStringType, blockResultType); + } + TString Serialize() { return ReqBuilder->Serialize(); } @@ -300,6 +313,156 @@ Y_UNIT_TEST_SUITE(TestProgram) { JsonExistsImpl(true); } + void JsonValueImpl(bool isBinaryType, NYql::EDataSlot resultType) { + TIndexInfo indexInfo = BuildTableInfo(testColumns, testKey); + TIndexColumnResolver columnResolver(indexInfo); + + NKikimrSSA::TProgram programProto; + { + auto* command = programProto.AddCommand(); + auto* constantProto = command->MutableAssign()->MutableConstant(); + constantProto->SetText("$.key"); + command->MutableAssign()->MutableColumn()->SetName("json_path"); + } + { + auto* command = programProto.AddCommand(); + auto* functionProto = command->MutableAssign()->MutableFunction(); + functionProto->SetFunctionType(NKikimrSSA::TProgram::EFunctionType::TProgram_EFunctionType_YQL_KERNEL); + functionProto->SetKernelIdx(0); + functionProto->AddArguments()->SetName("json_data"); + functionProto->AddArguments()->SetName("json_path"); + functionProto->SetId(NKikimrSSA::TProgram::TAssignment::EFunction::TProgram_TAssignment_EFunction_FUNC_STR_LENGTH); + } + { + auto* command = programProto.AddCommand(); + auto* prjectionProto = command->MutableProjection(); + auto* column = prjectionProto->AddColumns(); + column->SetName("0"); + } + + TKernelsWrapper kernels; + kernels.AddJsonValue(isBinaryType, resultType); + const auto programSerialized = SerializeProgram(programProto, kernels.Serialize()); + + TProgramContainer program; + TString errors; + UNIT_ASSERT_C(program.Init(columnResolver, NKikimrSchemeOp::EOlapProgramType::OLAP_PROGRAM_SSA_PROGRAM_WITH_PARAMETERS, programSerialized, errors), errors); + + TTableUpdatesBuilder updates(NArrow::MakeArrowSchema({{"json_data", TTypeInfo(isBinaryType ? NTypeIds::JsonDocument : NTypeIds::Utf8) }})); + { + NJson::TJsonValue testJson; + testJson["key"] = "value"; + updates.AddRow().Add<std::string>(testJson.GetStringRobust()); + } + { + NJson::TJsonValue testJson; + testJson["key"] = 10; + updates.AddRow().Add<std::string>(testJson.GetStringRobust()); + } + { + NJson::TJsonValue testJson; + testJson["key"] = 0.1; + updates.AddRow().Add<std::string>(testJson.GetStringRobust()); + } + { + NJson::TJsonValue testJson; + testJson["key"] = false; + updates.AddRow().Add<std::string>(testJson.GetStringRobust()); + } + { + NJson::TJsonValue testJson; + testJson["another"] = "value"; + updates.AddRow().Add<std::string>(testJson.GetStringRobust()); + } + { + updates.AddRow().Add<std::string>(NJson::TJsonValue(NJson::JSON_ARRAY).GetStringRobust()); + } + + auto batch = updates.BuildArrow(); + Cerr << batch->ToString() << Endl; + + if (isBinaryType) { + THashMap<TString, NScheme::TTypeInfo> cc; + cc["json_data"] = TTypeInfo(NTypeIds::JsonDocument); + batch = NArrow::ConvertColumns(batch, cc); + Cerr << batch->ToString() << Endl; + } + + auto res = program.ApplyProgram(batch); + UNIT_ASSERT_C(res.ok(), res.ToString()); + + Cerr << "Check output for " << resultType << Endl; + if (resultType == NYql::EDataSlot::Utf8) { + TTableUpdatesBuilder result(NArrow::MakeArrowSchema( { std::make_pair("0", TTypeInfo(NTypeIds::Utf8)) })); + + result.AddRow().Add<std::string>("value"); + result.AddRow().Add<std::string>("10"); + result.AddRow().Add<std::string>("0.1"); + result.AddRow().Add<std::string>("false"); + result.AddRow().AddNull(); + result.AddRow().AddNull(); + + auto expected = result.BuildArrow(); + UNIT_ASSERT_VALUES_EQUAL(batch->ToString(), expected->ToString()); + } else if (resultType == NYql::EDataSlot::Bool) { + TTableUpdatesBuilder result(NArrow::MakeArrowSchema( { std::make_pair("0", TTypeInfo(NTypeIds::Uint8)) })); + + result.AddRow().AddNull(); + result.AddRow().AddNull(); + result.AddRow().AddNull(); + result.AddRow().Add<ui8>(0); + result.AddRow().AddNull(); + result.AddRow().AddNull(); + + auto expected = result.BuildArrow(); + UNIT_ASSERT_VALUES_EQUAL(batch->ToString(), expected->ToString()); + } else if (resultType == NYql::EDataSlot::Int64 || resultType == NYql::EDataSlot::Uint64) { + TTableUpdatesBuilder result(NArrow::MakeArrowSchema( { std::make_pair("0", TTypeInfo(NTypeIds::Int64)) })); + + result.AddRow().AddNull(); + result.AddRow().Add<i64>(10); + result.AddRow().AddNull(); + result.AddRow().AddNull(); + result.AddRow().AddNull(); + result.AddRow().AddNull(); + + auto expected = result.BuildArrow(); + UNIT_ASSERT_VALUES_EQUAL(batch->ToString(), expected->ToString()); + } else if (resultType == NYql::EDataSlot::Double || resultType == NYql::EDataSlot::Float) { + TTableUpdatesBuilder result(NArrow::MakeArrowSchema( { std::make_pair("0", TTypeInfo(NTypeIds::Double)) })); + + result.AddRow().AddNull(); + result.AddRow().Add<double>(10); + result.AddRow().Add<double>(0.1); + result.AddRow().AddNull(); + result.AddRow().AddNull(); + result.AddRow().AddNull(); + + auto expected = result.BuildArrow(); + UNIT_ASSERT_VALUES_EQUAL(batch->ToString(), expected->ToString()); + } else { + Y_FAIL("Not implemented"); + } + } + + Y_UNIT_TEST(JsonValue) { + JsonValueImpl(false, NYql::EDataSlot::Utf8); + JsonValueImpl(false, NYql::EDataSlot::Bool); + JsonValueImpl(false, NYql::EDataSlot::Int64); + JsonValueImpl(false, NYql::EDataSlot::Uint64); + JsonValueImpl(false, NYql::EDataSlot::Float); + JsonValueImpl(false, NYql::EDataSlot::Double); + } + + Y_UNIT_TEST(JsonValueBinary) { + JsonValueImpl(true, NYql::EDataSlot::Utf8); + JsonValueImpl(true, NYql::EDataSlot::Bool); + JsonValueImpl(true, NYql::EDataSlot::Int64); + JsonValueImpl(true, NYql::EDataSlot::Uint64); + JsonValueImpl(true, NYql::EDataSlot::Float); + JsonValueImpl(true, NYql::EDataSlot::Double); + } + Y_UNIT_TEST(SimpleFunction) { TIndexInfo indexInfo = BuildTableInfo(testColumns, testKey);; TIndexColumnResolver columnResolver(indexInfo); diff --git a/ydb/library/yql/core/arrow_kernels/registry/ut/registry_ut.cpp b/ydb/library/yql/core/arrow_kernels/registry/ut/registry_ut.cpp index 797343859a6..8cab9844ddb 100644 --- a/ydb/library/yql/core/arrow_kernels/registry/ut/registry_ut.cpp +++ b/ydb/library/yql/core/arrow_kernels/registry/ut/registry_ut.cpp @@ -163,4 +163,47 @@ Y_UNIT_TEST_SUITE(TKernelRegistryTest) { return b.JsonExists(blockJsonType, scalarUtf8Type, blockOptBoolType); }); } + + void TesJsonValueImpl(EDataSlot jsonType, NYql::EDataSlot resultType) { + TestOne([&](auto& b,auto& ctx) { + auto blockJsonType = ctx.template MakeType<TBlockExprType>( + ctx.template MakeType<TDataExprType>(jsonType)); + auto scalarUtf8Type = ctx.template MakeType<TScalarExprType>( + ctx.template MakeType<TDataExprType>(EDataSlot::Utf8)); + auto blockOptType = ctx.template MakeType<TBlockExprType>( + ctx.template MakeType<TOptionalExprType>( + ctx.template MakeType<TDataExprType>(resultType))); + return b.JsonValue(blockJsonType, scalarUtf8Type, blockOptType); + }); + } + + Y_UNIT_TEST(TestJsonValueUtf8) { + TesJsonValueImpl(EDataSlot::Json, NYql::EDataSlot::Utf8); + TesJsonValueImpl(EDataSlot::JsonDocument, NYql::EDataSlot::Utf8); + } + + Y_UNIT_TEST(TestJsonValueBool) { + TesJsonValueImpl(EDataSlot::Json, NYql::EDataSlot::Bool); + TesJsonValueImpl(EDataSlot::JsonDocument, NYql::EDataSlot::Bool); + } + + Y_UNIT_TEST(TestJsonValueInt64) { + TesJsonValueImpl(EDataSlot::Json, NYql::EDataSlot::Int64); + TesJsonValueImpl(EDataSlot::JsonDocument, NYql::EDataSlot::Int64); + } + + Y_UNIT_TEST(TestJsonValueUint64) { + TesJsonValueImpl(EDataSlot::Json, NYql::EDataSlot::Uint64); + TesJsonValueImpl(EDataSlot::JsonDocument, NYql::EDataSlot::Uint64); + } + + Y_UNIT_TEST(TestJsonValueFloat) {; + TesJsonValueImpl(EDataSlot::Json, NYql::EDataSlot::Float); + TesJsonValueImpl(EDataSlot::JsonDocument, NYql::EDataSlot::Float); + } + + Y_UNIT_TEST(TestJsonValueDouble) {; + TesJsonValueImpl(EDataSlot::Json, NYql::EDataSlot::Double); + TesJsonValueImpl(EDataSlot::JsonDocument, NYql::EDataSlot::Double); + } } diff --git a/ydb/library/yql/core/arrow_kernels/request/request.cpp b/ydb/library/yql/core/arrow_kernels/request/request.cpp index 8cd544e80fe..0eb3a8df415 100644 --- a/ydb/library/yql/core/arrow_kernels/request/request.cpp +++ b/ydb/library/yql/core/arrow_kernels/request/request.cpp @@ -3,6 +3,7 @@ #include <ydb/library/yql/minikql/mkql_node_cast.h> #include <ydb/library/yql/minikql/mkql_node_serialization.h> #include <ydb/library/yql/core/yql_opt_utils.h> +#include <ydb/library/yql/core/yql_expr_type_annotation.h> namespace NYql { @@ -92,10 +93,13 @@ ui32 TKernelRequestBuilder::Udf(const TString& name, bool isPolymorphic, const s ui32 TKernelRequestBuilder::JsonExists(const TTypeAnnotationNode* arg1Type, const TTypeAnnotationNode* arg2Type, const TTypeAnnotationNode* retType) { TGuard<NKikimr::NMiniKQL::TScopedAlloc> allocGuard(Alloc_); + + bool isScalar = false; + bool isBinaryJson = (RemoveOptionalType(NYql::GetBlockItemType(*arg1Type, isScalar))->Cast<TDataExprType>()->GetSlot() == EDataSlot::JsonDocument); - bool isBinaryJson = (RemoveOptionalType(arg1Type->Cast<TBlockExprType>()->GetItemType())->Cast<TDataExprType>()->GetSlot() == EDataSlot::JsonDocument); - - auto exists = Pb_.Udf(isBinaryJson ? "Json2.JsonDocumentSqlExists" : "Json2.SqlExists"); + auto udfName = TStringBuilder() << "Json2." << (isBinaryJson ? "JsonDocument" : "" ) << "SqlExists"; + + auto exists = Pb_.Udf(udfName); auto parse = Pb_.Udf("Json2.Parse"); auto compilePath = Pb_.Udf("Json2.CompilePath"); auto outType = MakeType(retType); @@ -122,6 +126,62 @@ ui32 TKernelRequestBuilder::JsonExists(const TTypeAnnotationNode* arg1Type, cons return Items_.size() - 1; } +ui32 TKernelRequestBuilder::JsonValue(const TTypeAnnotationNode* arg1Type, const TTypeAnnotationNode* arg2Type, const TTypeAnnotationNode* retType) { + TGuard<NKikimr::NMiniKQL::TScopedAlloc> allocGuard(Alloc_); + + bool isScalar = false; + bool isBinaryJson = (RemoveOptionalType(NYql::GetBlockItemType(*arg1Type, isScalar))->Cast<TDataExprType>()->GetSlot() == EDataSlot::JsonDocument); + auto resultSlot = RemoveOptionalType(NYql::GetBlockItemType(*retType, isScalar))->Cast<TDataExprType>()->GetSlot(); + + auto udfName = TStringBuilder() << "Json2." << (isBinaryJson ? "JsonDocument" : "" ); + if (NYql::IsDataTypeFloat(resultSlot)) { + udfName << "SqlValueNumber"; + } else if (NYql::IsDataTypeNumeric(resultSlot)) { + udfName << "SqlValueInt64"; + } else if (NYql::IsDataTypeString(resultSlot)) { + udfName << "SqlValueConvertToUtf8"; + } else if (resultSlot == EDataSlot::Bool) { + udfName << "SqlValueBool"; + } else { + Y_ENSURE(false, "Invalid return type"); + } + + auto jsonValue = Pb_.Udf(udfName); + + auto parse = Pb_.Udf("Json2.Parse"); + auto compilePath = Pb_.Udf("Json2.CompilePath"); + auto outType = MakeType(retType); + auto arg1 = MakeArg(arg1Type); + auto arg2 = MakeArg(arg2Type); + + auto scalarApply = Pb_.ScalarApply({arg1, arg2}, [&](const auto& args) { + auto json = args[0]; + auto processJson = [&](auto unpacked) { + auto input = Pb_.NewOptional( isBinaryJson ? unpacked : Pb_.Apply(parse, { unpacked })); + auto path = Pb_.Apply(compilePath, { args[1] }); + auto dictType = Pb_.NewDictType(Pb_.NewDataType(NUdf::EDataSlot::Utf8), Pb_.NewResourceType("JsonNode"), false); + auto resultTuple = Pb_.Apply(jsonValue, { input, path, Pb_.NewDict(dictType, {})}); + return Pb_.VisitAll(resultTuple, [&](ui32 index, NKikimr::NMiniKQL::TRuntimeNode item) { + if (index == 0) { + return Pb_.NewEmptyOptional(outType->GetItemType()); + } + Y_ENSURE(index == 1); + return Pb_.Cast(item, outType->GetItemType()); + }); + }; + + if (json.GetStaticType()->IsOptional()) { + return Pb_.FlatMap(json, processJson); + } else { + return processJson(json); + } + }); + + Y_ENSURE(outType->IsSameType(*scalarApply.GetStaticType())); + Items_.emplace_back(scalarApply); + return Items_.size() - 1; +} + TString TKernelRequestBuilder::Serialize() { TGuard<NKikimr::NMiniKQL::TScopedAlloc> allocGuard(Alloc_); auto kernelTuple = Items_.empty() ? Pb_.AsScalar(Pb_.NewEmptyTuple()) : Pb_.BlockAsTuple(Items_); diff --git a/ydb/library/yql/core/arrow_kernels/request/request.h b/ydb/library/yql/core/arrow_kernels/request/request.h index 5e5b385c9c5..3aa4715a343 100644 --- a/ydb/library/yql/core/arrow_kernels/request/request.h +++ b/ydb/library/yql/core/arrow_kernels/request/request.h @@ -30,6 +30,7 @@ public: ui32 Udf(const TString& name, bool isPolymorphic, const std::vector<const TTypeAnnotationNode*>& argTypes, const TTypeAnnotationNode* retType); // (json/json?,utf8)->bool/bool? ui32 JsonExists(const TTypeAnnotationNode* arg1Type, const TTypeAnnotationNode* arg2Type, const TTypeAnnotationNode* retType); + ui32 JsonValue(const TTypeAnnotationNode* arg1Type, const TTypeAnnotationNode* arg2Type, const TTypeAnnotationNode* retType); TString Serialize(); private: |
