Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Deduplicate parsing code and abstract away lines
Parsing .env files is a critical part of this package.  To make it
easier to change it and test it, it is important that it is done in only
one place.

Also, code that uses the parser now doesn't depend on the fact that each
key-value binding spans exactly one line.  This will make it easier to
handle multiline bindings in the future.
  • Loading branch information
bbc2 committed Nov 14, 2018
commit aea71c25040c90bb9184956058919d35de755344
74 changes: 46 additions & 28 deletions dotenv/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,24 @@
from __future__ import absolute_import, print_function, unicode_literals

import codecs
import fileinput
import io
import os
import re
import shutil
import sys
from subprocess import Popen
import tempfile
import warnings
from collections import OrderedDict
from collections import OrderedDict, namedtuple
from contextlib import contextmanager

from .compat import StringIO, PY2, WIN, text_type

__escape_decoder = codecs.getdecoder('unicode_escape')
__posix_variable = re.compile(r'\$\{[^\}]*\}')

Binding = namedtuple('Binding', 'key value original')


def decode_escaped(escaped):
return __escape_decoder(escaped)[0]
Expand Down Expand Up @@ -46,6 +49,12 @@ def parse_line(line):
return k, v


def parse_stream(stream):
for line in stream:
(key, value) = parse_line(line)
yield Binding(key=key, value=value, original=line)


class DotEnv():

def __init__(self, dotenv_path, verbose=False):
Expand Down Expand Up @@ -76,12 +85,9 @@ def dict(self):

def parse(self):
with self._get_stream() as stream:
for line in stream:
key, value = parse_line(line)
if not key:
continue

yield key, value
for mapping in parse_stream(stream):
if mapping.key is not None and mapping.value is not None:
yield mapping.key, mapping.value

def set_as_environment_variables(self, override=False):
"""
Expand Down Expand Up @@ -121,6 +127,20 @@ def get_key(dotenv_path, key_to_get):
return DotEnv(dotenv_path, verbose=True).get(key_to_get)


@contextmanager
def rewrite(path):
try:
with tempfile.NamedTemporaryFile(mode="w+", delete=False) as dest:
with io.open(path) as source:
yield (source, dest)
except BaseException:
if os.path.isfile(dest.name):
os.unlink(dest.name)
raise
else:
shutil.move(dest.name, path)


def set_key(dotenv_path, key_to_set, value_to_set, quote_mode="always"):
"""
Adds or Updates a key/value to the given .env
Expand All @@ -136,20 +156,19 @@ def set_key(dotenv_path, key_to_set, value_to_set, quote_mode="always"):
if " " in value_to_set:
quote_mode = "always"

line_template = '{}="{}"' if quote_mode == "always" else '{}={}'
line_template = '{}="{}"\n' if quote_mode == "always" else '{}={}\n'
line_out = line_template.format(key_to_set, value_to_set)

replaced = False
for line in fileinput.input(dotenv_path, inplace=True):
k, v = parse_line(line)
if k == key_to_set:
replaced = True
line = "{}\n".format(line_out)
print(line, end='')

if not replaced:
with io.open(dotenv_path, "a") as f:
f.write("{}\n".format(line_out))
with rewrite(dotenv_path) as (source, dest):
replaced = False
for mapping in parse_stream(source):
if mapping.key == key_to_set:
dest.write(line_out)
replaced = True
else:
dest.write(mapping.original)
if not replaced:
dest.write(line_out)

return True, key_to_set, value_to_set

