-
/usr/bin/salt-minion
- from salt.scripts import salt_minion
-
salt_minion()
- while True:
- queue = multiprocessing.Queue()
- process = multiprocessing.Process(target=minion_process, args=(queue,))
-
process.start()
- os.fork()
-
minion_process(queue)
- minion = salt.cli.daemons.Minion()
- # 继承
parsers.MinionOptionParser
- # 继承
MasterOptionParser
- # 重写setup_config函数,解析minion配置文件
def setup_config(self):
return config.minion_config(self.get_config_file_path(),
cache_minion_id=True)
- MinionOptionParser与MasterOptionParser几乎一样,
都是解析命令行参数和解析YAML配置文件,唯一区别只是配置文件不同,
具体的讲解可以参考salt-master启动过程.
- minion.start()
- self.prepare()
- # 解析命令行参数及minion配置文件
self.parse_args()
- # 是否以守护进程运行
self.daemonize_if_required()
- self.minion = salt.minion.Minion(self.config)
- # 继承
MinionBase
- def __init__(self, opts, timeout=60,
safe=True, loaded_base_name=None):
- # 加载grains模块
opts['grains'] = salt.loader.grains(opts)
- # 评估master连接及认证
opts['master'] = self.eval_master(opts,
timeout,
safe)
- # 加载pillar
self.opts['pillar'] = salt.pillar.get_pillar(
opts,
opts['grains'],
opts['id'],
opts['environment']
).compile_pillar()
- self.minion.tune_in()
- # 绑定zmq pub pull事件系统
self._prepare_minion_event_system()
- self._init_context_and_poller()
- self.context = zmq.Context()
self.poller = zmq.Poller()
- self.epub_sock = self.context.socket(zmq.PUB)
- self.epull_sock = self.context.socket(zmq.PULL)
- self.epub_sock.bind(epub_uri)
- self.epull_sock.bind(epull_uri)
- self.socket = self.context.socket(zmq.SUB)
- self._set_reconnect_ivl()
- self.socket.setsockopt(zmq.RECONNECT_IVL, recon_delay)
- # 设置订阅主题
self._setsockopts()
- self._set_monitor_socket()
- self.monitor_socket = self.socket.get_monitor_socket()
- t = threading.Thread(target=self._socket_monitor, args=(self.monitor_socket,))
- t.start()
- # 连接master pub消息服务器
self.socket.connect(self.master_pub)
- self.poller.register(self.socket, zmq.POLLIN)
self.poller.register(self.epull_sock, zmq.POLLIN)
- # 连接master的ReqServer,默认端口为4506,通知minion已启动
self._fire_master_minion_start()
- self._state_run()
- while self._running is True:
- loop_interval = self.process_schedule(self, loop_interval)
- socks = self._do_poll(loop_interval)
- # 接收来自master的pub 消息,并执行_handle_payload
self._do_socket_recv(socks)
- messages = self.socket.recv_multipart(zmq.NOBLOCK)
payload = self.serial.loads(messages[1])
- self._handle_payload(payload)
- # 接收来自本地的push event,执行handle_event,
# 并publish给本地
self._do_event_poll(socks)
- package = self.epull_sock.recv(zmq.NOBLOCK)
- self.handle_event(package)
- self.epub_sock.send(package)
- # 执行beacons系统
self._process_beacons()
- # 如果出现异常
queue.put(random_delay)
- process.join()
- # 到这步就表示子进程退出了,
# 如果restart_delay为0,表示子进程为正常退出,不需要重启
restart_delay = queue.get(block=False)
- # 等待几秒钟重新启动
time.sleep(restart_delay)