Source code for pamqp.encode

# -*- encoding: utf-8 -*-
"""
Functions for encoding data of various types including field tables and arrays

"""
import calendar
import datetime
import decimal as _decimal
import logging
import struct
import time
import typing

from pamqp import common

# Module-level logger, named after this module's import path.
LOGGER = logging.getLogger(__name__)

# Read by table_integer() on every call to choose between the full and the
# deprecated integer encoders; changed via support_deprecated_rabbitmq().
DEPRECATED_RABBITMQ_SUPPORT = False
"""Toggle to support older versions of RabbitMQ."""


def support_deprecated_rabbitmq(enabled: bool = True) -> None:
    """Switch field-table integer encoding to the deprecated RabbitMQ mode.

    The setting is stored in the module-level ``DEPRECATED_RABBITMQ_SUPPORT``
    flag. While it is set, :func:`table_integer` delegates to the deprecated
    encoder, which never emits the unsigned short (``u``) or unsigned long
    (``i``) field types, so the full AMQP integer range of types is not used.

    :param enabled: Specify if deprecated RabbitMQ versions are supported

    """
    # Rebind the module global rather than creating a function local.
    global DEPRECATED_RABBITMQ_SUPPORT
    DEPRECATED_RABBITMQ_SUPPORT = enabled
def by_type(value: common.FieldValue, data_type: str) -> bytes:
    """Encode a value with the encoder registered for the named data type.

    The encoder is looked up in the module-level ``METHODS`` table using
    ``str(data_type)`` as the key.

    :param value: The value to encode
    :type value: :const:`pamqp.common.FieldValue`
    :param data_type: The data type name to use for encoding
    :raises TypeError: when the :data:`data_type` is unknown

    """
    # Only the table lookup is guarded, so a KeyError raised from inside an
    # encoder is not misreported as an unknown data type.
    try:
        encoder = METHODS[str(data_type)]
    except KeyError:
        # Report the unrecognised type name, not the value being encoded.
        raise TypeError('Unknown type: {}'.format(data_type)) from None
    return encoder(value)
def bit(value: int, byte: int, position: int) -> int:
    """Set a single bit value inside an integer used as a bit field.

    :param value: Value to encode
    :param byte: The byte to apply the value to
    :param position: The position in the byte to set the bit on

    """
    # Move the value to the requested bit position, then merge it in.
    shifted = value << position
    return byte | shifted
def boolean(value: bool) -> bytes:
    """Pack a boolean as a single unsigned octet holding ``int(value)``.

    :param value: Value to encode
    :raises TypeError: when the value is not the correct type

    """
    if isinstance(value, bool):
        return common.Struct.short_short_uint.pack(int(value))
    raise TypeError('bool required, received {}'.format(type(value)))
def byte_array(value: bytearray) -> bytes:
    """Encode a bytearray as its packed length followed by its content.

    :param value: Value to encode
    :raises TypeError: when the value is not the correct type

    """
    if isinstance(value, bytearray):
        length_prefix = common.Struct.integer.pack(len(value))
        return length_prefix + value
    raise TypeError('bytearray required, received {}'.format(type(value)))
def decimal(value: _decimal.Decimal) -> bytes:
    """Encode a decimal.Decimal value

    The result is one unsigned octet holding the number of decimal places,
    followed by a signed 32-bit big-endian integer holding the value scaled
    by ``10 ** places``.

    The number of places is taken from the decimal's own exponent rather
    than from its string form, so values that ``str()`` renders in
    scientific notation (for example ``Decimal('1E-7')``) keep their
    fractional digits instead of being truncated to zero.

    :param value: Value to encode
    :raises TypeError: when the value is not the correct type

    """
    if not isinstance(value, _decimal.Decimal):
        raise TypeError('decimal.Decimal required, received {}'.format(
            type(value)))
    exponent = value.as_tuple().exponent
    if not isinstance(exponent, int) or exponent >= 0:
        # A non-negative exponent means there are no fractional digits.
        # NaN/Infinity report a non-integer exponent; they are handed to
        # int(), which rejects them exactly as it did before.
        return struct.pack('>Bi', 0, int(value))
    decimals = -exponent
    raw = int(value * (_decimal.Decimal(10) ** decimals))
    return struct.pack('>Bi', decimals, raw)
