0
点赞
收藏
分享

微信扫一扫

Netty源码分析——read过程

ITWYY 2022-10-17 阅读 68

基于Netty源代码版本:netty-all-4.1.33.Final

前言

上一篇文章中,我们分析了processSelectedKey这个方法中的accept过程,本文将分析一下work线程中的read过程。

private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
	final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
	//检查该SelectionKey是否有效,如果无效,则关闭channel
	if (!k.isValid()) {
		final EventLoop eventLoop;
		try {
			eventLoop = ch.eventLoop();
		} catch (Throwable ignored) {
			// If the channel implementation throws an exception because there is no event loop, we ignore this
			// because we are only trying to determine if ch is registered to this event loop and thus has authority
			// to close ch.
			return;
		}
		// Only close ch if ch is still registered to this EventLoop. ch could have deregistered from the event loop
		// and thus the SelectionKey could be cancelled as part of the deregistration process, but the channel is
		// still healthy and should not be closed.
		// See https://github.com/netty/netty/issues/5125
		if (eventLoop != this || eventLoop == null) {
			return;
		}
		// close the channel if the key is not valid anymore
		unsafe.close(unsafe.voidPromise());
		return;
	}

	try {
		int readyOps = k.readyOps();
		// We first need to call finishConnect() before try to trigger a read(...) or write(...) as otherwise
		// the NIO JDK channel implementation may throw a NotYetConnectedException.
		// 如果是OP_CONNECT,则需要移除OP_CONNECT否则Selector.select(timeout)将立即返回不会有任何阻塞,这样可能会出现cpu 100%
		if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
			// remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
			// See https://github.com/netty/netty/issues/924
			int ops = k.interestOps();
			ops &= ~SelectionKey.OP_CONNECT;
			k.interestOps(ops);

			unsafe.finishConnect();
		}

		// Process OP_WRITE first as we may be able to write some queued buffers and so free memory.
		// 如果准备好了WRITE则将缓冲区中的数据发送出去,如果缓冲区中数据都发送完成,则清除之前关注的OP_WRITE标记
		if ((readyOps & SelectionKey.OP_WRITE) != 0) {
			// Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
			ch.unsafe().forceFlush();
		}

		// Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
		// to a spin loop
		// 如果准备好READ或ACCEPT则触发unsafe.read() ,检查是否为0,如上面的源码英文注释所说:解决JDK可能会产生死循环的一个bug。
		if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
			unsafe.read();
		}
	} catch (CancelledKeyException ignored) {
		unsafe.close(unsafe.voidPromise());
	}
}

该方法主要是对SelectionKey k进行了检查,有如下几种不同的情况

  • 1)OP_ACCEPT,接受客户端连接
  • 2)OP_READ, 可读事件, 即 Channel 中收到了新数据可供上层读取。
  • 3)OP_WRITE, 可写事件, 即上层可以向 Channel 写入数据。
  • 4)OP_CONNECT, 连接建立事件, 即 TCP 连接已经建立, Channel 处于 active 状态。

下面主要来看下当work 线程 selector检测到OP_READ事件时,内部干了些什么。

// 如果准备好READ或ACCEPT则触发unsafe.read() ,检查是否为0,如上面的源码英文注释所说:解决JDK可能会产生死循环的一个bug。
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
	unsafe.read();
}

从代码中可以看到,当selectionKey发生的事件是SelectionKey.OP_READ,执行unsafe的read方法。注意这里的unsafe是NioByteUnsafe的实例

为什么说这里的unsafe是NioByteUnsafe的实例呢?在上篇博文Netty源码分析:accept中我们知道Boss NioEventLoopGroup中的NioEventLoop只负责accpt客户端连接,然后将该客户端注册到Work NioEventLoopGroup中的NioEventLoop中,即最终是由work线程对应的selector来进行read等时间的监听,即work线程中的channel为SocketChannel,SocketChannel的unsafe就是NioByteUnsafe的实例

下面来看下NioByteUnsafe中的read方法,NioByteUnsafe是AbstractNioByteChannel中的内部类

