Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ingestion status callback update #142

Open
wants to merge 11 commits into
base: main
Choose a base branch
from
2 changes: 0 additions & 2 deletions app/domain/data/lecture_unit_dto.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@


class LectureUnitDTO(BaseModel):
to_update: bool = Field(alias="toUpdate")
base_url: str = Field(alias="artemisBaseUrl")
pdf_file_base64: str = Field(default="", alias="pdfFile")
lecture_unit_id: int = Field(alias="lectureUnitId")
lecture_unit_name: str = Field(default="", alias="lectureUnitName")
Expand Down
15 changes: 15 additions & 0 deletions app/domain/ingestion/deletionPipelineExecutionDto.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
from typing import List, Optional

from pydantic import Field

from app.domain import PipelineExecutionDTO, PipelineExecutionSettingsDTO
from app.domain.data.lecture_unit_dto import LectureUnitDTO
from app.domain.status.stage_dto import StageDTO


class LecturesDeletionExecutionDto(PipelineExecutionDTO):
lecture_units: List[LectureUnitDTO] = Field(..., alias="pyrisLectureUnits")
settings: Optional[PipelineExecutionSettingsDTO]
initial_stages: Optional[List[StageDTO]] = Field(
default=None, alias="initialStages"
)
6 changes: 4 additions & 2 deletions app/domain/ingestion/ingestion_pipeline_execution_dto.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@


class IngestionPipelineExecutionDto(PipelineExecutionDTO):
lecture_units: List[LectureUnitDTO] = Field(
..., alias="pyrisLectureUnitWebhookDTOS"
lecture_unit: LectureUnitDTO = Field(..., alias="pyrisLectureUnit")
settings: Optional[PipelineExecutionSettingsDTO]
initial_stages: Optional[List[StageDTO]] = Field(
default=None, alias="initialStages"
Comment on lines +11 to +14
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add missing imports and address type annotations.

The class IngestionPipelineExecutionDto has been updated with new attributes that require additional imports to resolve the issues flagged by static analysis tools:

  1. The Optional type is used but not imported from typing.
  2. PipelineExecutionSettingsDTO and StageDTO are referenced but not defined or imported.

To resolve these issues, add the necessary imports and definitions:

+from typing import Optional
+from app.domain.settings import PipelineExecutionSettingsDTO
+from app.domain.stages import StageDTO

These changes will ensure that the code is syntactically correct and that the references to other DTOs are properly resolved.

Committable suggestion was skipped due to low confidence.

Tools
Ruff

11-11: Undefined name Optional

(F821)


11-11: Undefined name PipelineExecutionSettingsDTO

(F821)


12-12: Undefined name Optional

(F821)


12-12: Undefined name StageDTO

(F821)

)
1 change: 1 addition & 0 deletions app/domain/ingestion/ingestion_status_update_dto.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,4 @@

class IngestionStatusUpdateDTO(StatusUpdateDTO):
result: Optional[str] = None
id: Optional[int] = None
2 changes: 2 additions & 0 deletions app/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from app.web.routers.health import router as health_router
from app.web.routers.pipelines import router as pipelines_router
from app.web.routers.webhooks import router as webhooks_router
from app.web.routers.ingestion_status import router as ingestion_status_router

import logging
from fastapi import FastAPI, Request, status
Expand Down Expand Up @@ -57,3 +58,4 @@ async def some_middleware(request: Request, call_next):
app.include_router(health_router)
app.include_router(pipelines_router)
app.include_router(webhooks_router)
app.include_router(ingestion_status_router)
51 changes: 28 additions & 23 deletions app/pipeline/lecture_ingestion_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
import tempfile
import threading
from asyncio.log import logger
from typing import Optional

import fitz
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
Expand All @@ -27,9 +29,10 @@
CapabilityRequestHandler,
RequirementList,
)
from ..web.status import IngestionStatusCallback
from langchain_text_splitters import RecursiveCharacterTextSplitter

from ..web.status import ingestion_status_callback

batch_update_lock = threading.Lock()


