Use SSE to send zaps to overlay

This commit is contained in:
ekzyis 2024-12-27 03:54:09 +01:00
parent aa584d2cb5
commit 4eaf0e1003
15 changed files with 559 additions and 19 deletions

2
.gitignore vendored
View File

@ -1,2 +1,4 @@
.env
zaply
*_templ.go
__livereload

8
Makefile Normal file
View File

@ -0,0 +1,8 @@
.PHONY: dev
dev:
bash livereload.sh
build:
templ generate
go build -o zaply main.go

2
env/env.go vendored
View File

@ -16,6 +16,7 @@ var (
PhoenixdLimitedAccessToken string
CommitLongSha string
CommitShortSha string
Env string
)
func Load(filenames ...string) error {
@ -26,6 +27,7 @@ func Load(filenames ...string) error {
flag.StringVar(&PublicUrl, "PUBLIC_URL", "", "Base URL")
flag.StringVar(&PhoenixdURL, "PHOENIXD_URL", "", "Phoenixd URL")
flag.StringVar(&PhoenixdLimitedAccessToken, "PHOENIXD_LIMITED_ACCESS_TOKEN", "", "Phoenixd limited access token")
flag.StringVar(&Env, "ENV", "development", "Build environment")
return nil
}

1
go.mod
View File

@ -3,6 +3,7 @@ module github.com/ekzyis/zaply
go 1.23.4
require (
github.com/a-h/templ v0.2.793 // indirect
github.com/btcsuite/btcutil v1.0.2 // indirect
github.com/joho/godotenv v1.5.1 // indirect
github.com/labstack/echo/v4 v4.13.3 // indirect

2
go.sum
View File

@ -1,3 +1,5 @@
github.com/a-h/templ v0.2.793 h1:Io+/ocnfGWYO4VHdR0zBbf39PQlnzVCVVD+wEEs6/qY=
github.com/a-h/templ v0.2.793/go.mod h1:lq48JXoUvuQrU0VThrK31yFwdRjTCnIE5bcPCM9IP1w=
github.com/aead/siphash v1.0.1/go.mod h1:Nywa3cDsYNNK3gaciGTWPwHt0wlpNV15vwmswBAUSII=
github.com/btcsuite/btcd v0.20.1-beta/go.mod h1:wVuoA8VJLEcwgqHBwHmzLRazpKxTv13Px/pDuV7OomQ=
github.com/btcsuite/btclog v0.0.0-20170628155309-84c8d2346e9f/go.mod h1:TdznJufoqS23FtqVCzL0ZqgP5MqXbb4fg/WgDys70nA=

View File

@ -7,14 +7,16 @@ type PaymentRequest string
type Lightning interface {
CreateInvoice(msats int64, description string) (PaymentRequest, error)
GetInvoice(paymentHash string) (*Invoice, error)
IncomingPayments() chan *Invoice
}
type Invoice struct {
PaymentHash string
Preimage string
Msats int64
Description string
PaymentRequest string
CreatedAt time.Time
ConfirmedAt time.Time
PaymentHash string `json:"paymentHash"`
Preimage string `json:"preimage"`
Msats int64 `json:"msats"`
Description string `json:"description"`
PaymentRequest string `json:"paymentRequest"`
CreatedAt time.Time `json:"createdAt"`
ConfirmedAt time.Time `json:"confirmedAt"`
}

View File

@ -20,10 +20,14 @@ type Phoenixd struct {
accessToken string
limitedAccessToken string
webhookUrl string
paymentsChan chan *lightning.Invoice
}
func NewPhoenixd(opts ...func(*Phoenixd) *Phoenixd) *Phoenixd {
ln := &Phoenixd{}
ln := &Phoenixd{
paymentsChan: make(chan *lightning.Invoice),
}
for _, opt := range opts {
opt(ln)
}
@ -153,18 +157,21 @@ func (p *Phoenixd) GetInvoice(paymentHash string) (*lightning.Invoice, error) {
}, nil
}
func (p *Phoenixd) WebhookHandler(c echo.Context) error {
go func() {
var webhook struct {
Type string `json:"type"`
AmountSat int64 `json:"amountSat"`
PaymentHash string `json:"paymentHash"`
}
if err := c.Bind(&webhook); err != nil {
c.Logger().Error(err)
return
}
func (p *Phoenixd) IncomingPayments() chan *lightning.Invoice {
return p.paymentsChan
}
func (p *Phoenixd) WebhookHandler(c echo.Context) error {
var webhook struct {
Type string `json:"type"`
AmountSat int64 `json:"amountSat"`
PaymentHash string `json:"paymentHash"`
}
if err := c.Bind(&webhook); err != nil {
return err
}
go func() {
inv, err := p.GetInvoice(webhook.PaymentHash)
if err != nil {
c.Logger().Error(err)
@ -176,6 +183,8 @@ func (p *Phoenixd) WebhookHandler(c echo.Context) error {
inv.PaymentHash, inv.Msats, inv.Description,
inv.CreatedAt.Format(time.RFC3339), inv.ConfirmedAt.Format(time.RFC3339),
)
p.paymentsChan <- inv
}()
return c.NoContent(http.StatusOK)

38
livereload.sh Normal file
View File

@ -0,0 +1,38 @@
#!/usr/bin/env bash
PID=$(pidof zaply)
DIRS="env/ lightning/ lnurl/ pages/ server/"
set -e
echo ":: remote port forwarding for zap-dev.ekzy.is ::"
ssh -fnNR 5555:localhost:4444 zap-dev.ekzy.is
echo
function restart_server() {
set +e
[[ -z "$PID" ]] || kill -15 $PID
ENV=development make build -B
set -e
./zaply 2>&1 &
PID=$(pidof zaply)
}
function restart() {
restart_server
# give server time start listening for connections
sleep 1
date +%s.%N > public/__livereload
}
function cleanup() {
rm -f public/__livereload
[[ -z "$PID" ]] || kill -15 $PID
}
trap cleanup EXIT
restart
while inotifywait -r -e modify $DIRS; do
restart
done

32
pages/overlay.templ Normal file
View File

@ -0,0 +1,32 @@
package pages
templ Overlay() {
<html>
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>zaply</title>
<script src={ GetBaseUrl(ctx) + "/js/htmx.min.js" } integrity="sha384-HGfztofotfshcF7+8n44JQL2oJmowVChPTg48S+jvZoztPfvwD79OC/LTtG6dMp+" crossorigin="anonymous"></script>
<script src={ GetBaseUrl(ctx) + "/js/htmx-sse.js" } crossorigin="anonymous"></script>
if GetEnv(ctx) == "development" {
<script src={ GetBaseUrl(ctx) + "/js/livereload.js" }></script>
}
<script>
const sse = new EventSource("/overlay/sse");
sse.onmessage = (event) => {
// console.log("event", event)
};
sse.addEventListener("zap", event => {
let inv
try {
inv = JSON.parse(event.data)
} catch(err) {
console.error("error parsing zap event", err)
return
}
console.log("zap received:", inv.paymentHash, inv.msats, inv.description)
})
</script>
</head>
</html>
}

46
pages/render.go Normal file
View File

@ -0,0 +1,46 @@
package pages
import (
"context"
"net/http"
"strings"
"github.com/a-h/templ"
"github.com/ekzyis/zaply/env"
"github.com/labstack/echo/v4"
)
var baseUrlContextKey = "baseUrl"
var envContextKey = "env"
func GetBaseUrl(ctx context.Context) string {
if u, ok := ctx.Value(baseUrlContextKey).(string); ok {
return strings.TrimRight(u, "/")
}
return ""
}
func GetEnv(ctx context.Context) string {
if u, ok := ctx.Value(envContextKey).(string); ok {
return u
}
return "development"
}
func OverlayHandler(c echo.Context) error {
return render(c, http.StatusOK, Overlay())
}
func render(ctx echo.Context, statusCode int, t templ.Component) error {
buf := templ.GetBuffer()
defer templ.ReleaseBuffer(buf)
renderContext := context.WithValue(ctx.Request().Context(), baseUrlContextKey, env.PublicUrl)
renderContext = context.WithValue(renderContext, envContextKey, env.Env)
if err := t.Render(renderContext, buf); err != nil {
return err
}
return ctx.HTML(statusCode, buf.String())
}

290
public/js/htmx-sse.js Normal file
View File

@ -0,0 +1,290 @@
/*
Server Sent Events Extension
============================
This extension adds support for Server Sent Events to htmx. See /www/extensions/sse.md for usage instructions.
*/
(function() {
/** @type {import("../htmx").HtmxInternalApi} */
var api
htmx.defineExtension('sse', {
/**
* Init saves the provided reference to the internal HTMX API.
*
* @param {import("../htmx").HtmxInternalApi} api
* @returns void
*/
init: function(apiRef) {
// store a reference to the internal API.
api = apiRef
// set a function in the public API for creating new EventSource objects
if (htmx.createEventSource == undefined) {
htmx.createEventSource = createEventSource
}
},
getSelectors: function() {
return ['[sse-connect]', '[data-sse-connect]', '[sse-swap]', '[data-sse-swap]']
},
/**
* onEvent handles all events passed to this extension.
*
* @param {string} name
* @param {Event} evt
* @returns void
*/
onEvent: function(name, evt) {
var parent = evt.target || evt.detail.elt
switch (name) {
case 'htmx:beforeCleanupElement':
var internalData = api.getInternalData(parent)
// Try to remove remove an EventSource when elements are removed
var source = internalData.sseEventSource
if (source) {
api.triggerEvent(parent, 'htmx:sseClose', {
source,
type: 'nodeReplaced',
})
internalData.sseEventSource.close()
}
return
// Try to create EventSources when elements are processed
case 'htmx:afterProcessNode':
ensureEventSourceOnElement(parent)
}
}
})
/// ////////////////////////////////////////////
// HELPER FUNCTIONS
/// ////////////////////////////////////////////
/**
* createEventSource is the default method for creating new EventSource objects.
* it is hoisted into htmx.config.createEventSource to be overridden by the user, if needed.
*
* @param {string} url
* @returns EventSource
*/
function createEventSource(url) {
return new EventSource(url, { withCredentials: true })
}
/**
* registerSSE looks for attributes that can contain sse events, right
* now hx-trigger and sse-swap and adds listeners based on these attributes too
* the closest event source
*
* @param {HTMLElement} elt
*/
function registerSSE(elt) {
// Add message handlers for every `sse-swap` attribute
if (api.getAttributeValue(elt, 'sse-swap')) {
// Find closest existing event source
var sourceElement = api.getClosestMatch(elt, hasEventSource)
if (sourceElement == null) {
// api.triggerErrorEvent(elt, "htmx:noSSESourceError")
return null // no eventsource in parentage, orphaned element
}
// Set internalData and source
var internalData = api.getInternalData(sourceElement)
var source = internalData.sseEventSource
var sseSwapAttr = api.getAttributeValue(elt, 'sse-swap')
var sseEventNames = sseSwapAttr.split(',')
for (var i = 0; i < sseEventNames.length; i++) {
const sseEventName = sseEventNames[i].trim()
const listener = function(event) {
// If the source is missing then close SSE
if (maybeCloseSSESource(sourceElement)) {
return
}
// If the body no longer contains the element, remove the listener
if (!api.bodyContains(elt)) {
source.removeEventListener(sseEventName, listener)
return
}
// swap the response into the DOM and trigger a notification
if (!api.triggerEvent(elt, 'htmx:sseBeforeMessage', event)) {
return
}
swap(elt, event.data)
api.triggerEvent(elt, 'htmx:sseMessage', event)
}
// Register the new listener
api.getInternalData(elt).sseEventListener = listener
source.addEventListener(sseEventName, listener)
}
}
// Add message handlers for every `hx-trigger="sse:*"` attribute
if (api.getAttributeValue(elt, 'hx-trigger')) {
// Find closest existing event source
var sourceElement = api.getClosestMatch(elt, hasEventSource)
if (sourceElement == null) {
// api.triggerErrorEvent(elt, "htmx:noSSESourceError")
return null // no eventsource in parentage, orphaned element
}
// Set internalData and source
var internalData = api.getInternalData(sourceElement)
var source = internalData.sseEventSource
var triggerSpecs = api.getTriggerSpecs(elt)
triggerSpecs.forEach(function(ts) {
if (ts.trigger.slice(0, 4) !== 'sse:') {
return
}
var listener = function (event) {
if (maybeCloseSSESource(sourceElement)) {
return
}
if (!api.bodyContains(elt)) {
source.removeEventListener(ts.trigger.slice(4), listener)
}
// Trigger events to be handled by the rest of htmx
htmx.trigger(elt, ts.trigger, event)
htmx.trigger(elt, 'htmx:sseMessage', event)
}
// Register the new listener
api.getInternalData(elt).sseEventListener = listener
source.addEventListener(ts.trigger.slice(4), listener)
})
}
}
/**
* ensureEventSourceOnElement creates a new EventSource connection on the provided element.
* If a usable EventSource already exists, then it is returned. If not, then a new EventSource
* is created and stored in the element's internalData.
* @param {HTMLElement} elt
* @param {number} retryCount
* @returns {EventSource | null}
*/
function ensureEventSourceOnElement(elt, retryCount) {
if (elt == null) {
return null
}
// handle extension source creation attribute
if (api.getAttributeValue(elt, 'sse-connect')) {
var sseURL = api.getAttributeValue(elt, 'sse-connect')
if (sseURL == null) {
return
}
ensureEventSource(elt, sseURL, retryCount)
}
registerSSE(elt)
}
function ensureEventSource(elt, url, retryCount) {
var source = htmx.createEventSource(url)
source.onerror = function(err) {
// Log an error event
api.triggerErrorEvent(elt, 'htmx:sseError', { error: err, source })
// If parent no longer exists in the document, then clean up this EventSource
if (maybeCloseSSESource(elt)) {
return
}
// Otherwise, try to reconnect the EventSource
if (source.readyState === EventSource.CLOSED) {
retryCount = retryCount || 0
retryCount = Math.max(Math.min(retryCount * 2, 128), 1)
var timeout = retryCount * 500
window.setTimeout(function() {
ensureEventSourceOnElement(elt, retryCount)
}, timeout)
}
}
source.onopen = function(evt) {
api.triggerEvent(elt, 'htmx:sseOpen', { source })
if (retryCount && retryCount > 0) {
const childrenToFix = elt.querySelectorAll("[sse-swap], [data-sse-swap], [hx-trigger], [data-hx-trigger]")
for (let i = 0; i < childrenToFix.length; i++) {
registerSSE(childrenToFix[i])
}
// We want to increase the reconnection delay for consecutive failed attempts only
retryCount = 0
}
}
api.getInternalData(elt).sseEventSource = source
var closeAttribute = api.getAttributeValue(elt, "sse-close");
if (closeAttribute) {
// close eventsource when this message is received
source.addEventListener(closeAttribute, function() {
api.triggerEvent(elt, 'htmx:sseClose', {
source,
type: 'message',
})
source.close()
});
}
}
/**
* maybeCloseSSESource confirms that the parent element still exists.
* If not, then any associated SSE source is closed and the function returns true.
*
* @param {HTMLElement} elt
* @returns boolean
*/
function maybeCloseSSESource(elt) {
if (!api.bodyContains(elt)) {
var source = api.getInternalData(elt).sseEventSource
if (source != undefined) {
api.triggerEvent(elt, 'htmx:sseClose', {
source,
type: 'nodeMissing',
})
source.close()
// source = null
return true
}
}
return false
}
/**
* @param {HTMLElement} elt
* @param {string} content
*/
function swap(elt, content) {
api.withExtensions(elt, function(extension) {
content = extension.transformResponse(content, null, elt)
})
var swapSpec = api.getSwapSpecification(elt)
var target = api.getTarget(elt)
api.swap(target, content, swapSpec)
}
function hasEventSource(node) {
return api.getInternalData(node).sseEventSource != null
}
})()

1
public/js/htmx.min.js vendored Normal file

File diff suppressed because one or more lines are too long

18
public/js/livereload.js Normal file
View File

@ -0,0 +1,18 @@
async function buildTime() {
const r = await fetch("/__livereload", { cache: "no-cache"})
return r.text()
}
async function liveReload() {
console.log("running in development mode")
let t_old = await buildTime()
setInterval(async () => {
let t_new = await buildTime()
if (t_old !== t_new) {
window.location.reload()
}
}, 1000)
}
liveReload().catch(console.error)

View File

@ -7,6 +7,7 @@ import (
"github.com/ekzyis/zaply/env"
"github.com/ekzyis/zaply/lightning/phoenixd"
"github.com/ekzyis/zaply/lnurl"
"github.com/ekzyis/zaply/pages"
"github.com/labstack/echo/v4"
"github.com/labstack/echo/v4/middleware"
)
@ -41,6 +42,11 @@ func NewServer() *Server {
lnurl.Router(s.Echo, p)
s.Static("/", "public/")
s.GET("/overlay", pages.OverlayHandler)
s.GET("/overlay/sse", sseHandler(p.IncomingPayments()))
return s
}

83
server/sse.go Normal file
View File

@ -0,0 +1,83 @@
package server
import (
"bytes"
"encoding/json"
"fmt"
"io"
"log"
"time"
"github.com/ekzyis/zaply/lightning"
"github.com/labstack/echo/v4"
)
type Event struct {
Event []byte
Data []byte
}
func (ev *Event) MarshalTo(w io.Writer) error {
if _, err := fmt.Fprintf(w, "event: %s\n", ev.Event); err != nil {
return err
}
for _, line := range bytes.Split(ev.Data, []byte("\n")) {
if _, err := fmt.Fprintf(w, "data: %s\n", line); err != nil {
return err
}
}
if _, err := fmt.Fprint(w, "\n"); err != nil {
return err
}
return nil
}
func sseHandler(invSrc chan *lightning.Invoice) echo.HandlerFunc {
return func(c echo.Context) error {
w := c.Response()
w.Header().Set("Content-Type", "text/event-stream")
w.Header().Set("Cache-Control", "no-cache")
w.Header().Set("Connection", "keep-alive")
// disable nginx buffering
w.Header().Set("X-Accel-Buffering", "no")
ticker := time.NewTicker(1 * time.Second)
defer ticker.Stop()
for {
select {
case <-c.Request().Context().Done():
return nil
case <-ticker.C:
event := Event{
Event: []byte("message"),
Data: []byte("keepalive"),
}
if err := event.MarshalTo(w); err != nil {
return err
}
case inv := <-invSrc:
data, err := json.Marshal(inv)
if err != nil {
return err
}
event := Event{
Event: []byte("zap"),
Data: data,
}
log.Printf("sending zap event: %s", inv.PaymentHash)
if err := event.MarshalTo(w); err != nil {
return err
}
}
w.Flush()
}
}
}