在 SQLAlchemy 中映射大量相似的表

2023-11-21

我有许多(~2000)个位置的时间序列数据。每个时间序列都有数百万行。我想将它们存储在 Postgres 数据库中。我当前的方法是为每个位置时间序列建立一个表,以及一个存储每个位置信息(坐标、海拔等)的元表。我正在使用 Python/SQLAlchemy 创建和填充表。我希望在元表和每个时间序列表之间建立关系,以执行诸如“选择在日期 A 和日期 B 之间有数据的所有位置”和“选择日期 A 的所有数据并导出带有坐标的 csv”之类的查询。创建许多具有相同结构(仅名称不同)并与元表有关系的表的最佳方法是什么?或者我应该使用不同的数据库设计?

目前我正在使用这种类型的方法来生成很多类似的映射:

from sqlalchemy import create_engine, MetaData
from sqlalchemy.types import Float, String, DateTime, Integer
from sqlalchemy import Column, ForeignKey
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker, relationship, backref

Base = declarative_base()


def make_timeseries(name):
    class TimeSeries(Base):

        __tablename__ = name
        table_name = Column(String(50), ForeignKey('locations.table_name'))
        datetime = Column(DateTime, primary_key=True)
        value = Column(Float)

        location = relationship('Location', backref=backref('timeseries',
                                lazy='dynamic'))

        def __init__(self, table_name, datetime, value):
            self.table_name = table_name
            self.datetime = datetime
            self.value = value

        def __repr__(self):
            return "{}: {}".format(self.datetime, self.value)

    return TimeSeries


class Location(Base):

    __tablename__ = 'locations'
    id = Column(Integer, primary_key=True)
    table_name = Column(String(50), unique=True)
    lon = Column(Float)
    lat = Column(Float)

if __name__ == '__main__':
    connection_string = 'postgresql://user:pw@localhost/location_test'
    engine = create_engine(connection_string)
    metadata = MetaData(bind=engine)
    Session = sessionmaker(bind=engine)
    session = Session()

    TS1 = make_timeseries('ts1')
    # TS2 = make_timeseries('ts2')   # this breaks because of the foreign key
    Base.metadata.create_all(engine)
    session.add(TS1("ts1", "2001-01-01", 999))
    session.add(TS1("ts1", "2001-01-02", -555))

    qs = session.query(Location).first()
    print qs.timeseries.all()

这种方法有一些问题,最值得注意的是,如果我创建多个TimeSeries外键不起作用。以前我使用过一些解决方法,但这一切似乎都是一个大黑客,我觉得必须有更好的方法来做到这一点。我应该如何组织和访问我的数据?


Alternative-1: Table Partitioning

Partitioning我一读就立即想到exactly相同的表结构。我不是 DBA,也没有太多使用它的生产经验(在 PostgreSQL 上更是如此),但是 请阅读PostgreSQL - Partitioning文档。表分区旨在准确解决您遇到的问题,但超过 1K 的表/分区听起来很有挑战性;因此,请在论坛/SO 上进行更多研究,以了解与此主题相关的可扩展性问题。

鉴于您最常用的两个搜索条件,datetime组件非常重要,因此必须有可靠的索引策略。如果你决定一起去partitioningroot,明显的分区策略将基于日期范围。与最新数据相比,这可能允许您将旧数据划分为不同的块,特别是假设旧数据(几乎从未)更新,因此物理布局将是密集且高效的;而您可以采用另一种策略来获取更多“最新”数据。

Alternative-2: trick SQLAlchemy

这基本上通过欺骗 SA 假设所有这些来使您的示例代码工作TimeSeries are children一个实体使用Concrete Table Inheritance。下面的代码是独立的,创建了 50 个包含最少数据的表。但是,如果您已经有一个数据库,它应该允许您相当快地检查性能,以便您可以在可能性很小的情况下做出决定。

from datetime import date, datetime

from sqlalchemy import create_engine, Column, String, Integer, DateTime, Float, ForeignKey, func
from sqlalchemy.orm import sessionmaker, relationship, configure_mappers, joinedload
from sqlalchemy.ext.declarative import declarative_base, declared_attr
from sqlalchemy.ext.declarative import AbstractConcreteBase, ConcreteBase


engine = create_engine('sqlite:///:memory:', echo=True)
Session = sessionmaker(bind=engine)
session = Session()
Base = declarative_base(engine)


# MODEL
class Location(Base):
    __tablename__ = 'locations'
    id = Column(Integer, primary_key=True)
    table_name = Column(String(50), unique=True)
    lon = Column(Float)
    lat = Column(Float)


