Files
botino/src/modules/2-identity/db/repo.js
2026-01-18 18:28:28 -03:00

784 lines
23 KiB
JavaScript

import { pool } from "../../shared/db/pool.js";
/**
 * Inserts a tenant by key, or renames the existing tenant that already has that key.
 *
 * @param {object} tenant
 * @param tenant.key - Unique tenant key (the conflict target).
 * @param tenant.name - Name stored on insert and overwritten on conflict.
 * @returns {Promise<*>} The `id` column of the inserted or updated row.
 */
export async function ensureTenant({ key, name }) {
  const sql = `
insert into tenants (key, name)
values ($1, $2)
on conflict (key) do update set name = excluded.name
returning id
`;
  const result = await pool.query(sql, [key, name]);
  const [tenant] = result.rows;
  return tenant.id;
}
/**
 * Reads the stored conversation state for one chat of a tenant.
 *
 * @param tenant_id - Tenant the chat belongs to.
 * @param wa_chat_id - Chat identifier.
 * @returns {Promise<object|null>} The first matching row, or null when there is none.
 */
export async function getConversationState(tenant_id, wa_chat_id) {
  const sql = `
select tenant_id, wa_chat_id, state, last_intent, last_order_id, context, state_updated_at
from wa_conversation_state
where tenant_id=$1 and wa_chat_id=$2
`;
  const result = await pool.query(sql, [tenant_id, wa_chat_id]);
  const [found] = result.rows;
  return found || null;
}
/**
 * Creates the conversation-state row for a chat, or overwrites it if it already exists.
 *
 * On conflict over (tenant_id, wa_chat_id) the state, intent, order id and context are
 * replaced with the supplied values and both timestamps are set to now().
 *
 * @param {object} args
 * @param args.tenant_id
 * @param args.wa_chat_id
 * @param args.state
 * @param [args.last_intent=null]
 * @param [args.last_order_id=null]
 * @param [args.context={}] - Serialized with JSON.stringify; null/undefined is stored as `{}`.
 * @returns {Promise<object|undefined>} The row returned by the statement.
 */
export async function upsertConversationState({
  tenant_id,
  wa_chat_id,
  state,
  last_intent = null,
  last_order_id = null,
  context = {},
}) {
  const sql = `
insert into wa_conversation_state
(tenant_id, wa_chat_id, state, state_updated_at, last_intent, last_order_id, context, updated_at)
values
($1, $2, $3, now(), $4, $5, $6::jsonb, now())
on conflict (tenant_id, wa_chat_id)
do update set
state = excluded.state,
state_updated_at = now(),
last_intent = excluded.last_intent,
last_order_id = excluded.last_order_id,
context = excluded.context,
updated_at = now()
returning tenant_id, wa_chat_id, state, last_intent, last_order_id, context, state_updated_at, updated_at
`;
  const contextJson = JSON.stringify(context ?? {});
  const values = [tenant_id, wa_chat_id, state, last_intent, last_order_id, contextJson];
  const result = await pool.query(sql, values);
  return result.rows[0];
}
/**
 * Creates the conversation row if it does not exist; if it does, only bumps `updated_at`
 * (state and context are left untouched).
 *
 * A newly created row starts with state 'IDLE', last_intent 'other' and an empty context.
 *
 * @param {object} args
 * @param args.tenant_id
 * @param args.wa_chat_id
 * @returns {Promise<object|null>} The inserted or touched row, or null if none was returned.
 */
export async function touchConversationState({ tenant_id, wa_chat_id }) {
  const sql = `
insert into wa_conversation_state
(tenant_id, wa_chat_id, state, state_updated_at, last_intent, last_order_id, context, updated_at)
values
($1, $2, 'IDLE', now(), 'other', null, '{}'::jsonb, now())
on conflict (tenant_id, wa_chat_id)
do update set
updated_at = now()
returning tenant_id, wa_chat_id, state, last_intent, last_order_id, context, state_updated_at, updated_at, created_at
`;
  const result = await pool.query(sql, [tenant_id, wa_chat_id]);
  const [touched] = result.rows;
  return touched || null;
}
/**
 * Stores a chat message; a message already stored for the same
 * (tenant_id, provider, message_id) is left as is.
 *
 * @param {object} args
 * @param args.tenant_id
 * @param args.wa_chat_id
 * @param args.provider
 * @param args.message_id
 * @param args.direction - "in" or "out".
 * @param [args.text=null]
 * @param [args.payload={}] - Serialized with JSON.stringify; null/undefined becomes `{}`.
 * @param [args.run_id=null]
 * @param [args.ts=null] - Message timestamp; the statement falls back to now() when null.
 * @returns {Promise<*>} The new row id, or null when nothing was inserted.
 */
