Skip to content

RedisGears Examples

The following sections consist of various recipes and basic examples showing the uses for RedisGears.

To contribute your example or recipe (and get the credit for it), click the "Edit this page" button at the top to submit a Pull Request.

Recipes

This is a list of RedisGears recipes that can be used as-is or as an inspiration source.

Recipe Description Author License URL
WriteBehind Write-Behind and Write-Through from Redis to other SQL/No-SQL databases RedisLabs BSD-3-Clause git
AnimalRecognitionDemo An example of using Redis Streams, RedisGears and RedisAI for Realtime Video Analytics (i.e. filtering cats) RedisLabs BSD-3-Clause git
EdgeRealtimeVideoAnalytics An example of using Redis Streams, RedisGears, RedisAI and RedisTimeSeries for Realtime Video Analytics (i.e. counting people) RedisLabs Apache-2.0 git
FraudDetectionDemo An example that combines several Redis data structures and along with RedisGears and RedisAI to showcase the advantage of data locality during transaction scoring RedisLabs BSD-3-Clause git

Word Count

The counting of words.

Author: RedisLabs

Assumptions

All keys store Redis String values. Each value is a sentence.

Python API

gb = GearsBuilder()
gb.map(lambda x: x['value'])     # map records to "sentence" values
gb.flatmap(lambda x: x.split())  # split sentences to words
gb.countby()                     # count each word's occurances
gb.run()

Delete by Key Prefix

Deletes all the keys with name beginning with a prefix and return their count.

Author: RedisLabs

Assumptions

There may be keys in the database. Some of these may have names beginning with the "delete_me:" prefix.

Python API

gb = GearsBuilder()
gb.map(lambda x: x['key'])               # map the records to key names
gb.foreach(lambda x: execute('DEL', x))  # delete each key
gb.count()                               # count the records
gb.run('delete_me:*')

Basic Redis Stream Processing

Copy every new message from the Redis Stream to a Redis Hash key.

Author: RedisLabs

Assumptions

An input Redis Stream is stored under the "mystream" key.

Python API

gb = GearsBuilder('StreamReader')
gb.foreach(lambda x: execute('HMSET', x['streamId'], *x))  # write to Redis Hash
gb.register('mystream')

Automatic Expiry

Sets the time to live (TTL) for every updated key to one hour.

Author: RedisLabs

Python API

gb = GB()
gb.foreach(lambda x: execute('EXPIRE', x['key'], 3600))
gb.register('*', mode='sync', readValue=False)

Author: RedisLabs

Keyspace Notification Processing

This example demonstrates a two-step process that:

  1. Synchronously captures distributed keyspace events
  2. Asynchronously processes the events' stream

Specifically, the example shows how expired key names can be output to the log.

Author: RedisLabs

def process(x):
    '''
    Processes a message from the local expiration stream
    Note: in this example we simply print to the log, but feel free to replace
    this logic with your own, e.g. an HTTP request to a REST API or a call to an
    external data store.
    '''
    log(f"Key '{x['value']['key']}' expired at {x['id'].split('-')[0]}")

# Capture an expiration event and adds it to the shard's local 'expired' stream
cap = GB('KeysReader')
cap.foreach(lambda x:
            execute('XADD', f'expired:{hashtag()}', '*', 'key', x['key']))
cap.register(prefix='*',
             mode='sync',
             eventTypes=['expired'],
             readValue=False)

# Consume new messages from expiration streams and process them somehow
proc = GB('StreamReader')
proc.foreach(process)
proc.register(prefix='expired:*',
              batch=100,
              duration=1)

Distributed Monte Carlo Estimation of Pi's Value

Estimate Pi by throwing darts at a carefully-constructed dartboard.

There are far better ways to get Pi's value

This example is intended for educational purposes only. For all practical purposes, you'd be better off using the constant value of 3.14159265359.

Author: RedisLabs

Python API

TOTAL_DARTS = 1000000                            # total number of darts

def inside(p):
    ''' Generates a random point that is or isn't inside the circle '''
    from random import random
    x, y = random(), random()
    return x*x + y*y < 1

def throws():
    ''' Calculates each shard's number of throws '''
    global TOTAL_DARTS
    throws = TOTAL_DARTS
    ci = execute('RG.INFOCLUSTER')
    if type(ci) is not str:                       # assume a cluster
        n = len(ci[2])                            # number of shards
        me = ci[1]                                # my shard's ID
        ids = [x[1] for x in ci[2]].sort()        # shards' IDs list
        i = ids.index(me)                         # my index
        throws = TOTAL_DARTS // n                 # minimum throws per shard
        if i == 0 and TOTAL_DARTS % n > 0:        # first shard gets remainder
            throws += 1
    yield throws

def estimate(hits):
    ''' Estimates Pi's value from hits '''
    from math import log10
    hits = hits * 4                               # one quadrant is used
    r = hits / 10 ** int(log10(hits))             # make it irrational
    return f'Pi\'s estimated value is {r}'

gb = GB('PythonReader')
gb.flatmap(lambda x: [i for i in range(int(x))])  # throw the local darts
gb.filter(inside)                                 # throw out missed darts
gb.accumulate(lambda a, x: 1 + (a if a else 0))   # count the remaining darts
gb.collect()                                      # collect the results
gb.accumulate(lambda a, x: x + (a if a else 0))   # merge darts' counts
gb.map(estimate)                                  # four pieces of pie
gb.run(throws)