# coding:utf-8
"""
REST API for registration
Copyright (C) 2020 JASRI All Rights Reserved.
"""
import json
#from requests_toolbelt import MultipartEncoder
from requests_toolbelt.multipart.encoder import MultipartEncoderMonitor
import requests
import os
import codecs
import io
import uuid
import socket
import time
try:
    from StringIO import StringIO  # for Python 2
except ImportError:
    from io import StringIO  # for Python 3
try:
    from BytesIO import BytesIO  # for Python 2
except ImportError:
    from io import BytesIO  # for Python 3
import hashlib
import ftplib
from ftplib import FTP_TLS
FTP_TLS.encoding = "utf-8"
from . import config
from . import util
try:
    from . import rest
except ImportError:
    import rest
g_stored_bytes_read = 0
class Main():
    def __init__(self, parent):
        self.__parent = parent
        self.files = Files(self)
        self.check = Check(self)
        self.reset = Reset(self)
        self.metadata = Metadata(self)

    def endpoint(self, path):
        return self.__parent.endpoint(path)

    def access_token(self):
        return self.__parent.access_token()

    def local_storage_path(self):
        return self.__parent.local_storage_path()

    def facility(self):
        return self.__parent.facility()

    def class_name(self):
        return self.__parent.class_name()

    def disk_name(self):
        return self.__parent.disk_name()
class Files():  # post files with streaming mode
    def __init__(self, parent):
        self.__parent = parent
        self.__endpoint = parent.endpoint(config.register_files_path)

    def post(self, **v):
        """
        REST API: file upload

        * Endpoint : [agent-url]/benten/v1/register/files
        * Method : POST
        * Response : JSON
        * Authorization : required

        :param \**v:
            See below.

        :Keyword Arguments:
            * *register_name* (``str``) : registration name
            * *files_encode* (``str``) : information on the files to register
                * must be JSON-encoded

        :return:
            A dict of mapping keys.

        :Keyword:
            * *file_list* (``list(dict)``) : list of files accepted for registration
            * *error* (``dict``) : error information (added only when an error occurs)
        """
        global g_stored_bytes_read

        def upload_monitor(monitor):
            global g_stored_bytes_read
            threshold_size = 1048576 * 100  # 100 MB
            bytes_read = monitor.bytes_read
            diff = bytes_read - g_stored_bytes_read
            if diff > threshold_size:
                g_stored_bytes_read = bytes_read
                total = int(monitor.len / 1048576)
                rate = int(100. * (float(bytes_read) / float(monitor.len)))
                util.log("[upload_monitor] total={} MB, {}% uploaded".format(
                    total, rate), flush=True)

        vdata = {}
        for key in ["register_name", "files_encode"]:
            vdata[key] = v[key]
        local_storage_path = self.__parent.local_storage_path()
        facility = self.__parent.facility()
        class_name = self.__parent.class_name()
        disk_name = self.__parent.disk_name()
        repository_top_dir = "/%s/%s/%s" % (facility, class_name, disk_name)
        files_input = util.decode_json(vdata["files_encode"])
        files_input = util.add_info_file_dict(
            files_input, local_storage_path, facility, class_name, disk_name)
        vdata["files_encode"] = util.encode_json(files_input)
        # ... adjust file times to the registered time in the metadata
        #     (a filename must not be one of ["register_name", "files_encode"])
        file_list = []
        for f in files_input:
            fname_storage = "%s/%s" % (local_storage_path,
                                       f[len(repository_top_dir) + 1:])
            str_utime = files_input[f]["time"]
            mtime = util.mktime(str_utime)
            atime = os.stat(fname_storage).st_mtime
            # os.utime(fname_storage, times=(atime, mtime))  # <- does not work with Python 2.7
            os.utime(fname_storage, (atime, mtime))
            fdata = codecs.open(fname_storage, "rb")
            vdata[f] = (f, fdata, "application/octet-stream")
            file_list.append(f)
        # m = MultipartEncoder(fields=vdata)
        g_stored_bytes_read = 0
        threshold_size = 1048576 * 100  # 100 MB
        m = MultipartEncoderMonitor.from_fields(fields=vdata,
                                                callback=upload_monitor)
        headers = util.headers_authorization(self.__parent.access_token())
        headers["Content-Type"] = m.content_type
        headers["Content-Length"] = str(m.len)
        total_size = m.len
        if m.len > config.max_register_file_length:
            util.log("**WARN** uploaded file size ({}) exceeds the limit ({})".format(
                m.len, config.max_register_file_length))
        if m.len > threshold_size:
            util.log("[upload_monitor] for file_list={}".format(
                file_list), flush=True)
        ret = requests.post(self.__endpoint, data=m, verify=False,
                            headers=headers)
        # fdata.close()
        return util.json_response(ret)
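        # A minimal usage sketch for the streaming upload above (illustrative only;
        # it assumes a client object ``repo`` that exposes this class as
        # ``repo.register.files``, and the repository path, timestamp and size
        # below are made up):
        #
        #   files_info = {
        #       "/facility/class/disk/run001/data.h5": {
        #           "time": "2020-01-01 00:00:00", "size": 1048576,
        #       },
        #   }
        #   reply = repo.register.files.post(
        #       register_name="run001",
        #       files_encode=util.encode_json(files_info))
        #   if "error" in reply:
        #       util.log(reply["error"]["message"])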
    def ftp(self, **v):
        """
        ftp: file upload over FTPS
        """
        t0_begin = time.time()
        reply = {}
        register_name = v.get("register_name")
        files_encode = v.get("files_encode")
        # ...
        local_storage_path = self.__parent.local_storage_path()
        facility = self.__parent.facility()
        class_name = self.__parent.class_name()
        disk_name = self.__parent.disk_name()
        repository_top_dir = "/%s/%s/%s" % (facility, class_name, disk_name)
        meta_files = util.decode_json(files_encode)
        meta_files = util.add_info_file_dict(
            meta_files, local_storage_path, facility, class_name, disk_name)
        # ... adjust file times to the registered time in the metadata
        #     (a filename must not be one of ["register_name", "files_encode"])
        for f in meta_files:
            filename_storage = "{}/{}".format(local_storage_path,
                                              f[len(repository_top_dir) + 1:])
            str_utime = meta_files[f]["time"]
            mtime = util.mktime(str_utime)
            atime = os.stat(filename_storage).st_mtime
            # os.utime(filename_storage, times=(atime, mtime))  # <- does not work with Python 2.7
            os.utime(filename_storage, (atime, mtime))
        t0_end = time.time()
        t1_begin = time.time()
        # ... upload via ftp
        #
        repo = rest.Repository()
        repo.authorize(self.__parent.access_token())
        ret_dict = repo.auth.userinfo.post()
        user = ret_dict.get("username")
        passwd = self.__parent.access_token()
        host = util.benten_uploader_host()
        port = util.benten_uploader_port()
        hash_register_name = hashlib.md5(register_name.encode("utf-8")).hexdigest()
        ftp = FTP_TLS()
        ftp.connect(host, port)
        ftp.login(user, passwd)
        if util.benten_uploader_secure_data():
            ftp.prot_p()  # option for data security
        # optimize socket parameters for the upload task
        ftp.sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE,
                            config.ftp_socket_SO_KEEPALIVE)
        ftp.sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL,
                            config.ftp_socket_TCP_KEEPINTVL)
        ftp.sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE,
                            config.ftp_socket_TCP_KEEPIDLE)
        try:
            ftp.mkd(hash_register_name)
        except ftplib.error_perm:
            pass
        meta_dict = {}
        try:
            meta_filename = "{}/metadata.json".format(hash_register_name)
            with StringIO() as f:
                ftp.retrlines("RETR {}".format(meta_filename), f.write)
                meta_dict = json.loads(f.getvalue())
        except Exception:
            pass
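        # For reference, ``meta_dict`` (and the metadata.json written back below)
        # maps each repository path to its file information plus the ``uuid_name``
        # assigned in the next loop; a hypothetical entry, with made-up values,
        # would look like:
        #
        #   {
        #       "/facility/class/disk/run001/data.h5": {
        #           "time": "2020-01-01 00:00:00",
        #           "size": 1048576,
        #           "uuid_name": "0f8fad5b-d9cb-469f-a165-70867728950e",
        #       },
        #   }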
        file_list = []
        filename_list = []
        # ... set uuid_name for files in meta_dict
        for f in meta_files:
            filename_list.append(f)
            v = meta_files[f]
            uuid_name = None
            if f in meta_dict:
                if "uuid_name" in meta_dict[f]:
                    uuid_name = meta_dict[f]["uuid_name"]
            if uuid_name is None:
                uuid_name = str(uuid.uuid4())
            v["uuid_name"] = uuid_name
            meta_dict[f] = v
        util.log("[upload_monitor(ftps)] for file_list={}".format(filename_list))
        for f in meta_files:
            v = meta_files[f]
            total_size = v["size"]
            unit = "B"
            if total_size >= 1024:
                total_size /= 1024.
                unit = "KB"
            if total_size >= 1024:
                total_size /= 1024.
                unit = "MB"
            if total_size >= 1024:
                total_size /= 1024.
                unit = "GB"
            util.log("[upload_monitor(ftps)] {} ({} {})".format(f, total_size, unit))
            filename_storage = "{}/{}".format(local_storage_path,
                                              f[len(repository_top_dir) + 1:])
            uuid_name = v["uuid_name"]
            with codecs.open(filename_storage, "rb") as fd:
                ftp.storbinary("STOR {}/{}".format(hash_register_name, uuid_name), fd)
            v["name"] = f
            file_list.append(v)
        with BytesIO() as fd:
            fd.write(util.str_json(meta_dict).encode("utf-8"))
            fd.seek(0)
            ftp.storbinary("STOR {}".format(meta_filename), fd)
        ftp.close()
        t1_end = time.time()
        t2_begin = time.time()
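        # The verification step below batches files per check request: a batch is
        # flushed once it reaches 100 files, 1 GB of accumulated size, or the end
        # of the list. As a worked example (hypothetical numbers), 250 files of
        # 10 MB each would be verified in three requests of 100 + 100 + 50 files.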
        file_error_list = []
        count = 0
        size = 0
        count_total = 0
        count_threshold = 100
        count_max = len(meta_files)
        size_threshold = 1024 * 1024 * 1024  # 1 GB
        each_dict = {}
        for f in meta_files:
            meta_value = meta_files[f]
            each_dict[f] = meta_value
            count += 1
            count_total += 1
            size += meta_value.get("size", 0)
            if (size < size_threshold) and \
               (count < count_threshold) and \
               (count_total < count_max):
                continue
            v = {}
            v["register_name"] = register_name
            v["files_encode"] = util.encode_json(each_dict)
            util.log("[verification check] {}".format(list(each_dict.keys())))
            ret = repo.register.check.post(**v)
            if "error" in ret:
                message = ret["error"]["message"]
                util.log("--> Error: {}".format(message))
                file_error_list_each = ret.get("file_error_list", [])
                file_error_list.extend(file_error_list_each)
            else:
                util.log("--> OK")
            # ... reset values
            each_dict = {}
            count = 0
            size = 0
        if len(file_error_list) > 0:
            reply["error"] = {}
            reply["error"]["message"] = "error in file: {}".format(file_error_list)
            reply["file_error_list"] = file_error_list
        t2_end = time.time()
        util.log("[upload_monitor(elapsed time)] all = {:.2f}s".format(t2_end - t0_begin))
        util.log("[upload_monitor(elapsed time)] init = {:.2f}s".format(t0_end - t0_begin))
        util.log("[upload_monitor(elapsed time)] ftps = {:.2f}s".format(t1_end - t1_begin))
        util.log("[upload_monitor(elapsed time)] verification = {:.2f}s".format(t2_end - t2_begin))
        reply["file_list"] = file_list
        return reply
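        # A minimal usage sketch for the FTPS path above (illustrative only; the
        # same assumptions as for ``post`` apply, and ``util.benten_uploader_host``
        # / ``util.benten_uploader_port`` must point at a reachable uploader):
        #
        #   reply = repo.register.files.ftp(
        #       register_name="run001",
        #       files_encode=util.encode_json(files_info))
        #   for entry in reply.get("file_list", []):
        #       util.log("{} -> {}".format(entry["name"], entry["uuid_name"]))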
class Files20190215():  # post files as-is (cannot upload a file larger than 2 GB)
    def __init__(self, parent):
        self.__parent = parent
        self.__endpoint = parent.endpoint(config.register_files_path)

    def post(self, **v):
        """
        REST API: file upload

        * Endpoint : [agent-url]/benten/v1/register/files
        * Method : POST
        * Response : JSON
        * Authorization : required

        :param \**v:
            See below.

        :Keyword Arguments:
            * *register_name* (``str``) : registration name
            * *files_encode* (``str``) : information on the files to register
                * must be JSON-encoded

        :return:
            A dict of mapping keys.

        :Keyword:
            * *file_list* (``list(dict)``) : list of files accepted for registration
            * *error* (``dict``) : error information (added only when an error occurs)
        """
        vdata = {}
        for key in ["register_name", "files_encode"]:
            vdata[key] = v[key]
        local_storage_path = self.__parent.local_storage_path()
        facility = self.__parent.facility()
        class_name = self.__parent.class_name()
        disk_name = self.__parent.disk_name()
        repository_top_dir = "/%s/%s/%s" % (facility, class_name, disk_name)
        files_input = util.decode_json(vdata["files_encode"])
        files_input = util.add_info_file_dict(
            files_input, local_storage_path, facility, class_name, disk_name)
        vdata["files_encode"] = util.encode_json(files_input)
        files = {}
        # ... adjust file times to the registered time in the metadata
        for f in files_input:
            fname_storage = "%s/%s" % (local_storage_path,
                                       f[len(repository_top_dir) + 1:])
            str_utime = files_input[f]["time"]
            mtime = util.mktime(str_utime)
            atime = os.stat(fname_storage).st_mtime
            # os.utime(fname_storage, times=(atime, mtime))  # <- does not work with Python 2.7
            os.utime(fname_storage, (atime, mtime))
            fdata = codecs.open(fname_storage, "rb")
            files[f] = (f, fdata, "application/octet-stream")
        ret = requests.post(self.__endpoint, vdata, files=files, verify=False,
                            headers=util.headers_authorization(self.__parent.access_token()))
        # fdata.close()
        return util.json_response(ret)
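        # Note: this legacy variant hands the open file handles to ``requests``
        # via ``files=``, so the whole multipart body is assembled in memory
        # before sending, which is why uploads beyond roughly 2 GB fail. The
        # ``Files`` class above avoids this by streaming the body through a
        # monitor, along the lines of:
        #
        #   m = MultipartEncoderMonitor.from_fields(fields=vdata, callback=...)
        #   requests.post(endpoint, data=m, headers={"Content-Type": m.content_type})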
class Check():  # check the hash of uploaded data
    def __init__(self, parent):
        self.__parent = parent
        self.__endpoint = parent.endpoint(config.register_check_path)

    def post(self, **v):
        """
        REST API: file check

        * Endpoint : [agent-url]/benten/v1/register/check
        * Method : POST
        * Response : JSON
        * Authorization : required

        :param \**v:
            See below.

        :Keyword Arguments:
            * *register_name* (``str``) : registration name
            * *files_encode* (``str``) : information on the files to register
                * must be JSON-encoded

        :return:
            A dict of mapping keys.

        :Keyword:
            * *file_list* (``list(dict)``) : list of files accepted for registration
            * *error* (``dict``) : error information (added only when an error occurs)
        """
        vdata = {}
        for key in ["register_name", "files_encode"]:
            vdata[key] = v[key]
        ret = requests.post(self.__endpoint, vdata, verify=False,
                            headers=util.headers_authorization(self.__parent.access_token()))
        return util.json_response(ret)
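        # A minimal usage sketch (illustrative only; assumes the same ``repo``
        # client as above, where this class is reached as ``repo.register.check``,
        # mirroring the call made from ``Files.ftp``):
        #
        #   ret = repo.register.check.post(
        #       register_name="run001",
        #       files_encode=util.encode_json(files_info))
        #   if "error" in ret:
        #       util.log(ret["error"]["message"])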
class Reset():
    def __init__(self, parent):
        self.__parent = parent
        self.__endpoint = parent.endpoint(config.register_reset_path)

    def post(self, **v):
        """
        REST API: reset uploaded files

        * Endpoint : [agent-url]/benten/v1/register/reset
        * Method : POST
        * Response : JSON
        * Authorization : required

        :param \**v:
            See below.

        :Keyword Arguments:
            * *register_name* (``str``) : registration name

        :return:
            A dict of mapping keys.

        :Keyword:
            * *result* (``str``) : status of the operation
                * ``terminated`` is added only on success
            * *error* (``dict``) : error information (added only when an error occurs)
        """
        vdata = {}
        for key in ["register_name"]:
            vdata[key] = v[key]
        ret = requests.post(self.__endpoint, vdata, verify=False,
                            headers=util.headers_authorization(self.__parent.access_token()))
        return util.json_response(ret)
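        # A minimal usage sketch (illustrative only; assumes the same ``repo``
        # client, with this class reachable as ``repo.register.reset`` by analogy
        # with ``repo.register.check`` used above):
        #
        #   ret = repo.register.reset.post(register_name="run001")
        #   if ret.get("result") == "terminated":
        #       util.log("registration 'run001' was reset")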