(编辑:jimmy 日期: 2025/11/4 浏览:2)
本篇博文主要介绍 Python 连接各种数据库的方法及简单使用,
包括关系型数据库:SQLite、MySQL、MSSQL,
以及非关系型数据库:MongoDB、Redis。
代码写得比较清楚,直接上代码。
1.连接sqlite
# coding=utf-8
# http://www.runoob.com/sqlite/sqlite-python.html
import sqlite3
import traceback
try:
    # Open (or create) the database file.  Note that using a sqlite3
    # connection as a context manager only commits / rolls back the
    # transaction; it does NOT close the connection, so close it explicitly.
    conn = sqlite3.connect('test.db')
    try:
        print("Opened database successfully")
        # Drop the table so the script starts from a clean slate on every run.
        conn.execute("DROP TABLE IF EXISTS COMPANY")
        # Create the table.
        conn.execute("""
            CREATE TABLE IF NOT EXISTS COMPANY
            (ID      INTEGER PRIMARY KEY AUTOINCREMENT,
             NAME    TEXT NOT NULL,
             AGE     INT  NOT NULL,
             ADDRESS CHAR(50),
             SALARY  REAL);
        """)
        print("create table successfully")
        # Insert the rows.  sqlite3 uses the "qmark" parameter style: one "?"
        # per bound value.  (A single row can be inserted the same way with
        # conn.execute(sql, row).)
        conn.executemany(
            "INSERT INTO COMPANY (NAME,AGE,ADDRESS,SALARY) VALUES (?, ?, ?, ?)",
            [('Paul', 32, 'California', 20000.00),
             ('Allen', 25, 'Texas', 15000.00),
             ('Teddy', 23, 'Norway', 20000.00),
             ('Mark', 25, 'Rich-Mond ', 65000.00),
             ('David', 27, 'Texas', 85000.00),
             ('Kim', 22, 'South-Hall', 45000.00),
             ('James', 24, 'Houston', 10000.00)])
        # Commit; otherwise the table is empty the next time the program runs.
        conn.commit()
        print("insert successfully")
        # Query the table back and print one field per line.
        result = conn.execute("select id,NAME,AGE,ADDRESS,SALARY FROM COMPANY")
        for row in result:
            print("-" * 50)  # separator line made of 50 dashes
            # Field names are left-aligned in a fixed 10-character column.
            print("%-10s %s" % ("id", row[0]))
            print("%-10s %s" % ("name", row[1]))
            print("%-10s %s" % ("age", row[2]))
            print("%-10s %s" % ("address", row[3]))
            # Equivalent: print('{:10s} {:.2f}'.format("salary", row[4]))
            print("%-10s %.2f" % ("salary", row[4]))
    finally:
        # Closing without a commit discards any uncommitted changes.
        conn.close()
except sqlite3.Error as e:
    print("sqlite3 Error:", e)
    traceback.print_exc()
2.连接mysql
2.1使用mysqldb库中的_mysql
#! /usr/bin/env python2.7
# coding=utf-8
# Created by xiaosanyu at 16/5/30
# mysqldb 只支持python2.7
# http://mysql-python.sourceforge.net/
import MySQLdb
from contextlib import closing
import traceback
try:
    # Open the connection; closing() calls close() on it when the block exits.
    db_conn = MySQLdb.connect(host='localhost', user='root', passwd='root',
                              db='test', port=3306, charset='utf8')
    with closing(db_conn):
        print("connect database successfully")
        with closing(db_conn.cursor()) as cursor:
            # Drop the table if it is already there.
            cursor.execute("DROP TABLE IF EXISTS COMPANY")
            # Create the table.
            create_sql = """
      CREATE TABLE IF NOT EXISTS COMPANY
     (ID INTEGER PRIMARY KEY NOT NULL auto_increment,
     NAME   TEXT NOT NULL,
     AGE   INT  NOT NULL,
     ADDRESS  CHAR(50),
     SALARY   REAL);
   """
            cursor.execute(create_sql)
            print("create table successfully")
            # Insert the rows.  Executing several SQL statements in a single
            # execute() call is not allowed, hence executemany().
            employees = [
                ('Paul', 32, 'California', 20000.00),
                ('Allen', 25, 'Texas', 15000.00),
                ('Teddy', 23, 'Norway', 20000.00),
                ('Mark', 25, 'Rich-Mond ', 65000.00),
                ('David', 27, 'Texas', 85000.00),
                ('Kim', 22, 'South-Hall', 45000.00),
                ('James', 24, 'Houston', 10000.00),
            ]
            cursor.executemany(
                "INSERT INTO COMPANY (NAME,AGE,ADDRESS,SALARY) VALUES ( %s, %s, %s, %s )",
                employees)
            # Commit; otherwise the table is empty the next time the program runs.
            db_conn.commit()
            print("insert successfully")
            # Query the table back.
            select_sql = """
    select id,NAME,AGE,ADDRESS,SALARY FROM COMPANY
    """
            cursor.execute(select_sql)
            labels = ("id", "name", "age", "address", "salary")
            for record in cursor.fetchall():
                print("-" * 50)  # separator line made of 50 dashes
                # Labels are left-aligned in a fixed 10-character column.
                for label, value in zip(labels, record):
                    print("%-10s %s" % (label, value))