@Override
public final void read() {
	final ChannelConfig config = config();
	if (shouldBreakReadReady(config)) {
		clearReadPending();
		return;
	}
	final ChannelPipeline pipeline = pipeline();
	final ByteBufAllocator allocator = config.getAllocator();
	final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
	allocHandle.reset(config);

	ByteBuf byteBuf = null;
	boolean close = false;
	try {
		do {
			//1、分配缓存
			byteBuf = allocHandle.allocate(allocator);
			//2、将socketChannel数据写入缓存
			allocHandle.lastBytesRead(doReadBytes(byteBuf));
			if (allocHandle.lastBytesRead() <= 0) {
				// nothing was read. release the buffer.
				byteBuf.release();
				byteBuf = null;
				close = allocHandle.lastBytesRead() < 0;
				if (close) {
					// There is nothing left to read as we received an EOF.
					readPending = false;
				}
				break;
			}

			allocHandle.incMessagesRead(1);
			readPending = false;
			//3、触发pipeline的ChannelRead事件来对byteBuf进行后续处理
			pipeline.fireChannelRead(byteBuf);
			byteBuf = null;
		} while (allocHandle.continueReading());

		allocHandle.readComplete();
		pipeline.fireChannelReadComplete();

		if (close) {
			closeOnRead(pipeline);
		}
	} catch (Throwable t) {
		handleReadException(pipeline, byteBuf, t, close, allocHandle);
	} finally {
		// Check if there is a readPending which was not processed yet.
		// This could be for two reasons:
		// * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
		// * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
		//
		// See https://github.com/netty/netty/issues/2254
		if (!readPending && !config.isAutoRead()) {
			removeReadOp();
		}
	}
}

下面介绍比较重要的代码

allocHandler的实例化过程

allocHandle负责自适应调整当前缓存分配的大小,以防止缓存分配过多或过少,先看allocHandler的实例化过程

@Override
public final void read() {
	······
	final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
	······
}

@Override
public RecvByteBufAllocator.Handle recvBufAllocHandle() {
	if (recvHandle == null) {
		recvHandle = config().getRecvByteBufAllocator().newHandle();
	}
	return recvHandle;
}

其中, config.getRecvByteBufAllocator()得到的是一个 AdaptiveRecvByteBufAllocator实例DEFAULT。

public interface ChannelConfig {
	......
	<T extends RecvByteBufAllocator> T getRecvByteBufAllocator();
	......
}

public class DefaultChannelConfig implements ChannelConfig {
	......
    private volatile RecvByteBufAllocator rcvBufAllocator;

	@Override
    public <T extends RecvByteBufAllocator> T getRecvByteBufAllocator() {
        return (T) rcvBufAllocator;
    }

	    @Override
    public ChannelConfig setRecvByteBufAllocator(RecvByteBufAllocator allocator) {
        rcvBufAllocator = checkNotNull(allocator, "allocator");
        return this;
    }
	......
}

而AdaptiveRecvByteBufAllocator中的newHandler()方法的代码如下:

@Override
public Handle newHandle() {
	return new HandleImpl(minIndex, maxIndex, initial);
}

public HandleImpl(int minIndex, int maxIndex, int initial) {
	this.minIndex = minIndex;
	this.maxIndex = maxIndex;

	index = getSizeTableIndex(initial);
	nextReceiveBufferSize = SIZE_TABLE[index];
}

其中,上面方法中所用到参数:minIndex maxIndex initial是什么意思呢? 含义如下:

  • minIndex是最小缓存在SIZE_TABLE中对应的下标。
  • maxIndex是最大缓存在SIZE_TABLE中对应的下标,
  • initial为初始化缓存大小。

AdaptiveRecvByteBufAllocator的相关常量字段

public class AdaptiveRecvByteBufAllocator extends DefaultMaxMessagesRecvByteBufAllocator {

    static final int DEFAULT_MINIMUM = 64;
    static final int DEFAULT_INITIAL = 1024;
    static final int DEFAULT_MAXIMUM = 65536;

    private static final int INDEX_INCREMENT = 4;
    private static final int INDEX_DECREMENT = 1;

    private static final int[] SIZE_TABLE;
}

上面这些字段的具体含义说明如下:

  • 1)SIZE_TABLE:按照从小到大的顺序预先存储可以分配的缓存大小。 从16开始,每次累加16,直到496,接着从512开始,每次增大一倍,直到溢出。SIZE_TABLE初始化过程如下。
