You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
caddy-hugo2/deltas.go

240 lines
4.5 KiB

package caddyhugo
import (
"fmt"
"log"
"net/http"
"time"
"git.stephensearles.com/stephen/acedoc"
"github.com/gorilla/websocket"
)
// DeltaConn identifies the methods used in the original websocket implementation.
type DeltaConn interface {
ReadJSON(v interface{}) error
WriteJSON(v interface{}) error
}
func (ch *CaddyHugo) ObserveLTime(ltime uint64) uint64 {
ch.mtx.Lock()
if ch.ltime < ltime {
ch.ltime = ltime
}
ch.mtx.Unlock()
return ch.LTime()
}
func (ch *CaddyHugo) LTime() uint64 {
ch.mtx.Lock()
defer ch.mtx.Unlock()
ch.ltime++
return ch.ltime
}
func (ch *CaddyHugo) ShouldApply(ltime uint64) bool {
lowest := ch.LowestPendingConfirmation()
for _, c := range ch.Confirming() {
if lowest == 0 || c < lowest {
lowest = c
}
}
if ltime < lowest {
return false
}
ch.mtx.Lock()
defer ch.mtx.Unlock()
if _, ok := ch.confirmingToClient[ltime]; ok {
return false
}
return true
}
func (ch *CaddyHugo) ConfirmLTime(ltime uint64) {
ch.mtx.Lock()
defer ch.mtx.Unlock()
ch.confirmingToClient[ltime] = struct{}{}
}
func (ch *CaddyHugo) Confirming() []uint64 {
var times []uint64
ch.mtx.Lock()
defer ch.mtx.Unlock()
for ltime := range ch.confirmingToClient {
times = append(times, ltime)
}
return times
}
func (ch *CaddyHugo) LowestPendingConfirmation() uint64 {
var lowest uint64
for _, c := range ch.Confirming() {
if lowest == 0 || c < lowest {
lowest = c
}
}
return lowest
}
func (ch *CaddyHugo) ClearConfirmed(lowestPending uint64) {
ch.mtx.Lock()
defer ch.mtx.Unlock()
for ltime := range ch.confirmingToClient {
if ltime < lowestPending {
delete(ch.confirmingToClient, ltime)
}
}
}
func (ch *CaddyHugo) DeltaWebsocket(w http.ResponseWriter, r *http.Request) (int, error) {
var upgrader = websocket.Upgrader{
ReadBufferSize: 1024,
WriteBufferSize: 1024,
}
conn, err := upgrader.Upgrade(w, r, nil)
if err != nil {
fmt.Println(err)
return http.StatusBadRequest, err
}
conn.SetReadDeadline(time.Time{})
doc, err := ch.editSession(docNameFromEditRequest(r))
if err != nil {
fmt.Println(err)
return http.StatusBadRequest, err
}
return ch.handleDeltaConn(conn, doc)
}
func (ch *CaddyHugo) Message(deltas ...acedoc.Delta) Message {
return Message{
Deltas: deltas,
LTime: ch.LTime(),
Confirmed: ch.Confirming(),
LowestPending: ch.LowestPendingConfirmation(),
}
}
func (ch *CaddyHugo) handleDeltaConn(conn DeltaConn, doc *editSession) (int, error) {
const idlePing = 15 * time.Second
const idlePingShort = 1 * time.Millisecond
errCh := make(chan error)
doneCh := make(chan struct{})
defer func() {
close(doneCh)
close(errCh)
for err := range errCh {
log.Println(err)
}
}()
timer := time.NewTimer(idlePing)
resetTimer := func(d time.Duration) {
if !timer.Stop() {
select {
case <-timer.C:
default:
}
}
timer.Reset(d)
}
wroteMessagesCh := make(chan Message, 2)
client := doc.doc.Client(acedoc.DeltaHandlerFunc(func(ds []acedoc.Delta) error {
m := ch.Message(ds...)
wroteMessagesCh <- m
return conn.WriteJSON(m)
}))
ch.mtx.Lock()
doc.clients++
ch.mtx.Unlock()
defer func() {
ch.mtx.Lock()
client.Close()
doc.clients--
ch.mtx.Unlock()
}()
readMessagesCh := make(chan Message, 2)
go func() {
for {
var message Message
err := conn.ReadJSON(&message)
if err != nil {
errCh <- fmt.Errorf("error reading message from client conn: %v", err)
return
}
ch.ObserveLTime(message.LTime)
if len(message.Deltas) == 0 {
time.Sleep(10 * time.Microsecond)
continue
}
if !ch.ShouldApply(message.LTime) {
continue
}
err = client.PushDeltas(message.Deltas...)
if err != nil {
errCh <- fmt.Errorf("error pushing deltas into document: %v", err)
return
}
ch.ConfirmLTime(message.LTime)
ch.ClearConfirmed(message.LowestPending)
select {
case readMessagesCh <- message:
case <-doneCh:
return
}
}
}()
for {
select {
case err := <-errCh:
fmt.Println("error handling websocket connection:", err)
return 500, err
default:
}
select {
case <-timer.C:
conn.WriteJSON(ch.Message())
resetTimer(idlePing)
case <-readMessagesCh:
resetTimer(idlePingShort)
case <-wroteMessagesCh:
resetTimer(idlePing)
case <-doneCh:
return 200, nil
}
}
}
type Message struct {
Deltas []acedoc.Delta `json:"deltas"`
LTime uint64 `json:"ltime"`
Confirmed []uint64 `json:"confirmed"`
LowestPending uint64 `json:"lowestPending"`
}