Initial commit
This commit is contained in:
0
gestion_pedidos/services/__init__.py
Normal file
0
gestion_pedidos/services/__init__.py
Normal file
188
gestion_pedidos/services/albaran_processor.py
Normal file
188
gestion_pedidos/services/albaran_processor.py
Normal file
@@ -0,0 +1,188 @@
|
||||
"""
|
||||
Procesador de albaranes con OCR y vinculación automática
|
||||
"""
|
||||
from pathlib import Path
|
||||
from typing import Dict, Optional, List
|
||||
from django.utils import timezone
|
||||
from datetime import datetime
|
||||
from .ocr_service import OCRService
|
||||
from ..models import (
|
||||
Proveedor, Albaran, ReferenciaAlbaran,
|
||||
ReferenciaPedidoProveedor, PedidoProveedor
|
||||
)
|
||||
|
||||
|
||||
class AlbaranProcessor:
|
||||
"""Procesa albaranes y los vincula con pedidos pendientes"""
|
||||
|
||||
def __init__(self):
|
||||
self.ocr_service = OCRService()
|
||||
|
||||
def _find_proveedor(self, datos: Dict) -> Optional[Proveedor]:
|
||||
"""Busca el proveedor basándose en los datos del albarán"""
|
||||
nombre = datos.get('proveedor', {}).get('nombre', '').strip()
|
||||
numero = datos.get('proveedor', {}).get('numero', '').strip()
|
||||
|
||||
# Buscar por nombre
|
||||
if nombre:
|
||||
proveedor = Proveedor.objects.filter(
|
||||
nombre__icontains=nombre
|
||||
).first()
|
||||
if proveedor:
|
||||
return proveedor
|
||||
|
||||
# Buscar por número (si se implementa un campo número_proveedor)
|
||||
# Por ahora, retornamos None si no se encuentra
|
||||
return None
|
||||
|
||||
def _parse_fecha(self, fecha_str: str) -> Optional[datetime]:
|
||||
"""Parsea una fecha desde string"""
|
||||
if not fecha_str:
|
||||
return None
|
||||
|
||||
formatos = [
|
||||
'%Y-%m-%d',
|
||||
'%d/%m/%Y',
|
||||
'%d-%m-%Y',
|
||||
'%Y/%m/%d',
|
||||
]
|
||||
|
||||
for fmt in formatos:
|
||||
try:
|
||||
return datetime.strptime(fecha_str, fmt).date()
|
||||
except ValueError:
|
||||
continue
|
||||
|
||||
return None
|
||||
|
||||
def _match_referencias(
|
||||
self,
|
||||
referencias_albaran: List[Dict],
|
||||
proveedor: Proveedor
|
||||
) -> Dict[str, ReferenciaPedidoProveedor]:
|
||||
"""
|
||||
Busca referencias del albarán en pedidos pendientes del proveedor
|
||||
|
||||
Returns:
|
||||
Dict mapping referencia -> ReferenciaPedidoProveedor
|
||||
"""
|
||||
matches = {}
|
||||
|
||||
# Obtener todas las referencias pendientes del proveedor
|
||||
pedidos_pendientes = PedidoProveedor.objects.filter(
|
||||
proveedor=proveedor,
|
||||
estado__in=['pendiente_recepcion', 'parcial']
|
||||
).prefetch_related('referencias')
|
||||
|
||||
for pedido in pedidos_pendientes:
|
||||
for ref_pedido in pedido.referencias.filter(estado__in=['pendiente', 'parcial']):
|
||||
# Buscar coincidencia en el albarán
|
||||
for ref_albaran in referencias_albaran:
|
||||
if ref_albaran['referencia'].strip().upper() == ref_pedido.referencia.strip().upper():
|
||||
if ref_pedido.referencia not in matches:
|
||||
matches[ref_pedido.referencia] = ref_pedido
|
||||
break
|
||||
|
||||
return matches
|
||||
|
||||
def _match_and_update_referencias(self, albaran: Albaran):
|
||||
"""Vincula y actualiza referencias del albarán con pedidos pendientes"""
|
||||
if not albaran.proveedor:
|
||||
return
|
||||
|
||||
referencias_albaran = albaran.referencias.all()
|
||||
matches = self._match_referencias(
|
||||
[{'referencia': ref.referencia} for ref in referencias_albaran],
|
||||
albaran.proveedor
|
||||
)
|
||||
|
||||
for ref_albaran in referencias_albaran:
|
||||
ref_pedido_proveedor = matches.get(ref_albaran.referencia.strip().upper())
|
||||
|
||||
if ref_pedido_proveedor:
|
||||
ref_albaran.referencia_pedido_proveedor = ref_pedido_proveedor
|
||||
ref_albaran.save()
|
||||
|
||||
# Actualizar pedido proveedor
|
||||
ref_pedido_proveedor.unidades_recibidas += ref_albaran.unidades
|
||||
|
||||
if ref_pedido_proveedor.unidades_recibidas >= ref_pedido_proveedor.unidades_pedidas:
|
||||
ref_pedido_proveedor.estado = 'recibido'
|
||||
elif ref_pedido_proveedor.unidades_recibidas > 0:
|
||||
ref_pedido_proveedor.estado = 'parcial'
|
||||
|
||||
ref_pedido_proveedor.save()
|
||||
|
||||
# Actualizar referencia pedido cliente
|
||||
if ref_pedido_proveedor.referencia_pedido_cliente:
|
||||
ref_cliente = ref_pedido_proveedor.referencia_pedido_cliente
|
||||
ref_cliente.unidades_en_stock += ref_albaran.unidades
|
||||
ref_cliente.save()
|
||||
|
||||
def process_albaran_file(self, file_path: Path) -> Albaran:
|
||||
"""
|
||||
Procesa un archivo de albarán (imagen o PDF)
|
||||
|
||||
Returns:
|
||||
Albaran creado o actualizado
|
||||
"""
|
||||
# Procesar con OCR
|
||||
datos = self.ocr_service.process_albaran(file_path)
|
||||
|
||||
# Buscar proveedor
|
||||
proveedor = self._find_proveedor(datos)
|
||||
|
||||
# Parsear fecha
|
||||
fecha_albaran = self._parse_fecha(datos.get('fecha_albaran', ''))
|
||||
|
||||
# Crear albarán
|
||||
albaran = Albaran.objects.create(
|
||||
proveedor=proveedor,
|
||||
numero_albaran=datos.get('numero_albaran', '').strip() or None,
|
||||
fecha_albaran=fecha_albaran,
|
||||
archivo_path=str(file_path),
|
||||
estado_procesado='procesado' if proveedor else 'clasificacion',
|
||||
fecha_procesado=timezone.now(),
|
||||
datos_ocr=datos,
|
||||
)
|
||||
|
||||
# Crear referencias del albarán
|
||||
referencias_data = datos.get('referencias', [])
|
||||
matches = {}
|
||||
|
||||
if proveedor:
|
||||
matches = self._match_referencias(referencias_data, proveedor)
|
||||
|
||||
for ref_data in referencias_data:
|
||||
ref_pedido_proveedor = matches.get(ref_data['referencia'].strip().upper())
|
||||
|
||||
referencia = ReferenciaAlbaran.objects.create(
|
||||
albaran=albaran,
|
||||
referencia=ref_data.get('referencia', '').strip(),
|
||||
denominacion=ref_data.get('denominacion', '').strip(),
|
||||
unidades=int(ref_data.get('unidades', 1)),
|
||||
precio_unitario=float(ref_data.get('precio_unitario', 0)),
|
||||
impuesto_tipo=ref_data.get('impuesto_tipo', '21'),
|
||||
impuesto_valor=float(ref_data.get('impuesto_valor', 0)),
|
||||
referencia_pedido_proveedor=ref_pedido_proveedor,
|
||||
)
|
||||
|
||||
# Actualizar pedido proveedor si hay match
|
||||
if ref_pedido_proveedor:
|
||||
ref_pedido_proveedor.unidades_recibidas += referencia.unidades
|
||||
|
||||
# Actualizar estado
|
||||
if ref_pedido_proveedor.unidades_recibidas >= ref_pedido_proveedor.unidades_pedidas:
|
||||
ref_pedido_proveedor.estado = 'recibido'
|
||||
elif ref_pedido_proveedor.unidades_recibidas > 0:
|
||||
ref_pedido_proveedor.estado = 'parcial'
|
||||
|
||||
ref_pedido_proveedor.save()
|
||||
|
||||
# Actualizar referencia pedido cliente
|
||||
if ref_pedido_proveedor.referencia_pedido_cliente:
|
||||
ref_cliente = ref_pedido_proveedor.referencia_pedido_cliente
|
||||
ref_cliente.unidades_en_stock += referencia.unidades
|
||||
ref_cliente.save()
|
||||
|
||||
return albaran
|
||||
104
gestion_pedidos/services/email_parser.py
Normal file
104
gestion_pedidos/services/email_parser.py
Normal file
@@ -0,0 +1,104 @@
|
||||
"""
|
||||
Parser para emails de confirmación de pedidos a proveedores
|
||||
"""
|
||||
import email
|
||||
from pathlib import Path
|
||||
from typing import Dict, List, Optional
|
||||
import re
|
||||
|
||||
|
||||
class EmailPedidoParser:
|
||||
"""Parser para extraer información de emails de confirmación de pedidos"""
|
||||
|
||||
def parse_email_file(self, email_path: Path) -> Dict:
|
||||
"""
|
||||
Parsea un archivo de email (.eml)
|
||||
|
||||
Returns:
|
||||
Dict con información del pedido
|
||||
"""
|
||||
with open(email_path, 'rb') as f:
|
||||
msg = email.message_from_bytes(f.read())
|
||||
|
||||
# Extraer información básica
|
||||
subject = msg.get('Subject', '')
|
||||
from_addr = msg.get('From', '')
|
||||
date = msg.get('Date', '')
|
||||
|
||||
# Extraer cuerpo del email
|
||||
body = self._get_email_body(msg)
|
||||
|
||||
# Buscar número de pedido
|
||||
numero_pedido = self._extract_numero_pedido(subject, body)
|
||||
|
||||
# Buscar referencias en el cuerpo
|
||||
referencias = self._extract_referencias(body)
|
||||
|
||||
return {
|
||||
'numero_pedido': numero_pedido,
|
||||
'proveedor_email': from_addr,
|
||||
'fecha': date,
|
||||
'asunto': subject,
|
||||
'cuerpo': body,
|
||||
'referencias': referencias,
|
||||
}
|
||||
|
||||
def _get_email_body(self, msg: email.message.Message) -> str:
|
||||
"""Extrae el cuerpo del email"""
|
||||
body = ""
|
||||
|
||||
if msg.is_multipart():
|
||||
for part in msg.walk():
|
||||
content_type = part.get_content_type()
|
||||
if content_type == "text/plain":
|
||||
try:
|
||||
body = part.get_payload(decode=True).decode('utf-8', errors='ignore')
|
||||
break
|
||||
except:
|
||||
pass
|
||||
else:
|
||||
try:
|
||||
body = msg.get_payload(decode=True).decode('utf-8', errors='ignore')
|
||||
except:
|
||||
pass
|
||||
|
||||
return body
|
||||
|
||||
def _extract_numero_pedido(self, subject: str, body: str) -> Optional[str]:
|
||||
"""Extrae el número de pedido del asunto o cuerpo"""
|
||||
# Patrones comunes
|
||||
patterns = [
|
||||
r'pedido[:\s]+([A-Z0-9\-]+)',
|
||||
r'pedido[:\s]+#?(\d+)',
|
||||
r'ref[:\s]+([A-Z0-9\-]+)',
|
||||
r'order[:\s]+([A-Z0-9\-]+)',
|
||||
]
|
||||
|
||||
text = f"{subject} {body}".lower()
|
||||
|
||||
for pattern in patterns:
|
||||
match = re.search(pattern, text, re.IGNORECASE)
|
||||
if match:
|
||||
return match.group(1).strip()
|
||||
|
||||
return None
|
||||
|
||||
def _extract_referencias(self, body: str) -> List[Dict]:
|
||||
"""Extrae referencias del cuerpo del email"""
|
||||
referencias = []
|
||||
|
||||
# Buscar líneas que parezcan referencias
|
||||
# Formato común: REF123 - Descripción - Cantidad
|
||||
pattern = r'([A-Z0-9\-]+)\s*[-–]\s*([^-\n]+?)\s*[-–]\s*(\d+)'
|
||||
|
||||
matches = re.finditer(pattern, body, re.IGNORECASE | re.MULTILINE)
|
||||
|
||||
for match in matches:
|
||||
referencias.append({
|
||||
'referencia': match.group(1).strip(),
|
||||
'denominacion': match.group(2).strip(),
|
||||
'unidades': int(match.group(3)),
|
||||
})
|
||||
|
||||
return referencias
|
||||
|
||||
113
gestion_pedidos/services/file_watcher.py
Normal file
113
gestion_pedidos/services/file_watcher.py
Normal file
@@ -0,0 +1,113 @@
|
||||
"""
|
||||
Servicio para monitorear carpetas y procesar nuevos archivos
|
||||
"""
|
||||
import time
|
||||
from pathlib import Path
|
||||
from watchdog.observers import Observer
|
||||
from watchdog.events import FileSystemEventHandler
|
||||
from django.conf import settings
|
||||
from .pdf_parser import PDFPedidoParser
|
||||
from .albaran_processor import AlbaranProcessor
|
||||
import logging
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class PDFPedidoHandler(FileSystemEventHandler):
|
||||
"""Handler para procesar nuevos PDFs de pedidos de cliente"""
|
||||
|
||||
def __init__(self):
|
||||
self.parser = PDFPedidoParser()
|
||||
self.procesados = set()
|
||||
|
||||
def on_created(self, event):
|
||||
if event.is_directory:
|
||||
return
|
||||
|
||||
file_path = Path(event.src_path)
|
||||
|
||||
# Solo procesar PDFs
|
||||
if file_path.suffix.lower() != '.pdf':
|
||||
return
|
||||
|
||||
# Evitar procesar el mismo archivo múltiples veces
|
||||
if str(file_path) in self.procesados:
|
||||
return
|
||||
|
||||
# Esperar un poco para asegurar que el archivo esté completamente escrito
|
||||
time.sleep(1)
|
||||
|
||||
try:
|
||||
logger.info(f"Procesando nuevo PDF: {file_path}")
|
||||
pedido = self.parser.create_pedido_from_pdf(file_path)
|
||||
logger.info(f"Pedido creado exitosamente: {pedido.numero_pedido}")
|
||||
self.procesados.add(str(file_path))
|
||||
except Exception as e:
|
||||
logger.error(f"Error al procesar PDF {file_path}: {e}")
|
||||
|
||||
|
||||
class AlbaranHandler(FileSystemEventHandler):
|
||||
"""Handler para procesar nuevos albaranes"""
|
||||
|
||||
def __init__(self):
|
||||
self.processor = AlbaranProcessor()
|
||||
self.procesados = set()
|
||||
|
||||
def on_created(self, event):
|
||||
if event.is_directory:
|
||||
return
|
||||
|
||||
file_path = Path(event.src_path)
|
||||
|
||||
# Procesar imágenes y PDFs
|
||||
if file_path.suffix.lower() not in ['.pdf', '.jpg', '.jpeg', '.png']:
|
||||
return
|
||||
|
||||
if str(file_path) in self.procesados:
|
||||
return
|
||||
|
||||
time.sleep(1)
|
||||
|
||||
try:
|
||||
logger.info(f"Procesando nuevo albarán: {file_path}")
|
||||
albaran = self.processor.process_albaran_file(file_path)
|
||||
logger.info(f"Albarán procesado: {albaran.id}")
|
||||
self.procesados.add(str(file_path))
|
||||
except Exception as e:
|
||||
logger.error(f"Error al procesar albarán {file_path}: {e}")
|
||||
|
||||
|
||||
class FileWatcherService:
|
||||
"""Servicio para monitorear carpetas"""
|
||||
|
||||
def __init__(self):
|
||||
self.observer = Observer()
|
||||
self.running = False
|
||||
|
||||
def start(self):
|
||||
"""Inicia el monitoreo de carpetas"""
|
||||
if self.running:
|
||||
return
|
||||
|
||||
# Monitorear carpeta de pedidos de cliente
|
||||
pedidos_dir = settings.PEDIDOS_CLIENTES_PDF_DIR
|
||||
pedidos_handler = PDFPedidoHandler()
|
||||
self.observer.schedule(pedidos_handler, str(pedidos_dir), recursive=False)
|
||||
|
||||
# Monitorear carpeta de albaranes
|
||||
albaranes_dir = settings.ALBARANES_ESCANEADOS_DIR
|
||||
albaranes_handler = AlbaranHandler()
|
||||
self.observer.schedule(albaranes_handler, str(albaranes_dir), recursive=False)
|
||||
|
||||
self.observer.start()
|
||||
self.running = True
|
||||
logger.info("File watcher iniciado")
|
||||
|
||||
def stop(self):
|
||||
"""Detiene el monitoreo"""
|
||||
if self.running:
|
||||
self.observer.stop()
|
||||
self.observer.join()
|
||||
self.running = False
|
||||
logger.info("File watcher detenido")
|
||||
|
||||
196
gestion_pedidos/services/ocr_service.py
Normal file
196
gestion_pedidos/services/ocr_service.py
Normal file
@@ -0,0 +1,196 @@
|
||||
"""
|
||||
Servicio de OCR usando GPT-4 Vision API
|
||||
"""
|
||||
import base64
|
||||
import json
|
||||
from pathlib import Path
|
||||
from typing import Dict, List, Optional
|
||||
from openai import OpenAI
|
||||
from django.conf import settings
|
||||
from PIL import Image
|
||||
import io
|
||||
|
||||
|
||||
class OCRService:
|
||||
"""Servicio para procesar imágenes y PDFs con GPT-4 Vision"""
|
||||
|
||||
def __init__(self):
|
||||
self.client = OpenAI(api_key=settings.OPENAI_API_KEY)
|
||||
|
||||
def _encode_image(self, image_path: Path) -> str:
|
||||
"""Codifica una imagen en base64"""
|
||||
with open(image_path, "rb") as image_file:
|
||||
return base64.b64encode(image_file.read()).decode('utf-8')
|
||||
|
||||
def _encode_pil_image(self, image: Image.Image) -> str:
|
||||
"""Codifica una imagen PIL en base64"""
|
||||
buffered = io.BytesIO()
|
||||
image.save(buffered, format="PNG")
|
||||
return base64.b64encode(buffered.getvalue()).decode('utf-8')
|
||||
|
||||
def process_pdf_pedido_cliente(self, pdf_path: Path) -> Dict:
|
||||
"""
|
||||
Procesa un PDF de pedido de cliente y extrae la información
|
||||
|
||||
Returns:
|
||||
Dict con: numero_pedido, matricula, fecha_cita, referencias (lista)
|
||||
"""
|
||||
from pdf2image import convert_from_path
|
||||
|
||||
# Convertir PDF a imágenes
|
||||
images = convert_from_path(pdf_path, dpi=200)
|
||||
|
||||
if not images:
|
||||
raise ValueError("No se pudieron extraer imágenes del PDF")
|
||||
|
||||
# Procesar la primera página (o todas si es necesario)
|
||||
image = images[0]
|
||||
base64_image = self._encode_pil_image(image)
|
||||
|
||||
prompt = """
|
||||
Analiza este documento de pedido de cliente de recambios. Extrae la siguiente información en formato JSON:
|
||||
|
||||
{
|
||||
"numero_pedido": "número único del pedido",
|
||||
"matricula": "matrícula del vehículo",
|
||||
"fecha_cita": "fecha de la cita en formato YYYY-MM-DD o YYYY-MM-DD HH:MM",
|
||||
"referencias": [
|
||||
{
|
||||
"referencia": "código de la referencia",
|
||||
"denominacion": "descripción/nombre de la pieza",
|
||||
"unidades": número de unidades
|
||||
}
|
||||
]
|
||||
}
|
||||
|
||||
Si algún campo no está disponible, usa null. La fecha debe estar en formato ISO si es posible.
|
||||
"""
|
||||
|
||||
response = self.client.chat.completions.create(
|
||||
model="gpt-4o",
|
||||
messages=[
|
||||
{
|
||||
"role": "user",
|
||||
"content": [
|
||||
{"type": "text", "text": prompt},
|
||||
{
|
||||
"type": "image_url",
|
||||
"image_url": {
|
||||
"url": f"data:image/png;base64,{base64_image}"
|
||||
}
|
||||
}
|
||||
]
|
||||
}
|
||||
],
|
||||
max_tokens=2000
|
||||
)
|
||||
|
||||
content = response.choices[0].message.content
|
||||
|
||||
# Extraer JSON de la respuesta
|
||||
try:
|
||||
# Buscar JSON en la respuesta
|
||||
json_start = content.find('{')
|
||||
json_end = content.rfind('}') + 1
|
||||
if json_start != -1 and json_end > json_start:
|
||||
json_str = content[json_start:json_end]
|
||||
return json.loads(json_str)
|
||||
else:
|
||||
raise ValueError("No se encontró JSON en la respuesta")
|
||||
except json.JSONDecodeError as e:
|
||||
raise ValueError(f"Error al parsear JSON: {e}. Respuesta: {content}")
|
||||
|
||||
def process_albaran(self, image_path: Path) -> Dict:
|
||||
"""
|
||||
Procesa un albarán y extrae la información
|
||||
|
||||
Returns:
|
||||
Dict con: proveedor (nombre o número), numero_albaran, fecha_albaran,
|
||||
referencias (lista con precios e impuestos)
|
||||
"""
|
||||
base64_image = self._encode_image(image_path)
|
||||
|
||||
prompt = """
|
||||
Analiza este albarán de proveedor. Extrae la siguiente información en formato JSON:
|
||||
|
||||
{
|
||||
"proveedor": {
|
||||
"nombre": "nombre del proveedor",
|
||||
"numero": "número de proveedor si está visible"
|
||||
},
|
||||
"numero_albaran": "número del albarán",
|
||||
"fecha_albaran": "fecha del albarán en formato YYYY-MM-DD",
|
||||
"referencias": [
|
||||
{
|
||||
"referencia": "código de la referencia",
|
||||
"denominacion": "descripción de la pieza",
|
||||
"unidades": número de unidades,
|
||||
"precio_unitario": precio por unidad (número decimal),
|
||||
"impuesto_tipo": "21", "10", "7", "4", "3" o "0" según el porcentaje de IVA,
|
||||
"impuesto_valor": valor del impuesto (número decimal)
|
||||
}
|
||||
],
|
||||
"totales": {
|
||||
"base_imponible": total base imponible,
|
||||
"iva_21": total IVA al 21%,
|
||||
"iva_10": total IVA al 10%,
|
||||
"iva_7": total IVA al 7%,
|
||||
"iva_4": total IVA al 4%,
|
||||
"iva_3": total IVA al 3%,
|
||||
"total": total general
|
||||
}
|
||||
}
|
||||
|
||||
IMPORTANTE:
|
||||
- Si hay múltiples tipos de impuestos, identifica qué referencias tienen cada tipo
|
||||
- Si el impuesto no está claro por referencia, intenta calcularlo basándote en los totales
|
||||
- Si algún campo no está disponible, usa null
|
||||
"""
|
||||
|
||||
response = self.client.chat.completions.create(
|
||||
model="gpt-4o",
|
||||
messages=[
|
||||
{
|
||||
"role": "user",
|
||||
"content": [
|
||||
{"type": "text", "text": prompt},
|
||||
{
|
||||
"type": "image_url",
|
||||
"image_url": {
|
||||
"url": f"data:image/png;base64,{base64_image}"
|
||||
}
|
||||
}
|
||||
]
|
||||
}
|
||||
],
|
||||
max_tokens=3000
|
||||
)
|
||||
|
||||
content = response.choices[0].message.content
|
||||
|
||||
try:
|
||||
json_start = content.find('{')
|
||||
json_end = content.rfind('}') + 1
|
||||
if json_start != -1 and json_end > json_start:
|
||||
json_str = content[json_start:json_end]
|
||||
return json.loads(json_str)
|
||||
else:
|
||||
raise ValueError("No se encontró JSON en la respuesta")
|
||||
except json.JSONDecodeError as e:
|
||||
raise ValueError(f"Error al parsear JSON: {e}. Respuesta: {content}")
|
||||
|
||||
def process_image(self, image_path: Path, tipo: str = 'albaran') -> Dict:
|
||||
"""
|
||||
Procesa una imagen (albarán o pedido)
|
||||
|
||||
Args:
|
||||
image_path: Ruta a la imagen
|
||||
tipo: 'albaran' o 'pedido_cliente'
|
||||
"""
|
||||
if tipo == 'albaran':
|
||||
return self.process_albaran(image_path)
|
||||
elif tipo == 'pedido_cliente':
|
||||
return self.process_pdf_pedido_cliente(image_path)
|
||||
else:
|
||||
raise ValueError(f"Tipo no soportado: {tipo}")
|
||||
|
||||
95
gestion_pedidos/services/pdf_parser.py
Normal file
95
gestion_pedidos/services/pdf_parser.py
Normal file
@@ -0,0 +1,95 @@
|
||||
"""
|
||||
Servicio para procesar PDFs de pedidos de cliente
|
||||
"""
|
||||
import json
|
||||
from pathlib import Path
|
||||
from typing import Dict, Optional
|
||||
from django.utils import timezone
|
||||
from datetime import datetime
|
||||
from .ocr_service import OCRService
|
||||
from ..models import Cliente, PedidoCliente, ReferenciaPedidoCliente
|
||||
|
||||
|
||||
class PDFPedidoParser:
|
||||
"""Parser para procesar PDFs de pedidos de cliente"""
|
||||
|
||||
def __init__(self):
|
||||
self.ocr_service = OCRService()
|
||||
|
||||
def parse_pdf(self, pdf_path: Path) -> Dict:
|
||||
"""
|
||||
Parsea un PDF de pedido de cliente
|
||||
|
||||
Returns:
|
||||
Dict con los datos extraídos
|
||||
"""
|
||||
return self.ocr_service.process_pdf_pedido_cliente(pdf_path)
|
||||
|
||||
def create_pedido_from_pdf(self, pdf_path: Path) -> PedidoCliente:
|
||||
"""
|
||||
Crea un PedidoCliente y sus referencias desde un PDF
|
||||
|
||||
Returns:
|
||||
PedidoCliente creado
|
||||
"""
|
||||
# Parsear PDF
|
||||
datos = self.parse_pdf(pdf_path)
|
||||
|
||||
# Obtener o crear cliente
|
||||
matricula = datos.get('matricula', '').strip().upper()
|
||||
if not matricula:
|
||||
raise ValueError("No se pudo extraer la matrícula del PDF")
|
||||
|
||||
cliente, _ = Cliente.objects.get_or_create(
|
||||
matricula_vehiculo=matricula,
|
||||
defaults={
|
||||
'nombre': datos.get('nombre_cliente', f'Cliente {matricula}'),
|
||||
}
|
||||
)
|
||||
|
||||
# Parsear fecha de cita
|
||||
fecha_cita = None
|
||||
fecha_cita_str = datos.get('fecha_cita')
|
||||
if fecha_cita_str:
|
||||
try:
|
||||
# Intentar diferentes formatos
|
||||
for fmt in ['%Y-%m-%d %H:%M', '%Y-%m-%d', '%d/%m/%Y %H:%M', '%d/%m/%Y']:
|
||||
try:
|
||||
fecha_cita = datetime.strptime(fecha_cita_str, fmt)
|
||||
fecha_cita = timezone.make_aware(fecha_cita)
|
||||
break
|
||||
except ValueError:
|
||||
continue
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
# Crear pedido
|
||||
numero_pedido = datos.get('numero_pedido', '').strip()
|
||||
if not numero_pedido:
|
||||
raise ValueError("No se pudo extraer el número de pedido del PDF")
|
||||
|
||||
# Verificar si ya existe
|
||||
if PedidoCliente.objects.filter(numero_pedido=numero_pedido).exists():
|
||||
raise ValueError(f"El pedido {numero_pedido} ya existe")
|
||||
|
||||
pedido = PedidoCliente.objects.create(
|
||||
numero_pedido=numero_pedido,
|
||||
cliente=cliente,
|
||||
fecha_cita=fecha_cita,
|
||||
estado='pendiente_revision',
|
||||
archivo_pdf_path=str(pdf_path),
|
||||
)
|
||||
|
||||
# Crear referencias
|
||||
referencias_data = datos.get('referencias', [])
|
||||
for ref_data in referencias_data:
|
||||
ReferenciaPedidoCliente.objects.create(
|
||||
pedido_cliente=pedido,
|
||||
referencia=ref_data.get('referencia', '').strip(),
|
||||
denominacion=ref_data.get('denominacion', '').strip(),
|
||||
unidades_solicitadas=int(ref_data.get('unidades', 1)),
|
||||
unidades_en_stock=0,
|
||||
)
|
||||
|
||||
return pedido
|
||||
|
||||
Reference in New Issue
Block a user