def double(value: float) -> bytes:
    """Pack a float using the double struct from :mod:`pamqp.common`.

    :param value: Value to encode
    :raises TypeError: when the value is not the correct type

    """
    if isinstance(value, float):
        return common.Struct.double.pack(value)
    raise TypeError('float required, received {}'.format(type(value)))
def floating_point(value: float) -> bytes:
    """Pack a float using the float struct from :mod:`pamqp.common`.

    :param value: Value to encode
    :raises TypeError: when the value is not the correct type

    """
    if isinstance(value, float):
        return common.Struct.float.pack(value)
    raise TypeError('float required, received {}'.format(type(value)))
def long_int(value: int) -> bytes:
    """Pack a signed long integer after validating its type and range.

    :param value: Value to encode
    :raises TypeError: when the value is not the correct type or outside the
        acceptable range for the data type

    """
    if not isinstance(value, int):
        raise TypeError('int required, received {}'.format(type(value)))
    if value < -2147483648 or value > 2147483647:
        raise TypeError('Long integer range: -2147483648 to 2147483647')
    return common.Struct.long.pack(value)
def long_uint(value: int) -> bytes:
    """Pack an unsigned long integer after validating its type and range.

    :param value: Value to encode
    :raises TypeError: when the value is not the correct type or outside the
        acceptable range for the data type

    """
    if not isinstance(value, int):
        raise TypeError('int required, received {}'.format(type(value)))
    if value < 0 or value > 4294967295:
        raise TypeError('Long unsigned-integer range: 0 to 4294967295')
    return common.Struct.ulong.pack(value)
def long_long_int(value: int) -> bytes:
    """Pack a signed long-long integer after validating type and range.

    :param value: Value to encode
    :raises TypeError: when the value is not the correct type or outside the
        acceptable range for the data type

    """
    if not isinstance(value, int):
        raise TypeError('int required, received {}'.format(type(value)))
    if value < -9223372036854775808 or value > 9223372036854775807:
        raise TypeError('long-long integer range: '
                        '-9223372036854775808 to 9223372036854775807')
    return common.Struct.long_long_int.pack(value)
def long_string(value: str) -> bytes:
    """Encode a "long string"

    The string is UTF-8 encoded by the shared ``_string`` helper and
    prefixed with its byte length, packed with the integer struct.

    :param value: Value to encode
    :raises TypeError: when the value is not the correct type

    """
    length_encoder = common.Struct.integer
    return _string(length_encoder, value)
def octet(value: int) -> bytes:
    """Pack an integer using the byte struct from :mod:`pamqp.common`.

    Only the type is validated here; no range check is performed before
    packing.

    :param value: Value to encode
    :raises TypeError: when the value is not the correct type

    """
    if isinstance(value, int):
        return common.Struct.byte.pack(value)
    raise TypeError('int required, received {}'.format(type(value)))
def short_int(value: int) -> bytes:
    """Encode a short integer

    :param value: Value to encode
    :raises TypeError: when the value is not the correct type or outside the
        acceptable range for the data type

    """
    if not isinstance(value, int):
        raise TypeError('int required, received {}'.format(type(value)))
    elif not (-32768 <= value <= 32767):
        # The message states the bounds that are actually enforced above.
        raise TypeError('Short integer range: -32768 to 32767')
    return common.Struct.short.pack(value)
def short_uint(value: int) -> bytes:
    """Pack an unsigned short integer after validating type and range.

    :param value: Value to encode
    :raises TypeError: when the value is not the correct type or outside the
        acceptable range for the data type

    """
    if not isinstance(value, int):
        raise TypeError('int required, received {}'.format(type(value)))
    if value < 0 or value > 65535:
        raise TypeError('Short unsigned integer range: 0 to 65535')
    return common.Struct.ushort.pack(value)
