"""
The classes inside :mod:`pamqp.commands` allow for the automatic marshaling
and unmarshaling of AMQP method frames and
:class:`Basic.Properties <pamqp.commands.Basic.Properties>`. In addition the
command classes contain information that designates if they are synchronous
commands and, if so, what the expected responses are. Each command's
arguments are detailed in the class and are listed in the attributes property.
.. note:: All AMQ classes and methods extend :class:`pamqp.base.Frame`.
"""
# Auto-generated, do not edit this file.
import datetime
import typing
import warnings
from pamqp import base, common, constants
class Connection:
    """Namespace for the AMQP ``connection`` class of method frames.

    Groups the frames a client uses to establish a network connection to a
    server, and that both peers use to operate the connection afterwards.

    """
    __slots__: typing.List[str] = []

    # AMQP Frame ID
    frame_id = 10
    # pamqp Mapping Index
    index = 0x000A0000

    class Start(base.Frame):
        """Begin connection negotiation.

        Tells the client which protocol version the server proposes and
        lists the security mechanisms the client may use to authenticate.

        :param version_major: Protocol major version
            - Default: ``0``
        :param version_minor: Protocol minor version
            - Default: ``9``
        :param server_properties: Server properties
            - Default: ``{}``
        :type server_properties: :const:`~pamqp.common.FieldTable`
        :param mechanisms: Available security mechanisms
            - Default: ``PLAIN``
        :param locales: Available message locales
            - Default: ``en_US``

        """
        __annotations__: typing.Dict[str, object] = {
            'version_major': int,
            'version_minor': int,
            'server_properties': common.FieldTable,
            'mechanisms': str,
            'locales': str
        }
        # AMQ Method Attributes
        __slots__: typing.List[str] = [
            'version_major',
            'version_minor',
            'server_properties',
            'mechanisms',
            'locales'
        ]

        # AMQP Frame ID
        frame_id = 10
        # pamqp Mapping Index
        index = 0x000A000A
        name = 'Connection.Start'
        # Indicates if this is a synchronous AMQP method
        synchronous = True
        # Valid responses to this method
        valid_responses = ['Connection.StartOk']

        # Class Attribute Types for unmarshaling
        _version_major = 'octet'
        _version_minor = 'octet'
        _server_properties = 'table'
        _mechanisms = 'longstr'
        _locales = 'longstr'

        def __init__(
            self,
            version_major: int = 0,
            version_minor: int = 9,
            server_properties: typing.Optional[common.FieldTable] = None,
            mechanisms: str = 'PLAIN',
            locales: str = 'en_US'
        ) -> None:
            """Store the supplied arguments as the frame's attributes."""
            self.version_major = version_major
            self.version_minor = version_minor
            # A falsy value (``None`` or an empty table) becomes a new dict
            self.server_properties = (
                server_properties if server_properties else {})
            self.mechanisms = mechanisms
            self.locales = locales

    class StartOk(base.Frame):
        """Choose the security mechanism and locale.

        This method selects a SASL security mechanism.

        :param client_properties: Client properties
            - Default: ``{}``
        :type client_properties: :const:`~pamqp.common.FieldTable`
        :param mechanism: Selected security mechanism
            - Default: ``PLAIN``
        :param response: Security response data
            - Default: ``''``
        :param locale: Selected message locale
            - Default: ``en_US``

        """
        __annotations__: typing.Dict[str, object] = {
            'client_properties': common.FieldTable,
            'mechanism': str,
            'response': str,
            'locale': str
        }
        # AMQ Method Attributes
        __slots__: typing.List[str] = [
            'client_properties',
            'mechanism',
            'response',
            'locale'
        ]

        # AMQP Frame ID
        frame_id = 11
        # pamqp Mapping Index
        index = 0x000A000B
        name = 'Connection.StartOk'
        # Indicates if this is a synchronous AMQP method
        synchronous = False

        # Class Attribute Types for unmarshaling
        _client_properties = 'table'
        _mechanism = 'shortstr'
        _response = 'longstr'
        _locale = 'shortstr'

        def __init__(
            self,
            client_properties: typing.Optional[common.FieldTable] = None,
            mechanism: str = 'PLAIN',
            response: str = '',
            locale: str = 'en_US'
        ) -> None:
            """Store the supplied arguments as the frame's attributes."""
            # A falsy value (``None`` or an empty table) becomes a new dict
            self.client_properties = (
                client_properties if client_properties else {})
            self.mechanism = mechanism
            self.response = response
            self.locale = locale

    class Secure(base.Frame):
        """Issue a security mechanism challenge.

        SASL works by exchanging challenges and responses until both peers
        hold enough information to authenticate each other. This method
        challenges the client to provide more information.

        :param challenge: Security challenge data

        """
        __annotations__: typing.Dict[str, object] = {'challenge': str}
        # AMQ Method Attributes
        __slots__: typing.List[str] = ['challenge']

        # AMQP Frame ID
        frame_id = 20
        # pamqp Mapping Index
        index = 0x000A0014
        name = 'Connection.Secure'
        # Indicates if this is a synchronous AMQP method
        synchronous = True
        # Valid responses to this method
        valid_responses = ['Connection.SecureOk']

        # Class Attribute Types for unmarshaling
        _challenge = 'longstr'

        def __init__(self, challenge: typing.Optional[str] = None) -> None:
            """Store the supplied challenge on the frame."""
            self.challenge = challenge

    class SecureOk(base.Frame):
        """Answer a security mechanism challenge.

        Attempts to authenticate by passing a block of SASL data for the
        security mechanism at the server side.

        :param response: Security response data

        """
        __annotations__: typing.Dict[str, object] = {'response': str}
        # AMQ Method Attributes
        __slots__: typing.List[str] = ['response']

        # AMQP Frame ID
        frame_id = 21
        # pamqp Mapping Index
        index = 0x000A0015
        name = 'Connection.SecureOk'
        # Indicates if this is a synchronous AMQP method
        synchronous = False

        # Class Attribute Types for unmarshaling
        _response = 'longstr'

        def __init__(self, response: typing.Optional[str] = None) -> None:
            """Store the supplied response on the frame."""
            self.response = response

    class Tune(base.Frame):
        """Propose connection tuning parameters.

        Proposes a set of connection configuration values to the client,
        which can accept and/or adjust them.

        :param channel_max: Proposed maximum channels
            - Default: ``0``
        :param frame_max: Proposed maximum frame size
            - Default: ``0``
        :param heartbeat: Desired heartbeat delay
            - Default: ``0``

        """
        __annotations__: typing.Dict[str, object] = {
            'channel_max': int,
            'frame_max': int,
            'heartbeat': int
        }
        # AMQ Method Attributes
        __slots__: typing.List[str] = [
            'channel_max',
            'frame_max',
            'heartbeat'
        ]

        # AMQP Frame ID
        frame_id = 30
        # pamqp Mapping Index
        index = 0x000A001E
        name = 'Connection.Tune'
        # Indicates if this is a synchronous AMQP method
        synchronous = True
        # Valid responses to this method
        valid_responses = ['Connection.TuneOk']

        # Class Attribute Types for unmarshaling
        _channel_max = 'short'
        _frame_max = 'long'
        _heartbeat = 'short'

        def __init__(
            self,
            channel_max: int = 0,
            frame_max: int = 0,
            heartbeat: int = 0
        ) -> None:
            """Store the supplied arguments as the frame's attributes."""
            self.channel_max = channel_max
            self.frame_max = frame_max
            self.heartbeat = heartbeat

    class TuneOk(base.Frame):
        """Negotiate connection tuning parameters.

        Sends the client's connection tuning parameters to the server.
        Certain fields are negotiated, others provide capability
        information.

        :param channel_max: Negotiated maximum channels
            - Default: ``0``
        :param frame_max: Negotiated maximum frame size
            - Default: ``0``
        :param heartbeat: Desired heartbeat delay
            - Default: ``0``

        """
        __annotations__: typing.Dict[str, object] = {
            'channel_max': int,
            'frame_max': int,
            'heartbeat': int
        }
        # AMQ Method Attributes
        __slots__: typing.List[str] = [
            'channel_max',
            'frame_max',
            'heartbeat'
        ]

        # AMQP Frame ID
        frame_id = 31
        # pamqp Mapping Index
        index = 0x000A001F
        name = 'Connection.TuneOk'
        # Indicates if this is a synchronous AMQP method
        synchronous = False

        # Class Attribute Types for unmarshaling
        _channel_max = 'short'
        _frame_max = 'long'
        _heartbeat = 'short'

        def __init__(
            self,
            channel_max: int = 0,
            frame_max: int = 0,
            heartbeat: int = 0
        ) -> None:
            """Store the supplied arguments as the frame's attributes."""
            self.channel_max = channel_max
            self.frame_max = frame_max
            self.heartbeat = heartbeat

    class Open(base.Frame):
        """Open a connection to a virtual host.

        A virtual host is a collection of resources and acts to separate
        multiple application domains within a server. The server may apply
        arbitrary limits per virtual host, such as the number of each type
        of entity that may be used, per connection and/or in total.

        :param virtual_host: Virtual host name
            - Default: ``/``
        :param capabilities: Deprecated, must be empty
            - Default: ``''``
        :param insist: Deprecated, must be ``False``
            - Default: ``False``
        :raises ValueError: when an argument fails to validate

        """
        __annotations__: typing.Dict[str, object] = {
            'virtual_host': str,
            'capabilities': str,
            'insist': bool
        }
        # AMQ Method Attributes
        __slots__: typing.List[str] = [
            'virtual_host',
            'capabilities',
            'insist'
        ]

        # AMQP Frame ID
        frame_id = 40
        # pamqp Mapping Index
        index = 0x000A0028
        name = 'Connection.Open'
        # Indicates if this is a synchronous AMQP method
        synchronous = True
        # Valid responses to this method
        valid_responses = ['Connection.OpenOk']

        # Class Attribute Types for unmarshaling
        _virtual_host = 'shortstr'
        _capabilities = 'shortstr'
        _insist = 'bit'

        def __init__(
            self,
            virtual_host: str = '/',
            capabilities: str = '',
            insist: bool = False
        ) -> None:
            """Store the supplied arguments, then validate them."""
            self.virtual_host = virtual_host
            self.capabilities = capabilities
            self.insist = insist
            self.validate()

        def validate(self) -> None:
            """Check the frame's attributes against the protocol rules.

            ``None`` values are skipped by every check.

            :raises ValueError: on validation error

            """
            vhost = self.virtual_host
            if vhost is not None and len(vhost) > 127:
                raise ValueError('Max length exceeded for virtual_host')
            capabilities = self.capabilities
            if capabilities is not None and capabilities != '':
                raise ValueError('capabilities must be empty')
            insist = self.insist
            # Identity check: only the ``False`` singleton is accepted
            if insist is not None and insist is not False:
                raise ValueError('insist must be False')

    class OpenOk(base.Frame):
        """Signal that the connection is ready.

        Tells the client that the connection is ready for use.

        :param known_hosts: Deprecated, must be empty
            - Default: ``''``

        """
        __annotations__: typing.Dict[str, object] = {'known_hosts': str}
        # AMQ Method Attributes
        __slots__: typing.List[str] = ['known_hosts']

        # AMQP Frame ID
        frame_id = 41
        # pamqp Mapping Index
        index = 0x000A0029
        name = 'Connection.OpenOk'
        # Indicates if this is a synchronous AMQP method
        synchronous = False

        # Class Attribute Types for unmarshaling
        _known_hosts = 'shortstr'

        def __init__(self, known_hosts: str = '') -> None:
            """Store the supplied argument, then validate it."""
            self.known_hosts = known_hosts
            self.validate()

        def validate(self) -> None:
            """Check the frame's attributes against the protocol rules.

            ``None`` is skipped by the check.

            :raises ValueError: on validation error

            """
            known_hosts = self.known_hosts
            if known_hosts is not None and known_hosts != '':
                raise ValueError('known_hosts must be empty')

    class Close(base.Frame):
        """Request that the connection be closed.

        Indicates that the sender wants to close the connection. This may
        be due to internal conditions (e.g. a forced shut-down) or due to
        an error handling a specific method, i.e. an exception. When a
        close is due to an exception, the sender provides the class and
        method id of the method which caused the exception.

        :param reply_code: Reply code from server
        :param reply_text: Localised reply text
            - Default: ``''``
        :param class_id: Failing method class
        :param method_id: Failing method ID

        """
        __annotations__: typing.Dict[str, object] = {
            'reply_code': int,
            'reply_text': str,
            'class_id': int,
            'method_id': int
        }
        # AMQ Method Attributes
        __slots__: typing.List[str] = [
            'reply_code',
            'reply_text',
            'class_id',
            'method_id'
        ]

        # AMQP Frame ID
        frame_id = 50
        # pamqp Mapping Index
        index = 0x000A0032
        name = 'Connection.Close'
        # Indicates if this is a synchronous AMQP method
        synchronous = True
        # Valid responses to this method
        valid_responses = ['Connection.CloseOk']

        # Class Attribute Types for unmarshaling
        _reply_code = 'short'
        _reply_text = 'shortstr'
        _class_id = 'short'
        _method_id = 'short'

        def __init__(
            self,
            reply_code: typing.Optional[int] = None,
            reply_text: str = '',
            class_id: typing.Optional[int] = None,
            method_id: typing.Optional[int] = None
        ) -> None:
            """Store the supplied arguments as the frame's attributes."""
            self.reply_code = reply_code
            # Any falsy reply text (including ``None``) becomes ``''``
            self.reply_text = reply_text if reply_text else ''
            self.class_id = class_id
            self.method_id = method_id

    class CloseOk(base.Frame):
        """Confirm that the connection is closing.

        Confirms a :class:`Connection.Close` method and tells the recipient
        that it is safe to release resources for the connection and close
        the socket.

        """
        __annotations__: typing.Dict[str, object] = {}
        # AMQ Method Attributes
        __slots__: typing.List[str] = []

        # AMQP Frame ID
        frame_id = 51
        # pamqp Mapping Index
        index = 0x000A0033
        name = 'Connection.CloseOk'
        # Indicates if this is a synchronous AMQP method
        synchronous = False

    class Blocked(base.Frame):
        """Report that the connection is blocked.

        Indicates that a connection has been blocked and does not accept
        new publishes.

        :param reason: Block reason
            - Default: ``''``

        """
        __annotations__: typing.Dict[str, object] = {'reason': str}
        # AMQ Method Attributes
        __slots__: typing.List[str] = ['reason']

        # AMQP Frame ID
        frame_id = 60
        # pamqp Mapping Index
        index = 0x000A003C
        name = 'Connection.Blocked'
        # Indicates if this is a synchronous AMQP method
        synchronous = False

        # Class Attribute Types for unmarshaling
        _reason = 'shortstr'

        def __init__(self, reason: str = '') -> None:
            """Store the supplied reason on the frame."""
            self.reason = reason

    class Unblocked(base.Frame):
        """Report that the connection is unblocked.

        Indicates that a connection has been unblocked and now accepts
        publishes.

        """
        __annotations__: typing.Dict[str, object] = {}
        # AMQ Method Attributes
        __slots__: typing.List[str] = []

        # AMQP Frame ID
        frame_id = 61
        # pamqp Mapping Index
        index = 0x000A003D
        name = 'Connection.Unblocked'
        # Indicates if this is a synchronous AMQP method
        synchronous = False

    class UpdateSecret(base.Frame):
        """Update the connection's secret.

        Updates the secret used to authenticate this connection. It is
        used when secrets have an expiration date and need to be renewed,
        like OAuth 2 tokens.

        :param new_secret: New secret
        :param reason: Reason

        """
        __annotations__: typing.Dict[str, object] = {
            'new_secret': str,
            'reason': str
        }
        # AMQ Method Attributes
        __slots__: typing.List[str] = ['new_secret', 'reason']

        # AMQP Frame ID
        frame_id = 70
        # pamqp Mapping Index
        index = 0x000A0046
        name = 'Connection.UpdateSecret'
        # Indicates if this is a synchronous AMQP method
        synchronous = True
        # Valid responses to this method
        valid_responses = ['Connection.UpdateSecretOk']

        # Class Attribute Types for unmarshaling
        _new_secret = 'longstr'
        _reason = 'shortstr'

        def __init__(
            self,
            new_secret: typing.Optional[str] = None,
            reason: typing.Optional[str] = None
        ) -> None:
            """Store the supplied arguments as the frame's attributes."""
            self.new_secret = new_secret
            self.reason = reason

    class UpdateSecretOk(base.Frame):
        """Respond to a secret update.

        Confirms the updated secret is valid.

        """
        __annotations__: typing.Dict[str, object] = {}
        # AMQ Method Attributes
        __slots__: typing.List[str] = []

        # AMQP Frame ID
        frame_id = 71
        # pamqp Mapping Index
        index = 0x000A0047
        name = 'Connection.UpdateSecretOk'
        # Indicates if this is a synchronous AMQP method
        synchronous = False
