RedisGears Readers
The reader is the mandatory first step of any RedisGears function, and every function has exactly one reader. A reader reads data and generates input records from it. The input records are consumed by the function.
A function's reader is declared when initializing its `GearsBuilder` context builder.
RedisGears supports several types of readers that operate on different types of input data. Furthermore, each reader may be used to process batch data, streaming (event) data, or both.
Reader | Output | Batch | Event |
---|---|---|---|
KeysReader | Redis keys and values | Yes | Yes |
KeysOnlyReader | Redis keys | Yes | No |
StreamReader | Redis Stream messages | Yes | Yes |
PythonReader | Arbitrary | Yes | No |
ShardsIDReader | Shard ID | Yes | No |
CommandReader | Command arguments | No | Yes |
The following sections describe the different readers' operation.
KeysReader
The KeysReader scans the Redis database.
It generates records from keys and their respective values.
Input
The reader scans the entire database and any keys that are found can be used as input for generating records.
Output
A record is output for each input key. The record is a dictionary that has four keys and their respective values:
- 'key': the name of the key
- 'value': the value of the key (`None` if the key was deleted)
- 'type': the core Redis type, which may be 'string', 'hash', 'list', 'set', 'zset' or 'stream'
- 'event': the event that triggered the execution (`None` if the execution was created via the `run` function)
Batch Mode
The reader scans the entire database for keys. For each key found, it first reads the key's name, then fetches its value (unless the `readValue=False` argument is used) and finally generates a record.
Its operation can be controlled by the following means:
- Glob-like pattern: generates records only for key names that match the pattern
- Read value: a Boolean specifying whether the value is read or not
- Use scanning: a Boolean specifying whether to scan and match the pattern or use it as an explicit key name
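To make the batch-mode controls concrete, here is a plain-Python model of how they shape the records. It is a sketch for illustration, not RedisGears code: the in-memory `DB` dictionary and the `keys_reader_batch` function are hypothetical, `fnmatch.fnmatchcase` only approximates Redis' glob syntax, and skipping a missing key under `noScan=True` is an assumption.

```python
from fnmatch import fnmatchcase

# Hypothetical in-memory "database": key name -> (redis_type, value)
DB = {
    "foo:1": ("string", "hello"),
    "foo:2": ("hash", {"f": "v"}),
    "bar:1": ("list", ["a", "b"]),
}

def keys_reader_batch(db, pattern="*", noScan=False, readValue=True):
    """Yield KeysReader-style records for keys matching a glob pattern (model only)."""
    if noScan:
        # The pattern is an explicit key name: no scanning, no glob matching.
        # Assumption: a key that does not exist produces no record.
        names = [pattern] if pattern in db else []
    else:
        names = [k for k in db if fnmatchcase(k, pattern)]
    for name in names:
        redis_type, value = db[name]
        yield {
            "key": name,
            # With readValue=False, both 'value' and 'type' are set to None.
            "value": value if readValue else None,
            "type": redis_type if readValue else None,
            # Batch executions are created via run(), so there is no event.
            "event": None,
        }

for record in keys_reader_batch(DB, "foo*"):
    print(record)
```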
Event Mode
The reader is executed in response to events that are generated from write operations in the Redis database.
Its operation can be controlled with the following means:
- Prefix: generates records only for key names that start with the prefix
- Events: generates records only for whitelisted events
- Types: generates records only for whitelisted data types
- Read value: a Boolean specifying whether the value is read or not
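The event-mode filters can be modelled the same way. The sketch below is hypothetical (`keys_reader_should_trigger` is not a RedisGears function) and assumes that a trailing `*` in the prefix acts as a wildcard while the rest is matched as a plain string prefix, and that a `None` whitelist accepts everything.

```python
def keys_reader_should_trigger(key, event, key_type, prefix="*",
                               eventTypes=None, keyTypes=None):
    """Decide whether a write event would produce a KeysReader record (model only).

    Assumptions: a trailing '*' in prefix is a wildcard and the remainder is a
    plain string prefix; None for eventTypes/keyTypes means "no whitelist".
    """
    stem = prefix[:-1] if prefix.endswith("*") else prefix
    if not key.startswith(stem):
        return False
    if eventTypes is not None and event not in eventTypes:
        return False
    if keyTypes is not None and key_type not in keyTypes:
        return False
    return True

# A 'set' on a string key prefixed by 'bar' passes a string/hash whitelist.
print(keys_reader_should_trigger("bar:1", "set", "string",
                                 prefix="bar", keyTypes=["string", "hash"]))
```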
Python API
Batch Mode
class GearsBuilder('KeysReader', defaultArg='*').run(noScan=False, readValue=True)
Arguments
- defaultArg: a glob-like pattern of key names
- noScan: when `True` the pattern is used as an explicit key name
- readValue: when `False` the value will not be read, so the 'type' and 'value' of the record will be set to `None`
Event Mode
class GearsBuilder('KeysReader').register(prefix='*', eventTypes=None, keyTypes=None, readValue=True)
Arguments
- prefix: a prefix of key names
- eventTypes: a whitelist of event types that trigger execution when the `KeysReader` is used. The list may contain one or more of the following:
  - Any Redis or module command
  - Any Redis event
- keyTypes: a whitelist of key types that trigger execution when using the `KeysReader` or `KeysOnlyReader` readers. The list may contain one or more of the following:
  - Redis core types: 'string', 'hash', 'list', 'set', 'zset' or 'stream'
  - Redis module types: 'module'
- readValue: when `False` the value will not be read, so the 'type' and 'value' of the record will be set to `None`
Return
The output record's type is a `dict`.
The value is cast from the Redis type as follows:

Redis type | Python type |
---|---|
string | `str` |
hash | `dict` |
list | `list` |
set | `None` |
zset | `None` |
stream | `None` |
module | `None` |
Examples
Batch Mode
```python
# KeysReader will return any key matching the glob pattern 'foo*'
gb = GB('KeysReader')
gb.run('foo*')
```
Event Mode
```python
# KeysReader will trigger for keys prefixed by 'bar' of type 'string' or 'hash'
gb = GB('KeysReader')
gb.register('bar', keyTypes=['string', 'hash'])
```
KeysOnlyReader
The `KeysOnlyReader` is implemented as a `PythonReader` and therefore does not support the `register()` action. It returns keys' names as string records.
Input
The reader scans the entire database and any keys that are found can be used as input for generating records.
Output
A record is output for each input key. The record is a simple string that is the key's name.
Batch Mode
The reader scans the entire database for keys. For each key found, it returns the key as a string record.
Its operation can be controlled by the following means:
- Glob-like pattern: generates records only for key names that match the pattern
- Scan count: the amount of effort each scan iteration will invest
- Use scanning: a Boolean specifying whether to scan and match the pattern or use it as an explicit key name
- Shard-specific pattern: customizes the pattern for each shard
Event Mode
Not supported.
Python API
Batch Mode
class GearsBuilder('KeysOnlyReader').run(pattern='*', count=1000, noScan=False, patternGenerator=None)
Arguments
- pattern: a glob-like pattern of key names
- count: the `COUNT` argument to the Redis `SCAN` command
- noScan: when `True` the pattern is used as an explicit key name
- patternGenerator: a callback that returns a shard-specific pattern. If provided, it overrides the `pattern` and `noScan` arguments. The callback is called by each shard and is expected to return a tuple with two items: the `pattern` string and the `noScan` value.
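A plain-Python model of these arguments is sketched below, reusing the same glob approximation (`fnmatch.fnmatchcase`) and treating `patternGenerator` as a zero-argument callable that returns `(pattern, noScan)`; that calling convention is an assumption. The `keys_only_reader` and `hypothetical_generator` names are illustrative only, and `count` is left out because it tunes the effort of each `SCAN` iteration rather than which keys are returned.

```python
from fnmatch import fnmatchcase

def keys_only_reader(keys, pattern="*", noScan=False, patternGenerator=None):
    """Return key names as plain string records (a KeysOnlyReader model)."""
    if patternGenerator is not None:
        # The callback overrides both arguments. In RedisGears each shard
        # calls it; in this single-process model it is called once.
        pattern, noScan = patternGenerator()
    if noScan:
        # Explicit key name: return it only if it exists.
        return [pattern] if pattern in keys else []
    return [k for k in keys if fnmatchcase(k, pattern)]

def hypothetical_generator():
    # Hypothetical shard-specific choice: read one explicit key on this shard.
    return ("user:42", True)

keys = ["user:1", "user:42", "order:7"]
print(keys_only_reader(keys, "user:*"))
print(keys_only_reader(keys, patternGenerator=hypothetical_generator))
```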
StreamReader
The StreamReader reads the messages from one or more Redis Streams and generates records from these.
Input
The reader reads messages from the Stream value of a Redis key.
Output
A record is output for each message in the input Stream. The record is a dictionary with the following keys:
- key: the Stream key name as a string
- id: the message's ID in the Stream
- value: a Python dictionary containing the message's data
All field-value pairs in the message's data are included as key-value pairs in the record under the value key.
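The record layout can be illustrated with a small hypothetical helper, `stream_records`, that converts `(id, fields)` entries (similar in shape to an `XRANGE` reply) into StreamReader-style dictionaries. It only models the output format; it does not talk to Redis.

```python
def stream_records(key, messages):
    """Turn (id, fields) Stream entries into StreamReader-style records (model only).

    `fields` may be a dict or a sequence of (field, value) pairs.
    """
    for msg_id, fields in messages:
        yield {
            "key": key,             # the Stream key name
            "id": msg_id,           # the message's ID in the Stream
            "value": dict(fields),  # every field-value pair of the message
        }

entries = [("1-0", [("temp", "21")]), ("2-0", {"temp": "22", "unit": "C"})]
for rec in stream_records("sensors", entries):
    print(rec)
```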
Batch Mode
The reader reads the Stream from the beginning to the last message in it. Each message generates a record.
Its operation can be controlled with the following:
- Key name: the name of the key storing the Stream
- Start ID: message ID from which to start reading messages
Event Mode
The reader is executed in response to events generated by new messages added to the stream.
Its operation can be controlled with the following:
- Key name or prefix: the name or prefix of keys that store Streams
- Batch size: the number of new messages that trigger execution
- Duration: the time to wait before execution is triggered, regardless of the batch size
- Failure policy: the policy for handling execution failures. May be one of:
  - 'continue': ignores a failure and continues to the next execution. This is the default policy.
  - 'abort': stops further executions.
  - 'retry': retries the execution after an interval specified with `onFailedRetryInterval` (default is one second).
- Trimming: whether or not to trim the stream
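One way to read the interaction of batch size and duration is as a simple predicate. The sketch below is an assumption-based model, not RedisGears' scheduler: `stream_batch_ready` is hypothetical, and `duration` is assumed to be in milliseconds with 0 disabling the time-based trigger.

```python
def stream_batch_ready(pending, elapsed_ms, batch=1, duration=0):
    """Model of when a registered StreamReader would fire (assumption-based).

    Fires once `pending` new messages reach `batch`, or, when duration > 0,
    once `duration` ms have elapsed with at least one message pending.
    """
    if pending >= batch:
        return True
    return duration > 0 and pending > 0 and elapsed_ms >= duration

# batch=5: three messages are not enough unless the duration has elapsed.
print(stream_batch_ready(3, 100, batch=5, duration=500))
print(stream_batch_ready(3, 500, batch=5, duration=500))
```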
Python API
Batch Mode
class GearsBuilder('StreamReader', defaultArg='*').run(fromId='0-0')
Arguments
- defaultArg : the name or prefix of keys that store Streams
- fromId: the message ID from which to start reading messages
Event Mode
class GearsBuilder('StreamReader').register(prefix='*', batch=1, duration=0, onFailedPolicy='continue', onFailedRetryInterval=1, trimStream=True)
Arguments
- prefix : the name or prefix of keys that store Streams
- batch : the number of new messages that trigger execution
- duration : the time to wait before execution is triggered, regardless of the batch size (0 for no duration)
- onFailedPolicy: the policy for handling execution failures; values should be as described above
- onFailedRetryInterval: the interval (in seconds) after which to retry in case onFailedPolicy is 'retry'
- trimStream: when `True` the stream will be trimmed after execution
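The three failure policies can be sketched as a loop over pending executions. This is a hypothetical model: `run_with_policy` and its `max_retries` cap are inventions for illustration (the policy description does not say whether RedisGears bounds retries), and the retry interval is handed to an injectable `sleep` callable in seconds, matching the one-second default described above.

```python
import time

def run_with_policy(execute, batches, onFailedPolicy="continue",
                    onFailedRetryInterval=1, max_retries=3, sleep=time.sleep):
    """Apply a StreamReader-style failure policy to a series of executions.

    Returns the list of batches that executed successfully (model only).
    """
    done = []
    for batch in batches:
        attempts = 0
        while True:
            try:
                execute(batch)
                done.append(batch)
                break
            except Exception:
                if onFailedPolicy == "continue":
                    break          # skip this batch and move to the next one
                if onFailedPolicy == "abort":
                    return done    # stop all further executions
                # 'retry': wait, then run the same batch again.
                attempts += 1
                if attempts > max_retries:  # hypothetical safety cap
                    raise
                sleep(onFailedRetryInterval)
    return done

def flaky(batch):
    # Hypothetical execution that always fails on batch 2.
    if batch == 2:
        raise RuntimeError("boom")

print(run_with_policy(flaky, [1, 2, 3], "continue"))  # prints [1, 3]
print(run_with_policy(flaky, [1, 2, 3], "abort"))     # prints [1]
```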
Return
The output record's type is a `dict` with fields and values as explained above.
PythonReader
The reader is executed with a function callback that is a Python generator.
Input
A generator function callback.
Output
Anything `yield`ed by the generator function callback.
Batch Mode
The reader iterates the generator's yielded records.
Event Mode
Not supported.
Python API
Batch Mode
class GearsBuilder('PythonReader').run(generator)
Arguments
- generator : the generator function callback
Examples
The following example shows how to use the reader with a simple generator:
```python
# This PythonReader will generate 6379 records
def generator():
    for x in range(6379):
        yield x

gb = GB('PythonReader')
gb.run(generator)
```
In cases where the generator needs additional input arguments, use a function callback that returns a generator function like so:
```python
# This PythonReader will generate 42 records by creating a generator function
def createGenerator(count=6379):
    def generator():
        # The closure only reads count, so no nonlocal declaration is needed
        for x in range(count):
            yield x
    return generator

gb = GB('PythonReader')
gb.run(createGenerator(count=42))
```
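Because the reader iterates whatever the callback's generator yields, a generator factory can be checked in plain Python before it is sent to RedisGears. `python_reader` below is a hypothetical stand-in for that iteration, not the reader's real implementation.

```python
def python_reader(callback):
    """Hypothetical stand-in for the PythonReader: call the callback and
    collect every value the resulting generator yields as a record."""
    return list(callback())

def createGenerator(count=6379):
    def generator():
        for x in range(count):
            yield x
    return generator

records = python_reader(createGenerator(count=42))
print(len(records))  # prints 42
```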
ShardsIDReader
The reader returns a single record that is the shard's cluster identifier.
RedisGears Trivia
The 'ShardsIDReader' is implemented using the 'PythonReader' reader and this generator callback:
```python
def ShardReaderCallback():
    res = execute('RG.INFOCLUSTER')
    if res == 'no cluster mode':
        yield '1'
    else:
        yield res[1]
```
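The trivia callback can be exercised outside RedisGears by passing `execute` in as a parameter. In this sketch, `shard_reader_callback` is a hypothetical adaptation, and the clustered `RG.INFOCLUSTER` reply in the usage line is invented; only the position of the shard ID (index 1) follows the callback above.

```python
def shard_reader_callback(execute):
    """The trivia callback with `execute` injected so it runs in plain Python."""
    res = execute('RG.INFOCLUSTER')
    if res == 'no cluster mode':
        yield '1'
    else:
        # Assumes the reply is a sequence whose second item is the shard ID.
        yield res[1]

# Standalone mode: RG.INFOCLUSTER reports that there is no cluster.
print(list(shard_reader_callback(lambda cmd: 'no cluster mode')))  # prints ['1']
# Hypothetical clustered reply, invented for illustration.
print(list(shard_reader_callback(lambda cmd: ['MyId', 'abc123'])))  # prints ['abc123']
```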
Output
The shard's cluster identifier.
Batch Mode
The reader returns a single record that is the shard's identifier.
Event Mode
Not supported.
Python API
Batch Mode
class GearsBuilder('ShardsIDReader').run()
Arguments
None.
Examples
```python
gb = GB('ShardsIDReader')
gb.map(lambda x: f'My Shard ID is {x}')
gb.run()
```
CommandReader
The `CommandReader` reads the trigger name and arguments sent by an `RG.TRIGGER` command.
Input
The arguments of the `RG.TRIGGER` command.
Output
The trigger's name and arguments.
Batch Mode
Not supported.
Event Mode
The reader returns a list record with elements being the trigger's name followed by any arguments that were provided.
Python API
Event Mode
class GearsBuilder('CommandReader').register(trigger=None, inorder=True)
Arguments
- trigger: the trigger's name
- inorder: if `True`, the commands will run one after the other in the order they arrived. This option is only relevant for `async_local` executions: `sync` executions already provide this ordering by default, and it is currently not possible to guarantee it for `async` executions. Supported only on v1.0.7 and above.
Examples
First, call `RG.PYEXECUTE` with the following:
```python
# This CommandReader will return the number of arguments provided to it
gb = GB('CommandReader')
gb.map(lambda x: f'{x[0]} got {len(x)-1} arguments! Ah ah ah!')
gb.register(trigger='CountVonCount')
```
Then, you can do this:
```
redis> RG.TRIGGER CountVonCount bat bat
1) "CountVonCount got 2 arguments! Ah ah ah!"
```
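Since the `map` step is ordinary Python, it can be checked against a hand-built record shaped as described above (trigger name first, then the arguments) without a Redis server. `count_von_count` is simply the example's lambda given a name for this purpose.

```python
def count_von_count(x):
    # x is a CommandReader record: [trigger name, arg1, arg2, ...]
    return f'{x[0]} got {len(x)-1} arguments! Ah ah ah!'

# Mirrors: RG.TRIGGER CountVonCount bat bat
print(count_von_count(['CountVonCount', 'bat', 'bat']))
# prints "CountVonCount got 2 arguments! Ah ah ah!"
```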