No Description
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

deltas.go 4.5KB


  1. package caddyhugo
  2. import (
  3. "fmt"
  4. "log"
  5. "net/http"
  6. "time"
  7. "git.stephensearles.com/stephen/acedoc"
  8. "github.com/gorilla/websocket"
  9. )
  10. // DeltaConn identifies the methods used in the original websocket implementation.
  11. type DeltaConn interface {
  12. ReadJSON(v interface{}) error
  13. WriteJSON(v interface{}) error
  14. }
  15. func (ch *CaddyHugo) ObserveLTime(ltime uint64) uint64 {
  16. ch.mtx.Lock()
  17. if ch.ltime < ltime {
  18. ch.ltime = ltime
  19. }
  20. ch.mtx.Unlock()
  21. return ch.LTime()
  22. }
  23. func (ch *CaddyHugo) LTime() uint64 {
  24. ch.mtx.Lock()
  25. defer ch.mtx.Unlock()
  26. ch.ltime++
  27. return ch.ltime
  28. }
  29. func (ch *CaddyHugo) ShouldApply(ltime uint64) bool {
  30. ch.mtx.Lock()
  31. defer ch.mtx.Unlock()
  32. if _, ok := ch.confirmingToClient[ltime]; ok {
  33. return false
  34. }
  35. return true
  36. }
  37. // ConfirmLTime marks an ltime as something that should be confirmed with clients
  38. func (ch *CaddyHugo) ConfirmLTime(ltime uint64) {
  39. ch.mtx.Lock()
  40. defer ch.mtx.Unlock()
  41. ch.confirmingToClient[ltime] = struct{}{}
  42. }
  43. // Confirming returns the current list of LTimes that need to be confirmed
  44. // to clients
  45. func (ch *CaddyHugo) Confirming() []uint64 {
  46. var times []uint64
  47. ch.mtx.Lock()
  48. defer ch.mtx.Unlock()
  49. for ltime := range ch.confirmingToClient {
  50. times = append(times, ltime)
  51. }
  52. return times
  53. }
  54. // LowestPendingConfirmation identifies the lowest LTime that needs to be
  55. // confirmed to clients
  56. func (ch *CaddyHugo) LowestPendingConfirmation() uint64 {
  57. var lowest uint64
  58. for _, c := range ch.Confirming() {
  59. if lowest == 0 || c < lowest {
  60. lowest = c
  61. }
  62. }
  63. return lowest
  64. }
  65. func (ch *CaddyHugo) ClearConfirmed(lowestPending uint64) {
  66. ch.mtx.Lock()
  67. defer ch.mtx.Unlock()
  68. for ltime := range ch.confirmingToClient {
  69. if ltime < lowestPending {
  70. delete(ch.confirmingToClient, ltime)
  71. }
  72. }
  73. }
  74. func (ch *CaddyHugo) DeltaWebsocket(w http.ResponseWriter, r *http.Request) (int, error) {
  75. var upgrader = websocket.Upgrader{
  76. ReadBufferSize: 1024,
  77. WriteBufferSize: 1024,
  78. }
  79. conn, err := upgrader.Upgrade(w, r, nil)
  80. if err != nil {
  81. fmt.Println(err)
  82. return http.StatusBadRequest, err
  83. }
  84. conn.SetReadDeadline(time.Time{})
  85. doc, err := ch.editSession(docNameFromEditRequest(r))
  86. if err != nil {
  87. fmt.Println(err)
  88. return http.StatusBadRequest, err
  89. }
  90. return ch.handleDeltaConn(conn, doc)
  91. }
  92. func (ch *CaddyHugo) Message(deltas ...acedoc.Delta) Message {
  93. return Message{
  94. Deltas: deltas,
  95. LTime: ch.LTime(),
  96. Confirmed: ch.Confirming(),
  97. LowestPending: ch.LowestPendingConfirmation(),
  98. }
  99. }
  100. func (ch *CaddyHugo) handleDeltaConn(conn DeltaConn, doc *editSession) (int, error) {
  101. const idlePing = 15 * time.Second
  102. const idlePingShort = 1 * time.Millisecond
  103. errCh := make(chan error)
  104. doneCh := make(chan struct{})
  105. defer func() {
  106. close(doneCh)
  107. close(errCh)
  108. for err := range errCh {
  109. log.Println(err)
  110. }
  111. }()
  112. timer := time.NewTimer(idlePing)
  113. resetTimer := func(d time.Duration) {
  114. if !timer.Stop() {
  115. select {
  116. case <-timer.C:
  117. default:
  118. }
  119. }
  120. timer.Reset(d)
  121. }
  122. wroteMessagesCh := make(chan Message, 2)
  123. client := doc.doc.Client(acedoc.DeltaHandlerFunc(func(ds []acedoc.Delta) error {
  124. m := ch.Message(ds...)
  125. wroteMessagesCh <- m
  126. return conn.WriteJSON(m)
  127. }))
  128. ch.mtx.Lock()
  129. doc.clients++
  130. ch.mtx.Unlock()
  131. defer func() {
  132. ch.mtx.Lock()
  133. client.Close()
  134. doc.clients--
  135. ch.mtx.Unlock()
  136. }()
  137. readMessagesCh := make(chan Message, 2)
  138. go func() {
  139. for {
  140. var message Message
  141. err := conn.ReadJSON(&message)
  142. if err != nil {
  143. errCh <- fmt.Errorf("error reading message from client conn: %v", err)
  144. return
  145. }
  146. if message.LTime != 0 {
  147. ch.ObserveLTime(message.LTime)
  148. }
  149. if len(message.Deltas) == 0 {
  150. continue
  151. }
  152. if !ch.ShouldApply(message.LTime) {
  153. continue
  154. }
  155. err = client.PushDeltas(message.Deltas...)
  156. if err != nil {
  157. errCh <- fmt.Errorf("error pushing deltas into document: %v", err)
  158. return
  159. }
  160. ch.ConfirmLTime(message.LTime)
  161. ch.ClearConfirmed(message.LowestPending)
  162. select {
  163. case readMessagesCh <- message:
  164. case <-doneCh:
  165. return
  166. }
  167. }
  168. }()
  169. for {
  170. select {
  171. case err := <-errCh:
  172. fmt.Println("error handling websocket connection:", err)
  173. return 500, err
  174. default:
  175. }
  176. select {
  177. case <-timer.C:
  178. conn.WriteJSON(ch.Message())
  179. resetTimer(idlePing)
  180. case <-readMessagesCh:
  181. resetTimer(idlePingShort)
  182. case <-wroteMessagesCh:
  183. resetTimer(idlePing)
  184. case <-doneCh:
  185. return 200, nil
  186. }
  187. }
  188. }
  189. type Message struct {
  190. Deltas []acedoc.Delta `json:"deltas"`
  191. LTime uint64 `json:"ltime"`
  192. Confirmed []uint64 `json:"confirmed"`
  193. LowestPending uint64 `json:"lowestPending"`
  194. }