首页 技术 正文
技术 2022年11月10日
0 收藏 993 点赞 2,284 浏览 58869 个字

一 web的一些框架介绍

 Flask:短小精悍,内部没有包含多少组件,但是第三方的组件是非常丰富的。

 Django:django是一个重武器,内部包含了非常多的组件:orm,form,modelForm,缓存,session等等

 Tornado:牛逼之处就是异步非阻塞框架和node.js

二 Flask的快速入门

 创建python虚拟环境:virtualenv 虚拟名

 安装:pip install flask

 什么是werkzeug:Werkzeug是一个WSGI工具包,他可以作为一个Web框架的底层库。官方的介绍说是一个 WSGI 工具包,它可以作为一个 Web 框架的底层库,因为它封装好了很多 Web 框架的东西,例如 Request,Response 等等

from werkzeug.wrappers import Request, Response@Request.application
def hello(request):
return Response('Hello World!')if __name__ == '__main__':
from werkzeug.serving import run_simple
run_simple('localhost', 4000, hello)

 基本使用:

from flask import Flask
app = Flask(__name__)@app.route('/')
def hello_world():
return 'Hello World!'if __name__ == '__main__':
app.run()

 wsgi:使用werkzeug模块实现的,还可以使用wsgiref实现。本质是导入socket实现的。

   flask框架基础一旦出现这个,监听事件就开始了

 实例化Flask对象

  app.run():监听用户的请求,一旦有用户的请求过来,就会直接执行用户的__call__方法。

  flask的所有相关的组件全都存在flask文件下的,需要什么导入什么

from flask import Flask,render_template,request,redirect,session,url_for

 for_url:高级版的重定向

from flask import Flask,render_template,request,redirect,session,url_forapp = Flask(__name__)
app.debug=True
app.secret_key='fangshao'
USERS = {
1:{'name':'张桂坤','age':18,'gender':'男','text':"当眼泪掉下来的时候,是真的累了, 其实人生就是这样: 你有你的烦,我有我的难,人人都有无声的泪,人人都有难言的苦。 忘不了的昨天,忙不完的今天,想不到的明天,走不完的人生,过不完的坎坷,越不过的无奈,听不完的谎言,看不透的人心放不下的牵挂,经历不完的酸甜苦辣,这就是人生,这就是生活。"},
2:{'name':'主城','age':28,'gender':'男','text':"高中的时候有一个同学家里穷,每顿饭都是膜膜加点水,有时候吃点咸菜,我们六科老师每天下课都叫他去办公室回答问题背诵课文,然后说太晚啦一起吃个饭,后来他考上了人大,拿到通知书的时候给每个老师磕了一个头"},
3:{'name':'服城','age':18,'gender':'女','text':"高中的时候有一个同学家里穷,每顿饭都是膜膜加点水,有时候吃点咸菜,我们六科老师每天下课都叫他去办公室回答问题背诵课文,然后说太晚啦一起吃个饭,后来他考上了人大,拿到通知书的时候给每个老师磕了一个头"},
}
@app.route('/')
def hello_world():
return 'Hello World!'# @app.route('/index',methods=['GET'])
# def index():
# user=session.get('user_info')
# print(user)
# if user:
# return render_template('index.html',data=user,user_dict=USERS)
# return redirect('/login')@app.route('/index',methods=['GET'])
def index():
user=session.get('user_info')
if user:
return render_template('index.html',data=user,user_dict=USERS)
url=url_for('')
return redirect(url)@app.route('/detail/<int:nid>',methods=['GET'])
def detail(nid):
user=session.get('user_info')
if not user:
return redirect('/login')
return render_template('detail.html',data=USERS[nid])@app.route('/login',methods=['GET','POST'],endpoint='')
def login():
if request.method=='POST':
user=request.form.get('user')
pwd=request.form.get('pwd')
if user=='fang' and pwd=='':
session['user_info']=user
return redirect('/index')
else:
return render_template('login.html',msg='用户名或密码错误') return render_template('login.html')if __name__ == '__main__':
app.run()

三 配置文件

 开放封闭原则:对代码的修改是封闭的,对配置文件的修改时开放的。

 app.debug=True:修改过后自动重启项目

 app.secret_key=’随机设置字符串’:全局设置session,Session, Cookies以及一些第三方扩展都会用到,

 app.config[‘debug’]=True:修改过后自动重启项目

 app.config:获取当前app的所有配置

 app.config.from_object:导入文件的一个类,

  内部实现原理:导入时,将路径由点分割

 配置文件的格式有:

flask中的配置文件是一个flask.config.Config对象(继承字典),默认配置为:
{
'DEBUG': get_debug_flag(default=False), 是否开启Debug模式
'TESTING': False, 是否开启测试模式
'PROPAGATE_EXCEPTIONS': None,
'PRESERVE_CONTEXT_ON_EXCEPTION': None,
'SECRET_KEY': None,
'PERMANENT_SESSION_LIFETIME': timedelta(days=31),
'USE_X_SENDFILE': False,
'LOGGER_NAME': None,
'LOGGER_HANDLER_POLICY': 'always',
'SERVER_NAME': None,
'APPLICATION_ROOT': None,
'SESSION_COOKIE_NAME': 'session',
'SESSION_COOKIE_DOMAIN': None,
'SESSION_COOKIE_PATH': None,
'SESSION_COOKIE_HTTPONLY': True,
'SESSION_COOKIE_SECURE': False,
'SESSION_REFRESH_EACH_REQUEST': True,
'MAX_CONTENT_LENGTH': None,
'SEND_FILE_MAX_AGE_DEFAULT': timedelta(hours=12),
'TRAP_BAD_REQUEST_ERRORS': False,
'TRAP_HTTP_EXCEPTIONS': False,
'EXPLAIN_TEMPLATE_LOADING': False,
'PREFERRED_URL_SCHEME': 'http',
'JSON_AS_ASCII': True,
'JSON_SORT_KEYS': True,
'JSONIFY_PRETTYPRINT_REGULAR': True,
'JSONIFY_MIMETYPE': 'application/json',
'TEMPLATES_AUTO_RELOAD': None,
}

  格式一:

app.config['DEBUG'] = TruePS: 由于Config对象本质上是字典,所以还可以使用app.config.update(...)

  格式二:

    app.config.from_pyfile("python文件名称")
如:
settings.py
DEBUG = True app.config.from_pyfile("settings.py") app.config.from_envvar("环境变量名称")
环境变量的值为python文件名称名称,内部调用from_pyfile方法 app.config.from_json("json文件名称")
JSON文件名称,必须是json格式,因为内部会执行json.loads app.config.from_mapping({'DEBUG':True})

  格式三:

    app.config.from_object("python类或类的路径")        app.config.from_object('pro_flask.settings.TestingConfig')        settings.py            class Config(object):
DEBUG = False
TESTING = False
DATABASE_URI = 'sqlite://:memory:' class ProductionConfig(Config):
DATABASE_URI = 'mysql://user@localhost/foo' class DevelopmentConfig(Config):
DEBUG = True class TestingConfig(Config):
TESTING = True PS: 从sys.path中已经存在路径开始写

 settings.py默认路径要放在当前项目的第一级目录下面

 PS: settings.py文件默认路径要放在程序root_path目录,如果instance_relative_config为True,则就是instance_path目录

四 路由系统

  • @app.route(‘/user/<username>’)
  • @app.route(‘/post/<int:post_id>’)
  • @app.route(‘/post/<float:post_id>’)
  • @app.route(‘/post/<path:path>’)
  • @app.route(‘/login’, methods=[‘GET’, ‘POST’])

 路由比较特殊,:是基于装饰器实现的,但是究其本质还是有add_url_rule实现的。

 装饰器可以有多个,放在上面和下面是不同的,

from flask import Flask,render_template,request,redirect,session,url_forapp = Flask(__name__)
app.debug=True
app.secret_key='fangshao'
USERS = {
1:{'name':'张桂坤','age':18,'gender':'男','text':"当眼泪掉下来的时候,是真的累了, 其实人生就是这样: 你有你的烦,我有我的难,人人都有无声的泪,人人都有难言的苦。 忘不了的昨天,忙不完的今天,想不到的明天,走不完的人生,过不完的坎坷,越不过的无奈,听不完的谎言,看不透的人心放不下的牵挂,经历不完的酸甜苦辣,这就是人生,这就是生活。"},
2:{'name':'主城','age':28,'gender':'男','text':"高中的时候有一个同学家里穷,每顿饭都是膜膜加点水,有时候吃点咸菜,我们六科老师每天下课都叫他去办公室回答问题背诵课文,然后说太晚啦一起吃个饭,后来他考上了人大,拿到通知书的时候给每个老师磕了一个头"},
3:{'name':'服城','age':18,'gender':'女','text':"高中的时候有一个同学家里穷,每顿饭都是膜膜加点水,有时候吃点咸菜,我们六科老师每天下课都叫他去办公室回答问题背诵课文,然后说太晚啦一起吃个饭,后来他考上了人大,拿到通知书的时候给每个老师磕了一个头"},
}def get_session(func):
def inner(*args,**kwargs):
user = session.get('user_info')
if not user:
return redirect('/login')
return func(*args,**kwargs)
return inner@app.route('/',endpoint='')
@get_session
def hello_world():
return 'Hello World!'@app.route('/index',methods=['GET'],endpoint='n1')
@get_session
def index():
# user=session.get('user_info')
# print(user)
# if user:
return render_template('index.html',user_dict=USERS)
# return redirect('/login')# @app.route('/index',methods=['GET'])
# def index():
# user=session.get('user_info')
# if user:
# return render_template('index.html',user_dict=USERS)
# url=url_for('111')
# return redirect(url)@app.route('/detail/<int:nid>',methods=['GET'],endpoint='n2')
def detail(nid):
user=session.get('user_info')
if not user:
return redirect('/login')
return render_template('detail.html',data=USERS[nid])@app.route('/login',methods=['GET','POST'],endpoint='')
def login():
if request.method=='POST':
user=request.form.get('user')
pwd=request.form.get('pwd')
if user=='fang' and pwd=='':
session['user_info']=user
return redirect('/index')
else:
return render_template('login.html',msg='用户名或密码错误') return render_template('login.html')if __name__ == '__main__':
app.run()

 注意:这里加上装饰器,会重名,给他设置一个别名endpoint=‘别名’

 functools.wraps(函数):# 帮助我们保存一些设置函数的元信息

