时间:10:37:39 来源:阿布扎比 作者:蜘蛛资讯网 点击:3535
{随机段子}

辽源市乩只刚有限公司_Netty源码—三、select

NioEventLoop功能

前面channel已经准备好了,可以接收来自客户端的请求了,NioEventLoop作为一个线程池,只有一个线程,但是有一个queue存储了待执行的task,由于只有一个线程,所以run方法是死循环,除非线程池shutdown。

这个run方法的主要作用:

  1. 执行selector.select,监听IO事件,并处理IO事件
  2. 由于NioEventLoop兼有线程池的功能,执行线程池中任务
// io.netty.channel.nio.NioEventLoop#run
protected void run() {
    // loop,循环处理IO事件或者处理线程池中的task任务
    for (;;) {
        try {
            // 判断接下来是是执行select还是直接处理IO事件和执行队列中的task
            // hasTask判断当前线程的queue中是否还有待执行的任务
            switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
                case SelectStrategy.CONTINUE:
                    // NioEventLoop默认不会有这种状态
                    continue;
                case SelectStrategy.SELECT:
                    // 说明当前queue中没有task待执行
                    select(wakenUp.getAndSet(false));
                    // 唤醒epoll_wait
                    if (wakenUp.get()) {
                        selector.wakeup();
                    }
                    // fall through
                default:
            }

            cancelledKeys = 0;
            needsToSelectAgain = false;
            // 这个比例是处理IO事件所需的时间和花费在处理task时间的比例
            final int ioRatio = this.ioRatio;
            if (ioRatio == 100) {
                // 如果比例是100,表示每次都处理完IO事件后,执行所有的task
                try {
                    processSelectedKeys();
                } finally {
                    // Ensure we always run tasks.
                    // 保证能执行所有的task
                    runAllTasks();
                }
            } else {
                // 记录处理IO事件开始的时间
                final long ioStartTime = System.nanoTime();
                try {
                    // 处理IO事件
                    processSelectedKeys();
                } finally {
                    // Ensure we always run tasks.
                    // 当前时间减去处理IO事件开始的时间就是处理IO事件花费的时间
                    final long ioTime = System.nanoTime() - ioStartTime;
                    // 执行task的时间taskTime就是ioTime * (100 - ioRatio) / ioRatio
                    // 如果taskTime时间到了还有未执行的task,runAllTasks也会返回
                    runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                }
            }
        } catch (Throwable t) {
            handleLoopException(t);
        }
        // Always handle shutdown even if the loop processing threw an exception.
        try {
            // 如果已经shutdown则关闭所有资源
            if (isShuttingDown()) {
                closeAll();
                if (confirmShutdown()) {
                    return;
                }
            }
        } catch (Throwable t) {
            handleLoopException(t);
        }
    }
}

// io.netty.channel.DefaultSelectStrategy#calculateStrategy
public int calculateStrategy(IntSupplier selectSupplier, boolean hasTasks) throws Exception {
    // 如果还有task待执行则先执行selectNow,selectNow是立即返回的,不是阻塞等待
    // 如果没有待执行的task则执行select,有可能是阻塞等待IO事件
    return hasTasks ? selectSupplier.get() : SelectStrategy.SELECT;
}

// io.netty.channel.nio.NioEventLoop#selectNowSupplier
private final IntSupplier selectNowSupplier = new IntSupplier() {
    @Override
    public int get() throws Exception {
        // epoll_wait的参数timeout可以指定超时时间,selectNow传入的参数是0,也就是不超时等待立即返回
        return selectNow();
    }
};

select过程

epoll模型中最重要的一部分来了,Java把epoll_wait封装成了一个selector,可以理解为多路复用选择器,所以在调用selector.select过程中最后都是通过epoll_wait实现的,下面先看看SelectorImpl的两个select方法

public int select(long timeout)
    throws IOException {
    if (timeout < 0)
        throw new IllegalArgumentException("Negative timeout");
    // timeout = 0,传递给epoll_wait的参数是-1,表示阻塞等待
    // timeout > 0,表示超时等待timeout时间后返回
    return lockAndDoSelect((timeout == 0) ? -1 : timeout);
}

