深度学习病理 针对精确注释病理数据集的分类方法
预处理部分
一、首先获取病理图片的标注信息(精确注释标签)
利用ASAP对病理图片进行标注,会生成xml文件。可以先将xml文件转换成json文件(为什么要将xml文档转换成json:json是一种理想的数据交换格式,易于人阅读和编写,同时也易于机器解析;而xml文档结构较为冗长,在后续处理中解析起来不如json方便),代码见'camelyon16xml2json.py':
import sys
import os
import argparse
import logging
sys.path.append(os.path.dirname(os.path.abspath(__file__)) + '/../../../')
from annotation import Formatter # noqa
# Command-line interface of the xml -> json conversion script.
# --xml_path is the directory that main() lists; every entry found there is
# converted.
parser = argparse.ArgumentParser(
    # The trailing space after "to" matters: adjacent string literals are
    # concatenated verbatim, and without it the help read "tointernal".
    description='Convert Camelyon16 xml format to '
                'internal json format')
parser.add_argument('--xml_path', default=None, metavar='XML_PATH', type=str,
                    help='Path to the input Camelyon16 xml annotation')
# Disabled option: the output location is currently hard-coded inside main().
# parser.add_argument('--json_path', default='/home/omnisky/tmp/quanhao/ilikewind/camelyon16/json', metavar='JSON_PATH', type=str,
#                     help='Path to the output annotation in json format')
# Disabled single-file entry point (main() loops over a directory instead):
# def run(args):
#     Formatter.camelyon16xml2json(args.xml_path, out_json)
def main():
    """
    Convert every Camelyon16 xml annotation found in ``--xml_path`` to json.

    Each ``<name>.xml`` in the directory is converted with
    ``Formatter.camelyon16xml2json`` and written as ``<name>.json`` into the
    fixed output directory used below. Entries without an ``.xml`` extension
    are skipped.
    """
    logging.basicConfig(level=logging.INFO)
    args = parser.parse_args()
    if args.xml_path is None:
        # Without this guard os.listdir(None) lists the current directory and
        # os.path.join(None, ...) then fails with an opaque TypeError.
        parser.error('--xml_path is required')
    for xml in os.listdir(args.xml_path):
        # os.listdir yields bare file names, so splitext gives name + suffix
        xml_name, extension = os.path.splitext(xml)
        if extension.lower() != '.xml':
            # Anything else in the directory would make the XML parser fail
            logging.info('skip non-xml entry: %s', xml)
            continue
        xml_wholepath = os.path.join(args.xml_path, xml)
        out_json = '/home/omnisky/tmp/quanhao/ilikewind/camelyon16/json/%s.json' % xml_name
        Formatter.camelyon16xml2json(xml_wholepath, out_json)


if __name__ == '__main__':
    main()
import json
import xml.etree.ElementTree as ET
import copy
import numpy as np
from skimage.measure import points_in_poly
np.random.seed(0)
class Polygon(object):
    """
    A named polygon stored as an [N, 2] array of vertices.
    """

    def __init__(self, name, vertices):
        """
        Remember the polygon's name and its vertices.
        Arguments:
            name: string, name of the polygon
            vertices: [N, 2] 2D numpy array of int
        """
        self._vertices = vertices
        self._name = name

    def __str__(self):
        # A polygon prints as its name
        return self._name

    def inside(self, coord):
        """
        Test whether a single coordinate lies inside the polygon.
        Arguments:
            coord: 2 element tuple of int, e.g. (x, y)
        Returns:
            bool, if the coord is inside the polygon.
        """
        # points_in_poly works on a batch of points; wrap the single
        # coordinate in a list and take the only result back out.
        hits = points_in_poly([coord], self._vertices)
        return hits[0]

    def vertices(self):
        """
        Return the vertices as a numpy array built from the stored ones.
        """
        as_array = np.array(self._vertices)
        return as_array