class Channel:
    """Namespace for the AMQP ``channel`` class of method frames.

    Groups the frames a client uses to establish a channel to a server,
    and that both peers use to operate the channel afterwards.

    """
    __slots__: typing.List[str] = []

    # AMQP Frame ID
    frame_id = 20
    # pamqp Mapping Index
    index = 0x00140000

    class Open(base.Frame):
        """Open a channel for use.

        This method opens a channel to the server.

        :param out_of_band: Protocol level field, do not use, must be ``0``.
            - Default: ``0``

        """
        __annotations__: typing.Dict[str, object] = {'out_of_band': str}
        # AMQ Method Attributes
        __slots__: typing.List[str] = ['out_of_band']

        # AMQP Frame ID
        frame_id = 10
        # pamqp Mapping Index
        index = 0x0014000A
        name = 'Channel.Open'
        # Indicates if this is a synchronous AMQP method
        synchronous = True
        # Valid responses to this method
        valid_responses = ['Channel.OpenOk']

        # Class Attribute Types for unmarshaling
        _out_of_band = 'shortstr'

        def __init__(self, out_of_band: str = '0') -> None:
            """Store the supplied argument, then validate it."""
            self.out_of_band = out_of_band
            self.validate()

        def validate(self) -> None:
            """Check the frame's attributes against the protocol rules.

            ``None`` is skipped by the check.

            :raises ValueError: on validation error

            """
            out_of_band = self.out_of_band
            # The value is compared against the string '0', not the integer
            if out_of_band is not None and out_of_band != '0':
                raise ValueError('out_of_band must be 0')

    class OpenOk(base.Frame):
        """Signal that the channel is ready.

        Tells the client that the channel is ready for use.

        :param channel_id: Deprecated, must be ``0``
            - Default: ``0``

        """
        __annotations__: typing.Dict[str, object] = {'channel_id': str}
        # AMQ Method Attributes
        __slots__: typing.List[str] = ['channel_id']

        # AMQP Frame ID
        frame_id = 11
        # pamqp Mapping Index
        index = 0x0014000B
        name = 'Channel.OpenOk'
        # Indicates if this is a synchronous AMQP method
        synchronous = False

        # Class Attribute Types for unmarshaling
        _channel_id = 'longstr'

        def __init__(self, channel_id: str = '0') -> None:
            """Store the supplied argument, then validate it."""
            self.channel_id = channel_id
            self.validate()

        def validate(self) -> None:
            """Check the frame's attributes against the protocol rules.

            ``None`` is skipped by the check.

            :raises ValueError: on validation error

            """
            channel_id = self.channel_id
            # The value is compared against the string '0', not the integer
            if channel_id is not None and channel_id != '0':
                raise ValueError('channel_id must be 0')

    class Flow(base.Frame):
        """Enable or disable flow from the peer.

        Asks the peer to pause or restart the flow of content data sent by
        a consumer. This is a simple flow-control mechanism that a peer can
        use to avoid overflowing its queues or otherwise finding itself
        receiving more messages than it can process. Note that this method
        is not intended for window control. It does not affect contents
        returned by :class:`Basic.GetOk` methods.

        :param active: Start/stop content frames

        """
        __annotations__: typing.Dict[str, object] = {'active': bool}
        # AMQ Method Attributes
        __slots__: typing.List[str] = ['active']

        # AMQP Frame ID
        frame_id = 20
        # pamqp Mapping Index
        index = 0x00140014
        name = 'Channel.Flow'
        # Indicates if this is a synchronous AMQP method
        synchronous = True
        # Valid responses to this method
        valid_responses = ['Channel.FlowOk']

        # Class Attribute Types for unmarshaling
        _active = 'bit'

        def __init__(self, active: typing.Optional[bool] = None) -> None:
            """Store the supplied flag on the frame."""
            self.active = active

    class FlowOk(base.Frame):
        """Confirm a flow method.

        Confirms to the peer that a flow command was received and
        processed.

        :param active: Current flow setting

        """
        __annotations__: typing.Dict[str, object] = {'active': bool}
        # AMQ Method Attributes
        __slots__: typing.List[str] = ['active']

        # AMQP Frame ID
        frame_id = 21
        # pamqp Mapping Index
        index = 0x00140015
        name = 'Channel.FlowOk'
        # Indicates if this is a synchronous AMQP method
        synchronous = False

        # Class Attribute Types for unmarshaling
        _active = 'bit'

        def __init__(self, active: typing.Optional[bool] = None) -> None:
            """Store the supplied flag on the frame."""
            self.active = active

    class Close(base.Frame):
        """Request that the channel be closed.

        Indicates that the sender wants to close the channel. This may be
        due to internal conditions (e.g. a forced shut-down) or due to an
        error handling a specific method, i.e. an exception. When a close
        is due to an exception, the sender provides the class and method id
        of the method which caused the exception.

        :param reply_code: Reply code from server
        :param reply_text: Localised reply text
            - Default: ``''``
        :param class_id: Failing method class
        :param method_id: Failing method ID

        """
        __annotations__: typing.Dict[str, object] = {
            'reply_code': int,
            'reply_text': str,
            'class_id': int,
            'method_id': int
        }
        # AMQ Method Attributes
        __slots__: typing.List[str] = [
            'reply_code',
            'reply_text',
            'class_id',
            'method_id'
        ]

        # AMQP Frame ID
        frame_id = 40
        # pamqp Mapping Index
        index = 0x00140028
        name = 'Channel.Close'
        # Indicates if this is a synchronous AMQP method
        synchronous = True
        # Valid responses to this method
        valid_responses = ['Channel.CloseOk']

        # Class Attribute Types for unmarshaling
        _reply_code = 'short'
        _reply_text = 'shortstr'
        _class_id = 'short'
        _method_id = 'short'

        def __init__(
            self,
            reply_code: typing.Optional[int] = None,
            reply_text: str = '',
            class_id: typing.Optional[int] = None,
            method_id: typing.Optional[int] = None
        ) -> None:
            """Store the supplied arguments as the frame's attributes."""
            self.reply_code = reply_code
            # Any falsy reply text (including ``None``) becomes ``''``
            self.reply_text = reply_text if reply_text else ''
            self.class_id = class_id
            self.method_id = method_id

    class CloseOk(base.Frame):
        """Confirm that the channel is closing.

        Confirms a :class:`Channel.Close` method and tells the recipient
        that it is safe to release resources for the channel.

        """
        __annotations__: typing.Dict[str, object] = {}
        # AMQ Method Attributes
        __slots__: typing.List[str] = []

        # AMQP Frame ID
        frame_id = 41
        # pamqp Mapping Index
        index = 0x00140029
        name = 'Channel.CloseOk'
        # Indicates if this is a synchronous AMQP method
        synchronous = False
