python大数据分析代码案例

2023-11-13



#查询用户余额代码案例


import sys

import MySQLdb

import pandas as pd


optmap = {

                'dbuser' : 'aduser',

                'dbpass' : '123654',

                'dbhost' : '192.168.10.14',

                'dbport' : 3306,

                'dbname' : 'HBAODB'

                 }


def sql_select(reqsql):

    try:

        db_conn = MySQLdb.connect(user=optmap['dbuser'], passwd=optmap['dbpass'], host=optmap['dbhost'], port=optmap['dbport'], db=optmap['dbname'])

        db_cursor=db_conn.cursor()

        db_conn.query("use %s"%optmap['dbname'])

        count = db_cursor.execute(reqsql)

        ret = db_cursor.fetchall()


        db_cursor.close()

        db_conn.close

        return ret

    except MySQLdb.Error,e:

        print "Mysql ERROR %d:%s"  %(e.args[0], e.args[1])

    return ''

    

def getusercoin(userid):

    i = int(userid) % 10

    reqsql = "select ID,COINREFER from CHARCOIN%u where ID=%u" % (int(i), int(userid))

    #print reqsql

    ret = sql_select(reqsql) #调用前面的函数

    #print ret

    return ret[0]

    

def getall(userlist):

    userdata = pd.DataFrame(columns=('userid', 'coin'))

    index = 0

    for userid in userlist:

        coins = getusercoin(userid) #调用前面的函数

        #print coins[0],coins[1]/100.0

        if coins[0] is not None:

            userdata.loc[index] = (str(coins[0]), coins[1]/100.0)

        else:

            userdata.loc[index] = (str(userid), 0)

        index += 1

        #print userdata.tail(10)

        

    df = spark.createDataFrame(userdata)

    #df.createOrReplaceTempView('userdata')

    df.show(50)

   




#用户消费查询代码案例


import sys

import MySQLdb

import pandas as pd

import datetime

import time


optmap = {

                'dbuser' : 'aduser',

                'dbpass' : '123654',

                'dbhost' : '192.168.10.12',

                'dbport' : 3306,

                'dbname' : 'JIESUANDB'

                 }


def sql_select(reqsql):

    try:

        db_conn = MySQLdb.connect(user=optmap['dbuser'], passwd=optmap['dbpass'], host=optmap['dbhost'], port=optmap['dbport'], db=optmap['dbname'])

        db_cursor=db_conn.cursor()

        db_conn.query("use %s"%optmap['dbname'])

        count = db_cursor.execute(reqsql)

        ret = db_cursor.fetchall()


        db_cursor.close()

        db_conn.close

        return ret

    except MySQLdb.Error,e:

        print "Mysql ERROR %d:%s"  %(e.args[0], e.args[1])

    return ''

    

#用户人民币消费

def getuserconsume(userid, startday): #定义带参函数

    strdate = startday.strftime("%y%m%d")

    # 送礼物 +  守护 +  点歌 +  表情贴

    reqsql = "select CONSUMERID,SUM(DUBIOPNUM) from `DUBIJIESUANTONGJI_%s` where CONSUMERID=%u AND DBIOPTYPE=2 AND (OPTYPE=1 OR OPTYPE=4 OR OPTYPE=17 OR OPTYPE=25)" % (strdate, int(userid))

    print reqsql

    ret = sql_select(reqsql) #调用前面的函数

    #print ret

    if ret[0][0] is not None:

        return float(ret[0][1])/100.0

    else:

        return 0

        

#用户充值

def getusercharge(userid, startday):

    strdate = startday.strftime("%y%m%d")

    reqsql = "select CONSUMERID,SUM(DUBIOPNUM) from `DUBIJIESUANTONGJI_%s` where CONSUMERID=%u AND DUBIOPTYPE=1 AND (OPTYPE=1016 OR OPTYPE=1020 OR OPTYPE=1021)" % (strdate, int(userid))

    print reqsql

    ret = sql_select(reqsql)#调用前面的函数

    print ret

    if ret[0][0] is not None:

        return float(ret[0][1])/100.0

    else:

        return 0

    

#用户当天结余人民币

def getusercurcoin(userid, startday):

    strdate = startday.strftime("%y%m%d")

    reqsql = "select CONSUMERID,CURRENTNUM from `DUBIJIESUANTONGJI_%s` where CONSUMERID=%u ORDER BY OPTIME DESC LIMIT 1" % (strdate, int(userid))

    print reqsql

    ret = sql_select(reqsql)

    print ret

    if ret:

        return float(ret[0][1])/100.0

    else:

        return 0

        

def getconsume():

    startdate = datetime.date(2017, 1, 1)

    enddate = datetime.date(2017, 2, 2)

    userid = 3101011990

    

    userdata = pd.DataFrame(columns=('date','userid','charge', 'consume', 'dayleftcoin'))


    index = 0

    

    # 计算日差

    td = enddate - startdate

    datelen = td.days + 1

    #print datelen

    delta = datetime.timedelta(days=1)

    allcoins = 0 

    for i in range(0,datelen):

        startday = startdate + delta * i

        consume_coin = getuserconsume(userid, startday)#调用前面的函数

        charge = getusercharge(userid, startday)#调用前面的函数

        dayleftcoin = getusercurcoin(userid, startday)#调用前面的函数

        

        

        userdata.loc[index] = (startday.strftime("%Y-%m-%d"),str(userid), charge, consume_coin, dayleftcoin)

        index += 1

        

    #userdata.loc[index] = ('total',str(userid), allcoins, 0)

    print userdata.tail(100)

    return

    

getconsume()






#查询用户机器ID 代码案例


import sys

import MySQLdb

import pandas as pd

import datetime


optmap = {

                'dbuser' : 'aduser',

                'dbpass' : '123654',

                'dbhost' : '192.168.10.15',

                'dbport' : 3306,

                'dbname' : 'JIQIDB'

                 }