// 调用epoll_wait阻塞等待
public int select() throws IOException {
    return select(0);
}

// 调用epoll_wait立即返回
public int selectNow() throws IOException {
    return lockAndDoSelect(0);
}

上面三个select方法都调用了lockAndDoSelect,只是timeout参数不同,其实最后就是调用epoll_wait参数不同,epoll_wait有一个timeout参数,表示超时时间

  • -1:阻塞
  • 0:立即返回,非阻塞
  • 大于0:指定微秒
// sun.nio.ch.EPollSelectorImpl#doSelect
protected int doSelect(long timeout) throws IOException {
    if (closed)
        throw new ClosedSelectorException();
    // 省略中间代码...
    // 开始poll,这里的pollWrapper是EPollArrayWrapper
    pollWrapper.poll(timeout);
    // 省略中间代码...

    int numKeysUpdated = updateSelectedKeys();
    // 如果epoll_wait是因为wakeup pipe解除阻塞返回
    if (pollWrapper.interrupted()) {
        // Clear the wakeup pipe
        // 清除中断文件描述符接收到的IO事件
        pollWrapper.putEventOps(pollWrapper.interruptedIndex(), 0);
        synchronized (interruptLock) {
            pollWrapper.clearInterrupted();
            // 读取完管道中的数据
            IOUtil.drain(fd0);
            interruptTriggered = false;
        }
    }
    return numKeysUpdated;
}

