Newer
Older
from socket import gethostname, gethostbyname
from ctypes import c_uint64
from shell.ais_shell import GetListInformation, AISGetLibraryVersionStr, AisShellpGetProgramVersion, AISRestart, \
AISDestroy, AISOpen, AISClose, AISUpdateAndGetCount, wr_status, sys_get_timezone_info, AISGetTime, AISSetTime, \
log_get, whitelist_read, blacklist_read, get_io_state, lock_open, relay_toogle, AISGetVersion, ee_lock, ee_unlock,\
log_by_index, log_by_time, get_unread_log_one, whitelist_write, blacklist_write, TestLights, PASS, \
password_set_default, password_change, edit_device_list, MainLoop, fw_update, config_file_rd,\
config_file_wr, DDEV_HND
def GetDLLVersion():
    """Return the AIS library version formatted as one display line.

    Returns:
        The text ``AIS_GetDLLVersion() >> <version>`` followed by a newline,
        where ``<version>`` is the value of ``AISGetLibraryVersionStr()``.
    """
    # A second ``return`` (formatting AisHttpGetProgramVersion() and
    # AisShellpGetProgramVersion()) used to follow this statement. It could
    # never execute, so it was removed; the executed behaviour is unchanged.
    return "AIS_GetDLLVersion() >> %s\n" % AISGetLibraryVersionStr()
def GetExitApp():
    """Shut the server down and return a farewell message.

    On Linux ``pkill -9 python`` is run and SIGINT is then sent to the
    current process. On Windows the interpreter is terminated at once with
    ``os._exit(0)``. On any other platform nothing is stopped.

    Returns:
        The "server stopped" message, if control ever gets back to the caller.
    """
    platform_name = sys.platform
    on_linux = platform_name.startswith('linux')
    on_windows = platform_name.startswith('win')

    if on_linux:
        # NOTE: pkill matches by process name, so this is not limited to the
        # current process.
        os.system('pkill -9 python')
        own_pid = os.getpid()
        os.kill(own_pid, signal.SIGINT)
    elif on_windows:
        os._exit(0)

    return '\nServer stopped !\nClose program !\n'
# NOTE(review): the ``def`` header and the opening ``try:`` that belong to this
# body are not visible in this view -- confirm against the full file.
# Refresh the device list and report the count together with the status text
# produced by wr_status('', res).
res, count = AISUpdateAndGetCount()
return ' COUNT >> {0} {1}'.format(count, wr_status('', res))
except WindowsError:
# WindowsError is swallowed; no value is returned on this path.
pass
def GetAISGetTime(devHnd):
    """Return the first element of the ``AISGetTime(devHnd)`` result.

    Args:
        devHnd: device handle passed straight to ``AISGetTime``.
    """
    time_result = AISGetTime(devHnd)
    first_item = time_result[0]
    return first_item
def SetAISSetTime(devHnd):
    """Call ``AISSetTime`` and return its result below the time-zone info.

    Args:
        devHnd: device handle passed straight to ``AISSetTime``.

    Returns:
        Two lines: the ``sys_get_timezone_info()`` output, then the
        ``AISSetTime(devHnd)`` result.
    """
    # Keep the call order: time-zone info is queried before the time is set.
    timezone_info = sys_get_timezone_info()
    set_time_result = AISSetTime(devHnd)
    return '{0}\n{1}'.format(timezone_info, set_time_result)
def GetInformation(devHnd):
    """Return a three-line summary for a device.

    Args:
        devHnd: device handle passed to ``AISGetTime`` and ``AISGetVersion``.

    Returns:
        Three lines: the first element of ``AISGetTime(devHnd)``, the
        ``sys_get_timezone_info()`` output and the ``AISGetVersion(devHnd)``
        result.
    """
    # The three queries are issued in the same order as they are printed.
    device_time = AISGetTime(devHnd)[0]
    timezone_info = sys_get_timezone_info()
    version_info = AISGetVersion(devHnd)
    return '{0}\n{1}\n{2}'.format(device_time, timezone_info, version_info)
