diff options
author | vvvv <vvvv@ydb.tech> | 2023-08-30 20:49:53 +0300 |
---|---|---|
committer | vvvv <vvvv@ydb.tech> | 2023-08-30 21:17:44 +0300 |
commit | f154e22342f327342effe873b0a00ad80c975e76 (patch) | |
tree | fff231496c10fbfcff025ed953b512bf2a82d7c0 | |
parent | 4ebafdd49d8b0706c5af76ef7c2d0b3b498d0310 (diff) | |
download | ydb-f154e22342f327342effe873b0a00ad80c975e76.tar.gz |
Moved udf_test and refactored test_framework
Локально упавший тест выполняется
%%
vvvv@mr-nvme-testing-08:~/repo/arcadia/statbox/nile/tests/yql/py2/part_2$ arc checkout move_udf_test_and_refactor_tf
Switched to branch 'move_udf_test_and_refactor_tf'
vvvv@mr-nvme-testing-08:~/repo/arcadia/statbox/nile/tests/yql/py2/part_2$ ya make -tA -F '*test_unchanged_table*'
Warn[-WPluginErr]: in $B/statbox/nile/tests/yql/py2/part_2/libpy2-part_2.so: Requirement cpu is redefined 2 -> 4
Warn[-WPluginErr]: in $B/statbox/nile/tests/yql/py2/part_2/libpy2-part_2.so: Requirement ram is redefined 16 -> 9
Number of suites skipped by name: 2, by filter *test_unchanged_table*
Total 1 suite:
1 - GOOD
Total 4 tests:
4 - GOOD
Ok
%%
судя по ошибке он flaky
28 files changed, 5821 insertions, 1 deletions
diff --git a/build/conf/project_specific/yql_udf.conf b/build/conf/project_specific/yql_udf.conf index 9da1324b93..5e5ad8b198 100644 --- a/build/conf/project_specific/yql_udf.conf +++ b/build/conf/project_specific/yql_udf.conf @@ -34,7 +34,7 @@ macro UDF_NO_PROBE() { ### Documentation: https://yql.yandex-team.ru/docs/yt/libraries/testing/ ### Documentation about the Arcadia test system: https://wiki.yandex-team.ru/yatool/test/ module YQL_UDF_TEST: PY3TEST_BIN { - PEERDIR(yql/library/udf_test) + PEERDIR(ydb/library/yql/tests/common/udf_test) DEPENDS(ydb/library/yql/tools/astdiff) DEPENDS(ydb/library/yql/tools/yqlrun) diff --git a/library/c/cyson/cyson.cpp b/library/c/cyson/cyson.cpp new file mode 100644 index 0000000000..f8bff01a96 --- /dev/null +++ b/library/c/cyson/cyson.cpp @@ -0,0 +1,447 @@ +// Export visible API + +#include "cyson.h" + +#include <library/cpp/yson_pull/yson.h> +#include <library/cpp/yson_pull/detail/reader.h> +#include <library/cpp/yson_pull/detail/writer.h> +#include <library/cpp/yson_pull/detail/input/stream.h> +#include <library/cpp/yson_pull/detail/input/stdio_file.h> +#include <library/cpp/yson_pull/detail/output/buffered.h> +#include <library/cpp/yson_pull/detail/output/stream.h> +#include <library/cpp/yson_pull/detail/output/stdio_file.h> + +#include <util/generic/ptr.h> +#include <util/generic/strbuf.h> +#include <util/generic/string.h> + +namespace { + template <typename T> + void safe_assign_string(TString& dest, T&& value) noexcept { + try { + dest = std::forward<T>(value); + } catch (...) 
{ + // Suppress exception + } + } + +} // anonymous namespace + +struct yson_reader { + NYsonPull::NDetail::gen_reader_impl<false> impl; + TString error_message; + + yson_reader(NYsonPull::NInput::IStream* stream, NYsonPull::EStreamType mode) + : impl(*stream, mode) + { + error_message.reserve(64); + } + + yson_event_type safe_get_next_event() noexcept { + try { + auto& event = impl.next_event(); + return static_cast<yson_event_type>(event.Type()); + } catch (...) { + safe_assign_string(error_message, CurrentExceptionMessage()); + return YSON_EVENT_ERROR; + } + } +}; + +struct yson_writer { + THolder<NYsonPull::IConsumer> consumer; + TString error_message; + + yson_writer(THolder<NYsonPull::IConsumer> consumer_) + : consumer{std::move(consumer_)} { + error_message.reserve(64); + } + + template <typename T> + yson_writer_result safe_write(T&& func) noexcept { + try { + func(*consumer); + return YSON_WRITER_RESULT_OK; + } catch (const NYsonPull::NException::TBadOutput& err) { + safe_assign_string(error_message, err.what()); + return YSON_WRITER_RESULT_BAD_STREAM; + } catch (...) 
{ + safe_assign_string(error_message, CurrentExceptionMessage()); + return YSON_WRITER_RESULT_ERROR; + } + } +}; + +namespace { + class callback_error: public std::exception { + public: + const char* what() const noexcept override { + return "User callback returned error result code"; + } + }; + + class c_yson_input_stream: public NYsonPull::NInput::IStream { + void* ctx_; + yson_input_stream_func callback_; + + public: + c_yson_input_stream(void* ctx, yson_input_stream_func callback) + : ctx_{ctx} + , callback_{callback} { + } + + protected: + result do_fill_buffer() override { + const char* ptr; + size_t length; + switch (callback_(ctx_, &ptr, &length)) { + case YSON_INPUT_STREAM_RESULT_OK: + buffer().reset( + reinterpret_cast<const ui8*>(ptr), + reinterpret_cast<const ui8*>(ptr) + length); + return result::have_more_data; + + case YSON_INPUT_STREAM_RESULT_EOF: + return result::at_end; + + default: + case YSON_INPUT_STREAM_RESULT_ERROR: + throw callback_error(); + } + } + }; + + class c_yson_output_stream: public NYsonPull::NDetail::NOutput::TBuffered<c_yson_output_stream> { + using base_type = NYsonPull::NDetail::NOutput::TBuffered<c_yson_output_stream>; + + void* ctx_; + yson_output_stream_func callback_; + + public: + c_yson_output_stream(void* ctx, yson_output_stream_func callback, size_t buffer_size) + : base_type(buffer_size) + , ctx_{ctx} + , callback_{callback} { + } + + void write(TStringBuf data) { + switch (callback_(ctx_, data.data(), data.size())) { + case YSON_OUTPUT_STREAM_RESULT_OK: + return; + + default: + case YSON_OUTPUT_STREAM_RESULT_ERROR: + throw callback_error(); + } + } + }; + + // Type marshalling + + const yson_string* to_yson_string(const NYsonPull::TScalar& value) { + assert(value.Type() == NYsonPull::EScalarType::String); + auto* result = &value.AsUnsafeValue().AsString; + return reinterpret_cast<const yson_string*>(result); + } + + yson_input_stream* to_yson_input_stream(NYsonPull::NInput::IStream* ptr) { + return 
reinterpret_cast<yson_input_stream*>(ptr); + } + + NYsonPull::NInput::IStream* from_yson_input_stream(yson_input_stream* ptr) { + return reinterpret_cast<NYsonPull::NInput::IStream*>(ptr); + } + + yson_output_stream* to_yson_output_stream(NYsonPull::NOutput::IStream* ptr) { + return reinterpret_cast<yson_output_stream*>(ptr); + } + + NYsonPull::NOutput::IStream* from_yson_output_stream(yson_output_stream* ptr) { + return reinterpret_cast<NYsonPull::NOutput::IStream*>(ptr); + } + + // Exception-handling new/delete wrappers + + template <typename T, typename... Args> + T* safe_new(Args&&... args) noexcept { + try { + return new T(std::forward<Args>(args)...); + } catch (...) { + return nullptr; + } + } + + template <typename T> + void safe_delete(T* ptr) noexcept { + assert(ptr != nullptr); + try { + delete ptr; + } catch (...) { + // Suppress destructor exceptions + } + } + + template <typename T, typename... Args> + yson_writer* safe_new_writer(yson_output_stream* stream, Args&&... args) noexcept { + try { + auto impl = MakeHolder<T>( + *from_yson_output_stream(stream), + std::forward<Args>(args)...); + return new yson_writer(std::move(impl)); + } catch (...) 
{ + return nullptr; + } + } + +} // anonymous namespace + +extern "C" { +// Input stream + +yson_input_stream* yson_input_stream_from_string(const char* ptr, size_t length) { + auto buf = TStringBuf{ptr, length}; + auto* result = safe_new<NYsonPull::NDetail::NInput::TOwned<TMemoryInput>>(buf); + return to_yson_input_stream(result); +} + +yson_input_stream* yson_input_stream_from_file(FILE* file, size_t buffer_size) { + auto* result = safe_new<NYsonPull::NDetail::NInput::TStdioFile>(file, buffer_size); + return to_yson_input_stream(result); +} + +yson_input_stream* yson_input_stream_from_fd(int fd, size_t buffer_size) { + auto* result = safe_new<NYsonPull::NDetail::NInput::TFHandle>(fd, buffer_size); + return to_yson_input_stream(result); +} + +yson_input_stream* yson_input_stream_new(void* ctx, yson_input_stream_func callback) { + auto* result = safe_new<c_yson_input_stream>(ctx, callback); + return to_yson_input_stream(result); +} + +void yson_input_stream_delete(yson_input_stream* stream) { + assert(stream != nullptr); + safe_delete(from_yson_input_stream(stream)); +} + +// Reader + +yson_reader* yson_reader_new(yson_input_stream* stream, yson_stream_type mode) { + assert(stream != nullptr); + return safe_new<yson_reader>( + from_yson_input_stream(stream), + static_cast<NYsonPull::EStreamType>(mode)); +} + +void yson_reader_delete(yson_reader* reader) { + assert(reader != nullptr); + safe_delete(reader); +} + +yson_event_type yson_reader_get_next_event(yson_reader* reader) { + assert(reader != nullptr); + return reader->safe_get_next_event(); +} + +const char* yson_reader_get_error_message(yson_reader* reader) { + assert(reader != nullptr); + return reader->error_message.c_str(); +} + +yson_scalar_type yson_reader_get_scalar_type(yson_reader* reader) { + assert(reader != nullptr); + auto& event = reader->impl.last_event(); + return static_cast<yson_scalar_type>(event.AsScalar().Type()); +} + +int yson_reader_get_boolean(yson_reader* reader) { + assert(reader != 
nullptr); + auto& event = reader->impl.last_event(); + return static_cast<int>(event.AsScalar().AsBoolean()); +} + +i64 yson_reader_get_int64(yson_reader* reader) { + assert(reader != nullptr); + auto& event = reader->impl.last_event(); + return event.AsScalar().AsInt64(); +} + +ui64 yson_reader_get_uint64(yson_reader* reader) { + assert(reader != nullptr); + auto& event = reader->impl.last_event(); + return event.AsScalar().AsUInt64(); +} + +double yson_reader_get_float64(yson_reader* reader) { + assert(reader != nullptr); + auto& event = reader->impl.last_event(); + return event.AsScalar().AsFloat64(); +} + +const yson_string* yson_reader_get_string(yson_reader* reader) { + assert(reader != nullptr); + return to_yson_string(reader->impl.last_event().AsScalar()); +} + +// Output stream + +yson_output_stream* yson_output_stream_from_file(FILE* file, size_t buffer_size) { + auto* result = safe_new<NYsonPull::NDetail::NOutput::TStdioFile>(file, buffer_size); + return to_yson_output_stream(result); +} + +yson_output_stream* yson_output_stream_from_fd(int fd, size_t buffer_size) { + auto* result = safe_new<NYsonPull::NDetail::NOutput::TFHandle>(fd, buffer_size); + return to_yson_output_stream(result); +} + +yson_output_stream* yson_output_stream_new(void* ctx, yson_output_stream_func callback, size_t buffer_size) { + auto* result = safe_new<c_yson_output_stream>(ctx, callback, buffer_size); + return to_yson_output_stream(result); +} + +void yson_output_stream_delete(yson_output_stream* stream) { + assert(stream != nullptr); + safe_delete(from_yson_output_stream(stream)); +} + +// Writer + +yson_writer* yson_writer_new_binary(yson_output_stream* stream, yson_stream_type mode) { + assert(stream != nullptr); + return safe_new_writer<NYsonPull::NDetail::TBinaryWriterImpl>( + stream, + static_cast<NYsonPull::EStreamType>(mode)); +} + +yson_writer* yson_writer_new_text(yson_output_stream* stream, yson_stream_type mode) { + assert(stream != nullptr); + return 
safe_new_writer<NYsonPull::NDetail::TTextWriterImpl>( + stream, + static_cast<NYsonPull::EStreamType>(mode)); +} + +yson_writer* yson_writer_new_pretty_text(yson_output_stream* stream, yson_stream_type mode, size_t indent) { + assert(stream != nullptr); + return safe_new_writer<NYsonPull::NDetail::TPrettyWriterImpl>( + stream, + static_cast<NYsonPull::EStreamType>(mode), + indent); +} + +void yson_writer_delete(yson_writer* writer) { + assert(writer != nullptr); + safe_delete(writer); +} + +const char* yson_writer_get_error_message(yson_writer* writer) { + assert(writer != nullptr); + return writer->error_message.c_str(); +} + +yson_writer_result yson_writer_write_begin_stream(yson_writer* writer) { + assert(writer != nullptr); + return writer->safe_write([=](NYsonPull::IConsumer& consumer) { + consumer.OnBeginStream(); + }); +} + +yson_writer_result yson_writer_write_end_stream(yson_writer* writer) { + assert(writer != nullptr); + return writer->safe_write([=](NYsonPull::IConsumer& consumer) { + consumer.OnEndStream(); + }); +} + +yson_writer_result yson_writer_write_begin_list(yson_writer* writer) { + assert(writer != nullptr); + return writer->safe_write([=](NYsonPull::IConsumer& consumer) { + consumer.OnBeginList(); + }); +} + +yson_writer_result yson_writer_write_end_list(yson_writer* writer) { + assert(writer != nullptr); + return writer->safe_write([=](NYsonPull::IConsumer& consumer) { + consumer.OnEndList(); + }); +} + +yson_writer_result yson_writer_write_begin_map(yson_writer* writer) { + assert(writer != nullptr); + return writer->safe_write([=](NYsonPull::IConsumer& consumer) { + consumer.OnBeginMap(); + }); +} + +yson_writer_result yson_writer_write_end_map(yson_writer* writer) { + assert(writer != nullptr); + return writer->safe_write([=](NYsonPull::IConsumer& consumer) { + consumer.OnEndMap(); + }); +} + +yson_writer_result yson_writer_write_begin_attributes(yson_writer* writer) { + assert(writer != nullptr); + return 
writer->safe_write([=](NYsonPull::IConsumer& consumer) { + consumer.OnBeginAttributes(); + }); +} + +yson_writer_result yson_writer_write_end_attributes(yson_writer* writer) { + assert(writer != nullptr); + return writer->safe_write([=](NYsonPull::IConsumer& consumer) { + consumer.OnEndAttributes(); + }); +} + +yson_writer_result yson_writer_write_entity(yson_writer* writer) { + assert(writer != nullptr); + return writer->safe_write([=](NYsonPull::IConsumer& consumer) { + consumer.OnEntity(); + }); +} + +yson_writer_result yson_writer_write_key(yson_writer* writer, const char* ptr, size_t length) { + assert(writer != nullptr); + return writer->safe_write([=](NYsonPull::IConsumer& consumer) { + consumer.OnKey({ptr, length}); + }); +} + +yson_writer_result yson_writer_write_string(yson_writer* writer, const char* ptr, size_t length) { + assert(writer != nullptr); + return writer->safe_write([=](NYsonPull::IConsumer& consumer) { + consumer.OnScalarString({ptr, length}); + }); +} + +yson_writer_result yson_writer_write_int64(yson_writer* writer, i64 value) { + assert(writer != nullptr); + return writer->safe_write([=](NYsonPull::IConsumer& consumer) { + consumer.OnScalarInt64(value); + }); +} + +yson_writer_result yson_writer_write_uint64(yson_writer* writer, ui64 value) { + assert(writer != nullptr); + return writer->safe_write([=](NYsonPull::IConsumer& consumer) { + consumer.OnScalarUInt64(value); + }); +} + +yson_writer_result yson_writer_write_boolean(yson_writer* writer, int value) { + assert(writer != nullptr); + return writer->safe_write([=](NYsonPull::IConsumer& consumer) { + consumer.OnScalarBoolean(static_cast<bool>(value)); + }); +} + +yson_writer_result yson_writer_write_float64(yson_writer* writer, double value) { + assert(writer != nullptr); + return writer->safe_write([=](NYsonPull::IConsumer& consumer) { + consumer.OnScalarFloat64(value); + }); +} + +} // extern "C" diff --git a/library/c/cyson/cyson.h b/library/c/cyson/cyson.h new file mode 100644 
index 0000000000..1151c7fd89 --- /dev/null +++ b/library/c/cyson/cyson.h @@ -0,0 +1,164 @@ +#pragma once + +#include <library/cpp/yson_pull/cyson_enums.h> + +#include <util/system/types.h> + +#include <stddef.h> +#include <stdio.h> + +#ifdef __cplusplus +extern "C" { +#endif + +typedef struct yson_string { + const char* ptr; + size_t length; +} yson_string; + +typedef yson_input_stream_result (*yson_input_stream_func)( + void* ctx, + const char** ptr, + size_t* length); + +typedef yson_output_stream_result (*yson_output_stream_func)( + void* ctx, + const char* ptr, + size_t length); + +/* Abstract types */ + +typedef struct yson_input_stream yson_input_stream; +typedef struct yson_output_stream yson_output_stream; + +typedef struct yson_reader yson_reader; +typedef struct yson_writer yson_writer; + +/* Input stream */ + +yson_input_stream* +yson_input_stream_from_string(const char* ptr, size_t length); + +yson_input_stream* +yson_input_stream_from_file(FILE* file, size_t buffer_size); + +yson_input_stream* +yson_input_stream_from_fd(int fd, size_t buffer_size); + +yson_input_stream* +yson_input_stream_new(void* ctx, yson_input_stream_func callback); + +void yson_input_stream_delete(yson_input_stream* stream); + +/* Output stream */ + +yson_output_stream* +yson_output_stream_from_file(FILE* file, size_t buffer_size); + +yson_output_stream* +yson_output_stream_from_fd(int fd, size_t buffer_size); + +yson_output_stream* +yson_output_stream_new(void* ctx, yson_output_stream_func callback, size_t buffer_size); + +void yson_output_stream_delete(yson_output_stream* stream); + +/* Reader */ + +yson_reader* +yson_reader_new(yson_input_stream* stream, yson_stream_type mode); + +void yson_reader_delete(yson_reader* reader); + +yson_event_type +yson_reader_get_next_event(yson_reader* reader); + +const char* +yson_reader_get_error_message(yson_reader* reader); + +yson_scalar_type +yson_reader_get_scalar_type(yson_reader* reader); + +int yson_reader_get_boolean(yson_reader* 
reader); + +i64 yson_reader_get_int64(yson_reader* reader); + +ui64 yson_reader_get_uint64(yson_reader* reader); + +double +yson_reader_get_float64(yson_reader* reader); + +const yson_string* +yson_reader_get_string(yson_reader* reader); + +/* Writer */ + +yson_writer* +yson_writer_new_binary( + yson_output_stream* stream, + yson_stream_type mode); + +yson_writer* +yson_writer_new_text( + yson_output_stream* stream, + yson_stream_type mode); + +yson_writer* +yson_writer_new_pretty_text( + yson_output_stream* stream, + yson_stream_type mode, + size_t indent); + +void yson_writer_delete(yson_writer* writer); + +const char* +yson_writer_get_error_message(yson_writer* writer); + +yson_writer_result +yson_writer_write_begin_stream(yson_writer* writer); + +yson_writer_result +yson_writer_write_end_stream(yson_writer* writer); + +yson_writer_result +yson_writer_write_begin_list(yson_writer* writer); + +yson_writer_result +yson_writer_write_end_list(yson_writer* writer); + +yson_writer_result +yson_writer_write_begin_map(yson_writer* writer); + +yson_writer_result +yson_writer_write_end_map(yson_writer* writer); + +yson_writer_result +yson_writer_write_begin_attributes(yson_writer* writer); + +yson_writer_result +yson_writer_write_end_attributes(yson_writer* writer); + +yson_writer_result +yson_writer_write_entity(yson_writer* writer); + +yson_writer_result +yson_writer_write_key(yson_writer* writer, const char* ptr, size_t length); + +yson_writer_result +yson_writer_write_string(yson_writer* writer, const char* ptr, size_t length); + +yson_writer_result +yson_writer_write_int64(yson_writer* writer, i64 value); + +yson_writer_result +yson_writer_write_uint64(yson_writer* writer, ui64 value); + +yson_writer_result +yson_writer_write_boolean(yson_writer* writer, int value); + +yson_writer_result +yson_writer_write_float64(yson_writer* writer, double value); + +#ifdef __cplusplus +} /* extern "C" */ +#endif diff --git a/library/c/cyson/ya.make b/library/c/cyson/ya.make new 
file mode 100644 index 0000000000..c42965a094 --- /dev/null +++ b/library/c/cyson/ya.make @@ -0,0 +1,11 @@ +LIBRARY() + +PEERDIR( + library/cpp/yson_pull +) + +SRCS( + cyson.cpp +) + +END() diff --git a/library/python/cyson/README.md b/library/python/cyson/README.md new file mode 100644 index 0000000000..5cc67f0d2c --- /dev/null +++ b/library/python/cyson/README.md @@ -0,0 +1,99 @@ +# Python-биндинги к libyson + +## Высокоуровневый интерфейс + +Функции `dumps`, `loads` для преобразования строк: +```python +>>> from cyson import dumps, loads + +>>> print dumps(1234) +1234 +>>> print dumps("Hello world! Привет!") +"Hello world! Привет!" +>>> print dumps([1, "foo", None, {'aaa': 'bbb'}]) +[1; "foo"; #; {"aaa" = "bbb"}] +>>> dumps([1, "foo", None, {'aaa': 'bbb'}], format='binary') +'[\x02\x02;\x01\x06foo;#;{\x01\x06aaa=\x01\x06bbb}]' +>>> print dumps([1, "foo", None, {'aaa': 'bbb'}], format='pretty') +[ + 1; + "foo"; + #; + { + "aaa" = "bbb" + } +] + +>>> loads('1234') +1234 +>>> loads('3.14') +3.14 +>>> loads('[1; "foo"; #; {"aaa" = "bbb"}]') +[1, 'foo', None, {'aaa': 'bbb'}] +>>> loads('[\x02\x02;\x01\x06foo;#;{\x01\x06aaa=\x01\x06bbb}]') +[1, 'foo', None, {'aaa': 'bbb'}] +``` + +Функции `list_fragments`, `map_fragments` для удобного чтения из входного +потока. +```python +import cyson + +input = cyson.list_fragments( + cyson.InputStream.from_fd(STDIN_FILENO), + process_table_index=True, +) + +for record in input: + ... +``` + +## Низкоуровневый интерфейс + +### Адаптеры потоков ввода-вывода + +Классы `InputStream`, `OutputStream` не предоставляют никакой функциональности +сами по себе, но позволяют подключить поток ввода/вывода к Reader или Writer. 
+ +Конструкторы классов - статические методы с именами `from_*`: + +```python +input = cyson.InputStream.from_fd(0) +input = cyson.InputStream.from_string("...") +input = cyson.InputStream.from_iter(iter_chunks) + +output = cyson.OutputStream.from_fd(1) +output = cyson.OutputStream.from_file(stringio_file) +``` + +### Reader/Writer + +`Reader` - самый быстрый метод десериализации, и в целом позволяет получать +объекты привычных и ожидаемых типов. При отсутствии атрибутов, порождает +встроенные типы, иначе - `Yson*`. Не позволяет различать `list`/`tuple`, или +получать на входе `set`. + +`Writer` позволяет выводить низкоуровневые элементы потока, или сериализовать +объекты. Для сериализации объектов следует использовать метод `write()`. + +### StrictReader + +`StrictReader` отличается от Reader тем, что всегда создает объекты типа +`Yson*`, независимо от наличия атрибутов. + +Никакое специальное поведение при записи в таком случае не требуется, так что +вместе с ним можно использовать обычный `Writer`. + +### PyReader/PyWriter + +Пара для сериализации-десериализации произвольных python-типов. Тип кодируется +атрибутом `py` у значения. + +Поддержка дополнительных типов добавляется с помощью декораторов +`pywriter_handler`, `pyreader_scalar_handler`, `pyreader_list_handler`, +`pyreader_map_handler`. + +### UnicodeReader + +Декодирует все строки в юникод. Удобен при работе с `python3`, но может +ухудшить производительность. 
diff --git a/library/python/cyson/cyson/__init__.py b/library/python/cyson/cyson/__init__.py new file mode 100644 index 0000000000..5e0cac2241 --- /dev/null +++ b/library/python/cyson/cyson/__init__.py @@ -0,0 +1,17 @@ +# -*- coding: utf-8 -*- + +from __future__ import print_function, absolute_import, division + +from ._cyson import * # noqa + +__all__ = [ + 'loads', 'dumps', 'dumps_into', + 'list_fragments', 'key_switched_list_fragments', 'map_fragments', + 'InputStream', 'OutputStream', + 'Reader', 'Writer', + 'PyReader', 'PyWriter', + 'StrictReader', + 'YsonEntity', 'YsonString', 'YsonInt64', 'YsonUInt64', + 'YsonFloat64', 'YsonBoolean', 'YsonList', 'YsonMap', + 'UInt', 'UnicodeReader', +] diff --git a/library/python/cyson/cyson/_cyson.pyx b/library/python/cyson/cyson/_cyson.pyx new file mode 100644 index 0000000000..ff538b121f --- /dev/null +++ b/library/python/cyson/cyson/_cyson.pyx @@ -0,0 +1,2148 @@ +# -*- coding: utf-8 -*- +#cython: embedsignature=True +#cython: infer_types=True + +cimport cython +cimport libc.stdio +cimport libc.string +cimport libcyson as C + +from libc.stdint cimport uint64_t, int64_t +from libc.stddef cimport size_t + +cimport cpython.pycapsule + +from cpython.bytearray cimport ( + PyByteArray_FromStringAndSize, + PyByteArray_Resize, + PyByteArray_AS_STRING, + PyByteArray_GET_SIZE, +) +from cpython.bytes cimport ( + PyBytes_Check, + PyBytes_FromStringAndSize, + PyBytes_AS_STRING, + PyBytes_GET_SIZE, +) +from cpython.dict cimport PyDict_Next, PyDict_Copy, PyDict_Check +from cpython.int cimport PyInt_FromLong +from cpython.long cimport PyLong_AsUnsignedLong +from cpython.list cimport PyList_GET_SIZE, PyList_GET_ITEM +from cpython.object cimport ( + PyObject, PyTypeObject, + PyObject_GetIter, PyObject_GetItem, PyObject_CallMethod, +) +from cpython.ref cimport Py_DECREF +from cpython.tuple cimport PyTuple_GET_SIZE, PyTuple_GET_ITEM +from cpython.unicode cimport ( + PyUnicode_AsUTF8String, PyUnicode_DecodeUTF8, PyUnicode_Check +) + + +cdef 
extern from "Python.h": + # Actually returns a new reference (if an iterator isn't exhausted), but + # returns NULL without setting an exception for exhausted iterator (so + # return type isn't `object`). This declaration requires manual `Py_DECREF` + # calls (cython thinks the return value is a borrowed reference). + PyObject* PyIter_Next(object) except? NULL + + +cdef extern from "library/python/cyson/cyson/helpers.h": + libc.stdio.FILE* PyFile_AsFile(object) except NULL + bint PyFile_CheckExact(object) + bint GenericCheckBuffer(object) + + +cdef extern from "library/python/cyson/cyson/helpers.h" namespace "NCYson": + bint PY3 + bint PY2 + + object ConvertPyStringToPyNativeString(object) + bytes ConvertPyStringToPyBytes(object) + object GetCharBufferAndOwner(object, const char**, size_t*) + bytes ConvertPyLongToPyBytes(object) + + +cdef extern from "library/python/cyson/cyson/unsigned_long.h" namespace "NCYson": + PyTypeObject PyUnsignedLong_Type + + type PreparePyUIntType(object) + object ConstructPyNumberFromUint(uint64_t) + + +cdef struct PycStringIO_CAPI: + int (*cread)(object, char **, Py_ssize_t) except -1 + int (*creadline)(object, char **) except -1 + int (*cwrite)(object, const char *, Py_ssize_t) except -1 + bytes (*cgetvalue)(object) + object (*NewOutput)(int) + object (*NewInput)(object) + void* InputType + void* OutputType + + +# set numpy aliases for possible use +cdef np = None +cdef np_import_failed = None +cdef npy_generic = None +cdef npy_integers = None +cdef npy_uintegers = None +cdef npy_floats = None + + +cdef PycStringIO_CAPI* cStringIO_CAPI +if PY2: + cStringIO_CAPI = <PycStringIO_CAPI*>cpython.pycapsule.PyCapsule_Import( + "cStringIO.cStringIO_CAPI", 0 + ) + + +class UnableToSerializeError(TypeError): + def __init__(self, value): + super(UnableToSerializeError, self).__init__(value) + self.value = value + + def __str__(self): + return ( + "Unable to serialize an object of type {!r}: {!r}" + .format(type(self.value), self.value) + ) + + 
+def _yson_repr(obj): + return ConvertPyStringToPyNativeString(dumps(obj)) + + +UInt = PreparePyUIntType(_yson_repr) + + +class YsonEntity(object): + def __init__(self, attributes=None): + self.attributes = attributes + + def __repr__(self): + return _yson_repr(self) + + def __eq__(self, other): + return type(other) is YsonEntity and self.attributes == other.attributes + + +class YsonString(bytes): + def __new__(cls, value, attributes=None): + self = bytes.__new__(cls, ConvertPyStringToPyBytes(value)) + self.attributes = attributes + return self + + def __repr__(self): + return _yson_repr(self) + + +class YsonInt64(int): + def __new__(cls, value, attributes=None): + self = int.__new__(cls, value) + self.attributes = attributes + return self + + def __repr__(self): + return _yson_repr(self) + + +class YsonUInt64(long): + def __new__(cls, value, attributes=None): + self = long.__new__(cls, value) + self.attributes = attributes + return self + + def __repr__(self): + return _yson_repr(self) + + +class YsonFloat64(float): + def __new__(cls, value, attributes=None): + self = float.__new__(cls, value) + self.attributes = attributes + return self + + def __repr__(self): + return _yson_repr(self) + + +class YsonBoolean(int): + def __new__(cls, value, attributes=None): + self = int.__new__(cls, bool(value)) + self.attributes = attributes + return self + + def __repr__(self): + return _yson_repr(self) + + +class YsonList(list): + def __new__(cls, value, attributes=None): + self = list.__new__(cls, value) + self.attributes = attributes + return self + + def __repr__(self): + return _yson_repr(self) + + +class YsonMap(dict): + def __new__(cls, value, attributes=None): + self = dict.__new__(cls, value) + self.attributes = attributes + return self + + def __repr__(self): + return _yson_repr(self) + + +# Input + +@cython.freelist(16) +cdef class InputStream: + """YSON input stream adaptor. + + Provides means for YSON Reader to read raw bytes from some stream. 
+ + A proper way to construct an InputStream instance is to use + one of its static constructor methods + (:meth:`from_string`, :meth:`from_file`, etc). + + """ + + cdef object capsule + cdef object data + + def __cinit__(self, capsule, data=None): + self.capsule = capsule + self.data = data + + cdef C.yson_input_stream* ptr(self): + return _c_open_yson_input_stream_capsule(self.capsule) + + @staticmethod + def from_string(value): + """Read from a contiguous memory buffer. + + :param value: Input memory buffer: any object implementing char buffer protocol (i.e. str/bytes). + + """ + + cdef const char* data + cdef size_t size + cdef object holder = GetCharBufferAndOwner(value, &data, &size) + + capsule = _c_make_yson_input_stream_capsule( + C.yson_input_stream_from_string(data, size) + ) + + return InputStream.__new__(InputStream, capsule, holder) + + @staticmethod + def from_file(file_obj, int buffer_size=65536): + """Read from an arbitrary file-like object. + + Two special cases are available within CPython: + - Builtin python file objects (effectively ``FILE*`` wrappers) + - :class:`cStringIO.StringIO` objects + + In other cases ``file_obj`` is required to have a read() method + accepting integer argument. 
+ + """ + + cdef libc.stdio.FILE* c_file + + # Special case for cStringIO streams, both types support reading + t = type(file_obj) + if cStringIO_CAPI and ( + t is <object>cStringIO_CAPI.InputType or + t is <object>cStringIO_CAPI.OutputType + ): + yson_input_stream = C.yson_input_stream_new( + <void*>file_obj, + _c_yson_input_stream_cstringio_read, + ) + # Special case for building file objects (effectively FILE* wrappers) + elif PyFile_CheckExact(file_obj): + yson_input_stream = C.yson_input_stream_from_file( + PyFile_AsFile(file_obj), buffer_size + ) + # General case: use Python read(<size>) method + else: + file_obj = _ReadableHolder.__new__( + _ReadableHolder, file_obj, buffer_size + ) + yson_input_stream = C.yson_input_stream_new( + <void*>file_obj, + _c_yson_input_stream_generic_read, + ) + + return InputStream.__new__( + InputStream, + _c_make_yson_input_stream_capsule(yson_input_stream), + file_obj, + ) + + @staticmethod + def from_fd(int fd, int buffer_size=65536): + """Read from a POSIX file descriptor. + + :param fd: File descriptor number. + :param buffer_size: InputStream internal buffer size. + + """ + + capsule = _c_make_yson_input_stream_capsule( + C.yson_input_stream_from_fd(fd, buffer_size) + ) + return InputStream.__new__(InputStream, capsule) + + @staticmethod + def from_iter(iter_obj): + """Read from a chunked input stream. + + :param iter_obj: An iterator which yields a series + of contiguous memory buffer objects. + + >>> from cyson import InputStream, Reader + >>> def stream(): + ... yield '[' + ... yield '123;' + ... yield 'hel' + ... yield 'lo;' + ... yield 'world;' + ... yield '#;' + ... yield ']' + ... 
+ >>> Reader(InputStream.from_iter(stream()), mode='node').node() + [123, 'hello', 'world', None] + + """ + + holder = _IteratorHolder.__new__(_IteratorHolder, iter_obj) + capsule = _c_make_yson_input_stream_capsule( + C.yson_input_stream_new(<void*> holder, _c_yson_input_stream_next) + ) + return InputStream.__new__(InputStream, capsule, holder) + + +cdef void _c_destroy_yson_input_stream_capsule(object capsule): + cdef C.yson_input_stream* c_stream + c_stream = _c_open_yson_input_stream_capsule(capsule) + C.yson_input_stream_delete(c_stream) + + +cdef inline C.yson_input_stream* _c_open_yson_input_stream_capsule(object capsule): + return <C.yson_input_stream*> cpython.pycapsule.PyCapsule_GetPointer( + capsule, + "yson_input_stream" + ) + + +cdef inline object _c_make_yson_input_stream_capsule(C.yson_input_stream* c_stream): + return cpython.pycapsule.PyCapsule_New( + c_stream, + "yson_input_stream", + _c_destroy_yson_input_stream_capsule, + ) + + +cdef class _IteratorHolder: + cdef object iter_ + cdef object last_buffer_holder_ + + def __cinit__(self, obj): + self.iter_ = obj + self.last_buffer_holder_ = None + + cdef inline get_next_buffer(self, const char** data, size_t* size): + self.last_buffer_holder_ = GetCharBufferAndOwner(next(self.iter_), data, size) + + +cdef class _ReadableHolder: + cdef object fileobj_ + cdef object chunk_size_ + cdef size_t c_chunk_size_ + cdef object last_buffer_holder_ + + def __cinit__(self, fileobj, chunk_size): + self.fileobj_ = fileobj + self.chunk_size_ = chunk_size + self.c_chunk_size_ = <size_t>PyLong_AsUnsignedLong(chunk_size) + + cdef inline read_chunk(self, const char** data, size_t* size): + self.last_buffer_holder_ = GetCharBufferAndOwner( + self.fileobj_.read(self.chunk_size_), data, size + ) + + +cdef C.yson_input_stream_result _c_yson_input_stream_next( + void* ctx, const char** ptr, size_t* length +) except C.YSON_INPUT_STREAM_RESULT_ERROR: + try: + (<_IteratorHolder>ctx).get_next_buffer(ptr, length) + except 
StopIteration: + return C.YSON_INPUT_STREAM_RESULT_EOF + else: + return C.YSON_INPUT_STREAM_RESULT_OK + + +cdef C.yson_input_stream_result _c_yson_input_stream_cstringio_read( + void* ctx, const char** ptr, size_t* length +) except C.YSON_INPUT_STREAM_RESULT_ERROR: + """Callback for reading from cStringIO.StringIO objects.""" + + nread = cStringIO_CAPI.cread(<object>ctx, <char**>ptr, -1) + length[0] = nread + + if nread > 0: + return C.YSON_INPUT_STREAM_RESULT_OK + if nread == 0: + return C.YSON_INPUT_STREAM_RESULT_EOF + else: + return C.YSON_INPUT_STREAM_RESULT_ERROR + + +cdef C.yson_input_stream_result _c_yson_input_stream_generic_read( + void* ctx, const char** ptr, size_t* length +) except C.YSON_INPUT_STREAM_RESULT_ERROR: + """Callback for reading from arbitrary Python file-like objects.""" + + cdef _ReadableHolder holder = <_ReadableHolder>ctx + + holder.read_chunk(ptr, length) + + if length[0] > holder.c_chunk_size_: + raise RuntimeError( + 'reading inconsistency: {} bytes were read, but only {} requested' + .format(length[0], holder.chunk_size_) + ) + + if length[0] > 0: + return C.YSON_INPUT_STREAM_RESULT_OK + elif length[0] == 0: + return C.YSON_INPUT_STREAM_RESULT_EOF + else: + return C.YSON_INPUT_STREAM_RESULT_ERROR + + +# Output + +@cython.freelist(16) +cdef class OutputStream: + """YSON output stream adaptor. + + Provides means for YSON Writer to write raw bytes to some stream. + + A proper way to construct an InputStream instance is to use + one of its static constructor methods + (:meth:`from_file`, :meth:`from_fd`). + + To write into a string, use :meth:`to_file` with + a :class:`cStringIO.StringIO` instance. + + To write into a custom stream, provide your object + with ``write()`` method. 
+ + """ + + cdef object capsule + cdef object data + + def __cinit__(self, capsule, data = None): + self.capsule = capsule + self.data = data + + cdef C.yson_output_stream* ptr(self): + return _c_open_yson_output_stream_capsule(self.capsule) + + @staticmethod + def from_file(file_obj, int buffer_size=65536): + """Write to an arbitrary file-like object. + + A file object is required to have a write() method accepting str/bytes + arguments. + + Two special cases are available within CPython: + - Builtin python file objects (effectively ``FILE*`` wrappers). + - :class:`cStringIO.StringIO` objects. + + For these cases, special optimized implementations are used. + + """ + + cdef libc.stdio.FILE* c_file + + # Special case for cStringIO streams + if cStringIO_CAPI and type(file_obj) is <object> cStringIO_CAPI.OutputType: + capsule = _c_make_yson_output_stream_capsule( + C.yson_output_stream_new( + <void*> file_obj, + _c_yson_output_stream_cstringio_write, + buffer_size + ) + ) + # Special case for builting file objects (effectively FILE* wrappers) + elif PyFile_CheckExact(file_obj): + c_file = PyFile_AsFile(file_obj) + capsule = _c_make_yson_output_stream_capsule( + C.yson_output_stream_from_file( + c_file, + buffer_size + ) + ) + # General case: use python write() method + else: + capsule = _c_make_yson_output_stream_capsule( + C.yson_output_stream_new( + <void*> file_obj, + _c_yson_output_stream_write, + buffer_size + ) + ) + + return OutputStream.__new__(OutputStream, capsule, file_obj) + + @staticmethod + def from_fd(int fd, int buffer_size=65536): + """Write to a POSIX file descriptor. + + :param fd: File descriptor number. + :param buffer_size: OutputStream internal buffer size. 
+ + """ + + capsule = _c_make_yson_output_stream_capsule( + C.yson_output_stream_from_fd(fd, buffer_size) + ) + return OutputStream.__new__(OutputStream, capsule) + + @staticmethod + def from_bytearray(bytearray dest, int buffer_size=0): + capsule = _c_make_yson_output_stream_capsule( + C.yson_output_stream_new( + <void*> dest, + _c_yson_output_stream_bytearray_write, + buffer_size + ) + ) + return OutputStream.__new__(OutputStream, capsule, dest) + + +cdef void _c_destroy_yson_output_stream_capsule(object capsule): + cdef C.yson_output_stream* c_stream + c_stream = _c_open_yson_output_stream_capsule(capsule) + C.yson_output_stream_delete(c_stream) + + +cdef inline C.yson_output_stream* _c_open_yson_output_stream_capsule(object capsule): + return <C.yson_output_stream*> cpython.pycapsule.PyCapsule_GetPointer( + capsule, + "yson_output_stream" + ) + + +cdef inline object _c_make_yson_output_stream_capsule(C.yson_output_stream* c_stream): + return cpython.pycapsule.PyCapsule_New( + c_stream, + "yson_output_stream", + _c_destroy_yson_output_stream_capsule, + ) + + +cdef C.yson_output_stream_result _c_yson_output_stream_write( + void* ctx, + const char* ptr, + size_t length +) except C.YSON_OUTPUT_STREAM_RESULT_ERROR: + """Callback for writing into arbitrary Python file objects.""" + + obj = <object> ctx + data = PyBytes_FromStringAndSize(ptr, length) + obj.write(data) + return C.YSON_OUTPUT_STREAM_RESULT_OK + + +cdef C.yson_output_stream_result _c_yson_output_stream_cstringio_write( + void* ctx, + const char* ptr, + size_t length +) except C.YSON_OUTPUT_STREAM_RESULT_ERROR: + """Callback for writing into cStringIO.StringIO objects.""" + + obj = <object> ctx + cStringIO_CAPI.cwrite(obj, ptr, length); + return C.YSON_OUTPUT_STREAM_RESULT_OK + + +cdef C.yson_output_stream_result _c_yson_output_stream_bytearray_write( + void* ctx, + const char* ptr, + size_t length +) except C.YSON_OUTPUT_STREAM_RESULT_ERROR: + """Callback for writing into bytearray objects.""" + + cdef 
bytearray obj = <bytearray>ctx + cdef size_t old_length = PyByteArray_GET_SIZE(obj) + + PyByteArray_Resize(obj, old_length + length) + libc.string.memcpy(PyByteArray_AS_STRING(obj) + old_length, ptr, length) + + return C.YSON_OUTPUT_STREAM_RESULT_OK + + +# Reader + +cdef class ListFragmentIterator + +@cython.freelist(16) +cdef class Reader: + cdef C.yson_reader* c_reader + cdef InputStream stream + + def __cinit__(self, InputStream stream not None, mode=b'node'): + """Create a YSON Reader for reading from ``stream``. + + Attributes on values are ignored. + + :param stream: Input stream object. + :param mode: Input stream shape: 'node', 'list_frament', 'map_fragment'. + + """ + cdef C.yson_stream_type stream_type + + cdef bytes bytes_mode = ConvertPyStringToPyBytes(mode) + + if bytes_mode == b'node': + stream_type = C.YSON_STREAM_TYPE_NODE + elif bytes_mode == b'list_fragment': + stream_type = C.YSON_STREAM_TYPE_LIST_FRAGMENT + elif bytes_mode == b'map_fragment': + stream_type = C.YSON_STREAM_TYPE_MAP_FRAGMENT + else: + raise ValueError("Invalid reader mode {!r}".format(bytes_mode)) + + self.c_reader = C.yson_reader_new( + stream.ptr(), + stream_type + ) + self.stream = stream + + def __dealloc__(self): + C.yson_reader_delete(self.c_reader) + + cdef _scalar_handler(self, value, dict attributes): + if value is None: + return YsonEntity(attributes) + return value + + cdef _list_handler(self, ListFragmentIterator items, dict attributes): + return YsonList(items) + + cdef _map_handler(self, MapFragmentIterator items, dict attributes): + return YsonMap(items) + + cdef _read_object(self, C.yson_event_type event_type): + return _reader_read_object(self, event_type) + + def node(self): + """Read whole input stream as a single YSON node. 
+ + >>> from cyson import InputStream, Reader + + >>> s = '{key=1;value=foo}' + >>> r = Reader(InputStream.from_string(s), mode='node') + >>> r.node() == {'value': 'foo', 'key': 1} + True + + Invalid stream shape results in exception: + + >>> s = '{key=1;value=foo}; {key=2; value=bar}' + >>> r = Reader(InputStream.from_string(s), mode='node') + >>> r.node() + Traceback (most recent call last): + ... + ValueError: Invalid YSON at offset 17: Expected stream end, but found ";" + + """ + + cdef C.yson_event_type event_type + + event_type = _c_yson_reader_get_next_event(self.c_reader) + assert event_type == C.YSON_EVENT_BEGIN_STREAM + + event_type = _c_yson_reader_get_next_event(self.c_reader) + result = self._read_object(event_type) + + event_type = _c_yson_reader_get_next_event(self.c_reader) + assert event_type == C.YSON_EVENT_END_STREAM + + return result + + def list_fragments( + self, process_table_index=False, process_attributes=False, + stop_at_key_switch=False, keep_control_records=False + ): + """Iterate over input stream as a sequence of objects. + + >>> from cyson import InputStream, Reader + + >>> s = '{key=1;value=foo}; {key=2; value=bar}' + >>> r = Reader(InputStream.from_string(s), mode='list_fragment') + >>> l = list(r.list_fragments()) + >>> l == [{'value': 'foo', 'key': 1}, {'value': 'bar', 'key': 2}] + True + + """ + + cdef C.yson_event_type event_type + + event_type = _c_yson_reader_get_next_event(self.c_reader) + assert event_type == C.YSON_EVENT_BEGIN_STREAM + + # NOTE: for backward compatibility purpose + process_attributes = process_attributes or process_table_index + + return ListFragmentIterator( + self, + C.YSON_EVENT_END_STREAM, + process_attributes, + stop_at_key_switch, + keep_control_records, + ) + + def map_fragments(self): + """Iterate over input stream as a sequence of (key, value) pairs. 
+ + >>> from cyson import InputStream, Reader + + >>> s = 'a=b; c=d; e=[1;3;4]' + >>> r = Reader(InputStream.from_string(s), mode='map_fragment') + >>> list(r.map_fragments()) + [('a', 'b'), ('c', 'd'), ('e', [1, 3, 4])] + + """ + + cdef C.yson_event_type event_type + + event_type = _c_yson_reader_get_next_event(self.c_reader) + assert event_type == C.YSON_EVENT_BEGIN_STREAM + + return MapFragmentIterator(self, C.YSON_EVENT_END_STREAM) + + +@cython.freelist(16) +cdef class ListFragmentIterator: + cdef Reader reader + cdef readonly bint at_begin + cdef readonly bint at_end + cdef bint stop_at_key_switch + cdef C.yson_event_type end_event + cdef bint process_attributes + cdef bint keep_control_records + cdef readonly int table_index + cdef int64_t row_index_base + cdef int64_t row_index_offset + cdef int64_t range_index + cdef readonly bint is_key_switched + + def __cinit__( + self, + Reader reader not None, + C.yson_event_type end_event, + bint process_attributes, + bint stop_at_key_switch, + bint keep_control_records + ): + self.reader = reader + self.at_begin = True + self.at_end = False + self.end_event = end_event + self.process_attributes = process_attributes + self.stop_at_key_switch = stop_at_key_switch + self.keep_control_records = keep_control_records + self.table_index = 0 + self.row_index_base = -1 + self.row_index_offset = -1 + self.range_index = -1 + self.is_key_switched = False + + def __iter__(self): + return self + + cpdef close(self): + if not self.at_end: + for _ in self: + pass + self.at_end = True + + def __next__(self): + cdef C.yson_event_type event_type + cdef bint was_at_begin = self.at_begin + + if self.at_end: + raise StopIteration + + self.at_begin = False + self.is_key_switched = False + + event_type = _c_yson_reader_get_next_event(self.reader.c_reader) + if event_type == self.end_event: + self.at_end = True + raise StopIteration + + value = self.reader._read_object(event_type) + if self.process_attributes and isinstance(value, 
YsonEntity): + self.do_process_attributes(value.attributes) + + if not was_at_begin and self.stop_at_key_switch and self.is_key_switched: + raise StopIteration + + if self.keep_control_records: + return value + + return self.__next__() + else: + self.row_index_offset += 1 + return value + + property row_index: + def __get__(self): + if self.row_index_base >= 0: + return self.row_index_base + self.row_index_offset + + property range_index: + def __get__(self): + if self.range_index >= 0: + return self.range_index + + cdef do_process_attributes(self, dict attributes): + table_index = attributes.get(b'table_index') + if table_index is not None: + self.table_index = table_index + + row_index = attributes.get(b'row_index') + if row_index is not None: + self.row_index_base = row_index + self.row_index_offset = -1 + + range_index = attributes.get(b'range_index') + if range_index is not None: + self.range_index = range_index + + key_switch = attributes.get(b'key_switch') + if key_switch is not None: + self.is_key_switched = key_switch + + +cdef class MapFragmentIterator: + cdef Reader reader + cdef bint at_end + cdef C.yson_event_type end_event + + def __cinit__(self, + Reader reader not None, + C.yson_event_type end_event): + self.reader = reader + self.at_end = False + self.end_event = end_event + + def __iter__(self): + return self + + cpdef close(self): + if not self.at_end: + for _ in self: + pass + self.at_end = True + + def __next__(self): + cdef C.yson_event_type event_type + + if self.at_end: + raise StopIteration + + event_type = _c_yson_reader_get_next_event(self.reader.c_reader) + if event_type == self.end_event: + self.at_end = True + raise StopIteration + + key = _c_yson_reader_get_byte_string(self.reader.c_reader) + value = self.reader._read_object( + _c_yson_reader_get_next_event(self.reader.c_reader) + ) + return key, value + + +cdef inline bytes _c_yson_reader_get_error(C.yson_reader* c_reader): + return <bytes> C.yson_reader_get_error_message(c_reader) + 
+ +cdef inline C.yson_event_type _c_yson_reader_get_next_event( + C.yson_reader* c_reader +) except C.YSON_EVENT_ERROR: + cdef C.yson_event_type event_type + + event_type = C.yson_reader_get_next_event(c_reader) + # A propagated exception would have fired earlier + if event_type == C.YSON_EVENT_ERROR: + raise ValueError(_c_yson_reader_get_error(c_reader)) + return event_type + + +cdef object _c_yson_reader_get_scalar(C.yson_reader* c_reader): + cdef C.yson_scalar_type scalar_type + + scalar_type = C.yson_reader_get_scalar_type(c_reader) + if scalar_type == C.YSON_SCALAR_ENTITY: + return None + elif scalar_type == C.YSON_SCALAR_BOOLEAN: + return C.yson_reader_get_boolean(c_reader) + elif scalar_type == C.YSON_SCALAR_INT64: + return PyInt_FromLong(C.yson_reader_get_int64(c_reader)) + elif scalar_type == C.YSON_SCALAR_UINT64: + return ConstructPyNumberFromUint(C.yson_reader_get_uint64(c_reader)) + elif scalar_type == C.YSON_SCALAR_FLOAT64: + return C.yson_reader_get_float64(c_reader) + elif scalar_type == C.YSON_SCALAR_STRING: + return _c_yson_reader_get_byte_string(c_reader) + + +cdef inline bytes _c_yson_reader_get_byte_string(C.yson_reader* c_reader): + cdef const C.yson_string* ref + + ref = C.yson_reader_get_string(c_reader) + return PyBytes_FromStringAndSize(ref.ptr, ref.length) + + +cdef list _reader_read_list(Reader reader): + cdef C.yson_event_type event_type + + result = [] + while True: + event_type = _c_yson_reader_get_next_event(reader.c_reader) + if event_type == C.YSON_EVENT_END_LIST: + return result + else: + result.append(_reader_read_object(reader, event_type)) + + +cdef dict _reader_read_map( + Reader reader, + C.yson_event_type end = C.YSON_EVENT_END_MAP, +): + cdef C.yson_event_type event_type + + result = {} + while True: + event_type = _c_yson_reader_get_next_event(reader.c_reader) + if event_type == end: + return result + else: + key = _c_yson_reader_get_byte_string(reader.c_reader) + value = _reader_read_object( + reader, + 
_c_yson_reader_get_next_event(reader.c_reader) + ) + result[key] = value + + +cdef _reader_read_object( + Reader reader, + C.yson_event_type event_type, +): + if event_type == C.YSON_EVENT_SCALAR: + return _c_yson_reader_get_scalar(reader.c_reader) + + elif event_type == C.YSON_EVENT_BEGIN_LIST: + return _reader_read_list(reader) + + elif event_type == C.YSON_EVENT_BEGIN_MAP: + return _reader_read_map(reader) + + elif event_type == C.YSON_EVENT_BEGIN_ATTRIBUTES: + attributes = _reader_read_map(reader, C.YSON_EVENT_END_ATTRIBUTES) + event_type = _c_yson_reader_get_next_event(reader.c_reader) + return _reader_read_object_with_attributes(reader, event_type, attributes) + + +cdef _reader_read_object_with_attributes( + Reader reader, + C.yson_event_type event_type, + dict attributes, +): + cdef ListFragmentIterator l_items + cdef MapFragmentIterator m_items + + if event_type == C.YSON_EVENT_SCALAR: + return reader._scalar_handler( + _c_yson_reader_get_scalar(reader.c_reader), + attributes + ) + + elif event_type == C.YSON_EVENT_BEGIN_LIST: + l_items = ListFragmentIterator.__new__( + ListFragmentIterator, + reader, + C.YSON_EVENT_END_LIST, + False, + False, + False + ) + result = reader._list_handler(l_items, attributes) + l_items.close() + return result + + elif event_type == C.YSON_EVENT_BEGIN_MAP: + m_items = MapFragmentIterator.__new__( + MapFragmentIterator, + reader, + C.YSON_EVENT_END_MAP, + ) + result = reader._map_handler(m_items, attributes) + m_items.close() + return result + + +# StrictReader + +cdef class StrictReader(Reader): + + cdef _read_object(self, C.yson_event_type event_type): + return _strict_reader_read_object(self, event_type) + + +cdef object _c_yson_reader_get_scalar_strict(C.yson_reader* c_reader): + cdef C.yson_scalar_type scalar_type + + scalar_type = C.yson_reader_get_scalar_type(c_reader) + if scalar_type == C.YSON_SCALAR_ENTITY: + return YsonEntity() + elif scalar_type == C.YSON_SCALAR_BOOLEAN: + return 
YsonBoolean(C.yson_reader_get_boolean(c_reader)) + elif scalar_type == C.YSON_SCALAR_INT64: + return YsonInt64(C.yson_reader_get_int64(c_reader)) + elif scalar_type == C.YSON_SCALAR_UINT64: + return YsonUInt64(C.yson_reader_get_uint64(c_reader)) + elif scalar_type == C.YSON_SCALAR_FLOAT64: + return YsonFloat64(C.yson_reader_get_float64(c_reader)) + elif scalar_type == C.YSON_SCALAR_STRING: + return YsonString(_c_yson_reader_get_byte_string(c_reader)) + + +cdef _strict_reader_read_list(Reader reader): + cdef C.yson_event_type event_type + + result = YsonList([]) + while True: + event_type = _c_yson_reader_get_next_event(reader.c_reader) + if event_type == C.YSON_EVENT_END_LIST: + return result + else: + result.append(_strict_reader_read_object(reader, event_type)) + + +cdef _strict_reader_read_map( + Reader reader, + C.yson_event_type end = C.YSON_EVENT_END_MAP, +): + cdef C.yson_event_type event_type + + result = YsonMap({}) + while True: + event_type = _c_yson_reader_get_next_event(reader.c_reader) + if event_type == end: + return result + else: + key = _c_yson_reader_get_byte_string(reader.c_reader) + value = _strict_reader_read_object( + reader, + _c_yson_reader_get_next_event(reader.c_reader) + ) + result[key] = value + + +cdef _strict_reader_read_object( + Reader reader, + C.yson_event_type event_type, +): + if event_type == C.YSON_EVENT_SCALAR: + return _c_yson_reader_get_scalar_strict(reader.c_reader) + + elif event_type == C.YSON_EVENT_BEGIN_LIST: + return _strict_reader_read_list(reader) + + elif event_type == C.YSON_EVENT_BEGIN_MAP: + return _strict_reader_read_map(reader) + + elif event_type == C.YSON_EVENT_BEGIN_ATTRIBUTES: + attributes = _strict_reader_read_map(reader, C.YSON_EVENT_END_ATTRIBUTES) + event_type = _c_yson_reader_get_next_event(reader.c_reader) + obj = _strict_reader_read_object(reader, event_type) + obj.attributes = PyDict_Copy(attributes) + return obj + + +# UnicodeDecodeReader + +cdef class UnicodeReader(Reader): + + cdef 
_read_object(self, C.yson_event_type event_type): + return _unicode_reader_read_object(self, event_type) + + +cdef inline unicode _c_yson_reader_get_unicode_string(C.yson_reader* c_reader): + cdef const C.yson_string* ref + + ref = C.yson_reader_get_string(c_reader) + return PyUnicode_DecodeUTF8(ref.ptr, ref.length, NULL) + + +cdef object _c_yson_unicode_reader_get_scalar(C.yson_reader* c_reader): + cdef C.yson_scalar_type scalar_type + + scalar_type = C.yson_reader_get_scalar_type(c_reader) + if scalar_type == C.YSON_SCALAR_ENTITY: + return None + elif scalar_type == C.YSON_SCALAR_BOOLEAN: + return C.yson_reader_get_boolean(c_reader) + elif scalar_type == C.YSON_SCALAR_INT64: + return PyInt_FromLong(C.yson_reader_get_int64(c_reader)) + elif scalar_type == C.YSON_SCALAR_UINT64: + return ConstructPyNumberFromUint(C.yson_reader_get_uint64(c_reader)) + elif scalar_type == C.YSON_SCALAR_FLOAT64: + return C.yson_reader_get_float64(c_reader) + elif scalar_type == C.YSON_SCALAR_STRING: + return _c_yson_reader_get_unicode_string(c_reader) + + +cdef list _unicode_reader_read_list(Reader reader): + cdef C.yson_event_type event_type + + result = [] + while True: + event_type = _c_yson_reader_get_next_event(reader.c_reader) + if event_type == C.YSON_EVENT_END_LIST: + return result + else: + result.append(_unicode_reader_read_object(reader, event_type)) + + +cdef dict _unicode_reader_read_map( + Reader reader, + C.yson_event_type end = C.YSON_EVENT_END_MAP, +): + cdef C.yson_event_type event_type + + result = {} + while True: + event_type = _c_yson_reader_get_next_event(reader.c_reader) + if event_type == end: + return result + else: + key = _c_yson_reader_get_unicode_string(reader.c_reader) + value = _unicode_reader_read_object( + reader, + _c_yson_reader_get_next_event(reader.c_reader), + ) + result[key] = value + + +cdef _unicode_reader_read_object_with_attributes( + Reader reader, + C.yson_event_type event_type, + dict attributes, +): + cdef ListFragmentIterator l_items + 
cdef MapFragmentIterator m_items + + if event_type == C.YSON_EVENT_SCALAR: + return reader._scalar_handler( + _c_yson_unicode_reader_get_scalar(reader.c_reader), + attributes + ) + + elif event_type == C.YSON_EVENT_BEGIN_LIST: + l_items = ListFragmentIterator.__new__( + ListFragmentIterator, + reader, + C.YSON_EVENT_END_LIST, + False, + False, + False + ) + result = reader._list_handler(l_items, attributes) + l_items.close() + return result + + elif event_type == C.YSON_EVENT_BEGIN_MAP: + m_items = MapFragmentIterator.__new__( + MapFragmentIterator, + reader, + C.YSON_EVENT_END_MAP, + ) + result = reader._map_handler(m_items, attributes) + m_items.close() + return result + + +cdef _unicode_reader_read_object( + Reader reader, + C.yson_event_type event_type, +): + if event_type == C.YSON_EVENT_SCALAR: + return _c_yson_unicode_reader_get_scalar(reader.c_reader) + + elif event_type == C.YSON_EVENT_BEGIN_LIST: + return _unicode_reader_read_list(reader) + + elif event_type == C.YSON_EVENT_BEGIN_MAP: + return _unicode_reader_read_map(reader) + + elif event_type == C.YSON_EVENT_BEGIN_ATTRIBUTES: + attributes = _reader_read_map(reader, C.YSON_EVENT_END_ATTRIBUTES) + event_type = _c_yson_reader_get_next_event(reader.c_reader) + return _unicode_reader_read_object_with_attributes( + reader, event_type, attributes) + + +# Writer + +@cython.freelist(16) +cdef class Writer: + cdef C.yson_writer* c_writer + cdef OutputStream stream + + def __cinit__( + self, + OutputStream stream not None, + format=b'text', + mode=b'node', + int indent=4 + ): + cdef C.yson_stream_type stream_type + + cdef bytes bytes_mode = ConvertPyStringToPyBytes(mode) + cdef bytes bytes_format = ConvertPyStringToPyBytes(format) + + if bytes_mode == b'node': + stream_type = C.YSON_STREAM_TYPE_NODE + elif bytes_mode == b'list_fragment': + stream_type = C.YSON_STREAM_TYPE_LIST_FRAGMENT + elif bytes_mode == b'map_fragment': + stream_type = C.YSON_STREAM_TYPE_MAP_FRAGMENT + else: + raise ValueError("Invalid writer 
mode {!r}".format(bytes_mode)) + + if bytes_format == b'text': + self.c_writer = C.yson_writer_new_text( + stream.ptr(), + stream_type, + ) + elif bytes_format == b'pretty': + self.c_writer = C.yson_writer_new_pretty_text( + stream.ptr(), + stream_type, + indent, + ) + elif bytes_format == b'binary': + self.c_writer = C.yson_writer_new_binary( + stream.ptr(), + stream_type, + ) + else: + raise ValueError("Bad YSON format {!r}".format(bytes_format)) + + self.stream = stream + + def __dealloc__(self): + C.yson_writer_delete(self.c_writer) + + def begin_stream(self): + _c_writer_begin_stream(self.c_writer) + return self + + def end_stream(self): + _c_writer_end_stream(self.c_writer) + return self + + def begin_list(self): + _c_writer_begin_list(self.c_writer) + return self + + def end_list(self): + _c_writer_end_list(self.c_writer) + return self + + def begin_map(self): + _c_writer_begin_map(self.c_writer) + return self + + def end_map(self): + _c_writer_end_map(self.c_writer) + return self + + def begin_attributes(self): + _c_writer_begin_attributes(self.c_writer) + return self + + def end_attributes(self): + _c_writer_end_attributes(self.c_writer) + return self + + def entity(self): + _c_writer_entity(self.c_writer) + return self + + def key(self, value): + _c_writer_key(self.c_writer, value) + return self + + def string(self, value): + if isinstance(value, bytes): + _c_writer_bytes(self.c_writer, <bytes>value) + elif isinstance(value, unicode): + _c_writer_unicode(self.c_writer, <unicode>value) + else: + _c_writer_string(self.c_writer, value) + + return self + + def int64(self, int64_t value): + _c_writer_int64(self.c_writer, value) + return self + + def uint64(self, uint64_t value): + _c_writer_uint64(self.c_writer, value) + return self + + def boolean(self, bint value): + _c_writer_boolean(self.c_writer, value) + return self + + def float64(self, double value): + _c_writer_float64(self.c_writer, value) + return self + + def attributes(self, attrs): + 
_c_writer_begin_attributes(self.c_writer) + + if isinstance(attrs, dict): + self._dict_common(<dict>attrs) + else: + self._mapping_common(attrs) + + _c_writer_end_attributes(self.c_writer) + + return self + + def list(self, obj): + _c_writer_begin_list(self.c_writer) + for item in obj: + self.write(item) + _c_writer_end_list(self.c_writer) + return self + + def map(self, obj): + _c_writer_begin_map(self.c_writer) + + if isinstance(obj, dict): + self._dict_common(<dict>obj) + else: + self._mapping_common(obj) + + _c_writer_end_map(self.c_writer) + + return self + + cpdef write(self, obj): + _c_writer_write(self.c_writer, obj) + return self + + def switch_table(self, int index): + self.write(YsonEntity({'table_index': index})) + return self + + cdef inline _dict_common(self, dict obj): + cdef PyObject* key + cdef PyObject* value + cdef Py_ssize_t pos = 0 + + while PyDict_Next(obj, &pos, &key, &value): + _c_writer_key(self.c_writer, <object>key) + self.write(<object>value) + + cdef inline _mapping_common(self, obj): + for key in obj.keys(): + _c_writer_key(self.c_writer, key) + self.write(obj[key]) + + +cdef inline int _c_writer_check_exc(C.yson_writer_result result) except? 
0: + if result == C.YSON_WRITER_RESULT_OK: + return 1 + return 0 + + +cdef inline _c_writer_check( + C.yson_writer* c_writer, + C.yson_writer_result result +): + _c_writer_check_exc(result) + if result == C.YSON_WRITER_RESULT_BAD_STREAM: + raise RuntimeError(C.yson_writer_get_error_message(c_writer)) + elif result == C.YSON_WRITER_RESULT_ERROR: + raise IOError(C.yson_writer_get_error_message(c_writer)) + + +cdef _c_writer_begin_stream(C.yson_writer* c_writer): + _c_writer_check( + c_writer, + C.yson_writer_write_begin_stream(c_writer) + ) + + +cdef _c_writer_end_stream(C.yson_writer* c_writer): + _c_writer_check( + c_writer, + C.yson_writer_write_end_stream(c_writer) + ) + + +cdef _c_writer_begin_list(C.yson_writer* c_writer): + _c_writer_check( + c_writer, + C.yson_writer_write_begin_list(c_writer) + ) + + +cdef _c_writer_end_list(C.yson_writer* c_writer): + _c_writer_check( + c_writer, + C.yson_writer_write_end_list(c_writer) + ) + + +cdef _c_writer_begin_map(C.yson_writer* c_writer): + _c_writer_check( + c_writer, + C.yson_writer_write_begin_map(c_writer) + ) + + +cdef _c_writer_end_map(C.yson_writer* c_writer): + _c_writer_check( + c_writer, + C.yson_writer_write_end_map(c_writer) + ) + + +cdef _c_writer_begin_attributes(C.yson_writer* c_writer): + _c_writer_check( + c_writer, + C.yson_writer_write_begin_attributes(c_writer) + ) + + +cdef _c_writer_end_attributes(C.yson_writer* c_writer): + _c_writer_check( + c_writer, + C.yson_writer_write_end_attributes(c_writer) + ) + + +cdef _c_writer_entity(C.yson_writer* c_writer): + _c_writer_check( + c_writer, + C.yson_writer_write_entity(c_writer) + ) + + +# fallback if key is not bytes or unicode +cdef _c_writer_key(C.yson_writer* c_writer, value): + cdef const char* data + cdef size_t size + cdef object holder = GetCharBufferAndOwner(value, &data, &size) + + _c_writer_check( + c_writer, + C.yson_writer_write_key(c_writer, data, size) + ) + + +# fallback if string-like object is not bytes or unicode +cdef 
_c_writer_string(C.yson_writer* c_writer, value): + cdef const char* data + cdef size_t size + cdef object holder = GetCharBufferAndOwner(value, &data, &size) + + _c_writer_check( + c_writer, + C.yson_writer_write_string(c_writer, data, size) + ) + + +cdef _c_writer_key_bytes(C.yson_writer* c_writer, bytes value): + _c_writer_check( + c_writer, + C.yson_writer_write_key( + c_writer, + PyBytes_AS_STRING(value), + PyBytes_GET_SIZE(value), + ) + ) + + +cdef inline _c_writer_key_unicode(C.yson_writer* c_writer, unicode value): + _c_writer_key_bytes(c_writer, PyUnicode_AsUTF8String(value)) + + +cdef _c_writer_bytes(C.yson_writer* c_writer, bytes value): + _c_writer_check( + c_writer, + C.yson_writer_write_string( + c_writer, + PyBytes_AS_STRING(value), + PyBytes_GET_SIZE(value), + ) + ) + + +cdef inline _c_writer_unicode(C.yson_writer* c_writer, unicode value): + _c_writer_bytes(c_writer, PyUnicode_AsUTF8String(value)) + + +cdef _c_writer_int64(C.yson_writer* c_writer, int64_t value): + _c_writer_check( + c_writer, + C.yson_writer_write_int64(c_writer, value) + ) + + +cdef _c_writer_uint64(C.yson_writer* c_writer, uint64_t value): + _c_writer_check( + c_writer, + C.yson_writer_write_uint64(c_writer, value) + ) + + +cdef _c_writer_boolean(C.yson_writer* c_writer, bint value): + _c_writer_check( + c_writer, + C.yson_writer_write_boolean(c_writer, value) + ) + + +cdef _c_writer_float64(C.yson_writer* c_writer, double value): + _c_writer_check( + c_writer, + C.yson_writer_write_float64(c_writer, value) + ) + + +cdef _c_writer_list(C.yson_writer* c_writer, list value): + cdef Py_ssize_t index + + _c_writer_begin_list(c_writer) + + for index in range(PyList_GET_SIZE(value)): + _c_writer_write(c_writer, <object>PyList_GET_ITEM(value, index)) + + _c_writer_end_list(c_writer) + + +cdef _c_writer_tuple(C.yson_writer* c_writer, tuple value): + cdef Py_ssize_t index + + _c_writer_begin_list(c_writer) + + for index in range(PyTuple_GET_SIZE(value)): + _c_writer_write(c_writer, 
<object>PyTuple_GET_ITEM(value, index)) + + _c_writer_end_list(c_writer) + + +cdef inline _c_writer_dict_common(C.yson_writer* c_writer, dict mapping): + cdef PyObject* key + cdef PyObject* value + cdef Py_ssize_t pos = 0 + + while PyDict_Next(mapping, &pos, &key, &value): + if PyBytes_Check(<object>key): + _c_writer_key_bytes(c_writer, <bytes>key) + elif PyUnicode_Check(<object>key): + _c_writer_key_unicode(c_writer, <unicode>key) + else: + _c_writer_key(c_writer, <object>key) + + _c_writer_write(c_writer, <object>value) + + +# fallback if attributes is not dict +cdef inline _c_writer_mapping_common(C.yson_writer* c_writer, mapping): + cdef object keys_iter = PyObject_GetIter(PyObject_CallMethod(mapping, "keys", NULL)) + cdef PyObject* key = PyIter_Next(keys_iter) + + while key: + if PyBytes_Check(<object>key): + _c_writer_key_bytes(c_writer, <bytes>key) + elif PyUnicode_Check(<object>key): + _c_writer_key_unicode(c_writer, <unicode>key) + else: + _c_writer_key(c_writer, <object>key) + + _c_writer_write(c_writer, PyObject_GetItem(mapping, <object>key)) + + # See note in `PyIter_Next` declaration + Py_DECREF(<object>key) + + key = PyIter_Next(keys_iter) + + +cdef _c_writer_dict(C.yson_writer* c_writer, dict value): + _c_writer_begin_map(c_writer) + _c_writer_dict_common(c_writer, value) + _c_writer_end_map(c_writer) + + +cdef _c_writer_attributes(C.yson_writer* c_writer, value): + _c_writer_begin_attributes(c_writer) + + if PyDict_Check(value): + _c_writer_dict_common(c_writer, <dict>value) + else: + _c_writer_mapping_common(c_writer, value) + + _c_writer_end_attributes(c_writer) + + +cdef _c_writer_write(C.yson_writer* c_writer, value): + t = type(value) + + if t is bytes: + _c_writer_bytes(c_writer, <bytes>value) + elif t is int: + _c_writer_int64(c_writer, value) + elif t is dict: + _c_writer_dict(c_writer, <dict>value) + elif t is tuple: + _c_writer_tuple(c_writer, <tuple>value) + elif t is list: + _c_writer_list(c_writer, <list>value) + elif t is float: + 
_c_writer_float64(c_writer, value) + elif t is type(None): + _c_writer_entity(c_writer) + elif PY2 and t is long: + _c_writer_uint64(c_writer, value) + elif t is <type>&PyUnsignedLong_Type: + _c_writer_uint64(c_writer, value) + elif t is unicode: + _c_writer_unicode(c_writer, <unicode>value) + else: + _c_writer_write_fallback(c_writer, value) + + +cdef _c_writer_write_fallback(C.yson_writer* c_writer, obj): + if isinstance(obj, (YsonEntity, YsonString, YsonInt64, YsonUInt64, + YsonFloat64, YsonBoolean, YsonList, YsonMap)) and \ + obj.attributes is not None: + _c_writer_attributes(c_writer, obj.attributes) + + if isinstance(obj, (bytes, unicode, YsonString)): + _c_writer_string(c_writer, obj) + elif isinstance(obj, (bool, YsonBoolean)): + _c_writer_boolean(c_writer, obj) + elif isinstance(obj, YsonUInt64): + _c_writer_uint64(c_writer, obj) + elif isinstance(obj, int): + _c_writer_int64(c_writer, obj) + elif PY2 and isinstance(obj, long): + _c_writer_uint64(c_writer, obj) + elif isinstance(obj, float): + _c_writer_float64(c_writer, obj) + elif isinstance(obj, (type(None), YsonEntity)): + _c_writer_entity(c_writer) + elif isinstance(obj, list): + _c_writer_list(c_writer, <list>obj) + elif isinstance(obj, tuple): + _c_writer_tuple(c_writer, <tuple>obj) + elif isinstance(obj, dict): + _c_writer_dict(c_writer, <dict>obj) + else: + if np_import_failed is None: + load_numpy_symbols() + + if np is not None and isinstance(obj, npy_generic): + if isinstance(obj, npy_integers): + _c_writer_int64(c_writer, obj) + elif isinstance(obj, npy_uintegers): + _c_writer_uint64(c_writer, obj) + elif isinstance(obj, npy_floats): + _c_writer_float64(c_writer, obj) + else: + raise UnableToSerializeError(obj) + elif GenericCheckBuffer(obj): + _c_writer_string(c_writer, obj) + else: + raise UnableToSerializeError(obj) + + +cdef void load_numpy_symbols() except *: + global np + global np_import_failed + + try: + import numpy as np + np_import_failed = False + except ImportError: + 
np_import_failed = True + return + + global npy_generic + npy_generic = np.generic + + global npy_integers + npy_integers = (np.int8, np.int16, np.int32, np.int64) + + global npy_uintegers + npy_uintegers = (np.uint8, np.uint16, np.uint32, np.uint64) + + global npy_floats + npy_floats = (np.float16, np.float32, np.float64) + + +# PyReader + +DEFAULT_PYREADER_SCALAR_HANDLERS = {} +DEFAULT_PYREADER_LIST_HANDLERS = {} +DEFAULT_PYREADER_MAP_HANDLERS = {} + + +cdef class PyReader(Reader): + cdef public dict scalar_handlers + cdef public dict list_handlers + cdef public dict map_handlers + + def __cinit__( + self, + InputStream stream not None, + mode=b'node', + scalar_handlers=DEFAULT_PYREADER_SCALAR_HANDLERS, + list_handlers=DEFAULT_PYREADER_LIST_HANDLERS, + map_handlers=DEFAULT_PYREADER_MAP_HANDLERS, + ): + self.scalar_handlers = scalar_handlers + self.list_handlers = list_handlers + self.map_handlers = map_handlers + + + cdef _scalar_handler(self, value, dict attributes): + handler = _pyreader_find_handler(attributes, self.scalar_handlers, b'scalar') + if handler is None: + return self._generic_scalar_handler(value, attributes) + else: + return handler(value) + + cdef _list_handler(self, ListFragmentIterator items, dict attributes): + handler = _pyreader_find_handler(attributes, self.list_handlers, b'list') + if handler is None: + return self._generic_list_handler(items, attributes) + else: + return handler(items) + + cdef _map_handler(self, MapFragmentIterator items, dict attributes): + handler = _pyreader_find_handler(attributes, self.map_handlers, b'map') + if handler is None: + return self._generic_map_handler(items, attributes) + else: + return handler(items) + + cdef _generic_scalar_handler(self, value, dict attributes): + return value + + cdef _generic_list_handler(self, ListFragmentIterator items, dict attributes): + return list(items) + + cdef _generic_map_handler(self, MapFragmentIterator items, dict attributes): + return dict(items) + + +def 
pyreader_scalar_handler(py_type): + def wrapper(function): + DEFAULT_PYREADER_SCALAR_HANDLERS[py_type] = function + return function + return wrapper + + +def pyreader_list_handler(py_type): + def wrapper(function): + DEFAULT_PYREADER_LIST_HANDLERS[py_type] = function + return function + return wrapper + + +def pyreader_map_handler(py_type): + def wrapper(function): + DEFAULT_PYREADER_MAP_HANDLERS[py_type] = function + return function + return wrapper + + +cdef _pyreader_find_handler(dict attributes, dict handlers, bytes type): + py_type = attributes.get(b'py') + if py_type is None: + return None + + handler = handlers.get(py_type) + if handler is None: + raise ValueError("No {} handler for {}".format(type, py_type)) + + return handler + + +@pyreader_scalar_handler(b'unicode') +def _pyreader_read_unicode(bytes value not None): + return PyUnicode_DecodeUTF8( + PyBytes_AS_STRING(value), PyBytes_GET_SIZE(value), NULL + ) + + +@pyreader_list_handler(b'dict') +def _pyreader_read_dict(ListFragmentIterator items not None): + result = {} + try: + while True: + key = next(items) + value = next(items) + result[key] = value + except StopIteration: + pass + return result + + +pyreader_scalar_handler(b'long')(long) + +pyreader_list_handler(b'list')(list) +pyreader_list_handler(b'tuple')(tuple) +pyreader_list_handler(b'set')(set) +pyreader_list_handler(b'frozenset')(frozenset) + + +# PyWriter + +DEFAULT_PYWRITER_HANDLERS = {} + +cdef class PyWriter(Writer): + cdef public dict handlers + + def __cinit__( + self, + OutputStream stream not None, + format=b'text', + mode=b'node', + int indent=4, + handlers=DEFAULT_PYWRITER_HANDLERS + ): + self.handlers = handlers + + cpdef py_type(self, bytes name): + _c_pywriter_py_type(self.c_writer, name) + return self + + cpdef write(self, obj): + handler = self.handlers.get(type(obj)) + if handler is None: + raise UnableToSerializeError(obj) + + handler(self, obj) + return self + +def pywriter_handler(type_): + def wrapper(function): + 
DEFAULT_PYWRITER_HANDLERS[type_] = function + return function + + return wrapper + + +cdef _c_pywriter_py_type(C.yson_writer* c_writer, bytes name): + _c_writer_begin_attributes(c_writer) + _c_writer_key_bytes(c_writer, b'py') + _c_writer_bytes(c_writer, name) + _c_writer_end_attributes(c_writer) + + +@pywriter_handler(type(None)) +def _pywriter_write_none(PyWriter writer, _): + _c_writer_entity(writer.c_writer) + + +@pywriter_handler(bool) +def _pywriter_write_bool(PyWriter writer, bint obj): + _c_writer_boolean(writer.c_writer, obj) + + +if PY2: + @pywriter_handler(int) + def _pywriter_write_int(PyWriter writer, int64_t obj): + _c_writer_int64(writer.c_writer, obj) + + +@pywriter_handler(long) +def _pywriter_write_long(PyWriter writer, obj not None): + _c_pywriter_py_type(writer.c_writer, b'long') + _c_writer_bytes(writer.c_writer, ConvertPyLongToPyBytes(obj)) + + +@pywriter_handler(float) +def _pywriter_write_float(PyWriter writer, double obj): + _c_writer_float64(writer.c_writer, obj) + + +@pywriter_handler(bytes) +def _pywriter_write_bytes(PyWriter writer, bytes obj not None): + _c_writer_bytes(writer.c_writer, obj) + + +@pywriter_handler(unicode) +def _pywriter_write_unicode(PyWriter writer, unicode obj not None): + _c_pywriter_py_type(writer.c_writer, b'unicode') + _c_writer_unicode(writer.c_writer, obj) + + +cdef inline _pywriter_write_iterable(PyWriter writer, obj): + _c_writer_begin_list(writer.c_writer) + for item in obj: + writer.write(item) + _c_writer_end_list(writer.c_writer) + + +@pywriter_handler(list) +def _pywriter_write_list(PyWriter writer, list obj not None): + _c_pywriter_py_type(writer.c_writer, b'list') + #_write_iterable(writer, obj) + # inline manually for type monomorphization + _c_writer_begin_list(writer.c_writer) + for item in obj: + writer.write(item) + _c_writer_end_list(writer.c_writer) + + +@pywriter_handler(tuple) +def _pywriter_write_tuple(PyWriter writer, tuple obj not None): + _c_pywriter_py_type(writer.c_writer, b'tuple') + # 
_write_iterable(writer, obj) + # inline manually for type monomorphization + _c_writer_begin_list(writer.c_writer) + for item in obj: + writer.write(item) + _c_writer_end_list(writer.c_writer) + + +@pywriter_handler(set) +def _pywriter_write_set(PyWriter writer, set obj not None): + _c_pywriter_py_type(writer.c_writer, b'set') + _pywriter_write_iterable(writer, obj) + + +@pywriter_handler(frozenset) +def _pywriter_write_frozenset(PyWriter writer, frozenset obj not None): + _c_pywriter_py_type(writer.c_writer, b'frozenset') + _pywriter_write_iterable(writer, obj) + + +@pywriter_handler(dict) +def _pywriter_write_dict(PyWriter writer, dict obj not None): + cdef bint good_keys + cdef PyObject* c_key + cdef PyObject* c_value + cdef Py_ssize_t c_pos = 0 + + # Check whether all keys are strings + good_keys = True + while PyDict_Next(obj, &c_pos, &c_key, &c_value): + if not isinstance(<object>c_key, bytes): + good_keys = False + break + + c_pos = 0 + + if good_keys: + # All keys are strings, can use YSON map form + _c_writer_begin_map(writer.c_writer) + while PyDict_Next(obj, &c_pos, &c_key, &c_value): + _c_writer_key_bytes(writer.c_writer, <bytes>c_key) + writer.write(<object>c_value) + _c_writer_end_map(writer.c_writer) + else: + # Some keys are not strings, need to write dictionary as a list + _c_pywriter_py_type(writer.c_writer, b'dict') + _c_writer_begin_list(writer.c_writer) + while PyDict_Next(obj, &c_pos, &c_key, &c_value): + writer.write(<object>c_key) + writer.write(<object>c_value) + _c_writer_end_list(writer.c_writer) + + +# Simple API + +@cython.returns(bytes) +def dumps(value, format=b'text', Writer not None=Writer): + r"""Convert an object to YSON node string. + + :param value: Python object to convert. + :param format: YSON format to use: 'binary', 'text' or 'pretty'. + :param Writer: YSON Writer class, may be supplied for custom serialization policies. + + >>> from cyson import dumps + + >>> print dumps(1234) + 1234 + >>> print dumps("Hello world! 
Привет!") + "Hello world! Привет!" + >>> print dumps([1, "foo", None, {'aaa': 'bbb'}]) + [1; "foo"; #; {"aaa" = "bbb"}] + >>> dumps([1, "foo", None, {'aaa': 'bbb'}], format='binary') + '[\x02\x02;\x01\x06foo;#;{\x01\x06aaa=\x01\x06bbb}]' + >>> print dumps([1, "foo", None, {'aaa': 'bbb'}], format='pretty') + [ + 1; + "foo"; + #; + { + "aaa" = "bbb" + } + ] + + """ + + sink = PyByteArray_FromStringAndSize(NULL, 0) + writer = Writer( + OutputStream.from_bytearray(sink, 200), + format + ) + writer.begin_stream().write(value).end_stream() + + return PyBytes_FromStringAndSize( + PyByteArray_AS_STRING(sink), PyByteArray_GET_SIZE(sink) + ) + + +def dumps_into(bytearray dest, value, format=b'text', Writer not None=Writer): + r"""Convert an object to YSON node string. + + :param dest: Destination bytearray. + :param value: Python object to convert. + :param format: YSON format to use: 'binary', 'text' or 'pretty'. + :param Writer: YSON Writer class, may be supplied for custom serialization policies. + + >>> dest = bytearray() + >>> dumps_into(dest, [1, "foo", None, {'aaa': 'bbb'}]) + >>> dest + bytearray(b'[1; "foo"; #; {"aaa" = "bbb"}]') + + """ + + writer = Writer( + OutputStream.from_bytearray(dest, 200), + format + ) + writer.begin_stream().write(value).end_stream() + + +@cython.returns(object) +def loads(value, Reader not None=Reader): + r"""Convert a YSON node string to Python object. + + :param value: YSON node string. + :param Reader: YSON Reader class, may be supplied for custom serialization policies. 
+ + >>> from cyson import loads + + >>> loads('1234') + 1234 + >>> loads('3.14') + 3.14 + >>> loads('[1; "foo"; #; {"aaa" = "bbb"}]') + [1, 'foo', None, {'aaa': 'bbb'}] + >>> loads('[\x02\x02;\x01\x06foo;#;{\x01\x06aaa=\x01\x06bbb}]') + [1, 'foo', None, {'aaa': 'bbb'}] + + """ + + reader = Reader( + InputStream.from_string(value) + ) + + return reader.node() + + +def list_fragments(InputStream stream not None, + Reader not None=Reader, + bint process_table_index=False, + bint process_attributes=False, + bint stop_at_key_switch=False, + bint keep_control_records=False): + + reader = Reader(stream, b'list_fragment') + + # NOTE: for backward compatibility purpose + process_attributes = process_attributes or process_table_index + + return reader.list_fragments( + process_attributes=process_attributes, + stop_at_key_switch=stop_at_key_switch, + keep_control_records=keep_control_records, + ) + + +cdef inline void exhaust_key_switched_iterator(ListFragmentIterator iterator): + if not (iterator.is_key_switched or iterator.at_end): + for _ in iterator: + pass + + +def _make_first_group_iterator(first_value, ListFragmentIterator iterator): + yield first_value + + if not (iterator.is_key_switched or iterator.at_end): + for item in iterator: + yield item + + +def key_switched_list_fragments( + InputStream stream not None, Reader not None=Reader +): + cdef ListFragmentIterator iterator = list_fragments( + stream, + Reader, + process_attributes=True, + stop_at_key_switch=True, + keep_control_records=False, + ) + + cdef PyObject* first_value = PyIter_Next(iterator) + + if not first_value: + return + + yield _make_first_group_iterator(<object>first_value, iterator) + + Py_DECREF(<object>first_value) + + # manually iterate over unused records in group + exhaust_key_switched_iterator(iterator) + + while not iterator.at_end: + yield iterator + + # manually iterate over unused records in group + exhaust_key_switched_iterator(iterator) + + +def map_fragments(InputStream stream not 
None, Reader not None=Reader): + reader = Reader(stream, b'map_fragment') + return reader.map_fragments() diff --git a/library/python/cyson/cyson/helpers.cpp b/library/python/cyson/cyson/helpers.cpp new file mode 100644 index 0000000000..ae4a5dd2fa --- /dev/null +++ b/library/python/cyson/cyson/helpers.cpp @@ -0,0 +1,179 @@ +#include "helpers.h" + +#include <util/system/compiler.h> + +namespace NCYson { + void SetPrettyTypeError(PyObject* obj, const char* expected) { +#if PY_MAJOR_VERSION >= 3 + PyObject* bytes_repr = nullptr; + PyObject* tmp = PyObject_Repr(obj); + if (Y_LIKELY(tmp)) { + bytes_repr = PyUnicode_AsUTF8String(tmp); + Py_DECREF(tmp); + } +#else + PyObject* bytes_repr = PyObject_Repr(obj); +#endif + assert(PyBytes_Check(bytes_repr)); + + PyErr_Format( + PyExc_TypeError, + "expected %s, got %s (%s)", + expected, + Py_TYPE(obj)->tp_name, + bytes_repr ? PyBytes_AS_STRING(bytes_repr) : "<repr failed>"); + + Py_XDECREF(bytes_repr); + } + + PyObject* ConvertPyStringToPyBytes(PyObject* obj) { + if (PyBytes_Check(obj)) { + Py_INCREF(obj); + return obj; + } + + if (PyUnicode_Check(obj)) { + return PyUnicode_AsUTF8String(obj); + } + + SetPrettyTypeError(obj, "bytes or unicode"); + + return nullptr; + } + +#define FILL_DATA_FROM_BUFFER \ + *data = (char*)view.buf; \ + *size = (size_t)view.len; \ + PyBuffer_Release(&view) + + PyObject* GetCharBufferAndOwner(PyObject* obj, const char** data, size_t* size) { +#if PY_MAJOR_VERSION >= 3 + Py_buffer view; +#endif + + if (PyUnicode_Check(obj)) { + PyObject* encoded = PyUnicode_AsUTF8String(obj); + if (!encoded) { + return nullptr; + } + +#if PY_MAJOR_VERSION >= 3 + if (PyObject_GetBuffer(encoded, &view, PyBUF_SIMPLE) < 0) { +#else + if (PyObject_AsCharBuffer(encoded, data, (Py_ssize_t*)size) < 0) { +#endif + Py_DECREF(encoded); + return nullptr; + } + +#if PY_MAJOR_VERSION >= 3 + FILL_DATA_FROM_BUFFER; +#endif + + return encoded; + } + +#if PY_MAJOR_VERSION >= 3 + if (PyObject_GetBuffer(obj, &view, PyBUF_SIMPLE) < 0) { 
+#else + if (PyObject_AsCharBuffer(obj, data, (Py_ssize_t*)size) < 0) { +#endif + return nullptr; + } + +#if PY_MAJOR_VERSION >= 3 + FILL_DATA_FROM_BUFFER; +#endif + + Py_INCREF(obj); + + return obj; + } + +#undef FILL_DATA_FROM_BUFFER + + PyObject* ConvertPyStringToPyNativeString(PyObject* obj) { + if (PyBytes_Check(obj)) { +#if PY_MAJOR_VERSION >=3 + return PyUnicode_DecodeUTF8(PyBytes_AS_STRING(obj), PyBytes_GET_SIZE(obj), nullptr); +#else + Py_INCREF(obj); + return obj; +#endif + } + + if (PyUnicode_Check(obj)) { +#if PY_MAJOR_VERSION >=3 + Py_INCREF(obj); + return obj; +#else + return PyUnicode_AsUTF8String(obj); +#endif + } + + SetPrettyTypeError(obj, "bytes or unicode"); + + return nullptr; + } + + PyObject* ConvertPyLongToPyBytes(PyObject* obj) { + PyObject* result; + + if (!PyLong_Check(obj)) { + SetPrettyTypeError(obj, "long"); + return nullptr; + } + +#if PY_MAJOR_VERSION >= 3 + PyObject* tmp = _PyLong_Format(obj, 10); + if (!tmp) { + return nullptr; + } + + result = PyUnicode_AsUTF8String(tmp); + + Py_DECREF(tmp); +#else + result = _PyLong_Format(obj, 10, 0, 0); +#endif + return result; + } + + namespace NPrivate { + TPyObjectPtr::TPyObjectPtr() { + Ptr_ = nullptr; + } + + void TPyObjectPtr::Reset(PyObject* ptr) { + PyObject* tmp = Ptr_; + Py_XINCREF(ptr); + Ptr_ = ptr; + Py_XDECREF(tmp); + } + + PyObject* TPyObjectPtr::GetNew() { + Py_XINCREF(Ptr_); + return Ptr_; + } + + PyObject* TPyObjectPtr::GetBorrowed() { + return Ptr_; + } + + TPyObjectPtr::~TPyObjectPtr() { +#if PY_MAJOR_VERSION >= 3 && PY_MINOR_VERSION >= 7 + if (_Py_IsFinalizing()) { + return; + } +#elif PY_MAJOR_VERSION >= 3 + // https://github.com/python/cpython/blob/3.6/Python/sysmodule.c#L1345 + if (_Py_Finalizing != NULL) { + return; + } +#endif + PyObject* tmp = Ptr_; + Ptr_ = nullptr; + Py_XDECREF(tmp); + } + } +} diff --git a/library/python/cyson/cyson/helpers.h b/library/python/cyson/cyson/helpers.h new file mode 100644 index 0000000000..326e6ffbe1 --- /dev/null +++ 
b/library/python/cyson/cyson/helpers.h @@ -0,0 +1,48 @@ +#pragma once + +#include <Python.h> + +namespace NCYson { + constexpr bool PY3 = PY_MAJOR_VERSION == 3; + constexpr bool PY2 = PY_MAJOR_VERSION == 2; + + void SetPrettyTypeError(PyObject*, const char*); + PyObject* ConvertPyStringToPyBytes(PyObject*); + PyObject* GetCharBufferAndOwner(PyObject*, const char**, size_t*); + PyObject* ConvertPyStringToPyNativeString(PyObject*); + PyObject* ConvertPyLongToPyBytes(PyObject*); + + inline PyObject* GetSelf(PyObject* self) { + Py_INCREF(self); + return self; + } + + namespace NPrivate { + class TPyObjectPtr { + public: + void Reset(PyObject*); + PyObject* GetNew(); + PyObject* GetBorrowed(); + + TPyObjectPtr(); + ~TPyObjectPtr(); + + private: + PyObject* Ptr_; + }; + } +} + +#if PY_MAJOR_VERSION >= 3 +#define GenericCheckBuffer PyObject_CheckBuffer +#define PyFile_CheckExact(x) 0 +#define PyFile_AsFile(x) (FILE*)(PyErr_Format(PyExc_NotImplementedError, "PyFile_AsFile not implemented for Python3")) +#else +#define GenericCheckBuffer PyObject_CheckReadBuffer +#endif + +#if PY_VERSION_HEX < 0x030900A4 && !defined(Py_SET_SIZE) +static inline void _Py_SET_SIZE(PyVarObject *ob, Py_ssize_t size) +{ ob->ob_size = size; } +#define Py_SET_SIZE(ob, size) _Py_SET_SIZE((PyVarObject*)(ob), size) +#endif diff --git a/library/python/cyson/cyson/libcyson.pxd b/library/python/cyson/cyson/libcyson.pxd new file mode 100644 index 0000000000..091870e4bc --- /dev/null +++ b/library/python/cyson/cyson/libcyson.pxd @@ -0,0 +1,116 @@ +from libc.stddef cimport size_t +from libc.stdint cimport uint64_t, int64_t +from libc.stdio cimport FILE + +cdef extern from "<library/c/cyson/cyson.h>": + struct yson_input_stream: + pass + struct yson_output_stream: + pass + + struct yson_reader: + pass + struct yson_writer: + pass + + struct yson_string: + const char* ptr + size_t length + + enum yson_event_type: + YSON_EVENT_BEGIN_STREAM + YSON_EVENT_END_STREAM + YSON_EVENT_BEGIN_LIST + YSON_EVENT_END_LIST + 
YSON_EVENT_BEGIN_MAP + YSON_EVENT_END_MAP + YSON_EVENT_BEGIN_ATTRIBUTES + YSON_EVENT_END_ATTRIBUTES + YSON_EVENT_KEY + YSON_EVENT_SCALAR + YSON_EVENT_ERROR + + enum yson_scalar_type: + YSON_SCALAR_ENTITY + YSON_SCALAR_BOOLEAN + YSON_SCALAR_INT64 + YSON_SCALAR_UINT64 + YSON_SCALAR_FLOAT64 + YSON_SCALAR_STRING + + enum yson_input_stream_result: + YSON_INPUT_STREAM_RESULT_OK + YSON_INPUT_STREAM_RESULT_EOF + YSON_INPUT_STREAM_RESULT_ERROR + + enum yson_output_stream_result: + YSON_OUTPUT_STREAM_RESULT_OK + YSON_OUTPUT_STREAM_RESULT_ERROR + + enum yson_writer_result: + YSON_WRITER_RESULT_OK + YSON_WRITER_RESULT_BAD_STREAM + YSON_WRITER_RESULT_ERROR + + enum yson_stream_type: + YSON_STREAM_TYPE_NODE + YSON_STREAM_TYPE_LIST_FRAGMENT + YSON_STREAM_TYPE_MAP_FRAGMENT + + ctypedef yson_input_stream_result (*yson_input_stream_func)( + void* ctx, + const char** ptr, + size_t* length) except YSON_INPUT_STREAM_RESULT_ERROR + + ctypedef yson_output_stream_result (*yson_output_stream_func)( + void* ctx, + const char* ptr, + size_t length) except YSON_OUTPUT_STREAM_RESULT_ERROR + + yson_input_stream* yson_input_stream_from_string(const char* ptr, size_t length) + yson_input_stream* yson_input_stream_from_file(FILE* file, size_t buffer_size) + yson_input_stream* yson_input_stream_from_fd(int fd, size_t buffer_size) + yson_input_stream* yson_input_stream_new(void* ctx, yson_input_stream_func callback); + void yson_input_stream_delete(yson_input_stream* stream) + + yson_output_stream* yson_output_stream_from_file(FILE* file, size_t buffer_size) + yson_output_stream* yson_output_stream_from_fd(int fd, size_t buffer_size) + yson_output_stream* yson_output_stream_new(void* ctx, yson_output_stream_func callback, size_t buffer_size); + void yson_output_stream_delete(yson_output_stream* stream) + + yson_reader* yson_reader_new(yson_input_stream* stream, yson_stream_type mode) + void yson_reader_delete(yson_reader* reader) + const char* yson_reader_get_error_message(yson_reader* reader) + + 
yson_event_type yson_reader_get_next_event(yson_reader* reader) except? YSON_EVENT_ERROR + yson_scalar_type yson_reader_get_scalar_type(yson_reader* reader) + + bint yson_reader_get_boolean(yson_reader* reader) + int64_t yson_reader_get_int64(yson_reader* reader) + uint64_t yson_reader_get_uint64(yson_reader* reader) + double yson_reader_get_float64(yson_reader* reader) + const yson_string* yson_reader_get_string(yson_reader* reader) + + yson_writer* yson_writer_new_binary(yson_output_stream* stream, yson_stream_type mode) + yson_writer* yson_writer_new_text(yson_output_stream* stream, yson_stream_type mode) + yson_writer* yson_writer_new_pretty_text(yson_output_stream* stream, yson_stream_type mode, size_t indent) + void yson_writer_delete(yson_writer* writer) + const char* yson_writer_get_error_message(yson_writer* writer) + + yson_writer_result yson_writer_write_begin_stream(yson_writer* writer) + yson_writer_result yson_writer_write_end_stream(yson_writer* writer) + yson_writer_result yson_writer_write_begin_list(yson_writer* writer) + yson_writer_result yson_writer_write_end_list(yson_writer* writer) + yson_writer_result yson_writer_write_begin_map(yson_writer* writer) + yson_writer_result yson_writer_write_end_map(yson_writer* writer) + yson_writer_result yson_writer_write_begin_attributes(yson_writer* writer) + yson_writer_result yson_writer_write_end_attributes(yson_writer* writer) + + yson_writer_result yson_writer_write_entity(yson_writer* writer) + yson_writer_result yson_writer_write_key(yson_writer* writer, const char* ptr, size_t length) + yson_writer_result yson_writer_write_string(yson_writer* writer, const char* ptr, size_t length) + yson_writer_result yson_writer_write_int64(yson_writer* writer, int64_t value) + yson_writer_result yson_writer_write_uint64(yson_writer* writer, uint64_t value) + yson_writer_result yson_writer_write_boolean(yson_writer* writer, int value) + yson_writer_result yson_writer_write_float64(yson_writer* writer, double 
value) + diff --git a/library/python/cyson/cyson/unsigned_long.cpp b/library/python/cyson/cyson/unsigned_long.cpp new file mode 100644 index 0000000000..f20b3f106e --- /dev/null +++ b/library/python/cyson/cyson/unsigned_long.cpp @@ -0,0 +1,292 @@ +#include "unsigned_long.h" + +#include "helpers.h" + +#include <util/generic/va_args.h> +#include <util/system/compiler.h> + +#if (PY_MAJOR_VERSION == 2) +#include <longintrepr.h> +#endif + +#if (PY_MAJOR_VERSION >= 3) && defined(Py_LIMITED_API) +#error "limited API for Python3 not supported yet" +#endif + +#if PY_VERSION_HEX >= 0x030800b4 && PY_VERSION_HEX < 0x03090000 +#pragma GCC diagnostic ignored "-Wdeprecated-declarations" +#endif + +#ifndef Py_TPFLAGS_CHECKTYPES +#define Py_TPFLAGS_CHECKTYPES 0 +#endif + +namespace NCYson { + static NPrivate::TPyObjectPtr PyUnsignedLong_ReprPtr; + + static void SetNegativeValueError(PyObject* obj, PyTypeObject* type) { + assert(PyLong_Check(obj)); + assert(Py_SIZE(obj) < 0); + + PyObject* repr = ConvertPyLongToPyBytes(obj); + + PyErr_Format( + PyExc_OverflowError, + "cannot convert negative value (%s) to %s", + Y_LIKELY(repr) ? 
PyBytes_AS_STRING(repr) : "???", + type->tp_name); + + Py_XDECREF(repr); + } + + PyObject* PreparePyUIntType(PyObject* reprfunc) { + if (Y_UNLIKELY(PyType_Ready(&PyUnsignedLong_Type) < 0)) { + return nullptr; + } + + PyUnsignedLong_ReprPtr.Reset(reprfunc); + + Py_INCREF((PyObject*)&PyUnsignedLong_Type); + + return (PyObject*)&PyUnsignedLong_Type; + } + + PyObject* ConstructPyUIntFromPyLong(PyLongObject* obj) { + PyObject *result; + Py_ssize_t index; + Py_ssize_t size; + + assert(PyLong_Check(obj)); + + size = Py_SIZE(obj); + if (size < 0) { + SetNegativeValueError((PyObject*)obj, &PyUnsignedLong_Type); + return nullptr; + } + + result = PyLong_Type.tp_alloc(&PyUnsignedLong_Type, size); + if (Y_UNLIKELY(!result)) { + return nullptr; + } + + assert(IsExactPyUInt(result)); + + Py_SET_SIZE(result, size); + + for (index = 0; index < size; ++index) { + ((PyLongObject*)result)->ob_digit[index] = obj->ob_digit[index]; + } + + return result; + } + + PyObject* ConstructPyUIntFromUint(uint64_t n) { + PyObject* result; + PyObject* tmp; + + tmp = PyLong_FromUnsignedLong(n); + if (Y_UNLIKELY(!tmp)) { + return nullptr; + } + + result = ConstructPyUIntFromPyLong((PyLongObject*)tmp); + + Py_DECREF(tmp); + + return result; + } + + static PyObject* unsigned_long_new(PyTypeObject *type, PyObject *args, PyObject* kws) { + PyObject* result; + + result = PyLong_Type.tp_new(type, args, kws); + if (Y_UNLIKELY(!result)) { + return nullptr; + } + + assert(IsExactPyUInt(result)); + + if (Py_SIZE(result) < 0) { + SetNegativeValueError(result, type); + Py_DECREF(result); + return nullptr; + } + + return result; + } + + static PyObject* unsigned_long_repr(PyObject* self) { + PyObject* result; + + PyObject* callable = PyUnsignedLong_ReprPtr.GetBorrowed(); + + if (callable) { + result = PyObject_CallFunctionObjArgs(callable, self, nullptr); + } else { + result = PyObject_Repr(self); + } + + return result; + } + +#define PYOBJECT_ARG(o) PyObject* o, +#define PYOBJECT_ARG_LAST(o) PyObject* o + 
+#define UNSIGNED_LONG_OPERATION(SLOT, ...) \ + static PyObject* unsigned_long_##SLOT(Y_MAP_ARGS_WITH_LAST(PYOBJECT_ARG, PYOBJECT_ARG_LAST, __VA_ARGS__)) { \ + PyObject* result = PyLong_Type.tp_as_number->nb_##SLOT(__VA_ARGS__); \ + if (result && PyLong_CheckExact(result) && (Py_SIZE(result) >= 0)) { \ + PyObject* tmp = result; \ + result = ConstructPyUIntFromPyLong((PyLongObject*)tmp); \ + Py_DECREF(tmp); \ + } \ + return result; \ + } + + UNSIGNED_LONG_OPERATION(add, x, y); + UNSIGNED_LONG_OPERATION(subtract, x, y); + UNSIGNED_LONG_OPERATION(multiply, x, y); +#if PY_MAJOR_VERSION < 3 + UNSIGNED_LONG_OPERATION(divide, x, y); +#endif + UNSIGNED_LONG_OPERATION(remainder, x, y); + UNSIGNED_LONG_OPERATION(power, x, y, z); + UNSIGNED_LONG_OPERATION(lshift, x, y); + UNSIGNED_LONG_OPERATION(rshift, x, y); + UNSIGNED_LONG_OPERATION(and, x, y); + UNSIGNED_LONG_OPERATION(xor, x, y); + UNSIGNED_LONG_OPERATION(or, x, y); + UNSIGNED_LONG_OPERATION(floor_divide, x, y); + UNSIGNED_LONG_OPERATION(true_divide, x, y); + +#undef UNSIGNED_LONG_OPERATION +#undef PYOBJECT_ARG_LAST +#undef PYOBJECT_ARG + + static PyNumberMethods unsigned_long_as_number = { + unsigned_long_add, /*nb_add*/ + unsigned_long_subtract, /*nb_subtract*/ + unsigned_long_multiply, /*nb_multiply*/ +#if PY_MAJOR_VERSION < 3 + unsigned_long_divide, /*nb_divide*/ +#endif + unsigned_long_remainder, /*nb_remainder*/ + 0, /*nb_divmod*/ + unsigned_long_power, /*nb_power*/ + 0, /*nb_negative*/ + GetSelf, /*nb_positive*/ + GetSelf, /*nb_absolute*/ + 0, /*nb_nonzero*/ + 0, /*nb_invert*/ + unsigned_long_lshift, /*nb_lshift*/ + unsigned_long_rshift, /*nb_rshift*/ + unsigned_long_and, /*nb_and*/ + unsigned_long_xor, /*nb_xor*/ + unsigned_long_or, /*nb_or*/ +#if PY_MAJOR_VERSION < 3 + 0, /*nb_coerce*/ +#endif + 0, /*nb_int*/ +#if PY_MAJOR_VERSION < 3 + 0, /*nb_long*/ +#else + 0, /*reserved*/ +#endif + 0, /*nb_float*/ +#if PY_MAJOR_VERSION < 3 + 0, /*nb_oct*/ + 0, /*nb_hex*/ +#endif + 0, /*nb_inplace_add*/ + 0, 
/*nb_inplace_subtract*/ + 0, /*nb_inplace_multiply*/ +#if PY_MAJOR_VERSION < 3 + 0, /*nb_inplace_divide*/ +#endif + 0, /*nb_inplace_remainder*/ + 0, /*nb_inplace_power*/ + 0, /*nb_inplace_lshift*/ + 0, /*nb_inplace_rshift*/ + 0, /*nb_inplace_and*/ + 0, /*nb_inplace_xor*/ + 0, /*nb_inplace_or*/ + unsigned_long_floor_divide, /*nb_floor_divide*/ + unsigned_long_true_divide, /*nb_true_divide*/ + 0, /*nb_inplace_floor_divide*/ + 0, /*nb_inplace_true_divide*/ + 0, /*nb_index*/ +#if PY_VERSION_HEX >= 0x03050000 + 0, /*nb_matrix_multiply*/ + 0, /*nb_inplace_matrix_multiply*/ +#endif + }; + + PyTypeObject PyUnsignedLong_Type = { + PyVarObject_HEAD_INIT(0, 0) + "cyson._cyson.UInt", /*tp_name*/ + PyLong_Type.tp_basicsize, /*tp_basicsize*/ + PyLong_Type.tp_itemsize, /*tp_itemsize*/ + PyLong_Type.tp_dealloc, /*tp_dealloc*/ +#if PY_VERSION_HEX < 0x030800b4 + 0, /*tp_print*/ +#endif +#if PY_VERSION_HEX >= 0x030800b4 + 0, /*tp_vectorcall_offset*/ +#endif + 0, /*tp_getattr*/ + 0, /*tp_setattr*/ +#if PY_MAJOR_VERSION < 3 + 0, /*tp_compare*/ +#endif +#if PY_MAJOR_VERSION >= 3 + 0, /*tp_as_async*/ +#endif + unsigned_long_repr, /*tp_repr*/ + &unsigned_long_as_number, /*tp_as_number*/ + 0, /*tp_as_sequence*/ + 0, /*tp_as_mapping*/ + 0, /*tp_hash*/ + 0, /*tp_call*/ + 0, /*tp_str*/ + 0, /*tp_getattro*/ + 0, /*tp_setattro*/ + 0, /*tp_as_buffer*/ + Py_TPFLAGS_DEFAULT|Py_TPFLAGS_CHECKTYPES|Py_TPFLAGS_LONG_SUBCLASS, /*tp_flags*/ + "UInt(0) -> UInt\nUInt(x, base=10) -> UInt", /*tp_doc*/ + 0, /*tp_traverse*/ + 0, /*tp_clear*/ + 0, /*tp_richcompare*/ + 0, /*tp_weaklistoffset*/ + 0, /*tp_iter*/ + 0, /*tp_iternext*/ + 0, /*tp_methods*/ + 0, /*tp_members*/ + 0, /*tp_getset*/ + &PyLong_Type, /*tp_base*/ + 0, /*tp_dict*/ + 0, /*tp_descr_get*/ + 0, /*tp_descr_set*/ + 0, /*tp_dictoffset*/ + 0, /*tp_init*/ + 0, /*tp_alloc*/ + unsigned_long_new, /*tp_new*/ + 0, /*tp_free*/ + 0, /*tp_is_gc*/ + 0, /*tp_bases*/ + 0, /*tp_mro*/ + 0, /*tp_cache*/ + 0, /*tp_subclasses*/ + 0, /*tp_weaklist*/ + 0, /*tp_del*/ + 
0, /*tp_version_tag*/ +#if PY_VERSION_HEX >= 0x030400a1 + 0, /*tp_finalize*/ +#endif +#if PY_VERSION_HEX >= 0x030800b1 + 0, /*tp_vectorcall*/ +#endif +#if PY_VERSION_HEX >= 0x030800b4 && PY_VERSION_HEX < 0x03090000 + 0, /*tp_print*/ +#endif + }; +} diff --git a/library/python/cyson/cyson/unsigned_long.h b/library/python/cyson/cyson/unsigned_long.h new file mode 100644 index 0000000000..8f8c8c1da7 --- /dev/null +++ b/library/python/cyson/cyson/unsigned_long.h @@ -0,0 +1,27 @@ +#pragma once + +#include <Python.h> + +#include <stdint.h> + +namespace NCYson { + extern PyTypeObject PyUnsignedLong_Type; + + PyObject* PreparePyUIntType(PyObject* repr = nullptr); + PyObject* ConstructPyUIntFromPyLong(PyLongObject*); + PyObject* ConstructPyUIntFromUint(uint64_t); + + inline int IsExactPyUInt(PyObject* obj) { + return Py_TYPE(obj) == &PyUnsignedLong_Type; + } + +#if PY_MAJOR_VERSION >= 3 + inline PyObject* ConstructPyNumberFromUint(uint64_t n) { + return ConstructPyUIntFromUint(n); + } +#else + inline PyObject* ConstructPyNumberFromUint(uint64_t n) { + return PyLong_FromUnsignedLong(n); + } +#endif +} diff --git a/library/python/cyson/pymodule/ya.make b/library/python/cyson/pymodule/ya.make new file mode 100644 index 0000000000..dd308bd8a0 --- /dev/null +++ b/library/python/cyson/pymodule/ya.make @@ -0,0 +1,21 @@ +PY_ANY_MODULE(_cyson) + +IF (PYTHON_CONFIG MATCHES "python3" OR USE_SYSTEM_PYTHON MATCHES "3.") + PYTHON3_MODULE() +ELSE() + PYTHON2_MODULE() +ENDIF() + +NO_WSHADOW() + +PEERDIR( + library/c/cyson +) + +SRCS( + library/python/cyson/cyson/_cyson.pyx + library/python/cyson/cyson/helpers.cpp + library/python/cyson/cyson/unsigned_long.cpp +) + +END() diff --git a/library/python/cyson/ut/test_control_attributes.py b/library/python/cyson/ut/test_control_attributes.py new file mode 100644 index 0000000000..221542b12d --- /dev/null +++ b/library/python/cyson/ut/test_control_attributes.py @@ -0,0 +1,258 @@ +# -*- coding: utf-8 -*- + +from __future__ import print_function, 
absolute_import, division + +import itertools +from functools import partial + +import pytest +import six + +from cyson import ( + YsonEntity, InputStream, + list_fragments, key_switched_list_fragments, + Reader, UnicodeReader +) + + +def filter_control_records(list): + return [ + _ for _ in list + if not isinstance(_[2], YsonEntity) + ] + + +def canonize(val, as_unicode): + _canonize = partial(canonize, as_unicode=as_unicode) + + if isinstance(val, six.binary_type) and as_unicode: + return val.decode('utf8') + elif isinstance(val, six.text_type) and not as_unicode: + return val.encode('utf8') + elif isinstance(val, (list, tuple)): + return [_canonize(elem) for elem in val] + elif isinstance(val, dict): + return {_canonize(k): _canonize(v) for k, v in val.items()} + return val + + +@pytest.mark.parametrize( + 'reader, as_unicode', [ + [Reader, False], + [UnicodeReader, True], + ], +) +@pytest.mark.parametrize( + 'keep_control_records', [True, False] +) +def test_row_index(keep_control_records, reader, as_unicode): + _ = partial(canonize, as_unicode=as_unicode) + + data = b""" + <row_index=0>#; + {a=1;b=2}; + {a=2;b=3}; + {a=3;b=4}; + <row_index=10000>#; + {a=-1;b=-1}; + {a=-2;b=-2}; + """ + + iter = list_fragments( + stream=InputStream.from_string(data), + Reader=reader, + process_attributes=True, + keep_control_records=keep_control_records, + ) + records = [(iter.range_index, iter.row_index, __) for __ in iter] + + etalon = [ + (None, -1, YsonEntity(attributes={b'row_index': 0})), + (None, 0, _({b'a': 1, b'b': 2})), + (None, 1, _({b'a': 2, b'b': 3})), + (None, 2, _({b'a': 3, b'b': 4})), + (None, 9999, YsonEntity(attributes={b'row_index': 10000})), + (None, 10000, _({b'a': -1, b'b': -1})), + (None, 10001, _({b'a': -2, b'b': -2})), + ] + + if not keep_control_records: + etalon = filter_control_records(etalon) + + assert records == etalon + + +@pytest.mark.parametrize( + 'reader, as_unicode', [ + [Reader, False], + [UnicodeReader, True], + ] +) 
+@pytest.mark.parametrize( + 'keep_control_records', [True, False], +) +@pytest.mark.parametrize( + 'parameter_name', + ['process_attributes', 'process_table_index'] +) +def test_range_index(parameter_name, keep_control_records, reader, as_unicode): + _ = partial(canonize, as_unicode=as_unicode) + + data = b""" + <range_index=2; row_index=0>#; + {a=1;b=2}; + {a=2;b=3}; + {a=3;b=4}; + <range_index=0; row_index=10000>#; + {a=-1;b=-1}; + {a=-2;b=-2}; + """ + + iter = list_fragments( + stream=InputStream.from_string(data), + Reader=reader, + **{parameter_name: True, 'keep_control_records': keep_control_records} + ) + records = [(iter.range_index, iter.row_index, __) for __ in iter] + + etalon = [ + (2, -1, YsonEntity(attributes={b'range_index': 2, b'row_index': 0})), + (2, 0, _({b'a': 1, b'b': 2})), + (2, 1, _({b'a': 2, b'b': 3})), + (2, 2, _({b'a': 3, b'b': 4})), + (0, 9999, YsonEntity(attributes={b'range_index': 0, b'row_index': 10000})), + (0, 10000, _({b'a': -1, b'b': -1})), + (0, 10001, _({b'a': -2, b'b': -2})), + ] + + if not keep_control_records: + etalon = filter_control_records(etalon) + + assert records == etalon + + +@pytest.mark.parametrize( + 'reader, as_unicode', [ + [Reader, False], + [UnicodeReader, True], + ] +) +def test_key_switch_first(reader, as_unicode): + _ = partial(canonize, as_unicode=as_unicode) + + data = b""" + <key_switch=True>#; + {k=1;a=1;b=2}; + {k=1;a=2;b=3}; + {k=1;a=3;b=4}; + <key_switch=True>#; + {k=2;a=-1;b=-1}; + {k=2;a=-2;b=-2}; + """ + + iter = key_switched_list_fragments( + stream=InputStream.from_string(data), + Reader=reader, + ) + records = [list(__) for __ in iter] + + assert records == [ + [ + _({b'k': 1, b'a': 1, b'b': 2}), + _({b'k': 1, b'a': 2, b'b': 3}), + _({b'k': 1, b'a': 3, b'b': 4}), + ], + [ + _({b'k': 2, b'a': -1, b'b': -1}), + _({b'k': 2, b'a': -2, b'b': -2}), + ] + ] + + +@pytest.mark.parametrize( + 'reader, as_unicode', [ + [Reader, False], + [UnicodeReader, True], + ] +) +def test_key_switch_nofirst(reader, 
as_unicode): + _ = partial(canonize, as_unicode=as_unicode) + + data = b""" + {k=1;a=1;b=2}; + {k=1;a=2;b=3}; + {k=1;a=3;b=4}; + <key_switch=True>#; + {k=2;a=-1;b=-1}; + {k=2;a=-2;b=-2}; + """ + + iter = key_switched_list_fragments( + stream=InputStream.from_string(data), + Reader=reader + ) + records = [list(__) for __ in iter] + + assert records == [ + [ + _({b'k': 1, b'a': 1, b'b': 2}), + _({b'k': 1, b'a': 2, b'b': 3}), + _({b'k': 1, b'a': 3, b'b': 4}), + ], + [ + _({b'k': 2, b'a': -1, b'b': -1}), + _({b'k': 2, b'a': -2, b'b': -2}), + ] + ] + + +@pytest.mark.parametrize( + 'reader, as_unicode', [ + [Reader, False], + [UnicodeReader, True], + ] +) +def test_key_switch_exhaust_unused_records(reader, as_unicode): + _ = partial(canonize, as_unicode=as_unicode) + + data = b""" + {k=1;a=1;b=2}; + {k=1;a=2;b=3}; + {k=1;a=3;b=4}; + <key_switch=True>#; + {k=2;a=-1;b=-1}; + {k=2;a=-2;b=-2}; + """ + + iter = key_switched_list_fragments( + stream=InputStream.from_string(data), + Reader=reader, + ) + + records = [] + + for group in iter: + records.append( + list(itertools.islice(group, 2)) + ) + + assert records == [ + [ + _({b'k': 1, b'a': 1, b'b': 2}), + _({b'k': 1, b'a': 2, b'b': 3}), + ], + [ + _({b'k': 2, b'a': -1, b'b': -1}), + _({b'k': 2, b'a': -2, b'b': -2}), + ] + ] + + +@pytest.mark.parametrize('reader', [Reader, UnicodeReader]) +def test_key_switch_empty(reader): + assert list( + key_switched_list_fragments( + stream=InputStream.from_string(""), + Reader=reader, + ) + ) == [] diff --git a/library/python/cyson/ut/test_input_stream.py b/library/python/cyson/ut/test_input_stream.py new file mode 100644 index 0000000000..ae7c0e8f1e --- /dev/null +++ b/library/python/cyson/ut/test_input_stream.py @@ -0,0 +1,82 @@ +# -*- coding: utf-8 -*- + +from __future__ import print_function, absolute_import, division + +import atexit +import io +import os +import tempfile + +import pytest +import six + +from cyson import Reader, InputStream, dumps + + +def prepare_file(string): + 
filepath = tempfile.mktemp() + + with open(filepath, 'wb') as sink: + sink.write(string) + + atexit.register(os.remove, filepath) + + return filepath + + +def prepare_bytesio(string, klass): + obj = klass() + obj.write(b'?:!;*') + obj.write(string) + obj.seek(5) + + return obj + + +def slice_string(string): + index = 0 + length = len(string) + + while index < length: + yield string[index:index + 2] + index += 2 + + +# <method name>, <input constructor> +CASES = ( + ('from_string', lambda x: x), + ('from_iter', slice_string), + ('from_file', lambda x: prepare_bytesio(x, io.BytesIO)), + ('from_file', lambda x: open(prepare_file(x), 'rb')), + ('from_fd', lambda x: os.open(prepare_file(x), os.O_RDONLY)), +) + +if six.PY2: + import StringIO + import cStringIO + + CASES += ( + ('from_file', lambda x: prepare_bytesio(x, StringIO.StringIO)), + ('from_file', lambda x: prepare_bytesio(x, cStringIO.StringIO)), + ) + + +DATA = {u'a': [1, u'word', 3], b'b': b'xyz', u'c': None} +ETALON = {b'a': [1, b'word', 3], b'b': b'xyz', b'c': None} + + +@pytest.fixture(scope='module') +def serialized_data(): + return dumps(DATA, format='binary') + + +def test_serizlized_data(serialized_data): + assert type(serialized_data) is bytes + + +@pytest.mark.parametrize('method_name,make_input', CASES) +def test_input_streams(method_name, make_input, serialized_data): + method = getattr(InputStream, method_name) + input_stream = method(make_input(serialized_data)) + + assert Reader(input_stream).node() == ETALON diff --git a/library/python/cyson/ut/test_py_reader_writer.py b/library/python/cyson/ut/test_py_reader_writer.py new file mode 100644 index 0000000000..0238040f50 --- /dev/null +++ b/library/python/cyson/ut/test_py_reader_writer.py @@ -0,0 +1,70 @@ +# -*- coding: utf-8 -*- + +from __future__ import print_function, absolute_import, division + +import pytest +import six + +from cyson import PyWriter, PyReader, dumps, loads, dumps_into + + +if six.PY3: + unicode = str + + +def 
switch_string_type(string): + if isinstance(string, unicode): + return string.encode('utf8') + elif isinstance(string, bytes): + return string.decode('utf8') + + raise TypeError('expected bytes or unicode, got {!r}'.format(string)) + + +CASES = [ + None, + # int + 0, 1, -1, 2**63, -2**63, 2**64 - 1, + # float + 0.0, 100.0, -100.0, + # long + 10**100, 2**300, -7**100, + # bytes + b'', b'hello', u'Привет'.encode('utf8'), + # unicode + u'', u'hello', u'Привет', + # tuple + (), (0,), (1, 'hello'), (17, 'q') * 100, + # list + [], [0], ['hello', set([1, 2, 3])], [17, 'q'] * 100, + # dict + {}, {'a': 'b'}, {'a': 17}, {'a': frozenset([1, 2, 3])}, {b'a': 1, u'b': 2}, + {1: 2, 3: 4, 5: None}, {(1, 2, 3): (1, 4, 9), None: 0}, + # set + set(), {1, 2, 3}, {'hello', 'world'}, + # frozenset + frozenset(), frozenset([1, 2, 3]), frozenset(['hello', 'world']), +] + + +@pytest.mark.parametrize('format', ['binary', 'text', 'pretty']) +@pytest.mark.parametrize('value', CASES) +def test_roundtrip(value, format): + encoded = dumps(value, format=format, Writer=PyWriter) + decoded = loads(encoded, Reader=PyReader) + assert encoded == dumps(value, format=switch_string_type(format), Writer=PyWriter) + assert type(decoded) is type(value) + assert decoded == value + + +@pytest.mark.parametrize('format', ['binary', 'text', 'pretty']) +@pytest.mark.parametrize('value', CASES) +def test_roundtrip_bytearray(value, format): + encoded1 = bytearray() + encoded2 = bytearray() + dumps_into(encoded1, value, format=format, Writer=PyWriter) + dumps_into(encoded2, value, format=switch_string_type(format), Writer=PyWriter) + decoded = loads(encoded1, Reader=PyReader) + assert decoded == loads(encoded2, Reader=PyReader) + assert type(decoded) is type(value) + assert decoded == value diff --git a/library/python/cyson/ut/test_reader_writer.py b/library/python/cyson/ut/test_reader_writer.py new file mode 100644 index 0000000000..6428ea0b56 --- /dev/null +++ b/library/python/cyson/ut/test_reader_writer.py @@ 
-0,0 +1,251 @@ +# -*- coding: utf-8 -*- + +from __future__ import print_function, absolute_import, division + +import io +import math +import pytest +import six +import sys + +from functools import partial + +from cyson import ( + dumps, loads, YsonInt64, YsonUInt64, UInt, Writer, OutputStream, + UnicodeReader, +) + + +if six.PY2: + NativeUInt = long # noqa: F821 +elif six.PY3: + NativeUInt = UInt + unicode = str + long = int +else: + raise RuntimeError('Unsupported Python version') + + +def canonize(value, as_unicode=False): + _canonize = partial(canonize, as_unicode=as_unicode) + + if isinstance(value, (list, tuple)): + return [_canonize(_) for _ in value] + elif isinstance(value, dict): + return {_canonize(k): _canonize(value[k]) for k in value} + elif isinstance(value, unicode) and not as_unicode: + return value.encode('utf8') + elif isinstance(value, bytes) and as_unicode: + return value.decode('utf8') + + return value + + +def switch_string_type(string): + if isinstance(string, bytes): + return string.decode('utf8') + elif isinstance(string, unicode): + return string.encode('utf8') + + raise TypeError('expected unicode or bytes, got {!r}'.format(string)) + + +def coerce(obj, to, via=None): + if via is None: + via = to + + if isinstance(obj, to): + return obj + + return via(obj) + + +SKIP_PY3 = pytest.mark.skipif(six.PY3, reason='Makes no sense for Python3') + + +if six.PY3 and sys.platform == 'win32': + NUMPY_CASES = [] +else: + import numpy as np + + NUMPY_CASES = [ + # numpy int + np.int8(2 ** 7 - 1), np.int16(2 ** 15 - 1), + np.int32(2 ** 31 - 1), np.int64(2 ** 63 - 1), + # numpy uint + np.uint8(2 ** 8 - 1), np.uint16(2 ** 16 - 1), + np.uint32(2 ** 32 - 1), np.uint64(2 ** 64 - 1), + # numpy float + np.float16(100.0), np.float32(100.0), np.float64(100.0), + ] + + +CASES = [ + # NoneType + None, + # boolean + True, False, + # int + 0, 1, -1, int(2 ** 63 - 1), int(-2 ** 63), + # float + 0.0, 100.0, -100.0, float('inf'), float('-inf'), + # bytes + b'', 
b'hello', u'Привет'.encode('utf8'), + # unicode + u'', u'hello', u'Привет', + # list + [], [0], [1, 'hello'], [17, 'q'] * 100, [b'bytes'], + # tuple + (), (0,), (1, 'hello'), (17, 'q') * 100, (b'bytes',), + # dict + {}, {'a': 'b'}, {'a': 17}, {'a': [1, 2, 3]}, {b'a': 1, u'b': b'a'} +] + NUMPY_CASES + + +@pytest.mark.parametrize('format', ['binary', 'text', 'pretty']) +@pytest.mark.parametrize('value', CASES) +def test_roundtrip(value, format): + encoded = dumps(value, format) + decoded = loads(encoded) + assert encoded == dumps(value, switch_string_type(format)) + assert decoded == canonize(value) + + +# NOTE: roundtrip test doesn't work for NaN (NaN != NaN) +@pytest.mark.parametrize('format', ['binary', 'text', 'pretty']) +def test_nan(format): + encoded = dumps(float('nan'), format) + decoded = loads(encoded) + assert encoded == dumps(float('nan'), switch_string_type(format)) + assert math.isnan(decoded) + + +@SKIP_PY3 +@pytest.mark.parametrize('format', ['binary', 'text', 'pretty']) +@pytest.mark.parametrize( + 'value', [long(0), long(1), long(2 ** 63), long(2 ** 64 - 1)] +) +def test_long_roundtrip(value, format): + encoded = dumps(value, format) + decoded = loads(encoded) + assert encoded == dumps(value, switch_string_type(format)) + assert decoded == value + + +@pytest.mark.parametrize( + 'value', [NativeUInt(0), NativeUInt(111), NativeUInt(2 ** 63), NativeUInt(2 ** 64 - 1)] +) +@pytest.mark.parametrize('format', ['binary', 'text', 'pretty']) +def test_readwrite_uint64(value, format): + dumped_uint64 = dumps(coerce(value, YsonUInt64), format=format) + loaded_uint64 = loads(dumped_uint64) + + assert type(value) is NativeUInt + assert type(loaded_uint64) is NativeUInt + assert dumps(value, format=format) == dumped_uint64 + + +@pytest.mark.parametrize('value', [int(-2 ** 63), -111, 0, 111, int(2 ** 63 - 1)]) +@pytest.mark.parametrize('format', ['binary', 'text', 'pretty']) +def test_readwrite_int64(value, format): + dumped_int64 = dumps(YsonInt64(value), 
format=format) + loaded_int64 = loads(dumped_int64) + + assert type(value) is int + assert type(loaded_int64) is int + assert dumps(value, format=format) == dumped_int64 + + +@SKIP_PY3 +def test_long_overflow(): + with pytest.raises(OverflowError): + dumps(long(-1)) + + with pytest.raises(OverflowError): + dumps(long(2**64)) + + +@pytest.mark.parametrize('value', [2 ** 63, -2 ** 63 - 1]) +def test_int64_overflow(value): + with pytest.raises(OverflowError): + int64_value = YsonInt64(value) + dumps(int64_value) + + if six.PY3: + with pytest.raises(OverflowError): + dumps(value) + + +@pytest.mark.parametrize('value', [2 ** 64, 2 ** 100]) +def test_uint64_overflow(value): + with pytest.raises(OverflowError): + uint64_value = YsonUInt64(value) + dumps(uint64_value) + + +@pytest.mark.parametrize('format', ['binary', 'text', 'pretty']) +def test_force_write_sequence(format): + class Sequence(object): + def __init__(self, seq): + self._seq = seq + + def __getitem__(self, index): + return self._seq[index] + + def __len__(self): + return len(self._seq) + + sequence = [1, 1.1, None, b'xyz'] + + sink = io.BytesIO() + writer = Writer(OutputStream.from_file(sink), format=format) + + writer.begin_stream() + writer.list(Sequence(sequence)) + writer.end_stream() + + assert sink.getvalue() == dumps(sequence, format) + + +@pytest.mark.parametrize('format', ['binary', 'text', 'pretty']) +def test_force_write_mapping(format): + class Mapping(object): + def __init__(self, mapping): + self._mapping = mapping + + def __getitem__(self, key): + return self._mapping[key] + + def keys(self): + return self._mapping.keys() + + mapping = {b'a': 1, b'b': 1.1, b'c': None, b'd': b'some'} + + sink = io.BytesIO() + writer = Writer(OutputStream.from_file(sink), format=format) + + writer.begin_stream() + writer.map(Mapping(mapping)) + writer.end_stream() + + assert sink.getvalue() == dumps(mapping, format) + + +@pytest.mark.parametrize('format', ['binary', 'text', 'pretty']) 
+@pytest.mark.parametrize('value', CASES) +def test_unicode_reader(value, format): + expected = canonize(value, as_unicode=True) + got = loads(dumps(value, format), UnicodeReader) + assert expected == got + + +def test_unicode_reader_raises_unicode_decode_error(): + not_decodable = b'\x80\x81' + with pytest.raises(UnicodeDecodeError): + loads(dumps(not_decodable, format='binary'), UnicodeReader) + + +def test_unicode_reader_decodes_object_with_attributes(): + data = b'{"a" = "b"; "c" = <"foo" = "bar">"d"}' + expected = {u"a": u"b", u"c": u"d"} + assert loads(data, UnicodeReader) == expected diff --git a/library/python/cyson/ut/test_unsigned_long.py b/library/python/cyson/ut/test_unsigned_long.py new file mode 100644 index 0000000000..3cd4ffe440 --- /dev/null +++ b/library/python/cyson/ut/test_unsigned_long.py @@ -0,0 +1,222 @@ +#!/usr/bin/env python + +from __future__ import division + +import pytest +import six + +from cyson import UInt + + +if six.PY3: + long = int + + +def equals_with_type(data, etalon): + return type(data) is type(etalon) and data == etalon + + +def equals_as_uint(data, etalon): + return type(data) is UInt and data == etalon + + +N = long(12) +UN = UInt(N) + + +def test_uint64_initialization(): + assert UInt(2**63 - 1) == 2**63 - 1 + assert UInt() == UInt(0) == 0 + assert UInt(long(78)) == 78 + assert UInt(23.57) == 23 + assert UInt('111') == 111 + + with pytest.raises(OverflowError): + UInt(-10) + + +def test_add(): + assert equals_as_uint(UN + 1, N + 1) + assert equals_as_uint(UN + long(1), N + 1) + assert equals_as_uint(UN + UInt(1), N + 1) + assert equals_as_uint(1 + UN, 1 + N) + assert equals_as_uint(long(1) + UN, long(1) + N) + assert equals_as_uint(UInt(1) + UN, 1 + N) + assert equals_with_type(UN + 1.1, N + 1.1) + assert equals_with_type(1.1 + UN, 1.1 + N) + assert equals_with_type(UN + int(-N - 1), N + int(-N - 1)) + assert equals_with_type(UN + long(-N - 1), N + long(-N - 1)) + assert equals_with_type(int(-N - 1) + UN, int(-N - 1) + 
N) + assert equals_with_type(long(-N - 1) + UN, long(-N - 1) + N) + + +def test_sub(): + assert equals_as_uint(UN - 1, N - 1) + assert equals_as_uint(UN - long(1), N - long(1)) + assert equals_as_uint(UN - UInt(1), N - 1) + assert equals_as_uint(13 - UN, 13 - UN) + assert equals_as_uint(UInt(13) - UN, long(13) - N) + assert equals_as_uint(long(13) - UN, long(13) - UN) + assert equals_with_type(UN - 0.1, N - 0.1) + assert equals_with_type(13.1 - UN, 13.1 - N) + assert equals_with_type(1 - UN, long(1) - N) + assert equals_with_type(long(1) - UN, long(1) - N) + assert equals_with_type(UInt(1) - UN, long(1) - N) + assert equals_with_type(UN - int(UN + 1), N - int(UN + 1)) + assert equals_with_type(UN - long(UN + 1), N - long(UN + 1)) + assert equals_with_type(UN - UInt(UN + 1), N - long(UN + 1)) + + +def test_mul(): + assert equals_as_uint(UN * 2, N * 2) + assert equals_as_uint(UN * long(2), N * long(2)) + assert equals_as_uint(UN * UInt(2), N * long(2)) + assert equals_as_uint(2 * UN, 2 * N) + assert equals_as_uint(long(2) * UN, long(2) * UN) + assert equals_as_uint(UInt(2) * UN, long(2) * UN) + assert equals_with_type(-3 * UN, -3 * N) + assert equals_with_type(long(-3) * UN, long(-3) * N) + assert equals_with_type(UN * -3, N * -3) + assert equals_with_type(UN * long(-3), N * long(-3)) + assert equals_with_type(UN * 1.1, N * 1.1) + assert equals_with_type(1.1 * UN, 1.1 * N) + + +def test_truediv(): + assert equals_with_type(UN / 1, N / long(1)) + assert equals_with_type(UN / UInt(1), N / long(1)) + assert equals_with_type(1 / UN, long(1) / N) + assert equals_with_type(UInt(1) / UN, long(1) / N) + assert equals_with_type(UN / N, N / long(N)) + assert equals_with_type(UN / UInt(N), N / long(N)) + assert equals_with_type(UN / -1, N / long(-1)) + assert equals_with_type(-1 / UN, long(-1) / N) + assert equals_with_type(UN / 1.1, N / 1.1) + assert equals_with_type(1.1 / UN, 1.1 / N) + + +def test_floordiv(): + # floor division (__floordiv__) + assert equals_as_uint(UN // 1, 
N // 1) + assert equals_as_uint(UN // long(1), N // long(1)) + assert equals_as_uint(UN // UInt(1), N // long(1)) + assert equals_as_uint(1 // UN, 1 // N) + assert equals_as_uint(long(1) // UN, long(1) // N) + assert equals_as_uint(UInt(1) // UN, long(1) // N) + assert equals_as_uint(UN // N, N // N) + assert equals_as_uint(UN // UN, N // N) + assert equals_with_type(UN // -1, N // long(-1)) + assert equals_with_type(UN // long(-1), N // long(-1)) + assert equals_with_type(-1 // UN, -long(1) // N) + assert equals_with_type(long(-1) // UN, long(-1) // N) + assert equals_with_type(UN // 1.1, N // 1.1) + assert equals_with_type(1.1 // UN, 1.1 // N) + + +def test_mod(): + assert equals_as_uint(UN % 7, N % 7) + assert equals_as_uint(UN % long(7), N % long(7)) + assert equals_as_uint(UN % UInt(7), N % long(7)) + assert equals_as_uint(23 % UN, 23 % N) + assert equals_as_uint(long(23) % UN, long(23) % N) + assert equals_as_uint(UInt(23) % UN, long(23) % N) + assert equals_as_uint(-23 % UN, -23 % N) + assert equals_as_uint(long(-23) % UN, long(-23) % N) + assert equals_with_type(UN % -11, N % long(-11)) + assert equals_with_type(UN % long(-11), N % long(-11)) + + +def test_pow(): + assert equals_as_uint(UN ** 2, N ** 2) + assert equals_as_uint(UN ** long(2), N ** long(2)) + assert equals_as_uint(UN ** UInt(2), N ** long(2)) + assert equals_as_uint(2 ** UN, 2 ** N) + assert equals_as_uint(long(2) ** UN, long(2) ** N) + assert equals_as_uint(UInt(2) ** UN, long(2) ** N) + assert equals_with_type(UN ** -1, N ** long(-1)) + assert equals_with_type(UN ** long(-1), N ** -long(1)) + assert equals_with_type(UN ** 1.1, N ** 1.1) + assert equals_with_type(UN ** -1.1, N ** -1.1) + assert equals_with_type(1.1 ** UN, 1.1 ** N) + assert equals_with_type(UN ** 0.5, N ** 0.5) + assert equals_with_type(0.5 ** UN, 0.5 ** N) + + +def test_neg(): + assert equals_with_type(-UN, -N) + assert equals_with_type(-UInt(0), long(0)) + + +def test_pos(): + assert equals_as_uint(+UN, N) + assert 
equals_as_uint(+UInt(0), 0) + + +def test_abs(): + assert equals_as_uint(abs(UN), N) + assert abs(UN) is UN + + +def test_invert(): + assert equals_with_type(~UN, ~N) + assert equals_with_type(~UInt(0), ~long(0)) + + +def test_lshift(): + assert equals_as_uint(1 << UN, 1 << N) + assert equals_as_uint(long(1) << UN, long(1) << N) + assert equals_as_uint(UInt(1) << UN, long(1) << N) + assert equals_as_uint(UN << 2, N << 2) + assert equals_as_uint(UN << long(2), N << 2) + assert equals_as_uint(UN << UInt(2), N << 2) + assert equals_with_type(-1 << UN, -1 << N) + assert equals_with_type(long(-1) << UN, -long(1) << N) + + with pytest.raises(TypeError): + UN << 1.1 + with pytest.raises(TypeError): + 1.1 << UN + with pytest.raises(ValueError): + UN << -1 + + +def test_rshift(): + assert equals_as_uint(10000 >> UN, 10000 >> N) + assert equals_as_uint(long(10000) >> UN, long(10000) >> N) + assert equals_as_uint(UInt(10000) >> UN, long(10000) >> N) + assert equals_as_uint(UN >> 2, N >> 2) + assert equals_as_uint(UN >> long(2), N >> long(2)) + assert equals_as_uint(UN >> UInt(2), N >> long(2)) + assert equals_with_type(-10000 >> UN, -10000 >> N) + assert equals_with_type(long(-10000) >> UN, long(-10000) >> N) + + with pytest.raises(TypeError): + UN >> 1.1 + with pytest.raises(TypeError): + 1.1 >> UN + with pytest.raises(ValueError): + UN >> -1 + + +def test_and(): + assert equals_as_uint(UN & 15, N & 15) + assert equals_as_uint(UN & long(15), N & long(15)) + + with pytest.raises(TypeError): + UN & 1.1 + + +def test_or(): + assert equals_as_uint(UN | 15, N | 15) + assert equals_as_uint(UN | long(15), N | long(15)) + + with pytest.raises(TypeError): + UN | 1.1 + + +def test_xor(): + assert equals_as_uint(UN ^ 9, N ^ 9) + assert equals_as_uint(UN ^ long(9), N ^ long(9)) + + with pytest.raises(TypeError): + UN ^ 1.1 diff --git a/library/python/cyson/ut/ya.make b/library/python/cyson/ut/ya.make new file mode 100644 index 0000000000..1af753735f --- /dev/null +++ 
b/library/python/cyson/ut/ya.make @@ -0,0 +1,21 @@ +PY23_TEST() + +PEERDIR( + library/python/cyson +) + +IF(NOT OS_WINDOWS) + PEERDIR( + contrib/python/numpy + ) +ENDIF() + +TEST_SRCS( + test_control_attributes.py + test_input_stream.py + test_py_reader_writer.py + test_reader_writer.py + test_unsigned_long.py +) + +END() diff --git a/library/python/cyson/ya.make b/library/python/cyson/ya.make new file mode 100644 index 0000000000..3a66455904 --- /dev/null +++ b/library/python/cyson/ya.make @@ -0,0 +1,28 @@ +PY23_LIBRARY() + +NO_WSHADOW() + +PEERDIR( + library/c/cyson +) + +SRCS( + cyson/helpers.cpp + cyson/unsigned_long.cpp +) + +PY_SRCS( + TOP_LEVEL + cyson/_cyson.pyx + cyson/__init__.py +) + +END() + +RECURSE( + pymodule +) + +RECURSE_FOR_TESTS( + ut +) diff --git a/ydb/library/yql/tests/common/test_framework/ya.make b/ydb/library/yql/tests/common/test_framework/ya.make new file mode 100644 index 0000000000..7c7905dc17 --- /dev/null +++ b/ydb/library/yql/tests/common/test_framework/ya.make @@ -0,0 +1,17 @@ +PY23_LIBRARY() + +PY_SRCS( + TOP_LEVEL + yql_utils.py + yqlrun.py +) + +PEERDIR( + contrib/python/six + contrib/python/urllib3 + library/python/cyson + ydb/library/yql/core/file_storage/proto + ydb/library/yql/providers/common/proto +) + +END() diff --git a/ydb/library/yql/tests/common/test_framework/yql_utils.py b/ydb/library/yql/tests/common/test_framework/yql_utils.py new file mode 100644 index 0000000000..7b6718d3f5 --- /dev/null +++ b/ydb/library/yql/tests/common/test_framework/yql_utils.py @@ -0,0 +1,847 @@ +from __future__ import print_function + +import hashlib +import io +import os +import os.path +import six +import sys +import re +import tempfile +import shutil + +from collections import namedtuple, defaultdict +from functools import partial +import codecs +import decimal + +import pytest +import yatest.common + +import logging +import getpass + +logger = logging.getLogger(__name__) + +KSV_ATTR = '''{_yql_row_spec={ + Type=[StructType; + 
[[key;[DataType;String]]; + [subkey;[DataType;String]]; + [value;[DataType;String]]]]}}''' + + +def get_param(name, default=None): + name = 'YQL_' + name.upper() + return yatest.common.get_param(name, os.environ.get(name) or default) + + +def find_file(path): + arcadia_root = '.' + while '.arcadia.root' not in os.listdir(arcadia_root): + arcadia_root = os.path.join(arcadia_root, '..') + res = os.path.abspath(os.path.join(arcadia_root, path)) + assert os.path.exists(res) + return res + + +output_path_cache = {} + + +def yql_output_path(*args, **kwargs): + if not get_param('LOCAL_BENCH_XX'): + # abspath is needed, because output_path may be relative when test is run directly (without ya make). + return os.path.abspath(yatest.common.output_path(*args, **kwargs)) + + else: + if args and args in output_path_cache: + return output_path_cache[args] + res = os.path.join(tempfile.mkdtemp(prefix='yql_tmp_'), *args) + if args: + output_path_cache[args] = res + return res + + +def yql_binary_path(*args, **kwargs): + if not get_param('LOCAL_BENCH_XX'): + return yatest.common.binary_path(*args, **kwargs) + + else: + return find_file(args[0]) + + +def yql_source_path(*args, **kwargs): + if not get_param('LOCAL_BENCH_XX'): + return yatest.common.source_path(*args, **kwargs) + else: + return find_file(args[0]) + + +def yql_work_path(): + return os.path.abspath('.') + + +YQLExecResult = namedtuple('YQLExecResult', ( + 'std_out', + 'std_err', + 'results', + 'results_file', + 'opt', + 'opt_file', + 'plan', + 'plan_file', + 'program', + 'execution_result', + 'statistics' +)) + +Table = namedtuple('Table', ( + 'name', + 'full_name', + 'content', + 'file', + 'yqlrun_file', + 'attr', + 'format', + 'exists' +)) + + +def new_table(full_name, file_path=None, yqlrun_file=None, content=None, res_dir=None, + attr=None, format_name='yson', def_attr=None, should_exist=False, src_file_alternative=None): + assert '.' 
in full_name, 'expected name like cedar.Input' + name = '.'.join(full_name.split('.')[1:]) + + if res_dir is None: + res_dir = get_yql_dir('table_') + + exists = True + if content is None: + # try read content from files + src_file = file_path or yqlrun_file + if src_file is None: + # nonexistent table, will be output for query + content = '' + exists = False + else: + if os.path.exists(src_file): + with open(src_file, 'rb') as f: + content = f.read() + elif src_file_alternative and os.path.exists(src_file_alternative): + with open(src_file_alternative, 'rb') as f: + content = f.read() + src_file = src_file_alternative + yqlrun_file, src_file_alternative = src_file_alternative, yqlrun_file + else: + content = '' + exists = False + + file_path = os.path.join(res_dir, name + '.txt') + new_yqlrun_file = os.path.join(res_dir, name + '.yqlrun.txt') + + if exists: + with open(file_path, 'wb') as f: + f.write(content) + + # copy or create yqlrun_file in proper dir + if yqlrun_file is not None: + shutil.copyfile(yqlrun_file, new_yqlrun_file) + else: + with open(new_yqlrun_file, 'wb') as f: + f.write(content) + else: + assert not should_exist, locals() + + if attr is None: + # try read from file + attr_file = None + if os.path.exists(file_path + '.attr'): + attr_file = file_path + '.attr' + elif yqlrun_file is not None and os.path.exists(yqlrun_file + '.attr'): + attr_file = yqlrun_file + '.attr' + elif src_file_alternative is not None and os.path.exists(src_file_alternative + '.attr'): + attr_file = src_file_alternative + '.attr' + + if attr_file is not None: + with open(attr_file) as f: + attr = f.read() + + if attr is None: + attr = def_attr + + if attr is not None: + # probably we get it, now write attr file to proper place + attr_file = new_yqlrun_file + '.attr' + with open(attr_file, 'w') as f: + f.write(attr) + + return Table( + name, + full_name, + content, + file_path, + new_yqlrun_file, + attr, + format_name, + exists + ) + + +def ensure_dir_exists(dir): + # 
handle race between isdir and mkdir + if os.path.isdir(dir): + return + + try: + os.mkdir(dir) + except OSError: + if not os.path.isdir(dir): + raise + + +def get_yql_dir(prefix): + yql_dir = yql_output_path('yql') + ensure_dir_exists(yql_dir) + res_dir = tempfile.mkdtemp(prefix=prefix, dir=yql_dir) + os.chmod(res_dir, 0o755) + return res_dir + + +def get_cmd_for_files(arg, files): + cmd = ' '.join( + arg + ' ' + name + '@' + files[name] + for name in files + ) + cmd += ' ' + return cmd + + +def read_res_file(file_path): + if os.path.exists(file_path): + with codecs.open(file_path, encoding="utf-8") as descr: + res = descr.read().strip() + if res == '': + log_res = '<EMPTY>' + else: + log_res = res + else: + res = '' + log_res = '<NOTHING>' + return res, log_res + + +def normalize_yson(y): + from cyson import YsonBoolean, YsonEntity + if isinstance(y, YsonBoolean) or isinstance(y, bool): + return 'true' if y else 'false' + if isinstance(y, YsonEntity) or y is None: + return None + if isinstance(y, list): + return [normalize_yson(i) for i in y] + if isinstance(y, dict): + return {normalize_yson(k): normalize_yson(v) for k, v in six.iteritems(y)} + s = str(y) if not isinstance(y, six.text_type) else y.encode('utf-8', errors='xmlcharrefreplace') + return s + + +volatile_attrs = {'DataSize', 'ModifyTime', 'Id', 'Revision'} +current_user = getpass.getuser() + + +def _replace_vals_impl(y): + if isinstance(y, list): + return [_replace_vals_impl(i) for i in y] + if isinstance(y, dict): + return {_replace_vals_impl(k): _replace_vals_impl(v) for k, v in six.iteritems(y) if k not in volatile_attrs} + if isinstance(y, str): + s = y.replace('tmp/yql/' + current_user + '/', 'tmp/') + s = re.sub(r'tmp/[0-9a-f]+-[0-9a-f]+-[0-9a-f]+-[0-9a-f]+', 'tmp/<temp_table_guid>', s) + return s + return y + + +def replace_vals(y): + y = normalize_yson(y) + y = _replace_vals_impl(y) + return y + + +def patch_yson_vals(y, patcher): + if isinstance(y, list): + return [patch_yson_vals(i, patcher) 
for i in y] + if isinstance(y, dict): + return {patch_yson_vals(k, patcher): patch_yson_vals(v, patcher) for k, v in six.iteritems(y)} + if isinstance(y, str): + return patcher(y) + return y + + +floatRe = re.compile(r'^-?\d*\.\d+$') +floatERe = re.compile(r'^-?(\d*\.)?\d+e[\+\-]?\d+$', re.IGNORECASE) +specFloatRe = re.compile(r'^(-?inf|nan)$', re.IGNORECASE) + + +def fix_double(x): + if floatRe.match(x) and len(x.replace('.', '').replace('-', '')) > 10: + # Emulate the same double precision as C++ code has + decimal.getcontext().rounding = decimal.ROUND_HALF_DOWN + decimal.getcontext().prec = 10 + return str(decimal.Decimal(0) + decimal.Decimal(x)).rstrip('0') + if floatERe.match(x): + # Emulate the same double precision as C++ code has + decimal.getcontext().rounding = decimal.ROUND_HALF_DOWN + decimal.getcontext().prec = 10 + return str(decimal.Decimal(0) + decimal.Decimal(x)).lower() + if specFloatRe.match(x): + return x.lower() + return x + + +def remove_volatile_ast_parts(ast): + return re.sub(r"\(KiClusterConfig '\('\(.*\) '\"\d\" '\"\d\" '\"\d\"\)\)", "(KiClusterConfig)", ast) + + +def prepare_program(program, program_file, yql_dir, ext='yql'): + assert not (program is None and program_file is None), 'Needs program or program_file' + + if program is None: + with codecs.open(program_file, encoding='utf-8') as program_file_descr: + program = program_file_descr.read() + + program_file = os.path.join(yql_dir, 'program.' 
+ ext) + with codecs.open(program_file, 'w', encoding='utf-8') as program_file_descr: + program_file_descr.write(program) + + return program, program_file + + +def get_program_cfg(suite, case, DATA_PATH): + ret = [] + config = os.path.join(DATA_PATH, suite if suite else '', case + '.cfg') + if not os.path.exists(config): + config = os.path.join(DATA_PATH, suite if suite else '', 'default.cfg') + + if os.path.exists(config): + for line in open(config, 'r'): + if line.strip(): + ret.append(tuple(line.split())) + else: + in_filename = case + '.in' + in_path = os.path.join(DATA_PATH, in_filename) + default_filename = 'default.in' + default_path = os.path.join(DATA_PATH, default_filename) + for filepath in [in_path, in_filename, default_path, default_filename]: + if os.path.exists(filepath): + try: + shutil.copy2(filepath, in_path) + except shutil.Error: + pass + ret.append(('in', 'yamr.plato.Input', in_path)) + break + + if not is_os_supported(ret): + pytest.skip('%s not supported here' % sys.platform) + + return ret + + +def find_user_file(suite, path, DATA_PATH): + source_path = os.path.join(DATA_PATH, suite, path) + if os.path.exists(source_path): + return source_path + else: + try: + return yql_binary_path(path) + except Exception: + raise Exception('Can not find file ' + path) + + +def get_input_tables(suite, cfg, DATA_PATH, def_attr=None): + in_tables = [] + for item in cfg: + if item[0] in ('in', 'out'): + io, table_name, file_name = item + if io == 'in': + in_tables.append(new_table( + full_name=table_name.replace('yamr.', '').replace('yt.', ''), + yqlrun_file=os.path.join(DATA_PATH, suite if suite else '', file_name), + src_file_alternative=os.path.join(yql_work_path(), suite if suite else '', file_name), + def_attr=def_attr, + should_exist=True + )) + return in_tables + + +def get_tables(suite, cfg, DATA_PATH, def_attr=None): + in_tables = [] + out_tables = [] + suite_dir = os.path.join(DATA_PATH, suite) + res_dir = get_yql_dir('table_') + + for splitted in 
cfg:
        # ubsan builds cannot load UDFs; skip the whole case early.
        if splitted[0] == 'udf' and yatest.common.context.sanitize == 'undefined':
            pytest.skip("udf under ubsan")

        # cfg lines: (in|out, table, file[, format]); format defaults to yson.
        if len(splitted) == 4:
            type_name, table, file_name, format_name = splitted
        elif len(splitted) == 3:
            type_name, table, file_name = splitted
            format_name = 'yson'
        else:
            continue
        yqlrun_file = os.path.join(suite_dir, file_name)
        if type_name == 'in':
            in_tables.append(new_table(
                # bare table names are placed into the 'plato' cluster
                full_name='plato.' + table if '.' not in table else table,
                yqlrun_file=yqlrun_file,
                format_name=format_name,
                def_attr=def_attr,
                res_dir=res_dir
            ))
        if type_name == 'out':
            out_tables.append(new_table(
                full_name='plato.' + table if '.' not in table else table,
                # output fixture file is optional; None means "no canonical data"
                yqlrun_file=yqlrun_file if os.path.exists(yqlrun_file) else None,
                res_dir=res_dir
            ))
    return in_tables, out_tables


