diff --git a/caddyhugo.go b/caddyhugo.go index 36730c4..776d7a8 100644 --- a/caddyhugo.go +++ b/caddyhugo.go @@ -27,7 +27,6 @@ func init() { Action: plugin.SetupCaddy, }) - // ... there are others. See the godoc. } // CaddyHugo implements the plugin @@ -46,7 +45,8 @@ type CaddyHugo struct { authorTmpl, adminTmpl, editTmpl *template.Template - ltime uint64 + ltime uint64 + confirmingToClient map[uint64]struct{} } // Build rebuilds the cached state of the site. TODO: determine if this republishes diff --git a/deltas.go b/deltas.go index 5955bfb..d3036cb 100644 --- a/deltas.go +++ b/deltas.go @@ -34,6 +34,67 @@ func (ch *CaddyHugo) LTime() uint64 { return ch.ltime } +func (ch *CaddyHugo) ShouldApply(ltime uint64) bool { + lowest := ch.LowestPendingConfirmation() + for _, c := range ch.Confirming() { + if lowest == 0 || c < lowest { + lowest = c + } + } + + if ltime < lowest { + return false + } + + ch.mtx.Lock() + defer ch.mtx.Unlock() + + if _, ok := ch.confirmingToClient[ltime]; ok { + return false + } + + return true +} + +func (ch *CaddyHugo) ConfirmLTime(ltime uint64) { + ch.mtx.Lock() + defer ch.mtx.Unlock() + ch.confirmingToClient[ltime] = struct{}{} +} + +func (ch *CaddyHugo) Confirming() []uint64 { + var times []uint64 + + ch.mtx.Lock() + defer ch.mtx.Unlock() + + for ltime := range ch.confirmingToClient { + times = append(times, ltime) + } + return times +} + +func (ch *CaddyHugo) LowestPendingConfirmation() uint64 { + var lowest uint64 + for _, c := range ch.Confirming() { + if lowest == 0 || c < lowest { + lowest = c + } + } + return lowest +} + +func (ch *CaddyHugo) ClearConfirmed(lowestPending uint64) { + ch.mtx.Lock() + defer ch.mtx.Unlock() + + for ltime := range ch.confirmingToClient { + if ltime < lowestPending { + delete(ch.confirmingToClient, ltime) + } + } +} + func (ch *CaddyHugo) DeltaWebsocket(w http.ResponseWriter, r *http.Request) (int, error) { var upgrader = websocket.Upgrader{ ReadBufferSize: 1024, @@ -59,8 +120,10 @@ func (ch *CaddyHugo) DeltaWebsocket(w http.ResponseWriter, r *http.Request) (int func (ch *CaddyHugo) Message(deltas ...acedoc.Delta) Message { return Message{ - Deltas: deltas, - LTime: ch.LTime(), + Deltas: deltas, + LTime: ch.LTime(), + Confirmed: ch.Confirming(), + LowestPending: ch.LowestPendingConfirmation(), } } @@ -125,12 +188,19 @@ func (ch *CaddyHugo) handleDeltaConn(conn DeltaConn, doc *editSession) (int, err continue } + if !ch.ShouldApply(message.LTime) { + continue + } + err = client.PushDeltas(message.Deltas...) if err != nil { errCh <- fmt.Errorf("error pushing deltas into document: %v", err) return } + ch.ConfirmLTime(message.LTime) + ch.ClearConfirmed(message.LowestPending) + select { case readMessagesCh <- message: case <-doneCh: @@ -163,6 +233,8 @@ func (ch *CaddyHugo) handleDeltaConn(conn DeltaConn, doc *editSession) (int, err } type Message struct { - Deltas []acedoc.Delta `json:"deltas"` - LTime uint64 `json:"ltime"` + Deltas []acedoc.Delta `json:"deltas"` + LTime uint64 `json:"ltime"` + Confirmed []uint64 `json:"confirmed"` + LowestPending uint64 `json:"lowestPending"` } diff --git a/setup.go b/setup.go index af4354f..649279d 100644 --- a/setup.go +++ b/setup.go @@ -17,6 +17,14 @@ import ( func (ch *CaddyHugo) SetupCaddy(c *caddy.Controller) error { ch.Site = httpserver.GetConfig(c) err := ch.Setup(ch.Site.Root) + + caddy.RegisterEventHook("caddyhugo-shutdown", func(eventType caddy.EventName, eventInfo interface{}) error { + if eventType == caddy.ShutdownEvent { + return ch.persistAllEdits() + } + return nil + }) + ch.Site.AddMiddleware(ch.Middleware(c)) return err } @@ -26,6 +34,7 @@ func (ch *CaddyHugo) Setup(dir string) error { ch.Dir = dir ch.docs = make(map[string]*editSession) + ch.confirmingToClient = make(map[uint64]struct{}) ch.HugoCfg = &deps.DepsCfg{} diff --git a/templates.go b/templates.go index af04aac..a0deb30 100644 --- a/templates.go +++ b/templates.go @@ -127,7 +127,7 @@ a {