EzDev.org

redis-py

Redis Python Client


redis-py raises AttributeError

In what circumstances would redis-py raise the following AttributeError exception?
Isn't redis-py built by design to raise only redis.exceptions.RedisError based exceptions?
What would be a reasonable handling logic?

Traceback (most recent call last):
  File "c:\Python27\Lib\threading.py", line 551, in __bootstrap_inner
    self.run()
  File "c:\Python27\Lib\threading.py", line 504, in run
    self.__target(*self.__args, **self.__kwargs)
  File C:\Users\Administrator\Documents\my_proj\my_module.py", line 33, in inner
    ret = protected_func(*args, **kwargs)
  File C:\Users\Administrator\Documents\my_proj\my_module.py", line 104, in _listen
    for message in _pubsub.listen():
  File "C:\Users\Administrator\virtual_environments\my_env\lib\site-packages\redis\client.py", line 1555, in listen
    r = self.parse_response()
  File "C:\Users\Administrator\virtual_environments\my_env\lib\site-packages\redis\client.py", line 1499, in parse_response
    response = self.connection.read_response()
  File "C:\Users\Administrator\virtual_environments\my_env\lib\site-packages\redis\connection.py", line 306, in read_response
    response = self._parser.read_response()
  File "C:\Users\Administrator\virtual_environments\my_env\lib\site-packages\redis\connection.py", line 104, in read_response
    response = self.read()
  File "C:\Users\Administrator\virtual_environments\my_env\lib\site-packages\redis\connection.py", line 89, in read
    return self._fp.readline()[:-2]
AttributeError: 'NoneType' object has no attribute 'readline'

Source: (StackOverflow)

Use sorted set to notifications system

I am using redis sorted sets to save user notifications. But as i never did a notification system, I am asking about my logic.

I need to save 4 things for each notification.

  • post_id
  • post_type - A/B
  • visible - Y/N
  • checked - Y/N

My question is how can I store this type of structure in sorted sets?

ZADD users_notifications:1 10 1_A_Y_Y 
ZADD users_notifications:1 20 2_A_Y_N
....

There is a better way to do this type of stuff in redis? In the case above i am saving the four thing in each element, and i need to split by the underscore in the server language.


Source: (StackOverflow)

Using gevent-socketio paster integration causes my application to be unresponsive

I am writing a Pyramid application that relies on gevent-socketio and redis. However, I noticed that when I navigate away from the view that establishes the socket.io connection, my application becomes unresponsive. In order to try and isolate the issue, I created another bare-bones application and discovered that using pubsub.listen() was causing the issue:

class TestNamespace(BaseNamespace):

    def initialize(self):
        self.spawn(self.emitter)

    def emitter(self):
        client = redis.pubsub()
        client.subscribe('anything')
        for broadcast in client.listen():
            if broadcast['type'] != 'message':
                continue

The way I'm starting up my application is as follows:

pserve --reload development.ini

However, I can only get my application to work if use use the serve.py from the examples:

import os.path

from socketio.server import SocketIOServer
from pyramid.paster import get_app
from gevent import monkey; monkey.patch_all()

HERE = os.path.abspath(os.path.dirname(__file__))

if __name__ == '__main__':

    app = get_app(os.path.join(HERE, 'development.ini'))
    print 'Listening on port http://0.0.0.0:8080 and on port 10843 (flash policy server)'

    SocketIOServer(('0.0.0.0', 8080), app,
        resource="socket.io", policy_server=True,
        policy_listener=('0.0.0.0', 10843)).serve_forever()

Unfortunatey this is rather cumbersome for development as I lose --reload functionality. Ideally I'd like to use the paster integration entry point

Another thing I noticed is that the gevent-sockectio paster integration does not monkey patch gevent, whereas the examples server.py does.

How can I get pserve --reload to work with gevent-socketio?

I've uploaded my test application to github: https://github.com/m-martinez/iotest


Source: (StackOverflow)

Terminate a hung redis pubsub.listen() thread

Related to this question I have the following code which subscribes to a redis pubsub queue and uses the handler provided in __init__ to feed the messages to the class that processes them:

from threading import Thread
import msgpack

class Subscriber(Thread):

    def __init__(self, redis_connection, channel_name, handler):
        super(Subscriber, self).__init__(name="Receiver")
        self.connection = redis_connection
        self.pubsub = self.connection.pubsub()
        self.channel_name = channel_name
        self.handler = handler
        self.should_die = False

    def start(self):
        self.pubsub.subscribe(self.channel_name)
        super(Subscriber, self).start()

    def run(self):
        for msg in  self.pubsub.listen():
            if self.should_die:
                return
            try:
                data = msg["data"]
                unpacked = msgpack.unpackb(data)
            except TypeError:
                # stop non-msgpacked, invalid, messages breaking stuff
                # other validation happens in handler
                continue
            self.handler(unpacked)

    def die(self):
        self.should_die = True

