mirror of https://github.com/acidanthera/audk.git
194 lines
6.3 KiB
Python
194 lines
6.3 KiB
Python
## @file
|
|
# This file hooks file and directory creation and removal
|
|
#
|
|
# Copyright (c) 2014 - 2018, Intel Corporation. All rights reserved.<BR>
|
|
#
|
|
# SPDX-License-Identifier: BSD-2-Clause-Patent
|
|
#
|
|
|
|
'''
|
|
File hook
|
|
'''
|
|
|
|
import os
|
|
import stat
|
|
import time
|
|
import zipfile
|
|
from time import sleep
|
|
from Library import GlobalData
|
|
|
|
__built_in_remove__ = os.remove
|
|
__built_in_mkdir__ = os.mkdir
|
|
__built_in_rmdir__ = os.rmdir
|
|
__built_in_chmod__ = os.chmod
|
|
__built_in_open__ = open
|
|
|
|
_RMFILE = 0
|
|
_MKFILE = 1
|
|
_RMDIR = 2
|
|
_MKDIR = 3
|
|
_CHMOD = 4
|
|
|
|
gBACKUPFILE = 'file.backup'
|
|
gEXCEPTION_LIST = ['Conf'+os.sep+'DistributionPackageDatabase.db', '.tmp', gBACKUPFILE]
|
|
|
|
class _PathInfo:
|
|
def __init__(self, action, path, mode=-1):
|
|
self.action = action
|
|
self.path = path
|
|
self.mode = mode
|
|
|
|
class RecoverMgr:
|
|
def __init__(self, workspace):
|
|
self.rlist = []
|
|
self.zip = None
|
|
self.workspace = os.path.normpath(workspace)
|
|
self.backupfile = gBACKUPFILE
|
|
self.zipfile = os.path.join(self.workspace, gBACKUPFILE)
|
|
|
|
def _createzip(self):
|
|
if self.zip:
|
|
return
|
|
self.zip = zipfile.ZipFile(self.zipfile, 'w', zipfile.ZIP_DEFLATED)
|
|
|
|
def _save(self, tmp, path):
|
|
if not self._tryhook(path):
|
|
return
|
|
self.rlist.append(_PathInfo(tmp, path))
|
|
|
|
def bkrmfile(self, path):
|
|
arc = self._tryhook(path)
|
|
if arc and os.path.isfile(path):
|
|
self._createzip()
|
|
self.zip.write(path, arc.encode('utf_8'))
|
|
sta = os.stat(path)
|
|
oldmode = stat.S_IMODE(sta.st_mode)
|
|
self.rlist.append(_PathInfo(_CHMOD, path, oldmode))
|
|
self.rlist.append(_PathInfo(_RMFILE, path))
|
|
__built_in_remove__(path)
|
|
|
|
def bkmkfile(self, path, mode, bufsize):
|
|
if not os.path.exists(path):
|
|
self._save(_MKFILE, path)
|
|
return __built_in_open__(path, mode, bufsize)
|
|
|
|
def bkrmdir(self, path):
|
|
if os.path.exists(path):
|
|
sta = os.stat(path)
|
|
oldmode = stat.S_IMODE(sta.st_mode)
|
|
self.rlist.append(_PathInfo(_CHMOD, path, oldmode))
|
|
self._save(_RMDIR, path)
|
|
__built_in_rmdir__(path)
|
|
|
|
def bkmkdir(self, path, mode):
|
|
if not os.path.exists(path):
|
|
self._save(_MKDIR, path)
|
|
__built_in_mkdir__(path, mode)
|
|
|
|
def bkchmod(self, path, mode):
|
|
if self._tryhook(path) and os.path.exists(path):
|
|
sta = os.stat(path)
|
|
oldmode = stat.S_IMODE(sta.st_mode)
|
|
self.rlist.append(_PathInfo(_CHMOD, path, oldmode))
|
|
__built_in_chmod__(path, mode)
|
|
|
|
def rollback(self):
|
|
if self.zip:
|
|
self.zip.close()
|
|
self.zip = None
|
|
index = len(self.rlist) - 1
|
|
while index >= 0:
|
|
item = self.rlist[index]
|
|
exist = os.path.exists(item.path)
|
|
if item.action == _MKFILE and exist:
|
|
#if not os.access(item.path, os.W_OK):
|
|
# os.chmod(item.path, S_IWUSR)
|
|
__built_in_remove__(item.path)
|
|
elif item.action == _RMFILE and not exist:
|
|
if not self.zip:
|
|
self.zip = zipfile.ZipFile(self.zipfile, 'r', zipfile.ZIP_DEFLATED)
|
|
arcname = os.path.normpath(item.path)
|
|
arcname = arcname[len(self.workspace)+1:].encode('utf_8')
|
|
if os.sep != "/" and os.sep in arcname:
|
|
arcname = arcname.replace(os.sep, '/')
|
|
mtime = self.zip.getinfo(arcname).date_time
|
|
content = self.zip.read(arcname)
|
|
filep = __built_in_open__(item.path, "wb")
|
|
filep.write(content)
|
|
filep.close()
|
|
intime = time.mktime(mtime + (0, 0, 0))
|
|
os.utime(item.path, (intime, intime))
|
|
elif item.action == _MKDIR and exist:
|
|
while True:
|
|
try:
|
|
__built_in_rmdir__(item.path)
|
|
break
|
|
except IOError:
|
|
# Sleep a short time and try again
|
|
# The anti-virus software may delay the file removal in this directory
|
|
sleep(0.1)
|
|
elif item.action == _RMDIR and not exist:
|
|
__built_in_mkdir__(item.path)
|
|
elif item.action == _CHMOD and exist:
|
|
try:
|
|
__built_in_chmod__(item.path, item.mode)
|
|
except EnvironmentError:
|
|
pass
|
|
index -= 1
|
|
self.commit()
|
|
|
|
def commit(self):
|
|
if self.zip:
|
|
self.zip.close()
|
|
__built_in_remove__(self.zipfile)
|
|
|
|
# Check if path needs to be hooked
|
|
def _tryhook(self, path):
|
|
path = os.path.normpath(path)
|
|
works = self.workspace if str(self.workspace).endswith(os.sep) else (self.workspace + os.sep)
|
|
if not path.startswith(works):
|
|
return ''
|
|
for exceptdir in gEXCEPTION_LIST:
|
|
full = os.path.join(self.workspace, exceptdir)
|
|
if full == path or path.startswith(full + os.sep) or os.path.split(full)[0] == path:
|
|
return ''
|
|
return path[len(self.workspace)+1:]
|
|
|
|
def _hookrm(path):
|
|
if GlobalData.gRECOVERMGR:
|
|
GlobalData.gRECOVERMGR.bkrmfile(path)
|
|
else:
|
|
__built_in_remove__(path)
|
|
|
|
def _hookmkdir(path, mode=0o777):
|
|
if GlobalData.gRECOVERMGR:
|
|
GlobalData.gRECOVERMGR.bkmkdir(path, mode)
|
|
else:
|
|
__built_in_mkdir__(path, mode)
|
|
|
|
def _hookrmdir(path):
|
|
if GlobalData.gRECOVERMGR:
|
|
GlobalData.gRECOVERMGR.bkrmdir(path)
|
|
else:
|
|
__built_in_rmdir__(path)
|
|
|
|
def _hookmkfile(path, mode='r', bufsize=-1):
|
|
if GlobalData.gRECOVERMGR:
|
|
return GlobalData.gRECOVERMGR.bkmkfile(path, mode, bufsize)
|
|
return __built_in_open__(path, mode, bufsize)
|
|
|
|
def _hookchmod(path, mode):
|
|
if GlobalData.gRECOVERMGR:
|
|
GlobalData.gRECOVERMGR.bkchmod(path, mode)
|
|
else:
|
|
__built_in_chmod__(path, mode)
|
|
|
|
def SetRecoverMgr(mgr):
|
|
GlobalData.gRECOVERMGR = mgr
|
|
|
|
os.remove = _hookrm
|
|
os.mkdir = _hookmkdir
|
|
os.rmdir = _hookrmdir
|
|
os.chmod = _hookchmod
|
|
__FileHookOpen__ = _hookmkfile
|