当前位置:网站首页>Mqant in-depth analysis
Mqant in-depth analysis
2022-07-19 04:12:00 【sandyznb】
This article mainly focuses on BaseModule Of OnInit Carried out by

// OnInit When the module is initialized, call
func (m *BaseModule) OnInit(subclass module.RPCModule, app module.App, settings *conf.ModuleSettings, opt ...server.Option) {
// Initialization module
m.App = app
m.subclass = subclass
m.settings = settings
// Create a remote call RPC
opts := server.Options{
Metadata: map[string]string{},
}
for _, o := range opt {
o(&opts)
}
if opts.Registry == nil {
opt = append(opt, server.Registry(app.Registry()))
}
if opts.RegisterInterval == 0 {
opt = append(opt, server.RegisterInterval(app.Options().RegisterInterval))
}
if opts.RegisterTTL == 0 {
opt = append(opt, server.RegisterTTL(app.Options().RegisterTTL))
}
if len(opts.Name) == 0 {
opt = append(opt, server.Name(subclass.GetType()))
}
if len(opts.ID) == 0 {
opt = append(opt, server.ID(mqanttools.GenerateID().String()))
}
if len(opts.Version) == 0 {
opt = append(opt, server.Version(subclass.Version()))
}
server := server.NewServer(opt...)
err := server.OnInit(subclass, app, settings)
if err != nil {
log.Warning("server OnInit fail id(%s) error(%s)", m.GetServerID(), err)
}
hostname, _ := os.Hostname()
server.Options().Metadata["hostname"] = hostname
server.Options().Metadata["pid"] = fmt.Sprintf("%v", os.Getpid())
ctx, cancel := context.WithCancel(context.Background())
m.exit = cancel
m.serviceStopeds = make(chan bool)
m.service = service.NewService(
service.Server(server),
service.RegisterTTL(app.Options().RegisterTTL),
service.RegisterInterval(app.Options().RegisterInterval),
service.Context(ctx),
)
go func() {
err := m.service.Run()
if err != nil {
log.Warning("service run fail id(%s) error(%s)", m.GetServerID(), err)
}
close(m.serviceStopeds)
}()
m.GetServer().SetListener(m)
}All the deeper things in the framework It's all from here , Mainly a service,service Depend on server, So let's take a look at server Creation and initialization of
One :server Creation and initialization of
// NewServer returns a new server with options passed in
func NewServer(opt ...Option) Server {
return newRPCServer(opt...)
}
func newRPCServer(opts ...Option) Server {
options := newOptions(opts...)
return &rpcServer{
opts: options,
exit: make(chan chan error),
}
}
func newOptions(opt ...Option) Options {
opts := Options{
Metadata: map[string]string{},
}
for _, o := range opt {
o(&opts)
}
if opts.Registry == nil {
opts.Registry = registry.DefaultRegistry
}
if len(opts.Address) == 0 {
opts.Address = DefaultAddress
}
if len(opts.Name) == 0 {
opts.Name = DefaultName
}
if len(opts.ID) == 0 {
opts.ID = DefaultID
}
if len(opts.Version) == 0 {
opts.Version = DefaultVersion
}
return opts
}
func (s *rpcServer) OnInit(module module.Module, app module.App, settings *conf.ModuleSettings) error {
server, err := defaultrpc.NewRPCServer(app, module) // By default, a local RPC
if err != nil {
log.Warning("Dial: %s", err)
}
s.server = server
s.opts.Address = server.Addr()
if err := s.ServiceRegister(); err != nil {
return err
}
return nil
}

