我想用 Redis 来提高应用程序的性能。我已经成功地把它用于缓存和计数器,现在想用它来查询我的好友的活动。(原标题:使用 Redis 获取好友活动——Redis 中 JOIN 的替代方案)
我们有 2 张表:
- activities(user, activity, timestamp)
- friends(user, friend)
我需要能够获取按时间戳排序的好友活动。用 SQL 写出来大致是这样:
-- Template query: one page of {limit} activities by friends of {user},
-- strictly older than {last}, newest first.
SELECT act.activity, act.timestamp FROM activities act
JOIN friends fr ON fr.friend=act.user AND fr.user="{user}"
WHERE act.timestamp < {last}
ORDER BY act.timestamp DESC
LIMIT {limit}
更新(完整代码 gist):https://gist.github.com/nanvel/8725b9c71c0040b0472b
更新(SQLite 与 Redis 的耗时对比):https://gist.github.com/nanvel/8725b9c71c0040b0472b#file-timings-sqlite-vs-redis
下面是我用 Redis 的实现(需要考虑:一个用户可能有成百上千的好友和活动):
import os.path
import sqlite3
import redis
import time
import uuid
class RedisSearch(object):
    """Friends' activity feed stored in Redis under the ``test:`` namespace.

    Key layout:

    * ``test:friends:<user>`` -- set of the user's friends.
    * ``test:user_activities:<user>`` -- sorted set of the user's
      activities, scored by timestamp.
    * ``test:last_user_activity`` -- sorted set of users, scored by the
      timestamp passed to the most recent ``add_activity`` call for them.
    * ``test:tmp:<user>`` -- scratch sorted set used during ``search``.
    """

    LAST_ACTIVITY_KEY = 'test:last_user_activity'

    @staticmethod
    def _friends_key(user):
        """Return the key of the set holding ``user``'s friends."""
        return 'test:friends:{user}'.format(user=user)

    @staticmethod
    def _activities_key(user):
        """Return the key of the sorted set holding ``user``'s activities."""
        # Members read back from Redis are ``bytes`` on Python 3 unless the
        # client decodes responses; decode them so the key is not built from
        # a "b'...'" repr.  On Python 2 ``bytes is str`` and this is a no-op.
        if isinstance(user, bytes) and not isinstance(user, str):
            user = user.decode('utf-8')
        return 'test:user_activities:{user}'.format(user=user)

    @staticmethod
    def _tmp_key(user):
        """Return the scratch key used while searching on behalf of ``user``."""
        # Kept inside the ``test:`` namespace so that ``clean`` also removes
        # a scratch key left behind by an interrupted search.
        return 'test:tmp:{user}'.format(user=user)

    @property
    def conn(self):
        """Redis client, created on first access and cached on the instance."""
        if not hasattr(self, '_conn'):
            self._conn = redis.StrictRedis(host='localhost')
        return self._conn

    def clean(self):
        """Delete every key matching ``test:*``."""
        keys = self.conn.keys('test:*')
        if keys:
            # One DEL for all keys instead of one round trip per key.
            self.conn.delete(*keys)

    def add_friend(self, user, friend):
        """Add ``friend`` to ``user``'s friends set."""
        self.conn.sadd(self._friends_key(user), friend)

    def add_activity(self, user, activity, timestamp):
        """Record ``activity`` for ``user`` with score ``timestamp``.

        Also sets ``user``'s score in the last-activity index to
        ``timestamp``.
        """
        # NOTE(review): positional (score, member) matches the redis-py 2.x
        # ``StrictRedis.zadd`` signature; redis-py >= 3.0 expects a
        # ``{member: score}`` mapping -- confirm the installed version.
        pipe = self.conn.pipeline()
        pipe.zadd(self.LAST_ACTIVITY_KEY, timestamp, user)
        pipe.zadd(self._activities_key(user), timestamp, activity)
        pipe.execute()

    def search(self, user, last, limit):
        """Return up to ``limit`` friend activities older than ``last``.

        :param user: whose friends' activities to fetch.
        :param last: exclusive upper bound on the activity timestamp.
        :param limit: maximum number of activities to return.
        :returns: activity members, newest first; ``[]`` when ``limit`` is
            not positive or no friend has any recorded activity.
        """
        if limit <= 0:
            # ``zrevrange(key, 0, -1)`` would otherwise return everything.
            return []

        tmp_key = self._tmp_key(user)

        # Step 1: friends that have at least one activity, most recently
        # active first (intersection of the last-activity index with the
        # friends set).
        pipe = self.conn.pipeline(False)
        pipe.zinterstore(
            dest=tmp_key,
            keys=[self.LAST_ACTIVITY_KEY, self._friends_key(user)],
            aggregate='max')
        pipe.zrevrange(tmp_key, 0, -1)
        pipe.delete(tmp_key)
        users = pipe.execute()[1]
        if not users:
            return []

        # Step 2: merge those friends' activities, drop everything with a
        # score >= ``last`` and read the newest ``limit`` entries.
        user_keys = [self._activities_key(u) for u in users]
        pipe = self.conn.pipeline(False)
        pipe.zunionstore(dest=tmp_key, keys=user_keys, aggregate='max')
        # '+inf' rather than "now": entries stamped after the current clock
        # reading must be trimmed as well.
        pipe.zremrangebyscore(tmp_key, min=last, max='+inf')
        pipe.zrevrange(tmp_key, 0, limit - 1)
        pipe.delete(tmp_key)
        return pipe.execute()[2]