static {
	List<Integer> sizeTable = new ArrayList<Integer>();
	for (int i = 16; i < 512; i += 16) {
		sizeTable.add(i);
	}

	for (int i = 512; i > 0; i <<= 1) {
		sizeTable.add(i);
	}

	SIZE_TABLE = new int[sizeTable.size()];
	for (int i = 0; i < SIZE_TABLE.length; i ++) {
		SIZE_TABLE[i] = sizeTable.get(i);
	}
}
  • 2)DEFAULT_MINIMUM:最小缓存(64),在SIZE_TABLE中对应的下标为3。
  • 3)DEFAULT_MAXIMUM :最大缓存(65536),在SIZE_TABLE中对应的下标为38。
  • 4)DEFAULT_INITIAL :初始化缓存大小,第一次分配缓存时,由于没有上一次实际收到的字节数做参考,需要给一个默认初始值。
  • 5)INDEX_INCREMENT:上次预估缓存偏小,下次index的递增值。
  • 6)INDEX_DECREMENT :上次预估缓存偏大,下次index的递减值。

构造函数:

public AdaptiveRecvByteBufAllocator(int minimum, int initial, int maximum) {
	if (minimum <= 0) {
		throw new IllegalArgumentException("minimum: " + minimum);
	}
	if (initial < minimum) {
		throw new IllegalArgumentException("initial: " + initial);
	}
	if (maximum < initial) {
		throw new IllegalArgumentException("maximum: " + maximum);
	}

	int minIndex = getSizeTableIndex(minimum);
	if (SIZE_TABLE[minIndex] < minimum) {
		this.minIndex = minIndex + 1;
	} else {
		this.minIndex = minIndex;
	}

	int maxIndex = getSizeTableIndex(maximum);
	if (SIZE_TABLE[maxIndex] > maximum) {
		this.maxIndex = maxIndex - 1;
	} else {
		this.maxIndex = maxIndex;
	}

	this.initial = initial;
}

该构造函数对参数进行了有效性检查,然后初始化了如下3个字段,这3个字段就是上面用于产生allocHandle对象所要用到的参数。

private final int minIndex;
private final int maxIndex;
private final int initial;

其中,getSizeTableIndex函数的代码如下,该函数的功能为:找到SIZE_TABLE中的元素刚好大于或等于size的位置。

private static int getSizeTableIndex(final int size) {
	for (int low = 0, high = SIZE_TABLE.length - 1;;) {
		if (high < low) {
			return low;
		}
		if (high == low) {
			return high;
		}

		int mid = low + high >>> 1;
		int a = SIZE_TABLE[mid];
		int b = SIZE_TABLE[mid + 1];
		if (size > b) {
			low = mid + 1;
		} else if (size < a) {
			high = mid - 1;
		} else if (size == a) {
			return mid;
		} else {//这里的情况就是 a < size <= b 的情况
			return mid + 1;
		}
	}
}

byteBuf = allocHandle.allocate(allocator);

申请一块指定大小的内存 RecvByteBufAllocator#DelegatingHandle

public interface RecvByteBufAllocator {
    @Deprecated
    interface Handle {
        
        ByteBuf allocate(ByteBufAllocator alloc);
	}

    class DelegatingHandle implements Handle {
		private final Handle delegate;
        @Override
        public ByteBuf allocate(ByteBufAllocator alloc) {
            return delegate.allocate(alloc);
        }
	}
}

public class DefaultMaxBytesRecvByteBufAllocator implements MaxBytesRecvByteBufAllocator {
	private final class HandleImpl implements ExtendedHandle {
        @Override
        public ByteBuf allocate(ByteBufAllocator alloc) {
            return alloc.ioBuffer(guess());
        }
	}
}

直接调用了ioBuffer方法,继续看

public abstract class AbstractByteBufAllocator implements ByteBufAllocator {    
	@Override
    public ByteBuf ioBuffer(int initialCapacity) {
        if (PlatformDependent.hasUnsafe()) {
            return directBuffer(initialCapacity);
        }
        return heapBuffer(initialCapacity);
    }

	@Override
    public ByteBuf ioBuffer(int initialCapacity, int maxCapacity) {
        if (PlatformDependent.hasUnsafe()) {
            return directBuffer(initialCapacity, maxCapacity);
        }
        return heapBuffer(initialCapacity, maxCapacity);
    }
}

