appstate.go 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501
  1. // Copyright (c) 2022 Tulir Asokan
  2. //
  3. // This Source Code Form is subject to the terms of the Mozilla Public
  4. // License, v. 2.0. If a copy of the MPL was not distributed with this
  5. // file, You can obtain one at http://mozilla.org/MPL/2.0/.
  6. package whatsmeow
  7. import (
  8. "context"
  9. "encoding/hex"
  10. "errors"
  11. "fmt"
  12. "time"
  13. "github.com/rs/zerolog"
  14. "go.mau.fi/whatsmeow/appstate"
  15. waBinary "go.mau.fi/whatsmeow/binary"
  16. "go.mau.fi/whatsmeow/proto/waE2E"
  17. "go.mau.fi/whatsmeow/proto/waServerSync"
  18. "go.mau.fi/whatsmeow/store"
  19. "go.mau.fi/whatsmeow/types"
  20. "go.mau.fi/whatsmeow/types/events"
  21. )
  22. // FetchAppState fetches updates to the given type of app state. If fullSync is true, the current
  23. // cached state will be removed and all app state patches will be re-fetched from the server.
  24. func (cli *Client) FetchAppState(ctx context.Context, name appstate.WAPatchName, fullSync, onlyIfNotSynced bool) error {
  25. eventsToDispatch, err := cli.fetchAppState(ctx, name, fullSync, onlyIfNotSynced)
  26. if err != nil {
  27. return err
  28. }
  29. for _, evt := range eventsToDispatch {
  30. cli.dispatchEvent(evt)
  31. }
  32. return nil
  33. }
  34. func (cli *Client) fetchAppState(ctx context.Context, name appstate.WAPatchName, fullSync, onlyIfNotSynced bool) ([]any, error) {
  35. if cli == nil {
  36. return nil, ErrClientIsNil
  37. }
  38. cli.appStateSyncLock.Lock()
  39. defer cli.appStateSyncLock.Unlock()
  40. if fullSync {
  41. err := cli.Store.AppState.DeleteAppStateVersion(ctx, string(name))
  42. if err != nil {
  43. return nil, fmt.Errorf("failed to reset app state %s version: %w", name, err)
  44. }
  45. }
  46. version, hash, err := cli.Store.AppState.GetAppStateVersion(ctx, string(name))
  47. if err != nil {
  48. return nil, fmt.Errorf("failed to get app state %s version: %w", name, err)
  49. }
  50. if version == 0 {
  51. fullSync = true
  52. } else if onlyIfNotSynced {
  53. return nil, nil
  54. }
  55. state := appstate.HashState{Version: version, Hash: hash}
  56. hasMore := true
  57. wantSnapshot := fullSync
  58. var eventsToDispatch []any
  59. eventsToDispatchPtr := &eventsToDispatch
  60. if fullSync && !cli.EmitAppStateEventsOnFullSync {
  61. eventsToDispatchPtr = nil
  62. }
  63. for hasMore {
  64. patches, err := cli.fetchAppStatePatches(ctx, name, state.Version, wantSnapshot)
  65. wantSnapshot = false
  66. if err != nil {
  67. return nil, fmt.Errorf("failed to fetch app state %s patches: %w", name, err)
  68. }
  69. hasMore = patches.HasMorePatches
  70. state, err = cli.applyAppStatePatches(ctx, name, state, patches, fullSync, eventsToDispatchPtr)
  71. if err != nil {
  72. return nil, err
  73. }
  74. }
  75. if fullSync {
  76. cli.Log.Debugf("Full sync of app state %s completed. Current version: %d", name, state.Version)
  77. eventsToDispatch = append(eventsToDispatch, &events.AppStateSyncComplete{Name: name})
  78. } else {
  79. cli.Log.Debugf("Synced app state %s from version %d to %d", name, version, state.Version)
  80. }
  81. return eventsToDispatch, nil
  82. }
  83. func (cli *Client) applyAppStatePatches(
  84. ctx context.Context,
  85. name appstate.WAPatchName,
  86. state appstate.HashState,
  87. patches *appstate.PatchList,
  88. fullSync bool,
  89. eventsToDispatch *[]any,
  90. ) (appstate.HashState, error) {
  91. mutations, newState, err := cli.appStateProc.DecodePatches(ctx, patches, state, true)
  92. if err != nil {
  93. if errors.Is(err, appstate.ErrKeyNotFound) {
  94. go cli.requestMissingAppStateKeys(context.WithoutCancel(ctx), patches)
  95. }
  96. return state, fmt.Errorf("failed to decode app state %s patches: %w", name, err)
  97. }
  98. wasFullSync := state.Version == 0 && patches.Snapshot != nil
  99. state = newState
  100. if name == appstate.WAPatchCriticalUnblockLow && wasFullSync && !cli.EmitAppStateEventsOnFullSync {
  101. var contacts []store.ContactEntry
  102. mutations, contacts = cli.filterContacts(mutations)
  103. cli.Log.Debugf("Mass inserting app state snapshot with %d contacts into the store", len(contacts))
  104. err = cli.Store.Contacts.PutAllContactNames(ctx, contacts)
  105. if err != nil {
  106. // This is a fairly serious failure, so just abort the whole thing
  107. return state, fmt.Errorf("failed to update contact store with data from snapshot: %v", err)
  108. }
  109. }
  110. for _, mutation := range mutations {
  111. if eventsToDispatch != nil && mutation.Operation == waServerSync.SyncdMutation_SET {
  112. *eventsToDispatch = append(*eventsToDispatch, &events.AppState{Index: mutation.Index, SyncActionValue: mutation.Action})
  113. }
  114. evt := cli.dispatchAppState(ctx, mutation, fullSync)
  115. if eventsToDispatch != nil && evt != nil {
  116. *eventsToDispatch = append(*eventsToDispatch, evt)
  117. }
  118. }
  119. return state, nil
  120. }
  121. func (cli *Client) filterContacts(mutations []appstate.Mutation) ([]appstate.Mutation, []store.ContactEntry) {
  122. filteredMutations := mutations[:0]
  123. contacts := make([]store.ContactEntry, 0, len(mutations))
  124. for _, mutation := range mutations {
  125. if mutation.Index[0] == "contact" && len(mutation.Index) > 1 {
  126. jid, _ := types.ParseJID(mutation.Index[1])
  127. act := mutation.Action.GetContactAction()
  128. contacts = append(contacts, store.ContactEntry{
  129. JID: jid,
  130. FirstName: act.GetFirstName(),
  131. FullName: act.GetFullName(),
  132. })
  133. } else {
  134. filteredMutations = append(filteredMutations, mutation)
  135. }
  136. }
  137. return filteredMutations, contacts
  138. }
  139. func (cli *Client) dispatchAppState(ctx context.Context, mutation appstate.Mutation, fullSync bool) (eventToDispatch any) {
  140. zerolog.Ctx(ctx).Trace().Any("mutation", mutation).Msg("Dispatching app state mutation")
  141. if mutation.Operation != waServerSync.SyncdMutation_SET {
  142. return
  143. }
  144. var jid types.JID
  145. if len(mutation.Index) > 1 {
  146. jid, _ = types.ParseJID(mutation.Index[1])
  147. }
  148. ts := time.UnixMilli(mutation.Action.GetTimestamp())
  149. var storeUpdateError error
  150. switch mutation.Index[0] {
  151. case appstate.IndexMute:
  152. act := mutation.Action.GetMuteAction()
  153. eventToDispatch = &events.Mute{JID: jid, Timestamp: ts, Action: act, FromFullSync: fullSync}
  154. var mutedUntil time.Time
  155. if act.GetMuted() {
  156. if act.GetMuteEndTimestamp() < 0 {
  157. mutedUntil = store.MutedForever
  158. } else {
  159. mutedUntil = time.UnixMilli(act.GetMuteEndTimestamp())
  160. }
  161. }
  162. if cli.Store.ChatSettings != nil {
  163. storeUpdateError = cli.Store.ChatSettings.PutMutedUntil(ctx, jid, mutedUntil)
  164. }
  165. case appstate.IndexPin:
  166. act := mutation.Action.GetPinAction()
  167. eventToDispatch = &events.Pin{JID: jid, Timestamp: ts, Action: act, FromFullSync: fullSync}
  168. if cli.Store.ChatSettings != nil {
  169. storeUpdateError = cli.Store.ChatSettings.PutPinned(ctx, jid, act.GetPinned())
  170. }
  171. case appstate.IndexArchive:
  172. act := mutation.Action.GetArchiveChatAction()
  173. eventToDispatch = &events.Archive{JID: jid, Timestamp: ts, Action: act, FromFullSync: fullSync}
  174. if cli.Store.ChatSettings != nil {
  175. storeUpdateError = cli.Store.ChatSettings.PutArchived(ctx, jid, act.GetArchived())
  176. }
  177. case appstate.IndexContact:
  178. act := mutation.Action.GetContactAction()
  179. eventToDispatch = &events.Contact{JID: jid, Timestamp: ts, Action: act, FromFullSync: fullSync}
  180. if cli.Store.Contacts != nil {
  181. storeUpdateError = cli.Store.Contacts.PutContactName(ctx, jid, act.GetFirstName(), act.GetFullName())
  182. }
  183. case appstate.IndexClearChat:
  184. act := mutation.Action.GetClearChatAction()
  185. eventToDispatch = &events.ClearChat{JID: jid, Timestamp: ts, Action: act, FromFullSync: fullSync}
  186. case appstate.IndexDeleteChat:
  187. act := mutation.Action.GetDeleteChatAction()
  188. eventToDispatch = &events.DeleteChat{JID: jid, Timestamp: ts, Action: act, FromFullSync: fullSync}
  189. case appstate.IndexStar:
  190. if len(mutation.Index) < 5 {
  191. return
  192. }
  193. evt := events.Star{
  194. ChatJID: jid,
  195. MessageID: mutation.Index[2],
  196. Timestamp: ts,
  197. Action: mutation.Action.GetStarAction(),
  198. IsFromMe: mutation.Index[3] == "1",
  199. FromFullSync: fullSync,
  200. }
  201. if mutation.Index[4] != "0" {
  202. evt.SenderJID, _ = types.ParseJID(mutation.Index[4])
  203. }
  204. eventToDispatch = &evt
  205. case appstate.IndexDeleteMessageForMe:
  206. if len(mutation.Index) < 5 {
  207. return
  208. }
  209. evt := events.DeleteForMe{
  210. ChatJID: jid,
  211. MessageID: mutation.Index[2],
  212. Timestamp: ts,
  213. Action: mutation.Action.GetDeleteMessageForMeAction(),
  214. IsFromMe: mutation.Index[3] == "1",
  215. FromFullSync: fullSync,
  216. }
  217. if mutation.Index[4] != "0" {
  218. evt.SenderJID, _ = types.ParseJID(mutation.Index[4])
  219. }
  220. eventToDispatch = &evt
  221. case appstate.IndexMarkChatAsRead:
  222. eventToDispatch = &events.MarkChatAsRead{
  223. JID: jid,
  224. Timestamp: ts,
  225. Action: mutation.Action.GetMarkChatAsReadAction(),
  226. FromFullSync: fullSync,
  227. }
  228. case appstate.IndexSettingPushName:
  229. eventToDispatch = &events.PushNameSetting{
  230. Timestamp: ts,
  231. Action: mutation.Action.GetPushNameSetting(),
  232. FromFullSync: fullSync,
  233. }
  234. cli.Store.PushName = mutation.Action.GetPushNameSetting().GetName()
  235. err := cli.Store.Save(ctx)
  236. if err != nil {
  237. cli.Log.Errorf("Failed to save device store after updating push name: %v", err)
  238. }
  239. case appstate.IndexSettingUnarchiveChats:
  240. eventToDispatch = &events.UnarchiveChatsSetting{
  241. Timestamp: ts,
  242. Action: mutation.Action.GetUnarchiveChatsSetting(),
  243. FromFullSync: fullSync,
  244. }
  245. case appstate.IndexUserStatusMute:
  246. eventToDispatch = &events.UserStatusMute{
  247. JID: jid,
  248. Timestamp: ts,
  249. Action: mutation.Action.GetUserStatusMuteAction(),
  250. FromFullSync: fullSync,
  251. }
  252. case appstate.IndexLabelEdit:
  253. act := mutation.Action.GetLabelEditAction()
  254. eventToDispatch = &events.LabelEdit{
  255. Timestamp: ts,
  256. LabelID: mutation.Index[1],
  257. Action: act,
  258. FromFullSync: fullSync,
  259. }
  260. case appstate.IndexLabelAssociationChat:
  261. if len(mutation.Index) < 3 {
  262. return
  263. }
  264. jid, _ = types.ParseJID(mutation.Index[2])
  265. act := mutation.Action.GetLabelAssociationAction()
  266. eventToDispatch = &events.LabelAssociationChat{
  267. JID: jid,
  268. Timestamp: ts,
  269. LabelID: mutation.Index[1],
  270. Action: act,
  271. FromFullSync: fullSync,
  272. }
  273. case appstate.IndexLabelAssociationMessage:
  274. if len(mutation.Index) < 6 {
  275. return
  276. }
  277. jid, _ = types.ParseJID(mutation.Index[2])
  278. act := mutation.Action.GetLabelAssociationAction()
  279. eventToDispatch = &events.LabelAssociationMessage{
  280. JID: jid,
  281. Timestamp: ts,
  282. LabelID: mutation.Index[1],
  283. MessageID: mutation.Index[3],
  284. Action: act,
  285. FromFullSync: fullSync,
  286. }
  287. }
  288. if storeUpdateError != nil {
  289. cli.Log.Errorf("Failed to update device store after app state mutation: %v", storeUpdateError)
  290. }
  291. return
  292. }
  293. func (cli *Client) downloadExternalAppStateBlob(ctx context.Context, ref *waServerSync.ExternalBlobReference) ([]byte, error) {
  294. return cli.Download(ctx, ref)
  295. }
  296. func (cli *Client) fetchAppStatePatches(ctx context.Context, name appstate.WAPatchName, fromVersion uint64, snapshot bool) (*appstate.PatchList, error) {
  297. attrs := waBinary.Attrs{
  298. "name": string(name),
  299. "return_snapshot": snapshot,
  300. }
  301. if !snapshot {
  302. attrs["version"] = fromVersion
  303. }
  304. resp, err := cli.sendIQ(ctx, infoQuery{
  305. Namespace: "w:sync:app:state",
  306. Type: "set",
  307. To: types.ServerJID,
  308. Content: []waBinary.Node{{
  309. Tag: "sync",
  310. Content: []waBinary.Node{{
  311. Tag: "collection",
  312. Attrs: attrs,
  313. }},
  314. }},
  315. })
  316. if err != nil {
  317. return nil, err
  318. }
  319. collection, ok := resp.GetOptionalChildByTag("sync", "collection")
  320. if !ok {
  321. return nil, &ElementMissingError{Tag: "collection", In: "app state patch response"}
  322. }
  323. return appstate.ParsePatchList(ctx, &collection, cli.downloadExternalAppStateBlob)
  324. }
  325. func (cli *Client) requestMissingAppStateKeys(ctx context.Context, patches *appstate.PatchList) {
  326. cli.appStateKeyRequestsLock.Lock()
  327. rawKeyIDs := cli.appStateProc.GetMissingKeyIDs(ctx, patches)
  328. filteredKeyIDs := make([][]byte, 0, len(rawKeyIDs))
  329. now := time.Now()
  330. for _, keyID := range rawKeyIDs {
  331. stringKeyID := hex.EncodeToString(keyID)
  332. lastRequestTime := cli.appStateKeyRequests[stringKeyID]
  333. if lastRequestTime.IsZero() || lastRequestTime.Add(24*time.Hour).Before(now) {
  334. cli.appStateKeyRequests[stringKeyID] = now
  335. filteredKeyIDs = append(filteredKeyIDs, keyID)
  336. }
  337. }
  338. cli.appStateKeyRequestsLock.Unlock()
  339. cli.requestAppStateKeys(ctx, filteredKeyIDs)
  340. }
  341. func (cli *Client) requestAppStateKeys(ctx context.Context, rawKeyIDs [][]byte) {
  342. keyIDs := make([]*waE2E.AppStateSyncKeyId, len(rawKeyIDs))
  343. debugKeyIDs := make([]string, len(rawKeyIDs))
  344. for i, keyID := range rawKeyIDs {
  345. keyIDs[i] = &waE2E.AppStateSyncKeyId{KeyID: keyID}
  346. debugKeyIDs[i] = hex.EncodeToString(keyID)
  347. }
  348. msg := &waE2E.Message{
  349. ProtocolMessage: &waE2E.ProtocolMessage{
  350. Type: waE2E.ProtocolMessage_APP_STATE_SYNC_KEY_REQUEST.Enum(),
  351. AppStateSyncKeyRequest: &waE2E.AppStateSyncKeyRequest{
  352. KeyIDs: keyIDs,
  353. },
  354. },
  355. }
  356. ownID := cli.getOwnID().ToNonAD()
  357. if ownID.IsEmpty() || len(debugKeyIDs) == 0 {
  358. return
  359. }
  360. cli.Log.Infof("Sending key request for app state keys %+v", debugKeyIDs)
  361. _, err := cli.SendMessage(ctx, ownID, msg, SendRequestExtra{Peer: true})
  362. if err != nil {
  363. cli.Log.Warnf("Failed to send app state key request: %v", err)
  364. }
  365. }
  366. // SendAppState sends the given app state patch, then triggers a background resync of that app state type
  367. // to update local caches and send events for the updates.
  368. //
  369. // You can use the Build methods in the appstate package to build the parameter for this method, e.g.
  370. //
  371. // cli.SendAppState(ctx, appstate.BuildMute(targetJID, true, 24 * time.Hour))
  372. func (cli *Client) SendAppState(ctx context.Context, patch appstate.PatchInfo) error {
  373. return cli.sendAppState(ctx, patch, true)
  374. }
  375. func (cli *Client) sendAppState(ctx context.Context, patch appstate.PatchInfo, allowRetry bool) error {
  376. if cli == nil {
  377. return ErrClientIsNil
  378. }
  379. version, hash, err := cli.Store.AppState.GetAppStateVersion(ctx, string(patch.Type))
  380. if err != nil {
  381. return err
  382. }
  383. // TODO create new key instead of reusing the primary client's keys
  384. latestKeyID, err := cli.Store.AppStateKeys.GetLatestAppStateSyncKeyID(ctx)
  385. if err != nil {
  386. return fmt.Errorf("failed to get latest app state key ID: %w", err)
  387. } else if latestKeyID == nil {
  388. return fmt.Errorf("no app state keys found, creating app state keys is not yet supported")
  389. }
  390. state := appstate.HashState{Version: version, Hash: hash}
  391. encodedPatch, err := cli.appStateProc.EncodePatch(ctx, latestKeyID, state, patch)
  392. if err != nil {
  393. return err
  394. }
  395. resp, err := cli.sendIQ(ctx, infoQuery{
  396. Namespace: "w:sync:app:state",
  397. Type: iqSet,
  398. To: types.ServerJID,
  399. Content: []waBinary.Node{{
  400. Tag: "sync",
  401. Content: []waBinary.Node{{
  402. Tag: "collection",
  403. Attrs: waBinary.Attrs{
  404. "name": string(patch.Type),
  405. "version": version,
  406. "return_snapshot": false,
  407. },
  408. Content: []waBinary.Node{{
  409. Tag: "patch",
  410. Content: encodedPatch,
  411. }},
  412. }},
  413. }},
  414. })
  415. if err != nil {
  416. return err
  417. }
  418. respCollection, ok := resp.GetOptionalChildByTag("sync", "collection")
  419. if !ok {
  420. return &ElementMissingError{Tag: "collection", In: "app state send response"}
  421. }
  422. respCollectionAttr := respCollection.AttrGetter()
  423. if respCollectionAttr.OptionalString("type") == "error" {
  424. errorTag, ok := respCollection.GetOptionalChildByTag("error")
  425. mainErr := fmt.Errorf("%w: %s", ErrAppStateUpdate, respCollection.XMLString())
  426. if ok {
  427. mainErr = fmt.Errorf("%w (%s): %s", ErrAppStateUpdate, patch.Type, errorTag.XMLString())
  428. }
  429. if ok && errorTag.AttrGetter().Int("code") == 409 && allowRetry {
  430. zerolog.Ctx(ctx).Warn().Err(mainErr).Msg("Failed to update app state, trying to apply conflicts and retry")
  431. var eventsToDispatch []any
  432. patches, err := appstate.ParsePatchList(ctx, &respCollection, cli.downloadExternalAppStateBlob)
  433. if err != nil {
  434. return fmt.Errorf("%w (also, parsing patches in the response failed: %w)", mainErr, err)
  435. } else if state, err = cli.applyAppStatePatches(ctx, patch.Type, state, patches, false, &eventsToDispatch); err != nil {
  436. return fmt.Errorf("%w (also, applying patches in the response failed: %w)", mainErr, err)
  437. } else {
  438. zerolog.Ctx(ctx).Debug().Msg("Retrying app state send after applying conflicting patches")
  439. go func() {
  440. for _, evt := range eventsToDispatch {
  441. cli.dispatchEvent(evt)
  442. }
  443. }()
  444. return cli.sendAppState(ctx, patch, false)
  445. }
  446. }
  447. return mainErr
  448. }
  449. eventsToDispatch, err := cli.fetchAppState(ctx, patch.Type, false, false)
  450. if err != nil {
  451. return fmt.Errorf("failed to fetch app state after sending update: %w", err)
  452. }
  453. go func() {
  454. for _, evt := range eventsToDispatch {
  455. cli.dispatchEvent(evt)
  456. }
  457. }()
  458. return nil
  459. }
  460. func (cli *Client) MarkNotDirty(ctx context.Context, cleanType string, ts time.Time) error {
  461. _, err := cli.sendIQ(ctx, infoQuery{
  462. Namespace: "urn:xmpp:whatsapp:dirty",
  463. Type: iqSet,
  464. To: types.ServerJID,
  465. Content: []waBinary.Node{{
  466. Tag: "clean",
  467. Attrs: waBinary.Attrs{
  468. "type": cleanType,
  469. "timestamp": ts.Unix(),
  470. },
  471. }},
  472. })
  473. return err
  474. }