run noncritical side effects outside critical path of paid action (#1464)

* run noncritical side effects outside critical path of paid action

* fix item fetching of zap side effect

* fix vapid pubkey env var name in readme
This commit is contained in:
Keyan 2024-10-08 11:48:19 -05:00 committed by GitHub
parent 4532e00085
commit fec7c92fd9
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 64 additions and 27 deletions

View File

@ -429,7 +429,7 @@ GITHUB_SECRET=<Client secret>
## Enabling web push notifications ## Enabling web push notifications
To enable Web Push locally, you will need to set the `VAPID_*` env vars. `VAPID_MAILTO` needs to be an email address using the `mailto:` scheme. For `NEXT_PUBLIC_VAPID_KEY` and `VAPID_PRIVKEY`, you can run `npx web-push generate-vapid-keys`. To enable Web Push locally, you will need to set the `VAPID_*` env vars. `VAPID_MAILTO` needs to be an email address using the `mailto:` scheme. For `NEXT_PUBLIC_VAPID_PUBKEY` and `VAPID_PRIVKEY`, you can run `npx web-push generate-vapid-keys`.
<br> <br>

View File

@ -154,7 +154,11 @@ All functions have the following signature: `function(args: Object, context: Obj
- it can optionally store in the invoice with the `invoiceId` the `actionId` to be able to link the action with the invoice regardless of retries - it can optionally store in the invoice with the `invoiceId` the `actionId` to be able to link the action with the invoice regardless of retries
- `onPaid`: called when the action is paid - `onPaid`: called when the action is paid
- if the action does not support optimism, this function is optional - if the action does not support optimism, this function is optional
- this function should be used to mark the rows created in `perform` as `PAID` and perform any other side effects of the action (like notifications or denormalizations) - this function should be used to mark the rows created in `perform` as `PAID` and perform critical side effects of the action (like denormalizations)
- `nonCriticalSideEffects`: called after the action is paid to run any side effects whose failure does not affect the action's execution
- this function is always optional
- it's passed the result of the action (or the action's paid invoice) and the current context
- this is where things like push notifications should be handled
- `onFail`: called when the action fails - `onFail`: called when the action fails
- if the action does not support optimism, this function is optional - if the action does not support optimism, this function is optional
- this function should be used to mark the rows created in `perform` as `FAILED` - this function should be used to mark the rows created in `perform` as `FAILED`

View File

@ -38,7 +38,7 @@ export async function retry ({ invoiceId, newInvoiceId }, { tx, cost }) {
return { id, sats: msatsToSats(cost), act: 'BOOST', path } return { id, sats: msatsToSats(cost), act: 'BOOST', path }
} }
export async function onPaid ({ invoice, actId }, { models, tx }) { export async function onPaid ({ invoice, actId }, { tx }) {
let itemAct let itemAct
if (invoice) { if (invoice) {
await tx.itemAct.updateMany({ await tx.itemAct.updateMany({

View File

@ -99,7 +99,7 @@ async function performFeeCreditAction (actionType, args, context) {
const { me, models, cost } = context const { me, models, cost } = context
const action = paidActions[actionType] const action = paidActions[actionType]
return await models.$transaction(async tx => { const result = await models.$transaction(async tx => {
context.tx = tx context.tx = tx
await tx.user.update({ await tx.user.update({
@ -121,6 +121,11 @@ async function performFeeCreditAction (actionType, args, context) {
paymentMethod: 'FEE_CREDIT' paymentMethod: 'FEE_CREDIT'
} }
}, { isolationLevel: Prisma.TransactionIsolationLevel.ReadCommitted }) }, { isolationLevel: Prisma.TransactionIsolationLevel.ReadCommitted })
// run non critical side effects in the background
// after the transaction has been committed
action.nonCriticalSideEffects?.(result.result, context).catch(console.error)
return result
} }
async function performOptimisticAction (actionType, args, context) { async function performOptimisticAction (actionType, args, context) {

View File

@ -154,15 +154,13 @@ export async function retry ({ invoiceId, newInvoiceId }, { tx }) {
} }
export async function onPaid ({ invoice, id }, context) { export async function onPaid ({ invoice, id }, context) {
const { models, tx } = context const { tx } = context
let item let item
if (invoice) { if (invoice) {
item = await tx.item.findFirst({ item = await tx.item.findFirst({
where: { invoiceId: invoice.id }, where: { invoiceId: invoice.id },
include: { include: {
mentions: true,
itemReferrers: { include: { refereeItem: true } },
user: true user: true
} }
}) })
@ -173,8 +171,6 @@ export async function onPaid ({ invoice, id }, context) {
item = await tx.item.findUnique({ item = await tx.item.findUnique({
where: { id }, where: { id },
include: { include: {
mentions: true,
itemReferrers: { include: { refereeItem: true } },
user: true, user: true,
itemUploads: { include: { upload: true } } itemUploads: { include: { upload: true } }
} }
@ -224,17 +220,30 @@ export async function onPaid ({ invoice, id }, context) {
SELECT comment.created_at, comment.updated_at, ancestors.id, ancestors."userId", SELECT comment.created_at, comment.updated_at, ancestors.id, ancestors."userId",
comment.id, comment."userId", nlevel(comment.path) - nlevel(ancestors.path) comment.id, comment."userId", nlevel(comment.path) - nlevel(ancestors.path)
FROM ancestors, comment` FROM ancestors, comment`
}
}
export async function nonCriticalSideEffects ({ invoice, id }, { models }) {
const item = await models.item.findFirst({
where: invoice ? { invoiceId: invoice.id } : { id: parseInt(id) },
include: {
mentions: true,
itemReferrers: { include: { refereeItem: true } },
user: true
}
})
if (item.parentId) {
notifyItemParents({ item, models }).catch(console.error) notifyItemParents({ item, models }).catch(console.error)
} }
for (const { userId } of item.mentions) { for (const { userId } of item.mentions) {
notifyMention({ models, item, userId }).catch(console.error) notifyMention({ models, item, userId }).catch(console.error)
} }
for (const { refereeItem } of item.itemReferrers) { for (const { refereeItem } of item.itemReferrers) {
notifyItemMention({ models, referrerItem: item, refereeItem }).catch(console.error) notifyItemMention({ models, referrerItem: item, refereeItem }).catch(console.error)
} }
notifyUserSubscribers({ models: tx, item }).catch(console.error)
notifyUserSubscribers({ models, item }).catch(console.error)
notifyTerritorySubscribers({ models, item }).catch(console.error) notifyTerritorySubscribers({ models, item }).catch(console.error)
} }

View File

@ -24,7 +24,7 @@ export async function getCost ({ id, boost = 0, uploadIds, bio }, { me, models }
export async function perform (args, context) { export async function perform (args, context) {
const { id, boost = 0, uploadIds = [], options: pollOptions = [], forwardUsers: itemForwards = [], ...data } = args const { id, boost = 0, uploadIds = [], options: pollOptions = [], forwardUsers: itemForwards = [], ...data } = args
const { tx, me, models } = context const { tx, me } = context
const old = await tx.item.findUnique({ const old = await tx.item.findUnique({
where: { id: parseInt(id) }, where: { id: parseInt(id) },
include: { include: {
@ -63,12 +63,8 @@ export async function perform (args, context) {
// we put boost in the where clause because we don't want to update the boost // we put boost in the where clause because we don't want to update the boost
// if it has changed concurrently // if it has changed concurrently
const item = await tx.item.update({ await tx.item.update({
where: { id: parseInt(id), boost: old.boost }, where: { id: parseInt(id), boost: old.boost },
include: {
mentions: true,
itemReferrers: { include: { refereeItem: true } }
},
data: { data: {
...data, ...data,
boost: { boost: {
@ -151,6 +147,21 @@ export async function perform (args, context) {
await performBotBehavior(args, context) await performBotBehavior(args, context)
// ltree is unsupported in Prisma, so we have to query it manually (FUCK!)
return (await tx.$queryRaw`
SELECT *, ltree2text(path) AS path, created_at AS "createdAt", updated_at AS "updatedAt"
FROM "Item" WHERE id = ${parseInt(id)}::INTEGER`
)[0]
}
export async function nonCriticalSideEffects ({ invoice, id }, { models }) {
const item = await models.item.findFirst({
where: invoice ? { invoiceId: invoice.id } : { id: parseInt(id) },
include: {
mentions: true,
itemReferrers: { include: { refereeItem: true } }
}
})
// compare timestamps to only notify if mention or item referral was just created to avoid duplicates on edits // compare timestamps to only notify if mention or item referral was just created to avoid duplicates on edits
for (const { userId, createdAt } of item.mentions) { for (const { userId, createdAt } of item.mentions) {
if (item.updatedAt.getTime() !== createdAt.getTime()) continue if (item.updatedAt.getTime() !== createdAt.getTime()) continue
@ -160,12 +171,6 @@ export async function perform (args, context) {
if (item.updatedAt.getTime() !== createdAt.getTime()) continue if (item.updatedAt.getTime() !== createdAt.getTime()) continue
notifyItemMention({ models, referrerItem: item, refereeItem }).catch(console.error) notifyItemMention({ models, referrerItem: item, refereeItem }).catch(console.error)
} }
// ltree is unsupported in Prisma, so we have to query it manually (FUCK!)
return (await tx.$queryRaw`
SELECT *, ltree2text(path) AS path, created_at AS "createdAt", updated_at AS "updatedAt"
FROM "Item" WHERE id = ${parseInt(id)}::INTEGER`
)[0]
} }
export async function describe ({ id, parentId }, context) { export async function describe ({ id, parentId }, context) {

View File

@ -64,7 +64,7 @@ export async function retry ({ invoiceId, newInvoiceId }, { tx, cost }) {
return { id, sats: msatsToSats(cost), act: 'TIP', path } return { id, sats: msatsToSats(cost), act: 'TIP', path }
} }
export async function onPaid ({ invoice, actIds }, { models, tx }) { export async function onPaid ({ invoice, actIds }, { tx }) {
let acts let acts
if (invoice) { if (invoice) {
await tx.itemAct.updateMany({ await tx.itemAct.updateMany({
@ -114,7 +114,7 @@ export async function onPaid ({ invoice, actIds }, { models, tx }) {
// perform denomormalized aggregates: weighted votes, upvotes, msats, lastZapAt // perform denomormalized aggregates: weighted votes, upvotes, msats, lastZapAt
// NOTE: for the rows that might be updated by a concurrent zap, we use UPDATE for implicit locking // NOTE: for the rows that might be updated by a concurrent zap, we use UPDATE for implicit locking
const [item] = await tx.$queryRaw` await tx.$queryRaw`
WITH zapper AS ( WITH zapper AS (
SELECT trust FROM users WHERE id = ${itemAct.userId}::INTEGER SELECT trust FROM users WHERE id = ${itemAct.userId}::INTEGER
), zap AS ( ), zap AS (
@ -163,8 +163,14 @@ export async function onPaid ({ invoice, actIds }, { models, tx }) {
SET "commentMsats" = "Item"."commentMsats" + ${msats}::BIGINT SET "commentMsats" = "Item"."commentMsats" + ${msats}::BIGINT
FROM zapped FROM zapped
WHERE "Item".path @> zapped.path AND "Item".id <> zapped.id` WHERE "Item".path @> zapped.path AND "Item".id <> zapped.id`
}
notifyZapped({ models, item }).catch(console.error) export async function nonCriticalSideEffects ({ invoice, actIds }, { models }) {
const itemAct = await models.itemAct.findFirst({
where: invoice ? { invoiceId: invoice.id } : { id: { in: actIds } },
include: { item: true }
})
notifyZapped({ models, item: itemAct.item }).catch(console.error)
} }
export async function onFail ({ invoice }, { tx }) { export async function onFail ({ invoice }, { tx }) {

View File

@ -134,7 +134,7 @@ async function performPessimisticAction ({ lndInvoice, dbInvoice, tx, models, ln
} }
export async function paidActionPaid ({ data: { invoiceId, ...args }, models, lnd, boss }) { export async function paidActionPaid ({ data: { invoiceId, ...args }, models, lnd, boss }) {
return await transitionInvoice('paidActionPaid', { const transitionedInvoice = await transitionInvoice('paidActionPaid', {
invoiceId, invoiceId,
fromState: ['HELD', 'PENDING', 'FORWARDED'], fromState: ['HELD', 'PENDING', 'FORWARDED'],
toState: 'PAID', toState: 'PAID',
@ -156,6 +156,14 @@ export async function paidActionPaid ({ data: { invoiceId, ...args }, models, ln
}, },
...args ...args
}, { models, lnd, boss }) }, { models, lnd, boss })
if (transitionedInvoice) {
// run non critical side effects in the background
// after the transaction has been committed
paidActions[transitionedInvoice.actionType]
.nonCriticalSideEffects?.({ invoice: transitionedInvoice }, { models, lnd })
.catch(console.error)
}
} }
// this performs forward creating the outgoing payment // this performs forward creating the outgoing payment