def GetLogByIndex(devHnd, **lpq):
    """Fetch log entries between two indexes taken from the request.

    Args:
        devHnd: device handle forwarded to ``log_by_index``.
        **lpq: request parameters. The values stored under ``START_INDEX``
            and ``END_INDEX`` are iterables of strings that are concatenated.

    Returns:
        The ``log_by_index`` result, or a prompt message when either value
        is empty.

    Raises:
        TypeError: when either key is missing (``''.join(None)``). The
            dispatcher ``command`` catches this and retries with the request
            parameters, so the exception must not be suppressed here.
        ValueError: when a value cannot be converted with ``int``.
    """
    first_index = ''.join(lpq.get(START_INDEX))
    last_index = ''.join(lpq.get(END_INDEX))

    if not (first_index and last_index):
        return 'You must enter values for START INDEX and END INDEX !'

    return log_by_index(int(first_index), int(last_index), devHnd)
def GetLogByTime(devHnd, **lpq):
    """Fetch log entries between two time values taken from the request.

    Args:
        devHnd: device handle forwarded to ``log_by_time``.
        **lpq: request parameters. The values stored under ``START_TIME``
            and ``END_TIME`` are iterables of strings that are concatenated.

    Returns:
        The ``log_by_time`` result, or a prompt message when either value
        is empty.

    Raises:
        TypeError: when either key is missing (``''.join(None)``). The
            dispatcher ``command`` catches this and retries with the request
            parameters, so the exception must not be suppressed here.
        ValueError: when a value cannot be converted with ``int``.
    """
    time_from = ''.join(lpq.get(START_TIME))
    time_to = ''.join(lpq.get(END_TIME))

    if not (time_from and time_to):
        return 'You must enter values for START TIME and END TIME !'

    return log_by_time(int(time_from), int(time_to), devHnd)
def GetUnreadLog(devHnd, **lpq):
    """Return the result of ``get_unread_log_one`` for the given device.

    Args:
        devHnd: device handle forwarded to ``get_unread_log_one``.
        **lpq: accepted for a uniform handler signature; not read here.
    """
    # NOTE(review): ``get_unread_log`` is not assigned inside this function;
    # it has to exist at module level -- confirm against the full file.
    unread_entry = get_unread_log_one(get_unread_log, devHnd)
    return unread_entry
def WhiteListWrite(devHnd, **lpq):
    """Write the white list supplied in the request to the device.

    Args:
        devHnd: device handle forwarded to ``whitelist_write``.
        **lpq: request parameters. The value stored under
            ``WHITE_LIST_WRITE`` is an iterable of strings that is
            concatenated.

    Returns:
        The ``whitelist_write`` result, or a prompt message when the
        concatenated value is empty.

    Raises:
        TypeError: when the key is missing (``''.join(None)``). The
            dispatcher ``command`` catches this and retries with the request
            parameters.
    """
    entries = ''.join(lpq.get(WHITE_LIST_WRITE))
    if not entries:
        return 'You must enter values for write white list !'
    return whitelist_write(entries, devHnd)
def BlackListWrite(devHnd, **lpq):
    """Write the black list supplied in the request to the device.

    Args:
        devHnd: device handle forwarded to ``blacklist_write``.
        **lpq: request parameters. The value stored under
            ``BLACK_LIST_WRITE`` is an iterable of strings that is
            concatenated.

    Returns:
        The ``blacklist_write`` result, or a prompt message when the
        concatenated value is empty.

    Raises:
        TypeError: when the key is missing (``''.join(None)``). The
            dispatcher ``command`` catches this and retries with the request
            parameters.
    """
    entries = ''.join(lpq.get(BLACK_LIST_WRITE))
    if not entries:
        return 'You must enter values for write black list !'
    return blacklist_write(entries, devHnd)
def GetTestLights(devHnd, **lpq):
    """Return the result of ``TestLights`` for the given device.

    Args:
        devHnd: device handle forwarded to ``TestLights``.
        **lpq: accepted for a uniform handler signature; not read here.
    """
    # NOTE(review): ``lights_choise`` is not assigned inside this function;
    # it has to exist at module level -- confirm against the full file.
    lights_result = TestLights(lights_choise, devHnd)
    return lights_result