[docs]class Exchange:
"""Work with exchanges
Exchanges match and distribute messages across queues. Exchanges can be
configured in the server or declared at runtime.
"""
__slots__: typing.List[str] = []
frame_id = 40 # AMQP Frame ID
index = 0x00280000 # pamqp Mapping Index
[docs] class Declare(base.Frame):
"""Verify exchange exists, create if needed
This method creates an exchange if it does not already exist, and if
the exchange exists, verifies that it is of the correct and expected
class.
.. note:: The AMQP type argument is referred to as "exchange_type" to
not conflict with the Python type keyword.
:param ticket: Deprecated, must be ``0``
- Default: ``0``
:param exchange: exchange name
- Default: ``''``
:param exchange_type: Exchange type
- Default: ``direct``
:param passive: Do not create exchange
- Default: ``False``
:param durable: Request a durable exchange
- Default: ``False``
:param auto_delete: Auto-delete when unused
- Default: ``False``
:param internal: Create internal exchange
- Default: ``False``
:param nowait: Do not send a reply method
- Default: ``False``
:param arguments: Arguments for declaration
- Default: ``{}``
:type arguments: :const:`~pamqp.common.Arguments`
:raises ValueError: when an argument fails to validate
"""
__annotations__: typing.Dict[str, object] = {
'ticket': int,
'exchange': str,
'exchange_type': str,
'passive': bool,
'durable': bool,
'auto_delete': bool,
'internal': bool,
'nowait': bool,
'arguments': common.Arguments
}
__slots__: typing.List[str] = [ # AMQ Method Attributes
'ticket', 'exchange', 'exchange_type', 'passive', 'durable',
'auto_delete', 'internal', 'nowait', 'arguments'
]
frame_id = 10 # AMQP Frame ID
index = 0x0028000A # pamqp Mapping Index
name = 'Exchange.Declare'
synchronous = True # Indicates if this is a synchronous AMQP method
# Valid responses to this method
valid_responses = ['Exchange.DeclareOk']
# Class Attribute Types for unmarshaling
_ticket = 'short'
_exchange = 'shortstr'
_exchange_type = 'shortstr'
_passive = 'bit'
_durable = 'bit'
_auto_delete = 'bit'
_internal = 'bit'
_nowait = 'bit'
_arguments = 'table'
def __init__(
self,
ticket: int = 0,
exchange: str = '',
exchange_type: str = 'direct',
passive: bool = False,
durable: bool = False,
auto_delete: bool = False,
internal: bool = False,
nowait: bool = False,
arguments: typing.Optional[common.Arguments] = None) -> None:
"""Initialize the :class:`Exchange.Declare` class"""
self.ticket = ticket
self.exchange = exchange
self.exchange_type = exchange_type
self.passive = passive
self.durable = durable
self.auto_delete = auto_delete
self.internal = internal
self.nowait = nowait
self.arguments = arguments or {}
self.validate()
[docs] def validate(self) -> None:
"""Validate the frame data ensuring all domains or attributes
adhere to the protocol specification.
:raises ValueError: on validation error
"""
if self.ticket is not None and self.ticket != 0:
raise ValueError('ticket must be 0')
if self.exchange is not None and len(self.exchange) > 127:
raise ValueError('Max length exceeded for exchange')
if self.exchange is not None and not constants.DOMAIN_REGEX[
'exchange-name'].fullmatch(self.exchange):
raise ValueError('Invalid value for exchange')
if self.internal is not None and self.internal is not False:
raise ValueError('internal must be False')
[docs] class DeclareOk(base.Frame):
"""Confirm exchange declaration
This method confirms a Declare method and confirms the name of the
exchange, essential for automatically-named exchanges.
"""
__annotations__: typing.Dict[str, object] = {}
__slots__: typing.List[str] = [] # AMQ Method Attributes
frame_id = 11 # AMQP Frame ID
index = 0x0028000B # pamqp Mapping Index
name = 'Exchange.DeclareOk'
synchronous = False # Indicates if this is a synchronous AMQP method
[docs] class Delete(base.Frame):
"""Delete an exchange
This method deletes an exchange. When an exchange is deleted all queue
bindings on the exchange are cancelled.
:param ticket: Deprecated, must be ``0``
- Default: ``0``
:param exchange: exchange name
- Default: ``''``
:param if_unused: Delete only if unused
- Default: ``False``
:param nowait: Do not send a reply method
- Default: ``False``
:raises ValueError: when an argument fails to validate
"""
__annotations__: typing.Dict[str, object] = {
'ticket': int,
'exchange': str,
'if_unused': bool,
'nowait': bool
}
__slots__: typing.List[str] = [ # AMQ Method Attributes
'ticket', 'exchange', 'if_unused', 'nowait'
]
frame_id = 20 # AMQP Frame ID
index = 0x00280014 # pamqp Mapping Index
name = 'Exchange.Delete'
synchronous = True # Indicates if this is a synchronous AMQP method
# Valid responses to this method
valid_responses = ['Exchange.DeleteOk']
# Class Attribute Types for unmarshaling
_ticket = 'short'
_exchange = 'shortstr'
_if_unused = 'bit'
_nowait = 'bit'
def __init__(self,
ticket: int = 0,
exchange: str = '',
if_unused: bool = False,
nowait: bool = False) -> None:
"""Initialize the :class:`Exchange.Delete` class"""
self.ticket = ticket
self.exchange = exchange
self.if_unused = if_unused
self.nowait = nowait
self.validate()
[docs] def validate(self) -> None:
"""Validate the frame data ensuring all domains or attributes
adhere to the protocol specification.
:raises ValueError: on validation error
"""
if self.ticket is not None and self.ticket != 0:
raise ValueError('ticket must be 0')
if self.exchange is not None and len(self.exchange) > 127:
raise ValueError('Max length exceeded for exchange')
if self.exchange is not None and not constants.DOMAIN_REGEX[
'exchange-name'].fullmatch(self.exchange):
raise ValueError('Invalid value for exchange')
[docs] class DeleteOk(base.Frame):
"""Confirm deletion of an exchange
This method confirms the deletion of an exchange.
"""
__annotations__: typing.Dict[str, object] = {}
__slots__: typing.List[str] = [] # AMQ Method Attributes
frame_id = 21 # AMQP Frame ID
index = 0x00280015 # pamqp Mapping Index
name = 'Exchange.DeleteOk'
synchronous = False # Indicates if this is a synchronous AMQP method
[docs] class Bind(base.Frame):
"""Bind exchange to an exchange
This method binds an exchange to an exchange.
:param ticket: Deprecated, must be ``0``
- Default: ``0``
:param destination: Name of the destination exchange to bind to
- Default: ``''``
:param source: Name of the source exchange to bind to
- Default: ``''``
:param routing_key: Message routing key
- Default: ``''``
:param nowait: Do not send a reply method
- Default: ``False``
:param arguments: Arguments for binding
- Default: ``{}``
:type arguments: :const:`~pamqp.common.Arguments`
:raises ValueError: when an argument fails to validate
"""
__annotations__: typing.Dict[str, object] = {
'ticket': int,
'destination': str,
'source': str,
'routing_key': str,
'nowait': bool,
'arguments': common.Arguments
}
__slots__: typing.List[str] = [ # AMQ Method Attributes
'ticket', 'destination', 'source', 'routing_key', 'nowait',
'arguments'
]
frame_id = 30 # AMQP Frame ID
index = 0x0028001E # pamqp Mapping Index
name = 'Exchange.Bind'
synchronous = True # Indicates if this is a synchronous AMQP method
# Valid responses to this method
valid_responses = ['Exchange.BindOk']
# Class Attribute Types for unmarshaling
_ticket = 'short'
_destination = 'shortstr'
_source = 'shortstr'
_routing_key = 'shortstr'
_nowait = 'bit'
_arguments = 'table'
def __init__(
self,
ticket: int = 0,
destination: str = '',
source: str = '',
routing_key: str = '',
nowait: bool = False,
arguments: typing.Optional[common.Arguments] = None) -> None:
"""Initialize the :class:`Exchange.Bind` class"""
self.ticket = ticket
self.destination = destination
self.source = source
self.routing_key = routing_key
self.nowait = nowait
self.arguments = arguments or {}
self.validate()
[docs] def validate(self) -> None:
"""Validate the frame data ensuring all domains or attributes
adhere to the protocol specification.
:raises ValueError: on validation error
"""
if self.ticket is not None and self.ticket != 0:
raise ValueError('ticket must be 0')
if self.destination is not None and len(self.destination) > 127:
raise ValueError('Max length exceeded for destination')
if self.destination is not None and not constants.DOMAIN_REGEX[
'exchange-name'].fullmatch(self.destination):
raise ValueError('Invalid value for destination')
if self.source is not None and len(self.source) > 127:
raise ValueError('Max length exceeded for source')
if self.source is not None and not constants.DOMAIN_REGEX[
'exchange-name'].fullmatch(self.source):
raise ValueError('Invalid value for source')
[docs] class BindOk(base.Frame):
"""Confirm bind successful
This method confirms that the bind was successful.
"""
__annotations__: typing.Dict[str, object] = {}
__slots__: typing.List[str] = [] # AMQ Method Attributes
frame_id = 31 # AMQP Frame ID
index = 0x0028001F # pamqp Mapping Index
name = 'Exchange.BindOk'
synchronous = False # Indicates if this is a synchronous AMQP method
class Unbind(base.Frame):
    """Unbind an exchange from an exchange

    Removes the binding of one exchange to another exchange.

    :param ticket: Deprecated, must be ``0``
        - Default: ``0``
    :param destination: Name of the destination exchange to unbind.
        - Default: ``''``
    :param source: Name of the source exchange to unbind.
        - Default: ``''``
    :param routing_key: Routing key of binding
        - Default: ``''``
    :param nowait: Do not send a reply method
        - Default: ``False``
    :param arguments: Arguments of binding
        - Default: ``{}``
    :type arguments: :const:`~pamqp.common.Arguments`
    :raises ValueError: when an argument fails to validate

    """
    __annotations__: typing.Dict[str, object] = {
        'ticket': int,
        'destination': str,
        'source': str,
        'routing_key': str,
        'nowait': bool,
        'arguments': common.Arguments
    }
    __slots__: typing.List[str] = [
        # Attributes of the AMQ method
        'ticket',
        'destination',
        'source',
        'routing_key',
        'nowait',
        'arguments'
    ]
    frame_id = 40  # Frame ID as defined by AMQP
    index = 0x00280028  # Mapping index used by pamqp
    name = 'Exchange.Unbind'
    synchronous = True  # This is a synchronous AMQP method
    # Methods accepted as a response to this one
    valid_responses = ['Exchange.UnbindOk']
    # Data type of each attribute, used when unmarshaling
    _ticket = 'short'
    _destination = 'shortstr'
    _source = 'shortstr'
    _routing_key = 'shortstr'
    _nowait = 'bit'
    _arguments = 'table'

    def __init__(
            self,
            ticket: int = 0,
            destination: str = '',
            source: str = '',
            routing_key: str = '',
            nowait: bool = False,
            arguments: typing.Optional[common.Arguments] = None
    ) -> None:
        """Assign the :class:`Exchange.Unbind` attributes and validate"""
        self.ticket = ticket
        self.destination = destination
        self.source = source
        self.routing_key = routing_key
        self.nowait = nowait
        # A missing (or empty) arguments value becomes a new empty dict
        self.arguments = arguments if arguments else {}
        self.validate()

    def validate(self) -> None:
        """Check the frame values against the protocol specification.

        ``ticket`` has to be ``0``. ``destination`` and ``source`` are
        each limited to a length of 127 and must fully match the
        ``exchange-name`` domain pattern. Attributes set to ``None`` are
        not checked.

        :raises ValueError: on validation error

        """
        ticket = self.ticket
        if ticket is not None and ticket != 0:
            raise ValueError('ticket must be 0')

        destination = self.destination
        if destination is not None:
            if len(destination) > 127:
                raise ValueError('Max length exceeded for destination')
            pattern = constants.DOMAIN_REGEX['exchange-name']
            if not pattern.fullmatch(destination):
                raise ValueError('Invalid value for destination')

        source = self.source
        if source is not None:
            if len(source) > 127:
                raise ValueError('Max length exceeded for source')
            pattern = constants.DOMAIN_REGEX['exchange-name']
            if not pattern.fullmatch(source):
                raise ValueError('Invalid value for source')
class UnbindOk(base.Frame):
    """Confirm unbind successful

    Sent to confirm that the unbind was successful. The frame carries no
    arguments.

    """
    __annotations__: typing.Dict[str, object] = {}
    __slots__: typing.List[str] = []  # This method has no attributes
    frame_id = 51  # Frame ID as defined by AMQP
    index = 0x00280033  # Mapping index used by pamqp
    name = 'Exchange.UnbindOk'
    synchronous = False  # This is not a synchronous AMQP method
