store.go 34 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008
  1. // Copyright (c) 2025 Tulir Asokan
  2. //
  3. // This Source Code Form is subject to the terms of the Mozilla Public
  4. // License, v. 2.0. If a copy of the MPL was not distributed with this
  5. // file, You can obtain one at http://mozilla.org/MPL/2.0/.
  6. // Package sqlstore contains an SQL-backed implementation of the interfaces in the store package.
  7. package sqlstore
  8. import (
  9. "context"
  10. "database/sql"
  11. "database/sql/driver"
  12. "errors"
  13. "fmt"
  14. "slices"
  15. "strings"
  16. "sync"
  17. "time"
  18. "go.mau.fi/util/dbutil"
  19. "go.mau.fi/util/exslices"
  20. "go.mau.fi/util/exsync"
  21. "git.bobomao.top/joey/testwh/store"
  22. "git.bobomao.top/joey/testwh/types"
  23. "git.bobomao.top/joey/testwh/util/keys"
  24. )
  25. // ErrInvalidLength is returned by some database getters if the database returned a byte array with an unexpected length.
  26. // This should be impossible, as the database schema contains CHECK()s for all the relevant columns.
  27. var ErrInvalidLength = errors.New("database returned byte array with illegal length")
  28. // PostgresArrayWrapper is a function to wrap array values before passing them to the sql package.
  29. //
  30. // When using github.com/lib/pq, you should set
  31. //
  32. // whatsmeow.PostgresArrayWrapper = pq.Array
  33. var PostgresArrayWrapper func(any) interface {
  34. driver.Valuer
  35. sql.Scanner
  36. }
  37. type SQLStore struct {
  38. *Container
  39. JID string
  40. preKeyLock sync.Mutex
  41. contactCache map[types.JID]*types.ContactInfo
  42. contactCacheLock sync.Mutex
  43. migratedPNSessionsCache *exsync.Set[string]
  44. }
  45. // NewSQLStore creates a new SQLStore with the given database container and user JID.
  46. // It contains implementations of all the different stores in the store package.
  47. //
  48. // In general, you should use Container.NewDevice or Container.GetDevice instead of this.
  49. func NewSQLStore(c *Container, jid types.JID) *SQLStore {
  50. return &SQLStore{
  51. Container: c,
  52. JID: jid.String(),
  53. contactCache: make(map[types.JID]*types.ContactInfo),
  54. migratedPNSessionsCache: exsync.NewSet[string](),
  55. }
  56. }
  57. var _ store.AllSessionSpecificStores = (*SQLStore)(nil)
  58. const (
  59. putIdentityQuery = `
  60. INSERT INTO whatsmeow_identity_keys (our_jid, their_id, identity) VALUES ($1, $2, $3)
  61. ON CONFLICT (our_jid, their_id) DO UPDATE SET identity=excluded.identity
  62. `
  63. deleteAllIdentitiesQuery = `DELETE FROM whatsmeow_identity_keys WHERE our_jid=$1 AND their_id LIKE $2`
  64. deleteIdentityQuery = `DELETE FROM whatsmeow_identity_keys WHERE our_jid=$1 AND their_id=$2`
  65. getIdentityQuery = `SELECT identity FROM whatsmeow_identity_keys WHERE our_jid=$1 AND their_id=$2`
  66. )
  67. func (s *SQLStore) PutIdentity(ctx context.Context, address string, key [32]byte) error {
  68. _, err := s.db.Exec(ctx, putIdentityQuery, s.JID, address, key[:])
  69. return err
  70. }
  71. func (s *SQLStore) DeleteAllIdentities(ctx context.Context, phone string) error {
  72. _, err := s.db.Exec(ctx, deleteAllIdentitiesQuery, s.JID, phone+":%")
  73. return err
  74. }
  75. func (s *SQLStore) DeleteIdentity(ctx context.Context, address string) error {
  76. _, err := s.db.Exec(ctx, deleteAllIdentitiesQuery, s.JID, address)
  77. return err
  78. }
  79. func (s *SQLStore) IsTrustedIdentity(ctx context.Context, address string, key [32]byte) (bool, error) {
  80. var existingIdentity []byte
  81. err := s.db.QueryRow(ctx, getIdentityQuery, s.JID, address).Scan(&existingIdentity)
  82. if errors.Is(err, sql.ErrNoRows) {
  83. // Trust if not known, it'll be saved automatically later
  84. return true, nil
  85. } else if err != nil {
  86. return false, err
  87. } else if len(existingIdentity) != 32 {
  88. return false, ErrInvalidLength
  89. }
  90. return *(*[32]byte)(existingIdentity) == key, nil
  91. }
  92. const (
  93. getSessionQuery = `SELECT session FROM whatsmeow_sessions WHERE our_jid=$1 AND their_id=$2`
  94. hasSessionQuery = `SELECT true FROM whatsmeow_sessions WHERE our_jid=$1 AND their_id=$2`
  95. getManySessionQueryPostgres = `SELECT their_id, session FROM whatsmeow_sessions WHERE our_jid=$1 AND their_id = ANY($2)`
  96. getManySessionQueryGeneric = `SELECT their_id, session FROM whatsmeow_sessions WHERE our_jid=$1 AND their_id IN (%s)`
  97. putSessionQuery = `
  98. INSERT INTO whatsmeow_sessions (our_jid, their_id, session) VALUES ($1, $2, $3)
  99. ON CONFLICT (our_jid, their_id) DO UPDATE SET session=excluded.session
  100. `
  101. deleteAllSessionsQuery = `DELETE FROM whatsmeow_sessions WHERE our_jid=$1 AND their_id LIKE $2`
  102. deleteSessionQuery = `DELETE FROM whatsmeow_sessions WHERE our_jid=$1 AND their_id=$2`
  103. migratePNToLIDSessionsQuery = `
  104. INSERT INTO whatsmeow_sessions (our_jid, their_id, session)
  105. SELECT our_jid, replace(their_id, $2, $3), session
  106. FROM whatsmeow_sessions
  107. WHERE our_jid=$1 AND their_id LIKE $2 || ':%'
  108. ON CONFLICT (our_jid, their_id) DO UPDATE SET session=excluded.session
  109. `
  110. deleteAllIdentityKeysQuery = `DELETE FROM whatsmeow_identity_keys WHERE our_jid=$1 AND their_id LIKE $2`
  111. migratePNToLIDIdentityKeysQuery = `
  112. INSERT INTO whatsmeow_identity_keys (our_jid, their_id, identity)
  113. SELECT our_jid, replace(their_id, $2, $3), identity
  114. FROM whatsmeow_identity_keys
  115. WHERE our_jid=$1 AND their_id LIKE $2 || ':%'
  116. ON CONFLICT (our_jid, their_id) DO UPDATE SET identity=excluded.identity
  117. `
  118. deleteAllSenderKeysQuery = `DELETE FROM whatsmeow_sender_keys WHERE our_jid=$1 AND sender_id LIKE $2`
  119. migratePNToLIDSenderKeysQuery = `
  120. INSERT INTO whatsmeow_sender_keys (our_jid, chat_id, sender_id, sender_key)
  121. SELECT our_jid, chat_id, replace(sender_id, $2, $3), sender_key
  122. FROM whatsmeow_sender_keys
  123. WHERE our_jid=$1 AND sender_id LIKE $2 || ':%'
  124. ON CONFLICT (our_jid, chat_id, sender_id) DO UPDATE SET sender_key=excluded.sender_key
  125. `
  126. )
  127. func (s *SQLStore) GetSession(ctx context.Context, address string) (session []byte, err error) {
  128. err = s.db.QueryRow(ctx, getSessionQuery, s.JID, address).Scan(&session)
  129. if errors.Is(err, sql.ErrNoRows) {
  130. err = nil
  131. }
  132. return
  133. }
  134. func (s *SQLStore) HasSession(ctx context.Context, address string) (has bool, err error) {
  135. err = s.db.QueryRow(ctx, hasSessionQuery, s.JID, address).Scan(&has)
  136. if errors.Is(err, sql.ErrNoRows) {
  137. err = nil
  138. }
  139. return
  140. }
  141. type addressSessionTuple struct {
  142. Address string
  143. Session []byte
  144. }
  145. var sessionScanner = dbutil.ConvertRowFn[addressSessionTuple](func(row dbutil.Scannable) (out addressSessionTuple, err error) {
  146. err = row.Scan(&out.Address, &out.Session)
  147. return
  148. })
  149. func (s *SQLStore) GetManySessions(ctx context.Context, addresses []string) (map[string][]byte, error) {
  150. if len(addresses) == 0 {
  151. return nil, nil
  152. }
  153. var rows dbutil.Rows
  154. var err error
  155. if s.db.Dialect == dbutil.Postgres && PostgresArrayWrapper != nil {
  156. rows, err = s.db.Query(ctx, getManySessionQueryPostgres, s.JID, PostgresArrayWrapper(addresses))
  157. } else {
  158. args := make([]any, len(addresses)+1)
  159. placeholders := make([]string, len(addresses))
  160. args[0] = s.JID
  161. for i, addr := range addresses {
  162. args[i+1] = addr
  163. placeholders[i] = fmt.Sprintf("$%d", i+2)
  164. }
  165. rows, err = s.db.Query(ctx, fmt.Sprintf(getManySessionQueryGeneric, strings.Join(placeholders, ",")), args...)
  166. }
  167. result := make(map[string][]byte, len(addresses))
  168. for _, addr := range addresses {
  169. result[addr] = nil
  170. }
  171. err = sessionScanner.NewRowIter(rows, err).Iter(func(tuple addressSessionTuple) (bool, error) {
  172. result[tuple.Address] = tuple.Session
  173. return true, nil
  174. })
  175. if err != nil {
  176. return nil, err
  177. }
  178. return result, nil
  179. }
  180. func (s *SQLStore) PutManySessions(ctx context.Context, sessions map[string][]byte) error {
  181. return s.db.DoTxn(ctx, nil, func(ctx context.Context) error {
  182. for addr, sess := range sessions {
  183. err := s.PutSession(ctx, addr, sess)
  184. if err != nil {
  185. return err
  186. }
  187. }
  188. return nil
  189. })
  190. }
  191. func (s *SQLStore) PutSession(ctx context.Context, address string, session []byte) error {
  192. _, err := s.db.Exec(ctx, putSessionQuery, s.JID, address, session)
  193. return err
  194. }
  195. func (s *SQLStore) DeleteAllSessions(ctx context.Context, phone string) error {
  196. return s.deleteAllSessions(ctx, phone)
  197. }
  198. func (s *SQLStore) deleteAllSessions(ctx context.Context, phone string) error {
  199. _, err := s.db.Exec(ctx, deleteAllSessionsQuery, s.JID, phone+":%")
  200. return err
  201. }
  202. func (s *SQLStore) deleteAllSenderKeys(ctx context.Context, phone string) error {
  203. _, err := s.db.Exec(ctx, deleteAllSenderKeysQuery, s.JID, phone+":%")
  204. return err
  205. }
  206. func (s *SQLStore) deleteAllIdentityKeys(ctx context.Context, phone string) error {
  207. _, err := s.db.Exec(ctx, deleteAllIdentityKeysQuery, s.JID, phone+":%")
  208. return err
  209. }
  210. func (s *SQLStore) DeleteSession(ctx context.Context, address string) error {
  211. _, err := s.db.Exec(ctx, deleteSessionQuery, s.JID, address)
  212. return err
  213. }
  214. func (s *SQLStore) MigratePNToLID(ctx context.Context, pn, lid types.JID) error {
  215. pnSignal := pn.SignalAddressUser()
  216. if !s.migratedPNSessionsCache.Add(pnSignal) {
  217. return nil
  218. }
  219. var sessionsUpdated, identityKeysUpdated, senderKeysUpdated int64
  220. lidSignal := lid.SignalAddressUser()
  221. err := s.db.DoTxn(ctx, nil, func(ctx context.Context) error {
  222. res, err := s.db.Exec(ctx, migratePNToLIDSessionsQuery, s.JID, pnSignal, lidSignal)
  223. if err != nil {
  224. return fmt.Errorf("failed to migrate sessions: %w", err)
  225. }
  226. sessionsUpdated, err = res.RowsAffected()
  227. if err != nil {
  228. return fmt.Errorf("failed to get rows affected for sessions: %w", err)
  229. }
  230. err = s.deleteAllSessions(ctx, pnSignal)
  231. if err != nil {
  232. return fmt.Errorf("failed to delete extra sessions: %w", err)
  233. }
  234. res, err = s.db.Exec(ctx, migratePNToLIDIdentityKeysQuery, s.JID, pnSignal, lidSignal)
  235. if err != nil {
  236. return fmt.Errorf("failed to migrate identity keys: %w", err)
  237. }
  238. identityKeysUpdated, err = res.RowsAffected()
  239. if err != nil {
  240. return fmt.Errorf("failed to get rows affected for identity keys: %w", err)
  241. }
  242. err = s.deleteAllIdentityKeys(ctx, pnSignal)
  243. if err != nil {
  244. return fmt.Errorf("failed to delete extra identity keys: %w", err)
  245. }
  246. res, err = s.db.Exec(ctx, migratePNToLIDSenderKeysQuery, s.JID, pnSignal, lidSignal)
  247. if err != nil {
  248. return fmt.Errorf("failed to migrate sender keys: %w", err)
  249. }
  250. senderKeysUpdated, err = res.RowsAffected()
  251. if err != nil {
  252. return fmt.Errorf("failed to get rows affected for sender keys: %w", err)
  253. }
  254. err = s.deleteAllSenderKeys(ctx, pnSignal)
  255. if err != nil {
  256. return fmt.Errorf("failed to delete extra sender keys: %w", err)
  257. }
  258. return nil
  259. })
  260. if err != nil {
  261. return err
  262. }
  263. if sessionsUpdated > 0 || senderKeysUpdated > 0 || identityKeysUpdated > 0 {
  264. s.log.Infof("Migrated %d sessions, %d identity keys and %d sender keys from %s to %s", sessionsUpdated, identityKeysUpdated, senderKeysUpdated, pnSignal, lidSignal)
  265. } else {
  266. s.log.Debugf("No sessions or sender keys found to migrate from %s to %s", pnSignal, lidSignal)
  267. }
  268. return nil
  269. }
  270. const (
  271. getLastPreKeyIDQuery = `SELECT MAX(key_id) FROM whatsmeow_pre_keys WHERE jid=$1`
  272. insertPreKeyQuery = `INSERT INTO whatsmeow_pre_keys (jid, key_id, key, uploaded) VALUES ($1, $2, $3, $4)`
  273. getUnuploadedPreKeysQuery = `SELECT key_id, key FROM whatsmeow_pre_keys WHERE jid=$1 AND uploaded=false ORDER BY key_id LIMIT $2`
  274. getPreKeyQuery = `SELECT key_id, key FROM whatsmeow_pre_keys WHERE jid=$1 AND key_id=$2`
  275. deletePreKeyQuery = `DELETE FROM whatsmeow_pre_keys WHERE jid=$1 AND key_id=$2`
  276. markPreKeysAsUploadedQuery = `UPDATE whatsmeow_pre_keys SET uploaded=true WHERE jid=$1 AND key_id<=$2`
  277. getUploadedPreKeyCountQuery = `SELECT COUNT(*) FROM whatsmeow_pre_keys WHERE jid=$1 AND uploaded=true`
  278. )
  279. func (s *SQLStore) genOnePreKey(ctx context.Context, id uint32, markUploaded bool) (*keys.PreKey, error) {
  280. key := keys.NewPreKey(id)
  281. _, err := s.db.Exec(ctx, insertPreKeyQuery, s.JID, key.KeyID, key.Priv[:], markUploaded)
  282. return key, err
  283. }
  284. func (s *SQLStore) getNextPreKeyID(ctx context.Context) (uint32, error) {
  285. var lastKeyID sql.NullInt32
  286. err := s.db.QueryRow(ctx, getLastPreKeyIDQuery, s.JID).Scan(&lastKeyID)
  287. if err != nil {
  288. return 0, fmt.Errorf("failed to query next prekey ID: %w", err)
  289. }
  290. return uint32(lastKeyID.Int32) + 1, nil
  291. }
  292. func (s *SQLStore) GenOnePreKey(ctx context.Context) (*keys.PreKey, error) {
  293. s.preKeyLock.Lock()
  294. defer s.preKeyLock.Unlock()
  295. nextKeyID, err := s.getNextPreKeyID(ctx)
  296. if err != nil {
  297. return nil, err
  298. }
  299. return s.genOnePreKey(ctx, nextKeyID, true)
  300. }
  301. func (s *SQLStore) GetOrGenPreKeys(ctx context.Context, count uint32) ([]*keys.PreKey, error) {
  302. s.preKeyLock.Lock()
  303. defer s.preKeyLock.Unlock()
  304. res, err := s.db.Query(ctx, getUnuploadedPreKeysQuery, s.JID, count)
  305. if err != nil {
  306. return nil, fmt.Errorf("failed to query existing prekeys: %w", err)
  307. }
  308. newKeys := make([]*keys.PreKey, count)
  309. var existingCount uint32
  310. for res.Next() {
  311. var key *keys.PreKey
  312. key, err = scanPreKey(res)
  313. if err != nil {
  314. return nil, err
  315. } else if key != nil {
  316. newKeys[existingCount] = key
  317. existingCount++
  318. }
  319. }
  320. if existingCount < uint32(len(newKeys)) {
  321. var nextKeyID uint32
  322. nextKeyID, err = s.getNextPreKeyID(ctx)
  323. if err != nil {
  324. return nil, err
  325. }
  326. for i := existingCount; i < count; i++ {
  327. newKeys[i], err = s.genOnePreKey(ctx, nextKeyID, false)
  328. if err != nil {
  329. return nil, fmt.Errorf("failed to generate prekey: %w", err)
  330. }
  331. nextKeyID++
  332. }
  333. }
  334. return newKeys, nil
  335. }
  336. func scanPreKey(row dbutil.Scannable) (*keys.PreKey, error) {
  337. var priv []byte
  338. var id uint32
  339. err := row.Scan(&id, &priv)
  340. if errors.Is(err, sql.ErrNoRows) {
  341. return nil, nil
  342. } else if err != nil {
  343. return nil, err
  344. } else if len(priv) != 32 {
  345. return nil, ErrInvalidLength
  346. }
  347. return &keys.PreKey{
  348. KeyPair: *keys.NewKeyPairFromPrivateKey(*(*[32]byte)(priv)),
  349. KeyID: id,
  350. }, nil
  351. }
  352. func (s *SQLStore) GetPreKey(ctx context.Context, id uint32) (*keys.PreKey, error) {
  353. return scanPreKey(s.db.QueryRow(ctx, getPreKeyQuery, s.JID, id))
  354. }
  355. func (s *SQLStore) RemovePreKey(ctx context.Context, id uint32) error {
  356. _, err := s.db.Exec(ctx, deletePreKeyQuery, s.JID, id)
  357. return err
  358. }
  359. func (s *SQLStore) MarkPreKeysAsUploaded(ctx context.Context, upToID uint32) error {
  360. _, err := s.db.Exec(ctx, markPreKeysAsUploadedQuery, s.JID, upToID)
  361. return err
  362. }
  363. func (s *SQLStore) UploadedPreKeyCount(ctx context.Context) (count int, err error) {
  364. err = s.db.QueryRow(ctx, getUploadedPreKeyCountQuery, s.JID).Scan(&count)
  365. return
  366. }
  367. const (
  368. getSenderKeyQuery = `SELECT sender_key FROM whatsmeow_sender_keys WHERE our_jid=$1 AND chat_id=$2 AND sender_id=$3`
  369. putSenderKeyQuery = `
  370. INSERT INTO whatsmeow_sender_keys (our_jid, chat_id, sender_id, sender_key) VALUES ($1, $2, $3, $4)
  371. ON CONFLICT (our_jid, chat_id, sender_id) DO UPDATE SET sender_key=excluded.sender_key
  372. `
  373. )
  374. func (s *SQLStore) PutSenderKey(ctx context.Context, group, user string, session []byte) error {
  375. _, err := s.db.Exec(ctx, putSenderKeyQuery, s.JID, group, user, session)
  376. return err
  377. }
  378. func (s *SQLStore) GetSenderKey(ctx context.Context, group, user string) (key []byte, err error) {
  379. err = s.db.QueryRow(ctx, getSenderKeyQuery, s.JID, group, user).Scan(&key)
  380. if errors.Is(err, sql.ErrNoRows) {
  381. err = nil
  382. }
  383. return
  384. }
  385. const (
  386. putAppStateSyncKeyQuery = `
  387. INSERT INTO whatsmeow_app_state_sync_keys (jid, key_id, key_data, timestamp, fingerprint) VALUES ($1, $2, $3, $4, $5)
  388. ON CONFLICT (jid, key_id) DO UPDATE
  389. SET key_data=excluded.key_data, timestamp=excluded.timestamp, fingerprint=excluded.fingerprint
  390. WHERE excluded.timestamp > whatsmeow_app_state_sync_keys.timestamp
  391. `
  392. getAppStateSyncKeyQuery = `SELECT key_data, timestamp, fingerprint FROM whatsmeow_app_state_sync_keys WHERE jid=$1 AND key_id=$2`
  393. getLatestAppStateSyncKeyIDQuery = `SELECT key_id FROM whatsmeow_app_state_sync_keys WHERE jid=$1 ORDER BY timestamp DESC LIMIT 1`
  394. )
  395. func (s *SQLStore) PutAppStateSyncKey(ctx context.Context, id []byte, key store.AppStateSyncKey) error {
  396. _, err := s.db.Exec(ctx, putAppStateSyncKeyQuery, s.JID, id, key.Data, key.Timestamp, key.Fingerprint)
  397. return err
  398. }
  399. func (s *SQLStore) GetAppStateSyncKey(ctx context.Context, id []byte) (*store.AppStateSyncKey, error) {
  400. var key store.AppStateSyncKey
  401. err := s.db.QueryRow(ctx, getAppStateSyncKeyQuery, s.JID, id).Scan(&key.Data, &key.Timestamp, &key.Fingerprint)
  402. if errors.Is(err, sql.ErrNoRows) {
  403. return nil, nil
  404. }
  405. return &key, err
  406. }
  407. func (s *SQLStore) GetLatestAppStateSyncKeyID(ctx context.Context) ([]byte, error) {
  408. var keyID []byte
  409. err := s.db.QueryRow(ctx, getLatestAppStateSyncKeyIDQuery, s.JID).Scan(&keyID)
  410. if errors.Is(err, sql.ErrNoRows) {
  411. return nil, nil
  412. }
  413. return keyID, err
  414. }
  415. const (
  416. putAppStateVersionQuery = `
  417. INSERT INTO whatsmeow_app_state_version (jid, name, version, hash) VALUES ($1, $2, $3, $4)
  418. ON CONFLICT (jid, name) DO UPDATE SET version=excluded.version, hash=excluded.hash
  419. `
  420. getAppStateVersionQuery = `SELECT version, hash FROM whatsmeow_app_state_version WHERE jid=$1 AND name=$2`
  421. deleteAppStateVersionQuery = `DELETE FROM whatsmeow_app_state_version WHERE jid=$1 AND name=$2`
  422. putAppStateMutationMACsQuery = `INSERT INTO whatsmeow_app_state_mutation_macs (jid, name, version, index_mac, value_mac) VALUES `
  423. deleteAppStateMutationMACsQueryPostgres = `DELETE FROM whatsmeow_app_state_mutation_macs WHERE jid=$1 AND name=$2 AND index_mac=ANY($3::bytea[])`
  424. deleteAppStateMutationMACsQueryGeneric = `DELETE FROM whatsmeow_app_state_mutation_macs WHERE jid=$1 AND name=$2 AND index_mac IN `
  425. getAppStateMutationMACQuery = `SELECT value_mac FROM whatsmeow_app_state_mutation_macs WHERE jid=$1 AND name=$2 AND index_mac=$3 ORDER BY version DESC LIMIT 1`
  426. )
  427. func (s *SQLStore) PutAppStateVersion(ctx context.Context, name string, version uint64, hash [128]byte) error {
  428. _, err := s.db.Exec(ctx, putAppStateVersionQuery, s.JID, name, version, hash[:])
  429. return err
  430. }
  431. func (s *SQLStore) GetAppStateVersion(ctx context.Context, name string) (version uint64, hash [128]byte, err error) {
  432. var uncheckedHash []byte
  433. err = s.db.QueryRow(ctx, getAppStateVersionQuery, s.JID, name).Scan(&version, &uncheckedHash)
  434. if errors.Is(err, sql.ErrNoRows) {
  435. // version will be 0 and hash will be an empty array, which is the correct initial state
  436. err = nil
  437. } else if err != nil {
  438. // There's an error, just return it
  439. } else if len(uncheckedHash) != 128 {
  440. // This shouldn't happen
  441. err = ErrInvalidLength
  442. } else {
  443. // No errors, convert hash slice to array
  444. hash = *(*[128]byte)(uncheckedHash)
  445. }
  446. return
  447. }
  448. func (s *SQLStore) DeleteAppStateVersion(ctx context.Context, name string) error {
  449. _, err := s.db.Exec(ctx, deleteAppStateVersionQuery, s.JID, name)
  450. return err
  451. }
  452. func (s *SQLStore) putAppStateMutationMACs(ctx context.Context, name string, version uint64, mutations []store.AppStateMutationMAC) error {
  453. values := make([]any, 3+len(mutations)*2)
  454. queryParts := make([]string, len(mutations))
  455. values[0] = s.JID
  456. values[1] = name
  457. values[2] = version
  458. placeholderSyntax := "($1, $2, $3, $%d, $%d)"
  459. if s.db.Dialect == dbutil.SQLite {
  460. placeholderSyntax = "(?1, ?2, ?3, ?%d, ?%d)"
  461. }
  462. for i, mutation := range mutations {
  463. baseIndex := 3 + i*2
  464. values[baseIndex] = mutation.IndexMAC
  465. values[baseIndex+1] = mutation.ValueMAC
  466. queryParts[i] = fmt.Sprintf(placeholderSyntax, baseIndex+1, baseIndex+2)
  467. }
  468. _, err := s.db.Exec(ctx, putAppStateMutationMACsQuery+strings.Join(queryParts, ","), values...)
  469. return err
  470. }
  471. const mutationBatchSize = 400
  472. func (s *SQLStore) PutAppStateMutationMACs(ctx context.Context, name string, version uint64, mutations []store.AppStateMutationMAC) error {
  473. if len(mutations) == 0 {
  474. return nil
  475. }
  476. return s.db.DoTxn(ctx, nil, func(ctx context.Context) error {
  477. for slice := range slices.Chunk(mutations, mutationBatchSize) {
  478. err := s.putAppStateMutationMACs(ctx, name, version, slice)
  479. if err != nil {
  480. return err
  481. }
  482. }
  483. return nil
  484. })
  485. }
  486. func (s *SQLStore) DeleteAppStateMutationMACs(ctx context.Context, name string, indexMACs [][]byte) (err error) {
  487. if len(indexMACs) == 0 {
  488. return
  489. }
  490. if s.db.Dialect == dbutil.Postgres && PostgresArrayWrapper != nil {
  491. _, err = s.db.Exec(ctx, deleteAppStateMutationMACsQueryPostgres, s.JID, name, PostgresArrayWrapper(indexMACs))
  492. } else {
  493. args := make([]any, 2+len(indexMACs))
  494. args[0] = s.JID
  495. args[1] = name
  496. queryParts := make([]string, len(indexMACs))
  497. for i, item := range indexMACs {
  498. args[2+i] = item
  499. queryParts[i] = fmt.Sprintf("$%d", i+3)
  500. }
  501. _, err = s.db.Exec(ctx, deleteAppStateMutationMACsQueryGeneric+"("+strings.Join(queryParts, ",")+")", args...)
  502. }
  503. return
  504. }
  505. func (s *SQLStore) GetAppStateMutationMAC(ctx context.Context, name string, indexMAC []byte) (valueMAC []byte, err error) {
  506. err = s.db.QueryRow(ctx, getAppStateMutationMACQuery, s.JID, name, indexMAC).Scan(&valueMAC)
  507. if errors.Is(err, sql.ErrNoRows) {
  508. err = nil
  509. }
  510. return
  511. }
  512. const (
  513. putContactNameQuery = `
  514. INSERT INTO whatsmeow_contacts (our_jid, their_jid, first_name, full_name) VALUES ($1, $2, $3, $4)
  515. ON CONFLICT (our_jid, their_jid) DO UPDATE SET first_name=excluded.first_name, full_name=excluded.full_name
  516. `
  517. putRedactedPhoneQuery = `
  518. INSERT INTO whatsmeow_contacts (our_jid, their_jid, redacted_phone)
  519. VALUES ($1, $2, $3)
  520. ON CONFLICT (our_jid, their_jid) DO UPDATE SET redacted_phone=excluded.redacted_phone
  521. `
  522. putPushNameQuery = `
  523. INSERT INTO whatsmeow_contacts (our_jid, their_jid, push_name) VALUES ($1, $2, $3)
  524. ON CONFLICT (our_jid, their_jid) DO UPDATE SET push_name=excluded.push_name
  525. `
  526. putBusinessNameQuery = `
  527. INSERT INTO whatsmeow_contacts (our_jid, their_jid, business_name) VALUES ($1, $2, $3)
  528. ON CONFLICT (our_jid, their_jid) DO UPDATE SET business_name=excluded.business_name
  529. `
  530. getContactQuery = `
  531. SELECT first_name, full_name, push_name, business_name, redacted_phone FROM whatsmeow_contacts WHERE our_jid=$1 AND their_jid=$2
  532. `
  533. getAllContactsQuery = `
  534. SELECT their_jid, first_name, full_name, push_name, business_name, redacted_phone FROM whatsmeow_contacts WHERE our_jid=$1
  535. `
  536. )
  537. var putContactNamesMassInsertBuilder = dbutil.NewMassInsertBuilder[store.ContactEntry, [1]any](
  538. putContactNameQuery, "($1, $%d, $%d, $%d)",
  539. )
  540. var putRedactedPhonesMassInsertBuilder = dbutil.NewMassInsertBuilder[store.RedactedPhoneEntry, [1]any](
  541. putRedactedPhoneQuery, "($1, $%d, $%d)",
  542. )
  543. func (s *SQLStore) PutPushName(ctx context.Context, user types.JID, pushName string) (bool, string, error) {
  544. s.contactCacheLock.Lock()
  545. defer s.contactCacheLock.Unlock()
  546. cached, err := s.getContact(ctx, user)
  547. if err != nil {
  548. return false, "", err
  549. }
  550. if cached.PushName != pushName {
  551. _, err = s.db.Exec(ctx, putPushNameQuery, s.JID, user, pushName)
  552. if err != nil {
  553. return false, "", err
  554. }
  555. previousName := cached.PushName
  556. cached.PushName = pushName
  557. cached.Found = true
  558. return true, previousName, nil
  559. }
  560. return false, "", nil
  561. }
  562. func (s *SQLStore) PutBusinessName(ctx context.Context, user types.JID, businessName string) (bool, string, error) {
  563. s.contactCacheLock.Lock()
  564. defer s.contactCacheLock.Unlock()
  565. cached, err := s.getContact(ctx, user)
  566. if err != nil {
  567. return false, "", err
  568. }
  569. if cached.BusinessName != businessName {
  570. _, err = s.db.Exec(ctx, putBusinessNameQuery, s.JID, user, businessName)
  571. if err != nil {
  572. return false, "", err
  573. }
  574. previousName := cached.BusinessName
  575. cached.BusinessName = businessName
  576. cached.Found = true
  577. return true, previousName, nil
  578. }
  579. return false, "", nil
  580. }
  581. func (s *SQLStore) PutContactName(ctx context.Context, user types.JID, firstName, fullName string) error {
  582. s.contactCacheLock.Lock()
  583. defer s.contactCacheLock.Unlock()
  584. cached, err := s.getContact(ctx, user)
  585. if err != nil {
  586. return err
  587. }
  588. if cached.FirstName != firstName || cached.FullName != fullName {
  589. _, err = s.db.Exec(ctx, putContactNameQuery, s.JID, user, firstName, fullName)
  590. if err != nil {
  591. return err
  592. }
  593. cached.FirstName = firstName
  594. cached.FullName = fullName
  595. cached.Found = true
  596. }
  597. return nil
  598. }
  599. const contactBatchSize = 300
  600. func (s *SQLStore) PutAllContactNames(ctx context.Context, contacts []store.ContactEntry) error {
  601. if len(contacts) == 0 {
  602. return nil
  603. }
  604. origLen := len(contacts)
  605. contacts = exslices.DeduplicateUnsortedOverwriteFunc(contacts, func(t store.ContactEntry) types.JID {
  606. return t.JID
  607. })
  608. if origLen != len(contacts) {
  609. s.log.Warnf("%d duplicate contacts found in PutAllContactNames", origLen-len(contacts))
  610. }
  611. err := s.db.DoTxn(ctx, nil, func(ctx context.Context) error {
  612. for slice := range slices.Chunk(contacts, contactBatchSize) {
  613. query, vars := putContactNamesMassInsertBuilder.Build([1]any{s.JID}, slice)
  614. _, err := s.db.Exec(ctx, query, vars...)
  615. if err != nil {
  616. return err
  617. }
  618. }
  619. return nil
  620. })
  621. if err != nil {
  622. return err
  623. }
  624. s.contactCacheLock.Lock()
  625. // Just clear the cache, fetching pushnames and business names would be too much effort
  626. s.contactCache = make(map[types.JID]*types.ContactInfo)
  627. s.contactCacheLock.Unlock()
  628. return nil
  629. }
  630. func (s *SQLStore) PutManyRedactedPhones(ctx context.Context, entries []store.RedactedPhoneEntry) error {
  631. if len(entries) == 0 {
  632. return nil
  633. }
  634. origLen := len(entries)
  635. entries = exslices.DeduplicateUnsortedOverwriteFunc(entries, func(t store.RedactedPhoneEntry) types.JID {
  636. return t.JID
  637. })
  638. if origLen != len(entries) {
  639. s.log.Warnf("%d duplicate contacts found in PutManyRedactedPhones", origLen-len(entries))
  640. }
  641. err := s.db.DoTxn(ctx, nil, func(ctx context.Context) error {
  642. for slice := range slices.Chunk(entries, contactBatchSize) {
  643. query, vars := putRedactedPhonesMassInsertBuilder.Build([1]any{s.JID}, slice)
  644. _, err := s.db.Exec(ctx, query, vars...)
  645. if err != nil {
  646. return err
  647. }
  648. }
  649. return nil
  650. })
  651. if err != nil {
  652. return err
  653. }
  654. s.contactCacheLock.Lock()
  655. for _, entry := range entries {
  656. if cached, ok := s.contactCache[entry.JID]; ok && cached.RedactedPhone == entry.RedactedPhone {
  657. continue
  658. }
  659. delete(s.contactCache, entry.JID)
  660. }
  661. s.contactCacheLock.Unlock()
  662. return nil
  663. }
  664. func (s *SQLStore) getContact(ctx context.Context, user types.JID) (*types.ContactInfo, error) {
  665. cached, ok := s.contactCache[user]
  666. if ok {
  667. return cached, nil
  668. }
  669. var first, full, push, business, redactedPhone sql.NullString
  670. err := s.db.QueryRow(ctx, getContactQuery, s.JID, user).Scan(&first, &full, &push, &business, &redactedPhone)
  671. if err != nil && !errors.Is(err, sql.ErrNoRows) {
  672. return nil, err
  673. }
  674. info := &types.ContactInfo{
  675. Found: err == nil,
  676. FirstName: first.String,
  677. FullName: full.String,
  678. PushName: push.String,
  679. BusinessName: business.String,
  680. RedactedPhone: redactedPhone.String,
  681. }
  682. s.contactCache[user] = info
  683. return info, nil
  684. }
  685. func (s *SQLStore) GetContact(ctx context.Context, user types.JID) (types.ContactInfo, error) {
  686. s.contactCacheLock.Lock()
  687. info, err := s.getContact(ctx, user)
  688. s.contactCacheLock.Unlock()
  689. if err != nil {
  690. return types.ContactInfo{}, err
  691. }
  692. return *info, nil
  693. }
  694. func (s *SQLStore) GetAllContacts(ctx context.Context) (map[types.JID]types.ContactInfo, error) {
  695. s.contactCacheLock.Lock()
  696. defer s.contactCacheLock.Unlock()
  697. rows, err := s.db.Query(ctx, getAllContactsQuery, s.JID)
  698. if err != nil {
  699. return nil, err
  700. }
  701. output := make(map[types.JID]types.ContactInfo, len(s.contactCache))
  702. for rows.Next() {
  703. var jid types.JID
  704. var first, full, push, business, redactedPhone sql.NullString
  705. err = rows.Scan(&jid, &first, &full, &push, &business, &redactedPhone)
  706. if err != nil {
  707. return nil, fmt.Errorf("error scanning row: %w", err)
  708. }
  709. info := types.ContactInfo{
  710. Found: true,
  711. FirstName: first.String,
  712. FullName: full.String,
  713. PushName: push.String,
  714. BusinessName: business.String,
  715. RedactedPhone: redactedPhone.String,
  716. }
  717. output[jid] = info
  718. s.contactCache[jid] = &info
  719. }
  720. return output, nil
  721. }