def sql_select(reqsql):

    try:

        db_conn = MySQLdb.connect(user=optmap['dbuser'], passwd=optmap['dbpass'], host=optmap['dbhost'], port=optmap['dbport'], db=optmap['dbname'])

        db_cursor=db_conn.cursor()

        db_conn.query("use %s"%optmap['dbname'])

        count = db_cursor.execute(reqsql)

        ret = db_cursor.fetchall()


        db_cursor.close()

        db_conn.close

        return ret

    except MySQLdb.Error,e:

        print "Mysql ERROR %d:%s"  %(e.args[0], e.args[1])

    return ''

    

def getusermid(userid, months):

    i = int(userid) % 50

    reqsql = "select USERID,MACHINEID from LOGINHISTORY%s%u where USERID=%u group by MACHINEID" % (months,int(i), int(userid))

    print reqsql

    ret = sql_select(reqsql)

    #print ret

    #print ret[0]

    return ret

    

def getall(userlist):

    today = datetime.date.today()

    months = today.strftime("%Y%m")

    userdata = pd.DataFrame(columns=('USERID', 'MACHINEID'))

    index = 0

    for userid in userlist:

        coins = getusermid(userid, months)

        for i in range(len(coins)):

            #print coins[i]

            userdata.loc[index] = (str(coins[i][0]), str(coins[i][1]))

            index += 1

        

        #print coins[0],coins[1]/100.0

        #userdata.loc[index] = (str(coins[0]), coins[1]/100.0)

        #index += 1

        #print userdata.tail(10)

        

    df = spark.createDataFrame(userdata)

    #df.createOrReplaceTempView('userdata')

    df.show(1000)





#人民币统计代码案例

from pyspark.sql import Row

from pyspark.sql.types import *

from pyspark.sql.functions import udf

import MySQLdb

import mysql_op

import datetime

import time

from mysql_op import MySQL

import pandas as pd

import numpy as np

from fastparquet import ParquetFile

from fastparquet import write


def fromDayToDay(startdate, datelen, func):

    delta = datetime.timedelta(days=1)

    for i in range(0,datelen):

        startday = startdate + delta * i

        endday = startdate + delta * (i + 1)

        func(startday, endday)

    return

def fromDayToEndDay(startdate, datelen, endday, func):

    delta = datetime.timedelta(days=1)

    for i in range(0,datelen):

        startday = startdate + delta * i

        #endday = startdate + delta * (i + 1)

        func(startday, endday)

    return


# 获取人民币数据

def saveDayPackageData(startday, endday):

    #数据库连接参数  

    dbconfig = {'host':'192.168.10.12',

                        'port': 3306,

                        'user':'user',

                        'passwd':'123654',

                        'db':'JIESUANDB',

                        'charset':'utf8'}


    #连接数据库,创建这个类的实例

    mysql_cn= MySQLdb.connect(host=dbconfig['host'], port=dbconfig['port'],user=dbconfig['user'], passwd=dbconfig['passwd'], db=dbconfig['db'])

    strday = startday.strftime("%Y%m%d")

    tsstart=time.mktime(startday.timetuple())

    tsend=time.mktime(endday.timetuple())

    strdate = startday.strftime("%y%m%d")


    sql = "SELECT OPTIME,CONSUMERID,PRESERVENUM,CURRENTNUM,DUBIOPTYPE,DUBIOPNUM,OPTYPE,OPDETAIL,OPNUM FROM `DUBIJIESUANTONGJI_%s`" % (strdate)

    print sql

    pddf = pd.read_sql(sql, con=mysql_cn)

    mysql_cn.close()

    print pddf.head(5)

    dflen = len(pddf.index)

    if dflen > 0:

        print pddf.describe()

        write("/home/haoren/logstatis/billdata"+strday+".parq", pddf)

    return


def savePackageData():

    startday = datetime.date(2016, 12, 28)

    endday = datetime.date(2016, 12, 28)

    td = endday - startday

    datelen = td.days + 1

    # 获取包裹数据

    fromDayToDay(startday, datelen, saveDayPackageData)

    

# 获取WF册数据

def saveDayWifiPhoneRegData(startday, endday):

    #数据库连接参数  

    dbconfig = {'host':'192.168.10.15',

                        'port': 3306,

                        'user':'user',

                        'passwd':'123654',

                        'db':'AADB',

                        'charset':'utf8'}


    #连接数据库,创建这个类的实例

    mysql_cn= MySQLdb.connect(host=dbconfig['host'], port=dbconfig['port'],user=dbconfig['user'], passwd=dbconfig['passwd'], db=dbconfig['db'])

    strday = startday.strftime("%Y%m%d")

    tsstart=time.mktime(startday.timetuple())

    tsend=time.mktime(endday.timetuple())

    strdate = startday.strftime("%y%m%d")


    sql = "select USERID from NEW_WEB_USER where TIME< %d AND TYPE=17" % (tsend)

    print sql

    pddf = pd.read_sql(sql, con=mysql_cn)

    mysql_cn.close()

    print pddf.head(5)

    dflen = len(pddf.index)

    if dflen > 0:

        print pddf.describe()

        write("/home/haoren/logstatis/wifiphonereg"+strday+".parq", pddf)

    return


def saveWifiPhoneReg():

    startday = datetime.date(2016, 12, 1)

    endday = datetime.date(2016, 12, 1)

    td = endday - startday

    datelen = td.days + 1

    # 获取包裹数据

    fromDayToDay(startday, datelen, saveDayWifiPhoneRegData)

    

OPTypeName = {

    0:"会员",

    1:"道具",


}


OpDetailName19 = {

    1:"购物保存收益",

    2:"下注和返注",

    3:"发红包",

    4:"抢红包",


}


OpDetailName22 = {

    1:"活动1收益到总账号",

    2:"活动2收益到总账号",

    3:"活动3收益到总账号",


}


OpDetailName23 = {

    0:"购买会员",

    1:"购买道具",

    2:"扫雷",


}


def getOpTypeName(func):

    name = OPTypeName.get(func)

    if name == None:

        return ""

    else:

        return name.decode('utf8')

    