In the linked question above, it is noted that pubsub.listen() never returns if the connection is dropped. Therefore, my die() function, while it can be called, will never actually cause the thread to terminate because it is hanging on the call to listen() inside the thread's run().

The accepted answer on the linked question mentions hacking redis-py's connection pool. I really don't want to do this and have a forked version of redis-py (at least until the fix is hopefully accepted into master), but I've had a look at the redis-py code anyway and don't immediately see where this change would be made.

Does anyone have an idea how to cleanly solve the hanging redis-py listen() call?

What issues will I incur by directly using Thread._Thread__stop?


Source: (StackOverflow)

What's the difference between the API of Redis and StrictRedis?

I'm working on a project with redis.py, I works when I connect the app to a Redis client, but failed with StrictRedis.

So, I wanna know the difference between the two, but searched with no satisfied answer.

My project is here: https://github.com/kxxoling/librorum Sorry for the Chinese annotation!


Source: (StackOverflow)

Share redis connection between django views

While debugging I've noticed that each redis accessing django view uses a separate redis connection.

Why is this so?
Is django using a thread per view and redis-py creates a connection per thread? Or is it some other reason?

How can I make django share a single connection between the various views?


Source: (StackOverflow)

Insert a new database in redis using redis.StrictRedis()

I know that Redis have 16 databases by default, but what if i need to add another database, how can i do that using redis-py?


Source: (StackOverflow)

Debugging memory leak in Python service that uses Redis pub/sub

I'm trying to build a Python 2.7 service using Redis publish/subscribe functionality. I use redis 2.8.17 on Ubuntu 12.04 with redis-py 2.10.3 as a client. Unfortunately my service seems to be leaking memory. The memory consumption seems to increase linearl-ish with the amount of messages the service receives/consumes/handles.

I tried to debug this using the tool memory_profiler by decorating my main subscribe loop. In order to have it print output continuously, I changed it to exits every every hundredth message it receives. The output looks like this:

Line #    Mem usage    Increment   Line Contents
================================================
    62     39.3 MiB      0.0 MiB       @memory_profiler.profile
    63                                 def _listen(self, callback):
    64     40.1 MiB      0.7 MiB           for _ in self.redis_pubsub.listen():
    65     40.1 MiB      0.0 MiB               self.count += 1
    66     40.1 MiB      0.0 MiB               self._consume(callback)
    67     40.1 MiB      0.0 MiB               if self.count == 100:
    68     40.1 MiB      0.0 MiB                   self.count = 0
    69     40.1 MiB      0.0 MiB                   break
    70     40.1 MiB      0.0 MiB           gc.collect()

It reports a similar increase for every hundred message pushed to the service. The callback is the function that actually does application things, so line 65 is where I'd actually expect a memory increase if there was something wrong in my app code ..

The output made me suspect the redis client so I also checked the size of the self.redis_pubsub and redis.StrictRedis objects using pympler.asizeof. These objects are small to begin with and does not increase at all as the service receives messages.

Further, when trying to look for leaking objects using pympler.muppy and pympler.summarize, it does not report any growing object-counts or accumulating memory whatsoever. Also, the total numbers for memory consumptions and growth does not resemble the numbers provided by top in Linux.

I'm stuck, do anyone have any idea what might be going on or have any ideas on how I can debug this further?


Source: (StackOverflow)

Redis page counter

I am writing a flask app that will count page views from multiple websites. I've decided to use Redis and Redispy but I am having a hard time deciding how to structure. Originally I tried to have a something like this

redis.set("date:YYYYMMDD:site:sitename", 1)

I want to be able to query by date, or by site name and display the count value. I tried to use .keys to query by date or by site name but the REDIS docs say to avoid using keys.
So then I thought maybe I can use redis hashes:

redis.hset("date:YYYYMMDD", "site", "sitename")
redis.hset("counter", 1)

Ultimately I want to be able run reports on site counters by site name, by date, or display all values from all dates. I can't seem to find the right structure to allow me get what I want.

Any suggestions would be greatly appreciated! I have not used REDIS before.


Source: (StackOverflow)

python redis pubsub blocking

import redis
import threading

class Listener(threading.Thread):
    def __init__(self, r, channel):
        threading.Thread.__init__(self)
        self.redis = r
        self.pubsub = self.redis.pubsub()
        self.pubsub.subscribe(channel)