def get_timestamp():
    """Return the current ``time.time()`` reading as whole microseconds."""
    seconds = time.time()
    microseconds = seconds * 1000000
    return int(microseconds)
if __name__ == '__main__':
    # Demo: load the same random data set into SQLite and Redis, then print
    # two pages of the friends' activity feed from each backend so the
    # results can be compared by eye.
    PAGE_SIZE = 10

    db_path = os.path.join(
        os.path.dirname(os.path.realpath(__file__)), 'activities.sqlite3')
    con = sqlite3.connect(db_path)
    redis_search = RedisSearch()
    redis_search.clean()
    try:
        # ``with con`` commits on success and rolls back on error, but it
        # does not close the connection -- hence the try/finally around it.
        with con:
            cur = con.cursor()
            cur.executescript(u"""
                DROP TABLE IF EXISTS activities;
                DROP TABLE IF EXISTS friends;
                CREATE TABLE activities(id INTEGER PRIMARY KEY, user VARCHAR(31), activity VARCHAR(31), timestamp INTEGER);
                CREATE TABLE friends(id INTEGER PRIMARY KEY, user VARCHAR(31), friend VARCHAR(31));
            """)

            # Parameterized statements: values are bound by the driver
            # instead of being pasted into the SQL text.
            insert_activity = (
                u'INSERT INTO activities(user, activity, timestamp) '
                u'VALUES(?, ?, ?)')
            insert_friend = u'INSERT INTO friends(user, friend) VALUES(?, ?)'
            select_page = u"""
                SELECT act.activity, act.timestamp FROM activities act
                JOIN friends fr ON fr.friend = act.user AND fr.user = ?
                WHERE act.timestamp < ?
                ORDER BY act.timestamp DESC
                LIMIT ?
            """

            def befriend(who, whom):
                # Record the friendship in both backends.
                cur.execute(insert_friend, (who, whom))
                redis_search.add_friend(user=who, friend=whom)

            # Create 100 activities, each by a new author.  UUIDs are turned
            # into strings once so both backends store the same text.
            authors = []
            for _ in range(100):
                author = str(uuid.uuid4())
                authors.append(author)
                activity = str(uuid.uuid4())
                timestamp = get_timestamp()
                cur.execute(insert_activity, (author, activity, timestamp))
                redis_search.add_activity(
                    user=author, activity=activity, timestamp=timestamp)

            # Give the searching user 100 friends that have no activities.
            user = str(uuid.uuid4())
            for _ in range(100):
                befriend(user, str(uuid.uuid4()))

            # More friendships, between unrelated users.
            for _ in range(100):
                befriend(str(uuid.uuid4()), str(uuid.uuid4()))

            # Add the first 20 authors to the user's friends.
            for author in authors[:20]:
                befriend(user, author)

            # Select my friends' activities, two pages, from both backends.
            last = get_timestamp()
            for page in range(2):
                print('--- page {n} ---'.format(n=page + 1))
                cur.execute(select_page, (user, last, PAGE_SIZE))
                # The timestamp of the last row printed becomes the
                # exclusive upper bound of the next page.
                new_last = last
                for activity, activity_timestamp in cur:
                    print(activity)
                    new_last = activity_timestamp
                print('---')
                for activity in redis_search.search(
                        user=user, last=last, limit=PAGE_SIZE):
                    print(activity)
                last = new_last
    finally:
        con.close()