const (
// putChatSettingQuery is a format template rather than a ready-to-run
// statement: %[1]s must be replaced with a column name via fmt.Sprintf
// before execution. The resulting statement upserts that single column for
// the (our_jid, chat_jid) pair, taking $1=our_jid, $2=chat_jid, $3=value.
putChatSettingQuery = `
INSERT INTO whatsmeow_chat_settings (our_jid, chat_jid, %[1]s) VALUES ($1, $2, $3)
ON CONFLICT (our_jid, chat_jid) DO UPDATE SET %[1]s=excluded.%[1]s
`
// getChatSettingsQuery selects muted_until, pinned and archived for one
// chat, taking $1=our_jid and $2=chat_jid.
getChatSettingsQuery = `
SELECT muted_until, pinned, archived FROM whatsmeow_chat_settings WHERE our_jid=$1 AND chat_jid=$2
`
)
  731. func (s *SQLStore) PutMutedUntil(ctx context.Context, chat types.JID, mutedUntil time.Time) error {
  732. var val int64
  733. if mutedUntil == store.MutedForever {
  734. val = -1
  735. } else if !mutedUntil.IsZero() {
  736. val = mutedUntil.Unix()
  737. }
  738. _, err := s.db.Exec(ctx, fmt.Sprintf(putChatSettingQuery, "muted_until"), s.JID, chat, val)
  739. return err
  740. }
  741. func (s *SQLStore) PutPinned(ctx context.Context, chat types.JID, pinned bool) error {
  742. _, err := s.db.Exec(ctx, fmt.Sprintf(putChatSettingQuery, "pinned"), s.JID, chat, pinned)
  743. return err
  744. }
  745. func (s *SQLStore) PutArchived(ctx context.Context, chat types.JID, archived bool) error {
  746. _, err := s.db.Exec(ctx, fmt.Sprintf(putChatSettingQuery, "archived"), s.JID, chat, archived)
  747. return err
  748. }
  749. func (s *SQLStore) GetChatSettings(ctx context.Context, chat types.JID) (settings types.LocalChatSettings, err error) {
  750. var mutedUntil int64
  751. err = s.db.QueryRow(ctx, getChatSettingsQuery, s.JID, chat).Scan(&mutedUntil, &settings.Pinned, &settings.Archived)
  752. if errors.Is(err, sql.ErrNoRows) {
  753. err = nil
  754. } else if err != nil {
  755. return
  756. } else {
  757. settings.Found = true
  758. }
  759. if mutedUntil < 0 {
  760. settings.MutedUntil = store.MutedForever
  761. } else if mutedUntil > 0 {
  762. settings.MutedUntil = time.Unix(mutedUntil, 0)
  763. }
  764. return
  765. }