class Queue:
    """Work with queues

    Queues hold messages and pass them on. They may be configured in the
    server or created at runtime, and have to be attached to at least one
    exchange before they can receive messages from publishers.

    """
    __slots__: typing.List[str] = []
    frame_id = 50  # Frame ID as defined by AMQP
    index = 0x00320000  # Mapping index used by pamqp

    class Declare(base.Frame):
        """Declare queue, create if needed

        Creates a queue or checks one. On creation the client may give
        various properties controlling the durability of the queue and
        its contents, and the level of sharing for the queue.

        :param ticket: Deprecated, must be ``0``
            - Default: ``0``
        :param queue: queue name
            - Default: ``''``
        :param passive: Do not create queue
            - Default: ``False``
        :param durable: Request a durable queue
            - Default: ``False``
        :param exclusive: Request an exclusive queue
            - Default: ``False``
        :param auto_delete: Auto-delete queue when unused
            - Default: ``False``
        :param nowait: Do not send a reply method
            - Default: ``False``
        :param arguments: Arguments for declaration
            - Default: ``{}``
        :type arguments: :const:`~pamqp.common.Arguments`
        :raises ValueError: when an argument fails to validate

        """
        __annotations__: typing.Dict[str, object] = {
            'ticket': int,
            'queue': str,
            'passive': bool,
            'durable': bool,
            'exclusive': bool,
            'auto_delete': bool,
            'nowait': bool,
            'arguments': common.Arguments
        }
        __slots__: typing.List[str] = [
            # Attributes of the AMQ method
            'ticket',
            'queue',
            'passive',
            'durable',
            'exclusive',
            'auto_delete',
            'nowait',
            'arguments'
        ]
        frame_id = 10  # Frame ID as defined by AMQP
        index = 0x0032000A  # Mapping index used by pamqp
        name = 'Queue.Declare'
        synchronous = True  # This is a synchronous AMQP method
        # Methods accepted as a response to this one
        valid_responses = ['Queue.DeclareOk']
        # Data type of each attribute, used when unmarshaling
        _ticket = 'short'
        _queue = 'shortstr'
        _passive = 'bit'
        _durable = 'bit'
        _exclusive = 'bit'
        _auto_delete = 'bit'
        _nowait = 'bit'
        _arguments = 'table'

        def __init__(
                self,
                ticket: int = 0,
                queue: str = '',
                passive: bool = False,
                durable: bool = False,
                exclusive: bool = False,
                auto_delete: bool = False,
                nowait: bool = False,
                arguments: typing.Optional[common.Arguments] = None
        ) -> None:
            """Assign the :class:`Queue.Declare` attributes and validate"""
            self.ticket = ticket
            self.queue = queue
            self.passive = passive
            self.durable = durable
            self.exclusive = exclusive
            self.auto_delete = auto_delete
            self.nowait = nowait
            # A missing (or empty) arguments value becomes a new empty dict
            self.arguments = arguments if arguments else {}
            self.validate()

        def validate(self) -> None:
            """Check the frame values against the protocol specification.

            ``ticket`` has to be ``0``. ``queue`` is limited to a length
            of 256 and must fully match the ``queue-name`` domain
            pattern. Attributes set to ``None`` are not checked.

            :raises ValueError: on validation error

            """
            ticket = self.ticket
            if ticket is not None and ticket != 0:
                raise ValueError('ticket must be 0')

            queue = self.queue
            if queue is not None:
                if len(queue) > 256:
                    raise ValueError('Max length exceeded for queue')
                pattern = constants.DOMAIN_REGEX['queue-name']
                if not pattern.fullmatch(queue):
                    raise ValueError('Invalid value for queue')

    class DeclareOk(base.Frame):
        """Confirms a queue definition

        Confirms a Declare method and reports the name of the queue,
        which is essential for automatically-named queues.

        :param queue: Reports the name of the queue. If the server
            generated a queue name, this field contains that name.
        :param message_count: Number of messages in the queue.
        :param consumer_count: Number of consumers
        :raises ValueError: when an argument fails to validate

        """
        __annotations__: typing.Dict[str, object] = {
            'queue': str,
            'message_count': int,
            'consumer_count': int
        }
        __slots__: typing.List[str] = [
            # Attributes of the AMQ method
            'queue',
            'message_count',
            'consumer_count'
        ]
        frame_id = 11  # Frame ID as defined by AMQP
        index = 0x0032000B  # Mapping index used by pamqp
        name = 'Queue.DeclareOk'
        synchronous = False  # This is not a synchronous AMQP method
        # Data type of each attribute, used when unmarshaling
        _queue = 'shortstr'
        _message_count = 'long'
        _consumer_count = 'long'

        def __init__(
                self,
                queue: typing.Optional[str] = None,
                message_count: typing.Optional[int] = None,
                consumer_count: typing.Optional[int] = None
        ) -> None:
            """Assign the :class:`Queue.DeclareOk` attributes and validate"""
            self.queue = queue
            self.message_count = message_count
            self.consumer_count = consumer_count
            self.validate()

        def validate(self) -> None:
            """Check the frame values against the protocol specification.

            ``queue`` is limited to a length of 256 and must fully match
            the ``queue-name`` domain pattern. A ``None`` queue is not
            checked.

            :raises ValueError: on validation error

            """
            queue = self.queue
            if queue is not None:
                if len(queue) > 256:
                    raise ValueError('Max length exceeded for queue')
                pattern = constants.DOMAIN_REGEX['queue-name']
                if not pattern.fullmatch(queue):
                    raise ValueError('Invalid value for queue')

    class Bind(base.Frame):
        """Bind queue to an exchange

        Binds a queue to an exchange. A queue receives no messages until
        it has been bound. In a classic messaging model, store-and-
        forward queues are bound to a direct exchange and subscription
        queues are bound to a topic exchange.

        :param ticket: Deprecated, must be ``0``
            - Default: ``0``
        :param queue: Specifies the name of the queue to bind.
            - Default: ``''``
        :param exchange: Name of the exchange to bind to
            - Default: ``''``
        :param routing_key: Message routing key
            - Default: ``''``
        :param nowait: Do not send a reply method
            - Default: ``False``
        :param arguments: Arguments for binding
            - Default: ``{}``
        :type arguments: :const:`~pamqp.common.Arguments`
        :raises ValueError: when an argument fails to validate

        """
        __annotations__: typing.Dict[str, object] = {
            'ticket': int,
            'queue': str,
            'exchange': str,
            'routing_key': str,
            'nowait': bool,
            'arguments': common.Arguments
        }
        __slots__: typing.List[str] = [
            # Attributes of the AMQ method
            'ticket',
            'queue',
            'exchange',
            'routing_key',
            'nowait',
            'arguments'
        ]
        frame_id = 20  # Frame ID as defined by AMQP
        index = 0x00320014  # Mapping index used by pamqp
        name = 'Queue.Bind'
        synchronous = True  # This is a synchronous AMQP method
        # Methods accepted as a response to this one
        valid_responses = ['Queue.BindOk']
        # Data type of each attribute, used when unmarshaling
        _ticket = 'short'
        _queue = 'shortstr'
        _exchange = 'shortstr'
        _routing_key = 'shortstr'
        _nowait = 'bit'
        _arguments = 'table'

        def __init__(
                self,
                ticket: int = 0,
                queue: str = '',
                exchange: str = '',
                routing_key: str = '',
                nowait: bool = False,
                arguments: typing.Optional[common.Arguments] = None
        ) -> None:
            """Assign the :class:`Queue.Bind` attributes and validate"""
            self.ticket = ticket
            self.queue = queue
            self.exchange = exchange
            self.routing_key = routing_key
            self.nowait = nowait
            # A missing (or empty) arguments value becomes a new empty dict
            self.arguments = arguments if arguments else {}
            self.validate()

        def validate(self) -> None:
            """Check the frame values against the protocol specification.

            ``ticket`` has to be ``0``. ``queue`` is limited to a length
            of 256 and must fully match the ``queue-name`` domain
            pattern; ``exchange`` is limited to a length of 127 and must
            fully match the ``exchange-name`` domain pattern. Attributes
            set to ``None`` are not checked.

            :raises ValueError: on validation error

            """
            ticket = self.ticket
            if ticket is not None and ticket != 0:
                raise ValueError('ticket must be 0')

            queue = self.queue
            if queue is not None:
                if len(queue) > 256:
                    raise ValueError('Max length exceeded for queue')
                pattern = constants.DOMAIN_REGEX['queue-name']
                if not pattern.fullmatch(queue):
                    raise ValueError('Invalid value for queue')

            exchange = self.exchange
            if exchange is not None:
                if len(exchange) > 127:
                    raise ValueError('Max length exceeded for exchange')
                pattern = constants.DOMAIN_REGEX['exchange-name']
                if not pattern.fullmatch(exchange):
                    raise ValueError('Invalid value for exchange')

    class BindOk(base.Frame):
        """Confirm bind successful

        Sent to confirm that the bind was successful. The frame carries
        no arguments.

        """
        __annotations__: typing.Dict[str, object] = {}
        __slots__: typing.List[str] = []  # This method has no attributes
        frame_id = 21  # Frame ID as defined by AMQP
        index = 0x00320015  # Mapping index used by pamqp
        name = 'Queue.BindOk'
        synchronous = False  # This is not a synchronous AMQP method

    class Purge(base.Frame):
        """Purge a queue

        Removes every message from a queue that is not awaiting
        acknowledgment.

        :param ticket: Deprecated, must be ``0``
            - Default: ``0``
        :param queue: Specifies the name of the queue to purge.
            - Default: ``''``
        :param nowait: Do not send a reply method
            - Default: ``False``
        :raises ValueError: when an argument fails to validate

        """
        __annotations__: typing.Dict[str, object] = {
            'ticket': int,
            'queue': str,
            'nowait': bool
        }
        __slots__: typing.List[str] = [
            # Attributes of the AMQ method
            'ticket',
            'queue',
            'nowait'
        ]
        frame_id = 30  # Frame ID as defined by AMQP
        index = 0x0032001E  # Mapping index used by pamqp
        name = 'Queue.Purge'
        synchronous = True  # This is a synchronous AMQP method
        # Methods accepted as a response to this one
        valid_responses = ['Queue.PurgeOk']
        # Data type of each attribute, used when unmarshaling
        _ticket = 'short'
        _queue = 'shortstr'
        _nowait = 'bit'

        def __init__(
                self,
                ticket: int = 0,
                queue: str = '',
                nowait: bool = False
        ) -> None:
            """Assign the :class:`Queue.Purge` attributes and validate"""
            self.ticket = ticket
            self.queue = queue
            self.nowait = nowait
            self.validate()

        def validate(self) -> None:
            """Check the frame values against the protocol specification.

            ``ticket`` has to be ``0``. ``queue`` is limited to a length
            of 256 and must fully match the ``queue-name`` domain
            pattern. Attributes set to ``None`` are not checked.

            :raises ValueError: on validation error

            """
            ticket = self.ticket
            if ticket is not None and ticket != 0:
                raise ValueError('ticket must be 0')

            queue = self.queue
            if queue is not None:
                if len(queue) > 256:
                    raise ValueError('Max length exceeded for queue')
                pattern = constants.DOMAIN_REGEX['queue-name']
                if not pattern.fullmatch(queue):
                    raise ValueError('Invalid value for queue')

    class PurgeOk(base.Frame):
        """Confirms a queue purge

        Sent to confirm that a queue was purged.

        :param message_count: Reports the number of messages purged.

        """
        __annotations__: typing.Dict[str, object] = {'message_count': int}
        __slots__: typing.List[str] = [
            # Attributes of the AMQ method
            'message_count'
        ]
        frame_id = 31  # Frame ID as defined by AMQP
        index = 0x0032001F  # Mapping index used by pamqp
        name = 'Queue.PurgeOk'
        synchronous = False  # This is not a synchronous AMQP method
        # Data type of each attribute, used when unmarshaling
        _message_count = 'long'

        def __init__(
                self,
                message_count: typing.Optional[int] = None
        ) -> None:
            """Assign the :class:`Queue.PurgeOk` attributes"""
            self.message_count = message_count

    class Delete(base.Frame):
        """Delete a queue

        Deletes a queue. On deletion any pending messages are sent to a
        dead-letter queue if one is defined in the server configuration,
        and all consumers on the queue are cancelled.

        :param ticket: Deprecated, must be ``0``
            - Default: ``0``
        :param queue: Specifies the name of the queue to delete.
            - Default: ``''``
        :param if_unused: Delete only if unused
            - Default: ``False``
        :param if_empty: Delete only if empty
            - Default: ``False``
        :param nowait: Do not send a reply method
            - Default: ``False``
        :raises ValueError: when an argument fails to validate

        """
        __annotations__: typing.Dict[str, object] = {
            'ticket': int,
            'queue': str,
            'if_unused': bool,
            'if_empty': bool,
            'nowait': bool
        }
        __slots__: typing.List[str] = [
            # Attributes of the AMQ method
            'ticket',
            'queue',
            'if_unused',
            'if_empty',
            'nowait'
        ]
        frame_id = 40  # Frame ID as defined by AMQP
        index = 0x00320028  # Mapping index used by pamqp
        name = 'Queue.Delete'
        synchronous = True  # This is a synchronous AMQP method
        # Methods accepted as a response to this one
        valid_responses = ['Queue.DeleteOk']
        # Data type of each attribute, used when unmarshaling
        _ticket = 'short'
        _queue = 'shortstr'
        _if_unused = 'bit'
        _if_empty = 'bit'
        _nowait = 'bit'

        def __init__(
                self,
                ticket: int = 0,
                queue: str = '',
                if_unused: bool = False,
                if_empty: bool = False,
                nowait: bool = False
        ) -> None:
            """Assign the :class:`Queue.Delete` attributes and validate"""
            self.ticket = ticket
            self.queue = queue
            self.if_unused = if_unused
            self.if_empty = if_empty
            self.nowait = nowait
            self.validate()

        def validate(self) -> None:
            """Check the frame values against the protocol specification.

            ``ticket`` has to be ``0``. ``queue`` is limited to a length
            of 256 and must fully match the ``queue-name`` domain
            pattern. Attributes set to ``None`` are not checked.

            :raises ValueError: on validation error

            """
            ticket = self.ticket
            if ticket is not None and ticket != 0:
                raise ValueError('ticket must be 0')

            queue = self.queue
            if queue is not None:
                if len(queue) > 256:
                    raise ValueError('Max length exceeded for queue')
                pattern = constants.DOMAIN_REGEX['queue-name']
                if not pattern.fullmatch(queue):
                    raise ValueError('Invalid value for queue')

    class DeleteOk(base.Frame):
        """Confirm deletion of a queue

        Sent to confirm that a queue was deleted.

        :param message_count: Reports the number of messages deleted.

        """
        __annotations__: typing.Dict[str, object] = {'message_count': int}
        __slots__: typing.List[str] = [
            # Attributes of the AMQ method
            'message_count'
        ]
        frame_id = 41  # Frame ID as defined by AMQP
        index = 0x00320029  # Mapping index used by pamqp
        name = 'Queue.DeleteOk'
        synchronous = False  # This is not a synchronous AMQP method
        # Data type of each attribute, used when unmarshaling
        _message_count = 'long'

        def __init__(
                self,
                message_count: typing.Optional[int] = None
        ) -> None:
            """Assign the :class:`Queue.DeleteOk` attributes"""
            self.message_count = message_count

    class Unbind(base.Frame):
        """Unbind a queue from an exchange

        Removes the binding of a queue to an exchange.

        :param ticket: Deprecated, must be ``0``
            - Default: ``0``
        :param queue: Specifies the name of the queue to unbind.
            - Default: ``''``
        :param exchange: The name of the exchange to unbind from.
            - Default: ``''``
        :param routing_key: Routing key of binding
            - Default: ``''``
        :param arguments: Arguments of binding
            - Default: ``{}``
        :type arguments: :const:`~pamqp.common.Arguments`
        :raises ValueError: when an argument fails to validate

        """
        __annotations__: typing.Dict[str, object] = {
            'ticket': int,
            'queue': str,
            'exchange': str,
            'routing_key': str,
            'arguments': common.Arguments
        }
        __slots__: typing.List[str] = [
            # Attributes of the AMQ method
            'ticket',
            'queue',
            'exchange',
            'routing_key',
            'arguments'
        ]
        frame_id = 50  # Frame ID as defined by AMQP
        index = 0x00320032  # Mapping index used by pamqp
        name = 'Queue.Unbind'
        synchronous = True  # This is a synchronous AMQP method
        # Methods accepted as a response to this one
        valid_responses = ['Queue.UnbindOk']
        # Data type of each attribute, used when unmarshaling
        _ticket = 'short'
        _queue = 'shortstr'
        _exchange = 'shortstr'
        _routing_key = 'shortstr'
        _arguments = 'table'

        def __init__(
                self,
                ticket: int = 0,
                queue: str = '',
                exchange: str = '',
                routing_key: str = '',
                arguments: typing.Optional[common.Arguments] = None
        ) -> None:
            """Assign the :class:`Queue.Unbind` attributes and validate"""
            self.ticket = ticket
            self.queue = queue
            self.exchange = exchange
            self.routing_key = routing_key
            # A missing (or empty) arguments value becomes a new empty dict
            self.arguments = arguments if arguments else {}
            self.validate()

        def validate(self) -> None:
            """Check the frame values against the protocol specification.

            ``ticket`` has to be ``0``. ``queue`` is limited to a length
            of 256 and must fully match the ``queue-name`` domain
            pattern; ``exchange`` is limited to a length of 127 and must
            fully match the ``exchange-name`` domain pattern. Attributes
            set to ``None`` are not checked.

            :raises ValueError: on validation error

            """
            ticket = self.ticket
            if ticket is not None and ticket != 0:
                raise ValueError('ticket must be 0')

            queue = self.queue
            if queue is not None:
                if len(queue) > 256:
                    raise ValueError('Max length exceeded for queue')
                pattern = constants.DOMAIN_REGEX['queue-name']
                if not pattern.fullmatch(queue):
                    raise ValueError('Invalid value for queue')

            exchange = self.exchange
            if exchange is not None:
                if len(exchange) > 127:
                    raise ValueError('Max length exceeded for exchange')
                pattern = constants.DOMAIN_REGEX['exchange-name']
                if not pattern.fullmatch(exchange):
                    raise ValueError('Invalid value for exchange')

    class UnbindOk(base.Frame):
        """Confirm unbind successful

        Sent to confirm that the unbind was successful. The frame
        carries no arguments.

        """
        __annotations__: typing.Dict[str, object] = {}
        __slots__: typing.List[str] = []  # This method has no attributes
        frame_id = 51  # Frame ID as defined by AMQP
        index = 0x00320033  # Mapping index used by pamqp
        name = 'Queue.UnbindOk'
        synchronous = False  # This is not a synchronous AMQP method