def get_supported_providers(cfg):
    """Return providers listed in a 'providers' cfg line, or the default set."""
    providers = 'yt', 'kikimr', 'dq'
    for item in cfg:
        if item[0] == 'providers':
            providers = [i.strip() for i in ''.join(item[1:]).split(',')]
    return providers


def is_os_supported(cfg):
    """True unless an 'os' cfg line exists and excludes the current platform."""
    for item in cfg:
        if item[0] == 'os':
            return any(sys.platform.startswith(_os) for _os in item[1].split(','))
    return True


def is_xfail(cfg):
    """True when the case is marked as expected-to-fail via an 'xfail' cfg line."""
    for item in cfg:
        if item[0] == 'xfail':
            return True
    return False


def is_canonize_peephole(cfg):
    """True when a 'canonize_peephole' cfg line is present."""
    for item in cfg:
        if item[0] == 'canonize_peephole':
            return True
    return False


def is_canonize_lineage(cfg):
    """True when a 'canonize_lineage' cfg line is present."""
    for item in cfg:
        if item[0] == 'canonize_lineage':
            return True
    return False


def get_pragmas(cfg):
    """Collect 'pragma ...' cfg lines as whitespace-joined strings."""
    pragmas = []
    for item in cfg:
        if item[0] == 'pragma':
            pragmas.append(' '.join(item))
    return pragmas


