flow-engine¶
FSM por conversación. Ejecuta steps de un flow, persiste estado en DynamoDB, emite eventos de dominio.
Endpoints¶
API GW HTTP https://xkj1y85c7c.execute-api.us-east-2.amazonaws.com:
| Método | Path | Auth | Uso |
|---|---|---|---|
| POST | /v1/flows/:id/run |
JWT | invoca un flow (tests, internal) |
| GET | /v1/flows/:id/state/:conversationId |
JWT | inspecciona estado |
| GET | /v1/admin/flows |
JWT admin | listado |
| POST | /v1/admin/flows |
JWT admin | crea flow (borrador) |
| POST | /v1/admin/flows/:id/publish |
JWT admin | publica versión |
Invocación desde inbound-router es interna (Lambda-to-Lambda invoke).
Schema de un flow (v1)¶
{
"id": "flow-bienvenida",
"tenantId": "iplacex-demo",
"version": 3,
"initialStep": "saludo",
"context": {
"user.firstName": { "type": "string" },
"user.email": { "type": "string" }
},
"steps": {
"saludo": {
"type": "text",
"text": "Hola {{ context.user.firstName }}, ¿en qué te ayudo?",
"next": "router"
},
"router": {
"type": "condition",
"branches": [
{ "if": "context.intent == 'cursos'", "next": "list-cursos" },
{ "if": "context.intent == 'agente'", "next": "handoff" }
],
"default": "fallback"
},
"list-cursos": { "type": "http", "endpoint": "...", "next": "saludo" },
"handoff": { "type": "handoff", "channel": "five9-digital" },
"fallback": { "type": "text", "text": "No entendí, prueba otra vez.", "next": "saludo" }
}
}
Validación con Zod en packages/flow-schema.
Step types¶
| Tipo | Side effects | Notas |
|---|---|---|
text |
publica Conversation.MessageDelivered |
Soporta plantillas Handlebars sobre context. |
condition |
ninguno | Expresiones JsonLogic-like; no eval JS. |
set-context |
ninguno | Setea variables. Validadas contra schema. |
handoff |
publica Conversation.HandedOff |
Termina FSM bot, abre sesión humana. |
encuesta |
publica Survey.Triggered |
Engancha al encuesta-service. |
http |
llamada externa con timeout 8s + 2 retries | Body parametrizable; errores → branch onError. |
llm |
invoca Bedrock / OpenAI | Prompts en packages/prompts/. Rate-limit por tenant. |
Persistencia de estado¶
DDB zen-dev-flow-state:
| Atributo | Tipo | Notas |
|---|---|---|
pk |
S | TENANT#<tenantId>#CONV#<conversationId> |
sk |
S | STATE#current o STATE#vN (history) |
flowId |
S | |
flowVersion |
N | |
currentStep |
S | |
context |
M | variables del usuario |
revision |
N | optimistic concurrency |
updatedAt |
S | ISO |
ttl |
N | 90 días tras último cambio |
GSI GSI1 por tenantId para listados administrativos.
Optimistic concurrency¶
Cada PutItem lleva ConditionExpression: revision = :expectedRevision.
Si dos invocaciones concurrentes intentan avanzar el mismo estado, la
segunda recibe ConditionalCheckFailedException y reintenta releyendo.
Eventos publicados¶
Conversation.StartedConversation.MessageDeliveredConversation.HandedOffConversation.ClosedFlow.StepFailed(para observabilidad)
Detalle de payloads en Eventos y webhooks.
Cómo agregar un step type nuevo¶
- TDD primero. Agrega test unit con fixture del nuevo tipo en
services/flow-engine/tests/unit/steps/<tipo>.test.ts. - Define el schema Zod en
packages/flow-schema/src/steps/<tipo>.ts. - Implementa el executor en
services/flow-engine/src/domain/steps/<tipo>.ts. Hexagonal: la lógica debe ser pura. Side-effects vía port → adapter. - Registra el executor en el dispatcher
services/flow-engine/src/domain/dispatcher.ts. - Agrega fixture al parity gate si aplica reemplazar un legacy step.
- ADR si la decisión es no-trivial (ej. nuevos eventos, nueva dependencia externa).
Transpiler legacy → v2¶
packages/flow-transpiler convierte JSON de flows legacy V1 al schema
v2. Cubre 21 flows reales. Cualquier step no soportado → fail explícito
con mensaje útil.
Ver Parity Gate.