/usr/bin/salt-master
from salt.scripts import salt_master
salt_master()
import salt.cli.daemons
from salt.utils import parsers, ip_bracket
# 初始化默认参数
import salt.config as config DEFAULT_MASTER_OPTS = {...} # 设置默认值
import salt.syspaths as syspaths
# 注册option,并从syspaths设置默认值
master = salt.cli.daemons.Master()
# 继承
parsers.MasterOptionParser
# 继承
OptionParser # 继承
optparse.OptionParser # 调用parsers.MasterOptionParser父类的_mixin_setup来注册选项
# 默认值从salt.syspaths读取
__init__(self, *args, **kwargs) 设置version description,usage等默认值
kwargs.setdefault('usage', self.usage) optparse.OptionParser.__init__(self, *args, **kwargs) self._populate_option_list(option_list,
add_help=add_help_option) OptionParser._populate_option_list(self, option_list, add_help=True) optparse.OptionParser._populate_option_list(
self, option_list, add_help=add_help
) # 调用parsers.MasterOptionParser父类的_mixin_setup来注册选项
for mixin_setup_func in self._mixin_setup_funcs:
mixin_setup_func(self) # 注册--config-dir选项
ConfigDirMixIn._mixin_setup(self) config_dir = os.environ.get('SALT_CONFIG_DIR', None) if not config_dir:
config_dir = syspaths.CONFIG_DIR self.add_option(
'-c', '--config-dir', default=config_dir,
help=('Pass in an alternative configuration directory. Default: '
'%default')
) # 继承
ConfigDirMixIn
MergeConfigMixIn
LogLevelMixIn
RunUserMixin
DaemonMixIn
PidfileMixin
SaltfileMixIn # 基类
__metaclass__ = OptionParserMeta _mixin_setup_funcs.append(继承类._mixin_setup_funcs) _mixin_after_parsed_funcs.append(继承类._mixin_after_parsed_funcs)
master.start()
# 解析配置文件和命令名参数
self.prepare()
# 解析命令行参数,解析saltfile master配置文件,
# 设置到self.config
self.parse_args() options, args = optparse.OptionParser.parse_args(self, args, values) self.options, self.args = options, args # 根据解析出来options选项名,拼接成process_{选项名}
# 如process_saltfile,并添加到process_option_funcs数组
# 并执行相应函数
process_option_funcs = []
for option_key in options.__dict__:
process_option_func = getattr(
self, 'process_{0}'.format(option_key), None
)
if process_option_func is not None:
process_option_funcs.append(process_option_func)
for process_option_func in _sorted(process_option_funcs):
process_option_func() process_saltfile() # 如self.options.saltfile为空,尝试从环境变量获取
self.options.saltfile = os.environ.get('SALT_SALTFILE', None) # 如还是为空,查看当前目录是否有Saltfile文件
saltfile = os.path.join(os.getcwd(), 'Saltfile') # 如继续为空,就是直接返回了
if not self.options.saltfile:
return # 读取yaml配置文件到字典
saltfile_config = config._read_conf_file(saltfile) process_config_dir() # 设置master选项值到self.config
self.config.update(self.setup_config()) # 根据DEFAULT_MASTER_OPTS和master配置文件来设置选项值,
# 并返回词典
return config.master_config(self.get_config_file_path()) # 设置日志等级
process_log_level() process_log_file() import salt.master self.master = salt.master.Master(self.config) # 继承
SMaster __init__(self, opts) 对ZMQ版本低于3.2的警告 SMaster.__init__(self, opts) # 多进程,共享内存
SMaster.aes = multiprocessing.Array(ctypes.c_char, salt.crypt.Crypticle.generate_key_string()) # 设置守护进程
self.daemonize_if_required() # 把进程pid写入文件
self.set_pidfile()
# 启动多进程
self.master.start()
# 启动前检查,如检查文件服务器后端
self._pre_flight() # 注册sigusr1信号,用来输出堆栈到/tmp目录下的文件
enable_sigusr1_handler # 注册sigusr2信号,使用yappi输出profile信息
enable_sigusr2_handler # 设置最大文件打开数
self.__set_max_open_files() # 初始化进程管理器
process_manager = salt.utils.process.ProcessManager() #启动master maintenance进程
process_manager.add_process(Maintenance, args=(self.opts,)) process = Maintenance(*args, **kwargs) process.start() self.run() self._post_fork_init() # Init fileserver manager
self.fileserver = salt.fileserver.Fileserver(self.opts) # load runners
runner_client = salt.runner.RunnerClient(ropts) # load returner
self.returners = salt.loader.returners(self.opts, {}) # Init Scheduler
self.schedule = salt.utils.schedule.Schedule(self.opts,
runner_client.functions_dict(),
returners=self.returners)
self.ckminions = salt.utils.minions.CkMinions(self.opts)
# Make Event bus for firing
self.event = salt.utils.event.get_master_event(self.opts, self.opts['sock_dir'])
# Init any values needed by the git ext pillar
self.pillargitfs = salt.daemons.masterapi.init_git_pillar(self.opts)
# Set up search object
self.search = salt.search.Search(self.opts) while True:
now = int(time.time())
if (now - last) >= self.loop_interval:
salt.daemons.masterapi.clean_old_jobs(self.opts)
salt.daemons.masterapi.clean_expired_tokens(self.opts)
self.handle_search(now, last)
self.handle_pillargit()
self.handle_schedule()
self.handle_presence(old_present)
self.handle_key_rotate(now)
salt.daemons.masterapi.fileserver_update(self.fileserver)
salt.utils.verify.check_max_open_files(self.opts)
last = now
try:
time.sleep(self.loop_interval)
except KeyboardInterrupt:
break # 启动publisher进程 ZMQ
process_manager.add_process(Publisher, args=(self.opts,)) process = Publisher(*args, **kwargs) process.start() self.run() # master event进程 ZMQ
process_manager.add_process(salt.utils.event.EventPublisher, args=(self.opts,)) # reply server 进程
process_manager.add_process(run_reqserver)