def execute(
        klass=None,
        program=None,
        program_file=None,
        files=None,
        urls=None,
        run_sql=False,
        verbose=False,
        check_error=True,
        input_tables=None,
        output_tables=None,
        pretty_plan=True,
        parameters={},  # NOTE(review): mutable default argument — safe only while callers never mutate it
):
    '''
    Executes YQL/SQL

    :param klass: KiKiMRForYQL if instance (default: YQLRun)
    :param program: string with 
YQL or SQL program + :param program_file: file with YQL or SQL program (optional, if :param program: is None) + :param files: dict like {'name': '/path'} with extra files + :param urls: dict like {'name': url} with extra files urls + :param run_sql: execute sql instead of yql + :param verbose: log all results and diagnostics + :param check_error: fail on non-zero exit code + :param input_tables: list of Table (will be written if not exist) + :param output_tables: list of Table (will be returned) + :param pretty_plan: whether to use pretty printing for plan or not + :param parameters: query parameters as dict like {name: json_value} + :return: YQLExecResult + ''' + + if input_tables is None: + input_tables = [] + else: + assert isinstance(input_tables, list) + if output_tables is None: + output_tables = [] + + klass.write_tables(input_tables + output_tables) + + res = klass.yql_exec( + program=program, + program_file=program_file, + files=files, + urls=urls, + run_sql=run_sql, + verbose=verbose, + check_error=check_error, + tables=(output_tables + input_tables), + pretty_plan=pretty_plan, + parameters=parameters + ) + + try: + res_tables = klass.get_tables(output_tables) + except Exception: + if check_error: + raise + res_tables = {} + + return res, res_tables + + +execute_sql = partial(execute, run_sql=True) + + +def log(s): + if get_param('STDERR'): + print(s, file=sys.stderr) + else: + logger.debug(s) + + +def tmpdir_module(request): + return tempfile.mkdtemp(prefix='kikimr_test_') + + +@pytest.fixture(name='tmpdir_module', scope='module') +def tmpdir_module_fixture(request): + return tmpdir_module(request) + + +def escape_backslash(s): + return s.replace('\\', '\\\\') + + +def get_default_mount_point_config_content(): + return ''' + MountPoints { + RootAlias: '/lib' + MountPoint: '%s' + Library: true + } + ''' % ( + escape_backslash(yql_source_path('ydb/library/yql/mount/lib')) + ) + + +def get_mount_config_file(content=None): + config = 
yql_output_path('mount.cfg') + if not os.path.exists(config): + with open(config, 'w') as f: + f.write(content or get_default_mount_point_config_content()) + return config + + +def run_command(program, cmd, tmpdir_module=None, stdin=None, + check_exit_code=True, env=None, stdout=None): + if tmpdir_module is None: + tmpdir_module = tempfile.mkdtemp() + + stdin_stream = None + if isinstance(stdin, six.string_types): + with tempfile.NamedTemporaryFile( + prefix='stdin_', + dir=tmpdir_module, + delete=False + ) as stdin_file: + stdin_file.write(stdin.encode() if isinstance(stdin, str) else stdin) + stdin_stream = open(stdin_file.name) + elif isinstance(stdin, io.IOBase): + stdin_stream = stdin + elif stdin is not None: + assert 0, 'Strange stdin ' + repr(stdin) + + if isinstance(cmd, six.string_types): + cmd = cmd.split() + else: + cmd = [str(c) for c in cmd] + log(' '.join('\'%s\'' % c if ' ' in c else c for c in cmd)) + cmd = [program] + cmd + + stderr_stream = None + stdout_stream = None + + if stdout: + stdout_stream = stdout + + res = yatest.common.execute( + cmd, + cwd=tmpdir_module, + stdin=stdin_stream, + stdout=stdout_stream, + stderr=stderr_stream, + check_exit_code=check_exit_code, + env=env, + wait=True + ) + + if res.std_err: + log(res.std_err) + if res.std_out: + log(res.std_out) + return res + + +def yson_to_csv(yson_content, columns=None, with_header=True, strict=False): + import cyson as yson + if columns: + headers = sorted(columns) + else: + headers = set() + for item in yson.loads(yson_content, yson_type='list_fragment'): + headers.update(six.iterkeys(item)) + headers = sorted(headers) + csv_content = [] + if with_header: + csv_content.append(';'.join(headers)) + for item in yson.loads(yson_content, yson_type='list_fragment'): + if strict and sorted(six.iterkeys(item)) != headers: + return None + csv_content.append(';'.join([str(item[h]).replace('YsonEntity', '').encode('string_escape') if h in item else '' for h in headers])) + return 
'\n'.join(csv_content) + + +def get_udfs_path(extra_paths=None): + udfs_build_path = yatest.common.build_path('yql/udfs') + ydb_udfs_build_path = yatest.common.build_path('ydb/library/yql/udfs') + rthub_udfs_build_path = yatest.common.build_path('robot/rthub/yql/udfs') + kwyt_udfs_build_path = yatest.common.build_path('robot/kwyt/yql/udfs') + + try: + udfs_bin_path = yatest.common.binary_path('yql/udfs') + except Exception: + udfs_bin_path = None + + try: + udfs_project_path = yql_binary_path('yql/library/test_framework/udfs_deps') + except Exception: + udfs_project_path = None + + merged_udfs_path = yql_output_path('yql_udfs') + if not os.path.isdir(merged_udfs_path): + os.mkdir(merged_udfs_path) + + udfs_paths = [udfs_project_path, udfs_bin_path, udfs_build_path, ydb_udfs_build_path, rthub_udfs_build_path, kwyt_udfs_build_path] + if extra_paths is not None: + udfs_paths += extra_paths + + log('process search UDF in: %s, %s, %s' % (udfs_project_path, udfs_bin_path, udfs_build_path)) + for _udfs_path in udfs_paths: + if _udfs_path: + for dirpath, dnames, fnames in os.walk(_udfs_path): + for f in fnames: + if f.endswith('.so'): + f = os.path.join(dirpath, f) + if not os.path.exists(f) and os.path.lexists(f): # seems like broken symlink + try: + os.unlink(f) + except OSError: + pass + link_name = os.path.join(merged_udfs_path, os.path.basename(f)) + if not os.path.exists(link_name): + os.symlink(f, link_name) + log('Added UDF: ' + f) + return merged_udfs_path + + +def get_test_prefix(): + return 'yql_tmp_' + hashlib.md5(yatest.common.context.test_name).hexdigest() + + +def normalize_plan_ids(plan, no_detailed=False): + remapOps = {} + + for node in sorted(filter(lambda n: n["type"] == "in", plan["Basic"]["nodes"]), key=lambda n: n.get("name")): + if node["id"] not in remapOps: + remapOps[node["id"]] = len(remapOps) + 1 + + for node in plan["Basic"]["nodes"]: + if node["id"] not in remapOps: + remapOps[node["id"]] = len(remapOps) + 1 + + def subst_basic(y): + if 
isinstance(y, list): + return [subst_basic(i) for i in y] + if isinstance(y, dict): + res = {} + for k, v in six.iteritems(y): + if k in {'source', 'target', 'id'}: + res[k] = remapOps.get(v) + elif k == "links": + res[k] = sorted(subst_basic(v), key=lambda x: (x["source"], x["target"])) + elif k == "nodes": + res[k] = sorted(subst_basic(v), key=lambda x: x["id"]) + else: + res[k] = subst_basic(v) + return res + return y + + # Sort and normalize input ids + def subst_detailed(y): + if isinstance(y, list): + return [subst_detailed(i) for i in y] + if isinstance(y, dict): + res = {} + for k, v in six.iteritems(y): + if k == "DependsOn": + res[k] = sorted([remapOps.get(i) for i in v]) + elif k == "Providers": + res[k] = v + elif k in {'OperationRoot', 'Id'}: + res[k] = remapOps.get(v) + else: + res[k] = subst_detailed(v) + return res + return y + + if no_detailed: + return {"Basic": subst_basic(plan["Basic"])} + return {"Basic": subst_basic(plan["Basic"]), "Detailed": subst_detailed(plan["Detailed"])} + + +def normalized_plan_stats(plan): + renameMap = { + "MrLMap!": "YtMap!", + "MrMapReduce!": "YtMapReduce!", + "MrLReduce!": "YtMapReduce!", + "MrOrderedReduce!": "YtReduce!", + "MrSort!": "YtSort!", + "MrCopy!": "YtCopy!", + "YtMerge!": "YtCopy!", + "MrFill!": "YtFill!", + "MrDrop!": "YtDropTable!", + "YtTouch!": None, + "MrReadTable!": None, + "YtReadTable!": None, + "MrPublish!": "YtPublish!", + "MrReadTableScheme!": "YtReadTableScheme!", + } + + normalizedStat = defaultdict(int) + + for op, stat in six.iteritems(plan["Detailed"]["OperationStats"]): + renamedOp = renameMap.get(op, op) + if renamedOp is not None: + normalizedStat[renamedOp] += stat + + return normalizedStat + + +def normalize_table_yson(y): + from cyson import YsonEntity + if isinstance(y, list): + return [normalize_table_yson(i) for i in y] + if isinstance(y, dict): + normDict = dict() + for k, v in six.iteritems(y): + if k == "_other": + normDict[normalize_table_yson(k)] = 
sorted(normalize_table_yson(v))
            elif v != "Void" and v is not None and not isinstance(v, YsonEntity):
                normDict[normalize_table_yson(k)] = normalize_table_yson(v)
        return normDict
    return y


