publicclassThreadimplementsRunnable{ /* ThreadLocal values pertaining to this thread. This map is maintained * by the ThreadLocal class. */ ThreadLocal.ThreadLocalMap threadLocals = null; }
staticclassThreadLocalMap{
/** * The entries in this hash map extend WeakReference, using * its main ref field as the key (which is always a * ThreadLocal object). Note that null keys (i.e. entry.get() * == null) mean that the key is no longer referenced, so the * entry can be expunged from table. Such entries are referred to * as "stale entries" in the code that follows. */ staticclassEntryextendsWeakReference<ThreadLocal<?>> { /** The value associated with this ThreadLocal. */ Object value;
// ThreadLocal.java /** * Returns the value in the current thread's copy of this * thread-local variable. If the variable has no value for the * current thread, it is first initialized to the value returned * by an invocation of the {@link #initialValue} method. * * @return the current thread's value of this thread-local */ public T get(){ Thread t = Thread.currentThread(); ThreadLocalMap map = getMap(t); if (map != null) { ThreadLocalMap.Entry e = map.getEntry(this); if (e != null) { T result = (T)e.value; return result; } } return setInitialValue(); }
/**
 * Get the map associated with a ThreadLocal.
 *
 * @param t the current thread
 * @return the map
 */
ThreadLocalMap getMap(Thread t) {
    return t.threadLocals;
}

/**
 * Computes this thread-local's initial value via {@link #initialValue},
 * stores it for the current thread, and returns it. If the current thread
 * has no map yet, one is created holding the value.
 *
 * <p>About {@code initialValue} (from the original comment on this excerpt):
 * it is invoked the first time a thread accesses the variable with the
 * {@link #get} method, unless the thread previously invoked the {@link #set}
 * method. Normally it is invoked at most once per thread, but it may be
 * invoked again in case of subsequent invocations of {@link #remove}
 * followed by {@link #get}. The default implementation simply returns
 * {@code null}; to get a different initial value, {@code ThreadLocal} must
 * be subclassed and {@code initialValue} overridden.
 *
 * @return the initial value for this thread-local
 */
private T setInitialValue() {
    T value = initialValue();
    Thread t = Thread.currentThread();
    ThreadLocalMap map = getMap(t);
    if (map != null) {
        map.set(this, value);
    } else {
        createMap(t, value);
    }
    return value;
}
/** * Sets the current thread's copy of this thread-local variable * to the specified value. Most subclasses will have no need to * override this method, relying solely on the {@link #initialValue} * method to set the values of thread-locals. * * @param value the value to be stored in the current thread's copy of * this thread-local. */ publicvoidset(T value){ Thread t = Thread.currentThread(); ThreadLocalMap map = getMap(t); if (map != null) map.set(this, value); else createMap(t, value); }
/** * Set the value associated with key. * * @param key the thread local object * @param value the value to be set */ privatevoidset(ThreadLocal<?> key, Object value){
// We don't use a fast path as with get() because it is at // least as common to use set() to create new entries as // it is to replace existing ones, in which case, a fast // path would fail more often than not.
Entry[] tab = table; int len = tab.length; // 计算 hash 值对应的槽位 int i = key.threadLocalHashCode & (len-1); for (Entry e = tab[i]; e != null; e = tab[i = nextIndex(i, len)]) { ThreadLocal<?> k = e.get(); // 如果该槽位存储的 ThreadLocal 对象就是自己,就返回 if (k == key) { e.value = value; return; } // 如果遍历找到了一个空的槽位,就占用它 if (k == null) { replaceStaleEntry(key, value, i); return; } }
tab[i] = new Entry(key, value); int sz = ++size; // 清除 key 为 null的槽位,如果size过大就扩容,扩容阈值是哈希表长度的 2 / 3 if (!cleanSomeSlots(i, sz) && sz >= threshold) rehash(); }
private Entry getEntry(ThreadLocal<?> key){ int i = key.threadLocalHashCode & (table.length - 1); Entry e = table[i]; // 直接 hash 槽位找到了目标对象,直接返回 if (e != null && e.get() == key) return e; else // 否则,遍历查找 return getEntryAfterMiss(key, i, e); }
private Entry getEntryAfterMiss(ThreadLocal<?> key, int i, Entry e){ Entry[] tab = table; int len = tab.length;
while (e != null) { ThreadLocal<?> k = e.get(); if (k == key) return e; if (k == null) // 这里如果发现了空的槽位,要进行重新 hash,来提升效率 expungeStaleEntry(i); else i = nextIndex(i, len); e = tab[i]; } returnnull; }
privateintexpungeStaleEntry(int staleSlot){ Entry[] tab = table; int len = tab.length;
publicclassFastThreadLocalThreadextendsThread{ // This will be set to true if we have a chance to wrap the Runnable. privatefinalboolean cleanupFastThreadLocals; // 保存了所有的 FastThreadLocal 值 private InternalThreadLocalMap threadLocalMap; // ... }
// FastThreadLocal.java /** * Set the value for the current thread. */ publicfinalvoidset(V value){ if (value != InternalThreadLocalMap.UNSET) { // 获取 Map InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.get(); setKnownNotUnset(threadLocalMap, value); } else { remove(); } }
/** * @return see {@link InternalThreadLocalMap#setIndexedVariable(int, Object)}. */ privatevoidsetKnownNotUnset(InternalThreadLocalMap threadLocalMap, V value){ if (threadLocalMap.setIndexedVariable(index, value)) { // 如果设置成功,则将该 FastThreadLocal 保存起来,方便后续清理 addToVariablesToRemove(threadLocalMap, this); } }
// InternalThreadLocalMap.java /** * @return {@code true} if and only if a new thread-local variable has been created */ publicbooleansetIndexedVariable(int index, Object value){ Object[] lookup = indexedVariables; // 如果长度足够,直接通过 index 进行修改 if (index < lookup.length) { Object oldValue = lookup[index]; lookup[index] = value; return oldValue == UNSET; } else { // 否则进行扩容,扩容后的大小是比 index 大的最小的2的幂 expandIndexedVariableTableAndSet(index, value); returntrue; } }
// FastThreadLocal.java @SuppressWarnings("unchecked") privatestaticvoidaddToVariablesToRemove(InternalThreadLocalMap threadLocalMap, FastThreadLocal<?> variable){ Object v = threadLocalMap.indexedVariable(variablesToRemoveIndex); Set<FastThreadLocal<?>> variablesToRemove; // 保存 FastThreadLocal 对象的集合存在数组的 index 0 位置,因为 variablesToRemoveIndex 恒等于 0 if (v == InternalThreadLocalMap.UNSET || v == null) { variablesToRemove = Collections.newSetFromMap(new IdentityHashMap<FastThreadLocal<?>, Boolean>()); threadLocalMap.setIndexedVariable(variablesToRemoveIndex, variablesToRemove); } else { variablesToRemove = (Set<FastThreadLocal<?>>) v; }
variablesToRemove.add(variable); }
/** * Removes all {@link FastThreadLocal} variables bound to the current thread. This operation is useful when you * are in a container environment, and you don't want to leave the thread local variables in the threads you do not * manage. */ publicstaticvoidremoveAll(){ // 在进行清除时,如果InternalThreadLocalMap为空,则说明没有使用 FastThreadLocal InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.getIfSet(); if (threadLocalMap == null) { return; }
/** * Sets the value to uninitialized for the specified thread local map; * a proceeding call to get() will trigger a call to initialValue(). * The specified thread local map must be for the current thread. */ @SuppressWarnings("unchecked") publicfinalvoidremove(InternalThreadLocalMap threadLocalMap){ if (threadLocalMap == null) { return; } // 将对应槽位的值设为 null,并返回之前的值 Object v = threadLocalMap.removeIndexedVariable(index); // 将该 FastThreadLocal 从待删除结合(index0 保存的集合)中删除 removeFromVariablesToRemove(threadLocalMap, this); // 如果之前的值不是空,就调用 onRemoval 回调函数 if (v != InternalThreadLocalMap.UNSET) { try { onRemoval((V) v); } catch (Exception e) { PlatformDependent.throwException(e); } } }
// InternalThreadLocalMap.java // Cache line padding (must be public) // With CompressedOops enabled, an instance of this class should occupy at least 128 bytes. publiclong rp1, rp2, rp3, rp4, rp5, rp6, rp7, rp8, rp9;
CPU 的缓存行通常是 64 或 128 字节。为了防止 InternalThreadLocalMap 的不同实例被加载到同一个缓存行(从而产生伪共享),需要额外填充一些字段,使每个实例的大小超过缓存行的大小。