diff --git a/acedoc.go b/acedoc.go index 3f4811b..546843f 100644 --- a/acedoc.go +++ b/acedoc.go @@ -10,6 +10,8 @@ type Document struct { lines []string deltas []Delta clients []*Client + + logger *Client } type DeltaHandler interface { diff --git a/client.go b/client.go index a8bebb3..6cfc550 100644 --- a/client.go +++ b/client.go @@ -1,6 +1,9 @@ package acedoc import ( + "encoding/json" + "os" + "path" "sync" "time" ) @@ -15,8 +18,8 @@ func (d *Document) Client(h DeltaHandler) *Client { go func() { for { - dls := c.pullDeltas() - if dls != nil && c.handler != nil { + dls, ok := c.pullDeltas() + if len(dls) > 0 && c.handler != nil { go func() { err := c.handler.Handle(dls) if err != nil { @@ -29,6 +32,9 @@ func (d *Document) Client(h DeltaHandler) *Client { } }() } + if !ok { + return + } } }() @@ -40,6 +46,12 @@ func (d *Document) Client(h DeltaHandler) *Client { return c } +func (d *Document) Close() { + if d.logger != nil { + d.logger.Close() + } +} + func (d *Document) removeClient(c *Client) { d.mtx.Lock() defer d.mtx.Unlock() @@ -104,7 +116,7 @@ func (c *Client) PushDeltas(d ...Delta) error { return c.doc.Apply(ds...) } -func (c *Client) pullDeltas() []Delta { +func (c *Client) pullDeltas() ([]Delta, bool) { maxSize := c.maxDeltaBucket if maxSize == 0 { maxSize = 50 @@ -120,26 +132,89 @@ func (c *Client) pullDeltas() []Delta { select { case d, ok = <-c.ch: + if !ok { + return ds, false + } case <-timer: - return ds - } - - timer = time.After(timeout) - - if !ok { - return ds + return ds, true } if d.Equal(Delta{}) { continue } + if d.source == c { continue } + timer = time.After(timeout) + ds = append(ds, d) if len(ds) == maxSize { - return ds + return ds, true } } } + +func (d *Document) LogToFile(filename string) error { + if d.logger != nil { + return nil + } + + var f *os.File + var err error + var mtx sync.Mutex + var closeTimer *time.Timer + + err = os.MkdirAll(path.Dir(filename), 0755) + if err != nil { + return err + } + + getFile := func() { + if f != nil { + return + } + f, err = os.OpenFile(filename, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0644) + closeTimer.Reset(1 * time.Minute) + } + + closeFile := func() { + mtx.Lock() + defer mtx.Unlock() + + if f != nil { + err = f.Close() + f = nil + } + } + + closeTimer = time.AfterFunc(1*time.Minute, closeFile) + + getFile() + if err != nil { + return err + } + + d.logger = d.Client(DeltaHandlerFunc(func(dls []Delta) error { + mtx.Lock() + defer mtx.Unlock() + + getFile() + if err != nil { + return err + } + + enc := json.NewEncoder(f) + for _, dl := range dls { + err = enc.Encode(dl) + if err != nil { + return err + } + } + + return nil + })) + + return nil +} diff --git a/delta.go b/delta.go index b67de9b..0fefe3c 100644 --- a/delta.go +++ b/delta.go @@ -196,9 +196,7 @@ func (d *Document) Apply(dls ...Delta) error { if err != nil { return fmt.Errorf("delta does not apply to document: %v", err) } - } - for _, dl := range dls { switch dl.Action { case DeltaInsert: d.applyInsert(dl)