diff --git a/acedoc.go b/acedoc.go index 1a48df2..299383c 100644 --- a/acedoc.go +++ b/acedoc.go @@ -6,26 +6,38 @@ import ( ) type Document struct { - mtx sync.RWMutex - Lines []string + mtx sync.RWMutex + lines []string + deltas []Delta + clients []*Client +} + +type DeltaHandler interface { + Handle([]Delta) error +} + +type DeltaHandlerFunc func([]Delta) error + +func (fn DeltaHandlerFunc) Handle(d []Delta) error { + return fn(d) } func NewString(str string) *Document { d := &Document{ - Lines: strings.Split(str, "\n"), + lines: strings.Split(str, "\n"), } return d } -func (d *Document) NLines() uint { - n := uint(len(d.Lines)) +func (d *Document) nLines() uint { + n := uint(len(d.lines)) if n == 0 { n = 1 } return n } -func (d Delta) NLines() uint { +func (d Delta) nLines() uint { n := uint(len(d.Lines)) if n == 0 { n = 1 @@ -33,20 +45,21 @@ func (d Delta) NLines() uint { return n } -func (d Document) Line(i uint) string { - if i >= uint(len(d.Lines)) { +func (d Document) line(i uint) string { + if i >= uint(len(d.lines)) { return "" } - return d.Lines[i] + return d.lines[i] } -func (d Delta) Line(i uint) string { + +func (d Delta) line(i uint) string { if i >= uint(len(d.Lines)) { return "" } return d.Lines[i] } -func (d Delta) NRows() uint { +func (d Delta) nRows() uint { r := d.End.Row - d.Start.Row r += 1 if r == 0 { @@ -55,12 +68,12 @@ func (d Delta) NRows() uint { return r } -func (d Delta) Rows() uint { +func (d Delta) rows() uint { r := d.End.Row - d.Start.Row return r } -func (d Delta) Cols() uint { +func (d Delta) cols() uint { r := d.End.Column return r } @@ -69,7 +82,7 @@ func (d *Document) Contents() string { d.mtx.RLock() defer d.mtx.RUnlock() - return strings.Join(d.Lines, "\n") + return strings.Join(d.lines, "\n") } type Position struct { @@ -77,11 +90,11 @@ type Position struct { } func (d *Document) InDocument(pos Position) bool { - if pos.Row > d.NLines() { + if pos.Row > d.nLines() { return false } - line := d.Line(pos.Row) + line := d.line(pos.Row) if pos.Column > uint(len(line)) { return false } diff --git a/client.go b/client.go new file mode 100644 index 0000000..3b40e2a --- /dev/null +++ b/client.go @@ -0,0 +1,134 @@ +package acedoc + +import ( + "sync" + "time" +) + +func (d *Document) Client(h DeltaHandler) *Client { + c := &Client{ + ch: make(chan Delta, 10), + errs: make(chan error, 10), + doc: d, + handler: h, + } + + go func() { + for { + dls := c.pullDeltas() + if dls != nil && c.handler != nil { + go func() { + err := c.handler.Handle(dls) + if err != nil { + select { + case c.errs <- err: + default: + c.Close() + return + } + } + }() + } + } + }() + + d.mtx.Lock() + defer d.mtx.Unlock() + + d.clients = append(d.clients, c) + + return c +} + +func (d *Document) removeClient(c *Client) { + d.mtx.Lock() + defer d.mtx.Unlock() + + for i, ci := range d.clients { + if ci == c { + newclients := append([]*Client{}, d.clients[:i]...) + if len(d.clients) > i { + newclients = append(newclients, d.clients[i+1:]...) + } + d.clients = newclients + return + } + } +} + +type Client struct { + ch chan Delta + errs chan error + + closing sync.Mutex + closed bool + + doc *Document + maxDeltaBucket int + pending []Delta + handler DeltaHandler +} + +// Close closes the client +func (c *Client) Close() { + c.closing.Lock() + defer c.closing.Unlock() + + if !c.closed { + c.doc.removeClient(c) + close(c.ch) + close(c.errs) + c.closed = true + } +} + +// Error returns any pending errors from pushing deltas to the client. +func (c *Client) Error() error { + var err error + select { + case err = <-c.errs: + return err + default: + } + + return nil +} + +// PushDeltas pushes deltas into the document. +func (c *Client) PushDeltas(d ...Delta) error { + return c.doc.Apply(d...) +} + +func (c *Client) pullDeltas() []Delta { + maxSize := c.maxDeltaBucket + if maxSize == 0 { + maxSize = 50 + } + + timeout := 10 * time.Microsecond + + var ds []Delta + for { + var d Delta + var ok bool + + select { + case d, ok = <-c.ch: + case <-time.After(timeout): + return ds + } + + if !ok { + return ds + } + + if d.Equal(Delta{}) { + continue + } + + ds = append(ds, d) + if len(ds) == maxSize { + return ds + } + } +} diff --git a/client_test.go b/client_test.go new file mode 100644 index 0000000..23e72ba --- /dev/null +++ b/client_test.go @@ -0,0 +1,46 @@ +package acedoc + +import ( + "testing" + "time" +) + +func TestClient(t *testing.T) { + doc := NewString("") + + aliceCh := make(chan []Delta, 5) + alice := doc.Client(DeltaHandlerFunc(func(ds []Delta) error { + aliceCh <- ds + return nil + })) + defer alice.Close() + + bobCh := make(chan []Delta, 5) + bob := doc.Client(DeltaHandlerFunc(func(ds []Delta) error { + bobCh <- ds + return nil + })) + defer bob.Close() + + delta := Insert(0, 0, "alphabet") + + err := alice.PushDeltas(delta) + if err != nil { + t.Error("got error pushing delta:", err) + } + + select { + case got := <-bobCh: + if len(got) != 1 { + t.Errorf("got %d deltas, expected %d", len(got), 1) + } else if !got[0].Equal(delta) { + t.Error("didn't get the correct delta, saw:", got[0], "expected:", delta) + } + case <-time.After(10 * time.Millisecond): + t.Error("didn't get any delta") + } + + if got := doc.Contents(); got != "alphabet" { + t.Errorf("saw doc %q, expected %q", got, "alphabet") + } +} diff --git a/delta.go b/delta.go index 2a983bc..bc34c1e 100644 --- a/delta.go +++ b/delta.go @@ -51,6 +51,32 @@ type Delta struct { Start, End Position } +func (d Delta) Equal(other Delta) bool { + if d.Start != other.Start { + return false + } + + if d.End != other.End { + return false + } + + if d.Action != other.Action { + return false + } + + if len(d.Lines) != len(other.Lines) { + return false + } + + for i := range d.Lines { + if d.Lines[i] != other.Lines[i] { + return false + } + } + + return true +} + func Remove(row, col uint, str string) Delta { lines := strings.Split(str, "\n") nlines := len(lines) - 1 @@ -73,65 +99,71 @@ func Insert(row, col uint, str string) Delta { } } -func (d *Document) applyInsert(dl Delta) error { +func (d *Document) applyInsert(dl Delta) { row := dl.Start.Row col := dl.Start.Column - line := d.Lines[row] + line := d.lines[row] - if dl.NLines() == 1 { - d.Lines[row] = line[:col] + dl.Line(0) + line[col:] + if dl.nLines() == 1 { + d.lines[row] = line[:col] + dl.line(0) + line[col:] } else { newlines := []string{} if row != 0 { - newlines = append(newlines, d.Lines[:row-1]...) // old content + newlines = append(newlines, d.lines[:row-1]...) // old content } - newlines = append(newlines, d.Lines[row][:col]+dl.Lines[0]) + newlines = append(newlines, d.lines[row][:col]+dl.Lines[0]) newlines = append(newlines, dl.Lines[1:]...) // new content - newlines[len(newlines)-1] += d.Lines[row][col:] - if uint(len(d.Lines)) > row { - newlines = append(newlines, d.Lines[row+1:]...) // old content + newlines[len(newlines)-1] += d.lines[row][col:] + if uint(len(d.lines)) > row { + newlines = append(newlines, d.lines[row+1:]...) // old content } - d.Lines = newlines + d.lines = newlines } - return nil } -func (d *Document) applyRemove(dl Delta) error { +func (d *Document) applyRemove(dl Delta) { row := dl.Start.Row col := dl.Start.Column - line := d.Lines[row] + line := d.lines[row] endCol := dl.End.Column endRow := dl.End.Row if row == endRow { - d.Lines[row] = line[:col] + line[endCol:] + d.lines[row] = line[:col] + line[endCol:] } else { - d.Lines[endRow] = d.Lines[row][:col] + d.Lines[endRow][endCol:] + d.lines[endRow] = d.lines[row][:col] + d.lines[endRow][endCol:] newlines := []string{} - newlines = append(newlines, d.Lines[:row]...) - newlines = append(newlines, d.Lines[endRow:]...) - d.Lines = newlines + newlines = append(newlines, d.lines[:row]...) + newlines = append(newlines, d.lines[endRow:]...) + d.lines = newlines } - - return nil } -func (d *Document) Apply(dl Delta) error { +func (d *Document) Apply(dls ...Delta) error { d.mtx.Lock() defer d.mtx.Unlock() - err := d.validate(dl) - if err != nil { - return fmt.Errorf("delta does not apply to document: %v", err) + for _, dl := range dls { + err := d.validate(dl) + if err != nil { + return fmt.Errorf("delta does not apply to document: %v", err) + } } - switch dl.Action { - case DeltaInsert: - return d.applyInsert(dl) - case DeltaRemove: - return d.applyRemove(dl) + for _, dl := range dls { + switch dl.Action { + case DeltaInsert: + d.applyInsert(dl) + case DeltaRemove: + d.applyRemove(dl) + } + + d.deltas = append(d.deltas, dl) + for _, c := range d.clients { + c.ch <- dl + } } return nil @@ -146,14 +178,14 @@ func (d *Document) validate(dl Delta) error { return fmt.Errorf("end is not in document for remove") } - if dl.NLines() != dl.NRows() { - return fmt.Errorf("delta has %d lines, but positions show range of %d lines", dl.NLines(), dl.NRows()) + if dl.nLines() != dl.nRows() { + return fmt.Errorf("delta has %d lines, but positions show range of %d lines", dl.nLines(), dl.nRows()) } - lastDlLine := dl.Line(dl.NRows() - 1) + lastDlLine := dl.line(dl.nRows() - 1) - if uint(len(lastDlLine)) != dl.Cols() { - return fmt.Errorf("delta has %d chars on the final line, but positions show range of %d chars", len(lastDlLine), dl.Cols()) + if uint(len(lastDlLine)) != dl.cols() { + return fmt.Errorf("delta has %d chars on the final line, but positions show range of %d chars", len(lastDlLine), dl.cols()) } return nil