diff options
author | maxkovalev <maxkovalev@yandex-team.com> | 2024-11-02 12:22:02 +0300 |
---|---|---|
committer | maxkovalev <maxkovalev@yandex-team.com> | 2024-11-02 12:36:24 +0300 |
commit | c9afd90243fb7161534fe20c6da35c24a14e905e (patch) | |
tree | 6ee0a521a9aabb0df9a2de1acb54afe2ff0568c6 /yql/essentials/tools/pgrun/pgrun.cpp | |
parent | c1e88b2643346523bc487519ca3615245fe372d2 (diff) | |
download | ydb-c9afd90243fb7161534fe20c6da35c24a14e905e.tar.gz |
YQL-19206: Move contrib/ydb/library/yql/tools/pg* to yql/essentials/tools/pg*
YQL-19206: Move contrib/ydb/library/yql/tools to yql/essentials/tools
commit_hash:4e16a5a9355fc868aa23f7aa8363d8057b912c71
Diffstat (limited to 'yql/essentials/tools/pgrun/pgrun.cpp')
-rw-r--r-- | yql/essentials/tools/pgrun/pgrun.cpp | 1348 |
1 files changed, 1348 insertions, 0 deletions
diff --git a/yql/essentials/tools/pgrun/pgrun.cpp b/yql/essentials/tools/pgrun/pgrun.cpp new file mode 100644 index 0000000000..ca39a50b61 --- /dev/null +++ b/yql/essentials/tools/pgrun/pgrun.cpp @@ -0,0 +1,1348 @@ +#include <yql/essentials/utils/backtrace/backtrace.h> +#include <contrib/ydb/library/yql/minikql/invoke_builtins/mkql_builtins.h> +#include <contrib/ydb/library/yql/minikql/mkql_function_registry.h> +#include <contrib/ydb/library/yql/core/facade/yql_facade.h> +#include <contrib/ydb/library/yql/core/yql_opt_utils.h> +#include <contrib/ydb/library/yql/core/yql_expr_optimize.h> +#include <contrib/ydb/library/yql/providers/yt/gateway/file/yql_yt_file.h> +#include <contrib/ydb/library/yql/providers/yt/gateway/file/yql_yt_file_services.h> +#include <contrib/ydb/library/yql/providers/common/provider/yql_provider_names.h> +#include <contrib/ydb/library/yql/providers/common/udf_resolve/yql_simple_udf_resolver.h> +#include <contrib/ydb/library/yql/providers/common/proto/gateways_config.pb.h> +#include "contrib/ydb/library/yql/providers/yt/common/yql_names.h" +#include <contrib/ydb/library/yql/providers/yt/provider/yql_yt_provider.h> +#include <contrib/ydb/library/yql/providers/pg/provider/yql_pg_provider.h> +#include <yql/essentials/public/issue/yql_issue.h> +#include <contrib/ydb/library/yql/parser/pg_wrapper/interface/utils.h> +#include <contrib/ydb/library/yql/providers/yt/lib/schema/schema.h> +#include <contrib/ydb/library/yql/core/services/mounts/yql_mounts.h> + +#include <library/cpp/getopt/last_getopt.h> +#include <library/cpp/yson/public.h> +#include "library/cpp/yson/node/node_io.h" +#include <library/cpp/yt/yson_string/string.h> +#include <fmt/format.h> + +#include <util/system/user.h> +#include <util/stream/file.h> +#include <util/system/fs.h> +#include <util/folder/path.h> +#include <util/folder/tempdir.h> +#include <util/string/split.h> +#include <util/generic/yexception.h> +#include <util/generic/iterator.h> +#include <util/generic/string.h> +#include <util/generic/strbuf.h> +#include <library/cpp/yson/parser.h> +#include <library/cpp/yson/node/node.h> +#include <library/cpp/yson/node/node_builder.h> +#include <library/cpp/string_utils/base64/base64.h> + +using namespace NYql; +using namespace NKikimr::NMiniKQL; +using namespace NNodes; +using NUdf::EDataSlot; + +namespace NMiniKQL = NKikimr::NMiniKQL; + +const ui32 PRETTY_FLAGS = NYql::TAstPrintFlags::PerLine | NYql::TAstPrintFlags::ShortQuote | + NYql::TAstPrintFlags::AdaptArbitraryContent; + +enum class EByteaOutput{ + hex, + escape, +}; + +TString nullRepr(""); +EByteaOutput byteaOutput = EByteaOutput::hex; + +bool IsEscapedChar(const TString& s, size_t pos) { + bool escaped = false; + while (s[--pos] == '\\') { + escaped = !escaped; + } + return escaped; +} + +class TStatementIterator final + : public TInputRangeAdaptor<TStatementIterator> +{ + enum class State { + InOperator, + EndOfOperator, + LineComment, + BlockComment, + QuotedIdentifier, + StringLiteral, + EscapedStringLiteral, + DollarStringLiteral, + InMetaCommand, + InCopyFromStdin, + InVar, + }; + +public: + TStatementIterator(const TString&& program) + : Program_(std::move(program)) + , Cur_() + , Pos_(0) + , State_(State::InOperator) + , AtStmtStart_(true) + , Mode_(State::InOperator) + , Depth_(0) + , Tag_() + , StandardConformingStrings_(true) + { + } + + static bool IsInWsSignificantState(State state) { + switch (state) { + case State::QuotedIdentifier: + case State::StringLiteral: + case State::EscapedStringLiteral: + case State::DollarStringLiteral: + return true; + default: + return false; + } + } + + TString RemoveEmptyLines(const TString& s, bool inStatement) { + if (s.empty()) { + return {}; + } + + TStringBuilder sb; + auto isFirstLine = true; + + if (inStatement && s[0] == '\n') { + sb << '\n'; + } + + for (TStringBuf line : StringSplitter(s).SplitBySet("\r\n").SkipEmpty()) { + if (isFirstLine) { + isFirstLine = false; + } else { + sb << '\n'; + } + sb << line; + } + return sb; + } + + const TString* Next() + { + if (TStringBuf::npos == Pos_) + return nullptr; + + size_t startPos = Pos_; + size_t curPos = Pos_; + size_t endPos; + auto prevState = State_; + + TStringBuilder stmt; + TStringBuilder rawStmt; + auto inStatement = false; + + while (!CallParser(startPos)) { + endPos = (TStringBuf::npos != Pos_) ? Pos_ : Program_.length(); + + TStringBuf part{&Program_[curPos], endPos - curPos}; + + if (IsInWsSignificantState(prevState)) { + if (!rawStmt.empty()) { + stmt << RemoveEmptyLines(rawStmt, inStatement); + rawStmt.clear(); + } + stmt << part; + inStatement = true; + } else { + rawStmt << part; + } + curPos = endPos; + prevState = State_; + } + endPos = (TStringBuf::npos != Pos_) ? Pos_ : Program_.length(); + + TStringBuf part{&Program_[curPos], endPos - curPos}; + + if (IsInWsSignificantState(prevState)) { + if (!rawStmt.empty()) { + stmt << RemoveEmptyLines(rawStmt, inStatement); + rawStmt.clear(); + } + stmt << part; + inStatement = true; + } else { + rawStmt << part; + } + +#if 0 + if (0 < Pos_ && !(Pos_ == TStringBuf::npos || Program_[Pos_-1] == '\n')) { + Cerr << "Last char: '" << Program_[Pos_-1] << "'\n"; + } +#endif + + stmt << RemoveEmptyLines(rawStmt, inStatement); + // inv: Pos_ is at the start of next token + if (startPos == endPos) + return nullptr; + + stmt << '\n'; + Cur_ = stmt; + + ApplyStateFromStatement(Cur_); + + return &Cur_; + } + +private: + + // States: + // - in-operator + // - line comment + // - block comment + // - quoted identifier (U& quoted identifier is no difference) + // - string literal (U& string literal is the same for our purpose) + // - E string literal + // - $ string literal + // - end-of-operator + + // Rules: + // - in-operator + // -- -> next: line comment + // /* -> depth := 1, next: block comment + // " -> next: quoted identifier + // ' -> next: string literal + // E' -> next: E string literal + // $tag$, not preceded by alnum char (a bit of simplification here but sufficient) -> tag := tag, next: $ string literal + // ; -> current_mode := end-of-operator, next: end-of-operator + + // - line comment + // EOL -> next: current_mode + + // - block comment + // /* -> ++depth + // */ -> --depth, if (depth == 0) -> next: current_mode + + // - quoted identifier + // " -> next: in-operator + + // - string literal + // ' -> next: in-operator + + // - E string literal + // ' -> if not preceeded by \ next: in-operator + + // - $ string literal + // $tag$ -> next: in-operator + + // - end-of-operator + // -- -> next: line comment, just once + // /* -> depth := 1, next: block comment + // non-space char -> unget, emit, current_mode := in-operator, next: in-operator + + // In every state: + // EOS -> emit if consumed part of the input is not empty + + bool SaveDollarTag() { + if (Pos_ + 1 == Program_.length()) + return false; + + auto p = Program_.cbegin() + (Pos_ + 1); + + if (std::isdigit(*p)) + return false; + + for (;p != Program_.cend(); ++p) { + if (*p == '$') { + auto bp = &Program_[Pos_]; + auto l = p - bp; + Tag_ = TStringBuf(bp, l + 1); + Pos_ += l; + + return true; + } + if (!(std::isalpha(*p) || std::isdigit(*p) || *p == '_')) + return false; + } + return false; + } + + bool IsCopyFromStdin(size_t startPos, size_t endPos) { + TString stmt(Program_, startPos, endPos - startPos + 1); + stmt.to_upper(); + // FROM STDOUT is used in insert.sql testcase, probably a bug + return stmt.Contains(" FROM STDIN") || stmt.Contains(" FROM STDOUT"); + } + + bool InOperatorParser(size_t startPos) { + // need \ to detect psql meta-commands + static const TString midNextTokens{"'\";-/$\\"}; + // need : for basic psql-vars support + static const TString initNextTokens{"'\";-/$\\:"}; + const auto& nextTokens = (AtStmtStart_) ? initNextTokens : midNextTokens; + + if (AtStmtStart_) { + Pos_ = Program_.find_first_not_of(" \t\n\r\v", Pos_); + if (TString::npos == Pos_) { + return true; + } + } + + Pos_ = Program_.find_first_of(nextTokens, Pos_); + + if (TString::npos == Pos_) { + return true; + } + + switch (Program_[Pos_]) { + case '\'': + State_ = (!StandardConformingStrings_ || 0 < Pos_ && std::toupper(Program_[Pos_ - 1]) == 'E') + ? State::EscapedStringLiteral + : State::StringLiteral; + break; + + case '"': + State_ = State::QuotedIdentifier; + break; + + case ';': + State_ = Mode_ = IsCopyFromStdin(startPos, Pos_) + ? State::InCopyFromStdin + : State::EndOfOperator; + break; + + case '-': + if (Pos_ < Program_.length() && Program_[Pos_ + 1] == '-') { + State_ = State::LineComment; + ++Pos_; + } + break; + + case '/': + if (Pos_ < Program_.length() && Program_[Pos_ + 1] == '*') { + State_ = State::BlockComment; + ++Depth_; + ++Pos_; + } + break; + + case '$': + if (Pos_ == 0 || std::isspace(Program_[Pos_ - 1])) { + if (SaveDollarTag()) + State_ = State::DollarStringLiteral; + } + break; + + case '\\': + if (AtStmtStart_) { + State_ = State::InMetaCommand; + } else if (Program_.Contains("\\gexec", Pos_)) { + Pos_ += 6; + return Emit(Program_[Pos_] == '\n'); + } + break; + + case ':': + if (Pos_ == 0 || Program_[Pos_-1] == '\n') { + State_ = State::InVar; + } + break; + } + ++Pos_; + if (Program_.length() == Pos_) { + Pos_ = TString::npos; + return true; + } + + return false; + } + + bool Emit(bool atEol) { + State_ = Mode_ = State::InOperator; + AtStmtStart_ = true; + + if (atEol) { + ++Pos_; + if (Program_.length() == Pos_) { + Pos_ = TString::npos; + return true; + } + } + // else do not consume as we're expected to be on the first char of the next statement + + return true; + } + + bool EndOfOperatorParser() { + const auto p = std::find_if_not(Program_.cbegin() + Pos_, Program_.cend(), [](const auto& c) { + return c == ' ' || c == '\t' || c == '\r'; + }); + + if (p == Program_.cend()) { + Pos_ = TStringBuf::npos; + return true; + } + + Pos_ = p - Program_.cbegin(); + + switch (*p) { + case '-': + if (Pos_ < Program_.length() && Program_[Pos_ + 1] == '-') { + State_ = State::LineComment; + ++Pos_; + } + break; + + case '/': + if (Pos_ < Program_.length() && Program_[Pos_ + 1] == '*') { + State_ = State::BlockComment; + ++Depth_; + ++Pos_; + } + break; + + default: + return Emit(*p == '\n'); + } + + ++Pos_; + if (Program_.length() == Pos_) { + Pos_ = TString::npos; + return true; + } + + return false; + } + + bool LineCommentParser() { + Pos_ = Program_.find('\n', Pos_); + + if (TString::npos == Pos_) + return true; + + ++Pos_; + if (Program_.length() == Pos_) { + Pos_ = TString::npos; + return true; + } + + if (Mode_ == State::EndOfOperator) { + return Emit(false); + } + + State_ = Mode_; + + return false; + } + + bool BlockCommentParser() { + Pos_ = Program_.find_first_of("*/", Pos_); + + if (TString::npos == Pos_) + return true; + + switch(Program_[Pos_]) { + case '/': + if (Pos_ < Program_.length() && Program_[Pos_ + 1] == '*') { + ++Depth_; + ++Pos_; + } + break; + + case '*': + if (Pos_ < Program_.length() && Program_[Pos_ + 1] == '/') { + --Depth_; + ++Pos_; + + if (0 == Depth_) { + State_ = Mode_; + } + } + break; + } + ++Pos_; + if (Program_.length() == Pos_) { + Pos_ = TString::npos; + return true; + } + + return false; + } + + bool QuotedIdentifierParser() { + Pos_ = Program_.find('"', Pos_); + + if (TString::npos == Pos_) + return true; + + ++Pos_; + if (Program_.length() == Pos_) { + Pos_ = TString::npos; + return true; + } + + State_ = State::InOperator; + AtStmtStart_ = false; + + return false; + } + + bool StringLiteralParser() { + Pos_ = Program_.find('\'', Pos_); + + if (TString::npos == Pos_) + return true; + + ++Pos_; + if (Program_.length() == Pos_) { + Pos_ = TString::npos; + return true; + } + + State_ = State::InOperator; + AtStmtStart_ = false; + + return false; + } + + bool EscapedStringLiteralParser() { + Pos_ = Program_.find('\'', Pos_); + + if (TString::npos == Pos_) + return true; + + if (IsEscapedChar(Program_, Pos_)) { + ++Pos_; + return false; + } + + ++Pos_; + if (Program_.length() == Pos_) { + Pos_ = TString::npos; + return true; + } + + State_ = State::InOperator; + AtStmtStart_ = false; + + return false; + } + + bool DollarStringLiteralParser() { + Y_ENSURE(Tag_ != nullptr && 2 <= Tag_.length()); + + Pos_ = Program_.find(Tag_, Pos_); + + if (TString::npos == Pos_) + return true; + + Pos_ += Tag_.length(); + if (Program_.length() == Pos_) { + Pos_ = TString::npos; + return true; + } + + Tag_.Clear(); + + State_ = State::InOperator; + AtStmtStart_ = false; + + return false; + } + + bool MetaCommandParser() { + Pos_ = Program_.find('\n', Pos_); + + if (TString::npos == Pos_) + return true; + + ++Pos_; + if (Program_.length() == Pos_) { + Pos_ = TString::npos; + return true; + } + + return Emit(false); + } + + bool InCopyFromStdinParser() { + Pos_ = Program_.find("\n\\.\n", Pos_); + + if (TString::npos == Pos_) + return true; + + Pos_ += 4; + return Emit(false); + } + + // For now we support vars occupying a whole line only + bool VarParser() { + // TODO: validate var name + Pos_ = Program_.find('\n', Pos_); + + if (TString::npos == Pos_) + return true; + + ++Pos_; + if (Program_.length() == Pos_) { + Pos_ = TString::npos; + return true; + } + + return Emit(false); + } + + bool CallParser(size_t startPos) { + switch (State_) { + case State::InOperator: + return InOperatorParser(startPos); + + case State::EndOfOperator: + return EndOfOperatorParser(); + + case State::LineComment: + return LineCommentParser(); + + case State::BlockComment: + return BlockCommentParser(); + + case State::QuotedIdentifier: + return QuotedIdentifierParser(); + + case State::StringLiteral: + return StringLiteralParser(); + + case State::EscapedStringLiteral: + return EscapedStringLiteralParser(); + + case State::DollarStringLiteral: + return DollarStringLiteralParser(); + + case State::InMetaCommand: + return MetaCommandParser(); + + case State::InCopyFromStdin: + return InCopyFromStdinParser(); + + case State::InVar: + return VarParser(); + + default: + Y_UNREACHABLE(); + } + } + + void ApplyStateFromStatement(const TStringBuf& stmt) { + if (stmt.contains("set standard_conforming_strings = on;") || + stmt.contains("reset standard_conforming_strings;")) + { + StandardConformingStrings_ = true; + } else if (stmt.contains("set standard_conforming_strings = off;")) { + StandardConformingStrings_ = false; + } + } + + TString Program_; + TString Cur_; + size_t Pos_; + State State_; + bool AtStmtStart_; + + State Mode_; + ui16 Depth_; + TStringBuf Tag_; + bool StandardConformingStrings_; +}; + +TString GetFormattedStmt(const TStringBuf& stmt) { + TString result; + result.reserve(stmt.length()); + size_t pos = 0, next_pos = TStringBuf::npos; + + while (TStringBuf::npos != (next_pos = stmt.find('\n', pos))) { + if (0 < next_pos - pos) { + if (!(stmt[pos] == '\r' && 1 == next_pos - pos)) { + result += stmt.substr(pos, next_pos - pos + 1); + } + } + pos = next_pos + 1; + } + if (pos < stmt.length()) + result += stmt.substr(pos); + + if (0 < result.length() && '\n' == result.back()) + result.pop_back(); + + if (0 < result.length() && '\r' == result.back()) + result.pop_back(); + + return result; +} + +void PrintExprTo(const TProgramPtr& program, IOutputStream& out) { + TStringStream baseSS; + + auto baseAst = ConvertToAst(*program->ExprRoot(), program->ExprCtx(), + NYql::TExprAnnotationFlags::None, true); + baseAst.Root->PrettyPrintTo(baseSS, PRETTY_FLAGS); + + out << baseSS.Data(); +} + +NYT::TNode ParseYson(const TString& yson) +{ + NYT::TNode root; + NYT::TNodeBuilder builder(&root); + NYson::TStatelessYsonParser resultParser(&builder); + resultParser.Parse(yson); + + return root; +} + +TString GetPgErrorMessage(const TIssue& issue) { + const TString anchor("reason: "); + const auto& msg = issue.GetMessage(); + + auto pos = msg.find(anchor); + + if (TString::npos == pos) + return TString(msg); + + return msg.substr(pos + anchor.length()); +} + +void WriteErrorToStream(const TProgramPtr program) +{ + program->PrintErrorsTo(Cerr); + + for (const auto& topIssue: program->Issues()) { + WalkThroughIssues(topIssue, true, [&](const TIssue& issue, ui16 /*level*/) { + const auto msg = GetPgErrorMessage(issue); + Cout << msg; + if (msg.back() != '\n') { + Cout << '\n'; + } + }); + } +} + +using CellFormatter = std::function<const TString(const TString&)>; +using TColumnType = TString; + +inline const TString FormatBool(const TString& value) +{ + static const TString T = "t"; + static const TString F = "f"; + + return (value == "true") ? T + : (value == "false") ? F + : (value == nullRepr) ? nullRepr + : ythrow yexception() << "Unexpected bool literal: " << value; +} + +inline const TString FormatNumeric(const TString& value) +{ + static const TString Zero = "0.0"; + + return (value == "0") ? Zero : value; +} + +const TString FormatFloat(const TString& value, std::function<TString(const TString&)> formatter) { + static const TString nan = "NaN"; + static const TString inf = "Infinity"; + static const TString minf = "-Infinity"; + + try { + return (value == "") ? "" + : (value == "nan") ? nan + : (value == "inf") ? inf + : (value == "-inf") ? minf + : formatter(value); + } catch (const std::exception& e) { + Cerr << "Unexpected float value '" << value << "'\n"; + return ""; + } +} + +inline const TString FormatFloat4(const TString& value) +{ + return FormatFloat(value, + [] (const TString& val) { return TString(fmt::format("{:.8g}", std::stof(val))); }); +} + +inline const TString FormatFloat8(const TString& value) +{ + return FormatFloat(value, + [] (const TString& val) { return TString(fmt::format("{:.15g}", std::stod(val))); }); +} + +inline const TString FormatTransparent(const TString& value) +{ + return value; +} + +static const THashMap<TColumnType, CellFormatter> ColumnFormatters { + { "bool", FormatBool }, + { "numeric", FormatNumeric }, + { "float4", FormatFloat4 }, + { "float8", FormatFloat8 }, +}; + +static const THashSet<TColumnType> RightAlignedTypes { + "int2", + "int4", + "int8", + "float4", + "float8", + "numeric", + "oid", +}; + +struct TColumn { + TString Name; + TString Type; + size_t Width; + CellFormatter Formatter; + bool RightAligned; +}; + +std::string FormatCell(const TString& data, const TColumn& column, size_t index, size_t numberOfColumns) { + const auto delim = (index == 0) ? " " : " | "; + + if (column.RightAligned) + return fmt::format("{0}{1:>{2}}", delim, data, column.Width); + + if (index == numberOfColumns - 1) + return fmt::format("{0}{1}", delim, data); + + return fmt::format("{0}{1:<{2}}", delim, data, column.Width); +} + +TString GetCellData(const NYT::TNode& cell, const TColumn& column) { + if (column.Type == "bytea") { + const auto rawValue = (cell.IsList()) + ? Base64Decode(cell.AsList()[0].AsString()) + : cell.AsString(); + + switch (byteaOutput) { + case EByteaOutput::hex: { + TString result; + + const auto expectedSize = rawValue.size() * 2 + 2; + result.resize(expectedSize); + result[0] = '\\'; + result[1] = 'x'; + const auto cnt = HexEncode(rawValue.data(), rawValue.size(), result.begin() + 2); + + Y_ASSERT(cnt + 2 == expectedSize); + + return result; + } + case EByteaOutput::escape: { + TString result; + + ui64 expectedSize = std::accumulate(rawValue.cbegin(), rawValue.cend(), 0U, + [] (ui64 acc, char c) { + return acc + ((c == '\\') + ? 2 + : ((ui8)c < 0x20 || 0x7e < (ui8)c) + ? 4 + : 1); + }); + result.resize(expectedSize); + auto p = result.begin(); + for (const auto c : rawValue) { + if (c == '\\') { + *p++ = '\\'; + *p++ = '\\'; + } else if ((ui8)c < 0x20 || 0x7e < (ui8)c) { + auto val = (ui8)c; + + *p++ = '\\'; + *p++ = ((val >> 6) & 03) + '0'; + *p++ = ((val >> 3) & 07) + '0'; + *p++ = (val & 07) + '0'; + } else { + *p++ = c; + } + } + + return result; + } + default: + throw yexception() << "Unhandled EByteaOutput value"; + } + } + return cell.AsString(); +} + +void WriteTableToStream(IOutputStream& stream, const NYT::TNode::TListType& cols, const NYT::TNode::TListType& rows) +{ + TVector<TColumn> columns; + TList<TVector<TString>> formattedData; + + for (const auto& col: cols) { + const auto& colName = col[0].AsString(); + const auto& colType = col[1][1].AsString(); + + auto& c = columns.emplace_back(); + + c.Name = colName; + c.Type = colType; + c.Width = colName.length(); + c.Formatter = ColumnFormatters.Value(colType, FormatTransparent); + c.RightAligned = RightAlignedTypes.contains(colType); + } + + for (const auto& row : rows) { + auto& rowData = formattedData.emplace_back(); + + { int i = 0; + for (const auto& cell : row.AsList()) { + auto& c = columns[i]; + + const auto cellData = cell.HasValue() ? GetCellData(cell, c) : nullRepr; + + rowData.emplace_back(c.Formatter(cellData)); + c.Width = std::max(c.Width, rowData.back().length()); + + ++i; + }} + } + + if (columns.empty()) { + stream << "--"; + } else { + const auto totalTableWidth = + std::accumulate(columns.cbegin(), columns.cend(), std::size_t{0}, + [] (const auto& sum, const auto& elem) { return sum + elem.Width; }) + columns.size() * 3 - 1; + TString filler(totalTableWidth, '-'); + stream << fmt::format(" {0:^{1}} ", columns[0].Name, columns[0].Width); + for (size_t i = 1, pos = columns[0].Width + 2; i < columns.size(); ++i) { + const auto& c = columns[i]; + + stream << fmt::format("| {0:^{1}} ", c.Name, c.Width); + filler[pos] = '+'; + pos += c.Width + 3; + } + stream << '\n' << filler; + } + + for (const auto& row : formattedData) { + stream << '\n'; + + for (size_t i = 0; i < row.size(); ++i) { + stream << FormatCell(row[i], columns[i], i, columns.size()); + } + } + stream << fmt::format("\n({} {})\n", formattedData.size(), (formattedData.size() == 1) ? "row" : "rows"); +} + +std::pair<TString, TString> GetYtTableDataPaths(const TFsPath& dataDir, const TString tableName) { + const auto dataFileName = dataDir / tableName; + const auto attrFileName = dataDir / (tableName + ".attr"); + return {dataFileName, attrFileName}; +} + +void CreateYtFileTable(const TFsPath& dataDir, const TString tableName, const TExprNode::TPtr columnsNode, + THashMap<TString, TString>& tablesMapping, TExprContext& ctx, const TPosition& pos) { + const auto [dataFilePath, attrFilePath] = + GetYtTableDataPaths(dataDir, tableName); + + TFile dataFile{dataFilePath, CreateNew}; + TFile attrFile{attrFilePath, CreateNew}; + + auto rowSpec = MakeIntrusive<TYqlRowSpecInfo>(); + + TColumnOrder columnOrder; + columnOrder.Reserve(columnsNode->ChildrenSize()); + + TStringBuilder ysonType; + ysonType << "[\"StructType\";["; + + for (const auto &columnNode : columnsNode->Children()) { + const auto &colName = columnNode->Child(0)->Content(); + const auto &colTypeNode = columnNode->Child(1); + + columnOrder.AddColumn(TString(colName)); + + ysonType << fmt::format("[\"{0}\";[\"{1}\";\"{2}\";];];", + colName, colTypeNode->Content(), + colTypeNode->Child(0)->Content()); + } + ysonType << "];]"; + const auto *typeNode = NCommon::ParseTypeFromYson(TStringBuf(ysonType), ctx, pos); + + rowSpec->SetType(typeNode->Cast<TStructExprType>()); + rowSpec->SetColumnOrder(std::move(columnOrder)); + + NYT::TNode attrs = NYT::TNode::CreateMap(); + rowSpec->FillAttrNode(attrs[YqlRowSpecAttribute], 0, false); + + NYT::TNode spec; + rowSpec->FillCodecNode(spec[YqlRowSpecAttribute]); + + attrs["schema"] = RowSpecToYTSchema(spec[YqlRowSpecAttribute], 0).ToNode(); + + TOFStream of(attrFile.GetName()); + of.Write(NYT::NodeToYsonString(attrs, NYson::EYsonFormat::Pretty)); + + tablesMapping[TString("yt.plato.") + tableName] = dataFile.GetName(); +} + +bool RemoveFile(const TString& fileName) { + if (NFs::Remove(fileName)) { + return true; + } + + switch (errno) { + case ENOENT: + return false; + default: + throw yexception() << "Cannot remove existing table file \"" << fileName << "\"\n"; + } +} + +void DeleteYtFileTable(const TFsPath& dataDir, const TString tableName, THashMap<TString, TString>& tablesMapping) { + const auto [dataFilePath, attrFilePath] = GetYtTableDataPaths(dataDir, tableName); + + if (!RemoveFile(dataFilePath)) { + Cout << "table \"" << tableName << "\" does not exist\n"; + } + RemoveFile(attrFilePath); + + tablesMapping.erase(TString("yt.plato.") + tableName); +} + +int SplitStatements(int argc, char* argv[]) { + Y_UNUSED(argc); + Y_UNUSED(argv); + + const TString delimiter{"===a738dc70-2d81-45b4-88f2-738d09b186b7===\n"}; + + for (const auto& stmt : TStatementIterator{Cin.ReadAll()}) { + Cout << delimiter << stmt; + } + return 0; +} + +void WriteToYtTableScheme( + const NYql::TExprNode& writeNode, + const TTempDir& tempDir, + const TIntrusivePtr<class NYql::NFile::TYtFileServices> yqlNativeServices, + TExprContext& ctx) { + const auto* keyNode = writeNode.Child(2); + + const auto* tableNameNode = keyNode->Child(0)->Child(1); + Y_ENSURE(tableNameNode->IsCallable("String")); + + const auto& tableName = tableNameNode->Child(0)->Content(); + Y_ENSURE(!tableName.empty()); + + const auto* optionsNode = writeNode.Child(4); + Y_ENSURE(optionsNode); + + const auto modeNode = GetSetting(*optionsNode, "mode"); + Y_ENSURE(modeNode); + const auto mode = modeNode->Child(1)->Content(); + + if (mode == "create") { + const auto columnsNode = GetSetting(*optionsNode, "columns"); + Y_ENSURE(columnsNode); + + CreateYtFileTable(tempDir.Path(), TString(tableName), columnsNode->ChildPtr(1), + yqlNativeServices->GetTablesMapping(), ctx, writeNode.Pos(ctx)); + } + else if (mode == "drop") { + DeleteYtFileTable(tempDir.Path(), TString(tableName), yqlNativeServices->GetTablesMapping()); + } +} + +void ProcessMetaCmd(const TStringBuf& cmd) { + const TStringBuf pset_null("\\pset null "); + + if (cmd.starts_with(pset_null)) { + const auto secondArgPos = cmd.find_first_not_of(" ", pset_null.length()); + if (secondArgPos != std::string_view::npos) { + TStringBuf newNullRepr(cmd, secondArgPos); + + if (newNullRepr.front() == '\'') { + newNullRepr.remove_prefix(1); + + if (newNullRepr.back() == '\'') { + newNullRepr.remove_suffix(1); + } + } + nullRepr = newNullRepr; + + return; + } + } + Cerr << "Metacommand " << cmd << " is not supported\n"; +} + +void ShowFinalAst(TProgramPtr& program, IOutputStream& stream) { + Cerr << "Final AST:\n"; + PrintExprTo(program, stream); +} + +void FillTablesMapping(const TFsPath& dataDir, THashMap<TString, TString>& tablesMapping) { + TVector<TFsPath> children; + + dataDir.List(children); + + bool regMsgLogged = false; + for (const auto& f: children) { + if (f.GetExtension() != "attr") { + continue; + } + auto tableName = f.Basename(); + tableName.resize(tableName.length() - 5); + + if (tableName.EndsWith(".tmp")) { + continue; + } + if (!regMsgLogged) { + regMsgLogged = true; + + Cerr << "Registering pre-existing tables\n"; + } + const auto fullTableName = f.Parent() / tableName; + Cerr << '\t' << tableName << '\n'; + tablesMapping[TString("yt.plato.") + tableName] = f.Parent() / tableName; + } +} + +int Main(int argc, char* argv[]) +{ + using namespace NLastGetopt; + TOpts opts = TOpts::Default(); + + const TString runnerName{"pgrun"}; + TVector<TString> udfsPaths; + + TString rawDataDir; + THashMap<TString, TString> clusterMapping; + + static const TString DefaultCluster{"plato"}; + clusterMapping[DefaultCluster] = YtProviderName; + clusterMapping["pg_catalog"] = PgProviderName; + clusterMapping["information_schema"] = PgProviderName; + + opts.AddHelpOption(); + opts.AddLongOption("print-ast", "print initial & final ASTs to stderr").NoArgument(); + opts.AddLongOption("print-result", "print program execution result to stderr").NoArgument(); + opts.AddLongOption("datadir", "directory for tables").StoreResult<TString>(&rawDataDir); + opts.AddLongOption('u', "udf", "Load shared library with UDF by given path").AppendTo(&udfsPaths); + opts.SetFreeArgsMax(0); + + TOptsParseResult res(&opts, argc, argv); + + const auto needPrintAst = res.Has("print-ast"); + const auto needPrintResult = res.Has("print-result"); + + const bool tempDirExists = !rawDataDir.empty() && NFs::Exists(rawDataDir); + TTempDir tempDir{rawDataDir.empty() ? TTempDir{} : TTempDir{rawDataDir}}; + if (tempDirExists) { + tempDir.DoNotRemove(); + } + + auto fsConfig = MakeHolder<TFileStorageConfig>(); + + THolder<TGatewaysConfig> gatewaysConfig; + + auto fileStorage = CreateFileStorage(*fsConfig); + fileStorage = WithAsync(fileStorage); + + auto funcRegistry = CreateFunctionRegistry(&NYql::NBacktrace::KikimrBackTrace, CreateBuiltinRegistry(), false, udfsPaths); + IUdfResolver::TPtr udfResolver = NCommon::CreateSimpleUdfResolver(funcRegistry.Get(), fileStorage, true);; + + bool keepTempFiles = true; + bool emulateOutputForMultirun = false; + + auto yqlNativeServices = NFile::TYtFileServices::Make(funcRegistry.Get(), {}, fileStorage, tempDir.Path(), keepTempFiles); + auto ytNativeGateway = CreateYtFileGateway(yqlNativeServices, &emulateOutputForMultirun); + if (tempDirExists) { + FillTablesMapping(tempDir.Path(), yqlNativeServices->GetTablesMapping()); + } + + TVector<TDataProviderInitializer> dataProvidersInit; + dataProvidersInit.push_back(GetYtNativeDataProviderInitializer(ytNativeGateway)); + dataProvidersInit.push_back(GetPgDataProviderInitializer()); + + TExprContext ctx; + + IModuleResolver::TPtr moduleResolver; + if (!GetYqlDefaultModuleResolver(ctx, moduleResolver, clusterMapping, true)) { + Cerr << "Errors loading default YQL libraries:" << Endl; + ctx.IssueManager.GetIssues().PrintTo(Cerr); + return -1; + } + + TExprContext::TFreezeGuard freezeGuard(ctx); + + TProgramFactory factory(true, funcRegistry.Get(), ctx.NextUniqueId, dataProvidersInit, runnerName); + factory.SetModules(moduleResolver); + factory.SetUdfResolver(udfResolver); + factory.SetGatewaysConfig(gatewaysConfig.Get()); + + const TString username = GetUsername(); + THashSet<TString> sqlFlags; + + NSQLTranslation::TTranslationSettings settings; + settings.ClusterMapping = clusterMapping; + settings.DefaultCluster = DefaultCluster; + settings.Flags = sqlFlags; + settings.SyntaxVersion = 1; + settings.AnsiLexer = false; + settings.V0Behavior = NSQLTranslation::EV0Behavior::Report; + settings.AssumeYdbOnClusterWithSlash = false; + settings.PgParser = true; + + for (const auto& raw_stmt : TStatementIterator{Cin.ReadAll()}) { + const auto stmt = GetFormattedStmt(raw_stmt); + Cout << stmt << '\n'; + + Cerr << "<sql-statement>\n" << stmt << "\n</sql-statement>\n"; + + if (stmt[0] == '\\') { + ProcessMetaCmd(stmt); + continue; + } + + { + const auto metaCmdStart = stmt.find("\n\\"); + if (TString::npos != metaCmdStart) { + const auto metaCmdEnd = stmt.find_first_of("\r\n", metaCmdStart + 2); + ProcessMetaCmd(stmt.substr(metaCmdStart + 1, metaCmdEnd)); + continue; + } + } + + if (TString::npos != stmt.find("SET bytea_output TO hex")) { + byteaOutput = EByteaOutput::hex; + continue; + } + + if (TString::npos != stmt.find("SET bytea_output TO escape")) { + byteaOutput = EByteaOutput::escape; + continue; + } + + google::protobuf::Arena arena; + settings.Arena = &arena; + + auto program = factory.Create("-stdin-", stmt); + + if (!program->ParseSql(settings)) { + WriteErrorToStream(program); + + continue; + } + + if (!program->Compile(username)) { + WriteErrorToStream(program); + + continue; + } + +#if 0 + auto validate_status = program->Validate(username, &Cout, true); + program->PrintErrorsTo(Cerr); + if (validate_status == TProgram::TStatus::Error) { + return 1; + } + + auto optimize_status = program->Optimize(username, nullptr, &Cerr, &Cout, true); + program->PrintErrorsTo(Cerr); + if (optimize_status == TProgram::TStatus::Error) { + return 1; + } +#endif + + if (needPrintAst) { + Cerr << "Initial AST:\n"; + PrintExprTo(program, Cerr); + } + + static const THashSet<TString> ignoredNodes{"CommitAll!", "Commit!" }; + const auto opNode = NYql::FindNode(program->ExprRoot(), + [] (const TExprNode::TPtr& node) { return !ignoredNodes.contains(node->Content()); }); + if (opNode->IsCallable("Write!")) { + Y_ENSURE(opNode->ChildrenSize() == 5); + + const auto* keyNode = opNode->Child(2); + const bool isWriteToTableSchemeNode = keyNode->IsCallable("Key") && 0 < keyNode->ChildrenSize() && + keyNode->Child(0)->Child(0)->IsAtom("tablescheme"); + + if (isWriteToTableSchemeNode) { + try { + WriteToYtTableScheme(*opNode, tempDir, yqlNativeServices, program->ExprCtx()); + } catch (const yexception& e) { + program->Issues().AddIssue(e.what()); + + WriteErrorToStream(program); + + continue; + } + + if (needPrintAst) { + program->Optimize(username); + + ShowFinalAst(program, Cerr); + } + + continue; + } + } + + auto status = program->Run(username, nullptr, nullptr, nullptr, true); + program->ConfigureYsonResultFormat(NYson::EYsonFormat::Text); + + if (status == TProgram::TStatus::Error) { + WriteErrorToStream(program); + continue; + } + if (needPrintAst) { + ShowFinalAst(program, Cerr); + } + + if (program->HasResults()) { + if (needPrintResult) { + Cerr << program->ResultsAsString() << Endl; + } + + const auto root = ParseYson(program->ResultsAsString()); + + const auto& cols = root[0]["Write"][0]["Type"][1][1].AsList(); + const auto& data = root[0]["Write"][0]["Data"].AsList(); + + WriteTableToStream(Cout, cols, data); + Cout << Endl; + } + } + + return 0; +} + +int main(int argc, char* argv[]) +{ + NYql::NBacktrace::RegisterKikimrFatalActions(); + NYql::NBacktrace::EnableKikimrSymbolize(); + + try { + if (1 < argc) { + if (TString(argv[1]) == "split-statements") { + return SplitStatements(argc, argv); + } + } + return Main(argc, argv); + } + catch (...) { + Cerr << CurrentExceptionMessage() << Endl; + return 1; + } +} + |