# NOTE(review): the ``def`` header for this body is not visible in this view.
# It presumably belongs to SetAppPassword (mapped to 'P' in the ``action``
# table) -- confirm against the full file.
global PASS
# mess[0] reports the current application password; mess[1] is appended only
# when no new value was entered.
mess = ["Actual application password is :{0}\n" .format(PASS),
"Patch - new pass = default pass\n"]
# Raises TypeError when DEFAULT_APP_PASS is missing from lpq (''.join(None)).
set_def_pass = ''.join(lpq.get(DEFAULT_APP_PASS))
if len(set_def_pass) == 0:
# Nothing entered: reuse the current application password.
set_def_pass = PASS
return ''.join((mess[0], mess[1], password_set_default(set_def_pass)))
else:
# Store the entered value as the module-level application password.
PASS = set_def_pass
return ''.join((mess[0], password_set_default(set_def_pass)))
def SetDevicePassword(devHnd, **lpq):
    """Change the password on the device and describe what was attempted.

    Args:
        devHnd: device handle forwarded to ``password_change``.
        **lpq: request parameters. The value stored under ``NEW_PASS`` is an
            iterable of strings that is concatenated.

    Returns:
        A report made of the current application password line, a line
        describing the password being applied, and the ``password_change``
        result. When no new password is entered, the current ``PASS`` is
        applied instead.

    Raises:
        TypeError: when ``NEW_PASS`` is missing (``''.join(None)``). The
            dispatcher ``command`` catches this and retries with the request
            parameters.
    """
    requested = ''.join(lpq.get(NEW_PASS))
    current_line = "Old password is actual application password :{0}\n".format(PASS)

    if requested:
        requested_line = "New password for units ( and application ) :{0}\n".format(requested)
        return ''.join((current_line, requested_line, password_change(requested, devHnd)))

    # Nothing entered: apply the current application password to the units.
    # The module-level PASS itself is only read here, never reassigned.
    fallback_line = "Patch - new pass = default pass\n"
    return ''.join((current_line, fallback_line, password_change(PASS, devHnd)))
# NOTE(review): the ``def`` header for this body is not visible in this view.
# It presumably belongs to SetEditListDevices (mapped to 'Q' in the ``action``
# table) -- confirm against the full file.
# NOTE(review): device_type / device_id are bound only when the matching
# request value is truthy; the ADD_DEVICE and ERASE_DEVICE branches below read
# them unconditionally, so they can be unbound there.
if lpq.get(DEVICE_TYPE):
device_type = ''.join(lpq.get(DEVICE_TYPE))
if lpq.get(DEVICE_ID):
device_id = ''.join(lpq.get(DEVICE_ID))
# Direct indexing: a missing EDIT_LIST key raises KeyError.
edit_list_choise = ''.join(lpq[EDIT_LIST])
# Map the selected option to the numeric mode passed to edit_device_list.
if edit_list_choise == AVAILABLE_DEVICES:
return edit_device_list(1)
elif edit_list_choise == ACTUAL_LIST:
return edit_device_list(2)
elif edit_list_choise == CLEAR_LIST:
return edit_device_list(3)
elif edit_list_choise == ADD_DEVICE:
if len(device_type) == 0 or len(device_id) == 0:
return "You must enter values in the relevant fields !"
else:
return edit_device_list(4, "AIS_List_AddDeviceForCheck", int(device_type), int(device_id))
elif edit_list_choise == ERASE_DEVICE:
# Unlike ADD_DEVICE, there is no empty-value check before int() here.
return edit_device_list(5, "AIS_List_EraseDeviceForCheck", int(device_type), int(device_id))
else:
# Unrecognised option: return an empty string.
return ''
# NOTE(review): the ``def`` header for this body is not visible in this view.
# It presumably belongs to GetReadRTE (mapped to 'r' in the ``action`` table).
# ``rt``, ``dev`` and ``hnd`` are used below without a visible assignment --
# confirm where they come from in the full file.
if rt.strip() == '':
return 'Must enter value for seconds !'
else:
seconds = int(rt)
# ``rt`` is reused from here on as the accumulator for collected RTE text.
rt = ''
# This c_uint64 value is overwritten by the float on the next line.
stop_time = c_uint64()
stop_time = time.time() + seconds
mess = ["RTE for {0} sec \n" .format(seconds),
"End RTE listen"]
# NOTE(review): time.ctime() returns text, so this compares two strings
# lexicographically instead of comparing timestamps numerically; the intent
# was probably ``time.time() < stop_time`` -- confirm.
while time.ctime(time.time()) < time.ctime(stop_time):
dev.hnd = hnd
r, rte = MainLoop(dev)
if rte:
rt += rte
time.sleep(.5)
return ''.join((mess[0], rt, mess[1]))
def FirmwareUpdate(devHnd, **lpq):
    """Save an uploaded firmware image to disk and pass it to ``fw_update``.

    Args:
        devHnd: device handle forwarded to ``fw_update``.
        **lpq: request parameters. ``'fw_file'`` holds the image content as
            an iterable of chunks and ``'fw_file_name'`` the target file
            name as an iterable of strings.

    Returns:
        The ``fw_update`` result; ``None`` when no ``'fw_file'`` value was
        supplied; or ``"ERROR: <reason>"`` when any step raises.
    """
    try:
        if not lpq.get('fw_file'):
            return None

        fw_file_name = ''.join(lpq.get('fw_file_name'))
        # The ``with`` block closes the file; no explicit close() is needed.
        with open(fw_file_name, 'wb') as out:
            out.write(''.join(lpq['fw_file']))

        try:
            # The file must still exist while fw_update works with its name,
            # so it is removed only after the update has finished or failed.
            return fw_update(devHnd, fw_name=fw_file_name)
        finally:
            os.remove(fw_file_name)
    except Exception as exc:
        return "ERROR: {0}" .format(exc)