class Annotation(object):
    """
    Annotation about the regions within WSI in terms of vertices of polygons.
    """

    def __init__(self):
        self._json_path = ''
        self._polygons_positive = []
        self._polygons_negative = []

    def __str__(self):
        return self._json_path

    def from_json(self, json_path):
        """
        Initialize the annotation from a json file.

        The polygons read from the file are appended to the ones already
        held, so calling this twice on one instance accumulates polygons.
        Arguments:
            json_path: string, path to the json annotation. The file must
                contain the keys 'positive' and 'negative', each a list of
                {'name': ..., 'vertices': ...} entries.
        """
        self._json_path = json_path
        with open(json_path) as f:
            annotations_json = json.load(f)

        # Same loading logic for both groups; only the json key and the
        # destination list differ.
        targets = (('positive', self._polygons_positive),
                   ('negative', self._polygons_negative))
        for key, polygons in targets:
            for annotation in annotations_json[key]:
                name = annotation['name']
                vertices = np.array(annotation['vertices'])
                polygons.append(Polygon(name, vertices))

    def inside_polygons(self, coord, is_positive):
        """
        Determine if a given coordinate is inside the positive/negative
        polygons of the annotation.
        Arguments:
            coord: 2 element tuple of int, e.g. (x, y)
            is_positive: bool, inside positive or negative polygons.
        Returns:
            bool, if the coord is inside the positive/negative polygons of the
            annotation.
        """
        # The polygons are only read here, so they are used directly instead
        # of being deep-copied on every single query.
        if is_positive:
            polygons = self._polygons_positive
        else:
            polygons = self._polygons_negative

        return any(polygon.inside(coord) for polygon in polygons)

    def polygon_vertices(self, is_positive):
        """
        Return the polygon represented as [N, 2] array of vertices
        Arguments:
            is_positive: bool, return positive or negative polygons.
        Returns:
            list with one vertices array per polygon
        """
        if is_positive:
            polygons = self._polygons_positive
        else:
            polygons = self._polygons_negative

        return [polygon.vertices() for polygon in polygons]
class Formatter(object):
    """
    Format converter e.g. CAMELYON16 to internal json
    """

    @staticmethod
    def _annotations_to_polygons(annotations):
        """Convert <Annotation> elements into {'name', 'vertices'} dicts."""
        polygons = []
        for annotation in annotations:
            coordinates = annotation.findall('./Coordinates/Coordinate')
            X = [float(c.get('X')) for c in coordinates]
            Y = [float(c.get('Y')) for c in coordinates]
            # [2, N] floats -> rounded ints -> [N, 2] nested python list
            vertices = np.round([X, Y]).astype(int).transpose().tolist()
            polygons.append({'name': annotation.attrib['Name'],
                             'vertices': vertices})
        return polygons

    @staticmethod
    def camelyon16xml2json(inxml, outjson):
        """
        Convert an annotation of camelyon16 xml format into a json format.

        Annotations whose PartOfGroup is "Tumor", "_0" or "_1" are written
        under 'positive'; those in group "_2" are written under 'negative'.
        Declared as a staticmethod so that it can be called both on the
        class and on an instance.
        Arguments:
            inxml: string, path to the input camelyon16 xml format
            outjson: string, path to the output json format
        """
        root = ET.parse(inxml).getroot()

        def in_group(group):
            return root.findall(
                './Annotations/Annotation[@PartOfGroup="%s"]' % group)

        annotations_positive = \
            in_group('Tumor') + in_group('_0') + in_group('_1')
        annotations_negative = in_group('_2')

        json_dict = {
            'positive':
                Formatter._annotations_to_polygons(annotations_positive),
            'negative':
                Formatter._annotations_to_polygons(annotations_negative),
        }

        with open(outjson, 'w') as f:
            json.dump(json_dict, f, indent=1)

    def json2camelyon16xml(self, dict, xml_path, group_color):
        """
        Write an annotation dict as an ASAP/camelyon16 style xml file.

        The i-th key of the dict is written as group "_i" drawn in
        group_color[i].
        Arguments:
            dict: mapping of group key -> list of {'vertices': [[x, y], ...]}
            xml_path: string, path to the output xml file
            group_color: list of color strings, one per key of dict
        Raises:
            AssertionError: if the number of keys and colors differ
        """
        group_keys = dict.keys()
        assert len(group_keys) == len(group_color)
        # root and its two sub element
        root = ET.Element('ASAP_Annotations')
        annotations_node = ET.SubElement(root, "Annotations")
        groups_node = ET.SubElement(root, "AnnotationGroups")
        # part of group. e.g. 2 color -- 2 part
        self.partofgroup(groups_node, group_color)
        # for vertices
        for i, key in enumerate(group_keys):
            self.plot_area(annotations_node, dict[key], "_" + str(i),
                           group_color[i])
        ET.ElementTree(root).write(xml_path)

    def partofgroup(self, father_node, group_color):
        """
        Append one <Group> element per color, named "_0", "_1", ...
        Arguments:
            father_node: xml element receiving the <Group> children
            group_color: list of color strings
        """
        for i, color in enumerate(group_color):
            title = ET.SubElement(father_node, "Group",
                                  {"Color": color, "PartOfGroup": "None",
                                   "Name": "_" + str(i)})
            ET.SubElement(title, "Attributes")

    def plot_area(self, father_node, all_area, group_, cor_):
        """
        Append one polygon <Annotation> element per area.
        Arguments:
            father_node: xml element receiving the <Annotation> children
            all_area: list of dicts, each holding a "vertices" list of
                [x, y] pairs
            group_: string, value of the PartOfGroup attribute
            cor_: string, value of the Color attribute
        """
        for i, area in enumerate(all_area):
            title = ET.SubElement(father_node, "Annotation",
                                  {"Color": cor_, "PartOfGroup": group_,
                                   "Type": "Polygon", "Name": "_" + str(i)})
            coordinates = ET.SubElement(title, "Coordinates")
            # all vertices of the i-th area, numbered in their stored order
            for j, point in enumerate(area["vertices"]):
                ET.SubElement(coordinates, "Coordinate",
                              {"Y": str(point[1]), "X": str(point[0]),
                               "Order": str(j)})
