日韩久久久精品,亚洲精品久久久久久久久久久,亚洲欧美一区二区三区国产精品 ,一区二区福利

使用Python操作Redis5.0新特性Stream實現訂閱發布功能

系統 2250 0

  本文完整代碼下載:github鏈接

  目前在做的工作有一部門是搭建一個可供公司內部使用的推送平臺,用的中間件是redis,于是就自然的想用redis5.0版本的新特性來實現這個功能,網上的demo比較少,且大多是終端操作的命令行,寫了一個Python的類和大家分享。

在介紹具體實現之前,先大致介紹一下背景。

在Redis5.0版本發布之前,redis也有一個發布、訂閱功能,但功能非常簡單,只能單純的發布和訂閱,適合在即時通信里使用。缺點非常多:

  1. 消息沒有持久化的機制。在Pub/Sub模型中,消費者是和連接(Connection)綁定的,當消費者的連接斷掉(網絡原因或者消費者進程crash)后,再次重連,那么Channel中的消息將永久消失(對于該消費者而言),也就是說Pub/Sub模型缺少消息回溯的機制

  2. 消費消息的速度和消費者的數量成反比。在Redis的實現中,Redis會把Channel中的消息逐個(Linear)推送給每個消費者,因此當消費者的數量達到一定規模時,服務器的性能將線性下降,因此每個消費者獲取到消息的延遲也線性增長

  3. 當生產者產生消息的速度遠大于消費者的消費能力的時候(此時可以簡單地理解為消息積壓),消費者會被強制斷開連接,因此會造成消息的丟失,這個特性可以詳見redis的配置

  4. 對頻道的消費者信息沒有展現接口。 在我們的項目里需要管理每一個頻道的訂閱者,雖然redis本身有記錄,但是并沒有提供API可以訪問。

Redis5.0最大的新特性就是多出了一個數據結構Stream,它是一個新的強大的支持多播的可持久化的消息隊列,設計和Kafka非常相似。

  1. Redis Stream有一個消息鏈表,將所有加入的消息都串起來,每個消息都有一個唯一的ID和對應的內容。消息是持久化的,Redis重啟后,內容還在。
  2. 每個消費組都有一個Stream內唯一的名稱,消費組不會自動創建,它需要單獨的指令xgroup create進行創建,需要指定從Stream的某個消息ID開始消費,這個ID用來初始化last_delivered_id變量。
  3. 每個Stream都可以掛多個消費組,每個消費組會有個游標last_delivered_id在Stream數組之上往前移動,表示當前消費組已經消費到哪條消息了。
  4. 同一個消費組(Consumer Group)可以掛接多個消費者(Consumer),這些消費者之間是競爭關系,任意一個消費者讀取了消息都會使游標last_delivered_id往前移動。每個消費者者有一個組內唯一名稱。【但是每個消費者并沒有消費到哪條?消息的單獨記錄,所以后續我隊列的消費者就是一個只含有一個消費者的消費組,這樣可以方便記錄更多信息】
  5. 消費者(Consumer)內部會有個狀態變量pending_ids,它記錄了當前已經被客戶端讀取的消息,但是還沒有ack。如果客戶端沒有ack,這個變量里面的消息ID會越來越多,一旦某個消息被ack,它就開始減少。這個pending_ids變量用來確保客戶端至少消費了消息一次,而不會在網絡傳輸的中途丟失了沒處理。

 具體實現:


            
              class SubRedis(object):

? ? def __init__(self):
? ? ? ? if not hasattr(SubRedis, 'pool'):
? ? ? ? ? ? SubRedis.getRedisCoon() ?#創建redis連接池
? ? ? ? self._coon = redis.Redis(connection_pool=SubRedis.pool)

? ? @staticmethod
? ? def getRedisCoon():
? ? ? ? SubRedis.pool = redis.ConnectionPool(host=redisInfo['SubRedisAddress'],password=redisInfo['SubRedisPassword'],port=redisInfo['SubRedisPort'],db=redisInfo['db'])

    #返回一個channel的具體信息: 訂閱者數量,最后送達的msg的ID...
    def channel_info(self,channel):
        return self._coon.xinfo_stream(channel)

    #返回一個channel的具體訂閱群組的信息(這里是返回訂閱者,因為每一個群組里只有一個消費者)
    def channel_consumers_info(self,channel):
        InfoList = self._coon.xinfo_groups(channel)
        for GroupDict in InfoList:
            GroupDict.pop("consumers")
        return InfoList

    #創建消費者
    def create_consumer_group(self,name,channel):
        ret = self._coon.xgroup_create(channel,name,id="$")
        if ret == True:
            print self.channel_consumers_info(channel)
        else:
            logging.error("create consumer %s fill,ret %s" %(name,ret))
    
    #往某一個channel發送消息
    def publish(self,channel,msg):
        msgid = self._coon.xadd(channel,msg)
        return msgid
    def consumer_already_subscribed(self,channel,consumer):
        channel_consumers_infolist = self.channel_consumers_info(channel)
        for consumer_dict in channel_consumers_infolist:
            if consumer in consumer_dict.values():
                logging.warning("consumer %s has already subscribed %s" % (consumer, channel))
                return True
        return False
    
    #已經存在的訂閱者訂閱新頻道
    def subscribe(self,name,channel):
        if(self.consumer_already_subscribed(channel,consumer)):
            return False
        self.create_consumer_group(name,channel)
        print "%s subscribe %s success,channel %s info:"%(name,channel,channel),self.channel_consumers_info(channel)
        return
    
    #監聽并寫入新消息到文件
    def listen_channel(self,channel,consumer,file):
        if not (self.consumer_already_subscribed(channel,consumer)):
            return False
        mess = self._coon.xreadgroup(consumer,consumer,{channel:">"})
        if mess != []:
            msg_list = mess[0][1]
            for msg in msg_list:
                id, content = msg[0], msg[1]
                content["msgid"] = id
                json_content = json.dumps(content)
                json_content += ","
                print ("new message: ",content)
                with open(file, "a") as f:
                    f.write(json_content)
                self._coon.xack(channel, consumer, id)
            
          

?


更多文章、技術交流、商務合作、聯系博主

微信掃碼或搜索:z360901061

微信掃一掃加我為好友

QQ號聯系: 360901061

您的支持是博主寫作最大的動力,如果您喜歡我的文章,感覺我的文章對您有幫助,請用微信掃描下面二維碼支持博主2元、5元、10元、20元等您想捐的金額吧,狠狠點擊下面給點支持吧,站長非常感激您!手機微信長按不能支付解決辦法:請將微信支付二維碼保存到相冊,切換到微信,然后點擊微信右上角掃一掃功能,選擇支付二維碼完成支付。

【本文對您有幫助就好】

您的支持是博主寫作最大的動力,如果您喜歡我的文章,感覺我的文章對您有幫助,請用微信掃描上面二維碼支持博主2元、5元、10元、自定義金額等您想捐的金額吧,站長會非常 感謝您的哦!!!

發表我的評論
最新評論 總共0條評論
主站蜘蛛池模板: 池州市| 渭南市| 石家庄市| 富顺县| 化隆| 五河县| 宝丰县| 新蔡县| 武强县| 东源县| 恩施市| 页游| 普兰县| 万载县| 阿荣旗| 桂林市| 石柱| 永定县| 柞水县| 宁波市| 名山县| 定边县| 湖南省| 阜宁县| 襄樊市| 景泰县| 芜湖县| 界首市| 文昌市| 舞阳县| 临海市| 浮梁县| 巴东县| 乐清市| 股票| 五峰| 迁西县| 城口县| 平陆县| 若羌县| 高唐县|