class TSBase(AbstractConcreteBase, Base):
    @declared_attr
    def table_name(cls):
        return Column(String(50), ForeignKey('locations.table_name'))


def make_timeseries(name):
    class TimeSeries(TSBase):
        __tablename__ = name
        __mapper_args__ = { 'polymorphic_identity': name, 'concrete':True}

        datetime = Column(DateTime, primary_key=True)
        value = Column(Float)

        def __init__(self, datetime, value, table_name=name ):
            self.table_name = table_name
            self.datetime = datetime
            self.value = value

    return TimeSeries


def _test_model():
    _NUM = 50
    # 0. generate classes for all tables
    TS_list = [make_timeseries('ts{}'.format(1+i)) for i in range(_NUM)]
    TS1, TS2, TS3 = TS_list[:3] # just to have some named ones
    Base.metadata.create_all()
    print('-'*80)

    # 1. configure mappers
    configure_mappers()

    # 2. define relationship
    Location.timeseries = relationship(TSBase, lazy="dynamic")
    print('-'*80)

    # 3. add some test data
    session.add_all([Location(table_name='ts{}'.format(1+i), lat=5+i, lon=1+i*2)
        for i in range(_NUM)])
    session.commit()
    print('-'*80)

    session.add(TS1(datetime(2001,1,1,3), 999))
    session.add(TS1(datetime(2001,1,2,2), 1))
    session.add(TS2(datetime(2001,1,2,8), 33))
    session.add(TS2(datetime(2002,1,2,18,50), -555))
    session.add(TS3(datetime(2005,1,3,3,33), 8))
    session.commit()


    # Query-1: get all timeseries of one Location
    #qs = session.query(Location).first()
    qs = session.query(Location).filter(Location.table_name == "ts1").first()
    print(qs)
    print(qs.timeseries.all())
    assert 2 == len(qs.timeseries.all())
    print('-'*80)


    # Query-2: select all location with data between date-A and date-B
    dateA, dateB = date(2001,1,1), date(2003,12,31)
    qs = (session.query(Location)
            .join(TSBase, Location.timeseries)
            .filter(TSBase.datetime >= dateA)
            .filter(TSBase.datetime <= dateB)
            ).all()
    print(qs)
    assert 2 == len(qs)
    print('-'*80)


    # Query-3: select all data (including coordinates) for date A
    dateA = date(2001,1,1)
    qs = (session.query(Location.lat, Location.lon, TSBase.datetime, TSBase.value)
            .join(TSBase, Location.timeseries)
            .filter(func.date(TSBase.datetime) == dateA)
            ).all()
    print(qs)
    # @note: qs is list of tuples; easy export to CSV
    assert 1 == len(qs)
    print('-'*80)


if __name__ == '__main__':
    _test_model()

Alternative-3: a-la BigData

如果您确实在使用数据库时遇到性能问题,我可能会尝试:

  • 仍然像现在一样将数据保存在单独的表/数据库/模式中
  • 使用数据库引擎提供的“本机”解决方案批量导入数据
  • use MapReduce-like analysis.
    • 在这里,我将继续使用 python 和 sqlalchemy 并实现自己的分布式查询和聚合(或找到现有的东西)。显然,只有当您不需要直接在数据库上生成这些结果时,这才有效。

edit-1: Alternative-4: TimeSeries databases

我没有大规模使用这些的经验,但绝对是一个值得考虑的选择。


如果您稍后能分享您的发现和整个决策过程,那就太好了。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

在 SQLAlchemy 中映射大量相似的表 的相关文章

  • 如何从网页中嵌入的 Tableau 图表中抓取工具提示值

    我试图弄清楚是否有一种方法以及如何使用 python 从网页中的 Tableau 嵌入图形中抓取工具提示值 以下是当用户将鼠标悬停在条形上时带有工具提示的图表示例 我从要从中抓取的原始网页中获取了此网址 https covid19 colo
  • 执行带有 EXCEPTION 的 PostgreSQL 查询会导致两条不同的错误消息

    我有一个 PostgreSQL 查询 其中包含事务和列重复时的异常 BEGIN ALTER TABLE public cars ADD COLUMN top speed text EXCEPTION WHEN duplicate colum
  • Spark KMeans 无法处理大数据吗?

    KMeans 有几个参数training http spark apache org docs latest api python pyspark mllib html highlight kmeans pyspark mllib clus
  • 使用 Tkinter 显示 numpy 数组中的图像

    我对 Python 缺乏经验 第一次使用 Tkinter 制作一个 UI 显示我的数字分类程序与 mnist 数据集的结果 当图像来自 numpy 数组而不是我的 PC 上的文件路径时 我有一个关于在 Tkinter 中显示图像的问题 我为
  • Python pickle:腌制对象不等于源对象

    我认为这是预期的行为 但想检查一下 也许找出原因 因为我所做的研究结果是空白 我有一个函数可以提取数据 创建自定义类的新实例 然后将其附加到列表中 该类仅包含变量 然后 我使用协议 2 作为二进制文件将该列表腌制到文件中 稍后我重新运行脚本
  • Python 函数可以从作用域之外赋予新属性吗?

    我不知道你可以这样做 def tom print tom s locals locals def dick z print z name z name z guest Harry print z guest z guest print di
  • 添加不同形状的 numpy 数组

    我想添加两个不同形状的 numpy 数组 但不进行广播 而是将 缺失 值视为零 可能最简单的例子是 1 2 3 2 gt 3 2 3 or 1 2 3 2 1 gt 3 2 3 1 0 0 我事先不知道形状 我正在弄乱每个 np shape
  • 如何在ipywidget按钮中显示全文?

    我正在创建一个ipywidget带有一些文本的按钮 但按钮中未显示全文 我使用的代码如下 import ipywidgets as widgets from IPython display import display button wid
  • 在 postgres 查询中使用列表

    我有一个动态列表 list a b c d 所以长度可能会改变 我想在查询中比较这些列表值 select from student where name in all the list values 我想将列表值传递到此查询中 我怎样才能做
  • SQL不允许表中有重复记录

    如何使其不添加重复项 我想让它通过 ID 之外的所有其他列进行检查 我希望这个无效 ID col1 col2 col3 1 first middle last ID col1 col2 col3 2 first middle last 我希
  • python获取上传/下载速度

    我想在我的计算机上监控上传和下载速度 一个名为 conky 的程序已经在 conky conf 中执行了以下操作 Connection quality alignr wireless link qual perc wlan0 downspe
  • 如何在Python中对类别进行加权随机抽样

    给定一个元组列表 其中每个元组都包含一个概率和一个项目 我想根据其概率对项目进行采样 例如 给出列表 3 a 4 b 3 c 我想在 40 的时间内对 b 进行采样 在 python 中执行此操作的规范方法是什么 我查看了 random 模
  • 向 Altair 图表添加背景实心填充

    I like Altair a lot for making graphs in Python As a tribute I wanted to regenerate the Economist graph s in Mistakes we
  • 使用 Python 绘制 2D 核密度估计

    I would like to plot a 2D kernel density estimation I find the seaborn package very useful here However after searching
  • 使用其构造函数初始化 OrderedDict 以便保留初始数据的顺序的正确方法?

    初始化有序字典 OD 以使其保留初始数据的顺序的正确方法是什么 from collections import OrderedDict Obviously wrong because regular dict loses order d O
  • 使用 Python 的 matplotlib 选择在屏幕上显示哪些图形以及将哪些图形保存到文件中

    我想用Python创建不同的图形matplotlib pyplot 然后 我想将其中一些保存到文件中 而另一些则应使用show 命令 然而 show 显示all创建的数字 我可以通过调用来避免这种情况close 创建我不想在屏幕上显示的绘图
  • 从列表指向字典变量

    假设你有一个清单 a 3 4 1 我想用这些信息来指向字典 b 3 4 1 现在 我需要的是一个常规 看到该值后 在 b 的位置内读写一个值 我不喜欢复制变量 我想直接改变变量b的内容 假设b是一个嵌套字典 你可以这样做 reduce di
  • 在 Oracle 行的多个列上使用透视

    我在 Oracle 表中有以下示例数据 tab1 我正在尝试将行转换为列 我知道如何在某一列上使用 Oracle 数据透视表 但是否可以将其应用于多个列 样本数据 Type weight height A 50 10 A 60 12 B 4
  • 如何将输入读取为数字?

    这个问题的答案是社区努力 help privileges edit community wiki 编辑现有答案以改进这篇文章 目前不接受新的答案或互动 Why are x and y下面的代码中使用字符串而不是整数 注意 在Python 2
  • Statsmodels.formula.api OLS不显示截距的统计值

    我正在运行以下源代码 import statsmodels formula api as sm Add one column of ones for the intercept term X np append arr np ones 50

随机推荐