Expand Down Expand Up @@ -90,8 +93,8 @@ class LectureIngestionPipeline(AbstractIngestion, Pipeline):
def __init__(
self,
client: WeaviateClient,
dto: IngestionPipelineExecutionDto,
callback: IngestionStatusCallback,
dto: Optional[IngestionPipelineExecutionDto],
callback: ingestion_status_callback,
):
super().__init__()
self.collection = init_lecture_schema(client)
Expand All @@ -116,33 +119,31 @@ def __init__(
def __call__(self) -> bool:
try:
self.callback.in_progress("Deleting old slides from database...")
self.delete_old_lectures()
self.delete_lecture_unit(
self.dto.lecture_unit.course_id,
self.dto.lecture_unit.lecture_id,
self.dto.lecture_unit.lecture_unit_id,
self.dto.settings.artemis_base_url,
)
Comment on lines +122 to +127
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Streamlined __call__ method approved.

The method has been effectively simplified to focus on a single lecture_unit, which aligns with the updated constructor and enhances clarity. The integration of the new callback mechanism is well-handled.

Consider adding error handling for potential None values in dto to prevent runtime errors.

if self.dto is not None:
    # existing code
else:
    logger.error("DTO is None, cannot proceed with ingestion.")
    return False

Also applies to: 131-139

self.callback.done("Old slides removed")
# Here we check if the operation is for updating or for deleting,
# we only check the first file because all the files will have the same operation
if not self.dto.lecture_units[0].to_update:
self.callback.skip("Lecture Chunking and interpretation Skipped")
self.callback.skip("No new slides to update")
return True
self.callback.in_progress("Chunking and interpreting lecture...")
chunks = []
for i, lecture_unit in enumerate(self.dto.lecture_units):
pdf_path = save_pdf(lecture_unit.pdf_file_base64)
chunks.extend(
self.chunk_data(
lecture_pdf=pdf_path,
lecture_unit_dto=lecture_unit,
base_url=self.dto.settings.artemis_base_url,
)
pdf_path = save_pdf(self.dto.lecture_unit.pdf_file_base64)
chunks.extend(
self.chunk_data(
lecture_pdf=pdf_path,
lecture_unit_dto=self.dto.lecture_unit,
base_url=self.dto.settings.artemis_base_url,
)
cleanup_temporary_file(pdf_path)
)
cleanup_temporary_file(pdf_path)
self.callback.done("Lecture Chunking and interpretation Finished")
self.callback.in_progress("Ingesting lecture chunks into database...")
self.batch_update(chunks)
self.callback.done("Lecture Ingestion Finished")
logger.info(
f"Lecture ingestion pipeline finished Successfully for course "
f"{self.dto.lecture_units[0].course_name}"
f"{self.dto.lecture_unit.course_name}"
)
return True
except Exception as e:
Expand Down Expand Up @@ -294,23 +295,27 @@ def get_course_language(self, page_content: str) -> str:
)
return response.contents[0].text_content

def delete_old_lectures(self):
def delete_old_lectures(
self, lecture_units: list[LectureUnitDTO], artemis_base_url: str
):
"""
Delete the lecture unit from the database
"""
try:
for lecture_unit in self.dto.lecture_units:
for lecture_unit in lecture_units:
if self.delete_lecture_unit(
lecture_unit.course_id,
lecture_unit.lecture_id,
lecture_unit.lecture_unit_id,
self.dto.settings.artemis_base_url,
artemis_base_url,
):
logger.info("Lecture deleted successfully")
else:
logger.error("Failed to delete lecture")
self.callback.done("Old slides removed")
except Exception as e:
logger.error(f"Error deleting lecture unit: {e}")
self.callback.error("Error while removing old slides")
return False

def delete_lecture_unit(self, course_id, lecture_id, lecture_unit_id, base_url):
Expand Down
46 changes: 46 additions & 0 deletions app/web/routers/ingestion_status.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
from urllib.parse import unquote

from fastapi import APIRouter, status, Response, Depends
from weaviate.collections.classes.filters import Filter

from app.dependencies import TokenValidator
from ...vector_database.database import VectorDatabase
from ...vector_database.lecture_schema import LectureSchema

router = APIRouter(prefix="/api/v1", tags=["ingestion_status"])


@router.get(
"/courses/{course_id}/lectures/{lecture_id}/lectureUnits/{lecture_unit_id}/ingestion-state",
dependencies=[Depends(TokenValidator())],
)
def get_lecture_unit_ingestion_state(
course_id: int, lecture_id: int, lecture_unit_id: int, baseUrl: str
):
db = VectorDatabase()
decoded_base_url = unquote(baseUrl)
result = db.lectures.query.fetch_objects(
filters=(
Filter.by_property(LectureSchema.BASE_URL.value).equal(decoded_base_url)
& Filter.by_property(LectureSchema.COURSE_ID.value).equal(course_id)
& Filter.by_property(LectureSchema.LECTURE_ID.value).equal(lecture_id)
& Filter.by_property(LectureSchema.LECTURE_UNIT_ID.value).equal(
lecture_unit_id
)
),
limit=1,
return_properties=[LectureSchema.LECTURE_UNIT_NAME.value],
)

if len(result.objects) > 0:
return Response(
status_code=status.HTTP_200_OK,
content='"DONE"',
media_type="application/json",
)
else:
return Response(
status_code=status.HTTP_200_OK,
content='"NOT_STARTED"',
media_type="application/json",
)
42 changes: 39 additions & 3 deletions app/web/routers/webhooks.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,16 @@
from app.domain.ingestion.ingestion_pipeline_execution_dto import (
IngestionPipelineExecutionDto,
)
from ..status.IngestionStatusCallback import IngestionStatusCallback
from ..status.ingestion_status_callback import IngestionStatusCallback
from ..status.lecture_deletion_status_callback import LecturesDeletionStatusCallback
from ...domain.ingestion.deletionPipelineExecutionDto import (
LecturesDeletionExecutionDto,
)
from ...pipeline.lecture_ingestion_pipeline import LectureIngestionPipeline
from ...vector_database.database import VectorDatabase

router = APIRouter(prefix="/api/v1/webhooks", tags=["webhooks"])


semaphore = Semaphore(5)


Expand All @@ -29,6 +32,7 @@ def run_lecture_update_pipeline_worker(dto: IngestionPipelineExecutionDto):
run_id=dto.settings.authentication_token,
base_url=dto.settings.artemis_base_url,
initial_stages=dto.initial_stages,
lecture_unit_id=dto.lecture_unit.lecture_unit_id,
)
db = VectorDatabase()
client = db.get_client()
Expand All @@ -44,14 +48,46 @@ def run_lecture_update_pipeline_worker(dto: IngestionPipelineExecutionDto):
semaphore.release()


