separated in modules

This commit is contained in:
Lucas Tettamanti
2026-01-15 22:45:33 -03:00
parent eedd16afdb
commit ea62385e3d
41 changed files with 1116 additions and 2918 deletions

View File

@@ -0,0 +1,59 @@
import { refreshProductByWooId } from "../../shared/wooSnapshot.js";
import { getTenantByKey } from "../db/repo.js";
function unauthorized(res) {
res.setHeader("WWW-Authenticate", 'Basic realm="woo-webhook"');
return res.status(401).json({ ok: false, error: "unauthorized" });
}
function checkBasicAuth(req) {
const user = process.env.WOO_WEBHOOK_USER || "";
const pass = process.env.WOO_WEBHOOK_PASS || "";
const auth = req.headers?.authorization || "";
if (!user || !pass) return { ok: false, reason: "missing_env" };
if (!auth.startsWith("Basic ")) return { ok: false, reason: "missing_basic" };
const decoded = Buffer.from(auth.slice(6), "base64").toString("utf8");
const [u, p] = decoded.split(":");
if (u === user && p === pass) return { ok: true };
return { ok: false, reason: "invalid_creds" };
}
function parseWooId(payload) {
const id = payload?.id || payload?.data?.id || null;
const parentId = payload?.parent_id || payload?.data?.parent_id || null;
const resource = payload?.resource || payload?.topic || null;
return { id: id ? Number(id) : null, parentId: parentId ? Number(parentId) : null, resource };
}
export function makeWooProductWebhook() {
return async function handleWooProductWebhook(req, res) {
const auth = checkBasicAuth(req);
if (!auth.ok) return unauthorized(res);
const { id, parentId, resource } = parseWooId(req.body || {});
if (!id) return res.status(400).json({ ok: false, error: "missing_id" });
// Determinar tenant por query ?tenant_key=...
const tenantKey = req.query?.tenant_key || process.env.TENANT_KEY || null;
if (!tenantKey) return res.status(400).json({ ok: false, error: "missing_tenant_key" });
const tenant = await getTenantByKey(String(tenantKey).toLowerCase());
if (!tenant?.id) return res.status(404).json({ ok: false, error: "tenant_not_found" });
const parentForVariation =
resource && String(resource).includes("variation") ? parentId || null : null;
const updated = await refreshProductByWooId({
tenantId: tenant.id,
wooId: id,
parentId: parentForVariation,
});
return res.status(200).json({
ok: true,
woo_id: updated?.woo_id || id,
type: updated?.type || null,
parent_id: updated?.parent_id || null,
});
};
}

View File

@@ -0,0 +1,15 @@
import pg from "pg";
const { Pool } = pg;
export const pool = new Pool({
connectionString: process.env.DATABASE_URL,
max: parseInt(process.env.PG_POOL_MAX || "10", 10),
idleTimeoutMillis: parseInt(process.env.PG_IDLE_TIMEOUT_MS || "30000", 10),
connectionTimeoutMillis: parseInt(process.env.PG_CONN_TIMEOUT_MS || "5000", 10),
});
pool.on("error", (err) => {
console.error("[pg pool] unexpected error:", err);
});

View File