至此,已经完成xml文档到json文档的转换。
二、获取补丁图像(patches)
- 首先获取wsi的组织部分,即tissue_mask
import sys
import os
import argparse
import logging
import numpy as np
import openslide
from skimage.color import rgb2hsv
from skimage.filters import threshold_otsu
sys.path.append(os.path.dirname(os.path.abspath(__file__)) + '/../../')
# Command-line options of the tissue-mask script.
parser = argparse.ArgumentParser(
    description='Get tissue mask of WSI and save it in npy format',
)
# The two triple-quoted strings below are inert string literals holding
# disabled options (input and output paths are set inside main() instead).
'''parser.add_argument('--wsi_path', default='/home/omnisky/tmp/quanhao/NCRF/NCRFtest/WSI-TRAIN/%s'%dataclass, metavar='WSI_PATH', type=str,
help='Path to the WSI file')'''
'''parser.add_argument('--npy_path', default='/home/omnisky/tmp/quanhao/ilikewind/camelyon16/preprocessing/tissue_np/tumor_084.npy', metavar='NPY_PATH', type=str,
help='Path to the output npy mask file')'''
parser.add_argument(
    '--level',
    type=int,
    default=6,
    help='at which WSI level to obtain the mask, default 6',
)
parser.add_argument(
    '--RGB_min',
    type=int,
    default=50,
    help='min value for RGB channel, default 50',
)
def run(args, wsi_wholepath, out_tissue_mark):
    """
    Compute the tissue mask of one WSI and save it in npy format.

    A pixel counts as tissue when its saturation is above the Otsu threshold
    of the saturation channel, it is not above the Otsu threshold in all
    three RGB channels at once, and every RGB channel exceeds args.RGB_min.
    Arguments:
        args: parsed options; args.level and args.RGB_min are read
        wsi_wholepath: string, path to the WSI file
        out_tissue_mark: string, path to the output npy mask file
    """
    logging.basicConfig(level=logging.INFO)

    slide = openslide.OpenSlide(wsi_wholepath)
    try:
        region = slide.read_region((0, 0),
                                   args.level,
                                   slide.level_dimensions[args.level])
        # note the shape of img_RGB is the transpose of slide.level_dimensions
        img_RGB = np.transpose(np.array(region.convert('RGB')),
                               axes=[1, 0, 2])
    finally:
        # Release the slide handle even if reading fails; main() opens one
        # slide per file, so leaked handles would pile up.
        slide.close()

    img_HSV = rgb2hsv(img_RGB)

    red = img_RGB[:, :, 0]
    green = img_RGB[:, :, 1]
    blue = img_RGB[:, :, 2]
    saturation = img_HSV[:, :, 1]

    # background = bright in every RGB channel (above each Otsu threshold)
    background_R = red > threshold_otsu(red)
    background_G = green > threshold_otsu(green)
    background_B = blue > threshold_otsu(blue)
    tissue_RGB = np.logical_not(background_R & background_G & background_B)
    tissue_S = saturation > threshold_otsu(saturation)
    # pixels at or below RGB_min in any channel are rejected as well
    min_R = red > args.RGB_min
    min_G = green > args.RGB_min
    min_B = blue > args.RGB_min

    tissue_mask = tissue_S & tissue_RGB & min_R & min_G & min_B

    np.save(out_tissue_mark, tissue_mask)
