Skip to content

上文中介绍了Engine的设计,其中有Pipline相关内容没有介绍,本文将向你阐述Tomcat的管道机制以及它要解决的问题。@anarkh

  • Tomcat - Container的管道机制:责任链模式
  • 内容引入
  • 知识准备
  • 责任链模式
  • FilterChain
  • Pipline机制
  • Vavle接口设计
  • Pipline接口设计
  • BaseVavle设计
  • StandardPipline实现
  • ContainerBase中运用Pipline
  • 对比下两种责任链模式

内容引入

承接上文Engine的设计,从以下几个方面,我将向你解释为什么要理解Tomcat中管道机制,它要解决什么问题?@anarkh

  • Tomcat总计架构图中Pipeline和Vavle

  • 我们在上文Engine中有一块Pipline没有解释:

  • 为什么Tomcat要引入Pipline呢?它要解决什么问题呢?

下文将向你详细阐述。

知识准备

在弄清楚管道机制前,你需要一些基础知识和其它软件设计中的应用场景。

责任链模式

管道机制在设计模式上属于责任链模式,如果你不理解,请参看如下文章:

责任链模式(Chain of responsibility pattern): 通过责任链模式, 你可以为某个请求创建一个对象链. 每个对象依序检查此请求并对其进行处理或者将它传给链中的下一个对象。

FilterChain

在软件开发的常接触的责任链模式是FilterChain,它体现在很多软件设计中:

  • 比如Spring Security框架中

  • 比如HttpServletRequest处理的过滤器中

当一个request过来的时候,需要对这个request做一系列的加工,使用责任链模式可以使每个加工组件化,减少耦合。也可以使用在当一个request过来的时候,需要找到合适的加工方式。当一个加工方式不适合这个request的时候,传递到下一个加工方法,该加工方式再尝试对request加工。

网上找了图,这里我们后文将通过Tomcat请求处理向你阐述。

Pipline机制

为什么要有管道机制?

在一个比较复杂的大型系统中,如果一个对象或数据流需要进行繁杂的逻辑处理,我们可以选择在一个大的组件中直接处理这些繁杂的业务逻辑, 这个方式虽然达到目的,但扩展性和可重用性较差, 因为可能牵一发而动全身。更好的解决方案是采用管道机制,用一条管道把多个对象(阀门部件)连接起来,整体看起来就像若干个阀门嵌套在管道中一样,而处理逻辑放在阀门上

Vavle接口设计

理解它的设计,第一步就是阀门设计

java
public interface Valve {


    
    public Valve getNext();
    public void setNext(Valve valve);


    
    public void backgroundProcess();


    
    public void invoke(Request request, Response response)
        throws IOException, ServletException;

    
    public boolean isAsyncSupported();
}

Pipline接口设计

由于Pipline是为容器设计的,所以它在设计时加入了一个Containerd接口, 就是为了制定当前Pipline所属的容器:

java
public interface Contained {

    Container getContainer();

    void setContainer(Container container);
}

我们接着看下Pipline接口设计

java
public interface Pipeline extends Contained {

    
    public Valve getBasic();
    public void setBasic(Valve valve);


    
    public void addValve(Valve valve);
    public Valve[] getValves();
    public void removeValve(Valve valve);


    
    public Valve getFirst();


    
    public boolean isAsyncSupported();


    
    public void findNonAsyncValves(Set<String> result);
}

BaseVavle设计

由于Valve也是组件,需要生命周期管理,所以实现LifecycleMBeanBase,同时集成Contained和Valve

java
public abstract class ValveBase extends LifecycleMBeanBase implements Contained, Valve {

    protected static final StringManager sm = StringManager.getManager(ValveBase.class);


    

    public ValveBase() {
        this(false);
    }


    public ValveBase(boolean asyncSupported) {
        this.asyncSupported = asyncSupported;
    }


    

    
    protected boolean asyncSupported;


    
    protected Container container = null;


    
    protected Log containerLog = null;


    
    protected Valve next = null;


    

    
    @Override
    public Container getContainer() {
        return container;
    }


    
    @Override
    public void setContainer(Container container) {
        this.container = container;
    }


    @Override
    public boolean isAsyncSupported() {
        return asyncSupported;
    }


    public void setAsyncSupported(boolean asyncSupported) {
        this.asyncSupported = asyncSupported;
    }


    
    @Override
    public Valve getNext() {
        return next;
    }


    
    @Override
    public void setNext(Valve valve) {
        this.next = valve;
    }


    

    
    @Override
    public void backgroundProcess() {
        
    }


    @Override
    protected void initInternal() throws LifecycleException {
        super.initInternal();
        containerLog = getContainer().getLogger();
    }


    
    @Override
    protected synchronized void startInternal() throws LifecycleException {
        setState(LifecycleState.STARTING);
    }


    
    @Override
    protected synchronized void stopInternal() throws LifecycleException {
        setState(LifecycleState.STOPPING);
    }


    
    @Override
    public String toString() {
        return ToStringUtil.toString(this);
    }


    