def getOpDetailName(func, detail):

    if func == 19:

        if detail > 10000 and detail < 30000:

            return "包裹回滚".decode('utf8')

        elif detail > 50000 and detail < 60000:

            return "红包接龙".decode('utf8')

        else:

            name = OpDetailName19.get(detail)

            if name == None:

                return ""

            else:

                return name.decode('utf8')

    elif func == 22:

            name = OpDetailName22.get(detail)

            if name == None:

                return ""

            else:

                return name.decode('utf8')

    elif func == 23:

            name = OpDetailName23.get(detail)

            if name == None:

                return ""

            else:

                return name.decode('utf8')

    else:

        return ""


def getDayPackageData(startday, endday):

    strday = startday.strftime("%Y%m%d")

    print strday + '人民币数据'

    df = spark.read.load("/home/haoren/logstatis/billdata"+strday+".parq")

    df.show(10)

    #df.createOrReplaceTempView('billdata')

    #df.registerTempTable("billdata")

    #sqlret = sqlc.sql("SELECT count(*) from billdata")

    #sqlret.show(1)

    df2 = df.withColumn('OPTYPENAME', udf(getOpTypeName)(df.OPTYPE))

    df2.show(10)

    df = df2.withColumn('DETAILNAME', udf(getOpDetailName)(df2.OPTYPE, df2.OPDETAIL))

    df.show(10)

    df.createOrReplaceTempView('billdata')

    return

    

def getPackageData():

    startday = datetime.date(2016, 12, 28)

    endday = datetime.date(2016, 12, 28)

    td = endday - startday

    datelen = td.days + 1

    # 获取包裹数据

    fromDayToDay(startday, datelen, getDayPackageData)#调用前面的函数

    print 'getPackageData finish'


# 获取充值数据

def getChargeInfo(startday, endday):

    #数据库连接参数  

    dbconfig = {'host':'192.168.10.14', 

     'port': 3306, 

     'user':'user', 

     'passwd':'123654', 

     'db':'BAOIMDB', 

     'charset':'utf8'}

    

    #连接数据库,创建这个类的实例

    mysql_cn= MySQLdb.connect(host=dbconfig['host'], port=dbconfig['port'],user=dbconfig['user'], passwd=dbconfig['passwd'], db=dbconfig['db'])

    strday = startday.strftime("%Y%m%d")

    tsstart=time.mktime(startday.timetuple())

    tsend=time.mktime(endday.timetuple())

    regdata = pd.DataFrame()

    for i in range(0, 20): 

        sql = "SELECT * FROM `USERCONSUMPTIONRECORD%d` where TIME > %d AND TIME < %d" % (i, tsstart, tsend)

        print sql

        #pddf = pd.DataFrame()

        pddf = pd.read_sql(sql, con=mysql_cn)

        #print pddf.head(5)

        if len(pddf.index) > 0:

            regdata = regdata.append(pddf,ignore_index=True)

            print regdata.tail(5)

    

    if len(regdata.index) > 0:

        print regdata.describe()

        write("/home/haoren/logstatis/register"+strday+".parq", regdata)

    mysql_cn.close()

    return

    

def pudf(x):

    return getOpTypeName(x.OPTYPE)

    

def getMergeData(strday):

    dfbill = ParquetFile("/home/haoren/logstatis/billdata"+strday+".parq").to_pandas()

    dfwifireg = ParquetFile("/home/haoren/logstatis/wifiphonereg"+strday+".parq").to_pandas()

    tempdf = pd.merge(dfbill, dfwifireg, left_on='CONSUMERID', right_on='USERID')

    #write("/home/haoren/logstatis/analyze"+strday+".parq", tempdf)

    #print tempdf.head(10)

    tempdf['OPTYPENAME'] = tempdf.apply(lambda x:getOpTypeName(x.OPTYPE), axis=1)

    #print tempdf.head(10)

    tempdf['DETAILNAME'] = tempdf.apply(lambda x:getOpDetailName(x.OPTYPE,x.OPDETAIL), axis=1)

    df = spark.createDataFrame(tempdf)

    df.show(10)

    return df

    

def analyzeDayBillData(startday, endday):

    strday = startday.strftime("%Y%m%d")

    print strday + '人民币数据'


    df = spark.read.load("/home/haoren/logstatis/billdata"+strday+".parq")

    dfwifireg = spark.read.load("/home/haoren/logstatis/wifiphonereg"+strday+".parq")

    df3 = df.join(dfwifireg, df.CONSUMERID == dfwifireg.USERID)

    df3.show(10)

    df3.write.parquet("/home/haoren/logstatis/analyze"+strday+".parq")

    

    #df2 = df3.withColumn('OPTYPENAME', udf(getOpTypeName)(df3.OPTYPE))

    #df2.show(10)

    #df = df2.withColumn('DETAILNAME', udf(getOpDetailName)(df2.OPTYPE, df2.OPDETAIL))

    #df.show(10)

    #df.createOrReplaceTempView('analyzebilldata')

    return

    

def analyzeDayBillData2(startday, endday):

    strday = startday.strftime("%Y%m%d")

    print strday + '人民币数据'

    #df = spark.read.load("/home/haoren/logstatis/analyze"+strday+".parq")

    df = getMergeData(strday)

    return

    df2 = df.withColumn('OPTYPENAME', udf(getOpTypeName)(df.OPTYPE))

    df2.show(10)

    df = df2.withColumn('DETAILNAME', udf(getOpDetailName)(df2.OPTYPE, df2.OPDETAIL))

    df.show(10)

    df.createOrReplaceTempView('analyzebilldata')

    return

    

def analyzeBillData():

    startday = datetime.date(2016, 12, 28)

    endday = datetime.date(2016, 12, 28)

    td = endday - startday

    datelen = td.days + 1

    # 获取包裹数据

    fromDayToDay(startday, datelen, analyzeDayBillData2)#调用前面的函数

    print 'analyzeBillData finish'

    

savePackageData()

getPackageData()

