package acedoc import ( "sync" "time" ) func (d *Document) Client(h DeltaHandler) *Client { c := &Client{ ch: make(chan Delta, 10), errs: make(chan error, 10), doc: d, handler: h, } go func() { for { dls := c.pullDeltas() if dls != nil && c.handler != nil { go func() { err := c.handler.Handle(dls) if err != nil { select { case c.errs <- err: default: c.Close() return } } }() } } }() d.mtx.Lock() defer d.mtx.Unlock() d.clients = append(d.clients, c) return c } func (d *Document) removeClient(c *Client) { d.mtx.Lock() defer d.mtx.Unlock() for i, ci := range d.clients { if ci == c { newclients := append([]*Client{}, d.clients[:i]...) if len(d.clients) > i { newclients = append(newclients, d.clients[i+1:]...) } d.clients = newclients return } } } type Client struct { ch chan Delta errs chan error closing sync.Mutex closed bool doc *Document maxDeltaBucket int pending []Delta handler DeltaHandler } // Close closes the client func (c *Client) Close() { c.closing.Lock() defer c.closing.Unlock() if !c.closed { c.doc.removeClient(c) close(c.ch) close(c.errs) c.closed = true } } // Error returns any pending errors from pushing deltas to the client. func (c *Client) Error() error { var err error select { case err = <-c.errs: return err default: } return nil } // PushDeltas pushes deltas into the document. func (c *Client) PushDeltas(d ...Delta) error { ds := make([]Delta, len(d)) for i := range d { ds[i] = d[i] ds[i].source = c } return c.doc.Apply(ds...) } func (c *Client) pullDeltas() []Delta { maxSize := c.maxDeltaBucket if maxSize == 0 { maxSize = 50 } var timer <-chan time.Time const timeout = 200 * time.Microsecond var ds []Delta for { var d Delta var ok bool select { case d, ok = <-c.ch: case <-timer: return ds } timer = time.After(timeout) if !ok { return ds } if d.Equal(Delta{}) { continue } if d.source == c { continue } ds = append(ds, d) if len(ds) == maxSize { return ds } } }