const (
// putMsgSecret inserts one message secret, taking $1=our_jid, $2=chat_jid,
// $3=sender_jid, $4=message_id, $5=key. An existing row with the same
// (our_jid, chat_jid, sender_jid, message_id) is left untouched.
putMsgSecret = `
INSERT INTO whatsmeow_message_secrets (our_jid, chat_jid, sender_jid, message_id, key)
VALUES ($1, $2, $3, $4, $5)
ON CONFLICT (our_jid, chat_jid, sender_jid, message_id) DO NOTHING
`
// getMsgSecret selects the key and the stored sender_jid for a message,
// taking $1=our_jid, $2=chat, $3=sender, $4=message_id.
//
// Both the chat and the sender match either the value as given or its
// counterpart looked up in whatsmeow_lid_map: a value ending in '@lid' is
// also matched as pn || '@s.whatsapp.net', and a value ending in
// '@s.whatsapp.net' is also matched as lid || '@lid'. The CASE expressions
// have no ELSE branch, so any other suffix only matches directly.
getMsgSecret = `
SELECT key, sender_jid
FROM whatsmeow_message_secrets
WHERE our_jid=$1 AND (chat_jid=$2 OR chat_jid=(
CASE
WHEN $2 LIKE '%@lid'
THEN (SELECT pn || '@s.whatsapp.net' FROM whatsmeow_lid_map WHERE lid=replace($2, '@lid', ''))
WHEN $2 LIKE '%@s.whatsapp.net'
THEN (SELECT lid || '@lid' FROM whatsmeow_lid_map WHERE pn=replace($2, '@s.whatsapp.net', ''))
END
)) AND message_id=$4 AND (sender_jid=$3 OR sender_jid=(
CASE
WHEN $3 LIKE '%@lid'
THEN (SELECT pn || '@s.whatsapp.net' FROM whatsmeow_lid_map WHERE lid=replace($3, '@lid', ''))
WHEN $3 LIKE '%@s.whatsapp.net'
THEN (SELECT lid || '@lid' FROM whatsmeow_lid_map WHERE pn=replace($3, '@s.whatsapp.net', ''))
END
))
`
)
  792. func (s *SQLStore) PutMessageSecrets(ctx context.Context, inserts []store.MessageSecretInsert) (err error) {
  793. if len(inserts) == 0 {
  794. return nil
  795. }
  796. return s.db.DoTxn(ctx, nil, func(ctx context.Context) error {
  797. for _, insert := range inserts {
  798. _, err = s.db.Exec(ctx, putMsgSecret, s.JID, insert.Chat.ToNonAD(), insert.Sender.ToNonAD(), insert.ID, insert.Secret)
  799. if err != nil {
  800. return err
  801. }
  802. }
  803. return nil
  804. })
  805. }
  806. func (s *SQLStore) PutMessageSecret(ctx context.Context, chat, sender types.JID, id types.MessageID, secret []byte) (err error) {
  807. _, err = s.db.Exec(ctx, putMsgSecret, s.JID, chat.ToNonAD(), sender.ToNonAD(), id, secret)
  808. return
  809. }
  810. func (s *SQLStore) GetMessageSecret(ctx context.Context, chat, sender types.JID, id types.MessageID) (secret []byte, realSender types.JID, err error) {
  811. err = s.db.QueryRow(ctx, getMsgSecret, s.JID, chat.ToNonAD(), sender.ToNonAD(), id).Scan(&secret, &realSender)
  812. if errors.Is(err, sql.ErrNoRows) {
  813. err = nil
  814. }
  815. return
  816. }
