tux.utils.task_manager
Asynchronous Task Management Utility.
This module provides the TaskManager class, which encapsulates the logic for monitoring, categorizing, and managing the lifecycle of asyncio tasks within the bot. By abstracting this functionality, it keeps the main Tux class cleaner and more focused on its core responsibilities.
The manager is responsible for:
- Periodically monitoring all running asyncio tasks.
- Categorizing tasks based on their naming conventions (e.g., discord.py internal tasks, scheduled tasks, command tasks).
- Gracefully stopping and cancelling tasks during the bot's shutdown sequence.
- Health monitoring and automatic recovery of critical tasks.
- Collecting performance metrics and statistics.
Classes:
Name | Description |
---|---|
TaskCategory | Categories for background tasks. |
TaskPriority | Task priority levels for shutdown ordering. |
TaskMetrics | Metrics tracking for individual tasks. |
TaskHealth | Health status of a task. |
CriticalTaskConfig | Configuration for critical tasks that should be monitored and restarted. |
CriticalTasksProvider | Protocol for cogs that provide critical tasks. |
TaskManager | Enhanced task manager with health monitoring, metrics, and recovery capabilities. |
Functions:
Name | Description |
---|---|
instrumented_task | Decorator to instrument a task coroutine for monitoring/metrics. |
Classes
TaskMetrics(name: str, category: TaskCategory, priority: TaskPriority = TaskPriority.NORMAL, start_time: float = time.time(), restart_count: int = 0, last_restart: float | None = None, total_runtime: float = 0.0, avg_runtime: float = 0.0, max_runtime: float = 0.0, error_count: int = 0, last_error: str | None = None, last_error_time: float | None = None)
dataclass
Metrics tracking for individual tasks.
TaskHealth
Health status of a task.
CriticalTaskConfig(name: str, cog_name: str, task_attr: str, priority: TaskPriority = TaskPriority.HIGH, max_restarts: int = 5, restart_delay: float = 30.0, health_check_interval: float = 300.0)
dataclass
Configuration for critical tasks that should be monitored and restarted.
TaskManager(bot: BotProtocol)
Enhanced task manager with health monitoring, metrics, and recovery capabilities.
Manages the lifecycle of asyncio tasks for the bot with advanced features:
- Task registration and health monitoring
- Automatic recovery of failed critical tasks
- Performance metrics and statistics collection
Initialize the TaskManager with enhanced monitoring capabilities.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
bot | BotProtocol | The bot instance that conforms to the protocol. | required |
Methods:
Name | Description |
---|---|
setup_task_instrumentation | Initializes instrumentation for all registered critical tasks. |
start | Starts the background task monitoring loop if it's not already running. |
stop | Stops the background task monitoring loop. |
register_critical_task | Register a critical task for health monitoring and recovery. |
discover_and_register_cog_tasks | Discover and register critical tasks from all loaded cogs. |
unregister_critical_task | Unregister a critical task when its cog is unloaded. |
cleanup_cog_tasks | Clean up all critical tasks associated with a specific cog. |
get_task_health | Get health status for a specific task. |
get_task_statistics | Get comprehensive task statistics. |
restart_critical_task | Attempt to restart a critical task. |
cancel_all_tasks | Gracefully cancels all managed asyncio tasks with priority ordering. |
Source code in tux/utils/task_manager.py
def __init__(self, bot: BotProtocol) -> None:
    """
    Initialize the TaskManager with enhanced monitoring capabilities.
    Parameters
    ----------
    bot : BotProtocol
        The bot instance that conforms to the protocol.
    """
    self.bot = bot
    # Task registration and monitoring
    self.critical_tasks: dict[str, CriticalTaskConfig] = {}
    self.task_metrics: dict[str, TaskMetrics] = {}
    self.task_history: dict[str, deque[float]] = defaultdict(lambda: deque(maxlen=100))
    # Health monitoring
    self.last_health_check: float = 0.0
    self.health_check_interval: float = 300.0  # 5 minutes
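The `task_history` attribute above pairs `defaultdict` with a bounded `deque`. A minimal standalone sketch of that pattern follows; the smaller `maxlen` and the task name `example_task` are illustrative assumptions, not values from the module.

```python
from collections import defaultdict, deque

# Each unseen key gets its own deque that keeps only the newest entries.
# maxlen=3 is an illustrative value; the manager above uses 100.
history: dict[str, deque[float]] = defaultdict(lambda: deque(maxlen=3))

for timestamp in (1.0, 2.0, 3.0, 4.0, 5.0):
    # "example_task" is a hypothetical task name.
    history["example_task"].append(timestamp)

# Older entries are discarded automatically once maxlen is reached.
recent = list(history["example_task"])
print(recent)
```

Bounding the deque keeps memory use per task constant, however long the bot runs.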
Functions
setup_task_instrumentation() -> None
Initializes instrumentation for all registered critical tasks.
To ensure compatibility with discord.py and avoid relying on internal implementation details, critical task coroutines should be wrapped with the @instrumented_task decorator at definition time. This ensures that instrumentation is applied in a supported and robust manner.
Example usage:
    @instrumented_task
    async def my_critical_task(...):
        ...
This method can still be used for any additional setup or validation.
Source code in tux/utils/task_manager.py
def setup_task_instrumentation(self) -> None:
    """
    Initializes instrumentation for all registered critical tasks.
    To ensure compatibility with discord.py and avoid relying on internal
    implementation details, critical task coroutines should be wrapped with
    the @instrumented_task decorator at definition time. This ensures that
    instrumentation is applied in a supported and robust manner.
    Example usage:
        @instrumented_task
        async def my_critical_task(...):
            ...
    This method can still be used for any additional setup or validation.
    """
    logger.info("Validating Sentry instrumentation for critical tasks...")
    for task_name, config in self.critical_tasks.items():
        if not (cog := self.bot.cogs.get(config.cog_name)):
            logger.warning(f"Cog {config.cog_name} not found for task {task_name}. Skipping instrumentation.")
            continue
        if not (task_loop := getattr(cog, config.task_attr, None)):
            logger.warning(
                f"Task loop {config.task_attr} not found in cog {config.cog_name}. Skipping instrumentation.",
            )
            continue
        if isinstance(task_loop, tasks.Loop):
            try:
                # We are confident .coro exists and is a callable coroutine on a tasks.Loop instance.
                # The type checker struggles with this dynamic attribute from the discord.py library.
                original_coro = cast(Callable[..., Coroutine[Any, Any, None]], task_loop.coro)  # type: ignore[attr-defined]
                decorated_loop = transaction(op="task.run", name=f"task.{task_name}")(original_coro)
                task_loop.coro = decorated_loop  # type: ignore[attr-defined]
                logger.debug(f"Instrumented task: {task_name}")
            except AttributeError:
                logger.warning(f"Could not find a 'coro' on task {task_name}. Skipping instrumentation.")
        else:
            logger.warning(
                f"Attribute {config.task_attr} in {config.cog_name} is not a Loop. Skipping instrumentation.",
            )
start() -> None
Starts the background task monitoring loop if it's not already running.
Source code in tux/utils/task_manager.py
stop() -> None
Stops the background task monitoring loop.
register_critical_task(config: CriticalTaskConfig) -> None
Register a critical task for health monitoring and recovery.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
config | CriticalTaskConfig | Configuration for the critical task to register. | required |
Source code in tux/utils/task_manager.py
def register_critical_task(self, config: CriticalTaskConfig) -> None:
    """
    Register a critical task for health monitoring and recovery.
    Parameters
    ----------
    config : CriticalTaskConfig
        Configuration for the critical task to register.
    """
    self.critical_tasks[config.name] = config
    self.task_metrics[config.name] = TaskMetrics(
        name=config.name,
        category=TaskCategory.SCHEDULED,
        priority=config.priority,
    )
    logger.debug(f"Registered critical task: {config.name}")
discover_and_register_cog_tasks() -> None
Discover and register critical tasks from all loaded cogs.
This method iterates through all loaded cogs and looks for a get_critical_tasks method. If found, it calls the method to get a list of CriticalTaskConfig objects and registers them.
Source code in tux/utils/task_manager.py
def discover_and_register_cog_tasks(self) -> None:
    """
    Discover and register critical tasks from all loaded cogs.
    This method iterates through all loaded cogs and looks for a
    `get_critical_tasks` method. If found, it calls the method to
    get a list of CriticalTaskConfig objects and registers them.
    """
    for cog_name, cog in self.bot.cogs.items():
        if isinstance(cog, CriticalTasksProvider):
            try:
                task_configs = cog.get_critical_tasks()
                for config in task_configs:
                    self.register_critical_task(config)
                    logger.debug(f"Discovered task {config.name} from cog {cog_name}")
            except Exception as e:
                logger.warning(f"Error discovering tasks from cog {cog_name}: {e}")
                continue
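The discovery step relies on a structural `isinstance` check against a protocol. The sketch below illustrates that idea with stand-in types only: `ExampleTaskConfig`, `ProvidesCriticalTasks`, `ReminderCog`, `PlainCog`, and `discover` are hypothetical names, and it assumes that `CriticalTasksProvider` is declared with `typing.runtime_checkable`, which this page does not show.

```python
from dataclasses import dataclass
from typing import Protocol, runtime_checkable


@dataclass
class ExampleTaskConfig:
    """Hypothetical stand-in for CriticalTaskConfig (three fields only)."""

    name: str
    cog_name: str
    task_attr: str


@runtime_checkable
class ProvidesCriticalTasks(Protocol):
    """Hypothetical stand-in for CriticalTasksProvider."""

    def get_critical_tasks(self) -> list[ExampleTaskConfig]: ...


class ReminderCog:
    """Hypothetical cog that opts in by defining get_critical_tasks."""

    def get_critical_tasks(self) -> list[ExampleTaskConfig]:
        return [ExampleTaskConfig("reminder_loop", "ReminderCog", "check_reminders")]


class PlainCog:
    """Hypothetical cog without critical tasks; discovery skips it."""


def discover(cogs: dict[str, object]) -> dict[str, ExampleTaskConfig]:
    registered: dict[str, ExampleTaskConfig] = {}
    for cog in cogs.values():
        # For a runtime-checkable protocol, isinstance only verifies that
        # the method exists; it does not validate the signature.
        if isinstance(cog, ProvidesCriticalTasks):
            for config in cog.get_critical_tasks():
                registered[config.name] = config
    return registered


found = discover({"ReminderCog": ReminderCog(), "PlainCog": PlainCog()})
print(sorted(found))
```

Because the check is structural, a cog does not need to inherit from the protocol class; defining the method is enough.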
unregister_critical_task(task_name: str) -> None
Unregister a critical task when its cog is unloaded.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
task_name | str | The name of the task to unregister. | required |
Source code in tux/utils/task_manager.py
def unregister_critical_task(self, task_name: str) -> None:
    """
    Unregister a critical task when its cog is unloaded.
    Parameters
    ----------
    task_name : str
        The name of the task to unregister.
    """
    if task_name in self.critical_tasks:
        del self.critical_tasks[task_name]
        logger.debug(f"Unregistered critical task: {task_name}")
    if task_name in self.task_metrics:
        del self.task_metrics[task_name]
        logger.debug(f"Removed metrics for task: {task_name}")
cleanup_cog_tasks(cog_name: str) -> None
Clean up all critical tasks associated with a specific cog.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
cog_name | str | The name of the cog that was unloaded. | required |
Source code in tux/utils/task_manager.py
def cleanup_cog_tasks(self, cog_name: str) -> None:
    """
    Clean up all critical tasks associated with a specific cog.
    Parameters
    ----------
    cog_name : str
        The name of the cog that was unloaded.
    """
    tasks_to_remove = [
        task_name for task_name, config in self.critical_tasks.items() if config.cog_name == cog_name
    ]
    for task_name in tasks_to_remove:
        self.unregister_critical_task(task_name)
    if tasks_to_remove:
        logger.info(f"Cleaned up {len(tasks_to_remove)} critical tasks for unloaded cog: {cog_name}")
get_task_health(task_name: str) -> TaskHealth | None
Get health status for a specific task.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
task_name | str | The name of the task to check. | required |
Returns:
Type | Description |
---|---|
TaskHealth | None | Health status or None if task not found. |
Source code in tux/utils/task_manager.py
def get_task_health(self, task_name: str) -> TaskHealth | None:
    """
    Get health status for a specific task.
    Parameters
    ----------
    task_name : str
        The name of the task to check.
    Returns
    -------
    TaskHealth | None
        Health status or None if task not found.
    """
    if (metrics := self.task_metrics.get(task_name)) is None:
        return None
    current_time = time.time()
    uptime = current_time - metrics.start_time
    # Calculate error rate (errors per hour)
    error_rate = (metrics.error_count / max(uptime / 3600, 0.1)) if uptime > 0 else 0.0
    # Task is healthy if it has low error rate and hasn't been restarting frequently
    is_healthy = (
        error_rate < 10.0  # Less than 10 errors per hour
        and metrics.restart_count < 3  # Less than 3 restarts
        and (not metrics.last_restart or current_time - metrics.last_restart > 300)  # No restart in last 5 minutes
    )
    return TaskHealth(
        is_healthy=is_healthy,
        uptime=uptime,
        error_rate=error_rate,
        restart_count=metrics.restart_count,
        last_seen=current_time,
    )
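The health rule above can be restated as a small pure function, which makes the thresholds easier to see. This is a sketch under assumptions: `assess` is a hypothetical helper, and it takes the seconds elapsed since the last restart directly instead of reading timestamps from `TaskMetrics`.

```python
from __future__ import annotations


def assess(
    error_count: int,
    restart_count: int,
    uptime_s: float,
    seconds_since_restart: float | None = None,
) -> tuple[float, bool]:
    """Return (errors per hour, is_healthy) using the thresholds shown above."""
    # The 0.1 hour floor keeps a very young task from dividing by almost zero.
    error_rate = error_count / max(uptime_s / 3600, 0.1) if uptime_s > 0 else 0.0
    is_healthy = (
        error_rate < 10.0
        and restart_count < 3
        and (seconds_since_restart is None or seconds_since_restart > 300)
    )
    return error_rate, is_healthy


# Two hours of uptime with four errors: 2 errors per hour, healthy.
steady = assess(error_count=4, restart_count=0, uptime_s=7200.0)

# One minute of uptime with two errors: the floor of 0.1 hours applies,
# so the rate is about 20 errors per hour and the task is unhealthy.
young = assess(error_count=2, restart_count=0, uptime_s=60.0)

# No errors, but restarted two minutes ago: still inside the 5 minute window.
just_restarted = assess(0, 1, 600.0, seconds_since_restart=120.0)
```

Note that the 0.1 hour floor means a task in its first six minutes is rated as if it had already run for six minutes, so a couple of early errors are enough to mark it unhealthy.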
get_task_statistics() -> dict[str, Any]
Get comprehensive task statistics.
Returns:
Type | Description |
---|---|
dict[str, Any] | Statistics about all monitored tasks. |
Source code in tux/utils/task_manager.py
def get_task_statistics(self) -> dict[str, Any]:
    """
    Get comprehensive task statistics.
    Returns
    -------
    dict[str, Any]
        Statistics about all monitored tasks.
    """
    # Initialize counters
    healthy_tasks = 0
    unhealthy_tasks = 0
    total_restarts = 0
    total_errors = 0
    categories: defaultdict[str, int] = defaultdict(int)
    priorities: defaultdict[str, int] = defaultdict(int)
    for task_name, metrics in self.task_metrics.items():
        if health := self.get_task_health(task_name):
            if health.is_healthy:
                healthy_tasks += 1
            else:
                unhealthy_tasks += 1
        categories[metrics.category.name] += 1
        priorities[metrics.priority.name] += 1
        total_restarts += metrics.restart_count
        total_errors += metrics.error_count
    return {
        "total_tasks": len(self.task_metrics),
        "critical_tasks": len(self.critical_tasks),
        "healthy_tasks": healthy_tasks,
        "unhealthy_tasks": unhealthy_tasks,
        "categories": dict(categories),
        "priorities": dict(priorities),
        "total_restarts": total_restarts,
        "total_errors": total_errors,
    }
restart_critical_task(task_name: str) -> bool
async
Attempt to restart a critical task.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
task_name | str | The name of the task to restart. | required |
Returns:
Type | Description |
---|---|
bool | True if restart was successful, False otherwise. |
Source code in tux/utils/task_manager.py
async def restart_critical_task(self, task_name: str) -> bool:  # noqa: PLR0911
    """
    Attempt to restart a critical task.
    Parameters
    ----------
    task_name : str
        The name of the task to restart.
    Returns
    -------
    bool
        True if restart was successful, False otherwise.
    """
    # Validate task is critical and get config/metrics
    if task_name not in self.critical_tasks:
        logger.warning(f"Cannot restart non-critical task: {task_name}")
        return False
    config = self.critical_tasks[task_name]
    metrics = self.task_metrics[task_name]
    current_time = time.time()
    # Check restart constraints
    if metrics.restart_count >= config.max_restarts:
        logger.error(f"Task {task_name} has exceeded max restarts ({config.max_restarts})")
        return False
    if metrics.last_restart and current_time - metrics.last_restart < config.restart_delay:
        logger.warning(f"Task {task_name} is in restart cooldown")
        return False
    # Find and validate the cog and task
    cog = self.bot.cogs.get(config.cog_name)
    if not cog:
        logger.error(f"Cog {config.cog_name} not found for task {task_name}")
        return False
    task_loop = getattr(cog, config.task_attr, None)
    if not isinstance(task_loop, tasks.Loop):
        logger.error(f"Task {config.task_attr} not found in cog {config.cog_name}")
        return False
    # Attempt restart
    try:
        if task_loop.is_running():
            task_loop.restart()
        else:
            task_loop.start()
    except Exception as e:
        logger.error(f"Failed to restart task {task_name}: {e}")
        self.bot.sentry_manager.capture_exception(e)
        return False
    else:
        # Update metrics on successful restart
        metrics.restart_count += 1
        metrics.last_restart = current_time
        metrics.start_time = current_time
        logger.info(f"Successfully restarted critical task: {task_name}")
        return True
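Before touching the loop, the method applies two gates: a restart budget and a cooldown. A minimal sketch of just those gates is shown below, assuming a hypothetical helper named `may_restart`; the defaults mirror `max_restarts=5` and `restart_delay=30.0` from the `CriticalTaskConfig` signature.

```python
from __future__ import annotations


def may_restart(
    restart_count: int,
    last_restart: float | None,
    now: float,
    max_restarts: int = 5,
    restart_delay: float = 30.0,
) -> tuple[bool, str]:
    """Return (allowed, reason) for a restart attempt."""
    # Gate 1: the task has used up its restart budget.
    if restart_count >= max_restarts:
        return False, "max restarts exceeded"
    # Gate 2: the previous restart was too recent.
    if last_restart is not None and now - last_restart < restart_delay:
        return False, "in cooldown"
    return True, "ok"


first_attempt = may_restart(restart_count=0, last_restart=None, now=1000.0)
too_soon = may_restart(restart_count=1, last_restart=990.0, now=1000.0)
exhausted = may_restart(restart_count=5, last_restart=100.0, now=1000.0)
```

Checking the budget before the cooldown means an exhausted task always reports the more serious condition.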
cancel_all_tasks() -> None
async
Gracefully cancels all managed asyncio tasks with priority ordering.
This is the main entrypoint for the shutdown process. It stops all discord.ext.tasks loops and then proceeds to cancel all other categorized tasks in priority order.
Source code in tux/utils/task_manager.py
async def cancel_all_tasks(self) -> None:
    """
    Gracefully cancels all managed asyncio tasks with priority ordering.
    This is the main entrypoint for the shutdown process. It stops all
    `discord.ext.tasks` loops and then proceeds to cancel all other
    categorized tasks in priority order.
    """
    with start_span("bot.cleanup_tasks", "Cleaning up running tasks"):
        try:
            await self._stop_task_loops()
            all_tasks = [t for t in asyncio.all_tasks() if t is not asyncio.current_task()]
            tasks_by_type = self._categorize_tasks(all_tasks)
            # Cancel tasks in priority order (low priority first)
            await self._cancel_tasks_by_priority(tasks_by_type)
        except Exception as e:
            logger.error(f"Error during task cleanup: {e}")
            self.bot.sentry_manager.capture_exception(e)
_monitor_tasks_loop() -> None
async
Enhanced task monitoring with health checks and metrics collection.
This loop runs every 60 seconds to gather all tasks, categorize them, handle finished tasks, perform health checks, and collect metrics.
Raises:
Type | Description |
---|---|
RuntimeError | If a critical, unhandled exception occurs during task monitoring. |
Source code in tux/utils/task_manager.py
@tasks.loop(seconds=60)
async def _monitor_tasks_loop(self) -> None:
    """
    Enhanced task monitoring with health checks and metrics collection.
    This loop runs every 60 seconds to gather all tasks, categorize them,
    handle finished tasks, perform health checks, and collect metrics.
    Raises
    ------
    RuntimeError
        If a critical, unhandled exception occurs during task monitoring.
    """
    with start_span("bot.monitor_tasks", "Monitoring async tasks"):
        try:
            current_time = time.time()
            all_tasks = [t for t in asyncio.all_tasks() if t is not asyncio.current_task()]
            tasks_by_type = self._categorize_tasks(all_tasks)
        except Exception as e:
            logger.error(f"Error during task categorization: {e}")
            self.bot.sentry_manager.capture_exception(e)
            return
        try:
            await self._process_finished_tasks(tasks_by_type)
        except Exception as e:
            logger.error(f"Error processing finished tasks: {e}")
            self.bot.sentry_manager.capture_exception(e)
        try:
            self._update_task_metrics(tasks_by_type, current_time)
        except Exception as e:
            logger.error(f"Error updating task metrics: {e}")
            self.bot.sentry_manager.capture_exception(e)
        try:
            if current_time - self.last_health_check > self.health_check_interval:
                await self._perform_health_checks()
                self.last_health_check = current_time
        except Exception as e:
            logger.error(f"Error performing health checks: {e}")
            self.bot.sentry_manager.capture_exception(e)
_categorize_tasks(tasks: list[asyncio.Task[Any]]) -> dict[TaskCategory, list[asyncio.Task[Any]]]
Categorizes a list of tasks based on their names.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
tasks | list[Task[Any]] | The list of asyncio tasks to categorize. | required |
Returns:
Type | Description |
---|---|
dict[TaskCategory, list[Task[Any]]] | A dictionary mapping each task category to a list of tasks. |
Source code in tux/utils/task_manager.py
def _categorize_tasks(self, tasks: list[asyncio.Task[Any]]) -> dict[TaskCategory, list[asyncio.Task[Any]]]:
    """
    Categorizes a list of tasks based on their names.
    Parameters
    ----------
    tasks : list[asyncio.Task[Any]]
        The list of asyncio tasks to categorize.
    Returns
    -------
    dict[TaskCategory, list[asyncio.Task[Any]]]
        A dictionary mapping each task category to a list of tasks.
    """
    tasks_by_type: dict[TaskCategory, list[asyncio.Task[Any]]] = {category: [] for category in TaskCategory}
    for task in tasks:
        if task.done():
            continue
        name = self._get_task_name(task)
        category = self._get_task_category(name)
        tasks_by_type.setdefault(category, []).append(task)
    if unknown_tasks := tasks_by_type.get(TaskCategory.UNKNOWN):
        task_names = [self._get_task_name(t) for t in unknown_tasks]
        logger.warning(f"Found {len(unknown_tasks)} uncategorized tasks: {', '.join(task_names)}")
    return tasks_by_type
_get_task_category(name: str) -> TaskCategory
Determines the category of a task from its name.
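It first checks whether the name belongs to a registered critical task (SCHEDULED), then treats default asyncio names such as Task-1 as SYSTEM tasks. Otherwise it checks the name against the TASK_PREFIX_MAP for known system/library tasks, then checks for command-related tasks, and finally falls back to UNKNOWN.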
Parameters:
Name | Type | Description | Default |
---|---|---|---|
name | str | The name of the asyncio task. | required |
Returns:
Type | Description |
---|---|
TaskCategory | The determined category for the task. |
Source code in tux/utils/task_manager.py
def _get_task_category(self, name: str) -> TaskCategory:
    """
    Determines the category of a task from its name.
    Registered critical tasks are SCHEDULED and default asyncio names
    (e.g., Task-1) are SYSTEM. Otherwise the name is checked against
    the `TASK_PREFIX_MAP` for known system/library tasks, then for
    command-related tasks, and finally falls back to UNKNOWN.
    Parameters
    ----------
    name : str
        The name of the asyncio task.
    Returns
    -------
    TaskCategory
        The determined category for the task.
    """
    if name in self.critical_tasks:
        return TaskCategory.SCHEDULED
    # Default asyncio tasks (e.g., Task-1) are considered SYSTEM tasks.
    if name.startswith("Task-"):
        return TaskCategory.SYSTEM
    return next(
        (
            category
            for prefixes, category in self.TASK_PREFIX_MAP.items()
            if any(name.startswith(p) for p in prefixes)
        ),
        (TaskCategory.COMMAND if "command_" in name.lower() else TaskCategory.UNKNOWN),
    )
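The lookup order in the code above can be tried in isolation. In this sketch `Category`, `PREFIX_MAP`, and `categorize` are hypothetical stand-ins, and the prefixes are illustrative assumptions, because the real `TASK_PREFIX_MAP` is not shown on this page.

```python
from enum import Enum, auto


class Category(Enum):
    """Hypothetical stand-in for TaskCategory."""

    SCHEDULED = auto()
    GATEWAY = auto()
    SYSTEM = auto()
    COMMAND = auto()
    UNKNOWN = auto()


# Illustrative prefixes only; not the module's actual mapping.
PREFIX_MAP: dict[tuple[str, ...], Category] = {
    ("discord-ext-tasks:",): Category.SCHEDULED,
    ("discord.py:", "discord-gateway-"): Category.GATEWAY,
}


def categorize(name: str, critical_names: set[str]) -> Category:
    # 1. Registered critical tasks win over every other rule.
    if name in critical_names:
        return Category.SCHEDULED
    # 2. Default asyncio names look like "Task-<n>".
    if name.startswith("Task-"):
        return Category.SYSTEM
    # 3. Known prefixes; str.startswith accepts a tuple of candidates.
    for prefixes, category in PREFIX_MAP.items():
        if name.startswith(prefixes):
            return category
    # 4. Command tasks by substring, otherwise UNKNOWN.
    return Category.COMMAND if "command_" in name.lower() else Category.UNKNOWN


print(categorize("Task-7", set()))
```

Passing the tuple of prefixes straight to `str.startswith` is equivalent to the `any(...)` expression in the source.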
_process_finished_tasks(tasks_by_type: dict[TaskCategory, list[asyncio.Task[Any]]]) -> None
async
Awaits any tasks that have already completed to handle their results.
This is important for preventing asyncio's "Task exception was never retrieved" warnings and ensuring that exceptions from completed tasks are surfaced and logged.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
tasks_by_type | dict[TaskCategory, list[Task[Any]]] | A dictionary of tasks, categorized by type. | required |
Source code in tux/utils/task_manager.py
async def _process_finished_tasks(self, tasks_by_type: dict[TaskCategory, list[asyncio.Task[Any]]]) -> None:
    """
    Awaits any tasks that have already completed to handle their results.
    This is important for preventing asyncio's "Task exception was never
    retrieved" warnings and ensuring that exceptions from completed tasks
    are surfaced and logged.
    Parameters
    ----------
    tasks_by_type : dict[TaskCategory, list[asyncio.Task[Any]]]
        A dictionary of tasks, categorized by type.
    """
    for task_list in tasks_by_type.values():
        for task in task_list:
            if task.done():
                with contextlib.suppress(asyncio.CancelledError):
                    try:
                        await task
                    except Exception as e:
                        # Log task exceptions and update metrics
                        task_name = self._get_task_name(task)
                        logger.error(f"Task {task_name} failed with exception: {e}")
                        self._record_task_error(task_name, str(e))
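For comparison, a finished task's exception can also be read without awaiting the task. The sketch below uses `Task.exception()`, which is a different technique from the `await task` approach in the source; `fails`, `collect_errors`, and the task name are hypothetical.

```python
import asyncio


async def fails() -> None:
    raise ValueError("boom")


def collect_errors(finished: list[asyncio.Task]) -> dict[str, str]:
    errors: dict[str, str] = {}
    for task in finished:
        # Task.exception() raises CancelledError for cancelled tasks,
        # so those are skipped before calling it.
        if not task.done() or task.cancelled():
            continue
        exc = task.exception()  # Also marks the exception as retrieved.
        if exc is not None:
            errors[task.get_name()] = repr(exc)
    return errors


async def main() -> dict[str, str]:
    task = asyncio.create_task(fails(), name="example_failing_task")
    # asyncio.wait lets the task finish without re-raising its exception.
    await asyncio.wait([task])
    return collect_errors([task])


collected = asyncio.run(main())
print(collected)
```

Either approach marks the exception as retrieved, which is what silences the warning when the task object is garbage collected.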
_update_task_metrics(tasks_by_type: dict[TaskCategory, list[asyncio.Task[Any]]], current_time: float) -> None
Update metrics for all running tasks.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
tasks_by_type | dict[TaskCategory, list[Task[Any]]] | Categorized tasks. | required |
current_time | float | Current timestamp. | required |
Source code in tux/utils/task_manager.py
def _update_task_metrics(
    self,
    tasks_by_type: dict[TaskCategory, list[asyncio.Task[Any]]],
    current_time: float,
) -> None:
    """
    Update metrics for all running tasks.
    Parameters
    ----------
    tasks_by_type : dict[TaskCategory, list[asyncio.Task[Any]]]
        Categorized tasks.
    current_time : float
        Current timestamp.
    """
    # Update runtime metrics for critical tasks
    for task_name, config in self.critical_tasks.items():
        if cog := self.bot.cogs.get(config.cog_name):
            task_loop = getattr(cog, config.task_attr, None)
            if isinstance(task_loop, tasks.Loop) and task_loop.is_running():
                metrics = self.task_metrics[task_name]
                metrics.total_runtime = current_time - metrics.start_time
                self.task_history[task_name].append(current_time)
_perform_health_checks() -> None
async
Perform health checks on all critical tasks.
Source code in tux/utils/task_manager.py
async def _perform_health_checks(self) -> None:
    """Perform health checks on all critical tasks."""
    unhealthy_tasks: list[str] = []
    for task_name, config in self.critical_tasks.items():
        cog = self.bot.cogs.get(config.cog_name)
        if not cog:
            logger.warning(f"Cog {config.cog_name} not found for critical task {task_name}")
            continue
        task_loop = getattr(cog, config.task_attr, None)
        if not isinstance(task_loop, tasks.Loop):
            logger.warning(f"Task {config.task_attr} not found in cog {config.cog_name}")
            continue
        # Check if task is running
        if not task_loop.is_running():
            logger.warning(f"Critical task {task_name} is not running")
            unhealthy_tasks.append(task_name)
            continue
        # Check task health
        health = self.get_task_health(task_name)
        if health and not health.is_healthy:
            logger.warning(f"Critical task {task_name} is unhealthy: {health}")
            unhealthy_tasks.append(task_name)
    # Attempt to restart unhealthy critical tasks
    for task_name in unhealthy_tasks:
        if await self.restart_critical_task(task_name):
            logger.info(f"Successfully recovered unhealthy task: {task_name}")
_record_task_error(task_name: str, error_msg: str) -> None
Record an error for a task.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
task_name | str | The name of the task. | required |
error_msg | str | The error message. | required |
Source code in tux/utils/task_manager.py
def _record_task_error(self, task_name: str, error_msg: str) -> None:
    """
    Record an error for a task.
    Parameters
    ----------
    task_name : str
        The name of the task.
    error_msg : str
        The error message.
    """
    if task_name in self.task_metrics:
        metrics = self.task_metrics[task_name]
        metrics.error_count += 1
        metrics.last_error = error_msg
        metrics.last_error_time = time.time()
_stop_task_loops() -> None
async
Stops all registered discord.ext.tasks.Loop instances in all cogs.
This is a critical first step in the cleanup process to prevent new tasks from being created while shutdown is in progress.
Source code in tux/utils/task_manager.py
async def _stop_task_loops(self) -> None:
    """
    Stops all registered `discord.ext.tasks.Loop` instances in all cogs.
    This is a critical first step in the cleanup process to prevent new
    tasks from being created while shutdown is in progress.
    """
    with start_span("bot.stop_task_loops", "Stopping task loops"):
        for cog_name, cog in self.bot.cogs.items():
            if not cog:
                continue
            for name, value in cog.__dict__.items():
                if isinstance(value, tasks.Loop):
                    try:
                        value.stop()
                        logger.debug(f"Stopped task loop {cog_name}.{name}")
                    except Exception as e:
                        logger.error(f"Error stopping task loop {cog_name}.{name}: {e}")
        # Stop the monitor loop last, after every cog loop has been processed.
        # Errors from individual loops are logged above and do not prevent this.
        if self._monitor_tasks_loop.is_running():
            self._monitor_tasks_loop.stop()
_get_task_name(task: asyncio.Task[Any]) -> str
staticmethod
Gets a descriptive name for an asyncio task.
If a task was not explicitly named, it attempts to derive a name from its coroutine object for better logging.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
task | Task[Any] | The asyncio task to get the name from. | required |
Returns:
Type | Description |
---|---|
str | A descriptive name for the task. |
Source code in tux/utils/task_manager.py
@staticmethod
def _get_task_name(task: asyncio.Task[Any]) -> str:
    """
    Gets a descriptive name for an asyncio task.
    If a task was not explicitly named, it attempts to derive a name
    from its coroutine object for better logging.
    Parameters
    ----------
    task : asyncio.Task[Any]
        The asyncio task to get the name from.
    Returns
    -------
    str
        A descriptive name for the task.
    """
    name = task.get_name() or "unnamed"
    if name in ("None", "unnamed"):
        coro = task.get_coro()
        name = getattr(coro, "__qualname__", str(coro))
    return name
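It may help to see what asyncio reports for named and unnamed tasks. asyncio gives tasks created without a name a default of the form `Task-<n>`, so `get_name()` returns a non-empty string for them. The sketch below shows that behaviour along with a hypothetical variant, `describe`, that treats those default names as unnamed; this variant is an illustration and not the module's implementation.

```python
import asyncio
from typing import Any


async def work() -> None:
    await asyncio.sleep(0)


def describe(task: "asyncio.Task[Any]") -> str:
    """Hypothetical variant: fall back to the coroutine name for default names."""
    name = task.get_name()
    if name.startswith("Task-"):
        coro = task.get_coro()
        return getattr(coro, "__qualname__", str(coro))
    return name


async def main() -> tuple[str, str, str]:
    named = asyncio.create_task(work(), name="reminder_loop")
    unnamed = asyncio.create_task(work())
    observed = (named.get_name(), unnamed.get_name(), describe(unnamed))
    await asyncio.gather(named, unnamed)
    return observed


names = asyncio.run(main())
print(names)
```

This is also why `_get_task_category` can rely on the `Task-` prefix to recognise default asyncio tasks.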
_cancel_tasks_by_priority(tasks_by_type: dict[TaskCategory, list[asyncio.Task[Any]]]) -> None
async
Cancel tasks in priority order (low priority first).
Parameters:
Name | Type | Description | Default |
---|---|---|---|
tasks_by_type | dict[TaskCategory, list[Task[Any]]] | The dictionary of tasks to be cancelled. | required |
Source code in tux/utils/task_manager.py
async def _cancel_tasks_by_priority(self, tasks_by_type: dict[TaskCategory, list[asyncio.Task[Any]]]) -> None:
    """
    Cancel tasks in priority order (low priority first).
    Parameters
    ----------
    tasks_by_type : dict[TaskCategory, list[asyncio.Task[Any]]]
        The dictionary of tasks to be cancelled.
    """
    # Define shutdown priority order (low priority first)
    shutdown_order = [
        TaskCategory.UNKNOWN,
        TaskCategory.COMMAND,
        TaskCategory.SYSTEM,
        TaskCategory.SCHEDULED,
        TaskCategory.GATEWAY,
    ]
    with start_span("bot.cancel_tasks", "Cancelling tasks by priority") as span:
        for category in shutdown_order:
            task_list = tasks_by_type.get(category, [])
            if not task_list:
                continue
            task_names = [self._get_task_name(t) for t in task_list]
            names = ", ".join(task_names)
            logger.debug(f"Cancelling {len(task_list)} {category.name}: {names}")
            span.set_data(f"tasks.{category.name.lower()}", task_names)
            for task in task_list:
                task.cancel()
            results = await asyncio.gather(*task_list, return_exceptions=True)
            for result in results:
                if isinstance(result, Exception) and not isinstance(result, asyncio.CancelledError):
                    logger.error(f"Exception during task cancellation for {category.name}: {result!r}")
            logger.debug(f"Cancelled {category.name}")
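The cancel-then-gather pattern above can be exercised on its own. In this sketch the group names, `sleeper`, and `shutdown` are hypothetical, and plain strings stand in for `TaskCategory` members.

```python
import asyncio


async def sleeper() -> None:
    await asyncio.sleep(3600)


async def shutdown(
    groups: dict[str, list[asyncio.Task]],
    order: list[str],
) -> list[tuple[str, list[str]]]:
    report: list[tuple[str, list[str]]] = []
    for group_name in order:
        task_list = groups.get(group_name, [])
        if not task_list:
            continue
        for task in task_list:
            task.cancel()
        # With return_exceptions=True, gather waits for every task and
        # returns a CancelledError instance for each cancelled one
        # instead of raising.
        results = await asyncio.gather(*task_list, return_exceptions=True)
        report.append((group_name, [type(r).__name__ for r in results]))
    return report


async def main() -> list[tuple[str, list[str]]]:
    groups = {
        "gateway": [asyncio.create_task(sleeper())],
        "command": [asyncio.create_task(sleeper())],
    }
    await asyncio.sleep(0)  # Give the tasks a chance to start.
    return await shutdown(groups, ["unknown", "command", "gateway"])


report = asyncio.run(main())
print(report)
```

Awaiting each group before moving on means a later category is not cancelled until the earlier one has fully unwound.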
_cancel_tasks(tasks_by_type: dict[TaskCategory, list[asyncio.Task[Any]]]) -> None
async
Legacy method - redirects to priority-based cancellation.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
tasks_by_type | dict[TaskCategory, list[Task[Any]]] | The dictionary of tasks to be cancelled. | required |
Source code in tux/utils/task_manager.py
async def _cancel_tasks(self, tasks_by_type: dict[TaskCategory, list[asyncio.Task[Any]]]) -> None:
    """
    Legacy method - redirects to priority-based cancellation.
    Parameters
    ----------
    tasks_by_type : dict[TaskCategory, list[asyncio.Task[Any]]]
        The dictionary of tasks to be cancelled.
    """
    await self._cancel_tasks_by_priority(tasks_by_type)
Functions
instrumented_task(coro: Callable[..., Coroutine[Any, Any, Any]]) -> Callable[..., Coroutine[Any, Any, Any]]
Decorator to instrument a task coroutine for monitoring/metrics. Apply this decorator to critical task coroutines at definition time.
Source code in tux/utils/task_manager.py
def instrumented_task(coro: Callable[..., Coroutine[Any, Any, Any]]) -> Callable[..., Coroutine[Any, Any, Any]]:
    """
    Decorator to instrument a task coroutine for monitoring/metrics.
    Apply this decorator to critical task coroutines at definition time.
    """
    @functools.wraps(coro)
    async def wrapper(*args: Any, **kwargs: Any) -> Any:
        # Insert instrumentation logic here (e.g., Sentry, metrics, logging)
        # Start timing, add tracing, etc.
        return await coro(*args, **kwargs)
    return wrapper
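The wrapper above leaves the instrumentation logic as a placeholder. One possible way to fill it in is sketched below, assuming simple in-memory timing rather than Sentry; `timed_task`, `RUN_STATS`, and `example_task` are hypothetical names that do not appear in the module.

```python
import asyncio
import functools
import time
from collections.abc import Callable, Coroutine
from typing import Any

# Hypothetical in-memory store, keyed by the coroutine's qualified name.
RUN_STATS: dict[str, dict[str, float]] = {}


def timed_task(
    coro: Callable[..., Coroutine[Any, Any, Any]],
) -> Callable[..., Coroutine[Any, Any, Any]]:
    """Record run count, error count, and total runtime for a coroutine."""

    @functools.wraps(coro)
    async def wrapper(*args: Any, **kwargs: Any) -> Any:
        stats = RUN_STATS.setdefault(
            coro.__qualname__,
            {"runs": 0, "errors": 0, "total_s": 0.0},
        )
        started = time.perf_counter()
        try:
            return await coro(*args, **kwargs)
        except Exception:
            stats["errors"] += 1
            raise  # Re-raise so callers still see the failure.
        finally:
            stats["runs"] += 1
            stats["total_s"] += time.perf_counter() - started

    return wrapper


@timed_task
async def example_task(value: int) -> int:
    await asyncio.sleep(0)
    return value * 2


result = asyncio.run(example_task(21))
print(result, RUN_STATS["example_task"]["runs"])
```

Using `functools.wraps` preserves the coroutine's name and docstring on the wrapper, so name-based categorization and logging continue to see the original identity.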