pymysql 的简单封装:只封装了 select 查询数据、insert 插入数据和 update 更新数据:
# encoding: utf-8
import traceback
import pymysql
import pandas as pd
class easy_pymysql():
    """Small convenience wrapper around pymysql for DataFrame reads and writes.

    Every public operation opens its own connection, runs, and closes the
    connection again, so an instance only carries the connection settings
    between calls. Failures inside an operation are printed with
    ``traceback.print_exc()`` and rolled back instead of being raised.
    """

    def __init__(self, host, user, passwd):
        """Store the connection settings; no connection is opened here."""
        self.host = host
        self.user = user
        self.passwd = passwd
        self.conn = None
        self.cursor = None

    def connect(self):
        """Open a connection plus cursor and keep both on the instance."""
        # No ``database=`` argument is passed, so no default schema is
        # selected by this call.
        self.conn = pymysql.connect(host=self.host,
                                    user=self.user,
                                    password=self.passwd,
                                    use_unicode=True,
                                    charset='utf8')
        self.cursor = self.conn.cursor()

    def close(self):
        """Close the cursor and the connection if they are open."""
        try:
            if self.cursor is not None:
                self.cursor.close()
        finally:
            self.cursor = None
            # Close the connection even when closing the cursor failed.
            if self.conn is not None:
                try:
                    self.conn.close()
                finally:
                    self.conn = None

    @staticmethod
    def _to_rows(df):
        """Return the frame's rows as tuples with missing values set to None.

        NaN/NaT cannot be sent to MySQL as literals, so they are mapped to
        ``None`` (which the driver renders as NULL).
        """
        as_objects = df.astype(object)
        cleaned = as_objects.where(as_objects.notna(), None)
        return list(cleaned.itertuples(index=False, name=None))

    def select_data(self, sql):
        """Run ``sql`` and return all fetched rows as a DataFrame.

        Returns ``None`` when the query fails (the error is printed).
        """
        result = None
        self.connect()
        try:
            self.cursor.execute(sql)
            result = pd.DataFrame(self.cursor.fetchall())
        except Exception:
            traceback.print_exc()
            self.conn.rollback()
        finally:
            self.close()
        return result

    def insert(self, df: pd.DataFrame, tablename: str):
        """Insert every row of ``df`` into ``tablename`` in one transaction.

        Column names of ``df`` are used as the target column list. Values are
        passed as query parameters so quoting and NULL handling are done by
        the driver. An empty frame is a no-op.

        NOTE: ``tablename`` and the column names are still interpolated into
        the statement text and must come from trusted code.
        """
        if df.empty:
            return
        columns = ", ".join(str(col) for col in df.columns)
        placeholders = ", ".join(["%s"] * len(df.columns))
        sql = f"INSERT INTO {tablename} ({columns}) VALUES ({placeholders})"
        rows = self._to_rows(df)
        self.connect()
        try:
            self.cursor.executemany(sql, rows)
            self.conn.commit()
        except Exception:
            traceback.print_exc()
            self.conn.rollback()
        finally:
            self.close()

    def insert_table(self, df: pd.DataFrame, tablename: str,
                     chunk_size: int = 20000):
        """Insert ``df`` into ``tablename`` in chunks of ``chunk_size`` rows.

        Each chunk goes through :meth:`insert`, i.e. uses its own connection
        and transaction. No empty trailing chunk is produced when the row
        count is an exact multiple of ``chunk_size``.

        Raises:
            ValueError: if ``chunk_size`` is not positive.
        """
        if chunk_size <= 0:
            raise ValueError("chunk_size must be a positive integer")
        for start in range(0, len(df), chunk_size):
            self.insert(df.iloc[start:start + chunk_size], tablename)
        print(f"finish transfer mysql {len(df)}")

    def update_table(self, df: pd.DataFrame, tablename: str):
        """Update one column of ``tablename`` row by row from ``df``.

        ``df`` must have exactly two columns: the first names the key column
        used in the WHERE clause, the second names the column to update. All
        updates are committed together; on any error they are rolled back and
        the error is printed.
        """
        self.connect()
        try:
            idchname, upchname = list(df.columns)
            print(idchname, upchname)
            sql = f"UPDATE {tablename} SET {upchname}=%s WHERE {idchname}=%s"
            # Parameter order follows the statement: new value first, key second.
            params = [(upch_, id_) for id_, upch_ in self._to_rows(df)]
            if params:
                self.cursor.executemany(sql, params)
            self.conn.commit()
        except Exception:
            traceback.print_exc()
            self.conn.rollback()
        finally:
            self.close()
