獲取一個類的所有子類
def itersubclasses(cls, _seen=None):
??? """Generator over all subclasses of a given class in depth first order."""
??? if not isinstance(cls, type):
??????? raise TypeError(_('itersubclasses must be called with '
????????????????????????? 'new-style classes, not %.100r') % cls)
??? _seen = _seen or set()
??? try:
??????? subs = cls.__subclasses__()
??? except TypeError:?? # fails only when cls is type
??????? subs = cls.__subclasses__(cls)
??? for sub in subs:
??????? if sub not in _seen:
??????????? _seen.add(sub)
??????????? yield sub
??????????? for sub in itersubclasses(sub, _seen):
??????????????? yield sub
簡單的線程配合
import threading
is_done = threading.Event()
consumer = threading.Thread(
??? target=self.consume_results,
??? args=(key, self.task, runner.result_queue, is_done))
consumer.start()
self.duration = runner.run(
??????? name, kw.get("context", {}), kw.get("args", {}))
is_done.set()
consumer.join() #主線程堵塞,直到consumer運行結束
多說一點,threading.Event()也可以被替換為threading.Condition(),condition有notify(), wait(), notifyAll()。解釋如下:
The wait() method releases the lock, and then blocks until it is awakened by a notify() or notifyAll() call for the same condition variable in another thread. Once awakened, it re-acquires the lock and returns. It is also possible to specify a timeout.
The notify() method wakes up one of the threads waiting for the condition variable, if any are waiting. The notifyAll() method wakes up all threads waiting for the condition variable.
Note: the notify() and notifyAll() methods don't release the lock; this means that the thread or threads awakened will not return from their wait() call immediately, but only when the thread that called notify() or notifyAll() finally relinquishes ownership of the lock.
# Consume one item
cv.acquire()
while not an_item_is_available():
??? cv.wait()
get_an_available_item()
cv.release()
# Produce one item
cv.acquire()
make_an_item_available()
cv.notify()
cv.release()
計算運行時間
class Timer(object):
??? def __enter__(self):
??????? self.error = None
??????? self.start = time.time()
??????? return self
??? def __exit__(self, type, value, tb):
??????? self.finish = time.time()
??????? if type:
??????????? self.error = (type, value, tb)
??? def duration(self):
??????? return self.finish - self.start
with Timer() as timer:
??? func()
return timer.duration()
元類
__new__()方法接收到的參數依次是:
當前準備創建的類的對象;
類的名字;
類繼承的父類集合;
類的方法集合;
class ModelMetaclass(type):
??? def __new__(cls, name, bases, attrs):
??????? if name=='Model':
??????????? return type.__new__(cls, name, bases, attrs)
??????? mappings = dict()
??????? for k, v in attrs.iteritems():
??????????? if isinstance(v, Field):
??????????????? print('Found mapping: %s==>%s' % (k, v))
??????????????? mappings[k] = v
??????? for k in mappings.iterkeys():
??????????? attrs.pop(k)
??????? attrs['__table__'] = name # 假設表名和類名一致
??????? attrs['__mappings__'] = mappings # 保存屬性和列的映射關系
??????? return type.__new__(cls, name, bases, attrs)
class Model(dict):
??? __metaclass__ = ModelMetaclass
??? def __init__(self, **kw):
??????? super(Model, self).__init__(**kw)
??? def __getattr__(self, key):
??????? try:
??????????? return self[key]
??????? except KeyError:
??????????? raise AttributeError(r"'Model' object has no attribute '%s'" % key)
??? def __setattr__(self, key, value):
??????? self[key] = value
??? def save(self):
??????? fields = []
??????? params = []
??????? args = []
??????? for k, v in self.__mappings__.iteritems():
??????????? fields.append(v.name)
??????????? params.append('?')
??????????? args.append(getattr(self, k, None))
??????? sql = 'insert into %s (%s) values (%s)' % (self.__table__, ','.join(fields), ','.join(params))
??????? print('SQL: %s' % sql)
??????? print('ARGS: %s' % str(args))
class Field(object):
??? def __init__(self, name, column_type):
??????? self.name = name
??????? self.column_type = column_type
??? def __str__(self):
??????? return '<%s:%s>' % (self.__class__.__name__, self.name)
class StringField(Field):
??? def __init__(self, name):
??????? super(StringField, self).__init__(name, 'varchar(100)')
class IntegerField(Field):
??? def __init__(self, name):
??????? super(IntegerField, self).__init__(name, 'bigint')
class User(Model):
??? # 定義類的屬性到列的映射:
??? id = IntegerField('id')
??? name = StringField('username')
??? email = StringField('email')
??? password = StringField('password')
# 創建一個實例:
u = User(id=12345, name='Michael', email='test@orm.org', password='my-pwd')
# 保存到數據庫:
u.save()
輸出如下:
Found model: User
Found mapping: email ==>
Found mapping: password ==>
Found mapping: id ==>
Found mapping: name ==>
SQL: insert into User (password,email,username,uid) values (?,?,?,?)
ARGS: ['my-pwd', 'test@orm.org', 'Michael', 12345]
SQLAlchemy簡單使用
# 導入:
from sqlalchemy import Column, String, create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.ext.declarative import declarative_base
# 創建對象的基類:
Base = declarative_base()
# 定義User對象:
class User(Base):
??? # 表的名字:
??? __tablename__ = 'user'
??? # 表的結構:
??? id = Column(String(20), primary_key=True)
??? name = Column(String(20))
# 初始化數據庫連接:
engine = create_engine('mysql+mysqlconnector://root:password@localhost:3306/test') # '數據庫類型+數據庫驅動名稱://用戶名:口令@機器地址:端口號/數據庫名'
# 創建DBSession類型:
DBSession = sessionmaker(bind=engine)
# 創建新User對象:
new_user = User(id='5', name='Bob')
# 添加到session:
session.add(new_user)
# 提交即保存到數據庫:
session.commit()
# 創建Query查詢,filter是where條件,最后調用one()返回唯一行,如果調用all()則返回所有行:
user = session.query(User).filter(User.id=='5').one()
# 關閉session:
session.close()
WSGI簡單使用和Web框架Flask的簡單使用
from wsgiref.simple_server import make_server
def application(environ, start_response):
??? start_response('200 OK', [('Content-Type', 'text/html')])
??? return '
Hello, web!
'# 創建一個服務器,IP地址為空,端口是8000,處理函數是application:
httpd = make_server('', 8000, application)
print "Serving HTTP on port 8000..."
# 開始監聽HTTP請求:
httpd.serve_forever()
了解了WSGI框架,我們發現:其實一個Web App,就是寫一個WSGI的處理函數,針對每個HTTP請求進行響應。
但是如何處理HTTP請求不是問題,問題是如何處理100個不同的URL。
一個最簡單和最土的想法是從environ變量里取出HTTP請求的信息,然后逐個判斷。
from flask import Flask
from flask import request
app = Flask(__name__)
@app.route('/', methods=['GET', 'POST'])
def home():
??? return '
Home
'@app.route('/signin', methods=['GET'])
def signin_form():
??? return ''' '''
@app.route('/signin', methods=['POST'])
def signin():
??? # 需要從request對象讀取表單內容:
??? if request.form['username']=='admin' and request.form['password']=='password':
??????? return '
Hello, admin!
'??? return '
Bad username or password.
'if __name__ == '__main__':
??? app.run()
格式化顯示json
print(json.dumps(data, indent=4))
# 或者
import pprint
pprint.pprint(data)
實現類似Java或C中的枚舉
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import itertools
import sys
class ImmutableMixin(object):
??? _inited = False
??? def __init__(self):
??????? self._inited = True
??? def __setattr__(self, key, value):
??????? if self._inited:
??????????? raise Exception("unsupported action")
??????? super(ImmutableMixin, self).__setattr__(key, value)
class EnumMixin(object):
??? def __iter__(self):
??????? for k, v in itertools.imap(lambda x: (x, getattr(self, x)), dir(self)):
??????????? if not k.startswith('_'):
??????????????? yield v
class _RunnerType(ImmutableMixin, EnumMixin):
??? SERIAL = "serial"
??? CONSTANT = "constant"
??? CONSTANT_FOR_DURATION = "constant_for_duration"
??? RPS = "rps"
if __name__=="__main__":
??? print _RunnerType.CONSTANT
創建文件時指定權限
import os
def write_to_file(path, contents, umask=None):
??? """Write the given contents to a file
??? :param path: Destination file
??? :param contents: Desired contents of the file
??? :param umask: Umask to set when creating this file (will be reset)
??? """
??? if umask:
??????? saved_umask = os.umask(umask)
??? try:
??????? with open(path, 'w') as f:
??????????? f.write(contents)
??? finally:
??????? if umask:
??????????? os.umask(saved_umask)
if __name__ == '__main__':
??? write_to_file('/home/kong/tmp', 'test', 31)
??? # Then you will see a file is created with permission 640.
??? # Warning: If the file already exists, its permission will not be changed.
??? # Note:For file, default all permission is 666, and 777 for directory.
多進程并發執行
import multiprocessing
import time
import os
def run(flag):
??? print "flag: %s, sleep 2s in run" % flag
??? time.sleep(2)
??? print "%s exist" % flag
??? return flag
if __name__ == '__main__':
??? pool = multiprocessing.Pool(3)
??? iter_result = pool.imap(run, xrange(6))
??? print "sleep 5s\n\n"
??? time.sleep(5)
??? for i in range(6):
??????? try:
??????????? result = iter_result.next(600)
??????? except multiprocessing.TimeoutError as e:
??????????? raise
??????? print result
??? pool.close()
??? pool.join()
運行時自動填充函數參數
import decorator
def default_from_global(arg_name, env_name):
??? def default_from_global(f, *args, **kwargs):
??????? id_arg_index = f.func_code.co_varnames.index(arg_name)
??????? args = list(args)
??????? if args[id_arg_index] is None:
??????????? args[id_arg_index] = get_global(env_name)
??????????? if not args[id_arg_index]:
??????????????? print("Missing argument: --%(arg_name)s" % {"arg_name": arg_name})
??????????????? return(1)
??????? return f(*args, **kwargs)
??? return decorator.decorator(default_from_global)
# 如下是一個裝飾器,可以用在需要自動填充參數的函數上。功能是:
# 如果沒有傳遞函數的deploy_id參數,那么就從環境變量中獲取(調用自定義的get_global函數)
with_default_deploy_id = default_from_global('deploy_id', ENV_DEPLOYMENT)???
嵌套裝飾器
validator函數裝飾func1,func1使用時接收參數(*arg, **kwargs),而func1又裝飾func2(其實就是Rally中的scenario函數),給func2增加validators屬性,是一個函數的列表,函數的接收參數config, clients, task。這些函數最終調用func1,傳入參數(config, clients, task, *args, **kwargs),所以func1定義時參數是(config, clients, task, *arg, **kwargs)?
最終實現的效果是,func2有很多裝飾器,每個都會接收自己的參數,做一些校驗工作。
def validator(fn):
??? """Decorator that constructs a scenario validator from given function.
??? Decorated function should return ValidationResult on error.
??? :param fn: function that performs validation
??? :returns: rally scenario validator
??? """
??? def wrap_given(*args, **kwargs):
??????? """Dynamic validation decorator for scenario.
??????? :param args: the arguments of the decorator of the benchmark scenario
??????? ex. @my_decorator("arg1"), then args = ('arg1',)
??????? :param kwargs: the keyword arguments of the decorator of the scenario
??????? ex. @my_decorator(kwarg1="kwarg1"), then kwargs = {"kwarg1": "kwarg1"}
??????? """
??????? def wrap_validator(config, clients, task):
??????????? return (fn(config, clients, task, *args, **kwargs) or
??????????????????? ValidationResult())
??????? def wrap_scenario(scenario):
??????????? wrap_validator.permission = getattr(fn, "permission",
??????????????????????????????????????????????? consts.EndpointPermission.USER)
??????????? if not hasattr(scenario, "validators"):
??????????????? scenario.validators = []
??????????? scenario.validators.append(wrap_validator)
??????????? return scenario
??????? return wrap_scenario
??? return wrap_given
inspect庫的一些常見用法
inspect.getargspec(func) 獲取函數參數的名稱和默認值,返回一個四元組(args, varargs, keywords, defaults),其中:
args是參數名稱的列表;
varargs和keywords是*號和**號的變量名稱;
defaults是參數默認值的列表;
inspect.getcallargs(func[, *args][, **kwds]) 綁定函數參數。返回綁定后函數的入參字典。
python中的私有屬性和函數
Python把以兩個或以上下劃線字符開頭且沒有以兩個或以上下劃線結尾的變量當作私有變量。私有變量會在代碼生成之前被轉換為長格式(變為公有),這個過程叫"Private name mangling",如類A里的__private標識符將被轉換為_A__private,但當類名全部以下劃線命名的時候,Python就不再執行軋壓。而且,雖然叫私有變量,仍然有可能被訪問或修改(使用_classname__membername),所以, 總結如下:
無論是單下劃線還是雙下劃線開頭的成員,都是希望外部程序開發者不要直接使用這些成員變量和這些成員函數,只是雙下劃線從語法上能夠更直接的避免錯誤的使用,但是如果按照_類名__成員名則依然可以訪問到。單下劃線的在動態調試時可能會方便一些,只要項目組的人都遵守下劃線開頭的成員不直接使用,那使用單下劃線或許會更好。
更多文章、技術交流、商務合作、聯系博主
微信掃碼或搜索:z360901061

微信掃一掃加我為好友
QQ號聯系: 360901061
您的支持是博主寫作最大的動力,如果您喜歡我的文章,感覺我的文章對您有幫助,請用微信掃描下面二維碼支持博主2元、5元、10元、20元等您想捐的金額吧,狠狠點擊下面給點支持吧,站長非常感激您!手機微信長按不能支付解決辦法:請將微信支付二維碼保存到相冊,切換到微信,然后點擊微信右上角掃一掃功能,選擇支付二維碼完成支付。
【本文對您有幫助就好】元
