(编辑:jimmy 日期: 2024/11/12 浏览:2)
这篇文章主要介绍了Python实现自定义读写分离代码实例,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
思路
from flask import Flask from flask_sqlalchemy import SQLAlchemy, SignallingSession, get_state from sqlalchemy import orm class RoutingSession(SignallingSession): def get_bind(self, mapper=None, clause=None): state = get_state(self.app) # 判断读写操作 if self._flushing: # 写操作 ,使用主数据库 print("写入数据") return state.db.get_engine(self.app, bind='master') else: # 读操作, 使用从数据库 print('读取数据') return state.db.get_engine(self.app, bind='slave') class RoutingSQLAlchemy(SQLAlchemy): def create_session(self, options): return orm.sessionmaker(class_=RoutingSession, db=self, **options) app = Flask(__name__) # 设置数据库的连接地址 app.config['SQLALCHEMY_DATABASE_URI'] = 'mysql://root:mysql@192.168.105.134:3306/demo' # 设置数据库的绑定地址 app.config['SQLALCHEMY_BINDS'] = { 'master': "mysql://root:mysql@192.168.105.134:3306/demo", 'slave': "mysql://root:mysql@192.168.105.134:8306/demo" } # 设置是否追踪数据库变化 一般不会开启, 影响性能 app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False # 设置是否打印底层执行的SQL语句 app.config['SQLALCHEMY_ECHO'] = False # 创建数据库连接对象 db = RoutingSQLAlchemy(app) # 用户表 一 class User(db.Model): __tablename__ = 't_user' id = db.Column(db.Integer, primary_key=True) name = db.Column(db.String(20), unique=True) @app.route('/') def index(): # 增加数据 user1 = User(name='zs') db.session.add(user1) db.session.commit() # 查询数据 users = User.query.all() print(users) return "index" if __name__ == '__main__': # 删除所有继承自db.Model的表 db.drop_all() # 创建所有继承自db.Model的表 db.create_all() app.run(debug=True)
不太好,自动选择不能控制
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持。