tft每日頭條

 > 生活

 > 用實時接口推送曆史數據

用實時接口推送曆史數據

生活 更新时间:2025-08-12 23:39:48

文|吳國峰|得物技術

前言:

得物消息中心每天推送數億消息給得物用戶,每天引導數百萬的有效用戶點擊,為得物App提供了強大,高效且低成本的用戶觸達渠道。這麼龐大的系統,如何去監控系統的穩定性,保證故障盡早發現,及時響應至關重要。為此,我們搭建了得物消息中心SLA體系,相關架構如圖:

用實時接口推送曆史數據(日均數億推送穩定性監控實踐)1

本文主要介紹我們如何實現SLA監控體系,并一步步重構優化的,作為過去工作的經驗總結分享給大家。

1. 背景

得物消息中心主要承接上遊各業務的消息推送請求,如營銷推送、商品推廣、訂單信息等。消息中心接受業務請求後,會根據業務需求去執行【消息内容檢驗,防疲勞,防重複,用戶信息查詢,廠商推送】等節點,最後再通過各手機廠商及得物自研的在線推送通道觸達用戶。整體推送流程簡化如下:

用實時接口推送曆史數據(日均數億推送穩定性監控實踐)2

我們希望能夠對各個節點提供準确的SLA監控指标和告警能力,從而實現對整體系統的穩定性保證。下面是我們設計的部分指标:

監控指标

  • 節點推送量
  • 節點推送耗時
  • 節點耗時達标率
  • 整體耗時達标率
  • 節點阻塞量
  • 其他指标

告警能力

  • 節點耗時告警
  • 節點阻塞量告警
  • 其他告警能力

那我們如何實現對這些指标的統計呢?最簡單的方案就是在每個節點的入口和出口增加統計代碼,如下:

用實時接口推送曆史數據(日均數億推送穩定性監控實踐)3

這就是我們的 方案0 我們用這種方案快速落地了SLA-節點推送數量統計的開發。但是這個方案的缺點也很明顯,考慮以下幾個問題:

  • 如何實現另一個SLA指标開發?比如節點耗時統計,也需要在每個節點的方法内部增加統計代碼。
  • 如果有新的節點需要做SLA統計,怎麼辦?需要把所有的統計指标在新節點上面再實現一遍。
  • 如何避免SLA統計邏輯異常導緻推送主流程失敗?到處加try{}catch()去捕獲異常嗎?
  • 如何分工?除了節點耗時統計外還有很多其他指标要實現。最簡單的分工方式就是按照SLA指标來分工,各自領幾個指标去開發,問題在于各個指标的統計邏輯都耦合在一起,按照統計指标來分工事實上變的不可能。

項目開發不好分工,通常意味着代碼耦合度過高,是典型的代碼壞味道,需要及時重構。

2. 痛點和目标

從上面的幾個問題出發,我們總結出 方案0 的幾個痛點,以及我們後續重構的目标。

2.1 痛點
  • 監控節點不清晰。消息推送服務涉及多個不同的操作步驟。這些步驟我們稱之為節點。但是這些節點的定義并不明确,隻是我們團隊内部約定俗成的一些概念。這就導緻日常溝通和開發中有很多模糊空間。

在項目開發過程中,經常會碰到長時間的争論找不到解法。原因往往是大家對基礎的概念理解沒有打通,各說各話。這時候要果斷停止争論,首先對基本概念給出一緻的定義,然後再開始讨論。模糊的概念是高效溝通的死敵。

  • 維護困難。

每個節點的統計都需要修改業務節點的代碼,統計代碼分散在整個系統的各個節點維護起來很麻煩;

同時推送流程的主要邏輯被淹沒在統計代碼中。典型的代碼如下,一個業務方法中可能有三分之一是SLA統計的代碼。