def hide_source_line_number(s):
    # replace line number in source code with 'xxx'
    s = re.sub(r'\b(yql/[\w/]+(?:\.cpp|\.h)):(?:\d+)', r'\1:xxx', s)
    return re.sub(r'(/lib/yql/[\w/]+(?:\.yql|\.sql)):(?:\d+):(?:\d+)', r'\1:xxx:yyy', s)


def do_get_files(suite, config, DATA_PATH, config_key):
    """Copy files referenced by ``config_key`` cfg lines into a temp dir.

    Each matching cfg line is ``(config_key, name, path)``; the file is
    resolved via :func:`find_user_file`, copied into a fresh ``file_`` temp
    directory, and returned as ``{name: copied_path}``.
    """
    files = dict()
    suite_dir = os.path.join(DATA_PATH, suite)
    res_dir = None
    for line in config:
        if line[0] == config_key:
            _, name, path = line
            userpath = find_user_file(suite, path, DATA_PATH)
            relpath = os.path.relpath(userpath, suite_dir)
            # NOTE(review): checks for a literal 'cwd' directory relative to
            # the current working dir — presumably a runner convention; verify.
            if os.path.exists(os.path.join('cwd', relpath)):
                path = relpath
            else:
                path = userpath

            # lazily create the destination dir only when a file is found
            if not res_dir:
                res_dir = get_yql_dir('file_')

            new_path = os.path.join(res_dir, os.path.basename(path))
            shutil.copyfile(path, new_path)

            files[name] = new_path

    return files


