0
点赞
收藏
分享

微信扫一扫

python 远程调度进程服务与客户端


python 远程调度进程服务与客户端


核心思想: 在本地或远程机器上创建一个进程,提供调度服务。使用了 APScheduler。

安装:APScheduler

$ wget https://pypi.python.org/packages/89/52/22f11f5ca425f16409797c5c651de380824a252acd1ced4ef1ee54507bbf/APScheduler-3.1.0.tar.gz#md5=fae7e2a06a5f4b608599bf8b237bb40a
$ python2.7 setup.py build
$ python2.7 setup.py install

测试:


$ python2.7 -c "from apscheduler.schedulers.blocking import BlockingScheduler"


服务端:

#!/usr/bin/python2.7
#-*- coding: UTF-8 -*-
#
# apsserver.py
#
# Refer:
# http://www.cnblogs.com/Xjng/p/4902514.html
#
# Queues are thread and process safe.
# Refer:
# https://docs.python.org/2/library/multiprocessing.html
#
########################################################################
# The MIT License (MIT)
# http://opensource.org/licenses/MIT
#
# Copyright (c) 2015 copyright cheungmine
#
# Permission is hereby granted, free of charge, to any person obtaining
# a copy of this software and associated documentation files (the
# "Software"), to deal in the Software without restriction, including
# without limitation the rights to use, copy, modify, merge, publish,
# distribute, sublicense, and/or sell copies of the Software, and to
# permit persons to whom the Software is furnished to do so, subject
# to the following conditions:
#
# The above copyright notice and this permission notice shall be
# included in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
# IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
# CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
# TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
# SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
########################################################################

import os, time, datetime

from multiprocessing.managers import BaseManager
from multiprocessing import RLock

# aps
from apscheduler.schedulers.background import BackgroundScheduler
import logging
logging.basicConfig()

# global variables
#
lock = RLock()

scheduler = BackgroundScheduler()
scheduler.start()


class APSchedulerManager(BaseManager): pass

def tick():
print('Tick! The time is: %s' % datetime.datetime.now())


# thread safe class
#
class APSchedulerConnectProxy(object):
global scheduler

def AddJob(self, config):
print("AddJob: %s" % config)
return scheduler.add_job(tick, 'interval', seconds=3)


def Start(self):
scheduler.start()


def Shutdown(self, wait):
scheduler.shutdown()


###########################################################
#
class APSchedulerManagerServer:
APSchedulerManager.register('get_lock', callable=lambda: lock)
APSchedulerManager.register('APSchedulerConnect', APSchedulerConnectProxy)

def __init__(self, host, port, passkey):
self.manager = APSchedulerManager(address=(host, port), authkey=passkey)
self.server = self.manager.get_server()
pass


def start(self):
self.server.serve_forever()
pass


def stop(self):
self.server.shutdown()
self.is_stop = 1
pass

###########################################################
# start_apserver('', 12345, 'abc')
#
def start_apserver(host, port, passkey):
from apsserver import APSchedulerManagerServer
server = APSchedulerManagerServer(host, port, passkey)
server.start()


###########################################################
# main
if __name__ == "__main__":
from apsserver import APSchedulerManagerServer

print('Press Ctrl+{0} to exit'.format('Break' if os.name == 'nt' else 'C'))

try:
start_apserver('', 12345, 'abc')
except (KeyboardInterrupt, SystemExit):
print('\nUser has pressed Ctrl+Break')
finally:
pass


客户端:

#!/usr/bin/python2.7
#-*- coding: UTF-8 -*-
#
# apsclient.py
#
# Refer:
# http://www.cnblogs.com/Xjng/p/4902514.html
#
# Queues are thread and process safe.
# Refer:
# https://docs.python.org/2/library/multiprocessing.html
#
########################################################################
# The MIT License (MIT)
# http://opensource.org/licenses/MIT
#
# Copyright (c) 2015 copyright cheungmine
#
# Permission is hereby granted, free of charge, to any person obtaining
# a copy of this software and associated documentation files (the
# "Software"), to deal in the Software without restriction, including
# without limitation the rights to use, copy, modify, merge, publish,
# distribute, sublicense, and/or sell copies of the Software, and to
# permit persons to whom the Software is furnished to do so, subject
# to the following conditions:
#
# The above copyright notice and this permission notice shall be
# included in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
# IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
# CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
# TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
# SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
########################################################################

from multiprocessing.managers import BaseManager
from multiprocessing import RLock

# aps
from apscheduler.schedulers.background import BackgroundScheduler
import logging
logging.basicConfig()


class APSchedulerConnectProxy(object): pass

class APSchedulerManager(BaseManager): pass


class APSchedulerManagerClient:
APSchedulerManager.register('get_lock', callable=lambda: lock)
APSchedulerManager.register('APSchedulerConnect', callable=lambda: APSchedulerConnectProxy)

def __init__(self, servhost, servport, authkey):
self.config = {
"host": servhost,
"port": int(servport),
"authkey": authkey
}

self._lock = None
self.manager = APSchedulerManager(address=(self.config['host'], self.config['port']), authkey=self.config['authkey'])
self.manager.connect()
pass


def __del__(self):
self.unlock()
pass


def lock(self):
if not self._lock:
self._lock = self.manager.get_lock()
self._lock.acquire()
pass


def unlock(self):
if self._lock:
l = self._lock
self._lock = None
l.release()
pass


def getAPSchedulerConnect(self):
self.apschedulerConn = self.manager.APSchedulerConnect()
return self.apschedulerConn


###########################################################
# connect_apserver('', 12345, 'abc')
#
def connect_apserver(host, port, passkey):
from apsclient import APSchedulerManagerClient

client = APSchedulerManagerClient(host, port, passkey)

print "connect to apsserver success"

apschedulerConn = client.getAPSchedulerConnect()
apschedulerConn.AddJob("hello apsserver")


###########################################################
# main
if __name__ == "__main__":
import os

print('Press Ctrl+{0} to exit'.format('Break' if os.name == 'nt' else 'C'))

try:
connect_apserver('', 12345, 'abc')
except (KeyboardInterrupt, SystemExit):
print('\nUser has pressed Ctrl+Break')
finally:
pass


启动服务端与客户端。服务端输出:

 ./apsserver.py
Press Ctrl+C to exit
AddJob: hello apsserver
Tick! The time is: 2016-04-26 17:38:02.899242
Tick! The time is: 2016-04-26 17:38:05.899450
Tick! The time is: 2016-04-26 17:38:08.899632
Tick! The time is: 2016-04-26 17:38:11.900290


举报

相关推荐

0 条评论