robust lnd subscriptions and robust held recording
This commit is contained in:
		
							parent
							
								
									df1edd5b79
								
							
						
					
					
						commit
						c243a6d8be
					
				@ -7,12 +7,35 @@ import { sendUserNotification } from '../api/webPush/index.js'
 | 
				
			|||||||
import { msatsToSats, numWithUnits } from '../lib/format'
 | 
					import { msatsToSats, numWithUnits } from '../lib/format'
 | 
				
			||||||
import { INVOICE_RETENTION_DAYS } from '../lib/constants'
 | 
					import { INVOICE_RETENTION_DAYS } from '../lib/constants'
 | 
				
			||||||
import { sleep } from '../lib/time.js'
 | 
					import { sleep } from '../lib/time.js'
 | 
				
			||||||
 | 
					import retry from 'async-retry'
 | 
				
			||||||
 | 
					
 | 
				
			||||||
export async function subscribeToWallet (args) {
 | 
					export async function subscribeToWallet (args) {
 | 
				
			||||||
  await subscribeToDeposits(args)
 | 
					  await subscribeToDeposits(args)
 | 
				
			||||||
  await subscribeToWithdrawals(args)
 | 
					  await subscribeToWithdrawals(args)
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// lnd subscriptions can fail, so they need to be retried
 | 
				
			||||||
 | 
					function subscribeForever (subscribe) {
 | 
				
			||||||
 | 
					  retry(async bail => {
 | 
				
			||||||
 | 
					    let sub
 | 
				
			||||||
 | 
					    try {
 | 
				
			||||||
 | 
					      return await new Promise((resolve, reject) => {
 | 
				
			||||||
 | 
					        sub = subscribe(resolve, bail)
 | 
				
			||||||
 | 
					        if (!sub) {
 | 
				
			||||||
 | 
					          return bail(new Error('function passed to subscribeForever must return a subscription object'))
 | 
				
			||||||
 | 
					        }
 | 
				
			||||||
 | 
					        sub.on('error', reject)
 | 
				
			||||||
 | 
					      })
 | 
				
			||||||
 | 
					    } catch (error) {
 | 
				
			||||||
 | 
					      throw new Error('error subscribing - trying again')
 | 
				
			||||||
 | 
					    } finally {
 | 
				
			||||||
 | 
					      sub?.removeAllListeners()
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					  },
 | 
				
			||||||
 | 
					  // retry every .1-10 seconds forever
 | 
				
			||||||
 | 
					  { forever: true, minTimeout: 100, maxTimeout: 10000, onRetry: e => console.error(e.message) })
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
const logEvent = (name, args) => console.log(`event ${name} triggered with args`, args)
 | 
					const logEvent = (name, args) => console.log(`event ${name} triggered with args`, args)
 | 
				
			||||||
const logEventError = (name, error) => console.error(`error running ${name}`, error)
 | 
					const logEventError = (name, error) => console.error(`error running ${name}`, error)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@ -25,8 +48,9 @@ async function subscribeToDeposits (args) {
 | 
				
			|||||||
    ORDER BY "confirmedIndex" DESC NULLS LAST
 | 
					    ORDER BY "confirmedIndex" DESC NULLS LAST
 | 
				
			||||||
    LIMIT 1`
 | 
					    LIMIT 1`
 | 
				
			||||||
 | 
					
 | 
				
			||||||
  // https://www.npmjs.com/package/ln-service#subscribetoinvoices
 | 
					  subscribeForever(() => {
 | 
				
			||||||
    const sub = subscribeToInvoices({ lnd, confirmed_after: lastConfirmed?.confirmedIndex })
 | 
					    const sub = subscribeToInvoices({ lnd, confirmed_after: lastConfirmed?.confirmedIndex })
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    sub.on('invoice_updated', async (inv) => {
 | 
					    sub.on('invoice_updated', async (inv) => {
 | 
				
			||||||
      try {
 | 
					      try {
 | 
				
			||||||
        if (inv.secret) {
 | 
					        if (inv.secret) {
 | 
				
			||||||
@ -37,44 +61,33 @@ async function subscribeToDeposits (args) {
 | 
				
			|||||||
          // https://api.lightning.community/api/lnd/invoices/subscribe-single-invoice
 | 
					          // https://api.lightning.community/api/lnd/invoices/subscribe-single-invoice
 | 
				
			||||||
          // SubscribeToInvoices is only for invoice creation and settlement transitions
 | 
					          // SubscribeToInvoices is only for invoice creation and settlement transitions
 | 
				
			||||||
          // https://api.lightning.community/api/lnd/lightning/subscribe-invoices
 | 
					          // https://api.lightning.community/api/lnd/lightning/subscribe-invoices
 | 
				
			||||||
        await subscribeToHodlInvoice({ hash: inv.id, ...args })
 | 
					          subscribeToHodlInvoice({ hash: inv.id, ...args })
 | 
				
			||||||
        }
 | 
					        }
 | 
				
			||||||
      } catch (error) {
 | 
					      } catch (error) {
 | 
				
			||||||
      // XXX This is a critical error
 | 
					 | 
				
			||||||
      // It might mean that we failed to record an invoice confirming
 | 
					 | 
				
			||||||
      // and we won't get another chance to record it until restart
 | 
					 | 
				
			||||||
        logEventError('invoice_updated', error)
 | 
					        logEventError('invoice_updated', error)
 | 
				
			||||||
      }
 | 
					      }
 | 
				
			||||||
    })
 | 
					    })
 | 
				
			||||||
  sub.on('error', console.error)
 | 
					
 | 
				
			||||||
 | 
					    return sub
 | 
				
			||||||
 | 
					  })
 | 
				
			||||||
 | 
					
 | 
				
			||||||
  // check pending deposits as a redundancy in case we failed to record
 | 
					  // check pending deposits as a redundancy in case we failed to record
 | 
				
			||||||
  // an invoice_updated event
 | 
					  // an invoice_updated event
 | 
				
			||||||
  await checkPendingDeposits(args)
 | 
					  await checkPendingDeposits(args)
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
async function subscribeToHodlInvoice (args) {
 | 
					function subscribeToHodlInvoice (args) {
 | 
				
			||||||
  const { lnd, hash, models } = args
 | 
					  const { lnd, hash } = args
 | 
				
			||||||
  let sub
 | 
					
 | 
				
			||||||
  try {
 | 
					  subscribeForever((resolve, reject) => {
 | 
				
			||||||
    await new Promise((resolve, reject) => {
 | 
					    const sub = subscribeToInvoice({ id: hash, lnd })
 | 
				
			||||||
      // https://www.npmjs.com/package/ln-service#subscribetoinvoice
 | 
					
 | 
				
			||||||
      sub = subscribeToInvoice({ id: hash, lnd })
 | 
					 | 
				
			||||||
    sub.on('invoice_updated', async (inv) => {
 | 
					    sub.on('invoice_updated', async (inv) => {
 | 
				
			||||||
      logEvent('hodl_invoice_updated', inv)
 | 
					      logEvent('hodl_invoice_updated', inv)
 | 
				
			||||||
      try {
 | 
					      try {
 | 
				
			||||||
        // record the is_held transition
 | 
					        // record the is_held transition
 | 
				
			||||||
        if (inv.is_held) {
 | 
					        if (inv.is_held) {
 | 
				
			||||||
            // this is basically confirm_invoice without setting confirmed_at
 | 
					          await checkInvoice({ data: { hash: inv.id }, ...args })
 | 
				
			||||||
            // and without setting the user balance
 | 
					 | 
				
			||||||
            // those will be set when the invoice is settled by user action
 | 
					 | 
				
			||||||
            await models.invoice.update({
 | 
					 | 
				
			||||||
              where: { hash },
 | 
					 | 
				
			||||||
              data: {
 | 
					 | 
				
			||||||
                msatsReceived: Number(inv.received_mtokens),
 | 
					 | 
				
			||||||
                isHeld: true
 | 
					 | 
				
			||||||
              }
 | 
					 | 
				
			||||||
            })
 | 
					 | 
				
			||||||
          // after that we can stop listening for updates
 | 
					          // after that we can stop listening for updates
 | 
				
			||||||
          resolve()
 | 
					          resolve()
 | 
				
			||||||
        }
 | 
					        }
 | 
				
			||||||
@ -83,11 +96,9 @@ async function subscribeToHodlInvoice (args) {
 | 
				
			|||||||
        reject(error)
 | 
					        reject(error)
 | 
				
			||||||
      }
 | 
					      }
 | 
				
			||||||
    })
 | 
					    })
 | 
				
			||||||
      sub.on('error', reject)
 | 
					
 | 
				
			||||||
 | 
					    return sub
 | 
				
			||||||
  })
 | 
					  })
 | 
				
			||||||
  } finally {
 | 
					 | 
				
			||||||
    sub?.removeAllListeners()
 | 
					 | 
				
			||||||
  }
 | 
					 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
async function checkInvoice ({ data: { hash }, boss, models, lnd }) {
 | 
					async function checkInvoice ({ data: { hash }, boss, models, lnd }) {
 | 
				
			||||||
@ -123,6 +134,19 @@ async function checkInvoice ({ data: { hash }, boss, models, lnd }) {
 | 
				
			|||||||
    return await boss.send('nip57', { hash })
 | 
					    return await boss.send('nip57', { hash })
 | 
				
			||||||
  }
 | 
					  }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					  if (inv.is_held) {
 | 
				
			||||||
 | 
					    // this is basically confirm_invoice without setting confirmed_at
 | 
				
			||||||
 | 
					    // and without setting the user balance
 | 
				
			||||||
 | 
					    // those will be set when the invoice is settled by user action
 | 
				
			||||||
 | 
					    return await serialize(models, models.invoice.update({
 | 
				
			||||||
 | 
					      where: { hash },
 | 
				
			||||||
 | 
					      data: {
 | 
				
			||||||
 | 
					        msatsReceived: Number(inv.received_mtokens),
 | 
				
			||||||
 | 
					        isHeld: true
 | 
				
			||||||
 | 
					      }
 | 
				
			||||||
 | 
					    }))
 | 
				
			||||||
 | 
					  }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
  if (inv.is_canceled) {
 | 
					  if (inv.is_canceled) {
 | 
				
			||||||
    return await serialize(models,
 | 
					    return await serialize(models,
 | 
				
			||||||
      models.invoice.update({
 | 
					      models.invoice.update({
 | 
				
			||||||
@ -140,32 +164,29 @@ async function subscribeToWithdrawals (args) {
 | 
				
			|||||||
  const { lnd } = args
 | 
					  const { lnd } = args
 | 
				
			||||||
 | 
					
 | 
				
			||||||
  // https://www.npmjs.com/package/ln-service#subscribetopayments
 | 
					  // https://www.npmjs.com/package/ln-service#subscribetopayments
 | 
				
			||||||
 | 
					  subscribeForever(() => {
 | 
				
			||||||
    const sub = subscribeToPayments({ lnd })
 | 
					    const sub = subscribeToPayments({ lnd })
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    sub.on('confirmed', async (payment) => {
 | 
					    sub.on('confirmed', async (payment) => {
 | 
				
			||||||
      logEvent('confirmed', payment)
 | 
					      logEvent('confirmed', payment)
 | 
				
			||||||
      try {
 | 
					      try {
 | 
				
			||||||
        await checkWithdrawal({ data: { hash: payment.id }, ...args })
 | 
					        await checkWithdrawal({ data: { hash: payment.id }, ...args })
 | 
				
			||||||
      } catch (error) {
 | 
					      } catch (error) {
 | 
				
			||||||
      // XXX This is a critical error
 | 
					 | 
				
			||||||
      // It might mean that we failed to record an invoice confirming
 | 
					 | 
				
			||||||
      // and we won't get another chance to record it until restart
 | 
					 | 
				
			||||||
        logEventError('confirmed', error)
 | 
					        logEventError('confirmed', error)
 | 
				
			||||||
      }
 | 
					      }
 | 
				
			||||||
    })
 | 
					    })
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    sub.on('failed', async (payment) => {
 | 
					    sub.on('failed', async (payment) => {
 | 
				
			||||||
      logEvent('failed', payment)
 | 
					      logEvent('failed', payment)
 | 
				
			||||||
      try {
 | 
					      try {
 | 
				
			||||||
        await checkWithdrawal({ data: { hash: payment.id }, ...args })
 | 
					        await checkWithdrawal({ data: { hash: payment.id }, ...args })
 | 
				
			||||||
      } catch (error) {
 | 
					      } catch (error) {
 | 
				
			||||||
      // XXX This is a critical error
 | 
					 | 
				
			||||||
      // It might mean that we failed to record an invoice confirming
 | 
					 | 
				
			||||||
      // and we won't get another chance to record it until restart
 | 
					 | 
				
			||||||
        logEventError('failed', error)
 | 
					        logEventError('failed', error)
 | 
				
			||||||
      }
 | 
					      }
 | 
				
			||||||
    })
 | 
					    })
 | 
				
			||||||
  // ignore payment attempts
 | 
					
 | 
				
			||||||
  sub.on('paying', (attempt) => {})
 | 
					    return sub
 | 
				
			||||||
  sub.on('error', console.error)
 | 
					  })
 | 
				
			||||||
 | 
					
 | 
				
			||||||
  // check pending withdrawals since they might have been paid while worker was down
 | 
					  // check pending withdrawals since they might have been paid while worker was down
 | 
				
			||||||
  await checkPendingWithdrawals(args)
 | 
					  await checkPendingWithdrawals(args)
 | 
				
			||||||
 | 
				
			|||||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user