summary refs log tree commit diff stats
diff options
context:
space:
mode:
author nsofya <[email protected]> 2023-06-23 12:34:10 +0300
committer nsofya <[email protected]> 2023-06-23 12:34:10 +0300
commit ad927b4ea86d4c53ad2393d6714e78b2115344be (patch)
tree 9db332c9e3dcf42df8ffefd3d2bb02d898687987
parent cf5721e6720a858911ba779b066590df89d6b35c (diff)
Implement json_value
-rw-r--r-- ydb/core/formats/arrow/program.cpp 6
-rw-r--r-- ydb/core/tx/columnshard/engines/ut_program.cpp 163
-rw-r--r-- ydb/library/yql/core/arrow_kernels/registry/ut/registry_ut.cpp 43
-rw-r--r-- ydb/library/yql/core/arrow_kernels/request/request.cpp 66
-rw-r--r-- ydb/library/yql/core/arrow_kernels/request/request.h 1
5 files changed, 275 insertions, 4 deletions
diff --git a/ydb/core/formats/arrow/program.cpp b/ydb/core/formats/arrow/program.cpp
index 528766a30a3..333430594f1 100644
--- a/ydb/core/formats/arrow/program.cpp
+++ b/ydb/core/formats/arrow/program.cpp
@@ -143,7 +143,11 @@ public:
if (!arguments) {
return arrow::Status::Invalid("Error parsing args.");
}
- return Function->Execute(*arguments, assign.GetOptions(), TBase::Ctx);
+ try {
+ return Function->Execute(*arguments, assign.GetOptions(), TBase::Ctx);
+ } catch (const std::exception& ex) {
+ return arrow::Status::ExecutionError(ex.what());
+ }
}
};
diff --git a/ydb/core/tx/columnshard/engines/ut_program.cpp b/ydb/core/tx/columnshard/engines/ut_program.cpp
index f6be9dcdc6a..f2a41f57a28 100644
--- a/ydb/core/tx/columnshard/engines/ut_program.cpp
+++ b/ydb/core/tx/columnshard/engines/ut_program.cpp
@@ -88,6 +88,19 @@ Y_UNIT_TEST_SUITE(TestProgram) {
return ReqBuilder->JsonExists(blockOptJsonType, scalarStringType, blockBoolType);
}
+ ui32 AddJsonValue(bool isBinaryType = true, NYql::EDataSlot resultType = NYql::EDataSlot::Utf8) { // Registers a JsonValue kernel in ReqBuilder and returns whatever ReqBuilder->JsonValue() returns.
+ NYql::TExprContext ctx;
+ auto blockOptJsonType = ctx.template MakeType<NYql::TBlockExprType>( // first argument type: Block<Optional<json>>
+ ctx.template MakeType<NYql::TOptionalExprType>(
+ ctx.template MakeType<NYql::TDataExprType>(isBinaryType ? NYql::EDataSlot::JsonDocument : NYql::EDataSlot::Json))); // JsonDocument when isBinaryType, plain Json otherwise
+ auto scalarStringType = ctx.template MakeType<NYql::TScalarExprType>(ctx.template MakeType<NYql::TDataExprType>(NYql::EDataSlot::Utf8)); // second argument type: Scalar<Utf8>
+ auto blockResultType = ctx.template MakeType<NYql::TBlockExprType>( // return type: Block<Optional<resultType>>
+ ctx.template MakeType<NYql::TOptionalExprType>(
+ ctx.template MakeType<NYql::TDataExprType>(resultType)));
+
+ return ReqBuilder->JsonValue(blockOptJsonType, scalarStringType, blockResultType);
+ }
+
TString Serialize() {
return ReqBuilder->Serialize();
}
@@ -300,6 +313,156 @@ Y_UNIT_TEST_SUITE(TestProgram) {
JsonExistsImpl(true);
}
+ void JsonValueImpl(bool isBinaryType, NYql::EDataSlot resultType) { // Builds a three-command program (constant, YQL kernel call, projection), applies it to six JSON rows and compares the batch dump with the expectation for resultType.
+ TIndexInfo indexInfo = BuildTableInfo(testColumns, testKey);
+ TIndexColumnResolver columnResolver(indexInfo);
+
+ NKikimrSSA::TProgram programProto;
+ { // command 1: assign the text constant "$.key" to column "json_path"
+ auto* command = programProto.AddCommand();
+ auto* constantProto = command->MutableAssign()->MutableConstant();
+ constantProto->SetText("$.key");
+ command->MutableAssign()->MutableColumn()->SetName("json_path");
+ }
+ { // command 2: call kernel #0 with the arguments (json_data, json_path)
+ auto* command = programProto.AddCommand();
+ auto* functionProto = command->MutableAssign()->MutableFunction();
+ functionProto->SetFunctionType(NKikimrSSA::TProgram::EFunctionType::TProgram_EFunctionType_YQL_KERNEL);
+ functionProto->SetKernelIdx(0); // presumably the kernel registered by kernels.AddJsonValue() below — TODO confirm
+ functionProto->AddArguments()->SetName("json_data");
+ functionProto->AddArguments()->SetName("json_path");
+ functionProto->SetId(NKikimrSSA::TProgram::TAssignment::EFunction::TProgram_TAssignment_EFunction_FUNC_STR_LENGTH); // NOTE(review): id is FUNC_STR_LENGTH although the function type is YQL_KERNEL — looks like a placeholder, confirm
+ }
+ { // command 3: project the single column named "0"
+ auto* command = programProto.AddCommand();
+ auto* prjectionProto = command->MutableProjection();
+ auto* column = prjectionProto->AddColumns();
+ column->SetName("0"); // NOTE(review): presumably the name given to the unnamed result of command 2 — confirm
+ }
+
+ TKernelsWrapper kernels;
+ kernels.AddJsonValue(isBinaryType, resultType);
+ const auto programSerialized = SerializeProgram(programProto, kernels.Serialize());
+
+ TProgramContainer program;
+ TString errors;
+ UNIT_ASSERT_C(program.Init(columnResolver, NKikimrSchemeOp::EOlapProgramType::OLAP_PROGRAM_SSA_PROGRAM_WITH_PARAMETERS, programSerialized, errors), errors);
+
+ TTableUpdatesBuilder updates(NArrow::MakeArrowSchema({{"json_data", TTypeInfo(isBinaryType ? NTypeIds::JsonDocument : NTypeIds::Utf8) }})); // input schema: one column "json_data"
+ { // row 1: "key" holds a string
+ NJson::TJsonValue testJson;
+ testJson["key"] = "value";
+ updates.AddRow().Add<std::string>(testJson.GetStringRobust());
+ }
+ { // row 2: "key" holds an integer
+ NJson::TJsonValue testJson;
+ testJson["key"] = 10;
+ updates.AddRow().Add<std::string>(testJson.GetStringRobust());
+ }
+ { // row 3: "key" holds a fractional number
+ NJson::TJsonValue testJson;
+ testJson["key"] = 0.1;
+ updates.AddRow().Add<std::string>(testJson.GetStringRobust());
+ }
+ { // row 4: "key" holds a boolean
+ NJson::TJsonValue testJson;
+ testJson["key"] = false;
+ updates.AddRow().Add<std::string>(testJson.GetStringRobust());
+ }
+ { // row 5: object without "key"
+ NJson::TJsonValue testJson;
+ testJson["another"] = "value";
+ updates.AddRow().Add<std::string>(testJson.GetStringRobust());
+ }
+ { // row 6: a JSON array value built from JSON_ARRAY with no elements added
+ updates.AddRow().Add<std::string>(NJson::TJsonValue(NJson::JSON_ARRAY).GetStringRobust());
+ }
+
+ auto batch = updates.BuildArrow();
+ Cerr << batch->ToString() << Endl; // dump the input batch to the test log
+
+ if (isBinaryType) { // binary case: pass the json_data column through ConvertColumns with a JsonDocument target type
+ THashMap<TString, NScheme::TTypeInfo> cc;
+ cc["json_data"] = TTypeInfo(NTypeIds::JsonDocument);
+ batch = NArrow::ConvertColumns(batch, cc);
+ Cerr << batch->ToString() << Endl;
+ }
+
+ auto res = program.ApplyProgram(batch); // NOTE(review): the checks below compare `batch`, so ApplyProgram presumably rewrites it in place — confirm
+ UNIT_ASSERT_C(res.ok(), res.ToString());
+
+ Cerr << "Check output for " << resultType << Endl;
+ if (resultType == NYql::EDataSlot::Utf8) { // expected: every scalar rendered as text, null for rows 5 and 6
+ TTableUpdatesBuilder result(NArrow::MakeArrowSchema( { std::make_pair("0", TTypeInfo(NTypeIds::Utf8)) }));
+
+ result.AddRow().Add<std::string>("value");
+ result.AddRow().Add<std::string>("10");
+ result.AddRow().Add<std::string>("0.1");
+ result.AddRow().Add<std::string>("false");
+ result.AddRow().AddNull();
+ result.AddRow().AddNull();
+
+ auto expected = result.BuildArrow();
+ UNIT_ASSERT_VALUES_EQUAL(batch->ToString(), expected->ToString()); // comparison is done on the textual dumps of the two batches
+ } else if (resultType == NYql::EDataSlot::Bool) { // expected: only row 4 yields a value; the expectation is built as a Uint8 column
+ TTableUpdatesBuilder result(NArrow::MakeArrowSchema( { std::make_pair("0", TTypeInfo(NTypeIds::Uint8)) }));
+
+ result.AddRow().AddNull();
+ result.AddRow().AddNull();
+ result.AddRow().AddNull();
+ result.AddRow().Add<ui8>(0);
+ result.AddRow().AddNull();
+ result.AddRow().AddNull();
+
+ auto expected = result.BuildArrow();
+ UNIT_ASSERT_VALUES_EQUAL(batch->ToString(), expected->ToString());
+ } else if (resultType == NYql::EDataSlot::Int64 || resultType == NYql::EDataSlot::Uint64) { // expected: only row 2 yields a value; both slots are checked against an Int64 column
+ TTableUpdatesBuilder result(NArrow::MakeArrowSchema( { std::make_pair("0", TTypeInfo(NTypeIds::Int64)) }));
+
+ result.AddRow().AddNull();
+ result.AddRow().Add<i64>(10);
+ result.AddRow().AddNull();
+ result.AddRow().AddNull();
+ result.AddRow().AddNull();
+ result.AddRow().AddNull();
+
+ auto expected = result.BuildArrow();
+ UNIT_ASSERT_VALUES_EQUAL(batch->ToString(), expected->ToString());
+ } else if (resultType == NYql::EDataSlot::Double || resultType == NYql::EDataSlot::Float) { // expected: rows 2 and 3 yield values; both slots are checked against a Double column
+ TTableUpdatesBuilder result(NArrow::MakeArrowSchema( { std::make_pair("0", TTypeInfo(NTypeIds::Double)) }));
+
+ result.AddRow().AddNull();
+ result.AddRow().Add<double>(10);
+ result.AddRow().Add<double>(0.1);
+ result.AddRow().AddNull();
+ result.AddRow().AddNull();
+ result.AddRow().AddNull();
+
+ auto expected = result.BuildArrow();
+ UNIT_ASSERT_VALUES_EQUAL(batch->ToString(), expected->ToString());
+ } else { // any other result slot has no expectation in this helper
+ Y_FAIL("Not implemented");
+ }
+ }
+
+ Y_UNIT_TEST(JsonValue) { // Non-binary input (isBinaryType = false): one JsonValueImpl run per result slot it has an expectation for.
+ JsonValueImpl(false, NYql::EDataSlot::Utf8);
+ JsonValueImpl(false, NYql::EDataSlot::Bool);
+ JsonValueImpl(false, NYql::EDataSlot::Int64);
+ JsonValueImpl(false, NYql::EDataSlot::Uint64);
+ JsonValueImpl(false, NYql::EDataSlot::Float);
+ JsonValueImpl(false, NYql::EDataSlot::Double);
+ }
+
+ Y_UNIT_TEST(JsonValueBinary) { // Binary input (isBinaryType = true, JsonDocument column): same six result slots as the JsonValue test.
+ JsonValueImpl(true, NYql::EDataSlot::Utf8);
+ JsonValueImpl(true, NYql::EDataSlot::Bool);
+ JsonValueImpl(true, NYql::EDataSlot::Int64);
+ JsonValueImpl(true, NYql::EDataSlot::Uint64);
+ JsonValueImpl(true, NYql::EDataSlot::Float);
+ JsonValueImpl(true, NYql::EDataSlot::Double);
+ }
+
Y_UNIT_TEST(SimpleFunction) {
TIndexInfo indexInfo = BuildTableInfo(testColumns, testKey);;
TIndexColumnResolver columnResolver(indexInfo);
diff --git a/ydb/library/yql/core/arrow_kernels/registry/ut/registry_ut.cpp b/ydb/library/yql/core/arrow_kernels/registry/ut/registry_ut.cpp
index 797343859a6..8cab9844ddb 100644
--- a/ydb/library/yql/core/arrow_kernels/registry/ut/registry_ut.cpp
+++ b/ydb/library/yql/core/arrow_kernels/registry/ut/registry_ut.cpp
@@ -163,4 +163,47 @@ Y_UNIT_TEST_SUITE(TKernelRegistryTest) {
return b.JsonExists(blockJsonType, scalarUtf8Type, blockOptBoolType);
});
}
+
+ void TesJsonValueImpl(EDataSlot jsonType, NYql::EDataSlot resultType) { // Runs TestOne on a JsonValue kernel built for the given input/result slots. NOTE(review): the name is presumably a typo for TestJsonValueImpl.
+ TestOne([&](auto& b,auto& ctx) {
+ auto blockJsonType = ctx.template MakeType<TBlockExprType>( // first argument: Block<jsonType>, not wrapped in Optional here
+ ctx.template MakeType<TDataExprType>(jsonType));
+ auto scalarUtf8Type = ctx.template MakeType<TScalarExprType>( // second argument: Scalar<Utf8>
+ ctx.template MakeType<TDataExprType>(EDataSlot::Utf8));
+ auto blockOptType = ctx.template MakeType<TBlockExprType>( // return type: Block<Optional<resultType>>
+ ctx.template MakeType<TOptionalExprType>(
+ ctx.template MakeType<TDataExprType>(resultType)));
+ return b.JsonValue(blockJsonType, scalarUtf8Type, blockOptType);
+ });
+ }
+
+ Y_UNIT_TEST(TestJsonValueUtf8) { // Utf8 result, once for Json input and once for JsonDocument input.
+ TesJsonValueImpl(EDataSlot::Json, NYql::EDataSlot::Utf8);
+ TesJsonValueImpl(EDataSlot::JsonDocument, NYql::EDataSlot::Utf8);
+ }
+
+ Y_UNIT_TEST(TestJsonValueBool) { // Bool result, once for Json input and once for JsonDocument input.
+ TesJsonValueImpl(EDataSlot::Json, NYql::EDataSlot::Bool);
+ TesJsonValueImpl(EDataSlot::JsonDocument, NYql::EDataSlot::Bool);
+ }
+
+ Y_UNIT_TEST(TestJsonValueInt64) { // Int64 result, once for Json input and once for JsonDocument input.
+ TesJsonValueImpl(EDataSlot::Json, NYql::EDataSlot::Int64);
+ TesJsonValueImpl(EDataSlot::JsonDocument, NYql::EDataSlot::Int64);
+ }
+
+ Y_UNIT_TEST(TestJsonValueUint64) { // Uint64 result, once for Json input and once for JsonDocument input.
+ TesJsonValueImpl(EDataSlot::Json, NYql::EDataSlot::Uint64);
+ TesJsonValueImpl(EDataSlot::JsonDocument, NYql::EDataSlot::Uint64);
+ }
+
+ Y_UNIT_TEST(TestJsonValueFloat) { // Float result, once for Json input and once for JsonDocument input.
+ TesJsonValueImpl(EDataSlot::Json, NYql::EDataSlot::Float);
+ TesJsonValueImpl(EDataSlot::JsonDocument, NYql::EDataSlot::Float);
+ }
+
+ Y_UNIT_TEST(TestJsonValueDouble) { // Double result, once for Json input and once for JsonDocument input.
+ TesJsonValueImpl(EDataSlot::Json, NYql::EDataSlot::Double);
+ TesJsonValueImpl(EDataSlot::JsonDocument, NYql::EDataSlot::Double);
+ }
}
diff --git a/ydb/library/yql/core/arrow_kernels/request/request.cpp b/ydb/library/yql/core/arrow_kernels/request/request.cpp
index 8cd544e80fe..0eb3a8df415 100644
--- a/ydb/library/yql/core/arrow_kernels/request/request.cpp
+++ b/ydb/library/yql/core/arrow_kernels/request/request.cpp
@@ -3,6 +3,7 @@
#include <ydb/library/yql/minikql/mkql_node_cast.h>
#include <ydb/library/yql/minikql/mkql_node_serialization.h>
#include <ydb/library/yql/core/yql_opt_utils.h>
+#include <ydb/library/yql/core/yql_expr_type_annotation.h>
namespace NYql {
@@ -92,10 +93,13 @@ ui32 TKernelRequestBuilder::Udf(const TString& name, bool isPolymorphic, const s
ui32 TKernelRequestBuilder::JsonExists(const TTypeAnnotationNode* arg1Type, const TTypeAnnotationNode* arg2Type, const TTypeAnnotationNode* retType) {
TGuard<NKikimr::NMiniKQL::TScopedAlloc> allocGuard(Alloc_);
+
+ bool isScalar = false;
+ bool isBinaryJson = (RemoveOptionalType(NYql::GetBlockItemType(*arg1Type, isScalar))->Cast<TDataExprType>()->GetSlot() == EDataSlot::JsonDocument);
- bool isBinaryJson = (RemoveOptionalType(arg1Type->Cast<TBlockExprType>()->GetItemType())->Cast<TDataExprType>()->GetSlot() == EDataSlot::JsonDocument);
-
- auto exists = Pb_.Udf(isBinaryJson ? "Json2.JsonDocumentSqlExists" : "Json2.SqlExists");
+ auto udfName = TStringBuilder() << "Json2." << (isBinaryJson ? "JsonDocument" : "" ) << "SqlExists";
+
+ auto exists = Pb_.Udf(udfName);
auto parse = Pb_.Udf("Json2.Parse");
auto compilePath = Pb_.Udf("Json2.CompilePath");
auto outType = MakeType(retType);
@@ -122,6 +126,62 @@ ui32 TKernelRequestBuilder::JsonExists(const TTypeAnnotationNode* arg1Type, cons
return Items_.size() - 1;
}
+ui32 TKernelRequestBuilder::JsonValue(const TTypeAnnotationNode* arg1Type, const TTypeAnnotationNode* arg2Type, const TTypeAnnotationNode* retType) { // Builds a JSON-value kernel over (arg1: json, arg2: path), appends it to Items_ and returns its index.
+ TGuard<NKikimr::NMiniKQL::TScopedAlloc> allocGuard(Alloc_); // guard on Alloc_ held for the whole function
+
+ bool isScalar = false; // only passed to GetBlockItemType below; its value is not read in this function
+ bool isBinaryJson = (RemoveOptionalType(NYql::GetBlockItemType(*arg1Type, isScalar))->Cast<TDataExprType>()->GetSlot() == EDataSlot::JsonDocument); // true when the input item slot (Optional stripped) is JsonDocument
+ auto resultSlot = RemoveOptionalType(NYql::GetBlockItemType(*retType, isScalar))->Cast<TDataExprType>()->GetSlot(); // data slot of the requested result, Optional stripped
+
+ auto udfName = TStringBuilder() << "Json2." << (isBinaryJson ? "JsonDocument" : "" ); // UDF name = "Json2." + optional "JsonDocument" + a suffix chosen by resultSlot
+ if (NYql::IsDataTypeFloat(resultSlot)) { // tested before the numeric branch, so float slots never reach SqlValueInt64
+ udfName << "SqlValueNumber";
+ } else if (NYql::IsDataTypeNumeric(resultSlot)) {
+ udfName << "SqlValueInt64";
+ } else if (NYql::IsDataTypeString(resultSlot)) {
+ udfName << "SqlValueConvertToUtf8";
+ } else if (resultSlot == EDataSlot::Bool) {
+ udfName << "SqlValueBool";
+ } else { // every other result slot is rejected
+ Y_ENSURE(false, "Invalid return type");
+ }
+
+ auto jsonValue = Pb_.Udf(udfName);
+
+ auto parse = Pb_.Udf("Json2.Parse"); // applied only on the non-binary path inside processJson
+ auto compilePath = Pb_.Udf("Json2.CompilePath");
+ auto outType = MakeType(retType);
+ auto arg1 = MakeArg(arg1Type);
+ auto arg2 = MakeArg(arg2Type);
+
+ auto scalarApply = Pb_.ScalarApply({arg1, arg2}, [&](const auto& args) { // args[0] is the json value, args[1] the path
+ auto json = args[0];
+ auto processJson = [&](auto unpacked) { // builds the lookup for one non-optional json value
+ auto input = Pb_.NewOptional( isBinaryJson ? unpacked : Pb_.Apply(parse, { unpacked })); // JsonDocument is passed as is, plain Json goes through Json2.Parse first
+ auto path = Pb_.Apply(compilePath, { args[1] });
+ auto dictType = Pb_.NewDictType(Pb_.NewDataType(NUdf::EDataSlot::Utf8), Pb_.NewResourceType("JsonNode"), false);
+ auto resultTuple = Pb_.Apply(jsonValue, { input, path, Pb_.NewDict(dictType, {})}); // third argument is an empty Utf8 -> JsonNode dict; presumably the path variables — TODO confirm
+ return Pb_.VisitAll(resultTuple, [&](ui32 index, NKikimr::NMiniKQL::TRuntimeNode item) { // NOTE(review): alternative 0 looks like the failure case and 1 like the found value — confirm against the Json2 UDF
+ if (index == 0) {
+ return Pb_.NewEmptyOptional(outType->GetItemType()); // alternative 0 becomes an empty optional
+ }
+ Y_ENSURE(index == 1);
+ return Pb_.Cast(item, outType->GetItemType()); // alternative 1 is cast to the output item type
+ });
+ };
+
+ if (json.GetStaticType()->IsOptional()) { // optional input goes through FlatMap, non-optional input is processed directly
+ return Pb_.FlatMap(json, processJson);
+ } else {
+ return processJson(json);
+ }
+ });
+
+ Y_ENSURE(outType->IsSameType(*scalarApply.GetStaticType())); // the built node must have exactly the requested return type
+ Items_.emplace_back(scalarApply);
+ return Items_.size() - 1; // index of the kernel just appended
+}
+
TString TKernelRequestBuilder::Serialize() {
TGuard<NKikimr::NMiniKQL::TScopedAlloc> allocGuard(Alloc_);
auto kernelTuple = Items_.empty() ? Pb_.AsScalar(Pb_.NewEmptyTuple()) : Pb_.BlockAsTuple(Items_);
diff --git a/ydb/library/yql/core/arrow_kernels/request/request.h b/ydb/library/yql/core/arrow_kernels/request/request.h
index 5e5b385c9c5..3aa4715a343 100644
--- a/ydb/library/yql/core/arrow_kernels/request/request.h
+++ b/ydb/library/yql/core/arrow_kernels/request/request.h
@@ -30,6 +30,7 @@ public:
ui32 Udf(const TString& name, bool isPolymorphic, const std::vector<const TTypeAnnotationNode*>& argTypes, const TTypeAnnotationNode* retType);
// (json/json?,utf8)->bool/bool?
ui32 JsonExists(const TTypeAnnotationNode* arg1Type, const TTypeAnnotationNode* arg2Type, const TTypeAnnotationNode* retType);
+ ui32 JsonValue(const TTypeAnnotationNode* arg1Type, const TTypeAnnotationNode* arg2Type, const TTypeAnnotationNode* retType); // (json/json?,utf8)->T?; T is a string, numeric, floating-point or bool data slot
TString Serialize();
private: