recycler:Netty源码解析(八)之Recycler

当代码中存在大量的对某一个类的对象进行创建和销毁时,可以使用对象池(类似于线程池),因为对象的创建是耗费一定时间的,因此使用线程池可以在一定程度上提升程序的性能。例如,在PooledByteBufAllocator中,需要不停的对内存进行回收和释放,相应的就会不停地创建和回收ByteBuf,因此Netty中提供了一个对象池工具类——Recycler。

一. Recycler的使用

Recycler类的使用涉及到三个类:Recycler、Handle以及用户自定义的需要缓存的类。Recycler可以理解为一个对象回收器,或者说一个对象池,Handle用来负责对User对象的具体的管理,每个User对象实例都对应一个Handle实例。

//用户自定义的User类,即需要被Recycler管理的类private static class User { //必须要有一个handle成员变量,该handle负责对User对象的管理 private final Recycler.Handle<User> handle; //必须要有这样一个构造方法,将传递的handle保存到成员变量 public User(Recycler.Handle<User> handle) { this.handle = handle; } //回收user对象的方法,需要调用handle.recycle(this)将该User对象进行回收 public void recycle(){ handle.recycle(this); }}public static void main(String[] args){ Recycler<User> recycler = new Recycler<User>() { //重写newObject方法,recycler调用该方法来创建一个新对象, //注意必须将handle传递给新建的User对象,否则无法对该对象进行管理 @Override protected User newObject(Handle<User> handle) { return new User(handle); } }; //从对象池中取一个对象,此时对象池中还没有缓存的对象,因此会调用newObject方法创建一个User实例 User user1 = recycler.get(); //将该User实例放回对象池中 user1.recycle(); //从对象池中取一个对象,这时对象池中有一个之前回收的对象,因此无需调用newObject方法,而是直接返回池中的对象 User user2 = recycler.get(); System.out.println(user1==user2);//true //再从对象池中取一个对象,这时又需要调用newObject方法来新建一个对象 User user3 = recycler.get(); System.out.println(user2==user3);//false}

二. 新对象的创建