except MySQLdb.Error as err:
    print("Mysql Error:", err)
    traceback.print_exc()  # print the stack trace of the error
2.2 使用MySQLdb
#! /usr/bin/env python2.7
# coding=utf-8
# Created by xiaosanyu at 16/5/30
# mysqldb 只支持python2.7
# http://mysql-python.sourceforge.net/
import MySQLdb
from contextlib import closing
import traceback
try:
    # Acquire a database connection; closing() closes it when the block ends.
    connection = MySQLdb.connect(
        host='localhost', user='root', passwd='root', db='test', port=3306, charset='utf8')
    with closing(connection):
        print("connect database successfully")
        with closing(connection.cursor()) as db_cursor:
            # Remove the table left over from an earlier run, if any.
            db_cursor.execute("DROP TABLE IF EXISTS COMPANY")
            # Build the table.
            ddl = """
      CREATE TABLE IF NOT EXISTS COMPANY
     (ID INTEGER PRIMARY KEY NOT NULL auto_increment,
     NAME   TEXT NOT NULL,
     AGE   INT  NOT NULL,
     ADDRESS  CHAR(50),
     SALARY   REAL);
   """
            db_cursor.execute(ddl)
            print("create table successfully")
            # Add the data.  A single execute() call may not contain several
            # SQL statements, so the rows go through executemany().
            insert_sql = "INSERT INTO COMPANY (NAME,AGE,ADDRESS,SALARY) VALUES ( %s, %s, %s, %s )"
            company_rows = [
                ('Paul', 32, 'California', 20000.00),
                ('Allen', 25, 'Texas', 15000.00),
                ('Teddy', 23, 'Norway', 20000.00),
                ('Mark', 25, 'Rich-Mond ', 65000.00),
                ('David', 27, 'Texas', 85000.00),
                ('Kim', 22, 'South-Hall', 45000.00),
                ('James', 24, 'Houston', 10000.00),
            ]
            db_cursor.executemany(insert_sql, company_rows)
            # Commit, or the table will be empty on the next run.
            connection.commit()
            print("insert successfully")
            # Read everything back.
            query = """
    select id,NAME,AGE,ADDRESS,SALARY FROM COMPANY
    """
            db_cursor.execute(query)
            field_names = ("id", "name", "age", "address", "salary")
            for fetched in db_cursor.fetchall():
                print("-" * 50)  # 50 dashes as a divider
                # Each name is padded to 10 characters and left-aligned.
                for pair in zip(field_names, fetched):
                    print("%-10s %s" % pair)
except MySQLdb.Error as exc:
    print("Mysql Error:", exc)
    traceback.print_exc()  # dump the error's stack trace
