From 054aeea4252fea402c6612bd201002510b13de1f Mon Sep 17 00:00:00 2001 From: keyan Date: Mon, 17 Jan 2022 11:41:17 -0600 Subject: [PATCH] break out worker into files using HOF --- worker/index.js | 150 +++++------------------------------------------ worker/repin.js | 42 +++++++++++++ worker/trust.js | 7 +++ worker/wallet.js | 88 +++++++++++++++++++++++++++ 4 files changed, 152 insertions(+), 135 deletions(-) create mode 100644 worker/repin.js create mode 100644 worker/trust.js create mode 100644 worker/wallet.js diff --git a/worker/index.js b/worker/index.js index 3b4ac013..0d8d67b4 100644 --- a/worker/index.js +++ b/worker/index.js @@ -1,145 +1,25 @@ const PgBoss = require('pg-boss') const dotenv = require('dotenv') -const serialize = require('../api/resolvers/serial') -const { PrismaClient } = require('@prisma/client') -const { authenticatedLndGrpc, getInvoice, getPayment } = require('ln-service') - dotenv.config({ path: '..' }) - -const boss = new PgBoss(process.env.DATABASE_URL) -const { lnd } = authenticatedLndGrpc({ - cert: process.env.LND_CERT, - macaroon: process.env.LND_MACAROON, - socket: process.env.LND_SOCKET -}) -const models = new PrismaClient() -const walletOptions = { startAfter: 5, retryLimit: 21, retryBackoff: true } - -boss.on('error', error => console.error(error)) +const { PrismaClient } = require('@prisma/client') +const { checkInvoice, checkWithdrawal } = require('./wallet') +const { repin } = require('./repin') +const { trust } = require('./trust') async function work () { + const boss = new PgBoss(process.env.DATABASE_URL) + const models = new PrismaClient() + const args = { boss, models } + + boss.on('error', error => console.error(error)) + await boss.start() - await boss.work('checkInvoice', checkInvoice) - await boss.work('checkWithdrawal', checkWithdrawal) - await boss.work('repin-*', repin) - await boss.work('trust', trust) + await boss.work('checkInvoice', checkInvoice(args)) + await boss.work('checkWithdrawal', checkWithdrawal(args)) + await boss.work('repin-*', repin(args)) + await boss.work('trust', trust(args)) + console.log('working jobs') } -async function trust () { - return null -} - -async function repin ({ name }) { - console.log(name) - // get the id - const id = name.slice('repin-'.length) - if (id.length === 0 || isNaN(id)) { - console.log('repin id not found in', name) - return - } - - // get the latest item with this id - const pinId = Number(id) - const current = await models.item.findFirst( - { - where: { - pinId - }, - orderBy: { - createdAt: 'desc' - } - } - ) - - if (!current) { - console.log('could not find existing item for', name) - return - } - - // create a new item with matching 1) title, text, and url and 2) setting pinId - await models.item.create({ - data: { - title: current.title, - text: current.text, - url: current.url, - userId: current.userId, - pinId - } - }) -} - -async function checkInvoice ({ data: { hash } }) { - let inv - try { - inv = await getInvoice({ id: hash, lnd }) - } catch (err) { - console.log(err) - // on lnd related errors, we manually retry which so we don't exponentially backoff - await boss.send('checkInvoice', { hash }, walletOptions) - return - } - console.log(inv) - - if (inv.is_confirmed) { - await serialize(models, - models.$executeRaw`SELECT confirm_invoice(${inv.id}, ${Number(inv.received_mtokens)})`) - } else if (inv.is_canceled) { - // mark as cancelled - await serialize(models, - models.invoice.update({ - where: { - hash: inv.id - }, - data: { - cancelled: true - } - })) - } else if (new Date(inv.expires_at) > new Date()) { - // not expired, recheck in 5 seconds - await boss.send('checkInvoice', { hash }, walletOptions) - } -} - -async function checkWithdrawal ({ data: { id, hash } }) { - let wdrwl - let notFound = false - try { - wdrwl = await getPayment({ id: hash, lnd }) - } catch (err) { - console.log(err) - if (err[1] === 'SentPaymentNotFound') { - notFound = true - } else { - // on lnd related errors, we manually retry which so we don't exponentially backoff - await boss.send('checkWithdrawal', { id, hash }, walletOptions) - return - } - } - console.log(wdrwl) - - if (wdrwl?.is_confirmed) { - const fee = Number(wdrwl.payment.fee_mtokens) - const paid = Number(wdrwl.payment.mtokens) - fee - await serialize(models, models.$executeRaw` - SELECT confirm_withdrawl(${id}, ${paid}, ${fee})`) - } else if (wdrwl?.is_failed || notFound) { - let status = 'UNKNOWN_FAILURE' - if (wdrwl?.failed.is_insufficient_balance) { - status = 'INSUFFICIENT_BALANCE' - } else if (wdrwl?.failed.is_invalid_payment) { - status = 'INVALID_PAYMENT' - } else if (wdrwl?.failed.is_pathfinding_timeout) { - status = 'PATHFINDING_TIMEOUT' - } else if (wdrwl?.failed.is_route_not_found) { - status = 'ROUTE_NOT_FOUND' - } - await serialize(models, models.$executeRaw` - SELECT reverse_withdrawl(${id}, ${status})`) - } else { - // we need to requeue to check again in 5 seconds - await boss.send('checkWithdrawal', { id, hash }, walletOptions) - } -} - work() diff --git a/worker/repin.js b/worker/repin.js new file mode 100644 index 00000000..ea0e76ce --- /dev/null +++ b/worker/repin.js @@ -0,0 +1,42 @@ +function repin ({ models }) { + return async function ({ name }) { + console.log(name) + // get the id + const id = name.slice('repin-'.length) + if (id.length === 0 || isNaN(id)) { + console.log('repin id not found in', name) + return + } + + // get the latest item with this id + const pinId = Number(id) + const current = await models.item.findFirst( + { + where: { + pinId + }, + orderBy: { + createdAt: 'desc' + } + } + ) + + if (!current) { + console.log('could not find existing item for', name) + return + } + + // create a new item with matching 1) title, text, and url and 2) setting pinId + await models.item.create({ + data: { + title: current.title, + text: current.text, + url: current.url, + userId: current.userId, + pinId + } + }) + } +} + +module.exports = { repin } diff --git a/worker/trust.js b/worker/trust.js new file mode 100644 index 00000000..b34d7c32 --- /dev/null +++ b/worker/trust.js @@ -0,0 +1,7 @@ +function trust ({ boss, models }) { + return async function () { + return null + } +} + +module.exports = { trust } diff --git a/worker/wallet.js b/worker/wallet.js new file mode 100644 index 00000000..15a17381 --- /dev/null +++ b/worker/wallet.js @@ -0,0 +1,88 @@ +const serialize = require('../api/resolvers/serial') +const { authenticatedLndGrpc, getInvoice, getPayment } = require('ln-service') + +const { lnd } = authenticatedLndGrpc({ + cert: process.env.LND_CERT, + macaroon: process.env.LND_MACAROON, + socket: process.env.LND_SOCKET +}) +const walletOptions = { startAfter: 5, retryLimit: 21, retryBackoff: true } + +function checkInvoice ({ boss, models }) { + return async function ({ data: { hash } }) { + let inv + try { + inv = await getInvoice({ id: hash, lnd }) + } catch (err) { + console.log(err) + // on lnd related errors, we manually retry which so we don't exponentially backoff + await boss.send('checkInvoice', { hash }, walletOptions) + return + } + console.log(inv) + + if (inv.is_confirmed) { + await serialize(models, + models.$executeRaw`SELECT confirm_invoice(${inv.id}, ${Number(inv.received_mtokens)})`) + } else if (inv.is_canceled) { + // mark as cancelled + await serialize(models, + models.invoice.update({ + where: { + hash: inv.id + }, + data: { + cancelled: true + } + })) + } else if (new Date(inv.expires_at) > new Date()) { + // not expired, recheck in 5 seconds + await boss.send('checkInvoice', { hash }, walletOptions) + } + } +} + +function checkWithdrawal ({ boss, models }) { + return async function ({ data: { id, hash } }) { + let wdrwl + let notFound = false + try { + wdrwl = await getPayment({ id: hash, lnd }) + } catch (err) { + console.log(err) + if (err[1] === 'SentPaymentNotFound') { + notFound = true + } else { + // on lnd related errors, we manually retry which so we don't exponentially backoff + await boss.send('checkWithdrawal', { id, hash }, walletOptions) + return + } + } + console.log(wdrwl) + + if (wdrwl?.is_confirmed) { + const fee = Number(wdrwl.payment.fee_mtokens) + const paid = Number(wdrwl.payment.mtokens) - fee + await serialize(models, models.$executeRaw` + SELECT confirm_withdrawl(${id}, ${paid}, ${fee})`) + } else if (wdrwl?.is_failed || notFound) { + let status = 'UNKNOWN_FAILURE' + if (wdrwl?.failed.is_insufficient_balance) { + status = 'INSUFFICIENT_BALANCE' + } else if (wdrwl?.failed.is_invalid_payment) { + status = 'INVALID_PAYMENT' + } else if (wdrwl?.failed.is_pathfinding_timeout) { + status = 'PATHFINDING_TIMEOUT' + } else if (wdrwl?.failed.is_route_not_found) { + status = 'ROUTE_NOT_FOUND' + } + await serialize(models, models.$executeRaw` + SELECT reverse_withdrawl(${id}, ${status})`) + } else { + // we need to requeue to check again in 5 seconds + await boss.send('checkWithdrawal', { id, hash }, walletOptions) + } + } +} + +module.exports = { checkInvoice, checkWithdrawal }