Source code for pamqp.decode

# -*- encoding: utf-8 -*-
"""
AMQP Data Decoder
=================

Functions for decoding data of various types including field tables and arrays

"""
import decimal as _decimal
import struct
import time

from pamqp import PYTHON3


[docs]class Struct(object): """Simple object for getting to the struct objects""" byte = struct.Struct('B') double = struct.Struct('>d') float = struct.Struct('>f') integer = struct.Struct('>I') ulong = struct.Struct('>L') long = struct.Struct('>l') ushort = struct.Struct('>H') short = struct.Struct('>h') short_short = struct.Struct('>B') timestamp = struct.Struct('>Q')
[docs]def bit(value, position): """Decode a bit value :param bytes value: Value to decode :param int position: The bit position to retrieve :return tuple: bytes used, bool value :raises: ValueError """ bit_buffer = Struct.byte.unpack_from(value)[0] try: return 0, (bit_buffer & (1 << position)) != 0 except TypeError: raise ValueError('Could not unpack data')
[docs]def boolean(value): """Decode a boolean value :param bytes value: Value to decode :return tuple: bytes used, bool :raises: ValueError """ try: return 1, bool(Struct.byte.unpack_from(value[0:1])[0]) except TypeError: raise ValueError('Could not unpack data')
[docs]def byte_array(value): """Decode a byte_array value :param bytes value: Value to decode :return tuple: bytes used, bool :raises: ValueError """ try: length = Struct.integer.unpack(value[0:4])[0] return length + 4, bytearray(value[4:length + 4]) except TypeError: raise ValueError('Could not unpack data')
[docs]def decimal(value): """Decode a decimal value :param bytes value: Value to decode :return tuple: bytes used, decimal.Decimal value :raises: ValueError """ try: decimals = Struct.byte.unpack(value[0:1])[0] raw = Struct.integer.unpack(value[1:5])[0] return 5, _decimal.Decimal(raw) * (_decimal.Decimal(10)**-decimals) except TypeError: raise ValueError('Could not unpack data')
[docs]def double(value): """Decode a double value :param bytes value: Value to decode :return tuple: bytes used, float :raises: ValueError """ try: return 8, Struct.double.unpack_from(value)[0] except TypeError: raise ValueError('Could not unpack data')
[docs]def floating_point(value): """Decode a floating point value :param bytes value: Value to decode :return tuple: bytes used, float :raises: ValueError """ try: return 4, Struct.float.unpack_from(value)[0] except TypeError: raise ValueError('Could not unpack data')
[docs]def long_int(value): """Decode a long integer value :param bytes value: Value to decode :return tuple: bytes used, int :raises: ValueError """ try: return 4, Struct.long.unpack(value[0:4])[0] except TypeError: raise ValueError('Could not unpack data')
[docs]def long_uint(value): """Decode an unsigned long integer value :param bytes value: Value to decode :return tuple: bytes used, int :raises: ValueError """ try: return 4, Struct.ulong.unpack(value[0:4])[0] except TypeError: raise ValueError('Could not unpack data')
[docs]def long_long_int(value): """Decode a long-long integer value :param bytes value: Value to decode :return tuple: bytes used, int :raises: ValueError """ try: return 8, struct.unpack('>q', value[0:8])[0] except TypeError: raise ValueError('Could not unpack data')
[docs]def long_str(value): """Decode a string value :param bytes value: Value to decode :return tuple: bytes used, bytes|str :raises: ValueError """ try: length = Struct.integer.unpack(value[0:4])[0] return length + 4, value[4:length + 4] except TypeError: raise ValueError('Could not unpack data')
[docs]def octet(value): """Decode an octet value :param bytes value: Value to decode :return tuple: bytes used, int :raises: ValueError """ try: return 1, Struct.byte.unpack(value[0:1])[0] except TypeError: raise ValueError('Could not unpack data')
[docs]def short_int(value): """Decode a short integer value :param bytes value: Value to decode :return tuple: bytes used, int :raises: ValueError """ try: return 2, Struct.short.unpack_from(value[0:2])[0] except TypeError: raise ValueError('Could not unpack data')
[docs]def short_uint(value): """Decode an unsigned short integer value :param bytes value: Value to decode :return tuple: bytes used, int :raises: ValueError """ try: return 2, Struct.ushort.unpack_from(value[0:2])[0] except TypeError: raise ValueError('Could not unpack data')
[docs]def short_short_int(value): """Decode a short, short integer value :param bytes value: Value to decode :return tuple: bytes used, int :raises: ValueError """ try: return 1, Struct.short_short.unpack_from(value[0:1])[0] except TypeError: raise ValueError('Could not unpack data')
[docs]def short_str(value): """Decode a string value :param bytes value: Value to decode :return tuple: bytes used, unicode|str :raises: ValueError """ try: length = Struct.byte.unpack(value[0:1])[0] return length + 1, _to_str(value[1:length + 1]) except TypeError: raise ValueError('Could not unpack data')
[docs]def timestamp(value): """Decode a timestamp value :param bytes value: Value to decode :return tuple: bytes used, struct_time :raises: ValueError """ try: value = Struct.timestamp.unpack(value[0:8]) return 8, time.gmtime(value[0]) except TypeError: raise ValueError('Could not unpack data')
[docs]def field_array(value): """Decode a field array value :param bytes value: Value to decode :return tuple: bytes used, list :raises: ValueError """ try: length = Struct.integer.unpack(value[0:4])[0] offset = 4 data = [] field_array_end = offset + length while offset < field_array_end: consumed, result = _embedded_value(value[offset:]) offset += consumed data.append(result) return offset, data except TypeError: raise ValueError('Could not unpack data')
[docs]def field_table(value): """Decode a field array value :param bytes value: Value to decode :return tuple: bytes used, dict :raises: ValueError """ try: length = Struct.integer.unpack(value[0:4])[0] offset = 4 data = {} field_table_end = offset + length while offset < field_table_end: key_length = Struct.byte.unpack_from(value, offset)[0] offset += 1 key = _to_str(value[offset:offset + key_length]) offset += key_length consumed, result = _embedded_value(value[offset:]) offset += consumed data[key] = result return field_table_end, data except TypeError: raise ValueError('Could not unpack data')
def _embedded_value(value): """Takes in a value looking at the first byte to determine which decoder to use :param bytes value: Value to decode :return tuple: bytes consumed, mixed """ if not value: return 0, None # Determine the field type and encode it if value[0:1] == b't': bytes_consumed, value = boolean(value[1:]) elif value[0:1] == b'b': bytes_consumed, value = short_short_int(value[1:]) elif value[0:1] == b's': bytes_consumed, value = short_int(value[1:]) elif value[0:1] == b'u': bytes_consumed, value = short_uint(value[1:]) elif value[0:1] == b'I': bytes_consumed, value = long_int(value[1:]) elif value[0:1] == b'i': bytes_consumed, value = long_uint(value[1:]) elif value[0:1] == b'l': bytes_consumed, value = long_long_int(value[1:]) elif value[0:1] == b'f': bytes_consumed, value = floating_point(value[1:]) elif value[0:1] == b'd': bytes_consumed, value = double(value[1:]) elif value[0:1] == b'D': bytes_consumed, value = decimal(value[1:]) elif value[0:1] == b'S': bytes_consumed, value = long_str(value[1:]) elif value[0:1] == b'A': bytes_consumed, value = field_array(value[1:]) elif value[0:1] == b'T': bytes_consumed, value = timestamp(value[1:]) elif value[0:1] == b'F': bytes_consumed, value = field_table(value[1:]) elif value[0:1] == b'V': bytes_consumed, value = 0, None elif value[0:1] == b'x': bytes_consumed, value = byte_array(value[1:]) elif value[0:1] == b'\x00': return 0, None else: raise ValueError('Unknown type: {!r}'.format(value[:1])) return bytes_consumed + 1, value
[docs]def by_type(value, data_type, offset=0): """Decodes values using the specified type :param bytes value: Value to decode :param str data_type: type of data to decode :return tuple: bytes consumed, mixed based on field type """ # Determine the field type and encode it data_type = str(data_type) if data_type == 'array': return field_array(value) elif data_type == 'bit': return bit(value, offset) elif data_type == 'boolean': return boolean(value) elif data_type == 'byte_array': return byte_array(value) elif data_type == 'decimal': return decimal(value) elif data_type == 'double': return double(value) elif data_type == 'float': return floating_point(value) elif data_type == 'long': return long_uint(value) elif data_type == 'longlong': return long_long_int(value) elif data_type == 'longstr': return long_str(value) elif data_type == 'octet': return octet(value) elif data_type == 'short': return short_uint(value) elif data_type == 'shortstr': return short_str(value) elif data_type == 'table': return field_table(value) elif data_type == 'timestamp': return timestamp(value) elif data_type == 'void': return None raise ValueError('Unknown type: {}'.format(data_type))
def _to_str(value): """Ensure the field table keys are strings (unicode or otherwise) :param bytes value: The value to try and decode to unicode :rtype: unicode or str """ if PYTHON3: return value.decode('utf-8') try: return str(value.decode('utf-8')) except UnicodeEncodeError: return value.decode('utf-8') # Define a data type mapping to methods METHODS = { 'array': field_array, 'bit': bit, 'boolean': boolean, 'byte_array': byte_array, 'decimal': decimal, 'double': double, 'float': floating_point, 'long': long_uint, 'longlong': long_long_int, 'longstr': long_str, 'octet': octet, 'short': short_uint, 'shortstr': short_str, 'table': field_table, 'timestamp': timestamp }