from flask import Flask,render_template,request,redirect,session,url_forapp = Flask(__name__)
app.debug=True
app.secret_key='fangshao'
USERS = {
1:{'name':'张桂坤','age':18,'gender':'男','text':"当眼泪掉下来的时候,是真的累了, 其实人生就是这样: 你有你的烦,我有我的难,人人都有无声的泪,人人都有难言的苦。 忘不了的昨天,忙不完的今天,想不到的明天,走不完的人生,过不完的坎坷,越不过的无奈,听不完的谎言,看不透的人心放不下的牵挂,经历不完的酸甜苦辣,这就是人生,这就是生活。"},
2:{'name':'主城','age':28,'gender':'男','text':"高中的时候有一个同学家里穷,每顿饭都是膜膜加点水,有时候吃点咸菜,我们六科老师每天下课都叫他去办公室回答问题背诵课文,然后说太晚啦一起吃个饭,后来他考上了人大,拿到通知书的时候给每个老师磕了一个头"},
3:{'name':'服城','age':18,'gender':'女','text':"高中的时候有一个同学家里穷,每顿饭都是膜膜加点水,有时候吃点咸菜,我们六科老师每天下课都叫他去办公室回答问题背诵课文,然后说太晚啦一起吃个饭,后来他考上了人大,拿到通知书的时候给每个老师磕了一个头"},
}import functools
def get_session(func):
@functools.wraps(func)
def inner(*args,**kwargs):
user = session.get('user_info')
if not user:
return redirect('/login')
return func(*args,**kwargs)
return inner@app.route('/',endpoint='')
@get_session
def hello_world():
return 'Hello World!'@app.route('/index',methods=['GET'])
@get_session
def index():
# user=session.get('user_info')
# print(user)
# if user:
return render_template('index.html',user_dict=USERS)
# return redirect('/login')# @app.route('/index',methods=['GET'])
# def index():
# user=session.get('user_info')
# if user:
# return render_template('index.html',user_dict=USERS)
# url=url_for('111')
# return redirect(url)@app.route('/detail/<int:nid>',methods=['GET'])
def detail(nid):
user=session.get('user_info')
if not user:
return redirect('/login')
return render_template('detail.html',data=USERS[nid])@app.route('/login',methods=['GET','POST'])
def login():
if request.method=='POST':
user=request.form.get('user')
pwd=request.form.get('pwd')
if user=='fang' and pwd=='':
session['user_info']=user
return redirect('/index')
else:
return render_template('login.html',msg='用户名或密码错误') return render_template('login.html')if __name__ == '__main__':
app.run()

 FBV的写法:

  使用装饰器,将括号里面的内容添加到路由中@app.route()

DEFAULT_CONVERTERS = {
'default': UnicodeConverter,
'string': UnicodeConverter,
'any': AnyConverter,
'path': PathConverter,
'int': IntegerConverter,
'float': FloatConverter,
'uuid': UUIDConverter,
}

   methods=[]:里面写的是请求的方法,支持什么方法都要写上什么方法。

      endpoint=别名:为当前的url反向生成一个url,也就是起一个别名

      app.add_url_rule(‘/…’):添加路由的另一种方法

        def auth(func):
def inner(*args, **kwargs):
print('before')
result = func(*args, **kwargs)
print('after')
return result return inner @app.route('/index.html',methods=['GET','POST'],endpoint='index')
@auth
def index():
return 'Index' 或 def index():
return "Index" self.add_url_rule(rule='/index.html', endpoint="index", view_func=index, methods=["GET","POST"])
or
app.add_url_rule(rule='/index.html', endpoint="index", view_func=index, methods=["GET","POST"])
app.view_functions['index'] = index

 CBV的写法:

  MethodView:API的简单的一种实现方式,class创建的视图类就需要继承它。

