retry.go 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451
  1. // Copyright (c) 2021 Tulir Asokan
  2. //
  3. // This Source Code Form is subject to the terms of the Mozilla Public
  4. // License, v. 2.0. If a copy of the MPL was not distributed with this
  5. // file, You can obtain one at http://mozilla.org/MPL/2.0/.
  6. package whatsmeow
  7. import (
  8. "context"
  9. "crypto/hmac"
  10. "crypto/sha256"
  11. "encoding/binary"
  12. "fmt"
  13. "time"
  14. "go.mau.fi/libsignal/ecc"
  15. "go.mau.fi/libsignal/groups"
  16. "go.mau.fi/libsignal/keys/prekey"
  17. "go.mau.fi/libsignal/protocol"
  18. "google.golang.org/protobuf/proto"
  19. waBinary "go.mau.fi/whatsmeow/binary"
  20. "go.mau.fi/whatsmeow/proto/waCommon"
  21. "go.mau.fi/whatsmeow/proto/waConsumerApplication"
  22. "go.mau.fi/whatsmeow/proto/waE2E"
  23. "go.mau.fi/whatsmeow/proto/waMsgApplication"
  24. "go.mau.fi/whatsmeow/proto/waMsgTransport"
  25. "go.mau.fi/whatsmeow/types"
  26. "go.mau.fi/whatsmeow/types/events"
  27. )
// recentMessagesSize is the number of sent messages to cache in memory for
// handling retry receipts.
const recentMessagesSize = 256
// recentMessageKey identifies an entry in the recent message cache.
//
// It is used both as the key of Client.recentMessagesMap and as the element
// stored in Client.recentMessagesList. The type is constructed positionally
// elsewhere in this file, so the field order must not change.
type recentMessageKey struct {
	// To is the JID the message was cached under (retry lookups pass receipt.Chat).
	To types.JID
	// ID is the message ID. An empty ID marks an unused slot in recentMessagesList.
	ID types.MessageID
}
// RecentMessage is a sent message kept around so that it can be re-encrypted
// and resent when a retry receipt arrives.
//
// handleRetryReceipt uses wa when it is non-nil and falls back to fb otherwise.
type RecentMessage struct {
	// wa is the message in waE2E form.
	wa *waE2E.Message
	// fb is the message in waMsgApplication form.
	fb *waMsgApplication.MessageApplication
}
  38. func (rm RecentMessage) IsEmpty() bool {
  39. return rm.wa == nil && rm.fb == nil
  40. }
  41. func (cli *Client) addRecentMessage(to types.JID, id types.MessageID, wa *waE2E.Message, fb *waMsgApplication.MessageApplication) {
  42. cli.recentMessagesLock.Lock()
  43. key := recentMessageKey{to, id}
  44. if cli.recentMessagesList[cli.recentMessagesPtr].ID != "" {
  45. delete(cli.recentMessagesMap, cli.recentMessagesList[cli.recentMessagesPtr])
  46. }
  47. cli.recentMessagesMap[key] = RecentMessage{wa: wa, fb: fb}
  48. cli.recentMessagesList[cli.recentMessagesPtr] = key
  49. cli.recentMessagesPtr++
  50. if cli.recentMessagesPtr >= len(cli.recentMessagesList) {
  51. cli.recentMessagesPtr = 0
  52. }
  53. cli.recentMessagesLock.Unlock()
  54. }
  55. func (cli *Client) getRecentMessage(to types.JID, id types.MessageID) RecentMessage {
  56. cli.recentMessagesLock.RLock()
  57. msg, _ := cli.recentMessagesMap[recentMessageKey{to, id}]
  58. cli.recentMessagesLock.RUnlock()
  59. return msg
  60. }
  61. func (cli *Client) getMessageForRetry(ctx context.Context, receipt *events.Receipt, messageID types.MessageID) (RecentMessage, error) {
  62. msg := cli.getRecentMessage(receipt.Chat, messageID)
  63. if msg.IsEmpty() {
  64. waMsg := cli.GetMessageForRetry(receipt.Sender, receipt.Chat, messageID)
  65. if waMsg == nil {
  66. return RecentMessage{}, fmt.Errorf("couldn't find message %s", messageID)
  67. } else {
  68. cli.Log.Debugf("Found message in GetMessageForRetry to accept retry receipt for %s/%s from %s", receipt.Chat, messageID, receipt.Sender)
  69. }
  70. msg = RecentMessage{wa: waMsg}
  71. } else {
  72. cli.Log.Debugf("Found message in local cache to accept retry receipt for %s/%s from %s", receipt.Chat, messageID, receipt.Sender)
  73. }
  74. return msg, nil
  75. }
