有多种方法可以比较 2 个存储库(服务器文件系统和本地文件系统)之间的 .csv 文件。
方法一:使用hashlib
该方法使用Python模块hashlib.我使用哈希算法 sha256 来计算文件的哈希摘要。我将文件的哈希值与确切的文件名进行比较。此方法效果很好,但它会忽略两个目录中不存在的任何文件。
import hashlib
def compare_common_files_by_hash(directory_one, directory_two):
d1_files = set(os.listdir(directory_one))
d2_files = set(os.listdir(directory_two))
common_files = list(d1_files & d2_files)
if common_files:
for filename in common_files:
hash_01 = hashlib.sha256(open(f'{directory_one}/{filename}', 'rb').read()).hexdigest()
hash_02 = hashlib.sha256(open(f'{directory_two}/{filename}', 'rb').read()).hexdigest()
if hash_01 == hash_02:
print(f'The file - {filename} is identical in the directories {directory_one} and {directory_two}')
elif hash_01 != hash_02:
print(f'The file - {filename} is different in the directories {directory_one} and {directory_two}')
方法2:使用os st_size
该方法使用Python模块os.在此示例中,我比较了文件的大小。此方法工作正常,但它会对任何具有任何数据更改且不会更改文件大小的文件进行错误分类。
import os
def compare_common_files_by_size(directory_one, directory_two):
d1_files = set(os.listdir(directory_one))
d2_files = set(os.listdir(directory_two))
common_files = list(d1_files & d2_files)
if common_files:
for filename in common_files:
file_01 = os.stat(f'{directory_one}/{filename}')
file_02 = os.stat(f'{directory_two}/{filename}')
if file_01.st_size == file_02.st_size:
print(f'The file - {filename} is identical in the directories {directory_one} and {directory_two}')
elif file_01.st_size != file_02.st_size:
print(f'The file - {filename} is different in the directories {directory_one} and'
f' {directory_two}')
方法3:使用os st_size和st_mtime
该方法也使用Python模块os.在这个例子中,我不仅比较了文件的大小,还比较了最后的修改时间。此方法效果很好,但会将文件错误地分类为相同。在测试中,我保存了一个没有数据修改的文件,操作系统时间将文件标记为不同,但实际上并没有真正不同。
import os
def compare_common_files_by_metadata(directory_one, directory_two):
d1_files = set(os.listdir(directory_one))
d2_files = set(os.listdir(directory_two))
common_files = list(d1_files & d2_files)
if common_files:
for filename in common_files:
file_01 = os.stat(f'{directory_one}/{filename}')
file_02 = os.stat(f'{directory_two}/{filename}')
if file_01.st_size == file_02.st_size and file_01.st_mtime == file_02.st_mtime:
print(f'The file - {filename} is identical in the directories {directory_one} and {directory_two}')
elif file_01.st_size != file_02.st_size or file_01.st_mtime != file_02.st_mtime:
print(f'The file - {filename} is different in the directories {directory_one} and'
f' {directory_two}')
方法4:使用set()
本示例使用Pythonset()确定两个同名 csv 文件之间的行与行之间的差异。此方法将输出 2 个 csv 文件之间的精确变化。
import os
def compare_common_files_by_lines(directory_one, directory_two):
d1_files = set(os.listdir(directory_one))
d2_files = set(os.listdir(directory_two))
common_files = list(d1_files & d2_files)
if common_files:
for filename in common_files:
if fileName.endswith('.csv'):
file_01 = open(f'{directory_one}/{filename}', 'r', encoding='ISO-8859-1')
file_02 = open(f'{directory_two}/{filename}', 'r', encoding='ISO-8859-1')
csv_file_01 = set(map(tuple, csv.reader(file_01)))
csv_file_02 = set(map(tuple, csv.reader(file_02)))
different = csv_file_01 ^ csv_file_02
for row in sorted(different, key=lambda x: x, reverse=True):
if row:
print(f'This row: \n {row} \n was different between the file {fileName} in the directories'
f' {directory_one} and {directory_two}')
方法5:使用文件cmp.cmp
该方法使用Python模块filecmp.在这个例子中我使用了文件cmp.cmp with shallow set to False。将此参数设置为False指示filecmp查看文件的内容而不是元数据,例如文件大小,这是默认的文件cmp.cmp。此方法与使用的方法 1 一样有效hashlib.
import filecmp
def compare_common_files(directory_one, directory_two):
d1_files = set(os.listdir(directory_one))
d2_files = set(os.listdir(directory_two))
common_files = list(d1_files & d2_files)
if common_files:
for filename in common_files:
file_01 = f'{directory_one}/{filename}'
file_02 = f'{directory_two}/{filename}'
comparison = filecmp.cmp(file_01, file_02, shallow=False)
if comparison:
print(f'The file - {filename} is identical in the directories - {directory_one} and {directory_two}')
elif not comparison:
print(f'The file - {filename} is different in the directories - {directory_one} and {directory_two}')
方法6:使用filecmp.dircmp
该方法也使用Python模块filecmp.在这个例子中我使用了文件cmp.dircmp,这使我不仅可以识别两个目录之间不常见的文件,还可以找到那些名称相似但内容不同的文件。
import filecmp
def directory_recursive(directory_one, directory_two):
files = filecmp.dircmp(directory_one, directory_two)
for filename in files.diff_files:
print(f'The file - {filename} is different in the directories - {files.left} and {files.right}')
for filename in files.left_only:
print(f'The file - {filename} - was only found in the directory {files.left}')
for filename in files.right_only:
print(f'The file - {filename} - was only found in the directory {files.right}')
方法七:逐行比较
此示例对 2 个 csv 文件进行逐行比较,并输出不同的行。输出可以添加到 Python 字典或 JSON 文件中以供辅助。
import csv
def get_csv_file_lines(file):
with open(file, 'r', encoding='utf-8') as csv_file:
rows = csv.reader(csv_file)
for row in rows:
yield row
def compare_csv_files_line_by_line(csv_file_one, csv_file_two):
csvfile_02 = get_csv_file_lines(csv_file_two)
for line_one in get_csv_file_lines(csv_file_one):
line_two = csvfile_02.__next__()
if line_two != line_one:
print('File names being compared:')
print(f'csv_file_one: {csv_file_one}')
print(f'csv_file_two: {csv_file_two}')
print(f'The following rows have difference in the files being compared.')
print('csv_file_one:', line_one)
print('csv_file_two:', line_two)
print('\n')
使用 hashlib 将本地文件系统存储到 S3 存储桶
下面的示例是用于比较本地文件系统和远程 S3 存储桶之间的文件的真实用例。我原本打算使用对象.e_tagAWS S3 创建的标签,但该标签可能存在问题,不应在哈希比较操作中使用。我决定查询 S3 并将单个文件加载到内存文件系统中,该文件系统可以在每次比较操作期间查询和清空。这个方法效果很好,对我的系统性能没有产生不利影响。
import fs
import os
import boto3
import hashlib
def create_temp_memory_filesystem():
mem_fs = fs.open_fs('mem://')
virtual_disk = mem_fs.makedir('hidden_dir')
return mem_fs, virtual_disk
def query_s3_file_by_name(filename, memory_filesystem, temp_directory):
s3 = boto3.resource('s3', aws_access_key_id='your_access_key_id',
aws_secret_access_key='your_secret_access_key')
bucket = s3.Bucket('your_bucket_name')
for obj in bucket.objects.all():
if obj.key == filename:
body = obj.get()['Body'].read()
with memory_filesystem.open(f'{temp_directory}/s3_{filename}', 'w') as f:
f.write(str(body))
f.close()
def compare_local_files_to_s3_files(local_csv_files):
virtual_disk = create_temp_memory_filesystem()
directory_name = str(virtual_disk[1]).split('/')[1]
files = set(os.listdir(local_csv_files))
for filename in files:
if filename.endswith('.csv'):
local_file_hash = hashlib.sha256(open(f'{local_csv_files}/{filename}', 'rb').read()).hexdigest()
query_s3_file_by_name(filename, virtual_disk[0], directory_name)
virtual_files = virtual_disk[0].opendir(directory_name)
for file_name in virtual_files.listdir('/'):
s3_file_hash = hashlib.sha256(open(file_name, 'rb').read()).hexdigest()
if local_file_hash == s3_file_hash:
print(f'The file - {filename} is identical in both the local file system and the S3 bucket.')
elif local_file_hash != s3_file_hash:
print(f'The file - {filename} is different between the local file system and the S3 bucket.')
virtual_files.remove(file_name)
virtual_disk[0].close()
使用 filecmp 将本地文件系统存储到 S3 存储桶
这个例子与上面的例子相同,除了我使用文件cmp.cmp代替hashlib用于比较操作。
import fs
import os
import boto3
import filecmp
def create_temp_memory_filesystem():
mem_fs = fs.open_fs('mem://')
virtual_disk = mem_fs.makedir('hidden_dir')
return mem_fs, virtual_disk
def query_s3_file_by_name(filename, memory_filesystem, temp_directory):
s3 = boto3.resource('s3', aws_access_key_id='your_access_key_id',
aws_secret_access_key='your_secret_access_key')
bucket = s3.Bucket('your_bucket_name')
for obj in bucket.objects.all():
if obj.key == filename:
body = obj.get()['Body'].read()
with memory_filesystem.open(f'{temp_directory}/s3_{filename}', 'w') as f:
f.write(str(body))
f.close()
def compare_local_files_to_s3_files(local_csv_files):
virtual_disk = create_temp_memory_filesystem()
directory_name = str(virtual_disk[1]).split('/')[1]
files = set(os.listdir(local_csv_files))
for filename in files:
if filename.endswith('.csv'):
local_file = f'{local_csv_files}/{filename}'
query_s3_file_by_name(filename, virtual_disk[0], directory_name)
virtual_files = virtual_disk[0].opendir(directory_name)
for file_name in virtual_files.listdir('/'):
comparison = filecmp.cmp(local_file, file_name, shallow=False)
if comparison:
print(f'The file - {filename} is identical in both the local file system and the S3 bucket.')
elif not comparison:
print(f'The file - {filename} is different between the local file system and the S3 bucket.')
virtual_files.remove(file_name)
virtual_disk[0].close()
使用 hashlib 的本地文件系统到 Google Cloud 存储桶
此示例与上面的 S3 hashlib 代码示例类似,但它使用 Google Cloud 存储桶。
import fs
import os
import hashlib
from google.cloud import storage
def create_temp_memory_filesystem():
mem_fs = fs.open_fs('mem://')
virtual_disk = mem_fs.makedir('hidden_dir')
return mem_fs, virtual_disk
def query_google_cloud_storage_file_by_name(filename, memory_filesystem, temp_directory):
client = storage.Client.from_service_account_json('path_to_your_credentials.json')
bucket = client.get_bucket('your_bucket_name')
blobs = bucket.list_blobs()
for blob in blobs:
if blob.name == filename:
with memory_filesystem.open(f'{temp_directory}/{filename}', 'w') as f:
f.write(str(blob.download_to_filename(blob.name)))
f.close()
def compare_local_files_to_google_storage_files(local_csv_files):
virtual_disk = create_temp_memory_filesystem()
directory_name = str(virtual_disk[1]).split('/')[1]
files = set(os.listdir(local_csv_files))
for filename in files:
if filename.endswith('.csv'):
local_file_hash = hashlib.sha256(open(f'{local_csv_files}/{filename}', 'rb').read()).hexdigest()
query_google_cloud_storage_file_by_name(filename, virtual_disk[0], directory_name)
virtual_files = virtual_disk[0].opendir(directory_name)
for file_name in virtual_files.listdir('/'):
gs_file_hash = hashlib.sha256(open(file_name, 'rb').read()).hexdigest()
if local_file_hash == gs_file_hash:
print(f'The file - {filename} is identical in both the local file system and the Google Cloud bucket.')
elif local_file_hash != gs_file_hash:
print(f'The file - {filename} is different between the local file system and the Google Cloud bucket.')
virtual_files.remove(file_name)
virtual_disk[0].close()
使用 filecmp 将本地文件系统存储到 Google Cloud 存储桶
此示例与上面的 S3 filecmp 代码示例类似,但它使用 Google Cloud 存储桶。
import fs
import os
import filecmp
from google.cloud import storage
def create_temp_memory_filesystem():
mem_fs = fs.open_fs('mem://')
virtual_disk = mem_fs.makedir('hidden_dir')
return mem_fs, virtual_disk
def query_google_cloud_storage_file_by_name(filename, memory_filesystem, temp_directory):
client = storage.Client.from_service_account_json('path_to_your_credentials.json')
bucket = client.get_bucket('your_bucket_name')
blobs = bucket.list_blobs()
for blob in blobs:
if blob.name == filename:
with memory_filesystem.open(f'{temp_directory}/{filename}', 'w') as f:
f.write(str(blob.download_to_filename(blob.name)))
f.close()
def compare_local_files_to_google_storage_files(local_csv_files):
virtual_disk = create_temp_memory_filesystem()
directory_name = str(virtual_disk[1]).split('/')[1]
files = set(os.listdir(local_csv_files))
for filename in files:
if filename.endswith('.csv'):
local_file = f'{local_csv_files}/{filename}'
query_google_cloud_storage_file_by_name(filename, virtual_disk[0], directory_name)
virtual_files = virtual_disk[0].opendir(directory_name)
for file_name in virtual_files.listdir('/'):
comparison = filecmp.cmp(local_file, file_name, shallow=False)
if comparison:
print(f'The file - {filename} is identical in both the local file system and the Google Cloud bucket.')
elif not comparison:
print(f'The file - {filename} is different between the local file system and the Google Cloud bucket.')
virtual_files.remove(file_name)
virtual_disk[0].close()