JUC source code study notes 2: AQS shared mode, Semaphore, and CountDownLatch

2022-07-19 15:45:00 Cuzzz

This article covers the shared mode of AQS. Shared mode and exclusive mode follow very similar patterns, so if you are not familiar with exclusive mode, read my "JUC source code study notes 1" first.
The main references are "The Art of Java Concurrency Programming", "Java Concurrency in Practice", and the posts of the following two bloggers:
https://segmentfault.com/a/1190000016447307  This is the best post on AQS shared mode that I have read.
https://www.cnblogs.com/micrari/p/6937995.html  This post gives a good explanation of what PROPAGATE is for.

I. Semaphore and the AQS shared mode

1. What Semaphore does

Suppose the company restroom has five stalls. The five stalls can be seen as five shared resources, so five employees (threads) may use the restroom at the same time. Whenever an employee enters a stall, the number of available stalls (shared resources) goes down by one; when the employee comes out, the resource is released. When every stall is occupied, later arrivals have to wait (a thread blocks while trying to acquire the shared resource). This wait can be interrupted (a tester files a bug against the developer who is waiting, so the developer gives up queuing and goes back to the desk). The wait can also time out (after waiting too long, the employee loses patience and stops waiting).

A Semaphore controls how many threads may access a specific shared resource at the same time.

Intuitively, Semaphore is built on the shared mode of AQS.

2. Common Semaphore methods

| Method | Description |
| --- | --- |
| public Semaphore(int permits) | Constructor that sets the number of permits (how many stalls the restroom has) |
| public Semaphore(int permits, boolean fair) | Creates a semaphore with the given number of permits and the given fairness setting (the second parameter decides whether acquisition is fair: in restroom terms, whether people are allowed to jump the queue) |
| public void acquire() throws InterruptedException | Acquires a permit, responding to interruption. If a permit is available, the permit count decreases by 1 and the method returns. Otherwise the current thread blocks until one of the following happens: 1. another thread releases a permit and the current thread obtains it (someone comes out and you get the stall); 2. another thread interrupts the current thread (the tester's bug report pulls you out of the queue) |
| public void acquireUninterruptibly() | Like acquire(), but does not respond to interruption: an interrupt while waiting does not make the thread give up (when nature calls, nobody gets to pull you out of the line) |
| public boolean tryAcquire() | Tries to acquire a permit. Returns true and decreases the permit count on success, otherwise returns false immediately (you take a look at the restroom: if a stall is free you go in at once, otherwise you go straight back to your desk) |
| public boolean tryAcquire(long timeout, TimeUnit unit) throws InterruptedException | Like tryAcquire(), but responds to interruption and supports a timeout. Returns true if a permit is obtained within the given time, false if the timeout elapses first, and throws InterruptedException if the thread is interrupted while waiting |
| public void release() | Releases a permit, increasing the permit count by 1 (free the stall when you are done) |
| public int availablePermits() | Returns the number of permits currently available in this semaphore |
| public int drainPermits() | Acquires and returns all permits that are immediately available |
| protected void reducePermits(int reduction) | Reduces the number of available permits by the indicated amount |

acquire, acquireUninterruptibly, tryAcquire, and release also have overloads that acquire or release a specified number of permits.
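As a quick illustration of the methods listed above, here is a minimal sketch of the permit life cycle. The class name StallDemo and its run method are made up for this example; only the Semaphore calls are JDK API.

```java
import java.util.concurrent.Semaphore;

// Hypothetical demo class; only the Semaphore calls come from the JDK.
class StallDemo {
    // Returns {permits at start, permits when full, permits after one release}.
    static int[] run() {
        Semaphore stalls = new Semaphore(2);      // a restroom with two stalls
        int atStart = stalls.availablePermits();

        boolean first = stalls.tryAcquire();      // takes a permit without blocking
        boolean second = stalls.tryAcquire();     // takes the last permit
        boolean third = stalls.tryAcquire();      // nothing left: returns false at once
        if (!first || !second || third) {
            throw new IllegalStateException("unexpected tryAcquire result");
        }
        int whenFull = stalls.availablePermits();

        stalls.release();                         // one employee leaves
        int afterRelease = stalls.availablePermits();
        return new int[] {atStart, whenFull, afterRelease};
    }
}
```

tryAcquire() is used here instead of acquire() only so that the example never blocks; with acquire(), the third call would park the thread until someone released a permit.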

3. How Semaphore is implemented

Semaphore is built on the shared mode of AQS, and its methods are delegated to the inner class Sync.

3.1 acquire(): interruptible permit acquisition (the no-arg version acquires one permit; the overload with a parameter acquires n permits)

Semaphore's acquire method directly calls sync.acquireSharedInterruptibly(1), which is implemented in AbstractQueuedSynchronizer, the parent class of Sync.

This is the same pattern as ReentrantLock: many concurrency utilities define an inner class like this and leave the real work to it.

3.1.1 tryAcquireShared: try to acquire the shared resource
 Whereas the exclusive-mode `tryAcquire(int arg)` returns a boolean, the shared-mode `tryAcquireShared(int acquires)` returns an int:

-  A value less than 0 means the current thread failed to acquire the shared lock
-  A value greater than 0 means the current thread acquired the shared lock, and a following attempt by another thread to acquire it may also succeed
-  A value equal to 0 means the current thread acquired the shared lock, but a following attempt by another thread will fail
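The three return-value cases can be observed directly with a tiny synchronizer built on AQS. This is only a sketch for illustration: TinyPermits, probe, and permits are hypothetical names, and the class is not how Semaphore itself is written, although tryAcquireShared has the same shape as the non-fair version shown next.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Hypothetical minimal counting synchronizer, for illustration only.
class TinyPermits extends AbstractQueuedSynchronizer {
    TinyPermits(int permits) {
        setState(permits);            // AQS state holds the number of permits
    }

    @Override
    protected int tryAcquireShared(int acquires) {
        for (;;) {
            int available = getState();
            int remaining = available - acquires;
            // negative: fail; zero or positive: succeed once the CAS wins
            if (remaining < 0 || compareAndSetState(available, remaining)) {
                return remaining;
            }
        }
    }

    @Override
    protected boolean tryReleaseShared(int releases) {
        for (;;) {
            int current = getState();
            if (compareAndSetState(current, current + releases)) {
                return true;
            }
        }
    }

    // Exposes the raw result so the three cases can be seen from outside.
    int probe(int n) {
        return tryAcquireShared(n);
    }

    int permits() {
        return getState();
    }
}
```

With two permits, three consecutive probe(1) calls return 1 (success, more may follow), 0 (success, nothing left), and -1 (failure).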
3.1.1.1 Non-fair attempt to acquire the shared resource

The non-fair version directly calls nonfairTryAcquireShared.

final int nonfairTryAcquireShared(int acquires) {
    // spin
    for (;;) {
        // permits currently available
        int available = getState();
        // remaining = available - requested
        int remaining = available - acquires; // 1

        // Return the remaining count if it is negative (not enough permits),
        // or if the CAS that publishes the new count succeeds.
        // This condition is worth studying closely.
        if (remaining < 0 ||
            compareAndSetState(available, remaining)) // 2
            return remaining;
    }
}
  • The spin ends in one of two ways

    1. The remaining count is negative: the permits currently available cannot satisfy the request

    2. The permits currently available can satisfy the request, and the CAS that updates the permit count succeeds

     Thread A may find at line 1 that there are enough permits, but when many threads are competing, its CAS at line 2 can fail. In that case it simply goes around the loop again and re-reads the state.
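The retry-on-CAS-failure behaviour can be sketched outside AQS with a plain AtomicInteger. CasRetrySketch, tryTake, and contend are hypothetical names; the loop body mirrors nonfairTryAcquireShared but is not the JDK code.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of the spin + CAS pattern on a plain AtomicInteger.
class CasRetrySketch {
    // Same shape as nonfairTryAcquireShared: a negative result means failure.
    static int tryTake(AtomicInteger state, int acquires) {
        for (;;) {
            int available = state.get();
            int remaining = available - acquires;
            if (remaining < 0 || state.compareAndSet(available, remaining)) {
                return remaining;
            }
            // The CAS lost a race with another thread: loop and re-read the state.
        }
    }

    // Starts the given number of threads, each trying to take one permit,
    // and returns how many of them succeeded.
    static int contend(int permits, int threads) {
        AtomicInteger state = new AtomicInteger(permits);
        AtomicInteger winners = new AtomicInteger();
        Thread[] ts = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            ts[i] = new Thread(() -> {
                if (tryTake(state, 1) >= 0) {
                    winners.incrementAndGet();
                }
            });
            ts[i].start();
        }
        for (Thread t : ts) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return winners.get();
    }
}
```

However the threads interleave, exactly as many threads succeed as there were permits, because a thread only counts as a winner after its own CAS has moved the state down by one.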
    
3.1.1.2 Fair attempt to acquire the shared resource
protected int tryAcquireShared(int acquires) {
    for (;;) {
        
        // if other threads are queued ahead, return -1 (acquisition fails) to stay fair
        if (hasQueuedPredecessors())
            return -1;
        
        // the rest is the same as the non-fair version
        int available = getState();
        int remaining = available - acquires;
        if (remaining < 0 ||
            compareAndSetState(available, remaining))
            return remaining;
    }
}
3.1.2 doAcquireSharedInterruptibly: queue up to acquire the shared resource

Apart from the name, this follows the same pattern as doAcquireInterruptibly in exclusive mode.

private void doAcquireSharedInterruptibly(int arg)
    throws InterruptedException {

    // build a node in shared mode and append it to the tail of the sync queue
    final Node node = addWaiter(Node.SHARED);
    // whether the acquisition failed
    boolean failed = true;
    try {
        // spin
        for (;;) {
            // predecessor node
            final Node p = node.predecessor();
            // the predecessor is the head node
            if (p == head) {
                // try to acquire the shared resource
                int r = tryAcquireShared(arg);
                // acquired successfully
                if (r >= 0) {
                    // become the head node and propagate
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
            }
            // park the current thread; if it was interrupted, throw InterruptedException right away
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } finally {
        // the current node gave up; here that means it was interrupted
        // (the timed acquire method also ends up in the equivalent branch)
        if (failed)
            cancelAcquire(node);
    }
}

The differences from exclusive mode are:

  • addWaiter(Node.SHARED) marks the current node as being in shared mode

    Node.SHARED is stored in the node's nextWaiter field. Here nextWaiter only serves to mark the mode of the node (exclusive or shared)

     In a Condition wait queue, nextWaiter links the waiting threads together. That will be covered separately in a later post.
    
  • Exclusive mode calls setHead, whereas here setHeadAndPropagate(node of the current thread, return value of tryAcquireShared) is called (for a semaphore, the return value can be read as the number of remaining permits)

3.1.3 setHeadAndPropagate: become the head node and propagate
private void setHeadAndPropagate(Node node, int propagate) {
    Node h = head; // Record old head for check below
    // Become the head. In AQS the head belongs to a thread that has acquired the lock,
    // so this also means the node leaves the sync queue.
    setHead(node);
    
    // wake up successors if propagation is called for
    if (propagate > 0 || h == null || h.waitStatus < 0 ||
        (h = head) == null || h.waitStatus < 0) {
        Node s = node.next;
        if (s == null || s.isShared())
            doReleaseShared();
    }
}

doReleaseShared wakes up the node after head when the shared lock is available. Its details are explained later. For now, let's look at the conditions under which setHeadAndPropagate calls it.

  • The remaining shared resources are greater than 0: propagate > 0

    In shared mode the lock can be held by several threads. The current thread has just acquired it and there are still resources left, so it can notify the next node to try right away instead of making it wait for a release. (I arrive at the restroom, see that stalls are still free, and message my buddy: come quick, there is room.)

  • The successor of the current node is null or is in shared mode: if (s == null || s.isShared())

  • The waitStatus of the old head is less than 0, or the waitStatus of the current head is less than 0

    A thread that acquires the shared resource sets itself as the head node, so in shared mode every head node stands for a thread that holds, or has held, the shared resource. A negative waitStatus (SIGNAL or PROPAGATE) on either head is a hint that a successor may need to be woken up.
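The effect of propagation can be observed from outside with a small experiment. PropagationDemo and run are hypothetical names. The sketch parks several waiters on an empty Semaphore, then hands out all permits in a single release(n) call and counts how many waiters get through.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo: one release(n) call lets every queued waiter through.
class PropagationDemo {
    static int run(int waiters) {
        Semaphore sem = new Semaphore(0);
        AtomicInteger passed = new AtomicInteger();
        Thread[] ts = new Thread[waiters];
        for (int i = 0; i < waiters; i++) {
            ts[i] = new Thread(() -> {
                sem.acquireUninterruptibly();   // queues up inside AQS
                passed.incrementAndGet();
            });
            ts[i].start();
        }
        // Wait until every waiter has joined the sync queue.
        while (sem.getQueueLength() < waiters) {
            Thread.yield();
        }
        sem.release(waiters);                   // a single release call
        for (Thread t : ts) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return passed.get();
    }
}
```

All waiters get through even though release was called only once. The test cannot tell which thread woke which, but it is consistent with the mechanism described above: each waiter that acquires with permits left over goes on to wake its successor.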

3.1.4 doReleaseShared: wake up the waiting successor
 When the shared lock is available, wake up the node after head.

Besides being called from setHeadAndPropagate, this method is also called when shared resources are released (releaseShared). Picture this scene: thread A is releasing the lock at the same moment that thread B acquires it. A calls releaseShared and B calls setHeadAndPropagate, so both call doReleaseShared concurrently to wake the node after head. doReleaseShared therefore has to be thread-safe.

3.1.4.1 A first pass through the source
// This method is worth studying closely
private void doReleaseShared() {
    // loop
    for (;;) {
        // Read head. Right after this line h may already be the old head,
        // because another thread may acquire the shared lock and set itself as head.
        Node h = head;
        // h is not null and is not the tail: the queue must hold at least two nodes
        if (h != null && h != tail) {
            // status of h
            int ws = h.waitStatus;
            // SIGNAL was set by the successor after it enqueued (in shouldParkAfterFailedAcquire)
            // and means that the successor needs to be woken up
            if (ws == Node.SIGNAL) {
                // CAS the status of h back to 0
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;
                // wake up the successor thread
                unparkSuccessor(h);
            }
            // the status of h is 0 and the CAS to PROPAGATE failed
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;
        }

        // h is still the head: the head did not change while the block above ran,
        // i.e. no other thread acquired the shared resource in the meantime
        if (h == head)
            break;
    }
}
3.1.4.2 doReleaseShared: the loop exit condition and why it is designed this way
if (h == head)                  
     break;

Here h is the head node that was read at the top of the current iteration, and head is the current head of the queue. If the two are equal, the loop exits.

  • When are they not equal? Thread A releases a shared resource and calls doReleaseShared, and before A reaches the exit check, thread B acquires the shared resource and sets itself as the new head node
  • What happens when they are not equal? Thread A goes around the for loop again. If thread B also releases its resource at this point, the method is being executed by several threads at once
  • Why design it this way? The same job could be done by letting each node wake only its own successor, but letting several threads wake waiters at the same time makes waking the threads that wait for shared resources more efficient. Even a thread that has just acquired the shared lock calls doReleaseShared to wake its successor
 An example: a restroom has five stalls, currently occupied by A, B, C, D, and E, and two unlucky people, F and G, are waiting in line. Each of A through E sets itself as the head node when it gets a stall, and each calls doReleaseShared at that point (say D was the fourth to arrive and saw that one stall was still free, so D immediately told the people behind that there was room). Another case: all five stalls are taken and E finds that its own status is SIGNAL (while queuing, F asked E to give a shout when done, and then went off to play on the phone, i.e. parked).

 When several people finish at the same moment and leave the restroom, A reaches if (h == head), sees that the head node has changed, and goes around again to tell the people in line that a stall may be free. B does the same. So at any moment several people who have just finished may be waking up the people behind them. The people in line who are busy with their phones get woken up sooner, and the restroom is used more efficiently.
3.1.4.3 doReleaseShared: the wake-up logic
private void doReleaseShared() {
    for (;;) {
        Node h = head;
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            
            // Case 1
            if (ws == Node.SIGNAL) {
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;           
                unparkSuccessor(h);
            }
            
            // Case 2
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;              
        }
        if (h == head)                   
            break;
    }
}
  • Case 1

    ws == Node.SIGNAL means the successor changed the status after it enqueued, to say that it needs to be woken up

    The CAS here guarantees that when several threads run this method at once, only one of them actually wakes the successor (in shared mode several threads can be waking concurrently)

     Two people walk out of the restroom at the same time; only one of them needs to tell the person waiting.
    
  • Case 2

    ws == 0 could mean that the head has no successor, so its status is still the initial one. But the if (h != null && h != tail) above guarantees that the queue holds at least two nodes

    ws == 0 could also be the result of case 1 resetting the status to 0, but a thread that has just done that does not enter this branch in the same iteration (it is an else-if)

    What remains is the case where the node that used to be the tail has just become the head. Its status is still 0, and a new node has been enqueued behind it (so h != tail) but has not yet changed the head's status to SIGNAL. The continue is reached only when compareAndSetWaitStatus(h, 0, Node.PROPAGATE) returns false. That happens when, at the very moment the thread running doReleaseShared attempts this CAS, the successor gets in first and changes the head's status to SIGNAL. The loop then goes around again, this time lands in case 1, and wakes the successor.

    10:40:01 A gets a stall and the restroom is now full. A sets itself as the head node, sees that the status of the old head was SIGNAL, and gets ready to wake the people queuing behind
    10:40:02 B finds no free stall and becomes the first in the queue. B is about to change A's status to SIGNAL (so that A remembers to wake B). A is already running the wake-up logic. At this point the queue holds two nodes, A as head and B as tail. A checks case 1, sees that its status is not SIGNAL, moves on to case 2, and tries to change its status to PROPAGATE, but fails (B has just changed A's status to SIGNAL). A goes around the for loop again (meanwhile somebody else may have left the restroom and woken B, so that B becomes the new head node).
     A then reads the current head of the queue (the node that acquired the lock most recently) and keeps looping. Once the head stops changing, A exits the loop.
    
    

3.2 acquireUninterruptibly: non-interruptible permit acquisition

It directly calls the acquireShared(1) method of AQS.

public final void acquireShared(int arg) {
    if (tryAcquireShared(arg) < 0)
        doAcquireShared(arg);
}

The overall logic is the same as acquireSharedInterruptibly. The fact that it does not respond to interrupts shows up in doAcquireShared.

private void doAcquireShared(int arg) {
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    // if interrupted while waiting, re-assert the interrupt instead of throwing
                    if (interrupted)
                        selfInterrupt();
                    failed = false;
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                // parkAndCheckInterrupt() returned true: the thread came back from
                // LockSupport.park because it was interrupted, so record that and keep looping
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

The pattern is the same as in exclusive mode. After returning from LockSupport.park, the thread checks its interrupt flag. If it was woken by an interrupt, it remembers that and restores the interrupt flag once it has acquired the shared resource.
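This behaviour is easy to check from outside. In the sketch below (UninterruptibleDemo and run are hypothetical names), a thread queues up in acquireUninterruptibly, gets interrupted while waiting, and only afterwards receives a permit.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical demo: an interrupt does not abort acquireUninterruptibly,
// but the interrupt flag is still set once the permit has been obtained.
class UninterruptibleDemo {
    static boolean run() {
        Semaphore sem = new Semaphore(0);
        AtomicBoolean flagAfterAcquire = new AtomicBoolean();
        Thread waiter = new Thread(() -> {
            sem.acquireUninterruptibly();       // never throws InterruptedException
            flagAfterAcquire.set(Thread.currentThread().isInterrupted());
        });
        waiter.start();
        // Make sure the waiter is queued before interrupting it.
        while (!sem.hasQueuedThreads()) {
            Thread.yield();
        }
        waiter.interrupt();
        sem.release();                          // now let the waiter through
        try {
            waiter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return flagAfterAcquire.get();
    }
}
```

The waiter still returns from acquireUninterruptibly normally, and the flag it reads afterwards is true, so the caller can react to the interrupt later if it wants to.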

3.3 tryAcquire(int permits, long timeout, TimeUnit unit): timed permit acquisition

It directly calls the tryAcquireSharedNanos method of AQS.

public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();

    // If the shared resource is obtained straight away, return true
    // and short-circuit the call to doAcquireSharedNanos
    return tryAcquireShared(arg) >= 0 ||

        // otherwise acquire with a timeout
        doAcquireSharedNanos(arg, nanosTimeout);
}
3.3.1 doAcquireSharedNanos: timed acquisition of the shared resource
private boolean doAcquireSharedNanos(int arg, long nanosTimeout)
        throws InterruptedException {
    if (nanosTimeout <= 0L)
        return false;
    final long deadline = System.nanoTime() + nanosTimeout;
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; 
                    failed = false;
                    return true;
                }
            }
            nanosTimeout = deadline - System.nanoTime();
            if (nanosTimeout <= 0L)
                return false;
            if (shouldParkAfterFailedAcquire(p, node) &&
                nanosTimeout > spinForTimeoutThreshold)
                LockSupport.parkNanos(this, nanosTimeout);
            if (Thread.interrupted())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

This is the same pattern as the timed exclusive acquire. The thread parks only when the remaining time is greater than spinForTimeoutThreshold, otherwise it spins. Interrupts are handled in a similar way.
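From the caller's side, the two outcomes of the timed acquire look like this. TimedAcquireDemo and run are hypothetical names and the 50 ms timeout is an arbitrary value chosen for the example.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

// Hypothetical demo of the timed tryAcquire.
class TimedAcquireDemo {
    // Returns {result with a permit available, result with no permit available}.
    static boolean[] run() {
        Semaphore sem = new Semaphore(1);
        boolean[] results = new boolean[2];
        try {
            // A permit is available: tryAcquireShared succeeds and no queuing happens.
            results[0] = sem.tryAcquire(50, TimeUnit.MILLISECONDS);
            // No permit left and nobody releases: gives up once the timeout elapses.
            results[1] = sem.tryAcquire(50, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return results;
    }
}
```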

3.4 release(int permits): release permits

It directly calls the releaseShared method of AQS.

public final boolean releaseShared(int arg) {
    // tryReleaseShared is overridden in Semaphore's static inner class Sync
    if (tryReleaseShared(arg)) {
        // wake up the waiting successor, as described above
        doReleaseShared();
        return true;
    }
    return false;
}

// Semaphore.Sync
protected final boolean tryReleaseShared(int releases) {
    // spin
    for (;;) {
        int current = getState();
        int next = current + releases;
        if (next < current) // overflow
            throw new Error("Maximum permit count exceeded");

        // CAS the new permit count
        if (compareAndSetState(current, next))
            return true;
    }
}

Unlike exclusive mode, a shared lock can be released by several threads at the same time, so spin + CAS is used to keep the permit count correct under concurrency.

 The logic for reducing permits is similar: it is also a loop + CAS.
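One consequence of tryReleaseShared simply adding to the state is that release does not check who acquired what. A short sketch (ReleaseDemo and run are hypothetical names) shows a release without a prior acquire pushing the count above its initial value, and drainPermits taking everything back.

```java
import java.util.concurrent.Semaphore;

// Hypothetical demo: release() just adds to the permit count.
class ReleaseDemo {
    // Returns {permits after release, permits drained, permits after draining}.
    static int[] run() {
        Semaphore sem = new Semaphore(1);
        sem.release();                              // no prior acquire is required
        int afterRelease = sem.availablePermits();  // the initial 1 plus 1
        int drained = sem.drainPermits();           // takes all available permits
        int afterDrain = sem.availablePermits();
        return new int[] {afterRelease, drained, afterDrain};
    }
}
```

This is worth keeping in mind when pairing acquire and release by hand: an extra release silently raises the limit.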

II. CountDownLatch (latch)

1. What CountDownLatch does

A latch works like a door. Until the latch reaches its terminal state the door stays closed and no thread can pass. Once the terminal state is reached, the latch lets all threads through. After that it never changes state again and stays open.

2. CountDownLatch usage scenarios and common methods

2.1 Usage scenarios

  • Make sure a computation proceeds only after all the resources it needs are ready
  • Make sure a service starts only after all the services it depends on have started
  • Wait until all participants in an operation are ready before continuing (in a MOBA game, all players must accept the match before hero selection begins)

2.2 Common methods

| Method | Effect |
| --- | --- |
| void await() throws InterruptedException | Makes the current thread wait until the latch has counted down to zero. The method returns in one of two ways: 1. the count reaches 0; 2. the waiting thread is interrupted, in which case InterruptedException is thrown |
| boolean await(long timeout, TimeUnit unit) throws InterruptedException | Like await(), but also returns when the timeout elapses (returning false in that case) |
| void countDown() | Decrements the count by 1. When the count reaches 0, all waiting threads are allowed through |
| long getCount() | Returns the current count. This method is typically used for debugging and testing |
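The "start only after all dependencies have started" scenario can be sketched as follows. StartupGateDemo and run are hypothetical names, the worker threads stand in for services, and the 5 second timeout is an arbitrary safety margin.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo: wait until every "service" thread has reported in.
class StartupGateDemo {
    static boolean run(int services) {
        CountDownLatch ready = new CountDownLatch(services);
        AtomicInteger started = new AtomicInteger();
        for (int i = 0; i < services; i++) {
            new Thread(() -> {
                started.incrementAndGet();   // pretend the service has started
                ready.countDown();           // report in
            }).start();
        }
        try {
            // Blocks until the count reaches 0 or the timeout elapses.
            boolean opened = ready.await(5, TimeUnit.SECONDS);
            return opened && started.get() == services && ready.getCount() == 0;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

The timed await is used instead of the plain await() so that a bug in the example cannot hang the caller forever.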

3. How CountDownLatch is implemented

Suppose threads A and B must wait until C, D, and E have finished before they continue. In effect C, D, and E hold a lock that blocks A and B, and only after C, D, and E have all released it can A and B go on running.

3.1 await: interruptible wait for the count to reach 0 & await(long timeout, TimeUnit unit): timed, interruptible wait for the count to reach 0

3.1.1 await: interruptible wait for the count to reach 0

It directly calls acquireSharedInterruptibly(1) on the static inner class instance, which in turn calls that instance's tryAcquireShared(1) method.

protected int tryAcquireShared(int acquires) {
    return (getState() == 0) ? 1 : -1;
}

The method reads the AQS state (the CountDownLatch constructor sets this value, and it is the number of countDown calls that still have to happen). As described above, a tryAcquireShared result greater than or equal to 0 means the shared resource was acquired, and a negative result means failure, after which the thread joins the wait queue. Here the method never returns 0. If the state is 0, "the door is open" and the thread calling this method may proceed, otherwise it has to queue up and wait.

The reason it does not return 0 is that CountDownLatch supports several threads, say A, B, and C, waiting together. Returning 0 would mean the current thread succeeded but later threads would fail. Returning 1 makes sure that a thread that sees the door open also goes on to wake the other waiting threads.
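The same idea can be tried out on a stripped-down latch. OneShotGate is a hypothetical class written for this example, loosely in the spirit of CountDownLatch with a count of 1. It is a sketch, not the JDK implementation.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Hypothetical one-shot gate built on AQS shared mode.
class OneShotGate {
    private static final class Sync extends AbstractQueuedSynchronizer {
        Sync() {
            setState(1);                          // 1 = closed, 0 = open
        }

        @Override
        protected int tryAcquireShared(int ignored) {
            return getState() == 0 ? 1 : -1;      // never 0, so waiters keep propagating
        }

        @Override
        protected boolean tryReleaseShared(int ignored) {
            return compareAndSetState(1, 0);      // only the call that opens the gate returns true
        }
    }

    private final Sync sync = new Sync();

    void open() {
        sync.releaseShared(1);
    }

    void awaitOpen() {
        sync.acquireShared(1);                    // uninterruptible variant, for brevity
    }

    // Parks the given number of waiters, opens the gate once, counts who got through.
    static int demo(int waiters) {
        OneShotGate gate = new OneShotGate();
        AtomicInteger passed = new AtomicInteger();
        Thread[] ts = new Thread[waiters];
        for (int i = 0; i < waiters; i++) {
            ts[i] = new Thread(() -> {
                gate.awaitOpen();
                passed.incrementAndGet();
            });
            ts[i].start();
        }
        while (gate.sync.getQueueLength() < waiters) {
            Thread.yield();
        }
        gate.open();
        for (Thread t : ts) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return passed.get();
    }
}
```

A single open() call is enough for every queued waiter to pass, which is the behaviour the "return 1, not 0" choice is meant to guarantee.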

3.1.2 await(long timeout, TimeUnit unit): timed, interruptible wait for the count to reach 0

It directly calls tryAcquireSharedNanos in AQS, which also calls the overridden tryAcquireShared. The subsequent call to doAcquireSharedNanos works the same way as for the semaphore above.

3.2 countDown: decrement the count by 1

It directly calls the releaseShared method of AQS, which calls the overridden tryReleaseShared method.

protected boolean tryReleaseShared(int releases) {
    // Decrement count; signal when transition to zero
    for (;;) {
        int c = getState();
        
        // the count was already 0 before decrementing: return false
        if (c == 0)
            return false;
        int nextc = c-1;
        if (compareAndSetState(c, nextc))
            // return true only when this decrement brought the count to 0
            return nextc == 0;
    }
}

As before, CAS + spin keeps state correct. If the count was already 0 before the decrement, the method returns false, and it returns true only when this decrement brings the count to 0. This avoids pointless repeated wake-ups.

 Suppose the count is 4 but five threads call countDown. The fifth thread finds that the other four got there first and returns false straight away. Of the first four, only the last one returns true (the count reached 0), and only then does releaseShared go on to call doReleaseShared and wake the waiters.
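The "one countDown too many" case is easy to reproduce. ExtraCountDownDemo and run are hypothetical names, and for simplicity the five calls are made from a single thread.

```java
import java.util.concurrent.CountDownLatch;

// Hypothetical demo: extra countDown calls are ignored once the count is 0.
class ExtraCountDownDemo {
    static long run() {
        CountDownLatch latch = new CountDownLatch(4);
        for (int i = 0; i < 5; i++) {
            latch.countDown();       // the fifth call finds the count already at 0
        }
        return latch.getCount();     // stays at 0, never goes negative
    }
}
```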

III. A few diagrams of shared resource handling

1. doReleaseShared called by several threads at the same time

When several threads release shared resources and there are nodes waiting in the queue, several threads may be calling doReleaseShared concurrently. If the status of the head node is SIGNAL, the successor needs to be woken up, and the CAS guarantees that only one thread goes on to call unparkSuccessor. The successor may have called tryAcquireShared earlier, got a negative result, and be getting ready to park. The unpark may arrive just before it parks or right after it parks. Either way it goes on to try for the shared resource again. The threads whose CAS failed keep looping and waking, so when several resources are released at once, more than one waiter gets woken up. The method exits only when the head node did not change during the wake-up. An unchanged head means that competition for the shared resource is not that fierce (the head node is the node that acquired the shared resource most recently).

2. A very short-lived state in doReleaseShared

The state in question is the moment when the waitStatus of the head is 0 and two threads race to change it: thread A, a queued waiter, tries to CAS it to SIGNAL in shouldParkAfterFailedAcquire, while thread B, a releasing thread, tries to CAS it to PROPAGATE in doReleaseShared. One of the two CAS operations must fail.

If thread B fails, thread A has succeeded in setting SIGNAL. In that call shouldParkAfterFailedAcquire still returns false, so A spins once more. Its tryAcquireShared may succeed this time, and then there is no need to park. If instead another thread barges in and takes the shared resource first, A calls shouldParkAfterFailedAcquire again, gets true, and gets ready to park. Thread B, which has meanwhile looped again, may then wake thread A.

If thread A's CAS fails, A also spins once more. If thread B's CAS succeeds, B may spin again as well, because thread A may have acquired the shared resource and made itself the head node in the meantime.

All of this exists to improve throughput and avoid wasting shared resources: a thread that has work to do should not stay parked just because nobody woke it in time.