2.3使用pymysql
2.1和2.2节使用MySQLdb,不支持Python3.x
pymysql对Python2.x和Python3.x的支持都比较好
# Created by xiaosanyu at 16/5/30
# coding=utf-8
# https://github.com/PyMySQL/PyMySQL/
import pymysql
from contextlib import closing
import traceback
try:
    # Acquire a connection.  The connection is wrapped in closing() so that
    # it is closed automatically when the outer with-block exits.
    mysql_conn = pymysql.connect(host='localhost', user='root', passwd='root', db='test',
                                 port=3306, charset='utf8')
    with closing(mysql_conn):
        print("connect database successfully")
        # The cursor is used as a context manager, so it is closed on exit.
        with mysql_conn.cursor() as cursor:
            # Drop the table if it is already there.
            cursor.execute("DROP TABLE IF EXISTS COMPANY")
            # Create the table.
            create_sql = """
      CREATE TABLE IF NOT EXISTS COMPANY
     (ID INTEGER PRIMARY KEY NOT NULL auto_increment,
     NAME   TEXT NOT NULL,
     AGE   INT  NOT NULL,
     ADDRESS  CHAR(50),
     SALARY   REAL);
   """
            cursor.execute(create_sql)
            print("create table successfully")
            # Insert the rows.  Executing several SQL statements in a single
            # execute() call is not allowed, hence executemany().
            staff = [
                ('Paul', 32, 'California', 20000.00),
                ('Allen', 25, 'Texas', 15000.00),
                ('Teddy', 23, 'Norway', 20000.00),
                ('Mark', 25, 'Rich-Mond ', 65000.00),
                ('David', 27, 'Texas', 85000.00),
                ('Kim', 22, 'South-Hall', 45000.00),
                ('James', 24, 'Houston', 10000.00),
            ]
            cursor.executemany(
                "INSERT INTO COMPANY (NAME,AGE,ADDRESS,SALARY) VALUES ( %s, %s, %s, %s )",
                staff)
            # Commit; otherwise the table is empty the next time the program runs.
            mysql_conn.commit()
            print("insert successfully")
            # Query the table back.
            select_sql = """
    select id,NAME,AGE,ADDRESS,SALARY FROM COMPANY
    """
            cursor.execute(select_sql)
            column_labels = ("id", "name", "age", "address", "salary")
            for entry in cursor.fetchall():
                print("-" * 50)  # separator line made of 50 dashes
                # Labels are left-aligned in a fixed 10-character column.
                for label, value in zip(column_labels, entry):
                    print("%-10s %s" % (label, value))
except pymysql.Error as err:
    print("Mysql Error:", err)
    traceback.print_exc()
3.连接mssql
# Created by xiaosanyu at 16/5/30
# http://www.pymssql.org/en/latest/
import pymssql
from contextlib import closing
try:
    # The server must already contain a database named "test".
    # The connection is wrapped in closing() so that it is closed
    # automatically when the outer with-block exits.
    mssql_conn = pymssql.connect(host='192.168.100.114', user='sa', password='sa12345',
                                 database='test', port=1433, charset='utf8')
    with closing(mssql_conn):
        print("connect database successfully")
        # The cursor is used as a context manager, so it is closed on exit.
        with mssql_conn.cursor() as cursor:
            # Drop the table if it is already there.
            drop_sql = '''if exists (select 1 from sys.objects where name='COMPANY' and type='U') drop table COMPANY'''
            cursor.execute(drop_sql)
            # Create the table.
            create_sql = """
      CREATE TABLE COMPANY
     (ID INT IDENTITY(1,1) PRIMARY KEY NOT NULL ,
     NAME   TEXT NOT NULL,
     AGE   INT  NOT NULL,
     ADDRESS  CHAR(50),
     SALARY   REAL);
   """
            cursor.execute(create_sql)
            print("create table successfully")
            # Insert the rows.  Executing several SQL statements in a single
            # execute() call is not allowed, hence executemany().
            staff = [
                ('Paul', 32, 'California', 20000.00),
                ('Allen', 25, 'Texas', 15000.00),
                ('Teddy', 23, 'Norway', 20000.00),
                ('Mark', 25, 'Rich-Mond', 65000.00),
                ('David', 27, 'Texas', 85000.00),
                ('Kim', 22, 'South-Hall', 45000.00),
                ('James', 24, 'Houston', 10000.00),
            ]
            cursor.executemany(
                "INSERT INTO COMPANY (NAME,AGE,ADDRESS,SALARY) VALUES ( %s, %s, %s, %s )",
                staff)
            # Commit; otherwise the table is empty the next time the program runs.
            mssql_conn.commit()
            print("insert successfully")
            # Query the table back.
            select_sql = """
    select id,NAME,AGE,ADDRESS,SALARY FROM COMPANY
    """
            cursor.execute(select_sql)
            column_labels = ("id", "name", "age", "address", "salary")
            for entry in cursor.fetchall():
                print("-" * 50)  # separator line made of 50 dashes
                # Labels are left-aligned in a fixed 10-character column.
                for label, value in zip(column_labels, entry):
                    print("%-10s %s" % (label, value))
except pymssql.Error as err:
    print("mssql Error:", err)
    # No stack trace is printed here; this script does not import traceback.