clickhouse 的简单封装:只封装了 select 查询数据和 insert 插入数据:
# encoding: utf-8
from clickhouse_driver.client import Client
import pandas as pd
class easy_clickhouse():
    """Small convenience wrapper around a clickhouse_driver ``Client``.

    Offers a SELECT helper returning a DataFrame and INSERT helpers that
    write a DataFrame into a table.
    """

    def __init__(self, host):
        """Create the client for ``host``; only the host is passed to it."""
        self.host = host
        self.client = Client(host=self.host)

    def select_data(self, sql):
        """Run ``sql`` and return the rows as a DataFrame.

        Returns ``None`` when the query yields no rows.
        """
        rows = self.client.execute(sql)
        if len(rows) == 0:
            return None
        return pd.DataFrame(rows)

    def insert(self, df: pd.DataFrame, tablename: str):
        """Insert every row of ``df`` into ``tablename`` with one statement.

        Column names of ``df`` form the column list and each value is
        rendered with ``repr`` into the VALUES clause. An empty frame is a
        no-op, since a VALUES clause without rows is not a valid statement.

        NOTE(review): values are inlined as text, so strings containing a
        single quote (where ``repr`` switches to double quotes) and values
        such as ``None`` will not render as valid SQL literals; inputs
        must be trusted and pre-cleaned.
        """
        if df.empty:
            return
        split = df.to_dict(orient="split")
        columns = ", ".join(str(col) for col in split["columns"])
        # Build each row by hand rather than via str(tuple): a one-element
        # tuple renders as "(v,)", whose trailing comma breaks the statement.
        values = ", ".join(
            "(" + ", ".join(repr(value) for value in row) + ")"
            for row in split["data"]
        )
        sql = f"INSERT INTO {tablename} ({columns}) VALUES {values}"
        self.client.execute(sql)

    def insert_table(self, df: pd.DataFrame, tablename: str,
                     chunk_size: int = 20000):
        """Insert ``df`` into ``tablename`` in chunks of ``chunk_size`` rows.

        No empty trailing chunk is produced when the row count is an exact
        multiple of ``chunk_size``.

        Raises:
            ValueError: if ``chunk_size`` is not positive.
        """
        if chunk_size <= 0:
            raise ValueError("chunk_size must be a positive integer")
        for start in range(0, len(df), chunk_size):
            self.insert(df.iloc[start:start + chunk_size], tablename)
        print(f"finish transfer clickhouse {len(df)}")
def insert_db_table(datadict_list, host='10.10.90.11', table="tbs"):
    '''
    Insert a batch of records into a ClickHouse table with one statement.

    datadict_list is an iterable of dicts, one dict per record. The column
    list is taken from the keys of the first record; every record's values
    are emitted in that record's own iteration order. Empty input is a
    no-op (no client is created and nothing is executed).

    host and table default to the development environment's values.

    NOTE(review): string values are wrapped in single quotes without
    escaping and other values are rendered with str.format, so the records
    must come from trusted, pre-cleaned data.
    '''
    records = list(datadict_list)
    if not records:
        return
    names = ",".join("%s" % key for key in records[0].keys())
    row_groups = []
    for record in records:
        rendered = ",".join(
            "'%s'" % value if isinstance(value, str) else "{}".format(value)
            for value in record.values()
        )
        row_groups.append("(%s)" % rendered)
    sql_str = "insert into %s (%s) values %s" % (table, names, ",".join(row_groups))
    c = clientclickhouse(host=host)
    c.execute(sql_str)


本文作者:Dong
本文链接:
版权声明:本博客所有文章除特别声明外,均采用 CC BY-NC 许可协议。本作品采用《知识共享署名-非商业性使用 4.0 国际许可协议》进行许可。您可以在非商业用途下自由转载和修改,但必须注明出处并提供原作者链接。转载请注明出处!