Source code for pamqp.decode

# -*- encoding: utf-8 -*-
"""
Functions for decoding data of various types including field tables and arrays

"""
import datetime
import decimal as _decimal
import typing

from pamqp import common


[docs]def by_type(value: bytes, data_type: str, offset: int = 0) -> typing.Tuple[int, common.FieldValue]: """Decodes values using the specified type :param value: The binary value to decode :param data_type: The data type name of the value :param offset: The starting position of the data in the byte stream :rtype: :class:`tuple` (:class:`int`, :const:`pamqp.common.FieldValue`) :raises ValueError: when the data type is unknown """ if data_type == 'bit': return bit(value, offset) decoder = METHODS.get(data_type) if decoder is None: raise ValueError('Unknown type: {}'.format(data_type)) return decoder(value)
[docs]def bit(value: bytes, position: int) -> typing.Tuple[int, bool]: """Decode a bit value, returning bytes consumed and the value. :param value: The binary value to decode :param position: The position in the byte of the bit value :rtype: :class:`tuple` (:class:`int`, :class:`bool`) :raises ValueError: when the binary data can not be unpacked """ bit_buffer = common.Struct.byte.unpack_from(value)[0] try: return 0, (bit_buffer & (1 << position)) != 0 except TypeError: raise ValueError('Could not unpack bit value')
[docs]def boolean(value: bytes) -> typing.Tuple[int, bool]: """Decode a boolean value, returning bytes consumed and the value. :param value: The binary value to decode :rtype: :class:`tuple` (:class:`int`, :class:`bool`) :raises ValueError: when the binary data can not be unpacked """ try: return 1, bool(common.Struct.byte.unpack_from(value[0:1])[0]) except TypeError: raise ValueError('Could not unpack boolean value')
[docs]def byte_array(value: bytes) -> typing.Tuple[int, bytearray]: """Decode a byte_array value, returning bytes consumed and the value. :param value: The binary value to decode :rtype: :class:`tuple` (:class:`int`, :class:`bytearray`) :raises ValueError: when the binary data can not be unpacked """ try: length = common.Struct.integer.unpack(value[0:4])[0] return length + 4, bytearray(value[4:length + 4]) except TypeError: raise ValueError('Could not unpack byte array value')
[docs]def decimal(value: bytes) -> typing.Tuple[int, _decimal.Decimal]: """Decode a decimal value, returning bytes consumed and the value. :param value: The binary value to decode :rtype: :class:`tuple` (:class:`int`, :class:`decimal.Decimal`) :raises ValueError: when the binary data can not be unpacked """ try: decimals = common.Struct.byte.unpack(value[0:1])[0] raw = common.Struct.integer.unpack(value[1:5])[0] return 5, _decimal.Decimal(raw) * (_decimal.Decimal(10)**-decimals) except TypeError: raise ValueError('Could not unpack decimal value')
[docs]def double(value: bytes) -> typing.Tuple[int, float]: """Decode a double value, returning bytes consumed and the value. :param value: The binary value to decode :rtype: :class:`tuple` (:class:`int`, :class:`float`) :raises ValueError: when the binary data can not be unpacked """ try: return 8, common.Struct.double.unpack_from(value)[0] except TypeError: raise ValueError('Could not unpack double value')
[docs]def floating_point(value: bytes) -> typing.Tuple[int, float]: """Decode a floating point value, returning bytes consumed and the value. :param value: The binary value to decode :rtype: :class:`tuple` (:class:`int`, :class:`float`) :raises ValueError: when the binary data can not be unpacked """ try: return 4, common.Struct.float.unpack_from(value)[0] except TypeError: raise ValueError('Could not unpack floating point value')
[docs]def long_int(value: bytes) -> typing.Tuple[int, int]: """Decode a long integer value, returning bytes consumed and the value. :param value: The binary value to decode :rtype: :class:`tuple` (:class:`int`, :class:`int`) :raises ValueError: when the binary data can not be unpacked """ try: return 4, common.Struct.long.unpack(value[0:4])[0] except TypeError: raise ValueError('Could not unpack long integer value')
[docs]def long_uint(value: bytes) -> typing.Tuple[int, int]: """Decode an unsigned long integer value, returning bytes consumed and the value. :param value: The binary value to decode :rtype: :class:`tuple` (:class:`int`, :class:`int`) :raises ValueError: when the binary data can not be unpacked """ try: return 4, common.Struct.ulong.unpack(value[0:4])[0] except TypeError: raise ValueError('Could not unpack unsigned long integer value')
[docs]def long_long_int(value: bytes) -> typing.Tuple[int, int]: """Decode a long-long integer value, returning bytes consumed and the value. :param value: The binary value to decode :rtype: :class:`tuple` (:class:`int`, :class:`int`) :raises ValueError: when the binary data can not be unpacked """ try: return 8, common.Struct.long_long_int.unpack(value[0:8])[0] except TypeError: raise ValueError('Could not unpack long-long integer value')
[docs]def long_str(value: bytes) -> typing.Tuple[int, typing.Union[str, bytes]]: """Decode a string value, returning bytes consumed and the value. :param value: The binary value to decode :rtype: :class:`tuple` (:class:`int`, :class:`str`) :raises ValueError: when the binary data can not be unpacked """ try: length = common.Struct.integer.unpack(value[0:4])[0] return length + 4, value[4:length + 4].decode('utf-8') except TypeError: raise ValueError('Could not unpack long string value') except UnicodeDecodeError: return length + 4, value[4:length + 4]
[docs]def octet(value: bytes) -> typing.Tuple[int, int]: """Decode an octet value, returning bytes consumed and the value. :param value: The binary value to decode :rtype: :class:`tuple` (:class:`int`, :class:`int`) :raises ValueError: when the binary data can not be unpacked """ try: return 1, common.Struct.byte.unpack(value[0:1])[0] except TypeError: raise ValueError('Could not unpack octet value')
[docs]def short_int(value: bytes) -> typing.Tuple[int, int]: """Decode a short integer value, returning bytes consumed and the value. :param value: The binary value to decode :rtype: :class:`tuple` (:class:`int`, :class:`int`) :raises ValueError: when the binary data can not be unpacked """ try: return 2, common.Struct.short.unpack_from(value[0:2])[0] except TypeError: raise ValueError('Could not unpack short integer value')
[docs]def short_uint(value: bytes) -> typing.Tuple[int, int]: """Decode an unsigned short integer value, returning bytes consumed and the value. :param value: The binary value to decode :rtype: :class:`tuple` (:class:`int`, :class:`int`) :raises ValueError: when the binary data can not be unpacked """ try: return 2, common.Struct.ushort.unpack_from(value[0:2])[0] except TypeError: raise ValueError('Could not unpack unsigned short integer value')
[docs]def short_short_int(value: bytes) -> typing.Tuple[int, int]: """Decode a short-short integer value, returning bytes consumed and the value. :param value: The binary value to decode :rtype: :class:`tuple` (:class:`int`, :class:`int`) :raises ValueError: when the binary data can not be unpacked """ try: return 1, common.Struct.short_short_int.unpack_from(value[0:1])[0] except TypeError: raise ValueError('Could not unpack short-short integer value')
[docs]def short_short_uint(value: bytes) -> typing.Tuple[int, int]: """Decode a unsigned short-short integer value, returning bytes consumed and the value. :param value: The binary value to decode :rtype: :class:`tuple` (:class:`int`, :class:`int`) :raises ValueError: when the binary data can not be unpacked """ try: return 1, common.Struct.short_short_uint.unpack_from(value[0:1])[0] except TypeError: raise ValueError('Could not unpack unsigned short-short integer value')
[docs]def short_str(value: bytes) -> typing.Tuple[int, str]: """Decode a string value, returning bytes consumed and the value. :param value: The binary value to decode :rtype: :class:`tuple` (:class:`int`, :class:`str`) :raises ValueError: when the binary data can not be unpacked """ try: length = common.Struct.byte.unpack(value[0:1])[0] return length + 1, value[1:length + 1].decode('utf-8') except TypeError: raise ValueError('Could not unpack short string value')
[docs]def timestamp(value: bytes) -> typing.Tuple[int, datetime.datetime]: """Decode a timestamp value, returning bytes consumed and the value. :param value: The binary value to decode :rtype: :class:`tuple` (:class:`int`, :class:`datetime.datetime`) :raises ValueError: when the binary data can not be unpacked """ try: temp = common.Struct.timestamp.unpack(value[0:8]) return 8, datetime.datetime.utcfromtimestamp(temp[0]) except TypeError: raise ValueError('Could not unpack timestamp value')
[docs]def embedded_value(value: bytes) -> typing.Tuple[int, common.FieldValue]: """Dynamically decode a value based upon the starting byte :param value: The binary value to decode :rtype: :class:`tuple` (:class:`int`, :const:`pamqp.common.FieldValue`) :raises ValueError: when the binary data can not be unpacked """ if not value: return 0, None try: bytes_consumed, temp = TABLE_MAPPING[value[0:1]](value[1:]) except KeyError: raise ValueError('Unknown type: {!r}'.format(value[:1])) return bytes_consumed + 1, temp
[docs]def field_array(value: bytes) -> typing.Tuple[int, common.FieldArray]: """Decode a field array value, returning bytes consumed and the value. :param value: The binary value to decode :rtype: :class:`tuple` (:class:`int`, :const:`pamqp.common.FieldArray`) :raises ValueError: when the binary data can not be unpacked """ try: length = common.Struct.integer.unpack(value[0:4])[0] offset = 4 data = [] field_array_end = offset + length while offset < field_array_end: consumed, result = embedded_value(value[offset:]) offset += consumed data.append(result) return offset, data except TypeError: raise ValueError('Could not unpack data')
[docs]def field_table(value: bytes) -> typing.Tuple[int, common.FieldTable]: """Decode a field array value, returning bytes consumed and the value. :param value: The binary value to decode :rtype: :class:`tuple` (:class:`int`, :const:`pamqp.common.FieldTable`) :raises ValueError: when the binary data can not be unpacked """ try: length = common.Struct.integer.unpack(value[0:4])[0] offset = 4 data = {} field_table_end = offset + length while offset < field_table_end: key_length = common.Struct.byte.unpack_from(value, offset)[0] offset += 1 key = value[offset:offset + key_length].decode('utf-8') offset += key_length consumed, result = embedded_value(value[offset:]) offset += consumed data[key] = result return field_table_end, data except TypeError: raise ValueError('Could not unpack data')
[docs]def void(_: bytes) -> typing.Tuple[int, None]: """Return a void, no data to decode :param _: The empty bytes object to ignore :rtype: :class:`tuple` (:class:`int`, :const:`None`) """ return 0, None
METHODS = { 'array': field_array, 'bit': bit, 'boolean': boolean, 'byte_array': byte_array, 'decimal': decimal, 'double': double, 'float': floating_point, 'long': long_uint, 'longlong': long_long_int, 'longstr': long_str, 'octet': octet, 'short': short_uint, 'shortstr': short_str, 'table': field_table, 'timestamp': timestamp, 'void': void, } # Define a data type mapping to methods for by_type() # See https://www.rabbitmq.com/amqp-0-9-1-errata.html TABLE_MAPPING = { b't': boolean, b'b': short_short_int, b'B': short_short_uint, b's': short_int, b'u': short_uint, b'I': long_int, b'i': long_uint, b'l': long_long_int, b'L': long_long_int, b'f': floating_point, b'd': double, b'D': decimal, b'S': long_str, b'A': field_array, b'T': timestamp, b'F': field_table, b'V': void, b'\x00': void, # While not documented, have seen this in the wild b'x': byte_array, } # Define a mapping for use in `field_array()` and `field_table()`