message.go 37 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046
  1. // Copyright (c) 2021 Tulir Asokan
  2. //
  3. // This Source Code Form is subject to the terms of the Mozilla Public
  4. // License, v. 2.0. If a copy of the MPL was not distributed with this
  5. // file, You can obtain one at http://mozilla.org/MPL/2.0/.
  6. package whatsmeow
  7. import (
  8. "bytes"
  9. "compress/zlib"
  10. "context"
  11. "crypto/sha256"
  12. "encoding/hex"
  13. "errors"
  14. "fmt"
  15. "io"
  16. "runtime/debug"
  17. "strconv"
  18. "time"
  19. "github.com/rs/zerolog"
  20. "go.mau.fi/libsignal/groups"
  21. "go.mau.fi/libsignal/protocol"
  22. "go.mau.fi/libsignal/session"
  23. "go.mau.fi/libsignal/signalerror"
  24. "go.mau.fi/util/random"
  25. "google.golang.org/protobuf/proto"
  26. "git.bobomao.top/joey/testwh/appstate"
  27. waBinary "git.bobomao.top/joey/testwh/binary"
  28. "git.bobomao.top/joey/testwh/proto/waE2E"
  29. "git.bobomao.top/joey/testwh/proto/waHistorySync"
  30. "git.bobomao.top/joey/testwh/proto/waLidMigrationSyncPayload"
  31. "git.bobomao.top/joey/testwh/proto/waWeb"
  32. "git.bobomao.top/joey/testwh/store"
  33. "git.bobomao.top/joey/testwh/types"
  34. "git.bobomao.top/joey/testwh/types/events"
  35. )
  36. var pbSerializer = store.SignalProtobufSerializer
  37. func (cli *Client) handleEncryptedMessage(ctx context.Context, node *waBinary.Node) {
  38. info, err := cli.parseMessageInfo(node)
  39. if err != nil {
  40. cli.Log.Warnf("Failed to parse message: %v", err)
  41. } else {
  42. if !info.SenderAlt.IsEmpty() {
  43. cli.StoreLIDPNMapping(ctx, info.SenderAlt, info.Sender)
  44. } else if !info.RecipientAlt.IsEmpty() {
  45. cli.StoreLIDPNMapping(ctx, info.RecipientAlt, info.Chat)
  46. }
  47. if info.VerifiedName != nil && len(info.VerifiedName.Details.GetVerifiedName()) > 0 {
  48. go cli.updateBusinessName(ctx, info.Sender, info.SenderAlt, info, info.VerifiedName.Details.GetVerifiedName())
  49. }
  50. if len(info.PushName) > 0 && info.PushName != "-" && (cli.MessengerConfig == nil || info.PushName != "username") {
  51. go cli.updatePushName(ctx, info.Sender, info.SenderAlt, info, info.PushName)
  52. }
  53. if info.Sender.Server == types.NewsletterServer {
  54. var cancelled bool
  55. defer cli.maybeDeferredAck(ctx, node)(&cancelled)
  56. cancelled = cli.handlePlaintextMessage(ctx, info, node)
  57. } else {
  58. cli.decryptMessages(ctx, info, node)
  59. }
  60. }
  61. }
  62. func (cli *Client) parseMessageSource(node *waBinary.Node, requireParticipant bool) (source types.MessageSource, err error) {
  63. clientID := cli.getOwnID()
  64. clientLID := cli.getOwnLID()
  65. if clientID.IsEmpty() {
  66. err = ErrNotLoggedIn
  67. return
  68. }
  69. ag := node.AttrGetter()
  70. from := ag.JID("from")
  71. source.AddressingMode = types.AddressingMode(ag.OptionalString("addressing_mode"))
  72. if from.Server == types.GroupServer || from.Server == types.BroadcastServer {
  73. source.IsGroup = true
  74. source.Chat = from
  75. if requireParticipant {
  76. source.Sender = ag.JID("participant")
  77. } else {
  78. source.Sender = ag.OptionalJIDOrEmpty("participant")
  79. }
  80. if source.AddressingMode == types.AddressingModeLID {
  81. source.SenderAlt = ag.OptionalJIDOrEmpty("participant_pn")
  82. } else {
  83. source.SenderAlt = ag.OptionalJIDOrEmpty("participant_lid")
  84. }
  85. if source.Sender.User == clientID.User || source.Sender.User == clientLID.User {
  86. source.IsFromMe = true
  87. }
  88. if from.Server == types.BroadcastServer {
  89. source.BroadcastListOwner = ag.OptionalJIDOrEmpty("recipient")
  90. participants, ok := node.GetOptionalChildByTag("participants")
  91. if ok && source.IsFromMe {
  92. children := participants.GetChildren()
  93. source.BroadcastRecipients = make([]types.BroadcastRecipient, 0, len(children))
  94. for _, child := range children {
  95. if child.Tag != "to" {
  96. continue
  97. }
  98. cag := child.AttrGetter()
  99. mainJID := cag.JID("jid")
  100. if mainJID.Server == types.HiddenUserServer {
  101. source.BroadcastRecipients = append(source.BroadcastRecipients, types.BroadcastRecipient{
  102. LID: mainJID,
  103. PN: cag.OptionalJIDOrEmpty("peer_recipient_pn"),
  104. })
  105. } else {
  106. source.BroadcastRecipients = append(source.BroadcastRecipients, types.BroadcastRecipient{
  107. LID: cag.OptionalJIDOrEmpty("peer_recipient_lid"),
  108. PN: mainJID,
  109. })
  110. }
  111. }
  112. }
  113. }
  114. } else if from.Server == types.NewsletterServer {
  115. source.Chat = from
  116. source.Sender = from
  117. // TODO IsFromMe?
  118. } else if from.User == clientID.User || from.User == clientLID.User {
  119. if from.Server == types.HostedServer {
  120. from.Server = types.DefaultUserServer
  121. } else if from.Server == types.HostedLIDServer {
  122. from.Server = types.HiddenUserServer
  123. }
  124. source.IsFromMe = true
  125. source.Sender = from
  126. recipient := ag.OptionalJID("recipient")
  127. if recipient != nil {
  128. source.Chat = *recipient
  129. } else {
  130. source.Chat = from.ToNonAD()
  131. }
  132. if source.Chat.Server == types.HiddenUserServer || source.Chat.Server == types.HostedLIDServer {
  133. source.RecipientAlt = ag.OptionalJIDOrEmpty("peer_recipient_pn")
  134. } else {
  135. source.RecipientAlt = ag.OptionalJIDOrEmpty("peer_recipient_lid")
  136. }
  137. } else if from.IsBot() {
  138. source.Sender = from
  139. meta := node.GetChildByTag("meta")
  140. ag = meta.AttrGetter()
  141. targetChatJID := ag.OptionalJID("target_chat_jid")
  142. if targetChatJID != nil {
  143. source.Chat = targetChatJID.ToNonAD()
  144. } else {
  145. source.Chat = from
  146. }
  147. } else {
  148. if from.Server == types.HostedServer {
  149. from.Server = types.DefaultUserServer
  150. } else if from.Server == types.HostedLIDServer {
  151. from.Server = types.HiddenUserServer
  152. }
  153. source.Chat = from.ToNonAD()
  154. source.Sender = from
  155. if source.Sender.Server == types.HiddenUserServer || source.Chat.Server == types.HostedLIDServer {
  156. source.SenderAlt = ag.OptionalJIDOrEmpty("sender_pn")
  157. } else {
  158. source.SenderAlt = ag.OptionalJIDOrEmpty("sender_lid")
  159. }
  160. }
  161. if !source.SenderAlt.IsEmpty() && source.SenderAlt.Device == 0 {
  162. source.SenderAlt.Device = source.Sender.Device
  163. }
  164. err = ag.Error()
  165. return
  166. }
  167. func (cli *Client) parseMsgBotInfo(node waBinary.Node) (botInfo types.MsgBotInfo, err error) {
  168. botNode := node.GetChildByTag("bot")
  169. ag := botNode.AttrGetter()
  170. botInfo.EditType = types.BotEditType(ag.String("edit"))
  171. if botInfo.EditType == types.EditTypeInner || botInfo.EditType == types.EditTypeLast {
  172. botInfo.EditTargetID = types.MessageID(ag.String("edit_target_id"))
  173. botInfo.EditSenderTimestampMS = ag.UnixMilli("sender_timestamp_ms")
  174. }
  175. err = ag.Error()
  176. return
  177. }
  178. func (cli *Client) parseMsgMetaInfo(node waBinary.Node) (metaInfo types.MsgMetaInfo, err error) {
  179. metaNode := node.GetChildByTag("meta")
  180. ag := metaNode.AttrGetter()
  181. metaInfo.TargetID = types.MessageID(ag.OptionalString("target_id"))
  182. metaInfo.TargetSender = ag.OptionalJIDOrEmpty("target_sender_jid")
  183. metaInfo.TargetChat = ag.OptionalJIDOrEmpty("target_chat_jid")
  184. deprecatedLIDSession, ok := ag.GetBool("deprecated_lid_session", false)
  185. if ok {
  186. metaInfo.DeprecatedLIDSession = &deprecatedLIDSession
  187. }
  188. metaInfo.ThreadMessageID = types.MessageID(ag.OptionalString("thread_msg_id"))
  189. metaInfo.ThreadMessageSenderJID = ag.OptionalJIDOrEmpty("thread_msg_sender_jid")
  190. err = ag.Error()
  191. return
  192. }
  193. func (cli *Client) parseMessageInfo(node *waBinary.Node) (*types.MessageInfo, error) {
  194. var info types.MessageInfo
  195. var err error
  196. info.MessageSource, err = cli.parseMessageSource(node, true)
  197. if err != nil {
  198. return nil, err
  199. }
  200. ag := node.AttrGetter()
  201. info.ID = types.MessageID(ag.String("id"))
  202. info.ServerID = types.MessageServerID(ag.OptionalInt("server_id"))
  203. info.Timestamp = ag.UnixTime("t")
  204. info.PushName = ag.OptionalString("notify")
  205. info.Category = ag.OptionalString("category")
  206. info.Type = ag.OptionalString("type")
  207. info.Edit = types.EditAttribute(ag.OptionalString("edit"))
  208. if !ag.OK() {
  209. return nil, ag.Error()
  210. }
  211. for _, child := range node.GetChildren() {
  212. switch child.Tag {
  213. case "multicast":
  214. info.Multicast = true
  215. case "verified_name":
  216. info.VerifiedName, err = parseVerifiedNameContent(child)
  217. if err != nil {
  218. cli.Log.Warnf("Failed to parse verified_name node in %s: %v", info.ID, err)
  219. }
  220. case "bot":
  221. info.MsgBotInfo, err = cli.parseMsgBotInfo(child)
  222. if err != nil {
  223. cli.Log.Warnf("Failed to parse <bot> node in %s: %v", info.ID, err)
  224. }
  225. case "meta":
  226. info.MsgMetaInfo, err = cli.parseMsgMetaInfo(child)
  227. if err != nil {
  228. cli.Log.Warnf("Failed to parse <meta> node in %s: %v", info.ID, err)
  229. }
  230. case "franking":
  231. // TODO
  232. case "trace":
  233. // TODO
  234. default:
  235. if mediaType, ok := child.AttrGetter().GetString("mediatype", false); ok {
  236. info.MediaType = mediaType
  237. }
  238. }
  239. }
  240. return &info, nil
  241. }
  242. func (cli *Client) handlePlaintextMessage(ctx context.Context, info *types.MessageInfo, node *waBinary.Node) (handlerFailed bool) {
  243. // TODO edits have an additional <meta msg_edit_t="1696321271735" original_msg_t="1696321248"/> node
  244. plaintext, ok := node.GetOptionalChildByTag("plaintext")
  245. if !ok {
  246. // 3:
  247. return
  248. }
  249. plaintextBody, ok := plaintext.Content.([]byte)
  250. if !ok {
  251. cli.Log.Warnf("Plaintext message from %s doesn't have byte content", info.SourceString())
  252. return
  253. }
  254. var msg waE2E.Message
  255. err := proto.Unmarshal(plaintextBody, &msg)
  256. if err != nil {
  257. cli.Log.Warnf("Error unmarshaling plaintext message from %s: %v", info.SourceString(), err)
  258. return
  259. }
  260. cli.storeMessageSecret(ctx, info, &msg)
  261. evt := &events.Message{
  262. Info: *info,
  263. RawMessage: &msg,
  264. }
  265. meta, ok := node.GetOptionalChildByTag("meta")
  266. if ok {
  267. evt.NewsletterMeta = &events.NewsletterMessageMeta{
  268. EditTS: meta.AttrGetter().UnixMilli("msg_edit_t"),
  269. OriginalTS: meta.AttrGetter().UnixTime("original_msg_t"),
  270. }
  271. }
  272. return cli.dispatchEvent(evt.UnwrapRaw())
  273. }
  274. func (cli *Client) migrateSessionStore(ctx context.Context, pn, lid types.JID) {
  275. err := cli.Store.Sessions.MigratePNToLID(ctx, pn, lid)
  276. if err != nil {
  277. cli.Log.Errorf("Failed to migrate signal store from %s to %s: %v", pn, lid, err)
  278. }
  279. }
  280. func (cli *Client) decryptMessages(ctx context.Context, info *types.MessageInfo, node *waBinary.Node) {
  281. unavailableNode, ok := node.GetOptionalChildByTag("unavailable")
  282. if ok && len(node.GetChildrenByTag("enc")) == 0 {
  283. uType := events.UnavailableType(unavailableNode.AttrGetter().String("type"))
  284. cli.Log.Warnf("Unavailable message %s from %s (type: %q)", info.ID, info.SourceString(), uType)
  285. cli.backgroundIfAsyncAck(func() {
  286. cli.immediateRequestMessageFromPhone(ctx, info)
  287. cli.sendAck(ctx, node, 0)
  288. })
  289. cli.dispatchEvent(&events.UndecryptableMessage{Info: *info, IsUnavailable: true, UnavailableType: uType})
  290. return
  291. }
  292. children := node.GetChildren()
  293. cli.Log.Debugf("Decrypting message from %s", info.SourceString())
  294. containsDirectMsg := false
  295. senderEncryptionJID := info.Sender
  296. if info.Sender.Server == types.DefaultUserServer && !info.Sender.IsBot() {
  297. if info.SenderAlt.Server == types.HiddenUserServer {
  298. senderEncryptionJID = info.SenderAlt
  299. cli.migrateSessionStore(ctx, info.Sender, info.SenderAlt)
  300. } else if lid, err := cli.Store.LIDs.GetLIDForPN(ctx, info.Sender); err != nil {
  301. cli.Log.Errorf("Failed to get LID for %s: %v", info.Sender, err)
  302. } else if !lid.IsEmpty() {
  303. cli.migrateSessionStore(ctx, info.Sender, lid)
  304. senderEncryptionJID = lid
  305. info.SenderAlt = lid
  306. } else {
  307. cli.Log.Warnf("No LID found for %s", info.Sender)
  308. }
  309. }
  310. var recognizedStanza, protobufFailed bool
  311. for _, child := range children {
  312. if child.Tag != "enc" {
  313. continue
  314. }
  315. recognizedStanza = true
  316. ag := child.AttrGetter()
  317. encType, ok := ag.GetString("type", false)
  318. if !ok {
  319. continue
  320. }
  321. var decrypted []byte
  322. var ciphertextHash *[32]byte
  323. var err error
  324. if encType == "pkmsg" || encType == "msg" {
  325. decrypted, ciphertextHash, err = cli.decryptDM(ctx, &child, senderEncryptionJID, encType == "pkmsg", info.Timestamp)
  326. containsDirectMsg = true
  327. } else if info.IsGroup && encType == "skmsg" {
  328. decrypted, ciphertextHash, err = cli.decryptGroupMsg(ctx, &child, senderEncryptionJID, info.Chat, info.Timestamp)
  329. } else if encType == "msmsg" && info.Sender.IsBot() {
  330. targetSenderJID := info.MsgMetaInfo.TargetSender
  331. if targetSenderJID.User == "" {
  332. if info.Sender.Server == types.BotServer {
  333. targetSenderJID = cli.getOwnLID()
  334. } else {
  335. targetSenderJID = cli.getOwnID()
  336. }
  337. }
  338. var decryptMessageID string
  339. if info.MsgBotInfo.EditType == types.EditTypeInner || info.MsgBotInfo.EditType == types.EditTypeLast {
  340. decryptMessageID = info.MsgBotInfo.EditTargetID
  341. } else {
  342. decryptMessageID = info.ID
  343. }
  344. var msMsg waE2E.MessageSecretMessage
  345. var messageSecret []byte
  346. if messageSecret, _, err = cli.Store.MsgSecrets.GetMessageSecret(ctx, info.Chat, targetSenderJID, info.MsgMetaInfo.TargetID); err != nil {
  347. err = fmt.Errorf("failed to get message secret for %s: %v", info.MsgMetaInfo.TargetID, err)
  348. } else if messageSecret == nil {
  349. err = fmt.Errorf("message secret for %s not found", info.MsgMetaInfo.TargetID)
  350. } else if err = proto.Unmarshal(child.Content.([]byte), &msMsg); err != nil {
  351. err = fmt.Errorf("failed to unmarshal MessageSecretMessage protobuf: %v", err)
  352. } else {
  353. decrypted, err = cli.decryptBotMessage(ctx, messageSecret, &msMsg, decryptMessageID, targetSenderJID, info)
  354. }
  355. } else {
  356. cli.Log.Warnf("Unhandled encrypted message (type %s) from %s", encType, info.SourceString())
  357. continue
  358. }
  359. if errors.Is(err, EventAlreadyProcessed) {
  360. cli.Log.Debugf("Ignoring message %s from %s: %v", info.ID, info.SourceString(), err)
  361. continue
  362. } else if errors.Is(err, signalerror.ErrOldCounter) {
  363. cli.Log.Warnf("Ignoring message %s from %s: %v", info.ID, info.SourceString(), err)
  364. continue
  365. } else if err != nil {
  366. cli.Log.Warnf("Error decrypting message %s from %s: %v", info.ID, info.SourceString(), err)
  367. if ctx.Err() != nil || errors.Is(err, context.Canceled) {
  368. return
  369. }
  370. isUnavailable := encType == "skmsg" && !containsDirectMsg && errors.Is(err, signalerror.ErrNoSenderKeyForUser)
  371. if encType == "msmsg" {
  372. cli.backgroundIfAsyncAck(func() {
  373. cli.sendAck(ctx, node, NackMissingMessageSecret)
  374. })
  375. } else if cli.SynchronousAck {
  376. cli.sendRetryReceipt(ctx, node, info, isUnavailable)
  377. // TODO this probably isn't supposed to ack
  378. cli.sendAck(ctx, node, 0)
  379. } else {
  380. go cli.sendRetryReceipt(context.WithoutCancel(ctx), node, info, isUnavailable)
  381. go cli.sendAck(ctx, node, 0)
  382. }
  383. cli.dispatchEvent(&events.UndecryptableMessage{
  384. Info: *info,
  385. IsUnavailable: isUnavailable,
  386. DecryptFailMode: events.DecryptFailMode(ag.OptionalString("decrypt-fail")),
  387. })
  388. return
  389. }
  390. retryCount := ag.OptionalInt("count")
  391. cli.cancelDelayedRequestFromPhone(info.ID)
  392. var msg waE2E.Message
  393. var handlerFailed bool
  394. switch ag.Int("v") {
  395. case 2:
  396. err = proto.Unmarshal(decrypted, &msg)
  397. if err != nil {
  398. cli.Log.Warnf("Error unmarshaling decrypted message from %s: %v", info.SourceString(), err)
  399. protobufFailed = true
  400. continue
  401. }
  402. protobufFailed = false
  403. handlerFailed = cli.handleDecryptedMessage(ctx, info, &msg, retryCount)
  404. case 3:
  405. handlerFailed, protobufFailed = cli.handleDecryptedArmadillo(ctx, info, decrypted, retryCount)
  406. default:
  407. cli.Log.Warnf("Unknown version %d in decrypted message from %s", ag.Int("v"), info.SourceString())
  408. }
  409. if handlerFailed {
  410. cli.Log.Warnf("Handler for %s failed", info.ID)
  411. return
  412. }
  413. if ciphertextHash != nil && cli.EnableDecryptedEventBuffer {
  414. // Use the context passed to decryptMessages
  415. err = cli.Store.EventBuffer.ClearBufferedEventPlaintext(ctx, *ciphertextHash)
  416. if err != nil {
  417. zerolog.Ctx(ctx).Err(err).
  418. Hex("ciphertext_hash", ciphertextHash[:]).
  419. Str("message_id", info.ID).
  420. Msg("Failed to clear buffered event plaintext")
  421. } else {
  422. zerolog.Ctx(ctx).Debug().
  423. Hex("ciphertext_hash", ciphertextHash[:]).
  424. Str("message_id", info.ID).
  425. Msg("Deleted event plaintext from buffer")
  426. }
  427. if time.Since(cli.lastDecryptedBufferClear) > 12*time.Hour && ctx.Err() == nil {
  428. cli.lastDecryptedBufferClear = time.Now()
  429. go func() {
  430. err := cli.Store.EventBuffer.DeleteOldBufferedHashes(context.WithoutCancel(ctx))
  431. if err != nil {
  432. zerolog.Ctx(ctx).Err(err).Msg("Failed to delete old buffered hashes")
  433. }
  434. }()
  435. }
  436. }
  437. }
  438. cli.backgroundIfAsyncAck(func() {
  439. if !recognizedStanza {
  440. cli.sendAck(ctx, node, NackUnrecognizedStanza)
  441. } else if protobufFailed {
  442. cli.sendAck(ctx, node, NackInvalidProtobuf)
  443. } else {
  444. cli.sendMessageReceipt(ctx, info, node)
  445. }
  446. })
  447. return
  448. }
  449. func (cli *Client) clearUntrustedIdentity(ctx context.Context, target types.JID) error {
  450. err := cli.Store.Identities.DeleteIdentity(ctx, target.SignalAddress().String())
  451. if err != nil {
  452. return fmt.Errorf("failed to delete identity: %w", err)
  453. }
  454. err = cli.Store.Sessions.DeleteSession(ctx, target.SignalAddress().String())
  455. if err != nil {
  456. return fmt.Errorf("failed to delete session: %w", err)
  457. }
  458. go cli.dispatchEvent(&events.IdentityChange{JID: target, Timestamp: time.Now(), Implicit: true})
  459. return nil
  460. }
  // EventAlreadyProcessed is the sentinel error wrapped by bufferedDecrypt when
  // the event buffer already has an entry for a ciphertext but no plaintext is
  // stored for it. Check for it with errors.Is.
  var EventAlreadyProcessed = errors.New("event was already processed")
  462. func (cli *Client) bufferedDecrypt(
  463. ctx context.Context,
  464. ciphertext []byte,
  465. serverTimestamp time.Time,
  466. decrypt func(context.Context) ([]byte, error),
  467. ) (plaintext []byte, ciphertextHash [32]byte, err error) {
  468. if !cli.EnableDecryptedEventBuffer {
  469. plaintext, err = decrypt(ctx)
  470. return
  471. }
  472. ciphertextHash = sha256.Sum256(ciphertext)
  473. var buf *store.BufferedEvent
  474. buf, err = cli.Store.EventBuffer.GetBufferedEvent(ctx, ciphertextHash)
  475. if err != nil {
  476. err = fmt.Errorf("failed to get buffered event: %w", err)
  477. return
  478. } else if buf != nil {
  479. if buf.Plaintext == nil {
  480. zerolog.Ctx(ctx).Debug().
  481. Hex("ciphertext_hash", ciphertextHash[:]).
  482. Time("insertion_time", buf.InsertTime).
  483. Msg("Returning event already processed error")
  484. err = fmt.Errorf("%w at %s", EventAlreadyProcessed, buf.InsertTime.String())
  485. return
  486. }
  487. zerolog.Ctx(ctx).Debug().
  488. Hex("ciphertext_hash", ciphertextHash[:]).
  489. Time("insertion_time", buf.InsertTime).
  490. Msg("Returning previously decrypted plaintext")
  491. plaintext = buf.Plaintext
  492. return
  493. }
  494. err = cli.Store.EventBuffer.DoDecryptionTxn(ctx, func(ctx context.Context) (innerErr error) {
  495. plaintext, innerErr = decrypt(ctx)
  496. if innerErr != nil {
  497. return
  498. }
  499. innerErr = cli.Store.EventBuffer.PutBufferedEvent(ctx, ciphertextHash, plaintext, serverTimestamp)
  500. if innerErr != nil {
  501. innerErr = fmt.Errorf("failed to save decrypted event to buffer: %w", innerErr)
  502. }
  503. return
  504. })
  505. if err == nil {
  506. zerolog.Ctx(ctx).Debug().
  507. Hex("ciphertext_hash", ciphertextHash[:]).
  508. Msg("Successfully decrypted and saved event")
  509. }
  510. return
  511. }
  512. func (cli *Client) decryptDM(ctx context.Context, child *waBinary.Node, from types.JID, isPreKey bool, serverTS time.Time) ([]byte, *[32]byte, error) {
  513. content, ok := child.Content.([]byte)
  514. if !ok {
  515. return nil, nil, fmt.Errorf("message content is not a byte slice")
  516. }
  517. builder := session.NewBuilderFromSignal(cli.Store, from.SignalAddress(), pbSerializer)
  518. cipher := session.NewCipher(builder, from.SignalAddress())
  519. var plaintext []byte
  520. var ciphertextHash [32]byte
  521. if isPreKey {
  522. preKeyMsg, err := protocol.NewPreKeySignalMessageFromBytes(content, pbSerializer.PreKeySignalMessage, pbSerializer.SignalMessage)
  523. if err != nil {
  524. return nil, nil, fmt.Errorf("failed to parse prekey message: %w", err)
  525. }
  526. plaintext, ciphertextHash, err = cli.bufferedDecrypt(ctx, content, serverTS, func(decryptCtx context.Context) ([]byte, error) {
  527. pt, innerErr := cipher.DecryptMessage(decryptCtx, preKeyMsg)
  528. if cli.AutoTrustIdentity && errors.Is(innerErr, signalerror.ErrUntrustedIdentity) {
  529. cli.Log.Warnf("Got %v error while trying to decrypt prekey message from %s, clearing stored identity and retrying", innerErr, from)
  530. if innerErr = cli.clearUntrustedIdentity(decryptCtx, from); innerErr != nil {
  531. innerErr = fmt.Errorf("failed to clear untrusted identity: %w", innerErr)
  532. return nil, innerErr
  533. }
  534. pt, innerErr = cipher.DecryptMessage(decryptCtx, preKeyMsg)
  535. }
  536. return pt, innerErr
  537. })
  538. if err != nil {
  539. return nil, nil, fmt.Errorf("failed to decrypt prekey message: %w", err)
  540. }
  541. } else {
  542. msg, err := protocol.NewSignalMessageFromBytes(content, pbSerializer.SignalMessage)
  543. if err != nil {
  544. return nil, nil, fmt.Errorf("failed to parse normal message: %w", err)
  545. }
  546. plaintext, ciphertextHash, err = cli.bufferedDecrypt(ctx, content, serverTS, func(decryptCtx context.Context) ([]byte, error) {
  547. return cipher.Decrypt(decryptCtx, msg)
  548. })
  549. if err != nil {
  550. return nil, nil, fmt.Errorf("failed to decrypt normal message: %w", err)
  551. }
  552. }
  553. var err error
  554. plaintext, err = unpadMessage(plaintext, child.AttrGetter().Int("v"))
  555. if err != nil {
  556. return nil, nil, fmt.Errorf("failed to unpad message: %w", err)
  557. }
  558. return plaintext, &ciphertextHash, nil
  559. }
  560. func (cli *Client) decryptGroupMsg(ctx context.Context, child *waBinary.Node, from types.JID, chat types.JID, serverTS time.Time) ([]byte, *[32]byte, error) {
  561. content, ok := child.Content.([]byte)
  562. if !ok {
  563. return nil, nil, fmt.Errorf("message content is not a byte slice")
  564. }
  565. senderKeyName := protocol.NewSenderKeyName(chat.String(), from.SignalAddress())
  566. builder := groups.NewGroupSessionBuilder(cli.Store, pbSerializer)
  567. cipher := groups.NewGroupCipher(builder, senderKeyName, cli.Store)
  568. msg, err := protocol.NewSenderKeyMessageFromBytes(content, pbSerializer.SenderKeyMessage)
  569. if err != nil {
  570. return nil, nil, fmt.Errorf("failed to parse group message: %w", err)
  571. }
  572. plaintext, ciphertextHash, err := cli.bufferedDecrypt(ctx, content, serverTS, func(decryptCtx context.Context) ([]byte, error) {
  573. return cipher.Decrypt(decryptCtx, msg)
  574. })
  575. if err != nil {
  576. return nil, nil, fmt.Errorf("failed to decrypt group message: %w", err)
  577. }
  578. plaintext, err = unpadMessage(plaintext, child.AttrGetter().Int("v"))
  579. if err != nil {
  580. return nil, nil, err
  581. }
  582. return plaintext, &ciphertextHash, nil
  583. }
  // checkPadding controls whether unpadMessage validates the padding bytes
  // with isValidPadding before stripping them.
  const checkPadding = true
  // isValidPadding reports whether plaintext ends with valid padding: the last
  // byte gives the padding length, and that many trailing bytes must all equal
  // it. A last byte of zero counts as valid (zero-length padding).
  //
  // Empty input and a padding length larger than the input are reported as
  // invalid rather than causing an out-of-range access.
  func isValidPadding(plaintext []byte) bool {
  	if len(plaintext) == 0 {
  		return false
  	}
  	padByte := plaintext[len(plaintext)-1]
  	padLen := int(padByte)
  	if padLen > len(plaintext) {
  		return false
  	}
  	// Compare in place instead of allocating the expected suffix.
  	for _, b := range plaintext[len(plaintext)-padLen:] {
  		if b != padByte {
  			return false
  		}
  	}
  	return true
  }
  590. func unpadMessage(plaintext []byte, version int) ([]byte, error) {
  591. if version == 3 {
  592. return plaintext, nil
  593. } else if len(plaintext) == 0 {
  594. return nil, fmt.Errorf("plaintext is empty")
  595. } else if checkPadding && !isValidPadding(plaintext) {
  596. return nil, fmt.Errorf("plaintext doesn't have expected padding")
  597. } else {
  598. return plaintext[:len(plaintext)-int(plaintext[len(plaintext)-1])], nil
  599. }
  600. }
  601. func padMessage(plaintext []byte) []byte {
  602. pad := random.Bytes(1)
  603. pad[0] &= 0xf
  604. if pad[0] == 0 {
  605. pad[0] = 0xf
  606. }
  607. plaintext = append(plaintext, bytes.Repeat(pad, int(pad[0]))...)
  608. return plaintext
  609. }
  610. func (cli *Client) handleSenderKeyDistributionMessage(ctx context.Context, chat, from types.JID, axolotlSKDM []byte) {
  611. builder := groups.NewGroupSessionBuilder(cli.Store, pbSerializer)
  612. senderKeyName := protocol.NewSenderKeyName(chat.String(), from.SignalAddress())
  613. sdkMsg, err := protocol.NewSenderKeyDistributionMessageFromBytes(axolotlSKDM, pbSerializer.SenderKeyDistributionMessage)
  614. if err != nil {
  615. cli.Log.Errorf("Failed to parse sender key distribution message from %s for %s: %v", from, chat, err)
  616. return
  617. }
  618. err = builder.Process(ctx, senderKeyName, sdkMsg)
  619. if err != nil {
  620. cli.Log.Errorf("Failed to process sender key distribution message from %s for %s: %v", from, chat, err)
  621. return
  622. }
  623. cli.Log.Debugf("Processed sender key distribution message from %s in %s", senderKeyName.Sender().String(), senderKeyName.GroupID())
  624. }
  625. func (cli *Client) handleHistorySyncNotificationLoop() {
  626. defer func() {
  627. cli.historySyncHandlerStarted.Store(false)
  628. err := recover()
  629. if err != nil {
  630. cli.Log.Errorf("History sync handler panicked: %v\n%s", err, debug.Stack())
  631. }
  632. // Check in case something new appeared in the channel between the loop stopping
  633. // and the atomic variable being updated. If yes, restart the loop.
  634. if len(cli.historySyncNotifications) > 0 && cli.historySyncHandlerStarted.CompareAndSwap(false, true) {
  635. cli.Log.Warnf("New history sync notifications appeared after loop stopped, restarting loop...")
  636. go cli.handleHistorySyncNotificationLoop()
  637. }
  638. }()
  639. ctx := cli.BackgroundEventCtx
  640. for notif := range cli.historySyncNotifications {
  641. blob, err := cli.DownloadHistorySync(ctx, notif, false)
  642. if err != nil {
  643. cli.Log.Errorf("Failed to download history sync: %v", err)
  644. } else {
  645. cli.dispatchEvent(&events.HistorySync{Data: blob})
  646. }
  647. }
  648. }
  649. // DownloadHistorySync will download and parse the history sync blob from the given history sync notification.
  650. //
  651. // You only need to call this manually if you set [Client.ManualHistorySyncDownload] to true.
  652. // By default, whatsmeow will call this automatically and dispatch an [events.HistorySync] with the parsed data.
  653. func (cli *Client) DownloadHistorySync(ctx context.Context, notif *waE2E.HistorySyncNotification, synchronousStorage bool) (*waHistorySync.HistorySync, error) {
  654. var data []byte
  655. var err error
  656. if notif.InitialHistBootstrapInlinePayload != nil {
  657. data = notif.InitialHistBootstrapInlinePayload
  658. } else if data, err = cli.Download(ctx, notif); err != nil {
  659. return nil, fmt.Errorf("failed to download: %w", err)
  660. }
  661. var historySync waHistorySync.HistorySync
  662. if reader, err := zlib.NewReader(bytes.NewReader(data)); err != nil {
  663. return nil, fmt.Errorf("failed to prepare to decompress: %w", err)
  664. } else if rawData, err := io.ReadAll(reader); err != nil {
  665. return nil, fmt.Errorf("failed to decompress: %w", err)
  666. } else if err = proto.Unmarshal(rawData, &historySync); err != nil {
  667. return nil, fmt.Errorf("failed to unmarshal: %w", err)
  668. }
  669. cli.Log.Debugf("Received history sync (type %s, chunk %d, progress %d)", historySync.GetSyncType(), historySync.GetChunkOrder(), historySync.GetProgress())
  670. doStorage := func(ctx context.Context) {
  671. if historySync.GetSyncType() == waHistorySync.HistorySync_PUSH_NAME {
  672. cli.handleHistoricalPushNames(ctx, historySync.GetPushnames())
  673. } else if len(historySync.GetConversations()) > 0 {
  674. cli.storeHistoricalMessageSecrets(ctx, historySync.GetConversations())
  675. }
  676. if len(historySync.GetPhoneNumberToLidMappings()) > 0 {
  677. cli.storeHistoricalPNLIDMappings(ctx, historySync.GetPhoneNumberToLidMappings())
  678. }
  679. if historySync.GlobalSettings != nil {
  680. cli.storeGlobalSettings(ctx, historySync.GlobalSettings)
  681. }
  682. }
  683. if synchronousStorage {
  684. doStorage(ctx)
  685. } else {
  686. go doStorage(context.WithoutCancel(ctx))
  687. }
  688. return &historySync, nil
  689. }
  690. func (cli *Client) handleAppStateSyncKeyShare(ctx context.Context, keys *waE2E.AppStateSyncKeyShare) {
  691. onlyResyncIfNotSynced := true
  692. cli.Log.Debugf("Got %d new app state keys", len(keys.GetKeys()))
  693. cli.appStateKeyRequestsLock.RLock()
  694. for _, key := range keys.GetKeys() {
  695. marshaledFingerprint, err := proto.Marshal(key.GetKeyData().GetFingerprint())
  696. if err != nil {
  697. cli.Log.Errorf("Failed to marshal fingerprint of app state sync key %X", key.GetKeyID().GetKeyID())
  698. continue
  699. }
  700. _, isReRequest := cli.appStateKeyRequests[hex.EncodeToString(key.GetKeyID().GetKeyID())]
  701. if isReRequest {
  702. onlyResyncIfNotSynced = false
  703. }
  704. err = cli.Store.AppStateKeys.PutAppStateSyncKey(ctx, key.GetKeyID().GetKeyID(), store.AppStateSyncKey{
  705. Data: key.GetKeyData().GetKeyData(),
  706. Fingerprint: marshaledFingerprint,
  707. Timestamp: key.GetKeyData().GetTimestamp(),
  708. })
  709. if err != nil {
  710. cli.Log.Errorf("Failed to store app state sync key %X: %v", key.GetKeyID().GetKeyID(), err)
  711. continue
  712. }
  713. cli.Log.Debugf("Received app state sync key %X (ts: %d)", key.GetKeyID().GetKeyID(), key.GetKeyData().GetTimestamp())
  714. }
  715. cli.appStateKeyRequestsLock.RUnlock()
  716. for _, name := range appstate.AllPatchNames {
  717. err := cli.FetchAppState(ctx, name, false, onlyResyncIfNotSynced)
  718. if err != nil {
  719. cli.Log.Errorf("Failed to do initial fetch of app state %s: %v", name, err)
  720. }
  721. }
  722. }
  723. func (cli *Client) handlePlaceholderResendResponse(msg *waE2E.PeerDataOperationRequestResponseMessage) (ok bool) {
  724. reqID := msg.GetStanzaID()
  725. parts := msg.GetPeerDataOperationResult()
  726. cli.Log.Debugf("Handling response to placeholder resend request %s with %d items", reqID, len(parts))
  727. ok = true
  728. for i, part := range parts {
  729. var webMsg waWeb.WebMessageInfo
  730. if resp := part.GetPlaceholderMessageResendResponse(); resp == nil {
  731. cli.Log.Warnf("Missing response in item #%d of response to %s", i+1, reqID)
  732. } else if err := proto.Unmarshal(resp.GetWebMessageInfoBytes(), &webMsg); err != nil {
  733. cli.Log.Warnf("Failed to unmarshal protobuf web message in item #%d of response to %s: %v", i+1, reqID, err)
  734. } else if msgEvt, err := cli.ParseWebMessage(types.EmptyJID, &webMsg); err != nil {
  735. cli.Log.Warnf("Failed to parse web message info in item #%d of response to %s: %v", i+1, reqID, err)
  736. } else {
  737. msgEvt.UnavailableRequestID = reqID
  738. ok = cli.dispatchEvent(msgEvt) && ok
  739. }
  740. }
  741. return
  742. }
  743. func (cli *Client) handleProtocolMessage(ctx context.Context, info *types.MessageInfo, msg *waE2E.Message) (ok bool) {
  744. ok = true
  745. protoMsg := msg.GetProtocolMessage()
  746. if !info.IsFromMe {
  747. return
  748. }
  749. if protoMsg.GetHistorySyncNotification() != nil {
  750. if !cli.ManualHistorySyncDownload {
  751. cli.historySyncNotifications <- protoMsg.HistorySyncNotification
  752. if cli.historySyncHandlerStarted.CompareAndSwap(false, true) {
  753. go cli.handleHistorySyncNotificationLoop()
  754. }
  755. }
  756. go cli.sendProtocolMessageReceipt(ctx, info.ID, types.ReceiptTypeHistorySync)
  757. }
  758. if protoMsg.GetLidMigrationMappingSyncMessage() != nil {
  759. cli.storeLIDSyncMessage(ctx, protoMsg.GetLidMigrationMappingSyncMessage().GetEncodedMappingPayload())
  760. }
  761. if protoMsg.GetPeerDataOperationRequestResponseMessage().GetPeerDataOperationRequestType() == waE2E.PeerDataOperationRequestType_PLACEHOLDER_MESSAGE_RESEND {
  762. ok = cli.handlePlaceholderResendResponse(protoMsg.GetPeerDataOperationRequestResponseMessage()) && ok
  763. }
  764. if protoMsg.GetAppStateSyncKeyShare() != nil {
  765. go cli.handleAppStateSyncKeyShare(context.WithoutCancel(ctx), protoMsg.AppStateSyncKeyShare)
  766. }
  767. if info.Category == "peer" {
  768. go cli.sendProtocolMessageReceipt(ctx, info.ID, types.ReceiptTypePeerMsg)
  769. }
  770. return
  771. }
  772. func (cli *Client) processProtocolParts(ctx context.Context, info *types.MessageInfo, msg *waE2E.Message) (ok bool) {
  773. ok = true
  774. cli.storeMessageSecret(ctx, info, msg)
  775. // Hopefully sender key distribution messages and protocol messages can't be inside ephemeral messages
  776. if msg.GetDeviceSentMessage().GetMessage() != nil {
  777. msg = msg.GetDeviceSentMessage().GetMessage()
  778. }
  779. if msg.GetSenderKeyDistributionMessage() != nil {
  780. if !info.IsGroup {
  781. cli.Log.Warnf("Got sender key distribution message in non-group chat from %s", info.Sender)
  782. } else {
  783. encryptionIdentity := info.Sender
  784. if encryptionIdentity.Server == types.DefaultUserServer && info.SenderAlt.Server == types.HiddenUserServer {
  785. encryptionIdentity = info.SenderAlt
  786. }
  787. cli.handleSenderKeyDistributionMessage(ctx, info.Chat, encryptionIdentity, msg.SenderKeyDistributionMessage.AxolotlSenderKeyDistributionMessage)
  788. }
  789. }
  790. // N.B. Edits are protocol messages, but they're also wrapped inside EditedMessage,
  791. // which is only unwrapped after processProtocolParts, so this won't trigger for edits.
  792. if msg.GetProtocolMessage() != nil {
  793. ok = cli.handleProtocolMessage(ctx, info, msg) && ok
  794. }
  795. return
  796. }
  797. func (cli *Client) storeMessageSecret(ctx context.Context, info *types.MessageInfo, msg *waE2E.Message) {
  798. if msgSecret := msg.GetMessageContextInfo().GetMessageSecret(); len(msgSecret) > 0 {
  799. err := cli.Store.MsgSecrets.PutMessageSecret(ctx, info.Chat, info.Sender, info.ID, msgSecret)
  800. if err != nil {
  801. cli.Log.Errorf("Failed to store message secret key for %s: %v", info.ID, err)
  802. } else {
  803. cli.Log.Debugf("Stored message secret key for %s", info.ID)
  804. }
  805. }
  806. }
  807. func (cli *Client) storeHistoricalMessageSecrets(ctx context.Context, conversations []*waHistorySync.Conversation) {
  808. var secrets []store.MessageSecretInsert
  809. var privacyTokens []store.PrivacyToken
  810. ownID := cli.getOwnID().ToNonAD()
  811. if ownID.IsEmpty() {
  812. return
  813. }
  814. for _, conv := range conversations {
  815. chatJID, _ := types.ParseJID(conv.GetID())
  816. if chatJID.IsEmpty() {
  817. continue
  818. }
  819. if chatJID.Server == types.DefaultUserServer && conv.GetTcToken() != nil {
  820. ts := conv.GetTcTokenSenderTimestamp()
  821. if ts == 0 {
  822. ts = conv.GetTcTokenTimestamp()
  823. }
  824. privacyTokens = append(privacyTokens, store.PrivacyToken{
  825. User: chatJID,
  826. Token: conv.GetTcToken(),
  827. Timestamp: time.Unix(int64(ts), 0),
  828. })
  829. }
  830. for _, msg := range conv.GetMessages() {
  831. if secret := msg.GetMessage().GetMessageSecret(); secret != nil {
  832. var senderJID types.JID
  833. msgKey := msg.GetMessage().GetKey()
  834. if msgKey.GetFromMe() {
  835. senderJID = ownID
  836. } else if chatJID.Server == types.DefaultUserServer {
  837. senderJID = chatJID
  838. } else if msgKey.GetParticipant() != "" {
  839. senderJID, _ = types.ParseJID(msgKey.GetParticipant())
  840. } else if msg.GetMessage().GetParticipant() != "" {
  841. senderJID, _ = types.ParseJID(msg.GetMessage().GetParticipant())
  842. }
  843. if senderJID.IsEmpty() || msgKey.GetID() == "" {
  844. continue
  845. }
  846. secrets = append(secrets, store.MessageSecretInsert{
  847. Chat: chatJID,
  848. Sender: senderJID,
  849. ID: msgKey.GetID(),
  850. Secret: secret,
  851. })
  852. }
  853. }
  854. }
  855. if len(secrets) > 0 {
  856. cli.Log.Debugf("Storing %d message secret keys in history sync", len(secrets))
  857. err := cli.Store.MsgSecrets.PutMessageSecrets(ctx, secrets)
  858. if err != nil {
  859. cli.Log.Errorf("Failed to store message secret keys in history sync: %v", err)
  860. } else {
  861. cli.Log.Infof("Stored %d message secret keys from history sync", len(secrets))
  862. }
  863. }
  864. if len(privacyTokens) > 0 {
  865. cli.Log.Debugf("Storing %d privacy tokens in history sync", len(privacyTokens))
  866. err := cli.Store.PrivacyTokens.PutPrivacyTokens(ctx, privacyTokens...)
  867. if err != nil {
  868. cli.Log.Errorf("Failed to store privacy tokens in history sync: %v", err)
  869. } else {
  870. cli.Log.Infof("Stored %d privacy tokens from history sync", len(privacyTokens))
  871. }
  872. }
  873. }
  874. func (cli *Client) storeLIDSyncMessage(ctx context.Context, msg []byte) {
  875. var decoded waLidMigrationSyncPayload.LIDMigrationMappingSyncPayload
  876. err := proto.Unmarshal(msg, &decoded)
  877. if err != nil {
  878. zerolog.Ctx(ctx).Err(err).Msg("Failed to unmarshal LID migration mapping sync payload")
  879. return
  880. }
  881. if cli.Store.LIDMigrationTimestamp == 0 && decoded.GetChatDbMigrationTimestamp() > 0 {
  882. cli.Store.LIDMigrationTimestamp = int64(decoded.GetChatDbMigrationTimestamp())
  883. err = cli.Store.Save(ctx)
  884. if err != nil {
  885. zerolog.Ctx(ctx).Err(err).
  886. Int64("lid_migration_timestamp", cli.Store.LIDMigrationTimestamp).
  887. Msg("Failed to save chat DB LID migration timestamp")
  888. } else {
  889. zerolog.Ctx(ctx).Debug().
  890. Int64("lid_migration_timestamp", cli.Store.LIDMigrationTimestamp).
  891. Msg("Saved chat DB LID migration timestamp")
  892. }
  893. }
  894. lidPairs := make([]store.LIDMapping, len(decoded.PnToLidMappings))
  895. for i, mapping := range decoded.PnToLidMappings {
  896. lidPairs[i] = store.LIDMapping{
  897. LID: types.JID{User: strconv.FormatUint(mapping.GetAssignedLid(), 10), Server: types.HiddenUserServer},
  898. PN: types.JID{User: strconv.FormatUint(mapping.GetPn(), 10), Server: types.DefaultUserServer},
  899. }
  900. }
  901. err = cli.Store.LIDs.PutManyLIDMappings(ctx, lidPairs)
  902. if err != nil {
  903. zerolog.Ctx(ctx).Err(err).
  904. Int("pair_count", len(lidPairs)).
  905. Msg("Failed to store phone number to LID mappings from sync message")
  906. } else {
  907. zerolog.Ctx(ctx).Debug().
  908. Int("pair_count", len(lidPairs)).
  909. Msg("Stored PN-LID mappings from sync message")
  910. }
  911. }
  912. func (cli *Client) storeGlobalSettings(ctx context.Context, settings *waHistorySync.GlobalSettings) {
  913. if cli.Store.LIDMigrationTimestamp == 0 && settings.GetChatDbLidMigrationTimestamp() > 0 {
  914. cli.Store.LIDMigrationTimestamp = settings.GetChatDbLidMigrationTimestamp()
  915. err := cli.Store.Save(ctx)
  916. if err != nil {
  917. zerolog.Ctx(ctx).Err(err).
  918. Int64("lid_migration_timestamp", cli.Store.LIDMigrationTimestamp).
  919. Msg("Failed to save chat DB LID migration timestamp")
  920. } else {
  921. zerolog.Ctx(ctx).Debug().
  922. Int64("lid_migration_timestamp", cli.Store.LIDMigrationTimestamp).
  923. Msg("Saved chat DB LID migration timestamp")
  924. }
  925. }
  926. }
  927. func (cli *Client) storeHistoricalPNLIDMappings(ctx context.Context, mappings []*waHistorySync.PhoneNumberToLIDMapping) {
  928. lidPairs := make([]store.LIDMapping, 0, len(mappings))
  929. for _, mapping := range mappings {
  930. pn, err := types.ParseJID(mapping.GetPnJID())
  931. if err != nil {
  932. zerolog.Ctx(ctx).Err(err).
  933. Str("pn_jid", mapping.GetPnJID()).
  934. Str("lid_jid", mapping.GetLidJID()).
  935. Msg("Failed to parse phone number from history sync")
  936. continue
  937. }
  938. if pn.Server == types.LegacyUserServer {
  939. pn.Server = types.DefaultUserServer
  940. }
  941. lid, err := types.ParseJID(mapping.GetLidJID())
  942. if err != nil {
  943. zerolog.Ctx(ctx).Err(err).
  944. Str("pn_jid", mapping.GetPnJID()).
  945. Str("lid_jid", mapping.GetLidJID()).
  946. Msg("Failed to parse LID from history sync")
  947. continue
  948. }
  949. lidPairs = append(lidPairs, store.LIDMapping{
  950. LID: lid,
  951. PN: pn,
  952. })
  953. }
  954. err := cli.Store.LIDs.PutManyLIDMappings(ctx, lidPairs)
  955. if err != nil {
  956. zerolog.Ctx(ctx).Err(err).
  957. Int("pair_count", len(lidPairs)).
  958. Msg("Failed to store phone number to LID mappings from history sync")
  959. } else {
  960. zerolog.Ctx(ctx).Debug().
  961. Int("pair_count", len(lidPairs)).
  962. Msg("Stored PN-LID mappings from history sync")
  963. }
  964. }
  965. func (cli *Client) handleDecryptedMessage(ctx context.Context, info *types.MessageInfo, msg *waE2E.Message, retryCount int) bool {
  966. ok := cli.processProtocolParts(ctx, info, msg)
  967. if !ok {
  968. return false
  969. }
  970. evt := &events.Message{Info: *info, RawMessage: msg, RetryCount: retryCount}
  971. return cli.dispatchEvent(evt.UnwrapRaw())
  972. }
  973. func (cli *Client) sendProtocolMessageReceipt(ctx context.Context, id types.MessageID, msgType types.ReceiptType) {
  974. if len(id) == 0 {
  975. return
  976. }
  977. err := cli.sendNode(ctx, waBinary.Node{
  978. Tag: "receipt",
  979. Attrs: waBinary.Attrs{
  980. "id": string(id),
  981. "type": string(msgType),
  982. "to": cli.getOwnID().ToNonAD(),
  983. },
  984. Content: nil,
  985. })
  986. if err != nil {
  987. cli.Log.Warnf("Failed to send acknowledgement for protocol message %s: %v", id, err)
  988. }
  989. }