Coverage for hub_service/scheduler/repository_scheduler.py: 30%
96 statements
« prev ^ index » next coverage.py v7.7.1, created at 2025-06-03 11:06 +0000
« prev ^ index » next coverage.py v7.7.1, created at 2025-06-03 11:06 +0000
1from datetime import datetime
3from sqlalchemy import exists, func, select, update
4from sqlalchemy.exc import SQLAlchemyError
5from sqlalchemy.ext.asyncio import AsyncSession
7from hub_service.scheduler.model import ScheduledTask
9from .exception import RepositoryCreateTaskError, RepositoryReadTaskError, RepositoryUpdateTaskError
class SchedulerRepository:
    """Data-access layer for ``ScheduledTask`` rows on top of an async SQLAlchemy session.

    Every write method commits on success and rolls the session back when
    SQLAlchemy raises; failures are re-raised as the repository-specific
    exception types imported from ``.exception``.
    """

    def __init__(self, session: AsyncSession) -> None:
        """Store the session that all repository calls will run on."""
        self.session = session

    async def _update_single_task(
        self,
        task_id: int | str,
        values: dict[str, object],
        *,
        not_found_message: str,
        failure_message: str,
    ) -> None:
        """Apply ``values`` to the task with ``task_id`` and commit.

        Shared implementation of the single-row update methods below.

        Raises:
            RepositoryUpdateTaskError: with ``not_found_message`` when the
                UPDATE returned no row, or with ``failure_message`` when
                SQLAlchemy raised during execute/commit.
        """
        try:
            result = await self.session.execute(
                update(ScheduledTask)
                .where(ScheduledTask.id == task_id)
                .values(**values)
                .returning(ScheduledTask.id)
            )

            # Compare against None explicitly: a falsy primary key (e.g. 0)
            # is still a successfully updated row.
            if result.scalar_one_or_none() is None:
                # Nothing was changed; end the transaction instead of leaving
                # it open on the session before reporting the miss.
                await self.session.rollback()
                raise RepositoryUpdateTaskError(message=not_found_message)

            await self.session.commit()
        except SQLAlchemyError as e:
            await self.session.rollback()
            raise RepositoryUpdateTaskError(message=failure_message) from e

    async def create_task(self, task_data: dict) -> ScheduledTask:
        """Create a ``ScheduledTask`` from ``task_data``, persist it and return it.

        Raises:
            RepositoryCreateTaskError: if SQLAlchemy raises; the session is
                rolled back first.
        """
        try:
            new_task = ScheduledTask(**task_data)
            self.session.add(new_task)
            await self.session.commit()
        except SQLAlchemyError as e:
            await self.session.rollback()
            raise RepositoryCreateTaskError from e
        # NOTE(review): if the session is configured with expire_on_commit=True,
        # attributes of the returned instance are expired here - confirm the
        # session factory settings.
        return new_task

    async def get_active_tasks_by_group(self, user_id: str, task_group: str) -> ScheduledTask | None:
        """Return the highest-priority task of the user's group, or ``None``.

        Raises:
            RepositoryReadTaskError: if SQLAlchemy raises.
        """
        # NOTE(review): despite the name, the query does not filter on
        # ScheduledTask.enabled and returns a single row - confirm with callers
        # whether disabled tasks should be excluded.
        try:
            stmt = (
                select(ScheduledTask)
                .where(
                    ScheduledTask.user_id == user_id,
                    ScheduledTask.task_group == task_group,
                )
                .order_by(ScheduledTask.priority.desc())
            )
            result = await self.session.execute(stmt)
            return result.scalars().first()
        except SQLAlchemyError as e:
            raise RepositoryReadTaskError from e

    async def disable_group_tasks(self, user_id: str, task_group: str) -> None:
        """Set ``enabled=False`` on every task of the user's group and commit.

        Raises:
            RepositoryUpdateTaskError: if SQLAlchemy raises; the session is
                rolled back first.
        """
        try:
            await self.session.execute(
                update(ScheduledTask)
                .where(ScheduledTask.user_id == user_id, ScheduledTask.task_group == task_group)
                .values(enabled=False)
            )
            await self.session.commit()
        except SQLAlchemyError as e:
            await self.session.rollback()
            raise RepositoryUpdateTaskError(message="Disable tasks failed") from e

    async def get_pending_user_groups(self) -> list[tuple[str, str]]:
        """Return distinct ``(user_id, task_group)`` rows that have an enabled task due now.

        "Due" means ``next_run <= now()`` as evaluated by the database.

        Raises:
            RepositoryReadTaskError: if SQLAlchemy raises.
        """
        try:
            result = await self.session.execute(
                select(ScheduledTask.user_id, ScheduledTask.task_group)
                .where(ScheduledTask.enabled, ScheduledTask.next_run <= func.now())
                .distinct()
            )
            # Materialise as a real list to match the annotated return type;
            # the row objects themselves are returned unchanged.
            return list(result.fetchall())
        except SQLAlchemyError as e:
            raise RepositoryReadTaskError(message="Read pending groups failed") from e

    async def lock_and_get_tasks(self, user_id: str, task_group: str) -> list[ScheduledTask]:
        """Select the group's enabled, due tasks ``FOR UPDATE SKIP LOCKED``.

        Results are ordered by priority and then ``next_run``, both descending.
        This method does not commit.

        Raises:
            RepositoryReadTaskError: if SQLAlchemy raises.
        """
        # NOTE(review): the row locks presumably last until the session's
        # transaction ends; the committing methods of this repository would end
        # it when called on the same session - confirm the intended call order.
        try:
            stmt = (
                select(ScheduledTask)
                .where(
                    ScheduledTask.user_id == user_id,
                    ScheduledTask.task_group == task_group,
                    ScheduledTask.enabled,
                    ScheduledTask.next_run <= func.now(),
                )
                .order_by(ScheduledTask.priority.desc(), ScheduledTask.next_run.desc())
                .with_for_update(skip_locked=True)
            )
            result = await self.session.execute(stmt)
            return list(result.scalars().all())
        except SQLAlchemyError as e:
            raise RepositoryReadTaskError(message="Lock tasks failed") from e

    # NOTE(review): task_id is annotated as str here but as int in the sibling
    # methods - confirm the actual type of ScheduledTask.id.
    async def update_task_enabled_status(self, task_id: str, enabled: bool) -> None:
        """Set the ``enabled`` flag of one task and commit.

        Raises:
            RepositoryUpdateTaskError: if no row matched ``task_id`` or
                SQLAlchemy raised.
        """
        await self._update_single_task(
            task_id,
            {"enabled": enabled},
            not_found_message="Task not found or not updated",
            failure_message="Failed to update task status",
        )

    async def exists_by_id(self, task_id: int) -> bool:
        """Check whether a task with the given ID exists.

        Raises:
            RepositoryReadTaskError: if SQLAlchemy raises.
        """
        try:
            stmt = select(exists().where(ScheduledTask.id == task_id))
            result = await self.session.execute(stmt)
            # Coerce so the declared ``bool`` return type always holds.
            return bool(result.scalar())
        except SQLAlchemyError as e:
            raise RepositoryReadTaskError(message="Failed to check task existence") from e

    async def update_task_deadline(self, task_id: int, new_deadline: datetime) -> None:
        """Store ``new_deadline`` (with its tzinfo removed) on one task and commit.

        Raises:
            RepositoryUpdateTaskError: if no row matched ``task_id`` or
                SQLAlchemy raised.
        """
        # NOTE(review): replace(tzinfo=None) drops the offset without converting
        # the wall-clock time - confirm callers always pass the zone the column
        # is expected to hold.
        naive_deadline = new_deadline.replace(tzinfo=None)
        await self._update_single_task(
            task_id,
            {"deadline": naive_deadline},
            not_found_message="Task not found or not updated",
            failure_message="Failed to update task deadline",
        )

    async def update_task_frequency_profile(self, task_id: int, new_profile_id: int | None) -> None:
        """Set ``frequency_profile`` of one task (``None`` is written as-is) and commit.

        Raises:
            RepositoryUpdateTaskError: if no row matched ``task_id`` or
                SQLAlchemy raised.
        """
        await self._update_single_task(
            task_id,
            {"frequency_profile": new_profile_id},
            not_found_message="Task not found or not updated",
            failure_message="Failed to update task frequency profile",
        )

    async def reset_retry_count(self, task_id: int) -> None:
        """Set ``retry_count`` of one task to 0 and commit.

        Raises:
            RepositoryUpdateTaskError: if no row matched ``task_id`` or
                SQLAlchemy raised.
        """
        await self._update_single_task(
            task_id,
            {"retry_count": 0},
            not_found_message="Task not found or retry count not reset",
            failure_message="Failed to reset retry count",
        )

    async def reset_all_retry_counts(self) -> int:
        """Reset the retry counter of ALL tasks in the system.

        Returns:
            The number of updated tasks, as reported by the result's ``rowcount``.

        Raises:
            RepositoryUpdateTaskError: if SQLAlchemy raises; the session is
                rolled back first.
        """
        try:
            result = await self.session.execute(update(ScheduledTask).values(retry_count=0))
            await self.session.commit()
        except SQLAlchemyError as e:
            await self.session.rollback()
            raise RepositoryUpdateTaskError(message="Mass reset failed") from e
        else:
            return result.rowcount