Python中MySQL操作封装

import pymysql
import Util


class Db:
    def __init__(self):
        self.host = Util.Config().get_conf('MYSQL_HOST')
        self.user = Util.Config().get_conf('MYSQL_USER', 'database')
        self.password = Util.Config().get_conf('MYSQL_PWD', 'database')
        self.database = Util.Config().get_conf('MYSQL_DATABASE', 'database')
        self.port = int(Util.Config().get_conf('MYSQL_PORT', 'database'))
        self.conn = None

    def connect(self):
        self.conn = pymysql.connect(
            host=self.host,
            port=self.port,
            user=self.user,
            password=self.password,
            database=self.database
        )

    def execute(self, query: str, result=False, values=None):
        self.connect()
        cursor = self.conn.cursor()
        cursor.execute(query, values)
        self.conn.commit()
        if result:
            return cursor.fetchall()

    def close(self) -> None:
        """
        关闭数据库连接
        """
        self.conn.close()

    def select(self, table: str, fields: list, where: dict = None) -> list:
        """
        查询语句
        :param table: 表名
        :param fields: 字段
        :param where: 查询条件,字典
        """
        # fields_str = str(tuple(fields)).replace("'", '').replace("(", '').replace(")", '')
        fields_str = ', '.join(fields)
        sql = f"SELECT {fields_str} FROM `{table}`"

        if where:
            where_sql = " WHERE "
            for index, (field, value) in enumerate(where.items()):
                where_sql += f"{field} = '{value}'"
                if index < len(where) - 1:
                    where_sql += " AND "
            sql += where_sql
        result = self.execute(sql, True)
        return result

    def insert(self, table: str, data_dict: dict, duplicate: dict = None) -> None:
        """
        插入语句
        :param table: 表名
        :param data_dict: 插入数据字段,字典
        :param duplicate: 存在更新字段,字典
        """

        db_fields = tuple(data_dict.keys())
        db_fields = str(db_fields).replace("'", '')
        data_values = tuple(data_dict.values())
        sql = f"INSERT INTO `{table}` {db_fields} VALUES {data_values}"

        if duplicate:
            du_sql = " ON DUPLICATE KEY UPDATE "
            for index, (field, value) in enumerate(duplicate.items()):
                du_sql += f"{field} = '{value}'"
                if index < len(duplicate) - 1:
                    du_sql += ", "
            sql += du_sql

        # 当仅有一个数据时替换掉元组中括号后的逗号
        if len(data_dict) == 1:
            sql = sql.replace(",", "")

        self.execute(sql)

    def update(self, table: str, data_dict: dict, where: dict) -> None:
        """
        更新语句
        :param table: 表名
        :param data_dict: 更新数据,字段
        :param where: 更新条件,字段
        """
        where_sql = " WHERE "
        for index, (field, value) in enumerate(where.items()):
            where_sql += f"{field} = '{value}'"
            if index < len(where) - 1:
                where_sql += " AND "

        set_sql = " SET "
        for index, (field, value) in enumerate(data_dict.items()):
            set_sql += f"{field} = '{value}'"
            if index < len(data_dict) - 1:
                set_sql += ", "

        sql = f"UPDATE `{table}` {set_sql} {where_sql}"

        self.execute(sql)


if __name__ == '__main__':
    Db()
消息盒子
# 您需要首次评论以获取消息 #
# 您需要首次评论以获取消息 #

只显示最新10条未读和已读信息