class IndexView(views.MethodView):
methods=['GET']
decorators=[auth,]
def get(self):
return 'Index.GET' def post(self):
return 'Index.POST'

  app.add_url_rule(‘/路径’,view_func=’类名’.as_view(name=返回的那个函数)

app.add_url_rule('/index',view_func=IndexView.as_view(name='index'))

  对于CBV来说:虽然传进去的是类名,但是最后返回的还是一个函数。

        def auth(func):
def inner(*args, **kwargs):
print('before')
result = func(*args, **kwargs)
print('after')
return result return inner class IndexView(views.View):
methods = ['GET']
decorators = [auth, ] def dispatch_request(self):
print('Index')
return 'Index!' app.add_url_rule('/index', view_func=IndexView.as_view(name='index')) # name=endpoint 或 class IndexView(views.MethodView):
methods = ['GET']
decorators = [auth, ] def get(self):
return 'Index.GET' def post(self):
return 'Index.POST' app.add_url_rule('/index', view_func=IndexView.as_view(name='index')) # name=endpoint @app.route和app.add_url_rule参数:
rule, URL规则
view_func, 视图函数名称
defaults=None, 默认值,当URL中无参数,函数需要参数时,使用defaults={'k':'v'}为函数提供参数
endpoint=None, 名称,用于反向生成URL,即: url_for('名称')
methods=None, 允许的请求方式,如:["GET","POST"] strict_slashes=None, 对URL最后的 / 符号是否严格要求,
如:
@app.route('/index',strict_slashes=False),
访问 http://www.xx.com/index/ 或 http://www.xx.com/index均可
@app.route('/index',strict_slashes=True)
仅访问 http://www.xx.com/index
redirect_to=None, 重定向到指定地址
如:
@app.route('/index/<int:nid>', redirect_to='/home/<nid>')

def func(adapter, nid):
return "/home/888"
@app.route('/index/<int:nid>', redirect_to=func)
subdomain=None, 子域名访问
from flask import Flask, views, url_for app = Flask(import_name=__name__)
app.config['SERVER_NAME'] = 'wupeiqi.com:5000' @app.route("/", subdomain="admin")
def static_index():
"""Flask supports static subdomains
This is available at static.your-domain.tld"""
return "static.your-domain.tld" @app.route("/dynamic", subdomain="<username>")
def username_index(username):
"""Dynamic subdomains are also supported
Try going to user1.your-domain.tld/dynamic"""
return username + ".your-domain.tld" if __name__ == '__main__':
app.run()

 default:传入函数的参数。就是url后面的那个参数

 subdomain={}:创建子域名

 window设置域名:hosts而文件下面直接就可以设置了C:\Windows\System32\driver\etc\hosts

 mcs系统设置域名:/ect/hosts文件下面就可以设置域名了  

          from flask import Flask, views, url_for
from werkzeug.routing import BaseConverter app = Flask(import_name=__name__) class RegexConverter(BaseConverter):
"""
自定义URL匹配正则表达式
"""
def __init__(self, map, regex):
super(RegexConverter, self).__init__(map)
self.regex = regex def to_python(self, value):
"""
路由匹配时,匹配成功后传递给视图函数中参数的值
:param value:
:return:
"""
return int(value) def to_url(self, value):
"""
使用url_for反向生成URL时,传递的参数经过该方法处理,返回的值用于生成URL中的参数
:param value:
:return:
"""
val = super(RegexConverter, self).to_url(value)
return val # 添加到flask中
app.url_map.converters['regex'] = RegexConverter @app.route('/index/<regex("\d+"):nid>')
def index(nid):
print(url_for('index', nid=''))
return 'Index' if __name__ == '__main__':
app.run()

五 模板

 1、模板的使用:Flask使用的是Jinja2模板,所以其语法和Django无差别

 2、自定义模板方法:Flask中自定义模板方法的方式和Bottle相似,创建一个函数并通过参数的形式传入

 for循环,并取到索引

  {% for k,v in 对象.items()%}

    字典的取值方法:v.字段名  v[‘字段名’]  v.get(‘字段名’)

  {% endfor %}

 函数渲染:不仅要加上括号,还可以加上参数

  {{函数名(参数)}}  加上|safe:防止xss攻击

<!DOCTYPE html>
<html>
<head lang="en">
<meta charset="UTF-8">
<title></title>
</head>
<body>
<h1>自定义函数</h1>
{{ww()|safe}}</body>
</html>

  在后台如果使用

from flask import Flask,render_template
app = Flask(__name__)def wupeiqi():
return '<h1>Wupeiqi</h1>'@app.route('/login', methods=['GET', 'POST'])
def login():
return render_template('login.html', ww=wupeiqi)app.run()

 Markup:后台设置xss攻击

 宏定义:就是定义一块html,定义的这一块就是就是一个函数

  {% macre 函数名(参数)%}  {% endmacre %}

<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>Title</title>
</head>
<body> {% macro input(name, type='text', value='') %}
<input type="{{ type }}" name="{{ name }}" value="{{ value }}">
{% endmacro %} {{ input('n1') }} {% include 'tp.html' %} <h1>asdf{{ v.k1}}</h1>
</body>
</html>

六 请求和响应

 请求和响应都是从flask为念中导入的

 request:请求

 response:响应

  jsonify:响应的数据类型不是字符串类型,就是用这个将响应的内容转成字符串。

from flask import Flask
from flask import request
from flask import render_template
from flask import redirect
from flask import make_response app = Flask(__name__) @app.route('/login.html', methods=['GET', "POST"])
def login(): # 请求相关信息
# request.method
# request.args
# request.form
# request.values
# request.cookies
# request.headers
# request.path
# request.full_path
# request.script_root
# request.url
# request.base_url
# request.url_root
# request.host_url
# request.host
# request.files
# obj = request.files['the_file_name']
# obj.save('/var/www/uploads/' + secure_filename(f.filename)) # 响应相关信息
# return "字符串"
# return render_template('html模板路径',**{})
# return redirect('/index.html') # response = make_response(render_template('index.html'))
# response是flask.wrappers.Response类型
# response.delete_cookie('key')
# response.set_cookie('key', 'value')
# response.headers['X-Something'] = 'A value'
# return response return "内容" if __name__ == '__main__':
app.run()

七 Session和Cookie

 session在使用前必须要有app.sceret_key加密,就相当于加盐

 session[‘名’]=字段:设置session

 应用demo:实例,使用装饰器写一个用户认证

  思路:装饰器,一个函数可以加上多个装饰器,反向查找的名称不允许重名:endpoint

  session.pop:删除一个session

 我们这里使用的session是flask内置的使用加密的cookie来保存数据的。

 基本使用:

from flask import Flask, session, redirect, url_for, escape, requestapp = Flask(__name__)@app.route('/')
def index():
if 'username' in session:
return 'Logged in as %s' % escape(session['username'])
return 'You are not logged in'@app.route('/login', methods=['GET', 'POST'])
def login():
if request.method == 'POST':
session['username'] = request.form['username']
return redirect(url_for('index'))
return '''
<form action="" method="post">
<p><input type=text name=username>
<p><input type=submit value=Login>
</form>
'''@app.route('/logout')
def logout():
# remove the username from the session if it's there
session.pop('username', None)
return redirect(url_for('index'))# set the secret key. keep this really secret:
app.secret_key = 'A0Zr98j/3yX R~XHH!jmN]LWX/,?RT'基本使用

 自定义session 

pip3 install Flask-Session        run.py
from flask import Flask
from flask import session
from pro_flask.utils.session import MySessionInterface
app = Flask(__name__) app.secret_key = 'A0Zr98j/3yX R~XHH!jmN]LWX/,?RT'
app.session_interface = MySessionInterface() @app.route('/login.html', methods=['GET', "POST"])
def login():
print(session)
session['user1'] = 'alex'
session['user2'] = 'alex'
del session['user2'] return "内容" if __name__ == '__main__':
app.run() session.py
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import uuid
import json
from flask.sessions import SessionInterface
from flask.sessions import SessionMixin
from itsdangerous import Signer, BadSignature, want_bytes class MySession(dict, SessionMixin):
def __init__(self, initial=None, sid=None):
self.sid = sid
self.initial = initial
super(MySession, self).__init__(initial or ()) def __setitem__(self, key, value):
super(MySession, self).__setitem__(key, value) def __getitem__(self, item):
return super(MySession, self).__getitem__(item) def __delitem__(self, key):
super(MySession, self).__delitem__(key) class MySessionInterface(SessionInterface):
session_class = MySession
container = {} def __init__(self):
import redis
self.redis = redis.Redis() def _generate_sid(self):
return str(uuid.uuid4()) def _get_signer(self, app):
if not app.secret_key:
return None
return Signer(app.secret_key, salt='flask-session',
key_derivation='hmac') def open_session(self, app, request):
"""
程序刚启动时执行,需要返回一个session对象
"""
sid = request.cookies.get(app.session_cookie_name)
if not sid:
sid = self._generate_sid()
return self.session_class(sid=sid) signer = self._get_signer(app)
try:
sid_as_bytes = signer.unsign(sid)
sid = sid_as_bytes.decode()
except BadSignature:
sid = self._generate_sid()
return self.session_class(sid=sid) # session保存在redis中
# val = self.redis.get(sid)
# session保存在内存中
val = self.container.get(sid) if val is not None:
try:
data = json.loads(val)
return self.session_class(data, sid=sid)
except:
return self.session_class(sid=sid)
return self.session_class(sid=sid) def save_session(self, app, session, response):
"""
程序结束前执行,可以保存session中所有的值
如:
保存到resit
写入到用户cookie
"""
domain = self.get_cookie_domain(app)
path = self.get_cookie_path(app)
httponly = self.get_cookie_httponly(app)
secure = self.get_cookie_secure(app)
expires = self.get_expiration_time(app, session) val = json.dumps(dict(session)) # session保存在redis中
# self.redis.setex(name=session.sid, value=val, time=app.permanent_session_lifetime)
# session保存在内存中
self.container.setdefault(session.sid, val) session_id = self._get_signer(app).sign(want_bytes(session.sid)) response.set_cookie(app.session_cookie_name, session_id,
expires=expires, httponly=httponly,
domain=domain, path=path, secure=secure)自定义Session

 第三方session

from flask import Flask, session, redirect
from flask.ext.session import Sessionapp = Flask(__name__)
app.debug = True
app.secret_key = 'asdfasdfasd'app.config['SESSION_TYPE'] = 'redis'
from redis import Redis
app.config['SESSION_REDIS'] = Redis(host='192.168.0.94',port='')
Session(app)@app.route('/login')
def login():
session['username'] = 'alex'
return redirect('/index')@app.route('/index')
def index():
name = session['username']
return nameif __name__ == '__main__':
app.run()第三方session

八 闪现

 flash:向某一个地方设置一个值

  category:设置值的分类

 get_flashed_messages:从某一个地方获取多个值,并且清除

  他们也是基于app.secret_key实现的

from flask import Flask,flash,get_flashed_messagesapp = Flask(__name__)
app.secret_key = 'asdfasdf'
@app.route('/get')
def get():
# 从某个地方获取设置过的所有值,并清除。
data = get_flashed_messages()
print(data)
return 'Hello World!'@app.route('/set')
def set():
# 向某个地方设置一个值
flash('阿斯蒂芬') return 'Hello World!'if __name__ == '__main__':
app.run()

  category_filter:只取一类的值

from flask import Flask,flash,get_flashed_messages,request,redirectapp = Flask(__name__)
app.secret_key = 'asdfasdf'@app.route('/index')
def index():
# 从某个地方获取设置过的所有值,并清除。
val = request.args.get('v')
if val == 'oldboy':
return 'Hello World!'
flash('超时错误',category="x1")
return "ssdsdsdfsd"
# return redirect('/error')@app.route('/error')
def error():
"""
展示错误信息
:return:
"""
data = get_flashed_messages(category_filter=['x1'])
if data:
msg = data[0]
else:
msg = "..."
return "错误信息:%s" %(msg,)if __name__ == '__main__':
app.run()

request.query_string.get(‘字段’):获取到所有的url

 request.args.get(‘字段’):获取get请求后面的值

 什么是闪现:设置不管多少次的值,是基于session实现的,只要一次取到全部的值,并且清除

 闪现的用法:应用于临时数据的操作,比如:显示错误信息等等

from flask import Flask,session,Session,flash,get_flashed_messages,redirect,render_template,request
app = Flask(__name__)
app.secret_key ='sdfsdfsdf'@app.route('/users')
def users():
# 方式一
# msg = request.args.get('msg','')
# 方式二
# msg = session.get('msg')
# if msg:
# del session['msg']
# 方式三
v = get_flashed_messages()
print(v)
msg = ''
return render_template('users.html',msg=msg)@app.route('/useradd')
def user_add():
# 在数据库中添加一条数据
# 假设添加成功,在跳转到列表页面时,显示添加成功
# 方式一
# return redirect('/users?msg=添加成功')
# 方式二
# session['msg'] = '添加成功'
# 方式三
flash('添加成功')
return redirect('/users')if __name__ == '__main__':
app.run(debug=True)

九 蓝图(blueprint)

 什么是蓝图:功能分类,还可以定义自己的模板和路由分发。就是为了构造目录的。

 蓝图的特性:不仅能将不同功能的app进行分来,并且还可以定义自己的模板路径可以实现路由的分发。可以实现每一个程序构造自己的app。

 Blueprint:创建蓝图  (’蓝图名’,’__name__’)

  url_prefix:为下面所有函数的使用机上一个前缀,可以为统一的某一类加上前缀

  template_folder:定义自己的模板路径(html文件勒颈)

 app.register_blueprint:将蓝图注册到app

 小型应用程序:示例

 大型应用程序:示例

小中型:

flask框架基础

flask框架基础

manage.py

import fcrm
if __name__ == '__main__':
fcrm.app.run()

__init__.py(只要一导入fcrm就会执行__init__.py文件)

flask框架基础

from flask import Flask
#导入accout 和order
from fcrm.views import accout
from fcrm.views import order
app = Flask(__name__)
print(app.root_path) #根目录app.register_blueprint(accout.accout) #吧蓝图注册到app里面,accout.accout是创建的蓝图对象
app.register_blueprint(order.order)

flask框架基础

accout.py

flask框架基础

from flask import  Blueprint,render_template
accout = Blueprint("accout",__name__)@accout.route('/accout')
def xx():
return "accout"@accout.route("/login")
def login():
return render_template("login.html")

flask框架基础

order.py

from flask import Blueprint
order = Blueprint("order",__name__)@order.route('/order')
def register(): #注意视图函数的名字不能和蓝图对象的名字一样
return "order

使用蓝图时需要注意的

flask框架基础

大型:

flask框架基础

flask框架基础

flask框架基础

十 请求的扩展

 类似于django的中间件

 @app.defore_request:定制请求函数,每次请求都会执行有这个装饰器的函数,每次都是从上到下执行的

 request.url:拿到正要运行的url

 @app.after_request:定制响应的函数,每次响应执行这个装饰器的函数,每次都是从下到上执行的

 请求哈响应可以有多个,如果请求给拦截了,但是所有的响应都会执行

 @app.errorhandler:定制错误的信息。

 @ app.template_global:为模板定制函数,也就是自定义模板

 @app.before_first_request:只有第一次请求来了才执行这个函数

from flask import Flask,render_template,request,redirect,session,url_for
app = Flask(__name__)
app.debug = True
app.secret_key = 'siuljskdjfs'@app.before_request
def process_request1(*args,**kwargs):
print('process_request1 进来了')@app.before_request
def process_request2(*args,**kwargs):
print('process_request2 进来了')@app.after_request
def process_response1(response):
print('process_response1 走了')
return response@app.after_request
def process_response2(response):
print('process_response2 走了')
return response@app.errorhandler(404)
def error_404(arg):
return "404错误了"@app.before_first_request
def first(*args,**kwargs):
pass@app.route('/index',methods=['GET'])
def index():
print('index函数')
return "Index"if __name__ == '__main__':
app.run()

 基于中间件实现用户的认证登陆

                        @app.before_request
def process_request(*args,**kwargs):
if request.path == '/login':
return None
user = session.get('user_info')
if user:
return None
return redirect('/login')

 模板中定制方法:

            @app.template_global()
def sb(a1, a2):
return a1 + a2
{{sb(1,2)}} @app.template_filter()
def db(a1, a2, a3):
return a1 + a2 + a3
{{ 1|db(2,3)}}

十一 中间件

 中间件:在这里就是一个请求的入口

 每次请求进来都会执行app的call方法。

from flask import Flaskapp = Flask(__name__)@app.route('/')
def index():
return 'Hello World!'class Md(object):
def __init__(self,old_wsgi_app):
self.old_wsgi_app = old_wsgi_app def __call__(self, environ, start_response):
print('开始之前')
ret = self.old_wsgi_app(environ, start_response)
print('结束之后')
return retif __name__ == '__main__':
app.wsgi_app = Md(app.wsgi_app)
app.run()

十二 上下文处理

 threading.local:为每一个线程开辟一个单独的内存空间来保存他自己的值

import threading# class Foo():
# def __init__(self):
# self.name=0
#
# local_value=Foo()local_value=threading.local()
def func(num):
local_value.name=num
import time
time.sleep(1)
print(local_value.name,threading.current_thread().name)for i in range(20):
th=threading.Thread(target=func,args=(i,),name='线程%s'%i)
th.start()

 request:

  情况一:单线程和单进程的情况下不会有问题,应为自己使用为完毕过后,就会自动的清空request。

  情况二:单进程和多线程,threading.local对象

  情况三:但进程和单线程多个协程,theading.local对象就会出问题。

 解决方法:

  以后不支持协程:就可以使用内置的threading.local对象

  支持协程:自定义类似threading.local对象的功能支持协程

 _thread.get_ident:获取线程的一个唯一标识

 greenlet.getcurrent:获取协程额唯一标识

 自定义支持协程:

"""
{
1368:{}
}
"""import threading
try:
from greenlet import getcurrent as get_ident # 协程
except ImportError:
try:
from thread import get_ident
except ImportError:
from _thread import get_ident # 线程class Local():
def __init__(self):
self.storage={}
self.get_ident=get_ident def set(self,k,v):
ident=self.get_ident()
origin=self.storage.get(ident)
if not origin:
origin={k:v}
else:
origin[k]=v
self.storage[ident]=origin def get(self,k):
ident=self.get_ident()
origin=self.storage.get(ident)
if not origin:
return None
return origin.get(k,None)local_values=Local()def task(num):
local_values.set('name',num)
import time
time.sleep(1)
print(local_values.get('name'),threading.current_thread().name)for i in range(20):
th=threading.Thread(target=task,args=(i,),name='线程%s'%i)
th.start()

 反射实例:

class Foo(object):
def __init__(self):
object.__setattr__(self,'storage',{}) def __setattr__(self, key, value):
self.storage={'k1:v1'}
print(key,value) def __getattr__(self, item):
print(item)
return 'df'obj=Foo()
# obj.x = 123print(obj)

 自定义支持协程的flask:使用反射实现

"""
{
1368:{}
}
"""
import flask.globals
import threading
try:
from greenlet import getcurrent as get_ident # 协程
except ImportError:
try:
from thread import get_ident
except ImportError:
from _thread import get_ident # 线程class Local():
def __init__(self):
object.__setattr__(self,'__storage__',{})
object.__setattr__(self,'__ident_func__',get_ident) def __setattr__(self, name, value):
ident=self.__ident_func__()
storage=self.__storage__
try:
storage[ident][name]=value
except KeyError:
storage[ident]={name:value}
def __getattr__(self, name):
try:
return self.__storage__[self.__ident_func__()][name]
except KeyError:
raise AttributeError(name) def __delattr__(self, name):
try:
del self.__storage__[self.__ident_finc__()][name]
except KeyError:
raise AttributeError(name)local_values=Local()def task(num):
local_values.name=num
import time
time.sleep(1)
print(local_values.name,threading.current_thread().name)
for i in range(20):
th=threading.Thread(target=task,args=(i,),name='线程%s'%i)
th.start()

 补充:
  偏函数:functools模块

   functools.partial:创建一个新的函数,主要是为了给原函数传入参数

# import functools
#
# def func(a1,a2):
# print(a1+a2)
#
#
# new_func=functools.partial(func,12) #
# new_func(21)

  面向对象的__add__方法补充:当把面向对象中的所有__函数__实现时,对象做任何操作时,都会执行其中对应的方法。

   __add__:谁在前面就会调用谁的__add__方法

# class Foo():
# def __init__(self,num):
# self.num=num
# def __add__(self,other):
# return Foo(self.num+other.num)
# obj1=Foo(11)
# obj2=Foo(22)
# v=obj1+obj2
# print(v)

  拼接列表中的值:

   itertools.chain:传入函数,产生一个新的函数对象的列表,主要帮助我们做列表元素的拼接

    实例1:

from itertools import chain# def a1(x):
# return x+1
#
# func_list=[a1,lambda x:x+1]
#
# def a2(y):
# return y+10
#
# func_list_1=chain([a2],func_list)
# for func in func_list_1:
# print(func)

    实例2:

# v1=[1,2,3]
# v2=[4,5,6]
#
# for l in chain(v1,v2): # chain 主要是做列表的拼接
# print(l)

  上下文源码流程:https://www.processon.com/diagraming/5ab8c9f0e4b0d165d5b83fbb

 谈谈flask的上下文管理:

        - 与django相比是两种不同的实现方式。
- django/tornado是通过传参数形式
- flask是通过上下文管理
两种都可以实现,只不过试下方式不一样。
- 上下文管理:
- threading.local/Local类,其中创建了一个字典{greelet做唯一标识:存数据} 保证数据隔离
- 请求进来:
- 请求相关所有数据封装到了RequestContext中。
- 再讲RequestContext对象添加到Local中(通过LocalStack将对象添加到Local对象中)
- 使用,调用request
- 调用此类方法 request.method、print(request)、request+xxx 会执行LocalProxy中对应的方法
- 函数
- 通过LocalStack去Local中获取值。
- 请求终止
- 通过LocalStack的pop方法 Local中将值异常。

 补充:

    再将对象封装到Local中
Flask可以传入任何的字符串参数

 请求上下文:请求上下文封装的就是RequestContext对象

  request:是LocalProxy对象

   这个对象实例化事传入了一个函数,还有一个request参数
   以后执行偏函数partail(_Lookup_req_object,’request’)时,自动传递request参数
   目标:去Local中获取ctx,然后再在ctx中获取request
   ctx.push:里面做的事,将ctx通过LocalStack添加到Local中

  session:经过push之后,session里面已经有值了,seif.session帮助我获取session信息

 应用上下文:应用上下文封装的是AppContext

  AppContext里面封装了app对象
  app.context:创建了app对象
  app.app_ctx_global_class:相当于一个全局变量

  app和g

  g:每个请求周期都会创建一个用于在请求周期中传递值的一个容器。只有一次生命周期可用,因为一次请求周期后第二次就会重新创建
print(g):执行g的实例对象

 请求到来 ,有人来访问:

                # 将请求相关的数据environ封装到了RequestContext对象中
# 再讲对象封装到local中(每个线程/每个协程独立空间存储)
# ctx.app # 当前APP的名称
# ctx.request # Request对象(封装请求相关东西)
# ctx.session # 空
_request_ctx_stack.local = {
唯一标识:{
"stack":[ctx, ]
},
唯一标识:{
"stack":[ctx, ]
},
} # app_ctx = AppContext对象
# app_ctx.app
# app_ctx.g _app_ctx_stack.local = {
唯一标识:{
"stack":[app_ctx, ]
},
唯一标识:{
"stack":[app_ctx, ]
},
}

 使用:print打印request,session.g,current_app的时候,都会执行他们相对应的对象的__str__
