| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501 |
- // Copyright (c) 2022 Tulir Asokan
- //
- // This Source Code Form is subject to the terms of the Mozilla Public
- // License, v. 2.0. If a copy of the MPL was not distributed with this
- // file, You can obtain one at http://mozilla.org/MPL/2.0/.
- package whatsmeow
- import (
- "context"
- "encoding/hex"
- "errors"
- "fmt"
- "time"
- "github.com/rs/zerolog"
- "git.bobomao.top/joey/testwh/appstate"
- waBinary "git.bobomao.top/joey/testwh/binary"
- "git.bobomao.top/joey/testwh/proto/waE2E"
- "git.bobomao.top/joey/testwh/proto/waServerSync"
- "git.bobomao.top/joey/testwh/store"
- "git.bobomao.top/joey/testwh/types"
- "git.bobomao.top/joey/testwh/types/events"
- )
- // FetchAppState fetches updates to the given type of app state. If fullSync is true, the current
- // cached state will be removed and all app state patches will be re-fetched from the server.
- func (cli *Client) FetchAppState(ctx context.Context, name appstate.WAPatchName, fullSync, onlyIfNotSynced bool) error {
- eventsToDispatch, err := cli.fetchAppState(ctx, name, fullSync, onlyIfNotSynced)
- if err != nil {
- return err
- }
- for _, evt := range eventsToDispatch {
- cli.dispatchEvent(evt)
- }
- return nil
- }
- func (cli *Client) fetchAppState(ctx context.Context, name appstate.WAPatchName, fullSync, onlyIfNotSynced bool) ([]any, error) {
- if cli == nil {
- return nil, ErrClientIsNil
- }
- cli.appStateSyncLock.Lock()
- defer cli.appStateSyncLock.Unlock()
- if fullSync {
- err := cli.Store.AppState.DeleteAppStateVersion(ctx, string(name))
- if err != nil {
- return nil, fmt.Errorf("failed to reset app state %s version: %w", name, err)
- }
- }
- version, hash, err := cli.Store.AppState.GetAppStateVersion(ctx, string(name))
- if err != nil {
- return nil, fmt.Errorf("failed to get app state %s version: %w", name, err)
- }
- if version == 0 {
- fullSync = true
- } else if onlyIfNotSynced {
- return nil, nil
- }
- state := appstate.HashState{Version: version, Hash: hash}
- hasMore := true
- wantSnapshot := fullSync
- var eventsToDispatch []any
- eventsToDispatchPtr := &eventsToDispatch
- if fullSync && !cli.EmitAppStateEventsOnFullSync {
- eventsToDispatchPtr = nil
- }
- for hasMore {
- patches, err := cli.fetchAppStatePatches(ctx, name, state.Version, wantSnapshot)
- wantSnapshot = false
- if err != nil {
- return nil, fmt.Errorf("failed to fetch app state %s patches: %w", name, err)
- }
- hasMore = patches.HasMorePatches
- state, err = cli.applyAppStatePatches(ctx, name, state, patches, fullSync, eventsToDispatchPtr)
- if err != nil {
- return nil, err
- }
- }
- if fullSync {
- cli.Log.Debugf("Full sync of app state %s completed. Current version: %d", name, state.Version)
- eventsToDispatch = append(eventsToDispatch, &events.AppStateSyncComplete{Name: name})
- } else {
- cli.Log.Debugf("Synced app state %s from version %d to %d", name, version, state.Version)
- }
- return eventsToDispatch, nil
- }
- func (cli *Client) applyAppStatePatches(
- ctx context.Context,
- name appstate.WAPatchName,
- state appstate.HashState,
- patches *appstate.PatchList,
- fullSync bool,
- eventsToDispatch *[]any,
- ) (appstate.HashState, error) {
- mutations, newState, err := cli.appStateProc.DecodePatches(ctx, patches, state, true)
- if err != nil {
- if errors.Is(err, appstate.ErrKeyNotFound) {
- go cli.requestMissingAppStateKeys(context.WithoutCancel(ctx), patches)
- }
- return state, fmt.Errorf("failed to decode app state %s patches: %w", name, err)
- }
- wasFullSync := state.Version == 0 && patches.Snapshot != nil
- state = newState
- if name == appstate.WAPatchCriticalUnblockLow && wasFullSync && !cli.EmitAppStateEventsOnFullSync {
- var contacts []store.ContactEntry
- mutations, contacts = cli.filterContacts(mutations)
- cli.Log.Debugf("Mass inserting app state snapshot with %d contacts into the store", len(contacts))
- err = cli.Store.Contacts.PutAllContactNames(ctx, contacts)
- if err != nil {
- // This is a fairly serious failure, so just abort the whole thing
- return state, fmt.Errorf("failed to update contact store with data from snapshot: %v", err)
- }
- }
- for _, mutation := range mutations {
- if eventsToDispatch != nil && mutation.Operation == waServerSync.SyncdMutation_SET {
- *eventsToDispatch = append(*eventsToDispatch, &events.AppState{Index: mutation.Index, SyncActionValue: mutation.Action})
- }
- evt := cli.dispatchAppState(ctx, mutation, fullSync)
- if eventsToDispatch != nil && evt != nil {
- *eventsToDispatch = append(*eventsToDispatch, evt)
- }
- }
- return state, nil
- }
- func (cli *Client) filterContacts(mutations []appstate.Mutation) ([]appstate.Mutation, []store.ContactEntry) {
- filteredMutations := mutations[:0]
- contacts := make([]store.ContactEntry, 0, len(mutations))
- for _, mutation := range mutations {
- if mutation.Index[0] == "contact" && len(mutation.Index) > 1 {
- jid, _ := types.ParseJID(mutation.Index[1])
- act := mutation.Action.GetContactAction()
- contacts = append(contacts, store.ContactEntry{
- JID: jid,
- FirstName: act.GetFirstName(),
- FullName: act.GetFullName(),
- })
- } else {
- filteredMutations = append(filteredMutations, mutation)
- }
- }
- return filteredMutations, contacts
- }
- func (cli *Client) dispatchAppState(ctx context.Context, mutation appstate.Mutation, fullSync bool) (eventToDispatch any) {
- zerolog.Ctx(ctx).Trace().Any("mutation", mutation).Msg("Dispatching app state mutation")
- if mutation.Operation != waServerSync.SyncdMutation_SET {
- return
- }
- var jid types.JID
- if len(mutation.Index) > 1 {
- jid, _ = types.ParseJID(mutation.Index[1])
- }
- ts := time.UnixMilli(mutation.Action.GetTimestamp())
- var storeUpdateError error
- switch mutation.Index[0] {
- case appstate.IndexMute:
- act := mutation.Action.GetMuteAction()
- eventToDispatch = &events.Mute{JID: jid, Timestamp: ts, Action: act, FromFullSync: fullSync}
- var mutedUntil time.Time
- if act.GetMuted() {
- if act.GetMuteEndTimestamp() < 0 {
- mutedUntil = store.MutedForever
- } else {
- mutedUntil = time.UnixMilli(act.GetMuteEndTimestamp())
- }
- }
- if cli.Store.ChatSettings != nil {
- storeUpdateError = cli.Store.ChatSettings.PutMutedUntil(ctx, jid, mutedUntil)
- }
- case appstate.IndexPin:
- act := mutation.Action.GetPinAction()
- eventToDispatch = &events.Pin{JID: jid, Timestamp: ts, Action: act, FromFullSync: fullSync}
- if cli.Store.ChatSettings != nil {
- storeUpdateError = cli.Store.ChatSettings.PutPinned(ctx, jid, act.GetPinned())
- }
- case appstate.IndexArchive:
- act := mutation.Action.GetArchiveChatAction()
- eventToDispatch = &events.Archive{JID: jid, Timestamp: ts, Action: act, FromFullSync: fullSync}
- if cli.Store.ChatSettings != nil {
- storeUpdateError = cli.Store.ChatSettings.PutArchived(ctx, jid, act.GetArchived())
- }
- case appstate.IndexContact:
- act := mutation.Action.GetContactAction()
- eventToDispatch = &events.Contact{JID: jid, Timestamp: ts, Action: act, FromFullSync: fullSync}
- if cli.Store.Contacts != nil {
- storeUpdateError = cli.Store.Contacts.PutContactName(ctx, jid, act.GetFirstName(), act.GetFullName())
- }
- case appstate.IndexClearChat:
- act := mutation.Action.GetClearChatAction()
- eventToDispatch = &events.ClearChat{JID: jid, Timestamp: ts, Action: act, FromFullSync: fullSync}
- case appstate.IndexDeleteChat:
- act := mutation.Action.GetDeleteChatAction()
- eventToDispatch = &events.DeleteChat{JID: jid, Timestamp: ts, Action: act, FromFullSync: fullSync}
- case appstate.IndexStar:
- if len(mutation.Index) < 5 {
- return
- }
- evt := events.Star{
- ChatJID: jid,
- MessageID: mutation.Index[2],
- Timestamp: ts,
- Action: mutation.Action.GetStarAction(),
- IsFromMe: mutation.Index[3] == "1",
- FromFullSync: fullSync,
- }
- if mutation.Index[4] != "0" {
- evt.SenderJID, _ = types.ParseJID(mutation.Index[4])
- }
- eventToDispatch = &evt
- case appstate.IndexDeleteMessageForMe:
- if len(mutation.Index) < 5 {
- return
- }
- evt := events.DeleteForMe{
- ChatJID: jid,
- MessageID: mutation.Index[2],
- Timestamp: ts,
- Action: mutation.Action.GetDeleteMessageForMeAction(),
- IsFromMe: mutation.Index[3] == "1",
- FromFullSync: fullSync,
- }
- if mutation.Index[4] != "0" {
- evt.SenderJID, _ = types.ParseJID(mutation.Index[4])
- }
- eventToDispatch = &evt
- case appstate.IndexMarkChatAsRead:
- eventToDispatch = &events.MarkChatAsRead{
- JID: jid,
- Timestamp: ts,
- Action: mutation.Action.GetMarkChatAsReadAction(),
- FromFullSync: fullSync,
- }
- case appstate.IndexSettingPushName:
- eventToDispatch = &events.PushNameSetting{
- Timestamp: ts,
- Action: mutation.Action.GetPushNameSetting(),
- FromFullSync: fullSync,
- }
- cli.Store.PushName = mutation.Action.GetPushNameSetting().GetName()
- err := cli.Store.Save(ctx)
- if err != nil {
- cli.Log.Errorf("Failed to save device store after updating push name: %v", err)
- }
- case appstate.IndexSettingUnarchiveChats:
- eventToDispatch = &events.UnarchiveChatsSetting{
- Timestamp: ts,
- Action: mutation.Action.GetUnarchiveChatsSetting(),
- FromFullSync: fullSync,
- }
- case appstate.IndexUserStatusMute:
- eventToDispatch = &events.UserStatusMute{
- JID: jid,
- Timestamp: ts,
- Action: mutation.Action.GetUserStatusMuteAction(),
- FromFullSync: fullSync,
- }
- case appstate.IndexLabelEdit:
- act := mutation.Action.GetLabelEditAction()
- eventToDispatch = &events.LabelEdit{
- Timestamp: ts,
- LabelID: mutation.Index[1],
- Action: act,
- FromFullSync: fullSync,
- }
- case appstate.IndexLabelAssociationChat:
- if len(mutation.Index) < 3 {
- return
- }
- jid, _ = types.ParseJID(mutation.Index[2])
- act := mutation.Action.GetLabelAssociationAction()
- eventToDispatch = &events.LabelAssociationChat{
- JID: jid,
- Timestamp: ts,
- LabelID: mutation.Index[1],
- Action: act,
- FromFullSync: fullSync,
- }
- case appstate.IndexLabelAssociationMessage:
- if len(mutation.Index) < 6 {
- return
- }
- jid, _ = types.ParseJID(mutation.Index[2])
- act := mutation.Action.GetLabelAssociationAction()
- eventToDispatch = &events.LabelAssociationMessage{
- JID: jid,
- Timestamp: ts,
- LabelID: mutation.Index[1],
- MessageID: mutation.Index[3],
- Action: act,
- FromFullSync: fullSync,
- }
- }
- if storeUpdateError != nil {
- cli.Log.Errorf("Failed to update device store after app state mutation: %v", storeUpdateError)
- }
- return
- }
- func (cli *Client) downloadExternalAppStateBlob(ctx context.Context, ref *waServerSync.ExternalBlobReference) ([]byte, error) {
- return cli.Download(ctx, ref)
- }
- func (cli *Client) fetchAppStatePatches(ctx context.Context, name appstate.WAPatchName, fromVersion uint64, snapshot bool) (*appstate.PatchList, error) {
- attrs := waBinary.Attrs{
- "name": string(name),
- "return_snapshot": snapshot,
- }
- if !snapshot {
- attrs["version"] = fromVersion
- }
- resp, err := cli.sendIQ(ctx, infoQuery{
- Namespace: "w:sync:app:state",
- Type: "set",
- To: types.ServerJID,
- Content: []waBinary.Node{{
- Tag: "sync",
- Content: []waBinary.Node{{
- Tag: "collection",
- Attrs: attrs,
- }},
- }},
- })
- if err != nil {
- return nil, err
- }
- collection, ok := resp.GetOptionalChildByTag("sync", "collection")
- if !ok {
- return nil, &ElementMissingError{Tag: "collection", In: "app state patch response"}
- }
- return appstate.ParsePatchList(ctx, &collection, cli.downloadExternalAppStateBlob)
- }
- func (cli *Client) requestMissingAppStateKeys(ctx context.Context, patches *appstate.PatchList) {
- cli.appStateKeyRequestsLock.Lock()
- rawKeyIDs := cli.appStateProc.GetMissingKeyIDs(ctx, patches)
- filteredKeyIDs := make([][]byte, 0, len(rawKeyIDs))
- now := time.Now()
- for _, keyID := range rawKeyIDs {
- stringKeyID := hex.EncodeToString(keyID)
- lastRequestTime := cli.appStateKeyRequests[stringKeyID]
- if lastRequestTime.IsZero() || lastRequestTime.Add(24*time.Hour).Before(now) {
- cli.appStateKeyRequests[stringKeyID] = now
- filteredKeyIDs = append(filteredKeyIDs, keyID)
- }
- }
- cli.appStateKeyRequestsLock.Unlock()
- cli.requestAppStateKeys(ctx, filteredKeyIDs)
- }
- func (cli *Client) requestAppStateKeys(ctx context.Context, rawKeyIDs [][]byte) {
- keyIDs := make([]*waE2E.AppStateSyncKeyId, len(rawKeyIDs))
- debugKeyIDs := make([]string, len(rawKeyIDs))
- for i, keyID := range rawKeyIDs {
- keyIDs[i] = &waE2E.AppStateSyncKeyId{KeyID: keyID}
- debugKeyIDs[i] = hex.EncodeToString(keyID)
- }
- msg := &waE2E.Message{
- ProtocolMessage: &waE2E.ProtocolMessage{
- Type: waE2E.ProtocolMessage_APP_STATE_SYNC_KEY_REQUEST.Enum(),
- AppStateSyncKeyRequest: &waE2E.AppStateSyncKeyRequest{
- KeyIDs: keyIDs,
- },
- },
- }
- ownID := cli.getOwnID().ToNonAD()
- if ownID.IsEmpty() || len(debugKeyIDs) == 0 {
- return
- }
- cli.Log.Infof("Sending key request for app state keys %+v", debugKeyIDs)
- _, err := cli.SendMessage(ctx, ownID, msg, SendRequestExtra{Peer: true})
- if err != nil {
- cli.Log.Warnf("Failed to send app state key request: %v", err)
- }
- }
- // SendAppState sends the given app state patch, then triggers a background resync of that app state type
- // to update local caches and send events for the updates.
- //
- // You can use the Build methods in the appstate package to build the parameter for this method, e.g.
- //
- // cli.SendAppState(ctx, appstate.BuildMute(targetJID, true, 24 * time.Hour))
- func (cli *Client) SendAppState(ctx context.Context, patch appstate.PatchInfo) error {
- return cli.sendAppState(ctx, patch, true)
- }
- func (cli *Client) sendAppState(ctx context.Context, patch appstate.PatchInfo, allowRetry bool) error {
- if cli == nil {
- return ErrClientIsNil
- }
- version, hash, err := cli.Store.AppState.GetAppStateVersion(ctx, string(patch.Type))
- if err != nil {
- return err
- }
- // TODO create new key instead of reusing the primary client's keys
- latestKeyID, err := cli.Store.AppStateKeys.GetLatestAppStateSyncKeyID(ctx)
- if err != nil {
- return fmt.Errorf("failed to get latest app state key ID: %w", err)
- } else if latestKeyID == nil {
- return fmt.Errorf("no app state keys found, creating app state keys is not yet supported")
- }
- state := appstate.HashState{Version: version, Hash: hash}
- encodedPatch, err := cli.appStateProc.EncodePatch(ctx, latestKeyID, state, patch)
- if err != nil {
- return err
- }
- resp, err := cli.sendIQ(ctx, infoQuery{
- Namespace: "w:sync:app:state",
- Type: iqSet,
- To: types.ServerJID,
- Content: []waBinary.Node{{
- Tag: "sync",
- Content: []waBinary.Node{{
- Tag: "collection",
- Attrs: waBinary.Attrs{
- "name": string(patch.Type),
- "version": version,
- "return_snapshot": false,
- },
- Content: []waBinary.Node{{
- Tag: "patch",
- Content: encodedPatch,
- }},
- }},
- }},
- })
- if err != nil {
- return err
- }
- respCollection, ok := resp.GetOptionalChildByTag("sync", "collection")
- if !ok {
- return &ElementMissingError{Tag: "collection", In: "app state send response"}
- }
- respCollectionAttr := respCollection.AttrGetter()
- if respCollectionAttr.OptionalString("type") == "error" {
- errorTag, ok := respCollection.GetOptionalChildByTag("error")
- mainErr := fmt.Errorf("%w: %s", ErrAppStateUpdate, respCollection.XMLString())
- if ok {
- mainErr = fmt.Errorf("%w (%s): %s", ErrAppStateUpdate, patch.Type, errorTag.XMLString())
- }
- if ok && errorTag.AttrGetter().Int("code") == 409 && allowRetry {
- zerolog.Ctx(ctx).Warn().Err(mainErr).Msg("Failed to update app state, trying to apply conflicts and retry")
- var eventsToDispatch []any
- patches, err := appstate.ParsePatchList(ctx, &respCollection, cli.downloadExternalAppStateBlob)
- if err != nil {
- return fmt.Errorf("%w (also, parsing patches in the response failed: %w)", mainErr, err)
- } else if state, err = cli.applyAppStatePatches(ctx, patch.Type, state, patches, false, &eventsToDispatch); err != nil {
- return fmt.Errorf("%w (also, applying patches in the response failed: %w)", mainErr, err)
- } else {
- zerolog.Ctx(ctx).Debug().Msg("Retrying app state send after applying conflicting patches")
- go func() {
- for _, evt := range eventsToDispatch {
- cli.dispatchEvent(evt)
- }
- }()
- return cli.sendAppState(ctx, patch, false)
- }
- }
- return mainErr
- }
- eventsToDispatch, err := cli.fetchAppState(ctx, patch.Type, false, false)
- if err != nil {
- return fmt.Errorf("failed to fetch app state after sending update: %w", err)
- }
- go func() {
- for _, evt := range eventsToDispatch {
- cli.dispatchEvent(evt)
- }
- }()
- return nil
- }
- func (cli *Client) MarkNotDirty(ctx context.Context, cleanType string, ts time.Time) error {
- _, err := cli.sendIQ(ctx, infoQuery{
- Namespace: "urn:xmpp:whatsapp:dirty",
- Type: iqSet,
- To: types.ServerJID,
- Content: []waBinary.Node{{
- Tag: "clean",
- Attrs: waBinary.Attrs{
- "type": cleanType,
- "timestamp": ts.Unix(),
- },
- }},
- })
- return err
- }
|