# -*- coding:utf-8 -*-
from sqlalchemy import create_engine
import logging
from logging.config import fileConfig
import requests
import Client  # private in-house module
fileConfig("logging_config.ini")
logger = logging.getLogger("killduplicatedjob")
# The configuration could live in its own module; a sketch follows the engine setup below.
DB_USER = "xxxxxxx"
DB_PASSWORD = "xxxxxxxx"
DB_PORT = 111111
DB_HOST = "xxxxxxxxxx"
DB_DATA_BASE = "xxxxxxxxxxx"
REST_API_URL = "http://sample.com"
engine = create_engine("mysql://%s:%s@%s:%s/%s" % (DB_USER, DB_PASSWORD, DB_HOST, DB_PORT, DB_DATA_BASE))
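# A minimal sketch of moving the configuration into its own module, as the
# comment above suggests ("config.py" is a hypothetical module name):
#     # config.py
#     DB_USER = "xxxxxxx"
#     DB_PASSWORD = "xxxxxxxx"
#     ...
#     # and in this script:
#     from config import DB_USER, DB_PASSWORD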
# This class exists so that callers passing these values between functions don't need to know
# the positional order of the attributes; it could also live in a separate module
# (a namedtuple alternative is sketched after the class).
class DuplicatedJobs(object):
def __init__(self, app_id, app_name, user):
self.app_id = app_id
self.app_name = app_name
self.user = user
def __repr__(self):
return '[appid:%s, app_name:%s, user:%s]' % (self.app_id, self.app_name, self.user)
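# A lighter-weight alternative (just a sketch, not what this script uses) is
# collections.namedtuple, which also gives a readable repr for free:
#     from collections import namedtuple
#     DuplicatedJobs = namedtuple("DuplicatedJobs", ["app_id", "app_name", "user"])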
def find_duplicated_jobs():
logger.info("starting find duplicated jobs")
(running_apps, app_name_to_user) = get_all_running_jobs()
all_apps_on_yarn = get_apps_from_yarn_with_queue(get_resource_queue())
duplicated_jobs = []
for app in all_apps_on_yarn:
(app_id, app_name) = app
if app_id not in running_apps:
if not app_name.startswith("test"):
logger.info("find a duplicated job, prefixed_name[%s] with appid[%s]" % (app_name, app_id))
user = app_name_to_user[app_name]
duplicated_jobs.append(DuplicatedJobs(app_id, app_name, user))
else:
logger.info("Job[%s] is a test job, would not kill it" % app_name)
logger.info("Find duplicated jobs [%s]" % duplicated_jobs)
return duplicated_jobs
def get_apps_from_yarn_with_queue(queue):
param = {"queue": queue}
    r = requests.get(REST_API_URL, params=param, timeout=30)  # a timeout keeps the script from hanging if Yarn is unresponsive
apps_on_yarn = []
try:
jobs = r.json().get("apps")
app_list = jobs.get("app", [])
for app in app_list:
app_id = app.get("id")
name = app.get("name")
apps_on_yarn.append((app_id, name))
    except Exception as e:  # ideally, catch specific exception types and handle each one differently; see the sketch after this function
        logger.error("Get apps from Yarn Error, message[%s]" % e)
logger.info("Fetch all apps from Yarn [%s]" % apps_on_yarn)
return apps_on_yarn
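# A sketch of the per-exception handling suggested above, under the same try
# block (r.json() raises ValueError on a non-JSON body; jobs.get raises
# AttributeError when the "apps" key is missing and jobs is None):
#     except ValueError as e:
#         logger.error("Yarn returned a non-JSON response, message[%s]" % e)
#     except AttributeError as e:
#         logger.error("Unexpected response shape from Yarn, message[%s]" % e)
#     except Exception as e:
#         logger.error("Get apps from Yarn Error, message[%s]" % e)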
def get_all_running_jobs():
job_infos = get_result_from_mysql("select * from xxxx where xx=yy")
app_ids = []
app_name_to_user = {}
for (topology_id, topology_name) in job_infos:
status_set = get_result_from_mysql("select * from xxxx where xx=yy")
application_id = status_set[0][0]
if "" != application_id:
configed_resource_queue = get_result_from_mysql("select * from xxxx where xx=yy")
app_ids.append(application_id)
            app_name_to_user[topology_name] = configed_resource_queue[0][0].split(".")[1]  # assumes a queue name like "root.<user>"
logger.info("All running jobs appids[%s] topology_name2user[%s]" % (app_ids, app_name_to_user))
return app_ids, app_name_to_user
def kill_duplicated_jobs(duplicated_jobs):
for job in duplicated_jobs:
app_id = job.app_id
app_name = job.app_name
user = job.user
logger.info("try to kill job[%s] with appid[%s] for user[%s]" % (app_name, app_id, user))
try:
Client.kill_job(app_id, user)
logger.info("Job[%s] with appid[%s] for user[%s] has been killed" % (app_name, app_id, user))
except Exception as e:
logger.error("Can't kill job[%s] with appid[%s] for user[%s]" % (app_name, app_id, user))
def get_result_from_mysql(sql):
    result = engine.execute(sql)
    return result.fetchall()
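# When the WHERE values come from variables, bind them instead of formatting
# them into the SQL string (a sketch using sqlalchemy.text; "some_value" is a
# placeholder, not a name used in this script):
#     from sqlalchemy import text
#     engine.execute(text("select * from xxxx where xx = :yy"), yy=some_value)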
# The queue below may change, and looking it up may involve real logic, so it is extracted into its own function.
def get_resource_queue():
return "xxxxxxxxxxxxx"
if __name__ == "__main__":
kill_duplicated_jobs(find_duplicated_jobs())