他们之间不同的是偏函数不一样

                    from flask import request,session,g,current_app                    print(request,session,g,current_app)                    都会执行相应LocalProxy对象的 __str__                    current_app = LocalProxy(_find_app)
request = LocalProxy(partial(_lookup_req_object, 'request'))
session = LocalProxy(partial(_lookup_req_object, 'session')) current_app = LocalProxy(_find_app)
g = LocalProxy(partial(_lookup_app_object, 'g'))

 终止,全部pop

 问题:

  如果他是多线程的时候是如何实现的。

            请求到来的时候就是两个Local,但是没有值,一共只有两个Local,进来一个线程创建自己的唯一标识。不过进来多少的线程,都只用这两个Local

  flask的local中保存数据时,使用列表创建出来的栈。为什么用栈?

                   - 如果写web程序,web运行环境;栈中永远保存1条数据(可以不用栈)。
- 写脚本获取app信息时,可能存在app上下文嵌套关系。
from flask import Flask,current_app,globals,_app_ctx_stack app1 = Flask('app01')
app1.debug = False # 用户/密码/邮箱
# app_ctx = AppContext(self):
# app_ctx.app
# app_ctx.g app2 = Flask('app02')
app2.debug = True # 用户/密码/邮箱
# app_ctx = AppContext(self):
# app_ctx.app
# app_ctx.g with app1.app_context():# __enter__方法 -> push -> app_ctx添加到_app_ctx_stack.local
# {<greenlet.greenlet object at 0x00000000036E2340>: {'stack': [<flask.ctx.AppContext object at 0x00000000037CA438>]}}
print(_app_ctx_stack._local.__storage__)
print(current_app.config['DEBUG']) with app2.app_context():
# {<greenlet.greenlet object at 0x00000000036E2340>: {'stack': [<flask.ctx.AppContext object at 0x00000000037CA438> ]}}
print(_app_ctx_stack._local.__storage__)
print(current_app.config['DEBUG']) print(current_app.config['DEBUG'])

 多app应用:不仅可以使用蓝图进行分发,还可以使用app进行分发
  DispatchMiddleware:可以进行路由分发。
  在这里面没有app.run,直接可以run_simple启动

            from werkzeug.wsgi import DispatcherMiddleware
from werkzeug.serving import run_simple
from flask import Flask, current_app app1 = Flask('app01') app2 = Flask('app02') @app1.route('/index')
def index():
return "app01" @app2.route('/index2')
def index2():
return "app2" # http://www.oldboyedu.com/index
# http://www.oldboyedu.com/sec/index2
dm = DispatcherMiddleware(app1, {
'/sec': app2,
}) if __name__ == "__main__":
run_simple('localhost', 5000, dm)

 问题:web访问多app,上线问管理是如何实现的

  请求进来,为栈添加的还是一个值

 问题:为什么使用栈,离线脚本

  如果写web程序,或者web运行环境:栈中永远保存一条数据(这个可以不使用栈)
  写脚本获取app信息的时候,可能会存在上下文嵌套关系。这时有可能要用到栈

 问题:问题:Web访问多app应用时,上下文管理是如何实现?

 补充:

 遇到with就会执行__enter__方法,这个方法返回值,as后面那个值就是返回值
 当指执行完毕之后 ,自动调用类的__exit__方法
