使用 python 从 hive 读取数据时的性能问题

2024-05-04

我在 hive 中有一个表,其中包含 351 837(110 MB 大小)记录,我正在使用 python 读取该表并写入 sql server。

在此过程中,从 hive 读取数据到 pandas dataframe 需要很长时间。当我加载整个记录(351k)时,需要 90 分钟。

为了改进,我采用了以下方法,例如从 hive 读取 10k 行一次并写入 sql server。但是从 hive 读取 10k 行并将其分配给 Dataframe 一次就需要 4-5 分钟的时间。

def execute_hadoop_export():
       """
       This will run the steps required for a Hadoop Export.  
       Return Values is boolean for success fail
       """
       try:

           hql='select * from db.table '
           # Open Hive ODBC Connection
           src_conn = pyodbc.connect("DSN=****",autocommit=True)
           cursor=src_conn.cursor()
           #tgt_conn = pyodbc.connect(target_connection)

           # Using SQLAlchemy to dynamically generate query and leverage dataframe.to_sql to write to sql server...
           sql_conn_url = urllib.quote_plus('DRIVER={ODBC Driver 13 for SQL Server};SERVER=Xyz;DATABASE=Db2;UID=ee;PWD=*****')
           sql_conn_str = "mssql+pyodbc:///?odbc_connect={0}".format(sql_conn_url)
           engine = sqlalchemy.create_engine(sql_conn_str)
           # read source table.
           vstart=datetime.datetime.now()
           for df in pandas.read_sql(hql, src_conn,chunksize=10000):

               vfinish=datetime.datetime.now()

               print 'Finished 10k rows reading from hive and it took', (vfinish-vstart).seconds/60.0,' minutes'
           # Get connection string for target from Ctrl.Connnection

               df.to_sql(name='table', schema='dbo', con=engine, chunksize=10000, if_exists="append", index=False) 
               print 'Finished 10k rows writing into sql server and it took', (datetime.datetime.now()-vfinish).seconds/60.0, ' minutes'
               vstart=datetime.datetime.now()
           cursor.Close()


       except Exception, e:
           print str(e)

output:

在Python中读取Hive表数据最快的方法是什么?

Update蜂巢表结构

CREATE TABLE `table1`(
  `policynumber` varchar(15), 
  `unitidentifier` int, 
  `unitvin` varchar(150), 
  `unitdescription` varchar(100), 
  `unitmodelyear` varchar(4), 
  `unitpremium` decimal(18,2), 
  `garagelocation` varchar(150), 
  `garagestate` varchar(50), 
  `bodilyinjuryoccurrence` decimal(18,2), 
  `bodilyinjuryaggregate` decimal(18,2), 
  `bodilyinjurypremium` decimal(18,2), 
  `propertydamagelimits` decimal(18,2), 
  `propertydamagepremium` decimal(18,2), 
  `medicallimits` decimal(18,2), 
  `medicalpremium` decimal(18,2), 
  `uninsuredmotoristoccurrence` decimal(18,2), 
  `uninsuredmotoristaggregate` decimal(18,2), 
  `uninsuredmotoristpremium` decimal(18,2), 
  `underinsuredmotoristoccurrence` decimal(18,2), 
  `underinsuredmotoristaggregate` decimal(18,2), 
  `underinsuredmotoristpremium` decimal(18,2), 
  `umpdoccurrence` decimal(18,2), 
  `umpddeductible` decimal(18,2), 
  `umpdpremium` decimal(18,2), 
  `comprehensivedeductible` decimal(18,2), 
  `comprehensivepremium` decimal(18,2), 
  `collisiondeductible` decimal(18,2), 
  `collisionpremium` decimal(18,2), 
  `emergencyroadservicepremium` decimal(18,2), 
  `autohomecredit` tinyint, 
  `lossfreecredit` tinyint, 
  `multipleautopoliciescredit` tinyint, 
  `hybridcredit` tinyint, 
  `goodstudentcredit` tinyint, 
  `multipleautocredit` tinyint, 
  `fortyfivepluscredit` tinyint, 
  `passiverestraintcredit` tinyint, 
  `defensivedrivercredit` tinyint, 
  `antitheftcredit` tinyint, 
  `antilockbrakescredit` tinyint, 
  `perkcredit` tinyint, 
  `plantype` varchar(100), 
  `costnew` decimal(18,2), 
  `isnocontinuousinsurancesurcharge` tinyint)
