You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
acedoc/client.go

142 lines
2.1 KiB

package acedoc
import (
"sync"
"time"
)
func (d *Document) Client(h DeltaHandler) *Client {
c := &Client{
ch: make(chan Delta, 10),
errs: make(chan error, 10),
doc: d,
handler: h,
}
go func() {
for {
dls := c.pullDeltas()
if dls != nil && c.handler != nil {
go func() {
err := c.handler.Handle(dls)
if err != nil {
select {
case c.errs <- err:
default:
c.Close()
return
}
}
}()
}
}
}()
d.mtx.Lock()
defer d.mtx.Unlock()
d.clients = append(d.clients, c)
return c
}
func (d *Document) removeClient(c *Client) {
d.mtx.Lock()
defer d.mtx.Unlock()
for i, ci := range d.clients {
if ci == c {
newclients := append([]*Client{}, d.clients[:i]...)
if len(d.clients) > i {
newclients = append(newclients, d.clients[i+1:]...)
}
d.clients = newclients
return
}
}
}
type Client struct {
ch chan Delta
errs chan error
closing sync.Mutex
closed bool
doc *Document
maxDeltaBucket int
pending []Delta
handler DeltaHandler
}
// Close closes the client
func (c *Client) Close() {
c.closing.Lock()
defer c.closing.Unlock()
if !c.closed {
c.doc.removeClient(c)
close(c.ch)
close(c.errs)
c.closed = true
}
}
// Error returns any pending errors from pushing deltas to the client.
func (c *Client) Error() error {
var err error
select {
case err = <-c.errs:
return err
default:
}
return nil
}
// PushDeltas pushes deltas into the document.
func (c *Client) PushDeltas(d ...Delta) error {
ds := make([]Delta, len(d))
for i := range d {
ds[i] = d[i]
ds[i].source = c
}
return c.doc.Apply(ds...)
}
func (c *Client) pullDeltas() []Delta {
maxSize := c.maxDeltaBucket
if maxSize == 0 {
maxSize = 50
}
timeout := 10 * time.Microsecond
var ds []Delta
for {
var d Delta
var ok bool
select {
case d, ok = <-c.ch:
case <-time.After(timeout):
return ds
}
if !ok {
return ds
}
if d.Equal(Delta{}) {
continue
}
if d.source == c {
continue
}
ds = append(ds, d)
if len(ds) == maxSize {
return ds
}
}
}