Expand All @@ -161,18 +180,17 @@ def unset_key(dotenv_path, key_to_unset, quote_mode="always"):
If the .env path given doesn't exist, fails
If the given key doesn't exist in the .env, fails
"""
removed = False

if not os.path.exists(dotenv_path):
warnings.warn("can't delete from %s - it doesn't exist." % dotenv_path)
return None, key_to_unset

for line in fileinput.input(dotenv_path, inplace=True):
k, v = parse_line(line)
if k == key_to_unset:
removed = True
line = ''
print(line, end='')
removed = False
with rewrite(dotenv_path) as (source, dest):
for mapping in parse_stream(source):
if mapping.key == key_to_unset:
removed = True
else:
dest.write(mapping.original)

if not removed:
warnings.warn("key %s not removed from %s - key doesn't exist." % (key_to_unset, dotenv_path))
Expand Down
52 changes: 34 additions & 18 deletions tests/test_cli.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
# -*- coding: utf-8 -*-
from os import environ
import os
from os.path import dirname, join

import pytest
import sh

import dotenv
from dotenv.version import __version__
from dotenv.cli import cli as dotenv_cli

import sh
from dotenv.version import __version__

here = dirname(__file__)
dotenv_path = join(here, '.env')
Expand Down Expand Up @@ -39,6 +40,17 @@ def test_set_key(dotenv_file):
assert 'HELLO="WORLD 2"\nfoo="bar"' == fp.read().strip()


def test_set_key_permission_error(dotenv_file):
os.chmod(dotenv_file, 0o000)

with pytest.raises(Exception):
dotenv.set_key(dotenv_file, "HELLO", "WORLD")

os.chmod(dotenv_file, 0o600)
with open(dotenv_file, "r") as fp:
assert fp.read() == ""


def test_list(cli, dotenv_file):
success, key_to_set, value_to_set = dotenv.set_key(dotenv_file, 'HELLO', 'WORLD')
result = cli.invoke(dotenv_cli, ['--file', dotenv_file, 'list'])
Expand Down Expand Up @@ -95,18 +107,22 @@ def test_value_with_special_characters():
sh.rm(dotenv_path)


def test_unset():
sh.touch(dotenv_path)
success, key_to_set, value_to_set = dotenv.set_key(dotenv_path, 'HELLO', 'WORLD')
stored_value = dotenv.get_key(dotenv_path, 'HELLO')
assert stored_value == 'WORLD'
success, key_to_unset = dotenv.unset_key(dotenv_path, 'HELLO')
def test_unset_ok(dotenv_file):
with open(dotenv_file, "w") as f:
f.write("a=b\nc=d")

success, key_to_unset = dotenv.unset_key(dotenv_file, "a")

assert success is True
assert dotenv.get_key(dotenv_path, 'HELLO') is None
success, key_to_unset = dotenv.unset_key(dotenv_path, 'RANDOM')
assert success is None
sh.rm(dotenv_path)
success, key_to_unset = dotenv.unset_key(dotenv_path, 'HELLO')
assert key_to_unset == "a"
with open(dotenv_file, "r") as f:
assert f.read() == "c=d"
sh.rm(dotenv_file)


def test_unset_non_existing_file():
success, key_to_unset = dotenv.unset_key('/non-existing', 'HELLO')

assert success is None


Expand Down Expand Up @@ -180,7 +196,7 @@ def test_get_key_with_interpolation(cli):
stored_value = dotenv.get_key(dotenv_path, 'BAR')
assert stored_value == 'CONCATENATED_WORLD_POSIX_VAR'
# test replace from environ taking precedence over file
environ["HELLO"] = "TAKES_PRECEDENCE"
os.environ["HELLO"] = "TAKES_PRECEDENCE"
stored_value = dotenv.get_key(dotenv_path, 'FOO')
assert stored_value == "TAKES_PRECEDENCE"
sh.rm(dotenv_path)
Expand All @@ -194,10 +210,10 @@ def test_get_key_with_interpolation_of_unset_variable(cli):
stored_value = dotenv.get_key(dotenv_path, 'FOO')
assert stored_value == ''
# unless present in environment
environ['NOT_SET'] = 'BAR'
os.environ['NOT_SET'] = 'BAR'
stored_value = dotenv.get_key(dotenv_path, 'FOO')
assert stored_value == 'BAR'
del(environ['NOT_SET'])
del(os.environ['NOT_SET'])
sh.rm(dotenv_path)


Expand Down
19 changes: 18 additions & 1 deletion tests/test_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
import sh

from dotenv import load_dotenv, find_dotenv, set_key, dotenv_values
from dotenv.main import parse_line
from dotenv.main import Binding, parse_line, parse_stream
from dotenv.compat import StringIO
from IPython.terminal.embed import InteractiveShellEmbed

Expand Down Expand Up @@ -42,6 +42,23 @@ def test_parse_line(test_input, expected):
assert parse_line(test_input) == expected


@pytest.mark.parametrize("test_input,expected", [
("", []),
("a=b", [Binding(key="a", value="b", original="a=b")]),
(
"a=b\nc=d",
[
Binding(key="a", value="b", original="a=b\n"),
Binding(key="c", value="d", original="c=d"),
],
),
])
def test_parse_stream(test_input, expected):
result = parse_stream(StringIO(test_input))

assert list(result) == expected


def test_warns_if_file_does_not_exist():
with warnings.catch_warnings(record=True) as w:
load_dotenv('.does_not_exist', verbose=True)
Expand Down