export async function insertMessage({
  tenant_id,
  wa_chat_id,
  provider,
  message_id,
  direction, // in|out
  text = null,
  payload = {},
  run_id = null,
  ts = null,
}) {
  const sql = `
insert into wa_messages
(tenant_id, wa_chat_id, provider, message_id, direction, ts, text, payload, run_id)
values
($1, $2, $3, $4, $5, coalesce($6::timestamptz, now()), $7, $8::jsonb, $9)
on conflict (tenant_id, provider, message_id) do nothing
returning id
`;
  const payloadJson = JSON.stringify(payload ?? {});
  const values = [tenant_id, wa_chat_id, provider, message_id, direction, ts, text, payloadJson, run_id];
  const result = await pool.query(sql, values);
  const [inserted] = result.rows;
  return inserted?.id || null;
}
/**
 * Records one processing run for an inbound message; a run already stored for the same
 * (tenant_id, message_id) is left as is.
 *
 * `llm_output`, `tools` and `invariants` are sent as JSON text. A falsy `llm_output`
 * is stored as SQL null; null/undefined `tools` / `invariants` become `[]` / `{}`.
 *
 * @param {object} args - Run fields; everything except tenant_id, wa_chat_id and
 *   message_id has a default (see the signature).
 * @returns {Promise<*>} The new run id, or null when nothing was inserted.
 */
export async function insertRun({
  tenant_id,
  wa_chat_id,
  message_id,
  prev_state = null,
  user_text = null,
  llm_output = null,
  tools = [],
  invariants = {},
  final_reply = null,
  order_id = null,
  payment_link = null,
  status = "ok",
  error_code = null,
  error_detail = null,
  latency_ms = null,
}) {
  const sql = `
insert into conversation_runs
(tenant_id, wa_chat_id, message_id, ts, prev_state, user_text, llm_output, tools, invariants,
final_reply, order_id, payment_link, status, error_code, error_detail, latency_ms)
values
($1, $2, $3, now(), $4, $5, $6::jsonb, $7::jsonb, $8::jsonb,
$9, $10, $11, $12, $13, $14, $15)
on conflict (tenant_id, message_id) do nothing
returning id
`;
  let llmOutputJson = null;
  if (llm_output) {
    llmOutputJson = JSON.stringify(llm_output);
  }
  const toolsJson = JSON.stringify(tools ?? []);
  const invariantsJson = JSON.stringify(invariants ?? {});
  const values = [
    tenant_id,
    wa_chat_id,
    message_id,
    prev_state,
    user_text,
    llmOutputJson,
    toolsJson,
    invariantsJson,
    final_reply,
    order_id,
    payment_link,
    status,
    error_code,
    error_detail,
    latency_ms,
  ];
  const result = await pool.query(sql, values);
  const [inserted] = result.rows;
  return inserted?.id || null;
}
/**
 * Sets `latency_ms` on a run identified by tenant and run id.
 *
 * @param {object} args
 * @param args.tenant_id
 * @param args.run_id
 * @param args.latency_ms
 * @returns {Promise<boolean>} false when tenant_id or run_id is falsy (no query is issued);
 *   true once the update statement has been executed.
 */
export async function updateRunLatency({ tenant_id, run_id, latency_ms }) {
  const hasKeys = Boolean(tenant_id) && Boolean(run_id);
  if (!hasKeys) {
    return false;
  }
  const sql = `
update conversation_runs
set latency_ms = $3
where tenant_id = $1 and id = $2
`;
  await pool.query(sql, [tenant_id, run_id, latency_ms]);
  return true;
}
/**
 * Lists a tenant's conversations, most recently updated first.
 *
 * @param {object} args
 * @param args.tenant_id
 * @param {string} [args.q=""] - When non-empty, keeps chats whose id contains it (ILIKE).
 * @param {string} [args.status=""] - Accepted for interface compatibility only; the derived
 *   status filter is not implemented in the MVP, so this value is ignored.
 * @param {string} [args.state=""] - When non-empty, keeps only chats in exactly this state.
 * @param {number|string} [args.limit=50] - Maximum rows, clamped to 1..200. A value that is
 *   not numeric falls back to 50.
 * @returns {Promise<object[]>} One summary per conversation; `status` is always "ok" and
 *   `last_run_id` is always null.
 */
export async function listConversations({ tenant_id, q = "", status = "", state = "", limit = 50 }) {
  const params = [tenant_id];
  let where = `where tenant_id=$1`;
  if (q) {
    params.push(`%${q}%`);
    where += ` and wa_chat_id ilike $${params.length}`;
  }
  if (state) {
    params.push(state);
    where += ` and state = $${params.length}`;
  }
  // The limit is bound as a parameter rather than spliced into the SQL text: a non-numeric
  // value used to render as `limit NaN`, which is invalid SQL.
  const requested = Math.trunc(Number(limit));
  const lim = Number.isNaN(requested) ? 50 : Math.max(1, Math.min(200, requested));
  params.push(lim);
  const qsql = `
select tenant_id, wa_chat_id,
state,
coalesce(last_intent,'other') as intent,
updated_at as last_activity
from wa_conversation_state
${where}
order by updated_at desc
limit $${params.length}
`;
  const { rows } = await pool.query(qsql, params);
  return rows.map((r) => ({
    chat_id: r.wa_chat_id,
    // Simulator chats are stored with a "sim:" prefix; strip it for display.
    from: r.wa_chat_id.replace(/^sim:/, ""),
    state: r.state,
    intent: r.intent,
    status: "ok",
    last_activity: r.last_activity,
    last_run_id: null,
  }));
}
/**
 * Lists a tenant's conversation runs, newest first, optionally restricted to one chat.
 *
 * @param {object} args
 * @param args.tenant_id
 * @param [args.wa_chat_id=null] - When truthy, only runs of this chat are returned.
 * @param {number|string} [args.limit=50] - Maximum rows, clamped to 1..200. A value that is
 *   not numeric falls back to 50.
 * @returns {Promise<object[]>} Runs reshaped for API consumers (`input.text` carries the user text).
 */
