package acedoc import ( "encoding/json" "os" "path" "sync" "time" ) func (d *Document) Client(h DeltaHandler) *Client { c := &Client{ ch: make(chan Delta, 10), errs: make(chan error, 10), doc: d, handler: h, } go func() { for { dls, ok := c.pullDeltas() if len(dls) > 0 && c.handler != nil { go func() { err := c.handler.Handle(dls) if err != nil { select { case c.errs <- err: default: c.Close() return } } }() } if !ok { return } } }() d.mtx.Lock() defer d.mtx.Unlock() d.clients = append(d.clients, c) return c } func (d *Document) Close() { if d.logger != nil { d.logger.Close() } } func (d *Document) removeClient(c *Client) { d.mtx.Lock() defer d.mtx.Unlock() for i, ci := range d.clients { if ci == c { newclients := append([]*Client{}, d.clients[:i]...) if len(d.clients) > i { newclients = append(newclients, d.clients[i+1:]...) } d.clients = newclients return } } } type Client struct { ch chan Delta errs chan error closing sync.Mutex closed bool doc *Document maxDeltaBucket int pending []Delta handler DeltaHandler } // Close closes the client func (c *Client) Close() { c.closing.Lock() defer c.closing.Unlock() if !c.closed { c.doc.removeClient(c) close(c.ch) close(c.errs) c.closed = true } } // Error returns any pending errors from pushing deltas to the client. func (c *Client) Error() error { var err error select { case err = <-c.errs: return err default: } return nil } // PushDeltas pushes deltas into the document. func (c *Client) PushDeltas(d ...Delta) error { ds := make([]Delta, len(d)) for i := range d { ds[i] = d[i] ds[i].source = c } return c.doc.Apply(ds...) } func (c *Client) pullDeltas() ([]Delta, bool) { maxSize := c.maxDeltaBucket if maxSize == 0 { maxSize = 50 } var timer <-chan time.Time const timeout = 200 * time.Microsecond var ds []Delta for { var d Delta var ok bool select { case d, ok = <-c.ch: if !ok { return ds, false } case <-timer: return ds, true } if d.Equal(Delta{}) { continue } if d.source == c { continue } timer = time.After(timeout) ds = append(ds, d) if len(ds) == maxSize { return ds, true } } } func (d *Document) LogToFile(filename string) error { if d.logger != nil { return nil } var f *os.File var err error var mtx sync.Mutex var closeTimer *time.Timer err = os.MkdirAll(path.Dir(filename), 0755) if err != nil { return err } getFile := func() { if f != nil { return } f, err = os.OpenFile(filename, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0644) closeTimer.Reset(1 * time.Minute) } closeFile := func() { mtx.Lock() defer mtx.Unlock() if f != nil { err = f.Close() f = nil } } closeTimer = time.AfterFunc(1*time.Minute, closeFile) getFile() if err != nil { return err } d.logger = d.Client(DeltaHandlerFunc(func(dls []Delta) error { mtx.Lock() defer mtx.Unlock() getFile() if err != nil { return err } enc := json.NewEncoder(f) for _, dl := range dls { err = enc.Encode(dl) if err != nil { return err } } return nil })) return nil }