monitor_server.py 3.3 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788
  1. #!/usr/bin/env python
  2. # -*- coding:utf-8 -*-
  3. # Author:qiaozhanwei
'''
Install pip with yum:
    yum -y install python-pip
Install kazoo with pip:
    pip install kazoo
or install kazoo with conda:
    conda install -c conda-forge kazoo
Run the script:
    nohup python -u monitor_server.py > nohup.out 2>&1 &
'''
  12. import socket
  13. import os
  14. import sched
  15. import time
  16. from datetime import datetime
  17. from kazoo.client import KazooClient
  18. schedule = sched.scheduler(time.time, time.sleep)
  19. class ZkClient:
  20. def __init__(self):
  21. # hosts配置zk地址集群
  22. self.zk = KazooClient(hosts='ark0:2181,ark1:2181,ark2:2181')
  23. self.zk.start()
  24. # 读取配置文件,组装成字典
  25. def read_file(self,path):
  26. with open(path, 'r') as f:
  27. dict = {}
  28. for line in f.readlines():
  29. arr = line.strip().split('=')
  30. if (len(arr) == 2):
  31. dict[arr[0]] = arr[1]
  32. return dict
  33. # 根据hostname获取ip地址
  34. def get_ip_by_hostname(self,hostname):
  35. return socket.gethostbyname(hostname)
  36. # 重启服务
  37. def restart_server(self,inc):
  38. config_dict = self.read_file('/data1_1T/escheduler/conf/config/run_config.conf')
  39. master_list = config_dict.get('masters').split(',')
  40. master_list = list(map(lambda item : self.get_ip_by_hostname(item),master_list))
  41. worker_list = config_dict.get('workers').split(',')
  42. worker_list = list(map(lambda item: self.get_ip_by_hostname(item), worker_list))
  43. if (self.zk.exists('/escheduler/masters')):
  44. zk_master_list = []
  45. zk_master_nodes = self.zk.get_children('/escheduler/masters')
  46. for zk_master_node in zk_master_nodes:
  47. zk_master_list.append(zk_master_node.split('_')[0])
  48. restart_master_list = list(set(master_list) - set(zk_master_list))
  49. if (len(restart_master_list) != 0):
  50. for master in restart_master_list:
  51. print("master " + self.get_ip_by_hostname(master) + " 服务已经掉了")
  52. os.system('ssh ' + self.get_ip_by_hostname(master) + ' sh /data1_1T/escheduler/bin/escheduler-daemon.sh start master-server')
  53. if (self.zk.exists('/escheduler/workers')):
  54. zk_worker_list = []
  55. zk_worker_nodes = self.zk.get_children('/escheduler/workers')
  56. for zk_worker_node in zk_worker_nodes:
  57. zk_worker_list.append(zk_worker_node.split('_')[0])
  58. restart_worker_list = list(set(worker_list) - set(zk_worker_list))
  59. if (len(restart_worker_list) != 0):
  60. for worker in restart_worker_list:
  61. print("worker " + self.get_ip_by_hostname(worker) + " 服务已经掉了")
  62. os.system('ssh ' + self.get_ip_by_hostname(worker) + ' sh /data1_1T/escheduler/bin/escheduler-daemon.sh start worker-server')
  63. print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
  64. schedule.enter(inc, 0, self.restart_server, (inc,))
  65. # 默认参数60s
  66. def main(self,inc=60):
  67. # enter四个参数分别为:间隔事件、优先级(用于同时间到达的两个事件同时执行时定序)、被调用触发的函数,
  68. # 给该触发函数的参数(tuple形式)
  69. schedule.enter(0, 0, self.restart_server, (inc,))
  70. schedule.run()
  71. if __name__ == '__main__':
  72. zkClient = ZkClient()
  73. zkClient.main(300)