export async function listRuns({ tenant_id, wa_chat_id = null, limit = 50 }) {
  const params = [tenant_id];
  let where = `where tenant_id=$1`;
  if (wa_chat_id) {
    params.push(wa_chat_id);
    where += ` and wa_chat_id=$${params.length}`;
  }
  // The limit is bound as a parameter rather than spliced into the SQL text: a non-numeric
  // value used to render as `limit NaN`, which is invalid SQL.
  const requested = Math.trunc(Number(limit));
  const lim = Number.isNaN(requested) ? 50 : Math.max(1, Math.min(200, requested));
  params.push(lim);
  const q = `
select id as run_id, ts, wa_chat_id as chat_id,
status, prev_state, user_text,
llm_output, tools, invariants,
final_reply, order_id, payment_link, latency_ms
from conversation_runs
${where}
order by ts desc
limit $${params.length}
`;
  const { rows } = await pool.query(q, params);
  return rows.map((r) => ({
    run_id: r.run_id,
    ts: r.ts,
    chat_id: r.chat_id,
    // Simulator chats are stored with a "sim:" prefix; strip it for display.
    from: r.chat_id.replace(/^sim:/, ""),
    status: r.status,
    prev_state: r.prev_state,
    input: { text: r.user_text },
    llm_output: r.llm_output,
    tools: r.tools,
    invariants: r.invariants,
    final_reply: r.final_reply,
    order_id: r.order_id,
    payment_link: r.payment_link,
    latency_ms: r.latency_ms,
  }));
}
/**
 * Loads a single conversation run by id within a tenant.
 *
 * @param {object} args
 * @param args.tenant_id
 * @param args.run_id
 * @returns {Promise<object|null>} The run reshaped for API consumers, or null when not found.
 */
export async function getRunById({ tenant_id, run_id }) {
  const sql = `
select id as run_id, ts, wa_chat_id as chat_id,
status, prev_state, user_text,
llm_output, tools, invariants,
final_reply, order_id, payment_link, latency_ms
from conversation_runs
where tenant_id=$1 and id=$2
`;
  const result = await pool.query(sql, [tenant_id, run_id]);
  const [run] = result.rows;
  if (!run) {
    return null;
  }
  // Simulator chats carry a "sim:" prefix that is stripped for display.
  const sender = run.chat_id.replace(/^sim:/, "");
  return {
    run_id: run.run_id,
    ts: run.ts,
    chat_id: run.chat_id,
    from: sender,
    status: run.status,
    prev_state: run.prev_state,
    input: { text: run.user_text },
    llm_output: run.llm_output,
    tools: run.tools,
    invariants: run.invariants,
    final_reply: run.final_reply,
    order_id: run.order_id,
    payment_link: run.payment_link,
    latency_ms: run.latency_ms,
  };
}
/**
 * Returns the latest non-empty text messages of a chat, oldest first, in a
 * `{ role, content }` shape.
 *
 * Sizing comes from the environment: LIMIT_CONVERSATIONS (message count, clamped to 1..50)
 * and MAX_CHARS_PER_MESSAGE (each message is trimmed and cut to this many characters).
 *
 * @param {object} args
 * @param args.tenant_id
 * @param args.wa_chat_id
 * @returns {Promise<Array<{role: string, content: string}>>} "in" messages map to role
 *   "user", everything else to "assistant".
 * @throws {Error} When either environment variable is missing or not a positive integer.
 */
export async function getRecentMessagesForLLM({
  tenant_id,
  wa_chat_id,
}) {
  const configuredLimit = parseInt(process.env.LIMIT_CONVERSATIONS || "", 10);
  const maxChars = parseInt(process.env.MAX_CHARS_PER_MESSAGE || "", 10);
  const limitIsValid = Number.isFinite(configuredLimit) && configuredLimit > 0;
  if (!limitIsValid) {
    throw new Error("LIMIT_CONVERSATIONS env is required and must be a positive integer");
  }
  const maxCharsIsValid = Number.isFinite(maxChars) && maxChars > 0;
  if (!maxCharsIsValid) {
    throw new Error("MAX_CHARS_PER_MESSAGE env is required and must be a positive integer");
  }
  const cappedLimit = Math.min(50, configuredLimit);
  const lim = Math.max(1, cappedLimit);
  const sql = `
select direction, ts, text
from wa_messages
where tenant_id=$1
and wa_chat_id=$2
and text is not null
and length(trim(text)) > 0
order by ts desc
limit $3
`;
  const result = await pool.query(sql, [tenant_id, wa_chat_id, lim]);
  // Rows arrive newest-first; flip them so the transcript reads chronologically.
  const chronological = result.rows.reverse();
  return chronological.map((msg) => {
    const role = msg.direction === "in" ? "user" : "assistant";
    const content = String(msg.text).trim().slice(0, maxChars);
    return { role, content };
  });
}
/**
 * Lists stored messages of a chat in ascending timestamp order.
 *
 * @param {object} args
 * @param args.tenant_id
 * @param args.wa_chat_id
 * @param {number|string} [args.limit=200] - Maximum rows, clamped to 1..500; an unparsable
 *   or zero value falls back to 200.
 * @returns {Promise<object[]>} Messages with provider, message_id, direction, ts, text,
 *   payload and run_id.
 */