#saveWifiPhoneReg()

#analyzeBillData()

#查询用户余额代码案例


import sys

import MySQLdb

import pandas as pd


optmap = {

                'dbuser' : 'aduser',

                'dbpass' : '123654',

                'dbhost' : '192.168.10.14',

                'dbport' : 3306,

                'dbname' : 'HBAODB'

                 }


def sql_select(reqsql):

    try:

        db_conn = MySQLdb.connect(user=optmap['dbuser'], passwd=optmap['dbpass'], host=optmap['dbhost'], port=optmap['dbport'], db=optmap['dbname'])

        db_cursor=db_conn.cursor()

        db_conn.query("use %s"%optmap['dbname'])

        count = db_cursor.execute(reqsql)

        ret = db_cursor.fetchall()


        db_cursor.close()

        db_conn.close

        return ret

    except MySQLdb.Error,e:

        print "Mysql ERROR %d:%s"  %(e.args[0], e.args[1])

    return ''

    

def getusercoin(userid):

    i = int(userid) % 10

    reqsql = "select ID,COINREFER from CHARCOIN%u where ID=%u" % (int(i), int(userid))

    #print reqsql

    ret = sql_select(reqsql) #调用前面的函数

    #print ret

    return ret[0]

    

def getall(userlist):

    userdata = pd.DataFrame(columns=('userid', 'coin'))

    index = 0

    for userid in userlist:

        coins = getusercoin(userid) #调用前面的函数

        #print coins[0],coins[1]/100.0

        if coins[0] is not None:

            userdata.loc[index] = (str(coins[0]), coins[1]/100.0)

        else:

            userdata.loc[index] = (str(userid), 0)

        index += 1

        #print userdata.tail(10)

        

    df = spark.createDataFrame(userdata)

    #df.createOrReplaceTempView('userdata')

    df.show(50)

   




#用户消费查询代码案例


import sys

import MySQLdb

import pandas as pd

import datetime

import time


optmap = {

                'dbuser' : 'aduser',

                'dbpass' : '123654',

                'dbhost' : '192.168.10.12',

                'dbport' : 3306,

                'dbname' : 'JIESUANDB'

                 }


def sql_select(reqsql):

    try:

        db_conn = MySQLdb.connect(user=optmap['dbuser'], passwd=optmap['dbpass'], host=optmap['dbhost'], port=optmap['dbport'], db=optmap['dbname'])

        db_cursor=db_conn.cursor()

        db_conn.query("use %s"%optmap['dbname'])

        count = db_cursor.execute(reqsql)

        ret = db_cursor.fetchall()


        db_cursor.close()

        db_conn.close

        return ret

    except MySQLdb.Error,e:

        print "Mysql ERROR %d:%s"  %(e.args[0], e.args[1])

    return ''

    

#用户人民币消费

def getuserconsume(userid, startday): #定义带参函数

    strdate = startday.strftime("%y%m%d")

    # 送礼物 +  守护 +  点歌 +  表情贴

    reqsql = "select CONSUMERID,SUM(DUBIOPNUM) from `DUBIJIESUANTONGJI_%s` where CONSUMERID=%u AND DBIOPTYPE=2 AND (OPTYPE=1 OR OPTYPE=4 OR OPTYPE=17 OR OPTYPE=25)" % (strdate, int(userid))

    print reqsql

    ret = sql_select(reqsql) #调用前面的函数

    #print ret

    if ret[0][0] is not None:

        return float(ret[0][1])/100.0

    else:

        return 0

        

#用户充值

def getusercharge(userid, startday):

    strdate = startday.strftime("%y%m%d")

    reqsql = "select CONSUMERID,SUM(DUBIOPNUM) from `DUBIJIESUANTONGJI_%s` where CONSUMERID=%u AND DUBIOPTYPE=1 AND (OPTYPE=1016 OR OPTYPE=1020 OR OPTYPE=1021)" % (strdate, int(userid))

    print reqsql

    ret = sql_select(reqsql)#调用前面的函数

    print ret

    if ret[0][0] is not None:

        return float(ret[0][1])/100.0

    else:

        return 0

    

#用户当天结余人民币

def getusercurcoin(userid, startday):

    strdate = startday.strftime("%y%m%d")

    reqsql = "select CONSUMERID,CURRENTNUM from `DUBIJIESUANTONGJI_%s` where CONSUMERID=%u ORDER BY OPTIME DESC LIMIT 1" % (strdate, int(userid))

    print reqsql

    ret = sql_select(reqsql)

    print ret

    if ret:

        return float(ret[0][1])/100.0

    else:

        return 0

        

def getconsume():

    startdate = datetime.date(2017, 1, 1)

    enddate = datetime.date(2017, 2, 2)

    userid = 3101011990

    

    userdata = pd.DataFrame(columns=('date','userid','charge', 'consume', 'dayleftcoin'))


    index = 0

    

    # 计算日差

    td = enddate - startdate

    datelen = td.days + 1

    #print datelen

    delta = datetime.timedelta(days=1)

    allcoins = 0 

    for i in range(0,datelen):

        startday = startdate + delta * i

        consume_coin = getuserconsume(userid, startday)#调用前面的函数

        charge = getusercharge(userid, startday)#调用前面的函数

        dayleftcoin = getusercurcoin(userid, startday)#调用前面的函数

        

        

        userdata.loc[index] = (startday.strftime("%Y-%m-%d"),str(userid), charge, consume_coin, dayleftcoin)

        index += 1

        

    #userdata.loc[index] = ('total',str(userid), allcoins, 0)

    print userdata.tail(100)

    return

    

getconsume()






#查询用户机器ID 代码案例


import sys

import MySQLdb

import pandas as pd

import datetime


optmap = {

                'dbuser' : 'aduser',

                'dbpass' : '123654',

                'dbhost' : '192.168.10.15',

                'dbport' : 3306,

                'dbname' : 'JIQIDB'

                 }


