【前面的话 】在前文 Sentinel进阶之基本原理  中简单介绍了一下Sentinel的基本原理,今天就来具体说一下Sentinel的流量控制。
壹、概述
FlowSlot 会根据预设的规则,结合前面 NodeSelectorSlot、ClusterNodeBuilderSlot、StatistcSlot 统计出来的实时信息进行流量控制。
限流的直接表现是在执行 Entry nodeA = SphU.entry(资源名字) 的时候抛出 FlowException 异常。FlowException 是 BlockException 的子类,您可以捕捉 BlockException 来自定义被限流之后的处理逻辑。
同一个资源可以对应多条限流规则。FlowSlot 会对该资源的所有限流规则依次遍历,直到有规则触发限流或者所有规则遍历完毕。
一条限流规则主要由下面几个因素组成,我们可以组合这些元素来实现不同的限流效果:
resource:资源名,即限流规则的作用对象 count: 限流阈值grade: 限流阈值类型,QPS 或线程数strategy: 根据调用关系选择策略 
贰、基于QPS/并发数的流量控制
流量控制主要有两种统计类型,一种是统计线程数,另外一种则是统计 QPS。类型由 FlowRule.grade 字段来定义。其中,0 代表根据并发数量来限流,1 代表根据 QPS 来进行流量控制。其中线程数、QPS 值,都是由 StatisticSlot 实时统计获取的。
可以通过下面的命令查看实时统计信息:
1 curl http://localhost:8719/cnode?id=resourceName 
8719端口可以通过配置文件修改
 
输出内容格式如下:
1 2 idx id   thread  pass  blocked   success  total Rt   1m-pass   1m-block   1m-all   exeption 2   abc647 0     46     0           46     46   1       2763      0         2763     0 
其中:
thread: 代表当前处理该资源的线程数; 
pass: 代表一秒内到来到的请求; 
blocked: 代表一秒内被流量控制的请求数量; 
success: 代表一秒内成功处理完的请求; 
total: 代表到一秒内到来的请求以及被阻止的请求总和; 
RT: 代表一秒内该资源的平均响应时间; 
1m-pass: 则是一分钟内到来的请求; 
1m-block: 则是一分钟内被阻止的请求; 
1m-all: 则是一分钟内到来的请求和被阻止的请求的总和; 
exception: 则是一秒内业务本身异常的总和。 
 