def ReadConfigFromDevToFile(**lpq):
    """Report that reading the device configuration into a file is deprecated.

    Args:
        **lpq: request parameters; only the truthiness of the value stored
            under ``CONFIG_FILE_READ`` is checked.

    Returns:
        The deprecation notice when a ``CONFIG_FILE_READ`` value is supplied,
        ``None`` otherwise, or ``"ERROR: <reason>"`` if the lookup raises.
    """
    try:
        if lpq.get(CONFIG_FILE_READ):
            # The previous implementation (config_file_rd followed by an HTTP
            # transfer of the ``.config`` file) sat after this return and
            # could never execute, so it was removed.
            return 'DEPRECATED AIS_Config_Send(), AIS_Config_Read()'
        return None
    except Exception as exc:
        return "ERROR: {0}".format(exc)
def ReadConfigFromFileToDev(**lpq):
    """Report that writing a configuration file to the device is deprecated.

    Args:
        **lpq: request parameters; only the truthiness of the value stored
            under ``CONFIG_FILE_WR_NAME`` is checked.

    Returns:
        The deprecation notice when a ``CONFIG_FILE_WR_NAME`` value is
        supplied, ``'NO FILE'`` otherwise, or ``"ERROR: <reason>"`` if the
        lookup raises.
    """
    try:
        if lpq.get(CONFIG_FILE_WR_NAME):
            # The previous implementation (saving the upload and calling
            # config_file_wr) sat after this return and could never execute,
            # so it was removed.
            return 'DEPRECATED AIS_Config_Send(), AIS_Config_Read()'
        return 'NO FILE'
    except Exception as exc:
        return "ERROR: {0}".format(exc)
# Dispatch table used by ``command``: one-letter command code -> handler.
action = {
    # Library / session management
    'q': GetListInformation,
    'v': GetDLLVersion,
    'R': AISRestart,
    'D': AISDestroy,
    'o': AISOpen,
    'c': AISClose,

    # Device state and control
    'w': whitelist_read,
    'b': blacklist_read,
    'g': get_io_state,
    'G': lock_open,
    'y': relay_toogle,
    'f': AISGetVersion,
    'E': ee_lock,
    'e': ee_unlock,

    # Handlers that read request parameters
    'n': GetLogByIndex,
    'N': GetLogByTime,
    'u': GetUnreadLog,
    'W': WhiteListWrite,
    'B': BlackListWrite,
    'L': GetTestLights,
    'P': SetAppPassword,
    'p': SetDevicePassword,
    'Q': SetEditListDevices,
    'r': GetReadRTE,
    'F': FirmwareUpdate,
    's': ReadConfigFromDevToFile,
    'S': ReadConfigFromFileToDev,
}
def command(getfunction, ldev, **pq):
    """Dispatch a command code to its handler in the ``action`` table.

    The handler is first called with the device handle only. If that call
    raises ``TypeError`` -- for instance a handler that needs request
    parameters and finds none -- it is called a second time with ``pq``
    expanded as keyword arguments.

    Args:
        getfunction: key into the ``action`` table.
        ldev: object exposing the device handle as ``ldev.hnd``.
        **pq: request parameters forwarded on the second attempt.

    Returns:
        Whatever the selected handler returns.

    Raises:
        KeyError: when ``getfunction`` is not a key of ``action``.
        TypeError: when the second attempt raises it as well.
    """
    handler = action[getfunction]
    try:
        return handler(ldev.hnd)
    except TypeError:
        # NOTE: a TypeError raised *inside* the handler also lands here, so
        # the handler can end up being invoked twice.
        return handler(ldev.hnd, **pq)