非常感谢您的回答!
更新:我用 Lua 重写了 search 函数:
def search(self, user, last, limit):
    """Return up to ``limit`` friend activities older than ``last``.

    Runs entirely inside Redis as a single Lua script.

    :param user: whose friends' activities to fetch.
    :param last: exclusive upper bound on the activity timestamp.
    :param limit: maximum number of activities to return.
    :returns: activity members, newest first (empty when ``limit`` is not
        positive or no friend has any recorded activity).
    """
    # NOTE(review): key names are built inside the script and ``numkeys`` is
    # 0; Redis documents that keys should be passed via KEYS -- revisit
    # before running this against Redis Cluster.
    SCRIPT = """
    local tmp = "test:tmp:" .. ARGV[1]
    local limit = tonumber(ARGV[4])
    if limit < 1 then
        -- ZREVRANGE 0 -1 would otherwise return the whole set.
        return {}
    end

    -- Friends that have activities, most recently active first.
    redis.call("ZINTERSTORE", tmp, 2, "test:last_user_activity", "test:friends:" .. ARGV[1], "AGGREGATE", "MAX")
    local users = redis.call("ZREVRANGE", tmp, 0, -1, "WITHSCORES")
    redis.call("DEL", tmp)
    -- An empty reply is an empty table, never nil.
    if #users == 0 then
        return {}
    end

    -- The WITHSCORES reply is flat: member, score, member, score, ...
    for i = 1, #users, 2 do
        local friend = users[i]
        local last_active = users[i + 1]
        redis.call("ZUNIONSTORE", tmp, 2, tmp, "test:user_activities:" .. friend, "AGGREGATE", "MAX")
        -- Drop everything with a score in [last, upper bound].
        redis.call("ZREMRANGEBYSCORE", tmp, ARGV[2], ARGV[3])
        -- Friends are visited in descending last-activity order, so once
        -- `limit` entries score at or above this friend's last activity,
        -- the remaining friends are skipped.
        if redis.call("ZCOUNT", tmp, last_active, ARGV[2]) >= limit then
            break
        end
    end

    local activities = redis.call("ZREVRANGE", tmp, 0, limit - 1)
    redis.call("DEL", tmp)
    return activities
    """
    # '+inf' as the upper trim bound: entries stamped after the current
    # clock reading must be removed as well.
    return self.conn.eval(SCRIPT, 0, user, last, '+inf', limit)
更新(2016年5月19日)
我之前的做法是错误的,下面是一些介绍正确解决方案的相关链接:
- How Instagram Feeds Work: Celery and RabbitMQ
- Stream-Framework
- The Architecture Twitter Uses To Deal With 150M Active Users, 300K QPS, A 22 MB/S Firehose, And Send Tweets In Under 5 Seconds
- Facebook’s Instagram: Making the Switch to Cassandra from Redis, a 75% ‘Insta’ Savings