"""AMQP Data Encoder
This module contains all of the methods required to encode AMQP data types.
There is a rmqid.codec.encode3 module but it is only a Python 3 support overlay
that is transparent to the use of the library.
"""
import calendar
import datetime
import decimal as _decimal
import numbers
import struct
import time
def bit(value, byte, position):
    """Encode a bit value into a byte.

    Shifts ``value`` left by ``position`` bits and ORs the result into
    ``byte``. Bits that are already set in ``byte`` are left untouched.

    :param int value: Value to encode
    :param int byte: The byte to apply the value to
    :param int position: The position in the byte to set the bit on
    :rtype: int

    """
    return byte | (value << position)
def boolean(value):
    """Encode a boolean value as a single octet (``1`` or ``0``).

    :param bool value: Value to encode
    :rtype: bytes
    :raises ValueError: if ``value`` is not a ``bool``

    """
    if not isinstance(value, bool):
        raise ValueError("bool type required")
    return struct.pack('>B', int(value))
def decimal(value):
    """Encode a decimal.Decimal value.

    The result is one unsigned octet holding the number of decimal places
    followed by a signed 32-bit big-endian integer holding the unscaled
    value.

    :param decimal.Decimal value: Value to encode
    :rtype: bytes
    :raises ValueError: if ``value`` is not a finite ``decimal.Decimal``

    """
    if not isinstance(value, _decimal.Decimal):
        raise ValueError("decimal.Decimal type required")
    # NaN and Infinity have no integer representation to put on the wire
    if not value.is_finite():
        raise ValueError("finite decimal.Decimal value required")
    value = value.normalize()
    # Use the public tuple API; the private ``_exp`` attribute is not
    # available on every Decimal implementation.
    exponent = value.as_tuple().exponent
    if exponent < 0:
        decimals = -exponent
        raw = int(value * (_decimal.Decimal(10) ** decimals))
        return struct.pack('>Bi', decimals, raw)
    # per spec, the "decimals" octet is unsigned (!)
    return struct.pack('>Bi', 0, int(value))
def floating_point(value):
    """Encode a floating point value as a 32-bit big-endian float.

    :param float value: Value to encode
    :rtype: bytes
    :raises ValueError: if ``value`` is not a ``float``

    """
    if not isinstance(value, float):
        raise ValueError("float type required")
    return struct.pack('>f', value)
def long_int(value):
    """Encode a long integer as a signed 32-bit big-endian value.

    :param long or int value: Value to encode
    :rtype: bytes
    :raises ValueError: if ``value`` is not an integer or does not fit in
        a signed 32-bit integer

    """
    # numbers.Integral covers ``int`` and ``long`` on Python 2 and ``int``
    # on Python 3, so no interpreter-specific type name is needed here.
    if not isinstance(value, numbers.Integral):
        raise ValueError("int or long type required")
    if not -2147483648 <= value <= 2147483647:
        raise ValueError("Long integer range: -2147483648 to 2147483647")
    return struct.pack('>l', value)
def long_long_int(value):
    """Encode a long-long int as a signed 64-bit big-endian value.

    :param long or int value: Value to encode
    :rtype: bytes
    :raises ValueError: if ``value`` is not an integer or does not fit in
        a signed 64-bit integer

    """
    # numbers.Integral covers ``int`` and ``long`` on Python 2 and ``int``
    # on Python 3, so no interpreter-specific type name is needed here.
    if not isinstance(value, numbers.Integral):
        raise ValueError("int or long type required")
    if not -9223372036854775808 <= value <= 9223372036854775807:
        # Adjacent literals instead of a backslash continuation inside the
        # string, so source indentation never leaks into the message.
        raise ValueError("long-long integer range: "
                         "-9223372036854775808 to 9223372036854775807")
    return struct.pack('>q', value)
def long_string(value):
    """Encode a string with a 32-bit big-endian length prefix.

    Text (unicode) values are encoded as UTF-8 first; byte strings are
    written unchanged. The length prefix counts encoded bytes.

    :param str value: Value to encode
    :rtype: bytes
    :raises ValueError: if ``value`` is neither a byte string nor text

    """
    # ``type(u'')`` is ``unicode`` on Python 2 and ``str`` on Python 3;
    # ``bytes`` is ``str`` on Python 2. Together they match ``basestring``.
    if isinstance(value, type(u'')):
        value = value.encode('utf-8')
    elif not isinstance(value, bytes):
        raise ValueError("str or unicode type required")
    return struct.pack('>I', len(value)) + value