export async function listMessages({ tenant_id, wa_chat_id, limit = 200 }) {
  const requested = parseInt(limit, 10) || 200;
  const lim = Math.max(1, Math.min(500, requested));
  const sql = `
select provider, message_id, direction, ts, text, payload, run_id
from wa_messages
where tenant_id=$1 and wa_chat_id=$2
order by ts asc
limit $3
`;
  const result = await pool.query(sql, [tenant_id, wa_chat_id, lim]);
  const toMessage = ({ provider, message_id, direction, ts, text, payload, run_id }) => ({
    provider,
    message_id,
    direction,
    ts,
    text,
    payload,
    run_id,
  });
  return result.rows.map(toMessage);
}
/**
 * Deletes everything stored for one chat — messages, runs and conversation state —
 * inside a single transaction.
 *
 * @param {object} args
 * @param args.tenant_id
 * @param args.wa_chat_id
 * @returns {Promise<{ok: true, deleted: {messages: number, runs: number, state: number}}>}
 *   Row counts reported by each delete.
 * @throws Rethrows whatever made the transaction fail. A failure of the ROLLBACK itself
 *   never replaces that original error.
 */
export async function deleteConversationData({ tenant_id, wa_chat_id }) {
  const client = await pool.connect();
  let rollbackFailed = false;
  try {
    await client.query("BEGIN");
    const r1 = await client.query(`delete from wa_messages where tenant_id=$1 and wa_chat_id=$2`, [
      tenant_id,
      wa_chat_id,
    ]);
    const r2 = await client.query(`delete from conversation_runs where tenant_id=$1 and wa_chat_id=$2`, [
      tenant_id,
      wa_chat_id,
    ]);
    const r3 = await client.query(`delete from wa_conversation_state where tenant_id=$1 and wa_chat_id=$2`, [
      tenant_id,
      wa_chat_id,
    ]);
    await client.query("COMMIT");
    return { ok: true, deleted: { messages: r1.rowCount, runs: r2.rowCount, state: r3.rowCount } };
  } catch (e) {
    try {
      await client.query("ROLLBACK");
    } catch {
      // The rollback error is deliberately dropped: the caller needs the error that broke
      // the transaction, and the client is discarded below instead of being reused.
      rollbackFailed = true;
    }
    throw e;
  } finally {
    if (rollbackFailed) {
      // A truthy argument asks the pool to destroy the client rather than recycle a
      // connection whose transaction state is unknown.
      client.release(true);
    } else {
      client.release();
    }
  }
}
/**
 * Lists the "users" of a tenant.
 *
 * A user is any existing conversation (wa_conversation_state), left-joined to its Woo
 * identity mapping when one exists — so users show up even if no Woo customer was ever
 * created for them. The push name and last timestamp come from the chat's latest
 * inbound message.
 *
 * @param {object} args
 * @param args.tenant_id
 * @param {string} [args.q=""] - Optional search text matched (ILIKE) against the chat id
 *   and the push name; surrounding whitespace is ignored.
 * @param {number|string} [args.limit=200] - Maximum rows, clamped to 1..500; an unparsable
 *   or zero value falls back to 200.
 * @returns {Promise<object[]>} Entries with chat_id, provider, external_customer_id,
 *   push_name and last_ts (the last two are null when absent).
 */
export async function listUsers({ tenant_id, q = "", limit = 200 }) {
  const requested = parseInt(limit, 10) || 200;
  const lim = Math.max(1, Math.min(500, requested));
  const search = String(q || "").trim();
  const sql = `
select *
from (
select s.wa_chat_id,
'woo' as provider,
m.external_customer_id,
lastmsg.ts as last_ts,
nullif(coalesce(lastmsg.payload #>> '{raw,meta,pushName}', lastmsg.payload #>> '{raw,meta,pushname}', ''), '') as push_name
from wa_conversation_state s
left join wa_identity_map m
on m.tenant_id = s.tenant_id
and m.wa_chat_id = s.wa_chat_id
and m.provider = 'woo'
left join lateral (
select ts, payload
from wa_messages
where tenant_id = s.tenant_id
and wa_chat_id = s.wa_chat_id
and direction = 'in'
order by ts desc
limit 1
) lastmsg on true
where s.tenant_id = $1
) t
where ($2 = '' or t.wa_chat_id ilike $3 or coalesce(t.push_name,'') ilike $3)
order by coalesce(t.last_ts, now()) desc
limit $4
`;
  // An empty search sends '' for both $2 and $3; the `$2 = ''` branch disables the filter.
  let pattern = "";
  if (search) {
    pattern = `%${search}%`;
  }
  const result = await pool.query(sql, [tenant_id, search, pattern, lim]);
  return result.rows.map((user) => ({
    chat_id: user.wa_chat_id,
    provider: user.provider,
    external_customer_id: user.external_customer_id,
    push_name: user.push_name || null,
    last_ts: user.last_ts || null,
  }));
}
/**
 * Fetches the most recent inbound (direction 'in') message of a chat.
 *
 * @param {object} args
 * @param args.tenant_id
 * @param args.wa_chat_id
 * @returns {Promise<object|null>} Row with provider, message_id, ts, text and payload,
 *   or null when the chat has no inbound messages.
 */