def short_string(value: str) -> bytes:
    """Encode a string

    The string is UTF-8 encoded by the shared ``_string`` helper and
    prefixed with its byte length, packed with the byte struct.

    :param value: Value to encode
    :raises TypeError: when the value is not the correct type

    """
    length_encoder = common.Struct.byte
    return _string(length_encoder, value)
def timestamp(value: typing.Union[datetime.datetime, time.struct_time]) \
        -> bytes:
    """Encode a datetime.datetime object or time.struct_time

    A ``time.struct_time`` is converted with :func:`calendar.timegm`. A
    ``datetime.datetime`` without usable timezone information is treated as
    UTC before its POSIX timestamp is truncated to an integer.

    :param value: Value to encode
    :raises TypeError: when the value is not the correct type

    """
    if isinstance(value, time.struct_time):
        return common.Struct.timestamp.pack(calendar.timegm(value))
    if not isinstance(value, datetime.datetime):
        raise TypeError(
            'datetime.datetime or time.struct_time required, '
            'received {}'.format(type(value)))
    is_naive = value.tzinfo is None or value.tzinfo.utcoffset(value) is None
    if is_naive:
        # assume datetime object is UTC
        value = value.replace(tzinfo=datetime.timezone.utc)
    seconds = int(value.timestamp())
    return common.Struct.timestamp.pack(seconds)
def field_array(value: common.FieldArray) -> bytes:
    """Encode a list of values as a length-prefixed field array.

    Every item is encoded with :func:`encode_table_value`; the encoded items
    are concatenated and prefixed with their total byte length.

    :param value: Value to encode
    :type value: :const:`pamqp.common.FieldArray`
    :raises TypeError: when the value is not the correct type

    """
    if not isinstance(value, list):
        raise TypeError('list of values required, received {}'.format(
            type(value)))
    payload = b''.join([encode_table_value(item) for item in value])
    return common.Struct.integer.pack(len(payload)) + payload
def field_table(value: common.FieldTable) -> bytes:
    """Encode a dict as a length-prefixed field table.

    Entries are written in sorted order, each as a short-string key followed
    by the value encoded with :func:`encode_table_value`. Any falsy input
    (including ``None`` and an empty dict) produces a zero length prefix.

    :param value: Value to encode
    :type value: :const:`pamqp.common.FieldTable`
    :raises TypeError: when the value is not the correct type

    """
    if not value:
        # If there is no value, return a standard 4 null bytes
        return common.Struct.integer.pack(0)
    if not isinstance(value, dict):
        raise TypeError('dict required, received {}'.format(type(value)))
    chunks = []
    for key, item in sorted(value.items()):
        # field names have 128 char max
        # NOTE(review): the limit is applied to characters while the log
        # message mentions bytes -- confirm which unit is intended.
        if len(key) > 128:
            LOGGER.warning('Truncating key %s to 128 bytes', key)
            key = key[0:128]
        chunks.append(short_string(key))
        try:
            encoded = encode_table_value(item)
        except TypeError as err:
            # Prefix the failure with the key so the bad entry can be found.
            raise TypeError('{} error: {}/'.format(key, err))
        chunks.append(encoded)
    payload = b''.join(chunks)
    return common.Struct.integer.pack(len(payload)) + payload
