skeleton of the core app

This commit is contained in:
Lucas Tettamanti
2026-01-02 00:21:46 -03:00
parent a1fb46f70c
commit 947b0bd1ee
12 changed files with 735 additions and 152 deletions

15
src/db/pool.js Normal file
View File

@@ -0,0 +1,15 @@
import pg from "pg";
const { Pool } = pg;
export const pool = new Pool({
connectionString: process.env.DATABASE_URL,
max: parseInt(process.env.PG_POOL_MAX || "10", 10),
idleTimeoutMillis: parseInt(process.env.PG_IDLE_TIMEOUT_MS || "30000", 10),
connectionTimeoutMillis: parseInt(process.env.PG_CONN_TIMEOUT_MS || "5000", 10),
});
pool.on("error", (err) => {
console.error("[pg pool] unexpected error:", err);
});

287
src/db/repo.js Normal file
View File

@@ -0,0 +1,287 @@
import { pool } from "./pool.js";
export async function ensureTenant({ key, name }) {
const q = `
insert into tenants (key, name)
values ($1, $2)
on conflict (key) do update set name = excluded.name
returning id
`;
const { rows } = await pool.query(q, [key, name]);
return rows[0].id;
}
export async function getConversationState(tenant_id, wa_chat_id) {
const q = `
select tenant_id, wa_chat_id, state, last_intent, last_order_id, context, state_updated_at
from wa_conversation_state
where tenant_id=$1 and wa_chat_id=$2
`;
const { rows } = await pool.query(q, [tenant_id, wa_chat_id]);
return rows[0] || null;
}
export async function upsertConversationState({
tenant_id,
wa_chat_id,
state,
last_intent = null,
last_order_id = null,
context = {},
}) {
const q = `
insert into wa_conversation_state
(tenant_id, wa_chat_id, state, state_updated_at, last_intent, last_order_id, context, updated_at)
values
($1, $2, $3, now(), $4, $5, $6::jsonb, now())
on conflict (tenant_id, wa_chat_id)
do update set
state = excluded.state,
state_updated_at = now(),
last_intent = excluded.last_intent,
last_order_id = excluded.last_order_id,
context = excluded.context,
updated_at = now()
returning tenant_id, wa_chat_id, state, last_intent, last_order_id, context, state_updated_at, updated_at
`;
const { rows } = await pool.query(q, [
tenant_id,
wa_chat_id,
state,
last_intent,
last_order_id,
JSON.stringify(context ?? {}),
]);
return rows[0];
}
export async function insertMessage({
tenant_id,
wa_chat_id,
provider,
message_id,
direction, // in|out
text = null,
payload = {},
run_id = null,
ts = null,
}) {
const q = `
insert into wa_messages
(tenant_id, wa_chat_id, provider, message_id, direction, ts, text, payload, run_id)
values
($1, $2, $3, $4, $5, coalesce($6::timestamptz, now()), $7, $8::jsonb, $9)
on conflict (tenant_id, provider, message_id) do nothing
returning id
`;
const { rows } = await pool.query(q, [
tenant_id,
wa_chat_id,
provider,
message_id,
direction,
ts,
text,
JSON.stringify(payload ?? {}),
run_id,
]);
return rows[0]?.id || null;
}
export async function insertRun({
tenant_id,
wa_chat_id,
message_id,
prev_state = null,
user_text = null,
llm_output = null,
tools = [],
invariants = {},
final_reply = null,
order_id = null,
payment_link = null,
status = "ok",
error_code = null,
error_detail = null,
latency_ms = null,
}) {
const q = `
insert into conversation_runs
(tenant_id, wa_chat_id, message_id, ts, prev_state, user_text, llm_output, tools, invariants,
final_reply, order_id, payment_link, status, error_code, error_detail, latency_ms)
values
($1, $2, $3, now(), $4, $5, $6::jsonb, $7::jsonb, $8::jsonb,
$9, $10, $11, $12, $13, $14, $15)
on conflict (tenant_id, message_id) do nothing
returning id
`;
const { rows } = await pool.query(q, [
tenant_id,
wa_chat_id,
message_id,
prev_state,
user_text,
llm_output ? JSON.stringify(llm_output) : null,
JSON.stringify(tools ?? []),
JSON.stringify(invariants ?? {}),
final_reply,
order_id,
payment_link,
status,
error_code,
error_detail,
latency_ms,
]);
return rows[0]?.id || null;
}
export async function listConversations({ tenant_id, q = "", status = "", state = "", limit = 50 }) {
const params = [tenant_id];
let where = `where tenant_id=$1`;
if (q) {
params.push(`%${q}%`);
where += ` and wa_chat_id ilike $${params.length}`;
}
if (status) {
// status derivado no implementado en MVP
}
if (state) {
params.push(state);
where += ` and state = $${params.length}`;
}
const qsql = `
select tenant_id, wa_chat_id,
state,
coalesce(last_intent,'other') as intent,
updated_at as last_activity
from wa_conversation_state
${where}
order by updated_at desc
limit ${Math.max(1, Math.min(200, limit))}
`;
const { rows } = await pool.query(qsql, params);
return rows.map((r) => ({
chat_id: r.wa_chat_id,
from: r.wa_chat_id.replace(/^sim:/, ""),
state: r.state,
intent: r.intent,
status: "ok",
last_activity: r.last_activity,
last_run_id: null,
}));
}
export async function listRuns({ tenant_id, wa_chat_id = null, limit = 50 }) {
const params = [tenant_id];
let where = `where tenant_id=$1`;
if (wa_chat_id) {
params.push(wa_chat_id);
where += ` and wa_chat_id=$${params.length}`;
}
const q = `
select id as run_id, ts, wa_chat_id as chat_id,
status, prev_state, user_text,
llm_output, tools, invariants,
final_reply, order_id, payment_link, latency_ms
from conversation_runs
${where}
order by ts desc
limit ${Math.max(1, Math.min(200, limit))}
`;
const { rows } = await pool.query(q, params);
return rows.map((r) => ({
run_id: r.run_id,
ts: r.ts,
chat_id: r.chat_id,
from: r.chat_id.replace(/^sim:/, ""),
status: r.status,
prev_state: r.prev_state,
input: { text: r.user_text },
llm_output: r.llm_output,
tools: r.tools,
invariants: r.invariants,
final_reply: r.final_reply,
order_id: r.order_id,
payment_link: r.payment_link,
latency_ms: r.latency_ms,
}));
}
export async function getRunById({ tenant_id, run_id }) {
const q = `
select id as run_id, ts, wa_chat_id as chat_id,
status, prev_state, user_text,
llm_output, tools, invariants,
final_reply, order_id, payment_link, latency_ms
from conversation_runs
where tenant_id=$1 and id=$2
`;
const { rows } = await pool.query(q, [tenant_id, run_id]);
const r = rows[0];
if (!r) return null;
return {
run_id: r.run_id,
ts: r.ts,
chat_id: r.chat_id,
from: r.chat_id.replace(/^sim:/, ""),
status: r.status,
prev_state: r.prev_state,
input: { text: r.user_text },
llm_output: r.llm_output,
tools: r.tools,
invariants: r.invariants,
final_reply: r.final_reply,
order_id: r.order_id,
payment_link: r.payment_link,
latency_ms: r.latency_ms,
};
}
async function getRecentMessages({ tenant_id, wa_chat_id, limit = 20 }) {
const lim = Math.max(1, Math.min(50, parseInt(limit, 10) || 20)); // hard cap 50
const q = `
select direction, ts, text
from wa_messages
where tenant_id=$1 and wa_chat_id=$2
order by ts desc
limit $3
`;
const { rows } = await pool.query(q, [tenant_id, wa_chat_id, lim]);
// rows vienen newest-first, lo devolvemos oldest-first
return rows.reverse().map(r => ({
role: r.direction === "in" ? "user" : "assistant",
content: r.text || "",
ts: r.ts,
}));
}
export async function getRecentMessagesForLLM({
tenant_id,
wa_chat_id,
limit = 20,
maxCharsPerMessage = 800,
}) {
const lim = Math.max(1, Math.min(50, parseInt(limit, 10) || 20));
const q = `
select direction, ts, text
from wa_messages
where tenant_id=$1
and wa_chat_id=$2
and text is not null
and length(trim(text)) > 0
order by ts desc
limit $3
`;
const { rows } = await pool.query(q, [tenant_id, wa_chat_id, lim]);
return rows.reverse().map(r => ({
role: r.direction === "in" ? "user" : "assistant",
content: String(r.text).trim().slice(0, maxCharsPerMessage),
}));
}

