当前位置:网站首页>2. Actual use of asynctool framework
2. Actual use of asynctool framework
2022-07-19 02:55:00 【The sound of the waves is still loud】
Preface
This article is a column 《AsyncTool Framework principle source code analysis 》 The second article in the series 《AsyncTool The framework is used in actual combat 》, adopt Arrange serial 、 parallel 、 Block waiting - Serial first , Post parallel 、 Block waiting - Parallel first , Post serial 、 abnormal 、 Overtime And other scenarios , Study AsyncTool frame Practical application of , And preliminarily explore its implementation principle .
special column 《AsyncTool Framework principle source code analysis 》 share 5 piece article , from the shallower to the deeper , from actual combat Use again Source code 、 principle analysis , Including but not limited to AsyncTool frame Thinking and summary , Finally, share my thoughts for AsyncTool frame Open source project Contribution code . Here are 《 Column catalog 》:
- 《AsyncTool Framework introduction and analysis implementation 》
- 《AsyncTool The framework is used in actual combat 》
- 《AsyncTool Framework principle source code analysis 》
- 《AsyncTool Some thoughts on the framework 》
- 《AsyncTool The framework is flawed ?》
1、 Serial scenario

Before the actual battle , Have a look first Worker Use of the smallest task unit .
Worker Is defined as follows , Realization IWorker,ICallback Functional interface , And rewrite the following 4 A way .4 The description of the two methods is as follows :
1、begin():Worker Before starting execution , Call back first begin()
2、action():Worker Where time-consuming operations are performed , such as RPC Interface call .
3、result():action() After execution , Callback result Method , It can be handled here action Return value in .
4、defaultValue(): Whole Worker Perform abnormal , Or a timeout , Callbacks defaultValue(),Worker return The default value is .
If not ICallback, Will be executed by default DefaultCallback The callback method .DefaultCallback Is an empty callback , There's no logic in it .
Simulate serial scenarios :A Task to parameter +1, after B Task to parameter +2, after C Task to parameter +3.( The scenario has no actual business meaning , It's simple )
(1)workerA:
public class WorkerA implements IWorker<Integer, Integer>, ICallback<Integer, Integer> {
/** * Worker Start with begin */
@Override
public void begin() {
System.out.println("A - Thread:" + Thread.currentThread().getName() + "- start --" + SystemClock.now());
}
/** * Worker The time-consuming operations in are performed here RPC/IO * @param object object * @param allWrappers Task packaging * @return */
@Override
public Integer action(Integer object, Map<String, WorkerWrapper> allWrappers) {
Integer res = object + 1;
return res;
}
/** * action Callback of execution results * @param success * @param param * @param workResult */
@Override
public void result(boolean success, Integer param, WorkResult<Integer> workResult) {
System.out.println("A - param:" + JSON.toJSONString(param));
System.out.println("A - result:" + JSON.toJSONString(workResult));
System.out.println("A - Thread:" + Thread.currentThread().getName() + "- end --" + SystemClock.now());
}
/** * Worker Callback in case of exception * @return */
@Override
public Integer defaultValue() {
System.out.println("A - defaultValue");
return 101;
}
}
(2)workerB:
public class WorkerB implements IWorker<Integer, Integer>, ICallback<Integer, Integer> {
/** * Worker Start with begin */
@Override
public void begin() {
System.out.println("B - Thread:" + Thread.currentThread().getName() + "- start --" + SystemClock.now());
}
/** * Worker The time-consuming operations in are performed here RPC/IO * @param object object * @param allWrappers Task packaging * @return */
@Override
public Integer action(Integer object, Map<String, WorkerWrapper> allWrappers) {
Integer res = object + 2;
return res;
}
/** * action Callback of execution results * @param success * @param param * @param workResult */
@Override
public void result(boolean success, Integer param, WorkResult<Integer> workResult) {
System.out.println("B - param:" + JSON.toJSONString(param));
System.out.println("B - result:" + JSON.toJSONString(workResult));
System.out.println("B - Thread:" + Thread.currentThread().getName() + "- end --" + SystemClock.now());
}
/** * Worker Callback in case of exception * @return */
@Override
public Integer defaultValue() {
System.out.println("B - defaultValue");
return 102;
}
}
(3)WorkerC:
public class WorkerC implements IWorker<Integer, Integer>, ICallback<Integer, Integer> {
/** * Worker Start with begin */
@Override
public void begin() {
System.out.println("C - Thread:" + Thread.currentThread().getName() + "- start --" + SystemClock.now());
}
/** * Worker The time-consuming operations in are performed here RPC/IO * @param object object * @param allWrappers Task packaging * @return */
@Override
public Integer action(Integer object, Map<String, WorkerWrapper> allWrappers) {
Integer res = object + 3;
return res;
}
/** * action Callback of execution results * @param success * @param param * @param workResult */
@Override
public void result(boolean success, Integer param, WorkResult<Integer> workResult) {
System.out.println("C - param:" + JSON.toJSONString(param));
System.out.println("C - result:" + JSON.toJSONString(workResult));
System.out.println("C - Thread:" + Thread.currentThread().getName() + "- end --" + SystemClock.now());
}
/** * Worker Callback in case of exception * @return */
@Override
public Integer defaultValue() {
System.out.println("C - defaultValue");
return 103;
}
}
(4) layout WorkerWrapper Packaging :
above Worker Once you've created it , Use WorkerWrapper Yes Worker Package and arrange ,WorkerWrapper yes AsyncTool The smallest executable task unit of a component .
C It's the last step , It has no next.B Of next yes C,A Of next yes B. The order of arrangement is :C <- B <- A
public class Test {
public static void main(String[] args) {
// introduce Worker The unit of work
WorkerA workerA = new WorkerA();
WorkerB workerB = new WorkerB();
WorkerC workerC = new WorkerC();
// packing Worker, Arrange serial sequence :C <- B <- A
//C It's the last step , It has no next
WorkerWrapper wrapperC = new WorkerWrapper.Builder<Integer, Integer>()
.id("workerC")
.worker(workerC)
.callback(workerC)
.param(3)//3+3
.build();
//B Of next yes C
WorkerWrapper wrapperB = new WorkerWrapper.Builder<Integer, Integer>()
.id("workerB")
.worker(workerB)
.callback(workerB)
.param(2)//2+2
.next(wrapperC)
.build();
//A Of next yes B
WorkerWrapper wrapperA = new WorkerWrapper.Builder<Integer, Integer>()
.id("workerA")
.worker(workerA)
.callback(workerA)
.param(1)//1+1
.next(wrapperB)
.build();
try {
//Action
Async.beginWork(1000, wrapperA);
} catch (ExecutionException | InterruptedException e) {
e.printStackTrace();
}
}
}
(5) Through the actuator class Async Of beginWork Method submit task execution .
- Timeout: Set the timeout of the whole group of tasks , If Worker Task timeout , be Worker Use of results defaultValue() The default value is .
- ExecutorService executorService: Custom thread pool , If you don't customize it , Just leave by default COMMON_POOL. The default thread pool is the indefinite length thread pool .
- WorkerWrapper… workerWrapper: Starting task , It can be more than one . Be careful not to submit tasks of intermediate nodes , Just submit the starting task , The subsequent tasks of the orchestration will be executed automatically .
// Default indefinite long-distance pool
private static final ThreadPoolExecutor COMMON_POOL = (ThreadPoolExecutor) Executors.newCachedThreadPool();
Async.beginWork(long timeout, ExecutorService executorService, WorkerWrapper... workerWrapper)
(6) The above is just a way of writing , If you think this kind of writing is anti human , You can also use depend Mode arrangement :
//A No, depend
WorkerWrapper wrapperA = new WorkerWrapper.Builder<Integer, Integer>()
.id("workerA")
.worker(workerA)
.callback(workerA)
.param(1)
.build();
//B Of depend yes A
WorkerWrapper wrapperB = new WorkerWrapper.Builder<Integer, Integer>()
.id("workerB")
.worker(workerB)
.callback(workerB)
.param(2)
.depend(wrapperA)
.build();
//C Of depend yes B
WorkerWrapper wrapperC = new WorkerWrapper.Builder<Integer, Integer>()
.id("workerC")
.worker(workerC)
.callback(workerC)
.param(3)
.depend(wrapperB)
.build();
//begin
Async.beginWork(1000, wrapperA);
Running results :A:1+1=2;B:2+2=4;C:3+3=6

2、 Parallel scenes

Scene simulation : Based on serial scenario ,.A Task to parameter +1,B Task to parameter +2,C Task to parameter +3. Parallel execution .
WorkerWrapper Parallel orchestration :A\B\C None next and depend, 3 individual WorkerWrapper Together begin.Async.beginWork(1000, wrapperA, wrapperB, wrapperC);
public class Test {
public static void main(String[] args) {
// introduce Worker The unit of work
WorkerA workerA = new WorkerA();
WorkerB workerB = new WorkerB();
WorkerC workerC = new WorkerC();
/** * packing Worker, Arrange parallel sequence */
//A
WorkerWrapper wrapperA = new WorkerWrapper.Builder<Integer, Integer>()
.id("workerA")
.worker(workerA)
.callback(workerA)
.param(1)//1+1
.build();
//B
WorkerWrapper wrapperB = new WorkerWrapper.Builder<Integer, Integer>()
.id("workerB")
.worker(workerB)
.callback(workerB)
.param(2)//2+2
.build();
//C
WorkerWrapper wrapperC = new WorkerWrapper.Builder<Integer, Integer>()
.id("workerC")
.worker(workerC)
.callback(workerC)
.param(3)//3+3
.build();
try {
//3 individual WorkerWrapper Together begin
Async.beginWork(1000, wrapperA, wrapperB, wrapperC);
} catch (ExecutionException | InterruptedException e) {
e.printStackTrace();
}
}
}
Execution results :ABC Use different threads to execute in parallel .A:1+1=2;B:2+2=4;C:3+3=6

3、 Block waiting - Serial first , Post parallel

Block waiting - Serial first , Post parallel scene simulation :A Execute first , For parameters +1;A After execution ,B\C Execute in parallel at the same time ,B The task is based on A The return value of +2,C The task is based on A The return value of +3
(1)next How to write it :
public static void nextWork() {
// introduce Worker The unit of work
WorkerA workerA = new WorkerA();
WorkerB workerB = new WorkerB();
WorkerC workerC = new WorkerC();
//C It's the last step , It has no next
WorkerWrapper wrapperC = new WorkerWrapper.Builder<Integer, Integer>()
.id("workerC")
.worker(workerC)
.callback(workerC)
.param(null)// No parameters , according to A The return value of +3
.build();
//B It's the last step , It has no next
WorkerWrapper wrapperB = new WorkerWrapper.Builder<Integer, Integer>()
.id("workerB")
.worker(workerB)
.callback(workerB)
.param(null)// No parameters , according to A The return value of +2
.build();
//A Of next yes B、C
WorkerWrapper wrapperA = new WorkerWrapper.Builder<Integer, Integer>()
.id("workerA")
.worker(workerA)
.callback(workerA)
.param(1)//1+1
//next yes B、C
.next(wrapperB, wrapperC)
.build();
try {
//Action
Async.beginWork(1000, wrapperA);
} catch (ExecutionException | InterruptedException e) {
e.printStackTrace();
}
}
(2)depend How to write it :
//A No, depend, It's the beginning
WorkerWrapper wrapperA = new WorkerWrapper.Builder<Integer, Integer>()
.id("workerA")
.worker(workerA)
.callback(workerA)
.param(1)
.build();
//C depend A
WorkerWrapper wrapperC = new WorkerWrapper.Builder<Integer, Integer>()
.id("workerC")
.worker(workerC)
.callback(workerC)
.param(null)
.depend(wrapperA)
.build();
//B depend A
WorkerWrapper wrapperB = new WorkerWrapper.Builder<Integer, Integer>()
.id("workerB")
.worker(workerB)
.callback(workerB)
.param(null)
.depend(wrapperA)
.build();
Execution results :A:1+1 = 2;B:2+2 =4;C:3+2 = 5

4、 Block waiting - Parallel first , Post serial

Scene simulation : Block waiting - Parallel first , Post serial .
B\C Parallel execution .B For parameters +2,C For parameters +3,B\C After all execution ,A = B Return value +C Return value .
Be careful : need B and C meanwhile begin.Async.beginWork(4000, wrapperB, wrapperC);
(1)next How to write it :
public static void nextWork() {
// introduce Worker The unit of work
WorkerA workerA = new WorkerA();
WorkerB workerB = new WorkerB();
WorkerC workerC = new WorkerC();
//A It's the last step , No, next
WorkerWrapper wrapperA = new WorkerWrapper.Builder<Integer, Integer>()
.id("workerA")
.worker(workerA)
.callback(workerA)
.param(null)// Parameter is null,A = B + C
.build();
//C next A
WorkerWrapper wrapperC = new WorkerWrapper.Builder<Integer, Integer>()
.id("workerC")
.worker(workerC)
.callback(workerC)
.param(3)//3+3 = 6
.next(wrapperA)
.build();
//B next A
WorkerWrapper wrapperB = new WorkerWrapper.Builder<Integer, Integer>()
.id("workerB")
.worker(workerB)
.callback(workerB)
.param(2)//2+2 = 4
.next(wrapperA)
.build();
try {
new SynchronousQueue<Runnable>();
//Action
Async.beginWork(4000, wrapperB, wrapperC);
} catch (ExecutionException | InterruptedException e) {
e.printStackTrace();
}
}
(2)depend How to write it :
//C No, depend, Is the starting node
WorkerWrapper wrapperC = new WorkerWrapper.Builder<Integer, Integer>()
.id("workerC")
.worker(workerC)
.callback(workerC)
.param(3)//3+3 = 6
.build();
//B No, depend, Is the starting node
WorkerWrapper wrapperB = new WorkerWrapper.Builder<Integer, Integer>()
.id("workerB")
.worker(workerB)
.callback(workerB)
.param(2)//2+2 = 4
.build();
//A depend B,C
WorkerWrapper wrapperA = new WorkerWrapper.Builder<Integer, Integer>()
.id("workerA")
.worker(workerA)
.callback(workerA)
.param(null)// Parameter is null,A = B + C
.depend(wrapperB, wrapperC)
.build();
Execution results :B:2+2=4;C:3+3 = 6;A = B+C = 10

5、 abnormal 、 Timeout callback scenario
this 2 Kinds of scenes , You can fine tune based on the above scenario , that will do debug debugging .
// Timeout time , Thread pool , initial Wrapper, Multiple
Async.beginWork(long timeout, ExecutorService executorService, WorkerWrapper... workerWrapper)
- Based on the... Set by the whole group timeout, If I run out of time , be worker The return value in uses defaultValue()
- If at present Worker The task is abnormal , Then the current task uses defaultValue(), also depend The current task is , also FastFail, return defaultValue()
If you like this article , Please don't be stingy with your praise , It's not easy to create , thank !

边栏推荐
猜你喜欢

OSPF综合实验

使用Virtual IP+Keepalived配置高可用
![mysqldump: [Warning] Using a password on the command line interface can be insecure.](/img/91/8b0d35f85bc0f46daac4e1e9bc9e34.png)
mysqldump: [Warning] Using a password on the command line interface can be insecure.

Changes of service account in kubernetes1.24

从MySQL架构看一条SQL语句是如何执行的?

What happens when you get stuck compiling and installing MySQL database in Linux system?

DNS domain name resolution

MySQL master-slave replication + read write separation

HCIA_ Rip experiment

Binary installation kubernetes 1.23.2
随机推荐
Yum warehouse service and PXE automatic deployment system
Configure VLAN and use OSPF protocol for layer 3 switches
Understand network namespaces
expect免交互
ENSP静态路由实验
三层交换机配置VLAN和使用OSPF协议
Dynamic programming - 01 knapsack problem
Upgrade kubernetes 1.23.2 to 1.24.1
Oracle获取最后一条,第一条数据(按时间获取第一条和最后一条数据)
Oracle查询时间段内所有日期
mysqldump: [Warning] Using a password on the command line interface can be insecure.
Learning network foundation
GFS分布式文件系统
All dates in Oracle query time period
MySQL存储引擎详解
【MySQL】MHA高可用
Firewall firewall
Traversal of binary tree
MySQL日志管理和完全备份增量备份与恢复
Nat comprehensive experiment