如何从 PySpark 中某个表中找到的多个表中获取所有数据?

2024-05-07

我正在使用 pyspark/SQL。我有一个包含三列的表(MAIN_TABLE):

DATABASE_NAME
TABLE_NAME
SOURCE_TYPE

我想从 DATABASE_NAME 和 TABLE_NAME 列中的主表下找到的实际数据库和表中获取所有数据。但是,我只想从具有 SOURCE_TYPE = 'STANDARD' 的表中抓取数据,其他任何内容都不应该抓取。

我基本上需要在 MAIN_TABLE 下找到的所有表的数据的并集,其中 SOURCE_TYPE = 'STANDARD' 并且它们满足某些条件。我尝试运行,但它没有抓取 MAIN_TABLE 下具有 SOURCE_TYPE = 'STANDARD' 的所有表下找到的数据。我看起来好像缺少什么吗?

from pyspark.sql import SparkSession
from pyspark.sql.functions import lit

# Create a SparkSession
spark = SparkSession.builder.appName("InsertData").getOrCreate()

# Filter the tables where SOURCE_TYPE = 'STANDARD'
config_df = spark.table("MAIN_TABLE").filter("SOURCE_TYPE = 'STANDARD'")

# Initialize an empty DataFrame to store the result
result_df = None

# Loop through the filtered tables
for row in config_df.collect():
    database_name = row["database_name"]
    table_name = row["table_name"]

    # Generate a dynamic SQL query to select data from the source table
    sql_query = f"""
        SELECT
            header.profile,
            attributes.id,
            header.location,
            'SOURCE_TYPE' as source_type,
            header.actionname as actionname,
            transform.date,
            header.ip,
            header.country,
            '{database_name}' as source_database_name,
            '{table_name}' as source_event_name
        FROM {database_name}.{table_name}
        """

    # Execute the SQL query and create a DataFrame
    source_data_df = spark.sql(sql_query)

    # Union the source_data_df with the result_df
    if result_df is None:
        result_df = source_data_df
    else:
        result_df = result_df.unionAll(source_data_df)

# Insert the combined data into MAIN.NEW_RESULTED_TABLE
result_df.write.mode("append").saveAsTable("MAIN.NEW_RESULTED_TABLE")

# Stop the SparkSession
spark.stop()

为了从 SOURCE_TYPE = 'ACTUAL' 的所有表中获取所有数据,我是否有什么做得不对的地方?


None

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