def sql_select(reqsql):

    try:

        db_conn = MySQLdb.connect(user=optmap['dbuser'], passwd=optmap['dbpass'], host=optmap['dbhost'], port=optmap['dbport'], db=optmap['dbname'])

        db_cursor=db_conn.cursor()

        db_conn.query("use %s"%optmap['dbname'])

        count = db_cursor.execute(reqsql)

        ret = db_cursor.fetchall()


        db_cursor.close()

        db_conn.close

        return ret

    except MySQLdb.Error,e:

        print "Mysql ERROR %d:%s"  %(e.args[0], e.args[1])

    return ''

    

def getusermid(userid, months):

    i = int(userid) % 50

    reqsql = "select USERID,MACHINEID from LOGINHISTORY%s%u where USERID=%u group by MACHINEID" % (months,int(i), int(userid))

    print reqsql

    ret = sql_select(reqsql)

    #print ret

    #print ret[0]

    return ret

    

def getall(userlist):

    today = datetime.date.today()

    months = today.strftime("%Y%m")

    userdata = pd.DataFrame(columns=('USERID', 'MACHINEID'))

    index = 0

    for userid in userlist:

        coins = getusermid(userid, months)

        for i in range(len(coins)):

            #print coins[i]

            userdata.loc[index] = (str(coins[i][0]), str(coins[i][1]))

            index += 1

        

        #print coins[0],coins[1]/100.0

        #userdata.loc[index] = (str(coins[0]), coins[1]/100.0)

        #index += 1

        #print userdata.tail(10)

        

    df = spark.createDataFrame(userdata)

    #df.createOrReplaceTempView('userdata')

    df.show(1000)





#人民币统计代码案例

from pyspark.sql import Row

from pyspark.sql.types import *

from pyspark.sql.functions import udf

import MySQLdb

import mysql_op

import datetime

import time

from mysql_op import MySQL

import pandas as pd

import numpy as np

from fastparquet import ParquetFile

from fastparquet import write


def fromDayToDay(startdate, datelen, func):

    delta = datetime.timedelta(days=1)

    for i in range(0,datelen):

        startday = startdate + delta * i

        endday = startdate + delta * (i + 1)

        func(startday, endday)

    return

def fromDayToEndDay(startdate, datelen, endday, func):

    delta = datetime.timedelta(days=1)

    for i in range(0,datelen):

        startday = startdate + delta * i

        #endday = startdate + delta * (i + 1)

        func(startday, endday)

    return


# 获取人民币数据

def saveDayPackageData(startday, endday):

    #数据库连接参数  

    dbconfig = {'host':'192.168.10.12',

                        'port': 3306,

                        'user':'user',

                        'passwd':'123654',

                        'db':'JIESUANDB',

                        'charset':'utf8'}


    #连接数据库,创建这个类的实例

    mysql_cn= MySQLdb.connect(host=dbconfig['host'], port=dbconfig['port'],user=dbconfig['user'], passwd=dbconfig['passwd'], db=dbconfig['db'])

    strday = startday.strftime("%Y%m%d")

    tsstart=time.mktime(startday.timetuple())

    tsend=time.mktime(endday.timetuple())

    strdate = startday.strftime("%y%m%d")


    sql = "SELECT OPTIME,CONSUMERID,PRESERVENUM,CURRENTNUM,DUBIOPTYPE,DUBIOPNUM,OPTYPE,OPDETAIL,OPNUM FROM `DUBIJIESUANTONGJI_%s`" % (strdate)

    print sql

    pddf = pd.read_sql(sql, con=mysql_cn)

    mysql_cn.close()

    print pddf.head(5)

    dflen = len(pddf.index)

    if dflen > 0:

        print pddf.describe()

        write("/home/haoren/logstatis/billdata"+strday+".parq", pddf)

    return


def savePackageData():

    startday = datetime.date(2016, 12, 28)

    endday = datetime.date(2016, 12, 28)

    td = endday - startday

    datelen = td.days + 1

    # 获取包裹数据

    fromDayToDay(startday, datelen, saveDayPackageData)

    

# 获取WF册数据

def saveDayWifiPhoneRegData(startday, endday):

    #数据库连接参数  

    dbconfig = {'host':'192.168.10.15',

                        'port': 3306,

                        'user':'user',

                        'passwd':'123654',

                        'db':'AADB',

                        'charset':'utf8'}


    #连接数据库,创建这个类的实例

    mysql_cn= MySQLdb.connect(host=dbconfig['host'], port=dbconfig['port'],user=dbconfig['user'], passwd=dbconfig['passwd'], db=dbconfig['db'])

    strday = startday.strftime("%Y%m%d")

    tsstart=time.mktime(startday.timetuple())

    tsend=time.mktime(endday.timetuple())

    strdate = startday.strftime("%y%m%d")


    sql = "select USERID from NEW_WEB_USER where TIME< %d AND TYPE=17" % (tsend)

    print sql

    pddf = pd.read_sql(sql, con=mysql_cn)

    mysql_cn.close()

    print pddf.head(5)

    dflen = len(pddf.index)

    if dflen > 0:

        print pddf.describe()

        write("/home/haoren/logstatis/wifiphonereg"+strday+".parq", pddf)

    return


def saveWifiPhoneReg():

    startday = datetime.date(2016, 12, 1)

    endday = datetime.date(2016, 12, 1)

    td = endday - startday

    datelen = td.days + 1

    # 获取包裹数据

    fromDayToDay(startday, datelen, saveDayWifiPhoneRegData)

    

OPTypeName = {

    0:"会员",

    1:"道具",


}


OpDetailName19 = {

    1:"购物保存收益",

    2:"下注和返注",

    3:"发红包",

    4:"抢红包",


}


OpDetailName22 = {

    1:"活动1收益到总账号",

    2:"活动2收益到总账号",

    3:"活动3收益到总账号",


}


OpDetailName23 = {

    0:"购买会员",

    1:"购买道具",

    2:"扫雷",


}


def getOpTypeName(func):

    name = OPTypeName.get(func)

    if name == None:

        return ""

    else:

        return name.decode('utf8')

    