2.1、并发线程数流量控制 线程数限流用于保护业务线程数不被耗尽。例如,当应用所依赖的下游应用由于某种原因导致服务不稳定、响应延迟增加,对于调用者来说,意味着吞吐量下降和更多的线程数占用,极端情况下甚至导致线程池耗尽。为应对高线程占用的情况,业内有使用隔离的方案,比如通过不同业务逻辑使用不同线程池来隔离业务自身之间的资源争抢(线程池隔离),或者使用信号量来控制同时请求的个数(信号量隔离)。这种隔离方案虽然能够控制线程数量,但无法控制请求排队时间。当请求过多时排队也是无益的,直接拒绝能够迅速降低系统压力。Sentinel线程数限流不负责创建和管理线程池,而是简单统计当前请求上下文的线程个数,如果超出阈值,新的请求会被立即拒绝。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 public  class  FlowThreadDemo  {    private  static  AtomicInteger  pass  =  new  AtomicInteger ();     private  static  AtomicInteger  block  =  new  AtomicInteger ();     private  static  AtomicInteger  total  =  new  AtomicInteger ();     private  static  AtomicInteger  activeThread  =  new  AtomicInteger ();     private  static  volatile  boolean  stop  =  false ;     private  static  final  int  threadCount  =  100 ;     private  static  int  seconds  =  60  + 40 ;     private  static  volatile  int  methodBRunningTime  =  2000 ;     public  static  void  main (String[] args)  throws  Exception {         System.out.println(             "MethodA will call methodB. After running for a while, methodB becomes fast, "                  + "which make methodA also become fast " );         tick();         initFlowRule();         for  (int  i  =  0 ; i < threadCount; i++) {             Thread  entryThread  =  new  Thread (new  Runnable () {                 @Override                  public  void  run ()  {                     while  (true ) {                         Entry  methodA  =  null ;                         try  {                             TimeUnit.MILLISECONDS.sleep(5 );                             methodA = SphU.entry("methodA" );                             activeThread.incrementAndGet();                             Entry  methodB  =  SphU.entry("methodB" );                             TimeUnit.MILLISECONDS.sleep(methodBRunningTime);                             methodB.exit();                             pass.addAndGet(1 );                         } catch  (BlockException e1) {                             block.incrementAndGet();                         } catch  (Exception e2) {                                                      } finally  {                             total.incrementAndGet();                             if  (methodA != null ) {                                 methodA.exit();                                 activeThread.decrementAndGet();                             }                         }                     }                 }             });             entryThread.setName("working thread" );             entryThread.start();         }     }     private  static  void  initFlowRule ()  {         List<FlowRule> rules = new  ArrayList <FlowRule>();         FlowRule  rule1  =  new  FlowRule ();         rule1.setResource("methodA" );                  rule1.setCount(20 );         rule1.setGrade(RuleConstant.FLOW_GRADE_THREAD);         rule1.setLimitApp("default" );         rules.add(rule1);         FlowRuleManager.loadRules(rules);     }     private  static  void  tick ()  {         Thread  timer  =  new  Thread (new  TimerTask ());         timer.setName("sentinel-timer-task" );         timer.start();     }     static  class  TimerTask  implements  Runnable  {         @Override          public  void  run ()  {             long  start  =  System.currentTimeMillis();             System.out.println("begin to statistic!!!" );             long  oldTotal  =  0 ;             long  oldPass  =  0 ;             long  oldBlock  =  0 ;             while  (!stop) {                 try  {                     TimeUnit.SECONDS.sleep(1 );                 } catch  (InterruptedException e) {                 }                 long  globalTotal  =  total.get();                 long  oneSecondTotal  =  globalTotal - oldTotal;                 oldTotal = globalTotal;                 long  globalPass  =  pass.get();                 long  oneSecondPass  =  globalPass - oldPass;                 oldPass = globalPass;                 long  globalBlock  =  block.get();                 long  oneSecondBlock  =  globalBlock - oldBlock;                 oldBlock = globalBlock;                 System.out.println(seconds + " total qps is: "  + oneSecondTotal);                 System.out.println(TimeUtil.currentTimeMillis() + ", total:"  + oneSecondTotal                     + ", pass:"  + oneSecondPass                     + ", block:"  + oneSecondBlock                     + " activeThread:"  + activeThread.get());                 if  (seconds-- <= 0 ) {                     stop = true ;                 }                 if  (seconds == 40 ) {                     System.out.println("method B is running much faster; more requests are allowed to pass" );                     methodBRunningTime = 20 ;                 }             }             long  cost  =  System.currentTimeMillis() - start;             System.out.println("time cost: "  + cost + " ms" );             System.out.println("total:"  + total.get() + ", pass:"  + pass.get()                 + ", block:"  + block.get());             System.exit(0 );         }     } } 
2.2、QPS流量控制 当 QPS 超过某个阈值的时候,则采取措施进行流量控制。流量控制的手段包括下面 3 种,对应 FlowRule 中的 controlBehavior 字段:
1、直接拒绝(RuleConstant.CONTROL_BEHAVIOR_DEFAULT)方式。该方式是默认的流量控制方式,当QPS超过任意规则的阈值后,新的请求就会被立即拒绝,拒绝方式为抛出FlowException。这种方式适用于对系统处理能力确切已知的情况下,比如通过压测确定了系统的准确水位时。
2、冷启动(RuleConstant.CONTROL_BEHAVIOR_WARM_UP)方式。该方式主要用于系统长期处于低水位的情况下,当流量突然增加时,直接把系统拉升到高水位可能瞬间把系统压垮。通过”冷启动”,让通过的流量缓慢增加,在一定时间内逐渐增加到阈值上限,给冷系统一个预热的时间,避免冷系统被压垮的情况。
  通常冷启动的过程系统允许通过的 QPS 曲线如下图所示:
3、匀速器(RuleConstant.CONTROL_BEHAVIOR_RATE_LIMITER)方式。这种方式严格控制了请求通过的间隔时间,也即是让请求以均匀的速度通过,对应的是漏桶算法。
  
  这种方式主要用于处理间隔性突发的流量,例如消息队列。想象一下这样的场景,在某一秒有大量的请求到来,而接下来的几秒则处于空闲状态,我们希望系统能够在接下来的空闲期间逐渐处理这些请求,而不是在第一秒直接拒绝多余的请求。
 
叁、基于调用关系的流量控制
调用关系包括调用方、被调用方;方法又可能会调用其它方法,形成一个调用链路的层次关系。Sentinel 通过 NodeSelectorSlot 建立不同资源间的调用的关系,并且通过 ClusterNodeBuilderSlot 记录每个资源的实时统计信息。
有了调用链路的统计信息,我们可以衍生出多种流量控制手段。
3.1 根据调用方限流 ContextUtil.enter(resourceName, origin) 方法中的 origin 参数标明了调用方身份。这些信息会在 ClusterBuilderSlot 中被统计。可通过以下命令来展示不同的调用方对同一个资源的调用数据:
1 curl http://localhost:8719/origin?id=nodeA 
调用数据示例:
1 2 3 4 id: nodeA idx origin  threadNum passedQps blockedQps totalQps aRt   1m-passed 1m-blocked 1m-total  1   caller1 0         0         0          0        0     0         0          0 2   caller2 0         0         0          0        0     0         0          0 
上面这个命令展示了资源名为 nodeA 的资源被两个不同的调用方调用的统计。
限流规则中的 limitApp 字段用于根据调用方进行流量控制。该字段的值有以下三种选项,分别对应不同的场景:
default:表示不区分调用者,来自任何调用者的请求都将进行限流统计。如果这个资源名的调用总和超过了这条规则定义的阈值,则触发限流。{some_origin_name}:表示针对特定的调用者,只有来自这个调用者的请求才会进行流量控制。例如 NodeA 配置了一条针对调用者caller1的规则,那么当且仅当来自 caller1 对 NodeA 的请求才会触发流量控制。other:表示针对除 {some_origin_name} 以外的其余调用方的流量进行流量控制。例如,资源NodeA配置了一条针对调用者 caller1 的限流规则,同时又配置了一条调用者为 other 的规则,那么任意来自非 caller1 对 NodeA 的调用,都不能超过 other 这条规则定义的阈值。 
同一个资源名可以配置多条规则,规则的生效顺序为:{some_origin_name} > other > default
 
