You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
acedoc/client.go

221 lines
3.2 KiB

package acedoc
import (
"encoding/json"
"os"
"path"
"sync"
"time"
)
func (d *Document) Client(h DeltaHandler) *Client {
c := &Client{
ch: make(chan Delta, 10),
errs: make(chan error, 10),
doc: d,
handler: h,
}
go func() {
for {
dls, ok := c.pullDeltas()
if len(dls) > 0 && c.handler != nil {
go func() {
err := c.handler.Handle(dls)
if err != nil {
select {
case c.errs <- err:
default:
c.Close()
return
}
}
}()
}
if !ok {
return
}
}
}()
d.mtx.Lock()
defer d.mtx.Unlock()
d.clients = append(d.clients, c)
return c
}
func (d *Document) Close() {
if d.logger != nil {
d.logger.Close()
}
}
func (d *Document) removeClient(c *Client) {
d.mtx.Lock()
defer d.mtx.Unlock()
for i, ci := range d.clients {
if ci == c {
newclients := append([]*Client{}, d.clients[:i]...)
if len(d.clients) > i {
newclients = append(newclients, d.clients[i+1:]...)
}
d.clients = newclients
return
}
}
}
type Client struct {
ch chan Delta
errs chan error
closing sync.Mutex
closed bool
doc *Document
maxDeltaBucket int
pending []Delta
handler DeltaHandler
}
// Close closes the client
func (c *Client) Close() {
c.closing.Lock()
defer c.closing.Unlock()
if !c.closed {
c.doc.removeClient(c)
close(c.ch)
close(c.errs)
c.closed = true
}
}
// Error returns any pending errors from pushing deltas to the client.
func (c *Client) Error() error {
var err error
select {
case err = <-c.errs:
return err
default:
}
return nil
}
// PushDeltas pushes deltas into the document.
func (c *Client) PushDeltas(d ...Delta) error {
ds := make([]Delta, len(d))
for i := range d {
ds[i] = d[i]
ds[i].source = c
}
return c.doc.Apply(ds...)
}
func (c *Client) pullDeltas() ([]Delta, bool) {
maxSize := c.maxDeltaBucket
if maxSize == 0 {
maxSize = 50
}
var timer <-chan time.Time
const timeout = 200 * time.Microsecond
var ds []Delta
for {
var d Delta
var ok bool
select {
case d, ok = <-c.ch:
if !ok {
return ds, false
}
case <-timer:
return ds, true
}
if d.Equal(Delta{}) {
continue
}
if d.source == c {
continue
}
timer = time.After(timeout)
ds = append(ds, d)
if len(ds) == maxSize {
return ds, true
}
}
}
func (d *Document) LogToFile(filename string) error {
if d.logger != nil {
return nil
}
var f *os.File
var err error
var mtx sync.Mutex
var closeTimer *time.Timer
err = os.MkdirAll(path.Dir(filename), 0755)
if err != nil {
return err
}
getFile := func() {
if f != nil {
return
}
f, err = os.OpenFile(filename, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0644)
closeTimer.Reset(1 * time.Minute)
}
closeFile := func() {
mtx.Lock()
defer mtx.Unlock()
if f != nil {
err = f.Close()
f = nil
}
}
closeTimer = time.AfterFunc(1*time.Minute, closeFile)
getFile()
if err != nil {
return err
}
d.logger = d.Client(DeltaHandlerFunc(func(dls []Delta) error {
mtx.Lock()
defer mtx.Unlock()
getFile()
if err != nil {
return err
}
enc := json.NewEncoder(f)
for _, dl := range dls {
err = enc.Encode(dl)
if err != nil {
return err
}
}
return nil
}))
return nil
}