export async function getLastInboundMessage({ tenant_id, wa_chat_id }) {
  const sql = `
select provider, message_id, ts, text, payload
from wa_messages
where tenant_id=$1 and wa_chat_id=$2 and direction='in'
order by ts desc
limit 1
`;
  const result = await pool.query(sql, [tenant_id, wa_chat_id]);
  const [latest] = result.rows;
  return latest || null;
}
/**
 * Removes the most recent run of a chat, together with the outbound messages that run
 * produced, so the last inbound message can be processed again. Runs in one transaction.
 *
 * @param {object} args
 * @param args.tenant_id
 * @param args.wa_chat_id
 * @returns {Promise<{ok: true, run_id: *, deleted_out_messages: number, deleted_runs: number}>}
 *   `run_id` is null (and both counts 0) when the chat has no runs.
 * @throws Rethrows whatever made the transaction fail. A failure of the ROLLBACK itself
 *   never replaces that original error.
 */
export async function cleanupLastRunForRetry({ tenant_id, wa_chat_id }) {
  const client = await pool.connect();
  let rollbackFailed = false;
  try {
    await client.query("BEGIN");
    const { rows } = await client.query(
      `
select id
from conversation_runs
where tenant_id=$1 and wa_chat_id=$2
order by ts desc
limit 1
`,
      [tenant_id, wa_chat_id]
    );
    const run_id = rows[0]?.id || null;
    if (!run_id) {
      // Nothing to clean up; still close the transaction that was opened above.
      await client.query("COMMIT");
      return { ok: true, run_id: null, deleted_out_messages: 0, deleted_runs: 0 };
    }
    const r1 = await client.query(
      `delete from wa_messages where tenant_id=$1 and wa_chat_id=$2 and run_id=$3 and direction='out'`,
      [tenant_id, wa_chat_id, run_id]
    );
    const r2 = await client.query(`delete from conversation_runs where tenant_id=$1 and id=$2`, [tenant_id, run_id]);
    await client.query("COMMIT");
    return { ok: true, run_id, deleted_out_messages: r1.rowCount || 0, deleted_runs: r2.rowCount || 0 };
  } catch (e) {
    try {
      await client.query("ROLLBACK");
    } catch {
      // The rollback error is deliberately dropped: the caller needs the error that broke
      // the transaction, and the client is discarded below instead of being reused.
      rollbackFailed = true;
    }
    throw e;
  } finally {
    if (rollbackFailed) {
      // A truthy argument asks the pool to destroy the client rather than recycle a
      // connection whose transaction state is unknown.
      client.release(true);
    } else {
      client.release();
    }
  }
}
/**
 * Reads the identity-map row linking a chat to an external customer for a provider.
 *
 * @param {object} args
 * @param args.tenant_id
 * @param args.wa_chat_id
 * @param {string} [args.provider="woo"]
 * @returns {Promise<object|null>} The mapping row, or null when none exists.
 */
export async function getIdentityMapByChat({ tenant_id, wa_chat_id, provider = "woo" }) {
  const sql = `
select tenant_id, wa_chat_id, provider, external_customer_id, created_at, updated_at
from wa_identity_map
where tenant_id=$1 and wa_chat_id=$2 and provider=$3
limit 1
`;
  const result = await pool.query(sql, [tenant_id, wa_chat_id, provider]);
  const [mapping] = result.rows;
  return mapping || null;
}
/**
 * Deletes the identity-map row(s) of a chat for a provider.
 *
 * @param {object} args
 * @param args.tenant_id
 * @param args.wa_chat_id
 * @param {string} [args.provider="woo"]
 * @returns {Promise<number>} Number of rows deleted (0 when the driver reports none).
 */
export async function deleteIdentityMapByChat({ tenant_id, wa_chat_id, provider = "woo" }) {
  const sql = `delete from wa_identity_map where tenant_id=$1 and wa_chat_id=$2 and provider=$3`;
  const result = await pool.query(sql, [tenant_id, wa_chat_id, provider]);
  return result.rowCount || 0;
}
/**
 * Looks up a tenant by its key.
 *
 * @param key - Tenant key.
 * @returns {Promise<object|null>} Row with id, key and name, or null when not found.
 */
export async function getTenantByKey(key) {
  const sql = `select id, key, name from tenants where key=$1`;
  const result = await pool.query(sql, [key]);
  const [tenant] = result.rows;
  return tenant || null;
}
/**
 * Resolves the tenant that owns a channel.
 *
 * @param {object} args
 * @param args.channel_type
 * @param args.channel_key
 * @returns {Promise<*>} The tenant id of the first matching row, or null when there is none.
 */
export async function getTenantIdByChannel({ channel_type, channel_key }) {
  const sql = `
select tenant_id
from tenant_channels
where channel_type=$1 and channel_key=$2
`;
  const result = await pool.query(sql, [channel_type, channel_key]);
  const [channel] = result.rows;
  return channel?.tenant_id || null;
}
/**
 * Returns the external customer id mapped to a chat for a provider.
 *
 * @param {object} args
 * @param args.tenant_id
 * @param args.wa_chat_id
 * @param {string} [args.provider="woo"]
 * @returns {Promise<*>} The external customer id, or null when no mapping exists.
 */