@@ -0,0 +1,648 @@
import { pool } from "./pool.js";
export async function ensureTenant({ key, name }) {
const q = `
insert into tenants (key, name)
values ($1, $2)
on conflict (key) do update set name = excluded.name
returning id
`;
const { rows } = await pool.query(q, [key, name]);
return rows[0].id;
}
export async function getConversationState(tenant_id, wa_chat_id) {
const q = `
select tenant_id, wa_chat_id, state, last_intent, last_order_id, context, state_updated_at
from wa_conversation_state
where tenant_id=$1 and wa_chat_id=$2
`;
const { rows } = await pool.query(q, [tenant_id, wa_chat_id]);
return rows[0] || null;
}
export async function upsertConversationState({
tenant_id,
wa_chat_id,
state,
last_intent = null,
last_order_id = null,
context = {},
}) {
const q = `
insert into wa_conversation_state
(tenant_id, wa_chat_id, state, state_updated_at, last_intent, last_order_id, context, updated_at)
values
($1, $2, $3, now(), $4, $5, $6::jsonb, now())
on conflict (tenant_id, wa_chat_id)
do update set
state = excluded.state,
state_updated_at = now(),
last_intent = excluded.last_intent,
last_order_id = excluded.last_order_id,
context = excluded.context,
updated_at = now()
returning tenant_id, wa_chat_id, state, last_intent, last_order_id, context, state_updated_at, updated_at
`;
const { rows } = await pool.query(q, [
tenant_id,
wa_chat_id,
state,
last_intent,
last_order_id,
JSON.stringify(context ?? {}),
]);
return rows[0];
}
// Crea la conversación si no existe y, si existe, solo “toca” updated_at (no pisa state/context).
export async function touchConversationState({ tenant_id, wa_chat_id }) {
const q = `
insert into wa_conversation_state
(tenant_id, wa_chat_id, state, state_updated_at, last_intent, last_order_id, context, updated_at)
values
($1, $2, 'IDLE', now(), 'other', null, '{}'::jsonb, now())
on conflict (tenant_id, wa_chat_id)
do update set
updated_at = now()
returning tenant_id, wa_chat_id, state, last_intent, context, updated_at
`;
const { rows } = await pool.query(q, [tenant_id, wa_chat_id]);
return rows[0] || null;
}
export async function insertMessage({
tenant_id,
wa_chat_id,
provider,
message_id,
direction, // in|out
text = null,
payload = {},
run_id = null,
ts = null,
}) {
const q = `
insert into wa_messages
(tenant_id, wa_chat_id, provider, message_id, direction, ts, text, payload, run_id)
values
($1, $2, $3, $4, $5, coalesce($6::timestamptz, now()), $7, $8::jsonb, $9)
on conflict (tenant_id, provider, message_id) do nothing
returning id
`;
const { rows } = await pool.query(q, [
tenant_id,
wa_chat_id,
provider,
message_id,
direction,
ts,
text,
JSON.stringify(payload ?? {}),
run_id,
]);
return rows[0]?.id || null;
}
export async function insertRun({
tenant_id,
wa_chat_id,
message_id,
prev_state = null,
user_text = null,
llm_output = null,
tools = [],
invariants = {},
final_reply = null,
order_id = null,
payment_link = null,
status = "ok",
error_code = null,
error_detail = null,
latency_ms = null,
}) {
const q = `
insert into conversation_runs
(tenant_id, wa_chat_id, message_id, ts, prev_state, user_text, llm_output, tools, invariants,
final_reply, order_id, payment_link, status, error_code, error_detail, latency_ms)
values
($1, $2, $3, now(), $4, $5, $6::jsonb, $7::jsonb, $8::jsonb,
$9, $10, $11, $12, $13, $14, $15)
on conflict (tenant_id, message_id) do nothing
returning id
`;
const { rows } = await pool.query(q, [
tenant_id,
wa_chat_id,
message_id,
prev_state,
user_text,
llm_output ? JSON.stringify(llm_output) : null,
JSON.stringify(tools ?? []),
JSON.stringify(invariants ?? {}),
final_reply,
order_id,
payment_link,
status,
error_code,
error_detail,
latency_ms,
]);
return rows[0]?.id || null;
}
export async function updateRunLatency({ tenant_id, run_id, latency_ms }) {
if (!tenant_id || !run_id) return false;
const q = `
update conversation_runs
set latency_ms = $3
where tenant_id = $1 and id = $2
`;
await pool.query(q, [tenant_id, run_id, latency_ms]);
return true;
}
export async function listConversations({ tenant_id, q = "", status = "", state = "", limit = 50 }) {
const params = [tenant_id];
let where = `where tenant_id=$1`;
if (q) {
params.push(`%${q}%`);
where += ` and wa_chat_id ilike $${params.length}`;
}
if (status) {
// status derivado no implementado en MVP
}
if (state) {
params.push(state);
where += ` and state = $${params.length}`;
}
const qsql = `
select tenant_id, wa_chat_id,
state,
coalesce(last_intent,'other') as intent,
updated_at as last_activity
from wa_conversation_state
${where}
order by updated_at desc
limit ${Math.max(1, Math.min(200, limit))}
`;
const { rows } = await pool.query(qsql, params);
return rows.map((r) => ({
chat_id: r.wa_chat_id,
from: r.wa_chat_id.replace(/^sim:/, ""),
state: r.state,
intent: r.intent,
status: "ok",
last_activity: r.last_activity,
last_run_id: null,
}));
}
export async function listRuns({ tenant_id, wa_chat_id = null, limit = 50 }) {
const params = [tenant_id];
let where = `where tenant_id=$1`;
if (wa_chat_id) {
params.push(wa_chat_id);
where += ` and wa_chat_id=$${params.length}`;
}
const q = `
select id as run_id, ts, wa_chat_id as chat_id,
status, prev_state, user_text,
llm_output, tools, invariants,
final_reply, order_id, payment_link, latency_ms
from conversation_runs
${where}
order by ts desc
limit ${Math.max(1, Math.min(200, limit))}
`;
const { rows } = await pool.query(q, params);
return rows.map((r) => ({
run_id: r.run_id,
ts: r.ts,
chat_id: r.chat_id,
from: r.chat_id.replace(/^sim:/, ""),
status: r.status,
prev_state: r.prev_state,
input: { text: r.user_text },
llm_output: r.llm_output,
tools: r.tools,
invariants: r.invariants,
final_reply: r.final_reply,
order_id: r.order_id,
payment_link: r.payment_link,
latency_ms: r.latency_ms,
}));
}
export async function getRunById({ tenant_id, run_id }) {
const q = `
select id as run_id, ts, wa_chat_id as chat_id,
status, prev_state, user_text,
llm_output, tools, invariants,
final_reply, order_id, payment_link, latency_ms
from conversation_runs
where tenant_id=$1 and id=$2
`;
const { rows } = await pool.query(q, [tenant_id, run_id]);
const r = rows[0];
if (!r) return null;
return {
run_id: r.run_id,
ts: r.ts,
chat_id: r.chat_id,
from: r.chat_id.replace(/^sim:/, ""),
status: r.status,
prev_state: r.prev_state,
input: { text: r.user_text },
llm_output: r.llm_output,
tools: r.tools,
invariants: r.invariants,
final_reply: r.final_reply,
order_id: r.order_id,
payment_link: r.payment_link,
latency_ms: r.latency_ms,
};
}
export async function getRecentMessagesForLLM({
tenant_id,
wa_chat_id,
limit = 20,
maxCharsPerMessage = 800,
}) {
const lim = Math.max(1, Math.min(50, parseInt(limit, 10) || 20));
const q = `
select direction, ts, text
from wa_messages
where tenant_id=$1
and wa_chat_id=$2
and text is not null
and length(trim(text)) > 0
order by ts desc
limit $3
`;
const { rows } = await pool.query(q, [tenant_id, wa_chat_id, lim]);
return rows.reverse().map((r) => ({
role: r.direction === "in" ? "user" : "assistant",
content: String(r.text).trim().slice(0, maxCharsPerMessage),
}));
}
export async function listMessages({ tenant_id, wa_chat_id, limit = 200 }) {
const lim = Math.max(1, Math.min(500, parseInt(limit, 10) || 200));
const q = `
select provider, message_id, direction, ts, text, payload, run_id
from wa_messages
where tenant_id=$1 and wa_chat_id=$2
order by ts asc
limit $3
`;
const { rows } = await pool.query(q, [tenant_id, wa_chat_id, lim]);
return rows.map((r) => ({
provider: r.provider,
message_id: r.message_id,
direction: r.direction,
ts: r.ts,
text: r.text,
payload: r.payload,
run_id: r.run_id,
}));
}
export async function deleteConversationData({ tenant_id, wa_chat_id }) {
const client = await pool.connect();
try {
await client.query("BEGIN");
const r1 = await client.query(`delete from wa_messages where tenant_id=$1 and wa_chat_id=$2`, [
tenant_id,
wa_chat_id,
]);
const r2 = await client.query(`delete from conversation_runs where tenant_id=$1 and wa_chat_id=$2`, [
tenant_id,
wa_chat_id,
]);
const r3 = await client.query(`delete from wa_conversation_state where tenant_id=$1 and wa_chat_id=$2`, [
tenant_id,
wa_chat_id,
]);
await client.query("COMMIT");
return { ok: true, deleted: { messages: r1.rowCount, runs: r2.rowCount, state: r3.rowCount } };
} catch (e) {
await client.query("ROLLBACK");
throw e;
} finally {
client.release();
}
}
export async function listUsers({ tenant_id, q = "", limit = 200 }) {
const lim = Math.max(1, Math.min(500, parseInt(limit, 10) || 200));
const qstr = String(q || "").trim();
// Lista de “usuarios” = conversaciones existentes (wa_conversation_state), con join al mapping Woo si existe.
// Esto permite ver usuarios aunque nunca se haya creado el customer en Woo.
const sql = `
select *
from (
select s.wa_chat_id,
'woo' as provider,
m.external_customer_id,
lastmsg.ts as last_ts,
nullif(coalesce(lastmsg.payload #>> '{raw,meta,pushName}', lastmsg.payload #>> '{raw,meta,pushname}', ''), '') as push_name
from wa_conversation_state s
left join wa_identity_map m
on m.tenant_id = s.tenant_id
and m.wa_chat_id = s.wa_chat_id
and m.provider = 'woo'
left join lateral (
select ts, payload
from wa_messages
where tenant_id = s.tenant_id
and wa_chat_id = s.wa_chat_id
and direction = 'in'
order by ts desc
limit 1
) lastmsg on true
where s.tenant_id = $1
) t
where ($2 = '' or t.wa_chat_id ilike $3 or coalesce(t.push_name,'') ilike $3)
order by coalesce(t.last_ts, now()) desc
limit $4
`;
const like = qstr ? `%${qstr}%` : "";
const { rows } = await pool.query(sql, [tenant_id, qstr, like, lim]);
return rows.map((r) => ({
chat_id: r.wa_chat_id,
provider: r.provider,
external_customer_id: r.external_customer_id,
push_name: r.push_name || null,
last_ts: r.last_ts || null,
}));
}
export async function getLastInboundMessage({ tenant_id, wa_chat_id }) {
const q = `
select provider, message_id, ts, text, payload
from wa_messages
where tenant_id=$1 and wa_chat_id=$2 and direction='in'
order by ts desc
limit 1
`;
const { rows } = await pool.query(q, [tenant_id, wa_chat_id]);
return rows[0] || null;
}
export async function cleanupLastRunForRetry({ tenant_id, wa_chat_id }) {
const client = await pool.connect();
try {
await client.query("BEGIN");
const { rows } = await client.query(
`
select id
from conversation_runs
where tenant_id=$1 and wa_chat_id=$2
order by ts desc
limit 1
`,
[tenant_id, wa_chat_id]
);
const run_id = rows[0]?.id || null;
if (!run_id) {
await client.query("COMMIT");
return { ok: true, run_id: null, deleted_out_messages: 0, deleted_runs: 0 };
}
const r1 = await client.query(
`delete from wa_messages where tenant_id=$1 and wa_chat_id=$2 and run_id=$3 and direction='out'`,
[tenant_id, wa_chat_id, run_id]
);
const r2 = await client.query(`delete from conversation_runs where tenant_id=$1 and id=$2`, [tenant_id, run_id]);
await client.query("COMMIT");
return { ok: true, run_id, deleted_out_messages: r1.rowCount || 0, deleted_runs: r2.rowCount || 0 };
} catch (e) {
await client.query("ROLLBACK");
throw e;
} finally {
client.release();
}
}
export async function getIdentityMapByChat({ tenant_id, wa_chat_id, provider = "woo" }) {
const q = `
select tenant_id, wa_chat_id, provider, external_customer_id, created_at, updated_at
from wa_identity_map
where tenant_id=$1 and wa_chat_id=$2 and provider=$3
limit 1
`;
const { rows } = await pool.query(q, [tenant_id, wa_chat_id, provider]);
return rows[0] || null;
}
export async function deleteIdentityMapByChat({ tenant_id, wa_chat_id, provider = "woo" }) {
const q = `delete from wa_identity_map where tenant_id=$1 and wa_chat_id=$2 and provider=$3`;
const { rowCount } = await pool.query(q, [tenant_id, wa_chat_id, provider]);
return rowCount || 0;
}
export async function getTenantByKey(key) {
const { rows } = await pool.query(`select id, key, name from tenants where key=$1`, [key]);
return rows[0] || null;
}
export async function getTenantIdByChannel({ channel_type, channel_key }) {
const q = `
select tenant_id
from tenant_channels
where channel_type=$1 and channel_key=$2
`;
const { rows } = await pool.query(q, [channel_type, channel_key]);
return rows[0]?.tenant_id || null;
}
export async function getExternalCustomerIdByChat({ tenant_id, wa_chat_id, provider = "woo" }) {
const q = `
select external_customer_id
from wa_identity_map
where tenant_id=$1 and wa_chat_id=$2 and provider=$3
`;
const { rows } = await pool.query(q, [tenant_id, wa_chat_id, provider]);
return rows[0]?.external_customer_id || null;
}
export async function upsertExternalCustomerMap({
tenant_id,
wa_chat_id,
external_customer_id,
provider = "woo",
}) {
const q = `
insert into wa_identity_map (tenant_id, wa_chat_id, provider, external_customer_id, created_at, updated_at)
values ($1, $2, $3, $4, now(), now())
on conflict (tenant_id, wa_chat_id)
do update set external_customer_id = excluded.external_customer_id, updated_at = now()
returning external_customer_id
`;
const { rows } = await pool.query(q, [tenant_id, wa_chat_id, provider, external_customer_id]);
return rows[0]?.external_customer_id || null;
}
export async function getTenantEcommerceConfig({ tenant_id, provider = "woo" }) {
const q = `
select id, tenant_id, provider, base_url, credential_ref, api_version, timeout_ms,
enc_consumer_key, enc_consumer_secret, encryption_salt, enabled
from tenant_ecommerce_config
where tenant_id = $1 and provider = $2 and enabled = true
limit 1
`;
const { rows } = await pool.query(q, [tenant_id, provider]);
return rows[0] || null;
}
export async function getDecryptedTenantEcommerceConfig({
tenant_id,
provider = "woo",
encryption_key,
}) {
if (!encryption_key) {
throw new Error("encryption_key is required to decrypt ecommerce credentials");
}
const q = `
select id, tenant_id, provider, base_url, credential_ref, api_version, timeout_ms, enabled,
pgp_sym_decrypt(enc_consumer_key, $3)::text as consumer_key,
pgp_sym_decrypt(enc_consumer_secret, $3)::text as consumer_secret
from tenant_ecommerce_config
where tenant_id = $1 and provider = $2 and enabled = true
limit 1
`;
const { rows } = await pool.query(q, [tenant_id, provider, encryption_key]);
return rows[0] || null;
}
export async function searchProductAliases({ tenant_id, q = "", limit = 20 }) {
const lim = Math.max(1, Math.min(200, parseInt(limit, 10) || 20));
const query = String(q || "").trim();
if (!query) return [];
const normalized = query.toLowerCase();
const like = `%${query}%`;
const nlike = `%${normalized}%`;
const sql = `
select tenant_id, alias, normalized_alias, woo_product_id, category_hint, boost, metadata, updated_at
from product_aliases
where tenant_id=$1
and (alias ilike $2 or normalized_alias ilike $3)
order by boost desc, updated_at desc
limit $4
`;
const { rows } = await pool.query(sql, [tenant_id, like, nlike, lim]);
return rows.map((r) => ({
tenant_id: r.tenant_id,
alias: r.alias,
normalized_alias: r.normalized_alias,
woo_product_id: r.woo_product_id,
category_hint: r.category_hint,
boost: r.boost,
metadata: r.metadata,
updated_at: r.updated_at,
}));
}
export async function getProductEmbedding({ tenant_id, content_hash }) {
const sql = `
select tenant_id, content_hash, content_text, embedding, model, updated_at
from product_embeddings_cache
where tenant_id=$1 and content_hash=$2
limit 1
`;
const { rows } = await pool.query(sql, [tenant_id, content_hash]);
return rows[0] || null;
}
export async function upsertProductEmbedding({
tenant_id,
content_hash,
content_text,
embedding,
model,
}) {
const sql = `
insert into product_embeddings_cache
(tenant_id, content_hash, content_text, embedding, model, updated_at)
values
($1, $2, $3, $4::jsonb, $5, now())
on conflict (tenant_id, content_hash)
do update set
content_text = excluded.content_text,
embedding = excluded.embedding,
model = excluded.model,
updated_at = now()
returning tenant_id, content_hash, content_text, embedding, model, updated_at
`;
const { rows } = await pool.query(sql, [
tenant_id,
content_hash,
content_text,
JSON.stringify(embedding ?? []),
model,
]);
return rows[0] || null;
}
export async function upsertMpPayment({
tenant_id,
woo_order_id = null,
preference_id = null,
payment_id = null,
status = null,
paid_at = null,
raw = {},
}) {
if (!payment_id) throw new Error("payment_id_required");
const sql = `
insert into mp_payments
(tenant_id, woo_order_id, preference_id, payment_id, status, paid_at, raw, created_at, updated_at)
values
($1, $2, $3, $4, $5, $6::timestamptz, $7::jsonb, now(), now())
on conflict (tenant_id, payment_id)
do update set
woo_order_id = excluded.woo_order_id,
preference_id = excluded.preference_id,
status = excluded.status,
paid_at = excluded.paid_at,
raw = excluded.raw,
updated_at = now()
returning tenant_id, woo_order_id, preference_id, payment_id, status, paid_at, raw, updated_at
`;
const { rows } = await pool.query(sql, [
tenant_id,
woo_order_id,
preference_id,
payment_id,
status,
paid_at,
JSON.stringify(raw ?? {}),
]);
return rows[0] || null;
}
export async function getMpPaymentById({ tenant_id, payment_id }) {
const sql = `
select tenant_id, woo_order_id, preference_id, payment_id, status, paid_at, raw, updated_at
from mp_payments
where tenant_id=$1 and payment_id=$2
limit 1
`;
const { rows } = await pool.query(sql, [tenant_id, payment_id]);
return rows[0] || null;
}