const (
// putPrivacyTokens upserts a privacy token row keyed by (our_jid, their_jid),
// overwriting token and timestamp on conflict. As written it takes
// $1=our_jid, $2=their_jid, $3=token, $4=timestamp. PutPrivacyTokens
// rewrites the literal "($1, $2, $3, $4)" tuple to insert several rows, so
// that exact text must stay intact.
putPrivacyTokens = `
INSERT INTO whatsmeow_privacy_tokens (our_jid, their_jid, token, timestamp)
VALUES ($1, $2, $3, $4)
ON CONFLICT (our_jid, their_jid) DO UPDATE SET token=EXCLUDED.token, timestamp=EXCLUDED.timestamp
`
// getPrivacyToken selects the newest token (by timestamp) for a user, taking
// $1=our_jid and $2=their_jid. The user matches either as given or through
// whatsmeow_lid_map: a value ending in '@lid' is also matched as
// pn || '@s.whatsapp.net', and one ending in '@s.whatsapp.net' as
// lid || '@lid'. Any other value falls through to ELSE $2.
getPrivacyToken = `
SELECT token, timestamp FROM whatsmeow_privacy_tokens WHERE our_jid=$1 AND (their_jid=$2 OR their_jid=(
CASE
WHEN $2 LIKE '%@lid'
THEN (SELECT pn || '@s.whatsapp.net' FROM whatsmeow_lid_map WHERE lid=replace($2, '@lid', ''))
WHEN $2 LIKE '%@s.whatsapp.net'
THEN (SELECT lid || '@lid' FROM whatsmeow_lid_map WHERE pn=replace($2, '@s.whatsapp.net', ''))
ELSE $2
END
))
ORDER BY timestamp DESC LIMIT 1
`
)
  836. func (s *SQLStore) PutPrivacyTokens(ctx context.Context, tokens ...store.PrivacyToken) error {
  837. args := make([]any, 1+len(tokens)*3)
  838. placeholders := make([]string, len(tokens))
  839. args[0] = s.JID
  840. for i, token := range tokens {
  841. args[i*3+1] = token.User.ToNonAD().String()
  842. args[i*3+2] = token.Token
  843. args[i*3+3] = token.Timestamp.Unix()
  844. placeholders[i] = fmt.Sprintf("($1, $%d, $%d, $%d)", i*3+2, i*3+3, i*3+4)
  845. }
  846. query := strings.ReplaceAll(putPrivacyTokens, "($1, $2, $3, $4)", strings.Join(placeholders, ","))
  847. _, err := s.db.Exec(ctx, query, args...)
  848. return err
  849. }
  850. func (s *SQLStore) GetPrivacyToken(ctx context.Context, user types.JID) (*store.PrivacyToken, error) {
  851. var token store.PrivacyToken
  852. token.User = user.ToNonAD()
  853. var ts int64
  854. err := s.db.QueryRow(ctx, getPrivacyToken, s.JID, token.User).Scan(&token.Token, &ts)
  855. if errors.Is(err, sql.ErrNoRows) {
  856. return nil, nil
  857. } else if err != nil {
  858. return nil, err
  859. } else {
  860. token.Timestamp = time.Unix(ts, 0)
  861. return &token, nil
  862. }
  863. }