from flask import Flask,current_app,globals,_app_ctx_stackapp1 = Flask('app01')
app1.debug = False # 用户/密码/邮箱
# app_ctx = AppContext(self):
# app_ctx.app
# app_ctx.gapp2 = Flask('app02')
app2.debug = True # 用户/密码/邮箱
# app_ctx = AppContext(self):
# app_ctx.app
# app_ctx.gwith app1.app_context(): # # __enter__方法 -> push -> app_ctx添加到_app_ctx_stack.local
print(_app_ctx_stack._local.__storage__)
print(current_app.config['DEBUG'])
with app1.app_context():
print(_app_ctx_stack._local.__storage__)
print(current_app.config['DEBUG'])
print(current_app.config['DEBUG'])with app1.app_context():
print(_app_ctx_stack._local.__storage__)
print(current_app.config['DEBUG'])
with app1.app_context():
print(_app_ctx_stack._local.__storage__)
print(current_app.config['DEBUG'])

 补充:永远两个Local对象

flask框架基础

 实现细节:
  ResquestContext对象通过LocalStark添加到Local中
  导入的request是一个LocalProxy对象,然后在通过偏函数调用了LocakStack,在调用Local
  RequestContext的auto_pop,在执行LocalStack,再到Local中移除

十三 数据库的连接池

 前夕:

  django的常用数据库:

   ORM:django默认的数据库

   pymysql模块:导入mysql数据库,python2和python3版本都有这个模块

   MySQLdb:一样,也是导入mysql数据库,这个模块只有python2版本才有

  flask/其他:

   pymysql:导mysql数据库

   MySQLdb:导入mysql数据库

  SQLAchemy:

   是ORM的一款框架,可以导入mysql数据库(pymysql / MySQLdb)

 原生SQL:

from flask import Flaskapp = Flask(__name__)@app.route("/")
def hello():
import pymysql
CONN = pymysql.connect(host='127.0.0.1',
port=3306,
user='root',
password='',
database='pooldb',
charset='utf8') cursor = CONN.cursor()
cursor.execute('select * from tb1')
result = cursor.fetchall()
cursor.close() print(result) return "Hello World"if __name__ == '__main__':
app.run()
from flask import Flaskapp = Flask(__name__)
import pymysql
CONN = pymysql.connect(host='127.0.0.1',
port=3306,
user='root',
password='',
database='pooldb',
charset='utf8')@app.route("/")
def hello():
cursor = CONN.cursor()
cursor.execute('select * from tb1')
result = cursor.fetchall()
cursor.close()
print(result)
return "Hello World"if __name__ == '__main__':
app.run()######加锁#########################
from flask import Flask
import threading
app = Flask(__name__)
import pymysql
CONN = pymysql.connect(host='127.0.0.1',
port=3306,
user='root',
password='',
database='pooldb',
charset='utf8')@app.route("/")
def hello():
with threading.Lock():
cursor = CONN.cursor()
cursor.execute('select * from tb1')
result = cursor.fetchall()
cursor.close() print(result) return "Hello World"if __name__ == '__main__':
app.run()

 问题:
 解决:

  不能为每一个用户创建一个链接

  创建一定数量的连接池,只要连接池有空的,才会进来,不然就会等着。

 使用UBDtils模块:

  下载网站:https://pypi.python.org/pypi/DBUtils

   pip install UBDtils

  如果安装到虚拟环境下面,需要先切换到虚拟环境下面

  使用:

   模式一:

    为每一个线程创建一个链接,即使县城调用close方法,也不会关闭掉

    占用连接池不放,浪费了连接池

import pymysql
from DBUtils.PersistentDB import PersistentDBPOOL = PersistentDB(
creator=pymysql, # 使用链接数据库的模块
maxusage=None, # 一个链接最多被重复使用的次数,None表示无限制
setsession=[], # 开始会话前执行的命令列表。如:["set datestyle to ...", "set time zone ..."]
ping=0, # ping MySQL服务端,检查是否服务可用。# 如:0 = None = never, 1 = default = whenever it is requested, 2 = when a cursor is created, 4 = when a query is executed, 7 = always
closeable=False,
# 如果为False时, conn.close() 实际上被忽略,供下次使用,再线程关闭时,才会自动关闭链接。如果为True时, conn.close()则关闭链接,那么再次调用pool.connection时就会报错,因为已经真的关闭了连接(pool.steady_connection()可以获取一个新的链接)
threadlocal=None, # 本线程独享值得对象,用于保存链接对象,如果链接对象被重置
host='127.0.0.1',
port=3306,
user='root',
password='',
database='pooldb',
charset='utf8'
)def func():
conn = POOL.connection(shareable=False)
cursor = conn.cursor()
cursor.execute('select * from tb1')
result = cursor.fetchall()
cursor.close()
conn.close()func()

   模式二:
    设置最大限制创建连接池的数量,进来一个线程,就创建一个连接池

    如果超过连接池的限制数量,就会在那里等着有空的连接池释放出来才能够下一个线程链接进去

import time
import pymysql
import threading
from DBUtils.PooledDB import PooledDB, SharedDBConnection
POOL = PooledDB(
creator=pymysql, # 使用链接数据库的模块
maxconnections=6, # 连接池允许的最大连接数,0和None表示不限制连接数
mincached=2, # 初始化时,链接池中至少创建的空闲的链接,0表示不创建
maxcached=5, # 链接池中最多闲置的链接,0和None不限制
maxshared=3, # 链接池中最多共享的链接数量,0和None表示全部共享。PS: 无用,因为pymysql和MySQLdb等模块的 threadsafety都为1,所有值无论设置为多少,_maxcached永远为0,所以永远是所有链接都共享。
blocking=True, # 连接池中如果没有可用连接后,是否阻塞等待。True,等待;False,不等待然后报错
maxusage=None, # 一个链接最多被重复使用的次数,None表示无限制
setsession=[], # 开始会话前执行的命令列表。如:["set datestyle to ...", "set time zone ..."]
ping=0,
# ping MySQL服务端,检查是否服务可用。# 如:0 = None = never, 1 = default = whenever it is requested, 2 = when a cursor is created, 4 = when a query is executed, 7 = always
host='127.0.0.1',
port=3306,
user='root',
password='',
database='pooldb',
charset='utf8'
)def func():
# 检测当前正在运行连接数的是否小于最大链接数,如果不小于则:等待或报raise TooManyConnections异常
# 否则
# 则优先去初始化时创建的链接中获取链接 SteadyDBConnection。
# 然后将SteadyDBConnection对象封装到PooledDedicatedDBConnection中并返回。
# 如果最开始创建的链接没有链接,则去创建一个SteadyDBConnection对象,再封装到PooledDedicatedDBConnection中并返回。
# 一旦关闭链接后,连接就返回到连接池让后续线程继续使用。
conn = POOL.connection() # print(th, '链接被拿走了', conn1._con)
# print(th, '池子里目前有', pool._idle_cache, '\r\n') cursor = conn.cursor()
cursor.execute('select * from tb1')
result = cursor.fetchall()
conn.close()func()

  工作实用:

                         import pymysql
from DBUtils.PooledDB import PooledDB
POOL = PooledDB(
creator=pymysql, # 使用链接数据库的模块
maxconnections=6, # 连接池允许的最大连接数,0和None表示不限制连接数
mincached=2, # 初始化时,链接池中至少创建的空闲的链接,0表示不创建
maxcached=5, # 链接池中最多闲置的链接,0和None不限制
maxshared=3, # 链接池中最多共享的链接数量,0和None表示全部共享。PS: 无用,因为pymysql和MySQLdb等模块的 threadsafety都为1,所有值无论设置为多少,_maxcached永远为0,所以永远是所有链接都共享。
blocking=True, # 连接池中如果没有可用连接后,是否阻塞等待。True,等待;False,不等待然后报错
maxusage=None, # 一个链接最多被重复使用的次数,None表示无限制
setsession=[], # 开始会话前执行的命令列表。如:["set datestyle to ...", "set time zone ..."]
ping=0,
# ping MySQL服务端,检查是否服务可用。# 如:0 = None = never, 1 = default = whenever it is requested, 2 = when a cursor is created, 4 = when a query is executed, 7 = always
host='127.0.0.1',
port=3306,
user='root',
password='',
database='pooldb',
charset='utf8'
) """
class SQLHelper(object): @staticmethod
def fetch_one(sql,args):
conn = POOL.connection()
cursor = conn.cursor()
cursor.execute(sql, args)
result = cursor.fetchone()
conn.close()
return result @staticmethod
def fetch_all(self,sql,args):
conn = POOL.connection()
cursor = conn.cursor()
cursor.execute(sql, args)
result = cursor.fetchone()
conn.close()
return result # 以后使用:
result = SQLHelper.fetch_one('select * from xxx',[])
print(result)
"""

十四 信号

 什么是信号:signal 一种处理异步时间的方法。信号是POSIX系统的信号,由硬件或软件触发,再有操作系统内核发给应用程序的中断形式。POSIX由一系列的信号集。

 什么是信号量:semaphore 一种进程同步的机制。信号量是POSIX进程间通信的工具,在它上面定义了一系列操作原语,简单地讲它可以在进程间进行通信。

 flask自己没有信号,要依赖blinker这个模块 安装 pip install blinker
 内置信号:

request_started = _signals.signal('request-started')                # 请求到来前执行
request_finished = _signals.signal('request-finished') # 请求结束后执行before_render_template = _signals.signal('before-render-template') # 模板渲染前执行
template_rendered = _signals.signal('template-rendered') # 模板渲染后执行got_request_exception = _signals.signal('got-request-exception') # 请求执行出现异常时执行request_tearing_down = _signals.signal('request-tearing-down') # 请求执行完毕后自动执行(无论成功与否)
appcontext_tearing_down = _signals.signal('appcontext-tearing-down')# 请求上下文执行完毕后自动执行(无论成功与否)appcontext_pushed = _signals.signal('appcontext-pushed') # 请求上下文push时执行
appcontext_popped = _signals.signal('appcontext-popped') # 请求上下文pop时执行
message_flashed = _signals.signal('message-flashed') # 调用flask在其中添加数据时,自动触发

 信号放哪里了:放在了signals文件里面
 signals.request_started.conncet(函数名) :注册函数
 等待请求的到来
 在视图函数执行之前触法信号的
 是怎么触发信号的?
  signals.request_started.send:方法触发信号
  send是在full_dispatch_request方法中执行的

 信号的源码流程:

 a. before_first_request
 b. 触发 request_started 信号
 c. before_request
 d. 模板渲染
渲染前的信号 before_render_template.send(app, template=template, context=context)
rv = template.render(context) # 模板渲染
渲染后的信号 template_rendered.send(app, template=template, context=context)
 e. after_request
 f. session.save_session()
 g. 触发 request_finished信号 如果上述过程出错:
触发错误处理信号 got_request_exception.send(self, exception=e) h. 触发信号 request_tearing_down 由信号引发的源码流程:找扩展点

十五 flask-session插件

 Flask中的session处理机制(内置:将session保存在加密cookie中实现)

 内置的session:

  请求刚到来:获取随机字符串,存在则去“数据库”中获取原来的个人数据,否则创建一个空容器。 –> 内存:对象(随机字符串,{放置数据的容器})

# 1. obj = 创建SecureCookieSessionInterface()
# 2. obj = open_session(self.request) = SecureCookieSession()
# self.session = SecureCookieSession()对象。
self.session = self.app.open_session(self.request)

  视图:操作内存中 对象(随机字符串,{放置数据的容器})
   响应:内存对象(随机字符串,{放置数据的容器})
  将数据保存到“数据库”
  把随机字符串写在用户cookie中。

 自定义:

  请求刚到来:

# 创建特殊字典,并添加到Local中。
# 调用关系:
# self.session_interface.open_session(self, request)
# 由于默认app中的session_interface=SecureCookieSessionInterface()
# SecureCookieSessionInterface().open_session(self, request)
# 由于默认app中的session_interface=MySessionInterFace()
# MySessionInterFace().open_session(self, request)
self.session = self.app.open_session(self.request)

  调用:

from flask import Flask,sessionapp = Flask(__name__)
app.secret_key = 'suijksdfsd'import json
class MySessionInterFace(object):
def open_session(self,app,request):
return {} def save_session(self, app, session, response):
response.set_cookie('session_idfsdfsdfsdf',json.dumps(session)) def is_null_session(self, obj):
"""Checks if a given object is a null session. Null sessions are
not asked to be saved. This checks if the object is an instance of :attr:`null_session_class`
by default.
"""
return Falseapp.session_interface = MySessionInterFace()@app.route('/')
def index():
# 特殊空字典
# 在local的ctx中找到session
# 在空字典中写值
# 在空字典中获取值
session['xxx'] = 123 return 'Index'# # 一旦请求到来
# app.__call__
# app.wsgi_app
# app.session_interface
# app.open_sessionif __name__ == '__main__': app.run()

session -> LocalProxy -> 偏函数 -> LocalStack -> Local
  请求终止:

# 由于默认app中的session_interface=SecureCookieSessionInterface()
# SecureCookieSessionInterface().save_session(self, app, session, response)
# 由于默认app中的session_interface=MySessionInterFace()
# MySessionInterFace().save_session(self, app, session, response)

 flask-session组件
  使用:

from flask import Flask,session
from flask_session import RedisSessionInterfaceapp = Flask(__name__)
app.secret_key = 'suijksdfsd'

   方式一

from redis import Redis
conn = Redis()
app.session_interface = RedisSessionInterface(conn,key_prefix='__',use_signer=False)

   方式二

from redis import Redis
from flask.ext.session import Session
app.config['SESSION_TYPE'] = 'redis'
app.config['SESSION_REDIS'] = Redis(host='192.168.0.94',port='')
Session(app)
@app.route('/')
def index():
session['xxx'] = 123
return 'Index'if __name__ == '__main__':app.run()

  源码:
      流程
 问题:设置cookie时,如何设定关闭浏览器则cookie失效。
  response.set_cookie(‘k’,’v’,exipre=None)

十六 wtforms组建

 安装:pip3 install wtforms

 使用:

from flask import Flask, render_template, request, redirect
from wtforms import Form
from wtforms.fields import core
from wtforms.fields import html5
from wtforms.fields import simple
from wtforms import validators
from wtforms import widgetsapp = Flask(__name__, template_folder='templates')
app.debug = Trueclass RegisterForm(Form):
name = simple.StringField(
label='用户名',
validators=[
validators.DataRequired()
],
widget=widgets.TextInput(),
render_kw={'class': 'form-control'},
default='alex'
) pwd = simple.PasswordField(
label='密码',
validators=[
validators.DataRequired(message='密码不能为空.')
],
widget=widgets.PasswordInput(),
render_kw={'class': 'form-control'}
) pwd_confirm = simple.PasswordField(
label='重复密码',
validators=[
validators.DataRequired(message='重复密码不能为空.'),
validators.EqualTo('pwd', message="两次密码输入不一致")
],
widget=widgets.PasswordInput(),
render_kw={'class': 'form-control'}
) email = html5.EmailField(
label='邮箱',
validators=[
validators.DataRequired(message='邮箱不能为空.'),
validators.Email(message='邮箱格式错误')
],
widget=widgets.TextInput(input_type='email'),
render_kw={'class': 'form-control'}
) gender = core.RadioField(
label='性别',
choices=(
(1, '男'),
(2, '女'),
),
coerce=int # “1” “2”
)
city = core.SelectField(
label='城市',
choices=(
('bj', '北京'),
('sh', '上海'),
)
) hobby = core.SelectMultipleField(
label='爱好',
choices=(
(1, '篮球'),
(2, '足球'),
),
coerce=int
) favor = core.SelectMultipleField(
label='喜好',
choices=(
(1, '篮球'),
(2, '足球'),
),
widget=widgets.ListWidget(prefix_label=False),
option_widget=widgets.CheckboxInput(),
coerce=int,
default=[1, 2]
) def __init__(self, *args, **kwargs):
super(RegisterForm, self).__init__(*args, **kwargs)
self.favor.choices = ((1, '篮球'), (2, '足球'), (3, '羽毛球')) def validate_pwd_confirm(self, field):
"""
自定义pwd_confirm字段规则,例:与pwd字段是否一致
:param field:
:return:
"""
# 最开始初始化时,self.data中已经有所有的值 if field.data != self.data['pwd']:
# raise validators.ValidationError("密码不一致") # 继续后续验证
raise validators.StopValidation("密码不一致") # 不再继续后续验证@app.route('/register', methods=['GET', 'POST'])
def register():
if request.method == 'GET':
form = RegisterForm(data={'gender': 2,'hobby':[1,]}) # initial
return render_template('register.html', form=form)
else:
form = RegisterForm(formdata=request.form)
if form.validate():
print('用户提交数据通过格式验证,提交的值为:', form.data)
else:
print(form.errors)
return render_template('register.html', form=form)if __name__ == '__main__':
app.run()

 源码流程:

  实现方式:

                        1. 自动生成HTML
class LoginForm(Form):
# 字段(内部包含正则表达式)
name = simple.StringField(
label='用户名',
validators=[
validators.DataRequired(message='用户名不能为空.'),
validators.Length(min=6, max=18, message='用户名长度必须大于%(min)d且小于%(max)d')
],
widget=widgets.TextInput(), # 页面上显示的插件
render_kw={'class': 'form-control'} )
# 字段(内部包含正则表达式)
pwd = simple.PasswordField(
label='密码',
validators=[
validators.DataRequired(message='密码不能为空.'),
validators.Length(min=8, message='用户名长度必须大于%(min)d'),
validators.Regexp(regex="^(?=.*[a-z])(?=.*[A-Z])(?=.*\d)(?=.*[$@$!%*?&])[A-Za-z\d$@$!%*?&]{8,}",
message='密码至少8个字符,至少1个大写字母,1个小写字母,1个数字和1个特殊字符') ],
widget=widgets.PasswordInput(),
render_kw={'class': 'form-control'}
) def __iter__(self):
return iter([self.name,self.pwd])
# 方式一:
obj = LoginForm()
print(obj.name) # 调用字段的__str__
# 方式二:
obj = LoginForm()
for item in obj:
print(item) # 调用字段的__str__

  校验:

a. 后台定义好正则
b. 用户发来数据
c. 对数据进行校验

  源码实现:自动生成HTML文件
     解释:metaclass

  Metaclass作用:用来指定当前类是谁来创建的,如果不指定默认是type创建的
  类继承:只要这个指定了Metaclass,那么这个类以及他的子类都指定同一个metaclass
- MetaClass作用:用来指定当前类由谁来创建(默认type创建)。
- 使用metaclass
class Foo(metaclass=type):
pass class Foo(object):
__metaclass__ = type
- 类继承 class MyType(type):
def __init__(self,*args,**kwargs):
print('init')
super(MyType,self).__init__(*args,**kwargs) def __call__(self, *args, **kwargs):
print('call本质:调用类的__new__,再调用类的__init__')
return super(MyType,self).__call__( *args, **kwargs) class Foo(metaclass=MyType):
pass class Bar(Foo):
pass obj = Bar()
- 问题:
1. 什么意思?
# 类由type来创建
class Foo(metaclass=type)
# 继承Type
class Foo(type)
2. Flask多线程:服务端开多线程

   其他方式使用:

class MyType(type):
def __init__(self,*args,**kwargs):
print('init')
super(MyType,self).__init__(*args,**kwargs) def __call__(self, *args, **kwargs):
print('call')
return super(MyType,self).__call__(*args,**kwargs)# Base=MyType('Base',(object,),{}) 是有MyType创建; metaclass=MyType
#
# class Foo(Base):
# pass
#
# Foo()
# 1. type可以创建类metaclass=type;MyType也可以创建类metaclass=MyType
# 2. Base = MyType('Base', (object,), {}) -->
# class Base(metaclass=MyType):
# pass
# class Foo(Base):
# passclass Foo(MyType("Base",(object,),{})):
# 第一个是类名,第二个就是他的父类,第三个就是属性
passFoo()
class MyType(type):
def __init__(self,*args,**kwargs):
print('init')
super(MyType,self).__init__(*args,**kwargs) def __call__(self, *args, **kwargs):
print('call')
return super(MyType,self).__call__(*args,**kwargs)def with_metaclass(obj):
return MyType('XX',(obj,),{})class Foo(with_metaclass(object)):
passFoo()
"""
1. 什么意思?# 类由type来创建
class Foo(metaclass=type)
# 继承Type
class Foo(type)"""class Foo(object):
pass
obj = Foo()
# 对象是由类创建# 一切皆对象,类由type创建
class Foo(object):
passFoo = type('Foo',(object,),{})# 一切皆对象,类由MyType创建
class MyType(type):
pass
Foo = MyType('Foo',(object,),{})class Foo(object,metaclass=MyType):
pass# 一切皆对象,类由MyType创建
class MyType(type):
def __init__(self, *args, **kwargs):
print('init')
super(MyType, self).__init__(*args, **kwargs) def __call__(cls, *args, **kwargs):
print('call')
return super(MyType, cls).__call__(*args, **kwargs)Foo = MyType('Foo',(object,),{})class Foo(object,metaclass=MyType):
passFoo()

  实例:form = LoginForm()

