
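Building on the previous articles, we can finally walk through how Server is implemented as a whole, which in practice means the functionality provided by StandardServer. @anarkh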

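  • Tomcat - Server design and implementation: StandardServer
  • Approach
  • Server structure design
  • server.xml
  • Interface design of Server
  • StandardServer implementation
  • Thread pool
  • Service-related method implementations
  • Lifecycle template methods
  • References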

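Approach

  • First: understand StandardServer through its overall class dependency structure

  • Second: read it alongside server.xml

See the detailed discussion below.

  • Third: consult the official Server configuration documentation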

http://tomcat.apache.org/tomcat-9.0-doc/config/server.html

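Server structure design

We need to understand the structure of Server from a higher level rather than by counting methods and lines of code, and that understanding has to be built by mapping it onto server.xml.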

server.xml

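  • Start with server.xml; it shows the four parts you need to understand (the attributes of the Server element itself, Listener, GlobalNamingResources, and Service)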
xml
<Server port="8005" shutdown="SHUTDOWN">
  

  
  <Listener className="org.apache.catalina.core.AprLifecycleListener" />
  <Listener className="org.apache.catalina.mbeans.ServerLifecycleListener" />
  <Listener className="org.apache.catalina.mbeans.GlobalResourcesLifecycleListener" />
  <Listener className="org.apache.catalina.storeconfig.StoreConfigLifecycleListener"/>

  
  <GlobalNamingResources>

    <Environment name="simpleValue" type="java.lang.Integer" value="30"/>

    <Resource name="UserDatabase" auth="Container"
              type="org.apache.catalina.UserDatabase"
       description="User database that can be updated and saved"
           factory="org.apache.catalina.users.MemoryUserDatabaseFactory"
          pathname="conf/tomcat-users.xml" />

  </GlobalNamingResources>

  
  <Service name="Catalina">

  </Service>
</Server>
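To make the mapping between server.xml and the Server component concrete, here is a minimal sketch that reads the attributes of the root element and lists its direct children using only the JDK's DOM parser. It is not how Tomcat itself loads the file; the class `ServerXmlPeek` and the shortened `SAMPLE` document are hypothetical and exist only for illustration.

```java
import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import javax.xml.parsers.DocumentBuilderFactory;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.Node;
import org.w3c.dom.NodeList;

// Hypothetical helper, not part of Tomcat.
class ServerXmlPeek {

    // Shortened stand-in for the server.xml shown above.
    static final String SAMPLE =
        "<Server port=\"8005\" shutdown=\"SHUTDOWN\">"
      + "<Listener className=\"org.apache.catalina.core.AprLifecycleListener\"/>"
      + "<GlobalNamingResources/>"
      + "<Service name=\"Catalina\"/>"
      + "</Server>";

    // Parses the XML text and returns its root element (<Server>).
    static Element parseRoot(String xml) {
        try {
            Document doc = DocumentBuilderFactory.newInstance()
                    .newDocumentBuilder()
                    .parse(new ByteArrayInputStream(xml.getBytes(StandardCharsets.UTF_8)));
            return doc.getDocumentElement();
        } catch (Exception e) {
            throw new IllegalStateException("Could not parse XML", e);
        }
    }

    // Returns the tag names of the direct child elements, in document order.
    static List<String> childElementNames(Element parent) {
        List<String> names = new ArrayList<>();
        NodeList children = parent.getChildNodes();
        for (int i = 0; i < children.getLength(); i++) {
            Node node = children.item(i);
            if (node.getNodeType() == Node.ELEMENT_NODE) {
                names.add(node.getNodeName());
            }
        }
        return names;
    }

    public static void main(String[] args) {
        Element server = parseRoot(SAMPLE);
        System.out.println("port=" + server.getAttribute("port"));
        System.out.println("shutdown=" + server.getAttribute("shutdown"));
        System.out.println("children=" + childElementNames(server));
    }
}
```

The root attributes correspond to the simple properties on Server, while each child element corresponds to a component that Server holds: listeners, global naming resources, and services.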

Interface design of Server

  • Common properties, including the port, shutdown, and address attributes shown above
java
public int getPort();



public void setPort(int port);


public int getPortOffset();


public void setPortOffset(int portOffset);


public int getPortWithOffset();


public String getAddress();



public void setAddress(String address);



public String getShutdown();



public void setShutdown(String shutdown);


public int getUtilityThreads();



public void setUtilityThreads(int utilityThreads);
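Attribute | Description
className | The Java class name to use. The class must implement the org.apache.catalina.Server interface. If no class name is specified, the standard implementation is used.
address | The TCP/IP address on which this server waits for a shutdown command. If no address is specified, localhost is used.
port | The TCP/IP port number on which this server waits for a shutdown command. Set to -1 to disable the shutdown port. Note: disabling the shutdown port works well when Tomcat is started with Apache Commons Daemon (running as a service on Windows, or with jsvc on un*xes). It cannot be used when Tomcat is run with the standard shell scripts, though, because it prevents shutdown.bat|.sh and catalina.bat|.sh from stopping it gracefully.
portOffset | The offset applied to port and to the ports of any nested connectors. It must be a non-negative integer. If not specified, the default value of 0 is used.
shutdown | The command string that must be received over a TCP/IP connection to the specified port in order to shut down Tomcat.
utilityThreads | The number of threads this server uses for various utility tasks, including recurring ones. The special value 0 results in Runtime.getRuntime().availableProcessors() being used. Negative values result in Runtime.getRuntime().availableProcessors() + value being used, unless that is less than 1, in which case 1 thread is used. The default value is 1.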
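The port, portOffset, address, and shutdown attributes work together, so a small sketch may help. It assumes that the offset is applied only to positive port values, so that special values such as -1 keep their meaning, and it shows a client that writes the shutdown string to the resulting port. `ShutdownPortDemo`, `portWithOffset`, and `sendShutdown` are hypothetical names for illustration, not Tomcat APIs.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

// Hypothetical helper, not part of Tomcat.
class ShutdownPortDemo {

    // Assumption: non-positive ports are special values and are returned unchanged.
    static int portWithOffset(int port, int portOffset) {
        if (portOffset < 0) {
            throw new IllegalArgumentException("portOffset must be non-negative");
        }
        return port > 0 ? port + portOffset : port;
    }

    // Connects to the shutdown port and writes the command string.
    static void sendShutdown(String address, int port, String command) throws IOException {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(address, port), 2000);
            OutputStream out = socket.getOutputStream();
            out.write(command.getBytes(StandardCharsets.ISO_8859_1));
            out.flush();
        }
    }

    public static void main(String[] args) {
        // Only the port arithmetic is shown here; call
        // sendShutdown("localhost", 8005, "SHUTDOWN") against a running instance.
        System.out.println(portWithOffset(8005, 0));
        System.out.println(portWithOffset(8005, 10));
        System.out.println(portWithOffset(-1, 10));
    }
}
```

Because anyone who can reach the port and knows the string can stop the server, the defaults of listening on localhost and allowing the port to be disabled with -1 are worth keeping in mind.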
  • NamingResources
java
public NamingResourcesImpl getGlobalNamingResources();



public void setGlobalNamingResources
    (NamingResourcesImpl globalNamingResources);



public javax.naming.Context getGlobalNamingContext();
  • Service-related methods, including adding, finding, and removing a Service
java
public void addService(Service service);



public void await();



public Service findService(String name);



public Service[] findServices();



public void removeService(Service service);
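The Service-related methods amount to a small registry contract: add, look up by name, list, and remove. As a rough sketch of one way to satisfy that contract, the class below keeps the entries in an array that is replaced, never mutated, under a lock. Plain strings stand in for Service objects, and `ServiceRegistrySketch` is a hypothetical name, not a Tomcat class.

```java
import java.util.Arrays;

// Hypothetical stand-in for the Service bookkeeping; strings play the role of services.
class ServiceRegistrySketch {

    private final Object lock = new Object();
    private String[] names = new String[0];

    // Appends by copying into a new, larger array.
    void add(String name) {
        synchronized (lock) {
            String[] copy = Arrays.copyOf(names, names.length + 1);
            copy[names.length] = name;
            names = copy;
        }
    }

    // Returns the matching entry, or null when the name is null or unknown.
    String find(String name) {
        if (name == null) {
            return null;
        }
        synchronized (lock) {
            for (String candidate : names) {
                if (name.equals(candidate)) {
                    return candidate;
                }
            }
        }
        return null;
    }

    // Removes the first matching entry by copying everything else into a smaller array.
    void remove(String name) {
        synchronized (lock) {
            int index = -1;
            for (int i = 0; i < names.length; i++) {
                if (names[i].equals(name)) {
                    index = i;
                    break;
                }
            }
            if (index < 0) {
                return;
            }
            String[] copy = new String[names.length - 1];
            System.arraycopy(names, 0, copy, 0, index);
            System.arraycopy(names, index + 1, copy, index, names.length - index - 1);
            names = copy;
        }
    }

    // Returns a defensive copy so callers cannot modify the registry.
    String[] snapshot() {
        synchronized (lock) {
            return names.clone();
        }
    }

    public static void main(String[] args) {
        ServiceRegistrySketch registry = new ServiceRegistrySketch();
        registry.add("Catalina");
        registry.add("Other");
        registry.remove("Catalina");
        System.out.println(Arrays.toString(registry.snapshot()));
    }
}
```

Replacing the array on every change keeps iteration simple for readers, at the cost of a copy per modification, which is a reasonable trade when services are added or removed rarely.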

StandardServer implementation

Thread pool

java
@Override
public int getUtilityThreads() {
    return utilityThreads;
}



private static int getUtilityThreadsInternal(int utilityThreads) {
    int result = utilityThreads;
    if (result <= 0) {
        result = Runtime.getRuntime().availableProcessors() + result;
        if (result < 2) {
            result = 2;
        }
    }
    return result;
}


@Override
public void setUtilityThreads(int utilityThreads) {
    
    int oldUtilityThreads = this.utilityThreads;
    if (getUtilityThreadsInternal(utilityThreads) < getUtilityThreadsInternal(oldUtilityThreads)) {
        return;
    }
    this.utilityThreads = utilityThreads;
    if (oldUtilityThreads != utilityThreads && utilityExecutor != null) {
        reconfigureUtilityExecutor(getUtilityThreadsInternal(utilityThreads));
    }
}


private synchronized void reconfigureUtilityExecutor(int threads) {
    
    if (utilityExecutor != null) {
        utilityExecutor.setCorePoolSize(threads);
    } else {
        ScheduledThreadPoolExecutor scheduledThreadPoolExecutor =
                new ScheduledThreadPoolExecutor(threads,
                        new TaskThreadFactory("Catalina-utility-", utilityThreadsAsDaemon, Thread.MIN_PRIORITY));
        scheduledThreadPoolExecutor.setKeepAliveTime(10, TimeUnit.SECONDS);
        scheduledThreadPoolExecutor.setRemoveOnCancelPolicy(true);
        scheduledThreadPoolExecutor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
        utilityExecutor = scheduledThreadPoolExecutor;
        utilityExecutorWrapper = new org.apache.tomcat.util.threads.ScheduledThreadPoolExecutor(utilityExecutor);
    }
}



public boolean getUtilityThreadsAsDaemon() {
    return utilityThreadsAsDaemon;
}



public void setUtilityThreadsAsDaemon(boolean utilityThreadsAsDaemon) {
    this.utilityThreadsAsDaemon = utilityThreadsAsDaemon;
}
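The sizing rule and the executor settings above can be tried out with the JDK alone. The sketch below mirrors the logic of getUtilityThreadsInternal but takes the processor count as a parameter so the result is predictable, and it builds a ScheduledThreadPoolExecutor with the same three policies. `UtilityThreadsSketch` and its method names are hypothetical, and the plain thread factory here only approximates Tomcat's TaskThreadFactory.

```java
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of Tomcat.
class UtilityThreadsSketch {

    // Positive values are used as-is; zero or negative values are added to the
    // processor count, with a floor of 2 (the floor used in the code shown above).
    static int effectiveThreads(int configured, int processors) {
        int result = configured;
        if (result <= 0) {
            result = processors + result;
            if (result < 2) {
                result = 2;
            }
        }
        return result;
    }

    // Builds a scheduled pool with the same policies as reconfigureUtilityExecutor.
    static ScheduledThreadPoolExecutor newUtilityExecutor(int threads) {
        ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(threads, runnable -> {
            Thread thread = new Thread(runnable, "demo-utility");
            thread.setDaemon(true);
            thread.setPriority(Thread.MIN_PRIORITY);
            return thread;
        });
        executor.setKeepAliveTime(10, TimeUnit.SECONDS);
        // Cancelled tasks leave the queue immediately instead of waiting for their delay.
        executor.setRemoveOnCancelPolicy(true);
        // Delayed tasks still queued at shutdown are dropped rather than run.
        executor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
        return executor;
    }

    public static void main(String[] args) {
        int processors = Runtime.getRuntime().availableProcessors();
        System.out.println("utilityThreads=1  -> " + effectiveThreads(1, processors));
        System.out.println("utilityThreads=0  -> " + effectiveThreads(0, processors));
        System.out.println("utilityThreads=-2 -> " + effectiveThreads(-2, processors));

        ScheduledThreadPoolExecutor executor = newUtilityExecutor(2);
        executor.setCorePoolSize(4); // resizing an existing pool, as the reconfigure step does
        System.out.println("corePoolSize=" + executor.getCorePoolSize());
        executor.shutdownNow();
    }
}
```

Resizing through setCorePoolSize keeps already scheduled tasks in place, which is presumably why the existing executor is reconfigured rather than replaced.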

Service-related method implementations

These methods are all straightforward.

java
@Override
public void addService(Service service) {

    service.setServer(this);

    synchronized (servicesLock) {
        Service results[] = new Service[services.length + 1];
        System.arraycopy(services, 0, results, 0, services.length);
        results[services.length] = service;
        services = results;

        if (getState().isAvailable()) {
            try {
                service.start();
            } catch (LifecycleException e) {
                
            }
        }

        
        support.firePropertyChange("service", null, service);
    }

}

public void stopAwait() {
    stopAwait=true;
    Thread t = awaitThread;
    if (t != null) {
        ServerSocket s = awaitSocket;
        if (s != null) {
            awaitSocket = null;
            try {
                s.close();
            } catch (IOException e) {
                
            }
        }
        t.interrupt();
        try {
            t.join(1000);
        } catch (InterruptedException e) {
            
        }
    }
}


@Override
public void await() {
    
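    // Port -2: return immediately without waiting (the caller keeps the JVM alive)
    if (getPortWithOffset() == -2) {
        return;
    }
    // Port -1: no shutdown socket; sleep until stopAwait() sets the flag
    if (getPortWithOffset() == -1) {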
        try {
            awaitThread = Thread.currentThread();
            while(!stopAwait) {
                try {
                    Thread.sleep( 10000 );
                } catch( InterruptedException ex ) {
                    
                }
            }
        } finally {
            awaitThread = null;
        }
        return;
    }

    
    try {
        awaitSocket = new ServerSocket(getPortWithOffset(), 1,
                InetAddress.getByName(address));
    } catch (IOException e) {
        log.error(sm.getString("standardServer.awaitSocket.fail", address,
                String.valueOf(getPortWithOffset()), String.valueOf(getPort()),
                String.valueOf(getPortOffset())), e);
        return;
    }

    try {
        awaitThread = Thread.currentThread();

        
        while (!stopAwait) {
            ServerSocket serverSocket = awaitSocket;
            if (serverSocket == null) {
                break;
            }

            
            Socket socket = null;
            StringBuilder command = new StringBuilder();
            try {
                InputStream stream;
                long acceptStartTime = System.currentTimeMillis();
                try {
                    socket = serverSocket.accept();
                    socket.setSoTimeout(10 * 1000);  
                    stream = socket.getInputStream();
                } catch (SocketTimeoutException ste) {
                    
                    
                    log.warn(sm.getString("standardServer.accept.timeout",
                            Long.valueOf(System.currentTimeMillis() - acceptStartTime)), ste);
                    continue;
                } catch (AccessControlException ace) {
                    log.warn(sm.getString("standardServer.accept.security"), ace);
                    continue;
                } catch (IOException e) {
                    if (stopAwait) {
                        
                        break;
                    }
                    log.error(sm.getString("standardServer.accept.error"), e);
                    break;
                }

                
                // Cap the number of characters read so an oversized request cannot tie up the loop
                int expected = 1024;
                while (expected < shutdown.length()) {
                    if (random == null)
                        random = new Random();
                    expected += (random.nextInt() % 1024);
                }
                while (expected > 0) {
                    int ch = -1;
                    try {
                        ch = stream.read();
                    } catch (IOException e) {
                        log.warn(sm.getString("standardServer.accept.readError"), e);
                        ch = -1;
                    }
                    
                    // A control character or end of stream (-1) terminates the command
                    if (ch < 32 || ch == 127) {
                        break;
                    }
                    command.append((char) ch);
                    expected--;
                }
            } finally {
                
                try {
                    if (socket != null) {
                        socket.close();
                    }
                } catch (IOException e) {
                    
                }
            }

            
            boolean match = command.toString().equals(shutdown);
            if (match) {
                log.info(sm.getString("standardServer.shutdownViaPort"));
                break;
            } else
                log.warn(sm.getString("standardServer.invalidShutdownCommand", command.toString()));
        }
    } finally {
        ServerSocket serverSocket = awaitSocket;
        awaitThread = null;
        awaitSocket = null;

        
        if (serverSocket != null) {
            try {
                serverSocket.close();
            } catch (IOException e) {
                
            }
        }
    }
}



@Override
public Service findService(String name) {
    if (name == null) {
        return null;
    }
    synchronized (servicesLock) {
        for (Service service : services) {
            if (name.equals(service.getName())) {
                return service;
            }
        }
    }
    return null;
}



@Override
public Service[] findServices() {
    return services;
}


public ObjectName[] getServiceNames() {
    ObjectName onames[]=new ObjectName[ services.length ];
    for( int i=0; i<services.length; i++ ) {
        onames[i]=((StandardService)services[i]).getObjectName();
    }
    return onames;
}



@Override
public void removeService(Service service) {

    synchronized (servicesLock) {
        int j = -1;
        for (int i = 0; i < services.length; i++) {
            if (service == services[i]) {
                j = i;
                break;
            }
        }
        if (j < 0)
            return;
        try {
            services[j].stop();
        } catch (LifecycleException e) {
            
        }
        int k = 0;
        Service results[] = new Service[services.length - 1];
        for (int i = 0; i < services.length; i++) {
            if (i != j)
                results[k++] = services[i];
        }
        services = results;

        
        support.firePropertyChange("service", service, null);
    }

}
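The core of await() is the loop that reads the command one byte at a time and stops at the first control character, at end of stream, or once the length cap is reached. That loop can be isolated and exercised without a socket. In the sketch below, `ShutdownCommandReader` and `readCommand` are hypothetical names, and the cap is passed in as a parameter rather than derived from the shutdown string.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;

// Hypothetical helper that isolates the command-reading loop; not part of Tomcat.
class ShutdownCommandReader {

    // Reads at most maxChars characters, stopping at a control character,
    // DEL (127), end of stream (-1), or a read error.
    static String readCommand(InputStream stream, int maxChars) {
        StringBuilder command = new StringBuilder();
        int remaining = maxChars;
        while (remaining > 0) {
            int ch;
            try {
                ch = stream.read();
            } catch (IOException e) {
                ch = -1; // treat a read failure like end of stream
            }
            if (ch < 32 || ch == 127) {
                break;
            }
            command.append((char) ch);
            remaining--;
        }
        return command.toString();
    }

    // The command must match exactly; there is no trimming or case folding.
    static boolean isShutdown(String received, String expected) {
        return received.equals(expected);
    }

    public static void main(String[] args) {
        InputStream in = new ByteArrayInputStream(
                "SHUTDOWN\r\n".getBytes(StandardCharsets.ISO_8859_1));
        String command = readCommand(in, 1024);
        System.out.println(command + " -> " + isShutdown(command, "SHUTDOWN"));
    }
}
```

Stopping at the first control character means a trailing line break from a client such as telnet does not become part of the command, while the exact comparison keeps near misses from shutting the server down.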

Lifecycle template methods

Only startInternal, together with the startPeriodicLifecycleEvent helper it schedules, is shown here.

java
@Override
protected void startInternal() throws LifecycleException {

    fireLifecycleEvent(CONFIGURE_START_EVENT, null);
    setState(LifecycleState.STARTING);

    globalNamingResources.start();

    
    synchronized (servicesLock) {
        for (int i = 0; i < services.length; i++) {
            services[i].start();
        }
    }

    if (periodicEventDelay > 0) {
        monitorFuture = getUtilityExecutor().scheduleWithFixedDelay(
                new Runnable() {
                    @Override
                    public void run() {
                        startPeriodicLifecycleEvent();
                    }
                }, 0, 60, TimeUnit.SECONDS);
    }
}
    
protected void startPeriodicLifecycleEvent() {
    if (periodicLifecycleEventFuture == null || (periodicLifecycleEventFuture != null && periodicLifecycleEventFuture.isDone())) {
        if (periodicLifecycleEventFuture != null && periodicLifecycleEventFuture.isDone()) {
            
            try {
                periodicLifecycleEventFuture.get();
            } catch (InterruptedException | ExecutionException e) {
                log.error(sm.getString("standardServer.periodicEventError"), e);
            }
        }
        periodicLifecycleEventFuture = getUtilityExecutor().scheduleAtFixedRate(
                new Runnable() {
                    @Override
                    public void run() {
                        fireLifecycleEvent(Lifecycle.PERIODIC_EVENT, null);
                    }
                }, periodicEventDelay, periodicEventDelay, TimeUnit.SECONDS);
    }
}

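The first line of the method fires CONFIGURE_START_EVENT so that the LifecycleListener instances registered on StandardServer can run. setState then sets the state property inherited from LifecycleBase to LifecycleState.STARTING. Next comes globalNamingResources.start(), which mirrors what initInternal does with init.

The method then calls start on each Service to bring up the Service components. In other words, startInternal in StandardServer follows the same pattern as initInternal: both delegate to the corresponding method of the contained Service components.

Once every service.start() call has returned, and only if periodicEventDelay is greater than 0, the thread pool returned by getUtilityExecutor() runs startPeriodicLifecycleEvent immediately and then again 60 seconds after each run completes. startPeriodicLifecycleEvent in turn uses getUtilityExecutor() to call fireLifecycleEvent every periodicEventDelay seconds, which raises Lifecycle.PERIODIC_EVENT. Anything that needs to run periodically can handle Lifecycle.PERIODIC_EVENT in a LifecycleListener registered on the Server.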
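The two-level scheduling described above, a monitor task that makes sure the periodic task exists and is still running, can be sketched with a plain ScheduledExecutorService. `PeriodicEventSketch`, `ensurePeriodicTask`, and the `fired` counter are hypothetical stand-ins: the counter takes the place of firing Lifecycle.PERIODIC_EVENT to listeners.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical model of the monitor-plus-periodic-task pattern; not Tomcat code.
class PeriodicEventSketch {

    private final ScheduledExecutorService executor;
    private final long periodSeconds;
    // Stand-in for firing a periodic lifecycle event to listeners.
    final AtomicInteger fired = new AtomicInteger();
    private ScheduledFuture<?> periodicFuture;

    PeriodicEventSketch(ScheduledExecutorService executor, long periodSeconds) {
        this.executor = executor;
        this.periodSeconds = periodSeconds;
    }

    // Schedules the periodic task if it was never scheduled or has finished
    // (cancelled or failed); otherwise leaves the running task alone.
    synchronized ScheduledFuture<?> ensurePeriodicTask() {
        if (periodicFuture == null || periodicFuture.isDone()) {
            periodicFuture = executor.scheduleAtFixedRate(
                    fired::incrementAndGet, periodSeconds, periodSeconds, TimeUnit.SECONDS);
        }
        return periodicFuture;
    }

    // The monitor runs immediately and then 60 seconds after each run completes.
    ScheduledFuture<?> startMonitor() {
        return executor.scheduleWithFixedDelay(
                this::ensurePeriodicTask, 0, 60, TimeUnit.SECONDS);
    }

    public static void main(String[] args) {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        try {
            PeriodicEventSketch sketch = new PeriodicEventSketch(executor, 10);
            ScheduledFuture<?> first = sketch.ensurePeriodicTask();
            first.cancel(false); // simulate the periodic task dying
            ScheduledFuture<?> second = sketch.ensurePeriodicTask();
            System.out.println("rescheduled=" + (first != second));
        } finally {
            executor.shutdownNow();
        }
    }
}
```

A task scheduled with scheduleAtFixedRate stops for good if one run throws, so having a separate monitor check isDone() gives the periodic event a way to recover.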

References

https://segmentfault.com/a/1190000022016991