RedisGears Examples

The following sections contain recipes and basic examples that show how RedisGears can be used.

To contribute your example or recipe (and get the credit for it), click the "Edit this page" button at the top to submit a Pull Request.

Recipes

This is a list of RedisGears recipes that can be used as-is or as a source of inspiration.

Recipe | Description | Author | License | URL
WriteBehind | Write-Behind and Write-Through from Redis to other SQL/NoSQL databases | Redis | BSD-3-Clause | git
AnimalRecognitionDemo | An example of using Redis Streams, RedisGears and RedisAI for real-time video analytics (i.e. filtering cats) | Redis | BSD-3-Clause | git
EdgeRealtimeVideoAnalytics | An example of using Redis Streams, RedisGears, RedisAI and RedisTimeSeries for real-time video analytics (i.e. counting people) | Redis | Apache-2.0 | git
FraudDetectionDemo | An example that combines several Redis data structures with RedisGears and RedisAI to showcase the advantage of data locality during transaction scoring | Redis | BSD-3-Clause | git
AdGears | An example of using RedisGears to maximize advertising revenue. Utilizes RedisGears, RedisTimeSeries and RedisBloom. | Redis | BSD-3-Clause | git

Word Count

Counts the occurrences of each word across all stored sentences.

Author: Redis

Assumptions

All keys store Redis String values. Each value is a sentence.

Python API

gb = GearsBuilder()
gb.map(lambda x: x['value'])     # map records to "sentence" values
gb.flatmap(lambda x: x.split())  # split sentences to words
gb.countby()                     # count each word's occurrences
gb.run()
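
To see what the pipeline computes without a Redis server, the same three steps can be mimicked in plain Python. This is only a local sketch: the `records` list and the `word_count` helper are hypothetical stand-ins for the key records that RedisGears would supply, and are not part of the RedisGears API.

```python
from collections import Counter

def word_count(records):
    """Local stand-in for map -> flatmap -> countby over key records."""
    sentences = (r['value'] for r in records)          # map: record -> sentence
    words = (w for s in sentences for w in s.split())  # flatmap: sentence -> words
    return Counter(words)                              # countby: word -> occurrences

# Hypothetical records, shaped like key/value records for String keys
records = [
    {'key': 's1', 'value': 'the quick brown fox'},
    {'key': 's2', 'value': 'the lazy dog'},
]
counts = word_count(records)
print(counts['the'], counts['fox'])
```

In a real deployment the counting is distributed across shards; the local version only illustrates the data flow.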

Delete by Key Prefix

Deletes all keys whose names begin with a specified prefix and returns their count.

Author: Redis

Assumptions

There may be keys in the database. Some of these may have names beginning with the "delete_me:" prefix.

Python API

gb = GearsBuilder()
gb.map(lambda x: x['key'])               # map the records to key names
gb.foreach(lambda x: execute('DEL', x))  # delete each key
gb.count()                               # count the records
gb.run('delete_me:*')

Average of the age Field in JSON

Calculates the average age by scanning all JSON document keys whose names start with the "doc" prefix.

Author: Redis

Assumptions

The RedisJSON module is also loaded into Redis. Each matching key stores a JSON document with a numeric age field.

Python API

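import json

gb = GB()
# Read each document at its root. The legacy '.' path returns the document
# itself, whereas a '$' JSONPath returns a list of matches in RedisJSON 2.x.
gb.map(lambda r: json.loads(execute('JSON.GET', r['key'], '.')))
gb.map(lambda x: x['age'])  # extract the age field
gb.avg()                    # calculate the average
gb.run('doc*')              # run on all keys whose names start with "doc"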
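
The arithmetic behind this pipeline can be checked locally. The sketch below assumes hypothetical documents that each carry a numeric age field; `average_age` is an illustrative helper, not a RedisGears function.

```python
import json

def average_age(raw_docs):
    """Parse JSON strings, pull out 'age', and average the values."""
    ages = [json.loads(doc)['age'] for doc in raw_docs]
    return sum(ages) / len(ages)

# Hypothetical serialized documents, as JSON.GET might return them
raw_docs = [
    '{"name": "a", "age": 30}',
    '{"name": "b", "age": 40}',
    '{"name": "c", "age": 50}',
]
result = average_age(raw_docs)
print(result)
```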

Basic Redis Stream Processing

Copy every new message from a Redis Stream to a Redis Hash key.

Author: Redis

Assumptions

An input Redis Stream is stored under the "mystream" key.

Python API

gb = GearsBuilder('StreamReader')
# Write each message's fields to a Redis Hash named after the message ID
gb.foreach(lambda x: execute('HMSET', x['id'],
                             *sum([[k, v] for k, v in x['value'].items()], [])))
gb.register('mystream')

Automatic Expiry

Sets the time to live (TTL) for every updated key to one hour.

Author: Redis

Python API

gb = GB()
gb.foreach(lambda x: execute('EXPIRE', x['key'], 3600))
gb.register('*', mode='sync', readValue=False)

Keyspace Notification Processing

This example demonstrates a two-step process that:

  1. Synchronously captures distributed keyspace events
  2. Asynchronously processes the resulting stream of events

Specifically, the example shows how expired key names can be output to the log.

Author: Redis

Python API

def process(x):
    '''
    Processes a message from the local expiration stream
    Note: in this example we simply print to the log, but feel free to replace
    this logic with your own, e.g. an HTTP request to a REST API or a call to an
    external data store.
    '''
    log(f"Key '{x['value']['key']}' expired at {x['id'].split('-')[0]}")

# Captures an expiration event and adds it to the shard's local 'expired' stream
cap = GB('KeysReader')
cap.foreach(lambda x:
            execute('XADD', f'expired:{hashtag()}', '*', 'key', x['key']))
cap.register(prefix='*',
             mode='sync',
             eventTypes=['expired'],
             readValue=False)

# Consume new messages from expiration streams and process them somehow
proc = GB('StreamReader')
proc.foreach(process)
proc.register(prefix='expired:*',
              batch=100,
              duration=1)
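
The log line in `process` prints the first part of the stream message ID. Redis Stream IDs take the form <millisecondsTime>-<sequenceNumber>, so that part is a Unix timestamp in milliseconds. A minimal sketch of turning it into a readable UTC time follows; the `expiry_time` helper and the sample ID are hypothetical.

```python
from datetime import datetime, timezone

def expiry_time(stream_id):
    """Convert the millisecond part of a stream ID to a UTC datetime."""
    ms = int(stream_id.split('-')[0])
    return datetime.fromtimestamp(ms / 1000, tz=timezone.utc)

# Hypothetical auto-generated ID: milliseconds since the epoch, then a sequence number
stamp = expiry_time('1609459200000-0')
print(stamp.isoformat())
```

With an auto-generated ID, the timestamp records when the message was added to the stream, which in this example closely follows the expiration event.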

Reliable Keyspace Notification

Captures each keyspace event and stores it in a Stream.

Author: Redis

Python API

GearsBuilder() \
.foreach(lambda x: execute('XADD', "notifications-stream", '*', *sum([[k,v] for k,v in x.items()],[]))) \
.register(prefix="person:*", eventTypes=['hset', 'hmset'], mode='sync')
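
The `sum(..., [])` expression above flattens the record's key/value pairs into the flat field, value, field, value argument list that XADD expects. The same flattening can be sketched with itertools; the `flatten_pairs` name and the sample record are illustrative assumptions.

```python
from itertools import chain

def flatten_pairs(mapping):
    """Turn {'a': 1, 'b': 2} into ['a', 1, 'b', 2]."""
    return list(chain.from_iterable(mapping.items()))

# Hypothetical, simplified event record
record = {'event': 'hset', 'key': 'person:1', 'type': 'hash'}
args = flatten_pairs(record)

# The list-summing idiom yields the same result
same = sum([[k, v] for k, v in record.items()], [])
print(args)
```

Both forms preserve the mapping's insertion order; `chain.from_iterable` avoids building an intermediate list for every pair.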

Distributed Monte Carlo to Estimate pi

Estimate pi by throwing darts at a carefully-constructed dartboard.

Note: There are far better ways to get pi's value. This example is intended for educational purposes only. For all practical purposes, you'd be better off using the constant value 3.14159265359.

Author: Redis

Python API

TOTAL_DARTS = 1000000                            # total number of darts

def inside(p):
    ''' Returns True if a randomly generated point lands inside the unit circle '''
    from random import random
    x, y = random(), random()
    return x*x + y*y < 1

def throws():
    ''' Calculates each shard's number of throws '''
    global TOTAL_DARTS
    throws = TOTAL_DARTS
    ci = execute('RG.INFOCLUSTER')
    if type(ci) is not str:                       # assume a cluster
        n = len(ci[2])                            # number of shards
        me = ci[1]                                # my shard's ID
        ids = sorted(x[1] for x in ci[2])         # shards' sorted IDs list
        i = ids.index(me)                         # my index
        throws = TOTAL_DARTS // n                 # minimum throws per shard
        if i == 0:                                # first shard gets remainder
            throws += TOTAL_DARTS % n
    yield throws

def estimate(hits):
    ''' Estimates Pi's value from hits '''
    from math import log10
    hits = hits * 4                               # one quadrant is used
    r = hits / 10 ** int(log10(hits))             # make it irrational
    return f'Pi\'s estimated value is {r}'

gb = GB('PythonReader')
gb.flatmap(lambda x: [i for i in range(int(x))])  # throw the local darts
gb.filter(inside)                                 # throw out missed darts
gb.accumulate(lambda a, x: 1 + (a if a else 0))   # count the remaining darts
gb.collect()                                      # collect the results
gb.accumulate(lambda a, x: x + (a if a else 0))   # merge darts' counts
gb.map(estimate)                                  # four pieces of pie
gb.run(throws)
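
The idea behind the estimate: a quarter of the unit circle covers pi/4 of the unit square, so pi is approximately 4 * hits / throws. The following single-process sketch reproduces that logic in plain Python, including a split of the darts between shards. The helper names, the shard count and the fixed seed are assumptions made for illustration; nothing here runs inside RedisGears.

```python
import math
import random

def split_darts(total, shards):
    """Divide darts evenly; the first shard takes any remainder."""
    base, extra = divmod(total, shards)
    return [base + extra] + [base] * (shards - 1)

def count_hits(darts, rng):
    """Count darts that land inside the unit circle's first quadrant."""
    hits = 0
    for _ in range(darts):
        x, y = rng.random(), rng.random()
        if x * x + y * y < 1:
            hits += 1
    return hits

def estimate_pi(total, shards, seed=42):
    """Sum the per-shard hits and scale by 4 / total."""
    rng = random.Random(seed)
    hits = sum(count_hits(d, rng) for d in split_darts(total, shards))
    return 4 * hits / total

shares = split_darts(1000003, 3)
pi_estimate = estimate_pi(100000, 3)
print(shares, pi_estimate, abs(pi_estimate - math.pi))
```

The error shrinks roughly with the square root of the number of darts, so each additional correct digit costs about a hundred times more throws.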