ioBuffer函数中主要逻辑为:看平台是否支持unsafe,选择使用直接物理内存还是堆上内存。先看 heapBuffer

public abstract class AbstractByteBufAllocator implements ByteBufAllocator {    
    @Override
    public ByteBuf heapBuffer(int initialCapacity) {
        return heapBuffer(initialCapacity, DEFAULT_MAX_CAPACITY);
    }

    @Override
    public ByteBuf heapBuffer(int initialCapacity, int maxCapacity) {
        if (initialCapacity == 0 && maxCapacity == 0) {
            return emptyBuf;
        }
        validate(initialCapacity, maxCapacity);
        return newHeapBuffer(initialCapacity, maxCapacity);
    }
}

这里的newHeapBuffer有两种实现:至于具体用哪一种,取决于我们对系统属性io.netty.allocator.type的设置,如果设置为: “pooled”,则缓存分配器就为:PooledByteBufAllocator,进而利用对象池技术进行内存分配。如果不设置或者设置为其他,则缓存分配器为:UnPooledByteBufAllocator,则直接返回一个UnpooledHeapByteBuf对象。

public class PooledByteBufAllocator extends AbstractByteBufAllocator implements ByteBufAllocatorMetricProvider {    
	@Override
    protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
        PoolThreadCache cache = threadCache.get();
        PoolArena<ByteBuffer> directArena = cache.directArena;

        final ByteBuf buf;
        if (directArena != null) {
            buf = directArena.allocate(cache, initialCapacity, maxCapacity);
        } else {
            buf = PlatformDependent.hasUnsafe() ?
                    UnsafeByteBufUtil.newUnsafeDirectByteBuf(this, initialCapacity, maxCapacity) :
                    new UnpooledDirectByteBuf(this, initialCapacity, maxCapacity);
        }

        return toLeakAwareBuffer(buf);
    }
}

public final class UnpooledByteBufAllocator extends AbstractByteBufAllocator implements ByteBufAllocatorMetricProvider {
    @Override
    protected ByteBuf newHeapBuffer(int initialCapacity, int maxCapacity) {
        return PlatformDependent.hasUnsafe() ?
                new InstrumentedUnpooledUnsafeHeapByteBuf(this, initialCapacity, maxCapacity) :
                new InstrumentedUnpooledHeapByteBuf(this, initialCapacity, maxCapacity);
    }
}

再看directBuffer

public abstract class AbstractByteBufAllocator implements ByteBufAllocator {
	@Override
    public ByteBuf directBuffer(int initialCapacity) {
        return directBuffer(initialCapacity, DEFAULT_MAX_CAPACITY);
    }

    @Override
    public ByteBuf directBuffer(int initialCapacity, int maxCapacity) {
        if (initialCapacity == 0 && maxCapacity == 0) {
            return emptyBuf;
        }
        validate(initialCapacity, maxCapacity);//参数的有效性检查
        return newDirectBuffer(initialCapacity, maxCapacity);
    }
}

与newHeapBuffer一样,这里的newDirectBuffer方法也有两种实现:至于具体用哪一种,取决于我们对系统属性io.netty.allocator.type的设置,如果设置为: “pooled”,则缓存分配器就为:PooledByteBufAllocator,进而利用对象池技术进行内存分配。如果不设置或者设置为其他,则缓存分配器为:UnPooledByteBufAllocator。这里主要看下UnpooledByteBufAllocator. newDirectBuffer的内部实现

public final class UnpooledByteBufAllocator extends AbstractByteBufAllocator implements ByteBufAllocatorMetricProvider {    
	@Override
    protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
        final ByteBuf buf;
        if (PlatformDependent.hasUnsafe()) {
            buf = noCleaner ? new InstrumentedUnpooledUnsafeNoCleanerDirectByteBuf(this, initialCapacity, maxCapacity) :
                    new InstrumentedUnpooledUnsafeDirectByteBuf(this, initialCapacity, maxCapacity);
        } else {
            buf = new InstrumentedUnpooledDirectByteBuf(this, initialCapacity, maxCapacity);
        }
        return disableLeakDetector ? buf : toLeakAwareBuffer(buf);
    }
}