// recreateSessionTimeout is how long shouldRecreateSession waits after
// recreating a session with a JID before it will recreate it again because of
// the retry count. It does not apply when no session exists at all.
const recreateSessionTimeout = 1 * time.Hour
  77. func (cli *Client) shouldRecreateSession(ctx context.Context, retryCount int, jid types.JID) (reason string, recreate bool) {
  78. cli.sessionRecreateHistoryLock.Lock()
  79. defer cli.sessionRecreateHistoryLock.Unlock()
  80. if contains, err := cli.Store.ContainsSession(ctx, jid.SignalAddress()); err != nil {
  81. return "", false
  82. } else if !contains {
  83. cli.sessionRecreateHistory[jid] = time.Now()
  84. return "we don't have a Signal session with them", true
  85. } else if retryCount < 2 {
  86. return "", false
  87. }
  88. prevTime, ok := cli.sessionRecreateHistory[jid]
  89. if !ok || prevTime.Add(recreateSessionTimeout).Before(time.Now()) {
  90. cli.sessionRecreateHistory[jid] = time.Now()
  91. return "retry count > 1 and over an hour since last recreation", true
  92. }
  93. return "", false
  94. }
// incomingRetryKey is the key of Client.incomingRetryRequestCounter, which
// counts how many retry receipts have been received per requester and message.
//
// The type is constructed positionally in handleRetryReceipt, so the field
// order must not change.
type incomingRetryKey struct {
	// jid is the sender of the retry receipt.
	jid types.JID
	// messageID is the ID of the message that the retry was requested for.
	messageID types.MessageID
}
  99. // handleRetryReceipt handles an incoming retry receipt for an outgoing message.
  100. func (cli *Client) handleRetryReceipt(ctx context.Context, receipt *events.Receipt, node *waBinary.Node) error {
  101. retryChild, ok := node.GetOptionalChildByTag("retry")
  102. if !ok {
  103. return &ElementMissingError{Tag: "retry", In: "retry receipt"}
  104. }
  105. ag := retryChild.AttrGetter()
  106. messageID := ag.String("id")
  107. timestamp := ag.UnixTime("t")
  108. retryCount := ag.Int("count")
  109. if !ag.OK() {
  110. return ag.Error()
  111. }
  112. msg, err := cli.getMessageForRetry(ctx, receipt, messageID)
  113. if err != nil {
  114. return err
  115. }
  116. var fbConsumerMsg *waConsumerApplication.ConsumerApplication
  117. if msg.fb != nil {
  118. subProto, ok := msg.fb.GetPayload().GetSubProtocol().GetSubProtocol().(*waMsgApplication.MessageApplication_SubProtocolPayload_ConsumerMessage)
  119. if ok {
  120. fbConsumerMsg, err = subProto.Decode()
  121. if err != nil {
  122. return fmt.Errorf("failed to decode consumer message for retry: %w", err)
  123. }
  124. }
  125. }
  126. retryKey := incomingRetryKey{receipt.Sender, messageID}
  127. cli.incomingRetryRequestCounterLock.Lock()
  128. cli.incomingRetryRequestCounter[retryKey]++
  129. internalCounter := cli.incomingRetryRequestCounter[retryKey]
  130. cli.incomingRetryRequestCounterLock.Unlock()
  131. if internalCounter >= 10 {
  132. cli.Log.Warnf("Dropping retry request from %s for %s: internal retry counter is %d", messageID, receipt.Sender, internalCounter)
  133. return nil
  134. }
  135. var fbSKDM *waMsgTransport.MessageTransport_Protocol_Ancillary_SenderKeyDistributionMessage
  136. var fbDSM *waMsgTransport.MessageTransport_Protocol_Integral_DeviceSentMessage
  137. if receipt.IsGroup {
  138. builder := groups.NewGroupSessionBuilder(cli.Store, pbSerializer)
  139. senderKeyName := protocol.NewSenderKeyName(receipt.Chat.String(), cli.getOwnLID().SignalAddress())
  140. signalSKDMessage, err := builder.Create(ctx, senderKeyName)
  141. if err != nil {
  142. cli.Log.Warnf("Failed to create sender key distribution message to include in retry of %s in %s to %s: %v", messageID, receipt.Chat, receipt.Sender, err)
  143. } else if msg.wa != nil {
  144. msg.wa.SenderKeyDistributionMessage = &waE2E.SenderKeyDistributionMessage{
  145. GroupID: proto.String(receipt.Chat.String()),
  146. AxolotlSenderKeyDistributionMessage: signalSKDMessage.Serialize(),
  147. }
  148. } else {
  149. fbSKDM = &waMsgTransport.MessageTransport_Protocol_Ancillary_SenderKeyDistributionMessage{
  150. GroupID: proto.String(receipt.Chat.String()),
  151. AxolotlSenderKeyDistributionMessage: signalSKDMessage.Serialize(),
  152. }
  153. }
  154. } else if receipt.IsFromMe {
  155. if msg.wa != nil {
  156. msg.wa = &waE2E.Message{
  157. DeviceSentMessage: &waE2E.DeviceSentMessage{
  158. DestinationJID: proto.String(receipt.Chat.String()),
  159. Message: msg.wa,
  160. },
  161. }
  162. } else {
  163. fbDSM = &waMsgTransport.MessageTransport_Protocol_Integral_DeviceSentMessage{
  164. DestinationJID: proto.String(receipt.Chat.String()),
  165. }
  166. }
  167. }
  168. // TODO pre-retry callback for fb
  169. if cli.PreRetryCallback != nil && !cli.PreRetryCallback(receipt, messageID, retryCount, msg.wa) {
  170. cli.Log.Debugf("Cancelled retry receipt in PreRetryCallback")
  171. return nil
  172. }
  173. var plaintext, frankingTag []byte
  174. if msg.wa != nil {
  175. plaintext, err = proto.Marshal(msg.wa)
  176. if err != nil {
  177. return fmt.Errorf("failed to marshal message: %w", err)
  178. }
  179. } else {
  180. plaintext, err = proto.Marshal(msg.fb)
  181. if err != nil {
  182. return fmt.Errorf("failed to marshal consumer message: %w", err)
  183. }
  184. frankingHash := hmac.New(sha256.New, msg.fb.GetMetadata().GetFrankingKey())
  185. frankingHash.Write(plaintext)
  186. frankingTag = frankingHash.Sum(nil)
  187. }
  188. _, hasKeys := node.GetOptionalChildByTag("keys")
  189. var bundle *prekey.Bundle
  190. if hasKeys {
  191. bundle, err = nodeToPreKeyBundle(uint32(receipt.Sender.Device), *node)
  192. if err != nil {
  193. return fmt.Errorf("failed to read prekey bundle in retry receipt: %w", err)
  194. }
  195. } else if reason, recreate := cli.shouldRecreateSession(ctx, retryCount, receipt.Sender); recreate {
  196. cli.Log.Debugf("Fetching prekeys for %s for handling retry receipt with no prekey bundle because %s", receipt.Sender, reason)
  197. var keys map[types.JID]preKeyResp
  198. keys, err = cli.fetchPreKeys(ctx, []types.JID{receipt.Sender})
  199. if err != nil {
  200. return err
  201. }
  202. bundle, err = keys[receipt.Sender].bundle, keys[receipt.Sender].err
  203. if err != nil {
  204. return fmt.Errorf("failed to fetch prekeys: %w", err)
  205. } else if bundle == nil {
  206. return fmt.Errorf("didn't get prekey bundle for %s (response size: %d)", receipt.Sender, len(keys))
  207. }
  208. }
  209. encAttrs := waBinary.Attrs{}
  210. var msgAttrs messageAttrs
  211. if msg.wa != nil {
  212. msgAttrs.MediaType = getMediaTypeFromMessage(msg.wa)
  213. msgAttrs.Type = getTypeFromMessage(msg.wa)
  214. } else if fbConsumerMsg != nil {
  215. msgAttrs = getAttrsFromFBMessage(fbConsumerMsg)
  216. } else {
  217. msgAttrs.Type = "text"
  218. }
  219. if msgAttrs.MediaType != "" {
  220. encAttrs["mediatype"] = msgAttrs.MediaType
  221. }
  222. var encrypted *waBinary.Node
  223. var includeDeviceIdentity bool
  224. if msg.wa != nil {
  225. encryptionIdentity := receipt.Sender
  226. if receipt.Sender.Server == types.DefaultUserServer {
  227. lidForPN, err := cli.Store.LIDs.GetLIDForPN(ctx, receipt.Sender)
  228. if err != nil {
  229. cli.Log.Warnf("Failed to get LID for %s: %v", receipt.Sender, err)
  230. } else if !lidForPN.IsEmpty() {
  231. cli.migrateSessionStore(ctx, receipt.Sender, lidForPN)
  232. encryptionIdentity = lidForPN
  233. }
  234. }
  235. encrypted, includeDeviceIdentity, err = cli.encryptMessageForDevice(ctx, plaintext, encryptionIdentity, bundle, encAttrs, nil)
  236. } else {
  237. encrypted, err = cli.encryptMessageForDeviceV3(ctx, &waMsgTransport.MessageTransport_Payload{
  238. ApplicationPayload: &waCommon.SubProtocol{
  239. Payload: plaintext,
  240. Version: proto.Int32(FBMessageApplicationVersion),
  241. },
  242. FutureProof: waCommon.FutureProofBehavior_PLACEHOLDER.Enum(),
  243. }, fbSKDM, fbDSM, receipt.Sender, bundle, encAttrs)
  244. }
  245. if err != nil {
  246. return fmt.Errorf("failed to encrypt message for retry: %w", err)
  247. }
  248. encrypted.Attrs["count"] = retryCount
  249. attrs := waBinary.Attrs{
  250. "to": node.Attrs["from"],
  251. "type": msgAttrs.Type,
  252. "id": messageID,
  253. "t": timestamp.Unix(),
  254. }
  255. if !receipt.IsGroup {
  256. attrs["device_fanout"] = false
  257. }
  258. if participant, ok := node.Attrs["participant"]; ok {
  259. attrs["participant"] = participant
  260. }
  261. if recipient, ok := node.Attrs["recipient"]; ok {
  262. attrs["recipient"] = recipient
  263. }
  264. if edit, ok := node.Attrs["edit"]; ok {
  265. attrs["edit"] = edit
  266. }
  267. var content []waBinary.Node
  268. if msg.wa != nil {
  269. content = cli.getMessageContent(
  270. *encrypted, msg.wa, attrs, includeDeviceIdentity, nodeExtraParams{},
  271. )
  272. } else {
  273. content = []waBinary.Node{
  274. *encrypted,
  275. {Tag: "franking", Content: []waBinary.Node{{Tag: "franking_tag", Content: frankingTag}}},
  276. }
  277. }
  278. err = cli.sendNode(ctx, waBinary.Node{
  279. Tag: "message",
  280. Attrs: attrs,
  281. Content: content,
  282. })
  283. if err != nil {
  284. return fmt.Errorf("failed to send retry message: %w", err)
  285. }
  286. cli.Log.Debugf("Sent retry #%d for %s/%s to %s", retryCount, receipt.Chat, messageID, receipt.Sender)
  287. return nil
  288. }
  289. func (cli *Client) cancelDelayedRequestFromPhone(msgID types.MessageID) {
  290. if !cli.AutomaticMessageRerequestFromPhone || cli.MessengerConfig != nil {
  291. return
  292. }
  293. cli.pendingPhoneRerequestsLock.RLock()
  294. cancelPendingRequest, ok := cli.pendingPhoneRerequests[msgID]
  295. if ok {
  296. cancelPendingRequest()
  297. }
  298. cli.pendingPhoneRerequestsLock.RUnlock()
  299. }