protected void doRepeatFilter(MessageDTO messageDTO) { //業務邏輯:防重複過濾 //... //業務邏輯:防重複過濾 if (messageSwitchApi.isOpenPushSla && messageDTO.getPushModelList().stream() .anyMatch(pushModel -> pushModel.getReadType().equals(MessageChannelEnums.PUSH.getChannelCode()))) { messageDTO.setCheckRepeatTime(System.currentTimeMillis()); if (messageDTO.getQueryUserTime() > 0) { long consumeTime = messageDTO.getCheckRepeatTime() - messageDTO.getQueryUserTime(); //SLA耗時統計邏輯 messageMonitorService.monitorPushNodeTimeCost( MessageConstants.MsgTimeConsumeNode.checkRepeatTime.name(), consumeTime, messageDTO); } } }

  • 影響性能

SLA監控邏輯都在推送線程中處理,有些監控統計比較耗時,降低了推送效率。

統計代碼會頻繁的寫Redis緩存,對緩存壓力較大。最好是能把部分數據寫入本地緩存,定時去合并到Redis中。

  • 難以擴展

新的節點需要監控時,沒辦法快速接入,需要到處複制監控邏輯。

新的監控指标要實現的話,需要修改每個業務節點的代碼。

2.2 目标

理清問題之後,針對系統既有的缺陷,我們提出了以下的重構目标:

  • 主流程的歸主流程,SLA 的歸 SLA。

SLA 監控代碼從主流程邏輯中剝離出來,徹底避免SLA代碼對主流程代碼的污染。

異步執行SLA 邏輯計算,獨立于推送業務主流程,防止SLA異常拖垮主流程。

  • 不同監控指标的計算相互獨立,避免代碼耦合。
  • 實現SLA監控代碼一次開發,到處複用。

快速支持新監控指标的實現。

複用已有監控指标到新的節點,理想的方式是在節點方法上加個注解就能實現對該節點的統計和監控。

3. 分步解題3.1 節點定義

SLA 是基于節點來實現的,那麼節點的概念不容許有模糊的空間。因此在重構開始之前,我們把節點的概念通過代碼的形式固定下來。

public enum NodeEnum { MESSAGE_TO_PUSH("msg","調用推送接口"), FREQUENCY_FILTER("msg","防疲勞"), REPEAT_FILTER("push","防重複"), CHANNEL_PUSH("push","手機廠商通道推送"), LC_PUSH("push","自研長連推送") //其他節點... }

3.2 aop

接下來考慮解耦主流程代碼和SLA監控代碼。最直接的方式當然就是AOP了。

用實時接口推送曆史數據(日均數億推送穩定性監控實踐)4

以上是節點耗時統計的優化設計圖。把每個推送節點作為AOP切點,把每個節點上的SLA統計遷移到AOP中統一處理。到這裡,就實現了SLA代碼和主流程代碼的解耦。但這還隻是萬裡長征第一步。如果有其他的統計邏輯要實現怎麼辦?是否要全部堆積在AOP代碼裡面?

3.3 觀察者模式

SLA有很多個統計指标。我們不希望把所有指标的統計邏輯都堆積在一起。那麼如何進行解耦呢?答案是觀察者模式。我們在AOP切點之前發出節點進入事件(EnterEvent),切點退出之後發出節點退出事件(ExitEvent)。把各個指标統計邏輯抽象成節點事件處理器。各個處理器去監聽節點事件,從而實現統計指标間的邏輯解耦。

用實時接口推送曆史數據(日均數億推送穩定性監控實踐)5

3.4 适配器

這裡還需要考慮一個問題。各個節點的出參和入參都不一緻,我們如何才能把不同節點的出入參統一成event對象來分發呢?如果我們直接在AOP代碼中去判斷切點所屬的節點,并取出該節點的參數,再去生成event對象,那麼AOP的邏輯複雜度會迅速膨脹,并且需要經常變動。比較好的方式是應用适配器模式。AOP負責找到切點對應的适配器,由适配器去負責把節點參數轉為event對象,于是方案演變如下:

用實時接口推送曆史數據(日均數億推送穩定性監控實踐)6

4. 整體方案

現在我們對每個問題都找到了相應的解法,再把這些解法串起來,形成完整的重構方案。重構之後,SLA邏輯流程如下:

用實時接口推送曆史數據(日均數億推送穩定性監控實踐)7

到這裡方案整體設計完成。在動手實現之前,還需要和方案相關成員同步設計方案,梳理潛在風險,并進行分工。涉及到全流程的重構,光有紙面的方案,很難保證方案評估的完整性和有效性。我們希望能夠驗證方案可行性,盡早的暴露方案的技術風險,保證項目相關小夥伴對方案的理解沒有大的偏差。這裡推薦一個好的方式是提供方案原型。

4.1 原型

方案原型是指對既有方案的一個最簡單的實現,用于技術評估和方案說明。

正所謂talk is cheap, show me the code。對程序員來說,方案設計的再完美,也沒有可運行的代碼有說服力。我們大概花了兩個小時時間基于現有設計快速實現了一個原型。原型代碼如下:

  • AOP切面類EventAop,負責把增強代碼織入切點執行前後。

public class EventAop { @Autowired private EventConfig eventConfig; @Autowired private AdaptorFactory adaptorFactory; @Around("@annotation(messageNode)") public Object around(ProceedingJoinPoint joinPoint, MessageNode messageNode) throws Throwable { Object result = null; MessageEvent enterEvent = adaptorFactory.beforProceed(joinPoint, messageNode); eventConfig.publishEvent(enterEvent); result = joinPoint.proceed(); MessageEvent exitEvent = adaptorFactory.postProceed(joinPoint, messageNode); eventConfig.publishEvent(exitEvent); return result; } }

  • 事件配置類EventConfig, 這裡直接使用Spring event 廣播器,負責分發event。

public class EventConfig { @bean public ApplicationEventMulticaster applicationEventMulticaster() { //@1 //創建一個事件廣播器 SimpleApplicationEventMulticaster result = new SimpleApplicationEventMulticaster(); return result; } public void publishEvent(MessageEvent event) { this.applicationEventMulticaster().multicastEvent(event); } }

  • MessageEvent, 繼承Spring event提供的ApplicationEvent類。

public class MessageEvent extends ApplicationEvent {}

  • 節點适配器工廠類, 獲取節點對應的适配器,把節點信息轉換為MessageEvent對象。

public class AdaptorFactory { @Autowired private DefaultNodeAdaptor defaultNodeAdaptor; //支持切點之前産生事件 public MessageEvent beforeProceed(Object[] args, MessageNode messageNode) { INodeAdaptor adaptor = getAdaptor(messageNode.node()); return adaptor.beforeProceedEvent(args, messageNode); } //支持切點之後産生事件 public MessageEvent afterProceed(Object[] args, MessageNode messageNode, MessageEvent event) { INodeAdaptor adaptor = getAdaptor(messageNode.node()); return adaptor.postProceedEvent(args, event); } private INodeAdaptor getAdaptor(NodeEnum nodeEnum) { return defaultNodeAdaptor; } }

4.2 技術審查

在整體方案和原型代碼的基礎上,我們還需要審查方案中所用的技術,是否有風險,評估這些風險對既有功能,分工排期等的影響面。比如我們這邊用到的主要是Spring AOP, Spring Event機制,那麼他們可能潛在以下問題,需要在開發前就做好評估的:

  • Spring AOP的問題:Spring AOP中私有方法無法增強。bean自己調用自己的public方法也無法增強。
  • Spring Event的問題:默認事件處理和事件分發是在同一個線程中運行的,實現時需要配置Spring事件線程池,把事件處理線程和業務線程分隔開。

潛在的技術問題要充分溝通。每個成員的技術背景不同,你眼裡很簡單的技術問題,可能别人半天就爬不出來。方案設計者要充分預知潛在的技術問題,提前溝通,避免無謂的問題排查,進而提升開發效率。

4.3 成本收益分析
  • 成本

一套完整的方案考慮的不僅僅是技術可行性,還需要考慮實現成本。我們通過一個表格來簡單說明我此次重構前後的成本對比。

成本項

重構前

重構後

備注

Spring AOP實現

0

1

使用現有成熟技術,成本較低,且風險可控。

Spring Event實現

0

1

統計指标實現

節點耗時

2

1

重構前統計指标需要在整個業務鍊路上添加,重構之後各個統計指标隻需專注于本身指标初拉力器的開發,開發成本至少降低50%

節點阻塞

2

1

其他指标

2n

n

測試

n

1

重構之後,各個指标的測試、推送主流程的測試可以獨立進行,互不影響,能明顯提升測試效率。

  • 收益
  1. 代碼清晰。SLA統計邏輯和流程邏輯解耦。SLA各個指标的統計完全解耦,互不依賴。
  2. 提升開發效率。SLA指标統計一次開發,到處複用。隻要在需要監控的代碼上加上節點注解。
  3. 提高性能。SLA邏輯在獨立的線程中執行,不影響主流程。
  4. 提高穩定性。SLA邏輯和主流程解耦,SLA頻繁變更也不影響主流程代碼,也不會由于SLA異常拖垮主流程。
  5. 方便分工排期。重構也解決了不好分工的難題。由于各個指标通過重構實現了邏輯隔離,實現時完全可以獨立開發。因此我們可以簡單的按照SLA統計指标來安排分工。

代碼重構最難的不是技術,而是決策。決定系統是否要重構,何時重構才是最難的部分。往往一個團隊會花費大量時間去糾結是否要重構,但是到最後都沒人敢做出最終決策。之所以難是因為缺乏決策材料。可以考慮引入成本收益表等決策工具,對重構進行定性、定量分析,幫助我們決策。

5. 避坑指南

實現的過程也碰到的一些比較有意思的坑,下面列出來供大家參考。

5.1 AOP失效

Spring AOP使用cglib 和jdk動态代理實現對原始bean對象的代理增強。不管是哪種方式,都會為原始bean對象生成一個代理對象,我們調用原始對象時,實際上是先調用代理對象,由代理對象執行切片邏輯,并用反射去調用原始對象的方法,實際上運行如下代碼。

public static Object invokeJoinpointUsingReflection(@Nullable Object target, Method method, Object[] args) throws Throwable { //使用反射調用原始對象的方法 ReflectionUtils.makeAccessible(method); return method.invoke(target, args); }

這時候如果被調用方法是原始對象本身的方法,那就不會調用代理對象了。這就導緻代理對象邏輯沒有執行,從而不觸發代碼增強。具體原理參考以下例子,假設我們有一個服務實現類AServiceImpl,提供了method1(), method2()兩個方法。其中method1()調用了method2()。

@Service("aService") public class AServiceImpl implements AService{ @MessageNode(node = NodeEnum.Node1) public void method1(){ this.method2(); } @MessageNode(node = NodeEnum.Node2) public void method2(){ } }

我們的AOP代碼通過@MessageNode注解作為切點織入統計邏輯。

@Component("eventAop") @Aspect @Slf4j public class EventAop { @Around("@annotation(messageNode)") public Object around(ProceedingJoinPoint joinPoint, MessageNode messageNode) throws Throwable { /** 節點開始統計邏輯... **/ //執行切點 result = joinPoint.proceed(); /** 節點結束統計邏輯... **/ return result; } }

Spring啟動的時候,IOC容器會為AserviceImpl生成兩個實例,一個是原始AServiceImpl對象,一個是增強過後的ProxyAserviceImpl對象,方法method1的調用過程如下,可以看到從method1()去調用method2()方法的時候,沒有走代理對象,而是直接調用了目标對象的method2()方法。

用實時接口推送曆史數據(日均數億推送穩定性監控實踐)8

5.1.1 解決辦法
  • 注入代理對象aopSelf,主動調用代理對象aopSelf的方法。

示例代碼如下:

@Service("aService") public class AServiceImpl implements AService{ @Autowired @Lazy private AService aopSelf; @MessageNode(node = NodeEnum.Node1) public void method1(){ aopSelft.method2(); } @MessageNode(node = NodeEnum.Node2) public void method2(){ } }

  • 以上方法治标不治本。我們從頭探究為何會出現自調用的代碼增強?原因是我們要對兩個不同的節點進行SLA統計增強。但是這兩個節點的方法定義在同一個Service類當中,這顯然違反了編碼的單一功能原則。因此更好的方式應該是抽象出一個單獨的類來處理。代碼如下:

@Service("aService") public class AServiceImpl implements AService{ @Autowired @Lazy private AService aopSelf; @MessageNode(node = NodeEnum.Node1) public void method1(){ aopSelft.method2(); } @MessageNode(node = NodeEnum.Node2) public void method2(){ } }

重構能夠幫助我們發現并且定位代碼壞味道,從而指導我們對壞代碼進行重新抽象和優化。

5.2 通用依賴包

實現中碰到的另一個問題,是如何提供通用依賴。由于消息中心内部分很多不同的微服務,比如我們有承接外部業務推送請求的Message服務,還有把業務請求轉發給各個手機廠商的Push服務,還有推送到達後,給得物App打上小紅點的Hot服務等等,這些服務都需要做SLA監控。這時候就需要把現有的方案抽象出公共依賴,供各服務使用。

我們的做法是把【節點定義,AOP配置,Spring Event配置,節點适配器接口類】抽象到common依賴包,各個服務隻需要依賴這個common包就可以快速接入SLA統計能力。

這裡有一個比較有意思的點,像【AOP配置,Spring Event配置, 節點适配器】這些Bean的配置,是要開放給各個服務自己配置,還是直接在common包裡默認提供配置?比如下面的這個Bean配置,決定了Spring Event處理線程池的關鍵配置項,如核心線程數,緩存池大小。

@Bean public ThreadPoolExecutorFactoryBean applicationEventMulticasterThreadPool() { ThreadPoolExecutorFactoryBean result = new ThreadPoolExecutorFactoryBean(); result.setThreadNamePrefix("slaEventMulticasterThreadPool-"); result.setCorePoolSize(5); result.setMaxPoolSize(5); result.setQueueCapacity(100000); return result; }

這個配置交給各個服務自己管理,靈活性會高一點,但同時意味着服務對common包的使用成本也高一點,common包使用者需要自己去決定配置内容。相對的,在common包中直接提供呢,靈活性降低,但是用起來方便。後面參照 Spring Boot 約定大于配置的設計規範,還是決定直接在common包中提供配置,邏輯是各個服務對這些配置的需求不會有太大差别,為了這點靈活性提升使用成本,不是很有必要。當然如果後續有服務确實需要不同的配置,還可以根據實踐需求靈活支持。

約定大于配置,也可以叫做約定優于配置(convention overconfiguration),也稱作按約定編程,是一種軟件設計範式,指在減少軟件開發人員需做決定的數量,獲得簡單的好處,而又不失靈活性。

我們都知道Spring, Spring Boot的理念很先進,而實踐中能夠借鑒先進理念指導開發實踐也算是一種工程人員的幸福,正如孔老夫子所說:就有道而正焉,可謂好學矣。

5.3 業務結果

代碼實現之後,又進行了性能壓測,線上灰度發布,線上數據觀察等等步驟,在這裡就不再贅述。那麼SLA技術演進到這裡,為我們的推送業務獲取了哪些業務結果呢?下面提供比較典型的結果。

消息推送服務會調用手機廠商的推送服務接口,我們要監控廠商推送接口的耗時性能怎麼辦呢?在重構之前,我們需要在各個廠商推送接口之前和之後增加統計代碼,計算耗時并寫入緩存。在重構之後,我們要做的,隻是簡單的添加一個節點注解即可實現。比如我們想統計OPPO推送接口的SLA指标,隻需添加如下注解:

@MessageNode(node = NodeEnum.OPPO_PUSH, needEnterEvent = true) public MessageStatisticsDTO sendPush(PushChannelBO bo) { if (bo.getPushTokenDTOList().size() == 1) { return sendToSingle(bo); } else { return sendToList(bo); } }

然後我們在控制台就能很快發現OPPO推送的統計信息。比如我們看控制台上OPPO推送瓶頸耗時20s,那說明OPPO的推送連接肯定有超時,連接池的配置需要優化。

用實時接口推送曆史數據(日均數億推送穩定性監控實踐)9

除了監控指标,我們還支持實時告警,比如下面這個節點阻塞告警,我們能夠及時發現系統中堆積多的節點,迅速排查是否節點有卡頓,還是上遊調用量猛增,從而把潛在的線上問題扼殺在搖籃之中。

用實時接口推送曆史數據(日均數億推送穩定性監控實踐)10

6. 展望未來

SLA上線一周之内,我們就已經依賴這套技術發現系統中的潛在問題,但是事實上對SLA的業務收益我們完全可以有更大的想象空間。比如說:

  • 我們目前監控的主要還是技術上的指标,像節點耗時,節點阻塞等。後續我們可以把業務相關的統計也支持上,我們可以迅速定位是哪個業務方的調用導緻系統阻塞等等業務性能數據。
  • 其次我們可以統計各業務推送的ROI數據,目前消息服務支持數億的消息,但是這裡面哪些推送收益高,哪些收益低目前是沒有明确的指标的。我們可以收集各業務的推送量,點擊量等信息去計算業務推送的ROI指标。
  • 當我們有了業務性能數據,業務ROI指标,我們就有機會對業務推送做精細化的管控,比如今天推送資源緊張,我是否可以暫緩低ROI的業務推送。比如高ROI推送今天已經觸達過用戶,我們是否可以取消當天的類似推送,防止打擾用戶等等。

這些都是消息中心SLA能夠為業務進行推送賦能的方向,而且這些方向可以基于目前的SLA技術架構迅速低成本的落地,真正實現技術服務于業務,技術推動業務。

7. 總結

以上是消息中心SLA重構演進的整個過程。對于消息服務來說,SLA的開發沒有盡頭,我們要持續關注系統的核心指标,不斷完善監控工具。正因為如此,我們更需要夯實SLA的技術基礎,在靈活輕量的技術底座上去實現更複雜的功能。回過頭看這個重構過程,我們也總結了以下一些經驗供大家參考。

  • 不要害怕重構,也不要過度設計。重構的目的不在于炫技,而在于解決實際問題,提高開發效率。
  • 要有原型。複雜的設計往往難以開展,解決辦法是從最小實踐開始。方案原型就是方案設計的最小實踐,有了原型之後再審查設計,演進方案會方便很多。
  • 對技術充分掌控。預知所用技術的潛在風險,保證有足夠的技術能力去解決。并且要把風險提前暴露給團隊成員,減少踩坑幾率,避免無謂的開發調試成本。
,

更多精彩资讯请关注tft每日頭條,我们将持续为您更新最新资讯!

查看全部

相关生活资讯推荐

热门生活资讯推荐

网友关注

Copyright 2023-2025 - www.tftnews.com All Rights Reserved