Saltar a contenido

flow-engine

FSM por conversación. Ejecuta steps de un flow, persiste estado en DynamoDB, emite eventos de dominio.

Endpoints

API GW HTTP https://xkj1y85c7c.execute-api.us-east-2.amazonaws.com:

Método Path Auth Uso
POST /v1/flows/:id/run JWT invoca un flow (tests, internal)
GET /v1/flows/:id/state/:conversationId JWT inspecciona estado
GET /v1/admin/flows JWT admin listado
POST /v1/admin/flows JWT admin crea flow (borrador)
POST /v1/admin/flows/:id/publish JWT admin publica versión

Invocación desde inbound-router es interna (Lambda-to-Lambda invoke).

Schema de un flow (v1)

{
  "id": "flow-bienvenida",
  "tenantId": "iplacex-demo",
  "version": 3,
  "initialStep": "saludo",
  "context": {
    "user.firstName": { "type": "string" },
    "user.email":     { "type": "string" }
  },
  "steps": {
    "saludo": {
      "type": "text",
      "text": "Hola {{ context.user.firstName }}, ¿en qué te ayudo?",
      "next": "router"
    },
    "router": {
      "type": "condition",
      "branches": [
        { "if": "context.intent == 'cursos'", "next": "list-cursos" },
        { "if": "context.intent == 'agente'", "next": "handoff" }
      ],
      "default": "fallback"
    },
    "list-cursos": { "type": "http", "endpoint": "...", "next": "saludo" },
    "handoff":     { "type": "handoff", "channel": "five9-digital" },
    "fallback":    { "type": "text", "text": "No entendí, prueba otra vez.", "next": "saludo" }
  }
}

Validación con Zod en packages/flow-schema.

Step types

Tipo Side effects Notas
text publica Conversation.MessageDelivered Soporta plantillas Handlebars sobre context.
condition ninguno Expresiones JsonLogic-like; no eval JS.
set-context ninguno Setea variables. Validadas contra schema.
handoff publica Conversation.HandedOff Termina FSM bot, abre sesión humana.
encuesta publica Survey.Triggered Engancha al encuesta-service.
http llamada externa con timeout 8s + 2 retries Body parametrizable; errores → branch onError.
llm invoca Bedrock / OpenAI Prompts en packages/prompts/. Rate-limit por tenant.

Persistencia de estado

DDB zen-dev-flow-state:

Atributo Tipo Notas
pk S TENANT#<tenantId>#CONV#<conversationId>
sk S STATE#current o STATE#vN (history)
flowId S
flowVersion N
currentStep S
context M variables del usuario
revision N optimistic concurrency
updatedAt S ISO
ttl N 90 días tras último cambio

GSI GSI1 por tenantId para listados administrativos.

Optimistic concurrency

Cada PutItem lleva ConditionExpression: revision = :expectedRevision. Si dos invocaciones concurrentes intentan avanzar el mismo estado, la segunda recibe ConditionalCheckFailedException y reintenta releyendo.

Eventos publicados

  • Conversation.Started
  • Conversation.MessageDelivered
  • Conversation.HandedOff
  • Conversation.Closed
  • Flow.StepFailed (para observabilidad)

Detalle de payloads en Eventos y webhooks.

Cómo agregar un step type nuevo

  1. TDD primero. Agrega test unit con fixture del nuevo tipo en services/flow-engine/tests/unit/steps/<tipo>.test.ts.
  2. Define el schema Zod en packages/flow-schema/src/steps/<tipo>.ts.
  3. Implementa el executor en services/flow-engine/src/domain/steps/<tipo>.ts. Hexagonal: la lógica debe ser pura. Side-effects vía port → adapter.
  4. Registra el executor en el dispatcher services/flow-engine/src/domain/dispatcher.ts.
  5. Agrega fixture al parity gate si aplica reemplazar un legacy step.
  6. ADR si la decisión es no-trivial (ej. nuevos eventos, nueva dependencia externa).

Transpiler legacy → v2

packages/flow-transpiler convierte JSON de flows legacy V1 al schema v2. Cubre 21 flows reales. Cualquier step no soportado → fail explícito con mensaje útil.

Ver Parity Gate.