Bloqueo de Threads

Control de concurrencia para evitar conflictos entre agentes en la misma conversacion

threadbloqueolockconcurrenciaejecucionconflictosherencia

Bloqueo de Threads

El sistema de bloqueo previene que multiples agentes envien mensajes simultaneamente al mismo candidato, evitando confusiones y mensajes duplicados.

Por que es Necesario

Problema sin Bloqueo

Agente A: "Hola! Necesito tus documentos"
Agente B: "Hola! Tienes una evaluacion pendiente"
Agente A: "Por favor sube tu INE"
Agente B: "El test tiene 30 preguntas"
Candidato: "??? Con quien hablo?"

Solucion con Bloqueo

Agente A adquiere bloqueo
Agente A: "Hola! Necesito tus documentos"
Agente A: "Por favor sube tu INE"
Agente B: (mensaje encolado - thread bloqueado)
Agente A libera bloqueo
Agente B: "Ahora tienes una evaluacion pendiente"

Como Funciona

Campos en Thread

Cada thread tiene campos para control de bloqueo:

CampoTipoDescripcion
activeExecutionIdnumberID de la ejecucion que tiene el bloqueo
executionLockedAtdatetimeCuando se adquirio el bloqueo

Ciclo de Vida del Bloqueo

1. Ejecucion inicia
   ↓
2. Intenta enviar mensaje
   ↓
3. Verifica si thread esta libre
   ↓
   ├── SI: Adquiere bloqueo
   │       ↓
   │       Envia mensaje
   │       ↓
   │       (continua con bloqueo)
   │
   └── NO: Mensaje se encola
           ↓
           Espera liberacion
           ↓
           Reintenta

Adquirir Bloqueo

Proceso de Adquisicion

// MessageService.acquireThreadLock()

1. Verificar bloqueo actual
   - Si no hay bloqueo → adquirir
   - Si bloqueo propio → refrescar timestamp
   - Si bloqueo ajeno → verificar si expiro

2. Si expiro (>10 min) → limpiar y adquirir

3. Si no expiro → rechazar

Timeout de Bloqueo

Los bloqueos expiran automaticamente despues de 10 minutos de inactividad.

Proposito: Evitar bloqueos huerfanos si una ejecucion falla sin liberar.

const LOCK_TIMEOUT_MS = 10 * 60 * 1000; // 10 minutos

// Si el bloqueo es mas viejo que el timeout, se considera invalido
if (lockAge > LOCK_TIMEOUT_MS) {
  // Liberar bloqueo stale
  await releaseThreadLock(threadId, oldExecutionId);
}

Liberar Bloqueo

Liberacion Automatica

El bloqueo se libera automaticamente cuando:

EventoMomento
Ejecucion completaAl finalizar exitosamente
Ejecucion fallaAl detectar error
TimeoutDespues de 10 minutos

Liberacion en Codigo

// AgentGraph.finalizeExecution()

// Release any thread locks held by this execution
const lockedThreads = await threadRepo.find({
  where: { activeExecutionId: executionId }
});

if (lockedThreads.length > 0) {
  await threadRepo.update(
    { activeExecutionId: executionId },
    { activeExecutionId: null, executionLockedAt: null }
  );
}

Herencia de Bloqueo

Cuando Aplica

Cuando un agente padre llama a un hijo con waitForCompletion=true, el hijo puede heredar el bloqueo.

Sin Herencia

Padre tiene bloqueo (exec 100)
  ↓
Padre llama a Hijo (waitForCompletion=false)
  ↓
Hijo (exec 101) intenta enviar
  ↓
Thread bloqueado por exec 100
  ↓
Mensaje encolado

Con Herencia

Padre tiene bloqueo (exec 100)
  ↓
Padre llama a Hijo (waitForCompletion=true)
  ↓
Hijo (exec 101) intenta enviar
  ↓
Verifica: Es hijo de 100? SI
Verifica: canInheritLock? SI
  ↓
Hijo hereda bloqueo
  ↓
Mensaje enviado exitosamente

Configuracion de Herencia

// Al crear ejecucion hijo
eventData: {
  parentExecutionId: execution.id,
  canInheritLock: waitForCompletion  // true si padre espera
}

Verificacion de Herencia

// MessageService.isChildOfExecution()

const childExecution = await executionRepo.findOne({
  where: { id: childExecutionId }
});

const eventData = childExecution.eventData;
const isChild = eventData.parentExecutionId === parentExecutionId;
const canInherit = eventData.canInheritLock === true;