def getOpDetailName(func, detail):

    if func == 19:

        if detail > 10000 and detail < 30000:

            return "包裹回滚".decode('utf8')

        elif detail > 50000 and detail < 60000:

            return "红包接龙".decode('utf8')

        else:

            name = OpDetailName19.get(detail)

            if name == None:

                return ""

            else:

                return name.decode('utf8')

    elif func == 22:

            name = OpDetailName22.get(detail)

            if name == None:

                return ""

            else:

                return name.decode('utf8')

    elif func == 23:

            name = OpDetailName23.get(detail)

            if name == None:

                return ""

            else:

                return name.decode('utf8')

    else:

        return ""


def getDayPackageData(startday, endday):

    strday = startday.strftime("%Y%m%d")

    print strday + '人民币数据'

    df = spark.read.load("/home/haoren/logstatis/billdata"+strday+".parq")

    df.show(10)

    #df.createOrReplaceTempView('billdata')

    #df.registerTempTable("billdata")

    #sqlret = sqlc.sql("SELECT count(*) from billdata")

    #sqlret.show(1)

    df2 = df.withColumn('OPTYPENAME', udf(getOpTypeName)(df.OPTYPE))

    df2.show(10)

    df = df2.withColumn('DETAILNAME', udf(getOpDetailName)(df2.OPTYPE, df2.OPDETAIL))

    df.show(10)

    df.createOrReplaceTempView('billdata')

    return

    

def getPackageData():

    startday = datetime.date(2016, 12, 28)

    endday = datetime.date(2016, 12, 28)

    td = endday - startday

    datelen = td.days + 1

    # 获取包裹数据

    fromDayToDay(startday, datelen, getDayPackageData)#调用前面的函数

    print 'getPackageData finish'


# 获取充值数据

def getChargeInfo(startday, endday):

    #数据库连接参数  

    dbconfig = {'host':'192.168.10.14', 

     'port': 3306, 

     'user':'user', 

     'passwd':'123654', 

     'db':'BAOIMDB', 

     'charset':'utf8'}

    

    #连接数据库,创建这个类的实例

    mysql_cn= MySQLdb.connect(host=dbconfig['host'], port=dbconfig['port'],user=dbconfig['user'], passwd=dbconfig['passwd'], db=dbconfig['db'])

    strday = startday.strftime("%Y%m%d")

    tsstart=time.mktime(startday.timetuple())

    tsend=time.mktime(endday.timetuple())

    regdata = pd.DataFrame()

    for i in range(0, 20): 

        sql = "SELECT * FROM `USERCONSUMPTIONRECORD%d` where TIME > %d AND TIME < %d" % (i, tsstart, tsend)

        print sql

        #pddf = pd.DataFrame()

        pddf = pd.read_sql(sql, con=mysql_cn)

        #print pddf.head(5)

        if len(pddf.index) > 0:

            regdata = regdata.append(pddf,ignore_index=True)

            print regdata.tail(5)

    

    if len(regdata.index) > 0:

        print regdata.describe()

        write("/home/haoren/logstatis/register"+strday+".parq", regdata)

    mysql_cn.close()

    return

    

def pudf(x):

    return getOpTypeName(x.OPTYPE)

    

def getMergeData(strday):

    dfbill = ParquetFile("/home/haoren/logstatis/billdata"+strday+".parq").to_pandas()

    dfwifireg = ParquetFile("/home/haoren/logstatis/wifiphonereg"+strday+".parq").to_pandas()

    tempdf = pd.merge(dfbill, dfwifireg, left_on='CONSUMERID', right_on='USERID')

    #write("/home/haoren/logstatis/analyze"+strday+".parq", tempdf)

    #print tempdf.head(10)

    tempdf['OPTYPENAME'] = tempdf.apply(lambda x:getOpTypeName(x.OPTYPE), axis=1)

    #print tempdf.head(10)

    tempdf['DETAILNAME'] = tempdf.apply(lambda x:getOpDetailName(x.OPTYPE,x.OPDETAIL), axis=1)

    df = spark.createDataFrame(tempdf)

    df.show(10)

    return df

    

def analyzeDayBillData(startday, endday):

    strday = startday.strftime("%Y%m%d")

    print strday + '人民币数据'


    df = spark.read.load("/home/haoren/logstatis/billdata"+strday+".parq")

    dfwifireg = spark.read.load("/home/haoren/logstatis/wifiphonereg"+strday+".parq")

    df3 = df.join(dfwifireg, df.CONSUMERID == dfwifireg.USERID)

    df3.show(10)

    df3.write.parquet("/home/haoren/logstatis/analyze"+strday+".parq")

    

    #df2 = df3.withColumn('OPTYPENAME', udf(getOpTypeName)(df3.OPTYPE))

    #df2.show(10)

    #df = df2.withColumn('DETAILNAME', udf(getOpDetailName)(df2.OPTYPE, df2.OPDETAIL))

    #df.show(10)

    #df.createOrReplaceTempView('analyzebilldata')

    return

    

def analyzeDayBillData2(startday, endday):

    strday = startday.strftime("%Y%m%d")

    print strday + '人民币数据'

    #df = spark.read.load("/home/haoren/logstatis/analyze"+strday+".parq")

    df = getMergeData(strday)

    return

    df2 = df.withColumn('OPTYPENAME', udf(getOpTypeName)(df.OPTYPE))

    df2.show(10)

    df = df2.withColumn('DETAILNAME', udf(getOpDetailName)(df2.OPTYPE, df2.OPDETAIL))

    df.show(10)

    df.createOrReplaceTempView('analyzebilldata')

    return

    

def analyzeBillData():

    startday = datetime.date(2016, 12, 28)

    endday = datetime.date(2016, 12, 28)

    td = endday - startday

    datelen = td.days + 1

    # 获取包裹数据

    fromDayToDay(startday, datelen, analyzeDayBillData2)#调用前面的函数

    print 'analyzeBillData finish'

    

savePackageData()

getPackageData()

