Source code for pamqp.commands

"""
The classes inside :mod:`pamqp.commands` allow for the automatic marshaling
and unmarshaling of AMQP method frames and
:class:`Basic.Properties <pamqp.commands.Basic.Properties>`. In addition the
command classes contain information that designates if they are synchronous
commands and if so, what the expected responses are. Each commands arguments
are detailed in the class and are listed in the attributes property.

.. note:: All AMQ classes and methods extend :class:`pamqp.base.Frame`.

"""
# Auto-generated, do not edit this file.
import datetime
import typing
import warnings

from pamqp import base, common, constants


[docs]class Connection: """Work with socket connections The connection class provides methods for a client to establish a network connection to a server, and for both peers to operate the connection thereafter. """ __slots__: typing.List[str] = [] frame_id = 10 # AMQP Frame ID index = 0x000A0000 # pamqp Mapping Index
[docs] class Start(base.Frame): """Start connection negotiation This method starts the connection negotiation process by telling the client the protocol version that the server proposes, along with a list of security mechanisms which the client can use for authentication. :param version_major: Protocol major version - Default: ``0`` :param version_minor: Protocol minor version - Default: ``9`` :param server_properties: Server properties - Default: ``{}`` :type server_properties: :const:`~pamqp.common.FieldTable` :param mechanisms: Available security mechanisms - Default: ``PLAIN`` :param locales: Available message locales - Default: ``en_US`` """ __annotations__: typing.Dict[str, object] = { 'version_major': int, 'version_minor': int, 'server_properties': common.FieldTable, 'mechanisms': str, 'locales': str } __slots__: typing.List[str] = [ # AMQ Method Attributes 'version_major', 'version_minor', 'server_properties', 'mechanisms', 'locales' ] frame_id = 10 # AMQP Frame ID index = 0x000A000A # pamqp Mapping Index name = 'Connection.Start' synchronous = True # Indicates if this is a synchronous AMQP method # Valid responses to this method valid_responses = ['Connection.StartOk'] # Class Attribute Types for unmarshaling _version_major = 'octet' _version_minor = 'octet' _server_properties = 'table' _mechanisms = 'longstr' _locales = 'longstr' def __init__(self, version_major: int = 0, version_minor: int = 9, server_properties: typing.Optional[ common.FieldTable] = None, mechanisms: str = 'PLAIN', locales: str = 'en_US') -> None: """Initialize the :class:`Connection.Start` class""" self.version_major = version_major self.version_minor = version_minor self.server_properties = server_properties or {} self.mechanisms = mechanisms self.locales = locales
[docs] class StartOk(base.Frame): """Select security mechanism and locale This method selects a SASL security mechanism. :param client_properties: Client properties - Default: ``{}`` :type client_properties: :const:`~pamqp.common.FieldTable` :param mechanism: Selected security mechanism - Default: ``PLAIN`` :param response: Security response data - Default: ``''`` :param locale: Selected message locale - Default: ``en_US`` """ __annotations__: typing.Dict[str, object] = { 'client_properties': common.FieldTable, 'mechanism': str, 'response': str, 'locale': str } __slots__: typing.List[str] = [ # AMQ Method Attributes 'client_properties', 'mechanism', 'response', 'locale' ] frame_id = 11 # AMQP Frame ID index = 0x000A000B # pamqp Mapping Index name = 'Connection.StartOk' synchronous = False # Indicates if this is a synchronous AMQP method # Class Attribute Types for unmarshaling _client_properties = 'table' _mechanism = 'shortstr' _response = 'longstr' _locale = 'shortstr' def __init__(self, client_properties: typing.Optional[ common.FieldTable] = None, mechanism: str = 'PLAIN', response: str = '', locale: str = 'en_US') -> None: """Initialize the :class:`Connection.StartOk` class""" self.client_properties = client_properties or {} self.mechanism = mechanism self.response = response self.locale = locale
[docs] class Secure(base.Frame): """Security mechanism challenge The SASL protocol works by exchanging challenges and responses until both peers have received sufficient information to authenticate each other. This method challenges the client to provide more information. :param challenge: Security challenge data """ __annotations__: typing.Dict[str, object] = {'challenge': str} __slots__: typing.List[str] = [ # AMQ Method Attributes 'challenge' ] frame_id = 20 # AMQP Frame ID index = 0x000A0014 # pamqp Mapping Index name = 'Connection.Secure' synchronous = True # Indicates if this is a synchronous AMQP method # Valid responses to this method valid_responses = ['Connection.SecureOk'] # Class Attribute Types for unmarshaling _challenge = 'longstr' def __init__(self, challenge: typing.Optional[str] = None) -> None: """Initialize the :class:`Connection.Secure` class""" self.challenge = challenge
[docs] class SecureOk(base.Frame): """Security mechanism response This method attempts to authenticate, passing a block of SASL data for the security mechanism at the server side. :param response: Security response data """ __annotations__: typing.Dict[str, object] = {'response': str} __slots__: typing.List[str] = [ # AMQ Method Attributes 'response' ] frame_id = 21 # AMQP Frame ID index = 0x000A0015 # pamqp Mapping Index name = 'Connection.SecureOk' synchronous = False # Indicates if this is a synchronous AMQP method # Class Attribute Types for unmarshaling _response = 'longstr' def __init__(self, response: typing.Optional[str] = None) -> None: """Initialize the :class:`Connection.SecureOk` class""" self.response = response
[docs] class Tune(base.Frame): """Propose connection tuning parameters This method proposes a set of connection configuration values to the client. The client can accept and/or adjust these. :param channel_max: Proposed maximum channels - Default: ``0`` :param frame_max: Proposed maximum frame size - Default: ``0`` :param heartbeat: Desired heartbeat delay - Default: ``0`` """ __annotations__: typing.Dict[str, object] = { 'channel_max': int, 'frame_max': int, 'heartbeat': int } __slots__: typing.List[str] = [ # AMQ Method Attributes 'channel_max', 'frame_max', 'heartbeat' ] frame_id = 30 # AMQP Frame ID index = 0x000A001E # pamqp Mapping Index name = 'Connection.Tune' synchronous = True # Indicates if this is a synchronous AMQP method # Valid responses to this method valid_responses = ['Connection.TuneOk'] # Class Attribute Types for unmarshaling _channel_max = 'short' _frame_max = 'long' _heartbeat = 'short' def __init__(self, channel_max: int = 0, frame_max: int = 0, heartbeat: int = 0) -> None: """Initialize the :class:`Connection.Tune` class""" self.channel_max = channel_max self.frame_max = frame_max self.heartbeat = heartbeat
[docs] class TuneOk(base.Frame): """Negotiate connection tuning parameters This method sends the client's connection tuning parameters to the server. Certain fields are negotiated, others provide capability information. :param channel_max: Negotiated maximum channels - Default: ``0`` :param frame_max: Negotiated maximum frame size - Default: ``0`` :param heartbeat: Desired heartbeat delay - Default: ``0`` """ __annotations__: typing.Dict[str, object] = { 'channel_max': int, 'frame_max': int, 'heartbeat': int } __slots__: typing.List[str] = [ # AMQ Method Attributes 'channel_max', 'frame_max', 'heartbeat' ] frame_id = 31 # AMQP Frame ID index = 0x000A001F # pamqp Mapping Index name = 'Connection.TuneOk' synchronous = False # Indicates if this is a synchronous AMQP method # Class Attribute Types for unmarshaling _channel_max = 'short' _frame_max = 'long' _heartbeat = 'short' def __init__(self, channel_max: int = 0, frame_max: int = 0, heartbeat: int = 0) -> None: """Initialize the :class:`Connection.TuneOk` class""" self.channel_max = channel_max self.frame_max = frame_max self.heartbeat = heartbeat
[docs] class Open(base.Frame): """Open connection to virtual host This method opens a connection to a virtual host, which is a collection of resources, and acts to separate multiple application domains within a server. The server may apply arbitrary limits per virtual host, such as the number of each type of entity that may be used, per connection and/or in total. :param virtual_host: Virtual host name - Default: ``/`` :param capabilities: Deprecated, must be empty - Default: ``''`` :param insist: Deprecated, must be ``False`` - Default: ``False`` :raises ValueError: when an argument fails to validate """ __annotations__: typing.Dict[str, object] = { 'virtual_host': str, 'capabilities': str, 'insist': bool } __slots__: typing.List[str] = [ # AMQ Method Attributes 'virtual_host', 'capabilities', 'insist' ] frame_id = 40 # AMQP Frame ID index = 0x000A0028 # pamqp Mapping Index name = 'Connection.Open' synchronous = True # Indicates if this is a synchronous AMQP method # Valid responses to this method valid_responses = ['Connection.OpenOk'] # Class Attribute Types for unmarshaling _virtual_host = 'shortstr' _capabilities = 'shortstr' _insist = 'bit' def __init__(self, virtual_host: str = '/', capabilities: str = '', insist: bool = False) -> None: """Initialize the :class:`Connection.Open` class""" self.virtual_host = virtual_host self.capabilities = capabilities self.insist = insist self.validate()
[docs] def validate(self) -> None: """Validate the frame data ensuring all domains or attributes adhere to the protocol specification. :raises ValueError: on validation error """ if self.virtual_host is not None and len(self.virtual_host) > 127: raise ValueError('Max length exceeded for virtual_host') if self.capabilities is not None and self.capabilities != '': raise ValueError('capabilities must be empty') if self.insist is not None and self.insist is not False: raise ValueError('insist must be False')
[docs] class OpenOk(base.Frame): """Signal that connection is ready This method signals to the client that the connection is ready for use. :param known_hosts: Deprecated, must be empty - Default: ``''`` """ __annotations__: typing.Dict[str, object] = {'known_hosts': str} __slots__: typing.List[str] = [ # AMQ Method Attributes 'known_hosts' ] frame_id = 41 # AMQP Frame ID index = 0x000A0029 # pamqp Mapping Index name = 'Connection.OpenOk' synchronous = False # Indicates if this is a synchronous AMQP method # Class Attribute Types for unmarshaling _known_hosts = 'shortstr' def __init__(self, known_hosts: str = '') -> None: """Initialize the :class:`Connection.OpenOk` class""" self.known_hosts = known_hosts self.validate()
[docs] def validate(self) -> None: """Validate the frame data ensuring all domains or attributes adhere to the protocol specification. :raises ValueError: on validation error """ if self.known_hosts is not None and self.known_hosts != '': raise ValueError('known_hosts must be empty')
[docs] class Close(base.Frame): """Request a connection close This method indicates that the sender wants to close the connection. This may be due to internal conditions (e.g. a forced shut-down) or due to an error handling a specific method, i.e. an exception. When a close is due to an exception, the sender provides the class and method id of the method which caused the exception. :param reply_code: Reply code from server :param reply_text: Localised reply text - Default: ``''`` :param class_id: Failing method class :param method_id: Failing method ID """ __annotations__: typing.Dict[str, object] = { 'reply_code': int, 'reply_text': str, 'class_id': int, 'method_id': int } __slots__: typing.List[str] = [ # AMQ Method Attributes 'reply_code', 'reply_text', 'class_id', 'method_id' ] frame_id = 50 # AMQP Frame ID index = 0x000A0032 # pamqp Mapping Index name = 'Connection.Close' synchronous = True # Indicates if this is a synchronous AMQP method # Valid responses to this method valid_responses = ['Connection.CloseOk'] # Class Attribute Types for unmarshaling _reply_code = 'short' _reply_text = 'shortstr' _class_id = 'short' _method_id = 'short' def __init__(self, reply_code: typing.Optional[int] = None, reply_text: str = '', class_id: typing.Optional[int] = None, method_id: typing.Optional[int] = None) -> None: """Initialize the :class:`Connection.Close` class""" self.reply_code = reply_code self.reply_text = reply_text or '' self.class_id = class_id self.method_id = method_id
[docs] class CloseOk(base.Frame): """Confirm a connection close This method confirms a :class:`Connection.Close` method and tells the recipient that it is safe to release resources for the connection and close the socket. """ __annotations__: typing.Dict[str, object] = {} __slots__: typing.List[str] = [] # AMQ Method Attributes frame_id = 51 # AMQP Frame ID index = 0x000A0033 # pamqp Mapping Index name = 'Connection.CloseOk' synchronous = False # Indicates if this is a synchronous AMQP method
[docs] class Blocked(base.Frame): """Indicate that connection is blocked This method indicates that a connection has been blocked and does not accept new publishes. :param reason: Block reason - Default: ``''`` """ __annotations__: typing.Dict[str, object] = {'reason': str} __slots__: typing.List[str] = [ # AMQ Method Attributes 'reason' ] frame_id = 60 # AMQP Frame ID index = 0x000A003C # pamqp Mapping Index name = 'Connection.Blocked' synchronous = False # Indicates if this is a synchronous AMQP method # Class Attribute Types for unmarshaling _reason = 'shortstr' def __init__(self, reason: str = '') -> None: """Initialize the :class:`Connection.Blocked` class""" self.reason = reason
[docs] class Unblocked(base.Frame): """Indicate that connection is unblocked This method indicates that a connection has been unblocked and now accepts publishes. """ __annotations__: typing.Dict[str, object] = {} __slots__: typing.List[str] = [] # AMQ Method Attributes frame_id = 61 # AMQP Frame ID index = 0x000A003D # pamqp Mapping Index name = 'Connection.Unblocked' synchronous = False # Indicates if this is a synchronous AMQP method
[docs] class UpdateSecret(base.Frame): """Update secret This method updates the secret used to authenticate this connection. It is used when secrets have an expiration date and need to be renewed, like OAuth 2 tokens. :param new_secret: New secret :param reason: Reason """ __annotations__: typing.Dict[str, object] = { 'new_secret': str, 'reason': str } __slots__: typing.List[str] = [ # AMQ Method Attributes 'new_secret', 'reason' ] frame_id = 70 # AMQP Frame ID index = 0x000A0046 # pamqp Mapping Index name = 'Connection.UpdateSecret' synchronous = True # Indicates if this is a synchronous AMQP method # Valid responses to this method valid_responses = ['Connection.UpdateSecretOk'] # Class Attribute Types for unmarshaling _new_secret = 'longstr' _reason = 'shortstr' def __init__(self, new_secret: typing.Optional[str] = None, reason: typing.Optional[str] = None) -> None: """Initialize the :class:`Connection.UpdateSecret` class""" self.new_secret = new_secret self.reason = reason
[docs] class UpdateSecretOk(base.Frame): """Update secret response This method confirms the updated secret is valid. """ __annotations__: typing.Dict[str, object] = {} __slots__: typing.List[str] = [] # AMQ Method Attributes frame_id = 71 # AMQP Frame ID index = 0x000A0047 # pamqp Mapping Index name = 'Connection.UpdateSecretOk' synchronous = False # Indicates if this is a synchronous AMQP method
[docs]class Channel: """Work with channels The channel class provides methods for a client to establish a channel to a server and for both peers to operate the channel thereafter. """ __slots__: typing.List[str] = [] frame_id = 20 # AMQP Frame ID index = 0x00140000 # pamqp Mapping Index
[docs] class Open(base.Frame): """Open a channel for use This method opens a channel to the server. :param out_of_band: Protocol level field, do not use, must be ``0``. - Default: ``0`` """ __annotations__: typing.Dict[str, object] = {'out_of_band': str} __slots__: typing.List[str] = [ # AMQ Method Attributes 'out_of_band' ] frame_id = 10 # AMQP Frame ID index = 0x0014000A # pamqp Mapping Index name = 'Channel.Open' synchronous = True # Indicates if this is a synchronous AMQP method valid_responses = ['Channel.OpenOk'] # Valid responses to this method # Class Attribute Types for unmarshaling _out_of_band = 'shortstr' def __init__(self, out_of_band: str = '0') -> None: """Initialize the :class:`Channel.Open` class""" self.out_of_band = out_of_band self.validate()
[docs] def validate(self) -> None: """Validate the frame data ensuring all domains or attributes adhere to the protocol specification. :raises ValueError: on validation error """ if self.out_of_band is not None and self.out_of_band != '0': raise ValueError('out_of_band must be 0')
[docs] class OpenOk(base.Frame): """Signal that the channel is ready This method signals to the client that the channel is ready for use. :param channel_id: Deprecated, must be ``0`` - Default: ``0`` """ __annotations__: typing.Dict[str, object] = {'channel_id': str} __slots__: typing.List[str] = [ # AMQ Method Attributes 'channel_id' ] frame_id = 11 # AMQP Frame ID index = 0x0014000B # pamqp Mapping Index name = 'Channel.OpenOk' synchronous = False # Indicates if this is a synchronous AMQP method # Class Attribute Types for unmarshaling _channel_id = 'longstr' def __init__(self, channel_id: str = '0') -> None: """Initialize the :class:`Channel.OpenOk` class""" self.channel_id = channel_id self.validate()
[docs] def validate(self) -> None: """Validate the frame data ensuring all domains or attributes adhere to the protocol specification. :raises ValueError: on validation error """ if self.channel_id is not None and self.channel_id != '0': raise ValueError('channel_id must be 0')
[docs] class Flow(base.Frame): """Enable/disable flow from peer This method asks the peer to pause or restart the flow of content data sent by a consumer. This is a simple flow-control mechanism that a peer can use to avoid overflowing its queues or otherwise finding itself receiving more messages than it can process. Note that this method is not intended for window control. It does not affect contents returned by :class:`Basic.GetOk` methods. :param active: Start/stop content frames """ __annotations__: typing.Dict[str, object] = {'active': bool} __slots__: typing.List[str] = [ # AMQ Method Attributes 'active' ] frame_id = 20 # AMQP Frame ID index = 0x00140014 # pamqp Mapping Index name = 'Channel.Flow' synchronous = True # Indicates if this is a synchronous AMQP method valid_responses = ['Channel.FlowOk'] # Valid responses to this method # Class Attribute Types for unmarshaling _active = 'bit' def __init__(self, active: typing.Optional[bool] = None) -> None: """Initialize the :class:`Channel.Flow` class""" self.active = active
[docs] class FlowOk(base.Frame): """Confirm a flow method Confirms to the peer that a flow command was received and processed. :param active: Current flow setting """ __annotations__: typing.Dict[str, object] = {'active': bool} __slots__: typing.List[str] = [ # AMQ Method Attributes 'active' ] frame_id = 21 # AMQP Frame ID index = 0x00140015 # pamqp Mapping Index name = 'Channel.FlowOk' synchronous = False # Indicates if this is a synchronous AMQP method # Class Attribute Types for unmarshaling _active = 'bit' def __init__(self, active: typing.Optional[bool] = None) -> None: """Initialize the :class:`Channel.FlowOk` class""" self.active = active
[docs] class Close(base.Frame): """Request a channel close This method indicates that the sender wants to close the channel. This may be due to internal conditions (e.g. a forced shut-down) or due to an error handling a specific method, i.e. an exception. When a close is due to an exception, the sender provides the class and method id of the method which caused the exception. :param reply_code: Reply code from server :param reply_text: Localised reply text - Default: ``''`` :param class_id: Failing method class :param method_id: Failing method ID """ __annotations__: typing.Dict[str, object] = { 'reply_code': int, 'reply_text': str, 'class_id': int, 'method_id': int } __slots__: typing.List[str] = [ # AMQ Method Attributes 'reply_code', 'reply_text', 'class_id', 'method_id' ] frame_id = 40 # AMQP Frame ID index = 0x00140028 # pamqp Mapping Index name = 'Channel.Close' synchronous = True # Indicates if this is a synchronous AMQP method # Valid responses to this method valid_responses = ['Channel.CloseOk'] # Class Attribute Types for unmarshaling _reply_code = 'short' _reply_text = 'shortstr' _class_id = 'short' _method_id = 'short' def __init__(self, reply_code: typing.Optional[int] = None, reply_text: str = '', class_id: typing.Optional[int] = None, method_id: typing.Optional[int] = None) -> None: """Initialize the :class:`Channel.Close` class""" self.reply_code = reply_code self.reply_text = reply_text or '' self.class_id = class_id self.method_id = method_id
[docs] class CloseOk(base.Frame): """Confirm a channel close This method confirms a :class:`Channel.Close` method and tells the recipient that it is safe to release resources for the channel. """ __annotations__: typing.Dict[str, object] = {} __slots__: typing.List[str] = [] # AMQ Method Attributes frame_id = 41 # AMQP Frame ID index = 0x00140029 # pamqp Mapping Index name = 'Channel.CloseOk' synchronous = False # Indicates if this is a synchronous AMQP method
[docs]class Exchange: """Work with exchanges Exchanges match and distribute messages across queues. Exchanges can be configured in the server or declared at runtime. """ __slots__: typing.List[str] = [] frame_id = 40 # AMQP Frame ID index = 0x00280000 # pamqp Mapping Index
[docs] class Declare(base.Frame): """Verify exchange exists, create if needed This method creates an exchange if it does not already exist, and if the exchange exists, verifies that it is of the correct and expected class. .. note:: The AMQP type argument is referred to as "exchange_type" to not conflict with the Python type keyword. :param ticket: Deprecated, must be ``0`` - Default: ``0`` :param exchange: exchange name - Default: ``''`` :param exchange_type: Exchange type - Default: ``direct`` :param passive: Do not create exchange - Default: ``False`` :param durable: Request a durable exchange - Default: ``False`` :param auto_delete: Auto-delete when unused - Default: ``False`` :param internal: Create internal exchange - Default: ``False`` :param nowait: Do not send a reply method - Default: ``False`` :param arguments: Arguments for declaration - Default: ``{}`` :type arguments: :const:`~pamqp.common.Arguments` :raises ValueError: when an argument fails to validate """ __annotations__: typing.Dict[str, object] = { 'ticket': int, 'exchange': str, 'exchange_type': str, 'passive': bool, 'durable': bool, 'auto_delete': bool, 'internal': bool, 'nowait': bool, 'arguments': common.Arguments } __slots__: typing.List[str] = [ # AMQ Method Attributes 'ticket', 'exchange', 'exchange_type', 'passive', 'durable', 'auto_delete', 'internal', 'nowait', 'arguments' ] frame_id = 10 # AMQP Frame ID index = 0x0028000A # pamqp Mapping Index name = 'Exchange.Declare' synchronous = True # Indicates if this is a synchronous AMQP method # Valid responses to this method valid_responses = ['Exchange.DeclareOk'] # Class Attribute Types for unmarshaling _ticket = 'short' _exchange = 'shortstr' _exchange_type = 'shortstr' _passive = 'bit' _durable = 'bit' _auto_delete = 'bit' _internal = 'bit' _nowait = 'bit' _arguments = 'table' def __init__( self, ticket: int = 0, exchange: str = '', exchange_type: str = 'direct', passive: bool = False, durable: bool = False, auto_delete: bool = False, internal: bool = False, nowait: bool = False, arguments: typing.Optional[common.Arguments] = None) -> None: """Initialize the :class:`Exchange.Declare` class""" self.ticket = ticket self.exchange = exchange self.exchange_type = exchange_type self.passive = passive self.durable = durable self.auto_delete = auto_delete self.internal = internal self.nowait = nowait self.arguments = arguments or {} self.validate()
[docs] def validate(self) -> None: """Validate the frame data ensuring all domains or attributes adhere to the protocol specification. :raises ValueError: on validation error """ if self.ticket is not None and self.ticket != 0: raise ValueError('ticket must be 0') if self.exchange is not None and len(self.exchange) > 127: raise ValueError('Max length exceeded for exchange') if self.exchange is not None and not constants.DOMAIN_REGEX[ 'exchange-name'].fullmatch(self.exchange): raise ValueError('Invalid value for exchange') if self.internal is not None and self.internal is not False: raise ValueError('internal must be False')
[docs] class DeclareOk(base.Frame): """Confirm exchange declaration This method confirms a Declare method and confirms the name of the exchange, essential for automatically-named exchanges. """ __annotations__: typing.Dict[str, object] = {} __slots__: typing.List[str] = [] # AMQ Method Attributes frame_id = 11 # AMQP Frame ID index = 0x0028000B # pamqp Mapping Index name = 'Exchange.DeclareOk' synchronous = False # Indicates if this is a synchronous AMQP method
[docs] class Delete(base.Frame): """Delete an exchange This method deletes an exchange. When an exchange is deleted all queue bindings on the exchange are cancelled. :param ticket: Deprecated, must be ``0`` - Default: ``0`` :param exchange: exchange name - Default: ``''`` :param if_unused: Delete only if unused - Default: ``False`` :param nowait: Do not send a reply method - Default: ``False`` :raises ValueError: when an argument fails to validate """ __annotations__: typing.Dict[str, object] = { 'ticket': int, 'exchange': str, 'if_unused': bool, 'nowait': bool } __slots__: typing.List[str] = [ # AMQ Method Attributes 'ticket', 'exchange', 'if_unused', 'nowait' ] frame_id = 20 # AMQP Frame ID index = 0x00280014 # pamqp Mapping Index name = 'Exchange.Delete' synchronous = True # Indicates if this is a synchronous AMQP method # Valid responses to this method valid_responses = ['Exchange.DeleteOk'] # Class Attribute Types for unmarshaling _ticket = 'short' _exchange = 'shortstr' _if_unused = 'bit' _nowait = 'bit' def __init__(self, ticket: int = 0, exchange: str = '', if_unused: bool = False, nowait: bool = False) -> None: """Initialize the :class:`Exchange.Delete` class""" self.ticket = ticket self.exchange = exchange self.if_unused = if_unused self.nowait = nowait self.validate()
[docs] def validate(self) -> None: """Validate the frame data ensuring all domains or attributes adhere to the protocol specification. :raises ValueError: on validation error """ if self.ticket is not None and self.ticket != 0: raise ValueError('ticket must be 0') if self.exchange is not None and len(self.exchange) > 127: raise ValueError('Max length exceeded for exchange') if self.exchange is not None and not constants.DOMAIN_REGEX[ 'exchange-name'].fullmatch(self.exchange): raise ValueError('Invalid value for exchange')
[docs] class DeleteOk(base.Frame): """Confirm deletion of an exchange This method confirms the deletion of an exchange. """ __annotations__: typing.Dict[str, object] = {} __slots__: typing.List[str] = [] # AMQ Method Attributes frame_id = 21 # AMQP Frame ID index = 0x00280015 # pamqp Mapping Index name = 'Exchange.DeleteOk' synchronous = False # Indicates if this is a synchronous AMQP method
[docs] class Bind(base.Frame): """Bind exchange to an exchange This method binds an exchange to an exchange. :param ticket: Deprecated, must be ``0`` - Default: ``0`` :param destination: Name of the destination exchange to bind to - Default: ``''`` :param source: Name of the source exchange to bind to - Default: ``''`` :param routing_key: Message routing key - Default: ``''`` :param nowait: Do not send a reply method - Default: ``False`` :param arguments: Arguments for binding - Default: ``{}`` :type arguments: :const:`~pamqp.common.Arguments` :raises ValueError: when an argument fails to validate """ __annotations__: typing.Dict[str, object] = { 'ticket': int, 'destination': str, 'source': str, 'routing_key': str, 'nowait': bool, 'arguments': common.Arguments } __slots__: typing.List[str] = [ # AMQ Method Attributes 'ticket', 'destination', 'source', 'routing_key', 'nowait', 'arguments' ] frame_id = 30 # AMQP Frame ID index = 0x0028001E # pamqp Mapping Index name = 'Exchange.Bind' synchronous = True # Indicates if this is a synchronous AMQP method # Valid responses to this method valid_responses = ['Exchange.BindOk'] # Class Attribute Types for unmarshaling _ticket = 'short' _destination = 'shortstr' _source = 'shortstr' _routing_key = 'shortstr' _nowait = 'bit' _arguments = 'table' def __init__( self, ticket: int = 0, destination: str = '', source: str = '', routing_key: str = '', nowait: bool = False, arguments: typing.Optional[common.Arguments] = None) -> None: """Initialize the :class:`Exchange.Bind` class""" self.ticket = ticket self.destination = destination self.source = source self.routing_key = routing_key self.nowait = nowait self.arguments = arguments or {} self.validate()
[docs] def validate(self) -> None: """Validate the frame data ensuring all domains or attributes adhere to the protocol specification. :raises ValueError: on validation error """ if self.ticket is not None and self.ticket != 0: raise ValueError('ticket must be 0') if self.destination is not None and len(self.destination) > 127: raise ValueError('Max length exceeded for destination') if self.destination is not None and not constants.DOMAIN_REGEX[ 'exchange-name'].fullmatch(self.destination): raise ValueError('Invalid value for destination') if self.source is not None and len(self.source) > 127: raise ValueError('Max length exceeded for source') if self.source is not None and not constants.DOMAIN_REGEX[ 'exchange-name'].fullmatch(self.source): raise ValueError('Invalid value for source')
[docs] class BindOk(base.Frame): """Confirm bind successful This method confirms that the bind was successful. """ __annotations__: typing.Dict[str, object] = {} __slots__: typing.List[str] = [] # AMQ Method Attributes frame_id = 31 # AMQP Frame ID index = 0x0028001F # pamqp Mapping Index name = 'Exchange.BindOk' synchronous = False # Indicates if this is a synchronous AMQP method
[docs] class Unbind(base.Frame): """Unbind an exchange from an exchange This method unbinds an exchange from an exchange. :param ticket: Deprecated, must be ``0`` - Default: ``0`` :param destination: Specifies the name of the destination exchange to unbind. - Default: ``''`` :param source: Specifies the name of the source exchange to unbind. - Default: ``''`` :param routing_key: Routing key of binding - Default: ``''`` :param nowait: Do not send a reply method - Default: ``False`` :param arguments: Arguments of binding - Default: ``{}`` :type arguments: :const:`~pamqp.common.Arguments` :raises ValueError: when an argument fails to validate """ __annotations__: typing.Dict[str, object] = { 'ticket': int, 'destination': str, 'source': str, 'routing_key': str, 'nowait': bool, 'arguments': common.Arguments } __slots__: typing.List[str] = [ # AMQ Method Attributes 'ticket', 'destination', 'source', 'routing_key', 'nowait', 'arguments' ] frame_id = 40 # AMQP Frame ID index = 0x00280028 # pamqp Mapping Index name = 'Exchange.Unbind' synchronous = True # Indicates if this is a synchronous AMQP method # Valid responses to this method valid_responses = ['Exchange.UnbindOk'] # Class Attribute Types for unmarshaling _ticket = 'short' _destination = 'shortstr' _source = 'shortstr' _routing_key = 'shortstr' _nowait = 'bit' _arguments = 'table' def __init__( self, ticket: int = 0, destination: str = '', source: str = '', routing_key: str = '', nowait: bool = False, arguments: typing.Optional[common.Arguments] = None) -> None: """Initialize the :class:`Exchange.Unbind` class""" self.ticket = ticket self.destination = destination self.source = source self.routing_key = routing_key self.nowait = nowait self.arguments = arguments or {} self.validate()
[docs] def validate(self) -> None: """Validate the frame data ensuring all domains or attributes adhere to the protocol specification. :raises ValueError: on validation error """ if self.ticket is not None and self.ticket != 0: raise ValueError('ticket must be 0') if self.destination is not None and len(self.destination) > 127: raise ValueError('Max length exceeded for destination') if self.destination is not None and not constants.DOMAIN_REGEX[ 'exchange-name'].fullmatch(self.destination): raise ValueError('Invalid value for destination') if self.source is not None and len(self.source) > 127: raise ValueError('Max length exceeded for source') if self.source is not None and not constants.DOMAIN_REGEX[ 'exchange-name'].fullmatch(self.source): raise ValueError('Invalid value for source')
[docs] class UnbindOk(base.Frame): """Confirm unbind successful This method confirms that the unbind was successful. """ __annotations__: typing.Dict[str, object] = {} __slots__: typing.List[str] = [] # AMQ Method Attributes frame_id = 51 # AMQP Frame ID index = 0x00280033 # pamqp Mapping Index name = 'Exchange.UnbindOk' synchronous = False # Indicates if this is a synchronous AMQP method
[docs]class Queue: """Work with queues Queues store and forward messages. Queues can be configured in the server or created at runtime. Queues must be attached to at least one exchange in order to receive messages from publishers. """ __slots__: typing.List[str] = [] frame_id = 50 # AMQP Frame ID index = 0x00320000 # pamqp Mapping Index
[docs] class Declare(base.Frame): """Declare queue, create if needed This method creates or checks a queue. When creating a new queue the client can specify various properties that control the durability of the queue and its contents, and the level of sharing for the queue. :param ticket: Deprecated, must be ``0`` - Default: ``0`` :param queue: queue name - Default: ``''`` :param passive: Do not create queue - Default: ``False`` :param durable: Request a durable queue - Default: ``False`` :param exclusive: Request an exclusive queue - Default: ``False`` :param auto_delete: Auto-delete queue when unused - Default: ``False`` :param nowait: Do not send a reply method - Default: ``False`` :param arguments: Arguments for declaration - Default: ``{}`` :type arguments: :const:`~pamqp.common.Arguments` :raises ValueError: when an argument fails to validate """ __annotations__: typing.Dict[str, object] = { 'ticket': int, 'queue': str, 'passive': bool, 'durable': bool, 'exclusive': bool, 'auto_delete': bool, 'nowait': bool, 'arguments': common.Arguments } __slots__: typing.List[str] = [ # AMQ Method Attributes 'ticket', 'queue', 'passive', 'durable', 'exclusive', 'auto_delete', 'nowait', 'arguments' ] frame_id = 10 # AMQP Frame ID index = 0x0032000A # pamqp Mapping Index name = 'Queue.Declare' synchronous = True # Indicates if this is a synchronous AMQP method # Valid responses to this method valid_responses = ['Queue.DeclareOk'] # Class Attribute Types for unmarshaling _ticket = 'short' _queue = 'shortstr' _passive = 'bit' _durable = 'bit' _exclusive = 'bit' _auto_delete = 'bit' _nowait = 'bit' _arguments = 'table' def __init__( self, ticket: int = 0, queue: str = '', passive: bool = False, durable: bool = False, exclusive: bool = False, auto_delete: bool = False, nowait: bool = False, arguments: typing.Optional[common.Arguments] = None) -> None: """Initialize the :class:`Queue.Declare` class""" self.ticket = ticket self.queue = queue self.passive = passive self.durable = durable self.exclusive = exclusive self.auto_delete = auto_delete self.nowait = nowait self.arguments = arguments or {} self.validate()
[docs] def validate(self) -> None: """Validate the frame data ensuring all domains or attributes adhere to the protocol specification. :raises ValueError: on validation error """ if self.ticket is not None and self.ticket != 0: raise ValueError('ticket must be 0') if self.queue is not None and len(self.queue) > 256: raise ValueError('Max length exceeded for queue') if self.queue is not None and not constants.DOMAIN_REGEX[ 'queue-name'].fullmatch(self.queue): raise ValueError('Invalid value for queue')
[docs] class DeclareOk(base.Frame): """Confirms a queue definition This method confirms a Declare method and confirms the name of the queue, essential for automatically-named queues. :param queue: Reports the name of the queue. If the server generated a queue name, this field contains that name. :param message_count: Number of messages in the queue. :param consumer_count: Number of consumers :raises ValueError: when an argument fails to validate """ __annotations__: typing.Dict[str, object] = { 'queue': str, 'message_count': int, 'consumer_count': int } __slots__: typing.List[str] = [ # AMQ Method Attributes 'queue', 'message_count', 'consumer_count' ] frame_id = 11 # AMQP Frame ID index = 0x0032000B # pamqp Mapping Index name = 'Queue.DeclareOk' synchronous = False # Indicates if this is a synchronous AMQP method # Class Attribute Types for unmarshaling _queue = 'shortstr' _message_count = 'long' _consumer_count = 'long' def __init__(self, queue: typing.Optional[str] = None, message_count: typing.Optional[int] = None, consumer_count: typing.Optional[int] = None) -> None: """Initialize the :class:`Queue.DeclareOk` class""" self.queue = queue self.message_count = message_count self.consumer_count = consumer_count self.validate()
[docs] def validate(self) -> None: """Validate the frame data ensuring all domains or attributes adhere to the protocol specification. :raises ValueError: on validation error """ if self.queue is not None and len(self.queue) > 256: raise ValueError('Max length exceeded for queue') if self.queue is not None and not constants.DOMAIN_REGEX[ 'queue-name'].fullmatch(self.queue): raise ValueError('Invalid value for queue')
[docs] class Bind(base.Frame): """Bind queue to an exchange This method binds a queue to an exchange. Until a queue is bound it will not receive any messages. In a classic messaging model, store-and- forward queues are bound to a direct exchange and subscription queues are bound to a topic exchange. :param ticket: Deprecated, must be ``0`` - Default: ``0`` :param queue: Specifies the name of the queue to bind. - Default: ``''`` :param exchange: Name of the exchange to bind to - Default: ``''`` :param routing_key: Message routing key - Default: ``''`` :param nowait: Do not send a reply method - Default: ``False`` :param arguments: Arguments for binding - Default: ``{}`` :type arguments: :const:`~pamqp.common.Arguments` :raises ValueError: when an argument fails to validate """ __annotations__: typing.Dict[str, object] = { 'ticket': int, 'queue': str, 'exchange': str, 'routing_key': str, 'nowait': bool, 'arguments': common.Arguments } __slots__: typing.List[str] = [ # AMQ Method Attributes 'ticket', 'queue', 'exchange', 'routing_key', 'nowait', 'arguments' ] frame_id = 20 # AMQP Frame ID index = 0x00320014 # pamqp Mapping Index name = 'Queue.Bind' synchronous = True # Indicates if this is a synchronous AMQP method valid_responses = ['Queue.BindOk'] # Valid responses to this method # Class Attribute Types for unmarshaling _ticket = 'short' _queue = 'shortstr' _exchange = 'shortstr' _routing_key = 'shortstr' _nowait = 'bit' _arguments = 'table' def __init__( self, ticket: int = 0, queue: str = '', exchange: str = '', routing_key: str = '', nowait: bool = False, arguments: typing.Optional[common.Arguments] = None) -> None: """Initialize the :class:`Queue.Bind` class""" self.ticket = ticket self.queue = queue self.exchange = exchange self.routing_key = routing_key self.nowait = nowait self.arguments = arguments or {} self.validate()
[docs] def validate(self) -> None: """Validate the frame data ensuring all domains or attributes adhere to the protocol specification. :raises ValueError: on validation error """ if self.ticket is not None and self.ticket != 0: raise ValueError('ticket must be 0') if self.queue is not None and len(self.queue) > 256: raise ValueError('Max length exceeded for queue') if self.queue is not None and not constants.DOMAIN_REGEX[ 'queue-name'].fullmatch(self.queue): raise ValueError('Invalid value for queue') if self.exchange is not None and len(self.exchange) > 127: raise ValueError('Max length exceeded for exchange') if self.exchange is not None and not constants.DOMAIN_REGEX[ 'exchange-name'].fullmatch(self.exchange): raise ValueError('Invalid value for exchange')
[docs] class BindOk(base.Frame): """Confirm bind successful This method confirms that the bind was successful. """ __annotations__: typing.Dict[str, object] = {} __slots__: typing.List[str] = [] # AMQ Method Attributes frame_id = 21 # AMQP Frame ID index = 0x00320015 # pamqp Mapping Index name = 'Queue.BindOk' synchronous = False # Indicates if this is a synchronous AMQP method
[docs] class Purge(base.Frame): """Purge a queue This method removes all messages from a queue which are not awaiting acknowledgment. :param ticket: Deprecated, must be ``0`` - Default: ``0`` :param queue: Specifies the name of the queue to purge. - Default: ``''`` :param nowait: Do not send a reply method - Default: ``False`` :raises ValueError: when an argument fails to validate """ __annotations__: typing.Dict[str, object] = { 'ticket': int, 'queue': str, 'nowait': bool } __slots__: typing.List[str] = [ # AMQ Method Attributes 'ticket', 'queue', 'nowait' ] frame_id = 30 # AMQP Frame ID index = 0x0032001E # pamqp Mapping Index name = 'Queue.Purge' synchronous = True # Indicates if this is a synchronous AMQP method valid_responses = ['Queue.PurgeOk'] # Valid responses to this method # Class Attribute Types for unmarshaling _ticket = 'short' _queue = 'shortstr' _nowait = 'bit' def __init__(self, ticket: int = 0, queue: str = '', nowait: bool = False) -> None: """Initialize the :class:`Queue.Purge` class""" self.ticket = ticket self.queue = queue self.nowait = nowait self.validate()
[docs] def validate(self) -> None: """Validate the frame data ensuring all domains or attributes adhere to the protocol specification. :raises ValueError: on validation error """ if self.ticket is not None and self.ticket != 0: raise ValueError('ticket must be 0') if self.queue is not None and len(self.queue) > 256: raise ValueError('Max length exceeded for queue') if self.queue is not None and not constants.DOMAIN_REGEX[ 'queue-name'].fullmatch(self.queue): raise ValueError('Invalid value for queue')
[docs] class PurgeOk(base.Frame): """Confirms a queue purge This method confirms the purge of a queue. :param message_count: Reports the number of messages purged. """ __annotations__: typing.Dict[str, object] = {'message_count': int} __slots__: typing.List[str] = [ # AMQ Method Attributes 'message_count' ] frame_id = 31 # AMQP Frame ID index = 0x0032001F # pamqp Mapping Index name = 'Queue.PurgeOk' synchronous = False # Indicates if this is a synchronous AMQP method # Class Attribute Types for unmarshaling _message_count = 'long' def __init__(self, message_count: typing.Optional[int] = None) -> None: """Initialize the :class:`Queue.PurgeOk` class""" self.message_count = message_count
[docs] class Delete(base.Frame): """Delete a queue This method deletes a queue. When a queue is deleted any pending messages are sent to a dead-letter queue if this is defined in the server configuration, and all consumers on the queue are cancelled. :param ticket: Deprecated, must be ``0`` - Default: ``0`` :param queue: Specifies the name of the queue to delete. - Default: ``''`` :param if_unused: Delete only if unused - Default: ``False`` :param if_empty: Delete only if empty - Default: ``False`` :param nowait: Do not send a reply method - Default: ``False`` :raises ValueError: when an argument fails to validate """ __annotations__: typing.Dict[str, object] = { 'ticket': int, 'queue': str, 'if_unused': bool, 'if_empty': bool, 'nowait': bool } __slots__: typing.List[str] = [ # AMQ Method Attributes 'ticket', 'queue', 'if_unused', 'if_empty', 'nowait' ] frame_id = 40 # AMQP Frame ID index = 0x00320028 # pamqp Mapping Index name = 'Queue.Delete' synchronous = True # Indicates if this is a synchronous AMQP method valid_responses = ['Queue.DeleteOk'] # Valid responses to this method # Class Attribute Types for unmarshaling _ticket = 'short' _queue = 'shortstr' _if_unused = 'bit' _if_empty = 'bit' _nowait = 'bit' def __init__(self, ticket: int = 0, queue: str = '', if_unused: bool = False, if_empty: bool = False, nowait: bool = False) -> None: """Initialize the :class:`Queue.Delete` class""" self.ticket = ticket self.queue = queue self.if_unused = if_unused self.if_empty = if_empty self.nowait = nowait self.validate()
[docs] def validate(self) -> None: """Validate the frame data ensuring all domains or attributes adhere to the protocol specification. :raises ValueError: on validation error """ if self.ticket is not None and self.ticket != 0: raise ValueError('ticket must be 0') if self.queue is not None and len(self.queue) > 256: raise ValueError('Max length exceeded for queue') if self.queue is not None and not constants.DOMAIN_REGEX[ 'queue-name'].fullmatch(self.queue): raise ValueError('Invalid value for queue')
[docs] class DeleteOk(base.Frame): """Confirm deletion of a queue This method confirms the deletion of a queue. :param message_count: Reports the number of messages deleted. """ __annotations__: typing.Dict[str, object] = {'message_count': int} __slots__: typing.List[str] = [ # AMQ Method Attributes 'message_count' ] frame_id = 41 # AMQP Frame ID index = 0x00320029 # pamqp Mapping Index name = 'Queue.DeleteOk' synchronous = False # Indicates if this is a synchronous AMQP method # Class Attribute Types for unmarshaling _message_count = 'long' def __init__(self, message_count: typing.Optional[int] = None) -> None: """Initialize the :class:`Queue.DeleteOk` class""" self.message_count = message_count
[docs] class Unbind(base.Frame): """Unbind a queue from an exchange This method unbinds a queue from an exchange. :param ticket: Deprecated, must be ``0`` - Default: ``0`` :param queue: Specifies the name of the queue to unbind. - Default: ``''`` :param exchange: The name of the exchange to unbind from. - Default: ``''`` :param routing_key: Routing key of binding - Default: ``''`` :param arguments: Arguments of binding - Default: ``{}`` :type arguments: :const:`~pamqp.common.Arguments` :raises ValueError: when an argument fails to validate """ __annotations__: typing.Dict[str, object] = { 'ticket': int, 'queue': str, 'exchange': str, 'routing_key': str, 'arguments': common.Arguments } __slots__: typing.List[str] = [ # AMQ Method Attributes 'ticket', 'queue', 'exchange', 'routing_key', 'arguments' ] frame_id = 50 # AMQP Frame ID index = 0x00320032 # pamqp Mapping Index name = 'Queue.Unbind' synchronous = True # Indicates if this is a synchronous AMQP method valid_responses = ['Queue.UnbindOk'] # Valid responses to this method # Class Attribute Types for unmarshaling _ticket = 'short' _queue = 'shortstr' _exchange = 'shortstr' _routing_key = 'shortstr' _arguments = 'table' def __init__( self, ticket: int = 0, queue: str = '', exchange: str = '', routing_key: str = '', arguments: typing.Optional[common.Arguments] = None) -> None: """Initialize the :class:`Queue.Unbind` class""" self.ticket = ticket self.queue = queue self.exchange = exchange self.routing_key = routing_key self.arguments = arguments or {} self.validate()
[docs] def validate(self) -> None: """Validate the frame data ensuring all domains or attributes adhere to the protocol specification. :raises ValueError: on validation error """ if self.ticket is not None and self.ticket != 0: raise ValueError('ticket must be 0') if self.queue is not None and len(self.queue) > 256: raise ValueError('Max length exceeded for queue') if self.queue is not None and not constants.DOMAIN_REGEX[ 'queue-name'].fullmatch(self.queue): raise ValueError('Invalid value for queue') if self.exchange is not None and len(self.exchange) > 127: raise ValueError('Max length exceeded for exchange') if self.exchange is not None and not constants.DOMAIN_REGEX[ 'exchange-name'].fullmatch(self.exchange): raise ValueError('Invalid value for exchange')
[docs] class UnbindOk(base.Frame): """Confirm unbind successful This method confirms that the unbind was successful. """ __annotations__: typing.Dict[str, object] = {} __slots__: typing.List[str] = [] # AMQ Method Attributes frame_id = 51 # AMQP Frame ID index = 0x00320033 # pamqp Mapping Index name = 'Queue.UnbindOk' synchronous = False # Indicates if this is a synchronous AMQP method
[docs]class Basic: """Work with basic content The Basic class provides methods that support an industry-standard messaging model. """ __slots__: typing.List[str] = [] frame_id = 60 # AMQP Frame ID index = 0x003C0000 # pamqp Mapping Index
[docs] class Qos(base.Frame): """Specify quality of service This method requests a specific quality of service. The QoS can be specified for the current channel or for all channels on the connection. The particular properties and semantics of a qos method always depend on the content class semantics. Though the qos method could in principle apply to both peers, it is currently meaningful only for the server. :param prefetch_size: Prefetch window in octets - Default: ``0`` :param prefetch_count: Prefetch window in messages - Default: ``0`` :param global_: Apply to entire connection - Default: ``False`` """ __annotations__: typing.Dict[str, object] = { 'prefetch_size': int, 'prefetch_count': int, 'global_': bool } __slots__: typing.List[str] = [ # AMQ Method Attributes 'prefetch_size', 'prefetch_count', 'global_' ] frame_id = 10 # AMQP Frame ID index = 0x003C000A # pamqp Mapping Index name = 'Basic.Qos' synchronous = True # Indicates if this is a synchronous AMQP method valid_responses = ['Basic.QosOk'] # Valid responses to this method # Class Attribute Types for unmarshaling _prefetch_size = 'long' _prefetch_count = 'short' _global_ = 'bit' def __init__(self, prefetch_size: int = 0, prefetch_count: int = 0, global_: bool = False) -> None: """Initialize the :class:`Basic.Qos` class""" self.prefetch_size = prefetch_size self.prefetch_count = prefetch_count self.global_ = global_
[docs] class QosOk(base.Frame): """Confirm the requested qos This method tells the client that the requested QoS levels could be handled by the server. The requested QoS applies to all active consumers until a new QoS is defined. """ __annotations__: typing.Dict[str, object] = {} __slots__: typing.List[str] = [] # AMQ Method Attributes frame_id = 11 # AMQP Frame ID index = 0x003C000B # pamqp Mapping Index name = 'Basic.QosOk' synchronous = False # Indicates if this is a synchronous AMQP method
[docs] class Consume(base.Frame): """Start a queue consumer This method asks the server to start a "consumer", which is a transient request for messages from a specific queue. Consumers last as long as the channel they were declared on, or until the client cancels them. :param ticket: Deprecated, must be ``0`` - Default: ``0`` :param queue: Specifies the name of the queue to consume from. - Default: ``''`` :param consumer_tag: Specifies the identifier for the consumer. The consumer tag is local to a channel, so two clients can use the same consumer tags. If this field is empty the server will generate a unique tag. - Default: ``''`` :param no_local: Do not deliver own messages - Default: ``False`` :param no_ack: No acknowledgement needed - Default: ``False`` :param exclusive: Request exclusive access - Default: ``False`` :param nowait: Do not send a reply method - Default: ``False`` :param arguments: Arguments for declaration - Default: ``{}`` :type arguments: :const:`~pamqp.common.Arguments` :raises ValueError: when an argument fails to validate """ __annotations__: typing.Dict[str, object] = { 'ticket': int, 'queue': str, 'consumer_tag': str, 'no_local': bool, 'no_ack': bool, 'exclusive': bool, 'nowait': bool, 'arguments': common.Arguments } __slots__: typing.List[str] = [ # AMQ Method Attributes 'ticket', 'queue', 'consumer_tag', 'no_local', 'no_ack', 'exclusive', 'nowait', 'arguments' ] frame_id = 20 # AMQP Frame ID index = 0x003C0014 # pamqp Mapping Index name = 'Basic.Consume' synchronous = True # Indicates if this is a synchronous AMQP method # Valid responses to this method valid_responses = ['Basic.ConsumeOk'] # Class Attribute Types for unmarshaling _ticket = 'short' _queue = 'shortstr' _consumer_tag = 'shortstr' _no_local = 'bit' _no_ack = 'bit' _exclusive = 'bit' _nowait = 'bit' _arguments = 'table' def __init__( self, ticket: int = 0, queue: str = '', consumer_tag: str = '', no_local: bool = False, no_ack: bool = False, exclusive: bool = False, nowait: bool = False, arguments: typing.Optional[common.Arguments] = None) -> None: """Initialize the :class:`Basic.Consume` class""" self.ticket = ticket self.queue = queue self.consumer_tag = consumer_tag self.no_local = no_local self.no_ack = no_ack self.exclusive = exclusive self.nowait = nowait self.arguments = arguments or {} self.validate()
[docs] def validate(self) -> None: """Validate the frame data ensuring all domains or attributes adhere to the protocol specification. :raises ValueError: on validation error """ if self.ticket is not None and self.ticket != 0: raise ValueError('ticket must be 0') if self.queue is not None and len(self.queue) > 256: raise ValueError('Max length exceeded for queue') if self.queue is not None and not constants.DOMAIN_REGEX[ 'queue-name'].fullmatch(self.queue): raise ValueError('Invalid value for queue')
[docs] class ConsumeOk(base.Frame): """Confirm a new consumer The server provides the client with a consumer tag, which is used by the client for methods called on the consumer at a later stage. :param consumer_tag: Holds the consumer tag specified by the client or provided by the server. """ __annotations__: typing.Dict[str, object] = {'consumer_tag': str} __slots__: typing.List[str] = [ # AMQ Method Attributes 'consumer_tag' ] frame_id = 21 # AMQP Frame ID index = 0x003C0015 # pamqp Mapping Index name = 'Basic.ConsumeOk' synchronous = False # Indicates if this is a synchronous AMQP method # Class Attribute Types for unmarshaling _consumer_tag = 'shortstr' def __init__(self, consumer_tag: typing.Optional[str] = None) -> None: """Initialize the :class:`Basic.ConsumeOk` class""" self.consumer_tag = consumer_tag
[docs] class Cancel(base.Frame): """End a queue consumer This method cancels a consumer. This does not affect already delivered messages, but it does mean the server will not send any more messages for that consumer. The client may receive an arbitrary number of messages in between sending the cancel method and receiving the cancel- ok reply. It may also be sent from the server to the client in the event of the consumer being unexpectedly cancelled (i.e. cancelled for any reason other than the server receiving the corresponding basic.cancel from the client). This allows clients to be notified of the loss of consumers due to events such as queue deletion. Note that as it is not a MUST for clients to accept this method from the server, it is advisable for the broker to be able to identify those clients that are capable of accepting the method, through some means of capability negotiation. :param consumer_tag: Consumer tag :param nowait: Do not send a reply method - Default: ``False`` """ __annotations__: typing.Dict[str, object] = { 'consumer_tag': str, 'nowait': bool } __slots__: typing.List[str] = [ # AMQ Method Attributes 'consumer_tag', 'nowait' ] frame_id = 30 # AMQP Frame ID index = 0x003C001E # pamqp Mapping Index name = 'Basic.Cancel' synchronous = True # Indicates if this is a synchronous AMQP method valid_responses = ['Basic.CancelOk'] # Valid responses to this method # Class Attribute Types for unmarshaling _consumer_tag = 'shortstr' _nowait = 'bit' def __init__(self, consumer_tag: typing.Optional[str] = None, nowait: bool = False) -> None: """Initialize the :class:`Basic.Cancel` class""" self.consumer_tag = consumer_tag self.nowait = nowait or False
[docs] class CancelOk(base.Frame): """Confirm a cancelled consumer This method confirms that the cancellation was completed. :param consumer_tag: Consumer tag """ __annotations__: typing.Dict[str, object] = {'consumer_tag': str} __slots__: typing.List[str] = [ # AMQ Method Attributes 'consumer_tag' ] frame_id = 31 # AMQP Frame ID index = 0x003C001F # pamqp Mapping Index name = 'Basic.CancelOk' synchronous = False # Indicates if this is a synchronous AMQP method # Class Attribute Types for unmarshaling _consumer_tag = 'shortstr' def __init__(self, consumer_tag: typing.Optional[str] = None) -> None: """Initialize the :class:`Basic.CancelOk` class""" self.consumer_tag = consumer_tag
[docs] class Publish(base.Frame): """Publish a message This method publishes a message to a specific exchange. The message will be routed to queues as defined by the exchange configuration and distributed to any active consumers when the transaction, if any, is committed. :param ticket: Deprecated, must be ``0`` - Default: ``0`` :param exchange: Specifies the name of the exchange to publish to. The exchange name can be empty, meaning the default exchange. If the exchange name is specified, and that exchange does not exist, the server will raise a channel exception. - Default: ``''`` :param routing_key: Message routing key - Default: ``''`` :param mandatory: Indicate mandatory routing - Default: ``False`` :param immediate: Request immediate delivery - Default: ``False`` :raises ValueError: when an argument fails to validate """ __annotations__: typing.Dict[str, object] = { 'ticket': int, 'exchange': str, 'routing_key': str, 'mandatory': bool, 'immediate': bool } __slots__: typing.List[str] = [ # AMQ Method Attributes 'ticket', 'exchange', 'routing_key', 'mandatory', 'immediate' ] frame_id = 40 # AMQP Frame ID index = 0x003C0028 # pamqp Mapping Index name = 'Basic.Publish' synchronous = False # Indicates if this is a synchronous AMQP method # Class Attribute Types for unmarshaling _ticket = 'short' _exchange = 'shortstr' _routing_key = 'shortstr' _mandatory = 'bit' _immediate = 'bit' def __init__(self, ticket: int = 0, exchange: str = '', routing_key: str = '', mandatory: bool = False, immediate: bool = False) -> None: """Initialize the :class:`Basic.Publish` class""" self.ticket = ticket self.exchange = exchange self.routing_key = routing_key self.mandatory = mandatory self.immediate = immediate self.validate()
[docs] def validate(self) -> None: """Validate the frame data ensuring all domains or attributes adhere to the protocol specification. :raises ValueError: on validation error """ if self.ticket is not None and self.ticket != 0: raise ValueError('ticket must be 0') if self.exchange is not None and len(self.exchange) > 127: raise ValueError('Max length exceeded for exchange') if self.exchange is not None and not constants.DOMAIN_REGEX[ 'exchange-name'].fullmatch(self.exchange): raise ValueError('Invalid value for exchange')
[docs] class Return(base.Frame): """Return a failed message This method returns an undeliverable message that was published with the "immediate" flag set, or an unroutable message published with the "mandatory" flag set. The reply code and text provide information about the reason that the message was undeliverable. :param reply_code: Reply code from server :param reply_text: Localised reply text - Default: ``''`` :param exchange: Specifies the name of the exchange that the message was originally published to. May be empty, meaning the default exchange. - Default: ``''`` :param routing_key: Message routing key :raises ValueError: when an argument fails to validate """ __annotations__: typing.Dict[str, object] = { 'reply_code': int, 'reply_text': str, 'exchange': str, 'routing_key': str } __slots__: typing.List[str] = [ # AMQ Method Attributes 'reply_code', 'reply_text', 'exchange', 'routing_key' ] frame_id = 50 # AMQP Frame ID index = 0x003C0032 # pamqp Mapping Index name = 'Basic.Return' synchronous = False # Indicates if this is a synchronous AMQP method # Class Attribute Types for unmarshaling _reply_code = 'short' _reply_text = 'shortstr' _exchange = 'shortstr' _routing_key = 'shortstr' def __init__(self, reply_code: typing.Optional[int] = None, reply_text: str = '', exchange: str = '', routing_key: typing.Optional[str] = None) -> None: """Initialize the :class:`Basic.Return` class""" self.reply_code = reply_code self.reply_text = reply_text or '' self.exchange = exchange or '' self.routing_key = routing_key self.validate()
[docs] def validate(self) -> None: """Validate the frame data ensuring all domains or attributes adhere to the protocol specification. :raises ValueError: on validation error """ if self.exchange is not None and len(self.exchange) > 127: raise ValueError('Max length exceeded for exchange') if self.exchange is not None and not constants.DOMAIN_REGEX[ 'exchange-name'].fullmatch(self.exchange): raise ValueError('Invalid value for exchange')
[docs] class Deliver(base.Frame): """Notify the client of a consumer message This method delivers a message to the client, via a consumer. In the asynchronous message delivery model, the client starts a consumer using the Consume method, then the server responds with Deliver methods as and when messages arrive for that consumer. :param consumer_tag: Consumer tag :param delivery_tag: Server-assigned delivery tag :param redelivered: Message is being redelivered - Default: ``False`` :param exchange: Specifies the name of the exchange that the message was originally published to. May be empty, indicating the default exchange. - Default: ``''`` :param routing_key: Message routing key :raises ValueError: when an argument fails to validate """ __annotations__: typing.Dict[str, object] = { 'consumer_tag': str, 'delivery_tag': int, 'redelivered': bool, 'exchange': str, 'routing_key': str } __slots__: typing.List[str] = [ # AMQ Method Attributes 'consumer_tag', 'delivery_tag', 'redelivered', 'exchange', 'routing_key' ] frame_id = 60 # AMQP Frame ID index = 0x003C003C # pamqp Mapping Index name = 'Basic.Deliver' synchronous = False # Indicates if this is a synchronous AMQP method # Class Attribute Types for unmarshaling _consumer_tag = 'shortstr' _delivery_tag = 'longlong' _redelivered = 'bit' _exchange = 'shortstr' _routing_key = 'shortstr' def __init__(self, consumer_tag: typing.Optional[str] = None, delivery_tag: typing.Optional[int] = None, redelivered: bool = False, exchange: str = '', routing_key: typing.Optional[str] = None) -> None: """Initialize the :class:`Basic.Deliver` class""" self.consumer_tag = consumer_tag self.delivery_tag = delivery_tag self.redelivered = redelivered or False self.exchange = exchange or '' self.routing_key = routing_key self.validate()
[docs] def validate(self) -> None: """Validate the frame data ensuring all domains or attributes adhere to the protocol specification. :raises ValueError: on validation error """ if self.exchange is not None and len(self.exchange) > 127: raise ValueError('Max length exceeded for exchange') if self.exchange is not None and not constants.DOMAIN_REGEX[ 'exchange-name'].fullmatch(self.exchange): raise ValueError('Invalid value for exchange')
[docs] class Get(base.Frame): """Direct access to a queue This method provides a direct access to the messages in a queue using a synchronous dialogue that is designed for specific types of application where synchronous functionality is more important than performance. :param ticket: Deprecated, must be ``0`` - Default: ``0`` :param queue: Specifies the name of the queue to get a message from. - Default: ``''`` :param no_ack: No acknowledgement needed - Default: ``False`` :raises ValueError: when an argument fails to validate """ __annotations__: typing.Dict[str, object] = { 'ticket': int, 'queue': str, 'no_ack': bool } __slots__: typing.List[str] = [ # AMQ Method Attributes 'ticket', 'queue', 'no_ack' ] frame_id = 70 # AMQP Frame ID index = 0x003C0046 # pamqp Mapping Index name = 'Basic.Get' synchronous = True # Indicates if this is a synchronous AMQP method # Valid responses to this method valid_responses = ['Basic.GetOk', 'Basic.GetEmpty'] # Class Attribute Types for unmarshaling _ticket = 'short' _queue = 'shortstr' _no_ack = 'bit' def __init__(self, ticket: int = 0, queue: str = '', no_ack: bool = False) -> None: """Initialize the :class:`Basic.Get` class""" self.ticket = ticket self.queue = queue self.no_ack = no_ack self.validate()
[docs] def validate(self) -> None: """Validate the frame data ensuring all domains or attributes adhere to the protocol specification. :raises ValueError: on validation error """ if self.ticket is not None and self.ticket != 0: raise ValueError('ticket must be 0') if self.queue is not None and len(self.queue) > 256: raise ValueError('Max length exceeded for queue') if self.queue is not None and not constants.DOMAIN_REGEX[ 'queue-name'].fullmatch(self.queue): raise ValueError('Invalid value for queue')
[docs] class GetOk(base.Frame): """Provide client with a message This method delivers a message to the client following a get method. A message delivered by 'get-ok' must be acknowledged unless the no-ack option was set in the get method. :param delivery_tag: Server-assigned delivery tag :param redelivered: Message is being redelivered - Default: ``False`` :param exchange: Specifies the name of the exchange that the message was originally published to. If empty, the message was published to the default exchange. - Default: ``''`` :param routing_key: Message routing key :param message_count: Number of messages in the queue. :raises ValueError: when an argument fails to validate """ __annotations__: typing.Dict[str, object] = { 'delivery_tag': int, 'redelivered': bool, 'exchange': str, 'routing_key': str, 'message_count': int } __slots__: typing.List[str] = [ # AMQ Method Attributes 'delivery_tag', 'redelivered', 'exchange', 'routing_key', 'message_count' ] frame_id = 71 # AMQP Frame ID index = 0x003C0047 # pamqp Mapping Index name = 'Basic.GetOk' synchronous = False # Indicates if this is a synchronous AMQP method # Class Attribute Types for unmarshaling _delivery_tag = 'longlong' _redelivered = 'bit' _exchange = 'shortstr' _routing_key = 'shortstr' _message_count = 'long' def __init__(self, delivery_tag: typing.Optional[int] = None, redelivered: bool = False, exchange: str = '', routing_key: typing.Optional[str] = None, message_count: typing.Optional[int] = None) -> None: """Initialize the :class:`Basic.GetOk` class""" self.delivery_tag = delivery_tag self.redelivered = redelivered or False self.exchange = exchange or '' self.routing_key = routing_key self.message_count = message_count self.validate()
[docs] def validate(self) -> None: """Validate the frame data ensuring all domains or attributes adhere to the protocol specification. :raises ValueError: on validation error """ if self.exchange is not None and len(self.exchange) > 127: raise ValueError('Max length exceeded for exchange') if self.exchange is not None and not constants.DOMAIN_REGEX[ 'exchange-name'].fullmatch(self.exchange): raise ValueError('Invalid value for exchange')
[docs] class GetEmpty(base.Frame): """Indicate no messages available This method tells the client that the queue has no messages available for the client. :param cluster_id: Deprecated, must be empty - Default: ``''`` """ __annotations__: typing.Dict[str, object] = {'cluster_id': str} __slots__: typing.List[str] = [ # AMQ Method Attributes 'cluster_id' ] frame_id = 72 # AMQP Frame ID index = 0x003C0048 # pamqp Mapping Index name = 'Basic.GetEmpty' synchronous = False # Indicates if this is a synchronous AMQP method # Class Attribute Types for unmarshaling _cluster_id = 'shortstr' def __init__(self, cluster_id: str = '') -> None: """Initialize the :class:`Basic.GetEmpty` class""" self.cluster_id = cluster_id self.validate()
[docs] def validate(self) -> None: """Validate the frame data ensuring all domains or attributes adhere to the protocol specification. :raises ValueError: on validation error """ if self.cluster_id is not None and self.cluster_id != '': raise ValueError('cluster_id must be empty')
[docs] class Ack(base.Frame): """Acknowledge one or more messages When sent by the client, this method acknowledges one or more messages delivered via the Deliver or Get-Ok methods. When sent by server, this method acknowledges one or more messages published with the Publish method on a channel in confirm mode. The acknowledgement can be for a single message or a set of messages up to and including a specific message. :param delivery_tag: Server-assigned delivery tag - Default: ``0`` :param multiple: Acknowledge multiple messages - Default: ``False`` """ __annotations__: typing.Dict[str, object] = { 'delivery_tag': int, 'multiple': bool } __slots__: typing.List[str] = [ # AMQ Method Attributes 'delivery_tag', 'multiple' ] frame_id = 80 # AMQP Frame ID index = 0x003C0050 # pamqp Mapping Index name = 'Basic.Ack' synchronous = False # Indicates if this is a synchronous AMQP method # Class Attribute Types for unmarshaling _delivery_tag = 'longlong' _multiple = 'bit' def __init__(self, delivery_tag: int = 0, multiple: bool = False) -> None: """Initialize the :class:`Basic.Ack` class""" self.delivery_tag = delivery_tag self.multiple = multiple
[docs] class Reject(base.Frame): """Reject an incoming message This method allows a client to reject a message. It can be used to interrupt and cancel large incoming messages, or return untreatable messages to their original queue. :param delivery_tag: Server-assigned delivery tag :param requeue: Requeue the message - Default: ``True`` """ __annotations__: typing.Dict[str, object] = { 'delivery_tag': int, 'requeue': bool } __slots__: typing.List[str] = [ # AMQ Method Attributes 'delivery_tag', 'requeue' ] frame_id = 90 # AMQP Frame ID index = 0x003C005A # pamqp Mapping Index name = 'Basic.Reject' synchronous = False # Indicates if this is a synchronous AMQP method # Class Attribute Types for unmarshaling _delivery_tag = 'longlong' _requeue = 'bit' def __init__(self, delivery_tag: typing.Optional[int] = None, requeue: bool = True) -> None: """Initialize the :class:`Basic.Reject` class""" self.delivery_tag = delivery_tag self.requeue = requeue
[docs] class RecoverAsync(base.Frame): """Redeliver unacknowledged messages This method asks the server to redeliver all unacknowledged messages on a specified channel. Zero or more messages may be redelivered. This method is deprecated in favour of the synchronous Recover/Recover-Ok. .. deprecated:: This command is deprecated in AMQP 0-9-1 :param requeue: Requeue the message - Default: ``False`` """ __annotations__: typing.Dict[str, object] = {'requeue': bool} __slots__: typing.List[str] = [ # AMQ Method Attributes 'requeue' ] frame_id = 100 # AMQP Frame ID index = 0x003C0064 # pamqp Mapping Index name = 'Basic.RecoverAsync' synchronous = False # Indicates if this is a synchronous AMQP method # Class Attribute Types for unmarshaling _requeue = 'bit' def __init__(self, requeue: bool = False) -> None: """Initialize the :class:`Basic.RecoverAsync` class""" self.requeue = requeue warnings.warn(constants.DEPRECATION_WARNING, category=DeprecationWarning)
[docs] class Recover(base.Frame): """Redeliver unacknowledged messages This method asks the server to redeliver all unacknowledged messages on a specified channel. Zero or more messages may be redelivered. This method replaces the asynchronous Recover. :param requeue: Requeue the message - Default: ``False`` """ __annotations__: typing.Dict[str, object] = {'requeue': bool} __slots__: typing.List[str] = [ # AMQ Method Attributes 'requeue' ] frame_id = 110 # AMQP Frame ID index = 0x003C006E # pamqp Mapping Index name = 'Basic.Recover' synchronous = True # Indicates if this is a synchronous AMQP method # Valid responses to this method valid_responses = ['Basic.RecoverOk'] # Class Attribute Types for unmarshaling _requeue = 'bit' def __init__(self, requeue: bool = False) -> None: """Initialize the :class:`Basic.Recover` class""" self.requeue = requeue
[docs] class RecoverOk(base.Frame): """Confirm recovery This method acknowledges a :class:`Basic.Recover` method. """ __annotations__: typing.Dict[str, object] = {} __slots__: typing.List[str] = [] # AMQ Method Attributes frame_id = 111 # AMQP Frame ID index = 0x003C006F # pamqp Mapping Index name = 'Basic.RecoverOk' synchronous = False # Indicates if this is a synchronous AMQP method
[docs] class Nack(base.Frame): """Reject one or more incoming messages This method allows a client to reject one or more incoming messages. It can be used to interrupt and cancel large incoming messages, or return untreatable messages to their original queue. This method is also used by the server to inform publishers on channels in confirm mode of unhandled messages. If a publisher receives this method, it probably needs to republish the offending messages. :param delivery_tag: Server-assigned delivery tag - Default: ``0`` :param multiple: Reject multiple messages - Default: ``False`` :param requeue: Requeue the message - Default: ``True`` """ __annotations__: typing.Dict[str, object] = { 'delivery_tag': int, 'multiple': bool, 'requeue': bool } __slots__: typing.List[str] = [ # AMQ Method Attributes 'delivery_tag', 'multiple', 'requeue' ] frame_id = 120 # AMQP Frame ID index = 0x003C0078 # pamqp Mapping Index name = 'Basic.Nack' synchronous = False # Indicates if this is a synchronous AMQP method # Class Attribute Types for unmarshaling _delivery_tag = 'longlong' _multiple = 'bit' _requeue = 'bit' def __init__(self, delivery_tag: int = 0, multiple: bool = False, requeue: bool = True) -> None: """Initialize the :class:`Basic.Nack` class""" self.delivery_tag = delivery_tag self.multiple = multiple self.requeue = requeue
[docs] class Properties(base.BasicProperties): """Content Properties .. Note:: The AMQP property type is named ``message_type`` as to not conflict with the Python ``type`` keyword :param content_type: MIME content type :param content_encoding: MIME content encoding :param headers: Message header field table :type headers: typing.Optional[:const:`~pamqp.common.FieldTable`] :param delivery_mode: Non-persistent (1) or persistent (2) :param priority: Message priority, 0 to 9 :param correlation_id: Application correlation identifier :param reply_to: Address to reply to :param expiration: Message expiration specification :param message_id: Application message identifier :param timestamp: Message timestamp :param message_type: Message type name :param user_id: Creating user id :param app_id: Creating application id :param cluster_id: Deprecated, must be empty :raises: ValueError """ __annotations__: typing.Dict[str, object] = { 'content_type': str, 'content_encoding': str, 'headers': common.FieldTable, 'delivery_mode': int, 'priority': int, 'correlation_id': str, 'reply_to': str, 'expiration': str, 'message_id': str, 'timestamp': datetime.datetime, 'message_type': str, 'user_id': str, 'app_id': str, 'cluster_id': str } __slots__: typing.List[str] = [ # AMQ Properties Attributes 'content_type', 'content_encoding', 'headers', 'delivery_mode', 'priority', 'correlation_id', 'reply_to', 'expiration', 'message_id', 'timestamp', 'message_type', 'user_id', 'app_id', 'cluster_id' ] # Flag values for marshaling / unmarshaling flags = { 'content_type': 32768, 'content_encoding': 16384, 'headers': 8192, 'delivery_mode': 4096, 'priority': 2048, 'correlation_id': 1024, 'reply_to': 512, 'expiration': 256, 'message_id': 128, 'timestamp': 64, 'message_type': 32, 'user_id': 16, 'app_id': 8, 'cluster_id': 4 } frame_id = 60 # AMQP Frame ID index = 0x003C # pamqp Mapping Index name = 'Basic.Properties' # Class Attribute Types for unmarshaling _content_type = 'shortstr' _content_encoding = 'shortstr' _headers = 'table' _delivery_mode = 'octet' _priority = 'octet' _correlation_id = 'shortstr' _reply_to = 'shortstr' _expiration = 'shortstr' _message_id = 'shortstr' _timestamp = 'timestamp' _message_type = 'shortstr' _user_id = 'shortstr' _app_id = 'shortstr' _cluster_id = 'shortstr' def __init__(self, content_type: typing.Optional[str] = None, content_encoding: typing.Optional[str] = None, headers: typing.Optional[common.FieldTable] = None, delivery_mode: typing.Optional[int] = None, priority: typing.Optional[int] = None, correlation_id: typing.Optional[str] = None, reply_to: typing.Optional[str] = None, expiration: typing.Optional[str] = None, message_id: typing.Optional[str] = None, timestamp: typing.Optional[datetime.datetime] = None, message_type: typing.Optional[str] = None, user_id: typing.Optional[str] = None, app_id: typing.Optional[str] = None, cluster_id: str = '') -> None: """Initialize the Basic.Properties class""" self.content_type = content_type self.content_encoding = content_encoding self.headers = headers self.delivery_mode = delivery_mode self.priority = priority self.correlation_id = correlation_id self.reply_to = reply_to self.expiration = expiration self.message_id = message_id self.timestamp = timestamp self.message_type = message_type self.user_id = user_id self.app_id = app_id self.cluster_id = cluster_id self.validate()
[docs]class Tx: """Work with transactions The Tx class allows publish and ack operations to be batched into atomic units of work. The intention is that all publish and ack requests issued within a transaction will complete successfully or none of them will. Servers SHOULD implement atomic transactions at least where all publish or ack requests affect a single queue. Transactions that cover multiple queues may be non-atomic, given that queues can be created and destroyed asynchronously, and such events do not form part of any transaction. Further, the behaviour of transactions with respect to the immediate and mandatory flags on :class:`Basic.Publish` methods is not defined. """ __slots__: typing.List[str] = [] frame_id = 90 # AMQP Frame ID index = 0x005A0000 # pamqp Mapping Index
[docs] class Select(base.Frame): """Select standard transaction mode This method sets the channel to use standard transactions. The client must use this method at least once on a channel before using the Commit or Rollback methods. """ __annotations__: typing.Dict[str, object] = {} __slots__: typing.List[str] = [] # AMQ Method Attributes frame_id = 10 # AMQP Frame ID index = 0x005A000A # pamqp Mapping Index name = 'Tx.Select' synchronous = True # Indicates if this is a synchronous AMQP method valid_responses = ['Tx.SelectOk'] # Valid responses to this method
[docs] class SelectOk(base.Frame): """Confirm transaction mode This method confirms to the client that the channel was successfully set to use standard transactions. """ __annotations__: typing.Dict[str, object] = {} __slots__: typing.List[str] = [] # AMQ Method Attributes frame_id = 11 # AMQP Frame ID index = 0x005A000B # pamqp Mapping Index name = 'Tx.SelectOk' synchronous = False # Indicates if this is a synchronous AMQP method
[docs] class Commit(base.Frame): """Commit the current transaction This method commits all message publications and acknowledgments performed in the current transaction. A new transaction starts immediately after a commit. """ __annotations__: typing.Dict[str, object] = {} __slots__: typing.List[str] = [] # AMQ Method Attributes frame_id = 20 # AMQP Frame ID index = 0x005A0014 # pamqp Mapping Index name = 'Tx.Commit' synchronous = True # Indicates if this is a synchronous AMQP method valid_responses = ['Tx.CommitOk'] # Valid responses to this method
[docs] class CommitOk(base.Frame): """Confirm a successful commit This method confirms to the client that the commit succeeded. Note that if a commit fails, the server raises a channel exception. """ __annotations__: typing.Dict[str, object] = {} __slots__: typing.List[str] = [] # AMQ Method Attributes frame_id = 21 # AMQP Frame ID index = 0x005A0015 # pamqp Mapping Index name = 'Tx.CommitOk' synchronous = False # Indicates if this is a synchronous AMQP method
[docs] class Rollback(base.Frame): """Abandon the current transaction This method abandons all message publications and acknowledgments performed in the current transaction. A new transaction starts immediately after a rollback. Note that unacked messages will not be automatically redelivered by rollback; if that is required an explicit recover call should be issued. """ __annotations__: typing.Dict[str, object] = {} __slots__: typing.List[str] = [] # AMQ Method Attributes frame_id = 30 # AMQP Frame ID index = 0x005A001E # pamqp Mapping Index name = 'Tx.Rollback' synchronous = True # Indicates if this is a synchronous AMQP method valid_responses = ['Tx.RollbackOk'] # Valid responses to this method
[docs] class RollbackOk(base.Frame): """Confirm successful rollback This method confirms to the client that the rollback succeeded. Note that if an rollback fails, the server raises a channel exception. """ __annotations__: typing.Dict[str, object] = {} __slots__: typing.List[str] = [] # AMQ Method Attributes frame_id = 31 # AMQP Frame ID index = 0x005A001F # pamqp Mapping Index name = 'Tx.RollbackOk' synchronous = False # Indicates if this is a synchronous AMQP method
[docs]class Confirm: """Work with confirms The Confirm class allows publishers to put the channel in confirm mode and subsequently be notified when messages have been handled by the broker. The intention is that all messages published on a channel in confirm mode will be acknowledged at some point. By acknowledging a message the broker assumes responsibility for it and indicates that it has done something it deems reasonable with it. Unroutable mandatory or immediate messages are acknowledged right after the :class:`Basic.Return` method. Messages are acknowledged when all queues to which the message has been routed have either delivered the message and received an acknowledgement (if required), or enqueued the message (and persisted it if required). Published messages are assigned ascending sequence numbers, starting at 1 with the first :class:`Confirm.Select` method. The server confirms messages by sending :class:`Basic.Ack` methods referring to these sequence numbers. """ __slots__: typing.List[str] = [] frame_id = 85 # AMQP Frame ID index = 0x00550000 # pamqp Mapping Index
[docs] class Select(base.Frame): """Select confirm mode (i.e. enable publisher acknowledgements) This method sets the channel to use publisher acknowledgements. The client can only use this method on a non-transactional channel. :param nowait: Do not send a reply method - Default: ``False`` """ __annotations__: typing.Dict[str, object] = {'nowait': bool} __slots__: typing.List[str] = [ # AMQ Method Attributes 'nowait' ] frame_id = 10 # AMQP Frame ID index = 0x0055000A # pamqp Mapping Index name = 'Confirm.Select' synchronous = True # Indicates if this is a synchronous AMQP method # Valid responses to this method valid_responses = ['Confirm.SelectOk'] # Class Attribute Types for unmarshaling _nowait = 'bit' def __init__(self, nowait: bool = False) -> None: """Initialize the :class:`Confirm.Select` class""" self.nowait = nowait
[docs] class SelectOk(base.Frame): """Acknowledge confirm mode This method confirms to the client that the channel was successfully set to use publisher acknowledgements. """ __annotations__: typing.Dict[str, object] = {} __slots__: typing.List[str] = [] # AMQ Method Attributes frame_id = 11 # AMQP Frame ID index = 0x0055000B # pamqp Mapping Index name = 'Confirm.SelectOk' synchronous = False # Indicates if this is a synchronous AMQP method
# AMQP Class.Method Index Mapping INDEX_MAPPING = { 0x000A000A: Connection.Start, 0x000A000B: Connection.StartOk, 0x000A0014: Connection.Secure, 0x000A0015: Connection.SecureOk, 0x000A001E: Connection.Tune, 0x000A001F: Connection.TuneOk, 0x000A0028: Connection.Open, 0x000A0029: Connection.OpenOk, 0x000A0032: Connection.Close, 0x000A0033: Connection.CloseOk, 0x000A003C: Connection.Blocked, 0x000A003D: Connection.Unblocked, 0x000A0046: Connection.UpdateSecret, 0x000A0047: Connection.UpdateSecretOk, 0x0014000A: Channel.Open, 0x0014000B: Channel.OpenOk, 0x00140014: Channel.Flow, 0x00140015: Channel.FlowOk, 0x00140028: Channel.Close, 0x00140029: Channel.CloseOk, 0x0028000A: Exchange.Declare, 0x0028000B: Exchange.DeclareOk, 0x00280014: Exchange.Delete, 0x00280015: Exchange.DeleteOk, 0x0028001E: Exchange.Bind, 0x0028001F: Exchange.BindOk, 0x00280028: Exchange.Unbind, 0x00280033: Exchange.UnbindOk, 0x0032000A: Queue.Declare, 0x0032000B: Queue.DeclareOk, 0x00320014: Queue.Bind, 0x00320015: Queue.BindOk, 0x0032001E: Queue.Purge, 0x0032001F: Queue.PurgeOk, 0x00320028: Queue.Delete, 0x00320029: Queue.DeleteOk, 0x00320032: Queue.Unbind, 0x00320033: Queue.UnbindOk, 0x003C000A: Basic.Qos, 0x003C000B: Basic.QosOk, 0x003C0014: Basic.Consume, 0x003C0015: Basic.ConsumeOk, 0x003C001E: Basic.Cancel, 0x003C001F: Basic.CancelOk, 0x003C0028: Basic.Publish, 0x003C0032: Basic.Return, 0x003C003C: Basic.Deliver, 0x003C0046: Basic.Get, 0x003C0047: Basic.GetOk, 0x003C0048: Basic.GetEmpty, 0x003C0050: Basic.Ack, 0x003C005A: Basic.Reject, 0x003C0064: Basic.RecoverAsync, 0x003C006E: Basic.Recover, 0x003C006F: Basic.RecoverOk, 0x003C0078: Basic.Nack, 0x005A000A: Tx.Select, 0x005A000B: Tx.SelectOk, 0x005A0014: Tx.Commit, 0x005A0015: Tx.CommitOk, 0x005A001E: Tx.Rollback, 0x005A001F: Tx.RollbackOk, 0x0055000A: Confirm.Select, 0x0055000B: Confirm.SelectOk }