diff options
author | Devtools Arcadia <arcadia-devtools@yandex-team.ru> | 2022-02-07 18:08:42 +0300 |
---|---|---|
committer | Devtools Arcadia <arcadia-devtools@mous.vla.yp-c.yandex.net> | 2022-02-07 18:08:42 +0300 |
commit | 1110808a9d39d4b808aef724c861a2e1a38d2a69 (patch) | |
tree | e26c9fed0de5d9873cce7e00bc214573dc2195b7 /contrib/libs/apache/avro/impl | |
download | ydb-1110808a9d39d4b808aef724c861a2e1a38d2a69.tar.gz |
intermediate changes
ref:cde9a383711a11544ce7e107a78147fb96cc4029
Diffstat (limited to 'contrib/libs/apache/avro/impl')
28 files changed, 9429 insertions, 0 deletions
diff --git a/contrib/libs/apache/avro/impl/BinaryDecoder.cc b/contrib/libs/apache/avro/impl/BinaryDecoder.cc new file mode 100644 index 0000000000..71cbf9f107 --- /dev/null +++ b/contrib/libs/apache/avro/impl/BinaryDecoder.cc @@ -0,0 +1,252 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#define __STDC_LIMIT_MACROS + +#include <memory> +#include "Decoder.hh" +#include "Zigzag.hh" +#include "Exception.hh" + +namespace avro { + +using std::make_shared; + +class BinaryDecoder : public Decoder { + StreamReader in_; + const uint8_t* next_; + const uint8_t* end_; + + void init(InputStream& ib); + void decodeNull(); + bool decodeBool(); + int32_t decodeInt(); + int64_t decodeLong(); + float decodeFloat(); + double decodeDouble(); + void decodeString(std::string& value); + void skipString(); + void decodeBytes(std::vector<uint8_t>& value); + void skipBytes(); + void decodeFixed(size_t n, std::vector<uint8_t>& value); + void skipFixed(size_t n); + size_t decodeEnum(); + size_t arrayStart(); + size_t arrayNext(); + size_t skipArray(); + size_t mapStart(); + size_t mapNext(); + size_t skipMap(); + size_t decodeUnionIndex(); + + int64_t doDecodeLong(); + size_t doDecodeItemCount(); + size_t doDecodeLength(); + void drain(); + void more(); +}; + +DecoderPtr binaryDecoder() +{ + return make_shared<BinaryDecoder>(); +} + +void BinaryDecoder::init(InputStream& is) +{ + in_.reset(is); +} + +void BinaryDecoder::decodeNull() +{ +} + +bool BinaryDecoder::decodeBool() +{ + uint8_t v = in_.read(); + if (v == 0) { + return false; + } else if (v == 1) { + return true; + } + throw Exception("Invalid value for bool"); +} + +int32_t BinaryDecoder::decodeInt() +{ + int64_t val = doDecodeLong(); + if (val < INT32_MIN || val > INT32_MAX) { + throw Exception( + boost::format("Value out of range for Avro int: %1%") % val); + } + return static_cast<int32_t>(val); +} + +int64_t BinaryDecoder::decodeLong() +{ + return doDecodeLong(); +} + +float BinaryDecoder::decodeFloat() +{ + float result; + in_.readBytes(reinterpret_cast<uint8_t *>(&result), sizeof(float)); + return result; +} + +double BinaryDecoder::decodeDouble() +{ + double result; + in_.readBytes(reinterpret_cast<uint8_t *>(&result), sizeof(double)); + return result; +} + +size_t BinaryDecoder::doDecodeLength() +{ + 
ssize_t len = decodeInt(); + if (len < 0) { + throw Exception( + boost::format("Cannot have negative length: %1%") % len); + } + return len; +} + +void BinaryDecoder::drain() +{ + in_.drain(false); +} + +void BinaryDecoder::decodeString(std::string& value) +{ + size_t len = doDecodeLength(); + value.resize(len); + if (len > 0) { + in_.readBytes(const_cast<uint8_t*>( + reinterpret_cast<const uint8_t*>(value.c_str())), len); + } +} + +void BinaryDecoder::skipString() +{ + size_t len = doDecodeLength(); + in_.skipBytes(len); +} + +void BinaryDecoder::decodeBytes(std::vector<uint8_t>& value) +{ + size_t len = doDecodeLength(); + value.resize(len); + if (len > 0) { + in_.readBytes(value.data(), len); + } +} + +void BinaryDecoder::skipBytes() +{ + size_t len = doDecodeLength(); + in_.skipBytes(len); +} + +void BinaryDecoder::decodeFixed(size_t n, std::vector<uint8_t>& value) +{ + value.resize(n); + if (n > 0) { + in_.readBytes(value.data(), n); + } +} + +void BinaryDecoder::skipFixed(size_t n) +{ + in_.skipBytes(n); +} + +size_t BinaryDecoder::decodeEnum() +{ + return static_cast<size_t>(doDecodeLong()); +} + +size_t BinaryDecoder::arrayStart() +{ + return doDecodeItemCount(); +} + +size_t BinaryDecoder::doDecodeItemCount() +{ + int64_t result = doDecodeLong(); + if (result < 0) { + doDecodeLong(); + return static_cast<size_t>(-result); + } + return static_cast<size_t>(result); +} + +size_t BinaryDecoder::arrayNext() +{ + return static_cast<size_t>(doDecodeLong()); +} + +size_t BinaryDecoder::skipArray() +{ + for (; ;) { + int64_t r = doDecodeLong(); + if (r < 0) { + size_t n = static_cast<size_t>(doDecodeLong()); + in_.skipBytes(n); + } else { + return static_cast<size_t>(r); + } + } +} + +size_t BinaryDecoder::mapStart() +{ + return doDecodeItemCount(); +} + +size_t BinaryDecoder::mapNext() +{ + return doDecodeItemCount(); +} + +size_t BinaryDecoder::skipMap() +{ + return skipArray(); +} + +size_t BinaryDecoder::decodeUnionIndex() +{ + return 
static_cast<size_t>(doDecodeLong()); +} + +int64_t BinaryDecoder::doDecodeLong() { + uint64_t encoded = 0; + int shift = 0; + uint8_t u; + do { + if (shift >= 64) { + throw Exception("Invalid Avro varint"); + } + u = in_.read(); + encoded |= static_cast<uint64_t>(u & 0x7f) << shift; + shift += 7; + } while (u & 0x80); + + return decodeZigzag64(encoded); +} + +} // namespace avro + diff --git a/contrib/libs/apache/avro/impl/BinaryEncoder.cc b/contrib/libs/apache/avro/impl/BinaryEncoder.cc new file mode 100644 index 0000000000..5ceb872f8c --- /dev/null +++ b/contrib/libs/apache/avro/impl/BinaryEncoder.cc @@ -0,0 +1,168 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include "Encoder.hh" +#include "Zigzag.hh" +#include <array> + +namespace avro { + +using std::make_shared; + +class BinaryEncoder : public Encoder { + StreamWriter out_; + + void init(OutputStream& os); + void flush(); + int64_t byteCount() const; + void encodeNull(); + void encodeBool(bool b); + void encodeInt(int32_t i); + void encodeLong(int64_t l); + void encodeFloat(float f); + void encodeDouble(double d); + void encodeString(const std::string& s); + void encodeBytes(const uint8_t *bytes, size_t len); + void encodeFixed(const uint8_t *bytes, size_t len); + void encodeEnum(size_t e); + void arrayStart(); + void arrayEnd(); + void mapStart(); + void mapEnd(); + void setItemCount(size_t count); + void startItem(); + void encodeUnionIndex(size_t e); + + void doEncodeLong(int64_t l); +}; + +EncoderPtr binaryEncoder() +{ + return make_shared<BinaryEncoder>(); +} + +void BinaryEncoder::init(OutputStream& os) +{ + out_.reset(os); +} + +void BinaryEncoder::flush() +{ + out_.flush(); +} + +void BinaryEncoder::encodeNull() +{ +} + +void BinaryEncoder::encodeBool(bool b) +{ + out_.write(b ? 
1 : 0); +} + +void BinaryEncoder::encodeInt(int32_t i) +{ + doEncodeLong(i); +} + +void BinaryEncoder::encodeLong(int64_t l) +{ + doEncodeLong(l); +} + +void BinaryEncoder::encodeFloat(float f) +{ + const uint8_t* p = reinterpret_cast<const uint8_t*>(&f); + out_.writeBytes(p, sizeof(float)); +} + +void BinaryEncoder::encodeDouble(double d) +{ + const uint8_t* p = reinterpret_cast<const uint8_t*>(&d); + out_.writeBytes(p, sizeof(double)); +} + +void BinaryEncoder::encodeString(const std::string& s) +{ + doEncodeLong(s.size()); + out_.writeBytes(reinterpret_cast<const uint8_t*>(s.c_str()), s.size()); +} + +void BinaryEncoder::encodeBytes(const uint8_t *bytes, size_t len) +{ + doEncodeLong(len); + out_.writeBytes(bytes, len); +} + +void BinaryEncoder::encodeFixed(const uint8_t *bytes, size_t len) +{ + out_.writeBytes(bytes, len); +} + +void BinaryEncoder::encodeEnum(size_t e) +{ + doEncodeLong(e); +} + +void BinaryEncoder::arrayStart() +{ +} + +void BinaryEncoder::arrayEnd() +{ + doEncodeLong(0); +} + +void BinaryEncoder::mapStart() +{ +} + +void BinaryEncoder::mapEnd() +{ + doEncodeLong(0); +} + +void BinaryEncoder::setItemCount(size_t count) +{ + if (count == 0) { + throw Exception("Count cannot be zero"); + } + doEncodeLong(count); +} + +void BinaryEncoder::startItem() +{ +} + +void BinaryEncoder::encodeUnionIndex(size_t e) +{ + doEncodeLong(e); +} + +int64_t BinaryEncoder::byteCount() const { + return out_.byteCount(); +} + + +void BinaryEncoder::doEncodeLong(int64_t l) +{ + std::array<uint8_t, 10> bytes; + size_t size = encodeInt64(l, bytes); + out_.writeBytes(bytes.data(), size); +} +} // namespace avro diff --git a/contrib/libs/apache/avro/impl/Compiler.cc b/contrib/libs/apache/avro/impl/Compiler.cc new file mode 100644 index 0000000000..6453db8f17 --- /dev/null +++ b/contrib/libs/apache/avro/impl/Compiler.cc @@ -0,0 +1,591 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include <boost/algorithm/string/replace.hpp> +#include <sstream> + +#include "Compiler.hh" +#include "Types.hh" +#include "Schema.hh" +#include "ValidSchema.hh" +#include "Stream.hh" + +#include "json/JsonDom.hh" + +using std::string; +using std::map; +using std::vector; +using std::pair; +using std::make_pair; + +namespace avro { +using json::Entity; +using json::Object; +using json::Array; +using json::EntityType; + +typedef map<Name, NodePtr> SymbolTable; + + +// #define DEBUG_VERBOSE + +static NodePtr makePrimitive(const string& t) +{ + if (t == "null") { + return NodePtr(new NodePrimitive(AVRO_NULL)); + } else if (t == "boolean") { + return NodePtr(new NodePrimitive(AVRO_BOOL)); + } else if (t == "int") { + return NodePtr(new NodePrimitive(AVRO_INT)); + } else if (t == "long") { + return NodePtr(new NodePrimitive(AVRO_LONG)); + } else if (t == "float") { + return NodePtr(new NodePrimitive(AVRO_FLOAT)); + } else if (t == "double") { + return NodePtr(new NodePrimitive(AVRO_DOUBLE)); + } else if (t == "string") { + return NodePtr(new NodePrimitive(AVRO_STRING)); + } else if (t == "bytes") { + return NodePtr(new NodePrimitive(AVRO_BYTES)); + } else { + return NodePtr(); + } +} + +static NodePtr makeNode(const json::Entity& e, SymbolTable& st, const string &ns); + 
+template <typename T> +concepts::SingleAttribute<T> asSingleAttribute(const T& t) +{ + concepts::SingleAttribute<T> n; + n.add(t); + return n; +} + +static bool isFullName(const string &s) +{ + return s.find('.') != string::npos; +} + +static Name getName(const string &name, const string &ns) +{ + return (isFullName(name)) ? Name(name) : Name(name, ns); +} + +static NodePtr makeNode(const string &t, SymbolTable &st, const string &ns) +{ + NodePtr result = makePrimitive(t); + if (result) { + return result; + } + Name n = getName(t, ns); + + SymbolTable::const_iterator it = st.find(n); + if (it != st.end()) { + return NodePtr(new NodeSymbolic(asSingleAttribute(n), it->second)); + } + throw Exception(boost::format("Unknown type: %1%") % n.fullname()); +} + +/** Returns "true" if the field is in the container */ +// e.g.: can be false for non-mandatory fields +bool containsField(const Object& m, const string& fieldName) { + Object::const_iterator it = m.find(fieldName); + return (it != m.end()); +} + +const json::Object::const_iterator findField(const Entity& e, + const Object& m, const string& fieldName) +{ + Object::const_iterator it = m.find(fieldName); + if (it == m.end()) { + throw Exception(boost::format("Missing Json field \"%1%\": %2%") % + fieldName % e.toString()); + } else { + return it; + } +} + +template <typename T> void ensureType(const Entity &e, const string &name) +{ + if (e.type() != json::type_traits<T>::type()) { + throw Exception(boost::format("Json field \"%1%\" is not a %2%: %3%") % + name % json::type_traits<T>::name() % e.toString()); + } +} + +string getStringField(const Entity &e, const Object &m, + const string &fieldName) +{ + Object::const_iterator it = findField(e, m, fieldName); + ensureType<string>(it->second, fieldName); + return it->second.stringValue(); +} + +const Array& getArrayField(const Entity& e, const Object& m, + const string& fieldName) +{ + Object::const_iterator it = findField(e, m, fieldName); + ensureType<Array 
>(it->second, fieldName); + return it->second.arrayValue(); +} + +const int64_t getLongField(const Entity& e, const Object& m, + const string& fieldName) +{ + Object::const_iterator it = findField(e, m, fieldName); + ensureType<int64_t>(it->second, fieldName); + return it->second.longValue(); +} + +// Unescape double quotes (") for de-serialization. This method complements the +// method NodeImpl::escape() which is used for serialization. +static void unescape(string& s) { + boost::replace_all(s, "\\\"", "\""); +} + +const string getDocField(const Entity& e, const Object& m) +{ + string doc = getStringField(e, m, "doc"); + unescape(doc); + return doc; +} + +struct Field { + const string name; + const NodePtr schema; + const GenericDatum defaultValue; + Field(const string& n, const NodePtr& v, GenericDatum dv) : + name(n), schema(v), defaultValue(dv) { } +}; + +static void assertType(const Entity& e, EntityType et) +{ + if (e.type() != et) { + throw Exception(boost::format("Unexpected type for default value: " + "Expected %1%, but found %2% in line %3%") % + json::typeToString(et) % json::typeToString(e.type()) % + e.line()); + } +} + +static vector<uint8_t> toBin(const string& s) +{ + vector<uint8_t> result(s.size()); + if (s.size() > 0) { + std::copy(s.c_str(), s.c_str() + s.size(), result.data()); + } + return result; +} + +static GenericDatum makeGenericDatum(NodePtr n, + const Entity& e, const SymbolTable& st) +{ + Type t = n->type(); + EntityType dt = e.type(); + + if (t == AVRO_SYMBOLIC) { + n = st.find(n->name())->second; + t = n->type(); + } + switch (t) { + case AVRO_STRING: + assertType(e, json::etString); + return GenericDatum(e.stringValue()); + case AVRO_BYTES: + assertType(e, json::etString); + return GenericDatum(toBin(e.bytesValue())); + case AVRO_INT: + assertType(e, json::etLong); + return GenericDatum(static_cast<int32_t>(e.longValue())); + case AVRO_LONG: + assertType(e, json::etLong); + return GenericDatum(e.longValue()); + case AVRO_FLOAT: + 
if (dt == json::etLong) { + return GenericDatum(static_cast<float>(e.longValue())); + } + assertType(e, json::etDouble); + return GenericDatum(static_cast<float>(e.doubleValue())); + case AVRO_DOUBLE: + if (dt == json::etLong) { + return GenericDatum(static_cast<double>(e.longValue())); + } + assertType(e, json::etDouble); + return GenericDatum(e.doubleValue()); + case AVRO_BOOL: + assertType(e, json::etBool); + return GenericDatum(e.boolValue()); + case AVRO_NULL: + assertType(e, json::etNull); + return GenericDatum(); + case AVRO_RECORD: + { + assertType(e, json::etObject); + GenericRecord result(n); + const map<string, Entity>& v = e.objectValue(); + for (size_t i = 0; i < n->leaves(); ++i) { + map<string, Entity>::const_iterator it = v.find(n->nameAt(i)); + if (it == v.end()) { + throw Exception(boost::format( + "No value found in default for %1%") % n->nameAt(i)); + } + result.setFieldAt(i, + makeGenericDatum(n->leafAt(i), it->second, st)); + } + return GenericDatum(n, result); + } + case AVRO_ENUM: + assertType(e, json::etString); + return GenericDatum(n, GenericEnum(n, e.stringValue())); + case AVRO_ARRAY: + { + assertType(e, json::etArray); + GenericArray result(n); + const vector<Entity>& elements = e.arrayValue(); + for (vector<Entity>::const_iterator it = elements.begin(); + it != elements.end(); ++it) { + result.value().push_back(makeGenericDatum(n->leafAt(0), *it, st)); + } + return GenericDatum(n, result); + } + case AVRO_MAP: + { + assertType(e, json::etObject); + GenericMap result(n); + const map<string, Entity>& v = e.objectValue(); + for (map<string, Entity>::const_iterator it = v.begin(); + it != v.end(); ++it) { + result.value().push_back(make_pair(it->first, + makeGenericDatum(n->leafAt(1), it->second, st))); + } + return GenericDatum(n, result); + } + case AVRO_UNION: + { + GenericUnion result(n); + result.selectBranch(0); + result.datum() = makeGenericDatum(n->leafAt(0), e, st); + return GenericDatum(n, result); + } + case AVRO_FIXED: + 
assertType(e, json::etString); + return GenericDatum(n, GenericFixed(n, toBin(e.bytesValue()))); + default: + throw Exception(boost::format("Unknown type: %1%") % t); + } + return GenericDatum(); +} + + +static Field makeField(const Entity& e, SymbolTable& st, const string& ns) +{ + const Object& m = e.objectValue(); + const string& n = getStringField(e, m, "name"); + Object::const_iterator it = findField(e, m, "type"); + map<string, Entity>::const_iterator it2 = m.find("default"); + NodePtr node = makeNode(it->second, st, ns); + if (containsField(m, "doc")) { + node->setDoc(getDocField(e, m)); + } + GenericDatum d = (it2 == m.end()) ? GenericDatum() : + makeGenericDatum(node, it2->second, st); + return Field(n, node, d); +} + +// Extended makeRecordNode (with doc). +static NodePtr makeRecordNode(const Entity& e, const Name& name, + const string* doc, const Object& m, + SymbolTable& st, const string& ns) { + const Array& v = getArrayField(e, m, "fields"); + concepts::MultiAttribute<string> fieldNames; + concepts::MultiAttribute<NodePtr> fieldValues; + vector<GenericDatum> defaultValues; + + for (Array::const_iterator it = v.begin(); it != v.end(); ++it) { + Field f = makeField(*it, st, ns); + fieldNames.add(f.name); + fieldValues.add(f.schema); + defaultValues.push_back(f.defaultValue); + } + NodeRecord* node; + if (doc == NULL) { + node = new NodeRecord(asSingleAttribute(name), fieldValues, fieldNames, + defaultValues); + } else { + node = new NodeRecord(asSingleAttribute(name), asSingleAttribute(*doc), + fieldValues, fieldNames, defaultValues); + } + return NodePtr(node); +} + +static LogicalType makeLogicalType(const Entity& e, const Object& m) { + if (!containsField(m, "logicalType")) { + return LogicalType(LogicalType::NONE); + } + + const std::string& typeField = getStringField(e, m, "logicalType"); + + if (typeField == "decimal") { + LogicalType decimalType(LogicalType::DECIMAL); + try { + decimalType.setPrecision(getLongField(e, m, "precision")); + if 
(containsField(m, "scale")) { + decimalType.setScale(getLongField(e, m, "scale")); + } + } catch (Exception& ex) { + // If any part of the logical type is malformed, per the standard we + // must ignore the whole attribute. + return LogicalType(LogicalType::NONE); + } + return decimalType; + } + + LogicalType::Type t = LogicalType::NONE; + if (typeField == "date") + t = LogicalType::DATE; + else if (typeField == "time-millis") + t = LogicalType::TIME_MILLIS; + else if (typeField == "time-micros") + t = LogicalType::TIME_MICROS; + else if (typeField == "timestamp-millis") + t = LogicalType::TIMESTAMP_MILLIS; + else if (typeField == "timestamp-micros") + t = LogicalType::TIMESTAMP_MICROS; + else if (typeField == "duration") + t = LogicalType::DURATION; + else if (typeField == "uuid") + t = LogicalType::UUID; + return LogicalType(t); +} + +static NodePtr makeEnumNode(const Entity& e, + const Name& name, const Object& m) +{ + const Array& v = getArrayField(e, m, "symbols"); + concepts::MultiAttribute<string> symbols; + for (Array::const_iterator it = v.begin(); it != v.end(); ++it) { + if (it->type() != json::etString) { + throw Exception(boost::format("Enum symbol not a string: %1%") % + it->toString()); + } + symbols.add(it->stringValue()); + } + NodePtr node = NodePtr(new NodeEnum(asSingleAttribute(name), symbols)); + if (containsField(m, "doc")) { + node->setDoc(getDocField(e, m)); + } + return node; +} + +static NodePtr makeFixedNode(const Entity& e, + const Name& name, const Object& m) +{ + int v = static_cast<int>(getLongField(e, m, "size")); + if (v <= 0) { + throw Exception(boost::format("Size for fixed is not positive: %1%") % + e.toString()); + } + NodePtr node = + NodePtr(new NodeFixed(asSingleAttribute(name), asSingleAttribute(v))); + if (containsField(m, "doc")) { + node->setDoc(getDocField(e, m)); + } + return node; +} + +static NodePtr makeArrayNode(const Entity& e, const Object& m, + SymbolTable& st, const string& ns) +{ + Object::const_iterator it = 
findField(e, m, "items"); + NodePtr node = NodePtr(new NodeArray( + asSingleAttribute(makeNode(it->second, st, ns)))); + if (containsField(m, "doc")) { + node->setDoc(getDocField(e, m)); + } + return node; +} + +static NodePtr makeMapNode(const Entity& e, const Object& m, + SymbolTable& st, const string& ns) +{ + Object::const_iterator it = findField(e, m, "values"); + + NodePtr node = NodePtr(new NodeMap( + asSingleAttribute(makeNode(it->second, st, ns)))); + if (containsField(m, "doc")) { + node->setDoc(getDocField(e, m)); + } + return node; +} + +static Name getName(const Entity& e, const Object& m, const string& ns) +{ + const string& name = getStringField(e, m, "name"); + + if (isFullName(name)) { + return Name(name); + } else { + Object::const_iterator it = m.find("namespace"); + if (it != m.end()) { + if (it->second.type() != json::type_traits<string>::type()) { + throw Exception(boost::format( + "Json field \"%1%\" is not a %2%: %3%") % + "namespace" % json::type_traits<string>::name() % + it->second.toString()); + } + Name result = Name(name, it->second.stringValue()); + return result; + } + return Name(name, ns); + } +} + +static NodePtr makeNode(const Entity& e, const Object& m, + SymbolTable& st, const string& ns) +{ + const string& type = getStringField(e, m, "type"); + NodePtr result; + if (type == "record" || type == "error" || + type == "enum" || type == "fixed") { + Name nm = getName(e, m, ns); + if (type == "record" || type == "error") { + result = NodePtr(new NodeRecord()); + st[nm] = result; + // Get field doc + if (containsField(m, "doc")) { + string doc = getDocField(e, m); + + NodePtr r = makeRecordNode(e, nm, &doc, m, st, nm.ns()); + (std::dynamic_pointer_cast<NodeRecord>(r))->swap( + *std::dynamic_pointer_cast<NodeRecord>(result)); + } else { // No doc + NodePtr r = + makeRecordNode(e, nm, NULL, m, st, nm.ns()); + (std::dynamic_pointer_cast<NodeRecord>(r)) + ->swap(*std::dynamic_pointer_cast<NodeRecord>(result)); + } + } else { + result = 
(type == "enum") ? makeEnumNode(e, nm, m) : + makeFixedNode(e, nm, m); + st[nm] = result; + } + } else if (type == "array") { + result = makeArrayNode(e, m, st, ns); + } else if (type == "map") { + result = makeMapNode(e, m, st, ns); + } else { + result = makePrimitive(type); + } + + if (result) { + try { + result->setLogicalType(makeLogicalType(e, m)); + } catch (Exception& ex) { + // Per the standard we must ignore the logical type attribute if it + // is malformed. + } + return result; + } + + throw Exception(boost::format("Unknown type definition: %1%") + % e.toString()); +} + +static NodePtr makeNode(const Entity& e, const Array& m, + SymbolTable& st, const string& ns) +{ + concepts::MultiAttribute<NodePtr> mm; + for (Array::const_iterator it = m.begin(); it != m.end(); ++it) { + mm.add(makeNode(*it, st, ns)); + } + return NodePtr(new NodeUnion(mm)); +} + +static NodePtr makeNode(const json::Entity& e, SymbolTable& st, const string& ns) +{ + switch (e.type()) { + case json::etString: + return makeNode(e.stringValue(), st, ns); + case json::etObject: + return makeNode(e, e.objectValue(), st, ns); + case json::etArray: + return makeNode(e, e.arrayValue(), st, ns); + default: + throw Exception(boost::format("Invalid Avro type: %1%") % e.toString()); + } +} + +ValidSchema compileJsonSchemaFromStream(InputStream& is) +{ + json::Entity e = json::loadEntity(is); + SymbolTable st; + NodePtr n = makeNode(e, st, ""); + return ValidSchema(n); +} + +AVRO_DECL ValidSchema compileJsonSchemaFromFile(const char* filename) +{ + std::unique_ptr<InputStream> s = fileInputStream(filename); + return compileJsonSchemaFromStream(*s); +} + +AVRO_DECL ValidSchema compileJsonSchemaFromMemory(const uint8_t* input, size_t len) +{ + return compileJsonSchemaFromStream(*memoryInputStream(input, len)); +} + +AVRO_DECL ValidSchema compileJsonSchemaFromString(const char* input) +{ + return compileJsonSchemaFromMemory(reinterpret_cast<const uint8_t*>(input), + ::strlen(input)); +} + +AVRO_DECL 
ValidSchema compileJsonSchemaFromString(const string& input) +{ + return compileJsonSchemaFromMemory( + reinterpret_cast<const uint8_t*>(input.data()), input.size()); +} + +static ValidSchema compile(std::istream& is) +{ + std::unique_ptr<InputStream> in = istreamInputStream(is); + return compileJsonSchemaFromStream(*in); +} + +void compileJsonSchema(std::istream &is, ValidSchema &schema) +{ + if (!is.good()) { + throw Exception("Input stream is not good"); + } + + schema = compile(is); +} + +AVRO_DECL bool compileJsonSchema(std::istream &is, ValidSchema &schema, string &error) +{ + try { + compileJsonSchema(is, schema); + return true; + } catch (const Exception &e) { + error = e.what(); + return false; + } + +} + +} // namespace avro diff --git a/contrib/libs/apache/avro/impl/DataFile.cc b/contrib/libs/apache/avro/impl/DataFile.cc new file mode 100644 index 0000000000..e20e605827 --- /dev/null +++ b/contrib/libs/apache/avro/impl/DataFile.cc @@ -0,0 +1,600 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include "DataFile.hh" +#include "Compiler.hh" +#include "Exception.hh" + +#include <sstream> + +#include <boost/random/mersenne_twister.hpp> +#include <boost/iostreams/device/file.hpp> +#include <boost/iostreams/filter/gzip.hpp> +#include <boost/iostreams/filter/zlib.hpp> +#include <boost/crc.hpp> // for boost::crc_32_type + +#ifdef SNAPPY_CODEC_AVAILABLE +#include <snappy.h> +#endif + +namespace avro { +using std::unique_ptr; +using std::ostringstream; +using std::istringstream; +using std::vector; +using std::copy; +using std::string; + +using std::array; + +namespace { +const string AVRO_SCHEMA_KEY("avro.schema"); +const string AVRO_CODEC_KEY("avro.codec"); +const string AVRO_NULL_CODEC("null"); +const string AVRO_DEFLATE_CODEC("deflate"); + +#ifdef SNAPPY_CODEC_AVAILABLE +const string AVRO_SNAPPY_CODEC = "snappy"; +#endif + +const size_t minSyncInterval = 32; +const size_t maxSyncInterval = 1u << 30; + +boost::iostreams::zlib_params get_zlib_params() { + boost::iostreams::zlib_params ret; + ret.method = boost::iostreams::zlib::deflated; + ret.noheader = true; + return ret; +} +} + +DataFileWriterBase::DataFileWriterBase(const char* filename, const ValidSchema& schema, size_t syncInterval, + Codec codec) : + filename_(filename), + schema_(schema), + encoderPtr_(binaryEncoder()), + syncInterval_(syncInterval), + codec_(codec), + stream_(fileOutputStream(filename)), + buffer_(memoryOutputStream()), + sync_(makeSync()), + objectCount_(0), + lastSync_(0) +{ + init(schema, syncInterval, codec); +} + +DataFileWriterBase::DataFileWriterBase(std::unique_ptr<OutputStream> outputStream, + const ValidSchema& schema, size_t syncInterval, Codec codec) : + filename_(), + schema_(schema), + encoderPtr_(binaryEncoder()), + syncInterval_(syncInterval), + codec_(codec), + stream_(std::move(outputStream)), + buffer_(memoryOutputStream()), + sync_(makeSync()), + objectCount_(0), + lastSync_(0) +{ + init(schema, syncInterval, codec); +} + +void 
DataFileWriterBase::init(const ValidSchema &schema, size_t syncInterval, const Codec &codec) { + if (syncInterval < minSyncInterval || syncInterval > maxSyncInterval) { + throw Exception(boost::format("Invalid sync interval: %1%. " + "Should be between %2% and %3%") % syncInterval % + minSyncInterval % maxSyncInterval); + } + setMetadata(AVRO_CODEC_KEY, AVRO_NULL_CODEC); + + if (codec_ == NULL_CODEC) { + setMetadata(AVRO_CODEC_KEY, AVRO_NULL_CODEC); + } else if (codec_ == DEFLATE_CODEC) { + setMetadata(AVRO_CODEC_KEY, AVRO_DEFLATE_CODEC); +#ifdef SNAPPY_CODEC_AVAILABLE + } else if (codec_ == SNAPPY_CODEC) { + setMetadata(AVRO_CODEC_KEY, AVRO_SNAPPY_CODEC); +#endif + } else { + throw Exception(boost::format("Unknown codec: %1%") % codec); + } + setMetadata(AVRO_SCHEMA_KEY, schema.toJson(false)); + + writeHeader(); + encoderPtr_->init(*buffer_); + + lastSync_ = stream_->byteCount(); +} + + +DataFileWriterBase::~DataFileWriterBase() +{ + if (stream_.get()) { + close(); + } +} + +void DataFileWriterBase::close() +{ + flush(); + stream_.reset(); +} + +void DataFileWriterBase::sync() +{ + encoderPtr_->flush(); + + encoderPtr_->init(*stream_); + avro::encode(*encoderPtr_, objectCount_); + if (codec_ == NULL_CODEC) { + int64_t byteCount = buffer_->byteCount(); + avro::encode(*encoderPtr_, byteCount); + encoderPtr_->flush(); + std::unique_ptr<InputStream> in = memoryInputStream(*buffer_); + copy(*in, *stream_); + } else if (codec_ == DEFLATE_CODEC) { + std::vector<char> buf; + { + boost::iostreams::filtering_ostream os; + os.push(boost::iostreams::zlib_compressor(get_zlib_params())); + os.push(boost::iostreams::back_inserter(buf)); + const uint8_t* data; + size_t len; + + std::unique_ptr<InputStream> input = memoryInputStream(*buffer_); + while (input->next(&data, &len)) { + boost::iostreams::write(os, reinterpret_cast<const char*>(data), len); + } + } // make sure all is flushed + std::unique_ptr<InputStream> in = memoryInputStream( + reinterpret_cast<const 
uint8_t*>(buf.data()), buf.size()); + int64_t byteCount = buf.size(); + avro::encode(*encoderPtr_, byteCount); + encoderPtr_->flush(); + copy(*in, *stream_); +#ifdef SNAPPY_CODEC_AVAILABLE + } else if (codec_ == SNAPPY_CODEC) { + std::vector<char> temp; + std::string compressed; + boost::crc_32_type crc; + { + boost::iostreams::filtering_ostream os; + os.push(boost::iostreams::back_inserter(temp)); + const uint8_t* data; + size_t len; + + std::unique_ptr<InputStream> input = memoryInputStream(*buffer_); + while (input->next(&data, &len)) { + boost::iostreams::write(os, reinterpret_cast<const char*>(data), + len); + } + } // make sure all is flushed + + crc.process_bytes(reinterpret_cast<const char*>(temp.data()), + temp.size()); + // For Snappy, add the CRC32 checksum + int32_t checksum = crc(); + + // Now compress + size_t compressed_size = snappy::Compress( + reinterpret_cast<const char*>(temp.data()), temp.size(), + &compressed); + temp.clear(); + { + boost::iostreams::filtering_ostream os; + os.push(boost::iostreams::back_inserter(temp)); + boost::iostreams::write(os, compressed.c_str(), compressed_size); + } + temp.push_back((checksum >> 24) & 0xFF); + temp.push_back((checksum >> 16) & 0xFF); + temp.push_back((checksum >> 8) & 0xFF); + temp.push_back(checksum & 0xFF); + std::unique_ptr<InputStream> in = memoryInputStream( + reinterpret_cast<const uint8_t*>(temp.data()), temp.size()); + int64_t byteCount = temp.size(); + avro::encode(*encoderPtr_, byteCount); + encoderPtr_->flush(); + copy(*in, *stream_); +#endif + } + + encoderPtr_->init(*stream_); + avro::encode(*encoderPtr_, sync_); + encoderPtr_->flush(); + + lastSync_ = stream_->byteCount(); + + buffer_ = memoryOutputStream(); + encoderPtr_->init(*buffer_); + objectCount_ = 0; +} + +void DataFileWriterBase::syncIfNeeded() +{ + encoderPtr_->flush(); + if (buffer_->byteCount() >= syncInterval_) { + sync(); + } +} + +uint64_t DataFileWriterBase::getCurrentBlockStart() +{ + return lastSync_; +} + +void 
DataFileWriterBase::flush() +{ + sync(); +} + +boost::mt19937 random(static_cast<uint32_t>(time(0))); + +DataFileSync DataFileWriterBase::makeSync() +{ + DataFileSync sync; + for (size_t i = 0; i < sync.size(); ++i) { + sync[i] = random(); + } + return sync; +} + +typedef array<uint8_t, 4> Magic; +static Magic magic = { { 'O', 'b', 'j', '\x01' } }; + +void DataFileWriterBase::writeHeader() +{ + encoderPtr_->init(*stream_); + avro::encode(*encoderPtr_, magic); + avro::encode(*encoderPtr_, metadata_); + avro::encode(*encoderPtr_, sync_); + encoderPtr_->flush(); +} + +void DataFileWriterBase::setMetadata(const string& key, const string& value) +{ + vector<uint8_t> v(value.size()); + copy(value.begin(), value.end(), v.begin()); + metadata_[key] = v; +} + +DataFileReaderBase::DataFileReaderBase(const char* filename) : + filename_(filename), stream_(fileSeekableInputStream(filename)), + decoder_(binaryDecoder()), objectCount_(0), eof_(false), blockStart_(-1), + blockEnd_(-1) +{ + readHeader(); +} + +DataFileReaderBase::DataFileReaderBase(std::unique_ptr<InputStream> inputStream) : + filename_(""), stream_(std::move(inputStream)), + decoder_(binaryDecoder()), objectCount_(0), eof_(false) +{ + readHeader(); +} + +void DataFileReaderBase::init() +{ + readerSchema_ = dataSchema_; + dataDecoder_ = binaryDecoder(); + readDataBlock(); +} + +void DataFileReaderBase::init(const ValidSchema& readerSchema) +{ + readerSchema_ = readerSchema; + dataDecoder_ = (readerSchema_.toJson(true) != dataSchema_.toJson(true)) ? + resolvingDecoder(dataSchema_, readerSchema_, binaryDecoder()) : + binaryDecoder(); + readDataBlock(); +} + +static void drain(InputStream& in) +{ + const uint8_t *p = 0; + size_t n = 0; + while (in.next(&p, &n)); +} + +char hex(unsigned int x) +{ + return x + (x < 10 ? 
'0' : ('a' - 10)); +} + +std::ostream& operator << (std::ostream& os, const DataFileSync& s) +{ + for (size_t i = 0; i < s.size(); ++i) { + os << hex(s[i] / 16) << hex(s[i] % 16) << ' '; + } + os << std::endl; + return os; +} + + +bool DataFileReaderBase::hasMore() +{ + for (; ;) { + if (eof_) { + return false; + } else if (objectCount_ != 0) { + return true; + } + + dataDecoder_->init(*dataStream_); + drain(*dataStream_); + DataFileSync s; + decoder_->init(*stream_); + avro::decode(*decoder_, s); + if (s != sync_) { + throw Exception("Sync mismatch"); + } + readDataBlock(); + } +} + +class BoundedInputStream : public InputStream { + InputStream& in_; + size_t limit_; + + bool next(const uint8_t** data, size_t* len) { + if (limit_ != 0 && in_.next(data, len)) { + if (*len > limit_) { + in_.backup(*len - limit_); + *len = limit_; + } + limit_ -= *len; + return true; + } + return false; + } + + void backup(size_t len) { + in_.backup(len); + limit_ += len; + } + + void skip(size_t len) { + if (len > limit_) { + len = limit_; + } + in_.skip(len); + limit_ -= len; + } + + size_t byteCount() const { + return in_.byteCount(); + } + +public: + BoundedInputStream(InputStream& in, size_t limit) : + in_(in), limit_(limit) { } +}; + +unique_ptr<InputStream> boundedInputStream(InputStream& in, size_t limit) +{ + return unique_ptr<InputStream>(new BoundedInputStream(in, limit)); +} + +void DataFileReaderBase::readDataBlock() +{ + decoder_->init(*stream_); + blockStart_ = stream_->byteCount(); + const uint8_t* p = 0; + size_t n = 0; + if (! 
stream_->next(&p, &n)) { + eof_ = true; + return; + } + stream_->backup(n); + avro::decode(*decoder_, objectCount_); + int64_t byteCount; + avro::decode(*decoder_, byteCount); + decoder_->init(*stream_); + blockEnd_ = stream_->byteCount() + byteCount; + + unique_ptr<InputStream> st = boundedInputStream(*stream_, static_cast<size_t>(byteCount)); + if (codec_ == NULL_CODEC) { + dataDecoder_->init(*st); + dataStream_ = std::move(st); +#ifdef SNAPPY_CODEC_AVAILABLE + } else if (codec_ == SNAPPY_CODEC) { + boost::crc_32_type crc; + uint32_t checksum = 0; + compressed_.clear(); + uncompressed.clear(); + const uint8_t* data; + size_t len; + while (st->next(&data, &len)) { + compressed_.insert(compressed_.end(), data, data + len); + } + len = compressed_.size(); + int b1 = compressed_[len - 4] & 0xFF; + int b2 = compressed_[len - 3] & 0xFF; + int b3 = compressed_[len - 2] & 0xFF; + int b4 = compressed_[len - 1] & 0xFF; + + checksum = (b1 << 24) + (b2 << 16) + (b3 << 8) + (b4); + if (!snappy::Uncompress(reinterpret_cast<const char*>(compressed_.data()), + len - 4, &uncompressed)) { + throw Exception( + "Snappy Compression reported an error when decompressing"); + } + crc.process_bytes(uncompressed.c_str(), uncompressed.size()); + uint32_t c = crc(); + if (checksum != c) { + throw Exception(boost::format("Checksum did not match for Snappy compression: Expected: %1%, computed: %2%") % checksum % c); + } + os_.reset(new boost::iostreams::filtering_istream()); + os_->push( + boost::iostreams::basic_array_source<char>(uncompressed.c_str(), + uncompressed.size())); + std::unique_ptr<InputStream> in = istreamInputStream(*os_); + + dataDecoder_->init(*in); + dataStream_ = std::move(in); +#endif + } else { + compressed_.clear(); + const uint8_t* data; + size_t len; + while (st->next(&data, &len)) { + compressed_.insert(compressed_.end(), data, data + len); + } + // boost::iostreams::write(os, reinterpret_cast<const char*>(data), len); + os_.reset(new 
boost::iostreams::filtering_istream()); + os_->push(boost::iostreams::zlib_decompressor(get_zlib_params())); + os_->push(boost::iostreams::basic_array_source<char>( + compressed_.data(), compressed_.size())); + + std::unique_ptr<InputStream> in = nonSeekableIstreamInputStream(*os_); + dataDecoder_->init(*in); + dataStream_ = std::move(in); + } +} + +void DataFileReaderBase::close() +{ +} + +static string toString(const vector<uint8_t>& v) +{ + string result; + result.resize(v.size()); + copy(v.begin(), v.end(), result.begin()); + return result; +} + +static ValidSchema makeSchema(const vector<uint8_t>& v) +{ + istringstream iss(toString(v)); + ValidSchema vs; + compileJsonSchema(iss, vs); + return ValidSchema(vs); +} + +void DataFileReaderBase::readHeader() +{ + decoder_->init(*stream_); + Magic m; + avro::decode(*decoder_, m); + if (magic != m) { + throw Exception("Invalid data file. Magic does not match: " + + filename_); + } + avro::decode(*decoder_, metadata_); + Metadata::const_iterator it = metadata_.find(AVRO_SCHEMA_KEY); + if (it == metadata_.end()) { + throw Exception("No schema in metadata"); + } + + dataSchema_ = makeSchema(it->second); + if (! 
readerSchema_.root()) { + readerSchema_ = dataSchema(); + } + + it = metadata_.find(AVRO_CODEC_KEY); + if (it != metadata_.end() && toString(it->second) == AVRO_DEFLATE_CODEC) { + codec_ = DEFLATE_CODEC; +#ifdef SNAPPY_CODEC_AVAILABLE + } else if (it != metadata_.end() + && toString(it->second) == AVRO_SNAPPY_CODEC) { + codec_ = SNAPPY_CODEC; +#endif + } else { + codec_ = NULL_CODEC; + if (it != metadata_.end() && toString(it->second) != AVRO_NULL_CODEC) { + throw Exception("Unknown codec in data file: " + toString(it->second)); + } + } + + avro::decode(*decoder_, sync_); + decoder_->init(*stream_); + blockStart_ = stream_->byteCount(); +} + +void DataFileReaderBase::doSeek(int64_t position) +{ + if (SeekableInputStream *ss = dynamic_cast<SeekableInputStream *>(stream_.get())) { + if (!eof_) { + dataDecoder_->init(*dataStream_); + drain(*dataStream_); + } + decoder_->init(*stream_); + ss->seek(position); + eof_ = false; + } else { + throw Exception("seek not supported on non-SeekableInputStream"); + } +} + +void DataFileReaderBase::seek(int64_t position) +{ + doSeek(position); + readDataBlock(); +} + +void DataFileReaderBase::sync(int64_t position) +{ + doSeek(position); + DataFileSync sync_buffer; + const uint8_t *p = 0; + size_t n = 0; + size_t i = 0; + while (i < SyncSize) { + if (n == 0 && !stream_->next(&p, &n)) { + eof_ = true; + return; + } + int len = + std::min(static_cast<size_t>(SyncSize - i), n); + memcpy(&sync_buffer[i], p, len); + p += len; + n -= len; + i += len; + } + for (;;) { + size_t j = 0; + for (; j < SyncSize; ++j) { + if (sync_[j] != sync_buffer[(i + j) % SyncSize]) { + break; + } + } + if (j == SyncSize) { + // Found the sync marker! 
+ break; + } + if (n == 0 && !stream_->next(&p, &n)) { + eof_ = true; + return; + } + sync_buffer[i++ % SyncSize] = *p++; + --n; + } + stream_->backup(n); + readDataBlock(); +} + +bool DataFileReaderBase::pastSync(int64_t position) { + return !hasMore() || blockStart_ >= position + SyncSize; +} + +int64_t DataFileReaderBase::previousSync() { + return blockStart_; +} + +} // namespace avro diff --git a/contrib/libs/apache/avro/impl/FileStream.cc b/contrib/libs/apache/avro/impl/FileStream.cc new file mode 100644 index 0000000000..ed601b4c6f --- /dev/null +++ b/contrib/libs/apache/avro/impl/FileStream.cc @@ -0,0 +1,397 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include <fstream> +#include "Stream.hh" +#ifndef _WIN32 +#include "unistd.h" +#include "fcntl.h" +#include "errno.h" + +#ifndef O_BINARY +#define O_BINARY 0 +#endif +#else +#include "Windows.h" + +#ifdef min +#undef min +#endif +#endif + +using std::unique_ptr; +using std::istream; +using std::ostream; + +namespace avro { +namespace { +struct BufferCopyIn { + virtual ~BufferCopyIn() { } + virtual void seek(size_t len) = 0; + virtual bool read(uint8_t* b, size_t toRead, size_t& actual) = 0; + +}; + +struct FileBufferCopyIn : public BufferCopyIn { +#ifdef _WIN32 + HANDLE h_; + FileBufferCopyIn(const char* filename) : + h_(::CreateFileA(filename, GENERIC_READ, 0, NULL, OPEN_ALWAYS, FILE_ATTRIBUTE_NORMAL, NULL)) { + if (h_ == INVALID_HANDLE_VALUE) { + throw Exception(boost::format("Cannot open file: %1%") % ::GetLastError()); + } + } + + ~FileBufferCopyIn() { + ::CloseHandle(h_); + } + + void seek(size_t len) { + if (::SetFilePointer(h_, len, NULL, FILE_CURRENT) == INVALID_SET_FILE_POINTER && ::GetLastError() != NO_ERROR) { + throw Exception(boost::format("Cannot skip file: %1%") % ::GetLastError()); + } + } + + bool read(uint8_t* b, size_t toRead, size_t& actual) { + DWORD dw = 0; + if (! 
::ReadFile(h_, b, toRead, &dw, NULL)) { + throw Exception(boost::format("Cannot read file: %1%") % ::GetLastError()); + } + actual = static_cast<size_t>(dw); + return actual != 0; + } +#else + const int fd_; + + FileBufferCopyIn(const char* filename) : + fd_(open(filename, O_RDONLY | O_BINARY)) { + if (fd_ < 0) { + throw Exception(boost::format("Cannot open file: %1%") % + ::strerror(errno)); + } + } + + ~FileBufferCopyIn() { + ::close(fd_); + } + + void seek(size_t len) { + off_t r = ::lseek(fd_, len, SEEK_CUR); + if (r == static_cast<off_t>(-1)) { + throw Exception(boost::format("Cannot skip file: %1%") % + strerror(errno)); + } + } + + bool read(uint8_t* b, size_t toRead, size_t& actual) { + int n = ::read(fd_, b, toRead); + if (n > 0) { + actual = n; + return true; + } + return false; + } +#endif + +}; + +struct IStreamBufferCopyIn : public BufferCopyIn { + istream& is_; + + IStreamBufferCopyIn(istream& is) : is_(is) { + } + + void seek(size_t len) { + if (! is_.seekg(len, std::ios_base::cur)) { + throw Exception("Cannot skip stream"); + } + } + + bool read(uint8_t* b, size_t toRead, size_t& actual) { + is_.read(reinterpret_cast<char*>(b), toRead); + if (is_.bad()) { + return false; + } + actual = static_cast<size_t>(is_.gcount()); + return (! 
is_.eof() || actual != 0); + } + +}; + +struct NonSeekableIStreamBufferCopyIn : public IStreamBufferCopyIn { + NonSeekableIStreamBufferCopyIn(istream& is) : IStreamBufferCopyIn(is) { } + + void seek(size_t len) { + const size_t bufSize = 4096; + uint8_t buf[bufSize]; + while (len > 0) { + size_t n = std::min(len, bufSize); + is_.read(reinterpret_cast<char*>(buf), n); + if (is_.bad()) { + throw Exception("Cannot skip stream"); + } + size_t actual = static_cast<size_t>(is_.gcount()); + if (is_.eof() && actual == 0) { + throw Exception("Cannot skip stream"); + } + len -= n; + } + } +}; + +} + +class BufferCopyInInputStream : public SeekableInputStream { + const size_t bufferSize_; + uint8_t* const buffer_; + unique_ptr<BufferCopyIn> in_; + size_t byteCount_; + uint8_t* next_; + size_t available_; + + bool next(const uint8_t** data, size_t *size) { + if (available_ == 0 && ! fill()) { + return false; + } + *data = next_; + *size = available_; + next_ += available_; + byteCount_ += available_; + available_ = 0; + return true; + } + + void backup(size_t len) { + next_ -= len; + available_ += len; + byteCount_ -= len; + } + + void skip(size_t len) { + while (len > 0) { + if (available_ == 0) { + in_->seek(len); + byteCount_ += len; + return; + } + size_t n = std::min(available_, len); + available_ -= n; + next_ += n; + len -= n; + byteCount_ += n; + } + } + + size_t byteCount() const { return byteCount_; } + + bool fill() { + size_t n = 0; + if (in_->read(buffer_, bufferSize_, n)) { + next_ = buffer_; + available_ = n; + return true; + } + return false; + } + + void seek(int64_t position) { + // BufferCopyIn::seek is relative to byteCount_, whereas position is + // absolute. 
+ in_->seek(position - byteCount_ - available_); + byteCount_ = position; + available_ = 0; + } + +public: + BufferCopyInInputStream(unique_ptr<BufferCopyIn> in, size_t bufferSize) : + bufferSize_(bufferSize), + buffer_(new uint8_t[bufferSize]), + in_(std::move(in)), + byteCount_(0), + next_(buffer_), + available_(0) { } + + ~BufferCopyInInputStream() { + delete[] buffer_; + } +}; + +namespace { +struct BufferCopyOut { + virtual ~BufferCopyOut() { } + virtual void write(const uint8_t* b, size_t len) = 0; +}; + +struct FileBufferCopyOut : public BufferCopyOut { +#ifdef _WIN32 + HANDLE h_; + FileBufferCopyOut(const char* filename) : + h_(::CreateFileA(filename, GENERIC_WRITE, 0, NULL, CREATE_ALWAYS, FILE_ATTRIBUTE_NORMAL, NULL)) { + if (h_ == INVALID_HANDLE_VALUE) { + throw Exception(boost::format("Cannot open file: %1%") % ::GetLastError()); + } + } + + ~FileBufferCopyOut() { + ::CloseHandle(h_); + } + + void write(const uint8_t* b, size_t len) { + while (len > 0) { + DWORD dw = 0; + if (! 
::WriteFile(h_, b, len, &dw, NULL)) { + throw Exception(boost::format("Cannot read file: %1%") % ::GetLastError()); + } + b += dw; + len -= dw; + } + } +#else + const int fd_; + + FileBufferCopyOut(const char* filename) : + fd_(::open(filename, O_WRONLY | O_CREAT | O_TRUNC | O_BINARY, 0644)) { + + if (fd_ < 0) { + throw Exception(boost::format("Cannot open file: %1%") % + ::strerror(errno)); + } + } + + ~FileBufferCopyOut() { + ::close(fd_); + } + + void write(const uint8_t* b, size_t len) { + if (::write(fd_, b, len) < 0) { + throw Exception(boost::format("Cannot write file: %1%") % + ::strerror(errno)); + } + } +#endif + +}; + +struct OStreamBufferCopyOut : public BufferCopyOut { + ostream& os_; + + OStreamBufferCopyOut(ostream& os) : os_(os) { + } + + void write(const uint8_t* b, size_t len) { + os_.write(reinterpret_cast<const char*>(b), len); + } + +}; + +} + +class BufferCopyOutputStream : public OutputStream { + size_t bufferSize_; + uint8_t* const buffer_; + unique_ptr<BufferCopyOut> out_; + uint8_t* next_; + size_t available_; + size_t byteCount_; + + // Invaiant: byteCount_ == byteswritten + bufferSize_ - available_; + bool next(uint8_t** data, size_t* len) { + if (available_ == 0) { + flush(); + } + *data = next_; + *len = available_; + next_ += available_; + byteCount_ += available_; + available_ = 0; + return true; + } + + void backup(size_t len) { + available_ += len; + next_ -= len; + byteCount_ -= len; + } + + uint64_t byteCount() const { + return byteCount_; + } + + void flush() { + out_->write(buffer_, bufferSize_ - available_); + next_ = buffer_; + available_ = bufferSize_; + } + +public: + BufferCopyOutputStream(unique_ptr<BufferCopyOut> out, size_t bufferSize) : + bufferSize_(bufferSize), + buffer_(new uint8_t[bufferSize]), + out_(std::move(out)), + next_(buffer_), + available_(bufferSize_), byteCount_(0) { } + + ~BufferCopyOutputStream() { + delete[] buffer_; + } +}; + +unique_ptr<InputStream> fileInputStream(const char* filename, + size_t 
bufferSize) +{ + unique_ptr<BufferCopyIn> in(new FileBufferCopyIn(filename)); + return unique_ptr<InputStream>( new BufferCopyInInputStream(std::move(in), bufferSize)); +} + +unique_ptr<SeekableInputStream> fileSeekableInputStream(const char* filename, + size_t bufferSize) +{ + unique_ptr<BufferCopyIn> in(new FileBufferCopyIn(filename)); + return unique_ptr<SeekableInputStream>( new BufferCopyInInputStream(std::move(in), + bufferSize)); +} + +unique_ptr<InputStream> istreamInputStream(istream& is, size_t bufferSize) +{ + unique_ptr<BufferCopyIn> in(new IStreamBufferCopyIn(is)); + return unique_ptr<InputStream>( new BufferCopyInInputStream(std::move(in), bufferSize)); +} + +unique_ptr<InputStream> nonSeekableIstreamInputStream( + istream& is, size_t bufferSize) +{ + unique_ptr<BufferCopyIn> in(new NonSeekableIStreamBufferCopyIn(is)); + return unique_ptr<InputStream>( new BufferCopyInInputStream(std::move(in), bufferSize)); +} + +unique_ptr<OutputStream> fileOutputStream(const char* filename, + size_t bufferSize) +{ + unique_ptr<BufferCopyOut> out(new FileBufferCopyOut(filename)); + return unique_ptr<OutputStream>(new BufferCopyOutputStream(std::move(out), bufferSize)); +} + +unique_ptr<OutputStream> ostreamOutputStream(ostream& os, + size_t bufferSize) +{ + unique_ptr<BufferCopyOut> out(new OStreamBufferCopyOut(os)); + return unique_ptr<OutputStream>(new BufferCopyOutputStream(std::move(out), bufferSize)); +} + + +} // namespace avro diff --git a/contrib/libs/apache/avro/impl/Generic.cc b/contrib/libs/apache/avro/impl/Generic.cc new file mode 100644 index 0000000000..8efb7e9ac4 --- /dev/null +++ b/contrib/libs/apache/avro/impl/Generic.cc @@ -0,0 +1,260 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "Generic.hh" +#include <sstream> + +namespace avro { + +using std::string; +using std::vector; +using std::ostringstream; + +typedef vector<uint8_t> bytes; + +void GenericContainer::assertType(const NodePtr& schema, Type type) { + if (schema->type() != type) { + throw Exception(boost::format("Schema type %1 expected %2") % + toString(schema->type()) % toString(type)); + } +} + +GenericReader::GenericReader(const ValidSchema& s, const DecoderPtr& decoder) : + schema_(s), isResolving_(dynamic_cast<ResolvingDecoder*>(&(*decoder)) != 0), + decoder_(decoder) +{ +} + +GenericReader::GenericReader(const ValidSchema& writerSchema, + const ValidSchema& readerSchema, const DecoderPtr& decoder) : + schema_(readerSchema), + isResolving_(true), + decoder_(resolvingDecoder(writerSchema, readerSchema, decoder)) +{ +} + +void GenericReader::read(GenericDatum& datum) const +{ + datum = GenericDatum(schema_.root()); + read(datum, *decoder_, isResolving_); +} + +void GenericReader::read(GenericDatum& datum, Decoder& d, bool isResolving) +{ + if (datum.isUnion()) { + datum.selectBranch(d.decodeUnionIndex()); + } + switch (datum.type()) { + case AVRO_NULL: + d.decodeNull(); + break; + case AVRO_BOOL: + datum.value<bool>() = d.decodeBool(); + break; + case AVRO_INT: + datum.value<int32_t>() = d.decodeInt(); + break; + case AVRO_LONG: + datum.value<int64_t>() = d.decodeLong(); + break; + case 
AVRO_FLOAT: + datum.value<float>() = d.decodeFloat(); + break; + case AVRO_DOUBLE: + datum.value<double>() = d.decodeDouble(); + break; + case AVRO_STRING: + d.decodeString(datum.value<string>()); + break; + case AVRO_BYTES: + d.decodeBytes(datum.value<bytes>()); + break; + case AVRO_FIXED: + { + GenericFixed& f = datum.value<GenericFixed>(); + d.decodeFixed(f.schema()->fixedSize(), f.value()); + } + break; + case AVRO_RECORD: + { + GenericRecord& r = datum.value<GenericRecord>(); + size_t c = r.schema()->leaves(); + if (isResolving) { + std::vector<size_t> fo = + static_cast<ResolvingDecoder&>(d).fieldOrder(); + for (size_t i = 0; i < c; ++i) { + read(r.fieldAt(fo[i]), d, isResolving); + } + } else { + for (size_t i = 0; i < c; ++i) { + read(r.fieldAt(i), d, isResolving); + } + } + } + break; + case AVRO_ENUM: + datum.value<GenericEnum>().set(d.decodeEnum()); + break; + case AVRO_ARRAY: + { + GenericArray& v = datum.value<GenericArray>(); + vector<GenericDatum>& r = v.value(); + const NodePtr& nn = v.schema()->leafAt(0); + r.resize(0); + size_t start = 0; + for (size_t m = d.arrayStart(); m != 0; m = d.arrayNext()) { + r.resize(r.size() + m); + for (; start < r.size(); ++start) { + r[start] = GenericDatum(nn); + read(r[start], d, isResolving); + } + } + } + break; + case AVRO_MAP: + { + GenericMap& v = datum.value<GenericMap>(); + GenericMap::Value& r = v.value(); + const NodePtr& nn = v.schema()->leafAt(1); + r.resize(0); + size_t start = 0; + for (size_t m = d.mapStart(); m != 0; m = d.mapNext()) { + r.resize(r.size() + m); + for (; start < r.size(); ++start) { + d.decodeString(r[start].first); + r[start].second = GenericDatum(nn); + read(r[start].second, d, isResolving); + } + } + } + break; + default: + throw Exception(boost::format("Unknown schema type %1%") % + toString(datum.type())); + } +} + +void GenericReader::read(Decoder& d, GenericDatum& g, const ValidSchema& s) +{ + g = GenericDatum(s); + read(d, g); +} + +void GenericReader::read(Decoder& d, 
GenericDatum& g) +{ + read(g, d, dynamic_cast<ResolvingDecoder*>(&d) != 0); +} + +GenericWriter::GenericWriter(const ValidSchema& s, const EncoderPtr& encoder) : + schema_(s), encoder_(encoder) +{ +} + +void GenericWriter::write(const GenericDatum& datum) const +{ + write(datum, *encoder_); +} + +void GenericWriter::write(const GenericDatum& datum, Encoder& e) +{ + if (datum.isUnion()) { + e.encodeUnionIndex(datum.unionBranch()); + } + switch (datum.type()) { + case AVRO_NULL: + e.encodeNull(); + break; + case AVRO_BOOL: + e.encodeBool(datum.value<bool>()); + break; + case AVRO_INT: + e.encodeInt(datum.value<int32_t>()); + break; + case AVRO_LONG: + e.encodeLong(datum.value<int64_t>()); + break; + case AVRO_FLOAT: + e.encodeFloat(datum.value<float>()); + break; + case AVRO_DOUBLE: + e.encodeDouble(datum.value<double>()); + break; + case AVRO_STRING: + e.encodeString(datum.value<string>()); + break; + case AVRO_BYTES: + e.encodeBytes(datum.value<bytes>()); + break; + case AVRO_FIXED: + e.encodeFixed(datum.value<GenericFixed>().value()); + break; + case AVRO_RECORD: + { + const GenericRecord& r = datum.value<GenericRecord>(); + size_t c = r.schema()->leaves(); + for (size_t i = 0; i < c; ++i) { + write(r.fieldAt(i), e); + } + } + break; + case AVRO_ENUM: + e.encodeEnum(datum.value<GenericEnum>().value()); + break; + case AVRO_ARRAY: + { + const GenericArray::Value& r = datum.value<GenericArray>().value(); + e.arrayStart(); + if (! r.empty()) { + e.setItemCount(r.size()); + for (GenericArray::Value::const_iterator it = r.begin(); + it != r.end(); ++it) { + e.startItem(); + write(*it, e); + } + } + e.arrayEnd(); + } + break; + case AVRO_MAP: + { + const GenericMap::Value& r = datum.value<GenericMap>().value(); + e.mapStart(); + if (! 
r.empty()) { + e.setItemCount(r.size()); + for (GenericMap::Value::const_iterator it = r.begin(); + it != r.end(); ++it) { + e.startItem(); + e.encodeString(it->first); + write(it->second, e); + } + } + e.mapEnd(); + } + break; + default: + throw Exception(boost::format("Unknown schema type %1%") % + toString(datum.type())); + } +} + +void GenericWriter::write(Encoder& e, const GenericDatum& g) +{ + write(g, e); +} + +} // namespace avro diff --git a/contrib/libs/apache/avro/impl/GenericDatum.cc b/contrib/libs/apache/avro/impl/GenericDatum.cc new file mode 100644 index 0000000000..cdf9006eef --- /dev/null +++ b/contrib/libs/apache/avro/impl/GenericDatum.cc @@ -0,0 +1,105 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include "GenericDatum.hh" +#include "NodeImpl.hh" + +using std::string; +using std::vector; + +namespace avro { + +GenericDatum::GenericDatum(const ValidSchema& schema) : + type_(schema.root()->type()), + logicalType_(schema.root()->logicalType()) +{ + init(schema.root()); +} + +GenericDatum::GenericDatum(const NodePtr& schema) : + type_(schema->type()), + logicalType_(schema->logicalType()) +{ + init(schema); +} + +void GenericDatum::init(const NodePtr& schema) +{ + NodePtr sc = schema; + if (type_ == AVRO_SYMBOLIC) { + sc = resolveSymbol(schema); + type_ = sc->type(); + logicalType_ = sc->logicalType(); + } + switch (type_) { + case AVRO_NULL: + break; + case AVRO_BOOL: + value_ = bool(); + break; + case AVRO_INT: + value_ = int32_t(); + break; + case AVRO_LONG: + value_ = int64_t(); + break; + case AVRO_FLOAT: + value_ = float(); + break; + case AVRO_DOUBLE: + value_ = double(); + break; + case AVRO_STRING: + value_ = string(); + break; + case AVRO_BYTES: + value_ = vector<uint8_t>(); + break; + case AVRO_FIXED: + value_ = GenericFixed(sc); + break; + case AVRO_RECORD: + value_ = GenericRecord(sc); + break; + case AVRO_ENUM: + value_ = GenericEnum(sc); + break; + case AVRO_ARRAY: + value_ = GenericArray(sc); + break; + case AVRO_MAP: + value_ = GenericMap(sc); + break; + case AVRO_UNION: + value_ = GenericUnion(sc); + break; + default: + throw Exception(boost::format("Unknown schema type %1%") % + toString(type_)); + } +} + +GenericRecord::GenericRecord(const NodePtr& schema) : + GenericContainer(AVRO_RECORD, schema) { + fields_.resize(schema->leaves()); + for (size_t i = 0; i < schema->leaves(); ++i) { + fields_[i] = GenericDatum(schema->leafAt(i)); + } +} + +} // namespace avro diff --git a/contrib/libs/apache/avro/impl/LogicalType.cc b/contrib/libs/apache/avro/impl/LogicalType.cc new file mode 100644 index 0000000000..a0d9cc3b6f --- /dev/null +++ b/contrib/libs/apache/avro/impl/LogicalType.cc @@ -0,0 +1,84 @@ +/** + * Licensed to the Apache Software 
Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "Exception.hh" +#include "LogicalType.hh" + +namespace avro { + +LogicalType::LogicalType(Type type) + : type_(type), precision_(0), scale_(0) {} + +LogicalType::Type LogicalType::type() const { + return type_; +} + +void LogicalType::setPrecision(int precision) { + if (type_ != DECIMAL) { + throw Exception("Only logical type DECIMAL can have precision"); + } + if (precision <= 0) { + throw Exception(boost::format("Precision cannot be: %1%") % precision); + } + precision_ = precision; +} + +void LogicalType::setScale(int scale) { + if (type_ != DECIMAL) { + throw Exception("Only logical type DECIMAL can have scale"); + } + if (scale < 0) { + throw Exception(boost::format("Scale cannot be: %1%") % scale); + } + scale_ = scale; +} + +void LogicalType::printJson(std::ostream& os) const { + switch (type_) { + case LogicalType::NONE: + break; + case LogicalType::DECIMAL: + os << "\"logicalType\": \"decimal\""; + os << ", \"precision\": " << precision_; + os << ", \"scale\": " << scale_; + break; + case DATE: + os << "\"logicalType\": \"date\""; + break; + case TIME_MILLIS: + os << "\"logicalType\": \"time-millis\""; + break; + case TIME_MICROS: + os << "\"logicalType\": \"time-micros\""; + break; 
+ case TIMESTAMP_MILLIS: + os << "\"logicalType\": \"timestamp-millis\""; + break; + case TIMESTAMP_MICROS: + os << "\"logicalType\": \"timestamp-micros\""; + break; + case DURATION: + os << "\"logicalType\": \"duration\""; + break; + case UUID: + os << "\"logicalType\": \"uuid\""; + break; + } +} + +} // namespace avro diff --git a/contrib/libs/apache/avro/impl/Node.cc b/contrib/libs/apache/avro/impl/Node.cc new file mode 100644 index 0000000000..bb510cc147 --- /dev/null +++ b/contrib/libs/apache/avro/impl/Node.cc @@ -0,0 +1,161 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <cmath> + +#include "Node.hh" + +namespace avro { + +using std::string; + +Node::~Node() +{ } + +Name::Name(const std::string& name) +{ + fullname(name); +} + +const string Name::fullname() const +{ + return (ns_.empty()) ? simpleName_ : ns_ + "." + simpleName_; +} + +void Name::fullname(const string& name) +{ + string::size_type n = name.find_last_of('.'); + if (n == string::npos) { + simpleName_ = name; + ns_.clear(); + } else { + ns_ = name.substr(0, n); + simpleName_ = name.substr(n + 1); + } + check(); +} + +bool Name::operator < (const Name& n) const +{ + return (ns_ < n.ns_) ? true : + (n.ns_ < ns_) ? 
false : + (simpleName_ < n.simpleName_); +} + +static bool invalidChar1(char c) +{ + return !isalnum(c) && c != '_' && c != '.' && c != '$'; +} + +static bool invalidChar2(char c) +{ + return !isalnum(c) && c != '_'; +} + +void Name::check() const +{ + if (! ns_.empty() && (ns_[0] == '.' || ns_[ns_.size() - 1] == '.' || std::find_if(ns_.begin(), ns_.end(), invalidChar1) != ns_.end())) { + throw Exception("Invalid namespace: " + ns_); + } + if (simpleName_.empty() || std::find_if(simpleName_.begin(), simpleName_.end(), invalidChar2) != simpleName_.end()) { + throw Exception("Invalid name: " + simpleName_); + } +} + +bool Name::operator == (const Name& n) const +{ + return ns_ == n.ns_ && simpleName_ == n.simpleName_; +} + +void Node::setLogicalType(LogicalType logicalType) { + checkLock(); + + // Check that the logical type is applicable to the node type. + switch (logicalType.type()) { + case LogicalType::NONE: + break; + case LogicalType::DECIMAL: { + if (type_ != AVRO_BYTES && type_ != AVRO_FIXED) { + throw Exception("DECIMAL logical type can annotate " + "only BYTES or FIXED type"); + } + if (type_ == AVRO_FIXED) { + // Max precision that can be supported by the current size of + // the FIXED type. 
+ long maxPrecision = floor(log10(2.0) * (8.0 * fixedSize() - 1)); + if (logicalType.precision() > maxPrecision) { + throw Exception( + boost::format( + "DECIMAL precision %1% is too large for the " + "FIXED type of size %2%, precision cannot be " + "larget than %3%") % logicalType.precision() % + fixedSize() % maxPrecision); + } + } + if (logicalType.scale() > logicalType.precision()) { + throw Exception("DECIMAL scale cannot exceed precision"); + } + break; + } + case LogicalType::DATE: + if (type_ != AVRO_INT) { + throw Exception("DATE logical type can only annotate INT type"); + } + break; + case LogicalType::TIME_MILLIS: + if (type_ != AVRO_INT) { + throw Exception("TIME-MILLIS logical type can only annotate " + "INT type"); + } + break; + case LogicalType::TIME_MICROS: + if (type_ != AVRO_LONG) { + throw Exception("TIME-MICROS logical type can only annotate " + "LONG type"); + } + break; + case LogicalType::TIMESTAMP_MILLIS: + if (type_ != AVRO_LONG) { + throw Exception("TIMESTAMP-MILLIS logical type can only annotate " + "LONG type"); + } + break; + case LogicalType::TIMESTAMP_MICROS: + if (type_ != AVRO_LONG) { + throw Exception("TIMESTAMP-MICROS logical type can only annotate " + "LONG type"); + } + break; + case LogicalType::DURATION: + if (type_ != AVRO_FIXED || fixedSize() != 12) { + throw Exception("DURATION logical type can only annotate " + "FIXED type of size 12"); + } + break; + case LogicalType::UUID: + if (type_ != AVRO_STRING) { + throw Exception("UUID logical type can only annotate " + "STRING type"); + } + break; + } + + logicalType_ = logicalType; +} + +} // namespace avro diff --git a/contrib/libs/apache/avro/impl/NodeImpl.cc b/contrib/libs/apache/avro/impl/NodeImpl.cc new file mode 100644 index 0000000000..4a0acb92c0 --- /dev/null +++ b/contrib/libs/apache/avro/impl/NodeImpl.cc @@ -0,0 +1,547 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +#include <sstream> +#include <iomanip> +#include <boost/algorithm/string/replace.hpp> +#include "NodeImpl.hh" + + +using std::string; +namespace avro { + +namespace { + +// Escape string for serialization. +string escape(const string &unescaped) { + string s; + s.reserve(unescaped.length()); + for (std::string::const_iterator it = unescaped.begin(); it != unescaped.end(); ++it) { + char c = *it; + switch (c) { + case '\\': + case '"': + case '/': + s += '\\'; + s += c; + break; + case '\b': + s += '\\'; + s += 'b'; + break; + case '\f': + s += '\f'; + break; + case '\n': + s += '\\'; + s += 'n'; + break; + case '\r': + s += '\\'; + s += 'r'; + break; + case '\t': + s += '\\'; + s += 't'; + break; + default: + if (!std::iscntrl(c, std::locale::classic())) { + s += c; + continue; + } + s += intToHex(static_cast<unsigned int>(c)); + break; + } + } + return s; +} + +// Wrap an indentation in a struct for ostream operator<< +struct indent { + indent(int depth) : + d(depth) + { } + int d; +}; + +/// ostream operator for indent +std::ostream& operator <<(std::ostream &os, indent x) +{ + static const string spaces(" "); + while (x.d--) { + os << spaces; + } + return os; +} + +} // anonymous namespace + +const int kByteStringSize = 6; + +SchemaResolution 
+NodePrimitive::resolve(const Node &reader) const +{ + if (type() == reader.type()) { + return RESOLVE_MATCH; + } + + switch ( type() ) { + + case AVRO_INT: + + if ( reader.type() == AVRO_LONG ) { + return RESOLVE_PROMOTABLE_TO_LONG; + } + + // fall-through intentional + + case AVRO_LONG: + + if (reader.type() == AVRO_FLOAT) { + return RESOLVE_PROMOTABLE_TO_FLOAT; + } + + // fall-through intentional + + case AVRO_FLOAT: + + if (reader.type() == AVRO_DOUBLE) { + return RESOLVE_PROMOTABLE_TO_DOUBLE; + } + + default: + break; + } + + return furtherResolution(reader); +} + +SchemaResolution +NodeRecord::resolve(const Node &reader) const +{ + if (reader.type() == AVRO_RECORD) { + if (name() == reader.name()) { + return RESOLVE_MATCH; + } + } + return furtherResolution(reader); +} + +SchemaResolution +NodeEnum::resolve(const Node &reader) const +{ + if (reader.type() == AVRO_ENUM) { + return (name() == reader.name()) ? RESOLVE_MATCH : RESOLVE_NO_MATCH; + } + return furtherResolution(reader); +} + +SchemaResolution +NodeArray::resolve(const Node &reader) const +{ + if (reader.type() == AVRO_ARRAY) { + const NodePtr &arrayType = leafAt(0); + return arrayType->resolve(*reader.leafAt(0)); + } + return furtherResolution(reader); +} + +SchemaResolution +NodeMap::resolve(const Node &reader) const +{ + if (reader.type() == AVRO_MAP) { + const NodePtr &mapType = leafAt(1); + return mapType->resolve(*reader.leafAt(1)); + } + return furtherResolution(reader); +} + +SchemaResolution +NodeUnion::resolve(const Node &reader) const +{ + + // If the writer is union, resolution only needs to occur when the selected + // type of the writer is known, so this function is not very helpful. + // + // In this case, this function returns if there is a possible match given + // any writer type, so just search type by type returning the best match + // found. 
+ + SchemaResolution match = RESOLVE_NO_MATCH; + for (size_t i=0; i < leaves(); ++i) { + const NodePtr &node = leafAt(i); + SchemaResolution thisMatch = node->resolve(reader); + if (thisMatch == RESOLVE_MATCH) { + match = thisMatch; + break; + } + if (match == RESOLVE_NO_MATCH) { + match = thisMatch; + } + } + return match; +} + +SchemaResolution +NodeFixed::resolve(const Node &reader) const +{ + if (reader.type() == AVRO_FIXED) { + return ( + (reader.fixedSize() == fixedSize()) && + (reader.name() == name()) + ) ? + RESOLVE_MATCH : RESOLVE_NO_MATCH; + } + return furtherResolution(reader); +} + +SchemaResolution +NodeSymbolic::resolve(const Node &reader) const +{ + const NodePtr &node = leafAt(0); + return node->resolve(reader); +} + +void +NodePrimitive::printJson(std::ostream &os, int depth) const +{ + bool hasLogicalType = logicalType().type() != LogicalType::NONE; + + if (hasLogicalType) { + os << "{\n" << indent(depth) << "\"type\": "; + } + + os << '\"' << type() << '\"'; + + if (hasLogicalType) { + os << ",\n" << indent(depth); + logicalType().printJson(os); + os << "\n}"; + } + if (getDoc().size()) { + os << ",\n" << indent(depth) << "\"doc\": \"" + << escape(getDoc()) << "\""; + } +} + +void +NodeSymbolic::printJson(std::ostream &os, int depth) const +{ + os << '\"' << nameAttribute_.get() << '\"'; + if (getDoc().size()) { + os << ",\n" << indent(depth) << "\"doc\": \"" + << escape(getDoc()) << "\""; + } +} + +static void printName(std::ostream& os, const Name& n, int depth) +{ + if (!n.ns().empty()) { + os << indent(depth) << "\"namespace\": \"" << n.ns() << "\",\n"; + } + os << indent(depth) << "\"name\": \"" << n.simpleName() << "\",\n"; +} + +void +NodeRecord::printJson(std::ostream &os, int depth) const +{ + os << "{\n"; + os << indent(++depth) << "\"type\": \"record\",\n"; + printName(os, nameAttribute_.get(), depth); + if (getDoc().size()) { + os << indent(depth) << "\"doc\": \"" + << escape(getDoc()) << "\",\n"; + } + os << indent(depth) << 
"\"fields\": ["; + + size_t fields = leafAttributes_.size(); + ++depth; + // Serialize "default" field: + assert(defaultValues.empty() || (defaultValues.size() == fields)); + for (size_t i = 0; i < fields; ++i) { + if (i > 0) { + os << ','; + } + os << '\n' << indent(depth) << "{\n"; + os << indent(++depth) << "\"name\": \"" << leafNameAttributes_.get(i) << "\",\n"; + os << indent(depth) << "\"type\": "; + leafAttributes_.get(i)->printJson(os, depth); + + if (!defaultValues.empty()) { + if (!defaultValues[i].isUnion() && + defaultValues[i].type() == AVRO_NULL) { + // No "default" field. + } else { + os << ",\n" << indent(depth) << "\"default\": "; + leafAttributes_.get(i)->printDefaultToJson(defaultValues[i], os, + depth); + } + } + os << '\n'; + os << indent(--depth) << '}'; + } + os << '\n' << indent(--depth) << "]\n"; + os << indent(--depth) << '}'; +} + +void NodePrimitive::printDefaultToJson(const GenericDatum &g, std::ostream &os, + int depth) const { + assert(isPrimitive(g.type())); + + switch (g.type()) { + case AVRO_NULL: + os << "null"; + break; + case AVRO_BOOL: + os << (g.value<bool>() ? 
"true" : "false"); + break; + case AVRO_INT: + os << g.value<int32_t>(); + break; + case AVRO_LONG: + os << g.value<int64_t>(); + break; + case AVRO_FLOAT: + os << g.value<float>(); + break; + case AVRO_DOUBLE: + os << g.value<double>(); + break; + case AVRO_STRING: + os << "\"" << escape(g.value<string>()) << "\""; + break; + case AVRO_BYTES: { + // Convert to a string: + const std::vector<uint8_t> &vg = g.value<std::vector<uint8_t> >(); + string s; + s.resize(vg.size() * kByteStringSize); + for (unsigned int i = 0; i < vg.size(); i++) { + string hex_string = intToHex(static_cast<int>(vg[i])); + s.replace(i*kByteStringSize, kByteStringSize, hex_string); + } + os << "\"" << s << "\""; + } break; + default: + break; + } +} + +void NodeEnum::printDefaultToJson(const GenericDatum &g, std::ostream &os, + int depth) const { + assert(g.type() == AVRO_ENUM); + os << "\"" << g.value<GenericEnum>().symbol() << "\""; +} + +void NodeFixed::printDefaultToJson(const GenericDatum &g, std::ostream &os, + int depth) const { + assert(g.type() == AVRO_FIXED); + // ex: "\uOOff" + // Convert to a string + const std::vector<uint8_t> &vg = g.value<GenericFixed>().value(); + string s; + s.resize(vg.size() * kByteStringSize); + for (unsigned int i = 0; i < vg.size(); i++) { + string hex_string = intToHex(static_cast<int>(vg[i])); + s.replace(i*kByteStringSize, kByteStringSize, hex_string); + } + os << "\"" << s << "\""; +} + +void NodeUnion::printDefaultToJson(const GenericDatum &g, std::ostream &os, + int depth) const { + leafAt(0)->printDefaultToJson(g, os, depth); +} + +void NodeArray::printDefaultToJson(const GenericDatum &g, std::ostream &os, + int depth) const { + assert(g.type() == AVRO_ARRAY); + // ex: "default": [1] + if (g.value<GenericArray>().value().empty()) { + os << "[]"; + } else { + os << "[\n"; + depth++; + + // Serialize all values of the array with recursive calls: + for (unsigned int i = 0; i < g.value<GenericArray>().value().size(); i++) { + if (i > 0) { + os << 
",\n"; + } + os << indent(depth); + leafAt(0)->printDefaultToJson(g.value<GenericArray>().value()[i], os, + depth); + } + os << "\n" << indent(--depth) << "]"; + } +} + +void NodeSymbolic::printDefaultToJson(const GenericDatum &g, std::ostream &os, + int depth) const { + getNode()->printDefaultToJson(g, os, depth); +} + +void NodeRecord::printDefaultToJson(const GenericDatum &g, std::ostream &os, + int depth) const { + assert(g.type() == AVRO_RECORD); + if (g.value<GenericRecord>().fieldCount() == 0) { + os << "{}"; + } else { + os << "{\n"; + + // Serialize all fields of the record with recursive calls: + for (unsigned int i = 0; i < g.value<GenericRecord>().fieldCount(); i++) { + if (i == 0) { + ++depth; + } else { // i > 0 + os << ",\n"; + } + + os << indent(depth) << "\""; + assert(i < leaves()); + os << leafNameAttributes_.get(i); + os << "\": "; + + // Recursive call on child node to be able to get the name attribute + // (In case of a record we need the name of the leaves (contained in + // 'this')) + leafAt(i)->printDefaultToJson(g.value<GenericRecord>().fieldAt(i), os, + depth); + } + os << "\n" << indent(--depth) << "}"; + } +} + +void NodeMap::printDefaultToJson(const GenericDatum &g, std::ostream &os, + int depth) const { + assert(g.type() == AVRO_MAP); + //{"a": 1} + if (g.value<GenericMap>().value().empty()) { + os << "{}"; + } else { + os << "{\n"; + + for (unsigned int i = 0; i < g.value<GenericMap>().value().size(); i++) { + if (i == 0) { + ++depth; + } else { + os << ",\n"; + } + os << indent(depth) << "\"" << g.value<GenericMap>().value()[i].first + << "\": "; + + leafAt(i)->printDefaultToJson(g.value<GenericMap>().value()[i].second, os, + depth); + } + os << "\n" << indent(--depth) << "}"; + } +} + +void +NodeEnum::printJson(std::ostream &os, int depth) const +{ + os << "{\n"; + os << indent(++depth) << "\"type\": \"enum\",\n"; + if (getDoc().size()) { + os << indent(depth) << "\"doc\": \"" + << escape(getDoc()) << "\",\n"; + } + printName(os, 
nameAttribute_.get(), depth); + os << indent(depth) << "\"symbols\": [\n"; + + int names = leafNameAttributes_.size(); + ++depth; + for (int i = 0; i < names; ++i) { + if (i > 0) { + os << ",\n"; + } + os << indent(depth) << '\"' << leafNameAttributes_.get(i) << '\"'; + } + os << '\n'; + os << indent(--depth) << "]\n"; + os << indent(--depth) << '}'; +} + +void +NodeArray::printJson(std::ostream &os, int depth) const +{ + os << "{\n"; + os << indent(depth+1) << "\"type\": \"array\",\n"; + if (getDoc().size()) { + os << indent(depth+1) << "\"doc\": \"" + << escape(getDoc()) << "\",\n"; + } + os << indent(depth+1) << "\"items\": "; + leafAttributes_.get()->printJson(os, depth+1); + os << '\n'; + os << indent(depth) << '}'; +} + +void +NodeMap::printJson(std::ostream &os, int depth) const +{ + os << "{\n"; + os << indent(depth+1) <<"\"type\": \"map\",\n"; + if (getDoc().size()) { + os << indent(depth+1) << "\"doc\": \"" + << escape(getDoc()) << "\",\n"; + } + os << indent(depth+1) << "\"values\": "; + leafAttributes_.get(1)->printJson(os, depth+1); + os << '\n'; + os << indent(depth) << '}'; +} + +void +NodeUnion::printJson(std::ostream &os, int depth) const +{ + os << "[\n"; + int fields = leafAttributes_.size(); + ++depth; + for (int i = 0; i < fields; ++i) { + if (i > 0) { + os << ",\n"; + } + os << indent(depth); + leafAttributes_.get(i)->printJson(os, depth); + } + os << '\n'; + os << indent(--depth) << ']'; +} + +void +NodeFixed::printJson(std::ostream &os, int depth) const +{ + os << "{\n"; + os << indent(++depth) << "\"type\": \"fixed\",\n"; + if (getDoc().size()) { + os << indent(depth) << "\"doc\": \"" + << escape(getDoc()) << "\",\n"; + } + printName(os, nameAttribute_.get(), depth); + os << indent(depth) << "\"size\": " << sizeAttribute_.get(); + + if (logicalType().type() != LogicalType::NONE) { + os << ",\n" << indent(depth); + logicalType().printJson(os); + } + + os << "\n" << indent(--depth) << '}'; +} + +} // namespace avro diff --git 
a/contrib/libs/apache/avro/impl/Resolver.cc b/contrib/libs/apache/avro/impl/Resolver.cc new file mode 100644 index 0000000000..43467c028d --- /dev/null +++ b/contrib/libs/apache/avro/impl/Resolver.cc @@ -0,0 +1,872 @@ + +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include <memory> +#include "Resolver.hh" +#include "Layout.hh" +#include "NodeImpl.hh" +#include "ValidSchema.hh" +#include "Reader.hh" +#include "AvroTraits.hh" + +namespace avro { +using std::unique_ptr; + +class ResolverFactory; +typedef std::shared_ptr<Resolver> ResolverPtr; +typedef std::vector<std::unique_ptr<Resolver> > ResolverPtrVector; + +// #define DEBUG_VERBOSE + +#ifdef DEBUG_VERBOSE +#define DEBUG_OUT(str) std::cout << str << '\n' +#else +class NoOp {}; +template<typename T> NoOp& operator<<(NoOp &noOp, const T&) { + return noOp; +} +NoOp noop; +#define DEBUG_OUT(str) noop << str +#endif + +template<typename T> +class PrimitiveSkipper : public Resolver +{ + public: + + PrimitiveSkipper() : + Resolver() + {} + + virtual void parse(Reader &reader, uint8_t *address) const + { + T val; + reader.readValue(val); + DEBUG_OUT("Skipping " << val); + } +}; + +template<typename T> +class PrimitiveParser : public Resolver +{ + public: + + PrimitiveParser(const PrimitiveLayout &offset) : + Resolver(), + offset_(offset.offset()) + {} + + virtual void parse(Reader &reader, uint8_t *address) const + { + T* location = reinterpret_cast<T *> (address + offset_); + reader.readValue(*location); + DEBUG_OUT("Reading " << *location); + } + + private: + + size_t offset_; +}; + +template<typename WT, typename RT> +class PrimitivePromoter : public Resolver +{ + public: + + PrimitivePromoter(const PrimitiveLayout &offset) : + Resolver(), + offset_(offset.offset()) + {} + + virtual void parse(Reader &reader, uint8_t *address) const + { + parseIt<WT>(reader, address); + } + + private: + + void parseIt(Reader &reader, uint8_t *address, const std::true_type &) const + { + WT val; + reader.readValue(val); + RT *location = reinterpret_cast<RT *> (address + offset_); + *location = static_cast<RT>(val); + DEBUG_OUT("Promoting " << val); + } + + void parseIt(Reader &reader, uint8_t *address, const std::false_type &) const + { } + + template<typename T> + void parseIt(Reader 
&reader, uint8_t *address) const + { + parseIt(reader, address, is_promotable<T>()); + } + + size_t offset_; +}; + +template <> +class PrimitiveSkipper<std::vector<uint8_t> > : public Resolver +{ + public: + + PrimitiveSkipper() : + Resolver() + {} + + virtual void parse(Reader &reader, uint8_t *address) const + { + std::vector<uint8_t> val; + reader.readBytes(val); + DEBUG_OUT("Skipping bytes"); + } +}; + +template <> +class PrimitiveParser<std::vector<uint8_t> > : public Resolver +{ + public: + + PrimitiveParser(const PrimitiveLayout &offset) : + Resolver(), + offset_(offset.offset()) + {} + + virtual void parse(Reader &reader, uint8_t *address) const + { + std::vector<uint8_t> *location = reinterpret_cast<std::vector<uint8_t> *> (address + offset_); + reader.readBytes(*location); + DEBUG_OUT("Reading bytes"); + } + + private: + + size_t offset_; +}; + +class RecordSkipper : public Resolver +{ + public: + + RecordSkipper(ResolverFactory &factory, const NodePtr &writer); + + virtual void parse(Reader &reader, uint8_t *address) const + { + DEBUG_OUT("Skipping record"); + + reader.readRecord(); + size_t steps = resolvers_.size(); + for(size_t i = 0; i < steps; ++i) { + resolvers_[i]->parse(reader, address); + } + } + + protected: + + ResolverPtrVector resolvers_; + +}; + +class RecordParser : public Resolver +{ + public: + + virtual void parse(Reader &reader, uint8_t *address) const + { + DEBUG_OUT("Reading record"); + + reader.readRecord(); + size_t steps = resolvers_.size(); + for(size_t i = 0; i < steps; ++i) { + resolvers_[i]->parse(reader, address); + } + } + + RecordParser(ResolverFactory &factory, const NodePtr &writer, const NodePtr &reader, const CompoundLayout &offsets); + + protected: + + ResolverPtrVector resolvers_; + +}; + + +class MapSkipper : public Resolver +{ + public: + + MapSkipper(ResolverFactory &factory, const NodePtr &writer); + + virtual void parse(Reader &reader, uint8_t *address) const + { + DEBUG_OUT("Skipping map"); + + std::string key; 
+ int64_t size = 0; + do { + size = reader.readMapBlockSize(); + for(int64_t i = 0; i < size; ++i) { + reader.readValue(key); + resolver_->parse(reader, address); + } + } while (size != 0); + } + + protected: + + ResolverPtr resolver_; +}; + + +class MapParser : public Resolver +{ + public: + + typedef uint8_t *(*GenericMapSetter)(uint8_t *map, const std::string &key); + + MapParser(ResolverFactory &factory, const NodePtr &writer, const NodePtr &reader, const CompoundLayout &offsets); + + virtual void parse(Reader &reader, uint8_t *address) const + { + DEBUG_OUT("Reading map"); + + uint8_t *mapAddress = address + offset_; + + std::string key; + GenericMapSetter* setter = reinterpret_cast<GenericMapSetter *> (address + setFuncOffset_); + + int64_t size = 0; + do { + size = reader.readMapBlockSize(); + for(int64_t i = 0; i < size; ++i) { + reader.readValue(key); + + // create a new map entry and get the address + uint8_t *location = (*setter)(mapAddress, key); + resolver_->parse(reader, location); + } + } while (size != 0); + } + + protected: + + ResolverPtr resolver_; + size_t offset_; + size_t setFuncOffset_; +}; + +class ArraySkipper : public Resolver +{ + public: + + ArraySkipper(ResolverFactory &factory, const NodePtr &writer); + + virtual void parse(Reader &reader, uint8_t *address) const + { + DEBUG_OUT("Skipping array"); + + int64_t size = 0; + do { + size = reader.readArrayBlockSize(); + for(int64_t i = 0; i < size; ++i) { + resolver_->parse(reader, address); + } + } while (size != 0); + } + + protected: + + ResolverPtr resolver_; +}; + +typedef uint8_t *(*GenericArraySetter)(uint8_t *array); + +class ArrayParser : public Resolver +{ + public: + + ArrayParser(ResolverFactory &factory, const NodePtr &writer, const NodePtr &reader, const CompoundLayout &offsets); + + virtual void parse(Reader &reader, uint8_t *address) const + { + DEBUG_OUT("Reading array"); + + uint8_t *arrayAddress = address + offset_; + + GenericArraySetter* setter = 
reinterpret_cast<GenericArraySetter *> (address + setFuncOffset_); + + int64_t size = 0; + do { + size = reader.readArrayBlockSize(); + for(int64_t i = 0; i < size; ++i) { + // create a new map entry and get the address + uint8_t *location = (*setter)(arrayAddress); + resolver_->parse(reader, location); + } + } while (size != 0); + } + + protected: + + ArrayParser() : + Resolver() + {} + + ResolverPtr resolver_; + size_t offset_; + size_t setFuncOffset_; +}; + +class EnumSkipper : public Resolver +{ + public: + + EnumSkipper(ResolverFactory &factory, const NodePtr &writer) : + Resolver() + { } + + virtual void parse(Reader &reader, uint8_t *address) const + { + int64_t val = reader.readEnum(); + DEBUG_OUT("Skipping enum" << val); + } +}; + +class EnumParser : public Resolver +{ + public: + + enum EnumRepresentation { + VAL + }; + + EnumParser(ResolverFactory &factory, const NodePtr &writer, const NodePtr &reader, const CompoundLayout &offsets) : + Resolver(), + offset_(offsets.at(0).offset()), + readerSize_(reader->names()) + { + const size_t writerSize = writer->names(); + + mapping_.reserve(writerSize); + + for(size_t i = 0; i < writerSize; ++i) { + const std::string &name = writer->nameAt(i); + size_t readerIndex = readerSize_; + reader->nameIndex(name, readerIndex); + mapping_.push_back(readerIndex); + } + } + + virtual void parse(Reader &reader, uint8_t *address) const + { + size_t val = static_cast<size_t>(reader.readEnum()); + assert(static_cast<size_t>(val) < mapping_.size()); + + if(mapping_[val] < readerSize_) { + EnumRepresentation* location = reinterpret_cast<EnumRepresentation *> (address + offset_); + *location = static_cast<EnumRepresentation>(mapping_[val]); + DEBUG_OUT("Setting enum" << *location); + } + } + +protected: + + size_t offset_; + size_t readerSize_; + std::vector<size_t> mapping_; + +}; + +class UnionSkipper : public Resolver +{ + public: + + UnionSkipper(ResolverFactory &factory, const NodePtr &writer); + + virtual void parse(Reader 
&reader, uint8_t *address) const + { + DEBUG_OUT("Skipping union"); + size_t choice = static_cast<size_t>(reader.readUnion()); + resolvers_[choice]->parse(reader, address); + } + + protected: + + ResolverPtrVector resolvers_; +}; + + +class UnionParser : public Resolver +{ + public: + + typedef uint8_t *(*GenericUnionSetter)(uint8_t *, int64_t); + + UnionParser(ResolverFactory &factory, const NodePtr &writer, const NodePtr &reader, const CompoundLayout &offsets); + + virtual void parse(Reader &reader, uint8_t *address) const + { + DEBUG_OUT("Reading union"); + size_t writerChoice = static_cast<size_t>(reader.readUnion()); + int64_t *readerChoice = reinterpret_cast<int64_t *>(address + choiceOffset_); + + *readerChoice = choiceMapping_[writerChoice]; + GenericUnionSetter* setter = reinterpret_cast<GenericUnionSetter *> (address + setFuncOffset_); + uint8_t *value = reinterpret_cast<uint8_t *> (address + offset_); + uint8_t *location = (*setter)(value, *readerChoice); + + resolvers_[writerChoice]->parse(reader, location); + } + + protected: + + ResolverPtrVector resolvers_; + std::vector<int64_t> choiceMapping_; + size_t offset_; + size_t choiceOffset_; + size_t setFuncOffset_; +}; + +class UnionToNonUnionParser : public Resolver +{ + public: + + typedef uint8_t *(*GenericUnionSetter)(uint8_t *, int64_t); + + UnionToNonUnionParser(ResolverFactory &factory, const NodePtr &writer, const NodePtr &reader, const Layout &offsets); + + virtual void parse(Reader &reader, uint8_t *address) const + { + DEBUG_OUT("Reading union to non-union"); + size_t choice = static_cast<size_t>(reader.readUnion()); + resolvers_[choice]->parse(reader, address); + } + + protected: + + ResolverPtrVector resolvers_; +}; + +class NonUnionToUnionParser : public Resolver +{ + public: + + typedef uint8_t *(*GenericUnionSetter)(uint8_t *, int64_t); + + NonUnionToUnionParser(ResolverFactory &factory, const NodePtr &writer, const NodePtr &reader, const CompoundLayout &offsets); + + virtual void 
parse(Reader &reader, uint8_t *address) const + { + DEBUG_OUT("Reading non-union to union"); + + int64_t *choice = reinterpret_cast<int64_t *>(address + choiceOffset_); + *choice = choice_; + GenericUnionSetter* setter = reinterpret_cast<GenericUnionSetter *> (address + setFuncOffset_); + uint8_t *value = reinterpret_cast<uint8_t *> (address + offset_); + uint8_t *location = (*setter)(value, choice_); + + resolver_->parse(reader, location); + } + + protected: + + ResolverPtr resolver_; + size_t choice_; + size_t offset_; + size_t choiceOffset_; + size_t setFuncOffset_; +}; + +class FixedSkipper : public Resolver +{ + public: + + FixedSkipper(ResolverFactory &factory, const NodePtr &writer) : + Resolver() + { + size_ = writer->fixedSize(); + } + + virtual void parse(Reader &reader, uint8_t *address) const + { + DEBUG_OUT("Skipping fixed"); + std::unique_ptr<uint8_t[]> val(new uint8_t[size_]); + reader.readFixed(&val[0], size_); + } + + protected: + + int size_; + +}; + +class FixedParser : public Resolver +{ + public: + + FixedParser(ResolverFactory &factory, const NodePtr &writer, const NodePtr &reader, const CompoundLayout &offsets) : + Resolver() + { + size_ = writer->fixedSize(); + offset_ = offsets.at(0).offset(); + } + + virtual void parse(Reader &reader, uint8_t *address) const + { + DEBUG_OUT("Reading fixed"); + uint8_t *location = reinterpret_cast<uint8_t *> (address + offset_); + reader.readFixed(location, size_); + } + + protected: + + int size_; + size_t offset_; + +}; + + +class ResolverFactory : private boost::noncopyable { + + template<typename T> + unique_ptr<Resolver> + constructPrimitiveSkipper(const NodePtr &writer) + { + return unique_ptr<Resolver>(new PrimitiveSkipper<T>()); + } + + template<typename T> + unique_ptr<Resolver> + constructPrimitive(const NodePtr &writer, const NodePtr &reader, const Layout &offset) + { + unique_ptr<Resolver> instruction; + + SchemaResolution match = writer->resolve(*reader); + + if (match == RESOLVE_NO_MATCH) { + 
instruction = unique_ptr<Resolver>(new PrimitiveSkipper<T>()); + } + else if (reader->type() == AVRO_UNION) { + const CompoundLayout &compoundLayout = static_cast<const CompoundLayout &>(offset); + instruction = unique_ptr<Resolver>(new NonUnionToUnionParser(*this, writer, reader, compoundLayout)); + } + else if (match == RESOLVE_MATCH) { + const PrimitiveLayout &primitiveLayout = static_cast<const PrimitiveLayout &>(offset); + instruction = unique_ptr<Resolver>(new PrimitiveParser<T>(primitiveLayout)); + } + else if(match == RESOLVE_PROMOTABLE_TO_LONG) { + const PrimitiveLayout &primitiveLayout = static_cast<const PrimitiveLayout &>(offset); + instruction = unique_ptr<Resolver>(new PrimitivePromoter<T, int64_t>(primitiveLayout)); + } + else if(match == RESOLVE_PROMOTABLE_TO_FLOAT) { + const PrimitiveLayout &primitiveLayout = static_cast<const PrimitiveLayout &>(offset); + instruction = unique_ptr<Resolver>(new PrimitivePromoter<T, float>(primitiveLayout)); + } + else if(match == RESOLVE_PROMOTABLE_TO_DOUBLE) { + const PrimitiveLayout &primitiveLayout = static_cast<const PrimitiveLayout &>(offset); + instruction = unique_ptr<Resolver>(new PrimitivePromoter<T, double>(primitiveLayout)); + } + else { + assert(0); + } + return instruction; + } + + template<typename Skipper> + unique_ptr<Resolver> + constructCompoundSkipper(const NodePtr &writer) + { + return unique_ptr<Resolver>(new Skipper(*this, writer)); + } + + + template<typename Parser, typename Skipper> + unique_ptr<Resolver> + constructCompound(const NodePtr &writer, const NodePtr &reader, const Layout &offset) + { + unique_ptr<Resolver> instruction; + + SchemaResolution match = RESOLVE_NO_MATCH; + + match = writer->resolve(*reader); + + if (match == RESOLVE_NO_MATCH) { + instruction = unique_ptr<Resolver>(new Skipper(*this, writer)); + } + else if(writer->type() != AVRO_UNION && reader->type() == AVRO_UNION) { + const CompoundLayout &compoundLayout = dynamic_cast<const CompoundLayout &>(offset); + instruction 
= unique_ptr<Resolver>(new NonUnionToUnionParser(*this, writer, reader, compoundLayout)); + } + else if(writer->type() == AVRO_UNION && reader->type() != AVRO_UNION) { + instruction = unique_ptr<Resolver>(new UnionToNonUnionParser(*this, writer, reader, offset)); + } + else { + const CompoundLayout &compoundLayout = dynamic_cast<const CompoundLayout &>(offset); + instruction = unique_ptr<Resolver>(new Parser(*this, writer, reader, compoundLayout)); + } + + return instruction; + } + + public: + + unique_ptr<Resolver> + construct(const NodePtr &writer, const NodePtr &reader, const Layout &offset) + { + + typedef unique_ptr<Resolver> (ResolverFactory::*BuilderFunc)(const NodePtr &writer, const NodePtr &reader, const Layout &offset); + + NodePtr currentWriter = (writer->type() == AVRO_SYMBOLIC) ? + resolveSymbol(writer) : writer; + + NodePtr currentReader = (reader->type() == AVRO_SYMBOLIC) ? + resolveSymbol(reader) : reader; + + static const BuilderFunc funcs[] = { + &ResolverFactory::constructPrimitive<std::string>, + &ResolverFactory::constructPrimitive<std::vector<uint8_t> >, + &ResolverFactory::constructPrimitive<int32_t>, + &ResolverFactory::constructPrimitive<int64_t>, + &ResolverFactory::constructPrimitive<float>, + &ResolverFactory::constructPrimitive<double>, + &ResolverFactory::constructPrimitive<bool>, + &ResolverFactory::constructPrimitive<Null>, + &ResolverFactory::constructCompound<RecordParser, RecordSkipper>, + &ResolverFactory::constructCompound<EnumParser, EnumSkipper>, + &ResolverFactory::constructCompound<ArrayParser, ArraySkipper>, + &ResolverFactory::constructCompound<MapParser, MapSkipper>, + &ResolverFactory::constructCompound<UnionParser, UnionSkipper>, + &ResolverFactory::constructCompound<FixedParser, FixedSkipper> + }; + + static_assert((sizeof(funcs)/sizeof(BuilderFunc)) == (AVRO_NUM_TYPES), + "Invalid number of builder functions"); + + BuilderFunc func = funcs[currentWriter->type()]; + assert(func); + + return 
((this)->*(func))(currentWriter, currentReader, offset); + } + + unique_ptr<Resolver> + skipper(const NodePtr &writer) + { + + typedef unique_ptr<Resolver> (ResolverFactory::*BuilderFunc)(const NodePtr &writer); + + NodePtr currentWriter = (writer->type() == AVRO_SYMBOLIC) ? + writer->leafAt(0) : writer; + + static const BuilderFunc funcs[] = { + &ResolverFactory::constructPrimitiveSkipper<std::string>, + &ResolverFactory::constructPrimitiveSkipper<std::vector<uint8_t> >, + &ResolverFactory::constructPrimitiveSkipper<int32_t>, + &ResolverFactory::constructPrimitiveSkipper<int64_t>, + &ResolverFactory::constructPrimitiveSkipper<float>, + &ResolverFactory::constructPrimitiveSkipper<double>, + &ResolverFactory::constructPrimitiveSkipper<bool>, + &ResolverFactory::constructPrimitiveSkipper<Null>, + &ResolverFactory::constructCompoundSkipper<RecordSkipper>, + &ResolverFactory::constructCompoundSkipper<EnumSkipper>, + &ResolverFactory::constructCompoundSkipper<ArraySkipper>, + &ResolverFactory::constructCompoundSkipper<MapSkipper>, + &ResolverFactory::constructCompoundSkipper<UnionSkipper>, + &ResolverFactory::constructCompoundSkipper<FixedSkipper> + }; + + static_assert((sizeof(funcs)/sizeof(BuilderFunc)) == (AVRO_NUM_TYPES), + "Invalid number of builder functions"); + + BuilderFunc func = funcs[currentWriter->type()]; + assert(func); + + return ((this)->*(func))(currentWriter); + } +}; + + +RecordSkipper::RecordSkipper(ResolverFactory &factory, const NodePtr &writer) : + Resolver() +{ + size_t leaves = writer->leaves(); + resolvers_.reserve(leaves); + for(size_t i = 0; i < leaves; ++i) { + const NodePtr &w = writer->leafAt(i); + resolvers_.push_back(factory.skipper(w)); + } +} + +RecordParser::RecordParser(ResolverFactory &factory, const NodePtr &writer, const NodePtr &reader, const CompoundLayout &offsets) : + Resolver() +{ + size_t leaves = writer->leaves(); + resolvers_.reserve(leaves); + for(size_t i = 0; i < leaves; ++i) { + + const NodePtr &w = writer->leafAt(i); 
+ + const std::string &name = writer->nameAt(i); + + size_t readerIndex = 0; + bool found = reader->nameIndex(name, readerIndex); + + if(found) { + const NodePtr &r = reader->leafAt(readerIndex); + resolvers_.push_back(factory.construct(w, r, offsets.at(readerIndex))); + } + else { + resolvers_.push_back(factory.skipper(w)); + } + } +} + +MapSkipper::MapSkipper(ResolverFactory &factory, const NodePtr &writer) : + Resolver(), + resolver_(factory.skipper(writer->leafAt(1))) +{ } + +MapParser::MapParser(ResolverFactory &factory, const NodePtr &writer, const NodePtr &reader, const CompoundLayout &offsets) : + Resolver(), + resolver_(factory.construct(writer->leafAt(1), reader->leafAt(1), offsets.at(1))), + offset_(offsets.offset()), + setFuncOffset_( offsets.at(0).offset()) +{ } + +ArraySkipper::ArraySkipper(ResolverFactory &factory, const NodePtr &writer) : + Resolver(), + resolver_(factory.skipper(writer->leafAt(0))) +{ } + +ArrayParser::ArrayParser(ResolverFactory &factory, const NodePtr &writer, const NodePtr &reader, const CompoundLayout &offsets) : + Resolver(), + resolver_(factory.construct(writer->leafAt(0), reader->leafAt(0), offsets.at(1))), + offset_(offsets.offset()), + setFuncOffset_(offsets.at(0).offset()) +{ } + +UnionSkipper::UnionSkipper(ResolverFactory &factory, const NodePtr &writer) : + Resolver() +{ + size_t leaves = writer->leaves(); + resolvers_.reserve(leaves); + for(size_t i = 0; i < leaves; ++i) { + const NodePtr &w = writer->leafAt(i); + resolvers_.push_back(factory.skipper(w)); + } +} + +namespace { + +// asumes the writer is NOT a union, and the reader IS a union + +SchemaResolution +checkUnionMatch(const NodePtr &writer, const NodePtr &reader, size_t &index) +{ + SchemaResolution bestMatch = RESOLVE_NO_MATCH; + + index = 0; + size_t leaves = reader->leaves(); + + for(size_t i=0; i < leaves; ++i) { + + const NodePtr &leaf = reader->leafAt(i); + SchemaResolution newMatch = writer->resolve(*leaf); + + if(newMatch == RESOLVE_MATCH) { + 
bestMatch = newMatch; + index = i; + break; + } + if(bestMatch == RESOLVE_NO_MATCH) { + bestMatch = newMatch; + index = i; + } + } + + return bestMatch; +} + +}; + +UnionParser::UnionParser(ResolverFactory &factory, const NodePtr &writer, const NodePtr &reader, const CompoundLayout &offsets) : + Resolver(), + offset_(offsets.offset()), + choiceOffset_(offsets.at(0).offset()), + setFuncOffset_(offsets.at(1).offset()) +{ + + size_t leaves = writer->leaves(); + resolvers_.reserve(leaves); + choiceMapping_.reserve(leaves); + for(size_t i = 0; i < leaves; ++i) { + + // for each writer, we need a schema match for the reader + const NodePtr &w = writer->leafAt(i); + size_t index = 0; + + SchemaResolution match = checkUnionMatch(w, reader, index); + + if(match == RESOLVE_NO_MATCH) { + resolvers_.push_back(factory.skipper(w)); + // push back a non-sensical number + choiceMapping_.push_back(reader->leaves()); + } + else { + const NodePtr &r = reader->leafAt(index); + resolvers_.push_back(factory.construct(w, r, offsets.at(index+2))); + choiceMapping_.push_back(index); + } + } +} + +NonUnionToUnionParser::NonUnionToUnionParser(ResolverFactory &factory, const NodePtr &writer, const NodePtr &reader, const CompoundLayout &offsets) : + Resolver(), + offset_(offsets.offset()), + choiceOffset_(offsets.at(0).offset()), + setFuncOffset_(offsets.at(1).offset()) +{ +#ifndef NDEBUG + SchemaResolution bestMatch = +#endif + checkUnionMatch(writer, reader, choice_); + assert(bestMatch != RESOLVE_NO_MATCH); + resolver_ = factory.construct(writer, reader->leafAt(choice_), offsets.at(choice_+2)); +} + +UnionToNonUnionParser::UnionToNonUnionParser(ResolverFactory &factory, const NodePtr &writer, const NodePtr &reader, const Layout &offsets) : + Resolver() +{ + size_t leaves = writer->leaves(); + resolvers_.reserve(leaves); + for(size_t i = 0; i < leaves; ++i) { + const NodePtr &w = writer->leafAt(i); + resolvers_.push_back(factory.construct(w, reader, offsets)); + } +} + +unique_ptr<Resolver> 
constructResolver(const ValidSchema &writerSchema, + const ValidSchema &readerSchema, + const Layout &readerLayout) +{ + ResolverFactory factory; + return factory.construct(writerSchema.root(), readerSchema.root(), readerLayout); +} + +} // namespace avro diff --git a/contrib/libs/apache/avro/impl/ResolverSchema.cc b/contrib/libs/apache/avro/impl/ResolverSchema.cc new file mode 100644 index 0000000000..f42946d692 --- /dev/null +++ b/contrib/libs/apache/avro/impl/ResolverSchema.cc @@ -0,0 +1,39 @@ + +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include "ResolverSchema.hh" +#include "Resolver.hh" +#include "ValidSchema.hh" + +namespace avro { + +ResolverSchema::ResolverSchema( + const ValidSchema &writerSchema, + const ValidSchema &readerSchema, + const Layout &readerLayout) : + resolver_(constructResolver(writerSchema, readerSchema, readerLayout)) +{ } + +void +ResolverSchema::parse(Reader &reader, uint8_t *address) +{ + resolver_->parse(reader, address); +} + +} // namespace avro diff --git a/contrib/libs/apache/avro/impl/Schema.cc b/contrib/libs/apache/avro/impl/Schema.cc new file mode 100644 index 0000000000..e6cfa45c2e --- /dev/null +++ b/contrib/libs/apache/avro/impl/Schema.cc @@ -0,0 +1,139 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +#include "Schema.hh" + +namespace avro { + +Schema::Schema() +{ } + +Schema::~Schema() +{ } + +Schema::Schema(const NodePtr &node) : + node_(node) +{ } + +Schema::Schema(Node *node) : + node_(node) +{ } + +RecordSchema::RecordSchema(const std::string &name) : + Schema(new NodeRecord) +{ + node_->setName(name); +} + +void +RecordSchema::addField(const std::string &name, const Schema &fieldSchema) +{ + // add the name first. 
it will throw if the name is a duplicate, preventing + // the leaf from being added + node_->addName(name); + + node_->addLeaf(fieldSchema.root()); +} + +std::string RecordSchema::getDoc() const +{ + return node_->getDoc(); +} +void RecordSchema::setDoc(const std::string& doc) +{ + node_->setDoc(doc); +} + +EnumSchema::EnumSchema(const std::string &name) : + Schema(new NodeEnum) +{ + node_->setName(name); +} + +void +EnumSchema::addSymbol(const std::string &symbol) +{ + node_->addName(symbol); +} + +ArraySchema::ArraySchema(const Schema &itemsSchema) : + Schema(new NodeArray) +{ + node_->addLeaf(itemsSchema.root()); +} + +ArraySchema::ArraySchema(const ArraySchema &itemsSchema) : + Schema(new NodeArray) +{ + node_->addLeaf(itemsSchema.root()); +} + +MapSchema::MapSchema(const Schema &valuesSchema) : + Schema(new NodeMap) +{ + node_->addLeaf(valuesSchema.root()); +} + +MapSchema::MapSchema(const MapSchema &valuesSchema) : + Schema(new NodeMap) +{ + node_->addLeaf(valuesSchema.root()); +} + +UnionSchema::UnionSchema() : + Schema(new NodeUnion) +{ } + +void +UnionSchema::addType(const Schema &typeSchema) +{ + if(typeSchema.type() == AVRO_UNION) { + throw Exception("Cannot add unions to unions"); + } + + if(typeSchema.type() == AVRO_RECORD) { + // check for duplicate records + size_t types = node_->leaves(); + for(size_t i = 0; i < types; ++i) { + const NodePtr &leaf = node_->leafAt(i); + // TODO, more checks? 
+ if(leaf->type() == AVRO_RECORD && leaf->name() == typeSchema.root()->name()) { + throw Exception("Records in unions cannot have duplicate names"); + } + } + } + + node_->addLeaf(typeSchema.root()); +} + +FixedSchema::FixedSchema(int size, const std::string &name) : + Schema(new NodeFixed) +{ + node_->setFixedSize(size); + node_->setName(name); +} + +SymbolicSchema::SymbolicSchema(const Name &name, const NodePtr& link) : + Schema(new NodeSymbolic(HasName(name), link)) +{ +} + + + +} // namespace avro diff --git a/contrib/libs/apache/avro/impl/Stream.cc b/contrib/libs/apache/avro/impl/Stream.cc new file mode 100644 index 0000000000..7023f3f213 --- /dev/null +++ b/contrib/libs/apache/avro/impl/Stream.cc @@ -0,0 +1,198 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "Stream.hh" +#include <vector> + +namespace avro { + +using std::vector; + +class MemoryInputStream : public InputStream { + const std::vector<uint8_t*>& data_; + const size_t chunkSize_; + const size_t size_; + const size_t available_; + size_t cur_; + size_t curLen_; + + size_t maxLen() { + size_t n = (cur_ == (size_ - 1)) ? 
available_ : chunkSize_; + if (n == curLen_) { + if (cur_ == (size_ - 1)) { + return 0; + } + ++cur_; + n = (cur_ == (size_ - 1)) ? available_ : chunkSize_; + curLen_ = 0; + } + return n; + } + +public: + MemoryInputStream(const std::vector<uint8_t*>& b, + size_t chunkSize, size_t available) : + data_(b), chunkSize_(chunkSize), size_(b.size()), + available_(available), cur_(0), curLen_(0) { } + + bool next(const uint8_t** data, size_t* len) { + if (size_t n = maxLen()) { + *data = data_[cur_] + curLen_; + *len = n - curLen_; + curLen_ = n; + return true; + } + return false; + } + + void backup(size_t len) { + curLen_ -= len; + } + + void skip(size_t len) { + while (len > 0) { + if (size_t n = maxLen()) { + if ((curLen_ + len) < n) { + n = curLen_ + len; + } + len -= n - curLen_; + curLen_ = n; + } else { + break; + } + } + } + + size_t byteCount() const { + return cur_ * chunkSize_ + curLen_; + } +}; + +class MemoryInputStream2 : public InputStream { + const uint8_t* const data_; + const size_t size_; + size_t curLen_; +public: + MemoryInputStream2(const uint8_t *data, size_t len) + : data_(data), size_(len), curLen_(0) { } + + bool next(const uint8_t** data, size_t* len) { + if (curLen_ == size_) { + return false; + } + *data = &data_[curLen_]; + *len = size_ - curLen_; + curLen_ = size_; + return true; + } + + void backup(size_t len) { + curLen_ -= len; + } + + void skip(size_t len) { + if (len > (size_ - curLen_)) { + len = size_ - curLen_; + } + curLen_ += len; + } + + size_t byteCount() const { + return curLen_; + } +}; + +class MemoryOutputStream : public OutputStream { +public: + const size_t chunkSize_; + std::vector<uint8_t*> data_; + size_t available_; + size_t byteCount_; + + MemoryOutputStream(size_t chunkSize) : chunkSize_(chunkSize), + available_(0), byteCount_(0) { } + ~MemoryOutputStream() { + for (std::vector<uint8_t*>::const_iterator it = data_.begin(); + it != data_.end(); ++it) { + delete[] *it; + } + } + + bool next(uint8_t** data, size_t* len) 
{ + if (available_ == 0) { + data_.push_back(new uint8_t[chunkSize_]); + available_ = chunkSize_; + } + *data = &data_.back()[chunkSize_ - available_]; + *len = available_; + byteCount_ += available_; + available_ = 0; + return true; + } + + void backup(size_t len) { + available_ += len; + byteCount_ -= len; + } + + uint64_t byteCount() const { + return byteCount_; + } + + void flush() { } +}; + +std::unique_ptr<OutputStream> memoryOutputStream(size_t chunkSize) +{ + return std::unique_ptr<OutputStream>(new MemoryOutputStream(chunkSize)); +} + +std::unique_ptr<InputStream> memoryInputStream(const uint8_t* data, size_t len) +{ + return std::unique_ptr<InputStream>(new MemoryInputStream2(data, len)); +} + +std::unique_ptr<InputStream> memoryInputStream(const OutputStream& source) +{ + const MemoryOutputStream& mos = + dynamic_cast<const MemoryOutputStream&>(source); + return (mos.data_.empty()) ? + std::unique_ptr<InputStream>(new MemoryInputStream2(0, 0)) : + std::unique_ptr<InputStream>(new MemoryInputStream(mos.data_, + mos.chunkSize_, + (mos.chunkSize_ - mos.available_))); +} + +std::shared_ptr<std::vector<uint8_t> > snapshot(const OutputStream& source) +{ + const MemoryOutputStream& mos = + dynamic_cast<const MemoryOutputStream&>(source); + std::shared_ptr<std::vector<uint8_t> > result(new std::vector<uint8_t>()); + size_t c = mos.byteCount_; + result->reserve(mos.byteCount_); + for (vector<uint8_t*>::const_iterator it = mos.data_.begin(); + it != mos.data_.end(); ++it) { + size_t n = std::min(c, mos.chunkSize_); + std::copy(*it, *it + n, std::back_inserter(*result)); + c -= n; + } + return result; +} + +} // namespace avro + diff --git a/contrib/libs/apache/avro/impl/Types.cc b/contrib/libs/apache/avro/impl/Types.cc new file mode 100644 index 0000000000..6c9d702b83 --- /dev/null +++ b/contrib/libs/apache/avro/impl/Types.cc @@ -0,0 +1,82 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <iostream> +#include <string> +#include "Types.hh" + +namespace avro { + +namespace strings { +const std::string typeToString[] = { + "string", + "bytes", + "int", + "long", + "float", + "double", + "boolean", + "null", + "record", + "enum", + "array", + "map", + "union", + "fixed", + "symbolic" +}; + +static_assert((sizeof(typeToString)/sizeof(std::string)) == (AVRO_NUM_TYPES+1), + "Incorrect Avro typeToString"); + +} // namespace strings + + +// this static assert exists because a 32 bit integer is used as a bit-flag for each type, +// and it would be a problem for this flag if we ever supported more than 32 types +static_assert(AVRO_NUM_TYPES < 32, "Too many Avro types"); + +const std::string& toString(Type type) +{ + static std::string undefinedType = "Undefined type"; + if (isAvroTypeOrPseudoType(type)) { + return strings::typeToString[type]; + } else { + return undefinedType; + } +} + +std::ostream &operator<< (std::ostream &os, Type type) +{ + if(isAvroTypeOrPseudoType(type)) { + os << strings::typeToString[type]; + } + else { + os << static_cast<int>(type); + } + return os; +} + +std::ostream &operator<< (std::ostream &os, const Null &) +{ + os << "(null value)"; + return os; +} + +} // namespace avro + diff --git 
a/contrib/libs/apache/avro/impl/ValidSchema.cc b/contrib/libs/apache/avro/impl/ValidSchema.cc new file mode 100644 index 0000000000..74a3f845c5 --- /dev/null +++ b/contrib/libs/apache/avro/impl/ValidSchema.cc @@ -0,0 +1,193 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <boost/format.hpp> +#include <cctype> +#include <sstream> + +#include "ValidSchema.hh" +#include "Schema.hh" +#include "Node.hh" + +using std::string; +using std::ostringstream; +using std::make_pair; +using boost::format; +using std::shared_ptr; +using std::static_pointer_cast; + +namespace avro { +typedef std::map<Name, NodePtr> SymbolMap; + +static bool validate(const NodePtr &node, SymbolMap &symbolMap) +{ + if (! node->isValid()) { + throw Exception(format("Schema is invalid, due to bad node of type %1%") + % node->type()); + } + + if (node->hasName()) { + const Name& nm = node->name(); + SymbolMap::iterator it = symbolMap.find(nm); + bool found = it != symbolMap.end() && nm == it->first; + + if (node->type() == AVRO_SYMBOLIC) { + if (! 
found) { + throw Exception(format("Symbolic name \"%1%\" is unknown") % + node->name()); + } + + shared_ptr<NodeSymbolic> symNode = + static_pointer_cast<NodeSymbolic>(node); + + // if the symbolic link is already resolved, we return true, + // otherwise returning false will force it to be resolved + return symNode->isSet(); + } + + if (found) { + return false; + } + symbolMap.insert(it, make_pair(nm, node)); + } + + node->lock(); + size_t leaves = node->leaves(); + for (size_t i = 0; i < leaves; ++i) { + const NodePtr &leaf(node->leafAt(i)); + + if (! validate(leaf, symbolMap)) { + + // if validate returns false it means a node with this name already + // existed in the map, instead of keeping this node twice in the + // map (which could potentially create circular shared pointer + // links that could not be easily freed), replace this node with a + // symbolic link to the original one. + + node->setLeafToSymbolic(i, symbolMap.find(leaf->name())->second); + } + } + + return true; +} + +static void validate(const NodePtr& p) +{ + SymbolMap m; + validate(p, m); +} + +ValidSchema::ValidSchema(const NodePtr &root) : root_(root) +{ + validate(root_); +} + +ValidSchema::ValidSchema(const Schema &schema) : root_(schema.root()) +{ + validate(root_); +} + +ValidSchema::ValidSchema() : root_(NullSchema().root()) +{ + validate(root_); +} + +void +ValidSchema::setSchema(const Schema &schema) +{ + root_ = schema.root(); + validate(root_); +} + +void +ValidSchema::toJson(std::ostream &os) const +{ + root_->printJson(os, 0); + os << '\n'; +} + +string +ValidSchema::toJson(bool prettyPrint) const +{ + ostringstream oss; + toJson(oss); + if (!prettyPrint) { + return compactSchema(oss.str()); + } + return oss.str(); +} + +void +ValidSchema::toFlatList(std::ostream &os) const +{ + root_->printBasicInfo(os); +} + +/* + * compactSchema compacts and returns a formatted string representation + * of a ValidSchema object by removing the whitespaces outside of the quoted + * field names 
and values. It can handle the cases where the quoted value is + * in UTF-8 format. Note that this method is not responsible for validating + * the schema. + */ +string ValidSchema::compactSchema(const string& schema) { + bool insideQuote = false; + size_t newPos = 0; + string data(schema.data()); + + for (size_t currentPos = 0; currentPos < schema.size(); currentPos++) { + if (!insideQuote && std::isspace(data[currentPos])) { + // Skip the white spaces outside quotes. + continue; + } + + if (data[currentPos] == '\"') { + // It is valid for a quote to be part of the value for some fields, + // e.g., the "doc" field. In that case, the quote is expected to be + // escaped inside the schema. Since the escape character '\\' could + // be escaped itself, we need to check whether there are an even + // number of consecutive slashes prior to the quote. + int leadingSlashes = 0; + for (int i = newPos - 1; i >= 0; i--) { + if (data[i] == '\\') { + leadingSlashes++; + } else { + break; + } + } + if (leadingSlashes % 2 == 0) { + // Found a real quote which identifies either the start or the + // end of a field name or value. + insideQuote = !insideQuote; + } + } + data[newPos++] = data[currentPos]; + } + + if (insideQuote) { + throw Exception("Schema is not well formed with mismatched quotes"); + } + + if (newPos < schema.size()) { + data.resize(newPos); + } + return data; +} + +} // namespace avro + diff --git a/contrib/libs/apache/avro/impl/Validator.cc b/contrib/libs/apache/avro/impl/Validator.cc new file mode 100644 index 0000000000..2e74b06b66 --- /dev/null +++ b/contrib/libs/apache/avro/impl/Validator.cc @@ -0,0 +1,301 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "Validator.hh" +#include "ValidSchema.hh" +#include "NodeImpl.hh" + +namespace avro { + +Validator::Validator(const ValidSchema &schema) : + schema_(schema), + nextType_(AVRO_NULL), + expectedTypesFlag_(0), + compoundStarted_(false), + waitingForCount_(false), + count_(0) +{ + setupOperation(schema_.root()); +} + +void +Validator::setWaitingForCount() +{ + waitingForCount_ = true; + count_ = 0; + expectedTypesFlag_ = typeToFlag(AVRO_INT) | typeToFlag(AVRO_LONG); + nextType_ = AVRO_LONG; +} + +void +Validator::enumAdvance() +{ + if(compoundStarted_) { + setWaitingForCount(); + compoundStarted_ = false; + } + else { + waitingForCount_ = false; + compoundStack_.pop_back(); + } +} + +bool +Validator::countingSetup() +{ + bool proceed = true; + if(compoundStarted_) { + setWaitingForCount(); + compoundStarted_ = false; + proceed = false; + } + else if(waitingForCount_) { + waitingForCount_ = false; + if(count_ == 0) { + compoundStack_.pop_back(); + proceed = false; + } + else { + counters_.push_back(static_cast<size_t>(count_)); + } + } + + return proceed; +} + +void +Validator::countingAdvance() +{ + if(countingSetup()) { + + size_t index = (compoundStack_.back().pos)++; + const NodePtr &node = compoundStack_.back().node; + + if(index < node->leaves() ) { + setupOperation(node->leafAt(index)); + } + else { + compoundStack_.back().pos = 0; + int count = --counters_.back(); + 
if(count == 0) { + counters_.pop_back(); + compoundStarted_ = true; + nextType_ = node->type(); + expectedTypesFlag_ = typeToFlag(nextType_); + } + else { + size_t index = (compoundStack_.back().pos)++; + setupOperation(node->leafAt(index)); + } + } + } +} + +void +Validator::unionAdvance() +{ + if(compoundStarted_) { + setWaitingForCount(); + compoundStarted_ = false; + } + else { + waitingForCount_ = false; + NodePtr node = compoundStack_.back().node; + + if(count_ < static_cast<int64_t>(node->leaves())) { + compoundStack_.pop_back(); + setupOperation(node->leafAt(static_cast<int>(count_))); + } + else { + throw Exception( + boost::format("Union selection out of range, got %1%," \ + " expecting 0-%2%") + % count_ % (node->leaves() -1) + ); + } + } +} + +void +Validator::fixedAdvance() +{ + compoundStarted_ = false; + compoundStack_.pop_back(); +} + +int +Validator::nextSizeExpected() const +{ + return compoundStack_.back().node->fixedSize(); +} + +void +Validator::doAdvance() +{ + typedef void (Validator::*AdvanceFunc)(); + + // only the compound types need advance functions here + static const AdvanceFunc funcs[] = { + 0, // string + 0, // bytes + 0, // int + 0, // long + 0, // float + 0, // double + 0, // bool + 0, // null + &Validator::countingAdvance, // Record is treated like counting with count == 1 + &Validator::enumAdvance, + &Validator::countingAdvance, + &Validator::countingAdvance, + &Validator::unionAdvance, + &Validator::fixedAdvance + }; + static_assert((sizeof(funcs)/sizeof(AdvanceFunc)) == (AVRO_NUM_TYPES), + "Invalid number of advance functions"); + + expectedTypesFlag_ = 0; + // loop until we encounter a next expected type, or we've exited all compound types + while(!expectedTypesFlag_ && !compoundStack_.empty() ) { + + Type type = compoundStack_.back().node->type(); + + AdvanceFunc func = funcs[type]; + + // only compound functions are put on the status stack so it is ok to + // assume that func is not null + assert(func); + + 
((this)->*(func))(); + } + + if(compoundStack_.empty()) { + nextType_ = AVRO_NULL; + } +} + +void Validator::advance() +{ + if(!waitingForCount_) { + doAdvance(); + } +} + +void +Validator::setCount(int64_t count) +{ + if(!waitingForCount_) { + throw Exception("Not expecting count"); + } + else if(count_ < 0) { + throw Exception("Count cannot be negative"); + } + count_ = count; + + doAdvance(); +} + +void +Validator::setupFlag(Type type) +{ + // use flags instead of strictly types, so that we can be more lax about the type + // (for example, a long should be able to accept an int type, but not vice versa) + static const flag_t flags[] = { + typeToFlag(AVRO_STRING) | typeToFlag(AVRO_BYTES), + typeToFlag(AVRO_STRING) | typeToFlag(AVRO_BYTES), + typeToFlag(AVRO_INT), + typeToFlag(AVRO_INT) | typeToFlag(AVRO_LONG), + typeToFlag(AVRO_FLOAT), + typeToFlag(AVRO_DOUBLE), + typeToFlag(AVRO_BOOL), + typeToFlag(AVRO_NULL), + typeToFlag(AVRO_RECORD), + typeToFlag(AVRO_ENUM), + typeToFlag(AVRO_ARRAY), + typeToFlag(AVRO_MAP), + typeToFlag(AVRO_UNION), + typeToFlag(AVRO_FIXED) + }; + static_assert((sizeof(flags)/sizeof(flag_t)) == (AVRO_NUM_TYPES), + "Invalid number of avro type flags"); + + expectedTypesFlag_ = flags[type]; +} + +void +Validator::setupOperation(const NodePtr &node) +{ + nextType_ = node->type(); + + if(nextType_ == AVRO_SYMBOLIC) { + NodePtr actualNode = resolveSymbol(node); + assert(actualNode); + setupOperation(actualNode); + return; + } + + assert(nextType_ < AVRO_SYMBOLIC); + + setupFlag(nextType_); + + if(!isPrimitive(nextType_)) { + compoundStack_.push_back(CompoundType(node)); + compoundStarted_ = true; + } +} + +bool +Validator::getCurrentRecordName(std::string &name) const +{ + bool found = false; + name.clear(); + + int idx = -1; + // if the top of the stack is a record I want this record name + if(!compoundStack_.empty() && (isPrimitive(nextType_) || nextType_ == AVRO_RECORD)) { + idx = compoundStack_.size() -1; + } + else { + idx = 
compoundStack_.size() -2; + } + + if(idx >= 0 && compoundStack_[idx].node->type() == AVRO_RECORD) { + name = compoundStack_[idx].node->name().simpleName(); + found = true; + } + return found; +} + +bool +Validator::getNextFieldName(std::string &name) const +{ + bool found = false; + name.clear(); + int idx = isCompound(nextType_) ? compoundStack_.size()-2 : compoundStack_.size()-1; + if(idx >= 0 && compoundStack_[idx].node->type() == AVRO_RECORD) { + size_t pos = compoundStack_[idx].pos-1; + const NodePtr &node = compoundStack_[idx].node; + if(pos < node->leaves()) { + name = node->nameAt(pos); + found = true; + } + } + return found; +} + +} // namespace avro diff --git a/contrib/libs/apache/avro/impl/Zigzag.cc b/contrib/libs/apache/avro/impl/Zigzag.cc new file mode 100644 index 0000000000..06db5b4e7b --- /dev/null +++ b/contrib/libs/apache/avro/impl/Zigzag.cc @@ -0,0 +1,86 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
namespace avro {

namespace {

/**
 * Writes val as a little-endian base-128 varint: seven payload bits per byte,
 * with the high bit set on every byte except the last.
 *
 * @param val     Unsigned value to encode.
 * @param output  Destination buffer; N must be large enough for the widest
 *                value of UInt (5 bytes for 32 bits, 10 bytes for 64 bits).
 * @return        Number of bytes written (always at least 1).
 */
template <typename UInt, size_t N>
size_t
encodeVarint(UInt val, std::array<uint8_t, N> &output)
{
    size_t bytesOut = 0;
    do {
        uint8_t byte = static_cast<uint8_t>(val & 0x7F);
        val >>= 7;
        if (val != 0) {
            byte |= 0x80;   // more bytes follow
        }
        output[bytesOut++] = byte;
    } while (val != 0);
    return bytesOut;
}

} // anonymous namespace

/**
 * Zigzag-encodes a signed 64-bit integer so that values of small magnitude
 * map to small unsigned values (0 -> 0, -1 -> 1, 1 -> 2, -2 -> 3, ...).
 */
uint64_t
encodeZigzag64(int64_t input)
{
    // Work in the unsigned domain: left-shifting a negative signed value is
    // undefined before C++20 and right-shifting one is implementation-defined.
    const uint64_t bits = static_cast<uint64_t>(input);
    const uint64_t signMask = 0 - (bits >> 63);   // all ones iff input < 0
    return (bits << 1) ^ signMask;
}

/** Inverse of encodeZigzag64(). */
int64_t
decodeZigzag64(uint64_t input)
{
    const uint64_t signMask = 0 - (input & 1);    // all ones iff low bit set
    return static_cast<int64_t>((input >> 1) ^ signMask);
}

/**
 * Zigzag-encodes a signed 32-bit integer; same mapping as encodeZigzag64()
 * restricted to 32 bits.
 */
uint32_t
encodeZigzag32(int32_t input)
{
    const uint32_t bits = static_cast<uint32_t>(input);
    const uint32_t signMask = 0u - (bits >> 31);  // all ones iff input < 0
    return (bits << 1) ^ signMask;
}

/** Inverse of encodeZigzag32(). */
int32_t
decodeZigzag32(uint32_t input)
{
    const uint32_t signMask = 0u - (input & 1u);  // all ones iff low bit set
    return static_cast<int32_t>((input >> 1) ^ signMask);
}

/**
 * Zigzag-encodes input and writes it as a varint.
 *
 * @param input   Value to encode.
 * @param output  Receives the encoded bytes (at most 10).
 * @return        Number of bytes written to output.
 */
size_t
encodeInt64(int64_t input, std::array<uint8_t, 10> &output)
{
    return encodeVarint(encodeZigzag64(input), output);
}

/**
 * Zigzag-encodes input and writes it as a varint.
 *
 * @param input   Value to encode.
 * @param output  Receives the encoded bytes (at most 5).
 * @return        Number of bytes written to output.
 */
size_t
encodeInt32(int32_t input, std::array<uint8_t, 5> &output)
{
    return encodeVarint(encodeZigzag32(input), output);
}

} // namespace avro
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "JsonDom.hh" + +#include <stdexcept> + +#include <string.h> + +#include "Stream.hh" +#include "JsonIO.hh" + +using std::string; +using boost::format; + +namespace avro { +namespace json { +const char* typeToString(EntityType t) +{ + switch (t) { + case etNull: return "null"; + case etBool: return "bool"; + case etLong: return "long"; + case etDouble: return "double"; + case etString: return "string"; + case etArray: return "array"; + case etObject: return "object"; + default: return "unknown"; + } +} + +Entity readEntity(JsonParser& p) +{ + switch (p.peek()) { + case JsonParser::tkNull: + p.advance(); + return Entity(p.line()); + case JsonParser::tkBool: + p.advance(); + return Entity(p.boolValue(), p.line()); + case JsonParser::tkLong: + p.advance(); + return Entity(p.longValue(), p.line()); + case JsonParser::tkDouble: + p.advance(); + return Entity(p.doubleValue(), p.line()); + case JsonParser::tkString: + p.advance(); + return Entity(std::make_shared<String>(p.rawString()), p.line()); + case JsonParser::tkArrayStart: + { + size_t l = p.line(); + p.advance(); + std::shared_ptr<Array> v = std::make_shared<Array>(); + while (p.peek() != JsonParser::tkArrayEnd) { + v->push_back(readEntity(p)); + } + p.advance(); + return Entity(v, l); + } + case JsonParser::tkObjectStart: + { + size_t l = p.line(); + p.advance(); + std::shared_ptr<Object> v = std::make_shared<Object>(); + 
while (p.peek() != JsonParser::tkObjectEnd) { + p.advance(); + std::string k = p.stringValue(); + Entity n = readEntity(p); + v->insert(std::make_pair(k, n)); + } + p.advance(); + return Entity(v, l); + } + default: + throw std::domain_error(JsonParser::toString(p.peek())); + } + +} + +Entity loadEntity(const char* text) +{ + return loadEntity(reinterpret_cast<const uint8_t*>(text), ::strlen(text)); +} + +Entity loadEntity(InputStream& in) +{ + JsonParser p; + p.init(in); + return readEntity(p); +} + +Entity loadEntity(const uint8_t* text, size_t len) +{ + std::unique_ptr<InputStream> in = memoryInputStream(text, len); + return loadEntity(*in); +} + +void writeEntity(JsonGenerator<JsonNullFormatter>& g, const Entity& n) +{ + switch (n.type()) { + case etNull: + g.encodeNull(); + break; + case etBool: + g.encodeBool(n.boolValue()); + break; + case etLong: + g.encodeNumber(n.longValue()); + break; + case etDouble: + g.encodeNumber(n.doubleValue()); + break; + case etString: + g.encodeString(n.stringValue()); + break; + case etArray: + { + g.arrayStart(); + const Array& v = n.arrayValue(); + for (Array::const_iterator it = v.begin(); + it != v.end(); ++it) { + writeEntity(g, *it); + } + g.arrayEnd(); + } + break; + case etObject: + { + g.objectStart(); + const Object& v = n.objectValue(); + for (Object::const_iterator it = v.begin(); it != v.end(); ++it) { + g.encodeString(it->first); + writeEntity(g, it->second); + } + g.objectEnd(); + } + break; + } +} + +void Entity::ensureType(EntityType type) const +{ + if (type_ != type) { + format msg = format("Invalid type. 
Expected \"%1%\" actual %2%") % + typeToString(type) % typeToString(type_); + throw Exception(msg); + } +} + +String Entity::stringValue() const { + ensureType(etString); + return JsonParser::toStringValue(**boost::any_cast<std::shared_ptr<String> >(&value_)); +} + +String Entity::bytesValue() const { + ensureType(etString); + return JsonParser::toBytesValue(**boost::any_cast<std::shared_ptr<String> >(&value_)); +} + +std::string Entity::toString() const +{ + std::unique_ptr<OutputStream> out = memoryOutputStream(); + JsonGenerator<JsonNullFormatter> g; + g.init(*out); + writeEntity(g, *this); + g.flush(); + std::unique_ptr<InputStream> in = memoryInputStream(*out); + const uint8_t *p = 0; + size_t n = 0; + size_t c = 0; + while (in->next(&p, &n)) { + c += n; + } + std::string result; + result.resize(c); + c = 0; + std::unique_ptr<InputStream> in2 = memoryInputStream(*out); + while (in2->next(&p, &n)) { + ::memcpy(&result[c], p, n); + c += n; + } + return result; +} + +} +} + diff --git a/contrib/libs/apache/avro/impl/json/JsonDom.hh b/contrib/libs/apache/avro/impl/json/JsonDom.hh new file mode 100644 index 0000000000..e1f549dfea --- /dev/null +++ b/contrib/libs/apache/avro/impl/json/JsonDom.hh @@ -0,0 +1,162 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef avro_json_JsonDom_hh__ +#define avro_json_JsonDom_hh__ + +#include <iostream> +#include <stdint.h> +#include <map> +#include <string> +#include <vector> +#include <memory> + +#include "boost/any.hpp" +#include "Config.hh" + +namespace avro { + +class AVRO_DECL InputStream; + +namespace json { +class Entity; + +typedef bool Bool; +typedef int64_t Long; +typedef double Double; +typedef std::string String; +typedef std::vector<Entity> Array; +typedef std::map<std::string, Entity> Object; + +class AVRO_DECL JsonParser; +class JsonNullFormatter; + +template<typename F = JsonNullFormatter> +class AVRO_DECL JsonGenerator; + +enum EntityType { + etNull, + etBool, + etLong, + etDouble, + etString, + etArray, + etObject +}; + +const char* typeToString(EntityType t); + +class AVRO_DECL Entity { + EntityType type_; + boost::any value_; + size_t line_; // can't be const else noncopyable... 
+ + void ensureType(EntityType) const; +public: + Entity(size_t line = 0) : type_(etNull), line_(line) { } + Entity(Bool v, size_t line = 0) : type_(etBool), value_(v), line_(line) { } + Entity(Long v, size_t line = 0) : type_(etLong), value_(v), line_(line) { } + Entity(Double v, size_t line = 0) : type_(etDouble), value_(v), line_(line) { } + Entity(const std::shared_ptr<String>& v, size_t line = 0) : type_(etString), value_(v), line_(line) { } + Entity(const std::shared_ptr<Array>& v, size_t line = 0) : type_(etArray), value_(v), line_(line) { } + Entity(const std::shared_ptr<Object>& v, size_t line = 0) : type_(etObject), value_(v), line_(line) { } + + EntityType type() const { return type_; } + + size_t line() const { return line_; } + + Bool boolValue() const { + ensureType(etBool); + return boost::any_cast<Bool>(value_); + } + + Long longValue() const { + ensureType(etLong); + return boost::any_cast<Long>(value_); + } + + Double doubleValue() const { + ensureType(etDouble); + return boost::any_cast<Double>(value_); + } + + String stringValue() const; + + String bytesValue() const; + + const Array& arrayValue() const { + ensureType(etArray); + return **boost::any_cast<std::shared_ptr<Array> >(&value_); + } + + const Object& objectValue() const { + ensureType(etObject); + return **boost::any_cast<std::shared_ptr<Object> >(&value_); + } + + std::string toString() const; +}; + +template <typename T> +struct type_traits { +}; + +template <> struct type_traits<bool> { + static EntityType type() { return etBool; } + static const char* name() { return "bool"; } +}; + +template <> struct type_traits<int64_t> { + static EntityType type() { return etLong; } + static const char* name() { return "long"; } +}; + +template <> struct type_traits<double> { + static EntityType type() { return etDouble; } + static const char* name() { return "double"; } +}; + +template <> struct type_traits<std::string> { + static EntityType type() { return etString; } + static const char* 
name() { return "string"; } +}; + +template <> struct type_traits<std::vector<Entity> > { + static EntityType type() { return etArray; } + static const char* name() { return "array"; } +}; + +template <> struct type_traits<std::map<std::string, Entity> > { + static EntityType type() { return etObject; } + static const char* name() { return "object"; } +}; + +AVRO_DECL Entity readEntity(JsonParser& p); + +AVRO_DECL Entity loadEntity(InputStream& in); +AVRO_DECL Entity loadEntity(const char* text); +AVRO_DECL Entity loadEntity(const uint8_t* text, size_t len); + +void writeEntity(JsonGenerator<JsonNullFormatter>& g, const Entity& n); + +} +} + +#endif + + diff --git a/contrib/libs/apache/avro/impl/json/JsonIO.cc b/contrib/libs/apache/avro/impl/json/JsonIO.cc new file mode 100644 index 0000000000..c11a722ad4 --- /dev/null +++ b/contrib/libs/apache/avro/impl/json/JsonIO.cc @@ -0,0 +1,442 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include "JsonIO.hh" + +namespace avro { +namespace json { + +using std::ostringstream; +using std::string; + +const char* const +JsonParser::tokenNames[] = { + "Null", + "Bool", + "Integer", + "Double", + "String", + "Array start", + "Array end", + "Object start", + "Object end", +}; + +char JsonParser::next() +{ + char ch = hasNext ? nextChar : ' '; + while (isspace(ch)) { + if (ch == '\n') { + line_++; + } + ch = in_.read(); + } + hasNext = false; + return ch; +} + +void JsonParser::expectToken(Token tk) +{ + if (advance() != tk) { + if (tk == tkDouble) { + if(cur() == tkString + && (sv == "Infinity" || sv == "-Infinity" || sv == "NaN")) { + curToken = tkDouble; + dv = sv == "Infinity" ? + std::numeric_limits<double>::infinity() : + sv == "-Infinity" ? + -std::numeric_limits<double>::infinity() : + std::numeric_limits<double>::quiet_NaN(); + return; + } else if (cur() == tkLong) { + dv = double(lv); + return; + } + } + ostringstream oss; + oss << "Incorrect token in the stream. 
Expected: " + << JsonParser::toString(tk) << ", found " + << JsonParser::toString(cur()); + throw Exception(oss.str()); + } +} + +JsonParser::Token JsonParser::doAdvance() +{ + char ch = next(); + if (ch == ']') { + if (curState == stArray0 || curState == stArrayN) { + curState = stateStack.top(); + stateStack.pop(); + return tkArrayEnd; + } else { + throw unexpected(ch); + } + } else if (ch == '}') { + if (curState == stObject0 || curState == stObjectN) { + curState = stateStack.top(); + stateStack.pop(); + return tkObjectEnd; + } else { + throw unexpected(ch); + } + } else if (ch == ',') { + if (curState != stObjectN && curState != stArrayN) { + throw unexpected(ch); + } + if (curState == stObjectN) { + curState = stObject0; + } + ch = next(); + } else if (ch == ':') { + if (curState != stKey) { + throw unexpected(ch); + } + curState = stObjectN; + ch = next(); + } + + if (curState == stObject0) { + if (ch != '"') { + throw unexpected(ch); + } + curState = stKey; + } else if (curState == stArray0) { + curState = stArrayN; + } + + switch (ch) { + case '[': + stateStack.push(curState); + curState = stArray0; + return tkArrayStart; + case '{': + stateStack.push(curState); + curState = stObject0; + return tkObjectStart; + case '"': + return tryString(); + case 't': + bv = true; + return tryLiteral("rue", 3, tkBool); + case 'f': + bv = false; + return tryLiteral("alse", 4, tkBool); + case 'n': + return tryLiteral("ull", 3, tkNull); + default: + if (isdigit(ch) || ch == '-') { + return tryNumber(ch); + } else { + throw unexpected(ch); + } + } +} + +JsonParser::Token JsonParser::tryNumber(char ch) +{ + sv.clear(); + sv.push_back(ch); + + hasNext = false; + int state = (ch == '-') ? 0 : (ch == '0') ? 1 : 2; + for (; ;) { + switch (state) { + case 0: + if (in_.hasMore()) { + ch = in_.read(); + if (isdigit(ch)) { + state = (ch == '0') ? 
1 : 2; + sv.push_back(ch); + continue; + } + hasNext = true; + } + break; + case 1: + if (in_.hasMore()) { + ch = in_.read(); + if (ch == '.') { + state = 3; + sv.push_back(ch); + continue; + } else if (ch == 'e' || ch == 'E') { + sv.push_back(ch); + state = 5; + continue; + } + hasNext = true; + } + break; + case 2: + if (in_.hasMore()) { + ch = in_.read(); + if (isdigit(ch)) { + sv.push_back(ch); + continue; + } else if (ch == '.') { + state = 3; + sv.push_back(ch); + continue; + } else if (ch == 'e' || ch == 'E') { + sv.push_back(ch); + state = 5; + continue; + } + hasNext = true; + } + break; + case 3: + case 6: + if (in_.hasMore()) { + ch = in_.read(); + if (isdigit(ch)) { + sv.push_back(ch); + state++; + continue; + } + hasNext = true; + } + break; + case 4: + if (in_.hasMore()) { + ch = in_.read(); + if (isdigit(ch)) { + sv.push_back(ch); + continue; + } else if (ch == 'e' || ch == 'E') { + sv.push_back(ch); + state = 5; + continue; + } + hasNext = true; + } + break; + case 5: + if (in_.hasMore()) { + ch = in_.read(); + if (ch == '+' || ch == '-') { + sv.push_back(ch); + state = 6; + continue; + } else if (isdigit(ch)) { + sv.push_back(ch); + state = 7; + continue; + } + hasNext = true; + } + break; + case 7: + if (in_.hasMore()) { + ch = in_.read(); + if (isdigit(ch)) { + sv.push_back(ch); + continue; + } + hasNext = true; + } + break; + } + if (state == 1 || state == 2 || state == 4 || state == 7) { + if (hasNext) { + nextChar = ch; + } + std::istringstream iss(sv); + if (state == 1 || state == 2) { + iss >> lv; + return tkLong; + } else { + iss >> dv; + return tkDouble; + } + } else { + if (hasNext) { + throw unexpected(ch); + } else { + throw Exception("Unexpected EOF"); + } + } + } +} + +JsonParser::Token JsonParser::tryString() +{ + sv.clear(); + for ( ; ;) { + char ch = in_.read(); + if (ch == '"') { + return tkString; + } else if (ch == '\\') { + ch = in_.read(); + switch (ch) { + case '"': + case '\\': + case '/': + case 'b': + case 'f': + case 'n': 
+ case 'r': + case 't': + sv.push_back('\\'); + sv.push_back(ch); + break; + case 'u': + case 'U': + { + uint32_t n = 0; + char e[4]; + in_.readBytes(reinterpret_cast<uint8_t*>(e), 4); + sv.push_back('\\'); + sv.push_back(ch); + for (int i = 0; i < 4; i++) { + n *= 16; + char c = e[i]; + if (isdigit(c) || + (c >= 'a' && c <= 'f') || + (c >= 'A' && c <= 'F')) { + sv.push_back(c); + } else { + throw unexpected(c); + } + } + } + break; + default: + throw unexpected(ch); + } + } else { + sv.push_back(ch); + } + } +} + + +string JsonParser::decodeString(const string& s, bool binary) +{ + string result; + for (string::const_iterator it = s.begin(); it != s.end(); ++it) { + char ch = *it; + if (ch == '\\') { + ch = *++it; + switch (ch) { + case '"': + case '\\': + case '/': + result.push_back(ch); + continue; + case 'b': + result.push_back('\b'); + continue; + case 'f': + result.push_back('\f'); + continue; + case 'n': + result.push_back('\n'); + continue; + case 'r': + result.push_back('\r'); + continue; + case 't': + result.push_back('\t'); + continue; + case 'u': + case 'U': + { + uint32_t n = 0; + char e[4]; + for (int i = 0; i < 4; i++) { + n *= 16; + char c = *++it; + e[i] = c; + if (isdigit(c)) { + n += c - '0'; + } else if (c >= 'a' && c <= 'f') { + n += c - 'a' + 10; + } else if (c >= 'A' && c <= 'F') { + n += c - 'A' + 10; + } + } + if (binary) { + if (n > 0xff) { + throw Exception(boost::format( + "Invalid byte for binary: %1%%2%") % ch % + string(e, 4)); + } else { + result.push_back(n); + continue; + } + } + if (n < 0x80) { + result.push_back(n); + } else if (n < 0x800) { + result.push_back((n >> 6) | 0xc0); + result.push_back((n & 0x3f) | 0x80); + } else if (n < 0x10000) { + result.push_back((n >> 12) | 0xe0); + result.push_back(((n >> 6)& 0x3f) | 0x80); + result.push_back((n & 0x3f) | 0x80); + } else if (n < 110000) { + result.push_back((n >> 18) | 0xf0); + result.push_back(((n >> 12)& 0x3f) | 0x80); + result.push_back(((n >> 6)& 0x3f) | 0x80); + 
result.push_back((n & 0x3f) | 0x80); + } else { + throw Exception(boost::format( + "Invalid unicode value: %1%i%2%") % ch % + string(e, 4)); + } + } + continue; + } + } else { + result.push_back(ch); + } + } + return result; +} + +Exception JsonParser::unexpected(unsigned char c) +{ + std::ostringstream oss; + oss << "Unexpected character in json " << toHex(c / 16) << toHex(c % 16); + return Exception(oss.str()); +} + +JsonParser::Token JsonParser::tryLiteral(const char exp[], size_t n, Token tk) +{ + char c[100]; + in_.readBytes(reinterpret_cast<uint8_t*>(c), n); + for (size_t i = 0; i < n; ++i) { + if (c[i] != exp[i]) { + throw unexpected(c[i]); + } + } + if (in_.hasMore()) { + nextChar = in_.read(); + if (isdigit(nextChar) || isalpha(nextChar)) { + throw unexpected(nextChar); + } + hasNext = true; + } + return tk; +} + +} +} + diff --git a/contrib/libs/apache/avro/impl/json/JsonIO.hh b/contrib/libs/apache/avro/impl/json/JsonIO.hh new file mode 100644 index 0000000000..5ae7ae07dc --- /dev/null +++ b/contrib/libs/apache/avro/impl/json/JsonIO.hh @@ -0,0 +1,482 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#ifndef avro_json_JsonIO_hh__ +#define avro_json_JsonIO_hh__ + +#include <locale> +#include <stack> +#include <string> +#include <sstream> +#include <boost/math/special_functions/fpclassify.hpp> +#include <boost/lexical_cast.hpp> +#include <boost/utility.hpp> + +#include "Config.hh" +#include "Stream.hh" + +namespace avro { +namespace json { + +inline char toHex(unsigned int n) { + return (n < 10) ? (n + '0') : (n + 'a' - 10); +} + + +class AVRO_DECL JsonParser : boost::noncopyable { +public: + enum Token { + tkNull, + tkBool, + tkLong, + tkDouble, + tkString, + tkArrayStart, + tkArrayEnd, + tkObjectStart, + tkObjectEnd + }; + + size_t line() const { return line_; } + +private: + enum State { + stValue, // Expect a data type + stArray0, // Expect a data type or ']' + stArrayN, // Expect a ',' or ']' + stObject0, // Expect a string or a '}' + stObjectN, // Expect a ',' or '}' + stKey // Expect a ':' + }; + std::stack<State> stateStack; + State curState; + bool hasNext; + char nextChar; + bool peeked; + + StreamReader in_; + Token curToken; + bool bv; + int64_t lv; + double dv; + std::string sv; + size_t line_; + + Token doAdvance(); + Token tryLiteral(const char exp[], size_t n, Token tk); + Token tryNumber(char ch); + Token tryString(); + Exception unexpected(unsigned char ch); + char next(); + + static std::string decodeString(const std::string& s, bool binary); + +public: + JsonParser() : curState(stValue), hasNext(false), peeked(false), line_(1) { } + + void init(InputStream& is) { + // Clear by swapping with an empty stack + std::stack<State>().swap(stateStack); + curState = stValue; + hasNext = false; + peeked = false; + line_ = 1; + in_.reset(is); + } + + Token advance() { + if (! peeked) { + curToken = doAdvance(); + } else { + peeked = false; + } + return curToken; + } + + Token peek() { + if (! 
peeked) { + curToken = doAdvance(); + peeked = true; + } + return curToken; + } + + void expectToken(Token tk); + + bool boolValue() const { + return bv; + } + + Token cur() const { + return curToken; + } + + double doubleValue() const { + return dv; + } + + int64_t longValue() const { + return lv; + } + + const std::string& rawString() const { + return sv; + } + + std::string stringValue() const { + return decodeString(sv, false); + } + + std::string bytesValue() const { + return decodeString(sv, true); + } + + void drain() { + if (!stateStack.empty() || peeked) { + throw Exception("Invalid state for draining"); + } + in_.drain(hasNext); + hasNext = false; + } + + /** + * Return UTF-8 encoded string value. + */ + static std::string toStringValue(const std::string& sv) { + return decodeString(sv, false); + } + + /** + * Return byte-encoded string value. It is an error if the input + * JSON string contained unicode characters more than "\u00ff'. + */ + static std::string toBytesValue(const std::string& sv) { + return decodeString(sv, true); + } + + static const char* const tokenNames[]; + + static const char* toString(Token tk) { + return tokenNames[tk]; + } +}; + +class AVRO_DECL JsonNullFormatter { +public: + JsonNullFormatter(StreamWriter&) { } + + void handleObjectStart() {} + void handleObjectEnd() {} + void handleValueEnd() {} + void handleColon() {} +}; + +class AVRO_DECL JsonPrettyFormatter { + StreamWriter& out_; + size_t level_; + std::vector<uint8_t> indent_; + + static const int CHARS_PER_LEVEL = 2; + + void printIndent() { + size_t charsToIndent = level_ * CHARS_PER_LEVEL; + if (indent_.size() < charsToIndent) { + indent_.resize(charsToIndent * 2, ' '); + } + out_.writeBytes(indent_.data(), charsToIndent); + } +public: + JsonPrettyFormatter(StreamWriter& out) : out_(out), level_(0), indent_(10, ' ') { } + + void handleObjectStart() { + out_.write('\n'); + ++level_; + printIndent(); + } + + void handleObjectEnd() { + out_.write('\n'); + --level_; + 
printIndent(); + } + + void handleValueEnd() { + out_.write('\n'); + printIndent(); + } + + void handleColon() { + out_.write(' '); + } +}; + +template <class F> +class AVRO_DECL JsonGenerator { + StreamWriter out_; + F formatter_; + enum State { + stStart, + stArray0, + stArrayN, + stMap0, + stMapN, + stKey, + }; + + std::stack<State> stateStack; + State top; + + void write(const char *b, const char* p) { + if (b != p) { + out_.writeBytes(reinterpret_cast<const uint8_t*>(b), p - b); + } + } + + void escape(char c, const char* b, const char *p) { + write(b, p); + out_.write('\\'); + out_.write(c); + } + + void escapeCtl(char c) { + escapeUnicode(static_cast<uint8_t>(c)); + } + + void writeHex(char c) { + out_.write(toHex((static_cast<unsigned char>(c)) / 16)); + out_.write(toHex((static_cast<unsigned char>(c)) % 16)); + } + + void escapeUnicode(uint32_t c) { + out_.write('\\'); + out_.write('u'); + writeHex((c >> 8) & 0xff); + writeHex(c & 0xff); + } + void doEncodeString(const char* b, size_t len, bool binary) { + const char* e = b + len; + out_.write('"'); + for (const char* p = b; p != e; p++) { + if ((*p & 0x80) != 0) { + write(b, p); + if (binary) { + escapeCtl(*p); + } else if ((*p & 0x40) == 0) { + throw Exception("Invalid UTF-8 sequence"); + } else { + int more = 1; + uint32_t value = 0; + if ((*p & 0x20) != 0) { + more++; + if ((*p & 0x10) != 0) { + more++; + if ((*p & 0x08) != 0) { + throw Exception("Invalid UTF-8 sequence"); + } else { + value = *p & 0x07; + } + } else { + value = *p & 0x0f; + } + } else { + value = *p & 0x1f; + } + for (int i = 0; i < more; ++i) { + if (++p == e || (*p & 0xc0) != 0x80) { + throw Exception("Invalid UTF-8 sequence"); + } + value <<= 6; + value |= *p & 0x3f; + } + escapeUnicode(value); + } + } else { + switch (*p) { + case '\\': + case '"': + case '/': + escape(*p, b, p); + break; + case '\b': + escape('b', b, p); + break; + case '\f': + escape('f', b, p); + break; + case '\n': + escape('n', b, p); + break; + case '\r': + 
escape('r', b, p); + break; + case '\t': + escape('t', b, p); + break; + default: + if (std::iscntrl(*p, std::locale::classic())) { + write(b, p); + escapeCtl(*p); + break; + } else { + continue; + } + } + } + b = p + 1; + } + write(b, e); + out_.write('"'); + } + + void sep() { + if (top == stArrayN) { + out_.write(','); + formatter_.handleValueEnd(); + } else if (top == stArray0) { + top = stArrayN; + } + } + + void sep2() { + if (top == stKey) { + top = stMapN; + } + } + +public: + JsonGenerator() : formatter_(out_), top(stStart) { } + + void init(OutputStream& os) { + out_.reset(os); + } + + void flush() { + out_.flush(); + } + + int64_t byteCount() const { + return out_.byteCount(); + } + + void encodeNull() { + sep(); + out_.writeBytes(reinterpret_cast<const uint8_t*>("null"), 4); + sep2(); + } + + void encodeBool(bool b) { + sep(); + if (b) { + out_.writeBytes(reinterpret_cast<const uint8_t*>("true"), 4); + } else { + out_.writeBytes(reinterpret_cast<const uint8_t*>("false"), 5); + } + sep2(); + } + + template <typename T> + void encodeNumber(T t) { + sep(); + std::ostringstream oss; + oss << boost::lexical_cast<std::string>(t); + const std::string& s = oss.str(); + out_.writeBytes(reinterpret_cast<const uint8_t*>(s.data()), s.size()); + sep2(); + } + + void encodeNumber(double t) { + sep(); + std::ostringstream oss; + if (boost::math::isfinite(t)) { + oss << boost::lexical_cast<std::string>(t); + } else if (boost::math::isnan(t)) { + oss << "NaN"; + } else if (t == std::numeric_limits<double>::infinity()) { + oss << "Infinity"; + } else { + oss << "-Infinity"; + } + const std::string& s = oss.str(); + out_.writeBytes(reinterpret_cast<const uint8_t*>(s.data()), s.size()); + sep2(); + } + + + void encodeString(const std::string& s) { + if (top == stMap0) { + top = stKey; + } else if (top == stMapN) { + out_.write(','); + formatter_.handleValueEnd(); + top = stKey; + } else if (top == stKey) { + top = stMapN; + } else { + sep(); + } + doEncodeString(s.c_str(), 
s.size(), false); + if (top == stKey) { + out_.write(':'); + formatter_.handleColon(); + } + } + + void encodeBinary(const uint8_t* bytes, size_t len) { + sep(); + doEncodeString(reinterpret_cast<const char *>(bytes), len, true); + sep2(); + } + + void arrayStart() { + sep(); + stateStack.push(top); + top = stArray0; + out_.write('['); + formatter_.handleObjectStart(); + } + + void arrayEnd() { + top = stateStack.top(); + stateStack.pop(); + formatter_.handleObjectEnd(); + out_.write(']'); + sep2(); + } + + void objectStart() { + sep(); + stateStack.push(top); + top = stMap0; + out_.write('{'); + formatter_.handleObjectStart(); + } + + void objectEnd() { + top = stateStack.top(); + stateStack.pop(); + formatter_.handleObjectEnd(); + out_.write('}'); + sep2(); + } + +}; + +} +} + +#endif diff --git a/contrib/libs/apache/avro/impl/parsing/JsonCodec.cc b/contrib/libs/apache/avro/impl/parsing/JsonCodec.cc new file mode 100644 index 0000000000..8bca2984ae --- /dev/null +++ b/contrib/libs/apache/avro/impl/parsing/JsonCodec.cc @@ -0,0 +1,718 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#define __STDC_LIMIT_MACROS + +#include <string> +#include <map> +#include <algorithm> +#include <ctype.h> +#include <memory> +#include <boost/math/special_functions/fpclassify.hpp> + +#include "ValidatingCodec.hh" +#include "Symbol.hh" +#include "ValidSchema.hh" +#include "Decoder.hh" +#include "Encoder.hh" +#include "NodeImpl.hh" + +#include "../json/JsonIO.hh" + +namespace avro { + +namespace parsing { + +using std::make_shared; + +using std::map; +using std::vector; +using std::string; +using std::reverse; +using std::ostringstream; +using std::istringstream; + +using avro::json::JsonParser; +using avro::json::JsonGenerator; +using avro::json::JsonNullFormatter; + +class JsonGrammarGenerator : public ValidatingGrammarGenerator { + ProductionPtr doGenerate(const NodePtr& n, + std::map<NodePtr, ProductionPtr> &m); +}; + +static std::string nameOf(const NodePtr& n) +{ + if (n->hasName()) { + return n->name(); + } + std::ostringstream oss; + oss << n->type(); + return oss.str(); +} + +ProductionPtr JsonGrammarGenerator::doGenerate(const NodePtr& n, + std::map<NodePtr, ProductionPtr> &m) { + switch (n->type()) { + case AVRO_NULL: + case AVRO_BOOL: + case AVRO_INT: + case AVRO_LONG: + case AVRO_FLOAT: + case AVRO_DOUBLE: + case AVRO_STRING: + case AVRO_BYTES: + case AVRO_FIXED: + case AVRO_ARRAY: + case AVRO_MAP: + case AVRO_SYMBOLIC: + return ValidatingGrammarGenerator::doGenerate(n, m); + case AVRO_RECORD: + { + ProductionPtr result = make_shared<Production>(); + + m.erase(n); + + size_t c = n->leaves(); + result->reserve(2 + 2 * c); + result->push_back(Symbol::recordStartSymbol()); + for (size_t i = 0; i < c; ++i) { + const NodePtr& leaf = n->leafAt(i); + ProductionPtr v = doGenerate(leaf, m); + result->push_back(Symbol::fieldSymbol(n->nameAt(i))); + copy(v->rbegin(), v->rend(), back_inserter(*result)); + } + result->push_back(Symbol::recordEndSymbol()); + reverse(result->begin(), result->end()); + + m[n] = result; + return make_shared<Production>(1, 
Symbol::indirect(result)); + } + case AVRO_ENUM: + { + vector<string> nn; + size_t c = n->names(); + nn.reserve(c); + for (size_t i = 0; i < c; ++i) { + nn.push_back(n->nameAt(i)); + } + ProductionPtr result = make_shared<Production>(); + result->push_back(Symbol::nameListSymbol(nn)); + result->push_back(Symbol::enumSymbol()); + m[n] = result; + return result; + } + case AVRO_UNION: + { + size_t c = n->leaves(); + + vector<ProductionPtr> vv; + vv.reserve(c); + + vector<string> names; + names.reserve(c); + + for (size_t i = 0; i < c; ++i) { + const NodePtr& nn = n->leafAt(i); + ProductionPtr v = doGenerate(nn, m); + if (nn->type() != AVRO_NULL) { + ProductionPtr v2 = make_shared<Production>(); + v2->push_back(Symbol::recordEndSymbol()); + copy(v->begin(), v->end(), back_inserter(*v2)); + v.swap(v2); + } + vv.push_back(v); + names.push_back(nameOf(nn)); + } + ProductionPtr result = make_shared<Production>(); + result->push_back(Symbol::alternative(vv)); + result->push_back(Symbol::nameListSymbol(names)); + result->push_back(Symbol::unionSymbol()); + return result; + } + default: + throw Exception("Unknown node type"); + } +} + +static void expectToken(JsonParser& in, JsonParser::Token tk) +{ + in.expectToken(tk); +} + +class JsonDecoderHandler { + JsonParser& in_; +public: + JsonDecoderHandler(JsonParser& p) : in_(p) { } + size_t handle(const Symbol& s) { + switch (s.kind()) { + case Symbol::sRecordStart: + expectToken(in_, JsonParser::tkObjectStart); + break; + case Symbol::sRecordEnd: + expectToken(in_, JsonParser::tkObjectEnd); + break; + case Symbol::sField: + expectToken(in_, JsonParser::tkString); + if (s.extra<string>() != in_.stringValue()) { + throw Exception("Incorrect field"); + } + break; + default: + break; + } + return 0; + } +}; + +template <typename P> +class JsonDecoder : public Decoder { + JsonParser in_; + JsonDecoderHandler handler_; + P parser_; + + void init(InputStream& is); + void decodeNull(); + bool decodeBool(); + int32_t decodeInt(); + 
int64_t decodeLong(); + float decodeFloat(); + double decodeDouble(); + void decodeString(string& value); + void skipString(); + void decodeBytes(vector<uint8_t>& value); + void skipBytes(); + void decodeFixed(size_t n, vector<uint8_t>& value); + void skipFixed(size_t n); + size_t decodeEnum(); + size_t arrayStart(); + size_t arrayNext(); + size_t skipArray(); + size_t mapStart(); + size_t mapNext(); + size_t skipMap(); + size_t decodeUnionIndex(); + + void expect(JsonParser::Token tk); + void skipComposite(); + void drain(); +public: + + JsonDecoder(const ValidSchema& s) : + handler_(in_), + parser_(JsonGrammarGenerator().generate(s), NULL, handler_) { } + +}; + +template <typename P> +void JsonDecoder<P>::init(InputStream& is) +{ + in_.init(is); + parser_.reset(); +} + +template <typename P> +void JsonDecoder<P>::expect(JsonParser::Token tk) +{ + expectToken(in_, tk); +} + +template <typename P> +void JsonDecoder<P>::decodeNull() +{ + parser_.advance(Symbol::sNull); + expect(JsonParser::tkNull); +} + +template <typename P> +bool JsonDecoder<P>::decodeBool() +{ + parser_.advance(Symbol::sBool); + expect(JsonParser::tkBool); + bool result = in_.boolValue(); + return result; +} + +template <typename P> +int32_t JsonDecoder<P>::decodeInt() +{ + parser_.advance(Symbol::sInt); + expect(JsonParser::tkLong); + int64_t result = in_.longValue(); + if (result < INT32_MIN || result > INT32_MAX) { + throw Exception(boost::format("Value out of range for Avro int: %1%") + % result); + } + return static_cast<int32_t>(result); +} + +template <typename P> +int64_t JsonDecoder<P>::decodeLong() +{ + parser_.advance(Symbol::sLong); + expect(JsonParser::tkLong); + int64_t result = in_.longValue(); + return result; +} + +template <typename P> +float JsonDecoder<P>::decodeFloat() +{ + parser_.advance(Symbol::sFloat); + expect(JsonParser::tkDouble); + double result = in_.doubleValue(); + return static_cast<float>(result); +} + +template <typename P> +double JsonDecoder<P>::decodeDouble() 
+{ + parser_.advance(Symbol::sDouble); + expect(JsonParser::tkDouble); + double result = in_.doubleValue(); + return result; +} + +template <typename P> +void JsonDecoder<P>::decodeString(string& value) +{ + parser_.advance(Symbol::sString); + expect(JsonParser::tkString); + value = in_.stringValue(); +} + +template <typename P> +void JsonDecoder<P>::skipString() +{ + parser_.advance(Symbol::sString); + expect(JsonParser::tkString); +} + +static vector<uint8_t> toBytes(const string& s) +{ + return vector<uint8_t>(s.begin(), s.end()); +} + +template <typename P> +void JsonDecoder<P>::decodeBytes(vector<uint8_t>& value ) +{ + parser_.advance(Symbol::sBytes); + expect(JsonParser::tkString); + value = toBytes(in_.bytesValue()); +} + +template <typename P> +void JsonDecoder<P>::skipBytes() +{ + parser_.advance(Symbol::sBytes); + expect(JsonParser::tkString); +} + +template <typename P> +void JsonDecoder<P>::decodeFixed(size_t n, vector<uint8_t>& value) +{ + parser_.advance(Symbol::sFixed); + parser_.assertSize(n); + expect(JsonParser::tkString); + value = toBytes(in_.bytesValue()); + if (value.size() != n) { + throw Exception("Incorrect value for fixed"); + } +} + +template <typename P> +void JsonDecoder<P>::skipFixed(size_t n) +{ + parser_.advance(Symbol::sFixed); + parser_.assertSize(n); + expect(JsonParser::tkString); + vector<uint8_t> result = toBytes(in_.bytesValue()); + if (result.size() != n) { + throw Exception("Incorrect value for fixed"); + } +} + +template <typename P> +size_t JsonDecoder<P>::decodeEnum() +{ + parser_.advance(Symbol::sEnum); + expect(JsonParser::tkString); + size_t result = parser_.indexForName(in_.stringValue()); + return result; +} + +template <typename P> +size_t JsonDecoder<P>::arrayStart() +{ + parser_.advance(Symbol::sArrayStart); + parser_.pushRepeatCount(0); + expect(JsonParser::tkArrayStart); + return arrayNext(); +} + +template <typename P> +size_t JsonDecoder<P>::arrayNext() +{ + parser_.processImplicitActions(); + if (in_.peek() 
== JsonParser::tkArrayEnd) { + in_.advance(); + parser_.popRepeater(); + parser_.advance(Symbol::sArrayEnd); + return 0; + } + parser_.nextRepeatCount(1); + return 1; +} + +template<typename P> +void JsonDecoder<P>::skipComposite() +{ + size_t level = 0; + for (; ;) { + switch (in_.advance()) { + case JsonParser::tkArrayStart: + case JsonParser::tkObjectStart: + ++level; + continue; + case JsonParser::tkArrayEnd: + case JsonParser::tkObjectEnd: + if (level == 0) { + return; + } + --level; + continue; + default: + continue; + } + } +} + +template<typename P> +void JsonDecoder<P>::drain() +{ + parser_.processImplicitActions(); + in_.drain(); +} + +template <typename P> +size_t JsonDecoder<P>::skipArray() +{ + parser_.advance(Symbol::sArrayStart); + parser_.pop(); + parser_.advance(Symbol::sArrayEnd); + expect(JsonParser::tkArrayStart); + skipComposite(); + return 0; +} + +template <typename P> +size_t JsonDecoder<P>::mapStart() +{ + parser_.advance(Symbol::sMapStart); + parser_.pushRepeatCount(0); + expect(JsonParser::tkObjectStart); + return mapNext(); +} + +template <typename P> +size_t JsonDecoder<P>::mapNext() +{ + parser_.processImplicitActions(); + if (in_.peek() == JsonParser::tkObjectEnd) { + in_.advance(); + parser_.popRepeater(); + parser_.advance(Symbol::sMapEnd); + return 0; + } + parser_.nextRepeatCount(1); + return 1; +} + +template <typename P> +size_t JsonDecoder<P>::skipMap() +{ + parser_.advance(Symbol::sMapStart); + parser_.pop(); + parser_.advance(Symbol::sMapEnd); + expect(JsonParser::tkObjectStart); + skipComposite(); + return 0; +} + +template <typename P> +size_t JsonDecoder<P>::decodeUnionIndex() +{ + parser_.advance(Symbol::sUnion); + + size_t result; + if (in_.peek() == JsonParser::tkNull) { + result = parser_.indexForName("null"); + } else { + expect(JsonParser::tkObjectStart); + expect(JsonParser::tkString); + result = parser_.indexForName(in_.stringValue()); + } + parser_.selectBranch(result); + return result; +} + +template<typename F = 
JsonNullFormatter> +class JsonHandler { + JsonGenerator<F>& generator_; +public: + JsonHandler(JsonGenerator<F>& g) : generator_(g) { } + size_t handle(const Symbol& s) { + switch (s.kind()) { + case Symbol::sRecordStart: + generator_.objectStart(); + break; + case Symbol::sRecordEnd: + generator_.objectEnd(); + break; + case Symbol::sField: + generator_.encodeString(s.extra<string>()); + break; + default: + break; + } + return 0; + } +}; + +template <typename P, typename F = JsonNullFormatter> +class JsonEncoder : public Encoder { + JsonGenerator<F> out_; + JsonHandler<F> handler_; + P parser_; + + void init(OutputStream& os); + void flush(); + int64_t byteCount() const; + void encodeNull(); + void encodeBool(bool b); + void encodeInt(int32_t i); + void encodeLong(int64_t l); + void encodeFloat(float f); + void encodeDouble(double d); + void encodeString(const std::string& s); + void encodeBytes(const uint8_t *bytes, size_t len); + void encodeFixed(const uint8_t *bytes, size_t len); + void encodeEnum(size_t e); + void arrayStart(); + void arrayEnd(); + void mapStart(); + void mapEnd(); + void setItemCount(size_t count); + void startItem(); + void encodeUnionIndex(size_t e); +public: + JsonEncoder(const ValidSchema& schema) : + handler_(out_), + parser_(JsonGrammarGenerator().generate(schema), NULL, handler_) { } +}; + +template<typename P, typename F> +void JsonEncoder<P, F>::init(OutputStream& os) +{ + out_.init(os); +} + +template<typename P, typename F> +void JsonEncoder<P, F>::flush() +{ + parser_.processImplicitActions(); + out_.flush(); +} + +template<typename P, typename F> +int64_t JsonEncoder<P, F>::byteCount() const +{ + return out_.byteCount(); +} + +template<typename P, typename F> +void JsonEncoder<P, F>::encodeNull() +{ + parser_.advance(Symbol::sNull); + out_.encodeNull(); +} + +template<typename P, typename F> +void JsonEncoder<P, F>::encodeBool(bool b) +{ + parser_.advance(Symbol::sBool); + out_.encodeBool(b); +} + +template<typename P, typename 
F> +void JsonEncoder<P, F>::encodeInt(int32_t i) +{ + parser_.advance(Symbol::sInt); + out_.encodeNumber(i); +} + +template<typename P, typename F> +void JsonEncoder<P, F>::encodeLong(int64_t l) +{ + parser_.advance(Symbol::sLong); + out_.encodeNumber(l); +} + +template<typename P, typename F> +void JsonEncoder<P, F>::encodeFloat(float f) +{ + parser_.advance(Symbol::sFloat); + if (f == std::numeric_limits<float>::infinity()) { + out_.encodeString("Infinity"); + } else if (f == -std::numeric_limits<float>::infinity()) { + out_.encodeString("-Infinity"); + } else if (boost::math::isnan(f)) { + out_.encodeString("NaN"); + } else { + out_.encodeNumber(f); + } +} + +template<typename P, typename F> +void JsonEncoder<P, F>::encodeDouble(double d) +{ + parser_.advance(Symbol::sDouble); + if (d == std::numeric_limits<double>::infinity()) { + out_.encodeString("Infinity"); + } else if (d == -std::numeric_limits<double>::infinity()) { + out_.encodeString("-Infinity"); + } else if (boost::math::isnan(d)) { + out_.encodeString("NaN"); + } else { + out_.encodeNumber(d); + } +} + +template<typename P, typename F> +void JsonEncoder<P, F>::encodeString(const std::string& s) +{ + parser_.advance(Symbol::sString); + out_.encodeString(s); +} + +template<typename P, typename F> +void JsonEncoder<P, F>::encodeBytes(const uint8_t *bytes, size_t len) +{ + parser_.advance(Symbol::sBytes); + out_.encodeBinary(bytes, len); +} + +template<typename P, typename F> +void JsonEncoder<P, F>::encodeFixed(const uint8_t *bytes, size_t len) +{ + parser_.advance(Symbol::sFixed); + parser_.assertSize(len); + out_.encodeBinary(bytes, len); +} + +template<typename P, typename F> +void JsonEncoder<P, F>::encodeEnum(size_t e) +{ + parser_.advance(Symbol::sEnum); + const string& s = parser_.nameForIndex(e); + out_.encodeString(s); +} + +template<typename P, typename F> +void JsonEncoder<P, F>::arrayStart() +{ + parser_.advance(Symbol::sArrayStart); + parser_.pushRepeatCount(0); + out_.arrayStart(); +} + 
+template<typename P, typename F> +void JsonEncoder<P, F>::arrayEnd() +{ + parser_.popRepeater(); + parser_.advance(Symbol::sArrayEnd); + out_.arrayEnd(); +} + +template<typename P, typename F> +void JsonEncoder<P, F>::mapStart() +{ + parser_.advance(Symbol::sMapStart); + parser_.pushRepeatCount(0); + out_.objectStart(); +} + +template<typename P, typename F> +void JsonEncoder<P, F>::mapEnd() +{ + parser_.popRepeater(); + parser_.advance(Symbol::sMapEnd); + out_.objectEnd(); +} + +template<typename P, typename F> +void JsonEncoder<P, F>::setItemCount(size_t count) +{ + parser_.nextRepeatCount(count); +} + +template<typename P, typename F> +void JsonEncoder<P, F>::startItem() +{ + parser_.processImplicitActions(); + if (parser_.top() != Symbol::sRepeater) { + throw Exception("startItem at not an item boundary"); + } +} + +template<typename P, typename F> +void JsonEncoder<P, F>::encodeUnionIndex(size_t e) +{ + parser_.advance(Symbol::sUnion); + + const std::string name = parser_.nameForIndex(e); + + if (name != "null") { + out_.objectStart(); + out_.encodeString(name); + } + parser_.selectBranch(e); +} + +} // namespace parsing + +DecoderPtr jsonDecoder(const ValidSchema& s) +{ + return std::make_shared<parsing::JsonDecoder< + parsing::SimpleParser<parsing::JsonDecoderHandler> > >(s); +} + +EncoderPtr jsonEncoder(const ValidSchema& schema) +{ + return std::make_shared<parsing::JsonEncoder< + parsing::SimpleParser<parsing::JsonHandler<avro::json::JsonNullFormatter> >, avro::json::JsonNullFormatter> >(schema); +} + +EncoderPtr jsonPrettyEncoder(const ValidSchema& schema) +{ + return std::make_shared<parsing::JsonEncoder< + parsing::SimpleParser<parsing::JsonHandler<avro::json::JsonPrettyFormatter> >, avro::json::JsonPrettyFormatter> >(schema); +} + +} // namespace avro + diff --git a/contrib/libs/apache/avro/impl/parsing/ResolvingDecoder.cc b/contrib/libs/apache/avro/impl/parsing/ResolvingDecoder.cc new file mode 100644 index 0000000000..f6dbacabcf --- /dev/null +++ 
b/contrib/libs/apache/avro/impl/parsing/ResolvingDecoder.cc @@ -0,0 +1,740 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#define __STDC_LIMIT_MACROS + +#include <string> +#include <stack> +#include <map> +#include <algorithm> +#include <memory> +#include <ctype.h> + +#include "ValidatingCodec.hh" +#include "Symbol.hh" +#include "Types.hh" +#include "ValidSchema.hh" +#include "Decoder.hh" +#include "Encoder.hh" +#include "NodeImpl.hh" +#include "Generic.hh" +#include "Stream.hh" + +namespace avro { + +using std::make_shared; + +namespace parsing { + +using std::shared_ptr; +using std::static_pointer_cast; +using std::make_shared; + +using std::unique_ptr; +using std::map; +using std::pair; +using std::vector; +using std::string; +using std::reverse; +using std::ostringstream; +using std::istringstream; +using std::stack; +using std::find_if; +using std::make_pair; + +typedef pair<NodePtr, NodePtr> NodePair; + +class ResolvingGrammarGenerator : public ValidatingGrammarGenerator { + ProductionPtr doGenerate2(const NodePtr& writer, + const NodePtr& reader, map<NodePair, ProductionPtr> &m, + map<NodePtr, ProductionPtr> &m2); + ProductionPtr resolveRecords(const NodePtr& writer, + const NodePtr& reader, 
map<NodePair, ProductionPtr> &m, + map<NodePtr, ProductionPtr> &m2); + ProductionPtr resolveUnion(const NodePtr& writer, + const NodePtr& reader, map<NodePair, ProductionPtr> &m, + map<NodePtr, ProductionPtr> &m2); + + static vector<pair<string, size_t> > fields(const NodePtr& n) { + vector<pair<string, size_t> > result; + size_t c = n->names(); + for (size_t i = 0; i < c; ++i) { + result.push_back(make_pair(n->nameAt(i), i)); + } + return result; + } + + static int bestBranch(const NodePtr& writer, const NodePtr& reader); + + ProductionPtr getWriterProduction(const NodePtr& n, + map<NodePtr, ProductionPtr>& m2); + +public: + Symbol generate( + const ValidSchema& writer, const ValidSchema& reader); +}; + +Symbol ResolvingGrammarGenerator::generate( + const ValidSchema& writer, const ValidSchema& reader) { + map<NodePtr, ProductionPtr> m2; + + const NodePtr& rr = reader.root(); + const NodePtr& rw = writer.root(); + ProductionPtr backup = ValidatingGrammarGenerator::doGenerate(rw, m2); + fixup(backup, m2); + + map<NodePair, ProductionPtr> m; + ProductionPtr main = doGenerate2(rw, rr, m, m2); + fixup(main, m); + return Symbol::rootSymbol(main, backup); +} + +int ResolvingGrammarGenerator::bestBranch(const NodePtr& writer, + const NodePtr& reader) +{ + Type t = writer->type(); + + const size_t c = reader->leaves(); + for (size_t j = 0; j < c; ++j) { + NodePtr r = reader->leafAt(j); + if (r->type() == AVRO_SYMBOLIC) { + r = resolveSymbol(r); + } + if (t == r->type()) { + if (r->hasName()) { + if (r->name() == writer->name()) { + return j; + } + } else { + return j; + } + } + } + + for (size_t j = 0; j < c; ++j) { + const NodePtr& r = reader->leafAt(j); + Type rt = r->type(); + switch (t) { + case AVRO_INT: + if (rt == AVRO_LONG || rt == AVRO_DOUBLE || rt == AVRO_FLOAT) { + return j; + } + break; + case AVRO_LONG: + case AVRO_FLOAT: + if (rt == AVRO_DOUBLE) { + return j; + } + break; + default: + break; + } + } + return -1; +} + +static shared_ptr<vector<uint8_t> > 
getAvroBinary( + const GenericDatum& defaultValue) +{ + EncoderPtr e = binaryEncoder(); + unique_ptr<OutputStream> os = memoryOutputStream(); + e->init(*os); + GenericWriter::write(*e, defaultValue); + e->flush(); + return snapshot(*os); +} + +template<typename T1, typename T2> +struct equalsFirst +{ + const T1& v_; + equalsFirst(const T1& v) : v_(v) { } + bool operator()(const pair<T1, T2>& p) { + return p.first == v_; + } +}; + +ProductionPtr ResolvingGrammarGenerator::getWriterProduction( + const NodePtr& n, map<NodePtr, ProductionPtr>& m2) +{ + const NodePtr& nn = (n->type() == AVRO_SYMBOLIC) ? + static_cast<const NodeSymbolic& >(*n).getNode() : n; + map<NodePtr, ProductionPtr>::const_iterator it2 = m2.find(nn); + if (it2 != m2.end()) { + return it2->second; + } else { + ProductionPtr result = ValidatingGrammarGenerator::doGenerate(nn, m2); + fixup(result, m2); + return result; + } +} + +ProductionPtr ResolvingGrammarGenerator::resolveRecords( + const NodePtr& writer, const NodePtr& reader, + map<NodePair, ProductionPtr>& m, + map<NodePtr, ProductionPtr>& m2) +{ + ProductionPtr result = make_shared<Production>(); + + vector<pair<string, size_t> > wf = fields(writer); + vector<pair<string, size_t> > rf = fields(reader); + vector<size_t> fieldOrder; + fieldOrder.reserve(reader->names()); + + /* + * We look for all writer fields in the reader. If found, recursively + * resolve the corresponding fields. Then erase the reader field. + * If no matching field is found for reader, arrange to skip the writer + * field. 
+ */ + for (vector<pair<string, size_t> >::const_iterator it = wf.begin(); + it != wf.end(); ++it) { + vector<pair<string, size_t> >::iterator it2 = + find_if(rf.begin(), rf.end(), + equalsFirst<string, size_t>(it->first)); + if (it2 != rf.end()) { + ProductionPtr p = doGenerate2(writer->leafAt(it->second), + reader->leafAt(it2->second), m, m2); + copy(p->rbegin(), p->rend(), back_inserter(*result)); + fieldOrder.push_back(it2->second); + rf.erase(it2); + } else { + ProductionPtr p = getWriterProduction( + writer->leafAt(it->second), m2); + result->push_back(Symbol::skipStart()); + if (p->size() == 1) { + result->push_back((*p)[0]); + } else { + result->push_back(Symbol::indirect(p)); + } + } + } + + /* + * Examine the reader fields left out, (i.e. those didn't have corresponding + * writer field). + */ + for (vector<pair<string, size_t> >::const_iterator it = rf.begin(); + it != rf.end(); ++it) { + + NodePtr s = reader->leafAt(it->second); + fieldOrder.push_back(it->second); + + if (s->type() == AVRO_SYMBOLIC) { + s = resolveSymbol(s); + } + shared_ptr<vector<uint8_t> > defaultBinary = + getAvroBinary(reader->defaultValueAt(it->second)); + result->push_back(Symbol::defaultStartAction(defaultBinary)); + map<NodePair, shared_ptr<Production> >::const_iterator it2 = + m.find(NodePair(s, s)); + ProductionPtr p = (it2 == m.end()) ? 
+ doGenerate2(s, s, m, m2) : it2->second; + copy(p->rbegin(), p->rend(), back_inserter(*result)); + result->push_back(Symbol::defaultEndAction()); + } + reverse(result->begin(), result->end()); + result->push_back(Symbol::sizeListAction(fieldOrder)); + result->push_back(Symbol::recordAction()); + + return result; + +} + +ProductionPtr ResolvingGrammarGenerator::resolveUnion( + const NodePtr& writer, const NodePtr& reader, + map<NodePair, ProductionPtr>& m, + map<NodePtr, ProductionPtr>& m2) +{ + vector<ProductionPtr> v; + size_t c = writer->leaves(); + v.reserve(c); + for (size_t i = 0; i < c; ++i) { + ProductionPtr p = doGenerate2(writer->leafAt(i), reader, m, m2); + v.push_back(p); + } + ProductionPtr result = make_shared<Production>(); + result->push_back(Symbol::alternative(v)); + result->push_back(Symbol::writerUnionAction()); + return result; +} + +ProductionPtr ResolvingGrammarGenerator::doGenerate2( + const NodePtr& w, const NodePtr& r, + map<NodePair, ProductionPtr> &m, + map<NodePtr, ProductionPtr> &m2) +{ + const NodePtr writer = w->type() == AVRO_SYMBOLIC ? resolveSymbol(w) : w; + const NodePtr reader = r->type() == AVRO_SYMBOLIC ? 
resolveSymbol(r) : r; + Type writerType = writer->type(); + Type readerType = reader->type(); + + if (writerType == readerType) { + switch (writerType) { + case AVRO_NULL: + return make_shared<Production>(1, Symbol::nullSymbol()); + case AVRO_BOOL: + return make_shared<Production>(1, Symbol::boolSymbol()); + case AVRO_INT: + return make_shared<Production>(1, Symbol::intSymbol()); + case AVRO_LONG: + return make_shared<Production>(1, Symbol::longSymbol()); + case AVRO_FLOAT: + return make_shared<Production>(1, Symbol::floatSymbol()); + case AVRO_DOUBLE: + return make_shared<Production>(1, Symbol::doubleSymbol()); + case AVRO_STRING: + return make_shared<Production>(1, Symbol::stringSymbol()); + case AVRO_BYTES: + return make_shared<Production>(1, Symbol::bytesSymbol()); + case AVRO_FIXED: + if (writer->name() == reader->name() && + writer->fixedSize() == reader->fixedSize()) { + ProductionPtr result = make_shared<Production>(); + result->push_back(Symbol::sizeCheckSymbol(reader->fixedSize())); + result->push_back(Symbol::fixedSymbol()); + m[make_pair(writer, reader)] = result; + return result; + } + break; + case AVRO_RECORD: + if (writer->name() == reader->name()) { + const pair<NodePtr, NodePtr> key(writer, reader); + map<NodePair, ProductionPtr>::const_iterator kp = m.find(key); + if (kp != m.end()) { + return (kp->second) ? 
kp->second : + make_shared<Production>(1, Symbol::placeholder(key)); + } + m[key] = ProductionPtr(); + ProductionPtr result = resolveRecords(writer, reader, m, m2); + m[key] = result; + return make_shared<Production>(1, Symbol::indirect(result)); + } + break; + + case AVRO_ENUM: + if (writer->name() == reader->name()) { + ProductionPtr result = make_shared<Production>(); + result->push_back(Symbol::enumAdjustSymbol(writer, reader)); + result->push_back(Symbol::enumSymbol()); + m[make_pair(writer, reader)] = result; + return result; + } + break; + + case AVRO_ARRAY: + { + ProductionPtr p = getWriterProduction(writer->leafAt(0), m2); + ProductionPtr p2 = doGenerate2(writer->leafAt(0), reader->leafAt(0), m, m2); + ProductionPtr result = make_shared<Production>(); + result->push_back(Symbol::arrayEndSymbol()); + result->push_back(Symbol::repeater(p2, p, true)); + result->push_back(Symbol::arrayStartSymbol()); + return result; + } + case AVRO_MAP: + { + ProductionPtr pp = + doGenerate2(writer->leafAt(1),reader->leafAt(1), m, m2); + ProductionPtr v(new Production(*pp)); + v->push_back(Symbol::stringSymbol()); + + ProductionPtr pp2 = getWriterProduction(writer->leafAt(1), m2); + ProductionPtr v2(new Production(*pp2)); + + v2->push_back(Symbol::stringSymbol()); + + ProductionPtr result = make_shared<Production>(); + result->push_back(Symbol::mapEndSymbol()); + result->push_back(Symbol::repeater(v, v2, false)); + result->push_back(Symbol::mapStartSymbol()); + return result; + } + case AVRO_UNION: + return resolveUnion(writer, reader, m, m2); + case AVRO_SYMBOLIC: + { + shared_ptr<NodeSymbolic> w = + static_pointer_cast<NodeSymbolic>(writer); + shared_ptr<NodeSymbolic> r = + static_pointer_cast<NodeSymbolic>(reader); + NodePair p(w->getNode(), r->getNode()); + map<NodePair, ProductionPtr>::iterator it = m.find(p); + if (it != m.end() && it->second) { + return it->second; + } else { + m[p] = ProductionPtr(); + return make_shared<Production>(1, Symbol::placeholder(p)); + } + } 
+ default: + throw Exception("Unknown node type"); + } + } else if (writerType == AVRO_UNION) { + return resolveUnion(writer, reader, m, m2); + } else { + switch (readerType) { + case AVRO_LONG: + if (writerType == AVRO_INT) { + return make_shared<Production>(1, + Symbol::resolveSymbol(Symbol::sInt, Symbol::sLong)); + } + break; + case AVRO_FLOAT: + if (writerType == AVRO_INT || writerType == AVRO_LONG) { + return make_shared<Production>(1, + Symbol::resolveSymbol(writerType == AVRO_INT ? + Symbol::sInt : Symbol::sLong, Symbol::sFloat)); + } + break; + case AVRO_DOUBLE: + if (writerType == AVRO_INT || writerType == AVRO_LONG + || writerType == AVRO_FLOAT) { + return make_shared<Production>(1, + Symbol::resolveSymbol(writerType == AVRO_INT ? + Symbol::sInt : writerType == AVRO_LONG ? + Symbol::sLong : Symbol::sFloat, Symbol::sDouble)); + } + break; + + case AVRO_UNION: + { + int j = bestBranch(writer, reader); + if (j >= 0) { + ProductionPtr p = doGenerate2(writer, reader->leafAt(j), m, m2); + ProductionPtr result = make_shared<Production>(); + result->push_back(Symbol::unionAdjustSymbol(j, p)); + result->push_back(Symbol::unionSymbol()); + return result; + } + } + break; + case AVRO_NULL: + case AVRO_BOOL: + case AVRO_INT: + case AVRO_STRING: + case AVRO_BYTES: + case AVRO_ENUM: + case AVRO_ARRAY: + case AVRO_MAP: + case AVRO_RECORD: + break; + default: + throw Exception("Unknown node type"); + } + } + return make_shared<Production>(1, Symbol::error(writer, reader)); +} + +class ResolvingDecoderHandler { + shared_ptr<vector<uint8_t> > defaultData_; + unique_ptr<InputStream> inp_; + DecoderPtr backup_; + DecoderPtr& base_; + const DecoderPtr binDecoder; + public: + ResolvingDecoderHandler(DecoderPtr& base) : base_(base), + binDecoder(binaryDecoder()) { } + size_t handle(const Symbol& s) { + switch (s.kind()) { + case Symbol::sWriterUnion: + return base_->decodeUnionIndex(); + case Symbol::sDefaultStart: + defaultData_ = s.extra<shared_ptr<vector<uint8_t> > >(); + 
backup_ = base_; + inp_ = memoryInputStream(&(*defaultData_)[0], defaultData_->size()); + base_ = binDecoder; + base_->init(*inp_); + return 0; + case Symbol::sDefaultEnd: + base_= backup_; + backup_.reset(); + return 0; + default: + return 0; + } + } + + void reset() + { + if (backup_ != NULL) + { + base_= backup_; + backup_.reset(); + } + } +}; + +template <typename Parser> +class ResolvingDecoderImpl : public ResolvingDecoder +{ + DecoderPtr base_; + ResolvingDecoderHandler handler_; + Parser parser_; + + void init(InputStream& is); + void decodeNull(); + bool decodeBool(); + int32_t decodeInt(); + int64_t decodeLong(); + float decodeFloat(); + double decodeDouble(); + void decodeString(string& value); + void skipString(); + void decodeBytes(vector<uint8_t>& value); + void skipBytes(); + void decodeFixed(size_t n, vector<uint8_t>& value); + void skipFixed(size_t n); + size_t decodeEnum(); + size_t arrayStart(); + size_t arrayNext(); + size_t skipArray(); + size_t mapStart(); + size_t mapNext(); + size_t skipMap(); + size_t decodeUnionIndex(); + const vector<size_t>& fieldOrder(); + void drain() { + parser_.processImplicitActions(); + base_->drain(); + } +public: + ResolvingDecoderImpl(const ValidSchema& writer, const ValidSchema& reader, + const DecoderPtr& base) : + base_(base), + handler_(base_), + parser_(ResolvingGrammarGenerator().generate(writer, reader), + &(*base_), handler_) + { + } +}; + +template <typename P> +void ResolvingDecoderImpl<P>::init(InputStream& is) +{ + handler_.reset(); + base_->init(is); + parser_.reset(); +} + +template <typename P> +void ResolvingDecoderImpl<P>::decodeNull() +{ + parser_.advance(Symbol::sNull); + base_->decodeNull(); +} + +template <typename P> +bool ResolvingDecoderImpl<P>::decodeBool() +{ + parser_.advance(Symbol::sBool); + return base_->decodeBool(); +} + +template <typename P> +int32_t ResolvingDecoderImpl<P>::decodeInt() +{ + parser_.advance(Symbol::sInt); + return base_->decodeInt(); +} + +template <typename P> 
+int64_t ResolvingDecoderImpl<P>::decodeLong() +{ + Symbol::Kind k = parser_.advance(Symbol::sLong); + return k == Symbol::sInt ? base_->decodeInt() : base_->decodeLong(); +} + +template <typename P> +float ResolvingDecoderImpl<P>::decodeFloat() +{ + Symbol::Kind k = parser_.advance(Symbol::sFloat); + return k == Symbol::sInt ? base_->decodeInt() : + k == Symbol::sLong ? base_->decodeLong() : + base_->decodeFloat(); +} + +template <typename P> +double ResolvingDecoderImpl<P>::decodeDouble() +{ + Symbol::Kind k = parser_.advance(Symbol::sDouble); + return k == Symbol::sInt ? base_->decodeInt() : + k == Symbol::sLong ? base_->decodeLong() : + k == Symbol::sFloat ? base_->decodeFloat() : + base_->decodeDouble(); +} + +template <typename P> +void ResolvingDecoderImpl<P>::decodeString(string& value) +{ + parser_.advance(Symbol::sString); + base_->decodeString(value); +} + +template <typename P> +void ResolvingDecoderImpl<P>::skipString() +{ + parser_.advance(Symbol::sString); + base_->skipString(); +} + +template <typename P> +void ResolvingDecoderImpl<P>::decodeBytes(vector<uint8_t>& value) +{ + parser_.advance(Symbol::sBytes); + base_->decodeBytes(value); +} + +template <typename P> +void ResolvingDecoderImpl<P>::skipBytes() +{ + parser_.advance(Symbol::sBytes); + base_->skipBytes(); +} + +template <typename P> +void ResolvingDecoderImpl<P>::decodeFixed(size_t n, vector<uint8_t>& value) +{ + parser_.advance(Symbol::sFixed); + parser_.assertSize(n); + return base_->decodeFixed(n, value); +} + +template <typename P> +void ResolvingDecoderImpl<P>::skipFixed(size_t n) +{ + parser_.advance(Symbol::sFixed); + parser_.assertSize(n); + base_->skipFixed(n); +} + +template <typename P> +size_t ResolvingDecoderImpl<P>::decodeEnum() +{ + parser_.advance(Symbol::sEnum); + size_t n = base_->decodeEnum(); + return parser_.enumAdjust(n); +} + +template <typename P> +size_t ResolvingDecoderImpl<P>::arrayStart() +{ + parser_.advance(Symbol::sArrayStart); + size_t result = 
base_->arrayStart(); + parser_.pushRepeatCount(result); + if (result == 0) { + parser_.popRepeater(); + parser_.advance(Symbol::sArrayEnd); + } + return result; +} + +template <typename P> +size_t ResolvingDecoderImpl<P>::arrayNext() +{ + parser_.processImplicitActions(); + size_t result = base_->arrayNext(); + parser_.nextRepeatCount(result); + if (result == 0) { + parser_.popRepeater(); + parser_.advance(Symbol::sArrayEnd); + } + return result; +} + +template <typename P> +size_t ResolvingDecoderImpl<P>::skipArray() +{ + parser_.advance(Symbol::sArrayStart); + size_t n = base_->skipArray(); + if (n == 0) { + parser_.pop(); + } else { + parser_.pushRepeatCount(n); + parser_.skip(*base_); + } + parser_.advance(Symbol::sArrayEnd); + return 0; +} + +template <typename P> +size_t ResolvingDecoderImpl<P>::mapStart() +{ + parser_.advance(Symbol::sMapStart); + size_t result = base_->mapStart(); + parser_.pushRepeatCount(result); + if (result == 0) { + parser_.popRepeater(); + parser_.advance(Symbol::sMapEnd); + } + return result; +} + +template <typename P> +size_t ResolvingDecoderImpl<P>::mapNext() +{ + parser_.processImplicitActions(); + size_t result = base_->mapNext(); + parser_.nextRepeatCount(result); + if (result == 0) { + parser_.popRepeater(); + parser_.advance(Symbol::sMapEnd); + } + return result; +} + +template <typename P> +size_t ResolvingDecoderImpl<P>::skipMap() +{ + parser_.advance(Symbol::sMapStart); + size_t n = base_->skipMap(); + if (n == 0) { + parser_.pop(); + } else { + parser_.pushRepeatCount(n); + parser_.skip(*base_); + } + parser_.advance(Symbol::sMapEnd); + return 0; +} + +template <typename P> +size_t ResolvingDecoderImpl<P>::decodeUnionIndex() +{ + parser_.advance(Symbol::sUnion); + return parser_.unionAdjust(); +} + +template <typename P> +const vector<size_t>& ResolvingDecoderImpl<P>::fieldOrder() +{ + parser_.advance(Symbol::sRecord); + return parser_.sizeList(); +} + +} // namespace parsing + +ResolvingDecoderPtr resolvingDecoder(const 
ValidSchema& writer, + const ValidSchema& reader, const DecoderPtr& base) { + return make_shared<parsing::ResolvingDecoderImpl + <parsing::SimpleParser<parsing::ResolvingDecoderHandler> > >( + writer, reader, base); +} + +} // namespace avro + diff --git a/contrib/libs/apache/avro/impl/parsing/Symbol.cc b/contrib/libs/apache/avro/impl/parsing/Symbol.cc new file mode 100644 index 0000000000..6eb83309be --- /dev/null +++ b/contrib/libs/apache/avro/impl/parsing/Symbol.cc @@ -0,0 +1,111 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + + +#include "Symbol.hh" + +namespace avro { +namespace parsing { + +using std::vector; +using std::string; +using std::ostringstream; + +const char* Symbol::stringValues[] = { + "TerminalLow", + "Null", + "Bool", + "Int", + "Long", + "Float", + "Double", + "String", + "Bytes", + "ArrayStart", + "ArrayEnd", + "MapStart", + "MapEnd", + "Fixed", + "Enum", + "Union", + "TerminalHigh", + "SizeCheck", + "NameList", + "Root", + "Repeater", + "Alternative", + "Placeholder", + "Indirect", + "Symbolic", + "EnumAdjust", + "UnionAdjust", + "SkipStart", + "Resolve", + "ImplicitActionLow", + "RecordStart", + "RecordEnd", + "Field", + "Record", + "SizeList", + "WriterUnion", + "DefaultStart", + "DefaultEnd", + "ImplicitActionHigh", + "Error" +}; + +Symbol Symbol::enumAdjustSymbol(const NodePtr& writer, const NodePtr& reader) +{ + vector<string> rs; + size_t rc = reader->names(); + for (size_t i = 0; i < rc; ++i) { + rs.push_back(reader->nameAt(i)); + } + + size_t wc = writer->names(); + vector<int> adj; + adj.reserve(wc); + + vector<string> err; + + for (size_t i = 0; i < wc; ++i) { + const string& s = writer->nameAt(i); + vector<string>::const_iterator it = find(rs.begin(), rs.end(), s); + if (it == rs.end()) { + int pos = err.size() + 1; + adj.push_back(-pos); + err.push_back(s); + } else { + adj.push_back(it - rs.begin()); + } + } + return Symbol(sEnumAdjust, make_pair(adj, err)); +} + +Symbol Symbol::error(const NodePtr& writer, const NodePtr& reader) +{ + ostringstream oss; + oss << "Cannot resolve: " << std::endl; + writer->printJson(oss, 0); + oss << std::endl << "with" << std::endl; + reader->printJson(oss, 0); + return Symbol(sError, oss.str()); +} + +} // namespace parsing +} // namespace avro diff --git a/contrib/libs/apache/avro/impl/parsing/Symbol.hh b/contrib/libs/apache/avro/impl/parsing/Symbol.hh new file mode 100644 index 0000000000..f4ecfe6e83 --- /dev/null +++ b/contrib/libs/apache/avro/impl/parsing/Symbol.hh @@ -0,0 +1,854 @@ +/** + * Licensed to the 
Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef avro_parsing_Symbol_hh__
+#define avro_parsing_Symbol_hh__
+
+#include <vector>
+#include <map>
+#include <set>
+#include <stack>
+#include <sstream>
+
+#include <boost/any.hpp>
+#include <boost/tuple/tuple.hpp>
+
+#include "Node.hh"
+#include "Decoder.hh"
+#include "Exception.hh"
+
+namespace avro {
+namespace parsing {
+
+class Symbol;
+
+// A production is a sequence of symbols. SimpleParser::append() pushes the
+// elements front to back onto a stack, so the LAST element of a production
+// is the first one the parser sees.
+typedef std::vector<Symbol> Production;
+typedef std::shared_ptr<Production> ProductionPtr;
+// Extra of an sRepeater symbol, in the order built by Symbol::repeater():
+// <stack of remaining item counts, isArray flag, read production,
+//  skip production>.
+typedef boost::tuple<std::stack<ssize_t>, bool, ProductionPtr, ProductionPtr> RepeaterInfo;
+// Extra of an sRoot symbol, in the order built by Symbol::rootSymbol():
+// <main production, backup production>.
+typedef boost::tuple<ProductionPtr, ProductionPtr> RootInfo;
+
+/**
+ * One grammar symbol: a Kind plus an optional, kind-specific payload
+ * ("extra") held in a boost::any. Instances are created only through the
+ * static factory functions below; the constructors are private.
+ */
+class Symbol {
+public:
+    // NOTE: Symbol::stringValues (Symbol.cc) is indexed by these values, so
+    // the two lists must be kept in the same order.
+    enum Kind {
+        sTerminalLow,   // extra has nothing
+        sNull,
+        sBool,
+        sInt,
+        sLong,
+        sFloat,
+        sDouble,
+        sString,
+        sBytes,
+        sArrayStart,
+        sArrayEnd,
+        sMapStart,
+        sMapEnd,
+        sFixed,
+        sEnum,
+        sUnion,
+        sTerminalHigh,
+        sSizeCheck,     // Extra has size (size_t)
+        sNameList,      // Extra has a vector<string>
+        sRoot,          // Root for a schema, extra is RootInfo
+        sRepeater,      // Array or Map, extra is RepeaterInfo
+        sAlternative,   // One of many (union), extra is vector<ProductionPtr>
+        sPlaceholder,   // To be fixed up later.
+        sIndirect,      // extra is shared_ptr<Production>
+        sSymbolic,      // extra is weak_ptr<Production>
+        sEnumAdjust,    // extra is pair<vector<int>, vector<string> >
+        sUnionAdjust,   // extra is pair<size_t, ProductionPtr>
+        sSkipStart,
+        sResolve,       // extra is pair<Kind, Kind>
+
+        sImplicitActionLow,
+        sRecordStart,
+        sRecordEnd,
+        sField,         // extra is string
+        sRecord,
+        sSizeList,      // extra is vector<size_t>
+        sWriterUnion,
+        sDefaultStart,  // extra has default value in Avro binary encoding
+        sDefaultEnd,
+        sImplicitActionHigh,
+        sError          // extra is string (the error message)
+    };
+
+private:
+    Kind kind_;
+    boost::any extra_;
+
+
+    explicit Symbol(Kind k) : kind_(k) { }
+    template <typename T> Symbol(Kind k, T t) : kind_(k), extra_(t) { }
+public:
+
+    Kind kind() const {
+        return kind_;
+    }
+
+    // Returns a COPY of the payload converted with boost::any_cast<T>.
+    template <typename T> T extra() const {
+        return boost::any_cast<T>(extra_);
+    }
+
+    // Returns a pointer to the payload stored inside this symbol (pointer
+    // form of boost::any_cast), allowing in-place modification.
+    template <typename T> T* extrap() {
+        return boost::any_cast<T>(&extra_);
+    }
+
+    // Const overload of extrap().
+    template <typename T> const T* extrap() const {
+        return boost::any_cast<T>(&extra_);
+    }
+
+    // Replaces the payload.
+    template <typename T> void extra(const T& t) {
+        extra_ = t;
+    }
+
+    // True for kinds strictly between the sTerminalLow/sTerminalHigh markers.
+    bool isTerminal() const {
+        return kind_ > sTerminalLow && kind_ < sTerminalHigh;
+    }
+
+    // True for kinds strictly between the sImplicitActionLow and
+    // sImplicitActionHigh markers.
+    bool isImplicitAction() const {
+        return kind_ > sImplicitActionLow && kind_ < sImplicitActionHigh;
+    }
+
+    static const char* stringValues[];
+    // Printable name of k; performs no range check on k.
+    static const char* toString(Kind k) {
+        return stringValues[k];
+    }
+
+    // Root symbol with s as main production and a new, empty backup
+    // production.
+    static Symbol rootSymbol(ProductionPtr& s)
+    {
+        return Symbol(Symbol::sRoot, RootInfo(s, std::make_shared<Production>()));
+    }
+
+    static Symbol rootSymbol(const ProductionPtr& main,
+                             const ProductionPtr& backup)
+    {
+        return Symbol(Symbol::sRoot, RootInfo(main, backup));
+    }
+
+    static Symbol nullSymbol() {
+        return Symbol(sNull);
+    }
+
+    static Symbol boolSymbol() {
+        return Symbol(sBool);
+    }
+
+    static Symbol intSymbol() {
+        return Symbol(sInt);
+    }
+
+    static Symbol longSymbol() {
+        return Symbol(sLong);
+    }
+
+    static Symbol floatSymbol() {
+        return Symbol(sFloat);
+    }
+
+    static Symbol doubleSymbol() {
+        return Symbol(sDouble);
+    }
+
+    static Symbol stringSymbol() {
+        return Symbol(sString);
+    }
+
+    static Symbol bytesSymbol() {
+        return Symbol(sBytes);
+    }
+
+    static Symbol sizeCheckSymbol(size_t s) {
+        return Symbol(sSizeCheck, s);
+    }
+
+    static Symbol fixedSymbol() {
+        return Symbol(sFixed);
+    }
+
+    static Symbol enumSymbol() {
+        return Symbol(sEnum);
+    }
+
+    static Symbol arrayStartSymbol() {
+        return Symbol(sArrayStart);
+    }
+
+    static Symbol arrayEndSymbol() {
+        return Symbol(sArrayEnd);
+    }
+
+    static Symbol mapStartSymbol() {
+        return Symbol(sMapStart);
+    }
+
+    static Symbol mapEndSymbol() {
+        return Symbol(sMapEnd);
+    }
+
+    // Repeater that uses the same production p for reading and skipping.
+    static Symbol repeater(const ProductionPtr& p,
+                           bool isArray) {
+        return repeater(p, p, isArray);
+    }
+
+    // Repeater with separate read and skip productions; its item-count
+    // stack starts out empty.
+    static Symbol repeater(const ProductionPtr& read,
+                           const ProductionPtr& skip,
+                           bool isArray) {
+        std::stack<ssize_t> s;
+        return Symbol(sRepeater, RepeaterInfo(s, isArray, read, skip));
+    }
+
+    static Symbol defaultStartAction(std::shared_ptr<std::vector<uint8_t> > bb)
+    {
+        return Symbol(sDefaultStart, bb);
+    }
+
+    static Symbol defaultEndAction()
+    {
+        return Symbol(sDefaultEnd);
+    }
+
+    static Symbol alternative(
+        const std::vector<ProductionPtr>& branches)
+    {
+        return Symbol(Symbol::sAlternative, branches);
+    }
+
+    static Symbol unionSymbol() {
+        return Symbol(sUnion);
+    }
+
+    static Symbol recordStartSymbol() {
+        return Symbol(sRecordStart);
+    }
+
+    static Symbol recordEndSymbol() {
+        return Symbol(sRecordEnd);
+    }
+
+    static Symbol fieldSymbol(const std::string& name) {
+        return Symbol(sField, name);
+    }
+
+    static Symbol writerUnionAction() {
+        return Symbol(sWriterUnion);
+    }
+
+    static Symbol nameListSymbol(
+        const std::vector<std::string>& v) {
+        return Symbol(sNameList, v);
+    }
+
+    // Placeholder carrying the key n; fixup() later looks n up in its map
+    // and replaces the placeholder with a symbolic() symbol.
+    template <typename T>
+    static Symbol placeholder(const T& n) {
+        return Symbol(sPlaceholder, n);
+    }
+
+    static Symbol indirect(const ProductionPtr& p) {
+        return Symbol(sIndirect, p);
+    }
+
+    static Symbol symbolic(const std::weak_ptr<Production>& p) {
+        return Symbol(sSymbolic, p);
+    }
+
+    // Defined in Symbol.cc.
+    static Symbol enumAdjustSymbol(const NodePtr& writer,
+                                   const NodePtr& reader);
+
+    static Symbol unionAdjustSymbol(size_t branch,
+                                    const ProductionPtr& p) {
+        return Symbol(sUnionAdjust, std::make_pair(branch, p));
+    }
+
+    static Symbol sizeListAction(std::vector<size_t> order) {
+        return Symbol(sSizeList, order);
+    }
+
+    static Symbol recordAction() {
+        return Symbol(sRecord);
+    }
+
+    // Defined in Symbol.cc.
+    static Symbol error(const NodePtr& writer, const NodePtr& reader);
+
+    // Extra is the pair (w, r); SimpleParser::advance() checks r against
+    // the requested kind and returns w.
+    static Symbol resolveSymbol(Kind w, Kind r) {
+        return Symbol(sResolve, std::make_pair(w, r));
+    }
+
+    static Symbol skipStart() {
+        return Symbol(sSkipStart);
+    }
+
+};
+
+/**
+ * Recursively replaces all placeholders in the production with the
+ * corresponding values.
+ *
+ * Entry point of the fix-up pass: starts with an empty set of visited
+ * productions and fixes up every symbol of p.
+ */
+template<typename T>
+void fixup(const ProductionPtr& p,
+           const std::map<T, ProductionPtr> &m)
+{
+    std::set<ProductionPtr> seen;
+    for (Production::iterator it = p->begin(); it != p->end(); ++it) {
+        fixup(*it, m, seen);
+    }
+}
+
+
+/**
+ * Recursively replaces all placeholders in the production with the
+ * corresponding values, skipping productions already recorded in seen.
+ */
+template<typename T>
+void fixup_internal(const ProductionPtr& p,
+                    const std::map<T, ProductionPtr> &m,
+                    std::set<ProductionPtr>& seen)
+{
+    // Mark p as visited BEFORE descending so that cyclic productions
+    // terminate.
+    if (seen.find(p) == seen.end()) {
+        seen.insert(p);
+        for (Production::iterator it = p->begin(); it != p->end(); ++it) {
+            fixup(*it, m, seen);
+        }
+    }
+}
+
+/**
+ * Fixes up a single symbol: descends into every production the symbol
+ * refers to and, for an sPlaceholder, replaces the symbol in place with an
+ * sSymbolic one holding a weak_ptr to the production registered in m.
+ * Throws Exception when a placeholder's key is absent from m.
+ */
+template<typename T>
+void fixup(Symbol& s, const std::map<T, ProductionPtr> &m,
+           std::set<ProductionPtr>& seen)
+{
+    switch (s.kind()) {
+    case Symbol::sIndirect:
+        fixup_internal(s.extra<ProductionPtr>(), m, seen);
+        break;
+    case Symbol::sAlternative:
+        {
+            const std::vector<ProductionPtr> *vv =
+                s.extrap<std::vector<ProductionPtr> >();
+            for (std::vector<ProductionPtr>::const_iterator it = vv->begin();
+                it != vv->end(); ++it) {
+                fixup_internal(*it, m, seen);
+            }
+        }
+        break;
+    case Symbol::sRepeater:
+        {
+            // Elements 2 and 3 are the read and the skip production.
+            const RepeaterInfo& ri = *s.extrap<RepeaterInfo>();
+            fixup_internal(boost::tuples::get<2>(ri), m, seen);
+            fixup_internal(boost::tuples::get<3>(ri), m, seen);
+        }
+        break;
+    case Symbol::sPlaceholder:
+        {
+            typename std::map<T, std::shared_ptr<Production> >::const_iterator it =
+                m.find(s.extra<T>());
+            if (it == m.end()) {
+                throw Exception("Placeholder symbol cannot be resolved");
+            }
+            // A weak reference is stored, so this symbol does not own the
+            // production it points back to.
+            s = Symbol::symbolic(std::weak_ptr<Production>(it->second));
+        }
+        break;
+    case Symbol::sUnionAdjust:
+        fixup_internal(s.extrap<std::pair<size_t, ProductionPtr> >()->second,
+                       m, seen);
+        break;
+    default:
+        break;
+    }
+}
+
+/**
+ * Stack-based parser over a grammar of Symbols.
+ *
+ * The constructor pushes the start symbol; productions are expanded onto
+ * parsingStack on demand. Implicit-action symbols are passed to
+ * handler_.handle(); decoder_ is only dereferenced when an sSkipStart
+ * symbol is processed.
+ */
+template<typename Handler>
+class SimpleParser {
+    Decoder* decoder_;
+    Handler& handler_;
+    std::stack<Symbol> parsingStack;
+
+    // Always throws; the message reports `expected` as what the schema
+    // requires and `actual` as what was received.
+    static void throwMismatch(Symbol::Kind actual, Symbol::Kind expected)
+    {
+        std::ostringstream oss;
+        oss << "Invalid operation. Schema requires: " <<
+            Symbol::toString(expected) << ", got: " <<
+            Symbol::toString(actual);
+        throw Exception(oss.str());
+    }
+
+    static void assertMatch(Symbol::Kind actual, Symbol::Kind expected)
+    {
+        if (expected != actual) {
+            throwMismatch(actual, expected);
+        }
+
+    }
+
+    // Pushes the symbols of ss front to back, so the last element of the
+    // production ends up on top of the stack.
+    void append(const ProductionPtr& ss) {
+        for (Production::const_iterator it = ss->begin();
+            it != ss->end(); ++it) {
+            parsingStack.push(*it);
+        }
+    }
+
+    // Pops the sSizeCheck symbol on top of the stack and returns its size.
+    size_t popSize() {
+        const Symbol& s = parsingStack.top();
+        assertMatch(Symbol::sSizeCheck, s.kind());
+        size_t result = s.extra<size_t>();
+        parsingStack.pop();
+        return result;
+    }
+
+    // Throws unless n < s.
+    static void assertLessThan(size_t n, size_t s) {
+        if (n >= s) {
+            std::ostringstream oss;
+            oss << "Size max value. Upper bound: " << s << " found " << n;
+            throw Exception(oss.str());
+        }
+    }
+
+public:
+    /**
+     * Advances the parser until a symbol of kind k has been consumed.
+     *
+     * Non-terminal symbols on top of the stack are expanded or executed
+     * until the top matches k (popped, k returned), an sResolve symbol is
+     * found (its second kind must equal k; its first kind is returned), or
+     * a mismatch/error raises an Exception.
+     */
+    Symbol::Kind advance(Symbol::Kind k) {
+        for (; ;) {
+            Symbol& s = parsingStack.top();
+//            std::cout << "advance: " << Symbol::toString(s.kind())
+//                      << " looking for " << Symbol::toString(k) << '\n';
+            if (s.kind() == k) {
+                parsingStack.pop();
+                return k;
+            } else if (s.isTerminal()) {
+                throwMismatch(k, s.kind());
+            } else {
+                switch (s.kind()) {
+                case Symbol::sRoot:
+                    // The root is NOT popped: its main production is pushed
+                    // on top of it.
+                    append(boost::tuples::get<0>(*s.extrap<RootInfo>()));
+                    continue;
+                case Symbol::sIndirect:
+                    {
+                        ProductionPtr pp =
+                            s.extra<ProductionPtr>();
+                        parsingStack.pop();
+                        append(pp);
+                    }
+                    continue;
+                case Symbol::sSymbolic:
+                    {
+                        // Constructing a shared_ptr from the stored weak_ptr.
+                        ProductionPtr pp(
+                            s.extra<std::weak_ptr<Production> >());
+                        parsingStack.pop();
+                        append(pp);
+                    }
+                    continue;
+                case Symbol::sRepeater:
+                    {
+                        // Consume one item: decrement the innermost count
+                        // and expand the read production on top of the
+                        // repeater, which stays on the stack.
+                        RepeaterInfo *p = s.extrap<RepeaterInfo>();
+                        std::stack<ssize_t>& ns = boost::tuples::get<0>(*p);
+                        if (ns.empty()) {
+                            throw Exception(
+                                "Empty item count stack in repeater advance");
+                        }
+                        if (ns.top() == 0) {
+                            throw Exception(
+                                "Zero item count in repeater advance");
+                        }
+                        --ns.top();
+                        append(boost::tuples::get<2>(*p));
+                    }
+                    continue;
+                case Symbol::sError:
+                    throw Exception(s.extra<std::string>());
+                case Symbol::sResolve:
+                    {
+                        const std::pair<Symbol::Kind, Symbol::Kind>* p =
+                            s.extrap<std::pair<Symbol::Kind, Symbol::Kind> >();
+                        assertMatch(p->second, k);
+                        // Read the result before pop() destroys the symbol
+                        // that p points into.
+                        Symbol::Kind result = p->first;
+                        parsingStack.pop();
+                        return result;
+                    }
+                case Symbol::sSkipStart:
+                    parsingStack.pop();
+                    skip(*decoder_);
+                    break;
+                default:
+                    if (s.isImplicitAction()) {
+                        size_t n = handler_.handle(s);
+                        if (s.kind() == Symbol::sWriterUnion) {
+                            // For a writer union the handler's return value
+                            // selects the branch of the sAlternative below.
+                            parsingStack.pop();
+                            selectBranch(n);
+                        } else {
+                            parsingStack.pop();
+                        }
+                    } else {
+                        std::ostringstream oss;
+                        oss << "Encountered " << Symbol::toString(s.kind())
+                            << " while looking for " << Symbol::toString(k);
+                        throw Exception(oss.str());
+                    }
+                }
+            }
+        }
+    }
+
+    /**
+     * Skips the value described by the symbol currently on top of the
+     * stack, reading (and discarding) its data from d.
+     *
+     * The loop runs until the stack has shrunk below its size on entry.
+     * Cases that end in `break` fall through to the pop() at the bottom of
+     * the loop; cases that end in `continue` have already adjusted the
+     * stack themselves.
+     */
+    void skip(Decoder& d) {
+        const size_t sz = parsingStack.size();
+        if (sz == 0) {
+            throw Exception("Nothing to skip!");
+        }
+        while (parsingStack.size() >= sz) {
+            Symbol& t = parsingStack.top();
+            // std::cout << "skip: " << Symbol::toString(t.kind()) << '\n';
+            switch (t.kind()) {
+            case Symbol::sNull:
+                d.decodeNull();
+                break;
+            case Symbol::sBool:
+                d.decodeBool();
+                break;
+            case Symbol::sInt:
+                d.decodeInt();
+                break;
+            case Symbol::sLong:
+                d.decodeLong();
+                break;
+            case Symbol::sFloat:
+                d.decodeFloat();
+                break;
+            case Symbol::sDouble:
+                d.decodeDouble();
+                break;
+            case Symbol::sString:
+                d.skipString();
+                break;
+            case Symbol::sBytes:
+                d.skipBytes();
+                break;
+            case Symbol::sArrayStart:
+                {
+                    parsingStack.pop();
+                    size_t n = d.skipArray();
+                    processImplicitActions();
+                    assertMatch(Symbol::sRepeater, parsingStack.top().kind());
+                    if (n == 0) {
+                        // Nothing left to skip item by item: the pop() after
+                        // the switch removes the repeater.
+                        break;
+                    }
+                    // n items remain: record the count on the repeater and
+                    // let the sRepeater case skip them one by one.
+                    Symbol& t = parsingStack.top();
+                    RepeaterInfo *p = t.extrap<RepeaterInfo>();
+                    boost::tuples::get<0>(*p).push(n);
+                    continue;
+                }
+            case Symbol::sArrayEnd:
+                break;
+            case Symbol::sMapStart:
+                {
+                    // Same protocol as sArrayStart, using skipMap().
+                    parsingStack.pop();
+                    size_t n = d.skipMap();
+                    processImplicitActions();
+                    assertMatch(Symbol::sRepeater, parsingStack.top().kind());
+                    if (n == 0) {
+                        break;
+                    }
+                    Symbol& t = parsingStack.top();
+                    RepeaterInfo *p = t.extrap<RepeaterInfo>();
+                    boost::tuples::get<0>(*p).push(n);
+                    continue;
+                }
+            case Symbol::sMapEnd:
+                break;
+            case Symbol::sFixed:
+                {
+                    // The symbol below sFixed carries the size as size_t;
+                    // it is removed by the pop() after the switch.
+                    parsingStack.pop();
+                    Symbol& t = parsingStack.top();
+                    d.decodeFixed(t.extra<size_t>());
+                }
+                break;
+            case Symbol::sEnum:
+                // Pops sEnum here; the pop() after the switch removes the
+                // symbol that follows it.
+                parsingStack.pop();
+                d.decodeEnum();
+                break;
+            case Symbol::sUnion:
+                {
+                    parsingStack.pop();
+                    size_t n = d.decodeUnionIndex();
+                    selectBranch(n);
+                    continue;
+                }
+            case Symbol::sRepeater:
+                {
+                    RepeaterInfo *p = t.extrap<RepeaterInfo>();
+                    std::stack<ssize_t>& ns = boost::tuples::get<0>(*p);
+                    if (ns.empty()) {
+                        throw Exception(
+                            "Empty item count stack in repeater skip");
+                    }
+                    ssize_t& n = ns.top();
+                    if (n == 0) {
+                        // Current block exhausted: ask the decoder for the
+                        // next block's count (element 1 is the isArray flag).
+                        n = boost::tuples::get<1>(*p) ? d.arrayNext()
+                                                      : d.mapNext();
+                    }
+                    if (n != 0) {
+                        // Skip one more item using the skip production.
+                        --n;
+                        append(boost::tuples::get<3>(*p));
+                        continue;
+                    } else {
+                        ns.pop();
+                    }
+                }
+                break;
+            case Symbol::sIndirect:
+                {
+                    ProductionPtr pp =
+                        t.extra<ProductionPtr>();
+                    parsingStack.pop();
+                    append(pp);
+                }
+                continue;
+            case Symbol::sSymbolic:
+                {
+                    ProductionPtr pp(
+                        t.extra<std::weak_ptr<Production> >());
+                    parsingStack.pop();
+                    append(pp);
+                }
+                continue;
+            default:
+                {
+                    std::ostringstream oss;
+                    oss << "Don't know how to skip "
+                        << Symbol::toString(t.kind());
+                    throw Exception(oss.str());
+                }
+            }
+            parsingStack.pop();
+        }
+    }
+
+    // Pops the sSizeCheck on top of the stack and throws unless its size
+    // equals n.
+    void assertSize(size_t n) {
+        size_t s = popSize();
+        if (s != n) {
+            std::ostringstream oss;
+            oss << "Incorrect size. Expected: " << s << " found " << n;
+            throw Exception(oss.str());
+        }
+    }
+
+    // Pops the sSizeCheck on top of the stack and throws unless n is
+    // strictly smaller than its size.
+    void assertLessThanSize(size_t n) {
+        assertLessThan(n, popSize());
+    }
+
+    /**
+     * Translates ordinal n through the sEnumAdjust symbol on top of the
+     * stack and pops that symbol. A negative table entry means the name has
+     * no counterpart; an Exception naming it is thrown (the symbol is then
+     * left on the stack).
+     */
+    size_t enumAdjust(size_t n) {
+        const Symbol& s = parsingStack.top();
+        assertMatch(Symbol::sEnumAdjust, s.kind());
+        const std::pair<std::vector<int>, std::vector<std::string> >* v =
+            s.extrap<std::pair<std::vector<int>, std::vector<std::string> > >();
+        assertLessThan(n, v->first.size());
+
+        int result = v->first[n];
+        if (result < 0) {
+            // Negative entries encode -(index into the name list) - 1.
+            std::ostringstream oss;
+            oss << "Cannot resolve symbol: " << v->second[-result - 1]
+                << std::endl;
+            throw Exception(oss.str());
+        }
+        parsingStack.pop();
+        return result;
+    }
+
+    // Pops the sUnionAdjust symbol on top of the stack, expands its
+    // production and returns its branch number.
+    size_t unionAdjust() {
+        const Symbol& s = parsingStack.top();
+        assertMatch(Symbol::sUnionAdjust, s.kind());
+        // Copied out because pop() destroys the symbol.
+        std::pair<size_t, ProductionPtr> p =
+            s.extra<std::pair<size_t, ProductionPtr> >();
+        parsingStack.pop();
+        append(p.second);
+        return p.first;
+    }
+
+    // Returns the e-th name of the sNameList on top of the stack and pops
+    // it; throws when e is out of range.
+    std::string nameForIndex(size_t e) {
+        const Symbol& s = parsingStack.top();
+        assertMatch(Symbol::sNameList, s.kind());
+        const std::vector<std::string> names =
+            s.extra<std::vector<std::string> >();
+        if (e >= names.size()) {
+            throw Exception("Not that many names");
+        }
+        std::string result = names[e];
+        parsingStack.pop();
+        return result;
+    }
+
+    // Returns the position of name in the sNameList on top of the stack and
+    // pops it; throws when the name is not listed.
+    size_t indexForName(const std::string &name) {
+        const Symbol& s = parsingStack.top();
+        assertMatch(Symbol::sNameList, s.kind());
+        const std::vector<std::string> names =
+            s.extra<std::vector<std::string> >();
+        std::vector<std::string>::const_iterator it =
+            std::find(names.begin(), names.end(), name);
+        if (it == names.end()) {
+            throw Exception("No such enum symbol");
+        }
+        size_t result = it - names.begin();
+        parsingStack.pop();
+        return result;
+    }
+
+    // Starts a new array/map: pushes the item count n onto the count stack
+    // of the sRepeater on top of the parsing stack.
+    void pushRepeatCount(size_t n) {
+        processImplicitActions();
+        Symbol& s = parsingStack.top();
+        assertMatch(Symbol::sRepeater, s.kind());
+        RepeaterInfo *p = s.extrap<RepeaterInfo>();
+        std::stack<ssize_t> &nn = boost::tuples::get<0>(*p);
+        nn.push(n);
+    }
+
+    // Starts the next block of the current array/map: the current count
+    // must have reached 0 and is replaced by n.
+    void nextRepeatCount(size_t n) {
+        processImplicitActions();
+        Symbol& s = parsingStack.top();
+        assertMatch(Symbol::sRepeater, s.kind());
+        RepeaterInfo *p = s.extrap<RepeaterInfo>();
+        std::stack<ssize_t> &nn = boost::tuples::get<0>(*p);
+        if (nn.empty() || nn.top() != 0) {
+            throw Exception("Wrong number of items");
+        }
+        nn.top() = n;
+    }
+
+    // Ends the current array/map: its count must be 0; the count and the
+    // sRepeater symbol are both popped.
+    void popRepeater() {
+        processImplicitActions();
+        Symbol& s = parsingStack.top();
+        assertMatch(Symbol::sRepeater, s.kind());
+        RepeaterInfo *p = s.extrap<RepeaterInfo>();
+        std::stack<ssize_t> &ns = boost::tuples::get<0>(*p);
+        if (ns.empty()) {
+            throw Exception("Incorrect number of items (empty)");
+        }
+        if (ns.top() > 0) {
+            throw Exception("Incorrect number of items (non-zero)");
+        }
+        ns.pop();
+        parsingStack.pop();
+    }
+
+    // Replaces the sAlternative on top of the stack with its n-th branch;
+    // throws when n is out of range.
+    void selectBranch(size_t n) {
+        const Symbol& s = parsingStack.top();
+        assertMatch(Symbol::sAlternative, s.kind());
+        // Copied out because pop() destroys the symbol.
+        std::vector<ProductionPtr> v =
+            s.extra<std::vector<ProductionPtr> >();
+        if (n >= v.size()) {
+            throw Exception("Not that many branches");
+        }
+        parsingStack.pop();
+        append(v[n]);
+    }
+
+    // Returns a reference INTO the sSizeList symbol on top of the stack;
+    // the symbol is not popped, and the reference is only valid while it
+    // stays on the stack.
+    const std::vector<size_t>& sizeList() {
+        const Symbol& s = parsingStack.top();
+        assertMatch(Symbol::sSizeList, s.kind());
+        return *s.extrap<std::vector<size_t> >();
+    }
+
+    Symbol::Kind top() const {
+        return parsingStack.top().kind();
+    }
+
+    void pop() {
+        parsingStack.pop();
+    }
+
+    // Executes (via handler_) and pops implicit actions, and performs
+    // pending skips, until some other kind of symbol is on top.
+    void processImplicitActions() {
+        for (; ;) {
+            Symbol& s = parsingStack.top();
+            if (s.isImplicitAction()) {
+                handler_.handle(s);
+                parsingStack.pop();
+            } else if (s.kind() == Symbol::sSkipStart) {
+                parsingStack.pop();
+                skip(*decoder_);
+            } else {
+                break;
+            }
+        }
+    }
+
+    // s is the start symbol; d is stored as a raw, non-owned pointer and h
+    // by reference.
+    SimpleParser(const Symbol& s, Decoder* d, Handler& h) :
+        decoder_(d), handler_(h) {
+        parsingStack.push(s);
+    }
+
+    // Pops everything except the bottom-most symbol.
+    void reset() {
+        while (parsingStack.size() > 1) {
+            parsingStack.pop();
+        }
+    }
+
+};
+
+inline std::ostream& operator<<(std::ostream& os, const Symbol s);
+
+// Prints a production as "(sym, sym, ...)" in storage order.
+inline std::ostream& operator<<(std::ostream& os, const Production p)
+{
+    os << '(';
+    for (Production::const_iterator it = p.begin(); it != p.end(); ++it) {
+        os << *it << ", ";
+    }
+    os << ')';
+    return os;
+}
+
+// Prints a symbol; sRepeater, sIndirect and sAlternative also print the
+// productions they hold, every other kind prints just its name.
+inline std::ostream& operator<<(std::ostream& os, const Symbol s)
+{
+    switch (s.kind()) {
+    case Symbol::sRepeater:
+        {
+            const RepeaterInfo& ri = *s.extrap<RepeaterInfo>();
+            os << '(' << Symbol::toString(s.kind())
+               << ' ' << *boost::tuples::get<2>(ri)
+               << ' ' << *boost::tuples::get<3>(ri)
+               << ')';
+        }
+        break;
+    case Symbol::sIndirect:
+        {
+            os << '(' << Symbol::toString(s.kind()) << ' '
+               << *s.extra<std::shared_ptr<Production> >() << ')';
+        }
+        break;
+    case Symbol::sAlternative:
+        {
+            os << '(' << Symbol::toString(s.kind());
+            for (std::vector<ProductionPtr>::const_iterator it =
+                     s.extrap<std::vector<ProductionPtr> >()->begin();
+                 it != s.extrap<std::vector<ProductionPtr> >()->end();
+                 ++it) {
+                os << ' ' << **it;
+            }
+            os << ')';
+        }
+        break;
+    case Symbol::sSymbolic:
+        {
+            // The locked pointer itself is streamed, not the production it
+            // points to.
+            os << '(' << Symbol::toString(s.kind())
+               << ' ' << s.extra<std::weak_ptr<Production> >().lock()
+               << ')';
+        }
+        break;
+    default:
+        os << Symbol::toString(s.kind());
+        break;
+    }
+    return os;
+    }
+}   // namespace parsing
+}   // namespace avro
+
+#endif
diff --git a/contrib/libs/apache/avro/impl/parsing/ValidatingCodec.cc b/contrib/libs/apache/avro/impl/parsing/ValidatingCodec.cc
new file mode 100644
index 0000000000..fdf6ef898f
--- /dev/null
+++ b/contrib/libs/apache/avro/impl/parsing/ValidatingCodec.cc
@@ -0,0 +1,591 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "ValidatingCodec.hh"
+
+#include <string>
+#include <map>
+#include <algorithm>
+#include <memory>
+#include <boost/any.hpp>
+
+#include "ValidSchema.hh"
+#include "Decoder.hh"
+#include "Encoder.hh"
+#include "NodeImpl.hh"
+
+namespace avro {
+
+using std::make_shared;
+
+namespace parsing {
+
+using std::shared_ptr;
+using std::static_pointer_cast;
+
+using std::map;
+using std::vector;
+using std::pair;
+using std::string;
+using std::reverse;
+using std::ostringstream;
+
+/**
+ * Follows the design of Avro Parser in Java.
+ *
+ * Generates the production for node n and then resolves, via fixup(), the
+ * placeholders that doGenerate() left behind for symbolic references.
+ */
+ProductionPtr ValidatingGrammarGenerator::generate(const NodePtr& n)
+{
+    map<NodePtr, ProductionPtr> m;
+    ProductionPtr result = doGenerate(n, m);
+    fixup(result, m);
+    return result;
+}
+
+/** Generates the grammar for the schema's root node and wraps it in a root symbol. */
+Symbol ValidatingGrammarGenerator::generate(const ValidSchema& schema)
+{
+    ProductionPtr r = generate(schema.root());
+    return Symbol::rootSymbol(r);
+}
+
+/**
+ * Builds the production for node n.
+ *
+ * Productions are stored in reverse order of consumption (the parser pushes
+ * them front to back onto a stack), which is why e.g. the array production
+ * is built as ArrayEnd, Repeater, ArrayStart. m records the productions of
+ * nodes seen so far so that symbolic references can be resolved.
+ * Throws Exception for a node type not handled below.
+ */
+ProductionPtr ValidatingGrammarGenerator::doGenerate(const NodePtr& n,
+    map<NodePtr, ProductionPtr> &m) {
+    switch (n->type()) {
+    case AVRO_NULL:
+        return make_shared<Production>(1, Symbol::nullSymbol());
+    case AVRO_BOOL:
+        return make_shared<Production>(1, Symbol::boolSymbol());
+    case AVRO_INT:
+        return make_shared<Production>(1, Symbol::intSymbol());
+    case AVRO_LONG:
+        return make_shared<Production>(1, Symbol::longSymbol());
+    case AVRO_FLOAT:
+        return make_shared<Production>(1, Symbol::floatSymbol());
+    case AVRO_DOUBLE:
+        return make_shared<Production>(1, Symbol::doubleSymbol());
+    case AVRO_STRING:
+        return make_shared<Production>(1, Symbol::stringSymbol());
+    case AVRO_BYTES:
+        return make_shared<Production>(1, Symbol::bytesSymbol());
+    case AVRO_FIXED:
+        {
+            // Consumed as: Fixed, then SizeCheck(fixed size).
+            ProductionPtr result = make_shared<Production>();
+            result->push_back(Symbol::sizeCheckSymbol(n->fixedSize()));
+            result->push_back(Symbol::fixedSymbol());
+            m[n] = result;
+            return result;
+        }
+    case AVRO_RECORD:
+        {
+            ProductionPtr result = make_shared<Production>();
+
+            // Forget any earlier entry for this record while its fields are
+            // generated; references to it from within become placeholders.
+            m.erase(n);
+            size_t c = n->leaves();
+            for (size_t i = 0; i < c; ++i) {
+                const NodePtr& leaf = n->leafAt(i);
+                ProductionPtr v = doGenerate(leaf, m);
+                // Append each field in consumption order (i.e. reversed) ...
+                copy(v->rbegin(), v->rend(), back_inserter(*result));
+            }
+            // ... then reverse the whole record into storage order.
+            reverse(result->begin(), result->end());
+
+            m[n] = result;
+            return make_shared<Production>(1, Symbol::indirect(result));
+        }
+    case AVRO_ENUM:
+        {
+            // Consumed as: Enum, then SizeCheck(number of names).
+            ProductionPtr result = make_shared<Production>();
+            result->push_back(Symbol::sizeCheckSymbol(n->names()));
+            result->push_back(Symbol::enumSymbol());
+            m[n] = result;
+            return result;
+        }
+    case AVRO_ARRAY:
+        {
+            ProductionPtr result = make_shared<Production>();
+            result->push_back(Symbol::arrayEndSymbol());
+            result->push_back(Symbol::repeater(doGenerate(n->leafAt(0), m), true));
+            result->push_back(Symbol::arrayStartSymbol());
+            return result;
+        }
+    case AVRO_MAP:
+        {
+            // Each map item is a string key followed by the production of
+            // leaf 1; the key symbol is appended last so it is consumed first.
+            ProductionPtr pp = doGenerate(n->leafAt(1), m);
+            ProductionPtr v(new Production(*pp));
+            v->push_back(Symbol::stringSymbol());
+            ProductionPtr result = make_shared<Production>();
+            result->push_back(Symbol::mapEndSymbol());
+            result->push_back(Symbol::repeater(v, false));
+            result->push_back(Symbol::mapStartSymbol());
+            return result;
+        }
+    case AVRO_UNION:
+        {
+            // One production per branch; consumed as: Union, Alternative.
+            vector<ProductionPtr> vv;
+            size_t c = n->leaves();
+            vv.reserve(c);
+            for (size_t i = 0; i < c; ++i) {
+                vv.push_back(doGenerate(n->leafAt(i), m));
+            }
+            ProductionPtr result = make_shared<Production>();
+            result->push_back(Symbol::alternative(vv));
+            result->push_back(Symbol::unionSymbol());
+            return result;
+        }
+    case AVRO_SYMBOLIC:
+        {
+            shared_ptr<NodeSymbolic> ns = static_pointer_cast<NodeSymbolic>(n);
+            NodePtr nn = ns->getNode();
+            map<NodePtr, ProductionPtr>::iterator it =
+                m.find(nn);
+            if (it != m.end() && it->second) {
+                // Target already generated: reuse its production.
+                return it->second;
+            } else {
+                // Target not (yet) available: emit a placeholder keyed by
+                // the target node, to be replaced by fixup().
+                m[nn] = ProductionPtr();
+                return make_shared<Production>(1, Symbol::placeholder(nn));
+            }
+        }
+    default:
+        throw Exception("Unknown node type");
+    }
+}
+
+/** Handler that ignores the symbol it is given and always returns 0. */
+struct DummyHandler {
+    size_t handle(const Symbol& s) {
+        return 0;
+    }
+};
+
+/**
+ * Decoder that checks every call against the grammar generated from a
+ * schema before forwarding it to the wrapped decoder `base`.
+ *
+ * P is the parser type; it is constructed with the grammar's root symbol,
+ * a null Decoder pointer and handler_.
+ */
+template <typename P>
+class ValidatingDecoder : public Decoder {
+    const shared_ptr<Decoder> base;
+    DummyHandler handler_;
+    P parser;
+
+    void init(InputStream& is);
+    void decodeNull();
+    bool decodeBool();
+    int32_t decodeInt();
+    int64_t decodeLong();
+    float decodeFloat();
+    double decodeDouble();
+    void decodeString(string& value);
+    void skipString();
+    void decodeBytes(vector<uint8_t>& value);
+    void skipBytes();
+    void decodeFixed(size_t n, vector<uint8_t>& value);
+    void skipFixed(size_t n);
+    size_t decodeEnum();
+    size_t arrayStart();
+    size_t arrayNext();
+    size_t skipArray();
+    size_t mapStart();
+    size_t mapNext();
+    size_t skipMap();
+    size_t decodeUnionIndex();
+    void drain() {
+        base->drain();
+    }
+
+public:
+
+    // NOTE(review): the parser receives NULL as its Decoder*. With
+    // SimpleParser that pointer is only dereferenced for sSkipStart
+    // symbols, which doGenerate() above never emits.
+    ValidatingDecoder(const ValidSchema& s, const shared_ptr<Decoder> b) :
+        base(b),
+        parser(ValidatingGrammarGenerator().generate(s), NULL, handler_) { }
+
+};
+
+template <typename P>
+void ValidatingDecoder<P>::init(InputStream& is)
+{
+    base->init(is);
+}
+
+// The scalar decode/skip methods below share one pattern: advance the
+// parser over the expected terminal (throws on a grammar mismatch), then
+// delegate to the wrapped decoder.
+
+template <typename P>
+void ValidatingDecoder<P>::decodeNull()
+{
+    parser.advance(Symbol::sNull);
+    base->decodeNull();
+}
+
+template <typename P>
+bool ValidatingDecoder<P>::decodeBool()
+{
+    parser.advance(Symbol::sBool);
+    return base->decodeBool();
+}
+
+template <typename P>
+int32_t ValidatingDecoder<P>::decodeInt()
+{
+    parser.advance(Symbol::sInt);
+    return base->decodeInt();
+}
+
+template <typename P>
+int64_t ValidatingDecoder<P>::decodeLong()
+{
+    parser.advance(Symbol::sLong);
+    return base->decodeLong();
+}
+
+template <typename P>
+float ValidatingDecoder<P>::decodeFloat()
+{
+    parser.advance(Symbol::sFloat);
+    return base->decodeFloat();
+}
+
+template <typename P>
+double ValidatingDecoder<P>::decodeDouble()
+{
+    parser.advance(Symbol::sDouble);
+    return base->decodeDouble();
+}
+
+template <typename P>
+void ValidatingDecoder<P>::decodeString(string& value)
+{
+    parser.advance(Symbol::sString);
+    base->decodeString(value);
+}
+
+template <typename P>
+void ValidatingDecoder<P>::skipString()
+{
+    parser.advance(Symbol::sString);
+    base->skipString();
+}
+
+template <typename P>
+void ValidatingDecoder<P>::decodeBytes(vector<uint8_t>& value)
+{
+    parser.advance(Symbol::sBytes);
+    base->decodeBytes(value);
+}
+
+template <typename P>
+void ValidatingDecoder<P>::skipBytes()
+{
+    parser.advance(Symbol::sBytes);
+    base->skipBytes();
+}
+
+// n must equal the size recorded in the grammar for this fixed.
+template <typename P>
+void ValidatingDecoder<P>::decodeFixed(size_t n, vector<uint8_t>& value)
+{
+    parser.advance(Symbol::sFixed);
+    parser.assertSize(n);
+    base->decodeFixed(n, value);
+}
+
+template <typename P>
+void ValidatingDecoder<P>::skipFixed(size_t n)
+{
+    parser.advance(Symbol::sFixed);
+    parser.assertSize(n);
+    base->skipFixed(n);
+}
+
+// The decoded ordinal must be smaller than the number of names recorded in
+// the grammar for this enum.
+template <typename P>
+size_t ValidatingDecoder<P>::decodeEnum()
+{
+    parser.advance(Symbol::sEnum);
+    size_t result = base->decodeEnum();
+    parser.assertLessThanSize(result);
+    return result;
+}
+
+// Registers the first block's item count with the parser; a count of 0
+// closes the array immediately.
+template <typename P>
+size_t ValidatingDecoder<P>::arrayStart()
+{
+    parser.advance(Symbol::sArrayStart);
+    size_t result = base->arrayStart();
+    parser.pushRepeatCount(result);
+    if (result == 0) {
+        parser.popRepeater();
+        parser.advance(Symbol::sArrayEnd);
+    }
+    return result;
+}
+
+// Registers the next block's item count; a count of 0 closes the array.
+template <typename P>
+size_t ValidatingDecoder<P>::arrayNext()
+{
+    size_t result = base->arrayNext();
+    parser.nextRepeatCount(result);
+    if (result == 0) {
+        parser.popRepeater();
+        parser.advance(Symbol::sArrayEnd);
+    }
+    return result;
+}
+
+// Skips the whole array. When the wrapped decoder reports n remaining
+// items, the parser skips them item by item. Always returns 0.
+template <typename P>
+size_t ValidatingDecoder<P>::skipArray()
+{
+    parser.advance(Symbol::sArrayStart);
+    size_t n = base->skipArray();
+    if (n == 0) {
+        // Discards the symbol on top of the stack (the array's repeater in
+        // the grammar built by doGenerate()).
+        parser.pop();
+    } else {
+        parser.pushRepeatCount(n);
+        parser.skip(*base);
+    }
+    parser.advance(Symbol::sArrayEnd);
+    return 0;
+}
+
+// Map counterpart of arrayStart().
+template <typename P>
+size_t ValidatingDecoder<P>::mapStart()
+{
+    parser.advance(Symbol::sMapStart);
+    size_t result = base->mapStart();
+    parser.pushRepeatCount(result);
+    if (result == 0) {
+        parser.popRepeater();
+        parser.advance(Symbol::sMapEnd);
+    }
+    return result;
+}
+
+// Map counterpart of arrayNext().
+template <typename P>
+size_t ValidatingDecoder<P>::mapNext()
+{
+    size_t result = base->mapNext();
+    parser.nextRepeatCount(result);
+    if (result == 0) {
+        parser.popRepeater();
+        parser.advance(Symbol::sMapEnd);
+    }
+    return result;
+}
+
+// Map counterpart of skipArray(). Always returns 0.
+template <typename P>
+size_t ValidatingDecoder<P>::skipMap()
+{
+    parser.advance(Symbol::sMapStart);
+    size_t n = base->skipMap();
+    if (n == 0) {
+        parser.pop();
+    } else {
+        parser.pushRepeatCount(n);
+        parser.skip(*base);
+    }
+    parser.advance(Symbol::sMapEnd);
+    return 0;
+}
+
+// Decodes the branch index and makes the parser continue with that branch.
+template <typename P>
+size_t ValidatingDecoder<P>::decodeUnionIndex()
+{
+    parser.advance(Symbol::sUnion);
+    size_t result = base->decodeUnionIndex();
+    parser.selectBranch(result);
+    return result;
+}
+
+/**
+ * Encoder that checks every call against the grammar generated from a
+ * schema before forwarding it to the wrapped encoder base_.
+ *
+ * Member order matters: handler_ is declared before parser_, which is
+ * constructed with a reference to it.
+ */
+template <typename P>
+class ValidatingEncoder : public Encoder {
+    DummyHandler handler_;
+    P parser_;
+    EncoderPtr base_;
+
+    void init(OutputStream& os);
+    void flush();
+    int64_t byteCount() const;
+    void encodeNull();
+    void encodeBool(bool b);
+    void encodeInt(int32_t i);
+    void encodeLong(int64_t l);
+    void encodeFloat(float f);
+    void encodeDouble(double d);
+    void encodeString(const std::string& s);
+    void encodeBytes(const uint8_t *bytes, size_t len);
+    void encodeFixed(const uint8_t *bytes, size_t len);
+    void encodeEnum(size_t e);
+    void arrayStart();
+    void arrayEnd();
+    void mapStart();
+    void mapEnd();
+    void setItemCount(size_t count);
+    void startItem();
+    void encodeUnionIndex(size_t e);
+public:
+    // As in ValidatingDecoder, the parser is given a null Decoder pointer.
+    ValidatingEncoder(const ValidSchema& schema, const EncoderPtr& base) :
+        parser_(ValidatingGrammarGenerator().generate(schema), NULL, handler_),
+        base_(base) { }
+};
+
+template<typename P>
+void ValidatingEncoder<P>::init(OutputStream& os)
+{
+    base_->init(os);
+}
+
+template<typename P>
+void ValidatingEncoder<P>::flush()
+{
+    base_->flush();
+}
+
+// The scalar encode methods below share one pattern: advance the parser
+// over the expected terminal (throws on a grammar mismatch), then delegate
+// to the wrapped encoder.
+
+template<typename P>
+void ValidatingEncoder<P>::encodeNull()
+{
+    parser_.advance(Symbol::sNull);
+    base_->encodeNull();
+}
+
+template<typename P>
+void ValidatingEncoder<P>::encodeBool(bool b)
+{
+    parser_.advance(Symbol::sBool);
+    base_->encodeBool(b);
+}
+
+template<typename P>
+void ValidatingEncoder<P>::encodeInt(int32_t i)
+{
+    parser_.advance(Symbol::sInt);
+    base_->encodeInt(i);
+}
+
+template<typename P>
+void ValidatingEncoder<P>::encodeLong(int64_t l)
+{
+    parser_.advance(Symbol::sLong);
+    base_->encodeLong(l);
+}
+
+template<typename P>
+void ValidatingEncoder<P>::encodeFloat(float f)
+{
+    parser_.advance(Symbol::sFloat);
+    base_->encodeFloat(f);
+}
+
+template<typename P>
+void ValidatingEncoder<P>::encodeDouble(double d)
+{
+    parser_.advance(Symbol::sDouble);
+    base_->encodeDouble(d);
+}
+
+template<typename P>
+void ValidatingEncoder<P>::encodeString(const std::string& s)
+{
+    parser_.advance(Symbol::sString);
+    base_->encodeString(s);
+}
+
+template<typename P>
+void ValidatingEncoder<P>::encodeBytes(const uint8_t *bytes, size_t len)
+{
+    parser_.advance(Symbol::sBytes);
+    base_->encodeBytes(bytes, len);
+}
+
+// len must equal the size recorded in the grammar for this fixed.
+template<typename P>
+void ValidatingEncoder<P>::encodeFixed(const uint8_t *bytes, size_t len)
+{
+    parser_.advance(Symbol::sFixed);
+    parser_.assertSize(len);
+    base_->encodeFixed(bytes, len);
+}
+
+// e must be smaller than the number of names recorded for this enum.
+template<typename P>
+void ValidatingEncoder<P>::encodeEnum(size_t e)
+{
+    parser_.advance(Symbol::sEnum);
+    parser_.assertLessThanSize(e);
+    base_->encodeEnum(e);
+}
+
+// Opens an array with an item count of 0; setItemCount() supplies the
+// count of each block afterwards.
+template<typename P>
+void ValidatingEncoder<P>::arrayStart()
+{
+    parser_.advance(Symbol::sArrayStart);
+    parser_.pushRepeatCount(0);
+    base_->arrayStart();
+}
+
+// Closes the array; popRepeater() throws if announced items are still
+// outstanding.
+template<typename P>
+void ValidatingEncoder<P>::arrayEnd()
+{
+    parser_.popRepeater();
+    parser_.advance(Symbol::sArrayEnd);
+    base_->arrayEnd();
+}
+
+// Map counterpart of arrayStart().
+template<typename P>
+void ValidatingEncoder<P>::mapStart()
+{
+    parser_.advance(Symbol::sMapStart);
+    parser_.pushRepeatCount(0);
+    base_->mapStart();
+}
+
+// Map counterpart of arrayEnd().
+template<typename P>
+void ValidatingEncoder<P>::mapEnd()
+{
+    parser_.popRepeater();
+    parser_.advance(Symbol::sMapEnd);
+    base_->mapEnd();
+}
+
+// Announces the number of items in the next block; nextRepeatCount()
+// throws unless the previous block has been fully written.
+template<typename P>
+void ValidatingEncoder<P>::setItemCount(size_t count)
+{
+    parser_.nextRepeatCount(count);
+    base_->setItemCount(count);
+}
+
+// Only legal while the parser is positioned on a repeater.
+template<typename P>
+void ValidatingEncoder<P>::startItem()
+{
+    if (parser_.top() != Symbol::sRepeater) {
+        throw Exception("startItem at not an item boundary");
+    }
+    base_->startItem();
+}
+
+// Validates the branch index against the grammar before writing it.
+template<typename P>
+void ValidatingEncoder<P>::encodeUnionIndex(size_t e)
+{
+    parser_.advance(Symbol::sUnion);
+    parser_.selectBranch(e);
+    base_->encodeUnionIndex(e);
+}
+
+template<typename P>
+int64_t ValidatingEncoder<P>::byteCount() const
+{
+    return base_->byteCount();
+}
+
+}   // namespace parsing
+
+/** Wraps base in a decoder that validates all calls against schema s. */
+DecoderPtr validatingDecoder(const ValidSchema& s,
+    const DecoderPtr& base)
+{
+    return make_shared<parsing::ValidatingDecoder<parsing::SimpleParser<parsing::DummyHandler> > >(s, base);
+}
+
+/** Wraps base in an encoder that validates all calls against schema. */
+EncoderPtr validatingEncoder(const ValidSchema& schema, const EncoderPtr& base)
+{
+    return make_shared<parsing::ValidatingEncoder<parsing::SimpleParser<parsing::DummyHandler> > >(schema, base);
+}
+
+}   // namespace avro
+
diff --git a/contrib/libs/apache/avro/impl/parsing/ValidatingCodec.hh b/contrib/libs/apache/avro/impl/parsing/ValidatingCodec.hh
new file mode 100644
index 0000000000..39ceda033e
--- /dev/null
+++ b/contrib/libs/apache/avro/impl/parsing/ValidatingCodec.hh
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.
See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef avro_parsing_ValidatingCodec_hh__
+#define avro_parsing_ValidatingCodec_hh__
+
+#include <map>
+#include <vector>
+
+#include "Symbol.hh"
+#include "ValidSchema.hh"
+#include "NodeImpl.hh"
+
+namespace avro {
+namespace parsing {
+
+/**
+ * Generates the parsing grammar (productions of Symbols) for a schema.
+ * The member functions are defined in ValidatingCodec.cc.
+ */
+class ValidatingGrammarGenerator {
+protected:
+    // NOTE(review): no definition of either doFixup overload is visible
+    // alongside this header; ValidatingCodec.cc uses the free function
+    // template fixup() from Symbol.hh instead. Confirm whether these
+    // declarations are still needed.
+    template<typename T>
+    static void doFixup(Production& p, const std::map<T, ProductionPtr> &m);
+
+    template<typename T>
+    static void doFixup(Symbol &s, const std::map<T, ProductionPtr> &m);
+    // Builds the production for node n; m maps already-visited nodes to
+    // their productions. Virtual, so derived generators can override it.
+    virtual ProductionPtr doGenerate(const NodePtr& n,
+        std::map<NodePtr, ProductionPtr> &m);
+
+    // Generates the production for the given node and resolves its
+    // placeholders.
+    ProductionPtr generate(const NodePtr& schema);
+public:
+    // Generates the grammar for a whole schema and returns its root symbol.
+    Symbol generate(const ValidSchema& schema);
+
+};
+
+}   // namespace parsing
+}   // namespace avro
+
+#endif |