RedisGears Examples
The following sections contain recipes and basic examples that show how RedisGears can be used.
To contribute your example or recipe (and get the credit for it), click the "Edit this page" button at the top to submit a Pull Request.
Recipes
The following RedisGears recipes can be used as-is or as a source of inspiration.
Recipe | Description | Author | License | URL |
---|---|---|---|---|
WriteBehind | Write-Behind and Write-Through from Redis to other SQL/No-SQL databases | RedisLabs | BSD-3-Clause | git |
AnimalRecognitionDemo | An example of using Redis Streams, RedisGears and RedisAI for Realtime Video Analytics (i.e. filtering cats) | RedisLabs | BSD-3-Clause | git |
EdgeRealtimeVideoAnalytics | An example of using Redis Streams, RedisGears, RedisAI and RedisTimeSeries for Realtime Video Analytics (i.e. counting people) | RedisLabs | Apache-2.0 | git |
FraudDetectionDemo | An example that combines several Redis data structures with RedisGears and RedisAI to showcase the advantage of data locality during transaction scoring | RedisLabs | BSD-3-Clause | git |
Word Count
Counts the occurrences of each word across all keys.
Author: RedisLabs
Assumptions
All keys store Redis String values. Each value is a sentence.
Python API
gb = GearsBuilder()
gb.map(lambda x: x['value']) # map records to "sentence" values
gb.flatmap(lambda x: x.split()) # split sentences to words
gb.countby() # count each word's occurrences
gb.run()
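To make the data flow of the three steps easier to follow, here is a minimal plain-Python sketch of what map, flatmap and countby compute together. It does not use RedisGears: the `records` list is a hypothetical stand-in for the records that the default reader would produce, and `word_count` is an illustrative helper name, not part of the API.

```python
from collections import Counter

# Hypothetical stand-in for the reader's records: one dict per key,
# with the sentence stored under 'value'.
records = [
    {'key': 'doc:1', 'value': 'the quick brown fox'},
    {'key': 'doc:2', 'value': 'the lazy dog'},
]

def word_count(records):
    """Mirror the map -> flatmap -> countby steps in plain Python."""
    sentences = (r['value'] for r in records)           # map: record -> sentence
    words = (w for s in sentences for w in s.split())   # flatmap: sentence -> words
    return Counter(words)                               # countby: word -> occurrences

counts = word_count(records)
```

In this sketch `counts` maps every word to the number of times it appears in all sentences combined, which is the same shape of result the function above is meant to return.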
Delete by Key Prefix
Deletes all keys whose names begin with a given prefix and returns their count.
Author: RedisLabs
Assumptions
There may be keys in the database. Some of these may have names beginning with the "delete_me:" prefix.
Python API
gb = GearsBuilder()
gb.map(lambda x: x['key']) # map the records to key names
gb.foreach(lambda x: execute('DEL', x)) # delete each key
gb.count() # count the records
gb.run('delete_me:*')
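The effect of this function can be illustrated without a Redis server. The following is a rough sketch that uses a plain dict as a hypothetical stand-in for the database and Python's `fnmatch` module to approximate glob-style key matching; `delete_by_pattern` is an illustrative name, and the matching rules of `fnmatch` are only assumed to be close enough to Redis patterns for this simple prefix case.

```python
from fnmatch import fnmatchcase

def delete_by_pattern(db, pattern):
    """Delete matching keys from a dict-based stand-in for Redis and return their count."""
    matched = [k for k in db if fnmatchcase(k, pattern)]  # select keys by pattern
    for k in matched:                                     # delete each key
        del db[k]
    return len(matched)                                   # count the deleted keys

db = {'delete_me:1': 'a', 'delete_me:2': 'b', 'keep_me': 'c'}
deleted = delete_by_pattern(db, 'delete_me:*')
```

Only the keys matching the pattern are removed, and the returned number corresponds to the count that the Gears function reports.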
Basic Redis Stream Processing
Copies every new message from a Redis Stream to a Redis Hash key.
Author: RedisLabs
Assumptions
An input Redis Stream is stored under the "mystream" key.
Python API
gb = GearsBuilder('StreamReader')
gb.foreach(lambda x: execute('HMSET', x['id'], *sum([[k, v] for k, v in x['value'].items()], []))) # write the message's fields to a Redis Hash named after the message ID
gb.register('mystream')
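Writing a stream message to a Hash requires turning the message's fields into a flat list of field and value arguments. The sketch below shows that step in isolation. It assumes that a stream record is a dict with 'key', 'id' and 'value' entries, where 'value' holds the message's fields (the same layout that the Keyspace Notification Processing example on this page reads); `hmset_args` and the sample record are hypothetical.

```python
from itertools import chain

def hmset_args(record):
    """Build HMSET arguments: the Hash key name followed by field/value pairs."""
    pairs = chain.from_iterable(record['value'].items())  # {'f': 'v'} -> 'f', 'v'
    return [record['id'], *pairs]

# Hypothetical stream record, assuming the 'key'/'id'/'value' layout
record = {
    'key': 'mystream',
    'id': '1526919030474-55',
    'value': {'sensor': 'a1', 'temp': '21'},
}
args = hmset_args(record)
```

Here `args` is `['1526919030474-55', 'sensor', 'a1', 'temp', '21']`, which could be passed on as `execute('HMSET', *args)` inside a Gears function.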
Automatic Expiry
Sets the time to live (TTL) for every updated key to one hour.
Author: RedisLabs
Python API
gb = GB()
gb.foreach(lambda x: execute('EXPIRE', x['key'], 3600))
gb.register('*', mode='sync', readValue=False)
Keyspace Notification Processing
This example demonstrates a two-step process that:
- Synchronously captures distributed keyspace events
- Asynchronously processes the events' stream
Specifically, the example shows how expired key names can be output to the log.
Author: RedisLabs
Python API
def process(x):
    '''
    Processes a message from the local expiration stream.
    Note: in this example we simply print to the log, but feel free to replace
    this logic with your own, e.g. an HTTP request to a REST API or a call to an
    external data store.
    '''
    log(f"Key '{x['value']['key']}' expired at {x['id'].split('-')[0]}")

# Captures an expiration event and adds it to the shard's local 'expired' stream
cap = GB('KeysReader')
cap.foreach(lambda x:
            execute('XADD', f'expired:{hashtag()}', '*', 'key', x['key']))
cap.register(prefix='*',
             mode='sync',
             eventTypes=['expired'],
             readValue=False)

# Consumes new messages from the expiration streams and processes them
proc = GB('StreamReader')
proc.foreach(process)
proc.register(prefix='expired:*',
              batch=100,
              duration=1)
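The timestamp that `process` logs is the first part of the stream message ID. For IDs generated by `XADD` with `*`, that part is a Unix time in milliseconds, so it can be turned into a readable date. The following is a small standalone sketch of that conversion; `expiry_time` and the sample record are hypothetical, and the record layout is assumed to match the one used by `process`.

```python
from datetime import datetime, timezone

def expiry_time(stream_id):
    """Convert the millisecond part of a '<ms>-<seq>' stream ID to a UTC datetime."""
    ms = int(stream_id.split('-')[0])
    return datetime.fromtimestamp(ms / 1000, tz=timezone.utc)

# Hypothetical record, shaped like the ones process() receives
record = {'id': '1609459200000-0', 'value': {'key': 'session:42'}}
when = expiry_time(record['id'])
message = f"Key '{record['value']['key']}' expired at {when.isoformat()}"
```

Note that the ID reflects the time at which the message was added to the stream, which in this example is the moment the expiration event was captured.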
Distributed Monte Carlo Estimation of Pi's Value
Estimate Pi by throwing darts at a carefully-constructed dartboard.
There are far better ways to get Pi's value
This example is intended for educational purposes only. For all practical purposes, you'd be better off using the constant value of 3.14159265359.
Author: RedisLabs
Python API
TOTAL_DARTS = 1000000 # total number of darts

def inside(p):
    ''' Generates a random point and returns True if it falls inside the circle '''
    from random import random
    x, y = random(), random()
    return x*x + y*y < 1

def throws():
    ''' Calculates each shard's number of throws '''
    throws = TOTAL_DARTS
    ci = execute('RG.INFOCLUSTER')
    if not isinstance(ci, str):                # assume a cluster
        n = len(ci[2])                         # number of shards
        me = ci[1]                             # my shard's ID
        ids = sorted(x[1] for x in ci[2])      # shards' IDs list
        i = ids.index(me)                      # my index
        throws = TOTAL_DARTS // n              # minimum throws per shard
        if i == 0:                             # first shard gets the remainder
            throws += TOTAL_DARTS % n
    yield throws

def estimate(hits):
    ''' Estimates Pi's value from hits '''
    from math import log10
    hits = hits * 4                            # one quadrant is used
    r = hits / 10 ** int(log10(hits))          # scale into [1, 10); relies on TOTAL_DARTS being a power of 10
    return f"Pi's estimated value is {r}"

gb = GB('PythonReader')
gb.flatmap(lambda x: [i for i in range(int(x))])  # throw the local darts
gb.filter(inside)                                 # throw out missed darts
gb.accumulate(lambda a, x: 1 + (a if a else 0))   # count the remaining darts
gb.collect()                                      # collect the results
gb.accumulate(lambda a, x: x + (a if a else 0))   # merge darts' counts
gb.map(estimate)                                  # four pieces of pie
gb.run(throws)
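The same estimation can be tried outside of Redis to see how the darts are divided and merged. The sketch below is a single-process approximation, not the RedisGears implementation: the shards are simulated by a list of per-shard throw counts, the helper names (`split_throws`, `count_hits`, `estimate_pi`) are hypothetical, and the estimate divides by the total number of darts instead of rescaling by a power of 10.

```python
import random

def split_throws(total, shards):
    """Divide the darts evenly; the first shard also takes the remainder."""
    base, rem = divmod(total, shards)
    return [base + (rem if i == 0 else 0) for i in range(shards)]

def count_hits(throws, rng):
    """Throw darts at the unit square and count those inside the quarter circle."""
    return sum(1 for _ in range(throws)
               if rng.random() ** 2 + rng.random() ** 2 < 1)

def estimate_pi(total, shards, seed=0):
    """Sum the per-shard hit counts and scale by 4 (one quadrant is used)."""
    rng = random.Random(seed)  # seeded so that runs are repeatable
    hits = sum(count_hits(t, rng) for t in split_throws(total, shards))
    return 4 * hits / total

shares = split_throws(1_000_003, 3)
pi_estimate = estimate_pi(200_000, 3)
```

With three simulated shards, `shares` is `[333335, 333334, 333334]`, so no dart is lost to integer division. The estimate converges slowly: the error shrinks roughly with the square root of the number of darts.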