resilience improvements

pull/10/head
Stephen Searles 7 years ago
parent 7ae2e08c4a
commit 4b76df1811
  1. 2
      caddyhugo.go
  2. 72
      deltas.go
  3. 9
      setup.go
  4. 41
      templates.go

@ -27,7 +27,6 @@ func init() {
Action: plugin.SetupCaddy,
})
// ... there are others. See the godoc.
}
// CaddyHugo implements the plugin
@ -47,6 +46,7 @@ type CaddyHugo struct {
authorTmpl, adminTmpl, editTmpl *template.Template
ltime uint64
confirmingToClient map[uint64]struct{}
}
// Build rebuilds the cached state of the site. TODO: determine if this republishes

@ -34,6 +34,67 @@ func (ch *CaddyHugo) LTime() uint64 {
return ch.ltime
}
func (ch *CaddyHugo) ShouldApply(ltime uint64) bool {
lowest := ch.LowestPendingConfirmation()
for _, c := range ch.Confirming() {
if lowest == 0 || c < lowest {
lowest = c
}
}
if ltime < lowest {
return false
}
ch.mtx.Lock()
defer ch.mtx.Unlock()
if _, ok := ch.confirmingToClient[ltime]; ok {
return false
}
return true
}
func (ch *CaddyHugo) ConfirmLTime(ltime uint64) {
ch.mtx.Lock()
defer ch.mtx.Unlock()
ch.confirmingToClient[ltime] = struct{}{}
}
func (ch *CaddyHugo) Confirming() []uint64 {
var times []uint64
ch.mtx.Lock()
defer ch.mtx.Unlock()
for ltime := range ch.confirmingToClient {
times = append(times, ltime)
}
return times
}
func (ch *CaddyHugo) LowestPendingConfirmation() uint64 {
var lowest uint64
for _, c := range ch.Confirming() {
if lowest == 0 || c < lowest {
lowest = c
}
}
return lowest
}
func (ch *CaddyHugo) ClearConfirmed(lowestPending uint64) {
ch.mtx.Lock()
defer ch.mtx.Unlock()
for ltime := range ch.confirmingToClient {
if ltime < lowestPending {
delete(ch.confirmingToClient, ltime)
}
}
}
func (ch *CaddyHugo) DeltaWebsocket(w http.ResponseWriter, r *http.Request) (int, error) {
var upgrader = websocket.Upgrader{
ReadBufferSize: 1024,
@ -61,6 +122,8 @@ func (ch *CaddyHugo) Message(deltas ...acedoc.Delta) Message {
return Message{
Deltas: deltas,
LTime: ch.LTime(),
Confirmed: ch.Confirming(),
LowestPending: ch.LowestPendingConfirmation(),
}
}
@ -125,12 +188,19 @@ func (ch *CaddyHugo) handleDeltaConn(conn DeltaConn, doc *editSession) (int, err
continue
}
if !ch.ShouldApply(message.LTime) {
continue
}
err = client.PushDeltas(message.Deltas...)
if err != nil {
errCh <- fmt.Errorf("error pushing deltas into document: %v", err)
return
}
ch.ConfirmLTime(message.LTime)
ch.ClearConfirmed(message.LowestPending)
select {
case readMessagesCh <- message:
case <-doneCh:
@ -165,4 +235,6 @@ func (ch *CaddyHugo) handleDeltaConn(conn DeltaConn, doc *editSession) (int, err
type Message struct {
Deltas []acedoc.Delta `json:"deltas"`
LTime uint64 `json:"ltime"`
Confirmed []uint64 `json:"confirmed"`
LowestPending uint64 `json:"lowestPending"`
}

@ -17,6 +17,14 @@ import (
func (ch *CaddyHugo) SetupCaddy(c *caddy.Controller) error {
ch.Site = httpserver.GetConfig(c)
err := ch.Setup(ch.Site.Root)
caddy.RegisterEventHook("caddyhugo-shutdown", func(eventType caddy.EventName, eventInfo interface{}) error {
if eventType == caddy.ShutdownEvent {
return ch.persistAllEdits()
}
return nil
})
ch.Site.AddMiddleware(ch.Middleware(c))
return err
}
@ -26,6 +34,7 @@ func (ch *CaddyHugo) Setup(dir string) error {
ch.Dir = dir
ch.docs = make(map[string]*editSession)
ch.confirmingToClient = make(map[uint64]struct{})
ch.HugoCfg = &deps.DepsCfg{}

@ -127,7 +127,7 @@ a {
<div id="container" >
<div id="header">
<div id="lastSaved">
<span v-if="ltime >serverLtime && (sendQueue.length > 0 || sentRecently.length > 0)">last saved ${ lastSaved.from(now) }, saving</span>
<span v-if="sendQueue.length > 0 || Object.keys(needConfirmation).length > 0">last saved ${ lastSaved.from(now) }, saving</span>
<span v-else>saved</span>
<span v-if="connectionError">, ${connectionError}</span>
</div>
@ -158,13 +158,14 @@ a {
}
var uiBindings = {
ltime: 0,
ltime: {{ .LTime }},
serverLtime: 0,
lastSaved: moment(),
now: moment(),
connectionError: null,
sendQueue: [],
sentRecently: [],
needConfirmation: {},
};
var app = new Vue({
@ -178,10 +179,12 @@ a {
return uiBindings.ltime
}
function observeServer(l) {
function observeServer(l, confirmed) {
uiBindings.serverLtime = l;
while (uiBindings.sentRecently.length > 0 && uiBindings.sentRecently[0].ltime < l) {
uiBindings.sentRecently.pop();
if (confirmed && confirmed.length > 0) {
confirmed.forEach(function (e) {
delete uiBindings.needConfirmation[e];
})
}
observe(l);
}
@ -231,20 +234,36 @@ a {
}
}
}
uiBindings.now = moment();
if (uiBindings.connectionError) {
socket = connect();
} else if (uiBindings.sendQueue.length > 0) {
var ltime = getLtime();
socket.send(JSON.stringify({
// record lowest pending
// ltime at the time this message
// was serialized
var lowestPending = ltime;
for (c in uiBindings.needConfirmation) {
c = parseInt(c, 10);
if (lowestPending === 0 || c < lowestPending) {
lowestPending = c;
}
}
var msg = JSON.stringify({
"deltas": uiBindings.sendQueue,
"ltime": ltime,
}));
uiBindings.sentRecently.push({
"ltime": ltime,
"sent": uiBindings.sendQueue,
"lowestPending": lowestPending,
});
uiBindings.sendQueue = [];
uiBindings.needConfirmation[ltime] = msg;
}
for (ltime in uiBindings.needConfirmation) {
var msg = uiBindings.needConfirmation[ltime];
socket.send(msg);
}
}, 500);
@ -254,7 +273,7 @@ a {
// Listen for messages
socket.addEventListener('message', function (event) {
var message = JSON.parse(event.data);
observeServer(message.ltime);
observeServer(message.ltime, message.confirmed);
var deltas = [];
deltas.push.apply(deltas, message.deltas);

Loading…
Cancel
Save