3.2 根据调用链路入口限流:链路限流 NodeSelectorSlot 中记录了资源之间的调用链路,这些资源通过调用关系,相互之间构成一棵调用树。这棵树的根节点是一个名字为 machine-root 的虚拟节点,调用链的入口都是这个虚节点的子节点。
一棵典型的调用树如下图所示:
1 2 3 4 5 6 7           machine-root               /       \              /         \        Entrance1     Entrance2           /             \          /               \ DefaultNode(nodeA)   DefaultNode(nodeA) 
上图中来自入口 Entrance1 和 Entrance2 的请求都调用到了资源 NodeA,Sentinel 允许只根据某个入口的统计信息对资源限流。比如我们可以设置 FlowRule.strategy 为 RuleConstant.CHAIN,同时设置 FlowRule.ref_identity 为 Entrance1 来表示只有从入口 Entrance1 的调用才会记录到 NodeA 的限流统计当中,而对来自 Entrance2 的调用漠不关心。
调用链的入口是通过 API 方法 ContextUtil.enter(name) 定义的。
3.3 具有关系的资源流量控制:关联流量控制 当两个资源之间具有资源争抢或者依赖关系的时候,这两个资源便具有了关联。比如对数据库同一个字段的读操作和写操作存在争抢,读的速度过高会影响写得速度,写的速度过高会影响读的速度。如果放任读写操作争抢资源,则争抢本身带来的开销会降低整体的吞吐量。可使用关联限流来避免具有关联关系的资源之间过度的争抢,举例来说,read_db 和 write_db 这两个资源分别代表数据库读写,我们可以给 read_db 设置限流规则来达到写优先的目的:设置 FlowRule.strategy 为 RuleConstant.RELATE 同时设置 FlowRule.ref_identity 为 write_db。这样当写库操作过于频繁时,读数据的请求会被限流。
【后面的话 】最后是我自己实践的源码  ,包括流量控制和初始规则加载等等。
另外在使用API去加载规则的时候,发现存在规则不生效的时候,通过调试发现:Sentinel在加载规则到内存中的时候会校验规则的合法性,如果规则不合法,该规则将不被加载。
具体可以查看com.alibaba.csp.sentinel.property#configLoad方法的实现类中参数校验方法,下面贴出FlowRule 的校验方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 /**  * Check whether provided flow rule is valid.  *  * @param rule flow rule to check  * @return true if valid, otherwise false  */ public static boolean isValidRule(FlowRule rule) {     boolean baseValid = rule != null && !StringUtil.isBlank(rule.getResource()) && rule.getCount() >= 0         && rule.getGrade() >= 0 && rule.getStrategy() >= 0 && rule.getControlBehavior() >= 0;     if (!baseValid) {         return false;     }     // Check strategy and control (shaping) behavior.     return checkClusterField(rule) && checkStrategyField(rule) && checkControlBehaviorField(rule); } private static boolean checkClusterField(/*@NonNull*/ FlowRule rule) {     if (!rule.isClusterMode()) {         return true;     }     ClusterFlowConfig clusterConfig = rule.getClusterConfig();     if (clusterConfig == null) {         return false;     }     if (!validClusterRuleId(clusterConfig.getFlowId())) {         return false;     }     if (!isWindowConfigValid(clusterConfig.getSampleCount(), clusterConfig.getWindowIntervalMs())) {         return false;     }     switch (clusterConfig.getStrategy()) {         case ClusterRuleConstant.FLOW_CLUSTER_STRATEGY_NORMAL:             return true;         default:             return false;     } } public static boolean isWindowConfigValid(int sampleCount, int windowIntervalMs) {     return sampleCount > 0 && windowIntervalMs > 0 && windowIntervalMs % sampleCount == 0; } private static boolean checkStrategyField(/*@NonNull*/ FlowRule rule) {     if (rule.getStrategy() == RuleConstant.STRATEGY_RELATE || rule.getStrategy() == RuleConstant.STRATEGY_CHAIN) {         return StringUtil.isNotBlank(rule.getRefResource());     }     return true; } private static boolean checkControlBehaviorField(/*@NonNull*/ FlowRule rule) {     switch (rule.getControlBehavior()) {         case RuleConstant.CONTROL_BEHAVIOR_WARM_UP:             return rule.getWarmUpPeriodSec() > 0;         case RuleConstant.CONTROL_BEHAVIOR_RATE_LIMITER:             return rule.getMaxQueueingTimeMs() > 0;         case RuleConstant.CONTROL_BEHAVIOR_WARM_UP_RATE_LIMITER:             return rule.getWarmUpPeriodSec() > 0 && rule.getMaxQueueingTimeMs() > 0;         default:             return true;     } }