[docs]class Basic:
"""Work with basic content
The Basic class provides methods that support an industry-standard
messaging model.
"""
__slots__: typing.List[str] = []
frame_id = 60 # AMQP Frame ID
index = 0x003C0000 # pamqp Mapping Index
class Qos(base.Frame):
    """Specify quality of service

    Requests a specific quality of service. The QoS can be given for the
    current channel or for all channels on the connection. The particular
    properties and semantics of a qos method always depend on the content
    class semantics. In principle the qos method could apply to both
    peers, but it is currently meaningful only for the server.

    :param prefetch_size: Prefetch window in octets
        - Default: ``0``
    :param prefetch_count: Prefetch window in messages
        - Default: ``0``
    :param global_: Apply to entire connection
        - Default: ``False``

    """
    __annotations__: typing.Dict[str, object] = {
        'prefetch_size': int,
        'prefetch_count': int,
        'global_': bool
    }
    __slots__: typing.List[str] = [
        # Attributes of the AMQ method
        'prefetch_size',
        'prefetch_count',
        'global_'
    ]
    frame_id = 10  # Frame ID as defined by AMQP
    index = 0x003C000A  # Mapping index used by pamqp
    name = 'Basic.Qos'
    synchronous = True  # This is a synchronous AMQP method
    # Methods accepted as a response to this one
    valid_responses = ['Basic.QosOk']
    # Data type of each attribute, used when unmarshaling
    _prefetch_size = 'long'
    _prefetch_count = 'short'
    _global_ = 'bit'

    def __init__(
            self,
            prefetch_size: int = 0,
            prefetch_count: int = 0,
            global_: bool = False
    ) -> None:
        """Assign the :class:`Basic.Qos` attributes"""
        self.prefetch_size = prefetch_size
        self.prefetch_count = prefetch_count
        self.global_ = global_
class QosOk(base.Frame):
    """Confirm the requested qos

    Tells the client that the server could handle the requested QoS
    levels. The requested QoS applies to all active consumers until a new
    QoS is defined.

    """
    __annotations__: typing.Dict[str, object] = {}
    __slots__: typing.List[str] = []  # This method has no attributes
    frame_id = 11  # Frame ID as defined by AMQP
    index = 0x003C000B  # Mapping index used by pamqp
    name = 'Basic.QosOk'
    synchronous = False  # This is not a synchronous AMQP method
class Consume(base.Frame):
    """Start a queue consumer

    Asks the server to start a "consumer", a transient request for
    messages from a specific queue. A consumer lasts as long as the
    channel it was declared on, or until the client cancels it.

    :param ticket: Deprecated, must be ``0``
        - Default: ``0``
    :param queue: Specifies the name of the queue to consume from.
        - Default: ``''``
    :param consumer_tag: Specifies the identifier for the consumer. The
        consumer tag is local to a channel, so two clients can use the
        same consumer tags. If this field is empty the server will
        generate a unique tag.
        - Default: ``''``
    :param no_local: Do not deliver own messages
        - Default: ``False``
    :param no_ack: No acknowledgement needed
        - Default: ``False``
    :param exclusive: Request exclusive access
        - Default: ``False``
    :param nowait: Do not send a reply method
        - Default: ``False``
    :param arguments: Arguments for declaration
        - Default: ``{}``
    :type arguments: :const:`~pamqp.common.Arguments`
    :raises ValueError: when an argument fails to validate

    """
    __annotations__: typing.Dict[str, object] = {
        'ticket': int,
        'queue': str,
        'consumer_tag': str,
        'no_local': bool,
        'no_ack': bool,
        'exclusive': bool,
        'nowait': bool,
        'arguments': common.Arguments
    }
    __slots__: typing.List[str] = [
        # Attributes of the AMQ method
        'ticket',
        'queue',
        'consumer_tag',
        'no_local',
        'no_ack',
        'exclusive',
        'nowait',
        'arguments'
    ]
    frame_id = 20  # Frame ID as defined by AMQP
    index = 0x003C0014  # Mapping index used by pamqp
    name = 'Basic.Consume'
    synchronous = True  # This is a synchronous AMQP method
    # Methods accepted as a response to this one
    valid_responses = ['Basic.ConsumeOk']
    # Data type of each attribute, used when unmarshaling
    _ticket = 'short'
    _queue = 'shortstr'
    _consumer_tag = 'shortstr'
    _no_local = 'bit'
    _no_ack = 'bit'
    _exclusive = 'bit'
    _nowait = 'bit'
    _arguments = 'table'

    def __init__(
            self,
            ticket: int = 0,
            queue: str = '',
            consumer_tag: str = '',
            no_local: bool = False,
            no_ack: bool = False,
            exclusive: bool = False,
            nowait: bool = False,
            arguments: typing.Optional[common.Arguments] = None
    ) -> None:
        """Assign the :class:`Basic.Consume` attributes and validate"""
        self.ticket = ticket
        self.queue = queue
        self.consumer_tag = consumer_tag
        self.no_local = no_local
        self.no_ack = no_ack
        self.exclusive = exclusive
        self.nowait = nowait
        # A missing (or empty) arguments value becomes a new empty dict
        self.arguments = arguments if arguments else {}
        self.validate()

    def validate(self) -> None:
        """Check the frame values against the protocol specification.

        ``ticket`` has to be ``0``. ``queue`` is limited to a length of
        256 and must fully match the ``queue-name`` domain pattern.
        Attributes set to ``None`` are not checked.

        :raises ValueError: on validation error

        """
        ticket = self.ticket
        if ticket is not None and ticket != 0:
            raise ValueError('ticket must be 0')

        queue = self.queue
        if queue is not None:
            if len(queue) > 256:
                raise ValueError('Max length exceeded for queue')
            pattern = constants.DOMAIN_REGEX['queue-name']
            if not pattern.fullmatch(queue):
                raise ValueError('Invalid value for queue')
class ConsumeOk(base.Frame):
    """Confirm a new consumer

    Gives the client a consumer tag from the server. The client uses the
    tag for methods called on the consumer at a later stage.

    :param consumer_tag: Holds the consumer tag specified by the client
        or provided by the server.

    """
    __annotations__: typing.Dict[str, object] = {'consumer_tag': str}
    __slots__: typing.List[str] = [
        # Attributes of the AMQ method
        'consumer_tag'
    ]
    frame_id = 21  # Frame ID as defined by AMQP
    index = 0x003C0015  # Mapping index used by pamqp
    name = 'Basic.ConsumeOk'
    synchronous = False  # This is not a synchronous AMQP method
    # Data type of each attribute, used when unmarshaling
    _consumer_tag = 'shortstr'

    def __init__(
            self,
            consumer_tag: typing.Optional[str] = None
    ) -> None:
        """Assign the :class:`Basic.ConsumeOk` attributes"""
        self.consumer_tag = consumer_tag