因为InstrumentedUnpooledUnsafeDirectByteBuf继承自UnpooledUnsafeDirectByteBuf UnpooledUnsafeDirectByteBuf是如何实现缓存管理的?对Nio的ByteBuffer进行了封装,通过ByteBuffer的allocateDirect方法实现缓存的申请。

public class UnpooledUnsafeDirectByteBuf extends AbstractReferenceCountedByteBuf {    
	UnpooledUnsafeDirectByteBuf(ByteBufAllocator alloc, ByteBuffer initialBuffer, int maxCapacity, boolean doFree) {
        super(maxCapacity);
        if (alloc == null) {
            throw new NullPointerException("alloc");
        }
        if (initialBuffer == null) {
            throw new NullPointerException("initialBuffer");
        }
        if (!initialBuffer.isDirect()) {
            throw new IllegalArgumentException("initialBuffer is not a direct buffer.");
        }
        if (initialBuffer.isReadOnly()) {
            throw new IllegalArgumentException("initialBuffer is a read-only buffer.");
        }

        int initialCapacity = initialBuffer.remaining();
        if (initialCapacity > maxCapacity) {
            throw new IllegalArgumentException(String.format(
                    "initialCapacity(%d) > maxCapacity(%d)", initialCapacity, maxCapacity));
        }

        this.alloc = alloc;
        doNotFree = !doFree;
        setByteBuffer(initialBuffer.order(ByteOrder.BIG_ENDIAN), false);
        writerIndex(initialCapacity);
    }

	protected ByteBuffer allocateDirect(int initialCapacity) {
		return ByteBuffer.allocateDirect(initialCapacity);
	}

    final void setByteBuffer(ByteBuffer buffer, boolean tryFree) {
        if (tryFree) {
            ByteBuffer oldBuffer = this.buffer;
            if (oldBuffer != null) {
                if (doNotFree) {
                    doNotFree = false;
                } else {
                    freeDirect(oldBuffer);
                }
            }
        }
        this.buffer = buffer;
        memoryAddress = PlatformDependent.directBufferAddress(buffer);
        tmpNioBuf = null;
        capacity = buffer.remaining();
    }
}

上面代码的主要逻辑为:

  • 1、先利用ByteBuffer的allocateDirect方法分配了大小为initialCapacity的缓存
  • 2、然后判断将旧缓存给free掉
  • 3、最后将新缓存赋给字段buffer上

其中:memoryAddress = PlatformDependent.directBufferAddress(buffer) 获取buffer的address字段值,指向缓存地址。 capacity = buffer.remaining() 获取缓存容量。 接下来看toLeakAwareBuffer(buf)方法

public abstract class AbstractByteBufAllocator implements ByteBufAllocator {    
	protected static ByteBuf toLeakAwareBuffer(ByteBuf buf) {
        ResourceLeakTracker<ByteBuf> leak;
        switch (ResourceLeakDetector.getLevel()) {
            case SIMPLE:
                leak = AbstractByteBuf.leakDetector.track(buf);
                if (leak != null) {
                    buf = new SimpleLeakAwareByteBuf(buf, leak);
                }
                break;
            case ADVANCED:
            case PARANOID:
                leak = AbstractByteBuf.leakDetector.track(buf);
                if (leak != null) {
                    buf = new AdvancedLeakAwareByteBuf(buf, leak);
                }
                break;
            default:
                break;
        }
        return buf;
    }
}

方法toLeakAwareBuffer(buf)对申请的buf又进行了一次包装。 上面一长串的分析,得到了缓存后,回到AbstractNioByteChannel.read方法,继续看。

doReadBytes方法

下面看下doReadBytes方法:将socketChannel数据写入缓存。 NioSocketChannel.java

@Override
protected int doReadBytes(ByteBuf byteBuf) throws Exception {
	final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
	allocHandle.attemptedBytesRead(byteBuf.writableBytes());
	return byteBuf.writeBytes(javaChannel(), allocHandle.attemptedBytesRead());
}

将Channel中的数据读入缓存byteBuf中。继续看 WrappedByteBuf.java

@Override
public int writeBytes(ScatteringByteChannel in, int length) throws IOException {
	return buf.writeBytes(in, length);
}

