Coverage for hub_service/scheduler/repository_scheduler.py: 30%

96 statements  

« prev     ^ index     » next       coverage.py v7.7.1, created at 2025-06-03 11:06 +0000

1from datetime import datetime 

2 

3from sqlalchemy import exists, func, select, update 

4from sqlalchemy.exc import SQLAlchemyError 

5from sqlalchemy.ext.asyncio import AsyncSession 

6 

7from hub_service.scheduler.model import ScheduledTask 

8 

9from .exception import RepositoryCreateTaskError, RepositoryReadTaskError, RepositoryUpdateTaskError 

10 

11 

class SchedulerRepository:
    """Data-access layer for ``ScheduledTask`` rows on top of an ``AsyncSession``.

    Write methods commit on success and roll the session back when SQLAlchemy
    raises. SQLAlchemy errors are re-raised as the repository's own
    ``Repository*TaskError`` exceptions with the original error chained as the
    cause. Read methods neither commit nor roll back.
    """

    def __init__(self, session: AsyncSession) -> None:
        """Store the session that every repository query runs on."""
        self.session = session

    async def create_task(self, task_data: dict) -> ScheduledTask:
        """Insert a new task built from ``task_data`` and commit it.

        Args:
            task_data: Keyword arguments passed straight to the
                ``ScheduledTask`` constructor.

        Returns:
            The ``ScheduledTask`` instance that was added to the session.

        Raises:
            RepositoryCreateTaskError: If SQLAlchemy fails; the session is
                rolled back before the error is raised.
        """
        try:
            new_task = ScheduledTask(**task_data)
            self.session.add(new_task)
            await self.session.commit()
        except SQLAlchemyError as e:
            await self.session.rollback()
            raise RepositoryCreateTaskError from e
        return new_task

    async def get_active_tasks_by_group(self, user_id: str, task_group: str) -> ScheduledTask | None:
        """Return the highest-priority task of a user's task group.

        Args:
            user_id: Owner of the tasks.
            task_group: Group the tasks belong to.

        Returns:
            The first task when ordered by ``priority`` descending, or
            ``None`` when the user has no task in that group.

        Raises:
            RepositoryReadTaskError: If SQLAlchemy fails.
        """
        # NOTE(review): despite the method name, the query does not filter on
        # ``enabled`` - disabled tasks can be returned too. Confirm whether
        # that is intended before relying on "active" semantics.
        try:
            stmt = (
                select(ScheduledTask)
                .where(
                    ScheduledTask.user_id == user_id,
                    ScheduledTask.task_group == task_group,
                )
                .order_by(ScheduledTask.priority.desc())
            )
            result = await self.session.execute(stmt)
            return result.scalars().first()
        except SQLAlchemyError as e:
            raise RepositoryReadTaskError from e

    async def disable_group_tasks(self, user_id: str, task_group: str) -> None:
        """Set ``enabled=False`` on every task of a user's group and commit.

        Args:
            user_id: Owner of the tasks.
            task_group: Group whose tasks are disabled.

        Raises:
            RepositoryUpdateTaskError: If SQLAlchemy fails; the session is
                rolled back before the error is raised.
        """
        try:
            await self.session.execute(
                update(ScheduledTask)
                .where(ScheduledTask.user_id == user_id, ScheduledTask.task_group == task_group)
                .values(enabled=False)
            )
            await self.session.commit()
        except SQLAlchemyError as e:
            await self.session.rollback()
            raise RepositoryUpdateTaskError(message="Disable tasks failed") from e

    async def get_pending_user_groups(self) -> list[tuple[str, str]]:
        """Return the distinct ``(user_id, task_group)`` pairs that are due.

        A pair is included when it has at least one enabled task whose
        ``next_run`` is not later than the database's ``now()``.

        Raises:
            RepositoryReadTaskError: If SQLAlchemy fails.
        """
        try:
            result = await self.session.execute(
                select(ScheduledTask.user_id, ScheduledTask.task_group)
                .where(ScheduledTask.enabled, ScheduledTask.next_run <= func.now())
                .distinct()
            )
            return result.fetchall()
        except SQLAlchemyError as e:
            raise RepositoryReadTaskError(message="Read pending groups failed") from e

    async def lock_and_get_tasks(self, user_id: str, task_group: str) -> list[ScheduledTask]:
        """Select the due, enabled tasks of a user's group with row locks.

        The statement uses ``FOR UPDATE`` with ``skip_locked=True``, so rows
        already locked by another transaction are left out of the result
        instead of being waited for. This method neither commits nor rolls
        back; ending the transaction is left to the caller.

        Args:
            user_id: Owner of the tasks.
            task_group: Group the tasks belong to.

        Returns:
            Matching tasks ordered by ``priority`` descending, then
            ``next_run`` descending.

        Raises:
            RepositoryReadTaskError: If SQLAlchemy fails.
        """
        try:
            stmt = (
                select(ScheduledTask)
                .where(
                    ScheduledTask.user_id == user_id,
                    ScheduledTask.task_group == task_group,
                    ScheduledTask.enabled,
                    ScheduledTask.next_run <= func.now(),
                )
                .with_for_update(skip_locked=True)
                .order_by(ScheduledTask.priority.desc(), ScheduledTask.next_run.desc())
            )
            result = await self.session.execute(stmt)
            return result.scalars().all()
        except SQLAlchemyError as e:
            raise RepositoryReadTaskError(message="Lock tasks failed") from e

    async def _update_task_by_id(
        self,
        task_id: int | str,
        values: dict[str, object],
        *,
        not_found_message: str,
        failure_message: str,
    ) -> None:
        """Apply ``values`` to the task with id ``task_id`` and commit.

        Shared implementation of the single-task update methods.

        Args:
            task_id: Identifier matched against ``ScheduledTask.id``.
            values: Column name to new value mapping for the UPDATE.
            not_found_message: Error message used when no row matched.
            failure_message: Error message used when SQLAlchemy fails.

        Raises:
            RepositoryUpdateTaskError: With ``not_found_message`` when the
                UPDATE returned no row (raised before the commit, without a
                rollback), or with ``failure_message`` when SQLAlchemy fails
                (after the session has been rolled back).
        """
        try:
            result = await self.session.execute(
                update(ScheduledTask)
                .where(ScheduledTask.id == task_id)
                .values(**values)
                .returning(ScheduledTask.id)
            )

            # RETURNING yields the id only when a row matched. Compare with
            # None explicitly so a falsy identifier (e.g. 0) is not mistaken
            # for "no row updated".
            if result.scalar_one_or_none() is None:
                raise RepositoryUpdateTaskError(message=not_found_message)

            await self.session.commit()
        except SQLAlchemyError as e:
            await self.session.rollback()
            raise RepositoryUpdateTaskError(message=failure_message) from e

    async def update_task_enabled_status(self, task_id: str, enabled: bool) -> None:
        """Set the ``enabled`` flag of one task and commit.

        Args:
            task_id: Identifier of the task to update.
            enabled: New value of the ``enabled`` column.

        Raises:
            RepositoryUpdateTaskError: If no task has this id or SQLAlchemy
                fails.
        """
        # NOTE(review): ``task_id`` is annotated ``str`` here but ``int`` in
        # the sibling methods - confirm the real type of ``ScheduledTask.id``.
        await self._update_task_by_id(
            task_id,
            {"enabled": enabled},
            not_found_message="Task not found or not updated",
            failure_message="Failed to update task status",
        )

    async def exists_by_id(self, task_id: int) -> bool:
        """Check whether a task with the given id exists.

        Args:
            task_id: Identifier matched against ``ScheduledTask.id``.

        Returns:
            ``True`` when such a task exists, ``False`` otherwise.

        Raises:
            RepositoryReadTaskError: If SQLAlchemy fails.
        """
        try:
            stmt = select(exists().where(ScheduledTask.id == task_id))
            result = await self.session.execute(stmt)
            # Coerce so the annotated ``bool`` holds even if the driver hands
            # back None or a non-bool truth value.
            return bool(result.scalar())
        except SQLAlchemyError as e:
            raise RepositoryReadTaskError(message="Failed to check task existence") from e

    async def update_task_deadline(self, task_id: int, new_deadline: datetime) -> None:
        """Store a new deadline for one task and commit.

        The deadline is written as a naive datetime: ``tzinfo`` is dropped
        without converting the wall-clock value.

        Args:
            task_id: Identifier of the task to update.
            new_deadline: New value of the ``deadline`` column.

        Raises:
            RepositoryUpdateTaskError: If no task has this id or SQLAlchemy
                fails.
        """
        # NOTE(review): an aware, non-UTC datetime is stored with its local
        # wall-clock time - presumably callers pass UTC; confirm.
        await self._update_task_by_id(
            task_id,
            {"deadline": new_deadline.replace(tzinfo=None)},
            not_found_message="Task not found or not updated",
            failure_message="Failed to update task deadline",
        )

    async def update_task_frequency_profile(self, task_id: int, new_profile_id: int | None) -> None:
        """Set the ``frequency_profile`` column of one task and commit.

        Args:
            task_id: Identifier of the task to update.
            new_profile_id: New value of ``frequency_profile``; ``None`` is
                written as-is.

        Raises:
            RepositoryUpdateTaskError: If no task has this id or SQLAlchemy
                fails.
        """
        await self._update_task_by_id(
            task_id,
            {"frequency_profile": new_profile_id},
            not_found_message="Task not found or not updated",
            failure_message="Failed to update task frequency profile",
        )

    async def reset_retry_count(self, task_id: int) -> None:
        """Set ``retry_count`` of one task to 0 and commit.

        Args:
            task_id: Identifier of the task to update.

        Raises:
            RepositoryUpdateTaskError: If no task has this id or SQLAlchemy
                fails.
        """
        await self._update_task_by_id(
            task_id,
            {"retry_count": 0},
            not_found_message="Task not found or retry count not reset",
            failure_message="Failed to reset retry count",
        )

    async def reset_all_retry_counts(self) -> int:
        """Set ``retry_count`` to 0 for ALL tasks and commit.

        Returns:
            The row count reported for the UPDATE statement.

        Raises:
            RepositoryUpdateTaskError: If SQLAlchemy fails; the session is
                rolled back before the error is raised.
        """
        try:
            result = await self.session.execute(update(ScheduledTask).values(retry_count=0))
            await self.session.commit()
        except SQLAlchemyError as e:
            await self.session.rollback()
            raise RepositoryUpdateTaskError(message="Mass reset failed") from e
        else:
            return result.rowcount