From 06b661625ce5515997c774dc11a7c56e51bfa75c Mon Sep 17 00:00:00 2001 From: ekzyis Date: Sun, 18 Aug 2024 17:28:39 -0500 Subject: [PATCH] Use custom relay API (#1302) * Use custom relay API Relay from nostr-tools was cumbersome to use. This custom abstraction over window.WebSocket makes interacting with nostr relays easier. * Use variables for nostr message parts * Fix NWC save * Use try/finally * Refactor crossposting code * use custom replay API * simplify callWithTimeout * Use isomorphic-ws for nip57 zap receipts * Use async map * Reject with timeout error * Move time functions into lib/time * Remove outdated comment regarding relay.close() --- components/nostr-auth.js | 2 +- components/use-crossposter.js | 3 +- lib/nostr.js | 204 +++++++++++++++++++++++++--------- lib/time.js | 17 +++ package-lock.json | 9 ++ package.json | 1 + wallets/nwc/client.js | 140 ++++++++--------------- worker/nostr.js | 32 ++---- 8 files changed, 234 insertions(+), 174 deletions(-) diff --git a/components/nostr-auth.js b/components/nostr-auth.js index 530e3865..bc7223c3 100644 --- a/components/nostr-auth.js +++ b/components/nostr-auth.js @@ -8,7 +8,7 @@ import { useRouter } from 'next/router' import AccordianItem from './accordian-item' import BackIcon from '@/svgs/arrow-left-line.svg' import styles from './lightning-auth.module.css' -import { callWithTimeout } from '@/lib/nostr' +import { callWithTimeout } from '@/lib/time' function ExtensionError ({ message, details }) { return ( diff --git a/components/use-crossposter.js b/components/use-crossposter.js index c4549815..189e6cae 100644 --- a/components/use-crossposter.js +++ b/components/use-crossposter.js @@ -1,7 +1,8 @@ import { useCallback } from 'react' import { useToast } from './toast' import { Button } from 'react-bootstrap' -import { DEFAULT_CROSSPOSTING_RELAYS, crosspost, callWithTimeout } from '@/lib/nostr' +import { DEFAULT_CROSSPOSTING_RELAYS, crosspost } from '@/lib/nostr' +import { callWithTimeout } from '@/lib/time' import { gql, useMutation, useQuery, useLazyQuery } from '@apollo/client' import { SETTINGS } from '@/fragments/users' import { ITEM_FULL_FIELDS, POLL_FIELDS } from '@/fragments/items' diff --git a/lib/nostr.js b/lib/nostr.js index c016b0e0..35a207fe 100644 --- a/lib/nostr.js +++ b/lib/nostr.js @@ -1,5 +1,7 @@ import { bech32 } from 'bech32' import { nip19 } from 'nostr-tools' +import WebSocket from 'isomorphic-ws' +import { callWithTimeout, withTimeout } from '@/lib/time' export const NOSTR_PUBKEY_HEX = /^[0-9a-fA-F]{64}$/ export const NOSTR_PUBKEY_BECH32 = /^npub1[02-9ac-hj-np-z]+$/ @@ -15,6 +17,149 @@ export const DEFAULT_CROSSPOSTING_RELAYS = [ 'wss://relay.mutinywallet.com/' ] +export class Relay { + constructor (relayUrl) { + const ws = new WebSocket(relayUrl) + + ws.onmessage = function (msg) { + const [type, notice] = JSON.parse(msg.data) + if (type === 'NOTICE') { + console.log('relay notice:', notice) + } + } + + ws.onerror = function (err) { + console.error('websocket error: ' + err) + this.error = err + } + + this.ws = ws + } + + static async connect (url, { timeout } = {}) { + const relay = new Relay(url) + await relay.waitUntilConnected({ timeout }) + return relay + } + + get connected () { + return this.ws.readyState === WebSocket.OPEN + } + + get closed () { + return this.ws.readyState === WebSocket.CLOSING || this.ws.readyState === WebSocket.CLOSED + } + + async waitUntilConnected ({ timeout } = {}) { + let interval + + const checkPromise = new Promise((resolve, reject) => { + interval = setInterval(() => { + if (this.connected) { + resolve() + } + if (this.closed) { + reject(new Error(`failed to connect to ${this.url}: ` + this.error)) + } + }, 100) + }) + + try { + return await withTimeout(checkPromise, timeout) + } finally { + clearInterval(interval) + } + } + + close () { + const state = this.ws.readyState + if (state !== WebSocket.CLOSING && state !== WebSocket.CLOSED) { + this.ws.close() + } + } + + async publish (event, { timeout } = {}) { + const ws = this.ws + + let listener + const ackPromise = new Promise((resolve, reject) => { + ws.send(JSON.stringify(['EVENT', event])) + + listener = function onmessage (msg) { + const [type, eventId, accepted, reason] = JSON.parse(msg.data) + + if (type !== 'OK' || eventId !== event.id) return + + if (accepted) { + resolve(eventId) + } else { + reject(new Error(reason || `event rejected: ${eventId}`)) + } + } + + ws.addEventListener('message', listener) + }) + + try { + return await withTimeout(ackPromise, timeout) + } finally { + ws.removeEventListener('message', listener) + } + } + + async fetch (filter, { timeout } = {}) { + const ws = this.ws + + let listener + const ackPromise = new Promise((resolve, reject) => { + const id = crypto.randomUUID() + + ws.send(JSON.stringify(['REQ', id, ...filter])) + + const events = [] + let eose = false + + listener = function onmessage (msg) { + const [type, eventId, event] = JSON.parse(msg.data) + + if (eventId !== id) return + + if (type === 'EVENT') { + events.push(event) + if (eose) { + // EOSE was already received: + // return first event after EOSE + resolve(events) + } + return + } + + if (type === 'CLOSED') { + return resolve(events) + } + + if (type === 'EOSE') { + eose = true + if (events.length > 0) { + // we already received events before EOSE: + // return all events before EOSE + ws.send(JSON.stringify(['CLOSE', id])) + return resolve(events) + } + } + } + + ws.addEventListener('message', listener) + }) + + try { + return await withTimeout(ackPromise, timeout) + } finally { + ws.removeEventListener('message', listener) + } + } +} + export function hexToBech32 (hex, prefix = 'npub') { return bech32.encode(prefix, bech32.toWords(Buffer.from(hex, 'hex'))) } @@ -36,51 +181,10 @@ export function nostrZapDetails (zap) { return { npub, content, note } } -async function publishNostrEvent (signedEvent, relay) { - return new Promise((resolve, reject) => { - const timeout = 3000 - const wsRelay = new window.WebSocket(relay) - let timer - let isMessageSentSuccessfully = false - - function timedout () { - clearTimeout(timer) - wsRelay.close() - reject(new Error(`relay timeout for ${relay}`)) - } - - timer = setTimeout(timedout, timeout) - - wsRelay.onopen = function () { - clearTimeout(timer) - timer = setTimeout(timedout, timeout) - wsRelay.send(JSON.stringify(['EVENT', signedEvent])) - } - - wsRelay.onmessage = function (msg) { - const m = JSON.parse(msg.data) - if (m[0] === 'OK') { - isMessageSentSuccessfully = true - clearTimeout(timer) - wsRelay.close() - console.log('Successfully sent event to', relay) - resolve() - } - } - - wsRelay.onerror = function (error) { - clearTimeout(timer) - console.log(error) - reject(new Error(`relay error: Failed to send to ${relay}`)) - } - - wsRelay.onclose = function () { - clearTimeout(timer) - if (!isMessageSentSuccessfully) { - reject(new Error(`relay error: Failed to send to ${relay}`)) - } - } - }) +async function publishNostrEvent (signedEvent, relayUrl) { + const timeout = 3000 + const relay = await Relay.connect(relayUrl, { timeout }) + await relay.publish(signedEvent, { timeout }) } export async function crosspost (event, relays = DEFAULT_CROSSPOSTING_RELAYS) { @@ -118,13 +222,3 @@ export async function crosspost (event, relays = DEFAULT_CROSSPOSTING_RELAYS) { return { error } } } - -export function callWithTimeout (targetFunction, timeoutMs) { - return new Promise((resolve, reject) => { - Promise.race([ - targetFunction(), - new Promise((resolve, reject) => setTimeout(() => reject(new Error('timeouted after ' + timeoutMs + ' ms waiting for extension')), timeoutMs)) - ]).then(resolve) - .catch(reject) - }) -} diff --git a/lib/time.js b/lib/time.js index fdc41a0f..aedbf4b6 100644 --- a/lib/time.js +++ b/lib/time.js @@ -127,3 +127,20 @@ function tzOffset (tz) { const targetOffsetHours = (date.getTime() - targetDate.getTime()) / 1000 / 60 / 60 return targetOffsetHours } + +function timeoutPromise (timeout) { + return new Promise((resolve, reject) => { + // if no timeout is specified, never settle + if (!timeout) return + + setTimeout(() => reject(new Error('timeout')), timeout) + }) +} + +export async function withTimeout (promise, timeout) { + return await Promise.race([promise, timeoutPromise(timeout)]) +} + +export async function callWithTimeout (fn, timeout) { + return await Promise.race([fn(), timeoutPromise(timeout)]) +} diff --git a/package-lock.json b/package-lock.json index 46663ebd..0a6358e9 100644 --- a/package-lock.json +++ b/package-lock.json @@ -41,6 +41,7 @@ "graphql-scalar": "^0.1.0", "graphql-tag": "^2.12.6", "graphql-type-json": "^0.3.2", + "isomorphic-ws": "^5.0.0", "ln-service": "^57.1.3", "macaroon": "^3.0.4", "mathjs": "^11.11.2", @@ -10548,6 +10549,14 @@ "integrity": "sha512-RHxMLp9lnKHGHRng9QFhRCMbYAcVpn69smSGcq3f36xjgVVWThj4qqLbTLlq7Ssj8B+fIQ1EuCEGI2lKsyQeIw==", "dev": true }, + "node_modules/isomorphic-ws": { + "version": "5.0.0", + "resolved": "https://registry.npmjs.org/isomorphic-ws/-/isomorphic-ws-5.0.0.tgz", + "integrity": "sha512-muId7Zzn9ywDsyXgTIafTry2sV3nySZeUDe6YedVd1Hvuuep5AsIlqK+XefWpYTyJG5e503F2xIuT2lcU6rCSw==", + "peerDependencies": { + "ws": "*" + } + }, "node_modules/isstream": { "version": "0.1.2", "resolved": "https://registry.npmjs.org/isstream/-/isstream-0.1.2.tgz", diff --git a/package.json b/package.json index d11e0c14..282b0f00 100644 --- a/package.json +++ b/package.json @@ -46,6 +46,7 @@ "graphql-scalar": "^0.1.0", "graphql-tag": "^2.12.6", "graphql-type-json": "^0.3.2", + "isomorphic-ws": "^5.0.0", "ln-service": "^57.1.3", "macaroon": "^3.0.4", "mathjs": "^11.11.2", diff --git a/wallets/nwc/client.js b/wallets/nwc/client.js index d6bbf61e..b06021d2 100644 --- a/wallets/nwc/client.js +++ b/wallets/nwc/client.js @@ -1,5 +1,6 @@ import { parseNwcUrl } from '@/lib/url' -import { Relay, finalizeEvent, nip04 } from 'nostr-tools' +import { finalizeEvent, nip04 } from 'nostr-tools' +import { Relay } from '@/lib/nostr' export * from 'wallets/nwc' @@ -7,112 +8,67 @@ export async function testConnectClient ({ nwcUrl }, { logger }) { const { relayUrl, walletPubkey } = parseNwcUrl(nwcUrl) logger.info(`requesting info event from ${relayUrl}`) - const relay = await Relay - .connect(relayUrl) - .catch(() => { - // NOTE: passed error is undefined for some reason - const msg = `failed to connect to ${relayUrl}` - logger.error(msg) - throw new Error(msg) - }) + + const relay = await Relay.connect(relayUrl) logger.ok(`connected to ${relayUrl}`) try { - await new Promise((resolve, reject) => { - let found = false - const sub = relay.subscribe([ - { - kinds: [13194], - authors: [walletPubkey] - } - ], { - onevent (event) { - found = true - logger.ok(`received info event from ${relayUrl}`) - resolve(event) - }, - onclose (reason) { - if (!['closed by caller', 'relay connection closed by us'].includes(reason)) { - // only log if not closed by us (caller) - const msg = 'connection closed: ' + (reason || 'unknown reason') - logger.error(msg) - reject(new Error(msg)) - } - }, - oneose () { - if (!found) { - const msg = 'EOSE received without info event' - logger.error(msg) - reject(new Error(msg)) - } - sub?.close() - } - }) - }) + const [info] = await relay.fetch([{ + kinds: [13194], + authors: [walletPubkey] + }]) + + if (info) { + logger.ok(`received info event from ${relayUrl}`) + } else { + throw new Error('info event not found') + } } finally { - // For some reason, this throws 'WebSocket is already in CLOSING or CLOSED state' - // even though relay connection is still open here - relay?.close()?.catch() - if (relay) logger.info(`closed connection to ${relayUrl}`) + relay?.close() + logger.info(`closed connection to ${relayUrl}`) } } export async function sendPayment (bolt11, { nwcUrl }, { logger }) { const { relayUrl, walletPubkey, secret } = parseNwcUrl(nwcUrl) - const relay = await Relay.connect(relayUrl).catch(() => { - // NOTE: passed error is undefined for some reason - throw new Error(`failed to connect to ${relayUrl}`) - }) + const relay = await Relay.connect(relayUrl) logger.ok(`connected to ${relayUrl}`) try { - const ret = await new Promise(function (resolve, reject) { - (async function () { - const payload = { - method: 'pay_invoice', - params: { invoice: bolt11 } - } - const content = await nip04.encrypt(secret, walletPubkey, JSON.stringify(payload)) + const payload = { + method: 'pay_invoice', + params: { invoice: bolt11 } + } + const encrypted = await nip04.encrypt(secret, walletPubkey, JSON.stringify(payload)) - const request = finalizeEvent({ - kind: 23194, - created_at: Math.floor(Date.now() / 1000), - tags: [['p', walletPubkey]], - content - }, secret) - await relay.publish(request) + const request = finalizeEvent({ + kind: 23194, + created_at: Math.floor(Date.now() / 1000), + tags: [['p', walletPubkey]], + content: encrypted + }, secret) + await relay.publish(request) - const filter = { - kinds: [23195], - authors: [walletPubkey], - '#e': [request.id] - } - relay.subscribe([filter], { - async onevent (response) { - try { - const content = JSON.parse(await nip04.decrypt(secret, walletPubkey, response.content)) - if (content.error) return reject(new Error(content.error.message)) - if (content.result) return resolve({ preimage: content.result.preimage }) - } catch (err) { - return reject(err) - } - }, - onclose (reason) { - if (!['closed by caller', 'relay connection closed by us'].includes(reason)) { - // only log if not closed by us (caller) - const msg = 'connection closed: ' + (reason || 'unknown reason') - reject(new Error(msg)) - } - } - }) - })().catch(reject) - }) - return ret + const [response] = await relay.fetch([{ + kinds: [23195], + authors: [walletPubkey], + '#e': [request.id] + }]) + + if (!response) { + throw new Error('no response') + } + + const decrypted = await nip04.decrypt(secret, walletPubkey, response.content) + const content = JSON.parse(decrypted) + + if (content.error) throw new Error(content.error.message) + if (content.result) return { preimage: content.result.preimage } + + throw new Error('invalid response') } finally { - // For some reason, this throws 'WebSocket is already in CLOSING or CLOSED state' - // even though relay connection is still open here - relay?.close()?.catch() - if (relay) logger.info(`closed connection to ${relayUrl}`) + relay?.close() + logger.info(`closed connection to ${relayUrl}`) } } diff --git a/worker/nostr.js b/worker/nostr.js index 4553f8af..b8ff8128 100644 --- a/worker/nostr.js +++ b/worker/nostr.js @@ -1,5 +1,6 @@ import { getInvoice } from 'ln-service' -import { Relay, signId, calculateId, getPublicKey } from 'nostr' +import { signId, calculateId, getPublicKey } from 'nostr' +import { Relay } from '@/lib/nostr' const nostrOptions = { startAfter: 5, retryLimit: 21, retryBackoff: true } @@ -50,31 +51,12 @@ export async function nip57 ({ data: { hash }, boss, lnd, models }) { console.log('zap note', e, relays) await Promise.allSettled( - relays.map(r => new Promise((resolve, reject) => { + relays.map(async r => { const timeout = 1000 - const relay = Relay(r) - - function timedout () { - relay.close() - console.log('failed to send to', r) - reject(new Error('relay timeout')) - } - - let timer = setTimeout(timedout, timeout) - - relay.on('open', () => { - clearTimeout(timer) - timer = setTimeout(timedout, timeout) - relay.send(['EVENT', e]) - }) - - relay.on('ok', () => { - clearTimeout(timer) - relay.close() - console.log('sent zap to', r) - resolve() - }) - }))) + const relay = await Relay.connect(r, { timeout }) + await relay.publish(e, { timeout }) + }) + ) } catch (e) { console.log(e) }