class Cancel(base.Frame):
    """End a queue consumer

    Cancels a consumer. Messages that were already delivered are not
    affected, but the server will send no further messages for that
    consumer. The client may receive an arbitrary number of messages
    between sending the cancel method and receiving the cancel-ok reply.

    The method may also be sent from the server to the client when a
    consumer is cancelled unexpectedly (i.e. cancelled for any reason
    other than the server receiving the corresponding basic.cancel from
    the client). This lets clients be notified of the loss of consumers
    due to events such as queue deletion. Because it is not a MUST for
    clients to accept this method from the server, it is advisable for
    the broker to be able to identify, through some means of capability
    negotiation, those clients that are capable of accepting it.

    :param consumer_tag: Consumer tag
    :param nowait: Do not send a reply method
        - Default: ``False``

    """
    __annotations__: typing.Dict[str, object] = {
        'consumer_tag': str,
        'nowait': bool
    }
    __slots__: typing.List[str] = [
        # Attributes of the AMQ method
        'consumer_tag',
        'nowait'
    ]
    frame_id = 30  # Frame ID as defined by AMQP
    index = 0x003C001E  # Mapping index used by pamqp
    name = 'Basic.Cancel'
    synchronous = True  # This is a synchronous AMQP method
    # Methods accepted as a response to this one
    valid_responses = ['Basic.CancelOk']
    # Data type of each attribute, used when unmarshaling
    _consumer_tag = 'shortstr'
    _nowait = 'bit'

    def __init__(
            self,
            consumer_tag: typing.Optional[str] = None,
            nowait: bool = False
    ) -> None:
        """Assign the :class:`Basic.Cancel` attributes"""
        self.consumer_tag = consumer_tag
        # Any falsy nowait value is stored as False
        self.nowait = nowait if nowait else False
class CancelOk(base.Frame):
    """Confirm a cancelled consumer

    Sent to confirm that the cancellation was completed.

    :param consumer_tag: Consumer tag

    """
    __annotations__: typing.Dict[str, object] = {'consumer_tag': str}
    __slots__: typing.List[str] = [
        # Attributes of the AMQ method
        'consumer_tag'
    ]
    frame_id = 31  # Frame ID as defined by AMQP
    index = 0x003C001F  # Mapping index used by pamqp
    name = 'Basic.CancelOk'
    synchronous = False  # This is not a synchronous AMQP method
    # Data type of each attribute, used when unmarshaling
    _consumer_tag = 'shortstr'

    def __init__(
            self,
            consumer_tag: typing.Optional[str] = None
    ) -> None:
        """Assign the :class:`Basic.CancelOk` attributes"""
        self.consumer_tag = consumer_tag
class Publish(base.Frame):
    """Publish a message

    Publishes a message to a specific exchange. The message is routed to
    queues as defined by the exchange configuration and distributed to
    any active consumers when the transaction, if any, is committed.

    :param ticket: Deprecated, must be ``0``
        - Default: ``0``
    :param exchange: Specifies the name of the exchange to publish to.
        The exchange name can be empty, meaning the default exchange. If
        the exchange name is specified, and that exchange does not exist,
        the server will raise a channel exception.
        - Default: ``''``
    :param routing_key: Message routing key
        - Default: ``''``
    :param mandatory: Indicate mandatory routing
        - Default: ``False``
    :param immediate: Request immediate delivery
        - Default: ``False``
    :raises ValueError: when an argument fails to validate

    """
    __annotations__: typing.Dict[str, object] = {
        'ticket': int,
        'exchange': str,
        'routing_key': str,
        'mandatory': bool,
        'immediate': bool
    }
    __slots__: typing.List[str] = [
        # Attributes of the AMQ method
        'ticket',
        'exchange',
        'routing_key',
        'mandatory',
        'immediate'
    ]
    frame_id = 40  # Frame ID as defined by AMQP
    index = 0x003C0028  # Mapping index used by pamqp
    name = 'Basic.Publish'
    synchronous = False  # This is not a synchronous AMQP method
    # Data type of each attribute, used when unmarshaling
    _ticket = 'short'
    _exchange = 'shortstr'
    _routing_key = 'shortstr'
    _mandatory = 'bit'
    _immediate = 'bit'

    def __init__(
            self,
            ticket: int = 0,
            exchange: str = '',
            routing_key: str = '',
            mandatory: bool = False,
            immediate: bool = False
    ) -> None:
        """Assign the :class:`Basic.Publish` attributes and validate"""
        self.ticket = ticket
        self.exchange = exchange
        self.routing_key = routing_key
        self.mandatory = mandatory
        self.immediate = immediate
        self.validate()

    def validate(self) -> None:
        """Check the frame values against the protocol specification.

        ``ticket`` has to be ``0``. ``exchange`` is limited to a length
        of 127 and must fully match the ``exchange-name`` domain pattern.
        Attributes set to ``None`` are not checked.

        :raises ValueError: on validation error

        """
        ticket = self.ticket
        if ticket is not None and ticket != 0:
            raise ValueError('ticket must be 0')

        exchange = self.exchange
        if exchange is not None:
            if len(exchange) > 127:
                raise ValueError('Max length exceeded for exchange')
            pattern = constants.DOMAIN_REGEX['exchange-name']
            if not pattern.fullmatch(exchange):
                raise ValueError('Invalid value for exchange')
class Return(base.Frame):
    """Return a failed message

    Returns an undeliverable message that was published with the
    "immediate" flag set, or an unroutable message published with the
    "mandatory" flag set. The reply code and text give information about
    the reason that the message was undeliverable.

    :param reply_code: Reply code from server
    :param reply_text: Localised reply text
        - Default: ``''``
    :param exchange: Specifies the name of the exchange that the message
        was originally published to. May be empty, meaning the default
        exchange.
        - Default: ``''``
    :param routing_key: Message routing key
    :raises ValueError: when an argument fails to validate

    """
    __annotations__: typing.Dict[str, object] = {
        'reply_code': int,
        'reply_text': str,
        'exchange': str,
        'routing_key': str
    }
    __slots__: typing.List[str] = [
        # Attributes of the AMQ method
        'reply_code',
        'reply_text',
        'exchange',
        'routing_key'
    ]
    frame_id = 50  # Frame ID as defined by AMQP
    index = 0x003C0032  # Mapping index used by pamqp
    name = 'Basic.Return'
    synchronous = False  # This is not a synchronous AMQP method
    # Data type of each attribute, used when unmarshaling
    _reply_code = 'short'
    _reply_text = 'shortstr'
    _exchange = 'shortstr'
    _routing_key = 'shortstr'

    def __init__(
            self,
            reply_code: typing.Optional[int] = None,
            reply_text: str = '',
            exchange: str = '',
            routing_key: typing.Optional[str] = None
    ) -> None:
        """Assign the :class:`Basic.Return` attributes and validate"""
        self.reply_code = reply_code
        # Falsy reply_text / exchange values are stored as ''
        self.reply_text = reply_text if reply_text else ''
        self.exchange = exchange if exchange else ''
        self.routing_key = routing_key
        self.validate()

    def validate(self) -> None:
        """Check the frame values against the protocol specification.

        ``exchange`` is limited to a length of 127 and must fully match
        the ``exchange-name`` domain pattern. A ``None`` exchange is not
        checked.

        :raises ValueError: on validation error

        """
        exchange = self.exchange
        if exchange is not None:
            if len(exchange) > 127:
                raise ValueError('Max length exceeded for exchange')
            pattern = constants.DOMAIN_REGEX['exchange-name']
            if not pattern.fullmatch(exchange):
                raise ValueError('Invalid value for exchange')
class Deliver(base.Frame):
    """Notify the client of a consumer message

    Delivers a message to the client, via a consumer. In the asynchronous
    message delivery model, the client starts a consumer using the
    Consume method, and the server then responds with Deliver methods as
    and when messages arrive for that consumer.

    :param consumer_tag: Consumer tag
    :param delivery_tag: Server-assigned delivery tag
    :param redelivered: Message is being redelivered
        - Default: ``False``
    :param exchange: Specifies the name of the exchange that the message
        was originally published to. May be empty, indicating the default
        exchange.
        - Default: ``''``
    :param routing_key: Message routing key
    :raises ValueError: when an argument fails to validate

    """
    __annotations__: typing.Dict[str, object] = {
        'consumer_tag': str,
        'delivery_tag': int,
        'redelivered': bool,
        'exchange': str,
        'routing_key': str
    }
    __slots__: typing.List[str] = [
        # Attributes of the AMQ method
        'consumer_tag',
        'delivery_tag',
        'redelivered',
        'exchange',
        'routing_key'
    ]
    frame_id = 60  # Frame ID as defined by AMQP
    index = 0x003C003C  # Mapping index used by pamqp
    name = 'Basic.Deliver'
    synchronous = False  # This is not a synchronous AMQP method
    # Data type of each attribute, used when unmarshaling
    _consumer_tag = 'shortstr'
    _delivery_tag = 'longlong'
    _redelivered = 'bit'
    _exchange = 'shortstr'
    _routing_key = 'shortstr'

    def __init__(
            self,
            consumer_tag: typing.Optional[str] = None,
            delivery_tag: typing.Optional[int] = None,
            redelivered: bool = False,
            exchange: str = '',
            routing_key: typing.Optional[str] = None
    ) -> None:
        """Assign the :class:`Basic.Deliver` attributes and validate"""
        self.consumer_tag = consumer_tag
        self.delivery_tag = delivery_tag
        # Falsy redelivered / exchange values are stored as False / ''
        self.redelivered = redelivered if redelivered else False
        self.exchange = exchange if exchange else ''
        self.routing_key = routing_key
        self.validate()

    def validate(self) -> None:
        """Check the frame values against the protocol specification.

        ``exchange`` is limited to a length of 127 and must fully match
        the ``exchange-name`` domain pattern. A ``None`` exchange is not
        checked.

        :raises ValueError: on validation error

        """
        exchange = self.exchange
        if exchange is not None:
            if len(exchange) > 127:
                raise ValueError('Max length exceeded for exchange')
            pattern = constants.DOMAIN_REGEX['exchange-name']
            if not pattern.fullmatch(exchange):
                raise ValueError('Invalid value for exchange')
class Get(base.Frame):
    """Direct access to a queue

    Provides direct access to the messages in a queue using a synchronous
    dialogue, designed for specific types of application where
    synchronous functionality is more important than performance.

    :param ticket: Deprecated, must be ``0``
        - Default: ``0``
    :param queue: Specifies the name of the queue to get a message from.
        - Default: ``''``
    :param no_ack: No acknowledgement needed
        - Default: ``False``
    :raises ValueError: when an argument fails to validate

    """
    __annotations__: typing.Dict[str, object] = {
        'ticket': int,
        'queue': str,
        'no_ack': bool
    }
    __slots__: typing.List[str] = [
        # Attributes of the AMQ method
        'ticket',
        'queue',
        'no_ack'
    ]
    frame_id = 70  # Frame ID as defined by AMQP
    index = 0x003C0046  # Mapping index used by pamqp
    name = 'Basic.Get'
    synchronous = True  # This is a synchronous AMQP method
    # Methods accepted as a response to this one
    valid_responses = ['Basic.GetOk', 'Basic.GetEmpty']
    # Data type of each attribute, used when unmarshaling
    _ticket = 'short'
    _queue = 'shortstr'
    _no_ack = 'bit'

    def __init__(
            self,
            ticket: int = 0,
            queue: str = '',
            no_ack: bool = False
    ) -> None:
        """Assign the :class:`Basic.Get` attributes and validate"""
        self.ticket = ticket
        self.queue = queue
        self.no_ack = no_ack
        self.validate()

    def validate(self) -> None:
        """Check the frame values against the protocol specification.

        ``ticket`` has to be ``0``. ``queue`` is limited to a length of
        256 and must fully match the ``queue-name`` domain pattern.
        Attributes set to ``None`` are not checked.

        :raises ValueError: on validation error

        """
        ticket = self.ticket
        if ticket is not None and ticket != 0:
            raise ValueError('ticket must be 0')

        queue = self.queue
        if queue is not None:
            if len(queue) > 256:
                raise ValueError('Max length exceeded for queue')
            pattern = constants.DOMAIN_REGEX['queue-name']
            if not pattern.fullmatch(queue):
                raise ValueError('Invalid value for queue')