This server What's done in the bag is actually right rpc_server and consul Packaging ,OnInit Only then created a local RPC, And then assign it to server object
server, err := defaultrpc.NewRPCServer(app, module) // By default, a local RPCSetListener、Register、RegisterGO All calls are created above rpc It's the same as api
ServiceRegister and ServiceDeregister Mainly based on consul Requirements for registration of relevant information and elimination of registration information
OnDestroy() Mainly called Stop()---> s.server.Done() call rpcserver Of Done()
func (s *RPCServer) Done() (err error) {
// Wait for the request being executed to complete
//close(s.mq_chan) // close mq_chan passageway
//<-s.call_chan_done //mq_chan The information of the channel has been processed
s.wg.Wait()
//s.call_chan_done <- nil
// Close the queue link
if s.nats_server != nil {
err = s.nats_server.Shutdown()
}
return
}server This package There are so many things , That's right rpcserver Packaging + Service registration and discovery Provide the interface
Two :service The creation of
// NewService NewService
func NewService(opts ...Option) Service {
return newService(opts...)
}
func newService(opts ...Option) Service {
options := newOptions(opts...)
return &service{
opts: options,
}
}
func newOptions(opts ...Option) Options {
opt := Options{
Registry: registry.DefaultRegistry,
Context: context.Background(),
}
for _, o := range opts {
o(&opt)
}
return opt
}service The creation of It's all old school ,service After creation, it is called Run()
func (s *service) Run() error {
if err := s.Start(); err != nil {
return err
}
// start reg loop
ex := make(chan bool)
go s.run(ex)
//ch := make(chan os.Signal, 1)
//signal.Notify(ch, syscall.SIGTERM, syscall.SIGINT, syscall.SIGQUIT)
select {
// wait on kill signal
//case <-ch:
// wait on context cancel
case <-s.opts.Context.Done():
}
// exit reg loop
close(ex)
return s.Stop()
}Run() There is nothing in it , The first is to call service Their own Start(), Mainly call server Of Start()
func (s *service) Start() error {
for _, fn := range s.opts.BeforeStart {
if err := fn(); err != nil {
return err
}
}
if err := s.opts.Server.Start(); err != nil {
return err
}
if err := s.opts.Server.ServiceRegister(); err != nil {
return err
}
for _, fn := range s.opts.AfterStart {
if err := fn(); err != nil {
return err
}
}
return nil
}server Of Start() In fact, nothing was done inside , however server Of ServiceRegister() But it's done , As already said . Actually server It has been executed once after the creation ServiceRegister, then serverice There's another one inside , this 2 Time Just remove it once , Personally, I think it's a little redundant .BeforeStart() and AfterStart() Set a callback and rent a call , It is seldom used
service Of Start() When it's done, you'll call your own run() 了
func (s *service) run(exit chan bool) {
if s.opts.RegisterInterval <= time.Duration(0) {
return
}
t := time.NewTicker(s.opts.RegisterInterval)
for {
select {
case <-t.C:
err := s.opts.Server.ServiceRegister()
if err != nil {
log.Warning("service run Server.Register error: ", err)
}
case <-exit:
t.Stop()
return
}
}
}The main thing here is to turn on the timer , Timed call ServiceRegister()
then select Keep blocking waiting for the one created in front ctx Completion

That is to say, at this time service I'm running normally , When the customized module is OnDestroy()


m.exit() It's called cancel(), Called cancel() after service Of Run() Medium select There will be no more congestion

That is, the one in front ctx complete ,ex Then it is closed , that run() The timer in will be stopped ,s.Stop() Mainly called server Of ServiceDeregister() and Stop() It's called (Stop What I did has been said before ).Run() The life cycle of is coming ,serviceStopeds Then it is closed . Then it will be executed m.GetServer().OnDestroy() ,server Of OnDestroy() Also call server Of Stop(), Actually, I've done it once before .
边栏推荐
- micro、M3O微服务系列(三)
- 1. PostgreSQL queries the data of nearly 24 hours according to the dynamic table name
- Intel + Lenovo jointly launched open source cloud solutions
- To build agile teams, these methods are indispensable
- 通过Dao投票STI的销毁,SeekTiger真正做到由社区驱动
- windows10:vscode下go语言的适配
- Buddy: initialize memory domain
- IN Tech 2022|英特尔技术产品创新速览
- 【数据库】期末必知必会-----第一章 数据库概述
- Introduction au cadre Maui 05 compréhension du modèle de données mvvm
猜你喜欢
![[database] knowledge and skills at the end of the term ----- Chapter 9 database design](/img/8d/90ccb1eac114706d27fccc1250b58c.png)
[database] knowledge and skills at the end of the term ----- Chapter 9 database design

英特尔+联想共同推出开源云解决方案

Live broadcast of cloud intelligence face to face is waiting for you: computing power redefines productivity

Wechat official account page authorization 40029 error "suggested collection"

In tech 2022 | Intel technology product innovation quick view

小程序毕设作品之微信电子书阅读小程序毕业设计(2)小程序功能

micro、M3O微服务系列(三)

Skillfully use enterprise network disk to collect reports or summaries

FTXUI基础笔记(botton按钮组件基础)

donet framework4.X==windows窗体应用新建项目,通过System.Data.SqlClient连接sqlserver进行查询
随机推荐
Cocos creator 3.0 Basics - common operations
minimum spanning tree
Unity Shader - “快速“ 次散射 (Fast SSS : Fast Subsurface Scattering)
priority_queue的介绍及其使用
Wechat online education video on demand learning applet graduation design (3) background function
Skillfully use enterprise network disk to collect reports or summaries
让程序员早点下班的《技术写作指南》
Deconstruction of typescript array / object / string / function parameters
Timeline components
Chapter 3 performance platform godeye source code analysis - memory module
微信附近的人小程序怎么开(开通附近小程序的方法)
软件测试-进阶篇
[database] must know at the end of the term ----- Chapter 6 experiment
IN Tech 2022|英特尔技术产品创新速览
英特尔+联想共同推出开源云解决方案
【Notebook系列第七期】OpenVINO预训练模型的的下载和使用方法
Unity shader - "fast SSS: fast subsurface scattering"
Mathematical modeling learning (67): detailed introduction to xgboost classification model case tutorial
机器学习09:无监督学习
Leetcode 931: minimum sum of descent path