|
- package caddyhugo
-
- import (
- "fmt"
- "log"
- "net/http"
- "time"
-
- "git.stephensearles.com/stephen/acedoc"
- "github.com/gorilla/websocket"
- )
-
- // DeltaConn identifies the methods used in the original websocket implementation.
- type DeltaConn interface {
- ReadJSON(v interface{}) error
- WriteJSON(v interface{}) error
- }
-
- func (ch *CaddyHugo) ObserveLTime(ltime uint64) uint64 {
- ch.mtx.Lock()
-
- if ch.ltime < ltime {
- ch.ltime = ltime
- }
-
- ch.mtx.Unlock()
- return ch.LTime()
- }
-
- func (ch *CaddyHugo) LTime() uint64 {
- ch.mtx.Lock()
- defer ch.mtx.Unlock()
- ch.ltime++
- return ch.ltime
- }
-
- func (ch *CaddyHugo) ShouldApply(ltime uint64) bool {
-
- ch.mtx.Lock()
- defer ch.mtx.Unlock()
-
- if _, ok := ch.confirmingToClient[ltime]; ok {
- return false
- }
-
- return true
- }
-
- // ConfirmLTime marks an ltime as something that should be confirmed with clients
- func (ch *CaddyHugo) ConfirmLTime(ltime uint64) {
- ch.mtx.Lock()
- defer ch.mtx.Unlock()
- ch.confirmingToClient[ltime] = struct{}{}
- }
-
- // Confirming returns the current list of LTimes that need to be confirmed
- // to clients
- func (ch *CaddyHugo) Confirming() []uint64 {
- var times []uint64
-
- ch.mtx.Lock()
- defer ch.mtx.Unlock()
-
- for ltime := range ch.confirmingToClient {
- times = append(times, ltime)
- }
- return times
- }
-
- // LowestPendingConfirmation identifies the lowest LTime that needs to be
- // confirmed to clients
- func (ch *CaddyHugo) LowestPendingConfirmation() uint64 {
- var lowest uint64
- for _, c := range ch.Confirming() {
- if lowest == 0 || c < lowest {
- lowest = c
- }
- }
- return lowest
- }
-
- func (ch *CaddyHugo) ClearConfirmed(lowestPending uint64) {
- ch.mtx.Lock()
- defer ch.mtx.Unlock()
-
- for ltime := range ch.confirmingToClient {
- if ltime < lowestPending {
- delete(ch.confirmingToClient, ltime)
- }
- }
- }
-
- func (ch *CaddyHugo) DeltaWebsocket(w http.ResponseWriter, r *http.Request) (int, error) {
- var upgrader = websocket.Upgrader{
- ReadBufferSize: 1024,
- WriteBufferSize: 1024,
- }
-
- conn, err := upgrader.Upgrade(w, r, nil)
- if err != nil {
- fmt.Println(err)
- return http.StatusBadRequest, err
- }
-
- conn.SetReadDeadline(time.Time{})
-
- doc, err := ch.editSession(docNameFromEditRequest(r))
- if err != nil {
- fmt.Println(err)
- return http.StatusBadRequest, err
- }
-
- return ch.handleDeltaConn(conn, doc)
- }
-
- func (ch *CaddyHugo) Message(deltas ...acedoc.Delta) Message {
- return Message{
- Deltas: deltas,
- LTime: ch.LTime(),
- Confirmed: ch.Confirming(),
- LowestPending: ch.LowestPendingConfirmation(),
- }
- }
-
- func (ch *CaddyHugo) handleDeltaConn(conn DeltaConn, doc *editSession) (int, error) {
- const idlePing = 15 * time.Second
- const idlePingShort = 1 * time.Millisecond
-
- errCh := make(chan error)
- doneCh := make(chan struct{})
- defer func() {
- close(doneCh)
- close(errCh)
- for err := range errCh {
- log.Println(err)
- }
- }()
-
- timer := time.NewTimer(idlePing)
- resetTimer := func(d time.Duration) {
- if !timer.Stop() {
- select {
- case <-timer.C:
- default:
- }
- }
- timer.Reset(d)
- }
-
- wroteMessagesCh := make(chan Message, 2)
-
- client := doc.doc.Client(acedoc.DeltaHandlerFunc(func(ds []acedoc.Delta) error {
- m := ch.Message(ds...)
- wroteMessagesCh <- m
- return conn.WriteJSON(m)
- }))
-
- ch.mtx.Lock()
- doc.clients++
- ch.mtx.Unlock()
-
- defer func() {
- ch.mtx.Lock()
- client.Close()
- doc.clients--
- ch.mtx.Unlock()
- }()
-
- readMessagesCh := make(chan Message, 2)
- go func() {
- for {
- var message Message
-
- err := conn.ReadJSON(&message)
- if err != nil {
- errCh <- fmt.Errorf("error reading message from client conn: %v", err)
- return
- }
- if message.LTime != 0 {
- ch.ObserveLTime(message.LTime)
- }
-
- if len(message.Deltas) == 0 {
- continue
- }
-
- if !ch.ShouldApply(message.LTime) {
- continue
- }
-
- err = client.PushDeltas(message.Deltas...)
- if err != nil {
- errCh <- fmt.Errorf("error pushing deltas into document: %v", err)
- return
- }
-
- ch.ConfirmLTime(message.LTime)
- ch.ClearConfirmed(message.LowestPending)
-
- select {
- case readMessagesCh <- message:
- case <-doneCh:
- return
- }
-
- }
- }()
-
- for {
- select {
- case err := <-errCh:
- fmt.Println("error handling websocket connection:", err)
- return 500, err
- default:
- }
-
- select {
- case <-timer.C:
- conn.WriteJSON(ch.Message())
- resetTimer(idlePing)
- case <-readMessagesCh:
- resetTimer(idlePingShort)
- case <-wroteMessagesCh:
- resetTimer(idlePing)
- case <-doneCh:
- return 200, nil
- }
- }
- }
-
- type Message struct {
- Deltas []acedoc.Delta `json:"deltas"`
- LTime uint64 `json:"ltime"`
- Confirmed []uint64 `json:"confirmed"`
- LowestPending uint64 `json:"lowestPending"`
- }
|