CLUSTERED BY ( 
  policynumber, 
  unitidentifier) 
INTO 50 BUCKETS

注意:我也尝试过使用 sqoop 导出选项,但我的配置单元表已经采用分桶格式。


我尝试过多重处理,可以将 2 小时缩短 8-10 分钟。请找到下面的脚本。

from multiprocessing import Pool
import pandas as pd
import datetime
from query import hivetable
from write_tosql import write_to_sql
p = Pool(37)
lst=[]
#we have 351k rows so generating series to use in hivetable method
for i in range(1,360000,10000):
    lst.append(i)
print 'started reading ',datetime.datetime.now()
#we have 40 cores in  cluster 
p = Pool(37)
s=p.map(hivetable, [i for i in lst])
s_df=pd.concat(s)
print 'finished reading ',datetime.datetime.now()
print 'Started writing to sql server ',datetime.datetime.now()
write_to_sql(s_df)
print 'Finished writing to sql server ',datetime.datetime.now()

---------query.py文件--------

import pyodbc
from multiprocessing import Pool
from functools import partial
import pandas as pd

conn = pyodbc.connect("DSN=******",autocommit=True)

def hivetable(row):
    query = 'select * from (select row_number() OVER (order by policynumber) as rownum, * from dbg.tble ) tbl1 where rownum between '+str(row) +' and '+str(row+9999)+';'
    result = pd.read_sql(query,conn)
    return result

---------Write_tosql.py文件---------

import sqlalchemy
import urllib
import pyodbc
def write_to_sql(s_df):
    sql_conn_url = urllib.quote_plus('DRIVER={ODBC Driver 13 for SQL Server};SERVER=ser;DATABASE=db;UID=sqoop;PWD=#####;')
    sql_conn_str = "mssql+pyodbc:///?odbc_connect={0}".format(sql_conn_url)
    engine = sqlalchemy.create_engine(sql_conn_str)
    s_df.rename(columns=lambda x: remove_table_alias(x), inplace=True)
    s_df.to_sql(name='tbl2', schema='dbo', con=engine, chunksize=10000, if_exists="append", index=False)
def remove_table_alias(columnName):
    try:
        if(columnName.find(".") != -1):
            return columnName.split(".")[1]
        return columnName
    except Exception, e:
        print "ERROR in _remove_table_alias ",str(e)

任何其他解决方案都会帮助我缩短时间。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

