# coding:utf-8
"""
local_storage utilities
Copyright (C) 2020 JASRI All Rights Reserved.
"""
import sys
import os
import re
import copy
import collections
import zipfile
import codecs
from . import util
from . import config
from . import rest
class ConfigFile():
    """Accessor for the YAML file that holds the local-storage configuration.

    A freshly created (or reset) file contains the keys
    ``local_storage_path``, ``facility``, ``class_name`` and ``disk_name``,
    all set to ``None``.
    """

    def __init__(self, filename, flag_replace=False):
        """Bind to *filename* and write the blank template when required.

        The template is written when *flag_replace* is true or when
        *filename* is not an existing regular file.
        """
        self.__filename = filename
        if flag_replace or not os.path.isfile(self.__filename):
            util.log("== ConfigFile::init() ==")
            util.out_yaml(self._blank_values(), filename=self.__filename)

    @staticmethod
    def _blank_values():
        """Return the ordered template written by ``__init__`` and ``reset``."""
        template = collections.OrderedDict()
        for name in ("local_storage_path", "facility",
                     "class_name", "disk_name"):
            template[name] = None
        return template

    def list(self):
        """Load the file and dump its content through ``util.out_json``."""
        util.log("== ConfigFile::list() ==")
        util.out_json(util.load_yaml(self.__filename))

    def set(self, key, value, debug=True):
        """Store ``value`` under ``key`` and rewrite the file.

        Raises ``util.Error`` when *key* or *value* is ``None``.  With
        *debug* true the change is logged before the file is rewritten.
        """
        # sys._getframe() is evaluated here, inside set(), so the error
        # domain still refers to this method's frame.
        for template, item in (("key=%s is not set", key),
                               ("value=%s not set", value)):
            if item is None:
                message = template % item
                raise util.Error(message, domain=util.error_domain(
                    __file__, sys._getframe(), self.__class__))

        if debug:
            util.log("== ConfigFile::set() ==")
            util.log("--> {} = {}".format(key, value))

        content = util.load_yaml(self.__filename)
        content[key] = value
        util.out_yaml(content, filename=self.__filename, debug=debug)

    def reset(self):
        """Overwrite the file with the blank template."""
        util.log("== ConfigFile::reset() ==")
        util.out_yaml(self._blank_values(), filename=self.__filename)

    def load(self):
        """Return the file content as a ``Config`` namedtuple.

        The tuple fields are the keys found in the file, in file order.
        """
        content = util.load_yaml(self.__filename)
        field_names = " ".join(content.keys())
        config_type = collections.namedtuple("Config", field_names)
        return config_type(**content)
class ProposalNumberFile():
    """Accessor for the YAML file that stores a single ``proposal_number``."""

    def __init__(self, filename, flag_replace=False, debug=True):
        """Bind to *filename*, creating it with a ``None`` proposal number.

        The file is (re)written when *flag_replace* is true or when
        *filename* is not an existing regular file.
        """
        self.__filename = filename
        self.__debug = debug
        if flag_replace or not os.path.isfile(self.__filename):
            util.log(
                "== ProposalNumberFile::init() (file={}) ==".format(self.__filename))
            initial = collections.OrderedDict()
            initial["proposal_number"] = None
            util.out_yaml(initial, filename=self.__filename, debug=False)

    def list(self):
        """Load the file and dump its content through ``util.out_json``."""
        util.log("== ProposalNumberFile::list() ==")
        util.out_json(util.load_yaml(self.__filename))

    def set_proposal_number(self, value):
        """Store *value* as the proposal number and rewrite the file."""
        if self.__debug:
            util.log("== ProposalNumberFile::set_proposal_number() ==")
            util.log("--> {}".format(value))
        content = util.load_yaml(self.__filename)
        content["proposal_number"] = value
        util.out_yaml(content, filename=self.__filename, debug=self.__debug)

    def clear(self):
        """Reset the stored proposal number to ``None``."""
        util.log("== ProposalNumberFile::clear() ==")
        content = util.load_yaml(self.__filename)
        content["proposal_number"] = None
        util.out_yaml(content, filename=self.__filename)

    def proposal_number(self):
        """Return the stored proposal number, or ``None`` if the key is absent."""
        content = util.load_yaml(self.__filename)
        if "proposal_number" in content:
            return content["proposal_number"]
        return None
