# Ingestion Pipeline

Document parsing, chunking, and embedding via Celery tasks.

The ingestion pipeline runs as Celery tasks, processing documents asynchronously after upload or URL submission.
## File Ingestion

Task: `documents.process_document`

```python
@shared_task(
    max_retries=1,
    soft_time_limit=300,  # 5 minutes
    time_limit=360,       # 6 minutes hard limit
    acks_late=True,
    reject_on_worker_lost=True,
)
def process_document(document_id, processing_task_id=None):
    ...
```

### Pipeline Steps
```
1. Download from S3 → temp directory
       │
       ▼
2. Parse document
       ├─ Docling (primary)  → structured output with sections, tables, headings
       └─ PyMuPDF (fallback) → plain page-by-page text extraction
       │
       ▼
3. Chunk text
       ├─ Docling  → HierarchicalChunker (max 512 tokens, respects section boundaries)
       └─ PyMuPDF  → RecursiveCharacterTextSplitter (2048 chars, 100 overlap)
       │
       ▼
4. Embed chunks → OpenAI text-embedding-3-small (1536 dimensions)
       │
       ▼
5. Bulk insert DocumentChunk rows
       │
       ▼
6. Update Document status → "ready"
```

### Parsing with Docling
Docling is the primary document parser, providing structured output with:

- Section headings and hierarchy
- Table extraction
- Page numbers and bounding boxes
- Element types (paragraph, heading, list, table)

```python
from docling.document_converter import DocumentConverter

converter = DocumentConverter()
result = converter.convert(file_path)  # Timeout: 120s
```

If Docling is unavailable or times out, the pipeline falls back to PyMuPDF for basic page-level text extraction.
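The primary/fallback dispatch can be illustrated with a minimal sketch. The function names below are hypothetical stand-ins for the actual pipeline code, which handles the Docling timeout and falls through to PyMuPDF:

```python
# Sketch of the parser fallback logic (names hypothetical, not the
# actual task code): try Docling first, degrade to PyMuPDF on failure.
def parse_document(file_path, docling_parse, pymupdf_parse):
    """Return (parsed_text, parser_used)."""
    try:
        return docling_parse(file_path), "docling"
    except Exception:
        # Docling unavailable, crashed, or timed out:
        # fall back to plain page-level text extraction.
        return pymupdf_parse(file_path), "pymupdf"
```

Which branch ran is recorded so the chunking step can pick the matching strategy (hierarchical vs. character-based).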
### Chunking Strategy

With Docling (`HierarchicalChunker`):

- Respects document structure: never splits mid-table or mid-section
- Max 512 tokens per chunk
- Preserves section headings and page numbers in metadata

With PyMuPDF (`RecursiveCharacterTextSplitter`):

- Character-based splitting with 100-character overlap
- Chunk size: 2048 characters
- Page numbers tracked per chunk
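The character-based fallback can be pictured with a simplified sketch. This is not the LangChain implementation (which also prefers to break on separators such as paragraphs and sentences); it only shows the window/overlap arithmetic:

```python
# Simplified fixed-window chunking with overlap: each chunk re-includes
# the last `overlap` characters of the previous one, so context spanning
# a boundary appears in both chunks.
def split_with_overlap(text, chunk_size=2048, overlap=100):
    chunks = []
    start = 0
    while start < len(text):
        chunks.append(text[start:start + chunk_size])
        if start + chunk_size >= len(text):
            break
        start += chunk_size - overlap
    return chunks
```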
Each chunk produces:

```json
{
  "text": "chunk content...",
  "page_number": 3,
  "section_heading": "Installation Guide",
  "metadata": {"element_type": "paragraph", ...},
  "chunk_index": 0
}
```

### Embedding
Chunks are embedded in batches of 512 using OpenAI's API:

```python
from langchain_openai import OpenAIEmbeddings

embeddings = OpenAIEmbeddings(
    model="text-embedding-3-small",
    dimensions=1536,
)
vectors = embeddings.embed_documents(texts)  # batched
```

## URL Ingestion
Task: `documents.process_url_document`

```python
@shared_task(
    max_retries=1,
    soft_time_limit=120,  # 2 minutes
    time_limit=180,       # 3 minutes hard limit
)
def process_url_document(document_id, processing_task_id=None):
    ...
```

### Web Pages
Uses Trafilatura for article extraction:

```python
from trafilatura import fetch_url, extract

downloaded = fetch_url(url)
text = extract(downloaded, include_tables=True)
```

Falls back to HTML stripping if Trafilatura yields nothing. Text is chunked with `RecursiveCharacterTextSplitter`.
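The HTML-stripping fallback could be as simple as collecting text nodes while skipping script and style content. A stdlib sketch (the pipeline's actual implementation may differ):

```python
from html.parser import HTMLParser

# Minimal HTML-stripping fallback: keep text nodes, drop <script>/<style>.
class TextExtractor(HTMLParser):
    def __init__(self):
        super().__init__()
        self.parts = []
        self._skip_depth = 0  # inside script/style when > 0

    def handle_starttag(self, tag, attrs):
        if tag in ("script", "style"):
            self._skip_depth += 1

    def handle_endtag(self, tag):
        if tag in ("script", "style") and self._skip_depth:
            self._skip_depth -= 1

    def handle_data(self, data):
        if not self._skip_depth and data.strip():
            self.parts.append(data.strip())

def strip_html(html):
    parser = TextExtractor()
    parser.feed(html)
    return " ".join(parser.parts)
```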
### YouTube Videos

Extracts transcripts using `youtube-transcript-api`:

```python
from youtube_transcript_api import YouTubeTranscriptApi

transcript = YouTubeTranscriptApi.get_transcript(video_id)
# Returns: [{"start": 0.0, "text": "Hello...", "duration": 2.5}, ...]
```

Transcripts are formatted with timestamps (`[MM:SS] text`) and chunked by token limit while preserving timestamp boundaries. The video title is fetched via the oEmbed endpoint (no API key required).
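The timestamp formatting step can be sketched directly from the transcript entry shape shown above (the function name is hypothetical):

```python
# Format transcript entries as "[MM:SS] text" lines, deriving the
# timestamp from each entry's start offset in seconds.
def format_transcript(entries):
    lines = []
    for entry in entries:
        minutes, seconds = divmod(int(entry["start"]), 60)
        lines.append(f"[{minutes:02d}:{seconds:02d}] {entry['text']}")
    return "\n".join(lines)
```

Chunking then splits on these line boundaries, so a timestamp is never separated from its text.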
## Error Handling

| Scenario | Behavior |
|---|---|
| Parse failure | Retry once after 30s, then mark as failed |
| Embedding API error | Retry once after 30s, then mark as failed |
| Worker lost mid-task | Task is rejected and requeued (`reject_on_worker_lost`) |
| Soft time limit | `SoftTimeLimitExceeded` raised, cleanup runs |
| Hard time limit | Worker kills the task after 6 minutes |
Failed documents store the error in `Document.processing_error` (truncated to 5000 chars) and can be retried via the `/reprocess/` endpoint.
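The retry-once-then-fail behavior can be modeled as a pure-Python sketch. This is not the Celery task itself (Celery handles the delay via `self.retry(countdown=30)`); it only shows the control flow and the error truncation:

```python
# Illustrative retry-once pattern: one retry after a delay, then record
# a truncated error message, mirroring Document.processing_error.
def run_with_retry(step, mark_failed, retries=1, delay=30, sleep=lambda s: None):
    attempts = 0
    while True:
        try:
            return step()
        except Exception as exc:
            attempts += 1
            if attempts > retries:
                mark_failed(str(exc)[:5000])  # truncate stored error
                return None
            sleep(delay)  # Celery schedules this via retry(countdown=30)
```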
## Task Tracking

Each ingestion run creates a `DocumentProcessingTask` record:

```python
class DocumentProcessingTask(BaseModel):
    document = models.ForeignKey(Document)
    task_id = models.CharField(unique=True)  # Celery task ID
    status = models.CharField()              # PENDING, STARTED, SUCCESS, FAILURE, RETRY, REVOKED
    error_message = models.TextField(blank=True)
```

The frontend polls `/processing-status/` to track progress without querying Celery directly.
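A client-side polling loop against those statuses might look like the following sketch. The fetch function is injected here for clarity; a real client would call the `/processing-status/` endpoint and read the status field:

```python
# Poll until the task reaches a terminal Celery state.
TERMINAL_STATES = {"SUCCESS", "FAILURE", "REVOKED"}

def poll_until_done(fetch_status, max_polls=60):
    for _ in range(max_polls):
        status = fetch_status()  # e.g. GET /processing-status/ per poll
        if status in TERMINAL_STATES:
            return status
    return "TIMEOUT"
```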