tux.bot
¶
Tux Discord bot core implementation.
This module defines the primary Tux
class, which serves as the central orchestrator for the entire bot application. It extends discord.py
's commands.Bot
and is responsible for the following key areas:
- Lifecycle Management: Handles the startup and shutdown sequences, ensuring that all sub-systems are initialized and terminated gracefully.
- Component Orchestration: Initializes and holds instances of various manager classes (e.g.,
TaskManager
,SentryManager
,EmojiManager
) that encapsulate specific functionalities. - Cog Loading: Triggers the loading of all command cogs from the
tux/cogs
directory via theCogLoader
. - Event Handling: Implements core
discord.py
event listeners likeon_ready
andsetup_hook
to manage the bot's state as it interacts with Discord.
Classes:
Name | Description |
---|---|
TaskCategory | Categories for background tasks. |
BotState | Manages the bot's operational state flags. |
DatabaseConnectionError | Raised when database connection fails. |
Tux | The main class for the Tux Discord bot. |
Classes¶
BotState(is_shutting_down: bool = False, setup_complete: bool = False, start_time: float | None = None, emoji_manager_initialized: bool = False, hot_reload_loaded: bool = False, banner_logged: bool = False)
dataclass
¶
Manages the bot's operational state flags.
DatabaseConnectionError
¶
Tux(*args: Any, **kwargs: Any)
¶
Bases: Bot
The main class for the Tux Discord bot.
This class extends discord.py
's commands.Bot
and serves as the central orchestrator for the application. It is responsible for initializing sub-systems (like database, Sentry, emoji management), loading cogs, handling the bot's lifecycle (setup, shutdown), and processing events.
Initializes the Tux bot, its managers, and lifecycle steps.
This sets up the core state, managers (Sentry, Emoji, Task), and defines the sequence of operations for the bot's startup and shutdown routines. It also creates and schedules the main setup task.
Methods:
Name | Description |
---|---|
setup | Executes the bot's startup routine in a defined sequence. |
shutdown | Executes the bot's shutdown routine in a defined sequence. |
setup_hook | Performs one-time async setup before connecting to Discord. |
on_ready | Called when the bot is ready and connected to Discord. |
on_disconnect | Logs and reports when the bot disconnects from Discord. |
remove_cog | Remove a cog and clean up associated tasks. |
invoke | Overrides the default invoke method to wrap command execution in a Sentry transaction. |
Source code in tux/bot.py
def __init__(self, *args: Any, **kwargs: Any) -> None:
"""
Initializes the Tux bot, its managers, and lifecycle steps.
This sets up the core state, managers (Sentry, Emoji, Task), and defines
the sequence of operations for the bot's startup and shutdown routines.
It also creates and schedules the main setup task.
"""
super().__init__(*args, **kwargs)
# Core bot state flags, managed by the BotState dataclass.
self.state = BotState()
self.prefix_cache: dict[int, str] = {}
self.setup_task: asyncio.Task[None] | None = None
self._startup_task: asyncio.Task[None] | None = None
# Sub-systems and managers that encapsulate specific functionalities.
self.sentry_manager: SentryManager = SentryManager()
self.emoji_manager = EmojiManager(self)
self.task_manager = TaskManager(self)
self.console = Console(stderr=True, force_terminal=True)
# Bot lifecycle routines are now inlined directly in setup() and shutdown() methods
# for better readability and explicit sequencing
# The main setup routine is started as a background task immediately.
logger.debug("Creating bot setup task")
self.setup_task = asyncio.create_task(self.setup(), name="bot_setup")
self.setup_task.add_done_callback(self._setup_callback)
Functions¶
setup() -> None
async
¶
Executes the bot's startup routine in a defined sequence.
This method performs each setup step in order to ensure the bot is properly initialized before it goes online. If any step fails, the setup is aborted, and a graceful shutdown is triggered.
Raises:
Type | Description |
---|---|
Exception | Propagates any exception that occurs during a setup step, which is then caught, logged, and triggers a shutdown. |
Source code in tux/bot.py
async def setup(self) -> None:
"""
Executes the bot's startup routine in a defined sequence.
This method performs each setup step in order to ensure the bot is properly
initialized before it goes online. If any step fails, the setup is aborted,
and a graceful shutdown is triggered.
Raises
------
Exception
Propagates any exception that occurs during a setup step,
which is then caught, logged, and triggers a shutdown.
"""
try:
with start_span("bot.setup", "Bot setup process") as span:
# Database connection
set_setup_phase_tag(span, "database", "starting")
await self._setup_database()
set_setup_phase_tag(span, "database", "finished")
# Load jishaku extension
set_setup_phase_tag(span, "jishaku", "starting")
await self._load_jishaku()
set_setup_phase_tag(span, "jishaku", "finished")
# Load all cogs
set_setup_phase_tag(span, "cogs", "starting")
await self._load_cogs()
set_setup_phase_tag(span, "cogs", "finished")
# Setup hot reload
set_setup_phase_tag(span, "hot_reload", "starting")
await self._setup_hot_reload()
set_setup_phase_tag(span, "hot_reload", "finished")
# Register critical tasks
set_setup_phase_tag(span, "register_tasks", "starting")
await self._register_critical_tasks()
set_setup_phase_tag(span, "register_tasks", "finished")
# Start monitoring
set_setup_phase_tag(span, "monitoring", "starting")
self.task_manager.start()
set_setup_phase_tag(span, "monitoring", "finished")
# Setup task instrumentation
set_setup_phase_tag(span, "instrument_tasks", "starting")
self.task_manager.setup_task_instrumentation()
set_setup_phase_tag(span, "instrument_tasks", "finished")
# Setup command instrumentation
set_setup_phase_tag(span, "instrument_commands", "starting")
instrument_bot_commands(self)
set_setup_phase_tag(span, "instrument_commands", "finished")
except Exception as e:
# If any part of the setup fails, log the critical error
# and initiate a graceful shutdown to prevent a partial startup.
logger.critical(f"Critical error during setup: {e}")
self.sentry_manager.set_context("setup_failure", {"error": str(e), "error_type": type(e).__name__})
self.sentry_manager.capture_exception(e)
await self.shutdown()
raise
shutdown() -> None
async
¶
Executes the bot's shutdown routine in a defined sequence.
This method ensures that all resources are cleaned up properly, including cancelling tasks, stopping task loops, and closing database and Discord connections. The is_shutting_down
flag prevents this from running more than once.
Source code in tux/bot.py
async def shutdown(self) -> None:
"""
Executes the bot's shutdown routine in a defined sequence.
This method ensures that all resources are cleaned up properly,
including cancelling tasks, stopping task loops, and closing
database and Discord connections. The `is_shutting_down` flag
prevents this from running more than once.
"""
with start_transaction("bot.shutdown", "Bot shutdown process") as transaction:
# The is_shutting_down flag prevents re-entrant calls to shutdown.
if self.state.is_shutting_down:
logger.info("Shutdown already in progress. Exiting.")
transaction.set_data("already_shutting_down", True)
return
self.state.is_shutting_down = True
transaction.set_tag("shutdown_initiated", True)
logger.info("Shutting down...")
# Handle setup task cleanup
transaction.set_tag("handle_setup_task_handled", False)
await self._handle_setup_task()
transaction.set_tag("handle_setup_task_handled", True)
# Cancel all tasks
transaction.set_tag("cleanup_tasks_handled", False)
await self.task_manager.cancel_all_tasks()
transaction.set_tag("cleanup_tasks_handled", True)
# Close connections
transaction.set_tag("close_connections_handled", False)
await self._close_connections()
transaction.set_tag("close_connections_handled", True)
logger.info("Bot shutdown complete.")
setup_hook() -> None
async
¶
Performs one-time async setup before connecting to Discord.
This discord.py
hook is used to initialize the emoji manager and schedule the _post_ready_startup
task, which runs after both the internal setup and the Discord connection are fully established.
Source code in tux/bot.py
async def setup_hook(self) -> None:
"""
Performs one-time async setup before connecting to Discord.
This `discord.py` hook is used to initialize the emoji manager and
schedule the `_post_ready_startup` task, which runs after both the
internal setup and the Discord connection are fully established.
"""
# Initialize the emoji manager as soon as the bot's loop is running.
if not self.state.emoji_manager_initialized:
await self.emoji_manager.init()
self.state.emoji_manager_initialized = True
# The `_post_ready_startup` task should only be created once.
# This check prevents it from being recreated on subsequent reconnects.
if self._startup_task is None or self._startup_task.done():
self._startup_task = self.loop.create_task(self._post_ready_startup())
on_ready() -> None
async
¶
Called when the bot is ready and connected to Discord.
This sets the bot's presence and indicates that it is online. It waits for the internal setup to complete before proceeding.
Source code in tux/bot.py
async def on_ready(self) -> None:
"""
Called when the bot is ready and connected to Discord.
This sets the bot's presence and indicates that it is online.
It waits for the internal setup to complete before proceeding.
"""
await self._wait_for_setup()
# Set bot status
activity = discord.Activity(type=discord.ActivityType.watching, name="for /help")
await self.change_presence(activity=activity, status=discord.Status.online)
on_disconnect() -> None
async
¶
Logs and reports when the bot disconnects from Discord.
This is a normal event during bot operation and is usually followed by a reconnect, so it is logged as a warning.
Source code in tux/bot.py
async def on_disconnect(self) -> None:
"""
Logs and reports when the bot disconnects from Discord.
This is a normal event during bot operation and is usually followed
by a reconnect, so it is logged as a warning.
"""
logger.info("Bot has disconnected from Discord.")
self.sentry_manager.capture_message(
"Bot disconnected from Discord, this happens sometimes and is fine as long as it's not happening too often",
)
_setup_database() -> None
async
¶
Connects to the database and validates the connection.
Raises:
Type | Description |
---|---|
Exception | Propagates any database connection errors from the client. |
Source code in tux/bot.py
async def _setup_database(self) -> None:
"""
Connects to the database and validates the connection.
Raises
------
Exception
Propagates any database connection errors from the client.
"""
with start_span("bot.database_connect", "Setting up database connection") as span:
logger.info("Setting up database connection...")
try:
await db.connect()
self._validate_db_connection()
span.set_tag("db.connected", db.is_connected())
span.set_tag("db.registered", db.is_registered())
logger.info(f"Database connected: {db.is_connected()}")
logger.info(f"Database models registered: {db.is_registered()}")
except Exception as e:
set_span_error(span, e, "db_error")
raise
_load_jishaku() -> None
async
¶
Loads the Jishaku extension for debugging and development.
Source code in tux/bot.py
async def _load_jishaku(self) -> None:
"""Loads the Jishaku extension for debugging and development."""
with start_span("bot.load_jishaku", "Loading jishaku debug extension") as span:
try:
await self.load_extension("jishaku")
logger.info("Successfully loaded jishaku extension")
span.set_tag("jishaku.loaded", True)
except commands.ExtensionError as e:
logger.warning(f"Failed to load jishaku: {e}")
span.set_tag("jishaku.loaded", False)
span.set_data("error", str(e))
_load_cogs() -> None
async
¶
Loads all command cogs using the CogLoader utility.
Raises:
Type | Description |
---|---|
Exception | Propagates any exceptions that occur during cog loading. |
Source code in tux/bot.py
async def _load_cogs(self) -> None:
"""
Loads all command cogs using the CogLoader utility.
Raises
------
Exception
Propagates any exceptions that occur during cog loading.
"""
from tux.cog_loader import CogLoader # noqa: PLC0415
with start_span("bot.load_cogs", "Loading all cogs") as span:
logger.info("Loading cogs...")
try:
await CogLoader.setup(self)
span.set_tag("cogs_loaded", True)
except Exception as e:
logger.critical(f"Error loading cogs: {e}")
span.set_tag("cogs_loaded", False)
set_span_error(span, e, "error")
self.sentry_manager.capture_exception(e)
raise
_setup_hot_reload() -> None
async
¶
Sets up the hot-reload system for development.
This allows for automatic reloading of cogs and modules when files are changed, speeding up the development workflow.
Raises:
Type | Description |
---|---|
Exception | Propagates exceptions from |
Source code in tux/bot.py
async def _setup_hot_reload(self) -> None:
"""
Sets up the hot-reload system for development.
This allows for automatic reloading of cogs and modules when files
are changed, speeding up the development workflow.
Raises
------
Exception
Propagates exceptions from `load_extension` if hot-reload fails.
"""
if not self.state.hot_reload_loaded and "tux.utils.hot_reload" not in self.extensions:
with start_span("bot.setup_hot_reload", "Setting up hot reload system"):
try:
await self.load_extension("tux.utils.hot_reload")
self.state.hot_reload_loaded = True
logger.info("🔥 Hot reload system initialized")
except Exception as e:
logger.error(f"Failed to load hot reload extension: {e}")
self.sentry_manager.capture_exception(e)
_register_critical_tasks() -> None
async
¶
Registers critical tasks after cogs are loaded.
This method uses dynamic discovery to find critical tasks from cogs, making the system more flexible and cog-driven.
Source code in tux/bot.py
async def _register_critical_tasks(self) -> None:
"""
Registers critical tasks after cogs are loaded.
This method uses dynamic discovery to find critical tasks from cogs,
making the system more flexible and cog-driven.
"""
with start_span("bot.register_critical_tasks", "Registering critical tasks") as span:
logger.info("Registering critical tasks...")
try:
# Clear any existing critical tasks to avoid duplicates
self.task_manager.critical_tasks.clear()
self.task_manager.task_metrics.clear()
# Discover and register tasks from cogs dynamically
self.task_manager.discover_and_register_cog_tasks()
span.set_tag("tasks_registered", len(self.task_manager.critical_tasks))
logger.info(f"Registered {len(self.task_manager.critical_tasks)} critical tasks.")
except Exception as e:
logger.critical(f"Failed to register critical tasks: {e}")
self.sentry_manager.capture_exception(e)
raise
_handle_cog_unload(cog_name: str) -> None
¶
Handle cleanup when a cog is unloaded.
This method cleans up any critical tasks associated with the unloaded cog to prevent orphaned task references.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
cog_name | str | The name of the cog that was unloaded. | required |
Source code in tux/bot.py
def _handle_cog_unload(self, cog_name: str) -> None:
"""
Handle cleanup when a cog is unloaded.
This method cleans up any critical tasks associated with the unloaded cog
to prevent orphaned task references.
Parameters
----------
cog_name : str
The name of the cog that was unloaded.
"""
logger.debug(f"Handling unload for cog: {cog_name}")
self.task_manager.cleanup_cog_tasks(cog_name)
remove_cog(name: str, /, *, guild: discord.abc.Snowflake | None = None, guilds: collections.abc.Sequence[discord.abc.Snowflake] | None = None) -> commands.Cog | None
async
¶
Remove a cog and clean up associated tasks.
This overrides the default remove_cog method to ensure that critical tasks associated with the unloaded cog are properly cleaned up when the cog is unloaded.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
name | str | The name of the cog to remove. | required |
guild | Snowflake | None | The guild to remove the cog from, by default None | None |
guilds | Sequence[Snowflake] | None | The guilds to remove the cog from, by default None | None |
Returns:
Type | Description |
---|---|
Cog | None | The removed cog, or None if it wasn't loaded. |
Source code in tux/bot.py
async def remove_cog(
self,
name: str,
/,
*,
guild: discord.abc.Snowflake | None = None,
guilds: collections.abc.Sequence[discord.abc.Snowflake] | None = None,
) -> commands.Cog | None:
"""
Remove a cog and clean up associated tasks.
This overrides the default remove_cog method to ensure that critical tasks
associated with the unloaded cog are properly cleaned up when the cog is unloaded.
Parameters
----------
name : str
The name of the cog to remove.
guild : discord.abc.Snowflake | None, optional
The guild to remove the cog from, by default None
guilds : collections.abc.Sequence[discord.abc.Snowflake] | None, optional
The guilds to remove the cog from, by default None
Returns
-------
commands.Cog | None
The removed cog, or None if it wasn't loaded.
"""
# Remove the cog using the parent method
if guilds is not None:
removed_cog = await super().remove_cog(name, guild=guild, guilds=guilds)
elif guild is not None:
removed_cog = await super().remove_cog(name, guild=guild)
else:
removed_cog = await super().remove_cog(name)
# Clean up associated tasks if the cog was successfully removed
if removed_cog is not None:
self._handle_cog_unload(name)
return removed_cog
_handle_setup_task() -> None
async
¶
Handles the main setup task during shutdown.
If the bot is shut down while the initial setup is still running, this method ensures the setup task is properly cancelled.
Source code in tux/bot.py
async def _handle_setup_task(self) -> None:
"""
Handles the main setup task during shutdown.
If the bot is shut down while the initial setup is still running,
this method ensures the setup task is properly cancelled.
"""
with start_span("bot.handle_setup_task", "Handling setup task during shutdown"):
if self.setup_task and not self.setup_task.done():
self.setup_task.cancel()
with contextlib.suppress(asyncio.CancelledError):
await self.setup_task
_close_connections() -> None
async
¶
_close_discord() -> None
async
¶
Closes the connection to the Discord Gateway.
Source code in tux/bot.py
async def _close_discord(self) -> None:
"""Closes the connection to the Discord Gateway."""
with start_span("bot.close_discord", "Closing Discord connection") as span:
try:
logger.debug("Closing Discord connection.")
await self.close()
logger.debug("Discord connection closed.")
span.set_tag("discord_closed", True)
except Exception as e:
logger.error(f"Error during Discord shutdown: {e}")
span.set_tag("discord_closed", False)
set_span_error(span, e, "discord_error")
self.sentry_manager.capture_exception(e)
_close_database() -> None
async
¶
Closes the database connection pool.
Source code in tux/bot.py
async def _close_database(self) -> None:
"""Closes the database connection pool."""
with start_span("bot.close_database", "Closing database connection") as span:
if not db.is_connected():
logger.warning("Database was not connected, no disconnect needed.")
span.set_tag("db_connected", False)
return
try:
logger.debug("Closing database connection.")
await db.disconnect()
logger.debug("Database connection closed.")
span.set_tag("db_closed", True)
except Exception as e:
logger.critical(f"Error during database disconnection: {e}")
span.set_tag("db_closed", False)
set_span_error(span, e, "db_error")
self.sentry_manager.capture_exception(e)
_setup_callback(task: asyncio.Task[None]) -> None
¶
A callback that runs upon completion of the main setup task.
This updates the bot's state to reflect whether the setup was successful or failed.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
task | Task[None] | The setup task that has completed. | required |
Source code in tux/bot.py
def _setup_callback(self, task: asyncio.Task[None]) -> None:
"""
A callback that runs upon completion of the main setup task.
This updates the bot's state to reflect whether the setup
was successful or failed.
Parameters
----------
task : asyncio.Task[None]
The setup task that has completed.
"""
try:
task.result()
self.state.setup_complete = True
logger.info("Bot setup completed successfully")
self.sentry_manager.set_tag("bot.setup_complete", True)
except Exception as e:
logger.critical(f"Setup failed: {e}")
self.state.setup_complete = False
self.sentry_manager.set_tag("bot.setup_complete", False)
self.sentry_manager.set_tag("bot.setup_failed", True)
self.sentry_manager.capture_exception(e)
_wait_for_setup() -> None
async
¶
Waits for the internal setup task to complete before proceeding.
This is a crucial step in event handlers like on_ready
to ensure that cogs, database connections, etc., are available before the bot tries to interact with them.
Source code in tux/bot.py
async def _wait_for_setup(self) -> None:
"""
Waits for the internal setup task to complete before proceeding.
This is a crucial step in event handlers like `on_ready` to ensure
that cogs, database connections, etc., are available before the bot
tries to interact with them.
"""
if self.setup_task and not self.setup_task.done():
with start_span("bot.wait_setup", "Waiting for setup to complete"):
try:
await self.setup_task
except Exception as e:
logger.critical(f"Setup failed during on_ready: {e}")
self.sentry_manager.capture_exception(e)
await self.shutdown()
_post_ready_startup()
async
¶
Runs tasks that require the bot to be fully online and ready.
This method waits for two conditions: 1. The bot is connected to the Discord Gateway (wait_until_ready
). 2. The bot has completed its own internal setup (_wait_for_setup
).
Once ready, it logs the startup banner and reports initial stats.
Source code in tux/bot.py
async def _post_ready_startup(self):
"""
Runs tasks that require the bot to be fully online and ready.
This method waits for two conditions:
1. The bot is connected to the Discord Gateway (`wait_until_ready`).
2. The bot has completed its own internal setup (`_wait_for_setup`).
Once ready, it logs the startup banner and reports initial stats.
"""
await self.wait_until_ready()
# Also wait for internal bot setup (cogs, db, etc.) to complete
await self._wait_for_setup()
if not self.state.start_time:
self.state.start_time = discord.utils.utcnow().timestamp()
if not self.state.banner_logged:
await self._log_startup_banner()
self.state.banner_logged = True
self.sentry_manager.set_context(
"bot_stats",
{
"guild_count": len(self.guilds),
"user_count": len(self.users),
"channel_count": sum(len(g.channels) for g in self.guilds),
"uptime": discord.utils.utcnow().timestamp() - (self.state.start_time or 0),
},
)
_log_startup_banner() -> None
async
¶
Logs the bot's startup banner and stats to the console.
Source code in tux/bot.py
async def _log_startup_banner(self) -> None:
"""Logs the bot's startup banner and stats to the console."""
with start_span("bot.log_banner", "Displaying startup banner"):
banner = create_banner(
bot_name=Config.BOT_NAME,
version=Config.BOT_VERSION,
bot_id=str(self.user.id) if self.user else None,
guild_count=len(self.guilds),
user_count=len(self.users),
prefix=Config.DEFAULT_PREFIX,
dev_mode=is_dev_mode(),
)
self.console.print(banner)
_validate_db_connection() -> None
staticmethod
¶
Ensures the database is properly connected.
Raises:
Type | Description |
---|---|
DatabaseConnectionError | If the database is not connected or models are not registered. |
Source code in tux/bot.py
@staticmethod
def _validate_db_connection() -> None:
"""
Ensures the database is properly connected.
Raises
------
DatabaseConnectionError
If the database is not connected or models are not registered.
"""
if not db.is_connected() or not db.is_registered():
raise DatabaseConnectionError(DatabaseConnectionError.CONNECTION_FAILED)
_get_prefix(bot: Tux, message: discord.Message) -> list[str]
async
¶
Resolves the command prefix for a given message with caching.
This method dynamically retrieves the command prefix for a guild, caching the result to avoid repeated database lookups. It falls back to the default prefix if one is not configured or if a database error occurs. It also allows the bot to be invoked by mentioning it.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
bot | Tux | The instance of the bot. | required |
message | Message | The message to resolve the prefix for. | required |
Returns:
Type | Description |
---|---|
list[str] | A list of command prefixes, including mentions. |
Source code in tux/bot.py
async def _get_prefix(self, bot: Tux, message: discord.Message) -> list[str]:
"""
Resolves the command prefix for a given message with caching.
This method dynamically retrieves the command prefix for a guild, caching
the result to avoid repeated database lookups. It falls back to the
default prefix if one is not configured or if a database error occurs.
It also allows the bot to be invoked by mentioning it.
Parameters
----------
bot : Tux
The instance of the bot.
message : discord.Message
The message to resolve the prefix for.
Returns
-------
list[str]
A list of command prefixes, including mentions.
"""
if not message.guild:
return commands.when_mentioned_or(CONFIG.DEFAULT_PREFIX)(self, message)
# Check the cache for a stored prefix
if cached_prefix := self.prefix_cache.get(message.guild.id):
return commands.when_mentioned_or(cached_prefix)(self, message)
# If not in cache, query the database
if db.is_connected():
try:
if prefix := await DatabaseController().guild_config.get_guild_prefix(message.guild.id):
self.prefix_cache[message.guild.id] = prefix
return commands.when_mentioned_or(prefix)(self, message)
except Exception as e:
logger.error(f"Error getting guild prefix for guild {message.guild.id}: {e}")
self.sentry_manager.capture_exception(e)
# Fallback to the default prefix if no custom one is found
return commands.when_mentioned_or(CONFIG.DEFAULT_PREFIX)(self, message)
invoke(ctx: commands.Context[Any]) -> None
async
¶
Overrides the default invoke method to wrap command execution in a Sentry transaction.
This ensures that every command invocation is traced, allowing for performance monitoring and capturing of related spans (e.g., database queries).
Parameters:
Name | Type | Description | Default |
---|---|---|---|
ctx | Context[Any] | The context of the command invocation. | required |
Source code in tux/bot.py
async def invoke(self, ctx: commands.Context[Any]) -> None:
"""
Overrides the default invoke method to wrap command execution in a Sentry transaction.
This ensures that every command invocation is traced, allowing for performance
monitoring and capturing of related spans (e.g., database queries).
Parameters
----------
ctx : commands.Context[Any]
The context of the command invocation.
"""
if not self.sentry_manager.is_initialized:
await super().invoke(ctx)
return
# Create a transaction for every invocation, even if ctx.command is None
command_name = getattr(ctx.command, "qualified_name", None) or "unknown_command"
op = "command"
description = ctx.message.content
with start_transaction(op, command_name, description):
# Set comprehensive context using the SentryManager
self.sentry_manager.set_command_context(ctx)
# Execute the original command invocation logic
await super().invoke(ctx)