export async function getExternalCustomerIdByChat({ tenant_id, wa_chat_id, provider = "woo" }) {
  const sql = `
select external_customer_id
from wa_identity_map
where tenant_id=$1 and wa_chat_id=$2 and provider=$3
`;
  const result = await pool.query(sql, [tenant_id, wa_chat_id, provider]);
  const [mapping] = result.rows;
  return mapping?.external_customer_id || null;
}
/**
 * Links a chat to an external customer id, replacing the stored id when the chat
 * already has a mapping.
 *
 * NOTE(review): the conflict target is (tenant_id, wa_chat_id) and the update branch does
 * not touch `provider`, while the read helpers filter by provider — confirm against the
 * table's unique constraint that this is intended.
 *
 * @param {object} args
 * @param args.tenant_id
 * @param args.wa_chat_id
 * @param args.external_customer_id
 * @param {string} [args.provider="woo"]
 * @returns {Promise<*>} The stored external customer id, or null if no row was returned.
 */
export async function upsertExternalCustomerMap({
  tenant_id,
  wa_chat_id,
  external_customer_id,
  provider = "woo",
}) {
  const sql = `
insert into wa_identity_map (tenant_id, wa_chat_id, provider, external_customer_id, created_at, updated_at)
values ($1, $2, $3, $4, now(), now())
on conflict (tenant_id, wa_chat_id)
do update set external_customer_id = excluded.external_customer_id, updated_at = now()
returning external_customer_id
`;
  const values = [tenant_id, wa_chat_id, provider, external_customer_id];
  const result = await pool.query(sql, values);
  const [saved] = result.rows;
  return saved?.external_customer_id || null;
}
/**
 * Loads the enabled e-commerce configuration of a tenant for a provider, with the
 * credential columns returned as stored (still encrypted).
 *
 * @param {object} args
 * @param args.tenant_id
 * @param {string} [args.provider="woo"]
 * @returns {Promise<object|null>} The configuration row, or null when none is enabled.
 */
export async function getTenantEcommerceConfig({ tenant_id, provider = "woo" }) {
  const sql = `
select id, tenant_id, provider, base_url, credential_ref, api_version, timeout_ms,
enc_consumer_key, enc_consumer_secret, encryption_salt, enabled
from tenant_ecommerce_config
where tenant_id = $1 and provider = $2 and enabled = true
limit 1
`;
  const result = await pool.query(sql, [tenant_id, provider]);
  const [config] = result.rows;
  return config || null;
}
/**
 * Loads the enabled e-commerce configuration of a tenant with the consumer key and
 * secret decrypted in the database through `pgp_sym_decrypt`.
 *
 * @param {object} args
 * @param args.tenant_id
 * @param {string} [args.provider="woo"]
 * @param args.encryption_key - Symmetric key passed to `pgp_sym_decrypt`.
 * @returns {Promise<object|null>} Row exposing `consumer_key` and `consumer_secret`, or
 *   null when no enabled configuration exists.
 * @throws {Error} When `encryption_key` is falsy (no query is issued).
 */
export async function getDecryptedTenantEcommerceConfig({
  tenant_id,
  provider = "woo",
  encryption_key,
}) {
  const hasKey = Boolean(encryption_key);
  if (!hasKey) {
    throw new Error("encryption_key is required to decrypt ecommerce credentials");
  }
  const sql = `
select id, tenant_id, provider, base_url, credential_ref, api_version, timeout_ms, enabled,
pgp_sym_decrypt(enc_consumer_key, $3)::text as consumer_key,
pgp_sym_decrypt(enc_consumer_secret, $3)::text as consumer_secret
from tenant_ecommerce_config
where tenant_id = $1 and provider = $2 and enabled = true
limit 1
`;
  const result = await pool.query(sql, [tenant_id, provider, encryption_key]);
  const [config] = result.rows;
  return config || null;
}
/**
 * Searches a tenant's product aliases by substring, highest boost first.
 *
 * The trimmed text is matched (ILIKE) against `alias`, and its lower-cased form against
 * `normalized_alias`.
 *
 * @param {object} args
 * @param args.tenant_id
 * @param {string} [args.q=""] - Search text; blank input yields an empty result without querying.
 * @param {number|string} [args.limit=20] - Maximum rows, clamped to 1..200; an unparsable
 *   or zero value falls back to 20.
 * @returns {Promise<object[]>} Matching alias records.
 */
export async function searchProductAliases({ tenant_id, q = "", limit = 20 }) {
  const term = String(q || "").trim();
  if (!term) {
    return [];
  }
  const requested = parseInt(limit, 10) || 20;
  const lim = Math.max(1, Math.min(200, requested));
  const aliasPattern = `%${term}%`;
  const normalizedPattern = `%${term.toLowerCase()}%`;
  const sql = `
select tenant_id, alias, normalized_alias, woo_product_id, category_hint, boost, metadata, updated_at
from product_aliases
where tenant_id=$1
and (alias ilike $2 or normalized_alias ilike $3)
order by boost desc, updated_at desc
limit $4
`;
  const result = await pool.query(sql, [tenant_id, aliasPattern, normalizedPattern, lim]);
  const toAlias = (row) => ({
    tenant_id: row.tenant_id,
    alias: row.alias,
    normalized_alias: row.normalized_alias,
    woo_product_id: row.woo_product_id,
    category_hint: row.category_hint,
    boost: row.boost,
    metadata: row.metadata,
    updated_at: row.updated_at,
  });
  return result.rows.map(toAlias);
}
/**
 * Lists the active recommendation rules of a tenant, ordered by priority and then id.
 *
 * @param {object} args
 * @param args.tenant_id
 * @returns {Promise<object[]>} The rule rows as returned by the database.
 */