def main():
    """
    Build and save a tissue mask for every file in the "normal" WSI folder.

    Input and output directories are fixed below; each output file is named
    after its slide with the extension replaced by ``.npy``.
    """
    args = parser.parse_args()
    dataclass = "normal"
    wsi_dir = '/home/omnisky/tmp/quanhao/NCRF/NCRFtest/WSI-TRAIN/%s' % dataclass
    for wsi in os.listdir(wsi_dir):
        wsi_wholepath = os.path.join(wsi_dir, wsi)
        # file name without directory and without extension
        wsi_name = os.path.splitext(os.path.basename(wsi_wholepath))[0]
        out_tissue_mark = '/home/omnisky/tmp/quanhao/ilikewind/camelyon16/preprocessing/tissue_np/%s.npy' % wsi_name
        run(args, wsi_wholepath, out_tissue_mark)
        print("finish,{}\n".format(wsi_name))


if __name__ == '__main__':
    main()
- 通过精确注释获取组织部分中的肿瘤部分,即tumor_mask
import os
import sys
import logging
import argparse
import numpy as np
import openslide
import cv2
import json
sys.path.append(os.path.dirname(os.path.abspath(__file__)) + '/../../')
# Command-line options of the tumor-mask script.
parser = argparse.ArgumentParser(
    description='Get tumor mask of tumor-WSI and save it in npy format',
)
# The three triple-quoted strings below are inert string literals holding
# disabled options (all paths are set inside main() instead).
'''parser.add_argument('--wsi_path', default='/home/omnisky/tmp/quanhao/NCRF/NCRFtest/WSI-TRAIN', metavar='WSI_PATH', type=str,
help='Path to the WSI file')'''
'''parser.add_argument('--json_path', default='/home/omnisky/tmp/quanhao/ilikewind/camelyon16/json', metavar='JSON_PATH', type=str,
help='Path to the JSON file')'''
'''parser.add_argument('--npy_path', default='/home/omnisky/tmp/quanhao/ilikewind/camelyon16/preprocessing/tumor_np/Tumor_084_tumor.npy', metavar='NPY_PATH', type=str,
help='Path to the output npy mask file')'''
parser.add_argument(
    '--level',
    type=int,
    default=6,
    help='at which WSI level to obtain the mask, default 6',
)
def run(args, wsi_wholepath, json_path, out_tumor_mask):
    """
    Rasterize the 'positive' polygons of a json annotation into a boolean
    tumor mask at the requested WSI level and save it in npy format.
    Arguments:
        args: parsed options; args.level is read
        wsi_wholepath: string, path to the WSI file
        json_path: string, path to the json annotation of that WSI
        out_tumor_mask: string, path to the output npy mask file
    """
    slide = openslide.OpenSlide(wsi_wholepath)
    try:
        # get the level * dimensions e.g. tumor0.tif level 6 shape (1589, 7514)
        w, h = slide.level_dimensions[args.level]
        # get the factor of level * e.g. level 6 is 2^6
        factor = slide.level_downsamples[args.level]
    finally:
        # Only the metadata is needed; release the slide handle right away
        # so that looping over many slides does not leak handles.
        slide.close()

    mask_tumor = np.zeros((h, w))  # the init mask, and all the value is 0

    with open(json_path) as f:
        dicts = json.load(f)
    tumor_polygons = dicts['positive']

    for tumor_polygon in tumor_polygons:
        # plot a polygon
        name = tumor_polygon["name"]
        print('name:', name)
        # annotation vertices are divided by the level's downsample factor
        # to land on the mask grid
        vertices = np.array(tumor_polygon["vertices"]) / factor
        vertices = vertices.astype(np.int32)

        cv2.fillPoly(mask_tumor, [vertices], (255))

    # binarize, then transpose the (h, w) mask before saving
    mask_tumor = np.transpose(mask_tumor > 127)

    np.save(out_tumor_mask, mask_tumor)