const (
// getBufferedEventQuery selects plaintext, server_timestamp and
// insert_timestamp for one buffered event, taking $1=our_jid and
// $2=ciphertext_hash.
getBufferedEventQuery = `
SELECT plaintext, server_timestamp, insert_timestamp FROM whatsmeow_event_buffer WHERE our_jid = $1 AND ciphertext_hash = $2
`
// putBufferedEventQuery inserts a buffered event, taking $1=our_jid,
// $2=ciphertext_hash, $3=plaintext, $4=server_timestamp, $5=insert_timestamp.
// It has no ON CONFLICT clause.
putBufferedEventQuery = `
INSERT INTO whatsmeow_event_buffer (our_jid, ciphertext_hash, plaintext, server_timestamp, insert_timestamp)
VALUES ($1, $2, $3, $4, $5)
`
// clearBufferedEventPlaintextQuery sets plaintext to NULL for one buffered
// event while keeping the row, taking $1=our_jid and $2=ciphertext_hash.
clearBufferedEventPlaintextQuery = `
UPDATE whatsmeow_event_buffer SET plaintext = NULL WHERE our_jid = $1 AND ciphertext_hash = $2
`
// deleteOldBufferedHashesQuery deletes every row whose insert_timestamp is
// below $1. Note that it does not filter on our_jid.
deleteOldBufferedHashesQuery = `
DELETE FROM whatsmeow_event_buffer WHERE insert_timestamp < $1
`
)
  879. func (s *SQLStore) GetBufferedEvent(ctx context.Context, ciphertextHash [32]byte) (*store.BufferedEvent, error) {
  880. var insertTimeMS, serverTimeSeconds int64
  881. var buf store.BufferedEvent
  882. err := s.db.QueryRow(ctx, getBufferedEventQuery, s.JID, ciphertextHash[:]).Scan(&buf.Plaintext, &serverTimeSeconds, &insertTimeMS)
  883. if errors.Is(err, sql.ErrNoRows) {
  884. return nil, nil
  885. } else if err != nil {
  886. return nil, err
  887. }
  888. buf.ServerTime = time.Unix(serverTimeSeconds, 0)
  889. buf.InsertTime = time.UnixMilli(insertTimeMS)
  890. return &buf, nil
  891. }
  892. func (s *SQLStore) PutBufferedEvent(ctx context.Context, ciphertextHash [32]byte, plaintext []byte, serverTimestamp time.Time) error {
  893. _, err := s.db.Exec(ctx, putBufferedEventQuery, s.JID, ciphertextHash[:], plaintext, serverTimestamp.Unix(), time.Now().UnixMilli())
  894. return err
  895. }
  896. func (s *SQLStore) DoDecryptionTxn(ctx context.Context, fn func(context.Context) error) error {
  897. ctx = context.WithValue(ctx, dbutil.ContextKeyDoTxnCallerSkip, 2)
  898. return s.db.DoTxn(ctx, nil, fn)
  899. }
  900. func (s *SQLStore) ClearBufferedEventPlaintext(ctx context.Context, ciphertextHash [32]byte) error {
  901. _, err := s.db.Exec(ctx, clearBufferedEventPlaintextQuery, s.JID, ciphertextHash[:])
  902. return err
  903. }
  904. func (s *SQLStore) DeleteOldBufferedHashes(ctx context.Context) error {
  905. // The WhatsApp servers only buffer events for 14 days,
  906. // so we can safely delete anything older than that.
  907. _, err := s.db.Exec(ctx, deleteOldBufferedHashesQuery, time.Now().Add(-14*24*time.Hour).UnixMilli())
  908. return err
  909. }