export async function getRecoRules({ tenant_id }) {
  const statement = `
select id, tenant_id, rule_key, trigger, queries, boosts, ask_slots, active, priority,
trigger_product_ids, recommended_product_ids, created_at, updated_at
from product_reco_rules
where tenant_id=$1 and active=true
order by priority asc, id asc
`;
  const result = await pool.query(statement, [tenant_id]);
  return result.rows;
}
/**
 * Finds the active rules that have at least one of the given products as a trigger.
 *
 * @param {object} args
 * @param args.tenant_id
 * @param {Array} [args.product_ids=[]] - Product ids, compared with the array-overlap
 *   operator after a cast to int[].
 * @returns {Promise<object[]>} Matching rules ordered by priority and then id; an empty
 *   array (without querying) when no product ids are given.
 */
export async function getRecoRulesByProductIds({ tenant_id, product_ids = [] }) {
  const hasProducts = Boolean(product_ids?.length);
  if (!hasProducts) {
    return [];
  }
  const statement = `
select id, tenant_id, rule_key, trigger, queries, boosts, ask_slots, active, priority,
trigger_product_ids, recommended_product_ids, created_at, updated_at
from product_reco_rules
where tenant_id=$1 and active=true and trigger_product_ids && $2::int[]
order by priority asc, id asc
`;
  const result = await pool.query(statement, [tenant_id, product_ids]);
  return result.rows;
}
/**
 * Loads one recommendation rule by its key, whether or not it is active.
 *
 * @param {object} args
 * @param args.tenant_id
 * @param args.rule_key
 * @returns {Promise<object|null>} The rule row, or null when not found.
 */
export async function getRecoRuleByKey({ tenant_id, rule_key }) {
  const statement = `
select id, tenant_id, rule_key, trigger, queries, boosts, ask_slots, active, priority,
trigger_product_ids, recommended_product_ids, rule_type, trigger_event, created_at, updated_at
from product_reco_rules
where tenant_id=$1 and rule_key=$2
limit 1
`;
  const result = await pool.query(statement, [tenant_id, rule_key]);
  const [rule] = result.rows;
  return rule || null;
}
/**
 * Gets the active `qty_per_person` rules for an event type (asado, horno, etc.), each
 * with its items aggregated into a JSON array ordered by display_order.
 *
 * Rules with no trigger event also match; rules bound to the requested event sort
 * first, then by priority.
 *
 * DEPRECATED: use getProductQtyRulesByEvent instead.
 *
 * @param {object} args
 * @param args.tenant_id
 * @param args.event_type
 * @returns {Promise<object[]>} Rule rows carrying an `items` array.
 */
export async function getQtyPerPersonRules({ tenant_id, event_type }) {
  const statement = `
select r.id, r.rule_key, r.rule_type, r.trigger_event, r.priority,
json_agg(json_build_object(
'woo_product_id', i.woo_product_id,
'audience_type', i.audience_type,
'qty_per_person', i.qty_per_person,
'unit', i.unit,
'reason', i.reason,
'display_order', i.display_order
) order by i.display_order) as items
from product_reco_rules r
inner join reco_rule_items i on i.rule_id = r.id
where r.tenant_id = $1
and r.active = true
and r.rule_type = 'qty_per_person'
and (r.trigger_event = $2 or r.trigger_event is null)
group by r.id, r.rule_key, r.rule_type, r.trigger_event, r.priority
order by
case when r.trigger_event = $2 then 0 else 1 end,
r.priority asc
`;
  const result = await pool.query(statement, [tenant_id, event_type]);
  return result.rows;
}
/**
 * Gets the per-event quantity rules from the newer `product_qty_rules` table.
 *
 * @param {object} args
 * @param args.tenant_id
 * @param args.event_type
 * @returns {Promise<object[]>} Rows ordered by woo_product_id and then person_type.
 */
export async function getProductQtyRulesByEvent({ tenant_id, event_type }) {
  const statement = `
select woo_product_id, event_type, person_type, qty_per_person, unit
from product_qty_rules
where tenant_id = $1 and event_type = $2
order by woo_product_id, person_type
`;
  const result = await pool.query(statement, [tenant_id, event_type]);
  return result.rows;
}
/**
 * Gets the detailed items of one specific rule, ordered by display_order.
 *
 * @param {object} args
 * @param args.rule_id
 * @returns {Promise<object[]>} The item rows of the rule.
 */
export async function getRecoRuleItems({ rule_id }) {
  const statement = `
select id, rule_id, woo_product_id, audience_type, qty_per_person, unit, reason, display_order
from reco_rule_items
where rule_id = $1
order by display_order asc
`;
  const result = await pool.query(statement, [rule_id]);
  return result.rows;
}
/**
 * Gets the products mapped to an alias together with their scores, best score first.
 *
 * The alias is lower-cased and trimmed before the lookup.
 *
 * @param {object} args
 * @param args.tenant_id
 * @param args.alias
 * @returns {Promise<object[]>} Rows with woo_product_id and score; an empty array
 *   (without querying) when the alias is blank.
 */