def main():
    """
    Build and save a tumor mask for every file in the "tumor" WSI folder.

    For a slide named ``<name>.<ext>`` the annotation is read from
    ``<name>.json`` and the mask is written to ``<name>.npy`` in the fixed
    directories below.
    """
    logging.basicConfig(level=logging.INFO)
    args = parser.parse_args()
    dataclass = "tumor"
    wsi_dir = '/home/omnisky/tmp/quanhao/NCRF/NCRFtest/WSI-TRAIN/%s' % dataclass
    for tumor in os.listdir(wsi_dir):
        wsi_wholepath = os.path.join(wsi_dir, tumor)
        # file name without directory and without extension
        wsi_name = os.path.splitext(os.path.basename(wsi_wholepath))[0]
        json_path = '/home/omnisky/tmp/quanhao/ilikewind/camelyon16/json/%s.json' % wsi_name
        out_tumor_mask = '/home/omnisky/tmp/quanhao/ilikewind/camelyon16/preprocessing/tumor_np/%s.npy' % wsi_name
        run(args, wsi_wholepath, json_path, out_tumor_mask)
        print("finish,{}\n".format(wsi_name))


if __name__ == "__main__":
    main()
- 组织部分的剩余部分就是normal_mask
import sys
import os
import argparse
import logging
import numpy as np
sys.path.append(os.path.dirname(os.path.abspath(__file__)) + "/../../")
# Command-line parser of the normal-mask script; it currently defines no
# options of its own.
parser = argparse.ArgumentParser(
    description="Get the normal region from tumor WSI ",
)
# The triple-quoted string below is an inert string literal holding disabled
# options (all paths are set inside main() instead).
'''parser.add_argument("--tumor_path", default='/home/omnisky/tmp/quanhao/ilikewind/camelyon16/preprocessing/tumor_np', metavar='TUMOR_PATH', type=str,
help="Path to the tumor mask npy")
parser.add_argument("--tissue_path", default='/home/omnisky/tmp/quanhao/ilikewind/camelyon16/preprocessing/tissue_np', metavar='TISSUE_PATH', type=str,
help="Path to the tissue mask npy")
parser.add_argument("--normal_path", default='/home/omnisky/tmp/quanhao/ilikewind/camelyon16/preprocessing/non_tumor_np', metavar='NORMAL_PATCH', type=str,
help="Path to the output normal region from tumor WSI npy")'''
def run(wsi_np_whole_path, tissue_path, normal_path):
    """
    Save the part of the tissue mask that is not covered by the tumor mask.
    Arguments:
        wsi_np_whole_path: string, path to the tumor mask npy
        tissue_path: string, path to the tissue mask npy
        normal_path: string, path to the output normal mask npy
    """
    tissue = np.load(tissue_path)
    tumor = np.load(wsi_np_whole_path)
    # element-wise: tissue AND NOT tumor (same ufuncs as the & and ~ operators)
    not_tumor = np.invert(tumor)
    np.save(normal_path, np.bitwise_and(tissue, not_tumor))
def main():
    """
    Derive a normal mask for every tumor mask in the tumor_np folder.

    For a tumor mask ``<name>.npy`` the tissue mask ``<name>.npy`` is read
    from the tissue_np folder and the result is saved as ``non_<name>.npy``
    in the non_tumor_np folder.
    """
    logging.basicConfig(level=logging.INFO)
    args = parser.parse_args()
    tumor_dir = '/home/omnisky/tmp/quanhao/ilikewind/camelyon16/preprocessing/tumor_np'
    for entry in os.listdir(tumor_dir):
        wsi_np_whole_path = os.path.join(tumor_dir, entry)
        # file name without directory and without extension
        base_name = os.path.splitext(os.path.basename(wsi_np_whole_path))[0]
        tissue_path = '/home/omnisky/tmp/quanhao/ilikewind/camelyon16/preprocessing/tissue_np/%s.npy' % base_name
        normal_name = 'non_' + base_name
        normal_path = '/home/omnisky/tmp/quanhao/ilikewind/camelyon16/preprocessing/non_tumor_np/%s.npy' % normal_name
        run(wsi_np_whole_path, tissue_path, normal_path)
        print("finish,{}\n".format(normal_name))


if __name__ == "__main__":
    main()
- 在tumor_mask和normal_mask中分别随机采样patch中心坐标点(我选择每张切片1000个),再以这些坐标点为中心生成1000个patches(两类样本的采样数量相同,可以在一定程度上缓解肿瘤和正常组织数据不均衡的问题)
import os
import sys
import logging
import argparse
import numpy as np
# Make the directory two levels above this file importable.
# os.path.join() must not be used with "/../../": a component starting with
# "/" discards everything before it, which appended the literal "/../../".
sys.path.append(os.path.dirname(os.path.abspath(__file__)) + "/../../")

