-
Notifications
You must be signed in to change notification settings - Fork 0
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Ingestion status callback update #142
base: main
Are you sure you want to change the base?
Changes from 10 commits
7eef6d0
b276050
377a8d3
537375f
a9f6ec1
201b4b4
2a11f63
854adeb
21980f5
9881684
14cfedc
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,15 @@ | ||
from typing import List, Optional | ||
|
||
from pydantic import Field | ||
|
||
from app.domain import PipelineExecutionDTO, PipelineExecutionSettingsDTO | ||
from app.domain.data.lecture_unit_dto import LectureUnitDTO | ||
from app.domain.status.stage_dto import StageDTO | ||
|
||
|
||
class LecturesDeletionExecutionDto(PipelineExecutionDTO): | ||
lecture_units: List[LectureUnitDTO] = Field(..., alias="pyrisLectureUnits") | ||
settings: Optional[PipelineExecutionSettingsDTO] | ||
initial_stages: Optional[List[StageDTO]] = Field( | ||
default=None, alias="initialStages" | ||
) |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -5,3 +5,4 @@ | |
|
||
class IngestionStatusUpdateDTO(StatusUpdateDTO): | ||
result: Optional[str] = None | ||
id: Optional[int] = None |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -3,6 +3,8 @@ | |
import tempfile | ||
import threading | ||
from asyncio.log import logger | ||
from typing import Optional | ||
|
||
import fitz | ||
from langchain_core.output_parsers import StrOutputParser | ||
from langchain_core.prompts import ChatPromptTemplate | ||
|
@@ -27,9 +29,10 @@ | |
CapabilityRequestHandler, | ||
RequirementList, | ||
) | ||
from ..web.status import IngestionStatusCallback | ||
from langchain_text_splitters import RecursiveCharacterTextSplitter | ||
|
||
from ..web.status import ingestion_status_callback | ||
|
||
batch_update_lock = threading.Lock() | ||
|
||
|
||
|
@@ -90,8 +93,8 @@ class LectureIngestionPipeline(AbstractIngestion, Pipeline): | |
def __init__( | ||
self, | ||
client: WeaviateClient, | ||
dto: IngestionPipelineExecutionDto, | ||
callback: IngestionStatusCallback, | ||
dto: Optional[IngestionPipelineExecutionDto], | ||
callback: ingestion_status_callback, | ||
): | ||
super().__init__() | ||
self.collection = init_lecture_schema(client) | ||
|
@@ -116,33 +119,31 @@ def __init__( | |
def __call__(self) -> bool: | ||
try: | ||
self.callback.in_progress("Deleting old slides from database...") | ||
self.delete_old_lectures() | ||
self.delete_lecture_unit( | ||
self.dto.lecture_unit.course_id, | ||
self.dto.lecture_unit.lecture_id, | ||
self.dto.lecture_unit.lecture_unit_id, | ||
self.dto.settings.artemis_base_url, | ||
) | ||
Comment on lines
+122
to
+127
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Streamlined The method has been effectively simplified to focus on a single Consider adding error handling for potential if self.dto is not None:
# existing code
else:
logger.error("DTO is None, cannot proceed with ingestion.")
return False Also applies to: 131-139 |
||
self.callback.done("Old slides removed") | ||
# Here we check if the operation is for updating or for deleting, | ||
# we only check the first file because all the files will have the same operation | ||
if not self.dto.lecture_units[0].to_update: | ||
self.callback.skip("Lecture Chunking and interpretation Skipped") | ||
self.callback.skip("No new slides to update") | ||
return True | ||
self.callback.in_progress("Chunking and interpreting lecture...") | ||
chunks = [] | ||
for i, lecture_unit in enumerate(self.dto.lecture_units): | ||
pdf_path = save_pdf(lecture_unit.pdf_file_base64) | ||
chunks.extend( | ||
self.chunk_data( | ||
lecture_pdf=pdf_path, | ||
lecture_unit_dto=lecture_unit, | ||
base_url=self.dto.settings.artemis_base_url, | ||
) | ||
pdf_path = save_pdf(self.dto.lecture_unit.pdf_file_base64) | ||
chunks.extend( | ||
self.chunk_data( | ||
lecture_pdf=pdf_path, | ||
lecture_unit_dto=self.dto.lecture_unit, | ||
base_url=self.dto.settings.artemis_base_url, | ||
) | ||
cleanup_temporary_file(pdf_path) | ||
) | ||
cleanup_temporary_file(pdf_path) | ||
self.callback.done("Lecture Chunking and interpretation Finished") | ||
self.callback.in_progress("Ingesting lecture chunks into database...") | ||
self.batch_update(chunks) | ||
self.callback.done("Lecture Ingestion Finished") | ||
logger.info( | ||
f"Lecture ingestion pipeline finished Successfully for course " | ||
f"{self.dto.lecture_units[0].course_name}" | ||
f"{self.dto.lecture_unit.course_name}" | ||
) | ||
return True | ||
except Exception as e: | ||
|
@@ -294,23 +295,27 @@ def get_course_language(self, page_content: str) -> str: | |
) | ||
return response.contents[0].text_content | ||
|
||
def delete_old_lectures(self): | ||
def delete_old_lectures( | ||
self, lecture_units: list[LectureUnitDTO], artemis_base_url: str | ||
): | ||
""" | ||
Delete the lecture unit from the database | ||
""" | ||
try: | ||
for lecture_unit in self.dto.lecture_units: | ||
for lecture_unit in lecture_units: | ||
if self.delete_lecture_unit( | ||
lecture_unit.course_id, | ||
lecture_unit.lecture_id, | ||
lecture_unit.lecture_unit_id, | ||
self.dto.settings.artemis_base_url, | ||
artemis_base_url, | ||
): | ||
logger.info("Lecture deleted successfully") | ||
else: | ||
logger.error("Failed to delete lecture") | ||
self.callback.done("Old slides removed") | ||
except Exception as e: | ||
logger.error(f"Error deleting lecture unit: {e}") | ||
self.callback.error("Error while removing old slides") | ||
return False | ||
|
||
def delete_lecture_unit(self, course_id, lecture_id, lecture_unit_id, base_url): | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,46 @@ | ||
from urllib.parse import unquote | ||
|
||
from fastapi import APIRouter, status, Response, Depends | ||
from weaviate.collections.classes.filters import Filter | ||
|
||
from app.dependencies import TokenValidator | ||
from ...vector_database.database import VectorDatabase | ||
from ...vector_database.lecture_schema import LectureSchema | ||
|
||
router = APIRouter(prefix="/api/v1", tags=["ingestion_status"]) | ||
|
||
|
||
@router.get( | ||
"/courses/{course_id}/lectures/{lecture_id}/lectureUnits/{lecture_unit_id}/ingestion-state", | ||
dependencies=[Depends(TokenValidator())], | ||
) | ||
def get_lecture_unit_ingestion_state( | ||
course_id: int, lecture_id: int, lecture_unit_id: int, baseUrl: str | ||
): | ||
db = VectorDatabase() | ||
decoded_base_url = unquote(baseUrl) | ||
result = db.lectures.query.fetch_objects( | ||
filters=( | ||
Filter.by_property(LectureSchema.BASE_URL.value).equal(decoded_base_url) | ||
& Filter.by_property(LectureSchema.COURSE_ID.value).equal(course_id) | ||
& Filter.by_property(LectureSchema.LECTURE_ID.value).equal(lecture_id) | ||
& Filter.by_property(LectureSchema.LECTURE_UNIT_ID.value).equal( | ||
lecture_unit_id | ||
) | ||
), | ||
limit=1, | ||
return_properties=[LectureSchema.LECTURE_UNIT_NAME.value], | ||
) | ||
|
||
if len(result.objects) > 0: | ||
return Response( | ||
status_code=status.HTTP_200_OK, | ||
content='"DONE"', | ||
media_type="application/json", | ||
) | ||
else: | ||
return Response( | ||
status_code=status.HTTP_200_OK, | ||
content='"NOT_STARTED"', | ||
media_type="application/json", | ||
) |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,31 @@ | ||
from typing import List | ||
|
||
from .status_update import StatusCallback | ||
from ...domain.ingestion.ingestion_status_update_dto import IngestionStatusUpdateDTO | ||
from ...domain.status.stage_state_dto import StageStateEnum | ||
from ...domain.status.stage_dto import StageDTO | ||
import logging | ||
|
||
logger = logging.getLogger(__name__) | ||
|
||
|
||
class LecturesDeletionStatusCallback(StatusCallback): | ||
""" | ||
Callback class for updating the status of a Tutor Chat pipeline run. | ||
""" | ||
|
||
def __init__( | ||
self, run_id: str, base_url: str, initial_stages: List[StageDTO] = None | ||
): | ||
url = f"{base_url}/api/public/pyris/webhooks/ingestion/runs/{run_id}/status" | ||
|
||
current_stage_index = len(initial_stages) if initial_stages else 0 | ||
stages = initial_stages or [] | ||
stages += [ | ||
StageDTO( | ||
weight=100, state=StageStateEnum.NOT_STARTED, name="Slides removal" | ||
), | ||
] | ||
status = IngestionStatusUpdateDTO(stages=stages) | ||
stage = stages[current_stage_index] | ||
super().__init__(url, run_id, status, stage, current_stage_index) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add missing imports and address type annotations.
The class
IngestionPipelineExecutionDto
has been updated with new attributes that require additional imports to resolve the issues flagged by static analysis tools:Optional
type is used but not imported fromtyping
.PipelineExecutionSettingsDTO
andStageDTO
are referenced but not defined or imported.To resolve these issues, add the necessary imports and definitions:
These changes will ensure that the code is syntactically correct and that the references to other DTOs are properly resolved.
Tools
Ruff