export async function getAliasProductMappings({ tenant_id, alias }) {
  const lookupAlias = String(alias || "").toLowerCase().trim();
  if (lookupAlias === "") {
    return [];
  }
  const statement = `
select woo_product_id, score
from alias_product_mappings
where tenant_id = $1 and alias = $2
order by score desc
`;
  const result = await pool.query(statement, [tenant_id, lookupAlias]);
  return result.rows;
}
/**
 * Gets every alias-to-product mapping of a tenant (used for searching), ordered by
 * alias and then by descending score.
 *
 * @param {object} args
 * @param args.tenant_id
 * @returns {Promise<object[]>} Rows with alias, woo_product_id and score.
 */
export async function getAllAliasProductMappings({ tenant_id }) {
  const statement = `
select alias, woo_product_id, score
from alias_product_mappings
where tenant_id = $1
order by alias, score desc
`;
  const result = await pool.query(statement, [tenant_id]);
  return result.rows;
}
/**
 * Reads a cached product embedding by content hash.
 *
 * @param {object} args
 * @param args.tenant_id
 * @param args.content_hash
 * @returns {Promise<object|null>} The cache row, or null on a miss.
 */
export async function getProductEmbedding({ tenant_id, content_hash }) {
  const statement = `
select tenant_id, content_hash, content_text, embedding, model, updated_at
from product_embeddings_cache
where tenant_id=$1 and content_hash=$2
limit 1
`;
  const result = await pool.query(statement, [tenant_id, content_hash]);
  const [cached] = result.rows;
  return cached || null;
}
/**
 * Stores a product embedding in the cache, overwriting the entry that already exists
 * for the same (tenant_id, content_hash).
 *
 * @param {object} args
 * @param args.tenant_id
 * @param args.content_hash
 * @param args.content_text
 * @param args.embedding - Serialized with JSON.stringify; null/undefined is stored as `[]`.
 * @param args.model
 * @returns {Promise<object|null>} The stored row, or null if no row was returned.
 */
export async function upsertProductEmbedding({
  tenant_id,
  content_hash,
  content_text,
  embedding,
  model,
}) {
  const statement = `
insert into product_embeddings_cache
(tenant_id, content_hash, content_text, embedding, model, updated_at)
values
($1, $2, $3, $4::jsonb, $5, now())
on conflict (tenant_id, content_hash)
do update set
content_text = excluded.content_text,
embedding = excluded.embedding,
model = excluded.model,
updated_at = now()
returning tenant_id, content_hash, content_text, embedding, model, updated_at
`;
  const embeddingJson = JSON.stringify(embedding ?? []);
  const values = [tenant_id, content_hash, content_text, embeddingJson, model];
  const result = await pool.query(statement, values);
  const [stored] = result.rows;
  return stored || null;
}
/**
 * Stores a payment record in `mp_payments`, overwriting the existing record for the
 * same (tenant_id, payment_id).
 *
 * On conflict the order id, preference id, status, paid_at and raw payload are all
 * replaced with the supplied values, including nulls.
 *
 * @param {object} args
 * @param args.tenant_id
 * @param [args.woo_order_id=null]
 * @param [args.preference_id=null]
 * @param [args.payment_id=null] - Required despite the default.
 * @param [args.status=null]
 * @param [args.paid_at=null]
 * @param [args.raw={}] - Serialized with JSON.stringify; null/undefined is stored as `{}`.
 * @returns {Promise<object|null>} The stored row, or null if no row was returned.
 * @throws {Error} "payment_id_required" when `payment_id` is falsy (no query is issued).
 */
export async function upsertMpPayment({
  tenant_id,
  woo_order_id = null,
  preference_id = null,
  payment_id = null,
  status = null,
  paid_at = null,
  raw = {},
}) {
  const hasPaymentId = Boolean(payment_id);
  if (!hasPaymentId) {
    throw new Error("payment_id_required");
  }
  const statement = `
insert into mp_payments
(tenant_id, woo_order_id, preference_id, payment_id, status, paid_at, raw, created_at, updated_at)
values
($1, $2, $3, $4, $5, $6::timestamptz, $7::jsonb, now(), now())
on conflict (tenant_id, payment_id)
do update set
woo_order_id = excluded.woo_order_id,
preference_id = excluded.preference_id,
status = excluded.status,
paid_at = excluded.paid_at,
raw = excluded.raw,
updated_at = now()
returning tenant_id, woo_order_id, preference_id, payment_id, status, paid_at, raw, updated_at
`;
  const rawJson = JSON.stringify(raw ?? {});
  const values = [tenant_id, woo_order_id, preference_id, payment_id, status, paid_at, rawJson];
  const result = await pool.query(statement, values);
  const [payment] = result.rows;
  return payment || null;
}
/**
 * Reads a stored payment record by payment id within a tenant.
 *
 * @param {object} args
 * @param args.tenant_id
 * @param args.payment_id
 * @returns {Promise<object|null>} The payment row, or null when not found.
 */
export async function getMpPaymentById({ tenant_id, payment_id }) {
  const statement = `
select tenant_id, woo_order_id, preference_id, payment_id, status, paid_at, raw, updated_at
from mp_payments
where tenant_id=$1 and payment_id=$2
limit 1
`;
  const result = await pool.query(statement, [tenant_id, payment_id]);
  const [payment] = result.rows;
  return payment || null;
}