    @Override
    public String getObjectNameKeyProperties() {
        StringBuilder name = new StringBuilder("type=Valve");

        Container container = getContainer();

        name.append(container.getMBeanKeyProperties());

        int seq = 0;

        
        Pipeline p = container.getPipeline();
        if (p != null) {
            for (Valve valve : p.getValves()) {
                
                if (valve == null) {
                    continue;
                }
                
                if (valve == this) {
                    break;
                }
                if (valve.getClass() == this.getClass()) {
                    
                    
                    seq ++;
                }
            }
        }

        if (seq > 0) {
            name.append(",seq=");
            name.append(seq);
        }

        String className = this.getClass().getName();
        int period = className.lastIndexOf('.');
        if (period >= 0) {
            className = className.substring(period + 1);
        }
        name.append(",name=");
        name.append(className);

        return name.toString();
    }


    @Override
    public String getDomainInternal() {
        Container c = getContainer();
        if (c == null) {
            return null;
        } else {
            return c.getDomain();
        }
    }
}

StandardPipline实现

里面方法很简单,就直接贴代码了。它必然是继承LifecycleBase同时实现Pipline.

贴个图方面你理解

java
public class StandardPipeline extends LifecycleBase implements Pipeline {

    private static final Log log = LogFactory.getLog(StandardPipeline.class);
    private static final StringManager sm = StringManager.getManager(Constants.Package);

    


    
    public StandardPipeline() {

        this(null);

    }


    
    public StandardPipeline(Container container) {

        super();
        setContainer(container);

    }


    


    
    protected Valve basic = null;


    
    protected Container container = null;


    
    protected Valve first = null;


    

    @Override
    public boolean isAsyncSupported() {
        Valve valve = (first!=null)?first:basic;
        boolean supported = true;
        while (supported && valve!=null) {
            supported = supported & valve.isAsyncSupported();
            valve = valve.getNext();
        }
        return supported;
    }


    @Override
    public void findNonAsyncValves(Set<String> result) {
        Valve valve = (first!=null) ? first : basic;
        while (valve != null) {
            if (!valve.isAsyncSupported()) {
                result.add(valve.getClass().getName());
            }
            valve = valve.getNext();
        }
    }


    

    
    @Override
    public Container getContainer() {
        return this.container;
    }


    
    @Override
    public void setContainer(Container container) {
        this.container = container;
    }


    @Override
    protected void initInternal() {
        
    }


    
    @Override
    protected synchronized void startInternal() throws LifecycleException {

        
        Valve current = first;
        if (current == null) {
            current = basic;
        }
        while (current != null) {
            if (current instanceof Lifecycle)
                ((Lifecycle) current).start();
            current = current.getNext();
        }

        setState(LifecycleState.STARTING);
    }


    
    @Override
    protected synchronized void stopInternal() throws LifecycleException {

        setState(LifecycleState.STOPPING);

        
        Valve current = first;
        if (current == null) {
            current = basic;
        }
        while (current != null) {
            if (current instanceof Lifecycle)
                ((Lifecycle) current).stop();
            current = current.getNext();
        }
    }


    @Override
    protected void destroyInternal() {
        Valve[] valves = getValves();
        for (Valve valve : valves) {
            removeValve(valve);
        }
    }


    
    @Override
    public String toString() {
        return ToStringUtil.toString(this);
    }


    


    
    @Override
    public Valve getBasic() {
        return this.basic;
    }


    
    @Override
    public void setBasic(Valve valve) {

        
        Valve oldBasic = this.basic;
        if (oldBasic == valve)
            return;

        
        if (oldBasic != null) {
            if (getState().isAvailable() && (oldBasic instanceof Lifecycle)) {
                try {
                    ((Lifecycle) oldBasic).stop();
                } catch (LifecycleException e) {
                    log.error(sm.getString("standardPipeline.basic.stop"), e);
                }
            }
            if (oldBasic instanceof Contained) {
                try {
                    ((Contained) oldBasic).setContainer(null);
                } catch (Throwable t) {
                    ExceptionUtils.handleThrowable(t);
                }
            }
        }

        
        if (valve == null)
            return;
        if (valve instanceof Contained) {
            ((Contained) valve).setContainer(this.container);
        }
        if (getState().isAvailable() && valve instanceof Lifecycle) {
            try {
                ((Lifecycle) valve).start();
            } catch (LifecycleException e) {
                log.error(sm.getString("standardPipeline.basic.start"), e);
                return;
            }
        }

        
        Valve current = first;
        while (current != null) {
            if (current.getNext() == oldBasic) {
                current.setNext(valve);
                break;
            }
            current = current.getNext();
        }

        this.basic = valve;

    }


    
    @Override
    public void addValve(Valve valve) {

        
        if (valve instanceof Contained)
            ((Contained) valve).setContainer(this.container);

        
        if (getState().isAvailable()) {
            if (valve instanceof Lifecycle) {
                try {
                    ((Lifecycle) valve).start();
                } catch (LifecycleException e) {
                    log.error(sm.getString("standardPipeline.valve.start"), e);
                }
            }
        }

        
        if (first == null) {
            first = valve;
            valve.setNext(basic);
        } else {
            Valve current = first;
            while (current != null) {
                if (current.getNext() == basic) {
                    current.setNext(valve);
                    valve.setNext(basic);
                    break;
                }
                current = current.getNext();
            }
        }

        container.fireContainerEvent(Container.ADD_VALVE_EVENT, valve);
    }


    
    @Override
    public Valve[] getValves() {

        List&lt;Valve&gt; valveList = new ArrayList<>();
        Valve current = first;
        if (current == null) {
            current = basic;
        }
        while (current != null) {
            valveList.add(current);
            current = current.getNext();
        }

        return valveList.toArray(new Valve[0]);

    }

