parent
eafc496037
commit
b66e36f0e3
@ -0,0 +1,134 @@ |
||||
package acedoc |
||||
|
||||
import ( |
||||
"sync" |
||||
"time" |
||||
) |
||||
|
||||
func (d *Document) Client(h DeltaHandler) *Client { |
||||
c := &Client{ |
||||
ch: make(chan Delta, 10), |
||||
errs: make(chan error, 10), |
||||
doc: d, |
||||
handler: h, |
||||
} |
||||
|
||||
go func() { |
||||
for { |
||||
dls := c.pullDeltas() |
||||
if dls != nil && c.handler != nil { |
||||
go func() { |
||||
err := c.handler.Handle(dls) |
||||
if err != nil { |
||||
select { |
||||
case c.errs <- err: |
||||
default: |
||||
c.Close() |
||||
return |
||||
} |
||||
} |
||||
}() |
||||
} |
||||
} |
||||
}() |
||||
|
||||
d.mtx.Lock() |
||||
defer d.mtx.Unlock() |
||||
|
||||
d.clients = append(d.clients, c) |
||||
|
||||
return c |
||||
} |
||||
|
||||
func (d *Document) removeClient(c *Client) { |
||||
d.mtx.Lock() |
||||
defer d.mtx.Unlock() |
||||
|
||||
for i, ci := range d.clients { |
||||
if ci == c { |
||||
newclients := append([]*Client{}, d.clients[:i]...) |
||||
if len(d.clients) > i { |
||||
newclients = append(newclients, d.clients[i+1:]...) |
||||
} |
||||
d.clients = newclients |
||||
return |
||||
} |
||||
} |
||||
} |
||||
|
||||
type Client struct { |
||||
ch chan Delta |
||||
errs chan error |
||||
|
||||
closing sync.Mutex |
||||
closed bool |
||||
|
||||
doc *Document |
||||
maxDeltaBucket int |
||||
pending []Delta |
||||
handler DeltaHandler |
||||
} |
||||
|
||||
// Close closes the client
|
||||
func (c *Client) Close() { |
||||
c.closing.Lock() |
||||
defer c.closing.Unlock() |
||||
|
||||
if !c.closed { |
||||
c.doc.removeClient(c) |
||||
close(c.ch) |
||||
close(c.errs) |
||||
c.closed = true |
||||
} |
||||
} |
||||
|
||||
// Error returns any pending errors from pushing deltas to the client.
|
||||
func (c *Client) Error() error { |
||||
var err error |
||||
select { |
||||
case err = <-c.errs: |
||||
return err |
||||
default: |
||||
} |
||||
|
||||
return nil |
||||
} |
||||
|
||||
// PushDeltas pushes deltas into the document.
|
||||
func (c *Client) PushDeltas(d ...Delta) error { |
||||
return c.doc.Apply(d...) |
||||
} |
||||
|
||||
func (c *Client) pullDeltas() []Delta { |
||||
maxSize := c.maxDeltaBucket |
||||
if maxSize == 0 { |
||||
maxSize = 50 |
||||
} |
||||
|
||||
timeout := 10 * time.Microsecond |
||||
|
||||
var ds []Delta |
||||
for { |
||||
var d Delta |
||||
var ok bool |
||||
|
||||
select { |
||||
case d, ok = <-c.ch: |
||||
case <-time.After(timeout): |
||||
return ds |
||||
} |
||||
|
||||
if !ok { |
||||
return ds |
||||
} |
||||
|
||||
if d.Equal(Delta{}) { |
||||
continue |
||||
} |
||||
|
||||
ds = append(ds, d) |
||||
if len(ds) == maxSize { |
||||
return ds |
||||
} |
||||
} |
||||
} |
@ -0,0 +1,46 @@ |
||||
package acedoc |
||||
|
||||
import ( |
||||
"testing" |
||||
"time" |
||||
) |
||||
|
||||
func TestClient(t *testing.T) { |
||||
doc := NewString("") |
||||
|
||||
aliceCh := make(chan []Delta, 5) |
||||
alice := doc.Client(DeltaHandlerFunc(func(ds []Delta) error { |
||||
aliceCh <- ds |
||||
return nil |
||||
})) |
||||
defer alice.Close() |
||||
|
||||
bobCh := make(chan []Delta, 5) |
||||
bob := doc.Client(DeltaHandlerFunc(func(ds []Delta) error { |
||||
bobCh <- ds |
||||
return nil |
||||
})) |
||||
defer bob.Close() |
||||
|
||||
delta := Insert(0, 0, "alphabet") |
||||
|
||||
err := alice.PushDeltas(delta) |
||||
if err != nil { |
||||
t.Error("got error pushing delta:", err) |
||||
} |
||||
|
||||
select { |
||||
case got := <-bobCh: |
||||
if len(got) != 1 { |
||||
t.Errorf("got %d deltas, expected %d", len(got), 1) |
||||
} else if !got[0].Equal(delta) { |
||||
t.Error("didn't get the correct delta, saw:", got[0], "expected:", delta) |
||||
} |
||||
case <-time.After(10 * time.Millisecond): |
||||
t.Error("didn't get any delta") |
||||
} |
||||
|
||||
if got := doc.Contents(); got != "alphabet" { |
||||
t.Errorf("saw doc %q, expected %q", got, "alphabet") |
||||
} |
||||
} |
Loading…
Reference in new issue