Draft: Designing a Data Flow Language - Pipe Expressions

Renato Pereira

After creating my first programming language, I’ve been thinking about which features I would like to reuse in another project. Given that I am quite obsessed with designing programming languages, the new project is already on my mind.

I enjoy creating tools that are useful to me. KLCLANG, for instance, my second language, serves as a simple calculator that I can use in the terminal. With it I can perform unit conversions such as 2MB to KB or 9.8ft to m.

I’m not aiming to “solve” any specific problem with it, just to fulfill a convenience for my own needs. Speaking of convenience, I frequently encounter similar challenges at my job: given a source of data, I need to perform some minor processing on it before taking further action.

Pretty generic, I know, but allow me to give you real-world scenarios from my past experience:

The list goes on.

These are simple do-once (or twice, or thrice?!) tasks that are fun, but quite annoying to solve when you don’t have much time among other daily needs. I usually use Node.js or Python for such tasks. They offer fast interaction without the hassle that usually comes with statically typed languages. But even these languages have their annoyances, such as dealing with their development environment and tooling, and installing (and learning) third-party packages.

I’ve been trying to make myself more familiar with bash, but, hell, bash is a total mess. Furthermore, I am incapable of remembering the syntax and usage options for bash and other useful tools such as sed, awk and grep.

So, here is my motivation for the new language: helping me with shell scripting.

Pipe Expressions

One feature I really liked in SHTLANG was pipe expressions - though the implementation was SHT, the concept was convincing nonetheless. In most languages, pipe expressions are just syntactic sugar: a shortcut for passing the result of one function as an argument to the next.

// Pipe expression from Gleam documentation:
"Hello, Mike!"
|> string.drop_right(1)
|> string.drop_left(7)
|> io.debug

// This is equivalent to
io.debug(string.drop_left(string.drop_right("Hello, Mike!", 1), 7))

In SHT, I did something similar, but the pipes are lazy and work on iterators. The expression itself returns an iterator unless I explicitly force its resolution:

value := math.fibonacci()
| takeWhile x: x < 4000000
| filter x: x is math.even
| sum
| to Number # forcing the resolution to number

In the example above, the resolution occurs at the to operator, which accepts a type as argument. The type provides a meta function that determines what action to take upon the iterator. Number, for example, “queries” the previous iterator for one single element, returning a number representation of this value.

It’s worth noting that although the to operator triggers only a single query on the preceding step, the sum function, once queried, drains its own incoming stream until it receives an END signal.
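The lazy behaviour described above can be approximated with Python generators. This is only a sketch of the idea, not how SHT implements it: `fibonacci`, `lazy_sum` and `to_number` are hypothetical stand-ins for `math.fibonacci`, `sum` and `to Number`, and the starting values of the sequence are an assumption.

```python
from itertools import takewhile


def fibonacci():
    """Infinite stream of Fibonacci numbers (assumed to start at 1, 2)."""
    a, b = 1, 2
    while True:
        yield a
        a, b = b, a + b


def lazy_sum(stream):
    """Lazy 'sum' step: idle until queried, then drains its whole input."""
    yield sum(stream)


def to_number(stream):
    """Resolution step: query the stream for exactly one element."""
    return next(iter(stream))


# Building the pipeline performs no work: every step is a lazy iterator.
below_limit = takewhile(lambda x: x < 4_000_000, fibonacci())
evens = (x for x in below_limit if x % 2 == 0)
pipeline = lazy_sum(evens)

# Only this call pulls data through the chain.
value = to_number(pipeline)
print(value)  # 4613732
```

Note how `to_number` asks for a single element, yet that one query makes `lazy_sum` consume everything `takewhile` lets through.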

Similar to SHT, let’s assume that in our new language every type can generate a stream, and that the Stream object can represent any sequence of objects, finite or infinite:

-- pseudocode
5            -- 5
[1, 2, 3]    -- 1, 2, 3
{a:1, b:2}   -- 'a', 'b'
'abc'        -- 'a', 'b', 'c'
http.Serve() -- req, req, req, ...
logStream    -- line, line, line, ...
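As a rough Python analogy for the table above, a single helper can turn any value into an iterator. `to_stream` is a hypothetical name, and treating a scalar as a one-element stream is my reading of the `5 -- 5` row, not a settled design.

```python
from collections.abc import Iterable, Iterator


def to_stream(value) -> Iterator:
    """Turn any value into an iterator (hypothetical helper)."""
    if isinstance(value, Iterable):
        # Lists, strings, dicts (keys), files and generators already iterate.
        return iter(value)
    # Anything else becomes a stream with a single element.
    return iter([value])


print(list(to_stream(5)))                 # [5]
print(list(to_stream([1, 2, 3])))         # [1, 2, 3]
print(list(to_stream({'a': 1, 'b': 2})))  # ['a', 'b']
print(list(to_stream('abc')))             # ['a', 'b', 'c']
```

Infinite sources such as a server or a log tail would simply be generators that never finish, which `to_stream` passes through unchanged.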

Pipe functions can be modelled as functions that receive a Stream as their first parameter:

-- pseudocode
fn map(stream Stream, f fn) {
  for x in stream {
    yield f(x)
  }
}

In this case, the for loop also works on the iterator, and yielding each result creates another Stream.
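The same shape carries over to Python generators almost literally. In the sketch below, `map_`, `filter_` and `pipe` are hypothetical names (the trailing underscore only avoids shadowing the built-ins), and `pipe` stands in for the `|` syntax by passing the stream as the first argument of each step.

```python
def map_(stream, f):
    """Pipe function: the stream comes first, and the result is a new stream."""
    for x in stream:
        yield f(x)


def filter_(stream, predicate):
    """Pass through only the items for which predicate(x) is true."""
    for x in stream:
        if predicate(x):
            yield x


def pipe(source, *steps):
    """Thread a stream through the steps; each step is (function, *extra_args)."""
    stream = iter(source)
    for fn, *args in steps:
        stream = fn(stream, *args)
    return stream


result = pipe(
    [1, 2, 3, 4],
    (map_, lambda x: x * 10),
    (filter_, lambda x: x > 10),
)
print(list(result))  # [20, 30, 40]
```

Because every step returns a generator, nothing is computed until `list` (or any other consumer) starts pulling values.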

Type System

I’ve been wondering if I should use a static type system or a dynamic one. While I would like to experiment with a flexible static type system, I don’t know if I’ll shoot myself in the foot trying to implement something beyond my current skill level. Moreover, I believe that, to get the most out of streams, automatically converting any type to a Stream object is the more natural solution. For example:

-- These strings are stored entirely in memory
a := 'this is a large\ntext'
a.Lines()                -- ['this is a large', 'text']
for char in a {}         -- char by char stream
for line in a.Lines() {} -- line by line stream

-- But these could be lazy loaded
for char in os.Open('file') {}
for req in http.Serve() {}

With that, we could say that this language is dynamically typed and weakly typed.
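To make the in-memory versus lazy distinction concrete, here is a small Python sketch in which the same `lines` function consumes either a plain string or a file read in chunks. `chars` and `lines` are hypothetical helpers, and `io.StringIO` stands in for a real file so the example is self-contained.

```python
import io


def chars(file, chunk_size=4096):
    """Yield one character at a time, reading the file in small chunks."""
    while True:
        chunk = file.read(chunk_size)
        if not chunk:
            return
        yield from chunk


def lines(stream):
    """Regroup a character stream into lines without loading it all first."""
    buffer = []
    for ch in stream:
        if ch == '\n':
            yield ''.join(buffer)
            buffer = []
        else:
            buffer.append(ch)
    if buffer:
        yield ''.join(buffer)


text = 'this is a large\ntext'
from_memory = list(lines(text))                    # string already in memory
from_file = list(lines(chars(io.StringIO(text))))  # file-like, read lazily
print(from_memory)  # ['this is a large', 'text']
print(from_file)    # ['this is a large', 'text']
```

The consumer does not need to know which kind of source it received, which is the convenience that automatic conversion to a stream is meant to provide.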

Other features I believe would help, but I don’t see how they fit in the pipe discussion:

I may draft some ideas about them in later posts.


Parallel execution of pipe steps appears to be a natural evolution of the feature. I could envision three different modes for utilizing pipes:

The forking strategy is pretty straightforward: upon query, we can execute the pipe function in parallel and then disregard it. The branching strategy, however, is trickier, because the function below it queries the branching function one item at a time, so any parallel processing would effectively run one call at a time.

Let’s visualize it:

-- This would execute pings ignoring the responses
| fork ip: http.Ping()             -- called and ignored
| each ip: print('Pinging %s', ip)

-- This would execute multiple pings, but the each function would
-- be called after each execution finishes
| branch ip: { http.Ping(), ip }
| each ip: print('Pinged %s', ip)
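One way to picture the two modes is with a Python thread pool. This is a sketch under my own assumptions, not a design decision: `fake_ping` replaces `http.Ping()`, and this `branch` eagerly drains its input so that the calls can overlap.

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed


def fake_ping(ip):
    """Stand-in for http.Ping(): sleeps briefly, then returns the ip."""
    time.sleep(0.01)
    return ip


def fork(stream, f, pool):
    """Start f(x) in the background, ignore its result, pass x through."""
    for x in stream:
        pool.submit(f, x)
        yield x


def branch(stream, f, pool):
    """Start f(x) for every x up front, then yield results as they finish."""
    futures = [pool.submit(f, x) for x in stream]  # exhausts the input
    for done in as_completed(futures):
        yield done.result()


ips = ['10.0.0.1', '10.0.0.2', '10.0.0.3']
with ThreadPoolExecutor(max_workers=3) as pool:
    forked = list(fork(ips, fake_ping, pool))      # same order as the input
    branched = list(branch(ips, fake_ping, pool))  # completion order
print(forked)
print(sorted(branched))
```

`fork` stays lazy and preserves order because it never waits for a result, while `branch` gives up input order in exchange for yielding whichever call finishes first.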

Given this, branches should be smart enough to exhaustively query the incoming stream, but what if the pipe is interrupted after a single return?

| branch ip: { http.Ping(), ip }
| first ip: print('first ip to respond was %s', ip)

I guess it would be acceptable to ‘ignore’ any further responses from the branch function, or even to interrupt its execution.

Another detail: if the branch function exhausts the incoming stream, we may have to limit the number of parallel branches we can have.
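Both concerns, stopping early and capping parallelism, can be sketched together in Python. Everything here is an assumption for illustration: the `limit` parameter, the `first` helper, and the choice to cancel pending work when the consumer stops. Note that cancelling a Python future only prevents calls that have not started, so truly interrupting a running call would need extra machinery.

```python
import itertools
from concurrent.futures import FIRST_COMPLETED, ThreadPoolExecutor, wait


def branch(stream, f, pool, limit):
    """Keep at most `limit` calls in flight; yield results as they finish."""
    pending = set()
    try:
        for x in stream:
            pending.add(pool.submit(f, x))
            if len(pending) >= limit:
                done, pending = wait(pending, return_when=FIRST_COMPLETED)
                for fut in done:
                    yield fut.result()
        while pending:
            done, pending = wait(pending, return_when=FIRST_COMPLETED)
            for fut in done:
                yield fut.result()
    finally:
        # Reached on normal completion and when the consumer stops early.
        for fut in pending:
            fut.cancel()  # only affects calls that have not started yet


def first(stream):
    """Take one element, then close the stream so upstream can clean up."""
    it = iter(stream)
    try:
        return next(it, None)
    finally:
        if hasattr(it, 'close'):
            it.close()


pulled = []


def source():
    """Infinite input that records how many items were requested."""
    for n in itertools.count():
        pulled.append(n)
        yield n


with ThreadPoolExecutor(max_workers=4) as pool:
    winner = first(branch(source(), lambda n: n * n, pool, limit=2))
print(winner, len(pulled))
```

With the limit in place, `branch` no longer needs to exhaust its input, so it works even on an infinite stream: here only two items are ever pulled from `source` before the pipe is interrupted.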

Now what if we want to execute two different operations, both in parallel?

-- http.Ping and log.Write will execute in parallel, independently.
| fork ip: http.Ping()
| fork ip: log.Write()
| each ip: print(ip)

-- multiple branches should aggregate the information, which
-- seems to be quite complex to implement.
| branch ip: http.Ping()
| branch ip: log.Write()
| each (pingResult, logResult): print('do something')
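One possible reading of aggregated branches is sketched below in Python: for each item, all branch functions run in parallel and their results are joined into a single tuple. `branch_all`, `fake_ping` and `fake_log` are hypothetical, and this version is deliberately simple in that it parallelises across functions but still handles items one at a time.

```python
from concurrent.futures import ThreadPoolExecutor


def branch_all(stream, functions, pool):
    """Run every function on each item in parallel; yield one tuple per item."""
    for x in stream:
        futures = [pool.submit(f, x) for f in functions]
        # Results are collected in the order the functions were listed.
        yield tuple(fut.result() for fut in futures)


def fake_ping(ip):
    """Stand-in for http.Ping()."""
    return f'pong from {ip}'


def fake_log(ip):
    """Stand-in for log.Write()."""
    return f'logged {ip}'


ips = ['10.0.0.1', '10.0.0.2']
with ThreadPoolExecutor() as pool:
    results = list(branch_all(ips, [fake_ping, fake_log], pool))

for ping_result, log_result in results:
    print(ping_result, '|', log_result)
```

Combining this with parallelism across items as well is where the complexity mentioned in the comments above starts to show, since results from different items and different functions would then have to be matched back together.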

These are still open questions that I’m not fully comfortable with. Another decision I am uncertain about is whether to add a different pipe symbol for fork and branch.