diff --git a/lib/ansible/plugins/connection/ssh.py b/lib/ansible/plugins/connection/ssh.py
index e54a880dce3..12bdaf59e3b 100644
--- a/lib/ansible/plugins/connection/ssh.py
+++ b/lib/ansible/plugins/connection/ssh.py
@@ -131,6 +131,10 @@ except ImportError:
 SSHPASS_AVAILABLE = None
 
 
+class AnsibleControlPersistBrokenPipeError(AnsibleError):
+    ''' ControlPersist broken pipe '''
+    pass
+
 def _ssh_retry(func):
     """
     Decorator to retry ssh/scp/sftp in the case of a connection failure
@@ -148,11 +152,17 @@ def _ssh_retry(func):
         cmd_summary = "%s..." % args[0]
         for attempt in range(remaining_tries):
             try:
-                return_tuple = func(self, *args, **kwargs)
-                display.vvv(return_tuple, host=self.host)
-                # 0 = success
-                # 1-254 = remote command return code
-                # 255 = failure from the ssh command itself
+                try:
+                    return_tuple = func(self, *args, **kwargs)
+                    display.vvv(return_tuple, host=self.host)
+                    # 0 = success
+                    # 1-254 = remote command return code
+                    # 255 = failure from the ssh command itself
+                except (AnsibleControlPersistBrokenPipeError) as e:
+                    # Retry one more time because of the ControlPersist broken pipe (see #16731)
+                    display.vvv(u"RETRYING BECAUSE OF CONTROLPERSIST BROKEN PIPE")
+                    return_tuple = func(self, *args, **kwargs)
+
                 if return_tuple[0] != 255:
                     break
                 else:
@@ -744,6 +754,12 @@ class Connection(ConnectionBase):
             raise AnsibleError('using -c ssh on certain older ssh versions may not support ControlPersist, set ANSIBLE_SSH_ARGS="" '
                                '(or ssh_args in [ssh_connection] section of the config file) before running again')
 
+        # If we find a broken pipe because of ControlPersist timeout expiring (see #16731),
+        # we raise a special exception so that we can retry a connection.
+        controlpersist_broken_pipe = b'mux_client_hello_exchange: write packet: Broken pipe' in b_stderr
+        if p.returncode == 255 and controlpersist_broken_pipe:
+            raise AnsibleControlPersistBrokenPipeError('SSH Error: data could not be sent because of ControlPersist broken pipe.')
+
         if p.returncode == 255 and in_data and checkrc:
             raise AnsibleConnectionFailure('SSH Error: data could not be sent to remote host "%s". Make sure this host can be reached over ssh' % self.host)
 
diff --git a/test/units/plugins/connection/test_ssh.py b/test/units/plugins/connection/test_ssh.py
index c6470d5eed6..fb5e8f1e095 100644
--- a/test/units/plugins/connection/test_ssh.py
+++ b/test/units/plugins/connection/test_ssh.py
@@ -590,7 +590,7 @@ class TestSSHConnectionRetries(object):
 
         self.mock_popen_res.stdout.read.side_effect = [b"", b"my_stdout\n", b"second_line"]
         self.mock_popen_res.stderr.read.side_effect = [b"", b"my_stderr"]
-        type(self.mock_popen_res).returncode = PropertyMock(side_effect=[255] * 3 + [0] * 4)
+        type(self.mock_popen_res).returncode = PropertyMock(side_effect=[255] * 4 + [0] * 4)
 
         self.mock_selector.select.side_effect = [
             [(SelectorKey(self.mock_popen_res.stdout, 1001, [EVENT_READ], None), EVENT_READ)],
@@ -621,7 +621,7 @@ class TestSSHConnectionRetries(object):
 
         self.mock_popen_res.stdout.read.side_effect = [b"", b"my_stdout\n", b"second_line"]
         self.mock_popen_res.stderr.read.side_effect = [b"", b"my_stderr"]
-        type(self.mock_popen_res).returncode = PropertyMock(side_effect=[255] * 3 + [0] * 4)
+        type(self.mock_popen_res).returncode = PropertyMock(side_effect=[255] * 4 + [0] * 4)
 
         self.mock_selector.select.side_effect = [
             [(SelectorKey(self.mock_popen_res.stdout, 1001, [EVENT_READ], None), EVENT_READ)],