def run(self):
    for item in self.pubsub.listen():
        # do stuff
        pass

in the above code how I do stop the thread?

Below I have an example code to show you what I would like:

class Listener(threading.Thread):
    def __init__(self, r, channel):
        threading.Thread.__init__(self)
         self.redis = r
         self.pubsub = self.redis.pubsub()
         self.pubsub.subscribe(channel)
         self.stop = False

    def run(self):
        while not stop:
            # get item from channel

So when the attribute stop == True the thread will exit the loop and will finish. Is that possible? If it is not what are the alternatives?


Source: (StackOverflow)

lock doesn't work with redis-py - unknown command 'EVALSHA'

The following creates, acquires and releases a lock in redis:

import redis

redis_url = 'redis://127.0.0.1:6379/'
redis_conn = redis.from_url(redis_url)

l = redis_conn.lock('lock-test')
l.acquire()
l.release()

How come upon acquire() the following error is thrown?

Traceback (most recent call last):
  File "C:/dev/myproj/test.py", line 11, in <module>
    l.acquire()
  File "C:\dev\myproj\venv_myproj\lib\site-packages\redis\lock.py", line 111, in acquire
    if self.do_acquire(token):
  File "C:\dev\myproj\venv_myproj\lib\site-packages\redis\lock.py", line 258, in do_acquire
    client=self.redis))
  File "C:\dev\myproj\venv_myproj\lib\site-packages\redis\client.py", line 2646, in __call__
    return client.evalsha(self.sha, len(keys), *args)
  File "C:\dev\myproj\venv_myproj\lib\site-packages\redis\client.py", line 1911, in evalsha
    return self.execute_command('EVALSHA', sha, numkeys, *keys_and_args)
  File "C:\dev\myproj\venv_myproj\lib\site-packages\redis\client.py", line 565, in execute_command
    return self.parse_response(connection, command_name, **options)
  File "C:\dev\myproj\venv_myproj\lib\site-packages\redis\client.py", line 577, in parse_response
    response = connection.read_response()
  File "C:\dev\myproj\venv_myproj\lib\site-packages\redis\connection.py", line 574, in read_response
    raise response
redis.exceptions.ResponseError: unknown command 'EVALSHA'

My setup: python 2.7.8, redis 2.4.5 64bit, win8


Source: (StackOverflow)

in redis-py , is redis.StrictRedis.pipe thread safe?

short question.
I'm using redis-py to set some keys on my redis server and I'm experiencing some weird behaviour.
I suspect it has something to do with the StrictRedis.pipe. I have multiple threads pushing commands to the same pipe and after a while I run execute on this pipe and run all of its commands. I wanted to know if the pipe is thread-safe? can I push commands from multiple threads without any synchronisation mechanism?

Thank you.


Source: (StackOverflow)

What are equivalent functions of MULTI and EXEC commands in redis-py?

I tested all the transaction commands (MULTI, EXEC, WATCH, DISCARD) in redis-cli. But when i tried with redis-py the following error occurred:

AttributeError: 'Redis' object has no attribute 'multi'

I have tried the following code snippet:

import redis,time

r = redis.Redis()
try:
    r.set("transError",10)
    r.watch("transError")
    var = r.get("transError")
    var = int(var) + 1
    print "Run other client to simulate an error without transaction"
    time.sleep(4)
    r.multi()
    r.set("transError",var)
    r.execute()
    print "Value in first client",r.get("transError")

except redis.WatchError:
    print "Value Altered"

I have seen code example that is using multi() and execute() but they are not working for me. Any help?


Source: (StackOverflow)

Python redis pubsub: what happen to types when it gets published?

pub.py

import redis
import datetime
import time

def main():
    redis_host = '10.235.13.29'
        r = redis.client.StrictRedis(host=redis_host, port=6379)
        while True:
            now = datetime.datetime.now()
            print 'Sending {0}'.format(now)
            print 'data type is %s' % type(now)
            r.publish('clock', now)
            time.sleep(1)

if __name__ == '__main__':
  main()

OUTPUT:

Sending 2014-10-08 13:10:58.338765
data type is <type 'datetime.datetime'>
Sending 2014-10-08 13:10:59.368707
data type is <type 'datetime.datetime'>
Sending 2014-10-08 13:11:00.378723
data type is <type 'datetime.datetime'>
Sending 2014-10-08 13:11:01.398132
data type is <type 'datetime.datetime'>
Sending 2014-10-08 13:11:02.419030
data type is <type 'datetime.datetime'>

sub.py

