Source code for benten_client.local_storage
# coding:utf-8
"""
local_storage
Copyright (C) 2020 JASRI All Rights Reserved.
"""
import os
import sys
import re
import shutil
import copy
import fnmatch
import collections
from os.path import expanduser
from . import util
from . import config
from . import local_storage_util
from . import rest
[docs]class LocalStorage():
def __init__(self, access_token=None,
current_directory=os.getcwd(),
debug=True):
util.check_workdir()
self.__access_token = access_token
self.__current_directory = current_directory
self.__local_storage_config_file = \
"%s/%s" % (util.workdir(), config.local_storage_config_file)
self.__proposal_number_file = \
"%s/%s" % (util.workdir(), config.proposal_number_file)
self.__repository_info_file = \
"%s/%s" % (util.workdir(), config.repository_info_file)
self.__local_storage_info_file = \
"%s/%s" % (util.workdir(), config.local_storage_info_file)
self.__config_value = self.config_value()
try:
self.__repository_directory = \
util.repository_directory(self.__config_value,
current_directory=self.__current_directory,
flag_print=False)
except:
self.__repository_directory = None
self.__debug = debug
# ... methods
[docs] def init(self):
"""
Init for configuration settings
:return:
"""
util.log("### LocalStorage::init() ###")
# check BENTEN_WORKDIR
benten_workdir = os.getenv(
"BENTEN_WORKDIR", "{}/.benten".format(expanduser("~")))
util.log("BENTEN_WORKDIR = {}".format(benten_workdir))
self.authorize(util.access_token())
parent_basename = os.path.basename(os.getcwd())
# check userinfo
repo = rest.Repository()
repo.authorize(util.access_token())
ret_dict = repo.auth.userinfo.post()
if "error" in ret_dict:
message = "Unauthorized"
raise util.Error(message, domain=util.error_domain(
__file__, sys._getframe(), self.__class__))
profile = ret_dict["profile"]
vdict = {}
vdict["facility"] = ret_dict["facility"]
vdict["class_name"] = ret_dict["class_name"]
vdict["disk_name"] = parent_basename
# check configFile
vdict["local_storage_path"] = os.getcwd()
if os.name == "nt":
vdict["local_storage_path"] = os.getcwd().replace("\\", "/")
util.log("### Initialize configurations ###")
try:
vinput = raw_input(
"local_storage_path[default=%s]: " % vdict["local_storage_path"])
except:
vinput = input(
"local_storage_path[default=%s]: " % vdict["local_storage_path"])
if vinput != "":
vdict["local_storage_path"] = vinput
try:
vinput = raw_input("disk_name[default=%s]: " % vdict["disk_name"])
except:
vinput = input("disk_name[default=%s]: " % vdict["disk_name"])
if vinput != "":
vdict["disk_name"] = vinput
util.log("")
util.log("### configurations parameters ###")
util.log("local_storage_path = {}".format(vdict["local_storage_path"]))
util.log("facility = {}".format(vdict["facility"]))
util.log("class_name = {}".format(vdict["class_name"]))
util.log("disk_name = {}".format(vdict["disk_name"]))
util.log("")
try:
vinput = raw_input(
">> please type 'y' or 'yes' to set parameters :")
except:
vinput = input(">> please type 'y' or 'yes' to set parameters :")
if vinput not in ["y", "yes"]:
util.log("==> configuration parameters were NOT updated")
return
debug_tmp = self.__debug
self.__debug = False
for key in ["local_storage_path",
"facility",
"class_name",
"disk_name"]:
v = {}
v["key"] = key
v["value"] = vdict[key]
self.set_config(**v)
util.log("==> configuration parameters were updated")
self.__debug = debug_tmp
# ... methods
[docs] def init_index(self, **v):
"""
Init for local storage settings
:param \**v:
See below.
:Keyword Arguments:
* *register_basename* (``str``) : 登録ベース名
* *debug* (``bool`` [Option]) : Debugオプション (デフォルトは False)
* *flag_all* (``bool`` [Option]): 全初期化するかどうかのフラグ (デフォルトはFalse)
:return:
"""
debug = self.__debug
if "debug" in v:
debug = v["debug"]
register_basename = None
if "register_basename" in v:
register_basename = v["register_basename"]
util.check_access_token(self.__access_token)
flag_all = v.get("flag_all", False)
repository_directory = util.repository_directory(self.__config_value,
self.__current_directory,
flag_all)
repositoryInfoFile = local_storage_util.RepositoryInfoFile(self.__repository_info_file,
debug=debug)
repositoryInfoFile.output(self.__access_token,
repository_directory,
register_basename)
localStorageInfoFile = local_storage_util.LocalStorageInfoFile(self.__local_storage_info_file,
debug=debug)
localStorageInfoFile.output(self.__config_value,
repository_directory,
repositoryInfoFile.load(),
register_basename)
[docs] def create_meta_system(self, **v):
"""
Create metadata for system
:param \**v:
See below.
:Keyword Arguments:
* *register_basename* (``str``) : 登録ベース名
:return:
A dict of mapping keys.
"""
register_basename = v.get("register_basename", None)
flag_replace = v.get("flag_replace", False)
current_directory = self.__current_directory
fname_base = "{}/{}.{}".format(current_directory,
register_basename,
"json")
fname_system = "{}/{}.system.{}".format(current_directory,
register_basename,
"json")
vdict_system = {}
if os.path.exists(fname_system):
vdict_system = util.load_json(fname_system)
# ... create template if there are no hbase json file
if os.path.exists(fname_base) is False:
proposal_number = self.proposal_number()
if proposal_number is not None:
message = "**Warning** No metadata file for {}\n".format(fname_base)
message += " template file is created with proposal_number={}".format(proposal_number)
util.log(message)
vdict = collections.OrderedDict()
vdict[config.metadata_key_proposal_number] = proposal_number
util.out_json(vdict, filename=fname_base)
vdict, count_contact_name = self.metadata_from_file(register_basename, flag_dataset=False, flag_count_contact_name=True)
# ... check if proposal_number defined
proposal_number = vdict.get(config.metadata_key_proposal_number)
if proposal_number is None:
proposal_number = vdict.get(config.metadata_key_proposal_number_alternative)
if proposal_number:
vdict[config.metadata_key_proposal_number] = proposal_number
if proposal_number is None:
message = "undefined proposal_number"
raise util.Error(message, domain=util.error_domain(
__file__, sys._getframe(), self.__class__))
# ... get metadata for auto
vdict_input = collections.OrderedDict()
if os.path.exists(fname_system):
try:
vdict_input = util.load_json(fname_system)
except:
message = "illegal data format for %s" % fname_system
raise util.Error(message, domain=util.error_domain(
__file__, sys._getframe(), self.__class__))
# ... get metadata for others (remove metadata for auto)
for k in vdict_input:
if k in vdict:
del vdict[k]
# ... get metadata for output
vdict_out = self.metadata_system(
vdict_input, register_basename, proposal_number=proposal_number,
data_input = vdict, count_contact_name=count_contact_name)
flag_updated = False
if vdict_out != vdict_system:
flag_updated = True
util.log("==> output system metadata into {}".format(fname_system))
util.out_json(vdict_out, debug=False, filename=fname_system)
v_ret = collections.OrderedDict()
v_ret["execute_create_meta_system"] = flag_updated
if flag_updated:
v_ret["metadata"] = vdict_out
return v_ret
[docs] def list(self, **v):
"""
List for local storage settings
:param \**v:
See below.
:Keyword Arguments:
* *directory* (``str`` [Option]) : 相対ディレクトリ
* デフォルトでは current_directory をベースにリスト検索を行う。
* *file_list* (``list(str)`` [Option]) : リスト表示を行うファイルリスト
* *register_basename_list* (``list(str)`` [Option]) : リスト表示で指定する登録ベース名リスト
* *mode* (``str`` [Option]) : リスト表示モード
* file, register_name, register_name_only, all から選択する。
* デフォルトでは all が選択される。
* *status* (``str`` [Option]) : リスト表示において指定するステータス
* normal, updated, deleted, noMetadata, ready, revised から選択する。
* *type* (``str`` [Option]) : リスト表示において指定するファイルのタイプ
* file, directory から選択する。
* *flag_full_path* (``bool`` [Option]): 結果でfull path表示を行うかどうかのフラグ
* デフォルトでは False
* *flag_recursive* (``bool`` [Option]): recursive検索するかどううかのフラグ
* デフォルトではFalse
:return:
A dict of mapping keys.
"""
directory = v.get("directory", None)
file_list = v.get("file_list", [])
register_basename_list = []
if "register_basename_list" in v:
register_basename_list = v["register_basename_list"]
mode = v.get("mode", None)
status_val = v.get("status", None)
type_val = v.get("type", None)
flag_full_path = v.get("flag_full_path", False)
flag_recursive = v.get("flag_recursive", False)
repository_directory = self.__repository_directory
if directory is not None:
repository_directory += "/" + directory
manage = self.Manage(repository_directory=repository_directory)
vDict = manage.infoDict(file_list=file_list,
register_basename_list=register_basename_list,
mode=mode,
status_val=status_val,
type_val=type_val,
flag_full_path=flag_full_path,
flag_recursive=flag_recursive)
return vDict
[docs] def upload(self, **v):
"""
Upload files
:param \**v:
See below.
:Keyword Arguments:
* *register_basename* (``str``) : 登録ベース名
* *flag_not_upload* (``bool`` [Option]) : 指定ファイルをアップロードするかどうかのフラグ
* デフォルト値は False
* *flag_each* (``bool`` [Option]) : 指定ファイルをeach アップロードするかどうかのフラグ
* デフォルト値は False (複数ファイルがあった場合にはallアップロードを行う。)
:return:
A dict of mapping keys.
"""
register_basename = v.get("register_basename", None)
flag_not_upload = v.get("flag_not_upload", False)
flag_each = v.get("flag_each", False)
repo = rest.Repository()
repo.authorize(self.__access_token, debug=False)
register_name = self.register_name(register_basename)
# check metadata in file and db
file_file_dict = self.metadata_file_from_file(register_basename)
file_db_dict = self.metadata_file_from_db(register_basename)
file_metadata_updated_list = []
for f in file_file_dict:
if f not in file_db_dict:
continue
file_db_dict_ref = copy.copy(file_db_dict[f])
if "time" in file_db_dict_ref:
if "time" not in file_file_dict[f]:
del file_db_dict_ref["time"]
if file_file_dict[f] != file_db_dict_ref:
file_metadata_updated_list.append(f)
manage = self.Manage()
vDict = manage.infoDict(register_basename_list=[register_basename])
info_file_list = vDict["file_list"]
file_list = []
if len(info_file_list) > 0:
for v in info_file_list:
fname = v["name"]
fname_repository = "%s/%s" % (
self.__repository_directory, fname)
if (v["type"] == "file" and v["status"] in ["ready", "updated"]) or \
(fname_repository in file_metadata_updated_list):
file_list.append(fname_repository)
else:
# this is necessary if index is not ready just after db creation
file_list = list(file_file_dict.keys())
file_list.sort()
key_list = ["format", "caption", "time"]
file_dict = collections.OrderedDict()
for f in file_list:
file_dict[f] = collections.OrderedDict()
if f in file_file_dict:
for key in file_file_dict[f]:
if key in key_list:
file_dict[f][key] = file_file_dict[f][key]
ret_dict = collections.OrderedDict()
error_list = []
file_list = []
if flag_not_upload in [False, 0]:
repo.set_local_storage_path(self.__config_value.local_storage_path)
repo.set_facility(self.__config_value.facility)
repo.set_class_name(self.__config_value.class_name)
repo.set_disk_name(self.__config_value.disk_name)
if flag_each is False:
v = {}
v["register_name"] = register_name
v["files_encode"] = util.encode_json(file_dict)
ret_dict = repo.register.files.post(**v)
if "file_list" in ret_dict:
file_list.extend(ret_dict["file_list"])
if "error" in ret_dict:
error_list.extend(ret_dict["error"]["errors"])
else:
for f in file_dict:
file_each_dict = {}
file_each_dict[f] = file_dict[f]
v = {}
v["register_name"] = register_name
v["files_encode"] = util.encode_json(file_each_dict)
ret_dict = repo.register.files.post(**v)
if "file_list" in ret_dict:
file_list.extend(ret_dict["file_list"])
if "error" in ret_dict:
error_list.extend(ret_dict["error"]["errors"])
ret_dict["flag_not_upload"] = flag_not_upload
ret_dict["flag_each"] = flag_each
ret_dict["file_list"] = file_list
if len(error_list) > 0:
if "error" in ret_dict:
ret_dict["error"]["errors"].extend(error_list)
else:
ret_dict["error"] = {}
ret_dict["error"]["message"] = error_list[-1]["message"]
ret_dict["error"]["errors"] = error_list
else:
file_list = []
for f in file_dict:
v = {}
v["name"] = f
v.update(file_dict[f])
file_list.append(v)
ret_dict = {}
ret_dict["flag_not_upload"] = flag_not_upload
ret_dict["flag_each"] = flag_each
ret_dict["file_list"] = file_list
return ret_dict
[docs] def upload_with_ftp(self, **v):
"""
Upload files
:param \**v:
See below.
:Keyword Arguments:
* *register_basename* (``str``) : 登録ベース名
* *flag_not_upload* (``bool`` [Option]) : 指定ファイルをアップロードするかどうかのフラグ
* デフォルト値は False
* *flag_each* (``bool`` [Option]) : 指定ファイルをeach アップロードするかどうかのフラグ
* デフォルト値は False (複数ファイルがあった場合にはallアップロードを行う。)
:return:
A dict of mapping keys.
"""
register_basename = v.get("register_basename", None)
flag_not_upload = v.get("flag_not_upload", False)
flag_each = v.get("flag_each", False)
repo = rest.Repository()
repo.authorize(self.__access_token, debug=False)
register_name = self.register_name(register_basename)
# check metadata in file and db
file_file_dict = self.metadata_file_from_file(register_basename)
file_db_dict = self.metadata_file_from_db(register_basename)
file_metadata_updated_list = []
for f in file_file_dict:
if f not in file_db_dict:
continue
file_db_dict_ref = copy.copy(file_db_dict[f])
if "time" in file_db_dict_ref:
if "time" not in file_file_dict[f]:
del file_db_dict_ref["time"]
if file_file_dict[f] != file_db_dict_ref:
file_metadata_updated_list.append(f)
manage = self.Manage()
vDict = manage.infoDict(register_basename_list=[register_basename])
info_file_list = vDict["file_list"]
file_list = []
if len(info_file_list) > 0:
for v in info_file_list:
fname = v["name"]
fname_repository = "%s/%s" % (
self.__repository_directory, fname)
if (v["type"] == "file" and v["status"] in ["ready", "updated"]) or \
(fname_repository in file_metadata_updated_list):
file_list.append(fname_repository)
else:
# this is necessary if index is not ready just after db creation
file_list = list(file_file_dict.keys())
file_list.sort()
key_list = ["format", "caption", "time"]
file_dict = collections.OrderedDict()
for f in file_list:
file_dict[f] = collections.OrderedDict()
if f in file_file_dict:
for key in file_file_dict[f]:
if key in key_list:
file_dict[f][key] = file_file_dict[f][key]
ret_dict = collections.OrderedDict()
error_list = []
file_list = []
if flag_not_upload in [False, 0]:
repo.set_local_storage_path(self.__config_value.local_storage_path)
repo.set_facility(self.__config_value.facility)
repo.set_class_name(self.__config_value.class_name)
repo.set_disk_name(self.__config_value.disk_name)
if flag_each is False:
v = {}
v["register_name"] = register_name
v["files_encode"] = util.encode_json(file_dict)
ret_dict = repo.register.files.ftp(**v)
if "file_list" in ret_dict:
file_list.extend(ret_dict["file_list"])
if "error" in ret_dict:
error_list.extend(ret_dict["error"]["errors"])
else:
for f in file_dict:
file_each_dict = {}
file_each_dict[f] = file_dict[f]
v = {}
v["register_name"] = register_name
v["files_encode"] = util.encode_json(file_each_dict)
ret_dict = repo.register.files.post(**v)
if "file_list" in ret_dict:
file_list.extend(ret_dict["file_list"])
if "error" in ret_dict:
error_list.extend(ret_dict["error"]["errors"])
ret_dict["flag_not_upload"] = flag_not_upload
ret_dict["flag_each"] = flag_each
ret_dict["file_list"] = file_list
if len(error_list) > 0:
if "error" in ret_dict:
ret_dict["error"]["errors"].extend(error_list)
else:
ret_dict["error"] = {}
ret_dict["error"]["message"] = error_list[-1]["message"]
ret_dict["error"]["errors"] = error_list
else:
file_list = []
for f in file_dict:
v = {}
v["name"] = f
v.update(file_dict[f])
file_list.append(v)
ret_dict = {}
ret_dict["flag_not_upload"] = flag_not_upload
ret_dict["flag_each"] = flag_each
ret_dict["file_list"] = file_list
return ret_dict
[docs] def reset_upload(self, **v):
"""
reset uploaded data for registration
:param \**v:
See below.
:Keyword Arguments:
* *register_basename* (``str``) : 登録ベース名
:return:
A dict of mapping keys.
"""
register_basename = v.get("register_basename", None)
repo = rest.Repository()
repo.authorize(self.__access_token, debug=False)
# ... get register_name from register_basename
register_name = self.register_name(register_basename)
v = {}
v["register_name"] = register_name
ret_dict = repo.register.reset.post(**v)
ret_dict["register_name"] = register_name
return ret_dict
[docs] def register(self, **v):
"""
Register metadata
:param \**v:
See below.
:Keyword Arguments:
* *register_basename* (``str``) : 登録ベース名
:return:
A dict of mapping keys.
"""
register_basename = v.get("register_basename", None)
repo = rest.Repository()
repo.authorize(self.__access_token, debug=False)
# ... get register_name from register_basename
register_name = self.register_name(register_basename)
file_list = []
manage = self.Manage()
flag_new = True
# ... check register_name is already registered or not
vDict = manage.infoDict()
info_file_list = vDict["file_list"]
for f in info_file_list:
if f["register_name"] == register_name:
flag_new = False
break
# ... get metadata from file (file list is excluded)
metadata_dict = self.metadata_from_file(register_basename)
if config.metadata_key_directory_list in metadata_dict:
del metadata_dict[config.metadata_key_directory_list]
v = {}
v["mode"] = "replace"
if flag_new is False:
v["mode"] = "update"
v["metadata_encode"] = util.encode_json(metadata_dict)
ret_dict = repo.register.metadata.post(**v)
ret_dict["mode"] = v["mode"]
ret_dict["register_name"] = register_name
return ret_dict
[docs] def restore(self, **v):
"""
Restore data
:param \**v:
See below.
* *register_basename* (``str``) : 登録ベース名
* *disable_hash* (``int`` [Option]) : disable hash check for downloaded file
:return:
A dict of mapping keys.
"""
vdict = collections.OrderedDict()
register_basename = v.get("register_basename", None)
disable_hash = v.get("disable_hash", False)
manage = self.Manage()
v = {}
v["register_basename"] = register_basename
vdict = manage.restore(register_basename,
disable_hash=disable_hash)
return vdict
[docs] def metadata(self, **v):
"""
Get metadata
:param \**v:
See below.
:Keyword Arguments:
* *register_basename* (``str``) : 登録ベース名
* *mode* (``str`` [Option]) : モード
* db、またはfile から選択する。
* デフォルト値は db
:return:
A dict of mapping keys.
"""
register_basename = v.get("register_basename", None)
mode = v.get("mode", "db")
pattern = v.get("pattern", "all")
register_name = self.register_name(register_basename)
repo = rest.Repository()
repo.authorize(self.__access_token, debug=False)
v = {}
v["register_name"] = register_name
v["pattern"] = pattern
ret_dict = collections.OrderedDict()
ret_dict["mode"] = mode
ret_dict["metadata"] = {}
if mode == "db":
v_dict = repo.data.metadata.post(**v)
if ("metadata_list" in v_dict) and len(v_dict["metadata_list"]) > 0:
ret_dict["metadata"] = v_dict["metadata_list"][0]
if "error" in v_dict:
ret_dict["error"] = v_dict["error"]
elif mode == "file":
metadata_dict, count_contact_name = self.metadata_from_file(register_basename, flag_count_contact_name=True)
ret_dict["metadata"] = metadata_dict
ret_dict["create_meta_system"] = True
if config.metadata_key_version_data_register not in metadata_dict:
ret_dict["create_meta_system"] = False
elif metadata_dict[config.metadata_key_version_data_register] != \
config.parameter[config.metadata_key_version_data_register]:
ret_dict["create_meta_system"] = False
elif config.metadata_key_register_name in metadata_dict and \
metadata_dict[config.metadata_key_register_name] != register_name:
ret_dict["create_meta_system"] = False
elif count_contact_name:
ret_dict["create_meta_system"] = False
return ret_dict
[docs] def remove(self, **v):
"""
Remove data
:param \**v:
See below.
:Keyword Arguments:
* *register_basename* (``str``) : 登録ベース名
:return:
A dict of mapping keys.
"""
register_basename = v.get("register_basename", None)
manage = self.Manage()
vDict = manage.infoDict(register_basename_list=[register_basename])
info_file_list = vDict["file_list"]
file_list = []
local_storage_directory_list = []
for v in info_file_list:
if v["type"] == "file" and v["status"] == "deleted":
file_list.append("%s/%s" %
(self.__repository_directory, v["name"]))
elif v["type"] == "directory" and v["status"] == "noMetadata":
local_storage_directory_list.append(
"%s/%s" % (self.__repository_directory, v["name"]))
ret_dict = collections.OrderedDict()
if True:
repo = rest.Repository()
repo.authorize(self.__access_token, debug=False)
v = {}
v["file_list"] = file_list
ret_dict_tmp = repo.delete.post(**v)
local_storage_directory_removed_list = []
for d in local_storage_directory_list:
d_storage = d[len(self.__repository_directory) + 1:]
try:
local_storage_directory_removed_list.append(d)
os.rmdir(d_storage)
except:
pass
ret_dict["directory_local_storage_removed_list"] = local_storage_directory_removed_list
for key in ret_dict_tmp:
if key != "register_name_list":
ret_dict[key] = ret_dict_tmp[key]
return ret_dict
[docs] def delete(self, **v):
"""
Delete Data
:param \**v:
See below.
:Keyword Arguments:
* *register_basename* (``str``) : 登録ベース名
* *flag_delete_local_storage_files* (``bool`` [Option]) : local storage ファイル deleteのフラグ
* デフォルト値は False
:return:
A dict of mapping keys.
"""
register_basename = v.get("register_basename", None)
flag_delete_local_storage_files = v.get(
"flag_delete_local_storage_files", False)
register_name = self.register_name(register_basename)
manage = self.Manage()
vDict = manage.infoDict(register_basename_list=[register_basename])
file_list = []
info_file_list = vDict["file_list"]
for f in info_file_list:
if f["type"] != "file":
continue
v = {}
v["name"] = f["name"]
file_list.append(v)
repo = rest.Repository()
repo.authorize(self.__access_token, debug=False)
v = {}
if register_name is not None:
v["register_name_list"] = [register_name]
ret_dict = collections.OrderedDict()
if True:
if flag_delete_local_storage_files:
for f in info_file_list:
if f["type"] != "file":
continue
os.remove(f["name"])
for f in info_file_list:
if f["type"] != "directory":
continue
try:
shutil.rmtree(f["name"])
except:
pass
ret_dict = repo.delete.post(**v)
ret_dict["flag_delete_local_storage_files"] = flag_delete_local_storage_files
ret_dict["register_name"] = register_name
if "register_name_list" in ret_dict:
del ret_dict["register_name_list"]
return ret_dict
# ... utilities
[docs] def Manage(self, local_storage_info_file=None,
config_value=None,
repository_directory=None,
access_token=None):
if local_storage_info_file is None:
local_storage_info_file = self.__local_storage_info_file
if config_value is None:
config_value = self.__config_value
if repository_directory is None:
repository_directory = self.__repository_directory
if access_token is None:
access_token = self.__access_token
return local_storage_util.Manage(local_storage_info_file=local_storage_info_file,
config_value=config_value,
repository_directory=repository_directory,
access_token=access_token,
debug=self.__debug)
[docs] def config_value(self):
configFile = local_storage_util.ConfigFile(
self.__local_storage_config_file)
config_val = configFile.load()
return config_val
[docs] def register_name(self, register_basename):
return util.register_name(register_basename, self.__repository_directory)
[docs] def authorize(self, v, debug=None):
if debug is None:
debug = True
else:
debug = self.__debug
self.set_access_token(v)
if debug:
util.log("[LocalStorage::authorize] access_token = {}".format(v))
[docs] def list_config(self):
"""
List ConfigFile for local storage settings
:return:
"""
configFile = local_storage_util.ConfigFile(
self.__local_storage_config_file)
configFile.list()
[docs] def set_config(self, **v):
"""
Set ConfigFile for local storage settings
:param \**v:
See below.
:Keyword Arguments:
* *key* (``str``) : キー名
* *value* (``str``): 値
:return:
"""
key = v.get("key", None)
value = v.get("value", None)
configFile = local_storage_util.ConfigFile(
self.__local_storage_config_file)
configFile.set(key, value, debug=self.__debug)
[docs] def reset_config(self):
"""
Reset ConfigFile for local storage settings
:return:
"""
configFile = local_storage_util.ConfigFile(
self.__local_storage_config_file)
configFile.reset()
[docs] def proposal_number(self):
proposalNumberFile = local_storage_util.ProposalNumberFile(
self.__proposal_number_file)
return proposalNumberFile.proposal_number()
[docs] def list_proposal_number(self):
"""
List proposal_number
:return:
"""
proposalNumberFile = local_storage_util.ProposalNumberFile(
self.__proposal_number_file)
proposalNumberFile.list()
[docs] def set_proposal_number(self, **v):
"""
Set proposal_number
:param \**v:
See below.
:Keyword Arguments:
* *proposal_number* (``str``) : 課題情報番号
:return:
"""
value = v.get("proposal_number", None)
repo = rest.Repository()
repo.authorize(self.__access_token, debug=False)
ret_dict = repo.proposal.post(**v)
vlist = []
if "list" in ret_dict:
vlist = ret_dict["list"]
if value not in vlist:
message = "proposal_number(=%s) is not valid" % value
raise util.Error(message, domain=util.error_domain(
__file__, sys._getframe(), self.__class__))
proposalNumberFile = local_storage_util.ProposalNumberFile(
self.__proposal_number_file, debug=self.__debug)
proposalNumberFile.set_proposal_number(value)
[docs] def clear_proposal_number(self):
"""
Clear proposal_number
:return:
"""
proposalNumberFile = local_storage_util.ProposalNumberFile(
self.__proposal_number_file)
proposalNumberFile.clear()
[docs] def set_current_directory(self, v):
self.__current_directory = v
self.__repository_directory = util.repository_directory(self.__config_value,
current_directory=self.__current_directory)
[docs] def metadata_from_file(self, register_basename, flag_dataset=True,
flag_count_contact_name=False):
manage = self.Manage()
metadata_basename = "%s.json" % register_basename
flag_metadata_basename = False
file_list = []
vDict = manage.infoDict()
register_basename_escape = re.escape(register_basename)
info_file_list = vDict["file_list"]
for f in info_file_list:
if f["type"] != "file":
continue
if f["status"] == "deleted":
continue
fname = f["name"]
str_match = "^%s\.(|[^/]+\.)%s$" % (register_basename_escape, "json")
m = re.match(str_match, fname)
if m:
if fname == metadata_basename:
flag_metadata_basename = True
else:
file_list.append(fname)
file_list.sort()
if flag_metadata_basename:
file_list.insert(0, metadata_basename)
metadata_dict = {}
count_contact_name = 0
fname_system = "{}/{}.system.json".format(self.__current_directory, register_basename)
for f in file_list:
fname = "{}/{}".format(self.__current_directory, f)
v = util.load_json(fname)
if (fname != fname_system) and (config.metadata_key_contact_name in v):
count_contact_name += 1
metadata_dict.update(v)
register_name = "%s/%s" % (self.__repository_directory,
register_basename)
facility = self.__config_value.facility
class_name = self.__config_value.class_name
disk_name = self.__config_value.disk_name
if config.metadata_key_register_name not in metadata_dict:
metadata_dict[config.metadata_key_register_name] = register_name
if config.metadata_key_facility not in metadata_dict:
metadata_dict[config.metadata_key_facility] = facility
metadata_dict[config.metadata_key_class_name] = class_name
metadata_dict[config.metadata_key_disk_name] = disk_name
# ... add measurement_directory_list
if flag_dataset:
vdict = self.metadata_files(register_basename)
metadata_dict.update(vdict)
if flag_count_contact_name is False:
return metadata_dict
else:
return metadata_dict, count_contact_name
[docs] def metadata_file_from_file(self, register_basename):
file_file_dict = {}
metadata_dict = self.metadata_files(register_basename)
key_list = ["format", "caption", "time"]
if config.metadata_key_directory_list in metadata_dict:
dlist = metadata_dict[config.metadata_key_directory_list]
for d in dlist:
flist = []
if "file_list" in d:
flist = d["file_list"]
for f in flist:
if "name" not in f:
continue
fname = f["name"]
file_file_dict[fname] = {}
for key in key_list:
if key in f:
file_file_dict[fname][key] = f[key]
return file_file_dict
[docs] def metadata_file_from_db(self, register_basename):
file_db_dict = {}
key_list = ["format", "caption", "time"]
register_name = self.register_name(register_basename)
repo = rest.Repository()
repo.authorize(self.__access_token, debug=False)
v = {}
v["register_name"] = register_name
v_dict = repo.data.metadata.post(**v)
try:
vlist_ref = v_dict["metadata_list"][0][config.metadata_key_directory_list]
except:
vlist_ref = []
for v in vlist_ref:
if "file_list" not in v:
continue
flist = v["file_list"]
for f in flist:
fname = f["name"]
file_db_dict[fname] = {}
for key in key_list:
if key in f:
file_db_dict[fname][key] = f[key]
return file_db_dict
[docs] def metadata_system(self, vdict, register_basename, proposal_number=None,
data_input = {}, count_contact_name=0):
vdict_input = copy.copy(vdict)
if proposal_number is None:
proposal_number = self.proposal_number()
parameter = config.parameter
register_name = "%s/%s" % (self.__repository_directory,
register_basename)
vdict[config.metadata_key_register_name] = register_name
vdict[config.metadata_key_facility] = self.__config_value.facility
vdict[config.metadata_key_class_name] = self.__config_value.class_name
vdict[config.metadata_key_disk_name] = self.__config_value.disk_name
if (config.metadata_key_proposal_number not in vdict) and \
(config.metadata_key_proposal_number not in data_input):
vdict[config.metadata_key_proposal_number] = proposal_number
repo = rest.Repository()
repo.authorize(self.__access_token, debug=False)
v_in = {}
v_in["proposal_number"] = proposal_number
ret_dict = repo.proposal.metadata.post(**v_in)
metadata = ret_dict.get("metadata", {})
found = ret_dict.get("found")
if found is False:
message = "no metadata"
if proposal_number is not None:
message += " for proposal_number=%s" % proposal_number
raise util.Error(message, domain=util.error_domain(
__file__, sys._getframe(), self.__class__))
for key in config.proposal_info_key_list:
if key in metadata:
vdict[key] = metadata[key]
# ... not output contact_name if external data exists
if count_contact_name == 0:
for key in metadata:
if key.startswith(config.metadata_key_contact_name_base):
vdict[key] = metadata[key]
else:
for key in vdict:
if key.startswith(config.metadata_key_contact_name_base):
del vdict[key]
# ... set system parameter (force registered)
for key in parameter:
vdict[key] = parameter[key]
# ... remove metadata key for subject_base
key_list = list(vdict.keys())
for key in key_list:
if key.startswith(config.metadata_key_subject_base):
del vdict[key]
return vdict
[docs] def metadata_files(self, register_basename):
register_basename_escape = re.escape(register_basename)
current_directory = self.__current_directory
if os.name == "nt":
current_directory = current_directory.replace("\\", "/")
# ... extract file_caption_list , file_format_list
fname_base = "%s/%s.%s" % (current_directory,
register_basename, "json")
vdict = collections.OrderedDict()
if os.path.exists(fname_base):
try:
vdict = util.load_json(fname_base)
except:
message = "illegal data format for %s" % fname_base
raise util.Error(message, domain=util.error_domain(
__file__, sys._getframe(), self.__class__))
file_caption_list = vdict.get(config.metadata_key_file_caption_list, [])
file_format_list = vdict.get(config.metadata_key_file_format_list, [])
# ...
register_name = self.register_name(register_basename)
parent_directory = os.path.dirname(register_name)
flist = []
files = os.listdir(current_directory)
# .. list files beloing to register_basename
for f in files:
fname = current_directory + "/" + f
if os.path.isfile(fname):
# str_match = "^%s\.(|[^/]+\.)[^/]+$" % (register_basename)
str_match = "^%s\.(|[^/]+\.)[^/]+$" % (register_basename_escape)
m = re.match(str_match, f)
if m:
fname = re.sub("^" + current_directory,
self.__repository_directory, fname)
flist.append(fname)
search_directory = current_directory + "/" + register_basename
for root, dirs, files in os.walk(search_directory):
for f in files:
fname = root + "/" + f
fname = re.sub("^" + current_directory,
self.__repository_directory, fname)
flist.append(fname)
flist = list(set(flist))
flist.sort()
caption_dict = {}
format_dict = {}
if True:
for f in flist:
for feach in file_caption_list:
fmatch = parent_directory + "/" + feach[0]
fcaption = feach[1]
if fnmatch.fnmatch(f, fmatch):
caption_dict[f] = fcaption
else:
benten_client.log("*ERR* invalid format in dataset_file_caption_list")
try:
for f in flist:
for ftest in file_format_list:
fmatch = parent_directory + "/" + ftest[0]
fformat = ftest[1]
if fnmatch.fnmatch(f, fmatch):
format_dict[f] = fformat
except:
benten_client.log("*ERR* invalid format in dataset_file_format_list")
d_dict = collections.OrderedDict()
for f in flist:
parent_directory = os.path.dirname(f)
if parent_directory not in d_dict:
d_dict[parent_directory] = []
d_dict[parent_directory].append(f)
o_dict = {}
o_dict[config.metadata_key_directory_list] = []
for d in d_dict:
v = collections.OrderedDict()
v["name"] = d
v["file_list"] = []
for f in d_dict[d]:
w = collections.OrderedDict()
w["name"] = f
if f in caption_dict:
w["caption"] = caption_dict[f]
if f in format_dict:
w["format"] = format_dict[f]
v["file_list"].append(w)
o_dict[config.metadata_key_directory_list].append(v)
return o_dict