Coverage for dormatory/api/routes/types.py: 100%

82 statements  

« prev     ^ index     » next       coverage.py v7.10.1, created at 2025-08-04 00:22 +0000

1""" 

2Types API routes for DORMATORY. 

3 

4This module provides RESTful API endpoints for managing type entities. 

5""" 

6 

7from typing import List, Optional 

8from uuid import UUID, uuid4 

9 

10from fastapi import APIRouter, HTTPException, Depends 

11from pydantic import BaseModel 

12from sqlalchemy.orm import Session 

13 

14from dormatory.api.dependencies import get_db 

15from dormatory.models.dormatory_model import Type, Object 

16 

17router = APIRouter(tags=["types"]) 

18 

19 

20class TypeCreate(BaseModel): 

21 type_name: str 

22 

23 

24class TypeUpdate(BaseModel): 

25 type_name: Optional[str] = None 

26 

27 

28class TypeResponse(BaseModel): 

29 id: UUID 

30 type_name: str 

31 

32 class Config: 

33 from_attributes = True 

34 

35 

36class ObjectResponse(BaseModel): 

37 id: int 

38 name: str 

39 version: int 

40 type_id: UUID 

41 created_on: str 

42 created_by: str 

43 

44 class Config: 

45 from_attributes = True 

46 

47 

48@router.post("/", response_model=TypeResponse) 

49async def create_type(type_data: TypeCreate, db: Session = Depends(get_db)): 

50 """ 

51 Create a new type. 

52  

53 Args: 

54 type_data: Type creation data 

55 db: Database session 

56  

57 Returns: 

58 Created type data 

59 """ 

60 # Create the type 

61 db_type = Type(type_name=type_data.type_name) 

62 

63 db.add(db_type) 

64 db.commit() 

65 db.refresh(db_type) 

66 

67 return TypeResponse.from_orm(db_type) 

68 

69 

70@router.get("/{type_id}", response_model=TypeResponse) 

71async def get_type_by_id(type_id: UUID, db: Session = Depends(get_db)): 

72 """ 

73 Get a type by its ID. 

74  

75 Args: 

76 type_id: Type ID 

77 db: Database session 

78  

79 Returns: 

80 Type data 

81 """ 

82 db_type = db.query(Type).filter(Type.id == type_id).first() 

83 if not db_type: 

84 raise HTTPException(status_code=404, detail="Type not found") 

85 

86 return TypeResponse.from_orm(db_type) 

87 

88 

89@router.get("/", response_model=List[TypeResponse]) 

90async def get_all_types( 

91 skip: int = 0, 

92 limit: int = 100, 

93 type_name: Optional[str] = None, 

94 db: Session = Depends(get_db) 

95): 

96 """ 

97 Get all types with optional filtering. 

98  

99 Args: 

100 skip: Number of records to skip 

101 limit: Maximum number of records to return 

102 type_name: Filter by type name 

103 db: Database session 

104  

105 Returns: 

106 List of types 

107 """ 

108 query = db.query(Type) 

109 

110 # Apply filters 

111 if type_name: 

112 query = query.filter(Type.type_name.contains(type_name)) 

113 

114 # Apply pagination 

115 types = query.offset(skip).limit(limit).all() 

116 

117 return [TypeResponse.from_orm(type_obj) for type_obj in types] 

118 

119 

120@router.put("/{type_id}", response_model=TypeResponse) 

121async def update_type(type_id: UUID, type_data: TypeUpdate, db: Session = Depends(get_db)): 

122 """ 

123 Update an existing type. 

124  

125 Args: 

126 type_id: Type ID to update 

127 type_data: Updated type data 

128 db: Database session 

129  

130 Returns: 

131 Updated type data 

132 """ 

133 db_type = db.query(Type).filter(Type.id == type_id).first() 

134 if not db_type: 

135 raise HTTPException(status_code=404, detail="Type not found") 

136 

137 # Update fields if provided 

138 if type_data.type_name is not None: 

139 db_type.type_name = type_data.type_name 

140 

141 db.commit() 

142 db.refresh(db_type) 

143 

144 return TypeResponse.from_orm(db_type) 

145 

146 

147@router.delete("/{type_id}") 

148async def delete_type(type_id: UUID, db: Session = Depends(get_db)): 

149 """ 

150 Delete a type. 

151  

152 Args: 

153 type_id: Type ID to delete 

154 db: Database session 

155  

156 Returns: 

157 Success message 

158 """ 

159 db_type = db.query(Type).filter(Type.id == type_id).first() 

160 if not db_type: 

161 raise HTTPException(status_code=404, detail="Type not found") 

162 

163 db.delete(db_type) 

164 db.commit() 

165 

166 return {"message": "Type deleted successfully"} 

167 

168 

169@router.post("/bulk", response_model=List[TypeResponse]) 

170async def create_types_bulk(type_data: List[TypeCreate], db: Session = Depends(get_db)): 

171 """ 

172 Create multiple types in a single operation. 

173  

174 Args: 

175 type_data: List of type creation data 

176 db: Database session 

177  

178 Returns: 

179 List of created types 

180 """ 

181 created_types = [] 

182 

183 for item in type_data: 

184 # Create the type 

185 db_type = Type(type_name=item.type_name) 

186 

187 db.add(db_type) 

188 created_types.append(db_type) 

189 

190 db.commit() 

191 

192 # Refresh all types to get their IDs 

193 for type_obj in created_types: 

194 db.refresh(type_obj) 

195 

196 return [TypeResponse.from_orm(type_obj) for type_obj in created_types] 

197 

198 

199@router.get("/{type_id}/objects", response_model=List[ObjectResponse]) 

200async def get_objects_by_type(type_id: UUID, db: Session = Depends(get_db)): 

201 """ 

202 Get all objects of a specific type. 

203  

204 Args: 

205 type_id: Type ID 

206 db: Database session 

207  

208 Returns: 

209 List of objects of the specified type 

210 """ 

211 # First verify the type exists 

212 db_type = db.query(Type).filter(Type.id == type_id).first() 

213 if not db_type: 

214 raise HTTPException(status_code=404, detail="Type not found") 

215 

216 # Get all objects of this type 

217 objects = db.query(Object).filter(Object.type_id == type_id).all() 

218 

219 return [ObjectResponse.from_orm(obj) for obj in objects]