def table_integer(value: int) -> bytes:
    """Encode an integer for a field table using the first type that fits.

    Candidate types are tried in a fixed order, from the narrowest to the
    widest, and the encoded value is prefixed with the one-byte type tag.
    When ``DEPRECATED_RABBITMQ_SUPPORT`` is set the work is delegated to the
    deprecated encoder instead.

    :param value: Value to encode
    :raises TypeError: when the value is not the correct type or outside the
        acceptable range for the data type

    """
    if DEPRECATED_RABBITMQ_SUPPORT:
        return _deprecated_table_integer(value)
    # (lowest, highest, type tag, encoder) -- order decides which type wins.
    candidates = (
        (-128, 127, b'b', octet),
        (-32768, 32767, b's', short_int),
        (0, 65535, b'u', short_uint),
        (-2147483648, 2147483647, b'I', long_int),
        (0, 4294967295, b'i', long_uint),
        (-9223372036854775808, 9223372036854775807, b'l', long_long_int),
    )
    for lowest, highest, tag, encoder in candidates:
        if lowest <= value <= highest:
            return tag + encoder(value)
    raise TypeError('Unsupported numeric value: {}'.format(value))
def _deprecated_table_integer(value: int) -> bytes:
    """Encode an integer for a field table using signed types only.

    Mirrors :func:`table_integer` but never selects the unsigned short or
    unsigned long types, supporting versions of RabbitMQ < 3.6.

    :param value: Value to encode
    :raises TypeError: when the value is not the correct type or outside the
        acceptable range for the data type

    """
    if -128 <= value <= 127:
        return b'b' + octet(value)
    if -32768 <= value <= 32767:
        return b's' + short_int(value)
    if -2147483648 <= value <= 2147483647:
        return b'I' + long_int(value)
    if -9223372036854775808 <= value <= 9223372036854775807:
        return b'l' + long_long_int(value)
    raise TypeError('Unsupported numeric value: {}'.format(value))


def _string(encoder: struct.Struct, value: str) -> bytes:
    """UTF-8 encode a string and prefix it with its packed byte length.

    :param encoder: Struct used to pack the length prefix
    :param value: Value to encode
    :raises TypeError: when the value is not a str

    """
    if isinstance(value, str):
        encoded = value.encode('utf-8')
        # The prefix counts encoded bytes, not characters.
        return encoder.pack(len(encoded)) + encoded
    raise TypeError('str required, received {}'.format(type(value)))
def encode_table_value(
        value: typing.Union[common.FieldArray, common.FieldTable,
                            common.FieldValue]) -> bytes:
    """Encode a value for a field table, prefixed with its one-byte type tag.

    The encoder is chosen from the Python type of the value. Integers carry
    the tag selected by :func:`table_integer`; ``None`` is encoded as the
    bare tag ``V``.

    :param value: Value to encode
    :type value: :const:`pamqp.common.FieldArray` or
        :const:`pamqp.common.FieldTable` or
        :const:`pamqp.common.FieldValue`
    :raises TypeError: when the type of the value is not supported

    """
    if value is None:
        return b'V'
    # bool must be tested before int because bool is a subclass of int.
    if isinstance(value, bool):
        return b't' + boolean(value)
    if isinstance(value, int):
        return table_integer(value)
    if isinstance(value, _decimal.Decimal):
        return b'D' + decimal(value)
    if isinstance(value, float):
        return b'f' + floating_point(value)
    if isinstance(value, str):
        return b'S' + long_string(value)
    if isinstance(value, (datetime.datetime, time.struct_time)):
        return b'T' + timestamp(value)
    if isinstance(value, dict):
        return b'F' + field_table(value)
    if isinstance(value, list):
        return b'A' + field_array(value)
    if isinstance(value, bytearray):
        return b'x' + byte_array(value)
    raise TypeError('Unknown type: {} ({!r})'.format(type(value), value))
# Lookup table used by by_type(): maps a data type name to the encoder
# function in this module that handles it.
METHODS = {
    'bytearray': byte_array,
    'double': double,
    'field_array': field_array,
    'long': long_uint,  # unsigned encoder
    'longlong': long_long_int,
    'longstr': long_string,
    'octet': octet,
    'short': short_uint,  # unsigned encoder
    'shortstr': short_string,
    'table': field_table,
    'timestamp': timestamp,
    # Unlike the other entries, this one returns None rather than bytes.
    'void': lambda _: None,
}