
Ingestion Pipeline

Document parsing, chunking, and embedding via Celery tasks.

The ingestion pipeline runs as Celery tasks, processing documents asynchronously after upload or URL submission.

File Ingestion

Task: documents.process_document

from celery import shared_task

@shared_task(
    max_retries=1,
    soft_time_limit=300,   # 5 minutes
    time_limit=360,        # 6 minutes hard limit
    acks_late=True,
    reject_on_worker_lost=True,
)
def process_document(document_id, processing_task_id=None):
    ...

Pipeline Steps

1. Download from S3 → temp directory

2. Parse document
    ├─ Docling (primary) → structured output with sections, tables, headings
    └─ PyMuPDF (fallback) → plain page-by-page text extraction

3. Chunk text
    ├─ Docling → HierarchicalChunker (max 512 tokens, respects section boundaries)
    └─ PyMuPDF → RecursiveCharacterTextSplitter (2048 chars, 100 overlap)

4. Embed chunks → OpenAI text-embedding-3-small (1536 dimensions)

5. Bulk insert DocumentChunk rows

6. Update Document status → "ready"
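The steps above can be sketched as a single sequential pipeline. This is an illustrative sketch only: the stage helpers here are hypothetical lambdas standing in for the real S3, parser, and embedding code.

```python
# Minimal sketch of the pipeline's control flow; each stage reads the
# results of earlier stages and writes its own into the shared state.

def run_pipeline(document_id, stages):
    """Run each named stage in order, then return the final status."""
    state = {"document_id": document_id}
    for name, stage in stages:
        state[name] = stage(state)
    return "ready"  # step 6: Document status on success

# Hypothetical stand-ins for the real stages.
stages = [
    ("local_path", lambda s: f"/tmp/{s['document_id']}.pdf"),        # 1. download
    ("pages",      lambda s: ["page one text", "page two text"]),    # 2. parse
    ("chunks",     lambda s: [p[:2048] for p in s["pages"]]),        # 3. chunk
    ("vectors",    lambda s: [[0.0] * 1536 for _ in s["chunks"]]),   # 4. embed
    ("inserted",   lambda s: len(s["chunks"])),                      # 5. bulk insert
]

status = run_pipeline(42, stages)
```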

Parsing with Docling

Docling is the primary document parser, providing structured output with:

  • Section headings and hierarchy
  • Table extraction
  • Page numbers and bounding boxes
  • Element types (paragraph, heading, list, table)

from docling.document_converter import DocumentConverter

converter = DocumentConverter()
result = converter.convert(file_path)  # Timeout: 120s

If Docling is unavailable or times out, the pipeline falls back to PyMuPDF for basic page-level text extraction.
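The fallback decision can be expressed as a simple try/except wrapper. A hedged sketch: parse_docling and parse_pymupdf are hypothetical callables, not the pipeline's actual functions, and a Docling timeout is assumed to surface as an exception.

```python
# Prefer structured Docling output; fall back to plain text on any failure.

def parse_with_fallback(file_path, parse_docling, parse_pymupdf):
    """Return (parsed_output, parser_name), trying Docling first."""
    try:
        return parse_docling(file_path), "docling"
    except Exception:
        return parse_pymupdf(file_path), "pymupdf"

def broken_docling(path):
    # Simulate an unavailable or timed-out Docling.
    raise TimeoutError("Docling exceeded 120s")

text, parser = parse_with_fallback("doc.pdf", broken_docling, lambda p: "plain text")
```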

Chunking Strategy

With Docling (HierarchicalChunker):

  • Respects document structure — never splits mid-table or mid-section
  • Max 512 tokens per chunk
  • Preserves section headings and page numbers in metadata

With PyMuPDF (RecursiveCharacterTextSplitter):

  • Character-based splitting with 100-character overlap
  • Chunk size: 2048 characters
  • Page numbers tracked per chunk
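The PyMuPDF path's splitting behavior can be illustrated with a simplified character splitter (a stand-in only; the real RecursiveCharacterTextSplitter also prefers paragraph and sentence boundaries before falling back to raw characters):

```python
# Slice text into fixed-size windows where adjacent chunks share
# `overlap` characters, mirroring the 2048/100 settings above.

def split_text(text, chunk_size=2048, overlap=100):
    step = chunk_size - overlap
    return [text[i:i + chunk_size] for i in range(0, len(text), step)]

chunks = split_text("a" * 5000)
```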

Each chunk produces:

{
    "text": "chunk content...",
    "page_number": 3,
    "section_heading": "Installation Guide",
    "metadata": {"element_type": "paragraph", ...},
    "chunk_index": 0
}

Embedding

Chunks are embedded in batches of 512 using OpenAI's API:

from langchain_openai import OpenAIEmbeddings

embeddings = OpenAIEmbeddings(
    model="text-embedding-3-small",
    dimensions=1536
)
vectors = embeddings.embed_documents(texts)  # Batched
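The batching itself is plain slicing: one embed_documents call per group of up to 512 chunks. A sketch with the embedding call stubbed out:

```python
# Yield consecutive slices of at most `size` items; each slice would be
# passed to embeddings.embed_documents as one API batch.

def batches(items, size=512):
    for i in range(0, len(items), size):
        yield items[i:i + size]

texts = [f"chunk {i}" for i in range(1100)]
batch_sizes = [len(b) for b in batches(texts)]
```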

URL Ingestion

Task: documents.process_url_document

from celery import shared_task

@shared_task(
    max_retries=1,
    soft_time_limit=120,   # 2 minutes
    time_limit=180,        # 3 minutes hard limit
)
def process_url_document(document_id, processing_task_id=None):
    ...

Web Pages

Uses Trafilatura for article extraction:

from trafilatura import fetch_url, extract

downloaded = fetch_url(url)
text = extract(downloaded, include_tables=True)

Falls back to HTML stripping if Trafilatura yields nothing. Text is chunked with RecursiveCharacterTextSplitter.

YouTube Videos

Extracts transcripts using youtube-transcript-api:

from youtube_transcript_api import YouTubeTranscriptApi

transcript = YouTubeTranscriptApi.get_transcript(video_id)
# Returns: [{"start": 0.0, "text": "Hello...", "duration": 2.5}, ...]

Transcripts are formatted with timestamps ([MM:SS] text) and chunked by token limit while preserving timestamp boundaries. Video title is fetched via the oEmbed endpoint (no API key required).
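The [MM:SS] formatting step can be sketched directly against the entry shape get_transcript returns (a hedged illustration, not the task's actual formatter):

```python
# Convert each transcript entry's start offset (seconds) into an
# [MM:SS] prefix and join the lines into one text blob for chunking.

def format_transcript(entries):
    lines = []
    for entry in entries:
        minutes, seconds = divmod(int(entry["start"]), 60)
        lines.append(f"[{minutes:02d}:{seconds:02d}] {entry['text']}")
    return "\n".join(lines)

formatted = format_transcript([
    {"start": 0.0, "text": "Hello and welcome", "duration": 2.5},
    {"start": 65.2, "text": "let's get started", "duration": 3.0},
])
```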

Error Handling

Scenario               Behavior
Parse failure          Retry once after 30s, then mark as failed
Embedding API error    Retry once after 30s, then mark as failed
Worker lost mid-task   Task is rejected and requeued (reject_on_worker_lost)
Soft time limit        SoftTimeLimitExceeded raised, cleanup runs
Hard time limit        Worker kills the task after 6 minutes

Failed documents store the error in Document.processing_error (truncated to 5000 chars) and can be retried via the /reprocess/ endpoint.
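The retry-once-then-fail behavior from the table can be sketched as plain Python. The real tasks use Celery's self.retry(countdown=30); here `sleep` is injectable so the sketch runs without waiting, and the error truncation mirrors the 5000-char limit mentioned above.

```python
import time

def run_with_retry(fn, retries=1, delay=30, sleep=time.sleep):
    """Run fn; on failure, retry up to `retries` times after `delay` seconds,
    then give up and return the (truncated) error message."""
    attempt = 0
    while True:
        try:
            return "ready", fn()
        except Exception as exc:
            attempt += 1
            if attempt > retries:
                return "failed", str(exc)[:5000]  # fits processing_error
            sleep(delay)

calls = []
def always_fails():
    calls.append(1)
    raise RuntimeError("parse failure")

status, error = run_with_retry(always_fails, sleep=lambda d: None)
```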

Task Tracking

Each ingestion dispatches a DocumentProcessingTask record:

class DocumentProcessingTask(BaseModel):
    document = models.ForeignKey(Document)
    task_id = models.CharField(unique=True)  # Celery task ID
    status = models.CharField()  # PENDING, STARTED, SUCCESS, FAILURE, RETRY, REVOKED
    error_message = models.TextField(blank=True)

The frontend polls /processing-status/ to track progress without querying Celery directly.
