You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
caddy-hugo2/deltas.go

171 lines
3.0 KiB

package caddyhugo
import (
"fmt"
"log"
"net/http"
"time"
"git.stephensearles.com/stephen/acedoc"
"github.com/gorilla/websocket"
)
// DeltaConn identifies the methods used in the original websocket implementation.
type DeltaConn interface {
ReadJSON(v interface{}) error
WriteJSON(v interface{}) error
}
const (
IdleWebsocketTimeout = 10 * time.Minute
WebsocketFileTicker = 1 * time.Second
)
func (ch *CaddyHugo) ObserveLTime(ltime uint64) uint64 {
ch.mtx.Lock()
if ch.ltime < ltime {
ch.ltime = ltime
}
ch.mtx.Unlock()
return ch.LTime()
}
func (ch *CaddyHugo) LTime() uint64 {
ch.mtx.Lock()
defer ch.mtx.Unlock()
ch.ltime++
return ch.ltime
}
func (ch *CaddyHugo) DeltaWebsocket(w http.ResponseWriter, r *http.Request) (int, error) {
var upgrader = websocket.Upgrader{
ReadBufferSize: 1024,
WriteBufferSize: 1024,
}
conn, err := upgrader.Upgrade(w, r, nil)
if err != nil {
fmt.Println(err)
return http.StatusBadRequest, err
}
conn.SetReadDeadline(time.Time{})
doc, err := ch.doc(r)
if err != nil {
fmt.Println(err)
return http.StatusBadRequest, err
}
return ch.handleConn(conn, doc)
}
func (ch *CaddyHugo) Message(deltas ...acedoc.Delta) Message {
return Message{
Deltas: deltas,
LTime: ch.LTime(),
}
}
func (ch *CaddyHugo) handleConn(conn DeltaConn, doc *docref) (int, error) {
const idlePing = 15 * time.Second
const idlePingShort = 1 * time.Millisecond
errCh := make(chan error)
doneCh := make(chan struct{})
defer func() {
close(doneCh)
close(errCh)
for err := range errCh {
log.Println(err)
}
}()
timer := time.NewTimer(idlePing)
resetTimer := func(d time.Duration) {
if !timer.Stop() {
<-timer.C
}
timer.Reset(d)
}
wroteMessagesCh := make(chan Message, 2)
client := doc.doc.Client(acedoc.DeltaHandlerFunc(func(ds []acedoc.Delta) error {
m := ch.Message(ds...)
wroteMessagesCh <- m
return conn.WriteJSON(m)
}))
ch.mtx.Lock()
doc.clients++
ch.mtx.Unlock()
defer func() {
ch.mtx.Lock()
client.Close()
doc.clients--
ch.mtx.Unlock()
}()
readMessagesCh := make(chan Message, 2)
go func() {
for {
var message Message
err := conn.ReadJSON(&message)
if err != nil {
errCh <- fmt.Errorf("error reading message from client conn: %v", err)
return
}
ch.ObserveLTime(message.LTime)
if len(message.Deltas) == 0 {
time.Sleep(10 * time.Microsecond)
continue
}
err = client.PushDeltas(message.Deltas...)
if err != nil {
errCh <- fmt.Errorf("error pushing deltas into document: %v", err)
return
}
select {
case readMessagesCh <- message:
case <-doneCh:
return
}
}
}()
for {
select {
case err := <-errCh:
fmt.Println(err)
return 500, err
default:
}
select {
case <-timer.C:
conn.WriteJSON(ch.Message())
resetTimer(idlePing)
case <-readMessagesCh:
resetTimer(idlePingShort)
case <-wroteMessagesCh:
resetTimer(idlePing)
case <-doneCh:
return 200, nil
}
}
}
type Message struct {
Deltas []acedoc.Delta `json:"deltas"`
LTime uint64 `json:"ltime"`
}