我正在尝试为 S&P 500 ETF 创建一个包含 30 分钟数据的 PostgreSQL 表
(spy30new,用于测试新插入的数据)来自具有 15 分钟数据的多个股票的表(全部 15 个)。 all15 在“dt”(时间戳)和“instr”(股票代码)上有一个索引。我希望spy30new 在“dt”上有一个索引。
import numpy as np
import pandas as pd
from datetime import datetime, date, time, timedelta
from dateutil import parser
from sqlalchemy import create_engine
# Query all15
engine = create_engine('postgresql://user:passwd@localhost:5432/stocks')
new15Df = (pd.read_sql_query("SELECT dt, o, h, l, c, v FROM all15 WHERE (instr = 'SPY') AND (date(dt) BETWEEN '2016-06-27' AND '2016-07-15');", engine)).sort_values('dt')
# Correct for Time Zone.
new15Df['dt'] = (new15Df['dt'].copy()).apply(lambda d: d + timedelta(hours=-4))
# spy0030Df contains the 15-minute data at 00 & 30 minute time points
# spy1545Df contains the 15-minute data at 15 & 45 minute time points
spy0030Df = (new15Df[new15Df['dt'].apply(lambda d: d.minute % 30) == 0]).reset_index(drop=True)
spy1545Df = (new15Df[new15Df['dt'].apply(lambda d: d.minute % 30) == 15]).reset_index(drop=True)
high = pd.concat([spy1545Df['h'], spy0030Df['h']], axis=1).max(axis=1)
low = pd.concat([spy1545Df['l'], spy0030Df['l']], axis=1).min(axis=1)
volume = spy1545Df['v'] + spy0030Df['v']
# spy30Df assembled and pushed to PostgreSQL as table spy30new
spy30Df = pd.concat([spy0030Df['dt'], spy1545Df['o'], high, low, spy0030Df['c'], volume], ignore_index = True, axis=1)
spy30Df.columns = ['d', 'o', 'h', 'l', 'c', 'v']
spy30Df.set_index(['dt'], inplace=True)
spy30Df.to_sql('spy30new', engine, if_exists='append', index_label='dt')
这会出现错误“ValueError:无法将 DatetimeIndex 转换为 dtype datetime64[us]”
到目前为止我已经尝试过的(我已经使用 pandas 成功将 CSV 文件推送到 PG。但这里的源是 PG 数据库):
-
不放置索引'dt'
spy30Df.set_index(['dt'], inplace=True) # Remove this line
spy30Df.to_sql('spy30new', engine, if_exists='append') # Delete the index_label option
-
使用将“dt”从 pandas.tslib.Timestamp 类型转换为 datetime.datetimeto_pydatetime()
(如果 psycopg2 可以与 python dt 一起使用,但不能与 pandas Timestamp 一起使用)
u = (spy0030Df['dt']).tolist()
timesAsPyDt = np.asarray(map((lambda d: d.to_pydatetime()), u))
spy30Df = pd.concat([spy1545Df['o'], high, low, spy0030Df['c'], volume], ignore_index = True, axis=1)
newArray = np.c_[timesAsPyDt, spy30Df.values]
colNames = ['dt', 'o', 'h', 'l', 'c', 'v']
newDf = pd.DataFrame(newArray, columns=colNames)
newDf.set_index(['dt'], inplace=True)
newDf.to_sql('spy30new', engine, if_exists='append', index_label='dt')
-
Using datetime.utcfromtimestamp()
timesAsDt = (spy0030Df['dt']).apply(lambda d: datetime.utcfromtimestamp(d.tolist()/1e9))
-
Using pd.to_datetime()
timesAsDt = pd.to_datetime(spy0030Df['dt'])