def get_files(suite, config, DATA_PATH):
    """Files from 'file' cfg lines (see :func:`do_get_files`)."""
    return do_get_files(suite, config, DATA_PATH, 'file')


def get_http_files(suite, config, DATA_PATH):
    """Files from 'http_file' cfg lines (see :func:`do_get_files`)."""
    return do_get_files(suite, config, DATA_PATH, 'http_file')


def get_yt_files(suite, config, DATA_PATH):
    """Files from 'yt_file' cfg lines (see :func:`do_get_files`)."""
    return do_get_files(suite, config, DATA_PATH, 'yt_file')


def get_syntax_version(program):
    """Pick SQL syntax version: explicit pragma in the program text wins,
    then the SYNTAX_VERSION test parameter, then the default (1)."""
    syntax_version_param = get_param('SYNTAX_VERSION')
    default_syntax_version = 1
    if 'syntax version 0' in program:
        return 0
    elif 'syntax version 1' in program:
        return 1
    elif syntax_version_param:
        return int(syntax_version_param)
    else:
        return default_syntax_version


def ansi_lexer_enabled(program):
    """True when the program text mentions the ansi_lexer pragma."""
    return 'ansi_lexer' in program


class LoggingDowngrade(object):
    """Context manager that temporarily raises the level of the given loggers
    (default CRITICAL), restoring their previous effective levels on exit."""

    def __init__(self, loggers, level=logging.CRITICAL):
        # remember (name, previous effective level) pairs for restoration
        self.loggers = [(name, logging.getLogger(name).getEffectiveLevel()) for name in loggers]
        self.level = level

    def __enter__(self):
        self.prev_levels 
