使用 SQLAlchemy 表达式时 Dask read_sql_table 出错

2023-12-03

我正在尝试将 SQLAlchemy 表达式与 dask 的 read_sql_table 结合使用,以获取通过连接和过滤几个不同表创建的数据集。这文档表明这应该是可能的。

(下面的示例不包含任何联接,因为不需要它们来复制问题。)

我构建连接字符串,创建一个 SQLAlchemy 引擎和与数据库中的表相对应的表。 (我正在使用 PostgreSQL。)

import dask.dataframe as dd
import pandas as pd
from sqlalchemy import create_engine
from sqlalchemy import Column, MetaData, Table
from sqlalchemy.sql import  select


username = 'username'
password = 'password'
server = 'prod'
database = 'my_db'

connection_string = f'postgresql+psycopg2://{username}:{password}@{server}/{database}'

engine = create_engine(connection_string)

metadata = MetaData()

t = Table('my_table', metadata,
    Column('id'),
    schema='my_schema')

我能够构建一个选择并将其与 SQLAlchemy 一起使用,没有任何问题

>>> s = select([t]).limit(5)
>>> rp = engine.execute(s)
>>> rp.fetchall()

[(3140757,), (3118225,), (3156070,), (3193075,), (3114614,)]

我还可以将 SQLAlchey 选择提供给 panda 的 read_sql,效果很好

>>> pd.read_sql(s, connection_string)

id
0   3140757
1   3118225
2   3156070
3   3193075
4   3114614

但是,当我将相同的选择传递给 dask 时,我收到一个编程错误。它表明 dask 正在转身并调用 pandas.read_sql,因此您会认为它应该起作用,但显然有些东西不起作用。

>>> dd.read_sql_table(s, connection_string, index_col='id')

---------------------------------------------------------------------------
ProgrammingError                          Traceback (most recent call last)
C:\miniconda3\envs\my_env\lib\site-packages\sqlalchemy\engine\base.py in _execute_context(self, dialect, constructor, statement, parameters, *args)
   1192                         parameters,
-> 1193                         context)
   1194         except BaseException as e:

C:\miniconda3\envs\my_env\lib\site-packages\sqlalchemy\engine\default.py in do_execute(self, cursor, statement, parameters, context)
    508     def do_execute(self, cursor, statement, parameters, context=None):
--> 509         cursor.execute(statement, parameters)
    510 

ProgrammingError: subquery in FROM must have an alias
LINE 2: FROM (SELECT my_schema.my_table.id AS id 
             ^
HINT:  For example, FROM (SELECT ...) [AS] foo.


The above exception was the direct cause of the following exception:

ProgrammingError                          Traceback (most recent call last)
<ipython-input-5-0db95e60f442> in <module>
----> 1 dd.read_sql_table(s, connection_string, index_col='id')

C:\miniconda3\envs\my_env\lib\site-packages\dask\dataframe\io\sql.py in read_sql_table(table, uri, index_col, divisions, npartitions, limits, columns, bytes_per_chunk, head_rows, schema, meta, engine_kwargs, **kwargs)
    116         # derrive metadata from first few rows
    117         q = sql.select(columns).limit(head_rows).select_from(table)
--> 118         head = pd.read_sql(q, engine, **kwargs)
    119 
    120         if head.empty:

C:\miniconda3\envs\my_env\lib\site-packages\pandas\io\sql.py in read_sql(sql, con, index_col, coerce_float, params, parse_dates, columns, chunksize)
    395             sql, index_col=index_col, params=params,
    396             coerce_float=coerce_float, parse_dates=parse_dates,
--> 397             chunksize=chunksize)
    398 
    399 

C:\miniconda3\envs\my_env\lib\site-packages\pandas\io\sql.py in read_query(self, sql, index_col, coerce_float, parse_dates, params, chunksize)
   1061         args = _convert_params(sql, params)
   1062 
-> 1063         result = self.execute(*args)
   1064         columns = result.keys()
   1065 

C:\miniconda3\envs\my_env\lib\site-packages\pandas\io\sql.py in execute(self, *args, **kwargs)
    952     def execute(self, *args, **kwargs):
    953         """Simple passthrough to SQLAlchemy connectable"""