AbstractByteBuf.java

@Override
public int writeBytes(ScatteringByteChannel in, int length) throws IOException {
	ensureWritable(length);
	int writtenBytes = setBytes(writerIndex, in, length);
	if (writtenBytes > 0) {
		writerIndex += writtenBytes;
	}
	return writtenBytes;
}

这里的setBytes方法有不同的实现,这里看下UnpooledUnsafeDirectByteBuf的setBytes的实现。

@Override
public int setBytes(int index, ScatteringByteChannel in, int length) throws IOException {
	ensureAccessible();
	ByteBuffer tmpBuf = internalNioBuffer();
	tmpBuf.clear().position(index).limit(index + length);
	try {
		return in.read(tmpBuf);
	} catch (ClosedChannelException ignored) {
		return -1;//当Channel 已经关闭,则返回-1
	}
}

private ByteBuffer internalNioBuffer() {
	ByteBuffer tmpNioBuf = this.tmpNioBuf;
	if (tmpNioBuf == null) {
		this.tmpNioBuf = tmpNioBuf = buffer.duplicate();
	}
	return tmpNioBuf;
}

最终底层采用ByteBuffer实现read操作,无论是PooledByteBuf、还是UnpooledXXXBuf,里面都将底层数据结构BufBuffer/array转换为ByteBuffer 来实现read操作。即无论是UnPooledXXXBuf还是PooledXXXBuf里面都有一个ByteBuffer tmpNioBuf,这个tmpNioBuf才是真正用来存储从管道Channel中读取出的内容的。到这里就完成了将channel的数据读入到了缓存Buf中。

 

我们具体来看看 in.read(tmpBuf); FileChannel和SocketChannel的read最后都是依赖的IOUtil来实现,代码如下:

public class FileChannelImpl extends FileChannel {    
	public int read(ByteBuffer var1) throws IOException {
        this.ensureOpen();
        if (!this.readable) {
            throw new NonReadableChannelException();
        } else {
            synchronized(this.positionLock) {
                int var3 = 0;
                int var4 = -1;

                try {
                    this.begin();
                    var4 = this.threads.add();
                    if (!this.isOpen()) {
                        byte var12 = 0;
                        return var12;
                    } else {
                        do {
                            var3 = IOUtil.read(this.fd, var1, -1L, this.nd);
                        } while(var3 == -3 && this.isOpen());

                        int var5 = IOStatus.normalize(var3);
                        return var5;
                    }
                } finally {
                    this.threads.remove(var4);
                    this.end(var3 > 0);

                    assert IOStatus.check(var3);

                }
            }
        }
    }
}

最后目的就是将SocketChannel中的数据读出存放到ByteBuffer dst中,我们看看 IOUtil.read(fd, dst, -1, nd)

public class IOUtil {    
	static int read(FileDescriptor var0, ByteBuffer var1, long var2, NativeDispatcher var4) throws IOException {
        if (var1.isReadOnly()) {
            throw new IllegalArgumentException("Read-only buffer");
			//如果最终承载数据的buffer是DirectBuffer,则直接将数据读入到堆外内存中
        } else if (var1 instanceof DirectBuffer) {
            return readIntoNativeBuffer(var0, var1, var2, var4);
        } else {
			// 分配临时的堆外内存
            ByteBuffer var5 = Util.getTemporaryDirectBuffer(var1.remaining());

            int var7;
            try {
				// Socket I/O 操作会将数据读入到堆外内存中
                int var6 = readIntoNativeBuffer(var0, var5, var2, var4);
                var5.flip();
                if (var6 > 0) {
					// 将堆外内存的数据拷贝到堆内存中(用户定义的缓存,在jvm中分配内存)
                    var1.put(var5);
                }

                var7 = var6;
            } finally {
				// 里面会调用DirectBuffer.cleaner().clean()来释放临时的堆外内存
                Util.offerFirstTemporaryDirectBuffer(var5);
            }

            return var7;
        }
    }
}