= [] + for name, _ in self.loggers: + log = logging.getLogger(name) + log.setLevel(self.level) + return self + + def __exit__(self, exc_type, exc_value, tb): + for name, level in self.loggers: + log = logging.getLogger(name) + log.setLevel(level) + return True diff --git a/ydb/library/yql/tests/common/test_framework/yqlrun.py b/ydb/library/yql/tests/common/test_framework/yqlrun.py new file mode 100644 index 0000000000..f0b1303613 --- /dev/null +++ b/ydb/library/yql/tests/common/test_framework/yqlrun.py @@ -0,0 +1,331 @@ +import os +import pytest +import shutil +import yatest.common +import yql_utils +import cyson as yson +import ydb.library.yql.providers.common.proto.gateways_config_pb2 as gateways_config_pb2 +import ydb.library.yql.core.file_storage.proto.file_storage_pb2 as file_storage_pb2 + +import six + +from google.protobuf import text_format + +ARCADIA_PREFIX = 'arcadia/' +ARCADIA_TESTS_DATA_PREFIX = 'arcadia_tests_data/' + +VAR_CHAR_PREFIX = '$' +FIX_DIR_PREFIXES = { + 'SOURCE': yatest.common.source_path, + 'BUILD': yatest.common.build_path, + 'TEST_SOURCE': yatest.common.test_source_path, + 'DATA': yatest.common.data_path, + 'BINARY': yatest.common.binary_path, +} + + +class YQLRun(object): + + def __init__(self, udfs_dir=None, prov='yt', use_sql2yql=False, keep_temp=True, binary=None, gateway_config=None, fs_config=None, extra_args=[], cfg_dir=None): + if binary is None: + self.yqlrun_binary = yql_utils.yql_binary_path('ydb/library/yql/tools/yqlrun/yqlrun') + else: + self.yqlrun_binary = binary + self.extra_args = extra_args + + try: + self.sql2yql_binary = yql_utils.yql_binary_path('ydb/library/yql/tools/sql2yql/sql2yql') + except BaseException: + self.sql2yql_binary = None + + try: + self.udf_resolver_binary = yql_utils.yql_binary_path('yql/tools/udf_resolver/udf_resolver') + except Exception: + self.udf_resolver_binary = None + + if udfs_dir is None: + self.udfs_path = yql_utils.get_udfs_path() + else: + self.udfs_path = udfs_dir + res_dir = 
yql_utils.get_yql_dir(prefix='yqlrun_') + self.res_dir = res_dir + self.tables = {} + self.prov = prov + self.use_sql2yql = use_sql2yql + self.keep_temp = keep_temp + + self.gateway_config = gateways_config_pb2.TGatewaysConfig() + if gateway_config is not None: + text_format.Merge(gateway_config, self.gateway_config) + + if cfg_dir is None: + cfg_dir = 'yql/cfg/tests' + with open(yql_utils.yql_source_path(cfg_dir + '/gateways.conf')) as f: + text_format.Merge(f.read(), self.gateway_config) + + self.fs_config = file_storage_pb2.TFileStorageConfig() + + with open(yql_utils.yql_source_path(cfg_dir + '/fs.conf')) as f: + text_format.Merge(f.read(), self.fs_config) + + if fs_config is not None: + text_format.Merge(fs_config, self.fs_config) + + if yql_utils.get_param('USE_NATIVE_YT_TYPES'): + attr = self.gateway_config.Yt.DefaultSettings.add() + attr.Name = 'UseNativeYtTypes' + attr.Value = 'true' + + if yql_utils.get_param('SQL_FLAGS'): + flags = yql_utils.get_param('SQL_FLAGS').split(',') + self.gateway_config.SqlCore.TranslationFlags.extend(flags) + + def yql_exec(self, program=None, program_file=None, files=None, urls=None, + run_sql=False, verbose=False, check_error=True, tables=None, pretty_plan=True, + wait=True, parameters={}, extra_env={}): + del pretty_plan + + res_dir = self.res_dir + + def res_file_path(name): + return os.path.join(res_dir, name) + + opt_file = res_file_path('opt.yql') + results_file = res_file_path('results.txt') + plan_file = res_file_path('plan.txt') + err_file = res_file_path('err.txt') + + udfs_dir = self.udfs_path + prov = self.prov + + program, program_file = yql_utils.prepare_program(program, program_file, res_dir, + ext='sql' if run_sql else 'yql') + + syntax_version = yql_utils.get_syntax_version(program) + ansi_lexer = yql_utils.ansi_lexer_enabled(program) + + if run_sql and self.use_sql2yql: + orig_sql = program_file + '.orig_sql' + shutil.copy2(program_file, orig_sql) + cmd = [ + self.sql2yql_binary, + orig_sql, + '--yql', + 
'--output=' + program_file, + '--syntax-version=%d' % syntax_version + ] + if ansi_lexer: + cmd.append('--ansi-lexer') + env = {'YQL_DETERMINISTIC_MODE': '1'} + env.update(extra_env) + for var in ['LLVM_PROFILE_FILE', 'GO_COVERAGE_PREFIX', 'PYTHON_COVERAGE_PREFIX', 'NLG_COVERAGE_FILENAME']: + if var in os.environ: + env[var] = os.environ[var] + yatest.common.process.execute(cmd, cwd=res_dir, env=env) + + with open(program_file) as f: + yql_program = f.read() + with open(program_file, 'w') as f: + f.write(yql_program) + + gateways_cfg_file = res_file_path('gateways.conf') + with open(gateways_cfg_file, 'w') as f: + f.write(str(self.gateway_config)) + + fs_cfg_file = res_file_path('fs.conf') + with open(fs_cfg_file, 'w') as f: + f.write(str(self.fs_config)) + + cmd = self.yqlrun_binary + ' ' + + if yql_utils.get_param('TRACE_OPT'): + cmd += '--trace-opt ' + + cmd += '-L ' \ + '--program=%(program_file)s ' \ + '--expr-file=%(opt_file)s ' \ + '--result-file=%(results_file)s ' \ + '--plan-file=%(plan_file)s ' \ + '--err-file=%(err_file)s ' \ + '--udfs-dir=%(udfs_dir)s ' \ + '--gateways=%(prov)s ' \ + '--syntax-version=%(syntax_version)d ' \ + '--tmp-dir=%(res_dir)s ' \ + '--gateways-cfg=%(gateways_cfg_file)s ' \ + '--fs-cfg=%(fs_cfg_file)s ' % locals() + + if ansi_lexer: + cmd += '--ansi-lexer ' + + if self.keep_temp: + cmd += '--keep-temp ' + + if self.extra_args: + cmd += " ".join(self.extra_args) + " " + + cmd += '--mounts=' + yql_utils.get_mount_config_file() + ' ' + + if files: + for f in files: + if files[f].startswith(ARCADIA_PREFIX): # how does it work with folders? and does it? 
+ files[f] = yatest.common.source_path(files[f][len(ARCADIA_PREFIX):]) + continue + if files[f].startswith(ARCADIA_TESTS_DATA_PREFIX): + files[f] = yatest.common.data_path(files[f][len(ARCADIA_TESTS_DATA_PREFIX):]) + continue + + if files[f].startswith(VAR_CHAR_PREFIX): + for prefix, func in six.iteritems(FIX_DIR_PREFIXES): + if files[f].startswith(VAR_CHAR_PREFIX + prefix): + real_path = func(files[f][len(prefix) + 2:]) # $ + prefix + / + break + else: + raise Exception("unknown prefix in file path %s" % (files[f],)) + copy_dest = os.path.join(res_dir, f) + if not os.path.exists(os.path.dirname(copy_dest)): + os.makedirs(os.path.dirname(copy_dest)) + shutil.copy2( + real_path, + copy_dest, + ) + files[f] = f + continue + + if not files[f].startswith('/'): # why do we check files[f] instead of f here? + path_to_copy = os.path.join( + yatest.common.work_path(), + files[f] + ) + if '/' in files[f]: + copy_dest = os.path.join( + res_dir, + os.path.dirname(files[f]) + ) + if not os.path.exists(copy_dest): + os.makedirs(copy_dest) + else: + copy_dest = res_dir + files[f] = os.path.basename(files[f]) + shutil.copy2(path_to_copy, copy_dest) + else: + shutil.copy2(files[f], res_dir) + files[f] = os.path.basename(files[f]) + cmd += yql_utils.get_cmd_for_files('--file', files) + + if urls: + cmd += yql_utils.get_cmd_for_files('--url', urls) + + optimize_only = False + if tables: + for table in tables: + self.tables[table.full_name] = table + if table.format != 'yson': + optimize_only = True + for name in self.tables: + cmd += '--table=yt.%s@%s ' % (name, self.tables[name].yqlrun_file) + + if "--lineage" not in self.extra_args: + if optimize_only: + cmd += '-O ' + else: + cmd += '--run ' + + if yql_utils.get_param('UDF_RESOLVER'): + cmd += '--udf-resolver=' + self.udf_resolver_binary + ' --scan-udfs ' + if not yatest.common.context.sanitize: + cmd += ' --udf-resolver-filter-syscalls ' + + if run_sql and not self.use_sql2yql: + cmd += '--sql ' + + if 
yql_utils.get_param('MULTIRUN'): + if '/* multirun can not */' in yql_program: + pytest.skip('multirun can not execute this') + cmd += '-M %s ' % yql_utils.get_param('MULTIRUN') + + if parameters: + parameters_file = res_file_path('params.yson') + with open(parameters_file, 'w') as f: + f.write(yson.dumps(parameters)) + cmd += '--params-file=%s ' % parameters_file + + if verbose: + yql_utils.log('prov is ' + self.prov) + + env = {'YQL_DETERMINISTIC_MODE': '1'} + env.update(extra_env) + for var in ['LLVM_PROFILE_FILE', 'GO_COVERAGE_PREFIX', 'PYTHON_COVERAGE_PREFIX', 'NLG_COVERAGE_FILENAME']: + if var in os.environ: + env[var] = os.environ[var] + if yql_utils.get_param('STDERR'): + debug_udfs_dir = os.path.join(os.path.abspath('.'), '..', '..', '..') + env_setters = ";".join("{}={}".format(k, v) for k, v in six.iteritems(env)) + yql_utils.log('GDB launch command:') + yql_utils.log('(cd "%s" && %s ya tool gdb --args %s)' % (res_dir, env_setters, cmd.replace(udfs_dir, debug_udfs_dir))) + + proc_result = yatest.common.process.execute(cmd.strip().split(), check_exit_code=False, cwd=res_dir, env=env) + if proc_result.exit_code != 0 and check_error: + with open(err_file, 'r') as f: + err_file_text = f.read() + assert 0, \ + 'Command\n%(command)s\n finished with exit code %(code)d, stderr:\n\n%(stderr)s\n\nerror file:\n%(err_file)s' % { + 'command': cmd, + 'code': proc_result.exit_code, + 'stderr': proc_result.std_err, + 'err_file': err_file_text + } + + if os.path.exists(results_file) and os.stat(results_file).st_size == 0: + os.unlink(results_file) # kikimr yql-exec compatibility + + results, log_results = yql_utils.read_res_file(results_file) + plan, log_plan = yql_utils.read_res_file(plan_file) + opt, log_opt = yql_utils.read_res_file(opt_file) + err, log_err = yql_utils.read_res_file(err_file) + + if verbose: + yql_utils.log('PROGRAM:') + yql_utils.log(program) + yql_utils.log('OPT:') + yql_utils.log(log_opt) + yql_utils.log('PLAN:') + yql_utils.log(log_plan) + 
yql_utils.log('RESULTS:') + yql_utils.log(log_results) + yql_utils.log('ERROR:') + yql_utils.log(log_err) + + return yql_utils.YQLExecResult( + proc_result.std_out, + yql_utils.hide_source_line_number(err.replace(res_dir, '<tmp_path>')), + results, + results_file, + opt, + opt_file, + plan, + plan_file, + program, + proc_result, + None + ) + + def create_empty_tables(self, tables): + pass + + def write_tables(self, tables): + pass + + def get_tables(self, tables): + res = {} + for table in tables: + # recreate table after yql program was executed + res[table.full_name] = yql_utils.new_table( + table.full_name, + yqlrun_file=self.tables[table.full_name].yqlrun_file, + res_dir=self.res_dir + ) + + yql_utils.log('YQLRun table ' + table.full_name) + yql_utils.log(res[table.full_name].content) + + return res diff --git a/ydb/library/yql/tests/common/udf_test/test.py b/ydb/library/yql/tests/common/udf_test/test.py new file mode 100644 index 0000000000..dffe4cd381 --- /dev/null +++ b/ydb/library/yql/tests/common/udf_test/test.py @@ -0,0 +1,106 @@ +import os +import os.path +import glob +import codecs +import shutil + +import pytest + +import yql_utils +from yqlrun import YQLRun + +import yatest.common + +project_path = yatest.common.context.project_path +SOURCE_PATH = yql_utils.yql_source_path((project_path + '/cases').replace('\\', '/')) +DATA_PATH = yatest.common.output_path('cases') +ASTDIFF_PATH = yql_utils.yql_binary_path('ydb/library/yql/tools/astdiff/astdiff') + + +def pytest_generate_tests(metafunc): + if os.path.exists(SOURCE_PATH): + shutil.copytree(SOURCE_PATH, DATA_PATH) + cases = sorted([os.path.basename(sql_query)[:-4] for sql_query in glob.glob(DATA_PATH + '/*.sql')]) + + else: + cases = [] + metafunc.parametrize(['case'], [(case, ) for case in cases]) + + +def test(case): + program_file = os.path.join(DATA_PATH, case + '.sql') + + with codecs.open(program_file, encoding='utf-8') as f: + program = f.readlines() + + header = program[0] + canonize_ast = False 
+ + if header.startswith('--ignore'): + pytest.skip(header) + elif header.startswith('--sanitizer ignore') and yatest.common.context.sanitize is not None: + pytest.skip(header) + elif header.startswith('--sanitizer ignore address') and yatest.common.context.sanitize == 'address': + pytest.skip(header) + elif header.startswith('--sanitizer ignore memory') and yatest.common.context.sanitize == 'memory': + pytest.skip(header) + elif header.startswith('--sanitizer ignore thread') and yatest.common.context.sanitize == 'thread': + pytest.skip(header) + elif header.startswith('--sanitizer ignore undefined') and yatest.common.context.sanitize == 'undefined': + pytest.skip(header) + elif header.startswith('--canonize ast'): + canonize_ast = True + + program = '\n'.join(['use plato;'] + program) + + cfg = yql_utils.get_program_cfg(None, case, DATA_PATH) + files = {} + diff_tool = None + for item in cfg: + if item[0] == 'file': + files[item[1]] = item[2] + if item[0] == 'diff_tool': + diff_tool = item[1:] + + in_tables = yql_utils.get_input_tables(None, cfg, DATA_PATH, def_attr=yql_utils.KSV_ATTR) + + udfs_dir = yql_utils.get_udfs_path([ + yatest.common.build_path(os.path.join(yatest.common.context.project_path, "..")) + ]) + + xfail = yql_utils.is_xfail(cfg) + if yql_utils.get_param('TARGET_PLATFORM') and xfail: + pytest.skip('xfail is not supported on non-default target platform') + + extra_env = dict(os.environ) + extra_env["YQL_UDF_RESOLVER"] = "1" + extra_env["YQL_ARCADIA_BINARY_PATH"] = os.path.expandvars(yatest.common.build_path('.')) + extra_env["YQL_ARCADIA_SOURCE_PATH"] = os.path.expandvars(yatest.common.source_path('.')) + extra_env["Y_NO_AVX_IN_DOT_PRODUCT"] = "1" + + # this breaks tests using V0 syntax + if "YA_TEST_RUNNER" in extra_env: + del extra_env["YA_TEST_RUNNER"] + + yqlrun_res = YQLRun(udfs_dir=udfs_dir, prov='yt', use_sql2yql=False, cfg_dir='ydb/library/yql/cfg/udf_test').yql_exec( + program=program, + run_sql=True, + tables=in_tables, + files=files, + 
check_error=not xfail, + extra_env=extra_env + ) + + if xfail: + assert yqlrun_res.execution_result.exit_code != 0 + + results_path = os.path.join(yql_utils.yql_output_path(), case + '.results.txt') + with open(results_path, 'w') as f: + f.write(yqlrun_res.results) + + to_canonize = [yqlrun_res.std_err] if xfail else [yatest.common.canonical_file(yqlrun_res.results_file, local=True, diff_tool=diff_tool)] + + if canonize_ast: + to_canonize += [yatest.common.canonical_file(yqlrun_res.opt_file, local=True, diff_tool=ASTDIFF_PATH)] + + return to_canonize diff --git a/ydb/library/yql/tests/common/udf_test/ya.make b/ydb/library/yql/tests/common/udf_test/ya.make new file mode 100644 index 0000000000..ff12e9e4f5 --- /dev/null +++ b/ydb/library/yql/tests/common/udf_test/ya.make @@ -0,0 +1,9 @@ +PY23_LIBRARY() + +TEST_SRCS(test.py) + +PEERDIR( + ydb/library/yql/tests/common/test_framework +) + +END() diff --git a/ydb/library/yql/tests/common/ya.make b/ydb/library/yql/tests/common/ya.make new file mode 100644 index 0000000000..d1f83c5bca --- /dev/null +++ b/ydb/library/yql/tests/common/ya.make @@ -0,0 +1,5 @@ +RECURSE( + udf_test + test_framework +) + diff --git a/ydb/library/yql/tests/ya.make b/ydb/library/yql/tests/ya.make new file mode 100644 index 0000000000..25ab5886b2 --- /dev/null +++ b/ydb/library/yql/tests/ya.make @@ -0,0 +1,3 @@ +RECURSE( + common +) diff --git a/ydb/library/yql/ya.make b/ydb/library/yql/ya.make index 1358128815..3aafe2834a 100644 --- a/ydb/library/yql/ya.make +++ b/ydb/library/yql/ya.make @@ -8,6 +8,7 @@ RECURSE( providers public sql + tests tools udfs utils |