def octet(value):
    """Encode an octet value as a single unsigned byte.

    No explicit range check is made here; ``struct.pack`` itself rejects
    values outside 0-255 with ``struct.error``.

    :param int value: Value to encode
    :rtype: bytes
    :raises ValueError: if ``value`` is not an ``int``

    """
    if not isinstance(value, int):
        raise ValueError("int type required")
    return struct.pack('B', value)
def short_int(value):
    """Encode a short integer as a signed 16-bit big-endian value.

    :param int value: Value to encode
    :rtype: bytes
    :raises ValueError: if ``value`` is not an ``int`` or does not fit in
        a signed 16-bit integer

    """
    if not isinstance(value, int):
        raise ValueError("int type required")
    if not -32768 <= value <= 32767:
        raise ValueError("Short range: -32768 to 32767")
    # Signed format: the accepted range includes negative numbers, which the
    # unsigned 'H' format cannot pack. Non-negative output is unchanged.
    return struct.pack('>h', value)
def short_string(value):
    """Encode a string with a single-octet length prefix.

    Text (unicode) values are encoded as UTF-8 first; byte strings are
    written unchanged. The length prefix counts encoded bytes, so
    ``struct.pack`` raises ``struct.error`` if the encoded value is longer
    than 255 bytes.

    :param str value: Value to encode
    :rtype: bytes
    :raises ValueError: if ``value`` is neither a byte string nor text

    """
    # ``type(u'')`` is ``unicode`` on Python 2 and ``str`` on Python 3;
    # ``bytes`` is ``str`` on Python 2. Together they match ``basestring``.
    if isinstance(value, type(u'')):
        value = value.encode('utf-8')
    elif not isinstance(value, bytes):
        # The arguments must be a tuple: ``%`` binds tighter than the comma,
        # so without the parentheses the format itself raises TypeError.
        raise ValueError("str or unicode type required, received %s:%r" %
                         (type(value), value))
    return struct.pack('B', len(value)) + value
def timestamp(value):
    """Encode a datetime.datetime object or time.struct_time.

    The value is written as an unsigned 64-bit big-endian count of seconds
    since the Unix epoch. Naive datetimes and struct_time values are
    interpreted as UTC; timezone-aware datetimes are converted to UTC first.

    :param datetime.datetime or time.struct_time value: Value to encode
    :rtype: bytes
    :raises ValueError: if ``value`` is neither a ``datetime.datetime`` nor
        a ``time.struct_time``

    """
    if isinstance(value, datetime.datetime):
        # utctimetuple() applies the UTC offset of aware datetimes and is
        # equivalent to timetuple() for naive ones, which is what
        # calendar.timegm() expects.
        value = value.utctimetuple()
    if isinstance(value, time.struct_time):
        return struct.pack('>Q', calendar.timegm(value))
    raise ValueError("datetime.datetime or time.struct_time type required")
def field_array(value):
    """Encode a field array from a list.

    Each item is encoded with :func:`encode_table_value` and the
    concatenated result is prefixed with its byte length as an unsigned
    32-bit big-endian integer.

    :param list value: Value to encode
    :rtype: bytes
    :raises ValueError: if ``value`` is not a ``list``; errors raised while
        encoding an item propagate unchanged

    """
    if not isinstance(value, list):
        raise ValueError("list type required")
    output = b''.join([encode_table_value(item) for item in value])
    return struct.pack('>I', len(output)) + output
