CARVIEW |
Testing a Click app with streaming input
For sqlite-utils#364 I needed to write a test for a Click app which dealt with input streamed to standard input. I needed to run some assertions during that process, which ruled out the usual CliRunner.invoke() testing tool since that works by running the command until completion.
I decided to use subprocess
to run the application. Here's the pattern I came up with for the test:
def test_insert_streaming_batch_size_1(db_path):
# https://github.com/simonw/sqlite-utils/issues/364
# Streaming with --batch-size 1 should commit on each record
# Can't use CliRunner().invoke() here bacuse we need to
# run assertions in between writing to process stdin
proc = subprocess.Popen(
[
sys.executable,
"-m",
"sqlite_utils",
"insert",
db_path,
"rows",
"-",
"--nl",
"--batch-size",
"1",
],
stdin=subprocess.PIPE
)
proc.stdin.write(b'{"name": "Azi"}\n')
proc.stdin.flush()
# Without this delay the data wasn't yet visible
time.sleep(0.2)
assert list(Database(db_path)["rows"].rows) == [{"name": "Azi"}]
proc.stdin.write(b'{"name": "Suna"}\n')
proc.stdin.flush()
time.sleep(0.2)
assert list(Database(db_path)["rows"].rows) == [{"name": "Azi"}, {"name": "Suna"}]
proc.stdin.close()
proc.wait()
assert proc.returncode == 0
The first trick I'm using here is running sys.executable
to start a Python process. This ensures I run the Python that is available in the current virtual environment.
I modified my sqlite-utils
command such that it could also be run using python -m sqlite_utils
- see sqlite-utils#368 for details.
Setting stdin=subprocess.PIPE
allows me to write data to the process's standard input using proc.stdin.write()
.
I realized I needed to call proc.stdin.flush()
after each write to ensure the write was pushed to the process in a predictable manner.
At the end of the test, running proc.stdin.close()
is equivalent to sending an end-of-file, then proc.wait()
ensures the process has finished and terminated.
Related
- python Running PyPy on macOS using Homebrew - 2022-09-14
- pytest Mocking subprocess with pytest-subprocess - 2023-03-08
- pytest Writing pytest tests against tools written with argparse - 2022-01-08
- python Using io.BufferedReader to peek against a non-peekable stream - 2021-02-15
- pytest Start a server in a subprocess during a pytest session - 2020-08-31
- python Running Python code in a subprocess with a time limit - 2020-12-06
- python Debugging a Click application using pdb - 2020-09-03
- python How to call pip programatically from Python - 2020-08-11
- pytest Using pytest and Playwright to test a JavaScript web application - 2022-07-24
- python Explicit file encodings using click.File - 2020-10-16
Created 2022-01-09T20:47:37-08:00 · Edit