import redis
import threading
import time
import datetime

def callback():
    redis_host = '10.235.13.29'
    r = redis.client.StrictRedis(host=redis_host, port=6379)
    sub = r.pubsub()
    sub.subscribe('clock')
    while True:
        for m in sub.listen():
            #print m #'Recieved: {0}'.format(m['data'])
            now = datetime.datetime.now()
            print 'Recieved: %s at %s' % (m['data'], now)
            print 'Data type is %s' % type(m['data'])
            dur = 1
            print 'It took %s to receive' % dur

def main():
    t = threading.Thread(target=callback)
    t.setDaemon(True)
    t.start()
    while True:
        print 'Waiting'
        time.sleep(30)

if __name__ == '__main__':
    main()

OUTPUT:

{}: ./sub.py
Waiting
Recieved: 1 at 2014-10-08 13:09:36.708088
Data type is <type 'long'>
It took 1 to receive
Recieved: 2014-10-08 13:09:37.629664 at 2014-10-08 13:09:37.630479
Data type is <type 'str'>
It took 1 to receive
Recieved: 2014-10-08 13:09:38.630661 at 2014-10-08 13:09:38.631585
Data type is <type 'str'>
It took 1 to receive
Recieved: 2014-10-08 13:09:39.632663 at 2014-10-08 13:09:39.633480
Data type is <type 'str'>
It took 1 to receive
Recieved: 2014-10-08 13:09:40.633662 at 2014-10-08 13:09:40.634464
Data type is <type 'str'>
It took 1 to receive
Recieved: 2014-10-08 13:09:41.634665 at 2014-10-08 13:09:41.635557
Data type is <type 'str'>
It took 1 to receive
Recieved: 2014-10-08 13:09:42.635662 at 2014-10-08 13:09:42.636673
Data type is <type 'str'>
It took 1 to receive
Recieved: 2014-10-08 13:09:43.642665 at 2014-10-08 13:09:43.643441
Data type is <type 'str'>
It took 1 to receive
Recieved: 2014-10-08 13:09:44.643663 at 2014-10-08 13:09:44.644582
Data type is <type 'str'>
It took 1 to receive
Recieved: 2014-10-08 13:09:45.644667 at 2014-10-08 13:09:45.673734
Data type is <type 'str'>
It took 1 to receive
Recieved: 2014-10-08 13:09:46.672918 at 2014-10-08 13:09:46.673874
Data type is <type 'str'>
It took 1 to receive
Recieved: 2014-10-08 13:09:47.673913 at 2014-10-08 13:09:47.675014
Data type is <type 'str'>
It took 1 to receive
Recieved: 2014-10-08 13:09:48.674920 at 2014-10-08 13:09:48.675804
Data type is <type 'str'>
It took 1 to receive
Recieved: 2014-10-08 13:09:49.675912 at 2014-10-08 13:09:49.677346
Data type is <type 'str'>

The type changed from datetime.datetime to str
Is it possible to preserve the type because i am trying to find the duration i cant subtracte datetime obj to str?


Source: (StackOverflow)

redis-py "ConnectionError: Socket closed on remote end"

Using redis-py's PubSub class I sometimes get the following exception:

Exception in thread listener_2013-10-24 12:50:31.687000:
Traceback (most recent call last):
  File "c:\Python27\Lib\threading.py", line 551, in __bootstrap_inner
    self.run()
  File "c:\Python27\Lib\threading.py", line 504, in run
    self.__target(*self.__args, **self.__kwargs)
  File "C:\Users\Administrator\Documents\my_proj\my_module.py", line 69, in _listen
    for message in _pubsub.listen():
  File "C:\Users\Administrator\virtual_environments\spyker\lib\site-packages\redis\client.py", line 1555, in listen
    r = self.parse_response()
  File "C:\Users\Administrator\virtual_environments\spyker\lib\site-packages\redis\client.py", line 1499, in parse_response
    response = self.connection.read_response()
  File "C:\Users\Administrator\virtual_environments\spyker\lib\site-packages\redis\connection.py", line 306, in read_response
    response = self._parser.read_response()
  File "C:\Users\Administrator\virtual_environments\spyker\lib\site-packages\redis\connection.py", line 106, in read_response
    raise ConnectionError("Socket closed on remote end")
ConnectionError: Socket closed on remote end

What would cause such an event?
If I catch this exception, what would be a reasonable handling logic? Would retrying listen() be futile?

The reason for asking and not simply trying is that I do not know how to reproduce this problem. It's rare but it's detrimental, so I must create some logic before this error strikes again.


Source: (StackOverflow)