From 3d57e589e6e18dc187a20e062fe46637d18612b7 Mon Sep 17 00:00:00 2001 From: Stephen Searles Date: Sun, 24 Sep 2017 22:10:27 -0400 Subject: [PATCH 1/5] wip --- deltas.go | 25 ++++++++++++------------- doc_test.go | 19 +++++++++++-------- hugo.go | 1 - 3 files changed, 23 insertions(+), 22 deletions(-) diff --git a/deltas.go b/deltas.go index d3036cb..f1822e9 100644 --- a/deltas.go +++ b/deltas.go @@ -35,33 +35,27 @@ func (ch *CaddyHugo) LTime() uint64 { } func (ch *CaddyHugo) ShouldApply(ltime uint64) bool { - lowest := ch.LowestPendingConfirmation() - for _, c := range ch.Confirming() { - if lowest == 0 || c < lowest { - lowest = c - } - } - - if ltime < lowest { - return false - } ch.mtx.Lock() defer ch.mtx.Unlock() if _, ok := ch.confirmingToClient[ltime]; ok { + fmt.Println("rejecting ltime", ltime, "because was already processed") return false } return true } +// ConfirmLTime marks an ltime as something that should be confirmed with clients func (ch *CaddyHugo) ConfirmLTime(ltime uint64) { ch.mtx.Lock() defer ch.mtx.Unlock() ch.confirmingToClient[ltime] = struct{}{} } +// Confirming returns the current list of LTimes that need to be confirmed +// to clients func (ch *CaddyHugo) Confirming() []uint64 { var times []uint64 @@ -74,6 +68,8 @@ func (ch *CaddyHugo) Confirming() []uint64 { return times } +// LowestPendingConfirmation identifies the lowest LTime that needs to be +// confirmed to clients func (ch *CaddyHugo) LowestPendingConfirmation() uint64 { var lowest uint64 for _, c := range ch.Confirming() { @@ -128,7 +124,7 @@ func (ch *CaddyHugo) Message(deltas ...acedoc.Delta) Message { } func (ch *CaddyHugo) handleDeltaConn(conn DeltaConn, doc *editSession) (int, error) { - const idlePing = 15 * time.Second + const idlePing = 1 * time.Second const idlePingShort = 1 * time.Millisecond errCh := make(chan error) @@ -157,6 +153,7 @@ func (ch *CaddyHugo) handleDeltaConn(conn DeltaConn, doc *editSession) (int, err client := doc.doc.Client(acedoc.DeltaHandlerFunc(func(ds []acedoc.Delta) error { m := ch.Message(ds...) wroteMessagesCh <- m + fmt.Println("WRITING") return conn.WriteJSON(m) })) @@ -181,16 +178,18 @@ func (ch *CaddyHugo) handleDeltaConn(conn DeltaConn, doc *editSession) (int, err errCh <- fmt.Errorf("error reading message from client conn: %v", err) return } - ch.ObserveLTime(message.LTime) + if message.LTime != 0 { + ch.ObserveLTime(message.LTime) + } if len(message.Deltas) == 0 { - time.Sleep(10 * time.Microsecond) continue } if !ch.ShouldApply(message.LTime) { continue } + fmt.Printf("READ %p\n", conn) err = client.PushDeltas(message.Deltas...) if err != nil { diff --git a/doc_test.go b/doc_test.go index fd72a33..31741ab 100644 --- a/doc_test.go +++ b/doc_test.go @@ -100,6 +100,7 @@ type WebsocketTester struct { mtx sync.Mutex } +// ReadJSON reads the next pending message from the "client" into v func (ws *WebsocketTester) ReadJSON(v interface{}) error { ws.mtx.Lock() defer ws.mtx.Unlock() @@ -113,6 +114,7 @@ func (ws *WebsocketTester) ReadJSON(v interface{}) error { return err } +// WriteJSON "sends" a message, v, to the "client" func (ws *WebsocketTester) WriteJSON(v interface{}) error { ws.mtx.Lock() defer ws.mtx.Unlock() @@ -122,16 +124,13 @@ func (ws *WebsocketTester) WriteJSON(v interface{}) error { panic("wrong type written to WebsocketTester") } - if len(m.Deltas) == 0 { - return nil - } - ws.wroteMessages = append(ws.wroteMessages, m) ws.wroteDeltas = append(ws.wroteDeltas, m.Deltas...) return nil } +// ReceiveJSON queues a message to be sent to the client func (ws *WebsocketTester) ReceiveJSON(v interface{}) error { ws.mtx.Lock() defer ws.mtx.Unlock() @@ -177,6 +176,7 @@ func TestDeltasSingle(t *testing.T) { // so we expect to have written 0 messages if len(client.wroteMessages) != 0 { t.Errorf("client wrote %d messages, should have written %d", len(client.wroteMessages), 0) + t.Logf("%v", client.wroteMessages) } // we received one, so make sure that's counted properly @@ -268,13 +268,15 @@ func TestDeltasMulti(t *testing.T) { doc, err := w.CH.editSession("content/" + title + ".md") if err != nil { - t.Fatal("couldn't establish docref:", err) + t.Fatal("couldn't establish edit session:", err) } go w.CH.handleDeltaConn(clients[0], doc) go w.CH.handleDeltaConn(clients[1], doc) go w.CH.handleDeltaConn(clients[2], doc) + time.Sleep(100 * time.Millisecond) + a := acedoc.Insert(0, 0, "a") b := acedoc.Insert(0, 0, "b") c := acedoc.Insert(0, 0, "c") @@ -283,12 +285,13 @@ func TestDeltasMulti(t *testing.T) { clients[1].ReceiveJSON(w.CH.Message(b)) clients[2].ReceiveJSON(w.CH.Message(c)) - time.Sleep(400 * time.Millisecond) + time.Sleep(1000 * time.Millisecond) for i, client := range clients { client.mtx.Lock() - // all clients should have "written" 2 deltas (could be the same - // message) that came from the other clients + t.Logf("client %d exists", i) + // all clients should have "written" 2 deltas out to their "browser" + // that came from the other clients if len(client.wroteDeltas) != 2 { t.Errorf("client %d wrote %d deltas, should have written 2", i, len(client.wroteDeltas)) } diff --git a/hugo.go b/hugo.go index 4d6e491..b5e6434 100644 --- a/hugo.go +++ b/hugo.go @@ -52,7 +52,6 @@ func writeThemeFiles(dir string) error { if err != nil { return err } - fmt.Println("writing", path.Join(dir, asset)) f, err := os.Create(path.Join(dir, asset)) if err != nil { return err From 561b4feee138789ede88adb8d9da53ff7075619b Mon Sep 17 00:00:00 2001 From: Stephen Searles Date: Thu, 28 Sep 2017 07:16:02 -0400 Subject: [PATCH 2/5] making tests a little easier to read --- doc_test.go | 126 ++++++++++-------------------------------------- helpers_test.go | 96 ++++++++++++++++++++++++++++++++++++ 2 files changed, 122 insertions(+), 100 deletions(-) create mode 100644 helpers_test.go diff --git a/doc_test.go b/doc_test.go index 31741ab..a332762 100644 --- a/doc_test.go +++ b/doc_test.go @@ -1,10 +1,6 @@ package caddyhugo import ( - "encoding/json" - "io/ioutil" - "os" - "os/exec" "path" "sync" "testing" @@ -13,135 +9,65 @@ import ( "git.stephensearles.com/stephen/acedoc" ) -type World struct { - CH *CaddyHugo - BlogFolder string -} - -func (w *World) Clean() { - if w.BlogFolder != "" { - os.RemoveAll(w.BlogFolder) - } -} - -func NewWorld(t *testing.T) *World { - dir, err := ioutil.TempDir("", "caddy-hugo2-test-") - if err != nil { - t.Fatalf("error initializing test environment: %v", err) - } - - w := &World{BlogFolder: dir} - - cmd := exec.Command("hugo", "new", "site", dir) - cmd.Dir = dir - out, err := cmd.CombinedOutput() - if err != nil { - t.Fatalf("error initializing test site: %v\n\n%v", err, string(out)) - } - - w.CH = &CaddyHugo{} - w.CH.Setup(dir) - - return w -} - func TestEdits(t *testing.T) { w := NewWorld(t) defer w.Clean() const title = "sometitle" - var contentPath = path.Join("content", title+".md") - w.CH.NewContent(title, "") + var ( + mtx sync.Mutex - send := []acedoc.Delta{ - acedoc.Insert(0, 0, "hello"), - acedoc.Insert(0, 5, " world"), - acedoc.Insert(0, 11, " world"), - } - var mtx sync.Mutex - received := []acedoc.Delta{} + contentPath = path.Join("content", title+".md") + + // we will send these to the doc + send = []acedoc.Delta{ + acedoc.Insert(0, 0, "hello"), + acedoc.Insert(0, 5, " world"), + acedoc.Insert(0, 11, " world"), + } + + // we will use this to track what we get out of the doc + received = []acedoc.Delta{} + ) + + // prepare a new post + w.CH.NewContent(title, "") + // start an edit session doc, err := w.CH.editSession(contentPath) if err != nil { t.Fatal("error creating document client:", err) } - doc.doc.Client(acedoc.DeltaHandlerFunc(func(ds []acedoc.Delta) error { - // receive some deltas... + // register an *extra* client that just adds to the received delta slice we're tracking + c := doc.doc.Client(acedoc.DeltaHandlerFunc(func(ds []acedoc.Delta) error { mtx.Lock() defer mtx.Unlock() received = append(received, ds...) return nil })) + // make sure we have an edit session _, ok := w.CH.hasEditSession(contentPath) if !ok { t.Fatal("expected there to be an established client") } - doc.doc.Apply(send...) + // push the deltas + c.PushDeltas(send...) + // wait... <-time.After(5 * time.Second) + // be sure we got the correct number of deltas back mtx.Lock() defer mtx.Unlock() if len(received) != len(send) { t.Errorf("expected %d deltas, received %d; expected: %v, received: %v", len(send), len(received), send, received) } -} - -type WebsocketTester struct { - receivedPointer int - received [][]byte - wroteMessages []Message - wroteDeltas []acedoc.Delta - mtx sync.Mutex -} - -// ReadJSON reads the next pending message from the "client" into v -func (ws *WebsocketTester) ReadJSON(v interface{}) error { - ws.mtx.Lock() - defer ws.mtx.Unlock() - - if len(ws.received) <= ws.receivedPointer { - return nil - } - - err := json.Unmarshal(ws.received[ws.receivedPointer], v) - ws.receivedPointer++ - return err -} - -// WriteJSON "sends" a message, v, to the "client" -func (ws *WebsocketTester) WriteJSON(v interface{}) error { - ws.mtx.Lock() - defer ws.mtx.Unlock() - - m, ok := v.(Message) - if !ok { - panic("wrong type written to WebsocketTester") - } - - ws.wroteMessages = append(ws.wroteMessages, m) - ws.wroteDeltas = append(ws.wroteDeltas, m.Deltas...) - - return nil -} - -// ReceiveJSON queues a message to be sent to the client -func (ws *WebsocketTester) ReceiveJSON(v interface{}) error { - ws.mtx.Lock() - defer ws.mtx.Unlock() - - out, err := json.Marshal(v) - if err != nil { - return err - } - - ws.received = append(ws.received, out) - return nil + t.Log(w.CH.Contents()) } func TestDeltasSingle(t *testing.T) { diff --git a/helpers_test.go b/helpers_test.go new file mode 100644 index 0000000..d26f5a0 --- /dev/null +++ b/helpers_test.go @@ -0,0 +1,96 @@ +package caddyhugo + +import ( + "encoding/json" + "io/ioutil" + "os" + "os/exec" + "sync" + "testing" + + "git.stephensearles.com/stephen/acedoc" +) + +type World struct { + CH *CaddyHugo + BlogFolder string +} + +func (w *World) Clean() { + if w.BlogFolder != "" { + os.RemoveAll(w.BlogFolder) + } +} + +func NewWorld(t *testing.T) *World { + dir, err := ioutil.TempDir("", "caddy-hugo2-test-") + if err != nil { + t.Fatalf("error initializing test environment: %v", err) + } + + w := &World{BlogFolder: dir} + + cmd := exec.Command("hugo", "new", "site", dir) + cmd.Dir = dir + out, err := cmd.CombinedOutput() + if err != nil { + t.Fatalf("error initializing test site: %v\n\n%v", err, string(out)) + } + + w.CH = &CaddyHugo{} + w.CH.Setup(dir) + + return w +} + +type WebsocketTester struct { + receivedPointer int + received [][]byte + wroteMessages []Message + wroteDeltas []acedoc.Delta + mtx sync.Mutex +} + +// ReadJSON reads the next pending message from the "client" into v +func (ws *WebsocketTester) ReadJSON(v interface{}) error { + ws.mtx.Lock() + defer ws.mtx.Unlock() + + if len(ws.received) <= ws.receivedPointer { + return nil + } + + err := json.Unmarshal(ws.received[ws.receivedPointer], v) + ws.receivedPointer++ + return err +} + +// WriteJSON "sends" a message, v, to the "client" +func (ws *WebsocketTester) WriteJSON(v interface{}) error { + ws.mtx.Lock() + defer ws.mtx.Unlock() + + m, ok := v.(Message) + if !ok { + panic("wrong type written to WebsocketTester") + } + + ws.wroteMessages = append(ws.wroteMessages, m) + ws.wroteDeltas = append(ws.wroteDeltas, m.Deltas...) + + return nil +} + +// ReceiveJSON queues a message to be sent to the client +func (ws *WebsocketTester) ReceiveJSON(v interface{}) error { + ws.mtx.Lock() + defer ws.mtx.Unlock() + + out, err := json.Marshal(v) + if err != nil { + return err + } + + ws.received = append(ws.received, out) + return nil +} From 21caa0f7e41fdd4244187143a37ca762032df4c5 Mon Sep 17 00:00:00 2001 From: Stephen Searles Date: Thu, 28 Sep 2017 07:46:40 -0400 Subject: [PATCH 3/5] fixed one test --- doc_test.go | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/doc_test.go b/doc_test.go index a332762..82c9551 100644 --- a/doc_test.go +++ b/doc_test.go @@ -35,13 +35,20 @@ func TestEdits(t *testing.T) { w.CH.NewContent(title, "") // start an edit session - doc, err := w.CH.editSession(contentPath) + es, err := w.CH.editSession(contentPath) if err != nil { t.Fatal("error creating document client:", err) } - // register an *extra* client that just adds to the received delta slice we're tracking - c := doc.doc.Client(acedoc.DeltaHandlerFunc(func(ds []acedoc.Delta) error { + // register a client that we will use to push the deltas. we dont need + // to actually do anything to receive deltas here, though. + c := es.doc.Client(acedoc.DeltaHandlerFunc(func(ds []acedoc.Delta) error { + return nil + })) + + // register an *extra* client that just adds to the received delta + // slice we're tracking + es.doc.Client(acedoc.DeltaHandlerFunc(func(ds []acedoc.Delta) error { mtx.Lock() defer mtx.Unlock() received = append(received, ds...) @@ -66,8 +73,6 @@ func TestEdits(t *testing.T) { if len(received) != len(send) { t.Errorf("expected %d deltas, received %d; expected: %v, received: %v", len(send), len(received), send, received) } - - t.Log(w.CH.Contents()) } func TestDeltasSingle(t *testing.T) { From 1910e74085443dc2b202ccb2d2356b7e3c076900 Mon Sep 17 00:00:00 2001 From: Stephen Searles Date: Thu, 28 Sep 2017 08:23:03 -0400 Subject: [PATCH 4/5] fixing tests --- deltas.go | 3 --- doc_test.go | 17 +++++++++-------- 2 files changed, 9 insertions(+), 11 deletions(-) diff --git a/deltas.go b/deltas.go index f1822e9..736cf24 100644 --- a/deltas.go +++ b/deltas.go @@ -40,7 +40,6 @@ func (ch *CaddyHugo) ShouldApply(ltime uint64) bool { defer ch.mtx.Unlock() if _, ok := ch.confirmingToClient[ltime]; ok { - fmt.Println("rejecting ltime", ltime, "because was already processed") return false } @@ -153,7 +152,6 @@ func (ch *CaddyHugo) handleDeltaConn(conn DeltaConn, doc *editSession) (int, err client := doc.doc.Client(acedoc.DeltaHandlerFunc(func(ds []acedoc.Delta) error { m := ch.Message(ds...) wroteMessagesCh <- m - fmt.Println("WRITING") return conn.WriteJSON(m) })) @@ -189,7 +187,6 @@ func (ch *CaddyHugo) handleDeltaConn(conn DeltaConn, doc *editSession) (int, err if !ch.ShouldApply(message.LTime) { continue } - fmt.Printf("READ %p\n", conn) err = client.PushDeltas(message.Deltas...) if err != nil { diff --git a/doc_test.go b/doc_test.go index 82c9551..4d8b528 100644 --- a/doc_test.go +++ b/doc_test.go @@ -88,12 +88,12 @@ func TestDeltasSingle(t *testing.T) { client := new(WebsocketTester) - doc, err := w.CH.editSession("content/" + title + ".md") + es, err := w.CH.editSession("content/" + title + ".md") if err != nil { t.Fatal("couldn't establish docref for client 0:", err) } - go w.CH.handleDeltaConn(client, doc) + go w.CH.handleDeltaConn(client, es) a := acedoc.Insert(0, 0, "a") @@ -104,9 +104,10 @@ func TestDeltasSingle(t *testing.T) { time.Sleep(50 * time.Millisecond) // we shouldn't have written back to the client, - // so we expect to have written 0 messages - if len(client.wroteMessages) != 0 { - t.Errorf("client wrote %d messages, should have written %d", len(client.wroteMessages), 0) + // so we expect to have written 0 *deltas*. (we may have written + // empty messages without deltas because of the pings to the client) + if len(client.wroteDeltas) != 0 { + t.Errorf("client wrote %d deltas, should have written %d", len(client.wroteMessages), 0) t.Logf("%v", client.wroteMessages) } @@ -148,7 +149,7 @@ func TestDeltasDouble(t *testing.T) { // so we expect clientA to have written 0 messages, and // clientB to have written 1 - if len(clientA.wroteMessages) != 0 || len(clientB.wroteMessages) != 1 { + if len(clientA.wroteDeltas) != 0 || len(clientB.wroteDeltas) != 1 { t.Errorf("clientA wrote %d messages, should have written 0. clientB wrote %d, should have written 1", len(clientA.wroteMessages), len(clientB.wroteMessages)) } @@ -169,9 +170,9 @@ func TestDeltasDouble(t *testing.T) { clientA.mtx.Lock() clientB.mtx.Lock() - // so we expect clientA to have written 1 message this time, and + // so we expect clientA to have written 1 delta this time, and // clientB to have written nothing new, so 1 still - if len(clientA.wroteMessages) != 1 || len(clientB.wroteMessages) != 1 { + if len(clientA.wroteDeltas) != 1 || len(clientB.wroteDeltas) != 1 { t.Errorf("clientA wrote %d messages, should have written 1. clientB wrote %d, should have written 1 (just from before)", len(clientA.wroteMessages), len(clientB.wroteMessages)) } From 33dffdad9578d21122bfcfce54a48e18fd652c18 Mon Sep 17 00:00:00 2001 From: Stephen Searles Date: Thu, 28 Sep 2017 08:28:18 -0400 Subject: [PATCH 5/5] tweaks from reviewing PR --- deltas.go | 2 +- doc_test.go | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/deltas.go b/deltas.go index 736cf24..b01ec78 100644 --- a/deltas.go +++ b/deltas.go @@ -123,7 +123,7 @@ func (ch *CaddyHugo) Message(deltas ...acedoc.Delta) Message { } func (ch *CaddyHugo) handleDeltaConn(conn DeltaConn, doc *editSession) (int, error) { - const idlePing = 1 * time.Second + const idlePing = 15 * time.Second const idlePingShort = 1 * time.Millisecond errCh := make(chan error) diff --git a/doc_test.go b/doc_test.go index 4d8b528..5e34ae1 100644 --- a/doc_test.go +++ b/doc_test.go @@ -221,7 +221,6 @@ func TestDeltasMulti(t *testing.T) { for i, client := range clients { client.mtx.Lock() - t.Logf("client %d exists", i) // all clients should have "written" 2 deltas out to their "browser" // that came from the other clients if len(client.wroteDeltas) != 2 {