在ApacheBeam RedisIO中,我们使用PFADD方法将几个值添加到集合中。但是,我们无法通过在写入时设置有效期来限制集合中添加的值的生存期。
以下是使用Redis命令SET和EXPIRE显式设置有效期的示例代码:
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.transforms import combiners
from apache_beam import coders
class ExpiringSetFn(beam.DoFn):
def __init__(self, expiration_sec):
self.expiration_sec = expiration_sec
def process(self, element):
for (key, value) in element.items():
redis_client = redis.StrictRedis(host='localhost', port=6379, db=0)
redis_client.set(key, value, ex=self.expiration_sec)
class RedisWriteSetWithExpiration(beam.PTransform):
def __init__(self, expiration_sec):
self.expiration_sec = expiration_sec
def expand(self, pcoll):
return (pcoll
| beam.ParDo(ExpiringSetFn(self.expiration_sec)))
pipeline_options = PipelineOptions()
if __name__ == '__main__':
p = beam.Pipeline(options=pipeline_options)
set_data = {
'my_set': set(['hello', 'world']),
'my_other_set': set(['goodbye', 'world'])
}
p | beam.Create([set_data]) | RedisWriteSetWithExpiration(expiration_sec=10)
p.run()
注意,在这个例子中,我们使用了一个用户定义的DoFn,来将每个元素解包为键、值对,并将其写入Redis。expiration_sec
参数控制了Redis设置的过期时间。在这种情况下,每个键/值对都将被设置为10秒的过期时间。
如果您想使用更原生的PFADD方法,您可以考虑周期性地扫描并从集合中删除过期的元素。这可以通过使用Redis中的SortedSet来实现,其中分值就是过期时间戳