The chatops bot of aventer
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

clients.go 10KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399
  1. package clients
  2. import (
  3. "database/sql"
  4. "fmt"
  5. "net/http"
  6. "strings"
  7. "sync"
  8. "time"
  9. "../api"
  10. "../database"
  11. "../matrix"
  12. "../metrics"
  13. nlp "../services/nlp"
  14. "../types"
  15. "git.aventer.biz/AVENTER/gomatrix"
  16. shellwords "github.com/mattn/go-shellwords"
  17. log "github.com/sirupsen/logrus"
  18. )
  19. // A Clients is a collection of clients used for bot services.
  20. type Clients struct {
  21. db database.Storer
  22. httpClient *http.Client
  23. dbMutex sync.Mutex
  24. mapMutex sync.Mutex
  25. clients map[string]clientEntry
  26. }
  27. // New makes a new collection of matrix clients
  28. func New(db database.Storer, cli *http.Client) *Clients {
  29. clients := &Clients{
  30. db: db,
  31. httpClient: cli,
  32. clients: make(map[string]clientEntry), // user_id => clientEntry
  33. }
  34. return clients
  35. }
  36. // Client gets a client for the userID
  37. func (c *Clients) Client(userID string) (*gomatrix.Client, error) {
  38. entry := c.getClient(userID)
  39. if entry.client != nil {
  40. return entry.client, nil
  41. }
  42. entry, err := c.loadClientFromDB(userID)
  43. return entry.client, err
  44. }
  45. // Update updates the config for a matrix client
  46. func (c *Clients) Update(config api.ClientConfig) (api.ClientConfig, error) {
  47. _, old, err := c.updateClientInDB(config)
  48. return old.config, err
  49. }
  50. // Start listening on client /sync streams
  51. func (c *Clients) Start() error {
  52. configs, err := c.db.LoadMatrixClientConfigs()
  53. if err != nil {
  54. return err
  55. }
  56. for _, cfg := range configs {
  57. if cfg.Sync {
  58. if _, err := c.Client(cfg.UserID); err != nil {
  59. return err
  60. }
  61. }
  62. }
  63. return nil
  64. }
  65. type clientEntry struct {
  66. config api.ClientConfig
  67. client *gomatrix.Client
  68. }
  69. func (c *Clients) getClient(userID string) clientEntry {
  70. c.mapMutex.Lock()
  71. defer c.mapMutex.Unlock()
  72. return c.clients[userID]
  73. }
  74. func (c *Clients) setClient(client clientEntry) {
  75. c.mapMutex.Lock()
  76. defer c.mapMutex.Unlock()
  77. c.clients[client.config.UserID] = client
  78. }
  79. func (c *Clients) loadClientFromDB(userID string) (entry clientEntry, err error) {
  80. c.dbMutex.Lock()
  81. defer c.dbMutex.Unlock()
  82. entry = c.getClient(userID)
  83. if entry.client != nil {
  84. return
  85. }
  86. if entry.config, err = c.db.LoadMatrixClientConfig(userID); err != nil {
  87. if err == sql.ErrNoRows {
  88. err = fmt.Errorf("client with user ID %s does not exist", userID)
  89. }
  90. return
  91. }
  92. if entry.client, err = c.newClient(entry.config); err != nil {
  93. return
  94. }
  95. c.setClient(entry)
  96. return
  97. }
  98. func (c *Clients) updateClientInDB(newConfig api.ClientConfig) (new clientEntry, old clientEntry, err error) {
  99. c.dbMutex.Lock()
  100. defer c.dbMutex.Unlock()
  101. old = c.getClient(newConfig.UserID)
  102. if old.client != nil && old.config == newConfig {
  103. // Already have a client with that config.
  104. new = old
  105. return
  106. }
  107. new.config = newConfig
  108. if new.client, err = c.newClient(new.config); err != nil {
  109. return
  110. }
  111. // set the new display name if they differ
  112. if old.config.DisplayName != new.config.DisplayName {
  113. if err := new.client.SetDisplayName(new.config.DisplayName); err != nil {
  114. // whine about it but don't stop: this isn't fatal.
  115. log.WithFields(log.Fields{
  116. log.ErrorKey: err,
  117. "displayname": new.config.DisplayName,
  118. "user_id": new.config.UserID,
  119. }).Error("Failed to set display name")
  120. }
  121. }
  122. if old.config, err = c.db.StoreMatrixClientConfig(new.config); err != nil {
  123. new.client.StopSync()
  124. return
  125. }
  126. if old.client != nil {
  127. old.client.StopSync()
  128. return
  129. }
  130. c.setClient(new)
  131. return
  132. }
  133. func (c *Clients) onMessageEvent(client *gomatrix.Client, event *gomatrix.Event) {
  134. services, err := c.db.LoadServicesForUser(client.UserID)
  135. if err != nil {
  136. log.WithFields(log.Fields{
  137. log.ErrorKey: err,
  138. "room_id": event.RoomID,
  139. "service_user_id": client.UserID,
  140. }).Warn("Error loading services")
  141. }
  142. body, ok := event.Body()
  143. if !ok || body == "" {
  144. return
  145. }
  146. // filter m.notice to prevent loops
  147. if msgtype, ok := event.MessageType(); !ok || msgtype == "m.notice" {
  148. return
  149. }
  150. // replace all smart quotes with their normal counterparts so shellwords can parse it
  151. body = strings.Replace(body, `‘`, `'`, -1)
  152. body = strings.Replace(body, `’`, `'`, -1)
  153. body = strings.Replace(body, `“`, `"`, -1)
  154. body = strings.Replace(body, `”`, `"`, -1)
  155. var responses []interface{}
  156. // Ignore everymessage, if its from the bot byself
  157. if event.Sender != client.UserID {
  158. // send every message to the natual language processor
  159. response := nlp.CmdForwardToNLP(event.RoomID, client.UserID, body)
  160. if response != nil {
  161. responses = append(responses, response)
  162. }
  163. for _, service := range services {
  164. if body[0] == '!' { // message is a command
  165. args, err := shellwords.Parse(body[1:])
  166. if err != nil {
  167. args = strings.Split(body[1:], " ")
  168. }
  169. if response := runCommandForService(service.Commands(client), event, args); response != nil {
  170. responses = append(responses, response)
  171. }
  172. } else { // message isn't a command, it might need expanding
  173. expansions := runExpansionsForService(service.Expansions(client), event, body)
  174. responses = append(responses, expansions...)
  175. }
  176. }
  177. for _, content := range responses {
  178. if _, err := client.SendMessageEvent(event.RoomID, "m.room.message", content); err != nil {
  179. log.WithFields(log.Fields{
  180. log.ErrorKey: err,
  181. "room_id": event.RoomID,
  182. "user_id": event.Sender,
  183. "content": content,
  184. }).Print("Failed to send command response")
  185. }
  186. }
  187. }
  188. }
  189. // runCommandForService runs a single command read from a matrix event. Runs
  190. // the matching command with the longest path. Returns the JSON encodable
  191. // content of a single matrix message event to use as a response or nil if no
  192. // response is appropriate.
  193. func runCommandForService(cmds []types.Command, event *gomatrix.Event, arguments []string) interface{} {
  194. var bestMatch *types.Command
  195. for i, command := range cmds {
  196. matches := command.Matches(arguments)
  197. betterMatch := bestMatch == nil || len(bestMatch.Path) < len(command.Path)
  198. if matches && betterMatch {
  199. bestMatch = &cmds[i]
  200. }
  201. }
  202. if bestMatch == nil {
  203. return nil
  204. }
  205. cmdArgs := arguments[len(bestMatch.Path):]
  206. log.WithFields(log.Fields{
  207. "room_id": event.RoomID,
  208. "user_id": event.Sender,
  209. "command": bestMatch.Path,
  210. }).Info("Executing command")
  211. content, err := bestMatch.Command(event.RoomID, event.Sender, cmdArgs)
  212. if err != nil {
  213. if content != nil {
  214. log.WithFields(log.Fields{
  215. log.ErrorKey: err,
  216. "room_id": event.RoomID,
  217. "user_id": event.Sender,
  218. "command": bestMatch.Path,
  219. "args": cmdArgs,
  220. }).Warn("Command returned both error and content.")
  221. }
  222. metrics.IncrementCommand(bestMatch.Path[0], metrics.StatusFailure)
  223. content = gomatrix.TextMessage{"m.notice", err.Error()}
  224. } else {
  225. metrics.IncrementCommand(bestMatch.Path[0], metrics.StatusSuccess)
  226. }
  227. return content
  228. }
  229. // run the expansions for a matrix event.
  230. func runExpansionsForService(expans []types.Expansion, event *gomatrix.Event, body string) []interface{} {
  231. var responses []interface{}
  232. for _, expansion := range expans {
  233. matches := map[string]bool{}
  234. for _, matchingGroups := range expansion.Regexp.FindAllStringSubmatch(body, -1) {
  235. matchingText := matchingGroups[0] // first element is always the complete match
  236. if matches[matchingText] {
  237. // Only expand the first occurrence of a matching string
  238. continue
  239. }
  240. matches[matchingText] = true
  241. if response := expansion.Expand(event.RoomID, event.Sender, matchingGroups); response != nil {
  242. responses = append(responses, response)
  243. }
  244. }
  245. }
  246. return responses
  247. }
  248. func (c *Clients) onBotOptionsEvent(client *gomatrix.Client, event *gomatrix.Event) {
  249. // see if these options are for us. The state key is the user ID with a leading _
  250. // to get around restrictions in the HS about having user IDs as state keys.
  251. targetUserID := strings.TrimPrefix(*event.StateKey, "_")
  252. if targetUserID != client.UserID {
  253. return
  254. }
  255. // these options fully clobber what was there previously.
  256. opts := types.BotOptions{
  257. UserID: client.UserID,
  258. RoomID: event.RoomID,
  259. SetByUserID: event.Sender,
  260. Options: event.Content,
  261. }
  262. if _, err := c.db.StoreBotOptions(opts); err != nil {
  263. log.WithFields(log.Fields{
  264. log.ErrorKey: err,
  265. "room_id": event.RoomID,
  266. "bot_user_id": client.UserID,
  267. "set_by_user_id": event.Sender,
  268. }).Error("Failed to persist bot options")
  269. }
  270. }
  271. func (c *Clients) onRoomMemberEvent(client *gomatrix.Client, event *gomatrix.Event) {
  272. if *event.StateKey != client.UserID {
  273. return // not our member event
  274. }
  275. m := event.Content["membership"]
  276. membership, ok := m.(string)
  277. if !ok {
  278. return
  279. }
  280. if membership == "invite" {
  281. logger := log.WithFields(log.Fields{
  282. "room_id": event.RoomID,
  283. "service_user_id": client.UserID,
  284. "inviter": event.Sender,
  285. })
  286. logger.Print("Accepting invite from user")
  287. content := struct {
  288. Inviter string `json:"inviter"`
  289. }{event.Sender}
  290. if _, err := client.JoinRoom(event.RoomID, "", content); err != nil {
  291. logger.WithError(err).Print("Failed to join room")
  292. } else {
  293. logger.Print("Joined room")
  294. }
  295. }
  296. }
  297. func (c *Clients) newClient(config api.ClientConfig) (*gomatrix.Client, error) {
  298. client, err := gomatrix.NewClient(config.HomeserverURL, config.UserID, config.AccessToken)
  299. if err != nil {
  300. return nil, err
  301. }
  302. client.Client = c.httpClient
  303. syncer := client.Syncer.(*gomatrix.DefaultSyncer)
  304. nebStore := &matrix.NEBStore{
  305. InMemoryStore: *gomatrix.NewInMemoryStore(),
  306. Database: c.db,
  307. ClientConfig: config,
  308. }
  309. client.Store = nebStore
  310. syncer.Store = nebStore
  311. // TODO: Check that the access token is valid for the userID by peforming
  312. // a request against the server.
  313. syncer.OnEventType("m.room.message", func(event *gomatrix.Event) {
  314. c.onMessageEvent(client, event)
  315. })
  316. syncer.OnEventType("m.room.bot.options", func(event *gomatrix.Event) {
  317. c.onBotOptionsEvent(client, event)
  318. })
  319. if config.AutoJoinRooms {
  320. syncer.OnEventType("m.room.member", func(event *gomatrix.Event) {
  321. c.onRoomMemberEvent(client, event)
  322. })
  323. }
  324. log.WithFields(log.Fields{
  325. "user_id": config.UserID,
  326. "sync": config.Sync,
  327. "auto_join_rooms": config.AutoJoinRooms,
  328. "since": nebStore.LoadNextBatch(config.UserID),
  329. }).Info("Created new client")
  330. if config.Sync {
  331. go func() {
  332. for {
  333. if e := client.Sync(); e != nil {
  334. log.WithFields(log.Fields{
  335. log.ErrorKey: e,
  336. "user_id": config.UserID,
  337. }).Error("Fatal Sync() error")
  338. time.Sleep(10 * time.Second)
  339. } else {
  340. log.WithField("user_id", config.UserID).Info("Stopping Sync()")
  341. return
  342. }
  343. }
  344. }()
  345. }
  346. return client, nil
  347. }