通过上述实现可以看出,基于channel的数据读取步骤如下:

  • 1、如果缓存内存是DirectBuffer,就直接将Channel中的数据读取到堆外内存
  • 2、如果缓存内存是堆内存,则先申请一块和缓存同大小的临时 DirectByteBuffer var5。
  • 3、将内核缓存中的数据读到堆外缓存var5,底层由NativeDispatcher的read实现。
  • 4、把堆外缓存var5的数据拷贝到堆内存var1(用户定义的缓存,在jvm中分配内存)。
  • 5、会调用DirectBuffer.cleaner().clean()来释放创建的临时的堆外内存

如果AbstractNioByteChannel.read中第一步创建的是堆外内存,则会直接将数据读入到堆外内存,并不会先创建临时堆外内存,再将数据读入到堆外内存,最后将堆外内存拷贝到堆内存 简单的说,如果使用堆外内存,则只会复制一次数据,如果使用堆内存,则会复制两次数据

我们来看看readIntoNativeBuffer

private static int readIntoNativeBuffer(FileDescriptor var0, ByteBuffer var1, long var2, NativeDispatcher var4) throws IOException {
	int var5 = var1.position();
	int var6 = var1.limit();

	assert var5 <= var6;

	int var7 = var5 <= var6 ? var6 - var5 : 0;
	if (var7 == 0) {
		return 0;
	} else {
		boolean var8 = false;
		int var9;
		if (var2 != -1L) {
			var9 = var4.pread(var0, ((DirectBuffer)var1).address() + (long)var5, var7, var2);
		} else {
			var9 = var4.read(var0, ((DirectBuffer)var1).address() + (long)var5, var7);
		}

		if (var9 > 0) {
			//重新定位buffer的position 
			var1.position(var5 + var9);
		}

		return var9;
	}
}

这个函数就是将内核缓冲区中的数据读取到堆外缓存DirectBuffer 回到AbstractNioByteChannel.read方法,继续看。

@Override
public final void read() {
	final ChannelConfig config = config();
	if (shouldBreakReadReady(config)) {
		clearReadPending();
		return;
	}
	final ChannelPipeline pipeline = pipeline();
	final ByteBufAllocator allocator = config.getAllocator();
	final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
	allocHandle.reset(config);

	ByteBuf byteBuf = null;
	boolean close = false;
	try {
		do {
			byteBuf = allocHandle.allocate(allocator);
			allocHandle.lastBytesRead(doReadBytes(byteBuf));
			if (allocHandle.lastBytesRead() <= 0) {
				// nothing was read. release the buffer.
				byteBuf.release();
				byteBuf = null;
				close = allocHandle.lastBytesRead() < 0;
				if (close) {
					// There is nothing left to read as we received an EOF.
					readPending = false;
				}
				break;
			}

			allocHandle.incMessagesRead(1);
			readPending = false;
			pipeline.fireChannelRead(byteBuf);
			byteBuf = null;
		} while (allocHandle.continueReading());

		allocHandle.readComplete();
		pipeline.fireChannelReadComplete();

		if (close) {
			closeOnRead(pipeline);
		}
	} catch (Throwable t) {
		handleReadException(pipeline, byteBuf, t, close, allocHandle);
	} finally {
		// Check if there is a readPending which was not processed yet.
		// This could be for two reasons:
		// * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
		// * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
		//
		// See https://github.com/netty/netty/issues/2254
		if (!readPending && !config.isAutoRead()) {
			removeReadOp();
		}
	}
}

int localReadAmount = doReadBytes(byteBuf);

  • 1、如果返回0,则表示没有读取到数据,则退出循环。
  • 2、如果返回-1,表示对端已经关闭连接,则退出循环。
  • 3、否则,表示读取到了数据,数据读入缓存后,触发pipeline的ChannelRead事件,byteBuf作为参数进行后续处理,这时自定义Inbound类型的handler就可以进行业务处理了。Pipeline的事件处理在我之前的博文中有详细的介绍。处理完成之后,再一次从Channel读取数据,直至退出循环。
  • 4、循环次数超过maxMessagesPerRead时,即只能在管道中读取maxMessagesPerRead次数据,既是还没有读完也要退出。在上篇博文中,Boss线程接受客户端连接也用到了此变量,即当boss线程 selector检测到OP_ACCEPT事件后一次只能接受maxMessagesPerRead个客户端连接

 

参考: https://www.cnblogs.com/java-chen-hao/p/11477384.html

举报

相关推荐

0 条评论