diff options
author | Alexander Smirnov <[email protected]> | 2025-01-11 00:21:49 +0000 |
---|---|---|
committer | Alexander Smirnov <[email protected]> | 2025-01-11 00:21:49 +0000 |
commit | 457aacf7daabd8837feef98d1edcfe62420a1f47 (patch) | |
tree | 3f8ca7735aac2ab4574833bf4ea5e1881a02ef84 | |
parent | af411bb10f1133d6e7f4c6324a89dde2f745d675 (diff) | |
parent | 2d3b7f1966f9716551a0d7db72a9608addab8ecf (diff) |
Merge branch 'rightlib' into merge-libs-250111-0020
85 files changed, 1873 insertions, 1883 deletions
diff --git a/build/conf/licenses.json b/build/conf/licenses.json index 7bdd6738bd8..0c77cb38a12 100644 --- a/build/conf/licenses.json +++ b/build/conf/licenses.json @@ -331,7 +331,6 @@ "X11", "X11-Lucent", "X11-XConsortium-Veillard", - "YandexOpen", "Zlib", "ZPL-2.1" ] diff --git a/build/conf/linkers/ld.conf b/build/conf/linkers/ld.conf index df576648f3d..b189bf1e179 100644 --- a/build/conf/linkers/ld.conf +++ b/build/conf/linkers/ld.conf @@ -105,7 +105,7 @@ elsewhen ($OS_LINUX == "yes" || $OS_DARWIN == "yes" || $OS_IOS == "yes") { _LD_USE_STDLIB=-nodefaultlibs } -_C_SYSTEM_LIBRARIES=$_LD_USE_STDLIB $_LD_THREAD_LIBRARY $_LD_SYS_LIB -lc +_C_SYSTEM_LIBRARIES=$_LD_USE_STDLIB $_LD_THREAD_LIBRARY -lc when ($MUSL == "yes") { _C_SYSTEM_LIBRARIES=-nostdlib } @@ -171,7 +171,7 @@ when ($OS_EMSCRIPTEN == "yes") { } _SHARED_FLAG=-shared when ($OS_EMSCRIPTEN == "yes") { - _SHARED_FLAG=-Wl,-shared -Wl,--no-entry -Wl,--export-all -Wl,--import-memory -Wl,--import-undefined + _SHARED_FLAG=-Wl,-shared -Wl,--no-entry -Wl,--export-all -Wl,--import-memory -Wl,--import-undefined } _EXEC_SHARED_FLAG= when ($OS_LINUX == "yes") { diff --git a/build/conf/sysincl.conf b/build/conf/sysincl.conf index 421942e0573..8945843f6c9 100644 --- a/build/conf/sysincl.conf +++ b/build/conf/sysincl.conf @@ -106,3 +106,8 @@ otherwise { SYSINCL+=build/sysincl/nvidia-cub.yml SYSINCL+=build/sysincl/nvidia-thrust.yml } + +when ($USE_SYSTEM_OPENSSL == "yes") { + SYSINCL+=build/sysincl/openssl-headers.yml +} + diff --git a/build/mapping.conf.json b/build/mapping.conf.json index eb4c7e6faa4..caee26bb248 100644 --- a/build/mapping.conf.json +++ b/build/mapping.conf.json @@ -72,8 +72,14 @@ "3573996018": "https://devtools-registry.s3.yandex.net/3573996018", "6385637993": "https://devtools-registry.s3.yandex.net/6385637993", "6385637991": "https://devtools-registry.s3.yandex.net/6385637991", + "7804868939": "https://devtools-registry.s3.yandex.net/7804868939", + "7804868953": "https://devtools-registry.s3.yandex.net/7804868953", "1841245955": "https://devtools-registry.s3.yandex.net/1841245955", "1841245849": "https://devtools-registry.s3.yandex.net/1841245849", + "7633873137": "https://devtools-registry.s3.yandex.net/7633873137", + "7633871466": "https://devtools-registry.s3.yandex.net/7633871466", + "7633869171": "https://devtools-registry.s3.yandex.net/7633869171", + "7633866901": "https://devtools-registry.s3.yandex.net/7633866901", "5424057306": "https://devtools-registry.s3.yandex.net/5424057306", "5424061624": "https://devtools-registry.s3.yandex.net/5424061624", "5424033677": "https://devtools-registry.s3.yandex.net/5424033677", @@ -138,9 +144,13 @@ "6740765286": "https://devtools-registry.s3.yandex.net/6740765286", "6657758332": "https://devtools-registry.s3.yandex.net/6657758332", "6782718840": "https://devtools-registry.s3.yandex.net/6782718840", + "7793716307": "https://devtools-registry.s3.yandex.net/7793716307", "7412448309": "https://devtools-registry.s3.yandex.net/7412448309", + "7793563818": "https://devtools-registry.s3.yandex.net/7793563818", "7412319411": "https://devtools-registry.s3.yandex.net/7412319411", + "7793786096": "https://devtools-registry.s3.yandex.net/7793786096", "7412751020": "https://devtools-registry.s3.yandex.net/7412751020", + "7793618601": "https://devtools-registry.s3.yandex.net/7793618601", "7412874863": "https://devtools-registry.s3.yandex.net/7412874863", "7421008516": "https://devtools-registry.s3.yandex.net/7421008516", "7421180051": "https://devtools-registry.s3.yandex.net/7421180051", @@ -1017,6 +1027,8 @@ "3166999959": "https://devtools-registry.s3.yandex.net/3166999959", "65627450": "https://devtools-registry.s3.yandex.net/65627450", "65627451": "https://devtools-registry.s3.yandex.net/65627451", + "6442167169": "https://devtools-registry.s3.yandex.net/6442167169", + "7503032594": "https://devtools-registry.s3.yandex.net/7503032594", "5631222854": "https://devtools-registry.s3.yandex.net/5631222854", "5631220729": "https://devtools-registry.s3.yandex.net/5631220729", "5631255103": "https://devtools-registry.s3.yandex.net/5631255103", @@ -1141,13 +1153,7 @@ "7686710688": "https://devtools-registry.s3.yandex.net/7686710688", "2980468199": "https://devtools-registry.s3.yandex.net/2980468199", "5562224408": "https://devtools-registry.s3.yandex.net/5562224408", - "7663495611": "https://devtools-registry.s3.yandex.net/7663495611", - "6442167169": "https://devtools-registry.s3.yandex.net/6442167169", - "7503032594": "https://devtools-registry.s3.yandex.net/7503032594", - "7633866901": "https://devtools-registry.s3.yandex.net/7633866901", - "7633869171": "https://devtools-registry.s3.yandex.net/7633869171", - "7633871466": "https://devtools-registry.s3.yandex.net/7633871466", - "7633873137": "https://devtools-registry.s3.yandex.net/7633873137" + "7663495611": "https://devtools-registry.s3.yandex.net/7663495611" }, "resources_descriptions": { "6277415836": "Allure 2.29.0", @@ -1221,8 +1227,14 @@ "3573996018": "JAVA_LIBRARY-none-none-proto-google-common-protos-2.9.0.jar", "6385637993": "JAVA_LIBRARY-none-none-protobuf-java-3.25.3-sources.jar", "6385637991": "JAVA_LIBRARY-none-none-protobuf-java-3.25.3.jar", + "7804868939": "JAVA_LIBRARY-none-none-protobuf-java-3.25.5-sources.jar", + "7804868953": "JAVA_LIBRARY-none-none-protobuf-java-3.25.5.jar", "1841245955": "JAVA_LIBRARY-none-none-snakeyaml-1.27-sources.jar", "1841245849": "JAVA_LIBRARY-none-none-snakeyaml-1.27.jar", + "7633873137": "Node.js 20.18.1 for darwin-arm64", + "7633871466": "Node.js 20.18.1 for darwin-x64", + "7633869171": "Node.js 20.18.1 for linux-arm64", + "7633866901": "Node.js 20.18.1 for linux-x64", "5424057306": "OTHER_RESOURCE-none-1.21.3-y_go1.21.3.darwin-amd64.tar.gz", "5424061624": "OTHER_RESOURCE-none-1.21.3-y_go1.21.3.darwin-arm64.tar.gz", "5424033677": "OTHER_RESOURCE-none-1.21.3-y_go1.21.3.linux-amd64.tar.gz", @@ -1287,9 +1299,13 @@ "6740765286": "bin-lld-16-optimized-linux-x86_64-30f81a61922d8f6d21a62ad26f7a3711ec368b9f", "6657758332": "bin-lld-16-optimized-linux-x86_64-6fcb1f9a0ea89cca05d938ce61f89490b56940c4", "6782718840": "bin-lld-16-optimized-linux-x86_64-e9ee8ec890c372b0d234bbdc8d711e991af44797", + "7793716307": "bin-lld-18-darwin-arm64-707d422c95662f1a2f53ee2de8c4d05db9eaeb33", "7412448309": "bin-lld-18-darwin-arm64-9ed8f351aa52a09a3d6ab1d977afb583bf69d8d5", + "7793563818": "bin-lld-18-darwin-x86_64-707d422c95662f1a2f53ee2de8c4d05db9eaeb33", "7412319411": "bin-lld-18-darwin-x86_64-9ed8f351aa52a09a3d6ab1d977afb583bf69d8d5", + "7793786096": "bin-lld-18-linux-aarch64-707d422c95662f1a2f53ee2de8c4d05db9eaeb33", "7412751020": "bin-lld-18-linux-aarch64-9ed8f351aa52a09a3d6ab1d977afb583bf69d8d5", + "7793618601": "bin-lld-18-linux-x86_64-707d422c95662f1a2f53ee2de8c4d05db9eaeb33", "7412874863": "bin-lld-18-linux-x86_64-9ed8f351aa52a09a3d6ab1d977afb583bf69d8d5", "7421008516": "bin-mold-darwin-arm64-9ed8f351aa52a09a3d6ab1d977afb583bf69d8d5", "7421180051": "bin-mold-darwin-x86_64-9ed8f351aa52a09a3d6ab1d977afb583bf69d8d5", @@ -2166,6 +2182,8 @@ "3166999959": "openjdk 11.0.15 vanilla for windows", "65627450": "org.jetbrains-annotations-13.0-sources.jar", "65627451": "org.jetbrains-annotations-13.0.jar", + "6442167169": "pnpm 8.6.12", + "7503032594": "pnpm 9.12.3 (default)", "5631222854": "python3 for darwin", "5631220729": "python3 for darwin-arm64", "5631255103": "python3 for linux", @@ -2290,13 +2308,7 @@ "7686710688": "yt/go/ytrecipe/cmd/ytexec for linux", "2980468199": "ytexec for linux", "5562224408": "ytexec for linux", - "7663495611": "ytexec for linux", - "6442167169": "pnpm 8.6.12", - "7503032594": "pnpm 9.12.3 (default)", - "7633866901": "Node.js 20.18.1 for linux-x64", - "7633869171": "Node.js 20.18.1 for linux-arm64", - "7633871466": "Node.js 20.18.1 for darwin-x64", - "7633873137": "Node.js 20.18.1 for darwin-arm64" + "7663495611": "ytexec for linux" }, "resources_info": {}, "tasks": {} diff --git a/build/platform/lld/lld18.json b/build/platform/lld/lld18.json index 8ac4f7068c1..b548d05491c 100644 --- a/build/platform/lld/lld18.json +++ b/build/platform/lld/lld18.json @@ -1,16 +1,16 @@ { "by_platform": { "darwin-arm64": { - "uri": "sbr:7412448309" + "uri": "sbr:7793716307" }, "darwin-x86_64": { - "uri": "sbr:7412319411" + "uri": "sbr:7793563818" }, "linux-aarch64": { - "uri": "sbr:7412751020" + "uri": "sbr:7793786096" }, "linux-x86_64": { - "uri": "sbr:7412874863" + "uri": "sbr:7793618601" } } } diff --git a/build/sysincl/openssl-headers.yml b/build/sysincl/openssl-headers.yml new file mode 100644 index 00000000000..98c4253568c --- /dev/null +++ b/build/sysincl/openssl-headers.yml @@ -0,0 +1,124 @@ +- includes: + - openssl/aes.h + - openssl/asn1.h + - openssl/asn1_mac.h + - openssl/asn1err.h + - openssl/asn1t.h + - openssl/async.h + - openssl/asyncerr.h + - openssl/bio.h + - openssl/bioerr.h + - openssl/blowfish.h + - openssl/bn.h + - openssl/bnerr.h + - openssl/buffer.h + - openssl/buffererr.h + - openssl/camellia.h + - openssl/cast.h + - openssl/cmac.h + - openssl/cms.h + - openssl/cmserr.h + - openssl/comp.h + - openssl/comperr.h + - openssl/conf.h + - openssl/conf_api.h + - openssl/conferr.h + - openssl/configurator.h + - openssl/crypto.h + - openssl/cryptoerr.h + - openssl/ct.h + - openssl/cterr.h + - openssl/des.h + - openssl/dh.h + - openssl/dherr.h + - openssl/dsa.h + - openssl/dsaerr.h + - openssl/dtls1.h + - openssl/e_os2.h + - openssl/ebcdic.h + - openssl/ec.h + - openssl/ecdh.h + - openssl/ecdsa.h + - openssl/ecerr.h + - openssl/engine.h + - openssl/engineerr.h + - openssl/err.h + - openssl/evp.h + - openssl/evperr.h + - openssl/fips.h + - openssl/fips_rand.h + - openssl/hmac.h + - openssl/idea.h + - openssl/kdf.h + - openssl/kdferr.h + - openssl/lhash.h + - openssl/md2.h + - openssl/md4.h + - openssl/md5.h + - openssl/mdc2.h + - openssl/modes.h + - openssl/obj_mac.h + - openssl/objects.h + - openssl/objectserr.h + - openssl/ocsp.h + - openssl/ocsperr.h + - openssl/opensslconf-android-arm.h + - openssl/opensslconf-android-arm64.h + - openssl/opensslconf-android-i686.h + - openssl/opensslconf-android-x86_64.h + - openssl/opensslconf-i386.h + - openssl/opensslconf-ios-arm64.h + - openssl/opensslconf-ios-x86_64.h + - openssl/opensslconf-linux-aarch64.h + - openssl/opensslconf-linux-arm.h + - openssl/opensslconf-linux.h + - openssl/opensslconf-osx-arm64.h + - openssl/opensslconf-osx.h + - openssl/opensslconf-win-i686.h + - openssl/opensslconf-win-x86_64.h + - openssl/opensslconf-x86_64.h + - openssl/opensslconf.h + - openssl/opensslconf.h + - openssl/opensslv.h + - openssl/ossl_typ.h + - openssl/pem.h + - openssl/pem2.h + - openssl/pemerr.h + - openssl/pkcs12.h + - openssl/pkcs12err.h + - openssl/pkcs7.h + - openssl/pkcs7err.h + - openssl/rand.h + - openssl/rand_drbg.h + - openssl/randerr.h + - openssl/rc2.h + - openssl/rc4.h + - openssl/rc5.h + - openssl/ripemd.h + - openssl/rsa.h + - openssl/rsaerr.h + - openssl/safestack.h + - openssl/seed.h + - openssl/sha.h + - openssl/srp.h + - openssl/srtp.h + - openssl/ssl.h + - openssl/ssl2.h + - openssl/ssl3.h + - openssl/sslerr.h + - openssl/stack.h + - openssl/store.h + - openssl/storeerr.h + - openssl/symhacks.h + - openssl/tls1.h + - openssl/ts.h + - openssl/tserr.h + - openssl/txt_db.h + - openssl/ui.h + - openssl/uierr.h + - openssl/whrlpool.h + - openssl/x509.h + - openssl/x509_vfy.h + - openssl/x509err.h + - openssl/x509v3.h + - openssl/x509v3err.h diff --git a/build/ymake.core.conf b/build/ymake.core.conf index 2f1189a24fe..f8cd4341f53 100644 --- a/build/ymake.core.conf +++ b/build/ymake.core.conf @@ -1084,10 +1084,6 @@ module _LINK_UNIT: _BASE_UNIT { PEERDIR+=contrib/libs/clang${CLANG_VER}-rt/lib/profile } - when ($USE_LIBCXXRT == "yes") { - PEERDIR += contrib/libs/cxxsupp/libcxxrt - } - when ($USE_DYNAMIC_CUDA == "yes") { LINK_DYN_LIB_FLAGS += --dynamic-cuda LINK_SCRIPT_EXE_FLAGS += --dynamic-cuda diff --git a/build/ymake_conf.py b/build/ymake_conf.py index 1ab3e1cd215..83a8fc726e3 100755 --- a/build/ymake_conf.py +++ b/build/ymake_conf.py @@ -1070,11 +1070,6 @@ class GnuToolchainOptions(ToolchainOptions): self.dwarf_tool = self.target.find_in_dict(self.params.get('dwarf_tool')) - # TODO(somov): Унифицировать формат sys_lib - self.sys_lib = self.params.get('sys_lib', {}) - if isinstance(self.sys_lib, dict): - self.sys_lib = self.target.find_in_dict(self.sys_lib, []) - self.os_sdk = preset('OS_SDK') or self._default_os_sdk() self.os_sdk_local = False @@ -1718,8 +1713,6 @@ class LD(Linker): if self.ld_sdk: self.ld_flags.append(self.ld_sdk) - self.sys_lib = self.tc.sys_lib - if target.is_android: # Use toolchain defaults to link with libunwind/clang_rt.builtins self.use_stdlib = '-nostdlib++' @@ -1737,7 +1730,6 @@ class LD(Linker): emit('OBJDUMP_TOOL_VENDOR', self.objdump) emit('_LD_FLAGS', self.ld_flags) - emit('_LD_SYS_LIB', self.sys_lib) emit('LD_SDK_VERSION', self.ld_sdk) dwarf_tool = self.tc.dwarf_tool diff --git a/contrib/libs/apache/avro/.yandex_meta/__init__.py b/contrib/libs/apache/avro/.yandex_meta/__init__.py index 87d048e50de..94d44b84e89 100644 --- a/contrib/libs/apache/avro/.yandex_meta/__init__.py +++ b/contrib/libs/apache/avro/.yandex_meta/__init__.py @@ -10,7 +10,6 @@ def post_install(self): boost.make_arcdir("any"), boost.make_arcdir("asio"), boost.make_arcdir("crc"), - boost.make_arcdir("format"), boost.make_arcdir("math"), ] diff --git a/contrib/libs/apache/avro/ya.make b/contrib/libs/apache/avro/ya.make index fe166427959..d4fa25728dc 100644 --- a/contrib/libs/apache/avro/ya.make +++ b/contrib/libs/apache/avro/ya.make @@ -19,7 +19,6 @@ PEERDIR( contrib/restricted/boost/any contrib/restricted/boost/asio contrib/restricted/boost/crc - contrib/restricted/boost/format contrib/restricted/boost/iostreams contrib/restricted/boost/math ) diff --git a/contrib/libs/cxxsupp/libcxx/include/__config_site b/contrib/libs/cxxsupp/libcxx/include/__config_site index dc81d088fef..38a2b6abe42 100644 --- a/contrib/libs/cxxsupp/libcxx/include/__config_site +++ b/contrib/libs/cxxsupp/libcxx/include/__config_site @@ -52,8 +52,14 @@ # define _LIBCPP_HAS_MUSL_LIBC #endif -#ifdef NDEBUG -# define _LIBCPP_HARDENING_MODE_DEFAULT _LIBCPP_HARDENING_MODE_NONE +#if defined(__has_feature) && (__has_feature(address_sanitizer) || \ + __has_feature(leak_sanitizer) || \ + __has_feature(thread_sanitizer) || \ + __has_feature(memory_sanitizer) || \ + __has_feature(undefined_behavior_sanitizer)) +# define _LIBCPP_HARDENING_MODE_DEFAULT _LIBCPP_HARDENING_MODE_FAST +#elif defined(NDEBUG) +# define _LIBCPP_HARDENING_MODE_DEFAULT _LIBCPP_HARDENING_MODE_NONE #else # define _LIBCPP_HARDENING_MODE_DEFAULT _LIBCPP_HARDENING_MODE_FAST #endif diff --git a/contrib/libs/openssl/system_openssl.ya.inc b/contrib/libs/openssl/system_openssl.ya.inc new file mode 100644 index 00000000000..a128025c339 --- /dev/null +++ b/contrib/libs/openssl/system_openssl.ya.inc @@ -0,0 +1,26 @@ +LIBRARY() + +WITHOUT_LICENSE_TEXTS() + +SUBSCRIBER(g:cpp-contrib) +PROVIDES(openssl) + +ORIGINAL_SOURCE(https://github.com/openssl/openssl) +VERSION(system-version) +LICENSE(OpenSSL) + +NO_RUNTIME() + +IF (NOT SYSTEM_OPENSSL_INCLUDE) + FATAL("You should provide path to OpenSSL include dir via -DSYSTEM_OPENSSL_INCLUDE=/abs/path/to/openssl/includes") +ENDIF() + +IF (NOT SYSTEM_OPENSSL_LIB) + FATAL("You should provide path to static OpenSSL library via -DSYSTEM_OPENSSL_LIB=/abs/path/to/openssl/lib/dir") +ENDIF() + +EXTRALIBS_STATIC($SYSTEM_OPENSSL_LIB/libcrypto.a) +EXTRALIBS_STATIC($SYSTEM_OPENSSL_LIB/libssl.a) +CFLAGS(GLOBAL -I$SYSTEM_OPENSSL_INCLUDE) + +END() diff --git a/contrib/libs/openssl/ya.make b/contrib/libs/openssl/ya.make index c5962936a69..67c4f080f9d 100644 --- a/contrib/libs/openssl/ya.make +++ b/contrib/libs/openssl/ya.make @@ -1,5 +1,7 @@ # Generated by devtools/yamaker from nixpkgs 22.05. - +IF (USE_SYSTEM_OPENSSL) +INCLUDE(system_openssl.ya.inc) +ELSE() LIBRARY() VERSION(1.1.1t) @@ -350,3 +352,4 @@ RECURSE( apps crypto ) +ENDIF() # IF (USE_SYSTEM_OPENSSL) diff --git a/contrib/libs/simdjson/.yandex_meta/override.nix b/contrib/libs/simdjson/.yandex_meta/override.nix index 0fac8d7ba04..eba60dbb21c 100644 --- a/contrib/libs/simdjson/.yandex_meta/override.nix +++ b/contrib/libs/simdjson/.yandex_meta/override.nix @@ -1,11 +1,11 @@ pkgs: attrs: with pkgs; rec { - version = "3.11.4"; + version = "3.11.5"; src = fetchFromGitHub { owner = "simdjson"; repo = "simdjson"; rev = "v${version}"; - hash = "sha256-mcsMp9P9+3ACHkykJitHADoZ35kBeUza2LN+EPnq8RU="; + hash = "sha256-p7xRrdYZoWXVsuSF45GvB3pw5Ndxpyq/Hi2Uka3mUdQ="; }; cmakeFlags = attrs.cmakeFlags ++ [ diff --git a/contrib/libs/simdjson/include/simdjson/generic/ondemand/raw_json_string-inl.h b/contrib/libs/simdjson/include/simdjson/generic/ondemand/raw_json_string-inl.h index 5b814dd801e..6d36a3741ee 100644 --- a/contrib/libs/simdjson/include/simdjson/generic/ondemand/raw_json_string-inl.h +++ b/contrib/libs/simdjson/include/simdjson/generic/ondemand/raw_json_string-inl.h @@ -20,36 +20,39 @@ simdjson_inline const char * raw_json_string::raw() const noexcept { return rein simdjson_inline bool raw_json_string::is_free_from_unescaped_quote(std::string_view target) noexcept { size_t pos{0}; - // if the content has no escape character, just scan through it quickly! - for(;pos < target.size() && target[pos] != '\\';pos++) {} - // slow path may begin. - bool escaping{false}; - for(;pos < target.size();pos++) { - if((target[pos] == '"') && !escaping) { - return false; - } else if(target[pos] == '\\') { - escaping = !escaping; - } else { - escaping = false; + while(pos < target.size()) { + pos = target.find('"', pos); + if(pos == std::string_view::npos) { return true; } + if(pos != 0 && target[pos-1] != '\\') { return false; } + if(pos > 1 && target[pos-2] == '\\') { + size_t backslash_count{2}; + for(size_t i = 3; i <= pos; i++) { + if(target[pos-i] == '\\') { backslash_count++; } + else { break; } + } + if(backslash_count % 2 == 0) { return false; } } + pos++; } return true; } simdjson_inline bool raw_json_string::is_free_from_unescaped_quote(const char* target) noexcept { size_t pos{0}; - // if the content has no escape character, just scan through it quickly! - for(;target[pos] && target[pos] != '\\';pos++) {} - // slow path may begin. - bool escaping{false}; - for(;target[pos];pos++) { - if((target[pos] == '"') && !escaping) { - return false; - } else if(target[pos] == '\\') { - escaping = !escaping; - } else { - escaping = false; + while(target[pos]) { + const char * result = strchr(target+pos, '"'); + if(result == nullptr) { return true; } + pos = result - target; + if(pos != 0 && target[pos-1] != '\\') { return false; } + if(pos > 1 && target[pos-2] == '\\') { + size_t backslash_count{2}; + for(size_t i = 3; i <= pos; i++) { + if(target[pos-i] == '\\') { backslash_count++; } + else { break; } + } + if(backslash_count % 2 == 0) { return false; } } + pos++; } return true; } @@ -61,7 +64,7 @@ simdjson_inline bool raw_json_string::unsafe_is_equal(size_t length, std::string } simdjson_inline bool raw_json_string::unsafe_is_equal(std::string_view target) const noexcept { - // Assumptions: does not contain unescaped quote characters, and + // Assumptions: does not contain unescaped quote characters("), and // the raw content is quote terminated within a valid JSON string. if(target.size() <= SIMDJSON_PADDING) { return (raw()[target.size()] == '"') && !memcmp(raw(), target.data(), target.size()); diff --git a/contrib/libs/simdjson/include/simdjson/simdjson_version.h b/contrib/libs/simdjson/include/simdjson/simdjson_version.h index 0f102f299d4..bb4e157d255 100644 --- a/contrib/libs/simdjson/include/simdjson/simdjson_version.h +++ b/contrib/libs/simdjson/include/simdjson/simdjson_version.h @@ -4,7 +4,7 @@ #define SIMDJSON_SIMDJSON_VERSION_H /** The version of simdjson being used (major.minor.revision) */ -#define SIMDJSON_VERSION "3.11.4" +#define SIMDJSON_VERSION "3.11.5" namespace simdjson { enum { @@ -19,7 +19,7 @@ enum { /** * The revision (major.minor.REVISION) of simdjson being used. */ - SIMDJSON_VERSION_REVISION = 4 + SIMDJSON_VERSION_REVISION = 5 }; } // namespace simdjson diff --git a/contrib/libs/simdjson/ya.make b/contrib/libs/simdjson/ya.make index f9f60907d8d..52bef657aee 100644 --- a/contrib/libs/simdjson/ya.make +++ b/contrib/libs/simdjson/ya.make @@ -10,9 +10,9 @@ LICENSE( LICENSE_TEXTS(.yandex_meta/licenses.list.txt) -VERSION(3.11.4) +VERSION(3.11.5) -ORIGINAL_SOURCE(https://github.com/simdjson/simdjson/archive/v3.11.4.tar.gz) +ORIGINAL_SOURCE(https://github.com/simdjson/simdjson/archive/v3.11.5.tar.gz) ADDINCL( GLOBAL contrib/libs/simdjson/include diff --git a/library/cpp/monlib/service/ya.make b/library/cpp/monlib/service/ya.make index b07b3c9d12f..5ecd0e80c23 100644 --- a/library/cpp/monlib/service/ya.make +++ b/library/cpp/monlib/service/ya.make @@ -10,7 +10,6 @@ SRCS( PEERDIR( library/cpp/string_utils/base64 - contrib/libs/protobuf library/cpp/coroutine/engine library/cpp/coroutine/listener library/cpp/http/fetch @@ -18,6 +17,7 @@ PEERDIR( library/cpp/http/io library/cpp/logger library/cpp/malloc/api + library/cpp/protobuf/runtime library/cpp/svnversion library/cpp/uri library/cpp/cgiparam diff --git a/library/cpp/regex/hyperscan/hyperscan.cpp b/library/cpp/regex/hyperscan/hyperscan.cpp index 0a4bfcb9ec5..bfb9d7b5ff9 100644 --- a/library/cpp/regex/hyperscan/hyperscan.cpp +++ b/library/cpp/regex/hyperscan/hyperscan.cpp @@ -96,6 +96,27 @@ namespace NHyperscan { return db; } + TDatabase CompileLiteral(const TStringBuf& literal, unsigned int flags, hs_platform_info_t* platform) { + hs_database_t* rawDb = nullptr; + hs_compile_error_t* rawCompileErr = nullptr; + hs_error_t status = hs_compile_lit( + literal.data(), + flags, + literal.size(), + HS_MODE_BLOCK, + platform, + &rawDb, + &rawCompileErr); + TDatabase db(rawDb); + NHyperscan::TCompileError compileError(rawCompileErr); + if (status != HS_SUCCESS) { + ythrow TCompileException() + << "Failed to compile literal: " << literal << ". " + << "Error message (hyperscan): " << compileError->message; + } + return db; + } + TDatabase CompileMulti( const TVector<const char*>& regexs, const TVector<unsigned int>& flags, @@ -150,6 +171,61 @@ namespace NHyperscan { return db; } + TDatabase CompileMultiLiteral( + const TVector<const char*>& literals, + const TVector<unsigned int>& flags, + const TVector<unsigned int>& ids, + const TVector<size_t>& lens, + hs_platform_info_t* platform) + { + unsigned int count = literals.size(); + if (flags.size() != count) { + ythrow yexception() + << "Mismatch of sizes vectors passed to CompileMultiLiteral. " + << "size(literals) = " << literals.size() << ". " + << "size(flags) = " << flags.size() << "."; + } + if (ids.size() != count) { + ythrow yexception() + << "Mismatch of sizes vectors passed to CompileMultiLiteral. " + << "size(literals) = " << literals.size() << ". " + << "size(ids) = " << ids.size() << "."; + } + if (lens.size() != count) { + ythrow yexception() + << "Mismatch of sizes vectors passed to CompileMultiLiteral. " + << "size(literals) = " << literals.size() << ". " + << "size(lens) = " << lens.size() << "."; + } + hs_database_t* rawDb = nullptr; + hs_compile_error_t* rawCompileErr = nullptr; + hs_error_t status = hs_compile_lit_multi( + literals.data(), + flags.data(), + ids.data(), + lens.data(), + count, + HS_MODE_BLOCK, + platform, + &rawDb, + &rawCompileErr); + TDatabase db(rawDb); + NHyperscan::TCompileError compileError(rawCompileErr); + if (status != HS_SUCCESS) { + if (compileError->expression >= 0) { + const char* literal = literals[compileError->expression]; + ythrow TCompileException() + << "Failed to compile literal: " << literal << ". " + << "Error message (hyperscan): " << compileError->message; + } else { + ythrow TCompileException() + << "Failed to compile multiple literals. " + << "Error message (hyperscan): " << compileError->message; + } + } + return db; + } + bool Matches( const TDatabase& db, const TScratch& scratch, @@ -180,6 +256,16 @@ namespace NHyperscan { return NPrivate::Compile(regex, flags, &platformInfo); } + TDatabase CompileLiteral(const TStringBuf& literal, unsigned int flags) { + auto platformInfo = NPrivate::MakeCurrentPlatformInfo(); + return NPrivate::CompileLiteral(literal, flags, &platformInfo); + } + + TDatabase CompileLiteral(const TStringBuf& literal, unsigned int flags, TCPUFeatures cpuFeatures) { + auto platformInfo = NPrivate::MakePlatformInfo(cpuFeatures); + return NPrivate::CompileLiteral(literal, flags, &platformInfo); + } + TDatabase CompileMulti( const TVector<const char*>& regexs, const TVector<unsigned int>& flags, @@ -201,6 +287,27 @@ namespace NHyperscan { return NPrivate::CompileMulti(regexs, flags, ids, &platformInfo, extendedParameters); } + TDatabase CompileMultiLiteral( + const TVector<const char*>& literals, + const TVector<unsigned int>& flags, + const TVector<unsigned int>& ids, + const TVector<size_t>& lens) + { + auto platformInfo = NPrivate::MakeCurrentPlatformInfo(); + return NPrivate::CompileMultiLiteral(literals, flags, ids, lens, &platformInfo); + } + + TDatabase CompileMultiLiteral( + const TVector<const char*>& literals, + const TVector<unsigned int>& flags, + const TVector<unsigned int>& ids, + const TVector<size_t>& lens, + TCPUFeatures cpuFeatures) + { + auto platformInfo = NPrivate::MakePlatformInfo(cpuFeatures); + return NPrivate::CompileMultiLiteral(literals, flags, ids, lens, &platformInfo); + } + TScratch MakeScratch(const TDatabase& db) { hs_scratch_t* rawScratch = nullptr; hs_error_t status = Singleton<NPrivate::TImpl>()->AllocScratch(db.Get(), &rawScratch); diff --git a/library/cpp/regex/hyperscan/hyperscan.h b/library/cpp/regex/hyperscan/hyperscan.h index eae82fa3842..7f8c877b076 100644 --- a/library/cpp/regex/hyperscan/hyperscan.h +++ b/library/cpp/regex/hyperscan/hyperscan.h @@ -60,6 +60,8 @@ namespace NHyperscan { TDatabase Compile(const TStringBuf& regex, unsigned int flags, hs_platform_info_t* platform); + TDatabase CompileLiteral(const TStringBuf& literal, unsigned int flags, hs_platform_info_t* platform); + TDatabase CompileMulti( const TVector<const char*>& regexs, const TVector<unsigned int>& flags, @@ -67,6 +69,13 @@ namespace NHyperscan { hs_platform_info_t* platform, const TVector<const hs_expr_ext_t*>* extendedParameters = nullptr); + TDatabase CompileMultiLiteral( + const TVector<const char*>& literals, + const TVector<unsigned int>& flags, + const TVector<unsigned int>& ids, + const TVector<size_t>& lens, + hs_platform_info_t* platform); + // We need to parametrize Scan and Matches functions for testing purposes template<typename TCallback> void Scan( @@ -118,6 +127,10 @@ namespace NHyperscan { TDatabase Compile(const TStringBuf& regex, unsigned int flags, TCPUFeatures cpuFeatures); + TDatabase CompileLiteral(const TStringBuf& literal, unsigned int flags); + + TDatabase CompileLiteral(const TStringBuf& literal, unsigned int flags, TCPUFeatures cpuFeatures); + TDatabase CompileMulti( const TVector<const char*>& regexs, const TVector<unsigned int>& flags, @@ -131,6 +144,19 @@ namespace NHyperscan { TCPUFeatures cpuFeatures, const TVector<const hs_expr_ext_t*>* extendedParameters = nullptr); + TDatabase CompileMultiLiteral( + const TVector<const char*>& literals, + const TVector<unsigned int>& flags, + const TVector<unsigned int>& ids, + const TVector<size_t>& lens); + + TDatabase CompileMultiLiteral( + const TVector<const char*>& literals, + const TVector<unsigned int>& flags, + const TVector<unsigned int>& ids, + const TVector<size_t>& lens, + TCPUFeatures cpuFeatures); + TScratch MakeScratch(const TDatabase& db); void GrowScratch(TScratch& scratch, const TDatabase& db); diff --git a/library/cpp/regex/hyperscan/ut/hyperscan_ut.cpp b/library/cpp/regex/hyperscan/ut/hyperscan_ut.cpp index 75cd0bcc897..063ca3dd035 100644 --- a/library/cpp/regex/hyperscan/ut/hyperscan_ut.cpp +++ b/library/cpp/regex/hyperscan/ut/hyperscan_ut.cpp @@ -27,6 +27,22 @@ Y_UNIT_TEST_SUITE(HyperscanWrappers) { UNIT_ASSERT_EQUAL(foundId, 0); } + Y_UNIT_TEST(CompileLiteralAndScan) { + TDatabase db = CompileLiteral("a.c?[)", HS_FLAG_SINGLEMATCH); + TScratch scratch = MakeScratch(db); + + unsigned int foundId = 42; + auto callback = [&](unsigned int id, unsigned long long /* from */, unsigned long long /* to */) { + foundId = id; + }; + NHyperscan::Scan( + db, + scratch, + "a.c?[)", + callback); + UNIT_ASSERT_EQUAL(foundId, 0); + } + Y_UNIT_TEST(Matches) { NHyperscan::TDatabase db = NHyperscan::Compile( "a.c", @@ -71,6 +87,49 @@ Y_UNIT_TEST_SUITE(HyperscanWrappers) { UNIT_ASSERT(foundIds.contains(241)); } + Y_UNIT_TEST(MultiLiteral) { + static const TVector<TString> LITERALS = { + "foo.", + "bar.", + }; + NHyperscan::TDatabase db = NHyperscan::CompileMultiLiteral( + { + LITERALS[0].c_str(), + LITERALS[1].c_str(), + }, + { + HS_FLAG_SINGLEMATCH, + HS_FLAG_SINGLEMATCH | HS_FLAG_CASELESS, + }, + { + 42, + 241, + }, + { + LITERALS[0].size(), + LITERALS[1].size(), + }); + NHyperscan::TScratch scratch = NHyperscan::MakeScratch(db); + + UNIT_ASSERT(NHyperscan::Matches(db, scratch, "foo.")); + UNIT_ASSERT(NHyperscan::Matches(db, scratch, "bar.")); + UNIT_ASSERT(NHyperscan::Matches(db, scratch, "BAR.")); + UNIT_ASSERT(!NHyperscan::Matches(db, scratch, "FOO.")); + + TSet<unsigned int> foundIds; + auto callback = [&](unsigned int id, unsigned long long /* from */, unsigned long long /* to */) { + foundIds.insert(id); + }; + NHyperscan::Scan( + db, + scratch, + "foo.BaR.", + callback); + UNIT_ASSERT_EQUAL(foundIds.size(), 2); + UNIT_ASSERT(foundIds.contains(42)); + UNIT_ASSERT(foundIds.contains(241)); + } + // https://ml.yandex-team.ru/thread/2370000002965712422/ Y_UNIT_TEST(MultiRegression) { NHyperscan::CompileMulti( diff --git a/library/cpp/tld/tlds-alpha-by-domain.txt b/library/cpp/tld/tlds-alpha-by-domain.txt index fd0339693fa..4ed479feea4 100644 --- a/library/cpp/tld/tlds-alpha-by-domain.txt +++ b/library/cpp/tld/tlds-alpha-by-domain.txt @@ -1,4 +1,4 @@ -# Version 2025010700, Last Updated Tue Jan 7 07:07:01 2025 UTC +# Version 2025011000, Last Updated Fri Jan 10 07:07:01 2025 UTC AAA AARP ABB diff --git a/yql/essentials/core/services/yql_eval_expr.cpp b/yql/essentials/core/services/yql_eval_expr.cpp index 6072abd460f..bc39a9e9c83 100644 --- a/yql/essentials/core/services/yql_eval_expr.cpp +++ b/yql/essentials/core/services/yql_eval_expr.cpp @@ -973,7 +973,7 @@ IGraphTransformer::TStatus EvaluateExpression(const TExprNode::TPtr& input, TExp clonedArg = ctx.NewCallable(clonedArg->Pos(), "SerializeCode", { clonedArg }); } - TString key; + TString key, yson; NYT::TNode ysonNode; if (types.QContext) { key = MakeCacheKey(*clonedArg); @@ -988,74 +988,80 @@ IGraphTransformer::TStatus EvaluateExpression(const TExprNode::TPtr& input, TExp } do { - calcProvider.Clear(); - calcWorldRoot.Drop(); - fullTransformer->Rewind(); - auto prevSteps = ctx.Step; - TEvalScope scope(types); - ctx.Step.Reset(); - if (prevSteps.IsDone(TExprStep::Recapture)) { - ctx.Step.Done(TExprStep::Recapture); - } - status = SyncTransform(*fullTransformer, clonedArg, ctx); - ctx.Step = prevSteps; - if (status.Level == IGraphTransformer::TStatus::Error) { - return nullptr; - } + if (ysonNode.IsUndefined() && isAtomPipeline && clonedArg->IsCallable("String")) { + ysonNode = NYT::TNode()("Data",NYT::TNode(clonedArg->Head().Content())); + yson = NYT::NodeToYsonString(ysonNode, NYT::NYson::EYsonFormat::Binary); + } else { + calcProvider.Clear(); + calcWorldRoot.Drop(); + fullTransformer->Rewind(); + auto prevSteps = ctx.Step; + TEvalScope scope(types); + ctx.Step.Reset(); + if (prevSteps.IsDone(TExprStep::Recapture)) { + ctx.Step.Done(TExprStep::Recapture); + } + status = SyncTransform(*fullTransformer, clonedArg, ctx); + ctx.Step = prevSteps; + if (status.Level == IGraphTransformer::TStatus::Error) { + return nullptr; + } - // execute calcWorldRoot - auto execTransformer = CreateExecutionTransformer(types, [](const TOperationProgress&){}, false); - status = SyncTransform(*execTransformer, calcWorldRoot, ctx); - if (status.Level == IGraphTransformer::TStatus::Error) { - return nullptr; - } + // execute calcWorldRoot + auto execTransformer = CreateExecutionTransformer(types, [](const TOperationProgress&){}, false); + status = SyncTransform(*execTransformer, calcWorldRoot, ctx); + if (status.Level == IGraphTransformer::TStatus::Error) { + return nullptr; + } - if (types.QContext.CanRead()) { - break; - } + if (types.QContext.CanRead()) { + break; + } - IDataProvider::TFillSettings fillSettings; - auto delegatedNode = Build<TResult>(ctx, node->Pos()) - .Input(clonedArg) - .BytesLimit() - .Value(TString()) - .Build() - .RowsLimit() - .Value(TString()) - .Build() - .FormatDetails() - .Value(ToString((ui32)NYson::EYsonFormat::Binary)) - .Build() - .Settings().Build() - .Format() - .Value(ToString((ui32)IDataProvider::EResultFormat::Yson)) - .Build() - .PublicId() - .Value(TString()) - .Build() - .Discard() - .Value("false") - .Build() - .Origin(calcWorldRoot) - .Done().Ptr(); - - auto atomType = ctx.MakeType<TUnitExprType>(); - for (auto idx: {TResOrPullBase::idx_BytesLimit, TResOrPullBase::idx_RowsLimit, TResOrPullBase::idx_FormatDetails, - TResOrPullBase::idx_Format, TResOrPullBase::idx_PublicId, TResOrPullBase::idx_Discard, TResOrPullBase::idx_Settings }) { - delegatedNode->Child(idx)->SetTypeAnn(atomType); - delegatedNode->Child(idx)->SetState(TExprNode::EState::ConstrComplete); - } + IDataProvider::TFillSettings fillSettings; + auto delegatedNode = Build<TResult>(ctx, node->Pos()) + .Input(clonedArg) + .BytesLimit() + .Value(TString()) + .Build() + .RowsLimit() + .Value(TString()) + .Build() + .FormatDetails() + .Value(ToString((ui32)NYson::EYsonFormat::Binary)) + .Build() + .Settings().Build() + .Format() + .Value(ToString((ui32)IDataProvider::EResultFormat::Yson)) + .Build() + .PublicId() + .Value(TString()) + .Build() + .Discard() + .Value("false") + .Build() + .Origin(calcWorldRoot) + .Done().Ptr(); + + auto atomType = ctx.MakeType<TUnitExprType>(); + for (auto idx: {TResOrPullBase::idx_BytesLimit, TResOrPullBase::idx_RowsLimit, TResOrPullBase::idx_FormatDetails, + TResOrPullBase::idx_Format, TResOrPullBase::idx_PublicId, TResOrPullBase::idx_Discard, TResOrPullBase::idx_Settings }) { + delegatedNode->Child(idx)->SetTypeAnn(atomType); + delegatedNode->Child(idx)->SetState(TExprNode::EState::ConstrComplete); + } - delegatedNode->SetTypeAnn(atomType); - delegatedNode->SetState(TExprNode::EState::ConstrComplete); - auto& transformer = calcTransfomer ? *calcTransfomer : (*calcProvider.Get())->GetCallableExecutionTransformer(); - status = SyncTransform(transformer, delegatedNode, ctx); - if (status.Level == IGraphTransformer::TStatus::Error) { - return nullptr; + delegatedNode->SetTypeAnn(atomType); + delegatedNode->SetState(TExprNode::EState::ConstrComplete); + auto& transformer = calcTransfomer ? *calcTransfomer : (*calcProvider.Get())->GetCallableExecutionTransformer(); + status = SyncTransform(transformer, delegatedNode, ctx); + if (status.Level == IGraphTransformer::TStatus::Error) { + return nullptr; + } + + yson = TString{delegatedNode->GetResult().Content()}; + ysonNode = NYT::NodeFromYsonString(yson); } - TString yson{delegatedNode->GetResult().Content()}; - ysonNode = NYT::NodeFromYsonString(yson); if (ysonNode.HasKey("FallbackProvider")) { nextProvider = ysonNode["FallbackProvider"].AsString(); } else if (types.QContext.CanWrite()) { diff --git a/yql/essentials/minikql/arrow/arrow_util.h b/yql/essentials/minikql/arrow/arrow_util.h index efb071ff71c..083ca371419 100644 --- a/yql/essentials/minikql/arrow/arrow_util.h +++ b/yql/essentials/minikql/arrow/arrow_util.h @@ -22,6 +22,8 @@ std::shared_ptr<arrow::ArrayData> Unwrap(const arrow::ArrayData& data, TType* it using NYql::NUdf::AllocateBitmapWithReserve; using NYql::NUdf::MakeDenseBitmap; +using NYql::NUdf::MakeDenseBitmapCopy; +using NYql::NUdf::MakeDenseFalseBitmap; inline arrow::internal::Bitmap GetBitmap(const arrow::ArrayData& arr, int index) { return arrow::internal::Bitmap{ arr.buffers[index], arr.offset, arr.length }; diff --git a/yql/essentials/minikql/comp_nodes/mkql_block_map_join.cpp b/yql/essentials/minikql/comp_nodes/mkql_block_map_join.cpp index 827e295db6f..847e4409b6c 100644 --- a/yql/essentials/minikql/comp_nodes/mkql_block_map_join.cpp +++ b/yql/essentials/minikql/comp_nodes/mkql_block_map_join.cpp @@ -3,6 +3,7 @@ #include <yql/essentials/minikql/computation/mkql_block_builder.h> #include <yql/essentials/minikql/computation/mkql_block_impl.h> #include <yql/essentials/minikql/computation/mkql_block_reader.h> +#include <yql/essentials/minikql/computation/mkql_block_trimmer.h> #include <yql/essentials/minikql/computation/mkql_computation_node_holders_codegen.h> #include <yql/essentials/minikql/comp_nodes/mkql_rh_hash.h> #include <yql/essentials/minikql/invoke_builtins/mkql_builtins.h> @@ -329,12 +330,12 @@ public: public: TIterator() = default; - TIterator(TBlockIndex* blockIndex) + TIterator(const TBlockIndex* blockIndex) : Type_(EIteratorType::EMPTY) , BlockIndex_(blockIndex) {} - TIterator(TBlockIndex* blockIndex, TIndexEntry entry, std::vector<NYql::NUdf::TBlockItem> itemsToLookup) + TIterator(const TBlockIndex* blockIndex, TIndexEntry entry, std::vector<NYql::NUdf::TBlockItem> itemsToLookup) : Type_(EIteratorType::INPLACE) , BlockIndex_(blockIndex) , Entry_(entry) @@ -342,7 +343,7 @@ public: , ItemsToLookup_(std::move(itemsToLookup)) {} - TIterator(TBlockIndex* blockIndex, TIndexNode* node, std::vector<NYql::NUdf::TBlockItem> itemsToLookup) + TIterator(const TBlockIndex* blockIndex, TIndexNode* node, std::vector<NYql::NUdf::TBlockItem> itemsToLookup) : Type_(EIteratorType::LIST) , BlockIndex_(blockIndex) , Node_(node) @@ -432,7 +433,7 @@ public: private: EIteratorType Type_; - TBlockIndex* BlockIndex_ = nullptr; + const TBlockIndex* BlockIndex_ = nullptr; union { TIndexNode* Node_; @@ -451,7 +452,8 @@ public: const TVector<TType*>& itemTypes, const TVector<ui32>& keyColumns, NUdf::TUnboxedValue stream, - bool any + bool any, + arrow::MemoryPool* pool ) : TBase(memInfo) , InputsDescr_(ToValueDescr(itemTypes)) @@ -466,6 +468,7 @@ public: Readers_.push_back(MakeBlockReader(TTypeInfoHelper(), blockItemType)); Hashers_.push_back(helper.MakeHasher(blockItemType)); Comparators_.push_back(helper.MakeComparator(blockItemType)); + Trimmers_.push_back(MakeBlockTrimmer(TTypeInfoHelper(), blockItemType, pool)); } } @@ -484,7 +487,12 @@ public: for (size_t i = 0; i < Inputs_.size() - 1; i++) { auto& datum = TArrowBlock::From(Inputs_[i]).GetDatum(); ARROW_DEBUG_CHECK_DATUM_TYPES(InputsDescr_[i], datum.descr()); - block.push_back(std::move(datum)); + if (datum.is_scalar()) { + block.push_back(datum); + } else { + MKQL_ENSURE(datum.is_array(), "Expecting array"); + block.push_back(Trimmers_[i]->Trim(datum.array())); + } } Data_.push_back(std::move(block)); } @@ -565,7 +573,7 @@ public: return; } - auto value = static_cast<TIndexMapValue*>(Index_.GetMutablePayload(iter)); + auto value = static_cast<const TIndexMapValue*>(Index_.GetPayload(iter)); if (value->IsInplace()) { iterators[i] = TIterator(this, value->GetEntry(), std::move(itemsBatch[i])); } else { @@ -574,23 +582,20 @@ public: }); } - TBlockItem GetItem(TIndexEntry entry, ui32 columnIdx) { + TBlockItem GetItem(TIndexEntry entry, ui32 columnIdx) const { Y_ENSURE(entry.BlockOffset < Data_.size()); Y_ENSURE(columnIdx < Inputs_.size() - 1); - - auto& datum = Data_[entry.BlockOffset][columnIdx]; - MKQL_ENSURE(datum.is_array(), "Expecting array"); - return Readers_[columnIdx]->GetItem(*datum.array(), entry.ItemOffset); + return GetItemFromBlock(Data_[entry.BlockOffset], columnIdx, entry.ItemOffset); } - void GetRow(TIndexEntry entry, const TVector<ui32>& ioMap, std::vector<NYql::NUdf::TBlockItem>& row) { + void GetRow(TIndexEntry entry, const TVector<ui32>& ioMap, std::vector<NYql::NUdf::TBlockItem>& row) const { Y_ENSURE(row.size() == ioMap.size()); for (size_t i = 0; i < row.size(); i++) { row[i] = GetItem(entry, ioMap[i]); } } - bool IsKeyEquals(TIndexEntry entry, const std::vector<NYql::NUdf::TBlockItem>& keyItems) { + bool IsKeyEquals(TIndexEntry entry, const std::vector<NYql::NUdf::TBlockItem>& keyItems) const { Y_ENSURE(keyItems.size() == KeyColumns_.size()); for (size_t i = 0; i < KeyColumns_.size(); i++) { auto indexItem = GetItem(entry, KeyColumns_[i]); @@ -607,10 +612,7 @@ private: ui64 keyHash = 0; keyItems.clear(); for (ui32 keyColumn : KeyColumns_) { - auto& datum = block[keyColumn]; - MKQL_ENSURE(datum.is_array(), "Expecting array"); - - auto item = Readers_[keyColumn]->GetItem(*datum.array(), offset); + auto item = GetItemFromBlock(block, keyColumn, offset); if (!item) { keyItems.clear(); return 0; @@ -623,11 +625,21 @@ private: return keyHash; } + TBlockItem GetItemFromBlock(const std::vector<arrow::Datum>& block, ui32 columnIdx, size_t offset) const { + const auto& datum = block[columnIdx]; + if (datum.is_scalar()) { + return Readers_[columnIdx]->GetScalarItem(*datum.scalar()); + } else { + MKQL_ENSURE(datum.is_array(), "Expecting array"); + return Readers_[columnIdx]->GetItem(*datum.array(), offset); + } + } + TIndexNode* InsertIndexNode(TIndexEntry entry, TIndexNode* currentHead = nullptr) { return &IndexNodes_.emplace_back(entry, currentHead); } - bool ContainsKey(const TIndexMapValue* chain, const std::vector<NYql::NUdf::TBlockItem>& keyItems) { + bool ContainsKey(const TIndexMapValue* chain, const std::vector<NYql::NUdf::TBlockItem>& keyItems) const { if (chain->IsInplace()) { return IsKeyEquals(chain->GetEntry(), keyItems); } else { @@ -650,6 +662,7 @@ private: TVector<std::unique_ptr<IBlockReader>> Readers_; TVector<NUdf::IBlockItemHasher::TPtr> Hashers_; TVector<NUdf::IBlockItemComparator::TPtr> Comparators_; + TVector<IBlockTrimmer::TPtr> Trimmers_; std::vector<std::vector<arrow::Datum>> Data_; @@ -705,7 +718,8 @@ public: RightItemTypes_, RightKeyColumns_, std::move(RightStream_->GetValue(ctx)), - RightAny + RightAny, + &ctx.ArrowMemoryPool ); return ctx.HolderFactory.Create<TStreamValue>(ctx.HolderFactory, diff --git a/yql/essentials/minikql/comp_nodes/mkql_wide_combine.cpp b/yql/essentials/minikql/comp_nodes/mkql_wide_combine.cpp index 1407d63cdc6..4c3bf3a059b 100644 --- a/yql/essentials/minikql/comp_nodes/mkql_wide_combine.cpp +++ b/yql/essentials/minikql/comp_nodes/mkql_wide_combine.cpp @@ -31,6 +31,10 @@ extern TStatKey Combine_MaxRowsCount; namespace { +bool HasMemoryForProcessing() { + return !TlsAllocState->IsMemoryYellowZoneEnabled(); +} + struct TMyValueEqual { TMyValueEqual(const TKeyTypes& types) : Types(types) @@ -244,7 +248,7 @@ private: return KeyWidth + StateWidth; } public: - TState(TMemoryUsageInfo* memInfo, ui32 keyWidth, ui32 stateWidth, const THashFunc& hash, const TEqualsFunc& equal, bool allowOutOfMemory = false) + TState(TMemoryUsageInfo* memInfo, ui32 keyWidth, ui32 stateWidth, const THashFunc& hash, const TEqualsFunc& equal, bool allowOutOfMemory = true) : TBase(memInfo), KeyWidth(keyWidth), StateWidth(stateWidth), AllowOutOfMemory(allowOutOfMemory), States(hash, equal, CountRowsOnPage) { CurrentPage = &Storage.emplace_back(RowSize() * CountRowsOnPage, NUdf::TUnboxedValuePod()); CurrentPosition = 0; @@ -282,6 +286,7 @@ public: if (isNew) { GrowStates(); } + IsOutOfMemory = IsOutOfMemory || !HasMemoryForProcessing(); return isNew; } @@ -298,10 +303,6 @@ public: } } - bool CheckIsOutOfMemory() const { - return IsOutOfMemory; - } - template<bool SkipYields> bool ReadMore() { if constexpr (SkipYields) { @@ -320,6 +321,7 @@ public: CurrentPosition = 0; Tongue = CurrentPage->data(); StoredDataSize = 0; + IsOutOfMemory = false; CleanupCurrentContext(); return true; @@ -352,13 +354,13 @@ public: NUdf::TUnboxedValuePod* Tongue = nullptr; NUdf::TUnboxedValuePod* Throat = nullptr; i64 StoredDataSize = 0; + bool IsOutOfMemory = false; NYql::NUdf::TCounter CounterOutputRows_; private: std::optional<TStorageIterator> ExtractIt; const ui32 KeyWidth, StateWidth; const bool AllowOutOfMemory; - bool IsOutOfMemory = false; ui64 CurrentPosition = 0; TRow* CurrentPage = nullptr; TStorage Storage; @@ -484,7 +486,7 @@ public: ETasteResult TasteIt() { if (GetMode() == EOperatingMode::InMemory) { bool isNew = InMemoryProcessingState.TasteIt(); - if (InMemoryProcessingState.CheckIsOutOfMemory()) { + if (InMemoryProcessingState.IsOutOfMemory) { StateWantsToSpill = true; } Throat = InMemoryProcessingState.Throat; @@ -653,8 +655,12 @@ private: static_cast<NUdf::TUnboxedValue&>(processingState.Throat[i - KeyWidth]) = std::move(keyAndState[i]); } - if (InMemoryBucketsCount && !HasMemoryForProcessing() && IsSpillingWhileStateSplitAllowed()) { + if (InMemoryBucketsCount && !HasMemoryForProcessing() && IsSpillingWhileStateSplitAllowed() || processingState.IsOutOfMemory) { ui32 bucketNumToSpill = GetLargestInMemoryBucketNumber(); + if (processingState.IsOutOfMemory) { + bucketNumToSpill = bucketId; + processingState.IsOutOfMemory = false; + } SplitStateSpillingBucket = bucketNumToSpill; @@ -861,7 +867,7 @@ private: for (auto &b: SpilledBuckets) { b.SpilledState = std::make_unique<TWideUnboxedValuesSpillerAdapter>(spiller, KeyAndStateType, 5_MB); b.SpilledData = std::make_unique<TWideUnboxedValuesSpillerAdapter>(spiller, UsedInputItemType, 5_MB); - b.InMemoryProcessingState = std::make_unique<TState>(MemInfo, KeyWidth, KeyAndStateType->GetElementsCount() - KeyWidth, Hasher, Equal); + b.InMemoryProcessingState = std::make_unique<TState>(MemInfo, KeyWidth, KeyAndStateType->GetElementsCount() - KeyWidth, Hasher, Equal, false); } break; } @@ -889,10 +895,6 @@ private: Mode = mode; } - bool HasMemoryForProcessing() const { - return !TlsAllocState->IsMemoryYellowZoneEnabled(); - } - bool IsSwitchToSpillingModeCondition() const { return !HasMemoryForProcessing() || TlsAllocState->GetMaximumLimitValueReached(); } @@ -942,6 +944,7 @@ private: llvm::PointerType* PtrValueType; llvm::IntegerType* StatusType; llvm::IntegerType* StoredType; + llvm::IntegerType* BoolType; protected: using TBase::Context; public: @@ -951,6 +954,7 @@ public: result.emplace_back(PtrValueType); //tongue result.emplace_back(PtrValueType); //throat result.emplace_back(StoredType); //StoredDataSize + result.emplace_back(BoolType); //IsOutOfMemory result.emplace_back(Type::getInt32Ty(Context)); //size result.emplace_back(Type::getInt32Ty(Context)); //size return result; @@ -972,12 +976,17 @@ public: return ConstantInt::get(Type::getInt32Ty(Context), TBase::GetFieldsCount() + 3); } + llvm::Constant* GetIsOutOfMemory() { + return ConstantInt::get(Type::getInt32Ty(Context), TBase::GetFieldsCount() + 4); + } + TLLVMFieldsStructureState(llvm::LLVMContext& context) : TBase(context) , ValueType(Type::getInt128Ty(Context)) , PtrValueType(PointerType::getUnqual(ValueType)) , StatusType(Type::getInt32Ty(Context)) - , StoredType(Type::getInt64Ty(Context)) { + , StoredType(Type::getInt64Ty(Context)) + , BoolType(Type::getInt1Ty(Context)) { } }; @@ -1048,7 +1057,7 @@ public: Nodes.ExtractKey(ctx, fields, static_cast<NUdf::TUnboxedValue*>(ptr->Tongue)); Nodes.ProcessItem(ctx, ptr->TasteIt() ? nullptr : static_cast<NUdf::TUnboxedValue*>(ptr->Tongue), static_cast<NUdf::TUnboxedValue*>(ptr->Throat)); - } while (!ctx.template CheckAdjustedMemLimit<TrackRss>(MemLimit, initUsage - ptr->StoredDataSize)); + } while (!ctx.template CheckAdjustedMemLimit<TrackRss>(MemLimit, initUsage - ptr->StoredDataSize) && !ptr->IsOutOfMemory); ptr->PushStat(ctx.Stats); } @@ -1328,7 +1337,13 @@ public: } const auto check = CheckAdjustedMemLimit<TrackRss>(MemLimit, totalUsed, ctx, block); - BranchInst::Create(done, loop, check, block); + + const auto isOutOfMemoryPtr = GetElementPtrInst::CreateInBounds(stateType, stateArg, { stateFields.This(), stateFields.GetIsOutOfMemory() }, "is_out_of_memory_ptr", block); + const auto isOutOfMemory = new LoadInst(Type::getInt1Ty(context), isOutOfMemoryPtr, "is_out_of_memory", block); + const auto checkIsOutOfMemory = CmpInst::Create(Instruction::ICmp, ICmpInst::ICMP_EQ, isOutOfMemory, ConstantInt::getTrue(context), "check_is_out_of_memory", block); + + const auto any = BinaryOperator::CreateOr(check, checkIsOutOfMemory, "any", block); + BranchInst::Create(done, loop, any, block); block = done; diff --git a/yql/essentials/minikql/comp_nodes/ut/mkql_block_map_join_ut.cpp b/yql/essentials/minikql/comp_nodes/ut/mkql_block_map_join_ut.cpp index 214b7ae8ff8..ca710d0328f 100644 --- a/yql/essentials/minikql/comp_nodes/ut/mkql_block_map_join_ut.cpp +++ b/yql/essentials/minikql/comp_nodes/ut/mkql_block_map_join_ut.cpp @@ -96,19 +96,19 @@ TRuntimeNode BuildBlockJoin(TProgramBuilder& pgmBuilder, EJoinKind joinKind, NUdf::TUnboxedValue DoTestBlockJoin(TSetup<false>& setup, TType* leftType, NUdf::TUnboxedValue&& leftListValue, const TVector<ui32>& leftKeyColumns, const TVector<ui32>& leftKeyDrops, TType* rightType, NUdf::TUnboxedValue&& rightListValue, const TVector<ui32>& rightKeyColumns, const TVector<ui32>& rightKeyDrops, bool rightAny, - EJoinKind joinKind, size_t blockSize + EJoinKind joinKind, size_t blockSize, bool scalar ) { TProgramBuilder& pb = *setup.PgmBuilder; Y_ENSURE(leftType->IsList(), "Left node has to be list"); const auto leftItemType = AS_TYPE(TListType, leftType)->GetItemType(); Y_ENSURE(leftItemType->IsTuple(), "List item has to be tuple"); - TType* leftBlockType = MakeBlockTupleType(pb, leftItemType); + TType* leftBlockType = MakeBlockTupleType(pb, leftItemType, scalar); Y_ENSURE(rightType->IsList(), "Right node has to be list"); const auto rightItemType = AS_TYPE(TListType, rightType)->GetItemType(); Y_ENSURE(rightItemType->IsTuple(), "Right item has to be tuple"); - TType* rightBlockType = MakeBlockTupleType(pb, rightItemType); + TType* rightBlockType = MakeBlockTupleType(pb, rightItemType, scalar); TRuntimeNode leftList = pb.Arg(pb.NewListType(leftBlockType)); TRuntimeNode rightList = pb.Arg(pb.NewListType(rightBlockType)); @@ -122,8 +122,18 @@ NUdf::TUnboxedValue DoTestBlockJoin(TSetup<false>& setup, const auto graph = setup.BuildGraph(joinNode, {leftList.GetNode(), rightList.GetNode()}); auto& ctx = graph->GetContext(); - graph->GetEntryPoint(0, true)->SetValue(ctx, ToBlocks(ctx, blockSize, AS_TYPE(TTupleType, leftItemType)->GetElements(), std::move(leftListValue))); - graph->GetEntryPoint(1, true)->SetValue(ctx, ToBlocks(ctx, blockSize, AS_TYPE(TTupleType, rightItemType)->GetElements(), std::move(rightListValue))); + + NUdf::TUnboxedValuePod leftBlockListValue, rightBlockListValue; + if (scalar) { + leftBlockListValue = MakeUint64ScalarBlock(ctx, blockSize, AS_TYPE(TTupleType, leftItemType)->GetElements(), std::move(leftListValue)); + rightBlockListValue = MakeUint64ScalarBlock(ctx, blockSize, AS_TYPE(TTupleType, rightItemType)->GetElements(), std::move(rightListValue)); + } else { + leftBlockListValue = ToBlocks(ctx, blockSize, AS_TYPE(TTupleType, leftItemType)->GetElements(), std::move(leftListValue)); + rightBlockListValue = ToBlocks(ctx, blockSize, AS_TYPE(TTupleType, rightItemType)->GetElements(), std::move(rightListValue)); + } + + graph->GetEntryPoint(0, true)->SetValue(ctx, leftBlockListValue); + graph->GetEntryPoint(1, true)->SetValue(ctx, rightBlockListValue); return FromBlocks(ctx, AS_TYPE(TTupleType, joinItemType)->GetElements(), graph->GetValue()); } @@ -131,14 +141,15 @@ void RunTestBlockJoin(TSetup<false>& setup, EJoinKind joinKind, TType* expectedType, const NUdf::TUnboxedValue& expected, TType* leftType, NUdf::TUnboxedValue&& leftListValue, const TVector<ui32>& leftKeyColumns, TType* rightType, NUdf::TUnboxedValue&& rightListValue, const TVector<ui32>& rightKeyColumns, - const TVector<ui32>& leftKeyDrops = {}, const TVector<ui32>& rightKeyDrops = {}, bool rightAny = false + const TVector<ui32>& leftKeyDrops = {}, const TVector<ui32>& rightKeyDrops = {}, + bool rightAny = false, bool scalar = false ) { const size_t testSize = leftListValue.GetListLength(); for (size_t blockSize = 1; blockSize <= testSize; blockSize <<= 1) { const auto got = DoTestBlockJoin(setup, leftType, std::move(leftListValue), leftKeyColumns, leftKeyDrops, rightType, std::move(rightListValue), rightKeyColumns, rightKeyDrops, rightAny, - joinKind, blockSize + joinKind, blockSize, scalar ); CompareResults(expectedType, expected, got); } @@ -685,6 +696,52 @@ Y_UNIT_TEST_SUITE(TMiniKQLBlockMapJoinTestBasic) { ); } + Y_UNIT_TEST(TestScalar) { + TSetup<false> setup(GetNodeFactory()); + const size_t testSize = 1 << 7; + + // 1. Make input for the "left" stream. + TVector<ui64> leftKeyInit(testSize, 1); + TVector<ui64> leftSubkeyInit(testSize, 2); + TVector<ui64> leftValueInit(testSize, 3); + + // 2. Make input for the "right" stream. + TVector<ui64> rightKeyInit(testSize, 1); + TVector<ui64> rightValueInit(testSize, 2); + + // 3. Make "expected" data. + TMultiMap<ui64, ui64> rightMap; + for (size_t i = 0; i < testSize; i++) { + rightMap.insert({rightKeyInit[i], rightValueInit[i]}); + } + TVector<ui64> expectedKey; + TVector<ui64> expectedSubkey; + TVector<ui64> expectedValue; + TVector<ui64> expectedRightValue; + for (size_t i = 0; i < testSize; i++) { + const auto& [begin, end] = rightMap.equal_range(leftKeyInit[i]); + for (auto it = begin; it != end; it++) { + expectedKey.push_back(leftKeyInit[i]); + expectedSubkey.push_back(leftSubkeyInit[i]); + expectedValue.push_back(leftValueInit[i]); + expectedRightValue.push_back(it->second); + } + } + + auto [leftType, leftList] = ConvertVectorsToTuples(setup, + leftKeyInit, leftSubkeyInit, leftValueInit); + auto [rightType, rightList] = ConvertVectorsToTuples(setup, + rightKeyInit, rightValueInit); + auto [expectedType, expected] = ConvertVectorsToTuples(setup, + expectedKey, expectedSubkey, expectedValue, expectedRightValue); + + RunTestBlockJoin(setup, EJoinKind::Inner, expectedType, expected, + leftType, std::move(leftList), {0}, + rightType, std::move(rightList), {0}, + {}, {0}, false, true + ); + } + } // Y_UNIT_TEST_SUITE Y_UNIT_TEST_SUITE(TMiniKQLBlockMapJoinTestOptional) { diff --git a/yql/essentials/minikql/comp_nodes/ut/mkql_block_map_join_ut_utils.cpp b/yql/essentials/minikql/comp_nodes/ut/mkql_block_map_join_ut_utils.cpp index 2a65da92edd..fdd65bbba2b 100644 --- a/yql/essentials/minikql/comp_nodes/ut/mkql_block_map_join_ut_utils.cpp +++ b/yql/essentials/minikql/comp_nodes/ut/mkql_block_map_join_ut_utils.cpp @@ -137,7 +137,7 @@ IComputationNode* WrapWideStreamDethrottler(TCallable& callable, const TComputat } -TType* MakeBlockTupleType(TProgramBuilder& pgmBuilder, TType* tupleType) { +TType* MakeBlockTupleType(TProgramBuilder& pgmBuilder, TType* tupleType, bool scalar) { const auto itemTypes = AS_TYPE(TTupleType, tupleType)->GetElements(); const auto ui64Type = pgmBuilder.NewDataType(NUdf::TDataType<ui64>::Id); const auto blockLenType = pgmBuilder.NewBlockType(ui64Type, TBlockType::EShape::Scalar); @@ -145,7 +145,7 @@ TType* MakeBlockTupleType(TProgramBuilder& pgmBuilder, TType* tupleType) { TVector<TType*> blockItemTypes; std::transform(itemTypes.cbegin(), itemTypes.cend(), std::back_inserter(blockItemTypes), [&](const auto& itemType) { - return pgmBuilder.NewBlockType(itemType, TBlockType::EShape::Many); + return pgmBuilder.NewBlockType(itemType, scalar ? TBlockType::EShape::Scalar : TBlockType::EShape::Many); }); // XXX: Mind the last block length column. blockItemTypes.push_back(blockLenType); @@ -204,6 +204,37 @@ NUdf::TUnboxedValuePod ToBlocks(TComputationContext& ctx, size_t blockSize, return holderFactory.CreateDirectListHolder(std::move(listValues)); } +NUdf::TUnboxedValuePod MakeUint64ScalarBlock(TComputationContext& ctx, size_t blockSize, + const TArrayRef<TType* const> types, const NUdf::TUnboxedValuePod& values +) { + // Creates a block of scalar values using the first element of the given list + + for (auto type : types) { + // Because IScalarBuilder has no implementations + Y_ENSURE(AS_TYPE(TDataType, type)->GetDataSlot() == NYql::NUdf::EDataSlot::Uint64); + } + + const auto& holderFactory = ctx.HolderFactory; + const size_t width = types.size(); + const size_t rowsCount = values.GetListLength(); + + NUdf::TUnboxedValue row; + Y_ENSURE(values.GetListIterator().Next(row)); + TDefaultListRepresentation listValues; + for (size_t rowOffset = 0; rowOffset < rowsCount; rowOffset += blockSize) { + NUdf::TUnboxedValue* items = nullptr; + const auto tuple = holderFactory.CreateDirectArrayHolder(width + 1, items); + for (size_t i = 0; i < width; i++) { + const NUdf::TUnboxedValuePod& item = row.GetElement(i); + items[i] = holderFactory.CreateArrowBlock(arrow::Datum(static_cast<uint64_t>(item.Get<ui64>()))); + } + items[width] = MakeBlockCount(holderFactory, std::min(blockSize, rowsCount - rowOffset)); + listValues = listValues.Append(std::move(tuple)); + } + + return holderFactory.CreateDirectListHolder(std::move(listValues)); +} + NUdf::TUnboxedValuePod FromBlocks(TComputationContext& ctx, const TArrayRef<TType* const> types, const NUdf::TUnboxedValuePod& values ) { diff --git a/yql/essentials/minikql/comp_nodes/ut/mkql_block_map_join_ut_utils.h b/yql/essentials/minikql/comp_nodes/ut/mkql_block_map_join_ut_utils.h index 2c2f32b3125..ecff426c92e 100644 --- a/yql/essentials/minikql/comp_nodes/ut/mkql_block_map_join_ut_utils.h +++ b/yql/essentials/minikql/comp_nodes/ut/mkql_block_map_join_ut_utils.h @@ -9,10 +9,12 @@ inline bool IsOptionalOrNull(const TType* type) { return type->IsOptional() || type->IsNull() || type->IsPg(); } -TType* MakeBlockTupleType(TProgramBuilder& pgmBuilder, TType* tupleType); +TType* MakeBlockTupleType(TProgramBuilder& pgmBuilder, TType* tupleType, bool scalar); NUdf::TUnboxedValuePod ToBlocks(TComputationContext& ctx, size_t blockSize, const TArrayRef<TType* const> types, const NUdf::TUnboxedValuePod& values); +NUdf::TUnboxedValuePod MakeUint64ScalarBlock(TComputationContext& ctx, size_t blockSize, + const TArrayRef<TType* const> types, const NUdf::TUnboxedValuePod& values); NUdf::TUnboxedValuePod FromBlocks(TComputationContext& ctx, const TArrayRef<TType* const> types, const NUdf::TUnboxedValuePod& values); diff --git a/yql/essentials/minikql/computation/mkql_block_trimmer.cpp b/yql/essentials/minikql/computation/mkql_block_trimmer.cpp new file mode 100644 index 00000000000..02baba2c23d --- /dev/null +++ b/yql/essentials/minikql/computation/mkql_block_trimmer.cpp @@ -0,0 +1,252 @@ +#include "mkql_block_trimmer.h" + +#include <yql/essentials/minikql/arrow/arrow_util.h> +#include <yql/essentials/public/decimal/yql_decimal.h> +#include <yql/essentials/public/udf/arrow/block_reader.h> +#include <yql/essentials/public/udf/arrow/defs.h> +#include <yql/essentials/public/udf/arrow/util.h> +#include <yql/essentials/public/udf/udf_type_inspection.h> +#include <yql/essentials/public/udf/udf_value.h> +#include <yql/essentials/public/udf/udf_value_builder.h> +#include <yql/essentials/utils/yql_panic.h> + +#include <arrow/array/data.h> +#include <arrow/datum.h> + +namespace NKikimr::NMiniKQL { + +class TBlockTrimmerBase : public IBlockTrimmer { +protected: + TBlockTrimmerBase(arrow::MemoryPool* pool) + : Pool_(pool) + {} + + TBlockTrimmerBase() = delete; + + std::shared_ptr<arrow::Buffer> TrimNullBitmap(const std::shared_ptr<arrow::ArrayData>& array) { + auto& nullBitmapBuffer = array->buffers[0]; + + std::shared_ptr<arrow::Buffer> result; + auto nullCount = array->GetNullCount(); + if (nullCount == array->length) { + result = MakeDenseFalseBitmap(array->length, Pool_); + } else if (nullCount > 0) { + result = MakeDenseBitmapCopy(nullBitmapBuffer->data(), array->length, array->offset, Pool_); + } + + return result; + } + +protected: + arrow::MemoryPool* Pool_; +}; + +template<typename TLayout, bool Nullable> +class TFixedSizeBlockTrimmer : public TBlockTrimmerBase { +public: + TFixedSizeBlockTrimmer(arrow::MemoryPool* pool) + : TBlockTrimmerBase(pool) + {} + + std::shared_ptr<arrow::ArrayData> Trim(const std::shared_ptr<arrow::ArrayData>& array) override { + Y_ENSURE(array->buffers.size() == 2); + Y_ENSURE(array->child_data.empty()); + + std::shared_ptr<arrow::Buffer> trimmedNullBitmap; + if constexpr (Nullable) { + trimmedNullBitmap = TrimNullBitmap(array); + } + + auto origData = array->GetValues<TLayout>(1); + auto dataSize = sizeof(TLayout) * array->length; + + auto trimmedDataBuffer = NUdf::AllocateResizableBuffer(dataSize, Pool_); + memcpy(trimmedDataBuffer->mutable_data(), origData, dataSize); + + return arrow::ArrayData::Make(array->type, array->length, {std::move(trimmedNullBitmap), std::move(trimmedDataBuffer)}, array->GetNullCount()); + } +}; + +template<bool Nullable> +class TResourceBlockTrimmer : public TBlockTrimmerBase { +public: + TResourceBlockTrimmer(arrow::MemoryPool* pool) + : TBlockTrimmerBase(pool) + {} + + std::shared_ptr<arrow::ArrayData> Trim(const std::shared_ptr<arrow::ArrayData>& array) override { + Y_ENSURE(array->buffers.size() == 2); + Y_ENSURE(array->child_data.empty()); + + std::shared_ptr<arrow::Buffer> trimmedNullBitmap; + if constexpr (Nullable) { + trimmedNullBitmap = TrimNullBitmap(array); + } + + auto origData = array->GetValues<NUdf::TUnboxedValue>(1); + auto dataSize = sizeof(NUdf::TUnboxedValue) * array->length; + + auto trimmedBuffer = NUdf::AllocateResizableBuffer<NUdf::TResizableManagedBuffer<NUdf::TUnboxedValue>>(dataSize, Pool_); + ARROW_OK(trimmedBuffer->Resize(dataSize)); + auto trimmedBufferData = reinterpret_cast<NUdf::TUnboxedValue*>(trimmedBuffer->mutable_data()); + + for (int64_t i = 0; i < array->length; i++) { + ::new(&trimmedBufferData[i]) NUdf::TUnboxedValue(origData[i]); + } + + return arrow::ArrayData::Make(array->type, array->length, {std::move(trimmedNullBitmap), std::move(trimmedBuffer)}, array->GetNullCount()); + } +}; + +template<typename TStringType, bool Nullable> +class TStringBlockTrimmer : public TBlockTrimmerBase { + using TOffset = typename TStringType::offset_type; + +public: + TStringBlockTrimmer(arrow::MemoryPool* pool) + : TBlockTrimmerBase(pool) + {} + + std::shared_ptr<arrow::ArrayData> Trim(const std::shared_ptr<arrow::ArrayData>& array) override { + Y_ENSURE(array->buffers.size() == 3); + Y_ENSURE(array->child_data.empty()); + + std::shared_ptr<arrow::Buffer> trimmedNullBitmap; + if constexpr (Nullable) { + trimmedNullBitmap = TrimNullBitmap(array); + } + + auto origOffsetData = array->GetValues<TOffset>(1); + auto origStringData = reinterpret_cast<const char*>(array->buffers[2]->data() + origOffsetData[0]); + auto stringDataSize = origOffsetData[array->length] - origOffsetData[0]; + + auto trimmedOffsetBuffer = NUdf::AllocateResizableBuffer(sizeof(TOffset) * (array->length + 1), Pool_); + auto trimmedStringBuffer = NUdf::AllocateResizableBuffer(stringDataSize, Pool_); + + auto trimmedOffsetBufferData = reinterpret_cast<TOffset*>(trimmedOffsetBuffer->mutable_data()); + auto trimmedStringBufferData = reinterpret_cast<char*>(trimmedStringBuffer->mutable_data()); + + for (int64_t i = 0; i < array->length + 1; i++) { + trimmedOffsetBufferData[i] = origOffsetData[i] - origOffsetData[0]; + } + memcpy(trimmedStringBufferData, origStringData, stringDataSize); + + return arrow::ArrayData::Make(array->type, array->length, {std::move(trimmedNullBitmap), std::move(trimmedOffsetBuffer), std::move(trimmedStringBuffer)}, array->GetNullCount()); + } +}; + +template<bool Nullable> +class TTupleBlockTrimmer : public TBlockTrimmerBase { +public: + TTupleBlockTrimmer(std::vector<IBlockTrimmer::TPtr> children, arrow::MemoryPool* pool) + : TBlockTrimmerBase(pool) + , Children_(std::move(children)) + {} + + std::shared_ptr<arrow::ArrayData> Trim(const std::shared_ptr<arrow::ArrayData>& array) override { + Y_ENSURE(array->buffers.size() == 1); + + std::shared_ptr<arrow::Buffer> trimmedNullBitmap; + if constexpr (Nullable) { + trimmedNullBitmap = TrimNullBitmap(array); + } + + std::vector<std::shared_ptr<arrow::ArrayData>> trimmedChildren; + Y_ENSURE(array->child_data.size() == Children_.size()); + for (size_t i = 0; i < Children_.size(); i++) { + trimmedChildren.push_back(Children_[i]->Trim(array->child_data[i])); + } + + return arrow::ArrayData::Make(array->type, array->length, {std::move(trimmedNullBitmap)}, std::move(trimmedChildren), array->GetNullCount()); + } + +protected: + TTupleBlockTrimmer(arrow::MemoryPool* pool) + : TBlockTrimmerBase(pool) + {} + +protected: + std::vector<IBlockTrimmer::TPtr> Children_; +}; + +template<typename TDate, bool Nullable> +class TTzDateBlockTrimmer : public TTupleBlockTrimmer<Nullable> { + using TBase = TTupleBlockTrimmer<Nullable>; + using TDateLayout = typename NUdf::TDataType<TDate>::TLayout; + +public: + TTzDateBlockTrimmer(arrow::MemoryPool* pool) + : TBase(pool) + { + this->Children_.push_back(std::make_unique<TFixedSizeBlockTrimmer<TDateLayout, false>>(pool)); + this->Children_.push_back(std::make_unique<TFixedSizeBlockTrimmer<ui16, false>>(pool)); + } +}; + +class TExternalOptionalBlockTrimmer : public TBlockTrimmerBase { +public: + TExternalOptionalBlockTrimmer(IBlockTrimmer::TPtr inner, arrow::MemoryPool* pool) + : TBlockTrimmerBase(pool) + , Inner_(std::move(inner)) + {} + + std::shared_ptr<arrow::ArrayData> Trim(const std::shared_ptr<arrow::ArrayData>& array) override { + Y_ENSURE(array->buffers.size() == 1); + Y_ENSURE(array->child_data.size() == 1); + + auto trimmedNullBitmap = TrimNullBitmap(array); + auto trimmedInner = Inner_->Trim(array->child_data[0]); + + return arrow::ArrayData::Make(array->type, array->length, {std::move(trimmedNullBitmap)}, {std::move(trimmedInner)}, array->GetNullCount()); + } + +private: + IBlockTrimmer::TPtr Inner_; +}; + +struct TTrimmerTraits { + using TResult = IBlockTrimmer; + template <bool Nullable> + using TTuple = TTupleBlockTrimmer<Nullable>; + template <typename T, bool Nullable> + using TFixedSize = TFixedSizeBlockTrimmer<T, Nullable>; + template <typename TStringType, bool Nullable, NKikimr::NUdf::EDataSlot> + using TStrings = TStringBlockTrimmer<TStringType, Nullable>; + using TExtOptional = TExternalOptionalBlockTrimmer; + template<bool Nullable> + using TResource = TResourceBlockTrimmer<Nullable>; + template<typename TTzDate, bool Nullable> + using TTzDateReader = TTzDateBlockTrimmer<TTzDate, Nullable>; + + static TResult::TPtr MakePg(const NUdf::TPgTypeDescription& desc, const NUdf::IPgBuilder* pgBuilder, arrow::MemoryPool* pool) { + Y_UNUSED(pgBuilder); + if (desc.PassByValue) { + return std::make_unique<TFixedSize<ui64, true>>(pool); + } else { + return std::make_unique<TStrings<arrow::BinaryType, true, NKikimr::NUdf::EDataSlot::String>>(pool); + } + } + + static TResult::TPtr MakeResource(bool isOptional, arrow::MemoryPool* pool) { + if (isOptional) { + return std::make_unique<TResource<true>>(pool); + } else { + return std::make_unique<TResource<false>>(pool); + } + } + + template<typename TTzDate> + static TResult::TPtr MakeTzDate(bool isOptional, arrow::MemoryPool* pool) { + if (isOptional) { + return std::make_unique<TTzDateReader<TTzDate, true>>(pool); + } else { + return std::make_unique<TTzDateReader<TTzDate, false>>(pool); + } + } +}; + +IBlockTrimmer::TPtr MakeBlockTrimmer(const NUdf::ITypeInfoHelper& typeInfoHelper, const NUdf::TType* type, arrow::MemoryPool* pool) { + return MakeBlockReaderImpl<TTrimmerTraits>(typeInfoHelper, type, nullptr, pool); +} + +} diff --git a/yql/essentials/minikql/computation/mkql_block_trimmer.h b/yql/essentials/minikql/computation/mkql_block_trimmer.h new file mode 100644 index 00000000000..0ec46ea83c6 --- /dev/null +++ b/yql/essentials/minikql/computation/mkql_block_trimmer.h @@ -0,0 +1,21 @@ +#pragma once + +#include <util/generic/noncopyable.h> +#include <yql/essentials/public/udf/udf_types.h> + +#include <arrow/type.h> + +namespace NKikimr::NMiniKQL { + +class IBlockTrimmer : private TNonCopyable { +public: + using TPtr = std::unique_ptr<IBlockTrimmer>; + + virtual ~IBlockTrimmer() = default; + + virtual std::shared_ptr<arrow::ArrayData> Trim(const std::shared_ptr<arrow::ArrayData>& array) = 0; +}; + +IBlockTrimmer::TPtr MakeBlockTrimmer(const NYql::NUdf::ITypeInfoHelper& typeInfoHelper, const NYql::NUdf::TType* type, arrow::MemoryPool* pool); + +} diff --git a/yql/essentials/minikql/computation/mkql_block_trimmer_ut.cpp b/yql/essentials/minikql/computation/mkql_block_trimmer_ut.cpp new file mode 100644 index 00000000000..4ebb74f3eca --- /dev/null +++ b/yql/essentials/minikql/computation/mkql_block_trimmer_ut.cpp @@ -0,0 +1,373 @@ +#include "mkql_block_trimmer.h" + +#include <library/cpp/testing/unittest/registar.h> + +#include <yql/essentials/public/udf/arrow/block_builder.h> +#include <yql/essentials/public/udf/arrow/memory_pool.h> +#include <yql/essentials/minikql/mkql_type_builder.h> +#include <yql/essentials/minikql/mkql_function_registry.h> +#include <yql/essentials/minikql/mkql_program_builder.h> +#include <yql/essentials/minikql/invoke_builtins/mkql_builtins.h> + +using namespace NYql::NUdf; +using namespace NKikimr; + +struct TBlockTrimmerTestData { + TBlockTrimmerTestData() + : FunctionRegistry(NMiniKQL::CreateFunctionRegistry(NMiniKQL::CreateBuiltinRegistry())) + , Alloc(__LOCATION__) + , Env(Alloc) + , PgmBuilder(Env, *FunctionRegistry) + , MemInfo("Memory") + , ArrowPool(GetYqlMemoryPool()) + { + } + + TIntrusivePtr<NMiniKQL::IFunctionRegistry> FunctionRegistry; + NMiniKQL::TScopedAlloc Alloc; + NMiniKQL::TTypeEnvironment Env; + NMiniKQL::TProgramBuilder PgmBuilder; + NMiniKQL::TMemoryUsageInfo MemInfo; + arrow::MemoryPool* const ArrowPool; +}; + +Y_UNIT_TEST_SUITE(TBlockTrimmerTest) { + Y_UNIT_TEST(TestFixedSize) { + TBlockTrimmerTestData data; + + const auto int64Type = data.PgmBuilder.NewDataType(NUdf::EDataSlot::Int64, false); + + size_t itemSize = NMiniKQL::CalcMaxBlockItemSize(int64Type); + size_t blockLen = NMiniKQL::CalcBlockLen(itemSize); + Y_ENSURE(blockLen > 8); + + constexpr auto testSize = NMiniKQL::MaxBlockSizeInBytes / sizeof(i64); + constexpr auto sliceSize = 1024; + static_assert(testSize % sliceSize == 0); + + auto builder = MakeArrayBuilder(NMiniKQL::TTypeInfoHelper(), int64Type, *data.ArrowPool, blockLen, nullptr); + auto reader = MakeBlockReader(NMiniKQL::TTypeInfoHelper(), int64Type); + auto trimmer = MakeBlockTrimmer(NMiniKQL::TTypeInfoHelper(), int64Type, data.ArrowPool); + + for (size_t i = 0; i < testSize; i++) { + builder->Add(TBlockItem(i)); + } + auto datum = builder->Build(true); + Y_ENSURE(datum.is_array()); + auto array = datum.array(); + + for (size_t sliceIdx = 0; sliceIdx < testSize / sliceSize; sliceIdx++) { + auto slice = Chop(array, sliceSize); + auto trimmedSlice = trimmer->Trim(slice); + + for (size_t elemIdx = 0; elemIdx < sliceSize; elemIdx++) { + TBlockItem lhs = reader->GetItem(*slice, elemIdx); + TBlockItem rhs = reader->GetItem(*trimmedSlice, elemIdx); + UNIT_ASSERT_VALUES_EQUAL_C(lhs.Get<i64>(), rhs.Get<i64>(), "Expected the same data after trim"); + } + } + } + + Y_UNIT_TEST(TestString) { + TBlockTrimmerTestData data; + + const auto stringType = data.PgmBuilder.NewDataType(NUdf::EDataSlot::String, false); + + size_t itemSize = NMiniKQL::CalcMaxBlockItemSize(stringType); + size_t blockLen = NMiniKQL::CalcBlockLen(itemSize); + Y_ENSURE(blockLen > 8); + + // To fit all strings into single block + constexpr auto testSize = 512; + constexpr auto sliceSize = 128; + static_assert(testSize % sliceSize == 0); + + auto builder = MakeArrayBuilder(NMiniKQL::TTypeInfoHelper(), stringType, *data.ArrowPool, blockLen, nullptr); + auto reader = MakeBlockReader(NMiniKQL::TTypeInfoHelper(), stringType); + auto trimmer = MakeBlockTrimmer(NMiniKQL::TTypeInfoHelper(), stringType, data.ArrowPool); + + std::string testString; + testString.resize(testSize); + for (size_t i = 0; i < testSize; i++) { + testString[i] = static_cast<char>(i); + if (i % 2) { + builder->Add(TBlockItem(TStringRef(testString.data(), i + 1))); + } else { + // Empty string + builder->Add(TBlockItem(TStringRef())); + } + } + auto datum = builder->Build(true); + Y_ENSURE(datum.is_array()); + auto array = datum.array(); + + for (size_t sliceIdx = 0; sliceIdx < testSize / sliceSize; sliceIdx++) { + auto slice = Chop(array, sliceSize); + auto trimmedSlice = trimmer->Trim(slice); + + for (size_t elemIdx = 0; elemIdx < sliceSize; elemIdx++) { + TBlockItem lhs = reader->GetItem(*slice, elemIdx); + TBlockItem rhs = reader->GetItem(*trimmedSlice, elemIdx); + UNIT_ASSERT_VALUES_EQUAL_C(lhs.AsStringRef(), rhs.AsStringRef(), "Expected the same data after trim"); + } + } + } + + Y_UNIT_TEST(TestOptional) { + TBlockTrimmerTestData data; + + const auto optionalInt64Type = data.PgmBuilder.NewDataType(NUdf::EDataSlot::Int64, true); + + size_t itemSize = NMiniKQL::CalcMaxBlockItemSize(optionalInt64Type); + size_t blockLen = NMiniKQL::CalcBlockLen(itemSize); + Y_ENSURE(blockLen > 8); + + constexpr auto testSize = NMiniKQL::MaxBlockSizeInBytes / sizeof(i64); + constexpr auto sliceSize = 1024; + static_assert(testSize % sliceSize == 0); + + auto builder = MakeArrayBuilder(NMiniKQL::TTypeInfoHelper(), optionalInt64Type, *data.ArrowPool, blockLen, nullptr); + auto reader = MakeBlockReader(NMiniKQL::TTypeInfoHelper(), optionalInt64Type); + auto trimmer = MakeBlockTrimmer(NMiniKQL::TTypeInfoHelper(), optionalInt64Type, data.ArrowPool); + + for (size_t i = 0; i < testSize; i++) { + if (i % 2) { + builder->Add(TBlockItem()); + } else { + builder->Add(TBlockItem(i)); + } + } + auto datum = builder->Build(true); + Y_ENSURE(datum.is_array()); + auto array = datum.array(); + + for (size_t sliceIdx = 0; sliceIdx < testSize / sliceSize; sliceIdx++) { + auto slice = Chop(array, sliceSize); + auto trimmedSlice = trimmer->Trim(slice); + + for (size_t elemIdx = 0; elemIdx < sliceSize; elemIdx++) { + TBlockItem lhs = reader->GetItem(*slice, elemIdx); + TBlockItem rhs = reader->GetItem(*trimmedSlice, elemIdx); + UNIT_ASSERT_VALUES_EQUAL_C(bool(lhs), bool(rhs), "Expected the same optionality after trim"); + + if (lhs) { + UNIT_ASSERT_VALUES_EQUAL_C(lhs.Get<i64>(), rhs.Get<i64>(), "Expected the same data after trim"); + } + } + } + } + + Y_UNIT_TEST(TestExternalOptional) { + TBlockTrimmerTestData data; + + const auto doubleOptInt64Type = data.PgmBuilder.NewOptionalType(data.PgmBuilder.NewDataType(NUdf::EDataSlot::Int64, true)); + + size_t itemSize = NMiniKQL::CalcMaxBlockItemSize(doubleOptInt64Type); + size_t blockLen = NMiniKQL::CalcBlockLen(itemSize); + Y_ENSURE(blockLen > 8); + + constexpr auto testSize = NMiniKQL::MaxBlockSizeInBytes / sizeof(i64); + constexpr auto sliceSize = 1024; + static_assert(testSize % sliceSize == 0); + + auto builder = MakeArrayBuilder(NMiniKQL::TTypeInfoHelper(), doubleOptInt64Type, *data.ArrowPool, blockLen, nullptr); + auto reader = MakeBlockReader(NMiniKQL::TTypeInfoHelper(), doubleOptInt64Type); + auto trimmer = MakeBlockTrimmer(NMiniKQL::TTypeInfoHelper(), doubleOptInt64Type, data.ArrowPool); + + for (size_t i = 0; i < testSize; i++) { + if (i % 2) { + builder->Add(TBlockItem(i).MakeOptional()); + } else if (i % 4) { + builder->Add(TBlockItem()); + } else { + builder->Add(TBlockItem().MakeOptional()); + } + } + auto datum = builder->Build(true); + Y_ENSURE(datum.is_array()); + auto array = datum.array(); + + for (size_t sliceIdx = 0; sliceIdx < testSize / sliceSize; sliceIdx++) { + auto slice = Chop(array, sliceSize); + auto trimmedSlice = trimmer->Trim(slice); + + for (size_t elemIdx = 0; elemIdx < sliceSize; elemIdx++) { + TBlockItem lhs = reader->GetItem(*slice, elemIdx); + TBlockItem rhs = reader->GetItem(*trimmedSlice, elemIdx); + + for (size_t i = 0; i < 2; i++) { + UNIT_ASSERT_VALUES_EQUAL_C(bool(lhs), bool(rhs), "Expected the same optionality after trim"); + if (!lhs) { + break; + } + + lhs = lhs.GetOptionalValue(); + rhs = rhs.GetOptionalValue(); + } + + if (lhs) { + UNIT_ASSERT_VALUES_EQUAL_C(lhs.Get<i64>(), rhs.Get<i64>(), "Expected the same data after trim"); + } + } + } + } + + Y_UNIT_TEST(TestTuple) { + TBlockTrimmerTestData data; + + std::vector<NMiniKQL::TType*> types; + types.push_back(data.PgmBuilder.NewDataType(NUdf::EDataSlot::Int64)); + types.push_back(data.PgmBuilder.NewDataType(NUdf::EDataSlot::String)); + types.push_back(data.PgmBuilder.NewDataType(NUdf::EDataSlot::Int64, true)); + const auto tupleType = data.PgmBuilder.NewTupleType(types); + + size_t itemSize = NMiniKQL::CalcMaxBlockItemSize(tupleType); + size_t blockLen = NMiniKQL::CalcBlockLen(itemSize); + Y_ENSURE(blockLen > 8); + + // To fit all strings into single block + constexpr auto testSize = 512; + constexpr auto sliceSize = 128; + static_assert(testSize % sliceSize == 0); + + auto builder = MakeArrayBuilder(NMiniKQL::TTypeInfoHelper(), tupleType, *data.ArrowPool, blockLen, nullptr); + auto reader = MakeBlockReader(NMiniKQL::TTypeInfoHelper(), tupleType); + auto trimmer = MakeBlockTrimmer(NMiniKQL::TTypeInfoHelper(), tupleType, data.ArrowPool); + + std::string testString; + testString.resize(testSize); + std::vector<TBlockItem*> testTuples(testSize); + for (size_t i = 0; i < testSize; i++) { + testString[i] = static_cast<char>(i); + + TBlockItem* tupleItems = new TBlockItem[3]; + testTuples.push_back(tupleItems); + tupleItems[0] = TBlockItem(i); + tupleItems[1] = TBlockItem(TStringRef(testString.data(), i + 1)); + tupleItems[2] = i % 2 ? TBlockItem(i) : TBlockItem(); + + builder->Add(TBlockItem(tupleItems)); + } + auto datum = builder->Build(true); + Y_ENSURE(datum.is_array()); + auto array = datum.array(); + + for (size_t sliceIdx = 0; sliceIdx < testSize / sliceSize; sliceIdx++) { + auto slice = Chop(array, sliceSize); + auto trimmedSlice = trimmer->Trim(slice); + + for (size_t elemIdx = 0; elemIdx < sliceSize; elemIdx++) { + TBlockItem lhs = reader->GetItem(*slice, elemIdx); + TBlockItem rhs = reader->GetItem(*trimmedSlice, elemIdx); + + UNIT_ASSERT_VALUES_EQUAL_C(lhs.GetElement(0).Get<i64>(), rhs.GetElement(0).Get<i64>(), "Expected the same data after trim"); + UNIT_ASSERT_VALUES_EQUAL_C(lhs.GetElement(1).AsStringRef(), rhs.GetElement(1).AsStringRef(), "Expected the same data after trim"); + UNIT_ASSERT_VALUES_EQUAL_C(bool(lhs.GetElement(2)), bool(rhs.GetElement(2)), "Expected the same optionality after trim"); + if (bool(lhs.GetElement(2))) { + UNIT_ASSERT_VALUES_EQUAL_C(lhs.GetElement(2).Get<i64>(), rhs.GetElement(2).Get<i64>(), "Expected the same data after trim"); + } + } + } + + for (auto tupleItems : testTuples) { + delete[] tupleItems; + } + } + + Y_UNIT_TEST(TestTzDate) { + TBlockTrimmerTestData data; + using TDtLayout = TDataType<TTzDatetime>::TLayout; + + const auto tzDatetimeType = data.PgmBuilder.NewDataType(NUdf::EDataSlot::TzDatetime, false); + + size_t itemSize = NMiniKQL::CalcMaxBlockItemSize(tzDatetimeType); + size_t blockLen = NMiniKQL::CalcBlockLen(itemSize); + Y_ENSURE(blockLen > 8); + + constexpr auto testSize = NMiniKQL::MaxBlockSizeInBytes / (sizeof(TDtLayout) + sizeof(ui16)); + constexpr auto sliceSize = 1024; + static_assert(testSize % sliceSize == 0); + + auto builder = MakeArrayBuilder(NMiniKQL::TTypeInfoHelper(), tzDatetimeType, *data.ArrowPool, blockLen, nullptr); + auto reader = MakeBlockReader(NMiniKQL::TTypeInfoHelper(), tzDatetimeType); + auto trimmer = MakeBlockTrimmer(NMiniKQL::TTypeInfoHelper(), tzDatetimeType, data.ArrowPool); + + for (size_t i = 0; i < testSize; i++) { + TBlockItem dt = TBlockItem(i); + dt.SetTimezoneId(i * 2); + builder->Add(dt); + } + auto datum = builder->Build(true); + Y_ENSURE(datum.is_array()); + auto array = datum.array(); + + for (size_t sliceIdx = 0; sliceIdx < testSize / sliceSize; sliceIdx++) { + auto slice = Chop(array, sliceSize); + auto trimmedSlice = trimmer->Trim(slice); + + for (size_t elemIdx = 0; elemIdx < sliceSize; elemIdx++) { + TBlockItem lhs = reader->GetItem(*slice, elemIdx); + TBlockItem rhs = reader->GetItem(*trimmedSlice, elemIdx); + UNIT_ASSERT_VALUES_EQUAL_C(lhs.Get<TDtLayout>(), rhs.Get<TDtLayout>(), "Expected the same data after trim"); + UNIT_ASSERT_VALUES_EQUAL_C(lhs.GetTimezoneId(), rhs.GetTimezoneId(), "Expected the same data after trim"); + } + } + } + + extern const char ResourceName[] = "Resource.Name"; + Y_UNIT_TEST(TestResource) { + TBlockTrimmerTestData data; + + const auto resourceType = data.PgmBuilder.NewResourceType(ResourceName); + + size_t itemSize = NMiniKQL::CalcMaxBlockItemSize(resourceType); + size_t blockLen = NMiniKQL::CalcBlockLen(itemSize); + Y_ENSURE(blockLen > 8); + + constexpr auto testSize = NMiniKQL::MaxBlockSizeInBytes / sizeof(TUnboxedValue); + constexpr auto sliceSize = 1024; + static_assert(testSize % sliceSize == 0); + + auto builder = MakeArrayBuilder(NMiniKQL::TTypeInfoHelper(), resourceType, *data.ArrowPool, blockLen, nullptr); + auto reader = MakeBlockReader(NMiniKQL::TTypeInfoHelper(), resourceType); + auto trimmer = MakeBlockTrimmer(NMiniKQL::TTypeInfoHelper(), resourceType, data.ArrowPool); + + struct TWithDtor { + int Payload; + std::shared_ptr<int> DestructorCallsCnt; + TWithDtor(int payload, std::shared_ptr<int> destructorCallsCnt): + Payload(payload), DestructorCallsCnt(std::move(destructorCallsCnt)) { + } + ~TWithDtor() { + *DestructorCallsCnt = *DestructorCallsCnt + 1; + } + }; + using TTestResource = TBoxedResource<TWithDtor, ResourceName>; + + auto destructorCallsCnt = std::make_shared<int>(0); + { + for (size_t i = 0; i < testSize; i++) { + builder->Add(TUnboxedValuePod(new TTestResource(i, destructorCallsCnt))); + } + auto datum = builder->Build(true); + Y_ENSURE(datum.is_array()); + auto array = datum.array(); + + for (size_t sliceIdx = 0; sliceIdx < testSize / sliceSize; sliceIdx++) { + auto slice = Chop(array, sliceSize); + auto trimmedSlice = trimmer->Trim(slice); + + for (size_t elemIdx = 0; elemIdx < sliceSize; elemIdx++) { + TBlockItem lhs = reader->GetItem(*slice, elemIdx); + TBlockItem rhs = reader->GetItem(*trimmedSlice, elemIdx); + + auto lhsResource = reinterpret_cast<TTestResource*>(lhs.GetBoxed().Get()); + auto rhsResource = reinterpret_cast<TTestResource*>(rhs.GetBoxed().Get()); + UNIT_ASSERT_VALUES_EQUAL_C(lhsResource->Get()->Payload, rhsResource->Get()->Payload, "Expected the same data after trim"); + } + } + } + + UNIT_ASSERT_VALUES_EQUAL_C(*destructorCallsCnt, testSize, "Expected 1 call to resource destructor"); + } +} diff --git a/yql/essentials/minikql/computation/ut/ya.make.inc b/yql/essentials/minikql/computation/ut/ya.make.inc index 4de4b13fc50..1f7ae7d6dc2 100644 --- a/yql/essentials/minikql/computation/ut/ya.make.inc +++ b/yql/essentials/minikql/computation/ut/ya.make.inc @@ -12,6 +12,7 @@ ENDIF() SRCDIR(yql/essentials/minikql/computation) SRCS( + mkql_block_trimmer_ut.cpp mkql_computation_node_holders_ut.cpp mkql_computation_node_pack_ut.cpp mkql_computation_node_list_ut.cpp diff --git a/yql/essentials/minikql/computation/ya.make b/yql/essentials/minikql/computation/ya.make index c5a44908171..43745671357 100644 --- a/yql/essentials/minikql/computation/ya.make +++ b/yql/essentials/minikql/computation/ya.make @@ -5,6 +5,7 @@ SRCS( mkql_block_impl.cpp mkql_block_reader.cpp mkql_block_transport.cpp + mkql_block_trimmer.cpp mkql_computation_node.cpp mkql_computation_node_holders.cpp mkql_computation_node_impl.cpp @@ -22,6 +23,7 @@ PEERDIR( yql/essentials/public/types yql/essentials/parser/pg_wrapper/interface yql/essentials/public/udf + yql/essentials/public/udf/arrow yql/essentials/minikql/arrow ) diff --git a/yql/essentials/minikql/computation/ya.make.inc b/yql/essentials/minikql/computation/ya.make.inc index 7a663f1a469..14f51bfceed 100644 --- a/yql/essentials/minikql/computation/ya.make.inc +++ b/yql/essentials/minikql/computation/ya.make.inc @@ -23,6 +23,7 @@ PEERDIR( yql/essentials/minikql/computation yql/essentials/parser/pg_wrapper/interface yql/essentials/public/udf + yql/essentials/public/udf/arrow yql/essentials/utils library/cpp/threading/future ) diff --git a/yql/essentials/public/udf/arrow/bit_util.h b/yql/essentials/public/udf/arrow/bit_util.h index c2c891b2690..4dbe785aa75 100644 --- a/yql/essentials/public/udf/arrow/bit_util.h +++ b/yql/essentials/public/udf/arrow/bit_util.h @@ -94,5 +94,30 @@ inline T* CompressArray(const T* src, const ui8* sparseBitmap, T* dst, size_t co return dst; } +inline void CopyDenseBitmap(ui8* dst, const ui8* src, size_t srcOffset, size_t len) { + if ((srcOffset & 7) != 0) { + size_t offsetBytes = srcOffset >> 3; + src += offsetBytes; + + ui8 offsetTail = srcOffset & 7; + ui8 offsetHead = 8 - offsetTail; + + ui8 remainder = *src++ >> offsetTail; + size_t dstOffset = offsetHead; + for (; dstOffset < len; dstOffset += 8) { + *dst++ = remainder | (*src << offsetHead); + remainder = *src >> offsetTail; + src++; + } + // dst is guaranteed to have extra length even if it's not needed + *dst++ = remainder; + } else { + src += srcOffset >> 3; + // Round up to 8 + len = (len + 7u) & ~size_t(7u); + memcpy(dst, src, len >> 3); + } +} + } } diff --git a/yql/essentials/public/udf/arrow/block_reader.h b/yql/essentials/public/udf/arrow/block_reader.h index d7863329ee4..77a74b155a3 100644 --- a/yql/essentials/public/udf/arrow/block_reader.h +++ b/yql/essentials/public/udf/arrow/block_reader.h @@ -32,7 +32,7 @@ struct TBlockItemSerializeProps { bool IsFixed = true; // true if each block item takes fixed size }; -template<typename T, bool Nullable, typename TDerived> +template<typename T, bool Nullable, typename TDerived> class TFixedSizeBlockReaderBase : public IBlockReader { public: TBlockItem GetItem(const arrow::ArrayData& data, size_t index) final { @@ -309,7 +309,7 @@ public: return TBlockItem(Items.data()); } - + size_t GetDataWeightImpl(const TBlockItem& item) const { const TBlockItem* items = nullptr; ui64 size = 0; @@ -352,7 +352,7 @@ public: Children[i]->SaveItem(*data.child_data[i], index, out); } } - + void SaveChildrenScalarItems(const arrow::StructScalar& structScalar, TOutputBuffer& out) const { for (ui32 i = 0; i < Children.size(); ++i) { Children[i]->SaveScalarItem(*structScalar.value[i], out); @@ -396,7 +396,7 @@ public: Y_UNUSED(item); return GetChildrenDefaultDataWeight(); } - + size_t GetChildrenDefaultDataWeight() const { ui64 size = 0; if constexpr (Nullable) { @@ -412,7 +412,7 @@ public: DateReader_.SaveItem(*data.child_data[0], index, out); TimezoneReader_.SaveItem(*data.child_data[1], index, out); } - + void SaveChildrenScalarItems(const arrow::StructScalar& structScalar, TOutputBuffer& out) const { DateReader_.SaveScalarItem(*structScalar.value[0], out); TimezoneReader_.SaveScalarItem(*structScalar.value[1], out); @@ -525,31 +525,30 @@ struct TReaderTraits { } }; -template <typename TTraits> -std::unique_ptr<typename TTraits::TResult> MakeTupleBlockReaderImpl(bool isOptional, TVector<std::unique_ptr<typename TTraits::TResult>>&& children) { +template <typename TTraits, typename... TArgs> +std::unique_ptr<typename TTraits::TResult> MakeTupleBlockReaderImpl(bool isOptional, TVector<std::unique_ptr<typename TTraits::TResult>>&& children, TArgs... args) { if (isOptional) { - return std::make_unique<typename TTraits::template TTuple<true>>(std::move(children)); + return std::make_unique<typename TTraits::template TTuple<true>>(std::move(children), args...); } else { - return std::make_unique<typename TTraits::template TTuple<false>>(std::move(children)); + return std::make_unique<typename TTraits::template TTuple<false>>(std::move(children), args...); } } -template <typename TTraits, typename T> -std::unique_ptr<typename TTraits::TResult> MakeFixedSizeBlockReaderImpl(bool isOptional) { +template <typename TTraits, typename T, typename... TArgs> +std::unique_ptr<typename TTraits::TResult> MakeFixedSizeBlockReaderImpl(bool isOptional, TArgs... args) { if (isOptional) { - return std::make_unique<typename TTraits::template TFixedSize<T, true>>(); + return std::make_unique<typename TTraits::template TFixedSize<T, true>>(args...); } else { - return std::make_unique<typename TTraits::template TFixedSize<T, false>>(); + return std::make_unique<typename TTraits::template TFixedSize<T, false>>(args...); } } - -template <typename TTraits, typename T, NKikimr::NUdf::EDataSlot TOriginal> -std::unique_ptr<typename TTraits::TResult> MakeStringBlockReaderImpl(bool isOptional) { +template <typename TTraits, typename T, NKikimr::NUdf::EDataSlot TOriginal, typename... TArgs> +std::unique_ptr<typename TTraits::TResult> MakeStringBlockReaderImpl(bool isOptional, TArgs... args) { if (isOptional) { - return std::make_unique<typename TTraits::template TStrings<T, true, TOriginal>>(); + return std::make_unique<typename TTraits::template TStrings<T, true, TOriginal>>(args...); } else { - return std::make_unique<typename TTraits::template TStrings<T, false, TOriginal>>(); + return std::make_unique<typename TTraits::template TStrings<T, false, TOriginal>>(args...); } } @@ -558,8 +557,8 @@ concept CanInstantiateBlockReaderForDecimal = requires { typename TTraits::template TFixedSize<NYql::NDecimal::TInt128, true>; }; -template <typename TTraits> -std::unique_ptr<typename TTraits::TResult> MakeBlockReaderImpl(const ITypeInfoHelper& typeInfoHelper, const TType* type, const IPgBuilder* pgBuilder) { +template <typename TTraits, typename... TArgs> +std::unique_ptr<typename TTraits::TResult> MakeBlockReaderImpl(const ITypeInfoHelper& typeInfoHelper, const TType* type, const IPgBuilder* pgBuilder, TArgs... args) { const TType* unpacked = type; TOptionalTypeInspector typeOpt(typeInfoHelper, type); bool isOptional = false; @@ -591,9 +590,9 @@ std::unique_ptr<typename TTraits::TResult> MakeBlockReaderImpl(const ITypeInfoHe ++nestLevel; } - auto reader = MakeBlockReaderImpl<TTraits>(typeInfoHelper, previousType, pgBuilder); + auto reader = MakeBlockReaderImpl<TTraits>(typeInfoHelper, previousType, pgBuilder, args...); for (ui32 i = 1; i < nestLevel; ++i) { - reader = std::make_unique<typename TTraits::TExtOptional>(std::move(reader)); + reader = std::make_unique<typename TTraits::TExtOptional>(std::move(reader), args...); } return reader; @@ -606,20 +605,20 @@ std::unique_ptr<typename TTraits::TResult> MakeBlockReaderImpl(const ITypeInfoHe if (typeStruct) { TVector<std::unique_ptr<typename TTraits::TResult>> members; for (ui32 i = 0; i < typeStruct.GetMembersCount(); i++) { - members.emplace_back(MakeBlockReaderImpl<TTraits>(typeInfoHelper, typeStruct.GetMemberType(i), pgBuilder)); + members.emplace_back(MakeBlockReaderImpl<TTraits>(typeInfoHelper, typeStruct.GetMemberType(i), pgBuilder, args...)); } // XXX: Use Tuple block reader for Struct. - return MakeTupleBlockReaderImpl<TTraits>(isOptional, std::move(members)); + return MakeTupleBlockReaderImpl<TTraits>(isOptional, std::move(members), args...); } TTupleTypeInspector typeTuple(typeInfoHelper, type); if (typeTuple) { TVector<std::unique_ptr<typename TTraits::TResult>> children; for (ui32 i = 0; i < typeTuple.GetElementsCount(); ++i) { - children.emplace_back(MakeBlockReaderImpl<TTraits>(typeInfoHelper, typeTuple.GetElementType(i), pgBuilder)); + children.emplace_back(MakeBlockReaderImpl<TTraits>(typeInfoHelper, typeTuple.GetElementType(i), pgBuilder, args...)); } - return MakeTupleBlockReaderImpl<TTraits>(isOptional, std::move(children)); + return MakeTupleBlockReaderImpl<TTraits>(isOptional, std::move(children), args...); } TDataTypeInspector typeData(typeInfoHelper, type); @@ -627,59 +626,59 @@ std::unique_ptr<typename TTraits::TResult> MakeBlockReaderImpl(const ITypeInfoHe auto typeId = typeData.GetTypeId(); switch (GetDataSlot(typeId)) { case NUdf::EDataSlot::Int8: - return MakeFixedSizeBlockReaderImpl<TTraits, i8>(isOptional); + return MakeFixedSizeBlockReaderImpl<TTraits, i8>(isOptional, args...); case NUdf::EDataSlot::Bool: case NUdf::EDataSlot::Uint8: - return MakeFixedSizeBlockReaderImpl<TTraits, ui8>(isOptional); + return MakeFixedSizeBlockReaderImpl<TTraits, ui8>(isOptional, args...); case NUdf::EDataSlot::Int16: - return MakeFixedSizeBlockReaderImpl<TTraits, i16>(isOptional); + return MakeFixedSizeBlockReaderImpl<TTraits, i16>(isOptional, args...); case NUdf::EDataSlot::Uint16: case NUdf::EDataSlot::Date: - return MakeFixedSizeBlockReaderImpl<TTraits, ui16>(isOptional); + return MakeFixedSizeBlockReaderImpl<TTraits, ui16>(isOptional, args...); case NUdf::EDataSlot::Int32: case NUdf::EDataSlot::Date32: - return MakeFixedSizeBlockReaderImpl<TTraits, i32>(isOptional); + return MakeFixedSizeBlockReaderImpl<TTraits, i32>(isOptional, args...); case NUdf::EDataSlot::Uint32: case NUdf::EDataSlot::Datetime: - return MakeFixedSizeBlockReaderImpl<TTraits, ui32>(isOptional); + return MakeFixedSizeBlockReaderImpl<TTraits, ui32>(isOptional, args...); case NUdf::EDataSlot::Int64: case NUdf::EDataSlot::Interval: case NUdf::EDataSlot::Interval64: case NUdf::EDataSlot::Datetime64: case NUdf::EDataSlot::Timestamp64: - return MakeFixedSizeBlockReaderImpl<TTraits, i64>(isOptional); + return MakeFixedSizeBlockReaderImpl<TTraits, i64>(isOptional, args...); case NUdf::EDataSlot::Uint64: case NUdf::EDataSlot::Timestamp: - return MakeFixedSizeBlockReaderImpl<TTraits, ui64>(isOptional); + return MakeFixedSizeBlockReaderImpl<TTraits, ui64>(isOptional, args...); case NUdf::EDataSlot::Float: - return MakeFixedSizeBlockReaderImpl<TTraits, float>(isOptional); + return MakeFixedSizeBlockReaderImpl<TTraits, float>(isOptional, args...); case NUdf::EDataSlot::Double: - return MakeFixedSizeBlockReaderImpl<TTraits, double>(isOptional); + return MakeFixedSizeBlockReaderImpl<TTraits, double>(isOptional, args...); case NUdf::EDataSlot::String: - return MakeStringBlockReaderImpl<TTraits, arrow::BinaryType, NUdf::EDataSlot::String>(isOptional); + return MakeStringBlockReaderImpl<TTraits, arrow::BinaryType, NUdf::EDataSlot::String>(isOptional, args...); case NUdf::EDataSlot::Yson: - return MakeStringBlockReaderImpl<TTraits, arrow::BinaryType, NUdf::EDataSlot::Yson>(isOptional); + return MakeStringBlockReaderImpl<TTraits, arrow::BinaryType, NUdf::EDataSlot::Yson>(isOptional, args...); case NUdf::EDataSlot::JsonDocument: - return MakeStringBlockReaderImpl<TTraits, arrow::BinaryType, NUdf::EDataSlot::JsonDocument>(isOptional); + return MakeStringBlockReaderImpl<TTraits, arrow::BinaryType, NUdf::EDataSlot::JsonDocument>(isOptional, args...); case NUdf::EDataSlot::Utf8: - return MakeStringBlockReaderImpl<TTraits, arrow::StringType, NUdf::EDataSlot::Utf8>(isOptional); + return MakeStringBlockReaderImpl<TTraits, arrow::StringType, NUdf::EDataSlot::Utf8>(isOptional, args...); case NUdf::EDataSlot::Json: - return MakeStringBlockReaderImpl<TTraits, arrow::StringType, NUdf::EDataSlot::Json>(isOptional); + return MakeStringBlockReaderImpl<TTraits, arrow::StringType, NUdf::EDataSlot::Json>(isOptional, args...); case NUdf::EDataSlot::TzDate: - return TTraits::template MakeTzDate<TTzDate>(isOptional); + return TTraits::template MakeTzDate<TTzDate>(isOptional, args...); case NUdf::EDataSlot::TzDatetime: - return TTraits::template MakeTzDate<TTzDatetime>(isOptional); + return TTraits::template MakeTzDate<TTzDatetime>(isOptional, args...); case NUdf::EDataSlot::TzTimestamp: - return TTraits::template MakeTzDate<TTzTimestamp>(isOptional); + return TTraits::template MakeTzDate<TTzTimestamp>(isOptional, args...); case NUdf::EDataSlot::TzDate32: - return TTraits::template MakeTzDate<TTzDate32>(isOptional); + return TTraits::template MakeTzDate<TTzDate32>(isOptional, args...); case NUdf::EDataSlot::TzDatetime64: - return TTraits::template MakeTzDate<TTzDatetime64>(isOptional); + return TTraits::template MakeTzDate<TTzDatetime64>(isOptional, args...); case NUdf::EDataSlot::TzTimestamp64: - return TTraits::template MakeTzDate<TTzTimestamp64>(isOptional); + return TTraits::template MakeTzDate<TTzTimestamp64>(isOptional, args...); case NUdf::EDataSlot::Decimal: { if constexpr (CanInstantiateBlockReaderForDecimal<TTraits>) { - return MakeFixedSizeBlockReaderImpl<TTraits, NYql::NDecimal::TInt128>(isOptional); + return MakeFixedSizeBlockReaderImpl<TTraits, NYql::NDecimal::TInt128>(isOptional, args...); } else { Y_ENSURE(false, "Unsupported data slot"); } @@ -692,13 +691,13 @@ std::unique_ptr<typename TTraits::TResult> MakeBlockReaderImpl(const ITypeInfoHe TResourceTypeInspector resource(typeInfoHelper, type); if (resource) { - return TTraits::MakeResource(isOptional); + return TTraits::MakeResource(isOptional, args...); } TPgTypeInspector typePg(typeInfoHelper, type); if (typePg) { auto desc = typeInfoHelper.FindPgTypeDescription(typePg.GetTypeId()); - return TTraits::MakePg(*desc, pgBuilder); + return TTraits::MakePg(*desc, pgBuilder, args...); } Y_ENSURE(false, "Unsupported type"); diff --git a/yql/essentials/public/udf/arrow/ut/bit_util_ut.cpp b/yql/essentials/public/udf/arrow/ut/bit_util_ut.cpp new file mode 100644 index 00000000000..601d3be7c86 --- /dev/null +++ b/yql/essentials/public/udf/arrow/ut/bit_util_ut.cpp @@ -0,0 +1,41 @@ +#include <library/cpp/testing/unittest/registar.h> + +#include <yql/essentials/public/udf/arrow/bit_util.h> +#include <util/random/random.h> + +#include <arrow/util/bit_util.h> + +using namespace NYql::NUdf; + +namespace { + void PerformTest(const ui8* testData, size_t offset, size_t length) { + std::vector<ui8> copied(arrow::BitUtil::BytesForBits(length) + 1, 0); + CopyDenseBitmap(copied.data(), testData, offset, length); + + std::vector<ui8> origSparse(length), copiedSparse(length); + DecompressToSparseBitmap(origSparse.data(), testData, offset, length); + DecompressToSparseBitmap(copiedSparse.data(), copied.data(), 0, length); + for (size_t i = 0; i < length; i++) { + UNIT_ASSERT_EQUAL_C(origSparse[i], copiedSparse[i], "Expected the same data"); + } + } +} + +Y_UNIT_TEST_SUITE(CopyDenseBitmapTest) { + constexpr size_t testSize = 32; + + Y_UNIT_TEST(Test) { + SetRandomSeed(0); + + std::vector<ui8> testData(testSize); + for (size_t i = 0; i < testSize; i++) { + testData[i] = RandomNumber<ui8>(); + } + + for (size_t offset = 0; offset < testSize * 8; offset++) { + for (size_t length = 0; length <= testSize * 8 - offset; length++) { + PerformTest(testData.data(), offset, length); + } + } + } +} diff --git a/yql/essentials/public/udf/arrow/ut/ya.make b/yql/essentials/public/udf/arrow/ut/ya.make index da52d50ded5..ecd94b9c03a 100644 --- a/yql/essentials/public/udf/arrow/ut/ya.make +++ b/yql/essentials/public/udf/arrow/ut/ya.make @@ -2,9 +2,11 @@ UNITTEST() SRCS( array_builder_ut.cpp + bit_util_ut.cpp ) PEERDIR( + contrib/libs/apache/arrow yql/essentials/public/udf/arrow yql/essentials/public/udf/service/exception_policy yql/essentials/sql/pg_dummy diff --git a/yql/essentials/public/udf/arrow/util.cpp b/yql/essentials/public/udf/arrow/util.cpp index 55a728207b2..b36b4f9ce51 100644 --- a/yql/essentials/public/udf/arrow/util.cpp +++ b/yql/essentials/public/udf/arrow/util.cpp @@ -68,6 +68,18 @@ std::shared_ptr<arrow::Buffer> MakeDenseBitmapNegate(const ui8* srcSparse, size_ return bitmap; } +std::shared_ptr<arrow::Buffer> MakeDenseBitmapCopy(const ui8* src, size_t len, size_t offset, arrow::MemoryPool* pool) { + auto bitmap = AllocateBitmapWithReserve(len, pool); + CopyDenseBitmap(bitmap->mutable_data(), src, offset, len); + return bitmap; +} + +std::shared_ptr<arrow::Buffer> MakeDenseFalseBitmap(int64_t len, arrow::MemoryPool* pool) { + auto bitmap = AllocateBitmapWithReserve(len, pool); + std::memset(bitmap->mutable_data(), 0, bitmap->size()); + return bitmap; +} + std::shared_ptr<arrow::ArrayData> DeepSlice(const std::shared_ptr<arrow::ArrayData>& data, size_t offset, size_t len) { Y_ENSURE(data->length >= 0); Y_ENSURE(offset + len <= (size_t)data->length); diff --git a/yql/essentials/public/udf/arrow/util.h b/yql/essentials/public/udf/arrow/util.h index b4068f10c91..f7bdb715f98 100644 --- a/yql/essentials/public/udf/arrow/util.h +++ b/yql/essentials/public/udf/arrow/util.h @@ -25,6 +25,9 @@ enum class EPgStringType { std::shared_ptr<arrow::Buffer> AllocateBitmapWithReserve(size_t bitCount, arrow::MemoryPool* pool); std::shared_ptr<arrow::Buffer> MakeDenseBitmap(const ui8* srcSparse, size_t len, arrow::MemoryPool* pool); std::shared_ptr<arrow::Buffer> MakeDenseBitmapNegate(const ui8* srcSparse, size_t len, arrow::MemoryPool* pool); +std::shared_ptr<arrow::Buffer> MakeDenseBitmapCopy(const ui8* src, size_t len, size_t offset, arrow::MemoryPool* pool); + +std::shared_ptr<arrow::Buffer> MakeDenseFalseBitmap(int64_t len, arrow::MemoryPool* pool); /// \brief Recursive version of ArrayData::Slice() method std::shared_ptr<arrow::ArrayData> DeepSlice(const std::shared_ptr<arrow::ArrayData>& data, size_t offset, size_t len); diff --git a/yql/essentials/sql/v1/builtin.cpp b/yql/essentials/sql/v1/builtin.cpp index b7d29303cd4..0bbcd08b31f 100644 --- a/yql/essentials/sql/v1/builtin.cpp +++ b/yql/essentials/sql/v1/builtin.cpp @@ -3816,9 +3816,6 @@ TNodePtr BuildBuiltinFunc(TContext& ctx, TPosition pos, TString name, const TVec namedArgs = BuildStructure(pos, {encodeUtf8}); usedArgs = {positionalArgs, namedArgs}; } else if (name.StartsWith("From")) { - if (usedArgs) { - usedArgs.resize(1U); - } name = "From"; } else if (name == "GetLength" || name.StartsWith("ConvertTo") || name.StartsWith("Parse") || name.StartsWith("SerializeJson")) { if (usedArgs.size() < 2U) { diff --git a/yql/essentials/udfs/common/yson2/test/canondata/result.json b/yql/essentials/udfs/common/yson2/test/canondata/result.json index 10e1aea65a8..32f8b40ce4d 100644 --- a/yql/essentials/udfs/common/yson2/test/canondata/result.json +++ b/yql/essentials/udfs/common/yson2/test/canondata/result.json @@ -49,6 +49,11 @@ "uri": "file://test.test_Equals_/results.txt" } ], + "test.test[FromTooManyArgs]": [ + { + "uri": "file://test.test_FromTooManyArgs_/extracted" + } + ], "test.test[From]": [ { "uri": "file://test.test_From_/results.txt" diff --git a/yql/essentials/udfs/common/yson2/test/canondata/test.test_FromTooManyArgs_/extracted b/yql/essentials/udfs/common/yson2/test/canondata/test.test_FromTooManyArgs_/extracted new file mode 100644 index 00000000000..e0bdb083f1e --- /dev/null +++ b/yql/essentials/udfs/common/yson2/test/canondata/test.test_FromTooManyArgs_/extracted @@ -0,0 +1,11 @@ +<tmp_path>/program.sql:<main>: Error: Type annotation + + <tmp_path>/program.sql:<main>:2:1: Error: At function: RemovePrefixMembers, At function: Unordered, At function: PersistableRepr, At function: OrderedSqlProject, At function: SqlProjectItem + SELECT Yson::From(1, 2, 3); + ^ + <tmp_path>/program.sql:<main>:2:14: Error: At function: Apply, At function: Udf, At Yson2.From + SELECT Yson::From(1, 2, 3); + ^ + <tmp_path>/program.sql:<main>:2:14: Error: Failed to find UDF function: Yson2.From, reason: Error: Module: Yson2, function: From, error: Expected single argument. + SELECT Yson::From(1, 2, 3); + ^
\ No newline at end of file diff --git a/yql/essentials/udfs/common/yson2/test/cases/FromTooManyArgs.cfg b/yql/essentials/udfs/common/yson2/test/cases/FromTooManyArgs.cfg new file mode 100644 index 00000000000..eb2e5315d1e --- /dev/null +++ b/yql/essentials/udfs/common/yson2/test/cases/FromTooManyArgs.cfg @@ -0,0 +1 @@ +xfail
\ No newline at end of file diff --git a/yql/essentials/udfs/common/yson2/test/cases/FromTooManyArgs.sql b/yql/essentials/udfs/common/yson2/test/cases/FromTooManyArgs.sql new file mode 100644 index 00000000000..214ba63b835 --- /dev/null +++ b/yql/essentials/udfs/common/yson2/test/cases/FromTooManyArgs.sql @@ -0,0 +1 @@ +SELECT Yson::From(1, 2, 3);
\ No newline at end of file diff --git a/yt/cpp/mapreduce/client/client.cpp b/yt/cpp/mapreduce/client/client.cpp index 7d73756759d..9443ea51fce 100644 --- a/yt/cpp/mapreduce/client/client.cpp +++ b/yt/cpp/mapreduce/client/client.cpp @@ -1233,7 +1233,7 @@ TAuthorizationInfo TClient::WhoAmI() return RequestWithRetry<TAuthorizationInfo>( ClientRetryPolicy_->CreatePolicyForGenericRequest(), [this] (TMutationId /*mutationId*/) { - return RawClient_->WhoAmI(); + return NRawClient::WhoAmI(Context_); }); } @@ -1360,7 +1360,7 @@ TNode::TListType TClient::SkyShareTable( response = RequestWithRetry<NHttpClient::IHttpResponsePtr>( ClientRetryPolicy_->CreatePolicyForGenericRequest(), [this, &tablePaths, &options] (TMutationId /*mutationId*/) { - return RawClient_->SkyShareTable(tablePaths, options); + return NRawClient::SkyShareTable(Context_, tablePaths, options); }); TWaitProxy::Get()->Sleep(TDuration::Seconds(5)); } while (response->GetStatusCode() != 200); diff --git a/yt/cpp/mapreduce/interface/raw_client.h b/yt/cpp/mapreduce/interface/raw_client.h index d2e25512408..8642fb58256 100644 --- a/yt/cpp/mapreduce/interface/raw_client.h +++ b/yt/cpp/mapreduce/interface/raw_client.h @@ -204,12 +204,6 @@ public: const TOperationId& operationId, const TGetJobTraceOptions& options = {}) = 0; - // SkyShare - - virtual NHttpClient::IHttpResponsePtr SkyShareTable( - const std::vector<TYPath>& tablePaths, - const TSkyShareTableOptions& options = {}) = 0; - // Files virtual std::unique_ptr<IInputStream> ReadFile( const TTransactionId& transactionId, @@ -340,8 +334,6 @@ public: const TGetTablePartitionsOptions& options = {}) = 0; virtual ui64 GenerateTimestamp() = 0; - - virtual TAuthorizationInfo WhoAmI() = 0; }; //////////////////////////////////////////////////////////////////////////////// diff --git a/yt/cpp/mapreduce/raw_client/raw_client.cpp b/yt/cpp/mapreduce/raw_client/raw_client.cpp index 22d285e018b..53d2be114af 100644 --- a/yt/cpp/mapreduce/raw_client/raw_client.cpp +++ b/yt/cpp/mapreduce/raw_client/raw_client.cpp @@ -556,32 +556,6 @@ std::vector<TJobTraceEvent> THttpRawClient::GetJobTrace( return result; } -NHttpClient::IHttpResponsePtr THttpRawClient::SkyShareTable( - const std::vector<TYPath>& tablePaths, - const TSkyShareTableOptions& options) -{ - TMutationId mutationId; - THttpHeader header("POST", "api/v1/share", /*IsApi*/ false); - - auto proxyName = Context_.ServerName.substr(0, Context_.ServerName.find('.')); - - auto host = Context_.Config->SkynetApiHost; - if (host == "") { - host = "skynet." + proxyName + ".yt.yandex.net"; - } - - TSkyShareTableOptions patchedOptions = options; - - if (Context_.Config->Pool && !patchedOptions.Pool_) { - patchedOptions.Pool(Context_.Config->Pool); - } - - header.MergeParameters(NRawClient::SerializeParamsForSkyShareTable(proxyName, Context_.Config->Prefix, tablePaths, patchedOptions)); - TClientContext skyApiHost({.ServerName = host, .HttpClient = NHttpClient::CreateDefaultHttpClient()}); - - return RequestWithoutRetry(skyApiHost, mutationId, header, ""); -} - std::unique_ptr<IInputStream> THttpRawClient::ReadFile( const TTransactionId& transactionId, const TRichYPath& path, @@ -951,21 +925,6 @@ ui64 THttpRawClient::GenerateTimestamp() return NodeFromYsonString(responseInfo->GetResponse()).AsUint64(); } -TAuthorizationInfo THttpRawClient::WhoAmI() -{ - TMutationId mutationId; - THttpHeader header("GET", "auth/whoami", /*isApi*/ false); - auto requestResult = RequestWithoutRetry(Context_, mutationId, header); - TAuthorizationInfo result; - - NJson::TJsonValue jsonValue; - bool ok = NJson::ReadJsonTree(requestResult->GetResponse(), &jsonValue, /*throwOnError*/ true); - Y_ABORT_UNLESS(ok); - result.Login = jsonValue["login"].GetString(); - result.Realm = jsonValue["realm"].GetString(); - return result; -} - //////////////////////////////////////////////////////////////////////////////// } // namespace NYT::NDetail diff --git a/yt/cpp/mapreduce/raw_client/raw_client.h b/yt/cpp/mapreduce/raw_client/raw_client.h index b581681bd84..e18e32a92ef 100644 --- a/yt/cpp/mapreduce/raw_client/raw_client.h +++ b/yt/cpp/mapreduce/raw_client/raw_client.h @@ -203,12 +203,6 @@ public: const TOperationId& operationId, const TGetJobTraceOptions& options = {}) override; - // SkyShare - - NHttpClient::IHttpResponsePtr SkyShareTable( - const std::vector<TYPath>& tablePaths, - const TSkyShareTableOptions& options = {}) override; - // Files std::unique_ptr<IInputStream> ReadFile( @@ -341,8 +335,6 @@ public: ui64 GenerateTimestamp() override; - TAuthorizationInfo WhoAmI() override; - private: const TClientContext Context_; }; diff --git a/yt/cpp/mapreduce/raw_client/raw_requests.cpp b/yt/cpp/mapreduce/raw_client/raw_requests.cpp index a3f01da6fc9..04b5ef5dc0e 100644 --- a/yt/cpp/mapreduce/raw_client/raw_requests.cpp +++ b/yt/cpp/mapreduce/raw_client/raw_requests.cpp @@ -329,6 +329,48 @@ TVector<TRichYPath> CanonizeYPaths( return result; } +NHttpClient::IHttpResponsePtr SkyShareTable( + const TClientContext& context, + const std::vector<TYPath>& tablePaths, + const TSkyShareTableOptions& options) +{ + TMutationId mutationId; + THttpHeader header("POST", "api/v1/share", /*IsApi*/ false); + + auto proxyName = context.ServerName.substr(0, context.ServerName.find('.')); + + auto host = context.Config->SkynetApiHost; + if (host == "") { + host = "skynet." + proxyName + ".yt.yandex.net"; + } + + TSkyShareTableOptions patchedOptions = options; + + if (context.Config->Pool && !patchedOptions.Pool_) { + patchedOptions.Pool(context.Config->Pool); + } + + header.MergeParameters(SerializeParamsForSkyShareTable(proxyName, context.Config->Prefix, tablePaths, patchedOptions)); + TClientContext skyApiHost({.ServerName = host, .HttpClient = NHttpClient::CreateDefaultHttpClient()}); + + return RequestWithoutRetry(skyApiHost, mutationId, header, ""); +} + +TAuthorizationInfo WhoAmI(const TClientContext& context) +{ + TMutationId mutationId; + THttpHeader header("GET", "auth/whoami", /*isApi*/ false); + auto requestResult = RequestWithoutRetry(context, mutationId, header); + TAuthorizationInfo result; + + NJson::TJsonValue jsonValue; + bool ok = NJson::ReadJsonTree(requestResult->GetResponse(), &jsonValue, /*throwOnError*/ true); + Y_ABORT_UNLESS(ok); + result.Login = jsonValue["login"].GetString(); + result.Realm = jsonValue["realm"].GetString(); + return result; +} + //////////////////////////////////////////////////////////////////////////////// } // namespace NYT::NDetail::NRawClient diff --git a/yt/cpp/mapreduce/raw_client/raw_requests.h b/yt/cpp/mapreduce/raw_client/raw_requests.h index bcc9a4bfd7f..f46ae0e3b96 100644 --- a/yt/cpp/mapreduce/raw_client/raw_requests.h +++ b/yt/cpp/mapreduce/raw_client/raw_requests.h @@ -3,7 +3,10 @@ #include "raw_batch_request.h" #include <yt/cpp/mapreduce/common/fwd.h> + #include <yt/cpp/mapreduce/http/context.h> + +#include <yt/cpp/mapreduce/interface/client.h> #include <yt/cpp/mapreduce/interface/client_method_options.h> #include <yt/cpp/mapreduce/interface/operation.h> @@ -36,8 +39,6 @@ void ExecuteBatch( TRawBatchRequest& batchRequest, const TExecuteBatchOptions& options = {}); -// Misc - TRichYPath CanonizeYPath( const IRequestRetryPolicyPtr& retryPolicy, const TClientContext& context, @@ -48,6 +49,13 @@ TVector<TRichYPath> CanonizeYPaths( const TClientContext& context, const TVector<TRichYPath>& paths); +NHttpClient::IHttpResponsePtr SkyShareTable( + const TClientContext& context, + const std::vector<TYPath>& tablePaths, + const TSkyShareTableOptions& options); + +TAuthorizationInfo WhoAmI(const TClientContext& context); + //////////////////////////////////////////////////////////////////////////////// template<typename TSrc, typename TBatchAdder> diff --git a/yt/yql/providers/yt/provider/yql_yt_join_reorder.cpp b/yt/yql/providers/yt/provider/yql_yt_join_reorder.cpp index c0f00497e2e..f25656b6978 100644 --- a/yt/yql/providers/yt/provider/yql_yt_join_reorder.cpp +++ b/yt/yql/providers/yt/provider/yql_yt_join_reorder.cpp @@ -427,7 +427,7 @@ private: const TStructExprType* rightItemTypeBeforePremap = nullptr; { - if (leftLeaf && !labels.Inputs.empty()) { + if (leftLeaf) { TYtSection section{leftLeaf->Section}; if (Y_UNLIKELY(!section.Settings().Empty() && section.Settings().Item(0).Name() == "Test")) { return; @@ -438,10 +438,12 @@ private: YQL_CLOG(WARN, ProviderYt) << "Unable to collect paths and labels: " << status; return; } - leftJoinKeys = BuildJoinKeys(labels.Inputs[0], *op->LeftLabel); + if (!labels.Inputs.empty()) { + leftJoinKeys = BuildJoinKeys(labels.Inputs[0], *op->LeftLabel); + } ++numLeaves; } - if (rightLeaf && labels.Inputs.size() > 1) { + if (rightLeaf) { TYtSection section{rightLeaf->Section}; if (Y_UNLIKELY(!section.Settings().Empty() && section.Settings().Item(0).Name() == "Test")) { return; @@ -451,7 +453,9 @@ private: YQL_CLOG(WARN, ProviderYt) << "Unable to collect paths and labels: " << status; return; } - rightJoinKeys = BuildJoinKeys(labels.Inputs[1], *op->RightLabel); + if (labels.Inputs.size() > 1) { + rightJoinKeys = BuildJoinKeys(labels.Inputs[1], *op->RightLabel); + } ++numLeaves; } } diff --git a/yt/yql/tests/sql/suites/aggregate/aggregate_with_const_yson_options.sql b/yt/yql/tests/sql/suites/aggregate/aggregate_with_const_yson_options.sql index c940ba32cd1..ef6a64cf428 100644 --- a/yt/yql/tests/sql/suites/aggregate/aggregate_with_const_yson_options.sql +++ b/yt/yql/tests/sql/suites/aggregate/aggregate_with_const_yson_options.sql @@ -2,7 +2,7 @@ USE plato; SELECT key, - Yson::SerializeJson(Yson::From(AGGREGATE_LIST(value), Yson::Options(true AS Strict))) as value + Yson::SerializeJson(Yson::From(AGGREGATE_LIST(value))) as value FROM Input GROUP BY key ORDER BY key diff --git a/yt/yt/client/api/rpc_proxy/helpers.cpp b/yt/yt/client/api/rpc_proxy/helpers.cpp index 5baa8460ee1..d5703d97392 100644 --- a/yt/yt/client/api/rpc_proxy/helpers.cpp +++ b/yt/yt/client/api/rpc_proxy/helpers.cpp @@ -643,6 +643,7 @@ void ToProto( protoStatistics->set_incomplete_input(statistics.IncompleteInput); protoStatistics->set_incomplete_output(statistics.IncompleteOutput); protoStatistics->set_memory_usage(statistics.MemoryUsage); + protoStatistics->set_total_grouped_row_count(statistics.TotalGroupedRowCount); ToProto(protoStatistics->mutable_inner_statistics(), statistics.InnerStatistics); } @@ -664,6 +665,7 @@ void FromProto( statistics->IncompleteInput = protoStatistics.incomplete_input(); statistics->IncompleteOutput = protoStatistics.incomplete_output(); statistics->MemoryUsage = protoStatistics.memory_usage(); + statistics->TotalGroupedRowCount = protoStatistics.total_grouped_row_count(); FromProto(&statistics->InnerStatistics, protoStatistics.inner_statistics()); } diff --git a/yt/yt/client/bundle_controller_client/bundle_controller_settings.cpp b/yt/yt/client/bundle_controller_client/bundle_controller_settings.cpp index ce4f0e63838..e65b8b4f861 100644 --- a/yt/yt/client/bundle_controller_client/bundle_controller_settings.cpp +++ b/yt/yt/client/bundle_controller_client/bundle_controller_settings.cpp @@ -44,21 +44,50 @@ void TInstanceResources::Register(TRegistrar registrar) registrar.Parameter("memory", &TThis::Memory) .GreaterThanOrEqual(0) .Default(120_GB); - registrar.Parameter("net", &TThis::Net) + registrar.Parameter("net", &TThis::NetBits) + .Optional(); + registrar.Parameter("net_bytes", &TThis::NetBytes) .Optional(); registrar.Parameter("type", &TThis::Type) .Default(); + + registrar.Postprocessor([] (TThis* config) { + if (config->NetBits.has_value() && config->NetBytes.has_value() && *config->NetBits != *config->NetBytes * 8) { + THROW_ERROR_EXCEPTION("Net parameters are not equal") + << TErrorAttribute("net_bytes", config->NetBytes) + << TErrorAttribute("net_bits", config->NetBits); + } + + config->CanonizeNet(); + }); } void TInstanceResources::Clear() { Vcpu = 0; Memory = 0; + ResetNet(); +} + +void TInstanceResources::CanonizeNet() +{ + // COMPAT(grachevkirill) + if (NetBytes.has_value()) { + NetBits = *NetBytes * 8; + } else if (NetBits.has_value()) { + NetBytes = *NetBits / 8; + } +} + +void TInstanceResources::ResetNet() +{ + NetBits.reset(); + NetBytes.reset(); } bool TInstanceResources::operator==(const TInstanceResources& other) const { - return std::tie(Vcpu, Memory, Net) == std::tie(other.Vcpu, other.Memory, other.Net); + return std::tie(Vcpu, Memory, NetBytes) == std::tie(other.Vcpu, other.Memory, other.NetBytes); } void TDefaultInstanceConfig::Register(TRegistrar registrar) @@ -114,9 +143,27 @@ void TBundleResourceQuota::Register(TRegistrar registrar) .GreaterThanOrEqual(0) .Default(0); - registrar.Parameter("network", &TThis::Network) + registrar.Parameter("network", &TThis::NetworkBits) .GreaterThanOrEqual(0) .Default(0); + registrar.Parameter("network_bytes", &TThis::NetworkBytes) + .GreaterThanOrEqual(0) + .Default(0); + + registrar.Postprocessor([] (TThis* config) { + if (config->NetworkBits != 0 && config->NetworkBytes != 0 && config->NetworkBits != config->NetworkBytes * 8) { + THROW_ERROR_EXCEPTION("Network parameters are not equal") + << TErrorAttribute("network_bytes", config->NetworkBytes) + << TErrorAttribute("network_bits", config->NetworkBits); + } + + // COMPAT(grachevkirill) + if (config->NetworkBytes != 0) { + config->NetworkBits = config->NetworkBytes * 8; + } else if (config->NetworkBits != 0) { + config->NetworkBytes = config->NetworkBits / 8; + } + }); } //////////////////////////////////////////////////////////////////////////////// @@ -188,7 +235,9 @@ void ToProto(NBundleController::NProto::TInstanceResources* protoInstanceResourc { if (instanceResources == nullptr) return; protoInstanceResources->set_memory(instanceResources->Memory); - YT_TOPROTO_OPTIONAL_PTR(protoInstanceResources, net, instanceResources, Net); + // COMPAT(grachevkirill) + YT_VERIFY(instanceResources->NetBytes.has_value() || !instanceResources->NetBits.has_value()); + YT_TOPROTO_OPTIONAL_PTR(protoInstanceResources, net_bytes, instanceResources, NetBytes); protoInstanceResources->set_type(instanceResources->Type); protoInstanceResources->set_vcpu(instanceResources->Vcpu); } @@ -196,9 +245,11 @@ void ToProto(NBundleController::NProto::TInstanceResources* protoInstanceResourc void FromProto(NBundleControllerClient::TInstanceResourcesPtr instanceResources, const NBundleController::NProto::TInstanceResources* protoInstanceResources) { YT_FROMPROTO_OPTIONAL_PTR(protoInstanceResources, memory, instanceResources, Memory); - YT_FROMPROTO_OPTIONAL_PTR(protoInstanceResources, net, instanceResources, Net); + YT_FROMPROTO_OPTIONAL_PTR(protoInstanceResources, net_bytes, instanceResources, NetBytes); YT_FROMPROTO_OPTIONAL_PTR(protoInstanceResources, type, instanceResources, Type); YT_FROMPROTO_OPTIONAL_PTR(protoInstanceResources, vcpu, instanceResources, Vcpu); + // COMPAT(grachevkirill): Remove later. + instanceResources->CanonizeNet(); } //////////////////////////////////////////////////////////////////////////////// diff --git a/yt/yt/client/bundle_controller_client/bundle_controller_settings.h b/yt/yt/client/bundle_controller_client/bundle_controller_settings.h index b05802c679c..7473cdc7d5a 100644 --- a/yt/yt/client/bundle_controller_client/bundle_controller_settings.h +++ b/yt/yt/client/bundle_controller_client/bundle_controller_settings.h @@ -57,7 +57,10 @@ struct TInstanceResources { i64 Memory; // Bits per second. - std::optional<i64> Net; + // TODO(grachevkirill): Remove this field. + std::optional<i64> NetBits; + // Bytes per second. + std::optional<i64> NetBytes; TString Type; int Vcpu; @@ -66,6 +69,9 @@ struct TInstanceResources void Clear(); + void CanonizeNet(); + void ResetNet(); + REGISTER_YSON_STRUCT(TInstanceResources); static void Register(TRegistrar registrar); @@ -146,7 +152,9 @@ struct TBundleResourceQuota { int Vcpu; i64 Memory; - i64 Network; + // TODO(grachevkirill): Remove it later. + i64 NetworkBits; + i64 NetworkBytes; REGISTER_YSON_STRUCT(TBundleResourceQuota); diff --git a/yt/yt/client/query_client/query_statistics.cpp b/yt/yt/client/query_client/query_statistics.cpp index 446aa135b53..3398d369fd2 100644 --- a/yt/yt/client/query_client/query_statistics.cpp +++ b/yt/yt/client/query_client/query_statistics.cpp @@ -101,6 +101,7 @@ void Serialize(const TQueryStatistics& statistics, NYson::IYsonConsumer* consume .Item("incomplete_input").Value(statistics.IncompleteInput) .Item("incomplete_output").Value(statistics.IncompleteOutput) .Item("memory_usage").Value(statistics.MemoryUsage) + .Item("total_grouped_row_count").Value(statistics.TotalGroupedRowCount) .DoIf(!statistics.InnerStatistics.empty(), [&] (NYTree::TFluentMap fluent) { fluent .Item("inner_statistics").DoListFor(statistics.InnerStatistics, [=] ( diff --git a/yt/yt/client/table_client/table_upload_options.cpp b/yt/yt/client/table_client/table_upload_options.cpp index 3908a272da3..ba09c3fcfc0 100644 --- a/yt/yt/client/table_client/table_upload_options.cpp +++ b/yt/yt/client/table_client/table_upload_options.cpp @@ -61,7 +61,7 @@ ui64 TEpochSchema::Set(const TTableSchemaPtr& schema) return ++Revision_; } -void TEpochSchema::Persist(const NPhoenix::TPersistenceContext& context) +void TEpochSchema::Persist(const NPhoenix2::TPersistenceContext& context) { using NYT::Persist; @@ -91,7 +91,7 @@ TTableSchemaPtr TTableUploadOptions::GetUploadSchema() const } } -void TTableUploadOptions::Persist(const NPhoenix::TPersistenceContext& context) +void TTableUploadOptions::Persist(const NPhoenix2::TPersistenceContext& context) { using NYT::Persist; diff --git a/yt/yt/client/table_client/table_upload_options.h b/yt/yt/client/table_client/table_upload_options.h index 24e19766fde..74ea5192133 100644 --- a/yt/yt/client/table_client/table_upload_options.h +++ b/yt/yt/client/table_client/table_upload_options.h @@ -13,7 +13,7 @@ #include <yt/yt/core/compression/public.h> -#include <yt/yt/core/misc/phoenix.h> +#include <yt/yt/core/phoenix/context.h> namespace NYT::NTableClient { @@ -39,7 +39,7 @@ public: ui64 Set(const TTableSchemaPtr& schema); - void Persist(const NPhoenix::TPersistenceContext& context); + void Persist(const NPhoenix2::TPersistenceContext& context); ui64 Reset(); @@ -67,7 +67,7 @@ struct TTableUploadOptions TTableSchemaPtr GetUploadSchema() const; - void Persist(const NPhoenix::TPersistenceContext& context); + void Persist(const NPhoenix2::TPersistenceContext& context); }; const std::vector<TString>& GetTableUploadOptionsAttributeKeys(); diff --git a/yt/yt/core/misc/arithmetic_formula.cpp b/yt/yt/core/misc/arithmetic_formula.cpp index 143e3de8f60..d4231991ec1 100644 --- a/yt/yt/core/misc/arithmetic_formula.cpp +++ b/yt/yt/core/misc/arithmetic_formula.cpp @@ -1,5 +1,4 @@ #include "arithmetic_formula.h" -#include "phoenix.h" #include <yt/yt/core/misc/error.h> diff --git a/yt/yt/core/misc/config.cpp b/yt/yt/core/misc/config.cpp index bce3842cc70..2e26b8f5be6 100644 --- a/yt/yt/core/misc/config.cpp +++ b/yt/yt/core/misc/config.cpp @@ -145,6 +145,12 @@ void TExponentialBackoffOptionsSerializer::Register(TRegistrar registrar) registrar.ExternalClassParameter("backoff_jitter", &TThat::BackoffJitter) .Default(TThat::DefaultBackoffJitter); + + registrar.ExternalPostprocessor([] (TThat* config) { + if(config->MinBackoff > config->MaxBackoff) { + THROW_ERROR_EXCEPTION("\"min_backoff\" must be less or equal than \"max_backoff\""); + } + }); } //////////////////////////////////////////////////////////////////////////////// diff --git a/yt/yt/core/misc/digest.cpp b/yt/yt/core/misc/digest.cpp index 0ecdb4a37e8..8ce0bd4ef9b 100644 --- a/yt/yt/core/misc/digest.cpp +++ b/yt/yt/core/misc/digest.cpp @@ -1,17 +1,23 @@ #include "digest.h" #include "config.h" -#include <yt/yt/core/misc/phoenix.h> +#include <yt/yt/core/phoenix/type_def.h> namespace NYT { -using namespace NPhoenix; +using namespace NPhoenix2; + +//////////////////////////////////////////////////////////////////////////////// + +void IPersistentDigest::RegisterMetadata(auto&& /*registrar*/) +{ } + +PHOENIX_DEFINE_TYPE(IPersistentDigest); //////////////////////////////////////////////////////////////////////////////// class TLogDigest : public IPersistentDigest - , public NPhoenix::TFactoryTag<NPhoenix::TSimpleFactory> { public: TLogDigest(TLogDigestConfigPtr config) @@ -60,21 +66,6 @@ public: std::fill(Buckets_.begin(), Buckets_.end(), 0); } - void Persist(const TPersistenceContext& context) override - { - using NYT::Persist; - Persist(context, Step_); - Persist(context, LogStep_); - Persist(context, LowerBound_); - Persist(context, UpperBound_); - Persist(context, DefaultValue_); - Persist(context, BucketCount_); - Persist(context, SampleCount_); - Persist(context, Buckets_); - } - - DECLARE_DYNAMIC_PHOENIX_TYPE(TLogDigest, 0x42424243); - private: double Step_; double LogStep_; @@ -88,9 +79,23 @@ private: i64 SampleCount_ = 0; std::vector<i64> Buckets_; + + PHOENIX_DECLARE_POLYMORPHIC_TYPE(TLogDigest, 0x42424243); }; -DEFINE_DYNAMIC_PHOENIX_TYPE(TLogDigest); +void TLogDigest::RegisterMetadata(auto&& registrar) +{ + PHOENIX_REGISTER_FIELD(1, Step_)(); + PHOENIX_REGISTER_FIELD(2, LogStep_)(); + PHOENIX_REGISTER_FIELD(3, LowerBound_)(); + PHOENIX_REGISTER_FIELD(4, UpperBound_)(); + PHOENIX_REGISTER_FIELD(5, DefaultValue_)(); + PHOENIX_REGISTER_FIELD(6, BucketCount_)(); + PHOENIX_REGISTER_FIELD(7, SampleCount_)(); + PHOENIX_REGISTER_FIELD(8, Buckets_)(); +} + +PHOENIX_DEFINE_TYPE(TLogDigest); //////////////////////////////////////////////////////////////////////////////// diff --git a/yt/yt/core/misc/digest.h b/yt/yt/core/misc/digest.h index afa80542842..fed29d666d7 100644 --- a/yt/yt/core/misc/digest.h +++ b/yt/yt/core/misc/digest.h @@ -2,7 +2,8 @@ #include "public.h" -#include <yt/yt/core/misc/phoenix.h> +#include <yt/yt/core/phoenix/context.h> +#include <yt/yt/core/phoenix/type_decl.h> namespace NYT { @@ -31,9 +32,12 @@ DEFINE_REFCOUNTED_TYPE(IDigest) struct IPersistentDigest : public IDigest - , public NPhoenix::IPersistent + , public NPhoenix2::IPersistent { - void Persist(const NPhoenix::TPersistenceContext& context) override = 0; + using TLoadContext = NPhoenix2::TLoadContext; + using TSaveContext = NPhoenix2::TSaveContext; + + PHOENIX_DECLARE_POLYMORPHIC_TYPE(IPersistentDigest, 0x1ed99609); }; DEFINE_REFCOUNTED_TYPE(IPersistentDigest) diff --git a/yt/yt/core/misc/histogram.cpp b/yt/yt/core/misc/histogram.cpp index 2d594508324..9ce25ad7845 100644 --- a/yt/yt/core/misc/histogram.cpp +++ b/yt/yt/core/misc/histogram.cpp @@ -1,5 +1,8 @@ #include "histogram.h" +#include <yt/yt/core/phoenix/type_decl.h> +#include <yt/yt/core/phoenix/type_def.h> + #include <yt/yt/core/yson/public.h> #include <yt/yt/core/ytree/fluent.h> @@ -8,13 +11,12 @@ namespace NYT { using namespace NYTree; using namespace NYson; -using namespace NPhoenix; +using namespace NPhoenix2; //////////////////////////////////////////////////////////////////////////////// class THistogram : public IHistogram - , public NPhoenix::TFactoryTag<NPhoenix::TSimpleFactory> { public: THistogram() = default; @@ -74,30 +76,15 @@ public: return result; } - void Persist(const TPersistenceContext& context) override - { - using NYT::Persist; - Persist(context, MaxBuckets_); - Persist(context, ValueMin_); - Persist(context, ValueMax_); - Persist(context, Items_); - } - private: - struct TItem { + struct TItem + { i64 Value; i64 Count; - void Persist(const TPersistenceContext& context) - { - using NYT::Persist; - Persist(context, Value); - Persist(context, Count); - } + PHOENIX_DECLARE_TYPE(TItem, 0x9860143c); }; - DECLARE_DYNAMIC_PHOENIX_TYPE(THistogram, 0x636d76d7); - static const i64 HistogramViewReserveFactor = 2; i64 MaxBuckets_ = 0; @@ -139,9 +126,30 @@ private: View_.Count[GetBucketIndex(item.Value)] += item.Count; } } + + PHOENIX_DECLARE_FRIEND(); + PHOENIX_DECLARE_POLYMORPHIC_TYPE(THistogram, 0x636d76d7); }; -DEFINE_DYNAMIC_PHOENIX_TYPE(THistogram); +void THistogram::RegisterMetadata(auto&& registrar) +{ + PHOENIX_REGISTER_FIELD(1, MaxBuckets_)(); + PHOENIX_REGISTER_FIELD(2, ValueMin_)(); + PHOENIX_REGISTER_FIELD(3, ValueMax_)(); + PHOENIX_REGISTER_FIELD(4, Items_)(); +} + +PHOENIX_DEFINE_TYPE(THistogram); + +//////////////////////////////////////////////////////////////////////////////// + +void THistogram::TItem::RegisterMetadata(auto&& registrar) +{ + PHOENIX_REGISTER_FIELD(1, Value)(); + PHOENIX_REGISTER_FIELD(2, Count)(); +} + +PHOENIX_DEFINE_TYPE(THistogram::TItem); //////////////////////////////////////////////////////////////////////////////// diff --git a/yt/yt/core/misc/histogram.h b/yt/yt/core/misc/histogram.h index 8230790e857..f3615737242 100644 --- a/yt/yt/core/misc/histogram.h +++ b/yt/yt/core/misc/histogram.h @@ -3,7 +3,7 @@ #include "public.h" #include "serialize.h" -#include <yt/yt/core/misc/phoenix.h> +#include <yt/yt/core/phoenix/context.h> namespace NYT { @@ -21,7 +21,7 @@ struct THistogramView }; struct IHistogram - : public virtual NPhoenix::IPersistent + : public virtual NPhoenix2::IPersistent { virtual void AddValue(i64 value, i64 count = 1) = 0; virtual void RemoveValue(i64 value, i64 count = 1) = 0; diff --git a/yt/yt/core/misc/phoenix-inl.h b/yt/yt/core/misc/phoenix-inl.h deleted file mode 100644 index 56cc3f199ca..00000000000 --- a/yt/yt/core/misc/phoenix-inl.h +++ /dev/null @@ -1,48 +0,0 @@ -#ifndef PHOENIX_INL_H_ -#error "Direct inclusion of this file is not allowed, include phoenix.h" -// For the sake of sane code completion. -#include "phoenix.h" -#endif - -#include <type_traits> - -namespace NYT::NPhoenix { - -//////////////////////////////////////////////////////////////////////////////// - -template <class T> -void* TProfiler::DoInstantiate() -{ - using TFactory = typename TFactoryTraits<T>::TFactory; - using TBase = typename TPolymorphicTraits<T>::TBase; - - T* ptr = TFactory::template Instantiate<T>(); - TBase* basePtr = static_cast<TBase*>(ptr); - return basePtr; -} - -template <class T> -void TProfiler::Register(ui32 tag) -{ - using TIdClass = typename TIdClass<T>::TType; - - auto pair = TagToEntry_.emplace(tag, TEntry()); - YT_VERIFY(pair.second); - auto& entry = pair.first->second; - entry.Tag = tag; - entry.TypeInfo = &typeid(TIdClass); - entry.Factory = std::bind(&DoInstantiate<T>); - YT_VERIFY(TypeInfoToEntry_.emplace(entry.TypeInfo, &entry).second); -} - -template <class T> -T* TProfiler::Instantiate(ui32 tag) -{ - using TBase = typename TPolymorphicTraits<T>::TBase; - TBase* basePtr = static_cast<TBase*>(GetEntry(tag).Factory()); - return dynamic_cast<T*>(basePtr); -} - -//////////////////////////////////////////////////////////////////////////////// - -} // namespace NYT::NPhoenix diff --git a/yt/yt/core/misc/phoenix.cpp b/yt/yt/core/misc/phoenix.cpp deleted file mode 100644 index fbfb7304693..00000000000 --- a/yt/yt/core/misc/phoenix.cpp +++ /dev/null @@ -1,34 +0,0 @@ -#include "phoenix.h" - -#include "collection_helpers.h" - -namespace NYT::NPhoenix { - -//////////////////////////////////////////////////////////////////////////////// - -TProfiler::TProfiler() -{ } - -TProfiler* TProfiler::Get() -{ - return Singleton<TProfiler>(); -} - -ui32 TProfiler::GetTag(const std::type_info& typeInfo) -{ - return GetEntry(typeInfo).Tag; -} - -const TProfiler::TEntry& TProfiler::GetEntry(ui32 tag) -{ - return GetOrCrash(TagToEntry_, tag); -} - -const TProfiler::TEntry& TProfiler::GetEntry(const std::type_info& typeInfo) -{ - return *GetOrCrash(TypeInfoToEntry_, &typeInfo); -} - -//////////////////////////////////////////////////////////////////////////////// - -} // namespace NYT::NPhoenix diff --git a/yt/yt/core/misc/phoenix.h b/yt/yt/core/misc/phoenix.h deleted file mode 100644 index cbdd744c24a..00000000000 --- a/yt/yt/core/misc/phoenix.h +++ /dev/null @@ -1,258 +0,0 @@ -#pragma once - -#include "id_generator.h" -#include "mpl.h" -#include "serialize.h" - -#include <yt/yt/core/actions/callback.h> - -#include <yt/yt/core/phoenix/context.h> -#include <yt/yt/core/phoenix/polymorphic.h> -#include <yt/yt/core/phoenix/type_decl.h> -#include <yt/yt/core/phoenix/type_def.h> - -#include <typeinfo> - -namespace NYT::NPhoenix { - -//////////////////////////////////////////////////////////////////////////////// - -constexpr ui32 InlineObjectIdMask = 0x80000000; -constexpr ui32 NullObjectId = 0x00000000; - -//////////////////////////////////////////////////////////////////////////////// - -using NPhoenix2::NDetail::TSerializer; -using NPhoenix2::TLoadContext; -using NPhoenix2::TSaveContext; - -using TDynamicTag = NPhoenix2::TPolymorphicBase; - -//////////////////////////////////////////////////////////////////////////////// - -template <class TFactory> -struct TFactoryTag -{ }; - -//////////////////////////////////////////////////////////////////////////////// - -template <class T, class = void> -struct TPolymorphicTraits -{ - static const bool Dynamic = false; - using TBase = T; -}; - -template <class T> -struct TPolymorphicTraits< - T, - typename std::enable_if_t< - std::is_convertible_v<T&, TDynamicTag&> - > -> -{ - static const bool Dynamic = true; - using TBase = TDynamicTag; -}; - -//////////////////////////////////////////////////////////////////////////////// - -struct TNullFactory -{ - template <class T> - static T* Instantiate() - { - YT_ABORT(); - } -}; - -struct TSimpleFactory -{ - template <class T> - static T* Instantiate() - { - return new T(); - } -}; - -struct TRefCountedFactory -{ - template <class T> - static T* Instantiate() - { - return New<T>().Release(); - } -}; - -//////////////////////////////////////////////////////////////////////////////// - -template <class T, class = void> -struct TFactoryTraits -{ - using TFactory = TSimpleFactory; -}; - -template <class T> -struct TFactoryTraits< - T, - typename std::enable_if_t< - std::conjunction_v< - std::is_convertible<T*, NYT::TRefCountedBase*>, - std::negation<std::is_convertible<T*, TFactoryTag<TNullFactory>*>> - > - > -> -{ - using TFactory = TRefCountedFactory; -}; - -template <class T> -struct TFactoryTraits< - T, - typename std::enable_if_t< - std::is_convertible_v<T*, TFactoryTag<TNullFactory>*> - > -> -{ - using TFactory = TNullFactory; -}; - -//////////////////////////////////////////////////////////////////////////////// - -template <class T, class = void> -struct TIdClass -{ - using TType = T; -}; - -template <class T> -struct TIdClass< - T, - typename std::enable_if_t< - std::is_convertible_v<T*, NYT::TRefCountedBase*> - > -> -{ - using TType = NYT::TRefCountedWrapper<T>; -}; - - -//////////////////////////////////////////////////////////////////////////////// - -class TProfiler -{ -public: - static TProfiler* Get(); - - ui32 GetTag(const std::type_info& typeInfo); - - template <class T> - T* Instantiate(ui32 tag); - - template <class T> - void Register(ui32 tag); - -private: - struct TEntry - { - const std::type_info* TypeInfo; - ui32 Tag; - std::function<void*()> Factory; - }; - - THashMap<const std::type_info*, TEntry*> TypeInfoToEntry_; - THashMap<ui32, TEntry> TagToEntry_; - - TProfiler(); - - const TEntry& GetEntry(ui32 tag); - const TEntry& GetEntry(const std::type_info& typeInfo); - - template <class T> - static void* DoInstantiate(); - - Y_DECLARE_SINGLETON_FRIEND() -}; - -//////////////////////////////////////////////////////////////////////////////// - -template < - class TType, - ui32 tag, - class TFactory = typename TFactoryTraits<TType>::TFactory -> -struct TDynamicInitializer -{ - TDynamicInitializer() - { - TProfiler::Get()->Register<TType>(tag); - } -}; - -template < - class TType, - ui32 tag -> -struct TDynamicInitializer<TType, tag, TRefCountedFactory> -{ - TDynamicInitializer() - { - TProfiler::Get()->Register<TType>(tag); - } -}; - -#define DECLARE_DYNAMIC_PHOENIX_TYPE(...) \ - static ::NYT::NPhoenix::TDynamicInitializer<__VA_ARGS__> \ - DynamicPhoenixInitializer; \ - PHOENIX_DECLARE_OPAQUE_TYPE(__VA_ARGS__) - -// __VA_ARGS__ are used because sometimes we want a template type -// to be an argument but the single macro argument may not contain -// commas. Dat preprocessor :/ -#define DEFINE_DYNAMIC_PHOENIX_TYPE(...) \ - decltype(__VA_ARGS__::DynamicPhoenixInitializer) \ - __VA_ARGS__::DynamicPhoenixInitializer; \ - PHOENIX_DEFINE_OPAQUE_TYPE(__VA_ARGS__) - -#define INHERIT_DYNAMIC_PHOENIX_TYPE(baseType, type, tag) \ -class type \ - : public baseType \ -{ \ -public: \ - using baseType::baseType; \ - \ -private: \ - DECLARE_DYNAMIC_PHOENIX_TYPE(type, tag); \ -} - -#define INHERIT_DYNAMIC_PHOENIX_TYPE_TEMPLATED(baseType, type, tag, ...) \ -class type \ - : public baseType<__VA_ARGS__> \ -{ \ -public: \ - using baseType::baseType; \ - \ -private: \ - DECLARE_DYNAMIC_PHOENIX_TYPE(type, tag); \ -} - -//////////////////////////////////////////////////////////////////////////////// - -template <class C> -struct ICustomPersistent - : public virtual TDynamicTag -{ - virtual ~ICustomPersistent() = default; - virtual void Persist(const C& context) = 0; -}; - -using TPersistenceContext = TCustomPersistenceContext<TSaveContext, TLoadContext>; -using IPersistent = ICustomPersistent<TPersistenceContext>; - -//////////////////////////////////////////////////////////////////////////////// - -} // namespace NYT::NPhoenix - -#define PHOENIX_INL_H_ -#include "phoenix-inl.h" -#undef PHOENIX_INL_H_ diff --git a/yt/yt/core/misc/unittests/phoenix_compatibility_ut.cpp b/yt/yt/core/misc/unittests/phoenix_compatibility_ut.cpp deleted file mode 100644 index 02c1e22516a..00000000000 --- a/yt/yt/core/misc/unittests/phoenix_compatibility_ut.cpp +++ /dev/null @@ -1,433 +0,0 @@ -#include <yt/yt/core/test_framework/framework.h> - -#include <yt/yt/core/misc/blob_output.h> -#include <yt/yt/core/misc/phoenix.h> - -#include <yt/yt/core/phoenix/type_decl.h> -#include <yt/yt/core/phoenix/type_def.h> -#include <yt/yt/core/phoenix/polymorphic.h> - -namespace NYT { -namespace { - -//////////////////////////////////////////////////////////////////////////////// - -using TSaveContext = NPhoenix::TSaveContext; -using TLoadContext = NPhoenix::TLoadContext; -using TPersistenceContext = NPhoenix::TPersistenceContext; -using IPersistent = NPhoenix::IPersistent; -using IPersistent2 = NPhoenix2::ICustomPersistent<TSaveContext, TLoadContext, TPersistenceContext>; - -//////////////////////////////////////////////////////////////////////////////// - -template <class T> -TSharedRef Serialize(const T& value) -{ - TBlobOutput output; - TSaveContext context(&output); - Save(context, value); - context.Finish(); - return output.Flush(); -} - -template <class T> -void Deserialize(T& value, TRef ref) -{ - TMemoryInput input(ref.Begin(), ref.Size()); - TLoadContext context(&input); - Load(context, value); -} - -template <class T> -void InplaceDeserialize(TIntrusivePtr<T> value, TRef ref) -{ - TMemoryInput input(ref.Begin(), ref.Size()); - TLoadContext context(&input); - NPhoenix::TSerializer::InplaceLoad(context, value); -} - -//////////////////////////////////////////////////////////////////////////////// - -namespace NPhoenixCompatibilityMixedComposition { - -struct TComponentV1 -{ - int A = 0; - double B = 0; - - bool operator==(const TComponentV1& other) const = default; - - void Persist(const TPersistenceContext& context) - { - using NYT::Persist; - Persist(context, A); - Persist(context, B); - } -}; - -struct TComponentV2 -{ - int C = 0; - double D = 0; - - bool operator==(const TComponentV2& other) const = default; - - PHOENIX_DECLARE_TYPE(TComponentV2, 0x89e65bad); -}; - -void TComponentV2::RegisterMetadata(auto&& registrar) -{ - PHOENIX_REGISTER_FIELD(1, C)(); - PHOENIX_REGISTER_FIELD(2, D)(); -} - -PHOENIX_DEFINE_TYPE(TComponentV2); - -struct TCompositeV1 -{ - TComponentV1 X; - TComponentV2 Y; - - bool operator==(const TCompositeV1& other) const = default; - - void Persist(const TPersistenceContext& context) - { - using NYT::Persist; - Persist(context, X); - Persist(context, Y); - } -}; - -struct TCompositeV2 -{ - TComponentV1 U; - TComponentV2 V; - - bool operator==(const TCompositeV2& other) const = default; - - PHOENIX_DECLARE_TYPE(TCompositeV2, 0xd1c75b13); -}; - -void TCompositeV2::RegisterMetadata(auto&& registrar) -{ - PHOENIX_REGISTER_FIELD(1, U)(); - PHOENIX_REGISTER_FIELD(2, V)(); -} - -PHOENIX_DEFINE_TYPE(TCompositeV2); - -} // namespace NPhoenixCompatibilityMixedComposition - -TEST(TPhoenixCompatibilityTest, MixedComposition) -{ - using namespace NPhoenixCompatibilityMixedComposition; - - EXPECT_FALSE(NPhoenix2::SupportsPhoenix2<TComponentV1>); - EXPECT_TRUE(NPhoenix2::SupportsPhoenix2<TComponentV2>); - EXPECT_FALSE(NPhoenix2::SupportsPhoenix2<TCompositeV1>); - EXPECT_TRUE(NPhoenix2::SupportsPhoenix2<TCompositeV2>); - - struct TDerived : public TComponentV2 { - }; - - EXPECT_FALSE(NPhoenix2::SupportsPhoenix2<TDerived>); - - TComponentV1 component_v1; - component_v1.A = 7; - component_v1.B = 3.1416; - - TComponentV2 component_v2; - component_v2.C = 13; - component_v2.D = 2.7183; - - { - TCompositeV1 composite_v1; - composite_v1.X = component_v1; - composite_v1.Y = component_v2; - - TCompositeV1 composite_v1_2; - Deserialize(composite_v1_2, Serialize(composite_v1)); - - EXPECT_EQ(composite_v1_2, composite_v1); - } - - { - TCompositeV2 composite_v2; - composite_v2.U = component_v1; - composite_v2.V = component_v2; - - TCompositeV2 composite_v2_2; - Deserialize(composite_v2_2, Serialize(composite_v2)); - - EXPECT_EQ(composite_v2_2, composite_v2); - } -} - -//////////////////////////////////////////////////////////////////////////////// - -namespace NPhoenixCompatibilityMixedInheritance { - -struct TBaseV1 -{ - int A = 0; - double B = 0; - - bool operator==(const TBaseV1& other) const = default; - - void Persist(const TPersistenceContext& context) - { - using NYT::Persist; - - Persist(context, A); - Persist(context, B); - } - - PHOENIX_DECLARE_OPAQUE_TYPE(TBaseV1, 0xc5b6cc03); -}; - -PHOENIX_DEFINE_OPAQUE_TYPE(TBaseV1); - -struct TBaseV2 -{ - int C = 0; - double D = 0; - - bool operator==(const TBaseV2& other) const = default; - - PHOENIX_DECLARE_TYPE(TBaseV2, 0x5d59f4a3); -}; - -void TBaseV2::RegisterMetadata(auto&& registrar) -{ - PHOENIX_REGISTER_FIELD(1, C)(); - PHOENIX_REGISTER_FIELD(2, D)(); -} - -PHOENIX_DEFINE_TYPE(TBaseV2); - -struct TDerivedV1 - : public TBaseV1 - , public TBaseV2 -{ - int E = 0; - double F = 0; - - bool operator==(const TDerivedV1& other) const = default; - - void Persist(const TPersistenceContext& context) - { - using NYT::Persist; - - TBaseV1::Persist(context); - TBaseV2::Persist(context); - - Persist(context, E); - Persist(context, F); - } -}; - -struct TDerivedV2 - : public TBaseV1 - , public TBaseV2 -{ - int E = 0; - double F = 0; - - bool operator==(const TDerivedV2& other) const = default; - - PHOENIX_DECLARE_TYPE(TDerivedV2, 0x7d64560b); -}; - -void TDerivedV2::RegisterMetadata(auto&& registrar) -{ - registrar.template BaseType<TBaseV1>(); - registrar.template BaseType<TBaseV2>(); - - PHOENIX_REGISTER_FIELD(1, E)(); - PHOENIX_REGISTER_FIELD(2, F)(); -} - -PHOENIX_DEFINE_TYPE(TDerivedV2); - -} // namespace NPhoenixCompatibilityMixedInheritance - -TEST(TPhoenixCompatibilityTest, MixedInheritance) -{ - using namespace NPhoenixCompatibilityMixedInheritance; - - EXPECT_FALSE(NPhoenix2::SupportsPhoenix2<TBaseV1>); - EXPECT_TRUE(NPhoenix2::SupportsPhoenix2<TBaseV2>); - EXPECT_FALSE(NPhoenix2::SupportsPhoenix2<TDerivedV1>); - EXPECT_TRUE(NPhoenix2::SupportsPhoenix2<TDerivedV2>); - - TDerivedV1 derived_v1; - derived_v1.A = 77; - derived_v1.B = -0.77; - derived_v1.C = 33; - derived_v1.D = -0.33; - derived_v1.E = 5; - derived_v1.F = -0.5; - - TDerivedV1 derived_v1_2; - Deserialize(derived_v1_2, Serialize(derived_v1)); - - EXPECT_EQ(derived_v1_2, derived_v1); - - TDerivedV2 derived_v2; - derived_v2.A = 77; - derived_v2.B = -0.77; - derived_v2.C = 33; - derived_v2.D = -0.33; - derived_v2.E = 5; - derived_v2.F = -0.5; - - TDerivedV2 derived_v2_2; - Deserialize(derived_v2_2, Serialize(derived_v2)); - - EXPECT_EQ(derived_v2_2, derived_v2); -} - -//////////////////////////////////////////////////////////////////////////////// - -namespace NPhoenixCompatibilityMixedPolymorphic { - -struct TBaseV1 - : public TRefCounted - , public IPersistent -{ - int A = 0; - double B = 0; - - bool operator==(const TBaseV1& other) const = default; - - void Persist(const TPersistenceContext& context) override - { - using NYT::Persist; - - Persist(context, A); - Persist(context, B); - } - - DECLARE_DYNAMIC_PHOENIX_TYPE(TBaseV1, 0xbb6b6874); -}; - -DEFINE_DYNAMIC_PHOENIX_TYPE(TBaseV1); - -struct TBaseV2 - : public TRefCounted - , public IPersistent2 -{ - int C = 0; - double D = 0; - - bool operator==(const TBaseV2& other) const = default; - - PHOENIX_DECLARE_POLYMORPHIC_TYPE(TBaseV2, 0x20673022); -}; - -void TBaseV2::RegisterMetadata(auto&& registrar) -{ - PHOENIX_REGISTER_FIELD(1, C)(); - PHOENIX_REGISTER_FIELD(2, D)(); -} - -PHOENIX_DEFINE_TYPE(TBaseV2); - -struct TDerivedV1 - : public TBaseV2 -{ - int E = 0; - double F = 0; - - bool operator==(const TDerivedV1& other) const = default; - - void Persist(const TPersistenceContext& context) override - { - using NYT::Persist; - - TBaseV2::Persist(context); - - Persist(context, E); - Persist(context, F); - } - - DECLARE_DYNAMIC_PHOENIX_TYPE(TDerivedV1, 0x5b176bab); -}; - -DEFINE_DYNAMIC_PHOENIX_TYPE(TDerivedV1); - -struct TDerivedV2 - : public TBaseV1 - , public IPersistent2 -{ - int E = 0; - double F = 0; - - bool operator==(const TDerivedV2& other) const = default; - - PHOENIX_DECLARE_POLYMORPHIC_TYPE(TDerivedV2, 0xc461765b); -}; - -void TDerivedV2::RegisterMetadata(auto&& registrar) -{ - registrar.template BaseType<TBaseV1>(); - - PHOENIX_REGISTER_FIELD(1, E)(); - PHOENIX_REGISTER_FIELD(2, F)(); -} - -PHOENIX_DEFINE_TYPE(TDerivedV2); - -} // namespace NPhoenixCompatibilityMixedPolymorphic - -TEST(TPhoenixCompatibilityTest, MixedPolymorphic) -{ - using namespace NPhoenixCompatibilityMixedPolymorphic; - - EXPECT_FALSE(NPhoenix2::SupportsPhoenix2<TBaseV1>); - EXPECT_TRUE(NPhoenix2::SupportsPhoenix2<TBaseV2>); - EXPECT_FALSE(NPhoenix2::SupportsPhoenix2<TDerivedV1>); - EXPECT_TRUE(NPhoenix2::SupportsPhoenix2<TDerivedV2>); - - EXPECT_TRUE(NPhoenix2::NDetail::TPolymorphicTraits<TBaseV1>::Polymorphic); - EXPECT_TRUE(NPhoenix2::NDetail::TPolymorphicTraits<TBaseV2>::Polymorphic); - EXPECT_TRUE(NPhoenix2::NDetail::TPolymorphicTraits<TDerivedV1>::Polymorphic); - EXPECT_TRUE(NPhoenix2::NDetail::TPolymorphicTraits<TDerivedV2>::Polymorphic); - - auto derived_v1 = New<TDerivedV1>(); - derived_v1->C = 33; - derived_v1->D = -0.33; - derived_v1->E = 5; - derived_v1->F = -0.5; - - TIntrusivePtr<TBaseV2> base_v2_2; - Deserialize(base_v2_2, Serialize(derived_v1)); - - auto* derived_v1_2 = dynamic_cast<TDerivedV1*>(base_v2_2.Get()); - EXPECT_NE(derived_v1_2, nullptr); - EXPECT_EQ(derived_v1_2->C, 33); - EXPECT_EQ(derived_v1_2->D, -0.33); - EXPECT_EQ(derived_v1_2->E, 5); - EXPECT_EQ(derived_v1_2->F, -0.5); - - auto derived_v2 = New<TDerivedV2>(); - derived_v2->A = 33; - derived_v2->B = -0.33; - derived_v2->E = 5; - derived_v2->F = -0.5; - - TIntrusivePtr<TBaseV1> base_v1_2; - Deserialize(base_v1_2, Serialize(derived_v2)); - - auto* derived_v2_2 = dynamic_cast<TDerivedV2*>(base_v1_2.Get()); - EXPECT_NE(derived_v2_2, nullptr); - EXPECT_EQ(derived_v2_2->A, 33); - EXPECT_EQ(derived_v2_2->B, -0.33); - EXPECT_EQ(derived_v2_2->E, 5); - EXPECT_EQ(derived_v2_2->F, -0.5); -} - -//////////////////////////////////////////////////////////////////////////////// - -} // namespace -} // namespace NYT diff --git a/yt/yt/core/misc/unittests/phoenix_ut.cpp b/yt/yt/core/misc/unittests/phoenix_ut.cpp index 1d416754442..a161cc71eff 100644 --- a/yt/yt/core/misc/unittests/phoenix_ut.cpp +++ b/yt/yt/core/misc/unittests/phoenix_ut.cpp @@ -1,625 +1 @@ -#include <yt/yt/core/test_framework/framework.h> - -#include <yt/yt/core/misc/blob_output.h> -#include <yt/yt/core/misc/phoenix.h> - -namespace NYT { -namespace { - -//////////////////////////////////////////////////////////////////////////////// - -using TSaveContext = NPhoenix::TSaveContext; -using TLoadContext = NPhoenix::TLoadContext; -using IPersistent = NPhoenix::IPersistent; -using TPersistenceContext = NPhoenix::TPersistenceContext; - -//////////////////////////////////////////////////////////////////////////////// - -template <class T> -TSharedRef Serialize(const T& value) -{ - TBlobOutput output; - TSaveContext context(&output); - Save(context, value); - context.Finish(); - return output.Flush(); -} - -template <class T> -void Deserialize(T& value, TRef ref) -{ - TMemoryInput input(ref.Begin(), ref.Size()); - TLoadContext context(&input); - Load(context, value); -} - -template <class T> -void InplaceDeserialize(TIntrusivePtr<T> value, TRef ref) -{ - TMemoryInput input(ref.Begin(), ref.Size()); - TLoadContext context(&input); - NPhoenix::TSerializer::InplaceLoad(context, value); -} - -//////////////////////////////////////////////////////////////////////////////// - -namespace NSimple { - -TEST(TPhoenixTest, Scalar) -{ - struct C - { - C() - : A(-1) - , B(-1) - { } - - int A; - double B; - - void Persist(const TPersistenceContext& context) - { - using NYT::Persist; - Persist(context, A); - Persist(context, B); - } - }; - - C c1; - c1.A = 10; - c1.B = 3.14; - - C c2; - Deserialize(c2, Serialize(c1)); - - EXPECT_EQ(c1.A, c2.A); - EXPECT_EQ(c1.B, c2.B); -} - -TEST(TPhoenixTest, Guid) -{ - TGuid g1(1, 2, 3, 4); - - TGuid g2(5, 6, 7, 8); - Deserialize(g2, Serialize(g1)); - - EXPECT_EQ(g1, g2); -} - -TEST(TPhoenixTest, Vector) -{ - std::vector<int> v1; - v1.push_back(1); - v1.push_back(2); - v1.push_back(3); - - std::vector<int> v2; - v2.push_back(33); - Deserialize(v2, Serialize(v1)); - - EXPECT_EQ(v1, v2); -} - -} // namespace NSimple - -//////////////////////////////////////////////////////////////////////////////// - -namespace NRef1 { - -struct A; -struct B; - -struct A - : public TRefCounted -{ - TIntrusivePtr<B> X; - TIntrusivePtr<B> Y; - - void Persist(const TPersistenceContext& context) - { - using NYT::Persist; - Persist(context, X); - Persist(context, Y); - } - -}; - -struct B - : public TRefCounted -{ - B() - : V(-1) - { } - - int V; - - void Persist(const TPersistenceContext& context) - { - using NYT::Persist; - Persist(context, V); - } - -}; - -TEST(TPhoenixTest, Ref1) -{ - auto a1 = New<A>(); - a1->X = New<B>(); - a1->X->V = 1; - a1->Y = New<B>(); - a1->Y->V = 2; - - TIntrusivePtr<A> a2; - Deserialize(a2, Serialize(a1)); - - EXPECT_EQ(a2->GetRefCount(), 1); - EXPECT_EQ(a2->X->GetRefCount(), 1); - EXPECT_EQ(a2->X->V, 1); - EXPECT_EQ(a2->Y->GetRefCount(), 1); - EXPECT_EQ(a2->Y->V, 2); -} - -} // namespace NRef1 - -//////////////////////////////////////////////////////////////////////////////// - -namespace NRef2 { - -struct A - : public TRefCounted -{ - TIntrusivePtr<A> X; - - void Persist(const TPersistenceContext& context) - { - using NYT::Persist; - Persist(context, X); - } - -}; - -TEST(TPhoenixTest, Ref2) -{ - auto a1 = New<A>(); - - TIntrusivePtr<A> a2; - Deserialize(a2, Serialize(a1)); - - EXPECT_EQ(a2->GetRefCount(), 1); - EXPECT_FALSE(a2->X.operator bool()); -} - -} // namespace NRef2 - -//////////////////////////////////////////////////////////////////////////////// - -namespace NRef3 { - -struct A - : public TRefCounted -{ - TIntrusivePtr<A> X; - - void Persist(const TPersistenceContext& context) - { - using NYT::Persist; - Persist(context, X); - } - -}; - -TEST(TPhoenixTest, Ref3) -{ - auto a1 = New<A>(); - a1->X = a1; - - TIntrusivePtr<A> a2; - Deserialize(a2, Serialize(a1)); - - EXPECT_EQ(a2->GetRefCount(), 2); - EXPECT_EQ(a2->X, a2); - - a1->X.Reset(); - a2->X.Reset(); -} - -} // namespace NRef3 - -//////////////////////////////////////////////////////////////////////////////// - -namespace NRef4 { - -struct A - : public TRefCounted -{ - A() - : X(nullptr) - { } - - A* X; - - void Persist(const TPersistenceContext& context) - { - using NYT::Persist; - Persist(context, X); - } - -}; - -TEST(TPhoenixTest, Ref4) -{ - auto a1 = New<A>(); - a1->X = a1.Get(); - - TIntrusivePtr<A> a2; - Deserialize(a2, Serialize(a1)); - - EXPECT_EQ(a2->GetRefCount(), 1); - EXPECT_EQ(a2->X, a2); -} - -} // namespace NRef4 - -//////////////////////////////////////////////////////////////////////////////// - -namespace NRef5 { - -struct A; -struct B; - -struct A - : public TRefCounted -{ - A() - : X(nullptr) - , Y(nullptr) - { } - - B* X; - TIntrusivePtr<B> Y; - - void Persist(const TPersistenceContext& context) - { - using NYT::Persist; - Persist(context, X); - Persist(context, Y); - } -}; - -struct B - : public TRefCounted -{ - B() - : V(-1) - { } - - int V; - - void Persist(const TPersistenceContext& context) - { - using NYT::Persist; - Persist(context, V); - } - -}; - -TEST(TPhoenixTest, Ref5) -{ - auto a1 = New<A>(); - a1->Y = New<B>(); - a1->Y->V = 7; - a1->X = a1->Y.Get(); - - TIntrusivePtr<A> a2; - Deserialize(a2, Serialize(a1)); - - EXPECT_EQ(a2->GetRefCount(), 1); - EXPECT_EQ(a2->Y->GetRefCount(), 1); - EXPECT_EQ(a2->Y->V, 7); - EXPECT_EQ(a2->X, a2->Y); -} - -} // namespace NRef5 - -//////////////////////////////////////////////////////////////////////////////// - -namespace NRef6 { - -struct TBase - : public TRefCounted - , public IPersistent -{ }; - -struct TDerived1 - : public TBase -{ - TDerived1() - : V(-1) - { } - - int V; - - void Persist(const TPersistenceContext& context) override - { - using NYT::Persist; - Persist(context, V); - } - - DECLARE_DYNAMIC_PHOENIX_TYPE(TDerived1, 0x71297841); - -}; - -DEFINE_DYNAMIC_PHOENIX_TYPE(TDerived1); - -struct TDerived2 - : public TBase -{ - TDerived2() - : V(-1) - { } - - double V; - - void Persist(const TPersistenceContext& context) override - { - using NYT::Persist; - Persist(context, V); - } - - DECLARE_DYNAMIC_PHOENIX_TYPE(TDerived2, 0x62745629); - -}; - -DEFINE_DYNAMIC_PHOENIX_TYPE(TDerived2); - -TEST(TPhoenixTest, Ref6) -{ - auto derived1 = New<TDerived1>(); - derived1->V = 5; - TIntrusivePtr<TBase> base1(derived1); - - TIntrusivePtr<TBase> base2; - Deserialize(base2, Serialize(base1)); - - EXPECT_EQ(base2->GetRefCount(), 1); - auto* derived2 = dynamic_cast<TDerived1*>(base2.Get()); - EXPECT_NE(derived2, nullptr); - EXPECT_EQ(derived2->V, 5); -} - -} // namespace NRef6 - -//////////////////////////////////////////////////////////////////////////////// - -namespace NRef7 { - -struct TNonConstructable - : public TRefCounted - , public NPhoenix::TFactoryTag<NPhoenix::TNullFactory> -{ - explicit TNonConstructable(int x) - : X(x) - { } - - int X; - - void Persist(const TPersistenceContext& context) - { - using NYT::Persist; - Persist(context, X); - } - - DECLARE_DYNAMIC_PHOENIX_TYPE(TNonConstructable, 0x14712618); - -}; - -DEFINE_DYNAMIC_PHOENIX_TYPE(TNonConstructable); - -TEST(TPhoenixTest, Ref7) -{ - auto obj1 = New<TNonConstructable>(123); - EXPECT_EQ(obj1->X, 123); - - auto obj2 = New<TNonConstructable>(456); - - InplaceDeserialize(obj2, Serialize(obj1)); - - EXPECT_EQ(obj2->X, 123); -} - -} // namespace NRef7 - -//////////////////////////////////////////////////////////////////////////////// - -namespace NRef8 { - -struct A; -struct B; - -struct A - : public NPhoenix::TFactoryTag<NPhoenix::TSimpleFactory> -{ - int X; - std::unique_ptr<B> T; - - void Persist(const TPersistenceContext& context) - { - using NYT::Persist; - Persist(context, X); - Persist(context, T); - } - -}; - -struct B - : public NPhoenix::TFactoryTag<NPhoenix::TSimpleFactory> -{ - int Y; - A* Z; - - void Persist(const TPersistenceContext& context) - { - using NYT::Persist; - Persist(context, Y); - Persist(context, Z); - } - -}; - -TEST(TPhoenixTest, Ref8) -{ - std::unique_ptr<A> a1(new A()); - a1->X = 123; - a1->T.reset(new B()); - a1->T->Y = 456; - a1->T->Z = a1.get(); - - std::unique_ptr<A> a2; - Deserialize(a2, Serialize(a1)); - - EXPECT_EQ(a2->X, 123); - EXPECT_EQ(a2->T->Y, 456); - EXPECT_EQ(a2->T->Z, a2.get()); -} - -} // namespace NRef8 - -//////////////////////////////////////////////////////////////////////////////// - -namespace NRef9 { - -struct A; -struct B; - -struct A - : public IPersistent -{ - explicit A(int x) - : X(x) - { } - - int X; - - virtual void Foo() = 0; - - void Persist(const TPersistenceContext& context) override - { - using NYT::Persist; - Persist(context, X); - } - -}; - -struct B - : public A - , public NPhoenix::TFactoryTag<NPhoenix::TSimpleFactory> -{ - B() - : A(0) - , Y(0) - , Z(nullptr) - { } - - int Y; - A* Z; - - void Foo() override - { } - - void Persist(const TPersistenceContext& context) override - { - using NYT::Persist; - A::Persist(context); - Persist(context, Y); - Persist(context, Z); - } - - DECLARE_DYNAMIC_PHOENIX_TYPE(B, 0x54717818); - -}; - -DEFINE_DYNAMIC_PHOENIX_TYPE(B); - -TEST(TPhoenixTest, Ref9) -{ - std::unique_ptr<B> b1(new B()); - b1->X = 123; - b1->Y = 456; - b1->Z = b1.get(); - - std::unique_ptr<A> a1(b1.release()); - - std::unique_ptr<A> a2; - Deserialize(a2, Serialize(a1)); - - B* b2 = dynamic_cast<B*>(a2.get()); - EXPECT_NE(b2, nullptr); - EXPECT_EQ(b2->X, 123); - EXPECT_EQ(b2->Y, 456); - EXPECT_EQ(b2->Z, b2); -} - -} // namespace NRef9 - -//////////////////////////////////////////////////////////////////////////////// - -namespace NRef10 { - -struct TBase - : public TRefCounted - , public NPhoenix::TDynamicTag -{ - TBase() - : X (0) - { } - - int X; - - virtual void Persist(const TPersistenceContext& context) - { - using NYT::Persist; - Persist(context, X); - } -}; - -struct TDerived - : public TBase -{ - TDerived() - { } - - int Y; - - void Persist(const TPersistenceContext& context) override - { - TBase::Persist(context); - - using NYT::Persist; - Persist(context, Y); - } - - DECLARE_DYNAMIC_PHOENIX_TYPE(TDerived, 0x57818795); - -}; - -DEFINE_DYNAMIC_PHOENIX_TYPE(TDerived); - -TEST(TPhoenixTest, Ref10) -{ - auto obj1 = New<TDerived>(); - obj1->X = 123; - obj1->Y = 456; - - auto obj2 = New<TDerived>(); - - InplaceDeserialize(obj2, Serialize(TIntrusivePtr<TBase>(obj1))); - - EXPECT_EQ(obj2->X, 123); - EXPECT_EQ(obj2->Y, 456); -} - -} // namespace NRef10 - -//////////////////////////////////////////////////////////////////////////////// -} // namespace -} // namespace NYT +// TODO(galtsev): remove file diff --git a/yt/yt/core/misc/unittests/ya.make b/yt/yt/core/misc/unittests/ya.make index f8ab343a69d..aaae16b1dfd 100644 --- a/yt/yt/core/misc/unittests/ya.make +++ b/yt/yt/core/misc/unittests/ya.make @@ -49,8 +49,6 @@ SRCS( mpl_ut.cpp pattern_formatter_ut.cpp persistent_queue_ut.cpp - phoenix_ut.cpp - phoenix_compatibility_ut.cpp pool_allocator_ut.cpp proc_ut.cpp random_ut.cpp diff --git a/yt/yt/core/phoenix/context.h b/yt/yt/core/phoenix/context.h index 7e555c910e9..d4cadf0950e 100644 --- a/yt/yt/core/phoenix/context.h +++ b/yt/yt/core/phoenix/context.h @@ -72,35 +72,16 @@ private: //////////////////////////////////////////////////////////////////////////////// -template <class TSaveContext, class TLoadContext, class TPersistenceContext> +template <class TSaveContext, class TLoadContext> struct ICustomPersistent : public virtual TPolymorphicBase { - virtual void SaveImpl(TSaveContext& context) const - { - const_cast<ICustomPersistent<TSaveContext, TLoadContext, TPersistenceContext>*>(this)->Persist(context); - } - - virtual void LoadImpl(TLoadContext& context) - { - Persist(context); - } - - virtual void Save(TSaveContext& context) const - { - SaveImpl(context); - } - - virtual void Load(TLoadContext& context) - { - LoadImpl(context); - } - - virtual void Persist(const TPersistenceContext& context) = 0; + virtual void Save(TSaveContext& context) const = 0; + virtual void Load(TLoadContext& context) = 0; }; +using IPersistent = ICustomPersistent<TSaveContext, TLoadContext>; using TPersistenceContext = TCustomPersistenceContext<TSaveContext, TLoadContext>; -using IPersistent = ICustomPersistent<TSaveContext, TLoadContext, TPersistenceContext>; //////////////////////////////////////////////////////////////////////////////// diff --git a/yt/yt/core/phoenix/type_decl-inl.h b/yt/yt/core/phoenix/type_decl-inl.h index fc4444370e8..e5db38c3303 100644 --- a/yt/yt/core/phoenix/type_decl-inl.h +++ b/yt/yt/core/phoenix/type_decl-inl.h @@ -57,12 +57,8 @@ private: \ PHOENIX_DECLARE_TYPE__PROLOGUE(type, typeTagValue); \ public: \ static const ::NYT::NPhoenix2::TTypeDescriptor& GetTypeDescriptor(); \ - void SaveImpl(TSaveContext& context) const saveLoadModifier; \ - void LoadImpl(TLoadContext& context) saveLoadModifier; \ void Save(TSaveContext& context) const saveLoadModifier; \ void Load(TLoadContext& context) saveLoadModifier; \ - void Persist(const TPersistenceContext& context) saveLoadModifier; \ - using TPhoenix2SupportTag = type; \ \ private: \ static const ::NYT::NPhoenix2::NDetail::TRuntimeFieldDescriptorMap<type, TLoadContext>& GetRuntimeFieldDescriptorMap() @@ -93,34 +89,14 @@ public: \ return map; \ } \ \ - void SaveImpl(TSaveContext& context) const saveLoadModifier \ - { \ - ::NYT::NPhoenix2::NDetail::SaveImpl(this, context); \ - } \ - \ - void LoadImpl(TLoadContext& context) saveLoadModifier \ - { \ - ::NYT::NPhoenix2::NDetail::LoadImpl(this, context); \ - } \ - \ void Save(TSaveContext& context) const saveLoadModifier \ { \ - const_cast<type*>(this)->Persist(context); \ + ::NYT::NPhoenix2::NDetail::SaveImpl(this, context); \ } \ \ void Load(TLoadContext& context) saveLoadModifier \ { \ - Persist(context); \ - } \ - \ - void Persist(const TPersistenceContext& context) saveLoadModifier \ - { \ - if (context.IsSave()) { \ - type::SaveImpl(context.SaveContext()); \ - } else { \ - YT_VERIFY(context.IsLoad()); \ - type::LoadImpl(context.LoadContext()); \ - } \ + ::NYT::NPhoenix2::NDetail::LoadImpl(this, context); \ } #define PHOENIX_DECLARE_TEMPLATE_TYPE(type, typeTag) \ @@ -134,6 +110,28 @@ public: \ [[maybe_unused]] static constexpr auto TypeTag = ::NYT::NPhoenix2::TTypeTag(typeTagValue); \ static const ::NYT::NPhoenix2::TTypeDescriptor& GetTypeDescriptor() +#define PHOENIX_INHERIT_POLYMORPHIC_TYPE(baseType, type, tag) \ +class type \ + : public baseType \ +{ \ +public: \ + using baseType::baseType; \ + \ +private: \ + PHOENIX_DECLARE_POLYMORPHIC_TYPE(type, tag); \ +} + +#define PHOENIX_INHERIT_POLYMORPHIC_TEMPLATE_TYPE(baseType, type, tag, ...) \ +class type \ + : public baseType<__VA_ARGS__> \ +{ \ +public: \ + using baseType::baseType; \ + \ +private: \ + PHOENIX_DECLARE_POLYMORPHIC_TYPE(type, tag); \ +} + //////////////////////////////////////////////////////////////////////////////// } // namespace NYT::NPhoenix2::NDetail diff --git a/yt/yt/core/phoenix/type_def-inl.h b/yt/yt/core/phoenix/type_def-inl.h index b7322d79551..59535a02ec3 100644 --- a/yt/yt/core/phoenix/type_def-inl.h +++ b/yt/yt/core/phoenix/type_def-inl.h @@ -4,7 +4,6 @@ #include "type_def.h" #endif -#include "concepts.h" #include "factory.h" #include "polymorphic.h" #include "context.h" @@ -42,34 +41,14 @@ namespace NYT::NPhoenix2::NDetail { return map; \ } \ \ - void type::SaveImpl(TSaveContext& context) const \ - { \ - ::NYT::NPhoenix2::NDetail::SaveImpl(this, context); \ - } \ - \ - void type::LoadImpl(TLoadContext& context) \ - { \ - ::NYT::NPhoenix2::NDetail::LoadImpl(this, context); \ - } \ - \ void type::Save(TSaveContext& context) const \ { \ - const_cast<type*>(this)->Persist(context); \ + ::NYT::NPhoenix2::NDetail::SaveImpl(this, context); \ } \ \ void type::Load(TLoadContext& context) \ { \ - Persist(context); \ - } \ - \ - void type::Persist(const TPersistenceContext& context) \ - { \ - if (context.IsSave()) { \ - type::SaveImpl(context.SaveContext()); \ - } else { \ - YT_VERIFY(context.IsLoad()); \ - type::LoadImpl(context.LoadContext()); \ - } \ + ::NYT::NPhoenix2::NDetail::LoadImpl(this, context); \ } \ \ template <class T> \ @@ -146,6 +125,11 @@ public: return std::move(*this); } + auto BeforeVersion(auto /*version*/) && + { + return std::move(*this); + } + auto InVersions(auto /*filter*/) && { return std::move(*this); @@ -317,26 +301,11 @@ public: { } template <class TBase> - requires SupportsPhoenix2<TBase> - void BaseType() - { - This_->TBase::SaveImpl(Context_); - } - - template <class TBase> - requires (!SupportsPhoenix2<TBase> && !SupportsPersist<TBase, TContext>) void BaseType() { This_->TBase::Save(Context_); } - template <class TBase> - requires (!SupportsPhoenix2<TBase> && SupportsPersist<TBase, TContext>) - void BaseType() - { - const_cast<TThis*>(This_)->TBase::Persist(Context_); - } - private: const TThis* const This_; TContext& Context_; @@ -346,6 +315,8 @@ template <auto Member, class TThis, class TContext, class TFieldSerializer> class PHOENIX_REGISTRAR_NODISCARD TFieldSaveRegistrar { public: + using TVersion = typename TTraits<TThis>::TVersion; + TFieldSaveRegistrar(const TThis* this_, TContext& context) : This_(this_) , Context_(context) @@ -362,6 +333,12 @@ public: return TFieldSaveRegistrar(std::move(*this)); } + auto BeforeVersion(TVersion version) && + { + BeforeVersion_ = version; + return TFieldSaveRegistrar(std::move(*this)); + } + auto InVersions(TVersionFilter<TThis> filter) && { VersionFilter_ = filter; @@ -381,7 +358,7 @@ public: void operator()() && { - if (!VersionFilter_ || VersionFilter_(Context_.GetVersion())) { + if (auto version = Context_.GetVersion(); version < BeforeVersion_ && (!VersionFilter_ || VersionFilter_(version))) { TFieldSerializer::Save(Context_, This_->*Member); } } @@ -394,6 +371,7 @@ private: TContext& Context_; TVersionFilter<TThis> VersionFilter_ = nullptr; + TVersion BeforeVersion_ = static_cast<TVersion>(std::numeric_limits<int>::max()); }; template <class TThis, class TContext> @@ -420,6 +398,11 @@ public: return TVirtualFieldSaveRegistrar(std::move(*this)); } + auto BeforeVersion(auto /*version*/) && + { + return TVirtualFieldSaveRegistrar(std::move(*this)); + } + auto InVersions(auto /*filter*/) && { return TVirtualFieldSaveRegistrar(std::move(*this)); @@ -504,26 +487,11 @@ public: { } template <class TBase> - requires SupportsPhoenix2<TBase> - void BaseType() - { - This_->TBase::LoadImpl(Context_); - } - - template <class TBase> - requires (!SupportsPhoenix2<TBase> && !SupportsPersist<TBase, TContext>) void BaseType() { This_->TBase::Load(Context_); } - template <class TBase> - requires (!SupportsPhoenix2<TBase> && SupportsPersist<TBase, TContext>) - void BaseType() - { - This_->TBase::Persist(Context_); - } - private: TThis* const This_; TContext& Context_; @@ -548,6 +516,7 @@ public: , Context_(other.Context_) , Name_(other.Name_) , MinVersion_(other.MinVersion_) + , BeforeVersion_(other.BeforeVersion_) , VersionFilter_(other.VersionFilter_) , MissingHandler_(other.MissingHandler_) { } @@ -560,6 +529,12 @@ public: return TFieldLoadRegistrar(std::move(*this)); } + auto BeforeVersion(TVersion version) && + { + BeforeVersion_ = version; + return TFieldLoadRegistrar(std::move(*this)); + } + auto WhenMissing(TFieldMissingHandler<TThis, TContext> handler) && { MissingHandler_ = handler; @@ -581,7 +556,7 @@ public: void operator()() && { - if (auto version = Context_.GetVersion(); version >= MinVersion_ && (!VersionFilter_ || VersionFilter_(version))) { + if (auto version = Context_.GetVersion(); version >= MinVersion_ && version < BeforeVersion_ && (!VersionFilter_ || VersionFilter_(version))) { Context_.Dumper().SetFieldName(Name_); TFieldSerializer::Load(Context_, This_->*Member); } else if (MissingHandler_) { @@ -600,6 +575,7 @@ private: const TStringBuf Name_; TVersion MinVersion_ = static_cast<TVersion>(std::numeric_limits<int>::min()); + TVersion BeforeVersion_ = static_cast<TVersion>(std::numeric_limits<int>::max()); TVersionFilter<TThis> VersionFilter_ = nullptr; TFieldMissingHandler<TThis, TContext> MissingHandler_ = nullptr; }; @@ -623,8 +599,9 @@ public: : This_(other.This_) , Context_(other.Context_) , Name_(other.Name_) - , LoadHandler_(other.LoadHandler) + , LoadHandler_(other.LoadHandler_) , MinVersion_(other.MinVersion_) + , BeforeVersion_(other.BeforeVersion_) , VersionFilter_(other.VersionFilter_) , MissingHandler_(other.MissingHandler_) { } @@ -634,7 +611,13 @@ public: auto SinceVersion(TVersion version) && { MinVersion_ = version; - return TFieldLoadRegistrar(std::move(*this)); + return TVirtualFieldLoadRegistrar(std::move(*this)); + } + + auto BeforeVersion(TVersion version) && + { + BeforeVersion_ = version; + return TVirtualFieldLoadRegistrar(std::move(*this)); } auto WhenMissing(TFieldMissingHandler<TThis, TContext> handler) && @@ -654,7 +637,7 @@ public: void operator()() && { - if (auto version = Context_.GetVersion(); version >= MinVersion_ && (!VersionFilter_ || VersionFilter_(version))) { + if (auto version = Context_.GetVersion(); version >= MinVersion_ && version < BeforeVersion_ && (!VersionFilter_ || VersionFilter_(version))) { Context_.Dumper().SetFieldName(Name_); LoadHandler_(This_, Context_); } else if (MissingHandler_) { @@ -669,6 +652,7 @@ private: const TFieldLoadHandler<TThis, TContext> LoadHandler_; TVersion MinVersion_ = static_cast<TVersion>(std::numeric_limits<int>::min()); + TVersion BeforeVersion_ = static_cast<TVersion>(std::numeric_limits<int>::max()); TVersionFilter VersionFilter_ = nullptr; TFieldMissingHandler<TThis, TContext> MissingHandler_ = nullptr; }; @@ -789,6 +773,11 @@ public: return std::move(*this); } + auto BeforeVersion(auto /*version*/) && + { + return std::move(*this); + } + auto InVersions(auto /*filter*/) && { return std::move(*this); @@ -835,6 +824,11 @@ public: return *this; } + auto BeforeVersion(auto /*version*/) && + { + return *this; + } + auto InVersions(auto /*filter*/) && { return *this; diff --git a/yt/yt/core/phoenix/unittests/phoenix_ut.cpp b/yt/yt/core/phoenix/unittests/phoenix_ut.cpp index ac4ad2163da..165c157be20 100644 --- a/yt/yt/core/phoenix/unittests/phoenix_ut.cpp +++ b/yt/yt/core/phoenix/unittests/phoenix_ut.cpp @@ -25,11 +25,11 @@ using NYT::Load; //////////////////////////////////////////////////////////////////////////////// template <class T> -TString Serialize(const T& value) +TString Serialize(const T& value, int version = 0) { TString buffer; TStringOutput output(buffer); - TSaveContext context(&output); + TSaveContext context(&output, version); Save(context, value); context.Finish(); return buffer; @@ -312,6 +312,71 @@ TEST(TPhoenixTest, SinceVersionNew) //////////////////////////////////////////////////////////////////////////////// +namespace NBeforeVersion { + +struct S +{ + int A; + int B; + int C; + + bool operator==(const S&) const = default; + + PHOENIX_DECLARE_TYPE(S, 0xc8da1575); +}; + +void S::RegisterMetadata(auto&& registrar) +{ + PHOENIX_REGISTER_FIELD(1, A)(); + PHOENIX_REGISTER_FIELD(2, B) + .BeforeVersion(100)(); + PHOENIX_REGISTER_FIELD(3, C) + .BeforeVersion(200) + .WhenMissing([] (TThis* this_, auto& /*context*/) { + this_->C = 777; + })(); +} + +PHOENIX_DEFINE_TYPE(S); + +} // namespace NBeforeVersion + +TEST(TPhoenixTest, BeforeVersionOld) +{ + using namespace NBeforeVersion; + + S s1; + s1.A = 123; + s1.B = 456; + s1.C = 321; + + auto buffer = Serialize(s1); + ASSERT_EQ(buffer.size(), sizeof(s1)); + + auto s2 = Deserialize<S>(buffer); + EXPECT_EQ(s1, s2); +} + +TEST(TPhoenixTest, BeforeVersionNew) +{ + using namespace NBeforeVersion; + + S s1; + s1.A = 123; + s1.B = 0; + s1.C = 777; + + int version = 200; + + auto buffer = Serialize(s1, version); + ASSERT_EQ(buffer.size(), sizeof(s1.A)); + + auto s2 = Deserialize<S>(buffer, version); + EXPECT_EQ(s1, s2); +} + +//////////////////////////////////////////////////////////////////////////////// + namespace NInVersions { struct S diff --git a/yt/yt/core/profiling/unittests/timer_ut.cpp b/yt/yt/core/profiling/unittests/timer_ut.cpp index 6c7720288c4..c1cce5c44d2 100644 --- a/yt/yt/core/profiling/unittests/timer_ut.cpp +++ b/yt/yt/core/profiling/unittests/timer_ut.cpp @@ -6,7 +6,8 @@ #include <yt/yt/core/misc/lazy_ptr.h> #include <yt/yt/core/misc/blob_output.h> -#include <yt/yt/core/misc/phoenix.h> + +#include <yt/yt/core/phoenix/context.h> #include <util/generic/cast.h> @@ -14,7 +15,7 @@ namespace NYT::NProfiling { namespace { using namespace NConcurrency; -using namespace NPhoenix; +using namespace NPhoenix2; //////////////////////////////////////////////////////////////////////////////// diff --git a/yt/yt/core/rpc/config.cpp b/yt/yt/core/rpc/config.cpp index 5e99e1baf39..2ddb306d6b9 100644 --- a/yt/yt/core/rpc/config.cpp +++ b/yt/yt/core/rpc/config.cpp @@ -141,6 +141,10 @@ void TRetryingChannelConfig::Register(TRegistrar registrar) registrar.Parameter("retry_attempts", &TThis::RetryAttempts) .GreaterThanOrEqual(1) .Default(10); + registrar.Parameter("enable_exponential_retry_backoffs", &TThis::EnableExponentialRetryBackoffs) + .Default(false); + registrar.Parameter("retry_backoff", &TThis::RetryBackoff) + .Default(); registrar.Parameter("retry_timeout", &TThis::RetryTimeout) .GreaterThanOrEqual(TDuration::Zero()) .Default(); diff --git a/yt/yt/core/rpc/config.h b/yt/yt/core/rpc/config.h index 92f9de08c60..2d7371bd911 100644 --- a/yt/yt/core/rpc/config.h +++ b/yt/yt/core/rpc/config.h @@ -8,6 +8,8 @@ #include <yt/yt/core/concurrency/config.h> +#include <yt/yt/core/misc/backoff_strategy.h> + #include <library/cpp/yt/misc/enum.h> #include <vector> @@ -182,6 +184,12 @@ public: //! Maximum number of retry attempts to make. int RetryAttempts; + // COMPAT(danilalexeev): YT-23734. + bool EnableExponentialRetryBackoffs; + + //! Retry backoff policy. + TExponentialBackoffOptions RetryBackoff; + //! Maximum time to spend while retrying. //! If null then no limit is enforced. std::optional<TDuration> RetryTimeout; diff --git a/yt/yt/core/rpc/public.h b/yt/yt/core/rpc/public.h index f3d325a3455..2ec1020b86b 100644 --- a/yt/yt/core/rpc/public.h +++ b/yt/yt/core/rpc/public.h @@ -197,7 +197,7 @@ YT_DEFINE_ERROR_ENUM( ((SslError) (static_cast<int>(NBus::EErrorCode::SslError))) ((RequestMemoryPressure) (120)) // There is no enough memory to handle RPC request. ((GlobalDiscoveryError) (121)) // Single peer discovery interrupts discovery session. - ((ResponseMemoryPressure) (122)) // There is no enouth memory to handle RPC response. + ((ResponseMemoryPressure) (122)) // There is no enough memory to handle RPC response. ); DEFINE_ENUM(EMessageFormat, diff --git a/yt/yt/core/rpc/retrying_channel.cpp b/yt/yt/core/rpc/retrying_channel.cpp index 2de3963d44d..85a56da1c6c 100644 --- a/yt/yt/core/rpc/retrying_channel.cpp +++ b/yt/yt/core/rpc/retrying_channel.cpp @@ -85,6 +85,7 @@ private: , ResponseHandler_(std::move(responseHandler)) , Options_(options) , RetryChecker_(std::move(retryChecker)) + , BackoffStrategy_(Config_->RetryBackoff) { YT_ASSERT(Config_); YT_ASSERT(UnderlyingChannel_); @@ -190,6 +191,7 @@ private: const TCallback<bool(const TError&)> RetryChecker_; const TRetryingRequestControlThunkPtr RequestControlThunk_ = New<TRetryingRequestControlThunk>(); + TBackoffStrategy BackoffStrategy_; //! The current attempt number (1-based). int CurrentAttempt_ = 1; TInstant Deadline_; @@ -210,10 +212,19 @@ private: void HandleError(TError error) override { - YT_LOG_DEBUG(error, "Request attempt failed (RequestId: %v, Attempt: %v of %v)", + YT_LOG_DEBUG(error, "Request attempt failed (RequestId: %v, Attempt: %v)", Request_->GetRequestId(), - CurrentAttempt_, - Config_->RetryAttempts); + MakeFormatterWrapper([&] (auto* builder) { + if (Config_->EnableExponentialRetryBackoffs) { + builder->AppendFormat("%v of %v", + BackoffStrategy_.GetInvocationIndex() + 1, + BackoffStrategy_.GetInvocationCount()); + } else { + builder->AppendFormat("%v of %v", + CurrentAttempt_, + Config_->RetryAttempts); + } + })); if (!RetryChecker_.Run(error)) { ResponseHandler_->HandleError(std::move(error)); @@ -276,15 +287,25 @@ private: void Retry() { - int count = ++CurrentAttempt_; - if (count > Config_->RetryAttempts || TInstant::Now() + Config_->RetryBackoffTime > Deadline_) { + auto retryAttemptsExhausted = false; + auto backoffTime = TDuration::Zero(); + if (Config_->EnableExponentialRetryBackoffs) { + retryAttemptsExhausted = !BackoffStrategy_.Next(); + backoffTime = BackoffStrategy_.GetBackoff(); + } else { + auto count = ++CurrentAttempt_; + retryAttemptsExhausted = count > Config_->RetryAttempts; + backoffTime = Config_->RetryBackoffTime; + } + + if (retryAttemptsExhausted || TInstant::Now() + backoffTime > Deadline_) { ReportError(TError(NRpc::EErrorCode::Unavailable, "Request retries failed")); return; } TDelayedExecutor::Submit( BIND(&TRetryingRequest::DoRetry, MakeStrong(this)), - Config_->RetryBackoffTime, + backoffTime, TDispatcher::Get()->GetHeavyInvoker()); } @@ -305,7 +326,7 @@ private: void DoSend() { - YT_LOG_DEBUG("Request attempt started (RequestId: %v, Method: %v.%v, %v%vAttempt: %v of %v, RequestTimeout: %v, RetryTimeout: %v)", + YT_LOG_DEBUG("Request attempt started (RequestId: %v, Method: %v.%v, %v%vAttempt: %v, RequestTimeout: %v, RetryTimeout: %v)", Request_->GetRequestId(), Request_->GetService(), Request_->GetMethod(), @@ -319,8 +340,17 @@ private: builder->AppendFormat("UserTag: %v, ", Request_->GetUserTag()); } }), - CurrentAttempt_, - Config_->RetryAttempts, + MakeFormatterWrapper([&] (auto* builder) { + if (Config_->EnableExponentialRetryBackoffs) { + builder->AppendFormat("%v of %v", + BackoffStrategy_.GetInvocationIndex() + 1, + BackoffStrategy_.GetInvocationCount()); + } else { + builder->AppendFormat("%v of %v", + CurrentAttempt_, + Config_->RetryAttempts); + } + }), Options_.Timeout, Config_->RetryTimeout); diff --git a/yt/yt/core/rpc/unittests/rpc_ut.cpp b/yt/yt/core/rpc/unittests/rpc_ut.cpp index 7b64dd6da94..da19d392a61 100644 --- a/yt/yt/core/rpc/unittests/rpc_ut.cpp +++ b/yt/yt/core/rpc/unittests/rpc_ut.cpp @@ -77,7 +77,7 @@ TYPED_TEST(TRpcTest, RetryingSend) { auto config = New<TRetryingChannelConfig>(); config->Load(ConvertTo<INodePtr>(TYsonString(TStringBuf( - "{retry_backoff_time=10}")))); + "{enable_exponential_retry_backoffs=true;retry_backoff={min_backoff=10}}")))); IChannelPtr channel = CreateRetryingChannel( std::move(config), @@ -129,7 +129,7 @@ TYPED_TEST(TNotUdsTest, Address) { auto config = New<TRetryingChannelConfig>(); config->Load(ConvertTo<INodePtr>(TYsonString(TStringBuf( - "{retry_backoff_time=10}")))); + "{enable_exponential_retry_backoffs=true;retry_backoff={min_backoff=10}}")))); testChannel(CreateRetryingChannel( std::move(config), this->CreateChannel())); diff --git a/yt/yt/core/ya.make b/yt/yt/core/ya.make index 04c85cab3a7..e6f18689608 100644 --- a/yt/yt/core/ya.make +++ b/yt/yt/core/ya.make @@ -143,7 +143,6 @@ SRCS( misc/relaxed_mpsc_queue.cpp misc/parser_helpers.cpp misc/pattern_formatter.cpp - misc/phoenix.cpp misc/pool_allocator.cpp misc/proc.cpp misc/process_exit_profiler.cpp diff --git a/yt/yt_proto/yt/client/bundle_controller/proto/bundle_controller_service.proto b/yt/yt_proto/yt/client/bundle_controller/proto/bundle_controller_service.proto index 69e1894927b..dd3de268ecd 100644 --- a/yt/yt_proto/yt/client/bundle_controller/proto/bundle_controller_service.proto +++ b/yt/yt_proto/yt/client/bundle_controller/proto/bundle_controller_service.proto @@ -28,7 +28,7 @@ message TMemoryLimits message TInstanceResources { optional int64 memory = 1; - optional int64 net = 2; + optional int64 net_bytes = 2; optional string type = 3; optional int32 vcpu = 4; } diff --git a/yt/yt_proto/yt/formats/ya.make b/yt/yt_proto/yt/formats/ya.make index b73e73ba1c1..3dee708ae2d 100644 --- a/yt/yt_proto/yt/formats/ya.make +++ b/yt/yt_proto/yt/formats/ya.make @@ -2,9 +2,7 @@ PROTO_LIBRARY() INCLUDE(${ARCADIA_ROOT}/yt/gradle.inc) -IF (NOT PY_PROTOS_FOR) - INCLUDE_TAGS(GO_PROTO) -ENDIF() +INCLUDE_TAGS(GO_PROTO) SRCS( extension.proto |