使用 python 从 hive 读取数据时的性能问题 的相关文章

  • 类属性在功能上依赖于其他类属性

    我正在尝试使用静态类属性来定义另一个静态类属性 我认为可以通过以下代码来实现 f lambda s s 1 class A foo foo bar f A foo 然而 这导致NameError name A is not defined
  • NLTK、搭配问题:需要解包的值太多(预期为 2)

    我尝试使用 NLTK 检索搭配 但出现错误 我使用内置的古腾堡语料库 I wrote alice nltk corpus gutenberg fileids 7 al nltk corpus gutenberg words alice al
  • python中函数变量的作用域

    假设我们有两个函数 def ftpConnect ftp FTP server ftp login ftp cwd path def getFileList ftpConnect files ftp nlst print files 如果我
  • 使用 genfromtxt 导入 numpy 中缺失值的 csv 数据

    我有一个 csv 文件 看起来像这样 实际文件有更多的列和行 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 假设文件的名称是info csv如果我尝试使用导入它 data numpy genfromtxt i
  • Pandas:根据列名进行列的成对乘法

    我有以下数据框 gt gt gt df pd DataFrame ap1 X 1 2 3 4 as1 X 1 2 3 4 ap2 X 2 2 2 2 as2 X 3 3 3 3 gt gt gt df ap1 X as1 X ap2 X a
  • 使用 dict 在数据框中查找行

    df pd DataFrame a 1 2 3 b 4 5 6 produces a b 0 1 4 1 2 5 2 3 6 给定一个字典 d a 2 b 5 我将如何提取数据帧中字典的键值与所有列值匹配的行 所以在这种情况下 a b 1
  • Python While 循环,and (&) 运算符不起作用

    我正在努力寻找最大公因数 我写了一个糟糕的 运算密集型 算法 它将较低的值减一 使用 检查它是否均匀地划分了分子和分母 如果是 则退出程序 但是 我的 while 循环没有使用 and 运算符 因此一旦分子可整除 它就会停止 即使它不是正确
  • Python unicode 字符代码?

    有没有办法将 Unicode 字符 插入 Python 3 中的字符串 例如 gt gt gt import unicode gt gt gt string This is a full block s unicode charcode U
  • FastText - 由于 C++ 扩展未能分配内存,无法加载 model.bin

    我正在尝试使用 FastText Python APIhttps pypi python org pypi fasttext https pypi python org pypi fasttext虽然 据我所知 此 API 无法加载较新的
  • python中的sys.stdin.fileno()是什么

    如果这是非常基本的或之前已经问过的 我很抱歉 我用谷歌搜索但找不到简单且令人满意的解释 我想知道什么sys stdin fileno is 我在代码中看到了它 但不明白它的作用 这是实际的代码块 fileno sys stdin filen
  • WindowsError:[错误 5] 访问被拒绝

    我一直在尝试终止一个进程 但我的所有选项都给出了 Windows 访问被拒绝错误 我通过以下方式打开进程 一个python脚本 test subprocess Popen sys executable testsc py 我想杀死那个进程
  • 最佳实践 - 存储过程日志记录

    如果您有一个长时间运行的 SP 您会以某种方式记录其操作还是只是等待此消息 命令成功完成 我认为 关于这个主题可以有很多解决方案 但是有没有最佳实践 一个经常使用的简单解决方案 EDIT 我发现了一个关于这个主题的有趣链接 http web
  • 如何在单独的文件中使用 FastAPI Depends 作为端点/路由?

    我在单独的文件中定义了一个 Websocket 端点 例如 from starlette endpoints import WebSocketEndpoint from connection service import Connectio
  • Google App Engine 中的自定义身份验证

    有谁知道或知道我可以在哪里学习如何使用 Python 和 Google App Engine 创建自定义身份验证流程 我不想使用 Google 帐户进行身份验证 并且希望能够创建自己的用户 如果不是专门针对 Google App Engin
  • 从 dask 数据框中的日期时间序列获取年份和星期?

    如果我有一个 Pandas 数据框和一个日期时间类型的列 我可以按如下方式获取年份 df year df date dt year 对于 dask 数据框 这是行不通的 如果我先计算 像这样 df year df date compute
  • 将 Scikit-Learn OneHotEncoder 与 Pandas DataFrame 结合使用

    我正在尝试使用 Scikit Learn 的 OneHotEncoder 将 Pandas DataFrame 中包含字符串的列替换为 one hot 编码的等效项 我的下面的代码不起作用 from sklearn preprocessin
  • 从时间序列生成日期特征

    我有一个数据框 其中包含如下列 Date temp data holiday day 01 01 2000 10000 0 1 02 01 2000 0 1 2 03 01 2000 2000 0 3 30 01 2000 200 0 30
  • 如何使用 Django (Python) 登录表单?

    我在 Django 中构建了一个登录表单 现在我遇到了路由问题 当我选择登录按钮时 表单不会发送正确的遮阳篷 我认为前端的表单无法从 查看 py 文件 所以它不会发送任何 awnser 并且登录过程无法工作 该表单是一个简单的静态 html
  • 将此 MATLAB 代码转换为 Python 时我做错了什么?

    我正在努力将生成波形的 MATLAB 代码转换为 Python 就上下文而言 这是原子力显微镜带激发响应的模拟 与代码错误无关 在 MATLAB 中从 r vec 生成的图形与我在 Python 中生成的图形不同 我是否正确地将 MATLA
  • 使用 numpy 加速 for 循环

    下一个 for 循环如何使用 numpy 获得加速 我想这里可以使用一些奇特的索引技巧 但我不知道是哪一个 这里可以使用 einsum 吗 a 0 for i in range len b a numpy mean C d e f b i

随机推荐