#!/usr/bin/env python
# -*- coding:utf-8 -*-
from flask import Flask, render_template, request, redirect
from wtforms import Form
from wtforms.fields import simple
from wtforms import validators
from wtforms import widgetsapp = Flask(__name__, template_folder='templates')app.debug = Trueclass MyForm(Form):
'''
创建字段,内部包含正则表达式
'''
name=simple.StringField(label='用户名',
validators=[
validators.DataRequired(message='用户名不能为空'),
validators.Length(min=6,max=18,message='用户名的长度不能小于%(min)d不能大于%(max)d')
],
widget=widgets.TextInput(),
render_kw={'class': 'form-control'}
)
password=simple.PasswordField(label='密码',
validators=[
validators.DataRequired(message='密码不能为空'),
validators.Length(max=18,message='用户名不能大于%(max)'),
validators.Regexp(regex="^(?=.*[a-z])(?=.*[A-Z])(?=.*\d)(?=.*[$@$!%*?&])[A-Za-z\d$@$!%*?&]{8,}",
message='密码至少8个字符,至少1个大写字母,1个小写字母,1个数字和1个特殊字符')
],
widget=widgets.PasswordInput(),
render_kw={'class': 'form-control'}
)@app.route('/login', methods=['GET', 'POST'])
def login():
if request.method=='GET':
form=MyForm()
return render_template('login.html',form=form)
else:
form = MyForm(formdata=request.form)
if form.validate():
print('用户提交数据通过格式验证,提交的值为:', form.data)
else:
return render_template('login.html', form=form)
if __name__ == '__main__':
app.run()

  验证:form.validate()

import time
import threading
import sqlalchemy
from sqlalchemy import create_engine
from sqlalchemy.engine.base import Engine

 补充:

  为什么要在类里面定义方法:主要是为对象的数据进行二次加工

  类的对象不能直接被for循环,若想被循环,在类里面加上__iter__方法

十七 SQLAlchmey ORM框架

 目标:类/对象操作 -> SQL -> pymysql、MySQLdb -> 再在数据库中执行。

 基本使用:这个不常见,

class MyForm(Form):
'''
创建字段,内部包含正则表达式
'''
name=simple.StringField(label='用户名',
validators=[
validators.DataRequired(message='用户名不能为空'),
validators.Length(min=6,max=18,message='用户名的长度不能小于%(min)d不能大于%(max)d')
],
widget=widgets.TextInput(),
render_kw={'class': 'form-control'}
)
password=simple.PasswordField(label='密码',
validators=[
validators.DataRequired(message='密码不能为空'),
validators.Length(max=18,message='用户名不能大于%(max)'),
validators.Regexp(regex="^(?=.*[a-z])(?=.*[A-Z])(?=.*\d)(?=.*[$@$!%*?&])[A-Za-z\d$@$!%*?&]{8,}",
message='密码至少8个字符,至少1个大写字母,1个小写字母,1个数字和1个特殊字符')
],
widget=widgets.PasswordInput(),
render_kw={'class': 'form-control'}
)

  方式一:

                    engine = create_engine(
"mysql+pymysql://root:123@127.0.0.1:3306/t1?charset=utf8",
max_overflow=2, # 超过连接池大小外最多创建的连接
pool_size=5, # 连接池大小
pool_timeout=30, # 池中没有线程最多等待的时间,否则报错
pool_recycle=-1 # 多久之后对线程池中的线程进行一次连接的回收(重置)
) conn = engine.raw_connection()
cursor = conn.cursor()
cursor.execute(
"select * from t1"
)
result = cursor.fetchall()
cursor.close()
conn.close()

  方式二:

                    engine = create_engine(
"mysql+pymysql://root:123@127.0.0.1:3306/t1?charset=utf8",
max_overflow=0, # 超过连接池大小外最多创建的连接
pool_size=5, # 连接池大小
pool_timeout=30, # 池中没有线程最多等待的时间,否则报错
pool_recycle=-1 # 多久之后对线程池中的线程进行一次连接的回收(重置)
) def task(arg):
conn = engine.raw_connection()
cursor = conn.cursor()
cursor.execute(
#"select * from t1"
"select sleep(2)"
)
result = cursor.fetchall()
cursor.close()
conn.close() for i in range(20):
t = threading.Thread(target=task, args=(i,))
t.start()

 ORM:

                models.py
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import datetime
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, Text, ForeignKey, DateTime, UniqueConstraint, Index Base = declarative_base() class Users(Base):
__tablename__ = 'users' # 数据库表名称
id = Column(Integer, primary_key=True) # id 主键
name = Column(String(32), index=True, nullable=False) # name列, def init_db():
"""
根据类创建数据库表
:return:
"""
engine = create_engine(
"mysql+pymysql://root:123@127.0.0.1:3306/s6?charset=utf8",
max_overflow=0, # 超过连接池大小外最多创建的连接
pool_size=5, # 连接池大小
pool_timeout=30, # 池中没有线程最多等待的时间,否则报错
pool_recycle=-1 # 多久之后对线程池中的线程进行一次连接的回收(重置)
) Base.metadata.create_all(engine) def drop_db():
"""
根据类删除数据库表
:return:
"""
engine = create_engine(
"mysql+pymysql://root:123@127.0.0.1:3306/s6?charset=utf8",
max_overflow=0, # 超过连接池大小外最多创建的连接
pool_size=5, # 连接池大小
pool_timeout=30, # 池中没有线程最多等待的时间,否则报错
pool_recycle=-1 # 多久之后对线程池中的线程进行一次连接的回收(重置)
) Base.metadata.drop_all(engine) if __name__ == '__main__':
#drop_db()
#init_db() app.py
#!/usr/bin/env python
# -*- coding:utf-8 -*-
from sqlalchemy.orm import sessionmaker
from sqlalchemy import create_engine
from models import Users engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/s6", max_overflow=0, pool_size=5)
Connection = sessionmaker(bind=engine) # 每次执行数据库操作时,都需要创建一个Connection
con = Connection() # ############# 执行ORM操作 #############
obj1 = Users(name="alex1")
con.add(obj1)
# 提交事务
con.commit() # 关闭session
con.close()

 详细信息:

  创建表:

import datetime
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, Text, ForeignKey, DateTime, UniqueConstraint, IndexBase = declarative_base()class Users(Base):
__tablename__ = 'users' id = Column(Integer, primary_key=True)
name = Column(String(32), index=True, nullable=False)
email = Column(String(32), unique=True)
ctime = Column(DateTime, default=datetime.datetime.now) # 3期师兄
extra = Column(Text, nullable=True) __table_args__ = (
# UniqueConstraint('id', 'name', name='uix_id_name'),
# Index('ix_id_name', 'name', 'email'),
)class Hobby(Base):
__tablename__ = 'hobby'
id = Column(Integer, primary_key=True)
caption = Column(String(50), default='篮球')class Person(Base):
__tablename__ = 'person'
nid = Column(Integer, primary_key=True)
name = Column(String(32), index=True, nullable=True)
hobby_id = Column(Integer, ForeignKey("hobby.id"))class b2g(Base):
__tablename__ = 'b2g'
id = Column(Integer, primary_key=True, autoincrement=True)
girl_id = Column(Integer, ForeignKey('girl.id'))
boy_id = Column(Integer, ForeignKey('boy.id'))class Girl(Base):
__tablename__ = 'girl'
id = Column(Integer, primary_key=True)
name = Column(String(64), unique=True, nullable=False)class Boy(Base):
__tablename__ = 'boy' id = Column(Integer, primary_key=True, autoincrement=True)
hostname = Column(String(64), unique=True, nullable=False)engine = create_engine(
"mysql+pymysql://root:0410@127.0.0.1:3306/sqlachemy?charset=utf8",
max_overflow=0, # 超过连接池大小外最多创建的连接
pool_size=5, # 连接池大小
pool_timeout=30, # 池中没有线程最多等待的时间,否则报错
pool_recycle=-1 # 多久之后对线程池中的线程进行一次连接的回收(重置)
)Base.metadata.create_all(engine)
# Base.metadata.drop_all(engine)

  对于表的增删改查:

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import time
import threadingfrom sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index
from sqlalchemy.orm import sessionmaker, relationship
from sqlalchemy import create_engine
from sqlalchemy.sql import textfrom db import Users, Hostsengine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/s6", max_overflow=0, pool_size=5)
Session = sessionmaker(bind=engine)session = Session()# ################ 添加 ################
"""
obj1 = Users(name="wupeiqi")
session.add(obj1)session.add_all([
Users(name="wupeiqi"),
Users(name="alex"),
Hosts(name="c1.com"),
])
session.commit()
"""# ################ 删除 ################
"""
session.query(Users).filter(Users.id > 2).delete()
session.commit()
"""
# ################ 修改 ################
"""
session.query(Users).filter(Users.id > 0).update({"name" : "099"})
session.query(Users).filter(Users.id > 0).update({Users.name: Users.name + "099"}, synchronize_session=False)
session.query(Users).filter(Users.id > 0).update({"age": Users.age + 1}, synchronize_session="evaluate")
session.commit()
"""
# ################ 查询 ################
"""
r1 = session.query(Users).all()
r2 = session.query(Users.name.label('xx'), Users.age).all()
r3 = session.query(Users).filter(Users.name == "alex").all()
r4 = session.query(Users).filter_by(name='alex').all()
r5 = session.query(Users).filter_by(name='alex').first()
r6 = session.query(Users).filter(text("id<:value and name=:name")).params(value=224, name='fred').order_by(Users.id).all()
r7 = session.query(Users).from_statement(text("SELECT * FROM users where name=:name")).params(name='ed').all()
"""session.close()

  查看的其他操作:

