From 947b0bd1ee4330a77eff787a5ee29c8e6a581a38 Mon Sep 17 00:00:00 2001 From: Lucas Tettamanti <757326+lkzwieder@users.noreply.github.com> Date: Fri, 2 Jan 2026 00:21:46 -0300 Subject: [PATCH] skeleton of the core app --- .../20260102021529_wa_identity_map.sql | 12 + db/migrations/20260102021640_tenants.sql | 13 - .../20260102031918_tenant_channels.sql | 2 +- docker-compose.yaml | 2 - index.js | 228 ++++++-------- package-lock.json | 160 ++++++++++ package.json | 2 + src/db/pool.js | 15 + src/db/repo.js | 287 ++++++++++++++++++ src/services/openai.js | 2 +- src/services/pipeline.js | 149 +++++++++ src/services/sse.js | 15 + 12 files changed, 735 insertions(+), 152 deletions(-) delete mode 100644 db/migrations/20260102021640_tenants.sql create mode 100644 src/db/pool.js create mode 100644 src/db/repo.js create mode 100644 src/services/pipeline.js create mode 100644 src/services/sse.js diff --git a/db/migrations/20260102021529_wa_identity_map.sql b/db/migrations/20260102021529_wa_identity_map.sql index 10e7c4a..e80795d 100644 --- a/db/migrations/20260102021529_wa_identity_map.sql +++ b/db/migrations/20260102021529_wa_identity_map.sql @@ -1,4 +1,13 @@ -- migrate:up +CREATE EXTENSION IF NOT EXISTS pgcrypto; + +CREATE TABLE tenants ( + id uuid PRIMARY KEY DEFAULT gen_random_uuid(), + key text NOT NULL UNIQUE, + name text NOT NULL, + created_at timestamptz NOT NULL DEFAULT now() +); + create table wa_identity_map ( tenant_id uuid not null references tenants(id) on delete cascade, wa_chat_id text not null, @@ -10,3 +19,6 @@ create table wa_identity_map ( -- migrate:down drop table if exists wa_identity_map; + +DROP TABLE IF EXISTS tenants; +DROP EXTENSION IF EXISTS pgcrypto; diff --git a/db/migrations/20260102021640_tenants.sql b/db/migrations/20260102021640_tenants.sql deleted file mode 100644 index 469b00d..0000000 --- a/db/migrations/20260102021640_tenants.sql +++ /dev/null @@ -1,13 +0,0 @@ --- migrate:up -CREATE EXTENSION IF NOT EXISTS pgcrypto; - -CREATE TABLE tenants ( - id uuid PRIMARY KEY DEFAULT gen_random_uuid(), - key text NOT NULL UNIQUE, - name text NOT NULL, - created_at timestamptz NOT NULL DEFAULT now() -); - --- migrate:down -DROP TABLE IF EXISTS tenants; -DROP EXTENSION IF EXISTS pgcrypto; \ No newline at end of file diff --git a/db/migrations/20260102031918_tenant_channels.sql b/db/migrations/20260102031918_tenant_channels.sql index e73bc47..eba8402 100644 --- a/db/migrations/20260102031918_tenant_channels.sql +++ b/db/migrations/20260102031918_tenant_channels.sql @@ -8,4 +8,4 @@ create table if not exists tenant_channels ( ); -- migrate:down -drop table if exists tenant_channels; +drop table if exists tenant_channels; \ No newline at end of file diff --git a/docker-compose.yaml b/docker-compose.yaml index d2514ed..c6e2e39 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -1,5 +1,3 @@ -version: "3.9" - services: app: image: node:20-alpine diff --git a/index.js b/index.js index 4dfcaaa..f70503f 100644 --- a/index.js +++ b/index.js @@ -1,120 +1,35 @@ +import "dotenv/config"; import express from "express"; import cors from "cors"; import crypto from "crypto"; +import path from "path"; +import { fileURLToPath } from "url"; +import { ensureTenant, listConversations, listRuns, getRunById } from "./src/db/repo.js"; +import { addSseClient, removeSseClient, sseSend } from "./src/services/sse.js"; +import { processMessage } from "./src/services/pipeline.js"; const app = express(); app.use(cors()); app.use(express.json({ limit: "1mb" })); -app.use(express.static("public")); + +// Serve /public as static (UI + webcomponents) +const __filename = fileURLToPath(import.meta.url); +const __dirname = path.dirname(__filename); +const publicDir = path.join(__dirname, "public"); +app.use(express.static(publicDir)); /** - * In-memory store (MVP). Luego lo pasás a Postgres. + * --- Tenant --- */ -const conversations = new Map(); // chat_id -> { chat_id, from, state, intent, last_activity, status, last_run_id } -const runs = new Map(); // run_id -> run payload +const TENANT_KEY = process.env.TENANT_KEY || "piaf"; +let TENANT_ID = null; -/** - * SSE clients - */ -const sseClients = new Set(); -function sseSend(event, data) { - const payload = `event: ${event}\ndata: ${JSON.stringify(data)}\n\n`; - for (const res of sseClients) res.write(payload); -} - -/** - * Helpers - */ function nowIso() { return new Date().toISOString(); } -function newId(prefix = "run") { - return `${prefix}_${crypto.randomUUID()}`; -} -function upsertConversation(chat_id, patch) { - const prev = conversations.get(chat_id) || { - chat_id, - from: patch.from || chat_id, - state: "IDLE", - intent: "other", - last_activity: nowIso(), - status: "ok", - last_run_id: null, - }; - const next = { ...prev, ...patch, last_activity: nowIso() }; - conversations.set(chat_id, next); - return next; -} /** - * TODO: Reemplazar esto por tu pipeline real: - * - loadState (Postgres) - * - llm.plan (Structured Output) - * - tools: searchProducts (limitado), createOrder, createPaymentLink - * - invariants - * - saveState - */ -async function processMessage({ chat_id, from, text }) { - // Simulación mínima: reemplazá por tu lógica real - const prevConv = conversations.get(chat_id) || null; - - const run_id = newId("run"); - const started_at = Date.now(); - - // Ejemplo de “plan” simulado - const plan = { - reply: `Recibido: "${text}". (Sim) ¿Querés retiro o envío?`, - next_state: "BUILDING_ORDER", - intent: "create_order", - missing_fields: ["delivery_or_pickup"], - order_action: "none", - basket_resolved: { items: [] }, - }; - - const invariants = { - ok: true, - checks: [ - { name: "required_keys_present", ok: true }, - { name: "no_checkout_without_payment_link", ok: true }, - ], - }; - - const run = { - run_id, - ts: nowIso(), - chat_id, - from, - status: "ok", - prev_state: prevConv?.state || "IDLE", - input: { text }, - llm_output: plan, - tools: [], // acá metés searchProducts/createOrder/createPaymentLink (con inputs/outputs recortados) - invariants, - final_reply: plan.reply, - order_id: null, - payment_link: null, - latency_ms: Date.now() - started_at, - }; - - runs.set(run_id, run); - - const conv = upsertConversation(chat_id, { - from, - state: plan.next_state, - intent: plan.intent, - status: run.status, - last_run_id: run_id, - }); - - // Push realtime - sseSend("conversation.upsert", conv); - sseSend("run.created", run); - - return run; -} - -/** - * SSE stream + * --- SSE endpoint --- */ app.get("/stream", (req, res) => { res.setHeader("Content-Type", "text/event-stream"); @@ -123,58 +38,101 @@ app.get("/stream", (req, res) => { res.flushHeaders?.(); res.write(`event: hello\ndata: ${JSON.stringify({ ts: nowIso() })}\n\n`); - sseClients.add(res); + addSseClient(res); req.on("close", () => { - sseClients.delete(res); + removeSseClient(res); res.end(); }); }); /** - * Simulated WhatsApp send + * --- Simulator --- + * POST /sim/send { chat_id, from_phone, text } */ app.post("/sim/send", async (req, res) => { const { chat_id, from_phone, text } = req.body || {}; if (!chat_id || !from_phone || !text) { - return res.status(400).json({ error: "chat_id, from_phone, text are required" }); + return res.status(400).json({ ok: false, error: "chat_id, from_phone, text are required" }); + } + + try { + const provider = "sim"; + const message_id = crypto.randomUUID(); // idempotencia por mensaje sim + const result = await processMessage({ + tenantId: TENANT_ID, + chat_id, + from: from_phone, + text, + provider, + message_id, + }); + res.json({ ok: true, run_id: result.run_id, reply: result.reply }); + } catch (err) { + console.error(err); + res.status(500).json({ ok: false, error: "internal_error", detail: String(err?.message || err) }); } - const run = await processMessage({ chat_id, from: from_phone, text }); - res.json({ ok: true, run_id: run.run_id, reply: run.final_reply }); }); /** - * List conversations + * --- UI data endpoints --- */ -app.get("/conversations", (req, res) => { - const { q = "", status = "", state = "" } = req.query; - const items = [...conversations.values()] - .filter(c => (q ? (c.chat_id.includes(q) || c.from.includes(q)) : true)) - .filter(c => (status ? c.status === status : true)) - .filter(c => (state ? c.state === state : true)) - .sort((a, b) => (a.last_activity < b.last_activity ? 1 : -1)); - res.json({ items }); +app.get("/conversations", async (req, res) => { + const { q = "", status = "", state = "", limit = "50" } = req.query; + try { + const items = await listConversations({ + tenant_id: TENANT_ID, + q: String(q || ""), + status: String(status || ""), + state: String(state || ""), + limit: parseInt(limit, 10) || 50, + }); + res.json({ items }); + } catch (err) { + console.error(err); + res.status(500).json({ ok: false, error: "internal_error" }); + } +}); + +app.get("/runs", async (req, res) => { + const { chat_id = null, limit = "50" } = req.query; + try { + const items = await listRuns({ + tenant_id: TENANT_ID, + wa_chat_id: chat_id ? String(chat_id) : null, + limit: parseInt(limit, 10) || 50, + }); + res.json({ items }); + } catch (err) { + console.error(err); + res.status(500).json({ ok: false, error: "internal_error" }); + } +}); + +app.get("/runs/:run_id", async (req, res) => { + try { + const run = await getRunById({ tenant_id: TENANT_ID, run_id: req.params.run_id }); + if (!run) return res.status(404).json({ ok: false, error: "not_found" }); + res.json(run); + } catch (err) { + console.error(err); + res.status(500).json({ ok: false, error: "internal_error" }); + } +}); + +app.get("/", (req, res) => { + res.sendFile(path.join(publicDir, "index.html")); }); /** - * Runs by chat + * --- Boot --- */ -app.get("/runs", (req, res) => { - const { chat_id, limit = "50" } = req.query; - const lim = Math.max(1, Math.min(200, parseInt(limit, 10) || 50)); - let items = [...runs.values()].sort((a, b) => (a.ts < b.ts ? 1 : -1)); - if (chat_id) items = items.filter(r => r.chat_id === chat_id); - res.json({ items: items.slice(0, lim) }); -}); - -/** - * Run detail - */ -app.get("/runs/:run_id", (req, res) => { - const run = runs.get(req.params.run_id); - if (!run) return res.status(404).json({ error: "not_found" }); - res.json(run); -}); - const port = process.env.PORT || 3000; -app.listen(port, () => console.log(`UI: http://localhost:${port}`)); + +(async function boot() { + TENANT_ID = await ensureTenant({ key: TENANT_KEY, name: TENANT_KEY.toUpperCase() }); + app.listen(port, () => console.log(`UI: http://localhost:${port} (tenant=${TENANT_KEY})`)); +})().catch((err) => { + console.error("Boot failed:", err); + process.exit(1); +}); diff --git a/package-lock.json b/package-lock.json index 76e6823..69029a3 100644 --- a/package-lock.json +++ b/package-lock.json @@ -10,8 +10,10 @@ "license": "MIT", "dependencies": { "cors": "^2.8.5", + "dotenv": "^17.2.3", "express": "^4.19.2", "openai": "^6.15.0", + "pg": "^8.16.3", "zod": "^4.3.4" }, "devDependencies": { @@ -384,6 +386,18 @@ "npm": "1.2.8000 || >= 1.4.16" } }, + "node_modules/dotenv": { + "version": "17.2.3", + "resolved": "https://registry.npmjs.org/dotenv/-/dotenv-17.2.3.tgz", + "integrity": "sha512-JVUnt+DUIzu87TABbhPmNfVdBDt18BLOWjMUFJMSi/Qqg7NTYtabbvSNJGOJ7afbRuv9D/lngizHtP7QyLQ+9w==", + "license": "BSD-2-Clause", + "engines": { + "node": ">=12" + }, + "funding": { + "url": "https://dotenvx.com" + } + }, "node_modules/dunder-proto": { "version": "1.0.1", "resolved": "https://registry.npmjs.org/dunder-proto/-/dunder-proto-1.0.1.tgz", @@ -1003,6 +1017,95 @@ "integrity": "sha512-RA1GjUVMnvYFxuqovrEqZoxxW5NUZqbwKtYz/Tt7nXerk0LbLblQmrsgdeOxV5SFHf0UDggjS/bSeOZwt1pmEQ==", "license": "MIT" }, + "node_modules/pg": { + "version": "8.16.3", + "resolved": "https://registry.npmjs.org/pg/-/pg-8.16.3.tgz", + "integrity": "sha512-enxc1h0jA/aq5oSDMvqyW3q89ra6XIIDZgCX9vkMrnz5DFTw/Ny3Li2lFQ+pt3L6MCgm/5o2o8HW9hiJji+xvw==", + "license": "MIT", + "dependencies": { + "pg-connection-string": "^2.9.1", + "pg-pool": "^3.10.1", + "pg-protocol": "^1.10.3", + "pg-types": "2.2.0", + "pgpass": "1.0.5" + }, + "engines": { + "node": ">= 16.0.0" + }, + "optionalDependencies": { + "pg-cloudflare": "^1.2.7" + }, + "peerDependencies": { + "pg-native": ">=3.0.1" + }, + "peerDependenciesMeta": { + "pg-native": { + "optional": true + } + } + }, + "node_modules/pg-cloudflare": { + "version": "1.2.7", + "resolved": "https://registry.npmjs.org/pg-cloudflare/-/pg-cloudflare-1.2.7.tgz", + "integrity": "sha512-YgCtzMH0ptvZJslLM1ffsY4EuGaU0cx4XSdXLRFae8bPP4dS5xL1tNB3k2o/N64cHJpwU7dxKli/nZ2lUa5fLg==", + "license": "MIT", + "optional": true + }, + "node_modules/pg-connection-string": { + "version": "2.9.1", + "resolved": "https://registry.npmjs.org/pg-connection-string/-/pg-connection-string-2.9.1.tgz", + "integrity": "sha512-nkc6NpDcvPVpZXxrreI/FOtX3XemeLl8E0qFr6F2Lrm/I8WOnaWNhIPK2Z7OHpw7gh5XJThi6j6ppgNoaT1w4w==", + "license": "MIT" + }, + "node_modules/pg-int8": { + "version": "1.0.1", + "resolved": "https://registry.npmjs.org/pg-int8/-/pg-int8-1.0.1.tgz", + "integrity": "sha512-WCtabS6t3c8SkpDBUlb1kjOs7l66xsGdKpIPZsg4wR+B3+u9UAum2odSsF9tnvxg80h4ZxLWMy4pRjOsFIqQpw==", + "license": "ISC", + "engines": { + "node": ">=4.0.0" + } + }, + "node_modules/pg-pool": { + "version": "3.10.1", + "resolved": "https://registry.npmjs.org/pg-pool/-/pg-pool-3.10.1.tgz", + "integrity": "sha512-Tu8jMlcX+9d8+QVzKIvM/uJtp07PKr82IUOYEphaWcoBhIYkoHpLXN3qO59nAI11ripznDsEzEv8nUxBVWajGg==", + "license": "MIT", + "peerDependencies": { + "pg": ">=8.0" + } + }, + "node_modules/pg-protocol": { + "version": "1.10.3", + "resolved": "https://registry.npmjs.org/pg-protocol/-/pg-protocol-1.10.3.tgz", + "integrity": "sha512-6DIBgBQaTKDJyxnXaLiLR8wBpQQcGWuAESkRBX/t6OwA8YsqP+iVSiond2EDy6Y/dsGk8rh/jtax3js5NeV7JQ==", + "license": "MIT" + }, + "node_modules/pg-types": { + "version": "2.2.0", + "resolved": "https://registry.npmjs.org/pg-types/-/pg-types-2.2.0.tgz", + "integrity": "sha512-qTAAlrEsl8s4OiEQY69wDvcMIdQN6wdz5ojQiOy6YRMuynxenON0O5oCpJI6lshc6scgAY8qvJ2On/p+CXY0GA==", + "license": "MIT", + "dependencies": { + "pg-int8": "1.0.1", + "postgres-array": "~2.0.0", + "postgres-bytea": "~1.0.0", + "postgres-date": "~1.0.4", + "postgres-interval": "^1.1.0" + }, + "engines": { + "node": ">=4" + } + }, + "node_modules/pgpass": { + "version": "1.0.5", + "resolved": "https://registry.npmjs.org/pgpass/-/pgpass-1.0.5.tgz", + "integrity": "sha512-FdW9r/jQZhSeohs1Z3sI1yxFQNFvMcnmfuj4WBMUTxOrAyLMaTcE1aAMBiTlbMNaXvBCQuVi0R7hd8udDSP7ug==", + "license": "MIT", + "dependencies": { + "split2": "^4.1.0" + } + }, "node_modules/picomatch": { "version": "2.3.1", "resolved": "https://registry.npmjs.org/picomatch/-/picomatch-2.3.1.tgz", @@ -1016,6 +1119,45 @@ "url": "https://github.com/sponsors/jonschlinkert" } }, + "node_modules/postgres-array": { + "version": "2.0.0", + "resolved": "https://registry.npmjs.org/postgres-array/-/postgres-array-2.0.0.tgz", + "integrity": "sha512-VpZrUqU5A69eQyW2c5CA1jtLecCsN2U/bD6VilrFDWq5+5UIEVO7nazS3TEcHf1zuPYO/sqGvUvW62g86RXZuA==", + "license": "MIT", + "engines": { + "node": ">=4" + } + }, + "node_modules/postgres-bytea": { + "version": "1.0.1", + "resolved": "https://registry.npmjs.org/postgres-bytea/-/postgres-bytea-1.0.1.tgz", + "integrity": "sha512-5+5HqXnsZPE65IJZSMkZtURARZelel2oXUEO8rH83VS/hxH5vv1uHquPg5wZs8yMAfdv971IU+kcPUczi7NVBQ==", + "license": "MIT", + "engines": { + "node": ">=0.10.0" + } + }, + "node_modules/postgres-date": { + "version": "1.0.7", + "resolved": "https://registry.npmjs.org/postgres-date/-/postgres-date-1.0.7.tgz", + "integrity": "sha512-suDmjLVQg78nMK2UZ454hAG+OAW+HQPZ6n++TNDUX+L0+uUlLywnoxJKDou51Zm+zTCjrCl0Nq6J9C5hP9vK/Q==", + "license": "MIT", + "engines": { + "node": ">=0.10.0" + } + }, + "node_modules/postgres-interval": { + "version": "1.2.0", + "resolved": "https://registry.npmjs.org/postgres-interval/-/postgres-interval-1.2.0.tgz", + "integrity": "sha512-9ZhXKM/rw350N1ovuWHbGxnGh/SNJ4cnxHiM0rxE4VN41wsg8P8zWn9hv/buK00RP4WvlOyr/RBDiptyxVbkZQ==", + "license": "MIT", + "dependencies": { + "xtend": "^4.0.0" + }, + "engines": { + "node": ">=0.10.0" + } + }, "node_modules/proxy-addr": { "version": "2.0.7", "resolved": "https://registry.npmjs.org/proxy-addr/-/proxy-addr-2.0.7.tgz", @@ -1263,6 +1405,15 @@ "node": ">=10" } }, + "node_modules/split2": { + "version": "4.2.0", + "resolved": "https://registry.npmjs.org/split2/-/split2-4.2.0.tgz", + "integrity": "sha512-UcjcJOWknrNkF6PLX83qcHM6KHgVKNkV62Y8a5uYDVv9ydGQVwAHMKqHdJje1VTWpljG0WYpCDhrCdAOYH4TWg==", + "license": "ISC", + "engines": { + "node": ">= 10.x" + } + }, "node_modules/statuses": { "version": "2.0.2", "resolved": "https://registry.npmjs.org/statuses/-/statuses-2.0.2.tgz", @@ -1364,6 +1515,15 @@ "node": ">= 0.8" } }, + "node_modules/xtend": { + "version": "4.0.2", + "resolved": "https://registry.npmjs.org/xtend/-/xtend-4.0.2.tgz", + "integrity": "sha512-LKYU1iAXJXUgAXn9URjiu+MWhyUXHsvfp7mcuYm9dSUKK0/CjtrUwFAxD82/mCWbtLsGjFIad0wIsod4zrTAEQ==", + "license": "MIT", + "engines": { + "node": ">=0.4" + } + }, "node_modules/zod": { "version": "4.3.4", "resolved": "https://registry.npmjs.org/zod/-/zod-4.3.4.tgz", diff --git a/package.json b/package.json index 61cacba..343f111 100644 --- a/package.json +++ b/package.json @@ -17,8 +17,10 @@ "license": "MIT", "dependencies": { "cors": "^2.8.5", + "dotenv": "^17.2.3", "express": "^4.19.2", "openai": "^6.15.0", + "pg": "^8.16.3", "zod": "^4.3.4" }, "devDependencies": { diff --git a/src/db/pool.js b/src/db/pool.js new file mode 100644 index 0000000..26e0aaa --- /dev/null +++ b/src/db/pool.js @@ -0,0 +1,15 @@ +import pg from "pg"; + +const { Pool } = pg; + +export const pool = new Pool({ + connectionString: process.env.DATABASE_URL, + max: parseInt(process.env.PG_POOL_MAX || "10", 10), + idleTimeoutMillis: parseInt(process.env.PG_IDLE_TIMEOUT_MS || "30000", 10), + connectionTimeoutMillis: parseInt(process.env.PG_CONN_TIMEOUT_MS || "5000", 10), +}); + +pool.on("error", (err) => { + console.error("[pg pool] unexpected error:", err); +}); + diff --git a/src/db/repo.js b/src/db/repo.js new file mode 100644 index 0000000..efc32a7 --- /dev/null +++ b/src/db/repo.js @@ -0,0 +1,287 @@ +import { pool } from "./pool.js"; + +export async function ensureTenant({ key, name }) { + const q = ` + insert into tenants (key, name) + values ($1, $2) + on conflict (key) do update set name = excluded.name + returning id + `; + const { rows } = await pool.query(q, [key, name]); + return rows[0].id; +} + +export async function getConversationState(tenant_id, wa_chat_id) { + const q = ` + select tenant_id, wa_chat_id, state, last_intent, last_order_id, context, state_updated_at + from wa_conversation_state + where tenant_id=$1 and wa_chat_id=$2 + `; + const { rows } = await pool.query(q, [tenant_id, wa_chat_id]); + return rows[0] || null; +} + +export async function upsertConversationState({ + tenant_id, + wa_chat_id, + state, + last_intent = null, + last_order_id = null, + context = {}, +}) { + const q = ` + insert into wa_conversation_state + (tenant_id, wa_chat_id, state, state_updated_at, last_intent, last_order_id, context, updated_at) + values + ($1, $2, $3, now(), $4, $5, $6::jsonb, now()) + on conflict (tenant_id, wa_chat_id) + do update set + state = excluded.state, + state_updated_at = now(), + last_intent = excluded.last_intent, + last_order_id = excluded.last_order_id, + context = excluded.context, + updated_at = now() + returning tenant_id, wa_chat_id, state, last_intent, last_order_id, context, state_updated_at, updated_at + `; + const { rows } = await pool.query(q, [ + tenant_id, + wa_chat_id, + state, + last_intent, + last_order_id, + JSON.stringify(context ?? {}), + ]); + return rows[0]; +} + +export async function insertMessage({ + tenant_id, + wa_chat_id, + provider, + message_id, + direction, // in|out + text = null, + payload = {}, + run_id = null, + ts = null, +}) { + const q = ` + insert into wa_messages + (tenant_id, wa_chat_id, provider, message_id, direction, ts, text, payload, run_id) + values + ($1, $2, $3, $4, $5, coalesce($6::timestamptz, now()), $7, $8::jsonb, $9) + on conflict (tenant_id, provider, message_id) do nothing + returning id + `; + const { rows } = await pool.query(q, [ + tenant_id, + wa_chat_id, + provider, + message_id, + direction, + ts, + text, + JSON.stringify(payload ?? {}), + run_id, + ]); + return rows[0]?.id || null; +} + +export async function insertRun({ + tenant_id, + wa_chat_id, + message_id, + prev_state = null, + user_text = null, + llm_output = null, + tools = [], + invariants = {}, + final_reply = null, + order_id = null, + payment_link = null, + status = "ok", + error_code = null, + error_detail = null, + latency_ms = null, +}) { + const q = ` + insert into conversation_runs + (tenant_id, wa_chat_id, message_id, ts, prev_state, user_text, llm_output, tools, invariants, + final_reply, order_id, payment_link, status, error_code, error_detail, latency_ms) + values + ($1, $2, $3, now(), $4, $5, $6::jsonb, $7::jsonb, $8::jsonb, + $9, $10, $11, $12, $13, $14, $15) + on conflict (tenant_id, message_id) do nothing + returning id + `; + const { rows } = await pool.query(q, [ + tenant_id, + wa_chat_id, + message_id, + prev_state, + user_text, + llm_output ? JSON.stringify(llm_output) : null, + JSON.stringify(tools ?? []), + JSON.stringify(invariants ?? {}), + final_reply, + order_id, + payment_link, + status, + error_code, + error_detail, + latency_ms, + ]); + return rows[0]?.id || null; +} + +export async function listConversations({ tenant_id, q = "", status = "", state = "", limit = 50 }) { + const params = [tenant_id]; + let where = `where tenant_id=$1`; + + if (q) { + params.push(`%${q}%`); + where += ` and wa_chat_id ilike $${params.length}`; + } + if (status) { + // status derivado no implementado en MVP + } + if (state) { + params.push(state); + where += ` and state = $${params.length}`; + } + + const qsql = ` + select tenant_id, wa_chat_id, + state, + coalesce(last_intent,'other') as intent, + updated_at as last_activity + from wa_conversation_state + ${where} + order by updated_at desc + limit ${Math.max(1, Math.min(200, limit))} + `; + + const { rows } = await pool.query(qsql, params); + + return rows.map((r) => ({ + chat_id: r.wa_chat_id, + from: r.wa_chat_id.replace(/^sim:/, ""), + state: r.state, + intent: r.intent, + status: "ok", + last_activity: r.last_activity, + last_run_id: null, + })); +} + +export async function listRuns({ tenant_id, wa_chat_id = null, limit = 50 }) { + const params = [tenant_id]; + let where = `where tenant_id=$1`; + if (wa_chat_id) { + params.push(wa_chat_id); + where += ` and wa_chat_id=$${params.length}`; + } + const q = ` + select id as run_id, ts, wa_chat_id as chat_id, + status, prev_state, user_text, + llm_output, tools, invariants, + final_reply, order_id, payment_link, latency_ms + from conversation_runs + ${where} + order by ts desc + limit ${Math.max(1, Math.min(200, limit))} + `; + const { rows } = await pool.query(q, params); + + return rows.map((r) => ({ + run_id: r.run_id, + ts: r.ts, + chat_id: r.chat_id, + from: r.chat_id.replace(/^sim:/, ""), + status: r.status, + prev_state: r.prev_state, + input: { text: r.user_text }, + llm_output: r.llm_output, + tools: r.tools, + invariants: r.invariants, + final_reply: r.final_reply, + order_id: r.order_id, + payment_link: r.payment_link, + latency_ms: r.latency_ms, + })); +} + +export async function getRunById({ tenant_id, run_id }) { + const q = ` + select id as run_id, ts, wa_chat_id as chat_id, + status, prev_state, user_text, + llm_output, tools, invariants, + final_reply, order_id, payment_link, latency_ms + from conversation_runs + where tenant_id=$1 and id=$2 + `; + const { rows } = await pool.query(q, [tenant_id, run_id]); + const r = rows[0]; + if (!r) return null; + return { + run_id: r.run_id, + ts: r.ts, + chat_id: r.chat_id, + from: r.chat_id.replace(/^sim:/, ""), + status: r.status, + prev_state: r.prev_state, + input: { text: r.user_text }, + llm_output: r.llm_output, + tools: r.tools, + invariants: r.invariants, + final_reply: r.final_reply, + order_id: r.order_id, + payment_link: r.payment_link, + latency_ms: r.latency_ms, + }; +} + +async function getRecentMessages({ tenant_id, wa_chat_id, limit = 20 }) { + const lim = Math.max(1, Math.min(50, parseInt(limit, 10) || 20)); // hard cap 50 + const q = ` + select direction, ts, text + from wa_messages + where tenant_id=$1 and wa_chat_id=$2 + order by ts desc + limit $3 + `; + const { rows } = await pool.query(q, [tenant_id, wa_chat_id, lim]); + + // rows vienen newest-first, lo devolvemos oldest-first + return rows.reverse().map(r => ({ + role: r.direction === "in" ? "user" : "assistant", + content: r.text || "", + ts: r.ts, + })); +} + +export async function getRecentMessagesForLLM({ + tenant_id, + wa_chat_id, + limit = 20, + maxCharsPerMessage = 800, +}) { + const lim = Math.max(1, Math.min(50, parseInt(limit, 10) || 20)); + const q = ` + select direction, ts, text + from wa_messages + where tenant_id=$1 + and wa_chat_id=$2 + and text is not null + and length(trim(text)) > 0 + order by ts desc + limit $3 + `; + const { rows } = await pool.query(q, [tenant_id, wa_chat_id, lim]); + + return rows.reverse().map(r => ({ + role: r.direction === "in" ? "user" : "assistant", + content: String(r.text).trim().slice(0, maxCharsPerMessage), + })); +} diff --git a/src/services/openai.js b/src/services/openai.js index b9f4f41..36e88d5 100644 --- a/src/services/openai.js +++ b/src/services/openai.js @@ -10,7 +10,7 @@ export async function llmPlan({ promptSystem, input }) { model: "gpt-5-mini", // o gpt-5 (más caro/mejor) / el que estés usando input: [ { role: "system", content: promptSystem }, - { role: "user", content: JSON.stringify(input) } + { role: "user", content: JSON.stringify(llmInput) } ], // Si estás usando "Structured Outputs" nativo, acá va tu schema. // En caso de que tu SDK no lo soporte directo, lo hacemos con zod/JSON parse robusto. diff --git a/src/services/pipeline.js b/src/services/pipeline.js new file mode 100644 index 0000000..c39a9cf --- /dev/null +++ b/src/services/pipeline.js @@ -0,0 +1,149 @@ +import crypto from "crypto"; +import { + getConversationState, + insertMessage, + insertRun, + upsertConversationState, + getRecentMessagesForLLM, +} from "../db/repo.js"; +import { sseSend } from "./sse.js"; + + +function nowIso() { + return new Date().toISOString(); +} + +function newId(prefix = "run") { + return `${prefix}_${crypto.randomUUID()}`; +} + +export async function processMessage({ tenantId, chat_id, from, text, provider, message_id }) { + const started_at = Date.now(); + const prev = await getConversationState(tenantId, chat_id); + const prev_state = prev?.state || "IDLE"; + + await insertMessage({ + tenant_id: tenantId, + wa_chat_id: chat_id, + provider, + message_id, + direction: "in", + text, + payload: { raw: { from, text } }, + run_id: null, + }); + + const plan = { + reply: `Recibido: "${text}". ¿Querés retiro o envío?`, + next_state: "BUILDING_ORDER", + intent: "create_order", + missing_fields: ["delivery_or_pickup"], + order_action: "none", + basket_resolved: { items: [] }, + }; + + const invariants = { + ok: true, + checks: [ + { name: "required_keys_present", ok: true }, + { name: "no_checkout_without_payment_link", ok: true }, + { name: "no_order_action_without_items", ok: true }, + ], + }; + + const latency_ms = Date.now() - started_at; + + const run_id = await insertRun({ + tenant_id: tenantId, + wa_chat_id: chat_id, + message_id: `${provider}:${message_id}`, + prev_state, + user_text: text, + llm_output: plan, + tools: [], + invariants, + final_reply: plan.reply, + status: "ok", + latency_ms, + }); + + const outMessageId = newId("out"); + await insertMessage({ + tenant_id: tenantId, + wa_chat_id: chat_id, + provider, + message_id: outMessageId, + direction: "out", + text: plan.reply, + payload: { reply: plan.reply }, + run_id, + }); + + const context = { + missing_fields: plan.missing_fields || [], + basket_resolved: plan.basket_resolved || { items: [] }, + }; + + const stateRow = await upsertConversationState({ + tenant_id: tenantId, + wa_chat_id: chat_id, + state: plan.next_state, + last_intent: plan.intent, + last_order_id: null, + context, + }); + + sseSend("conversation.upsert", { + chat_id: stateRow.wa_chat_id, + from: stateRow.wa_chat_id.replace(/^sim:/, ""), + state: stateRow.state, + intent: stateRow.last_intent || "other", + status: "ok", + last_activity: stateRow.updated_at, + last_run_id: run_id, + }); + + sseSend("run.created", { + run_id, + ts: nowIso(), + chat_id, + from, + status: "ok", + prev_state, + input: { text }, + llm_output: plan, + tools: [], + invariants, + final_reply: plan.reply, + order_id: null, + payment_link: null, + latency_ms, + }); + + const history = await getRecentMessagesForLLM({ + tenant_id: TENANT_ID, + wa_chat_id: chat_id, + limit: 20, + }); + + const llmInput = { + wa_chat_id: chat_id, + last_user_message: text, + conversation_history: history, + current_conversation_state: prev_state, + context: prev?.context || {}, + }; + + return { run_id, reply: plan.reply }; +} + +export function collapseAssistantMessages(messages) { + const out = []; + for (const m of messages) { + const last = out[out.length - 1]; + if (last && last.role === "assistant" && m.role === "assistant") continue; + out.push(m); + } + return out; +} + diff --git a/src/services/sse.js b/src/services/sse.js new file mode 100644 index 0000000..02622ae --- /dev/null +++ b/src/services/sse.js @@ -0,0 +1,15 @@ +const sseClients = new Set(); + +export function addSseClient(res) { + sseClients.add(res); +} + +export function removeSseClient(res) { + sseClients.delete(res); +} + +export function sseSend(event, data) { + const payload = `event: ${event}\ndata: ${JSON.stringify(data)}\n\n`; + for (const res of sseClients) res.write(payload); +} +