View File

@@ -0,0 +1,9 @@
import express from "express";
import { makeWooProductWebhook } from "../controllers/wooWebhooks.js";
export function createWooWebhooksRouter() {
const router = express.Router();
router.post("/webhook/woo/products", makeWooProductWebhook());
return router;
}

View File

@@ -0,0 +1,437 @@
import crypto from "crypto";
import {
getConversationState,
insertMessage,
insertRun,
touchConversationState,
upsertConversationState,
getRecentMessagesForLLM,
getExternalCustomerIdByChat,
upsertExternalCustomerMap,
updateRunLatency,
getTenantByKey,
getTenantIdByChannel,
} from "../db/repo.js";
import { sseSend } from "../../shared/sse.js";
import { createWooCustomer, getWooCustomerById } from "./woo.js";
import { debug as dbg } from "../../shared/debug.js";
import { runTurnV3 } from "../../3-turn-engine/turnEngineV3.js";
import { safeNextState } from "../../3-turn-engine/fsm.js";
import { createOrder, updateOrder } from "../../4-woo-orders/wooOrders.js";
import { createPreference } from "../../6-mercadopago/mercadoPago.js";
function nowIso() {
return new Date().toISOString();
}
function newId(prefix = "run") {
return `${prefix}_${crypto.randomUUID()}`;
}
function makePerf() {
const started_at = Date.now();
const perf = { t0: started_at, marks: {} };
const mark = (name) => {
perf.marks[name] = Date.now();
};
const msBetween = (a, b) => {
const ta = a === "t0" ? perf.t0 : perf.marks[a];
const tb = b === "t0" ? perf.t0 : perf.marks[b];
if (!ta || !tb) return null;
return tb - ta;
};
return { started_at, perf, mark, msBetween };
}
function logStage(enabled, stage, payload) {
if (!enabled) return;
console.log(`[pipeline] ${stage}`, payload);
}
function collapseAssistantMessages(messages) {
const out = [];
for (const m of messages || []) {
const last = out[out.length - 1];
if (last && last.role === "assistant" && m.role === "assistant") continue;
out.push(m);
}
return out;
}
async function ensureWooCustomerId({
tenantId,
chat_id,
displayName,
from,
externalCustomerId,
run_id,
}) {
let updatedId = externalCustomerId;
let error = null;
try {
if (updatedId) {
const found = await getWooCustomerById({ tenantId, id: updatedId });
if (!found) {
const phone = chat_id.replace(/@.+$/, "");
const name = displayName || from || phone;
const created = await createWooCustomer({ tenantId, wa_chat_id: chat_id, phone, name });
if (!created?.id) throw new Error("woo_customer_id_missing");
updatedId = await upsertExternalCustomerMap({
tenant_id: tenantId,
wa_chat_id: chat_id,
external_customer_id: created?.id,
provider: "woo",
});
} else {
updatedId = await upsertExternalCustomerMap({
tenant_id: tenantId,
wa_chat_id: chat_id,
external_customer_id: updatedId,
provider: "woo",
});
}
} else {
const phone = chat_id.replace(/@.+$/, "");
const name = displayName || from || phone;
const created = await createWooCustomer({ tenantId, wa_chat_id: chat_id, phone, name });
if (!created?.id) throw new Error("woo_customer_id_missing");
updatedId = await upsertExternalCustomerMap({
tenant_id: tenantId,
wa_chat_id: chat_id,
external_customer_id: created?.id,
provider: "woo",
});
}
} catch (e) {
error = {
message: String(e?.message || e),
status: e?.status || e?.cause?.status || null,
code: e?.body?.code || e?.cause?.body?.code || null,
run_id: run_id || null,
};
}
return { external_customer_id: updatedId, error };
}
export async function processMessage({
tenantId,
chat_id,
from,
text,
provider,
message_id,
displayName = null,
meta = null,
}) {
const { started_at, mark, msBetween } = makePerf();
await touchConversationState({ tenant_id: tenantId, wa_chat_id: chat_id });
mark("start");
const stageDebug = dbg.perf;
const prev = await getConversationState(tenantId, chat_id);
mark("after_getConversationState");
const isStale =
prev?.state_updated_at &&
Date.now() - new Date(prev.state_updated_at).getTime() > 24 * 60 * 60 * 1000;
const prev_state = isStale ? "IDLE" : prev?.state || "IDLE";
let externalCustomerId = await getExternalCustomerIdByChat({
tenant_id: tenantId,
wa_chat_id: chat_id,
provider: "woo",
});
mark("after_getExternalCustomerIdByChat");
await insertMessage({
tenant_id: tenantId,
wa_chat_id: chat_id,
provider,
message_id,
direction: "in",
text,
payload: { raw: { from, text, meta } },
run_id: null,
});
mark("after_insertMessage_in");
mark("before_getRecentMessagesForLLM_for_plan");
const history = await getRecentMessagesForLLM({
tenant_id: tenantId,
wa_chat_id: chat_id,
limit: 20,
});
const conversation_history = collapseAssistantMessages(history);
mark("after_getRecentMessagesForLLM_for_plan");
logStage(stageDebug, "history", { has_history: Array.isArray(conversation_history), state: prev_state });
let reducedContext = prev?.context && typeof prev?.context === "object" ? { ...prev.context } : {};
let decision;
let plan;
let llmMeta;
let tools = [];
mark("before_turn_v3");
const out = await runTurnV3({
tenantId,
chat_id,
text,
prev_state,
prev_context: reducedContext,
conversation_history,
});
plan = out.plan;
decision = out.decision || { context_patch: {}, actions: [], audit: {} };
llmMeta = { kind: "nlu_v3", audit: decision.audit || null };
tools = [];
mark("after_turn_v3");
const runStatus = llmMeta?.error ? "warn" : "ok";
const isSimulated = provider === "sim" || meta?.source === "sim";
const invariants = {
ok: true,
checks: [
{ name: "required_keys_present", ok: true },
{ name: "no_checkout_without_payment_link", ok: true },
{ name: "no_order_action_without_items", ok: true },
],
};
mark("before_insertRun");
// --- Ejecutar acciones determinísticas ---
let actionPatch = {};
if (Array.isArray(decision?.actions) && decision.actions.length) {
const newTools = [];
const actions = decision.actions;
const calcOrderTotal = (order) => {
const rawTotal = Number(order?.raw?.total);
if (Number.isFinite(rawTotal) && rawTotal > 0) return rawTotal;
const items = Array.isArray(order?.line_items) ? order.line_items : [];
let sum = 0;
for (const it of items) {
const t = Number(it?.total);
if (Number.isFinite(t)) sum += t;
}
return sum > 0 ? sum : null;
};
const needsWoo = actions.some((a) => a.type === "create_order" || a.type === "update_order");
if (needsWoo) {
const ensured = await ensureWooCustomerId({
tenantId,
chat_id,
displayName,
from,
externalCustomerId,
});
externalCustomerId = ensured.external_customer_id;
if (ensured.error) {
newTools.push({ type: "ensure_woo_customer", ok: false, error: ensured.error });
} else {
newTools.push({ type: "ensure_woo_customer", ok: true, external_customer_id: externalCustomerId });
}
}
for (const act of actions) {
try {
if (act.type === "create_order") {
const order = await createOrder({
tenantId,
wooCustomerId: externalCustomerId,
basket: reducedContext?.order_basket || plan?.basket_resolved || { items: [] },
address: reducedContext?.delivery_address || reducedContext?.address || null,
run_id: null,
});
actionPatch.woo_order_id = order?.id || null;
actionPatch.order_total = calcOrderTotal(order);
newTools.push({ type: "create_order", ok: true, order_id: order?.id || null });
} else if (act.type === "update_order") {
const order = await updateOrder({
tenantId,
wooOrderId: reducedContext?.woo_order_id || prev?.last_order_id || null,
basket: reducedContext?.order_basket || plan?.basket_resolved || { items: [] },
address: reducedContext?.delivery_address || reducedContext?.address || null,
run_id: null,
});
actionPatch.woo_order_id = order?.id || null;
actionPatch.order_total = calcOrderTotal(order);
newTools.push({ type: "update_order", ok: true, order_id: order?.id || null });
} else if (act.type === "send_payment_link") {
const total = Number(actionPatch?.order_total || reducedContext?.order_total || 0) || null;
if (!total || total <= 0) {
throw new Error("order_total_missing");
}
const pref = await createPreference({
tenantId,
wooOrderId: actionPatch.woo_order_id || reducedContext?.woo_order_id || prev?.last_order_id,
amount: total || 0,
});
actionPatch.payment_link = pref?.init_point || null;
actionPatch.mp = {
preference_id: pref?.preference_id || null,
init_point: pref?.init_point || null,
};
newTools.push({ type: "send_payment_link", ok: true, preference_id: pref?.preference_id || null });
if (pref?.init_point) {
plan.reply = `Listo, acá tenés el link de pago: ${pref.init_point}\nAvisame cuando esté listo.`;
}
}
} catch (e) {
newTools.push({ type: act.type, ok: false, error: String(e?.message || e) });
}
}
tools = newTools;
}
const run_id = await insertRun({
tenant_id: tenantId,
wa_chat_id: chat_id,
message_id: `${provider}:${message_id}`,
prev_state,
user_text: text,
llm_output: { ...plan, _llm: llmMeta },
tools,
invariants,
final_reply: plan.reply,
status: runStatus,
latency_ms: null,
});
mark("after_insertRun");
const outMessageId = newId("out");
await insertMessage({
tenant_id: tenantId,
wa_chat_id: chat_id,
provider,
message_id: outMessageId,
direction: "out",
text: plan.reply,
payload: { reply: plan.reply, railguard: { simulated: isSimulated, source: meta?.source || null } },
run_id,
});
mark("after_insertMessage_out");
if (llmMeta?.error) {
const errMsgId = newId("err");
await insertMessage({
tenant_id: tenantId,
wa_chat_id: chat_id,
provider: "system",
message_id: errMsgId,
direction: "out",
text: `[ERROR] openai: ${llmMeta.error}`,
payload: { error: { source: "openai", ...llmMeta }, railguard: { simulated: isSimulated, source: meta?.source || null } },
run_id,
});
}
let wooCustomerError = null;
if (tools.some((t) => t.type === "ensure_woo_customer" && !t.ok)) {
wooCustomerError = tools.find((t) => t.type === "ensure_woo_customer" && !t.ok)?.error || null;
}
const context = {
...(reducedContext || {}),
...(decision?.context_patch || {}),
...(actionPatch || {}),
missing_fields: plan.missing_fields || [],
basket_resolved: (reducedContext?.order_basket?.items?.length ? reducedContext.order_basket : plan.basket_resolved) || { items: [] },
external_customer_id: externalCustomerId ?? prev?.context?.external_customer_id ?? null,
railguard: { simulated: isSimulated, source: meta?.source || null },
woo_customer_error: wooCustomerError,
};
const nextState = safeNextState(prev_state, context, { requested_checkout: plan.intent === "checkout" }).next_state;
plan.next_state = nextState;
const stateRow = await upsertConversationState({
tenant_id: tenantId,
wa_chat_id: chat_id,
state: nextState,
last_intent: plan.intent,
last_order_id: null,
context,
});
mark("after_upsertConversationState");
sseSend("conversation.upsert", {
chat_id: stateRow.wa_chat_id,
from: stateRow.wa_chat_id.replace(/^sim:/, ""),
state: stateRow.state,
intent: stateRow.last_intent || "other",
status: runStatus,
last_activity: stateRow.updated_at,
last_run_id: run_id,
});
const end_to_end_ms = Date.now() - started_at;
if (run_id) {
await updateRunLatency({ tenant_id: tenantId, run_id, latency_ms: end_to_end_ms });
}
sseSend("run.created", {
run_id,
ts: nowIso(),
chat_id,
from,
status: runStatus,
prev_state,
input: { text },
llm_output: { ...plan, _llm: llmMeta },
tools,
invariants,
final_reply: plan.reply,
order_id: actionPatch.woo_order_id || null,
payment_link: actionPatch.payment_link || null,
latency_ms: end_to_end_ms,
});
console.log("[perf] processMessage", {
tenantId,
chat_id,
provider,
message_id,
run_id,
end_to_end_ms,
ms: {
db_state_ms: msBetween("start", "after_getConversationState"),
db_identity_ms: msBetween("after_getConversationState", "after_getExternalCustomerIdByChat"),
insert_in_ms: msBetween("after_getExternalCustomerIdByChat", "after_insertMessage_in"),
history_for_plan_ms: msBetween("after_insertMessage_in", "after_getRecentMessagesForLLM_for_plan"),
insert_run_ms: msBetween("before_insertRun", "after_insertRun"),
insert_out_ms: msBetween("after_insertRun", "after_insertMessage_out"),
upsert_state_ms: msBetween("after_insertMessage_out", "after_upsertConversationState"),
},
});
return { run_id, reply: plan.reply };
}
function parseTenantFromChatId(chat_id) {
const m = /^([a-z0-9_-]+):/.exec(chat_id);
return m?.[1]?.toLowerCase() || null;
}
export async function resolveTenantId({ chat_id, to_phone = null, tenant_key = null }) {
const explicit = (tenant_key || parseTenantFromChatId(chat_id) || "").toLowerCase();
if (explicit) {
const t = await getTenantByKey(explicit);
if (t) return t.id;
throw new Error(`tenant_not_found: ${explicit}`);
}
if (to_phone) {
const id = await getTenantIdByChannel({ channel_type: "whatsapp", channel_key: to_phone });
if (id) return id;
}
const fallbackKey = (process.env.TENANT_KEY || "piaf").toLowerCase();
const t = await getTenantByKey(fallbackKey);
if (t) return t.id;
throw new Error(`tenant_not_found: ${fallbackKey}`);
}

