"""AMQP Data Decoder
The rmqid.codec.decode module contains all of the methods required to decode
AMQP data types including field tables and arrays. There is a
rmqid.codec.decode3 module but it is only a Python 3 support overlay that is
transparent to the use of the library.
"""
import decimal as _decimal
import struct
import time
[docs]def bit(value, position):
"""Decode a bit value
:param str value: Value to decode
:return tuple: bytes used, bool value
:raises: ValueError
"""
bit_buffer = struct.unpack('B', value[0])[0]
try:
return 0, (bit_buffer & (1 << position)) != 0
except TypeError:
raise ValueError('Could not unpack data')
[docs]def boolean(value):
"""Decode a boolean value
:param str value: Value to decode
:return tuple: bytes used, bool
:raises: ValueError
"""
try:
return 1, bool(struct.unpack_from('B', value[0])[0])
except TypeError:
raise ValueError('Could not unpack data')
[docs]def decimal(value):
"""Decode a decimal value
:param str value: Value to decode
:return tuple: bytes used, decimal.Decimal value
:raises: ValueError
"""
try:
decimals = struct.unpack('B', value[0])[0]
raw = struct.unpack('>I', value[1:5])[0]
return 5, _decimal.Decimal(raw) * (_decimal.Decimal(10) ** -decimals)
except TypeError:
raise ValueError('Could not unpack data')
[docs]def floating_point(value):
"""Decode a floating point value
:param str value: Value to decode
:return tuple: bytes used, float
:raises: ValueError
"""
try:
return 4, struct.unpack_from('>f', value)[0]
except TypeError:
raise ValueError('Could not unpack data')
[docs]def long_int(value):
"""Decode a long integer value
:param str value: Value to decode
:return tuple: bytes used, int
:raises: ValueError
"""
try:
return 4, struct.unpack('>l', value[0:4])[0]
except TypeError:
raise ValueError('Could not unpack data')
[docs]def long_long_int(value):
"""Decode a long-long integer value
:param str value: Value to decode
:return tuple: bytes used, int
:raises: ValueError
"""
try:
return 8, struct.unpack('>q', value[0:8])[0]
except TypeError:
raise ValueError('Could not unpack data')
[docs]def long_str(value):
"""Decode a string value
:param str value: Value to decode
:return tuple: bytes used, unicode|str
:raises: ValueError
"""
try:
length = struct.unpack('>I', value[0:4])[0]
value = value[4:length + 4].decode('utf-8')
try:
value = str(value)
except UnicodeEncodeError:
pass
return length + 4, value
except TypeError:
raise ValueError('Could not unpack data')
[docs]def octet(value):
"""Decode an octet value
:param str value: Value to decode
:return tuple: bytes used, int
:raises: ValueError
"""
try:
return 1, struct.unpack('B', value[0])[0]
except TypeError:
raise ValueError('Could not unpack data')
[docs]def short_int(value):
"""Decode a short integer value
:param str value: Value to decode
:return tuple: bytes used, int
:raises: ValueError
"""
try:
return 2, struct.unpack_from('>H', value[0:2])[0]
except TypeError:
raise ValueError('Could not unpack data')
[docs]def short_str(value):
"""Decode a string value
:param value: Value to decode
:type value: str or bytes
:return tuple: bytes used, unicode|str
:raises: ValueError
"""
try:
length = struct.unpack('B', value[0])[0]
value = value[1:length + 1].decode('utf-8')
try:
value = str(value)
except UnicodeEncodeError:
pass
return length + 1, value
except TypeError:
raise ValueError('Could not unpack data')
[docs]def timestamp(value):
"""Decode a timestamp value
:param str value: Value to decode
:return tuple: bytes used, struct_time
:raises: ValueError
"""
try:
return 8, time.gmtime(struct.unpack('>Q', value[0:8])[0])
except TypeError:
raise ValueError('Could not unpack data')
[docs]def field_array(value):
"""Decode a field array value
:param str value: Value to decode
:return tuple: bytes used, list
:raises: ValueError
"""
try:
length = struct.unpack('>I', value[0:4])[0]
offset = 4
data = list()
field_array_end = offset + length
while offset < field_array_end:
consumed, result = _embedded_value(value[offset:])
offset += consumed
data.append(result)
return offset, data
except TypeError:
raise ValueError('Could not unpack data')
[docs]def field_table(value):
"""Decode a field array value
:param str value: Value to decode
:return tuple: bytes used, dict
:raises: ValueError
"""
try:
length = struct.unpack('>I', value[0:4])[0]
offset = 4
data = dict()
field_table_end = offset + length
while offset < field_table_end:
key_length = struct.unpack_from('B', value, offset)[0]
offset += 1
key = value[offset:offset + key_length]
offset += key_length
consumed, result = _embedded_value(value[offset:])
offset += consumed
data[key] = result
return field_table_end, data
except TypeError:
raise ValueError('Could not unpack data')
def _embedded_value(value):
"""Takes in a value looking at the first byte to determine which decoder to
use
:param str value: Value to decode
:return tuple: bytes consumed, mixed
"""
if not value:
return 0, None
# Determine the field type and encode it
if value[0] == 'A':
bytes_consumed, value = field_array(value[1:])
elif value[0] == 'D':
bytes_consumed, value = decimal(value[1:])
elif value[0] == 'f':
bytes_consumed, value = floating_point(value[1:])
elif value[0] == 'F':
bytes_consumed, value = field_table(value[1:])
elif value[0] == 'I':
bytes_consumed, value = long_int(value[1:])
elif value[0] == 'L':
bytes_consumed, value = long_long_int(value[1:])
elif value[0] == 't':
bytes_consumed, value = boolean(value[1:])
elif value[0] == 'T':
bytes_consumed, value = timestamp(value[1:])
elif value[0] == 's':
bytes_consumed, value = short_str(value[1:])
elif value[0] == 'S':
bytes_consumed, value = long_str(value[1:])
elif value[0] == 'U':
bytes_consumed, value = short_int(value[1:])
elif value[0] == 'V':
return 0, None
elif value[0] == '\x00':
return 0, None
else:
raise ValueError('Unknown type "%s"' % value[0])
return bytes_consumed + 1, value
[docs]def by_type(value, data_type, offset=0):
"""Decodes values using the specified type
:param str value: Value to decode
:param str data_type: type of data to decode
:return tuple: bytes consumed, mixed based on field type
"""
# Determine the field type and encode it
if data_type == 'array':
return field_array(value)
elif data_type == 'bit':
return bit(value, offset)
elif data_type == 'boolean':
return boolean(value)
elif data_type == 'decimal':
return decimal(value)
elif data_type == 'float':
return floating_point(value)
elif data_type == 'long':
return long_int(value)
elif data_type == 'longlong':
return long_long_int(value)
elif data_type == 'longstr':
return long_str(value)
elif data_type == 'octet':
return octet(value)
elif data_type == 'short':
return short_int(value)
elif data_type == 'shortstr':
return short_str(value)
elif data_type == 'table':
return field_table(value)
elif data_type == 'timestamp':
return timestamp(value)
elif data_type == 'void':
return None
raise ValueError('Unknown type "%s"' % value)
# Define a data type mapping to methods
METHODS = {'array': field_array,
'bit': bit,
'boolen': boolean,
'decimal': decimal,
'float': floating_point,
'long': long_int,
'longlong': long_long_int,
'longstr': long_str,
'octet': octet,
'short': short_int,
'shortstr': short_str,
'table': field_table,
'timestamp': timestamp}