4.连接MongoDB
# Created by xiaosanyu at 16/5/30
# https://docs.mongodb.com/ecosystem/drivers/python/
# https://pypi.python.org/pypi/pymongo/
import pymongo
from pymongo.mongo_client import MongoClient
import pymongo.errors
import traceback
def _company_documents():
    """Return a fresh list of the sample COMPANY documents (without ``_id``).

    A new list is built on every call because ``insert_many`` adds an ``_id``
    key to the dicts it is given.
    """
    people = (
        ("Paul", "32", "California", "20000.00"),
        ("Allen", "25", "Texas", "15000.00"),
        ("Teddy", "23", "Norway", "20000.00"),
        ("Mark", "25", "Rich-Mond", "65000.00"),
        ("David", "27", "Texas", "85000.00"),
        ("Kim", "22", "South-Hall", "45000.00"),
        ("James", "24", "Houston", "10000.00"),
    )
    return [{"Name": name, "Age": age, "Address": address, "Salary": salary}
            for name, age, address, salary in people]


def _next_sequence_value(counters, sequence_name):
    """Atomically increment the counter named *sequence_name* and return it.

    This replaces the stored JavaScript function that used to be called via
    ``Database.eval``: ``system_js`` and ``eval`` were deprecated in
    PyMongo 3.x and removed in PyMongo 4.  ``find_one_and_update`` performs
    the same findAndModify operation from the client side.
    """
    counter = counters.find_one_and_update(
        {"_id": sequence_name},
        {"$inc": {"sequence_value": 1}},
        return_document=pymongo.ReturnDocument.AFTER,
    )
    return counter["sequence_value"]


def _print_company_row(row, row_id):
    """Print one COMPANY document, one left-aligned 10-character label per line."""
    print("-" * 50)  # separator line made of 50 dashes
    print("%-10s %s" % ("_id", row_id))
    print("%-10s %s" % ("name", row['Name']))
    print("%-10s %s" % ("age", row['Age']))
    print("%-10s %s" % ("address", row['Address']))
    print("%-10s %s" % ("salary", row['Salary']))


try:
    # Connect to the MongoDB service.
    mongoClient = MongoClient('localhost', 27017)
    try:
        # Select the database.
        mongoDatabase = mongoClient.test
        print("connect database successfully")
        # Select the collection.
        mongoCollection = mongoDatabase.COMPANY
        # Remove all existing documents (delete_many replaces the removed
        # Collection.remove()).
        mongoCollection.delete_many({})
        # Insert the data; MongoDB assigns an ObjectId ``_id`` to each document.
        mongoCollection.insert_many(_company_documents())
        # Read the documents back.
        for row in mongoCollection.find():
            _print_company_row(row, row['_id'])
        print('\n\n\n')

        # Second pass: use an auto-incrementing integer ``_id``.
        mongoCollection.delete_many({})
        # (Re)create the counter document, starting from 0.  replace_one with
        # upsert=True replaces the removed Collection.save().
        mongoDatabase.counters.replace_one(
            {"_id": "people_id"},
            {"_id": "people_id", "sequence_value": 0},
            upsert=True,
        )
        numbered_documents = _company_documents()
        for document in numbered_documents:
            document["_id"] = _next_sequence_value(mongoDatabase.counters, "people_id")
        mongoCollection.insert_many(numbered_documents)
        for row in mongoCollection.find():
            _print_company_row(row, int(row['_id']))
    finally:
        # Release the client's sockets even if an operation above failed.
        mongoClient.close()
except pymongo.errors.PyMongoError as e:
    print("mongo Error:", e)
    traceback.print_exc()
5.连接Redis
5.1使用redis
# coding=utf-8
# Created by xiaosanyu at 16/5/31
# https://pypi.python.org/pypi/redis/2.10.5
# http://redis-py.readthedocs.io/en/latest/#
import redis
import json

r = redis.Redis(host='localhost', port=6379, db=0, password="12345")
print("connect", r.ping())
# Fetch the server information.
info = r.info()
# Or fetch only one section:
# info = r.info("Server")
# Print every entry with its index.
for i, (key, value) in enumerate(info.items()):
    print("item %s----%s:%s" % (i, key, value))
# Delete the key and its value so the list starts empty.
r.delete("company")