#saveWifiPhoneReg()

#analyzeBillData()

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

python大数据分析代码案例 的相关文章

  • 几种压缩算法

    一 行程长度压缩 原理是将一扫描行中的颜色值相同的相邻像素用一个计数值和那些像素的颜色值来代替 例如 aaabccccccddeee 则可用3a1b6c2d3e来代替 对于拥有大面积 相同颜色区域的图像 用RLE压缩方法非常有效 由RLE原
  • Java多线程:线程8锁案例分析

    线程8锁案例分析 通过分析代码 推测打印结果 并运行代码进行验证 1 两个线程调用同一个对象的两个同步方法 被synchronized修饰的方法 锁的对象是方法的调用者 因为两个方法的调用者是同一个 所以两个方法用的是同一个锁 先调用方法的
  • 编译filament

    从github上下载github 每次都被中断 于是灵机一动 从码云上下载1 8下载 果然速度快 先cmake 发现需要python 把enable java去掉 编译后试试 hellopbr
  • nginx系统学习5--常用配置4--防盗链配置

    6 4 防盗链配置 6 4 1 操作 01 配置 server listen 7000 server name www test com www test1 com root html www test location index tes
  • PTA数组2

    目录 1 方阵对角线元素求和及计数 2 使用选择法升序 3 输入10个正整数到a数组中 对a 10 数组中的素数升序排序 4 对a 10 数组中的素数排序 1 方阵对角线元素求和及计数 对输入的一个N N的方阵 求其两条对角线上的元素之和及
  • Excel根据身份证号提取省份

    身份证号码各位的含义 1 2位省 自治区 直辖市代码 3 4位地级市 盟 自治州代码 5 6位县 县级市 区代码 7 14位出生年月日 比如19670401代表1967年4月1日 15 17位为顺序号 其中17位 倒数第二位 男为单数 女为
  • 关于多线程的爬取心得和用法

    最近爬取一些学习上所要用到的东西 因为要搞得东西比较多 写的爬取的速度大大下降 于是我就尝试提升以下程序爬取的速度 正好学过多线程 灵机一动就搞了个多线程的程序爬取 此次没有啥好说的 有啥不懂请看注释 import queue import
  • (Java 功能篇) Java Proxool

    本文题目没有强调的是Java应用 不是Java Web应用 原因是从网上找有关Proxool的例子 全是一个摸样 都是将讲解Java Web中的应用 真没劲 难道Proxool离开了web就不能用了 不信你可以google一把看看 在阅读了
  • 定时器之编码器模式

    1 什么是编码器 编码器 encoder 是将信号或数据进行编制 转换为可用以通讯 传输和存储的信号形式的设备 编码器把角位移或直线位移转换成电信号 前者称为码盘 后者称为码尺 2 分类 按照读出方式编码器可以分为接触式和非接触式两种 按照
  • [转]IDEA中使用Debug

    一 Debug开篇 首先看下IDEA中Debug模式下的界面 如下是在IDEA中启动Debug模式 进入断点后的界面 我这里是Windows 可能和Mac的图标等会有些不一样 就简单说下图中标注的8个地方 以Debug模式启动服务 左边的一
  • VS环境下,关于“找不到 **.dll,无法执行代码,重新安装程序可能会解决此问题。”的四种解决方案

    dll 是动态链接库文件 里面存储着函数和数据 lib是静态数据连接库文件 存储着函数名和文件位置 也就是说在执行程序时 exe文件可通过lib文件找到dll文件 并执行在程序中调用的函数 Windows在查找dll文件会按照以下几种方式顺
  • 被动遥感和主动遥感的概念辨析

    主动遥感 主动遥感 又称有源遥感 有时也称遥测 指从遥感平台上的人工辐射源 向目标物发射一定形式的电磁波 再由传感器接收和记录其反射波的遥感系统 其主要优点是不依赖太阳辐射 可以昼夜工作 而且可以根据探测目的的不同 主动选择电磁波的波长和发
  • CSS > Flex 布局中的放大和收缩计算

    原文 https dev opera com articles flexbox basics 译者注 本文仅简单翻译下原文中关于如何计算 flex 属性的值的部分 其他有关 Flex 布局的知识本文不作探讨 2015 12 5更新 关于 f
  • pixi.js 导出部分区域裁剪图片

    方案 先通过api到出image对象 在通过canvas绘制图片 在导出数据 代码 const x y this app stage getBounds 超出的x y const stageImage this app renderer p
  • mybatis ---- 级联查询 一对多 (集合映射)

    关联有嵌套查询和嵌套结果两种方式 本文是按照嵌套结果这种方式来说明的 上一章介绍了多对一的关系 用到了
  • 订单管理实现功能

    一 目标以及实现思路 商家 查看订单 发货 订单状态 1未发货 2已发货 3已签收 4已撤单 默认值1 订单项查看 思路 订单表的查询 将订单表的订单状态由1改为2通过订单的id到订单项表查询出对应的订单 买家 查看订单 撤单 签收 思路
  • Cocos2d-x 3.x部署

    这是我第一次写技术文章 这里只是分享一下我的部署经验 请各位参考 谢谢 我简单的写写我的部署步骤 只参考了官方的readme 环境 win7 64位 1 打开控制台cmd 2 进入到cocos2d x引擎目录 键入setup py 键入的前
  • CSAPP Lab4- PerfLab

    代码优化 typedef struct unsigned short red R value unsigned short green G value unsigned short blue B value pixel 图像用一维数组表示
  • 树莓派4B安装64位系统 以及基础配置

    最近在使用一个SDK时发现只提供了linux64版本的 而我用的树莓派系统是32位的 查了一下发现官方有提供64位版本的 开始了重装系统 总结了一下基础配置包括换源 远程连接 中文输入法 摄像头等配置 大纲 准备工具 1 SD卡格式化 2
  • “起床困难综合症”「NOI2014」【题解】

    起床困难综合症 洛谷 题目 题目描述 drd的防御战线由n扇防御门组成 每扇防御门包括一个运算op和一个参数t 其中运算一定是OR XOR AND中的一种 参数则一定为非负整数 如果还未通过防御门时攻击力为x 则其通过这扇防御门后攻击力将变