// RequestFromPhoneDelay specifies how long to wait for the sender to resend the message before requesting it from your phone.
// This is only used if Client.AutomaticMessageRerequestFromPhone is true.
//
// The value is read each time a delayed request is started.
var RequestFromPhoneDelay = 5 * time.Second
  303. func (cli *Client) delayedRequestMessageFromPhone(info *types.MessageInfo) {
  304. if !cli.AutomaticMessageRerequestFromPhone || cli.MessengerConfig != nil {
  305. return
  306. }
  307. cli.pendingPhoneRerequestsLock.Lock()
  308. _, alreadyRequesting := cli.pendingPhoneRerequests[info.ID]
  309. if alreadyRequesting {
  310. cli.pendingPhoneRerequestsLock.Unlock()
  311. return
  312. }
  313. ctx, cancel := context.WithCancel(cli.BackgroundEventCtx)
  314. defer cancel()
  315. cli.pendingPhoneRerequests[info.ID] = cancel
  316. cli.pendingPhoneRerequestsLock.Unlock()
  317. defer func() {
  318. cli.pendingPhoneRerequestsLock.Lock()
  319. delete(cli.pendingPhoneRerequests, info.ID)
  320. cli.pendingPhoneRerequestsLock.Unlock()
  321. }()
  322. select {
  323. case <-time.After(RequestFromPhoneDelay):
  324. case <-ctx.Done():
  325. cli.Log.Debugf("Cancelled delayed request for message %s from phone", info.ID)
  326. return
  327. }
  328. cli.immediateRequestMessageFromPhone(ctx, info)
  329. }
  330. func (cli *Client) immediateRequestMessageFromPhone(ctx context.Context, info *types.MessageInfo) {
  331. _, err := cli.SendMessage(
  332. ctx,
  333. cli.getOwnID().ToNonAD(),
  334. cli.BuildUnavailableMessageRequest(info.Chat, info.Sender, info.ID),
  335. SendRequestExtra{Peer: true},
  336. )
  337. if err != nil {
  338. cli.Log.Warnf("Failed to send request for unavailable message %s to phone: %v", info.ID, err)
  339. } else {
  340. cli.Log.Debugf("Requested message %s from phone", info.ID)
  341. }
  342. return
  343. }
  344. func (cli *Client) clearDelayedMessageRequests() {
  345. cli.pendingPhoneRerequestsLock.Lock()
  346. defer cli.pendingPhoneRerequestsLock.Unlock()
  347. for _, cancel := range cli.pendingPhoneRerequests {
  348. cancel()
  349. }
  350. }
  351. // sendRetryReceipt sends a retry receipt for an incoming message.
  352. func (cli *Client) sendRetryReceipt(ctx context.Context, node *waBinary.Node, info *types.MessageInfo, forceIncludeIdentity bool) {
  353. id, _ := node.Attrs["id"].(string)
  354. children := node.GetChildren()
  355. var retryCountInMsg int
  356. if len(children) == 1 && children[0].Tag == "enc" {
  357. retryCountInMsg = children[0].AttrGetter().OptionalInt("count")
  358. }
  359. cli.messageRetriesLock.Lock()
  360. cli.messageRetries[id]++
  361. retryCount := cli.messageRetries[id]
  362. // In case the message is a retry response, and we restarted in between, find the count from the message
  363. if retryCount == 1 && retryCountInMsg > 0 {
  364. retryCount = retryCountInMsg + 1
  365. cli.messageRetries[id] = retryCount
  366. }
  367. cli.messageRetriesLock.Unlock()
  368. if retryCount >= 5 {
  369. cli.Log.Warnf("Not sending any more retry receipts for %s", id)
  370. return
  371. }
  372. if retryCount == 1 {
  373. if cli.SynchronousAck {
  374. cli.immediateRequestMessageFromPhone(ctx, info)
  375. } else {
  376. go cli.delayedRequestMessageFromPhone(info)
  377. }
  378. }
  379. var registrationIDBytes [4]byte
  380. binary.BigEndian.PutUint32(registrationIDBytes[:], cli.Store.RegistrationID)
  381. attrs := buildBaseReceipt(info.ID, node)
  382. attrs["type"] = "retry"
  383. if info.Type == "peer_msg" && info.IsFromMe {
  384. attrs["category"] = "peer"
  385. }
  386. payload := waBinary.Node{
  387. Tag: "receipt",
  388. Attrs: attrs,
  389. Content: []waBinary.Node{
  390. {Tag: "retry", Attrs: waBinary.Attrs{
  391. "count": retryCount,
  392. "id": id,
  393. "t": node.Attrs["t"],
  394. "v": 1,
  395. }},
  396. {Tag: "registration", Content: registrationIDBytes[:]},
  397. },
  398. }
  399. if retryCount > 1 || forceIncludeIdentity {
  400. if key, err := cli.Store.PreKeys.GenOnePreKey(ctx); err != nil {
  401. cli.Log.Errorf("Failed to get prekey for retry receipt: %v", err)
  402. } else if deviceIdentity, err := proto.Marshal(cli.Store.Account); err != nil {
  403. cli.Log.Errorf("Failed to marshal account info: %v", err)
  404. return
  405. } else {
  406. payload.Content = append(payload.GetChildren(), waBinary.Node{
  407. Tag: "keys",
  408. Content: []waBinary.Node{
  409. {Tag: "type", Content: []byte{ecc.DjbType}},
  410. {Tag: "identity", Content: cli.Store.IdentityKey.Pub[:]},
  411. preKeyToNode(key),
  412. preKeyToNode(cli.Store.SignedPreKey),
  413. {Tag: "device-identity", Content: deviceIdentity},
  414. },
  415. })
  416. }
  417. }
  418. err := cli.sendNode(ctx, payload)
  419. if err != nil {
  420. cli.Log.Errorf("Failed to send retry receipt for %s: %v", id, err)
  421. }
  422. }