employees = [
    {"id": 1, "Name": "Paul", "Age": "32", "Address": "California", "Salary": "20000.00"},
    {"id": 2, "Name": "Allen", "Age": "25", "Address": "Texas", "Salary": "15000.00"},
    {"id": 3, "Name": "Teddy", "Age": "23", "Address": "Norway", "Salary": "20000.00"},
    {"id": 4, "Name": "Mark", "Age": "25", "Address": "Rich-Mond", "Salary": "65000.00"},
    {"id": 5, "Name": "David", "Age": "27", "Address": "Texas", "Salary": "85000.00"},
    {"id": 6, "Name": "Kim", "Age": "22", "Address": "South-Hall", "Salary": "45000.00"},
    {"id": 7, "Name": "James", "Age": "24", "Address": "Houston", "Salary": "10000.00"},
]
# Redis stores bytes/strings/numbers only; redis-py 3+ raises DataError when
# handed a dict, so each record is serialised to JSON before being pushed.
# rpush accepts one or several values per call.
r.rpush("company", *[json.dumps(employee) for employee in employees[:3]])
for employee in employees[3:]:
    r.rpush("company", json.dumps(employee))

# Read the whole list back (stop index -1 means "last element").
# NOTE: the records are parsed with json.loads rather than eval(): eval on
# data read from an external store would execute arbitrary code.
for raw in r.lrange("company", 0, -1):
    if isinstance(raw, bytes):
        raw = raw.decode("utf-8")
    row = json.loads(raw)
    print("-" * 50)  # separator line made of 50 dashes
    # Labels are left-aligned in a fixed 10-character column.
    print("%-10s %s" % ("_id", row['id']))
    print("%-10s %s" % ("name", row['Name']))
    print("%-10s %s" % ("age", row['Age']))
    print("%-10s %s" % ("address", row['Address']))
    print("%-10s %s" % ("salary", row['Salary']))
# Do not call r.shutdown() to "close the connection": it shuts down the
# Redis server itself.
5.2使用pyredis
# Created by xiaosanyu at 16/5/30
# http://pyredis.readthedocs.io/en/latest/
import pyredis
import json

r = pyredis.Client(host='localhost', port=6379, database=0, password="12345")
try:
    print("connect", r.ping().decode("utf-8"))
    # Fetch the complete server information:
    # info = r.execute("info").decode()
    # Or fetch only one section:
    info = r.execute("info", "Server").decode()
    print(info)
    # Delete the key and its value so the list starts empty.
    r.delete("company")
    # rpush accepts one or several values per call; the records are stored as
    # JSON text.
    r.rpush("company", '''{"id": 1, "Name": "Paul", "Age": "32", "Address": "California", "Salary": "20000.00"}''',
            '''{"id": 2, "Name": "Allen", "Age": "25", "Address": "Texas", "Salary": "15000.00"}''',
            '''{"id": 3, "Name": "Teddy", "Age": "23", "Address": "Norway", "Salary": "20000.00"}''')
    r.rpush("company", '''{"id": 4, "Name": "Mark", "Age": "25", "Address": "Rich-Mond", "Salary": "65000.00"}''')
    r.rpush("company", '''{"id": 5, "Name": "David", "Age": "27", "Address": "Texas", "Salary": "85000.00"}''')
    r.rpush("company", '''{"id": 6, "Name": "Kim", "Age": "22", "Address": "South-Hall", "Salary": "45000.00"}''')
    r.rpush("company", '''{"id": 7, "Name": "James", "Age": "24", "Address": "Houston", "Salary": "10000.00"}''')
    # Read the whole list back (stop index -1 means "last element").
    # NOTE: the records are parsed with json.loads rather than eval(): eval on
    # data read from an external store would execute arbitrary code.
    for raw in r.lrange("company", 0, -1):
        if isinstance(raw, bytes):
            raw = raw.decode("utf-8")
        row = json.loads(raw)
        print("-" * 50)  # separator line made of 50 dashes
        # Labels are left-aligned in a fixed 10-character column.
        print("%-10s %s" % ("_id", row['id']))
        print("%-10s %s" % ("name", row['Name']))
        print("%-10s %s" % ("age", row['Age']))
        print("%-10s %s" % ("address", row['Address']))
        print("%-10s %s" % ("salary", row['Salary']))
finally:
    # Close the current connection even if a command above failed.
    r.close()
代码下载:python_connect_database
以上这篇python 连接各类主流数据库的实例代码就是小编分享给大家的全部内容了,希望能给大家一个参考,也希望大家多多支持。