class GetOk(base.Frame):
    """Provide client with a message

    Delivers a message to the client in response to a get method. A
    message delivered by 'get-ok' must be acknowledged unless the no-ack
    option was set in the get method.

    :param delivery_tag: Server-assigned delivery tag
    :param redelivered: Message is being redelivered
        - Default: ``False``
    :param exchange: Specifies the name of the exchange that the message
        was originally published to. If empty, the message was published to
        the default exchange.
        - Default: ``''``
    :param routing_key: Message routing key
    :param message_count: Number of messages in the queue.
    :raises ValueError: when an argument fails to validate

    """
    __annotations__: typing.Dict[str, object] = {
        'delivery_tag': int,
        'redelivered': bool,
        'exchange': str,
        'routing_key': str,
        'message_count': int
    }
    # AMQ Method Attributes
    __slots__: typing.List[str] = [
        'delivery_tag',
        'redelivered',
        'exchange',
        'routing_key',
        'message_count'
    ]

    frame_id = 71  # AMQP Frame ID
    index = 0x003C0047  # pamqp Mapping Index
    name = 'Basic.GetOk'
    synchronous = False  # This is not a synchronous AMQP method

    # Wire types of each attribute, used for unmarshaling
    _delivery_tag = 'longlong'
    _redelivered = 'bit'
    _exchange = 'shortstr'
    _routing_key = 'shortstr'
    _message_count = 'long'

    def __init__(self,
                 delivery_tag: typing.Optional[int] = None,
                 redelivered: bool = False,
                 exchange: str = '',
                 routing_key: typing.Optional[str] = None,
                 message_count: typing.Optional[int] = None) -> None:
        """Populate the :class:`Basic.GetOk` attributes, then validate"""
        self.delivery_tag = delivery_tag
        # Falsy values (including ``None``) collapse to the defaults
        self.redelivered = redelivered if redelivered else False
        self.exchange = exchange if exchange else ''
        self.routing_key = routing_key
        self.message_count = message_count
        self.validate()

    def validate(self) -> None:
        """Check the frame attributes against the protocol constraints

        Only ``exchange`` is checked here: it may be at most 127
        characters long and must fully match the ``exchange-name`` domain
        pattern.

        :raises ValueError: on validation error

        """
        exchange_name = self.exchange
        if exchange_name is None:
            # Nothing to check when no exchange has been assigned
            return
        if len(exchange_name) > 127:
            raise ValueError('Max length exceeded for exchange')
        pattern = constants.DOMAIN_REGEX['exchange-name']
        if not pattern.fullmatch(exchange_name):
            raise ValueError('Invalid value for exchange')
class GetEmpty(base.Frame):
    """Indicate no messages available

    Tells the client that the queue has no messages available for it.

    :param cluster_id: Deprecated, must be empty
        - Default: ``''``

    """
    __annotations__: typing.Dict[str, object] = {'cluster_id': str}
    # AMQ Method Attributes
    __slots__: typing.List[str] = ['cluster_id']

    frame_id = 72  # AMQP Frame ID
    index = 0x003C0048  # pamqp Mapping Index
    name = 'Basic.GetEmpty'
    synchronous = False  # This is not a synchronous AMQP method

    # Wire type of the attribute, used for unmarshaling
    _cluster_id = 'shortstr'

    def __init__(self, cluster_id: str = '') -> None:
        """Populate the :class:`Basic.GetEmpty` attribute, then validate"""
        self.cluster_id = cluster_id
        self.validate()

    def validate(self) -> None:
        """Check the frame attributes against the protocol constraints

        ``cluster_id`` is deprecated and must be the empty string when set.

        :raises ValueError: on validation error

        """
        cluster = self.cluster_id
        if cluster is not None:
            if cluster != '':
                raise ValueError('cluster_id must be empty')
class Ack(base.Frame):
    """Acknowledge one or more messages

    Sent by the client, this method acknowledges one or more messages
    delivered via the Deliver or Get-Ok methods. Sent by the server, it
    acknowledges one or more messages published with the Publish method
    on a channel in confirm mode. The acknowledgement can cover a single
    message or a set of messages up to and including a specific message.

    :param delivery_tag: Server-assigned delivery tag
        - Default: ``0``
    :param multiple: Acknowledge multiple messages
        - Default: ``False``

    """
    __annotations__: typing.Dict[str, object] = {
        'delivery_tag': int,
        'multiple': bool
    }
    # AMQ Method Attributes
    __slots__: typing.List[str] = ['delivery_tag', 'multiple']

    frame_id = 80  # AMQP Frame ID
    index = 0x003C0050  # pamqp Mapping Index
    name = 'Basic.Ack'
    synchronous = False  # This is not a synchronous AMQP method

    # Wire types of each attribute, used for unmarshaling
    _delivery_tag = 'longlong'
    _multiple = 'bit'

    def __init__(self,
                 delivery_tag: int = 0,
                 multiple: bool = False) -> None:
        """Populate the :class:`Basic.Ack` attributes"""
        self.delivery_tag = delivery_tag
        self.multiple = multiple
class Reject(base.Frame):
    """Reject an incoming message

    Lets a client reject a message. It can be used to interrupt and cancel
    large incoming messages, or to return untreatable messages to their
    original queue.

    :param delivery_tag: Server-assigned delivery tag
    :param requeue: Requeue the message
        - Default: ``True``

    """
    __annotations__: typing.Dict[str, object] = {
        'delivery_tag': int,
        'requeue': bool
    }
    # AMQ Method Attributes
    __slots__: typing.List[str] = ['delivery_tag', 'requeue']

    frame_id = 90  # AMQP Frame ID
    index = 0x003C005A  # pamqp Mapping Index
    name = 'Basic.Reject'
    synchronous = False  # This is not a synchronous AMQP method

    # Wire types of each attribute, used for unmarshaling
    _delivery_tag = 'longlong'
    _requeue = 'bit'

    def __init__(self,
                 delivery_tag: typing.Optional[int] = None,
                 requeue: bool = True) -> None:
        """Populate the :class:`Basic.Reject` attributes"""
        self.delivery_tag = delivery_tag
        self.requeue = requeue
class RecoverAsync(base.Frame):
    """Redeliver unacknowledged messages

    This method asks the server to redeliver all unacknowledged messages on
    a specified channel. Zero or more messages may be redelivered. This
    method is deprecated in favour of the synchronous Recover/Recover-Ok.

    Creating an instance emits a :class:`DeprecationWarning`.

    .. deprecated:: This command is deprecated in AMQP 0-9-1

    :param requeue: Requeue the message
        - Default: ``False``

    """
    __annotations__: typing.Dict[str, object] = {'requeue': bool}
    __slots__: typing.List[str] = [  # AMQ Method Attributes
        'requeue'
    ]

    frame_id = 100  # AMQP Frame ID
    index = 0x003C0064  # pamqp Mapping Index
    name = 'Basic.RecoverAsync'
    synchronous = False  # Indicates if this is a synchronous AMQP method

    # Class Attribute Types for unmarshaling
    _requeue = 'bit'

    def __init__(self, requeue: bool = False) -> None:
        """Initialize the :class:`Basic.RecoverAsync` class"""
        self.requeue = requeue
        # stacklevel=2 attributes the warning to the code creating the
        # frame rather than to this line, so the default warning filters
        # can surface it to the caller that needs to migrate.
        warnings.warn(constants.DEPRECATION_WARNING,
                      category=DeprecationWarning,
                      stacklevel=2)
class Recover(base.Frame):
    """Redeliver unacknowledged messages

    Asks the server to redeliver all unacknowledged messages on a
    specified channel. Zero or more messages may be redelivered. This
    method replaces the asynchronous Recover.

    :param requeue: Requeue the message
        - Default: ``False``

    """
    __annotations__: typing.Dict[str, object] = {'requeue': bool}
    # AMQ Method Attributes
    __slots__: typing.List[str] = ['requeue']

    frame_id = 110  # AMQP Frame ID
    index = 0x003C006E  # pamqp Mapping Index
    name = 'Basic.Recover'
    synchronous = True  # This is a synchronous AMQP method
    # Frames that are acceptable replies to this method
    valid_responses = ['Basic.RecoverOk']

    # Wire type of the attribute, used for unmarshaling
    _requeue = 'bit'

    def __init__(self, requeue: bool = False) -> None:
        """Populate the :class:`Basic.Recover` attribute"""
        self.requeue = requeue
class RecoverOk(base.Frame):
    """Confirm recovery

    Acknowledges a :class:`Basic.Recover` method. The frame carries no
    attributes.

    """
    __annotations__: typing.Dict[str, object] = {}
    # AMQ Method Attributes (none for this frame)
    __slots__: typing.List[str] = []

    frame_id = 111  # AMQP Frame ID
    index = 0x003C006F  # pamqp Mapping Index
    name = 'Basic.RecoverOk'
    synchronous = False  # This is not a synchronous AMQP method
class Nack(base.Frame):
    """Reject one or more incoming messages

    Lets a client reject one or more incoming messages. It can be used to
    interrupt and cancel large incoming messages, or to return untreatable
    messages to their original queue. The server also uses this method to
    inform publishers on channels in confirm mode of unhandled messages;
    a publisher receiving it probably needs to republish the offending
    messages.

    :param delivery_tag: Server-assigned delivery tag
        - Default: ``0``
    :param multiple: Reject multiple messages
        - Default: ``False``
    :param requeue: Requeue the message
        - Default: ``True``

    """
    __annotations__: typing.Dict[str, object] = {
        'delivery_tag': int,
        'multiple': bool,
        'requeue': bool
    }
    # AMQ Method Attributes
    __slots__: typing.List[str] = ['delivery_tag', 'multiple', 'requeue']

    frame_id = 120  # AMQP Frame ID
    index = 0x003C0078  # pamqp Mapping Index
    name = 'Basic.Nack'
    synchronous = False  # This is not a synchronous AMQP method

    # Wire types of each attribute, used for unmarshaling
    _delivery_tag = 'longlong'
    _multiple = 'bit'
    _requeue = 'bit'

    def __init__(self,
                 delivery_tag: int = 0,
                 multiple: bool = False,
                 requeue: bool = True) -> None:
        """Populate the :class:`Basic.Nack` attributes"""
        self.delivery_tag = delivery_tag
        self.multiple = multiple
        self.requeue = requeue