int poll(long timeout) throws IOException {
    // 这里会向epoll注册每个socket需要监听的事件
    updateRegistrations();
    // 调用epollWait,这是一个native方法
    updated = epollWait(pollArrayAddress, NUM_EPOLLEVENTS, timeout, epfd);
    for (int i=0; i

看看epollWait的native实现

// jdk/src/solaris/native/sun/nio/ch/EPollArrayWrapper.c
JNIEXPORT jint JNICALL
Java_sun_nio_ch_EPollArrayWrapper_epollWait(JNIEnv *env, jobject this,
                                            jlong address, jint numfds,
                                            jlong timeout, jint epfd)
{
    struct epoll_event *events = jlong_to_ptr(address);
    int res;

    if (timeout <= 0) {           /* Indefinite or no wait */
        // epoll_wait参数的含义是
        // epfd,创建的epoll句柄
        // events是一个结构体指针,如果有IO事件发生,linux会将事件放在这个结构体中返回
        // numfds是上面指针指向的结构体的个数,也就是最多能接收的IO事件的个数
        // timeout是超时时间
        RESTARTABLE(epoll_wait(epfd, events, numfds, timeout), res);
    } else {                      /* Bounded wait; bounded restarts */
        res = iepoll(epfd, events, numfds, timeout);
    }

    if (res < 0) {
        JNU_ThrowIOExceptionWithLastError(env, "epoll_wait failed");
    }
    return res;
}

从native实现上可以看出最终调用了epoll_wait(在timeout >= 0),接着看看epoll_wait的里一个参数events的来源。在上一篇文章里面我们说了channel注册的时候会将自己需要监听的事件类型保存在sun.nio.ch.EPollArrayWrapper#eventsLow中,而上面EPollArrayWrapper#poll中又调用了updateSelectedKeys来注册每个socket监听的事件

// sun.nio.ch.EPollArrayWrapper#getUpdateEvents
// 获取需要监听的文件描述符对应的事件
private byte getUpdateEvents(int fd) {
    // 如果没有超出预定义的数组大小则直接从数组中获取
    if (fd < MAX_UPDATE_ARRAY_SIZE) {
        return eventsLow[fd];
    } else {
        // 超出预订单数组大小的部分从map中获取
        Byte result = eventsHigh.get(Integer.valueOf(fd));
        // result should never be null
        return result.byteValue();
    }
}

// sun.nio.ch.EPollArrayWrapper#updateRegistrations
// 这个方法是在epoll_wait前把需要监听的文件描述符及其需要监听的事件注册到epoll上
private void updateRegistrations() {
    synchronized (updateLock) {
        int j = 0;
        // 每调用一次setInterest,updateCount加1
        while (j < updateCount) {
            // 需要监听的文件描述符
            int fd = updateDescriptors[j];
            // 需要监听的事件,比如channel注册之后的事件是
            short events = getUpdateEvents(fd);
            boolean isRegistered = registered.get(fd);
            int opcode = 0;

            if (events != KILLED) {
                if (isRegistered) {
                    opcode = (events != 0) ? EPOLL_CTL_MOD : EPOLL_CTL_DEL;
                } else {
                    opcode = (events != 0) ? EPOLL_CTL_ADD : 0;
                }
                if (opcode != 0) {
                    // 调用epollCtl来add、update或者delete对应文件描述符坚挺的事件
                    epollCtl(epfd, opcode, fd, events);
                    if (opcode == EPOLL_CTL_ADD) {
                        registered.set(fd);
                    } else if (opcode == EPOLL_CTL_DEL) {
                        registered.clear(fd);
                    }
                }
            }
            j++;
        }
        updateCount = 0;
    }
}

上面epollCtl又是一个native方法

// jdk/src/solaris/native/sun/nio/ch/EPollArrayWrapper.c
JNIEXPORT void JNICALL
Java_sun_nio_ch_EPollArrayWrapper_epollCtl(JNIEnv *env, jobject this, jint epfd,
                                           jint opcode, jint fd, jint events)
{
    struct epoll_event event;
    int res;

    event.events = events;
    event.data.fd = fd;

    // opcode,EPOLL_CTL_ADD(注册新的fd到epfd), EPOLL_CTL_MOD(修改已经注册的fd的监听事件), EPOLL_CTL_DEL(从epfd删除一个fd);
    // fd,需要监听的socket对应的文件描述符
    // event,该文件描述符监听的事件
    RESTARTABLE(epoll_ctl(epfd, (int)opcode, (int)fd, &event), res);
    // 省略中间代码...
}

关于select过程中的中断说明

这里的中断是什么?

这里中断并不是操作系统层面的中断,只是中断epoll_wait。由于epoll_wait可能会阻塞等待IO事件(timeout = -1),这里的中断就是指中断epoll_wait,即时返回。也就是让select即时返回

这里的中断是怎么实现的?

由于epoll_wait处在等待的情况下的时候,如果有文件描述符上有事件发生,epoll_wait就会返回,所以基本思路就是在epoll监控的文件描述符上产生IO事件,具体实现原理就是使用管道创建两个文件描述符fd0,fd1(EPollSelectorImpl),fd0用来作为读描述符,fd1作为写描述符,然后将读描述符注册到epoll上,如果向fd1写内容,epoll发现fd0有IO事件就会返回,起到了让epoll_wait及时返回的作用。

什么时候会中断

  1. 调用Thread.interrupt()
  2. selector关闭的时候
  3. 可以直接调用sun.nio.ch.EPollSelectorImpl#wakeup,这是一个public方法

中断的方法是sun.nio.ch.EPollArrayWrapper#interrupt(),这个方法会调用一个native方法

// jdk/src/solaris/native/sun/nio/ch/EPollArrayWrapper.c
JNIEXPORT void JNICALL
Java_sun_nio_ch_EPollArrayWrapper_interrupt(JNIEnv *env, jobject this, jint fd)
{
    int fakebuf[1];
    fakebuf[0] = 1;
    // 传入的文件描述符是sun.nio.ch.EPollArrayWrapper#outgoingInterruptFD,也就是创建的pipe的写文件描述符fd1,向pipe的fd1写入一个字节的1
    if (write(fd, fakebuf, 1) < 0) {
        JNU_ThrowIOExceptionWithLastError(env,"write to interrupt fd failed");
    }
}

这个时候epoll_wait就收到中断文件描述符sun.nio.ch.EPollArrayWrapper#incomingInterruptFD,也就是创建的pipe的读文件描述符上有IO事件产生,epoll_wait可以返回。

jdk src solaris native sun nio ch EPollArrayWrapper. c JNIEXPORT void JNICALL Java_sun_nio_ch_EPollArrayWrapper_interrupt JNIEnv env, jobject this, jint fd int fakebuf 1 fakebuf 1 chuan ru de wen jian miao shu fu shi sun. nio. ch. EPollArrayWrapper outgoingInterruptFD, ye jiu shi chuang jian de pipe de xie wen jian miao shu fu fd1, xiang pipe de fd1 xie ru yi ge zi jie de 1 if write fd, fakebuf, 1 lt JNU_ThrowIOExceptionWithLastError env," write to interrupt fd failed" zhe ge shi hou epoll_wait jiu shou dao zhong duan wen jian miao shu fu sun. nio. ch. EPollArrayWrapper incomingInterruptFD, ye jiu shi chuang jian de pipe de du wen jian miao shu fu shang you IO shi jian chan sheng, epoll_wait ke yi fan hui.

所以调用到方法EPollArrayWrapper#interrupt()就可以中断文件描述符,而方法EPollSelectorImpl#wakeup调用了EPollArrayWrapper#interrupt()。

那么为什么调用Thread.interrupt()的时候也会中断epoll_wait呢?

因为在Thread.interrupt()方法中

public void interrupt() {
    if (this != Thread.currentThread())
        checkAccess();

    synchronized (blockerLock) {
        Interruptible b = blocker;
        if (b != null) {
            interrupt0();           // Just to set the interrupt flag
            // 上面设置完中断标志位后,会调用当前线程的blocker的interrupt方法
            b.interrupt(this);
            return;
        }
    }
    interrupt0();
}

而在sun.nio.ch.EPollSelectorImpl#doSelect方法中,开始poll之前会调用begin方法

protected final void begin() {
    if (interruptor == null) {
        // 新建一个interruptor
        interruptor = new Interruptible() {
            public void interrupt(Thread ignore) {
                // 此时Thread.interrupt()中的blocker就是这个匿名内部类,也就是调用的这个interrupt方法
                AbstractSelector.this.wakeup();
            }};
    }
    // 设置当前线程的interruptor
    AbstractInterruptibleChannel.blockedOn(interruptor);
    Thread me = Thread.currentThread();
    if (me.isInterrupted())
        interruptor.interrupt(me);
}

所以Thread.interrupt()会调用到EPollSelectorImpl#wakeup方法,也就可以起到中断select的作用。

什么时候清除中断标志?

可以不止一次的中断select,为了实现这个功能,每次在中断之后斗湖清除相关的中断标志。在sun.nio.ch.EPollSelectorImpl#doSelect方法中pollWrapper.poll完成之后

int poll(long timeout) throws IOException {
    updateRegistrations();
    updated = epollWait(pollArrayAddress, NUM_EPOLLEVENTS, timeout, epfd);
    for (int i=0; i

总结

到目前为止已经看清了Java对应linux中epoll相关api的封装

// 创建epoll文件描述符
// 对应到Java就是创建selector
int epoll_create(int size);

// 打开一个网络通讯端口,也就是创建一个socket,创建返回一个文件描述符
// 对应到Java就是创建一个socketChannel
int socket(int domain, int type, int protocol);

// 将socket对应的文件描述符和ip:port绑定在一起
// 对应于Java中绑定ip:port
int bind(int sockfd, const struct sockaddr *addr, socklen_t addrlen);

// 表明socket对应的文件描述符处于监听状态,并且最多允许有backlog个客户端处于连接等待状态
// 对应于Java中bind中调用listen方法
int listen(int sockfd, int backlog);

// 控制某个文件描述符上的事件:add、update、delete事件
// 对应于Java中调用select过程中添加每个channel关注的事件
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);

// 等待监控的所有描述符有事件发生
// 对应于Java中select的时候等待有IO事件发生
int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout)

当前文章:http://www.tomaszew.com/6pca/277459-470966-42028.html

发布时间:04:22:30

状元阁论坛??www.599799.com??www.550tm.com??王中王论坛www74123com??赛马会官方网??白小姐开奖结果??六合刘伯温论坛??挂牌??香港开奖现场直播结果??牛牛高手论坛网站??

本文标签: 万凰之王 邵阳肆炼棠科技有限公司 乐平市呀邻有限公司

回到顶部