View File

@@ -10,7 +10,7 @@ export async function llmPlan({ promptSystem, input }) {
model: "gpt-5-mini", // o gpt-5 (más caro/mejor) / el que estés usando
input: [
{ role: "system", content: promptSystem },
{ role: "user", content: JSON.stringify(input) }
{ role: "user", content: JSON.stringify(llmInput) }
],
// Si estás usando "Structured Outputs" nativo, acá va tu schema.
// En caso de que tu SDK no lo soporte directo, lo hacemos con zod/JSON parse robusto.

149
src/services/pipeline.js Normal file
View File

@@ -0,0 +1,149 @@
import crypto from "crypto";
import {
getConversationState,
insertMessage,
insertRun,
upsertConversationState,
getRecentMessagesForLLM,
} from "../db/repo.js";
import { sseSend } from "./sse.js";
function nowIso() {
return new Date().toISOString();
}
function newId(prefix = "run") {
return `${prefix}_${crypto.randomUUID()}`;
}
export async function processMessage({ tenantId, chat_id, from, text, provider, message_id }) {
const started_at = Date.now();
const prev = await getConversationState(tenantId, chat_id);
const prev_state = prev?.state || "IDLE";
await insertMessage({
tenant_id: tenantId,
wa_chat_id: chat_id,
provider,
message_id,
direction: "in",
text,
payload: { raw: { from, text } },
run_id: null,
});
const plan = {
reply: `Recibido: "${text}". ¿Querés retiro o envío?`,
next_state: "BUILDING_ORDER",
intent: "create_order",
missing_fields: ["delivery_or_pickup"],
order_action: "none",
basket_resolved: { items: [] },
};
const invariants = {
ok: true,
checks: [
{ name: "required_keys_present", ok: true },
{ name: "no_checkout_without_payment_link", ok: true },
{ name: "no_order_action_without_items", ok: true },
],
};
const latency_ms = Date.now() - started_at;
const run_id = await insertRun({
tenant_id: tenantId,
wa_chat_id: chat_id,
message_id: `${provider}:${message_id}`,
prev_state,
user_text: text,
llm_output: plan,
tools: [],
invariants,
final_reply: plan.reply,
status: "ok",
latency_ms,
});
const outMessageId = newId("out");
await insertMessage({
tenant_id: tenantId,
wa_chat_id: chat_id,
provider,
message_id: outMessageId,
direction: "out",
text: plan.reply,
payload: { reply: plan.reply },
run_id,
});
const context = {
missing_fields: plan.missing_fields || [],
basket_resolved: plan.basket_resolved || { items: [] },
};
const stateRow = await upsertConversationState({
tenant_id: tenantId,
wa_chat_id: chat_id,
state: plan.next_state,
last_intent: plan.intent,
last_order_id: null,
context,
});
sseSend("conversation.upsert", {
chat_id: stateRow.wa_chat_id,
from: stateRow.wa_chat_id.replace(/^sim:/, ""),
state: stateRow.state,
intent: stateRow.last_intent || "other",
status: "ok",
last_activity: stateRow.updated_at,
last_run_id: run_id,
});
sseSend("run.created", {
run_id,
ts: nowIso(),
chat_id,
from,
status: "ok",
prev_state,
input: { text },
llm_output: plan,
tools: [],
invariants,
final_reply: plan.reply,
order_id: null,
payment_link: null,
latency_ms,
});
const history = await getRecentMessagesForLLM({
tenant_id: TENANT_ID,
wa_chat_id: chat_id,
limit: 20,
});
const llmInput = {
wa_chat_id: chat_id,
last_user_message: text,
conversation_history: history,
current_conversation_state: prev_state,
context: prev?.context || {},
};
return { run_id, reply: plan.reply };
}
export function collapseAssistantMessages(messages) {
const out = [];
for (const m of messages) {
const last = out[out.length - 1];
if (last && last.role === "assistant" && m.role === "assistant") continue;
out.push(m);
}
return out;
}

15
src/services/sse.js Normal file
View File

@@ -0,0 +1,15 @@
const sseClients = new Set();
export function addSseClient(res) {
sseClients.add(res);
}
export function removeSseClient(res) {
sseClients.delete(res);
}
export function sseSend(event, data) {
const payload = `event: ${event}\ndata: ${JSON.stringify(data)}\n\n`;
for (const res of sseClients) res.write(payload);
}