如何从 PySpark 中某个表中找到的多个表中获取所有数据? 的相关文章

  • 如何 md5 所有列(无论类型如何)

    我想创建一个 sql 查询 或 plpgsql 它将 md5 所有给定的行 无论类型如何 但是 在下面 如果 1 为空 则哈希为空 UPDATE thetable SET hash md5 accountid accounttype cre
  • 为表中的每个组选择前 N 行

    我面临一个非常常见的问题 即 为表中的每个组选择前 N 行 考虑一个表id name hair colour score列 我想要一个结果集 对于每种头发颜色 都能得到前 3 名得分手的名字 为了解决这个问题 我得到了我所需要的Rick O
  • 执行带有 EXCEPTION 的 PostgreSQL 查询会导致两条不同的错误消息

    我有一个 PostgreSQL 查询 其中包含事务和列重复时的异常 BEGIN ALTER TABLE public cars ADD COLUMN top speed text EXCEPTION WHEN duplicate colum
  • 将表数据从一个 SQL Server 导出到另一台 SQL Server

    我有两个 SQL Server 都是 2005 版本 我想将多个表从一个表迁移到另一个表 我努力了 在源服务器上 我右键单击数据库 选择Tasks Generate scripts 问题是在下面Table View options没有Scr
  • 如何获得顶部带有千位分隔符的数字?

    SELECT count FROM table A 假设结果是8689 我怎样才能将它转换为8 689在 SQL Server 上 尝试这样 select replace convert varchar convert Money coun
  • POINT 列上的 MySQL INSERT/UPDATE

    我正在尝试用我国家的地理位置填充我的数据库 我的一张表有 4 个字段 ID PK 纬度 经度和地理点 EDIT SCDBs Punto Geografico SET lat 18 469692 SET lon 63 93212 SET g
  • SQL 错误:“没有这样的表”

    我试图解决为什么我的代码为所有查询返回 null 的原因 最后发现 sql 查询什么也没有返回 我使用简约代码创建了一个新的 AIR 文档 s WindowedApplication
  • 给定“java.sql.SQLIntegrityConstraintViolationException”是否可以确定错误的列

    鉴于我有一个类型为 java sql SQLIntegrityConstraintViolationException 的异常 是否可以以编程方式确定错误的列 或多列 我问这个问题是因为我想将错误映射回客户端的数据模型以指示错误的字段 例如
  • 如何查找当前数据库类型

    我们有一个 SQL 脚本可以在多种类型的数据库上执行 是否可以获取正在执行 SQL 脚本的当前数据库的类型 注意 我们不能使用非标准 SQL 即 TSQL 等 不 ANSI SQL 中没有任何关于确定数据库供应商的内容
  • 在 MySQL 中对整数字段运行带引号的数字(字符串)查询时会发生哪些复杂情况

    在 SQL 中 不应引用整数 因为如果引用 它将是一个字符串 但我很好奇如果我这样做会出现什么问题 并发症 例如 SELECT FROM table WHERE id 1 正确的 vs SELECT FROM table WHERE id
  • Pyspark:相当于 np.where [重复]

    这个问题在这里已经有答案了 这个操作在 Pyspark 中相当于什么 import pandas as pd import numpy as np df pd DataFrame Type list ABBC Set list ZZXY d
  • SQL Join 列上类似于另一列[重复]

    这个问题在这里已经有答案了 可能的重复 mysql连接查询使用like https stackoverflow com questions 1930809 mysql join query using like 我想要进行连接 其中一列包含
  • Postgresql 一张表的多个计数

    我想从表中的两列中获得这些列中值的统一计数 例如 两列是 表 报告 type place one home two school three work four cafe five friends six mall one work one
  • Oracle:如何获取刚刚插入的行的序列号?

    如何获取刚刚插入的行的序列号 插入 返回 declare s2 number begin insert into seqtest2 x values aaa returning seq into s2 dbms output put lin
  • 通过 osql.exe 运行脚本时出现问题

    我尝试以这种格式运行我的软件的更新脚本 osql exe i path to script U 用户 P 密码 S sqlserver 位置 d 数据库名称 n b 大多数脚本的格式相同 并且都以 GO 结尾 其中很多都运行得很好 但随机脚
  • 加密数据库字段的好方法?

    我被要求加密数据库中的各种数据库字段 问题是这些字段在读取后需要解密 我在用着Django and SQL Server 2005 有什么好主意吗 See 在 SQL Server 2005 数据库中使用对称加密 https web arc
  • 如何搜索例程的内容/(SP-触发函数)

    我需要在数据库内所有例程的例程主体 存储过程 函数 触发器 中搜索文本 我该怎么做 Thanks SELECT OBJECT NAME object id FROM sys sql modules WHERE definition LIKE
  • 实现软删除的最佳方法是什么?

    目前在做一个项目 我们要对大部分用户 用户角色 实现软删除 我们决定添加一个is deleted 0 数据库中每个表的字段并将其设置为 1 如果特定用户角色点击特定记录上的删除按钮 现在为了将来的维护 每个SELECT查询需要确保它们不包含
  • Spark 数据帧:根据另一列的值提取一列

    我有一个包含带有连接价目表的交易的数据框 paid currency EUR USD GBP 49 5 EUR 99 79 69 客户已支付 49 5 欧元 如 货币 列中所示 我现在想将支付的价格与价目表中的价格进行比较 因此 我需要根据
  • 在存储过程中验证用户的最简单方法?

    我需要一个存储过程 可以通过发送以下内容来检查登录尝试时他们是否是有效用户login and password查看它们在数据库中是否匹配 有没有一种简单的方法可以做到这一点 如果没有更多信息 我目前能提供的最好信息是 CREATE STOR

