RedisGears Examples
The following sections contain recipes and basic examples that show how RedisGears can be used.
To contribute your example or recipe (and get the credit for it), click the "Edit this page" button at the top to submit a Pull Request.
Recipes
This is a list of RedisGears recipes that can be used as-is or as a source of inspiration.
Recipe | Description | Author | License | URL |
---|---|---|---|---|
WriteBehind | Write-Behind and Write-Through from Redis to other SQL/No-SQL databases | Redis | BSD-3-Clause | git |
AnimalRecognitionDemo | An example of using Redis Streams, RedisGears and RedisAI for Real-time Video Analytics (i.e. filtering cats) | Redis | BSD-3-Clause | git |
EdgeRealtimeVideoAnalytics | An example of using Redis Streams, RedisGears, RedisAI and RedisTimeSeries for Realtime Video Analytics (i.e. counting people) | Redis | Apache-2.0 | git |
FraudDetectionDemo | An example that combines several Redis data structures with RedisGears and RedisAI to showcase the advantage of data locality during transaction scoring | Redis | BSD-3-Clause | git |
AdGears | An example of using RedisGears to maximize advertising revenue. Utilizes RedisGears, RedisTimeSeries and RedisBloom. | Redis | BSD-3-Clause | git |
Word Count
Counts the occurrences of each word across all keys.
Author: Redis
Assumptions
All keys store Redis String values. Each value is a sentence.
Python API
gb = GearsBuilder()
gb.map(lambda x: x['value']) # map records to "sentence" values
gb.flatmap(lambda x: x.split()) # split sentences to words
gb.countby() # count each word's occurrences
gb.run()
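To see what each step of the pipeline produces, the same map, flatmap and countby sequence can be mimicked in plain Python outside of Redis. This is only a local sketch: the records list is made-up sample data shaped like the key/value records the example assumes, word_count is a hypothetical helper, and collections.Counter stands in for countby().

```python
from collections import Counter

# Hypothetical sample records, shaped like the {'key': ..., 'value': ...}
# records the example assumes (one sentence per Redis String).
records = [
    {'key': 'line:1', 'value': 'the quick brown fox'},
    {'key': 'line:2', 'value': 'the lazy dog'},
]

def word_count(records):
    ''' Local stand-in for the map -> flatmap -> countby pipeline '''
    sentences = (r['value'] for r in records)          # map: record -> sentence
    words = (w for s in sentences for w in s.split())  # flatmap: sentence -> words
    return dict(Counter(words))                        # countby: word -> occurrences

counts = word_count(records)
print(counts)
```

Unlike this sketch, the RedisGears version runs the steps on every shard and aggregates the per-word counts across the cluster.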
Delete by Key Prefix
Deletes all keys whose names begin with a specified prefix and returns their count.
Author: Redis
Assumptions
There may be keys in the database. Some of these may have names beginning with the "delete_me:" prefix.
Python API
gb = GearsBuilder()
gb.map(lambda x: x['key']) # map the records to key names
gb.foreach(lambda x: execute('DEL', x)) # delete each key
gb.count() # count the records
gb.run('delete_me:*')
Average on Age Field in JSON
Calculates the average age by scanning all JSON document keys whose names start with "doc".
Author: Redis
Assumptions
The JSON module (RedisJSON) is also loaded into Redis.
Python API
import json
gb = GB()
gb.map(lambda r: json.loads(execute('json.get', r['key'], '$'))) # read the json
gb.map(lambda x: x['age']) # extract the age field
gb.avg() # calculate the average
gb.run("doc*") # run on all keys whose names start with "doc"
Basic Redis Stream Processing
Copy every new message from a Redis Stream to a Redis Hash key.
Author: Redis
Assumptions
An input Redis Stream is stored under the "mystream" key.
Python API
gb = GearsBuilder('StreamReader')
gb.foreach(lambda x: execute('HMSET', x['id'], *sum([[k, v] for k, v in x['value'].items()], []))) # write the message's fields to a Redis Hash named after the message ID
gb.register('mystream')
Automatic Expiry
Sets the time to live (TTL) for every updated key to one hour.
Author: Redis
Python API
gb = GB()
gb.foreach(lambda x: execute('EXPIRE', x['key'], 3600))
gb.register('*', mode='sync', readValue=False)
Keyspace Notification Processing
This example demonstrates a two-step process that:
- Synchronously captures distributed keyspace events
- Asynchronously processes the events' stream
Specifically, the example shows how expired key names can be output to the log.
Author: Redis
def process(x):
    '''
    Processes a message from the local expiration stream
    Note: in this example we simply print to the log, but feel free to replace
    this logic with your own, e.g. an HTTP request to a REST API or a call to an
    external data store.
    '''
    log(f"Key '{x['value']['key']}' expired at {x['id'].split('-')[0]}")
# Captures an expiration event and adds it to the shard's local 'expired' stream
cap = GB('KeysReader')
cap.foreach(lambda x:
            execute('XADD', f'expired:{hashtag()}', '*', 'key', x['key']))
cap.register(prefix='*',
             mode='sync',
             eventTypes=['expired'],
             readValue=False)
# Consume new messages from expiration streams and process them somehow
proc = GB('StreamReader')
proc.foreach(process)
proc.register(prefix='expired:*',
              batch=100,
              duration=1)
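The process function logs the first part of the message ID as the expiry time. Because the capture step adds messages with the '*' ID, Redis generates IDs of the form <milliseconds>-<sequence>, so that first part is a Unix timestamp in milliseconds taken when the event was captured. A minimal sketch of turning it into a readable time follows; stream_id_to_datetime is a hypothetical helper name and the ID used below is a made-up sample.

```python
from datetime import datetime, timezone

def stream_id_to_datetime(stream_id):
    ''' Converts a '<milliseconds>-<sequence>' Stream message ID to a UTC datetime '''
    ms = int(stream_id.split('-')[0])  # milliseconds part of the ID
    return datetime.fromtimestamp(ms / 1000, tz=timezone.utc)

# Made-up sample ID, shaped like an auto-generated Stream message ID
when = stream_id_to_datetime('1600000000000-0')
print(when.isoformat())
```

Inside process, the same conversion could be applied to x['id'] before the value is written to the log.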
Reliable Keyspace Notification
Captures keyspace events (here, hset and hmset on keys prefixed with "person:") and stores each one in a Stream.
Author: Redis
Python API
GearsBuilder() \
.foreach(lambda x: execute('XADD', "notifications-stream", '*', *sum([[k,v] for k,v in x.items()],[]))) \
.register(prefix="person:*", eventTypes=['hset', 'hmset'], mode='sync')
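The sum([[k,v] for k,v in x.items()],[]) expression above is what turns a record into the flat field, value, field, value argument list that XADD expects. The sketch below isolates that idiom in plain Python; the record is a made-up, simplified sample and flatten_record is a hypothetical helper name.

```python
def flatten_record(record):
    ''' Flattens {'a': 1, 'b': 2} into ['a', 1, 'b', 2] '''
    # Each [k, v] pair is a two-item list; sum(..., []) concatenates them in order
    return sum([[k, v] for k, v in record.items()], [])

# Made-up, simplified record for illustration only
record = {'key': 'person:1', 'event': 'hset'}
flat = flatten_record(record)
print(flat)
```

The flattened list is then unpacked with * so that each field name and value becomes a separate argument to execute().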
Distributed Monte Carlo to Estimate pi
Estimate pi by throwing darts at a carefully-constructed dartboard.
There are far better ways to get Pi's value
This example is intended for educational purposes only. For all practical purposes, you'd be better off using the constant value 3.14159265359.
Author: Redis
Python API
TOTAL_DARTS = 1000000 # total number of darts
def inside(p):
    ''' Generates a random point that is or isn't inside the circle '''
    from random import random
    x, y = random(), random()
    return x*x + y*y < 1
def throws():
    ''' Calculates each shard's number of throws '''
    global TOTAL_DARTS
    throws = TOTAL_DARTS
    ci = execute('RG.INFOCLUSTER')
    if type(ci) is not str: # assume a cluster
        n = len(ci[2]) # number of shards
        me = ci[1] # my shard's ID
        ids = sorted(x[1] for x in ci[2]) # sorted list of shards' IDs
        i = ids.index(me) # my index
        throws = TOTAL_DARTS // n # minimum throws per shard
        if i == 0 and TOTAL_DARTS % n > 0: # first shard gets remainder
            throws += TOTAL_DARTS % n
    yield throws
def estimate(hits):
    ''' Estimates Pi's value from hits '''
    from math import log10
    hits = hits * 4 # one quadrant is used
    r = hits / 10 ** int(log10(hits)) # make it irrational
    return f'Pi\'s estimated value is {r}'
gb = GB('PythonReader')
gb.flatmap(lambda x: [i for i in range(int(x))]) # throw the local darts
gb.filter(inside) # throw out missed darts
gb.accumulate(lambda a, x: 1 + (a if a else 0)) # count the remaining darts
gb.collect() # collect the results
gb.accumulate(lambda a, x: x + (a if a else 0)) # merge darts' counts
gb.map(estimate) # four pieces of pie
gb.run(throws)
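The arithmetic behind the example can be tried without Redis. The following is a minimal single-process sketch, not the distributed implementation: split_throws and count_hits are hypothetical helpers that mirror the per-shard split and the filter/accumulate steps, the shard count of 3 is an arbitrary assumption, and a seeded random.Random is used only to make the run repeatable.

```python
import random

def split_throws(total, shards):
    ''' Splits the darts evenly; the first shard also takes the remainder '''
    base, rem = divmod(total, shards)
    return [base + rem] + [base] * (shards - 1)

def count_hits(throws, rng):
    ''' Counts the darts that land inside the unit quarter circle '''
    return sum(1 for _ in range(throws)
               if rng.random() ** 2 + rng.random() ** 2 < 1)

rng = random.Random(42)              # seeded for repeatability
per_shard = split_throws(100000, 3)  # assumed: 3 shards, 100000 darts
hits = sum(count_hits(t, rng) for t in per_shard)  # merge the local counts
pi_estimate = 4 * hits / sum(per_shard)            # one quadrant is used
print(per_shard, pi_estimate)
```

The fraction of darts inside the quarter circle approximates pi/4, which is why the merged hit count is multiplied by 4.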