def run_lecture_deletion_pipeline_worker(dto: LecturesDeletionExecutionDto):
"""
Run the exercise chat pipeline in a separate thread
"""
try:
callback = LecturesDeletionStatusCallback(
run_id=dto.settings.authentication_token,
base_url=dto.settings.artemis_base_url,
initial_stages=dto.initial_stages,
)
db = VectorDatabase()
client = db.get_client()
pipeline = LectureIngestionPipeline(client=client, dto=None, callback=callback)
pipeline.delete_old_lectures(dto.lecture_units)
except Exception as e:
logger.error(f"Error while deleting lectures: {e}")
logger.error(traceback.format_exc())


@router.post(
"/lectures/fullIngestion",
status_code=status.HTTP_202_ACCEPTED,
dependencies=[Depends(TokenValidator())],
)
def lecture_webhook(dto: IngestionPipelineExecutionDto):
def lecture_ingestion_webhook(dto: IngestionPipelineExecutionDto):
"""
Webhook endpoint to trigger the exercise chat pipeline
"""
thread = Thread(target=run_lecture_update_pipeline_worker, args=(dto,))
thread.start()


@router.post(
"/lectures/delete",
status_code=status.HTTP_202_ACCEPTED,
dependencies=[Depends(TokenValidator())],
)
def lecture_deletion_webhook(dto: LecturesDeletionExecutionDto):
"""
Webhook endpoint to trigger the exercise chat pipeline
"""
thread = Thread(target=run_lecture_deletion_pipeline_worker, args=(dto,))
thread.start()
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,15 @@

class IngestionStatusCallback(StatusCallback):
"""
Callback class for updating the status of a Tutor Chat pipeline run.
Callback class for updating the status of a Lecture ingestion Pipeline run.
"""

def __init__(
self, run_id: str, base_url: str, initial_stages: List[StageDTO] = None
self,
run_id: str,
base_url: str,
initial_stages: List[StageDTO] = None,
lecture_unit_id: int = None,
):
url = f"{base_url}/api/public/pyris/webhooks/ingestion/runs/{run_id}/status"

Expand All @@ -36,6 +40,6 @@ def __init__(
name="Slides ingestion",
),
]
status = IngestionStatusUpdateDTO(stages=stages)
status = IngestionStatusUpdateDTO(stages=stages, id=lecture_unit_id)
stage = stages[current_stage_index]
super().__init__(url, run_id, status, stage, current_stage_index)
31 changes: 31 additions & 0 deletions app/web/status/lecture_deletion_status_callback.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
from typing import List

from .status_update import StatusCallback
from ...domain.ingestion.ingestion_status_update_dto import IngestionStatusUpdateDTO
from ...domain.status.stage_state_dto import StageStateEnum
from ...domain.status.stage_dto import StageDTO
import logging

logger = logging.getLogger(__name__)


class LecturesDeletionStatusCallback(StatusCallback):
"""
Callback class for updating the status of a Tutor Chat pipeline run.
"""

def __init__(
self, run_id: str, base_url: str, initial_stages: List[StageDTO] = None
):
url = f"{base_url}/api/public/pyris/webhooks/ingestion/runs/{run_id}/status"

current_stage_index = len(initial_stages) if initial_stages else 0
stages = initial_stages or []
stages += [
StageDTO(
weight=100, state=StageStateEnum.NOT_STARTED, name="Slides removal"
),
]
status = IngestionStatusUpdateDTO(stages=stages)
stage = stages[current_stage_index]
super().__init__(url, run_id, status, stage, current_stage_index)
Loading