--> 954         return self.connectable.execute(*args, **kwargs)
    955 
    956     def read_table(self, table_name, index_col=None, coerce_float=True,

C:\miniconda3\envs\my_env\lib\site-packages\sqlalchemy\engine\base.py in execute(self, statement, *multiparams, **params)
   2073 
   2074         connection = self.contextual_connect(close_with_result=True)
-> 2075         return connection.execute(statement, *multiparams, **params)
   2076 
   2077     def scalar(self, statement, *multiparams, **params):

C:\miniconda3\envs\my_env\lib\site-packages\sqlalchemy\engine\base.py in execute(self, object, *multiparams, **params)
    946             raise exc.ObjectNotExecutableError(object)
    947         else:
--> 948             return meth(self, multiparams, params)
    949 
    950     def _execute_function(self, func, multiparams, params):

C:\miniconda3\envs\my_env\lib\site-packages\sqlalchemy\sql\elements.py in _execute_on_connection(self, connection, multiparams, params)
    267     def _execute_on_connection(self, connection, multiparams, params):
    268         if self.supports_execution:
--> 269             return connection._execute_clauseelement(self, multiparams, params)
    270         else:
    271             raise exc.ObjectNotExecutableError(self)

C:\miniconda3\envs\my_env\lib\site-packages\sqlalchemy\engine\base.py in _execute_clauseelement(self, elem, multiparams, params)
   1058             compiled_sql,
   1059             distilled_params,
-> 1060             compiled_sql, distilled_params
   1061         )
   1062         if self._has_events or self.engine._has_events:

C:\miniconda3\envs\my_env\lib\site-packages\sqlalchemy\engine\base.py in _execute_context(self, dialect, constructor, statement, parameters, *args)
   1198                 parameters,
   1199                 cursor,
-> 1200                 context)
   1201 
   1202         if self._has_events or self.engine._has_events:

C:\miniconda3\envs\my_env\lib\site-packages\sqlalchemy\engine\base.py in _handle_dbapi_exception(self, e, statement, parameters, cursor, context)
   1411                 util.raise_from_cause(
   1412                     sqlalchemy_exception,
-> 1413                     exc_info
   1414                 )
   1415             else:

C:\miniconda3\envs\my_env\lib\site-packages\sqlalchemy\util\compat.py in raise_from_cause(exception, exc_info)
    263     exc_type, exc_value, exc_tb = exc_info
    264     cause = exc_value if exc_value is not exception else None
--> 265     reraise(type(exception), exception, tb=exc_tb, cause=cause)
    266 
    267 if py3k:

C:\miniconda3\envs\my_env\lib\site-packages\sqlalchemy\util\compat.py in reraise(tp, value, tb, cause)
    246             value.__cause__ = cause
    247         if value.__traceback__ is not tb:
--> 248             raise value.with_traceback(tb)
    249         raise value
    250 

C:\miniconda3\envs\my_env\lib\site-packages\sqlalchemy\engine\base.py in _execute_context(self, dialect, constructor, statement, parameters, *args)
   1191                         statement,
   1192                         parameters,