class RepositoryInfoFile():
    """JSON snapshot of the repository file list for a directory."""

    def __init__(self, filename, debug=True):
        """Remember the snapshot *filename* and the *debug* logging flag."""
        self.__filename = filename
        self.__debug = debug

    def output(self, access_token,
               repository_directory,
               register_basename=None):
        """Query the repository and write the snapshot file.

        The query is restricted to one register name when
        *register_basename* is given, otherwise to *repository_directory*
        (unless that is "/").  The written JSON holds ``time``, the request
        parameters, ``file_dict`` (keyed by file name) and
        ``register_name_dict`` (one empty dict per returned register name).
        """
        repository = rest.Repository()
        repository.authorize(access_token, debug=False)

        # ... build the request; the selector comes first, then the flags
        query = {}
        if register_basename is not None:
            query["register_name_list"] = [
                util.register_name(register_basename, repository_directory)]
        elif repository_directory != "/":
            query["directory_list"] = [repository_directory]
        query["flag_recursive"] = 1
        query["flag_register_name"] = 1
        query["flag_own"] = 1
        response = repository.data.files.post(**query)

        # ... index the returned files by name; "time" is converted in place
        #     to the integer produced by util.mktime
        file_dict = collections.OrderedDict()
        for entry in response.get("file_list", []):
            entry["time"] = int(util.mktime(entry["time"]))
            file_dict[entry["name"]] = entry

        # ... one (initially empty) slot per register name
        register_name_dict = collections.OrderedDict(
            (name, collections.OrderedDict())
            for name in response.get("register_name_list", []))

        # ... output results
        if self.__debug:
            util.log("==> output_repository_info_file: {}".format(self.__filename))
        snapshot = collections.OrderedDict()
        snapshot["time"] = util.strtime_datetime()
        if repository_directory is not None:
            snapshot["repository_directory"] = repository_directory
        if register_basename is not None:
            snapshot["register_basename"] = register_basename
        snapshot["file_dict"] = file_dict
        snapshot["register_name_dict"] = register_name_dict
        util.out_json(snapshot, debug=False, filename=self.__filename)

    def load(self):
        """Return the snapshot as loaded by ``util.load_json``."""
        return util.load_json(self.__filename)
