Use LND subscriptions (#726)

* Use parallel invoice subscriptions

* Fix missing idempotency

* Log error

* Use cursor for invoice subscription

* Subscribe to outgoing payments for withdrawals

* Add TODO comments regarding migration to LND subscriptions

* Also use isPoll variable in checkInvoice

* Queue status check of pending withdrawals

* Use for loop to check pending withdrawals

* Reconnect to LND gRPC API on error

* Fix hash modified of applied migrations

* Separate wallet code from worker index

* refactor subscription code some more

* remove unnecessary subWrapper abstraction
* move all wallet related code into worker/wallet.js such that only a single import is needed in worker/index.js

* Migrate from polling to LND subscriptions

* Remove unnecessary reconnect code

* Add FIXME

* Add listener for HODL invoice updates

* Remove obsolete comment

* Update README

* Add job to cancel hodl invoice if expired

* Fix missing else

* small bug fixes and readability enhancements

* refine and add periodic redundant deposit/withdrawal checks

---------

Co-authored-by: ekzyis <ek@stacker.news>
Co-authored-by: Keyan <34140557+huumn@users.noreply.github.com>
Co-authored-by: keyan <keyan.kousha+huumn@gmail.com>
This commit is contained in:
ekzyis 2024-01-08 23:37:58 +01:00 committed by GitHub
parent cb076eca77
commit 2151323c8d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 314 additions and 69 deletions

View File

@ -34,7 +34,7 @@ To configure the image proxy, you will need to set the `IMGPROXY_` env vars. `NE
The site is written in javascript using Next.js, a React framework. The backend API is provided via GraphQL. The database is PostgreSQL modeled with Prisma. The job queue is also maintained in PostgreSQL. We use lnd for our lightning node. A customized Bootstrap theme is used for styling.
# processes
There are two. 1. the web app and 2. the worker, which dequeues jobs sent to it by the web app, e.g. polling lnd for invoice/payment status
There are two. 1. the web app and 2. the worker, which dequeues jobs sent to it by the web app, e.g. processing images.
# wallet transaction safety
To ensure stackers balances are kept sane, all wallet updates are run in serializable transactions at the database level. Because prisma has relatively poor support for transactions all wallet touching code is written in plpgsql stored procedures and can be found in the prisma/migrations folder.

View File

@ -77,9 +77,8 @@ export async function serializeInvoicable (query, { models, lnd, hash, hmac, me,
if (hash) {
invoice = await checkInvoice(models, hash, hmac, enforceFee)
trx = [
models.$queryRaw`UPDATE users SET msats = msats + ${invoice.msatsReceived} WHERE id = ${invoice.user.id}`,
...trx,
models.invoice.update({ where: { hash: invoice.hash }, data: { confirmedAt: new Date() } })
models.$executeRaw`SELECT confirm_invoice(${hash}, ${invoice.msatsReceived})`,
...trx
]
}

View File

@ -297,12 +297,10 @@ export default {
console.log('invoice', balanceLimit)
const [inv] = await serialize(models,
models.$queryRaw`SELECT * FROM create_invoice(${invoice.id}, ${invoice.request},
models.$queryRaw`SELECT * FROM create_invoice(${invoice.id}, ${hodlInvoice ? invoice.secret : null}::TEXT, ${invoice.request},
${expiresAt}::timestamp, ${amount * 1000}, ${user.id}::INTEGER, ${description}, NULL, NULL,
${invLimit}::INTEGER, ${balanceLimit})`)
if (hodlInvoice) await models.invoice.update({ where: { hash: invoice.id }, data: { preimage: invoice.secret } })
// the HMAC is only returned during invoice creation
// this makes sure that only the person who created this invoice
// has access to the HMAC

View File

@ -31,10 +31,10 @@ export const MAX_TERRITORY_DESC_LENGTH = 1000 // 1k
export const MAX_POLL_CHOICE_LENGTH = 40
export const ITEM_SPAM_INTERVAL = '10m'
export const ANON_ITEM_SPAM_INTERVAL = '0'
export const INV_PENDING_LIMIT = 10
export const INV_PENDING_LIMIT = 100
export const BALANCE_LIMIT_MSATS = 250000000 // 250k sat
export const SN_USER_IDS = [616, 6030, 946, 4502]
export const ANON_INV_PENDING_LIMIT = 100
export const ANON_INV_PENDING_LIMIT = 1000
export const ANON_BALANCE_LIMIT_MSATS = 0 // disable
export const MAX_POLL_NUM_CHOICES = 10
export const MIN_POLL_NUM_CHOICES = 2

View File

@ -81,7 +81,7 @@ export default async ({ query: { username, amount, nostr, comment, payerdata: pa
})
await serialize(models,
models.$queryRaw`SELECT * FROM create_invoice(${invoice.id}, ${invoice.request},
models.$queryRaw`SELECT * FROM create_invoice(${invoice.id}, NULL, ${invoice.request},
${expiresAt}::timestamp, ${Number(amount)}, ${user.id}::INTEGER, ${noteStr || description},
${comment || null}, ${parsedPayerData || null}::JSONB, ${INV_PENDING_LIMIT}::INTEGER, ${BALANCE_LIMIT_MSATS})`)

View File

@ -0,0 +1,5 @@
-- AlterTable
ALTER TABLE "Invoice" ADD COLUMN "confirmedIndex" BIGINT;
-- CreateIndex
CREATE INDEX "Invoice.confirmedIndex_index" ON "Invoice"("confirmedIndex");

View File

@ -0,0 +1,98 @@
-- remove 'checkInvoice' job insertion since we're using LND subscriptions now
-- also allow function to take preimage as an argument
DROP FUNCTION IF EXISTS create_invoice(hash TEXT, bolt11 TEXT, expires_at timestamp(3) without time zone,
msats_req BIGINT, user_id INTEGER, idesc TEXT, comment TEXT, lud18_data JSONB, inv_limit INTEGER, balance_limit_msats BIGINT);
CREATE OR REPLACE FUNCTION create_invoice(hash TEXT, preimage TEXT, bolt11 TEXT, expires_at timestamp(3) without time zone,
msats_req BIGINT, user_id INTEGER, idesc TEXT, comment TEXT, lud18_data JSONB, inv_limit INTEGER, balance_limit_msats BIGINT)
RETURNS "Invoice"
LANGUAGE plpgsql
AS $$
DECLARE
invoice "Invoice";
inv_limit_reached BOOLEAN;
balance_limit_reached BOOLEAN;
inv_pending_msats BIGINT;
BEGIN
PERFORM ASSERT_SERIALIZED();
-- prevent too many pending invoices
SELECT inv_limit > 0 AND count(*) >= inv_limit, COALESCE(sum("msatsRequested"), 0) INTO inv_limit_reached, inv_pending_msats
FROM "Invoice"
WHERE "userId" = user_id AND "expiresAt" > now_utc() AND "confirmedAt" IS NULL AND cancelled = false;
IF inv_limit_reached THEN
RAISE EXCEPTION 'SN_INV_PENDING_LIMIT';
END IF;
-- prevent pending invoices + msats from exceeding the limit
SELECT balance_limit_msats > 0 AND inv_pending_msats+msats_req+msats > balance_limit_msats INTO balance_limit_reached
FROM users
WHERE id = user_id;
IF balance_limit_reached THEN
RAISE EXCEPTION 'SN_INV_EXCEED_BALANCE';
END IF;
-- we good, proceed frens
INSERT INTO "Invoice" (hash, preimage, bolt11, "expiresAt", "msatsRequested", "userId", created_at, updated_at, "desc", comment, "lud18Data")
VALUES (hash, preimage, bolt11, expires_at, msats_req, user_id, now_utc(), now_utc(), idesc, comment, lud18_data) RETURNING * INTO invoice;
IF preimage IS NOT NULL THEN
INSERT INTO pgboss.job (name, data, retrylimit, retrybackoff, startafter)
VALUES ('finalizeHodlInvoice', jsonb_build_object('hash', hash), 21, true, expires_at);
END IF;
RETURN invoice;
END;
$$;
-- remove 'checkWithdrawal' job insertion since we're using LND subscriptions now
CREATE OR REPLACE FUNCTION create_withdrawl(lnd_id TEXT, invoice TEXT, msats_amount BIGINT, msats_max_fee BIGINT, username TEXT)
RETURNS "Withdrawl"
LANGUAGE plpgsql
AS $$
DECLARE
user_id INTEGER;
user_msats BIGINT;
withdrawl "Withdrawl";
BEGIN
PERFORM ASSERT_SERIALIZED();
SELECT msats, id INTO user_msats, user_id FROM users WHERE name = username;
IF (msats_amount + msats_max_fee) > user_msats THEN
RAISE EXCEPTION 'SN_INSUFFICIENT_FUNDS';
END IF;
IF EXISTS (SELECT 1 FROM "Withdrawl" WHERE hash = lnd_id AND status IS NULL) THEN
RAISE EXCEPTION 'SN_PENDING_WITHDRAWL_EXISTS';
END IF;
IF EXISTS (SELECT 1 FROM "Withdrawl" WHERE hash = lnd_id AND status = 'CONFIRMED') THEN
RAISE EXCEPTION 'SN_CONFIRMED_WITHDRAWL_EXISTS';
END IF;
INSERT INTO "Withdrawl" (hash, bolt11, "msatsPaying", "msatsFeePaying", "userId", created_at, updated_at)
VALUES (lnd_id, invoice, msats_amount, msats_max_fee, user_id, now_utc(), now_utc()) RETURNING * INTO withdrawl;
UPDATE users SET msats = msats - msats_amount - msats_max_fee WHERE id = user_id;
RETURN withdrawl;
END;
$$;
CREATE OR REPLACE FUNCTION check_invoices_and_withdrawals()
RETURNS INTEGER
LANGUAGE plpgsql
AS $$
DECLARE
BEGIN
INSERT INTO pgboss.schedule (name, cron, timezone) VALUES ('checkPendingDeposits', '*/10 * * * *', 'America/Chicago') ON CONFLICT DO NOTHING;
INSERT INTO pgboss.schedule (name, cron, timezone) VALUES ('checkPendingWithdrawals', '*/10 * * * *', 'America/Chicago') ON CONFLICT DO NOTHING;
return 0;
EXCEPTION WHEN OTHERS THEN
return 0;
END;
$$;
SELECT check_invoices_and_withdrawals();
DROP FUNCTION check_invoices_and_withdrawals();

View File

@ -540,6 +540,7 @@ model Invoice {
bolt11 String
expiresAt DateTime
confirmedAt DateTime?
confirmedIndex BigInt?
cancelled Boolean @default(false)
msatsRequested BigInt
msatsReceived BigInt?
@ -550,6 +551,7 @@ model Invoice {
@@index([createdAt], map: "Invoice.created_at_index")
@@index([userId], map: "Invoice.userId_index")
@@index([confirmedIndex], map: "Invoice.confirmedIndex_index")
}
model Withdrawl {

View File

@ -1,7 +1,7 @@
import PgBoss from 'pg-boss'
import nextEnv from '@next/env'
import { PrismaClient } from '@prisma/client'
import { checkInvoice, checkWithdrawal, autoDropBolt11s } from './wallet.js'
import { autoDropBolt11s, checkPendingDeposits, checkPendingWithdrawals, finalizeHodlInvoice, subscribeToWallet } from './wallet.js'
import { repin } from './repin.js'
import { trust } from './trust.js'
import { auction } from './auction.js'
@ -71,8 +71,11 @@ async function work () {
}
await boss.start()
await boss.work('checkInvoice', jobWrapper(checkInvoice))
await boss.work('checkWithdrawal', jobWrapper(checkWithdrawal))
await subscribeToWallet(args)
await boss.work('finalizeHodlInvoice', jobWrapper(finalizeHodlInvoice))
await boss.work('checkPendingDeposits', jobWrapper(checkPendingDeposits))
await boss.work('checkPendingWithdrawals', jobWrapper(checkPendingWithdrawals))
await boss.work('autoDropBolt11s', jobWrapper(autoDropBolt11s))
await boss.work('repin-*', jobWrapper(repin))
await boss.work('trust', jobWrapper(trust))

View File

@ -1,47 +1,130 @@
import serialize from '../api/resolvers/serial.js'
import { getInvoice, getPayment, cancelHodlInvoice } from 'ln-service'
import { datePivot } from '../lib/time.js'
import {
getInvoice, getPayment, cancelHodlInvoice,
subscribeToInvoices, subscribeToPayments, subscribeToInvoice
} from 'ln-service'
import { sendUserNotification } from '../api/webPush/index.js'
import { msatsToSats, numWithUnits } from '../lib/format'
import { INVOICE_RETENTION_DAYS } from '../lib/constants'
import { sleep } from '../lib/time.js'
const walletOptions = { startAfter: 5, retryLimit: 21, retryBackoff: true }
export async function subscribeToWallet (args) {
await subscribeToDeposits(args)
await subscribeToWithdrawals(args)
}
// TODO this should all be done via websockets
export async function checkInvoice ({ data: { hash, isHeldSet }, boss, models, lnd }) {
let inv
const logEvent = (name, args) => console.log(`event ${name} triggered with args`, args)
const logEventError = (name, error) => console.error(`error running ${name}`, error)
async function subscribeToDeposits (args) {
const { models, lnd } = args
const [lastConfirmed] = await models.$queryRaw`
SELECT "confirmedIndex"
FROM "Invoice"
ORDER BY "confirmedIndex" DESC NULLS LAST
LIMIT 1`
// https://www.npmjs.com/package/ln-service#subscribetoinvoices
const sub = subscribeToInvoices({ lnd, confirmed_after: lastConfirmed?.confirmedIndex })
sub.on('invoice_updated', async (inv) => {
try {
inv = await getInvoice({ id: hash, lnd })
} catch (err) {
console.log(err, hash)
// on lnd related errors, we manually retry so we don't exponentially backoff
await boss.send('checkInvoice', { hash }, walletOptions)
if (inv.secret) {
logEvent('invoice_updated', inv)
await checkInvoice({ data: { hash: inv.id }, ...args })
} else {
// this is a HODL invoice. We need to use SubscribeToInvoice which has is_held transitions
// https://api.lightning.community/api/lnd/invoices/subscribe-single-invoice
// SubscribeToInvoices is only for invoice creation and settlement transitions
// https://api.lightning.community/api/lnd/lightning/subscribe-invoices
await subscribeToHodlInvoice({ hash: inv.id, ...args })
}
} catch (error) {
// XXX This is a critical error
// It might mean that we failed to record an invoice confirming
// and we won't get another chance to record it until restart
logEventError('invoice_updated', error)
}
})
sub.on('error', console.error)
// check pending deposits as a redundancy in case we failed to record
// an invoice_updated event
await checkPendingDeposits(args)
}
async function subscribeToHodlInvoice (args) {
const { lnd, hash, models } = args
let sub
try {
await new Promise((resolve, reject) => {
// https://www.npmjs.com/package/ln-service#subscribetoinvoice
sub = subscribeToInvoice({ id: hash, lnd })
sub.on('invoice_updated', async (inv) => {
logEvent('hodl_invoice_updated', inv)
try {
// record the is_held transition
if (inv.is_held) {
// this is basically confirm_invoice without setting confirmed_at
// and without setting the user balance
// those will be set when the invoice is settled by user action
await models.invoice.update({
where: { hash },
data: {
msatsReceived: Number(inv.received_mtokens),
isHeld: true
}
})
// after that we can stop listening for updates
resolve()
}
} catch (error) {
logEventError('hodl_invoice_updated', error)
reject(error)
}
})
sub.on('error', reject)
})
} finally {
sub?.removeAllListeners()
}
}
async function checkInvoice ({ data: { hash }, boss, models, lnd }) {
const inv = await getInvoice({ id: hash, lnd })
// invoice could be created by LND but wasn't inserted into the database yet
// this is expected and the function will be called again with the updates
const dbInv = await models.invoice.findUnique({ where: { hash } })
if (!dbInv) {
console.log('invoice not found in database', hash)
return
}
console.log(inv)
// check if invoice still exists since HODL invoices get deleted after usage
const dbInv = await models.invoice.findUnique({ where: { hash } })
if (!dbInv) return
const expired = new Date(inv.expires_at) <= new Date()
if (inv.is_confirmed && !inv.is_held) {
// never mark hodl invoices as confirmed here because
// we manually confirm them when we settle them
if (inv.is_confirmed) {
// NOTE: confirm invoice prevents double confirmations (idempotent)
// ALSO: is_confirmed and is_held are mutually exclusive
// that is, a hold invoice will first be is_held but not is_confirmed
// and once it's settled it will be is_confirmed but not is_held
await serialize(models,
models.$executeRaw`SELECT confirm_invoice(${inv.id}, ${Number(inv.received_mtokens)})`)
models.$executeRaw`SELECT confirm_invoice(${inv.id}, ${Number(inv.received_mtokens)})`,
models.invoice.update({ where: { hash }, data: { confirmedIndex: inv.confirmed_index } })
)
// don't send notifications for hodl invoices
if (dbInv.preimage) return
sendUserNotification(dbInv.userId, {
title: `${numWithUnits(msatsToSats(inv.received_mtokens), { abbreviate: false })} were deposited in your account`,
body: dbInv.comment || undefined,
tag: 'DEPOSIT',
data: { sats: msatsToSats(inv.received_mtokens) }
}).catch(console.error)
return boss.send('nip57', { hash })
return await boss.send('nip57', { hash })
}
if (inv.is_canceled) {
return serialize(models,
return await serialize(models,
models.invoice.update({
where: {
hash: inv.id
@ -51,41 +134,62 @@ export async function checkInvoice ({ data: { hash, isHeldSet }, boss, models, l
}
}))
}
if (inv.is_held && !isHeldSet) {
// this is basically confirm_invoice without setting confirmed_at since it's not settled yet
// and without setting the user balance since that's done inside the same tx as the HODL invoice action.
await serialize(models,
models.invoice.update({ where: { hash }, data: { msatsReceived: Number(inv.received_mtokens), isHeld: true } }))
// remember that we already executed this if clause
// (even though the query above is idempotent but imo, this makes the flow more clear)
isHeldSet = true
}
if (!expired) {
// recheck in 5 seconds if the invoice is younger than 5 minutes
// otherwise recheck in 60 seconds
const startAfter = new Date(inv.created_at) > datePivot(new Date(), { minutes: -5 }) ? 5 : 60
await boss.send('checkInvoice', { hash, isHeldSet }, { ...walletOptions, startAfter })
}
if (expired && inv.is_held) {
await cancelHodlInvoice({ id: hash, lnd })
}
}
export async function checkWithdrawal ({ data: { id, hash }, boss, models, lnd }) {
async function subscribeToWithdrawals (args) {
const { lnd } = args
// https://www.npmjs.com/package/ln-service#subscribetopayments
const sub = subscribeToPayments({ lnd })
sub.on('confirmed', async (payment) => {
logEvent('confirmed', payment)
try {
await checkWithdrawal({ data: { hash: payment.id }, ...args })
} catch (error) {
// XXX This is a critical error
// It might mean that we failed to record an invoice confirming
// and we won't get another chance to record it until restart
logEventError('confirmed', error)
}
})
sub.on('failed', async (payment) => {
logEvent('failed', payment)
try {
await checkWithdrawal({ data: { hash: payment.id }, ...args })
} catch (error) {
// XXX This is a critical error
// It might mean that we failed to record an invoice confirming
// and we won't get another chance to record it until restart
logEventError('failed', error)
}
})
// ignore payment attempts
sub.on('paying', (attempt) => {})
sub.on('error', console.error)
// check pending withdrawals since they might have been paid while worker was down
await checkPendingWithdrawals(args)
}
async function checkWithdrawal ({ data: { hash }, boss, models, lnd }) {
const dbWdrwl = await models.withdrawl.findFirst({ where: { hash } })
if (!dbWdrwl) {
// [WARNING] LND paid an invoice that wasn't created via the SN GraphQL API.
// >>> an adversary might be draining our funds right now <<<
console.error('unexpected outgoing payment detected:', hash)
// TODO: log this in Slack
return
}
let wdrwl
let notFound = false
try {
wdrwl = await getPayment({ id: hash, lnd })
} catch (err) {
console.log(err)
if (err[1] === 'SentPaymentNotFound') {
notFound = true
} else {
// on lnd related errors, we manually retry so we don't exponentially backoff
await boss.send('checkWithdrawal', { id, hash }, walletOptions)
console.error('error getting payment', err)
return
}
}
@ -94,7 +198,7 @@ export async function checkWithdrawal ({ data: { id, hash }, boss, models, lnd }
const fee = Number(wdrwl.payment.fee_mtokens)
const paid = Number(wdrwl.payment.mtokens) - fee
await serialize(models, models.$executeRaw`
SELECT confirm_withdrawl(${id}::INTEGER, ${paid}, ${fee})`)
SELECT confirm_withdrawl(${dbWdrwl.id}::INTEGER, ${paid}, ${fee})`)
} else if (wdrwl?.is_failed || notFound) {
let status = 'UNKNOWN_FAILURE'
if (wdrwl?.failed.is_insufficient_balance) {
@ -106,12 +210,11 @@ export async function checkWithdrawal ({ data: { id, hash }, boss, models, lnd }
} else if (wdrwl?.failed.is_route_not_found) {
status = 'ROUTE_NOT_FOUND'
}
await serialize(models, models.$executeRaw`
SELECT reverse_withdrawl(${id}::INTEGER, ${status}::"WithdrawlStatus")`)
} else {
// we need to requeue to check again in 5 seconds
const startAfter = new Date(wdrwl.created_at) > datePivot(new Date(), { minutes: -5 }) ? 5 : 60
await boss.send('checkWithdrawal', { id, hash }, { ...walletOptions, startAfter })
await serialize(models,
models.$executeRaw`
SELECT reverse_withdrawl(${dbWdrwl.id}::INTEGER, ${status}::"WithdrawlStatus")`
)
}
}
@ -124,3 +227,40 @@ export async function autoDropBolt11s ({ models }) {
AND hash IS NOT NULL;`
)
}
// The callback subscriptions above will NOT get called for HODL invoices that are already paid.
// So we manually cancel the HODL invoice here if it wasn't settled by user action
export async function finalizeHodlInvoice ({ data: { hash }, models, lnd }) {
const inv = await getInvoice({ id: hash, lnd })
if (inv.is_confirmed) {
return
}
await cancelHodlInvoice({ id: hash, lnd })
}
export async function checkPendingDeposits (args) {
const { models } = args
const pendingDeposits = await models.invoice.findMany({ where: { confirmedAt: null, cancelled: false } })
for (const d of pendingDeposits) {
try {
await checkInvoice({ data: { id: d.id, hash: d.hash }, ...args })
await sleep(10)
} catch {
console.error('error checking invoice', d.hash)
}
}
}
export async function checkPendingWithdrawals (args) {
const { models } = args
const pendingWithdrawals = await models.withdrawl.findMany({ where: { status: null } })
for (const w of pendingWithdrawals) {
try {
await checkWithdrawal({ data: { id: w.id, hash: w.hash }, ...args })
await sleep(10)
} catch {
console.error('error checking withdrawal', w.hash)
}
}
}