随机推荐

  • 防止被 0 除的 Typescript 类型

    我正在使用打字稿创建一个用于培训目的的计算系统 但在除法过程中出现打字错误 您知道如何解决吗 type Variable value number resolve gt number type NoZeroVariable value Om
  • 从其他模块调用的数组扩展

    其他模块 例如 XCTest 项目 无法使用数组扩展方法 为了简单起见 下面的代码什么也不做 但可以用来重现错误 import Foundation extension Array mutating func myMethod toInde
  • 将选择标准添加到 read.table

    让我们采用以下我导入的数据集的简化版本read table a lt as data frame c M M F F F b lt as data frame c 25 22 33 17 18 df lt cbind a b colname
  • 在Python中获取目录基名的优雅方法?

    我有几个脚本将目录名称作为输入 并且我的程序在这些目录中创建文件 有时我想获取给程序的目录的基本名称 并用它在目录中创建各种文件 例如 directory name given by user via command line output
  • 如何在S3中存储数据并允许用户使用rails API / iOS客户端以安全的方式访问?

    我是编写 Rails 和 API 的新手 我需要一些有关 S3 存储解决方案的帮助 这是我的问题 我正在为 iOS 应用程序编写一个 API 用户在 iOS 上使用 Facebook API 登录 服务器根据 Facebook 向 iOS
  • 调用泛型类型的方法?

    为什么下面的代码在 Delphi XE 中会产生错误 unit UTest interface type TTest class public procedure Foo
  • 设置角度组件的完整高度

    我无法让我的列表成为全高 我的代码由于嵌套组件而更加复杂 但我仍然可以使用此代码来复制它 这是一个笨蛋 http plnkr co edit R0QgLz8cjyRHYOLf4uJW http plnkr co edit R0QgLz8cj
  • 在散景中隐藏轴

    如何在散景图中隐藏 x 轴和 y 轴 我已经根据此进行了检查和尝试 p1 figure visible None p1 select type Axis visible 0 xaxis Axis plot p1 visible 0 和喜欢h
  • 您可以使用 Openpyxl 将全名拆分为名字和姓氏吗?

    我有一个 Excel 文件 我一直在尝试使用 openpyxl 将列 全名 拆分为两个单独的名字和姓氏列 例如 我有 from openpyxl import Workbook load workbook wb load workboo p
  • 在 Swift 中使用 enumeratorAtUrl 从 NSFileManager 返回目录枚举器时出现问题

    我试图从 NSFileManager 方法 enumeratorAtUrl 返回 NSDirectoryEnumerator 对象 这导致编译器错误 Cannot convert the expressions type NSDirecto
  • 就地修改 XML 文件?

    假设我有以下 XML 文件
  • 如何使用 gcloud 凭据对 Dialogflow API 进行身份验证

    我有一个 Node JS 应用程序 可以向 Dialogflow 代理发出请求 我实际上使用基于临时令牌的请求 但是我如何更改它以通过谷歌服务凭证来做到这一点 https cloud google com docs authenticati
  • 使用 Azure AD B2C 登录 Xamarin Android 应用

    经过一周的研究可与 Azure AD B2C 一起使用 Xamarin 以 Android 平台 而不是 Xamarin Forms 为目标的身份验证原理后 我终于寻求一些建议 我有一个带有 登录 按钮的活动 我想通过按钮的触摸事件登录到
  • 蓝牙 LE:地址类型

    我正在研究 iBeacon 技术 但我找不到有关地址类型的特定问题的任何答案 我找到了解释地址类型的文档 蓝牙规范 但我似乎找不到如何在两种类型 公共和随机 之间进行选择 这是我发现它的一个例子 它是由 Raspberry PI 上的 iB
  • React Native:如何在组件中添加脚本标签

    我正在尝试在 React Native 应用程序的组件内添加标签 下面是我的代码 它似乎不起作用 谁能告诉我如何解决这个问题 import React Component from react import PropTypes from p
  • Tensorflow无法分配设备进行操作

    我正在尝试跑步NVidia 脸部生成器演示 https github com tkarras progressive growing of gans在我的电脑上 我使用的是 Windows 10 我已经下载了源代码 并尝试按照页面下方的步骤
  • WPF DataGrid DataBindingComplete 事件在哪里?

    数据绑定完成后 我需要采取一些操作 例如 根据其他一些单元格使某些单元格只读 在WinForm DataGridView中 我曾经在DataBindingComplete事件中执行此操作 但是 我在 WPF DataGrid 中找不到这样的
  • CouchDB 视图中的链接文档

    我很难理解 CouchDB链接文档 http wiki apache org couchdb Introduction to CouchDB views Linked documents特征 我有两个types存储在单个 CouchDB 数
  • asp.net mvc 3 中模糊的远程属性验证

    asp net mvc 3 中的内置远程属性会执行 onchange 验证 我希望它在模糊时验证 有没有办法自定义它 或者还有其他东西可以这样做 我确信这是一个非常普遍的需求 你可以设置默认值 http docs jquery com Pl
  • 如何从 PySpark 中某个表中找到的多个表中获取所有数据?

    我正在使用 pyspark SQL 我有一个包含三列的表 MAIN TABLE DATABASE NAME TABLE NAME SOURCE TYPE 我想从 DATABASE NAME 和 TABLE NAME 列中的主表下找到的实际数