# Command-line options of the patch-centre sampling script.
parser = argparse.ArgumentParser(description="Get center points of patches "
                                             "from mask")
# The triple-quoted string below is an inert string literal holding disabled
# options (the mask and txt paths are set inside main() instead).
'''parser.add_argument("--mask_path", default='/home/omnisky/tmp/quanhao/ilikewind/camelyon16/preprocessing/tumor_np/Tumor_084_tumor.npy', metavar="MASK_PATH", type=str,
help="Path to the mask npy file")
parser.add_argument("--txt_path", default='/home/omnisky/tmp/quanhao/ilikewind/camelyon16/preprocessing/coord/train_spot.txt', metavar="TXT_PATH", type=str,
help="Path to the txt file")'''
parser.add_argument("--patch_number", default=1000, metavar="PATCH_NUMB", type=int,
                    help="The number of patches extracted from WSI")
# --level is an integer, not a flag: run() multiplies the sampled mask
# coordinates by 2 ** level.
parser.add_argument("--level", default=6, metavar="LEVEL", type=int,
                    help="WSI level of the mask; sampled coordinates are "
                         "multiplied by 2**level, default 6")
class patch_point_in_mask_gen(object):
    '''
    Pick patch centre points among the non-zero pixels of a saved mask.
    inputs: mask path, centre point number
    outputs: centre point
    '''

    def __init__(self, mask_path, number):
        self.number = number
        self.mask_path = mask_path

    def get_patch_point(self):
        '''
        Load the mask and return centre points as an [M, 2] index array.

        When the mask holds more candidates than `number`, `number` rows are
        drawn with np.random.randint (the same row may be drawn more than
        once); otherwise every candidate is returned.
        '''
        mask = np.load(self.mask_path)
        first_idcs, second_idcs = np.where(mask)
        # one row per non-zero mask pixel: (index on axis 0, index on axis 1)
        candidates = np.column_stack((first_idcs, second_idcs))

        total = candidates.shape[0]
        if total <= self.number:
            return candidates

        picks = np.random.randint(total, size=self.number)
        return candidates[picks, :]
def run(args, tumor_np_whole_path, coord_path):
    '''
    Sample patch centre points from one mask and write them to a txt file.

    Each output line is "<mask name>,<first index>,<second index>", with the
    sampled mask indices multiplied by 2 ** args.level.
    inputs: parsed options (patch_number and level are read), mask npy path,
            output txt path
    '''
    generator = patch_point_in_mask_gen(tumor_np_whole_path, args.patch_number)
    scale = 2 ** args.level  # make sure the factor
    sampled_points = (generator.get_patch_point() * scale).astype(np.int32)

    # mask file name up to its first "."
    file_name = os.path.split(tumor_np_whole_path)[-1]
    mask_name = file_name.split(".")[0]

    # one name cell per sampled point, put in front of the coordinates
    name_column = np.full((sampled_points.shape[0], 1), mask_name)
    center_points = np.hstack((name_column, sampled_points))

    with open(coord_path, "w+") as f:
        np.savetxt(f, center_points, fmt="%s", delimiter=",")
def main():
    '''
    Write a coordinate txt file for every mask in the "normal_np" folder.

    A mask named ``<name>.npy`` produces ``coord_<name>.txt`` in the fixed
    coord directory below.
    '''
    logging.basicConfig(level=logging.INFO)
    args = parser.parse_args()
    dataclass = 'normal_np'
    mask_dir = '/home/omnisky/tmp/quanhao/ilikewind/camelyon16/preprocessing/%s' % dataclass
    for wsi_np_name in os.listdir(mask_dir):
        wsi_np_whole_path = os.path.join(mask_dir, wsi_np_name)
        # file name without directory and without extension
        wsi_name = os.path.splitext(os.path.basename(wsi_np_whole_path))[0]
        coord_name = "coord_" + wsi_name
        coord_path = '/home/omnisky/tmp/quanhao/ilikewind/camelyon16/preprocessing/coord/%s.txt' % coord_name
        run(args, wsi_np_whole_path, coord_path)
        print('finish,{}'.format(coord_name))


if __name__ == "__main__":
    main()