class LocalStorageInfoFile():
    """JSON index comparing the local storage tree with repository info.

    File statuses assigned by ``output``: ``noMetadata``, ``normal``,
    ``updated``, ``ready`` and ``deleted``.  Directories additionally use
    ``revised``.
    """

    def __init__(self, filename, debug=True):
        """Remember the index *filename* and the *debug* logging flag."""
        self.__filename = filename
        self.__debug = debug

    @staticmethod
    def _mark_revised_upward(directory_dict, start_directory):
        """Set status "revised" on *start_directory*, every ancestor and "/"."""
        current = start_directory
        while True:
            directory_dict[current]["status"] = "revised"
            current = os.path.dirname(current)
            if len(current) <= 1:
                directory_dict["/"]["status"] = "revised"
                break

    @staticmethod
    def _mark_normal_unless_changed(directory_dict, directory):
        """Set status "normal" unless the directory is "updated"/"revised"."""
        entry = directory_dict[directory]
        if entry.get("status") not in ("updated", "revised"):
            entry["status"] = "normal"

    def output(self, config_value,
               repository_directory,
               repository_info_dict,
               register_basename=None):
        """Scan the local storage and write the index file.

        Parameters:
            config_value: object exposing ``local_storage_path``,
                ``facility``, ``class_name`` and ``disk_name``.
            repository_directory: repository path whose local counterpart
                is scanned.
            repository_info_dict: dict with ``file_dict`` and
                ``register_name_dict``; ``register_name_dict`` is updated
                in place with per-file status entries.
            register_basename: when given, only files matching the derived
                register name are indexed.

        The result (``time``, request parameters, ``directory_dict`` and
        ``register_name_dict``) is written with ``util.out_json``.
        """
        local_storage_path = config_value.local_storage_path
        repository_top_directory = "/%s/%s/%s" % (
            config_value.facility,
            config_value.class_name,
            config_value.disk_name)

        register_name_input = None
        if register_basename is not None:
            register_name_input = util.register_name(register_basename,
                                                     repository_directory)
        # The filter is loop-invariant, so it is compiled once.  A raw
        # string keeps "\." and "\/" from being invalid str escapes.
        register_name_filter = None
        if register_name_input is not None:
            register_name_filter = re.compile(
                r"^%s((|\.[^/]+)|\/.+)$" % re.escape(register_name_input))

        # ... extract dict from repository_entry_file
        repository_info_file_dict = repository_info_dict["file_dict"]
        register_name_dict = repository_info_dict["register_name_dict"]
        local_storage_keys = []

        # ... define top_directory
        top_directory = local_storage_path
        if repository_directory != repository_top_directory:
            top_directory += repository_directory[len(
                repository_top_directory):]

        # ... check local storage files under top_directory
        #     compare with repository_info_file_dict and assign status
        #     (status=normal, updated, ready, noMetadata, deleted)
        directory_dict = collections.defaultdict(collections.OrderedDict)
        for root, _dirs, files in os.walk(top_directory):
            root = root.replace("\\", "/")  # replaced for windows directory
            relative_root = root[len(local_storage_path):]
            file_local_storage_dir = repository_top_directory + relative_root
            directory_dict[file_local_storage_dir]["status"] = "noMetadata"

            flag_repository = False
            for f in files:
                file_path = "%s/%s" % (root, f)
                file_local_storage_path = repository_top_directory + \
                    "%s/%s" % (relative_root, f)

                if register_name_filter is not None and \
                        not register_name_filter.match(file_local_storage_path):
                    continue

                if "file_dict" not in directory_dict[file_local_storage_dir]:
                    directory_dict[file_local_storage_dir]["file_dict"] = \
                        collections.OrderedDict()

                v = collections.OrderedDict()
                v["status"] = "noMetadata"
                v["time_storage"] = int(util.mktime_file(file_path))
                v["size_storage"] = os.path.getsize(file_path)
                # a content hash is deliberately not computed here
                # (too slow for big data); size and time are compared instead

                register_name = None
                if file_local_storage_path in repository_info_file_dict:
                    # file is known to the repository: normal or updated
                    flag_repository = True
                    v_rep = repository_info_file_dict[file_local_storage_path]
                    for key in v_rep:
                        v[key] = v_rep[key]
                    register_name = v_rep["register_name"]

                    if (v["size_storage"] != v["size"]) or \
                            (v["time_storage"] != v["time"]):
                        v["status"] = "updated"
                        w = collections.OrderedDict()
                        w["status"] = "updated"
                        register_name_dict[register_name][file_local_storage_path] = w
                        self._mark_revised_upward(
                            directory_dict, file_local_storage_dir)
                    else:
                        v["status"] = "normal"
                        self._mark_normal_unless_changed(
                            directory_dict, file_local_storage_dir)
                else:
                    # file is unknown to the repository: it is "ready" when
                    # its stem path, or any ancestor of it, is a register name
                    basename = os.path.basename(file_local_storage_path)
                    dirname = os.path.dirname(file_local_storage_path)
                    if dirname == "/":
                        dirname = ""
                    file_test = "%s/%s" % (dirname, basename.split(".")[0])
                    while True:
                        if file_test in register_name_dict:
                            w = collections.OrderedDict()
                            w["status"] = "ready"
                            register_name_dict[file_test][file_local_storage_path] = w
                            register_name = file_test
                            v["status"] = "ready"
                            self._mark_revised_upward(
                                directory_dict, file_local_storage_dir)
                        file_test = os.path.dirname(file_test)
                        if len(file_test) <= 1:
                            break

                if register_name is not None:
                    v["register_name"] = register_name
                directory_dict[file_local_storage_dir]["file_dict"][file_local_storage_path] = v
                local_storage_keys.append(file_local_storage_path)

            if flag_repository:
                # at least one file here is registered: this directory and
                # its ancestors are "normal" unless already updated/revised
                dir_test = file_local_storage_dir
                while True:
                    self._mark_normal_unless_changed(directory_dict, dir_test)
                    dir_test = os.path.dirname(dir_test)
                    if len(dir_test) <= 1:
                        self._mark_normal_unless_changed(directory_dict, "/")
                        break

            if directory_dict[file_local_storage_dir]["status"] == "noMetadata":
                flag_register_name = False
                dir_test = file_local_storage_dir
                dir_stem = file_local_storage_dir.split(".")[0]
                if dir_stem in register_name_dict:
                    flag_register_name = True
                    directory_dict[file_local_storage_dir]["register_name"] = dir_stem
                while True:
                    dir_test = os.path.dirname(dir_test)
                    if len(dir_test) <= 1:
                        break
                    if dir_test in register_name_dict:
                        flag_register_name = True
                    if flag_register_name:
                        directory_dict[dir_test]["status"] = "revised"
                        if "register_name" not in directory_dict[file_local_storage_dir]:
                            directory_dict[file_local_storage_dir]["register_name"] = dir_test

        # ... check deleted files
        #     (repository order is kept so the output is deterministic)
        directory_deleted_set = set()
        local_storage_set = set(local_storage_keys)
        diff_list = [f for f in repository_info_file_dict
                     if f not in local_storage_set]
        for f in diff_list:
            v_rep = repository_info_file_dict[f]
            register_name = v_rep["register_name"]

            w = collections.OrderedDict()
            w["status"] = "deleted"
            register_name_dict[register_name][f] = w

            v = collections.OrderedDict()
            v["status"] = "deleted"
            for key in v_rep:
                v[key] = v_rep[key]

            file_local_storage_dir = os.path.dirname(f)
            if "file_dict" not in directory_dict[file_local_storage_dir]:
                directory_dict[file_local_storage_dir]["file_dict"] = \
                    collections.OrderedDict()
            directory_dict[file_local_storage_dir]["file_dict"][f] = v

            dir_test = os.path.dirname(f)
            while True:
                # NOTE(review): dir_test still carries the
                # repository_top_directory prefix here, whereas the walk
                # above maps local paths without it - confirm intended.
                dir_check = "%s%s" % (local_storage_path, dir_test)
                if os.path.isdir(dir_check):
                    directory_dict[dir_test]["status"] = "revised"
                else:
                    directory_dict[dir_test]["status"] = "deleted"
                    directory_deleted_set.add(dir_test)
                dir_test = os.path.dirname(dir_test)
                if len(dir_test) <= 1:
                    directory_dict["/"]["status"] = "revised"
                    break

        # ... check deleted directories (sorted for a stable output order)
        for d in sorted(directory_deleted_set):
            d_parent = os.path.dirname(d)
            if d_parent == "":
                d_parent = "/"
            if "deleted_directory_list" not in directory_dict[d_parent]:
                directory_dict[d_parent]["deleted_directory_list"] = []
            directory_dict[d_parent]["deleted_directory_list"].append(d)

        # ... check register_name for deleted directories
        #     directory_tmp_dict: directories not yet handled (shares the
        #     per-directory dicts with directory_dict)
        #     register_name_missing: register names without a local directory
        directory_tmp_dict = dict(directory_dict)
        register_name_missing = []
        for register_name in register_name_dict:
            if register_name in directory_tmp_dict:
                directory_dict[register_name]["register_name"] = register_name
                del directory_tmp_dict[register_name]

            # upper search
            dir_test = register_name
            while True:
                dir_test = os.path.dirname(dir_test)
                if len(dir_test) <= 1:
                    directory_tmp_dict.pop("/", None)
                    break
                directory_tmp_dict.pop(dir_test, None)

            # lower search
            # NOTE(review): register_name keeps the repository_top_directory
            # prefix, unlike the local paths built in the walk above -
            # confirm that this path can exist.
            top_directory = local_storage_path + register_name
            if os.path.isdir(top_directory) is False:
                register_name_missing.append(register_name)
                continue
            for root, dirs, _files in os.walk(top_directory):
                root_local_storage = root[len(local_storage_path):]
                if root_local_storage == "":
                    root_local_storage = "/"
                for d in dirs:
                    dir_local_storage = "%s/%s" % (root_local_storage, d)
                    directory_dict[dir_local_storage]["register_name"] = register_name
                    # the key may exist only in directory_dict (it can be
                    # created by the line above), so do not insist on it
                    directory_tmp_dict.pop(dir_local_storage, None)

        for reg in register_name_missing:
            reg_pattern = re.compile("^%s(|/.+)$" % re.escape(reg))
            for candidate in directory_tmp_dict:
                if reg_pattern.match(candidate):
                    if "register_name" not in directory_dict[candidate]:
                        # the matching register name is `reg`, not the
                        # leftover variable of the previous loop
                        directory_dict[candidate]["register_name"] = reg
                    del directory_tmp_dict[candidate]
                    # NOTE(review): as in the original, only the first
                    # matching directory is handled per register name; the
                    # break also protects the dict being iterated.
                    break

        # ... upper search for directory with noMetadata
        for d, v in directory_tmp_dict.items():
            if v.get("status") != "noMetadata":
                continue
            dir_test = d  # start from the noMetadata directory itself
            while True:
                # .get() avoids creating empty entries for unknown ancestors
                entry = directory_dict.get(dir_test)
                if entry is not None and entry.get("status") == "normal":
                    entry["status"] = "revised"
                dir_test = os.path.dirname(dir_test)
                if len(dir_test) <= 1:
                    break

        # ... output the result
        if self.__debug:
            util.log("==> output_local_storage_info_file: {}".format(self.__filename))
        vout_dict = collections.OrderedDict()
        vout_dict["time"] = util.strtime_datetime()
        if repository_directory is not None:
            vout_dict["repository_directory"] = repository_directory
        if register_basename is not None:
            vout_dict["register_basename"] = register_basename
        vout_dict["directory_dict"] = collections.OrderedDict(directory_dict)
        vout_dict["register_name_dict"] = collections.OrderedDict(
            register_name_dict)
        util.out_json(vout_dict, debug=False, filename=self.__filename)

    def load(self):
        """Return the index as loaded by ``util.load_json``."""
        return util.load_json(self.__filename)