    public ObjectName[] getValveObjectNames() {

        List&lt;ObjectName&gt; valveList = new ArrayList<>();
        Valve current = first;
        if (current == null) {
            current = basic;
        }
        while (current != null) {
            if (current instanceof JmxEnabled) {
                valveList.add(((JmxEnabled) current).getObjectName());
            }
            current = current.getNext();
        }

        return valveList.toArray(new ObjectName[0]);

    }

    
    @Override
    public void removeValve(Valve valve) {

        Valve current;
        if(first == valve) {
            first = first.getNext();
            current = null;
        } else {
            current = first;
        }
        while (current != null) {
            if (current.getNext() == valve) {
                current.setNext(valve.getNext());
                break;
            }
            current = current.getNext();
        }

        if (first == basic) first = null;

        if (valve instanceof Contained)
            ((Contained) valve).setContainer(null);

        if (valve instanceof Lifecycle) {
            
            if (getState().isAvailable()) {
                try {
                    ((Lifecycle) valve).stop();
                } catch (LifecycleException e) {
                    log.error(sm.getString("standardPipeline.valve.stop"), e);
                }
            }
            try {
                ((Lifecycle) valve).destroy();
            } catch (LifecycleException e) {
                log.error(sm.getString("standardPipeline.valve.destroy"), e);
            }
        }

        container.fireContainerEvent(Container.REMOVE_VALVE_EVENT, valve);
    }


    @Override
    public Valve getFirst() {
        if (first != null) {
            return first;
        }

        return basic;
    }
}

ContainerBase中运用Pipline

那么容器中是如何运用Pipline的呢?

  • 容器中是如何运用Pipline的?

由于Container中都有涉及,实现方法肯定是在抽象的实现类中,所以肯定是在ContainerBase中实现。

  • 初始化
java
protected final Pipeline pipeline = new StandardPipeline(this);

@Override
public Pipeline getPipeline() {
    return this.pipeline;
}
  • Lifecycle模板方法
java
@Override
protected synchronized void startInternal() throws LifecycleException {
...
    
    if (pipeline instanceof Lifecycle) {
        ((Lifecycle) pipeline).start();
    }
...
}

@Override
protected synchronized void stopInternal() throws LifecycleException {
  ...

    
    if (pipeline instanceof Lifecycle &&
            ((Lifecycle) pipeline).getState().isAvailable()) {
        ((Lifecycle) pipeline).stop();
    }
  ...
}

@Override
protected void destroyInternal() throws LifecycleException {
...

    
    if (pipeline instanceof Lifecycle) {
        ((Lifecycle) pipeline).destroy();
    }
...
    super.destroyInternal();
}
  • 重点是backgroundProcess方法
java
@Override
public void backgroundProcess() {

    if (!getState().isAvailable())
        return;

    Cluster cluster = getClusterInternal();
    if (cluster != null) {
        try {
            cluster.backgroundProcess();
        } catch (Exception e) {
            log.warn(sm.getString("containerBase.backgroundProcess.cluster",
                    cluster), e);
        }
    }
    Realm realm = getRealmInternal();
    if (realm != null) {
        try {
            realm.backgroundProcess();
        } catch (Exception e) {
            log.warn(sm.getString("containerBase.backgroundProcess.realm", realm), e);
        }
    }
    
    Valve current = pipeline.getFirst();
    while (current != null) {
        try {
            current.backgroundProcess();
        } catch (Exception e) {
            log.warn(sm.getString("containerBase.backgroundProcess.valve", current), e);
        }
        current = current.getNext();
    }
    fireLifecycleEvent(Lifecycle.PERIODIC_EVENT, null);
}

看下相关链路

对比下两种责任链模式

管道/阀门过滤器链/过滤器
管道(Pipeline)过滤器链(FilterChain)
阀门(Valve)过滤器(Filter)
底层实现为具有头(first)、尾(basic)指针的单向链表底层实现为数组
Valve的核心方法invoke(request,response)Filter核心方法doFilter(request,response,chain)
pipeline.getFirst().invoke(request,response)filterchain.doFilter(request,response)