mirror of https://github.com/Cisco-Talos/clamav
An ENABLE_TESTS CMake option is provided so that users can disable testing if they don't want it. Instructions for how to use this included in the INSTALL.cmake.md file. If you run `ctest`, each testcase will write out a log file to the <build>/unit_tests directory. As with Autotools' make check, the test files are from test/.split and unit_tests/.split files, but for CMake these are generated at build time instead of at test time. On Posix systems, sets the LD_LIBRARY_PATH so that ClamAV-compiled libraries can be loaded when running tests. On Windows systems, CTest will identify and collect all library dependencies and assemble a temporarily install under the build/unit_tests directory so that the libraries can be loaded when running tests. The same feature is used on Windows when using CMake to install to collect all DLL dependencies so that users don't have to install them manually afterwards. Each of the CTest tests are run using a custom wrapper around Python's unittest framework, which is also responsible for finding and inserting valgrind into the valgrind tests on Posix systems. Unlike with Autotools, the CMake CTest Valgrind-tests are enabled by default, if Valgrind can be found. There's no need to set VG=1. CTest's memcheck module is NOT supported, because we use Python to orchestrate our tests. Added a bunch of Windows compatibility changes to the unit tests. These were primarily changing / to PATHSEP and making adjustments to use Win32 C headers and ifdef out the POSIX ones which aren't available on Windows. Also disabled a bunch of tests on Win32 that don't work on Windows, notably the mmap ones and FD-passing (i.e. FILEDES) ones. Add JSON_C_HAVE_INTTYPES_H definition to clamav-config.h to eliminate warnings on Windows where json.h is included after inttypes.h because json-c's inttypes replacement relies on it. This is a it of a hack and may be removed if json-c fixes their inttypes header stuff in the future. Add preprocessor definitions on Windows to disable MSVC warnings about CRT secure and nonstandard functions. While there may be a better solution, this is needed to be able to see other more serious warnings. Add missing file comment block and copyright statement for clamsubmit.c. Also change json-c/json.h include filename to json.h in clamsubmit.c. The directory name is not required. Changed the hash table data integer type from long, which is poorly defined, to size_t -- which is capable of storing a pointer. Fixed a bunch of casts regarding this variable to eliminate warnings. Fixed two bugs causing utf8 encoding unit tests to fail on Windows: - The in_size variable should be the number of bytes, not the character count. This was was causing the SHIFT_JIS (japanese codepage) to UTF8 transcoding test to only transcode half the bytes. - It turns out that the MultiByteToWideChar() API can't transcode UTF16-BE to UTF16-LE. The solution is to just iterate over the buffer and flip the bytes on each uint16_t. This but was causing the UTF16-BE to UTF8 tests to fail. I also split up the utf8 transcoding tests into separate tests so I could see all of the failures instead of just the first one. Added a flags parameter to the unit test function to open testfiles because it turns out that on Windows if a file contains the \r\n it will replace it with just \n if you opened the file as a text file instead of as binary. However, if we open the CBC files as binary, then a bunch of bytecode tests fail. So I've changed the tests to open the CBC files in the bytecode tests as text files and open all other files as binary. Ported the feature tests from shell scripts to Python using a modified version of our QA test-framework, which is largely compatible and will allow us to migrate some QA tests into this repo. I'd like to add GitHub Actions pipelines in the future so that all public PR's get some testing before anyone has to manually review them. The clamd --log option was missing from the help string, though it definitely works. I've added it in this commit. It appears that clamd.c was never clang-format'd, so this commit also reformats clamd.c. Some of the check_clamd tests expected the path returned by clamd to match character for character with original path sent to clamd. However, as we now evaluate real paths before a scan, the path returned by clamd isn't going to match the relative (and possibly symlink-ridden) path passed to clamdscan. I fixed this test by changing the test to search for the basename: <signature> FOUND within the response instead of matching the exact path. Autotools: Link check_clamd with libclamav so we can use our utility functions in check_clamd.c.pull/152/head^2
parent
978fea6788
commit
2552cfd0d1
@ -0,0 +1,124 @@ |
||||
# Copyright 2019 Collabora, Ltd. |
||||
# Distributed under the Boost Software License, Version 1.0. |
||||
# (See accompanying file LICENSE_1_0.txt or copy at |
||||
# http://www.boost.org/LICENSE_1_0.txt) |
||||
# |
||||
# Original Author: |
||||
# 2019 Ryan Pavlik <ryan.pavlik@collabora.com> |
||||
|
||||
#.rst: |
||||
# FindCheck |
||||
# --------------- |
||||
# |
||||
# Find the "Check" C unit testing framework. |
||||
# |
||||
# See https://libcheck.github.io |
||||
# |
||||
# The Debian package for this is called ``check`` |
||||
# |
||||
# Targets |
||||
# ^^^^^^^ |
||||
# |
||||
# If successful, the following imported targets are created. |
||||
# |
||||
# ``libcheck::check`` |
||||
# |
||||
# Cache variables |
||||
# ^^^^^^^^^^^^^^^ |
||||
# |
||||
# The following cache variable may also be set to assist/control the operation of this module: |
||||
# |
||||
# ``LIBCHECK_ROOT_DIR`` |
||||
# The root to search for libcheck. |
||||
|
||||
set(LIBCHECK_ROOT_DIR "${LIBCHECK_ROOT_DIR}" CACHE PATH "Root to search for libcheck") |
||||
|
||||
find_package(PkgConfig QUIET) |
||||
if(PKG_CONFIG_FOUND) |
||||
set(_old_prefix_path "${CMAKE_PREFIX_PATH}") |
||||
# So pkg-config uses LIBCHECK_ROOT_DIR too. |
||||
if(LIBCHECK_ROOT_DIR) |
||||
list(APPEND CMAKE_PREFIX_PATH ${LIBCHECK_ROOT_DIR}) |
||||
endif() |
||||
pkg_check_modules(PC_LIBCHECK QUIET check) |
||||
# Restore |
||||
set(CMAKE_PREFIX_PATH "${_old_prefix_path}") |
||||
endif() |
||||
find_path(LIBCHECK_INCLUDE_DIR |
||||
NAMES |
||||
check.h |
||||
PATHS |
||||
${LIBCHECK_ROOT_DIR} |
||||
HINTS |
||||
${PC_LIBCHECK_INCLUDE_DIRS} |
||||
PATH_SUFFIXES |
||||
include |
||||
) |
||||
find_library(LIBCHECK_LIBRARY |
||||
NAMES |
||||
check_pic |
||||
check |
||||
PATHS |
||||
${LIBCHECK_ROOT_DIR} |
||||
HINTS |
||||
${PC_LIBCHECK_LIBRARY_DIRS} |
||||
PATH_SUFFIXES |
||||
lib |
||||
) |
||||
find_library(LIBCHECK_SUBUNIT_LIBRARY |
||||
NAMES |
||||
subunit |
||||
PATHS |
||||
${LIBCHECK_ROOT_DIR} |
||||
HINTS |
||||
${PC_LIBCHECK_LIBRARY_DIRS} |
||||
PATH_SUFFIXES |
||||
lib |
||||
) |
||||
find_library(LIBCHECK_LIBRT rt) |
||||
find_library(LIBCHECK_LIBM m) |
||||
|
||||
find_package(Threads QUIET) |
||||
|
||||
set(_libcheck_extra_required) |
||||
if(PC_LIBCHECK_FOUND AND "${PC_LIBCHECK_LIBRARIES}" MATCHES "subunit") |
||||
list(APPEND _libcheck_extra_required LIBCHECK_SUBUNIT_LIBRARY) |
||||
endif() |
||||
|
||||
include(FindPackageHandleStandardArgs) |
||||
find_package_handle_standard_args(Libcheck |
||||
REQUIRED_VARS |
||||
LIBCHECK_INCLUDE_DIR |
||||
LIBCHECK_LIBRARY |
||||
THREADS_FOUND |
||||
) |
||||
if(LIBCHECK_FOUND) |
||||
if(NOT TARGET libcheck::check) |
||||
add_library(libcheck::check UNKNOWN IMPORTED) |
||||
|
||||
set_target_properties(libcheck::check PROPERTIES |
||||
INTERFACE_INCLUDE_DIRECTORIES "${LIBCHECK_INCLUDE_DIR}") |
||||
set_target_properties(libcheck::check PROPERTIES |
||||
IMPORTED_LINK_INTERFACE_LANGUAGES "C" |
||||
IMPORTED_LOCATION ${LIBCHECK_LIBRARY}) |
||||
set_property(TARGET libcheck::check PROPERTY |
||||
IMPORTED_LINK_INTERFACE_LIBRARIES Threads::Threads) |
||||
|
||||
# if we found librt or libm, link them. |
||||
if(LIBCHECK_LIBRT) |
||||
set_property(TARGET libcheck::check APPEND PROPERTY |
||||
IMPORTED_LINK_INTERFACE_LIBRARIES "${LIBCHECK_LIBRT}") |
||||
endif() |
||||
if(LIBCHECK_LIBM) |
||||
set_property(TARGET libcheck::check APPEND PROPERTY |
||||
IMPORTED_LINK_INTERFACE_LIBRARIES "${LIBCHECK_LIBM}") |
||||
endif() |
||||
if(LIBCHECK_SUBUNIT_LIBRARY) |
||||
set_property(TARGET libcheck::check APPEND PROPERTY |
||||
IMPORTED_LINK_INTERFACE_LIBRARIES "${LIBCHECK_SUBUNIT_LIBRARY}") |
||||
endif() |
||||
|
||||
endif() |
||||
mark_as_advanced(LIBCHECK_INCLUDE_DIR LIBCHECK_LIBRARY LIBCHECK_SUBUNIT_LIBRARY) |
||||
endif() |
||||
mark_as_advanced(LIBCHECK_ROOT_DIR LIBCHECK_LIBRT LIBCHECK_LIBM) |
||||
@ -0,0 +1,36 @@ |
||||
# |
||||
# Find the Valgrind program. |
||||
# |
||||
# If found, will set: Valgrind_FOUND, Valgrind_VERSION, and Valgrind_EXECUTABLE |
||||
# |
||||
# If you have a custom install location for Valgrind, you can provide a hint |
||||
# by settings -DValgrind_HOME=<directory containing valgrind> |
||||
# |
||||
|
||||
find_program(Valgrind_EXECUTABLE valgrind |
||||
HINTS "${Valgrind_HOME}" |
||||
PATH_SUFFIXES "bin" |
||||
) |
||||
if(Valgrind_EXECUTABLE) |
||||
execute_process(COMMAND "${Valgrind_EXECUTABLE}" --version |
||||
OUTPUT_VARIABLE Valgrind_VERSION_OUTPUT |
||||
ERROR_VARIABLE Valgrind_VERSION_ERROR |
||||
RESULT_VARIABLE Valgrind_VERSION_RESULT |
||||
) |
||||
if(NOT ${Valgrind_VERSION_RESULT} EQUAL 0) |
||||
message(STATUS "Valgrind not found: Failed to determine version.") |
||||
unset(Valgrind_EXECUTABLE) |
||||
else() |
||||
string(REGEX |
||||
MATCH "[0-9]+\\.[0-9]+(\\.[0-9]+)?(-nightly)?" |
||||
Valgrind_VERSION "${Valgrind_VERSION_OUTPUT}" |
||||
) |
||||
set(Valgrind_VERSION "${Valgrind_VERSION}") |
||||
set(Valgrind_FOUND 1) |
||||
message(STATUS "Valgrind found: ${Valgrind_EXECUTABLE}, ${Valgrind_VERSION}") |
||||
endif() |
||||
|
||||
mark_as_advanced(Valgrind_EXECUTABLE Valgrind_VERSION) |
||||
else() |
||||
message(STATUS "Valgrind not found.") |
||||
endif() |
||||
@ -0,0 +1,76 @@ |
||||
# Copyright (C) 2020 Cisco Systems, Inc. and/or its affiliates. All rights reserved. |
||||
|
||||
# |
||||
# Assemble split files found in test/.split |
||||
# |
||||
set(TESTFILES |
||||
clam.cab |
||||
clam.exe |
||||
clam.zip |
||||
clam.arj |
||||
clam.exe.rtf |
||||
clam.exe.szdd |
||||
clam.tar.gz |
||||
clam.chm |
||||
clam.sis |
||||
clam-aspack.exe |
||||
clam-pespin.exe |
||||
clam-upx.exe |
||||
clam-fsg.exe |
||||
clam-mew.exe |
||||
clam-nsis.exe |
||||
clam-petite.exe |
||||
clam-upack.exe |
||||
clam-wwpack.exe |
||||
clam.pdf |
||||
clam.mail |
||||
clam.ppt |
||||
clam.tnef |
||||
clam.ea05.exe |
||||
clam.ea06.exe |
||||
clam.d64.zip |
||||
clam.exe.mbox.base64 |
||||
clam.exe.mbox.uu |
||||
clam.exe.binhex |
||||
clam.ole.doc |
||||
clam.impl.zip |
||||
clam.exe.html |
||||
clam.bin-be.cpio |
||||
clam.bin-le.cpio |
||||
clam.newc.cpio |
||||
clam.odc.cpio |
||||
clam-yc.exe |
||||
clam_IScab_int.exe |
||||
clam_IScab_ext.exe |
||||
clam_ISmsi_int.exe |
||||
clam_ISmsi_ext.exe |
||||
clam.7z |
||||
clam_cache_emax.tgz |
||||
clam.iso |
||||
clamjol.iso |
||||
clam.exe.bz2 |
||||
clam.bz2.zip |
||||
) |
||||
|
||||
if(ENABLE_UNRAR) |
||||
set(TESTFILES ${TESTFILES} |
||||
clam-v2.rar clam-v3.rar |
||||
) |
||||
endif() |
||||
|
||||
# Assemble split test file |
||||
function(assemble_testfile test_file) |
||||
add_custom_command(OUTPUT ${test_file} |
||||
COMMAND ${Python3_EXECUTABLE} |
||||
${CMAKE_CURRENT_SOURCE_DIR}/assemble_testfile.py |
||||
${test_file} |
||||
--build_dir ${CMAKE_CURRENT_BINARY_DIR} |
||||
--split_dir ${CMAKE_CURRENT_SOURCE_DIR}/.split |
||||
COMMENT "Assembling test file ${test_file} of ${original}") |
||||
add_custom_target(tgt_${test_file} ALL DEPENDS ${CMAKE_CURRENT_BINARY_DIR}/${test_file}) |
||||
#install(PROGRAMS ${CMAKE_CURRENT_BINARY_DIR}/${test_file} DESTINATION ${CMAKE_INSTALL_BINDIR}) |
||||
endfunction() |
||||
|
||||
foreach(TESTFILE ${TESTFILES}) |
||||
assemble_testfile(${TESTFILE}) |
||||
endforeach() |
||||
@ -0,0 +1,47 @@ |
||||
#!/usr/bin/env python3 |
||||
|
||||
import argparse |
||||
import os |
||||
from pathlib import Path |
||||
|
||||
|
||||
def main(): |
||||
parser = argparse.ArgumentParser() |
||||
parser.add_argument("test_file") |
||||
parser.add_argument("--split_dir", help="Location of split files", required=True) |
||||
parser.add_argument("--build_dir", help="Location to assemble file", required=True) |
||||
args = parser.parse_args() |
||||
|
||||
split_dir = Path(args.split_dir) |
||||
if not split_dir.exists(): |
||||
print(f"Error: Split directory does not exist: {args.split_dir}") |
||||
|
||||
match_pattern = Path(split_dir, f"split.{args.test_file}a*") |
||||
|
||||
input_files = [ |
||||
x for x in split_dir.iterdir() if (x.is_file() and x.match(f"{match_pattern}")) |
||||
] |
||||
|
||||
if len(input_files) == 0: |
||||
print(f"Error: No splits matching '{args.test_file}' in: {args.split_dir}") |
||||
exit(1) |
||||
|
||||
test_file_path = Path(args.build_dir, args.test_file) |
||||
try: |
||||
test_file_path.touch(0o666, exist_ok=True) |
||||
except FileNotFoundError: |
||||
print(f"Failed to create file: {test_file_path}") |
||||
exit(1) |
||||
|
||||
input_files.sort() |
||||
file_data = bytes() |
||||
for split_file in input_files: |
||||
file_data += split_file.read_bytes() |
||||
|
||||
test_file_path.write_bytes(file_data) |
||||
|
||||
print(f"Assembled: '{test_file_path}'") |
||||
|
||||
|
||||
if __name__ == "__main__": |
||||
main() |
||||
@ -0,0 +1,385 @@ |
||||
# Copyright (C) 2020 Cisco Systems, Inc. and/or its affiliates. All rights reserved. |
||||
|
||||
# |
||||
# Assemble additional split files found in unit_tests/.split |
||||
# |
||||
set(TESTFILES |
||||
clam-phish-exe |
||||
) |
||||
|
||||
# Assemble split test file |
||||
function(assemble_testfile test_file) |
||||
add_custom_command(OUTPUT ${test_file} |
||||
COMMAND ${Python3_EXECUTABLE} |
||||
${CMAKE_CURRENT_SOURCE_DIR}/../test/assemble_testfile.py |
||||
${test_file} |
||||
--build_dir ${CMAKE_CURRENT_BINARY_DIR} |
||||
--split_dir ${CMAKE_CURRENT_SOURCE_DIR}/.split |
||||
COMMENT "Assembling test file ${test_file} of ${original}") |
||||
add_custom_target(tgt_${test_file} ALL DEPENDS ${CMAKE_CURRENT_BINARY_DIR}/${test_file}) |
||||
#install(PROGRAMS ${CMAKE_CURRENT_BINARY_DIR}/${test_file} DESTINATION ${CMAKE_INSTALL_BINDIR}) |
||||
endfunction() |
||||
|
||||
foreach(TESTFILE ${TESTFILES}) |
||||
assemble_testfile(${TESTFILE}) |
||||
endforeach() |
||||
|
||||
|
||||
if(WIN32) |
||||
add_definitions(-DWIN32_LEAN_AND_MEAN) |
||||
add_definitions(-DHAVE_STRUCT_TIMESPEC) |
||||
add_definitions(-D_CRT_SECURE_NO_WARNINGS) |
||||
add_definitions(-D_CRT_SECURE_NO_DEPRECATE) |
||||
add_definitions(-D_CRT_NONSTDC_NO_DEPRECATE) |
||||
|
||||
# Windows compatibility headers |
||||
include_directories(${CMAKE_SOURCE_DIR}/win32/compat) |
||||
endif() |
||||
|
||||
# |
||||
# Programs used by tests |
||||
# |
||||
|
||||
# preprocessor defines for test programs |
||||
if(WIN32) |
||||
file(TO_NATIVE_PATH ${CMAKE_CURRENT_BINARY_DIR} OBJDIR) |
||||
string(REPLACE "\\" "\\\\" OBJDIR ${OBJDIR}) |
||||
file(TO_NATIVE_PATH ${CMAKE_CURRENT_BINARY_DIR} BUILDDIR) |
||||
string(REPLACE "\\" "\\\\" BUILDDIR ${BUILDDIR}) |
||||
file(TO_NATIVE_PATH ${CMAKE_CURRENT_SOURCE_DIR} SRCDIR) |
||||
string(REPLACE "\\" "\\\\" SRCDIR ${SRCDIR}) |
||||
else() |
||||
set(OBJDIR ${CMAKE_CURRENT_BINARY_DIR}) |
||||
set(BUILDDIR ${CMAKE_CURRENT_BINARY_DIR}) |
||||
set(SRCDIR ${CMAKE_CURRENT_SOURCE_DIR}) |
||||
endif() |
||||
|
||||
if(ENABLE_APP) |
||||
# check_fpu_endian is used by the clamscan tests |
||||
add_executable(check_fpu_endian) |
||||
target_sources(check_fpu_endian |
||||
PRIVATE |
||||
checks.h |
||||
check_fpu_endian.c) |
||||
target_link_libraries(check_fpu_endian |
||||
PRIVATE |
||||
libcheck::check |
||||
clamav_obj |
||||
regex |
||||
lzma_sdk |
||||
yara |
||||
tomsfastmath |
||||
bytecode_runtime |
||||
ClamAV::libunrar_iface_iface |
||||
JSONC::jsonc |
||||
ClamAV::libmspack |
||||
OpenSSL::SSL |
||||
OpenSSL::Crypto |
||||
ZLIB::ZLIB |
||||
BZip2::BZip2 |
||||
PCRE2::pcre2 |
||||
LibXml2::LibXml2) |
||||
target_include_directories(check_fpu_endian PRIVATE ${PROJECT_SOURCE_DIR} ${PROJECT_SOURCE_DIR}/libclamav ${CMAKE_BINARY_DIR}) |
||||
target_compile_definitions(check_fpu_endian PUBLIC OBJDIR="${OBJDIR}" BUILDDIR="${BUILDDIR}" SRCDIR="${SRCDIR}") |
||||
|
||||
# check_clamd is used by the clamd tests |
||||
add_executable(check_clamd) |
||||
target_sources(check_clamd |
||||
PRIVATE check_clamd.c checks.h) |
||||
target_link_libraries(check_clamd |
||||
PRIVATE |
||||
libcheck::check |
||||
ClamAV::shared |
||||
clamav_obj |
||||
regex |
||||
lzma_sdk |
||||
yara |
||||
tomsfastmath |
||||
bytecode_runtime |
||||
ClamAV::libunrar_iface_iface |
||||
JSONC::jsonc |
||||
ClamAV::libmspack |
||||
OpenSSL::SSL |
||||
OpenSSL::Crypto |
||||
ZLIB::ZLIB |
||||
BZip2::BZip2 |
||||
PCRE2::pcre2 |
||||
LibXml2::LibXml2) |
||||
target_include_directories(check_clamd PRIVATE ${PROJECT_SOURCE_DIR} ${PROJECT_SOURCE_DIR}/libclamav ${CMAKE_BINARY_DIR}) |
||||
target_compile_definitions(check_clamd PUBLIC OBJDIR="${OBJDIR}" BUILDDIR="${BUILDDIR}" SRCDIR="${SRCDIR}") |
||||
ADD_CUSTOM_COMMAND(TARGET check_clamd |
||||
POST_BUILD |
||||
COMMAND ${CMAKE_COMMAND} -E copy ${CMAKE_CURRENT_SOURCE_DIR}/input/clamav.hdb ${CMAKE_CURRENT_BINARY_DIR}/.) |
||||
endif() |
||||
|
||||
# |
||||
# Test executables |
||||
# |
||||
add_executable(check_clamav) |
||||
target_sources(check_clamav |
||||
PRIVATE |
||||
checks.h |
||||
check_bytecode.c |
||||
check_clamav.c |
||||
check_disasm.c |
||||
check_htmlnorm.c |
||||
check_jsnorm.c |
||||
check_matchers.c |
||||
check_regex.c |
||||
check_str.c |
||||
check_uniq.c) |
||||
target_link_libraries(check_clamav |
||||
PRIVATE |
||||
libcheck::check |
||||
clamav_obj |
||||
regex |
||||
lzma_sdk |
||||
yara |
||||
tomsfastmath |
||||
bytecode_runtime |
||||
ClamAV::libunrar_iface_iface |
||||
JSONC::jsonc |
||||
ClamAV::libmspack |
||||
OpenSSL::SSL |
||||
OpenSSL::Crypto |
||||
ZLIB::ZLIB |
||||
BZip2::BZip2 |
||||
PCRE2::pcre2 |
||||
LibXml2::LibXml2) |
||||
target_include_directories(check_clamav PRIVATE ${PROJECT_SOURCE_DIR} ${PROJECT_SOURCE_DIR}/libclamav ${CMAKE_BINARY_DIR}) |
||||
target_compile_definitions(check_clamav PUBLIC OBJDIR="${OBJDIR}" BUILDDIR="${BUILDDIR}" SRCDIR="${SRCDIR}") |
||||
|
||||
# |
||||
# Paths to pass to our tests via environment variables |
||||
# |
||||
if(WIN32) |
||||
file(TO_NATIVE_PATH ${CMAKE_SOURCE_DIR} SOURCE) |
||||
file(TO_NATIVE_PATH ${CMAKE_BINARY_DIR} BUILD) |
||||
file(TO_NATIVE_PATH ${CMAKE_CURRENT_BINARY_DIR} TMP) |
||||
|
||||
file(TO_NATIVE_PATH $<TARGET_FILE_DIR:check_clamav>/check_clamav.exe CHECK_CLAMAV) |
||||
file(TO_NATIVE_PATH $<TARGET_FILE_DIR:check_clamav>/check_clamd.exe CHECK_CLAMD) |
||||
file(TO_NATIVE_PATH $<TARGET_FILE_DIR:check_clamav>/check_fpu_endian.exe CHECK_FPU_ENDIAN) |
||||
|
||||
file(TO_NATIVE_PATH $<TARGET_FILE_DIR:check_clamav>/clambc.exe CLAMBC) |
||||
file(TO_NATIVE_PATH $<TARGET_FILE_DIR:check_clamav>/clamd.exe CLAMD) |
||||
file(TO_NATIVE_PATH $<TARGET_FILE_DIR:check_clamav>/clamdscan.exe CLAMDSCAN) |
||||
file(TO_NATIVE_PATH $<TARGET_FILE_DIR:check_clamav>/clamdtop.exe CLAMDTOP) |
||||
file(TO_NATIVE_PATH $<TARGET_FILE_DIR:check_clamav>/clamscan.exe CLAMSCAN) |
||||
file(TO_NATIVE_PATH $<TARGET_FILE_DIR:check_clamav>/clamsubmit.exe CLAMSUBMIT) |
||||
file(TO_NATIVE_PATH $<TARGET_FILE_DIR:check_clamav>/clamconf.exe CLAMCONF) |
||||
file(TO_NATIVE_PATH $<TARGET_FILE_DIR:check_clamav>/freshclam.exe FRESHCLAM) |
||||
file(TO_NATIVE_PATH $<TARGET_FILE_DIR:check_clamav>/sigtool.exe SIGTOOL) |
||||
else() |
||||
set(LD_LIBRARY_PATH $<TARGET_FILE_DIR:ClamAV::libclamav>:$<TARGET_FILE_DIR:ClamAV::libmspack:$<TARGET_FILE_DIR:ClamAV::libunrar_iface>:$<TARGET_FILE_DIR:ClamAV::libunrar:$<TARGET_FILE_DIR:ClamAV::libfreshclam>:$ENV{LD_LIBRARY_PATH}) |
||||
|
||||
set(SOURCE ${CMAKE_SOURCE_DIR}) |
||||
set(BUILD ${CMAKE_BINARY_DIR}) |
||||
set(TMP ${CMAKE_CURRENT_BINARY_DIR}) |
||||
|
||||
set(CHECK_CLAMAV $<TARGET_FILE:check_clamav>) |
||||
set(CHECK_CLAMD $<TARGET_FILE:check_clamd>) |
||||
set(CHECK_FPU_ENDIAN $<TARGET_FILE:check_fpu_endian>) |
||||
|
||||
set(CLAMBC $<TARGET_FILE:clambc>) |
||||
set(CLAMD $<TARGET_FILE:clamd>) |
||||
set(CLAMDSCAN $<TARGET_FILE:clamdscan>) |
||||
set(CLAMDTOP $<TARGET_FILE:clamdtop>) |
||||
set(CLAMSCAN $<TARGET_FILE:clamscan>) |
||||
set(CLAMSUBMIT $<TARGET_FILE:clamsubmit>) |
||||
set(CLAMCONF $<TARGET_FILE:clamconf>) |
||||
set(FRESHCLAM $<TARGET_FILE:freshclam-bin>) |
||||
set(SIGTOOL $<TARGET_FILE:sigtool>) |
||||
|
||||
set(CLAMAV_MILTER $<TARGET_FILE:clamav-milter>) |
||||
set(CLAMONACC $<TARGET_FILE:clamonacc>) |
||||
endif() |
||||
|
||||
set(ENVIRONMENT |
||||
PYTHONTRACEMALLOC=1 VERSION=${PROJECT_VERSION}${VERSION_SUFFIX} |
||||
SOURCE=${SOURCE} BUILD=${BUILD} TMP=${TMP} |
||||
CK_FORK=no |
||||
CK_DEFAULT_TIMEOUT=60 |
||||
LD_LIBRARY_PATH=${LD_LIBRARY_PATH} |
||||
SOURCE=${SOURCE} |
||||
BUILD=${BUILD} |
||||
TMP=${TMP} |
||||
CHECK_CLAMAV=${CHECK_CLAMAV} |
||||
CHECK_CLAMD=${CHECK_CLAMD} |
||||
CHECK_FPU_ENDIAN=${CHECK_FPU_ENDIAN} |
||||
CLAMBC=${CLAMBC} |
||||
CLAMD=${CLAMD} |
||||
CLAMDSCAN=${CLAMDSCAN} |
||||
CLAMDTOP=${CLAMDTOP} |
||||
CLAMSCAN=${CLAMSCAN} |
||||
CLAMSUBMIT=${CLAMSUBMIT} |
||||
CLAMCONF=${CLAMCONF} |
||||
FRESHCLAM=${FRESHCLAM} |
||||
SIGTOOL=${SIGTOOL} |
||||
CLAMAV_MILTER=${CLAMAV_MILTER} |
||||
CLAMONACC=${CLAMONACC} |
||||
) |
||||
|
||||
# |
||||
# The Tests |
||||
# ~~~~~~~~~ |
||||
# |
||||
# Run all tests with: `ctest` |
||||
# or: `ctest -V` for verbose output |
||||
# |
||||
# Run a specific test like this: |
||||
# `ctest -V -R libclamav_valgrind_test` |
||||
# |
||||
add_test(NAME libclamav COMMAND ${Python3_EXECUTABLE} -m;unittest;--verbose;libclamav_test.py |
||||
WORKING_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}) |
||||
set_property(TEST libclamav PROPERTY ENVIRONMENT ${ENVIRONMENT}) |
||||
if(Valgrind_FOUND) |
||||
add_test(NAME libclamav_valgrind COMMAND ${Python3_EXECUTABLE} -m;unittest;--verbose;libclamav_test.py |
||||
WORKING_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}) |
||||
set_property(TEST libclamav_valgrind PROPERTY ENVIRONMENT ${ENVIRONMENT} VALGRIND=${Valgrind_EXECUTABLE}) |
||||
endif() |
||||
|
||||
if(ENABLE_APP) |
||||
add_test(NAME clamscan COMMAND ${Python3_EXECUTABLE} -m;unittest;--verbose;clamscan_test.py |
||||
WORKING_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}) |
||||
set_property(TEST clamscan PROPERTY ENVIRONMENT ${ENVIRONMENT}) |
||||
if(Valgrind_FOUND) |
||||
add_test(NAME clamscan_valgrind COMMAND ${Python3_EXECUTABLE} -m;unittest;--verbose;clamscan_test.py |
||||
WORKING_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}) |
||||
set_property(TEST clamscan_valgrind PROPERTY ENVIRONMENT ${ENVIRONMENT} VALGRIND=${Valgrind_EXECUTABLE}) |
||||
endif() |
||||
|
||||
add_test(NAME clamd COMMAND ${Python3_EXECUTABLE} -m;unittest;--verbose;clamd_test.py |
||||
WORKING_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}) |
||||
set_property(TEST clamd PROPERTY ENVIRONMENT ${ENVIRONMENT}) |
||||
if(Valgrind_FOUND) |
||||
add_test(NAME clamd_valgrind COMMAND ${Python3_EXECUTABLE} -m;unittest;--verbose;clamd_test.py |
||||
WORKING_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}) |
||||
set_property(TEST clamd_valgrind PROPERTY ENVIRONMENT ${ENVIRONMENT} VALGRIND=${Valgrind_EXECUTABLE}) |
||||
endif() |
||||
|
||||
add_test(NAME freshclam COMMAND ${Python3_EXECUTABLE} -m;unittest;--verbose;freshclam_test.py |
||||
WORKING_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}) |
||||
set_property(TEST freshclam PROPERTY ENVIRONMENT ${ENVIRONMENT}) |
||||
if(Valgrind_FOUND) |
||||
add_test(NAME freshclam_valgrind COMMAND ${Python3_EXECUTABLE} -m;unittest;--verbose;freshclam_test.py |
||||
WORKING_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}) |
||||
set_property(TEST freshclam_valgrind PROPERTY ENVIRONMENT ${ENVIRONMENT} VALGRIND=${Valgrind_EXECUTABLE}) |
||||
endif() |
||||
|
||||
add_test(NAME sigtool COMMAND ${Python3_EXECUTABLE} -m;unittest;--verbose;sigtool_test.py |
||||
WORKING_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}) |
||||
set_property(TEST sigtool PROPERTY ENVIRONMENT ${ENVIRONMENT}) |
||||
if(Valgrind_FOUND) |
||||
add_test(NAME sigtool_valgrind COMMAND ${Python3_EXECUTABLE} -m;unittest;--verbose;sigtool_test.py |
||||
WORKING_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}) |
||||
set_property(TEST sigtool_valgrind PROPERTY ENVIRONMENT ${ENVIRONMENT} VALGRIND=${Valgrind_EXECUTABLE}) |
||||
endif() |
||||
endif() |
||||
|
||||
if(WIN32) |
||||
# |
||||
# Prepare a test install, with all our DLL dependencies co-located with our EXEs and DLLs |
||||
# Generate GetLibs-$<CONFIG>.ctest which will collect all required DLL and EXE dependencies when `ctest` is run. |
||||
# |
||||
if(ENABLE_APP) |
||||
set(GEN_SCRIPT [[ |
||||
# Collect runtime DLL dependencies for our libs and apps |
||||
file(GET_RUNTIME_DEPENDENCIES |
||||
LIBRARIES |
||||
$<TARGET_FILE:ClamAV::libclamav> |
||||
$<TARGET_FILE:ClamAV::libfreshclam> |
||||
EXECUTABLES |
||||
$<TARGET_FILE:check_clamav> |
||||
$<TARGET_FILE:check_fpu_endian> |
||||
$<TARGET_FILE:check_clamd> |
||||
$<TARGET_FILE:clambc> |
||||
$<TARGET_FILE:clamd> |
||||
$<TARGET_FILE:clamdscan> |
||||
$<TARGET_FILE:clamdtop> |
||||
$<TARGET_FILE:clamscan> |
||||
$<TARGET_FILE:clamsubmit> |
||||
$<TARGET_FILE:clamconf> |
||||
$<TARGET_FILE:freshclam-bin> |
||||
$<TARGET_FILE:sigtool> |
||||
RESOLVED_DEPENDENCIES_VAR _r_deps |
||||
UNRESOLVED_DEPENDENCIES_VAR _u_deps |
||||
DIRECTORIES |
||||
$<TARGET_FILE_DIR:OpenSSL::SSL> |
||||
$<TARGET_FILE_DIR:OpenSSL::Crypto> |
||||
$<TARGET_FILE_DIR:ZLIB::ZLIB> |
||||
$<TARGET_FILE_DIR:BZip2::BZip2> |
||||
$<TARGET_FILE_DIR:PCRE2::pcre2> |
||||
$<TARGET_FILE_DIR:LibXml2::LibXml2> |
||||
$<TARGET_FILE_DIR:CURL::libcurl> |
||||
$<TARGET_FILE_DIR:JSONC::jsonc> |
||||
CONFLICTING_DEPENDENCIES_PREFIX CTEST_CONFLICTING_DEPENDENCIES |
||||
) |
||||
foreach(_file ${_r_deps}) |
||||
string(TOLOWER ${_file} _file_lower) |
||||
if(NOT ${_file_lower} MATCHES "c:[\\/]windows[\\/]system32.*") |
||||
message("Collecting DLL dependency: ${_file}") |
||||
file(COPY ${_file} DESTINATION $<TARGET_FILE_DIR:check_clamav>) |
||||
endif() |
||||
endforeach() |
||||
|
||||
# Collect our libs |
||||
file(COPY $<TARGET_FILE:ClamAV::libclamav> DESTINATION $<TARGET_FILE_DIR:check_clamav>) |
||||
file(COPY $<TARGET_FILE:ClamAV::libmspack> DESTINATION $<TARGET_FILE_DIR:check_clamav>) |
||||
file(COPY $<TARGET_FILE:ClamAV::libfreshclam> DESTINATION $<TARGET_FILE_DIR:check_clamav>) |
||||
file(COPY $<TARGET_FILE:ClamAV::libunrar> DESTINATION $<TARGET_FILE_DIR:check_clamav>) |
||||
file(COPY $<TARGET_FILE:ClamAV::libunrar_iface> DESTINATION $<TARGET_FILE_DIR:check_clamav>) |
||||
|
||||
# Collect our apps |
||||
file(COPY $<TARGET_FILE:check_fpu_endian> DESTINATION $<TARGET_FILE_DIR:check_fpu_endian>) |
||||
file(COPY $<TARGET_FILE:check_clamd> DESTINATION $<TARGET_FILE_DIR:check_clamav>) |
||||
file(COPY $<TARGET_FILE:clambc> DESTINATION $<TARGET_FILE_DIR:check_clamav>) |
||||
file(COPY $<TARGET_FILE:clamd> DESTINATION $<TARGET_FILE_DIR:check_clamav>) |
||||
file(COPY $<TARGET_FILE:clamdscan> DESTINATION $<TARGET_FILE_DIR:check_clamav>) |
||||
file(COPY $<TARGET_FILE:clamdtop> DESTINATION $<TARGET_FILE_DIR:check_clamav>) |
||||
file(COPY $<TARGET_FILE:clamscan> DESTINATION $<TARGET_FILE_DIR:check_clamav>) |
||||
file(COPY $<TARGET_FILE:clamsubmit> DESTINATION $<TARGET_FILE_DIR:check_clamav>) |
||||
file(COPY $<TARGET_FILE:clamconf> DESTINATION $<TARGET_FILE_DIR:check_clamav>) |
||||
file(COPY $<TARGET_FILE:freshclam-bin> DESTINATION $<TARGET_FILE_DIR:check_clamav>) |
||||
file(COPY $<TARGET_FILE:sigtool> DESTINATION $<TARGET_FILE_DIR:check_clamav>) |
||||
]]) |
||||
else() |
||||
# We don't have libfreshclam unit tests, so no need to check if ENABLE_LIBCLAMAV_ONLY is enabled. |
||||
set(GEN_SCRIPT [[ |
||||
# Collect runtime DLL dependencies for our libs |
||||
file(GET_RUNTIME_DEPENDENCIES |
||||
LIBRARIES |
||||
$<TARGET_FILE:ClamAV::libclamav> |
||||
EXECUTABLES |
||||
$<TARGET_FILE:check_clamav> |
||||
RESOLVED_DEPENDENCIES_VAR _r_deps |
||||
UNRESOLVED_DEPENDENCIES_VAR _u_deps |
||||
DIRECTORIES |
||||
$<TARGET_FILE_DIR:OpenSSL::SSL> |
||||
$<TARGET_FILE_DIR:OpenSSL::Crypto> |
||||
$<TARGET_FILE_DIR:ZLIB::ZLIB> |
||||
$<TARGET_FILE_DIR:BZip2::BZip2> |
||||
$<TARGET_FILE_DIR:PCRE2::pcre2> |
||||
$<TARGET_FILE_DIR:LibXml2::LibXml2> |
||||
$<TARGET_FILE_DIR:JSONC::jsonc> |
||||
CONFLICTING_DEPENDENCIES_PREFIX CTEST_CONFLICTING_DEPENDENCIES |
||||
) |
||||
foreach(_file ${_r_deps}) |
||||
string(TOLOWER ${_file} _file_lower) |
||||
if(NOT ${_file_lower} MATCHES "c:[\\/]windows[\\/]system32.*") |
||||
message("DEPENDENCY: ${_file}") |
||||
file(COPY ${_file} DESTINATION $<TARGET_FILE_DIR:check_clamav>) |
||||
endif() |
||||
endforeach() |
||||
|
||||
# Collect our libs |
||||
file(COPY $<TARGET_FILE:ClamAV::libclamav> DESTINATION $<TARGET_FILE_DIR:check_clamav>) |
||||
file(COPY $<TARGET_FILE:ClamAV::libmspack> DESTINATION $<TARGET_FILE_DIR:check_clamav>) |
||||
file(COPY $<TARGET_FILE:ClamAV::libunrar> DESTINATION $<TARGET_FILE_DIR:check_clamav>) |
||||
file(COPY $<TARGET_FILE:ClamAV::libunrar_iface> DESTINATION $<TARGET_FILE_DIR:check_clamav>) |
||||
]]) |
||||
endif() |
||||
|
||||
file(GENERATE OUTPUT GetLibs-$<CONFIG>.ctest CONTENT ${GEN_SCRIPT}) |
||||
set_directory_properties(PROPERTIES TEST_INCLUDE_FILES ${CMAKE_CURRENT_SOURCE_DIR}/Run-GetLibs.ctest) |
||||
endif() |
||||
@ -0,0 +1 @@ |
||||
include(GetLibs-${CTEST_CONFIGURATION_TYPE}.ctest) |
||||
@ -1,4 +0,0 @@ |
||||
#ifndef CHECKS_COMMON_H |
||||
#define CHECKS_COMMON_H |
||||
|
||||
#endif |
||||
@ -0,0 +1,455 @@ |
||||
# Copyright (C) 2020 Cisco Systems, Inc. and/or its affiliates. All rights reserved. |
||||
|
||||
""" |
||||
Run clamd tests. |
||||
""" |
||||
|
||||
import os |
||||
from pathlib import Path |
||||
import platform |
||||
import socket |
||||
import subprocess |
||||
import shutil |
||||
import sys |
||||
import time |
||||
import unittest |
||||
|
||||
import testcase |
||||
|
||||
|
||||
os_platform = platform.platform() |
||||
operating_system = os_platform.split('-')[0].lower() |
||||
|
||||
def check_port_available(port_num: int) -> bool: |
||||
''' |
||||
Check if port # is available |
||||
''' |
||||
port_is_available = True # It's probably available... |
||||
|
||||
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) |
||||
location = ("127.0.0.1", port_num) |
||||
|
||||
result_of_check = sock.connect_ex(location) |
||||
if result_of_check == 0: |
||||
port_is_available = False # Oh nevermind! Someone was listening! |
||||
sock.close() |
||||
|
||||
return port_is_available |
||||
|
||||
class TC(testcase.TestCase): |
||||
@classmethod |
||||
def setUpClass(cls): |
||||
super(TC, cls).setUpClass() |
||||
|
||||
TC.testpaths = list(TC.path_build.glob('test/clam*')) # A list of Path()'s of each of our generated test files |
||||
|
||||
TC.clamd_pid = TC.path_tmp / 'clamd-test.pid' |
||||
TC.clamd_socket = TC.path_build / 'unit_tests' / 'clamd-test.socket' # <-- this is hard-coded into the `check_clamd` program |
||||
TC.clamd_port_num = 3319 # <-- this is hard-coded into the `check_clamd` program |
||||
TC.path_db = TC.path_tmp / 'database' |
||||
TC.path_db.mkdir(parents=True) |
||||
shutil.copy( |
||||
str(TC.path_build / 'unit_tests' / 'clamav.hdb'), |
||||
str(TC.path_db), |
||||
) |
||||
shutil.copy( |
||||
str(TC.path_source / 'unit_tests' / 'input' / 'daily.pdb'), |
||||
str(TC.path_db), |
||||
) |
||||
|
||||
# Identify a TCP port we can use. |
||||
# Presently disabled because check_clamd's port # is hardcoded. |
||||
#found_open_port = False |
||||
#for port_num in range(3310, 3410): |
||||
# if check_port_available(port_num) == True: |
||||
# found_open_port = True |
||||
# break |
||||
#assert found_open_port == True |
||||
|
||||
# Prep a clamd.conf to use for most (if not all) of the tests. |
||||
config = f''' |
||||
Foreground yes |
||||
PidFile {TC.clamd_pid} |
||||
DatabaseDirectory {TC.path_db} |
||||
LogFileMaxSize 0 |
||||
LogTime yes |
||||
#Debug yes |
||||
LogClean yes |
||||
LogVerbose yes |
||||
ExitOnOOM yes |
||||
DetectPUA yes |
||||
ScanPDF yes |
||||
CommandReadTimeout 1 |
||||
MaxQueue 800 |
||||
MaxConnectionQueueLength 1024 |
||||
''' |
||||
if operating_system == 'windows': |
||||
# Only have TCP socket option for Windows. |
||||
config += f''' |
||||
TCPSocket {TC.clamd_port_num} |
||||
TCPAddr 127.0.0.1 |
||||
''' |
||||
else: |
||||
# Use LocalSocket for Posix, because that's what check_clamd expects. |
||||
config += f''' |
||||
LocalSocket {TC.clamd_socket} |
||||
TCPSocket {TC.clamd_port_num} |
||||
TCPAddr 127.0.0.1 |
||||
''' |
||||
|
||||
TC.clamd_config = TC.path_tmp / 'clamd-test.conf' |
||||
TC.clamd_config.write_text(config) |
||||
|
||||
# Check if fdpassing is supported. |
||||
TC.has_fdpass_support = False |
||||
with (TC.path_build / 'clamav-config.h').open('r') as clamav_config: |
||||
if "#define HAVE_FD_PASSING 1" in clamav_config.read(): |
||||
TC.has_fdpass_support = True |
||||
|
||||
@classmethod |
||||
def tearDownClass(cls): |
||||
super(TC, cls).tearDownClass() |
||||
|
||||
def setUp(self): |
||||
super(TC, self).setUp() |
||||
self.proc = None |
||||
|
||||
def tearDown(self): |
||||
super(TC, self).tearDown() |
||||
|
||||
# Kill clamd (if running) |
||||
if self.proc != None: |
||||
try: |
||||
self.proc.terminate() |
||||
self.proc.wait(timeout=120) |
||||
self.proc.stdin.close() |
||||
except OSError as exc: |
||||
self.log.warning(f'Unexpected exception {exc}') |
||||
pass # ignore |
||||
self.proc = None |
||||
TC.clamd_pid.unlink(missing_ok=True) |
||||
TC.clamd_socket.unlink(missing_ok=True) |
||||
|
||||
self.verify_valgrind_log() |
||||
|
||||
def start_clamd(self): |
||||
''' |
||||
Start clamd |
||||
''' |
||||
command = f'{TC.valgrind} {TC.valgrind_args} {TC.clamd} --config-file={TC.clamd_config}' |
||||
self.log.info(f'Starting clamd: {command}') |
||||
self.proc = subprocess.Popen( |
||||
command.strip().split(' '), |
||||
stdin=subprocess.PIPE, |
||||
stdout=sys.stdout.buffer, |
||||
stderr=sys.stdout.buffer, |
||||
) |
||||
|
||||
def run_clamdscan(self, |
||||
scan_args, |
||||
expected_ec=0, |
||||
expected_out=[], |
||||
expected_err=[], |
||||
unexpected_out=[], |
||||
unexpected_err=[]): |
||||
''' |
||||
Run clamdscan in each mode |
||||
The first scan uses ping & wait to give clamd time to start. |
||||
''' |
||||
# default (filepath) mode |
||||
output = self.execute_command(f'{TC.clamdscan} --ping 5 --wait -c {TC.clamd_config} {scan_args}') |
||||
assert output.ec == expected_ec |
||||
if expected_out != [] or unexpected_out != []: |
||||
self.verify_output(output.out, expected=expected_out, unexpected=unexpected_out) |
||||
if expected_err != [] or unexpected_err != []: |
||||
self.verify_output(output.err, expected=expected_err, unexpected=unexpected_err) |
||||
|
||||
# multi mode |
||||
output = self.execute_command(f'{TC.clamdscan} -c {TC.clamd_config} -m {scan_args}') |
||||
assert output.ec == expected_ec |
||||
if expected_out != [] or unexpected_out != []: |
||||
self.verify_output(output.out, expected=expected_out, unexpected=unexpected_out) |
||||
if expected_err != [] or unexpected_err != []: |
||||
self.verify_output(output.err, expected=expected_err, unexpected=unexpected_err) |
||||
|
||||
if TC.has_fdpass_support: |
||||
# fdpass |
||||
output = self.execute_command(f'{TC.clamdscan} -c {TC.clamd_config} --fdpass {scan_args}') |
||||
assert output.ec == expected_ec |
||||
if expected_out != [] or unexpected_out != []: |
||||
self.verify_output(output.out, expected=expected_out, unexpected=unexpected_out) |
||||
if expected_err != [] or unexpected_err != []: |
||||
self.verify_output(output.err, expected=expected_err, unexpected=unexpected_err) |
||||
|
||||
# fdpass multi mode |
||||
output = self.execute_command(f'{TC.clamdscan} -c {TC.clamd_config} --fdpass -m {scan_args}') |
||||
assert output.ec == expected_ec |
||||
if expected_out != [] or unexpected_out != []: |
||||
self.verify_output(output.out, expected=expected_out, unexpected=unexpected_out) |
||||
if expected_err != [] or unexpected_err != []: |
||||
self.verify_output(output.err, expected=expected_err, unexpected=unexpected_err) |
||||
|
||||
# stream |
||||
output = self.execute_command(f'{TC.clamdscan} -c {TC.clamd_config} --stream {scan_args}') |
||||
assert output.ec == expected_ec |
||||
if expected_out != [] or unexpected_out != []: |
||||
self.verify_output(output.out, expected=expected_out, unexpected=unexpected_out) |
||||
if expected_err != [] or unexpected_err != []: |
||||
self.verify_output(output.err, expected=expected_err, unexpected=unexpected_err) |
||||
|
||||
# stream multi mode |
||||
output = self.execute_command(f'{TC.clamdscan} -c {TC.clamd_config} --stream -m {scan_args}') |
||||
assert output.ec == expected_ec |
||||
if expected_out != [] or unexpected_out != []: |
||||
self.verify_output(output.out, expected=expected_out, unexpected=unexpected_out) |
||||
if expected_err != [] or unexpected_err != []: |
||||
self.verify_output(output.err, expected=expected_err, unexpected=unexpected_err) |
||||
|
||||
def run_clamdscan_file_only(self, |
||||
scan_args, |
||||
expected_ec=0, |
||||
expected_out=[], |
||||
expected_err=[], |
||||
unexpected_out=[], |
||||
unexpected_err=[]): |
||||
''' |
||||
Run clamdscan in filepath mode (and filepath multi mode) |
||||
The first scan uses ping & wait to give clamd time to start. |
||||
''' |
||||
# default mode |
||||
output = self.execute_command(f'{TC.clamdscan} --ping 5 --wait -c {TC.clamd_config} {scan_args}') |
||||
assert output.ec == expected_ec |
||||
if expected_out != [] or unexpected_out != []: |
||||
self.verify_output(output.out, expected=expected_out, unexpected=unexpected_out) |
||||
if expected_err != [] or unexpected_err != []: |
||||
self.verify_output(output.err, expected=expected_err, unexpected=unexpected_err) |
||||
|
||||
# multi mode |
||||
output = self.execute_command(f'{TC.clamdscan} -c {TC.clamd_config} -m {scan_args}') |
||||
assert output.ec == expected_ec |
||||
if expected_out != [] or unexpected_out != []: |
||||
self.verify_output(output.out, expected=expected_out, unexpected=unexpected_out) |
||||
if expected_err != [] or unexpected_err != []: |
||||
self.verify_output(output.err, expected=expected_err, unexpected=unexpected_err) |
||||
|
||||
def run_clamdscan_fdpass_only(self, |
||||
scan_args, |
||||
expected_ec=0, |
||||
expected_out=[], |
||||
expected_err=[], |
||||
unexpected_out=[], |
||||
unexpected_err=[]): |
||||
''' |
||||
Run clamdscan fdpass mode only |
||||
Use ping & wait to give clamd time to start. |
||||
''' |
||||
# fdpass |
||||
output = self.execute_command(f'{TC.clamdscan} --ping 5 --wait -c {TC.clamd_config} --fdpass {scan_args}') |
||||
assert output.ec == expected_ec |
||||
if expected_out != [] or unexpected_out != []: |
||||
self.verify_output(output.out, expected=expected_out, unexpected=unexpected_out) |
||||
if expected_err != [] or unexpected_err != []: |
||||
self.verify_output(output.err, expected=expected_err, unexpected=unexpected_err) |
||||
|
||||
def test_clamd_00_version(self): |
||||
''' |
||||
verify that clamd -v returns the version |
||||
''' |
||||
self.step_name('clamd version test') |
||||
|
||||
command = f'{TC.valgrind} {TC.valgrind_args} {TC.clamd} --config-file={TC.clamd_config} -V' |
||||
output = self.execute_command(command) |
||||
|
||||
assert output.ec == 0 # success |
||||
|
||||
expected_results = [ |
||||
f'ClamAV {TC.version}', |
||||
] |
||||
self.verify_output(output.out, expected=expected_results) |
||||
|
||||
def test_clamd_01_ping_pong(self): |
||||
''' |
||||
Verify that clamd responds to a PING command |
||||
''' |
||||
self.step_name('Testing clamd + clamdscan PING PONG feature') |
||||
|
||||
self.start_clamd() |
||||
|
||||
poll = self.proc.poll() |
||||
assert poll == None # subprocess is alive if poll() returns None |
||||
|
||||
output = self.execute_command(f'{TC.clamdscan} -p 5 -c {TC.clamd_config}') |
||||
|
||||
assert output.ec == 0 # success |
||||
self.verify_output(output.out, expected=['PONG']) |
||||
|
||||
def test_clamd_02_clamdscan_version(self): |
||||
''' |
||||
Verify that clamdscan --version returns the expected version # |
||||
Explanation: clamdscan --version will query clamd for it's version |
||||
and print out clamd's version. If it can't connect to clamd, it'll |
||||
throw and error saying as much and then report it's own version. |
||||
|
||||
In this test, we want to check clamd's version through clamdscan. |
||||
''' |
||||
self.step_name('Testing clamd + clamdscan version feature') |
||||
|
||||
self.start_clamd() |
||||
|
||||
poll = self.proc.poll() |
||||
assert poll == None # subprocess is alive if poll() returns None |
||||
|
||||
# First we'll ping-pong to make sure clamd is up |
||||
# If clamd isn't up before the version test, clamdscan will return it's |
||||
# own version, which isn't really the point of the test. |
||||
output = self.execute_command(f'{TC.clamdscan} --ping 5 -c {TC.clamd_config}') |
||||
assert output.ec == 0 # success |
||||
self.verify_output(output.out, expected=['PONG']) |
||||
|
||||
# Ok now it's up, let's check clamd's version via clamdscan. |
||||
output = self.execute_command(f'{TC.clamdscan} --version -c {TC.clamd_config}') |
||||
assert output.ec == 0 # success |
||||
self.verify_output(output.out, |
||||
expected=[f'ClamAV {TC.version}'], unexpected=['Could not connect to clamd']) |
||||
|
||||
def test_clamd_03_reload(self): |
||||
''' |
||||
In this test, it is not supposed to detect until we actually put the |
||||
signature there and reload! |
||||
''' |
||||
self.step_name('Test scan before & after reload') |
||||
|
||||
self.start_clamd() |
||||
|
||||
poll = self.proc.poll() |
||||
assert poll == None # subprocess is alive if poll() returns None |
||||
|
||||
(TC.path_tmp / 'reload-testfile').write_bytes(b'ClamAV-RELOAD-Test') |
||||
|
||||
self.run_clamdscan(f'{TC.path_tmp / "reload-testfile"}', |
||||
expected_ec=0, expected_out=['reload-testfile: OK', 'Infected files: 0']) |
||||
|
||||
(TC.path_db / 'reload-test.ndb').write_text('ClamAV-RELOAD-TestFile:0:0:436c616d41562d52454c4f41442d54657374') |
||||
|
||||
output = self.execute_command(f'{TC.clamdscan} --reload -c {TC.clamd_config}') |
||||
assert output.ec == 0 # success |
||||
|
||||
time.sleep(2) # give clamd a moment to reload before trying again |
||||
# with multi-threaded reloading will clamd would happily |
||||
# re-scan with the old engine while it reloads. |
||||
|
||||
self.run_clamdscan(f'{TC.path_tmp / "reload-testfile"}', |
||||
expected_ec=1, expected_out=['ClamAV-RELOAD-TestFile.UNOFFICIAL FOUND', 'Infected files: 1']) |
||||
|
||||
def test_clamd_04_all_testfiles(self): |
||||
''' |
||||
Verify that clamd + clamdscan detect each of our <build>/test/clam* test files. |
||||
''' |
||||
self.step_name('Testing clamd + clamdscan scan of all `test` files') |
||||
|
||||
self.start_clamd() |
||||
|
||||
poll = self.proc.poll() |
||||
assert poll == None # subprocess is alive if poll() returns None |
||||
|
||||
testfiles = ' '.join([str(testpath) for testpath in TC.testpaths]) |
||||
expected_results = [f'{testpath.name}: ClamAV-Test-File.UNOFFICIAL FOUND' for testpath in TC.testpaths] |
||||
expected_results.append(f'Infected files: {len(TC.testpaths)}') |
||||
|
||||
self.run_clamdscan(f'{testfiles}', |
||||
expected_ec=1, expected_out=expected_results) |
||||
|
||||
def test_clamd_05_check_clamd(self): |
||||
''' |
||||
Uses the check_clamd program to test clamd's socket API in various ways |
||||
that aren't possible with clamdscan. |
||||
''' |
||||
self.step_name('Testing clamd + check_clamd') |
||||
|
||||
self.start_clamd() |
||||
|
||||
poll = self.proc.poll() |
||||
assert poll == None # subprocess is alive if poll() returns None |
||||
|
||||
# Let's first use the ping-pong test to make sure clamd is listening. |
||||
output = self.execute_command(f'{TC.clamdscan} -p 5 -c {TC.clamd_config}') |
||||
assert output.ec == 0 # success |
||||
self.verify_output(output.out, expected=['PONG']) |
||||
|
||||
# Ok now run check_clamd to have fun with clamd's API |
||||
output = self.execute_command(f'{TC.check_clamd}') |
||||
assert output.ec == 0 # success |
||||
|
||||
expected_results = [ |
||||
'100%', 'Failures: 0', 'Errors: 0' |
||||
] |
||||
self.verify_output(output.out, expected=expected_results) |
||||
|
||||
# Let's do another ping-pong test to see if `check_clamd` killed clamd (Mu-ha-ha). |
||||
output = self.execute_command(f'{TC.clamdscan} -p 5 -c {TC.clamd_config}') |
||||
assert output.ec == 0 # success |
||||
self.verify_output(output.out, expected=['PONG']) |
||||
|
||||
def test_clamd_06_HeuristicScanPrecedence_off(self): |
||||
''' |
||||
Verify that HeuristicScanPrecedence off works as expected (default) |
||||
In a later test, we'll add `HeuristicScanPrecedence yes` to the config |
||||
and retest with it on. |
||||
|
||||
With it off, we expect the scan to complete and the "real" virus to alert |
||||
rather than the heuristic. |
||||
''' |
||||
self.step_name('Testing clamd + clamdscan w/ HeuristicScanPrecedence no (default)') |
||||
|
||||
self.start_clamd() |
||||
|
||||
poll = self.proc.poll() |
||||
assert poll == None # subprocess is alive if poll() returns None |
||||
|
||||
self.run_clamdscan(f'{TC.path_build / "unit_tests" / "clam-phish-exe"}', |
||||
expected_ec=1, expected_out=['ClamAV-Test-File']) |
||||
|
||||
def test_clamd_07_HeuristicScanPrecedence_on(self): |
||||
''' |
||||
Verify that HeuristicScanPrecedence on works as expected. |
||||
|
||||
With it on, we expect the scan to stop and raise an alert as soon as |
||||
the phishing heuristic is detected. |
||||
''' |
||||
self.step_name('Testing clamd + clamdscan w/ HeuristicScanPrecedence yes') |
||||
|
||||
with TC.clamd_config.open('a') as config: |
||||
config.write(''' |
||||
HeuristicScanPrecedence yes |
||||
''') |
||||
|
||||
self.start_clamd() |
||||
|
||||
poll = self.proc.poll() |
||||
assert poll == None # subprocess is alive if poll() returns None |
||||
|
||||
self.run_clamdscan(f'{TC.path_build / "unit_tests" / "clam-phish-exe"}', |
||||
expected_ec=1, expected_out=['Heuristics.Phishing.Email.SpoofedDomain']) |
||||
|
||||
@unittest.skipIf(operating_system == 'windows', 'This test uses a shell script to test virus-action. TODO: add Windows support to this test.') |
||||
def test_clamd_08_VirusEvent(self): |
||||
''' |
||||
Test that VirusEvent works |
||||
''' |
||||
self.step_name('Testing clamd + clamdscan w/ VirusEvent') |
||||
|
||||
with TC.clamd_config.open('a') as config: |
||||
config.write(f'VirusEvent {TC.path_source / "unit_tests" / "virusaction-test.sh"} {TC.path_tmp} "Virus found: %v"\n') |
||||
|
||||
self.start_clamd() |
||||
|
||||
poll = self.proc.poll() |
||||
assert poll == None # subprocess is alive if poll() returns None |
||||
|
||||
self.run_clamdscan_file_only(f'{TC.path_build / "test" / "clam.exe"}', |
||||
expected_ec=1)#, expected_out=['Virus found: ClamAV-Test-File.UNOFFICIAL']) |
||||
|
||||
self.log.info(f'verifying log output from virusaction-test.sh: {str(TC.path_tmp / "test-clamd.log")}') |
||||
self.verify_log(str(TC.path_tmp / 'test-clamd.log'), |
||||
expected=['Virus found: ClamAV-Test-File.UNOFFICIAL'], |
||||
unexpected=['VirusEvent incorrect', 'VirusName incorrect']) |
||||
@ -0,0 +1,220 @@ |
||||
# Copyright (C) 2020 Cisco Systems, Inc. and/or its affiliates. All rights reserved. |
||||
|
||||
""" |
||||
Run clamscan tests. |
||||
""" |
||||
|
||||
import os |
||||
from pathlib import Path |
||||
import platform |
||||
import shutil |
||||
import subprocess |
||||
import sys |
||||
import time |
||||
import unittest |
||||
|
||||
import testcase |
||||
|
||||
|
||||
os_platform = platform.platform() |
||||
operating_system = os_platform.split('-')[0].lower() |
||||
|
||||
|
||||
class TC(testcase.TestCase): |
||||
@classmethod |
||||
def setUpClass(cls): |
||||
super(TC, cls).setUpClass() |
||||
|
||||
TC.testpaths = list(TC.path_build.glob('test/clam*')) # A list of Path()'s of each of our generated test files |
||||
|
||||
# Prepare a directory to store our test databases |
||||
TC.path_db = TC.path_tmp / 'database' |
||||
TC.path_db.mkdir(parents=True) |
||||
|
||||
shutil.copy( |
||||
str(TC.path_build / 'unit_tests' / 'clamav.hdb'), |
||||
str(TC.path_db), |
||||
) |
||||
|
||||
(TC.path_db / 'clamav.ign2').write_text('ClamAV-Test-File\n') |
||||
|
||||
(TC.path_db / 'phish.pdb').write_text('H:example.com\n') |
||||
|
||||
(TC.path_db / 'icon.idb').write_text( |
||||
"EA0X-32x32x8:ea0x-grp1:ea0x-grp2:2046f030a42a07153f4120a0031600007000005e1617ef0000d21100cb090674150f880313970b0e7716116d01136216022500002f0a173700081a004a0e\n" |
||||
"IScab-16x16x8:iscab-grp1:iscab-grp2:107b3000168306015c20a0105b07060be0a0b11c050bea0706cb0a0bbb060b6f00017c06018301068109086b03046705081b000a270a002a000039002b17\n" |
||||
) |
||||
(TC.path_db / 'icon.ldb').write_text( |
||||
"ClamAV-Test-Icon-EA0X;Engine:52-1000,Target:1,IconGroup1:ea0x-grp1,IconGroup2:*;(0);0:4d5a\n" |
||||
"ClamAV-Test-Icon-IScab;Engine:52-1000,Target:1,IconGroup2:iscab-grp2;(0);0:4d5a\n" |
||||
) |
||||
(TC.path_db / 'Clam-VI.ldb').write_text( |
||||
"Clam-VI-Test:Target;Engine:52-255,Target:1;(0&1);VI:43006f006d00700061006e0079004e0061006d0065000000000063006f006d00700061006e007900;VI:500072006f0064007500630074004e0061006d0065000000000063006c0061006d00\n" |
||||
) |
||||
(TC.path_db / 'yara-at-offset.yara').write_text( |
||||
"rule yara_at_offset {strings: $tar_magic = { 75 73 74 61 72 } condition: $tar_magic at 257}\n" |
||||
) |
||||
(TC.path_db / 'yara-in-range.yara').write_text( |
||||
"rule yara_in_range {strings: $tar_magic = { 75 73 74 61 72 } condition: $tar_magic in (200..300)}\n" |
||||
) |
||||
|
||||
@classmethod |
||||
def tearDownClass(cls): |
||||
super(TC, cls).tearDownClass() |
||||
|
||||
def setUp(self): |
||||
super(TC, self).setUp() |
||||
|
||||
def tearDown(self): |
||||
super(TC, self).tearDown() |
||||
self.verify_valgrind_log() |
||||
|
||||
def test_clamscan_00_version(self): |
||||
self.step_name('clamscan version test') |
||||
|
||||
command = f'{TC.valgrind} {TC.valgrind_args} {TC.clamscan} -V' |
||||
output = self.execute_command(command) |
||||
|
||||
assert output.ec == 0 # success |
||||
|
||||
expected_results = [ |
||||
f'ClamAV {TC.version}', |
||||
] |
||||
self.verify_output(output.out, expected=expected_results) |
||||
|
||||
def test_clamscan_01_all_testfiles(self): |
||||
self.step_name('Test that clamscan alerts on all test files') |
||||
|
||||
testfiles = ' '.join([str(testpath) for testpath in TC.testpaths]) |
||||
command = f'{TC.valgrind} {TC.valgrind_args} {TC.clamscan} -d {TC.path_db / "clamav.hdb"} {testfiles}' |
||||
output = self.execute_command(command) |
||||
|
||||
assert output.ec == 1 # virus found |
||||
|
||||
expected_results = [f'{testpath.name}: ClamAV-Test-File.UNOFFICIAL FOUND' for testpath in TC.testpaths] |
||||
expected_results.append(f'Scanned files: {len(TC.testpaths)}') |
||||
expected_results.append(f'Infected files: {len(TC.testpaths)}') |
||||
self.verify_output(output.out, expected=expected_results) |
||||
|
||||
def test_clamscan_02_all_testfiles_ign2(self): |
||||
self.step_name('Test that clamscan ignores ClamAV-Test-File alerts') |
||||
|
||||
testfiles = ' '.join([str(testpath) for testpath in TC.testpaths]) |
||||
command = f'{TC.valgrind} {TC.valgrind_args} {TC.clamscan} -d {TC.path_db / "clamav.hdb"} -d {TC.path_db / "clamav.ign2"} {testfiles}' |
||||
output = self.execute_command(command) |
||||
|
||||
assert output.ec == 1 # virus found |
||||
|
||||
expected_results = [f'{testpath.name}: ClamAV-Test-File.UNOFFICIAL FOUND' for testpath in TC.testpaths] |
||||
expected_results.append(f'Scanned files: {len(TC.testpaths)}') |
||||
expected_results.append(f'Infected files: {len(TC.testpaths)}') |
||||
self.verify_output(output.out, expected=expected_results) |
||||
|
||||
def test_clamscan_03_phish_test_not_enabled(self): |
||||
self.step_name('Test that clamscan will load the phishing sigs w/out issue') |
||||
|
||||
testpaths = list(TC.path_source.glob('unit_tests/input/phish-test-*')) |
||||
|
||||
testfiles = ' '.join([str(testpath) for testpath in testpaths]) |
||||
command = f'{TC.valgrind} {TC.valgrind_args} {TC.clamscan} -d {TC.path_db / "phish.pdb"} {testfiles}' |
||||
output = self.execute_command(command) |
||||
|
||||
assert output.ec == 0 # virus NOT found |
||||
|
||||
expected_results = [ |
||||
'Scanned files: 3', |
||||
'Infected files: 0', |
||||
] |
||||
self.verify_output(output.out, expected=expected_results) |
||||
|
||||
def test_clamscan_04_phish_test_alert_phishing_ssl_alert_phishing_cloak(self): |
||||
self.step_name('Test clamscan --alert-phishing-ssl --alert-phishing-cloak') |
||||
|
||||
testpaths = list(TC.path_source.glob('unit_tests/input/phish-test-*')) |
||||
|
||||
testfiles = ' '.join([str(testpath) for testpath in testpaths]) |
||||
command = f'{TC.valgrind} {TC.valgrind_args} {TC.clamscan} -d {TC.path_db / "phish.pdb"} --alert-phishing-ssl --alert-phishing-cloak {testfiles}' |
||||
output = self.execute_command(command) |
||||
|
||||
assert output.ec == 1 # virus found |
||||
|
||||
expected_results = [ |
||||
'phish-test-ssl: Heuristics.Phishing.Email.SSL-Spoof FOUND', |
||||
'phish-test-cloak: Heuristics.Phishing.Email.Cloaked.Null FOUND', |
||||
'Scanned files: 3', |
||||
'Infected files: 2', # there's a clean one |
||||
] |
||||
self.verify_output(output.out, expected=expected_results) |
||||
|
||||
def test_clamscan_05_icon(self): |
||||
self.step_name('Test icon (.ldb + .idb) signatures') |
||||
|
||||
testfiles = ' '.join([str(testpath) for testpath in TC.testpaths]) |
||||
command = f'{TC.valgrind} {TC.valgrind_args} {TC.clamscan} -d {TC.path_db / "icon.ldb"} -d {TC.path_db / "icon.idb"} {testfiles}' |
||||
output = self.execute_command(command) |
||||
|
||||
assert output.ec == 1 # virus found |
||||
|
||||
# Use check_fpu_endian to determine expected results |
||||
command = f'{TC.check_fpu_endian}' |
||||
fpu_endian_output = self.execute_command(command) |
||||
|
||||
expected_results = [ |
||||
'clam_IScab_ext.exe: ClamAV-Test-Icon-IScab.UNOFFICIAL FOUND', |
||||
'clam_IScab_int.exe: ClamAV-Test-Icon-IScab.UNOFFICIAL FOUND', |
||||
] |
||||
if fpu_endian_output.ec == 3: |
||||
expected_num_infected = 3 |
||||
else: |
||||
expected_results.append('clam.ea06.exe: ClamAV-Test-Icon-EA0X.UNOFFICIAL FOUND') |
||||
expected_num_infected = 4 |
||||
expected_results.append(f'Infected files: {expected_num_infected}') |
||||
self.verify_output(output.out, expected=expected_results) |
||||
|
||||
def test_clamscan_06_LDB_VI(self): |
||||
self.step_name('Test LDB VI feature') |
||||
|
||||
testfiles = ' '.join([str(testpath) for testpath in TC.testpaths]) |
||||
command = f'{TC.valgrind} {TC.valgrind_args} {TC.clamscan} -d {TC.path_db / "Clam-VI.ldb"} {testfiles}' |
||||
output = self.execute_command(command) |
||||
|
||||
assert output.ec == 1 # virus found |
||||
|
||||
expected_results = [ |
||||
'clam_ISmsi_ext.exe: Clam-VI-Test:Target.UNOFFICIAL FOUND', |
||||
'clam_ISmsi_int.exe: Clam-VI-Test:Target.UNOFFICIAL FOUND', |
||||
'Infected files: 2', |
||||
] |
||||
self.verify_output(output.out, expected=expected_results) |
||||
|
||||
def test_clamscan_07_yara_at_offset(self): |
||||
self.step_name('Test yara signature - detect TAR file magic at an offset') |
||||
|
||||
testfiles = ' '.join([str(testpath) for testpath in TC.testpaths]) |
||||
command = f'{TC.valgrind} {TC.valgrind_args} {TC.clamscan} -d {TC.path_db / "yara-at-offset.yara"} {testfiles}' |
||||
output = self.execute_command(command) |
||||
|
||||
assert output.ec == 1 # virus found |
||||
|
||||
expected_results = [ |
||||
'clam.tar.gz: YARA.yara_at_offset.UNOFFICIAL FOUND', |
||||
'clam_cache_emax.tgz: YARA.yara_at_offset.UNOFFICIAL FOUND', |
||||
'Infected files: 2', |
||||
] |
||||
self.verify_output(output.out, expected=expected_results) |
||||
|
||||
def test_clamscan_08_yara_in_range(self): |
||||
self.step_name('Test yara signature - detect TAR file magic in a range') |
||||
|
||||
testfiles = ' '.join([str(testpath) for testpath in TC.testpaths]) |
||||
command = f'{TC.valgrind} {TC.valgrind_args} {TC.clamscan} -d {TC.path_db / "yara-in-range.yara"} {testfiles}' |
||||
output = self.execute_command(command) |
||||
|
||||
assert output.ec == 1 # virus found |
||||
|
||||
expected_results = [ |
||||
'clam.tar.gz: YARA.yara_in_range.UNOFFICIAL FOUND', |
||||
'clam_cache_emax.tgz: YARA.yara_in_range.UNOFFICIAL FOUND', |
||||
'Infected files: 2', |
||||
] |
||||
self.verify_output(output.out, expected=expected_results) |
||||
@ -0,0 +1,89 @@ |
||||
# Copyright (C) 2020 Cisco Systems, Inc. and/or its affiliates. All rights reserved. |
||||
|
||||
""" |
||||
Run freshclam tests |
||||
""" |
||||
|
||||
import os |
||||
from pathlib import Path |
||||
import platform |
||||
import shutil |
||||
import subprocess |
||||
import sys |
||||
import time |
||||
import unittest |
||||
|
||||
import testcase |
||||
|
||||
|
||||
os_platform = platform.platform() |
||||
operating_system = os_platform.split('-')[0].lower() |
||||
|
||||
|
||||
class TC(testcase.TestCase): |
||||
@classmethod |
||||
def setUpClass(cls): |
||||
super(TC, cls).setUpClass() |
||||
|
||||
# Prepare a directory to host our test databases |
||||
TC.path_www = Path(TC.path_tmp, 'www') |
||||
TC.path_www.mkdir() |
||||
shutil.copy( |
||||
str(TC.path_build / 'unit_tests' / 'clamav.hdb'), |
||||
str(TC.path_www), |
||||
) |
||||
|
||||
TC.path_db = Path(TC.path_tmp, 'database') |
||||
TC.freshclam_pid = Path(TC.path_tmp, 'freshclam-test.pid') |
||||
TC.freshclam_config = Path(TC.path_tmp, 'freshclam-test.conf') |
||||
TC.freshclam_config.write_text(f''' |
||||
DatabaseMirror 127.0.0.1 |
||||
PidFile {TC.freshclam_pid} |
||||
LogVerbose yes |
||||
LogFileMaxSize 0 |
||||
LogTime yes |
||||
DatabaseDirectory {TC.path_db} |
||||
DatabaseCustomURL file://{TC.path_www / "clamav.hdb"} |
||||
ExcludeDatabase daily |
||||
ExcludeDatabase main |
||||
ExcludeDatabase bytecode |
||||
''') |
||||
|
||||
@classmethod |
||||
def tearDownClass(cls): |
||||
super(TC, cls).tearDownClass() |
||||
|
||||
def setUp(self): |
||||
super(TC, self).setUp() |
||||
|
||||
def tearDown(self): |
||||
super(TC, self).tearDown() |
||||
self.verify_valgrind_log() |
||||
|
||||
def test_freshclam_00_version(self): |
||||
self.step_name('freshclam version test') |
||||
|
||||
command = f'{TC.valgrind} {TC.valgrind_args} {TC.freshclam} --config-file={TC.freshclam_config} -V' |
||||
output = self.execute_command(command) |
||||
|
||||
assert output.ec == 0 # success |
||||
|
||||
expected_results = [ |
||||
f'ClamAV {TC.version}', |
||||
] |
||||
self.verify_output(output.out, expected=expected_results) |
||||
|
||||
def test_freshclam_01_file_copy(self): |
||||
self.step_name('Basic freshclam test using file:// to "download" clamav.hdb') |
||||
|
||||
command = f'{TC.valgrind} {TC.valgrind_args} {TC.freshclam} --config-file={TC.freshclam_config}' |
||||
output = self.execute_command(command) |
||||
|
||||
assert output.ec == 0 # success |
||||
|
||||
expected_results = [ |
||||
f'Downloading clamav.hdb', |
||||
f'Database test passed.', |
||||
f'clamav.hdb updated', |
||||
] |
||||
self.verify_output(output.out, expected=expected_results) |
||||
@ -0,0 +1,50 @@ |
||||
# Copyright (C) 2020 Cisco Systems, Inc. and/or its affiliates. All rights reserved. |
||||
|
||||
""" |
||||
Run libclamav unit tests |
||||
""" |
||||
|
||||
import os |
||||
from pathlib import Path |
||||
import platform |
||||
import subprocess |
||||
import sys |
||||
import time |
||||
import unittest |
||||
|
||||
import testcase |
||||
|
||||
|
||||
os_platform = platform.platform() |
||||
operating_system = os_platform.split('-')[0].lower() |
||||
|
||||
|
||||
class TC(testcase.TestCase): |
||||
@classmethod |
||||
def setUpClass(cls): |
||||
super(TC, cls).setUpClass() |
||||
|
||||
@classmethod |
||||
def tearDownClass(cls): |
||||
super(TC, cls).tearDownClass() |
||||
|
||||
def setUp(self): |
||||
super(TC, self).setUp() |
||||
|
||||
def tearDown(self): |
||||
super(TC, self).tearDown() |
||||
self.verify_valgrind_log() |
||||
|
||||
def test_libclamav_00_unit_test(self): |
||||
self.step_name('libclamav unit tests') |
||||
|
||||
# If no valgrind, valgrind nad valgrind args are empty strings |
||||
command = f'{TC.valgrind} {TC.valgrind_args} {TC.check_clamav}' |
||||
output = self.execute_command(command) |
||||
|
||||
assert output.ec == 0 # success |
||||
|
||||
expected_results = [ |
||||
'100%', 'Failures: 0', 'Errors: 0' |
||||
] |
||||
self.verify_output(output.out, expected=expected_results) |
||||
@ -0,0 +1,76 @@ |
||||
# Copyright (C) 2020 Cisco Systems, Inc. and/or its affiliates. All rights reserved. |
||||
|
||||
""" |
||||
Run sigtool tests. |
||||
""" |
||||
|
||||
import os |
||||
from pathlib import Path |
||||
import platform |
||||
import shutil |
||||
import subprocess |
||||
import sys |
||||
import time |
||||
import unittest |
||||
|
||||
import testcase |
||||
|
||||
|
||||
os_platform = platform.platform() |
||||
operating_system = os_platform.split('-')[0].lower() |
||||
|
||||
|
||||
class TC(testcase.TestCase): |
||||
@classmethod |
||||
def setUpClass(cls): |
||||
super(TC, cls).setUpClass() |
||||
|
||||
# Prepare a directory to host our test databases |
||||
TC.path_www = TC.path_tmp / 'www' |
||||
TC.path_www.mkdir() |
||||
shutil.copy( |
||||
str(TC.path_build / 'unit_tests' / 'clamav.hdb'), |
||||
str(TC.path_www), |
||||
) |
||||
|
||||
TC.path_db = TC.path_tmp / 'database' |
||||
TC.sigtool_pid = TC.path_tmp / 'sigtool-test.pid' |
||||
TC.sigtool_config = TC.path_tmp / 'sigtool-test.conf' |
||||
TC.sigtool_config.write_text(f''' |
||||
DatabaseMirror 127.0.0.1 |
||||
PidFile {TC.sigtool_pid} |
||||
LogVerbose yes |
||||
LogFileMaxSize 0 |
||||
LogTime yes |
||||
DatabaseDirectory {TC.path_db} |
||||
DatabaseCustomURL file://{TC.path_www}/clamav.hdb |
||||
ExcludeDatabase daily |
||||
ExcludeDatabase main |
||||
ExcludeDatabase bytecode |
||||
''') |
||||
|
||||
@classmethod |
||||
def tearDownClass(cls): |
||||
super(TC, cls).tearDownClass() |
||||
|
||||
def setUp(self): |
||||
super(TC, self).setUp() |
||||
|
||||
def tearDown(self): |
||||
super(TC, self).tearDown() |
||||
self.verify_valgrind_log() |
||||
|
||||
def test_sigtool_00_version(self): |
||||
self.step_name('sigtool version test') |
||||
|
||||
self.log.warning(f'VG: {os.getenv("VG")}') |
||||
command = f'{TC.valgrind} {TC.valgrind_args} {TC.sigtool} -V' |
||||
output = self.execute_command(command) |
||||
|
||||
assert output.ec == 0 # success |
||||
|
||||
expected_results = [ |
||||
f'ClamAV {TC.version}', |
||||
] |
||||
self.verify_output(output.out, expected=expected_results) |
||||
|
||||
@ -0,0 +1,918 @@ |
||||
# Copyright (C) 2017-2020 Cisco Systems, Inc. and/or its affiliates. All rights reserved. |
||||
|
||||
""" |
||||
Wrapper for unittest to provide ClamAV specific test environment features. |
||||
""" |
||||
|
||||
from collections import namedtuple |
||||
import hashlib |
||||
import logging |
||||
import os |
||||
import platform |
||||
import re |
||||
import shutil |
||||
import signal |
||||
import subprocess |
||||
import sys |
||||
import tempfile |
||||
import threading |
||||
from typing import Union |
||||
import unittest |
||||
|
||||
from pathlib import Path |
||||
|
||||
EXECUTION_TIMEOUT = 200 |
||||
TIMEOUT_EXIT_CODE = 111 |
||||
|
||||
STRICT_ORDER = 0 |
||||
ANY_ORDER = 1 |
||||
CHUNK_SIZE = 100 |
||||
|
||||
loggers = {} |
||||
|
||||
|
||||
class TestCase(unittest.TestCase): |
||||
""" |
||||
This wrapper around unittest.TestCase provides added utilities and environment information. |
||||
""" |
||||
|
||||
version = "" |
||||
|
||||
path_source = None |
||||
path_build = None |
||||
path_tmp = None |
||||
|
||||
check_clamav = None |
||||
check_clamd = None |
||||
check_fpu_endian = None |
||||
milter = None |
||||
clambc = None |
||||
clamd = None |
||||
clamdscan = None |
||||
clamdtop = None |
||||
clamscan = None |
||||
clamsubmit = None |
||||
clamconf = None |
||||
clamonacc = None |
||||
freshclam = None |
||||
sigtool = None |
||||
|
||||
path_sample_config = None |
||||
|
||||
valgrind = "" # Not 'None' because we'll use this variable even if valgrind not found. |
||||
valgrind_args = "" |
||||
log_suffix = '.log' |
||||
|
||||
@classmethod |
||||
def setUpClass(cls): |
||||
""" |
||||
Initialize, to provide logging and test paths. |
||||
|
||||
Also initializes internal Executor and LogChecker required |
||||
for execute_command(), and verify_log() |
||||
""" |
||||
global loggers |
||||
|
||||
if loggers.get(cls.__name__): |
||||
cls.log = loggers.get(cls.__name__) |
||||
else: |
||||
cls.log = Logger(cls.__name__) |
||||
loggers[cls.__name__] = cls.log |
||||
|
||||
cls._executor = Executor() |
||||
cls._log_checker = LogChecker() |
||||
|
||||
os_platform = platform.platform() |
||||
cls.operating_system = os_platform.split("-")[0].lower() |
||||
|
||||
# Get test paths from environment variables. |
||||
cls.version = os.getenv("VERSION") |
||||
if cls.version == None: |
||||
raise Exception("VERSION environment variable not defined! Aborting...") |
||||
|
||||
cls.path_source = Path(os.getenv("SOURCE")) |
||||
cls.path_build = Path(os.getenv("BUILD")) |
||||
cls.path_tmp = Path(tempfile.mkdtemp(prefix=(cls.__name__ + "-"), dir=os.getenv("TMP"))) |
||||
cls.check_clamav = Path(os.getenv("CHECK_CLAMAV")) if os.getenv("CHECK_CLAMAV") != None else None |
||||
cls.check_clamd = Path(os.getenv("CHECK_CLAMD")) if os.getenv("CHECK_CLAMD") != None else None |
||||
cls.check_fpu_endian = Path(os.getenv("CHECK_FPU_ENDIAN")) if os.getenv("CHECK_FPU_ENDIAN") != None else None |
||||
cls.milter = Path(os.getenv("CLAMAV_MILTER")) if os.getenv("CLAMAV_MILTER") != None else None |
||||
cls.clambc = Path(os.getenv("CLAMBC")) if os.getenv("CLAMBC") != None else None |
||||
cls.clamd = Path(os.getenv("CLAMD")) if os.getenv("CLAMD") != None else None |
||||
cls.clamdscan = Path(os.getenv("CLAMDSCAN")) if os.getenv("CLAMDSCAN") != None else None |
||||
cls.clamdtop = Path(os.getenv("CLAMDTOP")) if os.getenv("CLAMDTOP") != None else None |
||||
cls.clamscan = Path(os.getenv("CLAMSCAN")) if os.getenv("CLAMSCAN") != None else None |
||||
cls.clamsubmit = Path(os.getenv("CLAMSUBMIT")) if os.getenv("CLAMSUBMIT") != None else None |
||||
cls.clamconf = Path(os.getenv("CLAMCONF")) if os.getenv("CLAMCONF") != None else None |
||||
cls.clamonacc = Path(os.getenv("CLAMONACC")) if os.getenv("CLAMONACC") != None else None |
||||
cls.freshclam = Path(os.getenv("FRESHCLAM")) if os.getenv("FRESHCLAM") != None else None |
||||
cls.sigtool = Path(os.getenv("SIGTOOL")) if os.getenv("SIGTOOL") != None else None |
||||
|
||||
if cls.operating_system == "windows": |
||||
cls.path_sample_config = cls.path_source / "win32" / "conf_examples" |
||||
else: |
||||
cls.path_sample_config = cls.path_source / "etc" |
||||
|
||||
# Check if Valgrind testing is requested |
||||
if os.getenv('VALGRIND') != None: |
||||
cls.log_suffix = '.valgrind.log' |
||||
cls.valgrind = Path(os.getenv("VALGRIND")) |
||||
cls.valgrind_args = f'-v --trace-children=yes --track-fds=yes --leak-check=full ' \ |
||||
f'--suppressions={cls.path_source / "unit_tests" / "valgrind.supp"} ' \ |
||||
f'--log-file={cls.path_tmp / "valgrind.log"} ' \ |
||||
f'--error-exitcode=123' |
||||
|
||||
# cls.log.info(f"{cls.__name__} Environment:") |
||||
# cls.log.info(f" version: {cls.version}") |
||||
# cls.log.info(f" path_source: {cls.path_source}") |
||||
# cls.log.info(f" path_build: {cls.path_build}") |
||||
# cls.log.info(f" path_tmp: {cls.path_tmp}") |
||||
# cls.log.info(f" check_clamav: {cls.check_clamav}") |
||||
# cls.log.info(f" check_clamd: {cls.check_clamd}") |
||||
# cls.log.info(f" check_fpu_endian: {cls.check_fpu_endian}") |
||||
# cls.log.info(f" milter: {cls.milter}") |
||||
# cls.log.info(f" clambc: {cls.clambc}") |
||||
# cls.log.info(f" clamd: {cls.clamd}") |
||||
# cls.log.info(f" clamdscan: {cls.clamdscan}") |
||||
# cls.log.info(f" clamdtop: {cls.clamdtop}") |
||||
# cls.log.info(f" clamscan: {cls.clamscan}") |
||||
# cls.log.info(f" clamsubmit: {cls.clamsubmit}") |
||||
# cls.log.info(f" clamconf: {cls.clamconf}") |
||||
# cls.log.info(f" clamonacc: {cls.clamonacc}") |
||||
# cls.log.info(f" freshclam: {cls.freshclam}") |
||||
# cls.log.info(f" sigtool: {cls.sigtool}") |
||||
# cls.log.info(f" valgrind: {cls.valgrind}") |
||||
|
||||
@classmethod |
||||
def tearDownClass(cls): |
||||
""" |
||||
Clean up after ourselves, |
||||
Delete the generated tmp directory. |
||||
""" |
||||
print("") |
||||
|
||||
try: |
||||
shutil.rmtree(cls.path_tmp) |
||||
cls.log.info("Removed tmp directory: {}".format(cls.path_tmp)) |
||||
except Exception: |
||||
cls.log.info("No tmp directory to clean up.") |
||||
|
||||
def setUp(self): |
||||
print("") |
||||
|
||||
log_path = Path(self.path_build / 'unit_tests' / f'{self._testMethodName}{self.log_suffix}') |
||||
log_path.unlink(missing_ok=True) |
||||
self.log = Logger(self._testMethodName, log_file=str(log_path)) |
||||
|
||||
def tearDown(self): |
||||
print("") |
||||
|
||||
def step_name(self, name): |
||||
"""Log name of a step. |
||||
|
||||
:Parameters: |
||||
- `name`: a string with name of the step to print. |
||||
""" |
||||
self.log.info("~" * 72) |
||||
self.log.info(name.center(72, " ")) |
||||
self.log.info("~" * 72) |
||||
|
||||
def execute(self, cmd, cwd=None, **kwargs): |
||||
"""Execute command. |
||||
|
||||
This method composes shell command from passed args and executes it. |
||||
Command template: '[sudo] cmd [options] data' |
||||
Example: |
||||
cmd='cp', data='source_file dest_file', options=['r','f'], |
||||
sudo=True |
||||
Composed result: 'sudo cp -rf source_file dest_file'. |
||||
|
||||
:Parameters: |
||||
- `cmd`: a string with a shell command to execute. |
||||
- `cwd`: a string with a current working directory to set. |
||||
|
||||
:Keywords: |
||||
- `data`: args for `cmd`(e.g. filename, dirname,). |
||||
- `options`: options for the shell command. |
||||
- `sudo`: use `sudo`? Default value is False. |
||||
- `timeout`: execution timeout in seconds. |
||||
- `env_vars`: a dictionary with custom environment variables. |
||||
- `interact`: a string to enter to the command stdin during |
||||
execution. |
||||
|
||||
:Return: |
||||
- namedtuple(ec, out, err). |
||||
|
||||
:Exceptions: |
||||
- `AssertionError`: is raised if `options` is not a list. |
||||
""" |
||||
executor = Executor(logger=self.log) |
||||
return executor.execute(cmd, cwd=cwd, kwargs=kwargs) |
||||
|
||||
def verify_output(self, text, expected=[], unexpected=[], order=ANY_ORDER): |
||||
"""Method verifies text. Check for expected or unexpected results. |
||||
|
||||
:Parameters: |
||||
- `text`: text to verify. |
||||
- `expected`: (iterable) expected items to be found. |
||||
- `unexpected`: (iterable) unexpected items to be found. |
||||
- `order`: expected appearance order. Default: any order. |
||||
""" |
||||
log_checker = LogChecker(self.log) |
||||
|
||||
if unexpected: |
||||
log_checker.verify_unexpected_output(unexpected, text) |
||||
if expected: |
||||
log_checker.verify_expected_output(expected, text, order=order) |
||||
|
||||
def verify_log( |
||||
self, log_file, expected=[], unexpected=[], ignored=[], order=ANY_ORDER |
||||
): |
||||
"""Method verifies log file. Check for expected or unexpected results. |
||||
|
||||
:Parameters: |
||||
- `log_file`: path to log file. |
||||
- `expected`: (iterable) expected items to be found. |
||||
- `unexpected`: (iterable) unexpected items to be found. |
||||
- `ignored`: (iterable) unexpected items which should be ignored. |
||||
- `order`: expected appearance order. Default: any order. |
||||
""" |
||||
log_checker = LogChecker(self.log) |
||||
|
||||
if unexpected: |
||||
log_checker.verify_unexpected_log(log_file, unexpected=unexpected, ignored=ignored) |
||||
if expected: |
||||
log_checker.verify_expected_log(log_file, expected=expected, order=order) |
||||
|
||||
def verify_valgrind_log(self, log_file: Union[Path, None]=None): |
||||
"""Method verifies a valgrind log file. |
||||
|
||||
If valgrind not enabled this is basically a nop. |
||||
|
||||
:Parameters: |
||||
- `log_file`: path to log file. |
||||
""" |
||||
if self.valgrind == "": |
||||
return |
||||
|
||||
if log_file == None: |
||||
log_file = self.path_tmp / 'valgrind.log' |
||||
|
||||
if not log_file.exists(): |
||||
raise AssertionError(f'{log_file} not found. Valgrind failed to run?') |
||||
|
||||
errors = False |
||||
self.log.info(f'Verifying {log_file}...') |
||||
try: |
||||
self.verify_log( |
||||
str(log_file), |
||||
expected=['ERROR SUMMARY: 0 errors'], |
||||
unexpected=[], |
||||
ignored=[] |
||||
) |
||||
except AssertionError: |
||||
self.log.warning("*" * 69) |
||||
self.log.warning(f'Valgrind test failed!'.center(69, ' ')) |
||||
self.log.warning(f'Please submit this log to https://bugzilla.clamav.net:'.center(69, ' ')) |
||||
self.log.warning(f'{log_file}'.center(69, ' ')) |
||||
self.log.warning("*" * 69) |
||||
errors = True |
||||
finally: |
||||
with log_file.open('r') as log: |
||||
found_summary = False |
||||
for line in log.readlines(): |
||||
if 'ERROR SUMMARY' in line: |
||||
found_summary = True |
||||
if (found_summary or errors) and len(line) < 500: |
||||
self.log.info(line.rstrip('\n')) |
||||
if errors: |
||||
raise AssertionError('Valgrind test FAILED!') |
||||
|
||||
def verify_cmd_result( |
||||
self, |
||||
result, |
||||
exit_code=0, |
||||
stderr_expected=[], |
||||
stderr_unexpected=[], |
||||
stdout_expected=[], |
||||
stdout_unexpected=[], |
||||
order=ANY_ORDER, |
||||
): |
||||
"""Check command result for expected/unexpected stdout/stderr. |
||||
|
||||
:Parameters: |
||||
- `result`: tuple(ec, out, err). |
||||
- `exit_code`: expected exit code value. |
||||
- `stderr_expected`: (iterable) expected items in stderr. |
||||
- `stderr_unexpected`: (iterable) unexpected items in stderr. |
||||
- `stdout_expected`: (iterable) expected items in stdout. |
||||
- `stdout_unexpected`: (iterable) unexpected items in stdout. |
||||
- `order`: expected appearance order. Default: any order. |
||||
|
||||
:Exceptions: |
||||
- `AssertionError`: is raised if: |
||||
1) format of `result` is wrong. |
||||
2) actual exit code value doesn't match expected. |
||||
""" |
||||
try: |
||||
ec, stdout, stderr = result |
||||
except: |
||||
raise AssertionError("Wrong result format: %s" % (result,)) |
||||
|
||||
assert ec == exit_code, ( |
||||
"Code mismatch.\nExpected: %s\nActual: %s\nError: %s" |
||||
% (exit_code, ec, stderr) |
||||
) |
||||
if stderr_expected: |
||||
self.verify_expected_output( |
||||
stderr_expected, stderr, order=order |
||||
) |
||||
if stderr_unexpected: |
||||
self.verify_unexpected_output(stderr_unexpected, stderr) |
||||
|
||||
if stdout_expected: |
||||
self.verify_expected_output( |
||||
stdout_expected, stdout, order=order |
||||
) |
||||
if stdout_unexpected: |
||||
self.verify_unexpected_output(stdout_unexpected, stdout) |
||||
|
||||
def _md5(self, filepath): |
||||
"""Get md5 hash sum of a given file. |
||||
|
||||
:Parameters: |
||||
- `filepath`: path to file. |
||||
|
||||
:Return: |
||||
- hash string |
||||
|
||||
:Exceptions: |
||||
- `AssertionError`: is raised if `filepath` is not a string |
||||
or is empty. |
||||
""" |
||||
assert isinstance(filepath, str), "Invalid filepath: %s." % (filepath,) |
||||
assert os.path.exists(filepath), "file does not exist: %s." % (filepath,) |
||||
|
||||
hash_md5 = hashlib.md5() |
||||
with open(filepath, "rb") as f: |
||||
for chunk in iter(lambda: f.read(4096), b""): |
||||
hash_md5.update(chunk) |
||||
return hash_md5.hexdigest() |
||||
|
||||
def get_md5(self, files): |
||||
"""Get md5 hash sum of every given file. |
||||
|
||||
:Parameters: |
||||
- `files`: a list or a tuple of files. |
||||
|
||||
:Return: |
||||
- dictionary like {file: md5sum}. |
||||
|
||||
:Exceptions: |
||||
- `AssertionError`: is raised if `files` is empty. |
||||
""" |
||||
assert files, "`files` should not be empty." |
||||
files = files if isinstance(files, (list, tuple)) else [files] |
||||
md5_dict = {} |
||||
for path in files: |
||||
if os.path.isfile(path): |
||||
md5_dict[path] = self._md5(path) |
||||
return md5_dict |
||||
|
||||
def _pkill(self, process, options=["-9 -f"], sudo=False): |
||||
"""Wrapper for CLI *nix `pkill` command. |
||||
|
||||
*nix only. |
||||
|
||||
:Parameters: |
||||
- `process`: a string with pattern for process to kill. |
||||
- `options`: options for `pkill` command. |
||||
- `sudo`: use `sudo`? Default value is False. |
||||
|
||||
:Return: |
||||
- namedtuple(ec, out, err). |
||||
|
||||
:Exceptions: |
||||
- `AssertionError`: is raised if `process` is empty or is |
||||
not a string. |
||||
""" |
||||
assert self.operating_system != "windows" |
||||
assert ( |
||||
isinstance(process, str) and process |
||||
), "`process` must be a non-empty string." |
||||
|
||||
result = "" |
||||
error = "" |
||||
code = None |
||||
|
||||
res = self.execute( |
||||
"pkill", data='"%s"' % (process,), options=options, sudo=sudo |
||||
) |
||||
if res.ec != 0: |
||||
self.log.warning("Failed to pkill `%s` process." % (process,)) |
||||
code, error, result = ( |
||||
res.ec if not code or code == 0 else code, |
||||
"\n".join([error, res.err]), |
||||
"\n".join([result, res.out]), |
||||
) |
||||
return namedtuple("CmdResult", ["ec", "out", "err"])(code, result, error) |
||||
|
||||
def _taskkill(self, process, match_all=True): |
||||
"""Stop processes matching the given name. |
||||
|
||||
Windows only. |
||||
|
||||
:Parameters: |
||||
- `processes`: process name. |
||||
- `match_all`: find all processes that match 'process'. |
||||
|
||||
:Return: |
||||
- namedtuple(ec, out, err). |
||||
|
||||
:Exceptions: |
||||
- `AssertionError`: is raised if: |
||||
1) `processes` is not a string or is an empty string. |
||||
""" |
||||
assert self.operating_system == "windows" |
||||
|
||||
wildcard = "*" if match_all else "" |
||||
result = "" |
||||
error = "" |
||||
code = None |
||||
|
||||
res = self.execute('taskkill /F /IM "%s%s"' % (process, wildcard)) |
||||
if res.ec != 0: |
||||
self.log.error("Failed to `stop` process.\nError: %s." % (res.err,)) |
||||
code, error, result = ( |
||||
res.ec if not code or code == 0 else code, |
||||
"\n".join([error, res.err]), |
||||
"\n".join([result, res.out]), |
||||
) |
||||
return namedtuple("CmdResult", ["ec", "out", "err"])(code, result, error) |
||||
|
||||
def stop_process(self, processes, options=["-9 -f"], sudo=False): |
||||
"""Stop all specified processes. |
||||
|
||||
:Parameters: |
||||
- `processes`: string name of a process, or a list or a tuple of processes to stop. |
||||
- `match_all`: find all processes that match 'processes'. |
||||
|
||||
:Return: |
||||
- namedtuple(ec, out, err). |
||||
|
||||
:Exceptions: |
||||
- `AssertionError`: is raised if: |
||||
1) `processes` is not a string or is an empty string. |
||||
""" |
||||
assert processes, "`processes` should not be empty." |
||||
|
||||
processes = processes if isinstance(processes, (list, tuple)) else [processes] |
||||
results = [] |
||||
|
||||
for process in processes: |
||||
if self.operating_system == "windows": |
||||
res = self._taskkill(process, match_all=True) |
||||
else: |
||||
res = self._pkill(process, options, sudo) |
||||
results.append(res) |
||||
|
||||
return results |
||||
|
||||
def execute_command(self, cmd, **kwargs): |
||||
"""Execute custom command. |
||||
|
||||
:Return: |
||||
- namedtuple(ec, out, err). |
||||
""" |
||||
return self.execute(cmd, **kwargs) |
||||
|
||||
|
||||
class Logger(object): |
||||
|
||||
"""Logger class.""" |
||||
|
||||
_format = "[%(levelname)s]: %(message)s" |
||||
|
||||
_level = logging.DEBUG |
||||
|
||||
levels = { |
||||
"debug": logging.DEBUG, |
||||
"info": logging.INFO, |
||||
"warning": logging.WARNING, |
||||
"error": logging.ERROR, |
||||
"critical": logging.CRITICAL, |
||||
} |
||||
|
||||
def __init__(self, name, level="debug", log_file=""): |
||||
"""Initialize Logger instance.""" |
||||
|
||||
self.core = logging.getLogger(name) |
||||
self.core.propagate = False |
||||
|
||||
self.set_level(level) |
||||
|
||||
formatter = logging.Formatter(self._format, "%Y-%m-%d %H:%M:%S") |
||||
try: |
||||
handler = logging.StreamHandler(strm=sys.stdout) |
||||
except TypeError: |
||||
handler = logging.StreamHandler(stream=sys.stdout) |
||||
finally: |
||||
handler.setFormatter(formatter) |
||||
|
||||
self.core.addHandler(handler) |
||||
|
||||
if log_file != "": |
||||
filehandler = logging.FileHandler(filename=log_file) |
||||
filehandler.setLevel(self.levels[level.lower()]) |
||||
filehandler.setFormatter(formatter) |
||||
self.core.addHandler(filehandler) |
||||
|
||||
def set_level(self, level): |
||||
"""Set logging level.""" |
||||
self.core.setLevel(self.levels[level.lower()]) |
||||
|
||||
def __getattr__(self, attr): |
||||
return getattr(self.core, attr) |
||||
|
||||
|
||||
class Executor(object): |
||||
"""Common CLI executor class.""" |
||||
|
||||
def __init__(self, logger=None): |
||||
"""Initialize BaseExecutor instance.""" |
||||
global loggers |
||||
|
||||
if logger != None: |
||||
self._logger = logger |
||||
else: |
||||
if loggers.get(self.__class__.__name__): |
||||
self._logger = loggers.get(self.__class__.__name__) |
||||
else: |
||||
self._logger = Logger(self.__class__.__name__) |
||||
loggers[self.__class__.__name__] = self._logger |
||||
|
||||
self._process = None |
||||
self._process_pid = None |
||||
self.result = None |
||||
self.error = None |
||||
self.code = None |
||||
self.terminated = False |
||||
|
||||
def _log_cmd_results(self): |
||||
"""Log exit code, stdout and stderr of the executed command.""" |
||||
self._logger.debug("Exit code: %s" % self.code) |
||||
self._logger.debug("stdout: %s" % self.result) |
||||
if self.code: |
||||
self._logger.debug("stderr: %s" % self.error) |
||||
|
||||
def _start_cmd_thread(self, target, target_args, timeout=EXECUTION_TIMEOUT): |
||||
"""Start command thread and kill it if timeout exceeds. |
||||
|
||||
:Return: |
||||
- namedtuple(ec, out, err). |
||||
""" |
||||
# Start monitor thread. |
||||
thread = threading.Thread(target=target, args=target_args) |
||||
thread.start() |
||||
thread.join(timeout) |
||||
|
||||
# Kill process if timeout exceeded. |
||||
if thread.is_alive(): |
||||
if platform.system() == "Windows": |
||||
os.kill(self._process_pid, signal.CTRL_C_EVENT) |
||||
else: |
||||
os.killpg(self._process_pid, signal.SIGTERM) |
||||
self.terminated = True |
||||
thread.join() |
||||
|
||||
return namedtuple("CmdResult", ["ec", "out", "err"])( |
||||
self.code, self.result, self.error |
||||
) |
||||
|
||||
def __run(self, cmd, cwd=None, env_vars={}, interact=""): |
||||
"""Execute command in separate thread.""" |
||||
if platform.system() == "Windows": |
||||
self._logger.debug("Run command: %s" % (cmd,)) |
||||
self._process = subprocess.Popen( |
||||
cmd, |
||||
cwd=cwd, |
||||
stdout=subprocess.PIPE, |
||||
stdin=subprocess.PIPE, |
||||
stderr=subprocess.PIPE, |
||||
shell=True, |
||||
) |
||||
|
||||
else: |
||||
sys_env = os.environ.copy() |
||||
sys_env.update(env_vars) |
||||
|
||||
self._logger.debug("Run command: %s" % (cmd,)) |
||||
self._process = subprocess.Popen( |
||||
cmd, |
||||
cwd=cwd, |
||||
stdout=subprocess.PIPE, |
||||
stdin=subprocess.PIPE, |
||||
stderr=subprocess.PIPE, |
||||
preexec_fn=os.setsid, |
||||
env=sys_env, |
||||
shell=True, |
||||
) |
||||
|
||||
self._process_pid = self._process.pid |
||||
self.result, self.error = self._process.communicate(interact) |
||||
if self.result != None: |
||||
self.result = self.result.decode("utf-8", "ignore") |
||||
self.error = self.error.decode("utf-8", "ignore") |
||||
self.code = self._process.returncode |
||||
|
||||
if self.terminated: |
||||
self.error = 'Execution timeout exceeded for "%s" command.' % (cmd,) |
||||
self.code = TIMEOUT_EXIT_CODE |
||||
self.terminated = False |
||||
|
||||
self._log_cmd_results() |
||||
|
||||
def execute(self, cmd, cwd=None, **kwargs): |
||||
"""Execute command. |
||||
|
||||
This method composes shell command from passed args and executes it. |
||||
Command template: '[sudo] cmd [options] data' |
||||
Example: |
||||
cmd='cp', data='source_file dest_file', options=['r','f'], |
||||
sudo=True |
||||
Composed result: 'sudo cp -rf source_file dest_file'. |
||||
|
||||
:Parameters: |
||||
- `cmd`: a string with a shell command to execute. |
||||
- `cwd`: a string with a current working directory to set. |
||||
|
||||
:Keywords: |
||||
- `data`: args for `cmd`(e.g. filename, dirname,). |
||||
- `options`: options for the shell command. |
||||
- `sudo`: use `sudo`? Default value is False. |
||||
- `timeout`: execution timeout in seconds. |
||||
- `env_vars`: a dictionary with custom environment variables. |
||||
- `interact`: a string to enter to the command stdin during |
||||
execution. |
||||
|
||||
:Return: |
||||
- namedtuple(ec, out, err). |
||||
|
||||
:Exceptions: |
||||
- `AssertionError`: is raised if `options` is not a list. |
||||
""" |
||||
data = kwargs.get("data", "") |
||||
options = kwargs.get("options", []) |
||||
sudo = kwargs.get("sudo", False) |
||||
timeout = int(kwargs.get("timeout") or EXECUTION_TIMEOUT) |
||||
env_vars = kwargs.get("env_vars", {}) |
||||
interact = kwargs.get("interact", "") |
||||
assert isinstance(options, list), "`options` must be a list." |
||||
|
||||
if platform.system() == "Windows": |
||||
timeout = EXECUTION_TIMEOUT |
||||
return self._start_cmd_thread(self.__run, (cmd, cwd, interact), timeout) |
||||
|
||||
else: |
||||
opts = "" |
||||
if options: |
||||
# Remove duplicates preserving the order: |
||||
unq_opts = [] |
||||
for option in options: |
||||
option = option.strip("- ") |
||||
if option not in unq_opts: |
||||
unq_opts.append(option) |
||||
|
||||
opts = "-%s " % ("".join(unq_opts),) |
||||
|
||||
# Build command. |
||||
execute_cmd = "%s %s%s" % (cmd, opts, data) |
||||
if sudo: |
||||
execute_cmd = "sudo %s" % (execute_cmd,) |
||||
|
||||
return self._start_cmd_thread( |
||||
self.__run, (execute_cmd, cwd, env_vars, interact), timeout |
||||
) |
||||
|
||||
|
||||
class LogChecker: |
||||
|
||||
"""This class provides methods to check logs and strings.""" |
||||
|
||||
def __init__(self, logger=None): |
||||
"""Initialize LogChecker instance.""" |
||||
global loggers |
||||
|
||||
if logger != None: |
||||
self._logger = logger |
||||
else: |
||||
if loggers.get(self.__class__.__name__): |
||||
self._logger = loggers.get(self.__class__.__name__) |
||||
else: |
||||
self._logger = Logger(self.__class__.__name__) |
||||
loggers[self.__class__.__name__] = self._logger |
||||
|
||||
@staticmethod |
||||
def _prepare_value(value): |
||||
"""Convert given value to a list if needed.""" |
||||
return value if isinstance(value, (tuple, list)) else [value] |
||||
|
||||
def __crop_output(self, output, limit=(2000, 2000)): |
||||
"""Crop string with output to specified limits. |
||||
|
||||
:Parameters: |
||||
- `output`: a string to be cropped. |
||||
- `limit`: a tuple with a range to be cropped from `output`. |
||||
|
||||
:Return: |
||||
- cropped `output` if its length exceeds limit, otherwise - |
||||
`output`. |
||||
""" |
||||
crop_message = ( |
||||
"" |
||||
if len(output) <= sum(limit) |
||||
else "\n\n----- CROPPED -----\n ...\n----- CROPPED -----\n\n" |
||||
) |
||||
if crop_message: |
||||
return "".join((output[: limit[0]], crop_message, output[-limit[1] :])) |
||||
return output |
||||
|
||||
def verify_expected_output(self, expected_items, output, order=STRICT_ORDER): |
||||
"""Check presence of regex patterns in output string. |
||||
|
||||
:Parameters: |
||||
- `expected_items`: a list of regex patterns that should be found |
||||
in `output`. |
||||
- `output`: a string with output to verify. |
||||
- `order`: STRICT_ORDER, ANY_ORDER. |
||||
|
||||
:Exceptions: |
||||
- `AssertionError`: is raised if: |
||||
1)`output` is not a string. |
||||
2) one of expected items was not found in `output`. |
||||
3) items were found in wrong order. |
||||
""" |
||||
if output != None and not isinstance(output, str): |
||||
output = output.decode("utf-8", "ignore") |
||||
assert isinstance(output, str), "`output` must be a string." |
||||
expected_items = self._prepare_value(expected_items) |
||||
|
||||
last_found_position = 0 |
||||
for item in expected_items: |
||||
pattern = re.compile(item) |
||||
match = pattern.search(output) |
||||
assert match, "Expected item `%s` not found in output:\n%s" % ( |
||||
item, |
||||
self.__crop_output(output), |
||||
) |
||||
current_found_position = match.start() |
||||
# Compare current found position with last found position |
||||
if order == STRICT_ORDER: |
||||
assert current_found_position >= last_found_position, ( |
||||
"Expected item `%s` order is wrong in output:\n%s" |
||||
% (item, self.__crop_output(output)) |
||||
) |
||||
last_found_position = current_found_position |
||||
|
||||
def verify_unexpected_output(self, unexpected_items, output): |
||||
"""Check absence of regex patterns in output string. |
||||
|
||||
:Parameters: |
||||
- `unexpected_items`: a list of regex patterns that should be |
||||
absent in `output`. |
||||
- `output`: a string with output to verify. |
||||
|
||||
:Exceptions: |
||||
- `AssertionError`: is raised if: |
||||
1)`output` is not a string. |
||||
2) one of unexpected items was found in `output`. |
||||
""" |
||||
if output != None and not isinstance(output, str): |
||||
output = output.decode("utf-8", "ignore") |
||||
assert isinstance(output, str), "`output` must be a string." |
||||
unexpected_items = self._prepare_value(unexpected_items) |
||||
|
||||
for item in unexpected_items: |
||||
pattern = re.compile(item) |
||||
match = pattern.search(output) |
||||
assert not match, ( |
||||
"Unexpected item `%s` which should be absent " |
||||
"found in output:\n%s" % (item, self.__crop_output(output)) |
||||
) |
||||
|
||||
def verify_expected_log(self, filename, expected=[], order=STRICT_ORDER): |
||||
"""Check presence of regex patterns in specified file. |
||||
|
||||
:Parameters: |
||||
- `filename`: a string with absolute path to a file. |
||||
- `expected`: a list of regex patterns that should be found in |
||||
the file. |
||||
- `order`: STRICT_ORDER, ANY_ORDER. |
||||
|
||||
:Exceptions: |
||||
- `AssertionError`: is raised if: |
||||
1)`filename` is not a string. |
||||
2) specified file doesn't exist. |
||||
3) one of expected items was not found in the file. |
||||
4) items were found in wrong order. |
||||
""" |
||||
if filename != None and not isinstance(filename, str): |
||||
filename = filename.decode("utf-8", "ignore") |
||||
assert isinstance(filename, str), "`filename` must be a string." |
||||
assert os.path.isfile(filename), "No such file: %s." % (filename,) |
||||
expected = self._prepare_value(expected) |
||||
|
||||
def read_log(): |
||||
"""Read log file in chunks.""" |
||||
with open(filename, "r") as file_reader: |
||||
prev_lines, lines = [], [] |
||||
for idx, line in enumerate(file_reader, 1): |
||||
lines.append(line) |
||||
if idx % CHUNK_SIZE == 0: |
||||
yield idx, "".join(prev_lines + lines) |
||||
prev_lines, lines = lines, [] |
||||
if lines: |
||||
yield idx, "".join(prev_lines + lines) |
||||
|
||||
results = {} |
||||
for line_idx, chunk in read_log(): |
||||
chunk_size = chunk.count("\n") |
||||
for item in expected: |
||||
matches_iterator = re.finditer( |
||||
r"%s" % (item,), chunk, flags=re.MULTILINE |
||||
) |
||||
for match in matches_iterator: |
||||
relative_line = chunk.count("\n", 0, match.start()) + 1 |
||||
line = max(relative_line, line_idx - chunk_size + relative_line) |
||||
results[item] = results.get(item, [line]) |
||||
if line not in results[item]: |
||||
results[item].append(line) |
||||
|
||||
if order == STRICT_ORDER: |
||||
last_found_position = 0 |
||||
for item in expected: |
||||
found_matches = results.get(item) |
||||
assert found_matches, "Expected item `%s` not found in " "file: %s." % ( |
||||
item, |
||||
filename, |
||||
) |
||||
if len(found_matches) > 1: |
||||
self._logger.warning("More than one match for item `%s`." % (item,)) |
||||
# Item(s) found. Let's get line number of first appearance. |
||||
current_found_position = found_matches[0] |
||||
# Compare first appearances of current and previous items. |
||||
assert current_found_position > last_found_position, ( |
||||
"Expected item `%s` order is wrong in file: %s.\n" |
||||
"Current position: %s.\nPrevious position: %s." |
||||
% (item, filename, current_found_position, last_found_position) |
||||
) |
||||
last_found_position = current_found_position |
||||
else: |
||||
for item in expected: |
||||
found_matches = results.get(item) |
||||
assert found_matches, "Expected item `%s` not found in " "file: %s." % ( |
||||
item, |
||||
filename, |
||||
) |
||||
if len(found_matches) > 1: |
||||
self._logger.warning("More than one match for item `%s`." % (item,)) |
||||
|
||||
def verify_unexpected_log(self, filename, unexpected=[], ignored=[]): |
||||
"""Check absence of regex patterns in specified file. |
||||
|
||||
:Parameters: |
||||
- `filename`: a string with absolute path to a file. |
||||
- `unexpected`: a list of regex patterns that should be absent in |
||||
the file. |
||||
- `ignored`: a list of regex patterns that should be ignored. |
||||
|
||||
:Exceptions: |
||||
- `AssertionError`: is raised if: |
||||
1)`filename` is not a string. |
||||
2) specified file doesn't exist. |
||||
3) one of unexpected items was found in the file. |
||||
""" |
||||
if filename != None and not isinstance(filename, str): |
||||
filename = filename.decode("utf-8", "ignore") |
||||
assert isinstance(filename, str), "`filename` must be a string." |
||||
assert os.path.isfile(filename), "No such file: %s." % (filename,) |
||||
unexpected = self._prepare_value(unexpected) |
||||
ignored = self._prepare_value(ignored) |
||||
|
||||
with open(filename, "r") as file_reader: |
||||
found_items = [] |
||||
for line in file_reader: |
||||
for item in unexpected: |
||||
if re.search(r"%s" % (item,), line): |
||||
found_items.append(line.strip()) |
||||
if ignored: |
||||
for item in ignored: |
||||
for line in found_items[:]: |
||||
if re.search(r"%s" % (item,), line): |
||||
found_items.remove(line) |
||||
|
||||
assert len(found_items) == 0, "Unexpected items were found in %s:\n%s" % ( |
||||
filename, |
||||
found_items, |
||||
) |
||||
|
||||
Loading…
Reference in new issue