class Properties(base.BasicProperties):
    """Content Properties

    .. Note:: The AMQP property type is named ``message_type`` as to not
        conflict with the Python ``type`` keyword

    :param content_type: MIME content type
    :param content_encoding: MIME content encoding
    :param headers: Message header field table
    :type headers: typing.Optional[:const:`~pamqp.common.FieldTable`]
    :param delivery_mode: Non-persistent (1) or persistent (2)
    :param priority: Message priority, 0 to 9
    :param correlation_id: Application correlation identifier
    :param reply_to: Address to reply to
    :param expiration: Message expiration specification
    :param message_id: Application message identifier
    :param timestamp: Message timestamp
    :param message_type: Message type name
    :param user_id: Creating user id
    :param app_id: Creating application id
    :param cluster_id: Deprecated, must be empty
    :raises: ValueError

    """
    __annotations__: typing.Dict[str, object] = {
        'content_type': str,
        'content_encoding': str,
        'headers': common.FieldTable,
        'delivery_mode': int,
        'priority': int,
        'correlation_id': str,
        'reply_to': str,
        'expiration': str,
        'message_id': str,
        'timestamp': datetime.datetime,
        'message_type': str,
        'user_id': str,
        'app_id': str,
        'cluster_id': str
    }
    # AMQ Properties Attributes
    __slots__: typing.List[str] = [
        'content_type',
        'content_encoding',
        'headers',
        'delivery_mode',
        'priority',
        'correlation_id',
        'reply_to',
        'expiration',
        'message_id',
        'timestamp',
        'message_type',
        'user_id',
        'app_id',
        'cluster_id'
    ]

    # Flag values for marshaling / unmarshaling; one distinct bit per
    # property, descending from bit 15 (32768) down to bit 2 (4)
    flags = {
        'content_type': 1 << 15,
        'content_encoding': 1 << 14,
        'headers': 1 << 13,
        'delivery_mode': 1 << 12,
        'priority': 1 << 11,
        'correlation_id': 1 << 10,
        'reply_to': 1 << 9,
        'expiration': 1 << 8,
        'message_id': 1 << 7,
        'timestamp': 1 << 6,
        'message_type': 1 << 5,
        'user_id': 1 << 4,
        'app_id': 1 << 3,
        'cluster_id': 1 << 2
    }

    frame_id = 60  # AMQP Frame ID
    index = 0x003C  # pamqp Mapping Index
    name = 'Basic.Properties'

    # Wire types of each property, used for unmarshaling
    _content_type = 'shortstr'
    _content_encoding = 'shortstr'
    _headers = 'table'
    _delivery_mode = 'octet'
    _priority = 'octet'
    _correlation_id = 'shortstr'
    _reply_to = 'shortstr'
    _expiration = 'shortstr'
    _message_id = 'shortstr'
    _timestamp = 'timestamp'
    _message_type = 'shortstr'
    _user_id = 'shortstr'
    _app_id = 'shortstr'
    _cluster_id = 'shortstr'

    def __init__(self,
                 content_type: typing.Optional[str] = None,
                 content_encoding: typing.Optional[str] = None,
                 headers: typing.Optional[common.FieldTable] = None,
                 delivery_mode: typing.Optional[int] = None,
                 priority: typing.Optional[int] = None,
                 correlation_id: typing.Optional[str] = None,
                 reply_to: typing.Optional[str] = None,
                 expiration: typing.Optional[str] = None,
                 message_id: typing.Optional[str] = None,
                 timestamp: typing.Optional[datetime.datetime] = None,
                 message_type: typing.Optional[str] = None,
                 user_id: typing.Optional[str] = None,
                 app_id: typing.Optional[str] = None,
                 cluster_id: str = '') -> None:
        """Populate every Basic.Properties attribute, then validate"""
        # MIME description of the content
        self.content_type = content_type
        self.content_encoding = content_encoding
        # Header table and delivery hints
        self.headers = headers
        self.delivery_mode = delivery_mode
        self.priority = priority
        # Correlation / reply routing
        self.correlation_id = correlation_id
        self.reply_to = reply_to
        # Message lifetime and identity
        self.expiration = expiration
        self.message_id = message_id
        self.timestamp = timestamp
        self.message_type = message_type
        # Originator details
        self.user_id = user_id
        self.app_id = app_id
        self.cluster_id = cluster_id
        self.validate()
class Tx:
    """Work with transactions

    The Tx class allows publish and ack operations to be batched into atomic
    units of work. The intention is that all publish and ack requests issued
    within a transaction will complete successfully or none of them will.
    Servers SHOULD implement atomic transactions at least where all publish or
    ack requests affect a single queue. Transactions that cover multiple
    queues may be non-atomic, given that queues can be created and destroyed
    asynchronously, and such events do not form part of any transaction.
    Further, the behaviour of transactions with respect to the immediate and
    mandatory flags on :class:`Basic.Publish` methods is not defined.

    """
    __slots__: typing.List[str] = []

    frame_id = 90  # AMQP Frame ID
    index = 0x005A0000  # pamqp Mapping Index

    class Select(base.Frame):
        """Select standard transaction mode

        Sets the channel to use standard transactions. The client must use
        this method at least once on a channel before using the Commit or
        Rollback methods.

        """
        __annotations__: typing.Dict[str, object] = {}
        # AMQ Method Attributes (none for this frame)
        __slots__: typing.List[str] = []

        frame_id = 10  # AMQP Frame ID
        index = 0x005A000A  # pamqp Mapping Index
        name = 'Tx.Select'
        synchronous = True  # This is a synchronous AMQP method
        # Frames that are acceptable replies to this method
        valid_responses = ['Tx.SelectOk']

    class SelectOk(base.Frame):
        """Confirm transaction mode

        Confirms to the client that the channel was successfully set to use
        standard transactions.

        """
        __annotations__: typing.Dict[str, object] = {}
        # AMQ Method Attributes (none for this frame)
        __slots__: typing.List[str] = []

        frame_id = 11  # AMQP Frame ID
        index = 0x005A000B  # pamqp Mapping Index
        name = 'Tx.SelectOk'
        synchronous = False  # This is not a synchronous AMQP method

    class Commit(base.Frame):
        """Commit the current transaction

        Commits all message publications and acknowledgments performed in
        the current transaction. A new transaction starts immediately after
        a commit.

        """
        __annotations__: typing.Dict[str, object] = {}
        # AMQ Method Attributes (none for this frame)
        __slots__: typing.List[str] = []

        frame_id = 20  # AMQP Frame ID
        index = 0x005A0014  # pamqp Mapping Index
        name = 'Tx.Commit'
        synchronous = True  # This is a synchronous AMQP method
        # Frames that are acceptable replies to this method
        valid_responses = ['Tx.CommitOk']

    class CommitOk(base.Frame):
        """Confirm a successful commit

        Confirms to the client that the commit succeeded. Note that if a
        commit fails, the server raises a channel exception.

        """
        __annotations__: typing.Dict[str, object] = {}
        # AMQ Method Attributes (none for this frame)
        __slots__: typing.List[str] = []

        frame_id = 21  # AMQP Frame ID
        index = 0x005A0015  # pamqp Mapping Index
        name = 'Tx.CommitOk'
        synchronous = False  # This is not a synchronous AMQP method

    class Rollback(base.Frame):
        """Abandon the current transaction

        Abandons all message publications and acknowledgments performed in
        the current transaction. A new transaction starts immediately after
        a rollback. Note that unacked messages will not be automatically
        redelivered by rollback; if that is required an explicit recover
        call should be issued.

        """
        __annotations__: typing.Dict[str, object] = {}
        # AMQ Method Attributes (none for this frame)
        __slots__: typing.List[str] = []

        frame_id = 30  # AMQP Frame ID
        index = 0x005A001E  # pamqp Mapping Index
        name = 'Tx.Rollback'
        synchronous = True  # This is a synchronous AMQP method
        # Frames that are acceptable replies to this method
        valid_responses = ['Tx.RollbackOk']

    class RollbackOk(base.Frame):
        """Confirm successful rollback

        Confirms to the client that the rollback succeeded. Note that if a
        rollback fails, the server raises a channel exception.

        """
        __annotations__: typing.Dict[str, object] = {}
        # AMQ Method Attributes (none for this frame)
        __slots__: typing.List[str] = []

        frame_id = 31  # AMQP Frame ID
        index = 0x005A001F  # pamqp Mapping Index
        name = 'Tx.RollbackOk'
        synchronous = False  # This is not a synchronous AMQP method
class Confirm:
    """Work with confirms

    The Confirm class allows publishers to put the channel in confirm mode and
    subsequently be notified when messages have been handled by the broker.
    The intention is that all messages published on a channel in confirm mode
    will be acknowledged at some point. By acknowledging a message the broker
    assumes responsibility for it and indicates that it has done something it
    deems reasonable with it. Unroutable mandatory or immediate messages are
    acknowledged right after the :class:`Basic.Return` method. Messages are
    acknowledged when all queues to which the message has been routed have
    either delivered the message and received an acknowledgement (if required),
    or enqueued the message (and persisted it if required). Published messages
    are assigned ascending sequence numbers, starting at 1 with the first
    :class:`Confirm.Select` method. The server confirms messages by sending
    :class:`Basic.Ack` methods referring to these sequence numbers.

    """
    __slots__: typing.List[str] = []

    frame_id = 85  # AMQP Frame ID
    index = 0x00550000  # pamqp Mapping Index

    class Select(base.Frame):
        """Select confirm mode (i.e. enable publisher acknowledgements)

        Sets the channel to use publisher acknowledgements. The client can
        only use this method on a non-transactional channel.

        :param nowait: Do not send a reply method
            - Default: ``False``

        """
        __annotations__: typing.Dict[str, object] = {'nowait': bool}
        # AMQ Method Attributes
        __slots__: typing.List[str] = ['nowait']

        frame_id = 10  # AMQP Frame ID
        index = 0x0055000A  # pamqp Mapping Index
        name = 'Confirm.Select'
        synchronous = True  # This is a synchronous AMQP method
        # Frames that are acceptable replies to this method
        valid_responses = ['Confirm.SelectOk']

        # Wire type of the attribute, used for unmarshaling
        _nowait = 'bit'

        def __init__(self, nowait: bool = False) -> None:
            """Populate the :class:`Confirm.Select` attribute"""
            self.nowait = nowait

    class SelectOk(base.Frame):
        """Acknowledge confirm mode

        Confirms to the client that the channel was successfully set to use
        publisher acknowledgements.

        """
        __annotations__: typing.Dict[str, object] = {}
        # AMQ Method Attributes (none for this frame)
        __slots__: typing.List[str] = []

        frame_id = 11  # AMQP Frame ID
        index = 0x0055000B  # pamqp Mapping Index
        name = 'Confirm.SelectOk'
        synchronous = False  # This is not a synchronous AMQP method
# AMQP Class.Method Index Mapping
#
# Maps a numeric index to the method frame class it identifies. For the
# frame classes whose definitions are visible alongside this table, the key
# equals the class's ``index`` attribute: the upper 16 bits carry the AMQP
# class id and the lower 16 bits the method's ``frame_id`` (for example
# ``0x003C0046`` is class 0x3C / method 0x46 and maps to ``Basic.Get``).
# ``Basic.Properties`` is not a method frame and has no entry here.
INDEX_MAPPING = {
# Connection methods (0x000A....)
0x000A000A: Connection.Start,
0x000A000B: Connection.StartOk,
0x000A0014: Connection.Secure,
0x000A0015: Connection.SecureOk,
0x000A001E: Connection.Tune,
0x000A001F: Connection.TuneOk,
0x000A0028: Connection.Open,
0x000A0029: Connection.OpenOk,
0x000A0032: Connection.Close,
0x000A0033: Connection.CloseOk,
0x000A003C: Connection.Blocked,
0x000A003D: Connection.Unblocked,
0x000A0046: Connection.UpdateSecret,
0x000A0047: Connection.UpdateSecretOk,
# Channel methods (0x0014....)
0x0014000A: Channel.Open,
0x0014000B: Channel.OpenOk,
0x00140014: Channel.Flow,
0x00140015: Channel.FlowOk,
0x00140028: Channel.Close,
0x00140029: Channel.CloseOk,
# Exchange methods (0x0028....)
0x0028000A: Exchange.Declare,
0x0028000B: Exchange.DeclareOk,
0x00280014: Exchange.Delete,
0x00280015: Exchange.DeleteOk,
0x0028001E: Exchange.Bind,
0x0028001F: Exchange.BindOk,
0x00280028: Exchange.Unbind,
0x00280033: Exchange.UnbindOk,
# Queue methods (0x0032....)
0x0032000A: Queue.Declare,
0x0032000B: Queue.DeclareOk,
0x00320014: Queue.Bind,
0x00320015: Queue.BindOk,
0x0032001E: Queue.Purge,
0x0032001F: Queue.PurgeOk,
0x00320028: Queue.Delete,
0x00320029: Queue.DeleteOk,
0x00320032: Queue.Unbind,
0x00320033: Queue.UnbindOk,
# Basic methods (0x003C....)
0x003C000A: Basic.Qos,
0x003C000B: Basic.QosOk,
0x003C0014: Basic.Consume,
0x003C0015: Basic.ConsumeOk,
0x003C001E: Basic.Cancel,
0x003C001F: Basic.CancelOk,
0x003C0028: Basic.Publish,
0x003C0032: Basic.Return,
0x003C003C: Basic.Deliver,
0x003C0046: Basic.Get,
0x003C0047: Basic.GetOk,
0x003C0048: Basic.GetEmpty,
0x003C0050: Basic.Ack,
0x003C005A: Basic.Reject,
0x003C0064: Basic.RecoverAsync,
0x003C006E: Basic.Recover,
0x003C006F: Basic.RecoverOk,
0x003C0078: Basic.Nack,
# Tx methods (0x005A....)
0x005A000A: Tx.Select,
0x005A000B: Tx.SelectOk,
0x005A0014: Tx.Commit,
0x005A0015: Tx.CommitOk,
0x005A001E: Tx.Rollback,
0x005A001F: Tx.RollbackOk,
# Confirm methods (0x0055....)
0x0055000A: Confirm.Select,
0x0055000B: Confirm.SelectOk
}