"""AMQP Header Class Definitions
For encoding AMQP Header frames into binary AMQP stream data and decoding AMQP
binary data into AMQP Header frames.
"""
import struct
from pamqp import codec
from pamqp import specification
from pamqp import PYTHON3
AMQP = b'AMQP' if PYTHON3 else 'AMQP'
[docs]class ProtocolHeader(object):
"""Class that represents the AMQP Protocol Header"""
name = 'ProtocolHeader'
def __init__(self, major_version=None, minor_version=None, revision=None):
"""Construct a Protocol Header frame object for the specified AMQP
version.
:param int major_version: Major version number
:param int minor_version: Minor version number
:param int revision: Revision number
"""
self.major_version = major_version or specification.VERSION[0]
self.minor_version = minor_version or specification.VERSION[1]
self.revision = revision or specification.VERSION[2]
[docs] def marshal(self):
"""Return the full AMQP wire protocol frame data representation of the
ProtocolHeader frame.
:rtype: str or bytes
"""
return AMQP + struct.pack('BBBB', 0,
self.major_version,
self.minor_version,
self.revision)
[docs] def unmarshal(self, data):
"""Dynamically decode the frame data applying the values to the method
object by iterating through the attributes in order and decoding them.
:param str data: The binary encoded method data
:rtype: int byte count of data used to unmarshal the frame
:raises: ValueError
"""
if data[0:4] == AMQP:
try:
(self.major_version,
self.minor_version,
self.revision) = struct.unpack('BBB', data[5:8])
except struct.error:
raise ValueError('Data did not match the ProtocolHeader '
'format: %r', data)
# All in we consume 8 bytes
return 8
# The first four bytes did not match
raise ValueError('Data did not match the ProtocolHeader format: %r',
data)
[docs]class ContentHeader(object):
"""Represent a content header frame
A Content Header frame is received after a Basic.Deliver or Basic.GetOk
frame and has the data and properties for the Content Body frames that
follow.
"""
name = 'ContentHeader'
def __init__(self, weight=0, body_size=0, properties=None):
"""Initialize the Exchange.DeleteOk class
:param int weight: Unused, must be 0
:param long body_size: The size of the body for the message across all
received AMQP frames
:param specification.Basic.Properties properties: Message properties
"""
self.weight = weight
self.body_size = body_size
self.properties = properties or specification.Basic.Properties()
[docs] def marshal(self):
"""Return the AMQP binary encoded value of the frame
"""
return struct.pack('>HxxQ', specification.Basic.frame_id,
self.body_size) + self.properties.marshal()
[docs] def unmarshal(self, data):
"""Dynamically decode the frame data applying the values to the method
object by iterating through the attributes in order and decoding them.
:param str data: The binary encoded method data
:rtype: int byte count of data used to unmarshal the frame
:raises: ValueError
"""
# Get the class, weight and body size
(self.class_id,
self.weight,
self.body_size) = struct.unpack('>HHQ', data[0:12])
# Get the flags for what properties we have available
offset, flags = self._get_flags(data[12:])
# Demarshal the properties
self.properties.unmarshal(flags, data[12 + offset:])
def _get_flags(self, data):
"""Decode the flags from the data returning the bytes consumed and
flags
:param str data: The data to pull flags out of
:rtype: int, int
"""
# Defaults
bytes_consumed, flags, flagword_index = 0, 0, 0
# Read until we don't have a value pulled out of the flags
while True:
consumed, partial_flags = codec.decode.short_int(data)
bytes_consumed += consumed
flags |= (partial_flags << (flagword_index * 16))
if not partial_flags & 1:
break
flagword_index += 1
# Return the bytes consumed and the flags
return bytes_consumed, flags