Browse Source

wip

pull/16/head
Stephen Searles 2 years ago
parent
commit
3d57e589e6
3 changed files with 23 additions and 22 deletions
  1. 12
    13
      deltas.go
  2. 11
    8
      doc_test.go
  3. 0
    1
      hugo.go

+ 12
- 13
deltas.go View File

@@ -35,33 +35,27 @@ func (ch *CaddyHugo) LTime() uint64 {
}

func (ch *CaddyHugo) ShouldApply(ltime uint64) bool {
lowest := ch.LowestPendingConfirmation()
for _, c := range ch.Confirming() {
if lowest == 0 || c < lowest {
lowest = c
}
}

if ltime < lowest {
return false
}

ch.mtx.Lock()
defer ch.mtx.Unlock()

if _, ok := ch.confirmingToClient[ltime]; ok {
fmt.Println("rejecting ltime", ltime, "because was already processed")
return false
}

return true
}

// ConfirmLTime marks an ltime as something that should be confirmed with clients
func (ch *CaddyHugo) ConfirmLTime(ltime uint64) {
ch.mtx.Lock()
defer ch.mtx.Unlock()
ch.confirmingToClient[ltime] = struct{}{}
}

// Confirming returns the current list of LTimes that need to be confirmed
// to clients
func (ch *CaddyHugo) Confirming() []uint64 {
var times []uint64

@@ -74,6 +68,8 @@ func (ch *CaddyHugo) Confirming() []uint64 {
return times
}

// LowestPendingConfirmation identifies the lowest LTime that needs to be
// confirmed to clients
func (ch *CaddyHugo) LowestPendingConfirmation() uint64 {
var lowest uint64
for _, c := range ch.Confirming() {
@@ -128,7 +124,7 @@ func (ch *CaddyHugo) Message(deltas ...acedoc.Delta) Message {
}

func (ch *CaddyHugo) handleDeltaConn(conn DeltaConn, doc *editSession) (int, error) {
const idlePing = 15 * time.Second
const idlePing = 1 * time.Second
const idlePingShort = 1 * time.Millisecond

errCh := make(chan error)
@@ -157,6 +153,7 @@ func (ch *CaddyHugo) handleDeltaConn(conn DeltaConn, doc *editSession) (int, err
client := doc.doc.Client(acedoc.DeltaHandlerFunc(func(ds []acedoc.Delta) error {
m := ch.Message(ds...)
wroteMessagesCh <- m
fmt.Println("WRITING")
return conn.WriteJSON(m)
}))

@@ -181,16 +178,18 @@ func (ch *CaddyHugo) handleDeltaConn(conn DeltaConn, doc *editSession) (int, err
errCh <- fmt.Errorf("error reading message from client conn: %v", err)
return
}
ch.ObserveLTime(message.LTime)
if message.LTime != 0 {
ch.ObserveLTime(message.LTime)
}

if len(message.Deltas) == 0 {
time.Sleep(10 * time.Microsecond)
continue
}

if !ch.ShouldApply(message.LTime) {
continue
}
fmt.Printf("READ %p\n", conn)

err = client.PushDeltas(message.Deltas...)
if err != nil {

+ 11
- 8
doc_test.go View File

@@ -100,6 +100,7 @@ type WebsocketTester struct {
mtx sync.Mutex
}

// ReadJSON reads the next pending message from the "client" into v
func (ws *WebsocketTester) ReadJSON(v interface{}) error {
ws.mtx.Lock()
defer ws.mtx.Unlock()
@@ -113,6 +114,7 @@ func (ws *WebsocketTester) ReadJSON(v interface{}) error {
return err
}

// WriteJSON "sends" a message, v, to the "client"
func (ws *WebsocketTester) WriteJSON(v interface{}) error {
ws.mtx.Lock()
defer ws.mtx.Unlock()
@@ -122,16 +124,13 @@ func (ws *WebsocketTester) WriteJSON(v interface{}) error {
panic("wrong type written to WebsocketTester")
}

if len(m.Deltas) == 0 {
return nil
}

ws.wroteMessages = append(ws.wroteMessages, m)
ws.wroteDeltas = append(ws.wroteDeltas, m.Deltas...)

return nil
}

// ReceiveJSON queues a message to be sent to the client
func (ws *WebsocketTester) ReceiveJSON(v interface{}) error {
ws.mtx.Lock()
defer ws.mtx.Unlock()
@@ -177,6 +176,7 @@ func TestDeltasSingle(t *testing.T) {
// so we expect to have written 0 messages
if len(client.wroteMessages) != 0 {
t.Errorf("client wrote %d messages, should have written %d", len(client.wroteMessages), 0)
t.Logf("%v", client.wroteMessages)
}

// we received one, so make sure that's counted properly
@@ -268,13 +268,15 @@ func TestDeltasMulti(t *testing.T) {

doc, err := w.CH.editSession("content/" + title + ".md")
if err != nil {
t.Fatal("couldn't establish docref:", err)
t.Fatal("couldn't establish edit session:", err)
}

go w.CH.handleDeltaConn(clients[0], doc)
go w.CH.handleDeltaConn(clients[1], doc)
go w.CH.handleDeltaConn(clients[2], doc)

time.Sleep(100 * time.Millisecond)

a := acedoc.Insert(0, 0, "a")
b := acedoc.Insert(0, 0, "b")
c := acedoc.Insert(0, 0, "c")
@@ -283,12 +285,13 @@ func TestDeltasMulti(t *testing.T) {
clients[1].ReceiveJSON(w.CH.Message(b))
clients[2].ReceiveJSON(w.CH.Message(c))

time.Sleep(400 * time.Millisecond)
time.Sleep(1000 * time.Millisecond)

for i, client := range clients {
client.mtx.Lock()
// all clients should have "written" 2 deltas (could be the same
// message) that came from the other clients
t.Logf("client %d exists", i)
// all clients should have "written" 2 deltas out to their "browser"
// that came from the other clients
if len(client.wroteDeltas) != 2 {
t.Errorf("client %d wrote %d deltas, should have written 2", i, len(client.wroteDeltas))
}

+ 0
- 1
hugo.go View File

@@ -52,7 +52,6 @@ func writeThemeFiles(dir string) error {
if err != nil {
return err
}
fmt.Println("writing", path.Join(dir, asset))
f, err := os.Create(path.Join(dir, asset))
if err != nil {
return err

Loading…
Cancel
Save