随机推荐

  • Kafka 安装与部署(单机版)与kafkaDemo调试测试(包含JAVA Demo)

    部署需要的包 http download csdn net download liangmaoxuan 10228805 1 kafka 2 10 0 10 2 0 tar 1 解压kafka 2 10 0 10 2 0安装包 tar xv
  • React生命周期getDerivedStateFromProps&getSnapshotBeforeUpdate

    getDerivedStateFromProps getDerivedStateFromProps nextProps preState nextProps 与componentWillReceiveProps的nextProps参数相同
  • 《Android开发——Android Studio的下载、安装与配置》

    Android开发 Android Studio的下载 安装与配置 一 下载 Android Studio最新的版本有一些BUG 不稳定 推荐安装老版本 在正式安装Android Studio之前 需要安装JDK 同学们可以参考下面这个博客
  • 链路追踪jaeger

    这里的链路指的是客户端向服务发起一个请求 该请求所经过的路线 也可以说是该请求经过的流量 例如 客户端发起一个下订单的请求其流量过程 request gt service gt order web gt order srv gt mysql
  • 《面向对象程序设计》教学资源汇总(2023)

    面向对象程序设计 教学资源汇总 2023 一 教学网站 blog csdn net bigleo 二 课堂派加课码 加课码 M274UN QRCode 三 课件下载 课件下载 长期有效 提取密码 tqucqx 四 本课程课件 二套 及实验
  • Sequence Modeling: Recurrent and Recursive Nets(3)

    CONTENTS Leaky Units and Other Strategies for Multiple Time Scales One way to deal with long term dependencies is to des
  • I2C软件模拟中的IO方向设置问题

    例程 STM32F103系列 I2C软件模拟实验 战舰例程 问题 下面两行关于 IO方向 的代码不太明白 之前一直看的例程都是库函数的代码 突然间冒出来两行寄存器的代码一时间手足无措 define SDA IN GPIOB gt CRL 0
  • Linux安装docker,在docker上安装mysql

    一 linux安装docker 1 下载安装包 下载地址 Index of linux centos 7 x86 64 stable Packages 我用的操作系统是centos7 根据自己操作系统找到相应版本下载 2 上传安装包 我用的
  • Docker学习笔记(三)-编写自己的Dockerfile

    Dockerfile是什么 Dockerfile用于快速创建自定义的Docker镜像 在上一篇博客中我们知道常见的三种创建image的手法 一般情况我们可以通过在基础镜像的基础上通过docker commit的方式生成新的image 但是对
  • IDEAweb项目文件夹没有蓝色小点

    问题原因 idea没有识别web文件夹为一个web项目 解决方案 需要手动选中该moudle 主动add web文件夹即可
  • centos7 kvm 设置桥接网卡br0

    centos kvm 设置桥接网卡br0 一 关于kvm的操作 1 查看CPU是否支持VT egrep vmx svm color always proc cpuinfo 2 检查内核模块是否加载 lsmod grep kvm 3 查看Se
  • Java的垃圾回收机制简述

    Java垃圾回收机制简述 一 由谁来做 Java的垃圾回收是由JVM Java虚拟机 来做的 二 什么时候做 1 CPU空闲的时候 自动进行回收 2 在堆内存存储满了之后 自动进行回收 3 程序调用System gc 主动尝试进行回收 三
  • Docker目录迁移

    问题 系统 根目录空间满 导致docker容器停用 df lh 显示已占用100 而 home目录还有1T空间未用 需求 将docker目录移动到 home目录 docker默认位置 var lib docker 查看Docker目录 do
  • STM32------ADC(模/数转换器)

    文章目录 前言 一 ADC定义 二 模拟信号 三 数字信号 四 模数转换的过程 五 特性 六 硬件电路 1 原理图 2 可调电阻用到的引脚 3 不同的引脚支持的ADC是不一样的 4 存储对齐方式 七 思考题 八 源码下载 总结 前言 STM
  • LeetCode题目笔记——1233. 删除子文件夹,写法妙哉妙哉

    文章目录 题目描述 题目难度 中等 方法一 排序 代码 C 代码 Python 方法二 字典树 总结 题目描述 你是一位系统管理员 手里有一份文件夹列表 folder 你的任务是要删除该列表中的所有 子文件夹 并以 任意顺序 返回剩下的文件
  • 记录“conda添加清华镜像源”问题--查看添加删除

    conda查看添加加删除清华镜像源 一 查看镜像源 二 添加新镜像源 三 删除旧镜像源 四 切回默认源 一 查看镜像源 查看conda镜像源的命令有两个 1 conda info 镜像源显示在channel URLs属性中 2 conda
  • 简述gitee使用及创建仓库及远程连接

    第一步 找到gitee网址 进入 Gitee 基于 Git 的代码托管和研发协作平台 第二步 点击右上角注册按钮 第三步 登录 第四步 点击右上角加号图标 下拉菜单的新建仓库 第五步 新建仓库 取一个仓库名 点击创建按钮 第六步 跳转至新建
  • Javascript制作简易计算器并实现其功能

    使用JS的函数功能 制作一个简易的计算器 包括加 减 乘 除的功能 并使用函数传参的方式完成计算器的功能 输入任意操作数 通过四则运算计算出结果 使用函数传参的方式完成计算器的功能 CSS部分
  • 至少有一位重复数字--动态规划

    leetcode 1012 至少有一位重复的数字 题目描述 给定正整数 N 返回小于等于 N 且具有至少 1 位重复数字的正整数的个数 示例1 输入 20 输出 1 解释 具有至少 1 位重复数字的正数 lt 20 只有 11 示例2 输入
  • python大数据分析代码案例

    查询用户余额代码案例 import sys import MySQLdb import pandas as pd optmap dbuser aduser dbpass 123654 dbhost 192 168 10 14 dbport