Recycler是一个线程安全的类,是专门为了应对多线程场景而设计的。一个object对象在A线程中创建,可以在b线程中通过调用object.recycle()方法将其回收到Recycler中。因此Recycler有一个FastThreadLocal的成员变量,该threadLocal中为每个线程保存了一个Stack,这个Stack就是线程的对象池。关于Stack的详细介绍放在下一节讲,这里先来看一下Recycler的几个重要的参数,这些参数在初始化Stack的时候也会传递给Stack。

  • maxCapacityPerThread. 指的是为每个线程最多缓存多少个对象。如果某个线程的stack中缓存的对象大于这个值了,那么该线程再对对象进行回收时,Recycler会直接将其丢掉。
  • maxSharedCapacityFactor. 指的是一个线程新创建的对象可以被其他线程回收的比例。availableSharedCapacity=maxCapacityPerThread/maxSharedCapacityFactor
    一个线程创建的对象,最多只能有availableSharedCapacity个被其他线程回收。
  • interval. 指的是隔多少次才真正回收一次对象。其实调用object.recycle()时,Recycler并不一定会真正将该object放入缓存池,例如,当intercal=8时,那么调用8次object.recycle()方法,Recycler只会有一次将object进行缓存,另外七次都会将object直接丢弃,即Recycler只会缓存1/8的对象(简称interval机制),其目的就是为了避免频繁地回收对象。
  • maxDelayedQueuesPerThread. 指的是一个线程回收的其他线程的对象最多能属于几个线程。
  • private final int maxCapacityPerThread;private final int maxSharedCapacityFactor;private final int interval;private final int maxDelayedQueuesPerThread;private final FastThreadLocal<Recycler.Stack<T>> threadLocal = new FastThreadLocal<Recycler.Stack<T>>() { @Override protected Recycler.Stack<T> initialValue() { return new Recycler.Stack<T>(Recycler.this, Thread.currentThread(), maxCapacityPerThread, maxSharedCapacityFactor, interval, maxDelayedQueuesPerThread); } @Override protected void onRemoval(Recycler.Stack<T> value) { // Let us remove the WeakOrderQueue from the WeakHashMap directly if its safe to remove some overhead if (value.threadRef.get() == Thread.currentThread()) { if (DELAYED_RECYCLED.isSet()) { DELAYED_RECYCLED.get().remove(value); } } }};

    来看一下Recycler的get()方法,首先判断参数maxCapacityPerThread是否为0,为0意味着每个线程最大允许缓存的对象数为0,即对象池中不缓存任何对象,因此直接返回一个新对象。如果不为0,则从threadLocal中取出该线程的stack,再从stack中pop一个对象(stack中其实缓存的是对象的handle,handle与object是一一对应的),如果handle为null,则说明此时stack中没有缓存任何对象,那么就调用newObject方法,创建一个新对象然后返回。

    public final T get() { if (maxCapacityPerThread == 0) { //每个线程最大允许缓存的对象数为0,即不缓存任何对象,因此直接返回一个新对象。 return newObject((Recycler.Handle<T>) NOOP_HANDLE); } //通过threadLocal获取该线程的stack Recycler.Stack<T> stack = threadLocal.get(); //从stack中取出一个缓存的对象 Recycler.DefaultHandle<T> handle = stack.pop(); if (handle == null) //该线程的stack中没有缓存任何对象,因此创建一个新对象 { handle = stack.newHandle(); handle.value = newObject(handle); } return (T) handle.value;}

    三. 同线程回收对象、同线程取回对象

    通过调用对象的recycle()方法对一个对象进行回收,我们来看一下recycle()方法的具体实现。recycle()方法最终会调用stack.push()方法,将对象的回收操作交由对象的stack来执行,而在此之前,会先判断lastRecycledId和recycleId的值是否相等。lastRecycledId和recycleId是用来表示一个对象的状态,他们的值有如下几种情况:

  • lastRecycledId==recycleId==0. 说明该对象正在被某个线程使用,属于未被回收的状态。这只出现在以下两种情况中:1.通过newObject()方法新创建一个对象时,他的lastRecycledId和recycleId都为0;2.通过pop()方法从stack中取出一个对象时,会将他的lastRecycledId和recycleId都设置为0。
  • lastRecycledId==recycleId==Integer.MIN_VALUE+1. 说明该对象已经被回收,且是被创建他的线程所回收。
  • lastRecycledId==0,recycleId!=0. 说明该对象已经被回收,但不是被创建他的线程所回收。
  • 只有当lastRecycledId != recycleId,说明该对象之前已经被回收过了,所以这里会抛出异常,当然,也有可能lastRecycledId==recycleId==Integer.MIN_VALUE+1,这种情况在stack.push()方法中会进行判断。

    public void recycle(Object object) { if (object != value) { throw new IllegalArgumentException("object does not belong to handle"); } Stack<?> stack = this.stack; if (lastRecycledId != recycleId || stack == null) { throw new IllegalStateException("recycled already"); } stack.push(this);}

    在讲push方法的具体实现之前,先来看一下Stack类的内部构造。

  • WeakReference<Thread> threadRef.  该stack所属的线程,这是一个虚引用,因此当线程运行完不存在了之后,threadRef会等于null,该stack也会失效。
  • availableSharedCapacity、maxDelayedQueues、maxCapacity、interval. 这几个参数的含义上一节已经讲过了,不再赘述。
  • DefaultHandle<?>[] elements. stack中缓存的对象,用一个数组保存,这里缓存的对象是threadRef创建且也是threadRef回收的对象。
  • int size. stack中缓存的对象的个数。
  • int handleRecycleCount. 用来控制对象回收的频率,具体看注释。
  • WeakOrderQueue cursor, prev, head. 通过cursor,prev,head维护一个WeakOrderQueue双向链表,该链表中保存的是在threadRef中创建、但却在其他线程中回收的对象。从stack中取对象时,先从elements数组中找,elements中保存的是该线程创建且也是该线程回收的对象,如果在elements数组中找不到,就会去WeakOrderQueue中去找,如果还找不到,才会通过newObject方法创建一个新对象。
  • //该stack所属的线程final WeakReference<Thread> threadRef; final AtomicInteger availableSharedCapacity;private final int maxDelayedQueues;private final int maxCapacity;private final int interval;//该stack中缓存的对象,用一个数组保存DefaultHandle<?>[] elements;//stack中缓存的对象的个数int size;//interval用来指定回收对象的频率,handleRecycleCount则是用来标记是否需要真正回收对象。//handleRecycleCount初始值为interval,每次通过object.recycle()回收对象时,handleRecycleCount值+1,//handleRecycleCount等于interval时才会真正回收对象,然后handleRecycleCount再重置为0private int handleRecycleCount;private WeakOrderQueue cursor, prev;private volatile WeakOrderQueue head;

     stack的push()方法,首先判断当前线程(也就是要回收此对象的线程)是不是创建该对象的线程,如果是,调用pushNow方法,该对象会被放入stack的elements数组中,如果不是,调用pushLater方法,该对象会被放入stack的weakOrderQueue中。

    pushNow方法,首先判断recycleId和lastRecycleId是否有一个不等于0,如果有,那么抛出异常(前面已经说过)。然后将recycleId和lastRecycleId都设置为OWN_THREAD_ID,表明该对象已经被回收,然后再判断缓存的对象的个数是否达到了设定的最大值,以及是否触发interval机制,如果是,则不缓存该对象,方法直接返回。最后再判断elements数组是否需要扩容。总体来说,该方法的逻辑是很简单清晰的。

    void push(DefaultHandle<?> item) { Thread currentThread = Thread.currentThread(); if (threadRef.get() == currentThread) { // The current Thread is the thread that belongs to the Stack, we can try to push the object now. pushNow(item); } else { // The current Thread is not the one that belongs to the Stack // (or the Thread that belonged to the Stack was collected already), we need to signal that the push // happens later. pushLater(item, currentThread); }}private void pushNow(DefaultHandle<?> item) { //只有当recycleId==lastRecycledId==0时,item才是未被回收的状态, //如果不满足这个条件,说明对象已经被回收过了 if ((item.recycleId | item.lastRecycledId) != 0) { throw new IllegalStateException("recycled already"); } //OWN_THREAD_ID是一个常数,他等于Integer.MIN_VALUE+1。意味着将该对象的状态设置为已被同线程回收 item.recycleId = item.lastRecycledId = OWN_THREAD_ID; int size = this.size; //达到该线程能缓存的对象的个数的上限,或者触发了前面所说的interval机制,则不缓存该对象,直接返回 if (size >= maxCapacity || dropHandle(item)) { // Hit the maximum capacity or should drop - drop the possibly youngest object. return; } //elements数组已经满了,需要扩容 if (size == elements.length) { elements = Arrays.copyOf(elements, min(size << 1, maxCapacity)); } elements[size] = item; this.size = size + 1;}

    从stack中取对象的逻辑再pop方法中,该方法先判断elements的size是否为0,如果为0,则通过scavenge()方法去WeakOrderQueue中去找,如果还找不到,则返回null,说明已经没有缓存的对象了;如果不为0,则会判断对象的lastRecycledId和recycleId是否不相等,不相等说明该线程被多次回收过,那么这里就会抛出异常,如果lastRecycledId和recycleId不相等,则将lastRecycledId和recycleId都设为0,表示该对象正在被使用,然后返回这个对象。

    DefaultHandle<T> pop() { int size = this.size; //elements数组中没有缓存的对象 if (size == 0) { /** scanvenge()方法是从weakOrderQueue中去找被其他线程回收的对象, 如果找到了,就会将对象转移到elements数组中去,如果没找到,该方法返回null **/ if (!scavenge()) { return null; } size = this.size; if (size <= 0) { // double check, avoid races // weakOrderQueue中也没有找到,返回null return null; } } size --; DefaultHandle ret = elements[size]; elements[size] = null; // As we already set the element[size] to null we also need to store the updated size before we do // any validation. Otherwise we may see a null value when later try to pop again without a new element // added before. this.size = size; if (ret.lastRecycledId != ret.recycleId) { throw new IllegalStateException("recycled multiple times"); } ret.recycleId = 0; ret.lastRecycledId = 0; return ret;}

    四. 异线程回收对象

    异线程回收对象的逻辑在pushLater方法中。之前说过,异线程回收的对象是不放入elements数组中,而是放入WeakOrderQueue中,因此,Recycler中有一个FastThreadLocal常量:DELAYED_RECYCLED。用来保存每个线程回收的其他线程创建的对象,该线程保存的是一个Map,key是对象所属的stack,value是该线程回收的属于这个stack的对象。如图所示:thread A对应的Map中,stack B所对应weakOrderQueue中保存的是属于stack B(即由线程thread B创建)但由thread A进行回收的对象;thread B对应的Map中,stack A所对应的weakOrderQueue中保存的是属于stack A(即由线程thread A创建)但由thread B进行回收的对象,依次类推。

     接下来再看一下WeakOrderQueue的构造,一个WeakOrderQueue中有一个Link链表,每个Link中有一个Handle数组,这个数组才是真正保存对象的地方,每个数组的大小都是固定的(默认是16),WeakOrderQueue中还有一个next指针,指向下一个与其属于同一Stack的WeakOrderQueue。

     弄清楚了上面两幅图,再来看pushLater方法就很清晰了,以thread B回收thread A创建的对象为例,首先通过DELAYED_RECYCLED获取Map,再通过Map获取WeakOrderQueue,获取的是由thread A创建,但要被thread B回收的对象的WeakOrderQueue,如果WeakOrderQueue为null,意味着thread B之前从未回收过stack A 的对象,那么:

  • 如果Map的大小大于maxDelayedQueues(该参数的含义之前介绍过),意味着thread B已经为maxDelayedQueues个数的stack回收过对象了,因此不再回收stack A的对象,也就是向Map中put一个DUMMY
  • 通过newWeakOrderQueue方法创建一个queue,如果创建不成功,即方法返回null,那么直接return,表明此次回收对象失败。
  • 如果获取的WeakOrderQueue为DUMMY,那么同样直接return,此次回收对象失败。最后如果获取的是一个正常的WeakOrderQueue,那么就调用add方法将该对象插入到WeakOrderQueue中。

    //这里传入的thread是currentThread,也就是thread Bprivate void pushLater(DefaultHandle<?> item, Thread thread) { if (maxDelayedQueues == 0) { // We don't support recycling across threads and should just drop the item on the floor. return; } // we don't want to have a ref to the queue as the value in our weak map // so we null it out; to ensure there are no races with restoring it later // we impose a memory ordering here (no-op on x86) Map<Recycler.Stack<?>, WeakOrderQueue> delayedRecycled = DELAYED_RECYCLED.get(); //this指向stack A,他属于thread A,因此这里获得的是由thread A创建,但要被thread B回收的对象的WeakOrderQueue WeakOrderQueue queue = delayedRecycled.get(this); //queue为null,意味着thread B之前从未回收过stack A的对象 if (queue == null) { if (delayedRecycled.size() >= maxDelayedQueues) { // Add a dummy queue so we know we should drop the object delayedRecycled.put(this, WeakOrderQueue.DUMMY); return; } // Check if we already reached the maximum number of delayed queues and if we can allocate at all. if ((queue = newWeakOrderQueue(thread)) == null) { // drop object return; } delayedRecycled.put(this, queue); } else if (queue == WeakOrderQueue.DUMMY) { // drop object return; } queue.add(item);}

     Stack.newWeakOrderQueue()是向stack申请一个新的WeakOrderQueue,WeakOrderQueue.add()方法是将一个Handle插入WeakOrderQueue,这两个方法非常关键,我们一个一个来看。

    Stack的newWeakOrderQueue方法会调用WeakOrderQueue.newQueue()方法,并将自身的指针this传进去,注意这里this指向的是stack A(我们还是以thread B回收属于stack A的对象为例),newQueue方法又会调用reserveSpaceForLink方法,意味着每次给WeakOrderQueue扩容时,是以Link为最小单位(一个Link又可以存储LINK_CAPACITY个Handle),如果stack A的availableSharedCapacity大于LINK_CAPACITY,也就是说stack A还允许其他线程回收他的对象(前面讲过availableSharedCapacity这个参数的含义),那么reserveSpaceForLink会返回true,表明分配Link成功,否则返回false。分配成功后,会将该新创建的WeakOrderQueue的next指针指向stack A的 head,再将stack A的head更新为该WeakOrderQueue。如下图所示(Stack中cursor和prev这两个指针的含义在下一小节讲):

    此外,值得注意的是, WeakOrderQueue有一个指向Thread变量的弱引用,他的构造方法会传入一个Thread变量,这个thread是currentThread,也就是thread B,也就是说可以通过WeakOrderQueue.get()获取该WeakOrderQueue是被哪个线程回收的。

    private WeakOrderQueue newWeakOrderQueue(Thread thread) { //this==stack A return WeakOrderQueue.newQueue(this, thread);}static WeakOrderQueue newQueue(Recycler.Stack<?> stack, Thread thread) { // We allocated a Link so reserve the space if (!Recycler.WeakOrderQueue.Head.reserveSpaceForLink(stack.availableSharedCapacity)) { return null; } final WeakOrderQueue queue = new WeakOrderQueue(stack, thread); // Done outside of the constructor to ensure WeakOrderQueue.this does not escape the constructor and so // may be accessed while its still constructed. stack.setHead(queue); return queue;}static boolean reserveSpaceForLink(AtomicInteger availableSharedCapacity) { //因为存在多个线程同时向stack A申请Link的情况,因此这里采用了for循环+CAS操作 for (;;) { int available = availableSharedCapacity.get(); if (available < LINK_CAPACITY) { return false; } if (availableSharedCapacity.compareAndSet(available, available - LINK_CAPACITY)) { return true; } }}synchronized void setHead(WeakOrderQueue queue) { queue.setNext(head); head = queue;}

    add方法将一个Handle(或者说一个对象)加入进一个WeakOrderQueue中,该方法首先会判断是否触发interval机制,如果触发则直接将该对象丢弃。然后WeakOrderQueue会将Handle放入Link链表的尾节点,如果尾节点Link已经满了,则会申请一个新的Link,并插入到链表的尾部。

    每个Link中有一个writeIndex指针,指向下一个Handle应该插入的位置,如果writeIndex==LINK_CAPACITY,意味着该Link已经填满了,因此需要新申请一个Link。此外,注意到对Link的writeIndex进行更新时,用到了lazySet操作,这时因为在多线程环境下时,在thread B回收thread A的对象,往Link中插入Handle的同时,thread A也可能从Link中取对象(当thread A自己的stack中没有对象时,就会从WeakOrderQueue中去取),也就是从Link中取Handle,这里用lazySet更新writeIndex,就有可能导致thread B刚刚插入到Link中的Handle,对thread A来说是不可见的,也就是说通过lazySet刻意地放弃了实时可见性,只保证最终可见性。其目的就是提升性能,因为如果保证对象的实时可见性,需要插入一些内存屏障,这会产生额外的开销。事实上这里也不需要保证实时可见性,如果thread A看不到thread B刚刚插入的Handle,大不了让thread A新创建一个对象。

    void add(DefaultHandle<?> handle) { //将handle.lastRecycledId设置为WeakOrderQueue的id,表明该对象是被异线程回收了。 //WeakOrderQueue的id是Recycler为每个WeakOrderQueue分配的一个独一无二的整数,且不会等于Integer.MIN_VALUE+1 handle.lastRecycledId = id; // While we also enforce the recycling ratio one we transfer objects from the WeakOrderQueue to the Stack // we better should enforce it as well early. Missing to do so may let the WeakOrderQueue grow very fast // without control if the Stack if (handleRecycleCount < interval) { handleRecycleCount++; // Drop the item to prevent recycling to aggressive. return; } handleRecycleCount = 0; Recycler.WeakOrderQueue.Link tail = this.tail; int writeIndex; if ((writeIndex = tail.get()) == LINK_CAPACITY) { Recycler.WeakOrderQueue.Link link = head.newLink(); if (link == null) { // Drop it. return; } // We allocate a Link so reserve the space this.tail = tail = tail.next = link; writeIndex = tail.get(); } tail.elements[writeIndex] = handle; handle.stack = null; // we lazy set to ensure that setting stack to null appears before we unnull it in the owning thread; // this also means we guarantee visibility of an element in the queue if we see the index updated tail.lazySet(writeIndex + 1);}

     五. 从WeakOrderQueue获取对象

    前面提到,通过Recycler获取对象时,会先从线程的stack中的elements数组中获取,获取不到,则会从stack的WeakOrderQueue链表中获取,或者准确的说,是先从WeakOrderQueue中将对象转移到elements中(这一过程简称为对对象的收割),再从elements中获取。

    前面提到过,stack中有三个WeakOrderQueue指针,分别为head,cursor和prev:

  • head:指向stack的WeakOrderQueue链表的头节点,当有新的线程回收该stack的对象时,需要向stack申请一个新的WeakOrderQueue插入到链表的头部,这在前面已经介绍过。
  • cursor:第一次收割对象时会指向head,当stack从WeakOrderQueue链表中收割对象时,会从cursor指向的WeakOrderQueue中收割,如果cursor指向的WeakOrderQueue中的对象全部收割完了,则cursor会指向下一个WeakOrderQueue。
  • prev:总是指向cursor的上一个WeakOrderQueue节点。如果cursor指向head,那么prev为null。
  • 如图所示:

     我们再来看具体的代码实现,如果cursor指向null,说明这是第一次收割对象,或者上一次收割完之后已经走到了链表的末尾了,因此将cursor指向head,从头开始遍历链表。接下来是一个do-while循环,在该循环中,会将cursor指向的WeakOrderQueue的一个Link转移到stack的elements数组中去(stack收割对象是以Link为单位),如果收割失败,则将cursor指向下一个WeakOrderQueue,直到走到链表的末尾。在这个过程中,stack还会去判断cursor指向的WeakQrderQueue的回收线程是否还存在,如果不存在了,那么显然这个WeakQrderQueue也没有存在的必要了,因此会将这个WeakQrderQueue中的所有Link都进行收割,然后将这个WeakQrderQueue移出队列,但如果该节点是头节点除外,因为将头节点移出链表需要为链表重新设置头节点,而设置头节点的setHead(head)方法是一个同步方法,应该避免经常调用。如图所示:

    private boolean scavenge() { // continue an existing scavenge, if any if (scavengeSome()) { return true; } // 收割对象失败,将指针重置 prev = null; cursor = head; return false;}private boolean scavengeSome() { WeakOrderQueue prev; WeakOrderQueue cursor = this.cursor; //cursor为null,说明第一次收割对象 if (cursor == null) { prev = null; cursor = head; //head也为null,说明没有WeakOrderQueue,返回false,收割失败 if (cursor == null) { return false; } } else { prev = this.prev; } boolean success = false; do { if (cursor.transfer(this)) { //transfer方法将一个Link中的所有对象转移到stack的elements数组中,转移成功则直接返回true success = true; break; } // WeakOrderQueue next = cursor.getNext(); if (cursor.get() == null) { //与该WeakOrderQueue绑定的回收线程已经不存在了 //如果该WeakOrderQueue中还有对象,则全部进行收割 if (cursor.hasFinalData()) { for (;;) { if (cursor.transfer(this)) { success = true; } else { break; } } } if (prev != null) { // prev!=null说明cursor不指向头节点,因此要将cursor指向的节点移出链表 // Ensure we reclaim all space before dropping the WeakOrderQueue to be GC'ed. cursor.reclaimAllSpaceAndUnlink(); prev.setNext(next); } } else { prev = cursor; } cursor = next; } while (cursor != null && !success); this.prev = prev; this.cursor = cursor; return success;}

    transfer方法用来将一个WeakOrderQueue的一个Link的对象转移到stack的elements数组中,reclaimAllSpaceAndUnlink用来将一个WeakOrderQueue节点移出链表,这两个方法的逻辑比较简单,并且也不是关键方法,这里就不详细介绍了。

    (完)

    相关推荐

    相关文章