def field_table(value):
    """Encode a field table from a dictionary.

    Keys are written in sorted order as short strings, each followed by its
    value encoded with :func:`encode_table_value`. The concatenated result
    is prefixed with its byte length as an unsigned 32-bit big-endian
    integer.

    :param dict value: Value to encode
    :rtype: bytes
    :raises ValueError: if ``value`` is truthy but not a ``dict``, or if one
        of the values can not be encoded

    """
    # If there is no value, return a standard 4 null bytes. This check runs
    # before the type check, so any falsy value yields an empty table.
    if not value:
        return struct.pack('>I', 0)

    if not isinstance(value, dict):
        raise ValueError("dict type required, got %s" % type(value))

    # Iterate through all of the keys and encode the data into a table
    data = []
    for key in sorted(value):
        # Field name: the short-string encoder measures the length after
        # UTF-8 encoding, so the prefix always matches the bytes written.
        data.append(short_string(key))
        try:
            data.append(encode_table_value(value[key]))
        except ValueError as err:
            raise ValueError("%s error: %s" % (key, err))

    # Join all of the data together as a byte string
    output = b''.join(data)
    return struct.pack('>I', len(output)) + output
def table_integer(value):
    """Determines the best type of numeric type to encode value as, preferring
    the smallest data size first.

    The result is a one-byte type indicator (``U`` short, ``I`` long or
    ``L`` long-long) followed by the encoded integer.

    :param int or long value: Value to encode
    :rtype: bytes
    :raises ValueError: if ``value`` does not fit in a signed 64-bit integer

    """
    # Send the appropriately sized data value. Bounds are inclusive so the
    # extreme value of each width is accepted by that width's encoder.
    if -32768 <= value <= 32767:
        return b'U' + short_int(int(value))
    if -2147483648 <= value <= 2147483647:
        return b'I' + long_int(int(value))
    if -9223372036854775808 <= value <= 9223372036854775807:
        return b'L' + long_long_int(int(value))
    raise ValueError("Numeric value exceeds long-long-int max: %r" % value)
def encode_table_value(value):
    """Takes a value of any type and tries to encode it with the proper encoder

    The result is a one-byte type indicator followed by the output of the
    matching encoder; ``None`` is encoded as the bare indicator ``V``.

    :param any value: Value to encode
    :rtype: bytes
    :raises ValueError: if there is no encoder for the type of ``value``

    """
    # Determine the field type and encode it. ``bool`` must be tested before
    # the integer check because ``bool`` is a subclass of ``int``.
    if isinstance(value, bool):
        result = b't' + boolean(value)
    elif isinstance(value, numbers.Integral):
        result = table_integer(value)
    elif isinstance(value, _decimal.Decimal):
        result = b'D' + decimal(value)
    elif isinstance(value, float):
        result = b'f' + floating_point(value)
    elif isinstance(value, (bytes, type(u''))):
        result = b'S' + long_string(value)
    elif isinstance(value, (datetime.datetime, time.struct_time)):
        result = b'T' + timestamp(value)
    elif isinstance(value, dict):
        result = b'F' + field_table(value)
    elif isinstance(value, list):
        result = b'A' + field_array(value)
    elif value is None:
        result = b'V'
    else:
        raise ValueError("Unknown type: %s (%r)" % (type(value), value))

    # Return the encoded value
    return result
def by_type(value, data_type):
    """Takes a value of any type and tries to encode it with the specified
    encoder.

    Supported ``data_type`` names are ``field_array``, ``long``,
    ``longlong``, ``longstr``, ``octet``, ``short``, ``shortstr``,
    ``table``, ``timestamp`` and ``void``.

    :param any value: Value to encode
    :param str data_type: type of data to encode
    :rtype: bytes or None
    :raises ValueError: if ``data_type`` is not a supported name, or if the
        selected encoder rejects ``value``

    """
    # Determine the field type and encode it
    if data_type == 'field_array':
        return field_array(value)
    if data_type == 'long':
        return long_int(value)
    if data_type == 'longlong':
        return long_long_int(value)
    if data_type == 'longstr':
        return long_string(value)
    if data_type == 'octet':
        return octet(value)
    if data_type == 'short':
        return short_int(value)
    if data_type == 'shortstr':
        return short_string(value)
    if data_type == 'table':
        return field_table(value)
    if data_type == 'timestamp':
        return timestamp(value)
    if data_type == 'void':
        # Void carries no payload
        return None
    # Report the unrecognised type name, not the value being encoded
    raise ValueError("Unknown type: %s" % data_type)