RedisGears Functions Runtime

Python RedisGears functions run in an embedded Python interpreter. Each function uses a separate sub-interpreter, but all functions share the same environment and dependencies. The environment comes with several names imported by default.

The following sections describe the runtime environment.

Python Interpreter

RedisGears embeds a Python version 3.7.2+ interpreter.

All functions use this interpreter. Each call to RG.PYEXECUTE maintains its own globals dictionary that isolates its execution context from other calls. This means that all of the functions submitted in a given call share the same interpreter and globals dictionary.
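
The effect of a per-call globals dictionary can be illustrated with plain Python. The sketch below is only an analogy built on the standard exec() function and is not how RedisGears implements RG.PYEXECUTE; the names call_a_globals and call_b_globals are made up for the illustration.

```python
# Plain-Python analogy: each "call" gets its own globals dictionary.
call_a_globals = {}
call_b_globals = {}

# First call defines a variable and a function that reads it.
exec("counter = 1\ndef bump():\n    return counter + 1", call_a_globals)
# Second call defines its own, unrelated 'counter'.
exec("counter = 100", call_b_globals)

# Names defined in the same call share that call's globals...
shared = call_a_globals["bump"]()
# ...while the other call's globals are untouched.
isolated = call_b_globals["counter"]
leaked = "bump" in call_b_globals
```

Here shared is computed from the first call's counter, and the second call never sees bump.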

Environment

The interpreter's environment can be extended with any dependent package that can later be imported and used by functions in their respective sub-interpreters.

GearsBuilder

The GearsBuilder class is imported to the runtime's environment by default.

It exposes the functionality of the function's context builder.

execute

The execute() function is imported to the runtime's environment by default.

This function executes an arbitrary Redis command.

Further reference

For more information about Redis commands, refer to the Redis commands documentation.

Python API

def execute(command, *args)

Arguments

  • command : the command to execute
  • args : the command's arguments

Examples

# Pings the server (reply should be 'PONG')
reply = execute('PING')
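
Because execute() takes the command name followed by its arguments as separate positional values, multi-argument commands need no hand-built command string. The sketch below shows that calling convention. It cannot assume a running RedisGears runtime, so fake_execute is a hypothetical in-memory stand-in that understands only SET, GET and INCRBY, and bump_visits is an illustrative helper. Inside a real function you would pass the built-in execute instead.

```python
def make_fake_execute():
    """Build a hypothetical stand-in for execute() backed by a dict.

    Only SET, GET and INCRBY are handled; this is not the RedisGears API.
    """
    store = {}

    def fake_execute(command, *args):
        name = command.upper()
        if name == 'SET':
            key, value = args
            store[key] = str(value)
            return 'OK'
        if name == 'GET':
            (key,) = args
            return store.get(key)
        if name == 'INCRBY':
            key, amount = args
            store[key] = str(int(store.get(key, '0')) + int(amount))
            return int(store[key])
        raise ValueError('unsupported command: %s' % command)

    return fake_execute


def bump_visits(execute_fn, user, amount=1):
    """Increment a per-user counter; each argument is passed separately."""
    return execute_fn('INCRBY', 'visits:%s' % user, amount)


execute_fn = make_fake_execute()
set_reply = execute_fn('SET', 'greeting', 'hello')
get_reply = execute_fn('GET', 'greeting')
first = bump_visits(execute_fn, 'alice')
second = bump_visits(execute_fn, 'alice', amount=5)
```

Taking the command runner as a parameter keeps the helper testable outside the runtime.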

atomic

The atomic() Python context is imported to the runtime's environment by default.

The context ensures that all operations in it are executed atomically by blocking the main Redis process.

Python API

class atomic()

Examples

# Increments two keys atomically
def transaction(_):
    with atomic():
        execute('INCR', f'{{{hashtag()}}}:foo')
        execute('INCR', f'{{{hashtag()}}}:bar')

gb = GB('ShardsIDReader')
gb.foreach(transaction)
gb.run()

configGet

The configGet() function is imported to the runtime's environment by default.

This function fetches the current value of a RedisGears configuration option.

Python API

def configGet(key)

Arguments

  • key : the configuration option key

Examples

# Gets the current value for 'ProfileExecutions'
foo = configGet('ProfileExecutions')

gearsConfigGet

The gearsConfigGet() function is imported to the runtime's environment by default.

This function fetches the current value of a RedisGears configuration option and returns a default value if that key does not exist.

Python API

def gearsConfigGet(key, default=None)

Arguments

  • key : the configuration option key
  • default : a default value

Examples

# Gets the 'foo' configuration option key and defaults to 'bar'
foo = gearsConfigGet('foo', default='bar')
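
The relationship between the two configuration functions can be sketched as a small wrapper. This is an illustration only: it assumes that a missing key is reported as None, which the text above does not state, and it uses a plain dictionary (fake_config) in place of the real configuration store. The helper name config_get_with_default is hypothetical.

```python
def config_get_with_default(config_get, key, default=None):
    """Return config_get(key), or `default` when the lookup yields None.

    Assumption: a missing option is reported as None.
    """
    value = config_get(key)
    return default if value is None else value


# Hypothetical stand-in for the configuration store
fake_config = {'ProfileExecutions': '0'}

profile = config_get_with_default(fake_config.get, 'ProfileExecutions', default='1')
foo = config_get_with_default(fake_config.get, 'foo', default='bar')
```

An option that exists keeps its stored value, and only the missing one falls back to the default.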

hashtag

The hashtag() function is imported to the runtime's environment by default.

This function returns a hashtag that maps to the lowest hash slot served by the local engine's shard. Put differently, it is useful as a hashtag for partitioning in a cluster.

Python API

def hashtag()

Examples

# Get the shard's hashtag
ht = hashtag()
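
To see why a shared hashtag keeps keys together, it helps to look at how Redis Cluster maps a key to a slot: it hashes only the text inside the first non-empty {...} section with CRC16 and takes the result modulo 16384. The sketch below reimplements that rule in plain Python for illustration and is not RedisGears code. The tag 'foo' is a placeholder for whatever hashtag() would return on a given shard.

```python
import binascii

SLOT_COUNT = 16384  # number of hash slots in a Redis Cluster


def hashed_part(key):
    """Return the substring Redis Cluster hashes for `key`."""
    start = key.find('{')
    if start != -1:
        end = key.find('}', start + 1)
        # Only a non-empty {...} section counts as a hashtag
        if end > start + 1:
            return key[start + 1:end]
    return key


def key_slot(key):
    """CRC16 (XMODEM variant) of the hashed part, modulo the slot count."""
    return binascii.crc_hqx(hashed_part(key).encode(), 0) % SLOT_COUNT


tag = 'foo'  # placeholder; in a function this would come from hashtag()
slot_foo = key_slot('{%s}:foo' % tag)
slot_bar = key_slot('{%s}:bar' % tag)
same_slot = slot_foo == slot_bar
```

Both keys hash only the tag, so they land in the same slot regardless of their suffixes.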

log

The log() function is imported to the runtime's environment by default.

This function prints a message to Redis' log.

Python API

def log(message, level='notice')

Arguments

  • message : the message to output
  • level : the message's log level, one of:
    • 'debug'
    • 'verbose'
    • 'notice'
    • 'warning'

Examples

# Dumps every datum in the DB to the log for "debug" purposes
GB().foreach(lambda x: log(str(x), level='debug')).run()

gearsFuture

The gearsFuture() function is imported to the runtime's environment by default.

This function returns a gearsFuture object, which allows another thread or process to process the record. Returning this object from a step's operation tells RedisGears to suspend execution until the background processing has finished or failed.

The gearsFuture object provides two control methods: continueRun() and continueFailed(). Both methods are thread-safe and can be called at any time to signal that the background processing has finished. continueRun() signals success and its argument is a record for the main process. continueFailed() reports a failure to the main process and its argument is a string describing the failure.

Calling gearsFuture() is supported only from the context of the following operations:

  • map
  • flatmap
  • filter
  • foreach
  • aggregate
  • aggregateby

An attempt to create a gearsFuture object outside of the supported contexts will result in an exception.

Examples

import time
blocked = {}

# Example of blocking a client until a key has expired

def block(r):
    f = gearsFuture()
    key = r[1]
    if key not in blocked:
        blocked[key] = []
    blocked[key].append(f)
    return f

def unblock(r):
    res = 0
    key = r['key']
    # pop() removes the key's futures so each one is resumed only once
    futures = blocked.pop(key, None)
    if futures:
        res = len([f.continueRun('%s expired' % key) for f in futures])
    return res

GB('CommandReader').map(block).register(trigger='WaitForKeyExpiration', mode='sync')
GB().map(unblock).register(eventTypes=['expired'], mode='sync')
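
The hand-off to a background thread described above can be sketched outside the runtime as well. In the example below, FakeFuture is a hypothetical stand-in that mimics only the continueRun() and continueFailed() methods named in this section, and process_in_background is an illustrative helper. Neither is part of RedisGears.

```python
import threading


class FakeFuture:
    """Hypothetical stand-in for the object returned by gearsFuture()."""

    def __init__(self):
        self.result = None
        self.error = None

    def continueRun(self, record):
        # Success: hand the processed record back
        self.result = record

    def continueFailed(self, message):
        # Failure: hand back a string describing what went wrong
        self.error = message


def process_in_background(record, future):
    """Process `record` on another thread, then signal the future."""
    def worker():
        try:
            future.continueRun(str(record).upper())
        except Exception as exc:
            future.continueFailed(str(exc))

    thread = threading.Thread(target=worker)
    thread.start()
    return thread


fut = FakeFuture()
worker_thread = process_in_background('hello', fut)
worker_thread.join()  # the stand-in has no suspend logic, so wait here
```

Inside a RedisGears step, the same shape would be to create the object with gearsFuture(), pass it to the worker, and return it from the step so that execution is suspended until one of the two methods is called.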

gearsFuture with Python Async Await

gearsFuture is also integrated seamlessly with Python's async/await syntax, so it is possible to do the following:

import asyncio

async def c(r):
    await asyncio.sleep(1)
    return r

GB('ShardsIDReader').map(c).run()