return isChild && canInherit;

Cola de Mensajes

Cuando se Encola

Si una ejecucion intenta enviar pero el thread esta bloqueado por otra:

if (currentLock && currentLock !== executionId && !canInheritLock) {
  // Encolar mensaje para envio posterior
  await agentQueue.scheduleMessageWait({
    executionId,
    threadId,
    message,
    channel,
    // ...
  });

  return { success: true, queued: true };
}

Procesamiento de Cola

Cuando el thread se libera:

  1. Sistema detecta liberacion
  2. Busca mensajes encolados para ese thread
  3. Procesa mensajes en orden FIFO
  4. Cada mensaje adquiere bloqueo temporalmente

Escenarios de Uso

Escenario 1: Agentes Independientes

Thread: Candidato Juan
Agente A: Solicitar documentos (14:00)
Agente B: Enviar evaluacion (14:01)

Resultado:
14:00 - Agente A adquiere bloqueo
14:00 - Agente A envia "Necesito tus documentos"
14:01 - Agente B intenta enviar (bloqueado)
14:01 - Mensaje B encolado
14:02 - Agente A libera bloqueo
14:02 - Mensaje B se envia

Escenario 2: Agentes Encadenados

Thread: Candidato Maria
Coordinador → Recolector (waitForCompletion=true)

Resultado:
14:00 - Coordinador adquiere bloqueo
14:00 - Coordinador: "Iniciamos tu proceso"
14:01 - Coordinador dispara Recolector
14:01 - Recolector hereda bloqueo
14:01 - Recolector: "Necesito tu INE"
14:05 - Recolector espera respuesta
14:30 - Maria sube INE
14:30 - Recolector: "Documento recibido"
14:30 - Recolector termina, devuelve bloqueo
14:30 - Coordinador: "Siguiente paso..."

Escenario 3: Timeout de Bloqueo

Thread: Candidato Pedro
Agente A: Se cuelga durante ejecucion

Resultado:
14:00 - Agente A adquiere bloqueo
14:00 - Agente A envia mensaje
14:01 - Agente A tiene error interno (no libera)
...
14:10 - Bloqueo expira (10 min)
14:11 - Agente B intenta enviar
14:11 - Sistema detecta bloqueo stale
14:11 - Sistema libera bloqueo de A
14:11 - Agente B adquiere bloqueo
14:11 - Agente B envia mensaje

Monitoreo

Verificar Bloqueos Activos

SELECT
  ct.id as threadId,
  ct.activeExecutionId,
  ct.executionLockedAt,
  TIMESTAMPDIFF(MINUTE, ct.executionLockedAt, NOW()) as lockMinutes,
  u.name as candidateName
FROM candidate_thread ct
JOIN user u ON ct.candidateId = u.id
WHERE ct.activeExecutionId IS NOT NULL
ORDER BY ct.executionLockedAt;

Detectar Bloqueos Stale

SELECT
  ct.id as threadId,
  ct.activeExecutionId,
  ct.executionLockedAt,
  TIMESTAMPDIFF(MINUTE, ct.executionLockedAt, NOW()) as lockMinutes
FROM candidate_thread ct
WHERE ct.activeExecutionId IS NOT NULL
  AND ct.executionLockedAt < NOW() - INTERVAL 10 MINUTE;

Limpiar Bloqueos Huerfanos

UPDATE candidate_thread
SET activeExecutionId = NULL, executionLockedAt = NULL
WHERE activeExecutionId IS NOT NULL
  AND executionLockedAt < NOW() - INTERVAL 10 MINUTE;

Debugging

Logs Importantes

[MessageService] Thread 123 locked by execution 100, cannot acquire for 101
[MessageService] Thread 123 lock released by execution 100
[MessageService] Lock on thread 123 is stale (650000ms), clearing...
[MessageService] Execution 101 inheriting lock from parent 100 on thread 123
[AgentGraph] Released locks on 2 thread(s) for execution 100

Problemas Comunes

ProblemaCausaSolucion
Mensaje nunca se enviaBloqueo nunca se liberaVerificar finalizacion de ejecucion
Hijo no puede enviarNo hereda bloqueoUsar waitForCompletion=true
Bloqueo persisteEjecucion fallidaEsperar timeout o limpiar manual
Mensajes desordenadosCola no procesa FIFOVerificar procesamiento de cola

Proximos Pasos

¿No encontraste lo que buscabas?

Nuestro equipo de soporte está listo para ayudarte.

Contactar Soporte