View File

@@ -0,0 +1,433 @@
import crypto from "crypto";
import { getDecryptedTenantEcommerceConfig } from "../db/repo.js";
import { debug } from "../../shared/debug.js";
// --- Simple in-memory lock to serialize work per key (e.g. wa_chat_id) ---
const locks = new Map();
async function withLock(key, fn) {
const prev = locks.get(key) || Promise.resolve();
let release;
const next = new Promise((r) => (release = r));
locks.set(key, prev.then(() => next));
const queuedAt = Date.now();
await prev;
const acquiredAt = Date.now();
try {
return await fn({ lock_wait_ms: acquiredAt - queuedAt });
} finally {
release();
// cleanup
if (locks.get(key) === next) locks.delete(key);
}
}
function sleep(ms) {
return new Promise((r) => setTimeout(r, ms));
}
function getDeep(obj, path) {
let cur = obj;
for (const k of path) {
cur = cur?.[k];
}
return cur;
}
function isRetryableNetworkError(err) {
// fetchWoo wraps errors as { message: "...", cause: originalError }
const e0 = err;
const e1 = err?.cause;
const e2 = getDeep(err, ["cause", "cause"]);
const candidates = [e0, e1, e2].filter(Boolean);
const codes = new Set(candidates.map((e) => e.code).filter(Boolean));
const names = new Set(candidates.map((e) => e.name).filter(Boolean));
const messages = candidates.map((e) => String(e.message || "")).join(" | ").toLowerCase();
const aborted =
names.has("AbortError") ||
messages.includes("aborted") ||
messages.includes("timeout") ||
messages.includes("timed out");
// ECONNRESET/ETIMEDOUT are common; undici may also surface UND_ERR_* codes
const retryCodes = new Set(["ECONNRESET", "ETIMEDOUT", "UND_ERR_CONNECT_TIMEOUT", "UND_ERR_SOCKET"]);
const byCode = [...codes].some((c) => retryCodes.has(c));
return aborted || byCode;
}
async function fetchWoo({ url, method = "GET", body = null, timeout = 20000, headers = {} }) {
const controller = new AbortController();
const timer = setTimeout(() => controller.abort(new Error("timeout")), timeout);
const t0 = Date.now();
try {
const res = await fetch(url, {
method,
headers: {
"Content-Type": "application/json",
...headers,
},
body: body ? JSON.stringify(body) : undefined,
signal: controller.signal,
});
if (debug.wooHttp) console.log("woo headers in", Date.now() - t0, "ms", res.status);
const text = await res.text();
if (debug.wooHttp) console.log("woo body in", Date.now() - t0, "ms", "len", text.length);
let parsed;
try {
parsed = text ? JSON.parse(text) : null;
} catch {
parsed = text;
}
if (!res.ok) {
const err = new Error(`Woo HTTP ${res.status}`);
err.status = res.status;
err.body = parsed;
err.url = redactWooUrl(url);
err.method = method;
throw err;
}
return parsed;
} catch (e) {
const err = new Error(`Woo request failed after ${Date.now() - t0}ms: ${e.message}`);
err.cause = e;
// Propagar status/body para que el caller pueda decidir retries/auth fallback
err.status = e?.status || null;
err.body = e?.body || null;
err.url = redactWooUrl(url);
err.method = method;
throw err;
} finally {
clearTimeout(timer);
}
}
function redactWooUrl(url) {
try {
const u = new URL(url);
if (u.searchParams.has("consumer_key")) u.searchParams.set("consumer_key", "REDACTED");
if (u.searchParams.has("consumer_secret")) u.searchParams.set("consumer_secret", "REDACTED");
return u.toString();
} catch {
return url;
}
}
async function searchWooCustomerByEmail({ base, consumerKey, consumerSecret, email, timeout }) {
const url = `${base}/customers?email=${encodeURIComponent(email)}`;
const auth = Buffer.from(`${consumerKey}:${consumerSecret}`).toString("base64");
const data = await fetchWoo({
url,
method: "GET",
timeout,
headers: { Authorization: `Basic ${auth}` },
});
if (Array.isArray(data) && data.length > 0) return data[0];
return null;
}
async function searchWooCustomerByUsername({ base, consumerKey, consumerSecret, username, timeout }) {
const url = `${base}/customers?search=${encodeURIComponent(username)}`;
const auth = Buffer.from(`${consumerKey}:${consumerSecret}`).toString("base64");
const data = await fetchWoo({
url,
method: "GET",
timeout,
headers: { Authorization: `Basic ${auth}` },
});
if (Array.isArray(data) && data.length > 0) {
const exact = data.find((c) => c.username === username);
return exact || data[0];
}
return null;
}
export async function createWooCustomer({ tenantId, wa_chat_id, phone, name }) {
const lockKey = `${tenantId}:${wa_chat_id}`;
const t0 = Date.now();
return withLock(lockKey, async ({ lock_wait_ms }) => {
const encryptionKey = process.env.APP_ENCRYPTION_KEY;
if (!encryptionKey) throw new Error("APP_ENCRYPTION_KEY is required to decrypt Woo credentials");
const cfg = await getDecryptedTenantEcommerceConfig({
tenant_id: tenantId,
provider: "woo",
encryption_key: encryptionKey,
});
if (!cfg) throw new Error("Woo config not found for tenant");
const consumerKey =
cfg.consumer_key ||
process.env.WOO_CONSUMER_KEY ||
(() => {
throw new Error("consumer_key not set");
})();
const consumerSecret =
cfg.consumer_secret ||
process.env.WOO_CONSUMER_SECRET ||
(() => {
throw new Error("consumer_secret not set");
})();
const base = cfg.base_url.replace(/\/+$/, "");
const email = `${phone || wa_chat_id}@no-email.local`;
const username = wa_chat_id;
const timeout = Math.max(cfg.timeout_ms ?? 20000, 20000);
const getTimeout = timeout;
const postTimeout = 2000;
// Para existencia/idempotencia: email primero (más estable y liviano que search=...)
const existing = await searchWooCustomerByEmail({
base,
consumerKey,
consumerSecret,
email,
timeout: getTimeout,
});
if (existing?.id) {
const total_ms = Date.now() - t0;
console.log("[perf] woo.createCustomer", {
tenantId,
wa_chat_id,
reused: true,
reason: "email_hit",
lock_wait_ms,
total_ms,
});
return { id: existing.id, raw: existing, reused: true };
}
// Fallback (solo por compatibilidad): si alguien creó el customer con username pero email distinto
const existingByUser = await searchWooCustomerByUsername({
base,
consumerKey,
consumerSecret,
username,
timeout: getTimeout,
});
if (existingByUser?.id) {
const total_ms = Date.now() - t0;
console.log("[perf] woo.createCustomer", {
tenantId,
wa_chat_id,
reused: true,
reason: "username_hit",
lock_wait_ms,
total_ms,
});
return { id: existingByUser.id, raw: existingByUser, reused: true };
}
const url = `${base}/customers`;
const auth = Buffer.from(`${consumerKey}:${consumerSecret}`).toString("base64");
const payload = {
email,
first_name: name || phone || wa_chat_id,
username,
password: crypto.randomBytes(12).toString("base64url"), // requerido por Woo
billing: {
phone: phone || wa_chat_id,
},
};
const doPost = async () =>
await fetchWoo({
url,
method: "POST",
body: payload,
timeout: postTimeout,
headers: { Authorization: `Basic ${auth}` },
});
try {
const data = await doPost();
const total_ms = Date.now() - t0;
console.log("[perf] woo.createCustomer", {
tenantId,
wa_chat_id,
reused: false,
reason: "post_ok",
lock_wait_ms,
total_ms,
});
return { id: data?.id, raw: data, reused: false };
} catch (err) {
// Caso estándar: Woo dice "email exists" => recuperar por email y devolver
if (err.status === 400 && err.body?.code === "registration-error-email-exists") {
const found = await searchWooCustomerByEmail({
base,
consumerKey,
consumerSecret,
email,
timeout: getTimeout,
});
if (found?.id) {
const total_ms = Date.now() - t0;
console.log("[perf] woo.createCustomer", {
tenantId,
wa_chat_id,
reused: true,
reason: "email_exists_recovered",
lock_wait_ms,
total_ms,
});
return { id: found.id, raw: found, reused: true };
}
}
// Retry seguro solo para timeouts / ECONNRESET:
// POST pudo haber creado el customer pero la respuesta no volvió => hacer GET por email.
if (isRetryableNetworkError(err)) {
await sleep(300 + Math.floor(Math.random() * 300));
const foundAfterTimeout = await searchWooCustomerByEmail({
base,
consumerKey,
consumerSecret,
email,
timeout: getTimeout,
});
if (foundAfterTimeout?.id) {
const total_ms = Date.now() - t0;
console.log("[perf] woo.createCustomer", {
tenantId,
wa_chat_id,
reused: true,
reason: "timeout_get_recovered",
lock_wait_ms,
total_ms,
});
return { id: foundAfterTimeout.id, raw: foundAfterTimeout, reused: true };
}
// si no existe, reintentar POST una vez
try {
const data2 = await doPost();
const total_ms = Date.now() - t0;
console.log("[perf] woo.createCustomer", {
tenantId,
wa_chat_id,
reused: false,
reason: "timeout_post_retry_ok",
lock_wait_ms,
total_ms,
});
return { id: data2?.id, raw: data2, reused: false };
} catch (err2) {
// último intento de "recovery" (por si 2do POST también timeouteó)
if (isRetryableNetworkError(err2)) {
const foundAfterTimeout2 = await searchWooCustomerByEmail({
base,
consumerKey,
consumerSecret,
email,
timeout: getTimeout,
});
if (foundAfterTimeout2?.id) {
const total_ms = Date.now() - t0;
console.log("[perf] woo.createCustomer", {
tenantId,
wa_chat_id,
reused: true,
reason: "timeout_post_retry_get_recovered",
lock_wait_ms,
total_ms,
});
return { id: foundAfterTimeout2.id, raw: foundAfterTimeout2, reused: true };
}
}
throw err2;
}
}
throw err;
}
});
}
export async function getWooCustomerById({ tenantId, id }) {
if (!id) return null;
const encryptionKey = process.env.APP_ENCRYPTION_KEY;
if (!encryptionKey) throw new Error("APP_ENCRYPTION_KEY is required to decrypt Woo credentials");
const cfg = await getDecryptedTenantEcommerceConfig({
tenant_id: tenantId,
provider: "woo",
encryption_key: encryptionKey,
});
if (!cfg) throw new Error("Woo config not found for tenant");
const consumerKey =
cfg.consumer_key ||
process.env.WOO_CONSUMER_KEY ||
(() => {
throw new Error("consumer_key not set");
})();
const consumerSecret =
cfg.consumer_secret ||
process.env.WOO_CONSUMER_SECRET ||
(() => {
throw new Error("consumer_secret not set");
})();
const base = cfg.base_url.replace(/\/+$/, "");
const url = `${base}/customers/${encodeURIComponent(id)}`;
const auth = Buffer.from(`${consumerKey}:${consumerSecret}`).toString("base64");
try {
const data = await fetchWoo({
url,
method: "GET",
timeout: cfg.timeout_ms,
headers: { Authorization: `Basic ${auth}` },
});
return data;
} catch (err) {
if (err.status === 404) return null;
throw err;
}
}
export async function deleteWooCustomer({ tenantId, id, force = true }) {
if (!id) return { ok: false, error: "missing_id" };
const encryptionKey = process.env.APP_ENCRYPTION_KEY;
if (!encryptionKey) throw new Error("APP_ENCRYPTION_KEY is required to decrypt Woo credentials");
const cfg = await getDecryptedTenantEcommerceConfig({
tenant_id: tenantId,
provider: "woo",
encryption_key: encryptionKey,
});
if (!cfg) throw new Error("Woo config not found for tenant");
const consumerKey =
cfg.consumer_key ||
process.env.WOO_CONSUMER_KEY ||
(() => {
throw new Error("consumer_key not set");
})();
const consumerSecret =
cfg.consumer_secret ||
process.env.WOO_CONSUMER_SECRET ||
(() => {
throw new Error("consumer_secret not set");
})();
const base = cfg.base_url.replace(/\/+$/, "");
const auth = Buffer.from(`${consumerKey}:${consumerSecret}`).toString("base64");
const timeout = Math.max(cfg.timeout_ms ?? 20000, 20000);
const url = `${base}/customers/${encodeURIComponent(id)}${force ? "?force=true" : ""}`;
const data = await fetchWoo({
url,
method: "DELETE",
timeout,
headers: { Authorization: `Basic ${auth}` },
});
return { ok: true, raw: data };
}