Test: freshclam w/ zero-byte cdiff & cvd out-of-date

Add a test where freshclam received a zero-byte cdiff to trigger a whole
CVD database download, and the CVD served is older than advertised.

This is a regression test for a bug found & fixed by Andrew Williams.
pull/176/head
Micah Snyder 4 years ago committed by Andrew
parent 74a97fe485
commit ee8a62baf7
  1. 131
      unit_tests/freshclam_test.py

@ -263,8 +263,8 @@ class TC(testcase.TestCase):
# start with this CVD
shutil.copy(TC.path_source / 'unit_tests' / 'test_db' / 'test-1.cvd', TC.path_db / 'test.cvd')
# update to this CVD
shutil.copy(TC.path_source / 'unit_tests' / 'test_db' / 'test-6.cvd', TC.path_www / 'test.cvd')
# advertise this CVD (by sending the header response to Range requests)
shutil.copy(TC.path_source / 'unit_tests' / 'test_db' / 'test-6.cvd', TC.path_www / 'test.cvd.advertised')
# using these CDIFFs
shutil.copy(TC.path_source / 'unit_tests' / 'test_db' / 'test-2.cdiff', TC.path_www)
@ -311,13 +311,13 @@ class TC(testcase.TestCase):
self.verify_output(output.out, expected=expected_results, unexpected=unexpected_results)
def test_freshclam_06_cdiff_partial_minus_1(self):
self.step_name('Verify that freshclam can update from an older CVD to a newer with CDIFF patches')
self.step_name('Verify that freshclam will accept a partial update with 1 missing cdiff')
# start with this CVD
shutil.copy(TC.path_source / 'unit_tests' / 'test_db' / 'test-3.cvd', TC.path_db / 'test.cvd')
# update to this CVD
shutil.copy(TC.path_source / 'unit_tests' / 'test_db' / 'test-6.cvd', TC.path_www / 'test.cvd')
# advertise this CVD (by sending the header response to Range requests)
shutil.copy(TC.path_source / 'unit_tests' / 'test_db' / 'test-6.cvd', TC.path_www / 'test.cvd.advertised')
# using these CDIFFs
shutil.copy(TC.path_source / 'unit_tests' / 'test_db' / 'test-4.cdiff', TC.path_www)
@ -384,13 +384,16 @@ class TC(testcase.TestCase):
self.verify_output(output.out, expected=expected_results, unexpected=unexpected_results)
def test_freshclam_07_cdiff_partial_minus_2(self):
self.step_name('Verify that freshclam can update from an older CVD to a newer with CDIFF patches')
self.step_name('Verify that freshclam behavior with 2 missing cdiffs')
# start with this CVD
shutil.copy(TC.path_source / 'unit_tests' / 'test_db' / 'test-3.cvd', TC.path_db / 'test.cvd')
# update to this CVD
shutil.copy(TC.path_source / 'unit_tests' / 'test_db' / 'test-6.cvd', TC.path_www / 'test.cvd')
# advertise this CVD (by sending the header response to Range requests)
shutil.copy(TC.path_source / 'unit_tests' / 'test_db' / 'test-6.cvd', TC.path_www / 'test.cvd.advertised')
# serve this CVD when requested instead of the advertised one
shutil.copy(TC.path_source / 'unit_tests' / 'test_db' / 'test-6.cvd', TC.path_www / 'test.cvd.served')
# using these CDIFFs
shutil.copy(TC.path_source / 'unit_tests' / 'test_db' / 'test-4.cdiff', TC.path_www)
@ -455,6 +458,59 @@ class TC(testcase.TestCase):
]
self.verify_output(output.out, expected=expected_results, unexpected=unexpected_results)
def test_freshclam_07_no_cdiff_out_of_date_cvd(self):
self.step_name('Verify that freshclam will properly handle an out-of-date CVD update after a zero-byte CDIFF')
# start with this CVD
shutil.copy(TC.path_source / 'unit_tests' / 'test_db' / 'test-3.cvd', TC.path_db / 'test.cvd')
# advertise this CVD (by sending the header response to Range requests)
shutil.copy(TC.path_source / 'unit_tests' / 'test_db' / 'test-6.cvd', TC.path_www / 'test.cvd.advertised')
# serve this CVD when requested instead of the advertised one
shutil.copy(TC.path_source / 'unit_tests' / 'test_db' / 'test-5.cvd', TC.path_www / 'test.cvd.served')
# Serve a zero-byte test-4.cdiff instead of the real test-4.cdiff. This should trigger a whole CVD download.
with (TC.path_www / 'test-4.cdiff').open('w') as fp:
pass
handler = partial(WebServerHandler_WWW, TC.path_www)
TC.mock_mirror = Process(target=mock_database_mirror, args=(handler, TC.mock_mirror_port))
TC.mock_mirror.start()
if TC.freshclam_config.exists():
os.remove(str(TC.freshclam_config))
TC.freshclam_config.write_text('''
DatabaseMirror http://localhost:{port}
DNSDatabaseInfo no
PidFile {freshclam_pid}
LogVerbose yes
LogFileMaxSize 0
LogTime yes
DatabaseDirectory {path_db}
DatabaseOwner {user}
'''.format(
freshclam_pid=TC.freshclam_pid,
path_db=TC.path_db,
port=TC.mock_mirror_port,
user=getpass.getuser(),
))
command = '{valgrind} {valgrind_args} {freshclam} --config-file={freshclam_config} --update-db=test'.format(
valgrind=TC.valgrind, valgrind_args=TC.valgrind_args, freshclam=TC.freshclam, freshclam_config=TC.freshclam_config
)
output = self.execute_command(command)
assert output.ec == 0 # success
expected_results = [
'Incremental updates either failed or are disabled, so we\'ll have to settle for a slightly out-of-date database.',
]
unexpected_results = [
'already up-to-date'
]
self.verify_output(output.out, expected=expected_results, unexpected=unexpected_results)
def mock_database_mirror(handler, port=8001):
'''
Process entry point for our HTTP Server to mock a database mirror.
@ -527,8 +583,10 @@ class WebServerHandler_WWW(BaseHTTPRequestHandler):
'''
Make an HTTP server handler that has a configurable directory for hosting files.
Server handler to send a CVD header indicating an update is available,
Server handler to send a CVD header of `test.cvd.advertised` indicating an update is available,
and then to serve up CDIFFs that should allow the test to do an incremental update.
If `test.cvd` is requested, it will serve up `test.cvd.served` (not `test.cvd.advertised`)
'''
def __init__(self, path_www, *args, **kwargs):
@ -539,33 +597,46 @@ class WebServerHandler_WWW(BaseHTTPRequestHandler):
requested_file = self.path_www / self.path.lstrip('/')
print("Mock Server: Test requested: {}".format(requested_file))
if not requested_file.exists():
self.send_error(404, "{} Not Found".format(self.path.lstrip('/')))
elif 'Range' in self.headers:
if 'Range' in self.headers:
# This will send a CVD header so FreshClam thinks there is an update.
(range_start, range_end) = self.headers['Range'].split('=')[-1].split('-')
print("Mock Server: But they only want bytes {} through {} ...".format(range_start, range_end))
with requested_file.open('rb') as the_file:
self.send_response(206) # Partial file
self.send_header('Content-type', 'application/octet-stream')
self.end_headers()
if requested_file.name.endswith('.cvd'):
response_file = requested_file.parent / f'{requested_file}.advertised'
else:
response_file = requested_file
the_file.seek(int(range_start))
page = the_file.read(int(range_end) - int(range_start) + 1)
if not response_file.exists():
self.send_error(404, "{} Not Found".format(self.path.lstrip('/')))
else:
with response_file.open('rb') as the_file:
self.send_response(206) # Partial file
self.send_header('Content-type', 'application/octet-stream')
self.end_headers()
bytes_written = self.wfile.write(page)
print("Mock Server: Sending {} bytes back to client.".format(bytes_written))
the_file.seek(int(range_start))
page = the_file.read(int(range_end) - int(range_start) + 1)
bytes_written = self.wfile.write(page)
print("Mock Server: Sending {} bytes back to client.".format(bytes_written))
else:
# Send back some whole files
with requested_file.open('rb') as the_file:
self.send_response(200) # Partial file
self.send_header('Content-type', 'application/octet-stream')
self.end_headers()
page = the_file.read()
bytes_written = self.wfile.write(page)
print("Mock Server: Sending {} bytes back to client.".format(bytes_written))
if requested_file.name.endswith('.cvd'):
response_file = requested_file.parent / f'{requested_file}.served'
else:
response_file = requested_file
if not response_file.exists():
self.send_error(404, "{} Not Found".format(self.path.lstrip('/')))
else:
with response_file.open('rb') as the_file:
self.send_response(200) # Partial file
self.send_header('Content-type', 'application/octet-stream')
self.end_headers()
page = the_file.read()
bytes_written = self.wfile.write(page)
print("Mock Server: Sending {} bytes back to client.".format(bytes_written))

Loading…
Cancel
Save