Source code for pamqp.header
# -*- encoding: utf-8 -*-
"""
AMQP Header Class Definitions
For encoding AMQP Header frames into binary AMQP stream data and decoding AMQP
binary data into AMQP Header frames.
"""
import struct
import typing
from pamqp import commands, constants, decode
BasicProperties = typing.Optional[commands.Basic.Properties]
[docs]class ProtocolHeader:
"""Class that represents the AMQP Protocol Header"""
name = 'ProtocolHeader'
def __init__(self,
major_version: int = constants.VERSION[0],
minor_version: int = constants.VERSION[1],
revision: int = constants.VERSION[2]):
"""Construct a Protocol Header frame object for the specified AMQP
version.
:param major_version: The AMQP major version (``0``)
:param minor_version: The AMQP major version (``9``)
:param revision: The AMQP major version (``1``)
"""
self.major_version = major_version
self.minor_version = minor_version
self.revision = revision
[docs] def marshal(self) -> bytes:
"""Return the full AMQP wire protocol frame data representation of the
ProtocolHeader frame.
"""
return constants.AMQP + struct.pack(
'BBBB', 0, self.major_version, self.minor_version, self.revision)
[docs] def unmarshal(self, data: bytes) -> int:
"""Dynamically decode the frame data applying the values to the method
object by iterating through the attributes in order and decoding them.
:param data: The frame value to unpack
:raises: ValueError
"""
try:
(self.major_version, self.minor_version,
self.revision) = struct.unpack('BBB', data[5:8])
except struct.error:
raise ValueError(
'Could not unpack protocol header from {!r}'.format(data))
return 8
[docs]class ContentHeader:
"""Represent a content header frame
A Content Header frame is received after a Basic.Deliver or Basic.GetOk
frame and has the data and properties for the Content Body frames that
follow.
"""
name = 'ContentHeader'
def __init__(self,
weight: int = 0,
body_size: int = 0,
properties: typing.Optional[BasicProperties] = None):
"""Initialize the Exchange.DeleteOk class
Weight is unused and must be `0`
:param weight: The unused content weight field
:param body_size: The size in bytes of the message body
:param properties: The message properties
"""
self.class_id = None
self.weight = weight
self.body_size = body_size
self.properties = properties or commands.Basic.Properties()
[docs] def marshal(self) -> bytes:
"""Return the AMQP binary encoded value of the frame"""
return struct.pack('>HxxQ', commands.Basic.frame_id,
self.body_size) + self.properties.marshal()
[docs] def unmarshal(self, data: bytes) -> None:
"""Dynamically decode the frame data applying the values to the method
object by iterating through the attributes in order and decoding them.
:param data: The raw frame data to unmarshal
"""
self.class_id, self.weight, self.body_size = struct.unpack(
'>HHQ', data[0:12])
offset, flags = self._get_flags(data[12:])
self.properties.unmarshal(flags, data[12 + offset:])
@staticmethod
def _get_flags(data: bytes) -> typing.Tuple[int, int]:
"""Decode the flags from the data returning the bytes consumed and
flags.
"""
bytes_consumed, flags, flagword_index = 0, 0, 0
while True:
consumed, partial_flags = decode.short_int(data)
bytes_consumed += consumed
flags |= (partial_flags << (flagword_index * 16))
if not partial_flags & 1: # pragma: nocover
break
flagword_index += 1 # pragma: nocover
return bytes_consumed, flags