class Manage():
    """Query and restore operations based on a local-storage index file."""

    def __init__(self,
                 local_storage_info_file=None,
                 config_value=None,
                 repository_directory=None,
                 access_token=None,
                 debug=True):
        """Load the index file and remember the working context.

        *config_value* must expose ``local_storage_path``, ``facility``,
        ``class_name`` and ``disk_name``.  Raises ``util.Error`` when the
        index records a ``repository_directory`` that does not occur inside
        the *repository_directory* given here.
        """
        self.__local_storage_info_file = local_storage_info_file
        self.__local_storage_path = config_value.local_storage_path
        self.__facility = config_value.facility
        self.__class_name = config_value.class_name
        self.__disk_name = config_value.disk_name
        self.__repository_directory = repository_directory
        self.__access_token = access_token
        self.__debug = debug

        self.__local_storage_info_dict = util.load_json(
            self.__local_storage_info_file)
        self.__directory_dict = self.__local_storage_info_dict["directory_dict"]
        self.__register_name_dict = self.__local_storage_info_dict["register_name_dict"]

        self.__repository_top_directory = "/%s/%s/%s" % (
            self.__facility, self.__class_name, self.__disk_name)

        repository_directory_storage = self.__local_storage_info_dict.get(
            "repository_directory", None)
        if repository_directory_storage is not None and \
                repository_directory.find(repository_directory_storage) < 0:
            message = "please execute benten.py init_index under repository_directory=%s)" % \
                (repository_directory_storage)
            raise util.Error(message, domain=util.error_domain(
                __file__, sys._getframe(), self.__class__))

    def infoDict(self, file_list=None,
                 register_basename_list=None,
                 mode="file",
                 status_val=None,
                 type_val=None,
                 register_name=None,
                 flag_full_path=False,
                 flag_recursive=False):
        """Return a status report for the current repository directory.

        Parameters:
            file_list: when non-empty, only entries whose reported name is
                in this list are returned (in discovery order); otherwise
                all entries are returned sorted by name.
            register_basename_list: passed to ``util.register_name_list``;
                a non-empty result restricts the report to those register
                names and forces recursion.
            mode: "file", "register_name", "register_name_only" or "all";
                selects whether ``file_list`` and/or ``register_name_list``
                appear in the result.
            status_val: keep only entries with this status; "revised" keeps
                everything that is not "normal".
            type_val: "file" or "directory" to restrict the entry type.
            register_name: accepted for interface compatibility; the value
                passed in is not used.
            flag_full_path: forwarded to ``util.reference_path``.
            flag_recursive: descend below the top directory.

        Returns an ``OrderedDict`` echoing the request parameters plus the
        selected lists.
        """
        repository_directory = self.__repository_directory
        directory_index = self.__directory_dict

        local_storage_directory = self.__local_storage_path + \
            repository_directory[len(self.__repository_top_directory):]

        register_name_select_list = util.register_name_list(
            register_basename_list, repository_directory)
        flag_select = len(register_name_select_list) > 0
        if flag_select:
            flag_recursive = True

        # ... collect the directories and files that exist locally
        directoryList = []
        fileList = []
        for root, dirs, files in os.walk(local_storage_directory):
            root_tmp = repository_directory + \
                root[len(local_storage_directory):]
            root_tmp = root_tmp.replace("\\", "/")
            if root_tmp == "/":
                root_tmp = ""
            directoryList.extend(root_tmp + "/" + d for d in dirs)
            fileList.extend(root_tmp + "/" + f for f in files)
            if flag_recursive is False:
                break

        # ... indexed directories in scope
        directory_dict = collections.OrderedDict()
        if repository_directory in directory_index:
            directory_dict[repository_directory] = directory_index[repository_directory]
        if flag_recursive:
            for d in directoryList:
                if d in directory_index:
                    directory_dict[d] = directory_index[d]

        # ... deleted directories, followed level by level when recursive
        directoryDeletedList = []
        pending = list(directory_dict)
        while True:
            found = []
            for d in pending:
                if "deleted_directory_list" in directory_index[d]:
                    found.extend(directory_index[d]["deleted_directory_list"])
            directoryDeletedList.extend(found)
            pending = found
            if len(found) == 0:
                break
            if flag_recursive is False:
                break

        # ... per-file records of every directory in scope
        file_dict_ref = {}
        for d in directory_dict:
            if "file_dict" in directory_dict[d]:
                file_dict_ref.update(directory_dict[d]["file_dict"])
        if flag_recursive:
            for d in directoryDeletedList:
                if "file_dict" in directory_index[d]:
                    file_dict_ref.update(directory_index[d]["file_dict"])
        fileDeletedList = [f for f in file_dict_ref
                           if file_dict_ref[f]["status"] == "deleted"]

        def rejected(status, reg_name):
            """Tell whether an entry fails the status/register-name filters."""
            if status_val == "revised":
                if status == "normal":
                    return True
            elif status_val is not None and status != status_val:
                return True
            return flag_select and reg_name not in register_name_select_list

        def entry(path, status, kind, reg_name):
            """Build one report record."""
            record = collections.OrderedDict()
            record["name"] = util.reference_path(
                path, repository_directory, flag_full_path)
            record["status"] = status
            record["type"] = kind
            record["register_name"] = reg_name
            return record

        vList = []
        if type_val != "file":
            for d in directoryList:
                sub_directory_dict = directory_index.get(d, {})
                status = sub_directory_dict.get("status", "undefined")
                register_name = sub_directory_dict.get("register_name", None)
                if not rejected(status, register_name):
                    vList.append(entry(d, status, "directory", register_name))
            for d in directoryDeletedList:
                register_name = directory_index.get(d, {}).get(
                    "register_name", None)
                if not rejected("deleted", register_name):
                    vList.append(
                        entry(d, "deleted", "directory", register_name))

        if type_val != "directory":
            for f in fileList:
                status = "undefined"
                register_name = None
                if f in file_dict_ref:
                    status = file_dict_ref[f]["status"]
                    register_name = file_dict_ref[f].get("register_name", None)
                if not rejected(status, register_name):
                    vList.append(entry(f, status, "file", register_name))
            for f in fileDeletedList:
                register_name = file_dict_ref[f].get("register_name", None)
                if not rejected("deleted", register_name):
                    vList.append(entry(f, "deleted", "file", register_name))

        # ... construct dict for output
        vDict = collections.OrderedDict()
        vDict["repository_directory"] = repository_directory
        if register_name_select_list is not None:
            vDict["register_name_select_list"] = register_name_select_list
        vDict["mode"] = mode
        if status_val is not None:
            vDict["status"] = status_val
        if type_val is not None:
            vDict["type"] = type_val
        vDict["full_path"] = flag_full_path
        vDict["recursive"] = flag_recursive

        file_out_list = []
        if (file_list is None) or (len(file_list) == 0):
            file_out_list = sorted(vList, key=lambda x: x["name"])
        else:
            # stop as soon as as many entries as requested names were found
            count_all = len(file_list)
            for v in vList:
                if v["name"] in file_list:
                    file_out_list.append(v)
                    if len(file_out_list) == count_all:
                        break

        if mode in ["file", "all"]:
            vDict["file_list"] = file_out_list

        if mode in ["register_name", "register_name_only", "all"]:
            with_files = mode != "register_name_only"
            register_name_dict = {}
            for f in file_out_list:
                register_name = f["register_name"]
                if register_name is None:
                    continue
                if register_name not in register_name_dict:
                    summary = collections.OrderedDict()
                    summary["name"] = register_name
                    summary["status"] = "normal"
                    if with_files:
                        summary["file_list"] = []
                    register_name_dict[register_name] = summary
                summary = register_name_dict[register_name]
                # one non-normal entry makes the whole register name revised
                if f["status"] != "normal":
                    summary["status"] = "revised"
                if with_files:
                    summary["file_list"].append(f)

            vDict["register_name_list"] = [
                register_name_dict[key] for key in sorted(register_name_dict)]

        return vDict

    def restore(self, register_basename,
                disable_hash=False):
        """Download every repository file of one register name.

        The register name is derived from *register_basename* through
        ``util.register_name_list``.  Raises ``util.Error`` when a derived
        name is not in the index or when no name was derived.  Returns a
        dict with ``register_name`` and ``file_list_restored``.
        """
        register_name_select_list = util.register_name_list(
            [register_basename], self.__repository_directory)

        register_name_undefined_list = [
            reg for reg in register_name_select_list
            if reg not in self.__register_name_dict]
        if len(register_name_undefined_list) > 0:
            message = "undefined register_name_list for %s" % (
                str(register_name_undefined_list))
            raise util.Error(message, domain=util.error_domain(
                __file__, sys._getframe(), self.__class__))

        if len(register_name_select_list) == 0:
            message = "no input for register_name_list"
            raise util.Error(message, domain=util.error_domain(
                __file__, sys._getframe(), self.__class__))

        repo = rest.Repository()
        repo.authorize(self.__access_token, debug=False)

        query = {}
        query["register_name_list"] = register_name_select_list
        query["flag_own"] = 1
        listing = repo.data.files.post(**query)

        # files go below the local counterpart of the register name's parent
        register_name = register_name_select_list[0]
        register_name_dir = os.path.dirname(register_name)
        out_dir = "%s%s" % (
            self.__local_storage_path,
            register_name_dir[len(self.__repository_top_directory):])

        file_list_restored = []
        for f in listing.get("file_list", []):
            request = {}
            request["file"] = f["name"]
            request["flag_subdirectory"] = 1
            ret_dict = repo.download.file.post(
                request, out_directory=out_dir, flag_path=False,
                flag_subdirectory=True, disable_hash=disable_hash)
            if ret_dict is not None:
                util.log("==> response")
                # was util_client.out_json: util_client is never imported
                # here, so any non-empty response raised NameError
                util.out_json(ret_dict)
            file_list_restored.append(f)

        print("==> restore completed for register_name={}".format(register_name))

        ret = {}
        ret["register_name"] = register_name
        ret["file_list_restored"] = file_list_restored
        return ret