From c243a6d8bec11c1d46427fc947b6da7eff3f310c Mon Sep 17 00:00:00 2001 From: keyan Date: Wed, 10 Jan 2024 09:50:42 -0600 Subject: [PATCH] robust lnd subscriptions and robust held recording --- worker/wallet.js | 177 ++++++++++++++++++++++++++--------------------- 1 file changed, 99 insertions(+), 78 deletions(-) diff --git a/worker/wallet.js b/worker/wallet.js index 816f9abc..b4128a5e 100644 --- a/worker/wallet.js +++ b/worker/wallet.js @@ -7,12 +7,35 @@ import { sendUserNotification } from '../api/webPush/index.js' import { msatsToSats, numWithUnits } from '../lib/format' import { INVOICE_RETENTION_DAYS } from '../lib/constants' import { sleep } from '../lib/time.js' +import retry from 'async-retry' export async function subscribeToWallet (args) { await subscribeToDeposits(args) await subscribeToWithdrawals(args) } +// lnd subscriptions can fail, so they need to be retried +function subscribeForever (subscribe) { + retry(async bail => { + let sub + try { + return await new Promise((resolve, reject) => { + sub = subscribe(resolve, bail) + if (!sub) { + return bail(new Error('function passed to subscribeForever must return a subscription object')) + } + sub.on('error', reject) + }) + } catch (error) { + throw new Error('error subscribing - trying again') + } finally { + sub?.removeAllListeners() + } + }, + // retry every .1-10 seconds forever + { forever: true, minTimeout: 100, maxTimeout: 10000, onRetry: e => console.error(e.message) }) +} + const logEvent = (name, args) => console.log(`event ${name} triggered with args`, args) const logEventError = (name, error) => console.error(`error running ${name}`, error) @@ -25,69 +48,57 @@ async function subscribeToDeposits (args) { ORDER BY "confirmedIndex" DESC NULLS LAST LIMIT 1` - // https://www.npmjs.com/package/ln-service#subscribetoinvoices - const sub = subscribeToInvoices({ lnd, confirmed_after: lastConfirmed?.confirmedIndex }) - sub.on('invoice_updated', async (inv) => { - try { - if (inv.secret) { - logEvent('invoice_updated', inv) - await checkInvoice({ data: { hash: inv.id }, ...args }) - } else { - // this is a HODL invoice. We need to use SubscribeToInvoice which has is_held transitions - // https://api.lightning.community/api/lnd/invoices/subscribe-single-invoice - // SubscribeToInvoices is only for invoice creation and settlement transitions - // https://api.lightning.community/api/lnd/lightning/subscribe-invoices - await subscribeToHodlInvoice({ hash: inv.id, ...args }) + subscribeForever(() => { + const sub = subscribeToInvoices({ lnd, confirmed_after: lastConfirmed?.confirmedIndex }) + + sub.on('invoice_updated', async (inv) => { + try { + if (inv.secret) { + logEvent('invoice_updated', inv) + await checkInvoice({ data: { hash: inv.id }, ...args }) + } else { + // this is a HODL invoice. We need to use SubscribeToInvoice which has is_held transitions + // https://api.lightning.community/api/lnd/invoices/subscribe-single-invoice + // SubscribeToInvoices is only for invoice creation and settlement transitions + // https://api.lightning.community/api/lnd/lightning/subscribe-invoices + subscribeToHodlInvoice({ hash: inv.id, ...args }) + } + } catch (error) { + logEventError('invoice_updated', error) } - } catch (error) { - // XXX This is a critical error - // It might mean that we failed to record an invoice confirming - // and we won't get another chance to record it until restart - logEventError('invoice_updated', error) - } + }) + + return sub }) - sub.on('error', console.error) // check pending deposits as a redundancy in case we failed to record // an invoice_updated event await checkPendingDeposits(args) } -async function subscribeToHodlInvoice (args) { - const { lnd, hash, models } = args - let sub - try { - await new Promise((resolve, reject) => { - // https://www.npmjs.com/package/ln-service#subscribetoinvoice - sub = subscribeToInvoice({ id: hash, lnd }) - sub.on('invoice_updated', async (inv) => { - logEvent('hodl_invoice_updated', inv) - try { - // record the is_held transition - if (inv.is_held) { - // this is basically confirm_invoice without setting confirmed_at - // and without setting the user balance - // those will be set when the invoice is settled by user action - await models.invoice.update({ - where: { hash }, - data: { - msatsReceived: Number(inv.received_mtokens), - isHeld: true - } - }) - // after that we can stop listening for updates - resolve() - } - } catch (error) { - logEventError('hodl_invoice_updated', error) - reject(error) +function subscribeToHodlInvoice (args) { + const { lnd, hash } = args + + subscribeForever((resolve, reject) => { + const sub = subscribeToInvoice({ id: hash, lnd }) + + sub.on('invoice_updated', async (inv) => { + logEvent('hodl_invoice_updated', inv) + try { + // record the is_held transition + if (inv.is_held) { + await checkInvoice({ data: { hash: inv.id }, ...args }) + // after that we can stop listening for updates + resolve() } - }) - sub.on('error', reject) + } catch (error) { + logEventError('hodl_invoice_updated', error) + reject(error) + } }) - } finally { - sub?.removeAllListeners() - } + + return sub + }) } async function checkInvoice ({ data: { hash }, boss, models, lnd }) { @@ -123,6 +134,19 @@ async function checkInvoice ({ data: { hash }, boss, models, lnd }) { return await boss.send('nip57', { hash }) } + if (inv.is_held) { + // this is basically confirm_invoice without setting confirmed_at + // and without setting the user balance + // those will be set when the invoice is settled by user action + return await serialize(models, models.invoice.update({ + where: { hash }, + data: { + msatsReceived: Number(inv.received_mtokens), + isHeld: true + } + })) + } + if (inv.is_canceled) { return await serialize(models, models.invoice.update({ @@ -140,32 +164,29 @@ async function subscribeToWithdrawals (args) { const { lnd } = args // https://www.npmjs.com/package/ln-service#subscribetopayments - const sub = subscribeToPayments({ lnd }) - sub.on('confirmed', async (payment) => { - logEvent('confirmed', payment) - try { - await checkWithdrawal({ data: { hash: payment.id }, ...args }) - } catch (error) { - // XXX This is a critical error - // It might mean that we failed to record an invoice confirming - // and we won't get another chance to record it until restart - logEventError('confirmed', error) - } + subscribeForever(() => { + const sub = subscribeToPayments({ lnd }) + + sub.on('confirmed', async (payment) => { + logEvent('confirmed', payment) + try { + await checkWithdrawal({ data: { hash: payment.id }, ...args }) + } catch (error) { + logEventError('confirmed', error) + } + }) + + sub.on('failed', async (payment) => { + logEvent('failed', payment) + try { + await checkWithdrawal({ data: { hash: payment.id }, ...args }) + } catch (error) { + logEventError('failed', error) + } + }) + + return sub }) - sub.on('failed', async (payment) => { - logEvent('failed', payment) - try { - await checkWithdrawal({ data: { hash: payment.id }, ...args }) - } catch (error) { - // XXX This is a critical error - // It might mean that we failed to record an invoice confirming - // and we won't get another chance to record it until restart - logEventError('failed', error) - } - }) - // ignore payment attempts - sub.on('paying', (attempt) => {}) - sub.on('error', console.error) // check pending withdrawals since they might have been paid while worker was down await checkPendingWithdrawals(args)