diff options
author | kniv <kniv@yandex-team.ru> | 2022-02-10 16:52:17 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:52:17 +0300 |
commit | e5a19414806c5ec712d5141bb00d350bcac24801 (patch) | |
tree | ab7fbbf3253d4c0e2793218f09378908beb025fb | |
parent | 053189977c4c0e171ef326fa889cd19e3f6c16cb (diff) | |
download | ydb-e5a19414806c5ec712d5141bb00d350bcac24801.tar.gz |
Restoring authorship annotation for <kniv@yandex-team.ru>. Commit 2 of 2.
-rw-r--r-- | ydb/library/yql/providers/solomon/async_io/dq_solomon_write_actor.cpp | 4 | ||||
-rw-r--r-- | ydb/library/yql/udfs/common/url_base/lib/url_base_udf.h | 10 | ||||
-rw-r--r-- | ydb/library/yql/udfs/common/url_base/lib/url_query.cpp | 438 | ||||
-rw-r--r-- | ydb/library/yql/udfs/common/url_base/lib/url_query.h | 238 | ||||
-rw-r--r-- | ydb/library/yql/udfs/common/url_base/lib/ya.make | 4 | ||||
-rw-r--r-- | ydb/library/yql/udfs/common/url_base/ya.make | 2 | ||||
-rw-r--r-- | ydb/library/yql/utils/actors/http_sender.h | 82 | ||||
-rw-r--r-- | ydb/library/yql/utils/actors/http_sender_actor.cpp | 162 | ||||
-rw-r--r-- | ydb/library/yql/utils/actors/http_sender_actor.h | 14 | ||||
-rw-r--r-- | ydb/library/yql/utils/actors/ya.make | 4 |
10 files changed, 479 insertions, 479 deletions
diff --git a/ydb/library/yql/providers/solomon/async_io/dq_solomon_write_actor.cpp b/ydb/library/yql/providers/solomon/async_io/dq_solomon_write_actor.cpp index 3c5507a6859..fc07635176f 100644 --- a/ydb/library/yql/providers/solomon/async_io/dq_solomon_write_actor.cpp +++ b/ydb/library/yql/providers/solomon/async_io/dq_solomon_write_actor.cpp @@ -101,7 +101,7 @@ public: } STRICT_STFUNC(StateFunc, - hFunc(TEvHttpBase::TEvSendResult, Handle); + hFunc(TEvHttpBase::TEvSendResult, Handle); ) public: @@ -205,7 +205,7 @@ private: size_t InflightRequestsHistoryMin = 0; }; - void Handle(TEvHttpBase::TEvSendResult::TPtr& ev) { + void Handle(TEvHttpBase::TEvSendResult::TPtr& ev) { const auto* res = ev->Get(); const TString& error = res->HttpIncomingResponse->Get()->GetError(); diff --git a/ydb/library/yql/udfs/common/url_base/lib/url_base_udf.h b/ydb/library/yql/udfs/common/url_base/lib/url_base_udf.h index 7cdd31ea847..a7d9fedd437 100644 --- a/ydb/library/yql/udfs/common/url_base/lib/url_base_udf.h +++ b/ydb/library/yql/udfs/common/url_base/lib/url_base_udf.h @@ -1,7 +1,7 @@ #pragma once #include "url_parse.h" -#include "url_query.h" +#include "url_query.h" #include <ydb/library/yql/public/udf/udf_helpers.h> @@ -341,8 +341,8 @@ namespace { TForceHostNameToPunycode, \ TPunycodeToHostName, \ TForcePunycodeToHostName, \ - TCanBePunycodeHostName, \ - TQueryStringToList, \ - TQueryStringToDict, \ - TBuildQueryString + TCanBePunycodeHostName, \ + TQueryStringToList, \ + TQueryStringToDict, \ + TBuildQueryString } diff --git a/ydb/library/yql/udfs/common/url_base/lib/url_query.cpp b/ydb/library/yql/udfs/common/url_base/lib/url_query.cpp index a4945350dc3..188f0b0ae5f 100644 --- a/ydb/library/yql/udfs/common/url_base/lib/url_query.cpp +++ b/ydb/library/yql/udfs/common/url_base/lib/url_query.cpp @@ -1,220 +1,220 @@ -#include "url_query.h" - +#include "url_query.h" + #include <ydb/library/yql/public/udf/udf_type_printer.h> - -#include <util/string/split.h> - -#include <library/cpp/string_utils/quote/quote.h> - -namespace NUrlUdf { - void TQueryStringParse::MakeSignature(IFunctionTypeInfoBuilder& builder, TType* retType) { - builder.Returns(retType).OptionalArgs(4); - auto args = builder.Args(); - args->Add<TAutoMap<TQueryStr>>(); - args->Add<TKeepBlankValuesNArg>(); - args->Add<TStrictNArg>(); - args->Add<TMaxFieldsNArg>(); - args->Add<TSeparatorNArg>().Done(); - } - - std::vector<std::pair<TString, TString>> - TQueryStringParse::RunImpl(const TUnboxedValuePod* args) const { - const std::string_view query(args[0].AsStringRef()); - if (query.empty()) - return {}; - const bool keepBlankValues = args[1].GetOrDefault(false); - const bool strict = args[2].GetOrDefault(true); - const ui32 maxFieldCnt = args[3].GetOrDefault(Max<ui32>()); - const std::string_view sep(args[4] ? args[4].AsStringRef() : "&"); - - std::vector<TStringBuf> parts; - StringSplitter(query).SplitByString(sep).Collect(&parts); - if (parts.size() > maxFieldCnt) { - UdfTerminate((TStringBuilder() << Pos_ << "Max number of fields (" << maxFieldCnt - << ") exceeded: got " << parts.size()).data()); - } - - std::vector<std::pair<TString, TString>> pairs; - for (const TStringBuf& part: parts) { - if (part.empty() && !strict) { - continue; - } - TVector<TString> nvPair = StringSplitter(part).Split('=').Limit(2); - if (nvPair.size() != 2) { - if (strict) { - UdfTerminate((TStringBuilder() << Pos_ << "Bad query field: \"" - << nvPair[0] << "\"").data()); - } - if (keepBlankValues) { - nvPair.emplace_back(""); - } else { - continue; - } - } - if (!nvPair[1].empty() || keepBlankValues) { - CGIUnescape(nvPair[0]); - CGIUnescape(nvPair[1]); - pairs.emplace_back(nvPair[0], nvPair[1]); - } - } - return pairs; - } - - bool TQueryStringToList::DeclareSignature(const TStringRef& name, - TType*, - IFunctionTypeInfoBuilder& builder, - bool typesOnly) { - if (Name() == name) { - MakeSignature(builder, GetListType(builder)); - if (!typesOnly) { - builder.Implementation(new TQueryStringToList(builder.GetSourcePosition())); - } - return true; - } - return false; - } - - TUnboxedValue TQueryStringToList::Run(const IValueBuilder* valueBuilder, - const TUnboxedValuePod* args) const { - const auto pairs = RunImpl(args); - std::vector<TUnboxedValue> ret; - for (const auto nvPair : pairs) { - TUnboxedValue* pair = nullptr; - auto item = valueBuilder->NewArray(2U, pair); - pair[0] = valueBuilder->NewString(nvPair.first); - pair[1] = valueBuilder->NewString(nvPair.second); - ret.push_back(item); - } - return valueBuilder->NewList(ret.data(), ret.size()); - } - - bool TQueryStringToDict::DeclareSignature(const TStringRef& name, - TType*, - IFunctionTypeInfoBuilder& builder, - bool typesOnly) { - if (Name() == name) { - auto dictType = GetDictType(builder); - MakeSignature(builder, dictType); - if (!typesOnly) { - builder.Implementation(new TQueryStringToDict(dictType, - builder.GetSourcePosition())); - } - return true; - } - return false; - } - - TUnboxedValue TQueryStringToDict::Run(const IValueBuilder* valueBuilder, - const TUnboxedValuePod* args) const { - const auto pairs = RunImpl(args); - auto ret = valueBuilder->NewDict(DictType_, TDictFlags::Hashed | TDictFlags::Multi); - for (const auto nvPair : pairs) { - ret->Add(valueBuilder->NewString(nvPair.first), - valueBuilder->NewString(nvPair.second)); - } - return ret->Build(); - } - - TUnboxedValue TBuildQueryString::Run(const IValueBuilder* valueBuilder, - const TUnboxedValuePod* args) const { - const std::string_view sep(args[1] ? args[1].AsStringRef() : "&"); - TStringBuilder ret; - - switch(FirstArgTypeId_) { - case EFirstArgTypeId::Dict: { - TUnboxedValue key, value; - const auto dictIt = args[0].GetDictIterator(); - ui64 wasItem = 0; - while (dictIt.NextPair(key, value)) { - TString keyEscaped = CGIEscapeRet(key.AsStringRef()); - const auto listIt = value.GetListIterator(); - TUnboxedValue item; - while (listIt.Next(item)) { - if (wasItem++) - ret << sep; - ret << keyEscaped << '=' << CGIEscapeRet(item.AsStringRef()); - } - } - break; - } - case EFirstArgTypeId::FlattenDict: { - TUnboxedValue key, value; - const auto dictIt = args[0].GetDictIterator(); - ui64 wasKey = 0; - while (dictIt.NextPair(key, value)) { - if (wasKey++) - ret << sep; - ret << CGIEscapeRet(key.AsStringRef()) << '=' << CGIEscapeRet(value.AsStringRef()); - } - break; - } - case EFirstArgTypeId::List: { - ui64 wasItem = 0; - TUnboxedValue item; - const auto listIt = args[0].GetListIterator(); - while (listIt.Next(item)) { - if (wasItem++) - ret << sep; - TUnboxedValue key = item.GetElement(0), val = item.GetElement(1); - ret << CGIEscapeRet(key.AsStringRef()) << '=' << CGIEscapeRet(val.AsStringRef()); - } - break; - } - default: - Y_FAIL("Current first parameter type is not yet implemented"); - } - return valueBuilder->NewString(ret); - } - - bool TBuildQueryString::DeclareSignature(const TStringRef& name, - TType* userType, - IFunctionTypeInfoBuilder& builder, - bool typesOnly) { - if (Name() == name) { - if (!userType) { - builder.SetError("Missing user type"); - return true; - } - builder.UserType(userType); - const auto typeHelper = builder.TypeInfoHelper(); - const auto userTypeInspector = TTupleTypeInspector(*typeHelper, userType); - if (!userTypeInspector || !userTypeInspector.GetElementsCount()) { - builder.SetError("User type is not tuple"); - return true; - } - const auto argsTypeInspector = TTupleTypeInspector(*typeHelper, - userTypeInspector.GetElementType(0)); - if (!argsTypeInspector || !argsTypeInspector.GetElementsCount()) { - builder.SetError("Please provide at least one argument"); - return true; - } - const auto firstArgType = argsTypeInspector.GetElementType(0); - EFirstArgTypeId firstArgTypeId = EFirstArgTypeId::None; - - if (typeHelper->IsSameType(GetDictType(builder), firstArgType)) { - firstArgTypeId = EFirstArgTypeId::Dict; - } else if (typeHelper->IsSameType(GetListType(builder), firstArgType)) { - firstArgTypeId = EFirstArgTypeId::List; - } else if (typeHelper->IsSameType(GetFlattenDictType(builder), firstArgType)) { - firstArgTypeId = EFirstArgTypeId::FlattenDict; - } - if (firstArgTypeId != EFirstArgTypeId::None) { - builder.Returns<TQueryStr>().OptionalArgs(1); - auto args = builder.Args(); - args->Add(firstArgType).Flags(ICallablePayload::TArgumentFlags::AutoMap); - args->Add<TSeparatorNArg>().Done(); - if (!typesOnly) { - builder.Implementation(new TBuildQueryString(builder.GetSourcePosition(), - firstArgTypeId)); - } - } else { - TStringBuilder sb; - sb << "Unsupported first argument type: "; - TTypePrinter(*typeHelper, firstArgType).Out(sb.Out); - builder.SetError(sb); - } - return true; - } - return false; - } -} + +#include <util/string/split.h> + +#include <library/cpp/string_utils/quote/quote.h> + +namespace NUrlUdf { + void TQueryStringParse::MakeSignature(IFunctionTypeInfoBuilder& builder, TType* retType) { + builder.Returns(retType).OptionalArgs(4); + auto args = builder.Args(); + args->Add<TAutoMap<TQueryStr>>(); + args->Add<TKeepBlankValuesNArg>(); + args->Add<TStrictNArg>(); + args->Add<TMaxFieldsNArg>(); + args->Add<TSeparatorNArg>().Done(); + } + + std::vector<std::pair<TString, TString>> + TQueryStringParse::RunImpl(const TUnboxedValuePod* args) const { + const std::string_view query(args[0].AsStringRef()); + if (query.empty()) + return {}; + const bool keepBlankValues = args[1].GetOrDefault(false); + const bool strict = args[2].GetOrDefault(true); + const ui32 maxFieldCnt = args[3].GetOrDefault(Max<ui32>()); + const std::string_view sep(args[4] ? args[4].AsStringRef() : "&"); + + std::vector<TStringBuf> parts; + StringSplitter(query).SplitByString(sep).Collect(&parts); + if (parts.size() > maxFieldCnt) { + UdfTerminate((TStringBuilder() << Pos_ << "Max number of fields (" << maxFieldCnt + << ") exceeded: got " << parts.size()).data()); + } + + std::vector<std::pair<TString, TString>> pairs; + for (const TStringBuf& part: parts) { + if (part.empty() && !strict) { + continue; + } + TVector<TString> nvPair = StringSplitter(part).Split('=').Limit(2); + if (nvPair.size() != 2) { + if (strict) { + UdfTerminate((TStringBuilder() << Pos_ << "Bad query field: \"" + << nvPair[0] << "\"").data()); + } + if (keepBlankValues) { + nvPair.emplace_back(""); + } else { + continue; + } + } + if (!nvPair[1].empty() || keepBlankValues) { + CGIUnescape(nvPair[0]); + CGIUnescape(nvPair[1]); + pairs.emplace_back(nvPair[0], nvPair[1]); + } + } + return pairs; + } + + bool TQueryStringToList::DeclareSignature(const TStringRef& name, + TType*, + IFunctionTypeInfoBuilder& builder, + bool typesOnly) { + if (Name() == name) { + MakeSignature(builder, GetListType(builder)); + if (!typesOnly) { + builder.Implementation(new TQueryStringToList(builder.GetSourcePosition())); + } + return true; + } + return false; + } + + TUnboxedValue TQueryStringToList::Run(const IValueBuilder* valueBuilder, + const TUnboxedValuePod* args) const { + const auto pairs = RunImpl(args); + std::vector<TUnboxedValue> ret; + for (const auto nvPair : pairs) { + TUnboxedValue* pair = nullptr; + auto item = valueBuilder->NewArray(2U, pair); + pair[0] = valueBuilder->NewString(nvPair.first); + pair[1] = valueBuilder->NewString(nvPair.second); + ret.push_back(item); + } + return valueBuilder->NewList(ret.data(), ret.size()); + } + + bool TQueryStringToDict::DeclareSignature(const TStringRef& name, + TType*, + IFunctionTypeInfoBuilder& builder, + bool typesOnly) { + if (Name() == name) { + auto dictType = GetDictType(builder); + MakeSignature(builder, dictType); + if (!typesOnly) { + builder.Implementation(new TQueryStringToDict(dictType, + builder.GetSourcePosition())); + } + return true; + } + return false; + } + + TUnboxedValue TQueryStringToDict::Run(const IValueBuilder* valueBuilder, + const TUnboxedValuePod* args) const { + const auto pairs = RunImpl(args); + auto ret = valueBuilder->NewDict(DictType_, TDictFlags::Hashed | TDictFlags::Multi); + for (const auto nvPair : pairs) { + ret->Add(valueBuilder->NewString(nvPair.first), + valueBuilder->NewString(nvPair.second)); + } + return ret->Build(); + } + + TUnboxedValue TBuildQueryString::Run(const IValueBuilder* valueBuilder, + const TUnboxedValuePod* args) const { + const std::string_view sep(args[1] ? args[1].AsStringRef() : "&"); + TStringBuilder ret; + + switch(FirstArgTypeId_) { + case EFirstArgTypeId::Dict: { + TUnboxedValue key, value; + const auto dictIt = args[0].GetDictIterator(); + ui64 wasItem = 0; + while (dictIt.NextPair(key, value)) { + TString keyEscaped = CGIEscapeRet(key.AsStringRef()); + const auto listIt = value.GetListIterator(); + TUnboxedValue item; + while (listIt.Next(item)) { + if (wasItem++) + ret << sep; + ret << keyEscaped << '=' << CGIEscapeRet(item.AsStringRef()); + } + } + break; + } + case EFirstArgTypeId::FlattenDict: { + TUnboxedValue key, value; + const auto dictIt = args[0].GetDictIterator(); + ui64 wasKey = 0; + while (dictIt.NextPair(key, value)) { + if (wasKey++) + ret << sep; + ret << CGIEscapeRet(key.AsStringRef()) << '=' << CGIEscapeRet(value.AsStringRef()); + } + break; + } + case EFirstArgTypeId::List: { + ui64 wasItem = 0; + TUnboxedValue item; + const auto listIt = args[0].GetListIterator(); + while (listIt.Next(item)) { + if (wasItem++) + ret << sep; + TUnboxedValue key = item.GetElement(0), val = item.GetElement(1); + ret << CGIEscapeRet(key.AsStringRef()) << '=' << CGIEscapeRet(val.AsStringRef()); + } + break; + } + default: + Y_FAIL("Current first parameter type is not yet implemented"); + } + return valueBuilder->NewString(ret); + } + + bool TBuildQueryString::DeclareSignature(const TStringRef& name, + TType* userType, + IFunctionTypeInfoBuilder& builder, + bool typesOnly) { + if (Name() == name) { + if (!userType) { + builder.SetError("Missing user type"); + return true; + } + builder.UserType(userType); + const auto typeHelper = builder.TypeInfoHelper(); + const auto userTypeInspector = TTupleTypeInspector(*typeHelper, userType); + if (!userTypeInspector || !userTypeInspector.GetElementsCount()) { + builder.SetError("User type is not tuple"); + return true; + } + const auto argsTypeInspector = TTupleTypeInspector(*typeHelper, + userTypeInspector.GetElementType(0)); + if (!argsTypeInspector || !argsTypeInspector.GetElementsCount()) { + builder.SetError("Please provide at least one argument"); + return true; + } + const auto firstArgType = argsTypeInspector.GetElementType(0); + EFirstArgTypeId firstArgTypeId = EFirstArgTypeId::None; + + if (typeHelper->IsSameType(GetDictType(builder), firstArgType)) { + firstArgTypeId = EFirstArgTypeId::Dict; + } else if (typeHelper->IsSameType(GetListType(builder), firstArgType)) { + firstArgTypeId = EFirstArgTypeId::List; + } else if (typeHelper->IsSameType(GetFlattenDictType(builder), firstArgType)) { + firstArgTypeId = EFirstArgTypeId::FlattenDict; + } + if (firstArgTypeId != EFirstArgTypeId::None) { + builder.Returns<TQueryStr>().OptionalArgs(1); + auto args = builder.Args(); + args->Add(firstArgType).Flags(ICallablePayload::TArgumentFlags::AutoMap); + args->Add<TSeparatorNArg>().Done(); + if (!typesOnly) { + builder.Implementation(new TBuildQueryString(builder.GetSourcePosition(), + firstArgTypeId)); + } + } else { + TStringBuilder sb; + sb << "Unsupported first argument type: "; + TTypePrinter(*typeHelper, firstArgType).Out(sb.Out); + builder.SetError(sb); + } + return true; + } + return false; + } +} diff --git a/ydb/library/yql/udfs/common/url_base/lib/url_query.h b/ydb/library/yql/udfs/common/url_base/lib/url_query.h index 75e1bc57bc0..fe64ca7db8b 100644 --- a/ydb/library/yql/udfs/common/url_base/lib/url_query.h +++ b/ydb/library/yql/udfs/common/url_base/lib/url_query.h @@ -1,120 +1,120 @@ -#pragma once - +#pragma once + #include <ydb/library/yql/public/udf/udf_helpers.h> - -namespace NUrlUdf { - using namespace NYql::NUdf; - - struct TQueryStringConv : public TBoxedValue { - protected: - static constexpr char Separator[] = "Separator"; - - using TQueryStr = char*; - using TSeparatorNArg = TNamedArg<TQueryStr, Separator>; - - static inline TType* GetListType(const IFunctionTypeInfoBuilder& builder) { - auto tupleType = builder.Tuple()->Add<TQueryStr>().Add<TQueryStr>().Build(); - return builder.List()->Item(tupleType).Build(); - } - - static inline TType* GetDictType(const IFunctionTypeInfoBuilder& builder) { - auto listType = builder.List()->Item<TQueryStr>().Build(); - return builder.Dict()->Key<TQueryStr>().Value(listType).Build(); - } - - static inline TType* GetFlattenDictType(const IFunctionTypeInfoBuilder& builder) { - return builder.Dict()->Key<TQueryStr>().Value<TQueryStr>().Build(); - } - }; - - struct TQueryStringParse: public TQueryStringConv { - explicit TQueryStringParse(TSourcePosition&& pos) : Pos_(std::move(pos)) {} - - protected: - static constexpr char KeepBlankValues[] = "KeepBlankValues"; - static constexpr char Strict[] = "Strict"; - static constexpr char MaxFields[] = "MaxFields"; - - using TKeepBlankValuesNArg = TNamedArg<bool, KeepBlankValues>; - using TStrictNArg = TNamedArg<bool, Strict>; - using TMaxFieldsNArg = TNamedArg<ui32, MaxFields>; - - static void MakeSignature(IFunctionTypeInfoBuilder& builder, TType* retType); - - std::vector<std::pair<TString, TString>> - RunImpl(const TUnboxedValuePod* args) const; - - private: - TSourcePosition Pos_; - }; - - struct TQueryStringToList : public TQueryStringParse { - explicit TQueryStringToList(TSourcePosition&& pos) - : TQueryStringParse(std::forward<TSourcePosition>(pos)) {} - - static const TStringRef& Name() { - static const auto name = TStringRef::Of("QueryStringToList"); - return name; - } - - static bool DeclareSignature(const TStringRef& name, - TType*, - IFunctionTypeInfoBuilder& builder, - bool typesOnly); - - TUnboxedValue Run(const IValueBuilder* valueBuilder, - const TUnboxedValuePod* args) const override; - }; - - struct TQueryStringToDict : public TQueryStringParse { - explicit TQueryStringToDict(TType* dictType, TSourcePosition&& pos) - : TQueryStringParse(std::move(pos)) - , DictType_(dictType) - {} - - static const TStringRef& Name() { - static const auto name = TStringRef::Of("QueryStringToDict"); - return name; - } - - static bool DeclareSignature(const TStringRef& name, - TType*, - IFunctionTypeInfoBuilder& builder, - bool typesOnly); - - TUnboxedValue Run(const IValueBuilder* valueBuilder, - const TUnboxedValuePod* args) const override; - - private: - TType* DictType_; - }; - - class TBuildQueryString : public TQueryStringConv { - TSourcePosition Pos_; - enum class EFirstArgTypeId { - None, - Dict, - FlattenDict, - List, - } FirstArgTypeId_; - - public: - explicit TBuildQueryString(TSourcePosition&& pos, EFirstArgTypeId firstArgTypeId) - : Pos_(std::move(pos)) - , FirstArgTypeId_(firstArgTypeId) - {} - - static const TStringRef& Name() { - static const auto name = TStringRef::Of("BuildQueryString"); - return name; - } - - TUnboxedValue Run(const IValueBuilder* valueBuilder, - const TUnboxedValuePod* args) const override; - - static bool DeclareSignature(const TStringRef& name, - TType* userType, - IFunctionTypeInfoBuilder& builder, - bool typesOnly); - }; -} + +namespace NUrlUdf { + using namespace NYql::NUdf; + + struct TQueryStringConv : public TBoxedValue { + protected: + static constexpr char Separator[] = "Separator"; + + using TQueryStr = char*; + using TSeparatorNArg = TNamedArg<TQueryStr, Separator>; + + static inline TType* GetListType(const IFunctionTypeInfoBuilder& builder) { + auto tupleType = builder.Tuple()->Add<TQueryStr>().Add<TQueryStr>().Build(); + return builder.List()->Item(tupleType).Build(); + } + + static inline TType* GetDictType(const IFunctionTypeInfoBuilder& builder) { + auto listType = builder.List()->Item<TQueryStr>().Build(); + return builder.Dict()->Key<TQueryStr>().Value(listType).Build(); + } + + static inline TType* GetFlattenDictType(const IFunctionTypeInfoBuilder& builder) { + return builder.Dict()->Key<TQueryStr>().Value<TQueryStr>().Build(); + } + }; + + struct TQueryStringParse: public TQueryStringConv { + explicit TQueryStringParse(TSourcePosition&& pos) : Pos_(std::move(pos)) {} + + protected: + static constexpr char KeepBlankValues[] = "KeepBlankValues"; + static constexpr char Strict[] = "Strict"; + static constexpr char MaxFields[] = "MaxFields"; + + using TKeepBlankValuesNArg = TNamedArg<bool, KeepBlankValues>; + using TStrictNArg = TNamedArg<bool, Strict>; + using TMaxFieldsNArg = TNamedArg<ui32, MaxFields>; + + static void MakeSignature(IFunctionTypeInfoBuilder& builder, TType* retType); + + std::vector<std::pair<TString, TString>> + RunImpl(const TUnboxedValuePod* args) const; + + private: + TSourcePosition Pos_; + }; + + struct TQueryStringToList : public TQueryStringParse { + explicit TQueryStringToList(TSourcePosition&& pos) + : TQueryStringParse(std::forward<TSourcePosition>(pos)) {} + + static const TStringRef& Name() { + static const auto name = TStringRef::Of("QueryStringToList"); + return name; + } + + static bool DeclareSignature(const TStringRef& name, + TType*, + IFunctionTypeInfoBuilder& builder, + bool typesOnly); + + TUnboxedValue Run(const IValueBuilder* valueBuilder, + const TUnboxedValuePod* args) const override; + }; + + struct TQueryStringToDict : public TQueryStringParse { + explicit TQueryStringToDict(TType* dictType, TSourcePosition&& pos) + : TQueryStringParse(std::move(pos)) + , DictType_(dictType) + {} + + static const TStringRef& Name() { + static const auto name = TStringRef::Of("QueryStringToDict"); + return name; + } + + static bool DeclareSignature(const TStringRef& name, + TType*, + IFunctionTypeInfoBuilder& builder, + bool typesOnly); + + TUnboxedValue Run(const IValueBuilder* valueBuilder, + const TUnboxedValuePod* args) const override; + + private: + TType* DictType_; + }; + + class TBuildQueryString : public TQueryStringConv { + TSourcePosition Pos_; + enum class EFirstArgTypeId { + None, + Dict, + FlattenDict, + List, + } FirstArgTypeId_; + + public: + explicit TBuildQueryString(TSourcePosition&& pos, EFirstArgTypeId firstArgTypeId) + : Pos_(std::move(pos)) + , FirstArgTypeId_(firstArgTypeId) + {} + + static const TStringRef& Name() { + static const auto name = TStringRef::Of("BuildQueryString"); + return name; + } + + TUnboxedValue Run(const IValueBuilder* valueBuilder, + const TUnboxedValuePod* args) const override; + + static bool DeclareSignature(const TStringRef& name, + TType* userType, + IFunctionTypeInfoBuilder& builder, + bool typesOnly); + }; +} diff --git a/ydb/library/yql/udfs/common/url_base/lib/ya.make b/ydb/library/yql/udfs/common/url_base/lib/ya.make index 57b90d53ec3..4b16b271e25 100644 --- a/ydb/library/yql/udfs/common/url_base/lib/ya.make +++ b/ydb/library/yql/udfs/common/url_base/lib/ya.make @@ -2,7 +2,7 @@ LIBRARY() YQL_ABI_VERSION( 2 - 23 + 23 0 ) @@ -14,7 +14,7 @@ OWNER( SRCS( url_base_udf.cpp url_parse.cpp - url_query.cpp + url_query.cpp ) PEERDIR( diff --git a/ydb/library/yql/udfs/common/url_base/ya.make b/ydb/library/yql/udfs/common/url_base/ya.make index 6cd90ae8afb..9a2117fb3cb 100644 --- a/ydb/library/yql/udfs/common/url_base/ya.make +++ b/ydb/library/yql/udfs/common/url_base/ya.make @@ -2,7 +2,7 @@ YQL_UDF(url_udf) YQL_ABI_VERSION( 2 - 23 + 23 0 ) diff --git a/ydb/library/yql/utils/actors/http_sender.h b/ydb/library/yql/utils/actors/http_sender.h index 9f2f7b4adcd..b191b5f61b0 100644 --- a/ydb/library/yql/utils/actors/http_sender.h +++ b/ydb/library/yql/utils/actors/http_sender.h @@ -1,43 +1,43 @@ -#pragma once - +#pragma once + #include <ydb/library/yql/providers/solomon/proto/dq_solomon_shard.pb.h> #include <ydb/library/yql/public/udf/udf_data_type.h> - -#include <library/cpp/actors/core/actor.h> -#include <library/cpp/actors/core/event_local.h> -#include <library/cpp/actors/core/events.h> -#include <library/cpp/actors/http/http_proxy.h> - -namespace NYql::NDq { - -struct TEvHttpBase { - // Event ids - enum EEv : ui32 { - EvBegin = EventSpaceBegin(NActors::TEvents::ES_PRIVATE), - - EvSendResult = EvBegin, - - EvEnd - }; - - static_assert(EvEnd < EventSpaceEnd(NActors::TEvents::ES_PRIVATE), "expect EvEnd < EventSpaceEnd(NActors::TEvents::ES_PRIVATE)"); - - // Events - - struct TEvSendResult : public NActors::TEventLocal<TEvSendResult, EvSendResult> { - TEvSendResult( - const NHttp::TEvHttpProxy::TEvHttpIncomingResponse::TPtr& httpIncomingResponse, - ui32 retryCount, - bool isTerminal) - : HttpIncomingResponse(httpIncomingResponse) - , RetryCount(retryCount) - , IsTerminal(isTerminal) - { } - - NHttp::TEvHttpProxy::TEvHttpIncomingResponse::TPtr HttpIncomingResponse; - ui32 RetryCount = 0; - bool IsTerminal = false; - }; -}; - -} + +#include <library/cpp/actors/core/actor.h> +#include <library/cpp/actors/core/event_local.h> +#include <library/cpp/actors/core/events.h> +#include <library/cpp/actors/http/http_proxy.h> + +namespace NYql::NDq { + +struct TEvHttpBase { + // Event ids + enum EEv : ui32 { + EvBegin = EventSpaceBegin(NActors::TEvents::ES_PRIVATE), + + EvSendResult = EvBegin, + + EvEnd + }; + + static_assert(EvEnd < EventSpaceEnd(NActors::TEvents::ES_PRIVATE), "expect EvEnd < EventSpaceEnd(NActors::TEvents::ES_PRIVATE)"); + + // Events + + struct TEvSendResult : public NActors::TEventLocal<TEvSendResult, EvSendResult> { + TEvSendResult( + const NHttp::TEvHttpProxy::TEvHttpIncomingResponse::TPtr& httpIncomingResponse, + ui32 retryCount, + bool isTerminal) + : HttpIncomingResponse(httpIncomingResponse) + , RetryCount(retryCount) + , IsTerminal(isTerminal) + { } + + NHttp::TEvHttpProxy::TEvHttpIncomingResponse::TPtr HttpIncomingResponse; + ui32 RetryCount = 0; + bool IsTerminal = false; + }; +}; + +} diff --git a/ydb/library/yql/utils/actors/http_sender_actor.cpp b/ydb/library/yql/utils/actors/http_sender_actor.cpp index 56f75d269f0..038a8f12bd9 100644 --- a/ydb/library/yql/utils/actors/http_sender_actor.cpp +++ b/ydb/library/yql/utils/actors/http_sender_actor.cpp @@ -1,81 +1,81 @@ -#include "http_sender_actor.h" -#include "http_sender.h" - -#include <library/cpp/actors/core/events.h> - -constexpr std::optional<ui32> MaxRetries = std::nullopt; -constexpr TDuration BaseRetryDelay = TDuration::Seconds(1); -constexpr double ScaleFactor = 1.5; -constexpr TDuration MaxDelay = TDuration::Seconds(10); - -TDuration RandomizeDelay(TDuration baseDelay) { - const TDuration::TValue half = baseDelay.GetValue() / 2; - return TDuration::FromValue(half + RandomNumber<TDuration::TValue>(half)); -} - -using namespace NActors; - -namespace NYql::NDq { - -class THttpSenderActor : public NActors::TActor<THttpSenderActor> { -public: - static constexpr char ActorName[] = "YQL_SOLOMON_HTTP_SENDER_ACTOR"; - - THttpSenderActor( - const TActorId senderId, - const TActorId httpProxyId) - : TActor<THttpSenderActor>(&THttpSenderActor::StateFunc) - , HttpProxyId(httpProxyId) - , SenderId(senderId) - { } - -private: - STRICT_STFUNC(StateFunc, - hFunc(NHttp::TEvHttpProxy::TEvHttpOutgoingRequest, Handle); - hFunc(NHttp::TEvHttpProxy::TEvHttpIncomingResponse, Handle); - hFunc(TEvents::TEvPoison, Handle); - ) - - void Handle(NHttp::TEvHttpProxy::TEvHttpOutgoingRequest::TPtr& ev) { - Send(HttpProxyId, new NHttp::TEvHttpProxy::TEvHttpOutgoingRequest(ev->Get()->Request)); - } - - void Handle(NHttp::TEvHttpProxy::TEvHttpIncomingResponse::TPtr& ev) { - const auto* res = ev->Get(); - const TString& error = res->GetError(); - - const bool isTerminal = error.empty() || MaxRetries && RetryCount >= MaxRetries; - Send(SenderId, new TEvHttpBase::TEvSendResult(ev, RetryCount++, isTerminal)); - - if (isTerminal) { - PassAway(); - return; - } - - Schedule(GetRetryDelay(), new NHttp::TEvHttpProxy::TEvHttpOutgoingRequest(res->Request)); - } - - void Handle(TEvents::TEvPoison::TPtr&) { - PassAway(); - } - -private: - TDuration GetRetryDelay() { - const TDuration delay = RandomizeDelay(CurrentDelay); - CurrentDelay = Min(CurrentDelay * ScaleFactor, MaxDelay); - return delay; - } - -private: - const TActorId HttpProxyId; - const TActorId SenderId; - - ui32 RetryCount = 0; - TDuration CurrentDelay = BaseRetryDelay; -}; - -NActors::IActor* CreateHttpSenderActor(TActorId senderId, TActorId httpProxyId) { - return new THttpSenderActor(senderId, httpProxyId); -} - -} // NYql::NDq +#include "http_sender_actor.h" +#include "http_sender.h" + +#include <library/cpp/actors/core/events.h> + +constexpr std::optional<ui32> MaxRetries = std::nullopt; +constexpr TDuration BaseRetryDelay = TDuration::Seconds(1); +constexpr double ScaleFactor = 1.5; +constexpr TDuration MaxDelay = TDuration::Seconds(10); + +TDuration RandomizeDelay(TDuration baseDelay) { + const TDuration::TValue half = baseDelay.GetValue() / 2; + return TDuration::FromValue(half + RandomNumber<TDuration::TValue>(half)); +} + +using namespace NActors; + +namespace NYql::NDq { + +class THttpSenderActor : public NActors::TActor<THttpSenderActor> { +public: + static constexpr char ActorName[] = "YQL_SOLOMON_HTTP_SENDER_ACTOR"; + + THttpSenderActor( + const TActorId senderId, + const TActorId httpProxyId) + : TActor<THttpSenderActor>(&THttpSenderActor::StateFunc) + , HttpProxyId(httpProxyId) + , SenderId(senderId) + { } + +private: + STRICT_STFUNC(StateFunc, + hFunc(NHttp::TEvHttpProxy::TEvHttpOutgoingRequest, Handle); + hFunc(NHttp::TEvHttpProxy::TEvHttpIncomingResponse, Handle); + hFunc(TEvents::TEvPoison, Handle); + ) + + void Handle(NHttp::TEvHttpProxy::TEvHttpOutgoingRequest::TPtr& ev) { + Send(HttpProxyId, new NHttp::TEvHttpProxy::TEvHttpOutgoingRequest(ev->Get()->Request)); + } + + void Handle(NHttp::TEvHttpProxy::TEvHttpIncomingResponse::TPtr& ev) { + const auto* res = ev->Get(); + const TString& error = res->GetError(); + + const bool isTerminal = error.empty() || MaxRetries && RetryCount >= MaxRetries; + Send(SenderId, new TEvHttpBase::TEvSendResult(ev, RetryCount++, isTerminal)); + + if (isTerminal) { + PassAway(); + return; + } + + Schedule(GetRetryDelay(), new NHttp::TEvHttpProxy::TEvHttpOutgoingRequest(res->Request)); + } + + void Handle(TEvents::TEvPoison::TPtr&) { + PassAway(); + } + +private: + TDuration GetRetryDelay() { + const TDuration delay = RandomizeDelay(CurrentDelay); + CurrentDelay = Min(CurrentDelay * ScaleFactor, MaxDelay); + return delay; + } + +private: + const TActorId HttpProxyId; + const TActorId SenderId; + + ui32 RetryCount = 0; + TDuration CurrentDelay = BaseRetryDelay; +}; + +NActors::IActor* CreateHttpSenderActor(TActorId senderId, TActorId httpProxyId) { + return new THttpSenderActor(senderId, httpProxyId); +} + +} // NYql::NDq diff --git a/ydb/library/yql/utils/actors/http_sender_actor.h b/ydb/library/yql/utils/actors/http_sender_actor.h index ffb03de70cc..464c49d451f 100644 --- a/ydb/library/yql/utils/actors/http_sender_actor.h +++ b/ydb/library/yql/utils/actors/http_sender_actor.h @@ -1,7 +1,7 @@ -#pragma once - -#include <library/cpp/actors/core/actor.h> - -namespace NYql::NDq { - NActors::IActor* CreateHttpSenderActor(NActors::TActorId SenderId, NActors::TActorId HttpProxyId); -} // NYql::NDq +#pragma once + +#include <library/cpp/actors/core/actor.h> + +namespace NYql::NDq { + NActors::IActor* CreateHttpSenderActor(NActors::TActorId SenderId, NActors::TActorId HttpProxyId); +} // NYql::NDq diff --git a/ydb/library/yql/utils/actors/ya.make b/ydb/library/yql/utils/actors/ya.make index 0274eae512e..647b18e67be 100644 --- a/ydb/library/yql/utils/actors/ya.make +++ b/ydb/library/yql/utils/actors/ya.make @@ -2,9 +2,9 @@ LIBRARY() OWNER(g:yql) -SRCS( +SRCS( rich_actor.cpp - http_sender_actor.cpp + http_sender_actor.cpp ) PEERDIR( |