Monitor Celery Queue Consumption Speed
home // page // Monitor Celery Queue Consumption Speed

Monitor Celery Queue Consumption Speed

A lot of times when using the python Celery library it’s useful to be able to monitor how fast the queue is being consumed. This is useful if you need to do a back of the envelope to see how long  a queue will take to chew through, or just to see how the current status of the queue to make sure there aren’t any blockages. So I decided to whip up a simple script to do just that. While this is no replacement for Flower, it’s still a great little tool to have.

import argparse
import redis
import sys
import time


def celery_speed(redis_connection, celery_queue_name):
    """Display the speed at which items in the celery queue are being
    consumed.

    :param redis_connection: A connection to redis
    :type redis_connection: redis.StrictRedis
    :param celery_queue_name: Name of celery queue.
    :type celery_queue_name: str or unicode
    """
    last_value, last_time = None, None

    while True:
        try:
            curr_value = redis_connection.llen(celery_queue_name)
            curr_time = time.time()
            if last_value and last_time:
                speed = (
                    (curr_value - last_value) /
                    float(curr_time - last_time)
                )
                sys.stdout.write(
                    '{} total, {:0.02f} items/sec\r'.format(
                        curr_value, speed
                    )
                )
                sys.stdout.flush()
            last_value = curr_value
            last_time = curr_time
            time.sleep(1)
        except KeyboardInterrupt:
            print 'Shutdown'
            sys.exit()


if __name__ == '__main__':
    parser = argparse.ArgumentParser(
        description='Show speed at which celery processing queue items.'
    )
    parser.add_argument(
        '--redis-host', dest='redis_host', default='localhost',
        help='(Optional, default is localhost) The redis host'
    )
    parser.add_argument(
        '--queue-name', dest='queue_name', default='celery',
        help='(Optional, default is celery) The name of the celery queue.'
    )
    args = parser.parse_args()
    print 'host: {}, queue: {}'.format(args.redis_host, args.queue_name)
    print 'negative = queue decreasing, positive = queue increasing'
    print
    redis_host = (args.redis_host or 'localhost')
    celery_queue_name = (args.queue_name or 'celery')
    redis_connection = redis.StrictRedis(redis_host)
    celery_speed(redis_connection, celery_queue_name)

You use this script as follows:

$ celery_speed
host: localhost, queue: celery
negative = queue decreasing, positive = queue increasing

1238 total, -15.3 items/sec

You can find the whole gist below.

Complete Gist Of Code