-> 1193                         context)
   1194         except BaseException as e:
   1195             self._handle_dbapi_exception(

C:\miniconda3\envs\my_env\lib\site-packages\sqlalchemy\engine\default.py in do_execute(self, cursor, statement, parameters, context)
    507 
    508     def do_execute(self, cursor, statement, parameters, context=None):
--> 509         cursor.execute(statement, parameters)
    510 
    511     def do_execute_no_params(self, cursor, statement, context=None):

ProgrammingError: (psycopg2.ProgrammingError) subquery in FROM must have an alias
LINE 2: FROM (SELECT my_schema.my_table.id AS id 
             ^
HINT:  For example, FROM (SELECT ...) [AS] foo.
 [SQL: 'SELECT id \nFROM (SELECT my_schema.my_table.id AS id \nFROM my_schema.my_table \n LIMIT %(param_1)s) \n LIMIT %(param_2)s'] [parameters: {'param_1': 5, 'param_2': 5}] (Background on this error at: http://sqlalche.me/e/f405)

正如 Chris 在另一个答案中所说,Dask 将您的查询包装在某种形式中SELECT columns FROM (yourquery),这对于 PostgreSQL 来说是无效的语法,因为它需要该括号表达式的别名。无需重新实现整个read_sql_table方法,只需添加即可为表达式添加别名.alias('somename')根据您的选择,即

select([t]).limit(5).alias('foo')

该表达式在被 Dask 包装后会为 Postgres 生成正确的语法

SELECT columns FROM (yourquery) AS foo
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

使用 SQLAlchemy 表达式时 Dask read_sql_table 出错 的相关文章

  • pandas 替换多个值

    以下是示例数据框 gt gt gt df pd DataFrame a 1 1 1 2 2 b 11 22 33 44 55 gt gt gt df a b 0 1 11 1 1 22 2 1 33 3 2 44 4 3 55 现在我想根据
  • 如何使用包含代码的“asyncio.sleep()”进行单元测试?

    我在编写 asyncio sleep 包含的单元测试时遇到问题 我要等待实际的睡眠时间吗 I used freezegun到嘲笑时间 当我尝试使用普通可调用对象运行测试时 这个库非常有用 但我找不到运行包含 asyncio sleep 的测
  • 打破嵌套循环[重复]

    这个问题在这里已经有答案了 有没有比抛出异常更简单的方法来打破嵌套循环 在Perl https en wikipedia org wiki Perl 您可以为每个循环指定标签 并且至少继续一个外循环 for x in range 10 fo
  • __del__ 真的是析构函数吗?

    我主要用 C 做事情 其中 析构函数方法实际上是为了销毁所获取的资源 最近我开始使用python 这真的很有趣而且很棒 我开始了解到它有像java一样的GC 因此 没有过分强调对象所有权 构造和销毁 据我所知 init 方法对我来说在 py
  • keras加载模型错误尝试将包含17层的权重文件加载到0层的模型中

    我目前正在使用 keras 开发 vgg16 模型 我用我的一些图层微调 vgg 模型 拟合我的模型 训练 后 我保存我的模型model save name h5 可以毫无问题地保存 但是 当我尝试使用以下命令重新加载模型时load mod
  • 运行多个 scrapy 蜘蛛的正确方法

    我只是尝试使用在同一进程中运行多个蜘蛛新的 scrapy 文档 http doc scrapy org en 1 0 topics practices html但我得到 AttributeError CrawlerProcess objec
  • 使用 Pycharm 在 Windows 下启动应用程序时出现 UnicodeDecodeError

    问题是当我尝试启动应用程序 app py 时 我收到以下错误 UnicodeDecodeError utf 8 编解码器无法解码位置 5 中的字节 0xb3 起始字节无效 整个文件app py coding utf 8 from flask
  • Python 中的二进制缓冲区

    在Python中你可以使用StringIO https docs python org library struct html用于字符数据的类似文件的缓冲区 内存映射文件 https docs python org library mmap
  • NameError:名称“urllib”未定义”

    CODE import networkx as net from urllib request import urlopen def read lj friends g name fetch the friend list from Liv
  • feedparser 在脚本运行期间失败,但无法在交互式 python 控制台中重现

    当我运行 eclipse 或在 iPython 中运行脚本时 它失败了 ascii codec can t decode byte 0xe2 in position 32 ordinal not in range 128 我不知道为什么 但
  • python pandas 中的双端队列

    我正在使用Python的deque 实现一个简单的循环缓冲区 from collections import deque import numpy as np test sequence np array range 100 2 resha
  • Abaqus 将曲面转化为集合

    我一直试图在模型中找到两个表面的中心 参见照片 但未能成功 它们是元素表面 面 查询中没有选项可以查找元素表面的中心 只能查找元素集的中心 找到节点集的中心也很好 但是我的节点集没有出现在工具 gt 查询 gt 质量属性选项中 而且我找不到
  • python 集合可以包含的值的数量是否有限制?

    我正在尝试使用 python 设置作为 mysql 表中 ids 的过滤器 python集存储了所有要过滤的id 现在大约有30000个 这个数字会随着时间的推移慢慢增长 我担心python集的最大容量 它可以包含的元素数量有限制吗 您最大
  • Python:字符串不会转换为浮点数[重复]

    这个问题在这里已经有答案了 我几个小时前写了这个程序 while True print What would you like me to double line raw input gt if line done break else f
  • 表达式中的 Python 'in' 关键字与 for 循环中的比较 [重复]

    这个问题在这里已经有答案了 我明白什么是in运算符在此代码中执行的操作 some list 1 2 3 4 5 print 2 in some list 我也明白i将采用此代码中列表的每个值 for i in 1 2 3 4 5 print
  • 循环中断打破tqdm

    下面的简单代码使用tqdm https github com tqdm tqdm在循环迭代时显示进度条 import tqdm for f in tqdm tqdm range 100000000 if f gt 100000000 4 b
  • 从 pygame 获取 numpy 数组

    我想通过 python 访问我的网络摄像头 不幸的是 由于网络摄像头的原因 openCV 无法工作 Pygame camera 使用以下代码就像魅力一样 from pygame import camera display camera in
  • Nuitka 未使用 nuitka --recurse-all hello.py [错误] 编译 exe

    我正在尝试通过 nuitka 创建一个简单的 exe 这样我就可以在我的笔记本电脑上运行它 而无需安装 Python 我在 Windows 10 上并使用 Anaconda Python 3 我输入 nuitka recurse all h
  • Python:计算字典的重复值

    我有一本字典如下 dictA unit1 test1 alpha unit1 test2 beta unit2 test1 alpha unit2 test2 gamma unit3 test1 delta unit3 test2 gamm
  • 如何从没有结尾的管道中读取 python 中的 stdin

    当管道来自 打开 时 不知道正确的名称 我无法从 python 中的标准输入或管道读取数据 文件 我有作为例子管道测试 py import sys import time k 0 try for line in sys stdin k k

随机推荐

  • Android Studio 64 位错误:32 位 Linux Android 模拟器二进制文件已弃用

    我最近将我的 ubuntu 13 10 64 位升级到 14 04 64 位 Android Studio 在 13 10 上始终运行良好 在 14 04 上 我遇到了 gradle 问题 this one 这是缺少 libz so 1 的
  • 在命令提示符下打开 Sqlite Db

    朋友们 我正在研究 Sqlite DB 我想通过 ADB shell 命令打开 查看或创建数据库 我在命令提示符中编写了以下命令 cd C android sdk r04 windows android sdk windows tools
  • Python If == true 语句仅适用于 readline 的最后一行

    我的函数仅表示单词文件中的最后一个单词是字谜 第一个辅助函数 但文件中的每个单词都是我测试的单词的字谜词 并通过主函数之外的辅助函数独立返回 true 我不确定这是否与 n是字符串的一部分 然后它解释了这一点 但我尝试放入一个 if 语句
  • 在 PHP/MySQL 中使用 Week(Date) 时,如何将一周的第一天设置为星期一?

    我正在使用以下代码构建 Google 图表 以提取 MySQL 表中与一年中的周数相对应的所有条目 目前 周数从星期日开始 我想更改此设置 以便它们从星期一开始 但我不确定如何执行此操作 i 1 while i lt 53 week sta
  • perl中与-e和正则表达式匹配的文件名

    我需要检查目录中是否存在文件 文件名的模式如下 d1 d2 d3 abcd 12345 67890 dat 在我的程序中 我将知道文件名abcd 我需要写一个if使用条件 e选项并查找与上面给定模式匹配的文件 您可以使用glob返回名称与模
  • xslt 是将文本转换为 xml 结构的好方法吗?

    我正在尝试找到一个更好的解决方案来将纯文本 但每个字段具有预定义的长度 转换为 xml 例如输入文本可以是 Testuser new york 10018 前 11 个字符表示用户名 接下来的 12 个字符表示城市 接下来的 5 个字符表示
  • 没有主键的实体的 Symfony Doctrine 模型

    我在重建 Web 应用程序时正在使用旧数据库 我想使用 Symfony2 x 它显然将 Doctrine 作为 ORM 我有大约 50 个 mysql 表 它们没有主键 当我尝试生成模型时 它不允许我这样做并抛出 表上没有主键 的异常 我是
  • 如何使用联系人框架获取 iOS 9 中的所有联系人记录

    AddressBook 框架的大部分内容在 iOS 9 中已被弃用 在新的 Contacts 框架中文档仅显示如何获取与a匹配的记录NSPredicate 但是如果我想要怎么办all记录 其他两个答案都只从容器中加载联系人defaultCo
  • TLS 不适用于 Kubernetes 中的负载均衡器后端服务

    我试图通过创建服务类型作为负载均衡器来公开集群中的应用程序 这样做的原因是我希望这个应用程序有一个单独的沟通渠道 我有一个 KOPS 集群 我想使用AWS的网络负载均衡器 以便它获得静态IP 当我创建服务并将端口 80 映射到应用程序运行的
  • laravel 5.5 由于不活动,页面已过期。请刷新并重试

    我是 Laravel 的新手 我有一个我不明白的问题 我的项目中有一个日志表单 我的方法是POST 当我尝试请求时 结果是 由于不活动 该页面已过期 请刷新并尝试 再次 但是如果我将方法更改为GET 效果很好 有人可以告诉我为什么会这样以及
  • 卸载从源代码构建的 python?

    我已经从源代码安装了 python 2 6 后来又错误地从包管理器安装了另一个 python 2 6 我找不到卸载从源代码构建的 python 的方法 这可能 容易吗 运行 ubuntu 10 04 您可以使用 checkinstall 来
  • WPF 工具包 DataGrid 复选框问题

    我真的希望有人能在这里帮助我 我的程序中有一个 DataGrid 它有一个复选框列 DataGrid 的 ItemsSource 是以编程方式加载的 DataSet 当我在 DataGrid 中选择几个项目然后滚动它时 我得到了一些非常奇怪
  • 在 HTML 代码中的何处插入 JavaScript 库和 CSS?

    我对 Web 开发不太陌生 当我在互联网上搜索其他主题时 我看到很多人将流行的 JS 库放在他们网站的不同地方 例如 在 上插入 JS 库非常开始或开始 部分 在加载任何 JS 代码或 CSS 文件之前 例如 在 上插入 JS 库结束了 部
  • 如何为spark-submit添加资源jar?

    我的spark应用程序依赖于adam 2 11 0 20 0 jar 每次我都必须将我的应用程序与adam 2 11 0 20 0 jar打包为fat jar以提交到spark 例如我的fat jar是myApp1 adam 2 11 0
  • 以编程方式运行 MSBuild

    我正在尝试以编程方式执行 MSBuild 但无法执行以下命令 string command string Format C Windows Microsoft NET Framework v4 0 30319 msbuild exe 0 1
  • 如何使用 Jersey 2 测试框架为此类编写单元测试

    我正在尝试为 Rest api 调用编写单元测试 该调用具有 POST 方法 用于使用 Jersey2 将视频文件添加到基于 Web 的应用程序 这是我的类方法的签名 TemplateController java 我想为其编写单元测试 P
  • Hibernate MSSQL datetime2 映射

    我有一个存储过程 它返回数据库中数据类型为 datetime2 Java 文件中数据类型为 Date 的列 当我尝试对从数据库获取的时间调用 getTime 时 它返回 19994321211 毫秒 相当于 IST 2015 年 5 月 4
  • 如何使用 fwrite 将结构写入文件?

    我对 C 很陌生 并且在使用 fwrite 时遇到了麻烦 我正在寻找使用一个包含两个值的结构 struct keyEncode unsigned short key 2 unsigned short encoded 2 然后我在 main
  • 如何使用 Jax-RS 返回 Java List Json

    我想知道如何让方法返回 List 的 JSON 数组 例如 GET Produces application json public List
  • 使用 SQLAlchemy 表达式时 Dask read_sql_table 出错

    我正在尝试将 SQLAlchemy 表达式与 dask 的 read sql table 结合使用 以获取通过连接和过滤几个不同表创建的数据集 这文档表明这应该是可能的 下面的示例不包含任何联接 因为不需要它们来复制问题 我构建连接字符串