import path from "node:path";
import { parseDocument, isSupportedDocument } from "../parsers/parser-registry.js";
import { chunkDocument, codeChunkingPolicy, documentalChunkingPolicy } from "../process/chunking.js";
import type { EmbeddingProvider } from "../embeddings/provider.js";
import type { VectorStoreClient } from "../vectorstore/client.js";
import type { IngestResult, IngestSourceInput, IngestedChunk } from "../../shared/types/rag.js";
import {
  buildChunkId,
  buildDocumentId,
  buildQdrantPointId,
  buildSourceId,
  normalizeDocumentKey
} from "../../shared/utils/ids.js";
import { listFilesRecursively } from "../../shared/utils/files.js";
import { env } from "../../config/env.js";

/**
 * Ingests a source (a single file or a folder) into the vector store:
 * each supported file is parsed, chunked, embedded, and upserted as a set
 * of points carrying the chunk text and its metadata as payload.
 */
export class IngestService {
  constructor(
    private readonly embeddingProvider: EmbeddingProvider,
    private readonly vectorStore: VectorStoreClient
  ) {}

  /**
   * Runs the ingest pipeline for every supported file of `source`.
   *
   * Files are processed sequentially; each document's chunks are embedded in
   * one `embed` call and written in one `upsert` call. Documents whose parsed
   * content is blank, or that produce no chunks, are skipped and not counted
   * in `documentsProcessed`.
   *
   * @param source - Description of what to ingest. `sourceId` is derived from
   *   `sourceType` and `sourceRef` when not supplied.
   * @returns A summary with the number of supported files found, documents
   *   processed, and chunks stored, plus the configured collection name.
   * @throws Error when the embedding provider does not return exactly one
   *   embedding per chunk. Errors from parsing, embedding, or the vector
   *   store are not caught here and propagate to the caller.
   */
  async ingest(source: IngestSourceInput): Promise<IngestResult> {
    const sourceId = source.sourceId ?? buildSourceId(source.sourceType, source.sourceRef);
    const discoveredFiles = await this.resolveInputFiles(source);
    const supportedFiles = discoveredFiles.filter(isSupportedDocument);

    let documentsProcessed = 0;
    let chunksStored = 0;

    for (const filePath of supportedFiles) {
      const parsed = await parseDocument(filePath);
      if (!parsed.content.trim()) {
        // Nothing to index in a blank document.
        continue;
      }

      const documentKey = this.resolveDocumentKey(source, filePath);
      const documentId = buildDocumentId(sourceId, documentKey);
      const chunkingPolicy = parsed.chunkMode === "codigo" ? codeChunkingPolicy : documentalChunkingPolicy;
      const chunks = chunkDocument(parsed.title, parsed.content, chunkingPolicy);
      if (chunks.length === 0) {
        // Avoid calling the embedding provider and the vector store with
        // empty batches; treat the document like a blank one.
        continue;
      }

      const embeddings = await this.embeddingProvider.embed(chunks.map((chunk) => chunk.content));
      if (embeddings.length !== chunks.length) {
        // Chunks and embeddings are paired by position, so a count mismatch
        // would store points with a missing or misaligned vector.
        throw new Error(
          `Embedding provider returned ${embeddings.length} embeddings for ${chunks.length} chunks of document "${documentKey}"`
        );
      }

      const qdrantChunks: IngestedChunk[] = chunks.map((chunk, index) => {
        const chunkId = buildChunkId(documentId, chunkingPolicy.mode, chunk.index);
        return {
          id: buildQdrantPointId(chunkId),
          vector: embeddings[index],
          payload: {
            chunk_id: chunkId,
            source_id: sourceId,
            source_type: source.sourceType,
            source_ref: source.sourceRef,
            document_id: documentId,
            document_key: documentKey,
            title: parsed.title,
            mime_type: parsed.mimeType,
            section_title: chunk.sectionTitle,
            chunk_mode: chunkingPolicy.mode,
            chunk_index: chunk.index,
            start_line: chunk.startLine,
            end_line: chunk.endLine,
            content: chunk.content,
            embedding_provider: this.embeddingProvider.providerName,
            embedding_model: this.embeddingProvider.modelName,
            embedding_dimensions: embeddings[index]?.length ?? 0,
            status: "active",
            updated_at: new Date().toISOString(),
            tags: source.tags ?? []
          }
        };
      });

      await this.vectorStore.upsert(qdrantChunks);
      documentsProcessed += 1;
      chunksStored += qdrantChunks.length;
    }

    return {
      accepted: true,
      source,
      // Reports the count of supported files, not of every file found.
      filesDiscovered: supportedFiles.length,
      documentsProcessed,
      chunksStored,
      collectionName: env.qdrantCollection
    };
  }

  /**
   * Builds the normalized key that identifies a document within its source:
   * the path relative to the source folder for folder sources, otherwise the
   * file's base name.
   */
  private resolveDocumentKey(source: IngestSourceInput, filePath: string): string {
    const rawKey =
      source.sourceType === "folder"
        ? path.relative(path.resolve(source.sourceRef), path.resolve(filePath))
        : path.basename(filePath);
    return normalizeDocumentKey(rawKey);
  }

  /**
   * Lists the candidate files for a source. A "file" source yields a single
   * path (`readPath` when provided, else `sourceRef`); any other source type
   * is listed recursively starting at `sourceRef`.
   */
  private async resolveInputFiles(source: IngestSourceInput): Promise<string[]> {
    if (source.sourceType === "file") {
      return [source.readPath ?? source.sourceRef];
    }

    return listFilesRecursively(source.sourceRef);
  }
}