# 条件
ret = session.query(Users).filter_by(name='alex').all()
ret = session.query(Users).filter(Users.id > 1, Users.name == 'eric').all()
ret = session.query(Users).filter(Users.id.between(1, 3), Users.name == 'eric').all()
ret = session.query(Users).filter(Users.id.in_([1,3,4])).all()
ret = session.query(Users).filter(~Users.id.in_([1,3,4])).all()
ret = session.query(Users).filter(Users.id.in_(session.query(Users.id).filter_by(name='eric'))).all()
from sqlalchemy import and_, or_
ret = session.query(Users).filter(and_(Users.id > 3, Users.name == 'eric')).all()
ret = session.query(Users).filter(or_(Users.id < 2, Users.name == 'eric')).all()
ret = session.query(Users).filter(
or_(
Users.id < 2,
and_(Users.name == 'eric', Users.id > 3),
Users.extra != ""
)).all()# 通配符
ret = session.query(Users).filter(Users.name.like('e%')).all()
ret = session.query(Users).filter(~Users.name.like('e%')).all()# 限制
ret = session.query(Users)[1:2]# 排序
ret = session.query(Users).order_by(Users.name.desc()).all()
ret = session.query(Users).order_by(Users.name.desc(), Users.id.asc()).all()# 分组
from sqlalchemy.sql import funcret = session.query(Users).group_by(Users.extra).all()
ret = session.query(
func.max(Users.id),
func.sum(Users.id),
func.min(Users.id)).group_by(Users.name).all()ret = session.query(
func.max(Users.id),
func.sum(Users.id),
func.min(Users.id)).group_by(Users.name).having(func.min(Users.id) >2).all()# 连表ret = session.query(Users, Favor).filter(Users.id == Favor.nid).all()ret = session.query(Person).join(Favor).all()ret = session.query(Person).join(Favor, isouter=True).all()# 组合
q1 = session.query(Users.name).filter(Users.id > 2)
q2 = session.query(Favor.caption).filter(Favor.nid < 2)
ret = q1.union(q2).all()q1 = session.query(Users.name).filter(Users.id > 2)
q2 = session.query(Favor.caption).filter(Favor.nid < 2)
ret = q1.union_all(q2).all()

  原生的sql语句:

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import time
import threadingfrom sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index
from sqlalchemy.orm import sessionmaker, relationship
from sqlalchemy import create_engine
from sqlalchemy.sql import text
from sqlalchemy.engine.result import ResultProxy
from db import Users, Hostsengine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/s6", max_overflow=0, pool_size=5)
Session = sessionmaker(bind=engine)session = Session()# 查询
# cursor = session.execute('select * from users')
# result = cursor.fetchall()# 添加
cursor = session.execute('insert into users(name) values(:value)',params={"value":'wupeiqi'})
session.commit()
print(cursor.lastrowid)session.close()

  多表操作:

一对多:
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import time
import threadingfrom sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index
from sqlalchemy.orm import sessionmaker, relationship
from sqlalchemy import create_engine
from sqlalchemy.sql import text
from sqlalchemy.engine.result import ResultProxy
from db import Users, Hosts, Hobby, Personengine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/s6?charset=utf8", max_overflow=0, pool_size=5)
Session = sessionmaker(bind=engine)
session = Session()
# 添加
"""
session.add_all([
Hobby(caption='乒乓球'),
Hobby(caption='羽毛球'),
Person(name='张三', hobby_id=3),
Person(name='李四', hobby_id=4),
])person = Person(name='张九', hobby=Hobby(caption='姑娘'))
session.add(person)hb = Hobby(caption='人妖')
hb.pers = [Person(name='文飞'), Person(name='博雅')]
session.add(hb)session.commit()
"""# 使用relationship正向查询
"""
v = session.query(Person).first()
print(v.name)
print(v.hobby.caption)
"""# 使用relationship反向查询
"""
v = session.query(Hobby).first()
print(v.caption)
print(v.pers)
"""session.close()多对多:
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import time
import threadingfrom sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index
from sqlalchemy.orm import sessionmaker, relationship
from sqlalchemy import create_engine
from sqlalchemy.sql import text
from sqlalchemy.engine.result import ResultProxy
from db import Users, Hosts, Hobby, Person, Group, Server, Server2Groupengine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/s6?charset=utf8", max_overflow=0, pool_size=5)
Session = sessionmaker(bind=engine)
session = Session()
# 添加
"""
session.add_all([
Server(hostname='c1.com'),
Server(hostname='c2.com'),
Group(name='A组'),
Group(name='B组'),
])
session.commit()s2g = Server2Group(server_id=1, group_id=1)
session.add(s2g)
session.commit()gp = Group(name='C组')
gp.servers = [Server(hostname='c3.com'),Server(hostname='c4.com')]
session.add(gp)
session.commit()ser = Server(hostname='c6.com')
ser.groups = [Group(name='F组'),Group(name='G组')]
session.add(ser)
session.commit()
"""# 使用relationship正向查询
"""
v = session.query(Group).first()
print(v.name)
print(v.servers)
"""# 使用relationship反向查询
"""
v = session.query(Server).first()
print(v.hostname)
print(v.groups)
"""session.close()

  其他操作:

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import time
import threadingfrom sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index
from sqlalchemy.orm import sessionmaker, relationship
from sqlalchemy import create_engine
from sqlalchemy.sql import text, func
from sqlalchemy.engine.result import ResultProxy
from db import Users, Hosts, Hobby, Person, Group, Server, Server2Groupengine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/s6?charset=utf8", max_overflow=0, pool_size=5)
Session = sessionmaker(bind=engine)
session = Session()# 关联子查询
subqry = session.query(func.count(Server.id).label("sid")).filter(Server.id == Group.id).correlate(Group).as_scalar()
result = session.query(Group.name, subqry)
"""
SELECT `group`.name AS group_name, (SELECT count(server.id) AS sid
FROM server
WHERE server.id = `group`.id) AS anon_1
FROM `group`
"""# 原生SQL
"""
# 查询
cursor = session.execute('select * from users')
result = cursor.fetchall()# 添加
cursor = session.execute('insert into users(name) values(:value)',params={"value":'wupeiqi'})
session.commit()
print(cursor.lastrowid)
"""session.close()其他

  基于scoped_session实现线程安全:

#!/usr/bin/env python
# -*- coding:utf-8 -*-
from sqlalchemy.orm import sessionmaker
from sqlalchemy import create_engine
from sqlalchemy.orm import scoped_session
from models import Usersengine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/s6", max_overflow=0, pool_size=5)
Session = sessionmaker(bind=engine)"""
# 线程安全,基于本地线程实现每个线程用同一个session
# 特殊的:scoped_session中有原来方法的Session中的一下方法:public_methods = (
'__contains__', '__iter__', 'add', 'add_all', 'begin', 'begin_nested',
'close', 'commit', 'connection', 'delete', 'execute', 'expire',
'expire_all', 'expunge', 'expunge_all', 'flush', 'get_bind',
'is_modified', 'bulk_save_objects', 'bulk_insert_mappings',
'bulk_update_mappings',
'merge', 'query', 'refresh', 'rollback',
'scalar'
)
"""
session = scoped_session(Session)# ############# 执行ORM操作 #############
obj1 = Users(name="alex1")
session.add(obj1)# 提交事务
session.commit()
# 关闭session
session.close()

  多线程执行实例:

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import time
import threadingfrom sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index
from sqlalchemy.orm import sessionmaker, relationship
from sqlalchemy import create_engine
from db import Usersengine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/s6", max_overflow=0, pool_size=5)
Session = sessionmaker(bind=engine)def task(arg):
session = Session() obj1 = Users(name="alex1")
session.add(obj1) session.commit()for i in range(10):
t = threading.Thread(target=task, args=(i,))
t.start()多线程执行示例

 flask_sqlalchemy:Flask-SQLAlchemy(文件和目录的管理)

  Flask和SQLAlchemy的管理
 - db = SQLAlchemy()
- 包含配置
- 包含ORM基类
- 包含create_all
- engine
- 创建连接 # 目录结构保存好

 flask_sqlalchemy实例:https://pan.baidu.com/s/1IL68-68tBDluDtqsB1NS1g

 补充:

3. pipreqs    pip3 install pipreqs    pipreqs ./        

十八 flask_script和flask_migrate

 flask_script:功能相当于django中的manage文件,用于启动flask项目使用的,python manage.py runserver

 flask_migrate:相当于django中的数据库迁移,makemigrations/migrate -> migrate/upgrade

  批量导入:xlrd/xlwt

#!/usr/bin/env python
# -*- coding:utf-8 -*-
"""
生成依赖文件:
pipreqs ./"""
from sansa import create_app,db
from flask_script import Manager
from flask_migrate import Migrate,MigrateCommandapp = create_app()
manager = Manager(app)
migrate = Migrate(app, db)"""
# 数据库迁移命名
python manage.py db init
python manage.py db migrate
python manage.py db upgrade
"""
manager.add_command('db', MigrateCommand)@manager.command
def custom(arg):
"""
自定义命令
python manage.py custom 123
:param arg:
:return:
"""
print(arg)@manager.option('-n', '--name', dest='name')
@manager.option('-u', '--url', dest='url')
def cmd(name, url):
"""
自定义命令
执行: python manage.py cmd -n wupeiqi -u http://www.oldboyedu.com
:param name:
:param url:
:return:
"""
print(name, url)if __name__ == '__main__':
manager.run()"""
1. 运行程序时:
python manage.py runserver """

补充:  

 flask和django的其他导入静态文件

  flask框架基础

相关推荐
python开发_常用的python模块及安装方法
adodb:我们领导推荐的数据库连接组件bsddb3:BerkeleyDB的连接组件Cheetah-1.0:我比较喜欢这个版本的cheeta…
日期:2022-11-24 点赞:878 阅读:9,489
Educational Codeforces Round 11 C. Hard Process 二分
C. Hard Process题目连接:http://www.codeforces.com/contest/660/problem/CDes…
日期:2022-11-24 点赞:807 阅读:5,904
下载Ubuntn 17.04 内核源代码
zengkefu@server1:/usr/src$ uname -aLinux server1 4.10.0-19-generic #21…
日期:2022-11-24 点赞:569 阅读:6,737
可用Active Desktop Calendar V7.86 注册码序列号
可用Active Desktop Calendar V7.86 注册码序列号Name: www.greendown.cn Code: &nb…
日期:2022-11-24 点赞:733 阅读:6,490
Android调用系统相机、自定义相机、处理大图片
Android调用系统相机和自定义相机实例本博文主要是介绍了android上使用相机进行拍照并显示的两种方式,并且由于涉及到要把拍到的照片显…
日期:2022-11-24 点赞:512 阅读:8,128
Struts的使用
一、Struts2的获取  Struts的官方网站为:http://struts.apache.org/  下载完Struts2的jar包,…
日期:2022-11-24 点赞:671 阅读:5,291